diff options
author | Ivan Alejandro <ivanalejandro0@gmail.com> | 2014-02-04 16:20:54 -0300 |
---|---|---|
committer | Ivan Alejandro <ivanalejandro0@gmail.com> | 2014-02-04 16:20:54 -0300 |
commit | 74428b3176d286312f69e124d4d613c27a1ec93e (patch) | |
tree | eec0561e2184472dfedba3135a3d005efd478c34 /src/leap/mail/imap/mailbox.py | |
parent | 781bd2f4d2a047088d1a0ecd673a38c80ea0c0c0 (diff) | |
parent | 23e28bae2c3cb74e00e29ee8add0b73adeb65c2b (diff) |
Merge remote-tracking branch 'kali/feature/in-memory-store' into develop
Diffstat (limited to 'src/leap/mail/imap/mailbox.py')
-rw-r--r-- | src/leap/mail/imap/mailbox.py | 295 |
1 files changed, 178 insertions, 117 deletions
diff --git a/src/leap/mail/imap/mailbox.py b/src/leap/mail/imap/mailbox.py index 0131ce0..c682578 100644 --- a/src/leap/mail/imap/mailbox.py +++ b/src/leap/mail/imap/mailbox.py @@ -22,6 +22,7 @@ import threading import logging import StringIO import cStringIO +import os from collections import defaultdict @@ -35,13 +36,21 @@ from zope.interface import implements from leap.common import events as leap_events from leap.common.events.events_pb2 import IMAP_UNREAD_MAIL from leap.common.check import leap_assert, leap_assert_type -from leap.mail.decorators import deferred +from leap.mail.decorators import deferred_to_thread +from leap.mail.utils import empty from leap.mail.imap.fields import WithMsgFields, fields from leap.mail.imap.messages import MessageCollection +from leap.mail.imap.messageparts import MessageWrapper from leap.mail.imap.parser import MBoxParser logger = logging.getLogger(__name__) +""" +If the environment variable `LEAP_SKIPNOTIFY` is set, we avoid +notifying clients of new messages. Use during stress tests. +""" +NOTIFY_NEW = not os.environ.get('LEAP_SKIPNOTIFY', False) + class SoledadMailbox(WithMsgFields, MBoxParser): """ @@ -76,11 +85,12 @@ class SoledadMailbox(WithMsgFields, MBoxParser): CMD_UIDVALIDITY = "UIDVALIDITY" CMD_UNSEEN = "UNSEEN" + # FIXME we should turn this into a datastructure with limited capacity _listeners = defaultdict(set) next_uid_lock = threading.Lock() - def __init__(self, mbox, soledad=None, rw=1): + def __init__(self, mbox, soledad, memstore, rw=1): """ SoledadMailbox constructor. Needs to get passed a name, plus a Soledad instance. @@ -91,7 +101,10 @@ class SoledadMailbox(WithMsgFields, MBoxParser): :param soledad: a Soledad instance. :type soledad: Soledad - :param rw: read-and-write flags + :param memstore: a MemoryStore instance + :type memstore: MemoryStore + + :param rw: read-and-write flag for this mailbox :type rw: int """ leap_assert(mbox, "Need a mailbox name to initialize") @@ -105,13 +118,18 @@ class SoledadMailbox(WithMsgFields, MBoxParser): self.rw = rw self._soledad = soledad + self._memstore = memstore self.messages = MessageCollection( - mbox=mbox, soledad=self._soledad) + mbox=mbox, soledad=self._soledad, memstore=self._memstore) if not self.getFlags(): self.setFlags(self.INIT_FLAGS) + if self._memstore: + self.prime_known_uids_to_memstore() + self.prime_last_uid_to_memstore() + @property def listeners(self): """ @@ -125,6 +143,9 @@ class SoledadMailbox(WithMsgFields, MBoxParser): """ return self._listeners[self.mbox] + # TODO this grows too crazily when many instances are fired, like + # during imaptest stress testing. Should have a queue of limited size + # instead. def addListener(self, listener): """ Add a listener to the listeners queue. @@ -134,6 +155,9 @@ class SoledadMailbox(WithMsgFields, MBoxParser): :param listener: listener to add :type listener: an object that implements IMailboxListener """ + if not NOTIFY_NEW: + return + logger.debug('adding mailbox listener: %s' % listener) self.listeners.add(listener) @@ -146,6 +170,7 @@ class SoledadMailbox(WithMsgFields, MBoxParser): """ self.listeners.remove(listener) + # TODO move completely to soledadstore, under memstore reponsibility. def _get_mbox(self): """ Return mailbox document. @@ -221,48 +246,38 @@ class SoledadMailbox(WithMsgFields, MBoxParser): def _get_last_uid(self): """ Return the last uid for this mailbox. + If we have a memory store, the last UID will be the highest + recorded UID in the message store, or a counter cached from + the mailbox document in soledad if this is higher. :return: the last uid for messages in this mailbox - :rtype: bool + :rtype: int """ - mbox = self._get_mbox() - if not mbox: - logger.error("We could not get a mbox!") - # XXX It looks like it has been corrupted. - # We need to be able to survive this. - return None - return mbox.content.get(self.LAST_UID_KEY, 1) + last = self._memstore.get_last_uid(self.mbox) + logger.debug("last uid for %s: %s (from memstore)" % ( + repr(self.mbox), last)) + return last - def _set_last_uid(self, uid): - """ - Sets the last uid for this mailbox. + last_uid = property( + _get_last_uid, doc="Last_UID attribute.") - :param uid: the uid to be set - :type uid: int + def prime_last_uid_to_memstore(self): """ - leap_assert(isinstance(uid, int), "uid has to be int") - mbox = self._get_mbox() - key = self.LAST_UID_KEY - - count = self.getMessageCount() - - # XXX safety-catch. If we do get duplicates, - # we want to avoid further duplication. - - if uid >= count: - value = uid - else: - # something is wrong, - # just set the last uid - # beyond the max msg count. - logger.debug("WRONG uid < count. Setting last uid to %s", count) - value = count + Prime memstore with last_uid value + """ + set_exist = set(self.messages.all_uid_iter()) + last = max(set_exist) if set_exist else 0 + logger.info("Priming Soledad last_uid to %s" % (last,)) + self._memstore.set_last_soledad_uid(self.mbox, last) - mbox.content[key] = value - self._soledad.put_doc(mbox) + def prime_known_uids_to_memstore(self): + """ + Prime memstore with the set of all known uids. - last_uid = property( - _get_last_uid, _set_last_uid, doc="Last_UID attribute.") + We do this to be able to filter the requests efficiently. + """ + known_uids = self.messages.all_soledad_uid_iter() + self._memstore.set_known_uids(self.mbox, known_uids) def getUIDValidity(self): """ @@ -304,8 +319,15 @@ class SoledadMailbox(WithMsgFields, MBoxParser): :rtype: int """ with self.next_uid_lock: - self.last_uid += 1 - return self.last_uid + if self._memstore: + return self.last_uid + 1 + else: + # XXX after lock, it should be safe to + # return just the increment here, and + # have a different method that actually increments + # the counter when really adding. + self.last_uid += 1 + return self.last_uid def getMessageCount(self): """ @@ -366,7 +388,7 @@ class SoledadMailbox(WithMsgFields, MBoxParser): if self.CMD_UIDNEXT in names: r[self.CMD_UIDNEXT] = self.last_uid + 1 if self.CMD_UIDVALIDITY in names: - r[self.CMD_UIDVALIDITY] = self.getUID() + r[self.CMD_UIDVALIDITY] = self.getUIDValidity() if self.CMD_UNSEEN in names: r[self.CMD_UNSEEN] = self.getUnseenCount() return defer.succeed(r) @@ -386,26 +408,26 @@ class SoledadMailbox(WithMsgFields, MBoxParser): :return: a deferred that evals to None """ + # TODO have a look at the cases for internal date in the rfc if isinstance(message, (cStringIO.OutputType, StringIO.StringIO)): message = message.getvalue() - # XXX we should treat the message as an IMessage from here + + # XXX we could treat the message as an IMessage from here leap_assert_type(message, basestring) - uid_next = self.getUIDNext() - logger.debug('Adding msg with UID :%s' % uid_next) if flags is None: flags = tuple() else: flags = tuple(str(flag) for flag in flags) - d = self._do_add_message(message, flags=flags, date=date, uid=uid_next) + d = self._do_add_message(message, flags=flags, date=date) return d - def _do_add_message(self, message, flags, date, uid): + def _do_add_message(self, message, flags, date): """ - Calls to the messageCollection add_msg method (deferred to thread). + Calls to the messageCollection add_msg method. Invoked from addMessage. """ - d = self.messages.add_msg(message, flags=flags, date=date, uid=uid) + d = self.messages.add_msg(message, flags=flags, date=date) # XXX Removing notify temporarily. # This is interfering with imaptest results. I'm not clear if it's # because we clutter the logging or because the set of listeners is @@ -421,6 +443,8 @@ class SoledadMailbox(WithMsgFields, MBoxParser): :param args: ignored. """ + if not NOTIFY_NEW: + return exists = self.getMessageCount() recent = self.getRecentCount() logger.debug("NOTIFY: there are %s messages, %s recent" % ( @@ -445,6 +469,8 @@ class SoledadMailbox(WithMsgFields, MBoxParser): # XXX removing the mailbox in situ for now, # we should postpone the removal + + # XXX move to memory store?? self._soledad.delete_doc(self._get_mbox()) def _close_cb(self, result): @@ -467,13 +493,10 @@ class SoledadMailbox(WithMsgFields, MBoxParser): """ if not self.isWriteable(): raise imap4.ReadOnlyMailbox - d = self.messages.remove_all_deleted() - d.addCallback(self._expunge_cb) - d.addCallback(self.messages.reset_last_uid) - - # XXX DEBUG ------------------- - # FIXME !!! - # XXX should remove the hdocset too!!! + d = defer.Deferred() + return self._memstore.expunge(self.mbox, d) + self._memstore.expunge(self.mbox) + d.addCallback(self._expunge_cb, d) return d def _bound_seq(self, messages_asked): @@ -509,7 +532,8 @@ class SoledadMailbox(WithMsgFields, MBoxParser): seq_messg = set_asked.intersection(set_exist) return seq_messg - @deferred + @deferred_to_thread + #@profile def fetch(self, messages_asked, uid): """ Retrieve one or more messages in this mailbox. @@ -548,7 +572,7 @@ class SoledadMailbox(WithMsgFields, MBoxParser): result = ((msgid, getmsg(msgid)) for msgid in seq_messg) return result - @deferred + @deferred_to_thread def fetch_flags(self, messages_asked, uid): """ A fast method to fetch all flags, tricking just the @@ -589,10 +613,10 @@ class SoledadMailbox(WithMsgFields, MBoxParser): all_flags = self.messages.all_flags() result = ((msgid, flagsPart( - msgid, all_flags[msgid])) for msgid in seq_messg) + msgid, all_flags.get(msgid, tuple()))) for msgid in seq_messg) return result - @deferred + @deferred_to_thread def fetch_headers(self, messages_asked, uid): """ A fast method to fetch all headers, tricking just the @@ -641,14 +665,16 @@ class SoledadMailbox(WithMsgFields, MBoxParser): for msgid in seq_messg) return result - def signal_unread_to_ui(self): + def signal_unread_to_ui(self, *args, **kwargs): """ Sends unread event to ui. + + :param args: ignored + :param kwargs: ignored """ unseen = self.getUnseenCount() leap_events.signal(IMAP_UNREAD_MAIL, str(unseen)) - @deferred def store(self, messages_asked, flags, mode, uid): """ Sets the flags of one or more messages. @@ -670,49 +696,43 @@ class SoledadMailbox(WithMsgFields, MBoxParser): otherwise they are message sequence IDs. :type uid: bool - :return: A dict mapping message sequence numbers to sequences of - str representing the flags set on the message after this - operation has been performed. - :rtype: dict + :return: A deferred, that will be called with a dict mapping message + sequence numbers to sequences of str representing the flags + set on the message after this operation has been performed. + :rtype: deferred :raise ReadOnlyMailbox: Raised if this mailbox is not open for read-write. """ + from twisted.internet import reactor + if not self.isWriteable(): + log.msg('read only mailbox!') + raise imap4.ReadOnlyMailbox + + d = defer.Deferred() + deferLater(reactor, 0, self._do_store, messages_asked, flags, + mode, uid, d) + return d + + def _do_store(self, messages_asked, flags, mode, uid, observer): + """ + Helper method, invoke set_flags method in the MessageCollection. + + See the documentation for the `store` method for the parameters. + + :param observer: a deferred that will be called with the dictionary + mapping UIDs to flags after the operation has been + done. + :type observer: deferred + """ # XXX implement also sequence (uid = 0) - # XXX we should prevent cclient from setting Recent flag. + # XXX we should prevent cclient from setting Recent flag? leap_assert(not isinstance(flags, basestring), "flags cannot be a string") flags = tuple(flags) - messages_asked = self._bound_seq(messages_asked) seq_messg = self._filter_msg_seq(messages_asked) - - if not self.isWriteable(): - log.msg('read only mailbox!') - raise imap4.ReadOnlyMailbox - - result = {} - for msg_id in seq_messg: - log.msg("MSG ID = %s" % msg_id) - msg = self.messages.get_msg_by_uid(msg_id) - if not msg: - continue - if mode == 1: - msg.addFlags(flags) - elif mode == -1: - msg.removeFlags(flags) - elif mode == 0: - msg.setFlags(flags) - result[msg_id] = msg.getFlags() - - # After changing flags, we want to signal again to the - # UI because the number of unread might have changed. - # Hoever, we should probably limit this to INBOX only? - # this should really be called as a final callback of - # the do_STORE method... - from twisted.internet import reactor - deferLater(reactor, 1, self.signal_unread_to_ui) - return result + self.messages.set_flags(self.mbox, seq_messg, flags, mode, observer) # ISearchableMailbox @@ -760,44 +780,85 @@ class SoledadMailbox(WithMsgFields, MBoxParser): # IMessageCopier - @deferred - def copy(self, messageObject): + def copy(self, message): """ Copy the given message object into this mailbox. + + :param message: an IMessage implementor + :type message: LeapMessage + :return: a deferred that will be fired with the message + uid when the copy succeed. + :rtype: Deferred """ from twisted.internet import reactor - uid_next = self.getUIDNext() - msg = messageObject - # XXX DEBUG ---------------------------------------- - #print "copying MESSAGE from %s (%s) to %s (%s)" % ( - #msg._mbox, msg._uid, self.mbox, uid_next) + d = defer.Deferred() + # XXX this should not happen ... track it down, + # probably to FETCH... + if message is None: + log.msg("BUG: COPY found a None in passed message") + d.callback(None) + deferLater(reactor, 0, self._do_copy, message, d) + return d + + def _do_copy(self, message, observer): + """ + Call invoked from the deferLater in `copy`. This will + copy the flags and header documents, and pass them to the + `create_message` method in the MemoryStore, together with + the observer deferred that we've been passed along. + + :param message: an IMessage implementor + :type message: LeapMessage + :param observer: the deferred that will fire with the + UID of the message + :type observer: Deferred + """ + # XXX for clarity, this could be delegated to a + # MessageCollection mixin that implements copy too, and + # moved out of here. + msg = message + memstore = self._memstore # XXX should use a public api instead fdoc = msg._fdoc + hdoc = msg._hdoc if not fdoc: - logger.debug("Tried to copy a MSG with no fdoc") + logger.warning("Tried to copy a MSG with no fdoc") return - new_fdoc = copy.deepcopy(fdoc.content) - new_fdoc[self.UID_KEY] = uid_next - new_fdoc[self.MBOX_KEY] = self.mbox - self._do_add_doc(new_fdoc) - # XXX should use a public api instead - hdoc = msg._hdoc - self.messages.add_hdocset_docid(hdoc.doc_id) + fdoc_chash = new_fdoc[fields.CONTENT_HASH_KEY] - deferLater(reactor, 1, self.notify_new) + # XXX is this hitting the db??? --- probably. + # We should profile after the pre-fetch. + dest_fdoc = memstore.get_fdoc_from_chash( + fdoc_chash, self.mbox) + exist = dest_fdoc and not empty(dest_fdoc.content) - def _do_add_doc(self, doc): - """ - Defer the adding of a new doc. + if exist: + # Should we signal error on the callback? + logger.warning("Destination message already exists!") - :param doc: document to be created in soledad. - :type doc: dict - """ - self._soledad.create_doc(doc) + # XXX I'm still not clear if we should raise the + # errback. This actually rases an ugly warning + # in some muas like thunderbird. I guess the user does + # not deserve that. + observer.callback(True) + else: + mbox = self.mbox + uid_next = memstore.increment_last_soledad_uid(mbox) + new_fdoc[self.UID_KEY] = uid_next + new_fdoc[self.MBOX_KEY] = mbox + + # FIXME set recent! + + self._memstore.create_message( + self.mbox, uid_next, + MessageWrapper( + new_fdoc, hdoc.content), + observer=observer, + notify_on_disk=False) # convenience fun |