diff options
author | Kali Kaneko <kali@leap.se> | 2014-02-05 23:44:23 -0400 |
---|---|---|
committer | Kali Kaneko <kali@leap.se> | 2014-02-17 11:37:03 -0400 |
commit | 553e5e27495f71cb5721b715fcae8561d37cc305 (patch) | |
tree | a9a84668cea722726584b2da59b80bd90fd2efbd /mail | |
parent | 354dbdff54c136a54d11e24ea7cfc88f360a4a50 (diff) |
defer parse to thread
Diffstat (limited to 'mail')
-rw-r--r-- | mail/src/leap/mail/imap/memorystore.py | 4 | ||||
-rw-r--r-- | mail/src/leap/mail/imap/messages.py | 72 |
2 files changed, 29 insertions, 47 deletions
diff --git a/mail/src/leap/mail/imap/memorystore.py b/mail/src/leap/mail/imap/memorystore.py index d0321ae..8deddda 100644 --- a/mail/src/leap/mail/imap/memorystore.py +++ b/mail/src/leap/mail/imap/memorystore.py @@ -230,6 +230,8 @@ class MemoryStore(object): be fired. :type notify_on_disk: bool """ + from twisted.internet import reactor + log.msg("adding new doc to memstore %r (%r)" % (mbox, uid)) key = mbox, uid @@ -251,7 +253,7 @@ class MemoryStore(object): if not notify_on_disk: # Caller does not care, just fired and forgot, so we pass # a defer that will inmediately have its callback triggered. - observer.callback(uid) + reactor.callLater(0, observer.callback, uid) def put_message(self, mbox, uid, message, notify_on_disk=True): """ diff --git a/mail/src/leap/mail/imap/messages.py b/mail/src/leap/mail/imap/messages.py index 25fc55f..89beaaa 100644 --- a/mail/src/leap/mail/imap/messages.py +++ b/mail/src/leap/mail/imap/messages.py @@ -78,7 +78,7 @@ def try_unique_query(curried): # TODO we could take action, like trigger a background # process to kill dupes. name = getattr(curried, 'expected', 'doc') - logger.warning( + logger.debug( "More than one %s found for this mbox, " "we got a duplicate!!" % (name,)) return query.pop() @@ -720,9 +720,6 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser): # ensure that we have a recent-flags and a hdocs-sec doc self._get_or_create_rdoc() - # Not for now... - #self._get_or_create_hdocset() - def _get_empty_doc(self, _type=FLAGS_DOC): """ Returns an empty doc for storing different message parts. @@ -758,21 +755,26 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser): hdocset[fields.MBOX_KEY] = self.mbox self._soledad.create_doc(hdocset) + @deferred_to_thread def _do_parse(self, raw): """ Parse raw message and return it along with relevant information about its outer level. + This is done in a separate thread, and the callback is passed + to `_do_add_msg` method. + :param raw: the raw message :type raw: StringIO or basestring - :return: msg, chash, size, multi + :return: msg, parts, chash, size, multi :rtype: tuple """ msg = self._get_parsed_msg(raw) chash = self._get_hash(msg) size = len(msg.as_string()) multi = msg.is_multipart() - return msg, chash, size, multi + parts = walk.get_parts(msg) + return msg, parts, chash, size, multi def _populate_flags(self, flags, uid, chash, size, multi): """ @@ -879,19 +881,25 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser): flags = tuple() leap_assert_type(flags, tuple) - d = defer.Deferred() - self._do_add_msg(raw, flags, subject, date, notify_on_disk, d) - return d + observer = defer.Deferred() + + d = self._do_parse(raw) + d.addCallback(self._do_add_msg, flags, subject, date, + notify_on_disk, observer) + return observer - # We SHOULD defer this (or the heavy load here) to the thread pool, + # We SHOULD defer the heavy load here) to the thread pool, # but it gives troubles with the QSocketNotifier used by Qt... - def _do_add_msg(self, raw, flags, subject, date, notify_on_disk, observer): + def _do_add_msg(self, parse_result, flags, subject, + date, notify_on_disk, observer): """ Helper that creates a new message document. Here lives the magic of the leap mail. Well, in soledad, really. See `add_msg` docstring for parameter info. + :param parse_result: a tuple with the results of `self._do_parse` + :type parse_result: tuple :param observer: a deferred that will be fired with the message uid when the adding succeed. :type observer: deferred @@ -902,26 +910,17 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser): # TODO add the linked-from info ! # TODO add reference to the original message - # parse - msg, chash, size, multi = self._do_parse(raw) + from twisted.internet import reactor + msg, parts, chash, size, multi = parse_result # check for uniqueness -------------------------------- - # XXX profiler says that this test is costly. - # So we probably should just do an in-memory check and - # move the complete check to the soledad writer? # Watch out! We're reserving a UID right after this! existing_uid = self._fdoc_already_exists(chash) if existing_uid: - logger.warning("We already have that message in this " - "mailbox, unflagging as deleted") uid = existing_uid msg = self.get_msg_by_uid(uid) - msg.setFlags((fields.DELETED_FLAG,), -1) - - # XXX if this is deferred to thread again we should not use - # the callback in the deferred thread, but return and - # call the callback from the caller fun... - observer.callback(uid) + reactor.callLater(0, msg.setFlags, (fields.DELETED_FLAG,), -1) + reactor.callLater(0, observer.callback, uid) return uid = self.memstore.increment_last_soledad_uid(self.mbox) @@ -930,7 +929,6 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser): fd = self._populate_flags(flags, uid, chash, size, multi) hd = self._populate_headr(msg, chash, subject, date) - parts = walk.get_parts(msg) body_phash_fun = [walk.get_body_phash_simple, walk.get_body_phash_multi][int(multi)] body_phash = body_phash_fun(walk.get_payloads(msg)) @@ -949,9 +947,9 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser): self.set_recent_flag(uid) msg_container = MessageWrapper(fd, hd, cdocs) - self.memstore.create_message(self.mbox, uid, msg_container, - observer=observer, - notify_on_disk=notify_on_disk) + self.memstore.create_message( + self.mbox, uid, msg_container, + observer=observer, notify_on_disk=notify_on_disk) # # getters: specific queries @@ -982,14 +980,6 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser): {'doc_id': rdoc.doc_id, 'set': rflags}) return rflags - #else: - # fallback for cases without memory store - #with self._rdoc_lock: - #rdoc = self._get_recent_doc() - #self.__rflags = set(rdoc.content.get( - #fields.RECENTFLAGS_KEY, [])) - #return self.__rflags - def _set_recent_flags(self, value): """ Setter for the recent-flags set for this mailbox. @@ -997,16 +987,6 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser): if self.memstore is not None: self.memstore.set_recent_flags(self.mbox, value) - #else: - # fallback for cases without memory store - #with self._rdoc_lock: - #rdoc = self._get_recent_doc() - #newv = set(value) - #self.__rflags = newv - #rdoc.content[fields.RECENTFLAGS_KEY] = list(newv) - # XXX should deferLater 0 it? - #self._soledad.put_doc(rdoc) - recent_flags = property( _get_recent_flags, _set_recent_flags, doc="Set of UIDs with the recent flag for this mailbox.") |