diff options
author | Kali Kaneko <kali@leap.se> | 2014-01-23 02:36:38 -0400 |
---|---|---|
committer | Kali Kaneko <kali@leap.se> | 2014-01-28 19:38:44 -0400 |
commit | 1a8e3d51fbbaca219f96efd768c5980f4eb566ac (patch) | |
tree | 8a02d197575d187774e1206f31c9b539c0cdc056 /mail | |
parent | 06c9b95a4e92a7f43f1e91ffcb718aebfe9c3c7d (diff) |
add soledadstore class
move parts-related bits to messageparts
pass soledad in initialization for memory messages
Diffstat (limited to 'mail')
-rw-r--r-- | mail/src/leap/mail/imap/mailbox.py | 29 | ||||
-rw-r--r-- | mail/src/leap/mail/imap/memorystore.py | 185 | ||||
-rw-r--r-- | mail/src/leap/mail/imap/messageparts.py | 183 | ||||
-rw-r--r-- | mail/src/leap/mail/imap/messages.py | 16 | ||||
-rw-r--r-- | mail/src/leap/mail/imap/service/imap.py | 4 | ||||
-rw-r--r-- | mail/src/leap/mail/imap/soledadstore.py | 237 |
6 files changed, 446 insertions, 208 deletions
diff --git a/mail/src/leap/mail/imap/mailbox.py b/mail/src/leap/mail/imap/mailbox.py index 9babe6b..5e16b4b 100644 --- a/mail/src/leap/mail/imap/mailbox.py +++ b/mail/src/leap/mail/imap/mailbox.py @@ -37,8 +37,8 @@ from leap.common.events.events_pb2 import IMAP_UNREAD_MAIL from leap.common.check import leap_assert, leap_assert_type from leap.mail.decorators import deferred from leap.mail.imap.fields import WithMsgFields, fields -from leap.mail.imap.memorystore import MessageDict from leap.mail.imap.messages import MessageCollection +from leap.mail.imap.messageparts import MessageWrapper from leap.mail.imap.parser import MBoxParser logger = logging.getLogger(__name__) @@ -549,10 +549,7 @@ class SoledadMailbox(WithMsgFields, MBoxParser): #sequence = True if uid == 0 else False messages_asked = self._bound_seq(messages_asked) - print "asked: ", messages_asked seq_messg = self._filter_msg_seq(messages_asked) - - print "seq: ", seq_messg getmsg = lambda uid: self.messages.get_msg_by_uid(uid) # for sequence numbers (uid = 0) @@ -791,29 +788,15 @@ class SoledadMailbox(WithMsgFields, MBoxParser): logger.debug("Tried to copy a MSG with no fdoc") return - #old_mbox = fdoc.content[self.MBOX_KEY] - #old_uid = fdoc.content[self.UID_KEY] - #old_key = old_mbox, old_uid - #print "copying from OLD MBOX ", old_mbox - - # XXX bit doubt... to duplicate in memory - # or not to...? - # I think it should be ok to duplicate as long as we're - # careful at the hour of writes... - # We could use also proxies, but it will break when - # the original mailbox is flushed. - - # XXX DEBUG ---------------------------------------- - #print "copying MESSAGE from %s (%s) to %s (%s)" % ( - #msg._mbox, msg._uid, self.mbox, uid_next) - new_fdoc = copy.deepcopy(fdoc.content) new_fdoc[self.UID_KEY] = uid_next new_fdoc[self.MBOX_KEY] = self.mbox - self._memstore.put(self.mbox, uid_next, MessageDict( - new_fdoc, hdoc.content)) + self._memstore.create_message( + self.mbox, uid_next, + MessageWrapper( + new_fdoc, hdoc.content)) - # XXX use memory store + # XXX use memory store !!! if hasattr(hdoc, 'doc_id'): self.messages.add_hdocset_docid(hdoc.doc_id) diff --git a/mail/src/leap/mail/imap/memorystore.py b/mail/src/leap/mail/imap/memorystore.py index b8829e0..7cb361f 100644 --- a/mail/src/leap/mail/imap/memorystore.py +++ b/mail/src/leap/mail/imap/memorystore.py @@ -21,187 +21,20 @@ import contextlib import logging import weakref -from collections import namedtuple - from twisted.internet.task import LoopingCall from zope.interface import implements from leap.mail import size from leap.mail.messageflow import MessageProducer -from leap.mail.messageparts import MessagePartType from leap.mail.imap import interfaces from leap.mail.imap.fields import fields +from leap.mail.imap.messageparts import MessagePartType, MessagePartDoc +from leap.mail.imap.messageparts import MessageWrapper +from leap.mail.imap.messageparts import ReferenciableDict logger = logging.getLogger(__name__) -""" -A MessagePartDoc is a light wrapper around the dictionary-like -data that we pass along for message parts. It can be used almost everywhere -that you would expect a SoledadDocument, since it has a dict under the -`content` attribute. - -We also keep some metadata on it, relative in part to the message as a whole, -and sometimes to a part in particular only. - -* `new` indicates that the document has just been created. SoledadStore - should just create a new doc for all the related message parts. -* `store` indicates the type of store a given MessagePartDoc lives in. - We currently use this to indicate that the document comes from memeory, - but we should probably get rid of it as soon as we extend the use of the - SoledadStore interface along LeapMessage, MessageCollection and Mailbox. -* `part` is one of the MessagePartType enums. - -* `dirty` indicates that, while we already have the document in Soledad, - we have modified its state in memory, so we need to put_doc instead while - dumping the MemoryStore contents. - `dirty` attribute would only apply to flags-docs and linkage-docs. - - - XXX this is still not implemented! - -""" - -MessagePartDoc = namedtuple( - 'MessagePartDoc', - ['new', 'dirty', 'part', 'store', 'content']) - - -class ReferenciableDict(dict): - """ - A dict that can be weak-referenced. - - Some builtin objects are not weak-referenciable unless - subclassed. So we do. - - Used to return pointers to the items in the MemoryStore. - """ - - -class MessageWrapper(object): - """ - A simple nested dictionary container around the different message subparts. - """ - implements(interfaces.IMessageContainer) - - FDOC = "fdoc" - HDOC = "hdoc" - CDOCS = "cdocs" - - # XXX can use this to limit the memory footprint, - # or is it too premature to optimize? - # Does it work well together with the interfaces.implements? - - #__slots__ = ["_dict", "_new", "_dirty", "memstore"] - - def __init__(self, fdoc=None, hdoc=None, cdocs=None, - from_dict=None, memstore=None, - new=True, dirty=False): - self._dict = {} - - self._new = new - self._dirty = dirty - self.memstore = memstore - - if from_dict is not None: - self.from_dict(from_dict) - else: - if fdoc is not None: - self._dict[self.FDOC] = ReferenciableDict(fdoc) - if hdoc is not None: - self._dict[self.HDOC] = ReferenciableDict(hdoc) - if cdocs is not None: - self._dict[self.CDOCS] = ReferenciableDict(cdocs) - - # properties - - @property - def new(self): - return self._new - - def set_new(self, value=True): - self._new = value - - @property - def dirty(self): - return self._dirty - - def set_dirty(self, value=True): - self._dirty = value - - # IMessageContainer - - @property - def fdoc(self): - _fdoc = self._dict.get(self.FDOC, None) - if _fdoc: - content_ref = weakref.proxy(_fdoc) - else: - logger.warning("NO FDOC!!!") - content_ref = {} - return MessagePartDoc(new=self.new, dirty=self.dirty, - store=self._storetype, - part=MessagePartType.fdoc, - content=content_ref) - - @property - def hdoc(self): - _hdoc = self._dict.get(self.HDOC, None) - if _hdoc: - content_ref = weakref.proxy(_hdoc) - else: - logger.warning("NO HDOC!!!!") - content_ref = {} - return MessagePartDoc(new=self.new, dirty=self.dirty, - store=self._storetype, - part=MessagePartType.hdoc, - content=content_ref) - - @property - def cdocs(self): - _cdocs = self._dict.get(self.CDOCS, None) - if _cdocs: - return weakref.proxy(_cdocs) - else: - return {} - - def walk(self): - """ - Generator that iterates through all the parts, returning - MessagePartDoc. - """ - yield self.fdoc - yield self.hdoc - for cdoc in self.cdocs.values(): - # XXX this will break ---- - content_ref = weakref.proxy(cdoc) - yield MessagePartDoc(new=self.new, dirty=self.dirty, - store=self._storetype, - part=MessagePartType.cdoc, - content=content_ref) - - # i/o - - def as_dict(self): - """ - Return a dict representation of the parts contained. - """ - return self._dict - - def from_dict(self, msg_dict): - """ - Populate MessageWrapper parts from a dictionary. - It expects the same format that we use in a - MessageWrapper. - """ - fdoc, hdoc, cdocs = map( - lambda part: msg_dict.get(part, None), - [self.FDOC, self.HDOC, self.CDOCS]) - self._dict[self.FDOC] = fdoc - self._dict[self.HDOC] = hdoc - self._dict[self.CDOCS] = cdocs - - @contextlib.contextmanager def set_bool_flag(obj, att): """ @@ -232,8 +65,8 @@ class MemoryStore(object): writes to the permanent storage is controled by the write_period parameter in the constructor. """ - implements(interfaces.IMessageStore) - implements(interfaces.IMessageStoreWriter) + implements(interfaces.IMessageStore, + interfaces.IMessageStoreWriter) producer = None @@ -332,7 +165,7 @@ class MemoryStore(object): print "saving cdoc" cdoc = self._msg_store[key]['cdocs'][cdoc_key] - # XXX this should be done in the MessageWrapper constructor + # FIXME this should be done in the MessageWrapper constructor # instead... # first we make it weak-referenciable referenciable_cdoc = ReferenciableDict(cdoc) @@ -399,10 +232,8 @@ class MemoryStore(object): """ Get the highest UID for a given mbox. """ - # XXX should get from msg_store keys instead! - if not self._new: - return 0 - return max(self.get_uids(mbox)) + uids = self.get_uids(mbox) + return uids and max(uids) or 0 def count_new_mbox(self, mbox): """ diff --git a/mail/src/leap/mail/imap/messageparts.py b/mail/src/leap/mail/imap/messageparts.py index a47ea1d..3f89193 100644 --- a/mail/src/leap/mail/imap/messageparts.py +++ b/mail/src/leap/mail/imap/messageparts.py @@ -20,6 +20,9 @@ MessagePart implementation. Used from LeapMessage. import logging import re import StringIO +import weakref + +from collections import namedtuple from enum import Enum from zope.interface import implements @@ -27,6 +30,7 @@ from twisted.mail import imap4 from leap.common.decorators import memoized_method from leap.common.mail import get_email_charset +from leap.mail.imap import interfaces from leap.mail.imap.fields import fields from leap.mail.utils import first @@ -36,13 +40,188 @@ MessagePartType = Enum("hdoc", "fdoc", "cdoc") logger = logging.getLogger(__name__) +# XXX not needed anymoar ... CHARSET_PATTERN = r"""charset=([\w-]+)""" CHARSET_RE = re.compile(CHARSET_PATTERN, re.IGNORECASE) +""" +A MessagePartDoc is a light wrapper around the dictionary-like +data that we pass along for message parts. It can be used almost everywhere +that you would expect a SoledadDocument, since it has a dict under the +`content` attribute. + +We also keep some metadata on it, relative in part to the message as a whole, +and sometimes to a part in particular only. + +* `new` indicates that the document has just been created. SoledadStore + should just create a new doc for all the related message parts. +* `store` indicates the type of store a given MessagePartDoc lives in. + We currently use this to indicate that the document comes from memeory, + but we should probably get rid of it as soon as we extend the use of the + SoledadStore interface along LeapMessage, MessageCollection and Mailbox. +* `part` is one of the MessagePartType enums. + +* `dirty` indicates that, while we already have the document in Soledad, + we have modified its state in memory, so we need to put_doc instead while + dumping the MemoryStore contents. + `dirty` attribute would only apply to flags-docs and linkage-docs. + + + XXX this is still not implemented! + +""" + +MessagePartDoc = namedtuple( + 'MessagePartDoc', + ['new', 'dirty', 'part', 'store', 'content']) + + +class ReferenciableDict(dict): + """ + A dict that can be weak-referenced. + + Some builtin objects are not weak-referenciable unless + subclassed. So we do. + + Used to return pointers to the items in the MemoryStore. + """ + + +class MessageWrapper(object): + """ + A simple nested dictionary container around the different message subparts. + """ + implements(interfaces.IMessageContainer) + + FDOC = "fdoc" + HDOC = "hdoc" + CDOCS = "cdocs" + + # XXX can use this to limit the memory footprint, + # or is it too premature to optimize? + # Does it work well together with the interfaces.implements? + + #__slots__ = ["_dict", "_new", "_dirty", "memstore"] + + def __init__(self, fdoc=None, hdoc=None, cdocs=None, + from_dict=None, memstore=None, + new=True, dirty=False): + self._dict = {} + self.memstore = memstore + + self._new = new + self._dirty = dirty + self._storetype = "mem" + + if from_dict is not None: + self.from_dict(from_dict) + else: + if fdoc is not None: + self._dict[self.FDOC] = ReferenciableDict(fdoc) + if hdoc is not None: + self._dict[self.HDOC] = ReferenciableDict(hdoc) + if cdocs is not None: + self._dict[self.CDOCS] = ReferenciableDict(cdocs) + + # properties + + @property + def new(self): + return self._new + + def set_new(self, value=True): + self._new = value + + @property + def dirty(self): + return self._dirty + + def set_dirty(self, value=True): + self._dirty = value + + # IMessageContainer + + @property + def fdoc(self): + _fdoc = self._dict.get(self.FDOC, None) + if _fdoc: + content_ref = weakref.proxy(_fdoc) + else: + logger.warning("NO FDOC!!!") + content_ref = {} + return MessagePartDoc(new=self.new, dirty=self.dirty, + store=self._storetype, + part=MessagePartType.fdoc, + content=content_ref) + + @property + def hdoc(self): + _hdoc = self._dict.get(self.HDOC, None) + if _hdoc: + content_ref = weakref.proxy(_hdoc) + else: + logger.warning("NO HDOC!!!!") + content_ref = {} + return MessagePartDoc(new=self.new, dirty=self.dirty, + store=self._storetype, + part=MessagePartType.hdoc, + content=content_ref) + + @property + def cdocs(self): + _cdocs = self._dict.get(self.CDOCS, None) + if _cdocs: + return weakref.proxy(_cdocs) + else: + return {} + + def walk(self): + """ + Generator that iterates through all the parts, returning + MessagePartDoc. + """ + yield self.fdoc + yield self.hdoc + for cdoc in self.cdocs.values(): + # XXX this will break ---- + #content_ref = weakref.proxy(cdoc) + #yield MessagePartDoc(new=self.new, dirty=self.dirty, + #store=self._storetype, + #part=MessagePartType.cdoc, + #content=content_ref) + + # the put is handling this for us, so + # we already have stored a MessagePartDoc + # but we should really do it while adding in the + # constructor or the from_dict method + yield cdoc + + # i/o + + def as_dict(self): + """ + Return a dict representation of the parts contained. + """ + return self._dict + + def from_dict(self, msg_dict): + """ + Populate MessageWrapper parts from a dictionary. + It expects the same format that we use in a + MessageWrapper. + """ + fdoc, hdoc, cdocs = map( + lambda part: msg_dict.get(part, None), + [self.FDOC, self.HDOC, self.CDOCS]) + self._dict[self.FDOC] = fdoc + self._dict[self.HDOC] = hdoc + self._dict[self.CDOCS] = cdocs + class MessagePart(object): """ - IMessagePart implementor. + IMessagePart implementor, to be passed to several methods + of the IMAP4Server. It takes a subpart message and is able to find the inner parts. @@ -117,6 +296,8 @@ class MessagePart(object): payload = str("") if payload: + # XXX use find_charset instead -------------------------- + # bad rebase??? content_type = self._get_ctype_from_document(phash) charset = first(CHARSET_RE.findall(content_type)) logger.debug("Got charset from header: %s" % (charset,)) diff --git a/mail/src/leap/mail/imap/messages.py b/mail/src/leap/mail/imap/messages.py index 67e5a41..46c9dc9 100644 --- a/mail/src/leap/mail/imap/messages.py +++ b/mail/src/leap/mail/imap/messages.py @@ -41,7 +41,7 @@ from leap.mail.utils import first, find_charset, lowerdict from leap.mail.decorators import deferred from leap.mail.imap.index import IndexedDB from leap.mail.imap.fields import fields, WithMsgFields -from leap.mail.imap.memorystore import MessageDict +from leap.mail.imap.memorystore import MessageWrapper from leap.mail.imap.parser import MailParser, MBoxParser logger = logging.getLogger(__name__) @@ -984,7 +984,7 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser): # TODO ---- add reference to original doc, to be deleted # after writes are done. - msg_container = MessageDict(fd, hd, cdocs) + msg_container = MessageWrapper(fd, hd, cdocs) self._memstore.create_message(self.mbox, uid, msg_container) def _remove_cb(self, result): @@ -1215,6 +1215,7 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser): # and we cannot find it otherwise. This seems to be enough. # XXX do a deferLater instead ?? + # FIXME this won't be needed after the CHECK command is implemented. time.sleep(0.3) return self._get_uid_from_msgidCb(msgid) @@ -1233,11 +1234,14 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser): :rtype: LeapMessage """ print "getting msg by id!" - msg_container = self._memstore.get(self.mbox, uid) + msg_container = self._memstore.get_message(self.mbox, uid) print "msg container", msg_container if msg_container is not None: print "getting LeapMessage (from memstore)" - msg = LeapMessage(None, uid, self.mbox, collection=self, + # We pass a reference to soledad just to be able to retrieve + # missing parts that cannot be found in the container, like + # the content docs after a copy. + msg = LeapMessage(self._soledad, uid, self.mbox, collection=self, container=msg_container) print "got msg:", msg else: @@ -1309,7 +1313,7 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser): """ Return a dict with all flags documents for this mailbox. """ - # XXX get all from memstore and cahce it there + # XXX get all from memstore and cache it there all_flags = dict((( doc.content[self.UID_KEY], doc.content[self.FLAGS_KEY]) for doc in @@ -1319,7 +1323,7 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser): if self._memstore is not None: # XXX uids = self._memstore.get_uids(self.mbox) - fdocs = [(uid, self._memstore.get(self.mbox, uid).fdoc) + fdocs = [(uid, self._memstore.get_message(self.mbox, uid).fdoc) for uid in uids] for uid, doc in fdocs: all_flags[uid] = doc.content[self.FLAGS_KEY] diff --git a/mail/src/leap/mail/imap/service/imap.py b/mail/src/leap/mail/imap/service/imap.py index 3f99da6..8350988 100644 --- a/mail/src/leap/mail/imap/service/imap.py +++ b/mail/src/leap/mail/imap/service/imap.py @@ -32,6 +32,7 @@ from leap.mail.imap.account import SoledadBackedAccount from leap.mail.imap.fetch import LeapIncomingMail from leap.mail.imap.memorystore import MemoryStore from leap.mail.imap.server import LeapIMAPServer +from leap.mail.imap.soledadstore import SoledadStore from leap.soledad.client import Soledad # The default port in which imap service will run @@ -96,7 +97,8 @@ class LeapIMAPFactory(ServerFactory): self._uuid = uuid self._userid = userid self._soledad = soledad - self._memstore = MemoryStore() + self._memstore = MemoryStore( + permanent_store=SoledadStore(soledad)) theAccount = SoledadBackedAccount( uuid, soledad=soledad, diff --git a/mail/src/leap/mail/imap/soledadstore.py b/mail/src/leap/mail/imap/soledadstore.py new file mode 100644 index 0000000..62a3c53 --- /dev/null +++ b/mail/src/leap/mail/imap/soledadstore.py @@ -0,0 +1,237 @@ +# -*- coding: utf-8 -*- +# soledadstore.py +# Copyright (C) 2014 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +""" +A MessageStore that writes to Soledad. +""" +import logging + +from u1db import errors as u1db_errors +from zope.interface import implements + +from leap.mail.imap.messageparts import MessagePartType +from leap.mail.imap.fields import fields +from leap.mail.imap.interfaces import IMessageStore +from leap.mail.messageflow import IMessageConsumer + +logger = logging.getLogger(__name__) + + +class ContentDedup(object): + """ + Message deduplication. + + We do a query for the content hashes before writing to our beloved + sqlcipher backend of Soledad. This means, by now, that: + + 1. We will not store the same attachment twice, only the hash of it. + 2. We will not store the same message body twice, only the hash of it. + + The first case is useful if you are always receiving the same old memes + from unwary friends that still have not discovered that 4chan is the + generator of the internet. The second will save your day if you have + initiated session with the same account in two different machines. I also + wonder why would you do that, but let's respect each other choices, like + with the religious celebrations, and assume that one day we'll be able + to run Bitmask in completely free phones. Yes, I mean that, the whole GSM + Stack. + """ + + def _header_does_exist(self, doc): + """ + Check whether we already have a header document for this + content hash in our database. + + :param doc: tentative header document + :type doc: dict + :returns: True if it exists, False otherwise. + """ + if not doc: + return False + chash = doc[fields.CONTENT_HASH_KEY] + header_docs = self._soledad.get_from_index( + fields.TYPE_C_HASH_IDX, + fields.TYPE_HEADERS_VAL, str(chash)) + if not header_docs: + return False + + if len(header_docs) != 1: + logger.warning("Found more than one copy of chash %s!" + % (chash,)) + logger.debug("Found header doc with that hash! Skipping save!") + return True + + def _content_does_exist(self, doc): + """ + Check whether we already have a content document for a payload + with this hash in our database. + + :param doc: tentative content document + :type doc: dict + :returns: True if it exists, False otherwise. + """ + if not doc: + return False + phash = doc[fields.PAYLOAD_HASH_KEY] + attach_docs = self._soledad.get_from_index( + fields.TYPE_P_HASH_IDX, + fields.TYPE_CONTENT_VAL, str(phash)) + if not attach_docs: + return False + + if len(attach_docs) != 1: + logger.warning("Found more than one copy of phash %s!" + % (phash,)) + logger.debug("Found attachment doc with that hash! Skipping save!") + return True + + +class SoledadStore(ContentDedup): + """ + This will create docs in the local Soledad database. + """ + + implements(IMessageConsumer, IMessageStore) + + def __init__(self, soledad): + """ + Initialize the writer. + + :param soledad: the soledad instance + :type soledad: Soledad + """ + self._soledad = soledad + + # IMessageStore + + # ------------------------------------------------------------------- + # We are not yet using this interface, but it would make sense + # to implement it. + + def create_message(self, mbox, uid, message): + """ + Create the passed message into this SoledadStore. + + :param mbox: the mbox this message belongs. + :param uid: the UID that identifies this message in this mailbox. + :param message: a IMessageContainer implementor. + """ + + def put_message(self, mbox, uid, message): + """ + Put the passed existing message into this SoledadStore. + + :param mbox: the mbox this message belongs. + :param uid: the UID that identifies this message in this mailbox. + :param message: a IMessageContainer implementor. + """ + + def remove_message(self, mbox, uid): + """ + Remove the given message from this SoledadStore. + + :param mbox: the mbox this message belongs. + :param uid: the UID that identifies this message in this mailbox. + """ + + def get_message(self, mbox, uid): + """ + Get a IMessageContainer for the given mbox and uid combination. + + :param mbox: the mbox this message belongs. + :param uid: the UID that identifies this message in this mailbox. + """ + + # IMessageConsumer + + def consume(self, queue): + """ + Creates a new document in soledad db. + + :param queue: queue to get item from, with content of the document + to be inserted. + :type queue: Queue + """ + # TODO should delete the original message from incoming after + # the writes are done. + # TODO should handle the delete case + # TODO should handle errors + + empty = queue.empty() + while not empty: + for item, call in self._process(queue): + self._try_call(call, item) + empty = queue.empty() + + # + # SoledadStore specific methods. + # + + def _process(self, queue): + """ + Return the item and the proper call type for the next + item in the queue if any. + + :param queue: the queue from where we'll pick item. + :type queue: Queue + """ + msg_wrapper = queue.get() + return self._get_calls_for_msg_parts(msg_wrapper) + + def _try_call(self, call, item): + """ + Try to invoke a given call with item as a parameter. + """ + if not call: + return + try: + call(item) + except u1db_errors.RevisionConflict as exc: + logger.error("Error: %r" % (exc,)) + raise exc + + def _get_calls_for_msg_parts(self, msg_wrapper): + """ + Return the proper call type for a given item. + + :param msg_wrapper: A MessageWrapper + :type msg_wrapper: IMessageContainer + """ + call = None + + if msg_wrapper.new is True: + call = self._soledad.create_doc + + # item is expected to be a MessagePartDoc + for item in msg_wrapper.walk(): + if item.part == MessagePartType.fdoc: + yield dict(item.content), call + + if item.part == MessagePartType.hdoc: + if not self._header_does_exist(item.content): + yield dict(item.content), call + + if item.part == MessagePartType.cdoc: + if self._content_does_exist(item.content): + yield dict(item.content), call + + # TODO should check for elements with the dirty state + # TODO if new == False and dirty == True, put_doc + # XXX for puts, we will have to retrieve + # the document, change the content, and + # pass the whole document under "content" + else: + logger.error("Cannot put documents yet!") |