diff options
author | Kali Kaneko <kali@leap.se> | 2014-01-24 05:39:13 -0400 |
---|---|---|
committer | Kali Kaneko <kali@leap.se> | 2014-01-28 19:38:45 -0400 |
commit | e02db78b1b6d8fe021efd4adb250c64a1dd4bac4 (patch) | |
tree | 8837eb90579898f2488dfe7fd581c87dd3a43def /src/leap/mail/imap/memorystore.py | |
parent | ff28e22977db802c87f0b7be99e37c6de29183e9 (diff) |
flags use the memstore
* add new/dirty deferred dict to notify when written to disk
* fix eventual duplication after copy
* fix flag flickering on first retrieval.
Diffstat (limited to 'src/leap/mail/imap/memorystore.py')
-rw-r--r-- | src/leap/mail/imap/memorystore.py | 265 |
1 files changed, 234 insertions, 31 deletions
diff --git a/src/leap/mail/imap/memorystore.py b/src/leap/mail/imap/memorystore.py index f0bdab5..f0c0d4b 100644 --- a/src/leap/mail/imap/memorystore.py +++ b/src/leap/mail/imap/memorystore.py @@ -21,10 +21,13 @@ import contextlib import logging import weakref +from twisted.internet import defer from twisted.internet.task import LoopingCall +from twisted.python import log from zope.interface import implements from leap.mail import size +from leap.mail.utils import empty from leap.mail.messageflow import MessageProducer from leap.mail.imap import interfaces from leap.mail.imap.fields import fields @@ -34,6 +37,8 @@ from leap.mail.imap.messageparts import ReferenciableDict logger = logging.getLogger(__name__) +SOLEDAD_WRITE_PERIOD = 20 + @contextlib.contextmanager def set_bool_flag(obj, att): @@ -79,7 +84,8 @@ class MemoryStore(object): WRITING_FLAG = "_writing" - def __init__(self, permanent_store=None, write_period=60): + def __init__(self, permanent_store=None, + write_period=SOLEDAD_WRITE_PERIOD): """ Initialize a MemoryStore. @@ -92,10 +98,23 @@ class MemoryStore(object): self._permanent_store = permanent_store self._write_period = write_period - # Internal Storage + # Internal Storage: messages self._msg_store = {} + + # Internal Storage: payload-hash + """ + {'phash': weakreaf.proxy(dict)} + """ self._phash_store = {} + # Internal Storage: content-hash:fdoc + """ + {'chash': {'mbox-a': weakref.proxy(dict), + 'mbox-b': weakref.proxy(dict)} + } + """ + self._chash_fdoc_store = {} + # TODO ----------------- implement mailbox-level flags store too! ---- self._rflags_store = {} self._hdocset_store = {} @@ -103,7 +122,9 @@ class MemoryStore(object): # New and dirty flags, to set MessageWrapper State. self._new = set([]) + self._new_deferreds = {} self._dirty = set([]) + self._dirty_deferreds = {} # Flag for signaling we're busy writing to the disk storage. setattr(self, self.WRITING_FLAG, False) @@ -141,48 +162,141 @@ class MemoryStore(object): # We would have to add a put_flags operation to modify only # the flags doc (and set the dirty flag accordingly) - def create_message(self, mbox, uid, message): + def create_message(self, mbox, uid, message, notify_on_disk=True): """ Create the passed message into this MemoryStore. By default we consider that any message is a new message. + + :param mbox: the mailbox + :type mbox: basestring + :param uid: the UID for the message + :type uid: int + :param message: a to be added + :type message: MessageWrapper + :param notify_on_disk: + :type notify_on_disk: bool + + :return: a Deferred. if notify_on_disk is True, will be fired + when written to the db on disk. + Otherwise will fire inmediately + :rtype: Deferred """ print "adding new doc to memstore %s (%s)" % (mbox, uid) key = mbox, uid + + d = defer.Deferred() + d.addCallback(lambda result: log.msg("message save: %s" % result)) + self._new.add(key) + self._new_deferreds[key] = d + self._add_message(mbox, uid, message, notify_on_disk) + print "create message: ", d + return d - msg_dict = message.as_dict() - self._msg_store[key] = msg_dict + def put_message(self, mbox, uid, message, notify_on_disk=True): + """ + Put an existing message. - cdocs = message.cdocs + :param mbox: the mailbox + :type mbox: basestring + :param uid: the UID for the message + :type uid: int + :param message: a to be added + :type message: MessageWrapper + :param notify_on_disk: + :type notify_on_disk: bool - dirty = key in self._dirty - new = key in self._new + :return: a Deferred. if notify_on_disk is True, will be fired + when written to the db on disk. + Otherwise will fire inmediately + :rtype: Deferred + """ + key = mbox, uid + + d = defer.Deferred() + d.addCallback(lambda result: log.msg("message save: %s" % result)) + + self._dirty.add(key) + self._dirty_deferreds[key] = d + self._add_message(mbox, uid, message, notify_on_disk) + return d - # XXX should capture this in log... + def _add_message(self, mbox, uid, message, notify_on_disk=True): + # XXX have to differentiate between notify_new and notify_dirty + + key = mbox, uid + msg_dict = message.as_dict() + print "ADDING MESSAGE..." + import pprint; pprint.pprint(msg_dict) + + # XXX use the enum as keys + + try: + store = self._msg_store[key] + except KeyError: + self._msg_store[key] = {'fdoc': {}, + 'hdoc': {}, + 'cdocs': {}, + 'docs_id': {}} + store = self._msg_store[key] + + print "In store (before):" + import pprint; pprint.pprint(store) + + #self._msg_store[key] = msg_dict + fdoc = msg_dict.get('fdoc', None) + if fdoc: + if not store.get('fdoc', None): + store['fdoc'] = ReferenciableDict({}) + store['fdoc'].update(fdoc) + + # content-hash indexing + chash = fdoc.get(fields.CONTENT_HASH_KEY) + chash_fdoc_store = self._chash_fdoc_store + if not chash in chash_fdoc_store: + chash_fdoc_store[chash] = {} + + chash_fdoc_store[chash][mbox] = weakref.proxy( + store['fdoc']) + + hdoc = msg_dict.get('hdoc', None) + if hdoc: + if not store.get('hdoc', None): + store['hdoc'] = ReferenciableDict({}) + store['hdoc'].update(hdoc) + + docs_id = msg_dict.get('docs_id', None) + if docs_id: + if not store.get('docs_id', None): + store['docs_id'] = {} + store['docs_id'].update(docs_id) + cdocs = message.cdocs for cdoc_key in cdocs.keys(): - print "saving cdoc" - cdoc = self._msg_store[key]['cdocs'][cdoc_key] + if not store.get('cdocs', None): + store['cdocs'] = {} - # FIXME this should be done in the MessageWrapper constructor - # instead... + cdoc = cdocs[cdoc_key] # first we make it weak-referenciable referenciable_cdoc = ReferenciableDict(cdoc) - self._msg_store[key]['cdocs'][cdoc_key] = MessagePartDoc( - new=new, dirty=dirty, store="mem", - part=MessagePartType.cdoc, - content=referenciable_cdoc) + store['cdocs'][cdoc_key] = referenciable_cdoc phash = cdoc.get(fields.PAYLOAD_HASH_KEY, None) if not phash: continue self._phash_store[phash] = weakref.proxy(referenciable_cdoc) - def put_message(self, mbox, uid, msg): - """ - Put an existing message. - """ - return NotImplementedError() + def prune(seq, store): + for key in seq: + if key in store and empty(store.get(key)): + store.pop(key) + + prune(('fdoc', 'hdoc', 'cdocs', 'docs_id'), store) + #import ipdb; ipdb.set_trace() + + + print "after appending to store: ", key + import pprint; pprint.pprint(self._msg_store[key]) def get_message(self, mbox, uid): """ @@ -203,7 +317,13 @@ class MemoryStore(object): """ Remove a Message from this MemoryStore. """ - raise NotImplementedError() + try: + key = mbox, uid + self._new.discard(key) + self._dirty.discard(key) + self._msg_store.pop(key, None) + except Exception as exc: + logger.exception(exc) # IMessageStoreWriter @@ -211,12 +331,15 @@ class MemoryStore(object): """ Write the message documents in this MemoryStore to a different store. """ - # XXX pass if it's writing (ie, the queue is not empty...) - # See how to make the writing_flag aware of the queue state... - print "writing messages to producer..." + # For now, we pass if the queue is not empty, to avoid duplication. + # We would better use a flag to know when we've already enqueued an + # item. + if not self.producer.is_queue_empty(): + return + print "Writing messages to Soledad..." with set_bool_flag(self, self.WRITING_FLAG): - for msg_wrapper in self.all_msg_iter(): + for msg_wrapper in self.all_new_dirty_msg_iter(): self.producer.push(msg_wrapper) # MemoryStore specific methods. @@ -247,12 +370,14 @@ class MemoryStore(object): """ return len(self._new) - def get_by_phash(self, phash): + def get_cdoc_from_phash(self, phash): """ Return a content-document by its payload-hash. """ doc = self._phash_store.get(phash, None) + # XXX return None for consistency? + # XXX have to keep a mapping between phash and its linkage # info, to know if this payload is been already saved or not. # We will be able to get this from the linkage-docs, @@ -262,7 +387,40 @@ class MemoryStore(object): return MessagePartDoc( new=new, dirty=dirty, store="mem", part=MessagePartType.cdoc, - content=doc) + content=doc, + doc_id=None) + + def get_fdoc_from_chash(self, chash, mbox): + """ + Return a flags-document by its content-hash and a given mailbox. + + :return: MessagePartDoc, or None. + """ + docs_dict = self._chash_fdoc_store.get(chash, None) + fdoc = docs_dict.get(mbox, None) if docs_dict else None + + print "GETTING FDOC BY CHASH:", fdoc + + # a couple of special cases. + # 1. We might have a doc with empty content... + if empty(fdoc): + return None + + # ...Or the message could exist, but being flagged for deletion. + # We want to create a new one in this case. + # Hmmm what if the deletion is un-done?? We would end with a + # duplicate... + if fdoc and fields.DELETED_FLAG in fdoc[fields.FLAGS_KEY]: + return None + + # XXX get flags + new = True + dirty = False + return MessagePartDoc( + new=new, dirty=dirty, store="mem", + part=MessagePartType.fdoc, + content=fdoc, + doc_id=None) def all_msg_iter(self): """ @@ -271,6 +429,25 @@ class MemoryStore(object): return (self.get_message(*key) for key in sorted(self._msg_store.keys())) + def all_new_dirty_msg_iter(self): + """ + Return geneator that iterates through all new and dirty messages. + """ + return (self.get_message(*key) + for key in sorted(self._msg_store.keys()) + if key in self._new or key in self._dirty) + + def all_deleted_uid_iter(self, mbox): + """ + Return generator that iterates through the UIDs for all messags + with deleted flag in a given mailbox. + """ + all_deleted = ( + msg['fdoc']['uid'] for msg in self._msg_store.values() + if msg.get('fdoc', None) + and fields.DELETED_FLAG in msg['fdoc']['flags']) + return all_deleted + # new, dirty flags def _get_new_dirty_state(self, key): @@ -289,9 +466,35 @@ class MemoryStore(object): """ Remove the key value from the `new` set. """ - print "******************" - print "UNSETTING NEW FOR: %s" % str(key) + print "Unsetting NEW for: %s" % str(key) self._new.discard(key) + deferreds = self._new_deferreds + d = deferreds.get(key, None) + if d: + # XXX use a namedtuple for passing the result + # when we check it in the other side. + d.callback('%s, ok' % str(key)) + deferreds.pop(key) + + def set_dirty(self, key): + """ + Add the key value to the `dirty` set. + """ + self._dirty.add(key) + + def unset_dirty(self, key): + """ + Remove the key value from the `dirty` set. + """ + print "Unsetting DIRTY for: %s" % str(key) + self._dirty.discard(key) + deferreds = self._dirty_deferreds + d = deferreds.get(key, None) + if d: + # XXX use a namedtuple for passing the result + # when we check it in the other side. + d.callback('%s, ok' % str(key)) + deferreds.pop(key) @property def is_writing(self): |