From 90f4338da088394ade1663871a23b8fb0a4c0d66 Mon Sep 17 00:00:00 2001 From: Kali Kaneko Date: Wed, 15 Jan 2014 17:05:24 -0400 Subject: Performance improvement on FLAGS-only FETCH * Compute the intersection set of the uids on a FETCH, so we avoid iterating through the non-existant UIDs. * Dispatch FLAGS query to our specialized method, that fetches all the flags documents and return objects that only specify one subset of the MessagePart interface, apt to render flags quickly with less queries overhead. * Overwrite the do_FETCH command in the imap Server to use fetch_flags. * Use deferLater for a better dispatch of tasks in the reactor. --- src/leap/mail/imap/mailbox.py | 94 +++++++++++++++++++++++++++++--------- src/leap/mail/imap/messages.py | 11 +---- src/leap/mail/imap/service/imap.py | 37 ++++++++++++++- 3 files changed, 109 insertions(+), 33 deletions(-) (limited to 'src/leap') diff --git a/src/leap/mail/imap/mailbox.py b/src/leap/mail/imap/mailbox.py index cd782b2..94070ac 100644 --- a/src/leap/mail/imap/mailbox.py +++ b/src/leap/mail/imap/mailbox.py @@ -20,13 +20,13 @@ Soledad Mailbox. import copy import threading import logging -import time import StringIO import cStringIO from collections import defaultdict from twisted.internet import defer +from twisted.internet.task import deferLater from twisted.python import log from twisted.mail import imap4 @@ -59,7 +59,7 @@ class SoledadMailbox(WithMsgFields, MBoxParser): imap4.IMessageCopier) # XXX should finish the implementation of IMailboxListener - # XXX should complately implement ISearchableMailbox too + # XXX should completely implement ISearchableMailbox too messages = None _closed = False @@ -467,15 +467,16 @@ class SoledadMailbox(WithMsgFields, MBoxParser): return d @deferred - def fetch(self, messages, uid): + def fetch(self, messages_asked, uid): """ Retrieve one or more messages in this mailbox. from rfc 3501: The data items to be fetched can be either a single atom or a parenthesized list. - :param messages: IDs of the messages to retrieve information about - :type messages: MessageSet + :param messages_asked: IDs of the messages to retrieve information + about + :type messages_asked: MessageSet :param uid: If true, the IDs are UIDs. They are message sequence IDs otherwise. @@ -484,7 +485,7 @@ class SoledadMailbox(WithMsgFields, MBoxParser): :rtype: A tuple of two-tuples of message sequence numbers and LeapMessage """ - result = [] + from twisted.internet import reactor # For the moment our UID is sequential, so we # can treat them all the same. @@ -494,12 +495,17 @@ class SoledadMailbox(WithMsgFields, MBoxParser): sequence = False #sequence = True if uid == 0 else False - if not messages.last: + if not messages_asked.last: try: - iter(messages) + iter(messages_asked) except TypeError: # looks like we cannot iterate - messages.last = self.last_uid + messages_asked.last = self.last_uid + + set_asked = set(messages_asked) + set_exist = set(self.messages.all_uid_iter()) + seq_messg = set_asked.intersection(set_exist) + getmsg = lambda msgid: self.messages.get_msg_by_uid(msgid) # for sequence numbers (uid = 0) if sequence: @@ -507,20 +513,68 @@ class SoledadMailbox(WithMsgFields, MBoxParser): raise NotImplementedError else: - for msg_id in messages: - msg = self.messages.get_msg_by_uid(msg_id) - if msg: - result.append((msg_id, msg)) - else: - logger.debug("fetch %s, no msg found!!!" % msg_id) + result = ((msgid, getmsg(msgid)) for msgid in seq_messg) if self.isWriteable(): + deferLater(reactor, 30, self._unset_recent_flag) + # XXX I should rewrite the scheduler so it handles a + # set of queues with different priority. self._unset_recent_flag() - self._signal_unread_to_ui() - # XXX workaround for hangs in thunderbird - #return tuple(result[:100]) # --- doesn't show all!! - return tuple(result) + # this should really be called as a final callback of + # the do_FETCH method... + deferLater(reactor, 1, self._signal_unread_to_ui) + return result + + @deferred + def fetch_flags(self, messages_asked, uid): + """ + A fast method to fetch all flags, tricking just the + needed subset of the MIME interface that's needed to satisfy + a generic FLAGS query. + Given how LEAP Mail is supposed to work without local cache, + this query is going to be quite common, and also we expect + it to be in the form 1:* at the beginning of a session, so + it's not bad to fetch all the flags doc at once. + + :param messages_asked: IDs of the messages to retrieve information + about + :type messages_asked: MessageSet + + :param uid: If true, the IDs are UIDs. They are message sequence IDs + otherwise. + :type uid: bool + + :return: A tuple of two-tuples of message sequence numbers and + flagsPart, which is a only a partial implementation of + MessagePart. + :rtype: tuple + """ + class flagsPart(object): + def __init__(self, uid, flags): + self.uid = uid + self.flags = flags + + def getUID(self): + return self.uid + + def getFlags(self): + return map(str, self.flags) + + if not messages_asked.last: + try: + iter(messages_asked) + except TypeError: + # looks like we cannot iterate + messages_asked.last = self.last_uid + + set_asked = set(messages_asked) + set_exist = set(self.messages.all_uid_iter()) + seq_messg = set_asked.intersection(set_exist) + all_flags = self.messages.all_flags() + result = ((msgid, flagsPart( + msgid, all_flags[msgid])) for msgid in seq_messg) + return result @deferred def _unset_recent_flag(self): @@ -549,8 +603,6 @@ class SoledadMailbox(WithMsgFields, MBoxParser): # 3. Route it through a queue with lesser priority than the # regularar writer. - # hmm let's try 2. in a quickndirty way... - time.sleep(1) log.msg('unsetting recent flags...') for msg in self.messages.get_recent(): msg.removeFlags((fields.RECENT_FLAG,)) diff --git a/src/leap/mail/imap/messages.py b/src/leap/mail/imap/messages.py index b35b808..22de356 100644 --- a/src/leap/mail/imap/messages.py +++ b/src/leap/mail/imap/messages.py @@ -662,17 +662,8 @@ class LeapMessage(fields, MailParser, MBoxParser): result = first(flag_docs) except Exception as exc: # ugh! Something's broken down there! - logger.warning("FUCKING ERROR ----- getting for UID:", self._uid) + logger.warning("ERROR while getting flags for UID: %s" % self._uid) logger.exception(exc) - try: - flag_docs = self._soledad.get_from_index( - fields.TYPE_MBOX_UID_IDX, - fields.TYPE_FLAGS_VAL, self._mbox, str(self._uid)) - result = first(flag_docs) - except Exception as exc: - # ugh! Something's broken down there! - logger.warning("FUCKING ERROR, 2nd time -----") - logger.exception(exc) finally: return result diff --git a/src/leap/mail/imap/service/imap.py b/src/leap/mail/imap/service/imap.py index c48e5c5..e877869 100644 --- a/src/leap/mail/imap/service/imap.py +++ b/src/leap/mail/imap/service/imap.py @@ -22,6 +22,7 @@ from copy import copy import logging from twisted.internet.protocol import ServerFactory +from twisted.internet.defer import maybeDeferred from twisted.internet.error import CannotListenError from twisted.mail import imap4 from twisted.python import log @@ -78,7 +79,6 @@ class LeapIMAPServer(imap4.IMAP4Server): :param line: the line from the server, without the line delimiter. :type line: str """ - print "RECV: STATE (%s)" % self.state if self.theAccount.closed is True and self.state != "unauth": log.msg("Closing the session. State: unauth") self.state = "unauth" @@ -89,7 +89,7 @@ class LeapIMAPServer(imap4.IMAP4Server): msg = line[:7] + " [...]" else: msg = copy(line) - log.msg('rcv: %s' % msg) + log.msg('rcv (%s): %s' % (self.state, msg)) imap4.IMAP4Server.lineReceived(self, line) def authenticateLogin(self, username, password): @@ -111,6 +111,39 @@ class LeapIMAPServer(imap4.IMAP4Server): leap_events.signal(IMAP_CLIENT_LOGIN, "1") return imap4.IAccount, self.theAccount, lambda: None + def do_FETCH(self, tag, messages, query, uid=0): + """ + Overwritten fetch dispatcher to use the fast fetch_flags + method + """ + log.msg("LEAP Overwritten fetch...") + if not query: + self.sendPositiveResponse(tag, 'FETCH complete') + return # XXX ??? + + cbFetch = self._IMAP4Server__cbFetch + ebFetch = self._IMAP4Server__ebFetch + + if str(query[0]) == "flags": + self._oldTimeout = self.setTimeout(None) + # no need to call iter, we get a generator + maybeDeferred( + self.mbox.fetch_flags, messages, uid=uid + ).addCallback( + cbFetch, tag, query, uid + ).addErrback(ebFetch, tag) + else: + self._oldTimeout = self.setTimeout(None) + # no need to call iter, we get a generator + maybeDeferred( + self.mbox.fetch, messages, uid=uid + ).addCallback( + cbFetch, tag, query, uid + ).addErrback(ebFetch, tag) + + select_FETCH = (do_FETCH, imap4.IMAP4Server.arg_seqset, + imap4.IMAP4Server.arg_fetchatt) + class IMAPAuthRealm(object): """ -- cgit v1.2.3