diff options
| author | Kali Kaneko <kali@leap.se> | 2014-01-24 23:14:38 -0400 | 
|---|---|---|
| committer | Kali Kaneko <kali@leap.se> | 2014-01-28 19:38:45 -0400 | 
| commit | b9042503becebfe07b3a4586bd56126b334e0182 (patch) | |
| tree | e1d3eda4a23812b828d7061e11c70bfa7ab42962 /mail/src/leap | |
| parent | 77f836cb1e698792cd28bca1d44ece6174b5f04d (diff) | |
recent-flags use the memory store
Diffstat (limited to 'mail/src/leap')
| -rw-r--r-- | mail/src/leap/mail/imap/memorystore.py | 112 | ||||
| -rw-r--r-- | mail/src/leap/mail/imap/messageparts.py | 8 | ||||
| -rw-r--r-- | mail/src/leap/mail/imap/messages.py | 60 | ||||
| -rw-r--r-- | mail/src/leap/mail/imap/soledadstore.py | 59 | 
4 files changed, 205 insertions, 34 deletions
| diff --git a/mail/src/leap/mail/imap/memorystore.py b/mail/src/leap/mail/imap/memorystore.py index dcae6b0..232a2fb 100644 --- a/mail/src/leap/mail/imap/memorystore.py +++ b/mail/src/leap/mail/imap/memorystore.py @@ -21,6 +21,8 @@ import contextlib  import logging  import weakref +from collections import defaultdict +  from twisted.internet import defer  from twisted.internet.task import LoopingCall  from twisted.python import log @@ -32,6 +34,7 @@ from leap.mail.messageflow import MessageProducer  from leap.mail.imap import interfaces  from leap.mail.imap.fields import fields  from leap.mail.imap.messageparts import MessagePartType, MessagePartDoc +from leap.mail.imap.messageparts import RecentFlagsDoc  from leap.mail.imap.messageparts import MessageWrapper  from leap.mail.imap.messageparts import ReferenciableDict @@ -109,16 +112,38 @@ class MemoryStore(object):          # Internal Storage: content-hash:fdoc          """ +        chash-fdoc-store keeps references to +        the flag-documents indexed by content-hash. +          {'chash': {'mbox-a': weakref.proxy(dict),                     'mbox-b': weakref.proxy(dict)}          }          """          self._chash_fdoc_store = {} -        # TODO ----------------- implement mailbox-level flags store too! ---- -        self._rflags_store = {} +        # Internal Storage: recent-flags store +        """ +        recent-flags store keeps one dict per mailbox, +        with the document-id of the u1db document +        and the set of the UIDs that have the recent flag. + +        {'mbox-a': {'doc_id': 'deadbeef', +                    'set': {1,2,3,4} +                    } +        } +        """ +        # TODO this will have to transition to content-hash +        # indexes after we move to local-only UIDs. + +        self._rflags_store = defaultdict( +            lambda: {'doc_id': None, 'set': set([])}) + +        # TODO ----------------- implement mailbox-level flags store too? +        # XXX maybe we don't need this anymore... +        # let's see how good does it prefetch the headers if +        # we cache them in the store.          self._hdocset_store = {} -        # TODO ----------------- implement mailbox-level flags store too! ---- +        # --------------------------------------------------------------          # New and dirty flags, to set MessageWrapper State.          self._new = set([]) @@ -224,6 +249,8 @@ class MemoryStore(object):      def _add_message(self, mbox, uid, message, notify_on_disk=True):          # XXX have to differentiate between notify_new and notify_dirty +        # TODO defaultdict the hell outa here... +          key = mbox, uid          msg_dict = message.as_dict() @@ -331,6 +358,8 @@ class MemoryStore(object):          with set_bool_flag(self, self.WRITING_FLAG):              for msg_wrapper in self.all_new_dirty_msg_iter():                  self.producer.push(msg_wrapper) +            for rflags_doc_wrapper in self.all_rdocs_iter(): +                self.producer.push(rflags_doc_wrapper)      # MemoryStore specific methods. @@ -486,6 +515,79 @@ class MemoryStore(object):              d.callback('%s, ok' % str(key))              deferreds.pop(key) +    # Recent Flags + +    # TODO --- nice but unused +    def set_recent_flag(self, mbox, uid): +        """ +        Set the `Recent` flag for a given mailbox and UID. +        """ +        self._rflags_store[mbox]['set'].add(uid) + +    # TODO --- nice but unused +    def unset_recent_flag(self, mbox, uid): +        """ +        Unset the `Recent` flag for a given mailbox and UID. +        """ +        self._rflags_store[mbox]['set'].discard(uid) + +    def set_recent_flags(self, mbox, value): +        """ +        Set the value for the set of the recent flags. +        Used from the property in the MessageCollection. +        """ +        self._rflags_store[mbox]['set'] = set(value) + +    def load_recent_flags(self, mbox, flags_doc): +        """ +        Load the passed flags document in the recent flags store, for a given +        mailbox. + +        :param flags_doc: A dictionary containing the `doc_id` of the Soledad +                          flags-document for this mailbox, and the `set` +                          of uids marked with that flag. +        """ +        self._rflags_store[mbox] = flags_doc + +    def get_recent_flags(self, mbox): +        """ +        Get the set of UIDs with the `Recent` flag for this mailbox. + +        :return: set, or None +        """ +        rflag_for_mbox = self._rflags_store.get(mbox, None) +        if not rflag_for_mbox: +            return None +        return self._rflags_store[mbox]['set'] + +    def all_rdocs_iter(self): +        """ +        Return an iterator through all in-memory recent flag dicts, wrapped +        under a RecentFlagsDoc namedtuple. +        Used for saving to disk. + +        :rtype: generator +        """ +        rflags_store = self._rflags_store + +        # XXX use enums +        DOC_ID = "doc_id" +        SET = "set" + +        print "LEN RFLAGS_STORE ------->", len(rflags_store) +        return ( +            RecentFlagsDoc( +                doc_id=rflags_store[mbox][DOC_ID], +                content={ +                    fields.TYPE_KEY: fields.TYPE_RECENT_VAL, +                    fields.MBOX_KEY: mbox, +                    fields.RECENTFLAGS_KEY: list( +                        rflags_store[mbox][SET]) +                }) +            for mbox in rflags_store) + +    # Dump-to-disk controls. +      @property      def is_writing(self):          """ @@ -498,7 +600,9 @@ class MemoryStore(object):          :rtype: bool          """ -        # XXX this should probably return a deferred !!! +        # FIXME this should return a deferred !!! +        # XXX ----- can fire when all new + dirty deferreds +        # are done (gatherResults)          return getattr(self, self.WRITING_FLAG)      def put_part(self, part_type, value): diff --git a/mail/src/leap/mail/imap/messageparts.py b/mail/src/leap/mail/imap/messageparts.py index 055e6a5..257d3f0 100644 --- a/mail/src/leap/mail/imap/messageparts.py +++ b/mail/src/leap/mail/imap/messageparts.py @@ -73,6 +73,14 @@ MessagePartDoc = namedtuple(      'MessagePartDoc',      ['new', 'dirty', 'part', 'store', 'content', 'doc_id']) +""" +A RecentFlagsDoc is used to send the recent-flags document payload to the +SoledadWriter during dumps. +""" +RecentFlagsDoc = namedtuple( +    'RecentFlagsDoc', +    ['content', 'doc_id']) +  class ReferenciableDict(dict):      """ diff --git a/mail/src/leap/mail/imap/messages.py b/mail/src/leap/mail/imap/messages.py index c212472..5de638b 100644 --- a/mail/src/leap/mail/imap/messages.py +++ b/mail/src/leap/mail/imap/messages.py @@ -813,6 +813,7 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser):          leap_assert(soledad, "Need a soledad instance to initialize")          # okay, all in order, keep going... +          self.mbox = self._parse_mailbox_name(mbox)          # XXX get a SoledadStore passed instead @@ -996,8 +997,6 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser):          # check for uniqueness.          if self._fdoc_already_exists(chash):              print ">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>" -            print -            print              logger.warning("We already have that message in this mailbox.")              # note that this operation will leave holes in the UID sequence,              # but we're gonna change that all the same for a local-only table. @@ -1023,21 +1022,16 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser):          # XXX review-me          cdocs = dict((index, doc) for index, doc in                       enumerate(walk.get_raw_docs(msg, parts))) -        print "cdocs is", cdocs -        # Saving ---------------------------------------- -        # XXX should check for content duplication on headers too -        # but with chash. !!! +        self.set_recent_flag(uid) +        # Saving ----------------------------------------          # XXX adapt hdocset to use memstore          #hdoc = self._soledad.create_doc(hd)          # We add the newly created hdoc to the fast-access set of          # headers documents associated with the mailbox.          #self.add_hdocset_docid(hdoc.doc_id) -        # XXX move to memory store too -        # self.set_recent_flag(uid) -          # TODO ---- add reference to original doc, to be deleted          # after writes are done.          msg_container = MessageWrapper(fd, hd, cdocs) @@ -1088,24 +1082,48 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser):          """          An accessor for the recent-flags set for this mailbox.          """ -        if not self.__rflags: +        if self.__rflags is not None: +            return self.__rflags + +        if self.memstore: +            with self._rdoc_lock: +                rflags = self.memstore.get_recent_flags(self.mbox) +                if not rflags: +                    # not loaded in the memory store yet. +                    # let's fetch them from soledad... +                    rdoc = self._get_recent_doc() +                    rflags = set(rdoc.content.get( +                        fields.RECENTFLAGS_KEY, [])) +                    # ...and cache them now. +                    self.memstore.load_recent_flags( +                        self.mbox, +                        {'doc_id': rdoc.doc_id, 'set': rflags}) +            return rflags + +        else: +            # fallback for cases without memory store              with self._rdoc_lock:                  rdoc = self._get_recent_doc()                  self.__rflags = set(rdoc.content.get(                      fields.RECENTFLAGS_KEY, [])) -        return self.__rflags +            return self.__rflags      def _set_recent_flags(self, value):          """          Setter for the recent-flags set for this mailbox.          """ -        with self._rdoc_lock: -            rdoc = self._get_recent_doc() -            newv = set(value) -            self.__rflags = newv -            rdoc.content[fields.RECENTFLAGS_KEY] = list(newv) -            # XXX should deferLater 0 it? -            self._soledad.put_doc(rdoc) +        if self.memstore: +            self.memstore.set_recent_flags(self.mbox, value) + +        else: +            # fallback for cases without memory store +            with self._rdoc_lock: +                rdoc = self._get_recent_doc() +                newv = set(value) +                self.__rflags = newv +                rdoc.content[fields.RECENTFLAGS_KEY] = list(newv) +                # XXX should deferLater 0 it? +                self._soledad.put_doc(rdoc)      recent_flags = property(          _get_recent_flags, _set_recent_flags, @@ -1131,15 +1149,17 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser):          Unset Recent flag for a sequence of uids.          """          with self._rdoc_property_lock: -            self.recent_flags = self.recent_flags.difference( +            self.recent_flags.difference_update(                  set(uids)) +    # Individual flags operations +      def unset_recent_flag(self, uid):          """          Unset Recent flag for a given uid.          """          with self._rdoc_property_lock: -            self.recent_flags = self.recent_flags.difference( +            self.recent_flags.difference_update(                  set([uid]))      def set_recent_flag(self, uid): diff --git a/mail/src/leap/mail/imap/soledadstore.py b/mail/src/leap/mail/imap/soledadstore.py index b321da8..ea5b36e 100644 --- a/mail/src/leap/mail/imap/soledadstore.py +++ b/mail/src/leap/mail/imap/soledadstore.py @@ -25,6 +25,8 @@ from u1db import errors as u1db_errors  from zope.interface import implements  from leap.mail.imap.messageparts import MessagePartType +from leap.mail.imap.messageparts import MessageWrapper +from leap.mail.imap.messageparts import RecentFlagsDoc  from leap.mail.imap.fields import fields  from leap.mail.imap.interfaces import IMessageStore  from leap.mail.messageflow import IMessageConsumer @@ -193,9 +195,10 @@ class SoledadStore(ContentDedup):          empty = queue.empty()          while not empty:              items = self._process(queue) +              # we prime the generator, that should return the -            # item in the first place. -            msg_wrapper = items.next() +            # message or flags wrapper item in the first place. +            doc_wrapper = items.next()              # From here, we unpack the subpart items and              # the right soledad call. @@ -214,10 +217,11 @@ class SoledadStore(ContentDedup):                  logger.error("Error while processing item.")                  pass              else: -                # If everything went well, we can unset the new flag -                # in the source store (memory store) -                msg_wrapper.new = False -                msg_wrapper.dirty = False +                if isinstance(doc_wrapper, MessageWrapper): +                    # If everything went well, we can unset the new flag +                    # in the source store (memory store) +                    doc_wrapper.new = False +                    doc_wrapper.dirty = False              empty = queue.empty()      # @@ -233,9 +237,20 @@ class SoledadStore(ContentDedup):          :param queue: the queue from where we'll pick item.          :type queue: Queue          """ -        msg_wrapper = queue.get() -        return chain((msg_wrapper,), -                     self._get_calls_for_msg_parts(msg_wrapper)) +        doc_wrapper = queue.get() + +        if isinstance(doc_wrapper, MessageWrapper): +            return chain((doc_wrapper,), +                         self._get_calls_for_msg_parts(doc_wrapper)) +        elif isinstance(doc_wrapper, RecentFlagsDoc): +            print "getting calls for rflags" +            return chain((doc_wrapper,), +                         self._get_calls_for_rflags_doc(doc_wrapper)) +        else: +            print "********************" +            print "CANNOT PROCESS ITEM!" +            print "item --------------------->", doc_wrapper +            return (i for i in [])      def _try_call(self, call, item):          """ @@ -309,4 +324,28 @@ class SoledadStore(ContentDedup):          # Implement using callbacks for each operation.          else: -            logger.error("Cannot put/delete documents yet!") +            logger.error("Cannot delete documents yet!") + +    def _get_calls_for_rflags_doc(self, rflags_wrapper): +        """ +        We always put these documents. +        """ +        call = self._soledad.put_doc +        rdoc = self._soledad.get_doc(rflags_wrapper.doc_id) + +        payload = rflags_wrapper.content +        print "rdoc", rdoc +        print "SAVING RFLAGS TO SOLEDAD..." +        import pprint; pprint.pprint(payload) + +        if payload: +            rdoc.content = payload +            print +            print "YIELDING -----", rdoc +            print "AND ----------", call +            yield rdoc, call +        else: +            print ">>>>>>>>>>>>>>>>>" +            print ">>>>>>>>>>>>>>>>>" +            print ">>>>>>>>>>>>>>>>>" +            print "No payload" | 
