From a1a8f708867b2c60e07f18f59c774f134ac3be00 Mon Sep 17 00:00:00 2001 From: Kali Kaneko Date: Thu, 16 Jan 2014 22:01:20 -0400 Subject: Separate RECENT Flag to a mailbox document. this way we avoid a bunch of writes. --- mail/src/leap/mail/imap/fields.py | 2 + mail/src/leap/mail/imap/mailbox.py | 82 ++++++---------- mail/src/leap/mail/imap/messages.py | 163 +++++++++++++++++++++++--------- mail/src/leap/mail/imap/service/imap.py | 28 +++++- 4 files changed, 177 insertions(+), 98 deletions(-) (limited to 'mail') diff --git a/mail/src/leap/mail/imap/fields.py b/mail/src/leap/mail/imap/fields.py index 3d2ac92..bc928a1 100644 --- a/mail/src/leap/mail/imap/fields.py +++ b/mail/src/leap/mail/imap/fields.py @@ -60,6 +60,7 @@ class WithMsgFields(object): SUBSCRIBED_KEY = "subscribed" RW_KEY = "rw" LAST_UID_KEY = "lastuid" + RECENTFLAGS_KEY = "rct" # Document Type, for indexing TYPE_KEY = "type" @@ -67,6 +68,7 @@ class WithMsgFields(object): TYPE_FLAGS_VAL = "flags" TYPE_HEADERS_VAL = "head" TYPE_CONTENT_VAL = "cnt" + TYPE_RECENT_VAL = "rct" INBOX_VAL = "inbox" diff --git a/mail/src/leap/mail/imap/mailbox.py b/mail/src/leap/mail/imap/mailbox.py index cf09bc4..bd69d12 100644 --- a/mail/src/leap/mail/imap/mailbox.py +++ b/mail/src/leap/mail/imap/mailbox.py @@ -398,18 +398,19 @@ class SoledadMailbox(WithMsgFields, MBoxParser): flags = tuple(str(flag) for flag in flags) d = self._do_add_message(message, flags=flags, date=date, uid=uid_next) - d.addCallback(self._notify_new) return d - @deferred def _do_add_message(self, message, flags, date, uid): """ Calls to the messageCollection add_msg method (deferred to thread). Invoked from addMessage. """ - self.messages.add_msg(message, flags=flags, date=date, uid=uid) + d = self.messages.add_msg(message, flags=flags, date=date, uid=uid) + # XXX notify after batch APPEND? + d.addCallback(self.notify_new) + return d - def _notify_new(self, *args): + def notify_new(self, *args): """ Notify of new messages to all the listeners. @@ -463,8 +464,8 @@ class SoledadMailbox(WithMsgFields, MBoxParser): if not self.isWriteable(): raise imap4.ReadOnlyMailbox d = self.messages.remove_all_deleted() - d.addCallback(self.messages.reset_last_uid) d.addCallback(self._expunge_cb) + d.addCallback(self.messages.reset_last_uid) return d def _bound_seq(self, messages_asked): @@ -480,7 +481,10 @@ class SoledadMailbox(WithMsgFields, MBoxParser): iter(messages_asked) except TypeError: # looks like we cannot iterate - messages_asked.last = self.last_uid + try: + messages_asked.last = self.last_uid + except ValueError: + pass return messages_asked def _filter_msg_seq(self, messages_asked): @@ -529,10 +533,7 @@ class SoledadMailbox(WithMsgFields, MBoxParser): messages_asked = self._bound_seq(messages_asked) seq_messg = self._filter_msg_seq(messages_asked) - def getmsg(msgid): - if self.isWriteable(): - deferLater(reactor, 2, self._unset_recent_flag, messages_asked) - return self.messages.get_msg_by_uid(msgid) + getmsg = lambda uid: self.messages.get_msg_by_uid(uid) # for sequence numbers (uid = 0) if sequence: @@ -544,7 +545,7 @@ class SoledadMailbox(WithMsgFields, MBoxParser): # this should really be called as a final callback of # the do_FETCH method... - deferLater(reactor, 1, self._signal_unread_to_ui) + return result @deferred @@ -591,37 +592,7 @@ class SoledadMailbox(WithMsgFields, MBoxParser): msgid, all_flags[msgid])) for msgid in seq_messg) return result - @deferred - def _unset_recent_flag(self, message_uid): - """ - Unsets `Recent` flag from a tuple of messages. - Called from fetch. - - From RFC, about `Recent`: - - Message is "recently" arrived in this mailbox. This session - is the first session to have been notified about this - message; if the session is read-write, subsequent sessions - will not see \Recent set for this message. This flag can not - be altered by the client. - - If it is not possible to determine whether or not this - session is the first session to be notified about a message, - then that message SHOULD be considered recent. - - :param message_uids: the sequence of msg ids to update. - :type message_uids: sequence - """ - # XXX deprecate this! - # move to a mailbox-level call, and do it in batches! - - log.msg('unsetting recent flag: %s' % message_uid) - msg = self.messages.get_msg_by_uid(message_uid) - msg.removeFlags((fields.RECENT_FLAG,)) - self._signal_unread_to_ui() - - @deferred - def _signal_unread_to_ui(self): + def signal_unread_to_ui(self): """ Sends unread event to ui. """ @@ -687,8 +658,9 @@ class SoledadMailbox(WithMsgFields, MBoxParser): result[msg_id] = msg.getFlags() # this should really be called as a final callback of - # the do_FETCH method... - deferLater(reactor, 1, self._signal_unread_to_ui) + # the do_STORE method... + # XXX --- + #deferLater(reactor, 1, self._signal_unread_to_ui) return result # ISearchableMailbox @@ -741,6 +713,7 @@ class SoledadMailbox(WithMsgFields, MBoxParser): """ Copy the given message object into this mailbox. """ + from twisted.internet import reactor uid_next = self.getUIDNext() msg = messageObject @@ -753,17 +726,15 @@ class SoledadMailbox(WithMsgFields, MBoxParser): new_fdoc = copy.deepcopy(fdoc.content) new_fdoc[self.UID_KEY] = uid_next new_fdoc[self.MBOX_KEY] = self.mbox + self._do_add_doc(new_fdoc) + deferLater(reactor, 1, self.notify_new) - d = self._do_add_doc(new_fdoc) - # XXX notify should be done when all the - # copies in the batch are finished. - d.addCallback(self._notify_new) - - @deferred def _do_add_doc(self, doc): """ - Defers the adding of a new doc. + Defer the adding of a new doc. + :param doc: document to be created in soledad. + :type doc: dict """ self._soledad.create_doc(doc) @@ -771,12 +742,19 @@ class SoledadMailbox(WithMsgFields, MBoxParser): def deleteAllDocs(self): """ - Deletes all docs in this mailbox + Delete all docs in this mailbox """ docs = self.messages.get_all_docs() for doc in docs: self.messages._soledad.delete_doc(doc) + def unset_recent_flags(self, uids): + """ + Unset Recent flag for a sequence of UIDs. + """ + seq_messg = self._bound_seq(uids) + self.messages.unset_recent_flags(seq_messg) + def __repr__(self): """ Representation string for this mailbox. diff --git a/mail/src/leap/mail/imap/messages.py b/mail/src/leap/mail/imap/messages.py index 1b996b6..6556b12 100644 --- a/mail/src/leap/mail/imap/messages.py +++ b/mail/src/leap/mail/imap/messages.py @@ -21,6 +21,7 @@ import copy import logging import re import time +import threading import StringIO from collections import defaultdict, namedtuple @@ -308,7 +309,7 @@ class LeapMessage(fields, MailParser, MBoxParser): implements(imap4.IMessage) - def __init__(self, soledad, uid, mbox): + def __init__(self, soledad, uid, mbox, collection=None): """ Initializes a LeapMessage. @@ -318,11 +319,14 @@ class LeapMessage(fields, MailParser, MBoxParser): :type uid: int or basestring :param mbox: the mbox this message belongs to :type mbox: basestring + :param collection: a reference to the parent collection object + :type collection: MessageCollection """ MailParser.__init__(self) self._soledad = soledad self._uid = int(uid) self._mbox = self._parse_mailbox_name(mbox) + self._collection = collection self.__chash = None self.__bdoc = None @@ -373,7 +377,7 @@ class LeapMessage(fields, MailParser, MBoxParser): def getUID(self): """ - Retrieve the unique identifier associated with this message + Retrieve the unique identifier associated with this Message. :return: uid for this message :rtype: int @@ -382,18 +386,26 @@ class LeapMessage(fields, MailParser, MBoxParser): def getFlags(self): """ - Retrieve the flags associated with this message + Retrieve the flags associated with this Message. :return: The flags, represented as strings :rtype: tuple """ if self._uid is None: return [] + uid = self._uid flags = [] fdoc = self._fdoc if fdoc: flags = fdoc.content.get(self.FLAGS_KEY, None) + + msgcol = self._collection + + # We treat the recent flag specially: gotten from + # a mailbox-level document. + if msgcol and uid in msgcol.recent_flags: + flags.append(fields.RECENT_FLAG) if flags: flags = map(str, flags) return tuple(flags) @@ -414,7 +426,7 @@ class LeapMessage(fields, MailParser, MBoxParser): :rtype: SoledadDocument """ leap_assert(isinstance(flags, tuple), "flags need to be a tuple") - log.msg('setting flags: %s' % (self._uid)) + log.msg('setting flags: %s (%s)' % (self._uid, flags)) doc = self._fdoc if not doc: @@ -424,7 +436,6 @@ class LeapMessage(fields, MailParser, MBoxParser): return doc.content[self.FLAGS_KEY] = flags doc.content[self.SEEN_KEY] = self.SEEN_FLAG in flags - doc.content[self.RECENT_KEY] = self.RECENT_FLAG in flags doc.content[self.DEL_KEY] = self.DELETED_FLAG in flags self._soledad.put_doc(doc) @@ -927,6 +938,7 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser, FLAGS_DOC = "FLAGS" HEADERS_DOC = "HEADERS" CONTENT_DOC = "CONTENT" + RECENT_DOC = "RECENT" templates = { @@ -937,7 +949,6 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser, fields.CONTENT_HASH_KEY: "", fields.SEEN_KEY: False, - fields.RECENT_KEY: True, fields.DEL_KEY: False, fields.FLAGS_KEY: [], fields.MULTIPART_KEY: False, @@ -970,12 +981,25 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser, fields.MULTIPART_KEY: False, }, + RECENT_DOC: { + fields.TYPE_KEY: fields.TYPE_RECENT_VAL, + fields.MBOX_KEY: fields.INBOX_VAL, + fields.RECENTFLAGS_KEY: [], + } } + _rdoc_lock = threading.Lock() + def __init__(self, mbox=None, soledad=None): """ Constructor for MessageCollection. + On initialization, we ensure that we have a document for + storing the recent flags. The nature of this flag make us wanting + to store the set of the UIDs with this flag at the level of the + MessageCollection for each mailbox, instead of treating them + as a property of each message. + :param mbox: the name of the mailbox. It is the name with which we filter the query over the messages database @@ -994,17 +1018,11 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser, # okay, all in order, keep going... self.mbox = self._parse_mailbox_name(mbox) self._soledad = soledad + self.__rflags = None self.initialize_db() - # I think of someone like nietzsche when reading this - - # this will be the producer that will enqueue the content - # to be processed serially by the consumer (the writer). We just - # need to `put` the new material on its plate. - - self.soledad_writer = MessageProducer( - SoledadDocWriter(soledad), - period=0.02) + # ensure that we have a recent-flags doc + self._get_or_create_rdoc() def _get_empty_doc(self, _type=FLAGS_DOC): """ @@ -1017,6 +1035,18 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser, raise TypeError("Improper type passed to _get_empty_doc") return copy.deepcopy(self.templates[_type]) + def _get_or_create_rdoc(self): + """ + Try to retrieve the recent-flags doc for this MessageCollection, + and create one if not found. + """ + rdoc = self._get_recent_doc() + if not rdoc: + rdoc = self._get_empty_doc(self.RECENT_DOC) + if self.mbox != fields.INBOX_VAL: + rdoc[fields.MBOX_KEY] = self.mbox + self._soledad.create_doc(rdoc) + def _do_parse(self, raw): """ Parse raw message and return it along with @@ -1161,7 +1191,8 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser, hd[key] = parts_map[key] del parts_map - # Saving + # Saving ---------------------------------------- + self.set_recent_flag(uid) # first, regular docs: flags and headers self._soledad.create_doc(fd) @@ -1203,6 +1234,76 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser, # getters: specific queries + def _get_recent_flags(self): + """ + An accessor for the recent-flags set for this mailbox. + """ + if not self.__rflags: + rdoc = self._get_recent_doc() + self.__rflags = set(rdoc.content.get( + fields.RECENTFLAGS_KEY, [])) + return self.__rflags + + def _set_recent_flags(self, value): + """ + Setter for the recent-flags set for this mailbox. + """ + rdoc = self._get_recent_doc() + newv = set(value) + self.__rflags = newv + + with self._rdoc_lock: + rdoc.content[fields.RECENTFLAGS_KEY] = list(newv) + # XXX should deferLater 0 it? + self._soledad.put_doc(rdoc) + + recent_flags = property( + _get_recent_flags, _set_recent_flags, + doc="Set of UIDs with the recent flag for this mailbox.") + + def unset_recent_flags(self, uids): + """ + Unset Recent flag for a sequence of uids. + """ + self.recent_flags = self.recent_flags.difference( + set(uids)) + + def unset_recent_flag(self, uid): + """ + Unset Recent flag for a given uid. + """ + self.recent_flags = self.recent_flags.difference( + set([uid])) + + def set_recent_flag(self, uid): + """ + Set Recent flag for a given uid. + """ + self.recent_flags = self.recent_flags.union( + set([uid])) + + def _get_recent_doc(self): + """ + Get recent-flags document for this inbox. + """ + # TODO refactor this try-catch structure into a utility + try: + query = self._soledad.get_from_index( + fields.TYPE_MBOX_IDX, + fields.TYPE_RECENT_VAL, self.mbox) + if query: + if len(query) > 1: + logger.warning( + "More than one rdoc found for this mbox, " + "we got a duplicate!!") + # XXX we could take action, like trigger a background + # process to kill dupes. + return query.pop() + else: + return None + except Exception as exc: + logger.exception("Unhandled error %r" % exc) + def _get_fdoc_from_chash(self, chash): """ Return a flags document for this mailbox with a given chash. @@ -1287,6 +1388,7 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser, def get_msg_by_uid(self, uid): """ Retrieves a LeapMessage by UID. + This is used primarity in the Mailbox fetch and store methods. :param uid: the message uid to query by :type uid: int @@ -1295,7 +1397,7 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser, or None if not found. :rtype: LeapMessage """ - msg = LeapMessage(self._soledad, uid, self.mbox) + msg = LeapMessage(self._soledad, uid, self.mbox, collection=self) if not msg.does_exist(): return None return msg @@ -1412,28 +1514,6 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser, # recent messages - def recent_iter(self): - """ - Get an iterator for the message UIDs with `recent` flag. - - :return: iterator through recent message docs - :rtype: iterable - """ - return (doc.content[self.UID_KEY] for doc in - self._soledad.get_from_index( - fields.TYPE_MBOX_RECT_IDX, - fields.TYPE_FLAGS_VAL, self.mbox, '1')) - - def get_recent(self): - """ - Get all messages with the `Recent` flag. - - :returns: a list of LeapMessages - :rtype: list - """ - return [LeapMessage(self._soledad, docid, self.mbox) - for docid in self.recent_iter()] - def count_recent(self): """ Count all messages with the `Recent` flag. @@ -1441,10 +1521,7 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser, :returns: count :rtype: int """ - count = self._soledad.get_count_from_index( - fields.TYPE_MBOX_RECT_IDX, - fields.TYPE_FLAGS_VAL, self.mbox, '1') - return count + return len(self.recent_flags) # deleted messages diff --git a/mail/src/leap/mail/imap/service/imap.py b/mail/src/leap/mail/imap/service/imap.py index 6e03456..a3ef098 100644 --- a/mail/src/leap/mail/imap/service/imap.py +++ b/mail/src/leap/mail/imap/service/imap.py @@ -24,6 +24,7 @@ import logging from twisted.internet.protocol import ServerFactory from twisted.internet.defer import maybeDeferred from twisted.internet.error import CannotListenError +from twisted.internet.task import deferLater from twisted.mail import imap4 from twisted.python import log from twisted import cred @@ -116,6 +117,7 @@ class LeapIMAPServer(imap4.IMAP4Server): Overwritten fetch dispatcher to use the fast fetch_flags method """ + from twisted.internet import reactor log.msg("LEAP Overwritten fetch...") if not query: self.sendPositiveResponse(tag, 'FETCH complete') @@ -124,8 +126,6 @@ class LeapIMAPServer(imap4.IMAP4Server): cbFetch = self._IMAP4Server__cbFetch ebFetch = self._IMAP4Server__ebFetch - print "QUERY: ", query - if len(query) == 1 and str(query[0]) == "flags": self._oldTimeout = self.setTimeout(None) # no need to call iter, we get a generator @@ -141,11 +141,32 @@ class LeapIMAPServer(imap4.IMAP4Server): self.mbox.fetch, messages, uid=uid ).addCallback( cbFetch, tag, query, uid - ).addErrback(ebFetch, tag) + ).addErrback( + ebFetch, tag) + + deferLater(reactor, + 2, self.mbox.unset_recent_flags, messages) + deferLater(reactor, 1, self.mbox.signal_unread_to_ui) select_FETCH = (do_FETCH, imap4.IMAP4Server.arg_seqset, imap4.IMAP4Server.arg_fetchatt) + def do_COPY(self, tag, messages, mailbox, uid=0): + from twisted.internet import reactor + imap4.IMAP4Server.do_COPY(self, tag, messages, mailbox, uid) + deferLater(reactor, + 2, self.mbox.unset_recent_flags, messages) + deferLater(reactor, 1, self.mbox.signal_unread_to_ui) + + select_COPY = (do_COPY, imap4.IMAP4Server.arg_seqset, + imap4.IMAP4Server.arg_astring) + + def notifyNew(self, ignored): + """ + Notify new messages to listeners. + """ + self.mbox.notify_new() + def _cbSelectWork(self, mbox, cmdName, tag): """ Callback for selectWork, patched to avoid conformance errors due to @@ -177,6 +198,7 @@ class LeapIMAPServer(imap4.IMAP4Server): self.mbox = mbox + class IMAPAuthRealm(object): """ Dummy authentication realm. Do not use in production! -- cgit v1.2.3