summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKali Kaneko <kali@leap.se>2014-02-05 23:44:23 -0400
committerKali Kaneko <kali@leap.se>2014-02-17 11:37:03 -0400
commit553e5e27495f71cb5721b715fcae8561d37cc305 (patch)
treea9a84668cea722726584b2da59b80bd90fd2efbd
parent354dbdff54c136a54d11e24ea7cfc88f360a4a50 (diff)
defer parse to thread
-rw-r--r--mail/src/leap/mail/imap/memorystore.py4
-rw-r--r--mail/src/leap/mail/imap/messages.py72
2 files changed, 29 insertions, 47 deletions
diff --git a/mail/src/leap/mail/imap/memorystore.py b/mail/src/leap/mail/imap/memorystore.py
index d0321ae7..8dedddaf 100644
--- a/mail/src/leap/mail/imap/memorystore.py
+++ b/mail/src/leap/mail/imap/memorystore.py
@@ -230,6 +230,8 @@ class MemoryStore(object):
be fired.
:type notify_on_disk: bool
"""
+ from twisted.internet import reactor
+
log.msg("adding new doc to memstore %r (%r)" % (mbox, uid))
key = mbox, uid
@@ -251,7 +253,7 @@ class MemoryStore(object):
if not notify_on_disk:
# Caller does not care, just fired and forgot, so we pass
# a defer that will inmediately have its callback triggered.
- observer.callback(uid)
+ reactor.callLater(0, observer.callback, uid)
def put_message(self, mbox, uid, message, notify_on_disk=True):
"""
diff --git a/mail/src/leap/mail/imap/messages.py b/mail/src/leap/mail/imap/messages.py
index 25fc55fe..89beaaa7 100644
--- a/mail/src/leap/mail/imap/messages.py
+++ b/mail/src/leap/mail/imap/messages.py
@@ -78,7 +78,7 @@ def try_unique_query(curried):
# TODO we could take action, like trigger a background
# process to kill dupes.
name = getattr(curried, 'expected', 'doc')
- logger.warning(
+ logger.debug(
"More than one %s found for this mbox, "
"we got a duplicate!!" % (name,))
return query.pop()
@@ -720,9 +720,6 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser):
# ensure that we have a recent-flags and a hdocs-sec doc
self._get_or_create_rdoc()
- # Not for now...
- #self._get_or_create_hdocset()
-
def _get_empty_doc(self, _type=FLAGS_DOC):
"""
Returns an empty doc for storing different message parts.
@@ -758,21 +755,26 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser):
hdocset[fields.MBOX_KEY] = self.mbox
self._soledad.create_doc(hdocset)
+ @deferred_to_thread
def _do_parse(self, raw):
"""
Parse raw message and return it along with
relevant information about its outer level.
+ This is done in a separate thread, and the callback is passed
+ to `_do_add_msg` method.
+
:param raw: the raw message
:type raw: StringIO or basestring
- :return: msg, chash, size, multi
+ :return: msg, parts, chash, size, multi
:rtype: tuple
"""
msg = self._get_parsed_msg(raw)
chash = self._get_hash(msg)
size = len(msg.as_string())
multi = msg.is_multipart()
- return msg, chash, size, multi
+ parts = walk.get_parts(msg)
+ return msg, parts, chash, size, multi
def _populate_flags(self, flags, uid, chash, size, multi):
"""
@@ -879,19 +881,25 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser):
flags = tuple()
leap_assert_type(flags, tuple)
- d = defer.Deferred()
- self._do_add_msg(raw, flags, subject, date, notify_on_disk, d)
- return d
+ observer = defer.Deferred()
+
+ d = self._do_parse(raw)
+ d.addCallback(self._do_add_msg, flags, subject, date,
+ notify_on_disk, observer)
+ return observer
- # We SHOULD defer this (or the heavy load here) to the thread pool,
+ # We SHOULD defer the heavy load here) to the thread pool,
# but it gives troubles with the QSocketNotifier used by Qt...
- def _do_add_msg(self, raw, flags, subject, date, notify_on_disk, observer):
+ def _do_add_msg(self, parse_result, flags, subject,
+ date, notify_on_disk, observer):
"""
Helper that creates a new message document.
Here lives the magic of the leap mail. Well, in soledad, really.
See `add_msg` docstring for parameter info.
+ :param parse_result: a tuple with the results of `self._do_parse`
+ :type parse_result: tuple
:param observer: a deferred that will be fired with the message
uid when the adding succeed.
:type observer: deferred
@@ -902,26 +910,17 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser):
# TODO add the linked-from info !
# TODO add reference to the original message
- # parse
- msg, chash, size, multi = self._do_parse(raw)
+ from twisted.internet import reactor
+ msg, parts, chash, size, multi = parse_result
# check for uniqueness --------------------------------
- # XXX profiler says that this test is costly.
- # So we probably should just do an in-memory check and
- # move the complete check to the soledad writer?
# Watch out! We're reserving a UID right after this!
existing_uid = self._fdoc_already_exists(chash)
if existing_uid:
- logger.warning("We already have that message in this "
- "mailbox, unflagging as deleted")
uid = existing_uid
msg = self.get_msg_by_uid(uid)
- msg.setFlags((fields.DELETED_FLAG,), -1)
-
- # XXX if this is deferred to thread again we should not use
- # the callback in the deferred thread, but return and
- # call the callback from the caller fun...
- observer.callback(uid)
+ reactor.callLater(0, msg.setFlags, (fields.DELETED_FLAG,), -1)
+ reactor.callLater(0, observer.callback, uid)
return
uid = self.memstore.increment_last_soledad_uid(self.mbox)
@@ -930,7 +929,6 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser):
fd = self._populate_flags(flags, uid, chash, size, multi)
hd = self._populate_headr(msg, chash, subject, date)
- parts = walk.get_parts(msg)
body_phash_fun = [walk.get_body_phash_simple,
walk.get_body_phash_multi][int(multi)]
body_phash = body_phash_fun(walk.get_payloads(msg))
@@ -949,9 +947,9 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser):
self.set_recent_flag(uid)
msg_container = MessageWrapper(fd, hd, cdocs)
- self.memstore.create_message(self.mbox, uid, msg_container,
- observer=observer,
- notify_on_disk=notify_on_disk)
+ self.memstore.create_message(
+ self.mbox, uid, msg_container,
+ observer=observer, notify_on_disk=notify_on_disk)
#
# getters: specific queries
@@ -982,14 +980,6 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser):
{'doc_id': rdoc.doc_id, 'set': rflags})
return rflags
- #else:
- # fallback for cases without memory store
- #with self._rdoc_lock:
- #rdoc = self._get_recent_doc()
- #self.__rflags = set(rdoc.content.get(
- #fields.RECENTFLAGS_KEY, []))
- #return self.__rflags
-
def _set_recent_flags(self, value):
"""
Setter for the recent-flags set for this mailbox.
@@ -997,16 +987,6 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser):
if self.memstore is not None:
self.memstore.set_recent_flags(self.mbox, value)
- #else:
- # fallback for cases without memory store
- #with self._rdoc_lock:
- #rdoc = self._get_recent_doc()
- #newv = set(value)
- #self.__rflags = newv
- #rdoc.content[fields.RECENTFLAGS_KEY] = list(newv)
- # XXX should deferLater 0 it?
- #self._soledad.put_doc(rdoc)
-
recent_flags = property(
_get_recent_flags, _set_recent_flags,
doc="Set of UIDs with the recent flag for this mailbox.")