diff options
Diffstat (limited to 'src/leap/mail/imap/soledadstore.py')
-rw-r--r-- | src/leap/mail/imap/soledadstore.py | 99 |
1 files changed, 68 insertions, 31 deletions
diff --git a/src/leap/mail/imap/soledadstore.py b/src/leap/mail/imap/soledadstore.py index ff5e03b..82f27e7 100644 --- a/src/leap/mail/imap/soledadstore.py +++ b/src/leap/mail/imap/soledadstore.py @@ -23,10 +23,12 @@ import threading from itertools import chain from u1db import errors as u1db_errors +from twisted.internet import defer +from twisted.python import log from zope.interface import implements from leap.common.check import leap_assert_type -from leap.mail.decorators import deferred +from leap.mail.decorators import deferred_to_thread from leap.mail.imap.messageparts import MessagePartType from leap.mail.imap.messageparts import MessageWrapper from leap.mail.imap.messageparts import RecentFlagsDoc @@ -209,52 +211,87 @@ class SoledadStore(ContentDedup): # TODO could generalize this method into a generic consumer # and only implement `process` here + def docWriteCallBack(doc_wrapper): + """ + Callback for a successful write of a document wrapper. + """ + if isinstance(doc_wrapper, MessageWrapper): + # If everything went well, we can unset the new flag + # in the source store (memory store) + self._unset_new_dirty(doc_wrapper) + + def docWriteErrorBack(failure): + """ + Errorback for write operations. + """ + log.error("Error while processing item.") + log.msg(failure.getTraceBack()) + while not queue.empty(): - items = self._process(queue) + doc_wrapper = queue.get() + d = defer.Deferred() + d.addCallbacks(docWriteCallBack, docWriteErrorBack) + + self._consume_doc(doc_wrapper, d) + + @deferred_to_thread + def _unset_new_dirty(self, doc_wrapper): + """ + Unset the `new` and `dirty` flags for this document wrapper in the + memory store. + + :param doc_wrapper: a MessageWrapper instance + :type doc_wrapper: MessageWrapper + """ + # XXX debug msg id/mbox? + logger.info("unsetting new flag!") + doc_wrapper.new = False + doc_wrapper.dirty = False - # we prime the generator, that should return the - # message or flags wrapper item in the first place. - doc_wrapper = items.next() + @deferred_to_thread + def _consume_doc(self, doc_wrapper, deferred): + """ + Consume each document wrapper in a separate thread. + + :param doc_wrapper: + :type doc_wrapper: + :param deferred: + :type deferred: Deferred + """ + items = self._process(doc_wrapper) - # From here, we unpack the subpart items and - # the right soledad call. + # we prime the generator, that should return the + # message or flags wrapper item in the first place. + doc_wrapper = items.next() + + # From here, we unpack the subpart items and + # the right soledad call. + failed = False + for item, call in items: try: - failed = False - for item, call in items: - try: - self._try_call(call, item) - except Exception as exc: - failed = exc - continue - if failed: - raise MsgWriteError - - except MsgWriteError: - logger.error("Error while processing item.") - logger.exception(failed) - else: - if isinstance(doc_wrapper, MessageWrapper): - # If everything went well, we can unset the new flag - # in the source store (memory store) - logger.info("unsetting new flag!") - doc_wrapper.new = False - doc_wrapper.dirty = False + self._try_call(call, item) + except Exception as exc: + failed = exc + continue + if failed: + deferred.errback(MsgWriteError( + "There was an error writing the mesage")) + else: + deferred.callback(doc_wrapper) # # SoledadStore specific methods. # - def _process(self, queue): + def _process(self, doc_wrapper): """ - Return an iterator that will yield the msg_wrapper in the first place, + Return an iterator that will yield the doc_wrapper in the first place, followed by the subparts item and the proper call type for every item in the queue, if any. :param queue: the queue from where we'll pick item. :type queue: Queue """ - doc_wrapper = queue.get() - if isinstance(doc_wrapper, MessageWrapper): return chain((doc_wrapper,), self._get_calls_for_msg_parts(doc_wrapper)) |