author | Ivan Alejandro <ivanalejandro0@gmail.com> | 2014-02-04 16:20:54 -0300
committer | Ivan Alejandro <ivanalejandro0@gmail.com> | 2014-02-04 16:20:54 -0300
commit | 74428b3176d286312f69e124d4d613c27a1ec93e (patch)
tree | eec0561e2184472dfedba3135a3d005efd478c34 /src/leap/mail/imap
parent | 781bd2f4d2a047088d1a0ecd673a38c80ea0c0c0 (diff)
parent | 23e28bae2c3cb74e00e29ee8add0b73adeb65c2b (diff)
Merge remote-tracking branch 'kali/feature/in-memory-store' into develop
Diffstat (limited to 'src/leap/mail/imap')
-rw-r--r-- | src/leap/mail/imap/account.py | 13
-rw-r--r-- | src/leap/mail/imap/fetch.py | 4
-rw-r--r-- | src/leap/mail/imap/interfaces.py | 94
-rw-r--r-- | src/leap/mail/imap/mailbox.py | 295
-rw-r--r-- | src/leap/mail/imap/memorystore.py | 961
-rw-r--r-- | src/leap/mail/imap/messageparts.py | 565
-rw-r--r-- | src/leap/mail/imap/messages.py | 1104
-rw-r--r-- | src/leap/mail/imap/server.py | 217
-rw-r--r-- | src/leap/mail/imap/service/imap.py | 189
-rw-r--r-- | src/leap/mail/imap/service/manhole.py | 130
-rw-r--r-- | src/leap/mail/imap/soledadstore.py | 487
-rwxr-xr-x | src/leap/mail/imap/tests/leap_tests_imap.zsh | 4
-rw-r--r-- | src/leap/mail/imap/tests/walktree.py | 16 |
13 files changed, 3060 insertions, 1019 deletions
diff --git a/src/leap/mail/imap/account.py b/src/leap/mail/imap/account.py index ce83079..f985c04 100644 --- a/src/leap/mail/imap/account.py +++ b/src/leap/mail/imap/account.py @@ -48,7 +48,7 @@ class SoledadBackedAccount(WithMsgFields, IndexedDB, MBoxParser): selected = None closed = False - def __init__(self, account_name, soledad=None): + def __init__(self, account_name, soledad, memstore=None): """ Creates a SoledadAccountIndex that keeps track of the mailboxes and subscriptions handled by this account. @@ -57,7 +57,9 @@ class SoledadBackedAccount(WithMsgFields, IndexedDB, MBoxParser): :type acct_name: str :param soledad: a Soledad instance. - :param soledad: Soledad + :type soledad: Soledad + :param memstore: a MemoryStore instance. + :type memstore: MemoryStore """ leap_assert(soledad, "Need a soledad instance to initialize") leap_assert_type(soledad, Soledad) @@ -67,6 +69,7 @@ class SoledadBackedAccount(WithMsgFields, IndexedDB, MBoxParser): self._account_name = self._parse_mailbox_name(account_name) self._soledad = soledad + self._memstore = memstore self.initialize_db() @@ -131,7 +134,8 @@ class SoledadBackedAccount(WithMsgFields, IndexedDB, MBoxParser): if name not in self.mailboxes: raise imap4.MailboxException("No such mailbox: %r" % name) - return SoledadMailbox(name, soledad=self._soledad) + return SoledadMailbox(name, self._soledad, + memstore=self._memstore) ## ## IAccount @@ -221,8 +225,7 @@ class SoledadBackedAccount(WithMsgFields, IndexedDB, MBoxParser): self.selected = name return SoledadMailbox( - name, rw=readwrite, - soledad=self._soledad) + name, self._soledad, self._memstore, readwrite) def delete(self, name, force=False): """ diff --git a/src/leap/mail/imap/fetch.py b/src/leap/mail/imap/fetch.py index 817ad6a..40dadb3 100644 --- a/src/leap/mail/imap/fetch.py +++ b/src/leap/mail/imap/fetch.py @@ -45,7 +45,7 @@ from leap.common.events.events_pb2 import IMAP_UNREAD_MAIL from leap.common.mail import get_email_charset from leap.keymanager import errors as keymanager_errors from leap.keymanager.openpgp import OpenPGPKey -from leap.mail.decorators import deferred +from leap.mail.decorators import deferred_to_thread from leap.mail.utils import json_loads from leap.soledad.client import Soledad from leap.soledad.common.crypto import ENC_SCHEME_KEY, ENC_JSON_KEY @@ -199,7 +199,7 @@ class LeapIncomingMail(object): logger.exception(failure.value) traceback.print_tb(*sys.exc_info()) - @deferred + @deferred_to_thread def _sync_soledad(self): """ Synchronizes with remote soledad. diff --git a/src/leap/mail/imap/interfaces.py b/src/leap/mail/imap/interfaces.py new file mode 100644 index 0000000..c906278 --- /dev/null +++ b/src/leap/mail/imap/interfaces.py @@ -0,0 +1,94 @@ +# -*- coding: utf-8 -*- +# interfaces.py +# Copyright (C) 2014 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +""" +Interfaces for the IMAP module. 
+""" +from zope.interface import Interface, Attribute + + +class IMessageContainer(Interface): + """ + I am a container around the different documents that a message + is split into. + """ + fdoc = Attribute('The flags document for this message, if any.') + hdoc = Attribute('The headers document for this message, if any.') + cdocs = Attribute('The dict of content documents for this message, ' + 'if any.') + + def walk(self): + """ + Return an iterator to the docs for all the parts. + + :rtype: iterator + """ + + +class IMessageStore(Interface): + """ + I represent a generic storage for LEAP Messages. + """ + + def create_message(self, mbox, uid, message): + """ + Put the passed message into this IMessageStore. + + :param mbox: the mbox this message belongs. + :param uid: the UID that identifies this message in this mailbox. + :param message: a IMessageContainer implementor. + """ + + def put_message(self, mbox, uid, message): + """ + Put the passed message into this IMessageStore. + + :param mbox: the mbox this message belongs. + :param uid: the UID that identifies this message in this mailbox. + :param message: a IMessageContainer implementor. + """ + + def remove_message(self, mbox, uid): + """ + Remove the given message from this IMessageStore. + + :param mbox: the mbox this message belongs. + :param uid: the UID that identifies this message in this mailbox. + """ + + def get_message(self, mbox, uid): + """ + Get a IMessageContainer for the given mbox and uid combination. + + :param mbox: the mbox this message belongs. + :param uid: the UID that identifies this message in this mailbox. + :return: IMessageContainer + """ + + +class IMessageStoreWriter(Interface): + """ + I represent a storage that is able to write its contents to another + different IMessageStore. + """ + + def write_messages(self, store): + """ + Write the documents in this IMessageStore to a different + storage. Usually this will be done from a MemoryStorage to a DbStorage. + + :param store: another IMessageStore implementor. + """ diff --git a/src/leap/mail/imap/mailbox.py b/src/leap/mail/imap/mailbox.py index 0131ce0..c682578 100644 --- a/src/leap/mail/imap/mailbox.py +++ b/src/leap/mail/imap/mailbox.py @@ -22,6 +22,7 @@ import threading import logging import StringIO import cStringIO +import os from collections import defaultdict @@ -35,13 +36,21 @@ from zope.interface import implements from leap.common import events as leap_events from leap.common.events.events_pb2 import IMAP_UNREAD_MAIL from leap.common.check import leap_assert, leap_assert_type -from leap.mail.decorators import deferred +from leap.mail.decorators import deferred_to_thread +from leap.mail.utils import empty from leap.mail.imap.fields import WithMsgFields, fields from leap.mail.imap.messages import MessageCollection +from leap.mail.imap.messageparts import MessageWrapper from leap.mail.imap.parser import MBoxParser logger = logging.getLogger(__name__) +""" +If the environment variable `LEAP_SKIPNOTIFY` is set, we avoid +notifying clients of new messages. Use during stress tests. 
+""" +NOTIFY_NEW = not os.environ.get('LEAP_SKIPNOTIFY', False) + class SoledadMailbox(WithMsgFields, MBoxParser): """ @@ -76,11 +85,12 @@ class SoledadMailbox(WithMsgFields, MBoxParser): CMD_UIDVALIDITY = "UIDVALIDITY" CMD_UNSEEN = "UNSEEN" + # FIXME we should turn this into a datastructure with limited capacity _listeners = defaultdict(set) next_uid_lock = threading.Lock() - def __init__(self, mbox, soledad=None, rw=1): + def __init__(self, mbox, soledad, memstore, rw=1): """ SoledadMailbox constructor. Needs to get passed a name, plus a Soledad instance. @@ -91,7 +101,10 @@ class SoledadMailbox(WithMsgFields, MBoxParser): :param soledad: a Soledad instance. :type soledad: Soledad - :param rw: read-and-write flags + :param memstore: a MemoryStore instance + :type memstore: MemoryStore + + :param rw: read-and-write flag for this mailbox :type rw: int """ leap_assert(mbox, "Need a mailbox name to initialize") @@ -105,13 +118,18 @@ class SoledadMailbox(WithMsgFields, MBoxParser): self.rw = rw self._soledad = soledad + self._memstore = memstore self.messages = MessageCollection( - mbox=mbox, soledad=self._soledad) + mbox=mbox, soledad=self._soledad, memstore=self._memstore) if not self.getFlags(): self.setFlags(self.INIT_FLAGS) + if self._memstore: + self.prime_known_uids_to_memstore() + self.prime_last_uid_to_memstore() + @property def listeners(self): """ @@ -125,6 +143,9 @@ class SoledadMailbox(WithMsgFields, MBoxParser): """ return self._listeners[self.mbox] + # TODO this grows too crazily when many instances are fired, like + # during imaptest stress testing. Should have a queue of limited size + # instead. def addListener(self, listener): """ Add a listener to the listeners queue. @@ -134,6 +155,9 @@ class SoledadMailbox(WithMsgFields, MBoxParser): :param listener: listener to add :type listener: an object that implements IMailboxListener """ + if not NOTIFY_NEW: + return + logger.debug('adding mailbox listener: %s' % listener) self.listeners.add(listener) @@ -146,6 +170,7 @@ class SoledadMailbox(WithMsgFields, MBoxParser): """ self.listeners.remove(listener) + # TODO move completely to soledadstore, under memstore reponsibility. def _get_mbox(self): """ Return mailbox document. @@ -221,48 +246,38 @@ class SoledadMailbox(WithMsgFields, MBoxParser): def _get_last_uid(self): """ Return the last uid for this mailbox. + If we have a memory store, the last UID will be the highest + recorded UID in the message store, or a counter cached from + the mailbox document in soledad if this is higher. :return: the last uid for messages in this mailbox - :rtype: bool + :rtype: int """ - mbox = self._get_mbox() - if not mbox: - logger.error("We could not get a mbox!") - # XXX It looks like it has been corrupted. - # We need to be able to survive this. - return None - return mbox.content.get(self.LAST_UID_KEY, 1) + last = self._memstore.get_last_uid(self.mbox) + logger.debug("last uid for %s: %s (from memstore)" % ( + repr(self.mbox), last)) + return last - def _set_last_uid(self, uid): - """ - Sets the last uid for this mailbox. + last_uid = property( + _get_last_uid, doc="Last_UID attribute.") - :param uid: the uid to be set - :type uid: int + def prime_last_uid_to_memstore(self): """ - leap_assert(isinstance(uid, int), "uid has to be int") - mbox = self._get_mbox() - key = self.LAST_UID_KEY - - count = self.getMessageCount() - - # XXX safety-catch. If we do get duplicates, - # we want to avoid further duplication. 
- - if uid >= count: - value = uid - else: - # something is wrong, - # just set the last uid - # beyond the max msg count. - logger.debug("WRONG uid < count. Setting last uid to %s", count) - value = count + Prime memstore with last_uid value + """ + set_exist = set(self.messages.all_uid_iter()) + last = max(set_exist) if set_exist else 0 + logger.info("Priming Soledad last_uid to %s" % (last,)) + self._memstore.set_last_soledad_uid(self.mbox, last) - mbox.content[key] = value - self._soledad.put_doc(mbox) + def prime_known_uids_to_memstore(self): + """ + Prime memstore with the set of all known uids. - last_uid = property( - _get_last_uid, _set_last_uid, doc="Last_UID attribute.") + We do this to be able to filter the requests efficiently. + """ + known_uids = self.messages.all_soledad_uid_iter() + self._memstore.set_known_uids(self.mbox, known_uids) def getUIDValidity(self): """ @@ -304,8 +319,15 @@ class SoledadMailbox(WithMsgFields, MBoxParser): :rtype: int """ with self.next_uid_lock: - self.last_uid += 1 - return self.last_uid + if self._memstore: + return self.last_uid + 1 + else: + # XXX after lock, it should be safe to + # return just the increment here, and + # have a different method that actually increments + # the counter when really adding. + self.last_uid += 1 + return self.last_uid def getMessageCount(self): """ @@ -366,7 +388,7 @@ class SoledadMailbox(WithMsgFields, MBoxParser): if self.CMD_UIDNEXT in names: r[self.CMD_UIDNEXT] = self.last_uid + 1 if self.CMD_UIDVALIDITY in names: - r[self.CMD_UIDVALIDITY] = self.getUID() + r[self.CMD_UIDVALIDITY] = self.getUIDValidity() if self.CMD_UNSEEN in names: r[self.CMD_UNSEEN] = self.getUnseenCount() return defer.succeed(r) @@ -386,26 +408,26 @@ class SoledadMailbox(WithMsgFields, MBoxParser): :return: a deferred that evals to None """ + # TODO have a look at the cases for internal date in the rfc if isinstance(message, (cStringIO.OutputType, StringIO.StringIO)): message = message.getvalue() - # XXX we should treat the message as an IMessage from here + + # XXX we could treat the message as an IMessage from here leap_assert_type(message, basestring) - uid_next = self.getUIDNext() - logger.debug('Adding msg with UID :%s' % uid_next) if flags is None: flags = tuple() else: flags = tuple(str(flag) for flag in flags) - d = self._do_add_message(message, flags=flags, date=date, uid=uid_next) + d = self._do_add_message(message, flags=flags, date=date) return d - def _do_add_message(self, message, flags, date, uid): + def _do_add_message(self, message, flags, date): """ - Calls to the messageCollection add_msg method (deferred to thread). + Calls to the messageCollection add_msg method. Invoked from addMessage. """ - d = self.messages.add_msg(message, flags=flags, date=date, uid=uid) + d = self.messages.add_msg(message, flags=flags, date=date) # XXX Removing notify temporarily. # This is interfering with imaptest results. I'm not clear if it's # because we clutter the logging or because the set of listeners is @@ -421,6 +443,8 @@ class SoledadMailbox(WithMsgFields, MBoxParser): :param args: ignored. """ + if not NOTIFY_NEW: + return exists = self.getMessageCount() recent = self.getRecentCount() logger.debug("NOTIFY: there are %s messages, %s recent" % ( @@ -445,6 +469,8 @@ class SoledadMailbox(WithMsgFields, MBoxParser): # XXX removing the mailbox in situ for now, # we should postpone the removal + + # XXX move to memory store?? 
self._soledad.delete_doc(self._get_mbox()) def _close_cb(self, result): @@ -467,13 +493,10 @@ class SoledadMailbox(WithMsgFields, MBoxParser): """ if not self.isWriteable(): raise imap4.ReadOnlyMailbox - d = self.messages.remove_all_deleted() - d.addCallback(self._expunge_cb) - d.addCallback(self.messages.reset_last_uid) - - # XXX DEBUG ------------------- - # FIXME !!! - # XXX should remove the hdocset too!!! + d = defer.Deferred() + return self._memstore.expunge(self.mbox, d) + self._memstore.expunge(self.mbox) + d.addCallback(self._expunge_cb, d) return d def _bound_seq(self, messages_asked): @@ -509,7 +532,8 @@ class SoledadMailbox(WithMsgFields, MBoxParser): seq_messg = set_asked.intersection(set_exist) return seq_messg - @deferred + @deferred_to_thread + #@profile def fetch(self, messages_asked, uid): """ Retrieve one or more messages in this mailbox. @@ -548,7 +572,7 @@ class SoledadMailbox(WithMsgFields, MBoxParser): result = ((msgid, getmsg(msgid)) for msgid in seq_messg) return result - @deferred + @deferred_to_thread def fetch_flags(self, messages_asked, uid): """ A fast method to fetch all flags, tricking just the @@ -589,10 +613,10 @@ class SoledadMailbox(WithMsgFields, MBoxParser): all_flags = self.messages.all_flags() result = ((msgid, flagsPart( - msgid, all_flags[msgid])) for msgid in seq_messg) + msgid, all_flags.get(msgid, tuple()))) for msgid in seq_messg) return result - @deferred + @deferred_to_thread def fetch_headers(self, messages_asked, uid): """ A fast method to fetch all headers, tricking just the @@ -641,14 +665,16 @@ class SoledadMailbox(WithMsgFields, MBoxParser): for msgid in seq_messg) return result - def signal_unread_to_ui(self): + def signal_unread_to_ui(self, *args, **kwargs): """ Sends unread event to ui. + + :param args: ignored + :param kwargs: ignored """ unseen = self.getUnseenCount() leap_events.signal(IMAP_UNREAD_MAIL, str(unseen)) - @deferred def store(self, messages_asked, flags, mode, uid): """ Sets the flags of one or more messages. @@ -670,49 +696,43 @@ class SoledadMailbox(WithMsgFields, MBoxParser): otherwise they are message sequence IDs. :type uid: bool - :return: A dict mapping message sequence numbers to sequences of - str representing the flags set on the message after this - operation has been performed. - :rtype: dict + :return: A deferred, that will be called with a dict mapping message + sequence numbers to sequences of str representing the flags + set on the message after this operation has been performed. + :rtype: deferred :raise ReadOnlyMailbox: Raised if this mailbox is not open for read-write. """ + from twisted.internet import reactor + if not self.isWriteable(): + log.msg('read only mailbox!') + raise imap4.ReadOnlyMailbox + + d = defer.Deferred() + deferLater(reactor, 0, self._do_store, messages_asked, flags, + mode, uid, d) + return d + + def _do_store(self, messages_asked, flags, mode, uid, observer): + """ + Helper method, invoke set_flags method in the MessageCollection. + + See the documentation for the `store` method for the parameters. + + :param observer: a deferred that will be called with the dictionary + mapping UIDs to flags after the operation has been + done. + :type observer: deferred + """ # XXX implement also sequence (uid = 0) - # XXX we should prevent cclient from setting Recent flag. + # XXX we should prevent cclient from setting Recent flag? 
leap_assert(not isinstance(flags, basestring), "flags cannot be a string") flags = tuple(flags) - messages_asked = self._bound_seq(messages_asked) seq_messg = self._filter_msg_seq(messages_asked) - - if not self.isWriteable(): - log.msg('read only mailbox!') - raise imap4.ReadOnlyMailbox - - result = {} - for msg_id in seq_messg: - log.msg("MSG ID = %s" % msg_id) - msg = self.messages.get_msg_by_uid(msg_id) - if not msg: - continue - if mode == 1: - msg.addFlags(flags) - elif mode == -1: - msg.removeFlags(flags) - elif mode == 0: - msg.setFlags(flags) - result[msg_id] = msg.getFlags() - - # After changing flags, we want to signal again to the - # UI because the number of unread might have changed. - # Hoever, we should probably limit this to INBOX only? - # this should really be called as a final callback of - # the do_STORE method... - from twisted.internet import reactor - deferLater(reactor, 1, self.signal_unread_to_ui) - return result + self.messages.set_flags(self.mbox, seq_messg, flags, mode, observer) # ISearchableMailbox @@ -760,44 +780,85 @@ class SoledadMailbox(WithMsgFields, MBoxParser): # IMessageCopier - @deferred - def copy(self, messageObject): + def copy(self, message): """ Copy the given message object into this mailbox. + + :param message: an IMessage implementor + :type message: LeapMessage + :return: a deferred that will be fired with the message + uid when the copy succeed. + :rtype: Deferred """ from twisted.internet import reactor - uid_next = self.getUIDNext() - msg = messageObject - # XXX DEBUG ---------------------------------------- - #print "copying MESSAGE from %s (%s) to %s (%s)" % ( - #msg._mbox, msg._uid, self.mbox, uid_next) + d = defer.Deferred() + # XXX this should not happen ... track it down, + # probably to FETCH... + if message is None: + log.msg("BUG: COPY found a None in passed message") + d.callback(None) + deferLater(reactor, 0, self._do_copy, message, d) + return d + + def _do_copy(self, message, observer): + """ + Call invoked from the deferLater in `copy`. This will + copy the flags and header documents, and pass them to the + `create_message` method in the MemoryStore, together with + the observer deferred that we've been passed along. + + :param message: an IMessage implementor + :type message: LeapMessage + :param observer: the deferred that will fire with the + UID of the message + :type observer: Deferred + """ + # XXX for clarity, this could be delegated to a + # MessageCollection mixin that implements copy too, and + # moved out of here. + msg = message + memstore = self._memstore # XXX should use a public api instead fdoc = msg._fdoc + hdoc = msg._hdoc if not fdoc: - logger.debug("Tried to copy a MSG with no fdoc") + logger.warning("Tried to copy a MSG with no fdoc") return - new_fdoc = copy.deepcopy(fdoc.content) - new_fdoc[self.UID_KEY] = uid_next - new_fdoc[self.MBOX_KEY] = self.mbox - self._do_add_doc(new_fdoc) - # XXX should use a public api instead - hdoc = msg._hdoc - self.messages.add_hdocset_docid(hdoc.doc_id) + fdoc_chash = new_fdoc[fields.CONTENT_HASH_KEY] - deferLater(reactor, 1, self.notify_new) + # XXX is this hitting the db??? --- probably. + # We should profile after the pre-fetch. + dest_fdoc = memstore.get_fdoc_from_chash( + fdoc_chash, self.mbox) + exist = dest_fdoc and not empty(dest_fdoc.content) - def _do_add_doc(self, doc): - """ - Defer the adding of a new doc. + if exist: + # Should we signal error on the callback? 
+ logger.warning("Destination message already exists!") - :param doc: document to be created in soledad. - :type doc: dict - """ - self._soledad.create_doc(doc) + # XXX I'm still not clear if we should raise the + # errback. This actually rases an ugly warning + # in some muas like thunderbird. I guess the user does + # not deserve that. + observer.callback(True) + else: + mbox = self.mbox + uid_next = memstore.increment_last_soledad_uid(mbox) + new_fdoc[self.UID_KEY] = uid_next + new_fdoc[self.MBOX_KEY] = mbox + + # FIXME set recent! + + self._memstore.create_message( + self.mbox, uid_next, + MessageWrapper( + new_fdoc, hdoc.content), + observer=observer, + notify_on_disk=False) # convenience fun diff --git a/src/leap/mail/imap/memorystore.py b/src/leap/mail/imap/memorystore.py new file mode 100644 index 0000000..195cef7 --- /dev/null +++ b/src/leap/mail/imap/memorystore.py @@ -0,0 +1,961 @@ +# -*- coding: utf-8 -*- +# memorystore.py +# Copyright (C) 2014 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +""" +In-memory transient store for a LEAPIMAPServer. +""" +import contextlib +import logging +import threading +import weakref + +from collections import defaultdict +from copy import copy + +from twisted.internet import defer +from twisted.internet.task import LoopingCall +from twisted.python import log +from zope.interface import implements + +from leap.common.check import leap_assert_type +from leap.mail import size +from leap.mail.decorators import deferred_to_thread +from leap.mail.utils import empty +from leap.mail.messageflow import MessageProducer +from leap.mail.imap import interfaces +from leap.mail.imap.fields import fields +from leap.mail.imap.messageparts import MessagePartType, MessagePartDoc +from leap.mail.imap.messageparts import RecentFlagsDoc +from leap.mail.imap.messageparts import MessageWrapper +from leap.mail.imap.messageparts import ReferenciableDict + +logger = logging.getLogger(__name__) + + +# The default period to do writebacks to the permanent +# soledad storage, in seconds. +SOLEDAD_WRITE_PERIOD = 10 + + +@contextlib.contextmanager +def set_bool_flag(obj, att): + """ + Set a boolean flag to True while we're doing our thing. + Just to let the world know. + """ + setattr(obj, att, True) + try: + yield True + except RuntimeError as exc: + logger.exception(exc) + finally: + setattr(obj, att, False) + + +class MemoryStore(object): + """ + An in-memory store to where we can write the different parts that + we split the messages into and buffer them until we write them to the + permanent storage. + + It uses MessageWrapper instances to represent the message-parts, which are + indexed by mailbox name and UID. + + It also can be passed a permanent storage as a paremeter (any implementor + of IMessageStore, in this case a SoledadStore). In this case, a periodic + dump of the messages stored in memory will be done. 
The period of the + writes to the permanent storage is controled by the write_period parameter + in the constructor. + """ + implements(interfaces.IMessageStore, + interfaces.IMessageStoreWriter) + + # TODO We will want to index by chash when we transition to local-only + # UIDs. + + WRITING_FLAG = "_writing" + _last_uid_lock = threading.Lock() + + def __init__(self, permanent_store=None, + write_period=SOLEDAD_WRITE_PERIOD): + """ + Initialize a MemoryStore. + + :param permanent_store: a IMessageStore implementor to dump + messages to. + :type permanent_store: IMessageStore + :param write_period: the interval to dump messages to disk, in seconds. + :type write_period: int + """ + self._permanent_store = permanent_store + self._write_period = write_period + + # Internal Storage: messages + self._msg_store = {} + + # Internal Storage: payload-hash + """ + {'phash': weakreaf.proxy(dict)} + """ + self._phash_store = {} + + # Internal Storage: content-hash:fdoc + """ + chash-fdoc-store keeps references to + the flag-documents indexed by content-hash. + + {'chash': {'mbox-a': weakref.proxy(dict), + 'mbox-b': weakref.proxy(dict)} + } + """ + self._chash_fdoc_store = {} + + # Internal Storage: recent-flags store + """ + recent-flags store keeps one dict per mailbox, + with the document-id of the u1db document + and the set of the UIDs that have the recent flag. + + {'mbox-a': {'doc_id': 'deadbeef', + 'set': {1,2,3,4} + } + } + """ + # TODO this will have to transition to content-hash + # indexes after we move to local-only UIDs. + + self._rflags_store = defaultdict( + lambda: {'doc_id': None, 'set': set([])}) + + """ + last-uid store keeps the count of the highest UID + per mailbox. + + {'mbox-a': 42, + 'mbox-b': 23} + """ + self._last_uid = {} + + """ + known-uids keeps a count of the uids that soledad knows for a given + mailbox + + {'mbox-a': set([1,2,3])} + """ + self._known_uids = defaultdict(set) + + # New and dirty flags, to set MessageWrapper State. + self._new = set([]) + self._new_deferreds = {} + self._dirty = set([]) + self._rflags_dirty = set([]) + self._dirty_deferreds = {} + + # Flag for signaling we're busy writing to the disk storage. + setattr(self, self.WRITING_FLAG, False) + + if self._permanent_store is not None: + # this producer spits its messages to the permanent store + # consumer using a queue. We will use that to put + # our messages to be written. + self.producer = MessageProducer(permanent_store, + period=0.1) + # looping call for dumping to SoledadStore + self._write_loop = LoopingCall(self.write_messages, + permanent_store) + + # We can start the write loop right now, why wait? + self._start_write_loop() + + def _start_write_loop(self): + """ + Start loop for writing to disk database. + """ + if not self._write_loop.running: + self._write_loop.start(self._write_period, now=True) + + def _stop_write_loop(self): + """ + Stop loop for writing to disk database. + """ + if self._write_loop.running: + self._write_loop.stop() + + # IMessageStore + + # XXX this would work well for whole message operations. + # We would have to add a put_flags operation to modify only + # the flags doc (and set the dirty flag accordingly) + + def create_message(self, mbox, uid, message, observer, + notify_on_disk=True): + """ + Create the passed message into this MemoryStore. + + By default we consider that any message is a new message. 
+ + :param mbox: the mailbox + :type mbox: str or unicode + :param uid: the UID for the message + :type uid: int + :param message: a message to be added + :type message: MessageWrapper + :param observer: the deferred that will fire with the + UID of the message. If notify_on_disk is True, + this will happen when the message is written to + Soledad. Otherwise it will fire as soon as we've + added the message to the memory store. + :type observer: Deferred + :param notify_on_disk: whether the `observer` deferred should + wait until the message is written to disk to + be fired. + :type notify_on_disk: bool + """ + log.msg("adding new doc to memstore %r (%r)" % (mbox, uid)) + key = mbox, uid + + self._add_message(mbox, uid, message, notify_on_disk) + self._new.add(key) + + def log_add(result): + log.msg("message save: %s" % result) + return result + observer.addCallback(log_add) + + if notify_on_disk: + # We store this deferred so we can keep track of the pending + # operations internally. + # TODO this should fire with the UID !!! -- change that in + # the soledad store code. + self._new_deferreds[key] = observer + if not notify_on_disk: + # Caller does not care, just fired and forgot, so we pass + # a defer that will inmediately have its callback triggered. + observer.callback(uid) + + def put_message(self, mbox, uid, message, notify_on_disk=True): + """ + Put an existing message. + + This will set the dirty flag on the MemoryStore. + + :param mbox: the mailbox + :type mbox: str or unicode + :param uid: the UID for the message + :type uid: int + :param message: a message to be added + :type message: MessageWrapper + :param notify_on_disk: whether the deferred that is returned should + wait until the message is written to disk to + be fired. + :type notify_on_disk: bool + + :return: a Deferred. if notify_on_disk is True, will be fired + when written to the db on disk. + Otherwise will fire inmediately + :rtype: Deferred + """ + key = mbox, uid + d = defer.Deferred() + d.addCallback(lambda result: log.msg("message PUT save: %s" % result)) + + self._dirty.add(key) + self._dirty_deferreds[key] = d + self._add_message(mbox, uid, message, notify_on_disk) + return d + + def _add_message(self, mbox, uid, message, notify_on_disk=True): + """ + Helper method, called by both create_message and put_message. + See those for parameter documentation. + """ + # XXX have to differentiate between notify_new and notify_dirty + # TODO defaultdict the hell outa here... 
+ + key = mbox, uid + msg_dict = message.as_dict() + + FDOC = MessagePartType.fdoc.key + HDOC = MessagePartType.hdoc.key + CDOCS = MessagePartType.cdocs.key + DOCS_ID = MessagePartType.docs_id.key + + try: + store = self._msg_store[key] + except KeyError: + self._msg_store[key] = {FDOC: {}, + HDOC: {}, + CDOCS: {}, + DOCS_ID: {}} + store = self._msg_store[key] + + fdoc = msg_dict.get(FDOC, None) + if fdoc: + if not store.get(FDOC, None): + store[FDOC] = ReferenciableDict({}) + store[FDOC].update(fdoc) + + # content-hash indexing + chash = fdoc.get(fields.CONTENT_HASH_KEY) + chash_fdoc_store = self._chash_fdoc_store + if not chash in chash_fdoc_store: + chash_fdoc_store[chash] = {} + + chash_fdoc_store[chash][mbox] = weakref.proxy( + store[FDOC]) + + hdoc = msg_dict.get(HDOC, None) + if hdoc is not None: + if not store.get(HDOC, None): + store[HDOC] = ReferenciableDict({}) + store[HDOC].update(hdoc) + + docs_id = msg_dict.get(DOCS_ID, None) + if docs_id: + if not store.get(DOCS_ID, None): + store[DOCS_ID] = {} + store[DOCS_ID].update(docs_id) + + cdocs = message.cdocs + for cdoc_key in cdocs.keys(): + if not store.get(CDOCS, None): + store[CDOCS] = {} + + cdoc = cdocs[cdoc_key] + # first we make it weak-referenciable + referenciable_cdoc = ReferenciableDict(cdoc) + store[CDOCS][cdoc_key] = referenciable_cdoc + phash = cdoc.get(fields.PAYLOAD_HASH_KEY, None) + if not phash: + continue + self._phash_store[phash] = weakref.proxy(referenciable_cdoc) + + def prune(seq, store): + for key in seq: + if key in store and empty(store.get(key)): + store.pop(key) + prune((FDOC, HDOC, CDOCS, DOCS_ID), store) + + def get_docid_for_fdoc(self, mbox, uid): + """ + Return Soledad document id for the flags-doc for a given mbox and uid, + or None of no flags document could be found. + + :param mbox: the mailbox + :type mbox: str or unicode + :param uid: the message UID + :type uid: int + :rtype: unicode or None + """ + fdoc = self._permanent_store.get_flags_doc(mbox, uid) + if empty(fdoc): + return None + doc_id = fdoc.doc_id + return doc_id + + def get_message(self, mbox, uid, flags_only=False): + """ + Get a MessageWrapper for the given mbox and uid combination. + + :param mbox: the mailbox + :type mbox: str or unicode + :param uid: the message UID + :type uid: int + :param flags_only: whether the message should carry only a reference + to the flags document. + :type flags_only: bool + + :return: MessageWrapper or None + """ + key = mbox, uid + FDOC = MessagePartType.fdoc.key + + msg_dict = self._msg_store.get(key, None) + if empty(msg_dict): + return None + new, dirty = self._get_new_dirty_state(key) + if flags_only: + return MessageWrapper(fdoc=msg_dict[FDOC], + new=new, dirty=dirty, + memstore=weakref.proxy(self)) + else: + return MessageWrapper(from_dict=msg_dict, + new=new, dirty=dirty, + memstore=weakref.proxy(self)) + + def remove_message(self, mbox, uid): + """ + Remove a Message from this MemoryStore. + + :param mbox: the mailbox + :type mbox: str or unicode + :param uid: the message UID + :type uid: int + """ + # XXX For the moment we are only removing the flags and headers + # docs. The rest we leave there polluting your hard disk, + # until we think about a good way of deorphaning. + + # XXX implement elijah's idea of using a PUT document as a + # token to ensure consistency in the removal. 
+ + try: + key = mbox, uid + self._new.discard(key) + self._dirty.discard(key) + self._msg_store.pop(key, None) + except Exception as exc: + logger.exception(exc) + + # IMessageStoreWriter + + def write_messages(self, store): + """ + Write the message documents in this MemoryStore to a different store. + + :param store: the IMessageStore to write to + """ + # For now, we pass if the queue is not empty, to avoid duplicate + # queuing. + # We would better use a flag to know when we've already enqueued an + # item. + + # XXX this could return the deferred for all the enqueued operations + + if not self.producer.is_queue_empty(): + return + + if any(map(lambda i: not empty(i), (self._new, self._dirty))): + logger.info("Writing messages to Soledad...") + + # TODO change for lock, and make the property access + # is accquired + with set_bool_flag(self, self.WRITING_FLAG): + for rflags_doc_wrapper in self.all_rdocs_iter(): + self.producer.push(rflags_doc_wrapper) + for msg_wrapper in self.all_new_dirty_msg_iter(): + self.producer.push(msg_wrapper) + + # MemoryStore specific methods. + + def get_uids(self, mbox): + """ + Get all uids for a given mbox. + + :param mbox: the mailbox + :type mbox: str or unicode + :rtype: list + """ + all_keys = self._msg_store.keys() + return [uid for m, uid in all_keys if m == mbox] + + def get_soledad_known_uids(self, mbox): + """ + Get all uids that soledad knows about, from the memory cache. + :param mbox: the mailbox + :type mbox: str or unicode + :rtype: list + """ + return self._known_uids.get(mbox, []) + + # last_uid + + def get_last_uid(self, mbox): + """ + Return the highest UID for a given mbox. + It will be the highest between the highest uid in the message store for + the mailbox, and the soledad integer cache. + + :param mbox: the mailbox + :type mbox: str or unicode + :rtype: int + """ + uids = self.get_uids(mbox) + last_mem_uid = uids and max(uids) or 0 + last_soledad_uid = self.get_last_soledad_uid(mbox) + return max(last_mem_uid, last_soledad_uid) + + def get_last_soledad_uid(self, mbox): + """ + Get last uid for a given mbox from the soledad integer cache. + + :param mbox: the mailbox + :type mbox: str or unicode + """ + return self._last_uid.get(mbox, 0) + + def set_last_soledad_uid(self, mbox, value): + """ + Set last uid for a given mbox in the soledad integer cache. + SoledadMailbox should prime this value during initialization. + Other methods (during message adding) SHOULD call + `increment_last_soledad_uid` instead. + + :param mbox: the mailbox + :type mbox: str or unicode + :param value: the value to set + :type value: int + """ + leap_assert_type(value, int) + logger.info("setting last soledad uid for %s to %s" % + (mbox, value)) + # if we already have a value here, don't do anything + with self._last_uid_lock: + if not self._last_uid.get(mbox, None): + self._last_uid[mbox] = value + + def set_known_uids(self, mbox, value): + """ + Set the value fo the known-uids set for this mbox. + + :param mbox: the mailbox + :type mbox: str or unicode + :param value: a sequence of integers to be added to the set. + :type value: tuple + """ + current = self._known_uids[mbox] + self._known_uids[mbox] = current.union(set(value)) + + def increment_last_soledad_uid(self, mbox): + """ + Increment by one the soledad integer cache for the last_uid for + this mbox, and fire a defer-to-thread to update the soledad value. + The caller should lock the call tho this method. 
+ + :param mbox: the mailbox + :type mbox: str or unicode + """ + with self._last_uid_lock: + self._last_uid[mbox] += 1 + value = self._last_uid[mbox] + self.write_last_uid(mbox, value) + return value + + @deferred_to_thread + def write_last_uid(self, mbox, value): + """ + Increment the soledad integer cache for the highest uid value. + + :param mbox: the mailbox + :type mbox: str or unicode + :param value: the value to set + :type value: int + """ + leap_assert_type(value, int) + if self._permanent_store: + self._permanent_store.write_last_uid(mbox, value) + + # Counting sheeps... + + def count_new_mbox(self, mbox): + """ + Count the new messages by inbox. + + :param mbox: the mailbox + :type mbox: str or unicode + :return: number of new messages + :rtype: int + """ + return len([(m, uid) for m, uid in self._new if mbox == mbox]) + + # XXX used at all? + def count_new(self): + """ + Count all the new messages in the MemoryStore. + + :rtype: int + """ + return len(self._new) + + def get_cdoc_from_phash(self, phash): + """ + Return a content-document by its payload-hash. + + :param phash: the payload hash to check against + :type phash: str or unicode + :rtype: MessagePartDoc + """ + doc = self._phash_store.get(phash, None) + + # XXX return None for consistency? + + # XXX have to keep a mapping between phash and its linkage + # info, to know if this payload is been already saved or not. + # We will be able to get this from the linkage-docs, + # not yet implemented. + new = True + dirty = False + return MessagePartDoc( + new=new, dirty=dirty, store="mem", + part=MessagePartType.cdoc, + content=doc, + doc_id=None) + + def get_fdoc_from_chash(self, chash, mbox): + """ + Return a flags-document by its content-hash and a given mailbox. + Used during content-duplication detection while copying or adding a + message. + + :param chash: the content hash to check against + :type chash: str or unicode + :param mbox: the mailbox + :type mbox: str or unicode + + :return: MessagePartDoc. It will return None if the flags document + has empty content or it is flagged as \\Deleted. + """ + docs_dict = self._chash_fdoc_store.get(chash, None) + fdoc = docs_dict.get(mbox, None) if docs_dict else None + + # a couple of special cases. + # 1. We might have a doc with empty content... + if empty(fdoc): + return None + + # 2. ...Or the message could exist, but being flagged for deletion. + # We want to create a new one in this case. + # Hmmm what if the deletion is un-done?? We would end with a + # duplicate... + if fdoc and fields.DELETED_FLAG in fdoc[fields.FLAGS_KEY]: + return None + + uid = fdoc[fields.UID_KEY] + key = mbox, uid + new = key in self._new + dirty = key in self._dirty + return MessagePartDoc( + new=new, dirty=dirty, store="mem", + part=MessagePartType.fdoc, + content=fdoc, + doc_id=None) + + def all_msg_iter(self): + """ + Return generator that iterates through all messages in the store. + + :return: generator of MessageWrappers + :rtype: generator + """ + return (self.get_message(*key) + for key in sorted(self._msg_store.keys())) + + def all_new_dirty_msg_iter(self): + """ + Return generator that iterates through all new and dirty messages. + + :return: generator of MessageWrappers + :rtype: generator + """ + return (self.get_message(*key) + for key in sorted(self._msg_store.keys()) + if key in self._new or key in self._dirty) + + def all_msg_dict_for_mbox(self, mbox): + """ + Return all the message dicts for a given mbox. 
+ + :param mbox: the mailbox + :type mbox: str or unicode + :return: list of dictionaries + :rtype: list + """ + # This *needs* to return a fixed sequence. Otherwise the dictionary len + # will change during iteration, when we modify it + return [self._msg_store[(mb, uid)] + for mb, uid in self._msg_store if mb == mbox] + + def all_deleted_uid_iter(self, mbox): + """ + Return a list with the UIDs for all messags + with deleted flag in a given mailbox. + + :param mbox: the mailbox + :type mbox: str or unicode + :return: list of integers + :rtype: list + """ + # This *needs* to return a fixed sequence. Otherwise the dictionary len + # will change during iteration, when we modify it + all_deleted = [ + msg['fdoc']['uid'] for msg in self.all_msg_dict_for_mbox(mbox) + if msg.get('fdoc', None) + and fields.DELETED_FLAG in msg['fdoc']['flags']] + return all_deleted + + # new, dirty flags + + def _get_new_dirty_state(self, key): + """ + Return `new` and `dirty` flags for a given message. + + :param key: the key for the message, in the form mbox, uid + :type key: tuple + :return: tuple of bools + :rtype: tuple + """ + # XXX should return *first* the news, and *then* the dirty... + return map(lambda _set: key in _set, (self._new, self._dirty)) + + def set_new(self, key): + """ + Add the key value to the `new` set. + + :param key: the key for the message, in the form mbox, uid + :type key: tuple + """ + self._new.add(key) + + def unset_new(self, key): + """ + Remove the key value from the `new` set. + + :param key: the key for the message, in the form mbox, uid + :type key: tuple + """ + self._new.discard(key) + deferreds = self._new_deferreds + d = deferreds.get(key, None) + if d: + # XXX use a namedtuple for passing the result + # when we check it in the other side. + d.callback('%s, ok' % str(key)) + deferreds.pop(key) + + def set_dirty(self, key): + """ + Add the key value to the `dirty` set. + + :param key: the key for the message, in the form mbox, uid + :type key: tuple + """ + self._dirty.add(key) + + def unset_dirty(self, key): + """ + Remove the key value from the `dirty` set. + + :param key: the key for the message, in the form mbox, uid + :type key: tuple + """ + self._dirty.discard(key) + deferreds = self._dirty_deferreds + d = deferreds.get(key, None) + if d: + # XXX use a namedtuple for passing the result + # when we check it in the other side. + d.callback('%s, ok' % str(key)) + deferreds.pop(key) + + # Recent Flags + + # TODO --- nice but unused + def set_recent_flag(self, mbox, uid): + """ + Set the `Recent` flag for a given mailbox and UID. + + :param mbox: the mailbox + :type mbox: str or unicode + :param uid: the message UID + :type uid: int + """ + self._rflags_dirty.add(mbox) + self._rflags_store[mbox]['set'].add(uid) + + # TODO --- nice but unused + def unset_recent_flag(self, mbox, uid): + """ + Unset the `Recent` flag for a given mailbox and UID. + + :param mbox: the mailbox + :type mbox: str or unicode + :param uid: the message UID + :type uid: int + """ + self._rflags_store[mbox]['set'].discard(uid) + + def set_recent_flags(self, mbox, value): + """ + Set the value for the set of the recent flags. + Used from the property in the MessageCollection. 
+ + :param mbox: the mailbox + :type mbox: str or unicode + :param value: a sequence of flags to set + :type value: sequence + """ + self._rflags_dirty.add(mbox) + self._rflags_store[mbox]['set'] = set(value) + + def load_recent_flags(self, mbox, flags_doc): + """ + Load the passed flags document in the recent flags store, for a given + mailbox. + + :param mbox: the mailbox + :type mbox: str or unicode + :param flags_doc: A dictionary containing the `doc_id` of the Soledad + flags-document for this mailbox, and the `set` + of uids marked with that flag. + """ + self._rflags_store[mbox] = flags_doc + + def get_recent_flags(self, mbox): + """ + Return the set of UIDs with the `Recent` flag for this mailbox. + + :param mbox: the mailbox + :type mbox: str or unicode + :rtype: set, or None + """ + rflag_for_mbox = self._rflags_store.get(mbox, None) + if not rflag_for_mbox: + return None + return self._rflags_store[mbox]['set'] + + def all_rdocs_iter(self): + """ + Return an iterator through all in-memory recent flag dicts, wrapped + under a RecentFlagsDoc namedtuple. + Used for saving to disk. + + :return: a generator of RecentFlagDoc + :rtype: generator + """ + # XXX use enums + DOC_ID = "doc_id" + SET = "set" + + rflags_store = self._rflags_store + + def get_rdoc(mbox, rdict): + mbox_rflag_set = rdict[SET] + recent_set = copy(mbox_rflag_set) + # zero it! + mbox_rflag_set.difference_update(mbox_rflag_set) + return RecentFlagsDoc( + doc_id=rflags_store[mbox][DOC_ID], + content={ + fields.TYPE_KEY: fields.TYPE_RECENT_VAL, + fields.MBOX_KEY: mbox, + fields.RECENTFLAGS_KEY: list(recent_set) + }) + + return (get_rdoc(mbox, rdict) for mbox, rdict in rflags_store.items() + if not empty(rdict[SET])) + + # Methods that mirror the IMailbox interface + + def remove_all_deleted(self, mbox): + """ + Remove all messages flagged \\Deleted from this Memory Store only. + Called from `expunge` + + :param mbox: the mailbox + :type mbox: str or unicode + :return: a list of UIDs + :rtype: list + """ + mem_deleted = self.all_deleted_uid_iter(mbox) + for uid in mem_deleted: + self.remove_message(mbox, uid) + return mem_deleted + + def expunge(self, mbox, observer): + """ + Remove all messages flagged \\Deleted, from the Memory Store + and from the permanent store also. + + :param mbox: the mailbox + :type mbox: str or unicode + :param observer: a deferred that will be fired when expunge is done + :type observer: Deferred + :return: a list of UIDs + :rtype: list + """ + # TODO expunge should add itself as a callback to the ongoing + # writes. + soledad_store = self._permanent_store + all_deleted = [] + + try: + # 1. Stop the writing call + self._stop_write_loop() + # 2. Enqueue a last write. + #self.write_messages(soledad_store) + # 3. Should wait on the writebacks to finish ??? + # FIXME wait for this, and add all the rest of the method + # as a callback!!! + except Exception as exc: + logger.exception(exc) + + # Now, we...: + + try: + # 1. Delete all messages marked as deleted in soledad. + + # XXX this could be deferred for faster operation. + if soledad_store: + sol_deleted = soledad_store.remove_all_deleted(mbox) + else: + sol_deleted = [] + + try: + self._known_uids[mbox].difference_update(set(sol_deleted)) + except Exception as exc: + logger.exception(exc) + + # 2. Delete all messages marked as deleted in memory. 
+ mem_deleted = self.remove_all_deleted(mbox) + + all_deleted = set(mem_deleted).union(set(sol_deleted)) + logger.debug("deleted %r" % all_deleted) + except Exception as exc: + logger.exception(exc) + finally: + self._start_write_loop() + observer.callback(True) + return all_deleted + + # Dump-to-disk controls. + + @property + def is_writing(self): + """ + Property that returns whether the store is currently writing its + internal state to a permanent storage. + + Used to evaluate whether the CHECK command can inform that the field + is clear to proceed, or waiting for the write operations to complete + is needed instead. + + :rtype: bool + """ + # FIXME this should return a deferred !!! + # XXX ----- can fire when all new + dirty deferreds + # are done (gatherResults) + return getattr(self, self.WRITING_FLAG) + + # Memory management. + + def get_size(self): + """ + Return the size of the internal storage. + Use for calculating the limit beyond which we should flush the store. + + :rtype: int + """ + return size.get_size(self._msg_store) diff --git a/src/leap/mail/imap/messageparts.py b/src/leap/mail/imap/messageparts.py new file mode 100644 index 0000000..b07681b --- /dev/null +++ b/src/leap/mail/imap/messageparts.py @@ -0,0 +1,565 @@ +# messageparts.py +# Copyright (C) 2014 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +""" +MessagePart implementation. Used from LeapMessage. +""" +import logging +import StringIO +import weakref + +from collections import namedtuple + +from enum import Enum +from zope.interface import implements +from twisted.mail import imap4 + +from leap.common.decorators import memoized_method +from leap.common.mail import get_email_charset +from leap.mail.imap import interfaces +from leap.mail.imap.fields import fields +from leap.mail.utils import empty, first, find_charset + +MessagePartType = Enum("hdoc", "fdoc", "cdoc", "cdocs", "docs_id") + + +logger = logging.getLogger(__name__) + + +""" +A MessagePartDoc is a light wrapper around the dictionary-like +data that we pass along for message parts. It can be used almost everywhere +that you would expect a SoledadDocument, since it has a dict under the +`content` attribute. + +We also keep some metadata on it, relative in part to the message as a whole, +and sometimes to a part in particular only. + +* `new` indicates that the document has just been created. SoledadStore + should just create a new doc for all the related message parts. +* `store` indicates the type of store a given MessagePartDoc lives in. + We currently use this to indicate that the document comes from memeory, + but we should probably get rid of it as soon as we extend the use of the + SoledadStore interface along LeapMessage, MessageCollection and Mailbox. +* `part` is one of the MessagePartType enums. 
+ +* `dirty` indicates that, while we already have the document in Soledad, + we have modified its state in memory, so we need to put_doc instead while + dumping the MemoryStore contents. + `dirty` attribute would only apply to flags-docs and linkage-docs. +* `doc_id` is the identifier for the document in the u1db database, if any. + +""" + +MessagePartDoc = namedtuple( + 'MessagePartDoc', + ['new', 'dirty', 'part', 'store', 'content', 'doc_id']) + +""" +A RecentFlagsDoc is used to send the recent-flags document payload to the +SoledadWriter during dumps. +""" +RecentFlagsDoc = namedtuple( + 'RecentFlagsDoc', + ['content', 'doc_id']) + + +class ReferenciableDict(dict): + """ + A dict that can be weak-referenced. + + Some builtin objects are not weak-referenciable unless + subclassed. So we do. + + Used to return pointers to the items in the MemoryStore. + """ + + +class MessageWrapper(object): + """ + A simple nested dictionary container around the different message subparts. + """ + implements(interfaces.IMessageContainer) + + FDOC = "fdoc" + HDOC = "hdoc" + CDOCS = "cdocs" + DOCS_ID = "docs_id" + + # Using slots to limit some the memory footprint, + # Add your attribute here. + + __slots__ = ["_dict", "_new", "_dirty", "_storetype", "memstore"] + + def __init__(self, fdoc=None, hdoc=None, cdocs=None, + from_dict=None, memstore=None, + new=True, dirty=False, docs_id={}): + """ + Initialize a MessageWrapper. + """ + # TODO add optional reference to original message in the incoming + self._dict = {} + self.memstore = memstore + + self._new = new + self._dirty = dirty + + self._storetype = "mem" + + if from_dict is not None: + self.from_dict(from_dict) + else: + if fdoc is not None: + self._dict[self.FDOC] = ReferenciableDict(fdoc) + if hdoc is not None: + self._dict[self.HDOC] = ReferenciableDict(hdoc) + if cdocs is not None: + self._dict[self.CDOCS] = ReferenciableDict(cdocs) + + # This will keep references to the doc_ids to be able to put + # messages to soledad. It will be populated during the walk() to avoid + # the overhead of reading from the db. + + # XXX it really *only* make sense for the FDOC, the other parts + # should not be "dirty", just new...!!! + self._dict[self.DOCS_ID] = docs_id + + # properties + + # TODO Could refactor new and dirty properties together. + + def _get_new(self): + """ + Get the value for the `new` flag. + + :rtype: bool + """ + return self._new + + def _set_new(self, value=True): + """ + Set the value for the `new` flag, and propagate it + to the memory store if any. + + :param value: the value to set + :type value: bool + """ + self._new = value + if self.memstore: + mbox = self.fdoc.content['mbox'] + uid = self.fdoc.content['uid'] + key = mbox, uid + fun = [self.memstore.unset_new, + self.memstore.set_new][int(value)] + fun(key) + else: + logger.warning("Could not find a memstore referenced from this " + "MessageWrapper. The value for new will not be " + "propagated") + + new = property(_get_new, _set_new, + doc="The `new` flag for this MessageWrapper") + + def _get_dirty(self): + """ + Get the value for the `dirty` flag. + + :rtype: bool + """ + return self._dirty + + def _set_dirty(self, value=True): + """ + Set the value for the `dirty` flag, and propagate it + to the memory store if any. 
+ + :param value: the value to set + :type value: bool + """ + self._dirty = value + if self.memstore: + mbox = self.fdoc.content['mbox'] + uid = self.fdoc.content['uid'] + key = mbox, uid + fun = [self.memstore.unset_dirty, + self.memstore.set_dirty][int(value)] + fun(key) + else: + logger.warning("Could not find a memstore referenced from this " + "MessageWrapper. The value for new will not be " + "propagated") + + dirty = property(_get_dirty, _set_dirty) + + # IMessageContainer + + @property + def fdoc(self): + """ + Return a MessagePartDoc wrapping around a weak reference to + the flags-document in this MemoryStore, if any. + + :rtype: MessagePartDoc + """ + _fdoc = self._dict.get(self.FDOC, None) + if _fdoc: + content_ref = weakref.proxy(_fdoc) + else: + logger.warning("NO FDOC!!!") + content_ref = {} + + return MessagePartDoc(new=self.new, dirty=self.dirty, + store=self._storetype, + part=MessagePartType.fdoc, + content=content_ref, + doc_id=self._dict[self.DOCS_ID].get( + self.FDOC, None)) + + @property + def hdoc(self): + """ + Return a MessagePartDoc wrapping around a weak reference to + the headers-document in this MemoryStore, if any. + + :rtype: MessagePartDoc + """ + _hdoc = self._dict.get(self.HDOC, None) + if _hdoc: + content_ref = weakref.proxy(_hdoc) + else: + content_ref = {} + return MessagePartDoc(new=self.new, dirty=self.dirty, + store=self._storetype, + part=MessagePartType.hdoc, + content=content_ref, + doc_id=self._dict[self.DOCS_ID].get( + self.HDOC, None)) + + @property + def cdocs(self): + """ + Return a weak reference to a zero-indexed dict containing + the content-documents, or an empty dict if none found. + If you want access to the MessagePartDoc for the individual + parts, use the generator returned by `walk` instead. + + :rtype: dict + """ + _cdocs = self._dict.get(self.CDOCS, None) + if _cdocs: + return weakref.proxy(_cdocs) + else: + return {} + + def walk(self): + """ + Generator that iterates through all the parts, returning + MessagePartDoc. Used for writing to SoledadStore. + + :rtype: generator + """ + if self._dirty: + mbox = self.fdoc.content[fields.MBOX_KEY] + uid = self.fdoc.content[fields.UID_KEY] + docid_dict = self._dict[self.DOCS_ID] + docid_dict[self.FDOC] = self.memstore.get_docid_for_fdoc( + mbox, uid) + + if not empty(self.fdoc.content): + yield self.fdoc + if not empty(self.hdoc.content): + yield self.hdoc + for cdoc in self.cdocs.values(): + if not empty(cdoc): + content_ref = weakref.proxy(cdoc) + yield MessagePartDoc(new=self.new, dirty=self.dirty, + store=self._storetype, + part=MessagePartType.cdoc, + content=content_ref, + doc_id=None) + + # i/o + + def as_dict(self): + """ + Return a dict representation of the parts contained. + + :rtype: dict + """ + return self._dict + + def from_dict(self, msg_dict): + """ + Populate MessageWrapper parts from a dictionary. + It expects the same format that we use in a + MessageWrapper. + + + :param msg_dict: a dictionary containing the parts to populate + the MessageWrapper from + :type msg_dict: dict + """ + fdoc, hdoc, cdocs = map( + lambda part: msg_dict.get(part, None), + [self.FDOC, self.HDOC, self.CDOCS]) + + for t, doc in ((self.FDOC, fdoc), (self.HDOC, hdoc), + (self.CDOCS, cdocs)): + self._dict[t] = ReferenciableDict(doc) if doc else None + + +class MessagePart(object): + """ + IMessagePart implementor, to be passed to several methods + of the IMAP4Server. + It takes a subpart message and is able to find + the inner parts. + + See the interface documentation. 
+ """ + + implements(imap4.IMessagePart) + + def __init__(self, soledad, part_map): + """ + Initializes the MessagePart. + + :param soledad: Soledad instance. + :type soledad: Soledad + :param part_map: a dictionary containing the parts map for this + message + :type part_map: dict + """ + # TODO + # It would be good to pass the uid/mailbox also + # for references while debugging. + + # We have a problem on bulk moves, and is + # that when the fetch on the new mailbox is done + # the parts maybe are not complete. + # So we should be able to fail with empty + # docs until we solve that. The ideal would be + # to gather the results of the deferred operations + # to signal the operation is complete. + #leap_assert(part_map, "part map dict cannot be null") + + self._soledad = soledad + self._pmap = part_map + + def getSize(self): + """ + Return the total size, in octets, of this message part. + + :return: size of the message, in octets + :rtype: int + """ + if empty(self._pmap): + return 0 + size = self._pmap.get('size', None) + if size is None: + logger.error("Message part cannot find size in the partmap") + size = 0 + return size + + def getBodyFile(self): + """ + Retrieve a file object containing only the body of this message. + + :return: file-like object opened for reading + :rtype: StringIO + """ + fd = StringIO.StringIO() + if not empty(self._pmap): + multi = self._pmap.get('multi') + if not multi: + phash = self._pmap.get("phash", None) + else: + pmap = self._pmap.get('part_map') + first_part = pmap.get('1', None) + if not empty(first_part): + phash = first_part['phash'] + else: + phash = None + + if phash is None: + logger.warning("Could not find phash for this subpart!") + payload = "" + else: + payload = self._get_payload_from_document(phash) + + else: + logger.warning("Message with no part_map!") + payload = "" + + if payload: + content_type = self._get_ctype_from_document(phash) + charset = find_charset(content_type) + logger.debug("Got charset from header: %s" % (charset,)) + if charset is None: + charset = self._get_charset(payload) + logger.debug("Got charset: %s" % (charset,)) + try: + if isinstance(payload, unicode): + payload = payload.encode(charset) + except UnicodeError as exc: + logger.error( + "Unicode error, using 'replace'. {0!r}".format(exc)) + payload = payload.encode(charset, 'replace') + + fd.write(payload) + fd.seek(0) + return fd + + # TODO should memory-bound this memoize!!! + @memoized_method + def _get_payload_from_document(self, phash): + """ + Return the message payload from the content document. + + :param phash: the payload hash to retrieve by. + :type phash: str or unicode + :rtype: str or unicode + """ + cdocs = self._soledad.get_from_index( + fields.TYPE_P_HASH_IDX, + fields.TYPE_CONTENT_VAL, str(phash)) + + cdoc = first(cdocs) + if cdoc is None: + logger.warning( + "Could not find the content doc " + "for phash %s" % (phash,)) + payload = "" + else: + payload = cdoc.content.get(fields.RAW_KEY, "") + return payload + + # TODO should memory-bound this memoize!!! + @memoized_method + def _get_ctype_from_document(self, phash): + """ + Reeturn the content-type from the content document. + + :param phash: the payload hash to retrieve by. 
+ :type phash: str or unicode + :rtype: str or unicode + """ + cdocs = self._soledad.get_from_index( + fields.TYPE_P_HASH_IDX, + fields.TYPE_CONTENT_VAL, str(phash)) + + cdoc = first(cdocs) + if not cdoc: + logger.warning( + "Could not find the content doc " + "for phash %s" % (phash,)) + ctype = cdoc.content.get('ctype', "") + return ctype + + @memoized_method + def _get_charset(self, stuff): + # TODO put in a common class with LeapMessage + """ + Gets (guesses?) the charset of a payload. + + :param stuff: the stuff to guess about. + :type stuff: str or unicode + :return: charset + :rtype: unicode + """ + # XXX existential doubt 2. shouldn't we make the scope + # of the decorator somewhat more persistent? + # ah! yes! and put memory bounds. + return get_email_charset(stuff) + + def getHeaders(self, negate, *names): + """ + Retrieve a group of message headers. + + :param names: The names of the headers to retrieve or omit. + :type names: tuple of str + + :param negate: If True, indicates that the headers listed in names + should be omitted from the return value, rather + than included. + :type negate: bool + + :return: A mapping of header field names to header field values + :rtype: dict + """ + # XXX refactor together with MessagePart method + if not self._pmap: + logger.warning("No pmap in Subpart!") + return {} + headers = dict(self._pmap.get("headers", [])) + + names = map(lambda s: s.upper(), names) + if negate: + cond = lambda key: key.upper() not in names + else: + cond = lambda key: key.upper() in names + + # default to most likely standard + charset = find_charset(headers, "utf-8") + headers2 = dict() + for key, value in headers.items(): + # twisted imap server expects *some* headers to be lowercase + # We could use a CaseInsensitiveDict here... + if key.lower() == "content-type": + key = key.lower() + + if not isinstance(key, str): + key = key.encode(charset, 'replace') + if not isinstance(value, str): + value = value.encode(charset, 'replace') + + # filter original dict by negate-condition + if cond(key): + headers2[key] = value + return headers2 + + def isMultipart(self): + """ + Return True if this message is multipart. + """ + if empty(self._pmap): + logger.warning("Could not get part map!") + return False + multi = self._pmap.get("multi", False) + return multi + + def getSubPart(self, part): + """ + Retrieve a MIME submessage + + :type part: C{int} + :param part: The number of the part to retrieve, indexed from 0. + :raise IndexError: Raised if the specified part does not exist. + :raise TypeError: Raised if this message is not multipart. + :rtype: Any object implementing C{IMessagePart}. + :return: The specified sub-part. + """ + if not self.isMultipart(): + raise TypeError + + sub_pmap = self._pmap.get("part_map", {}) + try: + part_map = sub_pmap[str(part + 1)] + except KeyError: + logger.debug("getSubpart for %s: KeyError" % (part,)) + raise IndexError + + # XXX check for validity + return MessagePart(self._soledad, part_map) diff --git a/src/leap/mail/imap/messages.py b/src/leap/mail/imap/messages.py index 34304ea..25fc55f 100644 --- a/src/leap/mail/imap/messages.py +++ b/src/leap/mail/imap/messages.py @@ -20,17 +20,15 @@ LeapMessage and MessageCollection. 
import copy import logging import re -import time import threading import StringIO -from collections import defaultdict, namedtuple +from collections import defaultdict from functools import partial from twisted.mail import imap4 from twisted.internet import defer from twisted.python import log -from u1db import errors as u1db_errors from zope.interface import implements from zope.proxy import sameProxiedObjects @@ -38,33 +36,30 @@ from leap.common.check import leap_assert, leap_assert_type from leap.common.decorators import memoized_method from leap.common.mail import get_email_charset from leap.mail import walk -from leap.mail.utils import first, find_charset -from leap.mail.decorators import deferred +from leap.mail.utils import first, find_charset, lowerdict, empty +from leap.mail.utils import stringify_parts_map +from leap.mail.decorators import deferred_to_thread from leap.mail.imap.index import IndexedDB from leap.mail.imap.fields import fields, WithMsgFields +from leap.mail.imap.memorystore import MessageWrapper +from leap.mail.imap.messageparts import MessagePart from leap.mail.imap.parser import MailParser, MBoxParser -from leap.mail.messageflow import IMessageConsumer logger = logging.getLogger(__name__) # TODO ------------------------------------------------------------ +# [ ] Add ref to incoming message during add_msg # [ ] Add linked-from info. +# * Need a new type of documents: linkage info. +# * HDOCS are linked from FDOCs (ref to chash) +# * CDOCS are linked from HDOCS (ref to chash) + # [ ] Delete incoming mail only after successful write! # [ ] Remove UID from syncable db. Store only those indexes locally. - -def lowerdict(_dict): - """ - Return a dict with the keys in lowercase. - - :param _dict: the dict to convert - :rtype: dict - """ - # TODO should properly implement a CaseInsensitive dict. - # Look into requests code. - return dict((key.lower(), value) - for key, value in _dict.items()) +MSGID_PATTERN = r"""<([\w@.]+)>""" +MSGID_RE = re.compile(MSGID_PATTERN) def try_unique_query(curried): @@ -92,232 +87,6 @@ def try_unique_query(curried): except Exception as exc: logger.exception("Unhandled error %r" % exc) -MSGID_PATTERN = r"""<([\w@.]+)>""" -MSGID_RE = re.compile(MSGID_PATTERN) - - -class MessagePart(object): - """ - IMessagePart implementor. - It takes a subpart message and is able to find - the inner parts. - - Excusatio non petita: see the interface documentation. - """ - - implements(imap4.IMessagePart) - - def __init__(self, soledad, part_map): - """ - Initializes the MessagePart. - - :param part_map: a dictionary containing the parts map for this - message - :type part_map: dict - """ - # TODO - # It would be good to pass the uid/mailbox also - # for references while debugging. - - # We have a problem on bulk moves, and is - # that when the fetch on the new mailbox is done - # the parts maybe are not complete. - # So we should be able to fail with empty - # docs until we solve that. The ideal would be - # to gather the results of the deferred operations - # to signal the operation is complete. - #leap_assert(part_map, "part map dict cannot be null") - self._soledad = soledad - self._pmap = part_map - - def getSize(self): - """ - Return the total size, in octets, of this message part. 
- - :return: size of the message, in octets - :rtype: int - """ - if not self._pmap: - return 0 - size = self._pmap.get('size', None) - if not size: - logger.error("Message part cannot find size in the partmap") - return size - - def getBodyFile(self): - """ - Retrieve a file object containing only the body of this message. - - :return: file-like object opened for reading - :rtype: StringIO - """ - fd = StringIO.StringIO() - if self._pmap: - multi = self._pmap.get('multi') - if not multi: - phash = self._pmap.get("phash", None) - else: - pmap = self._pmap.get('part_map') - first_part = pmap.get('1', None) - if first_part: - phash = first_part['phash'] - - if not phash: - logger.warning("Could not find phash for this subpart!") - payload = str("") - else: - payload = self._get_payload_from_document(phash) - - else: - logger.warning("Message with no part_map!") - payload = str("") - - if payload: - content_type = self._get_ctype_from_document(phash) - charset = find_charset(content_type) - logger.debug("Got charset from header: %s" % (charset,)) - if charset is None: - charset = self._get_charset(payload) - logger.debug("Got charset: %s" % (charset,)) - try: - payload = payload.encode(charset) - except (UnicodeEncodeError, UnicodeDecodeError) as e: - logger.error("Unicode error, using 'replace'. {0!r}".format(e)) - payload = payload.encode(charset, 'replace') - - fd.write(payload) - fd.seek(0) - return fd - - # TODO cache the phash retrieval - def _get_payload_from_document(self, phash): - """ - Gets the message payload from the content document. - - :param phash: the payload hash to retrieve by. - :type phash: basestring - """ - cdocs = self._soledad.get_from_index( - fields.TYPE_P_HASH_IDX, - fields.TYPE_CONTENT_VAL, str(phash)) - - cdoc = first(cdocs) - if not cdoc: - logger.warning( - "Could not find the content doc " - "for phash %s" % (phash,)) - payload = cdoc.content.get(fields.RAW_KEY, "") - return payload - - # TODO cache the pahash retrieval - def _get_ctype_from_document(self, phash): - """ - Gets the content-type from the content document. - - :param phash: the payload hash to retrieve by. - :type phash: basestring - """ - cdocs = self._soledad.get_from_index( - fields.TYPE_P_HASH_IDX, - fields.TYPE_CONTENT_VAL, str(phash)) - - cdoc = first(cdocs) - if not cdoc: - logger.warning( - "Could not find the content doc " - "for phash %s" % (phash,)) - ctype = cdoc.content.get('ctype', "") - return ctype - - @memoized_method - def _get_charset(self, stuff): - # TODO put in a common class with LeapMessage - """ - Gets (guesses?) the charset of a payload. - - :param stuff: the stuff to guess about. - :type stuff: basestring - :returns: charset - """ - # XXX existential doubt 2. shouldn't we make the scope - # of the decorator somewhat more persistent? - # ah! yes! and put memory bounds. - return get_email_charset(unicode(stuff)) - - def getHeaders(self, negate, *names): - """ - Retrieve a group of message headers. - - :param names: The names of the headers to retrieve or omit. - :type names: tuple of str - - :param negate: If True, indicates that the headers listed in names - should be omitted from the return value, rather - than included. - :type negate: bool - - :return: A mapping of header field names to header field values - :rtype: dict - """ - if not self._pmap: - logger.warning("No pmap in Subpart!") - return {} - headers = dict(self._pmap.get("headers", [])) - - # twisted imap server expects *some* headers to be lowercase - # We could use a CaseInsensitiveDict here... 
- headers = dict( - (str(key), str(value)) if key.lower() != "content-type" - else (str(key.lower()), str(value)) - for (key, value) in headers.items()) - - names = map(lambda s: s.upper(), names) - if negate: - cond = lambda key: key.upper() not in names - else: - cond = lambda key: key.upper() in names - - # unpack and filter original dict by negate-condition - filter_by_cond = [ - map(str, (key, val)) for - key, val in headers.items() - if cond(key)] - filtered = dict(filter_by_cond) - return filtered - - def isMultipart(self): - """ - Return True if this message is multipart. - """ - if not self._pmap: - logger.warning("Could not get part map!") - return False - multi = self._pmap.get("multi", False) - return multi - - def getSubPart(self, part): - """ - Retrieve a MIME submessage - - :type part: C{int} - :param part: The number of the part to retrieve, indexed from 0. - :raise IndexError: Raised if the specified part does not exist. - :raise TypeError: Raised if this message is not multipart. - :rtype: Any object implementing C{IMessagePart}. - :return: The specified sub-part. - """ - if not self.isMultipart(): - raise TypeError - sub_pmap = self._pmap.get("part_map", {}) - try: - part_map = sub_pmap[str(part + 1)] - except KeyError: - logger.debug("getSubpart for %s: KeyError" % (part,)) - raise IndexError - - # XXX check for validity - return MessagePart(self._soledad, part_map) - class LeapMessage(fields, MailParser, MBoxParser): """ @@ -328,12 +97,14 @@ class LeapMessage(fields, MailParser, MBoxParser): """ # TODO this has to change. - # Should index primarily by chash, and keep a local-lonly + # Should index primarily by chash, and keep a local-only # UID table. implements(imap4.IMessage) - def __init__(self, soledad, uid, mbox, collection=None): + flags_lock = threading.Lock() + + def __init__(self, soledad, uid, mbox, collection=None, container=None): """ Initializes a LeapMessage. @@ -342,32 +113,54 @@ class LeapMessage(fields, MailParser, MBoxParser): :param uid: the UID for the message. :type uid: int or basestring :param mbox: the mbox this message belongs to - :type mbox: basestring + :type mbox: str or unicode :param collection: a reference to the parent collection object :type collection: MessageCollection + :param container: a IMessageContainer implementor instance + :type container: IMessageContainer """ MailParser.__init__(self) self._soledad = soledad self._uid = int(uid) self._mbox = self._parse_mailbox_name(mbox) self._collection = collection + self._container = container self.__chash = None self.__bdoc = None + # XXX make these properties public + @property def _fdoc(self): """ An accessor to the flags document. """ if all(map(bool, (self._uid, self._mbox))): - fdoc = self._get_flags_doc() + fdoc = None + if self._container is not None: + fdoc = self._container.fdoc + if not fdoc: + fdoc = self._get_flags_doc() if fdoc: - self.__chash = fdoc.content.get( + fdoc_content = fdoc.content + self.__chash = fdoc_content.get( fields.CONTENT_HASH_KEY, None) return fdoc @property + def _hdoc(self): + """ + An accessor to the headers document. + """ + if self._container is not None: + hdoc = self._container.hdoc + if hdoc and not empty(hdoc.content): + return hdoc + # XXX cache this into the memory store !!! + return self._get_headers_doc() + + @property def _chash(self): """ An accessor to the content hash for this message. 
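# The hunk above changes LeapMessage's _fdoc/_hdoc accessors to read from the
# in-memory container (the MessageWrapper held by the MemoryStore) first, and
# to fall back to a Soledad index query only when the document is not cached.
# Below is a minimal standalone sketch of that read-through pattern; it is an
# illustration only, and the names MemContainer, slow_soledad_query and
# get_fdoc are hypothetical, not part of this patch.

class MemContainer(object):
    """Stand-in for the MessageWrapper held by the MemoryStore."""
    def __init__(self, fdoc=None, hdoc=None):
        self.fdoc = fdoc
        self.hdoc = hdoc


def slow_soledad_query(doc_type):
    # Placeholder for a Soledad get_from_index() round-trip.
    return {"type": doc_type, "from": "soledad"}


def get_fdoc(container):
    # Prefer the cached flags document; fall back to the database only
    # when there is no container or it carries no flags doc.
    if container is not None and container.fdoc is not None:
        return container.fdoc            # cache hit: no db access
    return slow_soledad_query("flags")   # cache miss: query Soledad


if __name__ == "__main__":
    cached = MemContainer(fdoc={"type": "flags", "from": "memory"})
    assert get_fdoc(cached)["from"] == "memory"
    assert get_fdoc(None)["from"] == "soledad"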
@@ -380,13 +173,6 @@ class LeapMessage(fields, MailParser, MBoxParser): return self.__chash @property - def _hdoc(self): - """ - An accessor to the headers document. - """ - return self._get_headers_doc() - - @property def _bdoc(self): """ An accessor to the body document. @@ -415,39 +201,33 @@ class LeapMessage(fields, MailParser, MBoxParser): :return: The flags, represented as strings :rtype: tuple """ - if self._uid is None: - return [] uid = self._uid - flags = [] + flags = set([]) fdoc = self._fdoc if fdoc: - flags = fdoc.content.get(self.FLAGS_KEY, None) + flags = set(fdoc.content.get(self.FLAGS_KEY, None)) msgcol = self._collection # We treat the recent flag specially: gotten from # a mailbox-level document. if msgcol and uid in msgcol.recent_flags: - flags.append(fields.RECENT_FLAG) + flags.add(fields.RECENT_FLAG) if flags: flags = map(str, flags) return tuple(flags) - # setFlags, addFlags, removeFlags are not in the interface spec - # but we use them with store command. + # setFlags not in the interface spec but we use it with store command. - def setFlags(self, flags): + def setFlags(self, flags, mode): """ Sets the flags for this message - Returns a SoledadDocument that needs to be updated by the caller. - :param flags: the flags to update in the message. :type flags: tuple of str - - :return: a SoledadDocument instance - :rtype: SoledadDocument + :param mode: the mode for setting. 1 is append, -1 is remove, 0 set. + :type mode: int """ leap_assert(isinstance(flags, tuple), "flags need to be a tuple") log.msg('setting flags: %s (%s)' % (self._uid, flags)) @@ -458,42 +238,36 @@ class LeapMessage(fields, MailParser, MBoxParser): "Could not find FDOC for %s:%s while setting flags!" % (self._mbox, self._uid)) return - doc.content[self.FLAGS_KEY] = flags - doc.content[self.SEEN_KEY] = self.SEEN_FLAG in flags - doc.content[self.DEL_KEY] = self.DELETED_FLAG in flags - self._soledad.put_doc(doc) - - def addFlags(self, flags): - """ - Adds flags to this message. - - Returns a SoledadDocument that needs to be updated by the caller. - :param flags: the flags to add to the message. - :type flags: tuple of str - - :return: a SoledadDocument instance - :rtype: SoledadDocument - """ - leap_assert(isinstance(flags, tuple), "flags need to be a tuple") - oldflags = self.getFlags() - self.setFlags(tuple(set(flags + oldflags))) - - def removeFlags(self, flags): - """ - Remove flags from this message. - - Returns a SoledadDocument that needs to be updated by the caller. - - :param flags: the flags to be removed from the message. - :type flags: tuple of str - - :return: a SoledadDocument instance - :rtype: SoledadDocument - """ - leap_assert(isinstance(flags, tuple), "flags need to be a tuple") - oldflags = self.getFlags() - self.setFlags(tuple(set(oldflags) - set(flags))) + APPEND = 1 + REMOVE = -1 + SET = 0 + + with self.flags_lock: + current = doc.content[self.FLAGS_KEY] + if mode == APPEND: + newflags = tuple(set(tuple(current) + flags)) + elif mode == REMOVE: + newflags = tuple(set(current).difference(set(flags))) + elif mode == SET: + newflags = flags + + # We could defer this, but I think it's better + # to put it under the lock... 
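            # Note: at this point `newflags` holds the result of applying
            # `mode` to the current flag tuple (APPEND unions the given flags
            # in, REMOVE subtracts them, SET replaces the tuple). The lines
            # below write that result into the flags document together with
            # the derived seen/deleted booleans, and then persist it: through
            # the MemoryStore as a dirty (not new) MessageWrapper when a
            # memstore is available, or directly with soledad.put_doc()
            # otherwise.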
+ doc.content[self.FLAGS_KEY] = newflags + doc.content[self.SEEN_KEY] = self.SEEN_FLAG in flags + doc.content[self.DEL_KEY] = self.DELETED_FLAG in flags + + if self._collection.memstore is not None: + log.msg("putting message in collection") + self._collection.memstore.put_message( + self._mbox, self._uid, + MessageWrapper(fdoc=doc.content, new=False, dirty=True, + docs_id={'fdoc': doc.doc_id})) + else: + # fallback for non-memstore initializations. + self._soledad.put_doc(doc) + return map(str, newflags) def getInternalDate(self): """ @@ -519,29 +293,42 @@ class LeapMessage(fields, MailParser, MBoxParser): :return: file-like object opened for reading :rtype: StringIO """ + def write_fd(body): + fd.write(body) + fd.seek(0) + return fd + # TODO refactor with getBodyFile in MessagePart + fd = StringIO.StringIO() - bdoc = self._bdoc - if bdoc: - body = self._bdoc.content.get(self.RAW_KEY, "") - content_type = bdoc.content.get('content-type', "") + + if self._bdoc is not None: + bdoc_content = self._bdoc.content + if empty(bdoc_content): + logger.warning("No BDOC content found for message!!!") + return write_fd("") + + body = bdoc_content.get(self.RAW_KEY, "") + content_type = bdoc_content.get('content-type', "") charset = find_charset(content_type) + logger.debug('got charset from content-type: %s' % charset) if charset is None: charset = self._get_charset(body) try: - body = body.encode(charset) - except UnicodeError as e: - logger.error("Unicode error, using 'replace'. {0!r}".format(e)) + if isinstance(body, unicode): + body = body.encode(charset) + except UnicodeError as exc: + logger.error( + "Unicode error, using 'replace'. {0!r}".format(exc)) + logger.debug("Attempted to encode with: %s" % charset) body = body.encode(charset, 'replace') + finally: + return write_fd(body) # We are still returning funky characters from here. else: logger.warning("No BDOC found for message.") - body = str("") - - fd.write(body) - fd.seek(0) - return fd + return write_fd("") @memoized_method def _get_charset(self, stuff): @@ -552,11 +339,10 @@ class LeapMessage(fields, MailParser, MBoxParser): :type stuff: basestring :returns: charset """ - # TODO get from subpart headers - # XXX existential doubt 2. shouldn't we make the scope + # XXX shouldn't we make the scope # of the decorator somewhat more persistent? # ah! yes! and put memory bounds. - return get_email_charset(unicode(stuff)) + return get_email_charset(stuff) def getSize(self): """ @@ -567,7 +353,8 @@ class LeapMessage(fields, MailParser, MBoxParser): """ size = None if self._fdoc: - size = self._fdoc.content.get(self.SIZE_KEY, False) + fdoc_content = self._fdoc.content + size = fdoc_content.get(self.SIZE_KEY, False) else: logger.warning("No FLAGS doc for %s:%s" % (self._mbox, self._uid)) @@ -592,6 +379,8 @@ class LeapMessage(fields, MailParser, MBoxParser): :rtype: dict """ # TODO split in smaller methods + # XXX refactor together with MessagePart method + headers = self._get_headers() if not headers: logger.warning("No headers found") @@ -608,11 +397,10 @@ class LeapMessage(fields, MailParser, MBoxParser): # default to most likely standard charset = find_charset(headers, "utf-8") - - # twisted imap server expects *some* headers to be lowercase - # XXX refactor together with MessagePart method headers2 = dict() for key, value in headers.items(): + # twisted imap server expects *some* headers to be lowercase + # We could use a CaseInsensitiveDict here... 
if key.lower() == "content-type": key = key.lower() @@ -621,10 +409,13 @@ class LeapMessage(fields, MailParser, MBoxParser): if not isinstance(value, str): value = value.encode(charset, 'replace') + if value.endswith(";"): + # bastards + value = value[:-1] + # filter original dict by negate-condition if cond(key): headers2[key] = value - return headers2 def _get_headers(self): @@ -632,7 +423,8 @@ class LeapMessage(fields, MailParser, MBoxParser): Return the headers dict for this message. """ if self._hdoc is not None: - headers = self._hdoc.content.get(self.HEADERS_KEY, {}) + hdoc_content = self._hdoc.content + headers = hdoc_content.get(self.HEADERS_KEY, {}) return headers else: @@ -646,7 +438,8 @@ class LeapMessage(fields, MailParser, MBoxParser): Return True if this message is multipart. """ if self._fdoc: - is_multipart = self._fdoc.content.get(self.MULTIPART_KEY, False) + fdoc_content = self._fdoc.content + is_multipart = fdoc_content.get(self.MULTIPART_KEY, False) return is_multipart else: logger.warning( @@ -688,9 +481,15 @@ class LeapMessage(fields, MailParser, MBoxParser): logger.warning("Tried to get part but no HDOC found!") return None - pmap = self._hdoc.content.get(fields.PARTS_MAP_KEY, {}) + hdoc_content = self._hdoc.content + pmap = hdoc_content.get(fields.PARTS_MAP_KEY, {}) + + # remember, lads, soledad is using strings in its keys, + # not integers! return pmap[str(part)] + # XXX moved to memory store + # move the rest too. ------------------------------------------ def _get_flags_doc(self): """ Return the document that keeps the flags for this @@ -724,16 +523,31 @@ class LeapMessage(fields, MailParser, MBoxParser): Return the document that keeps the body for this message. """ - body_phash = self._hdoc.content.get( + hdoc_content = self._hdoc.content + body_phash = hdoc_content.get( fields.BODY_KEY, None) if not body_phash: logger.warning("No body phash for this document!") return None - body_docs = self._soledad.get_from_index( - fields.TYPE_P_HASH_IDX, - fields.TYPE_CONTENT_VAL, str(body_phash)) - return first(body_docs) + # XXX get from memstore too... + # if memstore: memstore.get_phrash + # memstore should keep a dict with weakrefs to the + # phash doc... + + if self._container is not None: + bdoc = self._container.memstore.get_cdoc_from_phash(body_phash) + if not empty(bdoc) and not empty(bdoc.content): + return bdoc + + # no memstore, or no body doc found there + if self._soledad: + body_docs = self._soledad.get_from_index( + fields.TYPE_P_HASH_IDX, + fields.TYPE_CONTENT_VAL, str(body_phash)) + return first(body_docs) + else: + logger.error("No phash in container, and no soledad found!") def __getitem__(self, key): """ @@ -748,216 +562,19 @@ class LeapMessage(fields, MailParser, MBoxParser): """ return self._fdoc.content.get(key, None) - # setters - - # XXX to be used in the messagecopier interface?! - - def set_uid(self, uid): - """ - Set new uid for this message. - - :param uid: the new uid - :type uid: basestring - """ - # XXX dangerous! lock? - self._uid = uid - d = self._fdoc - d.content[self.UID_KEY] = uid - self._soledad.put_doc(d) - - def set_mbox(self, mbox): - """ - Set new mbox for this message. - - :param mbox: the new mbox - :type mbox: basestring - """ - # XXX dangerous! lock? - self._mbox = mbox - d = self._fdoc - d.content[self.MBOX_KEY] = mbox - self._soledad.put_doc(d) - - # destructor - - @deferred - def remove(self): - """ - Remove all docs associated with this message. 
- """ - # XXX For the moment we are only removing the flags and headers - # docs. The rest we leave there polluting your hard disk, - # until we think about a good way of deorphaning. - # Maybe a crawler of unreferenced docs. - - # XXX implement elijah's idea of using a PUT document as a - # token to ensure consistency in the removal. - - uid = self._uid - - fd = self._get_flags_doc() - #hd = self._get_headers_doc() - #bd = self._get_body_doc() - #docs = [fd, hd, bd] - - docs = [fd] - - for d in filter(None, docs): - try: - self._soledad.delete_doc(d) - except Exception as exc: - logger.error(exc) - return uid - def does_exist(self): """ - Return True if there is actually a flags message for this + Return True if there is actually a flags document for this UID and mbox. """ - return self._fdoc is not None - - -class ContentDedup(object): - """ - Message deduplication. - - We do a query for the content hashes before writing to our beloved - sqlcipher backend of Soledad. This means, by now, that: - - 1. We will not store the same attachment twice, only the hash of it. - 2. We will not store the same message body twice, only the hash of it. - - The first case is useful if you are always receiving the same old memes - from unwary friends that still have not discovered that 4chan is the - generator of the internet. The second will save your day if you have - initiated session with the same account in two different machines. I also - wonder why would you do that, but let's respect each other choices, like - with the religious celebrations, and assume that one day we'll be able - to run Bitmask in completely free phones. Yes, I mean that, the whole GSM - Stack. - """ - - def _content_does_exist(self, doc): - """ - Check whether we already have a content document for a payload - with this hash in our database. - - :param doc: tentative body document - :type doc: dict - :returns: True if that happens, False otherwise. - """ - if not doc: - return False - phash = doc[fields.PAYLOAD_HASH_KEY] - attach_docs = self._soledad.get_from_index( - fields.TYPE_P_HASH_IDX, - fields.TYPE_CONTENT_VAL, str(phash)) - if not attach_docs: - return False - - if len(attach_docs) != 1: - logger.warning("Found more than one copy of phash %s!" - % (phash,)) - logger.debug("Found attachment doc with that hash! Skipping save!") - return True - - -SoledadWriterPayload = namedtuple( - 'SoledadWriterPayload', ['mode', 'payload']) - -# TODO we could consider using enum here: -# https://pypi.python.org/pypi/enum - -SoledadWriterPayload.CREATE = 1 -SoledadWriterPayload.PUT = 2 -SoledadWriterPayload.CONTENT_CREATE = 3 - - -""" -SoledadDocWriter was used to avoid writing to the db from multiple threads. -Its use here has been deprecated in favor of a local rw_lock in the client. -But we might want to reuse in in the near future to implement priority queues. -""" - - -class SoledadDocWriter(object): - """ - This writer will create docs serially in the local soledad database. - """ - - implements(IMessageConsumer) - - def __init__(self, soledad): - """ - Initialize the writer. - - :param soledad: the soledad instance - :type soledad: Soledad - """ - self._soledad = soledad - - def _get_call_for_item(self, item): - """ - Return the proper call type for a given item. 
- - :param item: one of the types defined under the - attributes of SoledadWriterPayload - :type item: int - """ - call = None - payload = item.payload - - if item.mode == SoledadWriterPayload.CREATE: - call = self._soledad.create_doc - elif (item.mode == SoledadWriterPayload.CONTENT_CREATE - and not self._content_does_exist(payload)): - call = self._soledad.create_doc - elif item.mode == SoledadWriterPayload.PUT: - call = self._soledad.put_doc - return call - - def _process(self, queue): - """ - Return the item and the proper call type for the next - item in the queue if any. + return not empty(self._fdoc) - :param queue: the queue from where we'll pick item. - :type queue: Queue - """ - item = queue.get() - call = self._get_call_for_item(item) - return item, call - - def consume(self, queue): - """ - Creates a new document in soledad db. - - :param queue: queue to get item from, with content of the document - to be inserted. - :type queue: Queue - """ - empty = queue.empty() - while not empty: - item, call = self._process(queue) - - if call: - # XXX should handle the delete case - # should handle errors - try: - call(item.payload) - except u1db_errors.RevisionConflict as exc: - logger.error("Error: %r" % (exc,)) - raise exc - empty = queue.empty() - - -class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser, - ContentDedup): +class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser): """ A collection of messages, surprisingly. - It is tied to a selected mailbox name that is passed to constructor. + It is tied to a selected mailbox name that is passed to its constructor. Implements a filter query over the messages contained in a soledad database. """ @@ -1058,7 +675,7 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser, _hdocset_lock = threading.Lock() _hdocset_property_lock = threading.Lock() - def __init__(self, mbox=None, soledad=None): + def __init__(self, mbox=None, soledad=None, memstore=None): """ Constructor for MessageCollection. @@ -1068,13 +685,18 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser, MessageCollection for each mailbox, instead of treating them as a property of each message. + We are passed an instance of MemoryStore, the same for the + SoledadBackedAccount, that we use as a read cache and a buffer + for writes. + :param mbox: the name of the mailbox. It is the name with which we filter the query over the - messages database + messages database. :type mbox: str - :param soledad: Soledad database :type soledad: Soledad instance + :param memstore: a MemoryStore instance + :type memstore: MemoryStore """ MailParser.__init__(self) leap_assert(mbox, "Need a mailbox name to initialize") @@ -1084,15 +706,22 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser, leap_assert(soledad, "Need a soledad instance to initialize") # okay, all in order, keep going... + self.mbox = self._parse_mailbox_name(mbox) + + # XXX get a SoledadStore passed instead self._soledad = soledad + self.memstore = memstore + self.__rflags = None self.__hdocset = None self.initialize_db() # ensure that we have a recent-flags and a hdocs-sec doc self._get_or_create_rdoc() - self._get_or_create_hdocset() + + # Not for now... + #self._get_or_create_hdocset() def _get_empty_doc(self, _type=FLAGS_DOC): """ @@ -1210,17 +839,21 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser, :type chash: basestring :return: False, if it does not exist, or UID. 
""" - exist = self._get_fdoc_from_chash(chash) + exist = False + if self.memstore is not None: + exist = self.memstore.get_fdoc_from_chash(chash, self.mbox) + + if not exist: + exist = self._get_fdoc_from_chash(chash) if exist: return exist.content.get(fields.UID_KEY, "unknown-uid") else: return False - @deferred - def add_msg(self, raw, subject=None, flags=None, date=None, uid=1): + def add_msg(self, raw, subject=None, flags=None, date=None, uid=None, + notify_on_disk=False): """ Creates a new message document. - Here lives the magic of the leap mail. Well, in soledad, really. :param raw: the raw message :type raw: str @@ -1236,27 +869,63 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser, :param uid: the message uid for this mailbox :type uid: int - """ - # TODO signal that we can delete the original message!----- - # when all the processing is done. - - # TODO add the linked-from info ! + :return: a deferred that will be fired with the message + uid when the adding succeed. + :rtype: deferred + """ logger.debug('adding message') if flags is None: flags = tuple() leap_assert_type(flags, tuple) + d = defer.Deferred() + self._do_add_msg(raw, flags, subject, date, notify_on_disk, d) + return d + + # We SHOULD defer this (or the heavy load here) to the thread pool, + # but it gives troubles with the QSocketNotifier used by Qt... + def _do_add_msg(self, raw, flags, subject, date, notify_on_disk, observer): + """ + Helper that creates a new message document. + Here lives the magic of the leap mail. Well, in soledad, really. + + See `add_msg` docstring for parameter info. + + :param observer: a deferred that will be fired with the message + uid when the adding succeed. + :type observer: deferred + """ + # TODO signal that we can delete the original message!----- + # when all the processing is done. + + # TODO add the linked-from info ! + # TODO add reference to the original message + # parse msg, chash, size, multi = self._do_parse(raw) - # check for uniqueness. - if self._fdoc_already_exists(chash): - logger.warning("We already have that message in this mailbox.") - # note that this operation will leave holes in the UID sequence, - # but we're gonna change that all the same for a local-only table. - # so not touch it by the moment. - return False + # check for uniqueness -------------------------------- + # XXX profiler says that this test is costly. + # So we probably should just do an in-memory check and + # move the complete check to the soledad writer? + # Watch out! We're reserving a UID right after this! + existing_uid = self._fdoc_already_exists(chash) + if existing_uid: + logger.warning("We already have that message in this " + "mailbox, unflagging as deleted") + uid = existing_uid + msg = self.get_msg_by_uid(uid) + msg.setFlags((fields.DELETED_FLAG,), -1) + + # XXX if this is deferred to thread again we should not use + # the callback in the deferred thread, but return and + # call the callback from the caller fun... 
+ observer.callback(uid) + return + + uid = self.memstore.increment_last_soledad_uid(self.mbox) + logger.info("ADDING MSG WITH UID: %s" % uid) fd = self._populate_flags(flags, uid, chash, size, multi) hd = self._populate_headr(msg, chash, subject, date) @@ -1273,48 +942,16 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser, hd[key] = parts_map[key] del parts_map - # Saving ---------------------------------------- - self.set_recent_flag(uid) - - # first, regular docs: flags and headers - self._soledad.create_doc(fd) - # XXX should check for content duplication on headers too - # but with chash. !!! - hdoc = self._soledad.create_doc(hd) - # We add the newly created hdoc to the fast-access set of - # headers documents associated with the mailbox. - self.add_hdocset_docid(hdoc.doc_id) - - # and last, but not least, try to create - # content docs if not already there. - cdocs = walk.get_raw_docs(msg, parts) - for cdoc in cdocs: - if not self._content_does_exist(cdoc): - self._soledad.create_doc(cdoc) + hd = stringify_parts_map(hd) - def _remove_cb(self, result): - return result - - def remove_all_deleted(self): - """ - Removes all messages flagged as deleted. - """ - delete_deferl = [] - for msg in self.get_deleted(): - delete_deferl.append(msg.remove()) - d1 = defer.gatherResults(delete_deferl, consumeErrors=True) - d1.addCallback(self._remove_cb) - return d1 + # The MessageContainer expects a dict, one-indexed + cdocs = dict(enumerate(walk.get_raw_docs(msg, parts), 1)) - def remove(self, msg): - """ - Remove a given msg. - :param msg: the message to be removed - :type msg: LeapMessage - """ - d = msg.remove() - d.addCallback(self._remove_cb) - return d + self.set_recent_flag(uid) + msg_container = MessageWrapper(fd, hd, cdocs) + self.memstore.create_message(self.mbox, uid, msg_container, + observer=observer, + notify_on_disk=notify_on_disk) # # getters: specific queries @@ -1326,32 +963,59 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser, """ An accessor for the recent-flags set for this mailbox. """ - if not self.__rflags: + # XXX check if we should remove this + if self.__rflags is not None: + return self.__rflags + + if self.memstore is not None: with self._rdoc_lock: - rdoc = self._get_recent_doc() - self.__rflags = set(rdoc.content.get( - fields.RECENTFLAGS_KEY, [])) - return self.__rflags + rflags = self.memstore.get_recent_flags(self.mbox) + if not rflags: + # not loaded in the memory store yet. + # let's fetch them from soledad... + rdoc = self._get_recent_doc() + rflags = set(rdoc.content.get( + fields.RECENTFLAGS_KEY, [])) + # ...and cache them now. + self.memstore.load_recent_flags( + self.mbox, + {'doc_id': rdoc.doc_id, 'set': rflags}) + return rflags + + #else: + # fallback for cases without memory store + #with self._rdoc_lock: + #rdoc = self._get_recent_doc() + #self.__rflags = set(rdoc.content.get( + #fields.RECENTFLAGS_KEY, [])) + #return self.__rflags def _set_recent_flags(self, value): """ Setter for the recent-flags set for this mailbox. """ - with self._rdoc_lock: - rdoc = self._get_recent_doc() - newv = set(value) - self.__rflags = newv - rdoc.content[fields.RECENTFLAGS_KEY] = list(newv) - # XXX should deferLater 0 it? 
- self._soledad.put_doc(rdoc) + if self.memstore is not None: + self.memstore.set_recent_flags(self.mbox, value) + + #else: + # fallback for cases without memory store + #with self._rdoc_lock: + #rdoc = self._get_recent_doc() + #newv = set(value) + #self.__rflags = newv + #rdoc.content[fields.RECENTFLAGS_KEY] = list(newv) + # XXX should deferLater 0 it? + #self._soledad.put_doc(rdoc) recent_flags = property( _get_recent_flags, _set_recent_flags, doc="Set of UIDs with the recent flag for this mailbox.") + # XXX change naming, indicate soledad query. def _get_recent_doc(self): """ - Get recent-flags document for this mailbox. + Get recent-flags document from Soledad for this mailbox. + :rtype: SoledadDocument or None """ curried = partial( self._soledad.get_from_index, @@ -1367,100 +1031,39 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser, def unset_recent_flags(self, uids): """ Unset Recent flag for a sequence of uids. + + :param uids: the uids to unset + :type uid: sequence """ with self._rdoc_property_lock: - self.recent_flags = self.recent_flags.difference( + self.recent_flags.difference_update( set(uids)) + # Individual flags operations + def unset_recent_flag(self, uid): """ Unset Recent flag for a given uid. + + :param uid: the uid to unset + :type uid: int """ with self._rdoc_property_lock: - self.recent_flags = self.recent_flags.difference( + self.recent_flags.difference_update( set([uid])) + @deferred_to_thread def set_recent_flag(self, uid): """ Set Recent flag for a given uid. + + :param uid: the uid to set + :type uid: int """ with self._rdoc_property_lock: self.recent_flags = self.recent_flags.union( set([uid])) - # headers-docs-set - - def _get_hdocset(self): - """ - An accessor for the hdocs-set for this mailbox. - """ - if not self.__hdocset: - with self._hdocset_lock: - hdocset_doc = self._get_hdocset_doc() - value = set(hdocset_doc.content.get( - fields.HDOCS_SET_KEY, [])) - self.__hdocset = value - return self.__hdocset - - def _set_hdocset(self, value): - """ - Setter for the hdocs-set for this mailbox. - """ - with self._hdocset_lock: - hdocset_doc = self._get_hdocset_doc() - newv = set(value) - self.__hdocset = newv - hdocset_doc.content[fields.HDOCS_SET_KEY] = list(newv) - # XXX should deferLater 0 it? - self._soledad.put_doc(hdocset_doc) - - _hdocset = property( - _get_hdocset, _set_hdocset, - doc="Set of Document-IDs for the headers docs associated " - "with this mailbox.") - - def _get_hdocset_doc(self): - """ - Get hdocs-set document for this mailbox. - """ - curried = partial( - self._soledad.get_from_index, - fields.TYPE_MBOX_IDX, - fields.TYPE_HDOCS_SET_VAL, self.mbox) - curried.expected = "hdocset" - hdocset_doc = try_unique_query(curried) - return hdocset_doc - - # Property-set modification (protected by a different - # lock to give atomicity to the read/write operation) - - def remove_hdocset_docids(self, docids): - """ - Remove the given document IDs from the set of - header-documents associated with this mailbox. - """ - with self._hdocset_property_lock: - self._hdocset = self._hdocset.difference( - set(docids)) - - def remove_hdocset_docid(self, docid): - """ - Remove the given document ID from the set of - header-documents associated with this mailbox. - """ - with self._hdocset_property_lock: - self._hdocset = self._hdocset.difference( - set([docid])) - - def add_hdocset_docid(self, docid): - """ - Add the given document ID to the set of - header-documents associated with this mailbox. 
- """ - with self._hdocset_property_lock: - self._hdocset = self._hdocset.union( - set([docid])) - # individual doc getters, message layer. def _get_fdoc_from_chash(self, chash): @@ -1499,7 +1102,7 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser, return None return fdoc.content.get(fields.UID_KEY, None) - @deferred + @deferred_to_thread def _get_uid_from_msgid(self, msgid): """ Return a UID for a given message-id. @@ -1515,24 +1118,83 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser, # and we cannot find it otherwise. This seems to be enough. # XXX do a deferLater instead ?? - time.sleep(0.3) + # XXX is this working? return self._get_uid_from_msgidCb(msgid) + def set_flags(self, mbox, messages, flags, mode, observer): + """ + Set flags for a sequence of messages. + + :param mbox: the mbox this message belongs to + :type mbox: str or unicode + :param messages: the messages to iterate through + :type messages: sequence + :flags: the flags to be set + :type flags: tuple + :param mode: the mode for setting. 1 is append, -1 is remove, 0 set. + :type mode: int + :param observer: a deferred that will be called with the dictionary + mapping UIDs to flags after the operation has been + done. + :type observer: deferred + """ + # XXX we could defer *this* to thread pool, and gather results... + # XXX use deferredList + + deferreds = [] + for msg_id in messages: + deferreds.append( + self._set_flag_for_uid(msg_id, flags, mode)) + + def notify(result): + observer.callback(dict(result)) + d1 = defer.gatherResults(deferreds, consumeErrors=True) + d1.addCallback(notify) + + @deferred_to_thread + def _set_flag_for_uid(self, msg_id, flags, mode): + """ + Run the set_flag operation in the thread pool. + """ + log.msg("MSG ID = %s" % msg_id) + msg = self.get_msg_by_uid(msg_id, mem_only=True, flags_only=True) + if msg is not None: + return msg_id, msg.setFlags(flags, mode) + # getters: generic for a mailbox - def get_msg_by_uid(self, uid): + def get_msg_by_uid(self, uid, mem_only=False, flags_only=False): """ Retrieves a LeapMessage by UID. This is used primarity in the Mailbox fetch and store methods. :param uid: the message uid to query by :type uid: int + :param mem_only: a flag that indicates whether this Message should + pass a reference to soledad to retrieve missing pieces + or not. + :type mem_only: bool + :param flags_only: whether the message should carry only a reference + to the flags document. + :type flags_only: bool :return: A LeapMessage instance matching the query, or None if not found. :rtype: LeapMessage """ - msg = LeapMessage(self._soledad, uid, self.mbox, collection=self) + msg_container = self.memstore.get_message(self.mbox, uid, flags_only) + if msg_container is not None: + if mem_only: + msg = LeapMessage(None, uid, self.mbox, collection=self, + container=msg_container) + else: + # We pass a reference to soledad just to be able to retrieve + # missing parts that cannot be found in the container, like + # the content docs after a copy. 
+ msg = LeapMessage(self._soledad, uid, self.mbox, + collection=self, container=msg_container) + else: + msg = LeapMessage(self._soledad, uid, self.mbox, collection=self) if not msg.does_exist(): return None return msg @@ -1564,40 +1226,50 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser, # FIXME ---------------------------------------------- return sorted(all_docs, key=lambda item: item.content['uid']) - def all_uid_iter(self): + def all_soledad_uid_iter(self): """ - Return an iterator trhough the UIDs of all messages, sorted in + Return an iterator through the UIDs of all messages, sorted in ascending order. """ - # XXX we should get this from the uid table, local-only - all_uids = (doc.content[self.UID_KEY] for doc in - self._soledad.get_from_index( - fields.TYPE_MBOX_IDX, - fields.TYPE_FLAGS_VAL, self.mbox)) - return (u for u in sorted(all_uids)) + db_uids = set([doc.content[self.UID_KEY] for doc in + self._soledad.get_from_index( + fields.TYPE_MBOX_IDX, + fields.TYPE_FLAGS_VAL, self.mbox)]) + return db_uids - def reset_last_uid(self, param): + def all_uid_iter(self): """ - Set the last uid to the highest uid found. - Used while expunging, passed as a callback. + Return an iterator through the UIDs of all messages, from memory. """ - try: - self.last_uid = max(self.all_uid_iter()) + 1 - except ValueError: - # empty sequence - pass - return param + if self.memstore is not None: + mem_uids = self.memstore.get_uids(self.mbox) + soledad_known_uids = self.memstore.get_soledad_known_uids( + self.mbox) + combined = tuple(set(mem_uids).union(soledad_known_uids)) + return combined + # XXX MOVE to memstore def all_flags(self): """ Return a dict with all flags documents for this mailbox. """ + # XXX get all from memstore and cache it there + # FIXME should get all uids, get them fro memstore, + # and get only the missing ones from disk. + all_flags = dict((( doc.content[self.UID_KEY], doc.content[self.FLAGS_KEY]) for doc in self._soledad.get_from_index( fields.TYPE_MBOX_IDX, fields.TYPE_FLAGS_VAL, self.mbox))) + if self.memstore is not None: + uids = self.memstore.get_uids(self.mbox) + docs = ((uid, self.memstore.get_message(self.mbox, uid)) + for uid in uids) + for uid, doc in docs: + all_flags[uid] = doc.fdoc.content[self.FLAGS_KEY] + return all_flags def all_flags_chash(self): @@ -1630,9 +1302,12 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser, :rtype: int """ + # XXX We should cache this in memstore too until next write... count = self._soledad.get_count_from_index( fields.TYPE_MBOX_IDX, fields.TYPE_FLAGS_VAL, self.mbox) + if self.memstore is not None: + count += self.memstore.count_new() return count # unseen messages @@ -1674,6 +1349,7 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser, # recent messages + # XXX take it from memstore def count_recent(self): """ Count all messages with the `Recent` flag. @@ -1686,30 +1362,6 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser, """ return len(self.recent_flags) - # deleted messages - - def deleted_iter(self): - """ - Get an iterator for the message UIDs with `deleted` flag. - - :return: iterator through deleted message docs - :rtype: iterable - """ - return (doc.content[self.UID_KEY] for doc in - self._soledad.get_from_index( - fields.TYPE_MBOX_DEL_IDX, - fields.TYPE_FLAGS_VAL, self.mbox, '1')) - - def get_deleted(self): - """ - Get all messages with the `Deleted` flag. 
- - :returns: a generator of LeapMessages - :rtype: generator - """ - return (LeapMessage(self._soledad, docid, self.mbox) - for docid in self.deleted_iter()) - def __len__(self): """ Returns the number of messages on this mailbox. diff --git a/src/leap/mail/imap/server.py b/src/leap/mail/imap/server.py new file mode 100644 index 0000000..ba63846 --- /dev/null +++ b/src/leap/mail/imap/server.py @@ -0,0 +1,217 @@ +# -*- coding: utf-8 -*- +# server.py +# Copyright (C) 2014 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +""" +Leap IMAP4 Server Implementation. +""" +from copy import copy + +from twisted import cred +from twisted.internet import defer +from twisted.internet.defer import maybeDeferred +from twisted.internet.task import deferLater +from twisted.mail import imap4 +from twisted.python import log + +from leap.common import events as leap_events +from leap.common.check import leap_assert, leap_assert_type +from leap.common.events.events_pb2 import IMAP_CLIENT_LOGIN +from leap.soledad.client import Soledad + + +class LeapIMAPServer(imap4.IMAP4Server): + """ + An IMAP4 Server with mailboxes backed by soledad + """ + def __init__(self, *args, **kwargs): + # pop extraneous arguments + soledad = kwargs.pop('soledad', None) + uuid = kwargs.pop('uuid', None) + userid = kwargs.pop('userid', None) + leap_assert(soledad, "need a soledad instance") + leap_assert_type(soledad, Soledad) + leap_assert(uuid, "need a user in the initialization") + + self._userid = userid + + # initialize imap server! + imap4.IMAP4Server.__init__(self, *args, **kwargs) + + # we should initialize the account here, + # but we move it to the factory so we can + # populate the test account properly (and only once + # per session) + + def lineReceived(self, line): + """ + Attempt to parse a single line from the server. + + :param line: the line from the server, without the line delimiter. + :type line: str + """ + if self.theAccount.closed is True and self.state != "unauth": + log.msg("Closing the session. State: unauth") + self.state = "unauth" + + if "login" in line.lower(): + # avoid to log the pass, even though we are using a dummy auth + # by now. + msg = line[:7] + " [...]" + else: + msg = copy(line) + log.msg('rcv (%s): %s' % (self.state, msg)) + imap4.IMAP4Server.lineReceived(self, line) + + def authenticateLogin(self, username, password): + """ + Lookup the account with the given parameters, and deny + the improper combinations. + + :param username: the username that is attempting authentication. + :type username: str + :param password: the password to authenticate with. + :type password: str + """ + # XXX this should use portal: + # return portal.login(cred.credentials.UsernamePassword(user, pass) + if username != self._userid: + # bad username, reject. + raise cred.error.UnauthorizedLogin() + # any dummy password is allowed so far. use realm instead! 
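        # The signal below emits an IMAP_CLIENT_LOGIN event through the LEAP
        # events system; the return value follows Twisted's
        # authenticateLogin() contract of (interface, avatar, logout
        # callable), here with a no-op logout.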
+ leap_events.signal(IMAP_CLIENT_LOGIN, "1") + return imap4.IAccount, self.theAccount, lambda: None + + def do_FETCH(self, tag, messages, query, uid=0): + """ + Overwritten fetch dispatcher to use the fast fetch_flags + method + """ + if not query: + self.sendPositiveResponse(tag, 'FETCH complete') + return + + cbFetch = self._IMAP4Server__cbFetch + ebFetch = self._IMAP4Server__ebFetch + + if len(query) == 1 and str(query[0]) == "flags": + self._oldTimeout = self.setTimeout(None) + # no need to call iter, we get a generator + maybeDeferred( + self.mbox.fetch_flags, messages, uid=uid + ).addCallback( + cbFetch, tag, query, uid + ).addErrback(ebFetch, tag) + elif len(query) == 1 and str(query[0]) == "rfc822.header": + self._oldTimeout = self.setTimeout(None) + # no need to call iter, we get a generator + maybeDeferred( + self.mbox.fetch_headers, messages, uid=uid + ).addCallback( + cbFetch, tag, query, uid + ).addErrback(ebFetch, tag) + else: + self._oldTimeout = self.setTimeout(None) + # no need to call iter, we get a generator + maybeDeferred( + self.mbox.fetch, messages, uid=uid + ).addCallback( + cbFetch, tag, query, uid + ).addErrback( + ebFetch, tag + ).addCallback( + self.on_fetch_finished, messages) + + select_FETCH = (do_FETCH, imap4.IMAP4Server.arg_seqset, + imap4.IMAP4Server.arg_fetchatt) + + def on_fetch_finished(self, _, messages): + from twisted.internet import reactor + + print "FETCH FINISHED -- NOTIFY NEW" + deferLater(reactor, 0, self.notifyNew) + deferLater(reactor, 0, self.mbox.unset_recent_flags, messages) + deferLater(reactor, 0, self.mbox.signal_unread_to_ui) + + def on_copy_finished(self, defers): + d = defer.gatherResults(filter(None, defers)) + + def when_finished(result): + log.msg("COPY FINISHED") + self.notifyNew() + self.mbox.signal_unread_to_ui() + d.addCallback(when_finished) + #d.addCallback(self.notifyNew) + #d.addCallback(self.mbox.signal_unread_to_ui) + + def do_COPY(self, tag, messages, mailbox, uid=0): + from twisted.internet import reactor + defers = [] + d = imap4.IMAP4Server.do_COPY(self, tag, messages, mailbox, uid) + defers.append(d) + deferLater(reactor, 0, self.on_copy_finished, defers) + + select_COPY = (do_COPY, imap4.IMAP4Server.arg_seqset, + imap4.IMAP4Server.arg_astring) + + def notifyNew(self, ignored=None): + """ + Notify new messages to listeners. + """ + print "TRYING TO NOTIFY NEW" + self.mbox.notify_new() + + def _cbSelectWork(self, mbox, cmdName, tag): + """ + Callback for selectWork, patched to avoid conformance errors due to + incomplete UIDVALIDITY line. + """ + if mbox is None: + self.sendNegativeResponse(tag, 'No such mailbox') + return + if '\\noselect' in [s.lower() for s in mbox.getFlags()]: + self.sendNegativeResponse(tag, 'Mailbox cannot be selected') + return + + flags = mbox.getFlags() + self.sendUntaggedResponse(str(mbox.getMessageCount()) + ' EXISTS') + self.sendUntaggedResponse(str(mbox.getRecentCount()) + ' RECENT') + self.sendUntaggedResponse('FLAGS (%s)' % ' '.join(flags)) + + # Patched ------------------------------------------------------- + # imaptest was complaining about the incomplete line, we're adding + # "UIDs valid" here. 
+ self.sendPositiveResponse( + None, '[UIDVALIDITY %d] UIDs valid' % mbox.getUIDValidity()) + # ---------------------------------------------------------------- + + s = mbox.isWriteable() and 'READ-WRITE' or 'READ-ONLY' + mbox.addListener(self) + self.sendPositiveResponse(tag, '[%s] %s successful' % (s, cmdName)) + self.state = 'select' + self.mbox = mbox + + def checkpoint(self): + """ + Called when the client issues a CHECK command. + + This should perform any checkpoint operations required by the server. + It may be a long running operation, but may not block. If it returns + a deferred, the client will only be informed of success (or failure) + when the deferred's callback (or errback) is invoked. + """ + # TODO return the output of _memstore.is_writing + # XXX and that should return a deferred! + return None diff --git a/src/leap/mail/imap/service/imap.py b/src/leap/mail/imap/service/imap.py index ad22da6..5487cfc 100644 --- a/src/leap/mail/imap/service/imap.py +++ b/src/leap/mail/imap/service/imap.py @@ -17,17 +17,12 @@ """ Imap service initialization """ -from copy import copy - import logging +import os from twisted.internet.protocol import ServerFactory -from twisted.internet.defer import maybeDeferred from twisted.internet.error import CannotListenError -from twisted.internet.task import deferLater from twisted.mail import imap4 -from twisted.python import log -from twisted import cred logger = logging.getLogger(__name__) @@ -36,6 +31,9 @@ from leap.common.check import leap_assert, leap_assert_type, leap_check from leap.keymanager import KeyManager from leap.mail.imap.account import SoledadBackedAccount from leap.mail.imap.fetch import LeapIncomingMail +from leap.mail.imap.memorystore import MemoryStore +from leap.mail.imap.server import LeapIMAPServer +from leap.mail.imap.soledadstore import SoledadStore from leap.soledad.client import Soledad # The default port in which imap service will run @@ -47,7 +45,6 @@ INCOMING_CHECK_PERIOD = 60 from leap.common.events.events_pb2 import IMAP_SERVICE_STARTED from leap.common.events.events_pb2 import IMAP_SERVICE_FAILED_TO_START -from leap.common.events.events_pb2 import IMAP_CLIENT_LOGIN ###################################################### # Temporary workaround for RecursionLimit when using @@ -68,160 +65,9 @@ except Exception: ###################################################### - -class LeapIMAPServer(imap4.IMAP4Server): - """ - An IMAP4 Server with mailboxes backed by soledad - """ - def __init__(self, *args, **kwargs): - # pop extraneous arguments - soledad = kwargs.pop('soledad', None) - uuid = kwargs.pop('uuid', None) - userid = kwargs.pop('userid', None) - leap_assert(soledad, "need a soledad instance") - leap_assert_type(soledad, Soledad) - leap_assert(uuid, "need a user in the initialization") - - self._userid = userid - - # initialize imap server! - imap4.IMAP4Server.__init__(self, *args, **kwargs) - - # we should initialize the account here, - # but we move it to the factory so we can - # populate the test account properly (and only once - # per session) - - def lineReceived(self, line): - """ - Attempt to parse a single line from the server. - - :param line: the line from the server, without the line delimiter. - :type line: str - """ - if self.theAccount.closed is True and self.state != "unauth": - log.msg("Closing the session. State: unauth") - self.state = "unauth" - - if "login" in line.lower(): - # avoid to log the pass, even though we are using a dummy auth - # by now. 
- msg = line[:7] + " [...]" - else: - msg = copy(line) - log.msg('rcv (%s): %s' % (self.state, msg)) - imap4.IMAP4Server.lineReceived(self, line) - - def authenticateLogin(self, username, password): - """ - Lookup the account with the given parameters, and deny - the improper combinations. - - :param username: the username that is attempting authentication. - :type username: str - :param password: the password to authenticate with. - :type password: str - """ - # XXX this should use portal: - # return portal.login(cred.credentials.UsernamePassword(user, pass) - if username != self._userid: - # bad username, reject. - raise cred.error.UnauthorizedLogin() - # any dummy password is allowed so far. use realm instead! - leap_events.signal(IMAP_CLIENT_LOGIN, "1") - return imap4.IAccount, self.theAccount, lambda: None - - def do_FETCH(self, tag, messages, query, uid=0): - """ - Overwritten fetch dispatcher to use the fast fetch_flags - method - """ - from twisted.internet import reactor - if not query: - self.sendPositiveResponse(tag, 'FETCH complete') - return # XXX ??? - - cbFetch = self._IMAP4Server__cbFetch - ebFetch = self._IMAP4Server__ebFetch - - if len(query) == 1 and str(query[0]) == "flags": - self._oldTimeout = self.setTimeout(None) - # no need to call iter, we get a generator - maybeDeferred( - self.mbox.fetch_flags, messages, uid=uid - ).addCallback( - cbFetch, tag, query, uid - ).addErrback(ebFetch, tag) - elif len(query) == 1 and str(query[0]) == "rfc822.header": - self._oldTimeout = self.setTimeout(None) - # no need to call iter, we get a generator - maybeDeferred( - self.mbox.fetch_headers, messages, uid=uid - ).addCallback( - cbFetch, tag, query, uid - ).addErrback(ebFetch, tag) - else: - self._oldTimeout = self.setTimeout(None) - # no need to call iter, we get a generator - maybeDeferred( - self.mbox.fetch, messages, uid=uid - ).addCallback( - cbFetch, tag, query, uid - ).addErrback( - ebFetch, tag) - - deferLater(reactor, - 2, self.mbox.unset_recent_flags, messages) - deferLater(reactor, 1, self.mbox.signal_unread_to_ui) - - select_FETCH = (do_FETCH, imap4.IMAP4Server.arg_seqset, - imap4.IMAP4Server.arg_fetchatt) - - def do_COPY(self, tag, messages, mailbox, uid=0): - from twisted.internet import reactor - imap4.IMAP4Server.do_COPY(self, tag, messages, mailbox, uid) - deferLater(reactor, - 2, self.mbox.unset_recent_flags, messages) - deferLater(reactor, 1, self.mbox.signal_unread_to_ui) - - select_COPY = (do_COPY, imap4.IMAP4Server.arg_seqset, - imap4.IMAP4Server.arg_astring) - - def notifyNew(self, ignored): - """ - Notify new messages to listeners. - """ - self.mbox.notify_new() - - def _cbSelectWork(self, mbox, cmdName, tag): - """ - Callback for selectWork, patched to avoid conformance errors due to - incomplete UIDVALIDITY line. - """ - if mbox is None: - self.sendNegativeResponse(tag, 'No such mailbox') - return - if '\\noselect' in [s.lower() for s in mbox.getFlags()]: - self.sendNegativeResponse(tag, 'Mailbox cannot be selected') - return - - flags = mbox.getFlags() - self.sendUntaggedResponse(str(mbox.getMessageCount()) + ' EXISTS') - self.sendUntaggedResponse(str(mbox.getRecentCount()) + ' RECENT') - self.sendUntaggedResponse('FLAGS (%s)' % ' '.join(flags)) - - # Patched ------------------------------------------------------- - # imaptest was complaining about the incomplete line, we're adding - # "UIDs valid" here. 
- self.sendPositiveResponse( - None, '[UIDVALIDITY %d] UIDs valid' % mbox.getUIDValidity()) - # ---------------------------------------------------------------- - - s = mbox.isWriteable() and 'READ-WRITE' or 'READ-ONLY' - mbox.addListener(self) - self.sendPositiveResponse(tag, '[%s] %s successful' % (s, cmdName)) - self.state = 'select' - self.mbox = mbox +DO_MANHOLE = os.environ.get("LEAP_MAIL_MANHOLE", None) +if DO_MANHOLE: + from leap.mail.imap.service import manhole class IMAPAuthRealm(object): @@ -256,11 +102,16 @@ class LeapIMAPFactory(ServerFactory): self._uuid = uuid self._userid = userid self._soledad = soledad + self._memstore = MemoryStore( + permanent_store=SoledadStore(soledad)) theAccount = SoledadBackedAccount( - uuid, soledad=soledad) + uuid, soledad=soledad, + memstore=self._memstore) self.theAccount = theAccount + # XXX how to pass the store along? + def buildProtocol(self, addr): "Return a protocol suitable for the job." imapProtocol = LeapIMAPServer( @@ -281,6 +132,8 @@ def run_service(*args, **kwargs): the reactor when starts listening, and the factory for the protocol. """ + from twisted.internet import reactor + leap_assert(len(args) == 2) soledad, keymanager = args leap_assert_type(soledad, Soledad) @@ -295,8 +148,6 @@ def run_service(*args, **kwargs): uuid = soledad._get_uuid() factory = LeapIMAPFactory(uuid, userid, soledad) - from twisted.internet import reactor - try: tport = reactor.listenTCP(port, factory, interface="localhost") @@ -317,6 +168,16 @@ def run_service(*args, **kwargs): else: # all good. # (the caller has still to call fetcher.start_loop) + + if DO_MANHOLE: + # TODO get pass from env var.too. + manhole_factory = manhole.getManholeFactory( + {'f': factory, + 'a': factory.theAccount, + 'gm': factory.theAccount.getMailbox}, + "boss", "leap") + reactor.listenTCP(manhole.MANHOLE_PORT, manhole_factory, + interface="127.0.0.1") logger.debug("IMAP4 Server is RUNNING in port %s" % (port,)) leap_events.signal(IMAP_SERVICE_STARTED, str(port)) return fetcher, tport, factory diff --git a/src/leap/mail/imap/service/manhole.py b/src/leap/mail/imap/service/manhole.py new file mode 100644 index 0000000..c83ae89 --- /dev/null +++ b/src/leap/mail/imap/service/manhole.py @@ -0,0 +1,130 @@ +# -*- coding: utf-8 -*- +# manhole.py +# Copyright (C) 2014 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +""" +Utilities for enabling the manhole administrative interface into the +LEAP Mail application. +""" +MANHOLE_PORT = 2222 + + +def getManholeFactory(namespace, user, secret): + """ + Get an administrative manhole into the application. + + :param namespace: the namespace to show in the manhole + :type namespace: dict + :param user: the user to authenticate into the administrative shell. 
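run_service above only wires the manhole in when LEAP_MAIL_MANHOLE is set in the environment. Outside of the IMAP service the same factory can be stood up by hand; a minimal sketch, where the namespace content is arbitrary and the "boss"/"leap" credentials simply mirror the hard-coded ones used in run_service:

import os

from twisted.internet import reactor

from leap.mail.imap.service import manhole

if os.environ.get("LEAP_MAIL_MANHOLE"):
    # Anything placed in the namespace dict becomes reachable
    # from the interactive shell.
    namespace = {'answer': 42}
    factory = manhole.getManholeFactory(namespace, "boss", "leap")
    reactor.listenTCP(manhole.MANHOLE_PORT, factory,
                      interface="127.0.0.1")
    reactor.run()

Once it is listening, something like "ssh -p 2222 boss@localhost" (password "leap") should drop into the colored shell defined below.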
+ :type user: str + :param secret: pass for this manhole + :type secret: str + """ + import string + + from twisted.cred.portal import Portal + from twisted.conch import manhole, manhole_ssh + from twisted.conch.insults import insults + from twisted.cred.checkers import ( + InMemoryUsernamePasswordDatabaseDontUse as MemoryDB) + + from rlcompleter import Completer + + class EnhancedColoredManhole(manhole.ColoredManhole): + """ + A Manhole with some primitive autocomplete support. + """ + # TODO use introspection to make life easier + + def find_common(self, l): + """ + find common parts in thelist items + ex: 'ab' for ['abcd','abce','abf'] + requires an ordered list + """ + if len(l) == 1: + return l[0] + + init = l[0] + for item in l[1:]: + for i, (x, y) in enumerate(zip(init, item)): + if x != y: + init = "".join(init[:i]) + break + + if not init: + return None + return init + + def handle_TAB(self): + """ + Trap the TAB keystroke. + """ + necessarypart = "".join(self.lineBuffer).split(' ')[-1] + completer = Completer(globals()) + if completer.complete(necessarypart, 0): + matches = list(set(completer.matches)) # has multiples + + if len(matches) == 1: + length = len(necessarypart) + self.lineBuffer = self.lineBuffer[:-length] + self.lineBuffer.extend(matches[0]) + self.lineBufferIndex = len(self.lineBuffer) + else: + matches.sort() + commons = self.find_common(matches) + if commons: + length = len(necessarypart) + self.lineBuffer = self.lineBuffer[:-length] + self.lineBuffer.extend(commons) + self.lineBufferIndex = len(self.lineBuffer) + + self.terminal.nextLine() + while matches: + matches, part = matches[4:], matches[:4] + for item in part: + self.terminal.write('%s' % item.ljust(30)) + self.terminal.write('\n') + self.terminal.nextLine() + + self.terminal.eraseLine() + self.terminal.cursorBackward(self.lineBufferIndex + 5) + self.terminal.write("%s %s" % ( + self.ps[self.pn], "".join(self.lineBuffer))) + + def keystrokeReceived(self, keyID, modifier): + """ + Act upon any keystroke received. + """ + self.keyHandlers.update({'\b': self.handle_BACKSPACE}) + m = self.keyHandlers.get(keyID) + if m is not None: + m() + elif keyID in string.printable: + self.characterReceived(keyID, False) + + sshRealm = manhole_ssh.TerminalRealm() + + def chainedProtocolFactory(): + return insults.ServerProtocol(EnhancedColoredManhole, namespace) + + sshRealm = manhole_ssh.TerminalRealm() + sshRealm.chainedProtocolFactory = chainedProtocolFactory + + portal = Portal( + sshRealm, [MemoryDB(**{user: secret})]) + + f = manhole_ssh.ConchFactory(portal) + return f diff --git a/src/leap/mail/imap/soledadstore.py b/src/leap/mail/imap/soledadstore.py new file mode 100644 index 0000000..8e22f26 --- /dev/null +++ b/src/leap/mail/imap/soledadstore.py @@ -0,0 +1,487 @@ +# -*- coding: utf-8 -*- +# soledadstore.py +# Copyright (C) 2014 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +""" +A MessageStore that writes to Soledad. 
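The completion logic in handle_TAB above leans on find_common, which is a longest-common-prefix over the sorted completion candidates. It can be sanity-checked on its own against os.path.commonprefix; the candidate list here is the one from its docstring:

import os.path


def find_common(matches):
    # Longest common prefix of an ordered list, as in the manhole above.
    if len(matches) == 1:
        return matches[0]
    init = matches[0]
    for item in matches[1:]:
        for i, (x, y) in enumerate(zip(init, item)):
            if x != y:
                init = "".join(init[:i])
                break
    return init or None


matches = sorted(['abcd', 'abce', 'abf'])
assert find_common(matches) == 'ab'
assert find_common(matches) == os.path.commonprefix(matches)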
+""" +import logging +import threading + +from itertools import chain + +from u1db import errors as u1db_errors +from twisted.internet import defer +from twisted.python import log +from zope.interface import implements + +from leap.common.check import leap_assert_type +from leap.mail.decorators import deferred_to_thread +from leap.mail.imap.messageparts import MessagePartType +from leap.mail.imap.messageparts import MessageWrapper +from leap.mail.imap.messageparts import RecentFlagsDoc +from leap.mail.imap.fields import fields +from leap.mail.imap.interfaces import IMessageStore +from leap.mail.messageflow import IMessageConsumer +from leap.mail.utils import first + +logger = logging.getLogger(__name__) + + +# TODO +# [ ] Delete original message from the incoming queue after all successful +# writes. +# [ ] Implement a retry queue. +# [ ] Consider journaling of operations. + + +class ContentDedup(object): + """ + Message deduplication. + + We do a query for the content hashes before writing to our beloved + sqlcipher backend of Soledad. This means, by now, that: + + 1. We will not store the same body/attachment twice, only the hash of it. + 2. We will not store the same message header twice, only the hash of it. + + The first case is useful if you are always receiving the same old memes + from unwary friends that still have not discovered that 4chan is the + generator of the internet. The second will save your day if you have + initiated session with the same account in two different machines. I also + wonder why would you do that, but let's respect each other choices, like + with the religious celebrations, and assume that one day we'll be able + to run Bitmask in completely free phones. Yes, I mean that, the whole GSM + Stack. + """ + # TODO refactor using unique_query + + def _header_does_exist(self, doc): + """ + Check whether we already have a header document for this + content hash in our database. + + :param doc: tentative header for document + :type doc: dict + :returns: True if it exists, False otherwise. + """ + if not doc: + return False + chash = doc[fields.CONTENT_HASH_KEY] + header_docs = self._soledad.get_from_index( + fields.TYPE_C_HASH_IDX, + fields.TYPE_HEADERS_VAL, str(chash)) + if not header_docs: + return False + + if len(header_docs) != 1: + logger.warning("Found more than one copy of chash %s!" + % (chash,)) + logger.debug("Found header doc with that hash! Skipping save!") + return True + + def _content_does_exist(self, doc): + """ + Check whether we already have a content document for a payload + with this hash in our database. + + :param doc: tentative content for document + :type doc: dict + :returns: True if it exists, False otherwise. + """ + if not doc: + return False + phash = doc[fields.PAYLOAD_HASH_KEY] + attach_docs = self._soledad.get_from_index( + fields.TYPE_P_HASH_IDX, + fields.TYPE_CONTENT_VAL, str(phash)) + if not attach_docs: + return False + + if len(attach_docs) != 1: + logger.warning("Found more than one copy of phash %s!" + % (phash,)) + logger.debug("Found attachment doc with that hash! Skipping save!") + return True + + +class MsgWriteError(Exception): + """ + Raised if any exception is found while saving message parts. + """ + + +class SoledadStore(ContentDedup): + """ + This will create docs in the local Soledad database. + """ + _last_uid_lock = threading.Lock() + + implements(IMessageConsumer, IMessageStore) + + def __init__(self, soledad): + """ + Initialize the permanent store that writes to Soledad database. 
+ + :param soledad: the soledad instance + :type soledad: Soledad + """ + self._soledad = soledad + + # IMessageStore + + # ------------------------------------------------------------------- + # We are not yet using this interface, but it would make sense + # to implement it. + + def create_message(self, mbox, uid, message): + """ + Create the passed message into this SoledadStore. + + :param mbox: the mbox this message belongs. + :type mbox: str or unicode + :param uid: the UID that identifies this message in this mailbox. + :type uid: int + :param message: a IMessageContainer implementor. + """ + raise NotImplementedError() + + def put_message(self, mbox, uid, message): + """ + Put the passed existing message into this SoledadStore. + + :param mbox: the mbox this message belongs. + :type mbox: str or unicode + :param uid: the UID that identifies this message in this mailbox. + :type uid: int + :param message: a IMessageContainer implementor. + """ + raise NotImplementedError() + + def remove_message(self, mbox, uid): + """ + Remove the given message from this SoledadStore. + + :param mbox: the mbox this message belongs. + :type mbox: str or unicode + :param uid: the UID that identifies this message in this mailbox. + :type uid: int + """ + raise NotImplementedError() + + def get_message(self, mbox, uid): + """ + Get a IMessageContainer for the given mbox and uid combination. + + :param mbox: the mbox this message belongs. + :type mbox: str or unicode + :param uid: the UID that identifies this message in this mailbox. + :type uid: int + """ + raise NotImplementedError() + + # IMessageConsumer + + # It's not thread-safe to defer this to a different thread + + def consume(self, queue): + """ + Creates a new document in soledad db. + + :param queue: queue to get item from, with content of the document + to be inserted. + :type queue: Queue + """ + # TODO should delete the original message from incoming only after + # the writes are done. + # TODO should handle the delete case + # TODO should handle errors + # TODO could generalize this method into a generic consumer + # and only implement `process` here + + def docWriteCallBack(doc_wrapper): + """ + Callback for a successful write of a document wrapper. + """ + if isinstance(doc_wrapper, MessageWrapper): + # If everything went well, we can unset the new flag + # in the source store (memory store) + self._unset_new_dirty(doc_wrapper) + + def docWriteErrorBack(failure): + """ + Errorback for write operations. + """ + log.error("Error while processing item.") + log.msg(failure.getTraceBack()) + + while not queue.empty(): + doc_wrapper = queue.get() + d = defer.Deferred() + d.addCallbacks(docWriteCallBack, docWriteErrorBack) + + self._consume_doc(doc_wrapper, d) + + @deferred_to_thread + def _unset_new_dirty(self, doc_wrapper): + """ + Unset the `new` and `dirty` flags for this document wrapper in the + memory store. + + :param doc_wrapper: a MessageWrapper instance + :type doc_wrapper: MessageWrapper + """ + # XXX debug msg id/mbox? + logger.info("unsetting new flag!") + doc_wrapper.new = False + doc_wrapper.dirty = False + + @deferred_to_thread + def _consume_doc(self, doc_wrapper, deferred): + """ + Consume each document wrapper in a separate thread. + + :param doc_wrapper: a MessageWrapper or RecentFlagsDoc instance + :type doc_wrapper: MessageWrapper or RecentFlagsDoc + :param deferred: a deferred that will be fired when the write operation + has finished, either calling its callback or its + errback depending on whether it succeed. 
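The consume loop above is a plain producer/consumer handoff: drain the queue on the reactor thread, push each document write into a worker thread, and only unset the new/dirty flags from the callback of a successful write. A self-contained sketch of that shape, with deferToThread standing in for the @deferred_to_thread decorator (the wrapper class and the write function are invented for the example):

from Queue import Queue

from twisted.internet import reactor
from twisted.internet.threads import deferToThread


class DocWrapper(object):
    def __init__(self, name):
        self.name = name
        self.new = True
        self.dirty = True


def write_to_storage(wrapper):
    # Blocking storage work, so it runs in the thread pool.
    print "writing", wrapper.name
    return wrapper


def on_written(wrapper):
    # Flags are only unset once the write really succeeded.
    wrapper.new = False
    wrapper.dirty = False
    print wrapper.name, "persisted"


def on_error(failure):
    print "write failed:", failure


def consume(queue):
    while not queue.empty():
        wrapper = queue.get()
        d = deferToThread(write_to_storage, wrapper)
        d.addCallbacks(on_written, on_error)


queue = Queue()
queue.put(DocWrapper("fdoc"))
queue.put(DocWrapper("hdoc"))
reactor.callWhenRunning(consume, queue)
reactor.callLater(0.2, reactor.stop)
reactor.run()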
+ :type deferred: Deferred + """ + items = self._process(doc_wrapper) + + # we prime the generator, that should return the + # message or flags wrapper item in the first place. + doc_wrapper = items.next() + + # From here, we unpack the subpart items and + # the right soledad call. + failed = False + for item, call in items: + try: + self._try_call(call, item) + except Exception as exc: + failed = exc + continue + if failed: + deferred.errback(MsgWriteError( + "There was an error writing the mesage")) + else: + deferred.callback(doc_wrapper) + + # + # SoledadStore specific methods. + # + + def _process(self, doc_wrapper): + """ + Return an iterator that will yield the doc_wrapper in the first place, + followed by the subparts item and the proper call type for every + item in the queue, if any. + + :param queue: the queue from where we'll pick item. + :type queue: Queue + """ + if isinstance(doc_wrapper, MessageWrapper): + return chain((doc_wrapper,), + self._get_calls_for_msg_parts(doc_wrapper)) + elif isinstance(doc_wrapper, RecentFlagsDoc): + return chain((doc_wrapper,), + self._get_calls_for_rflags_doc(doc_wrapper)) + else: + logger.warning("CANNOT PROCESS ITEM!") + return (i for i in []) + + def _try_call(self, call, item): + """ + Try to invoke a given call with item as a parameter. + + :param call: the function to call + :type call: callable + :param item: the payload to pass to the call as argument + :type item: object + """ + if call is None: + return + try: + call(item) + except u1db_errors.RevisionConflict as exc: + logger.exception("Error: %r" % (exc,)) + raise exc + + def _get_calls_for_msg_parts(self, msg_wrapper): + """ + Generator that return the proper call type for a given item. + + :param msg_wrapper: A MessageWrapper + :type msg_wrapper: IMessageContainer + :return: a generator of tuples with recent-flags doc payload + and callable + :rtype: generator + """ + call = None + + if msg_wrapper.new: + call = self._soledad.create_doc + + # item is expected to be a MessagePartDoc + for item in msg_wrapper.walk(): + if item.part == MessagePartType.fdoc: + yield dict(item.content), call + + elif item.part == MessagePartType.hdoc: + if not self._header_does_exist(item.content): + yield dict(item.content), call + + elif item.part == MessagePartType.cdoc: + if not self._content_does_exist(item.content): + yield dict(item.content), call + + # For now, the only thing that will be dirty is + # the flags doc. + + elif msg_wrapper.dirty: + call = self._soledad.put_doc + # item is expected to be a MessagePartDoc + for item in msg_wrapper.walk(): + # XXX FIXME Give error if dirty and not doc_id !!! + doc_id = item.doc_id # defend! + if not doc_id: + continue + doc = self._soledad.get_doc(doc_id) + doc.content = dict(item.content) + if item.part == MessagePartType.fdoc: + logger.debug("PUT dirty fdoc") + yield doc, call + + # XXX also for linkage-doc !!! + else: + logger.error("Cannot delete documents yet from the queue...!") + + def _get_calls_for_rflags_doc(self, rflags_wrapper): + """ + We always put these documents. + + :param rflags_wrapper: A wrapper around recent flags doc. 
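The chain((doc_wrapper,), ...) construction used by _process above is what lets _consume_doc pull the wrapper back out of the very same iterator before looping over the (item, call) pairs. A tiny standalone illustration of that priming pattern (the part names and the fake call are invented):

from itertools import chain


def calls_for_parts(wrapper, call):
    for part in wrapper["parts"]:
        yield part, call


def process(wrapper, call):
    # Same shape as SoledadStore._process: the wrapper itself rides
    # first, followed by (item, call) tuples for each sub-document.
    return chain((wrapper,), calls_for_parts(wrapper, call))


def fake_create_doc(item):
    print "create_doc", item


wrapper = {"parts": ["fdoc", "hdoc", "cdoc"]}
items = process(wrapper, fake_create_doc)

# Prime the iterator: the first element is the wrapper itself.
assert items.next() is wrapper

# What remains are the (item, call) pairs to hand to Soledad.
for item, call in items:
    call(item)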
+ :type rflags_wrapper: RecentFlagsWrapper + :return: a tuple with recent-flags doc payload and callable + :rtype: tuple + """ + call = self._soledad.put_doc + rdoc = self._soledad.get_doc(rflags_wrapper.doc_id) + + payload = rflags_wrapper.content + logger.debug("Saving RFLAGS to Soledad...") + + if payload: + rdoc.content = payload + yield rdoc, call + + def _get_mbox_document(self, mbox): + """ + Return mailbox document. + + :param mbox: the mailbox + :type mbox: str or unicode + :return: A SoledadDocument containing this mailbox, or None if + the query failed. + :rtype: SoledadDocument or None. + """ + try: + query = self._soledad.get_from_index( + fields.TYPE_MBOX_IDX, + fields.TYPE_MBOX_VAL, mbox) + if query: + return query.pop() + except Exception as exc: + logger.exception("Unhandled error %r" % exc) + + def get_flags_doc(self, mbox, uid): + """ + Return the SoledadDocument for the given mbox and uid. + + :param mbox: the mailbox + :type mbox: str or unicode + :param uid: the UID for the message + :type uid: int + """ + result = None + try: + flag_docs = self._soledad.get_from_index( + fields.TYPE_MBOX_UID_IDX, + fields.TYPE_FLAGS_VAL, mbox, str(uid)) + result = first(flag_docs) + except Exception as exc: + # ugh! Something's broken down there! + logger.warning("ERROR while getting flags for UID: %s" % uid) + logger.exception(exc) + finally: + return result + + def write_last_uid(self, mbox, value): + """ + Write the `last_uid` integer to the proper mailbox document + in Soledad. + This is called from the deferred triggered by + memorystore.increment_last_soledad_uid, which is expected to + run in a separate thread. + + :param mbox: the mailbox + :type mbox: str or unicode + :param value: the value to set + :type value: int + """ + leap_assert_type(value, int) + key = fields.LAST_UID_KEY + + with self._last_uid_lock: + mbox_doc = self._get_mbox_document(mbox) + old_val = mbox_doc.content[key] + if value < old_val: + logger.error("%r:%s Tried to write a UID lesser than what's " + "stored!" % (mbox, value)) + mbox_doc.content[key] = value + self._soledad.put_doc(mbox_doc) + + # deleted messages + + def deleted_iter(self, mbox): + """ + Get an iterator for the SoledadDocuments for messages + with \\Deleted flag for a given mailbox. + + :param mbox: the mailbox + :type mbox: str or unicode + :return: iterator through deleted message docs + :rtype: iterable + """ + return (doc for doc in self._soledad.get_from_index( + fields.TYPE_MBOX_DEL_IDX, + fields.TYPE_FLAGS_VAL, mbox, '1')) + + # TODO can deferToThread this? + def remove_all_deleted(self, mbox): + """ + Remove from Soledad all messages flagged as deleted for a given + mailbox. 
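write_last_uid above serializes a read-modify-write on the mailbox document behind a lock so concurrent increments cannot interleave. The same guard can be sketched without Soledad; note that this sketch refuses to write a stale lower value, whereas the method above logs the anomaly but still writes (the in-memory dict plays the role of the mailbox document):

import threading


class LastUidWriter(object):
    # Keeps a per-mailbox last_uid counter that only moves forward.

    def __init__(self):
        self._docs = {"INBOX": {"last_uid": 0}}
        self._lock = threading.Lock()

    def write_last_uid(self, mbox, value):
        with self._lock:
            doc = self._docs[mbox]
            if value < doc["last_uid"]:
                # Refuse to move the counter backwards.
                return doc["last_uid"]
            doc["last_uid"] = value
            return value


writer = LastUidWriter()
assert writer.write_last_uid("INBOX", 10) == 10
assert writer.write_last_uid("INBOX", 5) == 10    # stale value ignored
assert writer.write_last_uid("INBOX", 12) == 12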
+ + :param mbox: the mailbox + :type mbox: str or unicode + """ + deleted = [] + for doc in self.deleted_iter(mbox): + deleted.append(doc.content[fields.UID_KEY]) + self._soledad.delete_doc(doc) + return deleted diff --git a/src/leap/mail/imap/tests/leap_tests_imap.zsh b/src/leap/mail/imap/tests/leap_tests_imap.zsh index 676d1a8..544faca 100755 --- a/src/leap/mail/imap/tests/leap_tests_imap.zsh +++ b/src/leap/mail/imap/tests/leap_tests_imap.zsh @@ -76,7 +76,7 @@ imaptest_cmd() { } stress_imap() { - mknod imap_pipe p + mkfifo imap_pipe cat imap_pipe | tee output & imaptest_cmd >> imap_pipe } @@ -99,7 +99,7 @@ print_results() { echo "----------------------" echo "\tavg\tstdev" $GREP "avg" ./output | sed -e 's/^ *//g' -e 's/ *$//g' | \ - awk ' + gawk ' function avg(data, count) { sum=0; for( x=0; x <= count-1; x++) { diff --git a/src/leap/mail/imap/tests/walktree.py b/src/leap/mail/imap/tests/walktree.py index 1626f65..f3cbcb0 100644 --- a/src/leap/mail/imap/tests/walktree.py +++ b/src/leap/mail/imap/tests/walktree.py @@ -18,12 +18,14 @@ Tests for the walktree module. """ import os +import sys from email import parser from leap.mail import walk as W DEBUG = os.environ.get("BITMASK_MAIL_DEBUG") + p = parser.Parser() # TODO pass an argument of the type of message @@ -31,9 +33,17 @@ p = parser.Parser() ################################################## # Input from hell -#msg = p.parse(open('rfc822.multi-signed.message')) -#msg = p.parse(open('rfc822.plain.message')) -msg = p.parse(open('rfc822.multi-minimal.message')) +if len(sys.argv) > 1: + FILENAME = sys.argv[1] +else: + FILENAME = "rfc822.multi-minimal.message" + +""" +FILENAME = "rfc822.multi-signed.message" +FILENAME = "rfc822.plain.message" +""" + +msg = p.parse(open(FILENAME)) DO_CHECK = False ################################################# |
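Back in soledadstore.py, remove_all_deleted collects the UIDs while it deletes so the caller can tell which messages were actually expunged. A reduced sketch of that collect-while-deleting loop over an in-memory list (the document shape is simplified to a dict, and deleted_iter here returns a materialized list, much like the result of a Soledad index query):

class FakeStore(object):
    # Minimal stand-in: documents are dicts with 'uid' and 'deleted'.

    def __init__(self, docs):
        self._docs = list(docs)

    def deleted_iter(self):
        # Materialized up front, so deleting below does not disturb it.
        return [doc for doc in self._docs if doc["deleted"]]

    def delete_doc(self, doc):
        self._docs.remove(doc)

    def remove_all_deleted(self):
        removed = []
        for doc in self.deleted_iter():
            removed.append(doc["uid"])
            self.delete_doc(doc)
        return removed


store = FakeStore([
    {"uid": 1, "deleted": False},
    {"uid": 2, "deleted": True},
    {"uid": 3, "deleted": True},
])
assert store.remove_all_deleted() == [2, 3]
assert [doc["uid"] for doc in store._docs] == [1]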