diff options
| author | Kali Kaneko <kali@leap.se> | 2014-02-05 23:44:23 -0400 | 
|---|---|---|
| committer | Kali Kaneko <kali@leap.se> | 2014-02-17 11:37:03 -0400 | 
| commit | 553e5e27495f71cb5721b715fcae8561d37cc305 (patch) | |
| tree | a9a84668cea722726584b2da59b80bd90fd2efbd | |
| parent | 354dbdff54c136a54d11e24ea7cfc88f360a4a50 (diff) | |
defer parse to thread
| -rw-r--r-- | mail/src/leap/mail/imap/memorystore.py | 4 | ||||
| -rw-r--r-- | mail/src/leap/mail/imap/messages.py | 72 | 
2 files changed, 29 insertions, 47 deletions
| diff --git a/mail/src/leap/mail/imap/memorystore.py b/mail/src/leap/mail/imap/memorystore.py index d0321ae..8deddda 100644 --- a/mail/src/leap/mail/imap/memorystore.py +++ b/mail/src/leap/mail/imap/memorystore.py @@ -230,6 +230,8 @@ class MemoryStore(object):                                 be fired.          :type notify_on_disk: bool          """ +        from twisted.internet import reactor +          log.msg("adding new doc to memstore %r (%r)" % (mbox, uid))          key = mbox, uid @@ -251,7 +253,7 @@ class MemoryStore(object):          if not notify_on_disk:              # Caller does not care, just fired and forgot, so we pass              # a defer that will inmediately have its callback triggered. -            observer.callback(uid) +            reactor.callLater(0, observer.callback, uid)      def put_message(self, mbox, uid, message, notify_on_disk=True):          """ diff --git a/mail/src/leap/mail/imap/messages.py b/mail/src/leap/mail/imap/messages.py index 25fc55f..89beaaa 100644 --- a/mail/src/leap/mail/imap/messages.py +++ b/mail/src/leap/mail/imap/messages.py @@ -78,7 +78,7 @@ def try_unique_query(curried):                  # TODO we could take action, like trigger a background                  # process to kill dupes.                  name = getattr(curried, 'expected', 'doc') -                logger.warning( +                logger.debug(                      "More than one %s found for this mbox, "                      "we got a duplicate!!" % (name,))              return query.pop() @@ -720,9 +720,6 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser):          # ensure that we have a recent-flags and a hdocs-sec doc          self._get_or_create_rdoc() -        # Not for now... -        #self._get_or_create_hdocset() -      def _get_empty_doc(self, _type=FLAGS_DOC):          """          Returns an empty doc for storing different message parts. @@ -758,21 +755,26 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser):                  hdocset[fields.MBOX_KEY] = self.mbox              self._soledad.create_doc(hdocset) +    @deferred_to_thread      def _do_parse(self, raw):          """          Parse raw message and return it along with          relevant information about its outer level. +        This is done in a separate thread, and the callback is passed +        to `_do_add_msg` method. +          :param raw: the raw message          :type raw: StringIO or basestring -        :return: msg, chash, size, multi +        :return: msg, parts, chash, size, multi          :rtype: tuple          """          msg = self._get_parsed_msg(raw)          chash = self._get_hash(msg)          size = len(msg.as_string())          multi = msg.is_multipart() -        return msg, chash, size, multi +        parts = walk.get_parts(msg) +        return msg, parts, chash, size, multi      def _populate_flags(self, flags, uid, chash, size, multi):          """ @@ -879,19 +881,25 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser):              flags = tuple()          leap_assert_type(flags, tuple) -        d = defer.Deferred() -        self._do_add_msg(raw, flags, subject, date, notify_on_disk, d) -        return d +        observer = defer.Deferred() + +        d = self._do_parse(raw) +        d.addCallback(self._do_add_msg, flags, subject, date, +                      notify_on_disk, observer) +        return observer -    # We SHOULD defer this (or the heavy load here) to the thread pool, +    # We SHOULD defer the heavy load here) to the thread pool,      # but it gives troubles with the QSocketNotifier used by Qt... -    def _do_add_msg(self, raw, flags, subject, date, notify_on_disk, observer): +    def _do_add_msg(self, parse_result, flags, subject, +                    date, notify_on_disk, observer):          """          Helper that creates a new message document.          Here lives the magic of the leap mail. Well, in soledad, really.          See `add_msg` docstring for parameter info. +        :param parse_result: a tuple with the results of `self._do_parse` +        :type parse_result: tuple          :param observer: a deferred that will be fired with the message                           uid when the adding succeed.          :type observer: deferred @@ -902,26 +910,17 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser):          # TODO add the linked-from info !          # TODO add reference to the original message -        # parse -        msg, chash, size, multi = self._do_parse(raw) +        from twisted.internet import reactor +        msg, parts, chash, size, multi = parse_result          # check for uniqueness -------------------------------- -        # XXX profiler says that this test is costly. -        # So we probably should just do an in-memory check and -        # move the complete check to the soledad writer?          # Watch out! We're reserving a UID right after this!          existing_uid = self._fdoc_already_exists(chash)          if existing_uid: -            logger.warning("We already have that message in this " -                           "mailbox, unflagging as deleted")              uid = existing_uid              msg = self.get_msg_by_uid(uid) -            msg.setFlags((fields.DELETED_FLAG,), -1) - -            # XXX if this is deferred to thread again we should not use -            # the callback in the deferred thread, but return and -            # call the callback from the caller fun... -            observer.callback(uid) +            reactor.callLater(0, msg.setFlags, (fields.DELETED_FLAG,), -1) +            reactor.callLater(0, observer.callback, uid)              return          uid = self.memstore.increment_last_soledad_uid(self.mbox) @@ -930,7 +929,6 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser):          fd = self._populate_flags(flags, uid, chash, size, multi)          hd = self._populate_headr(msg, chash, subject, date) -        parts = walk.get_parts(msg)          body_phash_fun = [walk.get_body_phash_simple,                            walk.get_body_phash_multi][int(multi)]          body_phash = body_phash_fun(walk.get_payloads(msg)) @@ -949,9 +947,9 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser):          self.set_recent_flag(uid)          msg_container = MessageWrapper(fd, hd, cdocs) -        self.memstore.create_message(self.mbox, uid, msg_container, -                                     observer=observer, -                                     notify_on_disk=notify_on_disk) +        self.memstore.create_message( +            self.mbox, uid, msg_container, +            observer=observer, notify_on_disk=notify_on_disk)      #      # getters: specific queries @@ -982,14 +980,6 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser):                          {'doc_id': rdoc.doc_id, 'set': rflags})              return rflags -        #else: -            # fallback for cases without memory store -            #with self._rdoc_lock: -                #rdoc = self._get_recent_doc() -                #self.__rflags = set(rdoc.content.get( -                    #fields.RECENTFLAGS_KEY, [])) -            #return self.__rflags -      def _set_recent_flags(self, value):          """          Setter for the recent-flags set for this mailbox. @@ -997,16 +987,6 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser):          if self.memstore is not None:              self.memstore.set_recent_flags(self.mbox, value) -        #else: -            # fallback for cases without memory store -            #with self._rdoc_lock: -                #rdoc = self._get_recent_doc() -                #newv = set(value) -                #self.__rflags = newv -                #rdoc.content[fields.RECENTFLAGS_KEY] = list(newv) -                # XXX should deferLater 0 it? -                #self._soledad.put_doc(rdoc) -      recent_flags = property(          _get_recent_flags, _set_recent_flags,          doc="Set of UIDs with the recent flag for this mailbox.") | 
