diff options
Diffstat (limited to 'mail/src/leap')
-rw-r--r-- | mail/src/leap/mail/imap/fields.py | 4 | ||||
-rw-r--r-- | mail/src/leap/mail/imap/mailbox.py | 197 | ||||
-rw-r--r-- | mail/src/leap/mail/imap/messages.py | 405 | ||||
-rw-r--r-- | mail/src/leap/mail/imap/service/imap.py | 68 |
4 files changed, 510 insertions, 164 deletions
diff --git a/mail/src/leap/mail/imap/fields.py b/mail/src/leap/mail/imap/fields.py index 3d2ac92..886ee63 100644 --- a/mail/src/leap/mail/imap/fields.py +++ b/mail/src/leap/mail/imap/fields.py @@ -60,6 +60,8 @@ class WithMsgFields(object): SUBSCRIBED_KEY = "subscribed" RW_KEY = "rw" LAST_UID_KEY = "lastuid" + RECENTFLAGS_KEY = "rct" + HDOCS_SET_KEY = "hdocset" # Document Type, for indexing TYPE_KEY = "type" @@ -67,6 +69,8 @@ class WithMsgFields(object): TYPE_FLAGS_VAL = "flags" TYPE_HEADERS_VAL = "head" TYPE_CONTENT_VAL = "cnt" + TYPE_RECENT_VAL = "rct" + TYPE_HDOCS_SET_VAL = "hdocset" INBOX_VAL = "inbox" diff --git a/mail/src/leap/mail/imap/mailbox.py b/mail/src/leap/mail/imap/mailbox.py index 94070ac..b186e75 100644 --- a/mail/src/leap/mail/imap/mailbox.py +++ b/mail/src/leap/mail/imap/mailbox.py @@ -398,18 +398,19 @@ class SoledadMailbox(WithMsgFields, MBoxParser): flags = tuple(str(flag) for flag in flags) d = self._do_add_message(message, flags=flags, date=date, uid=uid_next) - d.addCallback(self._notify_new) return d - @deferred def _do_add_message(self, message, flags, date, uid): """ Calls to the messageCollection add_msg method (deferred to thread). Invoked from addMessage. """ - self.messages.add_msg(message, flags=flags, date=date, uid=uid) + d = self.messages.add_msg(message, flags=flags, date=date, uid=uid) + # XXX notify after batch APPEND? + d.addCallback(self.notify_new) + return d - def _notify_new(self, *args): + def notify_new(self, *args): """ Notify of new messages to all the listeners. @@ -464,8 +465,46 @@ class SoledadMailbox(WithMsgFields, MBoxParser): raise imap4.ReadOnlyMailbox d = self.messages.remove_all_deleted() d.addCallback(self._expunge_cb) + d.addCallback(self.messages.reset_last_uid) + + # XXX DEBUG ------------------- + # FIXME !!! + # XXX should remove the hdocset too!!! return d + def _bound_seq(self, messages_asked): + """ + Put an upper bound to a messages sequence if this is open. + + :param messages_asked: IDs of the messages. + :type messages_asked: MessageSet + :rtype: MessageSet + """ + if not messages_asked.last: + try: + iter(messages_asked) + except TypeError: + # looks like we cannot iterate + try: + messages_asked.last = self.last_uid + except ValueError: + pass + return messages_asked + + def _filter_msg_seq(self, messages_asked): + """ + Filter a message sequence returning only the ones that do exist in the + collection. + + :param messages_asked: IDs of the messages. + :type messages_asked: MessageSet + :rtype: set + """ + set_asked = set(messages_asked) + set_exist = set(self.messages.all_uid_iter()) + seq_messg = set_asked.intersection(set_exist) + return seq_messg + @deferred def fetch(self, messages_asked, uid): """ @@ -485,8 +524,6 @@ class SoledadMailbox(WithMsgFields, MBoxParser): :rtype: A tuple of two-tuples of message sequence numbers and LeapMessage """ - from twisted.internet import reactor - # For the moment our UID is sequential, so we # can treat them all the same. # Change this to the flag that twisted expects when we @@ -495,35 +532,16 @@ class SoledadMailbox(WithMsgFields, MBoxParser): sequence = False #sequence = True if uid == 0 else False - if not messages_asked.last: - try: - iter(messages_asked) - except TypeError: - # looks like we cannot iterate - messages_asked.last = self.last_uid - - set_asked = set(messages_asked) - set_exist = set(self.messages.all_uid_iter()) - seq_messg = set_asked.intersection(set_exist) - getmsg = lambda msgid: self.messages.get_msg_by_uid(msgid) + messages_asked = self._bound_seq(messages_asked) + seq_messg = self._filter_msg_seq(messages_asked) + getmsg = lambda uid: self.messages.get_msg_by_uid(uid) # for sequence numbers (uid = 0) if sequence: logger.debug("Getting msg by index: INEFFICIENT call!") raise NotImplementedError - else: result = ((msgid, getmsg(msgid)) for msgid in seq_messg) - - if self.isWriteable(): - deferLater(reactor, 30, self._unset_recent_flag) - # XXX I should rewrite the scheduler so it handles a - # set of queues with different priority. - self._unset_recent_flag() - - # this should really be called as a final callback of - # the do_FETCH method... - deferLater(reactor, 1, self._signal_unread_to_ui) return result @deferred @@ -532,10 +550,11 @@ class SoledadMailbox(WithMsgFields, MBoxParser): A fast method to fetch all flags, tricking just the needed subset of the MIME interface that's needed to satisfy a generic FLAGS query. + Given how LEAP Mail is supposed to work without local cache, this query is going to be quite common, and also we expect it to be in the form 1:* at the beginning of a session, so - it's not bad to fetch all the flags doc at once. + it's not bad to fetch all the FLAGS docs at once. :param messages_asked: IDs of the messages to retrieve information about @@ -561,55 +580,64 @@ class SoledadMailbox(WithMsgFields, MBoxParser): def getFlags(self): return map(str, self.flags) - if not messages_asked.last: - try: - iter(messages_asked) - except TypeError: - # looks like we cannot iterate - messages_asked.last = self.last_uid + messages_asked = self._bound_seq(messages_asked) + seq_messg = self._filter_msg_seq(messages_asked) - set_asked = set(messages_asked) - set_exist = set(self.messages.all_uid_iter()) - seq_messg = set_asked.intersection(set_exist) all_flags = self.messages.all_flags() result = ((msgid, flagsPart( msgid, all_flags[msgid])) for msgid in seq_messg) return result @deferred - def _unset_recent_flag(self): + def fetch_headers(self, messages_asked, uid): """ - Unsets `Recent` flag from a tuple of messages. - Called from fetch. + A fast method to fetch all headers, tricking just the + needed subset of the MIME interface that's needed to satisfy + a generic HEADERS query. + + Given how LEAP Mail is supposed to work without local cache, + this query is going to be quite common, and also we expect + it to be in the form 1:* at the beginning of a session, so + **MAYBE** it's not too bad to fetch all the HEADERS docs at once. - From RFC, about `Recent`: + :param messages_asked: IDs of the messages to retrieve information + about + :type messages_asked: MessageSet - Message is "recently" arrived in this mailbox. This session - is the first session to have been notified about this - message; if the session is read-write, subsequent sessions - will not see \Recent set for this message. This flag can not - be altered by the client. + :param uid: If true, the IDs are UIDs. They are message sequence IDs + otherwise. + :type uid: bool - If it is not possible to determine whether or not this - session is the first session to be notified about a message, - then that message SHOULD be considered recent. + :return: A tuple of two-tuples of message sequence numbers and + headersPart, which is a only a partial implementation of + MessagePart. + :rtype: tuple """ - # TODO this fucker, for the sake of correctness, is messing with - # the whole collection of flag docs. + class headersPart(object): + def __init__(self, uid, headers): + self.uid = uid + self.headers = headers - # Possible ways of action: - # 1. Ignore it, we want fun. - # 2. Trigger it with a delay - # 3. Route it through a queue with lesser priority than the - # regularar writer. + def getUID(self): + return self.uid - log.msg('unsetting recent flags...') - for msg in self.messages.get_recent(): - msg.removeFlags((fields.RECENT_FLAG,)) - self._signal_unread_to_ui() + def getHeaders(self, _): + return dict( + (str(key), str(value)) + for key, value in + self.headers.items()) - @deferred - def _signal_unread_to_ui(self): + messages_asked = self._bound_seq(messages_asked) + seq_messg = self._filter_msg_seq(messages_asked) + + all_chash = self.messages.all_flags_chash() + all_headers = self.messages.all_headers() + result = ((msgid, headersPart( + msgid, all_headers.get(all_chash.get(msgid, 'nil'), {}))) + for msgid in seq_messg) + return result + + def signal_unread_to_ui(self): """ Sends unread event to ui. """ @@ -617,7 +645,7 @@ class SoledadMailbox(WithMsgFields, MBoxParser): leap_events.signal(IMAP_UNREAD_MAIL, str(unseen)) @deferred - def store(self, messages, flags, mode, uid): + def store(self, messages_asked, flags, mode, uid): """ Sets the flags of one or more messages. @@ -652,19 +680,19 @@ class SoledadMailbox(WithMsgFields, MBoxParser): "flags cannot be a string") flags = tuple(flags) + messages_asked = self._bound_seq(messages_asked) + seq_messg = self._filter_msg_seq(messages_asked) + if not self.isWriteable(): log.msg('read only mailbox!') raise imap4.ReadOnlyMailbox - if not messages.last: - messages.last = self.messages.count() - result = {} - for msg_id in messages: + for msg_id in seq_messg: log.msg("MSG ID = %s" % msg_id) msg = self.messages.get_msg_by_uid(msg_id) if not msg: - return result + continue if mode == 1: msg.addFlags(flags) elif mode == -1: @@ -673,7 +701,13 @@ class SoledadMailbox(WithMsgFields, MBoxParser): msg.setFlags(flags) result[msg_id] = msg.getFlags() - self._signal_unread_to_ui() + # After changing flags, we want to signal again to the + # UI because the number of unread might have changed. + # Hoever, we should probably limit this to INBOX only? + # this should really be called as a final callback of + # the do_STORE method... + from twisted.internet import reactor + deferLater(reactor, 1, self._signal_unread_to_ui) return result # ISearchableMailbox @@ -726,6 +760,7 @@ class SoledadMailbox(WithMsgFields, MBoxParser): """ Copy the given message object into this mailbox. """ + from twisted.internet import reactor uid_next = self.getUIDNext() msg = messageObject @@ -738,15 +773,20 @@ class SoledadMailbox(WithMsgFields, MBoxParser): new_fdoc = copy.deepcopy(fdoc.content) new_fdoc[self.UID_KEY] = uid_next new_fdoc[self.MBOX_KEY] = self.mbox + self._do_add_doc(new_fdoc) - d = self._do_add_doc(new_fdoc) - d.addCallback(self._notify_new) + # XXX should use a public api instead + hdoc = msg._hdoc + self.messages.add_hdocset_docid(hdoc.doc_id) + + deferLater(reactor, 1, self.notify_new) - @deferred def _do_add_doc(self, doc): """ - Defers the adding of a new doc. + Defer the adding of a new doc. + :param doc: document to be created in soledad. + :type doc: dict """ self._soledad.create_doc(doc) @@ -754,12 +794,19 @@ class SoledadMailbox(WithMsgFields, MBoxParser): def deleteAllDocs(self): """ - Deletes all docs in this mailbox + Delete all docs in this mailbox """ docs = self.messages.get_all_docs() for doc in docs: self.messages._soledad.delete_doc(doc) + def unset_recent_flags(self, uids): + """ + Unset Recent flag for a sequence of UIDs. + """ + seq_messg = self._bound_seq(uids) + self.messages.unset_recent_flags(seq_messg) + def __repr__(self): """ Representation string for this mailbox. diff --git a/mail/src/leap/mail/imap/messages.py b/mail/src/leap/mail/imap/messages.py index 28bd272..d2c0950 100644 --- a/mail/src/leap/mail/imap/messages.py +++ b/mail/src/leap/mail/imap/messages.py @@ -21,9 +21,11 @@ import copy import logging import re import time +import threading import StringIO from collections import defaultdict, namedtuple +from functools import partial from twisted.mail import imap4 from twisted.internet import defer @@ -41,7 +43,7 @@ from leap.mail.decorators import deferred from leap.mail.imap.index import IndexedDB from leap.mail.imap.fields import fields, WithMsgFields from leap.mail.imap.parser import MailParser, MBoxParser -from leap.mail.messageflow import IMessageConsumer, MessageProducer +from leap.mail.messageflow import IMessageConsumer logger = logging.getLogger(__name__) @@ -65,6 +67,31 @@ def lowerdict(_dict): for key, value in _dict.items()) +def try_unique_query(curried): + """ + Try to execute a query that is expected to have a + single outcome, and log a warning if more than one document found. + + :param curried: a curried function + :type curried: callable + """ + leap_assert(callable(curried), "A callable is expected") + try: + query = curried() + if query: + if len(query) > 1: + # TODO we could take action, like trigger a background + # process to kill dupes. + name = getattr(curried, 'expected', 'doc') + logger.warning( + "More than one %s found for this mbox, " + "we got a duplicate!!" % (name,)) + return query.pop() + else: + return None + except Exception as exc: + logger.exception("Unhandled error %r" % exc) + CHARSET_PATTERN = r"""charset=([\w-]+)""" MSGID_PATTERN = r"""<([\w@.]+)>""" @@ -308,7 +335,7 @@ class LeapMessage(fields, MailParser, MBoxParser): implements(imap4.IMessage) - def __init__(self, soledad, uid, mbox): + def __init__(self, soledad, uid, mbox, collection=None): """ Initializes a LeapMessage. @@ -318,11 +345,14 @@ class LeapMessage(fields, MailParser, MBoxParser): :type uid: int or basestring :param mbox: the mbox this message belongs to :type mbox: basestring + :param collection: a reference to the parent collection object + :type collection: MessageCollection """ MailParser.__init__(self) self._soledad = soledad self._uid = int(uid) self._mbox = self._parse_mailbox_name(mbox) + self._collection = collection self.__chash = None self.__bdoc = None @@ -373,7 +403,7 @@ class LeapMessage(fields, MailParser, MBoxParser): def getUID(self): """ - Retrieve the unique identifier associated with this message + Retrieve the unique identifier associated with this Message. :return: uid for this message :rtype: int @@ -382,18 +412,26 @@ class LeapMessage(fields, MailParser, MBoxParser): def getFlags(self): """ - Retrieve the flags associated with this message + Retrieve the flags associated with this Message. :return: The flags, represented as strings :rtype: tuple """ if self._uid is None: return [] + uid = self._uid flags = [] fdoc = self._fdoc if fdoc: flags = fdoc.content.get(self.FLAGS_KEY, None) + + msgcol = self._collection + + # We treat the recent flag specially: gotten from + # a mailbox-level document. + if msgcol and uid in msgcol.recent_flags: + flags.append(fields.RECENT_FLAG) if flags: flags = map(str, flags) return tuple(flags) @@ -414,7 +452,7 @@ class LeapMessage(fields, MailParser, MBoxParser): :rtype: SoledadDocument """ leap_assert(isinstance(flags, tuple), "flags need to be a tuple") - log.msg('setting flags: %s' % (self._uid)) + log.msg('setting flags: %s (%s)' % (self._uid, flags)) doc = self._fdoc if not doc: @@ -424,7 +462,6 @@ class LeapMessage(fields, MailParser, MBoxParser): return doc.content[self.FLAGS_KEY] = flags doc.content[self.SEEN_KEY] = self.SEEN_FLAG in flags - doc.content[self.RECENT_KEY] = self.RECENT_FLAG in flags doc.content[self.DEL_KEY] = self.DELETED_FLAG in flags self._soledad.put_doc(doc) @@ -467,7 +504,8 @@ class LeapMessage(fields, MailParser, MBoxParser): :rtype: C{str} :return: An RFC822-formatted date string. """ - return str(self._hdoc.content.get(self.DATE_KEY, '')) + date = self._hdoc.content.get(self.DATE_KEY, '') + return str(date) # # IMessagePart @@ -926,9 +964,30 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser, FLAGS_DOC = "FLAGS" HEADERS_DOC = "HEADERS" CONTENT_DOC = "CONTENT" + """ + RECENT_DOC is a document that stores a list of the UIDs + with the recent flag for this mailbox. It deserves a special treatment + because: + (1) it cannot be set by the user + (2) it's a flag that we set inmediately after a fetch, which is quite + often. + (3) we need to be able to set/unset it in batches without doing a single + write for each element in the sequence. + """ + RECENT_DOC = "RECENT" + """ + HDOCS_SET_DOC is a document that stores a set of the Document-IDs + (the u1db index) for all the headers documents for a given mailbox. + We use it to prefetch massively all the headers for a mailbox. + This is the second massive query, after fetching all the FLAGS, that + a MUA will do in a case where we do not have local disk cache. + """ + HDOCS_SET_DOC = "HDOCS_SET" templates = { + # Message Level + FLAGS_DOC: { fields.TYPE_KEY: fields.TYPE_FLAGS_VAL, fields.UID_KEY: 1, # XXX moe to a local table @@ -936,7 +995,6 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser, fields.CONTENT_HASH_KEY: "", fields.SEEN_KEY: False, - fields.RECENT_KEY: True, fields.DEL_KEY: False, fields.FLAGS_KEY: [], fields.MULTIPART_KEY: False, @@ -969,12 +1027,36 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser, fields.MULTIPART_KEY: False, }, + # Mailbox Level + + RECENT_DOC: { + fields.TYPE_KEY: fields.TYPE_RECENT_VAL, + fields.MBOX_KEY: fields.INBOX_VAL, + fields.RECENTFLAGS_KEY: [], + }, + + HDOCS_SET_DOC: { + fields.TYPE_KEY: fields.TYPE_HDOCS_SET_VAL, + fields.MBOX_KEY: fields.INBOX_VAL, + fields.HDOCS_SET_KEY: [], + } + + } + _rdoc_lock = threading.Lock() + _hdocset_lock = threading.Lock() + def __init__(self, mbox=None, soledad=None): """ Constructor for MessageCollection. + On initialization, we ensure that we have a document for + storing the recent flags. The nature of this flag make us wanting + to store the set of the UIDs with this flag at the level of the + MessageCollection for each mailbox, instead of treating them + as a property of each message. + :param mbox: the name of the mailbox. It is the name with which we filter the query over the messages database @@ -993,17 +1075,13 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser, # okay, all in order, keep going... self.mbox = self._parse_mailbox_name(mbox) self._soledad = soledad + self.__rflags = None + self.__hdocset = None self.initialize_db() - # I think of someone like nietzsche when reading this - - # this will be the producer that will enqueue the content - # to be processed serially by the consumer (the writer). We just - # need to `put` the new material on its plate. - - self.soledad_writer = MessageProducer( - SoledadDocWriter(soledad), - period=0.02) + # ensure that we have a recent-flags and a hdocs-sec doc + self._get_or_create_rdoc() + self._get_or_create_hdocset() def _get_empty_doc(self, _type=FLAGS_DOC): """ @@ -1016,6 +1094,30 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser, raise TypeError("Improper type passed to _get_empty_doc") return copy.deepcopy(self.templates[_type]) + def _get_or_create_rdoc(self): + """ + Try to retrieve the recent-flags doc for this MessageCollection, + and create one if not found. + """ + rdoc = self._get_recent_doc() + if not rdoc: + rdoc = self._get_empty_doc(self.RECENT_DOC) + if self.mbox != fields.INBOX_VAL: + rdoc[fields.MBOX_KEY] = self.mbox + self._soledad.create_doc(rdoc) + + def _get_or_create_hdocset(self): + """ + Try to retrieve the hdocs-set doc for this MessageCollection, + and create one if not found. + """ + hdocset = self._get_hdocset_doc() + if not hdocset: + hdocset = self._get_empty_doc(self.HDOCS_SET_DOC) + if self.mbox != fields.INBOX_VAL: + hdocset[fields.MBOX_KEY] = self.mbox + self._soledad.create_doc(hdocset) + def _do_parse(self, raw): """ Parse raw message and return it along with @@ -1077,12 +1179,12 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser, hd[self.MSGID_KEY] = msgid if not subject and self.SUBJECT_FIELD in headers: - hd[self.SUBJECT_KEY] = first(headers[self.SUBJECT_FIELD]) + hd[self.SUBJECT_KEY] = headers[self.SUBJECT_FIELD] else: hd[self.SUBJECT_KEY] = subject if not date and self.DATE_FIELD in headers: - hd[self.DATE_KEY] = first(headers[self.DATE_FIELD]) + hd[self.DATE_KEY] = headers[self.DATE_FIELD] else: hd[self.DATE_KEY] = date return hd @@ -1160,14 +1262,17 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser, hd[key] = parts_map[key] del parts_map - # Saving + # Saving ---------------------------------------- + self.set_recent_flag(uid) # first, regular docs: flags and headers self._soledad.create_doc(fd) - # XXX should check for content duplication on headers too # but with chash. !!! - self._soledad.create_doc(hd) + hdoc = self._soledad.create_doc(hd) + # We add the newly created hdoc to the fast-access set of + # headers documents associated with the mailbox. + self.add_hdocset_docid(hdoc.doc_id) # and last, but not least, try to create # content docs if not already there. @@ -1200,7 +1305,141 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser, d.addCallback(self._remove_cb) return d + # # getters: specific queries + # + + # recent flags + + def _get_recent_flags(self): + """ + An accessor for the recent-flags set for this mailbox. + """ + if not self.__rflags: + rdoc = self._get_recent_doc() + self.__rflags = set(rdoc.content.get( + fields.RECENTFLAGS_KEY, [])) + return self.__rflags + + def _set_recent_flags(self, value): + """ + Setter for the recent-flags set for this mailbox. + """ + rdoc = self._get_recent_doc() + newv = set(value) + self.__rflags = newv + + with self._rdoc_lock: + rdoc.content[fields.RECENTFLAGS_KEY] = list(newv) + # XXX should deferLater 0 it? + self._soledad.put_doc(rdoc) + + recent_flags = property( + _get_recent_flags, _set_recent_flags, + doc="Set of UIDs with the recent flag for this mailbox.") + + def unset_recent_flags(self, uids): + """ + Unset Recent flag for a sequence of uids. + """ + self.recent_flags = self.recent_flags.difference( + set(uids)) + + def unset_recent_flag(self, uid): + """ + Unset Recent flag for a given uid. + """ + self.recent_flags = self.recent_flags.difference( + set([uid])) + + def set_recent_flag(self, uid): + """ + Set Recent flag for a given uid. + """ + self.recent_flags = self.recent_flags.union( + set([uid])) + + def _get_recent_doc(self): + """ + Get recent-flags document for this mailbox. + """ + curried = partial( + self._soledad.get_from_index, + fields.TYPE_MBOX_IDX, + fields.TYPE_RECENT_VAL, self.mbox) + curried.expected = "rdoc" + with self._rdoc_lock: + return try_unique_query(curried) + + # headers-docs-set + + def _get_hdocset(self): + """ + An accessor for the hdocs-set for this mailbox. + """ + if not self.__hdocset: + hdocset_doc = self._get_hdocset_doc() + value = set(hdocset_doc.content.get( + fields.HDOCS_SET_KEY, [])) + self.__hdocset = value + return self.__hdocset + + def _set_hdocset(self, value): + """ + Setter for the hdocs-set for this mailbox. + """ + hdocset_doc = self._get_hdocset_doc() + newv = set(value) + self.__hdocset = newv + + with self._hdocset_lock: + hdocset_doc.content[fields.HDOCS_SET_KEY] = list(newv) + # XXX should deferLater 0 it? + self._soledad.put_doc(hdocset_doc) + + _hdocset = property( + _get_hdocset, _set_hdocset, + doc="Set of Document-IDs for the headers docs associated " + "with this mailbox.") + + def _get_hdocset_doc(self): + """ + Get hdocs-set document for this mailbox. + """ + curried = partial( + self._soledad.get_from_index, + fields.TYPE_MBOX_IDX, + fields.TYPE_HDOCS_SET_VAL, self.mbox) + curried.expected = "hdocset" + with self._hdocset_lock: + hdocset_doc = try_unique_query(curried) + return hdocset_doc + + def remove_hdocset_docids(self, docids): + """ + Remove the given document IDs from the set of + header-documents associated with this mailbox. + """ + self._hdocset = self._hdocset.difference( + set(docids)) + + def remove_hdocset_docid(self, docid): + """ + Remove the given document ID from the set of + header-documents associated with this mailbox. + """ + self._hdocset = self._hdocset.difference( + set([docid])) + + def add_hdocset_docid(self, docid): + """ + Add the given document ID to the set of + header-documents associated with this mailbox. + """ + hdocset = self._hdocset + self._hdocset = hdocset.union(set([docid])) + + # individual doc getters, message layer. def _get_fdoc_from_chash(self, chash): """ @@ -1210,39 +1449,21 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser, the query failed. :rtype: SoledadDocument or None. """ - try: - query = self._soledad.get_from_index( - fields.TYPE_MBOX_C_HASH_IDX, - fields.TYPE_FLAGS_VAL, self.mbox, chash) - if query: - if len(query) > 1: - logger.warning( - "More than one fdoc found for this chash, " - "we got a duplicate!!") - # XXX we could take action, like trigger a background - # process to kill dupes. - return query.pop() - else: - return None - except Exception as exc: - logger.exception("Unhandled error %r" % exc) + curried = partial( + self._soledad.get_from_index, + fields.TYPE_MBOX_C_HASH_IDX, + fields.TYPE_FLAGS_VAL, self.mbox, chash) + curried.expected = "fdoc" + return try_unique_query(curried) def _get_uid_from_msgidCb(self, msgid): hdoc = None - try: - query = self._soledad.get_from_index( - fields.TYPE_MSGID_IDX, - fields.TYPE_HEADERS_VAL, msgid) - if query: - if len(query) > 1: - logger.warning( - "More than one hdoc found for this msgid, " - "we got a duplicate!!") - # XXX we could take action, like trigger a background - # process to kill dupes. - hdoc = query.pop() - except Exception as exc: - logger.exception("Unhandled error %r" % exc) + curried = partial( + self._soledad.get_from_index, + fields.TYPE_MSGID_IDX, + fields.TYPE_HEADERS_VAL, msgid) + curried.expected = "hdoc" + hdoc = try_unique_query(curried) if hdoc is None: logger.warning("Could not find hdoc for msgid %s" @@ -1271,13 +1492,7 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser, # the query is received right after we've saved the document, # and we cannot find it otherwise. This seems to be enough. - # Doing a sleep since we'll be calling this in a secondary thread, - # but we'll should be able to collect the results after a - # reactor.callLater. - # Maybe we can implement something like NOT_DONE_YET in the web - # framework, and return from the callback? - # See: http://jcalderone.livejournal.com/50226.html - # reactor.callLater(0.3, self._get_uid_from_msgidCb, msgid) + # XXX do a deferLater instead ?? time.sleep(0.3) return self._get_uid_from_msgidCb(msgid) @@ -1286,6 +1501,7 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser, def get_msg_by_uid(self, uid): """ Retrieves a LeapMessage by UID. + This is used primarity in the Mailbox fetch and store methods. :param uid: the message uid to query by :type uid: int @@ -1294,7 +1510,7 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser, or None if not found. :rtype: LeapMessage """ - msg = LeapMessage(self._soledad, uid, self.mbox) + msg = LeapMessage(self._soledad, uid, self.mbox, collection=self) if not msg.does_exist(): return None return msg @@ -1323,6 +1539,7 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser, # inneficient, but first let's grok it and then # let's worry about efficiency. # XXX FIXINDEX -- should implement order by in soledad + # FIXME ---------------------------------------------- return sorted(all_docs, key=lambda item: item.content['uid']) def all_uid_iter(self): @@ -1337,6 +1554,18 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser, fields.TYPE_FLAGS_VAL, self.mbox)) return (u for u in sorted(all_uids)) + def reset_last_uid(self, param): + """ + Set the last uid to the highest uid found. + Used while expunging, passed as a callback. + """ + try: + self.last_uid = max(self.all_uid_iter()) + 1 + except ValueError: + # empty sequence + pass + return param + def all_flags(self): """ Return a dict with all flags documents for this mailbox. @@ -1349,6 +1578,30 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser, fields.TYPE_FLAGS_VAL, self.mbox))) return all_flags + def all_flags_chash(self): + """ + Return a dict with the content-hash for all flag documents + for this mailbox. + """ + all_flags_chash = dict((( + doc.content[self.UID_KEY], + doc.content[self.CONTENT_HASH_KEY]) for doc in + self._soledad.get_from_index( + fields.TYPE_MBOX_IDX, + fields.TYPE_FLAGS_VAL, self.mbox))) + return all_flags_chash + + def all_headers(self): + """ + Return a dict with all the headers documents for this + mailbox. + """ + all_headers = dict((( + doc.content[self.CONTENT_HASH_KEY], + doc.content[self.HEADERS_KEY]) for doc in + self._soledad.get_docs(self._hdocset))) + return all_headers + def count(self): """ Return the count of messages for this mailbox. @@ -1399,39 +1652,17 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser, # recent messages - def recent_iter(self): - """ - Get an iterator for the message UIDs with `recent` flag. - - :return: iterator through recent message docs - :rtype: iterable - """ - return (doc.content[self.UID_KEY] for doc in - self._soledad.get_from_index( - fields.TYPE_MBOX_RECT_IDX, - fields.TYPE_FLAGS_VAL, self.mbox, '1')) - - def get_recent(self): - """ - Get all messages with the `Recent` flag. - - :returns: a list of LeapMessages - :rtype: list - """ - return [LeapMessage(self._soledad, docid, self.mbox) - for docid in self.recent_iter()] - def count_recent(self): """ Count all messages with the `Recent` flag. + It just retrieves the length of the recent_flags set, + which is stored in a specific type of document for + this collection. :returns: count :rtype: int """ - count = self._soledad.get_count_from_index( - fields.TYPE_MBOX_RECT_IDX, - fields.TYPE_FLAGS_VAL, self.mbox, '1') - return count + return len(self.recent_flags) # deleted messages @@ -1483,4 +1714,4 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser, self.mbox, self.count()) # XXX should implement __eq__ also !!! - # --- use the content hash for that, will be used for dedup. + # use chash... diff --git a/mail/src/leap/mail/imap/service/imap.py b/mail/src/leap/mail/imap/service/imap.py index e877869..a1d3ab7 100644 --- a/mail/src/leap/mail/imap/service/imap.py +++ b/mail/src/leap/mail/imap/service/imap.py @@ -24,6 +24,7 @@ import logging from twisted.internet.protocol import ServerFactory from twisted.internet.defer import maybeDeferred from twisted.internet.error import CannotListenError +from twisted.internet.task import deferLater from twisted.mail import imap4 from twisted.python import log from twisted import cred @@ -116,15 +117,19 @@ class LeapIMAPServer(imap4.IMAP4Server): Overwritten fetch dispatcher to use the fast fetch_flags method """ + from twisted.internet import reactor log.msg("LEAP Overwritten fetch...") if not query: self.sendPositiveResponse(tag, 'FETCH complete') return # XXX ??? + print "QUERY ", query + print query[0] + cbFetch = self._IMAP4Server__cbFetch ebFetch = self._IMAP4Server__ebFetch - if str(query[0]) == "flags": + if len(query) == 1 and str(query[0]) == "flags": self._oldTimeout = self.setTimeout(None) # no need to call iter, we get a generator maybeDeferred( @@ -132,6 +137,14 @@ class LeapIMAPServer(imap4.IMAP4Server): ).addCallback( cbFetch, tag, query, uid ).addErrback(ebFetch, tag) + elif len(query) == 1 and str(query[0]) == "rfc822.header": + self._oldTimeout = self.setTimeout(None) + # no need to call iter, we get a generator + maybeDeferred( + self.mbox.fetch_headers, messages, uid=uid + ).addCallback( + cbFetch, tag, query, uid + ).addErrback(ebFetch, tag) else: self._oldTimeout = self.setTimeout(None) # no need to call iter, we get a generator @@ -139,11 +152,62 @@ class LeapIMAPServer(imap4.IMAP4Server): self.mbox.fetch, messages, uid=uid ).addCallback( cbFetch, tag, query, uid - ).addErrback(ebFetch, tag) + ).addErrback( + ebFetch, tag) + + deferLater(reactor, + 2, self.mbox.unset_recent_flags, messages) + deferLater(reactor, 1, self.mbox.signal_unread_to_ui) select_FETCH = (do_FETCH, imap4.IMAP4Server.arg_seqset, imap4.IMAP4Server.arg_fetchatt) + def do_COPY(self, tag, messages, mailbox, uid=0): + from twisted.internet import reactor + imap4.IMAP4Server.do_COPY(self, tag, messages, mailbox, uid) + deferLater(reactor, + 2, self.mbox.unset_recent_flags, messages) + deferLater(reactor, 1, self.mbox.signal_unread_to_ui) + + select_COPY = (do_COPY, imap4.IMAP4Server.arg_seqset, + imap4.IMAP4Server.arg_astring) + + def notifyNew(self, ignored): + """ + Notify new messages to listeners. + """ + self.mbox.notify_new() + + def _cbSelectWork(self, mbox, cmdName, tag): + """ + Callback for selectWork, patched to avoid conformance errors due to + incomplete UIDVALIDITY line. + """ + if mbox is None: + self.sendNegativeResponse(tag, 'No such mailbox') + return + if '\\noselect' in [s.lower() for s in mbox.getFlags()]: + self.sendNegativeResponse(tag, 'Mailbox cannot be selected') + return + + flags = mbox.getFlags() + self.sendUntaggedResponse(str(mbox.getMessageCount()) + ' EXISTS') + self.sendUntaggedResponse(str(mbox.getRecentCount()) + ' RECENT') + self.sendUntaggedResponse('FLAGS (%s)' % ' '.join(flags)) + + # Patched ------------------------------------------------------- + # imaptest was complaining about the incomplete line, we're adding + # "UIDs valid" here. + self.sendPositiveResponse( + None, '[UIDVALIDITY %d] UIDs valid' % mbox.getUIDValidity()) + # ---------------------------------------------------------------- + + s = mbox.isWriteable() and 'READ-WRITE' or 'READ-ONLY' + mbox.addListener(self) + self.sendPositiveResponse(tag, '[%s] %s successful' % (s, cmdName)) + self.state = 'select' + self.mbox = mbox + class IMAPAuthRealm(object): """ |