diff options
author | Tomás Touceda <chiiph@leap.se> | 2014-02-17 12:45:19 -0300 |
---|---|---|
committer | Tomás Touceda <chiiph@leap.se> | 2014-02-17 12:45:19 -0300 |
commit | 32e3c5ddaa5df30a573762e273f3a12f7eb3c255 (patch) | |
tree | 0924be918d990628e73eb2059fb6eb1200748b7c | |
parent | 7828c517ae162de4676a71e05f77339598acd6f7 (diff) | |
parent | 985ff0a78a8df0eafb7789383f711b9e5ceb1cb6 (diff) |
Merge remote-tracking branch 'refs/remotes/kali/bug/separate_deferreds_threads' into develop
-rw-r--r-- | src/leap/mail/imap/account.py | 51 | ||||
-rw-r--r-- | src/leap/mail/imap/mailbox.py | 275 | ||||
-rw-r--r-- | src/leap/mail/imap/memorystore.py | 581 | ||||
-rw-r--r-- | src/leap/mail/imap/messageparts.py | 46 | ||||
-rw-r--r-- | src/leap/mail/imap/messages.py | 352 | ||||
-rw-r--r-- | src/leap/mail/imap/server.py | 43 | ||||
-rw-r--r-- | src/leap/mail/imap/service/imap.py | 59 | ||||
-rw-r--r-- | src/leap/mail/imap/soledadstore.py | 369 | ||||
-rwxr-xr-x | src/leap/mail/imap/tests/regressions | 6 | ||||
-rw-r--r-- | src/leap/mail/imap/tests/test_imap.py | 432 | ||||
-rw-r--r-- | src/leap/mail/messageflow.py | 26 | ||||
-rw-r--r-- | src/leap/mail/utils.py | 104 |
12 files changed, 1451 insertions, 893 deletions
diff --git a/src/leap/mail/imap/account.py b/src/leap/mail/imap/account.py index f985c04..1b5d4a0 100644 --- a/src/leap/mail/imap/account.py +++ b/src/leap/mail/imap/account.py @@ -18,9 +18,12 @@ Soledad Backed Account. """ import copy +import logging +import os import time from twisted.mail import imap4 +from twisted.python import log from zope.interface import implements from leap.common.check import leap_assert, leap_assert_type @@ -30,12 +33,27 @@ from leap.mail.imap.parser import MBoxParser from leap.mail.imap.mailbox import SoledadMailbox from leap.soledad.client import Soledad +logger = logging.getLogger(__name__) + +PROFILE_CMD = os.environ.get('LEAP_PROFILE_IMAPCMD', False) + +if PROFILE_CMD: + + def _debugProfiling(result, cmdname, start): + took = (time.time() - start) * 1000 + log.msg("CMD " + cmdname + " TOOK: " + str(took) + " msec") + return result + ####################################### # Soledad Account ####################################### +# TODO change name to LeapIMAPAccount, since we're using +# the memstore. +# IndexedDB should also not be here anymore. + class SoledadBackedAccount(WithMsgFields, IndexedDB, MBoxParser): """ An implementation of IAccount and INamespacePresenteer @@ -67,14 +85,19 @@ class SoledadBackedAccount(WithMsgFields, IndexedDB, MBoxParser): # XXX SHOULD assert too that the name matches the user/uuid with which # soledad has been initialized. + # XXX ??? why is this parsing mailbox name??? it's account... + # userid? homogenize. self._account_name = self._parse_mailbox_name(account_name) self._soledad = soledad self._memstore = memstore + self.__mailboxes = set([]) + self.initialize_db() # every user should have the right to an inbox folder # at least, so let's make one! + self._load_mailboxes() if not self.mailboxes: self.addMailbox(self.INBOX_NAME) @@ -106,9 +129,13 @@ class SoledadBackedAccount(WithMsgFields, IndexedDB, MBoxParser): """ A list of the current mailboxes for this account. """ - return [doc.content[self.MBOX_KEY] - for doc in self._soledad.get_from_index( - self.TYPE_IDX, self.MBOX_KEY)] + return self.__mailboxes + + def _load_mailboxes(self): + self.__mailboxes.update( + [doc.content[self.MBOX_KEY] + for doc in self._soledad.get_from_index( + self.TYPE_IDX, self.MBOX_KEY)]) @property def subscriptions(self): @@ -173,6 +200,7 @@ class SoledadBackedAccount(WithMsgFields, IndexedDB, MBoxParser): mbox[self.CREATED_KEY] = creation_ts doc = self._soledad.create_doc(mbox) + self._load_mailboxes() return bool(doc) def create(self, pathspec): @@ -203,6 +231,7 @@ class SoledadBackedAccount(WithMsgFields, IndexedDB, MBoxParser): except imap4.MailboxCollision: if not pathspec.endswith('/'): return False + self._load_mailboxes() return True def select(self, name, readwrite=1): @@ -215,17 +244,22 @@ class SoledadBackedAccount(WithMsgFields, IndexedDB, MBoxParser): :param readwrite: 1 for readwrite permissions. :type readwrite: int - :rtype: bool + :rtype: SoledadMailbox """ - name = self._parse_mailbox_name(name) + if PROFILE_CMD: + start = time.time() + name = self._parse_mailbox_name(name) if name not in self.mailboxes: + logger.warning("No such mailbox!") return None - self.selected = name - return SoledadMailbox( + sm = SoledadMailbox( name, self._soledad, self._memstore, readwrite) + if PROFILE_CMD: + _debugProfiling(None, "SELECT", start) + return sm def delete(self, name, force=False): """ @@ -260,6 +294,7 @@ class SoledadBackedAccount(WithMsgFields, IndexedDB, MBoxParser): "Hierarchically inferior mailboxes " "exist and \\Noselect is set") mbox.destroy() + self._load_mailboxes() # XXX FIXME --- not honoring the inferior names... @@ -297,6 +332,8 @@ class SoledadBackedAccount(WithMsgFields, IndexedDB, MBoxParser): mbox.content[self.MBOX_KEY] = new self._soledad.put_doc(mbox) + self._load_mailboxes() + # XXX ---- FIXME!!!! ------------------------------------ # until here we just renamed the index... # We have to rename also the occurrence of this diff --git a/src/leap/mail/imap/mailbox.py b/src/leap/mail/imap/mailbox.py index d8af0a5..57505f0 100644 --- a/src/leap/mail/imap/mailbox.py +++ b/src/leap/mail/imap/mailbox.py @@ -50,6 +50,25 @@ If the environment variable `LEAP_SKIPNOTIFY` is set, we avoid notifying clients of new messages. Use during stress tests. """ NOTIFY_NEW = not os.environ.get('LEAP_SKIPNOTIFY', False) +PROFILE_CMD = os.environ.get('LEAP_PROFILE_IMAPCMD', False) + +if PROFILE_CMD: + import time + + def _debugProfiling(result, cmdname, start): + took = (time.time() - start) * 1000 + log.msg("CMD " + cmdname + " TOOK: " + str(took) + " msec") + return result + + def do_profile_cmd(d, name): + """ + Add the profiling debug to the passed callback. + :param d: deferred + :param name: name of the command + :type name: str + """ + d.addCallback(_debugProfiling, name, time.time()) + d.addErrback(lambda f: log.msg(f.getTraceback())) class SoledadMailbox(WithMsgFields, MBoxParser): @@ -89,6 +108,12 @@ class SoledadMailbox(WithMsgFields, MBoxParser): _listeners = defaultdict(set) next_uid_lock = threading.Lock() + last_uid_lock = threading.Lock() + + # TODO unify all the `primed` dicts + _fdoc_primed = {} + _last_uid_primed = {} + _known_uids_primed = {} def __init__(self, mbox, soledad, memstore, rw=1): """ @@ -107,6 +132,7 @@ class SoledadMailbox(WithMsgFields, MBoxParser): :param rw: read-and-write flag for this mailbox :type rw: int """ + logger.debug("Initializing mailbox %r" % (mbox,)) leap_assert(mbox, "Need a mailbox name to initialize") leap_assert(soledad, "Need a soledad instance to initialize") @@ -123,12 +149,24 @@ class SoledadMailbox(WithMsgFields, MBoxParser): self.messages = MessageCollection( mbox=mbox, soledad=self._soledad, memstore=self._memstore) + # XXX careful with this get/set (it would be + # hitting db unconditionally, move to memstore too) + # Now it's returning a fixed amount of flags from mem + # as a workaround. if not self.getFlags(): self.setFlags(self.INIT_FLAGS) if self._memstore: self.prime_known_uids_to_memstore() self.prime_last_uid_to_memstore() + self.prime_flag_docs_to_memstore() + + from twisted.internet import reactor + self.reactor = reactor + + # purge memstore from empty fdocs. + self._memstore.purge_fdoc_store(mbox) + logger.debug("DONE initializing mailbox %r" % (mbox,)) @property def listeners(self): @@ -170,8 +208,7 @@ class SoledadMailbox(WithMsgFields, MBoxParser): """ self.listeners.remove(listener) - # TODO move completely to soledadstore, under memstore reponsibility. - def _get_mbox(self): + def _get_mbox_doc(self): """ Return mailbox document. @@ -179,14 +216,7 @@ class SoledadMailbox(WithMsgFields, MBoxParser): the query failed. :rtype: SoledadDocument or None. """ - try: - query = self._soledad.get_from_index( - fields.TYPE_MBOX_IDX, - fields.TYPE_MBOX_VAL, self.mbox) - if query: - return query.pop() - except Exception as exc: - logger.exception("Unhandled error %r" % exc) + return self._memstore.get_mbox_doc(self.mbox) def getFlags(self): """ @@ -195,12 +225,21 @@ class SoledadMailbox(WithMsgFields, MBoxParser): :returns: tuple of flags for this mailbox :rtype: tuple of str """ - mbox = self._get_mbox() - if not mbox: - return None - flags = mbox.content.get(self.FLAGS_KEY, []) + flags = self.INIT_FLAGS + + # XXX returning fixed flags always + # Since I have not found a case where the client + # wants to modify this, as a way of speeding up + # selects. To do it right, we probably should keep + # track of the set of all flags used by msgs + # in this mailbox. Does it matter? + #mbox = self._get_mbox_doc() + #if not mbox: + #return None + #flags = mbox.content.get(self.FLAGS_KEY, []) return map(str, flags) + # XXX move to memstore->soledadstore def setFlags(self, flags): """ Sets flags for this mailbox. @@ -210,10 +249,12 @@ class SoledadMailbox(WithMsgFields, MBoxParser): """ leap_assert(isinstance(flags, tuple), "flags expected to be a tuple") - mbox = self._get_mbox() + mbox = self._get_mbox_doc() if not mbox: return None mbox.content[self.FLAGS_KEY] = map(str, flags) + logger.debug("Writing mbox document for %r to Soledad" + % (self.mbox,)) self._soledad.put_doc(mbox) # XXX SHOULD BETTER IMPLEMENT ADD_FLAG, REMOVE_FLAG. @@ -225,8 +266,7 @@ class SoledadMailbox(WithMsgFields, MBoxParser): :return: True if the mailbox is closed :rtype: bool """ - mbox = self._get_mbox() - return mbox.content.get(self.CLOSED_KEY, False) + return self._memstore.get_mbox_closed(self.mbox) def _set_closed(self, closed): """ @@ -235,10 +275,7 @@ class SoledadMailbox(WithMsgFields, MBoxParser): :param closed: the state to be set :type closed: bool """ - leap_assert(isinstance(closed, bool), "closed needs to be boolean") - mbox = self._get_mbox() - mbox.content[self.CLOSED_KEY] = closed - self._soledad.put_doc(mbox) + self._memstore.set_mbox_closed(self.mbox, closed) closed = property( _get_closed, _set_closed, doc="Closed attribute.") @@ -265,10 +302,13 @@ class SoledadMailbox(WithMsgFields, MBoxParser): """ Prime memstore with last_uid value """ - set_exist = set(self.messages.all_uid_iter()) - last = max(set_exist) if set_exist else 0 - logger.info("Priming Soledad last_uid to %s" % (last,)) - self._memstore.set_last_soledad_uid(self.mbox, last) + primed = self._last_uid_primed.get(self.mbox, False) + if not primed: + mbox = self._get_mbox_doc() + last = mbox.content.get('lastuid', 0) + logger.info("Priming Soledad last_uid to %s" % (last,)) + self._memstore.set_last_soledad_uid(self.mbox, last) + self._last_uid_primed[self.mbox] = True def prime_known_uids_to_memstore(self): """ @@ -276,8 +316,21 @@ class SoledadMailbox(WithMsgFields, MBoxParser): We do this to be able to filter the requests efficiently. """ - known_uids = self.messages.all_soledad_uid_iter() - self._memstore.set_known_uids(self.mbox, known_uids) + primed = self._known_uids_primed.get(self.mbox, False) + if not primed: + known_uids = self.messages.all_soledad_uid_iter() + self._memstore.set_known_uids(self.mbox, known_uids) + self._known_uids_primed[self.mbox] = True + + def prime_flag_docs_to_memstore(self): + """ + Prime memstore with all the flags documents. + """ + primed = self._fdoc_primed.get(self.mbox, False) + if not primed: + all_flag_docs = self.messages.get_all_soledad_flag_docs() + self._memstore.load_flag_docs(self.mbox, all_flag_docs) + self._fdoc_primed[self.mbox] = True def getUIDValidity(self): """ @@ -286,7 +339,7 @@ class SoledadMailbox(WithMsgFields, MBoxParser): :return: unique validity identifier :rtype: int """ - mbox = self._get_mbox() + mbox = self._get_mbox_doc() return mbox.content.get(self.CREATED_KEY, 1) def getUID(self, message): @@ -420,6 +473,9 @@ class SoledadMailbox(WithMsgFields, MBoxParser): flags = tuple(str(flag) for flag in flags) d = self._do_add_message(message, flags=flags, date=date) + if PROFILE_CMD: + do_profile_cmd(d, "APPEND") + # XXX should notify here probably return d def _do_add_message(self, message, flags, date): @@ -428,15 +484,9 @@ class SoledadMailbox(WithMsgFields, MBoxParser): Invoked from addMessage. """ d = self.messages.add_msg(message, flags=flags, date=date) - # XXX Removing notify temporarily. - # This is interfering with imaptest results. I'm not clear if it's - # because we clutter the logging or because the set of listeners is - # ever-growing. We should come up with some smart way of dealing with - # it, or maybe just disabling it using an environmental variable since - # we will only have just a few listeners in the regular desktop case. - #d.addCallback(self.notify_new) return d + @deferred_to_thread def notify_new(self, *args): """ Notify of new messages to all the listeners. @@ -447,12 +497,10 @@ class SoledadMailbox(WithMsgFields, MBoxParser): return exists = self.getMessageCount() recent = self.getRecentCount() - logger.debug("NOTIFY: there are %s messages, %s recent" % ( - exists, - recent)) + logger.debug("NOTIFY (%r): there are %s messages, %s recent" % ( + self.mbox, exists, recent)) for l in self.listeners: - logger.debug('notifying...') l.newMessages(exists, recent) # commands, do not rename methods @@ -471,7 +519,7 @@ class SoledadMailbox(WithMsgFields, MBoxParser): # we should postpone the removal # XXX move to memory store?? - self._soledad.delete_doc(self._get_mbox()) + self._soledad.delete_doc(self._get_mbox_doc()) def _close_cb(self, result): self.closed = True @@ -527,8 +575,6 @@ class SoledadMailbox(WithMsgFields, MBoxParser): seq_messg = set_asked.intersection(set_exist) return seq_messg - @deferred_to_thread - #@profile def fetch(self, messages_asked, uid): """ Retrieve one or more messages in this mailbox. @@ -544,6 +590,27 @@ class SoledadMailbox(WithMsgFields, MBoxParser): otherwise. :type uid: bool + :rtype: deferred + """ + d = defer.Deferred() + self.reactor.callInThread(self._do_fetch, messages_asked, uid, d) + if PROFILE_CMD: + do_profile_cmd(d, "FETCH") + return d + + # called in thread + def _do_fetch(self, messages_asked, uid, d): + """ + :param messages_asked: IDs of the messages to retrieve information + about + :type messages_asked: MessageSet + + :param uid: If true, the IDs are UIDs. They are message sequence IDs + otherwise. + :type uid: bool + :param d: deferred whose callback will be called with result. + :type d: Deferred + :rtype: A tuple of two-tuples of message sequence numbers and LeapMessage """ @@ -564,10 +631,12 @@ class SoledadMailbox(WithMsgFields, MBoxParser): logger.debug("Getting msg by index: INEFFICIENT call!") raise NotImplementedError else: - result = ((msgid, getmsg(msgid)) for msgid in seq_messg) - return result + got_msg = ((msgid, getmsg(msgid)) for msgid in seq_messg) + result = ((msgid, msg) for msgid, msg in got_msg + if msg is not None) + self.reactor.callLater(0, self.unset_recent_flags, seq_messg) + self.reactor.callFromThread(d.callback, result) - @deferred_to_thread def fetch_flags(self, messages_asked, uid): """ A fast method to fetch all flags, tricking just the @@ -606,12 +675,11 @@ class SoledadMailbox(WithMsgFields, MBoxParser): messages_asked = self._bound_seq(messages_asked) seq_messg = self._filter_msg_seq(messages_asked) - all_flags = self.messages.all_flags() + all_flags = self._memstore.all_flags(self.mbox) result = ((msgid, flagsPart( msgid, all_flags.get(msgid, tuple()))) for msgid in seq_messg) return result - @deferred_to_thread def fetch_headers(self, messages_asked, uid): """ A fast method to fetch all headers, tricking just the @@ -636,6 +704,8 @@ class SoledadMailbox(WithMsgFields, MBoxParser): MessagePart. :rtype: tuple """ + # TODO how often is thunderbird doing this? + class headersPart(object): def __init__(self, uid, headers): self.uid = uid @@ -653,10 +723,9 @@ class SoledadMailbox(WithMsgFields, MBoxParser): messages_asked = self._bound_seq(messages_asked) seq_messg = self._filter_msg_seq(messages_asked) - all_chash = self.messages.all_flags_chash() all_headers = self.messages.all_headers() result = ((msgid, headersPart( - msgid, all_headers.get(all_chash.get(msgid, 'nil'), {}))) + msgid, all_headers.get(msgid, {}))) for msgid in seq_messg) return result @@ -699,14 +768,15 @@ class SoledadMailbox(WithMsgFields, MBoxParser): :raise ReadOnlyMailbox: Raised if this mailbox is not open for read-write. """ - from twisted.internet import reactor if not self.isWriteable(): log.msg('read only mailbox!') raise imap4.ReadOnlyMailbox d = defer.Deferred() - deferLater(reactor, 0, self._do_store, messages_asked, flags, - mode, uid, d) + self.reactor.callLater(0, self._do_store, messages_asked, flags, + mode, uid, d) + if PROFILE_CMD: + do_profile_cmd(d, "STORE") return d def _do_store(self, messages_asked, flags, mode, uid, observer): @@ -721,7 +791,7 @@ class SoledadMailbox(WithMsgFields, MBoxParser): :type observer: deferred """ # XXX implement also sequence (uid = 0) - # XXX we should prevent cclient from setting Recent flag? + # XXX we should prevent client from setting Recent flag? leap_assert(not isinstance(flags, basestring), "flags cannot be a string") flags = tuple(flags) @@ -785,15 +855,10 @@ class SoledadMailbox(WithMsgFields, MBoxParser): uid when the copy succeed. :rtype: Deferred """ - from twisted.internet import reactor - d = defer.Deferred() - # XXX this should not happen ... track it down, - # probably to FETCH... - if message is None: - log.msg("BUG: COPY found a None in passed message") - d.callback(None) - deferLater(reactor, 0, self._do_copy, message, d) + if PROFILE_CMD: + do_profile_cmd(d, "COPY") + deferLater(self.reactor, 0, self._do_copy, message, d) return d def _do_copy(self, message, observer): @@ -809,51 +874,70 @@ class SoledadMailbox(WithMsgFields, MBoxParser): UID of the message :type observer: Deferred """ + memstore = self._memstore + + def createCopy(result): + exist, new_fdoc = result + if exist: + # Should we signal error on the callback? + logger.warning("Destination message already exists!") + + # XXX I'm not sure if we should raise the + # errback. This actually rases an ugly warning + # in some muas like thunderbird. + # UID 0 seems a good convention for no uid. + observer.callback(0) + else: + mbox = self.mbox + uid_next = memstore.increment_last_soledad_uid(mbox) + + new_fdoc[self.UID_KEY] = uid_next + new_fdoc[self.MBOX_KEY] = mbox + + flags = list(new_fdoc[self.FLAGS_KEY]) + flags.append(fields.RECENT_FLAG) + new_fdoc[self.FLAGS_KEY] = tuple(set(flags)) + + # FIXME set recent! + + self._memstore.create_message( + self.mbox, uid_next, + MessageWrapper(new_fdoc), + observer=observer, + notify_on_disk=False) + + d = self._get_msg_copy(message) + d.addCallback(createCopy) + d.addErrback(lambda f: log.msg(f.getTraceback())) + + @deferred_to_thread + def _get_msg_copy(self, message): + """ + Get a copy of the fdoc for this message, and check whether + it already exists. + + :param message: an IMessage implementor + :type message: LeapMessage + :return: exist, new_fdoc + :rtype: tuple + """ # XXX for clarity, this could be delegated to a # MessageCollection mixin that implements copy too, and # moved out of here. msg = message memstore = self._memstore - # XXX should use a public api instead - fdoc = msg._fdoc - hdoc = msg._hdoc - if not fdoc: + if empty(msg.fdoc): logger.warning("Tried to copy a MSG with no fdoc") return - new_fdoc = copy.deepcopy(fdoc.content) - + new_fdoc = copy.deepcopy(msg.fdoc.content) fdoc_chash = new_fdoc[fields.CONTENT_HASH_KEY] - # XXX is this hitting the db??? --- probably. - # We should profile after the pre-fetch. dest_fdoc = memstore.get_fdoc_from_chash( fdoc_chash, self.mbox) - exist = dest_fdoc and not empty(dest_fdoc.content) - - if exist: - # Should we signal error on the callback? - logger.warning("Destination message already exists!") - - # XXX I'm still not clear if we should raise the - # errback. This actually rases an ugly warning - # in some muas like thunderbird. I guess the user does - # not deserve that. - observer.callback(True) - else: - mbox = self.mbox - uid_next = memstore.increment_last_soledad_uid(mbox) - new_fdoc[self.UID_KEY] = uid_next - new_fdoc[self.MBOX_KEY] = mbox - - # FIXME set recent! - self._memstore.create_message( - self.mbox, uid_next, - MessageWrapper( - new_fdoc, hdoc.content), - observer=observer, - notify_on_disk=False) + exist = not empty(dest_fdoc) + return exist, new_fdoc # convenience fun @@ -865,12 +949,11 @@ class SoledadMailbox(WithMsgFields, MBoxParser): for doc in docs: self.messages._soledad.delete_doc(doc) - def unset_recent_flags(self, uids): + def unset_recent_flags(self, uid_seq): """ Unset Recent flag for a sequence of UIDs. """ - seq_messg = self._bound_seq(uids) - self.messages.unset_recent_flags(seq_messg) + self.messages.unset_recent_flags(uid_seq) def __repr__(self): """ diff --git a/src/leap/mail/imap/memorystore.py b/src/leap/mail/imap/memorystore.py index ed2b3f2..f23a234 100644 --- a/src/leap/mail/imap/memorystore.py +++ b/src/leap/mail/imap/memorystore.py @@ -25,6 +25,7 @@ import weakref from collections import defaultdict from copy import copy +from enum import Enum from twisted.internet import defer from twisted.internet.task import LoopingCall from twisted.python import log @@ -32,8 +33,7 @@ from zope.interface import implements from leap.common.check import leap_assert_type from leap.mail import size -from leap.mail.decorators import deferred_to_thread -from leap.mail.utils import empty +from leap.mail.utils import empty, phash_iter from leap.mail.messageflow import MessageProducer from leap.mail.imap import interfaces from leap.mail.imap.fields import fields @@ -42,12 +42,19 @@ from leap.mail.imap.messageparts import RecentFlagsDoc from leap.mail.imap.messageparts import MessageWrapper from leap.mail.imap.messageparts import ReferenciableDict +from leap.mail.decorators import deferred_to_thread + logger = logging.getLogger(__name__) # The default period to do writebacks to the permanent # soledad storage, in seconds. -SOLEDAD_WRITE_PERIOD = 10 +SOLEDAD_WRITE_PERIOD = 15 + +FDOC = MessagePartType.fdoc.key +HDOC = MessagePartType.hdoc.key +CDOCS = MessagePartType.cdocs.key +DOCS_ID = MessagePartType.docs_id.key @contextlib.contextmanager @@ -65,6 +72,9 @@ def set_bool_flag(obj, att): setattr(obj, att, False) +DirtyState = Enum("none", "dirty", "new") + + class MemoryStore(object): """ An in-memory store to where we can write the different parts that @@ -88,6 +98,7 @@ class MemoryStore(object): WRITING_FLAG = "_writing" _last_uid_lock = threading.Lock() + _fdoc_docid_lock = threading.Lock() def __init__(self, permanent_store=None, write_period=SOLEDAD_WRITE_PERIOD): @@ -100,11 +111,19 @@ class MemoryStore(object): :param write_period: the interval to dump messages to disk, in seconds. :type write_period: int """ + from twisted.internet import reactor + self.reactor = reactor + self._permanent_store = permanent_store self._write_period = write_period # Internal Storage: messages - self._msg_store = {} + """ + flags document store. + _fdoc_store[mbox][uid] = { 'content': 'aaa' } + """ + self._fdoc_store = defaultdict(lambda: defaultdict( + lambda: ReferenciableDict({}))) # Sizes """ @@ -114,9 +133,28 @@ class MemoryStore(object): # Internal Storage: payload-hash """ - {'phash': weakreaf.proxy(dict)} + fdocs:doc-id store, stores document IDs for putting + the dirty flags-docs. + """ + self._fdoc_id_store = defaultdict(lambda: defaultdict( + lambda: '')) + + # Internal Storage: content-hash:hdoc + """ + hdoc-store keeps references to + the header-documents indexed by content-hash. + + {'chash': { dict-stuff } + } + """ + self._hdoc_store = defaultdict(lambda: ReferenciableDict({})) + + # Internal Storage: payload-hash:cdoc + """ + content-docs stored by payload-hash + {'phash': { dict-stuff } } """ - self._phash_store = {} + self._cdoc_store = defaultdict(lambda: ReferenciableDict({})) # Internal Storage: content-hash:fdoc """ @@ -127,7 +165,7 @@ class MemoryStore(object): 'mbox-b': weakref.proxy(dict)} } """ - self._chash_fdoc_store = {} + self._chash_fdoc_store = defaultdict(lambda: defaultdict(lambda: None)) # Internal Storage: recent-flags store """ @@ -153,7 +191,7 @@ class MemoryStore(object): {'mbox-a': 42, 'mbox-b': 23} """ - self._last_uid = {} + self._last_uid = defaultdict(lambda: 0) """ known-uids keeps a count of the uids that soledad knows for a given @@ -165,11 +203,15 @@ class MemoryStore(object): # New and dirty flags, to set MessageWrapper State. self._new = set([]) + self._new_queue = set([]) self._new_deferreds = {} + self._dirty = set([]) - self._rflags_dirty = set([]) + self._dirty_queue = set([]) self._dirty_deferreds = {} + self._rflags_dirty = set([]) + # Flag for signaling we're busy writing to the disk storage. setattr(self, self.WRITING_FLAG, False) @@ -185,11 +227,17 @@ class MemoryStore(object): # We can start the write loop right now, why wait? self._start_write_loop() + else: + # We have a memory-only store. + self.producer = None + self._write_loop = None def _start_write_loop(self): """ Start loop for writing to disk database. """ + if self._write_loop is None: + return if not self._write_loop.running: self._write_loop.start(self._write_period, now=True) @@ -197,6 +245,8 @@ class MemoryStore(object): """ Stop loop for writing to disk database. """ + if self._write_loop is None: + return if self._write_loop.running: self._write_loop.stop() @@ -230,34 +280,30 @@ class MemoryStore(object): be fired. :type notify_on_disk: bool """ - log.msg("adding new doc to memstore %r (%r)" % (mbox, uid)) + log.msg("Adding new doc to memstore %r (%r)" % (mbox, uid)) key = mbox, uid self._add_message(mbox, uid, message, notify_on_disk) self._new.add(key) - # XXX use this while debugging the callback firing, - # remove after unittesting this. - #def log_add(result): - #return result - #observer.addCallback(log_add) - - if notify_on_disk: - # We store this deferred so we can keep track of the pending - # operations internally. - # TODO this should fire with the UID !!! -- change that in - # the soledad store code. - self._new_deferreds[key] = observer - if not notify_on_disk: - # Caller does not care, just fired and forgot, so we pass - # a defer that will inmediately have its callback triggered. - observer.callback(uid) + if observer is not None: + if notify_on_disk: + # We store this deferred so we can keep track of the pending + # operations internally. + # TODO this should fire with the UID !!! -- change that in + # the soledad store code. + self._new_deferreds[key] = observer + + else: + # Caller does not care, just fired and forgot, so we pass + # a defer that will inmediately have its callback triggered. + self.reactor.callFromThread(observer.callback, uid) def put_message(self, mbox, uid, message, notify_on_disk=True): """ Put an existing message. - This will set the dirty flag on the MemoryStore. + This will also set the dirty flag on the MemoryStore. :param mbox: the mailbox :type mbox: str or unicode @@ -289,76 +335,59 @@ class MemoryStore(object): Helper method, called by both create_message and put_message. See those for parameter documentation. """ - # XXX have to differentiate between notify_new and notify_dirty - # TODO defaultdict the hell outa here... - - key = mbox, uid msg_dict = message.as_dict() - FDOC = MessagePartType.fdoc.key - HDOC = MessagePartType.hdoc.key - CDOCS = MessagePartType.cdocs.key - DOCS_ID = MessagePartType.docs_id.key - - try: - store = self._msg_store[key] - except KeyError: - self._msg_store[key] = {FDOC: {}, - HDOC: {}, - CDOCS: {}, - DOCS_ID: {}} - store = self._msg_store[key] - fdoc = msg_dict.get(FDOC, None) - if fdoc: - if not store.get(FDOC, None): - store[FDOC] = ReferenciableDict({}) - store[FDOC].update(fdoc) + if fdoc is not None: + fdoc_store = self._fdoc_store[mbox][uid] + fdoc_store.update(fdoc) + chash_fdoc_store = self._chash_fdoc_store # content-hash indexing chash = fdoc.get(fields.CONTENT_HASH_KEY) - chash_fdoc_store = self._chash_fdoc_store - if not chash in chash_fdoc_store: - chash_fdoc_store[chash] = {} - chash_fdoc_store[chash][mbox] = weakref.proxy( - store[FDOC]) + self._fdoc_store[mbox][uid]) hdoc = msg_dict.get(HDOC, None) if hdoc is not None: - if not store.get(HDOC, None): - store[HDOC] = ReferenciableDict({}) - store[HDOC].update(hdoc) - - docs_id = msg_dict.get(DOCS_ID, None) - if docs_id: - if not store.get(DOCS_ID, None): - store[DOCS_ID] = {} - store[DOCS_ID].update(docs_id) + chash = hdoc.get(fields.CONTENT_HASH_KEY) + hdoc_store = self._hdoc_store[chash] + hdoc_store.update(hdoc) cdocs = message.cdocs - for cdoc_key in cdocs.keys(): - if not store.get(CDOCS, None): - store[CDOCS] = {} - - cdoc = cdocs[cdoc_key] - # first we make it weak-referenciable - referenciable_cdoc = ReferenciableDict(cdoc) - store[CDOCS][cdoc_key] = referenciable_cdoc + for cdoc in cdocs.values(): phash = cdoc.get(fields.PAYLOAD_HASH_KEY, None) if not phash: continue - self._phash_store[phash] = weakref.proxy(referenciable_cdoc) + cdoc_store = self._cdoc_store[phash] + cdoc_store.update(cdoc) - def prune(seq, store): - for key in seq: - if key in store and empty(store.get(key)): - store.pop(key) + # Update memory store size + # XXX this should use [mbox][uid] + key = mbox, uid + self._sizes[key] = size.get_size(self._fdoc_store[key]) + # TODO add hdoc and cdocs sizes too - prune((FDOC, HDOC, CDOCS, DOCS_ID), store) + def purge_fdoc_store(self, mbox): + """ + Purge the empty documents from a fdoc store. + Called during initialization of the SoledadMailbox - # Update memory store size - self._sizes[key] = size(self._msg_store[key]) + :param mbox: the mailbox + :type mbox: str or unicode + """ + # XXX This is really a workaround until I find the conditions + # that are making the empty items remain there. + # This happens, for instance, after running several times + # the regression test, that issues a store deleted + expunge + select + # The items are being correclty deleted, but in succesive appends + # the empty items with previously deleted uids reappear as empty + # documents. I suspect it's a timing condition with a previously + # evaluated sequence being used after the items has been removed. + + for uid, value in self._fdoc_store[mbox].items(): + if empty(value): + del self._fdoc_store[mbox][uid] def get_docid_for_fdoc(self, mbox, uid): """ @@ -371,13 +400,20 @@ class MemoryStore(object): :type uid: int :rtype: unicode or None """ - fdoc = self._permanent_store.get_flags_doc(mbox, uid) - if empty(fdoc): - return None - doc_id = fdoc.doc_id + with self._fdoc_docid_lock: + doc_id = self._fdoc_id_store[mbox][uid] + + if empty(doc_id): + fdoc = self._permanent_store.get_flags_doc(mbox, uid) + if empty(fdoc) or empty(fdoc.content): + return None + doc_id = fdoc.doc_id + self._fdoc_id_store[mbox][uid] = doc_id + return doc_id - def get_message(self, mbox, uid, flags_only=False): + def get_message(self, mbox, uid, dirtystate=DirtyState.none, + flags_only=False): """ Get a MessageWrapper for the given mbox and uid combination. @@ -385,25 +421,58 @@ class MemoryStore(object): :type mbox: str or unicode :param uid: the message UID :type uid: int + :param dirtystate: DirtyState enum: one of `dirty`, `new` + or `none` (default) + :type dirtystate: enum :param flags_only: whether the message should carry only a reference to the flags document. :type flags_only: bool + : :return: MessageWrapper or None """ + if dirtystate == DirtyState.dirty: + flags_only = True + key = mbox, uid - FDOC = MessagePartType.fdoc.key - msg_dict = self._msg_store.get(key, None) - if empty(msg_dict): + fdoc = self._fdoc_store[mbox][uid] + if empty(fdoc): return None - new, dirty = self._get_new_dirty_state(key) + + new, dirty = False, False + if dirtystate == DirtyState.none: + new, dirty = self._get_new_dirty_state(key) + if dirtystate == DirtyState.dirty: + new, dirty = False, True + if dirtystate == DirtyState.new: + new, dirty = True, False + if flags_only: - return MessageWrapper(fdoc=msg_dict[FDOC], + return MessageWrapper(fdoc=fdoc, new=new, dirty=dirty, memstore=weakref.proxy(self)) else: - return MessageWrapper(from_dict=msg_dict, + chash = fdoc.get(fields.CONTENT_HASH_KEY) + hdoc = self._hdoc_store[chash] + if empty(hdoc): + hdoc = self._permanent_store.get_headers_doc(chash) + if empty(hdoc): + return None + if not empty(hdoc.content): + self._hdoc_store[chash] = hdoc.content + hdoc = hdoc.content + cdocs = None + + pmap = hdoc.get(fields.PARTS_MAP_KEY, None) + if new and pmap is not None: + # take the different cdocs for write... + cdoc_store = self._cdoc_store + cdocs_list = phash_iter(hdoc) + cdocs = dict(enumerate( + [cdoc_store[phash] for phash in cdocs_list], 1)) + + return MessageWrapper(fdoc=fdoc, hdoc=hdoc, cdocs=cdocs, new=new, dirty=dirty, memstore=weakref.proxy(self)) @@ -424,23 +493,36 @@ class MemoryStore(object): # token to ensure consistency in the removal. try: + del self._fdoc_store[mbox][uid] + except KeyError: + pass + + try: key = mbox, uid self._new.discard(key) self._dirty.discard(key) - self._msg_store.pop(key, None) if key in self._sizes: del self._sizes[key] - + self._known_uids[mbox].discard(uid) + except Exception as exc: + logger.error("error while removing message!") + logger.exception(exc) + try: + with self._fdoc_docid_lock: + del self._fdoc_id_store[mbox][uid] except Exception as exc: + logger.error("error while removing message!") logger.exception(exc) # IMessageStoreWriter + @deferred_to_thread def write_messages(self, store): """ Write the message documents in this MemoryStore to a different store. :param store: the IMessageStore to write to + :rtype: False if queue is not empty, None otherwise. """ # For now, we pass if the queue is not empty, to avoid duplicate # queuing. @@ -450,7 +532,7 @@ class MemoryStore(object): # XXX this could return the deferred for all the enqueued operations if not self.producer.is_queue_empty(): - return + return False if any(map(lambda i: not empty(i), (self._new, self._dirty))): logger.info("Writing messages to Soledad...") @@ -459,9 +541,14 @@ class MemoryStore(object): # is accquired with set_bool_flag(self, self.WRITING_FLAG): for rflags_doc_wrapper in self.all_rdocs_iter(): - self.producer.push(rflags_doc_wrapper) - for msg_wrapper in self.all_new_dirty_msg_iter(): - self.producer.push(msg_wrapper) + self.producer.push(rflags_doc_wrapper, + state=self.producer.STATE_DIRTY) + for msg_wrapper in self.all_new_msg_iter(): + self.producer.push(msg_wrapper, + state=self.producer.STATE_NEW) + for msg_wrapper in self.all_dirty_msg_iter(): + self.producer.push(msg_wrapper, + state=self.producer.STATE_DIRTY) # MemoryStore specific methods. @@ -473,8 +560,7 @@ class MemoryStore(object): :type mbox: str or unicode :rtype: list """ - all_keys = self._msg_store.keys() - return [uid for m, uid in all_keys if m == mbox] + return self._fdoc_store[mbox].keys() def get_soledad_known_uids(self, mbox): """ @@ -523,7 +609,8 @@ class MemoryStore(object): :param value: the value to set :type value: int """ - leap_assert_type(value, int) + # can be long??? + #leap_assert_type(value, int) logger.info("setting last soledad uid for %s to %s" % (mbox, value)) # if we already have a value here, don't do anything @@ -555,10 +642,9 @@ class MemoryStore(object): with self._last_uid_lock: self._last_uid[mbox] += 1 value = self._last_uid[mbox] - self.write_last_uid(mbox, value) + self.reactor.callInThread(self.write_last_uid, mbox, value) return value - @deferred_to_thread def write_last_uid(self, mbox, value): """ Increment the soledad integer cache for the highest uid value. @@ -572,11 +658,112 @@ class MemoryStore(object): if self._permanent_store: self._permanent_store.write_last_uid(mbox, value) + def load_flag_docs(self, mbox, flag_docs): + """ + Load the flag documents for the given mbox. + Used during initial flag docs prefetch. + + :param mbox: the mailbox + :type mbox: str or unicode + :param flag_docs: a dict with the content for the flag docs, indexed + by uid. + :type flag_docs: dict + """ + # We can do direct assignments cause we know this will only + # be called during initialization of the mailbox. + # TODO could hook here a sanity-check + # for duplicates + + fdoc_store = self._fdoc_store[mbox] + chash_fdoc_store = self._chash_fdoc_store + for uid in flag_docs: + rdict = ReferenciableDict(flag_docs[uid]) + fdoc_store[uid] = rdict + # populate chash dict too, to avoid fdoc duplication + chash = flag_docs[uid]["chash"] + chash_fdoc_store[chash][mbox] = weakref.proxy( + self._fdoc_store[mbox][uid]) + + def update_flags(self, mbox, uid, fdoc): + """ + Update the flag document for a given mbox and uid combination, + and set the dirty flag. + We could use put_message, but this is faster. + + :param mbox: the mailbox + :type mbox: str or unicode + :param uid: the uid of the message + :type uid: int + + :param fdoc: a dict with the content for the flag docs + :type fdoc: dict + """ + key = mbox, uid + self._fdoc_store[mbox][uid].update(fdoc) + self._dirty.add(key) + + def load_header_docs(self, header_docs): + """ + Load the flag documents for the given mbox. + Used during header docs prefetch, and during cache after + a read from soledad if the hdoc property in message did not + find its value in here. + + :param flag_docs: a dict with the content for the flag docs. + :type flag_docs: dict + """ + hdoc_store = self._hdoc_store + for chash in header_docs: + hdoc_store[chash] = ReferenciableDict(header_docs[chash]) + + def all_flags(self, mbox): + """ + Return a dictionary with all the flags for a given mbox. + + :param mbox: the mailbox + :type mbox: str or unicode + :rtype: dict + """ + flags_dict = {} + uids = self.get_uids(mbox) + fdoc_store = self._fdoc_store[mbox] + + for uid in uids: + try: + flags = fdoc_store[uid][fields.FLAGS_KEY] + flags_dict[uid] = flags + except KeyError: + continue + return flags_dict + + def all_headers(self, mbox): + """ + Return a dictionary with all the header docs for a given mbox. + + :param mbox: the mailbox + :type mbox: str or unicode + :rtype: dict + """ + headers_dict = {} + uids = self.get_uids(mbox) + fdoc_store = self._fdoc_store[mbox] + hdoc_store = self._hdoc_store + + for uid in uids: + try: + chash = fdoc_store[uid][fields.CONTENT_HASH_KEY] + hdoc = hdoc_store[chash] + if not empty(hdoc): + headers_dict[uid] = hdoc + except KeyError: + continue + return headers_dict + # Counting sheeps... def count_new_mbox(self, mbox): """ - Count the new messages by inbox. + Count the new messages by mailbox. :param mbox: the mailbox :type mbox: str or unicode @@ -594,6 +781,33 @@ class MemoryStore(object): """ return len(self._new) + def count(self, mbox): + """ + Return the count of messages for a given mbox. + + :param mbox: the mailbox + :type mbox: str or unicode + :return: number of messages + :rtype: int + """ + return len(self._fdoc_store[mbox]) + + def unseen_iter(self, mbox): + """ + Get an iterator for the message UIDs with no `seen` flag + for a given mailbox. + + :param mbox: the mailbox + :type mbox: str or unicode + :return: iterator through unseen message doc UIDs + :rtype: iterable + """ + fdocs = self._fdoc_store[mbox] + + return [uid for uid, value + in fdocs.items() + if fields.SEEN_FLAG not in value.get(fields.FLAGS_KEY, [])] + def get_cdoc_from_phash(self, phash): """ Return a content-document by its payload-hash. @@ -602,7 +816,7 @@ class MemoryStore(object): :type phash: str or unicode :rtype: MessagePartDoc """ - doc = self._phash_store.get(phash, None) + doc = self._cdoc_store.get(phash, None) # XXX return None for consistency? @@ -632,8 +846,7 @@ class MemoryStore(object): :return: MessagePartDoc. It will return None if the flags document has empty content or it is flagged as \\Deleted. """ - docs_dict = self._chash_fdoc_store.get(chash, None) - fdoc = docs_dict.get(mbox, None) if docs_dict else None + fdoc = self._chash_fdoc_store[chash][mbox] # a couple of special cases. # 1. We might have a doc with empty content... @@ -644,53 +857,61 @@ class MemoryStore(object): # We want to create a new one in this case. # Hmmm what if the deletion is un-done?? We would end with a # duplicate... - if fdoc and fields.DELETED_FLAG in fdoc[fields.FLAGS_KEY]: + if fdoc and fields.DELETED_FLAG in fdoc.get(fields.FLAGS_KEY, []): return None uid = fdoc[fields.UID_KEY] key = mbox, uid new = key in self._new dirty = key in self._dirty + return MessagePartDoc( new=new, dirty=dirty, store="mem", part=MessagePartType.fdoc, content=fdoc, doc_id=None) - def all_msg_iter(self): + def iter_fdoc_keys(self): """ - Return generator that iterates through all messages in the store. - - :return: generator of MessageWrappers - :rtype: generator + Return a generator through all the mbox, uid keys in the flags-doc + store. """ - return (self.get_message(*key) - for key in sorted(self._msg_store.keys())) + fdoc_store = self._fdoc_store + for mbox in fdoc_store: + for uid in fdoc_store[mbox]: + yield mbox, uid - def all_new_dirty_msg_iter(self): + def all_new_msg_iter(self): """ - Return generator that iterates through all new and dirty messages. + Return generator that iterates through all new messages. :return: generator of MessageWrappers :rtype: generator """ - return (self.get_message(*key) - for key in sorted(self._msg_store.keys()) - if key in self._new or key in self._dirty) + gm = self.get_message + # need to freeze, set can change during iteration + new = [gm(*key, dirtystate=DirtyState.new) for key in tuple(self._new)] + # move content from new set to the queue + self._new_queue.update(self._new) + self._new.difference_update(self._new) + return new - def all_msg_dict_for_mbox(self, mbox): + def all_dirty_msg_iter(self): """ - Return all the message dicts for a given mbox. + Return generator that iterates through all dirty messages. - :param mbox: the mailbox - :type mbox: str or unicode - :return: list of dictionaries - :rtype: list + :return: generator of MessageWrappers + :rtype: generator """ - # This *needs* to return a fixed sequence. Otherwise the dictionary len - # will change during iteration, when we modify it - return [self._msg_store[(mb, uid)] - for mb, uid in self._msg_store if mb == mbox] + gm = self.get_message + # need to freeze, set can change during iteration + dirty = [gm(*key, flags_only=True, dirtystate=DirtyState.dirty) + for key in tuple(self._dirty)] + # move content from new and dirty sets to the queue + + self._dirty_queue.update(self._dirty) + self._dirty.difference_update(self._dirty) + return dirty def all_deleted_uid_iter(self, mbox): """ @@ -704,11 +925,10 @@ class MemoryStore(object): """ # This *needs* to return a fixed sequence. Otherwise the dictionary len # will change during iteration, when we modify it - all_deleted = [ - msg['fdoc']['uid'] for msg in self.all_msg_dict_for_mbox(mbox) - if msg.get('fdoc', None) - and fields.DELETED_FLAG in msg['fdoc']['flags']] - return all_deleted + fdocs = self._fdoc_store[mbox] + return [uid for uid, value + in fdocs.items() + if fields.DELETED_FLAG in value.get(fields.FLAGS_KEY, [])] # new, dirty flags @@ -721,26 +941,30 @@ class MemoryStore(object): :return: tuple of bools :rtype: tuple """ + # TODO change indexing of sets to [mbox][key] too. # XXX should return *first* the news, and *then* the dirty... + + # TODO should query in queues too , true? + # return map(lambda _set: key in _set, (self._new, self._dirty)) - def set_new(self, key): + def set_new_queued(self, key): """ - Add the key value to the `new` set. + Add the key value to the `new-queue` set. :param key: the key for the message, in the form mbox, uid :type key: tuple """ - self._new.add(key) + self._new_queue.add(key) - def unset_new(self, key): + def unset_new_queued(self, key): """ - Remove the key value from the `new` set. + Remove the key value from the `new-queue` set. :param key: the key for the message, in the form mbox, uid :type key: tuple """ - self._new.discard(key) + self._new_queue.discard(key) deferreds = self._new_deferreds d = deferreds.get(key, None) if d: @@ -749,23 +973,23 @@ class MemoryStore(object): d.callback('%s, ok' % str(key)) deferreds.pop(key) - def set_dirty(self, key): + def set_dirty_queued(self, key): """ - Add the key value to the `dirty` set. + Add the key value to the `dirty-queue` set. :param key: the key for the message, in the form mbox, uid :type key: tuple """ - self._dirty.add(key) + self._dirty_queue.add(key) - def unset_dirty(self, key): + def unset_dirty_queued(self, key): """ - Remove the key value from the `dirty` set. + Remove the key value from the `dirty-queue` set. :param key: the key for the message, in the form mbox, uid :type key: tuple """ - self._dirty.discard(key) + self._dirty_queue.discard(key) deferreds = self._dirty_deferreds d = deferreds.get(key, None) if d: @@ -776,7 +1000,6 @@ class MemoryStore(object): # Recent Flags - # TODO --- nice but unused def set_recent_flag(self, mbox, uid): """ Set the `Recent` flag for a given mailbox and UID. @@ -894,6 +1117,8 @@ class MemoryStore(object): """ self._stop_write_loop() if self._permanent_store is not None: + # XXX we should check if we did get a True value on this + # operation. If we got False we should retry! (queue was not empty) self.write_messages(self._permanent_store) self.producer.flush() @@ -911,10 +1136,18 @@ class MemoryStore(object): :type observer: Deferred """ soledad_store = self._permanent_store + if soledad_store is None: + # just-in memory store, easy then. + self._delete_from_memory(mbox, observer) + return + + # We have a soledad storage. try: # Stop and trigger last write self.stop_and_flush() # Wait on the writebacks to finish + + # XXX what if pending deferreds is empty? pending_deferreds = (self._new_deferreds.get(mbox, []) + self._dirty_deferreds.get(mbox, [])) d1 = defer.gatherResults(pending_deferreds, consumeErrors=True) @@ -923,6 +1156,18 @@ class MemoryStore(object): except Exception as exc: logger.exception(exc) + def _delete_from_memory(self, mbox, observer): + """ + Remove all messages marked as deleted from soledad and memory. + + :param mbox: the mailbox + :type mbox: str or unicode + :param observer: a deferred that will be fired when expunge is done + :type observer: Deferred + """ + mem_deleted = self.remove_all_deleted(mbox) + observer.callback(mem_deleted) + def _delete_from_soledad_and_memory(self, result, mbox, observer): """ Remove all messages marked as deleted from soledad and memory. @@ -939,12 +1184,8 @@ class MemoryStore(object): try: # 1. Delete all messages marked as deleted in soledad. - - # XXX this could be deferred for faster operation. - if soledad_store: - sol_deleted = soledad_store.remove_all_deleted(mbox) - else: - sol_deleted = [] + logger.debug("DELETING FROM SOLEDAD ALL FOR %r" % (mbox,)) + sol_deleted = soledad_store.remove_all_deleted(mbox) try: self._known_uids[mbox].difference_update(set(sol_deleted)) @@ -952,6 +1193,7 @@ class MemoryStore(object): logger.exception(exc) # 2. Delete all messages marked as deleted in memory. + logger.debug("DELETING FROM MEM ALL FOR %r" % (mbox,)) mem_deleted = self.remove_all_deleted(mbox) all_deleted = set(mem_deleted).union(set(sol_deleted)) @@ -960,8 +1202,43 @@ class MemoryStore(object): logger.exception(exc) finally: self._start_write_loop() + observer.callback(all_deleted) + # Mailbox documents and attributes + + # This could be also be cached in memstore, but proxying directly + # to soledad since it's not too performance-critical. + + def get_mbox_doc(self, mbox): + """ + Return the soledad document for a given mailbox. + + :param mbox: the mailbox + :type mbox: str or unicode + :rtype: SoledadDocument or None. + """ + return self.permanent_store.get_mbox_document(mbox) + + def get_mbox_closed(self, mbox): + """ + Return the closed attribute for a given mailbox. + + :param mbox: the mailbox + :type mbox: str or unicode + :rtype: bool + """ + return self.permanent_store.get_mbox_closed(mbox) + + def set_mbox_closed(self, mbox, closed): + """ + Set the closed attribute for a given mailbox. + + :param mbox: the mailbox + :type mbox: str or unicode + """ + self.permanent_store.set_mbox_closed(mbox, closed) + # Dump-to-disk controls. @property diff --git a/src/leap/mail/imap/messageparts.py b/src/leap/mail/imap/messageparts.py index b1f333a..257721c 100644 --- a/src/leap/mail/imap/messageparts.py +++ b/src/leap/mail/imap/messageparts.py @@ -98,7 +98,7 @@ class MessageWrapper(object): CDOCS = "cdocs" DOCS_ID = "docs_id" - # Using slots to limit some the memory footprint, + # Using slots to limit some the memory use, # Add your attribute here. __slots__ = ["_dict", "_new", "_dirty", "_storetype", "memstore"] @@ -148,7 +148,7 @@ class MessageWrapper(object): """ return self._new - def _set_new(self, value=True): + def _set_new(self, value=False): """ Set the value for the `new` flag, and propagate it to the memory store if any. @@ -158,11 +158,14 @@ class MessageWrapper(object): """ self._new = value if self.memstore: - mbox = self.fdoc.content['mbox'] - uid = self.fdoc.content['uid'] + mbox = self.fdoc.content.get('mbox', None) + uid = self.fdoc.content.get('uid', None) + if not mbox or not uid: + logger.warning("Malformed fdoc") + return key = mbox, uid - fun = [self.memstore.unset_new, - self.memstore.set_new][int(value)] + fun = [self.memstore.unset_new_queued, + self.memstore.set_new_queued][int(value)] fun(key) else: logger.warning("Could not find a memstore referenced from this " @@ -190,11 +193,14 @@ class MessageWrapper(object): """ self._dirty = value if self.memstore: - mbox = self.fdoc.content['mbox'] - uid = self.fdoc.content['uid'] + mbox = self.fdoc.content.get('mbox', None) + uid = self.fdoc.content.get('uid', None) + if not mbox or not uid: + logger.warning("Malformed fdoc") + return key = mbox, uid - fun = [self.memstore.unset_dirty, - self.memstore.set_dirty][int(value)] + fun = [self.memstore.unset_dirty_queued, + self.memstore.set_dirty_queued][int(value)] fun(key) else: logger.warning("Could not find a memstore referenced from this " @@ -271,13 +277,17 @@ class MessageWrapper(object): :rtype: generator """ if self._dirty: - mbox = self.fdoc.content[fields.MBOX_KEY] - uid = self.fdoc.content[fields.UID_KEY] - docid_dict = self._dict[self.DOCS_ID] - docid_dict[self.FDOC] = self.memstore.get_docid_for_fdoc( - mbox, uid) - - if not empty(self.fdoc.content): + try: + mbox = self.fdoc.content[fields.MBOX_KEY] + uid = self.fdoc.content[fields.UID_KEY] + docid_dict = self._dict[self.DOCS_ID] + docid_dict[self.FDOC] = self.memstore.get_docid_for_fdoc( + mbox, uid) + except Exception as exc: + logger.debug("Error while walking message...") + logger.exception(exc) + + if not empty(self.fdoc.content) and 'uid' in self.fdoc.content: yield self.fdoc if not empty(self.hdoc.content): yield self.hdoc @@ -408,10 +418,8 @@ class MessagePart(object): if payload: content_type = self._get_ctype_from_document(phash) charset = find_charset(content_type) - logger.debug("Got charset from header: %s" % (charset,)) if charset is None: charset = self._get_charset(payload) - logger.debug("Got charset: %s" % (charset,)) try: if isinstance(payload, unicode): payload = payload.encode(charset) diff --git a/src/leap/mail/imap/messages.py b/src/leap/mail/imap/messages.py index 25fc55f..fc1ec55 100644 --- a/src/leap/mail/imap/messages.py +++ b/src/leap/mail/imap/messages.py @@ -28,7 +28,6 @@ from functools import partial from twisted.mail import imap4 from twisted.internet import defer -from twisted.python import log from zope.interface import implements from zope.proxy import sameProxiedObjects @@ -78,7 +77,7 @@ def try_unique_query(curried): # TODO we could take action, like trigger a background # process to kill dupes. name = getattr(curried, 'expected', 'doc') - logger.warning( + logger.debug( "More than one %s found for this mbox, " "we got a duplicate!!" % (name,)) return query.pop() @@ -88,6 +87,13 @@ def try_unique_query(curried): logger.exception("Unhandled error %r" % exc) +""" +A dictionary that keeps one lock per mbox and uid. +""" +# XXX too much overhead? +fdoc_locks = defaultdict(lambda: defaultdict(lambda: threading.Lock())) + + class LeapMessage(fields, MailParser, MBoxParser): """ The main representation of a message. @@ -102,8 +108,6 @@ class LeapMessage(fields, MailParser, MBoxParser): implements(imap4.IMessage) - flags_lock = threading.Lock() - def __init__(self, soledad, uid, mbox, collection=None, container=None): """ Initializes a LeapMessage. @@ -129,10 +133,13 @@ class LeapMessage(fields, MailParser, MBoxParser): self.__chash = None self.__bdoc = None + from twisted.internet import reactor + self.reactor = reactor + # XXX make these properties public @property - def _fdoc(self): + def fdoc(self): """ An accessor to the flags document. """ @@ -149,35 +156,43 @@ class LeapMessage(fields, MailParser, MBoxParser): return fdoc @property - def _hdoc(self): + def hdoc(self): """ An accessor to the headers document. """ - if self._container is not None: + container = self._container + if container is not None: hdoc = self._container.hdoc if hdoc and not empty(hdoc.content): return hdoc - # XXX cache this into the memory store !!! - return self._get_headers_doc() + hdoc = self._get_headers_doc() + + if container and not empty(hdoc.content): + # mem-cache it + hdoc_content = hdoc.content + chash = hdoc_content.get(fields.CONTENT_HASH_KEY) + hdocs = {chash: hdoc_content} + container.memstore.load_header_docs(hdocs) + return hdoc @property - def _chash(self): + def chash(self): """ An accessor to the content hash for this message. """ - if not self._fdoc: + if not self.fdoc: return None - if not self.__chash and self._fdoc: - self.__chash = self._fdoc.content.get( + if not self.__chash and self.fdoc: + self.__chash = self.fdoc.content.get( fields.CONTENT_HASH_KEY, None) return self.__chash @property - def _bdoc(self): + def bdoc(self): """ An accessor to the body document. """ - if not self._hdoc: + if not self.hdoc: return None if not self.__bdoc: self.__bdoc = self._get_body_doc() @@ -204,7 +219,7 @@ class LeapMessage(fields, MailParser, MBoxParser): uid = self._uid flags = set([]) - fdoc = self._fdoc + fdoc = self.fdoc if fdoc: flags = set(fdoc.content.get(self.FLAGS_KEY, None)) @@ -230,20 +245,19 @@ class LeapMessage(fields, MailParser, MBoxParser): :type mode: int """ leap_assert(isinstance(flags, tuple), "flags need to be a tuple") - log.msg('setting flags: %s (%s)' % (self._uid, flags)) - - doc = self._fdoc - if not doc: - logger.warning( - "Could not find FDOC for %s:%s while setting flags!" % - (self._mbox, self._uid)) - return + mbox, uid = self._mbox, self._uid APPEND = 1 REMOVE = -1 SET = 0 - with self.flags_lock: + with fdoc_locks[mbox][uid]: + doc = self.fdoc + if not doc: + logger.warning( + "Could not find FDOC for %r:%s while setting flags!" % + (mbox, uid)) + return current = doc.content[self.FLAGS_KEY] if mode == APPEND: newflags = tuple(set(tuple(current) + flags)) @@ -251,33 +265,31 @@ class LeapMessage(fields, MailParser, MBoxParser): newflags = tuple(set(current).difference(set(flags))) elif mode == SET: newflags = flags + new_fdoc = { + self.FLAGS_KEY: newflags, + self.SEEN_KEY: self.SEEN_FLAG in newflags, + self.DEL_KEY: self.DELETED_FLAG in newflags} + self._collection.memstore.update_flags(mbox, uid, new_fdoc) - # We could defer this, but I think it's better - # to put it under the lock... - doc.content[self.FLAGS_KEY] = newflags - doc.content[self.SEEN_KEY] = self.SEEN_FLAG in flags - doc.content[self.DEL_KEY] = self.DELETED_FLAG in flags - - if self._collection.memstore is not None: - log.msg("putting message in collection") - self._collection.memstore.put_message( - self._mbox, self._uid, - MessageWrapper(fdoc=doc.content, new=False, dirty=True, - docs_id={'fdoc': doc.doc_id})) - else: - # fallback for non-memstore initializations. - self._soledad.put_doc(doc) return map(str, newflags) def getInternalDate(self): """ Retrieve the date internally associated with this message - :rtype: C{str} + According to the spec, this is NOT the date and time in the + RFC-822 header, but rather a date and time that reflects when the + message was received. + + * In SMTP, date and time of final delivery. + * In COPY, internal date/time of the source message. + * In APPEND, date/time specified. + :return: An RFC822-formatted date string. + :rtype: str """ - date = self._hdoc.content.get(self.DATE_KEY, '') - return str(date) + date = self.hdoc.content.get(fields.DATE_KEY, '') + return date # # IMessagePart @@ -302,8 +314,8 @@ class LeapMessage(fields, MailParser, MBoxParser): fd = StringIO.StringIO() - if self._bdoc is not None: - bdoc_content = self._bdoc.content + if self.bdoc is not None: + bdoc_content = self.bdoc.content if empty(bdoc_content): logger.warning("No BDOC content found for message!!!") return write_fd("") @@ -311,7 +323,6 @@ class LeapMessage(fields, MailParser, MBoxParser): body = bdoc_content.get(self.RAW_KEY, "") content_type = bdoc_content.get('content-type', "") charset = find_charset(content_type) - logger.debug('got charset from content-type: %s' % charset) if charset is None: charset = self._get_charset(body) try: @@ -352,8 +363,8 @@ class LeapMessage(fields, MailParser, MBoxParser): :rtype: int """ size = None - if self._fdoc: - fdoc_content = self._fdoc.content + if self.fdoc is not None: + fdoc_content = self.fdoc.content size = fdoc_content.get(self.SIZE_KEY, False) else: logger.warning("No FLAGS doc for %s:%s" % (self._mbox, @@ -422,8 +433,8 @@ class LeapMessage(fields, MailParser, MBoxParser): """ Return the headers dict for this message. """ - if self._hdoc is not None: - hdoc_content = self._hdoc.content + if self.hdoc is not None: + hdoc_content = self.hdoc.content headers = hdoc_content.get(self.HEADERS_KEY, {}) return headers @@ -437,8 +448,8 @@ class LeapMessage(fields, MailParser, MBoxParser): """ Return True if this message is multipart. """ - if self._fdoc: - fdoc_content = self._fdoc.content + if self.fdoc: + fdoc_content = self.fdoc.content is_multipart = fdoc_content.get(self.MULTIPART_KEY, False) return is_multipart else: @@ -477,11 +488,11 @@ class LeapMessage(fields, MailParser, MBoxParser): :raises: KeyError if key does not exist :rtype: dict """ - if not self._hdoc: + if not self.hdoc: logger.warning("Tried to get part but no HDOC found!") return None - hdoc_content = self._hdoc.content + hdoc_content = self.hdoc.content pmap = hdoc_content.get(fields.PARTS_MAP_KEY, {}) # remember, lads, soledad is using strings in its keys, @@ -508,6 +519,7 @@ class LeapMessage(fields, MailParser, MBoxParser): finally: return result + # TODO move to soledadstore instead of accessing soledad directly def _get_headers_doc(self): """ Return the document that keeps the headers for this @@ -515,15 +527,16 @@ class LeapMessage(fields, MailParser, MBoxParser): """ head_docs = self._soledad.get_from_index( fields.TYPE_C_HASH_IDX, - fields.TYPE_HEADERS_VAL, str(self._chash)) + fields.TYPE_HEADERS_VAL, str(self.chash)) return first(head_docs) + # TODO move to soledadstore instead of accessing soledad directly def _get_body_doc(self): """ Return the document that keeps the body for this message. """ - hdoc_content = self._hdoc.content + hdoc_content = self.hdoc.content body_phash = hdoc_content.get( fields.BODY_KEY, None) if not body_phash: @@ -560,14 +573,14 @@ class LeapMessage(fields, MailParser, MBoxParser): :return: The content value indexed by C{key} or None :rtype: str """ - return self._fdoc.content.get(key, None) + return self.fdoc.content.get(key, None) def does_exist(self): """ Return True if there is actually a flags document for this UID and mbox. """ - return not empty(self._fdoc) + return not empty(self.fdoc) class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser): @@ -672,8 +685,6 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser): _rdoc_lock = threading.Lock() _rdoc_property_lock = threading.Lock() - _hdocset_lock = threading.Lock() - _hdocset_property_lock = threading.Lock() def __init__(self, mbox=None, soledad=None, memstore=None): """ @@ -714,14 +725,13 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser): self.memstore = memstore self.__rflags = None - self.__hdocset = None self.initialize_db() # ensure that we have a recent-flags and a hdocs-sec doc self._get_or_create_rdoc() - # Not for now... - #self._get_or_create_hdocset() + from twisted.internet import reactor + self.reactor = reactor def _get_empty_doc(self, _type=FLAGS_DOC): """ @@ -746,33 +756,26 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser): rdoc[fields.MBOX_KEY] = self.mbox self._soledad.create_doc(rdoc) - def _get_or_create_hdocset(self): - """ - Try to retrieve the hdocs-set doc for this MessageCollection, - and create one if not found. - """ - hdocset = self._get_hdocset_doc() - if not hdocset: - hdocset = self._get_empty_doc(self.HDOCS_SET_DOC) - if self.mbox != fields.INBOX_VAL: - hdocset[fields.MBOX_KEY] = self.mbox - self._soledad.create_doc(hdocset) - + @deferred_to_thread def _do_parse(self, raw): """ Parse raw message and return it along with relevant information about its outer level. + This is done in a separate thread, and the callback is passed + to `_do_add_msg` method. + :param raw: the raw message :type raw: StringIO or basestring - :return: msg, chash, size, multi + :return: msg, parts, chash, size, multi :rtype: tuple """ msg = self._get_parsed_msg(raw) chash = self._get_hash(msg) size = len(msg.as_string()) multi = msg.is_multipart() - return msg, chash, size, multi + parts = walk.get_parts(msg) + return msg, parts, chash, size, multi def _populate_flags(self, flags, uid, chash, size, multi): """ @@ -840,12 +843,11 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser): :return: False, if it does not exist, or UID. """ exist = False - if self.memstore is not None: - exist = self.memstore.get_fdoc_from_chash(chash, self.mbox) + exist = self.memstore.get_fdoc_from_chash(chash, self.mbox) if not exist: exist = self._get_fdoc_from_chash(chash) - if exist: + if exist and exist.content is not None: return exist.content.get(fields.UID_KEY, "unknown-uid") else: return False @@ -874,24 +876,28 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser): uid when the adding succeed. :rtype: deferred """ - logger.debug('adding message') if flags is None: flags = tuple() leap_assert_type(flags, tuple) - d = defer.Deferred() - self._do_add_msg(raw, flags, subject, date, notify_on_disk, d) - return d + observer = defer.Deferred() + d = self._do_parse(raw) + d.addCallback(lambda result: self.reactor.callInThread( + self._do_add_msg, result, flags, subject, date, + notify_on_disk, observer)) + return observer - # We SHOULD defer this (or the heavy load here) to the thread pool, - # but it gives troubles with the QSocketNotifier used by Qt... - def _do_add_msg(self, raw, flags, subject, date, notify_on_disk, observer): + # Called in thread + def _do_add_msg(self, parse_result, flags, subject, + date, notify_on_disk, observer): """ Helper that creates a new message document. Here lives the magic of the leap mail. Well, in soledad, really. See `add_msg` docstring for parameter info. + :param parse_result: a tuple with the results of `self._do_parse` + :type parse_result: tuple :param observer: a deferred that will be fired with the message uid when the adding succeed. :type observer: deferred @@ -902,35 +908,33 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser): # TODO add the linked-from info ! # TODO add reference to the original message - # parse - msg, chash, size, multi = self._do_parse(raw) + msg, parts, chash, size, multi = parse_result # check for uniqueness -------------------------------- - # XXX profiler says that this test is costly. - # So we probably should just do an in-memory check and - # move the complete check to the soledad writer? # Watch out! We're reserving a UID right after this! existing_uid = self._fdoc_already_exists(chash) if existing_uid: - logger.warning("We already have that message in this " - "mailbox, unflagging as deleted") uid = existing_uid msg = self.get_msg_by_uid(uid) - msg.setFlags((fields.DELETED_FLAG,), -1) - # XXX if this is deferred to thread again we should not use - # the callback in the deferred thread, but return and - # call the callback from the caller fun... - observer.callback(uid) + # We can say the observer that we're done + self.reactor.callFromThread(observer.callback, uid) + msg.setFlags((fields.DELETED_FLAG,), -1) return uid = self.memstore.increment_last_soledad_uid(self.mbox) - logger.info("ADDING MSG WITH UID: %s" % uid) + + # We can say the observer that we're done at this point, but + # before that we should make sure it has no serious consequences + # if we're issued, for instance, a fetch command right after... + #self.reactor.callFromThread(observer.callback, uid) + # if we did the notify, we need to invalidate the deferred + # so not to try to fire it twice. + #observer = None fd = self._populate_flags(flags, uid, chash, size, multi) hd = self._populate_headr(msg, chash, subject, date) - parts = walk.get_parts(msg) body_phash_fun = [walk.get_body_phash_simple, walk.get_body_phash_multi][int(multi)] body_phash = body_phash_fun(walk.get_payloads(msg)) @@ -949,9 +953,9 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser): self.set_recent_flag(uid) msg_container = MessageWrapper(fd, hd, cdocs) - self.memstore.create_message(self.mbox, uid, msg_container, - observer=observer, - notify_on_disk=notify_on_disk) + self.memstore.create_message( + self.mbox, uid, msg_container, + observer=observer, notify_on_disk=notify_on_disk) # # getters: specific queries @@ -982,14 +986,6 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser): {'doc_id': rdoc.doc_id, 'set': rflags}) return rflags - #else: - # fallback for cases without memory store - #with self._rdoc_lock: - #rdoc = self._get_recent_doc() - #self.__rflags = set(rdoc.content.get( - #fields.RECENTFLAGS_KEY, [])) - #return self.__rflags - def _set_recent_flags(self, value): """ Setter for the recent-flags set for this mailbox. @@ -997,16 +993,6 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser): if self.memstore is not None: self.memstore.set_recent_flags(self.mbox, value) - #else: - # fallback for cases without memory store - #with self._rdoc_lock: - #rdoc = self._get_recent_doc() - #newv = set(value) - #self.__rflags = newv - #rdoc.content[fields.RECENTFLAGS_KEY] = list(newv) - # XXX should deferLater 0 it? - #self._soledad.put_doc(rdoc) - recent_flags = property( _get_recent_flags, _set_recent_flags, doc="Set of UIDs with the recent flag for this mailbox.") @@ -1121,6 +1107,7 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser): # XXX is this working? return self._get_uid_from_msgidCb(msgid) + @deferred_to_thread def set_flags(self, mbox, messages, flags, mode, observer): """ Set flags for a sequence of messages. @@ -1138,28 +1125,18 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser): done. :type observer: deferred """ - # XXX we could defer *this* to thread pool, and gather results... - # XXX use deferredList + reactor = self.reactor + getmsg = self.get_msg_by_uid - deferreds = [] - for msg_id in messages: - deferreds.append( - self._set_flag_for_uid(msg_id, flags, mode)) + def set_flags(uid, flags, mode): + msg = getmsg(uid, mem_only=True, flags_only=True) + if msg is not None: + return uid, msg.setFlags(flags, mode) - def notify(result): - observer.callback(dict(result)) - d1 = defer.gatherResults(deferreds, consumeErrors=True) - d1.addCallback(notify) + setted_flags = [set_flags(uid, flags, mode) for uid in messages] + result = dict(filter(None, setted_flags)) - @deferred_to_thread - def _set_flag_for_uid(self, msg_id, flags, mode): - """ - Run the set_flag operation in the thread pool. - """ - log.msg("MSG ID = %s" % msg_id) - msg = self.get_msg_by_uid(msg_id, mem_only=True, flags_only=True) - if msg is not None: - return msg_id, msg.setFlags(flags, mode) + reactor.callFromThread(observer.callback, result) # getters: generic for a mailbox @@ -1182,7 +1159,9 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser): or None if not found. :rtype: LeapMessage """ - msg_container = self.memstore.get_message(self.mbox, uid, flags_only) + msg_container = self.memstore.get_message( + self.mbox, uid, flags_only=flags_only) + if msg_container is not None: if mem_only: msg = LeapMessage(None, uid, self.mbox, collection=self, @@ -1195,6 +1174,7 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser): collection=self, container=msg_container) else: msg = LeapMessage(self._soledad, uid, self.mbox, collection=self) + if not msg.does_exist(): return None return msg @@ -1234,67 +1214,51 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser): db_uids = set([doc.content[self.UID_KEY] for doc in self._soledad.get_from_index( fields.TYPE_MBOX_IDX, - fields.TYPE_FLAGS_VAL, self.mbox)]) + fields.TYPE_FLAGS_VAL, self.mbox) + if not empty(doc)]) return db_uids def all_uid_iter(self): """ Return an iterator through the UIDs of all messages, from memory. """ - if self.memstore is not None: - mem_uids = self.memstore.get_uids(self.mbox) - soledad_known_uids = self.memstore.get_soledad_known_uids( - self.mbox) - combined = tuple(set(mem_uids).union(soledad_known_uids)) - return combined + mem_uids = self.memstore.get_uids(self.mbox) + soledad_known_uids = self.memstore.get_soledad_known_uids( + self.mbox) + combined = tuple(set(mem_uids).union(soledad_known_uids)) + return combined - # XXX MOVE to memstore - def all_flags(self): + def get_all_soledad_flag_docs(self): """ - Return a dict with all flags documents for this mailbox. - """ - # XXX get all from memstore and cache it there - # FIXME should get all uids, get them fro memstore, - # and get only the missing ones from disk. - - all_flags = dict((( - doc.content[self.UID_KEY], - doc.content[self.FLAGS_KEY]) for doc in - self._soledad.get_from_index( - fields.TYPE_MBOX_IDX, - fields.TYPE_FLAGS_VAL, self.mbox))) - if self.memstore is not None: - uids = self.memstore.get_uids(self.mbox) - docs = ((uid, self.memstore.get_message(self.mbox, uid)) - for uid in uids) - for uid, doc in docs: - all_flags[uid] = doc.fdoc.content[self.FLAGS_KEY] + Return a dict with the content of all the flag documents + in soledad store for the given mbox. - return all_flags - - def all_flags_chash(self): - """ - Return a dict with the content-hash for all flag documents - for this mailbox. + :param mbox: the mailbox + :type mbox: str or unicode + :rtype: dict """ - all_flags_chash = dict((( + # XXX we really could return a reduced version with + # just {'uid': (flags-tuple,) since the prefetch is + # only oriented to get the flag tuples. + all_docs = [( doc.content[self.UID_KEY], - doc.content[self.CONTENT_HASH_KEY]) for doc in + dict(doc.content)) + for doc in self._soledad.get_from_index( fields.TYPE_MBOX_IDX, - fields.TYPE_FLAGS_VAL, self.mbox))) - return all_flags_chash + fields.TYPE_FLAGS_VAL, self.mbox) + if not empty(doc.content)] + all_flags = dict(all_docs) + return all_flags def all_headers(self): """ - Return a dict with all the headers documents for this + Return a dict with all the header documents for this mailbox. + + :rtype: dict """ - all_headers = dict((( - doc.content[self.CONTENT_HASH_KEY], - doc.content[self.HEADERS_KEY]) for doc in - self._soledad.get_docs(self._hdocset))) - return all_headers + return self.memstore.all_headers(self.mbox) def count(self): """ @@ -1302,13 +1266,7 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser): :rtype: int """ - # XXX We should cache this in memstore too until next write... - count = self._soledad.get_count_from_index( - fields.TYPE_MBOX_IDX, - fields.TYPE_FLAGS_VAL, self.mbox) - if self.memstore is not None: - count += self.memstore.count_new() - return count + return self.memstore.count(self.mbox) # unseen messages @@ -1320,10 +1278,7 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser): :return: iterator through unseen message doc UIDs :rtype: iterable """ - return (doc.content[self.UID_KEY] for doc in - self._soledad.get_from_index( - fields.TYPE_MBOX_SEEN_IDX, - fields.TYPE_FLAGS_VAL, self.mbox, '0')) + return self.memstore.unseen_iter(self.mbox) def count_unseen(self): """ @@ -1332,10 +1287,7 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser): :returns: count :rtype: int """ - count = self._soledad.get_count_from_index( - fields.TYPE_MBOX_SEEN_IDX, - fields.TYPE_FLAGS_VAL, self.mbox, '0') - return count + return len(list(self.unseen_iter())) def get_unseen(self): """ diff --git a/src/leap/mail/imap/server.py b/src/leap/mail/imap/server.py index ba63846..5da9bfd 100644 --- a/src/leap/mail/imap/server.py +++ b/src/leap/mail/imap/server.py @@ -20,9 +20,7 @@ Leap IMAP4 Server Implementation. from copy import copy from twisted import cred -from twisted.internet import defer from twisted.internet.defer import maybeDeferred -from twisted.internet.task import deferLater from twisted.mail import imap4 from twisted.python import log @@ -41,6 +39,7 @@ class LeapIMAPServer(imap4.IMAP4Server): soledad = kwargs.pop('soledad', None) uuid = kwargs.pop('uuid', None) userid = kwargs.pop('userid', None) + leap_assert(soledad, "need a soledad instance") leap_assert_type(soledad, Soledad) leap_assert(uuid, "need a user in the initialization") @@ -55,6 +54,9 @@ class LeapIMAPServer(imap4.IMAP4Server): # populate the test account properly (and only once # per session) + from twisted.internet import reactor + self.reactor = reactor + def lineReceived(self, line): """ Attempt to parse a single line from the server. @@ -114,6 +116,7 @@ class LeapIMAPServer(imap4.IMAP4Server): ).addCallback( cbFetch, tag, query, uid ).addErrback(ebFetch, tag) + elif len(query) == 1 and str(query[0]) == "rfc822.header": self._oldTimeout = self.setTimeout(None) # no need to call iter, we get a generator @@ -130,48 +133,16 @@ class LeapIMAPServer(imap4.IMAP4Server): ).addCallback( cbFetch, tag, query, uid ).addErrback( - ebFetch, tag - ).addCallback( - self.on_fetch_finished, messages) + ebFetch, tag) select_FETCH = (do_FETCH, imap4.IMAP4Server.arg_seqset, imap4.IMAP4Server.arg_fetchatt) - def on_fetch_finished(self, _, messages): - from twisted.internet import reactor - - print "FETCH FINISHED -- NOTIFY NEW" - deferLater(reactor, 0, self.notifyNew) - deferLater(reactor, 0, self.mbox.unset_recent_flags, messages) - deferLater(reactor, 0, self.mbox.signal_unread_to_ui) - - def on_copy_finished(self, defers): - d = defer.gatherResults(filter(None, defers)) - - def when_finished(result): - log.msg("COPY FINISHED") - self.notifyNew() - self.mbox.signal_unread_to_ui() - d.addCallback(when_finished) - #d.addCallback(self.notifyNew) - #d.addCallback(self.mbox.signal_unread_to_ui) - - def do_COPY(self, tag, messages, mailbox, uid=0): - from twisted.internet import reactor - defers = [] - d = imap4.IMAP4Server.do_COPY(self, tag, messages, mailbox, uid) - defers.append(d) - deferLater(reactor, 0, self.on_copy_finished, defers) - - select_COPY = (do_COPY, imap4.IMAP4Server.arg_seqset, - imap4.IMAP4Server.arg_astring) - def notifyNew(self, ignored=None): """ Notify new messages to listeners. """ - print "TRYING TO NOTIFY NEW" - self.mbox.notify_new() + self.reactor.callFromThread(self.mbox.notify_new) def _cbSelectWork(self, mbox, cmdName, tag): """ diff --git a/src/leap/mail/imap/service/imap.py b/src/leap/mail/imap/service/imap.py index 93df51d..1175cdc 100644 --- a/src/leap/mail/imap/service/imap.py +++ b/src/leap/mail/imap/service/imap.py @@ -25,6 +25,7 @@ from twisted.internet import defer, threads from twisted.internet.protocol import ServerFactory from twisted.internet.error import CannotListenError from twisted.mail import imap4 +from twisted.python import log logger = logging.getLogger(__name__) @@ -71,6 +72,15 @@ DO_MANHOLE = os.environ.get("LEAP_MAIL_MANHOLE", None) if DO_MANHOLE: from leap.mail.imap.service import manhole +DO_PROFILE = os.environ.get("LEAP_PROFILE", None) +if DO_PROFILE: + import cProfile + log.msg("Starting PROFILING...") + + PROFILE_DAT = "/tmp/leap_mail_profile.pstats" + pr = cProfile.Profile() + pr.enable() + class IMAPAuthRealm(object): """ @@ -115,7 +125,12 @@ class LeapIMAPFactory(ServerFactory): # XXX how to pass the store along? def buildProtocol(self, addr): - "Return a protocol suitable for the job." + """ + Return a protocol suitable for the job. + + :param addr: remote ip address + :type addr: str + """ imapProtocol = LeapIMAPServer( uuid=self._uuid, userid=self._userid, @@ -124,7 +139,7 @@ class LeapIMAPFactory(ServerFactory): imapProtocol.factory = self return imapProtocol - def doStop(self, cv): + def doStop(self, cv=None): """ Stops imap service (fetcher, factory and port). @@ -135,23 +150,30 @@ class LeapIMAPFactory(ServerFactory): disk in another thread. :rtype: Deferred """ - ServerFactory.doStop(self) + if DO_PROFILE: + log.msg("Stopping PROFILING") + pr.disable() + pr.dump_stats(PROFILE_DAT) - def _stop_imap_cb(): - logger.debug('Stopping in memory store.') - self._memstore.stop_and_flush() - while not self._memstore.producer.is_queue_empty(): - logger.debug('Waiting for queue to be empty.') - # TODO use a gatherResults over the new/dirty deferred list, - # as in memorystore's expunge() method. - time.sleep(1) - # notify that service has stopped - logger.debug('Notifying that service has stopped.') - cv.acquire() - cv.notify() - cv.release() + ServerFactory.doStop(self) - return threads.deferToThread(_stop_imap_cb) + if cv is not None: + def _stop_imap_cb(): + logger.debug('Stopping in memory store.') + self._memstore.stop_and_flush() + while not self._memstore.producer.is_queue_empty(): + logger.debug('Waiting for queue to be empty.') + # TODO use a gatherResults over the new/dirty + # deferred list, + # as in memorystore's expunge() method. + time.sleep(1) + # notify that service has stopped + logger.debug('Notifying that service has stopped.') + cv.acquire() + cv.notify() + cv.release() + + return threads.deferToThread(_stop_imap_cb) def run_service(*args, **kwargs): @@ -164,6 +186,9 @@ def run_service(*args, **kwargs): the protocol. """ from twisted.internet import reactor + # it looks like qtreactor does not honor this, + # but other reactors should. + reactor.suggestThreadPoolSize(20) leap_assert(len(args) == 2) soledad, keymanager = args diff --git a/src/leap/mail/imap/soledadstore.py b/src/leap/mail/imap/soledadstore.py index 8e22f26..732fe03 100644 --- a/src/leap/mail/imap/soledadstore.py +++ b/src/leap/mail/imap/soledadstore.py @@ -20,14 +20,14 @@ A MessageStore that writes to Soledad. import logging import threading +from collections import defaultdict from itertools import chain from u1db import errors as u1db_errors -from twisted.internet import defer from twisted.python import log from zope.interface import implements -from leap.common.check import leap_assert_type +from leap.common.check import leap_assert_type, leap_assert from leap.mail.decorators import deferred_to_thread from leap.mail.imap.messageparts import MessagePartType from leap.mail.imap.messageparts import MessageWrapper @@ -35,15 +35,13 @@ from leap.mail.imap.messageparts import RecentFlagsDoc from leap.mail.imap.fields import fields from leap.mail.imap.interfaces import IMessageStore from leap.mail.messageflow import IMessageConsumer -from leap.mail.utils import first +from leap.mail.utils import first, empty, accumulator_queue logger = logging.getLogger(__name__) # TODO -# [ ] Delete original message from the incoming queue after all successful -# writes. -# [ ] Implement a retry queue. +# [ ] Implement a retry queue? # [ ] Consider journaling of operations. @@ -86,10 +84,12 @@ class ContentDedup(object): if not header_docs: return False - if len(header_docs) != 1: - logger.warning("Found more than one copy of chash %s!" - % (chash,)) - logger.debug("Found header doc with that hash! Skipping save!") + # FIXME enable only to debug this problem. + #if len(header_docs) != 1: + #logger.warning("Found more than one copy of chash %s!" + #% (chash,)) + + #logger.debug("Found header doc with that hash! Skipping save!") return True def _content_does_exist(self, doc): @@ -110,10 +110,11 @@ class ContentDedup(object): if not attach_docs: return False - if len(attach_docs) != 1: - logger.warning("Found more than one copy of phash %s!" - % (phash,)) - logger.debug("Found attachment doc with that hash! Skipping save!") + # FIXME enable only to debug this problem + #if len(attach_docs) != 1: + #logger.warning("Found more than one copy of phash %s!" + #% (phash,)) + #logger.debug("Found attachment doc with that hash! Skipping save!") return True @@ -121,13 +122,26 @@ class MsgWriteError(Exception): """ Raised if any exception is found while saving message parts. """ + pass + + +""" +A lock per document. +""" +# TODO should bound the space of this!!! +# http://stackoverflow.com/a/2437645/1157664 +# Setting this to twice the number of threads in the threadpool +# should be safe. +put_locks = defaultdict(lambda: threading.Lock()) class SoledadStore(ContentDedup): """ This will create docs in the local Soledad database. """ - _last_uid_lock = threading.Lock() + _soledad_rw_lock = threading.Lock() + _remove_lock = threading.Lock() + _mbox_doc_locks = defaultdict(lambda: threading.Lock()) implements(IMessageConsumer, IMessageStore) @@ -138,8 +152,20 @@ class SoledadStore(ContentDedup): :param soledad: the soledad instance :type soledad: Soledad """ + from twisted.internet import reactor + self.reactor = reactor + self._soledad = soledad + self._CREATE_DOC_FUN = self._soledad.create_doc + self._PUT_DOC_FUN = self._soledad.put_doc + self._GET_DOC_FUN = self._soledad.get_doc + + # we instantiate an accumulator to batch the notifications + self.docs_notify_queue = accumulator_queue( + lambda item: reactor.callFromThread(self._unset_new_dirty, item), + 20) + # IMessageStore # ------------------------------------------------------------------- @@ -194,47 +220,32 @@ class SoledadStore(ContentDedup): # IMessageConsumer - # It's not thread-safe to defer this to a different thread + # TODO should handle the delete case + # TODO should handle errors better + # TODO could generalize this method into a generic consumer + # and only implement `process` here def consume(self, queue): """ Creates a new document in soledad db. - :param queue: queue to get item from, with content of the document - to be inserted. - :type queue: Queue - """ - # TODO should delete the original message from incoming only after - # the writes are done. - # TODO should handle the delete case - # TODO should handle errors - # TODO could generalize this method into a generic consumer - # and only implement `process` here - - def docWriteCallBack(doc_wrapper): - """ - Callback for a successful write of a document wrapper. - """ - if isinstance(doc_wrapper, MessageWrapper): - # If everything went well, we can unset the new flag - # in the source store (memory store) - self._unset_new_dirty(doc_wrapper) - - def docWriteErrorBack(failure): - """ - Errorback for write operations. - """ - log.error("Error while processing item.") - log.msg(failure.getTraceBack()) - - while not queue.empty(): - doc_wrapper = queue.get() - d = defer.Deferred() - d.addCallbacks(docWriteCallBack, docWriteErrorBack) - - self._consume_doc(doc_wrapper, d) + :param queue: a tuple of queues to get item from, with content of the + document to be inserted. + :type queue: tuple of Queues + """ + new, dirty = queue + while not new.empty(): + doc_wrapper = new.get() + self.reactor.callInThread(self._consume_doc, doc_wrapper, + self.docs_notify_queue) + while not dirty.empty(): + doc_wrapper = dirty.get() + self.reactor.callInThread(self._consume_doc, doc_wrapper, + self.docs_notify_queue) + + # Queue empty, flush the notifications queue. + self.docs_notify_queue(None, flush=True) - @deferred_to_thread def _unset_new_dirty(self, doc_wrapper): """ Unset the `new` and `dirty` flags for this document wrapper in the @@ -243,56 +254,76 @@ class SoledadStore(ContentDedup): :param doc_wrapper: a MessageWrapper instance :type doc_wrapper: MessageWrapper """ - # XXX debug msg id/mbox? - logger.info("unsetting new flag!") - doc_wrapper.new = False - doc_wrapper.dirty = False + if isinstance(doc_wrapper, MessageWrapper): + # XXX still needed for debug quite often + #logger.info("unsetting new flag!") + doc_wrapper.new = False + doc_wrapper.dirty = False @deferred_to_thread - def _consume_doc(self, doc_wrapper, deferred): + def _consume_doc(self, doc_wrapper, notify_queue): """ Consume each document wrapper in a separate thread. + We pass an instance of an accumulator that handles the notifications + to the memorystore when the write has been done. :param doc_wrapper: a MessageWrapper or RecentFlagsDoc instance :type doc_wrapper: MessageWrapper or RecentFlagsDoc - :param deferred: a deferred that will be fired when the write operation - has finished, either calling its callback or its - errback depending on whether it succeed. - :type deferred: Deferred + :param notify_queue: a callable that handles the writeback + notifications to the memstore. + :type notify_queue: callable """ - items = self._process(doc_wrapper) + def queueNotifyBack(failed, doc_wrapper): + if failed: + log.msg("There was an error writing the mesage...") + else: + notify_queue(doc_wrapper) + + def doSoledadCalls(items): + # we prime the generator, that should return the + # message or flags wrapper item in the first place. + doc_wrapper = items.next() + failed = self._soledad_write_document_parts(items) + queueNotifyBack(failed, doc_wrapper) - # we prime the generator, that should return the - # message or flags wrapper item in the first place. - doc_wrapper = items.next() + doSoledadCalls(self._iter_wrapper_subparts(doc_wrapper)) - # From here, we unpack the subpart items and - # the right soledad call. + # + # SoledadStore specific methods. + # + + def _soledad_write_document_parts(self, items): + """ + Write the document parts to soledad in a separate thread. + + :param items: the iterator through the different document wrappers + payloads. + :type items: iterator + :return: whether the write was successful or not + :rtype: bool + """ failed = False for item, call in items: + if empty(item): + continue try: self._try_call(call, item) except Exception as exc: - failed = exc + logger.debug("ITEM WAS: %s" % str(item)) + logger.debug("ITEM CONTENT WAS: %s" % str(item.content)) + logger.exception(exc) + failed = True continue - if failed: - deferred.errback(MsgWriteError( - "There was an error writing the mesage")) - else: - deferred.callback(doc_wrapper) - - # - # SoledadStore specific methods. - # + return failed - def _process(self, doc_wrapper): + def _iter_wrapper_subparts(self, doc_wrapper): """ Return an iterator that will yield the doc_wrapper in the first place, followed by the subparts item and the proper call type for every item in the queue, if any. - :param queue: the queue from where we'll pick item. - :type queue: Queue + :param doc_wrapper: a MessageWrapper or RecentFlagsDoc instance + :type doc_wrapper: MessageWrapper or RecentFlagsDoc """ if isinstance(doc_wrapper, MessageWrapper): return chain((doc_wrapper,), @@ -315,11 +346,38 @@ class SoledadStore(ContentDedup): """ if call is None: return - try: - call(item) - except u1db_errors.RevisionConflict as exc: - logger.exception("Error: %r" % (exc,)) - raise exc + + if call == self._PUT_DOC_FUN: + doc_id = item.doc_id + with put_locks[doc_id]: + doc = self._GET_DOC_FUN(doc_id) + + if doc is None: + logger.warning("BUG! Dirty doc but could not " + "find document %s" % (doc_id,)) + return + + doc.content = dict(item.content) + + item = doc + try: + call(item) + except u1db_errors.RevisionConflict as exc: + logger.exception("Error: %r" % (exc,)) + raise exc + except Exception as exc: + logger.exception("Error: %r" % (exc,)) + raise exc + + else: + try: + call(item) + except u1db_errors.RevisionConflict as exc: + logger.exception("Error: %r" % (exc,)) + raise exc + except Exception as exc: + logger.exception("Error: %r" % (exc,)) + raise exc def _get_calls_for_msg_parts(self, msg_wrapper): """ @@ -334,7 +392,7 @@ class SoledadStore(ContentDedup): call = None if msg_wrapper.new: - call = self._soledad.create_doc + call = self._CREATE_DOC_FUN # item is expected to be a MessagePartDoc for item in msg_wrapper.walk(): @@ -353,18 +411,18 @@ class SoledadStore(ContentDedup): # the flags doc. elif msg_wrapper.dirty: - call = self._soledad.put_doc + call = self._PUT_DOC_FUN # item is expected to be a MessagePartDoc for item in msg_wrapper.walk(): # XXX FIXME Give error if dirty and not doc_id !!! doc_id = item.doc_id # defend! if not doc_id: + logger.warning("Dirty item but no doc_id!") continue - doc = self._soledad.get_doc(doc_id) - doc.content = dict(item.content) + if item.part == MessagePartType.fdoc: - logger.debug("PUT dirty fdoc") - yield doc, call + #logger.debug("PUT dirty fdoc") + yield item, call # XXX also for linkage-doc !!! else: @@ -379,17 +437,16 @@ class SoledadStore(ContentDedup): :return: a tuple with recent-flags doc payload and callable :rtype: tuple """ - call = self._soledad.put_doc - rdoc = self._soledad.get_doc(rflags_wrapper.doc_id) + call = self._CREATE_DOC_FUN payload = rflags_wrapper.content - logger.debug("Saving RFLAGS to Soledad...") - if payload: - rdoc.content = payload - yield rdoc, call + logger.debug("Saving RFLAGS to Soledad...") + yield payload, call - def _get_mbox_document(self, mbox): + # Mbox documents and attributes + + def get_mbox_document(self, mbox): """ Return mailbox document. @@ -399,15 +456,80 @@ class SoledadStore(ContentDedup): the query failed. :rtype: SoledadDocument or None. """ + with self._mbox_doc_locks[mbox]: + return self._get_mbox_document(mbox) + + def _get_mbox_document(self, mbox): + """ + Helper for returning the mailbox document. + """ try: query = self._soledad.get_from_index( fields.TYPE_MBOX_IDX, fields.TYPE_MBOX_VAL, mbox) if query: return query.pop() + else: + logger.error("Could not find mbox document for %r" % + (self.mbox,)) except Exception as exc: logger.exception("Unhandled error %r" % exc) + def get_mbox_closed(self, mbox): + """ + Return the closed attribute for a given mailbox. + + :param mbox: the mailbox + :type mbox: str or unicode + :rtype: bool + """ + mbox_doc = self.get_mbox_document() + return mbox_doc.content.get(fields.CLOSED_KEY, False) + + def set_mbox_closed(self, mbox, closed): + """ + Set the closed attribute for a given mailbox. + + :param mbox: the mailbox + :type mbox: str or unicode + :param closed: the value to be set + :type closed: bool + """ + leap_assert(isinstance(closed, bool), "closed needs to be boolean") + with self._mbox_doc_locks[mbox]: + mbox_doc = self._get_mbox_document(mbox) + if mbox_doc is None: + logger.error( + "Could not find mbox document for %r" % (mbox,)) + return + mbox_doc.content[fields.CLOSED_KEY] = closed + self._soledad.put_doc(mbox_doc) + + def write_last_uid(self, mbox, value): + """ + Write the `last_uid` integer to the proper mailbox document + in Soledad. + This is called from the deferred triggered by + memorystore.increment_last_soledad_uid, which is expected to + run in a separate thread. + + :param mbox: the mailbox + :type mbox: str or unicode + :param value: the value to set + :type value: int + """ + leap_assert_type(value, int) + key = fields.LAST_UID_KEY + + # XXX change for a lock related to the mbox document + # itself. + with self._mbox_doc_locks[mbox]: + mbox_doc = self._get_mbox_document(mbox) + old_val = mbox_doc.content[key] + if value > old_val: + mbox_doc.content[key] = value + self._soledad.put_doc(mbox_doc) + def get_flags_doc(self, mbox, uid): """ Return the SoledadDocument for the given mbox and uid. @@ -416,12 +538,16 @@ class SoledadStore(ContentDedup): :type mbox: str or unicode :param uid: the UID for the message :type uid: int + :rtype: SoledadDocument or None """ result = None try: flag_docs = self._soledad.get_from_index( fields.TYPE_MBOX_UID_IDX, fields.TYPE_FLAGS_VAL, mbox, str(uid)) + if len(flag_docs) != 1: + logger.warning("More than one flag doc for %r:%s" % + (mbox, uid)) result = first(flag_docs) except Exception as exc: # ugh! Something's broken down there! @@ -430,36 +556,25 @@ class SoledadStore(ContentDedup): finally: return result - def write_last_uid(self, mbox, value): + def get_headers_doc(self, chash): """ - Write the `last_uid` integer to the proper mailbox document - in Soledad. - This is called from the deferred triggered by - memorystore.increment_last_soledad_uid, which is expected to - run in a separate thread. + Return the document that keeps the headers for a message + indexed by its content-hash. - :param mbox: the mailbox - :type mbox: str or unicode - :param value: the value to set - :type value: int + :param chash: the content-hash to retrieve the document from. + :type chash: str or unicode + :rtype: SoledadDocument or None """ - leap_assert_type(value, int) - key = fields.LAST_UID_KEY - - with self._last_uid_lock: - mbox_doc = self._get_mbox_document(mbox) - old_val = mbox_doc.content[key] - if value < old_val: - logger.error("%r:%s Tried to write a UID lesser than what's " - "stored!" % (mbox, value)) - mbox_doc.content[key] = value - self._soledad.put_doc(mbox_doc) + head_docs = self._soledad.get_from_index( + fields.TYPE_C_HASH_IDX, + fields.TYPE_HEADERS_VAL, str(chash)) + return first(head_docs) # deleted messages def deleted_iter(self, mbox): """ - Get an iterator for the SoledadDocuments for messages + Get an iterator for the the doc_id for SoledadDocuments for messages with \\Deleted flag for a given mailbox. :param mbox: the mailbox @@ -467,11 +582,10 @@ class SoledadStore(ContentDedup): :return: iterator through deleted message docs :rtype: iterable """ - return (doc for doc in self._soledad.get_from_index( + return [doc.doc_id for doc in self._soledad.get_from_index( fields.TYPE_MBOX_DEL_IDX, - fields.TYPE_FLAGS_VAL, mbox, '1')) + fields.TYPE_FLAGS_VAL, mbox, '1')] - # TODO can deferToThread this? def remove_all_deleted(self, mbox): """ Remove from Soledad all messages flagged as deleted for a given @@ -481,7 +595,14 @@ class SoledadStore(ContentDedup): :type mbox: str or unicode """ deleted = [] - for doc in self.deleted_iter(mbox): - deleted.append(doc.content[fields.UID_KEY]) - self._soledad.delete_doc(doc) + for doc_id in self.deleted_iter(mbox): + with self._remove_lock: + doc = self._soledad.get_doc(doc_id) + if doc is not None: + self._soledad.delete_doc(doc) + try: + deleted.append(doc.content[fields.UID_KEY]) + except TypeError: + # empty content + pass return deleted diff --git a/src/leap/mail/imap/tests/regressions b/src/leap/mail/imap/tests/regressions index 0a43398..efe3f46 100755 --- a/src/leap/mail/imap/tests/regressions +++ b/src/leap/mail/imap/tests/regressions @@ -101,7 +101,6 @@ def compare_msg_parts(a, b): pprint(b[index]) print - return all_match @@ -328,7 +327,7 @@ def cbAppendNextMessage(proto): return proto.append( REGRESSIONS_FOLDER, msg ).addCallback( - lambda r: proto.examine(REGRESSIONS_FOLDER) + lambda r: proto.select(REGRESSIONS_FOLDER) ).addCallback( cbAppend, proto, raw ).addErrback( @@ -379,6 +378,9 @@ def cbCompareMessage(result, proto, raw): if result: keys = result.keys() keys.sort() + else: + print "[-] GOT NO RESULT" + return proto.logout() latest = max(keys) diff --git a/src/leap/mail/imap/tests/test_imap.py b/src/leap/mail/imap/tests/test_imap.py index 8c1cf20..fd88440 100644 --- a/src/leap/mail/imap/tests/test_imap.py +++ b/src/leap/mail/imap/tests/test_imap.py @@ -43,6 +43,7 @@ from itertools import chain from mock import Mock from nose.twistedtools import deferred, stop_reactor +from unittest import skip from twisted.mail import imap4 @@ -64,11 +65,16 @@ import twisted.cred.portal from leap.common.testing.basetest import BaseLeapTest from leap.mail.imap.account import SoledadBackedAccount from leap.mail.imap.mailbox import SoledadMailbox +from leap.mail.imap.memorystore import MemoryStore from leap.mail.imap.messages import MessageCollection +from leap.mail.imap.server import LeapIMAPServer from leap.soledad.client import Soledad from leap.soledad.client import SoledadCrypto +TEST_USER = "testuser@leap.se" +TEST_PASSWD = "1234" + def strip(f): return lambda result, f=f: f() @@ -89,10 +95,10 @@ def initialize_soledad(email, gnupg_home, tempdir): """ Initializes soledad by hand - @param email: ID for the user - @param gnupg_home: path to home used by gnupg - @param tempdir: path to temporal dir - @rtype: Soledad instance + :param email: ID for the user + :param gnupg_home: path to home used by gnupg + :param tempdir: path to temporal dir + :rtype: Soledad instance """ uuid = "foobar-uuid" @@ -125,55 +131,6 @@ def initialize_soledad(email, gnupg_home, tempdir): return _soledad -# -# Simple LEAP IMAP4 Server for testing -# - -class SimpleLEAPServer(imap4.IMAP4Server): - - """ - A Simple IMAP4 Server with mailboxes backed by Soledad. - - This should be pretty close to the real LeapIMAP4Server that we - will be instantiating as a service, minus the authentication bits. - """ - - def __init__(self, *args, **kw): - - soledad = kw.pop('soledad', None) - - imap4.IMAP4Server.__init__(self, *args, **kw) - realm = TestRealm() - - # XXX Why I AM PASSING THE ACCOUNT TO - # REALM? I AM NOT USING THAT NOW, AM I??? - realm.theAccount = SoledadBackedAccount( - 'testuser', - soledad=soledad) - - portal = cred.portal.Portal(realm) - c = cred.checkers.InMemoryUsernamePasswordDatabaseDontUse() - self.checker = c - self.portal = portal - portal.registerChecker(c) - self.timeoutTest = False - - def lineReceived(self, line): - if self.timeoutTest: - # Do not send a respones - return - - imap4.IMAP4Server.lineReceived(self, line) - - _username = 'testuser' - _password = 'password-test' - - def authenticateLogin(self, username, password): - if username == self._username and password == self._password: - return imap4.IAccount, self.theAccount, lambda: None - raise cred.error.UnauthorizedLogin() - - class TestRealm: """ @@ -255,13 +212,6 @@ class IMAP4HelperMixin(BaseLeapTest): # Soledad: config info cls.gnupg_home = "%s/gnupg" % cls.tempdir cls.email = 'leap@leap.se' - # cls.db1_file = "%s/db1.u1db" % cls.tempdir - # cls.db2_file = "%s/db2.u1db" % cls.tempdir - # open test dbs - # cls._db1 = u1db.open(cls.db1_file, create=True, - # document_factory=SoledadDocument) - # cls._db2 = u1db.open(cls.db2_file, create=True, - # document_factory=SoledadDocument) # initialize soledad by hand so we can control keys cls._soledad = initialize_soledad( @@ -283,8 +233,6 @@ class IMAP4HelperMixin(BaseLeapTest): Restores the old path and home environment variables. Removes the temporal dir created for tests. """ - # cls._db1.close() - # cls._db2.close() cls._soledad.close() os.environ["PATH"] = cls.old_path @@ -301,8 +249,13 @@ class IMAP4HelperMixin(BaseLeapTest): but passing the same Soledad instance (it's costly to initialize), so we have to be sure to restore state across tests. """ + UUID = 'deadbeef', + USERID = TEST_USER + memstore = MemoryStore() + d = defer.Deferred() - self.server = SimpleLEAPServer( + self.server = LeapIMAPServer( + uuid=UUID, userid=USERID, contextFactory=self.serverCTX, # XXX do we really need this?? soledad=self._soledad) @@ -317,9 +270,10 @@ class IMAP4HelperMixin(BaseLeapTest): # I THINK we ONLY need to do it at one place now. theAccount = SoledadBackedAccount( - 'testuser', - soledad=self._soledad) - SimpleLEAPServer.theAccount = theAccount + USERID, + soledad=self._soledad, + memstore=memstore) + LeapIMAPServer.theAccount = theAccount # in case we get something from previous tests... for mb in self.server.theAccount.mailboxes: @@ -404,8 +358,9 @@ class MessageCollectionTestCase(IMAP4HelperMixin, unittest.TestCase): We override mixin method since we are only testing MessageCollection interface in this particular TestCase """ + memstore = MemoryStore() self.messages = MessageCollection("testmbox%s" % (self.count,), - self._soledad) + self._soledad, memstore=memstore) MessageCollectionTestCase.count += 1 def tearDown(self): @@ -414,9 +369,6 @@ class MessageCollectionTestCase(IMAP4HelperMixin, unittest.TestCase): """ del self.messages - def wait(self): - time.sleep(2) - def testEmptyMessage(self): """ Test empty message and collection @@ -425,11 +377,11 @@ class MessageCollectionTestCase(IMAP4HelperMixin, unittest.TestCase): self.assertEqual( em, { + "chash": '', + "deleted": False, "flags": [], "mbox": "inbox", - "recent": True, "seen": False, - "deleted": False, "multi": False, "size": 0, "type": "flags", @@ -441,79 +393,100 @@ class MessageCollectionTestCase(IMAP4HelperMixin, unittest.TestCase): """ Add multiple messages """ - # TODO really profile addition mc = self.messages - print "messages", self.messages self.assertEqual(self.messages.count(), 0) - mc.add_msg('Stuff', uid=1, subject="test1") - mc.add_msg('Stuff', uid=2, subject="test2") - mc.add_msg('Stuff', uid=3, subject="test3") - mc.add_msg('Stuff', uid=4, subject="test4") - self.wait() - self.assertEqual(self.messages.count(), 4) - mc.add_msg('Stuff', uid=5, subject="test5") - mc.add_msg('Stuff', uid=6, subject="test6") - mc.add_msg('Stuff', uid=7, subject="test7") - self.wait() - self.assertEqual(self.messages.count(), 7) - self.wait() + def add_first(): + d = defer.gatherResults([ + mc.add_msg('Stuff 1', uid=1, subject="test1"), + mc.add_msg('Stuff 2', uid=2, subject="test2"), + mc.add_msg('Stuff 3', uid=3, subject="test3"), + mc.add_msg('Stuff 4', uid=4, subject="test4")]) + return d + + def add_second(result): + d = defer.gatherResults([ + mc.add_msg('Stuff 5', uid=5, subject="test5"), + mc.add_msg('Stuff 6', uid=6, subject="test6"), + mc.add_msg('Stuff 7', uid=7, subject="test7")]) + return d + + def check_second(result): + return self.assertEqual(mc.count(), 7) + + d1 = add_first() + d1.addCallback(add_second) + d1.addCallback(check_second) + + @skip("needs update!") def testRecentCount(self): """ Test the recent count """ mc = self.messages - self.assertEqual(self.messages.count_recent(), 0) - mc.add_msg('Stuff', uid=1, subject="test1") + countrecent = mc.count_recent + eq = self.assertEqual + + self.assertEqual(countrecent(), 0) + + d = mc.add_msg('Stuff', uid=1, subject="test1") # For the semantics defined in the RFC, we auto-add the # recent flag by default. - self.wait() - self.assertEqual(self.messages.count_recent(), 1) - mc.add_msg('Stuff', subject="test2", uid=2, - flags=('\\Deleted',)) - self.wait() - self.assertEqual(self.messages.count_recent(), 2) - mc.add_msg('Stuff', subject="test3", uid=3, - flags=('\\Recent',)) - self.wait() - self.assertEqual(self.messages.count_recent(), 3) - mc.add_msg('Stuff', subject="test4", uid=4, - flags=('\\Deleted', '\\Recent')) - self.wait() - self.assertEqual(self.messages.count_recent(), 4) - - for msg in mc: - msg.removeFlags(('\\Recent',)) - self.assertEqual(mc.count_recent(), 0) + + def add2(_): + return mc.add_msg('Stuff', subject="test2", uid=2, + flags=('\\Deleted',)) + + def add3(_): + return mc.add_msg('Stuff', subject="test3", uid=3, + flags=('\\Recent',)) + + def add4(_): + return mc.add_msg('Stuff', subject="test4", uid=4, + flags=('\\Deleted', '\\Recent')) + + d.addCallback(lambda r: eq(countrecent(), 1)) + d.addCallback(add2) + d.addCallback(lambda r: eq(countrecent(), 2)) + d.addCallback(add3) + d.addCallback(lambda r: eq(countrecent(), 3)) + d.addCallback(add4) + d.addCallback(lambda r: eq(countrecent(), 4)) def testFilterByMailbox(self): """ Test that queries filter by selected mailbox """ - def wait(): - time.sleep(1) - mc = self.messages self.assertEqual(self.messages.count(), 0) - mc.add_msg('', uid=1, subject="test1") - mc.add_msg('', uid=2, subject="test2") - mc.add_msg('', uid=3, subject="test3") - wait() - self.assertEqual(self.messages.count(), 3) - newmsg = mc._get_empty_doc() - newmsg['mailbox'] = "mailbox/foo" - mc._soledad.create_doc(newmsg) - self.assertEqual(mc.count(), 3) - self.assertEqual( - len(mc._soledad.get_from_index(mc.TYPE_IDX, "flags")), 4) + + def add_1(): + d1 = mc.add_msg('msg 1', uid=1, subject="test1") + d2 = mc.add_msg('msg 2', uid=2, subject="test2") + d3 = mc.add_msg('msg 3', uid=3, subject="test3") + d = defer.gatherResults([d1, d2, d3]) + return d + + add_1().addCallback(lambda ignored: self.assertEqual( + mc.count(), 3)) + + # XXX this has to be redone to fit memstore ------------# + #newmsg = mc._get_empty_doc() + #newmsg['mailbox'] = "mailbox/foo" + #mc._soledad.create_doc(newmsg) + #self.assertEqual(mc.count(), 3) + #self.assertEqual( + #len(mc._soledad.get_from_index(mc.TYPE_IDX, "flags")), 4) class LeapIMAP4ServerTestCase(IMAP4HelperMixin, unittest.TestCase): + # TODO this currently will use a memory-only store. + # create a different one for testing soledad sync. """ Tests for the generic behavior of the LeapIMAP4Server which, right now, it's just implemented in this test file as - SimpleLEAPServer. We will move the implementation, together with + LeapIMAPServer. We will move the implementation, together with authentication bits, to leap.mail.imap.server so it can be instantiated from the tac file. @@ -542,7 +515,7 @@ class LeapIMAP4ServerTestCase(IMAP4HelperMixin, unittest.TestCase): self.result.append(0) def login(): - return self.client.login('testuser', 'password-test') + return self.client.login(TEST_USER, TEST_PASSWD) def create(): for name in succeed + fail: @@ -560,7 +533,7 @@ class LeapIMAP4ServerTestCase(IMAP4HelperMixin, unittest.TestCase): def _cbTestCreate(self, ignored, succeed, fail): self.assertEqual(self.result, [1] * len(succeed) + [0] * len(fail)) - mbox = SimpleLEAPServer.theAccount.mailboxes + mbox = LeapIMAPServer.theAccount.mailboxes answers = ['foobox', 'testbox', 'test/box', 'test', 'test/box/box'] mbox.sort() answers.sort() @@ -571,10 +544,10 @@ class LeapIMAP4ServerTestCase(IMAP4HelperMixin, unittest.TestCase): """ Test whether we can delete mailboxes """ - SimpleLEAPServer.theAccount.addMailbox('delete/me') + LeapIMAPServer.theAccount.addMailbox('delete/me') def login(): - return self.client.login('testuser', 'password-test') + return self.client.login(TEST_USER, TEST_PASSWD) def delete(): return self.client.delete('delete/me') @@ -586,7 +559,7 @@ class LeapIMAP4ServerTestCase(IMAP4HelperMixin, unittest.TestCase): d = defer.gatherResults([d1, d2]) d.addCallback( lambda _: self.assertEqual( - SimpleLEAPServer.theAccount.mailboxes, [])) + LeapIMAPServer.theAccount.mailboxes, [])) return d def testIllegalInboxDelete(self): @@ -597,7 +570,7 @@ class LeapIMAP4ServerTestCase(IMAP4HelperMixin, unittest.TestCase): self.stashed = None def login(): - return self.client.login('testuser', 'password-test') + return self.client.login(TEST_USER, TEST_PASSWD) def delete(): return self.client.delete('inbox') @@ -619,10 +592,10 @@ class LeapIMAP4ServerTestCase(IMAP4HelperMixin, unittest.TestCase): def testNonExistentDelete(self): """ Test what happens if we try to delete a non-existent mailbox. - We expect an error raised stating 'No such inbox' + We expect an error raised stating 'No such mailbox' """ def login(): - return self.client.login('testuser', 'password-test') + return self.client.login(TEST_USER, TEST_PASSWD) def delete(): return self.client.delete('delete/me') @@ -637,8 +610,8 @@ class LeapIMAP4ServerTestCase(IMAP4HelperMixin, unittest.TestCase): d1.addCallbacks(self._cbStopClient, self._ebGeneral) d2 = self.loopback() d = defer.gatherResults([d1, d2]) - d.addCallback(lambda _: self.assertEqual(str(self.failure.value), - 'No such mailbox')) + d.addCallback(lambda _: self.assertTrue( + str(self.failure.value).startswith('No such mailbox'))) return d @deferred(timeout=None) @@ -649,14 +622,14 @@ class LeapIMAP4ServerTestCase(IMAP4HelperMixin, unittest.TestCase): Obs: this test will fail if SoledadMailbox returns hardcoded flags. """ - SimpleLEAPServer.theAccount.addMailbox('delete') - to_delete = SimpleLEAPServer.theAccount.getMailbox('delete') + LeapIMAPServer.theAccount.addMailbox('delete') + to_delete = LeapIMAPServer.theAccount.getMailbox('delete') to_delete.setFlags((r'\Noselect',)) to_delete.getFlags() - SimpleLEAPServer.theAccount.addMailbox('delete/me') + LeapIMAPServer.theAccount.addMailbox('delete/me') def login(): - return self.client.login('testuser', 'password-test') + return self.client.login(TEST_USER, TEST_PASSWD) def delete(): return self.client.delete('delete') @@ -681,10 +654,10 @@ class LeapIMAP4ServerTestCase(IMAP4HelperMixin, unittest.TestCase): """ Test whether we can rename a mailbox """ - SimpleLEAPServer.theAccount.addMailbox('oldmbox') + LeapIMAPServer.theAccount.addMailbox('oldmbox') def login(): - return self.client.login('testuser', 'password-test') + return self.client.login(TEST_USER, TEST_PASSWD) def rename(): return self.client.rename('oldmbox', 'newname') @@ -696,7 +669,7 @@ class LeapIMAP4ServerTestCase(IMAP4HelperMixin, unittest.TestCase): d = defer.gatherResults([d1, d2]) d.addCallback(lambda _: self.assertEqual( - SimpleLEAPServer.theAccount.mailboxes, + LeapIMAPServer.theAccount.mailboxes, ['newname'])) return d @@ -709,7 +682,7 @@ class LeapIMAP4ServerTestCase(IMAP4HelperMixin, unittest.TestCase): self.stashed = None def login(): - return self.client.login('testuser', 'password-test') + return self.client.login(TEST_USER, TEST_PASSWD) def rename(): return self.client.rename('inbox', 'frotz') @@ -733,11 +706,11 @@ class LeapIMAP4ServerTestCase(IMAP4HelperMixin, unittest.TestCase): """ Try to rename hierarchical mailboxes """ - SimpleLEAPServer.theAccount.create('oldmbox/m1') - SimpleLEAPServer.theAccount.create('oldmbox/m2') + LeapIMAPServer.theAccount.create('oldmbox/m1') + LeapIMAPServer.theAccount.create('oldmbox/m2') def login(): - return self.client.login('testuser', 'password-test') + return self.client.login(TEST_USER, TEST_PASSWD) def rename(): return self.client.rename('oldmbox', 'newname') @@ -750,7 +723,7 @@ class LeapIMAP4ServerTestCase(IMAP4HelperMixin, unittest.TestCase): return d.addCallback(self._cbTestHierarchicalRename) def _cbTestHierarchicalRename(self, ignored): - mboxes = SimpleLEAPServer.theAccount.mailboxes + mboxes = LeapIMAPServer.theAccount.mailboxes expected = ['newname', 'newname/m1', 'newname/m2'] mboxes.sort() self.assertEqual(mboxes, [s for s in expected]) @@ -761,7 +734,7 @@ class LeapIMAP4ServerTestCase(IMAP4HelperMixin, unittest.TestCase): Test whether we can mark a mailbox as subscribed to """ def login(): - return self.client.login('testuser', 'password-test') + return self.client.login(TEST_USER, TEST_PASSWD) def subscribe(): return self.client.subscribe('this/mbox') @@ -773,7 +746,7 @@ class LeapIMAP4ServerTestCase(IMAP4HelperMixin, unittest.TestCase): d = defer.gatherResults([d1, d2]) d.addCallback(lambda _: self.assertEqual( - SimpleLEAPServer.theAccount.subscriptions, + LeapIMAPServer.theAccount.subscriptions, ['this/mbox'])) return d @@ -782,11 +755,11 @@ class LeapIMAP4ServerTestCase(IMAP4HelperMixin, unittest.TestCase): """ Test whether we can unsubscribe from a set of mailboxes """ - SimpleLEAPServer.theAccount.subscribe('this/mbox') - SimpleLEAPServer.theAccount.subscribe('that/mbox') + LeapIMAPServer.theAccount.subscribe('this/mbox') + LeapIMAPServer.theAccount.subscribe('that/mbox') def login(): - return self.client.login('testuser', 'password-test') + return self.client.login(TEST_USER, TEST_PASSWD) def unsubscribe(): return self.client.unsubscribe('this/mbox') @@ -798,7 +771,7 @@ class LeapIMAP4ServerTestCase(IMAP4HelperMixin, unittest.TestCase): d = defer.gatherResults([d1, d2]) d.addCallback(lambda _: self.assertEqual( - SimpleLEAPServer.theAccount.subscriptions, + LeapIMAPServer.theAccount.subscriptions, ['that/mbox'])) return d @@ -811,7 +784,7 @@ class LeapIMAP4ServerTestCase(IMAP4HelperMixin, unittest.TestCase): self.selectedArgs = None def login(): - return self.client.login('testuser', 'password-test') + return self.client.login(TEST_USER, TEST_PASSWD) def select(): def selected(args): @@ -829,7 +802,7 @@ class LeapIMAP4ServerTestCase(IMAP4HelperMixin, unittest.TestCase): return defer.gatherResults([d1, d2]).addCallback(self._cbTestSelect) def _cbTestSelect(self, ignored): - mbox = SimpleLEAPServer.theAccount.getMailbox('TESTMAILBOX-SELECT') + mbox = LeapIMAPServer.theAccount.getMailbox('TESTMAILBOX-SELECT') self.assertEqual(self.server.mbox.messages.mbox, mbox.messages.mbox) self.assertEqual(self.selectedArgs, { 'EXISTS': 0, 'RECENT': 0, 'UIDVALIDITY': 42, @@ -920,7 +893,7 @@ class LeapIMAP4ServerTestCase(IMAP4HelperMixin, unittest.TestCase): Test login """ def login(): - d = self.client.login('testuser', 'password-test') + d = self.client.login(TEST_USER, TEST_PASSWD) d.addCallback(self._cbStopClient) d1 = self.connected.addCallback( strip(login)).addErrback(self._ebGeneral) @@ -928,7 +901,7 @@ class LeapIMAP4ServerTestCase(IMAP4HelperMixin, unittest.TestCase): return d.addCallback(self._cbTestLogin) def _cbTestLogin(self, ignored): - self.assertEqual(self.server.account, SimpleLEAPServer.theAccount) + self.assertEqual(self.server.account, LeapIMAPServer.theAccount) self.assertEqual(self.server.state, 'auth') @deferred(timeout=None) @@ -937,7 +910,7 @@ class LeapIMAP4ServerTestCase(IMAP4HelperMixin, unittest.TestCase): Test bad login """ def login(): - d = self.client.login('testuser', 'wrong-password') + d = self.client.login("bad_user@leap.se", TEST_PASSWD) d.addBoth(self._cbStopClient) d1 = self.connected.addCallback( @@ -947,19 +920,19 @@ class LeapIMAP4ServerTestCase(IMAP4HelperMixin, unittest.TestCase): return d.addCallback(self._cbTestFailedLogin) def _cbTestFailedLogin(self, ignored): - self.assertEqual(self.server.account, None) self.assertEqual(self.server.state, 'unauth') + self.assertEqual(self.server.account, None) @deferred(timeout=None) def testLoginRequiringQuoting(self): """ Test login requiring quoting """ - self.server._username = '{test}user' + self.server._userid = '{test}user@leap.se' self.server._password = '{test}password' def login(): - d = self.client.login('{test}user', '{test}password') + d = self.client.login('{test}user@leap.se', '{test}password') d.addBoth(self._cbStopClient) d1 = self.connected.addCallback( @@ -968,7 +941,7 @@ class LeapIMAP4ServerTestCase(IMAP4HelperMixin, unittest.TestCase): return d.addCallback(self._cbTestLoginRequiringQuoting) def _cbTestLoginRequiringQuoting(self, ignored): - self.assertEqual(self.server.account, SimpleLEAPServer.theAccount) + self.assertEqual(self.server.account, LeapIMAPServer.theAccount) self.assertEqual(self.server.state, 'auth') # @@ -983,7 +956,7 @@ class LeapIMAP4ServerTestCase(IMAP4HelperMixin, unittest.TestCase): self.namespaceArgs = None def login(): - return self.client.login('testuser', 'password-test') + return self.client.login(TEST_USER, TEST_PASSWD) def namespace(): def gotNamespace(args): @@ -1022,7 +995,7 @@ class LeapIMAP4ServerTestCase(IMAP4HelperMixin, unittest.TestCase): self.examinedArgs = None def login(): - return self.client.login('testuser', 'password-test') + return self.client.login(TEST_USER, TEST_PASSWD) def examine(): def examined(args): @@ -1049,15 +1022,15 @@ class LeapIMAP4ServerTestCase(IMAP4HelperMixin, unittest.TestCase): 'READ-WRITE': False}) def _listSetup(self, f): - SimpleLEAPServer.theAccount.addMailbox('root/subthingl', - creation_ts=42) - SimpleLEAPServer.theAccount.addMailbox('root/another-thing', - creation_ts=42) - SimpleLEAPServer.theAccount.addMailbox('non-root/subthing', - creation_ts=42) + LeapIMAPServer.theAccount.addMailbox('root/subthingl', + creation_ts=42) + LeapIMAPServer.theAccount.addMailbox('root/another-thing', + creation_ts=42) + LeapIMAPServer.theAccount.addMailbox('non-root/subthing', + creation_ts=42) def login(): - return self.client.login('testuser', 'password-test') + return self.client.login(TEST_USER, TEST_PASSWD) def listed(answers): self.listed = answers @@ -1092,7 +1065,7 @@ class LeapIMAP4ServerTestCase(IMAP4HelperMixin, unittest.TestCase): """ Test LSub command """ - SimpleLEAPServer.theAccount.subscribe('root/subthingl2') + LeapIMAPServer.theAccount.subscribe('root/subthingl2') def lsub(): return self.client.lsub('root', '%') @@ -1106,12 +1079,12 @@ class LeapIMAP4ServerTestCase(IMAP4HelperMixin, unittest.TestCase): """ Test Status command """ - SimpleLEAPServer.theAccount.addMailbox('root/subthings') + LeapIMAPServer.theAccount.addMailbox('root/subthings') # XXX FIXME ---- should populate this a little bit, # with unseen etc... def login(): - return self.client.login('testuser', 'password-test') + return self.client.login(TEST_USER, TEST_PASSWD) def status(): return self.client.status( @@ -1139,7 +1112,7 @@ class LeapIMAP4ServerTestCase(IMAP4HelperMixin, unittest.TestCase): Test failed status command with a non-existent mailbox """ def login(): - return self.client.login('testuser', 'password-test') + return self.client.login(TEST_USER, TEST_PASSWD) def status(): return self.client.status( @@ -1180,13 +1153,10 @@ class LeapIMAP4ServerTestCase(IMAP4HelperMixin, unittest.TestCase): """ infile = util.sibpath(__file__, 'rfc822.message') message = open(infile) - SimpleLEAPServer.theAccount.addMailbox('root/subthing') + LeapIMAPServer.theAccount.addMailbox('root/subthing') def login(): - return self.client.login('testuser', 'password-test') - - def wait(): - time.sleep(0.5) + return self.client.login(TEST_USER, TEST_PASSWD) def append(): return self.client.append( @@ -1198,21 +1168,19 @@ class LeapIMAP4ServerTestCase(IMAP4HelperMixin, unittest.TestCase): d1 = self.connected.addCallback(strip(login)) d1.addCallbacks(strip(append), self._ebGeneral) - d1.addCallbacks(strip(wait), self._ebGeneral) d1.addCallbacks(self._cbStopClient, self._ebGeneral) d2 = self.loopback() d = defer.gatherResults([d1, d2]) return d.addCallback(self._cbTestFullAppend, infile) def _cbTestFullAppend(self, ignored, infile): - mb = SimpleLEAPServer.theAccount.getMailbox('root/subthing') - time.sleep(0.5) + mb = LeapIMAPServer.theAccount.getMailbox('root/subthing') self.assertEqual(1, len(mb.messages)) msg = mb.messages.get_msg_by_uid(1) self.assertEqual( - ('\\SEEN', '\\DELETED'), - msg.getFlags()) + set(('\\Recent', '\\SEEN', '\\DELETED')), + set(msg.getFlags())) self.assertEqual( 'Tue, 17 Jun 2003 11:22:16 -0600 (MDT)', @@ -1220,14 +1188,11 @@ class LeapIMAP4ServerTestCase(IMAP4HelperMixin, unittest.TestCase): parsed = self.parser.parse(open(infile)) body = parsed.get_payload() - headers = parsed.items() + headers = dict(parsed.items()) self.assertEqual( body, msg.getBodyFile().read()) - - msg_headers = msg.getHeaders(True, "",) - gotheaders = list(chain( - *[[(k, item) for item in v] for (k, v) in msg_headers.items()])) + gotheaders = msg.getHeaders(True) self.assertItemsEqual( headers, gotheaders) @@ -1238,13 +1203,10 @@ class LeapIMAP4ServerTestCase(IMAP4HelperMixin, unittest.TestCase): Test partially appending a message to the mailbox """ infile = util.sibpath(__file__, 'rfc822.message') - SimpleLEAPServer.theAccount.addMailbox('PARTIAL/SUBTHING') + LeapIMAPServer.theAccount.addMailbox('PARTIAL/SUBTHING') def login(): - return self.client.login('testuser', 'password-test') - - def wait(): - time.sleep(1) + return self.client.login(TEST_USER, TEST_PASSWD) def append(): message = file(infile) @@ -1257,7 +1219,6 @@ class LeapIMAP4ServerTestCase(IMAP4HelperMixin, unittest.TestCase): ) ) d1 = self.connected.addCallback(strip(login)) - d1.addCallbacks(strip(wait), self._ebGeneral) d1.addCallbacks(strip(append), self._ebGeneral) d1.addCallbacks(self._cbStopClient, self._ebGeneral) d2 = self.loopback() @@ -1266,16 +1227,13 @@ class LeapIMAP4ServerTestCase(IMAP4HelperMixin, unittest.TestCase): self._cbTestPartialAppend, infile) def _cbTestPartialAppend(self, ignored, infile): - mb = SimpleLEAPServer.theAccount.getMailbox('PARTIAL/SUBTHING') - time.sleep(1) + mb = LeapIMAPServer.theAccount.getMailbox('PARTIAL/SUBTHING') self.assertEqual(1, len(mb.messages)) msg = mb.messages.get_msg_by_uid(1) self.assertEqual( - ('\\SEEN', ), - msg.getFlags() + set(('\\SEEN', '\\Recent')), + set(msg.getFlags()) ) - #self.assertEqual( - #'Right now', msg.getInternalDate()) parsed = self.parser.parse(open(infile)) body = parsed.get_payload() self.assertEqual( @@ -1287,10 +1245,10 @@ class LeapIMAP4ServerTestCase(IMAP4HelperMixin, unittest.TestCase): """ Test check command """ - SimpleLEAPServer.theAccount.addMailbox('root/subthing') + LeapIMAPServer.theAccount.addMailbox('root/subthing') def login(): - return self.client.login('testuser', 'password-test') + return self.client.login(TEST_USER, TEST_PASSWD) def select(): return self.client.select('root/subthing') @@ -1306,7 +1264,7 @@ class LeapIMAP4ServerTestCase(IMAP4HelperMixin, unittest.TestCase): # Okay, that was fun - @deferred(timeout=None) + @deferred(timeout=5) def testClose(self): """ Test closing the mailbox. We expect to get deleted all messages flagged @@ -1315,29 +1273,33 @@ class LeapIMAP4ServerTestCase(IMAP4HelperMixin, unittest.TestCase): name = 'mailbox-close' self.server.theAccount.addMailbox(name) - m = SimpleLEAPServer.theAccount.getMailbox(name) - m.messages.add_msg('test 1', uid=1, subject="Message 1", - flags=('\\Deleted', 'AnotherFlag')) - m.messages.add_msg('test 2', uid=2, subject="Message 2", - flags=('AnotherFlag',)) - m.messages.add_msg('test 3', uid=3, subject="Message 3", - flags=('\\Deleted',)) + m = LeapIMAPServer.theAccount.getMailbox(name) def login(): - return self.client.login('testuser', 'password-test') - - def wait(): - time.sleep(1) + return self.client.login(TEST_USER, TEST_PASSWD) def select(): return self.client.select(name) + def add_messages(): + d1 = m.messages.add_msg( + 'test 1', uid=1, subject="Message 1", + flags=('\\Deleted', 'AnotherFlag')) + d2 = m.messages.add_msg( + 'test 2', uid=2, subject="Message 2", + flags=('AnotherFlag',)) + d3 = m.messages.add_msg( + 'test 3', uid=3, subject="Message 3", + flags=('\\Deleted',)) + d = defer.gatherResults([d1, d2, d3]) + return d + def close(): return self.client.close() d = self.connected.addCallback(strip(login)) - d.addCallbacks(strip(wait), self._ebGeneral) d.addCallbacks(strip(select), self._ebGeneral) + d.addCallbacks(strip(add_messages), self._ebGeneral) d.addCallbacks(strip(close), self._ebGeneral) d.addCallbacks(self._cbStopClient, self._ebGeneral) d2 = self.loopback() @@ -1345,37 +1307,42 @@ class LeapIMAP4ServerTestCase(IMAP4HelperMixin, unittest.TestCase): def _cbTestClose(self, ignored, m): self.assertEqual(len(m.messages), 1) - messages = [msg for msg in m.messages] - self.assertFalse(messages[0] is None) + + msg = m.messages.get_msg_by_uid(2) + self.assertFalse(msg is None) self.assertEqual( - messages[0]._hdoc.content['subject'], + msg._hdoc.content['subject'], 'Message 2') self.failUnless(m.closed) - @deferred(timeout=None) + @deferred(timeout=5) def testExpunge(self): """ Test expunge command """ name = 'mailbox-expunge' - SimpleLEAPServer.theAccount.addMailbox(name) - m = SimpleLEAPServer.theAccount.getMailbox(name) - m.messages.add_msg('test 1', uid=1, subject="Message 1", - flags=('\\Deleted', 'AnotherFlag')) - m.messages.add_msg('test 2', uid=2, subject="Message 2", - flags=('AnotherFlag',)) - m.messages.add_msg('test 3', uid=3, subject="Message 3", - flags=('\\Deleted',)) + self.server.theAccount.addMailbox(name) + m = LeapIMAPServer.theAccount.getMailbox(name) def login(): - return self.client.login('testuser', 'password-test') - - def wait(): - time.sleep(2) + return self.client.login(TEST_USER, TEST_PASSWD) def select(): return self.client.select('mailbox-expunge') + def add_messages(): + d1 = m.messages.add_msg( + 'test 1', uid=1, subject="Message 1", + flags=('\\Deleted', 'AnotherFlag')) + d2 = m.messages.add_msg( + 'test 2', uid=2, subject="Message 2", + flags=('AnotherFlag',)) + d3 = m.messages.add_msg( + 'test 3', uid=3, subject="Message 3", + flags=('\\Deleted',)) + d = defer.gatherResults([d1, d2, d3]) + return d + def expunge(): return self.client.expunge() @@ -1385,8 +1352,8 @@ class LeapIMAP4ServerTestCase(IMAP4HelperMixin, unittest.TestCase): self.results = None d1 = self.connected.addCallback(strip(login)) - d1.addCallbacks(strip(wait), self._ebGeneral) d1.addCallbacks(strip(select), self._ebGeneral) + d1.addCallbacks(strip(add_messages), self._ebGeneral) d1.addCallbacks(strip(expunge), self._ebGeneral) d1.addCallbacks(expunged, self._ebGeneral) d1.addCallbacks(self._cbStopClient, self._ebGeneral) @@ -1397,9 +1364,10 @@ class LeapIMAP4ServerTestCase(IMAP4HelperMixin, unittest.TestCase): def _cbTestExpunge(self, ignored, m): # we only left 1 mssage with no deleted flag self.assertEqual(len(m.messages), 1) - messages = [msg for msg in m.messages] + + msg = m.messages.get_msg_by_uid(2) self.assertEqual( - messages[0]._hdoc.content['subject'], + msg._hdoc.content['subject'], 'Message 2') # the uids of the deleted messages self.assertItemsEqual(self.results, [1, 3]) diff --git a/src/leap/mail/messageflow.py b/src/leap/mail/messageflow.py index 80121c8..c8f224c 100644 --- a/src/leap/mail/messageflow.py +++ b/src/leap/mail/messageflow.py @@ -49,7 +49,7 @@ class IMessageProducer(Interface): entities. """ - def push(self, item): + def push(self, item, state=None): """ Push a new item in the queue. """ @@ -101,6 +101,10 @@ class MessageProducer(object): # and consumption is not likely (?) to consume huge amounts of memory in # our current settings, so the need to pause the stream is not urgent now. + # TODO use enum + STATE_NEW = 1 + STATE_DIRTY = 2 + def __init__(self, consumer, queue=Queue.Queue, period=1): """ Initializes the MessageProducer @@ -115,7 +119,8 @@ class MessageProducer(object): # it should implement a `consume` method self._consumer = consumer - self._queue = queue() + self._queue_new = queue() + self._queue_dirty = queue() self._period = period self._loop = LoopingCall(self._check_for_new) @@ -130,7 +135,7 @@ class MessageProducer(object): If the queue is found empty, the loop is stopped. It will be started again after the addition of new items. """ - self._consumer.consume(self._queue) + self._consumer.consume((self._queue_new, self._queue_dirty)) if self.is_queue_empty(): self.stop() @@ -138,11 +143,13 @@ class MessageProducer(object): """ Return True if queue is empty, False otherwise. """ - return self._queue.empty() + new = self._queue_new + dirty = self._queue_dirty + return new.empty() and dirty.empty() # public methods: IMessageProducer - def push(self, item): + def push(self, item, state=None): """ Push a new item in the queue. @@ -150,7 +157,14 @@ class MessageProducer(object): """ # XXX this might raise if the queue does not accept any new # items. what to do then? - self._queue.put(item) + queue = self._queue_new + + if state == self.STATE_NEW: + queue = self._queue_new + if state == self.STATE_DIRTY: + queue = self._queue_dirty + + queue.put(item) self.start() def start(self): diff --git a/src/leap/mail/utils.py b/src/leap/mail/utils.py index 942acfb..fed24b3 100644 --- a/src/leap/mail/utils.py +++ b/src/leap/mail/utils.py @@ -17,10 +17,10 @@ """ Mail utilities. """ -import copy import json import re import traceback +import Queue from leap.soledad.common.document import SoledadDocument @@ -49,7 +49,7 @@ def empty(thing): thing = thing.content try: return len(thing) == 0 - except ReferenceError: + except (ReferenceError, TypeError): return True @@ -94,6 +94,7 @@ def lowerdict(_dict): PART_MAP = "part_map" +PHASH = "phash" def _str_dict(d, k): @@ -130,6 +131,103 @@ def stringify_parts_map(d): return d +def phash_iter(d): + """ + A recursive generator that extracts all the payload-hashes + from an arbitrary nested parts-map dictionary. + + :param d: the dictionary to walk + :type d: dictionary + :return: a list of all the phashes found + :rtype: list + """ + if PHASH in d: + yield d[PHASH] + if PART_MAP in d: + for key in d[PART_MAP]: + for phash in phash_iter(d[PART_MAP][key]): + yield phash + + +def accumulator(fun, lim): + """ + A simple accumulator that uses a closure and a mutable + object to collect items. + When the count of items is greater than `lim`, the + collection is flushed after invoking a map of the function `fun` + over it. + + The returned accumulator can also be flushed at any moment + by passing a boolean as a second parameter. + + :param fun: the function to call over the collection + when its size is greater than `lim` + :type fun: callable + :param lim: the turning point for the collection + :type lim: int + :rtype: function + + >>> from pprint import pprint + >>> acc = accumulator(pprint, 2) + >>> acc(1) + >>> acc(2) + [1, 2] + >>> acc(3) + >>> acc(4) + [3, 4] + >>> acc = accumulator(pprint, 5) + >>> acc(1) + >>> acc(2) + >>> acc(3) + >>> acc(None, flush=True) + [1,2,3] + """ + KEY = "items" + _o = {KEY: []} + + def _accumulator(item, flush=False): + collection = _o[KEY] + collection.append(item) + if len(collection) >= lim or flush: + map(fun, filter(None, collection)) + _o[KEY] = [] + + return _accumulator + + +def accumulator_queue(fun, lim): + """ + A version of the accumulator that uses a queue. + + When the count of items is greater than `lim`, the + queue is flushed after invoking the function `fun` + over its items. + + The returned accumulator can also be flushed at any moment + by passing a boolean as a second parameter. + + :param fun: the function to call over the collection + when its size is greater than `lim` + :type fun: callable + :param lim: the turning point for the collection + :type lim: int + :rtype: function + """ + _q = Queue.Queue() + + def _accumulator(item, flush=False): + _q.put(item) + if _q.qsize() >= lim or flush: + collection = [_q.get() for i in range(_q.qsize())] + map(fun, filter(None, collection)) + + return _accumulator + + +# +# String manipulation +# + class CustomJsonScanner(object): """ This class is a context manager definition used to monkey patch the default @@ -169,6 +267,8 @@ class CustomJsonScanner(object): if not monkey_patched: return self._orig_scanstring(s, idx, *args, **kwargs) + # TODO profile to see if a compiled regex can get us some + # benefit here. found = False end = s.find("\"", idx) while not found: |