From fa60b76ce9cdf6684945c6bc724f10818104166b Mon Sep 17 00:00:00 2001 From: Kali Kaneko Date: Thu, 1 Jan 2015 18:21:44 -0400 Subject: cleanup imap implementation --- src/leap/mail/adaptors/soledad.py | 13 +- src/leap/mail/constants.py | 14 + src/leap/mail/imap/account.py | 306 +++----- src/leap/mail/imap/fields.py | 51 -- src/leap/mail/imap/interfaces.py | 96 --- src/leap/mail/imap/mailbox.py | 472 ++++--------- src/leap/mail/imap/memorystore.py | 1340 ------------------------------------ src/leap/mail/imap/messageparts.py | 586 ---------------- src/leap/mail/imap/messages.py | 1007 +++++---------------------- src/leap/mail/imap/soledadstore.py | 617 ----------------- src/leap/mail/mail.py | 93 ++- src/leap/mail/messageflow.py | 200 ------ 12 files changed, 522 insertions(+), 4273 deletions(-) delete mode 100644 src/leap/mail/imap/fields.py delete mode 100644 src/leap/mail/imap/interfaces.py delete mode 100644 src/leap/mail/imap/memorystore.py delete mode 100644 src/leap/mail/imap/messageparts.py delete mode 100644 src/leap/mail/imap/soledadstore.py delete mode 100644 src/leap/mail/messageflow.py (limited to 'src') diff --git a/src/leap/mail/adaptors/soledad.py b/src/leap/mail/adaptors/soledad.py index 0b97869..bf8f7e9 100644 --- a/src/leap/mail/adaptors/soledad.py +++ b/src/leap/mail/adaptors/soledad.py @@ -513,9 +513,13 @@ class MailboxWrapper(SoledadDocumentWrapper): type_ = "mbox" mbox = INBOX_NAME flags = [] + recent = [] + created = 1 closed = False subscribed = False - rw = True + + # I think we don't need to store this one. + # rw = True class __meta__(object): index = "mbox" @@ -655,6 +659,7 @@ class SoledadMailAdaptor(SoledadIndexMixin): assert(MessageClass is not None) return MessageClass(MessageWrapper(mdoc, fdoc, hdoc, cdocs)) + # XXX pass UID too? def _get_msg_from_variable_doc_list(self, doc_list, msg_class): if len(doc_list) == 2: fdoc, hdoc = doc_list @@ -664,12 +669,14 @@ class SoledadMailAdaptor(SoledadIndexMixin): cdocs = dict(enumerate(doc_list[2:], 1)) return self.get_msg_from_docs(msg_class, fdoc, hdoc, cdocs) + # XXX pass UID too ? def get_msg_from_mdoc_id(self, MessageClass, store, doc_id, get_cdocs=False): metamsg_id = doc_id def wrap_meta_doc(doc): cls = MetaMsgDocWrapper + # XXX pass UID? return cls(doc_id=doc.doc_id, **doc.content) def get_part_docs_from_mdoc_wrapper(wrapper): @@ -692,8 +699,8 @@ class SoledadMailAdaptor(SoledadIndexMixin): return constants.FDOCID.format(mbox=mbox, chash=chash) d_docs = [] - fdoc_id = _get_fdoc_id_from_mdoc_id(doc_id) - hdoc_id = _get_hdoc_id_from_mdoc_id(doc_id) + fdoc_id = _get_fdoc_id_from_mdoc_id() + hdoc_id = _get_hdoc_id_from_mdoc_id() d_docs.append(store.get_doc(fdoc_id)) d_docs.append(store.get_doc(hdoc_id)) d = defer.gatherResults(d_docs) diff --git a/src/leap/mail/constants.py b/src/leap/mail/constants.py index bf1db7f..d76e652 100644 --- a/src/leap/mail/constants.py +++ b/src/leap/mail/constants.py @@ -36,3 +36,17 @@ HDOCID_RE = "H\-[0-9a-fA-F]+" CDOCID = "C-{phash}" CDOCID_RE = "C\-[0-9a-fA-F]+" + + +class MessageFlags(object): + """ + Flags used in Message and Mailbox. + """ + SEEN_FLAG = "\\Seen" + RECENT_FLAG = "\\Recent" + ANSWERED_FLAG = "\\Answered" + FLAGGED_FLAG = "\\Flagged" # yo dawg + DELETED_FLAG = "\\Deleted" + DRAFT_FLAG = "\\Draft" + NOSELECT_FLAG = "\\Noselect" + LIST_FLAG = "List" # is this OK? (no \. ie, no system flag) diff --git a/src/leap/mail/imap/account.py b/src/leap/mail/imap/account.py index 7dfbbd1..0baf078 100644 --- a/src/leap/mail/imap/account.py +++ b/src/leap/mail/imap/account.py @@ -1,6 +1,6 @@ # -*- coding: utf-8 -*- # account.py -# Copyright (C) 2013 LEAP +# Copyright (C) 2013-2015 LEAP # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by @@ -15,12 +15,12 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see . """ -Soledad Backed Account. +Soledad Backed IMAP Account. """ -import copy import logging import os import time +from functools import partial from twisted.internet import defer from twisted.mail import imap4 @@ -29,9 +29,9 @@ from zope.interface import implements from leap.common.check import leap_assert, leap_assert_type +from leap.mail.constants import MessageFlags from leap.mail.mail import Account -from leap.mail.imap.fields import WithMsgFields -from leap.mail.imap.mailbox import SoledadMailbox, normalize_mailbox +from leap.mail.imap.mailbox import IMAPMailbox, normalize_mailbox from leap.soledad.client import Soledad logger = logging.getLogger(__name__) @@ -49,9 +49,10 @@ if PROFILE_CMD: # Soledad IMAP Account ####################################### -# TODO remove MsgFields too +# XXX watchout, account needs to be ready... so we should maybe return +# a deferred to the IMAP service when it's initialized -class IMAPAccount(WithMsgFields): +class IMAPAccount(object): """ An implementation of an imap4 Account that is backed by Soledad Encrypted Documents. @@ -72,37 +73,20 @@ class IMAPAccount(WithMsgFields): :param store: a Soledad instance. :type store: Soledad """ - # XXX assert a generic store interface instead, so that we - # can plug the memory store wrapper seamlessly. leap_assert(store, "Need a store instance to initialize") leap_assert_type(store, Soledad) - # XXX SHOULD assert too that the name matches the user/uuid with which + # TODO assert too that the name matches the user/uuid with which # soledad has been initialized. self.user_id = user_id self.account = Account(store) - # XXX should hide this in the adaptor... - def _get_mailbox_by_name(self, name): - """ - Return an mbox document by name. - - :param name: the name of the mailbox - :type name: str - - :rtype: SoledadDocument - """ - def get_first_if_any(docs): - return docs[0] if docs else None - - d = self._store.get_from_index( - self.TYPE_MBOX_IDX, self.MBOX_KEY, - normalize_mailbox(name)) - d.addCallback(get_first_if_any) - return d + def _return_mailbox_from_collection(self, collection, readwrite=1): + if collection is None: + return None + return IMAPMailbox(collection, rw=readwrite) - # XXX move to Account? - # XXX needed? + # XXX Where's this used from? -- self.delete... def getMailbox(self, name): """ Return a Mailbox with that name, without selecting it. @@ -110,31 +94,25 @@ class IMAPAccount(WithMsgFields): :param name: name of the mailbox :type name: str - :returns: a a SoledadMailbox instance - :rtype: SoledadMailbox + :returns: an IMAPMailbox instance + :rtype: IMAPMailbox """ name = normalize_mailbox(name) - if name not in self.account.mailboxes: - raise imap4.MailboxException("No such mailbox: %r" % name) + def check_it_exists(mailboxes): + if name not in mailboxes: + raise imap4.MailboxException("No such mailbox: %r" % name) - # XXX Does mailbox really need reference to soledad? - return SoledadMailbox(name, self._store) + d = self.account.list_all_mailbox_names() + d.addCallback(check_it_exists) + d.addCallback(lambda _: self.account.get_collection_by_mailbox, name) + d.addCallbacK(self._return_mailbox_from_collection) + return d # # IAccount # - def _get_empty_mailbox(self): - """ - Returns an empty mailbox. - - :rtype: dict - """ - # XXX move to mailbox module - return copy.deepcopy(mailbox.EMPTY_MBOX) - - # TODO use mail.Account.add_mailbox def addMailbox(self, name, creation_ts=None): """ Add a mailbox to the account. @@ -154,8 +132,9 @@ class IMAPAccount(WithMsgFields): leap_assert(name, "Need a mailbox name to create a mailbox") - if name in self.mailboxes: - raise imap4.MailboxCollision(repr(name)) + def check_it_does_not_exist(mailboxes): + if name in mailboxes: + raise imap4.MailboxCollision(repr(name)) if creation_ts is None: # by default, we pass an int value @@ -164,21 +143,18 @@ class IMAPAccount(WithMsgFields): # mailbox-uidvalidity. creation_ts = int(time.time() * 10E2) - mbox = self._get_empty_mailbox() - mbox[self.MBOX_KEY] = name - mbox[self.CREATED_KEY] = creation_ts - - def load_mbox_cache(result): - d = self._load_mailboxes() - d.addCallback(lambda _: result) + def set_mbox_creation_ts(collection): + d = collection.set_mbox_attr("created") + d.addCallback(lambda _: collection) return d - d = self._store.create_doc(mbox) - d.addCallback(load_mbox_cache) + d = self.account.list_all_mailbox_names() + d.addCallback(check_it_does_not_exist) + d.addCallback(lambda _: self.account.get_collection_by_mailbox, name) + d.addCallback(set_mbox_creation_ts) + d.addCallback(self._return_mailbox_from_collection) return d - # TODO use mail.Account.create_mailbox? - # Watch out, imap specific exceptions raised here. def create(self, pathspec): """ Create a new mailbox from the given hierarchical name. @@ -204,9 +180,10 @@ class IMAPAccount(WithMsgFields): for accum in range(1, len(paths)): try: - partial = sep.join(paths[:accum]) - d = self.addMailbox(partial) + partial_path = sep.join(paths[:accum]) + d = self.addMailbox(partial_path) subs.append(d) + # XXX should this be handled by the deferred? except imap4.MailboxCollision: pass try: @@ -222,21 +199,13 @@ class IMAPAccount(WithMsgFields): def all_good(result): return all(result) - def load_mbox_cache(result): - d = self._load_mailboxes() - d.addCallback(lambda _: result) - return d - if subs: d1 = defer.gatherResults(subs, consumeErrors=True) - d1.addCallback(load_mbox_cache) d1.addCallback(all_good) else: d1 = defer.succeed(False) - d1.addCallback(load_mbox_cache) return d1 - # TODO use mail.Account.get_collection_by_mailbox def select(self, name, readwrite=1): """ Selects a mailbox. @@ -250,15 +219,28 @@ class IMAPAccount(WithMsgFields): :rtype: SoledadMailbox """ name = normalize_mailbox(name) - if name not in self.mailboxes: - logger.warning("No such mailbox!") - return None - self.selected = name - sm = SoledadMailbox(name, self._store, readwrite) - return sm + def check_it_exists(mailboxes): + if name not in mailboxes: + logger.warning("SELECT: No such mailbox!") + return None + return name + + def set_selected(_): + self.selected = name + + def get_collection(name): + if name is None: + return None + return self.account.get_collection_by_mailbox(name) + + d = self.account.list_all_mailbox_names() + d.addCallback(check_it_exists) + d.addCallback(get_collection) + d.addCallback(partial( + self._return_mailbox_from_collection, readwrite=readwrite)) + return d - # TODO use mail.Account.delete_mailbox def delete(self, name, force=False): """ Deletes a mailbox. @@ -276,37 +258,52 @@ class IMAPAccount(WithMsgFields): :rtype: Deferred """ name = normalize_mailbox(name) + _mboxes = [] - if name not in self.mailboxes: - err = imap4.MailboxException("No such mailbox: %r" % name) - return defer.fail(err) - mbox = self.getMailbox(name) + def check_it_exists(mailboxes): + # FIXME works? -- pass variable ref to outer scope + _mboxes = mailboxes + if name not in mailboxes: + err = imap4.MailboxException("No such mailbox: %r" % name) + return defer.fail(err) - if not force: + def get_mailbox(_): + return self.getMailbox(name) + + def destroy_mailbox(mbox): + return mbox.destroy() + + def check_can_be_deleted(mbox): # See if this box is flagged \Noselect - # XXX use mbox.flags instead? mbox_flags = mbox.getFlags() - if self.NOSELECT_FLAG in mbox_flags: + if MessageFlags.NOSELECT_FLAG in mbox_flags: # Check for hierarchically inferior mailboxes with this one # as part of their root. - for others in self.mailboxes: + for others in _mboxes: if others != name and others.startswith(name): err = imap4.MailboxException( "Hierarchically inferior mailboxes " "exist and \\Noselect is set") return defer.fail(err) - self.__mailboxes.discard(name) - return mbox.destroy() + return mbox - # XXX FIXME --- not honoring the inferior names... + d = self.account.list_all_mailbox_names() + d.addCallback(check_it_exists) + d.addCallback(get_mailbox) + if not force: + d.addCallback(check_can_be_deleted) + d.addCallback(destroy_mailbox) + return d + # FIXME --- not honoring the inferior names... # if there are no hierarchically inferior names, we will # delete it from our ken. + # XXX is this right? # if self._inferiorNames(name) > 1: - # ??! -- can this be rite? - # self._index.removeMailbox(name) + # self._index.removeMailbox(name) # TODO use mail.Account.rename_mailbox + # TODO finish conversion to deferreds def rename(self, oldname, newname): """ Renames a mailbox. @@ -320,6 +317,9 @@ class IMAPAccount(WithMsgFields): oldname = normalize_mailbox(oldname) newname = normalize_mailbox(newname) + # FIXME check that scope works (test) + _mboxes = [] + if oldname not in self.mailboxes: raise imap4.NoSuchMailbox(repr(oldname)) @@ -327,34 +327,19 @@ class IMAPAccount(WithMsgFields): inferiors = [(o, o.replace(oldname, newname, 1)) for o in inferiors] for (old, new) in inferiors: - if new in self.mailboxes: + if new in _mboxes: raise imap4.MailboxCollision(repr(new)) rename_deferreds = [] - def load_mbox_cache(result): - d = self._load_mailboxes() - d.addCallback(lambda _: result) - return d - - def update_mbox_doc_name(mbox, oldname, newname, update_deferred): - mbox.content[self.MBOX_KEY] = newname - d = self._soledad.put_doc(mbox) - d.addCallback(lambda r: update_deferred.callback(True)) - for (old, new) in inferiors: - self.__mailboxes.discard(old) - self._memstore.rename_fdocs_mailbox(old, new) - - d0 = defer.Deferred() - d = self._get_mailbox_by_name(old) - d.addCallback(update_mbox_doc_name, old, new, d0) - rename_deferreds.append(d0) + d = self.account.rename_mailbox(old, new) + rename_deferreds.append(d) d1 = defer.gatherResults(rename_deferreds, consumeErrors=True) - d1.addCallback(load_mbox_cache) return d1 + # FIXME use deferreds (list_all_mailbox_names, etc) def _inferiorNames(self, name): """ Return hierarchically inferior mailboxes. @@ -387,16 +372,15 @@ class IMAPAccount(WithMsgFields): :type wildcard: str """ # XXX use wildcard in index query - ref = self._inferiorNames(normalize_mailbox(ref)) + # TODO get deferreds wildcard = imap4.wildcardToRegexp(wildcard, '/') + ref = self._inferiorNames(normalize_mailbox(ref)) return [(i, self.getMailbox(i)) for i in ref if wildcard.match(i)] # # The rest of the methods are specific for leap.mail.imap.account.Account # - # TODO ------------------ can we preserve the attr? - # maybe add to memory store. def isSubscribed(self, name): """ Returns True if user is subscribed to this mailbox. @@ -406,63 +390,13 @@ class IMAPAccount(WithMsgFields): :rtype: Deferred (will fire with bool) """ - # TODO use Flags class - subscribed = self.SUBSCRIBED_KEY - - def is_subscribed(mbox): - subs_bool = bool(mbox.content.get(subscribed, False)) - return subs_bool - - d = self._get_mailbox_by_name(name) - d.addCallback(is_subscribed) - return d - - # TODO ------------------ can we preserve the property? - # maybe add to memory store. - - def _get_subscriptions(self): - """ - Return a list of the current subscriptions for this account. - - :returns: A deferred that will fire with the subscriptions. - :rtype: Deferred - """ - def get_docs_content(docs): - return [doc.content[self.MBOX_KEY] for doc in docs] - - d = self._store.get_from_index( - self.TYPE_SUBS_IDX, self.MBOX_KEY, '1') - d.addCallback(get_docs_content) - return d - - def _set_subscription(self, name, value): - """ - Sets the subscription value for a given mailbox - - :param name: the mailbox - :type name: str - - :param value: the boolean value - :type value: bool - """ - # XXX Note that this kind of operation has - # no guarantees of atomicity. We should not be accessing mbox - # documents concurrently. - - subscribed = self.SUBSCRIBED_KEY + name = normalize_mailbox(name) - def update_subscribed_value(mbox): - mbox.content[subscribed] = value - return self._store.put_doc(mbox) + def get_subscribed(mbox): + return mbox.get_mbox_attr("subscribed") - # maybe we should store subscriptions in another - # document... - if name not in self.mailboxes: - d = self.addMailbox(name) - d.addCallback(lambda v: self._get_mailbox_by_name(name)) - else: - d = self._get_mailbox_by_name(name) - d.addCallback(update_subscribed_value) + d = self.getMailbox(name) + d.addCallback(get_subscribed) return d def subscribe(self, name): @@ -475,11 +409,11 @@ class IMAPAccount(WithMsgFields): """ name = normalize_mailbox(name) - def check_and_subscribe(subscriptions): - if name not in subscriptions: - return self._set_subscription(name, True) - d = self._get_subscriptions() - d.addCallback(check_and_subscribe) + def set_subscribed(mbox): + return mbox.set_mbox_attr("subscribed", True) + + d = self.getMailbox(name) + d.addCallback(set_subscribed) return d def unsubscribe(self, name): @@ -492,17 +426,17 @@ class IMAPAccount(WithMsgFields): """ name = normalize_mailbox(name) - def check_and_unsubscribe(subscriptions): - if name not in subscriptions: - raise imap4.MailboxException( - "Not currently subscribed to %r" % name) - return self._set_subscription(name, False) - d = self._get_subscriptions() - d.addCallback(check_and_unsubscribe) + def set_unsubscribed(mbox): + return mbox.set_mbox_attr("subscribed", False) + + d = self.getMailbox(name) + d.addCallback(set_unsubscribed) return d + # TODO -- get__all_mboxes, return tuple + # with ... name? and subscribed bool... def getSubscriptions(self): - return self._get_subscriptions() + raise NotImplementedError() # # INamespacePresenter @@ -517,20 +451,6 @@ class IMAPAccount(WithMsgFields): def getOtherNamespaces(self): return None - # extra, for convenience - - def deleteAllMessages(self, iknowhatiamdoing=False): - """ - Deletes all messages from all mailboxes. - Danger! high voltage! - - :param iknowhatiamdoing: confirmation parameter, needs to be True - to proceed. - """ - if iknowhatiamdoing is True: - for mbox in self.mailboxes: - self.delete(mbox, force=True) - def __repr__(self): """ Representation string for this object. diff --git a/src/leap/mail/imap/fields.py b/src/leap/mail/imap/fields.py deleted file mode 100644 index a751c6d..0000000 --- a/src/leap/mail/imap/fields.py +++ /dev/null @@ -1,51 +0,0 @@ -# -*- coding: utf-8 -*- -# fields.py -# Copyright (C) 2013 LEAP -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see . -""" -Fields for Mailbox and Message. -""" - -# TODO deprecate !!! (move all to constants maybe?) -# Flags -> foo - - -class WithMsgFields(object): - """ - Container class for class-attributes to be shared by - several message-related classes. - """ - # Mailbox specific keys - CREATED_KEY = "created" # used??? - - RECENTFLAGS_KEY = "rct" - HDOCS_SET_KEY = "hdocset" - - # Flags in Mailbox and Message - SEEN_FLAG = "\\Seen" - RECENT_FLAG = "\\Recent" - ANSWERED_FLAG = "\\Answered" - FLAGGED_FLAG = "\\Flagged" # yo dawg - DELETED_FLAG = "\\Deleted" - DRAFT_FLAG = "\\Draft" - NOSELECT_FLAG = "\\Noselect" - LIST_FLAG = "List" # is this OK? (no \. ie, no system flag) - - # Fields in mail object - SUBJECT_FIELD = "Subject" - DATE_FIELD = "Date" - - -fields = WithMsgFields # alias for convenience diff --git a/src/leap/mail/imap/interfaces.py b/src/leap/mail/imap/interfaces.py deleted file mode 100644 index f8f25fa..0000000 --- a/src/leap/mail/imap/interfaces.py +++ /dev/null @@ -1,96 +0,0 @@ -# -*- coding: utf-8 -*- -# interfaces.py -# Copyright (C) 2014 LEAP -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see . -""" -Interfaces for the IMAP module. -""" -from zope.interface import Interface, Attribute - - -# TODO remove ---------------- -class IMessageContainer(Interface): - """ - I am a container around the different documents that a message - is split into. - """ - fdoc = Attribute('The flags document for this message, if any.') - hdoc = Attribute('The headers document for this message, if any.') - cdocs = Attribute('The dict of content documents for this message, ' - 'if any.') - - def walk(self): - """ - Return an iterator to the docs for all the parts. - - :rtype: iterator - """ - - -# TODO remove -------------------- -class IMessageStore(Interface): - """ - I represent a generic storage for LEAP Messages. - """ - - def create_message(self, mbox, uid, message): - """ - Put the passed message into this IMessageStore. - - :param mbox: the mbox this message belongs. - :param uid: the UID that identifies this message in this mailbox. - :param message: a IMessageContainer implementor. - """ - - def put_message(self, mbox, uid, message): - """ - Put the passed message into this IMessageStore. - - :param mbox: the mbox this message belongs. - :param uid: the UID that identifies this message in this mailbox. - :param message: a IMessageContainer implementor. - """ - - def remove_message(self, mbox, uid): - """ - Remove the given message from this IMessageStore. - - :param mbox: the mbox this message belongs. - :param uid: the UID that identifies this message in this mailbox. - """ - - def get_message(self, mbox, uid): - """ - Get a IMessageContainer for the given mbox and uid combination. - - :param mbox: the mbox this message belongs. - :param uid: the UID that identifies this message in this mailbox. - :return: IMessageContainer - """ - - -class IMessageStoreWriter(Interface): - """ - I represent a storage that is able to write its contents to another - different IMessageStore. - """ - - def write_messages(self, store): - """ - Write the documents in this IMessageStore to a different - storage. Usually this will be done from a MemoryStorage to a DbStorage. - - :param store: another IMessageStore implementor. - """ diff --git a/src/leap/mail/imap/mailbox.py b/src/leap/mail/imap/mailbox.py index ea54d33..faeba9d 100644 --- a/src/leap/mail/imap/mailbox.py +++ b/src/leap/mail/imap/mailbox.py @@ -1,6 +1,6 @@ # *- coding: utf-8 -*- # mailbox.py -# Copyright (C) 2013, 2014 LEAP +# Copyright (C) 2013-2015 LEAP # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by @@ -15,11 +15,9 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see . """ -Soledad Mailbox. +IMAP Mailbox. """ -import copy import re -import threading import logging import StringIO import cStringIO @@ -29,7 +27,6 @@ from collections import defaultdict from twisted.internet import defer from twisted.internet import reactor -from twisted.internet.task import deferLater from twisted.python import log from twisted.mail import imap4 @@ -38,17 +35,15 @@ from zope.interface import implements from leap.common import events as leap_events from leap.common.events.events_pb2 import IMAP_UNREAD_MAIL from leap.common.check import leap_assert, leap_assert_type -from leap.mail.constants import INBOX_NAME -from leap.mail.decorators import deferred_to_thread -from leap.mail.utils import empty -from leap.mail.imap.fields import WithMsgFields, fields -from leap.mail.imap.messages import MessageCollection -from leap.mail.imap.messageparts import MessageWrapper +from leap.mail.constants import INBOX_NAME, MessageFlags logger = logging.getLogger(__name__) -# TODO +# TODO LIST # [ ] Restore profile_cmd instrumentation +# [ ] finish the implementation of IMailboxListener +# [ ] implement the rest of ISearchableMailbox + """ If the environment variable `LEAP_SKIPNOTIFY` is set, we avoid @@ -75,16 +70,20 @@ if PROFILE_CMD: d.addCallback(_debugProfiling, name, time.time()) d.addErrback(lambda f: log.msg(f.getTraceback())) +INIT_FLAGS = (MessageFlags.SEEN_FLAG, MessageFlags.ANSWERED_FLAG, + MessageFlags.FLAGGED_FLAG, MessageFlags.DELETED_FLAG, + MessageFlags.DRAFT_FLAG, MessageFlags.RECENT_FLAG, + MessageFlags.LIST_FLAG) + -# TODO Rename to Mailbox -# TODO Remove WithMsgFields -class SoledadMailbox(WithMsgFields): +class IMAPMailbox(object): """ A Soledad-backed IMAP mailbox. Implements the high-level method needed for the Mailbox interfaces. - The low-level database methods are contained in MessageCollection class, - which we instantiate and make accessible in the `messages` attribute. + The low-level database methods are contained in IMAPMessageCollection + class, which we instantiate and make accessible in the `messages` + attribute. """ implements( imap4.IMailbox, @@ -93,17 +92,7 @@ class SoledadMailbox(WithMsgFields): imap4.ISearchableMailbox, imap4.IMessageCopier) - # XXX should finish the implementation of IMailboxListener - # XXX should completely implement ISearchableMailbox too - - messages = None - _closed = False - - INIT_FLAGS = (WithMsgFields.SEEN_FLAG, WithMsgFields.ANSWERED_FLAG, - WithMsgFields.FLAGGED_FLAG, WithMsgFields.DELETED_FLAG, - WithMsgFields.DRAFT_FLAG, WithMsgFields.RECENT_FLAG, - WithMsgFields.LIST_FLAG) - flags = None + init_flags = INIT_FLAGS CMD_MSG = "MESSAGES" CMD_RECENT = "RECENT" @@ -111,58 +100,31 @@ class SoledadMailbox(WithMsgFields): CMD_UIDVALIDITY = "UIDVALIDITY" CMD_UNSEEN = "UNSEEN" - # FIXME we should turn this into a datastructure with limited capacity + # TODO we should turn this into a datastructure with limited capacity _listeners = defaultdict(set) - next_uid_lock = threading.Lock() - last_uid_lock = threading.Lock() - - # TODO unify all the `primed` dicts - _fdoc_primed = {} - _last_uid_primed = {} - _known_uids_primed = {} - - # TODO pass the collection to the constructor - # TODO pass the mbox_doc too - def __init__(self, mbox, store, rw=1): + def __init__(self, collection, rw=1): """ SoledadMailbox constructor. Needs to get passed a name, plus a Soledad instance. - :param mbox: the mailbox name - :type mbox: str - - :param store: - :type store: Soledad + :param collection: instance of IMAPMessageCollection + :type collection: IMAPMessageCollection :param rw: read-and-write flag for this mailbox :type rw: int """ - leap_assert(mbox, "Need a mailbox name to initialize") - leap_assert(store, "Need a store instance to initialize") - - self.mbox = normalize_mailbox(mbox) self.rw = rw - self.store = store - - self.messages = MessageCollection(mbox=mbox, soledad=store) self._uidvalidity = None + self.collection = collection - # XXX careful with this get/set (it would be - # hitting db unconditionally, move to memstore too) - # Now it's returning a fixed amount of flags from mem - # as a workaround. if not self.getFlags(): - self.setFlags(self.INIT_FLAGS) - - if self._memstore: - self.prime_known_uids_to_memstore() - self.prime_last_uid_to_memstore() - self.prime_flag_docs_to_memstore() + self.setFlags(self.init_flags) - # purge memstore from empty fdocs. - self._memstore.purge_fdoc_store(mbox) + @property + def mbox_name(self): + return self.collection.mbox_name @property def listeners(self): @@ -175,11 +137,12 @@ class SoledadMailbox(WithMsgFields): :rtype: set """ - return self._listeners[self.mbox] + return self._listeners[self.mbox_name] - # TODO this grows too crazily when many instances are fired, like + # FIXME this grows too crazily when many instances are fired, like # during imaptest stress testing. Should have a queue of limited size # instead. + def addListener(self, listener): """ Add a listener to the listeners queue. @@ -204,16 +167,6 @@ class SoledadMailbox(WithMsgFields): """ self.listeners.remove(listener) - def _get_mbox_doc(self): - """ - Return mailbox document. - - :return: A SoledadDocument containing this mailbox, or None if - the query failed. - :rtype: SoledadDocument or None. - """ - return self._memstore.get_mbox_doc(self.mbox) - def getFlags(self): """ Returns the flags defined for this mailbox. @@ -221,10 +174,11 @@ class SoledadMailbox(WithMsgFields): :returns: tuple of flags for this mailbox :rtype: tuple of str """ - flags = self._memstore.get_mbox_flags(self.mbox) + flags = self.collection.mbox_wrapper.flags if not flags: - flags = self.INIT_FLAGS - return map(str, flags) + flags = self.init_flags + flags_str = map(str, flags) + return flags_str def setFlags(self, flags): """ @@ -234,98 +188,31 @@ class SoledadMailbox(WithMsgFields): :type flags: tuple of str """ # XXX this is setting (overriding) old flags. + # Better pass a mode flag leap_assert(isinstance(flags, tuple), "flags expected to be a tuple") - self._memstore.set_mbox_flags(self.mbox, flags) - - # XXX SHOULD BETTER IMPLEMENT ADD_FLAG, REMOVE_FLAG. + return self.collection.set_mbox_attr("flags", flags) - def _get_closed(self): + @property + def is_closed(self): """ Return the closed attribute for this mailbox. :return: True if the mailbox is closed :rtype: bool """ - return self._memstore.get_mbox_closed(self.mbox) + return self.collection.get_mbox_attr("closed") - def _set_closed(self, closed): + def set_closed(self, closed): """ Set the closed attribute for this mailbox. :param closed: the state to be set :type closed: bool - """ - self._memstore.set_mbox_closed(self.mbox, closed) - - closed = property( - _get_closed, _set_closed, doc="Closed attribute.") - - def _get_last_uid(self): - """ - Return the last uid for this mailbox. - If we have a memory store, the last UID will be the highest - recorded UID in the message store, or a counter cached from - the mailbox document in soledad if this is higher. - - :return: the last uid for messages in this mailbox - :rtype: int - """ - last = self._memstore.get_last_uid(self.mbox) - logger.debug("last uid for %s: %s (from memstore)" % ( - repr(self.mbox), last)) - return last - - last_uid = property( - _get_last_uid, doc="Last_UID attribute.") - def prime_last_uid_to_memstore(self): - """ - Prime memstore with last_uid value - """ - primed = self._last_uid_primed.get(self.mbox, False) - if not primed: - mbox = self._get_mbox_doc() - if mbox is None: - # memory-only store - return - last = mbox.content.get('lastuid', 0) - logger.info("Priming Soledad last_uid to %s" % (last,)) - self._memstore.set_last_soledad_uid(self.mbox, last) - self._last_uid_primed[self.mbox] = True - - def prime_known_uids_to_memstore(self): - """ - Prime memstore with the set of all known uids. - - We do this to be able to filter the requests efficiently. - """ - primed = self._known_uids_primed.get(self.mbox, False) - # XXX handle the maybeDeferred - - def set_primed(known_uids): - self._memstore.set_known_uids(self.mbox, known_uids) - self._known_uids_primed[self.mbox] = True - - if not primed: - d = self.messages.all_soledad_uid_iter() - d.addCallback(set_primed) - return d - - def prime_flag_docs_to_memstore(self): - """ - Prime memstore with all the flags documents. + :rtype: Deferred """ - primed = self._fdoc_primed.get(self.mbox, False) - - def set_flag_docs(flag_docs): - self._memstore.load_flag_docs(self.mbox, flag_docs) - self._fdoc_primed[self.mbox] = True - - if not primed: - d = self.messages.get_all_soledad_flag_docs() - d.addCallback(set_flag_docs) - return d + return self.collection.set_mbox_attr("closed", closed) def getUIDValidity(self): """ @@ -334,12 +221,7 @@ class SoledadMailbox(WithMsgFields): :return: unique validity identifier :rtype: int """ - if self._uidvalidity is None: - mbox = self._get_mbox_doc() - if mbox is None: - return 0 - self._uidvalidity = mbox.content.get(self.CREATED_KEY, 1) - return self._uidvalidity + return self.collection.get_mbox_attr("created") def getUID(self, message): """ @@ -354,9 +236,9 @@ class SoledadMailbox(WithMsgFields): :rtype: int """ - msg = self.messages.get_msg_by_uid(message) - if msg is not None: - return msg.getUID() + d = self.collection.get_msg_by_uid(message) + d.addCallback(lambda m: m.getUID()) + return d def getUIDNext(self): """ @@ -364,23 +246,20 @@ class SoledadMailbox(WithMsgFields): mailbox. Currently it returns the higher UID incremented by one. - We increment the next uid *each* time this function gets called. - In this way, there will be gaps if the message with the allocated - uid cannot be saved. But that is preferable to having race conditions - if we get to parallel message adding. - - :rtype: int + :return: deferred with int + :rtype: Deferred """ - with self.next_uid_lock: - return self.last_uid + 1 + d = self.collection.get_uid_next() + return d def getMessageCount(self): """ Returns the total count of messages in this mailbox. - :rtype: int + :return: deferred with int + :rtype: Deferred """ - return self.messages.count() + return self.collection.count() def getUnseenCount(self): """ @@ -389,7 +268,7 @@ class SoledadMailbox(WithMsgFields): :return: count of messages flagged `unseen` :rtype: int """ - return self.messages.count_unseen() + return self.collection.count_unseen() def getRecentCount(self): """ @@ -398,7 +277,7 @@ class SoledadMailbox(WithMsgFields): :return: count of messages flagged `recent` :rtype: int """ - return self.messages.count_recent() + return self.collection.count_recent() def isWriteable(self): """ @@ -407,6 +286,8 @@ class SoledadMailbox(WithMsgFields): :return: 1 if mailbox is read-writeable, 0 otherwise. :rtype: int """ + # XXX We don't need to store it in the mbox doc, do we? + # return int(self.collection.get_mbox_attr('rw')) return self.rw def getHierarchicalDelimiter(self): @@ -431,14 +312,14 @@ class SoledadMailbox(WithMsgFields): if self.CMD_RECENT in names: r[self.CMD_RECENT] = self.getRecentCount() if self.CMD_UIDNEXT in names: - r[self.CMD_UIDNEXT] = self.last_uid + 1 + r[self.CMD_UIDNEXT] = self.getUIDNext() if self.CMD_UIDVALIDITY in names: r[self.CMD_UIDVALIDITY] = self.getUIDValidity() if self.CMD_UNSEEN in names: r[self.CMD_UNSEEN] = self.getUnseenCount() return defer.succeed(r) - def addMessage(self, message, flags, date=None, notify_on_disk=False): + def addMessage(self, message, flags, date=None): """ Adds a message to this mailbox. @@ -464,10 +345,8 @@ class SoledadMailbox(WithMsgFields): else: flags = tuple(str(flag) for flag in flags) - d = self._do_add_message(message, flags=flags, date=date, - notify_on_disk=notify_on_disk) - #if PROFILE_CMD: - #do_profile_cmd(d, "APPEND") + # if PROFILE_CMD: + # do_profile_cmd(d, "APPEND") # XXX should review now that we're not using qtreactor. # A better place for this would be the COPY/APPEND dispatcher @@ -478,19 +357,11 @@ class SoledadMailbox(WithMsgFields): reactor.callLater(0, self.notify_new) return x + d = self.collection.add_message(flags=flags, date=date) d.addCallback(notifyCallback) d.addErrback(lambda f: log.msg(f.getTraceback())) return d - def _do_add_message(self, message, flags, date, notify_on_disk=False): - """ - Calls to the messageCollection add_msg method. - Invoked from addMessage. - """ - d = self.messages.add_msg(message, flags=flags, date=date, - notify_on_disk=notify_on_disk) - return d - def notify_new(self, *args): """ Notify of new messages to all the listeners. @@ -502,26 +373,34 @@ class SoledadMailbox(WithMsgFields): def cbNotifyNew(result): exists, recent = result - for l in self.listeners: - l.newMessages(exists, recent) + for listener in self.listeners: + listener.newMessages(exists, recent) + d = self._get_notify_count() d.addCallback(cbNotifyNew) d.addCallback(self.cb_signal_unread_to_ui) - @deferred_to_thread def _get_notify_count(self): """ Get message count and recent count for this mailbox Executed in a separate thread. Called from notify_new. - :return: number of messages and number of recent messages. - :rtype: tuple + :return: a deferred that will fire with a tuple, with number of + messages and number of recent messages. + :rtype: Deferred """ - exists = self.getMessageCount() - recent = self.getRecentCount() - logger.debug("NOTIFY (%r): there are %s messages, %s recent" % ( - self.mbox, exists, recent)) - return exists, recent + d_exists = self.getMessageCount() + d_recent = self.getRecentCount() + d_list = [d_exists, d_recent] + + def log_num_msg(result): + exists, recent = result + logger.debug("NOTIFY (%r): there are %s messages, %s recent" % ( + self.mbox_name, exists, recent)) + + d = defer.gatherResults(d_list) + d.addCallback(log_num_msg) + return d # commands, do not rename methods @@ -533,27 +412,18 @@ class SoledadMailbox(WithMsgFields): on the mailbox. """ - # XXX this will overwrite all the existing flags! + # XXX this will overwrite all the existing flags # should better simply addFlag - self.setFlags((self.NOSELECT_FLAG,)) - - # XXX removing the mailbox in situ for now, - # we should postpone the removal - - def remove_mbox_doc(ignored): - # XXX move to memory store?? + self.setFlags((MessageFlags.NOSELECT_FLAG,)) - def _remove_mbox_doc(doc): - if doc is None: - # memory-only store! - return defer.succeed(True) - return self._soledad.delete_doc(doc) - - doc = self._get_mbox_doc() - return _remove_mbox_doc(doc) + def remove_mbox(_): + # FIXME collection does not have a delete_mbox method, + # it's in account. + # XXX should take care of deleting the uid table too. + return self.collection.delete_mbox(self.mbox_name) d = self.deleteAllDocs() - d.addCallback(remove_mbox_doc) + d.addCallback(remove_mbox) return d def _close_cb(self, result): @@ -574,9 +444,11 @@ class SoledadMailbox(WithMsgFields): if not self.isWriteable(): raise imap4.ReadOnlyMailbox d = defer.Deferred() - self._memstore.expunge(self.mbox, d) + # FIXME actually broken. + # Iterate through index, and do a expunge. return d + # FIXME -- get last_uid from mbox_indexer def _bound_seq(self, messages_asked): """ Put an upper bound to a messages sequence if this is open. @@ -596,6 +468,7 @@ class SoledadMailbox(WithMsgFields): pass return messages_asked + # TODO -- needed? --- we can get the sequence from the indexer. def _filter_msg_seq(self, messages_asked): """ Filter a message sequence returning only the ones that do exist in the @@ -627,29 +500,6 @@ class SoledadMailbox(WithMsgFields): :rtype: deferred """ - d = defer.Deferred() - - # XXX do not need no thread... - reactor.callInThread(self._do_fetch, messages_asked, uid, d) - d.addCallback(self.cb_signal_unread_to_ui) - return d - - # called in thread - def _do_fetch(self, messages_asked, uid, d): - """ - :param messages_asked: IDs of the messages to retrieve information - about - :type messages_asked: MessageSet - - :param uid: If true, the IDs are UIDs. They are message sequence IDs - otherwise. - :type uid: bool - :param d: deferred whose callback will be called with result. - :type d: Deferred - - :rtype: A tuple of two-tuples of message sequence numbers and - LeapMessage - """ # For the moment our UID is sequential, so we # can treat them all the same. # Change this to the flag that twisted expects when we @@ -660,18 +510,23 @@ class SoledadMailbox(WithMsgFields): messages_asked = self._bound_seq(messages_asked) seq_messg = self._filter_msg_seq(messages_asked) - getmsg = lambda uid: self.messages.get_msg_by_uid(uid) + getmsg = self.collection.get_msg_by_uid # for sequence numbers (uid = 0) if sequence: logger.debug("Getting msg by index: INEFFICIENT call!") + # TODO --- implement sequences in mailbox indexer raise NotImplementedError else: got_msg = ((msgid, getmsg(msgid)) for msgid in seq_messg) result = ((msgid, msg) for msgid, msg in got_msg if msg is not None) - self.reactor.callLater(0, self.unset_recent_flags, seq_messg) - self.reactor.callFromThread(d.callback, result) + reactor.callLater(0, self.unset_recent_flags, seq_messg) + + # TODO -- call signal_to_ui + # d.addCallback(self.cb_signal_unread_to_ui) + + return result def fetch_flags(self, messages_asked, uid): """ @@ -698,12 +553,11 @@ class SoledadMailbox(WithMsgFields): :rtype: tuple """ d = defer.Deferred() - self.reactor.callInThread(self._do_fetch_flags, messages_asked, uid, d) + reactor.callLater(0, self._do_fetch_flags, messages_asked, uid, d) if PROFILE_CMD: do_profile_cmd(d, "FETCH-ALL-FLAGS") return d - # called in thread def _do_fetch_flags(self, messages_asked, uid, d): """ :param messages_asked: IDs of the messages to retrieve information @@ -733,10 +587,11 @@ class SoledadMailbox(WithMsgFields): messages_asked = self._bound_seq(messages_asked) seq_messg = self._filter_msg_seq(messages_asked) - all_flags = self._memstore.all_flags(self.mbox) + # FIXME use deferreds here + all_flags = self.collection.get_all_flags(self.mbox_name) result = ((msgid, flagsPart( msgid, all_flags.get(msgid, tuple()))) for msgid in seq_messg) - self.reactor.callFromThread(d.callback, result) + d.callback(result) def fetch_headers(self, messages_asked, uid): """ @@ -843,8 +698,8 @@ class SoledadMailbox(WithMsgFields): raise imap4.ReadOnlyMailbox d = defer.Deferred() - self.reactor.callLater(0, self._do_store, messages_asked, flags, - mode, uid, d) + reactor.callLater(0, self._do_store, messages_asked, flags, + mode, uid, d) if PROFILE_CMD: do_profile_cmd(d, "STORE") d.addCallback(self.cb_signal_unread_to_ui) @@ -853,7 +708,7 @@ class SoledadMailbox(WithMsgFields): def _do_store(self, messages_asked, flags, mode, uid, observer): """ - Helper method, invoke set_flags method in the MessageCollection. + Helper method, invoke set_flags method in the IMAPMessageCollection. See the documentation for the `store` method for the parameters. @@ -869,7 +724,8 @@ class SoledadMailbox(WithMsgFields): flags = tuple(flags) messages_asked = self._bound_seq(messages_asked) seq_messg = self._filter_msg_seq(messages_asked) - self.messages.set_flags(self.mbox, seq_messg, flags, mode, observer) + self.collection.set_flags( + self.mbox_name, seq_messg, flags, mode, observer) # ISearchableMailbox @@ -908,6 +764,7 @@ class SoledadMailbox(WithMsgFields): msgid = str(query[3]).strip() logger.debug("Searching for %s" % (msgid,)) d = self.messages._get_uid_from_msgid(str(msgid)) + # XXX remove gatherResults d1 = defer.gatherResults([d]) # we want a list, so return it all the same return d1 @@ -928,94 +785,18 @@ class SoledadMailbox(WithMsgFields): uid when the copy succeed. :rtype: Deferred """ - d = defer.Deferred() if PROFILE_CMD: do_profile_cmd(d, "COPY") # A better place for this would be the COPY/APPEND dispatcher # in server.py, but qtreactor hangs when I do that, so this seems # to work fine for now. - d.addCallback(lambda r: self.reactor.callLater(0, self.notify_new)) - deferLater(self.reactor, 0, self._do_copy, message, d) - return d + #d.addCallback(lambda r: self.reactor.callLater(0, self.notify_new)) + #deferLater(self.reactor, 0, self._do_copy, message, d) + #return d - def _do_copy(self, message, observer): - """ - Call invoked from the deferLater in `copy`. This will - copy the flags and header documents, and pass them to the - `create_message` method in the MemoryStore, together with - the observer deferred that we've been passed along. - - :param message: an IMessage implementor - :type message: LeapMessage - :param observer: the deferred that will fire with the - UID of the message - :type observer: Deferred - """ - memstore = self._memstore - - def createCopy(result): - exist, new_fdoc = result - if exist: - # Should we signal error on the callback? - logger.warning("Destination message already exists!") - - # XXX I'm not sure if we should raise the - # errback. This actually rases an ugly warning - # in some muas like thunderbird. - # UID 0 seems a good convention for no uid. - observer.callback(0) - else: - mbox = self.mbox - uid_next = memstore.increment_last_soledad_uid(mbox) - - new_fdoc[self.UID_KEY] = uid_next - new_fdoc[self.MBOX_KEY] = mbox - - flags = list(new_fdoc[self.FLAGS_KEY]) - flags.append(fields.RECENT_FLAG) - new_fdoc[self.FLAGS_KEY] = tuple(set(flags)) - - # FIXME set recent! - - self._memstore.create_message( - self.mbox, uid_next, - MessageWrapper(new_fdoc), - observer=observer, - notify_on_disk=False) - - d = self._get_msg_copy(message) - d.addCallback(createCopy) - d.addErrback(lambda f: log.msg(f.getTraceback())) - - #@deferred_to_thread - def _get_msg_copy(self, message): - """ - Get a copy of the fdoc for this message, and check whether - it already exists. - - :param message: an IMessage implementor - :type message: LeapMessage - :return: exist, new_fdoc - :rtype: tuple - """ - # XXX for clarity, this could be delegated to a - # MessageCollection mixin that implements copy too, and - # moved out of here. - msg = message - memstore = self._memstore - - if empty(msg.fdoc): - logger.warning("Tried to copy a MSG with no fdoc") - return - new_fdoc = copy.deepcopy(msg.fdoc.content) - fdoc_chash = new_fdoc[fields.CONTENT_HASH_KEY] - - dest_fdoc = memstore.get_fdoc_from_chash( - fdoc_chash, self.mbox) - - exist = not empty(dest_fdoc) - return exist, new_fdoc + # FIXME not implemented !!! --- + return self.collection.copy_msg(message, self.mbox_name) # convenience fun @@ -1023,29 +804,25 @@ class SoledadMailbox(WithMsgFields): """ Delete all docs in this mailbox """ - def del_all_docs(docs): - todelete = [] - for doc in docs: - d = self.messages._soledad.delete_doc(doc) - todelete.append(d) - return defer.gatherResults(todelete) - - d = self.messages.get_all_docs() - d.addCallback(del_all_docs) - return d + # FIXME not implemented + return self.collection.delete_all_docs() def unset_recent_flags(self, uid_seq): """ Unset Recent flag for a sequence of UIDs. """ - self.messages.unset_recent_flags(uid_seq) + # FIXME not implemented + return self.collection.unset_recent_flags(uid_seq) def __repr__(self): """ Representation string for this mailbox. """ - return u"" % ( - self.mbox, self.messages.count()) + return u"" % ( + self.mbox_name, self.messages.count()) + + +_INBOX_RE = re.compile(INBOX_NAME, re.IGNORECASE) def normalize_mailbox(name): @@ -1060,7 +837,8 @@ def normalize_mailbox(name): :rtype: unicode """ - _INBOX_RE = re.compile(INBOX_NAME, re.IGNORECASE) + # XXX maybe it would make sense to normalize common folders too: + # trash, sent, drafts, etc... if _INBOX_RE.match(name): # ensure inital INBOX is uppercase return INBOX_NAME + name[len(INBOX_NAME):] diff --git a/src/leap/mail/imap/memorystore.py b/src/leap/mail/imap/memorystore.py deleted file mode 100644 index eda5b96..0000000 --- a/src/leap/mail/imap/memorystore.py +++ /dev/null @@ -1,1340 +0,0 @@ - -# memorystore.py -# Copyright (C) 2014 LEAP -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see . -""" -In-memory transient store for a LEAPIMAPServer. -""" -import contextlib -import logging -import threading -import weakref - -from collections import defaultdict -from copy import copy - -from enum import Enum -from twisted.internet import defer -from twisted.internet import reactor -from twisted.internet.task import LoopingCall -from twisted.python import log -from zope.interface import implements - -from leap.common.check import leap_assert_type -from leap.mail import size -from leap.mail.utils import empty, phash_iter -from leap.mail.messageflow import MessageProducer -from leap.mail.imap import interfaces -from leap.mail.imap.fields import fields -from leap.mail.imap.messageparts import MessagePartType, MessagePartDoc -from leap.mail.imap.messageparts import RecentFlagsDoc -from leap.mail.imap.messageparts import MessageWrapper -from leap.mail.imap.messageparts import ReferenciableDict - -from leap.mail.decorators import deferred_to_thread - -logger = logging.getLogger(__name__) - - -# The default period to do writebacks to the permanent -# soledad storage, in seconds. -SOLEDAD_WRITE_PERIOD = 15 - -FDOC = MessagePartType.fdoc.name -HDOC = MessagePartType.hdoc.name -CDOCS = MessagePartType.cdocs.name -DOCS_ID = MessagePartType.docs_id.name - - -@contextlib.contextmanager -def set_bool_flag(obj, att): - """ - Set a boolean flag to True while we're doing our thing. - Just to let the world know. - """ - setattr(obj, att, True) - try: - yield True - except RuntimeError as exc: - logger.exception(exc) - finally: - setattr(obj, att, False) - - -DirtyState = Enum("DirtyState", "none dirty new") - - -class MemoryStore(object): - """ - An in-memory store to where we can write the different parts that - we split the messages into and buffer them until we write them to the - permanent storage. - - It uses MessageWrapper instances to represent the message-parts, which are - indexed by mailbox name and UID. - - It also can be passed a permanent storage as a paremeter (any implementor - of IMessageStore, in this case a SoledadStore). In this case, a periodic - dump of the messages stored in memory will be done. The period of the - writes to the permanent storage is controled by the write_period parameter - in the constructor. - """ - implements(interfaces.IMessageStore, - interfaces.IMessageStoreWriter) - - # TODO We will want to index by chash when we transition to local-only - # UIDs. - - WRITING_FLAG = "_writing" - _last_uid_lock = threading.Lock() - _fdoc_docid_lock = threading.Lock() - - def __init__(self, permanent_store=None, - write_period=SOLEDAD_WRITE_PERIOD): - """ - Initialize a MemoryStore. - - :param permanent_store: a IMessageStore implementor to dump - messages to. - :type permanent_store: IMessageStore - :param write_period: the interval to dump messages to disk, in seconds. - :type write_period: int - """ - self._permanent_store = permanent_store - self._write_period = write_period - - if permanent_store is None: - self._mbox_closed = defaultdict(lambda: False) - - # Internal Storage: messages - """ - flags document store. - _fdoc_store[mbox][uid] = { 'content': 'aaa' } - """ - self._fdoc_store = defaultdict(lambda: defaultdict( - lambda: ReferenciableDict({}))) - - # Sizes - """ - {'mbox, uid': } - """ - self._sizes = {} - - # Internal Storage: payload-hash - """ - fdocs:doc-id store, stores document IDs for putting - the dirty flags-docs. - """ - self._fdoc_id_store = defaultdict(lambda: defaultdict( - lambda: '')) - - # Internal Storage: content-hash:hdoc - """ - hdoc-store keeps references to - the header-documents indexed by content-hash. - - {'chash': { dict-stuff } - } - """ - self._hdoc_store = defaultdict(lambda: ReferenciableDict({})) - - # Internal Storage: payload-hash:cdoc - """ - content-docs stored by payload-hash - {'phash': { dict-stuff } } - """ - self._cdoc_store = defaultdict(lambda: ReferenciableDict({})) - - # Internal Storage: content-hash:fdoc - """ - chash-fdoc-store keeps references to - the flag-documents indexed by content-hash. - - {'chash': {'mbox-a': weakref.proxy(dict), - 'mbox-b': weakref.proxy(dict)} - } - """ - self._chash_fdoc_store = defaultdict(lambda: defaultdict(lambda: None)) - - # Internal Storage: recent-flags store - """ - recent-flags store keeps one dict per mailbox, - with the document-id of the u1db document - and the set of the UIDs that have the recent flag. - - {'mbox-a': {'doc_id': 'deadbeef', - 'set': {1,2,3,4} - } - } - """ - # TODO this will have to transition to content-hash - # indexes after we move to local-only UIDs. - - self._rflags_store = defaultdict( - lambda: {'doc_id': None, 'set': set([])}) - - """ - last-uid store keeps the count of the highest UID - per mailbox. - - {'mbox-a': 42, - 'mbox-b': 23} - """ - self._last_uid = defaultdict(lambda: 0) - - """ - known-uids keeps a count of the uids that soledad knows for a given - mailbox - - {'mbox-a': set([1,2,3])} - """ - self._known_uids = defaultdict(set) - - """ - mbox-flags is a dict containing flags for each mailbox. this is - modified from mailbox.getFlags / mailbox.setFlags - """ - self._mbox_flags = defaultdict(set) - - # New and dirty flags, to set MessageWrapper State. - self._new = set([]) - self._new_queue = set([]) - self._new_deferreds = {} - - self._dirty = set([]) - self._dirty_queue = set([]) - self._dirty_deferreds = {} - - self._rflags_dirty = set([]) - - # Flag for signaling we're busy writing to the disk storage. - setattr(self, self.WRITING_FLAG, False) - - if self._permanent_store is not None: - # this producer spits its messages to the permanent store - # consumer using a queue. We will use that to put - # our messages to be written. - self.producer = MessageProducer(permanent_store, - period=0.1) - # looping call for dumping to SoledadStore - self._write_loop = LoopingCall(self.write_messages, - permanent_store) - - # We can start the write loop right now, why wait? - self._start_write_loop() - else: - # We have a memory-only store. - self.producer = None - self._write_loop = None - - # TODO -- remove - def _start_write_loop(self): - """ - Start loop for writing to disk database. - """ - if self._write_loop is None: - return - if not self._write_loop.running: - self._write_loop.start(self._write_period, now=True) - - # TODO -- remove - def _stop_write_loop(self): - """ - Stop loop for writing to disk database. - """ - if self._write_loop is None: - return - if self._write_loop.running: - self._write_loop.stop() - - # IMessageStore - - # XXX this would work well for whole message operations. - # We would have to add a put_flags operation to modify only - # the flags doc (and set the dirty flag accordingly) - - def create_message(self, mbox, uid, message, observer, - notify_on_disk=True): - """ - Create the passed message into this MemoryStore. - - By default we consider that any message is a new message. - - :param mbox: the mailbox - :type mbox: str or unicode - :param uid: the UID for the message - :type uid: int - :param message: a message to be added - :type message: MessageWrapper - :param observer: - the deferred that will fire with the UID of the message. If - notify_on_disk is True, this will happen when the message is - written to Soledad. Otherwise it will fire as soon as we've added - the message to the memory store. - :type observer: Deferred - :param notify_on_disk: - whether the `observer` deferred should wait until the message is - written to disk to be fired. - :type notify_on_disk: bool - """ - # TODO -- return a deferred - log.msg("Adding new doc to memstore %r (%r)" % (mbox, uid)) - key = mbox, uid - - self._add_message(mbox, uid, message, notify_on_disk) - self._new.add(key) - - if observer is not None: - if notify_on_disk: - # We store this deferred so we can keep track of the pending - # operations internally. - # TODO this should fire with the UID !!! -- change that in - # the soledad store code. - self._new_deferreds[key] = observer - - else: - # Caller does not care, just fired and forgot, so we pass - # a defer that will inmediately have its callback triggered. - reactor.callFromThread(observer.callback, uid) - - def put_message(self, mbox, uid, message, notify_on_disk=True): - """ - Put an existing message. - - This will also set the dirty flag on the MemoryStore. - - :param mbox: the mailbox - :type mbox: str or unicode - :param uid: the UID for the message - :type uid: int - :param message: a message to be added - :type message: MessageWrapper - :param notify_on_disk: whether the deferred that is returned should - wait until the message is written to disk to - be fired. - :type notify_on_disk: bool - - :return: a Deferred. if notify_on_disk is True, will be fired - when written to the db on disk. - Otherwise will fire inmediately - :rtype: Deferred - """ - key = mbox, uid - d = defer.Deferred() - d.addCallback(lambda result: log.msg("message PUT save: %s" % result)) - - self._dirty.add(key) - self._dirty_deferreds[key] = d - self._add_message(mbox, uid, message, notify_on_disk) - return d - - def _add_message(self, mbox, uid, message, notify_on_disk=True): - """ - Helper method, called by both create_message and put_message. - See those for parameter documentation. - """ - msg_dict = message.as_dict() - - fdoc = msg_dict.get(FDOC, None) - if fdoc is not None: - fdoc_store = self._fdoc_store[mbox][uid] - fdoc_store.update(fdoc) - chash_fdoc_store = self._chash_fdoc_store - - # content-hash indexing - chash = fdoc.get(fields.CONTENT_HASH_KEY) - chash_fdoc_store[chash][mbox] = weakref.proxy( - self._fdoc_store[mbox][uid]) - - hdoc = msg_dict.get(HDOC, None) - if hdoc is not None: - chash = hdoc.get(fields.CONTENT_HASH_KEY) - hdoc_store = self._hdoc_store[chash] - hdoc_store.update(hdoc) - - cdocs = message.cdocs - for cdoc in cdocs.values(): - phash = cdoc.get(fields.PAYLOAD_HASH_KEY, None) - if not phash: - continue - cdoc_store = self._cdoc_store[phash] - cdoc_store.update(cdoc) - - # Update memory store size - # XXX this should use [mbox][uid] - # TODO --- this has to be deferred to thread, - # TODO add hdoc and cdocs sizes too - # it's slowing things down here. - # key = mbox, uid - # self._sizes[key] = size.get_size(self._fdoc_store[key]) - - def purge_fdoc_store(self, mbox): - """ - Purge the empty documents from a fdoc store. - Called during initialization of the SoledadMailbox - - :param mbox: the mailbox - :type mbox: str or unicode - """ - # XXX This is really a workaround until I find the conditions - # that are making the empty items remain there. - # This happens, for instance, after running several times - # the regression test, that issues a store deleted + expunge + select - # The items are being correclty deleted, but in succesive appends - # the empty items with previously deleted uids reappear as empty - # documents. I suspect it's a timing condition with a previously - # evaluated sequence being used after the items has been removed. - - for uid, value in self._fdoc_store[mbox].items(): - if empty(value): - del self._fdoc_store[mbox][uid] - - def get_docid_for_fdoc(self, mbox, uid): - """ - Return Soledad document id for the flags-doc for a given mbox and uid, - or None of no flags document could be found. - - :param mbox: the mailbox - :type mbox: str or unicode - :param uid: the message UID - :type uid: int - :rtype: unicode or None - """ - with self._fdoc_docid_lock: - doc_id = self._fdoc_id_store[mbox][uid] - - if empty(doc_id): - fdoc = self._permanent_store.get_flags_doc(mbox, uid) - if empty(fdoc) or empty(fdoc.content): - return None - doc_id = fdoc.doc_id - self._fdoc_id_store[mbox][uid] = doc_id - - return doc_id - - def get_message(self, mbox, uid, dirtystate=DirtyState.none, - flags_only=False): - """ - Get a MessageWrapper for the given mbox and uid combination. - - :param mbox: the mailbox - :type mbox: str or unicode - :param uid: the message UID - :type uid: int - :param dirtystate: DirtyState enum: one of `dirty`, `new` - or `none` (default) - :type dirtystate: enum - :param flags_only: whether the message should carry only a reference - to the flags document. - :type flags_only: bool - : - - :return: MessageWrapper or None - """ - # TODO -- return deferred - if dirtystate == DirtyState.dirty: - flags_only = True - - key = mbox, uid - - fdoc = self._fdoc_store[mbox][uid] - if empty(fdoc): - return None - - new, dirty = False, False - if dirtystate == DirtyState.none: - new, dirty = self._get_new_dirty_state(key) - if dirtystate == DirtyState.dirty: - new, dirty = False, True - if dirtystate == DirtyState.new: - new, dirty = True, False - - if flags_only: - return MessageWrapper(fdoc=fdoc, - new=new, dirty=dirty, - memstore=weakref.proxy(self)) - else: - chash = fdoc.get(fields.CONTENT_HASH_KEY) - hdoc = self._hdoc_store[chash] - if empty(hdoc): - # XXX this will be a deferred - hdoc = self._permanent_store.get_headers_doc(chash) - if empty(hdoc): - return None - if not empty(hdoc.content): - self._hdoc_store[chash] = hdoc.content - hdoc = hdoc.content - cdocs = None - - pmap = hdoc.get(fields.PARTS_MAP_KEY, None) - if new and pmap is not None: - # take the different cdocs for write... - cdoc_store = self._cdoc_store - cdocs_list = phash_iter(hdoc) - cdocs = dict(enumerate( - [cdoc_store[phash] for phash in cdocs_list], 1)) - - return MessageWrapper(fdoc=fdoc, hdoc=hdoc, cdocs=cdocs, - new=new, dirty=dirty, - memstore=weakref.proxy(self)) - - def remove_message(self, mbox, uid): - """ - Remove a Message from this MemoryStore. - - :param mbox: the mailbox - :type mbox: str or unicode - :param uid: the message UID - :type uid: int - """ - # XXX For the moment we are only removing the flags and headers - # docs. The rest we leave there polluting your hard disk, - # until we think about a good way of deorphaning. - - # XXX implement elijah's idea of using a PUT document as a - # token to ensure consistency in the removal. - - try: - del self._fdoc_store[mbox][uid] - except KeyError: - pass - - try: - key = mbox, uid - self._new.discard(key) - self._dirty.discard(key) - if key in self._sizes: - del self._sizes[key] - self._known_uids[mbox].discard(uid) - except KeyError: - pass - except Exception as exc: - logger.error("error while removing message!") - logger.exception(exc) - try: - with self._fdoc_docid_lock: - del self._fdoc_id_store[mbox][uid] - except KeyError: - pass - except Exception as exc: - logger.error("error while removing message!") - logger.exception(exc) - - # IMessageStoreWriter - - # TODO -- I think we don't need this anymore. - # instead, we can have - def write_messages(self, store): - """ - Write the message documents in this MemoryStore to a different store. - - :param store: the IMessageStore to write to - :rtype: False if queue is not empty, None otherwise. - """ - # For now, we pass if the queue is not empty, to avoid duplicate - # queuing. - # We would better use a flag to know when we've already enqueued an - # item. - - # XXX this could return the deferred for all the enqueued operations - - if not self.producer.is_queue_empty(): - return False - - if any(map(lambda i: not empty(i), (self._new, self._dirty))): - logger.info("Writing messages to Soledad...") - - # TODO change for lock, and make the property access - # is accquired - with set_bool_flag(self, self.WRITING_FLAG): - for rflags_doc_wrapper in self.all_rdocs_iter(): - self.producer.push(rflags_doc_wrapper, - state=self.producer.STATE_DIRTY) - for msg_wrapper in self.all_new_msg_iter(): - self.producer.push(msg_wrapper, - state=self.producer.STATE_NEW) - for msg_wrapper in self.all_dirty_msg_iter(): - self.producer.push(msg_wrapper, - state=self.producer.STATE_DIRTY) - - # MemoryStore specific methods. - - def get_uids(self, mbox): - """ - Get all uids for a given mbox. - - :param mbox: the mailbox - :type mbox: str or unicode - :rtype: list - """ - return self._fdoc_store[mbox].keys() - - def get_soledad_known_uids(self, mbox): - """ - Get all uids that soledad knows about, from the memory cache. - :param mbox: the mailbox - :type mbox: str or unicode - :rtype: list - """ - return self._known_uids.get(mbox, []) - - # last_uid - - def get_last_uid(self, mbox): - """ - Return the highest UID for a given mbox. - It will be the highest between the highest uid in the message store for - the mailbox, and the soledad integer cache. - - :param mbox: the mailbox - :type mbox: str or unicode - :rtype: int - """ - uids = self.get_uids(mbox) - last_mem_uid = uids and max(uids) or 0 - last_soledad_uid = self.get_last_soledad_uid(mbox) - return max(last_mem_uid, last_soledad_uid) - - def get_last_soledad_uid(self, mbox): - """ - Get last uid for a given mbox from the soledad integer cache. - - :param mbox: the mailbox - :type mbox: str or unicode - """ - return self._last_uid.get(mbox, 0) - - def set_last_soledad_uid(self, mbox, value): - """ - Set last uid for a given mbox in the soledad integer cache. - SoledadMailbox should prime this value during initialization. - Other methods (during message adding) SHOULD call - `increment_last_soledad_uid` instead. - - :param mbox: the mailbox - :type mbox: str or unicode - :param value: the value to set - :type value: int - """ - # can be long??? - # leap_assert_type(value, int) - logger.info("setting last soledad uid for %s to %s" % - (mbox, value)) - # if we already have a value here, don't do anything - with self._last_uid_lock: - if not self._last_uid.get(mbox, None): - self._last_uid[mbox] = value - - def set_known_uids(self, mbox, value): - """ - Set the value fo the known-uids set for this mbox. - - :param mbox: the mailbox - :type mbox: str or unicode - :param value: a sequence of integers to be added to the set. - :type value: tuple - """ - current = self._known_uids[mbox] - self._known_uids[mbox] = current.union(set(value)) - - def increment_last_soledad_uid(self, mbox): - """ - Increment by one the soledad integer cache for the last_uid for - this mbox, and fire a defer-to-thread to update the soledad value. - The caller should lock the call tho this method. - - :param mbox: the mailbox - :type mbox: str or unicode - """ - with self._last_uid_lock: - self._last_uid[mbox] += 1 - value = self._last_uid[mbox] - reactor.callInThread(self.write_last_uid, mbox, value) - return value - - def write_last_uid(self, mbox, value): - """ - Increment the soledad integer cache for the highest uid value. - - :param mbox: the mailbox - :type mbox: str or unicode - :param value: the value to set - :type value: int - """ - leap_assert_type(value, int) - if self._permanent_store: - self._permanent_store.write_last_uid(mbox, value) - - def load_flag_docs(self, mbox, flag_docs): - """ - Load the flag documents for the given mbox. - Used during initial flag docs prefetch. - - :param mbox: the mailbox - :type mbox: str or unicode - :param flag_docs: a dict with the content for the flag docs, indexed - by uid. - :type flag_docs: dict - """ - # We can do direct assignments cause we know this will only - # be called during initialization of the mailbox. - # TODO could hook here a sanity-check - # for duplicates - - fdoc_store = self._fdoc_store[mbox] - chash_fdoc_store = self._chash_fdoc_store - for uid in flag_docs: - rdict = ReferenciableDict(flag_docs[uid]) - fdoc_store[uid] = rdict - # populate chash dict too, to avoid fdoc duplication - chash = flag_docs[uid]["chash"] - chash_fdoc_store[chash][mbox] = weakref.proxy( - self._fdoc_store[mbox][uid]) - - def update_flags(self, mbox, uid, fdoc): - """ - Update the flag document for a given mbox and uid combination, - and set the dirty flag. - We could use put_message, but this is faster. - - :param mbox: the mailbox - :type mbox: str or unicode - :param uid: the uid of the message - :type uid: int - - :param fdoc: a dict with the content for the flag docs - :type fdoc: dict - """ - key = mbox, uid - self._fdoc_store[mbox][uid].update(fdoc) - self._dirty.add(key) - - def load_header_docs(self, header_docs): - """ - Load the flag documents for the given mbox. - Used during header docs prefetch, and during cache after - a read from soledad if the hdoc property in message did not - find its value in here. - - :param flag_docs: a dict with the content for the flag docs. - :type flag_docs: dict - """ - hdoc_store = self._hdoc_store - for chash in header_docs: - hdoc_store[chash] = ReferenciableDict(header_docs[chash]) - - def all_flags(self, mbox): - """ - Return a dictionary with all the flags for a given mbox. - - :param mbox: the mailbox - :type mbox: str or unicode - :rtype: dict - """ - fdict = {} - uids = self.get_uids(mbox) - fstore = self._fdoc_store[mbox] - - for uid in uids: - try: - fdict[uid] = fstore[uid][fields.FLAGS_KEY] - except KeyError: - continue - return fdict - - def all_headers(self, mbox): - """ - Return a dictionary with all the header docs for a given mbox. - - :param mbox: the mailbox - :type mbox: str or unicode - :rtype: dict - """ - headers_dict = {} - uids = self.get_uids(mbox) - fdoc_store = self._fdoc_store[mbox] - hdoc_store = self._hdoc_store - - for uid in uids: - try: - chash = fdoc_store[uid][fields.CONTENT_HASH_KEY] - hdoc = hdoc_store[chash] - if not empty(hdoc): - headers_dict[uid] = hdoc - except KeyError: - continue - return headers_dict - - # Counting sheeps... - - def count_new_mbox(self, mbox): - """ - Count the new messages by mailbox. - - :param mbox: the mailbox - :type mbox: str or unicode - :return: number of new messages - :rtype: int - """ - return len([(m, uid) for m, uid in self._new if mbox == mbox]) - - # XXX used at all? - def count_new(self): - """ - Count all the new messages in the MemoryStore. - - :rtype: int - """ - return len(self._new) - - def count(self, mbox): - """ - Return the count of messages for a given mbox. - - :param mbox: the mailbox - :type mbox: str or unicode - :return: number of messages - :rtype: int - """ - return len(self._fdoc_store[mbox]) - - def unseen_iter(self, mbox): - """ - Get an iterator for the message UIDs with no `seen` flag - for a given mailbox. - - :param mbox: the mailbox - :type mbox: str or unicode - :return: iterator through unseen message doc UIDs - :rtype: iterable - """ - fdocs = self._fdoc_store[mbox] - - return [uid for uid, value - in fdocs.items() - if fields.SEEN_FLAG not in value.get(fields.FLAGS_KEY, [])] - - def get_cdoc_from_phash(self, phash): - """ - Return a content-document by its payload-hash. - - :param phash: the payload hash to check against - :type phash: str or unicode - :rtype: MessagePartDoc - """ - doc = self._cdoc_store.get(phash, None) - - # XXX return None for consistency? - - # XXX have to keep a mapping between phash and its linkage - # info, to know if this payload is been already saved or not. - # We will be able to get this from the linkage-docs, - # not yet implemented. - new = True - dirty = False - return MessagePartDoc( - new=new, dirty=dirty, store="mem", - part=MessagePartType.cdoc, - content=doc, - doc_id=None) - - def get_fdoc_from_chash(self, chash, mbox): - """ - Return a flags-document by its content-hash and a given mailbox. - Used during content-duplication detection while copying or adding a - message. - - :param chash: the content hash to check against - :type chash: str or unicode - :param mbox: the mailbox - :type mbox: str or unicode - - :return: MessagePartDoc. It will return None if the flags document - has empty content or it is flagged as \\Deleted. - """ - fdoc = self._chash_fdoc_store[chash][mbox] - - # a couple of special cases. - # 1. We might have a doc with empty content... - if empty(fdoc): - return None - - # 2. ...Or the message could exist, but being flagged for deletion. - # We want to create a new one in this case. - # Hmmm what if the deletion is un-done?? We would end with a - # duplicate... - if fdoc and fields.DELETED_FLAG in fdoc.get(fields.FLAGS_KEY, []): - return None - - uid = fdoc[fields.UID_KEY] - key = mbox, uid - new = key in self._new - dirty = key in self._dirty - - return MessagePartDoc( - new=new, dirty=dirty, store="mem", - part=MessagePartType.fdoc, - content=fdoc, - doc_id=None) - - def iter_fdoc_keys(self): - """ - Return a generator through all the mbox, uid keys in the flags-doc - store. - """ - fdoc_store = self._fdoc_store - for mbox in fdoc_store: - for uid in fdoc_store[mbox]: - yield mbox, uid - - def all_new_msg_iter(self): - """ - Return generator that iterates through all new messages. - - :return: generator of MessageWrappers - :rtype: generator - """ - gm = self.get_message - # need to freeze, set can change during iteration - new = [gm(*key, dirtystate=DirtyState.new) for key in tuple(self._new)] - # move content from new set to the queue - self._new_queue.update(self._new) - self._new.difference_update(self._new) - return new - - def all_dirty_msg_iter(self): - """ - Return generator that iterates through all dirty messages. - - :return: generator of MessageWrappers - :rtype: generator - """ - gm = self.get_message - # need to freeze, set can change during iteration - dirty = [gm(*key, flags_only=True, dirtystate=DirtyState.dirty) - for key in tuple(self._dirty)] - # move content from new and dirty sets to the queue - - self._dirty_queue.update(self._dirty) - self._dirty.difference_update(self._dirty) - return dirty - - def all_deleted_uid_iter(self, mbox): - """ - Return a list with the UIDs for all messags - with deleted flag in a given mailbox. - - :param mbox: the mailbox - :type mbox: str or unicode - :return: list of integers - :rtype: list - """ - # This *needs* to return a fixed sequence. Otherwise the dictionary len - # will change during iteration, when we modify it - fdocs = self._fdoc_store[mbox] - return [uid for uid, value - in fdocs.items() - if fields.DELETED_FLAG in value.get(fields.FLAGS_KEY, [])] - - # new, dirty flags - - def _get_new_dirty_state(self, key): - """ - Return `new` and `dirty` flags for a given message. - - :param key: the key for the message, in the form mbox, uid - :type key: tuple - :return: tuple of bools - :rtype: tuple - """ - # TODO change indexing of sets to [mbox][key] too. - # XXX should return *first* the news, and *then* the dirty... - - # TODO should query in queues too , true? - # - return map(lambda _set: key in _set, (self._new, self._dirty)) - - def set_new_queued(self, key): - """ - Add the key value to the `new-queue` set. - - :param key: the key for the message, in the form mbox, uid - :type key: tuple - """ - self._new_queue.add(key) - - def unset_new_queued(self, key): - """ - Remove the key value from the `new-queue` set. - - :param key: the key for the message, in the form mbox, uid - :type key: tuple - """ - self._new_queue.discard(key) - deferreds = self._new_deferreds - d = deferreds.get(key, None) - if d: - # XXX use a namedtuple for passing the result - # when we check it in the other side. - d.callback('%s, ok' % str(key)) - deferreds.pop(key) - - def set_dirty_queued(self, key): - """ - Add the key value to the `dirty-queue` set. - - :param key: the key for the message, in the form mbox, uid - :type key: tuple - """ - self._dirty_queue.add(key) - - def unset_dirty_queued(self, key): - """ - Remove the key value from the `dirty-queue` set. - - :param key: the key for the message, in the form mbox, uid - :type key: tuple - """ - self._dirty_queue.discard(key) - deferreds = self._dirty_deferreds - d = deferreds.get(key, None) - if d: - # XXX use a namedtuple for passing the result - # when we check it in the other side. - d.callback('%s, ok' % str(key)) - deferreds.pop(key) - - # Recent Flags - - def set_recent_flag(self, mbox, uid): - """ - Set the `Recent` flag for a given mailbox and UID. - - :param mbox: the mailbox - :type mbox: str or unicode - :param uid: the message UID - :type uid: int - """ - self._rflags_dirty.add(mbox) - self._rflags_store[mbox]['set'].add(uid) - - # TODO --- nice but unused - def unset_recent_flag(self, mbox, uid): - """ - Unset the `Recent` flag for a given mailbox and UID. - - :param mbox: the mailbox - :type mbox: str or unicode - :param uid: the message UID - :type uid: int - """ - self._rflags_store[mbox]['set'].discard(uid) - - def set_recent_flags(self, mbox, value): - """ - Set the value for the set of the recent flags. - Used from the property in the MessageCollection. - - :param mbox: the mailbox - :type mbox: str or unicode - :param value: a sequence of flags to set - :type value: sequence - """ - self._rflags_dirty.add(mbox) - self._rflags_store[mbox]['set'] = set(value) - - def load_recent_flags(self, mbox, flags_doc): - """ - Load the passed flags document in the recent flags store, for a given - mailbox. - - :param mbox: the mailbox - :type mbox: str or unicode - :param flags_doc: A dictionary containing the `doc_id` of the Soledad - flags-document for this mailbox, and the `set` - of uids marked with that flag. - """ - self._rflags_store[mbox] = flags_doc - - def get_recent_flags(self, mbox): - """ - Return the set of UIDs with the `Recent` flag for this mailbox. - - :param mbox: the mailbox - :type mbox: str or unicode - :rtype: set, or None - """ - rflag_for_mbox = self._rflags_store.get(mbox, None) - if not rflag_for_mbox: - return None - return self._rflags_store[mbox]['set'] - - # XXX -- remove - def all_rdocs_iter(self): - """ - Return an iterator through all in-memory recent flag dicts, wrapped - under a RecentFlagsDoc namedtuple. - Used for saving to disk. - - :return: a generator of RecentFlagDoc - :rtype: generator - """ - # XXX use enums - DOC_ID = "doc_id" - SET = "set" - - rflags_store = self._rflags_store - - def get_rdoc(mbox, rdict): - mbox_rflag_set = rdict[SET] - recent_set = copy(mbox_rflag_set) - # zero it! - mbox_rflag_set.difference_update(mbox_rflag_set) - return RecentFlagsDoc( - doc_id=rflags_store[mbox][DOC_ID], - content={ - fields.TYPE_KEY: fields.TYPE_RECENT_VAL, - fields.MBOX_KEY: mbox, - fields.RECENTFLAGS_KEY: list(recent_set) - }) - - return (get_rdoc(mbox, rdict) for mbox, rdict in rflags_store.items() - if not empty(rdict[SET])) - - # Methods that mirror the IMailbox interface - - def remove_all_deleted(self, mbox): - """ - Remove all messages flagged \\Deleted from this Memory Store only. - Called from `expunge` - - :param mbox: the mailbox - :type mbox: str or unicode - :return: a list of UIDs - :rtype: list - """ - mem_deleted = self.all_deleted_uid_iter(mbox) - for uid in mem_deleted: - self.remove_message(mbox, uid) - return mem_deleted - - # TODO -- remove - def stop_and_flush(self): - """ - Stop the write loop and trigger a write to the producer. - """ - self._stop_write_loop() - if self._permanent_store is not None: - # XXX we should check if we did get a True value on this - # operation. If we got False we should retry! (queue was not empty) - self.write_messages(self._permanent_store) - self.producer.flush() - - def expunge(self, mbox, observer): - """ - Remove all messages flagged \\Deleted, from the Memory Store - and from the permanent store also. - - It first queues up a last write, and wait for the deferreds to be done - before continuing. - - :param mbox: the mailbox - :type mbox: str or unicode - :param observer: a deferred that will be fired when expunge is done - :type observer: Deferred - """ - soledad_store = self._permanent_store - if soledad_store is None: - # just-in memory store, easy then. - self._delete_from_memory(mbox, observer) - return - - # We have a soledad storage. - try: - # Stop and trigger last write - self.stop_and_flush() - # Wait on the writebacks to finish - - # XXX what if pending deferreds is empty? - pending_deferreds = (self._new_deferreds.get(mbox, []) + - self._dirty_deferreds.get(mbox, [])) - d1 = defer.gatherResults(pending_deferreds, consumeErrors=True) - d1.addCallback( - self._delete_from_soledad_and_memory, mbox, observer) - except Exception as exc: - logger.exception(exc) - - def _delete_from_memory(self, mbox, observer): - """ - Remove all messages marked as deleted from soledad and memory. - - :param mbox: the mailbox - :type mbox: str or unicode - :param observer: a deferred that will be fired when expunge is done - :type observer: Deferred - """ - mem_deleted = self.remove_all_deleted(mbox) - # TODO return a DeferredList - observer.callback(mem_deleted) - - def _delete_from_soledad_and_memory(self, result, mbox, observer): - """ - Remove all messages marked as deleted from soledad and memory. - - :param result: ignored. the result of the deferredList that triggers - this as a callback from `expunge`. - :param mbox: the mailbox - :type mbox: str or unicode - :param observer: a deferred that will be fired when expunge is done - :type observer: Deferred - """ - all_deleted = [] - soledad_store = self._permanent_store - - try: - # 1. Delete all messages marked as deleted in soledad. - logger.debug("DELETING FROM SOLEDAD ALL FOR %r" % (mbox,)) - sol_deleted = soledad_store.remove_all_deleted(mbox) - - try: - self._known_uids[mbox].difference_update(set(sol_deleted)) - except Exception as exc: - logger.exception(exc) - - # 2. Delete all messages marked as deleted in memory. - logger.debug("DELETING FROM MEM ALL FOR %r" % (mbox,)) - mem_deleted = self.remove_all_deleted(mbox) - - all_deleted = set(mem_deleted).union(set(sol_deleted)) - logger.debug("deleted %r" % all_deleted) - except Exception as exc: - logger.exception(exc) - finally: - self._start_write_loop() - - observer.callback(all_deleted) - - # Mailbox documents and attributes - - # This could be also be cached in memstore, but proxying directly - # to soledad since it's not too performance-critical. - - def get_mbox_doc(self, mbox): - """ - Return the soledad document for a given mailbox. - - :param mbox: the mailbox - :type mbox: str or unicode - :rtype: SoledadDocument or None. - """ - if self.permanent_store is not None: - return self.permanent_store.get_mbox_document(mbox) - else: - return None - - def get_mbox_closed(self, mbox): - """ - Return the closed attribute for a given mailbox. - - :param mbox: the mailbox - :type mbox: str or unicode - :rtype: bool - """ - if self.permanent_store is not None: - return self.permanent_store.get_mbox_closed(mbox) - else: - return self._mbox_closed[mbox] - - def set_mbox_closed(self, mbox, closed): - """ - Set the closed attribute for a given mailbox. - - :param mbox: the mailbox - :type mbox: str or unicode - """ - if self.permanent_store is not None: - self.permanent_store.set_mbox_closed(mbox, closed) - else: - self._mbox_closed[mbox] = closed - - def get_mbox_flags(self, mbox): - """ - Get the flags for a given mbox. - :rtype: list - """ - return sorted(self._mbox_flags[mbox]) - - def set_mbox_flags(self, mbox, flags): - """ - Set the mbox flags - """ - self._mbox_flags[mbox] = set(flags) - # TODO - # This should write to the permanent store!!! - - # Rename flag-documents - - def rename_fdocs_mailbox(self, old_mbox, new_mbox): - """ - Change the mailbox name for all flag documents in a given mailbox. - Used from account.rename - - :param old_mbox: name for the old mbox - :type old_mbox: str or unicode - :param new_mbox: name for the new mbox - :type new_mbox: str or unicode - """ - fs = self._fdoc_store - keys = fs[old_mbox].keys() - for k in keys: - fdoc = fs[old_mbox][k] - fdoc['mbox'] = new_mbox - fs[new_mbox][k] = fdoc - fs[old_mbox].pop(k) - self._dirty.add((new_mbox, k)) - - # Dump-to-disk controls. - - @property - def is_writing(self): - """ - Property that returns whether the store is currently writing its - internal state to a permanent storage. - - Used to evaluate whether the CHECK command can inform that the field - is clear to proceed, or waiting for the write operations to complete - is needed instead. - - :rtype: bool - """ - # FIXME this should return a deferred !!! - # TODO this should be moved to soledadStore instead - # (all pending deferreds) - return getattr(self, self.WRITING_FLAG) - - @property - def permanent_store(self): - return self._permanent_store - - # Memory management. - - def get_size(self): - """ - Return the size of the internal storage. - Use for calculating the limit beyond which we should flush the store. - - :rtype: int - """ - return reduce(lambda x, y: x + y, self._sizes, 0) diff --git a/src/leap/mail/imap/messageparts.py b/src/leap/mail/imap/messageparts.py deleted file mode 100644 index fb1d75a..0000000 --- a/src/leap/mail/imap/messageparts.py +++ /dev/null @@ -1,586 +0,0 @@ -# messageparts.py -# Copyright (C) 2014 LEAP -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see . -""" -MessagePart implementation. Used from LeapMessage. -""" -import logging -import StringIO -import weakref - -from collections import namedtuple - -from enum import Enum -from zope.interface import implements -from twisted.mail import imap4 - -from leap.common.decorators import memoized_method -from leap.common.mail import get_email_charset -from leap.mail.imap import interfaces -from leap.mail.imap.fields import fields -from leap.mail.utils import empty, first, find_charset - -MessagePartType = Enum("MessagePartType", "hdoc fdoc cdoc cdocs docs_id") - - -logger = logging.getLogger(__name__) - - -""" -A MessagePartDoc is a light wrapper around the dictionary-like -data that we pass along for message parts. It can be used almost everywhere -that you would expect a SoledadDocument, since it has a dict under the -`content` attribute. - -We also keep some metadata on it, relative in part to the message as a whole, -and sometimes to a part in particular only. - -* `new` indicates that the document has just been created. SoledadStore - should just create a new doc for all the related message parts. -* `store` indicates the type of store a given MessagePartDoc lives in. - We currently use this to indicate that the document comes from memeory, - but we should probably get rid of it as soon as we extend the use of the - SoledadStore interface along LeapMessage, MessageCollection and Mailbox. -* `part` is one of the MessagePartType enums. - -* `dirty` indicates that, while we already have the document in Soledad, - we have modified its state in memory, so we need to put_doc instead while - dumping the MemoryStore contents. - `dirty` attribute would only apply to flags-docs and linkage-docs. -* `doc_id` is the identifier for the document in the u1db database, if any. - -""" - -MessagePartDoc = namedtuple( - 'MessagePartDoc', - ['new', 'dirty', 'part', 'store', 'content', 'doc_id']) - -""" -A RecentFlagsDoc is used to send the recent-flags document payload to the -SoledadWriter during dumps. -""" -RecentFlagsDoc = namedtuple( - 'RecentFlagsDoc', - ['content', 'doc_id']) - - -class ReferenciableDict(dict): - """ - A dict that can be weak-referenced. - - Some builtin objects are not weak-referenciable unless - subclassed. So we do. - - Used to return pointers to the items in the MemoryStore. - """ - - -class MessageWrapper(object): - """ - A simple nested dictionary container around the different message subparts. - """ - implements(interfaces.IMessageContainer) - - FDOC = "fdoc" - HDOC = "hdoc" - CDOCS = "cdocs" - DOCS_ID = "docs_id" - - # Using slots to limit some the memory use, - # Add your attribute here. - - __slots__ = ["_dict", "_new", "_dirty", "_storetype", "memstore"] - - def __init__(self, fdoc=None, hdoc=None, cdocs=None, - from_dict=None, memstore=None, - new=True, dirty=False, docs_id={}): - """ - Initialize a MessageWrapper. - """ - # TODO add optional reference to original message in the incoming - self._dict = {} - self.memstore = memstore - - self._new = new - self._dirty = dirty - - self._storetype = "mem" - - if from_dict is not None: - self.from_dict(from_dict) - else: - if fdoc is not None: - self._dict[self.FDOC] = ReferenciableDict(fdoc) - if hdoc is not None: - self._dict[self.HDOC] = ReferenciableDict(hdoc) - if cdocs is not None: - self._dict[self.CDOCS] = ReferenciableDict(cdocs) - - # This will keep references to the doc_ids to be able to put - # messages to soledad. It will be populated during the walk() to avoid - # the overhead of reading from the db. - - # XXX it really *only* make sense for the FDOC, the other parts - # should not be "dirty", just new...!!! - self._dict[self.DOCS_ID] = docs_id - - # properties - - # TODO Could refactor new and dirty properties together. - - def _get_new(self): - """ - Get the value for the `new` flag. - - :rtype: bool - """ - return self._new - - def _set_new(self, value=False): - """ - Set the value for the `new` flag, and propagate it - to the memory store if any. - - :param value: the value to set - :type value: bool - """ - self._new = value - if self.memstore: - mbox = self.fdoc.content.get('mbox', None) - uid = self.fdoc.content.get('uid', None) - if not mbox or not uid: - logger.warning("Malformed fdoc") - return - key = mbox, uid - fun = [self.memstore.unset_new_queued, - self.memstore.set_new_queued][int(value)] - fun(key) - else: - logger.warning("Could not find a memstore referenced from this " - "MessageWrapper. The value for new will not be " - "propagated") - - new = property(_get_new, _set_new, - doc="The `new` flag for this MessageWrapper") - - def _get_dirty(self): - """ - Get the value for the `dirty` flag. - - :rtype: bool - """ - return self._dirty - - def _set_dirty(self, value=True): - """ - Set the value for the `dirty` flag, and propagate it - to the memory store if any. - - :param value: the value to set - :type value: bool - """ - self._dirty = value - if self.memstore: - mbox = self.fdoc.content.get('mbox', None) - uid = self.fdoc.content.get('uid', None) - if not mbox or not uid: - logger.warning("Malformed fdoc") - return - key = mbox, uid - fun = [self.memstore.unset_dirty_queued, - self.memstore.set_dirty_queued][int(value)] - fun(key) - else: - logger.warning("Could not find a memstore referenced from this " - "MessageWrapper. The value for new will not be " - "propagated") - - dirty = property(_get_dirty, _set_dirty) - - # IMessageContainer - - @property - def fdoc(self): - """ - Return a MessagePartDoc wrapping around a weak reference to - the flags-document in this MemoryStore, if any. - - :rtype: MessagePartDoc - """ - _fdoc = self._dict.get(self.FDOC, None) - if _fdoc: - content_ref = weakref.proxy(_fdoc) - else: - logger.warning("NO FDOC!!!") - content_ref = {} - - return MessagePartDoc(new=self.new, dirty=self.dirty, - store=self._storetype, - part=MessagePartType.fdoc, - content=content_ref, - doc_id=self._dict[self.DOCS_ID].get( - self.FDOC, None)) - - @property - def hdoc(self): - """ - Return a MessagePartDoc wrapping around a weak reference to - the headers-document in this MemoryStore, if any. - - :rtype: MessagePartDoc - """ - _hdoc = self._dict.get(self.HDOC, None) - if _hdoc: - content_ref = weakref.proxy(_hdoc) - else: - content_ref = {} - return MessagePartDoc(new=self.new, dirty=self.dirty, - store=self._storetype, - part=MessagePartType.hdoc, - content=content_ref, - doc_id=self._dict[self.DOCS_ID].get( - self.HDOC, None)) - - @property - def cdocs(self): - """ - Return a weak reference to a zero-indexed dict containing - the content-documents, or an empty dict if none found. - If you want access to the MessagePartDoc for the individual - parts, use the generator returned by `walk` instead. - - :rtype: dict - """ - _cdocs = self._dict.get(self.CDOCS, None) - if _cdocs: - return weakref.proxy(_cdocs) - else: - return {} - - def walk(self): - """ - Generator that iterates through all the parts, returning - MessagePartDoc. Used for writing to SoledadStore. - - :rtype: generator - """ - if self._dirty: - try: - mbox = self.fdoc.content[fields.MBOX_KEY] - uid = self.fdoc.content[fields.UID_KEY] - docid_dict = self._dict[self.DOCS_ID] - docid_dict[self.FDOC] = self.memstore.get_docid_for_fdoc( - mbox, uid) - except Exception as exc: - logger.debug("Error while walking message...") - logger.exception(exc) - - if not empty(self.fdoc.content) and 'uid' in self.fdoc.content: - yield self.fdoc - if not empty(self.hdoc.content): - yield self.hdoc - for cdoc in self.cdocs.values(): - if not empty(cdoc): - content_ref = weakref.proxy(cdoc) - yield MessagePartDoc(new=self.new, dirty=self.dirty, - store=self._storetype, - part=MessagePartType.cdoc, - content=content_ref, - doc_id=None) - - # i/o - - def as_dict(self): - """ - Return a dict representation of the parts contained. - - :rtype: dict - """ - return self._dict - - def from_dict(self, msg_dict): - """ - Populate MessageWrapper parts from a dictionary. - It expects the same format that we use in a - MessageWrapper. - - - :param msg_dict: a dictionary containing the parts to populate - the MessageWrapper from - :type msg_dict: dict - """ - fdoc, hdoc, cdocs = map( - lambda part: msg_dict.get(part, None), - [self.FDOC, self.HDOC, self.CDOCS]) - - for t, doc in ((self.FDOC, fdoc), (self.HDOC, hdoc), - (self.CDOCS, cdocs)): - self._dict[t] = ReferenciableDict(doc) if doc else None - - -class MessagePart(object): - """ - IMessagePart implementor, to be passed to several methods - of the IMAP4Server. - It takes a subpart message and is able to find - the inner parts. - - See the interface documentation. - """ - - implements(imap4.IMessagePart) - - def __init__(self, soledad, part_map): - """ - Initializes the MessagePart. - - :param soledad: Soledad instance. - :type soledad: Soledad - :param part_map: a dictionary containing the parts map for this - message - :type part_map: dict - """ - # TODO - # It would be good to pass the uid/mailbox also - # for references while debugging. - - # We have a problem on bulk moves, and is - # that when the fetch on the new mailbox is done - # the parts maybe are not complete. - # So we should be able to fail with empty - # docs until we solve that. The ideal would be - # to gather the results of the deferred operations - # to signal the operation is complete. - #leap_assert(part_map, "part map dict cannot be null") - - self._soledad = soledad - self._pmap = part_map - - def getSize(self): - """ - Return the total size, in octets, of this message part. - - :return: size of the message, in octets - :rtype: int - """ - if empty(self._pmap): - return 0 - size = self._pmap.get('size', None) - if size is None: - logger.error("Message part cannot find size in the partmap") - size = 0 - return size - - def getBodyFile(self): - """ - Retrieve a file object containing only the body of this message. - - :return: file-like object opened for reading - :rtype: StringIO - """ - fd = StringIO.StringIO() - if not empty(self._pmap): - multi = self._pmap.get('multi') - if not multi: - phash = self._pmap.get("phash", None) - else: - pmap = self._pmap.get('part_map') - first_part = pmap.get('1', None) - if not empty(first_part): - phash = first_part['phash'] - else: - phash = None - - if phash is None: - logger.warning("Could not find phash for this subpart!") - payload = "" - else: - payload = self._get_payload_from_document_memoized(phash) - if empty(payload): - payload = self._get_payload_from_document(phash) - - else: - logger.warning("Message with no part_map!") - payload = "" - - if payload: - content_type = self._get_ctype_from_document(phash) - charset = find_charset(content_type) - if charset is None: - charset = self._get_charset(payload) - try: - if isinstance(payload, unicode): - payload = payload.encode(charset) - except UnicodeError as exc: - logger.error( - "Unicode error, using 'replace'. {0!r}".format(exc)) - payload = payload.encode(charset, 'replace') - - fd.write(payload) - fd.seek(0) - return fd - - # TODO should memory-bound this memoize!!! - @memoized_method - def _get_payload_from_document_memoized(self, phash): - """ - Memoized method call around the regular method, to be able - to call the non-memoized method in case we got a None. - - :param phash: the payload hash to retrieve by. - :type phash: str or unicode - :rtype: str or unicode or None - """ - return self._get_payload_from_document(phash) - - def _get_payload_from_document(self, phash): - """ - Return the message payload from the content document. - - :param phash: the payload hash to retrieve by. - :type phash: str or unicode - :rtype: str or unicode or None - """ - cdocs = self._soledad.get_from_index( - fields.TYPE_P_HASH_IDX, - fields.TYPE_CONTENT_VAL, str(phash)) - - cdoc = first(cdocs) - if cdoc is None: - logger.warning( - "Could not find the content doc " - "for phash %s" % (phash,)) - payload = "" - else: - payload = cdoc.content.get(fields.RAW_KEY, "") - return payload - - # TODO should memory-bound this memoize!!! - @memoized_method - def _get_ctype_from_document(self, phash): - """ - Reeturn the content-type from the content document. - - :param phash: the payload hash to retrieve by. - :type phash: str or unicode - :rtype: str or unicode - """ - cdocs = self._soledad.get_from_index( - fields.TYPE_P_HASH_IDX, - fields.TYPE_CONTENT_VAL, str(phash)) - - cdoc = first(cdocs) - if not cdoc: - logger.warning( - "Could not find the content doc " - "for phash %s" % (phash,)) - ctype = cdoc.content.get('ctype', "") - return ctype - - @memoized_method - def _get_charset(self, stuff): - # TODO put in a common class with LeapMessage - """ - Gets (guesses?) the charset of a payload. - - :param stuff: the stuff to guess about. - :type stuff: str or unicode - :return: charset - :rtype: unicode - """ - # XXX existential doubt 2. shouldn't we make the scope - # of the decorator somewhat more persistent? - # ah! yes! and put memory bounds. - return get_email_charset(stuff) - - def getHeaders(self, negate, *names): - """ - Retrieve a group of message headers. - - :param names: The names of the headers to retrieve or omit. - :type names: tuple of str - - :param negate: If True, indicates that the headers listed in names - should be omitted from the return value, rather - than included. - :type negate: bool - - :return: A mapping of header field names to header field values - :rtype: dict - """ - # XXX refactor together with MessagePart method - if not self._pmap: - logger.warning("No pmap in Subpart!") - return {} - headers = dict(self._pmap.get("headers", [])) - - names = map(lambda s: s.upper(), names) - if negate: - cond = lambda key: key.upper() not in names - else: - cond = lambda key: key.upper() in names - - # default to most likely standard - charset = find_charset(headers, "utf-8") - headers2 = dict() - for key, value in headers.items(): - # twisted imap server expects *some* headers to be lowercase - # We could use a CaseInsensitiveDict here... - if key.lower() == "content-type": - key = key.lower() - - if not isinstance(key, str): - key = key.encode(charset, 'replace') - if not isinstance(value, str): - value = value.encode(charset, 'replace') - - # filter original dict by negate-condition - if cond(key): - headers2[key] = value - return headers2 - - def isMultipart(self): - """ - Return True if this message is multipart. - """ - if empty(self._pmap): - logger.warning("Could not get part map!") - return False - multi = self._pmap.get("multi", False) - return multi - - def getSubPart(self, part): - """ - Retrieve a MIME submessage - - :type part: C{int} - :param part: The number of the part to retrieve, indexed from 0. - :raise IndexError: Raised if the specified part does not exist. - :raise TypeError: Raised if this message is not multipart. - :rtype: Any object implementing C{IMessagePart}. - :return: The specified sub-part. - """ - if not self.isMultipart(): - raise TypeError - - sub_pmap = self._pmap.get("part_map", {}) - try: - part_map = sub_pmap[str(part + 1)] - except KeyError: - logger.debug("getSubpart for %s: KeyError" % (part,)) - raise IndexError - - # XXX check for validity - return MessagePart(self._soledad, part_map) diff --git a/src/leap/mail/imap/messages.py b/src/leap/mail/imap/messages.py index d47c8eb..7e0f973 100644 --- a/src/leap/mail/imap/messages.py +++ b/src/leap/mail/imap/messages.py @@ -1,6 +1,6 @@ # -*- coding: utf-8 -*- -# messages.py -# Copyright (C) 2013, 2014 LEAP +# imap/messages.py +# Copyright (C) 2013-2015 LEAP # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by @@ -15,85 +15,41 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see . """ -LeapMessage and MessageCollection. +IMAPMessage and IMAPMessageCollection. """ -import copy import logging -import threading -import StringIO - -from collections import defaultdict -from functools import partial - +# import StringIO from twisted.mail import imap4 -from twisted.internet import reactor from zope.interface import implements -from zope.proxy import sameProxiedObjects from leap.common.check import leap_assert, leap_assert_type from leap.common.decorators import memoized_method from leap.common.mail import get_email_charset -from leap.mail.adaptors import soledad_indexes as indexes -from leap.mail.constants import INBOX_NAME -from leap.mail.utils import find_charset, empty -from leap.mail.imap.index import IndexedDB -from leap.mail.imap.fields import fields, WithMsgFields -from leap.mail.imap.messageparts import MessagePart, MessagePartDoc -from leap.mail.imap.parser import MBoxParser - -logger = logging.getLogger(__name__) - -# TODO ------------------------------------------------------------ -# [ ] Add ref to incoming message during add_msg -# [ ] Add linked-from info. -# * Need a new type of documents: linkage info. -# * HDOCS are linked from FDOCs (ref to chash) -# * CDOCS are linked from HDOCS (ref to chash) +from leap.mail.utils import find_charset -# [ ] Delete incoming mail only after successful write! -# [ ] Remove UID from syncable db. Store only those indexes locally. +from leap.mail.imap.messageparts import MessagePart +# from leap.mail.imap.messagepargs import MessagePartDoc +logger = logging.getLogger(__name__) -def try_unique_query(curried): - """ - Try to execute a query that is expected to have a - single outcome, and log a warning if more than one document found. - - :param curried: a curried function - :type curried: callable - """ - # XXX FIXME ---------- convert to deferreds - leap_assert(callable(curried), "A callable is expected") - try: - query = curried() - if query: - if len(query) > 1: - # TODO we could take action, like trigger a background - # process to kill dupes. - name = getattr(curried, 'expected', 'doc') - logger.warning( - "More than one %s found for this mbox, " - "we got a duplicate!!" % (name,)) - return query.pop() - else: - return None - except Exception as exc: - logger.exception("Unhandled error %r" % exc) - +# TODO ------------------------------------------------------------ -# FIXME remove-me -#fdoc_locks = defaultdict(lambda: defaultdict(lambda: threading.Lock())) +# [ ] Add ref to incoming message during add_msg. +# [ ] Delete incoming mail only after successful write. -class IMAPMessage(fields, MBoxParser): +class IMAPMessage(object): """ The main representation of a message. """ implements(imap4.IMessage) - def __init__(self, soledad, uid, mbox): + # TODO ---- see what should we pass here instead + # where's UID added to the message? + # def __init__(self, soledad, uid, mbox): + def __init__(self, message, collection): """ Initializes a LeapMessage. @@ -103,81 +59,14 @@ class IMAPMessage(fields, MBoxParser): :type uid: int or basestring :param mbox: the mbox this message belongs to :type mbox: str or unicode - :param collection: a reference to the parent collection object - :type collection: MessageCollection - :param container: a IMessageContainer implementor instance - :type container: IMessageContainer """ - self._soledad = soledad - self._uid = int(uid) if uid is not None else None - self._mbox = self._parse_mailbox_name(mbox) - - self.__chash = None - self.__bdoc = None - - # TODO collection and container are deprecated. - - # TODO move to adaptor - - #@property - #def fdoc(self): - #""" - #An accessor to the flags document. - #""" - #if all(map(bool, (self._uid, self._mbox))): - #fdoc = None - #if self._container is not None: - #fdoc = self._container.fdoc - #if not fdoc: - #fdoc = self._get_flags_doc() - #if fdoc: - #fdoc_content = fdoc.content - #self.__chash = fdoc_content.get( - #fields.CONTENT_HASH_KEY, None) - #return fdoc -# - #@property - #def hdoc(self): - #""" - #An accessor to the headers document. - #""" - #container = self._container - #if container is not None: - #hdoc = self._container.hdoc - #if hdoc and not empty(hdoc.content): - #return hdoc - #hdoc = self._get_headers_doc() -# - #if container and not empty(hdoc.content): - # mem-cache it - #hdoc_content = hdoc.content - #chash = hdoc_content.get(fields.CONTENT_HASH_KEY) - #hdocs = {chash: hdoc_content} - #container.memstore.load_header_docs(hdocs) - #return hdoc -# - #@property - #def chash(self): - #""" - #An accessor to the content hash for this message. - #""" - #if not self.fdoc: - #return None - #if not self.__chash and self.fdoc: - #self.__chash = self.fdoc.content.get( - #fields.CONTENT_HASH_KEY, None) - #return self.__chash - - #@property - #def bdoc(self): - #""" - #An accessor to the body document. - #""" - #if not self.hdoc: - #return None - #if not self.__bdoc: - #self.__bdoc = self._get_body_doc() - #return self.__bdoc + #self._uid = int(uid) if uid is not None else None + #self._mbox = normalize_mailbox(mbox) + + self.message = message + + # TODO maybe not needed, see setFlags below + self.collection = collection # IMessage implementation @@ -188,12 +77,7 @@ class IMAPMessage(fields, MBoxParser): :return: uid for this message :rtype: int """ - # TODO ----> return lookup in local sqlcipher table. - return self._uid - - # -------------------------------------------------------------- - # TODO -- from here on, all the methods should be proxied to the - # instance of leap.mail.mail.Message + return self.message.get_uid() def getFlags(self): """ @@ -202,24 +86,14 @@ class IMAPMessage(fields, MBoxParser): :return: The flags, represented as strings :rtype: tuple """ - uid = self._uid - - flags = set([]) - fdoc = self.fdoc - if fdoc: - flags = set(fdoc.content.get(self.FLAGS_KEY, None)) + return self.message.get_flags() - msgcol = self._collection + # setFlags not in the interface spec but we use it with store command. - # We treat the recent flag specially: gotten from - # a mailbox-level document. - if msgcol and uid in msgcol.recent_flags: - flags.add(fields.RECENT_FLAG) - if flags: - flags = map(str, flags) - return tuple(flags) + # XXX if we can move it to a collection method, we don't need to pass + # collection to the IMAPMessage - # setFlags not in the interface spec but we use it with store command. + # lookup method? IMAPMailbox? def setFlags(self, flags, mode): """ @@ -231,32 +105,11 @@ class IMAPMessage(fields, MBoxParser): :type mode: int """ leap_assert(isinstance(flags, tuple), "flags need to be a tuple") - mbox, uid = self._mbox, self._uid - - APPEND = 1 - REMOVE = -1 - SET = 0 - - doc = self.fdoc - if not doc: - logger.warning( - "Could not find FDOC for %r:%s while setting flags!" % - (mbox, uid)) - return - current = doc.content[self.FLAGS_KEY] - if mode == APPEND: - newflags = tuple(set(tuple(current) + flags)) - elif mode == REMOVE: - newflags = tuple(set(current).difference(set(flags))) - elif mode == SET: - newflags = flags - new_fdoc = { - self.FLAGS_KEY: newflags, - self.SEEN_KEY: self.SEEN_FLAG in newflags, - self.DEL_KEY: self.DELETED_FLAG in newflags} - self._collection.memstore.update_flags(mbox, uid, new_fdoc) - - return map(str, newflags) + # XXX + # return new flags + # map to str + #self.message.set_flags(flags, mode) + self.collection.update_flags(self.message, flags, mode) def getInternalDate(self): """ @@ -273,8 +126,7 @@ class IMAPMessage(fields, MBoxParser): :return: An RFC822-formatted date string. :rtype: str """ - date = self.hdoc.content.get(fields.DATE_KEY, '') - return date + return self.message.get_internal_date() # # IMessagePart @@ -290,42 +142,40 @@ class IMAPMessage(fields, MBoxParser): :return: file-like object opened for reading :rtype: StringIO """ - def write_fd(body): - fd.write(body) - fd.seek(0) - return fd - + #def write_fd(body): + #fd.write(body) + #fd.seek(0) + #return fd +# # TODO refactor with getBodyFile in MessagePart - - fd = StringIO.StringIO() - - if self.bdoc is not None: - bdoc_content = self.bdoc.content - if empty(bdoc_content): - logger.warning("No BDOC content found for message!!!") - return write_fd("") - - body = bdoc_content.get(self.RAW_KEY, "") - content_type = bdoc_content.get('content-type', "") - charset = find_charset(content_type) - if charset is None: - charset = self._get_charset(body) - try: - if isinstance(body, unicode): - body = body.encode(charset) - except UnicodeError as exc: - logger.error( - "Unicode error, using 'replace'. {0!r}".format(exc)) - logger.debug("Attempted to encode with: %s" % charset) - body = body.encode(charset, 'replace') - finally: - return write_fd(body) - - # We are still returning funky characters from here. - else: - logger.warning("No BDOC found for message.") - return write_fd("") - +# + #fd = StringIO.StringIO() +# + #if self.bdoc is not None: + #bdoc_content = self.bdoc.content + #if empty(bdoc_content): + #logger.warning("No BDOC content found for message!!!") + #return write_fd("") +# + #body = bdoc_content.get(self.RAW_KEY, "") + #content_type = bdoc_content.get('content-type', "") + #charset = find_charset(content_type) + #if charset is None: + #charset = self._get_charset(body) + #try: + #if isinstance(body, unicode): + #body = body.encode(charset) + #except UnicodeError as exc: + #logger.error( + #"Unicode error, using 'replace'. {0!r}".format(exc)) + #logger.debug("Attempted to encode with: %s" % charset) + #body = body.encode(charset, 'replace') + #finally: + #return write_fd(body) + + return self.message.get_body_file() + + # TODO move to mail.mail @memoized_method def _get_charset(self, stuff): """ @@ -337,7 +187,7 @@ class IMAPMessage(fields, MBoxParser): """ # XXX shouldn't we make the scope # of the decorator somewhat more persistent? - # ah! yes! and put memory bounds. + # and put memory bounds. return get_email_charset(stuff) def getSize(self): @@ -347,17 +197,11 @@ class IMAPMessage(fields, MBoxParser): :return: size of the message, in octets :rtype: int """ - size = None - if self.fdoc is not None: - fdoc_content = self.fdoc.content - size = fdoc_content.get(self.SIZE_KEY, False) - else: - logger.warning("No FLAGS doc for %s:%s" % (self._mbox, - self._uid)) - #if not size: - # XXX fallback, should remove when all migrated. - #size = self.getBodyFile().len - return size + #size = None + #fdoc_content = self.fdoc.content + #size = fdoc_content.get(self.SIZE_KEY, False) + #return size + return self.message.get_size() def getHeaders(self, negate, *names): """ @@ -374,10 +218,10 @@ class IMAPMessage(fields, MBoxParser): :return: A mapping of header field names to header field values :rtype: dict """ - # TODO split in smaller methods + # TODO split in smaller methods -- format_headers()? # XXX refactor together with MessagePart method - headers = self._get_headers() + headers = self.message.get_headers() # XXX keep this in the imap imessage implementation, # because the server impl. expects content-type to be present. @@ -417,34 +261,15 @@ class IMAPMessage(fields, MBoxParser): headers2[key] = value return headers2 - def _get_headers(self): - """ - Return the headers dict for this message. - """ - if self.hdoc is not None: - hdoc_content = self.hdoc.content - headers = hdoc_content.get(self.HEADERS_KEY, {}) - return headers - - else: - logger.warning( - "No HEADERS doc for msg %s:%s" % ( - self._mbox, - self._uid)) - def isMultipart(self): """ Return True if this message is multipart. """ - if self.fdoc: - fdoc_content = self.fdoc.content - is_multipart = fdoc_content.get(self.MULTIPART_KEY, False) - return is_multipart - else: - logger.warning( - "No FLAGS doc for msg %s:%s" % ( - self._mbox, - self._uid)) + #fdoc_content = self.fdoc.content + #is_multipart = fdoc_content.get(self.MULTIPART_KEY, False) + #return is_multipart + + return self.message.fdoc.is_multi def getSubPart(self, part): """ @@ -463,12 +288,16 @@ class IMAPMessage(fields, MBoxParser): pmap_dict = self._get_part_from_parts_map(part + 1) except KeyError: raise IndexError + + # TODO move access to adaptor ---- return MessagePart(self._soledad, pmap_dict) # # accessors # + # FIXME + # -- move to wrapper/adaptor def _get_part_from_parts_map(self, part): """ Get a part map from the headers doc @@ -476,100 +305,44 @@ class IMAPMessage(fields, MBoxParser): :raises: KeyError if key does not exist :rtype: dict """ - if not self.hdoc: - logger.warning("Tried to get part but no HDOC found!") - return None - - hdoc_content = self.hdoc.content - pmap = hdoc_content.get(fields.PARTS_MAP_KEY, {}) + raise NotImplementedError() + #hdoc_content = self.hdoc.content + #pmap = hdoc_content.get(fields.PARTS_MAP_KEY, {}) +# # remember, lads, soledad is using strings in its keys, # not integers! - return pmap[str(part)] + #return pmap[str(part)] - # XXX moved to memory store - # move the rest too. ------------------------------------------ - def _get_flags_doc(self): - """ - Return the document that keeps the flags for this - message. - """ - def get_first_if_any(docs): - result = first(docs) - return result if result else {} - - d = self._soledad.get_from_index( - fields.TYPE_MBOX_UID_IDX, - fields.TYPE_FLAGS_VAL, self._mbox, str(self._uid)) - d.addCallback(get_first_if_any) - return d - - # TODO move to soledadstore instead of accessing soledad directly - def _get_headers_doc(self): - """ - Return the document that keeps the headers for this - message. - """ - d = self._soledad.get_from_index( - fields.TYPE_C_HASH_IDX, - fields.TYPE_HEADERS_VAL, str(self.chash)) - d.addCallback(lambda docs: first(docs)) - return d - - # TODO move to soledadstore instead of accessing soledad directly + # TODO move to wrapper/adaptor def _get_body_doc(self): """ Return the document that keeps the body for this message. """ - # XXX FIXME --- this might need a maybedeferred - # on the receiving side... - hdoc_content = self.hdoc.content - body_phash = hdoc_content.get( - fields.BODY_KEY, None) - if not body_phash: - logger.warning("No body phash for this document!") - return None - - # XXX get from memstore too... - # if memstore: memstore.get_phrash - # memstore should keep a dict with weakrefs to the - # phash doc... - - if self._container is not None: - bdoc = self._container.memstore.get_cdoc_from_phash(body_phash) - if not empty(bdoc) and not empty(bdoc.content): - return bdoc - + # FIXME + # -- just get the body and retrieve the cdoc P- + #hdoc_content = self.hdoc.content + #body_phash = hdoc_content.get( + #fields.BODY_KEY, None) + #if not body_phash: + #logger.warning("No body phash for this document!") + #return None +# + #if self._container is not None: + #bdoc = self._container.memstore.get_cdoc_from_phash(body_phash) + #if not empty(bdoc) and not empty(bdoc.content): + #return bdoc +# # no memstore, or no body doc found there - d = self._soledad.get_from_index( - fields.TYPE_P_HASH_IDX, - fields.TYPE_CONTENT_VAL, str(body_phash)) - d.addCallback(lambda docs: first(docs)) - return d - - def __getitem__(self, key): - """ - Return an item from the content of the flags document, - for convenience. - - :param key: The key - :type key: str - - :return: The content value indexed by C{key} or None - :rtype: str - """ - return self.fdoc.content.get(key, None) - - def does_exist(self): - """ - Return True if there is actually a flags document for this - UID and mbox. - """ - return not empty(self.fdoc) + #d = self._soledad.get_from_index( + #fields.TYPE_P_HASH_IDX, + #fields.TYPE_CONTENT_VAL, str(body_phash)) + #d.addCallback(lambda docs: first(docs)) + #return d -class MessageCollection(WithMsgFields, IndexedDB, MBoxParser): +class IMAPMessageCollection(object): """ A collection of messages, surprisingly. @@ -578,9 +351,15 @@ class MessageCollection(WithMsgFields, IndexedDB, MBoxParser): database. """ - # XXX this should be able to produce a MessageSet methinks - # could validate these kinds of objects turning them - # into a template for the class. + messageklass = IMAPMessage + + # TODO + # [ ] Add RECENT flags docs to mailbox-doc attributes (list-of-uids) + # [ ] move Query for all the headers documents to Collection + + # TODO this should be able to produce a MessageSet methinks + # TODO --- reimplement, review and prune documentation below. + FLAGS_DOC = "FLAGS" HEADERS_DOC = "HEADERS" CONTENT_DOC = "CONTENT" @@ -604,145 +383,40 @@ class MessageCollection(WithMsgFields, IndexedDB, MBoxParser): """ HDOCS_SET_DOC = "HDOCS_SET" - templates = { - - # Mailbox Level - - RECENT_DOC: { - "type": indexes.RECENT, - "mbox": INBOX_NAME, - fields.RECENTFLAGS_KEY: [], - }, - - HDOCS_SET_DOC: { - "type": indexes.HDOCS_SET, - "mbox": INBOX_NAME, - fields.HDOCS_SET_KEY: [], - } - - - } - - # Different locks for wrapping both the u1db document getting/setting - # and the property getting/settting in an atomic operation. - - # TODO --- deprecate ! --- use SoledadDocumentWrapper + locks - _rdoc_lock = defaultdict(lambda: threading.Lock()) - _rdoc_write_lock = defaultdict(lambda: threading.Lock()) - _rdoc_read_lock = defaultdict(lambda: threading.Lock()) - _rdoc_property_lock = defaultdict(lambda: threading.Lock()) - - _initialized = {} - - def __init__(self, mbox=None, soledad=None, memstore=None): - """ - Constructor for MessageCollection. - - On initialization, we ensure that we have a document for - storing the recent flags. The nature of this flag make us wanting - to store the set of the UIDs with this flag at the level of the - MessageCollection for each mailbox, instead of treating them - as a property of each message. - - We are passed an instance of MemoryStore, the same for the - SoledadBackedAccount, that we use as a read cache and a buffer - for writes. - - :param mbox: the name of the mailbox. It is the name - with which we filter the query over the - messages database. - :type mbox: str - :param soledad: Soledad database - :type soledad: Soledad instance - :param memstore: a MemoryStore instance - :type memstore: MemoryStore + def __init__(self, collection): """ - leap_assert(mbox, "Need a mailbox name to initialize") - leap_assert(mbox.strip() != "", "mbox cannot be blank space") - leap_assert(isinstance(mbox, (str, unicode)), - "mbox needs to be a string") - leap_assert(soledad, "Need a soledad instance to initialize") + Constructor for IMAPMessageCollection. - # okay, all in order, keep going... - - self.mbox = self._parse_mailbox_name(mbox) - - # XXX get a SoledadStore passed instead - self._soledad = soledad - self.memstore = memstore - - self.__rflags = None - - if not self._initialized.get(mbox, False): - try: - self.initialize_db() - # ensure that we have a recent-flags doc - self._get_or_create_rdoc() - except Exception: - logger.debug("Error initializing %r" % (mbox,)) - else: - self._initialized[mbox] = True - - def _get_empty_doc(self, _type=FLAGS_DOC): - """ - Returns an empty doc for storing different message parts. - Defaults to returning a template for a flags document. - :return: a dict with the template - :rtype: dict - """ - if _type not in self.templates.keys(): - raise TypeError("Improper type passed to _get_empty_doc") - return copy.deepcopy(self.templates[_type]) - - def _get_or_create_rdoc(self): - """ - Try to retrieve the recent-flags doc for this MessageCollection, - and create one if not found. + :param collection: an instance of a MessageCollection + :type collection: MessageCollection """ - # XXX should move this to memstore too - with self._rdoc_write_lock[self.mbox]: - rdoc = self._get_recent_doc_from_soledad() - if rdoc is None: - rdoc = self._get_empty_doc(self.RECENT_DOC) - if self.mbox != fields.INBOX_VAL: - rdoc[fields.MBOX_KEY] = self.mbox - self._soledad.create_doc(rdoc) - - # -------------------------------------------------------------------- + leap_assert( + collection.is_mailbox_collection(), + "Need a mailbox name to initialize") + mbox_name = collection.mbox_name + leap_assert(mbox_name.strip() != "", "mbox cannot be blank space") + leap_assert(isinstance(mbox_name, (str, unicode)), + "mbox needs to be a string") + self.collection = collection - # ----------------------------------------------------------------------- + # XXX this has to be done in IMAPAccount + # (Where the collection must be instantiated and passed to us) + # self.mbox = normalize_mailbox(mbox) - def _fdoc_already_exists(self, chash): + @property + def mbox_name(self): """ - Check whether we can find a flags doc for this mailbox with the - given content-hash. It enforces that we can only have the same maessage - listed once for a a given mailbox. - - :param chash: the content-hash to check about. - :type chash: basestring - :return: False, if it does not exist, or UID. + Return the string that identifies this mailbox. """ - exist = False - exist = self.memstore.get_fdoc_from_chash(chash, self.mbox) + return self.collection.mbox_name - if not exist: - exist = self._get_fdoc_from_chash(chash) - if exist and exist.content is not None: - return exist.content.get(fields.UID_KEY, "unknown-uid") - else: - return False - - def add_msg(self, raw, subject=None, flags=None, date=None, - notify_on_disk=False): + def add_msg(self, raw, flags=None, date=None): """ Creates a new message document. :param raw: the raw message :type raw: str - :param subject: subject of the message. - :type subject: str - :param flags: flags :type flags: list @@ -756,212 +430,30 @@ class MessageCollection(WithMsgFields, IndexedDB, MBoxParser): if flags is None: flags = tuple() leap_assert_type(flags, tuple) + return self.collection.add_msg(raw, flags, date) - # TODO ---- proxy to MessageCollection addMessage - - #observer = defer.Deferred() - #d = self._do_parse(raw) - #d.addCallback(lambda result: reactor.callInThread( - #self._do_add_msg, result, flags, subject, date, - #notify_on_disk, observer)) - #return observer - - # TODO --------------------------------------------------- - # move this to leap.mail.adaptors.soledad - - def _do_add_msg(self, parse_result, flags, subject, - date, notify_on_disk, observer): - """ - """ - msg, parts, chash, size, multi = parse_result - - # XXX move to SoledadAdaptor write operation ... ??? - # check for uniqueness -------------------------------- - # Watch out! We're reserving a UID right after this! - existing_uid = self._fdoc_already_exists(chash) - if existing_uid: - msg = self.get_msg_by_uid(existing_uid) - reactor.callFromThread(observer.callback, existing_uid) - msg.setFlags((fields.DELETED_FLAG,), -1) - return - - # TODO move UID autoincrement to MessageCollection.addMessage(mailbox) - # TODO S2 -- get FUCKING UID from autoincremental table - #uid = self.memstore.increment_last_soledad_uid(self.mbox) - #self.set_recent_flag(uid) - - - # ------------------------------------------------------------ - - # - # getters: specific queries - # - - # recent flags - - def _get_recent_flags(self): - """ - An accessor for the recent-flags set for this mailbox. + def get_msg_by_uid(self, uid, absolute=True): """ - # XXX check if we should remove this - if self.__rflags is not None: - return self.__rflags - - if self.memstore is not None: - with self._rdoc_lock[self.mbox]: - rflags = self.memstore.get_recent_flags(self.mbox) - if not rflags: - # not loaded in the memory store yet. - # let's fetch them from soledad... - rdoc = self._get_recent_doc_from_soledad() - if rdoc is None: - return set([]) - rflags = set(rdoc.content.get( - fields.RECENTFLAGS_KEY, [])) - # ...and cache them now. - self.memstore.load_recent_flags( - self.mbox, - {'doc_id': rdoc.doc_id, 'set': rflags}) - return rflags - - def _set_recent_flags(self, value): - """ - Setter for the recent-flags set for this mailbox. - """ - if self.memstore is not None: - self.memstore.set_recent_flags(self.mbox, value) - - recent_flags = property( - _get_recent_flags, _set_recent_flags, - doc="Set of UIDs with the recent flag for this mailbox.") - - def _get_recent_doc_from_soledad(self): - """ - Get recent-flags document from Soledad for this mailbox. - :rtype: SoledadDocument or None - """ - # FIXME ----- use deferreds. - curried = partial( - self._soledad.get_from_index, - fields.TYPE_MBOX_IDX, - fields.TYPE_RECENT_VAL, self.mbox) - curried.expected = "rdoc" - with self._rdoc_read_lock[self.mbox]: - return try_unique_query(curried) - - # Property-set modification (protected by a different - # lock to give atomicity to the read/write operation) - - def unset_recent_flags(self, uids): - """ - Unset Recent flag for a sequence of uids. - - :param uids: the uids to unset - :type uid: sequence - """ - # FIXME ----- use deferreds. - with self._rdoc_property_lock[self.mbox]: - self.recent_flags.difference_update( - set(uids)) - - # Individual flags operations - - def unset_recent_flag(self, uid): - """ - Unset Recent flag for a given uid. - - :param uid: the uid to unset - :type uid: int - """ - # FIXME ----- use deferreds. - with self._rdoc_property_lock[self.mbox]: - self.recent_flags.difference_update( - set([uid])) - - def set_recent_flag(self, uid): - """ - Set Recent flag for a given uid. + Retrieves a IMAPMessage by UID. + This is used primarity in the Mailbox fetch and store methods. - :param uid: the uid to set + :param uid: the message uid to query by :type uid: int - """ - # FIXME ----- use deferreds. - with self._rdoc_property_lock[self.mbox]: - self.recent_flags = self.recent_flags.union( - set([uid])) - - # individual doc getters, message layer. - def _get_fdoc_from_chash(self, chash): + :rtype: IMAPMessage """ - Return a flags document for this mailbox with a given chash. + def make_imap_msg(msg): + kls = self.messageklass + # TODO --- remove ref to collection + return kls(msg, self.collection) - :return: A SoledadDocument containing the Flags Document, or None if - the query failed. - :rtype: SoledadDocument or None. - """ - # USED from: - # [ ] duplicated fdoc detection - # [ ] _get_uid_from_msgidCb - - # FIXME ----- use deferreds. - curried = partial( - self._soledad.get_from_index, - fields.TYPE_MBOX_C_HASH_IDX, - fields.TYPE_FLAGS_VAL, self.mbox, chash) - curried.expected = "fdoc" - fdoc = try_unique_query(curried) - if fdoc is not None: - return fdoc - else: - # probably this should be the other way round, - # ie, try fist on memstore... - cf = self.memstore._chash_fdoc_store - fdoc = cf[chash][self.mbox] - # hey, I just needed to wrap fdoc thing into - # a "content" attribute, look a better way... - if not empty(fdoc): - return MessagePartDoc( - new=None, dirty=None, part=None, - store=None, doc_id=None, - content=fdoc) - - def _get_uid_from_msgidCb(self, msgid): - hdoc = None - curried = partial( - self._soledad.get_from_index, - fields.TYPE_MSGID_IDX, - fields.TYPE_HEADERS_VAL, msgid) - curried.expected = "hdoc" - hdoc = try_unique_query(curried) - - # XXX this is only a quick hack to avoid regression - # on the "multiple copies of the draft" issue, but - # this is currently broken since it's not efficient to - # look for this. Should lookup better. - # FIXME! - - if hdoc is not None: - hdoc_dict = hdoc.content + d = self.collection.get_msg_by_uid(uid, absolute=absolute) + d.addCalback(make_imap_msg) + return d - else: - hdocstore = self.memstore._hdoc_store - match = [x for _, x in hdocstore.items() if x['msgid'] == msgid] - hdoc_dict = first(match) - - if hdoc_dict is None: - logger.warning("Could not find hdoc for msgid %s" - % (msgid,)) - return None - msg_chash = hdoc_dict.get(fields.CONTENT_HASH_KEY) - - fdoc = self._get_fdoc_from_chash(msg_chash) - if not fdoc: - logger.warning("Could not find fdoc for msgid %s" - % (msgid,)) - return None - return fdoc.content.get(fields.UID_KEY, None) + # TODO -- move this to collection too + # Used for the Search (Drafts) queries? def _get_uid_from_msgid(self, msgid): """ Return a UID for a given message-id. @@ -972,15 +464,10 @@ class MessageCollection(WithMsgFields, IndexedDB, MBoxParser): :return: A UID, or None """ - # We need to wait a little bit, cause in some of the cases - # the query is received right after we've saved the document, - # and we cannot find it otherwise. This seems to be enough. - - # XXX do a deferLater instead ?? - # XXX is this working? return self._get_uid_from_msgidCb(msgid) - def set_flags(self, mbox, messages, flags, mode, observer): + # TODO handle deferreds + def set_flags(self, messages, flags, mode): """ Set flags for a sequence of messages. @@ -1000,142 +487,27 @@ class MessageCollection(WithMsgFields, IndexedDB, MBoxParser): getmsg = self.get_msg_by_uid def set_flags(uid, flags, mode): - msg = getmsg(uid, mem_only=True, flags_only=True) + msg = getmsg(uid) if msg is not None: + # XXX IMAPMessage needs access to the collection + # to be able to set flags. Better if we make use + # of collection... here. return uid, msg.setFlags(flags, mode) setted_flags = [set_flags(uid, flags, mode) for uid in messages] result = dict(filter(None, setted_flags)) + # XXX return gatherResults or something + return result - # TODO -- remove - reactor.callFromThread(observer.callback, result) - - # getters: generic for a mailbox - - def get_msg_by_uid(self, uid, mem_only=False, flags_only=False): - """ - Retrieves a LeapMessage by UID. - This is used primarity in the Mailbox fetch and store methods. - - :param uid: the message uid to query by - :type uid: int - :param mem_only: a flag that indicates whether this Message should - pass a reference to soledad to retrieve missing pieces - or not. - :type mem_only: bool - :param flags_only: whether the message should carry only a reference - to the flags document. - :type flags_only: bool - - :return: A LeapMessage instance matching the query, - or None if not found. - :rtype: LeapMessage - """ - msg_container = self.memstore.get_message( - self.mbox, uid, flags_only=flags_only) - - if msg_container is not None: - if mem_only: - msg = IMAPMessage(None, uid, self.mbox, collection=self, - container=msg_container) - else: - # We pass a reference to soledad just to be able to retrieve - # missing parts that cannot be found in the container, like - # the content docs after a copy. - msg = IMAPMessage(self._soledad, uid, self.mbox, - collection=self, container=msg_container) - else: - msg = IMAPMessage(self._soledad, uid, self.mbox, collection=self) - - if not msg.does_exist(): - return None - return msg - - # FIXME --- used where ? --------------------------------------------- - #def get_all_docs(self, _type=fields.TYPE_FLAGS_VAL): - #""" - #Get all documents for the selected mailbox of the - #passed type. By default, it returns the flag docs. -# - #If you want acess to the content, use __iter__ instead -# - #:return: a Deferred, that will fire with a list of u1db documents - #:rtype: Deferred (promise of list of SoledadDocument) - #""" - #if _type not in fields.__dict__.values(): - #raise TypeError("Wrong type passed to get_all_docs") -# - # FIXME ----- either raise or return a deferred wrapper. - #if sameProxiedObjects(self._soledad, None): - #logger.warning('Tried to get messages but soledad is None!') - #return [] -# - #def get_sorted_docs(docs): - #all_docs = [doc for doc in docs] - # inneficient, but first let's grok it and then - # let's worry about efficiency. - # XXX FIXINDEX -- should implement order by in soledad - # FIXME ---------------------------------------------- - #return sorted(all_docs, key=lambda item: item.content['uid']) -# - #d = self._soledad.get_from_index( - #fields.TYPE_MBOX_IDX, _type, self.mbox) - #d.addCallback(get_sorted_docs) - #return d - - def all_soledad_uid_iter(self): - """ - Return an iterator through the UIDs of all messages, sorted in - ascending order. - """ - # XXX FIXME ------ sorted??? - - def get_uids(docs): - return set([ - doc.content[self.UID_KEY] for doc in docs if not empty(doc)]) - - d = self._soledad.get_from_index( - fields.TYPE_MBOX_IDX, fields.TYPE_FLAGS_VAL, self.mbox) - d.addCallback(get_uids) - return d - - def all_uid_iter(self): - """ - Return an iterator through the UIDs of all messages, from memory. + def count(self): """ - mem_uids = self.memstore.get_uids(self.mbox) - soledad_known_uids = self.memstore.get_soledad_known_uids( - self.mbox) - combined = tuple(set(mem_uids).union(soledad_known_uids)) - return combined + Return the count of messages for this mailbox. - def get_all_soledad_flag_docs(self): + :rtype: int """ - Return a dict with the content of all the flag documents - in soledad store for the given mbox. + return self.collection.count() - :param mbox: the mailbox - :type mbox: str or unicode - :rtype: dict - """ - # XXX we really could return a reduced version with - # just {'uid': (flags-tuple,) since the prefetch is - # only oriented to get the flag tuples. - - def get_content(docs): - all_docs = [( - doc.content[self.UID_KEY], - dict(doc.content)) - for doc in docs - if not empty(doc.content)] - all_flags = dict(all_docs) - return all_flags - - d = self._soledad.get_from_index( - fields.TYPE_MBOX_IDX, - fields.TYPE_FLAGS_VAL, self.mbox) - d.addCallback(get_content) - return d + # headers query def all_headers(self): """ @@ -1144,15 +516,9 @@ class MessageCollection(WithMsgFields, IndexedDB, MBoxParser): :rtype: dict """ - return self.memstore.all_headers(self.mbox) - - def count(self): - """ - Return the count of messages for this mailbox. - - :rtype: int - """ - return self.memstore.count(self.mbox) + # Use self.collection.mbox_indexer + # and derive all the doc_ids for the hdocs + raise NotImplementedError() # unseen messages @@ -1164,7 +530,7 @@ class MessageCollection(WithMsgFields, IndexedDB, MBoxParser): :return: iterator through unseen message doc UIDs :rtype: iterable """ - return self.memstore.unseen_iter(self.mbox) + raise NotImplementedError() def count_unseen(self): """ @@ -1182,13 +548,12 @@ class MessageCollection(WithMsgFields, IndexedDB, MBoxParser): :returns: a list of LeapMessages :rtype: list """ - return [IMAPMessage(self._soledad, docid, self.mbox, collection=self) - for docid in self.unseen_iter()] + raise NotImplementedError() + #return [self.messageklass(self._soledad, doc_id, self.mbox) + #for doc_id in self.unseen_iter()] # recent messages - # XXX take it from memstore - # XXX Used somewhere? def count_recent(self): """ Count all messages with the `Recent` flag. @@ -1199,32 +564,22 @@ class MessageCollection(WithMsgFields, IndexedDB, MBoxParser): :returns: count :rtype: int """ - return len(self.recent_flags) + raise NotImplementedError() + + # magic def __len__(self): """ Returns the number of messages on this mailbox. - :rtype: int """ return self.count() - def __iter__(self): - """ - Returns an iterator over all messages. - - :returns: iterator of dicts with content for all messages. - :rtype: iterable - """ - return (IMAPMessage(self._soledad, docuid, self.mbox, collection=self) - for docuid in self.all_uid_iter()) - def __repr__(self): """ Representation string for this object. """ - return u"" % ( - self.mbox, self.count()) + return u"" % ( + self.mbox_name, self.count()) - # XXX should implement __eq__ also !!! - # use chash... + # TODO implement __iter__ ? diff --git a/src/leap/mail/imap/soledadstore.py b/src/leap/mail/imap/soledadstore.py deleted file mode 100644 index fc8ea55..0000000 --- a/src/leap/mail/imap/soledadstore.py +++ /dev/null @@ -1,617 +0,0 @@ -# -*- coding: utf-8 -*- -# soledadstore.py -# Copyright (C) 2014 LEAP -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see . -""" -A MessageStore that writes to Soledad. -""" -import logging -import threading - -from collections import defaultdict -from itertools import chain - -from u1db import errors as u1db_errors -from twisted.python import log -from zope.interface import implements - -from leap.common.check import leap_assert_type, leap_assert -from leap.mail.decorators import deferred_to_thread -from leap.mail.imap.messageparts import MessagePartType -from leap.mail.imap.messageparts import MessageWrapper -from leap.mail.imap.messageparts import RecentFlagsDoc -from leap.mail.imap.fields import fields -from leap.mail.imap.interfaces import IMessageStore -from leap.mail.messageflow import IMessageConsumer -from leap.mail.utils import first, empty, accumulator_queue - -logger = logging.getLogger(__name__) - - -class ContentDedup(object): - """ - Message deduplication. - - We do a query for the content hashes before writing to our beloved - sqlcipher backend of Soledad. This means, by now, that: - - 1. We will not store the same body/attachment twice, only the hash of it. - 2. We will not store the same message header twice, only the hash of it. - - The first case is useful if you are always receiving the same old memes - from unwary friends that still have not discovered that 4chan is the - generator of the internet. The second will save your day if you have - initiated session with the same account in two different machines. I also - wonder why would you do that, but let's respect each other choices, like - with the religious celebrations, and assume that one day we'll be able - to run Bitmask in completely free phones. Yes, I mean that, the whole GSM - Stack. - """ - # TODO refactor using unique_query - - def _header_does_exist(self, doc): - """ - Check whether we already have a header document for this - content hash in our database. - - :param doc: tentative header for document - :type doc: dict - :returns: True if it exists, False otherwise. - """ - if not doc: - return False - chash = doc[fields.CONTENT_HASH_KEY] - header_docs = self._soledad.get_from_index( - fields.TYPE_C_HASH_IDX, - fields.TYPE_HEADERS_VAL, str(chash)) - if not header_docs: - return False - - # FIXME enable only to debug this problem. - #if len(header_docs) != 1: - #logger.warning("Found more than one copy of chash %s!" - #% (chash,)) - - #logger.debug("Found header doc with that hash! Skipping save!") - return True - - def _content_does_exist(self, doc): - """ - Check whether we already have a content document for a payload - with this hash in our database. - - :param doc: tentative content for document - :type doc: dict - :returns: True if it exists, False otherwise. - """ - if not doc: - return False - phash = doc[fields.PAYLOAD_HASH_KEY] - attach_docs = self._soledad.get_from_index( - fields.TYPE_P_HASH_IDX, - fields.TYPE_CONTENT_VAL, str(phash)) - if not attach_docs: - return False - - # FIXME enable only to debug this problem - #if len(attach_docs) != 1: - #logger.warning("Found more than one copy of phash %s!" - #% (phash,)) - #logger.debug("Found attachment doc with that hash! Skipping save!") - return True - - -class MsgWriteError(Exception): - """ - Raised if any exception is found while saving message parts. - """ - pass - - -""" -A lock per document. -""" -# TODO should bound the space of this!!! -# http://stackoverflow.com/a/2437645/1157664 -# Setting this to twice the number of threads in the threadpool -# should be safe. - -put_locks = defaultdict(lambda: threading.Lock()) -mbox_doc_locks = defaultdict(lambda: threading.Lock()) - - -class SoledadStore(ContentDedup): - """ - This will create docs in the local Soledad database. - """ - _remove_lock = threading.Lock() - - implements(IMessageConsumer, IMessageStore) - - def __init__(self, soledad): - """ - Initialize the permanent store that writes to Soledad database. - - :param soledad: the soledad instance - :type soledad: Soledad - """ - from twisted.internet import reactor - self.reactor = reactor - - self._soledad = soledad - - self._CREATE_DOC_FUN = self._soledad.create_doc - self._PUT_DOC_FUN = self._soledad.put_doc - self._GET_DOC_FUN = self._soledad.get_doc - - # we instantiate an accumulator to batch the notifications - self.docs_notify_queue = accumulator_queue( - lambda item: reactor.callFromThread(self._unset_new_dirty, item), - 20) - - # IMessageStore - - # ------------------------------------------------------------------- - # We are not yet using this interface, but it would make sense - # to implement it. - - def create_message(self, mbox, uid, message): - """ - Create the passed message into this SoledadStore. - - :param mbox: the mbox this message belongs. - :type mbox: str or unicode - :param uid: the UID that identifies this message in this mailbox. - :type uid: int - :param message: a IMessageContainer implementor. - """ - raise NotImplementedError() - - def put_message(self, mbox, uid, message): - """ - Put the passed existing message into this SoledadStore. - - :param mbox: the mbox this message belongs. - :type mbox: str or unicode - :param uid: the UID that identifies this message in this mailbox. - :type uid: int - :param message: a IMessageContainer implementor. - """ - raise NotImplementedError() - - def remove_message(self, mbox, uid): - """ - Remove the given message from this SoledadStore. - - :param mbox: the mbox this message belongs. - :type mbox: str or unicode - :param uid: the UID that identifies this message in this mailbox. - :type uid: int - """ - raise NotImplementedError() - - def get_message(self, mbox, uid): - """ - Get a IMessageContainer for the given mbox and uid combination. - - :param mbox: the mbox this message belongs. - :type mbox: str or unicode - :param uid: the UID that identifies this message in this mailbox. - :type uid: int - """ - raise NotImplementedError() - - # IMessageConsumer - - # TODO should handle the delete case - # TODO should handle errors better - # TODO could generalize this method into a generic consumer - # and only implement `process` here - - def consume(self, queue): - """ - Creates a new document in soledad db. - - :param queue: a tuple of queues to get item from, with content of the - document to be inserted. - :type queue: tuple of Queues - """ - new, dirty = queue - while not new.empty(): - doc_wrapper = new.get() - self.reactor.callInThread(self._consume_doc, doc_wrapper, - self.docs_notify_queue) - while not dirty.empty(): - doc_wrapper = dirty.get() - self.reactor.callInThread(self._consume_doc, doc_wrapper, - self.docs_notify_queue) - - # Queue empty, flush the notifications queue. - self.docs_notify_queue(None, flush=True) - - def _unset_new_dirty(self, doc_wrapper): - """ - Unset the `new` and `dirty` flags for this document wrapper in the - memory store. - - :param doc_wrapper: a MessageWrapper instance - :type doc_wrapper: MessageWrapper - """ - if isinstance(doc_wrapper, MessageWrapper): - # XXX still needed for debug quite often - #logger.info("unsetting new flag!") - doc_wrapper.new = False - doc_wrapper.dirty = False - - @deferred_to_thread - def _consume_doc(self, doc_wrapper, notify_queue): - """ - Consume each document wrapper in a separate thread. - We pass an instance of an accumulator that handles the notifications - to the memorystore when the write has been done. - - :param doc_wrapper: a MessageWrapper or RecentFlagsDoc instance - :type doc_wrapper: MessageWrapper or RecentFlagsDoc - :param notify_queue: a callable that handles the writeback - notifications to the memstore. - :type notify_queue: callable - """ - def queueNotifyBack(failed, doc_wrapper): - if failed: - log.msg("There was an error writing the mesage...") - else: - notify_queue(doc_wrapper) - - def doSoledadCalls(items): - # we prime the generator, that should return the - # message or flags wrapper item in the first place. - try: - doc_wrapper = items.next() - except StopIteration: - pass - else: - failed = self._soledad_write_document_parts(items) - queueNotifyBack(failed, doc_wrapper) - - doSoledadCalls(self._iter_wrapper_subparts(doc_wrapper)) - - # - # SoledadStore specific methods. - # - - def _soledad_write_document_parts(self, items): - """ - Write the document parts to soledad in a separate thread. - - :param items: the iterator through the different document wrappers - payloads. - :type items: iterator - :return: whether the write was successful or not - :rtype: bool - """ - failed = False - for item, call in items: - if empty(item): - continue - try: - self._try_call(call, item) - except Exception as exc: - logger.debug("ITEM WAS: %s" % repr(item)) - if hasattr(item, 'content'): - logger.debug("ITEM CONTENT WAS: %s" % - repr(item.content)) - logger.exception(exc) - failed = True - continue - return failed - - def _iter_wrapper_subparts(self, doc_wrapper): - """ - Return an iterator that will yield the doc_wrapper in the first place, - followed by the subparts item and the proper call type for every - item in the queue, if any. - - :param doc_wrapper: a MessageWrapper or RecentFlagsDoc instance - :type doc_wrapper: MessageWrapper or RecentFlagsDoc - """ - if isinstance(doc_wrapper, MessageWrapper): - return chain((doc_wrapper,), - self._get_calls_for_msg_parts(doc_wrapper)) - elif isinstance(doc_wrapper, RecentFlagsDoc): - return chain((doc_wrapper,), - self._get_calls_for_rflags_doc(doc_wrapper)) - else: - logger.warning("CANNOT PROCESS ITEM!") - return (i for i in []) - - def _try_call(self, call, item): - """ - Try to invoke a given call with item as a parameter. - - :param call: the function to call - :type call: callable - :param item: the payload to pass to the call as argument - :type item: object - """ - if call is None: - return - - if call == self._PUT_DOC_FUN: - doc_id = item.doc_id - if doc_id is None: - logger.warning("BUG! Dirty doc but has no doc_id!") - return - with put_locks[doc_id]: - doc = self._GET_DOC_FUN(doc_id) - - if doc is None: - logger.warning("BUG! Dirty doc but could not " - "find document %s" % (doc_id,)) - return - - doc.content = dict(item.content) - - item = doc - try: - call(item) - except u1db_errors.RevisionConflict as exc: - logger.exception("Error: %r" % (exc,)) - raise exc - except Exception as exc: - logger.exception("Error: %r" % (exc,)) - raise exc - - else: - try: - call(item) - except u1db_errors.RevisionConflict as exc: - logger.exception("Error: %r" % (exc,)) - raise exc - except Exception as exc: - logger.exception("Error: %r" % (exc,)) - raise exc - - def _get_calls_for_msg_parts(self, msg_wrapper): - """ - Generator that return the proper call type for a given item. - - :param msg_wrapper: A MessageWrapper - :type msg_wrapper: IMessageContainer - :return: a generator of tuples with recent-flags doc payload - and callable - :rtype: generator - """ - call = None - - if msg_wrapper.new: - call = self._CREATE_DOC_FUN - - # item is expected to be a MessagePartDoc - for item in msg_wrapper.walk(): - if item.part == MessagePartType.fdoc: - yield dict(item.content), call - - elif item.part == MessagePartType.hdoc: - if not self._header_does_exist(item.content): - yield dict(item.content), call - - elif item.part == MessagePartType.cdoc: - if not self._content_does_exist(item.content): - yield dict(item.content), call - - # For now, the only thing that will be dirty is - # the flags doc. - - elif msg_wrapper.dirty: - call = self._PUT_DOC_FUN - # item is expected to be a MessagePartDoc - for item in msg_wrapper.walk(): - # XXX FIXME Give error if dirty and not doc_id !!! - doc_id = item.doc_id # defend! - if not doc_id: - logger.warning("Dirty item but no doc_id!") - continue - - if item.part == MessagePartType.fdoc: - yield item, call - - # XXX also for linkage-doc !!! - else: - logger.error("Cannot delete documents yet from the queue...!") - - def _get_calls_for_rflags_doc(self, rflags_wrapper): - """ - We always put these documents. - - :param rflags_wrapper: A wrapper around recent flags doc. - :type rflags_wrapper: RecentFlagsWrapper - :return: a tuple with recent-flags doc payload and callable - :rtype: tuple - """ - call = self._PUT_DOC_FUN - - payload = rflags_wrapper.content - if payload: - logger.debug("Saving RFLAGS to Soledad...") - yield rflags_wrapper, call - - # Mbox documents and attributes - - def get_mbox_document(self, mbox): - """ - Return mailbox document. - - :param mbox: the mailbox - :type mbox: str or unicode - :return: A SoledadDocument containing this mailbox, or None if - the query failed. - :rtype: SoledadDocument or None. - """ - with mbox_doc_locks[mbox]: - return self._get_mbox_document(mbox) - - def _get_mbox_document(self, mbox): - """ - Helper for returning the mailbox document. - """ - try: - query = self._soledad.get_from_index( - fields.TYPE_MBOX_IDX, - fields.TYPE_MBOX_VAL, mbox) - if query: - return query.pop() - else: - logger.error("Could not find mbox document for %r" % - (mbox,)) - except Exception as exc: - logger.exception("Unhandled error %r" % exc) - - def get_mbox_closed(self, mbox): - """ - Return the closed attribute for a given mailbox. - - :param mbox: the mailbox - :type mbox: str or unicode - :rtype: bool - """ - mbox_doc = self.get_mbox_document() - return mbox_doc.content.get(fields.CLOSED_KEY, False) - - def set_mbox_closed(self, mbox, closed): - """ - Set the closed attribute for a given mailbox. - - :param mbox: the mailbox - :type mbox: str or unicode - :param closed: the value to be set - :type closed: bool - """ - leap_assert(isinstance(closed, bool), "closed needs to be boolean") - with mbox_doc_locks[mbox]: - mbox_doc = self._get_mbox_document(mbox) - if mbox_doc is None: - logger.error( - "Could not find mbox document for %r" % (mbox,)) - return - mbox_doc.content[fields.CLOSED_KEY] = closed - self._soledad.put_doc(mbox_doc) - - def write_last_uid(self, mbox, value): - """ - Write the `last_uid` integer to the proper mailbox document - in Soledad. - This is called from the deferred triggered by - memorystore.increment_last_soledad_uid, which is expected to - run in a separate thread. - - :param mbox: the mailbox - :type mbox: str or unicode - :param value: the value to set - :type value: int - """ - leap_assert_type(value, int) - key = fields.LAST_UID_KEY - - # XXX use accumulator to reduce number of hits - with mbox_doc_locks[mbox]: - mbox_doc = self._get_mbox_document(mbox) - old_val = mbox_doc.content[key] - if value > old_val: - mbox_doc.content[key] = value - try: - self._soledad.put_doc(mbox_doc) - except Exception as exc: - logger.error("Error while setting last_uid for %r" - % (mbox,)) - logger.exception(exc) - - def get_flags_doc(self, mbox, uid): - """ - Return the SoledadDocument for the given mbox and uid. - - :param mbox: the mailbox - :type mbox: str or unicode - :param uid: the UID for the message - :type uid: int - :rtype: SoledadDocument or None - """ - # TODO -- inlineCallbacks - result = None - try: - # TODO -- yield - flag_docs = self._soledad.get_from_index( - fields.TYPE_MBOX_UID_IDX, - fields.TYPE_FLAGS_VAL, mbox, str(uid)) - if len(flag_docs) != 1: - logger.warning("More than one flag doc for %r:%s" % - (mbox, uid)) - result = first(flag_docs) - except Exception as exc: - # ugh! Something's broken down there! - logger.warning("ERROR while getting flags for UID: %s" % uid) - logger.exception(exc) - finally: - return result - - def get_headers_doc(self, chash): - """ - Return the document that keeps the headers for a message - indexed by its content-hash. - - :param chash: the content-hash to retrieve the document from. - :type chash: str or unicode - :rtype: SoledadDocument or None - """ - head_docs = self._soledad.get_from_index( - fields.TYPE_C_HASH_IDX, - fields.TYPE_HEADERS_VAL, str(chash)) - return first(head_docs) - - # deleted messages - - def deleted_iter(self, mbox): - """ - Get an iterator for the the doc_id for SoledadDocuments for messages - with \\Deleted flag for a given mailbox. - - :param mbox: the mailbox - :type mbox: str or unicode - :return: iterator through deleted message docs - :rtype: iterable - """ - return [doc.doc_id for doc in self._soledad.get_from_index( - fields.TYPE_MBOX_DEL_IDX, - fields.TYPE_FLAGS_VAL, mbox, '1')] - - def remove_all_deleted(self, mbox): - """ - Remove from Soledad all messages flagged as deleted for a given - mailbox. - - :param mbox: the mailbox - :type mbox: str or unicode - """ - deleted = [] - for doc_id in self.deleted_iter(mbox): - with self._remove_lock: - doc = self._soledad.get_doc(doc_id) - if doc is not None: - self._soledad.delete_doc(doc) - try: - deleted.append(doc.content[fields.UID_KEY]) - except TypeError: - # empty content - pass - return deleted diff --git a/src/leap/mail/mail.py b/src/leap/mail/mail.py index ca07f67..482b64d 100644 --- a/src/leap/mail/mail.py +++ b/src/leap/mail/mail.py @@ -20,6 +20,7 @@ Generic Access to Mail objects: Public LEAP Mail API. from twisted.internet import defer from leap.mail.constants import INBOX_NAME +from leap.mail.constants import MessageFlags from leap.mail.mailbox_indexer import MailboxIndexer from leap.mail.adaptors.soledad import SoledadMailAdaptor @@ -61,6 +62,18 @@ class Message(object): def get_internal_date(self): """ + Retrieve the date internally associated with this message + + According to the spec, this is NOT the date and time in the + RFC-822 header, but rather a date and time that reflects when the + message was received. + + * In SMTP, date and time of final delivery. + * In COPY, internal date/time of the source message. + * In APPEND, date/time specified. + + :return: An RFC822-formatted date string. + :rtype: str """ return self._wrapper.fdoc.date @@ -99,6 +112,15 @@ class Message(object): return tuple(self._wrapper.fdoc.tags) +class Flagsmode(object): + """ + Modes for setting the flags/tags. + """ + APPEND = 1 + REMOVE = -1 + SET = 0 + + class MessageCollection(object): """ A generic collection of messages. It can be messages sharing the same @@ -132,6 +154,7 @@ class MessageCollection(object): def __init__(self, adaptor, store, mbox_indexer=None, mbox_wrapper=None): """ + Constructore for a MessageCollection. """ self.adaptor = adaptor self.store = store @@ -149,6 +172,20 @@ class MessageCollection(object): """ return bool(self.mbox_wrapper) + @property + def mbox_name(self): + wrapper = getattr(self, "mbox_wrapper", None) + if not wrapper: + return None + return wrapper.mbox + + def get_mbox_attr(self, attr): + return getattr(self.mbox_wrapper, attr) + + def set_mbox_attr(self, attr, value): + setattr(self.mbox_wrapper, attr, value) + return self.mbox_wrapper.update(self.store) + # Get messages def get_message_by_content_hash(self, chash, get_cdocs=False): @@ -162,7 +199,7 @@ class MessageCollection(object): # or use the internal collection of pointers-to-docs. raise NotImplementedError() - metamsg_id = _get_mdoc_id(self.mbox_wrapper.mbox, chash) + metamsg_id = _get_mdoc_id(self.mbox_name, chash) return self.adaptor.get_msg_from_mdoc_id( self.messageklass, self.store, @@ -181,25 +218,37 @@ class MessageCollection(object): raise NotImplementedError("Does not support relative ids yet") def get_msg_from_mdoc_id(doc_id): + # XXX pass UID? return self.adaptor.get_msg_from_mdoc_id( self.messageklass, self.store, doc_id, get_cdocs=get_cdocs) - d = self.mbox_indexer.get_doc_id_from_uid(self.mbox_wrapper.mbox, uid) + d = self.mbox_indexer.get_doc_id_from_uid(self.mbox_name, uid) d.addCallback(get_msg_from_mdoc_id) return d def count(self): """ Count the messages in this collection. - :rtype: int + :return: a Deferred that will fire with the integer for the count. + :rtype: Deferred """ if not self.is_mailbox_collection(): raise NotImplementedError() - return self.mbox_indexer.count(self.mbox_wrapper.mbox) + return self.mbox_indexer.count(self.mbox_name) + + def get_uid_next(self): + """ + Get the next integer beyond the highest UID count for this mailbox. + + :return: a Deferred that will fire with the integer for the next uid. + :rtype: Deferred + """ + return self.mbox_indexer.get_uid_next(self.mbox_name) # Manipulate messages + # TODO pass flags, date too... def add_msg(self, raw_msg): """ Add a message to this collection. @@ -208,14 +257,14 @@ class MessageCollection(object): wrapper = msg.get_wrapper() if self.is_mailbox_collection(): - mbox = self.mbox_wrapper.mbox + mbox = self.mbox_name wrapper.set_mbox(mbox) def insert_mdoc_id(_): # XXX does this work? doc_id = wrapper.mdoc.doc_id return self.mbox_indexer.insert_doc( - self.mbox_wrapper.mbox, doc_id) + self.mbox_name, doc_id) d = wrapper.create(self.store) d.addCallback(insert_mdoc_id) @@ -248,31 +297,45 @@ class MessageCollection(object): # XXX does this work? doc_id = wrapper.mdoc.doc_id return self.mbox_indexer.delete_doc_by_hash( - self.mbox_wrapper.mbox, doc_id) + self.mbox_name, doc_id) d = wrapper.delete(self.store) d.addCallback(delete_mdoc_id) return d # TODO should add a delete-by-uid to collection? + def _update_flags_or_tags(self, old, new, mode): + if mode == Flagsmode.APPEND: + final = list((set(tuple(old) + new))) + elif mode == Flagsmode.REMOVE: + final = list(set(old).difference(set(new))) + elif mode == Flagsmode.SET: + final = new + return final + def udpate_flags(self, msg, flags, mode): """ Update flags for a given message. """ wrapper = msg.get_wrapper() - # 1. update the flags in the message wrapper --- stored where??? - # 2. update the special flags in the wrapper (seen, etc) - # 3. call adaptor.update_msg(store) - pass + current = wrapper.fdoc.flags + newflags = self._update_flags_or_tags(current, flags, mode) + wrapper.fdoc.flags = newflags + + wrapper.fdoc.seen = MessageFlags.SEEN_FLAG in newflags + wrapper.fdoc.deleted = MessageFlags.DELETED_FLAG in newflags + + return self.adaptor.update_msg(self.store, msg) def update_tags(self, msg, tags, mode): """ Update tags for a given message. """ wrapper = msg.get_wrapper() - # 1. update the tags in the message wrapper --- stored where??? - # 2. call adaptor.update_msg(store) - pass + current = wrapper.fdoc.tags + newtags = self._update_flags_or_tags(current, tags, mode) + wrapper.fdoc.tags = newtags + return self.adaptor.update_msg(self.store, msg) class Account(object): @@ -382,6 +445,8 @@ class Account(object): d.addCallback(rename_uid_table_cb) return d + # Get Collections + def get_collection_by_mailbox(self, name): """ :rtype: MessageCollection diff --git a/src/leap/mail/messageflow.py b/src/leap/mail/messageflow.py deleted file mode 100644 index c8f224c..0000000 --- a/src/leap/mail/messageflow.py +++ /dev/null @@ -1,200 +0,0 @@ -# -*- coding: utf-8 -*- -# messageflow.py -# Copyright (C) 2013 LEAP -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see . -""" -Message Producers and Consumers for flow control. -""" -import Queue - -from twisted.internet.task import LoopingCall - -from zope.interface import Interface, implements - - -class IMessageConsumer(Interface): - """ - I consume messages from a queue. - """ - - def consume(self, queue): - """ - Consumes the passed item. - - :param item: a queue where we put the object to be consumed. - :type item: object - """ - # TODO we could add an optional type to be passed - # for doing type check. - - # TODO in case of errors, we could return the object to - # the queue, maybe wrapped in an object with a retries attribute. - - -class IMessageProducer(Interface): - """ - I produce messages and put them in a store to be consumed by other - entities. - """ - - def push(self, item, state=None): - """ - Push a new item in the queue. - """ - - def start(self): - """ - Start producing items. - """ - - def stop(self): - """ - Stop producing items. - """ - - def flush(self): - """ - Flush queued messages to consumer. - """ - - -class DummyMsgConsumer(object): - - implements(IMessageConsumer) - - def consume(self, queue): - """ - Just prints the passed item. - """ - if not queue.empty(): - print "got item %s" % queue.get() - - -class MessageProducer(object): - """ - A Producer class that we can use to temporarily buffer the production - of messages so that different objects can consume them. - - This is useful for serializing the consumption of the messages stream - in the case of an slow resource (db), or for returning early from a - deferred chain and leave further processing detached from the calling loop, - as in the case of smtp. - """ - implements(IMessageProducer) - - # TODO this can be seen as a first step towards properly implementing - # components that implement IPushProducer / IConsumer interfaces. - # However, I need to think more about how to pause the streaming. - # In any case, the differential rate between message production - # and consumption is not likely (?) to consume huge amounts of memory in - # our current settings, so the need to pause the stream is not urgent now. - - # TODO use enum - STATE_NEW = 1 - STATE_DIRTY = 2 - - def __init__(self, consumer, queue=Queue.Queue, period=1): - """ - Initializes the MessageProducer - - :param consumer: an instance of a IMessageConsumer that will consume - the new messages. - :param queue: any queue implementation to be used as the temporary - buffer for new items. Default is a FIFO Queue. - :param period: the period to check for new items, in seconds. - """ - # XXX should assert it implements IConsumer / IMailConsumer - # it should implement a `consume` method - self._consumer = consumer - - self._queue_new = queue() - self._queue_dirty = queue() - self._period = period - - self._loop = LoopingCall(self._check_for_new) - - # private methods - - def _check_for_new(self): - """ - Check for new items in the internal queue, and calls the consume - method in the consumer. - - If the queue is found empty, the loop is stopped. It will be started - again after the addition of new items. - """ - self._consumer.consume((self._queue_new, self._queue_dirty)) - if self.is_queue_empty(): - self.stop() - - def is_queue_empty(self): - """ - Return True if queue is empty, False otherwise. - """ - new = self._queue_new - dirty = self._queue_dirty - return new.empty() and dirty.empty() - - # public methods: IMessageProducer - - def push(self, item, state=None): - """ - Push a new item in the queue. - - If the queue was empty, we will start the loop again. - """ - # XXX this might raise if the queue does not accept any new - # items. what to do then? - queue = self._queue_new - - if state == self.STATE_NEW: - queue = self._queue_new - if state == self.STATE_DIRTY: - queue = self._queue_dirty - - queue.put(item) - self.start() - - def start(self): - """ - Start polling for new items. - """ - if not self._loop.running: - self._loop.start(self._period, now=True) - - def stop(self): - """ - Stop polling for new items. - """ - if self._loop.running: - self._loop.stop() - - def flush(self): - """ - Flush queued messages to consumer. - """ - self._check_for_new() - - -if __name__ == "__main__": - from twisted.internet import reactor - producer = MessageProducer(DummyMsgConsumer()) - producer.start() - - for delay, item in ((2, 1), (3, 2), (4, 3), - (6, 4), (7, 5), (8, 6), (8.2, 7), - (15, 'a'), (16, 'b'), (17, 'c')): - reactor.callLater(delay, producer.put, item) - reactor.run() -- cgit v1.2.3