Diffstat (limited to 'mail/src')
-rw-r--r--   mail/src/leap/mail/imap/mailbox.py       |  70
-rw-r--r--   mail/src/leap/mail/imap/memorystore.py   | 265
-rw-r--r--   mail/src/leap/mail/imap/messageparts.py  |  72
-rw-r--r--   mail/src/leap/mail/imap/messages.py      | 162
-rw-r--r--   mail/src/leap/mail/imap/soledadstore.py  |  35
-rw-r--r--   mail/src/leap/mail/messageflow.py        |   8
-rw-r--r--   mail/src/leap/mail/utils.py              |   9
7 files changed, 479 insertions, 142 deletions
| diff --git a/mail/src/leap/mail/imap/mailbox.py b/mail/src/leap/mail/imap/mailbox.py index 5e16b4b5..108d0da6 100644 --- a/mail/src/leap/mail/imap/mailbox.py +++ b/mail/src/leap/mail/imap/mailbox.py @@ -36,6 +36,7 @@ from leap.common import events as leap_events  from leap.common.events.events_pb2 import IMAP_UNREAD_MAIL  from leap.common.check import leap_assert, leap_assert_type  from leap.mail.decorators import deferred +from leap.mail.utils import empty  from leap.mail.imap.fields import WithMsgFields, fields  from leap.mail.imap.messages import MessageCollection  from leap.mail.imap.messageparts import MessageWrapper @@ -475,8 +476,17 @@ class SoledadMailbox(WithMsgFields, MBoxParser):          """          Remove all messages flagged \\Deleted          """ +        print "EXPUNGE!"          if not self.isWriteable():              raise imap4.ReadOnlyMailbox +        mstore = self._memstore +        if mstore is not None: +            deleted = mstore.all_deleted_uid_iter(self.mbox) +            print "deleted ", list(deleted) +            for uid in deleted: +                mstore.remove_message(self.mbox, uid) + +        print "now deleting from soledad"          d = self.messages.remove_all_deleted()          d.addCallback(self._expunge_cb)          d.addCallback(self.messages.reset_last_uid) @@ -709,21 +719,21 @@ class SoledadMailbox(WithMsgFields, MBoxParser):              msg = self.messages.get_msg_by_uid(msg_id)              if not msg:                  continue +            # We duplicate the set operations here +            # to return the result because it's less costly than +            # retrieving the flags again. +            newflags = set(msg.getFlags()) +              if mode == 1:                  msg.addFlags(flags) +                newflags = newflags.union(set(flags))              elif mode == -1:                  msg.removeFlags(flags) +                newflags.difference_update(flags)              elif mode == 0:                  msg.setFlags(flags) -            result[msg_id] = msg.getFlags() - -        # After changing flags, we want to signal again to the -        # UI because the number of unread might have changed. -        # Hoever, we should probably limit this to INBOX only? -        # this should really be called as a final callback of -        # the do_STORE method... -        from twisted.internet import reactor -        deferLater(reactor, 1, self.signal_unread_to_ui) +                newflags = set(flags) +            result[msg_id] = newflags          return result      # ISearchableMailbox @@ -780,6 +790,7 @@ class SoledadMailbox(WithMsgFields, MBoxParser):          from twisted.internet import reactor          uid_next = self.getUIDNext()          msg = messageObject +        memstore = self._memstore          # XXX should use a public api instead          fdoc = msg._fdoc @@ -787,20 +798,35 @@ class SoledadMailbox(WithMsgFields, MBoxParser):          if not fdoc:              logger.debug("Tried to copy a MSG with no fdoc")              return -          new_fdoc = copy.deepcopy(fdoc.content) -        new_fdoc[self.UID_KEY] = uid_next -        new_fdoc[self.MBOX_KEY] = self.mbox -        self._memstore.create_message( -            self.mbox, uid_next, -            MessageWrapper( -                new_fdoc, hdoc.content)) - -        # XXX use memory store !!! 
-        if hasattr(hdoc, 'doc_id'): -            self.messages.add_hdocset_docid(hdoc.doc_id) - -        deferLater(reactor, 1, self.notify_new) + +        fdoc_chash = new_fdoc[fields.CONTENT_HASH_KEY] +        dest_fdoc = memstore.get_fdoc_from_chash( +            fdoc_chash, self.mbox) +        exist = dest_fdoc and not empty(dest_fdoc.content) + +        if exist: +            print "Destination message already exists!" + +        else: +            print "DO COPY MESSAGE!" +            new_fdoc[self.UID_KEY] = uid_next +            new_fdoc[self.MBOX_KEY] = self.mbox + +            # XXX set recent! + +            print "****************************" +            print "copy message..." +            print "new fdoc ", new_fdoc +            print "hdoc: ", hdoc +            print "****************************" + +            self._memstore.create_message( +                self.mbox, uid_next, +                MessageWrapper( +                    new_fdoc, hdoc.content)) + +            deferLater(reactor, 1, self.notify_new)      # convenience fun diff --git a/mail/src/leap/mail/imap/memorystore.py b/mail/src/leap/mail/imap/memorystore.py index f0bdab57..f0c0d4b0 100644 --- a/mail/src/leap/mail/imap/memorystore.py +++ b/mail/src/leap/mail/imap/memorystore.py @@ -21,10 +21,13 @@ import contextlib  import logging  import weakref +from twisted.internet import defer  from twisted.internet.task import LoopingCall +from twisted.python import log  from zope.interface import implements  from leap.mail import size +from leap.mail.utils import empty  from leap.mail.messageflow import MessageProducer  from leap.mail.imap import interfaces  from leap.mail.imap.fields import fields @@ -34,6 +37,8 @@ from leap.mail.imap.messageparts import ReferenciableDict  logger = logging.getLogger(__name__) +SOLEDAD_WRITE_PERIOD = 20 +  @contextlib.contextmanager  def set_bool_flag(obj, att): @@ -79,7 +84,8 @@ class MemoryStore(object):      WRITING_FLAG = "_writing" -    def __init__(self, permanent_store=None, write_period=60): +    def __init__(self, permanent_store=None, +                 write_period=SOLEDAD_WRITE_PERIOD):          """          Initialize a MemoryStore. @@ -92,10 +98,23 @@ class MemoryStore(object):          self._permanent_store = permanent_store          self._write_period = write_period -        # Internal Storage +        # Internal Storage: messages          self._msg_store = {} + +        # Internal Storage: payload-hash +        """ +        {'phash': weakreaf.proxy(dict)} +        """          self._phash_store = {} +        # Internal Storage: content-hash:fdoc +        """ +        {'chash': {'mbox-a': weakref.proxy(dict), +                   'mbox-b': weakref.proxy(dict)} +        } +        """ +        self._chash_fdoc_store = {} +          # TODO ----------------- implement mailbox-level flags store too! ----          self._rflags_store = {}          self._hdocset_store = {} @@ -103,7 +122,9 @@ class MemoryStore(object):          # New and dirty flags, to set MessageWrapper State.          self._new = set([]) +        self._new_deferreds = {}          self._dirty = set([]) +        self._dirty_deferreds = {}          # Flag for signaling we're busy writing to the disk storage.          
setattr(self, self.WRITING_FLAG, False) @@ -141,48 +162,141 @@ class MemoryStore(object):      # We would have to add a put_flags operation to modify only      # the flags doc (and set the dirty flag accordingly) -    def create_message(self, mbox, uid, message): +    def create_message(self, mbox, uid, message, notify_on_disk=True):          """          Create the passed message into this MemoryStore.          By default we consider that any message is a new message. + +        :param mbox: the mailbox +        :type mbox: basestring +        :param uid: the UID for the message +        :type uid: int +        :param message: a to be added +        :type message: MessageWrapper +        :param notify_on_disk: +        :type notify_on_disk: bool + +        :return: a Deferred. if notify_on_disk is True, will be fired +                 when written to the db on disk. +                 Otherwise will fire inmediately +        :rtype: Deferred          """          print "adding new doc to memstore %s (%s)" % (mbox, uid)          key = mbox, uid + +        d = defer.Deferred() +        d.addCallback(lambda result: log.msg("message save: %s" % result)) +          self._new.add(key) +        self._new_deferreds[key] = d +        self._add_message(mbox, uid, message, notify_on_disk) +        print "create message: ", d +        return d -        msg_dict = message.as_dict() -        self._msg_store[key] = msg_dict +    def put_message(self, mbox, uid, message, notify_on_disk=True): +        """ +        Put an existing message. -        cdocs = message.cdocs +        :param mbox: the mailbox +        :type mbox: basestring +        :param uid: the UID for the message +        :type uid: int +        :param message: a to be added +        :type message: MessageWrapper +        :param notify_on_disk: +        :type notify_on_disk: bool -        dirty = key in self._dirty -        new = key in self._new +        :return: a Deferred. if notify_on_disk is True, will be fired +                 when written to the db on disk. +                 Otherwise will fire inmediately +        :rtype: Deferred +        """ +        key = mbox, uid + +        d = defer.Deferred() +        d.addCallback(lambda result: log.msg("message save: %s" % result)) + +        self._dirty.add(key) +        self._dirty_deferreds[key] = d +        self._add_message(mbox, uid, message, notify_on_disk) +        return d -        # XXX should capture this in log... +    def _add_message(self, mbox, uid, message, notify_on_disk=True): +        # XXX have to differentiate between notify_new and notify_dirty + +        key = mbox, uid +        msg_dict = message.as_dict() +        print "ADDING MESSAGE..." 
+        import pprint; pprint.pprint(msg_dict) + +        # XXX use the enum as keys + +        try: +            store = self._msg_store[key] +        except KeyError: +            self._msg_store[key] = {'fdoc': {}, +                                    'hdoc': {}, +                                    'cdocs': {}, +                                    'docs_id': {}} +            store = self._msg_store[key] + +        print "In store (before):" +        import pprint; pprint.pprint(store) + +        #self._msg_store[key] = msg_dict +        fdoc = msg_dict.get('fdoc', None) +        if fdoc: +            if not store.get('fdoc', None): +                store['fdoc'] = ReferenciableDict({}) +            store['fdoc'].update(fdoc) + +            # content-hash indexing +            chash = fdoc.get(fields.CONTENT_HASH_KEY) +            chash_fdoc_store = self._chash_fdoc_store +            if not chash in chash_fdoc_store: +                chash_fdoc_store[chash] = {} + +            chash_fdoc_store[chash][mbox] = weakref.proxy( +                store['fdoc']) + +        hdoc = msg_dict.get('hdoc', None) +        if hdoc: +            if not store.get('hdoc', None): +                store['hdoc'] = ReferenciableDict({}) +            store['hdoc'].update(hdoc) + +        docs_id = msg_dict.get('docs_id', None) +        if docs_id: +            if not store.get('docs_id', None): +                store['docs_id'] = {} +            store['docs_id'].update(docs_id) +        cdocs = message.cdocs          for cdoc_key in cdocs.keys(): -            print "saving cdoc" -            cdoc = self._msg_store[key]['cdocs'][cdoc_key] +            if not store.get('cdocs', None): +                store['cdocs'] = {} -            # FIXME this should be done in the MessageWrapper constructor -            # instead... +            cdoc = cdocs[cdoc_key]              # first we make it weak-referenciable              referenciable_cdoc = ReferenciableDict(cdoc) -            self._msg_store[key]['cdocs'][cdoc_key] = MessagePartDoc( -                new=new, dirty=dirty, store="mem", -                part=MessagePartType.cdoc, -                content=referenciable_cdoc) +            store['cdocs'][cdoc_key] = referenciable_cdoc              phash = cdoc.get(fields.PAYLOAD_HASH_KEY, None)              if not phash:                  continue              self._phash_store[phash] = weakref.proxy(referenciable_cdoc) -    def put_message(self, mbox, uid, msg): -        """ -        Put an existing message. -        """ -        return NotImplementedError() +        def prune(seq, store): +            for key in seq: +                if key in store and empty(store.get(key)): +                    store.pop(key) + +        prune(('fdoc', 'hdoc', 'cdocs', 'docs_id'), store) +        #import ipdb; ipdb.set_trace() + + +        print "after appending to store: ", key +        import pprint; pprint.pprint(self._msg_store[key])      def get_message(self, mbox, uid):          """ @@ -203,7 +317,13 @@ class MemoryStore(object):          """          Remove a Message from this MemoryStore.          
""" -        raise NotImplementedError() +        try: +            key = mbox, uid +            self._new.discard(key) +            self._dirty.discard(key) +            self._msg_store.pop(key, None) +        except Exception as exc: +            logger.exception(exc)      # IMessageStoreWriter @@ -211,12 +331,15 @@ class MemoryStore(object):          """          Write the message documents in this MemoryStore to a different store.          """ -        # XXX pass if it's writing (ie, the queue is not empty...) -        # See how to make the writing_flag aware of the queue state... -        print "writing messages to producer..." +        # For now, we pass if the queue is not empty, to avoid duplication. +        # We would better use a flag to know when we've already enqueued an +        # item. +        if not self.producer.is_queue_empty(): +            return +        print "Writing messages to Soledad..."          with set_bool_flag(self, self.WRITING_FLAG): -            for msg_wrapper in self.all_msg_iter(): +            for msg_wrapper in self.all_new_dirty_msg_iter():                  self.producer.push(msg_wrapper)      # MemoryStore specific methods. @@ -247,12 +370,14 @@ class MemoryStore(object):          """          return len(self._new) -    def get_by_phash(self, phash): +    def get_cdoc_from_phash(self, phash):          """          Return a content-document by its payload-hash.          """          doc = self._phash_store.get(phash, None) +        # XXX return None for consistency? +          # XXX have to keep a mapping between phash and its linkage          # info, to know if this payload is been already saved or not.          # We will be able to get this from the linkage-docs, @@ -262,7 +387,40 @@ class MemoryStore(object):          return MessagePartDoc(              new=new, dirty=dirty, store="mem",              part=MessagePartType.cdoc, -            content=doc) +            content=doc, +            doc_id=None) + +    def get_fdoc_from_chash(self, chash, mbox): +        """ +        Return a flags-document by its content-hash and a given mailbox. + +        :return: MessagePartDoc, or None. +        """ +        docs_dict = self._chash_fdoc_store.get(chash, None) +        fdoc = docs_dict.get(mbox, None) if docs_dict else None + +        print "GETTING FDOC BY CHASH:", fdoc + +        # a couple of special cases. +        # 1. We might have a doc with empty content... +        if empty(fdoc): +            return None + +        # ...Or the message could exist, but being flagged for deletion. +        # We want to create a new one in this case. +        # Hmmm what if the deletion is un-done?? We would end with a +        # duplicate... +        if fdoc and fields.DELETED_FLAG in fdoc[fields.FLAGS_KEY]: +            return None + +        # XXX get flags +        new = True +        dirty = False +        return MessagePartDoc( +            new=new, dirty=dirty, store="mem", +            part=MessagePartType.fdoc, +            content=fdoc, +            doc_id=None)      def all_msg_iter(self):          """ @@ -271,6 +429,25 @@ class MemoryStore(object):          return (self.get_message(*key)                  for key in sorted(self._msg_store.keys())) +    def all_new_dirty_msg_iter(self): +        """ +        Return geneator that iterates through all new and dirty messages. 
+        """ +        return (self.get_message(*key) +                for key in sorted(self._msg_store.keys()) +                if key in self._new or key in self._dirty) + +    def all_deleted_uid_iter(self, mbox): +        """ +        Return generator that iterates through the UIDs for all messags +        with deleted flag in a given mailbox. +        """ +        all_deleted = ( +            msg['fdoc']['uid'] for msg in self._msg_store.values() +            if msg.get('fdoc', None) +            and fields.DELETED_FLAG in msg['fdoc']['flags']) +        return all_deleted +      # new, dirty flags      def _get_new_dirty_state(self, key): @@ -289,9 +466,35 @@ class MemoryStore(object):          """          Remove the key value from the `new` set.          """ -        print "******************" -        print "UNSETTING NEW FOR: %s" % str(key) +        print "Unsetting NEW for: %s" % str(key)          self._new.discard(key) +        deferreds = self._new_deferreds +        d = deferreds.get(key, None) +        if d: +            # XXX use a namedtuple for passing the result +            # when we check it in the other side. +            d.callback('%s, ok' % str(key)) +            deferreds.pop(key) + +    def set_dirty(self, key): +        """ +        Add the key value to the `dirty` set. +        """ +        self._dirty.add(key) + +    def unset_dirty(self, key): +        """ +        Remove the key value from the `dirty` set. +        """ +        print "Unsetting DIRTY for: %s" % str(key) +        self._dirty.discard(key) +        deferreds = self._dirty_deferreds +        d = deferreds.get(key, None) +        if d: +            # XXX use a namedtuple for passing the result +            # when we check it in the other side. +            d.callback('%s, ok' % str(key)) +            deferreds.pop(key)      @property      def is_writing(self): diff --git a/mail/src/leap/mail/imap/messageparts.py b/mail/src/leap/mail/imap/messageparts.py index 42eef02d..b43bc37b 100644 --- a/mail/src/leap/mail/imap/messageparts.py +++ b/mail/src/leap/mail/imap/messageparts.py @@ -65,15 +65,13 @@ and sometimes to a part in particular only.    we have modified its state in memory, so we need to put_doc instead while    dumping the MemoryStore contents.    `dirty` attribute would only apply to flags-docs and linkage-docs. - - -  XXX this is still not implemented! +* `doc_id` is the identifier for the document in the u1db database, if any.  """  MessagePartDoc = namedtuple(      'MessagePartDoc', -    ['new', 'dirty', 'part', 'store', 'content']) +    ['new', 'dirty', 'part', 'store', 'content', 'doc_id'])  class ReferenciableDict(dict): @@ -96,6 +94,7 @@ class MessageWrapper(object):      FDOC = "fdoc"      HDOC = "hdoc"      CDOCS = "cdocs" +    DOCS_ID = "docs_id"      # XXX can use this to limit the memory footprint,      # or is it too premature to optimize? @@ -105,12 +104,17 @@ class MessageWrapper(object):      def __init__(self, fdoc=None, hdoc=None, cdocs=None,                   from_dict=None, memstore=None, -                 new=True, dirty=False): +                 new=True, dirty=False, docs_id={}): +        """ +        Initialize a MessageWrapper. 
+        """ +        # TODO add optional reference to original message in the incoming          self._dict = {}          self.memstore = memstore          self._new = new          self._dirty = dirty +          self._storetype = "mem"          if from_dict is not None: @@ -122,6 +126,7 @@ class MessageWrapper(object):                  self._dict[self.HDOC] = ReferenciableDict(hdoc)              if cdocs is not None:                  self._dict[self.CDOCS] = ReferenciableDict(cdocs) +        self._dict[self.DOCS_ID] = docs_id      # properties @@ -153,10 +158,28 @@ class MessageWrapper(object):                     doc="The `new` flag for this MessageWrapper")      def _get_dirty(self): +        """ +        Get the value for the `dirty` flag. +        """          return self._dirty      def _set_dirty(self, value=True): +        """ +        Set the value for the `dirty` flag, and propagate it +        to the memory store if any. +        """          self._dirty = value +        if self.memstore: +            mbox = self.fdoc.content['mbox'] +            uid = self.fdoc.content['uid'] +            key = mbox, uid +            fun = [self.memstore.unset_dirty, +                   self.memstore.set_dirty][int(value)] +            fun(key) +        else: +            logger.warning("Could not find a memstore referenced from this " +                           "MessageWrapper. The value for new will not be " +                           "propagated")      dirty = property(_get_dirty, _set_dirty) @@ -173,7 +196,9 @@ class MessageWrapper(object):          return MessagePartDoc(new=self.new, dirty=self.dirty,                                store=self._storetype,                                part=MessagePartType.fdoc, -                              content=content_ref) +                              content=content_ref, +                              doc_id=self._dict[self.DOCS_ID].get( +                                  self.FDOC, None))      @property      def hdoc(self): @@ -186,7 +211,9 @@ class MessageWrapper(object):          return MessagePartDoc(new=self.new, dirty=self.dirty,                                store=self._storetype,                                part=MessagePartType.hdoc, -                              content=content_ref) +                              content=content_ref, +                              doc_id=self._dict[self.DOCS_ID].get( +                                  self.HDOC, None))      @property      def cdocs(self): @@ -201,21 +228,18 @@ class MessageWrapper(object):          Generator that iterates through all the parts, returning          MessagePartDoc.          
""" -        yield self.fdoc -        yield self.hdoc +        if self.fdoc is not None: +            yield self.fdoc +        if self.hdoc is not None: +            yield self.hdoc          for cdoc in self.cdocs.values(): -            # XXX this will break ---- -            #content_ref = weakref.proxy(cdoc) -            #yield MessagePartDoc(new=self.new, dirty=self.dirty, -                                 #store=self._storetype, -                                 #part=MessagePartType.cdoc, -                                 #content=content_ref) - -            # the put is handling this for us, so -            # we already have stored a MessagePartDoc -            # but we should really do it while adding in the -            # constructor or the from_dict method -            yield cdoc +            if cdoc is not None: +                content_ref = weakref.proxy(cdoc) +                yield MessagePartDoc(new=self.new, dirty=self.dirty, +                                     store=self._storetype, +                                     part=MessagePartType.cdoc, +                                     content=content_ref, +                                     doc_id=None)      # i/o @@ -234,9 +258,9 @@ class MessageWrapper(object):          fdoc, hdoc, cdocs = map(              lambda part: msg_dict.get(part, None),              [self.FDOC, self.HDOC, self.CDOCS]) -        self._dict[self.FDOC] = fdoc -        self._dict[self.HDOC] = hdoc -        self._dict[self.CDOCS] = cdocs +        for t, doc in ((self.FDOC, fdoc), (self.HDOC, hdoc), +                       (self.CDOCS, cdocs)): +            self._dict[t] = ReferenciableDict(doc) if doc else None  class MessagePart(object): diff --git a/mail/src/leap/mail/imap/messages.py b/mail/src/leap/mail/imap/messages.py index 94bd7141..c212472b 100644 --- a/mail/src/leap/mail/imap/messages.py +++ b/mail/src/leap/mail/imap/messages.py @@ -37,7 +37,7 @@ from leap.common.check import leap_assert, leap_assert_type  from leap.common.decorators import memoized_method  from leap.common.mail import get_email_charset  from leap.mail import walk -from leap.mail.utils import first, find_charset, lowerdict +from leap.mail.utils import first, find_charset, lowerdict, empty  from leap.mail.decorators import deferred  from leap.mail.imap.index import IndexedDB  from leap.mail.imap.fields import fields, WithMsgFields @@ -130,6 +130,8 @@ class LeapMessage(fields, MailParser, MBoxParser):          self.__chash = None          self.__bdoc = None +    # XXX make these properties public +      @property      def _fdoc(self):          """ @@ -154,8 +156,9 @@ class LeapMessage(fields, MailParser, MBoxParser):          """          if self._container is not None:              hdoc = self._container.hdoc -            if hdoc: +            if hdoc and not empty(hdoc.content):                  return hdoc +        # XXX cache this into the memory store !!!          
return self._get_headers_doc()      @property @@ -248,7 +251,13 @@ class LeapMessage(fields, MailParser, MBoxParser):          doc.content[self.SEEN_KEY] = self.SEEN_FLAG in flags          doc.content[self.DEL_KEY] = self.DELETED_FLAG in flags -        if getattr(doc, 'store', None) != "mem": +        if self._collection.memstore is not None: +            self._collection.memstore.put_message( +                self._mbox, self._uid, +                MessageWrapper(fdoc=doc.content, new=False, dirty=True, +                               docs_id={'fdoc': doc.doc_id})) +        else: +            # fallback for non-memstore initializations.              self._soledad.put_doc(doc)      def addFlags(self, flags): @@ -547,20 +556,18 @@ class LeapMessage(fields, MailParser, MBoxParser):          # phash doc...          if self._container is not None: -            bdoc = self._container.memstore.get_by_phash(body_phash) +            bdoc = self._container.memstore.get_cdoc_from_phash(body_phash)              print "bdoc from container -->", bdoc              if bdoc and bdoc.content is not None:                  return bdoc              else:                  print "no doc or not bdoc content for that phash found!" -        print "nuthing. soledad?"          # no memstore or no doc found there          if self._soledad:              body_docs = self._soledad.get_from_index(                  fields.TYPE_P_HASH_IDX,                  fields.TYPE_CONTENT_VAL, str(body_phash)) -            print "returning body docs...", body_docs              return first(body_docs)          else:              logger.error("No phash in container, and no soledad found!") @@ -581,32 +588,32 @@ class LeapMessage(fields, MailParser, MBoxParser):      # setters      # XXX to be used in the messagecopier interface?! - -    def set_uid(self, uid): -        """ -        Set new uid for this message. - -        :param uid: the new uid -        :type uid: basestring -        """ +# +    #def set_uid(self, uid): +        #""" +        #Set new uid for this message. +# +        #:param uid: the new uid +        #:type uid: basestring +        #"""          # XXX dangerous! lock? -        self._uid = uid -        d = self._fdoc -        d.content[self.UID_KEY] = uid -        self._soledad.put_doc(d) - -    def set_mbox(self, mbox): -        """ -        Set new mbox for this message. - -        :param mbox: the new mbox -        :type mbox: basestring -        """ +        #self._uid = uid +        #d = self._fdoc +        #d.content[self.UID_KEY] = uid +        #self._soledad.put_doc(d) +# +    #def set_mbox(self, mbox): +        #""" +        #Set new mbox for this message. +# +        #:param mbox: the new mbox +        #:type mbox: basestring +        #"""          # XXX dangerous! lock? -        self._mbox = mbox -        d = self._fdoc -        d.content[self.MBOX_KEY] = mbox -        self._soledad.put_doc(d) +        #self._mbox = mbox +        #d = self._fdoc +        #d.content[self.MBOX_KEY] = mbox +        #self._soledad.put_doc(d)      # destructor @@ -614,14 +621,13 @@ class LeapMessage(fields, MailParser, MBoxParser):      def remove(self):          """          Remove all docs associated with this message. +        Currently it removes only the flags doc.          """          # XXX For the moment we are only removing the flags and headers          # docs. The rest we leave there polluting your hard disk,          # until we think about a good way of deorphaning.          # Maybe a crawler of unreferenced docs. 
-        # XXX remove from memory store!!! -          # XXX implement elijah's idea of using a PUT document as a          # token to ensure consistency in the removal. @@ -632,13 +638,35 @@ class LeapMessage(fields, MailParser, MBoxParser):          #bd = self._get_body_doc()          #docs = [fd, hd, bd] -        docs = [fd] +        try: +            memstore = self._collection.memstore +        except AttributeError: +            memstore = False + +        if memstore and hasattr(fd, "store", None) == "mem": +            key = self._mbox, self._uid +            if fd.new: +                # it's a new document, so we can remove it and it will not +                # be writen. Watch out! We need to be sure it has not been +                # just queued to write! +                memstore.remove_message(*key) + +            if fd.dirty: +                doc_id = fd.doc_id +                doc = self._soledad.get_doc(doc_id) +                try: +                    self._soledad.delete_doc(doc) +                except Exception as exc: +                    logger.exception(exc) -        for d in filter(None, docs): +        else: +            # we just got a soledad_doc              try: -                self._soledad.delete_doc(d) +                doc_id = fd.doc_id +                latest_doc = self._soledad.get_doc(doc_id) +                self._soledad.delete_doc(latest_doc)              except Exception as exc: -                logger.error(exc) +                logger.exception(exc)          return uid      def does_exist(self): @@ -786,8 +814,10 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser):          # okay, all in order, keep going...          self.mbox = self._parse_mailbox_name(mbox) + +        # XXX get a SoledadStore passed instead          self._soledad = soledad -        self._memstore = memstore +        self.memstore = memstore          self.__rflags = None          self.__hdocset = None @@ -913,13 +943,21 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser):          :type chash: basestring          :return: False, if it does not exist, or UID.          """ -        exist = self._get_fdoc_from_chash(chash) +        exist = False +        if self.memstore is not None: +            exist = self.memstore.get_fdoc_from_chash(chash, self.mbox) + +        if not exist: +            exist = self._get_fdoc_from_chash(chash) + +        print "FDOC EXIST?", exist          if exist:              return exist.content.get(fields.UID_KEY, "unknown-uid")          else:              return False -    @deferred +    # not deferring to thread cause this now uses deferred asa retval +    #@deferred      def add_msg(self, raw, subject=None, flags=None, date=None, uid=1):          """          Creates a new message document. @@ -945,6 +983,7 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser):          # TODO add the linked-from info !          # TODO add reference to the original message +        print "ADDING MESSAGE..."          logger.debug('adding message')          if flags is None: @@ -956,11 +995,14 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser):          # check for uniqueness.          
if self._fdoc_already_exists(chash): +            print ">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>" +            print +            print              logger.warning("We already have that message in this mailbox.")              # note that this operation will leave holes in the UID sequence,              # but we're gonna change that all the same for a local-only table.              # so not touch it by the moment. -            return False +            return defer.succeed('already_exists')          fd = self._populate_flags(flags, uid, chash, size, multi)          hd = self._populate_headr(msg, chash, subject, date) @@ -999,7 +1041,16 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser):          # TODO ---- add reference to original doc, to be deleted          # after writes are done.          msg_container = MessageWrapper(fd, hd, cdocs) -        self._memstore.create_message(self.mbox, uid, msg_container) + +        # XXX Should allow also to dump to disk directly, +        # for no-memstore cases. + +        # we return a deferred that, by default, will be triggered when +        # saved to disk +        d = self.memstore.create_message(self.mbox, uid, msg_container) +        print "defered-add", d +        print "adding message", d +        return d      def _remove_cb(self, result):          return result @@ -1247,17 +1298,13 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser):                   or None if not found.          :rtype: LeapMessage          """ -        print "getting msg by id!" -        msg_container = self._memstore.get_message(self.mbox, uid) -        print "msg container", msg_container +        msg_container = self.memstore.get_message(self.mbox, uid)          if msg_container is not None: -            print "getting LeapMessage (from memstore)"              # We pass a reference to soledad just to be able to retrieve              # missing parts that cannot be found in the container, like              # the content docs after a copy.              msg = LeapMessage(self._soledad, uid, self.mbox, collection=self,                                container=msg_container) -            print "got msg:", msg          else:              msg = LeapMessage(self._soledad, uid, self.mbox, collection=self)          if not msg.does_exist(): @@ -1303,8 +1350,8 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser):                         self._soledad.get_from_index(                             fields.TYPE_MBOX_IDX,                             fields.TYPE_FLAGS_VAL, self.mbox)]) -        if self._memstore is not None: -            mem_uids = self._memstore.get_uids(self.mbox) +        if self.memstore is not None: +            mem_uids = self.memstore.get_uids(self.mbox)              uids = db_uids.union(set(mem_uids))          else:              uids = db_uids @@ -1328,19 +1375,22 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser):          Return a dict with all flags documents for this mailbox.          """          # XXX get all from memstore and cache it there +        # FIXME should get all uids, get them fro memstore, +        # and get only the missing ones from disk. 
+          all_flags = dict(((              doc.content[self.UID_KEY],              doc.content[self.FLAGS_KEY]) for doc in              self._soledad.get_from_index(                  fields.TYPE_MBOX_IDX,                  fields.TYPE_FLAGS_VAL, self.mbox))) -        if self._memstore is not None: +        if self.memstore is not None:              # XXX -            uids = self._memstore.get_uids(self.mbox) -            fdocs = [(uid, self._memstore.get_message(self.mbox, uid).fdoc) -                     for uid in uids] -            for uid, doc in fdocs: -                all_flags[uid] = doc.content[self.FLAGS_KEY] +            uids = self.memstore.get_uids(self.mbox) +            docs = ((uid, self.memstore.get_message(self.mbox, uid)) +                    for uid in uids) +            for uid, doc in docs: +                all_flags[uid] = doc.fdoc.content[self.FLAGS_KEY]          return all_flags @@ -1378,8 +1428,8 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser):          count = self._soledad.get_count_from_index(              fields.TYPE_MBOX_IDX,              fields.TYPE_FLAGS_VAL, self.mbox) -        if self._memstore is not None: -            count += self._memstore.count_new() +        if self.memstore is not None: +            count += self.memstore.count_new()          return count      # unseen messages diff --git a/mail/src/leap/mail/imap/soledadstore.py b/mail/src/leap/mail/imap/soledadstore.py index d36acaed..b321da8a 100644 --- a/mail/src/leap/mail/imap/soledadstore.py +++ b/mail/src/leap/mail/imap/soledadstore.py @@ -81,7 +81,8 @@ class ContentDedup(object):          if len(header_docs) != 1:              logger.warning("Found more than one copy of chash %s!"                             % (chash,)) -        logger.debug("Found header doc with that hash! Skipping save!") +        # XXX re-enable +        #logger.debug("Found header doc with that hash! Skipping save!")          return True      def _content_does_exist(self, doc): @@ -105,7 +106,8 @@ class ContentDedup(object):          if len(attach_docs) != 1:              logger.warning("Found more than one copy of phash %s!"                             % (phash,)) -        logger.debug("Found attachment doc with that hash! Skipping save!") +        # XXX re-enable +        #logger.debug("Found attachment doc with that hash! Skipping save!")          return True @@ -215,6 +217,7 @@ class SoledadStore(ContentDedup):                  # If everything went well, we can unset the new flag                  # in the source store (memory store)                  msg_wrapper.new = False +                msg_wrapper.dirty = False              empty = queue.empty()      # @@ -261,6 +264,9 @@ class SoledadStore(ContentDedup):              # item is expected to be a MessagePartDoc              for item in msg_wrapper.walk():                  if item.part == MessagePartType.fdoc: + +                    # FIXME add content duplication for HEADERS too! +                    # (only 1 chash per mailbox!)                      yield dict(item.content), call                  elif item.part == MessagePartType.hdoc: @@ -276,18 +282,31 @@ class SoledadStore(ContentDedup):                          yield dict(item.content), call +        # For now, the only thing that will be dirty is +        # the flags doc. + +        elif msg_wrapper.dirty is True: +            print "DIRTY DOC! 
----------------------" +            call = self._soledad.put_doc + +            # item is expected to be a MessagePartDoc +            for item in msg_wrapper.walk(): +                doc_id = item.doc_id  # defend! +                doc = self._soledad.get_doc(doc_id) +                doc.content = item.content + +                if item.part == MessagePartType.fdoc: +                    print "Will PUT the doc: ", doc +                    yield dict(doc), call + +                # XXX also for linkage-doc +          # TODO should write back to the queue          # with the results of the operation.          # We can write there:          # (*) MsgWriteACK  --> Should remove from incoming queue.          #                      (We should do this here). -          # Implement using callbacks for each operation. -        # TODO should check for elements with the dirty state -        # TODO if new == False and dirty == True, put_doc -        # XXX for puts, we will have to retrieve -        # the document, change the content, and -        # pass the whole document under "content"          else:              logger.error("Cannot put/delete documents yet!") diff --git a/mail/src/leap/mail/messageflow.py b/mail/src/leap/mail/messageflow.py index ed6abcdf..b7fc0304 100644 --- a/mail/src/leap/mail/messageflow.py +++ b/mail/src/leap/mail/messageflow.py @@ -126,9 +126,15 @@ class MessageProducer(object):          again after the addition of new items.          """          self._consumer.consume(self._queue) -        if self._queue.empty(): +        if self.is_queue_empty():              self.stop() +    def is_queue_empty(self): +        """ +        Return True if queue is empty, False otherwise. +        """ +        return self._queue.empty() +      # public methods: IMessageProducer      def push(self, item): diff --git a/mail/src/leap/mail/utils.py b/mail/src/leap/mail/utils.py index 64af04f9..bae28984 100644 --- a/mail/src/leap/mail/utils.py +++ b/mail/src/leap/mail/utils.py @@ -36,6 +36,15 @@ def first(things):          return None +def empty(thing): +    """ +    Return True if a thing is None or its length is zero. +    """ +    if thing is None: +        return True +    return len(thing) == 0 + +  def maybe_call(thing):      """      Return the same thing, or the result of its invocation if it is a | 
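The write-acknowledgment machinery this patch threads through MemoryStore (create_message/put_message stash a Deferred per (mbox, uid) key, and unset_new/unset_dirty fire it once SoledadStore has flushed the document) reduces to the following minimal sketch. TinyMemoryStore and its dict-backed storage are illustrative stand-ins, not part of the patch.

```python
# Illustrative sketch only: the deferred-per-key bookkeeping used by
# MemoryStore.create_message() and unset_new() in the diff above.
from twisted.internet import defer
from twisted.python import log


class TinyMemoryStore(object):
    """Hypothetical, trimmed-down stand-in for MemoryStore."""

    def __init__(self):
        self._msg_store = {}
        self._new = set()
        self._new_deferreds = {}

    def create_message(self, mbox, uid, message):
        # Mark the message as new and hand back a Deferred that will
        # be fired once the on-disk store acknowledges the write.
        key = (mbox, uid)
        d = defer.Deferred()
        self._new.add(key)
        self._new_deferreds[key] = d
        self._msg_store[key] = message
        return d

    def unset_new(self, key):
        # Called by the disk-store writer after a successful flush:
        # clear the `new` flag and fire the pending Deferred.
        self._new.discard(key)
        d = self._new_deferreds.pop(key, None)
        if d is not None:
            d.callback('%s, ok' % str(key))


store = TinyMemoryStore()
d = store.create_message('INBOX', 1, {'fdoc': {'flags': []}})
d.addCallback(lambda result: log.msg("message save: %s" % result))
store.unset_new(('INBOX', 1))  # simulates the writer's acknowledgment
```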
