diff options
| author | Kali Kaneko <kali@leap.se> | 2014-02-07 05:50:55 -0400 | 
|---|---|---|
| committer | Kali Kaneko <kali@leap.se> | 2014-02-17 11:39:44 -0400 | 
| commit | 2087c78ce5a4a4cdb8ed4192840059513088838f (patch) | |
| tree | de58da34ecc47f80f38c21f5a92ee956c98171a5 | |
| parent | ad15196600995911b24d413e9a44743e6fd1cf8f (diff) | |
separate better dirty/new flags; add cdocs
| -rw-r--r-- | mail/src/leap/mail/imap/memorystore.py | 88 | ||||
| -rw-r--r-- | mail/src/leap/mail/imap/messages.py | 21 | ||||
| -rw-r--r-- | mail/src/leap/mail/imap/soledadstore.py | 22 | ||||
| -rw-r--r-- | mail/src/leap/mail/utils.py | 19 | 
4 files changed, 106 insertions, 44 deletions
| diff --git a/mail/src/leap/mail/imap/memorystore.py b/mail/src/leap/mail/imap/memorystore.py index 4156c0b..ee3ee92 100644 --- a/mail/src/leap/mail/imap/memorystore.py +++ b/mail/src/leap/mail/imap/memorystore.py @@ -24,6 +24,7 @@ import weakref  from collections import defaultdict  from copy import copy +from itertools import chain  from twisted.internet import defer  from twisted.internet.task import LoopingCall @@ -33,7 +34,7 @@ from zope.interface import implements  from leap.common.check import leap_assert_type  from leap.mail import size  from leap.mail.decorators import deferred_to_thread -from leap.mail.utils import empty +from leap.mail.utils import empty, phash_iter  from leap.mail.messageflow import MessageProducer  from leap.mail.imap import interfaces  from leap.mail.imap.fields import fields @@ -110,13 +111,12 @@ class MemoryStore(object):          # Internal Storage: messages          """ -        Flags document store. +        flags document store.          _fdoc_store[mbox][uid] = { 'content': 'aaa' }          """          self._fdoc_store = defaultdict(lambda: defaultdict(              lambda: ReferenciableDict({}))) -<<<<<<< HEAD          # Sizes          """          {'mbox, uid': <int>} @@ -124,9 +124,14 @@ class MemoryStore(object):          self._sizes = {}          # Internal Storage: payload-hash -======= +        """ +        fdocs:doc-id store, stores document IDs for putting +        the dirty flags-docs. +        """ +        self._fdoc_id_store = defaultdict(lambda: defaultdict( +            lambda: '')) +          # Internal Storage: content-hash:hdoc ->>>>>>> change internal storage and keying scheme in memstore          """          hdoc-store keeps references to          the header-documents indexed by content-hash. @@ -360,14 +365,6 @@ class MemoryStore(object):          self._sizes[key] = size.get_size(self._fdoc_store[key])          # TODO add hdoc and cdocs sizes too -        # XXX what to do with this? -        #docs_id = msg_dict.get(DOCS_ID, None) -        #if docs_id is not None: -            #if not store.get(DOCS_ID, None): -                #store[DOCS_ID] = {} -            #store[DOCS_ID].update(docs_id) - -      def get_docid_for_fdoc(self, mbox, uid):          """          Return Soledad document id for the flags-doc for a given mbox and uid, @@ -379,13 +376,18 @@ class MemoryStore(object):          :type uid: int          :rtype: unicode or None          """ -        fdoc = self._permanent_store.get_flags_doc(mbox, uid) -        if empty(fdoc): -            return None -        doc_id = fdoc.doc_id +        doc_id = self._fdoc_id_store[mbox][uid] + +        if empty(doc_id): +            fdoc = self._permanent_store.get_flags_doc(mbox, uid) +            if empty(fdoc.content): +                return None +            doc_id = fdoc.doc_id +            self._fdoc_id_store[mbox][uid] = doc_id +          return doc_id -    def get_message(self, mbox, uid, flags_only=False): +    def get_message(self, mbox, uid, dirtystate="none", flags_only=False):          """          Get a MessageWrapper for the given mbox and uid combination. @@ -393,19 +395,32 @@ class MemoryStore(object):          :type mbox: str or unicode          :param uid: the message UID          :type uid: int +        :param dirtystate: one of `dirty`, `new` or `none` (default) +        :type dirtystate: str          :param flags_only: whether the message should carry only a reference                             to the flags document.          :type flags_only: bool +        :          :return: MessageWrapper or None          """ +        if dirtystate == "dirty": +            flags_only = True +          key = mbox, uid          fdoc = self._fdoc_store[mbox][uid]          if empty(fdoc):              return None -        new, dirty = self._get_new_dirty_state(key) +        new, dirty = False, False +        if dirtystate == "none": +            new, dirty = self._get_new_dirty_state(key) +        if dirtystate == "dirty": +            new, dirty = False, True +        if dirtystate == "new": +            new, dirty = True, False +          if flags_only:              return MessageWrapper(fdoc=fdoc,                                    new=new, dirty=dirty, @@ -413,7 +428,22 @@ class MemoryStore(object):          else:              chash = fdoc.get(fields.CONTENT_HASH_KEY)              hdoc = self._hdoc_store[chash] -            return MessageWrapper(fdoc=fdoc, hdoc=hdoc, +            if empty(hdoc): +                hdoc = self._permanent_store.get_headers_doc(chash) +                if not empty(hdoc.content): +                    self._hdoc_store[chash] = hdoc.content +                    hdoc = hdoc.content +            cdocs = None + +            pmap = hdoc.get(fields.PARTS_MAP_KEY, None) +            if new and pmap is not None: +                # take the different cdocs for write... +                cdoc_store = self._cdoc_store +                cdocs_list = phash_iter(hdoc) +                cdocs = dict(enumerate( +                    [cdoc_store[phash] for phash in cdocs_list], 1)) + +            return MessageWrapper(fdoc=fdoc, hdoc=hdoc, cdocs=cdocs,                                    new=new, dirty=dirty,                                    memstore=weakref.proxy(self)) @@ -437,14 +467,9 @@ class MemoryStore(object):              key = mbox, uid              self._new.discard(key)              self._dirty.discard(key) -<<<<<<< HEAD -            self._msg_store.pop(key, None)              if key in self._sizes:                  del self._sizes[key] - -=======              self._fdoc_store[mbox].pop(uid, None) ->>>>>>> change internal storage and keying scheme in memstore          except Exception as exc:              logger.exception(exc) @@ -464,7 +489,7 @@ class MemoryStore(object):          # XXX this could return the deferred for all the enqueued operations          if not self.producer.is_queue_empty(): -            return +            return False          if any(map(lambda i: not empty(i), (self._new, self._dirty))):              logger.info("Writing messages to Soledad...") @@ -598,6 +623,7 @@ class MemoryStore(object):          """          # We can do direct assignments cause we know this will only          # be called during initialization of the mailbox. +          fdoc_store = self._fdoc_store[mbox]          for uid in flag_docs:              fdoc_store[uid] = ReferenciableDict(flag_docs[uid]) @@ -626,7 +652,8 @@ class MemoryStore(object):          """          flags_dict = {}          uids = self.get_uids(mbox) -        fdoc_store = self._fdoc_store +        fdoc_store = self._fdoc_store[mbox] +          for uid in uids:              try:                  flags = fdoc_store[uid][fields.FLAGS_KEY] @@ -763,9 +790,10 @@ class MemoryStore(object):          :return: generator of MessageWrappers          :rtype: generator          """ -        return (self.get_message(*key) -                for key in sorted(self.iter_fdoc_keys()) -                if key in self._new or key in self._dirty) +        gm = self.get_message +        new = (gm(*key) for key in self._new) +        dirty = (gm(*key, flags_only=True) for key in self._dirty) +        return chain(new, dirty)      def all_deleted_uid_iter(self, mbox):          """ diff --git a/mail/src/leap/mail/imap/messages.py b/mail/src/leap/mail/imap/messages.py index 4b95689..8b6d3f3 100644 --- a/mail/src/leap/mail/imap/messages.py +++ b/mail/src/leap/mail/imap/messages.py @@ -264,17 +264,15 @@ class LeapMessage(fields, MailParser, MBoxParser):              # to put it under the lock...              doc.content[self.FLAGS_KEY] = newflags              doc.content[self.SEEN_KEY] = self.SEEN_FLAG in flags + +            # XXX check if this is working ok.              doc.content[self.DEL_KEY] = self.DELETED_FLAG in flags -            if self._collection.memstore is not None: -                log.msg("putting message in collection") -                self._collection.memstore.put_message( -                    self._mbox, self._uid, -                    MessageWrapper(fdoc=doc.content, new=False, dirty=True, -                                   docs_id={'fdoc': doc.doc_id})) -            else: -                # fallback for non-memstore initializations. -                self._soledad.put_doc(doc) +            log.msg("putting message in collection") +            self._collection.memstore.put_message( +                self._mbox, self._uid, +                MessageWrapper(fdoc=doc.content, new=False, dirty=True, +                               docs_id={'fdoc': doc.doc_id}))          return map(str, newflags)      def getInternalDate(self): @@ -524,6 +522,7 @@ class LeapMessage(fields, MailParser, MBoxParser):          finally:              return result +    # TODO move to soledadstore instead of accessing soledad directly      def _get_headers_doc(self):          """          Return the document that keeps the headers for this @@ -534,6 +533,7 @@ class LeapMessage(fields, MailParser, MBoxParser):              fields.TYPE_HEADERS_VAL, str(self.chash))          return first(head_docs) +    # TODO move to soledadstore instead of accessing soledad directly      def _get_body_doc(self):          """          Return the document that keeps the body for this @@ -1165,7 +1165,8 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser):                   or None if not found.          :rtype: LeapMessage          """ -        msg_container = self.memstore.get_message(self.mbox, uid, flags_only) +        msg_container = self.memstore.get_message( +            self.mbox, uid, flags_only=flags_only)          if msg_container is not None:              if mem_only:                  msg = LeapMessage(None, uid, self.mbox, collection=self, diff --git a/mail/src/leap/mail/imap/soledadstore.py b/mail/src/leap/mail/imap/soledadstore.py index a74b49c..6cd3749 100644 --- a/mail/src/leap/mail/imap/soledadstore.py +++ b/mail/src/leap/mail/imap/soledadstore.py @@ -212,10 +212,8 @@ class SoledadStore(ContentDedup):                        to be inserted.          :type queue: Queue          """ -        # TODO should delete the original message from incoming only after -        # the writes are done.          # TODO should handle the delete case -        # TODO should handle errors +        # TODO should handle errors better          # TODO could generalize this method into a generic consumer          # and only implement `process` here @@ -235,7 +233,7 @@ class SoledadStore(ContentDedup):              Errorback for write operations.              """              log.msg("ERROR: Error while processing item.") -            log.msg(failure.getTraceBack()) +            log.msg(failure.getTraceback())          while not queue.empty():              doc_wrapper = queue.get() @@ -354,6 +352,7 @@ class SoledadStore(ContentDedup):                  doc = self._GET_DOC_FUN(doc_id)                  doc.content = dict(item.content)                  item = doc +              try:                  call(item)              except u1db_errors.RevisionConflict as exc: @@ -451,6 +450,7 @@ class SoledadStore(ContentDedup):          :type mbox: str or unicode          :param uid: the UID for the message          :type uid: int +        :rtype: SoledadDocument or None          """          result = None          try: @@ -465,6 +465,20 @@ class SoledadStore(ContentDedup):          finally:              return result +    def get_headers_doc(self, chash): +        """ +        Return the document that keeps the headers for a message +        indexed by its content-hash. + +        :param chash: the content-hash to retrieve the document from. +        :type chash: str or unicode +        :rtype: SoledadDocument or None +        """ +        head_docs = self._soledad.get_from_index( +            fields.TYPE_C_HASH_IDX, +            fields.TYPE_HEADERS_VAL, str(chash)) +        return first(head_docs) +      def write_last_uid(self, mbox, value):          """          Write the `last_uid` integer to the proper mailbox document diff --git a/mail/src/leap/mail/utils.py b/mail/src/leap/mail/utils.py index 942acfb..8b75cfc 100644 --- a/mail/src/leap/mail/utils.py +++ b/mail/src/leap/mail/utils.py @@ -94,6 +94,7 @@ def lowerdict(_dict):  PART_MAP = "part_map" +PHASH = "phash"  def _str_dict(d, k): @@ -130,6 +131,24 @@ def stringify_parts_map(d):      return d +def phash_iter(d): +    """ +    A recursive generator that extracts all the payload-hashes +    from an arbitrary nested parts-map dictionary. + +    :param d: the dictionary to walk +    :type d: dictionary +    :return: a list of all the phashes found +    :rtype: list +    """ +    if PHASH in d: +        yield d[PHASH] +    if PART_MAP in d: +        for key in d[PART_MAP]: +            for phash in phash_iter(d[PART_MAP][key]): +                yield phash + +  class CustomJsonScanner(object):      """      This class is a context manager definition used to monkey patch the default | 
