summaryrefslogtreecommitdiff
path: root/src/leap/mail/imap/memorystore.py
diff options
context:
space:
mode:
authorKali Kaneko <kali@leap.se>2014-01-27 16:11:53 -0400
committerKali Kaneko <kali@leap.se>2014-01-28 19:38:46 -0400
commitf5365ae0c2edb8b3e879f876f2f7e42b25f4616a (patch)
tree6f9787f65e89d720739b99d7feefc30138ea890f /src/leap/mail/imap/memorystore.py
parenta5508429b90e2e9b58c5d073610ee5a10274663f (diff)
handle last_uid property in memory store
Diffstat (limited to 'src/leap/mail/imap/memorystore.py')
-rw-r--r--src/leap/mail/imap/memorystore.py236
1 files changed, 197 insertions, 39 deletions
diff --git a/src/leap/mail/imap/memorystore.py b/src/leap/mail/imap/memorystore.py
index 232a2fb..60e98c7 100644
--- a/src/leap/mail/imap/memorystore.py
+++ b/src/leap/mail/imap/memorystore.py
@@ -19,16 +19,20 @@ In-memory transient store for a LEAPIMAPServer.
"""
import contextlib
import logging
+import threading
import weakref
from collections import defaultdict
+from copy import copy
from twisted.internet import defer
from twisted.internet.task import LoopingCall
from twisted.python import log
from zope.interface import implements
+from leap.common.check import leap_assert_type
from leap.mail import size
+from leap.mail.decorators import deferred
from leap.mail.utils import empty
from leap.mail.messageflow import MessageProducer
from leap.mail.imap import interfaces
@@ -40,7 +44,10 @@ from leap.mail.imap.messageparts import ReferenciableDict
logger = logging.getLogger(__name__)
-SOLEDAD_WRITE_PERIOD = 20
+
+# The default period to do writebacks to the permanent
+# soledad storage, in seconds.
+SOLEDAD_WRITE_PERIOD = 10
@contextlib.contextmanager
@@ -76,16 +83,11 @@ class MemoryStore(object):
implements(interfaces.IMessageStore,
interfaces.IMessageStoreWriter)
- producer = None
-
# TODO We will want to index by chash when we transition to local-only
# UIDs.
- # TODO should store RECENT-FLAGS too
- # TODO should store HDOCSET too (use weakrefs!) -- will need to subclass
- # TODO do use dirty flag (maybe use namedtuples for that) so we can use it
- # also as a read-cache.
WRITING_FLAG = "_writing"
+ _last_uid_lock = threading.Lock()
def __init__(self, permanent_store=None,
write_period=SOLEDAD_WRITE_PERIOD):
@@ -138,17 +140,20 @@ class MemoryStore(object):
self._rflags_store = defaultdict(
lambda: {'doc_id': None, 'set': set([])})
- # TODO ----------------- implement mailbox-level flags store too?
- # XXX maybe we don't need this anymore...
- # let's see how good does it prefetch the headers if
- # we cache them in the store.
- self._hdocset_store = {}
- # --------------------------------------------------------------
+ """
+ last-uid store keeps the count of the highest UID
+ per mailbox.
+
+ {'mbox-a': 42,
+ 'mbox-b': 23}
+ """
+ self._last_uid = {}
# New and dirty flags, to set MessageWrapper State.
self._new = set([])
self._new_deferreds = {}
self._dirty = set([])
+ self._rflags_dirty = set([])
self._dirty_deferreds = {}
# Flag for signaling we're busy writing to the disk storage.
@@ -210,14 +215,25 @@ class MemoryStore(object):
print "adding new doc to memstore %s (%s)" % (mbox, uid)
key = mbox, uid
+ self._add_message(mbox, uid, message, notify_on_disk)
+
d = defer.Deferred()
d.addCallback(lambda result: log.msg("message save: %s" % result))
-
self._new.add(key)
+
+ # We store this deferred so we can keep track of the pending
+ # operations internally.
self._new_deferreds[key] = d
- self._add_message(mbox, uid, message, notify_on_disk)
- print "create message: ", d
- return d
+
+ if notify_on_disk:
+ # Caller wants to be notified when the message is on disk
+ # so we pass the deferred that will be fired when the message
+ # has been written.
+ return d
+ else:
+ # Caller does not care, just fired and forgot, so we pass
+ # a defer that will inmediately have its callback triggered.
+ return defer.succeed('fire-and-forget:%s' % str(key))
def put_message(self, mbox, uid, message, notify_on_disk=True):
"""
@@ -238,13 +254,14 @@ class MemoryStore(object):
:rtype: Deferred
"""
key = mbox, uid
-
d = defer.Deferred()
- d.addCallback(lambda result: log.msg("message save: %s" % result))
+ d.addCallback(lambda result: log.msg("message PUT save: %s" % result))
self._dirty.add(key)
self._dirty_deferreds[key] = d
self._add_message(mbox, uid, message, notify_on_disk)
+ #print "dirty ", self._dirty
+ #print "new ", self._new
return d
def _add_message(self, mbox, uid, message, notify_on_disk=True):
@@ -315,6 +332,19 @@ class MemoryStore(object):
store.pop(key)
prune((FDOC, HDOC, CDOCS, DOCS_ID), store)
+ #print "after adding: "
+ #import pprint; pprint.pprint(self._msg_store[key])
+
+ def get_docid_for_fdoc(self, mbox, uid):
+ """
+ Get Soledad document id for the flags-doc for a given mbox and uid.
+ """
+ fdoc = self._permanent_store.get_flags_doc(mbox, uid)
+ if not fdoc:
+ return None
+ doc_id = fdoc.doc_id
+ return doc_id
+
def get_message(self, mbox, uid):
"""
Get a MessageWrapper for the given mbox and uid combination.
@@ -326,6 +356,8 @@ class MemoryStore(object):
if msg_dict:
new, dirty = self._get_new_dirty_state(key)
return MessageWrapper(from_dict=msg_dict,
+ new=new,
+ dirty=dirty,
memstore=weakref.proxy(self))
else:
return None
@@ -334,6 +366,13 @@ class MemoryStore(object):
"""
Remove a Message from this MemoryStore.
"""
+ # XXX For the moment we are only removing the flags and headers
+ # docs. The rest we leave there polluting your hard disk,
+ # until we think about a good way of deorphaning.
+
+ # XXX implement elijah's idea of using a PUT document as a
+ # token to ensure consistency in the removal.
+
try:
key = mbox, uid
self._new.discard(key)
@@ -348,18 +387,22 @@ class MemoryStore(object):
"""
Write the message documents in this MemoryStore to a different store.
"""
- # For now, we pass if the queue is not empty, to avoid duplication.
+ # For now, we pass if the queue is not empty, to avoid duplicate
+ # queuing.
# We would better use a flag to know when we've already enqueued an
# item.
+
+ # XXX this could return the deferred for all the enqueued operations
+
if not self.producer.is_queue_empty():
return
print "Writing messages to Soledad..."
with set_bool_flag(self, self.WRITING_FLAG):
- for msg_wrapper in self.all_new_dirty_msg_iter():
- self.producer.push(msg_wrapper)
for rflags_doc_wrapper in self.all_rdocs_iter():
self.producer.push(rflags_doc_wrapper)
+ for msg_wrapper in self.all_new_dirty_msg_iter():
+ self.producer.push(msg_wrapper)
# MemoryStore specific methods.
@@ -370,12 +413,61 @@ class MemoryStore(object):
all_keys = self._msg_store.keys()
return [uid for m, uid in all_keys if m == mbox]
+ # last_uid
+
def get_last_uid(self, mbox):
"""
Get the highest UID for a given mbox.
+ It will be the highest between the highest uid in the message store for
+ the mailbox, and the soledad integer cache.
"""
uids = self.get_uids(mbox)
- return uids and max(uids) or 0
+ last_mem_uid = uids and max(uids) or 0
+ last_soledad_uid = self.get_last_soledad_uid(mbox)
+ return max(last_mem_uid, last_soledad_uid)
+
+ def get_last_soledad_uid(self, mbox):
+ """
+ Get last uid for a given mbox from the soledad integer cache.
+ """
+ return self._last_uid.get(mbox, 0)
+
+ def set_last_soledad_uid(self, mbox, value):
+ """
+ Set last uid for a given mbox in the soledad integer cache.
+ SoledadMailbox should prime this value during initialization.
+ Other methods (during message adding) SHOULD call
+ `increment_last_soledad_uid` instead.
+ """
+ leap_assert_type(value, int)
+ print "setting last soledad uid for ", mbox, "to", value
+ # if we already have a vlue here, don't do anything
+ with self._last_uid_lock:
+ if not self._last_uid.get(mbox, None):
+ self._last_uid[mbox] = value
+
+ def increment_last_soledad_uid(self, mbox):
+ """
+ Increment by one the soledad integer cache for the last_uid for
+ this mbox, and fire a defer-to-thread to update the soledad value.
+ The caller should lock the call tho this method.
+ """
+ with self._last_uid_lock:
+ self._last_uid[mbox] += 1
+ value = self._last_uid[mbox]
+ self.write_last_uid(mbox, value)
+ return value
+
+ @deferred
+ def write_last_uid(self, mbox, value):
+ """
+ Increment the soledad cache,
+ """
+ leap_assert_type(value, int)
+ if self._permanent_store:
+ self._permanent_store.write_last_uid(mbox, value)
+
+ # Counting sheeps...
def count_new_mbox(self, mbox):
"""
@@ -418,14 +510,12 @@ class MemoryStore(object):
docs_dict = self._chash_fdoc_store.get(chash, None)
fdoc = docs_dict.get(mbox, None) if docs_dict else None
- print "GETTING FDOC BY CHASH:", fdoc
-
# a couple of special cases.
# 1. We might have a doc with empty content...
if empty(fdoc):
return None
- # ...Or the message could exist, but being flagged for deletion.
+ # 2. ...Or the message could exist, but being flagged for deletion.
# We want to create a new one in this case.
# Hmmm what if the deletion is un-done?? We would end with a
# duplicate...
@@ -456,15 +546,22 @@ class MemoryStore(object):
for key in sorted(self._msg_store.keys())
if key in self._new or key in self._dirty)
+ def all_msg_dict_for_mbox(self, mbox):
+ """
+ Return all the message dicts for a given mbox.
+ """
+ return [self._msg_store[(mb, uid)]
+ for mb, uid in self._msg_store if mb == mbox]
+
def all_deleted_uid_iter(self, mbox):
"""
Return generator that iterates through the UIDs for all messags
with deleted flag in a given mailbox.
"""
- all_deleted = (
- msg['fdoc']['uid'] for msg in self._msg_store.values()
+ all_deleted = [
+ msg['fdoc']['uid'] for msg in self.all_msg_dict_for_mbox(mbox)
if msg.get('fdoc', None)
- and fields.DELETED_FLAG in msg['fdoc']['flags'])
+ and fields.DELETED_FLAG in msg['fdoc']['flags']]
return all_deleted
# new, dirty flags
@@ -473,6 +570,7 @@ class MemoryStore(object):
"""
Return `new` and `dirty` flags for a given message.
"""
+ # XXX should return *first* the news, and *then* the dirty...
return map(lambda _set: key in _set, (self._new, self._dirty))
def set_new(self, key):
@@ -485,7 +583,7 @@ class MemoryStore(object):
"""
Remove the key value from the `new` set.
"""
- print "Unsetting NEW for: %s" % str(key)
+ #print "Unsetting NEW for: %s" % str(key)
self._new.discard(key)
deferreds = self._new_deferreds
d = deferreds.get(key, None)
@@ -505,7 +603,7 @@ class MemoryStore(object):
"""
Remove the key value from the `dirty` set.
"""
- print "Unsetting DIRTY for: %s" % str(key)
+ #print "Unsetting DIRTY for: %s" % str(key)
self._dirty.discard(key)
deferreds = self._dirty_deferreds
d = deferreds.get(key, None)
@@ -522,6 +620,7 @@ class MemoryStore(object):
"""
Set the `Recent` flag for a given mailbox and UID.
"""
+ self._rflags_dirty.add(mbox)
self._rflags_store[mbox]['set'].add(uid)
# TODO --- nice but unused
@@ -536,6 +635,7 @@ class MemoryStore(object):
Set the value for the set of the recent flags.
Used from the property in the MessageCollection.
"""
+ self._rflags_dirty.add(mbox)
self._rflags_store[mbox]['set'] = set(value)
def load_recent_flags(self, mbox, flags_doc):
@@ -568,23 +668,81 @@ class MemoryStore(object):
:rtype: generator
"""
- rflags_store = self._rflags_store
-
# XXX use enums
DOC_ID = "doc_id"
SET = "set"
- print "LEN RFLAGS_STORE ------->", len(rflags_store)
- return (
- RecentFlagsDoc(
+ rflags_store = self._rflags_store
+
+ def get_rdoc(mbox, rdict):
+ mbox_rflag_set = rdict[SET]
+ recent_set = copy(mbox_rflag_set)
+ # zero it!
+ mbox_rflag_set.difference_update(mbox_rflag_set)
+ return RecentFlagsDoc(
doc_id=rflags_store[mbox][DOC_ID],
content={
fields.TYPE_KEY: fields.TYPE_RECENT_VAL,
fields.MBOX_KEY: mbox,
- fields.RECENTFLAGS_KEY: list(
- rflags_store[mbox][SET])
+ fields.RECENTFLAGS_KEY: list(recent_set)
})
- for mbox in rflags_store)
+
+ return (get_rdoc(mbox, rdict) for mbox, rdict in rflags_store.items()
+ if not empty(rdict[SET]))
+
+ # Methods that mirror the IMailbox interface
+
+ def remove_all_deleted(self, mbox):
+ """
+ Remove all messages flagged \\Deleted from this Memory Store only.
+ Called from `expunge`
+ """
+ mem_deleted = self.all_deleted_uid_iter(mbox)
+ for uid in mem_deleted:
+ self.remove_message(mbox, uid)
+ return mem_deleted
+
+ def expunge(self, mbox):
+ """
+ Remove all messages flagged \\Deleted, from the Memory Store
+ and from the permanent store also.
+ """
+ # TODO expunge should add itself as a callback to the ongoing
+ # writes.
+ soledad_store = self._permanent_store
+
+ try:
+ # 1. Stop the writing call
+ self._stop_write_loop()
+ # 2. Enqueue a last write.
+ #self.write_messages(soledad_store)
+ # 3. Should wait on the writebacks to finish ???
+ # FIXME wait for this, and add all the rest of the method
+ # as a callback!!!
+ except Exception as exc:
+ logger.exception(exc)
+
+ # Now, we...:
+
+ try:
+ # 1. Delete all messages marked as deleted in soledad.
+
+ # XXX this could be deferred for faster operation.
+ if soledad_store:
+ sol_deleted = soledad_store.remove_all_deleted(mbox)
+ else:
+ sol_deleted = []
+
+ # 2. Delete all messages marked as deleted in memory.
+ mem_deleted = self.remove_all_deleted(mbox)
+
+ all_deleted = set(mem_deleted).union(set(sol_deleted))
+ print "deleted ", all_deleted
+ except Exception as exc:
+ logger.exception(exc)
+ finally:
+ self._start_write_loop()
+ return all_deleted
# Dump-to-disk controls.