From 860e407ba0a86be30865a77ec29c6ecacf7899a4 Mon Sep 17 00:00:00 2001 From: Kali Kaneko Date: Thu, 6 Feb 2014 01:39:47 -0400 Subject: defer copy and soledad writes --- src/leap/mail/imap/soledadstore.py | 61 +++++++++++++++++++++++++------------- 1 file changed, 40 insertions(+), 21 deletions(-) (limited to 'src/leap/mail/imap/soledadstore.py') diff --git a/src/leap/mail/imap/soledadstore.py b/src/leap/mail/imap/soledadstore.py index bfa53b6..13f896f 100644 --- a/src/leap/mail/imap/soledadstore.py +++ b/src/leap/mail/imap/soledadstore.py @@ -216,6 +216,8 @@ class SoledadStore(ContentDedup): # TODO could generalize this method into a generic consumer # and only implement `process` here + from twisted.internet import reactor + def docWriteCallBack(doc_wrapper): """ Callback for a successful write of a document wrapper. @@ -234,10 +236,10 @@ class SoledadStore(ContentDedup): while not queue.empty(): doc_wrapper = queue.get() + d = defer.Deferred() d.addCallbacks(docWriteCallBack, docWriteErrorBack) - - self._consume_doc(doc_wrapper, d) + reactor.callLater(0, self._consume_doc, doc_wrapper, d) # FIXME this should not run the callback in the deferred thred @deferred_to_thread @@ -254,8 +256,6 @@ class SoledadStore(ContentDedup): doc_wrapper.new = False doc_wrapper.dirty = False - # FIXME this should not run the callback in the deferred thred - #@deferred_to_thread def _consume_doc(self, doc_wrapper, deferred): """ Consume each document wrapper in a separate thread. @@ -267,33 +267,52 @@ class SoledadStore(ContentDedup): errback depending on whether it succeed. :type deferred: Deferred """ - items = self._process(doc_wrapper) + def notifyBack(failed, observer, doc_wrapper): + if failed: + observer.errback(MsgWriteError( + "There was an error writing the mesage")) + else: + observer.callback(doc_wrapper) + + def doSoledadCalls(items, observer): + # we prime the generator, that should return the + # message or flags wrapper item in the first place. + doc_wrapper = items.next() + d_sol = self._soledad_write_document_parts(items) + d_sol.addCallback(notifyBack, observer, doc_wrapper) + d_sol.addErrback(ebSoledadCalls) - # we prime the generator, that should return the - # message or flags wrapper item in the first place. - doc_wrapper = items.next() + def ebSoledadCalls(failure): + log.msg(failure.getTraceback()) + + d = self._iter_wrapper_subparts(doc_wrapper) + d.addCallback(doSoledadCalls, deferred) + d.addErrback(ebSoledadCalls) + + # + # SoledadStore specific methods. + # - # From here, we unpack the subpart items and - # the right soledad call. + @deferred_to_thread + def _soledad_write_document_parts(self, items): + """ + Write the document parts to soledad in a separate thread. + :param items: the iterator through the different document wrappers + payloads. + :type items: iterator + """ failed = False for item, call in items: try: self._try_call(call, item) except Exception as exc: logger.exception(exc) - failed = exc + failed = True continue - if failed: - deferred.errback(MsgWriteError( - "There was an error writing the mesage")) - else: - deferred.callback(doc_wrapper) + return failed - # - # SoledadStore specific methods. - # - - def _process(self, doc_wrapper): + @deferred_to_thread + def _iter_wrapper_subparts(self, doc_wrapper): """ Return an iterator that will yield the doc_wrapper in the first place, followed by the subparts item and the proper call type for every -- cgit v1.2.3