From 9ffcaa09c2d6a57f3f34350298eff8412b540bc9 Mon Sep 17 00:00:00 2001 From: Kali Kaneko Date: Fri, 7 Feb 2014 07:27:38 -0400 Subject: defer_to_thread the bulk of write operations and batch the notifications back to the memorystore, within the reactor thread. --- src/leap/mail/imap/memorystore.py | 9 ++-- src/leap/mail/imap/soledadstore.py | 88 +++++++++++++++----------------------- 2 files changed, 39 insertions(+), 58 deletions(-) (limited to 'src/leap/mail') diff --git a/src/leap/mail/imap/memorystore.py b/src/leap/mail/imap/memorystore.py index ee3ee92..786a9c4 100644 --- a/src/leap/mail/imap/memorystore.py +++ b/src/leap/mail/imap/memorystore.py @@ -380,7 +380,7 @@ class MemoryStore(object): if empty(doc_id): fdoc = self._permanent_store.get_flags_doc(mbox, uid) - if empty(fdoc.content): + if empty(fdoc) or empty(fdoc.content): return None doc_id = fdoc.doc_id self._fdoc_id_store[mbox][uid] = doc_id @@ -706,9 +706,10 @@ class MemoryStore(object): :rtype: iterable """ fdocs = self._fdoc_store[mbox] + return [uid for uid, value in fdocs.items() - if fields.SEEN_FLAG not in value["flags"]] + if fields.SEEN_FLAG not in value.get(fields.FLAGS_KEY, [])] def get_cdoc_from_phash(self, phash): """ @@ -760,7 +761,7 @@ class MemoryStore(object): # We want to create a new one in this case. # Hmmm what if the deletion is un-done?? We would end with a # duplicate... - if fdoc and fields.DELETED_FLAG in fdoc[fields.FLAGS_KEY]: + if fdoc and fields.DELETED_FLAG in fdoc.get(fields.FLAGS_KEY, []): return None uid = fdoc[fields.UID_KEY] @@ -810,7 +811,7 @@ class MemoryStore(object): fdocs = self._fdoc_store[mbox] return [uid for uid, value in fdocs.items() - if fields.DELETED_FLAG in value["flags"]] + if fields.DELETED_FLAG in value.get(fields.FLAGS_KEY, [])] # new, dirty flags diff --git a/src/leap/mail/imap/soledadstore.py b/src/leap/mail/imap/soledadstore.py index 6cd3749..e7c6b29 100644 --- a/src/leap/mail/imap/soledadstore.py +++ b/src/leap/mail/imap/soledadstore.py @@ -23,7 +23,6 @@ import threading from itertools import chain from u1db import errors as u1db_errors -from twisted.internet import defer from twisted.python import log from zope.interface import implements @@ -35,7 +34,7 @@ from leap.mail.imap.messageparts import RecentFlagsDoc from leap.mail.imap.fields import fields from leap.mail.imap.interfaces import IMessageStore from leap.mail.messageflow import IMessageConsumer -from leap.mail.utils import first, empty +from leap.mail.utils import first, empty, accumulator_queue logger = logging.getLogger(__name__) @@ -142,12 +141,18 @@ class SoledadStore(ContentDedup): :param soledad: the soledad instance :type soledad: Soledad """ + from twisted.internet import reactor self._soledad = soledad self._CREATE_DOC_FUN = self._soledad.create_doc self._PUT_DOC_FUN = self._soledad.put_doc self._GET_DOC_FUN = self._soledad.get_doc + # we instantiate an accumulator to batch the notifications + self.docs_notify_queue = accumulator_queue( + lambda item: reactor.callFromThread(self._unset_new_dirty, item), + 20) + # IMessageStore # ------------------------------------------------------------------- @@ -202,7 +207,10 @@ class SoledadStore(ContentDedup): # IMessageConsumer - # It's not thread-safe to defer this to a different thread + # TODO should handle the delete case + # TODO should handle errors better + # TODO could generalize this method into a generic consumer + # and only implement `process` here def consume(self, queue): """ @@ -212,38 +220,16 @@ class SoledadStore(ContentDedup): to be inserted. :type queue: Queue """ - # TODO should handle the delete case - # TODO should handle errors better - # TODO could generalize this method into a generic consumer - # and only implement `process` here - from twisted.internet import reactor - def docWriteCallBack(doc_wrapper): - """ - Callback for a successful write of a document wrapper. - """ - if isinstance(doc_wrapper, MessageWrapper): - # If everything went well, we can unset the new flag - # in the source store (memory store) - self._unset_new_dirty(doc_wrapper) - - def docWriteErrorBack(failure): - """ - Errorback for write operations. - """ - log.msg("ERROR: Error while processing item.") - log.msg(failure.getTraceback()) - while not queue.empty(): doc_wrapper = queue.get() + reactor.callInThread(self._consume_doc, doc_wrapper, + self.docs_notify_queue) - d = defer.Deferred() - d.addCallbacks(docWriteCallBack, docWriteErrorBack) - reactor.callLater(0, self._consume_doc, doc_wrapper, d) + # Queue empty, flush the notifications queue. + self.docs_notify_queue(None, flush=True) - # FIXME this should not run the callback in the deferred thred - @deferred_to_thread def _unset_new_dirty(self, doc_wrapper): """ Unset the `new` and `dirty` flags for this document wrapper in the @@ -252,49 +238,38 @@ class SoledadStore(ContentDedup): :param doc_wrapper: a MessageWrapper instance :type doc_wrapper: MessageWrapper """ - # XXX debug msg id/mbox? - logger.info("unsetting new flag!") - doc_wrapper.new = False - doc_wrapper.dirty = False + if isinstance(doc_wrapper, MessageWrapper): + logger.info("unsetting new flag!") + doc_wrapper.new = False + doc_wrapper.dirty = False - def _consume_doc(self, doc_wrapper, deferred): + @deferred_to_thread + def _consume_doc(self, doc_wrapper, notify_queue): """ Consume each document wrapper in a separate thread. :param doc_wrapper: a MessageWrapper or RecentFlagsDoc instance :type doc_wrapper: MessageWrapper or RecentFlagsDoc - :param deferred: a deferred that will be fired when the write operation - has finished, either calling its callback or its - errback depending on whether it succeed. - :type deferred: Deferred """ - def notifyBack(failed, observer, doc_wrapper): + def queueNotifyBack(failed, doc_wrapper): if failed: - observer.errback(MsgWriteError( - "There was an error writing the mesage")) + log.msg("There was an error writing the mesage...") else: - observer.callback(doc_wrapper) + notify_queue(doc_wrapper) - def doSoledadCalls(items, observer): + def doSoledadCalls(items): # we prime the generator, that should return the # message or flags wrapper item in the first place. doc_wrapper = items.next() - d_sol = self._soledad_write_document_parts(items) - d_sol.addCallback(notifyBack, observer, doc_wrapper) - d_sol.addErrback(ebSoledadCalls) - - def ebSoledadCalls(failure): - log.msg(failure.getTraceback()) + failed = self._soledad_write_document_parts(items) + queueNotifyBack(failed, doc_wrapper) - d = self._iter_wrapper_subparts(doc_wrapper) - d.addCallback(doSoledadCalls, deferred) - d.addErrback(ebSoledadCalls) + doSoledadCalls(self._iter_wrapper_subparts(doc_wrapper)) # # SoledadStore specific methods. # - @deferred_to_thread def _soledad_write_document_parts(self, items): """ Write the document parts to soledad in a separate thread. @@ -314,7 +289,6 @@ class SoledadStore(ContentDedup): continue return failed - @deferred_to_thread def _iter_wrapper_subparts(self, doc_wrapper): """ Return an iterator that will yield the doc_wrapper in the first place, @@ -350,6 +324,12 @@ class SoledadStore(ContentDedup): if call == self._PUT_DOC_FUN: doc_id = item.doc_id doc = self._GET_DOC_FUN(doc_id) + + if doc is None: + logger.warning("BUG! Dirty doc but could not " + "find document %s" % (doc_id,)) + return + doc.content = dict(item.content) item = doc -- cgit v1.2.3