diff options
| author | Kali Kaneko <kali@leap.se> | 2014-01-23 02:36:38 -0400 | 
|---|---|---|
| committer | Kali Kaneko <kali@leap.se> | 2014-01-28 19:38:44 -0400 | 
| commit | 1a8e3d51fbbaca219f96efd768c5980f4eb566ac (patch) | |
| tree | 8a02d197575d187774e1206f31c9b539c0cdc056 | |
| parent | 06c9b95a4e92a7f43f1e91ffcb718aebfe9c3c7d (diff) | |
add soledadstore class
move parts-related bits to messageparts
pass soledad in initialization for memory messages
| -rw-r--r-- | mail/src/leap/mail/imap/mailbox.py | 29 | ||||
| -rw-r--r-- | mail/src/leap/mail/imap/memorystore.py | 185 | ||||
| -rw-r--r-- | mail/src/leap/mail/imap/messageparts.py | 183 | ||||
| -rw-r--r-- | mail/src/leap/mail/imap/messages.py | 16 | ||||
| -rw-r--r-- | mail/src/leap/mail/imap/service/imap.py | 4 | ||||
| -rw-r--r-- | mail/src/leap/mail/imap/soledadstore.py | 237 | 
6 files changed, 446 insertions, 208 deletions
| diff --git a/mail/src/leap/mail/imap/mailbox.py b/mail/src/leap/mail/imap/mailbox.py index 9babe6b..5e16b4b 100644 --- a/mail/src/leap/mail/imap/mailbox.py +++ b/mail/src/leap/mail/imap/mailbox.py @@ -37,8 +37,8 @@ from leap.common.events.events_pb2 import IMAP_UNREAD_MAIL  from leap.common.check import leap_assert, leap_assert_type  from leap.mail.decorators import deferred  from leap.mail.imap.fields import WithMsgFields, fields -from leap.mail.imap.memorystore import MessageDict  from leap.mail.imap.messages import MessageCollection +from leap.mail.imap.messageparts import MessageWrapper  from leap.mail.imap.parser import MBoxParser  logger = logging.getLogger(__name__) @@ -549,10 +549,7 @@ class SoledadMailbox(WithMsgFields, MBoxParser):          #sequence = True if uid == 0 else False          messages_asked = self._bound_seq(messages_asked) -        print "asked: ", messages_asked          seq_messg = self._filter_msg_seq(messages_asked) - -        print "seq: ", seq_messg          getmsg = lambda uid: self.messages.get_msg_by_uid(uid)          # for sequence numbers (uid = 0) @@ -791,29 +788,15 @@ class SoledadMailbox(WithMsgFields, MBoxParser):              logger.debug("Tried to copy a MSG with no fdoc")              return -        #old_mbox = fdoc.content[self.MBOX_KEY] -        #old_uid = fdoc.content[self.UID_KEY] -        #old_key = old_mbox, old_uid -        #print "copying from OLD MBOX ", old_mbox - -        # XXX bit doubt... to duplicate in memory -        # or not to...? -        # I think it should be ok to duplicate as long as we're -        # careful at the hour of writes... -        # We could use also proxies, but it will break when -        # the original mailbox is flushed. - -        # XXX DEBUG ---------------------------------------- -        #print "copying MESSAGE from %s (%s) to %s (%s)" % ( -            #msg._mbox, msg._uid, self.mbox, uid_next) -          new_fdoc = copy.deepcopy(fdoc.content)          new_fdoc[self.UID_KEY] = uid_next          new_fdoc[self.MBOX_KEY] = self.mbox -        self._memstore.put(self.mbox, uid_next, MessageDict( -            new_fdoc, hdoc.content)) +        self._memstore.create_message( +            self.mbox, uid_next, +            MessageWrapper( +                new_fdoc, hdoc.content)) -        # XXX use memory store +        # XXX use memory store !!!          if hasattr(hdoc, 'doc_id'):              self.messages.add_hdocset_docid(hdoc.doc_id) diff --git a/mail/src/leap/mail/imap/memorystore.py b/mail/src/leap/mail/imap/memorystore.py index b8829e0..7cb361f 100644 --- a/mail/src/leap/mail/imap/memorystore.py +++ b/mail/src/leap/mail/imap/memorystore.py @@ -21,187 +21,20 @@ import contextlib  import logging  import weakref -from collections import namedtuple -  from twisted.internet.task import LoopingCall  from zope.interface import implements  from leap.mail import size  from leap.mail.messageflow import MessageProducer -from leap.mail.messageparts import MessagePartType  from leap.mail.imap import interfaces  from leap.mail.imap.fields import fields +from leap.mail.imap.messageparts import MessagePartType, MessagePartDoc +from leap.mail.imap.messageparts import MessageWrapper +from leap.mail.imap.messageparts import ReferenciableDict  logger = logging.getLogger(__name__) -""" -A MessagePartDoc is a light wrapper around the dictionary-like -data that we pass along for message parts. It can be used almost everywhere -that you would expect a SoledadDocument, since it has a dict under the -`content` attribute. - -We also keep some metadata on it, relative in part to the message as a whole, -and sometimes to a part in particular only. - -* `new` indicates that the document has just been created. SoledadStore -  should just create a new doc for all the related message parts. -* `store` indicates the type of store a given MessagePartDoc lives in. -  We currently use this to indicate that  the document comes from memeory, -  but we should probably get rid of it as soon as we extend the use of the -  SoledadStore interface along LeapMessage, MessageCollection and Mailbox. -* `part` is one of the MessagePartType enums. - -* `dirty` indicates that, while we already have the document in Soledad, -  we have modified its state in memory, so we need to put_doc instead while -  dumping the MemoryStore contents. -  `dirty` attribute would only apply to flags-docs and linkage-docs. - - -  XXX this is still not implemented! - -""" - -MessagePartDoc = namedtuple( -    'MessagePartDoc', -    ['new', 'dirty', 'part', 'store', 'content']) - - -class ReferenciableDict(dict): -    """ -    A dict that can be weak-referenced. - -    Some builtin objects are not weak-referenciable unless -    subclassed. So we do. - -    Used to return pointers to the items in the MemoryStore. -    """ - - -class MessageWrapper(object): -    """ -    A simple nested dictionary container around the different message subparts. -    """ -    implements(interfaces.IMessageContainer) - -    FDOC = "fdoc" -    HDOC = "hdoc" -    CDOCS = "cdocs" - -    # XXX can use this to limit the memory footprint, -    # or is it too premature to optimize? -    # Does it work well together with the interfaces.implements? - -    #__slots__ = ["_dict", "_new", "_dirty", "memstore"] - -    def __init__(self, fdoc=None, hdoc=None, cdocs=None, -                 from_dict=None, memstore=None, -                 new=True, dirty=False): -        self._dict = {} - -        self._new = new -        self._dirty = dirty -        self.memstore = memstore - -        if from_dict is not None: -            self.from_dict(from_dict) -        else: -            if fdoc is not None: -                self._dict[self.FDOC] = ReferenciableDict(fdoc) -            if hdoc is not None: -                self._dict[self.HDOC] = ReferenciableDict(hdoc) -            if cdocs is not None: -                self._dict[self.CDOCS] = ReferenciableDict(cdocs) - -    # properties - -    @property -    def new(self): -        return self._new - -    def set_new(self, value=True): -        self._new = value - -    @property -    def dirty(self): -        return self._dirty - -    def set_dirty(self, value=True): -        self._dirty = value - -    # IMessageContainer - -    @property -    def fdoc(self): -        _fdoc = self._dict.get(self.FDOC, None) -        if _fdoc: -            content_ref = weakref.proxy(_fdoc) -        else: -            logger.warning("NO FDOC!!!") -            content_ref = {} -        return MessagePartDoc(new=self.new, dirty=self.dirty, -                              store=self._storetype, -                              part=MessagePartType.fdoc, -                              content=content_ref) - -    @property -    def hdoc(self): -        _hdoc = self._dict.get(self.HDOC, None) -        if _hdoc: -            content_ref = weakref.proxy(_hdoc) -        else: -            logger.warning("NO HDOC!!!!") -            content_ref = {} -        return MessagePartDoc(new=self.new, dirty=self.dirty, -                              store=self._storetype, -                              part=MessagePartType.hdoc, -                              content=content_ref) - -    @property -    def cdocs(self): -        _cdocs = self._dict.get(self.CDOCS, None) -        if _cdocs: -            return weakref.proxy(_cdocs) -        else: -            return {} - -    def walk(self): -        """ -        Generator that iterates through all the parts, returning -        MessagePartDoc. -        """ -        yield self.fdoc -        yield self.hdoc -        for cdoc in self.cdocs.values(): -            # XXX this will break ---- -            content_ref = weakref.proxy(cdoc) -            yield MessagePartDoc(new=self.new, dirty=self.dirty, -                                 store=self._storetype, -                                 part=MessagePartType.cdoc, -                                 content=content_ref) - -    # i/o - -    def as_dict(self): -        """ -        Return a dict representation of the parts contained. -        """ -        return self._dict - -    def from_dict(self, msg_dict): -        """ -        Populate MessageWrapper parts from a dictionary. -        It expects the same format that we use in a -        MessageWrapper. -        """ -        fdoc, hdoc, cdocs = map( -            lambda part: msg_dict.get(part, None), -            [self.FDOC, self.HDOC, self.CDOCS]) -        self._dict[self.FDOC] = fdoc -        self._dict[self.HDOC] = hdoc -        self._dict[self.CDOCS] = cdocs - -  @contextlib.contextmanager  def set_bool_flag(obj, att):      """ @@ -232,8 +65,8 @@ class MemoryStore(object):      writes to the permanent storage is controled by the write_period parameter      in the constructor.      """ -    implements(interfaces.IMessageStore) -    implements(interfaces.IMessageStoreWriter) +    implements(interfaces.IMessageStore, +               interfaces.IMessageStoreWriter)      producer = None @@ -332,7 +165,7 @@ class MemoryStore(object):              print "saving cdoc"              cdoc = self._msg_store[key]['cdocs'][cdoc_key] -            # XXX this should be done in the MessageWrapper constructor +            # FIXME this should be done in the MessageWrapper constructor              # instead...              # first we make it weak-referenciable              referenciable_cdoc = ReferenciableDict(cdoc) @@ -399,10 +232,8 @@ class MemoryStore(object):          """          Get the highest UID for a given mbox.          """ -        # XXX should get from msg_store keys instead! -        if not self._new: -            return 0 -        return max(self.get_uids(mbox)) +        uids = self.get_uids(mbox) +        return uids and max(uids) or 0      def count_new_mbox(self, mbox):          """ diff --git a/mail/src/leap/mail/imap/messageparts.py b/mail/src/leap/mail/imap/messageparts.py index a47ea1d..3f89193 100644 --- a/mail/src/leap/mail/imap/messageparts.py +++ b/mail/src/leap/mail/imap/messageparts.py @@ -20,6 +20,9 @@ MessagePart implementation. Used from LeapMessage.  import logging  import re  import StringIO +import weakref + +from collections import namedtuple  from enum import Enum  from zope.interface import implements @@ -27,6 +30,7 @@ from twisted.mail import imap4  from leap.common.decorators import memoized_method  from leap.common.mail import get_email_charset +from leap.mail.imap import interfaces  from leap.mail.imap.fields import fields  from leap.mail.utils import first @@ -36,13 +40,188 @@ MessagePartType = Enum("hdoc", "fdoc", "cdoc")  logger = logging.getLogger(__name__) +# XXX not needed anymoar ...  CHARSET_PATTERN = r"""charset=([\w-]+)"""  CHARSET_RE = re.compile(CHARSET_PATTERN, re.IGNORECASE) +""" +A MessagePartDoc is a light wrapper around the dictionary-like +data that we pass along for message parts. It can be used almost everywhere +that you would expect a SoledadDocument, since it has a dict under the +`content` attribute. + +We also keep some metadata on it, relative in part to the message as a whole, +and sometimes to a part in particular only. + +* `new` indicates that the document has just been created. SoledadStore +  should just create a new doc for all the related message parts. +* `store` indicates the type of store a given MessagePartDoc lives in. +  We currently use this to indicate that  the document comes from memeory, +  but we should probably get rid of it as soon as we extend the use of the +  SoledadStore interface along LeapMessage, MessageCollection and Mailbox. +* `part` is one of the MessagePartType enums. + +* `dirty` indicates that, while we already have the document in Soledad, +  we have modified its state in memory, so we need to put_doc instead while +  dumping the MemoryStore contents. +  `dirty` attribute would only apply to flags-docs and linkage-docs. + + +  XXX this is still not implemented! + +""" + +MessagePartDoc = namedtuple( +    'MessagePartDoc', +    ['new', 'dirty', 'part', 'store', 'content']) + + +class ReferenciableDict(dict): +    """ +    A dict that can be weak-referenced. + +    Some builtin objects are not weak-referenciable unless +    subclassed. So we do. + +    Used to return pointers to the items in the MemoryStore. +    """ + + +class MessageWrapper(object): +    """ +    A simple nested dictionary container around the different message subparts. +    """ +    implements(interfaces.IMessageContainer) + +    FDOC = "fdoc" +    HDOC = "hdoc" +    CDOCS = "cdocs" + +    # XXX can use this to limit the memory footprint, +    # or is it too premature to optimize? +    # Does it work well together with the interfaces.implements? + +    #__slots__ = ["_dict", "_new", "_dirty", "memstore"] + +    def __init__(self, fdoc=None, hdoc=None, cdocs=None, +                 from_dict=None, memstore=None, +                 new=True, dirty=False): +        self._dict = {} +        self.memstore = memstore + +        self._new = new +        self._dirty = dirty +        self._storetype = "mem" + +        if from_dict is not None: +            self.from_dict(from_dict) +        else: +            if fdoc is not None: +                self._dict[self.FDOC] = ReferenciableDict(fdoc) +            if hdoc is not None: +                self._dict[self.HDOC] = ReferenciableDict(hdoc) +            if cdocs is not None: +                self._dict[self.CDOCS] = ReferenciableDict(cdocs) + +    # properties + +    @property +    def new(self): +        return self._new + +    def set_new(self, value=True): +        self._new = value + +    @property +    def dirty(self): +        return self._dirty + +    def set_dirty(self, value=True): +        self._dirty = value + +    # IMessageContainer + +    @property +    def fdoc(self): +        _fdoc = self._dict.get(self.FDOC, None) +        if _fdoc: +            content_ref = weakref.proxy(_fdoc) +        else: +            logger.warning("NO FDOC!!!") +            content_ref = {} +        return MessagePartDoc(new=self.new, dirty=self.dirty, +                              store=self._storetype, +                              part=MessagePartType.fdoc, +                              content=content_ref) + +    @property +    def hdoc(self): +        _hdoc = self._dict.get(self.HDOC, None) +        if _hdoc: +            content_ref = weakref.proxy(_hdoc) +        else: +            logger.warning("NO HDOC!!!!") +            content_ref = {} +        return MessagePartDoc(new=self.new, dirty=self.dirty, +                              store=self._storetype, +                              part=MessagePartType.hdoc, +                              content=content_ref) + +    @property +    def cdocs(self): +        _cdocs = self._dict.get(self.CDOCS, None) +        if _cdocs: +            return weakref.proxy(_cdocs) +        else: +            return {} + +    def walk(self): +        """ +        Generator that iterates through all the parts, returning +        MessagePartDoc. +        """ +        yield self.fdoc +        yield self.hdoc +        for cdoc in self.cdocs.values(): +            # XXX this will break ---- +            #content_ref = weakref.proxy(cdoc) +            #yield MessagePartDoc(new=self.new, dirty=self.dirty, +                                 #store=self._storetype, +                                 #part=MessagePartType.cdoc, +                                 #content=content_ref) + +            # the put is handling this for us, so +            # we already have stored a MessagePartDoc +            # but we should really do it while adding in the +            # constructor or the from_dict method +            yield cdoc + +    # i/o + +    def as_dict(self): +        """ +        Return a dict representation of the parts contained. +        """ +        return self._dict + +    def from_dict(self, msg_dict): +        """ +        Populate MessageWrapper parts from a dictionary. +        It expects the same format that we use in a +        MessageWrapper. +        """ +        fdoc, hdoc, cdocs = map( +            lambda part: msg_dict.get(part, None), +            [self.FDOC, self.HDOC, self.CDOCS]) +        self._dict[self.FDOC] = fdoc +        self._dict[self.HDOC] = hdoc +        self._dict[self.CDOCS] = cdocs +  class MessagePart(object):      """ -    IMessagePart implementor. +    IMessagePart implementor, to be passed to several methods +    of the IMAP4Server.      It takes a subpart message and is able to find      the inner parts. @@ -117,6 +296,8 @@ class MessagePart(object):              payload = str("")          if payload: +            # XXX use find_charset instead -------------------------- +            # bad rebase???              content_type = self._get_ctype_from_document(phash)              charset = first(CHARSET_RE.findall(content_type))              logger.debug("Got charset from header: %s" % (charset,)) diff --git a/mail/src/leap/mail/imap/messages.py b/mail/src/leap/mail/imap/messages.py index 67e5a41..46c9dc9 100644 --- a/mail/src/leap/mail/imap/messages.py +++ b/mail/src/leap/mail/imap/messages.py @@ -41,7 +41,7 @@ from leap.mail.utils import first, find_charset, lowerdict  from leap.mail.decorators import deferred  from leap.mail.imap.index import IndexedDB  from leap.mail.imap.fields import fields, WithMsgFields -from leap.mail.imap.memorystore import MessageDict +from leap.mail.imap.memorystore import MessageWrapper  from leap.mail.imap.parser import MailParser, MBoxParser  logger = logging.getLogger(__name__) @@ -984,7 +984,7 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser):          # TODO ---- add reference to original doc, to be deleted          # after writes are done. -        msg_container = MessageDict(fd, hd, cdocs) +        msg_container = MessageWrapper(fd, hd, cdocs)          self._memstore.create_message(self.mbox, uid, msg_container)      def _remove_cb(self, result): @@ -1215,6 +1215,7 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser):          # and we cannot find it otherwise. This seems to be enough.          # XXX do a deferLater instead ?? +        # FIXME this won't be needed after the CHECK command is implemented.          time.sleep(0.3)          return self._get_uid_from_msgidCb(msgid) @@ -1233,11 +1234,14 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser):          :rtype: LeapMessage          """          print "getting msg by id!" -        msg_container = self._memstore.get(self.mbox, uid) +        msg_container = self._memstore.get_message(self.mbox, uid)          print "msg container", msg_container          if msg_container is not None:              print "getting LeapMessage (from memstore)" -            msg = LeapMessage(None, uid, self.mbox, collection=self, +            # We pass a reference to soledad just to be able to retrieve +            # missing parts that cannot be found in the container, like +            # the content docs after a copy. +            msg = LeapMessage(self._soledad, uid, self.mbox, collection=self,                                container=msg_container)              print "got msg:", msg          else: @@ -1309,7 +1313,7 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser):          """          Return a dict with all flags documents for this mailbox.          """ -        # XXX get all from memstore and cahce it there +        # XXX get all from memstore and cache it there          all_flags = dict(((              doc.content[self.UID_KEY],              doc.content[self.FLAGS_KEY]) for doc in @@ -1319,7 +1323,7 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser):          if self._memstore is not None:              # XXX              uids = self._memstore.get_uids(self.mbox) -            fdocs = [(uid, self._memstore.get(self.mbox, uid).fdoc) +            fdocs = [(uid, self._memstore.get_message(self.mbox, uid).fdoc)                       for uid in uids]              for uid, doc in fdocs:                  all_flags[uid] = doc.content[self.FLAGS_KEY] diff --git a/mail/src/leap/mail/imap/service/imap.py b/mail/src/leap/mail/imap/service/imap.py index 3f99da6..8350988 100644 --- a/mail/src/leap/mail/imap/service/imap.py +++ b/mail/src/leap/mail/imap/service/imap.py @@ -32,6 +32,7 @@ from leap.mail.imap.account import SoledadBackedAccount  from leap.mail.imap.fetch import LeapIncomingMail  from leap.mail.imap.memorystore import MemoryStore  from leap.mail.imap.server import LeapIMAPServer +from leap.mail.imap.soledadstore import SoledadStore  from leap.soledad.client import Soledad  # The default port in which imap service will run @@ -96,7 +97,8 @@ class LeapIMAPFactory(ServerFactory):          self._uuid = uuid          self._userid = userid          self._soledad = soledad -        self._memstore = MemoryStore() +        self._memstore = MemoryStore( +            permanent_store=SoledadStore(soledad))          theAccount = SoledadBackedAccount(              uuid, soledad=soledad, diff --git a/mail/src/leap/mail/imap/soledadstore.py b/mail/src/leap/mail/imap/soledadstore.py new file mode 100644 index 0000000..62a3c53 --- /dev/null +++ b/mail/src/leap/mail/imap/soledadstore.py @@ -0,0 +1,237 @@ +# -*- coding: utf-8 -*- +# soledadstore.py +# Copyright (C) 2014 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program.  If not, see <http://www.gnu.org/licenses/>. +""" +A MessageStore that writes to Soledad. +""" +import logging + +from u1db import errors as u1db_errors +from zope.interface import implements + +from leap.mail.imap.messageparts import MessagePartType +from leap.mail.imap.fields import fields +from leap.mail.imap.interfaces import IMessageStore +from leap.mail.messageflow import IMessageConsumer + +logger = logging.getLogger(__name__) + + +class ContentDedup(object): +    """ +    Message deduplication. + +    We do a query for the content hashes before writing to our beloved +    sqlcipher backend of Soledad. This means, by now, that: + +    1. We will not store the same attachment twice, only the hash of it. +    2. We will not store the same message body twice, only the hash of it. + +    The first case is useful if you are always receiving the same old memes +    from unwary friends that still have not discovered that 4chan is the +    generator of the internet. The second will save your day if you have +    initiated session with the same account in two different machines. I also +    wonder why would you do that, but let's respect each other choices, like +    with the religious celebrations, and assume that one day we'll be able +    to run Bitmask in completely free phones. Yes, I mean that, the whole GSM +    Stack. +    """ + +    def _header_does_exist(self, doc): +        """ +        Check whether we already have a header document for this +        content hash in our database. + +        :param doc: tentative header document +        :type doc: dict +        :returns: True if it exists, False otherwise. +        """ +        if not doc: +            return False +        chash = doc[fields.CONTENT_HASH_KEY] +        header_docs = self._soledad.get_from_index( +            fields.TYPE_C_HASH_IDX, +            fields.TYPE_HEADERS_VAL, str(chash)) +        if not header_docs: +            return False + +        if len(header_docs) != 1: +            logger.warning("Found more than one copy of chash %s!" +                           % (chash,)) +        logger.debug("Found header doc with that hash! Skipping save!") +        return True + +    def _content_does_exist(self, doc): +        """ +        Check whether we already have a content document for a payload +        with this hash in our database. + +        :param doc: tentative content document +        :type doc: dict +        :returns: True if it exists, False otherwise. +        """ +        if not doc: +            return False +        phash = doc[fields.PAYLOAD_HASH_KEY] +        attach_docs = self._soledad.get_from_index( +            fields.TYPE_P_HASH_IDX, +            fields.TYPE_CONTENT_VAL, str(phash)) +        if not attach_docs: +            return False + +        if len(attach_docs) != 1: +            logger.warning("Found more than one copy of phash %s!" +                           % (phash,)) +        logger.debug("Found attachment doc with that hash! Skipping save!") +        return True + + +class SoledadStore(ContentDedup): +    """ +    This will create docs in the local Soledad database. +    """ + +    implements(IMessageConsumer, IMessageStore) + +    def __init__(self, soledad): +        """ +        Initialize the writer. + +        :param soledad: the soledad instance +        :type soledad: Soledad +        """ +        self._soledad = soledad + +    # IMessageStore + +    # ------------------------------------------------------------------- +    # We are not yet using this interface, but it would make sense +    # to implement it. + +    def create_message(self, mbox, uid, message): +        """ +        Create the passed message into this SoledadStore. + +        :param mbox: the mbox this message belongs. +        :param uid: the UID that identifies this message in this mailbox. +        :param message: a IMessageContainer implementor. +        """ + +    def put_message(self, mbox, uid, message): +        """ +        Put the passed existing message into this SoledadStore. + +        :param mbox: the mbox this message belongs. +        :param uid: the UID that identifies this message in this mailbox. +        :param message: a IMessageContainer implementor. +        """ + +    def remove_message(self, mbox, uid): +        """ +        Remove the given message from this SoledadStore. + +        :param mbox: the mbox this message belongs. +        :param uid: the UID that identifies this message in this mailbox. +        """ + +    def get_message(self, mbox, uid): +        """ +        Get a IMessageContainer for the given mbox and uid combination. + +        :param mbox: the mbox this message belongs. +        :param uid: the UID that identifies this message in this mailbox. +        """ + +    # IMessageConsumer + +    def consume(self, queue): +        """ +        Creates a new document in soledad db. + +        :param queue: queue to get item from, with content of the document +                      to be inserted. +        :type queue: Queue +        """ +        # TODO should delete the original message from incoming after +        # the writes are done. +        # TODO should handle the delete case +        # TODO should handle errors + +        empty = queue.empty() +        while not empty: +            for item, call in self._process(queue): +                self._try_call(call, item) +            empty = queue.empty() + +    # +    # SoledadStore specific methods. +    # + +    def _process(self, queue): +        """ +        Return the item and the proper call type for the next +        item in the queue if any. + +        :param queue: the queue from where we'll pick item. +        :type queue: Queue +        """ +        msg_wrapper = queue.get() +        return self._get_calls_for_msg_parts(msg_wrapper) + +    def _try_call(self, call, item): +        """ +        Try to invoke a given call with item as a parameter. +        """ +        if not call: +            return +        try: +            call(item) +        except u1db_errors.RevisionConflict as exc: +            logger.error("Error: %r" % (exc,)) +            raise exc + +    def _get_calls_for_msg_parts(self, msg_wrapper): +        """ +        Return the proper call type for a given item. + +        :param msg_wrapper: A MessageWrapper +        :type msg_wrapper: IMessageContainer +        """ +        call = None + +        if msg_wrapper.new is True: +            call = self._soledad.create_doc + +            # item is expected to be a MessagePartDoc +            for item in msg_wrapper.walk(): +                if item.part == MessagePartType.fdoc: +                    yield dict(item.content), call + +                if item.part == MessagePartType.hdoc: +                    if not self._header_does_exist(item.content): +                        yield dict(item.content), call + +                if item.part == MessagePartType.cdoc: +                    if self._content_does_exist(item.content): +                        yield dict(item.content), call + +        # TODO should check for elements with the dirty state +        # TODO if new == False and dirty == True, put_doc +        # XXX for puts, we will have to retrieve +        # the document, change the content, and +        # pass the whole document under "content" +        else: +            logger.error("Cannot put documents yet!") | 
