From 2087c78ce5a4a4cdb8ed4192840059513088838f Mon Sep 17 00:00:00 2001 From: Kali Kaneko Date: Fri, 7 Feb 2014 05:50:55 -0400 Subject: separate better dirty/new flags; add cdocs --- mail/src/leap/mail/imap/memorystore.py | 88 ++++++++++++++++++++++----------- mail/src/leap/mail/imap/messages.py | 21 ++++---- mail/src/leap/mail/imap/soledadstore.py | 22 +++++++-- mail/src/leap/mail/utils.py | 19 +++++++ 4 files changed, 106 insertions(+), 44 deletions(-) (limited to 'mail/src') diff --git a/mail/src/leap/mail/imap/memorystore.py b/mail/src/leap/mail/imap/memorystore.py index 4156c0b1..ee3ee925 100644 --- a/mail/src/leap/mail/imap/memorystore.py +++ b/mail/src/leap/mail/imap/memorystore.py @@ -24,6 +24,7 @@ import weakref from collections import defaultdict from copy import copy +from itertools import chain from twisted.internet import defer from twisted.internet.task import LoopingCall @@ -33,7 +34,7 @@ from zope.interface import implements from leap.common.check import leap_assert_type from leap.mail import size from leap.mail.decorators import deferred_to_thread -from leap.mail.utils import empty +from leap.mail.utils import empty, phash_iter from leap.mail.messageflow import MessageProducer from leap.mail.imap import interfaces from leap.mail.imap.fields import fields @@ -110,13 +111,12 @@ class MemoryStore(object): # Internal Storage: messages """ - Flags document store. + flags document store. _fdoc_store[mbox][uid] = { 'content': 'aaa' } """ self._fdoc_store = defaultdict(lambda: defaultdict( lambda: ReferenciableDict({}))) -<<<<<<< HEAD # Sizes """ {'mbox, uid': } @@ -124,9 +124,14 @@ class MemoryStore(object): self._sizes = {} # Internal Storage: payload-hash -======= + """ + fdocs:doc-id store, stores document IDs for putting + the dirty flags-docs. + """ + self._fdoc_id_store = defaultdict(lambda: defaultdict( + lambda: '')) + # Internal Storage: content-hash:hdoc ->>>>>>> change internal storage and keying scheme in memstore """ hdoc-store keeps references to the header-documents indexed by content-hash. @@ -360,14 +365,6 @@ class MemoryStore(object): self._sizes[key] = size.get_size(self._fdoc_store[key]) # TODO add hdoc and cdocs sizes too - # XXX what to do with this? - #docs_id = msg_dict.get(DOCS_ID, None) - #if docs_id is not None: - #if not store.get(DOCS_ID, None): - #store[DOCS_ID] = {} - #store[DOCS_ID].update(docs_id) - - def get_docid_for_fdoc(self, mbox, uid): """ Return Soledad document id for the flags-doc for a given mbox and uid, @@ -379,13 +376,18 @@ class MemoryStore(object): :type uid: int :rtype: unicode or None """ - fdoc = self._permanent_store.get_flags_doc(mbox, uid) - if empty(fdoc): - return None - doc_id = fdoc.doc_id + doc_id = self._fdoc_id_store[mbox][uid] + + if empty(doc_id): + fdoc = self._permanent_store.get_flags_doc(mbox, uid) + if empty(fdoc.content): + return None + doc_id = fdoc.doc_id + self._fdoc_id_store[mbox][uid] = doc_id + return doc_id - def get_message(self, mbox, uid, flags_only=False): + def get_message(self, mbox, uid, dirtystate="none", flags_only=False): """ Get a MessageWrapper for the given mbox and uid combination. @@ -393,19 +395,32 @@ class MemoryStore(object): :type mbox: str or unicode :param uid: the message UID :type uid: int + :param dirtystate: one of `dirty`, `new` or `none` (default) + :type dirtystate: str :param flags_only: whether the message should carry only a reference to the flags document. :type flags_only: bool + : :return: MessageWrapper or None """ + if dirtystate == "dirty": + flags_only = True + key = mbox, uid fdoc = self._fdoc_store[mbox][uid] if empty(fdoc): return None - new, dirty = self._get_new_dirty_state(key) + new, dirty = False, False + if dirtystate == "none": + new, dirty = self._get_new_dirty_state(key) + if dirtystate == "dirty": + new, dirty = False, True + if dirtystate == "new": + new, dirty = True, False + if flags_only: return MessageWrapper(fdoc=fdoc, new=new, dirty=dirty, @@ -413,7 +428,22 @@ class MemoryStore(object): else: chash = fdoc.get(fields.CONTENT_HASH_KEY) hdoc = self._hdoc_store[chash] - return MessageWrapper(fdoc=fdoc, hdoc=hdoc, + if empty(hdoc): + hdoc = self._permanent_store.get_headers_doc(chash) + if not empty(hdoc.content): + self._hdoc_store[chash] = hdoc.content + hdoc = hdoc.content + cdocs = None + + pmap = hdoc.get(fields.PARTS_MAP_KEY, None) + if new and pmap is not None: + # take the different cdocs for write... + cdoc_store = self._cdoc_store + cdocs_list = phash_iter(hdoc) + cdocs = dict(enumerate( + [cdoc_store[phash] for phash in cdocs_list], 1)) + + return MessageWrapper(fdoc=fdoc, hdoc=hdoc, cdocs=cdocs, new=new, dirty=dirty, memstore=weakref.proxy(self)) @@ -437,14 +467,9 @@ class MemoryStore(object): key = mbox, uid self._new.discard(key) self._dirty.discard(key) -<<<<<<< HEAD - self._msg_store.pop(key, None) if key in self._sizes: del self._sizes[key] - -======= self._fdoc_store[mbox].pop(uid, None) ->>>>>>> change internal storage and keying scheme in memstore except Exception as exc: logger.exception(exc) @@ -464,7 +489,7 @@ class MemoryStore(object): # XXX this could return the deferred for all the enqueued operations if not self.producer.is_queue_empty(): - return + return False if any(map(lambda i: not empty(i), (self._new, self._dirty))): logger.info("Writing messages to Soledad...") @@ -598,6 +623,7 @@ class MemoryStore(object): """ # We can do direct assignments cause we know this will only # be called during initialization of the mailbox. + fdoc_store = self._fdoc_store[mbox] for uid in flag_docs: fdoc_store[uid] = ReferenciableDict(flag_docs[uid]) @@ -626,7 +652,8 @@ class MemoryStore(object): """ flags_dict = {} uids = self.get_uids(mbox) - fdoc_store = self._fdoc_store + fdoc_store = self._fdoc_store[mbox] + for uid in uids: try: flags = fdoc_store[uid][fields.FLAGS_KEY] @@ -763,9 +790,10 @@ class MemoryStore(object): :return: generator of MessageWrappers :rtype: generator """ - return (self.get_message(*key) - for key in sorted(self.iter_fdoc_keys()) - if key in self._new or key in self._dirty) + gm = self.get_message + new = (gm(*key) for key in self._new) + dirty = (gm(*key, flags_only=True) for key in self._dirty) + return chain(new, dirty) def all_deleted_uid_iter(self, mbox): """ diff --git a/mail/src/leap/mail/imap/messages.py b/mail/src/leap/mail/imap/messages.py index 4b956894..8b6d3f31 100644 --- a/mail/src/leap/mail/imap/messages.py +++ b/mail/src/leap/mail/imap/messages.py @@ -264,17 +264,15 @@ class LeapMessage(fields, MailParser, MBoxParser): # to put it under the lock... doc.content[self.FLAGS_KEY] = newflags doc.content[self.SEEN_KEY] = self.SEEN_FLAG in flags + + # XXX check if this is working ok. doc.content[self.DEL_KEY] = self.DELETED_FLAG in flags - if self._collection.memstore is not None: - log.msg("putting message in collection") - self._collection.memstore.put_message( - self._mbox, self._uid, - MessageWrapper(fdoc=doc.content, new=False, dirty=True, - docs_id={'fdoc': doc.doc_id})) - else: - # fallback for non-memstore initializations. - self._soledad.put_doc(doc) + log.msg("putting message in collection") + self._collection.memstore.put_message( + self._mbox, self._uid, + MessageWrapper(fdoc=doc.content, new=False, dirty=True, + docs_id={'fdoc': doc.doc_id})) return map(str, newflags) def getInternalDate(self): @@ -524,6 +522,7 @@ class LeapMessage(fields, MailParser, MBoxParser): finally: return result + # TODO move to soledadstore instead of accessing soledad directly def _get_headers_doc(self): """ Return the document that keeps the headers for this @@ -534,6 +533,7 @@ class LeapMessage(fields, MailParser, MBoxParser): fields.TYPE_HEADERS_VAL, str(self.chash)) return first(head_docs) + # TODO move to soledadstore instead of accessing soledad directly def _get_body_doc(self): """ Return the document that keeps the body for this @@ -1165,7 +1165,8 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser): or None if not found. :rtype: LeapMessage """ - msg_container = self.memstore.get_message(self.mbox, uid, flags_only) + msg_container = self.memstore.get_message( + self.mbox, uid, flags_only=flags_only) if msg_container is not None: if mem_only: msg = LeapMessage(None, uid, self.mbox, collection=self, diff --git a/mail/src/leap/mail/imap/soledadstore.py b/mail/src/leap/mail/imap/soledadstore.py index a74b49c0..6cd3749a 100644 --- a/mail/src/leap/mail/imap/soledadstore.py +++ b/mail/src/leap/mail/imap/soledadstore.py @@ -212,10 +212,8 @@ class SoledadStore(ContentDedup): to be inserted. :type queue: Queue """ - # TODO should delete the original message from incoming only after - # the writes are done. # TODO should handle the delete case - # TODO should handle errors + # TODO should handle errors better # TODO could generalize this method into a generic consumer # and only implement `process` here @@ -235,7 +233,7 @@ class SoledadStore(ContentDedup): Errorback for write operations. """ log.msg("ERROR: Error while processing item.") - log.msg(failure.getTraceBack()) + log.msg(failure.getTraceback()) while not queue.empty(): doc_wrapper = queue.get() @@ -354,6 +352,7 @@ class SoledadStore(ContentDedup): doc = self._GET_DOC_FUN(doc_id) doc.content = dict(item.content) item = doc + try: call(item) except u1db_errors.RevisionConflict as exc: @@ -451,6 +450,7 @@ class SoledadStore(ContentDedup): :type mbox: str or unicode :param uid: the UID for the message :type uid: int + :rtype: SoledadDocument or None """ result = None try: @@ -465,6 +465,20 @@ class SoledadStore(ContentDedup): finally: return result + def get_headers_doc(self, chash): + """ + Return the document that keeps the headers for a message + indexed by its content-hash. + + :param chash: the content-hash to retrieve the document from. + :type chash: str or unicode + :rtype: SoledadDocument or None + """ + head_docs = self._soledad.get_from_index( + fields.TYPE_C_HASH_IDX, + fields.TYPE_HEADERS_VAL, str(chash)) + return first(head_docs) + def write_last_uid(self, mbox, value): """ Write the `last_uid` integer to the proper mailbox document diff --git a/mail/src/leap/mail/utils.py b/mail/src/leap/mail/utils.py index 942acfb9..8b75cfc1 100644 --- a/mail/src/leap/mail/utils.py +++ b/mail/src/leap/mail/utils.py @@ -94,6 +94,7 @@ def lowerdict(_dict): PART_MAP = "part_map" +PHASH = "phash" def _str_dict(d, k): @@ -130,6 +131,24 @@ def stringify_parts_map(d): return d +def phash_iter(d): + """ + A recursive generator that extracts all the payload-hashes + from an arbitrary nested parts-map dictionary. + + :param d: the dictionary to walk + :type d: dictionary + :return: a list of all the phashes found + :rtype: list + """ + if PHASH in d: + yield d[PHASH] + if PART_MAP in d: + for key in d[PART_MAP]: + for phash in phash_iter(d[PART_MAP][key]): + yield phash + + class CustomJsonScanner(object): """ This class is a context manager definition used to monkey patch the default -- cgit v1.2.3