diff options
| author | Kali Kaneko <kali@leap.se> | 2014-02-07 07:27:38 -0400 | 
|---|---|---|
| committer | Kali Kaneko <kali@leap.se> | 2014-02-17 11:39:48 -0400 | 
| commit | aa0f73ae27714f71bd71eb47b7f0a54b320bec38 (patch) | |
| tree | 4cd3dc1780471f8432dc28f454667aa56d41d02a /mail/src | |
| parent | f3a6c1933138acdbb69f926e160b25ec3e4097ea (diff) | |
defer_to_thread the bulk of write operations
and batch the notifications back to the memorystore,
within the reactor thread.
Diffstat (limited to 'mail/src')
| -rw-r--r-- | mail/src/leap/mail/imap/memorystore.py | 9 | ||||
| -rw-r--r-- | mail/src/leap/mail/imap/soledadstore.py | 88 | 
2 files changed, 39 insertions, 58 deletions
| diff --git a/mail/src/leap/mail/imap/memorystore.py b/mail/src/leap/mail/imap/memorystore.py index ee3ee92..786a9c4 100644 --- a/mail/src/leap/mail/imap/memorystore.py +++ b/mail/src/leap/mail/imap/memorystore.py @@ -380,7 +380,7 @@ class MemoryStore(object):          if empty(doc_id):              fdoc = self._permanent_store.get_flags_doc(mbox, uid) -            if empty(fdoc.content): +            if empty(fdoc) or empty(fdoc.content):                  return None              doc_id = fdoc.doc_id              self._fdoc_id_store[mbox][uid] = doc_id @@ -706,9 +706,10 @@ class MemoryStore(object):          :rtype: iterable          """          fdocs = self._fdoc_store[mbox] +          return [uid for uid, value                  in fdocs.items() -                if fields.SEEN_FLAG not in value["flags"]] +                if fields.SEEN_FLAG not in value.get(fields.FLAGS_KEY, [])]      def get_cdoc_from_phash(self, phash):          """ @@ -760,7 +761,7 @@ class MemoryStore(object):          # We want to create a new one in this case.          # Hmmm what if the deletion is un-done?? We would end with a          # duplicate... -        if fdoc and fields.DELETED_FLAG in fdoc[fields.FLAGS_KEY]: +        if fdoc and fields.DELETED_FLAG in fdoc.get(fields.FLAGS_KEY, []):              return None          uid = fdoc[fields.UID_KEY] @@ -810,7 +811,7 @@ class MemoryStore(object):          fdocs = self._fdoc_store[mbox]          return [uid for uid, value                  in fdocs.items() -                if fields.DELETED_FLAG in value["flags"]] +                if fields.DELETED_FLAG in value.get(fields.FLAGS_KEY, [])]      # new, dirty flags diff --git a/mail/src/leap/mail/imap/soledadstore.py b/mail/src/leap/mail/imap/soledadstore.py index 6cd3749..e7c6b29 100644 --- a/mail/src/leap/mail/imap/soledadstore.py +++ b/mail/src/leap/mail/imap/soledadstore.py @@ -23,7 +23,6 @@ import threading  from itertools import chain  from u1db import errors as u1db_errors -from twisted.internet import defer  from twisted.python import log  from zope.interface import implements @@ -35,7 +34,7 @@ from leap.mail.imap.messageparts import RecentFlagsDoc  from leap.mail.imap.fields import fields  from leap.mail.imap.interfaces import IMessageStore  from leap.mail.messageflow import IMessageConsumer -from leap.mail.utils import first, empty +from leap.mail.utils import first, empty, accumulator_queue  logger = logging.getLogger(__name__) @@ -142,12 +141,18 @@ class SoledadStore(ContentDedup):          :param soledad: the soledad instance          :type soledad: Soledad          """ +        from twisted.internet import reactor          self._soledad = soledad          self._CREATE_DOC_FUN = self._soledad.create_doc          self._PUT_DOC_FUN = self._soledad.put_doc          self._GET_DOC_FUN = self._soledad.get_doc +        # we instantiate an accumulator to batch the notifications +        self.docs_notify_queue = accumulator_queue( +            lambda item: reactor.callFromThread(self._unset_new_dirty, item), +            20) +      # IMessageStore      # ------------------------------------------------------------------- @@ -202,7 +207,10 @@ class SoledadStore(ContentDedup):      # IMessageConsumer -    # It's not thread-safe to defer this to a different thread +    # TODO should handle the delete case +    # TODO should handle errors better +    # TODO could generalize this method into a generic consumer +    # and only implement `process` here      def consume(self, queue):          """ @@ -212,38 +220,16 @@ class SoledadStore(ContentDedup):                        to be inserted.          :type queue: Queue          """ -        # TODO should handle the delete case -        # TODO should handle errors better -        # TODO could generalize this method into a generic consumer -        # and only implement `process` here -          from twisted.internet import reactor -        def docWriteCallBack(doc_wrapper): -            """ -            Callback for a successful write of a document wrapper. -            """ -            if isinstance(doc_wrapper, MessageWrapper): -                # If everything went well, we can unset the new flag -                # in the source store (memory store) -                self._unset_new_dirty(doc_wrapper) - -        def docWriteErrorBack(failure): -            """ -            Errorback for write operations. -            """ -            log.msg("ERROR: Error while processing item.") -            log.msg(failure.getTraceback()) -          while not queue.empty():              doc_wrapper = queue.get() +            reactor.callInThread(self._consume_doc, doc_wrapper, +                                 self.docs_notify_queue) -            d = defer.Deferred() -            d.addCallbacks(docWriteCallBack, docWriteErrorBack) -            reactor.callLater(0, self._consume_doc, doc_wrapper, d) +        # Queue empty, flush the notifications queue. +        self.docs_notify_queue(None, flush=True) -    # FIXME this should not run the callback in the deferred thred -    @deferred_to_thread      def _unset_new_dirty(self, doc_wrapper):          """          Unset the `new` and `dirty` flags for this document wrapper in the @@ -252,49 +238,38 @@ class SoledadStore(ContentDedup):          :param doc_wrapper: a MessageWrapper instance          :type doc_wrapper: MessageWrapper          """ -        # XXX debug msg id/mbox? -        logger.info("unsetting new flag!") -        doc_wrapper.new = False -        doc_wrapper.dirty = False +        if isinstance(doc_wrapper, MessageWrapper): +            logger.info("unsetting new flag!") +            doc_wrapper.new = False +            doc_wrapper.dirty = False -    def _consume_doc(self, doc_wrapper, deferred): +    @deferred_to_thread +    def _consume_doc(self, doc_wrapper, notify_queue):          """          Consume each document wrapper in a separate thread.          :param doc_wrapper: a MessageWrapper or RecentFlagsDoc instance          :type doc_wrapper: MessageWrapper or RecentFlagsDoc -        :param deferred: a deferred that will be fired when the write operation -                         has finished, either calling its callback or its -                         errback depending on whether it succeed. -        :type deferred: Deferred          """ -        def notifyBack(failed, observer, doc_wrapper): +        def queueNotifyBack(failed, doc_wrapper):              if failed: -                observer.errback(MsgWriteError( -                    "There was an error writing the mesage")) +                log.msg("There was an error writing the mesage...")              else: -                observer.callback(doc_wrapper) +                notify_queue(doc_wrapper) -        def doSoledadCalls(items, observer): +        def doSoledadCalls(items):              # we prime the generator, that should return the              # message or flags wrapper item in the first place.              doc_wrapper = items.next() -            d_sol = self._soledad_write_document_parts(items) -            d_sol.addCallback(notifyBack, observer, doc_wrapper) -            d_sol.addErrback(ebSoledadCalls) - -        def ebSoledadCalls(failure): -            log.msg(failure.getTraceback()) +            failed = self._soledad_write_document_parts(items) +            queueNotifyBack(failed, doc_wrapper) -        d = self._iter_wrapper_subparts(doc_wrapper) -        d.addCallback(doSoledadCalls, deferred) -        d.addErrback(ebSoledadCalls) +        doSoledadCalls(self._iter_wrapper_subparts(doc_wrapper))      #      # SoledadStore specific methods.      # -    @deferred_to_thread      def _soledad_write_document_parts(self, items):          """          Write the document parts to soledad in a separate thread. @@ -314,7 +289,6 @@ class SoledadStore(ContentDedup):                  continue          return failed -    @deferred_to_thread      def _iter_wrapper_subparts(self, doc_wrapper):          """          Return an iterator that will yield the doc_wrapper in the first place, @@ -350,6 +324,12 @@ class SoledadStore(ContentDedup):              if call == self._PUT_DOC_FUN:                  doc_id = item.doc_id                  doc = self._GET_DOC_FUN(doc_id) + +                if doc is None: +                    logger.warning("BUG! Dirty doc but could not " +                                   "find document %s" % (doc_id,)) +                    return +                  doc.content = dict(item.content)                  item = doc | 
