| author | Ivan Alejandro <ivanalejandro0@gmail.com> | 2014-02-04 16:20:54 -0300 |
|---|---|---|
| committer | Ivan Alejandro <ivanalejandro0@gmail.com> | 2014-02-04 16:20:54 -0300 |
| commit | a6fa032552d119eafbd7a9338a3a90b5b31b07ac (patch) | |
| tree | 37c156921ca56fa8836c05cc2660f49db3d9db10 | |
| parent | a25a93d99d9c10b49279ec5b3568639db8ead67a (diff) | |
| parent | 8d9b198a2aca9e3c616e0a8a1583767ac6ae8cff (diff) | |
Merge remote-tracking branch 'kali/feature/in-memory-store' into develop
21 files changed, 3269 insertions, 1032 deletions
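The centerpiece of this merge is the new `MemoryStore` (added below in `mail/src/leap/mail/imap/memorystore.py`), which acts as a write-buffer and read-cache: message documents are held in memory, keyed by mailbox and UID, and a Twisted `LoopingCall` periodically dumps new and dirty entries to the permanent Soledad store. As orientation before the full diff, here is a minimal, hypothetical sketch of that write-back pattern; it only mirrors the shape of the real class, whose actual API (with `MessageWrapper`, observer deferreds and a `MessageProducer` queue) appears in the diff itself.

```python
# Illustrative sketch only -- the names below (TinyMemoryStore, the
# permanent_store's get_message/put_message interface) are assumptions
# made for this example, not the LEAP API.
from twisted.internet.task import LoopingCall

WRITE_PERIOD = 10  # seconds; mirrors SOLEDAD_WRITE_PERIOD in the diff


class TinyMemoryStore(object):
    """Buffer message writes in memory and flush them on a timer."""

    def __init__(self, permanent_store, write_period=WRITE_PERIOD):
        self._permanent_store = permanent_store
        self._msg_store = {}   # (mbox, uid) -> message document(s)
        self._new = set()      # keys never written to disk yet
        self._dirty = set()    # keys modified since the last flush
        self._write_loop = LoopingCall(self.write_messages)
        self._write_loop.start(write_period, now=False)

    def create_message(self, mbox, uid, message):
        key = (mbox, uid)
        self._msg_store[key] = message
        self._new.add(key)

    def put_message(self, mbox, uid, message):
        key = (mbox, uid)
        self._msg_store[key] = message
        if key not in self._new:
            self._dirty.add(key)

    def get_message(self, mbox, uid):
        # read-cache: serve from memory first, fall back to permanent storage
        cached = self._msg_store.get((mbox, uid))
        if cached is not None:
            return cached
        return self._permanent_store.get_message(mbox, uid)

    def write_messages(self):
        # write-buffer: push every pending entry to the permanent store
        for key in sorted(self._new | self._dirty):
            mbox, uid = key
            self._permanent_store.put_message(mbox, uid, self._msg_store[key])
        self._new.clear()
        self._dirty.clear()
```

In the real `MemoryStore` the flush additionally goes through a `MessageProducer` queue and fires per-message observer deferreds once Soledad has persisted each document, which is how `notify_on_disk` is honored.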
diff --git a/mail/changes/feature_in-memory-store b/mail/changes/feature_in-memory-store new file mode 100644 index 0000000..a7a4d7a --- /dev/null +++ b/mail/changes/feature_in-memory-store @@ -0,0 +1 @@ + o Use a memory store as write-buffer and read-cache. diff --git a/mail/pkg/requirements.pip b/mail/pkg/requirements.pip index dc0635c..603eaf6 100644 --- a/mail/pkg/requirements.pip +++ b/mail/pkg/requirements.pip @@ -4,3 +4,4 @@ leap.common>=0.3.5 leap.keymanager>=0.3.7 twisted # >= 12.0.3 ?? zope.proxy +enum diff --git a/mail/src/leap/mail/decorators.py b/mail/src/leap/mail/decorators.py index d5eac97..ae115f8 100644 --- a/mail/src/leap/mail/decorators.py +++ b/mail/src/leap/mail/decorators.py @@ -32,7 +32,7 @@ logger = logging.getLogger(__name__) # See this answer: http://stackoverflow.com/a/19019648/1157664 # And the notes by glyph and jpcalderone -def deferred(f): +def deferred_to_thread(f): """ Decorator, for deferring methods to Threads. diff --git a/mail/src/leap/mail/imap/account.py b/mail/src/leap/mail/imap/account.py index ce83079..f985c04 100644 --- a/mail/src/leap/mail/imap/account.py +++ b/mail/src/leap/mail/imap/account.py @@ -48,7 +48,7 @@ class SoledadBackedAccount(WithMsgFields, IndexedDB, MBoxParser): selected = None closed = False - def __init__(self, account_name, soledad=None): + def __init__(self, account_name, soledad, memstore=None): """ Creates a SoledadAccountIndex that keeps track of the mailboxes and subscriptions handled by this account. @@ -57,7 +57,9 @@ class SoledadBackedAccount(WithMsgFields, IndexedDB, MBoxParser): :type acct_name: str :param soledad: a Soledad instance. - :param soledad: Soledad + :type soledad: Soledad + :param memstore: a MemoryStore instance. + :type memstore: MemoryStore """ leap_assert(soledad, "Need a soledad instance to initialize") leap_assert_type(soledad, Soledad) @@ -67,6 +69,7 @@ class SoledadBackedAccount(WithMsgFields, IndexedDB, MBoxParser): self._account_name = self._parse_mailbox_name(account_name) self._soledad = soledad + self._memstore = memstore self.initialize_db() @@ -131,7 +134,8 @@ class SoledadBackedAccount(WithMsgFields, IndexedDB, MBoxParser): if name not in self.mailboxes: raise imap4.MailboxException("No such mailbox: %r" % name) - return SoledadMailbox(name, soledad=self._soledad) + return SoledadMailbox(name, self._soledad, + memstore=self._memstore) ## ## IAccount @@ -221,8 +225,7 @@ class SoledadBackedAccount(WithMsgFields, IndexedDB, MBoxParser): self.selected = name return SoledadMailbox( - name, rw=readwrite, - soledad=self._soledad) + name, self._soledad, self._memstore, readwrite) def delete(self, name, force=False): """ diff --git a/mail/src/leap/mail/imap/fetch.py b/mail/src/leap/mail/imap/fetch.py index 817ad6a..40dadb3 100644 --- a/mail/src/leap/mail/imap/fetch.py +++ b/mail/src/leap/mail/imap/fetch.py @@ -45,7 +45,7 @@ from leap.common.events.events_pb2 import IMAP_UNREAD_MAIL from leap.common.mail import get_email_charset from leap.keymanager import errors as keymanager_errors from leap.keymanager.openpgp import OpenPGPKey -from leap.mail.decorators import deferred +from leap.mail.decorators import deferred_to_thread from leap.mail.utils import json_loads from leap.soledad.client import Soledad from leap.soledad.common.crypto import ENC_SCHEME_KEY, ENC_JSON_KEY @@ -199,7 +199,7 @@ class LeapIncomingMail(object): logger.exception(failure.value) traceback.print_tb(*sys.exc_info()) - @deferred + @deferred_to_thread def _sync_soledad(self): """ Synchronizes with remote soledad. 
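Note the rename above: the `deferred` decorator becomes `deferred_to_thread`, which better describes what it does, namely run the wrapped blocking method in the Twisted thread pool and return a `Deferred`. A hedged sketch of that pattern (an illustration of the idea, not a copy of `leap/mail/decorators.py`):

```python
from functools import wraps

from twisted.internet.threads import deferToThread


def deferred_to_thread(f):
    """Run the decorated blocking callable in a thread, returning a Deferred."""
    @wraps(f)
    def wrapper(*args, **kwargs):
        return deferToThread(f, *args, **kwargs)
    return wrapper
```

Callers such as `_sync_soledad` above then receive a `Deferred` that fires with the method's result once the thread-pool call completes.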
diff --git a/mail/src/leap/mail/imap/interfaces.py b/mail/src/leap/mail/imap/interfaces.py new file mode 100644 index 0000000..c906278 --- /dev/null +++ b/mail/src/leap/mail/imap/interfaces.py @@ -0,0 +1,94 @@ +# -*- coding: utf-8 -*- +# interfaces.py +# Copyright (C) 2014 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +""" +Interfaces for the IMAP module. +""" +from zope.interface import Interface, Attribute + + +class IMessageContainer(Interface): + """ + I am a container around the different documents that a message + is split into. + """ + fdoc = Attribute('The flags document for this message, if any.') + hdoc = Attribute('The headers document for this message, if any.') + cdocs = Attribute('The dict of content documents for this message, ' + 'if any.') + + def walk(self): + """ + Return an iterator to the docs for all the parts. + + :rtype: iterator + """ + + +class IMessageStore(Interface): + """ + I represent a generic storage for LEAP Messages. + """ + + def create_message(self, mbox, uid, message): + """ + Put the passed message into this IMessageStore. + + :param mbox: the mbox this message belongs. + :param uid: the UID that identifies this message in this mailbox. + :param message: a IMessageContainer implementor. + """ + + def put_message(self, mbox, uid, message): + """ + Put the passed message into this IMessageStore. + + :param mbox: the mbox this message belongs. + :param uid: the UID that identifies this message in this mailbox. + :param message: a IMessageContainer implementor. + """ + + def remove_message(self, mbox, uid): + """ + Remove the given message from this IMessageStore. + + :param mbox: the mbox this message belongs. + :param uid: the UID that identifies this message in this mailbox. + """ + + def get_message(self, mbox, uid): + """ + Get a IMessageContainer for the given mbox and uid combination. + + :param mbox: the mbox this message belongs. + :param uid: the UID that identifies this message in this mailbox. + :return: IMessageContainer + """ + + +class IMessageStoreWriter(Interface): + """ + I represent a storage that is able to write its contents to another + different IMessageStore. + """ + + def write_messages(self, store): + """ + Write the documents in this IMessageStore to a different + storage. Usually this will be done from a MemoryStorage to a DbStorage. + + :param store: another IMessageStore implementor. 
+ """ diff --git a/mail/src/leap/mail/imap/mailbox.py b/mail/src/leap/mail/imap/mailbox.py index 0131ce0..c682578 100644 --- a/mail/src/leap/mail/imap/mailbox.py +++ b/mail/src/leap/mail/imap/mailbox.py @@ -22,6 +22,7 @@ import threading import logging import StringIO import cStringIO +import os from collections import defaultdict @@ -35,13 +36,21 @@ from zope.interface import implements from leap.common import events as leap_events from leap.common.events.events_pb2 import IMAP_UNREAD_MAIL from leap.common.check import leap_assert, leap_assert_type -from leap.mail.decorators import deferred +from leap.mail.decorators import deferred_to_thread +from leap.mail.utils import empty from leap.mail.imap.fields import WithMsgFields, fields from leap.mail.imap.messages import MessageCollection +from leap.mail.imap.messageparts import MessageWrapper from leap.mail.imap.parser import MBoxParser logger = logging.getLogger(__name__) +""" +If the environment variable `LEAP_SKIPNOTIFY` is set, we avoid +notifying clients of new messages. Use during stress tests. +""" +NOTIFY_NEW = not os.environ.get('LEAP_SKIPNOTIFY', False) + class SoledadMailbox(WithMsgFields, MBoxParser): """ @@ -76,11 +85,12 @@ class SoledadMailbox(WithMsgFields, MBoxParser): CMD_UIDVALIDITY = "UIDVALIDITY" CMD_UNSEEN = "UNSEEN" + # FIXME we should turn this into a datastructure with limited capacity _listeners = defaultdict(set) next_uid_lock = threading.Lock() - def __init__(self, mbox, soledad=None, rw=1): + def __init__(self, mbox, soledad, memstore, rw=1): """ SoledadMailbox constructor. Needs to get passed a name, plus a Soledad instance. @@ -91,7 +101,10 @@ class SoledadMailbox(WithMsgFields, MBoxParser): :param soledad: a Soledad instance. :type soledad: Soledad - :param rw: read-and-write flags + :param memstore: a MemoryStore instance + :type memstore: MemoryStore + + :param rw: read-and-write flag for this mailbox :type rw: int """ leap_assert(mbox, "Need a mailbox name to initialize") @@ -105,13 +118,18 @@ class SoledadMailbox(WithMsgFields, MBoxParser): self.rw = rw self._soledad = soledad + self._memstore = memstore self.messages = MessageCollection( - mbox=mbox, soledad=self._soledad) + mbox=mbox, soledad=self._soledad, memstore=self._memstore) if not self.getFlags(): self.setFlags(self.INIT_FLAGS) + if self._memstore: + self.prime_known_uids_to_memstore() + self.prime_last_uid_to_memstore() + @property def listeners(self): """ @@ -125,6 +143,9 @@ class SoledadMailbox(WithMsgFields, MBoxParser): """ return self._listeners[self.mbox] + # TODO this grows too crazily when many instances are fired, like + # during imaptest stress testing. Should have a queue of limited size + # instead. def addListener(self, listener): """ Add a listener to the listeners queue. @@ -134,6 +155,9 @@ class SoledadMailbox(WithMsgFields, MBoxParser): :param listener: listener to add :type listener: an object that implements IMailboxListener """ + if not NOTIFY_NEW: + return + logger.debug('adding mailbox listener: %s' % listener) self.listeners.add(listener) @@ -146,6 +170,7 @@ class SoledadMailbox(WithMsgFields, MBoxParser): """ self.listeners.remove(listener) + # TODO move completely to soledadstore, under memstore reponsibility. def _get_mbox(self): """ Return mailbox document. @@ -221,48 +246,38 @@ class SoledadMailbox(WithMsgFields, MBoxParser): def _get_last_uid(self): """ Return the last uid for this mailbox. 
+ If we have a memory store, the last UID will be the highest + recorded UID in the message store, or a counter cached from + the mailbox document in soledad if this is higher. :return: the last uid for messages in this mailbox - :rtype: bool + :rtype: int """ - mbox = self._get_mbox() - if not mbox: - logger.error("We could not get a mbox!") - # XXX It looks like it has been corrupted. - # We need to be able to survive this. - return None - return mbox.content.get(self.LAST_UID_KEY, 1) + last = self._memstore.get_last_uid(self.mbox) + logger.debug("last uid for %s: %s (from memstore)" % ( + repr(self.mbox), last)) + return last - def _set_last_uid(self, uid): - """ - Sets the last uid for this mailbox. + last_uid = property( + _get_last_uid, doc="Last_UID attribute.") - :param uid: the uid to be set - :type uid: int + def prime_last_uid_to_memstore(self): """ - leap_assert(isinstance(uid, int), "uid has to be int") - mbox = self._get_mbox() - key = self.LAST_UID_KEY - - count = self.getMessageCount() - - # XXX safety-catch. If we do get duplicates, - # we want to avoid further duplication. - - if uid >= count: - value = uid - else: - # something is wrong, - # just set the last uid - # beyond the max msg count. - logger.debug("WRONG uid < count. Setting last uid to %s", count) - value = count + Prime memstore with last_uid value + """ + set_exist = set(self.messages.all_uid_iter()) + last = max(set_exist) if set_exist else 0 + logger.info("Priming Soledad last_uid to %s" % (last,)) + self._memstore.set_last_soledad_uid(self.mbox, last) - mbox.content[key] = value - self._soledad.put_doc(mbox) + def prime_known_uids_to_memstore(self): + """ + Prime memstore with the set of all known uids. - last_uid = property( - _get_last_uid, _set_last_uid, doc="Last_UID attribute.") + We do this to be able to filter the requests efficiently. + """ + known_uids = self.messages.all_soledad_uid_iter() + self._memstore.set_known_uids(self.mbox, known_uids) def getUIDValidity(self): """ @@ -304,8 +319,15 @@ class SoledadMailbox(WithMsgFields, MBoxParser): :rtype: int """ with self.next_uid_lock: - self.last_uid += 1 - return self.last_uid + if self._memstore: + return self.last_uid + 1 + else: + # XXX after lock, it should be safe to + # return just the increment here, and + # have a different method that actually increments + # the counter when really adding. 
+ self.last_uid += 1 + return self.last_uid def getMessageCount(self): """ @@ -366,7 +388,7 @@ class SoledadMailbox(WithMsgFields, MBoxParser): if self.CMD_UIDNEXT in names: r[self.CMD_UIDNEXT] = self.last_uid + 1 if self.CMD_UIDVALIDITY in names: - r[self.CMD_UIDVALIDITY] = self.getUID() + r[self.CMD_UIDVALIDITY] = self.getUIDValidity() if self.CMD_UNSEEN in names: r[self.CMD_UNSEEN] = self.getUnseenCount() return defer.succeed(r) @@ -386,26 +408,26 @@ class SoledadMailbox(WithMsgFields, MBoxParser): :return: a deferred that evals to None """ + # TODO have a look at the cases for internal date in the rfc if isinstance(message, (cStringIO.OutputType, StringIO.StringIO)): message = message.getvalue() - # XXX we should treat the message as an IMessage from here + + # XXX we could treat the message as an IMessage from here leap_assert_type(message, basestring) - uid_next = self.getUIDNext() - logger.debug('Adding msg with UID :%s' % uid_next) if flags is None: flags = tuple() else: flags = tuple(str(flag) for flag in flags) - d = self._do_add_message(message, flags=flags, date=date, uid=uid_next) + d = self._do_add_message(message, flags=flags, date=date) return d - def _do_add_message(self, message, flags, date, uid): + def _do_add_message(self, message, flags, date): """ - Calls to the messageCollection add_msg method (deferred to thread). + Calls to the messageCollection add_msg method. Invoked from addMessage. """ - d = self.messages.add_msg(message, flags=flags, date=date, uid=uid) + d = self.messages.add_msg(message, flags=flags, date=date) # XXX Removing notify temporarily. # This is interfering with imaptest results. I'm not clear if it's # because we clutter the logging or because the set of listeners is @@ -421,6 +443,8 @@ class SoledadMailbox(WithMsgFields, MBoxParser): :param args: ignored. """ + if not NOTIFY_NEW: + return exists = self.getMessageCount() recent = self.getRecentCount() logger.debug("NOTIFY: there are %s messages, %s recent" % ( @@ -445,6 +469,8 @@ class SoledadMailbox(WithMsgFields, MBoxParser): # XXX removing the mailbox in situ for now, # we should postpone the removal + + # XXX move to memory store?? self._soledad.delete_doc(self._get_mbox()) def _close_cb(self, result): @@ -467,13 +493,10 @@ class SoledadMailbox(WithMsgFields, MBoxParser): """ if not self.isWriteable(): raise imap4.ReadOnlyMailbox - d = self.messages.remove_all_deleted() - d.addCallback(self._expunge_cb) - d.addCallback(self.messages.reset_last_uid) - - # XXX DEBUG ------------------- - # FIXME !!! - # XXX should remove the hdocset too!!! + d = defer.Deferred() + return self._memstore.expunge(self.mbox, d) + self._memstore.expunge(self.mbox) + d.addCallback(self._expunge_cb, d) return d def _bound_seq(self, messages_asked): @@ -509,7 +532,8 @@ class SoledadMailbox(WithMsgFields, MBoxParser): seq_messg = set_asked.intersection(set_exist) return seq_messg - @deferred + @deferred_to_thread + #@profile def fetch(self, messages_asked, uid): """ Retrieve one or more messages in this mailbox. 
@@ -548,7 +572,7 @@ class SoledadMailbox(WithMsgFields, MBoxParser): result = ((msgid, getmsg(msgid)) for msgid in seq_messg) return result - @deferred + @deferred_to_thread def fetch_flags(self, messages_asked, uid): """ A fast method to fetch all flags, tricking just the @@ -589,10 +613,10 @@ class SoledadMailbox(WithMsgFields, MBoxParser): all_flags = self.messages.all_flags() result = ((msgid, flagsPart( - msgid, all_flags[msgid])) for msgid in seq_messg) + msgid, all_flags.get(msgid, tuple()))) for msgid in seq_messg) return result - @deferred + @deferred_to_thread def fetch_headers(self, messages_asked, uid): """ A fast method to fetch all headers, tricking just the @@ -641,14 +665,16 @@ class SoledadMailbox(WithMsgFields, MBoxParser): for msgid in seq_messg) return result - def signal_unread_to_ui(self): + def signal_unread_to_ui(self, *args, **kwargs): """ Sends unread event to ui. + + :param args: ignored + :param kwargs: ignored """ unseen = self.getUnseenCount() leap_events.signal(IMAP_UNREAD_MAIL, str(unseen)) - @deferred def store(self, messages_asked, flags, mode, uid): """ Sets the flags of one or more messages. @@ -670,49 +696,43 @@ class SoledadMailbox(WithMsgFields, MBoxParser): otherwise they are message sequence IDs. :type uid: bool - :return: A dict mapping message sequence numbers to sequences of - str representing the flags set on the message after this - operation has been performed. - :rtype: dict + :return: A deferred, that will be called with a dict mapping message + sequence numbers to sequences of str representing the flags + set on the message after this operation has been performed. + :rtype: deferred :raise ReadOnlyMailbox: Raised if this mailbox is not open for read-write. """ + from twisted.internet import reactor + if not self.isWriteable(): + log.msg('read only mailbox!') + raise imap4.ReadOnlyMailbox + + d = defer.Deferred() + deferLater(reactor, 0, self._do_store, messages_asked, flags, + mode, uid, d) + return d + + def _do_store(self, messages_asked, flags, mode, uid, observer): + """ + Helper method, invoke set_flags method in the MessageCollection. + + See the documentation for the `store` method for the parameters. + + :param observer: a deferred that will be called with the dictionary + mapping UIDs to flags after the operation has been + done. + :type observer: deferred + """ # XXX implement also sequence (uid = 0) - # XXX we should prevent cclient from setting Recent flag. + # XXX we should prevent cclient from setting Recent flag? leap_assert(not isinstance(flags, basestring), "flags cannot be a string") flags = tuple(flags) - messages_asked = self._bound_seq(messages_asked) seq_messg = self._filter_msg_seq(messages_asked) - - if not self.isWriteable(): - log.msg('read only mailbox!') - raise imap4.ReadOnlyMailbox - - result = {} - for msg_id in seq_messg: - log.msg("MSG ID = %s" % msg_id) - msg = self.messages.get_msg_by_uid(msg_id) - if not msg: - continue - if mode == 1: - msg.addFlags(flags) - elif mode == -1: - msg.removeFlags(flags) - elif mode == 0: - msg.setFlags(flags) - result[msg_id] = msg.getFlags() - - # After changing flags, we want to signal again to the - # UI because the number of unread might have changed. - # Hoever, we should probably limit this to INBOX only? - # this should really be called as a final callback of - # the do_STORE method... 
- from twisted.internet import reactor - deferLater(reactor, 1, self.signal_unread_to_ui) - return result + self.messages.set_flags(self.mbox, seq_messg, flags, mode, observer) # ISearchableMailbox @@ -760,44 +780,85 @@ class SoledadMailbox(WithMsgFields, MBoxParser): # IMessageCopier - @deferred - def copy(self, messageObject): + def copy(self, message): """ Copy the given message object into this mailbox. + + :param message: an IMessage implementor + :type message: LeapMessage + :return: a deferred that will be fired with the message + uid when the copy succeed. + :rtype: Deferred """ from twisted.internet import reactor - uid_next = self.getUIDNext() - msg = messageObject - # XXX DEBUG ---------------------------------------- - #print "copying MESSAGE from %s (%s) to %s (%s)" % ( - #msg._mbox, msg._uid, self.mbox, uid_next) + d = defer.Deferred() + # XXX this should not happen ... track it down, + # probably to FETCH... + if message is None: + log.msg("BUG: COPY found a None in passed message") + d.callback(None) + deferLater(reactor, 0, self._do_copy, message, d) + return d + + def _do_copy(self, message, observer): + """ + Call invoked from the deferLater in `copy`. This will + copy the flags and header documents, and pass them to the + `create_message` method in the MemoryStore, together with + the observer deferred that we've been passed along. + + :param message: an IMessage implementor + :type message: LeapMessage + :param observer: the deferred that will fire with the + UID of the message + :type observer: Deferred + """ + # XXX for clarity, this could be delegated to a + # MessageCollection mixin that implements copy too, and + # moved out of here. + msg = message + memstore = self._memstore # XXX should use a public api instead fdoc = msg._fdoc + hdoc = msg._hdoc if not fdoc: - logger.debug("Tried to copy a MSG with no fdoc") + logger.warning("Tried to copy a MSG with no fdoc") return - new_fdoc = copy.deepcopy(fdoc.content) - new_fdoc[self.UID_KEY] = uid_next - new_fdoc[self.MBOX_KEY] = self.mbox - self._do_add_doc(new_fdoc) - # XXX should use a public api instead - hdoc = msg._hdoc - self.messages.add_hdocset_docid(hdoc.doc_id) + fdoc_chash = new_fdoc[fields.CONTENT_HASH_KEY] - deferLater(reactor, 1, self.notify_new) + # XXX is this hitting the db??? --- probably. + # We should profile after the pre-fetch. + dest_fdoc = memstore.get_fdoc_from_chash( + fdoc_chash, self.mbox) + exist = dest_fdoc and not empty(dest_fdoc.content) - def _do_add_doc(self, doc): - """ - Defer the adding of a new doc. + if exist: + # Should we signal error on the callback? + logger.warning("Destination message already exists!") - :param doc: document to be created in soledad. - :type doc: dict - """ - self._soledad.create_doc(doc) + # XXX I'm still not clear if we should raise the + # errback. This actually rases an ugly warning + # in some muas like thunderbird. I guess the user does + # not deserve that. + observer.callback(True) + else: + mbox = self.mbox + uid_next = memstore.increment_last_soledad_uid(mbox) + new_fdoc[self.UID_KEY] = uid_next + new_fdoc[self.MBOX_KEY] = mbox + + # FIXME set recent! 
+ + self._memstore.create_message( + self.mbox, uid_next, + MessageWrapper( + new_fdoc, hdoc.content), + observer=observer, + notify_on_disk=False) # convenience fun diff --git a/mail/src/leap/mail/imap/memorystore.py b/mail/src/leap/mail/imap/memorystore.py new file mode 100644 index 0000000..195cef7 --- /dev/null +++ b/mail/src/leap/mail/imap/memorystore.py @@ -0,0 +1,961 @@ +# -*- coding: utf-8 -*- +# memorystore.py +# Copyright (C) 2014 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +""" +In-memory transient store for a LEAPIMAPServer. +""" +import contextlib +import logging +import threading +import weakref + +from collections import defaultdict +from copy import copy + +from twisted.internet import defer +from twisted.internet.task import LoopingCall +from twisted.python import log +from zope.interface import implements + +from leap.common.check import leap_assert_type +from leap.mail import size +from leap.mail.decorators import deferred_to_thread +from leap.mail.utils import empty +from leap.mail.messageflow import MessageProducer +from leap.mail.imap import interfaces +from leap.mail.imap.fields import fields +from leap.mail.imap.messageparts import MessagePartType, MessagePartDoc +from leap.mail.imap.messageparts import RecentFlagsDoc +from leap.mail.imap.messageparts import MessageWrapper +from leap.mail.imap.messageparts import ReferenciableDict + +logger = logging.getLogger(__name__) + + +# The default period to do writebacks to the permanent +# soledad storage, in seconds. +SOLEDAD_WRITE_PERIOD = 10 + + +@contextlib.contextmanager +def set_bool_flag(obj, att): + """ + Set a boolean flag to True while we're doing our thing. + Just to let the world know. + """ + setattr(obj, att, True) + try: + yield True + except RuntimeError as exc: + logger.exception(exc) + finally: + setattr(obj, att, False) + + +class MemoryStore(object): + """ + An in-memory store to where we can write the different parts that + we split the messages into and buffer them until we write them to the + permanent storage. + + It uses MessageWrapper instances to represent the message-parts, which are + indexed by mailbox name and UID. + + It also can be passed a permanent storage as a paremeter (any implementor + of IMessageStore, in this case a SoledadStore). In this case, a periodic + dump of the messages stored in memory will be done. The period of the + writes to the permanent storage is controled by the write_period parameter + in the constructor. + """ + implements(interfaces.IMessageStore, + interfaces.IMessageStoreWriter) + + # TODO We will want to index by chash when we transition to local-only + # UIDs. + + WRITING_FLAG = "_writing" + _last_uid_lock = threading.Lock() + + def __init__(self, permanent_store=None, + write_period=SOLEDAD_WRITE_PERIOD): + """ + Initialize a MemoryStore. + + :param permanent_store: a IMessageStore implementor to dump + messages to. 
+ :type permanent_store: IMessageStore + :param write_period: the interval to dump messages to disk, in seconds. + :type write_period: int + """ + self._permanent_store = permanent_store + self._write_period = write_period + + # Internal Storage: messages + self._msg_store = {} + + # Internal Storage: payload-hash + """ + {'phash': weakreaf.proxy(dict)} + """ + self._phash_store = {} + + # Internal Storage: content-hash:fdoc + """ + chash-fdoc-store keeps references to + the flag-documents indexed by content-hash. + + {'chash': {'mbox-a': weakref.proxy(dict), + 'mbox-b': weakref.proxy(dict)} + } + """ + self._chash_fdoc_store = {} + + # Internal Storage: recent-flags store + """ + recent-flags store keeps one dict per mailbox, + with the document-id of the u1db document + and the set of the UIDs that have the recent flag. + + {'mbox-a': {'doc_id': 'deadbeef', + 'set': {1,2,3,4} + } + } + """ + # TODO this will have to transition to content-hash + # indexes after we move to local-only UIDs. + + self._rflags_store = defaultdict( + lambda: {'doc_id': None, 'set': set([])}) + + """ + last-uid store keeps the count of the highest UID + per mailbox. + + {'mbox-a': 42, + 'mbox-b': 23} + """ + self._last_uid = {} + + """ + known-uids keeps a count of the uids that soledad knows for a given + mailbox + + {'mbox-a': set([1,2,3])} + """ + self._known_uids = defaultdict(set) + + # New and dirty flags, to set MessageWrapper State. + self._new = set([]) + self._new_deferreds = {} + self._dirty = set([]) + self._rflags_dirty = set([]) + self._dirty_deferreds = {} + + # Flag for signaling we're busy writing to the disk storage. + setattr(self, self.WRITING_FLAG, False) + + if self._permanent_store is not None: + # this producer spits its messages to the permanent store + # consumer using a queue. We will use that to put + # our messages to be written. + self.producer = MessageProducer(permanent_store, + period=0.1) + # looping call for dumping to SoledadStore + self._write_loop = LoopingCall(self.write_messages, + permanent_store) + + # We can start the write loop right now, why wait? + self._start_write_loop() + + def _start_write_loop(self): + """ + Start loop for writing to disk database. + """ + if not self._write_loop.running: + self._write_loop.start(self._write_period, now=True) + + def _stop_write_loop(self): + """ + Stop loop for writing to disk database. + """ + if self._write_loop.running: + self._write_loop.stop() + + # IMessageStore + + # XXX this would work well for whole message operations. + # We would have to add a put_flags operation to modify only + # the flags doc (and set the dirty flag accordingly) + + def create_message(self, mbox, uid, message, observer, + notify_on_disk=True): + """ + Create the passed message into this MemoryStore. + + By default we consider that any message is a new message. + + :param mbox: the mailbox + :type mbox: str or unicode + :param uid: the UID for the message + :type uid: int + :param message: a message to be added + :type message: MessageWrapper + :param observer: the deferred that will fire with the + UID of the message. If notify_on_disk is True, + this will happen when the message is written to + Soledad. Otherwise it will fire as soon as we've + added the message to the memory store. + :type observer: Deferred + :param notify_on_disk: whether the `observer` deferred should + wait until the message is written to disk to + be fired. 
+ :type notify_on_disk: bool + """ + log.msg("adding new doc to memstore %r (%r)" % (mbox, uid)) + key = mbox, uid + + self._add_message(mbox, uid, message, notify_on_disk) + self._new.add(key) + + def log_add(result): + log.msg("message save: %s" % result) + return result + observer.addCallback(log_add) + + if notify_on_disk: + # We store this deferred so we can keep track of the pending + # operations internally. + # TODO this should fire with the UID !!! -- change that in + # the soledad store code. + self._new_deferreds[key] = observer + if not notify_on_disk: + # Caller does not care, just fired and forgot, so we pass + # a defer that will inmediately have its callback triggered. + observer.callback(uid) + + def put_message(self, mbox, uid, message, notify_on_disk=True): + """ + Put an existing message. + + This will set the dirty flag on the MemoryStore. + + :param mbox: the mailbox + :type mbox: str or unicode + :param uid: the UID for the message + :type uid: int + :param message: a message to be added + :type message: MessageWrapper + :param notify_on_disk: whether the deferred that is returned should + wait until the message is written to disk to + be fired. + :type notify_on_disk: bool + + :return: a Deferred. if notify_on_disk is True, will be fired + when written to the db on disk. + Otherwise will fire inmediately + :rtype: Deferred + """ + key = mbox, uid + d = defer.Deferred() + d.addCallback(lambda result: log.msg("message PUT save: %s" % result)) + + self._dirty.add(key) + self._dirty_deferreds[key] = d + self._add_message(mbox, uid, message, notify_on_disk) + return d + + def _add_message(self, mbox, uid, message, notify_on_disk=True): + """ + Helper method, called by both create_message and put_message. + See those for parameter documentation. + """ + # XXX have to differentiate between notify_new and notify_dirty + # TODO defaultdict the hell outa here... 
+ + key = mbox, uid + msg_dict = message.as_dict() + + FDOC = MessagePartType.fdoc.key + HDOC = MessagePartType.hdoc.key + CDOCS = MessagePartType.cdocs.key + DOCS_ID = MessagePartType.docs_id.key + + try: + store = self._msg_store[key] + except KeyError: + self._msg_store[key] = {FDOC: {}, + HDOC: {}, + CDOCS: {}, + DOCS_ID: {}} + store = self._msg_store[key] + + fdoc = msg_dict.get(FDOC, None) + if fdoc: + if not store.get(FDOC, None): + store[FDOC] = ReferenciableDict({}) + store[FDOC].update(fdoc) + + # content-hash indexing + chash = fdoc.get(fields.CONTENT_HASH_KEY) + chash_fdoc_store = self._chash_fdoc_store + if not chash in chash_fdoc_store: + chash_fdoc_store[chash] = {} + + chash_fdoc_store[chash][mbox] = weakref.proxy( + store[FDOC]) + + hdoc = msg_dict.get(HDOC, None) + if hdoc is not None: + if not store.get(HDOC, None): + store[HDOC] = ReferenciableDict({}) + store[HDOC].update(hdoc) + + docs_id = msg_dict.get(DOCS_ID, None) + if docs_id: + if not store.get(DOCS_ID, None): + store[DOCS_ID] = {} + store[DOCS_ID].update(docs_id) + + cdocs = message.cdocs + for cdoc_key in cdocs.keys(): + if not store.get(CDOCS, None): + store[CDOCS] = {} + + cdoc = cdocs[cdoc_key] + # first we make it weak-referenciable + referenciable_cdoc = ReferenciableDict(cdoc) + store[CDOCS][cdoc_key] = referenciable_cdoc + phash = cdoc.get(fields.PAYLOAD_HASH_KEY, None) + if not phash: + continue + self._phash_store[phash] = weakref.proxy(referenciable_cdoc) + + def prune(seq, store): + for key in seq: + if key in store and empty(store.get(key)): + store.pop(key) + prune((FDOC, HDOC, CDOCS, DOCS_ID), store) + + def get_docid_for_fdoc(self, mbox, uid): + """ + Return Soledad document id for the flags-doc for a given mbox and uid, + or None of no flags document could be found. + + :param mbox: the mailbox + :type mbox: str or unicode + :param uid: the message UID + :type uid: int + :rtype: unicode or None + """ + fdoc = self._permanent_store.get_flags_doc(mbox, uid) + if empty(fdoc): + return None + doc_id = fdoc.doc_id + return doc_id + + def get_message(self, mbox, uid, flags_only=False): + """ + Get a MessageWrapper for the given mbox and uid combination. + + :param mbox: the mailbox + :type mbox: str or unicode + :param uid: the message UID + :type uid: int + :param flags_only: whether the message should carry only a reference + to the flags document. + :type flags_only: bool + + :return: MessageWrapper or None + """ + key = mbox, uid + FDOC = MessagePartType.fdoc.key + + msg_dict = self._msg_store.get(key, None) + if empty(msg_dict): + return None + new, dirty = self._get_new_dirty_state(key) + if flags_only: + return MessageWrapper(fdoc=msg_dict[FDOC], + new=new, dirty=dirty, + memstore=weakref.proxy(self)) + else: + return MessageWrapper(from_dict=msg_dict, + new=new, dirty=dirty, + memstore=weakref.proxy(self)) + + def remove_message(self, mbox, uid): + """ + Remove a Message from this MemoryStore. + + :param mbox: the mailbox + :type mbox: str or unicode + :param uid: the message UID + :type uid: int + """ + # XXX For the moment we are only removing the flags and headers + # docs. The rest we leave there polluting your hard disk, + # until we think about a good way of deorphaning. + + # XXX implement elijah's idea of using a PUT document as a + # token to ensure consistency in the removal. 
+ + try: + key = mbox, uid + self._new.discard(key) + self._dirty.discard(key) + self._msg_store.pop(key, None) + except Exception as exc: + logger.exception(exc) + + # IMessageStoreWriter + + def write_messages(self, store): + """ + Write the message documents in this MemoryStore to a different store. + + :param store: the IMessageStore to write to + """ + # For now, we pass if the queue is not empty, to avoid duplicate + # queuing. + # We would better use a flag to know when we've already enqueued an + # item. + + # XXX this could return the deferred for all the enqueued operations + + if not self.producer.is_queue_empty(): + return + + if any(map(lambda i: not empty(i), (self._new, self._dirty))): + logger.info("Writing messages to Soledad...") + + # TODO change for lock, and make the property access + # is accquired + with set_bool_flag(self, self.WRITING_FLAG): + for rflags_doc_wrapper in self.all_rdocs_iter(): + self.producer.push(rflags_doc_wrapper) + for msg_wrapper in self.all_new_dirty_msg_iter(): + self.producer.push(msg_wrapper) + + # MemoryStore specific methods. + + def get_uids(self, mbox): + """ + Get all uids for a given mbox. + + :param mbox: the mailbox + :type mbox: str or unicode + :rtype: list + """ + all_keys = self._msg_store.keys() + return [uid for m, uid in all_keys if m == mbox] + + def get_soledad_known_uids(self, mbox): + """ + Get all uids that soledad knows about, from the memory cache. + :param mbox: the mailbox + :type mbox: str or unicode + :rtype: list + """ + return self._known_uids.get(mbox, []) + + # last_uid + + def get_last_uid(self, mbox): + """ + Return the highest UID for a given mbox. + It will be the highest between the highest uid in the message store for + the mailbox, and the soledad integer cache. + + :param mbox: the mailbox + :type mbox: str or unicode + :rtype: int + """ + uids = self.get_uids(mbox) + last_mem_uid = uids and max(uids) or 0 + last_soledad_uid = self.get_last_soledad_uid(mbox) + return max(last_mem_uid, last_soledad_uid) + + def get_last_soledad_uid(self, mbox): + """ + Get last uid for a given mbox from the soledad integer cache. + + :param mbox: the mailbox + :type mbox: str or unicode + """ + return self._last_uid.get(mbox, 0) + + def set_last_soledad_uid(self, mbox, value): + """ + Set last uid for a given mbox in the soledad integer cache. + SoledadMailbox should prime this value during initialization. + Other methods (during message adding) SHOULD call + `increment_last_soledad_uid` instead. + + :param mbox: the mailbox + :type mbox: str or unicode + :param value: the value to set + :type value: int + """ + leap_assert_type(value, int) + logger.info("setting last soledad uid for %s to %s" % + (mbox, value)) + # if we already have a value here, don't do anything + with self._last_uid_lock: + if not self._last_uid.get(mbox, None): + self._last_uid[mbox] = value + + def set_known_uids(self, mbox, value): + """ + Set the value fo the known-uids set for this mbox. + + :param mbox: the mailbox + :type mbox: str or unicode + :param value: a sequence of integers to be added to the set. + :type value: tuple + """ + current = self._known_uids[mbox] + self._known_uids[mbox] = current.union(set(value)) + + def increment_last_soledad_uid(self, mbox): + """ + Increment by one the soledad integer cache for the last_uid for + this mbox, and fire a defer-to-thread to update the soledad value. + The caller should lock the call tho this method. 
+ + :param mbox: the mailbox + :type mbox: str or unicode + """ + with self._last_uid_lock: + self._last_uid[mbox] += 1 + value = self._last_uid[mbox] + self.write_last_uid(mbox, value) + return value + + @deferred_to_thread + def write_last_uid(self, mbox, value): + """ + Increment the soledad integer cache for the highest uid value. + + :param mbox: the mailbox + :type mbox: str or unicode + :param value: the value to set + :type value: int + """ + leap_assert_type(value, int) + if self._permanent_store: + self._permanent_store.write_last_uid(mbox, value) + + # Counting sheeps... + + def count_new_mbox(self, mbox): + """ + Count the new messages by inbox. + + :param mbox: the mailbox + :type mbox: str or unicode + :return: number of new messages + :rtype: int + """ + return len([(m, uid) for m, uid in self._new if mbox == mbox]) + + # XXX used at all? + def count_new(self): + """ + Count all the new messages in the MemoryStore. + + :rtype: int + """ + return len(self._new) + + def get_cdoc_from_phash(self, phash): + """ + Return a content-document by its payload-hash. + + :param phash: the payload hash to check against + :type phash: str or unicode + :rtype: MessagePartDoc + """ + doc = self._phash_store.get(phash, None) + + # XXX return None for consistency? + + # XXX have to keep a mapping between phash and its linkage + # info, to know if this payload is been already saved or not. + # We will be able to get this from the linkage-docs, + # not yet implemented. + new = True + dirty = False + return MessagePartDoc( + new=new, dirty=dirty, store="mem", + part=MessagePartType.cdoc, + content=doc, + doc_id=None) + + def get_fdoc_from_chash(self, chash, mbox): + """ + Return a flags-document by its content-hash and a given mailbox. + Used during content-duplication detection while copying or adding a + message. + + :param chash: the content hash to check against + :type chash: str or unicode + :param mbox: the mailbox + :type mbox: str or unicode + + :return: MessagePartDoc. It will return None if the flags document + has empty content or it is flagged as \\Deleted. + """ + docs_dict = self._chash_fdoc_store.get(chash, None) + fdoc = docs_dict.get(mbox, None) if docs_dict else None + + # a couple of special cases. + # 1. We might have a doc with empty content... + if empty(fdoc): + return None + + # 2. ...Or the message could exist, but being flagged for deletion. + # We want to create a new one in this case. + # Hmmm what if the deletion is un-done?? We would end with a + # duplicate... + if fdoc and fields.DELETED_FLAG in fdoc[fields.FLAGS_KEY]: + return None + + uid = fdoc[fields.UID_KEY] + key = mbox, uid + new = key in self._new + dirty = key in self._dirty + return MessagePartDoc( + new=new, dirty=dirty, store="mem", + part=MessagePartType.fdoc, + content=fdoc, + doc_id=None) + + def all_msg_iter(self): + """ + Return generator that iterates through all messages in the store. + + :return: generator of MessageWrappers + :rtype: generator + """ + return (self.get_message(*key) + for key in sorted(self._msg_store.keys())) + + def all_new_dirty_msg_iter(self): + """ + Return generator that iterates through all new and dirty messages. + + :return: generator of MessageWrappers + :rtype: generator + """ + return (self.get_message(*key) + for key in sorted(self._msg_store.keys()) + if key in self._new or key in self._dirty) + + def all_msg_dict_for_mbox(self, mbox): + """ + Return all the message dicts for a given mbox. 
+ + :param mbox: the mailbox + :type mbox: str or unicode + :return: list of dictionaries + :rtype: list + """ + # This *needs* to return a fixed sequence. Otherwise the dictionary len + # will change during iteration, when we modify it + return [self._msg_store[(mb, uid)] + for mb, uid in self._msg_store if mb == mbox] + + def all_deleted_uid_iter(self, mbox): + """ + Return a list with the UIDs for all messags + with deleted flag in a given mailbox. + + :param mbox: the mailbox + :type mbox: str or unicode + :return: list of integers + :rtype: list + """ + # This *needs* to return a fixed sequence. Otherwise the dictionary len + # will change during iteration, when we modify it + all_deleted = [ + msg['fdoc']['uid'] for msg in self.all_msg_dict_for_mbox(mbox) + if msg.get('fdoc', None) + and fields.DELETED_FLAG in msg['fdoc']['flags']] + return all_deleted + + # new, dirty flags + + def _get_new_dirty_state(self, key): + """ + Return `new` and `dirty` flags for a given message. + + :param key: the key for the message, in the form mbox, uid + :type key: tuple + :return: tuple of bools + :rtype: tuple + """ + # XXX should return *first* the news, and *then* the dirty... + return map(lambda _set: key in _set, (self._new, self._dirty)) + + def set_new(self, key): + """ + Add the key value to the `new` set. + + :param key: the key for the message, in the form mbox, uid + :type key: tuple + """ + self._new.add(key) + + def unset_new(self, key): + """ + Remove the key value from the `new` set. + + :param key: the key for the message, in the form mbox, uid + :type key: tuple + """ + self._new.discard(key) + deferreds = self._new_deferreds + d = deferreds.get(key, None) + if d: + # XXX use a namedtuple for passing the result + # when we check it in the other side. + d.callback('%s, ok' % str(key)) + deferreds.pop(key) + + def set_dirty(self, key): + """ + Add the key value to the `dirty` set. + + :param key: the key for the message, in the form mbox, uid + :type key: tuple + """ + self._dirty.add(key) + + def unset_dirty(self, key): + """ + Remove the key value from the `dirty` set. + + :param key: the key for the message, in the form mbox, uid + :type key: tuple + """ + self._dirty.discard(key) + deferreds = self._dirty_deferreds + d = deferreds.get(key, None) + if d: + # XXX use a namedtuple for passing the result + # when we check it in the other side. + d.callback('%s, ok' % str(key)) + deferreds.pop(key) + + # Recent Flags + + # TODO --- nice but unused + def set_recent_flag(self, mbox, uid): + """ + Set the `Recent` flag for a given mailbox and UID. + + :param mbox: the mailbox + :type mbox: str or unicode + :param uid: the message UID + :type uid: int + """ + self._rflags_dirty.add(mbox) + self._rflags_store[mbox]['set'].add(uid) + + # TODO --- nice but unused + def unset_recent_flag(self, mbox, uid): + """ + Unset the `Recent` flag for a given mailbox and UID. + + :param mbox: the mailbox + :type mbox: str or unicode + :param uid: the message UID + :type uid: int + """ + self._rflags_store[mbox]['set'].discard(uid) + + def set_recent_flags(self, mbox, value): + """ + Set the value for the set of the recent flags. + Used from the property in the MessageCollection. 
+ + :param mbox: the mailbox + :type mbox: str or unicode + :param value: a sequence of flags to set + :type value: sequence + """ + self._rflags_dirty.add(mbox) + self._rflags_store[mbox]['set'] = set(value) + + def load_recent_flags(self, mbox, flags_doc): + """ + Load the passed flags document in the recent flags store, for a given + mailbox. + + :param mbox: the mailbox + :type mbox: str or unicode + :param flags_doc: A dictionary containing the `doc_id` of the Soledad + flags-document for this mailbox, and the `set` + of uids marked with that flag. + """ + self._rflags_store[mbox] = flags_doc + + def get_recent_flags(self, mbox): + """ + Return the set of UIDs with the `Recent` flag for this mailbox. + + :param mbox: the mailbox + :type mbox: str or unicode + :rtype: set, or None + """ + rflag_for_mbox = self._rflags_store.get(mbox, None) + if not rflag_for_mbox: + return None + return self._rflags_store[mbox]['set'] + + def all_rdocs_iter(self): + """ + Return an iterator through all in-memory recent flag dicts, wrapped + under a RecentFlagsDoc namedtuple. + Used for saving to disk. + + :return: a generator of RecentFlagDoc + :rtype: generator + """ + # XXX use enums + DOC_ID = "doc_id" + SET = "set" + + rflags_store = self._rflags_store + + def get_rdoc(mbox, rdict): + mbox_rflag_set = rdict[SET] + recent_set = copy(mbox_rflag_set) + # zero it! + mbox_rflag_set.difference_update(mbox_rflag_set) + return RecentFlagsDoc( + doc_id=rflags_store[mbox][DOC_ID], + content={ + fields.TYPE_KEY: fields.TYPE_RECENT_VAL, + fields.MBOX_KEY: mbox, + fields.RECENTFLAGS_KEY: list(recent_set) + }) + + return (get_rdoc(mbox, rdict) for mbox, rdict in rflags_store.items() + if not empty(rdict[SET])) + + # Methods that mirror the IMailbox interface + + def remove_all_deleted(self, mbox): + """ + Remove all messages flagged \\Deleted from this Memory Store only. + Called from `expunge` + + :param mbox: the mailbox + :type mbox: str or unicode + :return: a list of UIDs + :rtype: list + """ + mem_deleted = self.all_deleted_uid_iter(mbox) + for uid in mem_deleted: + self.remove_message(mbox, uid) + return mem_deleted + + def expunge(self, mbox, observer): + """ + Remove all messages flagged \\Deleted, from the Memory Store + and from the permanent store also. + + :param mbox: the mailbox + :type mbox: str or unicode + :param observer: a deferred that will be fired when expunge is done + :type observer: Deferred + :return: a list of UIDs + :rtype: list + """ + # TODO expunge should add itself as a callback to the ongoing + # writes. + soledad_store = self._permanent_store + all_deleted = [] + + try: + # 1. Stop the writing call + self._stop_write_loop() + # 2. Enqueue a last write. + #self.write_messages(soledad_store) + # 3. Should wait on the writebacks to finish ??? + # FIXME wait for this, and add all the rest of the method + # as a callback!!! + except Exception as exc: + logger.exception(exc) + + # Now, we...: + + try: + # 1. Delete all messages marked as deleted in soledad. + + # XXX this could be deferred for faster operation. + if soledad_store: + sol_deleted = soledad_store.remove_all_deleted(mbox) + else: + sol_deleted = [] + + try: + self._known_uids[mbox].difference_update(set(sol_deleted)) + except Exception as exc: + logger.exception(exc) + + # 2. Delete all messages marked as deleted in memory. 
+ mem_deleted = self.remove_all_deleted(mbox) + + all_deleted = set(mem_deleted).union(set(sol_deleted)) + logger.debug("deleted %r" % all_deleted) + except Exception as exc: + logger.exception(exc) + finally: + self._start_write_loop() + observer.callback(True) + return all_deleted + + # Dump-to-disk controls. + + @property + def is_writing(self): + """ + Property that returns whether the store is currently writing its + internal state to a permanent storage. + + Used to evaluate whether the CHECK command can inform that the field + is clear to proceed, or waiting for the write operations to complete + is needed instead. + + :rtype: bool + """ + # FIXME this should return a deferred !!! + # XXX ----- can fire when all new + dirty deferreds + # are done (gatherResults) + return getattr(self, self.WRITING_FLAG) + + # Memory management. + + def get_size(self): + """ + Return the size of the internal storage. + Use for calculating the limit beyond which we should flush the store. + + :rtype: int + """ + return size.get_size(self._msg_store) diff --git a/mail/src/leap/mail/imap/messageparts.py b/mail/src/leap/mail/imap/messageparts.py new file mode 100644 index 0000000..b07681b --- /dev/null +++ b/mail/src/leap/mail/imap/messageparts.py @@ -0,0 +1,565 @@ +# messageparts.py +# Copyright (C) 2014 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +""" +MessagePart implementation. Used from LeapMessage. +""" +import logging +import StringIO +import weakref + +from collections import namedtuple + +from enum import Enum +from zope.interface import implements +from twisted.mail import imap4 + +from leap.common.decorators import memoized_method +from leap.common.mail import get_email_charset +from leap.mail.imap import interfaces +from leap.mail.imap.fields import fields +from leap.mail.utils import empty, first, find_charset + +MessagePartType = Enum("hdoc", "fdoc", "cdoc", "cdocs", "docs_id") + + +logger = logging.getLogger(__name__) + + +""" +A MessagePartDoc is a light wrapper around the dictionary-like +data that we pass along for message parts. It can be used almost everywhere +that you would expect a SoledadDocument, since it has a dict under the +`content` attribute. + +We also keep some metadata on it, relative in part to the message as a whole, +and sometimes to a part in particular only. + +* `new` indicates that the document has just been created. SoledadStore + should just create a new doc for all the related message parts. +* `store` indicates the type of store a given MessagePartDoc lives in. + We currently use this to indicate that the document comes from memeory, + but we should probably get rid of it as soon as we extend the use of the + SoledadStore interface along LeapMessage, MessageCollection and Mailbox. +* `part` is one of the MessagePartType enums. 
+ +* `dirty` indicates that, while we already have the document in Soledad, + we have modified its state in memory, so we need to put_doc instead while + dumping the MemoryStore contents. + `dirty` attribute would only apply to flags-docs and linkage-docs. +* `doc_id` is the identifier for the document in the u1db database, if any. + +""" + +MessagePartDoc = namedtuple( + 'MessagePartDoc', + ['new', 'dirty', 'part', 'store', 'content', 'doc_id']) + +""" +A RecentFlagsDoc is used to send the recent-flags document payload to the +SoledadWriter during dumps. +""" +RecentFlagsDoc = namedtuple( + 'RecentFlagsDoc', + ['content', 'doc_id']) + + +class ReferenciableDict(dict): + """ + A dict that can be weak-referenced. + + Some builtin objects are not weak-referenciable unless + subclassed. So we do. + + Used to return pointers to the items in the MemoryStore. + """ + + +class MessageWrapper(object): + """ + A simple nested dictionary container around the different message subparts. + """ + implements(interfaces.IMessageContainer) + + FDOC = "fdoc" + HDOC = "hdoc" + CDOCS = "cdocs" + DOCS_ID = "docs_id" + + # Using slots to limit some the memory footprint, + # Add your attribute here. + + __slots__ = ["_dict", "_new", "_dirty", "_storetype", "memstore"] + + def __init__(self, fdoc=None, hdoc=None, cdocs=None, + from_dict=None, memstore=None, + new=True, dirty=False, docs_id={}): + """ + Initialize a MessageWrapper. + """ + # TODO add optional reference to original message in the incoming + self._dict = {} + self.memstore = memstore + + self._new = new + self._dirty = dirty + + self._storetype = "mem" + + if from_dict is not None: + self.from_dict(from_dict) + else: + if fdoc is not None: + self._dict[self.FDOC] = ReferenciableDict(fdoc) + if hdoc is not None: + self._dict[self.HDOC] = ReferenciableDict(hdoc) + if cdocs is not None: + self._dict[self.CDOCS] = ReferenciableDict(cdocs) + + # This will keep references to the doc_ids to be able to put + # messages to soledad. It will be populated during the walk() to avoid + # the overhead of reading from the db. + + # XXX it really *only* make sense for the FDOC, the other parts + # should not be "dirty", just new...!!! + self._dict[self.DOCS_ID] = docs_id + + # properties + + # TODO Could refactor new and dirty properties together. + + def _get_new(self): + """ + Get the value for the `new` flag. + + :rtype: bool + """ + return self._new + + def _set_new(self, value=True): + """ + Set the value for the `new` flag, and propagate it + to the memory store if any. + + :param value: the value to set + :type value: bool + """ + self._new = value + if self.memstore: + mbox = self.fdoc.content['mbox'] + uid = self.fdoc.content['uid'] + key = mbox, uid + fun = [self.memstore.unset_new, + self.memstore.set_new][int(value)] + fun(key) + else: + logger.warning("Could not find a memstore referenced from this " + "MessageWrapper. The value for new will not be " + "propagated") + + new = property(_get_new, _set_new, + doc="The `new` flag for this MessageWrapper") + + def _get_dirty(self): + """ + Get the value for the `dirty` flag. + + :rtype: bool + """ + return self._dirty + + def _set_dirty(self, value=True): + """ + Set the value for the `dirty` flag, and propagate it + to the memory store if any. 
+ + :param value: the value to set + :type value: bool + """ + self._dirty = value + if self.memstore: + mbox = self.fdoc.content['mbox'] + uid = self.fdoc.content['uid'] + key = mbox, uid + fun = [self.memstore.unset_dirty, + self.memstore.set_dirty][int(value)] + fun(key) + else: + logger.warning("Could not find a memstore referenced from this " + "MessageWrapper. The value for new will not be " + "propagated") + + dirty = property(_get_dirty, _set_dirty) + + # IMessageContainer + + @property + def fdoc(self): + """ + Return a MessagePartDoc wrapping around a weak reference to + the flags-document in this MemoryStore, if any. + + :rtype: MessagePartDoc + """ + _fdoc = self._dict.get(self.FDOC, None) + if _fdoc: + content_ref = weakref.proxy(_fdoc) + else: + logger.warning("NO FDOC!!!") + content_ref = {} + + return MessagePartDoc(new=self.new, dirty=self.dirty, + store=self._storetype, + part=MessagePartType.fdoc, + content=content_ref, + doc_id=self._dict[self.DOCS_ID].get( + self.FDOC, None)) + + @property + def hdoc(self): + """ + Return a MessagePartDoc wrapping around a weak reference to + the headers-document in this MemoryStore, if any. + + :rtype: MessagePartDoc + """ + _hdoc = self._dict.get(self.HDOC, None) + if _hdoc: + content_ref = weakref.proxy(_hdoc) + else: + content_ref = {} + return MessagePartDoc(new=self.new, dirty=self.dirty, + store=self._storetype, + part=MessagePartType.hdoc, + content=content_ref, + doc_id=self._dict[self.DOCS_ID].get( + self.HDOC, None)) + + @property + def cdocs(self): + """ + Return a weak reference to a zero-indexed dict containing + the content-documents, or an empty dict if none found. + If you want access to the MessagePartDoc for the individual + parts, use the generator returned by `walk` instead. + + :rtype: dict + """ + _cdocs = self._dict.get(self.CDOCS, None) + if _cdocs: + return weakref.proxy(_cdocs) + else: + return {} + + def walk(self): + """ + Generator that iterates through all the parts, returning + MessagePartDoc. Used for writing to SoledadStore. + + :rtype: generator + """ + if self._dirty: + mbox = self.fdoc.content[fields.MBOX_KEY] + uid = self.fdoc.content[fields.UID_KEY] + docid_dict = self._dict[self.DOCS_ID] + docid_dict[self.FDOC] = self.memstore.get_docid_for_fdoc( + mbox, uid) + + if not empty(self.fdoc.content): + yield self.fdoc + if not empty(self.hdoc.content): + yield self.hdoc + for cdoc in self.cdocs.values(): + if not empty(cdoc): + content_ref = weakref.proxy(cdoc) + yield MessagePartDoc(new=self.new, dirty=self.dirty, + store=self._storetype, + part=MessagePartType.cdoc, + content=content_ref, + doc_id=None) + + # i/o + + def as_dict(self): + """ + Return a dict representation of the parts contained. + + :rtype: dict + """ + return self._dict + + def from_dict(self, msg_dict): + """ + Populate MessageWrapper parts from a dictionary. + It expects the same format that we use in a + MessageWrapper. + + + :param msg_dict: a dictionary containing the parts to populate + the MessageWrapper from + :type msg_dict: dict + """ + fdoc, hdoc, cdocs = map( + lambda part: msg_dict.get(part, None), + [self.FDOC, self.HDOC, self.CDOCS]) + + for t, doc in ((self.FDOC, fdoc), (self.HDOC, hdoc), + (self.CDOCS, cdocs)): + self._dict[t] = ReferenciableDict(doc) if doc else None + + +class MessagePart(object): + """ + IMessagePart implementor, to be passed to several methods + of the IMAP4Server. + It takes a subpart message and is able to find + the inner parts. + + See the interface documentation. 
+ """ + + implements(imap4.IMessagePart) + + def __init__(self, soledad, part_map): + """ + Initializes the MessagePart. + + :param soledad: Soledad instance. + :type soledad: Soledad + :param part_map: a dictionary containing the parts map for this + message + :type part_map: dict + """ + # TODO + # It would be good to pass the uid/mailbox also + # for references while debugging. + + # We have a problem on bulk moves, and is + # that when the fetch on the new mailbox is done + # the parts maybe are not complete. + # So we should be able to fail with empty + # docs until we solve that. The ideal would be + # to gather the results of the deferred operations + # to signal the operation is complete. + #leap_assert(part_map, "part map dict cannot be null") + + self._soledad = soledad + self._pmap = part_map + + def getSize(self): + """ + Return the total size, in octets, of this message part. + + :return: size of the message, in octets + :rtype: int + """ + if empty(self._pmap): + return 0 + size = self._pmap.get('size', None) + if size is None: + logger.error("Message part cannot find size in the partmap") + size = 0 + return size + + def getBodyFile(self): + """ + Retrieve a file object containing only the body of this message. + + :return: file-like object opened for reading + :rtype: StringIO + """ + fd = StringIO.StringIO() + if not empty(self._pmap): + multi = self._pmap.get('multi') + if not multi: + phash = self._pmap.get("phash", None) + else: + pmap = self._pmap.get('part_map') + first_part = pmap.get('1', None) + if not empty(first_part): + phash = first_part['phash'] + else: + phash = None + + if phash is None: + logger.warning("Could not find phash for this subpart!") + payload = "" + else: + payload = self._get_payload_from_document(phash) + + else: + logger.warning("Message with no part_map!") + payload = "" + + if payload: + content_type = self._get_ctype_from_document(phash) + charset = find_charset(content_type) + logger.debug("Got charset from header: %s" % (charset,)) + if charset is None: + charset = self._get_charset(payload) + logger.debug("Got charset: %s" % (charset,)) + try: + if isinstance(payload, unicode): + payload = payload.encode(charset) + except UnicodeError as exc: + logger.error( + "Unicode error, using 'replace'. {0!r}".format(exc)) + payload = payload.encode(charset, 'replace') + + fd.write(payload) + fd.seek(0) + return fd + + # TODO should memory-bound this memoize!!! + @memoized_method + def _get_payload_from_document(self, phash): + """ + Return the message payload from the content document. + + :param phash: the payload hash to retrieve by. + :type phash: str or unicode + :rtype: str or unicode + """ + cdocs = self._soledad.get_from_index( + fields.TYPE_P_HASH_IDX, + fields.TYPE_CONTENT_VAL, str(phash)) + + cdoc = first(cdocs) + if cdoc is None: + logger.warning( + "Could not find the content doc " + "for phash %s" % (phash,)) + payload = "" + else: + payload = cdoc.content.get(fields.RAW_KEY, "") + return payload + + # TODO should memory-bound this memoize!!! + @memoized_method + def _get_ctype_from_document(self, phash): + """ + Reeturn the content-type from the content document. + + :param phash: the payload hash to retrieve by. 
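# ---------------------------------------------------------------------------
# The TODO above asks for a memory-bounded memoization of the phash lookups.
# A minimal sketch of a size-bounded cache decorator, assuming simple
# oldest-first eviction is acceptable; this is not the leap.common
# memoized_method decorator, only an illustration of what bounding it could
# look like.
# ---------------------------------------------------------------------------
import functools
from collections import OrderedDict


def bounded_memoize(maxsize=256):
    """Memoize by positional args, evicting the oldest entry past maxsize."""
    def decorator(fun):
        cache = OrderedDict()

        @functools.wraps(fun)
        def wrapper(*args):
            if args in cache:
                return cache[args]
            result = fun(*args)
            cache[args] = result
            if len(cache) > maxsize:
                cache.popitem(last=False)  # drop the oldest cached entry
            return result
        return wrapper
    return decorator

# Note: used on a method, `self` becomes part of the cache key, so the cache
# keeps a strong reference to the instance for as long as the entry lives.
# ---------------------------------------------------------------------------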
+ :type phash: str or unicode + :rtype: str or unicode + """ + cdocs = self._soledad.get_from_index( + fields.TYPE_P_HASH_IDX, + fields.TYPE_CONTENT_VAL, str(phash)) + + cdoc = first(cdocs) + if not cdoc: + logger.warning( + "Could not find the content doc " + "for phash %s" % (phash,)) + ctype = cdoc.content.get('ctype', "") + return ctype + + @memoized_method + def _get_charset(self, stuff): + # TODO put in a common class with LeapMessage + """ + Gets (guesses?) the charset of a payload. + + :param stuff: the stuff to guess about. + :type stuff: str or unicode + :return: charset + :rtype: unicode + """ + # XXX existential doubt 2. shouldn't we make the scope + # of the decorator somewhat more persistent? + # ah! yes! and put memory bounds. + return get_email_charset(stuff) + + def getHeaders(self, negate, *names): + """ + Retrieve a group of message headers. + + :param names: The names of the headers to retrieve or omit. + :type names: tuple of str + + :param negate: If True, indicates that the headers listed in names + should be omitted from the return value, rather + than included. + :type negate: bool + + :return: A mapping of header field names to header field values + :rtype: dict + """ + # XXX refactor together with MessagePart method + if not self._pmap: + logger.warning("No pmap in Subpart!") + return {} + headers = dict(self._pmap.get("headers", [])) + + names = map(lambda s: s.upper(), names) + if negate: + cond = lambda key: key.upper() not in names + else: + cond = lambda key: key.upper() in names + + # default to most likely standard + charset = find_charset(headers, "utf-8") + headers2 = dict() + for key, value in headers.items(): + # twisted imap server expects *some* headers to be lowercase + # We could use a CaseInsensitiveDict here... + if key.lower() == "content-type": + key = key.lower() + + if not isinstance(key, str): + key = key.encode(charset, 'replace') + if not isinstance(value, str): + value = value.encode(charset, 'replace') + + # filter original dict by negate-condition + if cond(key): + headers2[key] = value + return headers2 + + def isMultipart(self): + """ + Return True if this message is multipart. + """ + if empty(self._pmap): + logger.warning("Could not get part map!") + return False + multi = self._pmap.get("multi", False) + return multi + + def getSubPart(self, part): + """ + Retrieve a MIME submessage + + :type part: C{int} + :param part: The number of the part to retrieve, indexed from 0. + :raise IndexError: Raised if the specified part does not exist. + :raise TypeError: Raised if this message is not multipart. + :rtype: Any object implementing C{IMessagePart}. + :return: The specified sub-part. + """ + if not self.isMultipart(): + raise TypeError + + sub_pmap = self._pmap.get("part_map", {}) + try: + part_map = sub_pmap[str(part + 1)] + except KeyError: + logger.debug("getSubpart for %s: KeyError" % (part,)) + raise IndexError + + # XXX check for validity + return MessagePart(self._soledad, part_map) diff --git a/mail/src/leap/mail/imap/messages.py b/mail/src/leap/mail/imap/messages.py index 34304ea..25fc55f 100644 --- a/mail/src/leap/mail/imap/messages.py +++ b/mail/src/leap/mail/imap/messages.py @@ -20,17 +20,15 @@ LeapMessage and MessageCollection. 
import copy import logging import re -import time import threading import StringIO -from collections import defaultdict, namedtuple +from collections import defaultdict from functools import partial from twisted.mail import imap4 from twisted.internet import defer from twisted.python import log -from u1db import errors as u1db_errors from zope.interface import implements from zope.proxy import sameProxiedObjects @@ -38,33 +36,30 @@ from leap.common.check import leap_assert, leap_assert_type from leap.common.decorators import memoized_method from leap.common.mail import get_email_charset from leap.mail import walk -from leap.mail.utils import first, find_charset -from leap.mail.decorators import deferred +from leap.mail.utils import first, find_charset, lowerdict, empty +from leap.mail.utils import stringify_parts_map +from leap.mail.decorators import deferred_to_thread from leap.mail.imap.index import IndexedDB from leap.mail.imap.fields import fields, WithMsgFields +from leap.mail.imap.memorystore import MessageWrapper +from leap.mail.imap.messageparts import MessagePart from leap.mail.imap.parser import MailParser, MBoxParser -from leap.mail.messageflow import IMessageConsumer logger = logging.getLogger(__name__) # TODO ------------------------------------------------------------ +# [ ] Add ref to incoming message during add_msg # [ ] Add linked-from info. +# * Need a new type of documents: linkage info. +# * HDOCS are linked from FDOCs (ref to chash) +# * CDOCS are linked from HDOCS (ref to chash) + # [ ] Delete incoming mail only after successful write! # [ ] Remove UID from syncable db. Store only those indexes locally. - -def lowerdict(_dict): - """ - Return a dict with the keys in lowercase. - - :param _dict: the dict to convert - :rtype: dict - """ - # TODO should properly implement a CaseInsensitive dict. - # Look into requests code. - return dict((key.lower(), value) - for key, value in _dict.items()) +MSGID_PATTERN = r"""<([\w@.]+)>""" +MSGID_RE = re.compile(MSGID_PATTERN) def try_unique_query(curried): @@ -92,232 +87,6 @@ def try_unique_query(curried): except Exception as exc: logger.exception("Unhandled error %r" % exc) -MSGID_PATTERN = r"""<([\w@.]+)>""" -MSGID_RE = re.compile(MSGID_PATTERN) - - -class MessagePart(object): - """ - IMessagePart implementor. - It takes a subpart message and is able to find - the inner parts. - - Excusatio non petita: see the interface documentation. - """ - - implements(imap4.IMessagePart) - - def __init__(self, soledad, part_map): - """ - Initializes the MessagePart. - - :param part_map: a dictionary containing the parts map for this - message - :type part_map: dict - """ - # TODO - # It would be good to pass the uid/mailbox also - # for references while debugging. - - # We have a problem on bulk moves, and is - # that when the fetch on the new mailbox is done - # the parts maybe are not complete. - # So we should be able to fail with empty - # docs until we solve that. The ideal would be - # to gather the results of the deferred operations - # to signal the operation is complete. - #leap_assert(part_map, "part map dict cannot be null") - self._soledad = soledad - self._pmap = part_map - - def getSize(self): - """ - Return the total size, in octets, of this message part. 
- - :return: size of the message, in octets - :rtype: int - """ - if not self._pmap: - return 0 - size = self._pmap.get('size', None) - if not size: - logger.error("Message part cannot find size in the partmap") - return size - - def getBodyFile(self): - """ - Retrieve a file object containing only the body of this message. - - :return: file-like object opened for reading - :rtype: StringIO - """ - fd = StringIO.StringIO() - if self._pmap: - multi = self._pmap.get('multi') - if not multi: - phash = self._pmap.get("phash", None) - else: - pmap = self._pmap.get('part_map') - first_part = pmap.get('1', None) - if first_part: - phash = first_part['phash'] - - if not phash: - logger.warning("Could not find phash for this subpart!") - payload = str("") - else: - payload = self._get_payload_from_document(phash) - - else: - logger.warning("Message with no part_map!") - payload = str("") - - if payload: - content_type = self._get_ctype_from_document(phash) - charset = find_charset(content_type) - logger.debug("Got charset from header: %s" % (charset,)) - if charset is None: - charset = self._get_charset(payload) - logger.debug("Got charset: %s" % (charset,)) - try: - payload = payload.encode(charset) - except (UnicodeEncodeError, UnicodeDecodeError) as e: - logger.error("Unicode error, using 'replace'. {0!r}".format(e)) - payload = payload.encode(charset, 'replace') - - fd.write(payload) - fd.seek(0) - return fd - - # TODO cache the phash retrieval - def _get_payload_from_document(self, phash): - """ - Gets the message payload from the content document. - - :param phash: the payload hash to retrieve by. - :type phash: basestring - """ - cdocs = self._soledad.get_from_index( - fields.TYPE_P_HASH_IDX, - fields.TYPE_CONTENT_VAL, str(phash)) - - cdoc = first(cdocs) - if not cdoc: - logger.warning( - "Could not find the content doc " - "for phash %s" % (phash,)) - payload = cdoc.content.get(fields.RAW_KEY, "") - return payload - - # TODO cache the pahash retrieval - def _get_ctype_from_document(self, phash): - """ - Gets the content-type from the content document. - - :param phash: the payload hash to retrieve by. - :type phash: basestring - """ - cdocs = self._soledad.get_from_index( - fields.TYPE_P_HASH_IDX, - fields.TYPE_CONTENT_VAL, str(phash)) - - cdoc = first(cdocs) - if not cdoc: - logger.warning( - "Could not find the content doc " - "for phash %s" % (phash,)) - ctype = cdoc.content.get('ctype', "") - return ctype - - @memoized_method - def _get_charset(self, stuff): - # TODO put in a common class with LeapMessage - """ - Gets (guesses?) the charset of a payload. - - :param stuff: the stuff to guess about. - :type stuff: basestring - :returns: charset - """ - # XXX existential doubt 2. shouldn't we make the scope - # of the decorator somewhat more persistent? - # ah! yes! and put memory bounds. - return get_email_charset(unicode(stuff)) - - def getHeaders(self, negate, *names): - """ - Retrieve a group of message headers. - - :param names: The names of the headers to retrieve or omit. - :type names: tuple of str - - :param negate: If True, indicates that the headers listed in names - should be omitted from the return value, rather - than included. - :type negate: bool - - :return: A mapping of header field names to header field values - :rtype: dict - """ - if not self._pmap: - logger.warning("No pmap in Subpart!") - return {} - headers = dict(self._pmap.get("headers", [])) - - # twisted imap server expects *some* headers to be lowercase - # We could use a CaseInsensitiveDict here... 
- headers = dict( - (str(key), str(value)) if key.lower() != "content-type" - else (str(key.lower()), str(value)) - for (key, value) in headers.items()) - - names = map(lambda s: s.upper(), names) - if negate: - cond = lambda key: key.upper() not in names - else: - cond = lambda key: key.upper() in names - - # unpack and filter original dict by negate-condition - filter_by_cond = [ - map(str, (key, val)) for - key, val in headers.items() - if cond(key)] - filtered = dict(filter_by_cond) - return filtered - - def isMultipart(self): - """ - Return True if this message is multipart. - """ - if not self._pmap: - logger.warning("Could not get part map!") - return False - multi = self._pmap.get("multi", False) - return multi - - def getSubPart(self, part): - """ - Retrieve a MIME submessage - - :type part: C{int} - :param part: The number of the part to retrieve, indexed from 0. - :raise IndexError: Raised if the specified part does not exist. - :raise TypeError: Raised if this message is not multipart. - :rtype: Any object implementing C{IMessagePart}. - :return: The specified sub-part. - """ - if not self.isMultipart(): - raise TypeError - sub_pmap = self._pmap.get("part_map", {}) - try: - part_map = sub_pmap[str(part + 1)] - except KeyError: - logger.debug("getSubpart for %s: KeyError" % (part,)) - raise IndexError - - # XXX check for validity - return MessagePart(self._soledad, part_map) - class LeapMessage(fields, MailParser, MBoxParser): """ @@ -328,12 +97,14 @@ class LeapMessage(fields, MailParser, MBoxParser): """ # TODO this has to change. - # Should index primarily by chash, and keep a local-lonly + # Should index primarily by chash, and keep a local-only # UID table. implements(imap4.IMessage) - def __init__(self, soledad, uid, mbox, collection=None): + flags_lock = threading.Lock() + + def __init__(self, soledad, uid, mbox, collection=None, container=None): """ Initializes a LeapMessage. @@ -342,32 +113,54 @@ class LeapMessage(fields, MailParser, MBoxParser): :param uid: the UID for the message. :type uid: int or basestring :param mbox: the mbox this message belongs to - :type mbox: basestring + :type mbox: str or unicode :param collection: a reference to the parent collection object :type collection: MessageCollection + :param container: a IMessageContainer implementor instance + :type container: IMessageContainer """ MailParser.__init__(self) self._soledad = soledad self._uid = int(uid) self._mbox = self._parse_mailbox_name(mbox) self._collection = collection + self._container = container self.__chash = None self.__bdoc = None + # XXX make these properties public + @property def _fdoc(self): """ An accessor to the flags document. """ if all(map(bool, (self._uid, self._mbox))): - fdoc = self._get_flags_doc() + fdoc = None + if self._container is not None: + fdoc = self._container.fdoc + if not fdoc: + fdoc = self._get_flags_doc() if fdoc: - self.__chash = fdoc.content.get( + fdoc_content = fdoc.content + self.__chash = fdoc_content.get( fields.CONTENT_HASH_KEY, None) return fdoc @property + def _hdoc(self): + """ + An accessor to the headers document. + """ + if self._container is not None: + hdoc = self._container.hdoc + if hdoc and not empty(hdoc.content): + return hdoc + # XXX cache this into the memory store !!! + return self._get_headers_doc() + + @property def _chash(self): """ An accessor to the content hash for this message. 
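# ---------------------------------------------------------------------------
# The _fdoc/_hdoc accessors above follow a read-cache pattern: ask the
# in-memory container first and fall back to the (slower) Soledad query only
# when nothing usable is cached. A compact sketch of that pattern; both
# callables are illustrative placeholders, not the patch's actual helpers.
# ---------------------------------------------------------------------------
def read_through(container_get, soledad_get):
    """Return the cached document if it has content, else query the store."""
    doc = container_get() if container_get is not None else None
    if doc is not None and getattr(doc, 'content', None):
        return doc
    return soledad_get()

# hypothetical call site:
#   read_through(lambda: container.fdoc if container else None,
#                get_flags_doc_from_soledad)
# ---------------------------------------------------------------------------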
@@ -380,13 +173,6 @@ class LeapMessage(fields, MailParser, MBoxParser): return self.__chash @property - def _hdoc(self): - """ - An accessor to the headers document. - """ - return self._get_headers_doc() - - @property def _bdoc(self): """ An accessor to the body document. @@ -415,39 +201,33 @@ class LeapMessage(fields, MailParser, MBoxParser): :return: The flags, represented as strings :rtype: tuple """ - if self._uid is None: - return [] uid = self._uid - flags = [] + flags = set([]) fdoc = self._fdoc if fdoc: - flags = fdoc.content.get(self.FLAGS_KEY, None) + flags = set(fdoc.content.get(self.FLAGS_KEY, None)) msgcol = self._collection # We treat the recent flag specially: gotten from # a mailbox-level document. if msgcol and uid in msgcol.recent_flags: - flags.append(fields.RECENT_FLAG) + flags.add(fields.RECENT_FLAG) if flags: flags = map(str, flags) return tuple(flags) - # setFlags, addFlags, removeFlags are not in the interface spec - # but we use them with store command. + # setFlags not in the interface spec but we use it with store command. - def setFlags(self, flags): + def setFlags(self, flags, mode): """ Sets the flags for this message - Returns a SoledadDocument that needs to be updated by the caller. - :param flags: the flags to update in the message. :type flags: tuple of str - - :return: a SoledadDocument instance - :rtype: SoledadDocument + :param mode: the mode for setting. 1 is append, -1 is remove, 0 set. + :type mode: int """ leap_assert(isinstance(flags, tuple), "flags need to be a tuple") log.msg('setting flags: %s (%s)' % (self._uid, flags)) @@ -458,42 +238,36 @@ class LeapMessage(fields, MailParser, MBoxParser): "Could not find FDOC for %s:%s while setting flags!" % (self._mbox, self._uid)) return - doc.content[self.FLAGS_KEY] = flags - doc.content[self.SEEN_KEY] = self.SEEN_FLAG in flags - doc.content[self.DEL_KEY] = self.DELETED_FLAG in flags - self._soledad.put_doc(doc) - - def addFlags(self, flags): - """ - Adds flags to this message. - - Returns a SoledadDocument that needs to be updated by the caller. - :param flags: the flags to add to the message. - :type flags: tuple of str - - :return: a SoledadDocument instance - :rtype: SoledadDocument - """ - leap_assert(isinstance(flags, tuple), "flags need to be a tuple") - oldflags = self.getFlags() - self.setFlags(tuple(set(flags + oldflags))) - - def removeFlags(self, flags): - """ - Remove flags from this message. - - Returns a SoledadDocument that needs to be updated by the caller. - - :param flags: the flags to be removed from the message. - :type flags: tuple of str - - :return: a SoledadDocument instance - :rtype: SoledadDocument - """ - leap_assert(isinstance(flags, tuple), "flags need to be a tuple") - oldflags = self.getFlags() - self.setFlags(tuple(set(oldflags) - set(flags))) + APPEND = 1 + REMOVE = -1 + SET = 0 + + with self.flags_lock: + current = doc.content[self.FLAGS_KEY] + if mode == APPEND: + newflags = tuple(set(tuple(current) + flags)) + elif mode == REMOVE: + newflags = tuple(set(current).difference(set(flags))) + elif mode == SET: + newflags = flags + + # We could defer this, but I think it's better + # to put it under the lock... 
+ doc.content[self.FLAGS_KEY] = newflags + doc.content[self.SEEN_KEY] = self.SEEN_FLAG in flags + doc.content[self.DEL_KEY] = self.DELETED_FLAG in flags + + if self._collection.memstore is not None: + log.msg("putting message in collection") + self._collection.memstore.put_message( + self._mbox, self._uid, + MessageWrapper(fdoc=doc.content, new=False, dirty=True, + docs_id={'fdoc': doc.doc_id})) + else: + # fallback for non-memstore initializations. + self._soledad.put_doc(doc) + return map(str, newflags) def getInternalDate(self): """ @@ -519,29 +293,42 @@ class LeapMessage(fields, MailParser, MBoxParser): :return: file-like object opened for reading :rtype: StringIO """ + def write_fd(body): + fd.write(body) + fd.seek(0) + return fd + # TODO refactor with getBodyFile in MessagePart + fd = StringIO.StringIO() - bdoc = self._bdoc - if bdoc: - body = self._bdoc.content.get(self.RAW_KEY, "") - content_type = bdoc.content.get('content-type', "") + + if self._bdoc is not None: + bdoc_content = self._bdoc.content + if empty(bdoc_content): + logger.warning("No BDOC content found for message!!!") + return write_fd("") + + body = bdoc_content.get(self.RAW_KEY, "") + content_type = bdoc_content.get('content-type', "") charset = find_charset(content_type) + logger.debug('got charset from content-type: %s' % charset) if charset is None: charset = self._get_charset(body) try: - body = body.encode(charset) - except UnicodeError as e: - logger.error("Unicode error, using 'replace'. {0!r}".format(e)) + if isinstance(body, unicode): + body = body.encode(charset) + except UnicodeError as exc: + logger.error( + "Unicode error, using 'replace'. {0!r}".format(exc)) + logger.debug("Attempted to encode with: %s" % charset) body = body.encode(charset, 'replace') + finally: + return write_fd(body) # We are still returning funky characters from here. else: logger.warning("No BDOC found for message.") - body = str("") - - fd.write(body) - fd.seek(0) - return fd + return write_fd("") @memoized_method def _get_charset(self, stuff): @@ -552,11 +339,10 @@ class LeapMessage(fields, MailParser, MBoxParser): :type stuff: basestring :returns: charset """ - # TODO get from subpart headers - # XXX existential doubt 2. shouldn't we make the scope + # XXX shouldn't we make the scope # of the decorator somewhat more persistent? # ah! yes! and put memory bounds. - return get_email_charset(unicode(stuff)) + return get_email_charset(stuff) def getSize(self): """ @@ -567,7 +353,8 @@ class LeapMessage(fields, MailParser, MBoxParser): """ size = None if self._fdoc: - size = self._fdoc.content.get(self.SIZE_KEY, False) + fdoc_content = self._fdoc.content + size = fdoc_content.get(self.SIZE_KEY, False) else: logger.warning("No FLAGS doc for %s:%s" % (self._mbox, self._uid)) @@ -592,6 +379,8 @@ class LeapMessage(fields, MailParser, MBoxParser): :rtype: dict """ # TODO split in smaller methods + # XXX refactor together with MessagePart method + headers = self._get_headers() if not headers: logger.warning("No headers found") @@ -608,11 +397,10 @@ class LeapMessage(fields, MailParser, MBoxParser): # default to most likely standard charset = find_charset(headers, "utf-8") - - # twisted imap server expects *some* headers to be lowercase - # XXX refactor together with MessagePart method headers2 = dict() for key, value in headers.items(): + # twisted imap server expects *some* headers to be lowercase + # We could use a CaseInsensitiveDict here... 
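# ---------------------------------------------------------------------------
# The setFlags(flags, mode) rewrite above dispatches on the store mode:
# 1 appends, -1 removes, 0 replaces. A small pure-function sketch of that
# computation; the APPEND/REMOVE/SET names mirror the constants used there.
# ---------------------------------------------------------------------------
APPEND, REMOVE, SET = 1, -1, 0


def compute_flags(current, flags, mode):
    """Return the resulting flag tuple for a STORE-style flags operation."""
    if mode == APPEND:
        return tuple(set(tuple(current) + tuple(flags)))
    if mode == REMOVE:
        return tuple(set(current) - set(flags))
    if mode == SET:
        return tuple(flags)
    raise ValueError("unknown flags mode: %r" % (mode,))

# compute_flags(('\\Seen',), ('\\Deleted',), APPEND)
#   -> ('\\Seen', '\\Deleted')   (order not guaranteed, it goes through a set)
# ---------------------------------------------------------------------------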
if key.lower() == "content-type": key = key.lower() @@ -621,10 +409,13 @@ class LeapMessage(fields, MailParser, MBoxParser): if not isinstance(value, str): value = value.encode(charset, 'replace') + if value.endswith(";"): + # bastards + value = value[:-1] + # filter original dict by negate-condition if cond(key): headers2[key] = value - return headers2 def _get_headers(self): @@ -632,7 +423,8 @@ class LeapMessage(fields, MailParser, MBoxParser): Return the headers dict for this message. """ if self._hdoc is not None: - headers = self._hdoc.content.get(self.HEADERS_KEY, {}) + hdoc_content = self._hdoc.content + headers = hdoc_content.get(self.HEADERS_KEY, {}) return headers else: @@ -646,7 +438,8 @@ class LeapMessage(fields, MailParser, MBoxParser): Return True if this message is multipart. """ if self._fdoc: - is_multipart = self._fdoc.content.get(self.MULTIPART_KEY, False) + fdoc_content = self._fdoc.content + is_multipart = fdoc_content.get(self.MULTIPART_KEY, False) return is_multipart else: logger.warning( @@ -688,9 +481,15 @@ class LeapMessage(fields, MailParser, MBoxParser): logger.warning("Tried to get part but no HDOC found!") return None - pmap = self._hdoc.content.get(fields.PARTS_MAP_KEY, {}) + hdoc_content = self._hdoc.content + pmap = hdoc_content.get(fields.PARTS_MAP_KEY, {}) + + # remember, lads, soledad is using strings in its keys, + # not integers! return pmap[str(part)] + # XXX moved to memory store + # move the rest too. ------------------------------------------ def _get_flags_doc(self): """ Return the document that keeps the flags for this @@ -724,16 +523,31 @@ class LeapMessage(fields, MailParser, MBoxParser): Return the document that keeps the body for this message. """ - body_phash = self._hdoc.content.get( + hdoc_content = self._hdoc.content + body_phash = hdoc_content.get( fields.BODY_KEY, None) if not body_phash: logger.warning("No body phash for this document!") return None - body_docs = self._soledad.get_from_index( - fields.TYPE_P_HASH_IDX, - fields.TYPE_CONTENT_VAL, str(body_phash)) - return first(body_docs) + # XXX get from memstore too... + # if memstore: memstore.get_phrash + # memstore should keep a dict with weakrefs to the + # phash doc... + + if self._container is not None: + bdoc = self._container.memstore.get_cdoc_from_phash(body_phash) + if not empty(bdoc) and not empty(bdoc.content): + return bdoc + + # no memstore, or no body doc found there + if self._soledad: + body_docs = self._soledad.get_from_index( + fields.TYPE_P_HASH_IDX, + fields.TYPE_CONTENT_VAL, str(body_phash)) + return first(body_docs) + else: + logger.error("No phash in container, and no soledad found!") def __getitem__(self, key): """ @@ -748,216 +562,19 @@ class LeapMessage(fields, MailParser, MBoxParser): """ return self._fdoc.content.get(key, None) - # setters - - # XXX to be used in the messagecopier interface?! - - def set_uid(self, uid): - """ - Set new uid for this message. - - :param uid: the new uid - :type uid: basestring - """ - # XXX dangerous! lock? - self._uid = uid - d = self._fdoc - d.content[self.UID_KEY] = uid - self._soledad.put_doc(d) - - def set_mbox(self, mbox): - """ - Set new mbox for this message. - - :param mbox: the new mbox - :type mbox: basestring - """ - # XXX dangerous! lock? - self._mbox = mbox - d = self._fdoc - d.content[self.MBOX_KEY] = mbox - self._soledad.put_doc(d) - - # destructor - - @deferred - def remove(self): - """ - Remove all docs associated with this message. 
- """ - # XXX For the moment we are only removing the flags and headers - # docs. The rest we leave there polluting your hard disk, - # until we think about a good way of deorphaning. - # Maybe a crawler of unreferenced docs. - - # XXX implement elijah's idea of using a PUT document as a - # token to ensure consistency in the removal. - - uid = self._uid - - fd = self._get_flags_doc() - #hd = self._get_headers_doc() - #bd = self._get_body_doc() - #docs = [fd, hd, bd] - - docs = [fd] - - for d in filter(None, docs): - try: - self._soledad.delete_doc(d) - except Exception as exc: - logger.error(exc) - return uid - def does_exist(self): """ - Return True if there is actually a flags message for this + Return True if there is actually a flags document for this UID and mbox. """ - return self._fdoc is not None - - -class ContentDedup(object): - """ - Message deduplication. - - We do a query for the content hashes before writing to our beloved - sqlcipher backend of Soledad. This means, by now, that: - - 1. We will not store the same attachment twice, only the hash of it. - 2. We will not store the same message body twice, only the hash of it. - - The first case is useful if you are always receiving the same old memes - from unwary friends that still have not discovered that 4chan is the - generator of the internet. The second will save your day if you have - initiated session with the same account in two different machines. I also - wonder why would you do that, but let's respect each other choices, like - with the religious celebrations, and assume that one day we'll be able - to run Bitmask in completely free phones. Yes, I mean that, the whole GSM - Stack. - """ - - def _content_does_exist(self, doc): - """ - Check whether we already have a content document for a payload - with this hash in our database. - - :param doc: tentative body document - :type doc: dict - :returns: True if that happens, False otherwise. - """ - if not doc: - return False - phash = doc[fields.PAYLOAD_HASH_KEY] - attach_docs = self._soledad.get_from_index( - fields.TYPE_P_HASH_IDX, - fields.TYPE_CONTENT_VAL, str(phash)) - if not attach_docs: - return False - - if len(attach_docs) != 1: - logger.warning("Found more than one copy of phash %s!" - % (phash,)) - logger.debug("Found attachment doc with that hash! Skipping save!") - return True - - -SoledadWriterPayload = namedtuple( - 'SoledadWriterPayload', ['mode', 'payload']) - -# TODO we could consider using enum here: -# https://pypi.python.org/pypi/enum - -SoledadWriterPayload.CREATE = 1 -SoledadWriterPayload.PUT = 2 -SoledadWriterPayload.CONTENT_CREATE = 3 - - -""" -SoledadDocWriter was used to avoid writing to the db from multiple threads. -Its use here has been deprecated in favor of a local rw_lock in the client. -But we might want to reuse in in the near future to implement priority queues. -""" - - -class SoledadDocWriter(object): - """ - This writer will create docs serially in the local soledad database. - """ - - implements(IMessageConsumer) - - def __init__(self, soledad): - """ - Initialize the writer. - - :param soledad: the soledad instance - :type soledad: Soledad - """ - self._soledad = soledad - - def _get_call_for_item(self, item): - """ - Return the proper call type for a given item. 
- - :param item: one of the types defined under the - attributes of SoledadWriterPayload - :type item: int - """ - call = None - payload = item.payload - - if item.mode == SoledadWriterPayload.CREATE: - call = self._soledad.create_doc - elif (item.mode == SoledadWriterPayload.CONTENT_CREATE - and not self._content_does_exist(payload)): - call = self._soledad.create_doc - elif item.mode == SoledadWriterPayload.PUT: - call = self._soledad.put_doc - return call - - def _process(self, queue): - """ - Return the item and the proper call type for the next - item in the queue if any. + return not empty(self._fdoc) - :param queue: the queue from where we'll pick item. - :type queue: Queue - """ - item = queue.get() - call = self._get_call_for_item(item) - return item, call - - def consume(self, queue): - """ - Creates a new document in soledad db. - - :param queue: queue to get item from, with content of the document - to be inserted. - :type queue: Queue - """ - empty = queue.empty() - while not empty: - item, call = self._process(queue) - - if call: - # XXX should handle the delete case - # should handle errors - try: - call(item.payload) - except u1db_errors.RevisionConflict as exc: - logger.error("Error: %r" % (exc,)) - raise exc - empty = queue.empty() - - -class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser, - ContentDedup): +class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser): """ A collection of messages, surprisingly. - It is tied to a selected mailbox name that is passed to constructor. + It is tied to a selected mailbox name that is passed to its constructor. Implements a filter query over the messages contained in a soledad database. """ @@ -1058,7 +675,7 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser, _hdocset_lock = threading.Lock() _hdocset_property_lock = threading.Lock() - def __init__(self, mbox=None, soledad=None): + def __init__(self, mbox=None, soledad=None, memstore=None): """ Constructor for MessageCollection. @@ -1068,13 +685,18 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser, MessageCollection for each mailbox, instead of treating them as a property of each message. + We are passed an instance of MemoryStore, the same for the + SoledadBackedAccount, that we use as a read cache and a buffer + for writes. + :param mbox: the name of the mailbox. It is the name with which we filter the query over the - messages database + messages database. :type mbox: str - :param soledad: Soledad database :type soledad: Soledad instance + :param memstore: a MemoryStore instance + :type memstore: MemoryStore """ MailParser.__init__(self) leap_assert(mbox, "Need a mailbox name to initialize") @@ -1084,15 +706,22 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser, leap_assert(soledad, "Need a soledad instance to initialize") # okay, all in order, keep going... + self.mbox = self._parse_mailbox_name(mbox) + + # XXX get a SoledadStore passed instead self._soledad = soledad + self.memstore = memstore + self.__rflags = None self.__hdocset = None self.initialize_db() # ensure that we have a recent-flags and a hdocs-sec doc self._get_or_create_rdoc() - self._get_or_create_hdocset() + + # Not for now... + #self._get_or_create_hdocset() def _get_empty_doc(self, _type=FLAGS_DOC): """ @@ -1210,17 +839,21 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser, :type chash: basestring :return: False, if it does not exist, or UID. 
""" - exist = self._get_fdoc_from_chash(chash) + exist = False + if self.memstore is not None: + exist = self.memstore.get_fdoc_from_chash(chash, self.mbox) + + if not exist: + exist = self._get_fdoc_from_chash(chash) if exist: return exist.content.get(fields.UID_KEY, "unknown-uid") else: return False - @deferred - def add_msg(self, raw, subject=None, flags=None, date=None, uid=1): + def add_msg(self, raw, subject=None, flags=None, date=None, uid=None, + notify_on_disk=False): """ Creates a new message document. - Here lives the magic of the leap mail. Well, in soledad, really. :param raw: the raw message :type raw: str @@ -1236,27 +869,63 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser, :param uid: the message uid for this mailbox :type uid: int - """ - # TODO signal that we can delete the original message!----- - # when all the processing is done. - - # TODO add the linked-from info ! + :return: a deferred that will be fired with the message + uid when the adding succeed. + :rtype: deferred + """ logger.debug('adding message') if flags is None: flags = tuple() leap_assert_type(flags, tuple) + d = defer.Deferred() + self._do_add_msg(raw, flags, subject, date, notify_on_disk, d) + return d + + # We SHOULD defer this (or the heavy load here) to the thread pool, + # but it gives troubles with the QSocketNotifier used by Qt... + def _do_add_msg(self, raw, flags, subject, date, notify_on_disk, observer): + """ + Helper that creates a new message document. + Here lives the magic of the leap mail. Well, in soledad, really. + + See `add_msg` docstring for parameter info. + + :param observer: a deferred that will be fired with the message + uid when the adding succeed. + :type observer: deferred + """ + # TODO signal that we can delete the original message!----- + # when all the processing is done. + + # TODO add the linked-from info ! + # TODO add reference to the original message + # parse msg, chash, size, multi = self._do_parse(raw) - # check for uniqueness. - if self._fdoc_already_exists(chash): - logger.warning("We already have that message in this mailbox.") - # note that this operation will leave holes in the UID sequence, - # but we're gonna change that all the same for a local-only table. - # so not touch it by the moment. - return False + # check for uniqueness -------------------------------- + # XXX profiler says that this test is costly. + # So we probably should just do an in-memory check and + # move the complete check to the soledad writer? + # Watch out! We're reserving a UID right after this! + existing_uid = self._fdoc_already_exists(chash) + if existing_uid: + logger.warning("We already have that message in this " + "mailbox, unflagging as deleted") + uid = existing_uid + msg = self.get_msg_by_uid(uid) + msg.setFlags((fields.DELETED_FLAG,), -1) + + # XXX if this is deferred to thread again we should not use + # the callback in the deferred thread, but return and + # call the callback from the caller fun... 
+ observer.callback(uid) + return + + uid = self.memstore.increment_last_soledad_uid(self.mbox) + logger.info("ADDING MSG WITH UID: %s" % uid) fd = self._populate_flags(flags, uid, chash, size, multi) hd = self._populate_headr(msg, chash, subject, date) @@ -1273,48 +942,16 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser, hd[key] = parts_map[key] del parts_map - # Saving ---------------------------------------- - self.set_recent_flag(uid) - - # first, regular docs: flags and headers - self._soledad.create_doc(fd) - # XXX should check for content duplication on headers too - # but with chash. !!! - hdoc = self._soledad.create_doc(hd) - # We add the newly created hdoc to the fast-access set of - # headers documents associated with the mailbox. - self.add_hdocset_docid(hdoc.doc_id) - - # and last, but not least, try to create - # content docs if not already there. - cdocs = walk.get_raw_docs(msg, parts) - for cdoc in cdocs: - if not self._content_does_exist(cdoc): - self._soledad.create_doc(cdoc) + hd = stringify_parts_map(hd) - def _remove_cb(self, result): - return result - - def remove_all_deleted(self): - """ - Removes all messages flagged as deleted. - """ - delete_deferl = [] - for msg in self.get_deleted(): - delete_deferl.append(msg.remove()) - d1 = defer.gatherResults(delete_deferl, consumeErrors=True) - d1.addCallback(self._remove_cb) - return d1 + # The MessageContainer expects a dict, one-indexed + cdocs = dict(enumerate(walk.get_raw_docs(msg, parts), 1)) - def remove(self, msg): - """ - Remove a given msg. - :param msg: the message to be removed - :type msg: LeapMessage - """ - d = msg.remove() - d.addCallback(self._remove_cb) - return d + self.set_recent_flag(uid) + msg_container = MessageWrapper(fd, hd, cdocs) + self.memstore.create_message(self.mbox, uid, msg_container, + observer=observer, + notify_on_disk=notify_on_disk) # # getters: specific queries @@ -1326,32 +963,59 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser, """ An accessor for the recent-flags set for this mailbox. """ - if not self.__rflags: + # XXX check if we should remove this + if self.__rflags is not None: + return self.__rflags + + if self.memstore is not None: with self._rdoc_lock: - rdoc = self._get_recent_doc() - self.__rflags = set(rdoc.content.get( - fields.RECENTFLAGS_KEY, [])) - return self.__rflags + rflags = self.memstore.get_recent_flags(self.mbox) + if not rflags: + # not loaded in the memory store yet. + # let's fetch them from soledad... + rdoc = self._get_recent_doc() + rflags = set(rdoc.content.get( + fields.RECENTFLAGS_KEY, [])) + # ...and cache them now. + self.memstore.load_recent_flags( + self.mbox, + {'doc_id': rdoc.doc_id, 'set': rflags}) + return rflags + + #else: + # fallback for cases without memory store + #with self._rdoc_lock: + #rdoc = self._get_recent_doc() + #self.__rflags = set(rdoc.content.get( + #fields.RECENTFLAGS_KEY, [])) + #return self.__rflags def _set_recent_flags(self, value): """ Setter for the recent-flags set for this mailbox. """ - with self._rdoc_lock: - rdoc = self._get_recent_doc() - newv = set(value) - self.__rflags = newv - rdoc.content[fields.RECENTFLAGS_KEY] = list(newv) - # XXX should deferLater 0 it? 
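# ---------------------------------------------------------------------------
# add_msg() above now returns a deferred that is fired with the message UID
# once the MemoryStore has accepted the new message (or written it, when
# notify_on_disk=True). A caller-side usage sketch; `collection` is assumed
# to be an already initialized MessageCollection.
# ---------------------------------------------------------------------------
from twisted.python import log


def add_and_report(collection, raw):
    """Add a raw message and log the UID the returned deferred fires with."""
    d = collection.add_msg(raw, flags=('\\Recent',), notify_on_disk=True)
    d.addCallback(lambda uid: log.msg("added message, got UID %s" % (uid,)))
    d.addErrback(log.err)
    return d
# ---------------------------------------------------------------------------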
- self._soledad.put_doc(rdoc) + if self.memstore is not None: + self.memstore.set_recent_flags(self.mbox, value) + + #else: + # fallback for cases without memory store + #with self._rdoc_lock: + #rdoc = self._get_recent_doc() + #newv = set(value) + #self.__rflags = newv + #rdoc.content[fields.RECENTFLAGS_KEY] = list(newv) + # XXX should deferLater 0 it? + #self._soledad.put_doc(rdoc) recent_flags = property( _get_recent_flags, _set_recent_flags, doc="Set of UIDs with the recent flag for this mailbox.") + # XXX change naming, indicate soledad query. def _get_recent_doc(self): """ - Get recent-flags document for this mailbox. + Get recent-flags document from Soledad for this mailbox. + :rtype: SoledadDocument or None """ curried = partial( self._soledad.get_from_index, @@ -1367,100 +1031,39 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser, def unset_recent_flags(self, uids): """ Unset Recent flag for a sequence of uids. + + :param uids: the uids to unset + :type uid: sequence """ with self._rdoc_property_lock: - self.recent_flags = self.recent_flags.difference( + self.recent_flags.difference_update( set(uids)) + # Individual flags operations + def unset_recent_flag(self, uid): """ Unset Recent flag for a given uid. + + :param uid: the uid to unset + :type uid: int """ with self._rdoc_property_lock: - self.recent_flags = self.recent_flags.difference( + self.recent_flags.difference_update( set([uid])) + @deferred_to_thread def set_recent_flag(self, uid): """ Set Recent flag for a given uid. + + :param uid: the uid to set + :type uid: int """ with self._rdoc_property_lock: self.recent_flags = self.recent_flags.union( set([uid])) - # headers-docs-set - - def _get_hdocset(self): - """ - An accessor for the hdocs-set for this mailbox. - """ - if not self.__hdocset: - with self._hdocset_lock: - hdocset_doc = self._get_hdocset_doc() - value = set(hdocset_doc.content.get( - fields.HDOCS_SET_KEY, [])) - self.__hdocset = value - return self.__hdocset - - def _set_hdocset(self, value): - """ - Setter for the hdocs-set for this mailbox. - """ - with self._hdocset_lock: - hdocset_doc = self._get_hdocset_doc() - newv = set(value) - self.__hdocset = newv - hdocset_doc.content[fields.HDOCS_SET_KEY] = list(newv) - # XXX should deferLater 0 it? - self._soledad.put_doc(hdocset_doc) - - _hdocset = property( - _get_hdocset, _set_hdocset, - doc="Set of Document-IDs for the headers docs associated " - "with this mailbox.") - - def _get_hdocset_doc(self): - """ - Get hdocs-set document for this mailbox. - """ - curried = partial( - self._soledad.get_from_index, - fields.TYPE_MBOX_IDX, - fields.TYPE_HDOCS_SET_VAL, self.mbox) - curried.expected = "hdocset" - hdocset_doc = try_unique_query(curried) - return hdocset_doc - - # Property-set modification (protected by a different - # lock to give atomicity to the read/write operation) - - def remove_hdocset_docids(self, docids): - """ - Remove the given document IDs from the set of - header-documents associated with this mailbox. - """ - with self._hdocset_property_lock: - self._hdocset = self._hdocset.difference( - set(docids)) - - def remove_hdocset_docid(self, docid): - """ - Remove the given document ID from the set of - header-documents associated with this mailbox. - """ - with self._hdocset_property_lock: - self._hdocset = self._hdocset.difference( - set([docid])) - - def add_hdocset_docid(self, docid): - """ - Add the given document ID to the set of - header-documents associated with this mailbox. 
- """ - with self._hdocset_property_lock: - self._hdocset = self._hdocset.union( - set([docid])) - # individual doc getters, message layer. def _get_fdoc_from_chash(self, chash): @@ -1499,7 +1102,7 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser, return None return fdoc.content.get(fields.UID_KEY, None) - @deferred + @deferred_to_thread def _get_uid_from_msgid(self, msgid): """ Return a UID for a given message-id. @@ -1515,24 +1118,83 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser, # and we cannot find it otherwise. This seems to be enough. # XXX do a deferLater instead ?? - time.sleep(0.3) + # XXX is this working? return self._get_uid_from_msgidCb(msgid) + def set_flags(self, mbox, messages, flags, mode, observer): + """ + Set flags for a sequence of messages. + + :param mbox: the mbox this message belongs to + :type mbox: str or unicode + :param messages: the messages to iterate through + :type messages: sequence + :flags: the flags to be set + :type flags: tuple + :param mode: the mode for setting. 1 is append, -1 is remove, 0 set. + :type mode: int + :param observer: a deferred that will be called with the dictionary + mapping UIDs to flags after the operation has been + done. + :type observer: deferred + """ + # XXX we could defer *this* to thread pool, and gather results... + # XXX use deferredList + + deferreds = [] + for msg_id in messages: + deferreds.append( + self._set_flag_for_uid(msg_id, flags, mode)) + + def notify(result): + observer.callback(dict(result)) + d1 = defer.gatherResults(deferreds, consumeErrors=True) + d1.addCallback(notify) + + @deferred_to_thread + def _set_flag_for_uid(self, msg_id, flags, mode): + """ + Run the set_flag operation in the thread pool. + """ + log.msg("MSG ID = %s" % msg_id) + msg = self.get_msg_by_uid(msg_id, mem_only=True, flags_only=True) + if msg is not None: + return msg_id, msg.setFlags(flags, mode) + # getters: generic for a mailbox - def get_msg_by_uid(self, uid): + def get_msg_by_uid(self, uid, mem_only=False, flags_only=False): """ Retrieves a LeapMessage by UID. This is used primarity in the Mailbox fetch and store methods. :param uid: the message uid to query by :type uid: int + :param mem_only: a flag that indicates whether this Message should + pass a reference to soledad to retrieve missing pieces + or not. + :type mem_only: bool + :param flags_only: whether the message should carry only a reference + to the flags document. + :type flags_only: bool :return: A LeapMessage instance matching the query, or None if not found. :rtype: LeapMessage """ - msg = LeapMessage(self._soledad, uid, self.mbox, collection=self) + msg_container = self.memstore.get_message(self.mbox, uid, flags_only) + if msg_container is not None: + if mem_only: + msg = LeapMessage(None, uid, self.mbox, collection=self, + container=msg_container) + else: + # We pass a reference to soledad just to be able to retrieve + # missing parts that cannot be found in the container, like + # the content docs after a copy. 
+ msg = LeapMessage(self._soledad, uid, self.mbox, + collection=self, container=msg_container) + else: + msg = LeapMessage(self._soledad, uid, self.mbox, collection=self) if not msg.does_exist(): return None return msg @@ -1564,40 +1226,50 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser, # FIXME ---------------------------------------------- return sorted(all_docs, key=lambda item: item.content['uid']) - def all_uid_iter(self): + def all_soledad_uid_iter(self): """ - Return an iterator trhough the UIDs of all messages, sorted in + Return an iterator through the UIDs of all messages, sorted in ascending order. """ - # XXX we should get this from the uid table, local-only - all_uids = (doc.content[self.UID_KEY] for doc in - self._soledad.get_from_index( - fields.TYPE_MBOX_IDX, - fields.TYPE_FLAGS_VAL, self.mbox)) - return (u for u in sorted(all_uids)) + db_uids = set([doc.content[self.UID_KEY] for doc in + self._soledad.get_from_index( + fields.TYPE_MBOX_IDX, + fields.TYPE_FLAGS_VAL, self.mbox)]) + return db_uids - def reset_last_uid(self, param): + def all_uid_iter(self): """ - Set the last uid to the highest uid found. - Used while expunging, passed as a callback. + Return an iterator through the UIDs of all messages, from memory. """ - try: - self.last_uid = max(self.all_uid_iter()) + 1 - except ValueError: - # empty sequence - pass - return param + if self.memstore is not None: + mem_uids = self.memstore.get_uids(self.mbox) + soledad_known_uids = self.memstore.get_soledad_known_uids( + self.mbox) + combined = tuple(set(mem_uids).union(soledad_known_uids)) + return combined + # XXX MOVE to memstore def all_flags(self): """ Return a dict with all flags documents for this mailbox. """ + # XXX get all from memstore and cache it there + # FIXME should get all uids, get them fro memstore, + # and get only the missing ones from disk. + all_flags = dict((( doc.content[self.UID_KEY], doc.content[self.FLAGS_KEY]) for doc in self._soledad.get_from_index( fields.TYPE_MBOX_IDX, fields.TYPE_FLAGS_VAL, self.mbox))) + if self.memstore is not None: + uids = self.memstore.get_uids(self.mbox) + docs = ((uid, self.memstore.get_message(self.mbox, uid)) + for uid in uids) + for uid, doc in docs: + all_flags[uid] = doc.fdoc.content[self.FLAGS_KEY] + return all_flags def all_flags_chash(self): @@ -1630,9 +1302,12 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser, :rtype: int """ + # XXX We should cache this in memstore too until next write... count = self._soledad.get_count_from_index( fields.TYPE_MBOX_IDX, fields.TYPE_FLAGS_VAL, self.mbox) + if self.memstore is not None: + count += self.memstore.count_new() return count # unseen messages @@ -1674,6 +1349,7 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser, # recent messages + # XXX take it from memstore def count_recent(self): """ Count all messages with the `Recent` flag. @@ -1686,30 +1362,6 @@ class MessageCollection(WithMsgFields, IndexedDB, MailParser, MBoxParser, """ return len(self.recent_flags) - # deleted messages - - def deleted_iter(self): - """ - Get an iterator for the message UIDs with `deleted` flag. - - :return: iterator through deleted message docs - :rtype: iterable - """ - return (doc.content[self.UID_KEY] for doc in - self._soledad.get_from_index( - fields.TYPE_MBOX_DEL_IDX, - fields.TYPE_FLAGS_VAL, self.mbox, '1')) - - def get_deleted(self): - """ - Get all messages with the `Deleted` flag. 
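# ---------------------------------------------------------------------------
# set_flags() added above takes a deferred `observer` and fires it with a
# {uid: flags} mapping once every _set_flag_for_uid() call has finished in
# the thread pool. A caller-side sketch; `collection`, `mbox` and `uids`
# are assumed to come from the mailbox layer.
# ---------------------------------------------------------------------------
from twisted.internet import defer
from twisted.python import log


def mark_seen(collection, mbox, uids):
    """Append \\Seen to the given UIDs; returns the observer deferred."""
    observer = defer.Deferred()
    observer.addCallback(
        lambda uid2flags: log.msg("flags updated: %r" % (uid2flags,)))
    # mode 1 == append, as in LeapMessage.setFlags above
    collection.set_flags(mbox, uids, ('\\Seen',), 1, observer)
    return observer
# ---------------------------------------------------------------------------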
- - :returns: a generator of LeapMessages - :rtype: generator - """ - return (LeapMessage(self._soledad, docid, self.mbox) - for docid in self.deleted_iter()) - def __len__(self): """ Returns the number of messages on this mailbox. diff --git a/mail/src/leap/mail/imap/server.py b/mail/src/leap/mail/imap/server.py new file mode 100644 index 0000000..ba63846 --- /dev/null +++ b/mail/src/leap/mail/imap/server.py @@ -0,0 +1,217 @@ +# -*- coding: utf-8 -*- +# server.py +# Copyright (C) 2014 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +""" +Leap IMAP4 Server Implementation. +""" +from copy import copy + +from twisted import cred +from twisted.internet import defer +from twisted.internet.defer import maybeDeferred +from twisted.internet.task import deferLater +from twisted.mail import imap4 +from twisted.python import log + +from leap.common import events as leap_events +from leap.common.check import leap_assert, leap_assert_type +from leap.common.events.events_pb2 import IMAP_CLIENT_LOGIN +from leap.soledad.client import Soledad + + +class LeapIMAPServer(imap4.IMAP4Server): + """ + An IMAP4 Server with mailboxes backed by soledad + """ + def __init__(self, *args, **kwargs): + # pop extraneous arguments + soledad = kwargs.pop('soledad', None) + uuid = kwargs.pop('uuid', None) + userid = kwargs.pop('userid', None) + leap_assert(soledad, "need a soledad instance") + leap_assert_type(soledad, Soledad) + leap_assert(uuid, "need a user in the initialization") + + self._userid = userid + + # initialize imap server! + imap4.IMAP4Server.__init__(self, *args, **kwargs) + + # we should initialize the account here, + # but we move it to the factory so we can + # populate the test account properly (and only once + # per session) + + def lineReceived(self, line): + """ + Attempt to parse a single line from the server. + + :param line: the line from the server, without the line delimiter. + :type line: str + """ + if self.theAccount.closed is True and self.state != "unauth": + log.msg("Closing the session. State: unauth") + self.state = "unauth" + + if "login" in line.lower(): + # avoid to log the pass, even though we are using a dummy auth + # by now. + msg = line[:7] + " [...]" + else: + msg = copy(line) + log.msg('rcv (%s): %s' % (self.state, msg)) + imap4.IMAP4Server.lineReceived(self, line) + + def authenticateLogin(self, username, password): + """ + Lookup the account with the given parameters, and deny + the improper combinations. + + :param username: the username that is attempting authentication. + :type username: str + :param password: the password to authenticate with. + :type password: str + """ + # XXX this should use portal: + # return portal.login(cred.credentials.UsernamePassword(user, pass) + if username != self._userid: + # bad username, reject. + raise cred.error.UnauthorizedLogin() + # any dummy password is allowed so far. use realm instead! 
+ leap_events.signal(IMAP_CLIENT_LOGIN, "1") + return imap4.IAccount, self.theAccount, lambda: None + + def do_FETCH(self, tag, messages, query, uid=0): + """ + Overwritten fetch dispatcher to use the fast fetch_flags + method + """ + if not query: + self.sendPositiveResponse(tag, 'FETCH complete') + return + + cbFetch = self._IMAP4Server__cbFetch + ebFetch = self._IMAP4Server__ebFetch + + if len(query) == 1 and str(query[0]) == "flags": + self._oldTimeout = self.setTimeout(None) + # no need to call iter, we get a generator + maybeDeferred( + self.mbox.fetch_flags, messages, uid=uid + ).addCallback( + cbFetch, tag, query, uid + ).addErrback(ebFetch, tag) + elif len(query) == 1 and str(query[0]) == "rfc822.header": + self._oldTimeout = self.setTimeout(None) + # no need to call iter, we get a generator + maybeDeferred( + self.mbox.fetch_headers, messages, uid=uid + ).addCallback( + cbFetch, tag, query, uid + ).addErrback(ebFetch, tag) + else: + self._oldTimeout = self.setTimeout(None) + # no need to call iter, we get a generator + maybeDeferred( + self.mbox.fetch, messages, uid=uid + ).addCallback( + cbFetch, tag, query, uid + ).addErrback( + ebFetch, tag + ).addCallback( + self.on_fetch_finished, messages) + + select_FETCH = (do_FETCH, imap4.IMAP4Server.arg_seqset, + imap4.IMAP4Server.arg_fetchatt) + + def on_fetch_finished(self, _, messages): + from twisted.internet import reactor + + print "FETCH FINISHED -- NOTIFY NEW" + deferLater(reactor, 0, self.notifyNew) + deferLater(reactor, 0, self.mbox.unset_recent_flags, messages) + deferLater(reactor, 0, self.mbox.signal_unread_to_ui) + + def on_copy_finished(self, defers): + d = defer.gatherResults(filter(None, defers)) + + def when_finished(result): + log.msg("COPY FINISHED") + self.notifyNew() + self.mbox.signal_unread_to_ui() + d.addCallback(when_finished) + #d.addCallback(self.notifyNew) + #d.addCallback(self.mbox.signal_unread_to_ui) + + def do_COPY(self, tag, messages, mailbox, uid=0): + from twisted.internet import reactor + defers = [] + d = imap4.IMAP4Server.do_COPY(self, tag, messages, mailbox, uid) + defers.append(d) + deferLater(reactor, 0, self.on_copy_finished, defers) + + select_COPY = (do_COPY, imap4.IMAP4Server.arg_seqset, + imap4.IMAP4Server.arg_astring) + + def notifyNew(self, ignored=None): + """ + Notify new messages to listeners. + """ + print "TRYING TO NOTIFY NEW" + self.mbox.notify_new() + + def _cbSelectWork(self, mbox, cmdName, tag): + """ + Callback for selectWork, patched to avoid conformance errors due to + incomplete UIDVALIDITY line. + """ + if mbox is None: + self.sendNegativeResponse(tag, 'No such mailbox') + return + if '\\noselect' in [s.lower() for s in mbox.getFlags()]: + self.sendNegativeResponse(tag, 'Mailbox cannot be selected') + return + + flags = mbox.getFlags() + self.sendUntaggedResponse(str(mbox.getMessageCount()) + ' EXISTS') + self.sendUntaggedResponse(str(mbox.getRecentCount()) + ' RECENT') + self.sendUntaggedResponse('FLAGS (%s)' % ' '.join(flags)) + + # Patched ------------------------------------------------------- + # imaptest was complaining about the incomplete line, we're adding + # "UIDs valid" here. 
+ self.sendPositiveResponse( + None, '[UIDVALIDITY %d] UIDs valid' % mbox.getUIDValidity()) + # ---------------------------------------------------------------- + + s = mbox.isWriteable() and 'READ-WRITE' or 'READ-ONLY' + mbox.addListener(self) + self.sendPositiveResponse(tag, '[%s] %s successful' % (s, cmdName)) + self.state = 'select' + self.mbox = mbox + + def checkpoint(self): + """ + Called when the client issues a CHECK command. + + This should perform any checkpoint operations required by the server. + It may be a long running operation, but may not block. If it returns + a deferred, the client will only be informed of success (or failure) + when the deferred's callback (or errback) is invoked. + """ + # TODO return the output of _memstore.is_writing + # XXX and that should return a deferred! + return None diff --git a/mail/src/leap/mail/imap/service/imap.py b/mail/src/leap/mail/imap/service/imap.py index ad22da6..5487cfc 100644 --- a/mail/src/leap/mail/imap/service/imap.py +++ b/mail/src/leap/mail/imap/service/imap.py @@ -17,17 +17,12 @@ """ Imap service initialization """ -from copy import copy - import logging +import os from twisted.internet.protocol import ServerFactory -from twisted.internet.defer import maybeDeferred from twisted.internet.error import CannotListenError -from twisted.internet.task import deferLater from twisted.mail import imap4 -from twisted.python import log -from twisted import cred logger = logging.getLogger(__name__) @@ -36,6 +31,9 @@ from leap.common.check import leap_assert, leap_assert_type, leap_check from leap.keymanager import KeyManager from leap.mail.imap.account import SoledadBackedAccount from leap.mail.imap.fetch import LeapIncomingMail +from leap.mail.imap.memorystore import MemoryStore +from leap.mail.imap.server import LeapIMAPServer +from leap.mail.imap.soledadstore import SoledadStore from leap.soledad.client import Soledad # The default port in which imap service will run @@ -47,7 +45,6 @@ INCOMING_CHECK_PERIOD = 60 from leap.common.events.events_pb2 import IMAP_SERVICE_STARTED from leap.common.events.events_pb2 import IMAP_SERVICE_FAILED_TO_START -from leap.common.events.events_pb2 import IMAP_CLIENT_LOGIN ###################################################### # Temporary workaround for RecursionLimit when using @@ -68,160 +65,9 @@ except Exception: ###################################################### - -class LeapIMAPServer(imap4.IMAP4Server): - """ - An IMAP4 Server with mailboxes backed by soledad - """ - def __init__(self, *args, **kwargs): - # pop extraneous arguments - soledad = kwargs.pop('soledad', None) - uuid = kwargs.pop('uuid', None) - userid = kwargs.pop('userid', None) - leap_assert(soledad, "need a soledad instance") - leap_assert_type(soledad, Soledad) - leap_assert(uuid, "need a user in the initialization") - - self._userid = userid - - # initialize imap server! - imap4.IMAP4Server.__init__(self, *args, **kwargs) - - # we should initialize the account here, - # but we move it to the factory so we can - # populate the test account properly (and only once - # per session) - - def lineReceived(self, line): - """ - Attempt to parse a single line from the server. - - :param line: the line from the server, without the line delimiter. - :type line: str - """ - if self.theAccount.closed is True and self.state != "unauth": - log.msg("Closing the session. State: unauth") - self.state = "unauth" - - if "login" in line.lower(): - # avoid to log the pass, even though we are using a dummy auth - # by now. 
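# ---------------------------------------------------------------------------
# on_fetch_finished() in the new server.py above pushes the post-FETCH
# bookkeeping onto the next reactor iteration with deferLater(reactor, 0, ...)
# so the FETCH response itself is not delayed. The same pattern in isolation;
# `mbox` stands for any selected SoledadMailbox.
# ---------------------------------------------------------------------------
from twisted.internet import reactor
from twisted.internet.task import deferLater


def schedule_post_fetch(mbox, messages):
    """Schedule notification/Recent/unread housekeeping after a FETCH."""
    deferLater(reactor, 0, mbox.notify_new)
    deferLater(reactor, 0, mbox.unset_recent_flags, messages)
    deferLater(reactor, 0, mbox.signal_unread_to_ui)
# ---------------------------------------------------------------------------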
- msg = line[:7] + " [...]" - else: - msg = copy(line) - log.msg('rcv (%s): %s' % (self.state, msg)) - imap4.IMAP4Server.lineReceived(self, line) - - def authenticateLogin(self, username, password): - """ - Lookup the account with the given parameters, and deny - the improper combinations. - - :param username: the username that is attempting authentication. - :type username: str - :param password: the password to authenticate with. - :type password: str - """ - # XXX this should use portal: - # return portal.login(cred.credentials.UsernamePassword(user, pass) - if username != self._userid: - # bad username, reject. - raise cred.error.UnauthorizedLogin() - # any dummy password is allowed so far. use realm instead! - leap_events.signal(IMAP_CLIENT_LOGIN, "1") - return imap4.IAccount, self.theAccount, lambda: None - - def do_FETCH(self, tag, messages, query, uid=0): - """ - Overwritten fetch dispatcher to use the fast fetch_flags - method - """ - from twisted.internet import reactor - if not query: - self.sendPositiveResponse(tag, 'FETCH complete') - return # XXX ??? - - cbFetch = self._IMAP4Server__cbFetch - ebFetch = self._IMAP4Server__ebFetch - - if len(query) == 1 and str(query[0]) == "flags": - self._oldTimeout = self.setTimeout(None) - # no need to call iter, we get a generator - maybeDeferred( - self.mbox.fetch_flags, messages, uid=uid - ).addCallback( - cbFetch, tag, query, uid - ).addErrback(ebFetch, tag) - elif len(query) == 1 and str(query[0]) == "rfc822.header": - self._oldTimeout = self.setTimeout(None) - # no need to call iter, we get a generator - maybeDeferred( - self.mbox.fetch_headers, messages, uid=uid - ).addCallback( - cbFetch, tag, query, uid - ).addErrback(ebFetch, tag) - else: - self._oldTimeout = self.setTimeout(None) - # no need to call iter, we get a generator - maybeDeferred( - self.mbox.fetch, messages, uid=uid - ).addCallback( - cbFetch, tag, query, uid - ).addErrback( - ebFetch, tag) - - deferLater(reactor, - 2, self.mbox.unset_recent_flags, messages) - deferLater(reactor, 1, self.mbox.signal_unread_to_ui) - - select_FETCH = (do_FETCH, imap4.IMAP4Server.arg_seqset, - imap4.IMAP4Server.arg_fetchatt) - - def do_COPY(self, tag, messages, mailbox, uid=0): - from twisted.internet import reactor - imap4.IMAP4Server.do_COPY(self, tag, messages, mailbox, uid) - deferLater(reactor, - 2, self.mbox.unset_recent_flags, messages) - deferLater(reactor, 1, self.mbox.signal_unread_to_ui) - - select_COPY = (do_COPY, imap4.IMAP4Server.arg_seqset, - imap4.IMAP4Server.arg_astring) - - def notifyNew(self, ignored): - """ - Notify new messages to listeners. - """ - self.mbox.notify_new() - - def _cbSelectWork(self, mbox, cmdName, tag): - """ - Callback for selectWork, patched to avoid conformance errors due to - incomplete UIDVALIDITY line. - """ - if mbox is None: - self.sendNegativeResponse(tag, 'No such mailbox') - return - if '\\noselect' in [s.lower() for s in mbox.getFlags()]: - self.sendNegativeResponse(tag, 'Mailbox cannot be selected') - return - - flags = mbox.getFlags() - self.sendUntaggedResponse(str(mbox.getMessageCount()) + ' EXISTS') - self.sendUntaggedResponse(str(mbox.getRecentCount()) + ' RECENT') - self.sendUntaggedResponse('FLAGS (%s)' % ' '.join(flags)) - - # Patched ------------------------------------------------------- - # imaptest was complaining about the incomplete line, we're adding - # "UIDs valid" here. 
- self.sendPositiveResponse( - None, '[UIDVALIDITY %d] UIDs valid' % mbox.getUIDValidity()) - # ---------------------------------------------------------------- - - s = mbox.isWriteable() and 'READ-WRITE' or 'READ-ONLY' - mbox.addListener(self) - self.sendPositiveResponse(tag, '[%s] %s successful' % (s, cmdName)) - self.state = 'select' - self.mbox = mbox +DO_MANHOLE = os.environ.get("LEAP_MAIL_MANHOLE", None) +if DO_MANHOLE: + from leap.mail.imap.service import manhole class IMAPAuthRealm(object): @@ -256,11 +102,16 @@ class LeapIMAPFactory(ServerFactory): self._uuid = uuid self._userid = userid self._soledad = soledad + self._memstore = MemoryStore( + permanent_store=SoledadStore(soledad)) theAccount = SoledadBackedAccount( - uuid, soledad=soledad) + uuid, soledad=soledad, + memstore=self._memstore) self.theAccount = theAccount + # XXX how to pass the store along? + def buildProtocol(self, addr): "Return a protocol suitable for the job." imapProtocol = LeapIMAPServer( @@ -281,6 +132,8 @@ def run_service(*args, **kwargs): the reactor when starts listening, and the factory for the protocol. """ + from twisted.internet import reactor + leap_assert(len(args) == 2) soledad, keymanager = args leap_assert_type(soledad, Soledad) @@ -295,8 +148,6 @@ def run_service(*args, **kwargs): uuid = soledad._get_uuid() factory = LeapIMAPFactory(uuid, userid, soledad) - from twisted.internet import reactor - try: tport = reactor.listenTCP(port, factory, interface="localhost") @@ -317,6 +168,16 @@ def run_service(*args, **kwargs): else: # all good. # (the caller has still to call fetcher.start_loop) + + if DO_MANHOLE: + # TODO get pass from env var.too. + manhole_factory = manhole.getManholeFactory( + {'f': factory, + 'a': factory.theAccount, + 'gm': factory.theAccount.getMailbox}, + "boss", "leap") + reactor.listenTCP(manhole.MANHOLE_PORT, manhole_factory, + interface="127.0.0.1") logger.debug("IMAP4 Server is RUNNING in port %s" % (port,)) leap_events.signal(IMAP_SERVICE_STARTED, str(port)) return fetcher, tport, factory diff --git a/mail/src/leap/mail/imap/service/manhole.py b/mail/src/leap/mail/imap/service/manhole.py new file mode 100644 index 0000000..c83ae89 --- /dev/null +++ b/mail/src/leap/mail/imap/service/manhole.py @@ -0,0 +1,130 @@ +# -*- coding: utf-8 -*- +# manhole.py +# Copyright (C) 2014 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +""" +Utilities for enabling the manhole administrative interface into the +LEAP Mail application. +""" +MANHOLE_PORT = 2222 + + +def getManholeFactory(namespace, user, secret): + """ + Get an administrative manhole into the application. + + :param namespace: the namespace to show in the manhole + :type namespace: dict + :param user: the user to authenticate into the administrative shell. 
+ :type user: str + :param secret: pass for this manhole + :type secret: str + """ + import string + + from twisted.cred.portal import Portal + from twisted.conch import manhole, manhole_ssh + from twisted.conch.insults import insults + from twisted.cred.checkers import ( + InMemoryUsernamePasswordDatabaseDontUse as MemoryDB) + + from rlcompleter import Completer + + class EnhancedColoredManhole(manhole.ColoredManhole): + """ + A Manhole with some primitive autocomplete support. + """ + # TODO use introspection to make life easier + + def find_common(self, l): + """ + find common parts in thelist items + ex: 'ab' for ['abcd','abce','abf'] + requires an ordered list + """ + if len(l) == 1: + return l[0] + + init = l[0] + for item in l[1:]: + for i, (x, y) in enumerate(zip(init, item)): + if x != y: + init = "".join(init[:i]) + break + + if not init: + return None + return init + + def handle_TAB(self): + """ + Trap the TAB keystroke. + """ + necessarypart = "".join(self.lineBuffer).split(' ')[-1] + completer = Completer(globals()) + if completer.complete(necessarypart, 0): + matches = list(set(completer.matches)) # has multiples + + if len(matches) == 1: + length = len(necessarypart) + self.lineBuffer = self.lineBuffer[:-length] + self.lineBuffer.extend(matches[0]) + self.lineBufferIndex = len(self.lineBuffer) + else: + matches.sort() + commons = self.find_common(matches) + if commons: + length = len(necessarypart) + self.lineBuffer = self.lineBuffer[:-length] + self.lineBuffer.extend(commons) + self.lineBufferIndex = len(self.lineBuffer) + + self.terminal.nextLine() + while matches: + matches, part = matches[4:], matches[:4] + for item in part: + self.terminal.write('%s' % item.ljust(30)) + self.terminal.write('\n') + self.terminal.nextLine() + + self.terminal.eraseLine() + self.terminal.cursorBackward(self.lineBufferIndex + 5) + self.terminal.write("%s %s" % ( + self.ps[self.pn], "".join(self.lineBuffer))) + + def keystrokeReceived(self, keyID, modifier): + """ + Act upon any keystroke received. + """ + self.keyHandlers.update({'\b': self.handle_BACKSPACE}) + m = self.keyHandlers.get(keyID) + if m is not None: + m() + elif keyID in string.printable: + self.characterReceived(keyID, False) + + sshRealm = manhole_ssh.TerminalRealm() + + def chainedProtocolFactory(): + return insults.ServerProtocol(EnhancedColoredManhole, namespace) + + sshRealm = manhole_ssh.TerminalRealm() + sshRealm.chainedProtocolFactory = chainedProtocolFactory + + portal = Portal( + sshRealm, [MemoryDB(**{user: secret})]) + + f = manhole_ssh.ConchFactory(portal) + return f diff --git a/mail/src/leap/mail/imap/soledadstore.py b/mail/src/leap/mail/imap/soledadstore.py new file mode 100644 index 0000000..8e22f26 --- /dev/null +++ b/mail/src/leap/mail/imap/soledadstore.py @@ -0,0 +1,487 @@ +# -*- coding: utf-8 -*- +# soledadstore.py +# Copyright (C) 2014 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. 
+""" +A MessageStore that writes to Soledad. +""" +import logging +import threading + +from itertools import chain + +from u1db import errors as u1db_errors +from twisted.internet import defer +from twisted.python import log +from zope.interface import implements + +from leap.common.check import leap_assert_type +from leap.mail.decorators import deferred_to_thread +from leap.mail.imap.messageparts import MessagePartType +from leap.mail.imap.messageparts import MessageWrapper +from leap.mail.imap.messageparts import RecentFlagsDoc +from leap.mail.imap.fields import fields +from leap.mail.imap.interfaces import IMessageStore +from leap.mail.messageflow import IMessageConsumer +from leap.mail.utils import first + +logger = logging.getLogger(__name__) + + +# TODO +# [ ] Delete original message from the incoming queue after all successful +# writes. +# [ ] Implement a retry queue. +# [ ] Consider journaling of operations. + + +class ContentDedup(object): + """ + Message deduplication. + + We do a query for the content hashes before writing to our beloved + sqlcipher backend of Soledad. This means, by now, that: + + 1. We will not store the same body/attachment twice, only the hash of it. + 2. We will not store the same message header twice, only the hash of it. + + The first case is useful if you are always receiving the same old memes + from unwary friends that still have not discovered that 4chan is the + generator of the internet. The second will save your day if you have + initiated session with the same account in two different machines. I also + wonder why would you do that, but let's respect each other choices, like + with the religious celebrations, and assume that one day we'll be able + to run Bitmask in completely free phones. Yes, I mean that, the whole GSM + Stack. + """ + # TODO refactor using unique_query + + def _header_does_exist(self, doc): + """ + Check whether we already have a header document for this + content hash in our database. + + :param doc: tentative header for document + :type doc: dict + :returns: True if it exists, False otherwise. + """ + if not doc: + return False + chash = doc[fields.CONTENT_HASH_KEY] + header_docs = self._soledad.get_from_index( + fields.TYPE_C_HASH_IDX, + fields.TYPE_HEADERS_VAL, str(chash)) + if not header_docs: + return False + + if len(header_docs) != 1: + logger.warning("Found more than one copy of chash %s!" + % (chash,)) + logger.debug("Found header doc with that hash! Skipping save!") + return True + + def _content_does_exist(self, doc): + """ + Check whether we already have a content document for a payload + with this hash in our database. + + :param doc: tentative content for document + :type doc: dict + :returns: True if it exists, False otherwise. + """ + if not doc: + return False + phash = doc[fields.PAYLOAD_HASH_KEY] + attach_docs = self._soledad.get_from_index( + fields.TYPE_P_HASH_IDX, + fields.TYPE_CONTENT_VAL, str(phash)) + if not attach_docs: + return False + + if len(attach_docs) != 1: + logger.warning("Found more than one copy of phash %s!" + % (phash,)) + logger.debug("Found attachment doc with that hash! Skipping save!") + return True + + +class MsgWriteError(Exception): + """ + Raised if any exception is found while saving message parts. + """ + + +class SoledadStore(ContentDedup): + """ + This will create docs in the local Soledad database. 
+ """ + _last_uid_lock = threading.Lock() + + implements(IMessageConsumer, IMessageStore) + + def __init__(self, soledad): + """ + Initialize the permanent store that writes to Soledad database. + + :param soledad: the soledad instance + :type soledad: Soledad + """ + self._soledad = soledad + + # IMessageStore + + # ------------------------------------------------------------------- + # We are not yet using this interface, but it would make sense + # to implement it. + + def create_message(self, mbox, uid, message): + """ + Create the passed message into this SoledadStore. + + :param mbox: the mbox this message belongs. + :type mbox: str or unicode + :param uid: the UID that identifies this message in this mailbox. + :type uid: int + :param message: a IMessageContainer implementor. + """ + raise NotImplementedError() + + def put_message(self, mbox, uid, message): + """ + Put the passed existing message into this SoledadStore. + + :param mbox: the mbox this message belongs. + :type mbox: str or unicode + :param uid: the UID that identifies this message in this mailbox. + :type uid: int + :param message: a IMessageContainer implementor. + """ + raise NotImplementedError() + + def remove_message(self, mbox, uid): + """ + Remove the given message from this SoledadStore. + + :param mbox: the mbox this message belongs. + :type mbox: str or unicode + :param uid: the UID that identifies this message in this mailbox. + :type uid: int + """ + raise NotImplementedError() + + def get_message(self, mbox, uid): + """ + Get a IMessageContainer for the given mbox and uid combination. + + :param mbox: the mbox this message belongs. + :type mbox: str or unicode + :param uid: the UID that identifies this message in this mailbox. + :type uid: int + """ + raise NotImplementedError() + + # IMessageConsumer + + # It's not thread-safe to defer this to a different thread + + def consume(self, queue): + """ + Creates a new document in soledad db. + + :param queue: queue to get item from, with content of the document + to be inserted. + :type queue: Queue + """ + # TODO should delete the original message from incoming only after + # the writes are done. + # TODO should handle the delete case + # TODO should handle errors + # TODO could generalize this method into a generic consumer + # and only implement `process` here + + def docWriteCallBack(doc_wrapper): + """ + Callback for a successful write of a document wrapper. + """ + if isinstance(doc_wrapper, MessageWrapper): + # If everything went well, we can unset the new flag + # in the source store (memory store) + self._unset_new_dirty(doc_wrapper) + + def docWriteErrorBack(failure): + """ + Errorback for write operations. + """ + log.error("Error while processing item.") + log.msg(failure.getTraceBack()) + + while not queue.empty(): + doc_wrapper = queue.get() + d = defer.Deferred() + d.addCallbacks(docWriteCallBack, docWriteErrorBack) + + self._consume_doc(doc_wrapper, d) + + @deferred_to_thread + def _unset_new_dirty(self, doc_wrapper): + """ + Unset the `new` and `dirty` flags for this document wrapper in the + memory store. + + :param doc_wrapper: a MessageWrapper instance + :type doc_wrapper: MessageWrapper + """ + # XXX debug msg id/mbox? + logger.info("unsetting new flag!") + doc_wrapper.new = False + doc_wrapper.dirty = False + + @deferred_to_thread + def _consume_doc(self, doc_wrapper, deferred): + """ + Consume each document wrapper in a separate thread. 
+ + :param doc_wrapper: a MessageWrapper or RecentFlagsDoc instance + :type doc_wrapper: MessageWrapper or RecentFlagsDoc + :param deferred: a deferred that will be fired when the write operation + has finished, either calling its callback or its + errback depending on whether it succeed. + :type deferred: Deferred + """ + items = self._process(doc_wrapper) + + # we prime the generator, that should return the + # message or flags wrapper item in the first place. + doc_wrapper = items.next() + + # From here, we unpack the subpart items and + # the right soledad call. + failed = False + for item, call in items: + try: + self._try_call(call, item) + except Exception as exc: + failed = exc + continue + if failed: + deferred.errback(MsgWriteError( + "There was an error writing the mesage")) + else: + deferred.callback(doc_wrapper) + + # + # SoledadStore specific methods. + # + + def _process(self, doc_wrapper): + """ + Return an iterator that will yield the doc_wrapper in the first place, + followed by the subparts item and the proper call type for every + item in the queue, if any. + + :param queue: the queue from where we'll pick item. + :type queue: Queue + """ + if isinstance(doc_wrapper, MessageWrapper): + return chain((doc_wrapper,), + self._get_calls_for_msg_parts(doc_wrapper)) + elif isinstance(doc_wrapper, RecentFlagsDoc): + return chain((doc_wrapper,), + self._get_calls_for_rflags_doc(doc_wrapper)) + else: + logger.warning("CANNOT PROCESS ITEM!") + return (i for i in []) + + def _try_call(self, call, item): + """ + Try to invoke a given call with item as a parameter. + + :param call: the function to call + :type call: callable + :param item: the payload to pass to the call as argument + :type item: object + """ + if call is None: + return + try: + call(item) + except u1db_errors.RevisionConflict as exc: + logger.exception("Error: %r" % (exc,)) + raise exc + + def _get_calls_for_msg_parts(self, msg_wrapper): + """ + Generator that return the proper call type for a given item. + + :param msg_wrapper: A MessageWrapper + :type msg_wrapper: IMessageContainer + :return: a generator of tuples with recent-flags doc payload + and callable + :rtype: generator + """ + call = None + + if msg_wrapper.new: + call = self._soledad.create_doc + + # item is expected to be a MessagePartDoc + for item in msg_wrapper.walk(): + if item.part == MessagePartType.fdoc: + yield dict(item.content), call + + elif item.part == MessagePartType.hdoc: + if not self._header_does_exist(item.content): + yield dict(item.content), call + + elif item.part == MessagePartType.cdoc: + if not self._content_does_exist(item.content): + yield dict(item.content), call + + # For now, the only thing that will be dirty is + # the flags doc. + + elif msg_wrapper.dirty: + call = self._soledad.put_doc + # item is expected to be a MessagePartDoc + for item in msg_wrapper.walk(): + # XXX FIXME Give error if dirty and not doc_id !!! + doc_id = item.doc_id # defend! + if not doc_id: + continue + doc = self._soledad.get_doc(doc_id) + doc.content = dict(item.content) + if item.part == MessagePartType.fdoc: + logger.debug("PUT dirty fdoc") + yield doc, call + + # XXX also for linkage-doc !!! + else: + logger.error("Cannot delete documents yet from the queue...!") + + def _get_calls_for_rflags_doc(self, rflags_wrapper): + """ + We always put these documents. + + :param rflags_wrapper: A wrapper around recent flags doc. 
+ :type rflags_wrapper: RecentFlagsWrapper + :return: a tuple with recent-flags doc payload and callable + :rtype: tuple + """ + call = self._soledad.put_doc + rdoc = self._soledad.get_doc(rflags_wrapper.doc_id) + + payload = rflags_wrapper.content + logger.debug("Saving RFLAGS to Soledad...") + + if payload: + rdoc.content = payload + yield rdoc, call + + def _get_mbox_document(self, mbox): + """ + Return mailbox document. + + :param mbox: the mailbox + :type mbox: str or unicode + :return: A SoledadDocument containing this mailbox, or None if + the query failed. + :rtype: SoledadDocument or None. + """ + try: + query = self._soledad.get_from_index( + fields.TYPE_MBOX_IDX, + fields.TYPE_MBOX_VAL, mbox) + if query: + return query.pop() + except Exception as exc: + logger.exception("Unhandled error %r" % exc) + + def get_flags_doc(self, mbox, uid): + """ + Return the SoledadDocument for the given mbox and uid. + + :param mbox: the mailbox + :type mbox: str or unicode + :param uid: the UID for the message + :type uid: int + """ + result = None + try: + flag_docs = self._soledad.get_from_index( + fields.TYPE_MBOX_UID_IDX, + fields.TYPE_FLAGS_VAL, mbox, str(uid)) + result = first(flag_docs) + except Exception as exc: + # ugh! Something's broken down there! + logger.warning("ERROR while getting flags for UID: %s" % uid) + logger.exception(exc) + finally: + return result + + def write_last_uid(self, mbox, value): + """ + Write the `last_uid` integer to the proper mailbox document + in Soledad. + This is called from the deferred triggered by + memorystore.increment_last_soledad_uid, which is expected to + run in a separate thread. + + :param mbox: the mailbox + :type mbox: str or unicode + :param value: the value to set + :type value: int + """ + leap_assert_type(value, int) + key = fields.LAST_UID_KEY + + with self._last_uid_lock: + mbox_doc = self._get_mbox_document(mbox) + old_val = mbox_doc.content[key] + if value < old_val: + logger.error("%r:%s Tried to write a UID lesser than what's " + "stored!" % (mbox, value)) + mbox_doc.content[key] = value + self._soledad.put_doc(mbox_doc) + + # deleted messages + + def deleted_iter(self, mbox): + """ + Get an iterator for the SoledadDocuments for messages + with \\Deleted flag for a given mailbox. + + :param mbox: the mailbox + :type mbox: str or unicode + :return: iterator through deleted message docs + :rtype: iterable + """ + return (doc for doc in self._soledad.get_from_index( + fields.TYPE_MBOX_DEL_IDX, + fields.TYPE_FLAGS_VAL, mbox, '1')) + + # TODO can deferToThread this? + def remove_all_deleted(self, mbox): + """ + Remove from Soledad all messages flagged as deleted for a given + mailbox. 
+ + :param mbox: the mailbox + :type mbox: str or unicode + """ + deleted = [] + for doc in self.deleted_iter(mbox): + deleted.append(doc.content[fields.UID_KEY]) + self._soledad.delete_doc(doc) + return deleted diff --git a/mail/src/leap/mail/imap/tests/leap_tests_imap.zsh b/mail/src/leap/mail/imap/tests/leap_tests_imap.zsh index 676d1a8..544faca 100755 --- a/mail/src/leap/mail/imap/tests/leap_tests_imap.zsh +++ b/mail/src/leap/mail/imap/tests/leap_tests_imap.zsh @@ -76,7 +76,7 @@ imaptest_cmd() { } stress_imap() { - mknod imap_pipe p + mkfifo imap_pipe cat imap_pipe | tee output & imaptest_cmd >> imap_pipe } @@ -99,7 +99,7 @@ print_results() { echo "----------------------" echo "\tavg\tstdev" $GREP "avg" ./output | sed -e 's/^ *//g' -e 's/ *$//g' | \ - awk ' + gawk ' function avg(data, count) { sum=0; for( x=0; x <= count-1; x++) { diff --git a/mail/src/leap/mail/imap/tests/walktree.py b/mail/src/leap/mail/imap/tests/walktree.py index 1626f65..f3cbcb0 100644 --- a/mail/src/leap/mail/imap/tests/walktree.py +++ b/mail/src/leap/mail/imap/tests/walktree.py @@ -18,12 +18,14 @@ Tests for the walktree module. """ import os +import sys from email import parser from leap.mail import walk as W DEBUG = os.environ.get("BITMASK_MAIL_DEBUG") + p = parser.Parser() # TODO pass an argument of the type of message @@ -31,9 +33,17 @@ p = parser.Parser() ################################################## # Input from hell -#msg = p.parse(open('rfc822.multi-signed.message')) -#msg = p.parse(open('rfc822.plain.message')) -msg = p.parse(open('rfc822.multi-minimal.message')) +if len(sys.argv) > 1: + FILENAME = sys.argv[1] +else: + FILENAME = "rfc822.multi-minimal.message" + +""" +FILENAME = "rfc822.multi-signed.message" +FILENAME = "rfc822.plain.message" +""" + +msg = p.parse(open(FILENAME)) DO_CHECK = False ################################################# diff --git a/mail/src/leap/mail/load_tests.py b/mail/src/leap/mail/load_tests.py index ee89fcc..be65b8d 100644 --- a/mail/src/leap/mail/load_tests.py +++ b/mail/src/leap/mail/load_tests.py @@ -14,12 +14,9 @@ # # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. - - """ Provide a function for loading tests. """ - import unittest diff --git a/mail/src/leap/mail/messageflow.py b/mail/src/leap/mail/messageflow.py index ac26e45..b7fc030 100644 --- a/mail/src/leap/mail/messageflow.py +++ b/mail/src/leap/mail/messageflow.py @@ -25,12 +25,15 @@ from zope.interface import Interface, implements class IMessageConsumer(Interface): + """ + I consume messages from a queue. + """ def consume(self, queue): """ Consumes the passed item. - :param item: q queue where we put the object to be consumed. + :param item: a queue where we put the object to be consumed. :type item: object """ # TODO we could add an optional type to be passed @@ -40,6 +43,28 @@ class IMessageConsumer(Interface): # the queue, maybe wrapped in an object with a retries attribute. +class IMessageProducer(Interface): + """ + I produce messages and put them in a store to be consumed by other + entities. + """ + + def push(self, item): + """ + Push a new item in the queue. + """ + + def start(self): + """ + Start producing items. + """ + + def stop(self): + """ + Stop producing items. + """ + + class DummyMsgConsumer(object): implements(IMessageConsumer) @@ -62,6 +87,8 @@ class MessageProducer(object): deferred chain and leave further processing detached from the calling loop, as in the case of smtp. 
""" + implements(IMessageProducer) + # TODO this can be seen as a first step towards properly implementing # components that implement IPushProducer / IConsumer interfaces. # However, I need to think more about how to pause the streaming. @@ -92,21 +119,27 @@ class MessageProducer(object): def _check_for_new(self): """ - Checks for new items in the internal queue, and calls the consume + Check for new items in the internal queue, and calls the consume method in the consumer. If the queue is found empty, the loop is stopped. It will be started again after the addition of new items. """ self._consumer.consume(self._queue) - if self._queue.empty(): + if self.is_queue_empty(): self.stop() - # public methods + def is_queue_empty(self): + """ + Return True if queue is empty, False otherwise. + """ + return self._queue.empty() + + # public methods: IMessageProducer - def put(self, item): + def push(self, item): """ - Puts a new item in the queue. + Push a new item in the queue. If the queue was empty, we will start the loop again. """ @@ -117,7 +150,7 @@ class MessageProducer(object): def start(self): """ - Starts polling for new items. + Start polling for new items. """ if not self._loop.running: self._loop.start(self._period, now=True) diff --git a/mail/src/leap/mail/size.py b/mail/src/leap/mail/size.py new file mode 100644 index 0000000..c9eaabd --- /dev/null +++ b/mail/src/leap/mail/size.py @@ -0,0 +1,57 @@ +# -*- coding: utf-8 -*- +# size.py +# Copyright (C) 2014 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +""" +Recursively get size of objects. +""" +from gc import collect +from itertools import chain +from sys import getsizeof + + +def _get_size(item, seen): + known_types = {dict: lambda d: chain.from_iterable(d.items())} + default_size = getsizeof(0) + + def size_walk(item): + if id(item) in seen: + return 0 + seen.add(id(item)) + s = getsizeof(item, default_size) + for _type, fun in known_types.iteritems(): + if isinstance(item, _type): + s += sum(map(size_walk, fun(item))) + break + return s + + return size_walk(item) + + +def get_size(item): + """ + Return the cumulative size of a given object. + + Currently it supports only dictionaries, and seemingly leaks + some memory, so use with care. + + :param item: the item which size wants to be computed + :rtype: int + """ + seen = set() + size = _get_size(item, seen) + del seen + collect() + return size diff --git a/mail/src/leap/mail/utils.py b/mail/src/leap/mail/utils.py index 6c79227..942acfb 100644 --- a/mail/src/leap/mail/utils.py +++ b/mail/src/leap/mail/utils.py @@ -17,10 +17,13 @@ """ Mail utilities. 
""" +import copy import json import re import traceback +from leap.soledad.common.document import SoledadDocument + CHARSET_PATTERN = r"""charset=([\w-]+)""" CHARSET_RE = re.compile(CHARSET_PATTERN, re.IGNORECASE) @@ -36,6 +39,28 @@ def first(things): return None +def empty(thing): + """ + Return True if a thing is None or its length is zero. + """ + if thing is None: + return True + if isinstance(thing, SoledadDocument): + thing = thing.content + try: + return len(thing) == 0 + except ReferenceError: + return True + + +def maybe_call(thing): + """ + Return the same thing, or the result of its invocation if it is a + callable. + """ + return thing() if callable(thing) else thing + + def find_charset(thing, default=None): """ Looks into the object 'thing' for a charset specification. @@ -46,16 +71,65 @@ def find_charset(thing, default=None): :param default: the dafault charset to return if no charset is found. :type default: str - :returns: the charset or 'default' + :return: the charset or 'default' :rtype: str or None """ charset = first(CHARSET_RE.findall(repr(thing))) if charset is None: charset = default - return charset +def lowerdict(_dict): + """ + Return a dict with the keys in lowercase. + + :param _dict: the dict to convert + :rtype: dict + """ + # TODO should properly implement a CaseInsensitive dict. + # Look into requests code. + return dict((key.lower(), value) + for key, value in _dict.items()) + + +PART_MAP = "part_map" + + +def _str_dict(d, k): + """ + Convert the dictionary key to string if it was a string. + + :param d: the dict + :type d: dict + :param k: the key + :type k: object + """ + if isinstance(k, int): + val = d[k] + d[str(k)] = val + del(d[k]) + + +def stringify_parts_map(d): + """ + Modify a dictionary making all the nested dicts under "part_map" keys + having strings as keys. + + :param d: the dictionary to modify + :type d: dictionary + :rtype: dictionary + """ + for k in d: + if k == PART_MAP: + pmap = d[k] + for kk in pmap.keys(): + _str_dict(d[k], kk) + for kk in pmap.keys(): + stringify_parts_map(d[k][str(kk)]) + return d + + class CustomJsonScanner(object): """ This class is a context manager definition used to monkey patch the default diff --git a/mail/src/leap/mail/walk.py b/mail/src/leap/mail/walk.py index 30cb70a..49f2c22 100644 --- a/mail/src/leap/mail/walk.py +++ b/mail/src/leap/mail/walk.py @@ -176,3 +176,36 @@ def walk_msg_tree(parts, body_phash=None): pdoc = outer pdoc[BODY] = body_phash return pdoc + +""" +Groucho Marx: Now pay particular attention to this first clause, because it's + most important. There's the party of the first part shall be + known in this contract as the party of the first part. How do you + like that, that's pretty neat eh? + +Chico Marx: No, that's no good. +Groucho Marx: What's the matter with it? + +Chico Marx: I don't know, let's hear it again. +Groucho Marx: So the party of the first part shall be known in this contract as + the party of the first part. + +Chico Marx: Well it sounds a little better this time. +Groucho Marx: Well, it grows on you. Would you like to hear it once more? + +Chico Marx: Just the first part. +Groucho Marx: All right. It says the first part of the party of the first part + shall be known in this contract as the first part of the party of + the first part, shall be known in this contract - look, why + should we quarrel about a thing like this, we'll take it right + out, eh? + +Chico Marx: Yes, it's too long anyhow. Now what have we got left? 
+Groucho Marx: Well I've got about a foot and a half. Now what's the matter?
+
+Chico Marx: I don't like the second party either.
+"""
+
+"""
+I feel you deserved it after reading the above and try to debug your problem ;)
+"""
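
The imap.py changes in this diff have LeapIMAPFactory build a MemoryStore whose permanent store is a SoledadStore, and pass it to SoledadBackedAccount. A minimal wiring sketch, following the factory code above; get_soledad_instance and the uuid value are placeholders, not part of this branch:

    from leap.mail.imap.memorystore import MemoryStore
    from leap.mail.imap.soledadstore import SoledadStore
    from leap.mail.imap.account import SoledadBackedAccount

    soledad = get_soledad_instance()   # placeholder for an initialized Soledad
    uuid = "some-user-uuid"            # placeholder account name

    # Writes go through the memory store; the SoledadStore consumer
    # persists the queued document wrappers in the local Soledad database.
    memstore = MemoryStore(permanent_store=SoledadStore(soledad))
    account = SoledadBackedAccount(uuid, soledad=soledad, memstore=memstore)
    inbox = account.getMailbox("INBOX")   # assuming INBOX already exists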
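The new manhole module is only activated when the LEAP_MAIL_MANHOLE environment variable is set: run_service then also listens on 127.0.0.1:2222 with the hard-coded credentials boss/leap, exposing the factory (f), the account (a) and getMailbox (gm) in the shell namespace. Since getManholeFactory returns a ConchFactory, pointing an ordinary SSH client at that port should be enough to reach the interactive shell; this is a debugging aid rather than something to enable in production.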
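The new size.py walks nested dictionaries to estimate their footprint; its own docstring warns that it only supports dictionaries and may leak some memory. A small usage sketch (the sample payload dict is made up for illustration):

    from leap.mail.size import get_size

    fdoc = {"flags": ["\\Seen"], "uid": 1, "mbox": "inbox"}   # made-up payload
    print get_size(fdoc)   # cumulative size in bytes; only dicts are recursed,
                           # other values just contribute their sys.getsizeof()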
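The helpers added to utils.py stand on their own and their behavior can be read directly from the definitions above. A few illustrative calls (values are made up):

    from leap.mail.utils import empty, maybe_call, stringify_parts_map

    empty(None)              # True
    empty({})                # True
    empty({"uid": 1})        # False

    maybe_call(42)           # 42
    maybe_call(lambda: 42)   # 42, the callable is invoked

    # Integer keys nested under "part_map" are converted to strings,
    # recursively, presumably so the structure matches what comes back
    # from JSON-serialized document content (where keys are strings).
    stringify_parts_map({"part_map": {1: {"part_map": {2: {}}}}})
    # -> {"part_map": {"1": {"part_map": {"2": {}}}}}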