| field | value | date |
|---|---|---|
| author | Kali Kaneko <kali@leap.se> | 2014-01-27 16:11:53 -0400 |
| committer | Kali Kaneko <kali@leap.se> | 2014-01-28 19:38:46 -0400 |
| commit | f5365ae0c2edb8b3e879f876f2f7e42b25f4616a (patch) | |
| tree | 6f9787f65e89d720739b99d7feefc30138ea890f /src/leap/mail/imap/memorystore.py | |
| parent | a5508429b90e2e9b58c5d073610ee5a10274663f (diff) | |
handle last_uid property in memory store
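The core of the change is a per-mailbox last-uid counter kept in the memory store: it is primed once from Soledad, incremented under a lock each time a message is added, and the new value is written back to the permanent store asynchronously. Below is a minimal, self-contained sketch of that pattern, not the leap.mail code itself; `LastUidCache`, its method names and the `writeback` callable are illustrative stand-ins for `MemoryStore.set_last_soledad_uid` / `increment_last_soledad_uid` / `get_last_uid` and `SoledadStore.write_last_uid` in the diff below.

```python
import threading


class LastUidCache(object):
    """Simplified mirror of the last-uid handling added to MemoryStore."""

    def __init__(self, writeback=None):
        self._last_uid = {}          # e.g. {'mbox-a': 42, 'mbox-b': 23}
        self._lock = threading.Lock()
        self._writeback = writeback  # stand-in for SoledadStore.write_last_uid

    def prime(self, mbox, value):
        # Done once per mailbox (SoledadMailbox primes it on initialization);
        # an already-primed value is never overwritten.
        with self._lock:
            if not self._last_uid.get(mbox, None):
                self._last_uid[mbox] = value

    def increment(self, mbox):
        # Bump the counter under the lock and push the new value to the
        # permanent store; the real code defers that write to a thread.
        with self._lock:
            self._last_uid[mbox] += 1
            value = self._last_uid[mbox]
        if self._writeback is not None:
            self._writeback(mbox, value)
        return value

    def get(self, mbox):
        return self._last_uid.get(mbox, 0)


written = {}
cache = LastUidCache(writeback=lambda mbox, value: written.update({mbox: value}))
cache.prime('INBOX', 41)
assert cache.increment('INBOX') == 42
assert cache.get('INBOX') == 42 and written['INBOX'] == 42
```

In the actual diff, `get_last_uid` additionally takes the maximum of this cache and the highest UID currently held in the in-memory message store.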
Diffstat (limited to 'src/leap/mail/imap/memorystore.py')
| mode | path | lines |
|---|---|---|
| -rw-r--r-- | src/leap/mail/imap/memorystore.py | 236 |

1 file changed, 197 insertions, 39 deletions
diff --git a/src/leap/mail/imap/memorystore.py b/src/leap/mail/imap/memorystore.py
index 232a2fb..60e98c7 100644
--- a/src/leap/mail/imap/memorystore.py
+++ b/src/leap/mail/imap/memorystore.py
@@ -19,16 +19,20 @@ In-memory transient store for a LEAPIMAPServer.
 """
 import contextlib
 import logging
+import threading
 import weakref
 
 from collections import defaultdict
+from copy import copy
 
 from twisted.internet import defer
 from twisted.internet.task import LoopingCall
 from twisted.python import log
 from zope.interface import implements
 
+from leap.common.check import leap_assert_type
 from leap.mail import size
+from leap.mail.decorators import deferred
 from leap.mail.utils import empty
 from leap.mail.messageflow import MessageProducer
 from leap.mail.imap import interfaces
@@ -40,7 +44,10 @@ from leap.mail.imap.messageparts import ReferenciableDict
 
 logger = logging.getLogger(__name__)
 
-SOLEDAD_WRITE_PERIOD = 20
+
+# The default period to do writebacks to the permanent
+# soledad storage, in seconds.
+SOLEDAD_WRITE_PERIOD = 10
 
 
 @contextlib.contextmanager
@@ -76,16 +83,11 @@ class MemoryStore(object):
     implements(interfaces.IMessageStore,
                interfaces.IMessageStoreWriter)
 
-    producer = None
-
     # TODO We will want to index by chash when we transition to local-only
     # UIDs.
-    # TODO should store RECENT-FLAGS too
-    # TODO should store HDOCSET too (use weakrefs!) -- will need to subclass
-    # TODO do use dirty flag (maybe use namedtuples for that) so we can use it
-    # also as a read-cache.
 
     WRITING_FLAG = "_writing"
+    _last_uid_lock = threading.Lock()
 
     def __init__(self, permanent_store=None,
                  write_period=SOLEDAD_WRITE_PERIOD):
@@ -138,17 +140,20 @@ class MemoryStore(object):
         self._rflags_store = defaultdict(
             lambda: {'doc_id': None, 'set': set([])})
 
-        # TODO ----------------- implement mailbox-level flags store too?
-        # XXX maybe we don't need this anymore...
-        # let's see how good does it prefetch the headers if
-        # we cache them in the store.
-        self._hdocset_store = {}
-        # --------------------------------------------------------------
+        """
+        last-uid store keeps the count of the highest UID
+        per mailbox.
+
+        {'mbox-a': 42,
+         'mbox-b': 23}
+        """
+        self._last_uid = {}
 
         # New and dirty flags, to set MessageWrapper State.
         self._new = set([])
         self._new_deferreds = {}
         self._dirty = set([])
+        self._rflags_dirty = set([])
         self._dirty_deferreds = {}
 
         # Flag for signaling we're busy writing to the disk storage.
@@ -210,14 +215,25 @@ class MemoryStore(object):
         print "adding new doc to memstore %s (%s)" % (mbox, uid)
         key = mbox, uid
 
+        self._add_message(mbox, uid, message, notify_on_disk)
+
         d = defer.Deferred()
         d.addCallback(lambda result: log.msg("message save: %s" % result))
-
         self._new.add(key)
+
+        # We store this deferred so we can keep track of the pending
+        # operations internally.
         self._new_deferreds[key] = d
-        self._add_message(mbox, uid, message, notify_on_disk)
-        print "create message: ", d
-        return d
+
+        if notify_on_disk:
+            # Caller wants to be notified when the message is on disk
+            # so we pass the deferred that will be fired when the message
+            # has been written.
+            return d
+        else:
+            # Caller does not care, just fire and forget, so we pass
+            # a deferred that will immediately have its callback triggered.
+            return defer.succeed('fire-and-forget:%s' % str(key))
 
     def put_message(self, mbox, uid, message, notify_on_disk=True):
         """
@@ -238,13 +254,14 @@
         :rtype: Deferred
         """
         key = mbox, uid
-
         d = defer.Deferred()
-        d.addCallback(lambda result: log.msg("message save: %s" % result))
+        d.addCallback(lambda result: log.msg("message PUT save: %s" % result))
 
         self._dirty.add(key)
         self._dirty_deferreds[key] = d
         self._add_message(mbox, uid, message, notify_on_disk)
+        #print "dirty ", self._dirty
+        #print "new ", self._new
         return d
 
     def _add_message(self, mbox, uid, message, notify_on_disk=True):
@@ -315,6 +332,19 @@
                     store.pop(key)
 
         prune((FDOC, HDOC, CDOCS, DOCS_ID), store)
+        #print "after adding: "
+        #import pprint; pprint.pprint(self._msg_store[key])
+
+    def get_docid_for_fdoc(self, mbox, uid):
+        """
+        Get Soledad document id for the flags-doc for a given mbox and uid.
+        """
+        fdoc = self._permanent_store.get_flags_doc(mbox, uid)
+        if not fdoc:
+            return None
+        doc_id = fdoc.doc_id
+        return doc_id
+
     def get_message(self, mbox, uid):
         """
         Get a MessageWrapper for the given mbox and uid combination.
@@ -326,6 +356,8 @@
         if msg_dict:
             new, dirty = self._get_new_dirty_state(key)
             return MessageWrapper(from_dict=msg_dict,
+                                  new=new,
+                                  dirty=dirty,
                                   memstore=weakref.proxy(self))
         else:
             return None
@@ -334,6 +366,13 @@
         """
         Remove a Message from this MemoryStore.
         """
+        # XXX For the moment we are only removing the flags and headers
+        # docs. The rest we leave there polluting your hard disk,
+        # until we think about a good way of deorphaning.
+
+        # XXX implement elijah's idea of using a PUT document as a
+        # token to ensure consistency in the removal.
+
         try:
             key = mbox, uid
             self._new.discard(key)
@@ -348,18 +387,22 @@
         """
         Write the message documents in this MemoryStore to a different store.
         """
-        # For now, we pass if the queue is not empty, to avoid duplication.
+        # For now, we pass if the queue is not empty, to avoid duplicate
+        # queuing.
         # We would better use a flag to know when we've already enqueued an
         # item.
+
+        # XXX this could return the deferred for all the enqueued operations
+
         if not self.producer.is_queue_empty():
             return
 
         print "Writing messages to Soledad..."
         with set_bool_flag(self, self.WRITING_FLAG):
-            for msg_wrapper in self.all_new_dirty_msg_iter():
-                self.producer.push(msg_wrapper)
             for rflags_doc_wrapper in self.all_rdocs_iter():
                 self.producer.push(rflags_doc_wrapper)
+            for msg_wrapper in self.all_new_dirty_msg_iter():
+                self.producer.push(msg_wrapper)
 
     # MemoryStore specific methods.
 
@@ -370,12 +413,61 @@
         all_keys = self._msg_store.keys()
         return [uid for m, uid in all_keys if m == mbox]
 
+    # last_uid
+
     def get_last_uid(self, mbox):
         """
         Get the highest UID for a given mbox.
+        It will be the highest between the highest uid in the message store for
+        the mailbox, and the soledad integer cache.
         """
         uids = self.get_uids(mbox)
-        return uids and max(uids) or 0
+        last_mem_uid = uids and max(uids) or 0
+        last_soledad_uid = self.get_last_soledad_uid(mbox)
+        return max(last_mem_uid, last_soledad_uid)
+
+    def get_last_soledad_uid(self, mbox):
+        """
+        Get last uid for a given mbox from the soledad integer cache.
+        """
+        return self._last_uid.get(mbox, 0)
+
+    def set_last_soledad_uid(self, mbox, value):
+        """
+        Set last uid for a given mbox in the soledad integer cache.
+        SoledadMailbox should prime this value during initialization.
+        Other methods (during message adding) SHOULD call
+        `increment_last_soledad_uid` instead.
+        """
+        leap_assert_type(value, int)
+        print "setting last soledad uid for ", mbox, "to", value
+        # if we already have a value here, don't do anything
+        with self._last_uid_lock:
+            if not self._last_uid.get(mbox, None):
+                self._last_uid[mbox] = value
+
+    def increment_last_soledad_uid(self, mbox):
+        """
+        Increment by one the soledad integer cache for the last_uid for
+        this mbox, and fire a defer-to-thread to update the soledad value.
+        The caller should lock the call to this method.
+        """
+        with self._last_uid_lock:
+            self._last_uid[mbox] += 1
+            value = self._last_uid[mbox]
+            self.write_last_uid(mbox, value)
+            return value
+
+    @deferred
+    def write_last_uid(self, mbox, value):
+        """
+        Write the last uid for this mbox to the permanent soledad store.
+        """
+        leap_assert_type(value, int)
+        if self._permanent_store:
+            self._permanent_store.write_last_uid(mbox, value)
+
+    # Counting sheep...
 
     def count_new_mbox(self, mbox):
         """
@@ -418,14 +510,12 @@
         docs_dict = self._chash_fdoc_store.get(chash, None)
         fdoc = docs_dict.get(mbox, None) if docs_dict else None
 
-        print "GETTING FDOC BY CHASH:", fdoc
-
         # a couple of special cases.
         # 1. We might have a doc with empty content...
         if empty(fdoc):
             return None
 
-        # ...Or the message could exist, but being flagged for deletion.
+        # 2. ...Or the message could exist, but being flagged for deletion.
         # We want to create a new one in this case.
         # Hmmm what if the deletion is un-done?? We would end with a
         # duplicate...
@@ -456,15 +546,22 @@
                 for key in sorted(self._msg_store.keys())
                 if key in self._new or key in self._dirty)
 
+    def all_msg_dict_for_mbox(self, mbox):
+        """
+        Return all the message dicts for a given mbox.
+        """
+        return [self._msg_store[(mb, uid)]
+                for mb, uid in self._msg_store if mb == mbox]
+
     def all_deleted_uid_iter(self, mbox):
        """
         Return generator that iterates through the UIDs for all messags
         with deleted flag in a given mailbox.
""" -        all_deleted = ( -            msg['fdoc']['uid'] for msg in self._msg_store.values() +        all_deleted = [ +            msg['fdoc']['uid'] for msg in self.all_msg_dict_for_mbox(mbox)              if msg.get('fdoc', None) -            and fields.DELETED_FLAG in msg['fdoc']['flags']) +            and fields.DELETED_FLAG in msg['fdoc']['flags']]          return all_deleted      # new, dirty flags @@ -473,6 +570,7 @@ class MemoryStore(object):          """          Return `new` and `dirty` flags for a given message.          """ +        # XXX should return *first* the news, and *then* the dirty...          return map(lambda _set: key in _set, (self._new, self._dirty))      def set_new(self, key): @@ -485,7 +583,7 @@ class MemoryStore(object):          """          Remove the key value from the `new` set.          """ -        print "Unsetting NEW for: %s" % str(key) +        #print "Unsetting NEW for: %s" % str(key)          self._new.discard(key)          deferreds = self._new_deferreds          d = deferreds.get(key, None) @@ -505,7 +603,7 @@ class MemoryStore(object):          """          Remove the key value from the `dirty` set.          """ -        print "Unsetting DIRTY for: %s" % str(key) +        #print "Unsetting DIRTY for: %s" % str(key)          self._dirty.discard(key)          deferreds = self._dirty_deferreds          d = deferreds.get(key, None) @@ -522,6 +620,7 @@ class MemoryStore(object):          """          Set the `Recent` flag for a given mailbox and UID.          """ +        self._rflags_dirty.add(mbox)          self._rflags_store[mbox]['set'].add(uid)      # TODO --- nice but unused @@ -536,6 +635,7 @@ class MemoryStore(object):          Set the value for the set of the recent flags.          Used from the property in the MessageCollection.          """ +        self._rflags_dirty.add(mbox)          self._rflags_store[mbox]['set'] = set(value)      def load_recent_flags(self, mbox, flags_doc): @@ -568,23 +668,81 @@ class MemoryStore(object):          :rtype: generator          """ -        rflags_store = self._rflags_store -          # XXX use enums          DOC_ID = "doc_id"          SET = "set" -        print "LEN RFLAGS_STORE ------->", len(rflags_store) -        return ( -            RecentFlagsDoc( +        rflags_store = self._rflags_store + +        def get_rdoc(mbox, rdict): +            mbox_rflag_set = rdict[SET] +            recent_set = copy(mbox_rflag_set) +            # zero it! +            mbox_rflag_set.difference_update(mbox_rflag_set) +            return RecentFlagsDoc(                  doc_id=rflags_store[mbox][DOC_ID],                  content={                      fields.TYPE_KEY: fields.TYPE_RECENT_VAL,                      fields.MBOX_KEY: mbox, -                    fields.RECENTFLAGS_KEY: list( -                        rflags_store[mbox][SET]) +                    fields.RECENTFLAGS_KEY: list(recent_set)                  }) -            for mbox in rflags_store) + +        return (get_rdoc(mbox, rdict) for mbox, rdict in rflags_store.items() +                if not empty(rdict[SET])) + +    # Methods that mirror the IMailbox interface + +    def remove_all_deleted(self, mbox): +        """ +        Remove all messages flagged \\Deleted from this Memory Store only. 
+        Called from `expunge`
+        """
+        mem_deleted = self.all_deleted_uid_iter(mbox)
+        for uid in mem_deleted:
+            self.remove_message(mbox, uid)
+        return mem_deleted
+
+    def expunge(self, mbox):
+        """
+        Remove all messages flagged \\Deleted, from the Memory Store
+        and from the permanent store also.
+        """
+        # TODO expunge should add itself as a callback to the ongoing
+        # writes.
+        soledad_store = self._permanent_store
+
+        try:
+            # 1. Stop the writing call
+            self._stop_write_loop()
+            # 2. Enqueue a last write.
+            #self.write_messages(soledad_store)
+            # 3. Should wait on the writebacks to finish ???
+            # FIXME wait for this, and add all the rest of the method
+            # as a callback!!!
+        except Exception as exc:
+            logger.exception(exc)
+
+        # Now, we...:
+
+        try:
+            # 1. Delete all messages marked as deleted in soledad.
+
+            # XXX this could be deferred for faster operation.
+            if soledad_store:
+                sol_deleted = soledad_store.remove_all_deleted(mbox)
+            else:
+                sol_deleted = []
+
+            # 2. Delete all messages marked as deleted in memory.
+            mem_deleted = self.remove_all_deleted(mbox)
+
+            all_deleted = set(mem_deleted).union(set(sol_deleted))
+            print "deleted ", all_deleted
+        except Exception as exc:
+            logger.exception(exc)
+        finally:
+            self._start_write_loop()
+        return all_deleted
 
     # Dump-to-disk controls.
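A note on the `create_message` change above: when `notify_on_disk` is True the caller gets a deferred that fires only after the writeback to permanent storage, otherwise an already-fired deferred comes back immediately. The snippet below is a simplified stand-alone sketch of that contract, assuming only Twisted is available; `create_message` here is a stand-in, and the `pending` dict plays the role of the store's `_new_deferreds`.

```python
from twisted.internet import defer
from twisted.python import log


def create_message(pending, key, notify_on_disk=True):
    """Simplified mirror of the notify_on_disk contract shown in the diff."""
    d = defer.Deferred()
    d.addCallback(lambda result: log.msg("message save: %s" % result))
    # The store keeps this deferred around and fires it once the message
    # has actually been written to permanent storage.
    pending[key] = d
    if notify_on_disk:
        # Caller wants to know when the message reaches the disk.
        return d
    # Fire-and-forget: hand back an already-fired deferred instead.
    return defer.succeed('fire-and-forget:%s' % str(key))


pending = {}
key = ('INBOX', 42)
d = create_message(pending, key, notify_on_disk=True)
d.addCallback(lambda result: result)
# Later, the writer signals that the message reached permanent storage:
pending[key].callback('%s saved' % str(key))
```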
