diff options
Diffstat (limited to 'src/leap/mail/imap/mailbox.py')
-rw-r--r-- | src/leap/mail/imap/mailbox.py | 275 |
1 files changed, 179 insertions, 96 deletions
diff --git a/src/leap/mail/imap/mailbox.py b/src/leap/mail/imap/mailbox.py index d8af0a5..57505f0 100644 --- a/src/leap/mail/imap/mailbox.py +++ b/src/leap/mail/imap/mailbox.py @@ -50,6 +50,25 @@ If the environment variable `LEAP_SKIPNOTIFY` is set, we avoid notifying clients of new messages. Use during stress tests. """ NOTIFY_NEW = not os.environ.get('LEAP_SKIPNOTIFY', False) +PROFILE_CMD = os.environ.get('LEAP_PROFILE_IMAPCMD', False) + +if PROFILE_CMD: + import time + + def _debugProfiling(result, cmdname, start): + took = (time.time() - start) * 1000 + log.msg("CMD " + cmdname + " TOOK: " + str(took) + " msec") + return result + + def do_profile_cmd(d, name): + """ + Add the profiling debug to the passed callback. + :param d: deferred + :param name: name of the command + :type name: str + """ + d.addCallback(_debugProfiling, name, time.time()) + d.addErrback(lambda f: log.msg(f.getTraceback())) class SoledadMailbox(WithMsgFields, MBoxParser): @@ -89,6 +108,12 @@ class SoledadMailbox(WithMsgFields, MBoxParser): _listeners = defaultdict(set) next_uid_lock = threading.Lock() + last_uid_lock = threading.Lock() + + # TODO unify all the `primed` dicts + _fdoc_primed = {} + _last_uid_primed = {} + _known_uids_primed = {} def __init__(self, mbox, soledad, memstore, rw=1): """ @@ -107,6 +132,7 @@ class SoledadMailbox(WithMsgFields, MBoxParser): :param rw: read-and-write flag for this mailbox :type rw: int """ + logger.debug("Initializing mailbox %r" % (mbox,)) leap_assert(mbox, "Need a mailbox name to initialize") leap_assert(soledad, "Need a soledad instance to initialize") @@ -123,12 +149,24 @@ class SoledadMailbox(WithMsgFields, MBoxParser): self.messages = MessageCollection( mbox=mbox, soledad=self._soledad, memstore=self._memstore) + # XXX careful with this get/set (it would be + # hitting db unconditionally, move to memstore too) + # Now it's returning a fixed amount of flags from mem + # as a workaround. if not self.getFlags(): self.setFlags(self.INIT_FLAGS) if self._memstore: self.prime_known_uids_to_memstore() self.prime_last_uid_to_memstore() + self.prime_flag_docs_to_memstore() + + from twisted.internet import reactor + self.reactor = reactor + + # purge memstore from empty fdocs. + self._memstore.purge_fdoc_store(mbox) + logger.debug("DONE initializing mailbox %r" % (mbox,)) @property def listeners(self): @@ -170,8 +208,7 @@ class SoledadMailbox(WithMsgFields, MBoxParser): """ self.listeners.remove(listener) - # TODO move completely to soledadstore, under memstore reponsibility. - def _get_mbox(self): + def _get_mbox_doc(self): """ Return mailbox document. @@ -179,14 +216,7 @@ class SoledadMailbox(WithMsgFields, MBoxParser): the query failed. :rtype: SoledadDocument or None. """ - try: - query = self._soledad.get_from_index( - fields.TYPE_MBOX_IDX, - fields.TYPE_MBOX_VAL, self.mbox) - if query: - return query.pop() - except Exception as exc: - logger.exception("Unhandled error %r" % exc) + return self._memstore.get_mbox_doc(self.mbox) def getFlags(self): """ @@ -195,12 +225,21 @@ class SoledadMailbox(WithMsgFields, MBoxParser): :returns: tuple of flags for this mailbox :rtype: tuple of str """ - mbox = self._get_mbox() - if not mbox: - return None - flags = mbox.content.get(self.FLAGS_KEY, []) + flags = self.INIT_FLAGS + + # XXX returning fixed flags always + # Since I have not found a case where the client + # wants to modify this, as a way of speeding up + # selects. To do it right, we probably should keep + # track of the set of all flags used by msgs + # in this mailbox. Does it matter? + #mbox = self._get_mbox_doc() + #if not mbox: + #return None + #flags = mbox.content.get(self.FLAGS_KEY, []) return map(str, flags) + # XXX move to memstore->soledadstore def setFlags(self, flags): """ Sets flags for this mailbox. @@ -210,10 +249,12 @@ class SoledadMailbox(WithMsgFields, MBoxParser): """ leap_assert(isinstance(flags, tuple), "flags expected to be a tuple") - mbox = self._get_mbox() + mbox = self._get_mbox_doc() if not mbox: return None mbox.content[self.FLAGS_KEY] = map(str, flags) + logger.debug("Writing mbox document for %r to Soledad" + % (self.mbox,)) self._soledad.put_doc(mbox) # XXX SHOULD BETTER IMPLEMENT ADD_FLAG, REMOVE_FLAG. @@ -225,8 +266,7 @@ class SoledadMailbox(WithMsgFields, MBoxParser): :return: True if the mailbox is closed :rtype: bool """ - mbox = self._get_mbox() - return mbox.content.get(self.CLOSED_KEY, False) + return self._memstore.get_mbox_closed(self.mbox) def _set_closed(self, closed): """ @@ -235,10 +275,7 @@ class SoledadMailbox(WithMsgFields, MBoxParser): :param closed: the state to be set :type closed: bool """ - leap_assert(isinstance(closed, bool), "closed needs to be boolean") - mbox = self._get_mbox() - mbox.content[self.CLOSED_KEY] = closed - self._soledad.put_doc(mbox) + self._memstore.set_mbox_closed(self.mbox, closed) closed = property( _get_closed, _set_closed, doc="Closed attribute.") @@ -265,10 +302,13 @@ class SoledadMailbox(WithMsgFields, MBoxParser): """ Prime memstore with last_uid value """ - set_exist = set(self.messages.all_uid_iter()) - last = max(set_exist) if set_exist else 0 - logger.info("Priming Soledad last_uid to %s" % (last,)) - self._memstore.set_last_soledad_uid(self.mbox, last) + primed = self._last_uid_primed.get(self.mbox, False) + if not primed: + mbox = self._get_mbox_doc() + last = mbox.content.get('lastuid', 0) + logger.info("Priming Soledad last_uid to %s" % (last,)) + self._memstore.set_last_soledad_uid(self.mbox, last) + self._last_uid_primed[self.mbox] = True def prime_known_uids_to_memstore(self): """ @@ -276,8 +316,21 @@ class SoledadMailbox(WithMsgFields, MBoxParser): We do this to be able to filter the requests efficiently. """ - known_uids = self.messages.all_soledad_uid_iter() - self._memstore.set_known_uids(self.mbox, known_uids) + primed = self._known_uids_primed.get(self.mbox, False) + if not primed: + known_uids = self.messages.all_soledad_uid_iter() + self._memstore.set_known_uids(self.mbox, known_uids) + self._known_uids_primed[self.mbox] = True + + def prime_flag_docs_to_memstore(self): + """ + Prime memstore with all the flags documents. + """ + primed = self._fdoc_primed.get(self.mbox, False) + if not primed: + all_flag_docs = self.messages.get_all_soledad_flag_docs() + self._memstore.load_flag_docs(self.mbox, all_flag_docs) + self._fdoc_primed[self.mbox] = True def getUIDValidity(self): """ @@ -286,7 +339,7 @@ class SoledadMailbox(WithMsgFields, MBoxParser): :return: unique validity identifier :rtype: int """ - mbox = self._get_mbox() + mbox = self._get_mbox_doc() return mbox.content.get(self.CREATED_KEY, 1) def getUID(self, message): @@ -420,6 +473,9 @@ class SoledadMailbox(WithMsgFields, MBoxParser): flags = tuple(str(flag) for flag in flags) d = self._do_add_message(message, flags=flags, date=date) + if PROFILE_CMD: + do_profile_cmd(d, "APPEND") + # XXX should notify here probably return d def _do_add_message(self, message, flags, date): @@ -428,15 +484,9 @@ class SoledadMailbox(WithMsgFields, MBoxParser): Invoked from addMessage. """ d = self.messages.add_msg(message, flags=flags, date=date) - # XXX Removing notify temporarily. - # This is interfering with imaptest results. I'm not clear if it's - # because we clutter the logging or because the set of listeners is - # ever-growing. We should come up with some smart way of dealing with - # it, or maybe just disabling it using an environmental variable since - # we will only have just a few listeners in the regular desktop case. - #d.addCallback(self.notify_new) return d + @deferred_to_thread def notify_new(self, *args): """ Notify of new messages to all the listeners. @@ -447,12 +497,10 @@ class SoledadMailbox(WithMsgFields, MBoxParser): return exists = self.getMessageCount() recent = self.getRecentCount() - logger.debug("NOTIFY: there are %s messages, %s recent" % ( - exists, - recent)) + logger.debug("NOTIFY (%r): there are %s messages, %s recent" % ( + self.mbox, exists, recent)) for l in self.listeners: - logger.debug('notifying...') l.newMessages(exists, recent) # commands, do not rename methods @@ -471,7 +519,7 @@ class SoledadMailbox(WithMsgFields, MBoxParser): # we should postpone the removal # XXX move to memory store?? - self._soledad.delete_doc(self._get_mbox()) + self._soledad.delete_doc(self._get_mbox_doc()) def _close_cb(self, result): self.closed = True @@ -527,8 +575,6 @@ class SoledadMailbox(WithMsgFields, MBoxParser): seq_messg = set_asked.intersection(set_exist) return seq_messg - @deferred_to_thread - #@profile def fetch(self, messages_asked, uid): """ Retrieve one or more messages in this mailbox. @@ -544,6 +590,27 @@ class SoledadMailbox(WithMsgFields, MBoxParser): otherwise. :type uid: bool + :rtype: deferred + """ + d = defer.Deferred() + self.reactor.callInThread(self._do_fetch, messages_asked, uid, d) + if PROFILE_CMD: + do_profile_cmd(d, "FETCH") + return d + + # called in thread + def _do_fetch(self, messages_asked, uid, d): + """ + :param messages_asked: IDs of the messages to retrieve information + about + :type messages_asked: MessageSet + + :param uid: If true, the IDs are UIDs. They are message sequence IDs + otherwise. + :type uid: bool + :param d: deferred whose callback will be called with result. + :type d: Deferred + :rtype: A tuple of two-tuples of message sequence numbers and LeapMessage """ @@ -564,10 +631,12 @@ class SoledadMailbox(WithMsgFields, MBoxParser): logger.debug("Getting msg by index: INEFFICIENT call!") raise NotImplementedError else: - result = ((msgid, getmsg(msgid)) for msgid in seq_messg) - return result + got_msg = ((msgid, getmsg(msgid)) for msgid in seq_messg) + result = ((msgid, msg) for msgid, msg in got_msg + if msg is not None) + self.reactor.callLater(0, self.unset_recent_flags, seq_messg) + self.reactor.callFromThread(d.callback, result) - @deferred_to_thread def fetch_flags(self, messages_asked, uid): """ A fast method to fetch all flags, tricking just the @@ -606,12 +675,11 @@ class SoledadMailbox(WithMsgFields, MBoxParser): messages_asked = self._bound_seq(messages_asked) seq_messg = self._filter_msg_seq(messages_asked) - all_flags = self.messages.all_flags() + all_flags = self._memstore.all_flags(self.mbox) result = ((msgid, flagsPart( msgid, all_flags.get(msgid, tuple()))) for msgid in seq_messg) return result - @deferred_to_thread def fetch_headers(self, messages_asked, uid): """ A fast method to fetch all headers, tricking just the @@ -636,6 +704,8 @@ class SoledadMailbox(WithMsgFields, MBoxParser): MessagePart. :rtype: tuple """ + # TODO how often is thunderbird doing this? + class headersPart(object): def __init__(self, uid, headers): self.uid = uid @@ -653,10 +723,9 @@ class SoledadMailbox(WithMsgFields, MBoxParser): messages_asked = self._bound_seq(messages_asked) seq_messg = self._filter_msg_seq(messages_asked) - all_chash = self.messages.all_flags_chash() all_headers = self.messages.all_headers() result = ((msgid, headersPart( - msgid, all_headers.get(all_chash.get(msgid, 'nil'), {}))) + msgid, all_headers.get(msgid, {}))) for msgid in seq_messg) return result @@ -699,14 +768,15 @@ class SoledadMailbox(WithMsgFields, MBoxParser): :raise ReadOnlyMailbox: Raised if this mailbox is not open for read-write. """ - from twisted.internet import reactor if not self.isWriteable(): log.msg('read only mailbox!') raise imap4.ReadOnlyMailbox d = defer.Deferred() - deferLater(reactor, 0, self._do_store, messages_asked, flags, - mode, uid, d) + self.reactor.callLater(0, self._do_store, messages_asked, flags, + mode, uid, d) + if PROFILE_CMD: + do_profile_cmd(d, "STORE") return d def _do_store(self, messages_asked, flags, mode, uid, observer): @@ -721,7 +791,7 @@ class SoledadMailbox(WithMsgFields, MBoxParser): :type observer: deferred """ # XXX implement also sequence (uid = 0) - # XXX we should prevent cclient from setting Recent flag? + # XXX we should prevent client from setting Recent flag? leap_assert(not isinstance(flags, basestring), "flags cannot be a string") flags = tuple(flags) @@ -785,15 +855,10 @@ class SoledadMailbox(WithMsgFields, MBoxParser): uid when the copy succeed. :rtype: Deferred """ - from twisted.internet import reactor - d = defer.Deferred() - # XXX this should not happen ... track it down, - # probably to FETCH... - if message is None: - log.msg("BUG: COPY found a None in passed message") - d.callback(None) - deferLater(reactor, 0, self._do_copy, message, d) + if PROFILE_CMD: + do_profile_cmd(d, "COPY") + deferLater(self.reactor, 0, self._do_copy, message, d) return d def _do_copy(self, message, observer): @@ -809,51 +874,70 @@ class SoledadMailbox(WithMsgFields, MBoxParser): UID of the message :type observer: Deferred """ + memstore = self._memstore + + def createCopy(result): + exist, new_fdoc = result + if exist: + # Should we signal error on the callback? + logger.warning("Destination message already exists!") + + # XXX I'm not sure if we should raise the + # errback. This actually rases an ugly warning + # in some muas like thunderbird. + # UID 0 seems a good convention for no uid. + observer.callback(0) + else: + mbox = self.mbox + uid_next = memstore.increment_last_soledad_uid(mbox) + + new_fdoc[self.UID_KEY] = uid_next + new_fdoc[self.MBOX_KEY] = mbox + + flags = list(new_fdoc[self.FLAGS_KEY]) + flags.append(fields.RECENT_FLAG) + new_fdoc[self.FLAGS_KEY] = tuple(set(flags)) + + # FIXME set recent! + + self._memstore.create_message( + self.mbox, uid_next, + MessageWrapper(new_fdoc), + observer=observer, + notify_on_disk=False) + + d = self._get_msg_copy(message) + d.addCallback(createCopy) + d.addErrback(lambda f: log.msg(f.getTraceback())) + + @deferred_to_thread + def _get_msg_copy(self, message): + """ + Get a copy of the fdoc for this message, and check whether + it already exists. + + :param message: an IMessage implementor + :type message: LeapMessage + :return: exist, new_fdoc + :rtype: tuple + """ # XXX for clarity, this could be delegated to a # MessageCollection mixin that implements copy too, and # moved out of here. msg = message memstore = self._memstore - # XXX should use a public api instead - fdoc = msg._fdoc - hdoc = msg._hdoc - if not fdoc: + if empty(msg.fdoc): logger.warning("Tried to copy a MSG with no fdoc") return - new_fdoc = copy.deepcopy(fdoc.content) - + new_fdoc = copy.deepcopy(msg.fdoc.content) fdoc_chash = new_fdoc[fields.CONTENT_HASH_KEY] - # XXX is this hitting the db??? --- probably. - # We should profile after the pre-fetch. dest_fdoc = memstore.get_fdoc_from_chash( fdoc_chash, self.mbox) - exist = dest_fdoc and not empty(dest_fdoc.content) - - if exist: - # Should we signal error on the callback? - logger.warning("Destination message already exists!") - - # XXX I'm still not clear if we should raise the - # errback. This actually rases an ugly warning - # in some muas like thunderbird. I guess the user does - # not deserve that. - observer.callback(True) - else: - mbox = self.mbox - uid_next = memstore.increment_last_soledad_uid(mbox) - new_fdoc[self.UID_KEY] = uid_next - new_fdoc[self.MBOX_KEY] = mbox - - # FIXME set recent! - self._memstore.create_message( - self.mbox, uid_next, - MessageWrapper( - new_fdoc, hdoc.content), - observer=observer, - notify_on_disk=False) + exist = not empty(dest_fdoc) + return exist, new_fdoc # convenience fun @@ -865,12 +949,11 @@ class SoledadMailbox(WithMsgFields, MBoxParser): for doc in docs: self.messages._soledad.delete_doc(doc) - def unset_recent_flags(self, uids): + def unset_recent_flags(self, uid_seq): """ Unset Recent flag for a sequence of UIDs. """ - seq_messg = self._bound_seq(uids) - self.messages.unset_recent_flags(seq_messg) + self.messages.unset_recent_flags(uid_seq) def __repr__(self): """ |