From b9042503becebfe07b3a4586bd56126b334e0182 Mon Sep 17 00:00:00 2001 From: Kali Kaneko Date: Fri, 24 Jan 2014 23:14:38 -0400 Subject: recent-flags use the memory store --- mail/src/leap/mail/imap/memorystore.py | 112 ++++++++++++++++++++++++++++++-- mail/src/leap/mail/imap/messageparts.py | 8 +++ mail/src/leap/mail/imap/messages.py | 60 +++++++++++------ mail/src/leap/mail/imap/soledadstore.py | 59 ++++++++++++++--- 4 files changed, 205 insertions(+), 34 deletions(-) (limited to 'mail/src/leap') diff --git a/mail/src/leap/mail/imap/memorystore.py b/mail/src/leap/mail/imap/memorystore.py index dcae6b03..232a2fb2 100644 --- a/mail/src/leap/mail/imap/memorystore.py +++ b/mail/src/leap/mail/imap/memorystore.py @@ -21,6 +21,8 @@ import contextlib import logging import weakref +from collections import defaultdict + from twisted.internet import defer from twisted.internet.task import LoopingCall from twisted.python import log @@ -32,6 +34,7 @@ from leap.mail.messageflow import MessageProducer from leap.mail.imap import interfaces from leap.mail.imap.fields import fields from leap.mail.imap.messageparts import MessagePartType, MessagePartDoc +from leap.mail.imap.messageparts import RecentFlagsDoc from leap.mail.imap.messageparts import MessageWrapper from leap.mail.imap.messageparts import ReferenciableDict @@ -109,16 +112,38 @@ class MemoryStore(object): # Internal Storage: content-hash:fdoc """ + chash-fdoc-store keeps references to + the flag-documents indexed by content-hash. + {'chash': {'mbox-a': weakref.proxy(dict), 'mbox-b': weakref.proxy(dict)} } """ self._chash_fdoc_store = {} - # TODO ----------------- implement mailbox-level flags store too! ---- - self._rflags_store = {} + # Internal Storage: recent-flags store + """ + recent-flags store keeps one dict per mailbox, + with the document-id of the u1db document + and the set of the UIDs that have the recent flag. + + {'mbox-a': {'doc_id': 'deadbeef', + 'set': {1,2,3,4} + } + } + """ + # TODO this will have to transition to content-hash + # indexes after we move to local-only UIDs. + + self._rflags_store = defaultdict( + lambda: {'doc_id': None, 'set': set([])}) + + # TODO ----------------- implement mailbox-level flags store too? + # XXX maybe we don't need this anymore... + # let's see how good does it prefetch the headers if + # we cache them in the store. self._hdocset_store = {} - # TODO ----------------- implement mailbox-level flags store too! ---- + # -------------------------------------------------------------- # New and dirty flags, to set MessageWrapper State. self._new = set([]) @@ -224,6 +249,8 @@ class MemoryStore(object): def _add_message(self, mbox, uid, message, notify_on_disk=True): # XXX have to differentiate between notify_new and notify_dirty + # TODO defaultdict the hell outa here... + key = mbox, uid msg_dict = message.as_dict() @@ -331,6 +358,8 @@ class MemoryStore(object): with set_bool_flag(self, self.WRITING_FLAG): for msg_wrapper in self.all_new_dirty_msg_iter(): self.producer.push(msg_wrapper) + for rflags_doc_wrapper in self.all_rdocs_iter(): + self.producer.push(rflags_doc_wrapper) # MemoryStore specific methods. @@ -486,6 +515,79 @@ class MemoryStore(object): d.callback('%s, ok' % str(key)) deferreds.pop(key) + # Recent Flags + + # TODO --- nice but unused + def set_recent_flag(self, mbox, uid): + """ + Set the `Recent` flag for a given mailbox and UID. + """ + self._rflags_store[mbox]['set'].add(uid) + + # TODO --- nice but unused + def unset_recent_flag(self, mbox, uid): + """ + Unset the `Recent` flag for a given mailbox and UID. + """ + self._rflags_store[mbox]['set'].discard(uid) + + def set_recent_flags(self, mbox, value): + """ + Set the value for the set of the recent flags. + Used from the property in the MessageCollection. + """ + self._rflags_store[mbox]['set'] = set(value) + + def load_recent_flags(self, mbox, flags_doc): + """ + Load the passed flags document in the recent flags store, for a given + mailbox. + + :param flags_doc: A dictionary containing the `doc_id` of the Soledad + flags-document for this mailbox, and the `set` + of uids marked with that flag. + """ + self._rflags_store[mbox] = flags_doc + + def get_recent_flags(self, mbox): + """ + Get the set of UIDs with the `Recent` flag for this mailbox. + + :return: set, or None + """ + rflag_for_mbox = self._rflags_store.get(mbox, None) + if not rflag_for_mbox: + return None + return self._rflags_store[mbox]['set'] + + def all_rdocs_iter(self): + """ + Return an iterator through all in-memory recent flag dicts, wrapped + under a RecentFlagsDoc namedtuple. + Used for saving to disk. + + :rtype: generator + """ + rflags_store = self._rflags_store + + # XXX use enums + DOC_ID = "doc_id" + SET = "set" + + print "LEN RFLAGS_STORE ------->", len(rflags_store) + return ( + RecentFlagsDoc( + doc_id=rflags_store[mbox][DOC_ID], + content={ + fields.TYPE_KEY: fields.TYPE_RECENT_VAL, + fields.MBOX_KEY: mbox, + fields.RECENTFLAGS_KEY: list( + rflags_store[mbox][SET]) + }) + for mbox in rflags_store) + + # Dump-to-disk controls. + @property def is_writing(self): """ @@ -498,7 +600,9 @@ class MemoryStore(object): :rtype: bool """ - # XXX this should probably return a deferred !!! + # FIXME this should return a deferred !!! + # XXX ----- can fire when all new + dirty deferreds + # are done (gatherResults) return getattr(self, self.WRITING_FLAG) def put_part(self, part_type, value): diff --git a/mail/src/leap/mail/imap/messageparts.py b/mail/src/leap/mail/imap/messageparts.py index 055e6a5a..257d3f0e 100644 --- a/mail/src/leap/mail/imap/messageparts.py +++ b/mail/src/leap/mail/imap/messageparts.py @@ -73,6 +73,14 @@ MessagePartDoc = namedtuple( 'MessagePartDoc', ['new', 'dirty', 'part', 'store', 'content', 'doc_id']) +""" +A RecentFlagsDoc is used to send the recent-flags document payload to the +SoledadWriter during dumps. +""" +RecentFlagsDoc = namedtuple( + 'RecentFlagsDoc', + ['content', 'doc_id']) + class ReferenciableDict(dict): """ diff --git a/mail/src/leap/mail/imap/messages.py b/mail/src/leap/mail/imap/messages.py index c212472b..5de638bc 100644 --- a/mail/src/leap/mail/imap/messages.py +++ b/mail/src/leap/mail/imap/messages.py @@ -813,6 +813,7 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser): leap_assert(soledad, "Need a soledad instance to initialize") # okay, all in order, keep going... + self.mbox = self._parse_mailbox_name(mbox) # XXX get a SoledadStore passed instead @@ -996,8 +997,6 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser): # check for uniqueness. if self._fdoc_already_exists(chash): print ">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>" - print - print logger.warning("We already have that message in this mailbox.") # note that this operation will leave holes in the UID sequence, # but we're gonna change that all the same for a local-only table. @@ -1023,21 +1022,16 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser): # XXX review-me cdocs = dict((index, doc) for index, doc in enumerate(walk.get_raw_docs(msg, parts))) - print "cdocs is", cdocs - # Saving ---------------------------------------- - # XXX should check for content duplication on headers too - # but with chash. !!! + self.set_recent_flag(uid) + # Saving ---------------------------------------- # XXX adapt hdocset to use memstore #hdoc = self._soledad.create_doc(hd) # We add the newly created hdoc to the fast-access set of # headers documents associated with the mailbox. #self.add_hdocset_docid(hdoc.doc_id) - # XXX move to memory store too - # self.set_recent_flag(uid) - # TODO ---- add reference to original doc, to be deleted # after writes are done. msg_container = MessageWrapper(fd, hd, cdocs) @@ -1088,24 +1082,48 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser): """ An accessor for the recent-flags set for this mailbox. """ - if not self.__rflags: + if self.__rflags is not None: + return self.__rflags + + if self.memstore: + with self._rdoc_lock: + rflags = self.memstore.get_recent_flags(self.mbox) + if not rflags: + # not loaded in the memory store yet. + # let's fetch them from soledad... + rdoc = self._get_recent_doc() + rflags = set(rdoc.content.get( + fields.RECENTFLAGS_KEY, [])) + # ...and cache them now. + self.memstore.load_recent_flags( + self.mbox, + {'doc_id': rdoc.doc_id, 'set': rflags}) + return rflags + + else: + # fallback for cases without memory store with self._rdoc_lock: rdoc = self._get_recent_doc() self.__rflags = set(rdoc.content.get( fields.RECENTFLAGS_KEY, [])) - return self.__rflags + return self.__rflags def _set_recent_flags(self, value): """ Setter for the recent-flags set for this mailbox. """ - with self._rdoc_lock: - rdoc = self._get_recent_doc() - newv = set(value) - self.__rflags = newv - rdoc.content[fields.RECENTFLAGS_KEY] = list(newv) - # XXX should deferLater 0 it? - self._soledad.put_doc(rdoc) + if self.memstore: + self.memstore.set_recent_flags(self.mbox, value) + + else: + # fallback for cases without memory store + with self._rdoc_lock: + rdoc = self._get_recent_doc() + newv = set(value) + self.__rflags = newv + rdoc.content[fields.RECENTFLAGS_KEY] = list(newv) + # XXX should deferLater 0 it? + self._soledad.put_doc(rdoc) recent_flags = property( _get_recent_flags, _set_recent_flags, @@ -1131,15 +1149,17 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser): Unset Recent flag for a sequence of uids. """ with self._rdoc_property_lock: - self.recent_flags = self.recent_flags.difference( + self.recent_flags.difference_update( set(uids)) + # Individual flags operations + def unset_recent_flag(self, uid): """ Unset Recent flag for a given uid. """ with self._rdoc_property_lock: - self.recent_flags = self.recent_flags.difference( + self.recent_flags.difference_update( set([uid])) def set_recent_flag(self, uid): diff --git a/mail/src/leap/mail/imap/soledadstore.py b/mail/src/leap/mail/imap/soledadstore.py index b321da8a..ea5b36e0 100644 --- a/mail/src/leap/mail/imap/soledadstore.py +++ b/mail/src/leap/mail/imap/soledadstore.py @@ -25,6 +25,8 @@ from u1db import errors as u1db_errors from zope.interface import implements from leap.mail.imap.messageparts import MessagePartType +from leap.mail.imap.messageparts import MessageWrapper +from leap.mail.imap.messageparts import RecentFlagsDoc from leap.mail.imap.fields import fields from leap.mail.imap.interfaces import IMessageStore from leap.mail.messageflow import IMessageConsumer @@ -193,9 +195,10 @@ class SoledadStore(ContentDedup): empty = queue.empty() while not empty: items = self._process(queue) + # we prime the generator, that should return the - # item in the first place. - msg_wrapper = items.next() + # message or flags wrapper item in the first place. + doc_wrapper = items.next() # From here, we unpack the subpart items and # the right soledad call. @@ -214,10 +217,11 @@ class SoledadStore(ContentDedup): logger.error("Error while processing item.") pass else: - # If everything went well, we can unset the new flag - # in the source store (memory store) - msg_wrapper.new = False - msg_wrapper.dirty = False + if isinstance(doc_wrapper, MessageWrapper): + # If everything went well, we can unset the new flag + # in the source store (memory store) + doc_wrapper.new = False + doc_wrapper.dirty = False empty = queue.empty() # @@ -233,9 +237,20 @@ class SoledadStore(ContentDedup): :param queue: the queue from where we'll pick item. :type queue: Queue """ - msg_wrapper = queue.get() - return chain((msg_wrapper,), - self._get_calls_for_msg_parts(msg_wrapper)) + doc_wrapper = queue.get() + + if isinstance(doc_wrapper, MessageWrapper): + return chain((doc_wrapper,), + self._get_calls_for_msg_parts(doc_wrapper)) + elif isinstance(doc_wrapper, RecentFlagsDoc): + print "getting calls for rflags" + return chain((doc_wrapper,), + self._get_calls_for_rflags_doc(doc_wrapper)) + else: + print "********************" + print "CANNOT PROCESS ITEM!" + print "item --------------------->", doc_wrapper + return (i for i in []) def _try_call(self, call, item): """ @@ -309,4 +324,28 @@ class SoledadStore(ContentDedup): # Implement using callbacks for each operation. else: - logger.error("Cannot put/delete documents yet!") + logger.error("Cannot delete documents yet!") + + def _get_calls_for_rflags_doc(self, rflags_wrapper): + """ + We always put these documents. + """ + call = self._soledad.put_doc + rdoc = self._soledad.get_doc(rflags_wrapper.doc_id) + + payload = rflags_wrapper.content + print "rdoc", rdoc + print "SAVING RFLAGS TO SOLEDAD..." + import pprint; pprint.pprint(payload) + + if payload: + rdoc.content = payload + print + print "YIELDING -----", rdoc + print "AND ----------", call + yield rdoc, call + else: + print ">>>>>>>>>>>>>>>>>" + print ">>>>>>>>>>>>>>>>>" + print ">>>>>>>>>>>>>>>>>" + print "No payload" -- cgit v1.2.3