From 0f6a8e1c83995cffec51e81f626d4bb29d4f7345 Mon Sep 17 00:00:00 2001 From: Kali Kaneko Date: Fri, 31 Jan 2014 03:34:03 -0400 Subject: properly implement deferreds in several commands Passing along a deferred as an observer whose callback will be called with the proper result. Returning to thread in the appropiate points. just let's remember that twisted APIs are not thread safe! SoledadStore process_item also properly returned to thread. Changed @deferred to @deferred_to_thread so it results less confusing to read. "know the territory". aha! --- src/leap/mail/imap/mailbox.py | 112 +++++++++++++++++++++++++++++++++--------- 1 file changed, 89 insertions(+), 23 deletions(-) (limited to 'src/leap/mail/imap/mailbox.py') diff --git a/src/leap/mail/imap/mailbox.py b/src/leap/mail/imap/mailbox.py index 802ebf3..79fb476 100644 --- a/src/leap/mail/imap/mailbox.py +++ b/src/leap/mail/imap/mailbox.py @@ -27,6 +27,7 @@ import os from collections import defaultdict from twisted.internet import defer +from twisted.internet.task import deferLater from twisted.python import log from twisted.mail import imap4 @@ -35,7 +36,7 @@ from zope.interface import implements from leap.common import events as leap_events from leap.common.events.events_pb2 import IMAP_UNREAD_MAIL from leap.common.check import leap_assert, leap_assert_type -from leap.mail.decorators import deferred +from leap.mail.decorators import deferred_to_thread from leap.mail.utils import empty from leap.mail.imap.fields import WithMsgFields, fields from leap.mail.imap.messages import MessageCollection @@ -51,6 +52,11 @@ notifying clients of new messages. Use during stress tests. NOTIFY_NEW = not os.environ.get('LEAP_SKIPNOTIFY', False) +class MessageCopyError(Exception): + """ + """ + + class SoledadMailbox(WithMsgFields, MBoxParser): """ A Soledad-backed IMAP mailbox. @@ -534,7 +540,7 @@ class SoledadMailbox(WithMsgFields, MBoxParser): seq_messg = set_asked.intersection(set_exist) return seq_messg - @deferred + @deferred_to_thread #@profile def fetch(self, messages_asked, uid): """ @@ -574,7 +580,7 @@ class SoledadMailbox(WithMsgFields, MBoxParser): result = ((msgid, getmsg(msgid)) for msgid in seq_messg) return result - @deferred + @deferred_to_thread def fetch_flags(self, messages_asked, uid): """ A fast method to fetch all flags, tricking just the @@ -615,10 +621,10 @@ class SoledadMailbox(WithMsgFields, MBoxParser): all_flags = self.messages.all_flags() result = ((msgid, flagsPart( - msgid, all_flags[msgid])) for msgid in seq_messg) + msgid, all_flags.get(msgid, tuple()))) for msgid in seq_messg) return result - @deferred + @deferred_to_thread def fetch_headers(self, messages_asked, uid): """ A fast method to fetch all headers, tricking just the @@ -698,28 +704,43 @@ class SoledadMailbox(WithMsgFields, MBoxParser): otherwise they are message sequence IDs. :type uid: bool - :return: A dict mapping message sequence numbers to sequences of - str representing the flags set on the message after this - operation has been performed. - :rtype: dict + :return: A deferred, that will be called with a dict mapping message + sequence numbers to sequences of str representing the flags + set on the message after this operation has been performed. + :rtype: deferred :raise ReadOnlyMailbox: Raised if this mailbox is not open for read-write. """ + from twisted.internet import reactor + if not self.isWriteable(): + log.msg('read only mailbox!') + raise imap4.ReadOnlyMailbox + + d = defer.Deferred() + deferLater(reactor, 0, self._do_store, messages_asked, flags, + mode, uid, d) + return d + + def _do_store(self, messages_asked, flags, mode, uid, observer): + """ + Helper method, invoke set_flags method in the MessageCollection. + + See the documentation for the `store` method for the parameters. + + :param observer: a deferred that will be called with the dictionary + mapping UIDs to flags after the operation has been + done. + :type observer: deferred + """ # XXX implement also sequence (uid = 0) - # XXX we should prevent cclient from setting Recent flag. + # XXX we should prevent cclient from setting Recent flag? leap_assert(not isinstance(flags, basestring), "flags cannot be a string") flags = tuple(flags) - messages_asked = self._bound_seq(messages_asked) seq_messg = self._filter_msg_seq(messages_asked) - - if not self.isWriteable(): - log.msg('read only mailbox!') - raise imap4.ReadOnlyMailbox - - return self.messages.set_flags(self.mbox, seq_messg, flags, mode) + self.messages.set_flags(self.mbox, seq_messg, flags, mode, observer) # ISearchableMailbox @@ -767,13 +788,46 @@ class SoledadMailbox(WithMsgFields, MBoxParser): # IMessageCopier - #@deferred - #@profile - def copy(self, messageObject): + def copy(self, message): """ Copy the given message object into this mailbox. - """ - msg = messageObject + + :param message: an IMessage implementor + :type message: LeapMessage + :return: a deferred that will be fired with the message + uid when the copy succeed. + :rtype: Deferred + """ + from twisted.internet import reactor + print "COPY :", message + d = defer.Deferred() + + # XXX this should not happen ... track it down, + # probably to FETCH... + if message is None: + log.msg("BUG: COPY found a None in passed message") + d.calback(None) + deferLater(reactor, 0, self._do_copy, message, d) + return d + + #@profile + def _do_copy(self, message, observer): + """ + Call invoked from the deferLater in `copy`. This will + copy the flags and header documents, and pass them to the + `create_message` method in the MemoryStore, together with + the observer deferred that we've been passed along. + + :param message: an IMessage implementor + :type message: LeapMessage + :param observer: the deferred that will fire with the + UID of the message + :type observer: Deferred + """ + # XXX for clarity, this could be delegated to a + # MessageCollection mixin that implements copy too, and + # moved out of here. + msg = message memstore = self._memstore # XXX should use a public api instead @@ -785,12 +839,23 @@ class SoledadMailbox(WithMsgFields, MBoxParser): new_fdoc = copy.deepcopy(fdoc.content) fdoc_chash = new_fdoc[fields.CONTENT_HASH_KEY] + + # XXX is this hitting the db??? --- probably. + # We should profile after the pre-fetch. dest_fdoc = memstore.get_fdoc_from_chash( fdoc_chash, self.mbox) exist = dest_fdoc and not empty(dest_fdoc.content) if exist: + # Should we signal error on the callback? logger.warning("Destination message already exists!") + + # XXX I'm still not clear if we should raise the + # callback. This actually rases an ugly warning + # in some muas like thunderbird. I guess the user does + # not deserve that. + #observer.errback(MessageCopyError("Already exists!")) + observer.callback(True) else: mbox = self.mbox uid_next = memstore.increment_last_soledad_uid(mbox) @@ -799,10 +864,11 @@ class SoledadMailbox(WithMsgFields, MBoxParser): # FIXME set recent! - return self._memstore.create_message( + self._memstore.create_message( self.mbox, uid_next, MessageWrapper( new_fdoc, hdoc.content), + observer=observer, notify_on_disk=False) # convenience fun -- cgit v1.2.3