summaryrefslogtreecommitdiff
path: root/src/leap/mail/imap/memorystore.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/leap/mail/imap/memorystore.py')
-rw-r--r--src/leap/mail/imap/memorystore.py961
1 files changed, 961 insertions, 0 deletions
diff --git a/src/leap/mail/imap/memorystore.py b/src/leap/mail/imap/memorystore.py
new file mode 100644
index 0000000..195cef7
--- /dev/null
+++ b/src/leap/mail/imap/memorystore.py
@@ -0,0 +1,961 @@
+# -*- coding: utf-8 -*-
+# memorystore.py
+# Copyright (C) 2014 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+"""
+In-memory transient store for a LEAPIMAPServer.
+"""
+import contextlib
+import logging
+import threading
+import weakref
+
+from collections import defaultdict
+from copy import copy
+
+from twisted.internet import defer
+from twisted.internet.task import LoopingCall
+from twisted.python import log
+from zope.interface import implements
+
+from leap.common.check import leap_assert_type
+from leap.mail import size
+from leap.mail.decorators import deferred_to_thread
+from leap.mail.utils import empty
+from leap.mail.messageflow import MessageProducer
+from leap.mail.imap import interfaces
+from leap.mail.imap.fields import fields
+from leap.mail.imap.messageparts import MessagePartType, MessagePartDoc
+from leap.mail.imap.messageparts import RecentFlagsDoc
+from leap.mail.imap.messageparts import MessageWrapper
+from leap.mail.imap.messageparts import ReferenciableDict
+
+logger = logging.getLogger(__name__)
+
+
+# The default period to do writebacks to the permanent
+# soledad storage, in seconds.
+SOLEDAD_WRITE_PERIOD = 10
+
+
+@contextlib.contextmanager
+def set_bool_flag(obj, att):
+ """
+ Set a boolean flag to True while we're doing our thing.
+ Just to let the world know.
+ """
+ setattr(obj, att, True)
+ try:
+ yield True
+ except RuntimeError as exc:
+ logger.exception(exc)
+ finally:
+ setattr(obj, att, False)
+
+
+class MemoryStore(object):
+ """
+ An in-memory store to where we can write the different parts that
+ we split the messages into and buffer them until we write them to the
+ permanent storage.
+
+ It uses MessageWrapper instances to represent the message-parts, which are
+ indexed by mailbox name and UID.
+
+ It also can be passed a permanent storage as a paremeter (any implementor
+ of IMessageStore, in this case a SoledadStore). In this case, a periodic
+ dump of the messages stored in memory will be done. The period of the
+ writes to the permanent storage is controled by the write_period parameter
+ in the constructor.
+ """
+ implements(interfaces.IMessageStore,
+ interfaces.IMessageStoreWriter)
+
+ # TODO We will want to index by chash when we transition to local-only
+ # UIDs.
+
+ WRITING_FLAG = "_writing"
+ _last_uid_lock = threading.Lock()
+
+ def __init__(self, permanent_store=None,
+ write_period=SOLEDAD_WRITE_PERIOD):
+ """
+ Initialize a MemoryStore.
+
+ :param permanent_store: a IMessageStore implementor to dump
+ messages to.
+ :type permanent_store: IMessageStore
+ :param write_period: the interval to dump messages to disk, in seconds.
+ :type write_period: int
+ """
+ self._permanent_store = permanent_store
+ self._write_period = write_period
+
+ # Internal Storage: messages
+ self._msg_store = {}
+
+ # Internal Storage: payload-hash
+ """
+ {'phash': weakreaf.proxy(dict)}
+ """
+ self._phash_store = {}
+
+ # Internal Storage: content-hash:fdoc
+ """
+ chash-fdoc-store keeps references to
+ the flag-documents indexed by content-hash.
+
+ {'chash': {'mbox-a': weakref.proxy(dict),
+ 'mbox-b': weakref.proxy(dict)}
+ }
+ """
+ self._chash_fdoc_store = {}
+
+ # Internal Storage: recent-flags store
+ """
+ recent-flags store keeps one dict per mailbox,
+ with the document-id of the u1db document
+ and the set of the UIDs that have the recent flag.
+
+ {'mbox-a': {'doc_id': 'deadbeef',
+ 'set': {1,2,3,4}
+ }
+ }
+ """
+ # TODO this will have to transition to content-hash
+ # indexes after we move to local-only UIDs.
+
+ self._rflags_store = defaultdict(
+ lambda: {'doc_id': None, 'set': set([])})
+
+ """
+ last-uid store keeps the count of the highest UID
+ per mailbox.
+
+ {'mbox-a': 42,
+ 'mbox-b': 23}
+ """
+ self._last_uid = {}
+
+ """
+ known-uids keeps a count of the uids that soledad knows for a given
+ mailbox
+
+ {'mbox-a': set([1,2,3])}
+ """
+ self._known_uids = defaultdict(set)
+
+ # New and dirty flags, to set MessageWrapper State.
+ self._new = set([])
+ self._new_deferreds = {}
+ self._dirty = set([])
+ self._rflags_dirty = set([])
+ self._dirty_deferreds = {}
+
+ # Flag for signaling we're busy writing to the disk storage.
+ setattr(self, self.WRITING_FLAG, False)
+
+ if self._permanent_store is not None:
+ # this producer spits its messages to the permanent store
+ # consumer using a queue. We will use that to put
+ # our messages to be written.
+ self.producer = MessageProducer(permanent_store,
+ period=0.1)
+ # looping call for dumping to SoledadStore
+ self._write_loop = LoopingCall(self.write_messages,
+ permanent_store)
+
+ # We can start the write loop right now, why wait?
+ self._start_write_loop()
+
+ def _start_write_loop(self):
+ """
+ Start loop for writing to disk database.
+ """
+ if not self._write_loop.running:
+ self._write_loop.start(self._write_period, now=True)
+
+ def _stop_write_loop(self):
+ """
+ Stop loop for writing to disk database.
+ """
+ if self._write_loop.running:
+ self._write_loop.stop()
+
+ # IMessageStore
+
+ # XXX this would work well for whole message operations.
+ # We would have to add a put_flags operation to modify only
+ # the flags doc (and set the dirty flag accordingly)
+
+ def create_message(self, mbox, uid, message, observer,
+ notify_on_disk=True):
+ """
+ Create the passed message into this MemoryStore.
+
+ By default we consider that any message is a new message.
+
+ :param mbox: the mailbox
+ :type mbox: str or unicode
+ :param uid: the UID for the message
+ :type uid: int
+ :param message: a message to be added
+ :type message: MessageWrapper
+ :param observer: the deferred that will fire with the
+ UID of the message. If notify_on_disk is True,
+ this will happen when the message is written to
+ Soledad. Otherwise it will fire as soon as we've
+ added the message to the memory store.
+ :type observer: Deferred
+ :param notify_on_disk: whether the `observer` deferred should
+ wait until the message is written to disk to
+ be fired.
+ :type notify_on_disk: bool
+ """
+ log.msg("adding new doc to memstore %r (%r)" % (mbox, uid))
+ key = mbox, uid
+
+ self._add_message(mbox, uid, message, notify_on_disk)
+ self._new.add(key)
+
+ def log_add(result):
+ log.msg("message save: %s" % result)
+ return result
+ observer.addCallback(log_add)
+
+ if notify_on_disk:
+ # We store this deferred so we can keep track of the pending
+ # operations internally.
+ # TODO this should fire with the UID !!! -- change that in
+ # the soledad store code.
+ self._new_deferreds[key] = observer
+ if not notify_on_disk:
+ # Caller does not care, just fired and forgot, so we pass
+ # a defer that will inmediately have its callback triggered.
+ observer.callback(uid)
+
+ def put_message(self, mbox, uid, message, notify_on_disk=True):
+ """
+ Put an existing message.
+
+ This will set the dirty flag on the MemoryStore.
+
+ :param mbox: the mailbox
+ :type mbox: str or unicode
+ :param uid: the UID for the message
+ :type uid: int
+ :param message: a message to be added
+ :type message: MessageWrapper
+ :param notify_on_disk: whether the deferred that is returned should
+ wait until the message is written to disk to
+ be fired.
+ :type notify_on_disk: bool
+
+ :return: a Deferred. if notify_on_disk is True, will be fired
+ when written to the db on disk.
+ Otherwise will fire inmediately
+ :rtype: Deferred
+ """
+ key = mbox, uid
+ d = defer.Deferred()
+ d.addCallback(lambda result: log.msg("message PUT save: %s" % result))
+
+ self._dirty.add(key)
+ self._dirty_deferreds[key] = d
+ self._add_message(mbox, uid, message, notify_on_disk)
+ return d
+
+ def _add_message(self, mbox, uid, message, notify_on_disk=True):
+ """
+ Helper method, called by both create_message and put_message.
+ See those for parameter documentation.
+ """
+ # XXX have to differentiate between notify_new and notify_dirty
+ # TODO defaultdict the hell outa here...
+
+ key = mbox, uid
+ msg_dict = message.as_dict()
+
+ FDOC = MessagePartType.fdoc.key
+ HDOC = MessagePartType.hdoc.key
+ CDOCS = MessagePartType.cdocs.key
+ DOCS_ID = MessagePartType.docs_id.key
+
+ try:
+ store = self._msg_store[key]
+ except KeyError:
+ self._msg_store[key] = {FDOC: {},
+ HDOC: {},
+ CDOCS: {},
+ DOCS_ID: {}}
+ store = self._msg_store[key]
+
+ fdoc = msg_dict.get(FDOC, None)
+ if fdoc:
+ if not store.get(FDOC, None):
+ store[FDOC] = ReferenciableDict({})
+ store[FDOC].update(fdoc)
+
+ # content-hash indexing
+ chash = fdoc.get(fields.CONTENT_HASH_KEY)
+ chash_fdoc_store = self._chash_fdoc_store
+ if not chash in chash_fdoc_store:
+ chash_fdoc_store[chash] = {}
+
+ chash_fdoc_store[chash][mbox] = weakref.proxy(
+ store[FDOC])
+
+ hdoc = msg_dict.get(HDOC, None)
+ if hdoc is not None:
+ if not store.get(HDOC, None):
+ store[HDOC] = ReferenciableDict({})
+ store[HDOC].update(hdoc)
+
+ docs_id = msg_dict.get(DOCS_ID, None)
+ if docs_id:
+ if not store.get(DOCS_ID, None):
+ store[DOCS_ID] = {}
+ store[DOCS_ID].update(docs_id)
+
+ cdocs = message.cdocs
+ for cdoc_key in cdocs.keys():
+ if not store.get(CDOCS, None):
+ store[CDOCS] = {}
+
+ cdoc = cdocs[cdoc_key]
+ # first we make it weak-referenciable
+ referenciable_cdoc = ReferenciableDict(cdoc)
+ store[CDOCS][cdoc_key] = referenciable_cdoc
+ phash = cdoc.get(fields.PAYLOAD_HASH_KEY, None)
+ if not phash:
+ continue
+ self._phash_store[phash] = weakref.proxy(referenciable_cdoc)
+
+ def prune(seq, store):
+ for key in seq:
+ if key in store and empty(store.get(key)):
+ store.pop(key)
+ prune((FDOC, HDOC, CDOCS, DOCS_ID), store)
+
+ def get_docid_for_fdoc(self, mbox, uid):
+ """
+ Return Soledad document id for the flags-doc for a given mbox and uid,
+ or None of no flags document could be found.
+
+ :param mbox: the mailbox
+ :type mbox: str or unicode
+ :param uid: the message UID
+ :type uid: int
+ :rtype: unicode or None
+ """
+ fdoc = self._permanent_store.get_flags_doc(mbox, uid)
+ if empty(fdoc):
+ return None
+ doc_id = fdoc.doc_id
+ return doc_id
+
+ def get_message(self, mbox, uid, flags_only=False):
+ """
+ Get a MessageWrapper for the given mbox and uid combination.
+
+ :param mbox: the mailbox
+ :type mbox: str or unicode
+ :param uid: the message UID
+ :type uid: int
+ :param flags_only: whether the message should carry only a reference
+ to the flags document.
+ :type flags_only: bool
+
+ :return: MessageWrapper or None
+ """
+ key = mbox, uid
+ FDOC = MessagePartType.fdoc.key
+
+ msg_dict = self._msg_store.get(key, None)
+ if empty(msg_dict):
+ return None
+ new, dirty = self._get_new_dirty_state(key)
+ if flags_only:
+ return MessageWrapper(fdoc=msg_dict[FDOC],
+ new=new, dirty=dirty,
+ memstore=weakref.proxy(self))
+ else:
+ return MessageWrapper(from_dict=msg_dict,
+ new=new, dirty=dirty,
+ memstore=weakref.proxy(self))
+
+ def remove_message(self, mbox, uid):
+ """
+ Remove a Message from this MemoryStore.
+
+ :param mbox: the mailbox
+ :type mbox: str or unicode
+ :param uid: the message UID
+ :type uid: int
+ """
+ # XXX For the moment we are only removing the flags and headers
+ # docs. The rest we leave there polluting your hard disk,
+ # until we think about a good way of deorphaning.
+
+ # XXX implement elijah's idea of using a PUT document as a
+ # token to ensure consistency in the removal.
+
+ try:
+ key = mbox, uid
+ self._new.discard(key)
+ self._dirty.discard(key)
+ self._msg_store.pop(key, None)
+ except Exception as exc:
+ logger.exception(exc)
+
+ # IMessageStoreWriter
+
+ def write_messages(self, store):
+ """
+ Write the message documents in this MemoryStore to a different store.
+
+ :param store: the IMessageStore to write to
+ """
+ # For now, we pass if the queue is not empty, to avoid duplicate
+ # queuing.
+ # We would better use a flag to know when we've already enqueued an
+ # item.
+
+ # XXX this could return the deferred for all the enqueued operations
+
+ if not self.producer.is_queue_empty():
+ return
+
+ if any(map(lambda i: not empty(i), (self._new, self._dirty))):
+ logger.info("Writing messages to Soledad...")
+
+ # TODO change for lock, and make the property access
+ # is accquired
+ with set_bool_flag(self, self.WRITING_FLAG):
+ for rflags_doc_wrapper in self.all_rdocs_iter():
+ self.producer.push(rflags_doc_wrapper)
+ for msg_wrapper in self.all_new_dirty_msg_iter():
+ self.producer.push(msg_wrapper)
+
+ # MemoryStore specific methods.
+
+ def get_uids(self, mbox):
+ """
+ Get all uids for a given mbox.
+
+ :param mbox: the mailbox
+ :type mbox: str or unicode
+ :rtype: list
+ """
+ all_keys = self._msg_store.keys()
+ return [uid for m, uid in all_keys if m == mbox]
+
+ def get_soledad_known_uids(self, mbox):
+ """
+ Get all uids that soledad knows about, from the memory cache.
+ :param mbox: the mailbox
+ :type mbox: str or unicode
+ :rtype: list
+ """
+ return self._known_uids.get(mbox, [])
+
+ # last_uid
+
+ def get_last_uid(self, mbox):
+ """
+ Return the highest UID for a given mbox.
+ It will be the highest between the highest uid in the message store for
+ the mailbox, and the soledad integer cache.
+
+ :param mbox: the mailbox
+ :type mbox: str or unicode
+ :rtype: int
+ """
+ uids = self.get_uids(mbox)
+ last_mem_uid = uids and max(uids) or 0
+ last_soledad_uid = self.get_last_soledad_uid(mbox)
+ return max(last_mem_uid, last_soledad_uid)
+
+ def get_last_soledad_uid(self, mbox):
+ """
+ Get last uid for a given mbox from the soledad integer cache.
+
+ :param mbox: the mailbox
+ :type mbox: str or unicode
+ """
+ return self._last_uid.get(mbox, 0)
+
+ def set_last_soledad_uid(self, mbox, value):
+ """
+ Set last uid for a given mbox in the soledad integer cache.
+ SoledadMailbox should prime this value during initialization.
+ Other methods (during message adding) SHOULD call
+ `increment_last_soledad_uid` instead.
+
+ :param mbox: the mailbox
+ :type mbox: str or unicode
+ :param value: the value to set
+ :type value: int
+ """
+ leap_assert_type(value, int)
+ logger.info("setting last soledad uid for %s to %s" %
+ (mbox, value))
+ # if we already have a value here, don't do anything
+ with self._last_uid_lock:
+ if not self._last_uid.get(mbox, None):
+ self._last_uid[mbox] = value
+
+ def set_known_uids(self, mbox, value):
+ """
+ Set the value fo the known-uids set for this mbox.
+
+ :param mbox: the mailbox
+ :type mbox: str or unicode
+ :param value: a sequence of integers to be added to the set.
+ :type value: tuple
+ """
+ current = self._known_uids[mbox]
+ self._known_uids[mbox] = current.union(set(value))
+
+ def increment_last_soledad_uid(self, mbox):
+ """
+ Increment by one the soledad integer cache for the last_uid for
+ this mbox, and fire a defer-to-thread to update the soledad value.
+ The caller should lock the call tho this method.
+
+ :param mbox: the mailbox
+ :type mbox: str or unicode
+ """
+ with self._last_uid_lock:
+ self._last_uid[mbox] += 1
+ value = self._last_uid[mbox]
+ self.write_last_uid(mbox, value)
+ return value
+
+ @deferred_to_thread
+ def write_last_uid(self, mbox, value):
+ """
+ Increment the soledad integer cache for the highest uid value.
+
+ :param mbox: the mailbox
+ :type mbox: str or unicode
+ :param value: the value to set
+ :type value: int
+ """
+ leap_assert_type(value, int)
+ if self._permanent_store:
+ self._permanent_store.write_last_uid(mbox, value)
+
+ # Counting sheeps...
+
+ def count_new_mbox(self, mbox):
+ """
+ Count the new messages by inbox.
+
+ :param mbox: the mailbox
+ :type mbox: str or unicode
+ :return: number of new messages
+ :rtype: int
+ """
+ return len([(m, uid) for m, uid in self._new if mbox == mbox])
+
+ # XXX used at all?
+ def count_new(self):
+ """
+ Count all the new messages in the MemoryStore.
+
+ :rtype: int
+ """
+ return len(self._new)
+
+ def get_cdoc_from_phash(self, phash):
+ """
+ Return a content-document by its payload-hash.
+
+ :param phash: the payload hash to check against
+ :type phash: str or unicode
+ :rtype: MessagePartDoc
+ """
+ doc = self._phash_store.get(phash, None)
+
+ # XXX return None for consistency?
+
+ # XXX have to keep a mapping between phash and its linkage
+ # info, to know if this payload is been already saved or not.
+ # We will be able to get this from the linkage-docs,
+ # not yet implemented.
+ new = True
+ dirty = False
+ return MessagePartDoc(
+ new=new, dirty=dirty, store="mem",
+ part=MessagePartType.cdoc,
+ content=doc,
+ doc_id=None)
+
+ def get_fdoc_from_chash(self, chash, mbox):
+ """
+ Return a flags-document by its content-hash and a given mailbox.
+ Used during content-duplication detection while copying or adding a
+ message.
+
+ :param chash: the content hash to check against
+ :type chash: str or unicode
+ :param mbox: the mailbox
+ :type mbox: str or unicode
+
+ :return: MessagePartDoc. It will return None if the flags document
+ has empty content or it is flagged as \\Deleted.
+ """
+ docs_dict = self._chash_fdoc_store.get(chash, None)
+ fdoc = docs_dict.get(mbox, None) if docs_dict else None
+
+ # a couple of special cases.
+ # 1. We might have a doc with empty content...
+ if empty(fdoc):
+ return None
+
+ # 2. ...Or the message could exist, but being flagged for deletion.
+ # We want to create a new one in this case.
+ # Hmmm what if the deletion is un-done?? We would end with a
+ # duplicate...
+ if fdoc and fields.DELETED_FLAG in fdoc[fields.FLAGS_KEY]:
+ return None
+
+ uid = fdoc[fields.UID_KEY]
+ key = mbox, uid
+ new = key in self._new
+ dirty = key in self._dirty
+ return MessagePartDoc(
+ new=new, dirty=dirty, store="mem",
+ part=MessagePartType.fdoc,
+ content=fdoc,
+ doc_id=None)
+
+ def all_msg_iter(self):
+ """
+ Return generator that iterates through all messages in the store.
+
+ :return: generator of MessageWrappers
+ :rtype: generator
+ """
+ return (self.get_message(*key)
+ for key in sorted(self._msg_store.keys()))
+
+ def all_new_dirty_msg_iter(self):
+ """
+ Return generator that iterates through all new and dirty messages.
+
+ :return: generator of MessageWrappers
+ :rtype: generator
+ """
+ return (self.get_message(*key)
+ for key in sorted(self._msg_store.keys())
+ if key in self._new or key in self._dirty)
+
+ def all_msg_dict_for_mbox(self, mbox):
+ """
+ Return all the message dicts for a given mbox.
+
+ :param mbox: the mailbox
+ :type mbox: str or unicode
+ :return: list of dictionaries
+ :rtype: list
+ """
+ # This *needs* to return a fixed sequence. Otherwise the dictionary len
+ # will change during iteration, when we modify it
+ return [self._msg_store[(mb, uid)]
+ for mb, uid in self._msg_store if mb == mbox]
+
+ def all_deleted_uid_iter(self, mbox):
+ """
+ Return a list with the UIDs for all messags
+ with deleted flag in a given mailbox.
+
+ :param mbox: the mailbox
+ :type mbox: str or unicode
+ :return: list of integers
+ :rtype: list
+ """
+ # This *needs* to return a fixed sequence. Otherwise the dictionary len
+ # will change during iteration, when we modify it
+ all_deleted = [
+ msg['fdoc']['uid'] for msg in self.all_msg_dict_for_mbox(mbox)
+ if msg.get('fdoc', None)
+ and fields.DELETED_FLAG in msg['fdoc']['flags']]
+ return all_deleted
+
+ # new, dirty flags
+
+ def _get_new_dirty_state(self, key):
+ """
+ Return `new` and `dirty` flags for a given message.
+
+ :param key: the key for the message, in the form mbox, uid
+ :type key: tuple
+ :return: tuple of bools
+ :rtype: tuple
+ """
+ # XXX should return *first* the news, and *then* the dirty...
+ return map(lambda _set: key in _set, (self._new, self._dirty))
+
+ def set_new(self, key):
+ """
+ Add the key value to the `new` set.
+
+ :param key: the key for the message, in the form mbox, uid
+ :type key: tuple
+ """
+ self._new.add(key)
+
+ def unset_new(self, key):
+ """
+ Remove the key value from the `new` set.
+
+ :param key: the key for the message, in the form mbox, uid
+ :type key: tuple
+ """
+ self._new.discard(key)
+ deferreds = self._new_deferreds
+ d = deferreds.get(key, None)
+ if d:
+ # XXX use a namedtuple for passing the result
+ # when we check it in the other side.
+ d.callback('%s, ok' % str(key))
+ deferreds.pop(key)
+
+ def set_dirty(self, key):
+ """
+ Add the key value to the `dirty` set.
+
+ :param key: the key for the message, in the form mbox, uid
+ :type key: tuple
+ """
+ self._dirty.add(key)
+
+ def unset_dirty(self, key):
+ """
+ Remove the key value from the `dirty` set.
+
+ :param key: the key for the message, in the form mbox, uid
+ :type key: tuple
+ """
+ self._dirty.discard(key)
+ deferreds = self._dirty_deferreds
+ d = deferreds.get(key, None)
+ if d:
+ # XXX use a namedtuple for passing the result
+ # when we check it in the other side.
+ d.callback('%s, ok' % str(key))
+ deferreds.pop(key)
+
+ # Recent Flags
+
+ # TODO --- nice but unused
+ def set_recent_flag(self, mbox, uid):
+ """
+ Set the `Recent` flag for a given mailbox and UID.
+
+ :param mbox: the mailbox
+ :type mbox: str or unicode
+ :param uid: the message UID
+ :type uid: int
+ """
+ self._rflags_dirty.add(mbox)
+ self._rflags_store[mbox]['set'].add(uid)
+
+ # TODO --- nice but unused
+ def unset_recent_flag(self, mbox, uid):
+ """
+ Unset the `Recent` flag for a given mailbox and UID.
+
+ :param mbox: the mailbox
+ :type mbox: str or unicode
+ :param uid: the message UID
+ :type uid: int
+ """
+ self._rflags_store[mbox]['set'].discard(uid)
+
+ def set_recent_flags(self, mbox, value):
+ """
+ Set the value for the set of the recent flags.
+ Used from the property in the MessageCollection.
+
+ :param mbox: the mailbox
+ :type mbox: str or unicode
+ :param value: a sequence of flags to set
+ :type value: sequence
+ """
+ self._rflags_dirty.add(mbox)
+ self._rflags_store[mbox]['set'] = set(value)
+
+ def load_recent_flags(self, mbox, flags_doc):
+ """
+ Load the passed flags document in the recent flags store, for a given
+ mailbox.
+
+ :param mbox: the mailbox
+ :type mbox: str or unicode
+ :param flags_doc: A dictionary containing the `doc_id` of the Soledad
+ flags-document for this mailbox, and the `set`
+ of uids marked with that flag.
+ """
+ self._rflags_store[mbox] = flags_doc
+
+ def get_recent_flags(self, mbox):
+ """
+ Return the set of UIDs with the `Recent` flag for this mailbox.
+
+ :param mbox: the mailbox
+ :type mbox: str or unicode
+ :rtype: set, or None
+ """
+ rflag_for_mbox = self._rflags_store.get(mbox, None)
+ if not rflag_for_mbox:
+ return None
+ return self._rflags_store[mbox]['set']
+
+ def all_rdocs_iter(self):
+ """
+ Return an iterator through all in-memory recent flag dicts, wrapped
+ under a RecentFlagsDoc namedtuple.
+ Used for saving to disk.
+
+ :return: a generator of RecentFlagDoc
+ :rtype: generator
+ """
+ # XXX use enums
+ DOC_ID = "doc_id"
+ SET = "set"
+
+ rflags_store = self._rflags_store
+
+ def get_rdoc(mbox, rdict):
+ mbox_rflag_set = rdict[SET]
+ recent_set = copy(mbox_rflag_set)
+ # zero it!
+ mbox_rflag_set.difference_update(mbox_rflag_set)
+ return RecentFlagsDoc(
+ doc_id=rflags_store[mbox][DOC_ID],
+ content={
+ fields.TYPE_KEY: fields.TYPE_RECENT_VAL,
+ fields.MBOX_KEY: mbox,
+ fields.RECENTFLAGS_KEY: list(recent_set)
+ })
+
+ return (get_rdoc(mbox, rdict) for mbox, rdict in rflags_store.items()
+ if not empty(rdict[SET]))
+
+ # Methods that mirror the IMailbox interface
+
+ def remove_all_deleted(self, mbox):
+ """
+ Remove all messages flagged \\Deleted from this Memory Store only.
+ Called from `expunge`
+
+ :param mbox: the mailbox
+ :type mbox: str or unicode
+ :return: a list of UIDs
+ :rtype: list
+ """
+ mem_deleted = self.all_deleted_uid_iter(mbox)
+ for uid in mem_deleted:
+ self.remove_message(mbox, uid)
+ return mem_deleted
+
+ def expunge(self, mbox, observer):
+ """
+ Remove all messages flagged \\Deleted, from the Memory Store
+ and from the permanent store also.
+
+ :param mbox: the mailbox
+ :type mbox: str or unicode
+ :param observer: a deferred that will be fired when expunge is done
+ :type observer: Deferred
+ :return: a list of UIDs
+ :rtype: list
+ """
+ # TODO expunge should add itself as a callback to the ongoing
+ # writes.
+ soledad_store = self._permanent_store
+ all_deleted = []
+
+ try:
+ # 1. Stop the writing call
+ self._stop_write_loop()
+ # 2. Enqueue a last write.
+ #self.write_messages(soledad_store)
+ # 3. Should wait on the writebacks to finish ???
+ # FIXME wait for this, and add all the rest of the method
+ # as a callback!!!
+ except Exception as exc:
+ logger.exception(exc)
+
+ # Now, we...:
+
+ try:
+ # 1. Delete all messages marked as deleted in soledad.
+
+ # XXX this could be deferred for faster operation.
+ if soledad_store:
+ sol_deleted = soledad_store.remove_all_deleted(mbox)
+ else:
+ sol_deleted = []
+
+ try:
+ self._known_uids[mbox].difference_update(set(sol_deleted))
+ except Exception as exc:
+ logger.exception(exc)
+
+ # 2. Delete all messages marked as deleted in memory.
+ mem_deleted = self.remove_all_deleted(mbox)
+
+ all_deleted = set(mem_deleted).union(set(sol_deleted))
+ logger.debug("deleted %r" % all_deleted)
+ except Exception as exc:
+ logger.exception(exc)
+ finally:
+ self._start_write_loop()
+ observer.callback(True)
+ return all_deleted
+
+ # Dump-to-disk controls.
+
+ @property
+ def is_writing(self):
+ """
+ Property that returns whether the store is currently writing its
+ internal state to a permanent storage.
+
+ Used to evaluate whether the CHECK command can inform that the field
+ is clear to proceed, or waiting for the write operations to complete
+ is needed instead.
+
+ :rtype: bool
+ """
+ # FIXME this should return a deferred !!!
+ # XXX ----- can fire when all new + dirty deferreds
+ # are done (gatherResults)
+ return getattr(self, self.WRITING_FLAG)
+
+ # Memory management.
+
+ def get_size(self):
+ """
+ Return the size of the internal storage.
+ Use for calculating the limit beyond which we should flush the store.
+
+ :rtype: int
+ """
+ return size.get_size(self._msg_store)