diff options
author | Tomás Touceda <chiiph@leap.se> | 2014-02-17 12:45:19 -0300 |
---|---|---|
committer | Tomás Touceda <chiiph@leap.se> | 2014-02-17 12:45:19 -0300 |
commit | 32e3c5ddaa5df30a573762e273f3a12f7eb3c255 (patch) | |
tree | 0924be918d990628e73eb2059fb6eb1200748b7c /src/leap/mail/imap/soledadstore.py | |
parent | 7828c517ae162de4676a71e05f77339598acd6f7 (diff) | |
parent | 985ff0a78a8df0eafb7789383f711b9e5ceb1cb6 (diff) |
Merge remote-tracking branch 'refs/remotes/kali/bug/separate_deferreds_threads' into develop
Diffstat (limited to 'src/leap/mail/imap/soledadstore.py')
-rw-r--r-- | src/leap/mail/imap/soledadstore.py | 369 |
1 files changed, 245 insertions, 124 deletions
diff --git a/src/leap/mail/imap/soledadstore.py b/src/leap/mail/imap/soledadstore.py index 8e22f26..732fe03 100644 --- a/src/leap/mail/imap/soledadstore.py +++ b/src/leap/mail/imap/soledadstore.py @@ -20,14 +20,14 @@ A MessageStore that writes to Soledad. import logging import threading +from collections import defaultdict from itertools import chain from u1db import errors as u1db_errors -from twisted.internet import defer from twisted.python import log from zope.interface import implements -from leap.common.check import leap_assert_type +from leap.common.check import leap_assert_type, leap_assert from leap.mail.decorators import deferred_to_thread from leap.mail.imap.messageparts import MessagePartType from leap.mail.imap.messageparts import MessageWrapper @@ -35,15 +35,13 @@ from leap.mail.imap.messageparts import RecentFlagsDoc from leap.mail.imap.fields import fields from leap.mail.imap.interfaces import IMessageStore from leap.mail.messageflow import IMessageConsumer -from leap.mail.utils import first +from leap.mail.utils import first, empty, accumulator_queue logger = logging.getLogger(__name__) # TODO -# [ ] Delete original message from the incoming queue after all successful -# writes. -# [ ] Implement a retry queue. +# [ ] Implement a retry queue? # [ ] Consider journaling of operations. @@ -86,10 +84,12 @@ class ContentDedup(object): if not header_docs: return False - if len(header_docs) != 1: - logger.warning("Found more than one copy of chash %s!" - % (chash,)) - logger.debug("Found header doc with that hash! Skipping save!") + # FIXME enable only to debug this problem. + #if len(header_docs) != 1: + #logger.warning("Found more than one copy of chash %s!" + #% (chash,)) + + #logger.debug("Found header doc with that hash! Skipping save!") return True def _content_does_exist(self, doc): @@ -110,10 +110,11 @@ class ContentDedup(object): if not attach_docs: return False - if len(attach_docs) != 1: - logger.warning("Found more than one copy of phash %s!" - % (phash,)) - logger.debug("Found attachment doc with that hash! Skipping save!") + # FIXME enable only to debug this problem + #if len(attach_docs) != 1: + #logger.warning("Found more than one copy of phash %s!" + #% (phash,)) + #logger.debug("Found attachment doc with that hash! Skipping save!") return True @@ -121,13 +122,26 @@ class MsgWriteError(Exception): """ Raised if any exception is found while saving message parts. """ + pass + + +""" +A lock per document. +""" +# TODO should bound the space of this!!! +# http://stackoverflow.com/a/2437645/1157664 +# Setting this to twice the number of threads in the threadpool +# should be safe. +put_locks = defaultdict(lambda: threading.Lock()) class SoledadStore(ContentDedup): """ This will create docs in the local Soledad database. """ - _last_uid_lock = threading.Lock() + _soledad_rw_lock = threading.Lock() + _remove_lock = threading.Lock() + _mbox_doc_locks = defaultdict(lambda: threading.Lock()) implements(IMessageConsumer, IMessageStore) @@ -138,8 +152,20 @@ class SoledadStore(ContentDedup): :param soledad: the soledad instance :type soledad: Soledad """ + from twisted.internet import reactor + self.reactor = reactor + self._soledad = soledad + self._CREATE_DOC_FUN = self._soledad.create_doc + self._PUT_DOC_FUN = self._soledad.put_doc + self._GET_DOC_FUN = self._soledad.get_doc + + # we instantiate an accumulator to batch the notifications + self.docs_notify_queue = accumulator_queue( + lambda item: reactor.callFromThread(self._unset_new_dirty, item), + 20) + # IMessageStore # ------------------------------------------------------------------- @@ -194,47 +220,32 @@ class SoledadStore(ContentDedup): # IMessageConsumer - # It's not thread-safe to defer this to a different thread + # TODO should handle the delete case + # TODO should handle errors better + # TODO could generalize this method into a generic consumer + # and only implement `process` here def consume(self, queue): """ Creates a new document in soledad db. - :param queue: queue to get item from, with content of the document - to be inserted. - :type queue: Queue - """ - # TODO should delete the original message from incoming only after - # the writes are done. - # TODO should handle the delete case - # TODO should handle errors - # TODO could generalize this method into a generic consumer - # and only implement `process` here - - def docWriteCallBack(doc_wrapper): - """ - Callback for a successful write of a document wrapper. - """ - if isinstance(doc_wrapper, MessageWrapper): - # If everything went well, we can unset the new flag - # in the source store (memory store) - self._unset_new_dirty(doc_wrapper) - - def docWriteErrorBack(failure): - """ - Errorback for write operations. - """ - log.error("Error while processing item.") - log.msg(failure.getTraceBack()) - - while not queue.empty(): - doc_wrapper = queue.get() - d = defer.Deferred() - d.addCallbacks(docWriteCallBack, docWriteErrorBack) - - self._consume_doc(doc_wrapper, d) + :param queue: a tuple of queues to get item from, with content of the + document to be inserted. + :type queue: tuple of Queues + """ + new, dirty = queue + while not new.empty(): + doc_wrapper = new.get() + self.reactor.callInThread(self._consume_doc, doc_wrapper, + self.docs_notify_queue) + while not dirty.empty(): + doc_wrapper = dirty.get() + self.reactor.callInThread(self._consume_doc, doc_wrapper, + self.docs_notify_queue) + + # Queue empty, flush the notifications queue. + self.docs_notify_queue(None, flush=True) - @deferred_to_thread def _unset_new_dirty(self, doc_wrapper): """ Unset the `new` and `dirty` flags for this document wrapper in the @@ -243,56 +254,76 @@ class SoledadStore(ContentDedup): :param doc_wrapper: a MessageWrapper instance :type doc_wrapper: MessageWrapper """ - # XXX debug msg id/mbox? - logger.info("unsetting new flag!") - doc_wrapper.new = False - doc_wrapper.dirty = False + if isinstance(doc_wrapper, MessageWrapper): + # XXX still needed for debug quite often + #logger.info("unsetting new flag!") + doc_wrapper.new = False + doc_wrapper.dirty = False @deferred_to_thread - def _consume_doc(self, doc_wrapper, deferred): + def _consume_doc(self, doc_wrapper, notify_queue): """ Consume each document wrapper in a separate thread. + We pass an instance of an accumulator that handles the notifications + to the memorystore when the write has been done. :param doc_wrapper: a MessageWrapper or RecentFlagsDoc instance :type doc_wrapper: MessageWrapper or RecentFlagsDoc - :param deferred: a deferred that will be fired when the write operation - has finished, either calling its callback or its - errback depending on whether it succeed. - :type deferred: Deferred + :param notify_queue: a callable that handles the writeback + notifications to the memstore. + :type notify_queue: callable """ - items = self._process(doc_wrapper) + def queueNotifyBack(failed, doc_wrapper): + if failed: + log.msg("There was an error writing the mesage...") + else: + notify_queue(doc_wrapper) + + def doSoledadCalls(items): + # we prime the generator, that should return the + # message or flags wrapper item in the first place. + doc_wrapper = items.next() + failed = self._soledad_write_document_parts(items) + queueNotifyBack(failed, doc_wrapper) - # we prime the generator, that should return the - # message or flags wrapper item in the first place. - doc_wrapper = items.next() + doSoledadCalls(self._iter_wrapper_subparts(doc_wrapper)) - # From here, we unpack the subpart items and - # the right soledad call. + # + # SoledadStore specific methods. + # + + def _soledad_write_document_parts(self, items): + """ + Write the document parts to soledad in a separate thread. + + :param items: the iterator through the different document wrappers + payloads. + :type items: iterator + :return: whether the write was successful or not + :rtype: bool + """ failed = False for item, call in items: + if empty(item): + continue try: self._try_call(call, item) except Exception as exc: - failed = exc + logger.debug("ITEM WAS: %s" % str(item)) + logger.debug("ITEM CONTENT WAS: %s" % str(item.content)) + logger.exception(exc) + failed = True continue - if failed: - deferred.errback(MsgWriteError( - "There was an error writing the mesage")) - else: - deferred.callback(doc_wrapper) - - # - # SoledadStore specific methods. - # + return failed - def _process(self, doc_wrapper): + def _iter_wrapper_subparts(self, doc_wrapper): """ Return an iterator that will yield the doc_wrapper in the first place, followed by the subparts item and the proper call type for every item in the queue, if any. - :param queue: the queue from where we'll pick item. - :type queue: Queue + :param doc_wrapper: a MessageWrapper or RecentFlagsDoc instance + :type doc_wrapper: MessageWrapper or RecentFlagsDoc """ if isinstance(doc_wrapper, MessageWrapper): return chain((doc_wrapper,), @@ -315,11 +346,38 @@ class SoledadStore(ContentDedup): """ if call is None: return - try: - call(item) - except u1db_errors.RevisionConflict as exc: - logger.exception("Error: %r" % (exc,)) - raise exc + + if call == self._PUT_DOC_FUN: + doc_id = item.doc_id + with put_locks[doc_id]: + doc = self._GET_DOC_FUN(doc_id) + + if doc is None: + logger.warning("BUG! Dirty doc but could not " + "find document %s" % (doc_id,)) + return + + doc.content = dict(item.content) + + item = doc + try: + call(item) + except u1db_errors.RevisionConflict as exc: + logger.exception("Error: %r" % (exc,)) + raise exc + except Exception as exc: + logger.exception("Error: %r" % (exc,)) + raise exc + + else: + try: + call(item) + except u1db_errors.RevisionConflict as exc: + logger.exception("Error: %r" % (exc,)) + raise exc + except Exception as exc: + logger.exception("Error: %r" % (exc,)) + raise exc def _get_calls_for_msg_parts(self, msg_wrapper): """ @@ -334,7 +392,7 @@ class SoledadStore(ContentDedup): call = None if msg_wrapper.new: - call = self._soledad.create_doc + call = self._CREATE_DOC_FUN # item is expected to be a MessagePartDoc for item in msg_wrapper.walk(): @@ -353,18 +411,18 @@ class SoledadStore(ContentDedup): # the flags doc. elif msg_wrapper.dirty: - call = self._soledad.put_doc + call = self._PUT_DOC_FUN # item is expected to be a MessagePartDoc for item in msg_wrapper.walk(): # XXX FIXME Give error if dirty and not doc_id !!! doc_id = item.doc_id # defend! if not doc_id: + logger.warning("Dirty item but no doc_id!") continue - doc = self._soledad.get_doc(doc_id) - doc.content = dict(item.content) + if item.part == MessagePartType.fdoc: - logger.debug("PUT dirty fdoc") - yield doc, call + #logger.debug("PUT dirty fdoc") + yield item, call # XXX also for linkage-doc !!! else: @@ -379,17 +437,16 @@ class SoledadStore(ContentDedup): :return: a tuple with recent-flags doc payload and callable :rtype: tuple """ - call = self._soledad.put_doc - rdoc = self._soledad.get_doc(rflags_wrapper.doc_id) + call = self._CREATE_DOC_FUN payload = rflags_wrapper.content - logger.debug("Saving RFLAGS to Soledad...") - if payload: - rdoc.content = payload - yield rdoc, call + logger.debug("Saving RFLAGS to Soledad...") + yield payload, call - def _get_mbox_document(self, mbox): + # Mbox documents and attributes + + def get_mbox_document(self, mbox): """ Return mailbox document. @@ -399,15 +456,80 @@ class SoledadStore(ContentDedup): the query failed. :rtype: SoledadDocument or None. """ + with self._mbox_doc_locks[mbox]: + return self._get_mbox_document(mbox) + + def _get_mbox_document(self, mbox): + """ + Helper for returning the mailbox document. + """ try: query = self._soledad.get_from_index( fields.TYPE_MBOX_IDX, fields.TYPE_MBOX_VAL, mbox) if query: return query.pop() + else: + logger.error("Could not find mbox document for %r" % + (self.mbox,)) except Exception as exc: logger.exception("Unhandled error %r" % exc) + def get_mbox_closed(self, mbox): + """ + Return the closed attribute for a given mailbox. + + :param mbox: the mailbox + :type mbox: str or unicode + :rtype: bool + """ + mbox_doc = self.get_mbox_document() + return mbox_doc.content.get(fields.CLOSED_KEY, False) + + def set_mbox_closed(self, mbox, closed): + """ + Set the closed attribute for a given mailbox. + + :param mbox: the mailbox + :type mbox: str or unicode + :param closed: the value to be set + :type closed: bool + """ + leap_assert(isinstance(closed, bool), "closed needs to be boolean") + with self._mbox_doc_locks[mbox]: + mbox_doc = self._get_mbox_document(mbox) + if mbox_doc is None: + logger.error( + "Could not find mbox document for %r" % (mbox,)) + return + mbox_doc.content[fields.CLOSED_KEY] = closed + self._soledad.put_doc(mbox_doc) + + def write_last_uid(self, mbox, value): + """ + Write the `last_uid` integer to the proper mailbox document + in Soledad. + This is called from the deferred triggered by + memorystore.increment_last_soledad_uid, which is expected to + run in a separate thread. + + :param mbox: the mailbox + :type mbox: str or unicode + :param value: the value to set + :type value: int + """ + leap_assert_type(value, int) + key = fields.LAST_UID_KEY + + # XXX change for a lock related to the mbox document + # itself. + with self._mbox_doc_locks[mbox]: + mbox_doc = self._get_mbox_document(mbox) + old_val = mbox_doc.content[key] + if value > old_val: + mbox_doc.content[key] = value + self._soledad.put_doc(mbox_doc) + def get_flags_doc(self, mbox, uid): """ Return the SoledadDocument for the given mbox and uid. @@ -416,12 +538,16 @@ class SoledadStore(ContentDedup): :type mbox: str or unicode :param uid: the UID for the message :type uid: int + :rtype: SoledadDocument or None """ result = None try: flag_docs = self._soledad.get_from_index( fields.TYPE_MBOX_UID_IDX, fields.TYPE_FLAGS_VAL, mbox, str(uid)) + if len(flag_docs) != 1: + logger.warning("More than one flag doc for %r:%s" % + (mbox, uid)) result = first(flag_docs) except Exception as exc: # ugh! Something's broken down there! @@ -430,36 +556,25 @@ class SoledadStore(ContentDedup): finally: return result - def write_last_uid(self, mbox, value): + def get_headers_doc(self, chash): """ - Write the `last_uid` integer to the proper mailbox document - in Soledad. - This is called from the deferred triggered by - memorystore.increment_last_soledad_uid, which is expected to - run in a separate thread. + Return the document that keeps the headers for a message + indexed by its content-hash. - :param mbox: the mailbox - :type mbox: str or unicode - :param value: the value to set - :type value: int + :param chash: the content-hash to retrieve the document from. + :type chash: str or unicode + :rtype: SoledadDocument or None """ - leap_assert_type(value, int) - key = fields.LAST_UID_KEY - - with self._last_uid_lock: - mbox_doc = self._get_mbox_document(mbox) - old_val = mbox_doc.content[key] - if value < old_val: - logger.error("%r:%s Tried to write a UID lesser than what's " - "stored!" % (mbox, value)) - mbox_doc.content[key] = value - self._soledad.put_doc(mbox_doc) + head_docs = self._soledad.get_from_index( + fields.TYPE_C_HASH_IDX, + fields.TYPE_HEADERS_VAL, str(chash)) + return first(head_docs) # deleted messages def deleted_iter(self, mbox): """ - Get an iterator for the SoledadDocuments for messages + Get an iterator for the the doc_id for SoledadDocuments for messages with \\Deleted flag for a given mailbox. :param mbox: the mailbox @@ -467,11 +582,10 @@ class SoledadStore(ContentDedup): :return: iterator through deleted message docs :rtype: iterable """ - return (doc for doc in self._soledad.get_from_index( + return [doc.doc_id for doc in self._soledad.get_from_index( fields.TYPE_MBOX_DEL_IDX, - fields.TYPE_FLAGS_VAL, mbox, '1')) + fields.TYPE_FLAGS_VAL, mbox, '1')] - # TODO can deferToThread this? def remove_all_deleted(self, mbox): """ Remove from Soledad all messages flagged as deleted for a given @@ -481,7 +595,14 @@ class SoledadStore(ContentDedup): :type mbox: str or unicode """ deleted = [] - for doc in self.deleted_iter(mbox): - deleted.append(doc.content[fields.UID_KEY]) - self._soledad.delete_doc(doc) + for doc_id in self.deleted_iter(mbox): + with self._remove_lock: + doc = self._soledad.get_doc(doc_id) + if doc is not None: + self._soledad.delete_doc(doc) + try: + deleted.append(doc.content[fields.UID_KEY]) + except TypeError: + # empty content + pass return deleted |