diff options
| author | Kali Kaneko <kali@leap.se> | 2014-02-06 01:39:47 -0400 | 
|---|---|---|
| committer | Kali Kaneko <kali@leap.se> | 2014-02-17 11:37:03 -0400 | 
| commit | 8c3359728b6f403b9932288b5f2df984441b150b (patch) | |
| tree | b32a48e3d237752477170666976e7cbd578b1594 | |
| parent | 553e5e27495f71cb5721b715fcae8561d37cc305 (diff) | |
defer copy and soledad writes
| -rw-r--r-- | mail/src/leap/mail/imap/mailbox.py | 68 | ||||
| -rw-r--r-- | mail/src/leap/mail/imap/soledadstore.py | 61 | 
2 files changed, 81 insertions, 48 deletions
| diff --git a/mail/src/leap/mail/imap/mailbox.py b/mail/src/leap/mail/imap/mailbox.py index d8af0a5..84bfa54 100644 --- a/mail/src/leap/mail/imap/mailbox.py +++ b/mail/src/leap/mail/imap/mailbox.py @@ -447,7 +447,8 @@ class SoledadMailbox(WithMsgFields, MBoxParser):              return          exists = self.getMessageCount()          recent = self.getRecentCount() -        logger.debug("NOTIFY: there are %s messages, %s recent" % ( +        logger.debug("NOTIFY (%r): there are %s messages, %s recent" % ( +            self.mbox,              exists,              recent)) @@ -528,7 +529,6 @@ class SoledadMailbox(WithMsgFields, MBoxParser):          return seq_messg      @deferred_to_thread -    #@profile      def fetch(self, messages_asked, uid):          """          Retrieve one or more messages in this mailbox. @@ -809,6 +809,44 @@ class SoledadMailbox(WithMsgFields, MBoxParser):                           UID of the message          :type observer: Deferred          """ +        memstore = self._memstore + +        def createCopy(result): +            exist, new_fdoc, hdoc = result +            if exist: +                # Should we signal error on the callback? +                logger.warning("Destination message already exists!") + +                # XXX I'm still not clear if we should raise the +                # errback. This actually rases an ugly warning +                # in some muas like thunderbird. I guess the user does +                # not deserve that. +                observer.callback(True) +            else: +                mbox = self.mbox +                uid_next = memstore.increment_last_soledad_uid(mbox) +                new_fdoc[self.UID_KEY] = uid_next +                new_fdoc[self.MBOX_KEY] = mbox + +                # FIXME set recent! + +                self._memstore.create_message( +                    self.mbox, uid_next, +                    MessageWrapper( +                        new_fdoc, hdoc.content), +                    observer=observer, +                    notify_on_disk=False) + +        d = self._get_msg_copy(message) +        d.addCallback(createCopy) +        d.addErrback(lambda f: log.msg(f.getTraceback())) + +    @deferred_to_thread +    def _get_msg_copy(self, message): +        """ +        Get a copy of the fdoc for this message, and check whether +        it already exists. +        """          # XXX  for clarity, this could be delegated to a          # MessageCollection mixin that implements copy too, and          # moved out of here. @@ -822,7 +860,6 @@ class SoledadMailbox(WithMsgFields, MBoxParser):              logger.warning("Tried to copy a MSG with no fdoc")              return          new_fdoc = copy.deepcopy(fdoc.content) -          fdoc_chash = new_fdoc[fields.CONTENT_HASH_KEY]          # XXX is this hitting the db??? --- probably. @@ -830,30 +867,7 @@ class SoledadMailbox(WithMsgFields, MBoxParser):          dest_fdoc = memstore.get_fdoc_from_chash(              fdoc_chash, self.mbox)          exist = dest_fdoc and not empty(dest_fdoc.content) - -        if exist: -            # Should we signal error on the callback? -            logger.warning("Destination message already exists!") - -            # XXX I'm still not clear if we should raise the -            # errback. This actually rases an ugly warning -            # in some muas like thunderbird. I guess the user does -            # not deserve that. -            observer.callback(True) -        else: -            mbox = self.mbox -            uid_next = memstore.increment_last_soledad_uid(mbox) -            new_fdoc[self.UID_KEY] = uid_next -            new_fdoc[self.MBOX_KEY] = mbox - -            # FIXME set recent! - -            self._memstore.create_message( -                self.mbox, uid_next, -                MessageWrapper( -                    new_fdoc, hdoc.content), -                observer=observer, -                notify_on_disk=False) +        return exist, new_fdoc, hdoc      # convenience fun diff --git a/mail/src/leap/mail/imap/soledadstore.py b/mail/src/leap/mail/imap/soledadstore.py index bfa53b6..13f896f 100644 --- a/mail/src/leap/mail/imap/soledadstore.py +++ b/mail/src/leap/mail/imap/soledadstore.py @@ -216,6 +216,8 @@ class SoledadStore(ContentDedup):          # TODO could generalize this method into a generic consumer          # and only implement `process` here +        from twisted.internet import reactor +          def docWriteCallBack(doc_wrapper):              """              Callback for a successful write of a document wrapper. @@ -234,10 +236,10 @@ class SoledadStore(ContentDedup):          while not queue.empty():              doc_wrapper = queue.get() +              d = defer.Deferred()              d.addCallbacks(docWriteCallBack, docWriteErrorBack) - -            self._consume_doc(doc_wrapper, d) +            reactor.callLater(0, self._consume_doc, doc_wrapper, d)      # FIXME this should not run the callback in the deferred thred      @deferred_to_thread @@ -254,8 +256,6 @@ class SoledadStore(ContentDedup):          doc_wrapper.new = False          doc_wrapper.dirty = False -    # FIXME this should not run the callback in the deferred thred -    #@deferred_to_thread      def _consume_doc(self, doc_wrapper, deferred):          """          Consume each document wrapper in a separate thread. @@ -267,33 +267,52 @@ class SoledadStore(ContentDedup):                           errback depending on whether it succeed.          :type deferred: Deferred          """ -        items = self._process(doc_wrapper) +        def notifyBack(failed, observer, doc_wrapper): +            if failed: +                observer.errback(MsgWriteError( +                    "There was an error writing the mesage")) +            else: +                observer.callback(doc_wrapper) + +        def doSoledadCalls(items, observer): +            # we prime the generator, that should return the +            # message or flags wrapper item in the first place. +            doc_wrapper = items.next() +            d_sol = self._soledad_write_document_parts(items) +            d_sol.addCallback(notifyBack, observer, doc_wrapper) +            d_sol.addErrback(ebSoledadCalls) -        # we prime the generator, that should return the -        # message or flags wrapper item in the first place. -        doc_wrapper = items.next() +        def ebSoledadCalls(failure): +            log.msg(failure.getTraceback()) + +        d = self._iter_wrapper_subparts(doc_wrapper) +        d.addCallback(doSoledadCalls, deferred) +        d.addErrback(ebSoledadCalls) + +    # +    # SoledadStore specific methods. +    # -        # From here, we unpack the subpart items and -        # the right soledad call. +    @deferred_to_thread +    def _soledad_write_document_parts(self, items): +        """ +        Write the document parts to soledad in a separate thread. +        :param items: the iterator through the different document wrappers +                      payloads. +        :type items: iterator +        """          failed = False          for item, call in items:              try:                  self._try_call(call, item)              except Exception as exc:                  logger.exception(exc) -                failed = exc +                failed = True                  continue -        if failed: -            deferred.errback(MsgWriteError( -                "There was an error writing the mesage")) -        else: -            deferred.callback(doc_wrapper) +        return failed -    # -    # SoledadStore specific methods. -    # - -    def _process(self, doc_wrapper): +    @deferred_to_thread +    def _iter_wrapper_subparts(self, doc_wrapper):          """          Return an iterator that will yield the doc_wrapper in the first place,          followed by the subparts item and the proper call type for every | 
