summaryrefslogtreecommitdiff
path: root/src/leap/mail/imap/mailbox.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/leap/mail/imap/mailbox.py')
-rw-r--r--src/leap/mail/imap/mailbox.py275
1 files changed, 179 insertions, 96 deletions
diff --git a/src/leap/mail/imap/mailbox.py b/src/leap/mail/imap/mailbox.py
index d8af0a5..57505f0 100644
--- a/src/leap/mail/imap/mailbox.py
+++ b/src/leap/mail/imap/mailbox.py
@@ -50,6 +50,25 @@ If the environment variable `LEAP_SKIPNOTIFY` is set, we avoid
notifying clients of new messages. Use during stress tests.
"""
NOTIFY_NEW = not os.environ.get('LEAP_SKIPNOTIFY', False)
+PROFILE_CMD = os.environ.get('LEAP_PROFILE_IMAPCMD', False)
+
+if PROFILE_CMD:
+ import time
+
+ def _debugProfiling(result, cmdname, start):
+ took = (time.time() - start) * 1000
+ log.msg("CMD " + cmdname + " TOOK: " + str(took) + " msec")
+ return result
+
+ def do_profile_cmd(d, name):
+ """
+ Add the profiling debug to the passed callback.
+ :param d: deferred
+ :param name: name of the command
+ :type name: str
+ """
+ d.addCallback(_debugProfiling, name, time.time())
+ d.addErrback(lambda f: log.msg(f.getTraceback()))
class SoledadMailbox(WithMsgFields, MBoxParser):
@@ -89,6 +108,12 @@ class SoledadMailbox(WithMsgFields, MBoxParser):
_listeners = defaultdict(set)
next_uid_lock = threading.Lock()
+ last_uid_lock = threading.Lock()
+
+ # TODO unify all the `primed` dicts
+ _fdoc_primed = {}
+ _last_uid_primed = {}
+ _known_uids_primed = {}
def __init__(self, mbox, soledad, memstore, rw=1):
"""
@@ -107,6 +132,7 @@ class SoledadMailbox(WithMsgFields, MBoxParser):
:param rw: read-and-write flag for this mailbox
:type rw: int
"""
+ logger.debug("Initializing mailbox %r" % (mbox,))
leap_assert(mbox, "Need a mailbox name to initialize")
leap_assert(soledad, "Need a soledad instance to initialize")
@@ -123,12 +149,24 @@ class SoledadMailbox(WithMsgFields, MBoxParser):
self.messages = MessageCollection(
mbox=mbox, soledad=self._soledad, memstore=self._memstore)
+ # XXX careful with this get/set (it would be
+ # hitting db unconditionally, move to memstore too)
+ # Now it's returning a fixed amount of flags from mem
+ # as a workaround.
if not self.getFlags():
self.setFlags(self.INIT_FLAGS)
if self._memstore:
self.prime_known_uids_to_memstore()
self.prime_last_uid_to_memstore()
+ self.prime_flag_docs_to_memstore()
+
+ from twisted.internet import reactor
+ self.reactor = reactor
+
+ # purge memstore from empty fdocs.
+ self._memstore.purge_fdoc_store(mbox)
+ logger.debug("DONE initializing mailbox %r" % (mbox,))
@property
def listeners(self):
@@ -170,8 +208,7 @@ class SoledadMailbox(WithMsgFields, MBoxParser):
"""
self.listeners.remove(listener)
- # TODO move completely to soledadstore, under memstore reponsibility.
- def _get_mbox(self):
+ def _get_mbox_doc(self):
"""
Return mailbox document.
@@ -179,14 +216,7 @@ class SoledadMailbox(WithMsgFields, MBoxParser):
the query failed.
:rtype: SoledadDocument or None.
"""
- try:
- query = self._soledad.get_from_index(
- fields.TYPE_MBOX_IDX,
- fields.TYPE_MBOX_VAL, self.mbox)
- if query:
- return query.pop()
- except Exception as exc:
- logger.exception("Unhandled error %r" % exc)
+ return self._memstore.get_mbox_doc(self.mbox)
def getFlags(self):
"""
@@ -195,12 +225,21 @@ class SoledadMailbox(WithMsgFields, MBoxParser):
:returns: tuple of flags for this mailbox
:rtype: tuple of str
"""
- mbox = self._get_mbox()
- if not mbox:
- return None
- flags = mbox.content.get(self.FLAGS_KEY, [])
+ flags = self.INIT_FLAGS
+
+ # XXX returning fixed flags always
+ # Since I have not found a case where the client
+ # wants to modify this, as a way of speeding up
+ # selects. To do it right, we probably should keep
+ # track of the set of all flags used by msgs
+ # in this mailbox. Does it matter?
+ #mbox = self._get_mbox_doc()
+ #if not mbox:
+ #return None
+ #flags = mbox.content.get(self.FLAGS_KEY, [])
return map(str, flags)
+ # XXX move to memstore->soledadstore
def setFlags(self, flags):
"""
Sets flags for this mailbox.
@@ -210,10 +249,12 @@ class SoledadMailbox(WithMsgFields, MBoxParser):
"""
leap_assert(isinstance(flags, tuple),
"flags expected to be a tuple")
- mbox = self._get_mbox()
+ mbox = self._get_mbox_doc()
if not mbox:
return None
mbox.content[self.FLAGS_KEY] = map(str, flags)
+ logger.debug("Writing mbox document for %r to Soledad"
+ % (self.mbox,))
self._soledad.put_doc(mbox)
# XXX SHOULD BETTER IMPLEMENT ADD_FLAG, REMOVE_FLAG.
@@ -225,8 +266,7 @@ class SoledadMailbox(WithMsgFields, MBoxParser):
:return: True if the mailbox is closed
:rtype: bool
"""
- mbox = self._get_mbox()
- return mbox.content.get(self.CLOSED_KEY, False)
+ return self._memstore.get_mbox_closed(self.mbox)
def _set_closed(self, closed):
"""
@@ -235,10 +275,7 @@ class SoledadMailbox(WithMsgFields, MBoxParser):
:param closed: the state to be set
:type closed: bool
"""
- leap_assert(isinstance(closed, bool), "closed needs to be boolean")
- mbox = self._get_mbox()
- mbox.content[self.CLOSED_KEY] = closed
- self._soledad.put_doc(mbox)
+ self._memstore.set_mbox_closed(self.mbox, closed)
closed = property(
_get_closed, _set_closed, doc="Closed attribute.")
@@ -265,10 +302,13 @@ class SoledadMailbox(WithMsgFields, MBoxParser):
"""
Prime memstore with last_uid value
"""
- set_exist = set(self.messages.all_uid_iter())
- last = max(set_exist) if set_exist else 0
- logger.info("Priming Soledad last_uid to %s" % (last,))
- self._memstore.set_last_soledad_uid(self.mbox, last)
+ primed = self._last_uid_primed.get(self.mbox, False)
+ if not primed:
+ mbox = self._get_mbox_doc()
+ last = mbox.content.get('lastuid', 0)
+ logger.info("Priming Soledad last_uid to %s" % (last,))
+ self._memstore.set_last_soledad_uid(self.mbox, last)
+ self._last_uid_primed[self.mbox] = True
def prime_known_uids_to_memstore(self):
"""
@@ -276,8 +316,21 @@ class SoledadMailbox(WithMsgFields, MBoxParser):
We do this to be able to filter the requests efficiently.
"""
- known_uids = self.messages.all_soledad_uid_iter()
- self._memstore.set_known_uids(self.mbox, known_uids)
+ primed = self._known_uids_primed.get(self.mbox, False)
+ if not primed:
+ known_uids = self.messages.all_soledad_uid_iter()
+ self._memstore.set_known_uids(self.mbox, known_uids)
+ self._known_uids_primed[self.mbox] = True
+
+ def prime_flag_docs_to_memstore(self):
+ """
+ Prime memstore with all the flags documents.
+ """
+ primed = self._fdoc_primed.get(self.mbox, False)
+ if not primed:
+ all_flag_docs = self.messages.get_all_soledad_flag_docs()
+ self._memstore.load_flag_docs(self.mbox, all_flag_docs)
+ self._fdoc_primed[self.mbox] = True
def getUIDValidity(self):
"""
@@ -286,7 +339,7 @@ class SoledadMailbox(WithMsgFields, MBoxParser):
:return: unique validity identifier
:rtype: int
"""
- mbox = self._get_mbox()
+ mbox = self._get_mbox_doc()
return mbox.content.get(self.CREATED_KEY, 1)
def getUID(self, message):
@@ -420,6 +473,9 @@ class SoledadMailbox(WithMsgFields, MBoxParser):
flags = tuple(str(flag) for flag in flags)
d = self._do_add_message(message, flags=flags, date=date)
+ if PROFILE_CMD:
+ do_profile_cmd(d, "APPEND")
+ # XXX should notify here probably
return d
def _do_add_message(self, message, flags, date):
@@ -428,15 +484,9 @@ class SoledadMailbox(WithMsgFields, MBoxParser):
Invoked from addMessage.
"""
d = self.messages.add_msg(message, flags=flags, date=date)
- # XXX Removing notify temporarily.
- # This is interfering with imaptest results. I'm not clear if it's
- # because we clutter the logging or because the set of listeners is
- # ever-growing. We should come up with some smart way of dealing with
- # it, or maybe just disabling it using an environmental variable since
- # we will only have just a few listeners in the regular desktop case.
- #d.addCallback(self.notify_new)
return d
+ @deferred_to_thread
def notify_new(self, *args):
"""
Notify of new messages to all the listeners.
@@ -447,12 +497,10 @@ class SoledadMailbox(WithMsgFields, MBoxParser):
return
exists = self.getMessageCount()
recent = self.getRecentCount()
- logger.debug("NOTIFY: there are %s messages, %s recent" % (
- exists,
- recent))
+ logger.debug("NOTIFY (%r): there are %s messages, %s recent" % (
+ self.mbox, exists, recent))
for l in self.listeners:
- logger.debug('notifying...')
l.newMessages(exists, recent)
# commands, do not rename methods
@@ -471,7 +519,7 @@ class SoledadMailbox(WithMsgFields, MBoxParser):
# we should postpone the removal
# XXX move to memory store??
- self._soledad.delete_doc(self._get_mbox())
+ self._soledad.delete_doc(self._get_mbox_doc())
def _close_cb(self, result):
self.closed = True
@@ -527,8 +575,6 @@ class SoledadMailbox(WithMsgFields, MBoxParser):
seq_messg = set_asked.intersection(set_exist)
return seq_messg
- @deferred_to_thread
- #@profile
def fetch(self, messages_asked, uid):
"""
Retrieve one or more messages in this mailbox.
@@ -544,6 +590,27 @@ class SoledadMailbox(WithMsgFields, MBoxParser):
otherwise.
:type uid: bool
+ :rtype: deferred
+ """
+ d = defer.Deferred()
+ self.reactor.callInThread(self._do_fetch, messages_asked, uid, d)
+ if PROFILE_CMD:
+ do_profile_cmd(d, "FETCH")
+ return d
+
+ # called in thread
+ def _do_fetch(self, messages_asked, uid, d):
+ """
+ :param messages_asked: IDs of the messages to retrieve information
+ about
+ :type messages_asked: MessageSet
+
+ :param uid: If true, the IDs are UIDs. They are message sequence IDs
+ otherwise.
+ :type uid: bool
+ :param d: deferred whose callback will be called with result.
+ :type d: Deferred
+
:rtype: A tuple of two-tuples of message sequence numbers and
LeapMessage
"""
@@ -564,10 +631,12 @@ class SoledadMailbox(WithMsgFields, MBoxParser):
logger.debug("Getting msg by index: INEFFICIENT call!")
raise NotImplementedError
else:
- result = ((msgid, getmsg(msgid)) for msgid in seq_messg)
- return result
+ got_msg = ((msgid, getmsg(msgid)) for msgid in seq_messg)
+ result = ((msgid, msg) for msgid, msg in got_msg
+ if msg is not None)
+ self.reactor.callLater(0, self.unset_recent_flags, seq_messg)
+ self.reactor.callFromThread(d.callback, result)
- @deferred_to_thread
def fetch_flags(self, messages_asked, uid):
"""
A fast method to fetch all flags, tricking just the
@@ -606,12 +675,11 @@ class SoledadMailbox(WithMsgFields, MBoxParser):
messages_asked = self._bound_seq(messages_asked)
seq_messg = self._filter_msg_seq(messages_asked)
- all_flags = self.messages.all_flags()
+ all_flags = self._memstore.all_flags(self.mbox)
result = ((msgid, flagsPart(
msgid, all_flags.get(msgid, tuple()))) for msgid in seq_messg)
return result
- @deferred_to_thread
def fetch_headers(self, messages_asked, uid):
"""
A fast method to fetch all headers, tricking just the
@@ -636,6 +704,8 @@ class SoledadMailbox(WithMsgFields, MBoxParser):
MessagePart.
:rtype: tuple
"""
+ # TODO how often is thunderbird doing this?
+
class headersPart(object):
def __init__(self, uid, headers):
self.uid = uid
@@ -653,10 +723,9 @@ class SoledadMailbox(WithMsgFields, MBoxParser):
messages_asked = self._bound_seq(messages_asked)
seq_messg = self._filter_msg_seq(messages_asked)
- all_chash = self.messages.all_flags_chash()
all_headers = self.messages.all_headers()
result = ((msgid, headersPart(
- msgid, all_headers.get(all_chash.get(msgid, 'nil'), {})))
+ msgid, all_headers.get(msgid, {})))
for msgid in seq_messg)
return result
@@ -699,14 +768,15 @@ class SoledadMailbox(WithMsgFields, MBoxParser):
:raise ReadOnlyMailbox: Raised if this mailbox is not open for
read-write.
"""
- from twisted.internet import reactor
if not self.isWriteable():
log.msg('read only mailbox!')
raise imap4.ReadOnlyMailbox
d = defer.Deferred()
- deferLater(reactor, 0, self._do_store, messages_asked, flags,
- mode, uid, d)
+ self.reactor.callLater(0, self._do_store, messages_asked, flags,
+ mode, uid, d)
+ if PROFILE_CMD:
+ do_profile_cmd(d, "STORE")
return d
def _do_store(self, messages_asked, flags, mode, uid, observer):
@@ -721,7 +791,7 @@ class SoledadMailbox(WithMsgFields, MBoxParser):
:type observer: deferred
"""
# XXX implement also sequence (uid = 0)
- # XXX we should prevent cclient from setting Recent flag?
+ # XXX we should prevent client from setting Recent flag?
leap_assert(not isinstance(flags, basestring),
"flags cannot be a string")
flags = tuple(flags)
@@ -785,15 +855,10 @@ class SoledadMailbox(WithMsgFields, MBoxParser):
uid when the copy succeed.
:rtype: Deferred
"""
- from twisted.internet import reactor
-
d = defer.Deferred()
- # XXX this should not happen ... track it down,
- # probably to FETCH...
- if message is None:
- log.msg("BUG: COPY found a None in passed message")
- d.callback(None)
- deferLater(reactor, 0, self._do_copy, message, d)
+ if PROFILE_CMD:
+ do_profile_cmd(d, "COPY")
+ deferLater(self.reactor, 0, self._do_copy, message, d)
return d
def _do_copy(self, message, observer):
@@ -809,51 +874,70 @@ class SoledadMailbox(WithMsgFields, MBoxParser):
UID of the message
:type observer: Deferred
"""
+ memstore = self._memstore
+
+ def createCopy(result):
+ exist, new_fdoc = result
+ if exist:
+ # Should we signal error on the callback?
+ logger.warning("Destination message already exists!")
+
+ # XXX I'm not sure if we should raise the
+ # errback. This actually rases an ugly warning
+ # in some muas like thunderbird.
+ # UID 0 seems a good convention for no uid.
+ observer.callback(0)
+ else:
+ mbox = self.mbox
+ uid_next = memstore.increment_last_soledad_uid(mbox)
+
+ new_fdoc[self.UID_KEY] = uid_next
+ new_fdoc[self.MBOX_KEY] = mbox
+
+ flags = list(new_fdoc[self.FLAGS_KEY])
+ flags.append(fields.RECENT_FLAG)
+ new_fdoc[self.FLAGS_KEY] = tuple(set(flags))
+
+ # FIXME set recent!
+
+ self._memstore.create_message(
+ self.mbox, uid_next,
+ MessageWrapper(new_fdoc),
+ observer=observer,
+ notify_on_disk=False)
+
+ d = self._get_msg_copy(message)
+ d.addCallback(createCopy)
+ d.addErrback(lambda f: log.msg(f.getTraceback()))
+
+ @deferred_to_thread
+ def _get_msg_copy(self, message):
+ """
+ Get a copy of the fdoc for this message, and check whether
+ it already exists.
+
+ :param message: an IMessage implementor
+ :type message: LeapMessage
+ :return: exist, new_fdoc
+ :rtype: tuple
+ """
# XXX for clarity, this could be delegated to a
# MessageCollection mixin that implements copy too, and
# moved out of here.
msg = message
memstore = self._memstore
- # XXX should use a public api instead
- fdoc = msg._fdoc
- hdoc = msg._hdoc
- if not fdoc:
+ if empty(msg.fdoc):
logger.warning("Tried to copy a MSG with no fdoc")
return
- new_fdoc = copy.deepcopy(fdoc.content)
-
+ new_fdoc = copy.deepcopy(msg.fdoc.content)
fdoc_chash = new_fdoc[fields.CONTENT_HASH_KEY]
- # XXX is this hitting the db??? --- probably.
- # We should profile after the pre-fetch.
dest_fdoc = memstore.get_fdoc_from_chash(
fdoc_chash, self.mbox)
- exist = dest_fdoc and not empty(dest_fdoc.content)
-
- if exist:
- # Should we signal error on the callback?
- logger.warning("Destination message already exists!")
-
- # XXX I'm still not clear if we should raise the
- # errback. This actually rases an ugly warning
- # in some muas like thunderbird. I guess the user does
- # not deserve that.
- observer.callback(True)
- else:
- mbox = self.mbox
- uid_next = memstore.increment_last_soledad_uid(mbox)
- new_fdoc[self.UID_KEY] = uid_next
- new_fdoc[self.MBOX_KEY] = mbox
-
- # FIXME set recent!
- self._memstore.create_message(
- self.mbox, uid_next,
- MessageWrapper(
- new_fdoc, hdoc.content),
- observer=observer,
- notify_on_disk=False)
+ exist = not empty(dest_fdoc)
+ return exist, new_fdoc
# convenience fun
@@ -865,12 +949,11 @@ class SoledadMailbox(WithMsgFields, MBoxParser):
for doc in docs:
self.messages._soledad.delete_doc(doc)
- def unset_recent_flags(self, uids):
+ def unset_recent_flags(self, uid_seq):
"""
Unset Recent flag for a sequence of UIDs.
"""
- seq_messg = self._bound_seq(uids)
- self.messages.unset_recent_flags(seq_messg)
+ self.messages.unset_recent_flags(uid_seq)
def __repr__(self):
"""