From 0f6a8e1c83995cffec51e81f626d4bb29d4f7345 Mon Sep 17 00:00:00 2001 From: Kali Kaneko Date: Fri, 31 Jan 2014 03:34:03 -0400 Subject: properly implement deferreds in several commands Passing along a deferred as an observer whose callback will be called with the proper result. Returning to thread in the appropiate points. just let's remember that twisted APIs are not thread safe! SoledadStore process_item also properly returned to thread. Changed @deferred to @deferred_to_thread so it results less confusing to read. "know the territory". aha! --- src/leap/mail/decorators.py | 2 +- src/leap/mail/imap/fetch.py | 4 +- src/leap/mail/imap/mailbox.py | 112 ++++++++++++++++++++++++------- src/leap/mail/imap/memorystore.py | 43 ++++++------ src/leap/mail/imap/messages.py | 133 +++++++++++++++++++++++-------------- src/leap/mail/imap/soledadstore.py | 99 ++++++++++++++++++--------- 6 files changed, 264 insertions(+), 129 deletions(-) diff --git a/src/leap/mail/decorators.py b/src/leap/mail/decorators.py index d5eac97..ae115f8 100644 --- a/src/leap/mail/decorators.py +++ b/src/leap/mail/decorators.py @@ -32,7 +32,7 @@ logger = logging.getLogger(__name__) # See this answer: http://stackoverflow.com/a/19019648/1157664 # And the notes by glyph and jpcalderone -def deferred(f): +def deferred_to_thread(f): """ Decorator, for deferring methods to Threads. diff --git a/src/leap/mail/imap/fetch.py b/src/leap/mail/imap/fetch.py index 817ad6a..40dadb3 100644 --- a/src/leap/mail/imap/fetch.py +++ b/src/leap/mail/imap/fetch.py @@ -45,7 +45,7 @@ from leap.common.events.events_pb2 import IMAP_UNREAD_MAIL from leap.common.mail import get_email_charset from leap.keymanager import errors as keymanager_errors from leap.keymanager.openpgp import OpenPGPKey -from leap.mail.decorators import deferred +from leap.mail.decorators import deferred_to_thread from leap.mail.utils import json_loads from leap.soledad.client import Soledad from leap.soledad.common.crypto import ENC_SCHEME_KEY, ENC_JSON_KEY @@ -199,7 +199,7 @@ class LeapIncomingMail(object): logger.exception(failure.value) traceback.print_tb(*sys.exc_info()) - @deferred + @deferred_to_thread def _sync_soledad(self): """ Synchronizes with remote soledad. diff --git a/src/leap/mail/imap/mailbox.py b/src/leap/mail/imap/mailbox.py index 802ebf3..79fb476 100644 --- a/src/leap/mail/imap/mailbox.py +++ b/src/leap/mail/imap/mailbox.py @@ -27,6 +27,7 @@ import os from collections import defaultdict from twisted.internet import defer +from twisted.internet.task import deferLater from twisted.python import log from twisted.mail import imap4 @@ -35,7 +36,7 @@ from zope.interface import implements from leap.common import events as leap_events from leap.common.events.events_pb2 import IMAP_UNREAD_MAIL from leap.common.check import leap_assert, leap_assert_type -from leap.mail.decorators import deferred +from leap.mail.decorators import deferred_to_thread from leap.mail.utils import empty from leap.mail.imap.fields import WithMsgFields, fields from leap.mail.imap.messages import MessageCollection @@ -51,6 +52,11 @@ notifying clients of new messages. Use during stress tests. NOTIFY_NEW = not os.environ.get('LEAP_SKIPNOTIFY', False) +class MessageCopyError(Exception): + """ + """ + + class SoledadMailbox(WithMsgFields, MBoxParser): """ A Soledad-backed IMAP mailbox. @@ -534,7 +540,7 @@ class SoledadMailbox(WithMsgFields, MBoxParser): seq_messg = set_asked.intersection(set_exist) return seq_messg - @deferred + @deferred_to_thread #@profile def fetch(self, messages_asked, uid): """ @@ -574,7 +580,7 @@ class SoledadMailbox(WithMsgFields, MBoxParser): result = ((msgid, getmsg(msgid)) for msgid in seq_messg) return result - @deferred + @deferred_to_thread def fetch_flags(self, messages_asked, uid): """ A fast method to fetch all flags, tricking just the @@ -615,10 +621,10 @@ class SoledadMailbox(WithMsgFields, MBoxParser): all_flags = self.messages.all_flags() result = ((msgid, flagsPart( - msgid, all_flags[msgid])) for msgid in seq_messg) + msgid, all_flags.get(msgid, tuple()))) for msgid in seq_messg) return result - @deferred + @deferred_to_thread def fetch_headers(self, messages_asked, uid): """ A fast method to fetch all headers, tricking just the @@ -698,28 +704,43 @@ class SoledadMailbox(WithMsgFields, MBoxParser): otherwise they are message sequence IDs. :type uid: bool - :return: A dict mapping message sequence numbers to sequences of - str representing the flags set on the message after this - operation has been performed. - :rtype: dict + :return: A deferred, that will be called with a dict mapping message + sequence numbers to sequences of str representing the flags + set on the message after this operation has been performed. + :rtype: deferred :raise ReadOnlyMailbox: Raised if this mailbox is not open for read-write. """ + from twisted.internet import reactor + if not self.isWriteable(): + log.msg('read only mailbox!') + raise imap4.ReadOnlyMailbox + + d = defer.Deferred() + deferLater(reactor, 0, self._do_store, messages_asked, flags, + mode, uid, d) + return d + + def _do_store(self, messages_asked, flags, mode, uid, observer): + """ + Helper method, invoke set_flags method in the MessageCollection. + + See the documentation for the `store` method for the parameters. + + :param observer: a deferred that will be called with the dictionary + mapping UIDs to flags after the operation has been + done. + :type observer: deferred + """ # XXX implement also sequence (uid = 0) - # XXX we should prevent cclient from setting Recent flag. + # XXX we should prevent cclient from setting Recent flag? leap_assert(not isinstance(flags, basestring), "flags cannot be a string") flags = tuple(flags) - messages_asked = self._bound_seq(messages_asked) seq_messg = self._filter_msg_seq(messages_asked) - - if not self.isWriteable(): - log.msg('read only mailbox!') - raise imap4.ReadOnlyMailbox - - return self.messages.set_flags(self.mbox, seq_messg, flags, mode) + self.messages.set_flags(self.mbox, seq_messg, flags, mode, observer) # ISearchableMailbox @@ -767,13 +788,46 @@ class SoledadMailbox(WithMsgFields, MBoxParser): # IMessageCopier - #@deferred - #@profile - def copy(self, messageObject): + def copy(self, message): """ Copy the given message object into this mailbox. - """ - msg = messageObject + + :param message: an IMessage implementor + :type message: LeapMessage + :return: a deferred that will be fired with the message + uid when the copy succeed. + :rtype: Deferred + """ + from twisted.internet import reactor + print "COPY :", message + d = defer.Deferred() + + # XXX this should not happen ... track it down, + # probably to FETCH... + if message is None: + log.msg("BUG: COPY found a None in passed message") + d.calback(None) + deferLater(reactor, 0, self._do_copy, message, d) + return d + + #@profile + def _do_copy(self, message, observer): + """ + Call invoked from the deferLater in `copy`. This will + copy the flags and header documents, and pass them to the + `create_message` method in the MemoryStore, together with + the observer deferred that we've been passed along. + + :param message: an IMessage implementor + :type message: LeapMessage + :param observer: the deferred that will fire with the + UID of the message + :type observer: Deferred + """ + # XXX for clarity, this could be delegated to a + # MessageCollection mixin that implements copy too, and + # moved out of here. + msg = message memstore = self._memstore # XXX should use a public api instead @@ -785,12 +839,23 @@ class SoledadMailbox(WithMsgFields, MBoxParser): new_fdoc = copy.deepcopy(fdoc.content) fdoc_chash = new_fdoc[fields.CONTENT_HASH_KEY] + + # XXX is this hitting the db??? --- probably. + # We should profile after the pre-fetch. dest_fdoc = memstore.get_fdoc_from_chash( fdoc_chash, self.mbox) exist = dest_fdoc and not empty(dest_fdoc.content) if exist: + # Should we signal error on the callback? logger.warning("Destination message already exists!") + + # XXX I'm still not clear if we should raise the + # callback. This actually rases an ugly warning + # in some muas like thunderbird. I guess the user does + # not deserve that. + #observer.errback(MessageCopyError("Already exists!")) + observer.callback(True) else: mbox = self.mbox uid_next = memstore.increment_last_soledad_uid(mbox) @@ -799,10 +864,11 @@ class SoledadMailbox(WithMsgFields, MBoxParser): # FIXME set recent! - return self._memstore.create_message( + self._memstore.create_message( self.mbox, uid_next, MessageWrapper( new_fdoc, hdoc.content), + observer=observer, notify_on_disk=False) # convenience fun diff --git a/src/leap/mail/imap/memorystore.py b/src/leap/mail/imap/memorystore.py index 217ad8e..211d282 100644 --- a/src/leap/mail/imap/memorystore.py +++ b/src/leap/mail/imap/memorystore.py @@ -32,7 +32,7 @@ from zope.interface import implements from leap.common.check import leap_assert_type from leap.mail import size -from leap.mail.decorators import deferred +from leap.mail.decorators import deferred_to_thread from leap.mail.utils import empty from leap.mail.messageflow import MessageProducer from leap.mail.imap import interfaces @@ -200,7 +200,8 @@ class MemoryStore(object): # We would have to add a put_flags operation to modify only # the flags doc (and set the dirty flag accordingly) - def create_message(self, mbox, uid, message, notify_on_disk=True): + def create_message(self, mbox, uid, message, observer, + notify_on_disk=True): """ Create the passed message into this MemoryStore. @@ -212,38 +213,38 @@ class MemoryStore(object): :type uid: int :param message: a message to be added :type message: MessageWrapper - :param notify_on_disk: whether the deferred that is returned should + :param observer: the deferred that will fire with the + UID of the message. If notify_on_disk is True, + this will happen when the message is written to + Soledad. Otherwise it will fire as soon as we've + added the message to the memory store. + :type observer: Deferred + :param notify_on_disk: whether the `observer` deferred should wait until the message is written to disk to be fired. :type notify_on_disk: bool - - :return: a Deferred. if notify_on_disk is True, will be fired - when written to the db on disk. - Otherwise will fire inmediately - :rtype: Deferred """ log.msg("adding new doc to memstore %r (%r)" % (mbox, uid)) key = mbox, uid self._add_message(mbox, uid, message, notify_on_disk) - - d = defer.Deferred() - d.addCallback(lambda result: log.msg("message save: %s" % result)) self._new.add(key) - # We store this deferred so we can keep track of the pending - # operations internally. - self._new_deferreds[key] = d + def log_add(result): + log.msg("message save: %s" % result) + return result + observer.addCallback(log_add) if notify_on_disk: - # Caller wants to be notified when the message is on disk - # so we pass the deferred that will be fired when the message - # has been written. - return d - else: + # We store this deferred so we can keep track of the pending + # operations internally. + # TODO this should fire with the UID !!! -- change that in + # the soledad store code. + self._new_deferreds[key] = observer + if not notify_on_disk: # Caller does not care, just fired and forgot, so we pass # a defer that will inmediately have its callback triggered. - return defer.succeed('fire-and-forget:%s' % str(key)) + observer.callback(uid) def put_message(self, mbox, uid, message, notify_on_disk=True): """ @@ -541,7 +542,7 @@ class MemoryStore(object): self.write_last_uid(mbox, value) return value - @deferred + @deferred_to_thread def write_last_uid(self, mbox, value): """ Increment the soledad integer cache for the highest uid value. diff --git a/src/leap/mail/imap/messages.py b/src/leap/mail/imap/messages.py index 0e5c74a..03dde29 100644 --- a/src/leap/mail/imap/messages.py +++ b/src/leap/mail/imap/messages.py @@ -37,7 +37,7 @@ from leap.common.decorators import memoized_method from leap.common.mail import get_email_charset from leap.mail import walk from leap.mail.utils import first, find_charset, lowerdict, empty -from leap.mail.decorators import deferred +from leap.mail.decorators import deferred_to_thread from leap.mail.imap.index import IndexedDB from leap.mail.imap.fields import fields, WithMsgFields from leap.mail.imap.memorystore import MessageWrapper @@ -243,30 +243,30 @@ class LeapMessage(fields, MailParser, MBoxParser): REMOVE = -1 SET = 0 - with self.flags_lock: - current = doc.content[self.FLAGS_KEY] - if mode == APPEND: - newflags = tuple(set(tuple(current) + flags)) - elif mode == REMOVE: - newflags = tuple(set(current).difference(set(flags))) - elif mode == SET: - newflags = flags - - # We could defer this, but I think it's better - # to put it under the lock... - doc.content[self.FLAGS_KEY] = newflags - doc.content[self.SEEN_KEY] = self.SEEN_FLAG in flags - doc.content[self.DEL_KEY] = self.DELETED_FLAG in flags - - if self._collection.memstore is not None: - log.msg("putting message in collection") - self._collection.memstore.put_message( - self._mbox, self._uid, - MessageWrapper(fdoc=doc.content, new=False, dirty=True, - docs_id={'fdoc': doc.doc_id})) - else: - # fallback for non-memstore initializations. - self._soledad.put_doc(doc) + #with self.flags_lock: + current = doc.content[self.FLAGS_KEY] + if mode == APPEND: + newflags = tuple(set(tuple(current) + flags)) + elif mode == REMOVE: + newflags = tuple(set(current).difference(set(flags))) + elif mode == SET: + newflags = flags + + # We could defer this, but I think it's better + # to put it under the lock... + doc.content[self.FLAGS_KEY] = newflags + doc.content[self.SEEN_KEY] = self.SEEN_FLAG in flags + doc.content[self.DEL_KEY] = self.DELETED_FLAG in flags + + if self._collection.memstore is not None: + log.msg("putting message in collection") + self._collection.memstore.put_message( + self._mbox, self._uid, + MessageWrapper(fdoc=doc.content, new=False, dirty=True, + docs_id={'fdoc': doc.doc_id})) + else: + # fallback for non-memstore initializations. + self._soledad.put_doc(doc) return map(str, newflags) def getInternalDate(self): @@ -457,8 +457,8 @@ class LeapMessage(fields, MailParser, MBoxParser): :rtype: Any object implementing C{IMessagePart}. :return: The specified sub-part. """ - if not self.isMultipart(): - raise TypeError + #if not self.isMultipart(): + #raise TypeError try: pmap_dict = self._get_part_from_parts_map(part + 1) except KeyError: @@ -846,14 +846,11 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser): else: return False - # not deferring to thread cause this now uses deferred asa retval - #@deferred #@profile def add_msg(self, raw, subject=None, flags=None, date=None, uid=None, notify_on_disk=False): """ Creates a new message document. - Here lives the magic of the leap mail. Well, in soledad, really. :param raw: the raw message :type raw: str @@ -869,6 +866,31 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser): :param uid: the message uid for this mailbox :type uid: int + + :return: a deferred that will be fired with the message + uid when the adding succeed. + :rtype: deferred + """ + logger.debug('adding message') + if flags is None: + flags = tuple() + leap_assert_type(flags, tuple) + + d = defer.Deferred() + self._do_add_msg(raw, flags, subject, date, notify_on_disk, d) + return d + + @deferred_to_thread + def _do_add_msg(self, raw, flags, subject, date, notify_on_disk, observer): + """ + Helper that creates a new message document. + Here lives the magic of the leap mail. Well, in soledad, really. + + See `add_msg` docstring for parameter info. + + :param observer: a deferred that will be fired with the message + uid when the adding succeed. + :type observer: deferred """ # TODO signal that we can delete the original message!----- # when all the processing is done. @@ -876,11 +898,6 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser): # TODO add the linked-from info ! # TODO add reference to the original message - logger.debug('adding message') - if flags is None: - flags = tuple() - leap_assert_type(flags, tuple) - # parse msg, chash, size, multi = self._do_parse(raw) @@ -918,16 +935,13 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser): self.set_recent_flag(uid) - # Saving ---------------------------------------- # TODO ---- add reference to original doc, to be deleted # after writes are done. msg_container = MessageWrapper(fd, hd, cdocs) - # we return a deferred that by default will be triggered - # inmediately. - d = self.memstore.create_message(self.mbox, uid, msg_container, - notify_on_disk=notify_on_disk) - return d + self.memstore.create_message(self.mbox, uid, msg_container, + observer=observer, + notify_on_disk=notify_on_disk) # # getters: specific queries @@ -1030,7 +1044,7 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser): self.recent_flags.difference_update( set([uid])) - @deferred + @deferred_to_thread def set_recent_flag(self, uid): """ Set Recent flag for a given uid. @@ -1080,7 +1094,7 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser): return None return fdoc.content.get(fields.UID_KEY, None) - @deferred + @deferred_to_thread def _get_uid_from_msgid(self, msgid): """ Return a UID for a given message-id. @@ -1100,7 +1114,7 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser): return self._get_uid_from_msgidCb(msgid) #@profile - def set_flags(self, mbox, messages, flags, mode): + def set_flags(self, mbox, messages, flags, mode, observer): """ Set flags for a sequence of messages. @@ -1112,16 +1126,33 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser): :type flags: tuple :param mode: the mode for setting. 1 is append, -1 is remove, 0 set. :type mode: int + :param observer: a deferred that will be called with the dictionary + mapping UIDs to flags after the operation has been + done. + :type observer: deferred """ - result = {} + # XXX we could defer *this* to thread pool, and gather results... + # XXX use deferredList + + deferreds = [] for msg_id in messages: - log.msg("MSG ID = %s" % msg_id) - msg = self.get_msg_by_uid(msg_id, mem_only=True, flags_only=True) - if not msg: - continue - result[msg_id] = msg.setFlags(flags, mode) + deferreds.append( + self._set_flag_for_uid(msg_id, flags, mode)) - return result + def notify(result): + observer.callback(dict(result)) + d1 = defer.gatherResults(deferreds, consumeErrors=True) + d1.addCallback(notify) + + @deferred_to_thread + def _set_flag_for_uid(self, msg_id, flags, mode): + """ + Run the set_flag operation in the thread pool. + """ + log.msg("MSG ID = %s" % msg_id) + msg = self.get_msg_by_uid(msg_id, mem_only=True, flags_only=True) + if msg is not None: + return msg_id, msg.setFlags(flags, mode) # getters: generic for a mailbox diff --git a/src/leap/mail/imap/soledadstore.py b/src/leap/mail/imap/soledadstore.py index ff5e03b..82f27e7 100644 --- a/src/leap/mail/imap/soledadstore.py +++ b/src/leap/mail/imap/soledadstore.py @@ -23,10 +23,12 @@ import threading from itertools import chain from u1db import errors as u1db_errors +from twisted.internet import defer +from twisted.python import log from zope.interface import implements from leap.common.check import leap_assert_type -from leap.mail.decorators import deferred +from leap.mail.decorators import deferred_to_thread from leap.mail.imap.messageparts import MessagePartType from leap.mail.imap.messageparts import MessageWrapper from leap.mail.imap.messageparts import RecentFlagsDoc @@ -209,52 +211,87 @@ class SoledadStore(ContentDedup): # TODO could generalize this method into a generic consumer # and only implement `process` here + def docWriteCallBack(doc_wrapper): + """ + Callback for a successful write of a document wrapper. + """ + if isinstance(doc_wrapper, MessageWrapper): + # If everything went well, we can unset the new flag + # in the source store (memory store) + self._unset_new_dirty(doc_wrapper) + + def docWriteErrorBack(failure): + """ + Errorback for write operations. + """ + log.error("Error while processing item.") + log.msg(failure.getTraceBack()) + while not queue.empty(): - items = self._process(queue) + doc_wrapper = queue.get() + d = defer.Deferred() + d.addCallbacks(docWriteCallBack, docWriteErrorBack) + + self._consume_doc(doc_wrapper, d) + + @deferred_to_thread + def _unset_new_dirty(self, doc_wrapper): + """ + Unset the `new` and `dirty` flags for this document wrapper in the + memory store. + + :param doc_wrapper: a MessageWrapper instance + :type doc_wrapper: MessageWrapper + """ + # XXX debug msg id/mbox? + logger.info("unsetting new flag!") + doc_wrapper.new = False + doc_wrapper.dirty = False - # we prime the generator, that should return the - # message or flags wrapper item in the first place. - doc_wrapper = items.next() + @deferred_to_thread + def _consume_doc(self, doc_wrapper, deferred): + """ + Consume each document wrapper in a separate thread. + + :param doc_wrapper: + :type doc_wrapper: + :param deferred: + :type deferred: Deferred + """ + items = self._process(doc_wrapper) - # From here, we unpack the subpart items and - # the right soledad call. + # we prime the generator, that should return the + # message or flags wrapper item in the first place. + doc_wrapper = items.next() + + # From here, we unpack the subpart items and + # the right soledad call. + failed = False + for item, call in items: try: - failed = False - for item, call in items: - try: - self._try_call(call, item) - except Exception as exc: - failed = exc - continue - if failed: - raise MsgWriteError - - except MsgWriteError: - logger.error("Error while processing item.") - logger.exception(failed) - else: - if isinstance(doc_wrapper, MessageWrapper): - # If everything went well, we can unset the new flag - # in the source store (memory store) - logger.info("unsetting new flag!") - doc_wrapper.new = False - doc_wrapper.dirty = False + self._try_call(call, item) + except Exception as exc: + failed = exc + continue + if failed: + deferred.errback(MsgWriteError( + "There was an error writing the mesage")) + else: + deferred.callback(doc_wrapper) # # SoledadStore specific methods. # - def _process(self, queue): + def _process(self, doc_wrapper): """ - Return an iterator that will yield the msg_wrapper in the first place, + Return an iterator that will yield the doc_wrapper in the first place, followed by the subparts item and the proper call type for every item in the queue, if any. :param queue: the queue from where we'll pick item. :type queue: Queue """ - doc_wrapper = queue.get() - if isinstance(doc_wrapper, MessageWrapper): return chain((doc_wrapper,), self._get_calls_for_msg_parts(doc_wrapper)) -- cgit v1.2.3