summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/leap/mail/decorators.py2
-rw-r--r--src/leap/mail/imap/fetch.py4
-rw-r--r--src/leap/mail/imap/mailbox.py112
-rw-r--r--src/leap/mail/imap/memorystore.py43
-rw-r--r--src/leap/mail/imap/messages.py133
-rw-r--r--src/leap/mail/imap/soledadstore.py99
6 files changed, 264 insertions, 129 deletions
diff --git a/src/leap/mail/decorators.py b/src/leap/mail/decorators.py
index d5eac97..ae115f8 100644
--- a/src/leap/mail/decorators.py
+++ b/src/leap/mail/decorators.py
@@ -32,7 +32,7 @@ logger = logging.getLogger(__name__)
# See this answer: http://stackoverflow.com/a/19019648/1157664
# And the notes by glyph and jpcalderone
-def deferred(f):
+def deferred_to_thread(f):
"""
Decorator, for deferring methods to Threads.
diff --git a/src/leap/mail/imap/fetch.py b/src/leap/mail/imap/fetch.py
index 817ad6a..40dadb3 100644
--- a/src/leap/mail/imap/fetch.py
+++ b/src/leap/mail/imap/fetch.py
@@ -45,7 +45,7 @@ from leap.common.events.events_pb2 import IMAP_UNREAD_MAIL
from leap.common.mail import get_email_charset
from leap.keymanager import errors as keymanager_errors
from leap.keymanager.openpgp import OpenPGPKey
-from leap.mail.decorators import deferred
+from leap.mail.decorators import deferred_to_thread
from leap.mail.utils import json_loads
from leap.soledad.client import Soledad
from leap.soledad.common.crypto import ENC_SCHEME_KEY, ENC_JSON_KEY
@@ -199,7 +199,7 @@ class LeapIncomingMail(object):
logger.exception(failure.value)
traceback.print_tb(*sys.exc_info())
- @deferred
+ @deferred_to_thread
def _sync_soledad(self):
"""
Synchronizes with remote soledad.
diff --git a/src/leap/mail/imap/mailbox.py b/src/leap/mail/imap/mailbox.py
index 802ebf3..79fb476 100644
--- a/src/leap/mail/imap/mailbox.py
+++ b/src/leap/mail/imap/mailbox.py
@@ -27,6 +27,7 @@ import os
from collections import defaultdict
from twisted.internet import defer
+from twisted.internet.task import deferLater
from twisted.python import log
from twisted.mail import imap4
@@ -35,7 +36,7 @@ from zope.interface import implements
from leap.common import events as leap_events
from leap.common.events.events_pb2 import IMAP_UNREAD_MAIL
from leap.common.check import leap_assert, leap_assert_type
-from leap.mail.decorators import deferred
+from leap.mail.decorators import deferred_to_thread
from leap.mail.utils import empty
from leap.mail.imap.fields import WithMsgFields, fields
from leap.mail.imap.messages import MessageCollection
@@ -51,6 +52,11 @@ notifying clients of new messages. Use during stress tests.
NOTIFY_NEW = not os.environ.get('LEAP_SKIPNOTIFY', False)
+class MessageCopyError(Exception):
+ """
+ """
+
+
class SoledadMailbox(WithMsgFields, MBoxParser):
"""
A Soledad-backed IMAP mailbox.
@@ -534,7 +540,7 @@ class SoledadMailbox(WithMsgFields, MBoxParser):
seq_messg = set_asked.intersection(set_exist)
return seq_messg
- @deferred
+ @deferred_to_thread
#@profile
def fetch(self, messages_asked, uid):
"""
@@ -574,7 +580,7 @@ class SoledadMailbox(WithMsgFields, MBoxParser):
result = ((msgid, getmsg(msgid)) for msgid in seq_messg)
return result
- @deferred
+ @deferred_to_thread
def fetch_flags(self, messages_asked, uid):
"""
A fast method to fetch all flags, tricking just the
@@ -615,10 +621,10 @@ class SoledadMailbox(WithMsgFields, MBoxParser):
all_flags = self.messages.all_flags()
result = ((msgid, flagsPart(
- msgid, all_flags[msgid])) for msgid in seq_messg)
+ msgid, all_flags.get(msgid, tuple()))) for msgid in seq_messg)
return result
- @deferred
+ @deferred_to_thread
def fetch_headers(self, messages_asked, uid):
"""
A fast method to fetch all headers, tricking just the
@@ -698,28 +704,43 @@ class SoledadMailbox(WithMsgFields, MBoxParser):
otherwise they are message sequence IDs.
:type uid: bool
- :return: A dict mapping message sequence numbers to sequences of
- str representing the flags set on the message after this
- operation has been performed.
- :rtype: dict
+ :return: A deferred, that will be called with a dict mapping message
+ sequence numbers to sequences of str representing the flags
+ set on the message after this operation has been performed.
+ :rtype: deferred
:raise ReadOnlyMailbox: Raised if this mailbox is not open for
read-write.
"""
+ from twisted.internet import reactor
+ if not self.isWriteable():
+ log.msg('read only mailbox!')
+ raise imap4.ReadOnlyMailbox
+
+ d = defer.Deferred()
+ deferLater(reactor, 0, self._do_store, messages_asked, flags,
+ mode, uid, d)
+ return d
+
+ def _do_store(self, messages_asked, flags, mode, uid, observer):
+ """
+ Helper method, invoke set_flags method in the MessageCollection.
+
+ See the documentation for the `store` method for the parameters.
+
+ :param observer: a deferred that will be called with the dictionary
+ mapping UIDs to flags after the operation has been
+ done.
+ :type observer: deferred
+ """
# XXX implement also sequence (uid = 0)
- # XXX we should prevent cclient from setting Recent flag.
+ # XXX we should prevent cclient from setting Recent flag?
leap_assert(not isinstance(flags, basestring),
"flags cannot be a string")
flags = tuple(flags)
-
messages_asked = self._bound_seq(messages_asked)
seq_messg = self._filter_msg_seq(messages_asked)
-
- if not self.isWriteable():
- log.msg('read only mailbox!')
- raise imap4.ReadOnlyMailbox
-
- return self.messages.set_flags(self.mbox, seq_messg, flags, mode)
+ self.messages.set_flags(self.mbox, seq_messg, flags, mode, observer)
# ISearchableMailbox
@@ -767,13 +788,46 @@ class SoledadMailbox(WithMsgFields, MBoxParser):
# IMessageCopier
- #@deferred
- #@profile
- def copy(self, messageObject):
+ def copy(self, message):
"""
Copy the given message object into this mailbox.
- """
- msg = messageObject
+
+ :param message: an IMessage implementor
+ :type message: LeapMessage
+ :return: a deferred that will be fired with the message
+ uid when the copy succeed.
+ :rtype: Deferred
+ """
+ from twisted.internet import reactor
+ print "COPY :", message
+ d = defer.Deferred()
+
+ # XXX this should not happen ... track it down,
+ # probably to FETCH...
+ if message is None:
+ log.msg("BUG: COPY found a None in passed message")
+ d.calback(None)
+ deferLater(reactor, 0, self._do_copy, message, d)
+ return d
+
+ #@profile
+ def _do_copy(self, message, observer):
+ """
+ Call invoked from the deferLater in `copy`. This will
+ copy the flags and header documents, and pass them to the
+ `create_message` method in the MemoryStore, together with
+ the observer deferred that we've been passed along.
+
+ :param message: an IMessage implementor
+ :type message: LeapMessage
+ :param observer: the deferred that will fire with the
+ UID of the message
+ :type observer: Deferred
+ """
+ # XXX for clarity, this could be delegated to a
+ # MessageCollection mixin that implements copy too, and
+ # moved out of here.
+ msg = message
memstore = self._memstore
# XXX should use a public api instead
@@ -785,12 +839,23 @@ class SoledadMailbox(WithMsgFields, MBoxParser):
new_fdoc = copy.deepcopy(fdoc.content)
fdoc_chash = new_fdoc[fields.CONTENT_HASH_KEY]
+
+ # XXX is this hitting the db??? --- probably.
+ # We should profile after the pre-fetch.
dest_fdoc = memstore.get_fdoc_from_chash(
fdoc_chash, self.mbox)
exist = dest_fdoc and not empty(dest_fdoc.content)
if exist:
+ # Should we signal error on the callback?
logger.warning("Destination message already exists!")
+
+ # XXX I'm still not clear if we should raise the
+ # callback. This actually rases an ugly warning
+ # in some muas like thunderbird. I guess the user does
+ # not deserve that.
+ #observer.errback(MessageCopyError("Already exists!"))
+ observer.callback(True)
else:
mbox = self.mbox
uid_next = memstore.increment_last_soledad_uid(mbox)
@@ -799,10 +864,11 @@ class SoledadMailbox(WithMsgFields, MBoxParser):
# FIXME set recent!
- return self._memstore.create_message(
+ self._memstore.create_message(
self.mbox, uid_next,
MessageWrapper(
new_fdoc, hdoc.content),
+ observer=observer,
notify_on_disk=False)
# convenience fun
diff --git a/src/leap/mail/imap/memorystore.py b/src/leap/mail/imap/memorystore.py
index 217ad8e..211d282 100644
--- a/src/leap/mail/imap/memorystore.py
+++ b/src/leap/mail/imap/memorystore.py
@@ -32,7 +32,7 @@ from zope.interface import implements
from leap.common.check import leap_assert_type
from leap.mail import size
-from leap.mail.decorators import deferred
+from leap.mail.decorators import deferred_to_thread
from leap.mail.utils import empty
from leap.mail.messageflow import MessageProducer
from leap.mail.imap import interfaces
@@ -200,7 +200,8 @@ class MemoryStore(object):
# We would have to add a put_flags operation to modify only
# the flags doc (and set the dirty flag accordingly)
- def create_message(self, mbox, uid, message, notify_on_disk=True):
+ def create_message(self, mbox, uid, message, observer,
+ notify_on_disk=True):
"""
Create the passed message into this MemoryStore.
@@ -212,38 +213,38 @@ class MemoryStore(object):
:type uid: int
:param message: a message to be added
:type message: MessageWrapper
- :param notify_on_disk: whether the deferred that is returned should
+ :param observer: the deferred that will fire with the
+ UID of the message. If notify_on_disk is True,
+ this will happen when the message is written to
+ Soledad. Otherwise it will fire as soon as we've
+ added the message to the memory store.
+ :type observer: Deferred
+ :param notify_on_disk: whether the `observer` deferred should
wait until the message is written to disk to
be fired.
:type notify_on_disk: bool
-
- :return: a Deferred. if notify_on_disk is True, will be fired
- when written to the db on disk.
- Otherwise will fire inmediately
- :rtype: Deferred
"""
log.msg("adding new doc to memstore %r (%r)" % (mbox, uid))
key = mbox, uid
self._add_message(mbox, uid, message, notify_on_disk)
-
- d = defer.Deferred()
- d.addCallback(lambda result: log.msg("message save: %s" % result))
self._new.add(key)
- # We store this deferred so we can keep track of the pending
- # operations internally.
- self._new_deferreds[key] = d
+ def log_add(result):
+ log.msg("message save: %s" % result)
+ return result
+ observer.addCallback(log_add)
if notify_on_disk:
- # Caller wants to be notified when the message is on disk
- # so we pass the deferred that will be fired when the message
- # has been written.
- return d
- else:
+ # We store this deferred so we can keep track of the pending
+ # operations internally.
+ # TODO this should fire with the UID !!! -- change that in
+ # the soledad store code.
+ self._new_deferreds[key] = observer
+ if not notify_on_disk:
# Caller does not care, just fired and forgot, so we pass
# a defer that will inmediately have its callback triggered.
- return defer.succeed('fire-and-forget:%s' % str(key))
+ observer.callback(uid)
def put_message(self, mbox, uid, message, notify_on_disk=True):
"""
@@ -541,7 +542,7 @@ class MemoryStore(object):
self.write_last_uid(mbox, value)
return value
- @deferred
+ @deferred_to_thread
def write_last_uid(self, mbox, value):
"""
Increment the soledad integer cache for the highest uid value.
diff --git a/src/leap/mail/imap/messages.py b/src/leap/mail/imap/messages.py
index 0e5c74a..03dde29 100644
--- a/src/leap/mail/imap/messages.py
+++ b/src/leap/mail/imap/messages.py
@@ -37,7 +37,7 @@ from leap.common.decorators import memoized_method
from leap.common.mail import get_email_charset
from leap.mail import walk
from leap.mail.utils import first, find_charset, lowerdict, empty
-from leap.mail.decorators import deferred
+from leap.mail.decorators import deferred_to_thread
from leap.mail.imap.index import IndexedDB
from leap.mail.imap.fields import fields, WithMsgFields
from leap.mail.imap.memorystore import MessageWrapper
@@ -243,30 +243,30 @@ class LeapMessage(fields, MailParser, MBoxParser):
REMOVE = -1
SET = 0
- with self.flags_lock:
- current = doc.content[self.FLAGS_KEY]
- if mode == APPEND:
- newflags = tuple(set(tuple(current) + flags))
- elif mode == REMOVE:
- newflags = tuple(set(current).difference(set(flags)))
- elif mode == SET:
- newflags = flags
-
- # We could defer this, but I think it's better
- # to put it under the lock...
- doc.content[self.FLAGS_KEY] = newflags
- doc.content[self.SEEN_KEY] = self.SEEN_FLAG in flags
- doc.content[self.DEL_KEY] = self.DELETED_FLAG in flags
-
- if self._collection.memstore is not None:
- log.msg("putting message in collection")
- self._collection.memstore.put_message(
- self._mbox, self._uid,
- MessageWrapper(fdoc=doc.content, new=False, dirty=True,
- docs_id={'fdoc': doc.doc_id}))
- else:
- # fallback for non-memstore initializations.
- self._soledad.put_doc(doc)
+ #with self.flags_lock:
+ current = doc.content[self.FLAGS_KEY]
+ if mode == APPEND:
+ newflags = tuple(set(tuple(current) + flags))
+ elif mode == REMOVE:
+ newflags = tuple(set(current).difference(set(flags)))
+ elif mode == SET:
+ newflags = flags
+
+ # We could defer this, but I think it's better
+ # to put it under the lock...
+ doc.content[self.FLAGS_KEY] = newflags
+ doc.content[self.SEEN_KEY] = self.SEEN_FLAG in flags
+ doc.content[self.DEL_KEY] = self.DELETED_FLAG in flags
+
+ if self._collection.memstore is not None:
+ log.msg("putting message in collection")
+ self._collection.memstore.put_message(
+ self._mbox, self._uid,
+ MessageWrapper(fdoc=doc.content, new=False, dirty=True,
+ docs_id={'fdoc': doc.doc_id}))
+ else:
+ # fallback for non-memstore initializations.
+ self._soledad.put_doc(doc)
return map(str, newflags)
def getInternalDate(self):
@@ -457,8 +457,8 @@ class LeapMessage(fields, MailParser, MBoxParser):
:rtype: Any object implementing C{IMessagePart}.
:return: The specified sub-part.
"""
- if not self.isMultipart():
- raise TypeError
+ #if not self.isMultipart():
+ #raise TypeError
try:
pmap_dict = self._get_part_from_parts_map(part + 1)
except KeyError:
@@ -846,14 +846,11 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser):
else:
return False
- # not deferring to thread cause this now uses deferred asa retval
- #@deferred
#@profile
def add_msg(self, raw, subject=None, flags=None, date=None, uid=None,
notify_on_disk=False):
"""
Creates a new message document.
- Here lives the magic of the leap mail. Well, in soledad, really.
:param raw: the raw message
:type raw: str
@@ -869,6 +866,31 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser):
:param uid: the message uid for this mailbox
:type uid: int
+
+ :return: a deferred that will be fired with the message
+ uid when the adding succeed.
+ :rtype: deferred
+ """
+ logger.debug('adding message')
+ if flags is None:
+ flags = tuple()
+ leap_assert_type(flags, tuple)
+
+ d = defer.Deferred()
+ self._do_add_msg(raw, flags, subject, date, notify_on_disk, d)
+ return d
+
+ @deferred_to_thread
+ def _do_add_msg(self, raw, flags, subject, date, notify_on_disk, observer):
+ """
+ Helper that creates a new message document.
+ Here lives the magic of the leap mail. Well, in soledad, really.
+
+ See `add_msg` docstring for parameter info.
+
+ :param observer: a deferred that will be fired with the message
+ uid when the adding succeed.
+ :type observer: deferred
"""
# TODO signal that we can delete the original message!-----
# when all the processing is done.
@@ -876,11 +898,6 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser):
# TODO add the linked-from info !
# TODO add reference to the original message
- logger.debug('adding message')
- if flags is None:
- flags = tuple()
- leap_assert_type(flags, tuple)
-
# parse
msg, chash, size, multi = self._do_parse(raw)
@@ -918,16 +935,13 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser):
self.set_recent_flag(uid)
- # Saving ----------------------------------------
# TODO ---- add reference to original doc, to be deleted
# after writes are done.
msg_container = MessageWrapper(fd, hd, cdocs)
- # we return a deferred that by default will be triggered
- # inmediately.
- d = self.memstore.create_message(self.mbox, uid, msg_container,
- notify_on_disk=notify_on_disk)
- return d
+ self.memstore.create_message(self.mbox, uid, msg_container,
+ observer=observer,
+ notify_on_disk=notify_on_disk)
#
# getters: specific queries
@@ -1030,7 +1044,7 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser):
self.recent_flags.difference_update(
set([uid]))
- @deferred
+ @deferred_to_thread
def set_recent_flag(self, uid):
"""
Set Recent flag for a given uid.
@@ -1080,7 +1094,7 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser):
return None
return fdoc.content.get(fields.UID_KEY, None)
- @deferred
+ @deferred_to_thread
def _get_uid_from_msgid(self, msgid):
"""
Return a UID for a given message-id.
@@ -1100,7 +1114,7 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser):
return self._get_uid_from_msgidCb(msgid)
#@profile
- def set_flags(self, mbox, messages, flags, mode):
+ def set_flags(self, mbox, messages, flags, mode, observer):
"""
Set flags for a sequence of messages.
@@ -1112,16 +1126,33 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser):
:type flags: tuple
:param mode: the mode for setting. 1 is append, -1 is remove, 0 set.
:type mode: int
+ :param observer: a deferred that will be called with the dictionary
+ mapping UIDs to flags after the operation has been
+ done.
+ :type observer: deferred
"""
- result = {}
+ # XXX we could defer *this* to thread pool, and gather results...
+ # XXX use deferredList
+
+ deferreds = []
for msg_id in messages:
- log.msg("MSG ID = %s" % msg_id)
- msg = self.get_msg_by_uid(msg_id, mem_only=True, flags_only=True)
- if not msg:
- continue
- result[msg_id] = msg.setFlags(flags, mode)
+ deferreds.append(
+ self._set_flag_for_uid(msg_id, flags, mode))
- return result
+ def notify(result):
+ observer.callback(dict(result))
+ d1 = defer.gatherResults(deferreds, consumeErrors=True)
+ d1.addCallback(notify)
+
+ @deferred_to_thread
+ def _set_flag_for_uid(self, msg_id, flags, mode):
+ """
+ Run the set_flag operation in the thread pool.
+ """
+ log.msg("MSG ID = %s" % msg_id)
+ msg = self.get_msg_by_uid(msg_id, mem_only=True, flags_only=True)
+ if msg is not None:
+ return msg_id, msg.setFlags(flags, mode)
# getters: generic for a mailbox
diff --git a/src/leap/mail/imap/soledadstore.py b/src/leap/mail/imap/soledadstore.py
index ff5e03b..82f27e7 100644
--- a/src/leap/mail/imap/soledadstore.py
+++ b/src/leap/mail/imap/soledadstore.py
@@ -23,10 +23,12 @@ import threading
from itertools import chain
from u1db import errors as u1db_errors
+from twisted.internet import defer
+from twisted.python import log
from zope.interface import implements
from leap.common.check import leap_assert_type
-from leap.mail.decorators import deferred
+from leap.mail.decorators import deferred_to_thread
from leap.mail.imap.messageparts import MessagePartType
from leap.mail.imap.messageparts import MessageWrapper
from leap.mail.imap.messageparts import RecentFlagsDoc
@@ -209,52 +211,87 @@ class SoledadStore(ContentDedup):
# TODO could generalize this method into a generic consumer
# and only implement `process` here
+ def docWriteCallBack(doc_wrapper):
+ """
+ Callback for a successful write of a document wrapper.
+ """
+ if isinstance(doc_wrapper, MessageWrapper):
+ # If everything went well, we can unset the new flag
+ # in the source store (memory store)
+ self._unset_new_dirty(doc_wrapper)
+
+ def docWriteErrorBack(failure):
+ """
+ Errorback for write operations.
+ """
+ log.error("Error while processing item.")
+ log.msg(failure.getTraceBack())
+
while not queue.empty():
- items = self._process(queue)
+ doc_wrapper = queue.get()
+ d = defer.Deferred()
+ d.addCallbacks(docWriteCallBack, docWriteErrorBack)
+
+ self._consume_doc(doc_wrapper, d)
+
+ @deferred_to_thread
+ def _unset_new_dirty(self, doc_wrapper):
+ """
+ Unset the `new` and `dirty` flags for this document wrapper in the
+ memory store.
+
+ :param doc_wrapper: a MessageWrapper instance
+ :type doc_wrapper: MessageWrapper
+ """
+ # XXX debug msg id/mbox?
+ logger.info("unsetting new flag!")
+ doc_wrapper.new = False
+ doc_wrapper.dirty = False
- # we prime the generator, that should return the
- # message or flags wrapper item in the first place.
- doc_wrapper = items.next()
+ @deferred_to_thread
+ def _consume_doc(self, doc_wrapper, deferred):
+ """
+ Consume each document wrapper in a separate thread.
+
+ :param doc_wrapper:
+ :type doc_wrapper:
+ :param deferred:
+ :type deferred: Deferred
+ """
+ items = self._process(doc_wrapper)
- # From here, we unpack the subpart items and
- # the right soledad call.
+ # we prime the generator, that should return the
+ # message or flags wrapper item in the first place.
+ doc_wrapper = items.next()
+
+ # From here, we unpack the subpart items and
+ # the right soledad call.
+ failed = False
+ for item, call in items:
try:
- failed = False
- for item, call in items:
- try:
- self._try_call(call, item)
- except Exception as exc:
- failed = exc
- continue
- if failed:
- raise MsgWriteError
-
- except MsgWriteError:
- logger.error("Error while processing item.")
- logger.exception(failed)
- else:
- if isinstance(doc_wrapper, MessageWrapper):
- # If everything went well, we can unset the new flag
- # in the source store (memory store)
- logger.info("unsetting new flag!")
- doc_wrapper.new = False
- doc_wrapper.dirty = False
+ self._try_call(call, item)
+ except Exception as exc:
+ failed = exc
+ continue
+ if failed:
+ deferred.errback(MsgWriteError(
+ "There was an error writing the mesage"))
+ else:
+ deferred.callback(doc_wrapper)
#
# SoledadStore specific methods.
#
- def _process(self, queue):
+ def _process(self, doc_wrapper):
"""
- Return an iterator that will yield the msg_wrapper in the first place,
+ Return an iterator that will yield the doc_wrapper in the first place,
followed by the subparts item and the proper call type for every
item in the queue, if any.
:param queue: the queue from where we'll pick item.
:type queue: Queue
"""
- doc_wrapper = queue.get()
-
if isinstance(doc_wrapper, MessageWrapper):
return chain((doc_wrapper,),
self._get_calls_for_msg_parts(doc_wrapper))