diff options
| -rw-r--r-- | mail/src/leap/mail/imap/mailbox.py | 131 | ||||
| -rw-r--r-- | mail/src/leap/mail/imap/memorystore.py | 236 | ||||
| -rw-r--r-- | mail/src/leap/mail/imap/messageparts.py | 26 | ||||
| -rw-r--r-- | mail/src/leap/mail/imap/messages.py | 336 | ||||
| -rw-r--r-- | mail/src/leap/mail/imap/server.py | 1 | ||||
| -rw-r--r-- | mail/src/leap/mail/imap/soledadstore.py | 129 | ||||
| -rwxr-xr-x | mail/src/leap/mail/imap/tests/leap_tests_imap.zsh | 7 | ||||
| -rw-r--r-- | mail/src/leap/mail/utils.py | 5 | 
8 files changed, 532 insertions, 339 deletions
| diff --git a/mail/src/leap/mail/imap/mailbox.py b/mail/src/leap/mail/imap/mailbox.py index 108d0da..b5c5719 100644 --- a/mail/src/leap/mail/imap/mailbox.py +++ b/mail/src/leap/mail/imap/mailbox.py @@ -26,7 +26,7 @@ import cStringIO  from collections import defaultdict  from twisted.internet import defer -from twisted.internet.task import deferLater +#from twisted.internet.task import deferLater  from twisted.python import log  from twisted.mail import imap4 @@ -119,6 +119,9 @@ class SoledadMailbox(WithMsgFields, MBoxParser):          if not self.getFlags():              self.setFlags(self.INIT_FLAGS) +        if self._memstore: +            self.prime_last_uid_to_memstore() +      @property      def listeners(self):          """ @@ -132,6 +135,9 @@ class SoledadMailbox(WithMsgFields, MBoxParser):          """          return self._listeners[self.mbox] +    # TODO this grows too crazily when many instances are fired, like +    # during imaptest stress testing. Should have a queue of limited size +    # instead.      def addListener(self, listener):          """          Add a listener to the listeners queue. @@ -153,6 +159,7 @@ class SoledadMailbox(WithMsgFields, MBoxParser):          """          self.listeners.remove(listener) +    # TODO move completely to soledadstore, under memstore reponsibility.      def _get_mbox(self):          """          Return mailbox document. @@ -228,52 +235,28 @@ class SoledadMailbox(WithMsgFields, MBoxParser):      def _get_last_uid(self):          """          Return the last uid for this mailbox. +        If we have a memory store, the last UID will be the highest +        recorded UID in the message store, or a counter cached from +        the mailbox document in soledad if this is higher.          :return: the last uid for messages in this mailbox          :rtype: bool          """ -        mbox = self._get_mbox() -        if not mbox: -            logger.error("We could not get a mbox!") -            # XXX It looks like it has been corrupted. -            # We need to be able to survive this. -            return None -        last = mbox.content.get(self.LAST_UID_KEY, 1) -        if self._memstore: -            last = max(last, self._memstore.get_last_uid(mbox)) +        last = self._memstore.get_last_uid(self.mbox) +        print "last uid for %s: %s (from memstore)" % (self.mbox, last)          return last -    def _set_last_uid(self, uid): -        """ -        Sets the last uid for this mailbox. +    last_uid = property( +        _get_last_uid, doc="Last_UID attribute.") -        :param uid: the uid to be set -        :type uid: int +    def prime_last_uid_to_memstore(self):          """ -        leap_assert(isinstance(uid, int), "uid has to be int") -        mbox = self._get_mbox() -        key = self.LAST_UID_KEY - -        count = self.getMessageCount() - -        # XXX safety-catch. If we do get duplicates, -        # we want to avoid further duplication. - -        if uid >= count: -            value = uid -        else: -            # something is wrong, -            # just set the last uid -            # beyond the max msg count. -            logger.debug("WRONG uid < count. Setting last uid to %s", count) -            value = count - -        mbox.content[key] = value -        # XXX this should be set in the memorystore instead!!! -        self._soledad.put_doc(mbox) - -    last_uid = property( -        _get_last_uid, _set_last_uid, doc="Last_UID attribute.") +        Prime memstore with last_uid value +        """ +        set_exist = set(self.messages.all_uid_iter()) +        last = max(set_exist) + 1 if set_exist else 1 +        logger.info("Priming Soledad last_uid to %s" % (last,)) +        self._memstore.set_last_soledad_uid(self.mbox, last)      def getUIDValidity(self):          """ @@ -315,8 +298,15 @@ class SoledadMailbox(WithMsgFields, MBoxParser):          :rtype: int          """          with self.next_uid_lock: -            self.last_uid += 1 -            return self.last_uid +            if self._memstore: +                return self.last_uid + 1 +            else: +                # XXX after lock, it should be safe to +                # return just the increment here, and +                # have a different method that actually increments +                # the counter when really adding. +                self.last_uid += 1 +                return self.last_uid      def getMessageCount(self):          """ @@ -397,26 +387,26 @@ class SoledadMailbox(WithMsgFields, MBoxParser):          :return: a deferred that evals to None          """ +        # TODO have a look at the cases for internal date in the rfc          if isinstance(message, (cStringIO.OutputType, StringIO.StringIO)):              message = message.getvalue() -        # XXX we should treat the message as an IMessage from here + +        # XXX we could treat the message as an IMessage from here          leap_assert_type(message, basestring) -        uid_next = self.getUIDNext() -        logger.debug('Adding msg with UID :%s' % uid_next)          if flags is None:              flags = tuple()          else:              flags = tuple(str(flag) for flag in flags) -        d = self._do_add_message(message, flags=flags, date=date, uid=uid_next) +        d = self._do_add_message(message, flags=flags, date=date)          return d -    def _do_add_message(self, message, flags, date, uid): +    def _do_add_message(self, message, flags, date):          """ -        Calls to the messageCollection add_msg method (deferred to thread). +        Calls to the messageCollection add_msg method.          Invoked from addMessage.          """ -        d = self.messages.add_msg(message, flags=flags, date=date, uid=uid) +        d = self.messages.add_msg(message, flags=flags, date=date)          # XXX Removing notify temporarily.          # This is interfering with imaptest results. I'm not clear if it's          # because we clutter the logging or because the set of listeners is @@ -456,6 +446,8 @@ class SoledadMailbox(WithMsgFields, MBoxParser):          # XXX removing the mailbox in situ for now,          # we should postpone the removal + +        # XXX move to memory store??          self._soledad.delete_doc(self._get_mbox())      def _close_cb(self, result): @@ -466,8 +458,8 @@ class SoledadMailbox(WithMsgFields, MBoxParser):          Expunge and mark as closed          """          d = self.expunge() -        d.addCallback(self._close_cb) -        return d +        #d.addCallback(self._close_cb) +        #return d      def _expunge_cb(self, result):          return result @@ -479,22 +471,15 @@ class SoledadMailbox(WithMsgFields, MBoxParser):          print "EXPUNGE!"          if not self.isWriteable():              raise imap4.ReadOnlyMailbox -        mstore = self._memstore -        if mstore is not None: -            deleted = mstore.all_deleted_uid_iter(self.mbox) -            print "deleted ", list(deleted) -            for uid in deleted: -                mstore.remove_message(self.mbox, uid) - -        print "now deleting from soledad" -        d = self.messages.remove_all_deleted() -        d.addCallback(self._expunge_cb) -        d.addCallback(self.messages.reset_last_uid) - -        # XXX DEBUG ------------------- -        # FIXME !!! -        # XXX should remove the hdocset too!!! -        return d + +        return self._memstore.expunge(self.mbox) + +        # TODO we can defer this back when it's correct +        # but we should make sure the memstore has been synced. + +        #d = self._memstore.expunge(self.mbox) +        #d.addCallback(self._expunge_cb) +        #return d      def _bound_seq(self, messages_asked):          """ @@ -783,12 +768,12 @@ class SoledadMailbox(WithMsgFields, MBoxParser):      # IMessageCopier      @deferred +    #@profile      def copy(self, messageObject):          """          Copy the given message object into this mailbox.          """          from twisted.internet import reactor -        uid_next = self.getUIDNext()          msg = messageObject          memstore = self._memstore @@ -796,7 +781,7 @@ class SoledadMailbox(WithMsgFields, MBoxParser):          fdoc = msg._fdoc          hdoc = msg._hdoc          if not fdoc: -            logger.debug("Tried to copy a MSG with no fdoc") +            logger.warning("Tried to copy a MSG with no fdoc")              return          new_fdoc = copy.deepcopy(fdoc.content) @@ -807,11 +792,12 @@ class SoledadMailbox(WithMsgFields, MBoxParser):          if exist:              print "Destination message already exists!" -          else:              print "DO COPY MESSAGE!" +            mbox = self.mbox +            uid_next = memstore.increment_last_soledad_uid(mbox)              new_fdoc[self.UID_KEY] = uid_next -            new_fdoc[self.MBOX_KEY] = self.mbox +            new_fdoc[self.MBOX_KEY] = mbox              # XXX set recent! @@ -824,9 +810,8 @@ class SoledadMailbox(WithMsgFields, MBoxParser):              self._memstore.create_message(                  self.mbox, uid_next,                  MessageWrapper( -                    new_fdoc, hdoc.content)) - -            deferLater(reactor, 1, self.notify_new) +                    new_fdoc, hdoc.content), +                notify_on_disk=False)      # convenience fun diff --git a/mail/src/leap/mail/imap/memorystore.py b/mail/src/leap/mail/imap/memorystore.py index 232a2fb..60e98c7 100644 --- a/mail/src/leap/mail/imap/memorystore.py +++ b/mail/src/leap/mail/imap/memorystore.py @@ -19,16 +19,20 @@ In-memory transient store for a LEAPIMAPServer.  """  import contextlib  import logging +import threading  import weakref  from collections import defaultdict +from copy import copy  from twisted.internet import defer  from twisted.internet.task import LoopingCall  from twisted.python import log  from zope.interface import implements +from leap.common.check import leap_assert_type  from leap.mail import size +from leap.mail.decorators import deferred  from leap.mail.utils import empty  from leap.mail.messageflow import MessageProducer  from leap.mail.imap import interfaces @@ -40,7 +44,10 @@ from leap.mail.imap.messageparts import ReferenciableDict  logger = logging.getLogger(__name__) -SOLEDAD_WRITE_PERIOD = 20 + +# The default period to do writebacks to the permanent +# soledad storage, in seconds. +SOLEDAD_WRITE_PERIOD = 10  @contextlib.contextmanager @@ -76,16 +83,11 @@ class MemoryStore(object):      implements(interfaces.IMessageStore,                 interfaces.IMessageStoreWriter) -    producer = None -      # TODO We will want to index by chash when we transition to local-only      # UIDs. -    # TODO should store RECENT-FLAGS too -    # TODO should store HDOCSET too (use weakrefs!) -- will need to subclass -    # TODO do use dirty flag (maybe use namedtuples for that) so we can use it -    # also as a read-cache.      WRITING_FLAG = "_writing" +    _last_uid_lock = threading.Lock()      def __init__(self, permanent_store=None,                   write_period=SOLEDAD_WRITE_PERIOD): @@ -138,17 +140,20 @@ class MemoryStore(object):          self._rflags_store = defaultdict(              lambda: {'doc_id': None, 'set': set([])}) -        # TODO ----------------- implement mailbox-level flags store too? -        # XXX maybe we don't need this anymore... -        # let's see how good does it prefetch the headers if -        # we cache them in the store. -        self._hdocset_store = {} -        # -------------------------------------------------------------- +        """ +        last-uid store keeps the count of the highest UID +        per mailbox. + +        {'mbox-a': 42, +         'mbox-b': 23} +        """ +        self._last_uid = {}          # New and dirty flags, to set MessageWrapper State.          self._new = set([])          self._new_deferreds = {}          self._dirty = set([]) +        self._rflags_dirty = set([])          self._dirty_deferreds = {}          # Flag for signaling we're busy writing to the disk storage. @@ -210,14 +215,25 @@ class MemoryStore(object):          print "adding new doc to memstore %s (%s)" % (mbox, uid)          key = mbox, uid +        self._add_message(mbox, uid, message, notify_on_disk) +          d = defer.Deferred()          d.addCallback(lambda result: log.msg("message save: %s" % result)) -          self._new.add(key) + +        # We store this deferred so we can keep track of the pending +        # operations internally.          self._new_deferreds[key] = d -        self._add_message(mbox, uid, message, notify_on_disk) -        print "create message: ", d -        return d + +        if notify_on_disk: +            # Caller wants to be notified when the message is on disk +            # so we pass the deferred that will be fired when the message +            # has been written. +            return d +        else: +            # Caller does not care, just fired and forgot, so we pass +            # a defer that will inmediately have its callback triggered. +            return defer.succeed('fire-and-forget:%s' % str(key))      def put_message(self, mbox, uid, message, notify_on_disk=True):          """ @@ -238,13 +254,14 @@ class MemoryStore(object):          :rtype: Deferred          """          key = mbox, uid -          d = defer.Deferred() -        d.addCallback(lambda result: log.msg("message save: %s" % result)) +        d.addCallback(lambda result: log.msg("message PUT save: %s" % result))          self._dirty.add(key)          self._dirty_deferreds[key] = d          self._add_message(mbox, uid, message, notify_on_disk) +        #print "dirty ", self._dirty +        #print "new ", self._new          return d      def _add_message(self, mbox, uid, message, notify_on_disk=True): @@ -315,6 +332,19 @@ class MemoryStore(object):                      store.pop(key)          prune((FDOC, HDOC, CDOCS, DOCS_ID), store) +        #print "after adding: " +        #import pprint; pprint.pprint(self._msg_store[key]) + +    def get_docid_for_fdoc(self, mbox, uid): +        """ +        Get Soledad document id for the flags-doc for a given mbox and uid. +        """ +        fdoc = self._permanent_store.get_flags_doc(mbox, uid) +        if not fdoc: +            return None +        doc_id = fdoc.doc_id +        return doc_id +      def get_message(self, mbox, uid):          """          Get a MessageWrapper for the given mbox and uid combination. @@ -326,6 +356,8 @@ class MemoryStore(object):          if msg_dict:              new, dirty = self._get_new_dirty_state(key)              return MessageWrapper(from_dict=msg_dict, +                                  new=new, +                                  dirty=dirty,                                    memstore=weakref.proxy(self))          else:              return None @@ -334,6 +366,13 @@ class MemoryStore(object):          """          Remove a Message from this MemoryStore.          """ +        # XXX For the moment we are only removing the flags and headers +        # docs. The rest we leave there polluting your hard disk, +        # until we think about a good way of deorphaning. + +        # XXX implement elijah's idea of using a PUT document as a +        # token to ensure consistency in the removal. +          try:              key = mbox, uid              self._new.discard(key) @@ -348,18 +387,22 @@ class MemoryStore(object):          """          Write the message documents in this MemoryStore to a different store.          """ -        # For now, we pass if the queue is not empty, to avoid duplication. +        # For now, we pass if the queue is not empty, to avoid duplicate +        # queuing.          # We would better use a flag to know when we've already enqueued an          # item. + +        # XXX this could return the deferred for all the enqueued operations +          if not self.producer.is_queue_empty():              return          print "Writing messages to Soledad..."          with set_bool_flag(self, self.WRITING_FLAG): -            for msg_wrapper in self.all_new_dirty_msg_iter(): -                self.producer.push(msg_wrapper)              for rflags_doc_wrapper in self.all_rdocs_iter():                  self.producer.push(rflags_doc_wrapper) +            for msg_wrapper in self.all_new_dirty_msg_iter(): +                self.producer.push(msg_wrapper)      # MemoryStore specific methods. @@ -370,12 +413,61 @@ class MemoryStore(object):          all_keys = self._msg_store.keys()          return [uid for m, uid in all_keys if m == mbox] +    # last_uid +      def get_last_uid(self, mbox):          """          Get the highest UID for a given mbox. +        It will be the highest between the highest uid in the message store for +        the mailbox, and the soledad integer cache.          """          uids = self.get_uids(mbox) -        return uids and max(uids) or 0 +        last_mem_uid = uids and max(uids) or 0 +        last_soledad_uid = self.get_last_soledad_uid(mbox) +        return max(last_mem_uid, last_soledad_uid) + +    def get_last_soledad_uid(self, mbox): +        """ +        Get last uid for a given mbox from the soledad integer cache. +        """ +        return self._last_uid.get(mbox, 0) + +    def set_last_soledad_uid(self, mbox, value): +        """ +        Set last uid for a given mbox in the soledad integer cache. +        SoledadMailbox should prime this value during initialization. +        Other methods (during message adding) SHOULD call +        `increment_last_soledad_uid` instead. +        """ +        leap_assert_type(value, int) +        print "setting last soledad uid for ", mbox, "to", value +        # if we already have a vlue here, don't do anything +        with self._last_uid_lock: +            if not self._last_uid.get(mbox, None): +                self._last_uid[mbox] = value + +    def increment_last_soledad_uid(self, mbox): +        """ +        Increment by one the soledad integer cache for the last_uid for +        this mbox, and fire a defer-to-thread to update the soledad value. +        The caller should lock the call tho this method. +        """ +        with self._last_uid_lock: +            self._last_uid[mbox] += 1 +            value = self._last_uid[mbox] +            self.write_last_uid(mbox, value) +            return value + +    @deferred +    def write_last_uid(self, mbox, value): +        """ +        Increment the soledad cache, +        """ +        leap_assert_type(value, int) +        if self._permanent_store: +            self._permanent_store.write_last_uid(mbox, value) + +    # Counting sheeps...      def count_new_mbox(self, mbox):          """ @@ -418,14 +510,12 @@ class MemoryStore(object):          docs_dict = self._chash_fdoc_store.get(chash, None)          fdoc = docs_dict.get(mbox, None) if docs_dict else None -        print "GETTING FDOC BY CHASH:", fdoc -          # a couple of special cases.          # 1. We might have a doc with empty content...          if empty(fdoc):              return None -        # ...Or the message could exist, but being flagged for deletion. +        # 2. ...Or the message could exist, but being flagged for deletion.          # We want to create a new one in this case.          # Hmmm what if the deletion is un-done?? We would end with a          # duplicate... @@ -456,15 +546,22 @@ class MemoryStore(object):                  for key in sorted(self._msg_store.keys())                  if key in self._new or key in self._dirty) +    def all_msg_dict_for_mbox(self, mbox): +        """ +        Return all the message dicts for a given mbox. +        """ +        return [self._msg_store[(mb, uid)] +                for mb, uid in self._msg_store if mb == mbox] +      def all_deleted_uid_iter(self, mbox):          """          Return generator that iterates through the UIDs for all messags          with deleted flag in a given mailbox.          """ -        all_deleted = ( -            msg['fdoc']['uid'] for msg in self._msg_store.values() +        all_deleted = [ +            msg['fdoc']['uid'] for msg in self.all_msg_dict_for_mbox(mbox)              if msg.get('fdoc', None) -            and fields.DELETED_FLAG in msg['fdoc']['flags']) +            and fields.DELETED_FLAG in msg['fdoc']['flags']]          return all_deleted      # new, dirty flags @@ -473,6 +570,7 @@ class MemoryStore(object):          """          Return `new` and `dirty` flags for a given message.          """ +        # XXX should return *first* the news, and *then* the dirty...          return map(lambda _set: key in _set, (self._new, self._dirty))      def set_new(self, key): @@ -485,7 +583,7 @@ class MemoryStore(object):          """          Remove the key value from the `new` set.          """ -        print "Unsetting NEW for: %s" % str(key) +        #print "Unsetting NEW for: %s" % str(key)          self._new.discard(key)          deferreds = self._new_deferreds          d = deferreds.get(key, None) @@ -505,7 +603,7 @@ class MemoryStore(object):          """          Remove the key value from the `dirty` set.          """ -        print "Unsetting DIRTY for: %s" % str(key) +        #print "Unsetting DIRTY for: %s" % str(key)          self._dirty.discard(key)          deferreds = self._dirty_deferreds          d = deferreds.get(key, None) @@ -522,6 +620,7 @@ class MemoryStore(object):          """          Set the `Recent` flag for a given mailbox and UID.          """ +        self._rflags_dirty.add(mbox)          self._rflags_store[mbox]['set'].add(uid)      # TODO --- nice but unused @@ -536,6 +635,7 @@ class MemoryStore(object):          Set the value for the set of the recent flags.          Used from the property in the MessageCollection.          """ +        self._rflags_dirty.add(mbox)          self._rflags_store[mbox]['set'] = set(value)      def load_recent_flags(self, mbox, flags_doc): @@ -568,23 +668,81 @@ class MemoryStore(object):          :rtype: generator          """ -        rflags_store = self._rflags_store -          # XXX use enums          DOC_ID = "doc_id"          SET = "set" -        print "LEN RFLAGS_STORE ------->", len(rflags_store) -        return ( -            RecentFlagsDoc( +        rflags_store = self._rflags_store + +        def get_rdoc(mbox, rdict): +            mbox_rflag_set = rdict[SET] +            recent_set = copy(mbox_rflag_set) +            # zero it! +            mbox_rflag_set.difference_update(mbox_rflag_set) +            return RecentFlagsDoc(                  doc_id=rflags_store[mbox][DOC_ID],                  content={                      fields.TYPE_KEY: fields.TYPE_RECENT_VAL,                      fields.MBOX_KEY: mbox, -                    fields.RECENTFLAGS_KEY: list( -                        rflags_store[mbox][SET]) +                    fields.RECENTFLAGS_KEY: list(recent_set)                  }) -            for mbox in rflags_store) + +        return (get_rdoc(mbox, rdict) for mbox, rdict in rflags_store.items() +                if not empty(rdict[SET])) + +    # Methods that mirror the IMailbox interface + +    def remove_all_deleted(self, mbox): +        """ +        Remove all messages flagged \\Deleted from this Memory Store only. +        Called from `expunge` +        """ +        mem_deleted = self.all_deleted_uid_iter(mbox) +        for uid in mem_deleted: +            self.remove_message(mbox, uid) +        return mem_deleted + +    def expunge(self, mbox): +        """ +        Remove all messages flagged \\Deleted, from the Memory Store +        and from the permanent store also. +        """ +        # TODO expunge should add itself as a callback to the ongoing +        # writes. +        soledad_store = self._permanent_store + +        try: +            # 1. Stop the writing call +            self._stop_write_loop() +            # 2. Enqueue a last write. +            #self.write_messages(soledad_store) +            # 3. Should wait on the writebacks to finish ??? +            # FIXME wait for this, and add all the rest of the method +            # as a callback!!! +        except Exception as exc: +            logger.exception(exc) + +        # Now, we...: + +        try: +            # 1. Delete all messages marked as deleted in soledad. + +            # XXX this could be deferred for faster operation. +            if soledad_store: +                sol_deleted = soledad_store.remove_all_deleted(mbox) +            else: +                sol_deleted = [] + +            # 2. Delete all messages marked as deleted in memory. +            mem_deleted = self.remove_all_deleted(mbox) + +            all_deleted = set(mem_deleted).union(set(sol_deleted)) +            print "deleted ", all_deleted +        except Exception as exc: +            logger.exception(exc) +        finally: +            self._start_write_loop() +        return all_deleted      # Dump-to-disk controls. diff --git a/mail/src/leap/mail/imap/messageparts.py b/mail/src/leap/mail/imap/messageparts.py index 257d3f0..6d8631a 100644 --- a/mail/src/leap/mail/imap/messageparts.py +++ b/mail/src/leap/mail/imap/messageparts.py @@ -32,7 +32,7 @@ from leap.common.decorators import memoized_method  from leap.common.mail import get_email_charset  from leap.mail.imap import interfaces  from leap.mail.imap.fields import fields -from leap.mail.utils import first +from leap.mail.utils import empty, first  MessagePartType = Enum("hdoc", "fdoc", "cdoc", "cdocs", "docs_id") @@ -134,6 +134,13 @@ class MessageWrapper(object):                  self._dict[self.HDOC] = ReferenciableDict(hdoc)              if cdocs is not None:                  self._dict[self.CDOCS] = ReferenciableDict(cdocs) + +        # This will keep references to the doc_ids to be able to put +        # messages to soledad. It will be populated during the walk() to avoid +        # the overhead of reading from the db. + +        # XXX it really *only* make sense for the FDOC, the other parts +        # should not be "dirty", just new...!!!          self._dict[self.DOCS_ID] = docs_id      # properties @@ -201,6 +208,7 @@ class MessageWrapper(object):          else:              logger.warning("NO FDOC!!!")              content_ref = {} +          return MessagePartDoc(new=self.new, dirty=self.dirty,                                store=self._storetype,                                part=MessagePartType.fdoc, @@ -214,7 +222,6 @@ class MessageWrapper(object):          if _hdoc:              content_ref = weakref.proxy(_hdoc)          else: -            logger.warning("NO HDOC!!!!")              content_ref = {}          return MessagePartDoc(new=self.new, dirty=self.dirty,                                store=self._storetype, @@ -234,14 +241,21 @@ class MessageWrapper(object):      def walk(self):          """          Generator that iterates through all the parts, returning -        MessagePartDoc. +        MessagePartDoc. Used for writing to SoledadStore.          """ -        if self.fdoc is not None: +        if self._dirty: +            mbox = self.fdoc.content[fields.MBOX_KEY] +            uid = self.fdoc.content[fields.UID_KEY] +            docid_dict = self._dict[self.DOCS_ID] +            docid_dict[self.FDOC] = self.memstore.get_docid_for_fdoc( +                mbox, uid) + +        if not empty(self.fdoc.content):              yield self.fdoc -        if self.hdoc is not None: +        if not empty(self.hdoc.content):              yield self.hdoc          for cdoc in self.cdocs.values(): -            if cdoc is not None: +            if not empty(cdoc):                  content_ref = weakref.proxy(cdoc)                  yield MessagePartDoc(new=self.new, dirty=self.dirty,                                       store=self._storetype, diff --git a/mail/src/leap/mail/imap/messages.py b/mail/src/leap/mail/imap/messages.py index 5de638b..35c07f5 100644 --- a/mail/src/leap/mail/imap/messages.py +++ b/mail/src/leap/mail/imap/messages.py @@ -202,21 +202,21 @@ class LeapMessage(fields, MailParser, MBoxParser):          :return: The flags, represented as strings          :rtype: tuple          """ -        if self._uid is None: -            return [] +        #if self._uid is None: +            #return []          uid = self._uid -        flags = [] +        flags = set([])          fdoc = self._fdoc          if fdoc: -            flags = fdoc.content.get(self.FLAGS_KEY, None) +            flags = set(fdoc.content.get(self.FLAGS_KEY, None))          msgcol = self._collection          # We treat the recent flag specially: gotten from          # a mailbox-level document.          if msgcol and uid in msgcol.recent_flags: -            flags.append(fields.RECENT_FLAG) +            flags.add(fields.RECENT_FLAG)          if flags:              flags = map(str, flags)          return tuple(flags) @@ -236,7 +236,7 @@ class LeapMessage(fields, MailParser, MBoxParser):          :return: a SoledadDocument instance          :rtype: SoledadDocument          """ -        # XXX use memory store ...! +        # XXX Move logic to memory store ...          leap_assert(isinstance(flags, tuple), "flags need to be a tuple")          log.msg('setting flags: %s (%s)' % (self._uid, flags)) @@ -252,6 +252,7 @@ class LeapMessage(fields, MailParser, MBoxParser):          doc.content[self.DEL_KEY] = self.DELETED_FLAG in flags          if self._collection.memstore is not None: +            print "putting message in collection"              self._collection.memstore.put_message(                  self._mbox, self._uid,                  MessageWrapper(fdoc=doc.content, new=False, dirty=True, @@ -508,6 +509,8 @@ class LeapMessage(fields, MailParser, MBoxParser):          pmap = hdoc_content.get(fields.PARTS_MAP_KEY, {})          return pmap[str(part)] +    # XXX moved to memory store +    # move the rest too. ------------------------------------------      def _get_flags_doc(self):          """          Return the document that keeps the flags for this @@ -617,57 +620,38 @@ class LeapMessage(fields, MailParser, MBoxParser):      # destructor -    @deferred -    def remove(self): -        """ -        Remove all docs associated with this message. -        Currently it removes only the flags doc. -        """ -        # XXX For the moment we are only removing the flags and headers -        # docs. The rest we leave there polluting your hard disk, -        # until we think about a good way of deorphaning. -        # Maybe a crawler of unreferenced docs. - -        # XXX implement elijah's idea of using a PUT document as a -        # token to ensure consistency in the removal. - -        uid = self._uid - -        fd = self._get_flags_doc() -        #hd = self._get_headers_doc() -        #bd = self._get_body_doc() -        #docs = [fd, hd, bd] - -        try: -            memstore = self._collection.memstore -        except AttributeError: -            memstore = False - -        if memstore and hasattr(fd, "store", None) == "mem": -            key = self._mbox, self._uid -            if fd.new: -                # it's a new document, so we can remove it and it will not -                # be writen. Watch out! We need to be sure it has not been -                # just queued to write! -                memstore.remove_message(*key) - -            if fd.dirty: -                doc_id = fd.doc_id -                doc = self._soledad.get_doc(doc_id) -                try: -                    self._soledad.delete_doc(doc) -                except Exception as exc: -                    logger.exception(exc) - -        else: +    # XXX this logic moved to remove_message in memory store... +    #@deferred +    #def remove(self): +        #""" +        #Remove all docs associated with this message. +        #Currently it removes only the flags doc. +        #""" +        #fd = self._get_flags_doc() +# +        #if fd.new: +            # it's a new document, so we can remove it and it will not +            # be writen. Watch out! We need to be sure it has not been +            # just queued to write! +            #memstore.remove_message(*key) +# +        #if fd.dirty: +            #doc_id = fd.doc_id +            #doc = self._soledad.get_doc(doc_id) +            #try: +                #self._soledad.delete_doc(doc) +            #except Exception as exc: +                #logger.exception(exc) +# +        #else:              # we just got a soledad_doc -            try: -                doc_id = fd.doc_id -                latest_doc = self._soledad.get_doc(doc_id) -                self._soledad.delete_doc(latest_doc) -            except Exception as exc: -                logger.exception(exc) -        return uid +            #try: +                #doc_id = fd.doc_id +                #latest_doc = self._soledad.get_doc(doc_id) +                #self._soledad.delete_doc(latest_doc) +            #except Exception as exc: +                #logger.exception(exc) +        #return uid      def does_exist(self):          """ @@ -826,7 +810,9 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser):          # ensure that we have a recent-flags and a hdocs-sec doc          self._get_or_create_rdoc() -        self._get_or_create_hdocset() + +        # Not for now... +        #self._get_or_create_hdocset()      def _get_empty_doc(self, _type=FLAGS_DOC):          """ @@ -959,7 +945,9 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser):      # not deferring to thread cause this now uses deferred asa retval      #@deferred -    def add_msg(self, raw, subject=None, flags=None, date=None, uid=1): +    #@profile +    def add_msg(self, raw, subject=None, flags=None, date=None, uid=None, +                notify_on_disk=False):          """          Creates a new message document.          Here lives the magic of the leap mail. Well, in soledad, really. @@ -994,7 +982,11 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser):          # parse          msg, chash, size, multi = self._do_parse(raw) -        # check for uniqueness. +        # check for uniqueness -------------------------------- +        # XXX profiler says that this test is costly. +        # So we probably should just do an in-memory check and +        # move the complete check to the soledad writer? +        # Watch out! We're reserving a UID right after this!          if self._fdoc_already_exists(chash):              print ">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>"              logger.warning("We already have that message in this mailbox.") @@ -1003,6 +995,9 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser):              # so not touch it by the moment.              return defer.succeed('already_exists') +        uid = self.memstore.increment_last_soledad_uid(self.mbox) +        print "ADDING MSG WITH UID: %s" % uid +          fd = self._populate_flags(flags, uid, chash, size, multi)          hd = self._populate_headr(msg, chash, subject, date) @@ -1039,36 +1034,22 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser):          # XXX Should allow also to dump to disk directly,          # for no-memstore cases. -        # we return a deferred that, by default, will be triggered when -        # saved to disk -        d = self.memstore.create_message(self.mbox, uid, msg_container) -        print "defered-add", d +        # we return a deferred that by default will be triggered +        # inmediately. +        d = self.memstore.create_message(self.mbox, uid, msg_container, +                                         notify_on_disk=notify_on_disk)          print "adding message", d          return d -    def _remove_cb(self, result): -        return result - -    def remove_all_deleted(self): -        """ -        Removes all messages flagged as deleted. -        """ -        delete_deferl = [] -        for msg in self.get_deleted(): -            delete_deferl.append(msg.remove()) -        d1 = defer.gatherResults(delete_deferl, consumeErrors=True) -        d1.addCallback(self._remove_cb) -        return d1 - -    def remove(self, msg): -        """ -        Remove a given msg. -        :param msg: the message to be removed -        :type msg: LeapMessage -        """ -        d = msg.remove() -        d.addCallback(self._remove_cb) -        return d +    #def remove(self, msg): +        #""" +        #Remove a given msg. +        #:param msg: the message to be removed +        #:type msg: LeapMessage +        #""" +        #d = msg.remove() +        #d.addCallback(self._remove_cb) +        #return d      #      # getters: specific queries @@ -1175,76 +1156,76 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser):      # XXX FIXME -------------------------------------      # This should be rewritten to use memory store. -    def _get_hdocset(self): -        """ -        An accessor for the hdocs-set for this mailbox. -        """ -        if not self.__hdocset: -            with self._hdocset_lock: -                hdocset_doc = self._get_hdocset_doc() -                value = set(hdocset_doc.content.get( -                    fields.HDOCS_SET_KEY, [])) -                self.__hdocset = value -        return self.__hdocset - -    def _set_hdocset(self, value): -        """ -        Setter for the hdocs-set for this mailbox. -        """ -        with self._hdocset_lock: -            hdocset_doc = self._get_hdocset_doc() -            newv = set(value) -            self.__hdocset = newv -            hdocset_doc.content[fields.HDOCS_SET_KEY] = list(newv) +    #def _get_hdocset(self): +        #""" +        #An accessor for the hdocs-set for this mailbox. +        #""" +        #if not self.__hdocset: +            #with self._hdocset_lock: +                #hdocset_doc = self._get_hdocset_doc() +                #value = set(hdocset_doc.content.get( +                    #fields.HDOCS_SET_KEY, [])) +                #self.__hdocset = value +        #return self.__hdocset +# +    #def _set_hdocset(self, value): +        #""" +        #Setter for the hdocs-set for this mailbox. +        #""" +        #with self._hdocset_lock: +            #hdocset_doc = self._get_hdocset_doc() +            #newv = set(value) +            #self.__hdocset = newv +            #hdocset_doc.content[fields.HDOCS_SET_KEY] = list(newv)              # XXX should deferLater 0 it? -            self._soledad.put_doc(hdocset_doc) - -    _hdocset = property( -        _get_hdocset, _set_hdocset, -        doc="Set of Document-IDs for the headers docs associated " -            "with this mailbox.") - -    def _get_hdocset_doc(self): -        """ -        Get hdocs-set document for this mailbox. -        """ -        curried = partial( -            self._soledad.get_from_index, -            fields.TYPE_MBOX_IDX, -            fields.TYPE_HDOCS_SET_VAL, self.mbox) -        curried.expected = "hdocset" -        hdocset_doc = try_unique_query(curried) -        return hdocset_doc - +            #self._soledad.put_doc(hdocset_doc) +# +    #_hdocset = property( +        #_get_hdocset, _set_hdocset, +        #doc="Set of Document-IDs for the headers docs associated " +            #"with this mailbox.") +# +    #def _get_hdocset_doc(self): +        #""" +        #Get hdocs-set document for this mailbox. +        #""" +        #curried = partial( +            #self._soledad.get_from_index, +            #fields.TYPE_MBOX_IDX, +            #fields.TYPE_HDOCS_SET_VAL, self.mbox) +        #curried.expected = "hdocset" +        #hdocset_doc = try_unique_query(curried) +        #return hdocset_doc +#      # Property-set modification (protected by a different      # lock to give atomicity to the read/write operation) - -    def remove_hdocset_docids(self, docids): -        """ -        Remove the given document IDs from the set of -        header-documents associated with this mailbox. -        """ -        with self._hdocset_property_lock: -            self._hdocset = self._hdocset.difference( -                set(docids)) - -    def remove_hdocset_docid(self, docid): -        """ -        Remove the given document ID from the set of -        header-documents associated with this mailbox. -        """ -        with self._hdocset_property_lock: -            self._hdocset = self._hdocset.difference( -                set([docid])) - -    def add_hdocset_docid(self, docid): -        """ -        Add the given document ID to the set of -        header-documents associated with this mailbox. -        """ -        with self._hdocset_property_lock: -            self._hdocset = self._hdocset.union( -                set([docid])) +# +    #def remove_hdocset_docids(self, docids): +        #""" +        #Remove the given document IDs from the set of +        #header-documents associated with this mailbox. +        #""" +        #with self._hdocset_property_lock: +            #self._hdocset = self._hdocset.difference( +                #set(docids)) +# +    #def remove_hdocset_docid(self, docid): +        #""" +        #Remove the given document ID from the set of +        #header-documents associated with this mailbox. +        #""" +        #with self._hdocset_property_lock: +            #self._hdocset = self._hdocset.difference( +                #set([docid])) +# +    #def add_hdocset_docid(self, docid): +        #""" +        #Add the given document ID to the set of +        #header-documents associated with this mailbox. +        #""" +        #with self._hdocset_property_lock: +            #self._hdocset = self._hdocset.union( +                #set([docid]))      # individual doc getters, message layer. @@ -1378,18 +1359,20 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser):          return (u for u in sorted(uids)) -    def reset_last_uid(self, param): -        """ -        Set the last uid to the highest uid found. -        Used while expunging, passed as a callback. -        """ -        try: -            self.last_uid = max(self.all_uid_iter()) + 1 -        except ValueError: +    # XXX Should be moved to memstore +    #def reset_last_uid(self, param): +        #""" +        #Set the last uid to the highest uid found. +        #Used while expunging, passed as a callback. +        #""" +        #try: +            #self.last_uid = max(self.all_uid_iter()) + 1 +        #except ValueError:              # empty sequence -            pass -        return param +            #pass +        #return param +    # XXX MOVE to memstore      def all_flags(self):          """          Return a dict with all flags documents for this mailbox. @@ -1444,7 +1427,7 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser):          :rtype: int          """ -        # XXX We could cache this in memstore too until next write... +        # XXX We should cache this in memstore too until next write...          count = self._soledad.get_count_from_index(              fields.TYPE_MBOX_IDX,              fields.TYPE_FLAGS_VAL, self.mbox) @@ -1491,6 +1474,7 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser):      # recent messages +    # XXX take it from memstore      def count_recent(self):          """          Count all messages with the `Recent` flag. @@ -1503,30 +1487,6 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser):          """          return len(self.recent_flags) -    # deleted messages - -    def deleted_iter(self): -        """ -        Get an iterator for the message UIDs with `deleted` flag. - -        :return: iterator through deleted message docs -        :rtype: iterable -        """ -        return (doc.content[self.UID_KEY] for doc in -                self._soledad.get_from_index( -                    fields.TYPE_MBOX_DEL_IDX, -                    fields.TYPE_FLAGS_VAL, self.mbox, '1')) - -    def get_deleted(self): -        """ -        Get all messages with the `Deleted` flag. - -        :returns: a generator of LeapMessages -        :rtype: generator -        """ -        return (LeapMessage(self._soledad, docid, self.mbox) -                for docid in self.deleted_iter()) -      def __len__(self):          """          Returns the number of messages on this mailbox. diff --git a/mail/src/leap/mail/imap/server.py b/mail/src/leap/mail/imap/server.py index c95a9be..3a6ac9a 100644 --- a/mail/src/leap/mail/imap/server.py +++ b/mail/src/leap/mail/imap/server.py @@ -199,6 +199,7 @@ class LeapIMAPServer(imap4.IMAP4Server):          # XXX  fake a delayed operation, to debug problem with messages getting          # back to the source mailbox... +        print "faking checkpoint..."          import time          time.sleep(2)          return None diff --git a/mail/src/leap/mail/imap/soledadstore.py b/mail/src/leap/mail/imap/soledadstore.py index ea5b36e..60576a3 100644 --- a/mail/src/leap/mail/imap/soledadstore.py +++ b/mail/src/leap/mail/imap/soledadstore.py @@ -18,18 +18,22 @@  A MessageStore that writes to Soledad.  """  import logging +import threading  from itertools import chain +#from twisted.internet import defer  from u1db import errors as u1db_errors  from zope.interface import implements +from leap.common.check import leap_assert_type  from leap.mail.imap.messageparts import MessagePartType  from leap.mail.imap.messageparts import MessageWrapper  from leap.mail.imap.messageparts import RecentFlagsDoc  from leap.mail.imap.fields import fields  from leap.mail.imap.interfaces import IMessageStore  from leap.mail.messageflow import IMessageConsumer +from leap.mail.utils import first  logger = logging.getLogger(__name__) @@ -123,6 +127,7 @@ class SoledadStore(ContentDedup):      """      This will create docs in the local Soledad database.      """ +    _last_uid_lock = threading.Lock()      implements(IMessageConsumer, IMessageStore) @@ -177,6 +182,7 @@ class SoledadStore(ContentDedup):      # IMessageConsumer +    #@profile      def consume(self, queue):          """          Creates a new document in soledad db. @@ -220,6 +226,7 @@ class SoledadStore(ContentDedup):                  if isinstance(doc_wrapper, MessageWrapper):                      # If everything went well, we can unset the new flag                      # in the source store (memory store) +                    print "unsetting new flag!"                      doc_wrapper.new = False                      doc_wrapper.dirty = False              empty = queue.empty() @@ -243,13 +250,11 @@ class SoledadStore(ContentDedup):              return chain((doc_wrapper,),                           self._get_calls_for_msg_parts(doc_wrapper))          elif isinstance(doc_wrapper, RecentFlagsDoc): -            print "getting calls for rflags"              return chain((doc_wrapper,),                           self._get_calls_for_rflags_doc(doc_wrapper))          else:              print "********************"              print "CANNOT PROCESS ITEM!" -            print "item --------------------->", doc_wrapper              return (i for i in [])      def _try_call(self, call, item): @@ -275,6 +280,7 @@ class SoledadStore(ContentDedup):          if msg_wrapper.new is True:              call = self._soledad.create_doc +            print "NEW DOC ----------------------"              # item is expected to be a MessagePartDoc              for item in msg_wrapper.walk(): @@ -301,30 +307,22 @@ class SoledadStore(ContentDedup):          # the flags doc.          elif msg_wrapper.dirty is True: -            print "DIRTY DOC! ----------------------"              call = self._soledad.put_doc -              # item is expected to be a MessagePartDoc              for item in msg_wrapper.walk(): +                # XXX FIXME Give error if dirty and not doc_id !!!                  doc_id = item.doc_id  # defend! +                if not doc_id: +                    continue                  doc = self._soledad.get_doc(doc_id) -                doc.content = item.content - +                doc.content = dict(item.content)                  if item.part == MessagePartType.fdoc: -                    print "Will PUT the doc: ", doc -                    yield dict(doc), call - -                # XXX also for linkage-doc - -        # TODO should write back to the queue -        # with the results of the operation. -        # We can write there: -        # (*) MsgWriteACK  --> Should remove from incoming queue. -        #                      (We should do this here). -        # Implement using callbacks for each operation. +                    logger.debug("PUT dirty fdoc") +                    yield doc, call +                # XXX also for linkage-doc !!!          else: -            logger.error("Cannot delete documents yet!") +            logger.error("Cannot delete documents yet from the queue...!")      def _get_calls_for_rflags_doc(self, rflags_wrapper):          """ @@ -334,18 +332,91 @@ class SoledadStore(ContentDedup):          rdoc = self._soledad.get_doc(rflags_wrapper.doc_id)          payload = rflags_wrapper.content -        print "rdoc", rdoc -        print "SAVING RFLAGS TO SOLEDAD..." -        import pprint; pprint.pprint(payload) +        logger.debug("Saving RFLAGS to Soledad...")          if payload:              rdoc.content = payload -            print -            print "YIELDING -----", rdoc -            print "AND ----------", call              yield rdoc, call -        else: -            print ">>>>>>>>>>>>>>>>>" -            print ">>>>>>>>>>>>>>>>>" -            print ">>>>>>>>>>>>>>>>>" -            print "No payload" + +    def _get_mbox_document(self, mbox): +        """ +        Return mailbox document. + +        :return: A SoledadDocument containing this mailbox, or None if +                 the query failed. +        :rtype: SoledadDocument or None. +        """ +        try: +            query = self._soledad.get_from_index( +                fields.TYPE_MBOX_IDX, +                fields.TYPE_MBOX_VAL, mbox) +            if query: +                return query.pop() +        except Exception as exc: +            logger.exception("Unhandled error %r" % exc) + +    def get_flags_doc(self, mbox, uid): +        """ +        Return the SoledadDocument for the given mbox and uid. +        """ +        try: +            flag_docs = self._soledad.get_from_index( +                fields.TYPE_MBOX_UID_IDX, +                fields.TYPE_FLAGS_VAL, mbox, str(uid)) +            result = first(flag_docs) +        except Exception as exc: +            # ugh! Something's broken down there! +            logger.warning("ERROR while getting flags for UID: %s" % uid) +            logger.exception(exc) +        finally: +            return result + +    def write_last_uid(self, mbox, value): +        """ +        Write the `last_uid` integer to the proper mailbox document +        in Soledad. +        This is called from the deferred triggered by +        memorystore.increment_last_soledad_uid, which is expected to +        run in a separate thread. +        """ +        leap_assert_type(value, int) +        key = fields.LAST_UID_KEY + +        with self._last_uid_lock: +            mbox_doc = self._get_mbox_document(mbox) +            old_val = mbox_doc.content[key] +            if value < old_val: +                logger.error("%s:%s Tried to write a UID lesser than what's " +                             "stored!" % (mbox, value)) +            mbox_doc.content[key] = value +            self._soledad.put_doc(mbox_doc) + +    # deleted messages + +    def deleted_iter(self, mbox): +        """ +        Get an iterator for the SoledadDocuments for messages +        with \\Deleted flag for a given mailbox. + +        :return: iterator through deleted message docs +        :rtype: iterable +        """ +        return (doc for doc in self._soledad.get_from_index( +                fields.TYPE_MBOX_DEL_IDX, +                fields.TYPE_FLAGS_VAL, mbox, '1')) + +    # TODO can deferToThread this? +    def remove_all_deleted(self, mbox): +        """ +        Remove from Soledad all messages flagged as deleted for a given +        mailbox. +        """ +        print "DELETING ALL DOCS FOR -------", mbox +        deleted = [] +        for doc in self.deleted_iter(mbox): +            deleted.append(doc.content[fields.UID_KEY]) +            print +            print ">>>>>>>>>>>>>>>>>>>>" +            print "deleting doc: ", doc.doc_id, doc.content +            self._soledad.delete_doc(doc) +        return deleted diff --git a/mail/src/leap/mail/imap/tests/leap_tests_imap.zsh b/mail/src/leap/mail/imap/tests/leap_tests_imap.zsh index 676d1a8..8f0df9f 100755 --- a/mail/src/leap/mail/imap/tests/leap_tests_imap.zsh +++ b/mail/src/leap/mail/imap/tests/leap_tests_imap.zsh @@ -61,7 +61,8 @@ IMAPTEST="imaptest"  # These should be kept constant across benchmarking  # runs across different machines, for comparability. -DURATION=200 +#DURATION=200 +DURATION=60  NUM_MSG=200 @@ -76,7 +77,7 @@ imaptest_cmd() {  }  stress_imap() 	{ -  mknod imap_pipe p +  mkfifo imap_pipe    cat imap_pipe | tee output &    imaptest_cmd >> imap_pipe  } @@ -99,7 +100,7 @@ print_results() {  	echo "----------------------"  	echo "\tavg\tstdev"  	$GREP "avg" ./output | sed -e 's/^ *//g' -e 's/ *$//g' | \ -	awk ' +	gawk '  function avg(data, count) {      sum=0;      for( x=0; x <= count-1; x++) { diff --git a/mail/src/leap/mail/utils.py b/mail/src/leap/mail/utils.py index bae2898..1f43947 100644 --- a/mail/src/leap/mail/utils.py +++ b/mail/src/leap/mail/utils.py @@ -42,7 +42,10 @@ def empty(thing):      """      if thing is None:          return True -    return len(thing) == 0 +    try: +        return len(thing) == 0 +    except ReferenceError: +        return True  def maybe_call(thing): | 
