summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTomás Touceda <chiiph@leap.se>2014-02-17 12:45:19 -0300
committerTomás Touceda <chiiph@leap.se>2014-02-17 12:45:19 -0300
commit32e3c5ddaa5df30a573762e273f3a12f7eb3c255 (patch)
tree0924be918d990628e73eb2059fb6eb1200748b7c
parent7828c517ae162de4676a71e05f77339598acd6f7 (diff)
parent985ff0a78a8df0eafb7789383f711b9e5ceb1cb6 (diff)
Merge remote-tracking branch 'refs/remotes/kali/bug/separate_deferreds_threads' into develop
-rw-r--r--src/leap/mail/imap/account.py51
-rw-r--r--src/leap/mail/imap/mailbox.py275
-rw-r--r--src/leap/mail/imap/memorystore.py581
-rw-r--r--src/leap/mail/imap/messageparts.py46
-rw-r--r--src/leap/mail/imap/messages.py352
-rw-r--r--src/leap/mail/imap/server.py43
-rw-r--r--src/leap/mail/imap/service/imap.py59
-rw-r--r--src/leap/mail/imap/soledadstore.py369
-rwxr-xr-xsrc/leap/mail/imap/tests/regressions6
-rw-r--r--src/leap/mail/imap/tests/test_imap.py432
-rw-r--r--src/leap/mail/messageflow.py26
-rw-r--r--src/leap/mail/utils.py104
12 files changed, 1451 insertions, 893 deletions
diff --git a/src/leap/mail/imap/account.py b/src/leap/mail/imap/account.py
index f985c04..1b5d4a0 100644
--- a/src/leap/mail/imap/account.py
+++ b/src/leap/mail/imap/account.py
@@ -18,9 +18,12 @@
Soledad Backed Account.
"""
import copy
+import logging
+import os
import time
from twisted.mail import imap4
+from twisted.python import log
from zope.interface import implements
from leap.common.check import leap_assert, leap_assert_type
@@ -30,12 +33,27 @@ from leap.mail.imap.parser import MBoxParser
from leap.mail.imap.mailbox import SoledadMailbox
from leap.soledad.client import Soledad
+logger = logging.getLogger(__name__)
+
+PROFILE_CMD = os.environ.get('LEAP_PROFILE_IMAPCMD', False)
+
+if PROFILE_CMD:
+
+ def _debugProfiling(result, cmdname, start):
+ took = (time.time() - start) * 1000
+ log.msg("CMD " + cmdname + " TOOK: " + str(took) + " msec")
+ return result
+
#######################################
# Soledad Account
#######################################
+# TODO change name to LeapIMAPAccount, since we're using
+# the memstore.
+# IndexedDB should also not be here anymore.
+
class SoledadBackedAccount(WithMsgFields, IndexedDB, MBoxParser):
"""
An implementation of IAccount and INamespacePresenteer
@@ -67,14 +85,19 @@ class SoledadBackedAccount(WithMsgFields, IndexedDB, MBoxParser):
# XXX SHOULD assert too that the name matches the user/uuid with which
# soledad has been initialized.
+ # XXX ??? why is this parsing mailbox name??? it's account...
+ # userid? homogenize.
self._account_name = self._parse_mailbox_name(account_name)
self._soledad = soledad
self._memstore = memstore
+ self.__mailboxes = set([])
+
self.initialize_db()
# every user should have the right to an inbox folder
# at least, so let's make one!
+ self._load_mailboxes()
if not self.mailboxes:
self.addMailbox(self.INBOX_NAME)
@@ -106,9 +129,13 @@ class SoledadBackedAccount(WithMsgFields, IndexedDB, MBoxParser):
"""
A list of the current mailboxes for this account.
"""
- return [doc.content[self.MBOX_KEY]
- for doc in self._soledad.get_from_index(
- self.TYPE_IDX, self.MBOX_KEY)]
+ return self.__mailboxes
+
+ def _load_mailboxes(self):
+ self.__mailboxes.update(
+ [doc.content[self.MBOX_KEY]
+ for doc in self._soledad.get_from_index(
+ self.TYPE_IDX, self.MBOX_KEY)])
@property
def subscriptions(self):
@@ -173,6 +200,7 @@ class SoledadBackedAccount(WithMsgFields, IndexedDB, MBoxParser):
mbox[self.CREATED_KEY] = creation_ts
doc = self._soledad.create_doc(mbox)
+ self._load_mailboxes()
return bool(doc)
def create(self, pathspec):
@@ -203,6 +231,7 @@ class SoledadBackedAccount(WithMsgFields, IndexedDB, MBoxParser):
except imap4.MailboxCollision:
if not pathspec.endswith('/'):
return False
+ self._load_mailboxes()
return True
def select(self, name, readwrite=1):
@@ -215,17 +244,22 @@ class SoledadBackedAccount(WithMsgFields, IndexedDB, MBoxParser):
:param readwrite: 1 for readwrite permissions.
:type readwrite: int
- :rtype: bool
+ :rtype: SoledadMailbox
"""
- name = self._parse_mailbox_name(name)
+ if PROFILE_CMD:
+ start = time.time()
+ name = self._parse_mailbox_name(name)
if name not in self.mailboxes:
+ logger.warning("No such mailbox!")
return None
-
self.selected = name
- return SoledadMailbox(
+ sm = SoledadMailbox(
name, self._soledad, self._memstore, readwrite)
+ if PROFILE_CMD:
+ _debugProfiling(None, "SELECT", start)
+ return sm
def delete(self, name, force=False):
"""
@@ -260,6 +294,7 @@ class SoledadBackedAccount(WithMsgFields, IndexedDB, MBoxParser):
"Hierarchically inferior mailboxes "
"exist and \\Noselect is set")
mbox.destroy()
+ self._load_mailboxes()
# XXX FIXME --- not honoring the inferior names...
@@ -297,6 +332,8 @@ class SoledadBackedAccount(WithMsgFields, IndexedDB, MBoxParser):
mbox.content[self.MBOX_KEY] = new
self._soledad.put_doc(mbox)
+ self._load_mailboxes()
+
# XXX ---- FIXME!!!! ------------------------------------
# until here we just renamed the index...
# We have to rename also the occurrence of this
diff --git a/src/leap/mail/imap/mailbox.py b/src/leap/mail/imap/mailbox.py
index d8af0a5..57505f0 100644
--- a/src/leap/mail/imap/mailbox.py
+++ b/src/leap/mail/imap/mailbox.py
@@ -50,6 +50,25 @@ If the environment variable `LEAP_SKIPNOTIFY` is set, we avoid
notifying clients of new messages. Use during stress tests.
"""
NOTIFY_NEW = not os.environ.get('LEAP_SKIPNOTIFY', False)
+PROFILE_CMD = os.environ.get('LEAP_PROFILE_IMAPCMD', False)
+
+if PROFILE_CMD:
+ import time
+
+ def _debugProfiling(result, cmdname, start):
+ took = (time.time() - start) * 1000
+ log.msg("CMD " + cmdname + " TOOK: " + str(took) + " msec")
+ return result
+
+ def do_profile_cmd(d, name):
+ """
+ Add the profiling debug to the passed callback.
+ :param d: deferred
+ :param name: name of the command
+ :type name: str
+ """
+ d.addCallback(_debugProfiling, name, time.time())
+ d.addErrback(lambda f: log.msg(f.getTraceback()))
class SoledadMailbox(WithMsgFields, MBoxParser):
@@ -89,6 +108,12 @@ class SoledadMailbox(WithMsgFields, MBoxParser):
_listeners = defaultdict(set)
next_uid_lock = threading.Lock()
+ last_uid_lock = threading.Lock()
+
+ # TODO unify all the `primed` dicts
+ _fdoc_primed = {}
+ _last_uid_primed = {}
+ _known_uids_primed = {}
def __init__(self, mbox, soledad, memstore, rw=1):
"""
@@ -107,6 +132,7 @@ class SoledadMailbox(WithMsgFields, MBoxParser):
:param rw: read-and-write flag for this mailbox
:type rw: int
"""
+ logger.debug("Initializing mailbox %r" % (mbox,))
leap_assert(mbox, "Need a mailbox name to initialize")
leap_assert(soledad, "Need a soledad instance to initialize")
@@ -123,12 +149,24 @@ class SoledadMailbox(WithMsgFields, MBoxParser):
self.messages = MessageCollection(
mbox=mbox, soledad=self._soledad, memstore=self._memstore)
+ # XXX careful with this get/set (it would be
+ # hitting db unconditionally, move to memstore too)
+ # Now it's returning a fixed amount of flags from mem
+ # as a workaround.
if not self.getFlags():
self.setFlags(self.INIT_FLAGS)
if self._memstore:
self.prime_known_uids_to_memstore()
self.prime_last_uid_to_memstore()
+ self.prime_flag_docs_to_memstore()
+
+ from twisted.internet import reactor
+ self.reactor = reactor
+
+ # purge memstore from empty fdocs.
+ self._memstore.purge_fdoc_store(mbox)
+ logger.debug("DONE initializing mailbox %r" % (mbox,))
@property
def listeners(self):
@@ -170,8 +208,7 @@ class SoledadMailbox(WithMsgFields, MBoxParser):
"""
self.listeners.remove(listener)
- # TODO move completely to soledadstore, under memstore reponsibility.
- def _get_mbox(self):
+ def _get_mbox_doc(self):
"""
Return mailbox document.
@@ -179,14 +216,7 @@ class SoledadMailbox(WithMsgFields, MBoxParser):
the query failed.
:rtype: SoledadDocument or None.
"""
- try:
- query = self._soledad.get_from_index(
- fields.TYPE_MBOX_IDX,
- fields.TYPE_MBOX_VAL, self.mbox)
- if query:
- return query.pop()
- except Exception as exc:
- logger.exception("Unhandled error %r" % exc)
+ return self._memstore.get_mbox_doc(self.mbox)
def getFlags(self):
"""
@@ -195,12 +225,21 @@ class SoledadMailbox(WithMsgFields, MBoxParser):
:returns: tuple of flags for this mailbox
:rtype: tuple of str
"""
- mbox = self._get_mbox()
- if not mbox:
- return None
- flags = mbox.content.get(self.FLAGS_KEY, [])
+ flags = self.INIT_FLAGS
+
+ # XXX returning fixed flags always
+ # Since I have not found a case where the client
+ # wants to modify this, as a way of speeding up
+ # selects. To do it right, we probably should keep
+ # track of the set of all flags used by msgs
+ # in this mailbox. Does it matter?
+ #mbox = self._get_mbox_doc()
+ #if not mbox:
+ #return None
+ #flags = mbox.content.get(self.FLAGS_KEY, [])
return map(str, flags)
+ # XXX move to memstore->soledadstore
def setFlags(self, flags):
"""
Sets flags for this mailbox.
@@ -210,10 +249,12 @@ class SoledadMailbox(WithMsgFields, MBoxParser):
"""
leap_assert(isinstance(flags, tuple),
"flags expected to be a tuple")
- mbox = self._get_mbox()
+ mbox = self._get_mbox_doc()
if not mbox:
return None
mbox.content[self.FLAGS_KEY] = map(str, flags)
+ logger.debug("Writing mbox document for %r to Soledad"
+ % (self.mbox,))
self._soledad.put_doc(mbox)
# XXX SHOULD BETTER IMPLEMENT ADD_FLAG, REMOVE_FLAG.
@@ -225,8 +266,7 @@ class SoledadMailbox(WithMsgFields, MBoxParser):
:return: True if the mailbox is closed
:rtype: bool
"""
- mbox = self._get_mbox()
- return mbox.content.get(self.CLOSED_KEY, False)
+ return self._memstore.get_mbox_closed(self.mbox)
def _set_closed(self, closed):
"""
@@ -235,10 +275,7 @@ class SoledadMailbox(WithMsgFields, MBoxParser):
:param closed: the state to be set
:type closed: bool
"""
- leap_assert(isinstance(closed, bool), "closed needs to be boolean")
- mbox = self._get_mbox()
- mbox.content[self.CLOSED_KEY] = closed
- self._soledad.put_doc(mbox)
+ self._memstore.set_mbox_closed(self.mbox, closed)
closed = property(
_get_closed, _set_closed, doc="Closed attribute.")
@@ -265,10 +302,13 @@ class SoledadMailbox(WithMsgFields, MBoxParser):
"""
Prime memstore with last_uid value
"""
- set_exist = set(self.messages.all_uid_iter())
- last = max(set_exist) if set_exist else 0
- logger.info("Priming Soledad last_uid to %s" % (last,))
- self._memstore.set_last_soledad_uid(self.mbox, last)
+ primed = self._last_uid_primed.get(self.mbox, False)
+ if not primed:
+ mbox = self._get_mbox_doc()
+ last = mbox.content.get('lastuid', 0)
+ logger.info("Priming Soledad last_uid to %s" % (last,))
+ self._memstore.set_last_soledad_uid(self.mbox, last)
+ self._last_uid_primed[self.mbox] = True
def prime_known_uids_to_memstore(self):
"""
@@ -276,8 +316,21 @@ class SoledadMailbox(WithMsgFields, MBoxParser):
We do this to be able to filter the requests efficiently.
"""
- known_uids = self.messages.all_soledad_uid_iter()
- self._memstore.set_known_uids(self.mbox, known_uids)
+ primed = self._known_uids_primed.get(self.mbox, False)
+ if not primed:
+ known_uids = self.messages.all_soledad_uid_iter()
+ self._memstore.set_known_uids(self.mbox, known_uids)
+ self._known_uids_primed[self.mbox] = True
+
+ def prime_flag_docs_to_memstore(self):
+ """
+ Prime memstore with all the flags documents.
+ """
+ primed = self._fdoc_primed.get(self.mbox, False)
+ if not primed:
+ all_flag_docs = self.messages.get_all_soledad_flag_docs()
+ self._memstore.load_flag_docs(self.mbox, all_flag_docs)
+ self._fdoc_primed[self.mbox] = True
def getUIDValidity(self):
"""
@@ -286,7 +339,7 @@ class SoledadMailbox(WithMsgFields, MBoxParser):
:return: unique validity identifier
:rtype: int
"""
- mbox = self._get_mbox()
+ mbox = self._get_mbox_doc()
return mbox.content.get(self.CREATED_KEY, 1)
def getUID(self, message):
@@ -420,6 +473,9 @@ class SoledadMailbox(WithMsgFields, MBoxParser):
flags = tuple(str(flag) for flag in flags)
d = self._do_add_message(message, flags=flags, date=date)
+ if PROFILE_CMD:
+ do_profile_cmd(d, "APPEND")
+ # XXX should notify here probably
return d
def _do_add_message(self, message, flags, date):
@@ -428,15 +484,9 @@ class SoledadMailbox(WithMsgFields, MBoxParser):
Invoked from addMessage.
"""
d = self.messages.add_msg(message, flags=flags, date=date)
- # XXX Removing notify temporarily.
- # This is interfering with imaptest results. I'm not clear if it's
- # because we clutter the logging or because the set of listeners is
- # ever-growing. We should come up with some smart way of dealing with
- # it, or maybe just disabling it using an environmental variable since
- # we will only have just a few listeners in the regular desktop case.
- #d.addCallback(self.notify_new)
return d
+ @deferred_to_thread
def notify_new(self, *args):
"""
Notify of new messages to all the listeners.
@@ -447,12 +497,10 @@ class SoledadMailbox(WithMsgFields, MBoxParser):
return
exists = self.getMessageCount()
recent = self.getRecentCount()
- logger.debug("NOTIFY: there are %s messages, %s recent" % (
- exists,
- recent))
+ logger.debug("NOTIFY (%r): there are %s messages, %s recent" % (
+ self.mbox, exists, recent))
for l in self.listeners:
- logger.debug('notifying...')
l.newMessages(exists, recent)
# commands, do not rename methods
@@ -471,7 +519,7 @@ class SoledadMailbox(WithMsgFields, MBoxParser):
# we should postpone the removal
# XXX move to memory store??
- self._soledad.delete_doc(self._get_mbox())
+ self._soledad.delete_doc(self._get_mbox_doc())
def _close_cb(self, result):
self.closed = True
@@ -527,8 +575,6 @@ class SoledadMailbox(WithMsgFields, MBoxParser):
seq_messg = set_asked.intersection(set_exist)
return seq_messg
- @deferred_to_thread
- #@profile
def fetch(self, messages_asked, uid):
"""
Retrieve one or more messages in this mailbox.
@@ -544,6 +590,27 @@ class SoledadMailbox(WithMsgFields, MBoxParser):
otherwise.
:type uid: bool
+ :rtype: deferred
+ """
+ d = defer.Deferred()
+ self.reactor.callInThread(self._do_fetch, messages_asked, uid, d)
+ if PROFILE_CMD:
+ do_profile_cmd(d, "FETCH")
+ return d
+
+ # called in thread
+ def _do_fetch(self, messages_asked, uid, d):
+ """
+ :param messages_asked: IDs of the messages to retrieve information
+ about
+ :type messages_asked: MessageSet
+
+ :param uid: If true, the IDs are UIDs. They are message sequence IDs
+ otherwise.
+ :type uid: bool
+ :param d: deferred whose callback will be called with result.
+ :type d: Deferred
+
:rtype: A tuple of two-tuples of message sequence numbers and
LeapMessage
"""
@@ -564,10 +631,12 @@ class SoledadMailbox(WithMsgFields, MBoxParser):
logger.debug("Getting msg by index: INEFFICIENT call!")
raise NotImplementedError
else:
- result = ((msgid, getmsg(msgid)) for msgid in seq_messg)
- return result
+ got_msg = ((msgid, getmsg(msgid)) for msgid in seq_messg)
+ result = ((msgid, msg) for msgid, msg in got_msg
+ if msg is not None)
+ self.reactor.callLater(0, self.unset_recent_flags, seq_messg)
+ self.reactor.callFromThread(d.callback, result)
- @deferred_to_thread
def fetch_flags(self, messages_asked, uid):
"""
A fast method to fetch all flags, tricking just the
@@ -606,12 +675,11 @@ class SoledadMailbox(WithMsgFields, MBoxParser):
messages_asked = self._bound_seq(messages_asked)
seq_messg = self._filter_msg_seq(messages_asked)
- all_flags = self.messages.all_flags()
+ all_flags = self._memstore.all_flags(self.mbox)
result = ((msgid, flagsPart(
msgid, all_flags.get(msgid, tuple()))) for msgid in seq_messg)
return result
- @deferred_to_thread
def fetch_headers(self, messages_asked, uid):
"""
A fast method to fetch all headers, tricking just the
@@ -636,6 +704,8 @@ class SoledadMailbox(WithMsgFields, MBoxParser):
MessagePart.
:rtype: tuple
"""
+ # TODO how often is thunderbird doing this?
+
class headersPart(object):
def __init__(self, uid, headers):
self.uid = uid
@@ -653,10 +723,9 @@ class SoledadMailbox(WithMsgFields, MBoxParser):
messages_asked = self._bound_seq(messages_asked)
seq_messg = self._filter_msg_seq(messages_asked)
- all_chash = self.messages.all_flags_chash()
all_headers = self.messages.all_headers()
result = ((msgid, headersPart(
- msgid, all_headers.get(all_chash.get(msgid, 'nil'), {})))
+ msgid, all_headers.get(msgid, {})))
for msgid in seq_messg)
return result
@@ -699,14 +768,15 @@ class SoledadMailbox(WithMsgFields, MBoxParser):
:raise ReadOnlyMailbox: Raised if this mailbox is not open for
read-write.
"""
- from twisted.internet import reactor
if not self.isWriteable():
log.msg('read only mailbox!')
raise imap4.ReadOnlyMailbox
d = defer.Deferred()
- deferLater(reactor, 0, self._do_store, messages_asked, flags,
- mode, uid, d)
+ self.reactor.callLater(0, self._do_store, messages_asked, flags,
+ mode, uid, d)
+ if PROFILE_CMD:
+ do_profile_cmd(d, "STORE")
return d
def _do_store(self, messages_asked, flags, mode, uid, observer):
@@ -721,7 +791,7 @@ class SoledadMailbox(WithMsgFields, MBoxParser):
:type observer: deferred
"""
# XXX implement also sequence (uid = 0)
- # XXX we should prevent cclient from setting Recent flag?
+ # XXX we should prevent client from setting Recent flag?
leap_assert(not isinstance(flags, basestring),
"flags cannot be a string")
flags = tuple(flags)
@@ -785,15 +855,10 @@ class SoledadMailbox(WithMsgFields, MBoxParser):
uid when the copy succeed.
:rtype: Deferred
"""
- from twisted.internet import reactor
-
d = defer.Deferred()
- # XXX this should not happen ... track it down,
- # probably to FETCH...
- if message is None:
- log.msg("BUG: COPY found a None in passed message")
- d.callback(None)
- deferLater(reactor, 0, self._do_copy, message, d)
+ if PROFILE_CMD:
+ do_profile_cmd(d, "COPY")
+ deferLater(self.reactor, 0, self._do_copy, message, d)
return d
def _do_copy(self, message, observer):
@@ -809,51 +874,70 @@ class SoledadMailbox(WithMsgFields, MBoxParser):
UID of the message
:type observer: Deferred
"""
+ memstore = self._memstore
+
+ def createCopy(result):
+ exist, new_fdoc = result
+ if exist:
+ # Should we signal error on the callback?
+ logger.warning("Destination message already exists!")
+
+ # XXX I'm not sure if we should raise the
+ # errback. This actually rases an ugly warning
+ # in some muas like thunderbird.
+ # UID 0 seems a good convention for no uid.
+ observer.callback(0)
+ else:
+ mbox = self.mbox
+ uid_next = memstore.increment_last_soledad_uid(mbox)
+
+ new_fdoc[self.UID_KEY] = uid_next
+ new_fdoc[self.MBOX_KEY] = mbox
+
+ flags = list(new_fdoc[self.FLAGS_KEY])
+ flags.append(fields.RECENT_FLAG)
+ new_fdoc[self.FLAGS_KEY] = tuple(set(flags))
+
+ # FIXME set recent!
+
+ self._memstore.create_message(
+ self.mbox, uid_next,
+ MessageWrapper(new_fdoc),
+ observer=observer,
+ notify_on_disk=False)
+
+ d = self._get_msg_copy(message)
+ d.addCallback(createCopy)
+ d.addErrback(lambda f: log.msg(f.getTraceback()))
+
+ @deferred_to_thread
+ def _get_msg_copy(self, message):
+ """
+ Get a copy of the fdoc for this message, and check whether
+ it already exists.
+
+ :param message: an IMessage implementor
+ :type message: LeapMessage
+ :return: exist, new_fdoc
+ :rtype: tuple
+ """
# XXX for clarity, this could be delegated to a
# MessageCollection mixin that implements copy too, and
# moved out of here.
msg = message
memstore = self._memstore
- # XXX should use a public api instead
- fdoc = msg._fdoc
- hdoc = msg._hdoc
- if not fdoc:
+ if empty(msg.fdoc):
logger.warning("Tried to copy a MSG with no fdoc")
return
- new_fdoc = copy.deepcopy(fdoc.content)
-
+ new_fdoc = copy.deepcopy(msg.fdoc.content)
fdoc_chash = new_fdoc[fields.CONTENT_HASH_KEY]
- # XXX is this hitting the db??? --- probably.
- # We should profile after the pre-fetch.
dest_fdoc = memstore.get_fdoc_from_chash(
fdoc_chash, self.mbox)
- exist = dest_fdoc and not empty(dest_fdoc.content)
-
- if exist:
- # Should we signal error on the callback?
- logger.warning("Destination message already exists!")
-
- # XXX I'm still not clear if we should raise the
- # errback. This actually rases an ugly warning
- # in some muas like thunderbird. I guess the user does
- # not deserve that.
- observer.callback(True)
- else:
- mbox = self.mbox
- uid_next = memstore.increment_last_soledad_uid(mbox)
- new_fdoc[self.UID_KEY] = uid_next
- new_fdoc[self.MBOX_KEY] = mbox
-
- # FIXME set recent!
- self._memstore.create_message(
- self.mbox, uid_next,
- MessageWrapper(
- new_fdoc, hdoc.content),
- observer=observer,
- notify_on_disk=False)
+ exist = not empty(dest_fdoc)
+ return exist, new_fdoc
# convenience fun
@@ -865,12 +949,11 @@ class SoledadMailbox(WithMsgFields, MBoxParser):
for doc in docs:
self.messages._soledad.delete_doc(doc)
- def unset_recent_flags(self, uids):
+ def unset_recent_flags(self, uid_seq):
"""
Unset Recent flag for a sequence of UIDs.
"""
- seq_messg = self._bound_seq(uids)
- self.messages.unset_recent_flags(seq_messg)
+ self.messages.unset_recent_flags(uid_seq)
def __repr__(self):
"""
diff --git a/src/leap/mail/imap/memorystore.py b/src/leap/mail/imap/memorystore.py
index ed2b3f2..f23a234 100644
--- a/src/leap/mail/imap/memorystore.py
+++ b/src/leap/mail/imap/memorystore.py
@@ -25,6 +25,7 @@ import weakref
from collections import defaultdict
from copy import copy
+from enum import Enum
from twisted.internet import defer
from twisted.internet.task import LoopingCall
from twisted.python import log
@@ -32,8 +33,7 @@ from zope.interface import implements
from leap.common.check import leap_assert_type
from leap.mail import size
-from leap.mail.decorators import deferred_to_thread
-from leap.mail.utils import empty
+from leap.mail.utils import empty, phash_iter
from leap.mail.messageflow import MessageProducer
from leap.mail.imap import interfaces
from leap.mail.imap.fields import fields
@@ -42,12 +42,19 @@ from leap.mail.imap.messageparts import RecentFlagsDoc
from leap.mail.imap.messageparts import MessageWrapper
from leap.mail.imap.messageparts import ReferenciableDict
+from leap.mail.decorators import deferred_to_thread
+
logger = logging.getLogger(__name__)
# The default period to do writebacks to the permanent
# soledad storage, in seconds.
-SOLEDAD_WRITE_PERIOD = 10
+SOLEDAD_WRITE_PERIOD = 15
+
+FDOC = MessagePartType.fdoc.key
+HDOC = MessagePartType.hdoc.key
+CDOCS = MessagePartType.cdocs.key
+DOCS_ID = MessagePartType.docs_id.key
@contextlib.contextmanager
@@ -65,6 +72,9 @@ def set_bool_flag(obj, att):
setattr(obj, att, False)
+DirtyState = Enum("none", "dirty", "new")
+
+
class MemoryStore(object):
"""
An in-memory store to where we can write the different parts that
@@ -88,6 +98,7 @@ class MemoryStore(object):
WRITING_FLAG = "_writing"
_last_uid_lock = threading.Lock()
+ _fdoc_docid_lock = threading.Lock()
def __init__(self, permanent_store=None,
write_period=SOLEDAD_WRITE_PERIOD):
@@ -100,11 +111,19 @@ class MemoryStore(object):
:param write_period: the interval to dump messages to disk, in seconds.
:type write_period: int
"""
+ from twisted.internet import reactor
+ self.reactor = reactor
+
self._permanent_store = permanent_store
self._write_period = write_period
# Internal Storage: messages
- self._msg_store = {}
+ """
+ flags document store.
+ _fdoc_store[mbox][uid] = { 'content': 'aaa' }
+ """
+ self._fdoc_store = defaultdict(lambda: defaultdict(
+ lambda: ReferenciableDict({})))
# Sizes
"""
@@ -114,9 +133,28 @@ class MemoryStore(object):
# Internal Storage: payload-hash
"""
- {'phash': weakreaf.proxy(dict)}
+ fdocs:doc-id store, stores document IDs for putting
+ the dirty flags-docs.
+ """
+ self._fdoc_id_store = defaultdict(lambda: defaultdict(
+ lambda: ''))
+
+ # Internal Storage: content-hash:hdoc
+ """
+ hdoc-store keeps references to
+ the header-documents indexed by content-hash.
+
+ {'chash': { dict-stuff }
+ }
+ """
+ self._hdoc_store = defaultdict(lambda: ReferenciableDict({}))
+
+ # Internal Storage: payload-hash:cdoc
+ """
+ content-docs stored by payload-hash
+ {'phash': { dict-stuff } }
"""
- self._phash_store = {}
+ self._cdoc_store = defaultdict(lambda: ReferenciableDict({}))
# Internal Storage: content-hash:fdoc
"""
@@ -127,7 +165,7 @@ class MemoryStore(object):
'mbox-b': weakref.proxy(dict)}
}
"""
- self._chash_fdoc_store = {}
+ self._chash_fdoc_store = defaultdict(lambda: defaultdict(lambda: None))
# Internal Storage: recent-flags store
"""
@@ -153,7 +191,7 @@ class MemoryStore(object):
{'mbox-a': 42,
'mbox-b': 23}
"""
- self._last_uid = {}
+ self._last_uid = defaultdict(lambda: 0)
"""
known-uids keeps a count of the uids that soledad knows for a given
@@ -165,11 +203,15 @@ class MemoryStore(object):
# New and dirty flags, to set MessageWrapper State.
self._new = set([])
+ self._new_queue = set([])
self._new_deferreds = {}
+
self._dirty = set([])
- self._rflags_dirty = set([])
+ self._dirty_queue = set([])
self._dirty_deferreds = {}
+ self._rflags_dirty = set([])
+
# Flag for signaling we're busy writing to the disk storage.
setattr(self, self.WRITING_FLAG, False)
@@ -185,11 +227,17 @@ class MemoryStore(object):
# We can start the write loop right now, why wait?
self._start_write_loop()
+ else:
+ # We have a memory-only store.
+ self.producer = None
+ self._write_loop = None
def _start_write_loop(self):
"""
Start loop for writing to disk database.
"""
+ if self._write_loop is None:
+ return
if not self._write_loop.running:
self._write_loop.start(self._write_period, now=True)
@@ -197,6 +245,8 @@ class MemoryStore(object):
"""
Stop loop for writing to disk database.
"""
+ if self._write_loop is None:
+ return
if self._write_loop.running:
self._write_loop.stop()
@@ -230,34 +280,30 @@ class MemoryStore(object):
be fired.
:type notify_on_disk: bool
"""
- log.msg("adding new doc to memstore %r (%r)" % (mbox, uid))
+ log.msg("Adding new doc to memstore %r (%r)" % (mbox, uid))
key = mbox, uid
self._add_message(mbox, uid, message, notify_on_disk)
self._new.add(key)
- # XXX use this while debugging the callback firing,
- # remove after unittesting this.
- #def log_add(result):
- #return result
- #observer.addCallback(log_add)
-
- if notify_on_disk:
- # We store this deferred so we can keep track of the pending
- # operations internally.
- # TODO this should fire with the UID !!! -- change that in
- # the soledad store code.
- self._new_deferreds[key] = observer
- if not notify_on_disk:
- # Caller does not care, just fired and forgot, so we pass
- # a defer that will inmediately have its callback triggered.
- observer.callback(uid)
+ if observer is not None:
+ if notify_on_disk:
+ # We store this deferred so we can keep track of the pending
+ # operations internally.
+ # TODO this should fire with the UID !!! -- change that in
+ # the soledad store code.
+ self._new_deferreds[key] = observer
+
+ else:
+ # Caller does not care, just fired and forgot, so we pass
+ # a defer that will inmediately have its callback triggered.
+ self.reactor.callFromThread(observer.callback, uid)
def put_message(self, mbox, uid, message, notify_on_disk=True):
"""
Put an existing message.
- This will set the dirty flag on the MemoryStore.
+ This will also set the dirty flag on the MemoryStore.
:param mbox: the mailbox
:type mbox: str or unicode
@@ -289,76 +335,59 @@ class MemoryStore(object):
Helper method, called by both create_message and put_message.
See those for parameter documentation.
"""
- # XXX have to differentiate between notify_new and notify_dirty
- # TODO defaultdict the hell outa here...
-
- key = mbox, uid
msg_dict = message.as_dict()
- FDOC = MessagePartType.fdoc.key
- HDOC = MessagePartType.hdoc.key
- CDOCS = MessagePartType.cdocs.key
- DOCS_ID = MessagePartType.docs_id.key
-
- try:
- store = self._msg_store[key]
- except KeyError:
- self._msg_store[key] = {FDOC: {},
- HDOC: {},
- CDOCS: {},
- DOCS_ID: {}}
- store = self._msg_store[key]
-
fdoc = msg_dict.get(FDOC, None)
- if fdoc:
- if not store.get(FDOC, None):
- store[FDOC] = ReferenciableDict({})
- store[FDOC].update(fdoc)
+ if fdoc is not None:
+ fdoc_store = self._fdoc_store[mbox][uid]
+ fdoc_store.update(fdoc)
+ chash_fdoc_store = self._chash_fdoc_store
# content-hash indexing
chash = fdoc.get(fields.CONTENT_HASH_KEY)
- chash_fdoc_store = self._chash_fdoc_store
- if not chash in chash_fdoc_store:
- chash_fdoc_store[chash] = {}
-
chash_fdoc_store[chash][mbox] = weakref.proxy(
- store[FDOC])
+ self._fdoc_store[mbox][uid])
hdoc = msg_dict.get(HDOC, None)
if hdoc is not None:
- if not store.get(HDOC, None):
- store[HDOC] = ReferenciableDict({})
- store[HDOC].update(hdoc)
-
- docs_id = msg_dict.get(DOCS_ID, None)
- if docs_id:
- if not store.get(DOCS_ID, None):
- store[DOCS_ID] = {}
- store[DOCS_ID].update(docs_id)
+ chash = hdoc.get(fields.CONTENT_HASH_KEY)
+ hdoc_store = self._hdoc_store[chash]
+ hdoc_store.update(hdoc)
cdocs = message.cdocs
- for cdoc_key in cdocs.keys():
- if not store.get(CDOCS, None):
- store[CDOCS] = {}
-
- cdoc = cdocs[cdoc_key]
- # first we make it weak-referenciable
- referenciable_cdoc = ReferenciableDict(cdoc)
- store[CDOCS][cdoc_key] = referenciable_cdoc
+ for cdoc in cdocs.values():
phash = cdoc.get(fields.PAYLOAD_HASH_KEY, None)
if not phash:
continue
- self._phash_store[phash] = weakref.proxy(referenciable_cdoc)
+ cdoc_store = self._cdoc_store[phash]
+ cdoc_store.update(cdoc)
- def prune(seq, store):
- for key in seq:
- if key in store and empty(store.get(key)):
- store.pop(key)
+ # Update memory store size
+ # XXX this should use [mbox][uid]
+ key = mbox, uid
+ self._sizes[key] = size.get_size(self._fdoc_store[key])
+ # TODO add hdoc and cdocs sizes too
- prune((FDOC, HDOC, CDOCS, DOCS_ID), store)
+ def purge_fdoc_store(self, mbox):
+ """
+ Purge the empty documents from a fdoc store.
+ Called during initialization of the SoledadMailbox
- # Update memory store size
- self._sizes[key] = size(self._msg_store[key])
+ :param mbox: the mailbox
+ :type mbox: str or unicode
+ """
+ # XXX This is really a workaround until I find the conditions
+ # that are making the empty items remain there.
+ # This happens, for instance, after running several times
+ # the regression test, that issues a store deleted + expunge + select
+ # The items are being correclty deleted, but in succesive appends
+ # the empty items with previously deleted uids reappear as empty
+ # documents. I suspect it's a timing condition with a previously
+ # evaluated sequence being used after the items has been removed.
+
+ for uid, value in self._fdoc_store[mbox].items():
+ if empty(value):
+ del self._fdoc_store[mbox][uid]
def get_docid_for_fdoc(self, mbox, uid):
"""
@@ -371,13 +400,20 @@ class MemoryStore(object):
:type uid: int
:rtype: unicode or None
"""
- fdoc = self._permanent_store.get_flags_doc(mbox, uid)
- if empty(fdoc):
- return None
- doc_id = fdoc.doc_id
+ with self._fdoc_docid_lock:
+ doc_id = self._fdoc_id_store[mbox][uid]
+
+ if empty(doc_id):
+ fdoc = self._permanent_store.get_flags_doc(mbox, uid)
+ if empty(fdoc) or empty(fdoc.content):
+ return None
+ doc_id = fdoc.doc_id
+ self._fdoc_id_store[mbox][uid] = doc_id
+
return doc_id
- def get_message(self, mbox, uid, flags_only=False):
+ def get_message(self, mbox, uid, dirtystate=DirtyState.none,
+ flags_only=False):
"""
Get a MessageWrapper for the given mbox and uid combination.
@@ -385,25 +421,58 @@ class MemoryStore(object):
:type mbox: str or unicode
:param uid: the message UID
:type uid: int
+ :param dirtystate: DirtyState enum: one of `dirty`, `new`
+ or `none` (default)
+ :type dirtystate: enum
:param flags_only: whether the message should carry only a reference
to the flags document.
:type flags_only: bool
+ :
:return: MessageWrapper or None
"""
+ if dirtystate == DirtyState.dirty:
+ flags_only = True
+
key = mbox, uid
- FDOC = MessagePartType.fdoc.key
- msg_dict = self._msg_store.get(key, None)
- if empty(msg_dict):
+ fdoc = self._fdoc_store[mbox][uid]
+ if empty(fdoc):
return None
- new, dirty = self._get_new_dirty_state(key)
+
+ new, dirty = False, False
+ if dirtystate == DirtyState.none:
+ new, dirty = self._get_new_dirty_state(key)
+ if dirtystate == DirtyState.dirty:
+ new, dirty = False, True
+ if dirtystate == DirtyState.new:
+ new, dirty = True, False
+
if flags_only:
- return MessageWrapper(fdoc=msg_dict[FDOC],
+ return MessageWrapper(fdoc=fdoc,
new=new, dirty=dirty,
memstore=weakref.proxy(self))
else:
- return MessageWrapper(from_dict=msg_dict,
+ chash = fdoc.get(fields.CONTENT_HASH_KEY)
+ hdoc = self._hdoc_store[chash]
+ if empty(hdoc):
+ hdoc = self._permanent_store.get_headers_doc(chash)
+ if empty(hdoc):
+ return None
+ if not empty(hdoc.content):
+ self._hdoc_store[chash] = hdoc.content
+ hdoc = hdoc.content
+ cdocs = None
+
+ pmap = hdoc.get(fields.PARTS_MAP_KEY, None)
+ if new and pmap is not None:
+ # take the different cdocs for write...
+ cdoc_store = self._cdoc_store
+ cdocs_list = phash_iter(hdoc)
+ cdocs = dict(enumerate(
+ [cdoc_store[phash] for phash in cdocs_list], 1))
+
+ return MessageWrapper(fdoc=fdoc, hdoc=hdoc, cdocs=cdocs,
new=new, dirty=dirty,
memstore=weakref.proxy(self))
@@ -424,23 +493,36 @@ class MemoryStore(object):
# token to ensure consistency in the removal.
try:
+ del self._fdoc_store[mbox][uid]
+ except KeyError:
+ pass
+
+ try:
key = mbox, uid
self._new.discard(key)
self._dirty.discard(key)
- self._msg_store.pop(key, None)
if key in self._sizes:
del self._sizes[key]
-
+ self._known_uids[mbox].discard(uid)
+ except Exception as exc:
+ logger.error("error while removing message!")
+ logger.exception(exc)
+ try:
+ with self._fdoc_docid_lock:
+ del self._fdoc_id_store[mbox][uid]
except Exception as exc:
+ logger.error("error while removing message!")
logger.exception(exc)
# IMessageStoreWriter
+ @deferred_to_thread
def write_messages(self, store):
"""
Write the message documents in this MemoryStore to a different store.
:param store: the IMessageStore to write to
+ :rtype: False if queue is not empty, None otherwise.
"""
# For now, we pass if the queue is not empty, to avoid duplicate
# queuing.
@@ -450,7 +532,7 @@ class MemoryStore(object):
# XXX this could return the deferred for all the enqueued operations
if not self.producer.is_queue_empty():
- return
+ return False
if any(map(lambda i: not empty(i), (self._new, self._dirty))):
logger.info("Writing messages to Soledad...")
@@ -459,9 +541,14 @@ class MemoryStore(object):
# is accquired
with set_bool_flag(self, self.WRITING_FLAG):
for rflags_doc_wrapper in self.all_rdocs_iter():
- self.producer.push(rflags_doc_wrapper)
- for msg_wrapper in self.all_new_dirty_msg_iter():
- self.producer.push(msg_wrapper)
+ self.producer.push(rflags_doc_wrapper,
+ state=self.producer.STATE_DIRTY)
+ for msg_wrapper in self.all_new_msg_iter():
+ self.producer.push(msg_wrapper,
+ state=self.producer.STATE_NEW)
+ for msg_wrapper in self.all_dirty_msg_iter():
+ self.producer.push(msg_wrapper,
+ state=self.producer.STATE_DIRTY)
# MemoryStore specific methods.
@@ -473,8 +560,7 @@ class MemoryStore(object):
:type mbox: str or unicode
:rtype: list
"""
- all_keys = self._msg_store.keys()
- return [uid for m, uid in all_keys if m == mbox]
+ return self._fdoc_store[mbox].keys()
def get_soledad_known_uids(self, mbox):
"""
@@ -523,7 +609,8 @@ class MemoryStore(object):
:param value: the value to set
:type value: int
"""
- leap_assert_type(value, int)
+ # can be long???
+ #leap_assert_type(value, int)
logger.info("setting last soledad uid for %s to %s" %
(mbox, value))
# if we already have a value here, don't do anything
@@ -555,10 +642,9 @@ class MemoryStore(object):
with self._last_uid_lock:
self._last_uid[mbox] += 1
value = self._last_uid[mbox]
- self.write_last_uid(mbox, value)
+ self.reactor.callInThread(self.write_last_uid, mbox, value)
return value
- @deferred_to_thread
def write_last_uid(self, mbox, value):
"""
Increment the soledad integer cache for the highest uid value.
@@ -572,11 +658,112 @@ class MemoryStore(object):
if self._permanent_store:
self._permanent_store.write_last_uid(mbox, value)
+ def load_flag_docs(self, mbox, flag_docs):
+ """
+ Load the flag documents for the given mbox.
+ Used during initial flag docs prefetch.
+
+ :param mbox: the mailbox
+ :type mbox: str or unicode
+ :param flag_docs: a dict with the content for the flag docs, indexed
+ by uid.
+ :type flag_docs: dict
+ """
+ # We can do direct assignments cause we know this will only
+ # be called during initialization of the mailbox.
+ # TODO could hook here a sanity-check
+ # for duplicates
+
+ fdoc_store = self._fdoc_store[mbox]
+ chash_fdoc_store = self._chash_fdoc_store
+ for uid in flag_docs:
+ rdict = ReferenciableDict(flag_docs[uid])
+ fdoc_store[uid] = rdict
+ # populate chash dict too, to avoid fdoc duplication
+ chash = flag_docs[uid]["chash"]
+ chash_fdoc_store[chash][mbox] = weakref.proxy(
+ self._fdoc_store[mbox][uid])
+
+ def update_flags(self, mbox, uid, fdoc):
+ """
+ Update the flag document for a given mbox and uid combination,
+ and set the dirty flag.
+ We could use put_message, but this is faster.
+
+ :param mbox: the mailbox
+ :type mbox: str or unicode
+ :param uid: the uid of the message
+ :type uid: int
+
+ :param fdoc: a dict with the content for the flag docs
+ :type fdoc: dict
+ """
+ key = mbox, uid
+ self._fdoc_store[mbox][uid].update(fdoc)
+ self._dirty.add(key)
+
+ def load_header_docs(self, header_docs):
+ """
+ Load the flag documents for the given mbox.
+ Used during header docs prefetch, and during cache after
+ a read from soledad if the hdoc property in message did not
+ find its value in here.
+
+ :param flag_docs: a dict with the content for the flag docs.
+ :type flag_docs: dict
+ """
+ hdoc_store = self._hdoc_store
+ for chash in header_docs:
+ hdoc_store[chash] = ReferenciableDict(header_docs[chash])
+
+ def all_flags(self, mbox):
+ """
+ Return a dictionary with all the flags for a given mbox.
+
+ :param mbox: the mailbox
+ :type mbox: str or unicode
+ :rtype: dict
+ """
+ flags_dict = {}
+ uids = self.get_uids(mbox)
+ fdoc_store = self._fdoc_store[mbox]
+
+ for uid in uids:
+ try:
+ flags = fdoc_store[uid][fields.FLAGS_KEY]
+ flags_dict[uid] = flags
+ except KeyError:
+ continue
+ return flags_dict
+
+ def all_headers(self, mbox):
+ """
+ Return a dictionary with all the header docs for a given mbox.
+
+ :param mbox: the mailbox
+ :type mbox: str or unicode
+ :rtype: dict
+ """
+ headers_dict = {}
+ uids = self.get_uids(mbox)
+ fdoc_store = self._fdoc_store[mbox]
+ hdoc_store = self._hdoc_store
+
+ for uid in uids:
+ try:
+ chash = fdoc_store[uid][fields.CONTENT_HASH_KEY]
+ hdoc = hdoc_store[chash]
+ if not empty(hdoc):
+ headers_dict[uid] = hdoc
+ except KeyError:
+ continue
+ return headers_dict
+
# Counting sheeps...
def count_new_mbox(self, mbox):
"""
- Count the new messages by inbox.
+ Count the new messages by mailbox.
:param mbox: the mailbox
:type mbox: str or unicode
@@ -594,6 +781,33 @@ class MemoryStore(object):
"""
return len(self._new)
+ def count(self, mbox):
+ """
+ Return the count of messages for a given mbox.
+
+ :param mbox: the mailbox
+ :type mbox: str or unicode
+ :return: number of messages
+ :rtype: int
+ """
+ return len(self._fdoc_store[mbox])
+
+ def unseen_iter(self, mbox):
+ """
+ Get an iterator for the message UIDs with no `seen` flag
+ for a given mailbox.
+
+ :param mbox: the mailbox
+ :type mbox: str or unicode
+ :return: iterator through unseen message doc UIDs
+ :rtype: iterable
+ """
+ fdocs = self._fdoc_store[mbox]
+
+ return [uid for uid, value
+ in fdocs.items()
+ if fields.SEEN_FLAG not in value.get(fields.FLAGS_KEY, [])]
+
def get_cdoc_from_phash(self, phash):
"""
Return a content-document by its payload-hash.
@@ -602,7 +816,7 @@ class MemoryStore(object):
:type phash: str or unicode
:rtype: MessagePartDoc
"""
- doc = self._phash_store.get(phash, None)
+ doc = self._cdoc_store.get(phash, None)
# XXX return None for consistency?
@@ -632,8 +846,7 @@ class MemoryStore(object):
:return: MessagePartDoc. It will return None if the flags document
has empty content or it is flagged as \\Deleted.
"""
- docs_dict = self._chash_fdoc_store.get(chash, None)
- fdoc = docs_dict.get(mbox, None) if docs_dict else None
+ fdoc = self._chash_fdoc_store[chash][mbox]
# a couple of special cases.
# 1. We might have a doc with empty content...
@@ -644,53 +857,61 @@ class MemoryStore(object):
# We want to create a new one in this case.
# Hmmm what if the deletion is un-done?? We would end with a
# duplicate...
- if fdoc and fields.DELETED_FLAG in fdoc[fields.FLAGS_KEY]:
+ if fdoc and fields.DELETED_FLAG in fdoc.get(fields.FLAGS_KEY, []):
return None
uid = fdoc[fields.UID_KEY]
key = mbox, uid
new = key in self._new
dirty = key in self._dirty
+
return MessagePartDoc(
new=new, dirty=dirty, store="mem",
part=MessagePartType.fdoc,
content=fdoc,
doc_id=None)
- def all_msg_iter(self):
+ def iter_fdoc_keys(self):
"""
- Return generator that iterates through all messages in the store.
-
- :return: generator of MessageWrappers
- :rtype: generator
+ Return a generator through all the mbox, uid keys in the flags-doc
+ store.
"""
- return (self.get_message(*key)
- for key in sorted(self._msg_store.keys()))
+ fdoc_store = self._fdoc_store
+ for mbox in fdoc_store:
+ for uid in fdoc_store[mbox]:
+ yield mbox, uid
- def all_new_dirty_msg_iter(self):
+ def all_new_msg_iter(self):
"""
- Return generator that iterates through all new and dirty messages.
+ Return generator that iterates through all new messages.
:return: generator of MessageWrappers
:rtype: generator
"""
- return (self.get_message(*key)
- for key in sorted(self._msg_store.keys())
- if key in self._new or key in self._dirty)
+ gm = self.get_message
+ # need to freeze, set can change during iteration
+ new = [gm(*key, dirtystate=DirtyState.new) for key in tuple(self._new)]
+ # move content from new set to the queue
+ self._new_queue.update(self._new)
+ self._new.difference_update(self._new)
+ return new
- def all_msg_dict_for_mbox(self, mbox):
+ def all_dirty_msg_iter(self):
"""
- Return all the message dicts for a given mbox.
+ Return generator that iterates through all dirty messages.
- :param mbox: the mailbox
- :type mbox: str or unicode
- :return: list of dictionaries
- :rtype: list
+ :return: generator of MessageWrappers
+ :rtype: generator
"""
- # This *needs* to return a fixed sequence. Otherwise the dictionary len
- # will change during iteration, when we modify it
- return [self._msg_store[(mb, uid)]
- for mb, uid in self._msg_store if mb == mbox]
+ gm = self.get_message
+ # need to freeze, set can change during iteration
+ dirty = [gm(*key, flags_only=True, dirtystate=DirtyState.dirty)
+ for key in tuple(self._dirty)]
+ # move content from new and dirty sets to the queue
+
+ self._dirty_queue.update(self._dirty)
+ self._dirty.difference_update(self._dirty)
+ return dirty
def all_deleted_uid_iter(self, mbox):
"""
@@ -704,11 +925,10 @@ class MemoryStore(object):
"""
# This *needs* to return a fixed sequence. Otherwise the dictionary len
# will change during iteration, when we modify it
- all_deleted = [
- msg['fdoc']['uid'] for msg in self.all_msg_dict_for_mbox(mbox)
- if msg.get('fdoc', None)
- and fields.DELETED_FLAG in msg['fdoc']['flags']]
- return all_deleted
+ fdocs = self._fdoc_store[mbox]
+ return [uid for uid, value
+ in fdocs.items()
+ if fields.DELETED_FLAG in value.get(fields.FLAGS_KEY, [])]
# new, dirty flags
@@ -721,26 +941,30 @@ class MemoryStore(object):
:return: tuple of bools
:rtype: tuple
"""
+ # TODO change indexing of sets to [mbox][key] too.
# XXX should return *first* the news, and *then* the dirty...
+
+ # TODO should query in queues too , true?
+ #
return map(lambda _set: key in _set, (self._new, self._dirty))
- def set_new(self, key):
+ def set_new_queued(self, key):
"""
- Add the key value to the `new` set.
+ Add the key value to the `new-queue` set.
:param key: the key for the message, in the form mbox, uid
:type key: tuple
"""
- self._new.add(key)
+ self._new_queue.add(key)
- def unset_new(self, key):
+ def unset_new_queued(self, key):
"""
- Remove the key value from the `new` set.
+ Remove the key value from the `new-queue` set.
:param key: the key for the message, in the form mbox, uid
:type key: tuple
"""
- self._new.discard(key)
+ self._new_queue.discard(key)
deferreds = self._new_deferreds
d = deferreds.get(key, None)
if d:
@@ -749,23 +973,23 @@ class MemoryStore(object):
d.callback('%s, ok' % str(key))
deferreds.pop(key)
- def set_dirty(self, key):
+ def set_dirty_queued(self, key):
"""
- Add the key value to the `dirty` set.
+ Add the key value to the `dirty-queue` set.
:param key: the key for the message, in the form mbox, uid
:type key: tuple
"""
- self._dirty.add(key)
+ self._dirty_queue.add(key)
- def unset_dirty(self, key):
+ def unset_dirty_queued(self, key):
"""
- Remove the key value from the `dirty` set.
+ Remove the key value from the `dirty-queue` set.
:param key: the key for the message, in the form mbox, uid
:type key: tuple
"""
- self._dirty.discard(key)
+ self._dirty_queue.discard(key)
deferreds = self._dirty_deferreds
d = deferreds.get(key, None)
if d:
@@ -776,7 +1000,6 @@ class MemoryStore(object):
# Recent Flags
- # TODO --- nice but unused
def set_recent_flag(self, mbox, uid):
"""
Set the `Recent` flag for a given mailbox and UID.
@@ -894,6 +1117,8 @@ class MemoryStore(object):
"""
self._stop_write_loop()
if self._permanent_store is not None:
+ # XXX we should check if we did get a True value on this
+ # operation. If we got False we should retry! (queue was not empty)
self.write_messages(self._permanent_store)
self.producer.flush()
@@ -911,10 +1136,18 @@ class MemoryStore(object):
:type observer: Deferred
"""
soledad_store = self._permanent_store
+ if soledad_store is None:
+ # just-in memory store, easy then.
+ self._delete_from_memory(mbox, observer)
+ return
+
+ # We have a soledad storage.
try:
# Stop and trigger last write
self.stop_and_flush()
# Wait on the writebacks to finish
+
+ # XXX what if pending deferreds is empty?
pending_deferreds = (self._new_deferreds.get(mbox, []) +
self._dirty_deferreds.get(mbox, []))
d1 = defer.gatherResults(pending_deferreds, consumeErrors=True)
@@ -923,6 +1156,18 @@ class MemoryStore(object):
except Exception as exc:
logger.exception(exc)
+ def _delete_from_memory(self, mbox, observer):
+ """
+ Remove all messages marked as deleted from soledad and memory.
+
+ :param mbox: the mailbox
+ :type mbox: str or unicode
+ :param observer: a deferred that will be fired when expunge is done
+ :type observer: Deferred
+ """
+ mem_deleted = self.remove_all_deleted(mbox)
+ observer.callback(mem_deleted)
+
def _delete_from_soledad_and_memory(self, result, mbox, observer):
"""
Remove all messages marked as deleted from soledad and memory.
@@ -939,12 +1184,8 @@ class MemoryStore(object):
try:
# 1. Delete all messages marked as deleted in soledad.
-
- # XXX this could be deferred for faster operation.
- if soledad_store:
- sol_deleted = soledad_store.remove_all_deleted(mbox)
- else:
- sol_deleted = []
+ logger.debug("DELETING FROM SOLEDAD ALL FOR %r" % (mbox,))
+ sol_deleted = soledad_store.remove_all_deleted(mbox)
try:
self._known_uids[mbox].difference_update(set(sol_deleted))
@@ -952,6 +1193,7 @@ class MemoryStore(object):
logger.exception(exc)
# 2. Delete all messages marked as deleted in memory.
+ logger.debug("DELETING FROM MEM ALL FOR %r" % (mbox,))
mem_deleted = self.remove_all_deleted(mbox)
all_deleted = set(mem_deleted).union(set(sol_deleted))
@@ -960,8 +1202,43 @@ class MemoryStore(object):
logger.exception(exc)
finally:
self._start_write_loop()
+
observer.callback(all_deleted)
+ # Mailbox documents and attributes
+
+ # This could be also be cached in memstore, but proxying directly
+ # to soledad since it's not too performance-critical.
+
+ def get_mbox_doc(self, mbox):
+ """
+ Return the soledad document for a given mailbox.
+
+ :param mbox: the mailbox
+ :type mbox: str or unicode
+ :rtype: SoledadDocument or None.
+ """
+ return self.permanent_store.get_mbox_document(mbox)
+
+ def get_mbox_closed(self, mbox):
+ """
+ Return the closed attribute for a given mailbox.
+
+ :param mbox: the mailbox
+ :type mbox: str or unicode
+ :rtype: bool
+ """
+ return self.permanent_store.get_mbox_closed(mbox)
+
+ def set_mbox_closed(self, mbox, closed):
+ """
+ Set the closed attribute for a given mailbox.
+
+ :param mbox: the mailbox
+ :type mbox: str or unicode
+ """
+ self.permanent_store.set_mbox_closed(mbox, closed)
+
# Dump-to-disk controls.
@property
diff --git a/src/leap/mail/imap/messageparts.py b/src/leap/mail/imap/messageparts.py
index b1f333a..257721c 100644
--- a/src/leap/mail/imap/messageparts.py
+++ b/src/leap/mail/imap/messageparts.py
@@ -98,7 +98,7 @@ class MessageWrapper(object):
CDOCS = "cdocs"
DOCS_ID = "docs_id"
- # Using slots to limit some the memory footprint,
+ # Using slots to limit some the memory use,
# Add your attribute here.
__slots__ = ["_dict", "_new", "_dirty", "_storetype", "memstore"]
@@ -148,7 +148,7 @@ class MessageWrapper(object):
"""
return self._new
- def _set_new(self, value=True):
+ def _set_new(self, value=False):
"""
Set the value for the `new` flag, and propagate it
to the memory store if any.
@@ -158,11 +158,14 @@ class MessageWrapper(object):
"""
self._new = value
if self.memstore:
- mbox = self.fdoc.content['mbox']
- uid = self.fdoc.content['uid']
+ mbox = self.fdoc.content.get('mbox', None)
+ uid = self.fdoc.content.get('uid', None)
+ if not mbox or not uid:
+ logger.warning("Malformed fdoc")
+ return
key = mbox, uid
- fun = [self.memstore.unset_new,
- self.memstore.set_new][int(value)]
+ fun = [self.memstore.unset_new_queued,
+ self.memstore.set_new_queued][int(value)]
fun(key)
else:
logger.warning("Could not find a memstore referenced from this "
@@ -190,11 +193,14 @@ class MessageWrapper(object):
"""
self._dirty = value
if self.memstore:
- mbox = self.fdoc.content['mbox']
- uid = self.fdoc.content['uid']
+ mbox = self.fdoc.content.get('mbox', None)
+ uid = self.fdoc.content.get('uid', None)
+ if not mbox or not uid:
+ logger.warning("Malformed fdoc")
+ return
key = mbox, uid
- fun = [self.memstore.unset_dirty,
- self.memstore.set_dirty][int(value)]
+ fun = [self.memstore.unset_dirty_queued,
+ self.memstore.set_dirty_queued][int(value)]
fun(key)
else:
logger.warning("Could not find a memstore referenced from this "
@@ -271,13 +277,17 @@ class MessageWrapper(object):
:rtype: generator
"""
if self._dirty:
- mbox = self.fdoc.content[fields.MBOX_KEY]
- uid = self.fdoc.content[fields.UID_KEY]
- docid_dict = self._dict[self.DOCS_ID]
- docid_dict[self.FDOC] = self.memstore.get_docid_for_fdoc(
- mbox, uid)
-
- if not empty(self.fdoc.content):
+ try:
+ mbox = self.fdoc.content[fields.MBOX_KEY]
+ uid = self.fdoc.content[fields.UID_KEY]
+ docid_dict = self._dict[self.DOCS_ID]
+ docid_dict[self.FDOC] = self.memstore.get_docid_for_fdoc(
+ mbox, uid)
+ except Exception as exc:
+ logger.debug("Error while walking message...")
+ logger.exception(exc)
+
+ if not empty(self.fdoc.content) and 'uid' in self.fdoc.content:
yield self.fdoc
if not empty(self.hdoc.content):
yield self.hdoc
@@ -408,10 +418,8 @@ class MessagePart(object):
if payload:
content_type = self._get_ctype_from_document(phash)
charset = find_charset(content_type)
- logger.debug("Got charset from header: %s" % (charset,))
if charset is None:
charset = self._get_charset(payload)
- logger.debug("Got charset: %s" % (charset,))
try:
if isinstance(payload, unicode):
payload = payload.encode(charset)
diff --git a/src/leap/mail/imap/messages.py b/src/leap/mail/imap/messages.py
index 25fc55f..fc1ec55 100644
--- a/src/leap/mail/imap/messages.py
+++ b/src/leap/mail/imap/messages.py
@@ -28,7 +28,6 @@ from functools import partial
from twisted.mail import imap4
from twisted.internet import defer
-from twisted.python import log
from zope.interface import implements
from zope.proxy import sameProxiedObjects
@@ -78,7 +77,7 @@ def try_unique_query(curried):
# TODO we could take action, like trigger a background
# process to kill dupes.
name = getattr(curried, 'expected', 'doc')
- logger.warning(
+ logger.debug(
"More than one %s found for this mbox, "
"we got a duplicate!!" % (name,))
return query.pop()
@@ -88,6 +87,13 @@ def try_unique_query(curried):
logger.exception("Unhandled error %r" % exc)
+"""
+A dictionary that keeps one lock per mbox and uid.
+"""
+# XXX too much overhead?
+fdoc_locks = defaultdict(lambda: defaultdict(lambda: threading.Lock()))
+
+
class LeapMessage(fields, MailParser, MBoxParser):
"""
The main representation of a message.
@@ -102,8 +108,6 @@ class LeapMessage(fields, MailParser, MBoxParser):
implements(imap4.IMessage)
- flags_lock = threading.Lock()
-
def __init__(self, soledad, uid, mbox, collection=None, container=None):
"""
Initializes a LeapMessage.
@@ -129,10 +133,13 @@ class LeapMessage(fields, MailParser, MBoxParser):
self.__chash = None
self.__bdoc = None
+ from twisted.internet import reactor
+ self.reactor = reactor
+
# XXX make these properties public
@property
- def _fdoc(self):
+ def fdoc(self):
"""
An accessor to the flags document.
"""
@@ -149,35 +156,43 @@ class LeapMessage(fields, MailParser, MBoxParser):
return fdoc
@property
- def _hdoc(self):
+ def hdoc(self):
"""
An accessor to the headers document.
"""
- if self._container is not None:
+ container = self._container
+ if container is not None:
hdoc = self._container.hdoc
if hdoc and not empty(hdoc.content):
return hdoc
- # XXX cache this into the memory store !!!
- return self._get_headers_doc()
+ hdoc = self._get_headers_doc()
+
+ if container and not empty(hdoc.content):
+ # mem-cache it
+ hdoc_content = hdoc.content
+ chash = hdoc_content.get(fields.CONTENT_HASH_KEY)
+ hdocs = {chash: hdoc_content}
+ container.memstore.load_header_docs(hdocs)
+ return hdoc
@property
- def _chash(self):
+ def chash(self):
"""
An accessor to the content hash for this message.
"""
- if not self._fdoc:
+ if not self.fdoc:
return None
- if not self.__chash and self._fdoc:
- self.__chash = self._fdoc.content.get(
+ if not self.__chash and self.fdoc:
+ self.__chash = self.fdoc.content.get(
fields.CONTENT_HASH_KEY, None)
return self.__chash
@property
- def _bdoc(self):
+ def bdoc(self):
"""
An accessor to the body document.
"""
- if not self._hdoc:
+ if not self.hdoc:
return None
if not self.__bdoc:
self.__bdoc = self._get_body_doc()
@@ -204,7 +219,7 @@ class LeapMessage(fields, MailParser, MBoxParser):
uid = self._uid
flags = set([])
- fdoc = self._fdoc
+ fdoc = self.fdoc
if fdoc:
flags = set(fdoc.content.get(self.FLAGS_KEY, None))
@@ -230,20 +245,19 @@ class LeapMessage(fields, MailParser, MBoxParser):
:type mode: int
"""
leap_assert(isinstance(flags, tuple), "flags need to be a tuple")
- log.msg('setting flags: %s (%s)' % (self._uid, flags))
-
- doc = self._fdoc
- if not doc:
- logger.warning(
- "Could not find FDOC for %s:%s while setting flags!" %
- (self._mbox, self._uid))
- return
+ mbox, uid = self._mbox, self._uid
APPEND = 1
REMOVE = -1
SET = 0
- with self.flags_lock:
+ with fdoc_locks[mbox][uid]:
+ doc = self.fdoc
+ if not doc:
+ logger.warning(
+ "Could not find FDOC for %r:%s while setting flags!" %
+ (mbox, uid))
+ return
current = doc.content[self.FLAGS_KEY]
if mode == APPEND:
newflags = tuple(set(tuple(current) + flags))
@@ -251,33 +265,31 @@ class LeapMessage(fields, MailParser, MBoxParser):
newflags = tuple(set(current).difference(set(flags)))
elif mode == SET:
newflags = flags
+ new_fdoc = {
+ self.FLAGS_KEY: newflags,
+ self.SEEN_KEY: self.SEEN_FLAG in newflags,
+ self.DEL_KEY: self.DELETED_FLAG in newflags}
+ self._collection.memstore.update_flags(mbox, uid, new_fdoc)
- # We could defer this, but I think it's better
- # to put it under the lock...
- doc.content[self.FLAGS_KEY] = newflags
- doc.content[self.SEEN_KEY] = self.SEEN_FLAG in flags
- doc.content[self.DEL_KEY] = self.DELETED_FLAG in flags
-
- if self._collection.memstore is not None:
- log.msg("putting message in collection")
- self._collection.memstore.put_message(
- self._mbox, self._uid,
- MessageWrapper(fdoc=doc.content, new=False, dirty=True,
- docs_id={'fdoc': doc.doc_id}))
- else:
- # fallback for non-memstore initializations.
- self._soledad.put_doc(doc)
return map(str, newflags)
def getInternalDate(self):
"""
Retrieve the date internally associated with this message
- :rtype: C{str}
+ According to the spec, this is NOT the date and time in the
+ RFC-822 header, but rather a date and time that reflects when the
+ message was received.
+
+ * In SMTP, date and time of final delivery.
+ * In COPY, internal date/time of the source message.
+ * In APPEND, date/time specified.
+
:return: An RFC822-formatted date string.
+ :rtype: str
"""
- date = self._hdoc.content.get(self.DATE_KEY, '')
- return str(date)
+ date = self.hdoc.content.get(fields.DATE_KEY, '')
+ return date
#
# IMessagePart
@@ -302,8 +314,8 @@ class LeapMessage(fields, MailParser, MBoxParser):
fd = StringIO.StringIO()
- if self._bdoc is not None:
- bdoc_content = self._bdoc.content
+ if self.bdoc is not None:
+ bdoc_content = self.bdoc.content
if empty(bdoc_content):
logger.warning("No BDOC content found for message!!!")
return write_fd("")
@@ -311,7 +323,6 @@ class LeapMessage(fields, MailParser, MBoxParser):
body = bdoc_content.get(self.RAW_KEY, "")
content_type = bdoc_content.get('content-type', "")
charset = find_charset(content_type)
- logger.debug('got charset from content-type: %s' % charset)
if charset is None:
charset = self._get_charset(body)
try:
@@ -352,8 +363,8 @@ class LeapMessage(fields, MailParser, MBoxParser):
:rtype: int
"""
size = None
- if self._fdoc:
- fdoc_content = self._fdoc.content
+ if self.fdoc is not None:
+ fdoc_content = self.fdoc.content
size = fdoc_content.get(self.SIZE_KEY, False)
else:
logger.warning("No FLAGS doc for %s:%s" % (self._mbox,
@@ -422,8 +433,8 @@ class LeapMessage(fields, MailParser, MBoxParser):
"""
Return the headers dict for this message.
"""
- if self._hdoc is not None:
- hdoc_content = self._hdoc.content
+ if self.hdoc is not None:
+ hdoc_content = self.hdoc.content
headers = hdoc_content.get(self.HEADERS_KEY, {})
return headers
@@ -437,8 +448,8 @@ class LeapMessage(fields, MailParser, MBoxParser):
"""
Return True if this message is multipart.
"""
- if self._fdoc:
- fdoc_content = self._fdoc.content
+ if self.fdoc:
+ fdoc_content = self.fdoc.content
is_multipart = fdoc_content.get(self.MULTIPART_KEY, False)
return is_multipart
else:
@@ -477,11 +488,11 @@ class LeapMessage(fields, MailParser, MBoxParser):
:raises: KeyError if key does not exist
:rtype: dict
"""
- if not self._hdoc:
+ if not self.hdoc:
logger.warning("Tried to get part but no HDOC found!")
return None
- hdoc_content = self._hdoc.content
+ hdoc_content = self.hdoc.content
pmap = hdoc_content.get(fields.PARTS_MAP_KEY, {})
# remember, lads, soledad is using strings in its keys,
@@ -508,6 +519,7 @@ class LeapMessage(fields, MailParser, MBoxParser):
finally:
return result
+ # TODO move to soledadstore instead of accessing soledad directly
def _get_headers_doc(self):
"""
Return the document that keeps the headers for this
@@ -515,15 +527,16 @@ class LeapMessage(fields, MailParser, MBoxParser):
"""
head_docs = self._soledad.get_from_index(
fields.TYPE_C_HASH_IDX,
- fields.TYPE_HEADERS_VAL, str(self._chash))
+ fields.TYPE_HEADERS_VAL, str(self.chash))
return first(head_docs)
+ # TODO move to soledadstore instead of accessing soledad directly
def _get_body_doc(self):
"""
Return the document that keeps the body for this
message.
"""
- hdoc_content = self._hdoc.content
+ hdoc_content = self.hdoc.content
body_phash = hdoc_content.get(
fields.BODY_KEY, None)
if not body_phash:
@@ -560,14 +573,14 @@ class LeapMessage(fields, MailParser, MBoxParser):
:return: The content value indexed by C{key} or None
:rtype: str
"""
- return self._fdoc.content.get(key, None)
+ return self.fdoc.content.get(key, None)
def does_exist(self):
"""
Return True if there is actually a flags document for this
UID and mbox.
"""
- return not empty(self._fdoc)
+ return not empty(self.fdoc)
class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser):
@@ -672,8 +685,6 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser):
_rdoc_lock = threading.Lock()
_rdoc_property_lock = threading.Lock()
- _hdocset_lock = threading.Lock()
- _hdocset_property_lock = threading.Lock()
def __init__(self, mbox=None, soledad=None, memstore=None):
"""
@@ -714,14 +725,13 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser):
self.memstore = memstore
self.__rflags = None
- self.__hdocset = None
self.initialize_db()
# ensure that we have a recent-flags and a hdocs-sec doc
self._get_or_create_rdoc()
- # Not for now...
- #self._get_or_create_hdocset()
+ from twisted.internet import reactor
+ self.reactor = reactor
def _get_empty_doc(self, _type=FLAGS_DOC):
"""
@@ -746,33 +756,26 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser):
rdoc[fields.MBOX_KEY] = self.mbox
self._soledad.create_doc(rdoc)
- def _get_or_create_hdocset(self):
- """
- Try to retrieve the hdocs-set doc for this MessageCollection,
- and create one if not found.
- """
- hdocset = self._get_hdocset_doc()
- if not hdocset:
- hdocset = self._get_empty_doc(self.HDOCS_SET_DOC)
- if self.mbox != fields.INBOX_VAL:
- hdocset[fields.MBOX_KEY] = self.mbox
- self._soledad.create_doc(hdocset)
-
+ @deferred_to_thread
def _do_parse(self, raw):
"""
Parse raw message and return it along with
relevant information about its outer level.
+ This is done in a separate thread, and the callback is passed
+ to `_do_add_msg` method.
+
:param raw: the raw message
:type raw: StringIO or basestring
- :return: msg, chash, size, multi
+ :return: msg, parts, chash, size, multi
:rtype: tuple
"""
msg = self._get_parsed_msg(raw)
chash = self._get_hash(msg)
size = len(msg.as_string())
multi = msg.is_multipart()
- return msg, chash, size, multi
+ parts = walk.get_parts(msg)
+ return msg, parts, chash, size, multi
def _populate_flags(self, flags, uid, chash, size, multi):
"""
@@ -840,12 +843,11 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser):
:return: False, if it does not exist, or UID.
"""
exist = False
- if self.memstore is not None:
- exist = self.memstore.get_fdoc_from_chash(chash, self.mbox)
+ exist = self.memstore.get_fdoc_from_chash(chash, self.mbox)
if not exist:
exist = self._get_fdoc_from_chash(chash)
- if exist:
+ if exist and exist.content is not None:
return exist.content.get(fields.UID_KEY, "unknown-uid")
else:
return False
@@ -874,24 +876,28 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser):
uid when the adding succeed.
:rtype: deferred
"""
- logger.debug('adding message')
if flags is None:
flags = tuple()
leap_assert_type(flags, tuple)
- d = defer.Deferred()
- self._do_add_msg(raw, flags, subject, date, notify_on_disk, d)
- return d
+ observer = defer.Deferred()
+ d = self._do_parse(raw)
+ d.addCallback(lambda result: self.reactor.callInThread(
+ self._do_add_msg, result, flags, subject, date,
+ notify_on_disk, observer))
+ return observer
- # We SHOULD defer this (or the heavy load here) to the thread pool,
- # but it gives troubles with the QSocketNotifier used by Qt...
- def _do_add_msg(self, raw, flags, subject, date, notify_on_disk, observer):
+ # Called in thread
+ def _do_add_msg(self, parse_result, flags, subject,
+ date, notify_on_disk, observer):
"""
Helper that creates a new message document.
Here lives the magic of the leap mail. Well, in soledad, really.
See `add_msg` docstring for parameter info.
+ :param parse_result: a tuple with the results of `self._do_parse`
+ :type parse_result: tuple
:param observer: a deferred that will be fired with the message
uid when the adding succeed.
:type observer: deferred
@@ -902,35 +908,33 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser):
# TODO add the linked-from info !
# TODO add reference to the original message
- # parse
- msg, chash, size, multi = self._do_parse(raw)
+ msg, parts, chash, size, multi = parse_result
# check for uniqueness --------------------------------
- # XXX profiler says that this test is costly.
- # So we probably should just do an in-memory check and
- # move the complete check to the soledad writer?
# Watch out! We're reserving a UID right after this!
existing_uid = self._fdoc_already_exists(chash)
if existing_uid:
- logger.warning("We already have that message in this "
- "mailbox, unflagging as deleted")
uid = existing_uid
msg = self.get_msg_by_uid(uid)
- msg.setFlags((fields.DELETED_FLAG,), -1)
- # XXX if this is deferred to thread again we should not use
- # the callback in the deferred thread, but return and
- # call the callback from the caller fun...
- observer.callback(uid)
+ # We can say the observer that we're done
+ self.reactor.callFromThread(observer.callback, uid)
+ msg.setFlags((fields.DELETED_FLAG,), -1)
return
uid = self.memstore.increment_last_soledad_uid(self.mbox)
- logger.info("ADDING MSG WITH UID: %s" % uid)
+
+ # We can say the observer that we're done at this point, but
+ # before that we should make sure it has no serious consequences
+ # if we're issued, for instance, a fetch command right after...
+ #self.reactor.callFromThread(observer.callback, uid)
+ # if we did the notify, we need to invalidate the deferred
+ # so not to try to fire it twice.
+ #observer = None
fd = self._populate_flags(flags, uid, chash, size, multi)
hd = self._populate_headr(msg, chash, subject, date)
- parts = walk.get_parts(msg)
body_phash_fun = [walk.get_body_phash_simple,
walk.get_body_phash_multi][int(multi)]
body_phash = body_phash_fun(walk.get_payloads(msg))
@@ -949,9 +953,9 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser):
self.set_recent_flag(uid)
msg_container = MessageWrapper(fd, hd, cdocs)
- self.memstore.create_message(self.mbox, uid, msg_container,
- observer=observer,
- notify_on_disk=notify_on_disk)
+ self.memstore.create_message(
+ self.mbox, uid, msg_container,
+ observer=observer, notify_on_disk=notify_on_disk)
#
# getters: specific queries
@@ -982,14 +986,6 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser):
{'doc_id': rdoc.doc_id, 'set': rflags})
return rflags
- #else:
- # fallback for cases without memory store
- #with self._rdoc_lock:
- #rdoc = self._get_recent_doc()
- #self.__rflags = set(rdoc.content.get(
- #fields.RECENTFLAGS_KEY, []))
- #return self.__rflags
-
def _set_recent_flags(self, value):
"""
Setter for the recent-flags set for this mailbox.
@@ -997,16 +993,6 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser):
if self.memstore is not None:
self.memstore.set_recent_flags(self.mbox, value)
- #else:
- # fallback for cases without memory store
- #with self._rdoc_lock:
- #rdoc = self._get_recent_doc()
- #newv = set(value)
- #self.__rflags = newv
- #rdoc.content[fields.RECENTFLAGS_KEY] = list(newv)
- # XXX should deferLater 0 it?
- #self._soledad.put_doc(rdoc)
-
recent_flags = property(
_get_recent_flags, _set_recent_flags,
doc="Set of UIDs with the recent flag for this mailbox.")
@@ -1121,6 +1107,7 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser):
# XXX is this working?
return self._get_uid_from_msgidCb(msgid)
+ @deferred_to_thread
def set_flags(self, mbox, messages, flags, mode, observer):
"""
Set flags for a sequence of messages.
@@ -1138,28 +1125,18 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser):
done.
:type observer: deferred
"""
- # XXX we could defer *this* to thread pool, and gather results...
- # XXX use deferredList
+ reactor = self.reactor
+ getmsg = self.get_msg_by_uid
- deferreds = []
- for msg_id in messages:
- deferreds.append(
- self._set_flag_for_uid(msg_id, flags, mode))
+ def set_flags(uid, flags, mode):
+ msg = getmsg(uid, mem_only=True, flags_only=True)
+ if msg is not None:
+ return uid, msg.setFlags(flags, mode)
- def notify(result):
- observer.callback(dict(result))
- d1 = defer.gatherResults(deferreds, consumeErrors=True)
- d1.addCallback(notify)
+ setted_flags = [set_flags(uid, flags, mode) for uid in messages]
+ result = dict(filter(None, setted_flags))
- @deferred_to_thread
- def _set_flag_for_uid(self, msg_id, flags, mode):
- """
- Run the set_flag operation in the thread pool.
- """
- log.msg("MSG ID = %s" % msg_id)
- msg = self.get_msg_by_uid(msg_id, mem_only=True, flags_only=True)
- if msg is not None:
- return msg_id, msg.setFlags(flags, mode)
+ reactor.callFromThread(observer.callback, result)
# getters: generic for a mailbox
@@ -1182,7 +1159,9 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser):
or None if not found.
:rtype: LeapMessage
"""
- msg_container = self.memstore.get_message(self.mbox, uid, flags_only)
+ msg_container = self.memstore.get_message(
+ self.mbox, uid, flags_only=flags_only)
+
if msg_container is not None:
if mem_only:
msg = LeapMessage(None, uid, self.mbox, collection=self,
@@ -1195,6 +1174,7 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser):
collection=self, container=msg_container)
else:
msg = LeapMessage(self._soledad, uid, self.mbox, collection=self)
+
if not msg.does_exist():
return None
return msg
@@ -1234,67 +1214,51 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser):
db_uids = set([doc.content[self.UID_KEY] for doc in
self._soledad.get_from_index(
fields.TYPE_MBOX_IDX,
- fields.TYPE_FLAGS_VAL, self.mbox)])
+ fields.TYPE_FLAGS_VAL, self.mbox)
+ if not empty(doc)])
return db_uids
def all_uid_iter(self):
"""
Return an iterator through the UIDs of all messages, from memory.
"""
- if self.memstore is not None:
- mem_uids = self.memstore.get_uids(self.mbox)
- soledad_known_uids = self.memstore.get_soledad_known_uids(
- self.mbox)
- combined = tuple(set(mem_uids).union(soledad_known_uids))
- return combined
+ mem_uids = self.memstore.get_uids(self.mbox)
+ soledad_known_uids = self.memstore.get_soledad_known_uids(
+ self.mbox)
+ combined = tuple(set(mem_uids).union(soledad_known_uids))
+ return combined
- # XXX MOVE to memstore
- def all_flags(self):
+ def get_all_soledad_flag_docs(self):
"""
- Return a dict with all flags documents for this mailbox.
- """
- # XXX get all from memstore and cache it there
- # FIXME should get all uids, get them fro memstore,
- # and get only the missing ones from disk.
-
- all_flags = dict(((
- doc.content[self.UID_KEY],
- doc.content[self.FLAGS_KEY]) for doc in
- self._soledad.get_from_index(
- fields.TYPE_MBOX_IDX,
- fields.TYPE_FLAGS_VAL, self.mbox)))
- if self.memstore is not None:
- uids = self.memstore.get_uids(self.mbox)
- docs = ((uid, self.memstore.get_message(self.mbox, uid))
- for uid in uids)
- for uid, doc in docs:
- all_flags[uid] = doc.fdoc.content[self.FLAGS_KEY]
+ Return a dict with the content of all the flag documents
+ in soledad store for the given mbox.
- return all_flags
-
- def all_flags_chash(self):
- """
- Return a dict with the content-hash for all flag documents
- for this mailbox.
+ :param mbox: the mailbox
+ :type mbox: str or unicode
+ :rtype: dict
"""
- all_flags_chash = dict(((
+ # XXX we really could return a reduced version with
+ # just {'uid': (flags-tuple,) since the prefetch is
+ # only oriented to get the flag tuples.
+ all_docs = [(
doc.content[self.UID_KEY],
- doc.content[self.CONTENT_HASH_KEY]) for doc in
+ dict(doc.content))
+ for doc in
self._soledad.get_from_index(
fields.TYPE_MBOX_IDX,
- fields.TYPE_FLAGS_VAL, self.mbox)))
- return all_flags_chash
+ fields.TYPE_FLAGS_VAL, self.mbox)
+ if not empty(doc.content)]
+ all_flags = dict(all_docs)
+ return all_flags
def all_headers(self):
"""
- Return a dict with all the headers documents for this
+ Return a dict with all the header documents for this
mailbox.
+
+ :rtype: dict
"""
- all_headers = dict(((
- doc.content[self.CONTENT_HASH_KEY],
- doc.content[self.HEADERS_KEY]) for doc in
- self._soledad.get_docs(self._hdocset)))
- return all_headers
+ return self.memstore.all_headers(self.mbox)
def count(self):
"""
@@ -1302,13 +1266,7 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser):
:rtype: int
"""
- # XXX We should cache this in memstore too until next write...
- count = self._soledad.get_count_from_index(
- fields.TYPE_MBOX_IDX,
- fields.TYPE_FLAGS_VAL, self.mbox)
- if self.memstore is not None:
- count += self.memstore.count_new()
- return count
+ return self.memstore.count(self.mbox)
# unseen messages
@@ -1320,10 +1278,7 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser):
:return: iterator through unseen message doc UIDs
:rtype: iterable
"""
- return (doc.content[self.UID_KEY] for doc in
- self._soledad.get_from_index(
- fields.TYPE_MBOX_SEEN_IDX,
- fields.TYPE_FLAGS_VAL, self.mbox, '0'))
+ return self.memstore.unseen_iter(self.mbox)
def count_unseen(self):
"""
@@ -1332,10 +1287,7 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser):
:returns: count
:rtype: int
"""
- count = self._soledad.get_count_from_index(
- fields.TYPE_MBOX_SEEN_IDX,
- fields.TYPE_FLAGS_VAL, self.mbox, '0')
- return count
+ return len(list(self.unseen_iter()))
def get_unseen(self):
"""
diff --git a/src/leap/mail/imap/server.py b/src/leap/mail/imap/server.py
index ba63846..5da9bfd 100644
--- a/src/leap/mail/imap/server.py
+++ b/src/leap/mail/imap/server.py
@@ -20,9 +20,7 @@ Leap IMAP4 Server Implementation.
from copy import copy
from twisted import cred
-from twisted.internet import defer
from twisted.internet.defer import maybeDeferred
-from twisted.internet.task import deferLater
from twisted.mail import imap4
from twisted.python import log
@@ -41,6 +39,7 @@ class LeapIMAPServer(imap4.IMAP4Server):
soledad = kwargs.pop('soledad', None)
uuid = kwargs.pop('uuid', None)
userid = kwargs.pop('userid', None)
+
leap_assert(soledad, "need a soledad instance")
leap_assert_type(soledad, Soledad)
leap_assert(uuid, "need a user in the initialization")
@@ -55,6 +54,9 @@ class LeapIMAPServer(imap4.IMAP4Server):
# populate the test account properly (and only once
# per session)
+ from twisted.internet import reactor
+ self.reactor = reactor
+
def lineReceived(self, line):
"""
Attempt to parse a single line from the server.
@@ -114,6 +116,7 @@ class LeapIMAPServer(imap4.IMAP4Server):
).addCallback(
cbFetch, tag, query, uid
).addErrback(ebFetch, tag)
+
elif len(query) == 1 and str(query[0]) == "rfc822.header":
self._oldTimeout = self.setTimeout(None)
# no need to call iter, we get a generator
@@ -130,48 +133,16 @@ class LeapIMAPServer(imap4.IMAP4Server):
).addCallback(
cbFetch, tag, query, uid
).addErrback(
- ebFetch, tag
- ).addCallback(
- self.on_fetch_finished, messages)
+ ebFetch, tag)
select_FETCH = (do_FETCH, imap4.IMAP4Server.arg_seqset,
imap4.IMAP4Server.arg_fetchatt)
- def on_fetch_finished(self, _, messages):
- from twisted.internet import reactor
-
- print "FETCH FINISHED -- NOTIFY NEW"
- deferLater(reactor, 0, self.notifyNew)
- deferLater(reactor, 0, self.mbox.unset_recent_flags, messages)
- deferLater(reactor, 0, self.mbox.signal_unread_to_ui)
-
- def on_copy_finished(self, defers):
- d = defer.gatherResults(filter(None, defers))
-
- def when_finished(result):
- log.msg("COPY FINISHED")
- self.notifyNew()
- self.mbox.signal_unread_to_ui()
- d.addCallback(when_finished)
- #d.addCallback(self.notifyNew)
- #d.addCallback(self.mbox.signal_unread_to_ui)
-
- def do_COPY(self, tag, messages, mailbox, uid=0):
- from twisted.internet import reactor
- defers = []
- d = imap4.IMAP4Server.do_COPY(self, tag, messages, mailbox, uid)
- defers.append(d)
- deferLater(reactor, 0, self.on_copy_finished, defers)
-
- select_COPY = (do_COPY, imap4.IMAP4Server.arg_seqset,
- imap4.IMAP4Server.arg_astring)
-
def notifyNew(self, ignored=None):
"""
Notify new messages to listeners.
"""
- print "TRYING TO NOTIFY NEW"
- self.mbox.notify_new()
+ self.reactor.callFromThread(self.mbox.notify_new)
def _cbSelectWork(self, mbox, cmdName, tag):
"""
diff --git a/src/leap/mail/imap/service/imap.py b/src/leap/mail/imap/service/imap.py
index 93df51d..1175cdc 100644
--- a/src/leap/mail/imap/service/imap.py
+++ b/src/leap/mail/imap/service/imap.py
@@ -25,6 +25,7 @@ from twisted.internet import defer, threads
from twisted.internet.protocol import ServerFactory
from twisted.internet.error import CannotListenError
from twisted.mail import imap4
+from twisted.python import log
logger = logging.getLogger(__name__)
@@ -71,6 +72,15 @@ DO_MANHOLE = os.environ.get("LEAP_MAIL_MANHOLE", None)
if DO_MANHOLE:
from leap.mail.imap.service import manhole
+DO_PROFILE = os.environ.get("LEAP_PROFILE", None)
+if DO_PROFILE:
+ import cProfile
+ log.msg("Starting PROFILING...")
+
+ PROFILE_DAT = "/tmp/leap_mail_profile.pstats"
+ pr = cProfile.Profile()
+ pr.enable()
+
class IMAPAuthRealm(object):
"""
@@ -115,7 +125,12 @@ class LeapIMAPFactory(ServerFactory):
# XXX how to pass the store along?
def buildProtocol(self, addr):
- "Return a protocol suitable for the job."
+ """
+ Return a protocol suitable for the job.
+
+ :param addr: remote ip address
+ :type addr: str
+ """
imapProtocol = LeapIMAPServer(
uuid=self._uuid,
userid=self._userid,
@@ -124,7 +139,7 @@ class LeapIMAPFactory(ServerFactory):
imapProtocol.factory = self
return imapProtocol
- def doStop(self, cv):
+ def doStop(self, cv=None):
"""
Stops imap service (fetcher, factory and port).
@@ -135,23 +150,30 @@ class LeapIMAPFactory(ServerFactory):
disk in another thread.
:rtype: Deferred
"""
- ServerFactory.doStop(self)
+ if DO_PROFILE:
+ log.msg("Stopping PROFILING")
+ pr.disable()
+ pr.dump_stats(PROFILE_DAT)
- def _stop_imap_cb():
- logger.debug('Stopping in memory store.')
- self._memstore.stop_and_flush()
- while not self._memstore.producer.is_queue_empty():
- logger.debug('Waiting for queue to be empty.')
- # TODO use a gatherResults over the new/dirty deferred list,
- # as in memorystore's expunge() method.
- time.sleep(1)
- # notify that service has stopped
- logger.debug('Notifying that service has stopped.')
- cv.acquire()
- cv.notify()
- cv.release()
+ ServerFactory.doStop(self)
- return threads.deferToThread(_stop_imap_cb)
+ if cv is not None:
+ def _stop_imap_cb():
+ logger.debug('Stopping in memory store.')
+ self._memstore.stop_and_flush()
+ while not self._memstore.producer.is_queue_empty():
+ logger.debug('Waiting for queue to be empty.')
+ # TODO use a gatherResults over the new/dirty
+ # deferred list,
+ # as in memorystore's expunge() method.
+ time.sleep(1)
+ # notify that service has stopped
+ logger.debug('Notifying that service has stopped.')
+ cv.acquire()
+ cv.notify()
+ cv.release()
+
+ return threads.deferToThread(_stop_imap_cb)
def run_service(*args, **kwargs):
@@ -164,6 +186,9 @@ def run_service(*args, **kwargs):
the protocol.
"""
from twisted.internet import reactor
+ # it looks like qtreactor does not honor this,
+ # but other reactors should.
+ reactor.suggestThreadPoolSize(20)
leap_assert(len(args) == 2)
soledad, keymanager = args
diff --git a/src/leap/mail/imap/soledadstore.py b/src/leap/mail/imap/soledadstore.py
index 8e22f26..732fe03 100644
--- a/src/leap/mail/imap/soledadstore.py
+++ b/src/leap/mail/imap/soledadstore.py
@@ -20,14 +20,14 @@ A MessageStore that writes to Soledad.
import logging
import threading
+from collections import defaultdict
from itertools import chain
from u1db import errors as u1db_errors
-from twisted.internet import defer
from twisted.python import log
from zope.interface import implements
-from leap.common.check import leap_assert_type
+from leap.common.check import leap_assert_type, leap_assert
from leap.mail.decorators import deferred_to_thread
from leap.mail.imap.messageparts import MessagePartType
from leap.mail.imap.messageparts import MessageWrapper
@@ -35,15 +35,13 @@ from leap.mail.imap.messageparts import RecentFlagsDoc
from leap.mail.imap.fields import fields
from leap.mail.imap.interfaces import IMessageStore
from leap.mail.messageflow import IMessageConsumer
-from leap.mail.utils import first
+from leap.mail.utils import first, empty, accumulator_queue
logger = logging.getLogger(__name__)
# TODO
-# [ ] Delete original message from the incoming queue after all successful
-# writes.
-# [ ] Implement a retry queue.
+# [ ] Implement a retry queue?
# [ ] Consider journaling of operations.
@@ -86,10 +84,12 @@ class ContentDedup(object):
if not header_docs:
return False
- if len(header_docs) != 1:
- logger.warning("Found more than one copy of chash %s!"
- % (chash,))
- logger.debug("Found header doc with that hash! Skipping save!")
+ # FIXME enable only to debug this problem.
+ #if len(header_docs) != 1:
+ #logger.warning("Found more than one copy of chash %s!"
+ #% (chash,))
+
+ #logger.debug("Found header doc with that hash! Skipping save!")
return True
def _content_does_exist(self, doc):
@@ -110,10 +110,11 @@ class ContentDedup(object):
if not attach_docs:
return False
- if len(attach_docs) != 1:
- logger.warning("Found more than one copy of phash %s!"
- % (phash,))
- logger.debug("Found attachment doc with that hash! Skipping save!")
+ # FIXME enable only to debug this problem
+ #if len(attach_docs) != 1:
+ #logger.warning("Found more than one copy of phash %s!"
+ #% (phash,))
+ #logger.debug("Found attachment doc with that hash! Skipping save!")
return True
@@ -121,13 +122,26 @@ class MsgWriteError(Exception):
"""
Raised if any exception is found while saving message parts.
"""
+ pass
+
+
+"""
+A lock per document.
+"""
+# TODO should bound the space of this!!!
+# http://stackoverflow.com/a/2437645/1157664
+# Setting this to twice the number of threads in the threadpool
+# should be safe.
+put_locks = defaultdict(lambda: threading.Lock())
class SoledadStore(ContentDedup):
"""
This will create docs in the local Soledad database.
"""
- _last_uid_lock = threading.Lock()
+ _soledad_rw_lock = threading.Lock()
+ _remove_lock = threading.Lock()
+ _mbox_doc_locks = defaultdict(lambda: threading.Lock())
implements(IMessageConsumer, IMessageStore)
@@ -138,8 +152,20 @@ class SoledadStore(ContentDedup):
:param soledad: the soledad instance
:type soledad: Soledad
"""
+ from twisted.internet import reactor
+ self.reactor = reactor
+
self._soledad = soledad
+ self._CREATE_DOC_FUN = self._soledad.create_doc
+ self._PUT_DOC_FUN = self._soledad.put_doc
+ self._GET_DOC_FUN = self._soledad.get_doc
+
+ # we instantiate an accumulator to batch the notifications
+ self.docs_notify_queue = accumulator_queue(
+ lambda item: reactor.callFromThread(self._unset_new_dirty, item),
+ 20)
+
# IMessageStore
# -------------------------------------------------------------------
@@ -194,47 +220,32 @@ class SoledadStore(ContentDedup):
# IMessageConsumer
- # It's not thread-safe to defer this to a different thread
+ # TODO should handle the delete case
+ # TODO should handle errors better
+ # TODO could generalize this method into a generic consumer
+ # and only implement `process` here
def consume(self, queue):
"""
Creates a new document in soledad db.
- :param queue: queue to get item from, with content of the document
- to be inserted.
- :type queue: Queue
- """
- # TODO should delete the original message from incoming only after
- # the writes are done.
- # TODO should handle the delete case
- # TODO should handle errors
- # TODO could generalize this method into a generic consumer
- # and only implement `process` here
-
- def docWriteCallBack(doc_wrapper):
- """
- Callback for a successful write of a document wrapper.
- """
- if isinstance(doc_wrapper, MessageWrapper):
- # If everything went well, we can unset the new flag
- # in the source store (memory store)
- self._unset_new_dirty(doc_wrapper)
-
- def docWriteErrorBack(failure):
- """
- Errorback for write operations.
- """
- log.error("Error while processing item.")
- log.msg(failure.getTraceBack())
-
- while not queue.empty():
- doc_wrapper = queue.get()
- d = defer.Deferred()
- d.addCallbacks(docWriteCallBack, docWriteErrorBack)
-
- self._consume_doc(doc_wrapper, d)
+ :param queue: a tuple of queues to get item from, with content of the
+ document to be inserted.
+ :type queue: tuple of Queues
+ """
+ new, dirty = queue
+ while not new.empty():
+ doc_wrapper = new.get()
+ self.reactor.callInThread(self._consume_doc, doc_wrapper,
+ self.docs_notify_queue)
+ while not dirty.empty():
+ doc_wrapper = dirty.get()
+ self.reactor.callInThread(self._consume_doc, doc_wrapper,
+ self.docs_notify_queue)
+
+ # Queue empty, flush the notifications queue.
+ self.docs_notify_queue(None, flush=True)
- @deferred_to_thread
def _unset_new_dirty(self, doc_wrapper):
"""
Unset the `new` and `dirty` flags for this document wrapper in the
@@ -243,56 +254,76 @@ class SoledadStore(ContentDedup):
:param doc_wrapper: a MessageWrapper instance
:type doc_wrapper: MessageWrapper
"""
- # XXX debug msg id/mbox?
- logger.info("unsetting new flag!")
- doc_wrapper.new = False
- doc_wrapper.dirty = False
+ if isinstance(doc_wrapper, MessageWrapper):
+ # XXX still needed for debug quite often
+ #logger.info("unsetting new flag!")
+ doc_wrapper.new = False
+ doc_wrapper.dirty = False
@deferred_to_thread
- def _consume_doc(self, doc_wrapper, deferred):
+ def _consume_doc(self, doc_wrapper, notify_queue):
"""
Consume each document wrapper in a separate thread.
+ We pass an instance of an accumulator that handles the notifications
+ to the memorystore when the write has been done.
:param doc_wrapper: a MessageWrapper or RecentFlagsDoc instance
:type doc_wrapper: MessageWrapper or RecentFlagsDoc
- :param deferred: a deferred that will be fired when the write operation
- has finished, either calling its callback or its
- errback depending on whether it succeed.
- :type deferred: Deferred
+ :param notify_queue: a callable that handles the writeback
+ notifications to the memstore.
+ :type notify_queue: callable
"""
- items = self._process(doc_wrapper)
+ def queueNotifyBack(failed, doc_wrapper):
+ if failed:
+ log.msg("There was an error writing the mesage...")
+ else:
+ notify_queue(doc_wrapper)
+
+ def doSoledadCalls(items):
+ # we prime the generator, that should return the
+ # message or flags wrapper item in the first place.
+ doc_wrapper = items.next()
+ failed = self._soledad_write_document_parts(items)
+ queueNotifyBack(failed, doc_wrapper)
- # we prime the generator, that should return the
- # message or flags wrapper item in the first place.
- doc_wrapper = items.next()
+ doSoledadCalls(self._iter_wrapper_subparts(doc_wrapper))
- # From here, we unpack the subpart items and
- # the right soledad call.
+ #
+ # SoledadStore specific methods.
+ #
+
+ def _soledad_write_document_parts(self, items):
+ """
+ Write the document parts to soledad in a separate thread.
+
+ :param items: the iterator through the different document wrappers
+ payloads.
+ :type items: iterator
+ :return: whether the write was successful or not
+ :rtype: bool
+ """
failed = False
for item, call in items:
+ if empty(item):
+ continue
try:
self._try_call(call, item)
except Exception as exc:
- failed = exc
+ logger.debug("ITEM WAS: %s" % str(item))
+ logger.debug("ITEM CONTENT WAS: %s" % str(item.content))
+ logger.exception(exc)
+ failed = True
continue
- if failed:
- deferred.errback(MsgWriteError(
- "There was an error writing the mesage"))
- else:
- deferred.callback(doc_wrapper)
-
- #
- # SoledadStore specific methods.
- #
+ return failed
- def _process(self, doc_wrapper):
+ def _iter_wrapper_subparts(self, doc_wrapper):
"""
Return an iterator that will yield the doc_wrapper in the first place,
followed by the subparts item and the proper call type for every
item in the queue, if any.
- :param queue: the queue from where we'll pick item.
- :type queue: Queue
+ :param doc_wrapper: a MessageWrapper or RecentFlagsDoc instance
+ :type doc_wrapper: MessageWrapper or RecentFlagsDoc
"""
if isinstance(doc_wrapper, MessageWrapper):
return chain((doc_wrapper,),
@@ -315,11 +346,38 @@ class SoledadStore(ContentDedup):
"""
if call is None:
return
- try:
- call(item)
- except u1db_errors.RevisionConflict as exc:
- logger.exception("Error: %r" % (exc,))
- raise exc
+
+ if call == self._PUT_DOC_FUN:
+ doc_id = item.doc_id
+ with put_locks[doc_id]:
+ doc = self._GET_DOC_FUN(doc_id)
+
+ if doc is None:
+ logger.warning("BUG! Dirty doc but could not "
+ "find document %s" % (doc_id,))
+ return
+
+ doc.content = dict(item.content)
+
+ item = doc
+ try:
+ call(item)
+ except u1db_errors.RevisionConflict as exc:
+ logger.exception("Error: %r" % (exc,))
+ raise exc
+ except Exception as exc:
+ logger.exception("Error: %r" % (exc,))
+ raise exc
+
+ else:
+ try:
+ call(item)
+ except u1db_errors.RevisionConflict as exc:
+ logger.exception("Error: %r" % (exc,))
+ raise exc
+ except Exception as exc:
+ logger.exception("Error: %r" % (exc,))
+ raise exc
def _get_calls_for_msg_parts(self, msg_wrapper):
"""
@@ -334,7 +392,7 @@ class SoledadStore(ContentDedup):
call = None
if msg_wrapper.new:
- call = self._soledad.create_doc
+ call = self._CREATE_DOC_FUN
# item is expected to be a MessagePartDoc
for item in msg_wrapper.walk():
@@ -353,18 +411,18 @@ class SoledadStore(ContentDedup):
# the flags doc.
elif msg_wrapper.dirty:
- call = self._soledad.put_doc
+ call = self._PUT_DOC_FUN
# item is expected to be a MessagePartDoc
for item in msg_wrapper.walk():
# XXX FIXME Give error if dirty and not doc_id !!!
doc_id = item.doc_id # defend!
if not doc_id:
+ logger.warning("Dirty item but no doc_id!")
continue
- doc = self._soledad.get_doc(doc_id)
- doc.content = dict(item.content)
+
if item.part == MessagePartType.fdoc:
- logger.debug("PUT dirty fdoc")
- yield doc, call
+ #logger.debug("PUT dirty fdoc")
+ yield item, call
# XXX also for linkage-doc !!!
else:
@@ -379,17 +437,16 @@ class SoledadStore(ContentDedup):
:return: a tuple with recent-flags doc payload and callable
:rtype: tuple
"""
- call = self._soledad.put_doc
- rdoc = self._soledad.get_doc(rflags_wrapper.doc_id)
+ call = self._CREATE_DOC_FUN
payload = rflags_wrapper.content
- logger.debug("Saving RFLAGS to Soledad...")
-
if payload:
- rdoc.content = payload
- yield rdoc, call
+ logger.debug("Saving RFLAGS to Soledad...")
+ yield payload, call
- def _get_mbox_document(self, mbox):
+ # Mbox documents and attributes
+
+ def get_mbox_document(self, mbox):
"""
Return mailbox document.
@@ -399,15 +456,80 @@ class SoledadStore(ContentDedup):
the query failed.
:rtype: SoledadDocument or None.
"""
+ with self._mbox_doc_locks[mbox]:
+ return self._get_mbox_document(mbox)
+
+ def _get_mbox_document(self, mbox):
+ """
+ Helper for returning the mailbox document.
+ """
try:
query = self._soledad.get_from_index(
fields.TYPE_MBOX_IDX,
fields.TYPE_MBOX_VAL, mbox)
if query:
return query.pop()
+ else:
+ logger.error("Could not find mbox document for %r" %
+ (self.mbox,))
except Exception as exc:
logger.exception("Unhandled error %r" % exc)
+ def get_mbox_closed(self, mbox):
+ """
+ Return the closed attribute for a given mailbox.
+
+ :param mbox: the mailbox
+ :type mbox: str or unicode
+ :rtype: bool
+ """
+ mbox_doc = self.get_mbox_document()
+ return mbox_doc.content.get(fields.CLOSED_KEY, False)
+
+ def set_mbox_closed(self, mbox, closed):
+ """
+ Set the closed attribute for a given mailbox.
+
+ :param mbox: the mailbox
+ :type mbox: str or unicode
+ :param closed: the value to be set
+ :type closed: bool
+ """
+ leap_assert(isinstance(closed, bool), "closed needs to be boolean")
+ with self._mbox_doc_locks[mbox]:
+ mbox_doc = self._get_mbox_document(mbox)
+ if mbox_doc is None:
+ logger.error(
+ "Could not find mbox document for %r" % (mbox,))
+ return
+ mbox_doc.content[fields.CLOSED_KEY] = closed
+ self._soledad.put_doc(mbox_doc)
+
+ def write_last_uid(self, mbox, value):
+ """
+ Write the `last_uid` integer to the proper mailbox document
+ in Soledad.
+ This is called from the deferred triggered by
+ memorystore.increment_last_soledad_uid, which is expected to
+ run in a separate thread.
+
+ :param mbox: the mailbox
+ :type mbox: str or unicode
+ :param value: the value to set
+ :type value: int
+ """
+ leap_assert_type(value, int)
+ key = fields.LAST_UID_KEY
+
+ # XXX change for a lock related to the mbox document
+ # itself.
+ with self._mbox_doc_locks[mbox]:
+ mbox_doc = self._get_mbox_document(mbox)
+ old_val = mbox_doc.content[key]
+ if value > old_val:
+ mbox_doc.content[key] = value
+ self._soledad.put_doc(mbox_doc)
+
def get_flags_doc(self, mbox, uid):
"""
Return the SoledadDocument for the given mbox and uid.
@@ -416,12 +538,16 @@ class SoledadStore(ContentDedup):
:type mbox: str or unicode
:param uid: the UID for the message
:type uid: int
+ :rtype: SoledadDocument or None
"""
result = None
try:
flag_docs = self._soledad.get_from_index(
fields.TYPE_MBOX_UID_IDX,
fields.TYPE_FLAGS_VAL, mbox, str(uid))
+ if len(flag_docs) != 1:
+ logger.warning("More than one flag doc for %r:%s" %
+ (mbox, uid))
result = first(flag_docs)
except Exception as exc:
# ugh! Something's broken down there!
@@ -430,36 +556,25 @@ class SoledadStore(ContentDedup):
finally:
return result
- def write_last_uid(self, mbox, value):
+ def get_headers_doc(self, chash):
"""
- Write the `last_uid` integer to the proper mailbox document
- in Soledad.
- This is called from the deferred triggered by
- memorystore.increment_last_soledad_uid, which is expected to
- run in a separate thread.
+ Return the document that keeps the headers for a message
+ indexed by its content-hash.
- :param mbox: the mailbox
- :type mbox: str or unicode
- :param value: the value to set
- :type value: int
+ :param chash: the content-hash to retrieve the document from.
+ :type chash: str or unicode
+ :rtype: SoledadDocument or None
"""
- leap_assert_type(value, int)
- key = fields.LAST_UID_KEY
-
- with self._last_uid_lock:
- mbox_doc = self._get_mbox_document(mbox)
- old_val = mbox_doc.content[key]
- if value < old_val:
- logger.error("%r:%s Tried to write a UID lesser than what's "
- "stored!" % (mbox, value))
- mbox_doc.content[key] = value
- self._soledad.put_doc(mbox_doc)
+ head_docs = self._soledad.get_from_index(
+ fields.TYPE_C_HASH_IDX,
+ fields.TYPE_HEADERS_VAL, str(chash))
+ return first(head_docs)
# deleted messages
def deleted_iter(self, mbox):
"""
- Get an iterator for the SoledadDocuments for messages
+ Get an iterator for the the doc_id for SoledadDocuments for messages
with \\Deleted flag for a given mailbox.
:param mbox: the mailbox
@@ -467,11 +582,10 @@ class SoledadStore(ContentDedup):
:return: iterator through deleted message docs
:rtype: iterable
"""
- return (doc for doc in self._soledad.get_from_index(
+ return [doc.doc_id for doc in self._soledad.get_from_index(
fields.TYPE_MBOX_DEL_IDX,
- fields.TYPE_FLAGS_VAL, mbox, '1'))
+ fields.TYPE_FLAGS_VAL, mbox, '1')]
- # TODO can deferToThread this?
def remove_all_deleted(self, mbox):
"""
Remove from Soledad all messages flagged as deleted for a given
@@ -481,7 +595,14 @@ class SoledadStore(ContentDedup):
:type mbox: str or unicode
"""
deleted = []
- for doc in self.deleted_iter(mbox):
- deleted.append(doc.content[fields.UID_KEY])
- self._soledad.delete_doc(doc)
+ for doc_id in self.deleted_iter(mbox):
+ with self._remove_lock:
+ doc = self._soledad.get_doc(doc_id)
+ if doc is not None:
+ self._soledad.delete_doc(doc)
+ try:
+ deleted.append(doc.content[fields.UID_KEY])
+ except TypeError:
+ # empty content
+ pass
return deleted
diff --git a/src/leap/mail/imap/tests/regressions b/src/leap/mail/imap/tests/regressions
index 0a43398..efe3f46 100755
--- a/src/leap/mail/imap/tests/regressions
+++ b/src/leap/mail/imap/tests/regressions
@@ -101,7 +101,6 @@ def compare_msg_parts(a, b):
pprint(b[index])
print
-
return all_match
@@ -328,7 +327,7 @@ def cbAppendNextMessage(proto):
return proto.append(
REGRESSIONS_FOLDER, msg
).addCallback(
- lambda r: proto.examine(REGRESSIONS_FOLDER)
+ lambda r: proto.select(REGRESSIONS_FOLDER)
).addCallback(
cbAppend, proto, raw
).addErrback(
@@ -379,6 +378,9 @@ def cbCompareMessage(result, proto, raw):
if result:
keys = result.keys()
keys.sort()
+ else:
+ print "[-] GOT NO RESULT"
+ return proto.logout()
latest = max(keys)
diff --git a/src/leap/mail/imap/tests/test_imap.py b/src/leap/mail/imap/tests/test_imap.py
index 8c1cf20..fd88440 100644
--- a/src/leap/mail/imap/tests/test_imap.py
+++ b/src/leap/mail/imap/tests/test_imap.py
@@ -43,6 +43,7 @@ from itertools import chain
from mock import Mock
from nose.twistedtools import deferred, stop_reactor
+from unittest import skip
from twisted.mail import imap4
@@ -64,11 +65,16 @@ import twisted.cred.portal
from leap.common.testing.basetest import BaseLeapTest
from leap.mail.imap.account import SoledadBackedAccount
from leap.mail.imap.mailbox import SoledadMailbox
+from leap.mail.imap.memorystore import MemoryStore
from leap.mail.imap.messages import MessageCollection
+from leap.mail.imap.server import LeapIMAPServer
from leap.soledad.client import Soledad
from leap.soledad.client import SoledadCrypto
+TEST_USER = "testuser@leap.se"
+TEST_PASSWD = "1234"
+
def strip(f):
return lambda result, f=f: f()
@@ -89,10 +95,10 @@ def initialize_soledad(email, gnupg_home, tempdir):
"""
Initializes soledad by hand
- @param email: ID for the user
- @param gnupg_home: path to home used by gnupg
- @param tempdir: path to temporal dir
- @rtype: Soledad instance
+ :param email: ID for the user
+ :param gnupg_home: path to home used by gnupg
+ :param tempdir: path to temporal dir
+ :rtype: Soledad instance
"""
uuid = "foobar-uuid"
@@ -125,55 +131,6 @@ def initialize_soledad(email, gnupg_home, tempdir):
return _soledad
-#
-# Simple LEAP IMAP4 Server for testing
-#
-
-class SimpleLEAPServer(imap4.IMAP4Server):
-
- """
- A Simple IMAP4 Server with mailboxes backed by Soledad.
-
- This should be pretty close to the real LeapIMAP4Server that we
- will be instantiating as a service, minus the authentication bits.
- """
-
- def __init__(self, *args, **kw):
-
- soledad = kw.pop('soledad', None)
-
- imap4.IMAP4Server.__init__(self, *args, **kw)
- realm = TestRealm()
-
- # XXX Why I AM PASSING THE ACCOUNT TO
- # REALM? I AM NOT USING THAT NOW, AM I???
- realm.theAccount = SoledadBackedAccount(
- 'testuser',
- soledad=soledad)
-
- portal = cred.portal.Portal(realm)
- c = cred.checkers.InMemoryUsernamePasswordDatabaseDontUse()
- self.checker = c
- self.portal = portal
- portal.registerChecker(c)
- self.timeoutTest = False
-
- def lineReceived(self, line):
- if self.timeoutTest:
- # Do not send a respones
- return
-
- imap4.IMAP4Server.lineReceived(self, line)
-
- _username = 'testuser'
- _password = 'password-test'
-
- def authenticateLogin(self, username, password):
- if username == self._username and password == self._password:
- return imap4.IAccount, self.theAccount, lambda: None
- raise cred.error.UnauthorizedLogin()
-
-
class TestRealm:
"""
@@ -255,13 +212,6 @@ class IMAP4HelperMixin(BaseLeapTest):
# Soledad: config info
cls.gnupg_home = "%s/gnupg" % cls.tempdir
cls.email = 'leap@leap.se'
- # cls.db1_file = "%s/db1.u1db" % cls.tempdir
- # cls.db2_file = "%s/db2.u1db" % cls.tempdir
- # open test dbs
- # cls._db1 = u1db.open(cls.db1_file, create=True,
- # document_factory=SoledadDocument)
- # cls._db2 = u1db.open(cls.db2_file, create=True,
- # document_factory=SoledadDocument)
# initialize soledad by hand so we can control keys
cls._soledad = initialize_soledad(
@@ -283,8 +233,6 @@ class IMAP4HelperMixin(BaseLeapTest):
Restores the old path and home environment variables.
Removes the temporal dir created for tests.
"""
- # cls._db1.close()
- # cls._db2.close()
cls._soledad.close()
os.environ["PATH"] = cls.old_path
@@ -301,8 +249,13 @@ class IMAP4HelperMixin(BaseLeapTest):
but passing the same Soledad instance (it's costly to initialize),
so we have to be sure to restore state across tests.
"""
+ UUID = 'deadbeef',
+ USERID = TEST_USER
+ memstore = MemoryStore()
+
d = defer.Deferred()
- self.server = SimpleLEAPServer(
+ self.server = LeapIMAPServer(
+ uuid=UUID, userid=USERID,
contextFactory=self.serverCTX,
# XXX do we really need this??
soledad=self._soledad)
@@ -317,9 +270,10 @@ class IMAP4HelperMixin(BaseLeapTest):
# I THINK we ONLY need to do it at one place now.
theAccount = SoledadBackedAccount(
- 'testuser',
- soledad=self._soledad)
- SimpleLEAPServer.theAccount = theAccount
+ USERID,
+ soledad=self._soledad,
+ memstore=memstore)
+ LeapIMAPServer.theAccount = theAccount
# in case we get something from previous tests...
for mb in self.server.theAccount.mailboxes:
@@ -404,8 +358,9 @@ class MessageCollectionTestCase(IMAP4HelperMixin, unittest.TestCase):
We override mixin method since we are only testing
MessageCollection interface in this particular TestCase
"""
+ memstore = MemoryStore()
self.messages = MessageCollection("testmbox%s" % (self.count,),
- self._soledad)
+ self._soledad, memstore=memstore)
MessageCollectionTestCase.count += 1
def tearDown(self):
@@ -414,9 +369,6 @@ class MessageCollectionTestCase(IMAP4HelperMixin, unittest.TestCase):
"""
del self.messages
- def wait(self):
- time.sleep(2)
-
def testEmptyMessage(self):
"""
Test empty message and collection
@@ -425,11 +377,11 @@ class MessageCollectionTestCase(IMAP4HelperMixin, unittest.TestCase):
self.assertEqual(
em,
{
+ "chash": '',
+ "deleted": False,
"flags": [],
"mbox": "inbox",
- "recent": True,
"seen": False,
- "deleted": False,
"multi": False,
"size": 0,
"type": "flags",
@@ -441,79 +393,100 @@ class MessageCollectionTestCase(IMAP4HelperMixin, unittest.TestCase):
"""
Add multiple messages
"""
- # TODO really profile addition
mc = self.messages
- print "messages", self.messages
self.assertEqual(self.messages.count(), 0)
- mc.add_msg('Stuff', uid=1, subject="test1")
- mc.add_msg('Stuff', uid=2, subject="test2")
- mc.add_msg('Stuff', uid=3, subject="test3")
- mc.add_msg('Stuff', uid=4, subject="test4")
- self.wait()
- self.assertEqual(self.messages.count(), 4)
- mc.add_msg('Stuff', uid=5, subject="test5")
- mc.add_msg('Stuff', uid=6, subject="test6")
- mc.add_msg('Stuff', uid=7, subject="test7")
- self.wait()
- self.assertEqual(self.messages.count(), 7)
- self.wait()
+ def add_first():
+ d = defer.gatherResults([
+ mc.add_msg('Stuff 1', uid=1, subject="test1"),
+ mc.add_msg('Stuff 2', uid=2, subject="test2"),
+ mc.add_msg('Stuff 3', uid=3, subject="test3"),
+ mc.add_msg('Stuff 4', uid=4, subject="test4")])
+ return d
+
+ def add_second(result):
+ d = defer.gatherResults([
+ mc.add_msg('Stuff 5', uid=5, subject="test5"),
+ mc.add_msg('Stuff 6', uid=6, subject="test6"),
+ mc.add_msg('Stuff 7', uid=7, subject="test7")])
+ return d
+
+ def check_second(result):
+ return self.assertEqual(mc.count(), 7)
+
+ d1 = add_first()
+ d1.addCallback(add_second)
+ d1.addCallback(check_second)
+
+ @skip("needs update!")
def testRecentCount(self):
"""
Test the recent count
"""
mc = self.messages
- self.assertEqual(self.messages.count_recent(), 0)
- mc.add_msg('Stuff', uid=1, subject="test1")
+ countrecent = mc.count_recent
+ eq = self.assertEqual
+
+ self.assertEqual(countrecent(), 0)
+
+ d = mc.add_msg('Stuff', uid=1, subject="test1")
# For the semantics defined in the RFC, we auto-add the
# recent flag by default.
- self.wait()
- self.assertEqual(self.messages.count_recent(), 1)
- mc.add_msg('Stuff', subject="test2", uid=2,
- flags=('\\Deleted',))
- self.wait()
- self.assertEqual(self.messages.count_recent(), 2)
- mc.add_msg('Stuff', subject="test3", uid=3,
- flags=('\\Recent',))
- self.wait()
- self.assertEqual(self.messages.count_recent(), 3)
- mc.add_msg('Stuff', subject="test4", uid=4,
- flags=('\\Deleted', '\\Recent'))
- self.wait()
- self.assertEqual(self.messages.count_recent(), 4)
-
- for msg in mc:
- msg.removeFlags(('\\Recent',))
- self.assertEqual(mc.count_recent(), 0)
+
+ def add2(_):
+ return mc.add_msg('Stuff', subject="test2", uid=2,
+ flags=('\\Deleted',))
+
+ def add3(_):
+ return mc.add_msg('Stuff', subject="test3", uid=3,
+ flags=('\\Recent',))
+
+ def add4(_):
+ return mc.add_msg('Stuff', subject="test4", uid=4,
+ flags=('\\Deleted', '\\Recent'))
+
+ d.addCallback(lambda r: eq(countrecent(), 1))
+ d.addCallback(add2)
+ d.addCallback(lambda r: eq(countrecent(), 2))
+ d.addCallback(add3)
+ d.addCallback(lambda r: eq(countrecent(), 3))
+ d.addCallback(add4)
+ d.addCallback(lambda r: eq(countrecent(), 4))
def testFilterByMailbox(self):
"""
Test that queries filter by selected mailbox
"""
- def wait():
- time.sleep(1)
-
mc = self.messages
self.assertEqual(self.messages.count(), 0)
- mc.add_msg('', uid=1, subject="test1")
- mc.add_msg('', uid=2, subject="test2")
- mc.add_msg('', uid=3, subject="test3")
- wait()
- self.assertEqual(self.messages.count(), 3)
- newmsg = mc._get_empty_doc()
- newmsg['mailbox'] = "mailbox/foo"
- mc._soledad.create_doc(newmsg)
- self.assertEqual(mc.count(), 3)
- self.assertEqual(
- len(mc._soledad.get_from_index(mc.TYPE_IDX, "flags")), 4)
+
+ def add_1():
+ d1 = mc.add_msg('msg 1', uid=1, subject="test1")
+ d2 = mc.add_msg('msg 2', uid=2, subject="test2")
+ d3 = mc.add_msg('msg 3', uid=3, subject="test3")
+ d = defer.gatherResults([d1, d2, d3])
+ return d
+
+ add_1().addCallback(lambda ignored: self.assertEqual(
+ mc.count(), 3))
+
+ # XXX this has to be redone to fit memstore ------------#
+ #newmsg = mc._get_empty_doc()
+ #newmsg['mailbox'] = "mailbox/foo"
+ #mc._soledad.create_doc(newmsg)
+ #self.assertEqual(mc.count(), 3)
+ #self.assertEqual(
+ #len(mc._soledad.get_from_index(mc.TYPE_IDX, "flags")), 4)
class LeapIMAP4ServerTestCase(IMAP4HelperMixin, unittest.TestCase):
+ # TODO this currently will use a memory-only store.
+ # create a different one for testing soledad sync.
"""
Tests for the generic behavior of the LeapIMAP4Server
which, right now, it's just implemented in this test file as
- SimpleLEAPServer. We will move the implementation, together with
+ LeapIMAPServer. We will move the implementation, together with
authentication bits, to leap.mail.imap.server so it can be instantiated
from the tac file.
@@ -542,7 +515,7 @@ class LeapIMAP4ServerTestCase(IMAP4HelperMixin, unittest.TestCase):
self.result.append(0)
def login():
- return self.client.login('testuser', 'password-test')
+ return self.client.login(TEST_USER, TEST_PASSWD)
def create():
for name in succeed + fail:
@@ -560,7 +533,7 @@ class LeapIMAP4ServerTestCase(IMAP4HelperMixin, unittest.TestCase):
def _cbTestCreate(self, ignored, succeed, fail):
self.assertEqual(self.result, [1] * len(succeed) + [0] * len(fail))
- mbox = SimpleLEAPServer.theAccount.mailboxes
+ mbox = LeapIMAPServer.theAccount.mailboxes
answers = ['foobox', 'testbox', 'test/box', 'test', 'test/box/box']
mbox.sort()
answers.sort()
@@ -571,10 +544,10 @@ class LeapIMAP4ServerTestCase(IMAP4HelperMixin, unittest.TestCase):
"""
Test whether we can delete mailboxes
"""
- SimpleLEAPServer.theAccount.addMailbox('delete/me')
+ LeapIMAPServer.theAccount.addMailbox('delete/me')
def login():
- return self.client.login('testuser', 'password-test')
+ return self.client.login(TEST_USER, TEST_PASSWD)
def delete():
return self.client.delete('delete/me')
@@ -586,7 +559,7 @@ class LeapIMAP4ServerTestCase(IMAP4HelperMixin, unittest.TestCase):
d = defer.gatherResults([d1, d2])
d.addCallback(
lambda _: self.assertEqual(
- SimpleLEAPServer.theAccount.mailboxes, []))
+ LeapIMAPServer.theAccount.mailboxes, []))
return d
def testIllegalInboxDelete(self):
@@ -597,7 +570,7 @@ class LeapIMAP4ServerTestCase(IMAP4HelperMixin, unittest.TestCase):
self.stashed = None
def login():
- return self.client.login('testuser', 'password-test')
+ return self.client.login(TEST_USER, TEST_PASSWD)
def delete():
return self.client.delete('inbox')
@@ -619,10 +592,10 @@ class LeapIMAP4ServerTestCase(IMAP4HelperMixin, unittest.TestCase):
def testNonExistentDelete(self):
"""
Test what happens if we try to delete a non-existent mailbox.
- We expect an error raised stating 'No such inbox'
+ We expect an error raised stating 'No such mailbox'
"""
def login():
- return self.client.login('testuser', 'password-test')
+ return self.client.login(TEST_USER, TEST_PASSWD)
def delete():
return self.client.delete('delete/me')
@@ -637,8 +610,8 @@ class LeapIMAP4ServerTestCase(IMAP4HelperMixin, unittest.TestCase):
d1.addCallbacks(self._cbStopClient, self._ebGeneral)
d2 = self.loopback()
d = defer.gatherResults([d1, d2])
- d.addCallback(lambda _: self.assertEqual(str(self.failure.value),
- 'No such mailbox'))
+ d.addCallback(lambda _: self.assertTrue(
+ str(self.failure.value).startswith('No such mailbox')))
return d
@deferred(timeout=None)
@@ -649,14 +622,14 @@ class LeapIMAP4ServerTestCase(IMAP4HelperMixin, unittest.TestCase):
Obs: this test will fail if SoledadMailbox returns hardcoded flags.
"""
- SimpleLEAPServer.theAccount.addMailbox('delete')
- to_delete = SimpleLEAPServer.theAccount.getMailbox('delete')
+ LeapIMAPServer.theAccount.addMailbox('delete')
+ to_delete = LeapIMAPServer.theAccount.getMailbox('delete')
to_delete.setFlags((r'\Noselect',))
to_delete.getFlags()
- SimpleLEAPServer.theAccount.addMailbox('delete/me')
+ LeapIMAPServer.theAccount.addMailbox('delete/me')
def login():
- return self.client.login('testuser', 'password-test')
+ return self.client.login(TEST_USER, TEST_PASSWD)
def delete():
return self.client.delete('delete')
@@ -681,10 +654,10 @@ class LeapIMAP4ServerTestCase(IMAP4HelperMixin, unittest.TestCase):
"""
Test whether we can rename a mailbox
"""
- SimpleLEAPServer.theAccount.addMailbox('oldmbox')
+ LeapIMAPServer.theAccount.addMailbox('oldmbox')
def login():
- return self.client.login('testuser', 'password-test')
+ return self.client.login(TEST_USER, TEST_PASSWD)
def rename():
return self.client.rename('oldmbox', 'newname')
@@ -696,7 +669,7 @@ class LeapIMAP4ServerTestCase(IMAP4HelperMixin, unittest.TestCase):
d = defer.gatherResults([d1, d2])
d.addCallback(lambda _:
self.assertEqual(
- SimpleLEAPServer.theAccount.mailboxes,
+ LeapIMAPServer.theAccount.mailboxes,
['newname']))
return d
@@ -709,7 +682,7 @@ class LeapIMAP4ServerTestCase(IMAP4HelperMixin, unittest.TestCase):
self.stashed = None
def login():
- return self.client.login('testuser', 'password-test')
+ return self.client.login(TEST_USER, TEST_PASSWD)
def rename():
return self.client.rename('inbox', 'frotz')
@@ -733,11 +706,11 @@ class LeapIMAP4ServerTestCase(IMAP4HelperMixin, unittest.TestCase):
"""
Try to rename hierarchical mailboxes
"""
- SimpleLEAPServer.theAccount.create('oldmbox/m1')
- SimpleLEAPServer.theAccount.create('oldmbox/m2')
+ LeapIMAPServer.theAccount.create('oldmbox/m1')
+ LeapIMAPServer.theAccount.create('oldmbox/m2')
def login():
- return self.client.login('testuser', 'password-test')
+ return self.client.login(TEST_USER, TEST_PASSWD)
def rename():
return self.client.rename('oldmbox', 'newname')
@@ -750,7 +723,7 @@ class LeapIMAP4ServerTestCase(IMAP4HelperMixin, unittest.TestCase):
return d.addCallback(self._cbTestHierarchicalRename)
def _cbTestHierarchicalRename(self, ignored):
- mboxes = SimpleLEAPServer.theAccount.mailboxes
+ mboxes = LeapIMAPServer.theAccount.mailboxes
expected = ['newname', 'newname/m1', 'newname/m2']
mboxes.sort()
self.assertEqual(mboxes, [s for s in expected])
@@ -761,7 +734,7 @@ class LeapIMAP4ServerTestCase(IMAP4HelperMixin, unittest.TestCase):
Test whether we can mark a mailbox as subscribed to
"""
def login():
- return self.client.login('testuser', 'password-test')
+ return self.client.login(TEST_USER, TEST_PASSWD)
def subscribe():
return self.client.subscribe('this/mbox')
@@ -773,7 +746,7 @@ class LeapIMAP4ServerTestCase(IMAP4HelperMixin, unittest.TestCase):
d = defer.gatherResults([d1, d2])
d.addCallback(lambda _:
self.assertEqual(
- SimpleLEAPServer.theAccount.subscriptions,
+ LeapIMAPServer.theAccount.subscriptions,
['this/mbox']))
return d
@@ -782,11 +755,11 @@ class LeapIMAP4ServerTestCase(IMAP4HelperMixin, unittest.TestCase):
"""
Test whether we can unsubscribe from a set of mailboxes
"""
- SimpleLEAPServer.theAccount.subscribe('this/mbox')
- SimpleLEAPServer.theAccount.subscribe('that/mbox')
+ LeapIMAPServer.theAccount.subscribe('this/mbox')
+ LeapIMAPServer.theAccount.subscribe('that/mbox')
def login():
- return self.client.login('testuser', 'password-test')
+ return self.client.login(TEST_USER, TEST_PASSWD)
def unsubscribe():
return self.client.unsubscribe('this/mbox')
@@ -798,7 +771,7 @@ class LeapIMAP4ServerTestCase(IMAP4HelperMixin, unittest.TestCase):
d = defer.gatherResults([d1, d2])
d.addCallback(lambda _:
self.assertEqual(
- SimpleLEAPServer.theAccount.subscriptions,
+ LeapIMAPServer.theAccount.subscriptions,
['that/mbox']))
return d
@@ -811,7 +784,7 @@ class LeapIMAP4ServerTestCase(IMAP4HelperMixin, unittest.TestCase):
self.selectedArgs = None
def login():
- return self.client.login('testuser', 'password-test')
+ return self.client.login(TEST_USER, TEST_PASSWD)
def select():
def selected(args):
@@ -829,7 +802,7 @@ class LeapIMAP4ServerTestCase(IMAP4HelperMixin, unittest.TestCase):
return defer.gatherResults([d1, d2]).addCallback(self._cbTestSelect)
def _cbTestSelect(self, ignored):
- mbox = SimpleLEAPServer.theAccount.getMailbox('TESTMAILBOX-SELECT')
+ mbox = LeapIMAPServer.theAccount.getMailbox('TESTMAILBOX-SELECT')
self.assertEqual(self.server.mbox.messages.mbox, mbox.messages.mbox)
self.assertEqual(self.selectedArgs, {
'EXISTS': 0, 'RECENT': 0, 'UIDVALIDITY': 42,
@@ -920,7 +893,7 @@ class LeapIMAP4ServerTestCase(IMAP4HelperMixin, unittest.TestCase):
Test login
"""
def login():
- d = self.client.login('testuser', 'password-test')
+ d = self.client.login(TEST_USER, TEST_PASSWD)
d.addCallback(self._cbStopClient)
d1 = self.connected.addCallback(
strip(login)).addErrback(self._ebGeneral)
@@ -928,7 +901,7 @@ class LeapIMAP4ServerTestCase(IMAP4HelperMixin, unittest.TestCase):
return d.addCallback(self._cbTestLogin)
def _cbTestLogin(self, ignored):
- self.assertEqual(self.server.account, SimpleLEAPServer.theAccount)
+ self.assertEqual(self.server.account, LeapIMAPServer.theAccount)
self.assertEqual(self.server.state, 'auth')
@deferred(timeout=None)
@@ -937,7 +910,7 @@ class LeapIMAP4ServerTestCase(IMAP4HelperMixin, unittest.TestCase):
Test bad login
"""
def login():
- d = self.client.login('testuser', 'wrong-password')
+ d = self.client.login("bad_user@leap.se", TEST_PASSWD)
d.addBoth(self._cbStopClient)
d1 = self.connected.addCallback(
@@ -947,19 +920,19 @@ class LeapIMAP4ServerTestCase(IMAP4HelperMixin, unittest.TestCase):
return d.addCallback(self._cbTestFailedLogin)
def _cbTestFailedLogin(self, ignored):
- self.assertEqual(self.server.account, None)
self.assertEqual(self.server.state, 'unauth')
+ self.assertEqual(self.server.account, None)
@deferred(timeout=None)
def testLoginRequiringQuoting(self):
"""
Test login requiring quoting
"""
- self.server._username = '{test}user'
+ self.server._userid = '{test}user@leap.se'
self.server._password = '{test}password'
def login():
- d = self.client.login('{test}user', '{test}password')
+ d = self.client.login('{test}user@leap.se', '{test}password')
d.addBoth(self._cbStopClient)
d1 = self.connected.addCallback(
@@ -968,7 +941,7 @@ class LeapIMAP4ServerTestCase(IMAP4HelperMixin, unittest.TestCase):
return d.addCallback(self._cbTestLoginRequiringQuoting)
def _cbTestLoginRequiringQuoting(self, ignored):
- self.assertEqual(self.server.account, SimpleLEAPServer.theAccount)
+ self.assertEqual(self.server.account, LeapIMAPServer.theAccount)
self.assertEqual(self.server.state, 'auth')
#
@@ -983,7 +956,7 @@ class LeapIMAP4ServerTestCase(IMAP4HelperMixin, unittest.TestCase):
self.namespaceArgs = None
def login():
- return self.client.login('testuser', 'password-test')
+ return self.client.login(TEST_USER, TEST_PASSWD)
def namespace():
def gotNamespace(args):
@@ -1022,7 +995,7 @@ class LeapIMAP4ServerTestCase(IMAP4HelperMixin, unittest.TestCase):
self.examinedArgs = None
def login():
- return self.client.login('testuser', 'password-test')
+ return self.client.login(TEST_USER, TEST_PASSWD)
def examine():
def examined(args):
@@ -1049,15 +1022,15 @@ class LeapIMAP4ServerTestCase(IMAP4HelperMixin, unittest.TestCase):
'READ-WRITE': False})
def _listSetup(self, f):
- SimpleLEAPServer.theAccount.addMailbox('root/subthingl',
- creation_ts=42)
- SimpleLEAPServer.theAccount.addMailbox('root/another-thing',
- creation_ts=42)
- SimpleLEAPServer.theAccount.addMailbox('non-root/subthing',
- creation_ts=42)
+ LeapIMAPServer.theAccount.addMailbox('root/subthingl',
+ creation_ts=42)
+ LeapIMAPServer.theAccount.addMailbox('root/another-thing',
+ creation_ts=42)
+ LeapIMAPServer.theAccount.addMailbox('non-root/subthing',
+ creation_ts=42)
def login():
- return self.client.login('testuser', 'password-test')
+ return self.client.login(TEST_USER, TEST_PASSWD)
def listed(answers):
self.listed = answers
@@ -1092,7 +1065,7 @@ class LeapIMAP4ServerTestCase(IMAP4HelperMixin, unittest.TestCase):
"""
Test LSub command
"""
- SimpleLEAPServer.theAccount.subscribe('root/subthingl2')
+ LeapIMAPServer.theAccount.subscribe('root/subthingl2')
def lsub():
return self.client.lsub('root', '%')
@@ -1106,12 +1079,12 @@ class LeapIMAP4ServerTestCase(IMAP4HelperMixin, unittest.TestCase):
"""
Test Status command
"""
- SimpleLEAPServer.theAccount.addMailbox('root/subthings')
+ LeapIMAPServer.theAccount.addMailbox('root/subthings')
# XXX FIXME ---- should populate this a little bit,
# with unseen etc...
def login():
- return self.client.login('testuser', 'password-test')
+ return self.client.login(TEST_USER, TEST_PASSWD)
def status():
return self.client.status(
@@ -1139,7 +1112,7 @@ class LeapIMAP4ServerTestCase(IMAP4HelperMixin, unittest.TestCase):
Test failed status command with a non-existent mailbox
"""
def login():
- return self.client.login('testuser', 'password-test')
+ return self.client.login(TEST_USER, TEST_PASSWD)
def status():
return self.client.status(
@@ -1180,13 +1153,10 @@ class LeapIMAP4ServerTestCase(IMAP4HelperMixin, unittest.TestCase):
"""
infile = util.sibpath(__file__, 'rfc822.message')
message = open(infile)
- SimpleLEAPServer.theAccount.addMailbox('root/subthing')
+ LeapIMAPServer.theAccount.addMailbox('root/subthing')
def login():
- return self.client.login('testuser', 'password-test')
-
- def wait():
- time.sleep(0.5)
+ return self.client.login(TEST_USER, TEST_PASSWD)
def append():
return self.client.append(
@@ -1198,21 +1168,19 @@ class LeapIMAP4ServerTestCase(IMAP4HelperMixin, unittest.TestCase):
d1 = self.connected.addCallback(strip(login))
d1.addCallbacks(strip(append), self._ebGeneral)
- d1.addCallbacks(strip(wait), self._ebGeneral)
d1.addCallbacks(self._cbStopClient, self._ebGeneral)
d2 = self.loopback()
d = defer.gatherResults([d1, d2])
return d.addCallback(self._cbTestFullAppend, infile)
def _cbTestFullAppend(self, ignored, infile):
- mb = SimpleLEAPServer.theAccount.getMailbox('root/subthing')
- time.sleep(0.5)
+ mb = LeapIMAPServer.theAccount.getMailbox('root/subthing')
self.assertEqual(1, len(mb.messages))
msg = mb.messages.get_msg_by_uid(1)
self.assertEqual(
- ('\\SEEN', '\\DELETED'),
- msg.getFlags())
+ set(('\\Recent', '\\SEEN', '\\DELETED')),
+ set(msg.getFlags()))
self.assertEqual(
'Tue, 17 Jun 2003 11:22:16 -0600 (MDT)',
@@ -1220,14 +1188,11 @@ class LeapIMAP4ServerTestCase(IMAP4HelperMixin, unittest.TestCase):
parsed = self.parser.parse(open(infile))
body = parsed.get_payload()
- headers = parsed.items()
+ headers = dict(parsed.items())
self.assertEqual(
body,
msg.getBodyFile().read())
-
- msg_headers = msg.getHeaders(True, "",)
- gotheaders = list(chain(
- *[[(k, item) for item in v] for (k, v) in msg_headers.items()]))
+ gotheaders = msg.getHeaders(True)
self.assertItemsEqual(
headers, gotheaders)
@@ -1238,13 +1203,10 @@ class LeapIMAP4ServerTestCase(IMAP4HelperMixin, unittest.TestCase):
Test partially appending a message to the mailbox
"""
infile = util.sibpath(__file__, 'rfc822.message')
- SimpleLEAPServer.theAccount.addMailbox('PARTIAL/SUBTHING')
+ LeapIMAPServer.theAccount.addMailbox('PARTIAL/SUBTHING')
def login():
- return self.client.login('testuser', 'password-test')
-
- def wait():
- time.sleep(1)
+ return self.client.login(TEST_USER, TEST_PASSWD)
def append():
message = file(infile)
@@ -1257,7 +1219,6 @@ class LeapIMAP4ServerTestCase(IMAP4HelperMixin, unittest.TestCase):
)
)
d1 = self.connected.addCallback(strip(login))
- d1.addCallbacks(strip(wait), self._ebGeneral)
d1.addCallbacks(strip(append), self._ebGeneral)
d1.addCallbacks(self._cbStopClient, self._ebGeneral)
d2 = self.loopback()
@@ -1266,16 +1227,13 @@ class LeapIMAP4ServerTestCase(IMAP4HelperMixin, unittest.TestCase):
self._cbTestPartialAppend, infile)
def _cbTestPartialAppend(self, ignored, infile):
- mb = SimpleLEAPServer.theAccount.getMailbox('PARTIAL/SUBTHING')
- time.sleep(1)
+ mb = LeapIMAPServer.theAccount.getMailbox('PARTIAL/SUBTHING')
self.assertEqual(1, len(mb.messages))
msg = mb.messages.get_msg_by_uid(1)
self.assertEqual(
- ('\\SEEN', ),
- msg.getFlags()
+ set(('\\SEEN', '\\Recent')),
+ set(msg.getFlags())
)
- #self.assertEqual(
- #'Right now', msg.getInternalDate())
parsed = self.parser.parse(open(infile))
body = parsed.get_payload()
self.assertEqual(
@@ -1287,10 +1245,10 @@ class LeapIMAP4ServerTestCase(IMAP4HelperMixin, unittest.TestCase):
"""
Test check command
"""
- SimpleLEAPServer.theAccount.addMailbox('root/subthing')
+ LeapIMAPServer.theAccount.addMailbox('root/subthing')
def login():
- return self.client.login('testuser', 'password-test')
+ return self.client.login(TEST_USER, TEST_PASSWD)
def select():
return self.client.select('root/subthing')
@@ -1306,7 +1264,7 @@ class LeapIMAP4ServerTestCase(IMAP4HelperMixin, unittest.TestCase):
# Okay, that was fun
- @deferred(timeout=None)
+ @deferred(timeout=5)
def testClose(self):
"""
Test closing the mailbox. We expect to get deleted all messages flagged
@@ -1315,29 +1273,33 @@ class LeapIMAP4ServerTestCase(IMAP4HelperMixin, unittest.TestCase):
name = 'mailbox-close'
self.server.theAccount.addMailbox(name)
- m = SimpleLEAPServer.theAccount.getMailbox(name)
- m.messages.add_msg('test 1', uid=1, subject="Message 1",
- flags=('\\Deleted', 'AnotherFlag'))
- m.messages.add_msg('test 2', uid=2, subject="Message 2",
- flags=('AnotherFlag',))
- m.messages.add_msg('test 3', uid=3, subject="Message 3",
- flags=('\\Deleted',))
+ m = LeapIMAPServer.theAccount.getMailbox(name)
def login():
- return self.client.login('testuser', 'password-test')
-
- def wait():
- time.sleep(1)
+ return self.client.login(TEST_USER, TEST_PASSWD)
def select():
return self.client.select(name)
+ def add_messages():
+ d1 = m.messages.add_msg(
+ 'test 1', uid=1, subject="Message 1",
+ flags=('\\Deleted', 'AnotherFlag'))
+ d2 = m.messages.add_msg(
+ 'test 2', uid=2, subject="Message 2",
+ flags=('AnotherFlag',))
+ d3 = m.messages.add_msg(
+ 'test 3', uid=3, subject="Message 3",
+ flags=('\\Deleted',))
+ d = defer.gatherResults([d1, d2, d3])
+ return d
+
def close():
return self.client.close()
d = self.connected.addCallback(strip(login))
- d.addCallbacks(strip(wait), self._ebGeneral)
d.addCallbacks(strip(select), self._ebGeneral)
+ d.addCallbacks(strip(add_messages), self._ebGeneral)
d.addCallbacks(strip(close), self._ebGeneral)
d.addCallbacks(self._cbStopClient, self._ebGeneral)
d2 = self.loopback()
@@ -1345,37 +1307,42 @@ class LeapIMAP4ServerTestCase(IMAP4HelperMixin, unittest.TestCase):
def _cbTestClose(self, ignored, m):
self.assertEqual(len(m.messages), 1)
- messages = [msg for msg in m.messages]
- self.assertFalse(messages[0] is None)
+
+ msg = m.messages.get_msg_by_uid(2)
+ self.assertFalse(msg is None)
self.assertEqual(
- messages[0]._hdoc.content['subject'],
+ msg._hdoc.content['subject'],
'Message 2')
self.failUnless(m.closed)
- @deferred(timeout=None)
+ @deferred(timeout=5)
def testExpunge(self):
"""
Test expunge command
"""
name = 'mailbox-expunge'
- SimpleLEAPServer.theAccount.addMailbox(name)
- m = SimpleLEAPServer.theAccount.getMailbox(name)
- m.messages.add_msg('test 1', uid=1, subject="Message 1",
- flags=('\\Deleted', 'AnotherFlag'))
- m.messages.add_msg('test 2', uid=2, subject="Message 2",
- flags=('AnotherFlag',))
- m.messages.add_msg('test 3', uid=3, subject="Message 3",
- flags=('\\Deleted',))
+ self.server.theAccount.addMailbox(name)
+ m = LeapIMAPServer.theAccount.getMailbox(name)
def login():
- return self.client.login('testuser', 'password-test')
-
- def wait():
- time.sleep(2)
+ return self.client.login(TEST_USER, TEST_PASSWD)
def select():
return self.client.select('mailbox-expunge')
+ def add_messages():
+ d1 = m.messages.add_msg(
+ 'test 1', uid=1, subject="Message 1",
+ flags=('\\Deleted', 'AnotherFlag'))
+ d2 = m.messages.add_msg(
+ 'test 2', uid=2, subject="Message 2",
+ flags=('AnotherFlag',))
+ d3 = m.messages.add_msg(
+ 'test 3', uid=3, subject="Message 3",
+ flags=('\\Deleted',))
+ d = defer.gatherResults([d1, d2, d3])
+ return d
+
def expunge():
return self.client.expunge()
@@ -1385,8 +1352,8 @@ class LeapIMAP4ServerTestCase(IMAP4HelperMixin, unittest.TestCase):
self.results = None
d1 = self.connected.addCallback(strip(login))
- d1.addCallbacks(strip(wait), self._ebGeneral)
d1.addCallbacks(strip(select), self._ebGeneral)
+ d1.addCallbacks(strip(add_messages), self._ebGeneral)
d1.addCallbacks(strip(expunge), self._ebGeneral)
d1.addCallbacks(expunged, self._ebGeneral)
d1.addCallbacks(self._cbStopClient, self._ebGeneral)
@@ -1397,9 +1364,10 @@ class LeapIMAP4ServerTestCase(IMAP4HelperMixin, unittest.TestCase):
def _cbTestExpunge(self, ignored, m):
# we only left 1 mssage with no deleted flag
self.assertEqual(len(m.messages), 1)
- messages = [msg for msg in m.messages]
+
+ msg = m.messages.get_msg_by_uid(2)
self.assertEqual(
- messages[0]._hdoc.content['subject'],
+ msg._hdoc.content['subject'],
'Message 2')
# the uids of the deleted messages
self.assertItemsEqual(self.results, [1, 3])
diff --git a/src/leap/mail/messageflow.py b/src/leap/mail/messageflow.py
index 80121c8..c8f224c 100644
--- a/src/leap/mail/messageflow.py
+++ b/src/leap/mail/messageflow.py
@@ -49,7 +49,7 @@ class IMessageProducer(Interface):
entities.
"""
- def push(self, item):
+ def push(self, item, state=None):
"""
Push a new item in the queue.
"""
@@ -101,6 +101,10 @@ class MessageProducer(object):
# and consumption is not likely (?) to consume huge amounts of memory in
# our current settings, so the need to pause the stream is not urgent now.
+ # TODO use enum
+ STATE_NEW = 1
+ STATE_DIRTY = 2
+
def __init__(self, consumer, queue=Queue.Queue, period=1):
"""
Initializes the MessageProducer
@@ -115,7 +119,8 @@ class MessageProducer(object):
# it should implement a `consume` method
self._consumer = consumer
- self._queue = queue()
+ self._queue_new = queue()
+ self._queue_dirty = queue()
self._period = period
self._loop = LoopingCall(self._check_for_new)
@@ -130,7 +135,7 @@ class MessageProducer(object):
If the queue is found empty, the loop is stopped. It will be started
again after the addition of new items.
"""
- self._consumer.consume(self._queue)
+ self._consumer.consume((self._queue_new, self._queue_dirty))
if self.is_queue_empty():
self.stop()
@@ -138,11 +143,13 @@ class MessageProducer(object):
"""
Return True if queue is empty, False otherwise.
"""
- return self._queue.empty()
+ new = self._queue_new
+ dirty = self._queue_dirty
+ return new.empty() and dirty.empty()
# public methods: IMessageProducer
- def push(self, item):
+ def push(self, item, state=None):
"""
Push a new item in the queue.
@@ -150,7 +157,14 @@ class MessageProducer(object):
"""
# XXX this might raise if the queue does not accept any new
# items. what to do then?
- self._queue.put(item)
+ queue = self._queue_new
+
+ if state == self.STATE_NEW:
+ queue = self._queue_new
+ if state == self.STATE_DIRTY:
+ queue = self._queue_dirty
+
+ queue.put(item)
self.start()
def start(self):
diff --git a/src/leap/mail/utils.py b/src/leap/mail/utils.py
index 942acfb..fed24b3 100644
--- a/src/leap/mail/utils.py
+++ b/src/leap/mail/utils.py
@@ -17,10 +17,10 @@
"""
Mail utilities.
"""
-import copy
import json
import re
import traceback
+import Queue
from leap.soledad.common.document import SoledadDocument
@@ -49,7 +49,7 @@ def empty(thing):
thing = thing.content
try:
return len(thing) == 0
- except ReferenceError:
+ except (ReferenceError, TypeError):
return True
@@ -94,6 +94,7 @@ def lowerdict(_dict):
PART_MAP = "part_map"
+PHASH = "phash"
def _str_dict(d, k):
@@ -130,6 +131,103 @@ def stringify_parts_map(d):
return d
+def phash_iter(d):
+ """
+ A recursive generator that extracts all the payload-hashes
+ from an arbitrary nested parts-map dictionary.
+
+ :param d: the dictionary to walk
+ :type d: dictionary
+ :return: a list of all the phashes found
+ :rtype: list
+ """
+ if PHASH in d:
+ yield d[PHASH]
+ if PART_MAP in d:
+ for key in d[PART_MAP]:
+ for phash in phash_iter(d[PART_MAP][key]):
+ yield phash
+
+
+def accumulator(fun, lim):
+ """
+ A simple accumulator that uses a closure and a mutable
+ object to collect items.
+ When the count of items is greater than `lim`, the
+ collection is flushed after invoking a map of the function `fun`
+ over it.
+
+ The returned accumulator can also be flushed at any moment
+ by passing a boolean as a second parameter.
+
+ :param fun: the function to call over the collection
+ when its size is greater than `lim`
+ :type fun: callable
+ :param lim: the turning point for the collection
+ :type lim: int
+ :rtype: function
+
+ >>> from pprint import pprint
+ >>> acc = accumulator(pprint, 2)
+ >>> acc(1)
+ >>> acc(2)
+ [1, 2]
+ >>> acc(3)
+ >>> acc(4)
+ [3, 4]
+ >>> acc = accumulator(pprint, 5)
+ >>> acc(1)
+ >>> acc(2)
+ >>> acc(3)
+ >>> acc(None, flush=True)
+ [1,2,3]
+ """
+ KEY = "items"
+ _o = {KEY: []}
+
+ def _accumulator(item, flush=False):
+ collection = _o[KEY]
+ collection.append(item)
+ if len(collection) >= lim or flush:
+ map(fun, filter(None, collection))
+ _o[KEY] = []
+
+ return _accumulator
+
+
+def accumulator_queue(fun, lim):
+ """
+ A version of the accumulator that uses a queue.
+
+ When the count of items is greater than `lim`, the
+ queue is flushed after invoking the function `fun`
+ over its items.
+
+ The returned accumulator can also be flushed at any moment
+ by passing a boolean as a second parameter.
+
+ :param fun: the function to call over the collection
+ when its size is greater than `lim`
+ :type fun: callable
+ :param lim: the turning point for the collection
+ :type lim: int
+ :rtype: function
+ """
+ _q = Queue.Queue()
+
+ def _accumulator(item, flush=False):
+ _q.put(item)
+ if _q.qsize() >= lim or flush:
+ collection = [_q.get() for i in range(_q.qsize())]
+ map(fun, filter(None, collection))
+
+ return _accumulator
+
+
+#
+# String manipulation
+#
+
class CustomJsonScanner(object):
"""
This class is a context manager definition used to monkey patch the default
@@ -169,6 +267,8 @@ class CustomJsonScanner(object):
if not monkey_patched:
return self._orig_scanstring(s, idx, *args, **kwargs)
+ # TODO profile to see if a compiled regex can get us some
+ # benefit here.
found = False
end = s.find("\"", idx)
while not found: