diff options
-rw-r--r-- | src/leap/mail/decorators.py | 2 | ||||
-rw-r--r-- | src/leap/mail/imap/fetch.py | 4 | ||||
-rw-r--r-- | src/leap/mail/imap/mailbox.py | 112 | ||||
-rw-r--r-- | src/leap/mail/imap/memorystore.py | 43 | ||||
-rw-r--r-- | src/leap/mail/imap/messages.py | 133 | ||||
-rw-r--r-- | src/leap/mail/imap/soledadstore.py | 99 |
6 files changed, 264 insertions, 129 deletions
diff --git a/src/leap/mail/decorators.py b/src/leap/mail/decorators.py index d5eac97..ae115f8 100644 --- a/src/leap/mail/decorators.py +++ b/src/leap/mail/decorators.py @@ -32,7 +32,7 @@ logger = logging.getLogger(__name__) # See this answer: http://stackoverflow.com/a/19019648/1157664 # And the notes by glyph and jpcalderone -def deferred(f): +def deferred_to_thread(f): """ Decorator, for deferring methods to Threads. diff --git a/src/leap/mail/imap/fetch.py b/src/leap/mail/imap/fetch.py index 817ad6a..40dadb3 100644 --- a/src/leap/mail/imap/fetch.py +++ b/src/leap/mail/imap/fetch.py @@ -45,7 +45,7 @@ from leap.common.events.events_pb2 import IMAP_UNREAD_MAIL from leap.common.mail import get_email_charset from leap.keymanager import errors as keymanager_errors from leap.keymanager.openpgp import OpenPGPKey -from leap.mail.decorators import deferred +from leap.mail.decorators import deferred_to_thread from leap.mail.utils import json_loads from leap.soledad.client import Soledad from leap.soledad.common.crypto import ENC_SCHEME_KEY, ENC_JSON_KEY @@ -199,7 +199,7 @@ class LeapIncomingMail(object): logger.exception(failure.value) traceback.print_tb(*sys.exc_info()) - @deferred + @deferred_to_thread def _sync_soledad(self): """ Synchronizes with remote soledad. diff --git a/src/leap/mail/imap/mailbox.py b/src/leap/mail/imap/mailbox.py index 802ebf3..79fb476 100644 --- a/src/leap/mail/imap/mailbox.py +++ b/src/leap/mail/imap/mailbox.py @@ -27,6 +27,7 @@ import os from collections import defaultdict from twisted.internet import defer +from twisted.internet.task import deferLater from twisted.python import log from twisted.mail import imap4 @@ -35,7 +36,7 @@ from zope.interface import implements from leap.common import events as leap_events from leap.common.events.events_pb2 import IMAP_UNREAD_MAIL from leap.common.check import leap_assert, leap_assert_type -from leap.mail.decorators import deferred +from leap.mail.decorators import deferred_to_thread from leap.mail.utils import empty from leap.mail.imap.fields import WithMsgFields, fields from leap.mail.imap.messages import MessageCollection @@ -51,6 +52,11 @@ notifying clients of new messages. Use during stress tests. NOTIFY_NEW = not os.environ.get('LEAP_SKIPNOTIFY', False) +class MessageCopyError(Exception): + """ + """ + + class SoledadMailbox(WithMsgFields, MBoxParser): """ A Soledad-backed IMAP mailbox. @@ -534,7 +540,7 @@ class SoledadMailbox(WithMsgFields, MBoxParser): seq_messg = set_asked.intersection(set_exist) return seq_messg - @deferred + @deferred_to_thread #@profile def fetch(self, messages_asked, uid): """ @@ -574,7 +580,7 @@ class SoledadMailbox(WithMsgFields, MBoxParser): result = ((msgid, getmsg(msgid)) for msgid in seq_messg) return result - @deferred + @deferred_to_thread def fetch_flags(self, messages_asked, uid): """ A fast method to fetch all flags, tricking just the @@ -615,10 +621,10 @@ class SoledadMailbox(WithMsgFields, MBoxParser): all_flags = self.messages.all_flags() result = ((msgid, flagsPart( - msgid, all_flags[msgid])) for msgid in seq_messg) + msgid, all_flags.get(msgid, tuple()))) for msgid in seq_messg) return result - @deferred + @deferred_to_thread def fetch_headers(self, messages_asked, uid): """ A fast method to fetch all headers, tricking just the @@ -698,28 +704,43 @@ class SoledadMailbox(WithMsgFields, MBoxParser): otherwise they are message sequence IDs. :type uid: bool - :return: A dict mapping message sequence numbers to sequences of - str representing the flags set on the message after this - operation has been performed. - :rtype: dict + :return: A deferred, that will be called with a dict mapping message + sequence numbers to sequences of str representing the flags + set on the message after this operation has been performed. + :rtype: deferred :raise ReadOnlyMailbox: Raised if this mailbox is not open for read-write. """ + from twisted.internet import reactor + if not self.isWriteable(): + log.msg('read only mailbox!') + raise imap4.ReadOnlyMailbox + + d = defer.Deferred() + deferLater(reactor, 0, self._do_store, messages_asked, flags, + mode, uid, d) + return d + + def _do_store(self, messages_asked, flags, mode, uid, observer): + """ + Helper method, invoke set_flags method in the MessageCollection. + + See the documentation for the `store` method for the parameters. + + :param observer: a deferred that will be called with the dictionary + mapping UIDs to flags after the operation has been + done. + :type observer: deferred + """ # XXX implement also sequence (uid = 0) - # XXX we should prevent cclient from setting Recent flag. + # XXX we should prevent cclient from setting Recent flag? leap_assert(not isinstance(flags, basestring), "flags cannot be a string") flags = tuple(flags) - messages_asked = self._bound_seq(messages_asked) seq_messg = self._filter_msg_seq(messages_asked) - - if not self.isWriteable(): - log.msg('read only mailbox!') - raise imap4.ReadOnlyMailbox - - return self.messages.set_flags(self.mbox, seq_messg, flags, mode) + self.messages.set_flags(self.mbox, seq_messg, flags, mode, observer) # ISearchableMailbox @@ -767,13 +788,46 @@ class SoledadMailbox(WithMsgFields, MBoxParser): # IMessageCopier - #@deferred - #@profile - def copy(self, messageObject): + def copy(self, message): """ Copy the given message object into this mailbox. - """ - msg = messageObject + + :param message: an IMessage implementor + :type message: LeapMessage + :return: a deferred that will be fired with the message + uid when the copy succeed. + :rtype: Deferred + """ + from twisted.internet import reactor + print "COPY :", message + d = defer.Deferred() + + # XXX this should not happen ... track it down, + # probably to FETCH... + if message is None: + log.msg("BUG: COPY found a None in passed message") + d.calback(None) + deferLater(reactor, 0, self._do_copy, message, d) + return d + + #@profile + def _do_copy(self, message, observer): + """ + Call invoked from the deferLater in `copy`. This will + copy the flags and header documents, and pass them to the + `create_message` method in the MemoryStore, together with + the observer deferred that we've been passed along. + + :param message: an IMessage implementor + :type message: LeapMessage + :param observer: the deferred that will fire with the + UID of the message + :type observer: Deferred + """ + # XXX for clarity, this could be delegated to a + # MessageCollection mixin that implements copy too, and + # moved out of here. + msg = message memstore = self._memstore # XXX should use a public api instead @@ -785,12 +839,23 @@ class SoledadMailbox(WithMsgFields, MBoxParser): new_fdoc = copy.deepcopy(fdoc.content) fdoc_chash = new_fdoc[fields.CONTENT_HASH_KEY] + + # XXX is this hitting the db??? --- probably. + # We should profile after the pre-fetch. dest_fdoc = memstore.get_fdoc_from_chash( fdoc_chash, self.mbox) exist = dest_fdoc and not empty(dest_fdoc.content) if exist: + # Should we signal error on the callback? logger.warning("Destination message already exists!") + + # XXX I'm still not clear if we should raise the + # callback. This actually rases an ugly warning + # in some muas like thunderbird. I guess the user does + # not deserve that. + #observer.errback(MessageCopyError("Already exists!")) + observer.callback(True) else: mbox = self.mbox uid_next = memstore.increment_last_soledad_uid(mbox) @@ -799,10 +864,11 @@ class SoledadMailbox(WithMsgFields, MBoxParser): # FIXME set recent! - return self._memstore.create_message( + self._memstore.create_message( self.mbox, uid_next, MessageWrapper( new_fdoc, hdoc.content), + observer=observer, notify_on_disk=False) # convenience fun diff --git a/src/leap/mail/imap/memorystore.py b/src/leap/mail/imap/memorystore.py index 217ad8e..211d282 100644 --- a/src/leap/mail/imap/memorystore.py +++ b/src/leap/mail/imap/memorystore.py @@ -32,7 +32,7 @@ from zope.interface import implements from leap.common.check import leap_assert_type from leap.mail import size -from leap.mail.decorators import deferred +from leap.mail.decorators import deferred_to_thread from leap.mail.utils import empty from leap.mail.messageflow import MessageProducer from leap.mail.imap import interfaces @@ -200,7 +200,8 @@ class MemoryStore(object): # We would have to add a put_flags operation to modify only # the flags doc (and set the dirty flag accordingly) - def create_message(self, mbox, uid, message, notify_on_disk=True): + def create_message(self, mbox, uid, message, observer, + notify_on_disk=True): """ Create the passed message into this MemoryStore. @@ -212,38 +213,38 @@ class MemoryStore(object): :type uid: int :param message: a message to be added :type message: MessageWrapper - :param notify_on_disk: whether the deferred that is returned should + :param observer: the deferred that will fire with the + UID of the message. If notify_on_disk is True, + this will happen when the message is written to + Soledad. Otherwise it will fire as soon as we've + added the message to the memory store. + :type observer: Deferred + :param notify_on_disk: whether the `observer` deferred should wait until the message is written to disk to be fired. :type notify_on_disk: bool - - :return: a Deferred. if notify_on_disk is True, will be fired - when written to the db on disk. - Otherwise will fire inmediately - :rtype: Deferred """ log.msg("adding new doc to memstore %r (%r)" % (mbox, uid)) key = mbox, uid self._add_message(mbox, uid, message, notify_on_disk) - - d = defer.Deferred() - d.addCallback(lambda result: log.msg("message save: %s" % result)) self._new.add(key) - # We store this deferred so we can keep track of the pending - # operations internally. - self._new_deferreds[key] = d + def log_add(result): + log.msg("message save: %s" % result) + return result + observer.addCallback(log_add) if notify_on_disk: - # Caller wants to be notified when the message is on disk - # so we pass the deferred that will be fired when the message - # has been written. - return d - else: + # We store this deferred so we can keep track of the pending + # operations internally. + # TODO this should fire with the UID !!! -- change that in + # the soledad store code. + self._new_deferreds[key] = observer + if not notify_on_disk: # Caller does not care, just fired and forgot, so we pass # a defer that will inmediately have its callback triggered. - return defer.succeed('fire-and-forget:%s' % str(key)) + observer.callback(uid) def put_message(self, mbox, uid, message, notify_on_disk=True): """ @@ -541,7 +542,7 @@ class MemoryStore(object): self.write_last_uid(mbox, value) return value - @deferred + @deferred_to_thread def write_last_uid(self, mbox, value): """ Increment the soledad integer cache for the highest uid value. diff --git a/src/leap/mail/imap/messages.py b/src/leap/mail/imap/messages.py index 0e5c74a..03dde29 100644 --- a/src/leap/mail/imap/messages.py +++ b/src/leap/mail/imap/messages.py @@ -37,7 +37,7 @@ from leap.common.decorators import memoized_method from leap.common.mail import get_email_charset from leap.mail import walk from leap.mail.utils import first, find_charset, lowerdict, empty -from leap.mail.decorators import deferred +from leap.mail.decorators import deferred_to_thread from leap.mail.imap.index import IndexedDB from leap.mail.imap.fields import fields, WithMsgFields from leap.mail.imap.memorystore import MessageWrapper @@ -243,30 +243,30 @@ class LeapMessage(fields, MailParser, MBoxParser): REMOVE = -1 SET = 0 - with self.flags_lock: - current = doc.content[self.FLAGS_KEY] - if mode == APPEND: - newflags = tuple(set(tuple(current) + flags)) - elif mode == REMOVE: - newflags = tuple(set(current).difference(set(flags))) - elif mode == SET: - newflags = flags - - # We could defer this, but I think it's better - # to put it under the lock... - doc.content[self.FLAGS_KEY] = newflags - doc.content[self.SEEN_KEY] = self.SEEN_FLAG in flags - doc.content[self.DEL_KEY] = self.DELETED_FLAG in flags - - if self._collection.memstore is not None: - log.msg("putting message in collection") - self._collection.memstore.put_message( - self._mbox, self._uid, - MessageWrapper(fdoc=doc.content, new=False, dirty=True, - docs_id={'fdoc': doc.doc_id})) - else: - # fallback for non-memstore initializations. - self._soledad.put_doc(doc) + #with self.flags_lock: + current = doc.content[self.FLAGS_KEY] + if mode == APPEND: + newflags = tuple(set(tuple(current) + flags)) + elif mode == REMOVE: + newflags = tuple(set(current).difference(set(flags))) + elif mode == SET: + newflags = flags + + # We could defer this, but I think it's better + # to put it under the lock... + doc.content[self.FLAGS_KEY] = newflags + doc.content[self.SEEN_KEY] = self.SEEN_FLAG in flags + doc.content[self.DEL_KEY] = self.DELETED_FLAG in flags + + if self._collection.memstore is not None: + log.msg("putting message in collection") + self._collection.memstore.put_message( + self._mbox, self._uid, + MessageWrapper(fdoc=doc.content, new=False, dirty=True, + docs_id={'fdoc': doc.doc_id})) + else: + # fallback for non-memstore initializations. + self._soledad.put_doc(doc) return map(str, newflags) def getInternalDate(self): @@ -457,8 +457,8 @@ class LeapMessage(fields, MailParser, MBoxParser): :rtype: Any object implementing C{IMessagePart}. :return: The specified sub-part. """ - if not self.isMultipart(): - raise TypeError + #if not self.isMultipart(): + #raise TypeError try: pmap_dict = self._get_part_from_parts_map(part + 1) except KeyError: @@ -846,14 +846,11 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser): else: return False - # not deferring to thread cause this now uses deferred asa retval - #@deferred #@profile def add_msg(self, raw, subject=None, flags=None, date=None, uid=None, notify_on_disk=False): """ Creates a new message document. - Here lives the magic of the leap mail. Well, in soledad, really. :param raw: the raw message :type raw: str @@ -869,6 +866,31 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser): :param uid: the message uid for this mailbox :type uid: int + + :return: a deferred that will be fired with the message + uid when the adding succeed. + :rtype: deferred + """ + logger.debug('adding message') + if flags is None: + flags = tuple() + leap_assert_type(flags, tuple) + + d = defer.Deferred() + self._do_add_msg(raw, flags, subject, date, notify_on_disk, d) + return d + + @deferred_to_thread + def _do_add_msg(self, raw, flags, subject, date, notify_on_disk, observer): + """ + Helper that creates a new message document. + Here lives the magic of the leap mail. Well, in soledad, really. + + See `add_msg` docstring for parameter info. + + :param observer: a deferred that will be fired with the message + uid when the adding succeed. + :type observer: deferred """ # TODO signal that we can delete the original message!----- # when all the processing is done. @@ -876,11 +898,6 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser): # TODO add the linked-from info ! # TODO add reference to the original message - logger.debug('adding message') - if flags is None: - flags = tuple() - leap_assert_type(flags, tuple) - # parse msg, chash, size, multi = self._do_parse(raw) @@ -918,16 +935,13 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser): self.set_recent_flag(uid) - # Saving ---------------------------------------- # TODO ---- add reference to original doc, to be deleted # after writes are done. msg_container = MessageWrapper(fd, hd, cdocs) - # we return a deferred that by default will be triggered - # inmediately. - d = self.memstore.create_message(self.mbox, uid, msg_container, - notify_on_disk=notify_on_disk) - return d + self.memstore.create_message(self.mbox, uid, msg_container, + observer=observer, + notify_on_disk=notify_on_disk) # # getters: specific queries @@ -1030,7 +1044,7 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser): self.recent_flags.difference_update( set([uid])) - @deferred + @deferred_to_thread def set_recent_flag(self, uid): """ Set Recent flag for a given uid. @@ -1080,7 +1094,7 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser): return None return fdoc.content.get(fields.UID_KEY, None) - @deferred + @deferred_to_thread def _get_uid_from_msgid(self, msgid): """ Return a UID for a given message-id. @@ -1100,7 +1114,7 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser): return self._get_uid_from_msgidCb(msgid) #@profile - def set_flags(self, mbox, messages, flags, mode): + def set_flags(self, mbox, messages, flags, mode, observer): """ Set flags for a sequence of messages. @@ -1112,16 +1126,33 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser): :type flags: tuple :param mode: the mode for setting. 1 is append, -1 is remove, 0 set. :type mode: int + :param observer: a deferred that will be called with the dictionary + mapping UIDs to flags after the operation has been + done. + :type observer: deferred """ - result = {} + # XXX we could defer *this* to thread pool, and gather results... + # XXX use deferredList + + deferreds = [] for msg_id in messages: - log.msg("MSG ID = %s" % msg_id) - msg = self.get_msg_by_uid(msg_id, mem_only=True, flags_only=True) - if not msg: - continue - result[msg_id] = msg.setFlags(flags, mode) + deferreds.append( + self._set_flag_for_uid(msg_id, flags, mode)) - return result + def notify(result): + observer.callback(dict(result)) + d1 = defer.gatherResults(deferreds, consumeErrors=True) + d1.addCallback(notify) + + @deferred_to_thread + def _set_flag_for_uid(self, msg_id, flags, mode): + """ + Run the set_flag operation in the thread pool. + """ + log.msg("MSG ID = %s" % msg_id) + msg = self.get_msg_by_uid(msg_id, mem_only=True, flags_only=True) + if msg is not None: + return msg_id, msg.setFlags(flags, mode) # getters: generic for a mailbox diff --git a/src/leap/mail/imap/soledadstore.py b/src/leap/mail/imap/soledadstore.py index ff5e03b..82f27e7 100644 --- a/src/leap/mail/imap/soledadstore.py +++ b/src/leap/mail/imap/soledadstore.py @@ -23,10 +23,12 @@ import threading from itertools import chain from u1db import errors as u1db_errors +from twisted.internet import defer +from twisted.python import log from zope.interface import implements from leap.common.check import leap_assert_type -from leap.mail.decorators import deferred +from leap.mail.decorators import deferred_to_thread from leap.mail.imap.messageparts import MessagePartType from leap.mail.imap.messageparts import MessageWrapper from leap.mail.imap.messageparts import RecentFlagsDoc @@ -209,52 +211,87 @@ class SoledadStore(ContentDedup): # TODO could generalize this method into a generic consumer # and only implement `process` here + def docWriteCallBack(doc_wrapper): + """ + Callback for a successful write of a document wrapper. + """ + if isinstance(doc_wrapper, MessageWrapper): + # If everything went well, we can unset the new flag + # in the source store (memory store) + self._unset_new_dirty(doc_wrapper) + + def docWriteErrorBack(failure): + """ + Errorback for write operations. + """ + log.error("Error while processing item.") + log.msg(failure.getTraceBack()) + while not queue.empty(): - items = self._process(queue) + doc_wrapper = queue.get() + d = defer.Deferred() + d.addCallbacks(docWriteCallBack, docWriteErrorBack) + + self._consume_doc(doc_wrapper, d) + + @deferred_to_thread + def _unset_new_dirty(self, doc_wrapper): + """ + Unset the `new` and `dirty` flags for this document wrapper in the + memory store. + + :param doc_wrapper: a MessageWrapper instance + :type doc_wrapper: MessageWrapper + """ + # XXX debug msg id/mbox? + logger.info("unsetting new flag!") + doc_wrapper.new = False + doc_wrapper.dirty = False - # we prime the generator, that should return the - # message or flags wrapper item in the first place. - doc_wrapper = items.next() + @deferred_to_thread + def _consume_doc(self, doc_wrapper, deferred): + """ + Consume each document wrapper in a separate thread. + + :param doc_wrapper: + :type doc_wrapper: + :param deferred: + :type deferred: Deferred + """ + items = self._process(doc_wrapper) - # From here, we unpack the subpart items and - # the right soledad call. + # we prime the generator, that should return the + # message or flags wrapper item in the first place. + doc_wrapper = items.next() + + # From here, we unpack the subpart items and + # the right soledad call. + failed = False + for item, call in items: try: - failed = False - for item, call in items: - try: - self._try_call(call, item) - except Exception as exc: - failed = exc - continue - if failed: - raise MsgWriteError - - except MsgWriteError: - logger.error("Error while processing item.") - logger.exception(failed) - else: - if isinstance(doc_wrapper, MessageWrapper): - # If everything went well, we can unset the new flag - # in the source store (memory store) - logger.info("unsetting new flag!") - doc_wrapper.new = False - doc_wrapper.dirty = False + self._try_call(call, item) + except Exception as exc: + failed = exc + continue + if failed: + deferred.errback(MsgWriteError( + "There was an error writing the mesage")) + else: + deferred.callback(doc_wrapper) # # SoledadStore specific methods. # - def _process(self, queue): + def _process(self, doc_wrapper): """ - Return an iterator that will yield the msg_wrapper in the first place, + Return an iterator that will yield the doc_wrapper in the first place, followed by the subparts item and the proper call type for every item in the queue, if any. :param queue: the queue from where we'll pick item. :type queue: Queue """ - doc_wrapper = queue.get() - if isinstance(doc_wrapper, MessageWrapper): return chain((doc_wrapper,), self._get_calls_for_msg_parts(doc_wrapper)) |