From 9ef1cd79397d811575826025b924c615e6ce2aa4 Mon Sep 17 00:00:00 2001 From: Kali Kaneko Date: Fri, 17 Jan 2014 02:51:31 -0400 Subject: Add a fetch_headers for mass-header fetch queries --- src/leap/mail/imap/fields.py | 2 + src/leap/mail/imap/mailbox.py | 76 +++++++++++++++--- src/leap/mail/imap/messages.py | 159 +++++++++++++++++++++++++++++++++++-- src/leap/mail/imap/service/imap.py | 12 ++- 4 files changed, 231 insertions(+), 18 deletions(-) (limited to 'src/leap/mail') diff --git a/src/leap/mail/imap/fields.py b/src/leap/mail/imap/fields.py index bc928a1..886ee63 100644 --- a/src/leap/mail/imap/fields.py +++ b/src/leap/mail/imap/fields.py @@ -61,6 +61,7 @@ class WithMsgFields(object): RW_KEY = "rw" LAST_UID_KEY = "lastuid" RECENTFLAGS_KEY = "rct" + HDOCS_SET_KEY = "hdocset" # Document Type, for indexing TYPE_KEY = "type" @@ -69,6 +70,7 @@ class WithMsgFields(object): TYPE_HEADERS_VAL = "head" TYPE_CONTENT_VAL = "cnt" TYPE_RECENT_VAL = "rct" + TYPE_HDOCS_SET_VAL = "hdocset" INBOX_VAL = "inbox" diff --git a/src/leap/mail/imap/mailbox.py b/src/leap/mail/imap/mailbox.py index bd69d12..b186e75 100644 --- a/src/leap/mail/imap/mailbox.py +++ b/src/leap/mail/imap/mailbox.py @@ -466,6 +466,10 @@ class SoledadMailbox(WithMsgFields, MBoxParser): d = self.messages.remove_all_deleted() d.addCallback(self._expunge_cb) d.addCallback(self.messages.reset_last_uid) + + # XXX DEBUG ------------------- + # FIXME !!! + # XXX should remove the hdocset too!!! return d def _bound_seq(self, messages_asked): @@ -520,8 +524,6 @@ class SoledadMailbox(WithMsgFields, MBoxParser): :rtype: A tuple of two-tuples of message sequence numbers and LeapMessage """ - from twisted.internet import reactor - # For the moment our UID is sequential, so we # can treat them all the same. # Change this to the flag that twisted expects when we @@ -532,20 +534,14 @@ class SoledadMailbox(WithMsgFields, MBoxParser): messages_asked = self._bound_seq(messages_asked) seq_messg = self._filter_msg_seq(messages_asked) - getmsg = lambda uid: self.messages.get_msg_by_uid(uid) # for sequence numbers (uid = 0) if sequence: logger.debug("Getting msg by index: INEFFICIENT call!") raise NotImplementedError - else: result = ((msgid, getmsg(msgid)) for msgid in seq_messg) - - # this should really be called as a final callback of - # the do_FETCH method... - return result @deferred @@ -558,7 +554,7 @@ class SoledadMailbox(WithMsgFields, MBoxParser): Given how LEAP Mail is supposed to work without local cache, this query is going to be quite common, and also we expect it to be in the form 1:* at the beginning of a session, so - it's not bad to fetch all the flags doc at once. + it's not bad to fetch all the FLAGS docs at once. :param messages_asked: IDs of the messages to retrieve information about @@ -592,6 +588,55 @@ class SoledadMailbox(WithMsgFields, MBoxParser): msgid, all_flags[msgid])) for msgid in seq_messg) return result + @deferred + def fetch_headers(self, messages_asked, uid): + """ + A fast method to fetch all headers, tricking just the + needed subset of the MIME interface that's needed to satisfy + a generic HEADERS query. + + Given how LEAP Mail is supposed to work without local cache, + this query is going to be quite common, and also we expect + it to be in the form 1:* at the beginning of a session, so + **MAYBE** it's not too bad to fetch all the HEADERS docs at once. + + :param messages_asked: IDs of the messages to retrieve information + about + :type messages_asked: MessageSet + + :param uid: If true, the IDs are UIDs. They are message sequence IDs + otherwise. + :type uid: bool + + :return: A tuple of two-tuples of message sequence numbers and + headersPart, which is a only a partial implementation of + MessagePart. + :rtype: tuple + """ + class headersPart(object): + def __init__(self, uid, headers): + self.uid = uid + self.headers = headers + + def getUID(self): + return self.uid + + def getHeaders(self, _): + return dict( + (str(key), str(value)) + for key, value in + self.headers.items()) + + messages_asked = self._bound_seq(messages_asked) + seq_messg = self._filter_msg_seq(messages_asked) + + all_chash = self.messages.all_flags_chash() + all_headers = self.messages.all_headers() + result = ((msgid, headersPart( + msgid, all_headers.get(all_chash.get(msgid, 'nil'), {}))) + for msgid in seq_messg) + return result + def signal_unread_to_ui(self): """ Sends unread event to ui. @@ -629,7 +674,6 @@ class SoledadMailbox(WithMsgFields, MBoxParser): :raise ReadOnlyMailbox: Raised if this mailbox is not open for read-write. """ - from twisted.internet import reactor # XXX implement also sequence (uid = 0) # XXX we should prevent cclient from setting Recent flag. leap_assert(not isinstance(flags, basestring), @@ -657,10 +701,13 @@ class SoledadMailbox(WithMsgFields, MBoxParser): msg.setFlags(flags) result[msg_id] = msg.getFlags() + # After changing flags, we want to signal again to the + # UI because the number of unread might have changed. + # Hoever, we should probably limit this to INBOX only? # this should really be called as a final callback of # the do_STORE method... - # XXX --- - #deferLater(reactor, 1, self._signal_unread_to_ui) + from twisted.internet import reactor + deferLater(reactor, 1, self._signal_unread_to_ui) return result # ISearchableMailbox @@ -727,6 +774,11 @@ class SoledadMailbox(WithMsgFields, MBoxParser): new_fdoc[self.UID_KEY] = uid_next new_fdoc[self.MBOX_KEY] = self.mbox self._do_add_doc(new_fdoc) + + # XXX should use a public api instead + hdoc = msg._hdoc + self.messages.add_hdocset_docid(hdoc.doc_id) + deferLater(reactor, 1, self.notify_new) def _do_add_doc(self, doc): diff --git a/src/leap/mail/imap/messages.py b/src/leap/mail/imap/messages.py index f968c47..7a21009 100644 --- a/src/leap/mail/imap/messages.py +++ b/src/leap/mail/imap/messages.py @@ -964,10 +964,30 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser, FLAGS_DOC = "FLAGS" HEADERS_DOC = "HEADERS" CONTENT_DOC = "CONTENT" + """ + RECENT_DOC is a document that stores a list of the UIDs + with the recent flag for this mailbox. It deserves a special treatment + because: + (1) it cannot be set by the user + (2) it's a flag that we set inmediately after a fetch, which is quite + often. + (3) we need to be able to set/unset it in batches without doing a single + write for each element in the sequence. + """ RECENT_DOC = "RECENT" + """ + HDOCS_SET_DOC is a document that stores a set of the Document-IDs + (the u1db index) for all the headers documents for a given mailbox. + We use it to prefetch massively all the headers for a mailbox. + This is the second massive query, after fetching all the FLAGS, that + a MUA will do in a case where we do not have local disk cache. + """ + HDOCS_SET_DOC = "HDOCS_SET" templates = { + # Message Level + FLAGS_DOC: { fields.TYPE_KEY: fields.TYPE_FLAGS_VAL, fields.UID_KEY: 1, # XXX moe to a local table @@ -1007,14 +1027,25 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser, fields.MULTIPART_KEY: False, }, + # Mailbox Level + RECENT_DOC: { fields.TYPE_KEY: fields.TYPE_RECENT_VAL, fields.MBOX_KEY: fields.INBOX_VAL, fields.RECENTFLAGS_KEY: [], + }, + + HDOCS_SET_DOC: { + fields.TYPE_KEY: fields.TYPE_HDOCS_SET_VAL, + fields.MBOX_KEY: fields.INBOX_VAL, + fields.HDOCS_SET_KEY: [], } + + } _rdoc_lock = threading.Lock() + _hdocset_lock = threading.Lock() def __init__(self, mbox=None, soledad=None): """ @@ -1045,10 +1076,12 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser, self.mbox = self._parse_mailbox_name(mbox) self._soledad = soledad self.__rflags = None + self.__hdocset = None self.initialize_db() - # ensure that we have a recent-flags doc + # ensure that we have a recent-flags and a hdocs-sec doc self._get_or_create_rdoc() + self._get_or_create_hdocset() def _get_empty_doc(self, _type=FLAGS_DOC): """ @@ -1073,6 +1106,18 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser, rdoc[fields.MBOX_KEY] = self.mbox self._soledad.create_doc(rdoc) + def _get_or_create_hdocset(self): + """ + Try to retrieve the hdocs-set doc for this MessageCollection, + and create one if not found. + """ + hdocset = self._get_hdocset_doc() + if not hdocset: + hdocset = self._get_empty_doc(self.HDOCS_SET_DOC) + if self.mbox != fields.INBOX_VAL: + hdocset[fields.MBOX_KEY] = self.mbox + self._soledad.create_doc(hdocset) + def _do_parse(self, raw): """ Parse raw message and return it along with @@ -1222,10 +1267,12 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser, # first, regular docs: flags and headers self._soledad.create_doc(fd) - # XXX should check for content duplication on headers too # but with chash. !!! - self._soledad.create_doc(hd) + hdoc = self._soledad.create_doc(hd) + # We add the newly created hdoc to the fast-access set of + # headers documents associated with the mailbox. + self.add_hdocset_docid(hdoc.doc_id) # and last, but not least, try to create # content docs if not already there. @@ -1258,7 +1305,11 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser, d.addCallback(self._remove_cb) return d + # # getters: specific queries + # + + # recent flags def _get_recent_flags(self): """ @@ -1310,14 +1361,85 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser, def _get_recent_doc(self): """ - Get recent-flags document for this inbox. + Get recent-flags document for this mailbox. """ curried = partial( self._soledad.get_from_index, fields.TYPE_MBOX_IDX, fields.TYPE_RECENT_VAL, self.mbox) curried.expected = "rdoc" - return try_unique_query(curried) + with self._rdoc_lock: + return try_unique_query(curried) + + # headers-docs-set + + def _get_hdocset(self): + """ + An accessor for the hdocs-set for this mailbox. + """ + if not self.__hdocset: + hdocset_doc = self._get_hdocset_doc() + value = set(hdocset_doc.content.get( + fields.HDOCS_SET_KEY, [])) + self.__hdocset = value + return self.__hdocset + + def _set_hdocset(self, value): + """ + Setter for the hdocs-set for this mailbox. + """ + hdocset_doc = self._get_hdocset_doc() + newv = set(value) + self.__hdocset = newv + + with self._hdocset_lock: + hdocset_doc.content[fields.HDOCS_SET_KEY] = list(newv) + # XXX should deferLater 0 it? + self._soledad.put_doc(hdocset_doc) + + _hdocset = property( + _get_hdocset, _set_hdocset, + doc="Set of Document-IDs for the headers docs associated " + "with this mailbox.") + + def _get_hdocset_doc(self): + """ + Get hdocs-set document for this mailbox. + """ + curried = partial( + self._soledad.get_from_index, + fields.TYPE_MBOX_IDX, + fields.TYPE_HDOCS_SET_VAL, self.mbox) + curried.expected = "hdocset" + with self._hdocset_lock: + hdocset_doc = try_unique_query(curried) + return hdocset_doc + + def remove_hdocset_docids(self, docids): + """ + Remove the given document IDs from the set of + header-documents associated with this mailbox. + """ + self._hdocset = self._hdocset.difference( + set(docids)) + + def remove_hdocset_docid(self, docid): + """ + Remove the given document ID from the set of + header-documents associated with this mailbox. + """ + self._hdocset = self._hdocset.difference( + set([docid])) + + def add_hdocset_docid(self, docid): + """ + Add the given document ID to the set of + header-documents associated with this mailbox. + """ + hdocset = self._hdocset + self._hdocset = hdocset.union(set([docid])) + + # individual doc getters, message layer. def _get_fdoc_from_chash(self, chash): """ @@ -1456,6 +1578,30 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser, fields.TYPE_FLAGS_VAL, self.mbox))) return all_flags + def all_flags_chash(self): + """ + Return a dict with the content-hash for all flag documents + for this mailbox. + """ + all_flags_chash = dict((( + doc.content[self.UID_KEY], + doc.content[self.CONTENT_HASH_KEY]) for doc in + self._soledad.get_from_index( + fields.TYPE_MBOX_IDX, + fields.TYPE_FLAGS_VAL, self.mbox))) + return all_flags_chash + + def all_headers(self): + """ + Return a dict with all the headers documents for this + mailbox. + """ + all_headers = dict((( + doc.content[self.CONTENT_HASH_KEY], + doc.content[self.HEADERS_KEY]) for doc in + self._soledad.get_docs(self._hdocset))) + return all_headers + def count(self): """ Return the count of messages for this mailbox. @@ -1509,6 +1655,9 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser, def count_recent(self): """ Count all messages with the `Recent` flag. + It just retrieves the length of the recent_flags set, + which is stored in a specific type of document for + this collection. :returns: count :rtype: int diff --git a/src/leap/mail/imap/service/imap.py b/src/leap/mail/imap/service/imap.py index a3ef098..a1d3ab7 100644 --- a/src/leap/mail/imap/service/imap.py +++ b/src/leap/mail/imap/service/imap.py @@ -123,6 +123,9 @@ class LeapIMAPServer(imap4.IMAP4Server): self.sendPositiveResponse(tag, 'FETCH complete') return # XXX ??? + print "QUERY ", query + print query[0] + cbFetch = self._IMAP4Server__cbFetch ebFetch = self._IMAP4Server__ebFetch @@ -134,6 +137,14 @@ class LeapIMAPServer(imap4.IMAP4Server): ).addCallback( cbFetch, tag, query, uid ).addErrback(ebFetch, tag) + elif len(query) == 1 and str(query[0]) == "rfc822.header": + self._oldTimeout = self.setTimeout(None) + # no need to call iter, we get a generator + maybeDeferred( + self.mbox.fetch_headers, messages, uid=uid + ).addCallback( + cbFetch, tag, query, uid + ).addErrback(ebFetch, tag) else: self._oldTimeout = self.setTimeout(None) # no need to call iter, we get a generator @@ -198,7 +209,6 @@ class LeapIMAPServer(imap4.IMAP4Server): self.mbox = mbox - class IMAPAuthRealm(object): """ Dummy authentication realm. Do not use in production! -- cgit v1.2.3