diff options
| -rw-r--r-- | src/leap/mail/imap/account.py | 51 | ||||
| -rw-r--r-- | src/leap/mail/imap/mailbox.py | 275 | ||||
| -rw-r--r-- | src/leap/mail/imap/memorystore.py | 581 | ||||
| -rw-r--r-- | src/leap/mail/imap/messageparts.py | 46 | ||||
| -rw-r--r-- | src/leap/mail/imap/messages.py | 352 | ||||
| -rw-r--r-- | src/leap/mail/imap/server.py | 43 | ||||
| -rw-r--r-- | src/leap/mail/imap/service/imap.py | 59 | ||||
| -rw-r--r-- | src/leap/mail/imap/soledadstore.py | 369 | ||||
| -rwxr-xr-x | src/leap/mail/imap/tests/regressions | 6 | ||||
| -rw-r--r-- | src/leap/mail/imap/tests/test_imap.py | 432 | ||||
| -rw-r--r-- | src/leap/mail/messageflow.py | 26 | ||||
| -rw-r--r-- | src/leap/mail/utils.py | 104 | 
12 files changed, 1451 insertions, 893 deletions
| diff --git a/src/leap/mail/imap/account.py b/src/leap/mail/imap/account.py index f985c04..1b5d4a0 100644 --- a/src/leap/mail/imap/account.py +++ b/src/leap/mail/imap/account.py @@ -18,9 +18,12 @@  Soledad Backed Account.  """  import copy +import logging +import os  import time  from twisted.mail import imap4 +from twisted.python import log  from zope.interface import implements  from leap.common.check import leap_assert, leap_assert_type @@ -30,12 +33,27 @@ from leap.mail.imap.parser import MBoxParser  from leap.mail.imap.mailbox import SoledadMailbox  from leap.soledad.client import Soledad +logger = logging.getLogger(__name__) + +PROFILE_CMD = os.environ.get('LEAP_PROFILE_IMAPCMD', False) + +if PROFILE_CMD: + +    def _debugProfiling(result, cmdname, start): +        took = (time.time() - start) * 1000 +        log.msg("CMD " + cmdname + " TOOK: " + str(took) + " msec") +        return result +  #######################################  # Soledad Account  ####################################### +# TODO change name to LeapIMAPAccount, since we're using +# the memstore. +# IndexedDB should also not be here anymore. +  class SoledadBackedAccount(WithMsgFields, IndexedDB, MBoxParser):      """      An implementation of IAccount and INamespacePresenteer @@ -67,14 +85,19 @@ class SoledadBackedAccount(WithMsgFields, IndexedDB, MBoxParser):          # XXX SHOULD assert too that the name matches the user/uuid with which          # soledad has been initialized. +        # XXX ??? why is this parsing mailbox name??? it's account... +        # userid? homogenize.          self._account_name = self._parse_mailbox_name(account_name)          self._soledad = soledad          self._memstore = memstore +        self.__mailboxes = set([]) +          self.initialize_db()          # every user should have the right to an inbox folder          # at least, so let's make one! +        self._load_mailboxes()          if not self.mailboxes:              self.addMailbox(self.INBOX_NAME) @@ -106,9 +129,13 @@ class SoledadBackedAccount(WithMsgFields, IndexedDB, MBoxParser):          """          A list of the current mailboxes for this account.          """ -        return [doc.content[self.MBOX_KEY] -                for doc in self._soledad.get_from_index( -                    self.TYPE_IDX, self.MBOX_KEY)] +        return self.__mailboxes + +    def _load_mailboxes(self): +        self.__mailboxes.update( +            [doc.content[self.MBOX_KEY] +             for doc in self._soledad.get_from_index( +                 self.TYPE_IDX, self.MBOX_KEY)])      @property      def subscriptions(self): @@ -173,6 +200,7 @@ class SoledadBackedAccount(WithMsgFields, IndexedDB, MBoxParser):          mbox[self.CREATED_KEY] = creation_ts          doc = self._soledad.create_doc(mbox) +        self._load_mailboxes()          return bool(doc)      def create(self, pathspec): @@ -203,6 +231,7 @@ class SoledadBackedAccount(WithMsgFields, IndexedDB, MBoxParser):          except imap4.MailboxCollision:              if not pathspec.endswith('/'):                  return False +        self._load_mailboxes()          return True      def select(self, name, readwrite=1): @@ -215,17 +244,22 @@ class SoledadBackedAccount(WithMsgFields, IndexedDB, MBoxParser):          :param readwrite: 1 for readwrite permissions.          :type readwrite: int -        :rtype: bool +        :rtype: SoledadMailbox          """ -        name = self._parse_mailbox_name(name) +        if PROFILE_CMD: +            start = time.time() +        name = self._parse_mailbox_name(name)          if name not in self.mailboxes: +            logger.warning("No such mailbox!")              return None -          self.selected = name -        return SoledadMailbox( +        sm = SoledadMailbox(              name, self._soledad, self._memstore, readwrite) +        if PROFILE_CMD: +            _debugProfiling(None, "SELECT", start) +        return sm      def delete(self, name, force=False):          """ @@ -260,6 +294,7 @@ class SoledadBackedAccount(WithMsgFields, IndexedDB, MBoxParser):                              "Hierarchically inferior mailboxes "                              "exist and \\Noselect is set")          mbox.destroy() +        self._load_mailboxes()          # XXX FIXME --- not honoring the inferior names... @@ -297,6 +332,8 @@ class SoledadBackedAccount(WithMsgFields, IndexedDB, MBoxParser):              mbox.content[self.MBOX_KEY] = new              self._soledad.put_doc(mbox) +        self._load_mailboxes() +          # XXX ---- FIXME!!!! ------------------------------------          # until here we just renamed the index...          # We have to rename also the occurrence of this diff --git a/src/leap/mail/imap/mailbox.py b/src/leap/mail/imap/mailbox.py index d8af0a5..57505f0 100644 --- a/src/leap/mail/imap/mailbox.py +++ b/src/leap/mail/imap/mailbox.py @@ -50,6 +50,25 @@ If the environment variable `LEAP_SKIPNOTIFY` is set, we avoid  notifying clients of new messages. Use during stress tests.  """  NOTIFY_NEW = not os.environ.get('LEAP_SKIPNOTIFY', False) +PROFILE_CMD = os.environ.get('LEAP_PROFILE_IMAPCMD', False) + +if PROFILE_CMD: +    import time + +    def _debugProfiling(result, cmdname, start): +        took = (time.time() - start) * 1000 +        log.msg("CMD " + cmdname + " TOOK: " + str(took) + " msec") +        return result + +    def do_profile_cmd(d, name): +        """ +        Add the profiling debug to the passed callback. +        :param d: deferred +        :param name: name of the command +        :type name: str +        """ +        d.addCallback(_debugProfiling, name, time.time()) +        d.addErrback(lambda f: log.msg(f.getTraceback()))  class SoledadMailbox(WithMsgFields, MBoxParser): @@ -89,6 +108,12 @@ class SoledadMailbox(WithMsgFields, MBoxParser):      _listeners = defaultdict(set)      next_uid_lock = threading.Lock() +    last_uid_lock = threading.Lock() + +    # TODO unify all the `primed` dicts +    _fdoc_primed = {} +    _last_uid_primed = {} +    _known_uids_primed = {}      def __init__(self, mbox, soledad, memstore, rw=1):          """ @@ -107,6 +132,7 @@ class SoledadMailbox(WithMsgFields, MBoxParser):          :param rw: read-and-write flag for this mailbox          :type rw: int          """ +        logger.debug("Initializing mailbox %r" % (mbox,))          leap_assert(mbox, "Need a mailbox name to initialize")          leap_assert(soledad, "Need a soledad instance to initialize") @@ -123,12 +149,24 @@ class SoledadMailbox(WithMsgFields, MBoxParser):          self.messages = MessageCollection(              mbox=mbox, soledad=self._soledad, memstore=self._memstore) +        # XXX careful with this get/set (it would be +        # hitting db unconditionally, move to memstore too) +        # Now it's returning a fixed amount of flags from mem +        # as a workaround.          if not self.getFlags():              self.setFlags(self.INIT_FLAGS)          if self._memstore:              self.prime_known_uids_to_memstore()              self.prime_last_uid_to_memstore() +            self.prime_flag_docs_to_memstore() + +        from twisted.internet import reactor +        self.reactor = reactor + +        # purge memstore from empty fdocs. +        self._memstore.purge_fdoc_store(mbox) +        logger.debug("DONE initializing mailbox %r" % (mbox,))      @property      def listeners(self): @@ -170,8 +208,7 @@ class SoledadMailbox(WithMsgFields, MBoxParser):          """          self.listeners.remove(listener) -    # TODO move completely to soledadstore, under memstore reponsibility. -    def _get_mbox(self): +    def _get_mbox_doc(self):          """          Return mailbox document. @@ -179,14 +216,7 @@ class SoledadMailbox(WithMsgFields, MBoxParser):                   the query failed.          :rtype: SoledadDocument or None.          """ -        try: -            query = self._soledad.get_from_index( -                fields.TYPE_MBOX_IDX, -                fields.TYPE_MBOX_VAL, self.mbox) -            if query: -                return query.pop() -        except Exception as exc: -            logger.exception("Unhandled error %r" % exc) +        return self._memstore.get_mbox_doc(self.mbox)      def getFlags(self):          """ @@ -195,12 +225,21 @@ class SoledadMailbox(WithMsgFields, MBoxParser):          :returns: tuple of flags for this mailbox          :rtype: tuple of str          """ -        mbox = self._get_mbox() -        if not mbox: -            return None -        flags = mbox.content.get(self.FLAGS_KEY, []) +        flags = self.INIT_FLAGS + +        # XXX returning fixed flags always +        # Since I have not found a case where the client +        # wants to modify this, as a way of speeding up +        # selects. To do it right, we probably should keep +        # track of the set of all flags used by msgs +        # in this mailbox. Does it matter? +        #mbox = self._get_mbox_doc() +        #if not mbox: +            #return None +        #flags = mbox.content.get(self.FLAGS_KEY, [])          return map(str, flags) +    # XXX move to memstore->soledadstore      def setFlags(self, flags):          """          Sets flags for this mailbox. @@ -210,10 +249,12 @@ class SoledadMailbox(WithMsgFields, MBoxParser):          """          leap_assert(isinstance(flags, tuple),                      "flags expected to be a tuple") -        mbox = self._get_mbox() +        mbox = self._get_mbox_doc()          if not mbox:              return None          mbox.content[self.FLAGS_KEY] = map(str, flags) +        logger.debug("Writing mbox document for %r to Soledad" +                     % (self.mbox,))          self._soledad.put_doc(mbox)      # XXX SHOULD BETTER IMPLEMENT ADD_FLAG, REMOVE_FLAG. @@ -225,8 +266,7 @@ class SoledadMailbox(WithMsgFields, MBoxParser):          :return: True if the mailbox is closed          :rtype: bool          """ -        mbox = self._get_mbox() -        return mbox.content.get(self.CLOSED_KEY, False) +        return self._memstore.get_mbox_closed(self.mbox)      def _set_closed(self, closed):          """ @@ -235,10 +275,7 @@ class SoledadMailbox(WithMsgFields, MBoxParser):          :param closed: the state to be set          :type closed: bool          """ -        leap_assert(isinstance(closed, bool), "closed needs to be boolean") -        mbox = self._get_mbox() -        mbox.content[self.CLOSED_KEY] = closed -        self._soledad.put_doc(mbox) +        self._memstore.set_mbox_closed(self.mbox, closed)      closed = property(          _get_closed, _set_closed, doc="Closed attribute.") @@ -265,10 +302,13 @@ class SoledadMailbox(WithMsgFields, MBoxParser):          """          Prime memstore with last_uid value          """ -        set_exist = set(self.messages.all_uid_iter()) -        last = max(set_exist) if set_exist else 0 -        logger.info("Priming Soledad last_uid to %s" % (last,)) -        self._memstore.set_last_soledad_uid(self.mbox, last) +        primed = self._last_uid_primed.get(self.mbox, False) +        if not primed: +            mbox = self._get_mbox_doc() +            last = mbox.content.get('lastuid', 0) +            logger.info("Priming Soledad last_uid to %s" % (last,)) +            self._memstore.set_last_soledad_uid(self.mbox, last) +            self._last_uid_primed[self.mbox] = True      def prime_known_uids_to_memstore(self):          """ @@ -276,8 +316,21 @@ class SoledadMailbox(WithMsgFields, MBoxParser):          We do this to be able to filter the requests efficiently.          """ -        known_uids = self.messages.all_soledad_uid_iter() -        self._memstore.set_known_uids(self.mbox, known_uids) +        primed = self._known_uids_primed.get(self.mbox, False) +        if not primed: +            known_uids = self.messages.all_soledad_uid_iter() +            self._memstore.set_known_uids(self.mbox, known_uids) +            self._known_uids_primed[self.mbox] = True + +    def prime_flag_docs_to_memstore(self): +        """ +        Prime memstore with all the flags documents. +        """ +        primed = self._fdoc_primed.get(self.mbox, False) +        if not primed: +            all_flag_docs = self.messages.get_all_soledad_flag_docs() +            self._memstore.load_flag_docs(self.mbox, all_flag_docs) +            self._fdoc_primed[self.mbox] = True      def getUIDValidity(self):          """ @@ -286,7 +339,7 @@ class SoledadMailbox(WithMsgFields, MBoxParser):          :return: unique validity identifier          :rtype: int          """ -        mbox = self._get_mbox() +        mbox = self._get_mbox_doc()          return mbox.content.get(self.CREATED_KEY, 1)      def getUID(self, message): @@ -420,6 +473,9 @@ class SoledadMailbox(WithMsgFields, MBoxParser):              flags = tuple(str(flag) for flag in flags)          d = self._do_add_message(message, flags=flags, date=date) +        if PROFILE_CMD: +            do_profile_cmd(d, "APPEND") +        # XXX should notify here probably          return d      def _do_add_message(self, message, flags, date): @@ -428,15 +484,9 @@ class SoledadMailbox(WithMsgFields, MBoxParser):          Invoked from addMessage.          """          d = self.messages.add_msg(message, flags=flags, date=date) -        # XXX Removing notify temporarily. -        # This is interfering with imaptest results. I'm not clear if it's -        # because we clutter the logging or because the set of listeners is -        # ever-growing. We should come up with some smart way of dealing with -        # it, or maybe just disabling it using an environmental variable since -        # we will only have just a few listeners in the regular desktop case. -        #d.addCallback(self.notify_new)          return d +    @deferred_to_thread      def notify_new(self, *args):          """          Notify of new messages to all the listeners. @@ -447,12 +497,10 @@ class SoledadMailbox(WithMsgFields, MBoxParser):              return          exists = self.getMessageCount()          recent = self.getRecentCount() -        logger.debug("NOTIFY: there are %s messages, %s recent" % ( -            exists, -            recent)) +        logger.debug("NOTIFY (%r): there are %s messages, %s recent" % ( +            self.mbox, exists, recent))          for l in self.listeners: -            logger.debug('notifying...')              l.newMessages(exists, recent)      # commands, do not rename methods @@ -471,7 +519,7 @@ class SoledadMailbox(WithMsgFields, MBoxParser):          # we should postpone the removal          # XXX move to memory store?? -        self._soledad.delete_doc(self._get_mbox()) +        self._soledad.delete_doc(self._get_mbox_doc())      def _close_cb(self, result):          self.closed = True @@ -527,8 +575,6 @@ class SoledadMailbox(WithMsgFields, MBoxParser):          seq_messg = set_asked.intersection(set_exist)          return seq_messg -    @deferred_to_thread -    #@profile      def fetch(self, messages_asked, uid):          """          Retrieve one or more messages in this mailbox. @@ -544,6 +590,27 @@ class SoledadMailbox(WithMsgFields, MBoxParser):                      otherwise.          :type uid: bool +        :rtype: deferred +        """ +        d = defer.Deferred() +        self.reactor.callInThread(self._do_fetch, messages_asked, uid, d) +        if PROFILE_CMD: +            do_profile_cmd(d, "FETCH") +        return d + +    # called in thread +    def _do_fetch(self, messages_asked, uid, d): +        """ +        :param messages_asked: IDs of the messages to retrieve information +                               about +        :type messages_asked: MessageSet + +        :param uid: If true, the IDs are UIDs. They are message sequence IDs +                    otherwise. +        :type uid: bool +        :param d: deferred whose callback will be called with result. +        :type d: Deferred +          :rtype: A tuple of two-tuples of message sequence numbers and                  LeapMessage          """ @@ -564,10 +631,12 @@ class SoledadMailbox(WithMsgFields, MBoxParser):              logger.debug("Getting msg by index: INEFFICIENT call!")              raise NotImplementedError          else: -            result = ((msgid, getmsg(msgid)) for msgid in seq_messg) -        return result +            got_msg = ((msgid, getmsg(msgid)) for msgid in seq_messg) +            result = ((msgid, msg) for msgid, msg in got_msg +                      if msg is not None) +            self.reactor.callLater(0, self.unset_recent_flags, seq_messg) +            self.reactor.callFromThread(d.callback, result) -    @deferred_to_thread      def fetch_flags(self, messages_asked, uid):          """          A fast method to fetch all flags, tricking just the @@ -606,12 +675,11 @@ class SoledadMailbox(WithMsgFields, MBoxParser):          messages_asked = self._bound_seq(messages_asked)          seq_messg = self._filter_msg_seq(messages_asked) -        all_flags = self.messages.all_flags() +        all_flags = self._memstore.all_flags(self.mbox)          result = ((msgid, flagsPart(              msgid, all_flags.get(msgid, tuple()))) for msgid in seq_messg)          return result -    @deferred_to_thread      def fetch_headers(self, messages_asked, uid):          """          A fast method to fetch all headers, tricking just the @@ -636,6 +704,8 @@ class SoledadMailbox(WithMsgFields, MBoxParser):                  MessagePart.          :rtype: tuple          """ +        # TODO how often is thunderbird doing this? +          class headersPart(object):              def __init__(self, uid, headers):                  self.uid = uid @@ -653,10 +723,9 @@ class SoledadMailbox(WithMsgFields, MBoxParser):          messages_asked = self._bound_seq(messages_asked)          seq_messg = self._filter_msg_seq(messages_asked) -        all_chash = self.messages.all_flags_chash()          all_headers = self.messages.all_headers()          result = ((msgid, headersPart( -            msgid, all_headers.get(all_chash.get(msgid, 'nil'), {}))) +            msgid, all_headers.get(msgid, {})))              for msgid in seq_messg)          return result @@ -699,14 +768,15 @@ class SoledadMailbox(WithMsgFields, MBoxParser):          :raise ReadOnlyMailbox: Raised if this mailbox is not open for                                  read-write.          """ -        from twisted.internet import reactor          if not self.isWriteable():              log.msg('read only mailbox!')              raise imap4.ReadOnlyMailbox          d = defer.Deferred() -        deferLater(reactor, 0, self._do_store, messages_asked, flags, -                   mode, uid, d) +        self.reactor.callLater(0, self._do_store, messages_asked, flags, +                               mode, uid, d) +        if PROFILE_CMD: +            do_profile_cmd(d, "STORE")          return d      def _do_store(self, messages_asked, flags, mode, uid, observer): @@ -721,7 +791,7 @@ class SoledadMailbox(WithMsgFields, MBoxParser):          :type observer: deferred          """          # XXX implement also sequence (uid = 0) -        # XXX we should prevent cclient from setting Recent flag? +        # XXX we should prevent client from setting Recent flag?          leap_assert(not isinstance(flags, basestring),                      "flags cannot be a string")          flags = tuple(flags) @@ -785,15 +855,10 @@ class SoledadMailbox(WithMsgFields, MBoxParser):                   uid when the copy succeed.          :rtype: Deferred          """ -        from twisted.internet import reactor -          d = defer.Deferred() -        # XXX this should not happen ... track it down, -        # probably to FETCH... -        if message is None: -            log.msg("BUG: COPY found a None in passed message") -            d.callback(None) -        deferLater(reactor, 0, self._do_copy, message, d) +        if PROFILE_CMD: +            do_profile_cmd(d, "COPY") +        deferLater(self.reactor, 0, self._do_copy, message, d)          return d      def _do_copy(self, message, observer): @@ -809,51 +874,70 @@ class SoledadMailbox(WithMsgFields, MBoxParser):                           UID of the message          :type observer: Deferred          """ +        memstore = self._memstore + +        def createCopy(result): +            exist, new_fdoc = result +            if exist: +                # Should we signal error on the callback? +                logger.warning("Destination message already exists!") + +                # XXX I'm not sure if we should raise the +                # errback. This actually rases an ugly warning +                # in some muas like thunderbird. +                # UID 0 seems a good convention for no uid. +                observer.callback(0) +            else: +                mbox = self.mbox +                uid_next = memstore.increment_last_soledad_uid(mbox) + +                new_fdoc[self.UID_KEY] = uid_next +                new_fdoc[self.MBOX_KEY] = mbox + +                flags = list(new_fdoc[self.FLAGS_KEY]) +                flags.append(fields.RECENT_FLAG) +                new_fdoc[self.FLAGS_KEY] = tuple(set(flags)) + +                # FIXME set recent! + +                self._memstore.create_message( +                    self.mbox, uid_next, +                    MessageWrapper(new_fdoc), +                    observer=observer, +                    notify_on_disk=False) + +        d = self._get_msg_copy(message) +        d.addCallback(createCopy) +        d.addErrback(lambda f: log.msg(f.getTraceback())) + +    @deferred_to_thread +    def _get_msg_copy(self, message): +        """ +        Get a copy of the fdoc for this message, and check whether +        it already exists. + +        :param message: an IMessage implementor +        :type message: LeapMessage +        :return: exist, new_fdoc +        :rtype: tuple +        """          # XXX  for clarity, this could be delegated to a          # MessageCollection mixin that implements copy too, and          # moved out of here.          msg = message          memstore = self._memstore -        # XXX should use a public api instead -        fdoc = msg._fdoc -        hdoc = msg._hdoc -        if not fdoc: +        if empty(msg.fdoc):              logger.warning("Tried to copy a MSG with no fdoc")              return -        new_fdoc = copy.deepcopy(fdoc.content) - +        new_fdoc = copy.deepcopy(msg.fdoc.content)          fdoc_chash = new_fdoc[fields.CONTENT_HASH_KEY] -        # XXX is this hitting the db??? --- probably. -        # We should profile after the pre-fetch.          dest_fdoc = memstore.get_fdoc_from_chash(              fdoc_chash, self.mbox) -        exist = dest_fdoc and not empty(dest_fdoc.content) - -        if exist: -            # Should we signal error on the callback? -            logger.warning("Destination message already exists!") - -            # XXX I'm still not clear if we should raise the -            # errback. This actually rases an ugly warning -            # in some muas like thunderbird. I guess the user does -            # not deserve that. -            observer.callback(True) -        else: -            mbox = self.mbox -            uid_next = memstore.increment_last_soledad_uid(mbox) -            new_fdoc[self.UID_KEY] = uid_next -            new_fdoc[self.MBOX_KEY] = mbox - -            # FIXME set recent! -            self._memstore.create_message( -                self.mbox, uid_next, -                MessageWrapper( -                    new_fdoc, hdoc.content), -                observer=observer, -                notify_on_disk=False) +        exist = not empty(dest_fdoc) +        return exist, new_fdoc      # convenience fun @@ -865,12 +949,11 @@ class SoledadMailbox(WithMsgFields, MBoxParser):          for doc in docs:              self.messages._soledad.delete_doc(doc) -    def unset_recent_flags(self, uids): +    def unset_recent_flags(self, uid_seq):          """          Unset Recent flag for a sequence of UIDs.          """ -        seq_messg = self._bound_seq(uids) -        self.messages.unset_recent_flags(seq_messg) +        self.messages.unset_recent_flags(uid_seq)      def __repr__(self):          """ diff --git a/src/leap/mail/imap/memorystore.py b/src/leap/mail/imap/memorystore.py index ed2b3f2..f23a234 100644 --- a/src/leap/mail/imap/memorystore.py +++ b/src/leap/mail/imap/memorystore.py @@ -25,6 +25,7 @@ import weakref  from collections import defaultdict  from copy import copy +from enum import Enum  from twisted.internet import defer  from twisted.internet.task import LoopingCall  from twisted.python import log @@ -32,8 +33,7 @@ from zope.interface import implements  from leap.common.check import leap_assert_type  from leap.mail import size -from leap.mail.decorators import deferred_to_thread -from leap.mail.utils import empty +from leap.mail.utils import empty, phash_iter  from leap.mail.messageflow import MessageProducer  from leap.mail.imap import interfaces  from leap.mail.imap.fields import fields @@ -42,12 +42,19 @@ from leap.mail.imap.messageparts import RecentFlagsDoc  from leap.mail.imap.messageparts import MessageWrapper  from leap.mail.imap.messageparts import ReferenciableDict +from leap.mail.decorators import deferred_to_thread +  logger = logging.getLogger(__name__)  # The default period to do writebacks to the permanent  # soledad storage, in seconds. -SOLEDAD_WRITE_PERIOD = 10 +SOLEDAD_WRITE_PERIOD = 15 + +FDOC = MessagePartType.fdoc.key +HDOC = MessagePartType.hdoc.key +CDOCS = MessagePartType.cdocs.key +DOCS_ID = MessagePartType.docs_id.key  @contextlib.contextmanager @@ -65,6 +72,9 @@ def set_bool_flag(obj, att):          setattr(obj, att, False) +DirtyState = Enum("none", "dirty", "new") + +  class MemoryStore(object):      """      An in-memory store to where we can write the different parts that @@ -88,6 +98,7 @@ class MemoryStore(object):      WRITING_FLAG = "_writing"      _last_uid_lock = threading.Lock() +    _fdoc_docid_lock = threading.Lock()      def __init__(self, permanent_store=None,                   write_period=SOLEDAD_WRITE_PERIOD): @@ -100,11 +111,19 @@ class MemoryStore(object):          :param write_period: the interval to dump messages to disk, in seconds.          :type write_period: int          """ +        from twisted.internet import reactor +        self.reactor = reactor +          self._permanent_store = permanent_store          self._write_period = write_period          # Internal Storage: messages -        self._msg_store = {} +        """ +        flags document store. +        _fdoc_store[mbox][uid] = { 'content': 'aaa' } +        """ +        self._fdoc_store = defaultdict(lambda: defaultdict( +            lambda: ReferenciableDict({})))          # Sizes          """ @@ -114,9 +133,28 @@ class MemoryStore(object):          # Internal Storage: payload-hash          """ -        {'phash': weakreaf.proxy(dict)} +        fdocs:doc-id store, stores document IDs for putting +        the dirty flags-docs. +        """ +        self._fdoc_id_store = defaultdict(lambda: defaultdict( +            lambda: '')) + +        # Internal Storage: content-hash:hdoc +        """ +        hdoc-store keeps references to +        the header-documents indexed by content-hash. + +        {'chash': { dict-stuff } +        } +        """ +        self._hdoc_store = defaultdict(lambda: ReferenciableDict({})) + +        # Internal Storage: payload-hash:cdoc +        """ +        content-docs stored by payload-hash +        {'phash': { dict-stuff } }          """ -        self._phash_store = {} +        self._cdoc_store = defaultdict(lambda: ReferenciableDict({}))          # Internal Storage: content-hash:fdoc          """ @@ -127,7 +165,7 @@ class MemoryStore(object):                     'mbox-b': weakref.proxy(dict)}          }          """ -        self._chash_fdoc_store = {} +        self._chash_fdoc_store = defaultdict(lambda: defaultdict(lambda: None))          # Internal Storage: recent-flags store          """ @@ -153,7 +191,7 @@ class MemoryStore(object):          {'mbox-a': 42,           'mbox-b': 23}          """ -        self._last_uid = {} +        self._last_uid = defaultdict(lambda: 0)          """          known-uids keeps a count of the uids that soledad knows for a given @@ -165,11 +203,15 @@ class MemoryStore(object):          # New and dirty flags, to set MessageWrapper State.          self._new = set([]) +        self._new_queue = set([])          self._new_deferreds = {} +          self._dirty = set([]) -        self._rflags_dirty = set([]) +        self._dirty_queue = set([])          self._dirty_deferreds = {} +        self._rflags_dirty = set([]) +          # Flag for signaling we're busy writing to the disk storage.          setattr(self, self.WRITING_FLAG, False) @@ -185,11 +227,17 @@ class MemoryStore(object):              # We can start the write loop right now, why wait?              self._start_write_loop() +        else: +            # We have a memory-only store. +            self.producer = None +            self._write_loop = None      def _start_write_loop(self):          """          Start loop for writing to disk database.          """ +        if self._write_loop is None: +            return          if not self._write_loop.running:              self._write_loop.start(self._write_period, now=True) @@ -197,6 +245,8 @@ class MemoryStore(object):          """          Stop loop for writing to disk database.          """ +        if self._write_loop is None: +            return          if self._write_loop.running:              self._write_loop.stop() @@ -230,34 +280,30 @@ class MemoryStore(object):                                 be fired.          :type notify_on_disk: bool          """ -        log.msg("adding new doc to memstore %r (%r)" % (mbox, uid)) +        log.msg("Adding new doc to memstore %r (%r)" % (mbox, uid))          key = mbox, uid          self._add_message(mbox, uid, message, notify_on_disk)          self._new.add(key) -        # XXX use this while debugging the callback firing, -        # remove after unittesting this. -        #def log_add(result): -            #return result -        #observer.addCallback(log_add) - -        if notify_on_disk: -            # We store this deferred so we can keep track of the pending -            # operations internally. -            # TODO this should fire with the UID !!! -- change that in -            # the soledad store code. -            self._new_deferreds[key] = observer -        if not notify_on_disk: -            # Caller does not care, just fired and forgot, so we pass -            # a defer that will inmediately have its callback triggered. -            observer.callback(uid) +        if observer is not None: +            if notify_on_disk: +                # We store this deferred so we can keep track of the pending +                # operations internally. +                # TODO this should fire with the UID !!! -- change that in +                # the soledad store code. +                self._new_deferreds[key] = observer + +            else: +                # Caller does not care, just fired and forgot, so we pass +                # a defer that will inmediately have its callback triggered. +                self.reactor.callFromThread(observer.callback, uid)      def put_message(self, mbox, uid, message, notify_on_disk=True):          """          Put an existing message. -        This will set the dirty flag on the MemoryStore. +        This will also set the dirty flag on the MemoryStore.          :param mbox: the mailbox          :type mbox: str or unicode @@ -289,76 +335,59 @@ class MemoryStore(object):          Helper method, called by both create_message and put_message.          See those for parameter documentation.          """ -        # XXX have to differentiate between notify_new and notify_dirty -        # TODO defaultdict the hell outa here... - -        key = mbox, uid          msg_dict = message.as_dict() -        FDOC = MessagePartType.fdoc.key -        HDOC = MessagePartType.hdoc.key -        CDOCS = MessagePartType.cdocs.key -        DOCS_ID = MessagePartType.docs_id.key - -        try: -            store = self._msg_store[key] -        except KeyError: -            self._msg_store[key] = {FDOC: {}, -                                    HDOC: {}, -                                    CDOCS: {}, -                                    DOCS_ID: {}} -            store = self._msg_store[key] -          fdoc = msg_dict.get(FDOC, None) -        if fdoc: -            if not store.get(FDOC, None): -                store[FDOC] = ReferenciableDict({}) -            store[FDOC].update(fdoc) +        if fdoc is not None: +            fdoc_store = self._fdoc_store[mbox][uid] +            fdoc_store.update(fdoc) +            chash_fdoc_store = self._chash_fdoc_store              # content-hash indexing              chash = fdoc.get(fields.CONTENT_HASH_KEY) -            chash_fdoc_store = self._chash_fdoc_store -            if not chash in chash_fdoc_store: -                chash_fdoc_store[chash] = {} -              chash_fdoc_store[chash][mbox] = weakref.proxy( -                store[FDOC]) +                self._fdoc_store[mbox][uid])          hdoc = msg_dict.get(HDOC, None)          if hdoc is not None: -            if not store.get(HDOC, None): -                store[HDOC] = ReferenciableDict({}) -            store[HDOC].update(hdoc) - -        docs_id = msg_dict.get(DOCS_ID, None) -        if docs_id: -            if not store.get(DOCS_ID, None): -                store[DOCS_ID] = {} -            store[DOCS_ID].update(docs_id) +            chash = hdoc.get(fields.CONTENT_HASH_KEY) +            hdoc_store = self._hdoc_store[chash] +            hdoc_store.update(hdoc)          cdocs = message.cdocs -        for cdoc_key in cdocs.keys(): -            if not store.get(CDOCS, None): -                store[CDOCS] = {} - -            cdoc = cdocs[cdoc_key] -            # first we make it weak-referenciable -            referenciable_cdoc = ReferenciableDict(cdoc) -            store[CDOCS][cdoc_key] = referenciable_cdoc +        for cdoc in cdocs.values():              phash = cdoc.get(fields.PAYLOAD_HASH_KEY, None)              if not phash:                  continue -            self._phash_store[phash] = weakref.proxy(referenciable_cdoc) +            cdoc_store = self._cdoc_store[phash] +            cdoc_store.update(cdoc) -        def prune(seq, store): -            for key in seq: -                if key in store and empty(store.get(key)): -                    store.pop(key) +        # Update memory store size +        # XXX this should use [mbox][uid] +        key = mbox, uid +        self._sizes[key] = size.get_size(self._fdoc_store[key]) +        # TODO add hdoc and cdocs sizes too -        prune((FDOC, HDOC, CDOCS, DOCS_ID), store) +    def purge_fdoc_store(self, mbox): +        """ +        Purge the empty documents from a fdoc store. +        Called during initialization of the SoledadMailbox -        # Update memory store size -        self._sizes[key] = size(self._msg_store[key]) +        :param mbox: the mailbox +        :type mbox: str or unicode +        """ +        # XXX This is really a workaround until I find the conditions +        # that are making the empty items remain there. +        # This happens, for instance, after running several times +        # the regression test, that issues a store deleted + expunge + select +        # The items are being correclty deleted, but in succesive appends +        # the empty items with previously deleted uids reappear as empty +        # documents. I suspect it's a timing condition with a previously +        # evaluated sequence being used after the items has been removed. + +        for uid, value in self._fdoc_store[mbox].items(): +            if empty(value): +                del self._fdoc_store[mbox][uid]      def get_docid_for_fdoc(self, mbox, uid):          """ @@ -371,13 +400,20 @@ class MemoryStore(object):          :type uid: int          :rtype: unicode or None          """ -        fdoc = self._permanent_store.get_flags_doc(mbox, uid) -        if empty(fdoc): -            return None -        doc_id = fdoc.doc_id +        with self._fdoc_docid_lock: +            doc_id = self._fdoc_id_store[mbox][uid] + +        if empty(doc_id): +            fdoc = self._permanent_store.get_flags_doc(mbox, uid) +            if empty(fdoc) or empty(fdoc.content): +                return None +            doc_id = fdoc.doc_id +            self._fdoc_id_store[mbox][uid] = doc_id +          return doc_id -    def get_message(self, mbox, uid, flags_only=False): +    def get_message(self, mbox, uid, dirtystate=DirtyState.none, +                    flags_only=False):          """          Get a MessageWrapper for the given mbox and uid combination. @@ -385,25 +421,58 @@ class MemoryStore(object):          :type mbox: str or unicode          :param uid: the message UID          :type uid: int +        :param dirtystate: DirtyState enum: one of `dirty`, `new` +                           or `none` (default) +        :type dirtystate: enum          :param flags_only: whether the message should carry only a reference                             to the flags document.          :type flags_only: bool +        :          :return: MessageWrapper or None          """ +        if dirtystate == DirtyState.dirty: +            flags_only = True +          key = mbox, uid -        FDOC = MessagePartType.fdoc.key -        msg_dict = self._msg_store.get(key, None) -        if empty(msg_dict): +        fdoc = self._fdoc_store[mbox][uid] +        if empty(fdoc):              return None -        new, dirty = self._get_new_dirty_state(key) + +        new, dirty = False, False +        if dirtystate == DirtyState.none: +            new, dirty = self._get_new_dirty_state(key) +        if dirtystate == DirtyState.dirty: +            new, dirty = False, True +        if dirtystate == DirtyState.new: +            new, dirty = True, False +          if flags_only: -            return MessageWrapper(fdoc=msg_dict[FDOC], +            return MessageWrapper(fdoc=fdoc,                                    new=new, dirty=dirty,                                    memstore=weakref.proxy(self))          else: -            return MessageWrapper(from_dict=msg_dict, +            chash = fdoc.get(fields.CONTENT_HASH_KEY) +            hdoc = self._hdoc_store[chash] +            if empty(hdoc): +                hdoc = self._permanent_store.get_headers_doc(chash) +                if empty(hdoc): +                    return None +                if not empty(hdoc.content): +                    self._hdoc_store[chash] = hdoc.content +                    hdoc = hdoc.content +            cdocs = None + +            pmap = hdoc.get(fields.PARTS_MAP_KEY, None) +            if new and pmap is not None: +                # take the different cdocs for write... +                cdoc_store = self._cdoc_store +                cdocs_list = phash_iter(hdoc) +                cdocs = dict(enumerate( +                    [cdoc_store[phash] for phash in cdocs_list], 1)) + +            return MessageWrapper(fdoc=fdoc, hdoc=hdoc, cdocs=cdocs,                                    new=new, dirty=dirty,                                    memstore=weakref.proxy(self)) @@ -424,23 +493,36 @@ class MemoryStore(object):          # token to ensure consistency in the removal.          try: +            del self._fdoc_store[mbox][uid] +        except KeyError: +            pass + +        try:              key = mbox, uid              self._new.discard(key)              self._dirty.discard(key) -            self._msg_store.pop(key, None)              if key in self._sizes:                  del self._sizes[key] - +            self._known_uids[mbox].discard(uid) +        except Exception as exc: +            logger.error("error while removing message!") +            logger.exception(exc) +        try: +            with self._fdoc_docid_lock: +                del self._fdoc_id_store[mbox][uid]          except Exception as exc: +            logger.error("error while removing message!")              logger.exception(exc)      # IMessageStoreWriter +    @deferred_to_thread      def write_messages(self, store):          """          Write the message documents in this MemoryStore to a different store.          :param store: the IMessageStore to write to +        :rtype: False if queue is not empty, None otherwise.          """          # For now, we pass if the queue is not empty, to avoid duplicate          # queuing. @@ -450,7 +532,7 @@ class MemoryStore(object):          # XXX this could return the deferred for all the enqueued operations          if not self.producer.is_queue_empty(): -            return +            return False          if any(map(lambda i: not empty(i), (self._new, self._dirty))):              logger.info("Writing messages to Soledad...") @@ -459,9 +541,14 @@ class MemoryStore(object):          # is accquired          with set_bool_flag(self, self.WRITING_FLAG):              for rflags_doc_wrapper in self.all_rdocs_iter(): -                self.producer.push(rflags_doc_wrapper) -            for msg_wrapper in self.all_new_dirty_msg_iter(): -                self.producer.push(msg_wrapper) +                self.producer.push(rflags_doc_wrapper, +                                   state=self.producer.STATE_DIRTY) +            for msg_wrapper in self.all_new_msg_iter(): +                self.producer.push(msg_wrapper, +                                   state=self.producer.STATE_NEW) +            for msg_wrapper in self.all_dirty_msg_iter(): +                self.producer.push(msg_wrapper, +                                   state=self.producer.STATE_DIRTY)      # MemoryStore specific methods. @@ -473,8 +560,7 @@ class MemoryStore(object):          :type mbox: str or unicode          :rtype: list          """ -        all_keys = self._msg_store.keys() -        return [uid for m, uid in all_keys if m == mbox] +        return self._fdoc_store[mbox].keys()      def get_soledad_known_uids(self, mbox):          """ @@ -523,7 +609,8 @@ class MemoryStore(object):          :param value: the value to set          :type value: int          """ -        leap_assert_type(value, int) +        # can be long??? +        #leap_assert_type(value, int)          logger.info("setting last soledad uid for %s to %s" %                      (mbox, value))          # if we already have a value here, don't do anything @@ -555,10 +642,9 @@ class MemoryStore(object):          with self._last_uid_lock:              self._last_uid[mbox] += 1              value = self._last_uid[mbox] -            self.write_last_uid(mbox, value) +            self.reactor.callInThread(self.write_last_uid, mbox, value)              return value -    @deferred_to_thread      def write_last_uid(self, mbox, value):          """          Increment the soledad integer cache for the highest uid value. @@ -572,11 +658,112 @@ class MemoryStore(object):          if self._permanent_store:              self._permanent_store.write_last_uid(mbox, value) +    def load_flag_docs(self, mbox, flag_docs): +        """ +        Load the flag documents for the given mbox. +        Used during initial flag docs prefetch. + +        :param mbox: the mailbox +        :type mbox: str or unicode +        :param flag_docs: a dict with the content for the flag docs, indexed +                          by uid. +        :type flag_docs: dict +        """ +        # We can do direct assignments cause we know this will only +        # be called during initialization of the mailbox. +        # TODO could hook here a sanity-check +        # for duplicates + +        fdoc_store = self._fdoc_store[mbox] +        chash_fdoc_store = self._chash_fdoc_store +        for uid in flag_docs: +            rdict = ReferenciableDict(flag_docs[uid]) +            fdoc_store[uid] = rdict +            # populate chash dict too, to avoid fdoc duplication +            chash = flag_docs[uid]["chash"] +            chash_fdoc_store[chash][mbox] = weakref.proxy( +                self._fdoc_store[mbox][uid]) + +    def update_flags(self, mbox, uid, fdoc): +        """ +        Update the flag document for a given mbox and uid combination, +        and set the dirty flag. +        We could use put_message, but this is faster. + +        :param mbox: the mailbox +        :type mbox: str or unicode +        :param uid: the uid of the message +        :type uid: int + +        :param fdoc: a dict with the content for the flag docs +        :type fdoc: dict +        """ +        key = mbox, uid +        self._fdoc_store[mbox][uid].update(fdoc) +        self._dirty.add(key) + +    def load_header_docs(self, header_docs): +        """ +        Load the flag documents for the given mbox. +        Used during header docs prefetch, and during cache after +        a read from soledad if the hdoc property in message did not +        find its value in here. + +        :param flag_docs: a dict with the content for the flag docs. +        :type flag_docs: dict +        """ +        hdoc_store = self._hdoc_store +        for chash in header_docs: +            hdoc_store[chash] = ReferenciableDict(header_docs[chash]) + +    def all_flags(self, mbox): +        """ +        Return a dictionary with all the flags for a given mbox. + +        :param mbox: the mailbox +        :type mbox: str or unicode +        :rtype: dict +        """ +        flags_dict = {} +        uids = self.get_uids(mbox) +        fdoc_store = self._fdoc_store[mbox] + +        for uid in uids: +            try: +                flags = fdoc_store[uid][fields.FLAGS_KEY] +                flags_dict[uid] = flags +            except KeyError: +                continue +        return flags_dict + +    def all_headers(self, mbox): +        """ +        Return a dictionary with all the header docs for a given mbox. + +        :param mbox: the mailbox +        :type mbox: str or unicode +        :rtype: dict +        """ +        headers_dict = {} +        uids = self.get_uids(mbox) +        fdoc_store = self._fdoc_store[mbox] +        hdoc_store = self._hdoc_store + +        for uid in uids: +            try: +                chash = fdoc_store[uid][fields.CONTENT_HASH_KEY] +                hdoc = hdoc_store[chash] +                if not empty(hdoc): +                    headers_dict[uid] = hdoc +            except KeyError: +                continue +        return headers_dict +      # Counting sheeps...      def count_new_mbox(self, mbox):          """ -        Count the new messages by inbox. +        Count the new messages by mailbox.          :param mbox: the mailbox          :type mbox: str or unicode @@ -594,6 +781,33 @@ class MemoryStore(object):          """          return len(self._new) +    def count(self, mbox): +        """ +        Return the count of messages for a given mbox. + +        :param mbox: the mailbox +        :type mbox: str or unicode +        :return: number of messages +        :rtype: int +        """ +        return len(self._fdoc_store[mbox]) + +    def unseen_iter(self, mbox): +        """ +        Get an iterator for the message UIDs with no `seen` flag +        for a given mailbox. + +        :param mbox: the mailbox +        :type mbox: str or unicode +        :return: iterator through unseen message doc UIDs +        :rtype: iterable +        """ +        fdocs = self._fdoc_store[mbox] + +        return [uid for uid, value +                in fdocs.items() +                if fields.SEEN_FLAG not in value.get(fields.FLAGS_KEY, [])] +      def get_cdoc_from_phash(self, phash):          """          Return a content-document by its payload-hash. @@ -602,7 +816,7 @@ class MemoryStore(object):          :type phash: str or unicode          :rtype: MessagePartDoc          """ -        doc = self._phash_store.get(phash, None) +        doc = self._cdoc_store.get(phash, None)          # XXX return None for consistency? @@ -632,8 +846,7 @@ class MemoryStore(object):          :return: MessagePartDoc. It will return None if the flags document                   has empty content or it is flagged as \\Deleted.          """ -        docs_dict = self._chash_fdoc_store.get(chash, None) -        fdoc = docs_dict.get(mbox, None) if docs_dict else None +        fdoc = self._chash_fdoc_store[chash][mbox]          # a couple of special cases.          # 1. We might have a doc with empty content... @@ -644,53 +857,61 @@ class MemoryStore(object):          # We want to create a new one in this case.          # Hmmm what if the deletion is un-done?? We would end with a          # duplicate... -        if fdoc and fields.DELETED_FLAG in fdoc[fields.FLAGS_KEY]: +        if fdoc and fields.DELETED_FLAG in fdoc.get(fields.FLAGS_KEY, []):              return None          uid = fdoc[fields.UID_KEY]          key = mbox, uid          new = key in self._new          dirty = key in self._dirty +          return MessagePartDoc(              new=new, dirty=dirty, store="mem",              part=MessagePartType.fdoc,              content=fdoc,              doc_id=None) -    def all_msg_iter(self): +    def iter_fdoc_keys(self):          """ -        Return generator that iterates through all messages in the store. - -        :return: generator of MessageWrappers -        :rtype: generator +        Return a generator through all the mbox, uid keys in the flags-doc +        store.          """ -        return (self.get_message(*key) -                for key in sorted(self._msg_store.keys())) +        fdoc_store = self._fdoc_store +        for mbox in fdoc_store: +            for uid in fdoc_store[mbox]: +                yield mbox, uid -    def all_new_dirty_msg_iter(self): +    def all_new_msg_iter(self):          """ -        Return generator that iterates through all new and dirty messages. +        Return generator that iterates through all new messages.          :return: generator of MessageWrappers          :rtype: generator          """ -        return (self.get_message(*key) -                for key in sorted(self._msg_store.keys()) -                if key in self._new or key in self._dirty) +        gm = self.get_message +        # need to freeze, set can change during iteration +        new = [gm(*key, dirtystate=DirtyState.new) for key in tuple(self._new)] +        # move content from new set to the queue +        self._new_queue.update(self._new) +        self._new.difference_update(self._new) +        return new -    def all_msg_dict_for_mbox(self, mbox): +    def all_dirty_msg_iter(self):          """ -        Return all the message dicts for a given mbox. +        Return generator that iterates through all dirty messages. -        :param mbox: the mailbox -        :type mbox: str or unicode -        :return: list of dictionaries -        :rtype: list +        :return: generator of MessageWrappers +        :rtype: generator          """ -        # This *needs* to return a fixed sequence. Otherwise the dictionary len -        # will change during iteration, when we modify it -        return [self._msg_store[(mb, uid)] -                for mb, uid in self._msg_store if mb == mbox] +        gm = self.get_message +        # need to freeze, set can change during iteration +        dirty = [gm(*key, flags_only=True, dirtystate=DirtyState.dirty) +                 for key in tuple(self._dirty)] +        # move content from new and dirty sets to the queue + +        self._dirty_queue.update(self._dirty) +        self._dirty.difference_update(self._dirty) +        return dirty      def all_deleted_uid_iter(self, mbox):          """ @@ -704,11 +925,10 @@ class MemoryStore(object):          """          # This *needs* to return a fixed sequence. Otherwise the dictionary len          # will change during iteration, when we modify it -        all_deleted = [ -            msg['fdoc']['uid'] for msg in self.all_msg_dict_for_mbox(mbox) -            if msg.get('fdoc', None) -            and fields.DELETED_FLAG in msg['fdoc']['flags']] -        return all_deleted +        fdocs = self._fdoc_store[mbox] +        return [uid for uid, value +                in fdocs.items() +                if fields.DELETED_FLAG in value.get(fields.FLAGS_KEY, [])]      # new, dirty flags @@ -721,26 +941,30 @@ class MemoryStore(object):          :return: tuple of bools          :rtype: tuple          """ +        # TODO change indexing of sets to [mbox][key] too.          # XXX should return *first* the news, and *then* the dirty... + +        # TODO should query in queues too , true? +        #          return map(lambda _set: key in _set, (self._new, self._dirty)) -    def set_new(self, key): +    def set_new_queued(self, key):          """ -        Add the key value to the `new` set. +        Add the key value to the `new-queue` set.          :param key: the key for the message, in the form mbox, uid          :type key: tuple          """ -        self._new.add(key) +        self._new_queue.add(key) -    def unset_new(self, key): +    def unset_new_queued(self, key):          """ -        Remove the key value from the `new` set. +        Remove the key value from the `new-queue` set.          :param key: the key for the message, in the form mbox, uid          :type key: tuple          """ -        self._new.discard(key) +        self._new_queue.discard(key)          deferreds = self._new_deferreds          d = deferreds.get(key, None)          if d: @@ -749,23 +973,23 @@ class MemoryStore(object):              d.callback('%s, ok' % str(key))              deferreds.pop(key) -    def set_dirty(self, key): +    def set_dirty_queued(self, key):          """ -        Add the key value to the `dirty` set. +        Add the key value to the `dirty-queue` set.          :param key: the key for the message, in the form mbox, uid          :type key: tuple          """ -        self._dirty.add(key) +        self._dirty_queue.add(key) -    def unset_dirty(self, key): +    def unset_dirty_queued(self, key):          """ -        Remove the key value from the `dirty` set. +        Remove the key value from the `dirty-queue` set.          :param key: the key for the message, in the form mbox, uid          :type key: tuple          """ -        self._dirty.discard(key) +        self._dirty_queue.discard(key)          deferreds = self._dirty_deferreds          d = deferreds.get(key, None)          if d: @@ -776,7 +1000,6 @@ class MemoryStore(object):      # Recent Flags -    # TODO --- nice but unused      def set_recent_flag(self, mbox, uid):          """          Set the `Recent` flag for a given mailbox and UID. @@ -894,6 +1117,8 @@ class MemoryStore(object):          """          self._stop_write_loop()          if self._permanent_store is not None: +            # XXX we should check if we did get a True value on this +            # operation. If we got False we should retry! (queue was not empty)              self.write_messages(self._permanent_store)              self.producer.flush() @@ -911,10 +1136,18 @@ class MemoryStore(object):          :type observer: Deferred          """          soledad_store = self._permanent_store +        if soledad_store is None: +            # just-in memory store, easy then. +            self._delete_from_memory(mbox, observer) +            return + +        # We have a soledad storage.          try:              # Stop and trigger last write              self.stop_and_flush()              # Wait on the writebacks to finish + +            # XXX what if pending deferreds is empty?              pending_deferreds = (self._new_deferreds.get(mbox, []) +                                   self._dirty_deferreds.get(mbox, []))              d1 = defer.gatherResults(pending_deferreds, consumeErrors=True) @@ -923,6 +1156,18 @@ class MemoryStore(object):          except Exception as exc:              logger.exception(exc) +    def _delete_from_memory(self, mbox, observer): +        """ +        Remove all messages marked as deleted from soledad and memory. + +        :param mbox: the mailbox +        :type mbox: str or unicode +        :param observer: a deferred that will be fired when expunge is done +        :type observer: Deferred +        """ +        mem_deleted = self.remove_all_deleted(mbox) +        observer.callback(mem_deleted) +      def _delete_from_soledad_and_memory(self, result, mbox, observer):          """          Remove all messages marked as deleted from soledad and memory. @@ -939,12 +1184,8 @@ class MemoryStore(object):          try:              # 1. Delete all messages marked as deleted in soledad. - -            # XXX this could be deferred for faster operation. -            if soledad_store: -                sol_deleted = soledad_store.remove_all_deleted(mbox) -            else: -                sol_deleted = [] +            logger.debug("DELETING FROM SOLEDAD ALL FOR %r" % (mbox,)) +            sol_deleted = soledad_store.remove_all_deleted(mbox)              try:                  self._known_uids[mbox].difference_update(set(sol_deleted)) @@ -952,6 +1193,7 @@ class MemoryStore(object):                  logger.exception(exc)              # 2. Delete all messages marked as deleted in memory. +            logger.debug("DELETING FROM MEM ALL FOR %r" % (mbox,))              mem_deleted = self.remove_all_deleted(mbox)              all_deleted = set(mem_deleted).union(set(sol_deleted)) @@ -960,8 +1202,43 @@ class MemoryStore(object):              logger.exception(exc)          finally:              self._start_write_loop() +          observer.callback(all_deleted) +    # Mailbox documents and attributes + +    # This could be also be cached in memstore, but proxying directly +    # to soledad since it's not too performance-critical. + +    def get_mbox_doc(self, mbox): +        """ +        Return the soledad document for a given mailbox. + +        :param mbox: the mailbox +        :type mbox: str or unicode +        :rtype: SoledadDocument or None. +        """ +        return self.permanent_store.get_mbox_document(mbox) + +    def get_mbox_closed(self, mbox): +        """ +        Return the closed attribute for a given mailbox. + +        :param mbox: the mailbox +        :type mbox: str or unicode +        :rtype: bool +        """ +        return self.permanent_store.get_mbox_closed(mbox) + +    def set_mbox_closed(self, mbox, closed): +        """ +        Set the closed attribute for a given mailbox. + +        :param mbox: the mailbox +        :type mbox: str or unicode +        """ +        self.permanent_store.set_mbox_closed(mbox, closed) +      # Dump-to-disk controls.      @property diff --git a/src/leap/mail/imap/messageparts.py b/src/leap/mail/imap/messageparts.py index b1f333a..257721c 100644 --- a/src/leap/mail/imap/messageparts.py +++ b/src/leap/mail/imap/messageparts.py @@ -98,7 +98,7 @@ class MessageWrapper(object):      CDOCS = "cdocs"      DOCS_ID = "docs_id" -    # Using slots to limit some the memory footprint, +    # Using slots to limit some the memory use,      # Add your attribute here.      __slots__ = ["_dict", "_new", "_dirty", "_storetype", "memstore"] @@ -148,7 +148,7 @@ class MessageWrapper(object):          """          return self._new -    def _set_new(self, value=True): +    def _set_new(self, value=False):          """          Set the value for the `new` flag, and propagate it          to the memory store if any. @@ -158,11 +158,14 @@ class MessageWrapper(object):          """          self._new = value          if self.memstore: -            mbox = self.fdoc.content['mbox'] -            uid = self.fdoc.content['uid'] +            mbox = self.fdoc.content.get('mbox', None) +            uid = self.fdoc.content.get('uid', None) +            if not mbox or not uid: +                logger.warning("Malformed fdoc") +                return              key = mbox, uid -            fun = [self.memstore.unset_new, -                   self.memstore.set_new][int(value)] +            fun = [self.memstore.unset_new_queued, +                   self.memstore.set_new_queued][int(value)]              fun(key)          else:              logger.warning("Could not find a memstore referenced from this " @@ -190,11 +193,14 @@ class MessageWrapper(object):          """          self._dirty = value          if self.memstore: -            mbox = self.fdoc.content['mbox'] -            uid = self.fdoc.content['uid'] +            mbox = self.fdoc.content.get('mbox', None) +            uid = self.fdoc.content.get('uid', None) +            if not mbox or not uid: +                logger.warning("Malformed fdoc") +                return              key = mbox, uid -            fun = [self.memstore.unset_dirty, -                   self.memstore.set_dirty][int(value)] +            fun = [self.memstore.unset_dirty_queued, +                   self.memstore.set_dirty_queued][int(value)]              fun(key)          else:              logger.warning("Could not find a memstore referenced from this " @@ -271,13 +277,17 @@ class MessageWrapper(object):          :rtype: generator          """          if self._dirty: -            mbox = self.fdoc.content[fields.MBOX_KEY] -            uid = self.fdoc.content[fields.UID_KEY] -            docid_dict = self._dict[self.DOCS_ID] -            docid_dict[self.FDOC] = self.memstore.get_docid_for_fdoc( -                mbox, uid) - -        if not empty(self.fdoc.content): +            try: +                mbox = self.fdoc.content[fields.MBOX_KEY] +                uid = self.fdoc.content[fields.UID_KEY] +                docid_dict = self._dict[self.DOCS_ID] +                docid_dict[self.FDOC] = self.memstore.get_docid_for_fdoc( +                    mbox, uid) +            except Exception as exc: +                logger.debug("Error while walking message...") +                logger.exception(exc) + +        if not empty(self.fdoc.content) and 'uid' in self.fdoc.content:              yield self.fdoc          if not empty(self.hdoc.content):              yield self.hdoc @@ -408,10 +418,8 @@ class MessagePart(object):          if payload:              content_type = self._get_ctype_from_document(phash)              charset = find_charset(content_type) -            logger.debug("Got charset from header: %s" % (charset,))              if charset is None:                  charset = self._get_charset(payload) -                logger.debug("Got charset: %s" % (charset,))              try:                  if isinstance(payload, unicode):                      payload = payload.encode(charset) diff --git a/src/leap/mail/imap/messages.py b/src/leap/mail/imap/messages.py index 25fc55f..fc1ec55 100644 --- a/src/leap/mail/imap/messages.py +++ b/src/leap/mail/imap/messages.py @@ -28,7 +28,6 @@ from functools import partial  from twisted.mail import imap4  from twisted.internet import defer -from twisted.python import log  from zope.interface import implements  from zope.proxy import sameProxiedObjects @@ -78,7 +77,7 @@ def try_unique_query(curried):                  # TODO we could take action, like trigger a background                  # process to kill dupes.                  name = getattr(curried, 'expected', 'doc') -                logger.warning( +                logger.debug(                      "More than one %s found for this mbox, "                      "we got a duplicate!!" % (name,))              return query.pop() @@ -88,6 +87,13 @@ def try_unique_query(curried):          logger.exception("Unhandled error %r" % exc) +""" +A dictionary that keeps one lock per mbox and uid. +""" +# XXX too much overhead? +fdoc_locks = defaultdict(lambda: defaultdict(lambda: threading.Lock())) + +  class LeapMessage(fields, MailParser, MBoxParser):      """      The main representation of a message. @@ -102,8 +108,6 @@ class LeapMessage(fields, MailParser, MBoxParser):      implements(imap4.IMessage) -    flags_lock = threading.Lock() -      def __init__(self, soledad, uid, mbox, collection=None, container=None):          """          Initializes a LeapMessage. @@ -129,10 +133,13 @@ class LeapMessage(fields, MailParser, MBoxParser):          self.__chash = None          self.__bdoc = None +        from twisted.internet import reactor +        self.reactor = reactor +      # XXX make these properties public      @property -    def _fdoc(self): +    def fdoc(self):          """          An accessor to the flags document.          """ @@ -149,35 +156,43 @@ class LeapMessage(fields, MailParser, MBoxParser):              return fdoc      @property -    def _hdoc(self): +    def hdoc(self):          """          An accessor to the headers document.          """ -        if self._container is not None: +        container = self._container +        if container is not None:              hdoc = self._container.hdoc              if hdoc and not empty(hdoc.content):                  return hdoc -        # XXX cache this into the memory store !!! -        return self._get_headers_doc() +        hdoc = self._get_headers_doc() + +        if container and not empty(hdoc.content): +            # mem-cache it +            hdoc_content = hdoc.content +            chash = hdoc_content.get(fields.CONTENT_HASH_KEY) +            hdocs = {chash: hdoc_content} +            container.memstore.load_header_docs(hdocs) +        return hdoc      @property -    def _chash(self): +    def chash(self):          """          An accessor to the content hash for this message.          """ -        if not self._fdoc: +        if not self.fdoc:              return None -        if not self.__chash and self._fdoc: -            self.__chash = self._fdoc.content.get( +        if not self.__chash and self.fdoc: +            self.__chash = self.fdoc.content.get(                  fields.CONTENT_HASH_KEY, None)          return self.__chash      @property -    def _bdoc(self): +    def bdoc(self):          """          An accessor to the body document.          """ -        if not self._hdoc: +        if not self.hdoc:              return None          if not self.__bdoc:              self.__bdoc = self._get_body_doc() @@ -204,7 +219,7 @@ class LeapMessage(fields, MailParser, MBoxParser):          uid = self._uid          flags = set([]) -        fdoc = self._fdoc +        fdoc = self.fdoc          if fdoc:              flags = set(fdoc.content.get(self.FLAGS_KEY, None)) @@ -230,20 +245,19 @@ class LeapMessage(fields, MailParser, MBoxParser):          :type mode: int          """          leap_assert(isinstance(flags, tuple), "flags need to be a tuple") -        log.msg('setting flags: %s (%s)' % (self._uid, flags)) - -        doc = self._fdoc -        if not doc: -            logger.warning( -                "Could not find FDOC for %s:%s while setting flags!" % -                (self._mbox, self._uid)) -            return +        mbox, uid = self._mbox, self._uid          APPEND = 1          REMOVE = -1          SET = 0 -        with self.flags_lock: +        with fdoc_locks[mbox][uid]: +            doc = self.fdoc +            if not doc: +                logger.warning( +                    "Could not find FDOC for %r:%s while setting flags!" % +                    (mbox, uid)) +                return              current = doc.content[self.FLAGS_KEY]              if mode == APPEND:                  newflags = tuple(set(tuple(current) + flags)) @@ -251,33 +265,31 @@ class LeapMessage(fields, MailParser, MBoxParser):                  newflags = tuple(set(current).difference(set(flags)))              elif mode == SET:                  newflags = flags +            new_fdoc = { +                self.FLAGS_KEY: newflags, +                self.SEEN_KEY: self.SEEN_FLAG in newflags, +                self.DEL_KEY: self.DELETED_FLAG in newflags} +            self._collection.memstore.update_flags(mbox, uid, new_fdoc) -            # We could defer this, but I think it's better -            # to put it under the lock... -            doc.content[self.FLAGS_KEY] = newflags -            doc.content[self.SEEN_KEY] = self.SEEN_FLAG in flags -            doc.content[self.DEL_KEY] = self.DELETED_FLAG in flags - -            if self._collection.memstore is not None: -                log.msg("putting message in collection") -                self._collection.memstore.put_message( -                    self._mbox, self._uid, -                    MessageWrapper(fdoc=doc.content, new=False, dirty=True, -                                   docs_id={'fdoc': doc.doc_id})) -            else: -                # fallback for non-memstore initializations. -                self._soledad.put_doc(doc)          return map(str, newflags)      def getInternalDate(self):          """          Retrieve the date internally associated with this message -        :rtype: C{str} +        According to the spec, this is NOT the date and time in the +        RFC-822 header, but rather a date and time that reflects when the +        message was received. + +        * In SMTP, date and time of final delivery. +        * In COPY, internal date/time of the source message. +        * In APPEND, date/time specified. +          :return: An RFC822-formatted date string. +        :rtype: str          """ -        date = self._hdoc.content.get(self.DATE_KEY, '') -        return str(date) +        date = self.hdoc.content.get(fields.DATE_KEY, '') +        return date      #      # IMessagePart @@ -302,8 +314,8 @@ class LeapMessage(fields, MailParser, MBoxParser):          fd = StringIO.StringIO() -        if self._bdoc is not None: -            bdoc_content = self._bdoc.content +        if self.bdoc is not None: +            bdoc_content = self.bdoc.content              if empty(bdoc_content):                  logger.warning("No BDOC content found for message!!!")                  return write_fd("") @@ -311,7 +323,6 @@ class LeapMessage(fields, MailParser, MBoxParser):              body = bdoc_content.get(self.RAW_KEY, "")              content_type = bdoc_content.get('content-type', "")              charset = find_charset(content_type) -            logger.debug('got charset from content-type: %s' % charset)              if charset is None:                  charset = self._get_charset(body)              try: @@ -352,8 +363,8 @@ class LeapMessage(fields, MailParser, MBoxParser):          :rtype: int          """          size = None -        if self._fdoc: -            fdoc_content = self._fdoc.content +        if self.fdoc is not None: +            fdoc_content = self.fdoc.content              size = fdoc_content.get(self.SIZE_KEY, False)          else:              logger.warning("No FLAGS doc for %s:%s" % (self._mbox, @@ -422,8 +433,8 @@ class LeapMessage(fields, MailParser, MBoxParser):          """          Return the headers dict for this message.          """ -        if self._hdoc is not None: -            hdoc_content = self._hdoc.content +        if self.hdoc is not None: +            hdoc_content = self.hdoc.content              headers = hdoc_content.get(self.HEADERS_KEY, {})              return headers @@ -437,8 +448,8 @@ class LeapMessage(fields, MailParser, MBoxParser):          """          Return True if this message is multipart.          """ -        if self._fdoc: -            fdoc_content = self._fdoc.content +        if self.fdoc: +            fdoc_content = self.fdoc.content              is_multipart = fdoc_content.get(self.MULTIPART_KEY, False)              return is_multipart          else: @@ -477,11 +488,11 @@ class LeapMessage(fields, MailParser, MBoxParser):          :raises: KeyError if key does not exist          :rtype: dict          """ -        if not self._hdoc: +        if not self.hdoc:              logger.warning("Tried to get part but no HDOC found!")              return None -        hdoc_content = self._hdoc.content +        hdoc_content = self.hdoc.content          pmap = hdoc_content.get(fields.PARTS_MAP_KEY, {})          # remember, lads, soledad is using strings in its keys, @@ -508,6 +519,7 @@ class LeapMessage(fields, MailParser, MBoxParser):          finally:              return result +    # TODO move to soledadstore instead of accessing soledad directly      def _get_headers_doc(self):          """          Return the document that keeps the headers for this @@ -515,15 +527,16 @@ class LeapMessage(fields, MailParser, MBoxParser):          """          head_docs = self._soledad.get_from_index(              fields.TYPE_C_HASH_IDX, -            fields.TYPE_HEADERS_VAL, str(self._chash)) +            fields.TYPE_HEADERS_VAL, str(self.chash))          return first(head_docs) +    # TODO move to soledadstore instead of accessing soledad directly      def _get_body_doc(self):          """          Return the document that keeps the body for this          message.          """ -        hdoc_content = self._hdoc.content +        hdoc_content = self.hdoc.content          body_phash = hdoc_content.get(              fields.BODY_KEY, None)          if not body_phash: @@ -560,14 +573,14 @@ class LeapMessage(fields, MailParser, MBoxParser):          :return: The content value indexed by C{key} or None          :rtype: str          """ -        return self._fdoc.content.get(key, None) +        return self.fdoc.content.get(key, None)      def does_exist(self):          """          Return True if there is actually a flags document for this          UID and mbox.          """ -        return not empty(self._fdoc) +        return not empty(self.fdoc)  class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser): @@ -672,8 +685,6 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser):      _rdoc_lock = threading.Lock()      _rdoc_property_lock = threading.Lock() -    _hdocset_lock = threading.Lock() -    _hdocset_property_lock = threading.Lock()      def __init__(self, mbox=None, soledad=None, memstore=None):          """ @@ -714,14 +725,13 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser):          self.memstore = memstore          self.__rflags = None -        self.__hdocset = None          self.initialize_db()          # ensure that we have a recent-flags and a hdocs-sec doc          self._get_or_create_rdoc() -        # Not for now... -        #self._get_or_create_hdocset() +        from twisted.internet import reactor +        self.reactor = reactor      def _get_empty_doc(self, _type=FLAGS_DOC):          """ @@ -746,33 +756,26 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser):                  rdoc[fields.MBOX_KEY] = self.mbox              self._soledad.create_doc(rdoc) -    def _get_or_create_hdocset(self): -        """ -        Try to retrieve the hdocs-set doc for this MessageCollection, -        and create one if not found. -        """ -        hdocset = self._get_hdocset_doc() -        if not hdocset: -            hdocset = self._get_empty_doc(self.HDOCS_SET_DOC) -            if self.mbox != fields.INBOX_VAL: -                hdocset[fields.MBOX_KEY] = self.mbox -            self._soledad.create_doc(hdocset) - +    @deferred_to_thread      def _do_parse(self, raw):          """          Parse raw message and return it along with          relevant information about its outer level. +        This is done in a separate thread, and the callback is passed +        to `_do_add_msg` method. +          :param raw: the raw message          :type raw: StringIO or basestring -        :return: msg, chash, size, multi +        :return: msg, parts, chash, size, multi          :rtype: tuple          """          msg = self._get_parsed_msg(raw)          chash = self._get_hash(msg)          size = len(msg.as_string())          multi = msg.is_multipart() -        return msg, chash, size, multi +        parts = walk.get_parts(msg) +        return msg, parts, chash, size, multi      def _populate_flags(self, flags, uid, chash, size, multi):          """ @@ -840,12 +843,11 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser):          :return: False, if it does not exist, or UID.          """          exist = False -        if self.memstore is not None: -            exist = self.memstore.get_fdoc_from_chash(chash, self.mbox) +        exist = self.memstore.get_fdoc_from_chash(chash, self.mbox)          if not exist:              exist = self._get_fdoc_from_chash(chash) -        if exist: +        if exist and exist.content is not None:              return exist.content.get(fields.UID_KEY, "unknown-uid")          else:              return False @@ -874,24 +876,28 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser):                   uid when the adding succeed.          :rtype: deferred          """ -        logger.debug('adding message')          if flags is None:              flags = tuple()          leap_assert_type(flags, tuple) -        d = defer.Deferred() -        self._do_add_msg(raw, flags, subject, date, notify_on_disk, d) -        return d +        observer = defer.Deferred() +        d = self._do_parse(raw) +        d.addCallback(lambda result: self.reactor.callInThread( +            self._do_add_msg, result, flags, subject, date, +            notify_on_disk, observer)) +        return observer -    # We SHOULD defer this (or the heavy load here) to the thread pool, -    # but it gives troubles with the QSocketNotifier used by Qt... -    def _do_add_msg(self, raw, flags, subject, date, notify_on_disk, observer): +    # Called in thread +    def _do_add_msg(self, parse_result, flags, subject, +                    date, notify_on_disk, observer):          """          Helper that creates a new message document.          Here lives the magic of the leap mail. Well, in soledad, really.          See `add_msg` docstring for parameter info. +        :param parse_result: a tuple with the results of `self._do_parse` +        :type parse_result: tuple          :param observer: a deferred that will be fired with the message                           uid when the adding succeed.          :type observer: deferred @@ -902,35 +908,33 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser):          # TODO add the linked-from info !          # TODO add reference to the original message -        # parse -        msg, chash, size, multi = self._do_parse(raw) +        msg, parts, chash, size, multi = parse_result          # check for uniqueness -------------------------------- -        # XXX profiler says that this test is costly. -        # So we probably should just do an in-memory check and -        # move the complete check to the soledad writer?          # Watch out! We're reserving a UID right after this!          existing_uid = self._fdoc_already_exists(chash)          if existing_uid: -            logger.warning("We already have that message in this " -                           "mailbox, unflagging as deleted")              uid = existing_uid              msg = self.get_msg_by_uid(uid) -            msg.setFlags((fields.DELETED_FLAG,), -1) -            # XXX if this is deferred to thread again we should not use -            # the callback in the deferred thread, but return and -            # call the callback from the caller fun... -            observer.callback(uid) +            # We can say the observer that we're done +            self.reactor.callFromThread(observer.callback, uid) +            msg.setFlags((fields.DELETED_FLAG,), -1)              return          uid = self.memstore.increment_last_soledad_uid(self.mbox) -        logger.info("ADDING MSG WITH UID: %s" % uid) + +        # We can say the observer that we're done at this point, but +        # before that we should make sure it has no serious consequences +        # if we're issued, for instance, a fetch command right after... +        #self.reactor.callFromThread(observer.callback, uid) +        # if we did the notify, we need to invalidate the deferred +        # so not to try to fire it twice. +        #observer = None          fd = self._populate_flags(flags, uid, chash, size, multi)          hd = self._populate_headr(msg, chash, subject, date) -        parts = walk.get_parts(msg)          body_phash_fun = [walk.get_body_phash_simple,                            walk.get_body_phash_multi][int(multi)]          body_phash = body_phash_fun(walk.get_payloads(msg)) @@ -949,9 +953,9 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser):          self.set_recent_flag(uid)          msg_container = MessageWrapper(fd, hd, cdocs) -        self.memstore.create_message(self.mbox, uid, msg_container, -                                     observer=observer, -                                     notify_on_disk=notify_on_disk) +        self.memstore.create_message( +            self.mbox, uid, msg_container, +            observer=observer, notify_on_disk=notify_on_disk)      #      # getters: specific queries @@ -982,14 +986,6 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser):                          {'doc_id': rdoc.doc_id, 'set': rflags})              return rflags -        #else: -            # fallback for cases without memory store -            #with self._rdoc_lock: -                #rdoc = self._get_recent_doc() -                #self.__rflags = set(rdoc.content.get( -                    #fields.RECENTFLAGS_KEY, [])) -            #return self.__rflags -      def _set_recent_flags(self, value):          """          Setter for the recent-flags set for this mailbox. @@ -997,16 +993,6 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser):          if self.memstore is not None:              self.memstore.set_recent_flags(self.mbox, value) -        #else: -            # fallback for cases without memory store -            #with self._rdoc_lock: -                #rdoc = self._get_recent_doc() -                #newv = set(value) -                #self.__rflags = newv -                #rdoc.content[fields.RECENTFLAGS_KEY] = list(newv) -                # XXX should deferLater 0 it? -                #self._soledad.put_doc(rdoc) -      recent_flags = property(          _get_recent_flags, _set_recent_flags,          doc="Set of UIDs with the recent flag for this mailbox.") @@ -1121,6 +1107,7 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser):          # XXX is this working?          return self._get_uid_from_msgidCb(msgid) +    @deferred_to_thread      def set_flags(self, mbox, messages, flags, mode, observer):          """          Set flags for a sequence of messages. @@ -1138,28 +1125,18 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser):                           done.          :type observer: deferred          """ -        # XXX we could defer *this* to thread pool, and gather results... -        # XXX use deferredList +        reactor = self.reactor +        getmsg = self.get_msg_by_uid -        deferreds = [] -        for msg_id in messages: -            deferreds.append( -                self._set_flag_for_uid(msg_id, flags, mode)) +        def set_flags(uid, flags, mode): +            msg = getmsg(uid, mem_only=True, flags_only=True) +            if msg is not None: +                return uid, msg.setFlags(flags, mode) -        def notify(result): -            observer.callback(dict(result)) -        d1 = defer.gatherResults(deferreds, consumeErrors=True) -        d1.addCallback(notify) +        setted_flags = [set_flags(uid, flags, mode) for uid in messages] +        result = dict(filter(None, setted_flags)) -    @deferred_to_thread -    def _set_flag_for_uid(self, msg_id, flags, mode): -        """ -        Run the set_flag operation in the thread pool. -        """ -        log.msg("MSG ID = %s" % msg_id) -        msg = self.get_msg_by_uid(msg_id, mem_only=True, flags_only=True) -        if msg is not None: -            return msg_id, msg.setFlags(flags, mode) +        reactor.callFromThread(observer.callback, result)      # getters: generic for a mailbox @@ -1182,7 +1159,9 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser):                   or None if not found.          :rtype: LeapMessage          """ -        msg_container = self.memstore.get_message(self.mbox, uid, flags_only) +        msg_container = self.memstore.get_message( +            self.mbox, uid, flags_only=flags_only) +          if msg_container is not None:              if mem_only:                  msg = LeapMessage(None, uid, self.mbox, collection=self, @@ -1195,6 +1174,7 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser):                                    collection=self, container=msg_container)          else:              msg = LeapMessage(self._soledad, uid, self.mbox, collection=self) +          if not msg.does_exist():              return None          return msg @@ -1234,67 +1214,51 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser):          db_uids = set([doc.content[self.UID_KEY] for doc in                         self._soledad.get_from_index(                             fields.TYPE_MBOX_IDX, -                           fields.TYPE_FLAGS_VAL, self.mbox)]) +                           fields.TYPE_FLAGS_VAL, self.mbox) +                       if not empty(doc)])          return db_uids      def all_uid_iter(self):          """          Return an iterator through the UIDs of all messages, from memory.          """ -        if self.memstore is not None: -            mem_uids = self.memstore.get_uids(self.mbox) -            soledad_known_uids = self.memstore.get_soledad_known_uids( -                self.mbox) -            combined = tuple(set(mem_uids).union(soledad_known_uids)) -            return combined +        mem_uids = self.memstore.get_uids(self.mbox) +        soledad_known_uids = self.memstore.get_soledad_known_uids( +            self.mbox) +        combined = tuple(set(mem_uids).union(soledad_known_uids)) +        return combined -    # XXX MOVE to memstore -    def all_flags(self): +    def get_all_soledad_flag_docs(self):          """ -        Return a dict with all flags documents for this mailbox. -        """ -        # XXX get all from memstore and cache it there -        # FIXME should get all uids, get them fro memstore, -        # and get only the missing ones from disk. - -        all_flags = dict((( -            doc.content[self.UID_KEY], -            doc.content[self.FLAGS_KEY]) for doc in -            self._soledad.get_from_index( -                fields.TYPE_MBOX_IDX, -                fields.TYPE_FLAGS_VAL, self.mbox))) -        if self.memstore is not None: -            uids = self.memstore.get_uids(self.mbox) -            docs = ((uid, self.memstore.get_message(self.mbox, uid)) -                    for uid in uids) -            for uid, doc in docs: -                all_flags[uid] = doc.fdoc.content[self.FLAGS_KEY] +        Return a dict with the content of all the flag documents +        in soledad store for the given mbox. -        return all_flags - -    def all_flags_chash(self): -        """ -        Return a dict with the content-hash for all flag documents -        for this mailbox. +        :param mbox: the mailbox +        :type mbox: str or unicode +        :rtype: dict          """ -        all_flags_chash = dict((( +        # XXX we really could return a reduced version with +        # just {'uid': (flags-tuple,) since the prefetch is +        # only oriented to get the flag tuples. +        all_docs = [(              doc.content[self.UID_KEY], -            doc.content[self.CONTENT_HASH_KEY]) for doc in +            dict(doc.content)) +            for doc in              self._soledad.get_from_index(                  fields.TYPE_MBOX_IDX, -                fields.TYPE_FLAGS_VAL, self.mbox))) -        return all_flags_chash +                fields.TYPE_FLAGS_VAL, self.mbox) +            if not empty(doc.content)] +        all_flags = dict(all_docs) +        return all_flags      def all_headers(self):          """ -        Return a dict with all the headers documents for this +        Return a dict with all the header documents for this          mailbox. + +        :rtype: dict          """ -        all_headers = dict((( -            doc.content[self.CONTENT_HASH_KEY], -            doc.content[self.HEADERS_KEY]) for doc in -            self._soledad.get_docs(self._hdocset))) -        return all_headers +        return self.memstore.all_headers(self.mbox)      def count(self):          """ @@ -1302,13 +1266,7 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser):          :rtype: int          """ -        # XXX We should cache this in memstore too until next write... -        count = self._soledad.get_count_from_index( -            fields.TYPE_MBOX_IDX, -            fields.TYPE_FLAGS_VAL, self.mbox) -        if self.memstore is not None: -            count += self.memstore.count_new() -        return count +        return self.memstore.count(self.mbox)      # unseen messages @@ -1320,10 +1278,7 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser):          :return: iterator through unseen message doc UIDs          :rtype: iterable          """ -        return (doc.content[self.UID_KEY] for doc in -                self._soledad.get_from_index( -                    fields.TYPE_MBOX_SEEN_IDX, -                    fields.TYPE_FLAGS_VAL, self.mbox, '0')) +        return self.memstore.unseen_iter(self.mbox)      def count_unseen(self):          """ @@ -1332,10 +1287,7 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser):          :returns: count          :rtype: int          """ -        count = self._soledad.get_count_from_index( -            fields.TYPE_MBOX_SEEN_IDX, -            fields.TYPE_FLAGS_VAL, self.mbox, '0') -        return count +        return len(list(self.unseen_iter()))      def get_unseen(self):          """ diff --git a/src/leap/mail/imap/server.py b/src/leap/mail/imap/server.py index ba63846..5da9bfd 100644 --- a/src/leap/mail/imap/server.py +++ b/src/leap/mail/imap/server.py @@ -20,9 +20,7 @@ Leap IMAP4 Server Implementation.  from copy import copy  from twisted import cred -from twisted.internet import defer  from twisted.internet.defer import maybeDeferred -from twisted.internet.task import deferLater  from twisted.mail import imap4  from twisted.python import log @@ -41,6 +39,7 @@ class LeapIMAPServer(imap4.IMAP4Server):          soledad = kwargs.pop('soledad', None)          uuid = kwargs.pop('uuid', None)          userid = kwargs.pop('userid', None) +          leap_assert(soledad, "need a soledad instance")          leap_assert_type(soledad, Soledad)          leap_assert(uuid, "need a user in the initialization") @@ -55,6 +54,9 @@ class LeapIMAPServer(imap4.IMAP4Server):          # populate the test account properly (and only once          # per session) +        from twisted.internet import reactor +        self.reactor = reactor +      def lineReceived(self, line):          """          Attempt to parse a single line from the server. @@ -114,6 +116,7 @@ class LeapIMAPServer(imap4.IMAP4Server):              ).addCallback(                  cbFetch, tag, query, uid              ).addErrback(ebFetch, tag) +          elif len(query) == 1 and str(query[0]) == "rfc822.header":              self._oldTimeout = self.setTimeout(None)              # no need to call iter, we get a generator @@ -130,48 +133,16 @@ class LeapIMAPServer(imap4.IMAP4Server):              ).addCallback(                  cbFetch, tag, query, uid              ).addErrback( -                ebFetch, tag -            ).addCallback( -                self.on_fetch_finished, messages) +                ebFetch, tag)      select_FETCH = (do_FETCH, imap4.IMAP4Server.arg_seqset,                      imap4.IMAP4Server.arg_fetchatt) -    def on_fetch_finished(self, _, messages): -        from twisted.internet import reactor - -        print "FETCH FINISHED -- NOTIFY NEW" -        deferLater(reactor, 0, self.notifyNew) -        deferLater(reactor, 0, self.mbox.unset_recent_flags, messages) -        deferLater(reactor, 0, self.mbox.signal_unread_to_ui) - -    def on_copy_finished(self, defers): -        d = defer.gatherResults(filter(None, defers)) - -        def when_finished(result): -            log.msg("COPY FINISHED") -            self.notifyNew() -            self.mbox.signal_unread_to_ui() -        d.addCallback(when_finished) -        #d.addCallback(self.notifyNew) -        #d.addCallback(self.mbox.signal_unread_to_ui) - -    def do_COPY(self, tag, messages, mailbox, uid=0): -        from twisted.internet import reactor -        defers = [] -        d = imap4.IMAP4Server.do_COPY(self, tag, messages, mailbox, uid) -        defers.append(d) -        deferLater(reactor, 0, self.on_copy_finished, defers) - -    select_COPY = (do_COPY, imap4.IMAP4Server.arg_seqset, -                   imap4.IMAP4Server.arg_astring) -      def notifyNew(self, ignored=None):          """          Notify new messages to listeners.          """ -        print "TRYING TO NOTIFY NEW" -        self.mbox.notify_new() +        self.reactor.callFromThread(self.mbox.notify_new)      def _cbSelectWork(self, mbox, cmdName, tag):          """ diff --git a/src/leap/mail/imap/service/imap.py b/src/leap/mail/imap/service/imap.py index 93df51d..1175cdc 100644 --- a/src/leap/mail/imap/service/imap.py +++ b/src/leap/mail/imap/service/imap.py @@ -25,6 +25,7 @@ from twisted.internet import defer, threads  from twisted.internet.protocol import ServerFactory  from twisted.internet.error import CannotListenError  from twisted.mail import imap4 +from twisted.python import log  logger = logging.getLogger(__name__) @@ -71,6 +72,15 @@ DO_MANHOLE = os.environ.get("LEAP_MAIL_MANHOLE", None)  if DO_MANHOLE:      from leap.mail.imap.service import manhole +DO_PROFILE = os.environ.get("LEAP_PROFILE", None) +if DO_PROFILE: +    import cProfile +    log.msg("Starting PROFILING...") + +    PROFILE_DAT = "/tmp/leap_mail_profile.pstats" +    pr = cProfile.Profile() +    pr.enable() +  class IMAPAuthRealm(object):      """ @@ -115,7 +125,12 @@ class LeapIMAPFactory(ServerFactory):          # XXX how to pass the store along?      def buildProtocol(self, addr): -        "Return a protocol suitable for the job." +        """ +        Return a protocol suitable for the job. + +        :param addr: remote ip address +        :type addr:  str +        """          imapProtocol = LeapIMAPServer(              uuid=self._uuid,              userid=self._userid, @@ -124,7 +139,7 @@ class LeapIMAPFactory(ServerFactory):          imapProtocol.factory = self          return imapProtocol -    def doStop(self, cv): +    def doStop(self, cv=None):          """          Stops imap service (fetcher, factory and port). @@ -135,23 +150,30 @@ class LeapIMAPFactory(ServerFactory):                   disk in another thread.          :rtype: Deferred          """ -        ServerFactory.doStop(self) +        if DO_PROFILE: +            log.msg("Stopping PROFILING") +            pr.disable() +            pr.dump_stats(PROFILE_DAT) -        def _stop_imap_cb(): -            logger.debug('Stopping in memory store.') -            self._memstore.stop_and_flush() -            while not self._memstore.producer.is_queue_empty(): -                logger.debug('Waiting for queue to be empty.') -                # TODO use a gatherResults over the new/dirty deferred list, -                # as in memorystore's expunge() method. -                time.sleep(1) -            # notify that service has stopped -            logger.debug('Notifying that service has stopped.') -            cv.acquire() -            cv.notify() -            cv.release() +        ServerFactory.doStop(self) -        return threads.deferToThread(_stop_imap_cb) +        if cv is not None: +            def _stop_imap_cb(): +                logger.debug('Stopping in memory store.') +                self._memstore.stop_and_flush() +                while not self._memstore.producer.is_queue_empty(): +                    logger.debug('Waiting for queue to be empty.') +                    # TODO use a gatherResults over the new/dirty +                    # deferred list, +                    # as in memorystore's expunge() method. +                    time.sleep(1) +                # notify that service has stopped +                logger.debug('Notifying that service has stopped.') +                cv.acquire() +                cv.notify() +                cv.release() + +            return threads.deferToThread(_stop_imap_cb)  def run_service(*args, **kwargs): @@ -164,6 +186,9 @@ def run_service(*args, **kwargs):                the protocol.      """      from twisted.internet import reactor +    # it looks like qtreactor does not honor this, +    # but other reactors should. +    reactor.suggestThreadPoolSize(20)      leap_assert(len(args) == 2)      soledad, keymanager = args diff --git a/src/leap/mail/imap/soledadstore.py b/src/leap/mail/imap/soledadstore.py index 8e22f26..732fe03 100644 --- a/src/leap/mail/imap/soledadstore.py +++ b/src/leap/mail/imap/soledadstore.py @@ -20,14 +20,14 @@ A MessageStore that writes to Soledad.  import logging  import threading +from collections import defaultdict  from itertools import chain  from u1db import errors as u1db_errors -from twisted.internet import defer  from twisted.python import log  from zope.interface import implements -from leap.common.check import leap_assert_type +from leap.common.check import leap_assert_type, leap_assert  from leap.mail.decorators import deferred_to_thread  from leap.mail.imap.messageparts import MessagePartType  from leap.mail.imap.messageparts import MessageWrapper @@ -35,15 +35,13 @@ from leap.mail.imap.messageparts import RecentFlagsDoc  from leap.mail.imap.fields import fields  from leap.mail.imap.interfaces import IMessageStore  from leap.mail.messageflow import IMessageConsumer -from leap.mail.utils import first +from leap.mail.utils import first, empty, accumulator_queue  logger = logging.getLogger(__name__)  # TODO -# [ ] Delete original message from the incoming queue after all successful -#     writes. -# [ ] Implement a retry queue. +# [ ] Implement a retry queue?  # [ ] Consider journaling of operations. @@ -86,10 +84,12 @@ class ContentDedup(object):          if not header_docs:              return False -        if len(header_docs) != 1: -            logger.warning("Found more than one copy of chash %s!" -                           % (chash,)) -        logger.debug("Found header doc with that hash! Skipping save!") +        # FIXME enable only to debug this problem. +        #if len(header_docs) != 1: +            #logger.warning("Found more than one copy of chash %s!" +                           #% (chash,)) + +        #logger.debug("Found header doc with that hash! Skipping save!")          return True      def _content_does_exist(self, doc): @@ -110,10 +110,11 @@ class ContentDedup(object):          if not attach_docs:              return False -        if len(attach_docs) != 1: -            logger.warning("Found more than one copy of phash %s!" -                           % (phash,)) -        logger.debug("Found attachment doc with that hash! Skipping save!") +        # FIXME enable only to debug this problem +        #if len(attach_docs) != 1: +            #logger.warning("Found more than one copy of phash %s!" +                           #% (phash,)) +        #logger.debug("Found attachment doc with that hash! Skipping save!")          return True @@ -121,13 +122,26 @@ class MsgWriteError(Exception):      """      Raised if any exception is found while saving message parts.      """ +    pass + + +""" +A lock per document. +""" +# TODO should bound the space of this!!! +# http://stackoverflow.com/a/2437645/1157664 +# Setting this to twice the number of threads in the threadpool +# should be safe. +put_locks = defaultdict(lambda: threading.Lock())  class SoledadStore(ContentDedup):      """      This will create docs in the local Soledad database.      """ -    _last_uid_lock = threading.Lock() +    _soledad_rw_lock = threading.Lock() +    _remove_lock = threading.Lock() +    _mbox_doc_locks = defaultdict(lambda: threading.Lock())      implements(IMessageConsumer, IMessageStore) @@ -138,8 +152,20 @@ class SoledadStore(ContentDedup):          :param soledad: the soledad instance          :type soledad: Soledad          """ +        from twisted.internet import reactor +        self.reactor = reactor +          self._soledad = soledad +        self._CREATE_DOC_FUN = self._soledad.create_doc +        self._PUT_DOC_FUN = self._soledad.put_doc +        self._GET_DOC_FUN = self._soledad.get_doc + +        # we instantiate an accumulator to batch the notifications +        self.docs_notify_queue = accumulator_queue( +            lambda item: reactor.callFromThread(self._unset_new_dirty, item), +            20) +      # IMessageStore      # ------------------------------------------------------------------- @@ -194,47 +220,32 @@ class SoledadStore(ContentDedup):      # IMessageConsumer -    # It's not thread-safe to defer this to a different thread +    # TODO should handle the delete case +    # TODO should handle errors better +    # TODO could generalize this method into a generic consumer +    # and only implement `process` here      def consume(self, queue):          """          Creates a new document in soledad db. -        :param queue: queue to get item from, with content of the document -                      to be inserted. -        :type queue: Queue -        """ -        # TODO should delete the original message from incoming only after -        # the writes are done. -        # TODO should handle the delete case -        # TODO should handle errors -        # TODO could generalize this method into a generic consumer -        # and only implement `process` here - -        def docWriteCallBack(doc_wrapper): -            """ -            Callback for a successful write of a document wrapper. -            """ -            if isinstance(doc_wrapper, MessageWrapper): -                # If everything went well, we can unset the new flag -                # in the source store (memory store) -                self._unset_new_dirty(doc_wrapper) - -        def docWriteErrorBack(failure): -            """ -            Errorback for write operations. -            """ -            log.error("Error while processing item.") -            log.msg(failure.getTraceBack()) - -        while not queue.empty(): -            doc_wrapper = queue.get() -            d = defer.Deferred() -            d.addCallbacks(docWriteCallBack, docWriteErrorBack) - -            self._consume_doc(doc_wrapper, d) +        :param queue: a tuple of queues to get item from, with content of the +                      document to be inserted. +        :type queue: tuple of Queues +        """ +        new, dirty = queue +        while not new.empty(): +            doc_wrapper = new.get() +            self.reactor.callInThread(self._consume_doc, doc_wrapper, +                                      self.docs_notify_queue) +        while not dirty.empty(): +            doc_wrapper = dirty.get() +            self.reactor.callInThread(self._consume_doc, doc_wrapper, +                                      self.docs_notify_queue) + +        # Queue empty, flush the notifications queue. +        self.docs_notify_queue(None, flush=True) -    @deferred_to_thread      def _unset_new_dirty(self, doc_wrapper):          """          Unset the `new` and `dirty` flags for this document wrapper in the @@ -243,56 +254,76 @@ class SoledadStore(ContentDedup):          :param doc_wrapper: a MessageWrapper instance          :type doc_wrapper: MessageWrapper          """ -        # XXX debug msg id/mbox? -        logger.info("unsetting new flag!") -        doc_wrapper.new = False -        doc_wrapper.dirty = False +        if isinstance(doc_wrapper, MessageWrapper): +            # XXX still needed for debug quite often +            #logger.info("unsetting new flag!") +            doc_wrapper.new = False +            doc_wrapper.dirty = False      @deferred_to_thread -    def _consume_doc(self, doc_wrapper, deferred): +    def _consume_doc(self, doc_wrapper, notify_queue):          """          Consume each document wrapper in a separate thread. +        We pass an instance of an accumulator that handles the notifications +        to the memorystore when the write has been done.          :param doc_wrapper: a MessageWrapper or RecentFlagsDoc instance          :type doc_wrapper: MessageWrapper or RecentFlagsDoc -        :param deferred: a deferred that will be fired when the write operation -                         has finished, either calling its callback or its -                         errback depending on whether it succeed. -        :type deferred: Deferred +        :param notify_queue: a callable that handles the writeback +                             notifications to the memstore. +        :type notify_queue: callable          """ -        items = self._process(doc_wrapper) +        def queueNotifyBack(failed, doc_wrapper): +            if failed: +                log.msg("There was an error writing the mesage...") +            else: +                notify_queue(doc_wrapper) + +        def doSoledadCalls(items): +            # we prime the generator, that should return the +            # message or flags wrapper item in the first place. +            doc_wrapper = items.next() +            failed = self._soledad_write_document_parts(items) +            queueNotifyBack(failed, doc_wrapper) -        # we prime the generator, that should return the -        # message or flags wrapper item in the first place. -        doc_wrapper = items.next() +        doSoledadCalls(self._iter_wrapper_subparts(doc_wrapper)) -        # From here, we unpack the subpart items and -        # the right soledad call. +    # +    # SoledadStore specific methods. +    # + +    def _soledad_write_document_parts(self, items): +        """ +        Write the document parts to soledad in a separate thread. + +        :param items: the iterator through the different document wrappers +                      payloads. +        :type items: iterator +        :return: whether the write was successful or not +        :rtype: bool +        """          failed = False          for item, call in items: +            if empty(item): +                continue              try:                  self._try_call(call, item)              except Exception as exc: -                failed = exc +                logger.debug("ITEM WAS: %s" % str(item)) +                logger.debug("ITEM CONTENT WAS: %s" % str(item.content)) +                logger.exception(exc) +                failed = True                  continue -        if failed: -            deferred.errback(MsgWriteError( -                "There was an error writing the mesage")) -        else: -            deferred.callback(doc_wrapper) - -    # -    # SoledadStore specific methods. -    # +        return failed -    def _process(self, doc_wrapper): +    def _iter_wrapper_subparts(self, doc_wrapper):          """          Return an iterator that will yield the doc_wrapper in the first place,          followed by the subparts item and the proper call type for every          item in the queue, if any. -        :param queue: the queue from where we'll pick item. -        :type queue: Queue +        :param doc_wrapper: a MessageWrapper or RecentFlagsDoc instance +        :type doc_wrapper: MessageWrapper or RecentFlagsDoc          """          if isinstance(doc_wrapper, MessageWrapper):              return chain((doc_wrapper,), @@ -315,11 +346,38 @@ class SoledadStore(ContentDedup):          """          if call is None:              return -        try: -            call(item) -        except u1db_errors.RevisionConflict as exc: -            logger.exception("Error: %r" % (exc,)) -            raise exc + +        if call == self._PUT_DOC_FUN: +            doc_id = item.doc_id +            with put_locks[doc_id]: +                doc = self._GET_DOC_FUN(doc_id) + +                if doc is None: +                    logger.warning("BUG! Dirty doc but could not " +                                   "find document %s" % (doc_id,)) +                    return + +                doc.content = dict(item.content) + +                item = doc +                try: +                    call(item) +                except u1db_errors.RevisionConflict as exc: +                    logger.exception("Error: %r" % (exc,)) +                    raise exc +                except Exception as exc: +                    logger.exception("Error: %r" % (exc,)) +                    raise exc + +        else: +            try: +                call(item) +            except u1db_errors.RevisionConflict as exc: +                logger.exception("Error: %r" % (exc,)) +                raise exc +            except Exception as exc: +                logger.exception("Error: %r" % (exc,)) +                raise exc      def _get_calls_for_msg_parts(self, msg_wrapper):          """ @@ -334,7 +392,7 @@ class SoledadStore(ContentDedup):          call = None          if msg_wrapper.new: -            call = self._soledad.create_doc +            call = self._CREATE_DOC_FUN              # item is expected to be a MessagePartDoc              for item in msg_wrapper.walk(): @@ -353,18 +411,18 @@ class SoledadStore(ContentDedup):          # the flags doc.          elif msg_wrapper.dirty: -            call = self._soledad.put_doc +            call = self._PUT_DOC_FUN              # item is expected to be a MessagePartDoc              for item in msg_wrapper.walk():                  # XXX FIXME Give error if dirty and not doc_id !!!                  doc_id = item.doc_id  # defend!                  if not doc_id: +                    logger.warning("Dirty item but no doc_id!")                      continue -                doc = self._soledad.get_doc(doc_id) -                doc.content = dict(item.content) +                  if item.part == MessagePartType.fdoc: -                    logger.debug("PUT dirty fdoc") -                    yield doc, call +                    #logger.debug("PUT dirty fdoc") +                    yield item, call                  # XXX also for linkage-doc !!!          else: @@ -379,17 +437,16 @@ class SoledadStore(ContentDedup):          :return: a tuple with recent-flags doc payload and callable          :rtype: tuple          """ -        call = self._soledad.put_doc -        rdoc = self._soledad.get_doc(rflags_wrapper.doc_id) +        call = self._CREATE_DOC_FUN          payload = rflags_wrapper.content -        logger.debug("Saving RFLAGS to Soledad...") -          if payload: -            rdoc.content = payload -            yield rdoc, call +            logger.debug("Saving RFLAGS to Soledad...") +            yield payload, call -    def _get_mbox_document(self, mbox): +    # Mbox documents and attributes + +    def get_mbox_document(self, mbox):          """          Return mailbox document. @@ -399,15 +456,80 @@ class SoledadStore(ContentDedup):                   the query failed.          :rtype: SoledadDocument or None.          """ +        with self._mbox_doc_locks[mbox]: +            return self._get_mbox_document(mbox) + +    def _get_mbox_document(self, mbox): +        """ +        Helper for returning the mailbox document. +        """          try:              query = self._soledad.get_from_index(                  fields.TYPE_MBOX_IDX,                  fields.TYPE_MBOX_VAL, mbox)              if query:                  return query.pop() +            else: +                logger.error("Could not find mbox document for %r" % +                            (self.mbox,))          except Exception as exc:              logger.exception("Unhandled error %r" % exc) +    def get_mbox_closed(self, mbox): +        """ +        Return the closed attribute for a given mailbox. + +        :param mbox: the mailbox +        :type mbox: str or unicode +        :rtype: bool +        """ +        mbox_doc = self.get_mbox_document() +        return mbox_doc.content.get(fields.CLOSED_KEY, False) + +    def set_mbox_closed(self, mbox, closed): +        """ +        Set the closed attribute for a given mailbox. + +        :param mbox: the mailbox +        :type mbox: str or unicode +        :param closed:  the value to be set +        :type closed: bool +        """ +        leap_assert(isinstance(closed, bool), "closed needs to be boolean") +        with self._mbox_doc_locks[mbox]: +            mbox_doc = self._get_mbox_document(mbox) +            if mbox_doc is None: +                logger.error( +                    "Could not find mbox document for %r" % (mbox,)) +                return +            mbox_doc.content[fields.CLOSED_KEY] = closed +            self._soledad.put_doc(mbox_doc) + +    def write_last_uid(self, mbox, value): +        """ +        Write the `last_uid` integer to the proper mailbox document +        in Soledad. +        This is called from the deferred triggered by +        memorystore.increment_last_soledad_uid, which is expected to +        run in a separate thread. + +        :param mbox: the mailbox +        :type mbox: str or unicode +        :param value: the value to set +        :type value: int +        """ +        leap_assert_type(value, int) +        key = fields.LAST_UID_KEY + +        # XXX change for a lock related to the mbox document +        # itself. +        with self._mbox_doc_locks[mbox]: +            mbox_doc = self._get_mbox_document(mbox) +            old_val = mbox_doc.content[key] +            if value > old_val: +                mbox_doc.content[key] = value +                self._soledad.put_doc(mbox_doc) +      def get_flags_doc(self, mbox, uid):          """          Return the SoledadDocument for the given mbox and uid. @@ -416,12 +538,16 @@ class SoledadStore(ContentDedup):          :type mbox: str or unicode          :param uid: the UID for the message          :type uid: int +        :rtype: SoledadDocument or None          """          result = None          try:              flag_docs = self._soledad.get_from_index(                  fields.TYPE_MBOX_UID_IDX,                  fields.TYPE_FLAGS_VAL, mbox, str(uid)) +            if len(flag_docs) != 1: +                logger.warning("More than one flag doc for %r:%s" % +                               (mbox, uid))              result = first(flag_docs)          except Exception as exc:              # ugh! Something's broken down there! @@ -430,36 +556,25 @@ class SoledadStore(ContentDedup):          finally:              return result -    def write_last_uid(self, mbox, value): +    def get_headers_doc(self, chash):          """ -        Write the `last_uid` integer to the proper mailbox document -        in Soledad. -        This is called from the deferred triggered by -        memorystore.increment_last_soledad_uid, which is expected to -        run in a separate thread. +        Return the document that keeps the headers for a message +        indexed by its content-hash. -        :param mbox: the mailbox -        :type mbox: str or unicode -        :param value: the value to set -        :type value: int +        :param chash: the content-hash to retrieve the document from. +        :type chash: str or unicode +        :rtype: SoledadDocument or None          """ -        leap_assert_type(value, int) -        key = fields.LAST_UID_KEY - -        with self._last_uid_lock: -            mbox_doc = self._get_mbox_document(mbox) -            old_val = mbox_doc.content[key] -            if value < old_val: -                logger.error("%r:%s Tried to write a UID lesser than what's " -                             "stored!" % (mbox, value)) -            mbox_doc.content[key] = value -            self._soledad.put_doc(mbox_doc) +        head_docs = self._soledad.get_from_index( +            fields.TYPE_C_HASH_IDX, +            fields.TYPE_HEADERS_VAL, str(chash)) +        return first(head_docs)      # deleted messages      def deleted_iter(self, mbox):          """ -        Get an iterator for the SoledadDocuments for messages +        Get an iterator for the the doc_id for SoledadDocuments for messages          with \\Deleted flag for a given mailbox.          :param mbox: the mailbox @@ -467,11 +582,10 @@ class SoledadStore(ContentDedup):          :return: iterator through deleted message docs          :rtype: iterable          """ -        return (doc for doc in self._soledad.get_from_index( +        return [doc.doc_id for doc in self._soledad.get_from_index(                  fields.TYPE_MBOX_DEL_IDX, -                fields.TYPE_FLAGS_VAL, mbox, '1')) +                fields.TYPE_FLAGS_VAL, mbox, '1')] -    # TODO can deferToThread this?      def remove_all_deleted(self, mbox):          """          Remove from Soledad all messages flagged as deleted for a given @@ -481,7 +595,14 @@ class SoledadStore(ContentDedup):          :type mbox: str or unicode          """          deleted = [] -        for doc in self.deleted_iter(mbox): -            deleted.append(doc.content[fields.UID_KEY]) -            self._soledad.delete_doc(doc) +        for doc_id in self.deleted_iter(mbox): +            with self._remove_lock: +                doc = self._soledad.get_doc(doc_id) +                if doc is not None: +                    self._soledad.delete_doc(doc) +                    try: +                        deleted.append(doc.content[fields.UID_KEY]) +                    except TypeError: +                        # empty content +                        pass          return deleted diff --git a/src/leap/mail/imap/tests/regressions b/src/leap/mail/imap/tests/regressions index 0a43398..efe3f46 100755 --- a/src/leap/mail/imap/tests/regressions +++ b/src/leap/mail/imap/tests/regressions @@ -101,7 +101,6 @@ def compare_msg_parts(a, b):          pprint(b[index])          print -      return all_match @@ -328,7 +327,7 @@ def cbAppendNextMessage(proto):      return proto.append(          REGRESSIONS_FOLDER, msg      ).addCallback( -        lambda r: proto.examine(REGRESSIONS_FOLDER) +        lambda r: proto.select(REGRESSIONS_FOLDER)      ).addCallback(          cbAppend, proto, raw      ).addErrback( @@ -379,6 +378,9 @@ def cbCompareMessage(result, proto, raw):      if result:          keys = result.keys()          keys.sort() +    else: +        print "[-] GOT NO RESULT" +        return proto.logout()      latest = max(keys) diff --git a/src/leap/mail/imap/tests/test_imap.py b/src/leap/mail/imap/tests/test_imap.py index 8c1cf20..fd88440 100644 --- a/src/leap/mail/imap/tests/test_imap.py +++ b/src/leap/mail/imap/tests/test_imap.py @@ -43,6 +43,7 @@ from itertools import chain  from mock import Mock  from nose.twistedtools import deferred, stop_reactor +from unittest import skip  from twisted.mail import imap4 @@ -64,11 +65,16 @@ import twisted.cred.portal  from leap.common.testing.basetest import BaseLeapTest  from leap.mail.imap.account import SoledadBackedAccount  from leap.mail.imap.mailbox import SoledadMailbox +from leap.mail.imap.memorystore import MemoryStore  from leap.mail.imap.messages import MessageCollection +from leap.mail.imap.server import LeapIMAPServer  from leap.soledad.client import Soledad  from leap.soledad.client import SoledadCrypto +TEST_USER = "testuser@leap.se" +TEST_PASSWD = "1234" +  def strip(f):      return lambda result, f=f: f() @@ -89,10 +95,10 @@ def initialize_soledad(email, gnupg_home, tempdir):      """      Initializes soledad by hand -    @param email: ID for the user -    @param gnupg_home: path to home used by gnupg -    @param tempdir: path to temporal dir -    @rtype: Soledad instance +    :param email: ID for the user +    :param gnupg_home: path to home used by gnupg +    :param tempdir: path to temporal dir +    :rtype: Soledad instance      """      uuid = "foobar-uuid" @@ -125,55 +131,6 @@ def initialize_soledad(email, gnupg_home, tempdir):      return _soledad -# -# Simple LEAP IMAP4 Server for testing -# - -class SimpleLEAPServer(imap4.IMAP4Server): - -    """ -    A Simple IMAP4 Server with mailboxes backed by Soledad. - -    This should be pretty close to the real LeapIMAP4Server that we -    will be instantiating as a service, minus the authentication bits. -    """ - -    def __init__(self, *args, **kw): - -        soledad = kw.pop('soledad', None) - -        imap4.IMAP4Server.__init__(self, *args, **kw) -        realm = TestRealm() - -        # XXX Why I AM PASSING THE ACCOUNT TO -        # REALM? I AM NOT USING  THAT NOW, AM I??? -        realm.theAccount = SoledadBackedAccount( -            'testuser', -            soledad=soledad) - -        portal = cred.portal.Portal(realm) -        c = cred.checkers.InMemoryUsernamePasswordDatabaseDontUse() -        self.checker = c -        self.portal = portal -        portal.registerChecker(c) -        self.timeoutTest = False - -    def lineReceived(self, line): -        if self.timeoutTest: -            # Do not send a respones -            return - -        imap4.IMAP4Server.lineReceived(self, line) - -    _username = 'testuser' -    _password = 'password-test' - -    def authenticateLogin(self, username, password): -        if username == self._username and password == self._password: -            return imap4.IAccount, self.theAccount, lambda: None -        raise cred.error.UnauthorizedLogin() - -  class TestRealm:      """ @@ -255,13 +212,6 @@ class IMAP4HelperMixin(BaseLeapTest):          # Soledad: config info          cls.gnupg_home = "%s/gnupg" % cls.tempdir          cls.email = 'leap@leap.se' -        # cls.db1_file = "%s/db1.u1db" % cls.tempdir -        # cls.db2_file = "%s/db2.u1db" % cls.tempdir -        # open test dbs -        # cls._db1 = u1db.open(cls.db1_file, create=True, -                              # document_factory=SoledadDocument) -        # cls._db2 = u1db.open(cls.db2_file, create=True, -                              # document_factory=SoledadDocument)          # initialize soledad by hand so we can control keys          cls._soledad = initialize_soledad( @@ -283,8 +233,6 @@ class IMAP4HelperMixin(BaseLeapTest):          Restores the old path and home environment variables.          Removes the temporal dir created for tests.          """ -        # cls._db1.close() -        # cls._db2.close()          cls._soledad.close()          os.environ["PATH"] = cls.old_path @@ -301,8 +249,13 @@ class IMAP4HelperMixin(BaseLeapTest):          but passing the same Soledad instance (it's costly to initialize),          so we have to be sure to restore state across tests.          """ +        UUID = 'deadbeef', +        USERID = TEST_USER +        memstore = MemoryStore() +          d = defer.Deferred() -        self.server = SimpleLEAPServer( +        self.server = LeapIMAPServer( +            uuid=UUID, userid=USERID,              contextFactory=self.serverCTX,              # XXX do we really need this??              soledad=self._soledad) @@ -317,9 +270,10 @@ class IMAP4HelperMixin(BaseLeapTest):          # I THINK we ONLY need to do it at one place now.          theAccount = SoledadBackedAccount( -            'testuser', -            soledad=self._soledad) -        SimpleLEAPServer.theAccount = theAccount +            USERID, +            soledad=self._soledad, +            memstore=memstore) +        LeapIMAPServer.theAccount = theAccount          # in case we get something from previous tests...          for mb in self.server.theAccount.mailboxes: @@ -404,8 +358,9 @@ class MessageCollectionTestCase(IMAP4HelperMixin, unittest.TestCase):          We override mixin method since we are only testing          MessageCollection interface in this particular TestCase          """ +        memstore = MemoryStore()          self.messages = MessageCollection("testmbox%s" % (self.count,), -                                          self._soledad) +                                          self._soledad, memstore=memstore)          MessageCollectionTestCase.count += 1      def tearDown(self): @@ -414,9 +369,6 @@ class MessageCollectionTestCase(IMAP4HelperMixin, unittest.TestCase):          """          del self.messages -    def wait(self): -        time.sleep(2) -      def testEmptyMessage(self):          """          Test empty message and collection @@ -425,11 +377,11 @@ class MessageCollectionTestCase(IMAP4HelperMixin, unittest.TestCase):          self.assertEqual(              em,              { +                "chash": '', +                "deleted": False,                  "flags": [],                  "mbox": "inbox", -                "recent": True,                  "seen": False, -                "deleted": False,                  "multi": False,                  "size": 0,                  "type": "flags", @@ -441,79 +393,100 @@ class MessageCollectionTestCase(IMAP4HelperMixin, unittest.TestCase):          """          Add multiple messages          """ -        # TODO really profile addition          mc = self.messages -        print "messages", self.messages          self.assertEqual(self.messages.count(), 0) -        mc.add_msg('Stuff', uid=1, subject="test1") -        mc.add_msg('Stuff', uid=2, subject="test2") -        mc.add_msg('Stuff', uid=3, subject="test3") -        mc.add_msg('Stuff', uid=4, subject="test4") -        self.wait() -        self.assertEqual(self.messages.count(), 4) -        mc.add_msg('Stuff', uid=5, subject="test5") -        mc.add_msg('Stuff', uid=6, subject="test6") -        mc.add_msg('Stuff', uid=7, subject="test7") -        self.wait() -        self.assertEqual(self.messages.count(), 7) -        self.wait() +        def add_first(): +            d = defer.gatherResults([ +                mc.add_msg('Stuff 1', uid=1, subject="test1"), +                mc.add_msg('Stuff 2', uid=2, subject="test2"), +                mc.add_msg('Stuff 3', uid=3, subject="test3"), +                mc.add_msg('Stuff 4', uid=4, subject="test4")]) +            return d + +        def add_second(result): +            d = defer.gatherResults([ +                mc.add_msg('Stuff 5', uid=5, subject="test5"), +                mc.add_msg('Stuff 6', uid=6, subject="test6"), +                mc.add_msg('Stuff 7', uid=7, subject="test7")]) +            return d + +        def check_second(result): +            return self.assertEqual(mc.count(), 7) + +        d1 = add_first() +        d1.addCallback(add_second) +        d1.addCallback(check_second) + +    @skip("needs update!")      def testRecentCount(self):          """          Test the recent count          """          mc = self.messages -        self.assertEqual(self.messages.count_recent(), 0) -        mc.add_msg('Stuff', uid=1, subject="test1") +        countrecent = mc.count_recent +        eq = self.assertEqual + +        self.assertEqual(countrecent(), 0) + +        d = mc.add_msg('Stuff', uid=1, subject="test1")          # For the semantics defined in the RFC, we auto-add the          # recent flag by default. -        self.wait() -        self.assertEqual(self.messages.count_recent(), 1) -        mc.add_msg('Stuff', subject="test2", uid=2, -                   flags=('\\Deleted',)) -        self.wait() -        self.assertEqual(self.messages.count_recent(), 2) -        mc.add_msg('Stuff', subject="test3", uid=3, -                   flags=('\\Recent',)) -        self.wait() -        self.assertEqual(self.messages.count_recent(), 3) -        mc.add_msg('Stuff', subject="test4", uid=4, -                   flags=('\\Deleted', '\\Recent')) -        self.wait() -        self.assertEqual(self.messages.count_recent(), 4) - -        for msg in mc: -            msg.removeFlags(('\\Recent',)) -        self.assertEqual(mc.count_recent(), 0) + +        def add2(_): +            return mc.add_msg('Stuff', subject="test2", uid=2, +                              flags=('\\Deleted',)) + +        def add3(_): +            return mc.add_msg('Stuff', subject="test3", uid=3, +                              flags=('\\Recent',)) + +        def add4(_): +            return mc.add_msg('Stuff', subject="test4", uid=4, +                              flags=('\\Deleted', '\\Recent')) + +        d.addCallback(lambda r: eq(countrecent(), 1)) +        d.addCallback(add2) +        d.addCallback(lambda r: eq(countrecent(), 2)) +        d.addCallback(add3) +        d.addCallback(lambda r: eq(countrecent(), 3)) +        d.addCallback(add4) +        d.addCallback(lambda r: eq(countrecent(), 4))      def testFilterByMailbox(self):          """          Test that queries filter by selected mailbox          """ -        def wait(): -            time.sleep(1) -          mc = self.messages          self.assertEqual(self.messages.count(), 0) -        mc.add_msg('', uid=1, subject="test1") -        mc.add_msg('', uid=2, subject="test2") -        mc.add_msg('', uid=3, subject="test3") -        wait() -        self.assertEqual(self.messages.count(), 3) -        newmsg = mc._get_empty_doc() -        newmsg['mailbox'] = "mailbox/foo" -        mc._soledad.create_doc(newmsg) -        self.assertEqual(mc.count(), 3) -        self.assertEqual( -            len(mc._soledad.get_from_index(mc.TYPE_IDX, "flags")), 4) + +        def add_1(): +            d1 = mc.add_msg('msg 1', uid=1, subject="test1") +            d2 = mc.add_msg('msg 2', uid=2, subject="test2") +            d3 = mc.add_msg('msg 3', uid=3, subject="test3") +            d = defer.gatherResults([d1, d2, d3]) +            return d + +        add_1().addCallback(lambda ignored: self.assertEqual( +                            mc.count(), 3)) + +        # XXX this has to be redone to fit memstore ------------# +        #newmsg = mc._get_empty_doc() +        #newmsg['mailbox'] = "mailbox/foo" +        #mc._soledad.create_doc(newmsg) +        #self.assertEqual(mc.count(), 3) +        #self.assertEqual( +            #len(mc._soledad.get_from_index(mc.TYPE_IDX, "flags")), 4)  class LeapIMAP4ServerTestCase(IMAP4HelperMixin, unittest.TestCase): +    # TODO this currently will use a memory-only store. +    # create a different one for testing soledad sync.      """      Tests for the generic behavior of the LeapIMAP4Server      which, right now, it's just implemented in this test file as -    SimpleLEAPServer. We will move the implementation, together with +    LeapIMAPServer. We will move the implementation, together with      authentication bits, to leap.mail.imap.server so it can be instantiated      from the tac file. @@ -542,7 +515,7 @@ class LeapIMAP4ServerTestCase(IMAP4HelperMixin, unittest.TestCase):              self.result.append(0)          def login(): -            return self.client.login('testuser', 'password-test') +            return self.client.login(TEST_USER, TEST_PASSWD)          def create():              for name in succeed + fail: @@ -560,7 +533,7 @@ class LeapIMAP4ServerTestCase(IMAP4HelperMixin, unittest.TestCase):      def _cbTestCreate(self, ignored, succeed, fail):          self.assertEqual(self.result, [1] * len(succeed) + [0] * len(fail)) -        mbox = SimpleLEAPServer.theAccount.mailboxes +        mbox = LeapIMAPServer.theAccount.mailboxes          answers = ['foobox', 'testbox', 'test/box', 'test', 'test/box/box']          mbox.sort()          answers.sort() @@ -571,10 +544,10 @@ class LeapIMAP4ServerTestCase(IMAP4HelperMixin, unittest.TestCase):          """          Test whether we can delete mailboxes          """ -        SimpleLEAPServer.theAccount.addMailbox('delete/me') +        LeapIMAPServer.theAccount.addMailbox('delete/me')          def login(): -            return self.client.login('testuser', 'password-test') +            return self.client.login(TEST_USER, TEST_PASSWD)          def delete():              return self.client.delete('delete/me') @@ -586,7 +559,7 @@ class LeapIMAP4ServerTestCase(IMAP4HelperMixin, unittest.TestCase):          d = defer.gatherResults([d1, d2])          d.addCallback(              lambda _: self.assertEqual( -                SimpleLEAPServer.theAccount.mailboxes, [])) +                LeapIMAPServer.theAccount.mailboxes, []))          return d      def testIllegalInboxDelete(self): @@ -597,7 +570,7 @@ class LeapIMAP4ServerTestCase(IMAP4HelperMixin, unittest.TestCase):          self.stashed = None          def login(): -            return self.client.login('testuser', 'password-test') +            return self.client.login(TEST_USER, TEST_PASSWD)          def delete():              return self.client.delete('inbox') @@ -619,10 +592,10 @@ class LeapIMAP4ServerTestCase(IMAP4HelperMixin, unittest.TestCase):      def testNonExistentDelete(self):          """          Test what happens if we try to delete a non-existent mailbox. -        We expect an error raised stating 'No such inbox' +        We expect an error raised stating 'No such mailbox'          """          def login(): -            return self.client.login('testuser', 'password-test') +            return self.client.login(TEST_USER, TEST_PASSWD)          def delete():              return self.client.delete('delete/me') @@ -637,8 +610,8 @@ class LeapIMAP4ServerTestCase(IMAP4HelperMixin, unittest.TestCase):          d1.addCallbacks(self._cbStopClient, self._ebGeneral)          d2 = self.loopback()          d = defer.gatherResults([d1, d2]) -        d.addCallback(lambda _: self.assertEqual(str(self.failure.value), -                                                 'No such mailbox')) +        d.addCallback(lambda _: self.assertTrue( +            str(self.failure.value).startswith('No such mailbox')))          return d      @deferred(timeout=None) @@ -649,14 +622,14 @@ class LeapIMAP4ServerTestCase(IMAP4HelperMixin, unittest.TestCase):          Obs: this test will fail if SoledadMailbox returns hardcoded flags.          """ -        SimpleLEAPServer.theAccount.addMailbox('delete') -        to_delete = SimpleLEAPServer.theAccount.getMailbox('delete') +        LeapIMAPServer.theAccount.addMailbox('delete') +        to_delete = LeapIMAPServer.theAccount.getMailbox('delete')          to_delete.setFlags((r'\Noselect',))          to_delete.getFlags() -        SimpleLEAPServer.theAccount.addMailbox('delete/me') +        LeapIMAPServer.theAccount.addMailbox('delete/me')          def login(): -            return self.client.login('testuser', 'password-test') +            return self.client.login(TEST_USER, TEST_PASSWD)          def delete():              return self.client.delete('delete') @@ -681,10 +654,10 @@ class LeapIMAP4ServerTestCase(IMAP4HelperMixin, unittest.TestCase):          """          Test whether we can rename a mailbox          """ -        SimpleLEAPServer.theAccount.addMailbox('oldmbox') +        LeapIMAPServer.theAccount.addMailbox('oldmbox')          def login(): -            return self.client.login('testuser', 'password-test') +            return self.client.login(TEST_USER, TEST_PASSWD)          def rename():              return self.client.rename('oldmbox', 'newname') @@ -696,7 +669,7 @@ class LeapIMAP4ServerTestCase(IMAP4HelperMixin, unittest.TestCase):          d = defer.gatherResults([d1, d2])          d.addCallback(lambda _:                        self.assertEqual( -                          SimpleLEAPServer.theAccount.mailboxes, +                          LeapIMAPServer.theAccount.mailboxes,                            ['newname']))          return d @@ -709,7 +682,7 @@ class LeapIMAP4ServerTestCase(IMAP4HelperMixin, unittest.TestCase):          self.stashed = None          def login(): -            return self.client.login('testuser', 'password-test') +            return self.client.login(TEST_USER, TEST_PASSWD)          def rename():              return self.client.rename('inbox', 'frotz') @@ -733,11 +706,11 @@ class LeapIMAP4ServerTestCase(IMAP4HelperMixin, unittest.TestCase):          """          Try to rename hierarchical mailboxes          """ -        SimpleLEAPServer.theAccount.create('oldmbox/m1') -        SimpleLEAPServer.theAccount.create('oldmbox/m2') +        LeapIMAPServer.theAccount.create('oldmbox/m1') +        LeapIMAPServer.theAccount.create('oldmbox/m2')          def login(): -            return self.client.login('testuser', 'password-test') +            return self.client.login(TEST_USER, TEST_PASSWD)          def rename():              return self.client.rename('oldmbox', 'newname') @@ -750,7 +723,7 @@ class LeapIMAP4ServerTestCase(IMAP4HelperMixin, unittest.TestCase):          return d.addCallback(self._cbTestHierarchicalRename)      def _cbTestHierarchicalRename(self, ignored): -        mboxes = SimpleLEAPServer.theAccount.mailboxes +        mboxes = LeapIMAPServer.theAccount.mailboxes          expected = ['newname', 'newname/m1', 'newname/m2']          mboxes.sort()          self.assertEqual(mboxes, [s for s in expected]) @@ -761,7 +734,7 @@ class LeapIMAP4ServerTestCase(IMAP4HelperMixin, unittest.TestCase):          Test whether we can mark a mailbox as subscribed to          """          def login(): -            return self.client.login('testuser', 'password-test') +            return self.client.login(TEST_USER, TEST_PASSWD)          def subscribe():              return self.client.subscribe('this/mbox') @@ -773,7 +746,7 @@ class LeapIMAP4ServerTestCase(IMAP4HelperMixin, unittest.TestCase):          d = defer.gatherResults([d1, d2])          d.addCallback(lambda _:                        self.assertEqual( -                          SimpleLEAPServer.theAccount.subscriptions, +                          LeapIMAPServer.theAccount.subscriptions,                            ['this/mbox']))          return d @@ -782,11 +755,11 @@ class LeapIMAP4ServerTestCase(IMAP4HelperMixin, unittest.TestCase):          """          Test whether we can unsubscribe from a set of mailboxes          """ -        SimpleLEAPServer.theAccount.subscribe('this/mbox') -        SimpleLEAPServer.theAccount.subscribe('that/mbox') +        LeapIMAPServer.theAccount.subscribe('this/mbox') +        LeapIMAPServer.theAccount.subscribe('that/mbox')          def login(): -            return self.client.login('testuser', 'password-test') +            return self.client.login(TEST_USER, TEST_PASSWD)          def unsubscribe():              return self.client.unsubscribe('this/mbox') @@ -798,7 +771,7 @@ class LeapIMAP4ServerTestCase(IMAP4HelperMixin, unittest.TestCase):          d = defer.gatherResults([d1, d2])          d.addCallback(lambda _:                        self.assertEqual( -                          SimpleLEAPServer.theAccount.subscriptions, +                          LeapIMAPServer.theAccount.subscriptions,                            ['that/mbox']))          return d @@ -811,7 +784,7 @@ class LeapIMAP4ServerTestCase(IMAP4HelperMixin, unittest.TestCase):          self.selectedArgs = None          def login(): -            return self.client.login('testuser', 'password-test') +            return self.client.login(TEST_USER, TEST_PASSWD)          def select():              def selected(args): @@ -829,7 +802,7 @@ class LeapIMAP4ServerTestCase(IMAP4HelperMixin, unittest.TestCase):          return defer.gatherResults([d1, d2]).addCallback(self._cbTestSelect)      def _cbTestSelect(self, ignored): -        mbox = SimpleLEAPServer.theAccount.getMailbox('TESTMAILBOX-SELECT') +        mbox = LeapIMAPServer.theAccount.getMailbox('TESTMAILBOX-SELECT')          self.assertEqual(self.server.mbox.messages.mbox, mbox.messages.mbox)          self.assertEqual(self.selectedArgs, {              'EXISTS': 0, 'RECENT': 0, 'UIDVALIDITY': 42, @@ -920,7 +893,7 @@ class LeapIMAP4ServerTestCase(IMAP4HelperMixin, unittest.TestCase):          Test login          """          def login(): -            d = self.client.login('testuser', 'password-test') +            d = self.client.login(TEST_USER, TEST_PASSWD)              d.addCallback(self._cbStopClient)          d1 = self.connected.addCallback(              strip(login)).addErrback(self._ebGeneral) @@ -928,7 +901,7 @@ class LeapIMAP4ServerTestCase(IMAP4HelperMixin, unittest.TestCase):          return d.addCallback(self._cbTestLogin)      def _cbTestLogin(self, ignored): -        self.assertEqual(self.server.account, SimpleLEAPServer.theAccount) +        self.assertEqual(self.server.account, LeapIMAPServer.theAccount)          self.assertEqual(self.server.state, 'auth')      @deferred(timeout=None) @@ -937,7 +910,7 @@ class LeapIMAP4ServerTestCase(IMAP4HelperMixin, unittest.TestCase):          Test bad login          """          def login(): -            d = self.client.login('testuser', 'wrong-password') +            d = self.client.login("bad_user@leap.se", TEST_PASSWD)              d.addBoth(self._cbStopClient)          d1 = self.connected.addCallback( @@ -947,19 +920,19 @@ class LeapIMAP4ServerTestCase(IMAP4HelperMixin, unittest.TestCase):          return d.addCallback(self._cbTestFailedLogin)      def _cbTestFailedLogin(self, ignored): -        self.assertEqual(self.server.account, None)          self.assertEqual(self.server.state, 'unauth') +        self.assertEqual(self.server.account, None)      @deferred(timeout=None)      def testLoginRequiringQuoting(self):          """          Test login requiring quoting          """ -        self.server._username = '{test}user' +        self.server._userid = '{test}user@leap.se'          self.server._password = '{test}password'          def login(): -            d = self.client.login('{test}user', '{test}password') +            d = self.client.login('{test}user@leap.se', '{test}password')              d.addBoth(self._cbStopClient)          d1 = self.connected.addCallback( @@ -968,7 +941,7 @@ class LeapIMAP4ServerTestCase(IMAP4HelperMixin, unittest.TestCase):          return d.addCallback(self._cbTestLoginRequiringQuoting)      def _cbTestLoginRequiringQuoting(self, ignored): -        self.assertEqual(self.server.account, SimpleLEAPServer.theAccount) +        self.assertEqual(self.server.account, LeapIMAPServer.theAccount)          self.assertEqual(self.server.state, 'auth')      # @@ -983,7 +956,7 @@ class LeapIMAP4ServerTestCase(IMAP4HelperMixin, unittest.TestCase):          self.namespaceArgs = None          def login(): -            return self.client.login('testuser', 'password-test') +            return self.client.login(TEST_USER, TEST_PASSWD)          def namespace():              def gotNamespace(args): @@ -1022,7 +995,7 @@ class LeapIMAP4ServerTestCase(IMAP4HelperMixin, unittest.TestCase):          self.examinedArgs = None          def login(): -            return self.client.login('testuser', 'password-test') +            return self.client.login(TEST_USER, TEST_PASSWD)          def examine():              def examined(args): @@ -1049,15 +1022,15 @@ class LeapIMAP4ServerTestCase(IMAP4HelperMixin, unittest.TestCase):              'READ-WRITE': False})      def _listSetup(self, f): -        SimpleLEAPServer.theAccount.addMailbox('root/subthingl', -                                               creation_ts=42) -        SimpleLEAPServer.theAccount.addMailbox('root/another-thing', -                                               creation_ts=42) -        SimpleLEAPServer.theAccount.addMailbox('non-root/subthing', -                                               creation_ts=42) +        LeapIMAPServer.theAccount.addMailbox('root/subthingl', +                                             creation_ts=42) +        LeapIMAPServer.theAccount.addMailbox('root/another-thing', +                                             creation_ts=42) +        LeapIMAPServer.theAccount.addMailbox('non-root/subthing', +                                             creation_ts=42)          def login(): -            return self.client.login('testuser', 'password-test') +            return self.client.login(TEST_USER, TEST_PASSWD)          def listed(answers):              self.listed = answers @@ -1092,7 +1065,7 @@ class LeapIMAP4ServerTestCase(IMAP4HelperMixin, unittest.TestCase):          """          Test LSub command          """ -        SimpleLEAPServer.theAccount.subscribe('root/subthingl2') +        LeapIMAPServer.theAccount.subscribe('root/subthingl2')          def lsub():              return self.client.lsub('root', '%') @@ -1106,12 +1079,12 @@ class LeapIMAP4ServerTestCase(IMAP4HelperMixin, unittest.TestCase):          """          Test Status command          """ -        SimpleLEAPServer.theAccount.addMailbox('root/subthings') +        LeapIMAPServer.theAccount.addMailbox('root/subthings')          # XXX FIXME ---- should populate this a little bit,          # with unseen etc...          def login(): -            return self.client.login('testuser', 'password-test') +            return self.client.login(TEST_USER, TEST_PASSWD)          def status():              return self.client.status( @@ -1139,7 +1112,7 @@ class LeapIMAP4ServerTestCase(IMAP4HelperMixin, unittest.TestCase):          Test failed status command with a non-existent mailbox          """          def login(): -            return self.client.login('testuser', 'password-test') +            return self.client.login(TEST_USER, TEST_PASSWD)          def status():              return self.client.status( @@ -1180,13 +1153,10 @@ class LeapIMAP4ServerTestCase(IMAP4HelperMixin, unittest.TestCase):          """          infile = util.sibpath(__file__, 'rfc822.message')          message = open(infile) -        SimpleLEAPServer.theAccount.addMailbox('root/subthing') +        LeapIMAPServer.theAccount.addMailbox('root/subthing')          def login(): -            return self.client.login('testuser', 'password-test') - -        def wait(): -            time.sleep(0.5) +            return self.client.login(TEST_USER, TEST_PASSWD)          def append():              return self.client.append( @@ -1198,21 +1168,19 @@ class LeapIMAP4ServerTestCase(IMAP4HelperMixin, unittest.TestCase):          d1 = self.connected.addCallback(strip(login))          d1.addCallbacks(strip(append), self._ebGeneral) -        d1.addCallbacks(strip(wait), self._ebGeneral)          d1.addCallbacks(self._cbStopClient, self._ebGeneral)          d2 = self.loopback()          d = defer.gatherResults([d1, d2])          return d.addCallback(self._cbTestFullAppend, infile)      def _cbTestFullAppend(self, ignored, infile): -        mb = SimpleLEAPServer.theAccount.getMailbox('root/subthing') -        time.sleep(0.5) +        mb = LeapIMAPServer.theAccount.getMailbox('root/subthing')          self.assertEqual(1, len(mb.messages))          msg = mb.messages.get_msg_by_uid(1)          self.assertEqual( -            ('\\SEEN', '\\DELETED'), -            msg.getFlags()) +            set(('\\Recent', '\\SEEN', '\\DELETED')), +            set(msg.getFlags()))          self.assertEqual(              'Tue, 17 Jun 2003 11:22:16 -0600 (MDT)', @@ -1220,14 +1188,11 @@ class LeapIMAP4ServerTestCase(IMAP4HelperMixin, unittest.TestCase):          parsed = self.parser.parse(open(infile))          body = parsed.get_payload() -        headers = parsed.items() +        headers = dict(parsed.items())          self.assertEqual(              body,              msg.getBodyFile().read()) - -        msg_headers = msg.getHeaders(True, "",) -        gotheaders = list(chain( -            *[[(k, item) for item in v] for (k, v) in msg_headers.items()])) +        gotheaders = msg.getHeaders(True)          self.assertItemsEqual(              headers, gotheaders) @@ -1238,13 +1203,10 @@ class LeapIMAP4ServerTestCase(IMAP4HelperMixin, unittest.TestCase):          Test partially appending a message to the mailbox          """          infile = util.sibpath(__file__, 'rfc822.message') -        SimpleLEAPServer.theAccount.addMailbox('PARTIAL/SUBTHING') +        LeapIMAPServer.theAccount.addMailbox('PARTIAL/SUBTHING')          def login(): -            return self.client.login('testuser', 'password-test') - -        def wait(): -            time.sleep(1) +            return self.client.login(TEST_USER, TEST_PASSWD)          def append():              message = file(infile) @@ -1257,7 +1219,6 @@ class LeapIMAP4ServerTestCase(IMAP4HelperMixin, unittest.TestCase):                  )              )          d1 = self.connected.addCallback(strip(login)) -        d1.addCallbacks(strip(wait), self._ebGeneral)          d1.addCallbacks(strip(append), self._ebGeneral)          d1.addCallbacks(self._cbStopClient, self._ebGeneral)          d2 = self.loopback() @@ -1266,16 +1227,13 @@ class LeapIMAP4ServerTestCase(IMAP4HelperMixin, unittest.TestCase):              self._cbTestPartialAppend, infile)      def _cbTestPartialAppend(self, ignored, infile): -        mb = SimpleLEAPServer.theAccount.getMailbox('PARTIAL/SUBTHING') -        time.sleep(1) +        mb = LeapIMAPServer.theAccount.getMailbox('PARTIAL/SUBTHING')          self.assertEqual(1, len(mb.messages))          msg = mb.messages.get_msg_by_uid(1)          self.assertEqual( -            ('\\SEEN', ), -            msg.getFlags() +            set(('\\SEEN', '\\Recent')), +            set(msg.getFlags())          ) -        #self.assertEqual( -            #'Right now', msg.getInternalDate())          parsed = self.parser.parse(open(infile))          body = parsed.get_payload()          self.assertEqual( @@ -1287,10 +1245,10 @@ class LeapIMAP4ServerTestCase(IMAP4HelperMixin, unittest.TestCase):          """          Test check command          """ -        SimpleLEAPServer.theAccount.addMailbox('root/subthing') +        LeapIMAPServer.theAccount.addMailbox('root/subthing')          def login(): -            return self.client.login('testuser', 'password-test') +            return self.client.login(TEST_USER, TEST_PASSWD)          def select():              return self.client.select('root/subthing') @@ -1306,7 +1264,7 @@ class LeapIMAP4ServerTestCase(IMAP4HelperMixin, unittest.TestCase):          # Okay, that was fun -    @deferred(timeout=None) +    @deferred(timeout=5)      def testClose(self):          """          Test closing the mailbox. We expect to get deleted all messages flagged @@ -1315,29 +1273,33 @@ class LeapIMAP4ServerTestCase(IMAP4HelperMixin, unittest.TestCase):          name = 'mailbox-close'          self.server.theAccount.addMailbox(name) -        m = SimpleLEAPServer.theAccount.getMailbox(name) -        m.messages.add_msg('test 1', uid=1, subject="Message 1", -                           flags=('\\Deleted', 'AnotherFlag')) -        m.messages.add_msg('test 2', uid=2, subject="Message 2", -                           flags=('AnotherFlag',)) -        m.messages.add_msg('test 3', uid=3, subject="Message 3", -                           flags=('\\Deleted',)) +        m = LeapIMAPServer.theAccount.getMailbox(name)          def login(): -            return self.client.login('testuser', 'password-test') - -        def wait(): -            time.sleep(1) +            return self.client.login(TEST_USER, TEST_PASSWD)          def select():              return self.client.select(name) +        def add_messages(): +            d1 = m.messages.add_msg( +                'test 1', uid=1, subject="Message 1", +                flags=('\\Deleted', 'AnotherFlag')) +            d2 = m.messages.add_msg( +                'test 2', uid=2, subject="Message 2", +                flags=('AnotherFlag',)) +            d3 = m.messages.add_msg( +                'test 3', uid=3, subject="Message 3", +                flags=('\\Deleted',)) +            d = defer.gatherResults([d1, d2, d3]) +            return d +          def close():              return self.client.close()          d = self.connected.addCallback(strip(login)) -        d.addCallbacks(strip(wait), self._ebGeneral)          d.addCallbacks(strip(select), self._ebGeneral) +        d.addCallbacks(strip(add_messages), self._ebGeneral)          d.addCallbacks(strip(close), self._ebGeneral)          d.addCallbacks(self._cbStopClient, self._ebGeneral)          d2 = self.loopback() @@ -1345,37 +1307,42 @@ class LeapIMAP4ServerTestCase(IMAP4HelperMixin, unittest.TestCase):      def _cbTestClose(self, ignored, m):          self.assertEqual(len(m.messages), 1) -        messages = [msg for msg in m.messages] -        self.assertFalse(messages[0] is None) + +        msg = m.messages.get_msg_by_uid(2) +        self.assertFalse(msg is None)          self.assertEqual( -            messages[0]._hdoc.content['subject'], +            msg._hdoc.content['subject'],              'Message 2')          self.failUnless(m.closed) -    @deferred(timeout=None) +    @deferred(timeout=5)      def testExpunge(self):          """          Test expunge command          """          name = 'mailbox-expunge' -        SimpleLEAPServer.theAccount.addMailbox(name) -        m = SimpleLEAPServer.theAccount.getMailbox(name) -        m.messages.add_msg('test 1', uid=1, subject="Message 1", -                           flags=('\\Deleted', 'AnotherFlag')) -        m.messages.add_msg('test 2', uid=2, subject="Message 2", -                           flags=('AnotherFlag',)) -        m.messages.add_msg('test 3', uid=3, subject="Message 3", -                           flags=('\\Deleted',)) +        self.server.theAccount.addMailbox(name) +        m = LeapIMAPServer.theAccount.getMailbox(name)          def login(): -            return self.client.login('testuser', 'password-test') - -        def wait(): -            time.sleep(2) +            return self.client.login(TEST_USER, TEST_PASSWD)          def select():              return self.client.select('mailbox-expunge') +        def add_messages(): +            d1 = m.messages.add_msg( +                'test 1', uid=1, subject="Message 1", +                flags=('\\Deleted', 'AnotherFlag')) +            d2 = m.messages.add_msg( +                'test 2', uid=2, subject="Message 2", +                flags=('AnotherFlag',)) +            d3 = m.messages.add_msg( +                'test 3', uid=3, subject="Message 3", +                flags=('\\Deleted',)) +            d = defer.gatherResults([d1, d2, d3]) +            return d +          def expunge():              return self.client.expunge() @@ -1385,8 +1352,8 @@ class LeapIMAP4ServerTestCase(IMAP4HelperMixin, unittest.TestCase):          self.results = None          d1 = self.connected.addCallback(strip(login)) -        d1.addCallbacks(strip(wait), self._ebGeneral)          d1.addCallbacks(strip(select), self._ebGeneral) +        d1.addCallbacks(strip(add_messages), self._ebGeneral)          d1.addCallbacks(strip(expunge), self._ebGeneral)          d1.addCallbacks(expunged, self._ebGeneral)          d1.addCallbacks(self._cbStopClient, self._ebGeneral) @@ -1397,9 +1364,10 @@ class LeapIMAP4ServerTestCase(IMAP4HelperMixin, unittest.TestCase):      def _cbTestExpunge(self, ignored, m):          # we only left 1 mssage with no deleted flag          self.assertEqual(len(m.messages), 1) -        messages = [msg for msg in m.messages] + +        msg = m.messages.get_msg_by_uid(2)          self.assertEqual( -            messages[0]._hdoc.content['subject'], +            msg._hdoc.content['subject'],              'Message 2')          # the uids of the deleted messages          self.assertItemsEqual(self.results, [1, 3]) diff --git a/src/leap/mail/messageflow.py b/src/leap/mail/messageflow.py index 80121c8..c8f224c 100644 --- a/src/leap/mail/messageflow.py +++ b/src/leap/mail/messageflow.py @@ -49,7 +49,7 @@ class IMessageProducer(Interface):      entities.      """ -    def push(self, item): +    def push(self, item, state=None):          """          Push a new item in the queue.          """ @@ -101,6 +101,10 @@ class MessageProducer(object):      # and consumption is not likely (?) to consume huge amounts of memory in      # our current settings, so the need to pause the stream is not urgent now. +    # TODO use enum +    STATE_NEW = 1 +    STATE_DIRTY = 2 +      def __init__(self, consumer, queue=Queue.Queue, period=1):          """          Initializes the MessageProducer @@ -115,7 +119,8 @@ class MessageProducer(object):          # it should implement a `consume` method          self._consumer = consumer -        self._queue = queue() +        self._queue_new = queue() +        self._queue_dirty = queue()          self._period = period          self._loop = LoopingCall(self._check_for_new) @@ -130,7 +135,7 @@ class MessageProducer(object):          If the queue is found empty, the loop is stopped. It will be started          again after the addition of new items.          """ -        self._consumer.consume(self._queue) +        self._consumer.consume((self._queue_new, self._queue_dirty))          if self.is_queue_empty():              self.stop() @@ -138,11 +143,13 @@ class MessageProducer(object):          """          Return True if queue is empty, False otherwise.          """ -        return self._queue.empty() +        new = self._queue_new +        dirty = self._queue_dirty +        return new.empty() and dirty.empty()      # public methods: IMessageProducer -    def push(self, item): +    def push(self, item, state=None):          """          Push a new item in the queue. @@ -150,7 +157,14 @@ class MessageProducer(object):          """          # XXX this might raise if the queue does not accept any new          # items. what to do then? -        self._queue.put(item) +        queue = self._queue_new + +        if state == self.STATE_NEW: +            queue = self._queue_new +        if state == self.STATE_DIRTY: +            queue = self._queue_dirty + +        queue.put(item)          self.start()      def start(self): diff --git a/src/leap/mail/utils.py b/src/leap/mail/utils.py index 942acfb..fed24b3 100644 --- a/src/leap/mail/utils.py +++ b/src/leap/mail/utils.py @@ -17,10 +17,10 @@  """  Mail utilities.  """ -import copy  import json  import re  import traceback +import Queue  from leap.soledad.common.document import SoledadDocument @@ -49,7 +49,7 @@ def empty(thing):          thing = thing.content      try:          return len(thing) == 0 -    except ReferenceError: +    except (ReferenceError, TypeError):          return True @@ -94,6 +94,7 @@ def lowerdict(_dict):  PART_MAP = "part_map" +PHASH = "phash"  def _str_dict(d, k): @@ -130,6 +131,103 @@ def stringify_parts_map(d):      return d +def phash_iter(d): +    """ +    A recursive generator that extracts all the payload-hashes +    from an arbitrary nested parts-map dictionary. + +    :param d: the dictionary to walk +    :type d: dictionary +    :return: a list of all the phashes found +    :rtype: list +    """ +    if PHASH in d: +        yield d[PHASH] +    if PART_MAP in d: +        for key in d[PART_MAP]: +            for phash in phash_iter(d[PART_MAP][key]): +                yield phash + + +def accumulator(fun, lim): +    """ +    A simple accumulator that uses a closure and a mutable +    object to collect items. +    When the count of items is greater than `lim`, the +    collection is flushed after invoking a map of the function `fun` +    over it. + +    The returned accumulator can also be flushed at any moment +    by passing a boolean as a second parameter. + +    :param fun: the function to call over the collection +                when its size is greater than `lim` +    :type fun: callable +    :param lim: the turning point for the collection +    :type lim: int +    :rtype: function + +    >>> from pprint import pprint +    >>> acc = accumulator(pprint, 2) +    >>> acc(1) +    >>> acc(2) +    [1, 2] +    >>> acc(3) +    >>> acc(4) +    [3, 4] +    >>> acc = accumulator(pprint, 5) +    >>> acc(1) +    >>> acc(2) +    >>> acc(3) +    >>> acc(None, flush=True) +    [1,2,3] +    """ +    KEY = "items" +    _o = {KEY: []} + +    def _accumulator(item, flush=False): +        collection = _o[KEY] +        collection.append(item) +        if len(collection) >= lim or flush: +            map(fun, filter(None, collection)) +            _o[KEY] = [] + +    return _accumulator + + +def accumulator_queue(fun, lim): +    """ +    A version of the accumulator that uses a queue. + +    When the count of items is greater than `lim`, the +    queue is flushed after invoking the function `fun` +    over its items. + +    The returned accumulator can also be flushed at any moment +    by passing a boolean as a second parameter. + +    :param fun: the function to call over the collection +                when its size is greater than `lim` +    :type fun: callable +    :param lim: the turning point for the collection +    :type lim: int +    :rtype: function +    """ +    _q = Queue.Queue() + +    def _accumulator(item, flush=False): +        _q.put(item) +        if _q.qsize() >= lim or flush: +            collection = [_q.get() for i in range(_q.qsize())] +            map(fun, filter(None, collection)) + +    return _accumulator + + +# +# String manipulation +# +  class CustomJsonScanner(object):      """      This class is a context manager definition used to monkey patch the default @@ -169,6 +267,8 @@ class CustomJsonScanner(object):          if not monkey_patched:              return self._orig_scanstring(s, idx, *args, **kwargs) +        # TODO profile to see if a compiled regex can get us some +        # benefit here.          found = False          end = s.find("\"", idx)          while not found: | 
