Diffstat (limited to 'src/leap/mail/imap/memorystore.py')
-rw-r--r--  src/leap/mail/imap/memorystore.py  581
1 file changed, 429 insertions(+), 152 deletions(-)
diff --git a/src/leap/mail/imap/memorystore.py b/src/leap/mail/imap/memorystore.py
index ed2b3f2..f23a234 100644
--- a/src/leap/mail/imap/memorystore.py
+++ b/src/leap/mail/imap/memorystore.py
@@ -25,6 +25,7 @@ import weakref
from collections import defaultdict
from copy import copy
+from enum import Enum
from twisted.internet import defer
from twisted.internet.task import LoopingCall
from twisted.python import log
@@ -32,8 +33,7 @@ from zope.interface import implements
from leap.common.check import leap_assert_type
from leap.mail import size
-from leap.mail.decorators import deferred_to_thread
-from leap.mail.utils import empty
+from leap.mail.utils import empty, phash_iter
from leap.mail.messageflow import MessageProducer
from leap.mail.imap import interfaces
from leap.mail.imap.fields import fields
@@ -42,12 +42,19 @@ from leap.mail.imap.messageparts import RecentFlagsDoc
from leap.mail.imap.messageparts import MessageWrapper
from leap.mail.imap.messageparts import ReferenciableDict
+from leap.mail.decorators import deferred_to_thread
+
logger = logging.getLogger(__name__)
# The default period to do writebacks to the permanent
# soledad storage, in seconds.
-SOLEDAD_WRITE_PERIOD = 10
+SOLEDAD_WRITE_PERIOD = 15
+
+FDOC = MessagePartType.fdoc.key
+HDOC = MessagePartType.hdoc.key
+CDOCS = MessagePartType.cdocs.key
+DOCS_ID = MessagePartType.docs_id.key
@contextlib.contextmanager
@@ -65,6 +72,9 @@ def set_bool_flag(obj, att):
setattr(obj, att, False)
+DirtyState = Enum("none", "dirty", "new")
+
+
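[The `DirtyState` enum above lets callers of `get_message` force the wrapper state instead of looking it up in the `_new`/`_dirty` sets. The commit relies on the old PyPI `enum` package, whose constructor takes the value names positionally; below is a minimal sketch of the same tri-state logic using the stdlib `enum` module — an illustration only, not the project's code:]

    from enum import Enum

    class DirtyState(Enum):
        none = 0   # look the state up in the _new/_dirty sets
        dirty = 1  # force the message to be written as an update
        new = 2    # force the message to be written as a new document

    def resolve_state(dirtystate, lookup):
        # Mirrors the branching in MemoryStore.get_message.
        if dirtystate == DirtyState.none:
            return lookup()         # (new, dirty) from the sets
        if dirtystate == DirtyState.dirty:
            return False, True
        return True, False

    print(resolve_state(DirtyState.dirty, lambda: (False, False)))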
class MemoryStore(object):
"""
An in-memory store to where we can write the different parts that
@@ -88,6 +98,7 @@ class MemoryStore(object):
WRITING_FLAG = "_writing"
_last_uid_lock = threading.Lock()
+ _fdoc_docid_lock = threading.Lock()
def __init__(self, permanent_store=None,
write_period=SOLEDAD_WRITE_PERIOD):
@@ -100,11 +111,19 @@ class MemoryStore(object):
:param write_period: the interval to dump messages to disk, in seconds.
:type write_period: int
"""
+ from twisted.internet import reactor
+ self.reactor = reactor
+
self._permanent_store = permanent_store
self._write_period = write_period
# Internal Storage: messages
- self._msg_store = {}
+ """
+ flags document store.
+ _fdoc_store[mbox][uid] = { 'content': 'aaa' }
+ """
+ self._fdoc_store = defaultdict(lambda: defaultdict(
+ lambda: ReferenciableDict({})))
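[The nested defaultdict replacing the flat `_msg_store` auto-creates the per-mailbox and per-uid entries on first access, so writers no longer need the try/except KeyError dance that `_add_message` used before. The flip side, sketched below with plain dicts standing in for ReferenciableDict, is that a mere read also inserts an empty placeholder — which is why the `empty()` checks and the `purge_fdoc_store` sweep appear further down:]

    from collections import defaultdict

    # mbox -> uid -> flags-doc dict, same shape as _fdoc_store
    fdoc_store = defaultdict(lambda: defaultdict(dict))

    fdoc_store["INBOX"][1].update({"flags": ["\\Seen"]})
    print(fdoc_store["INBOX"][1])    # {'flags': ['\\Seen']}

    # Reading a missing uid silently inserts an empty placeholder:
    _ = fdoc_store["INBOX"][2]
    print(2 in fdoc_store["INBOX"])  # True, although nothing was stored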
# Sizes
"""
@@ -114,9 +133,28 @@ class MemoryStore(object):
# Internal Storage: fdoc-ids
"""
- {'phash': weakreaf.proxy(dict)}
+ fdocs:doc-id store, keeps the document IDs used to
+ put the dirty flags-docs.
+ """
+ self._fdoc_id_store = defaultdict(lambda: defaultdict(
+ lambda: ''))
+
+ # Internal Storage: content-hash:hdoc
+ """
+ hdoc-store keeps references to
+ the header-documents indexed by content-hash.
+
+ {'chash': { dict-stuff } }
+ """
+ self._hdoc_store = defaultdict(lambda: ReferenciableDict({}))
+
+ # Internal Storage: payload-hash:cdoc
+ """
+ content-docs stored by payload-hash
+ {'phash': { dict-stuff } }
"""
- self._phash_store = {}
+ self._cdoc_store = defaultdict(lambda: ReferenciableDict({}))
# Internal Storage: content-hash:fdoc
"""
@@ -127,7 +165,7 @@ class MemoryStore(object):
'mbox-b': weakref.proxy(dict)}
}
"""
- self._chash_fdoc_store = {}
+ self._chash_fdoc_store = defaultdict(lambda: defaultdict(lambda: None))
# Internal Storage: recent-flags store
"""
@@ -153,7 +191,7 @@ class MemoryStore(object):
{'mbox-a': 42,
'mbox-b': 23}
"""
- self._last_uid = {}
+ self._last_uid = defaultdict(lambda: 0)
"""
known-uids keeps a count of the uids that soledad knows for a given
@@ -165,11 +203,15 @@ class MemoryStore(object):
# New and dirty flags, to set MessageWrapper State.
self._new = set([])
+ self._new_queue = set([])
self._new_deferreds = {}
+
self._dirty = set([])
- self._rflags_dirty = set([])
+ self._dirty_queue = set([])
self._dirty_deferreds = {}
+ self._rflags_dirty = set([])
+
# Flag for signaling we're busy writing to the disk storage.
setattr(self, self.WRITING_FLAG, False)
@@ -185,11 +227,17 @@ class MemoryStore(object):
# We can start the write loop right now, why wait?
self._start_write_loop()
+ else:
+ # We have a memory-only store.
+ self.producer = None
+ self._write_loop = None
def _start_write_loop(self):
"""
Start loop for writing to disk database.
"""
+ if self._write_loop is None:
+ return
if not self._write_loop.running:
self._write_loop.start(self._write_period, now=True)
@@ -197,6 +245,8 @@ class MemoryStore(object):
"""
Stop loop for writing to disk database.
"""
+ if self._write_loop is None:
+ return
if self._write_loop.running:
self._write_loop.stop()
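[The two guards above make start/stop no-ops for memory-only stores, where `_write_loop` is None. For reference, a standalone sketch of the writeback-loop pattern used here — a twisted `LoopingCall` fired every `write_period` seconds; the names are illustrative:]

    from twisted.internet import reactor, task

    def writeback():
        # stands in for write_messages(permanent_store)
        print("flushing new/dirty documents to soledad...")

    write_loop = task.LoopingCall(writeback)

    def start_write_loop(period=15):        # SOLEDAD_WRITE_PERIOD
        if not write_loop.running:
            write_loop.start(period, now=True)

    def stop_write_loop():
        if write_loop.running:
            write_loop.stop()

    start_write_loop()
    reactor.callLater(35, stop_write_loop)
    reactor.callLater(36, reactor.stop)
    reactor.run()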
@@ -230,34 +280,30 @@ class MemoryStore(object):
be fired.
:type notify_on_disk: bool
"""
- log.msg("adding new doc to memstore %r (%r)" % (mbox, uid))
+ log.msg("Adding new doc to memstore %r (%r)" % (mbox, uid))
key = mbox, uid
self._add_message(mbox, uid, message, notify_on_disk)
self._new.add(key)
- # XXX use this while debugging the callback firing,
- # remove after unittesting this.
- #def log_add(result):
- #return result
- #observer.addCallback(log_add)
-
- if notify_on_disk:
- # We store this deferred so we can keep track of the pending
- # operations internally.
- # TODO this should fire with the UID !!! -- change that in
- # the soledad store code.
- self._new_deferreds[key] = observer
- if not notify_on_disk:
- # Caller does not care, just fired and forgot, so we pass
- # a defer that will inmediately have its callback triggered.
- observer.callback(uid)
+ if observer is not None:
+ if notify_on_disk:
+ # We store this deferred so we can keep track of the pending
+ # operations internally.
+ # TODO this should fire with the UID !!! -- change that in
+ # the soledad store code.
+ self._new_deferreds[key] = observer
+
+ else:
+ # Caller does not care; fire and forget, so we trigger
+ # the observer's callback immediately from the reactor thread.
+ self.reactor.callFromThread(observer.callback, uid)
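[Replacing the direct `observer.callback(uid)` with `reactor.callFromThread` matters because `create_message` can be reached from a threadpool thread, and Deferred callbacks must only run in the reactor thread. A self-contained sketch of the pattern:]

    import threading
    from twisted.internet import reactor, defer

    def report(uid):
        print("message stored with uid %s" % uid)

    def store_from_worker(observer, uid):
        # schedule observer.callback(uid) in the reactor thread,
        # the only thread in which a Deferred may be fired
        reactor.callFromThread(observer.callback, uid)

    d = defer.Deferred()
    d.addCallback(report)
    threading.Thread(target=store_from_worker, args=(d, 42)).start()
    reactor.callLater(1, reactor.stop)
    reactor.run()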
def put_message(self, mbox, uid, message, notify_on_disk=True):
"""
Put an existing message.
- This will set the dirty flag on the MemoryStore.
+ This will also set the dirty flag on the MemoryStore.
:param mbox: the mailbox
:type mbox: str or unicode
@@ -289,76 +335,59 @@ class MemoryStore(object):
Helper method, called by both create_message and put_message.
See those for parameter documentation.
"""
- # XXX have to differentiate between notify_new and notify_dirty
- # TODO defaultdict the hell outa here...
-
- key = mbox, uid
msg_dict = message.as_dict()
- FDOC = MessagePartType.fdoc.key
- HDOC = MessagePartType.hdoc.key
- CDOCS = MessagePartType.cdocs.key
- DOCS_ID = MessagePartType.docs_id.key
-
- try:
- store = self._msg_store[key]
- except KeyError:
- self._msg_store[key] = {FDOC: {},
- HDOC: {},
- CDOCS: {},
- DOCS_ID: {}}
- store = self._msg_store[key]
-
fdoc = msg_dict.get(FDOC, None)
- if fdoc:
- if not store.get(FDOC, None):
- store[FDOC] = ReferenciableDict({})
- store[FDOC].update(fdoc)
+ if fdoc is not None:
+ fdoc_store = self._fdoc_store[mbox][uid]
+ fdoc_store.update(fdoc)
+ chash_fdoc_store = self._chash_fdoc_store
# content-hash indexing
chash = fdoc.get(fields.CONTENT_HASH_KEY)
- chash_fdoc_store = self._chash_fdoc_store
- if not chash in chash_fdoc_store:
- chash_fdoc_store[chash] = {}
-
chash_fdoc_store[chash][mbox] = weakref.proxy(
- store[FDOC])
+ self._fdoc_store[mbox][uid])
hdoc = msg_dict.get(HDOC, None)
if hdoc is not None:
- if not store.get(HDOC, None):
- store[HDOC] = ReferenciableDict({})
- store[HDOC].update(hdoc)
-
- docs_id = msg_dict.get(DOCS_ID, None)
- if docs_id:
- if not store.get(DOCS_ID, None):
- store[DOCS_ID] = {}
- store[DOCS_ID].update(docs_id)
+ chash = hdoc.get(fields.CONTENT_HASH_KEY)
+ hdoc_store = self._hdoc_store[chash]
+ hdoc_store.update(hdoc)
cdocs = message.cdocs
- for cdoc_key in cdocs.keys():
- if not store.get(CDOCS, None):
- store[CDOCS] = {}
-
- cdoc = cdocs[cdoc_key]
- # first we make it weak-referenciable
- referenciable_cdoc = ReferenciableDict(cdoc)
- store[CDOCS][cdoc_key] = referenciable_cdoc
+ for cdoc in cdocs.values():
phash = cdoc.get(fields.PAYLOAD_HASH_KEY, None)
if not phash:
continue
- self._phash_store[phash] = weakref.proxy(referenciable_cdoc)
+ cdoc_store = self._cdoc_store[phash]
+ cdoc_store.update(cdoc)
- def prune(seq, store):
- for key in seq:
- if key in store and empty(store.get(key)):
- store.pop(key)
+ # Update memory store size
+ key = mbox, uid
+ self._sizes[key] = size.get_size(self._fdoc_store[mbox][uid])
+ # TODO add hdoc and cdocs sizes too
- prune((FDOC, HDOC, CDOCS, DOCS_ID), store)
+ def purge_fdoc_store(self, mbox):
+ """
+ Purge the empty documents from the fdoc store.
+ Called during initialization of the SoledadMailbox.
- # Update memory store size
- self._sizes[key] = size(self._msg_store[key])
+ :param mbox: the mailbox
+ :type mbox: str or unicode
+ """
+ # XXX This is really a workaround until I find the conditions
+ # that are making the empty items remain there.
+ # This happens, for instance, after running the regression test
+ # several times: it issues store deleted + expunge + select.
+ # The items are being correctly deleted, but in successive appends
+ # the empty items with previously deleted uids reappear as empty
+ # documents. I suspect it's a timing condition with a previously
+ # evaluated sequence being used after the items have been removed.
+
+ for uid, value in self._fdoc_store[mbox].items():
+ if empty(value):
+ del self._fdoc_store[mbox][uid]
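[The sweep above deletes entries while iterating over `.items()`, which is safe under Python 2 because `.items()` returns a materialized list. A defensive variant that also works with view-based dicts — an illustrative sketch, not part of the commit:]

    def purge_empty(fdoc_store, mbox):
        # snapshot the items before deleting: required for Python 3
        # dict views, harmless for Python 2 lists
        for uid, value in list(fdoc_store[mbox].items()):
            if not value:            # stands in for empty(value)
                del fdoc_store[mbox][uid]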
def get_docid_for_fdoc(self, mbox, uid):
"""
@@ -371,13 +400,20 @@ class MemoryStore(object):
:type uid: int
:rtype: unicode or None
"""
- fdoc = self._permanent_store.get_flags_doc(mbox, uid)
- if empty(fdoc):
- return None
- doc_id = fdoc.doc_id
+ with self._fdoc_docid_lock:
+ doc_id = self._fdoc_id_store[mbox][uid]
+
+ if empty(doc_id):
+ fdoc = self._permanent_store.get_flags_doc(mbox, uid)
+ if empty(fdoc) or empty(fdoc.content):
+ return None
+ doc_id = fdoc.doc_id
+ self._fdoc_id_store[mbox][uid] = doc_id
+
return doc_id
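[`get_docid_for_fdoc` is now a read-through cache: the doc-id is first looked up in `_fdoc_id_store` under the lock, and only on a miss does it fall back to a costly soledad query whose result is memoized. A standalone sketch of the same pattern, with the hypothetical `backend_lookup` standing in for the `get_flags_doc` call:]

    import threading
    from collections import defaultdict

    class DocIdCache(object):
        def __init__(self, backend_lookup):
            self._lock = threading.Lock()
            self._ids = defaultdict(lambda: defaultdict(str))
            self._lookup = backend_lookup   # hits permanent storage

        def get(self, mbox, uid):
            with self._lock:
                doc_id = self._ids[mbox][uid]
            if not doc_id:
                doc_id = self._lookup(mbox, uid)
                if doc_id is None:
                    return None
                with self._lock:
                    self._ids[mbox][uid] = doc_id
            return doc_id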
- def get_message(self, mbox, uid, flags_only=False):
+ def get_message(self, mbox, uid, dirtystate=DirtyState.none,
+ flags_only=False):
"""
Get a MessageWrapper for the given mbox and uid combination.
@@ -385,25 +421,58 @@ class MemoryStore(object):
:type mbox: str or unicode
:param uid: the message UID
:type uid: int
+ :param dirtystate: DirtyState enum: one of `dirty`, `new`
+ or `none` (default)
+ :type dirtystate: enum
:param flags_only: whether the message should carry only a reference
to the flags document.
:type flags_only: bool
:return: MessageWrapper or None
"""
+ if dirtystate == DirtyState.dirty:
+ flags_only = True
+
key = mbox, uid
- FDOC = MessagePartType.fdoc.key
- msg_dict = self._msg_store.get(key, None)
- if empty(msg_dict):
+ fdoc = self._fdoc_store[mbox][uid]
+ if empty(fdoc):
return None
- new, dirty = self._get_new_dirty_state(key)
+
+ new, dirty = False, False
+ if dirtystate == DirtyState.none:
+ new, dirty = self._get_new_dirty_state(key)
+ elif dirtystate == DirtyState.dirty:
+ new, dirty = False, True
+ elif dirtystate == DirtyState.new:
+ new, dirty = True, False
+
if flags_only:
- return MessageWrapper(fdoc=msg_dict[FDOC],
+ return MessageWrapper(fdoc=fdoc,
new=new, dirty=dirty,
memstore=weakref.proxy(self))
else:
- return MessageWrapper(from_dict=msg_dict,
+ chash = fdoc.get(fields.CONTENT_HASH_KEY)
+ hdoc = self._hdoc_store[chash]
+ if empty(hdoc):
+ hdoc = self._permanent_store.get_headers_doc(chash)
+ if empty(hdoc):
+ return None
+ if not empty(hdoc.content):
+ self._hdoc_store[chash] = hdoc.content
+ hdoc = hdoc.content
+ cdocs = None
+
+ pmap = hdoc.get(fields.PARTS_MAP_KEY, None)
+ if new and pmap is not None:
+ # take the different cdocs for write...
+ cdoc_store = self._cdoc_store
+ cdocs_list = phash_iter(hdoc)
+ cdocs = dict(enumerate(
+ [cdoc_store[phash] for phash in cdocs_list], 1))
+
+ return MessageWrapper(fdoc=fdoc, hdoc=hdoc, cdocs=cdocs,
new=new, dirty=dirty,
memstore=weakref.proxy(self))
@@ -424,23 +493,36 @@ class MemoryStore(object):
# token to ensure consistency in the removal.
try:
+ del self._fdoc_store[mbox][uid]
+ except KeyError:
+ pass
+
+ try:
key = mbox, uid
self._new.discard(key)
self._dirty.discard(key)
- self._msg_store.pop(key, None)
if key in self._sizes:
del self._sizes[key]
-
+ self._known_uids[mbox].discard(uid)
+ except Exception as exc:
+ logger.error("error while removing message!")
+ logger.exception(exc)
+ try:
+ with self._fdoc_docid_lock:
+ del self._fdoc_id_store[mbox][uid]
except Exception as exc:
+ logger.error("error while removing message!")
logger.exception(exc)
# IMessageStoreWriter
+ @deferred_to_thread
def write_messages(self, store):
"""
Write the message documents in this MemoryStore to a different store.
:param store: the IMessageStore to write to
+ :return: False if the queue is not empty, None otherwise.
"""
# For now, we pass if the queue is not empty, to avoid duplicate
# queuing.
@@ -450,7 +532,7 @@ class MemoryStore(object):
# XXX this could return the deferred for all the enqueued operations
if not self.producer.is_queue_empty():
- return
+ return False
if any(map(lambda i: not empty(i), (self._new, self._dirty))):
logger.info("Writing messages to Soledad...")
@@ -459,9 +541,14 @@ class MemoryStore(object):
# is acquired
with set_bool_flag(self, self.WRITING_FLAG):
for rflags_doc_wrapper in self.all_rdocs_iter():
- self.producer.push(rflags_doc_wrapper)
- for msg_wrapper in self.all_new_dirty_msg_iter():
- self.producer.push(msg_wrapper)
+ self.producer.push(rflags_doc_wrapper,
+ state=self.producer.STATE_DIRTY)
+ for msg_wrapper in self.all_new_msg_iter():
+ self.producer.push(msg_wrapper,
+ state=self.producer.STATE_NEW)
+ for msg_wrapper in self.all_dirty_msg_iter():
+ self.producer.push(msg_wrapper,
+ state=self.producer.STATE_DIRTY)
# MemoryStore specific methods.
@@ -473,8 +560,7 @@ class MemoryStore(object):
:type mbox: str or unicode
:rtype: list
"""
- all_keys = self._msg_store.keys()
- return [uid for m, uid in all_keys if m == mbox]
+ return self._fdoc_store[mbox].keys()
def get_soledad_known_uids(self, mbox):
"""
@@ -523,7 +609,8 @@ class MemoryStore(object):
:param value: the value to set
:type value: int
"""
- leap_assert_type(value, int)
+ # the value can also be a long, so we cannot assert it is an int
+ #leap_assert_type(value, int)
logger.info("setting last soledad uid for %s to %s" %
(mbox, value))
# if we already have a value here, don't do anything
@@ -555,10 +642,9 @@ class MemoryStore(object):
with self._last_uid_lock:
self._last_uid[mbox] += 1
value = self._last_uid[mbox]
- self.write_last_uid(mbox, value)
+ self.reactor.callInThread(self.write_last_uid, mbox, value)
return value
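[The counter increment stays synchronous under `_last_uid_lock`, while the soledad write is offloaded to the reactor's thread pool with `callInThread` (the `@deferred_to_thread` decorator on `write_last_uid` is dropped just below). The pattern, sketched standalone with illustrative names:]

    import threading
    from twisted.internet import reactor

    _last_uid = {"INBOX": 41}
    _last_uid_lock = threading.Lock()

    def write_last_uid(mbox, value):
        # blocking write; runs in a threadpool thread
        print("persisting last uid %s for %s" % (value, mbox))

    def increment_last_soledad_uid(mbox):
        with _last_uid_lock:
            _last_uid[mbox] += 1
            value = _last_uid[mbox]
        # hand the blocking write to the thread pool and return at once
        reactor.callInThread(write_last_uid, mbox, value)
        return value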
- @deferred_to_thread
def write_last_uid(self, mbox, value):
"""
Increment the soledad integer cache for the highest uid value.
@@ -572,11 +658,112 @@ class MemoryStore(object):
if self._permanent_store:
self._permanent_store.write_last_uid(mbox, value)
+ def load_flag_docs(self, mbox, flag_docs):
+ """
+ Load the flag documents for the given mbox.
+ Used during initial flag docs prefetch.
+
+ :param mbox: the mailbox
+ :type mbox: str or unicode
+ :param flag_docs: a dict with the content for the flag docs, indexed
+ by uid.
+ :type flag_docs: dict
+ """
+ # We can do direct assignments because we know this will only
+ # be called during initialization of the mailbox.
+ # TODO could hook here a sanity-check
+ # for duplicates
+
+ fdoc_store = self._fdoc_store[mbox]
+ chash_fdoc_store = self._chash_fdoc_store
+ for uid in flag_docs:
+ rdict = ReferenciableDict(flag_docs[uid])
+ fdoc_store[uid] = rdict
+ # populate chash dict too, to avoid fdoc duplication
+ chash = flag_docs[uid]["chash"]
+ chash_fdoc_store[chash][mbox] = weakref.proxy(
+ self._fdoc_store[mbox][uid])
+
+ def update_flags(self, mbox, uid, fdoc):
+ """
+ Update the flag document for a given mbox and uid combination,
+ and set the dirty flag.
+ We could use put_message, but this is faster.
+
+ :param mbox: the mailbox
+ :type mbox: str or unicode
+ :param uid: the uid of the message
+ :type uid: int
+
+ :param fdoc: a dict with the content for the flag docs
+ :type fdoc: dict
+ """
+ key = mbox, uid
+ self._fdoc_store[mbox][uid].update(fdoc)
+ self._dirty.add(key)
+
+ def load_header_docs(self, header_docs):
+ """
+ Load the header documents, indexed by content-hash.
+ Used during the initial header-docs prefetch, and to cache a
+ header document after a read from soledad, when the hdoc
+ property of a message did not find its value in here.
+
+ :param header_docs: a dict with the content for the header docs,
+ indexed by content-hash.
+ :type header_docs: dict
+ """
+ hdoc_store = self._hdoc_store
+ for chash in header_docs:
+ hdoc_store[chash] = ReferenciableDict(header_docs[chash])
+
+ def all_flags(self, mbox):
+ """
+ Return a dictionary with all the flags for a given mbox.
+
+ :param mbox: the mailbox
+ :type mbox: str or unicode
+ :rtype: dict
+ """
+ flags_dict = {}
+ uids = self.get_uids(mbox)
+ fdoc_store = self._fdoc_store[mbox]
+
+ for uid in uids:
+ try:
+ flags = fdoc_store[uid][fields.FLAGS_KEY]
+ flags_dict[uid] = flags
+ except KeyError:
+ continue
+ return flags_dict
+
+ def all_headers(self, mbox):
+ """
+ Return a dictionary with all the header docs for a given mbox.
+
+ :param mbox: the mailbox
+ :type mbox: str or unicode
+ :rtype: dict
+ """
+ headers_dict = {}
+ uids = self.get_uids(mbox)
+ fdoc_store = self._fdoc_store[mbox]
+ hdoc_store = self._hdoc_store
+
+ for uid in uids:
+ try:
+ chash = fdoc_store[uid][fields.CONTENT_HASH_KEY]
+ hdoc = hdoc_store[chash]
+ if not empty(hdoc):
+ headers_dict[uid] = hdoc
+ except KeyError:
+ continue
+ return headers_dict
+
# Counting sheep...
def count_new_mbox(self, mbox):
"""
- Count the new messages by inbox.
+ Count the new messages by mailbox.
:param mbox: the mailbox
:type mbox: str or unicode
@@ -594,6 +781,33 @@ class MemoryStore(object):
"""
return len(self._new)
+ def count(self, mbox):
+ """
+ Return the count of messages for a given mbox.
+
+ :param mbox: the mailbox
+ :type mbox: str or unicode
+ :return: number of messages
+ :rtype: int
+ """
+ return len(self._fdoc_store[mbox])
+
+ def unseen_iter(self, mbox):
+ """
+ Get the UIDs of the messages with no `seen` flag
+ for a given mailbox.
+
+ :param mbox: the mailbox
+ :type mbox: str or unicode
+ :return: list of UIDs for messages without the `seen` flag
+ :rtype: list
+ """
+ fdocs = self._fdoc_store[mbox]
+
+ return [uid for uid, value
+ in fdocs.items()
+ if fields.SEEN_FLAG not in value.get(fields.FLAGS_KEY, [])]
+
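[Despite its name, `unseen_iter` returns a materialized list, so callers can mutate the store while consuming it. A standalone sketch of the filter over a store shaped like `_fdoc_store[mbox]`, assuming `fields.FLAGS_KEY` is "flags" and `fields.SEEN_FLAG` is "\\Seen":]

    SEEN_FLAG = "\\Seen"

    fdocs = {
        1: {"flags": ["\\Seen"]},
        2: {"flags": []},
        3: {},   # empty placeholder: .get() keeps it from raising
    }

    unseen = [uid for uid, value in fdocs.items()
              if SEEN_FLAG not in value.get("flags", [])]
    print(sorted(unseen))   # [2, 3]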
def get_cdoc_from_phash(self, phash):
"""
Return a content-document by its payload-hash.
@@ -602,7 +816,7 @@ class MemoryStore(object):
:type phash: str or unicode
:rtype: MessagePartDoc
"""
- doc = self._phash_store.get(phash, None)
+ doc = self._cdoc_store.get(phash, None)
# XXX return None for consistency?
@@ -632,8 +846,7 @@ class MemoryStore(object):
:return: MessagePartDoc. It will return None if the flags document
has empty content or it is flagged as \\Deleted.
"""
- docs_dict = self._chash_fdoc_store.get(chash, None)
- fdoc = docs_dict.get(mbox, None) if docs_dict else None
+ fdoc = self._chash_fdoc_store[chash][mbox]
# a couple of special cases.
# 1. We might have a doc with empty content...
@@ -644,53 +857,61 @@ class MemoryStore(object):
# We want to create a new one in this case.
# Hmmm what if the deletion is un-done?? We would end with a
# duplicate...
- if fdoc and fields.DELETED_FLAG in fdoc[fields.FLAGS_KEY]:
+ if fdoc and fields.DELETED_FLAG in fdoc.get(fields.FLAGS_KEY, []):
return None
uid = fdoc[fields.UID_KEY]
key = mbox, uid
new = key in self._new
dirty = key in self._dirty
+
return MessagePartDoc(
new=new, dirty=dirty, store="mem",
part=MessagePartType.fdoc,
content=fdoc,
doc_id=None)
- def all_msg_iter(self):
+ def iter_fdoc_keys(self):
"""
- Return generator that iterates through all messages in the store.
-
- :return: generator of MessageWrappers
- :rtype: generator
+ Return a generator through all the mbox, uid keys in the flags-doc
+ store.
"""
- return (self.get_message(*key)
- for key in sorted(self._msg_store.keys()))
+ fdoc_store = self._fdoc_store
+ for mbox in fdoc_store:
+ for uid in fdoc_store[mbox]:
+ yield mbox, uid
- def all_new_dirty_msg_iter(self):
+ def all_new_msg_iter(self):
"""
- Return generator that iterates through all new and dirty messages.
+ Return a list of wrappers for all new messages.
:return: generator of MessageWrappers
:rtype: generator
"""
- return (self.get_message(*key)
- for key in sorted(self._msg_store.keys())
- if key in self._new or key in self._dirty)
+ gm = self.get_message
+ # Freeze the set: it can change during iteration, and we must
+ # move to the queue exactly the keys we snapshot here.
+ new_keys = tuple(self._new)
+ new = [gm(*key, dirtystate=DirtyState.new) for key in new_keys]
+ # move the snapshotted keys from the new set to the new-queue
+ self._new_queue.update(new_keys)
+ self._new.difference_update(new_keys)
+ return new
- def all_msg_dict_for_mbox(self, mbox):
+ def all_dirty_msg_iter(self):
"""
- Return all the message dicts for a given mbox.
+ Return a list of wrappers for all dirty messages.
- :param mbox: the mailbox
- :type mbox: str or unicode
- :return: list of dictionaries
- :rtype: list
+ :return: list of MessageWrappers
+ :rtype: list
"""
- # This *needs* to return a fixed sequence. Otherwise the dictionary len
- # will change during iteration, when we modify it
- return [self._msg_store[(mb, uid)]
- for mb, uid in self._msg_store if mb == mbox]
+ gm = self.get_message
+ # Freeze the set: it can change during iteration, and we must
+ # move to the queue exactly the keys we snapshot here.
+ dirty_keys = tuple(self._dirty)
+ dirty = [gm(*key, flags_only=True, dirtystate=DirtyState.dirty)
+ for key in dirty_keys]
+ # move the snapshotted keys from the dirty set to the dirty-queue
+ self._dirty_queue.update(dirty_keys)
+ self._dirty.difference_update(dirty_keys)
+ return dirty
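[Together with `set_new_queued`/`unset_new_queued` below, these two iterators implement a two-stage handoff: keys move from the `_new`/`_dirty` sets into the `*_queue` sets while their wrappers are in flight, and the soledad writer acknowledges each key, firing any pending observer deferred. A condensed sketch of that lifecycle, with plain callables standing in for the deferreds:]

    new = set([("inbox", 1), ("inbox", 2)])
    new_queue = set()
    pending = {("inbox", 1): lambda r: None}   # stands in for _new_deferreds

    def drain_new():
        # freeze, hand the snapshot over, and clear it from the set
        keys = tuple(new)
        new_queue.update(keys)
        new.difference_update(keys)
        return keys

    def unset_new_queued(key):
        # called by the writer once the documents hit soledad
        new_queue.discard(key)
        cb = pending.pop(key, None)
        if cb is not None:
            cb("%s, ok" % (key,))

    for key in drain_new():
        unset_new_queued(key)
    print(new, new_queue, pending)   # all empty now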
def all_deleted_uid_iter(self, mbox):
"""
@@ -704,11 +925,10 @@ class MemoryStore(object):
"""
# This *needs* to return a fixed sequence. Otherwise the dictionary len
# will change during iteration, when we modify it
- all_deleted = [
- msg['fdoc']['uid'] for msg in self.all_msg_dict_for_mbox(mbox)
- if msg.get('fdoc', None)
- and fields.DELETED_FLAG in msg['fdoc']['flags']]
- return all_deleted
+ fdocs = self._fdoc_store[mbox]
+ return [uid for uid, value
+ in fdocs.items()
+ if fields.DELETED_FLAG in value.get(fields.FLAGS_KEY, [])]
# new, dirty flags
@@ -721,26 +941,30 @@ class MemoryStore(object):
:return: tuple of bools
:rtype: tuple
"""
+ # TODO change indexing of sets to [mbox][key] too.
# XXX should return *first* the news, and *then* the dirty...
+
+ # TODO should we query the queues too?
return map(lambda _set: key in _set, (self._new, self._dirty))
- def set_new(self, key):
+ def set_new_queued(self, key):
"""
- Add the key value to the `new` set.
+ Add the key value to the `new-queue` set.
:param key: the key for the message, in the form mbox, uid
:type key: tuple
"""
- self._new.add(key)
+ self._new_queue.add(key)
- def unset_new(self, key):
+ def unset_new_queued(self, key):
"""
- Remove the key value from the `new` set.
+ Remove the key value from the `new-queue` set.
:param key: the key for the message, in the form mbox, uid
:type key: tuple
"""
- self._new.discard(key)
+ self._new_queue.discard(key)
deferreds = self._new_deferreds
d = deferreds.get(key, None)
if d:
@@ -749,23 +973,23 @@ class MemoryStore(object):
d.callback('%s, ok' % str(key))
deferreds.pop(key)
- def set_dirty(self, key):
+ def set_dirty_queued(self, key):
"""
- Add the key value to the `dirty` set.
+ Add the key value to the `dirty-queue` set.
:param key: the key for the message, in the form mbox, uid
:type key: tuple
"""
- self._dirty.add(key)
+ self._dirty_queue.add(key)
- def unset_dirty(self, key):
+ def unset_dirty_queued(self, key):
"""
- Remove the key value from the `dirty` set.
+ Remove the key value from the `dirty-queue` set.
:param key: the key for the message, in the form mbox, uid
:type key: tuple
"""
- self._dirty.discard(key)
+ self._dirty_queue.discard(key)
deferreds = self._dirty_deferreds
d = deferreds.get(key, None)
if d:
@@ -776,7 +1000,6 @@ class MemoryStore(object):
# Recent Flags
- # TODO --- nice but unused
def set_recent_flag(self, mbox, uid):
"""
Set the `Recent` flag for a given mailbox and UID.
@@ -894,6 +1117,8 @@ class MemoryStore(object):
"""
self._stop_write_loop()
if self._permanent_store is not None:
+ # XXX we should check the return value of this operation:
+ # if it returned False the queue was not empty and we should retry.
self.write_messages(self._permanent_store)
self.producer.flush()
@@ -911,10 +1136,18 @@ class MemoryStore(object):
:type observer: Deferred
"""
soledad_store = self._permanent_store
+ if soledad_store is None:
+ # just an in-memory store, easy then.
+ self._delete_from_memory(mbox, observer)
+ return
+
+ # We have a soledad storage.
try:
# Stop and trigger last write
self.stop_and_flush()
# Wait on the writebacks to finish
+
+ # XXX what if pending deferreds is empty?
pending_deferreds = (self._new_deferreds.get(mbox, []) +
self._dirty_deferreds.get(mbox, []))
d1 = defer.gatherResults(pending_deferreds, consumeErrors=True)
@@ -923,6 +1156,18 @@ class MemoryStore(object):
except Exception as exc:
logger.exception(exc)
+ def _delete_from_memory(self, mbox, observer):
+ """
+ Remove all messages marked as deleted from memory.
+
+ :param mbox: the mailbox
+ :type mbox: str or unicode
+ :param observer: a deferred that will be fired when expunge is done
+ :type observer: Deferred
+ """
+ mem_deleted = self.remove_all_deleted(mbox)
+ observer.callback(mem_deleted)
+
def _delete_from_soledad_and_memory(self, result, mbox, observer):
"""
Remove all messages marked as deleted from soledad and memory.
@@ -939,12 +1184,8 @@ class MemoryStore(object):
try:
# 1. Delete all messages marked as deleted in soledad.
-
- # XXX this could be deferred for faster operation.
- if soledad_store:
- sol_deleted = soledad_store.remove_all_deleted(mbox)
- else:
- sol_deleted = []
+ logger.debug("DELETING FROM SOLEDAD ALL FOR %r" % (mbox,))
+ sol_deleted = soledad_store.remove_all_deleted(mbox)
try:
self._known_uids[mbox].difference_update(set(sol_deleted))
@@ -952,6 +1193,7 @@ class MemoryStore(object):
logger.exception(exc)
# 2. Delete all messages marked as deleted in memory.
+ logger.debug("DELETING FROM MEM ALL FOR %r" % (mbox,))
mem_deleted = self.remove_all_deleted(mbox)
all_deleted = set(mem_deleted).union(set(sol_deleted))
@@ -960,8 +1202,43 @@ class MemoryStore(object):
logger.exception(exc)
finally:
self._start_write_loop()
+
observer.callback(all_deleted)
+ # Mailbox documents and attributes
+
+ # This could be also be cached in memstore, but proxying directly
+ # to soledad since it's not too performance-critical.
+
+ def get_mbox_doc(self, mbox):
+ """
+ Return the soledad document for a given mailbox.
+
+ :param mbox: the mailbox
+ :type mbox: str or unicode
+ :rtype: SoledadDocument or None.
+ """
+ return self.permanent_store.get_mbox_document(mbox)
+
+ def get_mbox_closed(self, mbox):
+ """
+ Return the closed attribute for a given mailbox.
+
+ :param mbox: the mailbox
+ :type mbox: str or unicode
+ :rtype: bool
+ """
+ return self.permanent_store.get_mbox_closed(mbox)
+
+ def set_mbox_closed(self, mbox, closed):
+ """
+ Set the closed attribute for a given mailbox.
+
+ :param mbox: the mailbox
+ :type mbox: str or unicode
+ """
+ self.permanent_store.set_mbox_closed(mbox, closed)
+
# Dump-to-disk controls.
@property