summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKali Kaneko <kali@leap.se>2014-02-07 07:27:38 -0400
committerKali Kaneko <kali@leap.se>2014-02-17 11:39:48 -0400
commitaa0f73ae27714f71bd71eb47b7f0a54b320bec38 (patch)
tree4cd3dc1780471f8432dc28f454667aa56d41d02a
parentf3a6c1933138acdbb69f926e160b25ec3e4097ea (diff)
defer_to_thread the bulk of write operations
and batch the notifications back to the memorystore, within the reactor thread.
-rw-r--r--mail/src/leap/mail/imap/memorystore.py9
-rw-r--r--mail/src/leap/mail/imap/soledadstore.py88
2 files changed, 39 insertions, 58 deletions
diff --git a/mail/src/leap/mail/imap/memorystore.py b/mail/src/leap/mail/imap/memorystore.py
index ee3ee925..786a9c46 100644
--- a/mail/src/leap/mail/imap/memorystore.py
+++ b/mail/src/leap/mail/imap/memorystore.py
@@ -380,7 +380,7 @@ class MemoryStore(object):
if empty(doc_id):
fdoc = self._permanent_store.get_flags_doc(mbox, uid)
- if empty(fdoc.content):
+ if empty(fdoc) or empty(fdoc.content):
return None
doc_id = fdoc.doc_id
self._fdoc_id_store[mbox][uid] = doc_id
@@ -706,9 +706,10 @@ class MemoryStore(object):
:rtype: iterable
"""
fdocs = self._fdoc_store[mbox]
+
return [uid for uid, value
in fdocs.items()
- if fields.SEEN_FLAG not in value["flags"]]
+ if fields.SEEN_FLAG not in value.get(fields.FLAGS_KEY, [])]
def get_cdoc_from_phash(self, phash):
"""
@@ -760,7 +761,7 @@ class MemoryStore(object):
# We want to create a new one in this case.
# Hmmm what if the deletion is un-done?? We would end with a
# duplicate...
- if fdoc and fields.DELETED_FLAG in fdoc[fields.FLAGS_KEY]:
+ if fdoc and fields.DELETED_FLAG in fdoc.get(fields.FLAGS_KEY, []):
return None
uid = fdoc[fields.UID_KEY]
@@ -810,7 +811,7 @@ class MemoryStore(object):
fdocs = self._fdoc_store[mbox]
return [uid for uid, value
in fdocs.items()
- if fields.DELETED_FLAG in value["flags"]]
+ if fields.DELETED_FLAG in value.get(fields.FLAGS_KEY, [])]
# new, dirty flags
diff --git a/mail/src/leap/mail/imap/soledadstore.py b/mail/src/leap/mail/imap/soledadstore.py
index 6cd3749a..e7c6b297 100644
--- a/mail/src/leap/mail/imap/soledadstore.py
+++ b/mail/src/leap/mail/imap/soledadstore.py
@@ -23,7 +23,6 @@ import threading
from itertools import chain
from u1db import errors as u1db_errors
-from twisted.internet import defer
from twisted.python import log
from zope.interface import implements
@@ -35,7 +34,7 @@ from leap.mail.imap.messageparts import RecentFlagsDoc
from leap.mail.imap.fields import fields
from leap.mail.imap.interfaces import IMessageStore
from leap.mail.messageflow import IMessageConsumer
-from leap.mail.utils import first, empty
+from leap.mail.utils import first, empty, accumulator_queue
logger = logging.getLogger(__name__)
@@ -142,12 +141,18 @@ class SoledadStore(ContentDedup):
:param soledad: the soledad instance
:type soledad: Soledad
"""
+ from twisted.internet import reactor
self._soledad = soledad
self._CREATE_DOC_FUN = self._soledad.create_doc
self._PUT_DOC_FUN = self._soledad.put_doc
self._GET_DOC_FUN = self._soledad.get_doc
+ # we instantiate an accumulator to batch the notifications
+ self.docs_notify_queue = accumulator_queue(
+ lambda item: reactor.callFromThread(self._unset_new_dirty, item),
+ 20)
+
# IMessageStore
# -------------------------------------------------------------------
@@ -202,7 +207,10 @@ class SoledadStore(ContentDedup):
# IMessageConsumer
- # It's not thread-safe to defer this to a different thread
+ # TODO should handle the delete case
+ # TODO should handle errors better
+ # TODO could generalize this method into a generic consumer
+ # and only implement `process` here
def consume(self, queue):
"""
@@ -212,38 +220,16 @@ class SoledadStore(ContentDedup):
to be inserted.
:type queue: Queue
"""
- # TODO should handle the delete case
- # TODO should handle errors better
- # TODO could generalize this method into a generic consumer
- # and only implement `process` here
-
from twisted.internet import reactor
- def docWriteCallBack(doc_wrapper):
- """
- Callback for a successful write of a document wrapper.
- """
- if isinstance(doc_wrapper, MessageWrapper):
- # If everything went well, we can unset the new flag
- # in the source store (memory store)
- self._unset_new_dirty(doc_wrapper)
-
- def docWriteErrorBack(failure):
- """
- Errorback for write operations.
- """
- log.msg("ERROR: Error while processing item.")
- log.msg(failure.getTraceback())
-
while not queue.empty():
doc_wrapper = queue.get()
+ reactor.callInThread(self._consume_doc, doc_wrapper,
+ self.docs_notify_queue)
- d = defer.Deferred()
- d.addCallbacks(docWriteCallBack, docWriteErrorBack)
- reactor.callLater(0, self._consume_doc, doc_wrapper, d)
+ # Queue empty, flush the notifications queue.
+ self.docs_notify_queue(None, flush=True)
- # FIXME this should not run the callback in the deferred thred
- @deferred_to_thread
def _unset_new_dirty(self, doc_wrapper):
"""
Unset the `new` and `dirty` flags for this document wrapper in the
@@ -252,49 +238,38 @@ class SoledadStore(ContentDedup):
:param doc_wrapper: a MessageWrapper instance
:type doc_wrapper: MessageWrapper
"""
- # XXX debug msg id/mbox?
- logger.info("unsetting new flag!")
- doc_wrapper.new = False
- doc_wrapper.dirty = False
+ if isinstance(doc_wrapper, MessageWrapper):
+ logger.info("unsetting new flag!")
+ doc_wrapper.new = False
+ doc_wrapper.dirty = False
- def _consume_doc(self, doc_wrapper, deferred):
+ @deferred_to_thread
+ def _consume_doc(self, doc_wrapper, notify_queue):
"""
Consume each document wrapper in a separate thread.
:param doc_wrapper: a MessageWrapper or RecentFlagsDoc instance
:type doc_wrapper: MessageWrapper or RecentFlagsDoc
- :param deferred: a deferred that will be fired when the write operation
- has finished, either calling its callback or its
- errback depending on whether it succeed.
- :type deferred: Deferred
"""
- def notifyBack(failed, observer, doc_wrapper):
+ def queueNotifyBack(failed, doc_wrapper):
if failed:
- observer.errback(MsgWriteError(
- "There was an error writing the mesage"))
+ log.msg("There was an error writing the mesage...")
else:
- observer.callback(doc_wrapper)
+ notify_queue(doc_wrapper)
- def doSoledadCalls(items, observer):
+ def doSoledadCalls(items):
# we prime the generator, that should return the
# message or flags wrapper item in the first place.
doc_wrapper = items.next()
- d_sol = self._soledad_write_document_parts(items)
- d_sol.addCallback(notifyBack, observer, doc_wrapper)
- d_sol.addErrback(ebSoledadCalls)
-
- def ebSoledadCalls(failure):
- log.msg(failure.getTraceback())
+ failed = self._soledad_write_document_parts(items)
+ queueNotifyBack(failed, doc_wrapper)
- d = self._iter_wrapper_subparts(doc_wrapper)
- d.addCallback(doSoledadCalls, deferred)
- d.addErrback(ebSoledadCalls)
+ doSoledadCalls(self._iter_wrapper_subparts(doc_wrapper))
#
# SoledadStore specific methods.
#
- @deferred_to_thread
def _soledad_write_document_parts(self, items):
"""
Write the document parts to soledad in a separate thread.
@@ -314,7 +289,6 @@ class SoledadStore(ContentDedup):
continue
return failed
- @deferred_to_thread
def _iter_wrapper_subparts(self, doc_wrapper):
"""
Return an iterator that will yield the doc_wrapper in the first place,
@@ -350,6 +324,12 @@ class SoledadStore(ContentDedup):
if call == self._PUT_DOC_FUN:
doc_id = item.doc_id
doc = self._GET_DOC_FUN(doc_id)
+
+ if doc is None:
+ logger.warning("BUG! Dirty doc but could not "
+ "find document %s" % (doc_id,))
+ return
+
doc.content = dict(item.content)
item = doc