summaryrefslogtreecommitdiff
path: root/src/leap/mail/imap/soledadstore.py
diff options
context:
space:
mode:
authorKali Kaneko <kali@leap.se>2014-02-06 01:39:47 -0400
committerKali Kaneko <kali@leap.se>2014-02-17 11:37:03 -0400
commit860e407ba0a86be30865a77ec29c6ecacf7899a4 (patch)
tree84415a8b5d2e79e6488b6a65471f662cf4f43c88 /src/leap/mail/imap/soledadstore.py
parent06556ec6dc56a4859736fc2782779ee2eb9c1f55 (diff)
defer copy and soledad writes
Diffstat (limited to 'src/leap/mail/imap/soledadstore.py')
-rw-r--r--src/leap/mail/imap/soledadstore.py61
1 files changed, 40 insertions, 21 deletions
diff --git a/src/leap/mail/imap/soledadstore.py b/src/leap/mail/imap/soledadstore.py
index bfa53b6..13f896f 100644
--- a/src/leap/mail/imap/soledadstore.py
+++ b/src/leap/mail/imap/soledadstore.py
@@ -216,6 +216,8 @@ class SoledadStore(ContentDedup):
# TODO could generalize this method into a generic consumer
# and only implement `process` here
+ from twisted.internet import reactor
+
def docWriteCallBack(doc_wrapper):
"""
Callback for a successful write of a document wrapper.
@@ -234,10 +236,10 @@ class SoledadStore(ContentDedup):
while not queue.empty():
doc_wrapper = queue.get()
+
d = defer.Deferred()
d.addCallbacks(docWriteCallBack, docWriteErrorBack)
-
- self._consume_doc(doc_wrapper, d)
+ reactor.callLater(0, self._consume_doc, doc_wrapper, d)
# FIXME this should not run the callback in the deferred thred
@deferred_to_thread
@@ -254,8 +256,6 @@ class SoledadStore(ContentDedup):
doc_wrapper.new = False
doc_wrapper.dirty = False
- # FIXME this should not run the callback in the deferred thred
- #@deferred_to_thread
def _consume_doc(self, doc_wrapper, deferred):
"""
Consume each document wrapper in a separate thread.
@@ -267,33 +267,52 @@ class SoledadStore(ContentDedup):
errback depending on whether it succeed.
:type deferred: Deferred
"""
- items = self._process(doc_wrapper)
+ def notifyBack(failed, observer, doc_wrapper):
+ if failed:
+ observer.errback(MsgWriteError(
+ "There was an error writing the mesage"))
+ else:
+ observer.callback(doc_wrapper)
+
+ def doSoledadCalls(items, observer):
+ # we prime the generator, that should return the
+ # message or flags wrapper item in the first place.
+ doc_wrapper = items.next()
+ d_sol = self._soledad_write_document_parts(items)
+ d_sol.addCallback(notifyBack, observer, doc_wrapper)
+ d_sol.addErrback(ebSoledadCalls)
- # we prime the generator, that should return the
- # message or flags wrapper item in the first place.
- doc_wrapper = items.next()
+ def ebSoledadCalls(failure):
+ log.msg(failure.getTraceback())
+
+ d = self._iter_wrapper_subparts(doc_wrapper)
+ d.addCallback(doSoledadCalls, deferred)
+ d.addErrback(ebSoledadCalls)
+
+ #
+ # SoledadStore specific methods.
+ #
- # From here, we unpack the subpart items and
- # the right soledad call.
+ @deferred_to_thread
+ def _soledad_write_document_parts(self, items):
+ """
+ Write the document parts to soledad in a separate thread.
+ :param items: the iterator through the different document wrappers
+ payloads.
+ :type items: iterator
+ """
failed = False
for item, call in items:
try:
self._try_call(call, item)
except Exception as exc:
logger.exception(exc)
- failed = exc
+ failed = True
continue
- if failed:
- deferred.errback(MsgWriteError(
- "There was an error writing the mesage"))
- else:
- deferred.callback(doc_wrapper)
+ return failed
- #
- # SoledadStore specific methods.
- #
-
- def _process(self, doc_wrapper):
+ @deferred_to_thread
+ def _iter_wrapper_subparts(self, doc_wrapper):
"""
Return an iterator that will yield the doc_wrapper in the first place,
followed by the subparts item and the proper call type for every