summaryrefslogtreecommitdiff
path: root/src/leap/mail/imap/soledadstore.py
diff options
context:
space:
mode:
authorKali Kaneko <kali@leap.se>2014-01-31 03:34:03 -0400
committerKali Kaneko <kali@leap.se>2014-01-31 11:24:50 -0400
commit0f6a8e1c83995cffec51e81f626d4bb29d4f7345 (patch)
tree97d068f620403b4042058fcb6d6566c06a910177 /src/leap/mail/imap/soledadstore.py
parentff7de0c9bc760e097c0286d2d62a19095be3f35e (diff)
properly implement deferreds in several commands
Passing along a deferred as an observer whose callback will be called with the proper result. Returning to thread in the appropiate points. just let's remember that twisted APIs are not thread safe! SoledadStore process_item also properly returned to thread. Changed @deferred to @deferred_to_thread so it results less confusing to read. "know the territory". aha!
Diffstat (limited to 'src/leap/mail/imap/soledadstore.py')
-rw-r--r--src/leap/mail/imap/soledadstore.py99
1 files changed, 68 insertions, 31 deletions
diff --git a/src/leap/mail/imap/soledadstore.py b/src/leap/mail/imap/soledadstore.py
index ff5e03b..82f27e7 100644
--- a/src/leap/mail/imap/soledadstore.py
+++ b/src/leap/mail/imap/soledadstore.py
@@ -23,10 +23,12 @@ import threading
from itertools import chain
from u1db import errors as u1db_errors
+from twisted.internet import defer
+from twisted.python import log
from zope.interface import implements
from leap.common.check import leap_assert_type
-from leap.mail.decorators import deferred
+from leap.mail.decorators import deferred_to_thread
from leap.mail.imap.messageparts import MessagePartType
from leap.mail.imap.messageparts import MessageWrapper
from leap.mail.imap.messageparts import RecentFlagsDoc
@@ -209,52 +211,87 @@ class SoledadStore(ContentDedup):
# TODO could generalize this method into a generic consumer
# and only implement `process` here
+ def docWriteCallBack(doc_wrapper):
+ """
+ Callback for a successful write of a document wrapper.
+ """
+ if isinstance(doc_wrapper, MessageWrapper):
+ # If everything went well, we can unset the new flag
+ # in the source store (memory store)
+ self._unset_new_dirty(doc_wrapper)
+
+ def docWriteErrorBack(failure):
+ """
+ Errorback for write operations.
+ """
+ log.error("Error while processing item.")
+ log.msg(failure.getTraceBack())
+
while not queue.empty():
- items = self._process(queue)
+ doc_wrapper = queue.get()
+ d = defer.Deferred()
+ d.addCallbacks(docWriteCallBack, docWriteErrorBack)
+
+ self._consume_doc(doc_wrapper, d)
+
+ @deferred_to_thread
+ def _unset_new_dirty(self, doc_wrapper):
+ """
+ Unset the `new` and `dirty` flags for this document wrapper in the
+ memory store.
+
+ :param doc_wrapper: a MessageWrapper instance
+ :type doc_wrapper: MessageWrapper
+ """
+ # XXX debug msg id/mbox?
+ logger.info("unsetting new flag!")
+ doc_wrapper.new = False
+ doc_wrapper.dirty = False
- # we prime the generator, that should return the
- # message or flags wrapper item in the first place.
- doc_wrapper = items.next()
+ @deferred_to_thread
+ def _consume_doc(self, doc_wrapper, deferred):
+ """
+ Consume each document wrapper in a separate thread.
+
+ :param doc_wrapper:
+ :type doc_wrapper:
+ :param deferred:
+ :type deferred: Deferred
+ """
+ items = self._process(doc_wrapper)
- # From here, we unpack the subpart items and
- # the right soledad call.
+ # we prime the generator, that should return the
+ # message or flags wrapper item in the first place.
+ doc_wrapper = items.next()
+
+ # From here, we unpack the subpart items and
+ # the right soledad call.
+ failed = False
+ for item, call in items:
try:
- failed = False
- for item, call in items:
- try:
- self._try_call(call, item)
- except Exception as exc:
- failed = exc
- continue
- if failed:
- raise MsgWriteError
-
- except MsgWriteError:
- logger.error("Error while processing item.")
- logger.exception(failed)
- else:
- if isinstance(doc_wrapper, MessageWrapper):
- # If everything went well, we can unset the new flag
- # in the source store (memory store)
- logger.info("unsetting new flag!")
- doc_wrapper.new = False
- doc_wrapper.dirty = False
+ self._try_call(call, item)
+ except Exception as exc:
+ failed = exc
+ continue
+ if failed:
+ deferred.errback(MsgWriteError(
+ "There was an error writing the mesage"))
+ else:
+ deferred.callback(doc_wrapper)
#
# SoledadStore specific methods.
#
- def _process(self, queue):
+ def _process(self, doc_wrapper):
"""
- Return an iterator that will yield the msg_wrapper in the first place,
+ Return an iterator that will yield the doc_wrapper in the first place,
followed by the subparts item and the proper call type for every
item in the queue, if any.
:param queue: the queue from where we'll pick item.
:type queue: Queue
"""
- doc_wrapper = queue.get()
-
if isinstance(doc_wrapper, MessageWrapper):
return chain((doc_wrapper,),
self._get_calls_for_msg_parts(doc_wrapper))