diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/leap/mail/decorators.py | 2 | ||||
| -rw-r--r-- | src/leap/mail/imap/fetch.py | 4 | ||||
| -rw-r--r-- | src/leap/mail/imap/mailbox.py | 112 | ||||
| -rw-r--r-- | src/leap/mail/imap/memorystore.py | 43 | ||||
| -rw-r--r-- | src/leap/mail/imap/messages.py | 133 | ||||
| -rw-r--r-- | src/leap/mail/imap/soledadstore.py | 99 | 
6 files changed, 264 insertions, 129 deletions
| diff --git a/src/leap/mail/decorators.py b/src/leap/mail/decorators.py index d5eac97..ae115f8 100644 --- a/src/leap/mail/decorators.py +++ b/src/leap/mail/decorators.py @@ -32,7 +32,7 @@ logger = logging.getLogger(__name__)  # See this answer: http://stackoverflow.com/a/19019648/1157664  # And the notes by glyph and jpcalderone -def deferred(f): +def deferred_to_thread(f):      """      Decorator, for deferring methods to Threads. diff --git a/src/leap/mail/imap/fetch.py b/src/leap/mail/imap/fetch.py index 817ad6a..40dadb3 100644 --- a/src/leap/mail/imap/fetch.py +++ b/src/leap/mail/imap/fetch.py @@ -45,7 +45,7 @@ from leap.common.events.events_pb2 import IMAP_UNREAD_MAIL  from leap.common.mail import get_email_charset  from leap.keymanager import errors as keymanager_errors  from leap.keymanager.openpgp import OpenPGPKey -from leap.mail.decorators import deferred +from leap.mail.decorators import deferred_to_thread  from leap.mail.utils import json_loads  from leap.soledad.client import Soledad  from leap.soledad.common.crypto import ENC_SCHEME_KEY, ENC_JSON_KEY @@ -199,7 +199,7 @@ class LeapIncomingMail(object):          logger.exception(failure.value)          traceback.print_tb(*sys.exc_info()) -    @deferred +    @deferred_to_thread      def _sync_soledad(self):          """          Synchronizes with remote soledad. diff --git a/src/leap/mail/imap/mailbox.py b/src/leap/mail/imap/mailbox.py index 802ebf3..79fb476 100644 --- a/src/leap/mail/imap/mailbox.py +++ b/src/leap/mail/imap/mailbox.py @@ -27,6 +27,7 @@ import os  from collections import defaultdict  from twisted.internet import defer +from twisted.internet.task import deferLater  from twisted.python import log  from twisted.mail import imap4 @@ -35,7 +36,7 @@ from zope.interface import implements  from leap.common import events as leap_events  from leap.common.events.events_pb2 import IMAP_UNREAD_MAIL  from leap.common.check import leap_assert, leap_assert_type -from leap.mail.decorators import deferred +from leap.mail.decorators import deferred_to_thread  from leap.mail.utils import empty  from leap.mail.imap.fields import WithMsgFields, fields  from leap.mail.imap.messages import MessageCollection @@ -51,6 +52,11 @@ notifying clients of new messages. Use during stress tests.  NOTIFY_NEW = not os.environ.get('LEAP_SKIPNOTIFY', False) +class MessageCopyError(Exception): +    """ +    """ + +  class SoledadMailbox(WithMsgFields, MBoxParser):      """      A Soledad-backed IMAP mailbox. @@ -534,7 +540,7 @@ class SoledadMailbox(WithMsgFields, MBoxParser):          seq_messg = set_asked.intersection(set_exist)          return seq_messg -    @deferred +    @deferred_to_thread      #@profile      def fetch(self, messages_asked, uid):          """ @@ -574,7 +580,7 @@ class SoledadMailbox(WithMsgFields, MBoxParser):              result = ((msgid, getmsg(msgid)) for msgid in seq_messg)          return result -    @deferred +    @deferred_to_thread      def fetch_flags(self, messages_asked, uid):          """          A fast method to fetch all flags, tricking just the @@ -615,10 +621,10 @@ class SoledadMailbox(WithMsgFields, MBoxParser):          all_flags = self.messages.all_flags()          result = ((msgid, flagsPart( -            msgid, all_flags[msgid])) for msgid in seq_messg) +            msgid, all_flags.get(msgid, tuple()))) for msgid in seq_messg)          return result -    @deferred +    @deferred_to_thread      def fetch_headers(self, messages_asked, uid):          """          A fast method to fetch all headers, tricking just the @@ -698,28 +704,43 @@ class SoledadMailbox(WithMsgFields, MBoxParser):                      otherwise they are message sequence IDs.          :type uid: bool -        :return: A dict mapping message sequence numbers to sequences of -                 str representing the flags set on the message after this -                 operation has been performed. -        :rtype: dict +        :return: A deferred, that will be called with a dict mapping message +                 sequence numbers to sequences of str representing the flags +                 set on the message after this operation has been performed. +        :rtype: deferred          :raise ReadOnlyMailbox: Raised if this mailbox is not open for                                  read-write.          """ +        from twisted.internet import reactor +        if not self.isWriteable(): +            log.msg('read only mailbox!') +            raise imap4.ReadOnlyMailbox + +        d = defer.Deferred() +        deferLater(reactor, 0, self._do_store, messages_asked, flags, +                   mode, uid, d) +        return d + +    def _do_store(self, messages_asked, flags, mode, uid, observer): +        """ +        Helper method, invoke set_flags method in the MessageCollection. + +        See the documentation for the `store` method for the parameters. + +        :param observer: a deferred that will be called with the dictionary +                         mapping UIDs to flags after the operation has been +                         done. +        :type observer: deferred +        """          # XXX implement also sequence (uid = 0) -        # XXX we should prevent cclient from setting Recent flag. +        # XXX we should prevent cclient from setting Recent flag?          leap_assert(not isinstance(flags, basestring),                      "flags cannot be a string")          flags = tuple(flags) -          messages_asked = self._bound_seq(messages_asked)          seq_messg = self._filter_msg_seq(messages_asked) - -        if not self.isWriteable(): -            log.msg('read only mailbox!') -            raise imap4.ReadOnlyMailbox - -        return self.messages.set_flags(self.mbox, seq_messg, flags, mode) +        self.messages.set_flags(self.mbox, seq_messg, flags, mode, observer)      # ISearchableMailbox @@ -767,13 +788,46 @@ class SoledadMailbox(WithMsgFields, MBoxParser):      # IMessageCopier -    #@deferred -    #@profile -    def copy(self, messageObject): +    def copy(self, message):          """          Copy the given message object into this mailbox. -        """ -        msg = messageObject + +        :param message: an IMessage implementor +        :type message: LeapMessage +        :return: a deferred that will be fired with the message +                 uid when the copy succeed. +        :rtype: Deferred +        """ +        from twisted.internet import reactor +        print "COPY :", message +        d = defer.Deferred() + +        # XXX this should not happen ... track it down, +        # probably to FETCH... +        if message is None: +            log.msg("BUG: COPY found a None in passed message") +            d.calback(None) +        deferLater(reactor, 0, self._do_copy, message, d) +        return d + +    #@profile +    def _do_copy(self, message, observer): +        """ +        Call invoked from the deferLater in `copy`. This will +        copy the flags and header documents, and pass them to the +        `create_message` method in the MemoryStore, together with +        the observer deferred that we've been passed along. + +        :param message: an IMessage implementor +        :type message: LeapMessage +        :param observer: the deferred that will fire with the +                         UID of the message +        :type observer: Deferred +        """ +        # XXX  for clarity, this could be delegated to a +        # MessageCollection mixin that implements copy too, and +        # moved out of here. +        msg = message          memstore = self._memstore          # XXX should use a public api instead @@ -785,12 +839,23 @@ class SoledadMailbox(WithMsgFields, MBoxParser):          new_fdoc = copy.deepcopy(fdoc.content)          fdoc_chash = new_fdoc[fields.CONTENT_HASH_KEY] + +        # XXX is this hitting the db??? --- probably. +        # We should profile after the pre-fetch.          dest_fdoc = memstore.get_fdoc_from_chash(              fdoc_chash, self.mbox)          exist = dest_fdoc and not empty(dest_fdoc.content)          if exist: +            # Should we signal error on the callback?              logger.warning("Destination message already exists!") + +            # XXX I'm still not clear if we should raise the +            # callback. This actually rases an ugly warning +            # in some muas like thunderbird. I guess the user does +            # not deserve that. +            #observer.errback(MessageCopyError("Already exists!")) +            observer.callback(True)          else:              mbox = self.mbox              uid_next = memstore.increment_last_soledad_uid(mbox) @@ -799,10 +864,11 @@ class SoledadMailbox(WithMsgFields, MBoxParser):              # FIXME set recent! -            return self._memstore.create_message( +            self._memstore.create_message(                  self.mbox, uid_next,                  MessageWrapper(                      new_fdoc, hdoc.content), +                observer=observer,                  notify_on_disk=False)      # convenience fun diff --git a/src/leap/mail/imap/memorystore.py b/src/leap/mail/imap/memorystore.py index 217ad8e..211d282 100644 --- a/src/leap/mail/imap/memorystore.py +++ b/src/leap/mail/imap/memorystore.py @@ -32,7 +32,7 @@ from zope.interface import implements  from leap.common.check import leap_assert_type  from leap.mail import size -from leap.mail.decorators import deferred +from leap.mail.decorators import deferred_to_thread  from leap.mail.utils import empty  from leap.mail.messageflow import MessageProducer  from leap.mail.imap import interfaces @@ -200,7 +200,8 @@ class MemoryStore(object):      # We would have to add a put_flags operation to modify only      # the flags doc (and set the dirty flag accordingly) -    def create_message(self, mbox, uid, message, notify_on_disk=True): +    def create_message(self, mbox, uid, message, observer, +                       notify_on_disk=True):          """          Create the passed message into this MemoryStore. @@ -212,38 +213,38 @@ class MemoryStore(object):          :type uid: int          :param message: a message to be added          :type message: MessageWrapper -        :param notify_on_disk: whether the deferred that is returned should +        :param observer: the deferred that will fire with the +                         UID of the message. If notify_on_disk is True, +                         this will happen when the message is written to +                         Soledad. Otherwise it will fire as soon as we've +                         added the message to the memory store. +        :type observer: Deferred +        :param notify_on_disk: whether the `observer` deferred should                                 wait until the message is written to disk to                                 be fired.          :type notify_on_disk: bool - -        :return: a Deferred. if notify_on_disk is True, will be fired -                 when written to the db on disk. -                 Otherwise will fire inmediately -        :rtype: Deferred          """          log.msg("adding new doc to memstore %r (%r)" % (mbox, uid))          key = mbox, uid          self._add_message(mbox, uid, message, notify_on_disk) - -        d = defer.Deferred() -        d.addCallback(lambda result: log.msg("message save: %s" % result))          self._new.add(key) -        # We store this deferred so we can keep track of the pending -        # operations internally. -        self._new_deferreds[key] = d +        def log_add(result): +            log.msg("message save: %s" % result) +            return result +        observer.addCallback(log_add)          if notify_on_disk: -            # Caller wants to be notified when the message is on disk -            # so we pass the deferred that will be fired when the message -            # has been written. -            return d -        else: +            # We store this deferred so we can keep track of the pending +            # operations internally. +            # TODO this should fire with the UID !!! -- change that in +            # the soledad store code. +            self._new_deferreds[key] = observer +        if not notify_on_disk:              # Caller does not care, just fired and forgot, so we pass              # a defer that will inmediately have its callback triggered. -            return defer.succeed('fire-and-forget:%s' % str(key)) +            observer.callback(uid)      def put_message(self, mbox, uid, message, notify_on_disk=True):          """ @@ -541,7 +542,7 @@ class MemoryStore(object):              self.write_last_uid(mbox, value)              return value -    @deferred +    @deferred_to_thread      def write_last_uid(self, mbox, value):          """          Increment the soledad integer cache for the highest uid value. diff --git a/src/leap/mail/imap/messages.py b/src/leap/mail/imap/messages.py index 0e5c74a..03dde29 100644 --- a/src/leap/mail/imap/messages.py +++ b/src/leap/mail/imap/messages.py @@ -37,7 +37,7 @@ from leap.common.decorators import memoized_method  from leap.common.mail import get_email_charset  from leap.mail import walk  from leap.mail.utils import first, find_charset, lowerdict, empty -from leap.mail.decorators import deferred +from leap.mail.decorators import deferred_to_thread  from leap.mail.imap.index import IndexedDB  from leap.mail.imap.fields import fields, WithMsgFields  from leap.mail.imap.memorystore import MessageWrapper @@ -243,30 +243,30 @@ class LeapMessage(fields, MailParser, MBoxParser):          REMOVE = -1          SET = 0 -        with self.flags_lock: -            current = doc.content[self.FLAGS_KEY] -            if mode == APPEND: -                newflags = tuple(set(tuple(current) + flags)) -            elif mode == REMOVE: -                newflags = tuple(set(current).difference(set(flags))) -            elif mode == SET: -                newflags = flags - -            # We could defer this, but I think it's better -            # to put it under the lock... -            doc.content[self.FLAGS_KEY] = newflags -            doc.content[self.SEEN_KEY] = self.SEEN_FLAG in flags -            doc.content[self.DEL_KEY] = self.DELETED_FLAG in flags - -            if self._collection.memstore is not None: -                log.msg("putting message in collection") -                self._collection.memstore.put_message( -                    self._mbox, self._uid, -                    MessageWrapper(fdoc=doc.content, new=False, dirty=True, -                                   docs_id={'fdoc': doc.doc_id})) -            else: -                # fallback for non-memstore initializations. -                self._soledad.put_doc(doc) +        #with self.flags_lock: +        current = doc.content[self.FLAGS_KEY] +        if mode == APPEND: +            newflags = tuple(set(tuple(current) + flags)) +        elif mode == REMOVE: +            newflags = tuple(set(current).difference(set(flags))) +        elif mode == SET: +            newflags = flags + +        # We could defer this, but I think it's better +        # to put it under the lock... +        doc.content[self.FLAGS_KEY] = newflags +        doc.content[self.SEEN_KEY] = self.SEEN_FLAG in flags +        doc.content[self.DEL_KEY] = self.DELETED_FLAG in flags + +        if self._collection.memstore is not None: +            log.msg("putting message in collection") +            self._collection.memstore.put_message( +                self._mbox, self._uid, +                MessageWrapper(fdoc=doc.content, new=False, dirty=True, +                               docs_id={'fdoc': doc.doc_id})) +        else: +            # fallback for non-memstore initializations. +            self._soledad.put_doc(doc)          return map(str, newflags)      def getInternalDate(self): @@ -457,8 +457,8 @@ class LeapMessage(fields, MailParser, MBoxParser):          :rtype: Any object implementing C{IMessagePart}.          :return: The specified sub-part.          """ -        if not self.isMultipart(): -            raise TypeError +        #if not self.isMultipart(): +            #raise TypeError          try:              pmap_dict = self._get_part_from_parts_map(part + 1)          except KeyError: @@ -846,14 +846,11 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser):          else:              return False -    # not deferring to thread cause this now uses deferred asa retval -    #@deferred      #@profile      def add_msg(self, raw, subject=None, flags=None, date=None, uid=None,                  notify_on_disk=False):          """          Creates a new message document. -        Here lives the magic of the leap mail. Well, in soledad, really.          :param raw: the raw message          :type raw: str @@ -869,6 +866,31 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser):          :param uid: the message uid for this mailbox          :type uid: int + +        :return: a deferred that will be fired with the message +                 uid when the adding succeed. +        :rtype: deferred +        """ +        logger.debug('adding message') +        if flags is None: +            flags = tuple() +        leap_assert_type(flags, tuple) + +        d = defer.Deferred() +        self._do_add_msg(raw, flags, subject, date, notify_on_disk, d) +        return d + +    @deferred_to_thread +    def _do_add_msg(self, raw, flags, subject, date, notify_on_disk, observer): +        """ +        Helper that creates a new message document. +        Here lives the magic of the leap mail. Well, in soledad, really. + +        See `add_msg` docstring for parameter info. + +        :param observer: a deferred that will be fired with the message +                         uid when the adding succeed. +        :type observer: deferred          """          # TODO signal that we can delete the original message!-----          # when all the processing is done. @@ -876,11 +898,6 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser):          # TODO add the linked-from info !          # TODO add reference to the original message -        logger.debug('adding message') -        if flags is None: -            flags = tuple() -        leap_assert_type(flags, tuple) -          # parse          msg, chash, size, multi = self._do_parse(raw) @@ -918,16 +935,13 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser):          self.set_recent_flag(uid) -        # Saving ----------------------------------------          # TODO ---- add reference to original doc, to be deleted          # after writes are done.          msg_container = MessageWrapper(fd, hd, cdocs) -        # we return a deferred that by default will be triggered -        # inmediately. -        d = self.memstore.create_message(self.mbox, uid, msg_container, -                                         notify_on_disk=notify_on_disk) -        return d +        self.memstore.create_message(self.mbox, uid, msg_container, +                                     observer=observer, +                                     notify_on_disk=notify_on_disk)      #      # getters: specific queries @@ -1030,7 +1044,7 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser):              self.recent_flags.difference_update(                  set([uid])) -    @deferred +    @deferred_to_thread      def set_recent_flag(self, uid):          """          Set Recent flag for a given uid. @@ -1080,7 +1094,7 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser):              return None          return fdoc.content.get(fields.UID_KEY, None) -    @deferred +    @deferred_to_thread      def _get_uid_from_msgid(self, msgid):          """          Return a UID for a given message-id. @@ -1100,7 +1114,7 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser):          return self._get_uid_from_msgidCb(msgid)      #@profile -    def set_flags(self, mbox, messages, flags, mode): +    def set_flags(self, mbox, messages, flags, mode, observer):          """          Set flags for a sequence of messages. @@ -1112,16 +1126,33 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser):          :type flags: tuple          :param mode: the mode for setting. 1 is append, -1 is remove, 0 set.          :type mode: int +        :param observer: a deferred that will be called with the dictionary +                         mapping UIDs to flags after the operation has been +                         done. +        :type observer: deferred          """ -        result = {} +        # XXX we could defer *this* to thread pool, and gather results... +        # XXX use deferredList + +        deferreds = []          for msg_id in messages: -            log.msg("MSG ID = %s" % msg_id) -            msg = self.get_msg_by_uid(msg_id, mem_only=True, flags_only=True) -            if not msg: -                continue -            result[msg_id] = msg.setFlags(flags, mode) +            deferreds.append( +                self._set_flag_for_uid(msg_id, flags, mode)) -        return result +        def notify(result): +            observer.callback(dict(result)) +        d1 = defer.gatherResults(deferreds, consumeErrors=True) +        d1.addCallback(notify) + +    @deferred_to_thread +    def _set_flag_for_uid(self, msg_id, flags, mode): +        """ +        Run the set_flag operation in the thread pool. +        """ +        log.msg("MSG ID = %s" % msg_id) +        msg = self.get_msg_by_uid(msg_id, mem_only=True, flags_only=True) +        if msg is not None: +            return msg_id, msg.setFlags(flags, mode)      # getters: generic for a mailbox diff --git a/src/leap/mail/imap/soledadstore.py b/src/leap/mail/imap/soledadstore.py index ff5e03b..82f27e7 100644 --- a/src/leap/mail/imap/soledadstore.py +++ b/src/leap/mail/imap/soledadstore.py @@ -23,10 +23,12 @@ import threading  from itertools import chain  from u1db import errors as u1db_errors +from twisted.internet import defer +from twisted.python import log  from zope.interface import implements  from leap.common.check import leap_assert_type -from leap.mail.decorators import deferred +from leap.mail.decorators import deferred_to_thread  from leap.mail.imap.messageparts import MessagePartType  from leap.mail.imap.messageparts import MessageWrapper  from leap.mail.imap.messageparts import RecentFlagsDoc @@ -209,52 +211,87 @@ class SoledadStore(ContentDedup):          # TODO could generalize this method into a generic consumer          # and only implement `process` here +        def docWriteCallBack(doc_wrapper): +            """ +            Callback for a successful write of a document wrapper. +            """ +            if isinstance(doc_wrapper, MessageWrapper): +                # If everything went well, we can unset the new flag +                # in the source store (memory store) +                self._unset_new_dirty(doc_wrapper) + +        def docWriteErrorBack(failure): +            """ +            Errorback for write operations. +            """ +            log.error("Error while processing item.") +            log.msg(failure.getTraceBack()) +          while not queue.empty(): -            items = self._process(queue) +            doc_wrapper = queue.get() +            d = defer.Deferred() +            d.addCallbacks(docWriteCallBack, docWriteErrorBack) + +            self._consume_doc(doc_wrapper, d) + +    @deferred_to_thread +    def _unset_new_dirty(self, doc_wrapper): +        """ +        Unset the `new` and `dirty` flags for this document wrapper in the +        memory store. + +        :param doc_wrapper: a MessageWrapper instance +        :type doc_wrapper: MessageWrapper +        """ +        # XXX debug msg id/mbox? +        logger.info("unsetting new flag!") +        doc_wrapper.new = False +        doc_wrapper.dirty = False -            # we prime the generator, that should return the -            # message or flags wrapper item in the first place. -            doc_wrapper = items.next() +    @deferred_to_thread +    def _consume_doc(self, doc_wrapper, deferred): +        """ +        Consume each document wrapper in a separate thread. + +        :param doc_wrapper: +        :type doc_wrapper: +        :param deferred: +        :type deferred: Deferred +        """ +        items = self._process(doc_wrapper) -            # From here, we unpack the subpart items and -            # the right soledad call. +        # we prime the generator, that should return the +        # message or flags wrapper item in the first place. +        doc_wrapper = items.next() + +        # From here, we unpack the subpart items and +        # the right soledad call. +        failed = False +        for item, call in items:              try: -                failed = False -                for item, call in items: -                    try: -                        self._try_call(call, item) -                    except Exception as exc: -                        failed = exc -                        continue -                if failed: -                    raise MsgWriteError - -            except MsgWriteError: -                logger.error("Error while processing item.") -                logger.exception(failed) -            else: -                if isinstance(doc_wrapper, MessageWrapper): -                    # If everything went well, we can unset the new flag -                    # in the source store (memory store) -                    logger.info("unsetting new flag!") -                    doc_wrapper.new = False -                    doc_wrapper.dirty = False +                self._try_call(call, item) +            except Exception as exc: +                failed = exc +                continue +        if failed: +            deferred.errback(MsgWriteError( +                "There was an error writing the mesage")) +        else: +            deferred.callback(doc_wrapper)      #      # SoledadStore specific methods.      # -    def _process(self, queue): +    def _process(self, doc_wrapper):          """ -        Return an iterator that will yield the msg_wrapper in the first place, +        Return an iterator that will yield the doc_wrapper in the first place,          followed by the subparts item and the proper call type for every          item in the queue, if any.          :param queue: the queue from where we'll pick item.          :type queue: Queue          """ -        doc_wrapper = queue.get() -          if isinstance(doc_wrapper, MessageWrapper):              return chain((doc_wrapper,),                           self._get_calls_for_msg_parts(doc_wrapper)) | 
