summaryrefslogtreecommitdiff
path: root/src/leap/mail/imap/soledadstore.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/leap/mail/imap/soledadstore.py')
-rw-r--r--src/leap/mail/imap/soledadstore.py99
1 files changed, 68 insertions, 31 deletions
diff --git a/src/leap/mail/imap/soledadstore.py b/src/leap/mail/imap/soledadstore.py
index ff5e03b..82f27e7 100644
--- a/src/leap/mail/imap/soledadstore.py
+++ b/src/leap/mail/imap/soledadstore.py
@@ -23,10 +23,12 @@ import threading
from itertools import chain
from u1db import errors as u1db_errors
+from twisted.internet import defer
+from twisted.python import log
from zope.interface import implements
from leap.common.check import leap_assert_type
-from leap.mail.decorators import deferred
+from leap.mail.decorators import deferred_to_thread
from leap.mail.imap.messageparts import MessagePartType
from leap.mail.imap.messageparts import MessageWrapper
from leap.mail.imap.messageparts import RecentFlagsDoc
@@ -209,52 +211,87 @@ class SoledadStore(ContentDedup):
# TODO could generalize this method into a generic consumer
# and only implement `process` here
+ def docWriteCallBack(doc_wrapper):
+ """
+ Callback for a successful write of a document wrapper.
+ """
+ if isinstance(doc_wrapper, MessageWrapper):
+ # If everything went well, we can unset the new flag
+ # in the source store (memory store)
+ self._unset_new_dirty(doc_wrapper)
+
+ def docWriteErrorBack(failure):
+ """
+ Errorback for write operations.
+ """
+ log.error("Error while processing item.")
+ log.msg(failure.getTraceBack())
+
while not queue.empty():
- items = self._process(queue)
+ doc_wrapper = queue.get()
+ d = defer.Deferred()
+ d.addCallbacks(docWriteCallBack, docWriteErrorBack)
+
+ self._consume_doc(doc_wrapper, d)
+
+ @deferred_to_thread
+ def _unset_new_dirty(self, doc_wrapper):
+ """
+ Unset the `new` and `dirty` flags for this document wrapper in the
+ memory store.
+
+ :param doc_wrapper: a MessageWrapper instance
+ :type doc_wrapper: MessageWrapper
+ """
+ # XXX debug msg id/mbox?
+ logger.info("unsetting new flag!")
+ doc_wrapper.new = False
+ doc_wrapper.dirty = False
- # we prime the generator, that should return the
- # message or flags wrapper item in the first place.
- doc_wrapper = items.next()
+ @deferred_to_thread
+ def _consume_doc(self, doc_wrapper, deferred):
+ """
+ Consume each document wrapper in a separate thread.
+
+ :param doc_wrapper:
+ :type doc_wrapper:
+ :param deferred:
+ :type deferred: Deferred
+ """
+ items = self._process(doc_wrapper)
- # From here, we unpack the subpart items and
- # the right soledad call.
+ # we prime the generator, that should return the
+ # message or flags wrapper item in the first place.
+ doc_wrapper = items.next()
+
+ # From here, we unpack the subpart items and
+ # the right soledad call.
+ failed = False
+ for item, call in items:
try:
- failed = False
- for item, call in items:
- try:
- self._try_call(call, item)
- except Exception as exc:
- failed = exc
- continue
- if failed:
- raise MsgWriteError
-
- except MsgWriteError:
- logger.error("Error while processing item.")
- logger.exception(failed)
- else:
- if isinstance(doc_wrapper, MessageWrapper):
- # If everything went well, we can unset the new flag
- # in the source store (memory store)
- logger.info("unsetting new flag!")
- doc_wrapper.new = False
- doc_wrapper.dirty = False
+ self._try_call(call, item)
+ except Exception as exc:
+ failed = exc
+ continue
+ if failed:
+ deferred.errback(MsgWriteError(
+ "There was an error writing the mesage"))
+ else:
+ deferred.callback(doc_wrapper)
#
# SoledadStore specific methods.
#
- def _process(self, queue):
+ def _process(self, doc_wrapper):
"""
- Return an iterator that will yield the msg_wrapper in the first place,
+ Return an iterator that will yield the doc_wrapper in the first place,
followed by the subparts item and the proper call type for every
item in the queue, if any.
:param queue: the queue from where we'll pick item.
:type queue: Queue
"""
- doc_wrapper = queue.get()
-
if isinstance(doc_wrapper, MessageWrapper):
return chain((doc_wrapper,),
self._get_calls_for_msg_parts(doc_wrapper))