author     Kali Kaneko <kali@leap.se>    2014-01-24 05:39:13 -0400
committer  Kali Kaneko <kali@leap.se>    2014-01-28 19:38:45 -0400
commit     e02db78b1b6d8fe021efd4adb250c64a1dd4bac4 (patch)
tree       8837eb90579898f2488dfe7fd581c87dd3a43def /src/leap/mail/imap/memorystore.py
parent     ff28e22977db802c87f0b7be99e37c6de29183e9 (diff)
flags use the memstore

* add new/dirty deferred dict to notify when written to disk
* fix eventual duplication after copy
* fix flag flickering on first retrieval
Diffstat (limited to 'src/leap/mail/imap/memorystore.py')
-rw-r--r--  src/leap/mail/imap/memorystore.py  265
1 file changed, 234 insertions(+), 31 deletions(-)
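The core of this change is the per-key deferred bookkeeping: create_message and put_message register a Deferred under the (mbox, uid) key, and unset_new / unset_dirty fire it once the Soledad writer has persisted the message. Below is a minimal sketch of that pattern, assuming only the behaviour visible in the diff; the _NotifyingStore class and _on_persisted callback are illustrative stand-ins, not part of the module.

from twisted.internet import defer

class _NotifyingStore(object):
    """Toy stand-in for MemoryStore's new/dirty deferred bookkeeping."""

    def __init__(self):
        self._new = set()
        self._new_deferreds = {}

    def create_message(self, mbox, uid, message):
        # Register the message as "new" and hand back a Deferred that
        # will fire once the disk write for this key completes.
        key = (mbox, uid)
        d = defer.Deferred()
        self._new.add(key)
        self._new_deferreds[key] = d
        return d

    def unset_new(self, key):
        # Called by the writer once the message reached the permanent
        # store; fires (and forgets) the per-key Deferred.
        self._new.discard(key)
        d = self._new_deferreds.pop(key, None)
        if d is not None:
            d.callback('%s, ok' % str(key))

def _on_persisted(result):
    print "message persisted:", result

store = _NotifyingStore()
store.create_message('INBOX', 1, message=None).addCallback(_on_persisted)
store.unset_new(('INBOX', 1))   # simulates the Soledad write completing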
diff --git a/src/leap/mail/imap/memorystore.py b/src/leap/mail/imap/memorystore.py
index f0bdab5..f0c0d4b 100644
--- a/src/leap/mail/imap/memorystore.py
+++ b/src/leap/mail/imap/memorystore.py
@@ -21,10 +21,13 @@ import contextlib
import logging
import weakref
+from twisted.internet import defer
from twisted.internet.task import LoopingCall
+from twisted.python import log
from zope.interface import implements
from leap.mail import size
+from leap.mail.utils import empty
from leap.mail.messageflow import MessageProducer
from leap.mail.imap import interfaces
from leap.mail.imap.fields import fields
@@ -34,6 +37,8 @@ from leap.mail.imap.messageparts import ReferenciableDict
logger = logging.getLogger(__name__)
+SOLEDAD_WRITE_PERIOD = 20
+
@contextlib.contextmanager
def set_bool_flag(obj, att):
@@ -79,7 +84,8 @@ class MemoryStore(object):
WRITING_FLAG = "_writing"
- def __init__(self, permanent_store=None, write_period=60):
+ def __init__(self, permanent_store=None,
+ write_period=SOLEDAD_WRITE_PERIOD):
"""
Initialize a MemoryStore.
@@ -92,10 +98,23 @@ class MemoryStore(object):
self._permanent_store = permanent_store
self._write_period = write_period
- # Internal Storage
+ # Internal Storage: messages
self._msg_store = {}
+
+ # Internal Storage: payload-hash
+ """
+ {'phash': weakref.proxy(dict)}
+ """
self._phash_store = {}
+ # Internal Storage: content-hash:fdoc
+ """
+ {'chash': {'mbox-a': weakref.proxy(dict),
+ 'mbox-b': weakref.proxy(dict)}
+ }
+ """
+ self._chash_fdoc_store = {}
+
# TODO ----------------- implement mailbox-level flags store too! ----
self._rflags_store = {}
self._hdocset_store = {}
@@ -103,7 +122,9 @@ class MemoryStore(object):
# New and dirty flags, to set MessageWrapper State.
self._new = set([])
+ self._new_deferreds = {}
self._dirty = set([])
+ self._dirty_deferreds = {}
# Flag for signaling we're busy writing to the disk storage.
setattr(self, self.WRITING_FLAG, False)
@@ -141,48 +162,141 @@ class MemoryStore(object):
# We would have to add a put_flags operation to modify only
# the flags doc (and set the dirty flag accordingly)
- def create_message(self, mbox, uid, message):
+ def create_message(self, mbox, uid, message, notify_on_disk=True):
"""
Create the passed message into this MemoryStore.
By default we consider that any message is a new message.
+
+ :param mbox: the mailbox
+ :type mbox: basestring
+ :param uid: the UID for the message
+ :type uid: int
+ :param message: a message to be added
+ :type message: MessageWrapper
+ :param notify_on_disk: whether to wait for the disk write before firing the returned deferred
+ :type notify_on_disk: bool
+
+ :return: a Deferred. If notify_on_disk is True, it will be fired
+ when the message is written to the db on disk.
+ Otherwise it will fire immediately.
+ :rtype: Deferred
"""
print "adding new doc to memstore %s (%s)" % (mbox, uid)
key = mbox, uid
+
+ d = defer.Deferred()
+ d.addCallback(lambda result: log.msg("message save: %s" % result))
+
self._new.add(key)
+ self._new_deferreds[key] = d
+ self._add_message(mbox, uid, message, notify_on_disk)
+ print "create message: ", d
+ return d
- msg_dict = message.as_dict()
- self._msg_store[key] = msg_dict
+ def put_message(self, mbox, uid, message, notify_on_disk=True):
+ """
+ Put an existing message.
- cdocs = message.cdocs
+ :param mbox: the mailbox
+ :type mbox: basestring
+ :param uid: the UID for the message
+ :type uid: int
+ :param message: a message to be added
+ :type message: MessageWrapper
+ :param notify_on_disk: whether to wait for the disk write before firing the returned deferred
+ :type notify_on_disk: bool
- dirty = key in self._dirty
- new = key in self._new
+ :return: a Deferred. If notify_on_disk is True, it will be fired
+ when the message is written to the db on disk.
+ Otherwise it will fire immediately.
+ :rtype: Deferred
+ """
+ key = mbox, uid
+
+ d = defer.Deferred()
+ d.addCallback(lambda result: log.msg("message save: %s" % result))
+
+ self._dirty.add(key)
+ self._dirty_deferreds[key] = d
+ self._add_message(mbox, uid, message, notify_on_disk)
+ return d
- # XXX should capture this in log...
+ def _add_message(self, mbox, uid, message, notify_on_disk=True):
+ # XXX have to differentiate between notify_new and notify_dirty
+
+ key = mbox, uid
+ msg_dict = message.as_dict()
+ print "ADDING MESSAGE..."
+ import pprint; pprint.pprint(msg_dict)
+
+ # XXX use the enum as keys
+
+ try:
+ store = self._msg_store[key]
+ except KeyError:
+ self._msg_store[key] = {'fdoc': {},
+ 'hdoc': {},
+ 'cdocs': {},
+ 'docs_id': {}}
+ store = self._msg_store[key]
+
+ print "In store (before):"
+ import pprint; pprint.pprint(store)
+
+ #self._msg_store[key] = msg_dict
+ fdoc = msg_dict.get('fdoc', None)
+ if fdoc:
+ if not store.get('fdoc', None):
+ store['fdoc'] = ReferenciableDict({})
+ store['fdoc'].update(fdoc)
+
+ # content-hash indexing
+ chash = fdoc.get(fields.CONTENT_HASH_KEY)
+ chash_fdoc_store = self._chash_fdoc_store
+ if not chash in chash_fdoc_store:
+ chash_fdoc_store[chash] = {}
+
+ chash_fdoc_store[chash][mbox] = weakref.proxy(
+ store['fdoc'])
+
+ hdoc = msg_dict.get('hdoc', None)
+ if hdoc:
+ if not store.get('hdoc', None):
+ store['hdoc'] = ReferenciableDict({})
+ store['hdoc'].update(hdoc)
+
+ docs_id = msg_dict.get('docs_id', None)
+ if docs_id:
+ if not store.get('docs_id', None):
+ store['docs_id'] = {}
+ store['docs_id'].update(docs_id)
+ cdocs = message.cdocs
for cdoc_key in cdocs.keys():
- print "saving cdoc"
- cdoc = self._msg_store[key]['cdocs'][cdoc_key]
+ if not store.get('cdocs', None):
+ store['cdocs'] = {}
- # FIXME this should be done in the MessageWrapper constructor
- # instead...
+ cdoc = cdocs[cdoc_key]
# first we make it weak-referenciable
referenciable_cdoc = ReferenciableDict(cdoc)
- self._msg_store[key]['cdocs'][cdoc_key] = MessagePartDoc(
- new=new, dirty=dirty, store="mem",
- part=MessagePartType.cdoc,
- content=referenciable_cdoc)
+ store['cdocs'][cdoc_key] = referenciable_cdoc
phash = cdoc.get(fields.PAYLOAD_HASH_KEY, None)
if not phash:
continue
self._phash_store[phash] = weakref.proxy(referenciable_cdoc)
- def put_message(self, mbox, uid, msg):
- """
- Put an existing message.
- """
- return NotImplementedError()
+ def prune(seq, store):
+ for key in seq:
+ if key in store and empty(store.get(key)):
+ store.pop(key)
+
+ prune(('fdoc', 'hdoc', 'cdocs', 'docs_id'), store)
+ #import ipdb; ipdb.set_trace()
+
+
+ print "after appending to store: ", key
+ import pprint; pprint.pprint(self._msg_store[key])
def get_message(self, mbox, uid):
"""
@@ -203,7 +317,13 @@ class MemoryStore(object):
"""
Remove a Message from this MemoryStore.
"""
- raise NotImplementedError()
+ try:
+ key = mbox, uid
+ self._new.discard(key)
+ self._dirty.discard(key)
+ self._msg_store.pop(key, None)
+ except Exception as exc:
+ logger.exception(exc)
# IMessageStoreWriter
@@ -211,12 +331,15 @@ class MemoryStore(object):
"""
Write the message documents in this MemoryStore to a different store.
"""
- # XXX pass if it's writing (ie, the queue is not empty...)
- # See how to make the writing_flag aware of the queue state...
- print "writing messages to producer..."
+ # For now, we pass if the queue is not empty, to avoid duplication.
+ # It would be better to use a flag to know when we've already
+ # enqueued an item.
+ if not self.producer.is_queue_empty():
+ return
+ print "Writing messages to Soledad..."
with set_bool_flag(self, self.WRITING_FLAG):
- for msg_wrapper in self.all_msg_iter():
+ for msg_wrapper in self.all_new_dirty_msg_iter():
self.producer.push(msg_wrapper)
# MemoryStore specific methods.
@@ -247,12 +370,14 @@ class MemoryStore(object):
"""
return len(self._new)
- def get_by_phash(self, phash):
+ def get_cdoc_from_phash(self, phash):
"""
Return a content-document by its payload-hash.
"""
doc = self._phash_store.get(phash, None)
+ # XXX return None for consistency?
+
# XXX have to keep a mapping between phash and its linkage
# info, to know if this payload is been already saved or not.
# We will be able to get this from the linkage-docs,
@@ -262,7 +387,40 @@ class MemoryStore(object):
return MessagePartDoc(
new=new, dirty=dirty, store="mem",
part=MessagePartType.cdoc,
- content=doc)
+ content=doc,
+ doc_id=None)
+
+ def get_fdoc_from_chash(self, chash, mbox):
+ """
+ Return a flags-document by its content-hash and a given mailbox.
+
+ :return: MessagePartDoc, or None.
+ """
+ docs_dict = self._chash_fdoc_store.get(chash, None)
+ fdoc = docs_dict.get(mbox, None) if docs_dict else None
+
+ print "GETTING FDOC BY CHASH:", fdoc
+
+ # a couple of special cases.
+ # 1. We might have a doc with empty content...
+ if empty(fdoc):
+ return None
+
+ # ...Or the message could exist but be flagged for deletion.
+ # We want to create a new one in this case.
+ # Hmm, what if the deletion is undone? We would end up with a
+ # duplicate...
+ if fdoc and fields.DELETED_FLAG in fdoc[fields.FLAGS_KEY]:
+ return None
+
+ # XXX get flags
+ new = True
+ dirty = False
+ return MessagePartDoc(
+ new=new, dirty=dirty, store="mem",
+ part=MessagePartType.fdoc,
+ content=fdoc,
+ doc_id=None)
def all_msg_iter(self):
"""
@@ -271,6 +429,25 @@ class MemoryStore(object):
return (self.get_message(*key)
for key in sorted(self._msg_store.keys()))
+ def all_new_dirty_msg_iter(self):
+ """
+ Return a generator that iterates through all new and dirty messages.
+ """
+ return (self.get_message(*key)
+ for key in sorted(self._msg_store.keys())
+ if key in self._new or key in self._dirty)
+
+ def all_deleted_uid_iter(self, mbox):
+ """
+ Return a generator that iterates through the UIDs for all messages
+ with the deleted flag set in a given mailbox.
+ """
+ all_deleted = (
+ msg['fdoc']['uid'] for msg in self._msg_store.values()
+ if msg.get('fdoc', None)
+ and fields.DELETED_FLAG in msg['fdoc']['flags'])
+ return all_deleted
+
# new, dirty flags
def _get_new_dirty_state(self, key):
@@ -289,9 +466,35 @@ class MemoryStore(object):
"""
Remove the key value from the `new` set.
"""
- print "******************"
- print "UNSETTING NEW FOR: %s" % str(key)
+ print "Unsetting NEW for: %s" % str(key)
self._new.discard(key)
+ deferreds = self._new_deferreds
+ d = deferreds.get(key, None)
+ if d:
+ # XXX use a namedtuple for passing the result
+ # when we check it on the other side.
+ d.callback('%s, ok' % str(key))
+ deferreds.pop(key)
+
+ def set_dirty(self, key):
+ """
+ Add the key value to the `dirty` set.
+ """
+ self._dirty.add(key)
+
+ def unset_dirty(self, key):
+ """
+ Remove the key value from the `dirty` set.
+ """
+ print "Unsetting DIRTY for: %s" % str(key)
+ self._dirty.discard(key)
+ deferreds = self._dirty_deferreds
+ d = deferreds.get(key, None)
+ if d:
+ # XXX use a namedtuple for passing the result
+ # when we check it on the other side.
+ d.callback('%s, ok' % str(key))
+ deferreds.pop(key)
@property
def is_writing(self):