summaryrefslogtreecommitdiff
path: root/src/leap/mail/imap/mailbox.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/leap/mail/imap/mailbox.py')
-rw-r--r--src/leap/mail/imap/mailbox.py295
1 files changed, 178 insertions, 117 deletions
diff --git a/src/leap/mail/imap/mailbox.py b/src/leap/mail/imap/mailbox.py
index 0131ce0..c682578 100644
--- a/src/leap/mail/imap/mailbox.py
+++ b/src/leap/mail/imap/mailbox.py
@@ -22,6 +22,7 @@ import threading
import logging
import StringIO
import cStringIO
+import os
from collections import defaultdict
@@ -35,13 +36,21 @@ from zope.interface import implements
from leap.common import events as leap_events
from leap.common.events.events_pb2 import IMAP_UNREAD_MAIL
from leap.common.check import leap_assert, leap_assert_type
-from leap.mail.decorators import deferred
+from leap.mail.decorators import deferred_to_thread
+from leap.mail.utils import empty
from leap.mail.imap.fields import WithMsgFields, fields
from leap.mail.imap.messages import MessageCollection
+from leap.mail.imap.messageparts import MessageWrapper
from leap.mail.imap.parser import MBoxParser
logger = logging.getLogger(__name__)
+"""
+If the environment variable `LEAP_SKIPNOTIFY` is set, we avoid
+notifying clients of new messages. Use during stress tests.
+"""
+NOTIFY_NEW = not os.environ.get('LEAP_SKIPNOTIFY', False)
+
class SoledadMailbox(WithMsgFields, MBoxParser):
"""
@@ -76,11 +85,12 @@ class SoledadMailbox(WithMsgFields, MBoxParser):
CMD_UIDVALIDITY = "UIDVALIDITY"
CMD_UNSEEN = "UNSEEN"
+ # FIXME we should turn this into a datastructure with limited capacity
_listeners = defaultdict(set)
next_uid_lock = threading.Lock()
- def __init__(self, mbox, soledad=None, rw=1):
+ def __init__(self, mbox, soledad, memstore, rw=1):
"""
SoledadMailbox constructor. Needs to get passed a name, plus a
Soledad instance.
@@ -91,7 +101,10 @@ class SoledadMailbox(WithMsgFields, MBoxParser):
:param soledad: a Soledad instance.
:type soledad: Soledad
- :param rw: read-and-write flags
+ :param memstore: a MemoryStore instance
+ :type memstore: MemoryStore
+
+ :param rw: read-and-write flag for this mailbox
:type rw: int
"""
leap_assert(mbox, "Need a mailbox name to initialize")
@@ -105,13 +118,18 @@ class SoledadMailbox(WithMsgFields, MBoxParser):
self.rw = rw
self._soledad = soledad
+ self._memstore = memstore
self.messages = MessageCollection(
- mbox=mbox, soledad=self._soledad)
+ mbox=mbox, soledad=self._soledad, memstore=self._memstore)
if not self.getFlags():
self.setFlags(self.INIT_FLAGS)
+ if self._memstore:
+ self.prime_known_uids_to_memstore()
+ self.prime_last_uid_to_memstore()
+
@property
def listeners(self):
"""
@@ -125,6 +143,9 @@ class SoledadMailbox(WithMsgFields, MBoxParser):
"""
return self._listeners[self.mbox]
+ # TODO this grows too crazily when many instances are fired, like
+ # during imaptest stress testing. Should have a queue of limited size
+ # instead.
def addListener(self, listener):
"""
Add a listener to the listeners queue.
@@ -134,6 +155,9 @@ class SoledadMailbox(WithMsgFields, MBoxParser):
:param listener: listener to add
:type listener: an object that implements IMailboxListener
"""
+ if not NOTIFY_NEW:
+ return
+
logger.debug('adding mailbox listener: %s' % listener)
self.listeners.add(listener)
@@ -146,6 +170,7 @@ class SoledadMailbox(WithMsgFields, MBoxParser):
"""
self.listeners.remove(listener)
+ # TODO move completely to soledadstore, under memstore reponsibility.
def _get_mbox(self):
"""
Return mailbox document.
@@ -221,48 +246,38 @@ class SoledadMailbox(WithMsgFields, MBoxParser):
def _get_last_uid(self):
"""
Return the last uid for this mailbox.
+ If we have a memory store, the last UID will be the highest
+ recorded UID in the message store, or a counter cached from
+ the mailbox document in soledad if this is higher.
:return: the last uid for messages in this mailbox
- :rtype: bool
+ :rtype: int
"""
- mbox = self._get_mbox()
- if not mbox:
- logger.error("We could not get a mbox!")
- # XXX It looks like it has been corrupted.
- # We need to be able to survive this.
- return None
- return mbox.content.get(self.LAST_UID_KEY, 1)
+ last = self._memstore.get_last_uid(self.mbox)
+ logger.debug("last uid for %s: %s (from memstore)" % (
+ repr(self.mbox), last))
+ return last
- def _set_last_uid(self, uid):
- """
- Sets the last uid for this mailbox.
+ last_uid = property(
+ _get_last_uid, doc="Last_UID attribute.")
- :param uid: the uid to be set
- :type uid: int
+ def prime_last_uid_to_memstore(self):
"""
- leap_assert(isinstance(uid, int), "uid has to be int")
- mbox = self._get_mbox()
- key = self.LAST_UID_KEY
-
- count = self.getMessageCount()
-
- # XXX safety-catch. If we do get duplicates,
- # we want to avoid further duplication.
-
- if uid >= count:
- value = uid
- else:
- # something is wrong,
- # just set the last uid
- # beyond the max msg count.
- logger.debug("WRONG uid < count. Setting last uid to %s", count)
- value = count
+ Prime memstore with last_uid value
+ """
+ set_exist = set(self.messages.all_uid_iter())
+ last = max(set_exist) if set_exist else 0
+ logger.info("Priming Soledad last_uid to %s" % (last,))
+ self._memstore.set_last_soledad_uid(self.mbox, last)
- mbox.content[key] = value
- self._soledad.put_doc(mbox)
+ def prime_known_uids_to_memstore(self):
+ """
+ Prime memstore with the set of all known uids.
- last_uid = property(
- _get_last_uid, _set_last_uid, doc="Last_UID attribute.")
+ We do this to be able to filter the requests efficiently.
+ """
+ known_uids = self.messages.all_soledad_uid_iter()
+ self._memstore.set_known_uids(self.mbox, known_uids)
def getUIDValidity(self):
"""
@@ -304,8 +319,15 @@ class SoledadMailbox(WithMsgFields, MBoxParser):
:rtype: int
"""
with self.next_uid_lock:
- self.last_uid += 1
- return self.last_uid
+ if self._memstore:
+ return self.last_uid + 1
+ else:
+ # XXX after lock, it should be safe to
+ # return just the increment here, and
+ # have a different method that actually increments
+ # the counter when really adding.
+ self.last_uid += 1
+ return self.last_uid
def getMessageCount(self):
"""
@@ -366,7 +388,7 @@ class SoledadMailbox(WithMsgFields, MBoxParser):
if self.CMD_UIDNEXT in names:
r[self.CMD_UIDNEXT] = self.last_uid + 1
if self.CMD_UIDVALIDITY in names:
- r[self.CMD_UIDVALIDITY] = self.getUID()
+ r[self.CMD_UIDVALIDITY] = self.getUIDValidity()
if self.CMD_UNSEEN in names:
r[self.CMD_UNSEEN] = self.getUnseenCount()
return defer.succeed(r)
@@ -386,26 +408,26 @@ class SoledadMailbox(WithMsgFields, MBoxParser):
:return: a deferred that evals to None
"""
+ # TODO have a look at the cases for internal date in the rfc
if isinstance(message, (cStringIO.OutputType, StringIO.StringIO)):
message = message.getvalue()
- # XXX we should treat the message as an IMessage from here
+
+ # XXX we could treat the message as an IMessage from here
leap_assert_type(message, basestring)
- uid_next = self.getUIDNext()
- logger.debug('Adding msg with UID :%s' % uid_next)
if flags is None:
flags = tuple()
else:
flags = tuple(str(flag) for flag in flags)
- d = self._do_add_message(message, flags=flags, date=date, uid=uid_next)
+ d = self._do_add_message(message, flags=flags, date=date)
return d
- def _do_add_message(self, message, flags, date, uid):
+ def _do_add_message(self, message, flags, date):
"""
- Calls to the messageCollection add_msg method (deferred to thread).
+ Calls to the messageCollection add_msg method.
Invoked from addMessage.
"""
- d = self.messages.add_msg(message, flags=flags, date=date, uid=uid)
+ d = self.messages.add_msg(message, flags=flags, date=date)
# XXX Removing notify temporarily.
# This is interfering with imaptest results. I'm not clear if it's
# because we clutter the logging or because the set of listeners is
@@ -421,6 +443,8 @@ class SoledadMailbox(WithMsgFields, MBoxParser):
:param args: ignored.
"""
+ if not NOTIFY_NEW:
+ return
exists = self.getMessageCount()
recent = self.getRecentCount()
logger.debug("NOTIFY: there are %s messages, %s recent" % (
@@ -445,6 +469,8 @@ class SoledadMailbox(WithMsgFields, MBoxParser):
# XXX removing the mailbox in situ for now,
# we should postpone the removal
+
+ # XXX move to memory store??
self._soledad.delete_doc(self._get_mbox())
def _close_cb(self, result):
@@ -467,13 +493,10 @@ class SoledadMailbox(WithMsgFields, MBoxParser):
"""
if not self.isWriteable():
raise imap4.ReadOnlyMailbox
- d = self.messages.remove_all_deleted()
- d.addCallback(self._expunge_cb)
- d.addCallback(self.messages.reset_last_uid)
-
- # XXX DEBUG -------------------
- # FIXME !!!
- # XXX should remove the hdocset too!!!
+ d = defer.Deferred()
+ return self._memstore.expunge(self.mbox, d)
+ self._memstore.expunge(self.mbox)
+ d.addCallback(self._expunge_cb, d)
return d
def _bound_seq(self, messages_asked):
@@ -509,7 +532,8 @@ class SoledadMailbox(WithMsgFields, MBoxParser):
seq_messg = set_asked.intersection(set_exist)
return seq_messg
- @deferred
+ @deferred_to_thread
+ #@profile
def fetch(self, messages_asked, uid):
"""
Retrieve one or more messages in this mailbox.
@@ -548,7 +572,7 @@ class SoledadMailbox(WithMsgFields, MBoxParser):
result = ((msgid, getmsg(msgid)) for msgid in seq_messg)
return result
- @deferred
+ @deferred_to_thread
def fetch_flags(self, messages_asked, uid):
"""
A fast method to fetch all flags, tricking just the
@@ -589,10 +613,10 @@ class SoledadMailbox(WithMsgFields, MBoxParser):
all_flags = self.messages.all_flags()
result = ((msgid, flagsPart(
- msgid, all_flags[msgid])) for msgid in seq_messg)
+ msgid, all_flags.get(msgid, tuple()))) for msgid in seq_messg)
return result
- @deferred
+ @deferred_to_thread
def fetch_headers(self, messages_asked, uid):
"""
A fast method to fetch all headers, tricking just the
@@ -641,14 +665,16 @@ class SoledadMailbox(WithMsgFields, MBoxParser):
for msgid in seq_messg)
return result
- def signal_unread_to_ui(self):
+ def signal_unread_to_ui(self, *args, **kwargs):
"""
Sends unread event to ui.
+
+ :param args: ignored
+ :param kwargs: ignored
"""
unseen = self.getUnseenCount()
leap_events.signal(IMAP_UNREAD_MAIL, str(unseen))
- @deferred
def store(self, messages_asked, flags, mode, uid):
"""
Sets the flags of one or more messages.
@@ -670,49 +696,43 @@ class SoledadMailbox(WithMsgFields, MBoxParser):
otherwise they are message sequence IDs.
:type uid: bool
- :return: A dict mapping message sequence numbers to sequences of
- str representing the flags set on the message after this
- operation has been performed.
- :rtype: dict
+ :return: A deferred, that will be called with a dict mapping message
+ sequence numbers to sequences of str representing the flags
+ set on the message after this operation has been performed.
+ :rtype: deferred
:raise ReadOnlyMailbox: Raised if this mailbox is not open for
read-write.
"""
+ from twisted.internet import reactor
+ if not self.isWriteable():
+ log.msg('read only mailbox!')
+ raise imap4.ReadOnlyMailbox
+
+ d = defer.Deferred()
+ deferLater(reactor, 0, self._do_store, messages_asked, flags,
+ mode, uid, d)
+ return d
+
+ def _do_store(self, messages_asked, flags, mode, uid, observer):
+ """
+ Helper method, invoke set_flags method in the MessageCollection.
+
+ See the documentation for the `store` method for the parameters.
+
+ :param observer: a deferred that will be called with the dictionary
+ mapping UIDs to flags after the operation has been
+ done.
+ :type observer: deferred
+ """
# XXX implement also sequence (uid = 0)
- # XXX we should prevent cclient from setting Recent flag.
+ # XXX we should prevent cclient from setting Recent flag?
leap_assert(not isinstance(flags, basestring),
"flags cannot be a string")
flags = tuple(flags)
-
messages_asked = self._bound_seq(messages_asked)
seq_messg = self._filter_msg_seq(messages_asked)
-
- if not self.isWriteable():
- log.msg('read only mailbox!')
- raise imap4.ReadOnlyMailbox
-
- result = {}
- for msg_id in seq_messg:
- log.msg("MSG ID = %s" % msg_id)
- msg = self.messages.get_msg_by_uid(msg_id)
- if not msg:
- continue
- if mode == 1:
- msg.addFlags(flags)
- elif mode == -1:
- msg.removeFlags(flags)
- elif mode == 0:
- msg.setFlags(flags)
- result[msg_id] = msg.getFlags()
-
- # After changing flags, we want to signal again to the
- # UI because the number of unread might have changed.
- # Hoever, we should probably limit this to INBOX only?
- # this should really be called as a final callback of
- # the do_STORE method...
- from twisted.internet import reactor
- deferLater(reactor, 1, self.signal_unread_to_ui)
- return result
+ self.messages.set_flags(self.mbox, seq_messg, flags, mode, observer)
# ISearchableMailbox
@@ -760,44 +780,85 @@ class SoledadMailbox(WithMsgFields, MBoxParser):
# IMessageCopier
- @deferred
- def copy(self, messageObject):
+ def copy(self, message):
"""
Copy the given message object into this mailbox.
+
+ :param message: an IMessage implementor
+ :type message: LeapMessage
+ :return: a deferred that will be fired with the message
+ uid when the copy succeed.
+ :rtype: Deferred
"""
from twisted.internet import reactor
- uid_next = self.getUIDNext()
- msg = messageObject
- # XXX DEBUG ----------------------------------------
- #print "copying MESSAGE from %s (%s) to %s (%s)" % (
- #msg._mbox, msg._uid, self.mbox, uid_next)
+ d = defer.Deferred()
+ # XXX this should not happen ... track it down,
+ # probably to FETCH...
+ if message is None:
+ log.msg("BUG: COPY found a None in passed message")
+ d.callback(None)
+ deferLater(reactor, 0, self._do_copy, message, d)
+ return d
+
+ def _do_copy(self, message, observer):
+ """
+ Call invoked from the deferLater in `copy`. This will
+ copy the flags and header documents, and pass them to the
+ `create_message` method in the MemoryStore, together with
+ the observer deferred that we've been passed along.
+
+ :param message: an IMessage implementor
+ :type message: LeapMessage
+ :param observer: the deferred that will fire with the
+ UID of the message
+ :type observer: Deferred
+ """
+ # XXX for clarity, this could be delegated to a
+ # MessageCollection mixin that implements copy too, and
+ # moved out of here.
+ msg = message
+ memstore = self._memstore
# XXX should use a public api instead
fdoc = msg._fdoc
+ hdoc = msg._hdoc
if not fdoc:
- logger.debug("Tried to copy a MSG with no fdoc")
+ logger.warning("Tried to copy a MSG with no fdoc")
return
-
new_fdoc = copy.deepcopy(fdoc.content)
- new_fdoc[self.UID_KEY] = uid_next
- new_fdoc[self.MBOX_KEY] = self.mbox
- self._do_add_doc(new_fdoc)
- # XXX should use a public api instead
- hdoc = msg._hdoc
- self.messages.add_hdocset_docid(hdoc.doc_id)
+ fdoc_chash = new_fdoc[fields.CONTENT_HASH_KEY]
- deferLater(reactor, 1, self.notify_new)
+ # XXX is this hitting the db??? --- probably.
+ # We should profile after the pre-fetch.
+ dest_fdoc = memstore.get_fdoc_from_chash(
+ fdoc_chash, self.mbox)
+ exist = dest_fdoc and not empty(dest_fdoc.content)
- def _do_add_doc(self, doc):
- """
- Defer the adding of a new doc.
+ if exist:
+ # Should we signal error on the callback?
+ logger.warning("Destination message already exists!")
- :param doc: document to be created in soledad.
- :type doc: dict
- """
- self._soledad.create_doc(doc)
+ # XXX I'm still not clear if we should raise the
+ # errback. This actually rases an ugly warning
+ # in some muas like thunderbird. I guess the user does
+ # not deserve that.
+ observer.callback(True)
+ else:
+ mbox = self.mbox
+ uid_next = memstore.increment_last_soledad_uid(mbox)
+ new_fdoc[self.UID_KEY] = uid_next
+ new_fdoc[self.MBOX_KEY] = mbox
+
+ # FIXME set recent!
+
+ self._memstore.create_message(
+ self.mbox, uid_next,
+ MessageWrapper(
+ new_fdoc, hdoc.content),
+ observer=observer,
+ notify_on_disk=False)
# convenience fun