From 0f6a8e1c83995cffec51e81f626d4bb29d4f7345 Mon Sep 17 00:00:00 2001 From: Kali Kaneko Date: Fri, 31 Jan 2014 03:34:03 -0400 Subject: properly implement deferreds in several commands Passing along a deferred as an observer whose callback will be called with the proper result. Returning to thread in the appropiate points. just let's remember that twisted APIs are not thread safe! SoledadStore process_item also properly returned to thread. Changed @deferred to @deferred_to_thread so it results less confusing to read. "know the territory". aha! --- src/leap/mail/imap/soledadstore.py | 99 ++++++++++++++++++++++++++------------ 1 file changed, 68 insertions(+), 31 deletions(-) (limited to 'src/leap/mail/imap/soledadstore.py') diff --git a/src/leap/mail/imap/soledadstore.py b/src/leap/mail/imap/soledadstore.py index ff5e03b..82f27e7 100644 --- a/src/leap/mail/imap/soledadstore.py +++ b/src/leap/mail/imap/soledadstore.py @@ -23,10 +23,12 @@ import threading from itertools import chain from u1db import errors as u1db_errors +from twisted.internet import defer +from twisted.python import log from zope.interface import implements from leap.common.check import leap_assert_type -from leap.mail.decorators import deferred +from leap.mail.decorators import deferred_to_thread from leap.mail.imap.messageparts import MessagePartType from leap.mail.imap.messageparts import MessageWrapper from leap.mail.imap.messageparts import RecentFlagsDoc @@ -209,52 +211,87 @@ class SoledadStore(ContentDedup): # TODO could generalize this method into a generic consumer # and only implement `process` here + def docWriteCallBack(doc_wrapper): + """ + Callback for a successful write of a document wrapper. + """ + if isinstance(doc_wrapper, MessageWrapper): + # If everything went well, we can unset the new flag + # in the source store (memory store) + self._unset_new_dirty(doc_wrapper) + + def docWriteErrorBack(failure): + """ + Errorback for write operations. + """ + log.error("Error while processing item.") + log.msg(failure.getTraceBack()) + while not queue.empty(): - items = self._process(queue) + doc_wrapper = queue.get() + d = defer.Deferred() + d.addCallbacks(docWriteCallBack, docWriteErrorBack) + + self._consume_doc(doc_wrapper, d) + + @deferred_to_thread + def _unset_new_dirty(self, doc_wrapper): + """ + Unset the `new` and `dirty` flags for this document wrapper in the + memory store. + + :param doc_wrapper: a MessageWrapper instance + :type doc_wrapper: MessageWrapper + """ + # XXX debug msg id/mbox? + logger.info("unsetting new flag!") + doc_wrapper.new = False + doc_wrapper.dirty = False - # we prime the generator, that should return the - # message or flags wrapper item in the first place. - doc_wrapper = items.next() + @deferred_to_thread + def _consume_doc(self, doc_wrapper, deferred): + """ + Consume each document wrapper in a separate thread. + + :param doc_wrapper: + :type doc_wrapper: + :param deferred: + :type deferred: Deferred + """ + items = self._process(doc_wrapper) - # From here, we unpack the subpart items and - # the right soledad call. + # we prime the generator, that should return the + # message or flags wrapper item in the first place. + doc_wrapper = items.next() + + # From here, we unpack the subpart items and + # the right soledad call. + failed = False + for item, call in items: try: - failed = False - for item, call in items: - try: - self._try_call(call, item) - except Exception as exc: - failed = exc - continue - if failed: - raise MsgWriteError - - except MsgWriteError: - logger.error("Error while processing item.") - logger.exception(failed) - else: - if isinstance(doc_wrapper, MessageWrapper): - # If everything went well, we can unset the new flag - # in the source store (memory store) - logger.info("unsetting new flag!") - doc_wrapper.new = False - doc_wrapper.dirty = False + self._try_call(call, item) + except Exception as exc: + failed = exc + continue + if failed: + deferred.errback(MsgWriteError( + "There was an error writing the mesage")) + else: + deferred.callback(doc_wrapper) # # SoledadStore specific methods. # - def _process(self, queue): + def _process(self, doc_wrapper): """ - Return an iterator that will yield the msg_wrapper in the first place, + Return an iterator that will yield the doc_wrapper in the first place, followed by the subparts item and the proper call type for every item in the queue, if any. :param queue: the queue from where we'll pick item. :type queue: Queue """ - doc_wrapper = queue.get() - if isinstance(doc_wrapper, MessageWrapper): return chain((doc_wrapper,), self._get_calls_for_msg_parts(doc_wrapper)) -- cgit v1.2.3