summaryrefslogtreecommitdiff
path: root/src/leap/mail/imap/mailbox.py
diff options
context:
space:
mode:
authorKali Kaneko <kali@leap.se>2014-01-31 03:34:03 -0400
committerKali Kaneko <kali@leap.se>2014-01-31 11:24:50 -0400
commit0f6a8e1c83995cffec51e81f626d4bb29d4f7345 (patch)
tree97d068f620403b4042058fcb6d6566c06a910177 /src/leap/mail/imap/mailbox.py
parentff7de0c9bc760e097c0286d2d62a19095be3f35e (diff)
properly implement deferreds in several commands
Passing along a deferred as an observer whose callback will be called with the proper result. Returning to thread in the appropiate points. just let's remember that twisted APIs are not thread safe! SoledadStore process_item also properly returned to thread. Changed @deferred to @deferred_to_thread so it results less confusing to read. "know the territory". aha!
Diffstat (limited to 'src/leap/mail/imap/mailbox.py')
-rw-r--r--src/leap/mail/imap/mailbox.py112
1 files changed, 89 insertions, 23 deletions
diff --git a/src/leap/mail/imap/mailbox.py b/src/leap/mail/imap/mailbox.py
index 802ebf3..79fb476 100644
--- a/src/leap/mail/imap/mailbox.py
+++ b/src/leap/mail/imap/mailbox.py
@@ -27,6 +27,7 @@ import os
from collections import defaultdict
from twisted.internet import defer
+from twisted.internet.task import deferLater
from twisted.python import log
from twisted.mail import imap4
@@ -35,7 +36,7 @@ from zope.interface import implements
from leap.common import events as leap_events
from leap.common.events.events_pb2 import IMAP_UNREAD_MAIL
from leap.common.check import leap_assert, leap_assert_type
-from leap.mail.decorators import deferred
+from leap.mail.decorators import deferred_to_thread
from leap.mail.utils import empty
from leap.mail.imap.fields import WithMsgFields, fields
from leap.mail.imap.messages import MessageCollection
@@ -51,6 +52,11 @@ notifying clients of new messages. Use during stress tests.
NOTIFY_NEW = not os.environ.get('LEAP_SKIPNOTIFY', False)
+class MessageCopyError(Exception):
+ """
+ """
+
+
class SoledadMailbox(WithMsgFields, MBoxParser):
"""
A Soledad-backed IMAP mailbox.
@@ -534,7 +540,7 @@ class SoledadMailbox(WithMsgFields, MBoxParser):
seq_messg = set_asked.intersection(set_exist)
return seq_messg
- @deferred
+ @deferred_to_thread
#@profile
def fetch(self, messages_asked, uid):
"""
@@ -574,7 +580,7 @@ class SoledadMailbox(WithMsgFields, MBoxParser):
result = ((msgid, getmsg(msgid)) for msgid in seq_messg)
return result
- @deferred
+ @deferred_to_thread
def fetch_flags(self, messages_asked, uid):
"""
A fast method to fetch all flags, tricking just the
@@ -615,10 +621,10 @@ class SoledadMailbox(WithMsgFields, MBoxParser):
all_flags = self.messages.all_flags()
result = ((msgid, flagsPart(
- msgid, all_flags[msgid])) for msgid in seq_messg)
+ msgid, all_flags.get(msgid, tuple()))) for msgid in seq_messg)
return result
- @deferred
+ @deferred_to_thread
def fetch_headers(self, messages_asked, uid):
"""
A fast method to fetch all headers, tricking just the
@@ -698,28 +704,43 @@ class SoledadMailbox(WithMsgFields, MBoxParser):
otherwise they are message sequence IDs.
:type uid: bool
- :return: A dict mapping message sequence numbers to sequences of
- str representing the flags set on the message after this
- operation has been performed.
- :rtype: dict
+ :return: A deferred, that will be called with a dict mapping message
+ sequence numbers to sequences of str representing the flags
+ set on the message after this operation has been performed.
+ :rtype: deferred
:raise ReadOnlyMailbox: Raised if this mailbox is not open for
read-write.
"""
+ from twisted.internet import reactor
+ if not self.isWriteable():
+ log.msg('read only mailbox!')
+ raise imap4.ReadOnlyMailbox
+
+ d = defer.Deferred()
+ deferLater(reactor, 0, self._do_store, messages_asked, flags,
+ mode, uid, d)
+ return d
+
+ def _do_store(self, messages_asked, flags, mode, uid, observer):
+ """
+ Helper method, invoke set_flags method in the MessageCollection.
+
+ See the documentation for the `store` method for the parameters.
+
+ :param observer: a deferred that will be called with the dictionary
+ mapping UIDs to flags after the operation has been
+ done.
+ :type observer: deferred
+ """
# XXX implement also sequence (uid = 0)
- # XXX we should prevent cclient from setting Recent flag.
+ # XXX we should prevent cclient from setting Recent flag?
leap_assert(not isinstance(flags, basestring),
"flags cannot be a string")
flags = tuple(flags)
-
messages_asked = self._bound_seq(messages_asked)
seq_messg = self._filter_msg_seq(messages_asked)
-
- if not self.isWriteable():
- log.msg('read only mailbox!')
- raise imap4.ReadOnlyMailbox
-
- return self.messages.set_flags(self.mbox, seq_messg, flags, mode)
+ self.messages.set_flags(self.mbox, seq_messg, flags, mode, observer)
# ISearchableMailbox
@@ -767,13 +788,46 @@ class SoledadMailbox(WithMsgFields, MBoxParser):
# IMessageCopier
- #@deferred
- #@profile
- def copy(self, messageObject):
+ def copy(self, message):
"""
Copy the given message object into this mailbox.
- """
- msg = messageObject
+
+ :param message: an IMessage implementor
+ :type message: LeapMessage
+ :return: a deferred that will be fired with the message
+ uid when the copy succeed.
+ :rtype: Deferred
+ """
+ from twisted.internet import reactor
+ print "COPY :", message
+ d = defer.Deferred()
+
+ # XXX this should not happen ... track it down,
+ # probably to FETCH...
+ if message is None:
+ log.msg("BUG: COPY found a None in passed message")
+ d.calback(None)
+ deferLater(reactor, 0, self._do_copy, message, d)
+ return d
+
+ #@profile
+ def _do_copy(self, message, observer):
+ """
+ Call invoked from the deferLater in `copy`. This will
+ copy the flags and header documents, and pass them to the
+ `create_message` method in the MemoryStore, together with
+ the observer deferred that we've been passed along.
+
+ :param message: an IMessage implementor
+ :type message: LeapMessage
+ :param observer: the deferred that will fire with the
+ UID of the message
+ :type observer: Deferred
+ """
+ # XXX for clarity, this could be delegated to a
+ # MessageCollection mixin that implements copy too, and
+ # moved out of here.
+ msg = message
memstore = self._memstore
# XXX should use a public api instead
@@ -785,12 +839,23 @@ class SoledadMailbox(WithMsgFields, MBoxParser):
new_fdoc = copy.deepcopy(fdoc.content)
fdoc_chash = new_fdoc[fields.CONTENT_HASH_KEY]
+
+ # XXX is this hitting the db??? --- probably.
+ # We should profile after the pre-fetch.
dest_fdoc = memstore.get_fdoc_from_chash(
fdoc_chash, self.mbox)
exist = dest_fdoc and not empty(dest_fdoc.content)
if exist:
+ # Should we signal error on the callback?
logger.warning("Destination message already exists!")
+
+ # XXX I'm still not clear if we should raise the
+ # callback. This actually rases an ugly warning
+ # in some muas like thunderbird. I guess the user does
+ # not deserve that.
+ #observer.errback(MessageCopyError("Already exists!"))
+ observer.callback(True)
else:
mbox = self.mbox
uid_next = memstore.increment_last_soledad_uid(mbox)
@@ -799,10 +864,11 @@ class SoledadMailbox(WithMsgFields, MBoxParser):
# FIXME set recent!
- return self._memstore.create_message(
+ self._memstore.create_message(
self.mbox, uid_next,
MessageWrapper(
new_fdoc, hdoc.content),
+ observer=observer,
notify_on_disk=False)
# convenience fun