summaryrefslogtreecommitdiff
path: root/src/leap/mail/imap/messages.py
diff options
context:
space:
mode:
authorKali Kaneko <kali@leap.se>2014-01-31 03:34:03 -0400
committerKali Kaneko <kali@leap.se>2014-01-31 11:24:50 -0400
commit0f6a8e1c83995cffec51e81f626d4bb29d4f7345 (patch)
tree97d068f620403b4042058fcb6d6566c06a910177 /src/leap/mail/imap/messages.py
parentff7de0c9bc760e097c0286d2d62a19095be3f35e (diff)
properly implement deferreds in several commands
Passing along a deferred as an observer whose callback will be called with the proper result. Returning to thread in the appropiate points. just let's remember that twisted APIs are not thread safe! SoledadStore process_item also properly returned to thread. Changed @deferred to @deferred_to_thread so it results less confusing to read. "know the territory". aha!
Diffstat (limited to 'src/leap/mail/imap/messages.py')
-rw-r--r--src/leap/mail/imap/messages.py133
1 files changed, 82 insertions, 51 deletions
diff --git a/src/leap/mail/imap/messages.py b/src/leap/mail/imap/messages.py
index 0e5c74a..03dde29 100644
--- a/src/leap/mail/imap/messages.py
+++ b/src/leap/mail/imap/messages.py
@@ -37,7 +37,7 @@ from leap.common.decorators import memoized_method
from leap.common.mail import get_email_charset
from leap.mail import walk
from leap.mail.utils import first, find_charset, lowerdict, empty
-from leap.mail.decorators import deferred
+from leap.mail.decorators import deferred_to_thread
from leap.mail.imap.index import IndexedDB
from leap.mail.imap.fields import fields, WithMsgFields
from leap.mail.imap.memorystore import MessageWrapper
@@ -243,30 +243,30 @@ class LeapMessage(fields, MailParser, MBoxParser):
REMOVE = -1
SET = 0
- with self.flags_lock:
- current = doc.content[self.FLAGS_KEY]
- if mode == APPEND:
- newflags = tuple(set(tuple(current) + flags))
- elif mode == REMOVE:
- newflags = tuple(set(current).difference(set(flags)))
- elif mode == SET:
- newflags = flags
-
- # We could defer this, but I think it's better
- # to put it under the lock...
- doc.content[self.FLAGS_KEY] = newflags
- doc.content[self.SEEN_KEY] = self.SEEN_FLAG in flags
- doc.content[self.DEL_KEY] = self.DELETED_FLAG in flags
-
- if self._collection.memstore is not None:
- log.msg("putting message in collection")
- self._collection.memstore.put_message(
- self._mbox, self._uid,
- MessageWrapper(fdoc=doc.content, new=False, dirty=True,
- docs_id={'fdoc': doc.doc_id}))
- else:
- # fallback for non-memstore initializations.
- self._soledad.put_doc(doc)
+ #with self.flags_lock:
+ current = doc.content[self.FLAGS_KEY]
+ if mode == APPEND:
+ newflags = tuple(set(tuple(current) + flags))
+ elif mode == REMOVE:
+ newflags = tuple(set(current).difference(set(flags)))
+ elif mode == SET:
+ newflags = flags
+
+ # We could defer this, but I think it's better
+ # to put it under the lock...
+ doc.content[self.FLAGS_KEY] = newflags
+ doc.content[self.SEEN_KEY] = self.SEEN_FLAG in flags
+ doc.content[self.DEL_KEY] = self.DELETED_FLAG in flags
+
+ if self._collection.memstore is not None:
+ log.msg("putting message in collection")
+ self._collection.memstore.put_message(
+ self._mbox, self._uid,
+ MessageWrapper(fdoc=doc.content, new=False, dirty=True,
+ docs_id={'fdoc': doc.doc_id}))
+ else:
+ # fallback for non-memstore initializations.
+ self._soledad.put_doc(doc)
return map(str, newflags)
def getInternalDate(self):
@@ -457,8 +457,8 @@ class LeapMessage(fields, MailParser, MBoxParser):
:rtype: Any object implementing C{IMessagePart}.
:return: The specified sub-part.
"""
- if not self.isMultipart():
- raise TypeError
+ #if not self.isMultipart():
+ #raise TypeError
try:
pmap_dict = self._get_part_from_parts_map(part + 1)
except KeyError:
@@ -846,14 +846,11 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser):
else:
return False
- # not deferring to thread cause this now uses deferred asa retval
- #@deferred
#@profile
def add_msg(self, raw, subject=None, flags=None, date=None, uid=None,
notify_on_disk=False):
"""
Creates a new message document.
- Here lives the magic of the leap mail. Well, in soledad, really.
:param raw: the raw message
:type raw: str
@@ -869,6 +866,31 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser):
:param uid: the message uid for this mailbox
:type uid: int
+
+ :return: a deferred that will be fired with the message
+ uid when the adding succeed.
+ :rtype: deferred
+ """
+ logger.debug('adding message')
+ if flags is None:
+ flags = tuple()
+ leap_assert_type(flags, tuple)
+
+ d = defer.Deferred()
+ self._do_add_msg(raw, flags, subject, date, notify_on_disk, d)
+ return d
+
+ @deferred_to_thread
+ def _do_add_msg(self, raw, flags, subject, date, notify_on_disk, observer):
+ """
+ Helper that creates a new message document.
+ Here lives the magic of the leap mail. Well, in soledad, really.
+
+ See `add_msg` docstring for parameter info.
+
+ :param observer: a deferred that will be fired with the message
+ uid when the adding succeed.
+ :type observer: deferred
"""
# TODO signal that we can delete the original message!-----
# when all the processing is done.
@@ -876,11 +898,6 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser):
# TODO add the linked-from info !
# TODO add reference to the original message
- logger.debug('adding message')
- if flags is None:
- flags = tuple()
- leap_assert_type(flags, tuple)
-
# parse
msg, chash, size, multi = self._do_parse(raw)
@@ -918,16 +935,13 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser):
self.set_recent_flag(uid)
- # Saving ----------------------------------------
# TODO ---- add reference to original doc, to be deleted
# after writes are done.
msg_container = MessageWrapper(fd, hd, cdocs)
- # we return a deferred that by default will be triggered
- # inmediately.
- d = self.memstore.create_message(self.mbox, uid, msg_container,
- notify_on_disk=notify_on_disk)
- return d
+ self.memstore.create_message(self.mbox, uid, msg_container,
+ observer=observer,
+ notify_on_disk=notify_on_disk)
#
# getters: specific queries
@@ -1030,7 +1044,7 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser):
self.recent_flags.difference_update(
set([uid]))
- @deferred
+ @deferred_to_thread
def set_recent_flag(self, uid):
"""
Set Recent flag for a given uid.
@@ -1080,7 +1094,7 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser):
return None
return fdoc.content.get(fields.UID_KEY, None)
- @deferred
+ @deferred_to_thread
def _get_uid_from_msgid(self, msgid):
"""
Return a UID for a given message-id.
@@ -1100,7 +1114,7 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser):
return self._get_uid_from_msgidCb(msgid)
#@profile
- def set_flags(self, mbox, messages, flags, mode):
+ def set_flags(self, mbox, messages, flags, mode, observer):
"""
Set flags for a sequence of messages.
@@ -1112,16 +1126,33 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser):
:type flags: tuple
:param mode: the mode for setting. 1 is append, -1 is remove, 0 set.
:type mode: int
+ :param observer: a deferred that will be called with the dictionary
+ mapping UIDs to flags after the operation has been
+ done.
+ :type observer: deferred
"""
- result = {}
+ # XXX we could defer *this* to thread pool, and gather results...
+ # XXX use deferredList
+
+ deferreds = []
for msg_id in messages:
- log.msg("MSG ID = %s" % msg_id)
- msg = self.get_msg_by_uid(msg_id, mem_only=True, flags_only=True)
- if not msg:
- continue
- result[msg_id] = msg.setFlags(flags, mode)
+ deferreds.append(
+ self._set_flag_for_uid(msg_id, flags, mode))
- return result
+ def notify(result):
+ observer.callback(dict(result))
+ d1 = defer.gatherResults(deferreds, consumeErrors=True)
+ d1.addCallback(notify)
+
+ @deferred_to_thread
+ def _set_flag_for_uid(self, msg_id, flags, mode):
+ """
+ Run the set_flag operation in the thread pool.
+ """
+ log.msg("MSG ID = %s" % msg_id)
+ msg = self.get_msg_by_uid(msg_id, mem_only=True, flags_only=True)
+ if msg is not None:
+ return msg_id, msg.setFlags(flags, mode)
# getters: generic for a mailbox