summaryrefslogtreecommitdiff
path: root/src/leap/mail/imap/mailbox.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/leap/mail/imap/mailbox.py')
-rw-r--r--src/leap/mail/imap/mailbox.py841
1 files changed, 375 insertions, 466 deletions
diff --git a/src/leap/mail/imap/mailbox.py b/src/leap/mail/imap/mailbox.py
index 34cf535..c52a2e3 100644
--- a/src/leap/mail/imap/mailbox.py
+++ b/src/leap/mail/imap/mailbox.py
@@ -1,6 +1,6 @@
# *- coding: utf-8 -*-
# mailbox.py
-# Copyright (C) 2013 LEAP
+# Copyright (C) 2013-2015 LEAP
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
@@ -15,36 +15,38 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
-Soledad Mailbox.
+IMAP Mailbox.
"""
-import copy
-import threading
+import re
import logging
-import StringIO
-import cStringIO
import os
+import cStringIO
+import StringIO
+import time
from collections import defaultdict
+from email.utils import formatdate
from twisted.internet import defer
-from twisted.internet.task import deferLater
+from twisted.internet import reactor
from twisted.python import log
from twisted.mail import imap4
from zope.interface import implements
-from leap.common import events as leap_events
-from leap.common.events.events_pb2 import IMAP_UNREAD_MAIL
-from leap.common.check import leap_assert, leap_assert_type
-from leap.mail.decorators import deferred_to_thread
-from leap.mail.utils import empty
-from leap.mail.imap.fields import WithMsgFields, fields
-from leap.mail.imap.messages import MessageCollection
-from leap.mail.imap.messageparts import MessageWrapper
-from leap.mail.imap.parser import MBoxParser
+from leap.common.check import leap_assert
+from leap.common.check import leap_assert_type
+from leap.mail.constants import INBOX_NAME, MessageFlags
+from leap.mail.imap.messages import IMAPMessage
logger = logging.getLogger(__name__)
+# TODO LIST
+# [ ] Restore profile_cmd instrumentation
+# [ ] finish the implementation of IMailboxListener
+# [ ] implement the rest of ISearchableMailbox
+
+
"""
If the environment variable `LEAP_SKIPNOTIFY` is set, we avoid
notifying clients of new messages. Use during stress tests.
@@ -53,7 +55,6 @@ NOTIFY_NEW = not os.environ.get('LEAP_SKIPNOTIFY', False)
PROFILE_CMD = os.environ.get('LEAP_PROFILE_IMAPCMD', False)
if PROFILE_CMD:
- import time
def _debugProfiling(result, cmdname, start):
took = (time.time() - start) * 1000
@@ -70,33 +71,32 @@ if PROFILE_CMD:
d.addCallback(_debugProfiling, name, time.time())
d.addErrback(lambda f: log.msg(f.getTraceback()))
+INIT_FLAGS = (MessageFlags.SEEN_FLAG, MessageFlags.ANSWERED_FLAG,
+ MessageFlags.FLAGGED_FLAG, MessageFlags.DELETED_FLAG,
+ MessageFlags.DRAFT_FLAG, MessageFlags.RECENT_FLAG,
+ MessageFlags.LIST_FLAG)
-class SoledadMailbox(WithMsgFields, MBoxParser):
+
+class IMAPMailbox(object):
"""
A Soledad-backed IMAP mailbox.
Implements the high-level method needed for the Mailbox interfaces.
- The low-level database methods are contained in MessageCollection class,
- which we instantiate and make accessible in the `messages` attribute.
+ The low-level database methods are contained in the generic
+ MessageCollection class. We receive an instance of it and it is made
+ accessible in the `collection` attribute.
"""
implements(
imap4.IMailbox,
imap4.IMailboxInfo,
- imap4.ICloseableMailbox,
imap4.ISearchableMailbox,
+ # XXX I think we do not need to implement CloseableMailbox, do we?
+ # We could remove ourselves from the collectionListener, although I
+ # think it simply will be garbage collected.
+ # imap4.ICloseableMailbox
imap4.IMessageCopier)
- # XXX should finish the implementation of IMailboxListener
- # XXX should completely implement ISearchableMailbox too
-
- messages = None
- _closed = False
-
- INIT_FLAGS = (WithMsgFields.SEEN_FLAG, WithMsgFields.ANSWERED_FLAG,
- WithMsgFields.FLAGGED_FLAG, WithMsgFields.DELETED_FLAG,
- WithMsgFields.DRAFT_FLAG, WithMsgFields.RECENT_FLAG,
- WithMsgFields.LIST_FLAG)
- flags = None
+ init_flags = INIT_FLAGS
CMD_MSG = "MESSAGES"
CMD_RECENT = "RECENT"
@@ -104,65 +104,25 @@ class SoledadMailbox(WithMsgFields, MBoxParser):
CMD_UIDVALIDITY = "UIDVALIDITY"
CMD_UNSEEN = "UNSEEN"
- # FIXME we should turn this into a datastructure with limited capacity
+ # TODO we should turn this into a datastructure with limited capacity
_listeners = defaultdict(set)
- next_uid_lock = threading.Lock()
- last_uid_lock = threading.Lock()
-
- # TODO unify all the `primed` dicts
- _fdoc_primed = {}
- _last_uid_primed = {}
- _known_uids_primed = {}
-
- def __init__(self, mbox, soledad, memstore, rw=1):
+ def __init__(self, collection, rw=1):
"""
- SoledadMailbox constructor. Needs to get passed a name, plus a
- Soledad instance.
-
- :param mbox: the mailbox name
- :type mbox: str
-
- :param soledad: a Soledad instance.
- :type soledad: Soledad
-
- :param memstore: a MemoryStore instance
- :type memstore: MemoryStore
+ :param collection: instance of MessageCollection
+ :type collection: MessageCollection
:param rw: read-and-write flag for this mailbox
:type rw: int
"""
- leap_assert(mbox, "Need a mailbox name to initialize")
- leap_assert(soledad, "Need a soledad instance to initialize")
-
- from twisted.internet import reactor
- self.reactor = reactor
-
- self.mbox = self._parse_mailbox_name(mbox)
self.rw = rw
-
- self._soledad = soledad
- self._memstore = memstore
-
- self.messages = MessageCollection(
- mbox=mbox, soledad=self._soledad, memstore=self._memstore)
-
self._uidvalidity = None
+ self.collection = collection
+ self.collection.addListener(self)
- # XXX careful with this get/set (it would be
- # hitting db unconditionally, move to memstore too)
- # Now it's returning a fixed amount of flags from mem
- # as a workaround.
- if not self.getFlags():
- self.setFlags(self.INIT_FLAGS)
-
- if self._memstore:
- self.prime_known_uids_to_memstore()
- self.prime_last_uid_to_memstore()
- self.prime_flag_docs_to_memstore()
-
- # purge memstore from empty fdocs.
- self._memstore.purge_fdoc_store(mbox)
+ @property
+ def mbox_name(self):
+ return self.collection.mbox_name
@property
def listeners(self):
@@ -175,11 +135,17 @@ class SoledadMailbox(WithMsgFields, MBoxParser):
:rtype: set
"""
- return self._listeners[self.mbox]
+ return self._listeners[self.mbox_name]
+
+ def get_imap_message(self, message):
+ d = defer.Deferred()
+ IMAPMessage(message, store=self.collection.store, d=d)
+ return d
- # TODO this grows too crazily when many instances are fired, like
+ # FIXME this grows too crazily when many instances are fired, like
# during imaptest stress testing. Should have a queue of limited size
# instead.
+
def addListener(self, listener):
"""
Add a listener to the listeners queue.
@@ -192,8 +158,10 @@ class SoledadMailbox(WithMsgFields, MBoxParser):
if not NOTIFY_NEW:
return
- logger.debug('adding mailbox listener: %s' % listener)
- self.listeners.add(listener)
+ listeners = self.listeners
+ logger.debug('adding mailbox listener: %s. Total: %s' % (
+ listener, len(listeners)))
+ listeners.add(listener)
def removeListener(self, listener):
"""
@@ -204,17 +172,6 @@ class SoledadMailbox(WithMsgFields, MBoxParser):
"""
self.listeners.remove(listener)
- def _get_mbox_doc(self):
- """
- Return mailbox document.
-
- :return: A SoledadDocument containing this mailbox, or None if
- the query failed.
- :rtype: SoledadDocument or None.
- """
- return self._memstore.get_mbox_doc(self.mbox)
-
- # XXX the memstore->soledadstore method in memstore is not complete
def getFlags(self):
"""
Returns the flags defined for this mailbox.
@@ -222,12 +179,12 @@ class SoledadMailbox(WithMsgFields, MBoxParser):
:returns: tuple of flags for this mailbox
:rtype: tuple of str
"""
- flags = self._memstore.get_mbox_flags(self.mbox)
+ flags = self.collection.mbox_wrapper.flags
if not flags:
- flags = self.INIT_FLAGS
- return map(str, flags)
+ flags = self.init_flags
+ flags_str = map(str, flags)
+ return flags_str
- # XXX the memstore->soledadstore method in memstore is not complete
def setFlags(self, flags):
"""
Sets flags for this mailbox.
@@ -236,87 +193,10 @@ class SoledadMailbox(WithMsgFields, MBoxParser):
:type flags: tuple of str
"""
# XXX this is setting (overriding) old flags.
+ # Better pass a mode flag
leap_assert(isinstance(flags, tuple),
"flags expected to be a tuple")
- self._memstore.set_mbox_flags(self.mbox, flags)
-
- # XXX SHOULD BETTER IMPLEMENT ADD_FLAG, REMOVE_FLAG.
-
- def _get_closed(self):
- """
- Return the closed attribute for this mailbox.
-
- :return: True if the mailbox is closed
- :rtype: bool
- """
- return self._memstore.get_mbox_closed(self.mbox)
-
- def _set_closed(self, closed):
- """
- Set the closed attribute for this mailbox.
-
- :param closed: the state to be set
- :type closed: bool
- """
- self._memstore.set_mbox_closed(self.mbox, closed)
-
- closed = property(
- _get_closed, _set_closed, doc="Closed attribute.")
-
- def _get_last_uid(self):
- """
- Return the last uid for this mailbox.
- If we have a memory store, the last UID will be the highest
- recorded UID in the message store, or a counter cached from
- the mailbox document in soledad if this is higher.
-
- :return: the last uid for messages in this mailbox
- :rtype: int
- """
- last = self._memstore.get_last_uid(self.mbox)
- logger.debug("last uid for %s: %s (from memstore)" % (
- repr(self.mbox), last))
- return last
-
- last_uid = property(
- _get_last_uid, doc="Last_UID attribute.")
-
- def prime_last_uid_to_memstore(self):
- """
- Prime memstore with last_uid value
- """
- primed = self._last_uid_primed.get(self.mbox, False)
- if not primed:
- mbox = self._get_mbox_doc()
- if mbox is None:
- # memory-only store
- return
- last = mbox.content.get('lastuid', 0)
- logger.info("Priming Soledad last_uid to %s" % (last,))
- self._memstore.set_last_soledad_uid(self.mbox, last)
- self._last_uid_primed[self.mbox] = True
-
- def prime_known_uids_to_memstore(self):
- """
- Prime memstore with the set of all known uids.
-
- We do this to be able to filter the requests efficiently.
- """
- primed = self._known_uids_primed.get(self.mbox, False)
- if not primed:
- known_uids = self.messages.all_soledad_uid_iter()
- self._memstore.set_known_uids(self.mbox, known_uids)
- self._known_uids_primed[self.mbox] = True
-
- def prime_flag_docs_to_memstore(self):
- """
- Prime memstore with all the flags documents.
- """
- primed = self._fdoc_primed.get(self.mbox, False)
- if not primed:
- all_flag_docs = self.messages.get_all_soledad_flag_docs()
- self._memstore.load_flag_docs(self.mbox, all_flag_docs)
- self._fdoc_primed[self.mbox] = True
+ return self.collection.set_mbox_attr("flags", flags)
def getUIDValidity(self):
"""
@@ -325,14 +205,9 @@ class SoledadMailbox(WithMsgFields, MBoxParser):
:return: unique validity identifier
:rtype: int
"""
- if self._uidvalidity is None:
- mbox = self._get_mbox_doc()
- if mbox is None:
- return 0
- self._uidvalidity = mbox.content.get(self.CREATED_KEY, 1)
- return self._uidvalidity
+ return self.collection.get_mbox_attr("created")
- def getUID(self, message):
+ def getUID(self, message_number):
"""
Return the UID of a message in the mailbox
@@ -340,14 +215,15 @@ class SoledadMailbox(WithMsgFields, MBoxParser):
but in the future will be useful to get absolute UIDs from
message sequence numbers.
- :param message: the message uid
+ :param message: the message sequence number.
:type message: int
:rtype: int
+ :return: the UID of the message.
"""
- msg = self.messages.get_msg_by_uid(message)
- if msg is not None:
- return msg.getUID()
+ # TODO support relative sequences. The (imap) message should
+ # receive a sequence number attribute: a deferred is not expected
+ return message_number
def getUIDNext(self):
"""
@@ -355,23 +231,20 @@ class SoledadMailbox(WithMsgFields, MBoxParser):
mailbox. Currently it returns the higher UID incremented by
one.
- We increment the next uid *each* time this function gets called.
- In this way, there will be gaps if the message with the allocated
- uid cannot be saved. But that is preferable to having race conditions
- if we get to parallel message adding.
-
- :rtype: int
+ :return: deferred with int
+ :rtype: Deferred
"""
- with self.next_uid_lock:
- return self.last_uid + 1
+ d = self.collection.get_uid_next()
+ return d
def getMessageCount(self):
"""
Returns the total count of messages in this mailbox.
- :rtype: int
+ :return: deferred with int
+ :rtype: Deferred
"""
- return self.messages.count()
+ return self.collection.count()
def getUnseenCount(self):
"""
@@ -380,7 +253,7 @@ class SoledadMailbox(WithMsgFields, MBoxParser):
:return: count of messages flagged `unseen`
:rtype: int
"""
- return self.messages.count_unseen()
+ return self.collection.count_unseen()
def getRecentCount(self):
"""
@@ -389,7 +262,7 @@ class SoledadMailbox(WithMsgFields, MBoxParser):
:return: count of messages flagged `recent`
:rtype: int
"""
- return self.messages.count_recent()
+ return self.collection.count_recent()
def isWriteable(self):
"""
@@ -398,6 +271,8 @@ class SoledadMailbox(WithMsgFields, MBoxParser):
:return: 1 if mailbox is read-writeable, 0 otherwise.
:rtype: int
"""
+ # XXX We don't need to store it in the mbox doc, do we?
+ # return int(self.collection.get_mbox_attr('rw'))
return self.rw
def getHierarchicalDelimiter(self):
@@ -417,19 +292,26 @@ class SoledadMailbox(WithMsgFields, MBoxParser):
:type names: iter
"""
r = {}
+ maybe = defer.maybeDeferred
if self.CMD_MSG in names:
- r[self.CMD_MSG] = self.getMessageCount()
+ r[self.CMD_MSG] = maybe(self.getMessageCount)
if self.CMD_RECENT in names:
- r[self.CMD_RECENT] = self.getRecentCount()
+ r[self.CMD_RECENT] = maybe(self.getRecentCount)
if self.CMD_UIDNEXT in names:
- r[self.CMD_UIDNEXT] = self.last_uid + 1
+ r[self.CMD_UIDNEXT] = maybe(self.getUIDNext)
if self.CMD_UIDVALIDITY in names:
- r[self.CMD_UIDVALIDITY] = self.getUIDValidity()
+ r[self.CMD_UIDVALIDITY] = maybe(self.getUIDValidity)
if self.CMD_UNSEEN in names:
- r[self.CMD_UNSEEN] = self.getUnseenCount()
- return defer.succeed(r)
+ r[self.CMD_UNSEEN] = maybe(self.getUnseenCount)
+
+ def as_a_dict(values):
+ return dict(zip(r.keys(), values))
- def addMessage(self, message, flags, date=None, notify_on_disk=False):
+ d = defer.gatherResults(r.values())
+ d.addCallback(as_a_dict)
+ return d
+
+ def addMessage(self, message, flags, date=None, notify_just_mdoc=True):
"""
Adds a message to this mailbox.
@@ -440,51 +322,69 @@ class SoledadMailbox(WithMsgFields, MBoxParser):
:type flags: list of str
:param date: timestamp
- :type date: str
-
- :return: a deferred that evals to None
- """
+ :type date: str, or None
+
+ :param notify_just_mdoc:
+ boolean passed to the wrapper.create method, to indicate whether
+ we're insterested in being notified right after the mdoc has been
+ written (as it's the first doc to be written, and quite small, this
+ is faster, though potentially unsafe).
+ Setting it to True improves a *lot* the responsiveness of the
+ APPENDS: we just need to be notified when the mdoc is saved, and
+ let's just expect that the other parts are doing just fine. This
+ will not catch any errors when the inserts of the other parts
+ fail, but on the other hand allows us to return very quickly,
+ which seems a good compromise given that we have to serialize the
+ appends.
+ However, some operations like the saving of drafts need to wait for
+ all the parts to be saved, so if some heuristics are met down in
+ the call chain a Draft message will unconditionally set this flag
+ to False, and therefore ignoring the setting of this flag here.
+ :type notify_just_mdoc: bool
+
+ :return: a deferred that will be triggered with the UID of the added
+ message.
+ """
+ # TODO should raise ReadOnlyMailbox if not rw.
# TODO have a look at the cases for internal date in the rfc
+ # XXX we could treat the message as an IMessage from here
+
+ # TODO change notify_just_mdoc to something more meaningful, like
+ # fast_insert_notify?
+
+ # TODO notify_just_mdoc *sometimes* make the append tests fail.
+ # have to find a better solution for this. A workaround could probably
+ # be to have a list of the ongoing deferreds related to append, so that
+ # we queue for later all the requests having to do with these.
+
+ # A better solution will probably involve implementing MULTIAPPEND
+ # extension or patching imap server to support pipelining.
+
if isinstance(message, (cStringIO.OutputType, StringIO.StringIO)):
message = message.getvalue()
- # XXX we could treat the message as an IMessage from here
leap_assert_type(message, basestring)
+
if flags is None:
flags = tuple()
else:
flags = tuple(str(flag) for flag in flags)
- d = self._do_add_message(message, flags=flags, date=date,
- notify_on_disk=notify_on_disk)
- if PROFILE_CMD:
- do_profile_cmd(d, "APPEND")
-
- # XXX should review now that we're not using qtreactor.
- # A better place for this would be the COPY/APPEND dispatcher
- # in server.py, but qtreactor hangs when I do that, so this seems
- # to work fine for now.
-
- def notifyCallback(x):
- self.reactor.callLater(0, self.notify_new)
- return x
+ if date is None:
+ date = formatdate(time.time())
- d.addCallback(notifyCallback)
- d.addErrback(lambda f: log.msg(f.getTraceback()))
- return d
-
- def _do_add_message(self, message, flags, date, notify_on_disk=False):
- """
- Calls to the messageCollection add_msg method.
- Invoked from addMessage.
- """
- d = self.messages.add_msg(message, flags=flags, date=date,
- notify_on_disk=notify_on_disk)
+ d = self.collection.add_msg(message, flags, date=date,
+ notify_just_mdoc=notify_just_mdoc)
+ d.addErrback(lambda failure: log.err(failure))
return d
def notify_new(self, *args):
"""
Notify of new messages to all the listeners.
+ This will be called indirectly by the underlying collection, that will
+ notify this IMAPMailbox whenever there are changes in the number of
+ messages in the collection, since we have added ourselves to the
+ collection listeners.
:param args: ignored.
"""
@@ -493,26 +393,36 @@ class SoledadMailbox(WithMsgFields, MBoxParser):
def cbNotifyNew(result):
exists, recent = result
- for l in self.listeners:
- l.newMessages(exists, recent)
+ for listener in self.listeners:
+ listener.newMessages(exists, recent)
+
d = self._get_notify_count()
d.addCallback(cbNotifyNew)
- d.addCallback(self.cb_signal_unread_to_ui)
+ d.addCallback(self.collection.cb_signal_unread_to_ui)
+ d.addErrback(lambda failure: log.err(failure))
- @deferred_to_thread
def _get_notify_count(self):
"""
Get message count and recent count for this mailbox
Executed in a separate thread. Called from notify_new.
- :return: number of messages and number of recent messages.
- :rtype: tuple
+ :return: a deferred that will fire with a tuple, with number of
+ messages and number of recent messages.
+ :rtype: Deferred
"""
- exists = self.getMessageCount()
- recent = self.getRecentCount()
- logger.debug("NOTIFY (%r): there are %s messages, %s recent" % (
- self.mbox, exists, recent))
- return exists, recent
+ d_exists = defer.maybeDeferred(self.getMessageCount)
+ d_recent = defer.maybeDeferred(self.getRecentCount)
+ d_list = [d_exists, d_recent]
+
+ def log_num_msg(result):
+ exists, recent = tuple(result)
+ logger.debug("NOTIFY (%r): there are %s messages, %s recent" % (
+ self.mbox_name, exists, recent))
+ return result
+
+ d = defer.gatherResults(d_list)
+ d.addCallback(log_num_msg)
+ return d
# commands, do not rename methods
@@ -522,31 +432,21 @@ class SoledadMailbox(WithMsgFields, MBoxParser):
Should cleanup resources, and set the \\Noselect flag
on the mailbox.
+
"""
- # XXX this will overwrite all the existing flags!
+ # XXX this will overwrite all the existing flags
# should better simply addFlag
- self.setFlags((self.NOSELECT_FLAG,))
- self.deleteAllDocs()
+ self.setFlags((MessageFlags.NOSELECT_FLAG,))
- # XXX removing the mailbox in situ for now,
- # we should postpone the removal
+ def remove_mbox(_):
+ uuid = self.collection.mbox_uuid
+ d = self.collection.mbox_wrapper.delete(self.collection.store)
+ d.addCallback(
+ lambda _: self.collection.mbox_indexer.delete_table(uuid))
+ return d
- # XXX move to memory store??
- mbox_doc = self._get_mbox_doc()
- if mbox_doc is None:
- # memory-only store!
- return
- self._soledad.delete_doc(self._get_mbox_doc())
-
- def _close_cb(self, result):
- self.closed = True
-
- def close(self):
- """
- Expunge and mark as closed
- """
- d = self.expunge()
- d.addCallback(self._close_cb)
+ d = self.deleteAllDocs()
+ d.addCallback(remove_mbox)
return d
def expunge(self):
@@ -555,11 +455,35 @@ class SoledadMailbox(WithMsgFields, MBoxParser):
"""
if not self.isWriteable():
raise imap4.ReadOnlyMailbox
- d = defer.Deferred()
- self._memstore.expunge(self.mbox, d)
+ return self.collection.delete_all_flagged()
+
+ def _get_message_fun(self, uid):
+ """
+ Return the proper method to get a message for this mailbox, depending
+ on the passed uid flag.
+
+ :param uid: If true, the IDs specified in the query are UIDs;
+ otherwise they are message sequence IDs.
+ :type uid: bool
+ :rtype: callable
+ """
+ get_message_fun = [
+ self.collection.get_message_by_sequence_number,
+ self.collection.get_message_by_uid][uid]
+ return get_message_fun
+
+ def _get_messages_range(self, messages_asked, uid=True):
+
+ def get_range(messages_asked):
+ return self._filter_msg_seq(messages_asked)
+
+ d = defer.maybeDeferred(self._bound_seq, messages_asked, uid)
+ if uid:
+ d.addCallback(get_range)
+ d.addErrback(lambda f: log.err(f))
return d
- def _bound_seq(self, messages_asked):
+ def _bound_seq(self, messages_asked, uid):
"""
Put an upper bound to a messages sequence if this is open.
@@ -567,15 +491,27 @@ class SoledadMailbox(WithMsgFields, MBoxParser):
:type messages_asked: MessageSet
:rtype: MessageSet
"""
+
+ def set_last_uid(last_uid):
+ messages_asked.last = last_uid
+ return messages_asked
+
+ def set_last_seq(all_uid):
+ messages_asked.last = len(all_uid)
+ return messages_asked
+
if not messages_asked.last:
try:
iter(messages_asked)
except TypeError:
# looks like we cannot iterate
- try:
- messages_asked.last = self.last_uid
- except ValueError:
- pass
+ if uid:
+ d = self.collection.get_last_uid()
+ d.addCallback(set_last_uid)
+ else:
+ d = self.collection.all_uid_iter()
+ d.addCallback(set_last_seq)
+ return d
return messages_asked
def _filter_msg_seq(self, messages_asked):
@@ -587,10 +523,16 @@ class SoledadMailbox(WithMsgFields, MBoxParser):
:type messages_asked: MessageSet
:rtype: set
"""
- set_asked = set(messages_asked)
- set_exist = set(self.messages.all_uid_iter())
- seq_messg = set_asked.intersection(set_exist)
- return seq_messg
+ # TODO we could pass the asked sequence to the indexer
+ # all_uid_iter, and bound the sql query instead.
+ def filter_by_asked(all_msg_uid):
+ set_asked = set(messages_asked)
+ set_exist = set(all_msg_uid)
+ return set_asked.intersection(set_exist)
+
+ d = self.collection.all_uid_iter()
+ d.addCallback(filter_by_asked)
+ return d
def fetch(self, messages_asked, uid):
"""
@@ -607,54 +549,48 @@ class SoledadMailbox(WithMsgFields, MBoxParser):
otherwise.
:type uid: bool
- :rtype: deferred
- """
- d = defer.Deferred()
- self.reactor.callInThread(self._do_fetch, messages_asked, uid, d)
- if PROFILE_CMD:
- do_profile_cmd(d, "FETCH")
- d.addCallback(self.cb_signal_unread_to_ui)
+ :rtype: deferred with a generator that yields...
+ """
+ get_msg_fun = self._get_message_fun(uid)
+ getimapmsg = self.get_imap_message
+
+ def get_imap_messages_for_range(msg_range):
+
+ def _get_imap_msg(messages):
+ d_imapmsg = []
+ for msg in messages:
+ d_imapmsg.append(getimapmsg(msg))
+ return defer.gatherResults(d_imapmsg, consumeErrors=True)
+
+ def _zip_msgid(imap_messages):
+ zipped = zip(
+ list(msg_range), imap_messages)
+ return (item for item in zipped)
+
+ # XXX not called??
+ def _unset_recent(sequence):
+ reactor.callLater(0, self.unset_recent_flags, sequence)
+ return sequence
+
+ d_msg = []
+ for msgid in msg_range:
+ # XXX We want cdocs because we "probably" are asked for the
+ # body. We should be smarter at do_FETCH and pass a parameter
+ # to this method in order not to prefetch cdocs if they're not
+ # going to be used.
+ d_msg.append(get_msg_fun(msgid, get_cdocs=True))
+
+ d = defer.gatherResults(d_msg, consumeErrors=True)
+ d.addCallback(_get_imap_msg)
+ d.addCallback(_zip_msgid)
+ d.addErrback(lambda failure: log.err(failure))
+ return d
+
+ d = self._get_messages_range(messages_asked, uid)
+ d.addCallback(get_imap_messages_for_range)
+ d.addErrback(lambda failure: log.err(failure))
return d
- # called in thread
- def _do_fetch(self, messages_asked, uid, d):
- """
- :param messages_asked: IDs of the messages to retrieve information
- about
- :type messages_asked: MessageSet
-
- :param uid: If true, the IDs are UIDs. They are message sequence IDs
- otherwise.
- :type uid: bool
- :param d: deferred whose callback will be called with result.
- :type d: Deferred
-
- :rtype: A tuple of two-tuples of message sequence numbers and
- LeapMessage
- """
- # For the moment our UID is sequential, so we
- # can treat them all the same.
- # Change this to the flag that twisted expects when we
- # switch to content-hash based index + local UID table.
-
- sequence = False
- # sequence = True if uid == 0 else False
-
- messages_asked = self._bound_seq(messages_asked)
- seq_messg = self._filter_msg_seq(messages_asked)
- getmsg = lambda uid: self.messages.get_msg_by_uid(uid)
-
- # for sequence numbers (uid = 0)
- if sequence:
- logger.debug("Getting msg by index: INEFFICIENT call!")
- raise NotImplementedError
- else:
- got_msg = ((msgid, getmsg(msgid)) for msgid in seq_messg)
- result = ((msgid, msg) for msgid, msg in got_msg
- if msg is not None)
- self.reactor.callLater(0, self.unset_recent_flags, seq_messg)
- self.reactor.callFromThread(d.callback, result)
-
def fetch_flags(self, messages_asked, uid):
"""
A fast method to fetch all flags, tricking just the
@@ -679,13 +615,23 @@ class SoledadMailbox(WithMsgFields, MBoxParser):
MessagePart.
:rtype: tuple
"""
+ # is_sequence = True if uid == 0 else False
+ # XXX FIXME -----------------------------------------------------
+ # imap/tests, or muas like mutt, it will choke until we implement
+ # sequence numbers. This is an easy hack meanwhile.
+ is_sequence = False
+ # ---------------------------------------------------------------
+
+ if is_sequence:
+ raise NotImplementedError(
+ "FETCH FLAGS NOT IMPLEMENTED FOR MESSAGE SEQUENCE NUMBERS YET")
+
d = defer.Deferred()
- self.reactor.callInThread(self._do_fetch_flags, messages_asked, uid, d)
+ reactor.callLater(0, self._do_fetch_flags, messages_asked, uid, d)
if PROFILE_CMD:
do_profile_cmd(d, "FETCH-ALL-FLAGS")
return d
- # called in thread
def _do_fetch_flags(self, messages_asked, uid, d):
"""
:param messages_asked: IDs of the messages to retrieve information
@@ -698,8 +644,8 @@ class SoledadMailbox(WithMsgFields, MBoxParser):
:param d: deferred whose callback will be called with result.
:type d: Deferred
- :rtype: A tuple of two-tuples of message sequence numbers and
- flagsPart
+ :rtype: A generator that yields two-tuples of message sequence numbers
+ and flagsPart
"""
class flagsPart(object):
def __init__(self, uid, flags):
@@ -712,13 +658,28 @@ class SoledadMailbox(WithMsgFields, MBoxParser):
def getFlags(self):
return map(str, self.flags)
- messages_asked = self._bound_seq(messages_asked)
- seq_messg = self._filter_msg_seq(messages_asked)
-
- all_flags = self._memstore.all_flags(self.mbox)
- result = ((msgid, flagsPart(
- msgid, all_flags.get(msgid, tuple()))) for msgid in seq_messg)
- self.reactor.callFromThread(d.callback, result)
+ def pack_flags(result):
+ _uid, _flags = result
+ return _uid, flagsPart(_uid, _flags)
+
+ def get_flags_for_seq(sequence):
+ d_all_flags = []
+ for msgid in sequence:
+ # TODO implement sequence numbers here too
+ d_flags_per_uid = self.collection.get_flags_by_uid(msgid)
+ d_flags_per_uid.addCallback(pack_flags)
+ d_all_flags.append(d_flags_per_uid)
+ gotflags = defer.gatherResults(d_all_flags)
+ gotflags.addCallback(get_uid_flag_generator)
+ return gotflags
+
+ def get_uid_flag_generator(result):
+ generator = (item for item in result)
+ d.callback(generator)
+
+ d_seq = self._get_messages_range(messages_asked, uid)
+ d_seq.addCallback(get_flags_for_seq)
+ return d_seq
def fetch_headers(self, messages_asked, uid):
"""
@@ -744,7 +705,11 @@ class SoledadMailbox(WithMsgFields, MBoxParser):
MessagePart.
:rtype: tuple
"""
- # TODO how often is thunderbird doing this?
+ # TODO implement sequences
+ is_sequence = True if uid == 0 else False
+ if is_sequence:
+ raise NotImplementedError(
+ "FETCH HEADERS NOT IMPLEMENTED FOR SEQUENCE NUMBER YET")
class headersPart(object):
def __init__(self, uid, headers):
@@ -769,29 +734,6 @@ class SoledadMailbox(WithMsgFields, MBoxParser):
for msgid in seq_messg)
return result
- def cb_signal_unread_to_ui(self, result):
- """
- Sends unread event to ui.
- Used as a callback in several commands.
-
- :param result: ignored
- """
- d = self._get_unseen_deferred()
- d.addCallback(self.__cb_signal_unread_to_ui)
- return result
-
- @deferred_to_thread
- def _get_unseen_deferred(self):
- return self.getUnseenCount()
-
- def __cb_signal_unread_to_ui(self, unseen):
- """
- Send the unread signal to UI.
- :param unseen: number of unseen messages.
- :type unseen: int
- """
- leap_events.signal(IMAP_UNREAD_MAIL, str(unseen))
-
def store(self, messages_asked, flags, mode, uid):
"""
Sets the flags of one or more messages.
@@ -826,17 +768,18 @@ class SoledadMailbox(WithMsgFields, MBoxParser):
raise imap4.ReadOnlyMailbox
d = defer.Deferred()
- self.reactor.callLater(0, self._do_store, messages_asked, flags,
- mode, uid, d)
+ reactor.callLater(0, self._do_store, messages_asked, flags,
+ mode, uid, d)
if PROFILE_CMD:
do_profile_cmd(d, "STORE")
- d.addCallback(self.cb_signal_unread_to_ui)
- d.addErrback(lambda f: log.msg(f.getTraceback()))
+
+ d.addCallback(self.collection.cb_signal_unread_to_ui)
+ d.addErrback(lambda f: log.err(f))
return d
def _do_store(self, messages_asked, flags, mode, uid, observer):
"""
- Helper method, invoke set_flags method in the MessageCollection.
+ Helper method, invoke set_flags method in the IMAPMessageCollection.
See the documentation for the `store` method for the parameters.
@@ -845,14 +788,31 @@ class SoledadMailbox(WithMsgFields, MBoxParser):
done.
:type observer: deferred
"""
- # XXX implement also sequence (uid = 0)
- # XXX we should prevent client from setting Recent flag?
+ # TODO we should prevent client from setting Recent flag
+ get_msg_fun = self._get_message_fun(uid)
leap_assert(not isinstance(flags, basestring),
"flags cannot be a string")
flags = tuple(flags)
- messages_asked = self._bound_seq(messages_asked)
- seq_messg = self._filter_msg_seq(messages_asked)
- self.messages.set_flags(self.mbox, seq_messg, flags, mode, observer)
+
+ def set_flags_for_seq(sequence):
+ def return_result_dict(list_of_flags):
+ result = dict(zip(list(sequence), list_of_flags))
+ observer.callback(result)
+ return result
+
+ d_all_set = []
+ for msgid in sequence:
+ d = get_msg_fun(msgid)
+ d.addCallback(lambda msg: self.collection.update_flags(
+ msg, flags, mode))
+ d_all_set.append(d)
+ got_flags_setted = defer.gatherResults(d_all_set)
+ got_flags_setted.addCallback(return_result_dict)
+ return got_flags_setted
+
+ d_seq = self._get_messages_range(messages_asked, uid)
+ d_seq.addCallback(set_flags_for_seq)
+ return d_seq
# ISearchableMailbox
@@ -877,23 +837,24 @@ class SoledadMailbox(WithMsgFields, MBoxParser):
:rtype: C{list} or C{Deferred}
"""
# TODO see if we can raise w/o interrupting flow
- #:raise IllegalQueryError: Raised when query is not valid.
+ # :raise IllegalQueryError: Raised when query is not valid.
# example query:
# ['UNDELETED', 'HEADER', 'Message-ID',
+ # XXX fixme, does not exist
# '52D44F11.9060107@dev.bitmask.net']
# TODO hardcoding for now! -- we'll support generic queries later on
- # but doing a quickfix for avoiding duplicat saves in the draft folder.
- # See issue #4209
+ # but doing a quickfix for avoiding duplicate saves in the draft
+ # folder. # See issue #4209
if len(query) > 2:
if query[1] == 'HEADER' and query[2].lower() == "message-id":
msgid = str(query[3]).strip()
logger.debug("Searching for %s" % (msgid,))
- d = self.messages._get_uid_from_msgid(str(msgid))
- d1 = defer.gatherResults([d])
- # we want a list, so return it all the same
- return d1
+
+ d = self.collection.get_uid_from_msgid(str(msgid))
+ d.addCallback(lambda result: [result])
+ return d
# nothing implemented for any other query
logger.warning("Cannot process query: %s" % (query,))
@@ -911,94 +872,19 @@ class SoledadMailbox(WithMsgFields, MBoxParser):
uid when the copy succeed.
:rtype: Deferred
"""
- d = defer.Deferred()
if PROFILE_CMD:
do_profile_cmd(d, "COPY")
# A better place for this would be the COPY/APPEND dispatcher
# in server.py, but qtreactor hangs when I do that, so this seems
# to work fine for now.
- d.addCallback(lambda r: self.reactor.callLater(0, self.notify_new))
- deferLater(self.reactor, 0, self._do_copy, message, d)
- return d
-
- def _do_copy(self, message, observer):
- """
- Call invoked from the deferLater in `copy`. This will
- copy the flags and header documents, and pass them to the
- `create_message` method in the MemoryStore, together with
- the observer deferred that we've been passed along.
-
- :param message: an IMessage implementor
- :type message: LeapMessage
- :param observer: the deferred that will fire with the
- UID of the message
- :type observer: Deferred
- """
- memstore = self._memstore
-
- def createCopy(result):
- exist, new_fdoc = result
- if exist:
- # Should we signal error on the callback?
- logger.warning("Destination message already exists!")
-
- # XXX I'm not sure if we should raise the
- # errback. This actually rases an ugly warning
- # in some muas like thunderbird.
- # UID 0 seems a good convention for no uid.
- observer.callback(0)
- else:
- mbox = self.mbox
- uid_next = memstore.increment_last_soledad_uid(mbox)
-
- new_fdoc[self.UID_KEY] = uid_next
- new_fdoc[self.MBOX_KEY] = mbox
-
- flags = list(new_fdoc[self.FLAGS_KEY])
- flags.append(fields.RECENT_FLAG)
- new_fdoc[self.FLAGS_KEY] = tuple(set(flags))
-
- # FIXME set recent!
-
- self._memstore.create_message(
- self.mbox, uid_next,
- MessageWrapper(new_fdoc),
- observer=observer,
- notify_on_disk=False)
-
- d = self._get_msg_copy(message)
- d.addCallback(createCopy)
- d.addErrback(lambda f: log.msg(f.getTraceback()))
-
- @deferred_to_thread
- def _get_msg_copy(self, message):
- """
- Get a copy of the fdoc for this message, and check whether
- it already exists.
-
- :param message: an IMessage implementor
- :type message: LeapMessage
- :return: exist, new_fdoc
- :rtype: tuple
- """
- # XXX for clarity, this could be delegated to a
- # MessageCollection mixin that implements copy too, and
- # moved out of here.
- msg = message
- memstore = self._memstore
-
- if empty(msg.fdoc):
- logger.warning("Tried to copy a MSG with no fdoc")
- return
- new_fdoc = copy.deepcopy(msg.fdoc.content)
- fdoc_chash = new_fdoc[fields.CONTENT_HASH_KEY]
+ # d.addCallback(lambda r: self.reactor.callLater(0, self.notify_new))
+ # deferLater(self.reactor, 0, self._do_copy, message, d)
+ # return d
- dest_fdoc = memstore.get_fdoc_from_chash(
- fdoc_chash, self.mbox)
-
- exist = not empty(dest_fdoc)
- return exist, new_fdoc
+ d = self.collection.copy_msg(message.message,
+ self.collection.mbox_uuid)
+ return d
# convenience fun
@@ -1006,19 +892,42 @@ class SoledadMailbox(WithMsgFields, MBoxParser):
"""
Delete all docs in this mailbox
"""
- docs = self.messages.get_all_docs()
- for doc in docs:
- self.messages._soledad.delete_doc(doc)
+ # FIXME not implemented
+ return self.collection.delete_all_docs()
def unset_recent_flags(self, uid_seq):
"""
Unset Recent flag for a sequence of UIDs.
"""
- self.messages.unset_recent_flags(uid_seq)
+ # FIXME not implemented
+ return self.collection.unset_recent_flags(uid_seq)
def __repr__(self):
"""
Representation string for this mailbox.
"""
- return u"<SoledadMailbox: mbox '%s' (%s)>" % (
- self.mbox, self.messages.count())
+ return u"<IMAPMailbox: mbox '%s' (%s)>" % (
+ self.mbox_name, self.collection.count())
+
+
+_INBOX_RE = re.compile(INBOX_NAME, re.IGNORECASE)
+
+
+def normalize_mailbox(name):
+ """
+ Return a normalized representation of the mailbox ``name``.
+
+ This method ensures that an eventual initial 'inbox' part of a
+ mailbox name is made uppercase.
+
+ :param name: the name of the mailbox
+ :type name: unicode
+
+ :rtype: unicode
+ """
+ # XXX maybe it would make sense to normalize common folders too:
+ # trash, sent, drafts, etc...
+ if _INBOX_RE.match(name):
+ # ensure inital INBOX is uppercase
+ return INBOX_NAME + name[len(INBOX_NAME):]
+ return name