summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorKali Kaneko <kali@leap.se>2014-02-06 01:39:47 -0400
committerKali Kaneko <kali@leap.se>2014-02-17 11:37:03 -0400
commit860e407ba0a86be30865a77ec29c6ecacf7899a4 (patch)
tree84415a8b5d2e79e6488b6a65471f662cf4f43c88 /src
parent06556ec6dc56a4859736fc2782779ee2eb9c1f55 (diff)
defer copy and soledad writes
Diffstat (limited to 'src')
-rw-r--r--src/leap/mail/imap/mailbox.py68
-rw-r--r--src/leap/mail/imap/soledadstore.py61
2 files changed, 81 insertions, 48 deletions
diff --git a/src/leap/mail/imap/mailbox.py b/src/leap/mail/imap/mailbox.py
index d8af0a5..84bfa54 100644
--- a/src/leap/mail/imap/mailbox.py
+++ b/src/leap/mail/imap/mailbox.py
@@ -447,7 +447,8 @@ class SoledadMailbox(WithMsgFields, MBoxParser):
return
exists = self.getMessageCount()
recent = self.getRecentCount()
- logger.debug("NOTIFY: there are %s messages, %s recent" % (
+ logger.debug("NOTIFY (%r): there are %s messages, %s recent" % (
+ self.mbox,
exists,
recent))
@@ -528,7 +529,6 @@ class SoledadMailbox(WithMsgFields, MBoxParser):
return seq_messg
@deferred_to_thread
- #@profile
def fetch(self, messages_asked, uid):
"""
Retrieve one or more messages in this mailbox.
@@ -809,6 +809,44 @@ class SoledadMailbox(WithMsgFields, MBoxParser):
UID of the message
:type observer: Deferred
"""
+ memstore = self._memstore
+
+ def createCopy(result):
+ exist, new_fdoc, hdoc = result
+ if exist:
+ # Should we signal error on the callback?
+ logger.warning("Destination message already exists!")
+
+ # XXX I'm still not clear if we should raise the
+ # errback. This actually rases an ugly warning
+ # in some muas like thunderbird. I guess the user does
+ # not deserve that.
+ observer.callback(True)
+ else:
+ mbox = self.mbox
+ uid_next = memstore.increment_last_soledad_uid(mbox)
+ new_fdoc[self.UID_KEY] = uid_next
+ new_fdoc[self.MBOX_KEY] = mbox
+
+ # FIXME set recent!
+
+ self._memstore.create_message(
+ self.mbox, uid_next,
+ MessageWrapper(
+ new_fdoc, hdoc.content),
+ observer=observer,
+ notify_on_disk=False)
+
+ d = self._get_msg_copy(message)
+ d.addCallback(createCopy)
+ d.addErrback(lambda f: log.msg(f.getTraceback()))
+
+ @deferred_to_thread
+ def _get_msg_copy(self, message):
+ """
+ Get a copy of the fdoc for this message, and check whether
+ it already exists.
+ """
# XXX for clarity, this could be delegated to a
# MessageCollection mixin that implements copy too, and
# moved out of here.
@@ -822,7 +860,6 @@ class SoledadMailbox(WithMsgFields, MBoxParser):
logger.warning("Tried to copy a MSG with no fdoc")
return
new_fdoc = copy.deepcopy(fdoc.content)
-
fdoc_chash = new_fdoc[fields.CONTENT_HASH_KEY]
# XXX is this hitting the db??? --- probably.
@@ -830,30 +867,7 @@ class SoledadMailbox(WithMsgFields, MBoxParser):
dest_fdoc = memstore.get_fdoc_from_chash(
fdoc_chash, self.mbox)
exist = dest_fdoc and not empty(dest_fdoc.content)
-
- if exist:
- # Should we signal error on the callback?
- logger.warning("Destination message already exists!")
-
- # XXX I'm still not clear if we should raise the
- # errback. This actually rases an ugly warning
- # in some muas like thunderbird. I guess the user does
- # not deserve that.
- observer.callback(True)
- else:
- mbox = self.mbox
- uid_next = memstore.increment_last_soledad_uid(mbox)
- new_fdoc[self.UID_KEY] = uid_next
- new_fdoc[self.MBOX_KEY] = mbox
-
- # FIXME set recent!
-
- self._memstore.create_message(
- self.mbox, uid_next,
- MessageWrapper(
- new_fdoc, hdoc.content),
- observer=observer,
- notify_on_disk=False)
+ return exist, new_fdoc, hdoc
# convenience fun
diff --git a/src/leap/mail/imap/soledadstore.py b/src/leap/mail/imap/soledadstore.py
index bfa53b6..13f896f 100644
--- a/src/leap/mail/imap/soledadstore.py
+++ b/src/leap/mail/imap/soledadstore.py
@@ -216,6 +216,8 @@ class SoledadStore(ContentDedup):
# TODO could generalize this method into a generic consumer
# and only implement `process` here
+ from twisted.internet import reactor
+
def docWriteCallBack(doc_wrapper):
"""
Callback for a successful write of a document wrapper.
@@ -234,10 +236,10 @@ class SoledadStore(ContentDedup):
while not queue.empty():
doc_wrapper = queue.get()
+
d = defer.Deferred()
d.addCallbacks(docWriteCallBack, docWriteErrorBack)
-
- self._consume_doc(doc_wrapper, d)
+ reactor.callLater(0, self._consume_doc, doc_wrapper, d)
# FIXME this should not run the callback in the deferred thred
@deferred_to_thread
@@ -254,8 +256,6 @@ class SoledadStore(ContentDedup):
doc_wrapper.new = False
doc_wrapper.dirty = False
- # FIXME this should not run the callback in the deferred thred
- #@deferred_to_thread
def _consume_doc(self, doc_wrapper, deferred):
"""
Consume each document wrapper in a separate thread.
@@ -267,33 +267,52 @@ class SoledadStore(ContentDedup):
errback depending on whether it succeed.
:type deferred: Deferred
"""
- items = self._process(doc_wrapper)
+ def notifyBack(failed, observer, doc_wrapper):
+ if failed:
+ observer.errback(MsgWriteError(
+ "There was an error writing the mesage"))
+ else:
+ observer.callback(doc_wrapper)
+
+ def doSoledadCalls(items, observer):
+ # we prime the generator, that should return the
+ # message or flags wrapper item in the first place.
+ doc_wrapper = items.next()
+ d_sol = self._soledad_write_document_parts(items)
+ d_sol.addCallback(notifyBack, observer, doc_wrapper)
+ d_sol.addErrback(ebSoledadCalls)
- # we prime the generator, that should return the
- # message or flags wrapper item in the first place.
- doc_wrapper = items.next()
+ def ebSoledadCalls(failure):
+ log.msg(failure.getTraceback())
+
+ d = self._iter_wrapper_subparts(doc_wrapper)
+ d.addCallback(doSoledadCalls, deferred)
+ d.addErrback(ebSoledadCalls)
+
+ #
+ # SoledadStore specific methods.
+ #
- # From here, we unpack the subpart items and
- # the right soledad call.
+ @deferred_to_thread
+ def _soledad_write_document_parts(self, items):
+ """
+ Write the document parts to soledad in a separate thread.
+ :param items: the iterator through the different document wrappers
+ payloads.
+ :type items: iterator
+ """
failed = False
for item, call in items:
try:
self._try_call(call, item)
except Exception as exc:
logger.exception(exc)
- failed = exc
+ failed = True
continue
- if failed:
- deferred.errback(MsgWriteError(
- "There was an error writing the mesage"))
- else:
- deferred.callback(doc_wrapper)
+ return failed
- #
- # SoledadStore specific methods.
- #
-
- def _process(self, doc_wrapper):
+ @deferred_to_thread
+ def _iter_wrapper_subparts(self, doc_wrapper):
"""
Return an iterator that will yield the doc_wrapper in the first place,
followed by the subparts item and the proper call type for every