diff options
| author | Kali Kaneko <kali@leap.se> | 2015-01-01 18:21:44 -0400 | 
|---|---|---|
| committer | Kali Kaneko <kali@leap.se> | 2015-02-11 14:05:43 -0400 | 
| commit | 176835f5415a328b9e9813e658234fd24b4164c8 (patch) | |
| tree | 17f40ba362acfd02ae6ec963fe40002e170048a0 /mail | |
| parent | a5b725cda14074613193f793b76ccb4ea5a8a2a3 (diff) | |
cleanup imap implementation
Diffstat (limited to 'mail')
| -rw-r--r-- | mail/src/leap/mail/adaptors/soledad.py | 13 | ||||
| -rw-r--r-- | mail/src/leap/mail/constants.py | 14 | ||||
| -rw-r--r-- | mail/src/leap/mail/imap/account.py | 306 | ||||
| -rw-r--r-- | mail/src/leap/mail/imap/fields.py | 51 | ||||
| -rw-r--r-- | mail/src/leap/mail/imap/interfaces.py | 96 | ||||
| -rw-r--r-- | mail/src/leap/mail/imap/mailbox.py | 472 | ||||
| -rw-r--r-- | mail/src/leap/mail/imap/memorystore.py | 1340 | ||||
| -rw-r--r-- | mail/src/leap/mail/imap/messageparts.py | 586 | ||||
| -rw-r--r-- | mail/src/leap/mail/imap/messages.py | 1007 | ||||
| -rw-r--r-- | mail/src/leap/mail/imap/soledadstore.py | 617 | ||||
| -rw-r--r-- | mail/src/leap/mail/mail.py | 93 | ||||
| -rw-r--r-- | mail/src/leap/mail/messageflow.py | 200 | 
12 files changed, 522 insertions, 4273 deletions
| diff --git a/mail/src/leap/mail/adaptors/soledad.py b/mail/src/leap/mail/adaptors/soledad.py index 0b97869..bf8f7e9 100644 --- a/mail/src/leap/mail/adaptors/soledad.py +++ b/mail/src/leap/mail/adaptors/soledad.py @@ -513,9 +513,13 @@ class MailboxWrapper(SoledadDocumentWrapper):          type_ = "mbox"          mbox = INBOX_NAME          flags = [] +        recent = [] +        created = 1          closed = False          subscribed = False -        rw = True + +        # I think we don't need to store this one. +        # rw = True          class __meta__(object):              index = "mbox" @@ -655,6 +659,7 @@ class SoledadMailAdaptor(SoledadIndexMixin):          assert(MessageClass is not None)          return MessageClass(MessageWrapper(mdoc, fdoc, hdoc, cdocs)) +    # XXX pass UID too?      def _get_msg_from_variable_doc_list(self, doc_list, msg_class):          if len(doc_list) == 2:              fdoc, hdoc = doc_list @@ -664,12 +669,14 @@ class SoledadMailAdaptor(SoledadIndexMixin):              cdocs = dict(enumerate(doc_list[2:], 1))          return self.get_msg_from_docs(msg_class, fdoc, hdoc, cdocs) +    # XXX pass UID too ?      def get_msg_from_mdoc_id(self, MessageClass, store, doc_id,                               get_cdocs=False):          metamsg_id = doc_id          def wrap_meta_doc(doc):              cls = MetaMsgDocWrapper +            # XXX pass UID?              return cls(doc_id=doc.doc_id, **doc.content)          def get_part_docs_from_mdoc_wrapper(wrapper): @@ -692,8 +699,8 @@ class SoledadMailAdaptor(SoledadIndexMixin):                  return constants.FDOCID.format(mbox=mbox, chash=chash)              d_docs = [] -            fdoc_id = _get_fdoc_id_from_mdoc_id(doc_id) -            hdoc_id = _get_hdoc_id_from_mdoc_id(doc_id) +            fdoc_id = _get_fdoc_id_from_mdoc_id() +            hdoc_id = _get_hdoc_id_from_mdoc_id()              d_docs.append(store.get_doc(fdoc_id))              d_docs.append(store.get_doc(hdoc_id))              d = defer.gatherResults(d_docs) diff --git a/mail/src/leap/mail/constants.py b/mail/src/leap/mail/constants.py index bf1db7f..d76e652 100644 --- a/mail/src/leap/mail/constants.py +++ b/mail/src/leap/mail/constants.py @@ -36,3 +36,17 @@ HDOCID_RE = "H\-[0-9a-fA-F]+"  CDOCID = "C-{phash}"  CDOCID_RE = "C\-[0-9a-fA-F]+" + + +class MessageFlags(object): +    """ +    Flags used in Message and Mailbox. +    """ +    SEEN_FLAG = "\\Seen" +    RECENT_FLAG = "\\Recent" +    ANSWERED_FLAG = "\\Answered" +    FLAGGED_FLAG = "\\Flagged"  # yo dawg +    DELETED_FLAG = "\\Deleted" +    DRAFT_FLAG = "\\Draft" +    NOSELECT_FLAG = "\\Noselect" +    LIST_FLAG = "List"  # is this OK? (no \. ie, no system flag) diff --git a/mail/src/leap/mail/imap/account.py b/mail/src/leap/mail/imap/account.py index 7dfbbd1..0baf078 100644 --- a/mail/src/leap/mail/imap/account.py +++ b/mail/src/leap/mail/imap/account.py @@ -1,6 +1,6 @@  # -*- coding: utf-8 -*-  # account.py -# Copyright (C) 2013 LEAP +# Copyright (C) 2013-2015 LEAP  #  # This program is free software: you can redistribute it and/or modify  # it under the terms of the GNU General Public License as published by @@ -15,12 +15,12 @@  # You should have received a copy of the GNU General Public License  # along with this program.  If not, see <http://www.gnu.org/licenses/>.  """ -Soledad Backed Account. +Soledad Backed IMAP Account.  """ -import copy  import logging  import os  import time +from functools import partial  from twisted.internet import defer  from twisted.mail import imap4 @@ -29,9 +29,9 @@ from zope.interface import implements  from leap.common.check import leap_assert, leap_assert_type +from leap.mail.constants import MessageFlags  from leap.mail.mail import Account -from leap.mail.imap.fields import WithMsgFields -from leap.mail.imap.mailbox import SoledadMailbox, normalize_mailbox +from leap.mail.imap.mailbox import IMAPMailbox, normalize_mailbox  from leap.soledad.client import Soledad  logger = logging.getLogger(__name__) @@ -49,9 +49,10 @@ if PROFILE_CMD:  # Soledad IMAP Account  ####################################### -# TODO remove MsgFields too +# XXX watchout, account needs to be ready... so we should maybe return +# a deferred to the IMAP service when it's initialized -class IMAPAccount(WithMsgFields): +class IMAPAccount(object):      """      An implementation of an imap4 Account      that is backed by Soledad Encrypted Documents. @@ -72,37 +73,20 @@ class IMAPAccount(WithMsgFields):          :param store: a Soledad instance.          :type store: Soledad          """ -        # XXX assert a generic store interface instead, so that we -        # can plug the memory store wrapper seamlessly.          leap_assert(store, "Need a store instance to initialize")          leap_assert_type(store, Soledad) -        # XXX SHOULD assert too that the name matches the user/uuid with which +        # TODO assert too that the name matches the user/uuid with which          # soledad has been initialized.          self.user_id = user_id          self.account = Account(store) -    # XXX should hide this in the adaptor... -    def _get_mailbox_by_name(self, name): -        """ -        Return an mbox document by name. - -        :param name: the name of the mailbox -        :type name: str - -        :rtype: SoledadDocument -        """ -        def get_first_if_any(docs): -            return docs[0] if docs else None - -        d = self._store.get_from_index( -            self.TYPE_MBOX_IDX, self.MBOX_KEY, -            normalize_mailbox(name)) -        d.addCallback(get_first_if_any) -        return d +    def _return_mailbox_from_collection(self, collection, readwrite=1): +        if collection is None: +            return None +        return IMAPMailbox(collection, rw=readwrite) -    # XXX move to Account? -    # XXX needed? +    # XXX Where's this used from? -- self.delete...      def getMailbox(self, name):          """          Return a Mailbox with that name, without selecting it. @@ -110,31 +94,25 @@ class IMAPAccount(WithMsgFields):          :param name: name of the mailbox          :type name: str -        :returns: a a SoledadMailbox instance -        :rtype: SoledadMailbox +        :returns: an IMAPMailbox instance +        :rtype: IMAPMailbox          """          name = normalize_mailbox(name) -        if name not in self.account.mailboxes: -            raise imap4.MailboxException("No such mailbox: %r" % name) +        def check_it_exists(mailboxes): +            if name not in mailboxes: +                raise imap4.MailboxException("No such mailbox: %r" % name) -        # XXX Does mailbox really need reference to soledad? -        return SoledadMailbox(name, self._store) +        d = self.account.list_all_mailbox_names() +        d.addCallback(check_it_exists) +        d.addCallback(lambda _: self.account.get_collection_by_mailbox, name) +        d.addCallbacK(self._return_mailbox_from_collection) +        return d      #      # IAccount      # -    def _get_empty_mailbox(self): -        """ -        Returns an empty mailbox. - -        :rtype: dict -        """ -        # XXX move to mailbox module -        return copy.deepcopy(mailbox.EMPTY_MBOX) - -    # TODO use mail.Account.add_mailbox      def addMailbox(self, name, creation_ts=None):          """          Add a mailbox to the account. @@ -154,8 +132,9 @@ class IMAPAccount(WithMsgFields):          leap_assert(name, "Need a mailbox name to create a mailbox") -        if name in self.mailboxes: -            raise imap4.MailboxCollision(repr(name)) +        def check_it_does_not_exist(mailboxes): +            if name in mailboxes: +                raise imap4.MailboxCollision(repr(name))          if creation_ts is None:              # by default, we pass an int value @@ -164,21 +143,18 @@ class IMAPAccount(WithMsgFields):              # mailbox-uidvalidity.              creation_ts = int(time.time() * 10E2) -        mbox = self._get_empty_mailbox() -        mbox[self.MBOX_KEY] = name -        mbox[self.CREATED_KEY] = creation_ts - -        def load_mbox_cache(result): -            d = self._load_mailboxes() -            d.addCallback(lambda _: result) +        def set_mbox_creation_ts(collection): +            d = collection.set_mbox_attr("created") +            d.addCallback(lambda _: collection)              return d -        d = self._store.create_doc(mbox) -        d.addCallback(load_mbox_cache) +        d = self.account.list_all_mailbox_names() +        d.addCallback(check_it_does_not_exist) +        d.addCallback(lambda _: self.account.get_collection_by_mailbox, name) +        d.addCallback(set_mbox_creation_ts) +        d.addCallback(self._return_mailbox_from_collection)          return d -    # TODO use mail.Account.create_mailbox? -    # Watch out, imap specific exceptions raised here.      def create(self, pathspec):          """          Create a new mailbox from the given hierarchical name. @@ -204,9 +180,10 @@ class IMAPAccount(WithMsgFields):          for accum in range(1, len(paths)):              try: -                partial = sep.join(paths[:accum]) -                d = self.addMailbox(partial) +                partial_path = sep.join(paths[:accum]) +                d = self.addMailbox(partial_path)                  subs.append(d) +            # XXX should this be handled by the deferred?              except imap4.MailboxCollision:                  pass          try: @@ -222,21 +199,13 @@ class IMAPAccount(WithMsgFields):          def all_good(result):              return all(result) -        def load_mbox_cache(result): -            d = self._load_mailboxes() -            d.addCallback(lambda _: result) -            return d -          if subs:              d1 = defer.gatherResults(subs, consumeErrors=True) -            d1.addCallback(load_mbox_cache)              d1.addCallback(all_good)          else:              d1 = defer.succeed(False) -            d1.addCallback(load_mbox_cache)          return d1 -    # TODO use mail.Account.get_collection_by_mailbox      def select(self, name, readwrite=1):          """          Selects a mailbox. @@ -250,15 +219,28 @@ class IMAPAccount(WithMsgFields):          :rtype: SoledadMailbox          """          name = normalize_mailbox(name) -        if name not in self.mailboxes: -            logger.warning("No such mailbox!") -            return None -        self.selected = name -        sm = SoledadMailbox(name, self._store, readwrite) -        return sm +        def check_it_exists(mailboxes): +            if name not in mailboxes: +                logger.warning("SELECT: No such mailbox!") +                return None +            return name + +        def set_selected(_): +            self.selected = name + +        def get_collection(name): +            if name is None: +                return None +            return self.account.get_collection_by_mailbox(name) + +        d = self.account.list_all_mailbox_names() +        d.addCallback(check_it_exists) +        d.addCallback(get_collection) +        d.addCallback(partial( +            self._return_mailbox_from_collection, readwrite=readwrite)) +        return d -    # TODO use mail.Account.delete_mailbox      def delete(self, name, force=False):          """          Deletes a mailbox. @@ -276,37 +258,52 @@ class IMAPAccount(WithMsgFields):          :rtype: Deferred          """          name = normalize_mailbox(name) +        _mboxes = [] -        if name not in self.mailboxes: -            err = imap4.MailboxException("No such mailbox: %r" % name) -            return defer.fail(err) -        mbox = self.getMailbox(name) +        def check_it_exists(mailboxes): +            # FIXME works? -- pass variable ref to outer scope +            _mboxes = mailboxes +            if name not in mailboxes: +                err = imap4.MailboxException("No such mailbox: %r" % name) +                return defer.fail(err) -        if not force: +        def get_mailbox(_): +            return self.getMailbox(name) + +        def destroy_mailbox(mbox): +            return mbox.destroy() + +        def check_can_be_deleted(mbox):              # See if this box is flagged \Noselect -            # XXX use mbox.flags instead?              mbox_flags = mbox.getFlags() -            if self.NOSELECT_FLAG in mbox_flags: +            if MessageFlags.NOSELECT_FLAG in mbox_flags:                  # Check for hierarchically inferior mailboxes with this one                  # as part of their root. -                for others in self.mailboxes: +                for others in _mboxes:                      if others != name and others.startswith(name):                          err = imap4.MailboxException(                              "Hierarchically inferior mailboxes "                              "exist and \\Noselect is set")                          return defer.fail(err) -        self.__mailboxes.discard(name) -        return mbox.destroy() +            return mbox -        # XXX FIXME --- not honoring the inferior names... +        d = self.account.list_all_mailbox_names() +        d.addCallback(check_it_exists) +        d.addCallback(get_mailbox) +        if not force: +            d.addCallback(check_can_be_deleted) +        d.addCallback(destroy_mailbox) +        return d +        # FIXME --- not honoring the inferior names...          # if there are no hierarchically inferior names, we will          # delete it from our ken. +        # XXX is this right?          # if self._inferiorNames(name) > 1: -        #  ??! -- can this be rite? -        # self._index.removeMailbox(name) +        #   self._index.removeMailbox(name)      # TODO use mail.Account.rename_mailbox +    # TODO finish conversion to deferreds      def rename(self, oldname, newname):          """          Renames a mailbox. @@ -320,6 +317,9 @@ class IMAPAccount(WithMsgFields):          oldname = normalize_mailbox(oldname)          newname = normalize_mailbox(newname) +        # FIXME check that scope works (test) +        _mboxes = [] +          if oldname not in self.mailboxes:              raise imap4.NoSuchMailbox(repr(oldname)) @@ -327,34 +327,19 @@ class IMAPAccount(WithMsgFields):          inferiors = [(o, o.replace(oldname, newname, 1)) for o in inferiors]          for (old, new) in inferiors: -            if new in self.mailboxes: +            if new in _mboxes:                  raise imap4.MailboxCollision(repr(new))          rename_deferreds = [] -        def load_mbox_cache(result): -            d = self._load_mailboxes() -            d.addCallback(lambda _: result) -            return d - -        def update_mbox_doc_name(mbox, oldname, newname, update_deferred): -            mbox.content[self.MBOX_KEY] = newname -            d = self._soledad.put_doc(mbox) -            d.addCallback(lambda r: update_deferred.callback(True)) -          for (old, new) in inferiors: -            self.__mailboxes.discard(old) -            self._memstore.rename_fdocs_mailbox(old, new) - -            d0 = defer.Deferred() -            d = self._get_mailbox_by_name(old) -            d.addCallback(update_mbox_doc_name, old, new, d0) -            rename_deferreds.append(d0) +            d = self.account.rename_mailbox(old, new) +            rename_deferreds.append(d)          d1 = defer.gatherResults(rename_deferreds, consumeErrors=True) -        d1.addCallback(load_mbox_cache)          return d1 +    # FIXME use deferreds (list_all_mailbox_names, etc)      def _inferiorNames(self, name):          """          Return hierarchically inferior mailboxes. @@ -387,16 +372,15 @@ class IMAPAccount(WithMsgFields):          :type wildcard: str          """          # XXX use wildcard in index query -        ref = self._inferiorNames(normalize_mailbox(ref)) +        # TODO get deferreds          wildcard = imap4.wildcardToRegexp(wildcard, '/') +        ref = self._inferiorNames(normalize_mailbox(ref))          return [(i, self.getMailbox(i)) for i in ref if wildcard.match(i)]      #      # The rest of the methods are specific for leap.mail.imap.account.Account      # -    # TODO ------------------ can we preserve the attr? -    # maybe add to memory store.      def isSubscribed(self, name):          """          Returns True if user is subscribed to this mailbox. @@ -406,63 +390,13 @@ class IMAPAccount(WithMsgFields):          :rtype: Deferred (will fire with bool)          """ -        # TODO use Flags class -        subscribed = self.SUBSCRIBED_KEY - -        def is_subscribed(mbox): -            subs_bool = bool(mbox.content.get(subscribed, False)) -            return subs_bool - -        d = self._get_mailbox_by_name(name) -        d.addCallback(is_subscribed) -        return d - -    # TODO ------------------ can we preserve the property? -    # maybe add to memory store. - -    def _get_subscriptions(self): -        """ -        Return a list of the current subscriptions for this account. - -        :returns: A deferred that will fire with the subscriptions. -        :rtype: Deferred -        """ -        def get_docs_content(docs): -            return [doc.content[self.MBOX_KEY] for doc in docs] - -        d = self._store.get_from_index( -            self.TYPE_SUBS_IDX, self.MBOX_KEY, '1') -        d.addCallback(get_docs_content) -        return d - -    def _set_subscription(self, name, value): -        """ -        Sets the subscription value for a given mailbox - -        :param name: the mailbox -        :type name: str - -        :param value: the boolean value -        :type value: bool -        """ -        # XXX Note that this kind of operation has -        # no guarantees of atomicity. We should not be accessing mbox -        # documents concurrently. - -        subscribed = self.SUBSCRIBED_KEY +        name = normalize_mailbox(name) -        def update_subscribed_value(mbox): -            mbox.content[subscribed] = value -            return self._store.put_doc(mbox) +        def get_subscribed(mbox): +            return mbox.get_mbox_attr("subscribed") -        # maybe we should store subscriptions in another -        # document... -        if name not in self.mailboxes: -            d = self.addMailbox(name) -            d.addCallback(lambda v: self._get_mailbox_by_name(name)) -        else: -            d = self._get_mailbox_by_name(name) -        d.addCallback(update_subscribed_value) +        d = self.getMailbox(name) +        d.addCallback(get_subscribed)          return d      def subscribe(self, name): @@ -475,11 +409,11 @@ class IMAPAccount(WithMsgFields):          """          name = normalize_mailbox(name) -        def check_and_subscribe(subscriptions): -            if name not in subscriptions: -                return self._set_subscription(name, True) -        d = self._get_subscriptions() -        d.addCallback(check_and_subscribe) +        def set_subscribed(mbox): +            return mbox.set_mbox_attr("subscribed", True) + +        d = self.getMailbox(name) +        d.addCallback(set_subscribed)          return d      def unsubscribe(self, name): @@ -492,17 +426,17 @@ class IMAPAccount(WithMsgFields):          """          name = normalize_mailbox(name) -        def check_and_unsubscribe(subscriptions): -            if name not in subscriptions: -                raise imap4.MailboxException( -                    "Not currently subscribed to %r" % name) -            return self._set_subscription(name, False) -        d = self._get_subscriptions() -        d.addCallback(check_and_unsubscribe) +        def set_unsubscribed(mbox): +            return mbox.set_mbox_attr("subscribed", False) + +        d = self.getMailbox(name) +        d.addCallback(set_unsubscribed)          return d +    # TODO -- get__all_mboxes, return tuple +    # with ... name? and subscribed bool...      def getSubscriptions(self): -        return self._get_subscriptions() +        raise NotImplementedError()      #      # INamespacePresenter @@ -517,20 +451,6 @@ class IMAPAccount(WithMsgFields):      def getOtherNamespaces(self):          return None -    # extra, for convenience - -    def deleteAllMessages(self, iknowhatiamdoing=False): -        """ -        Deletes all messages from all mailboxes. -        Danger! high voltage! - -        :param iknowhatiamdoing: confirmation parameter, needs to be True -                                 to proceed. -        """ -        if iknowhatiamdoing is True: -            for mbox in self.mailboxes: -                self.delete(mbox, force=True) -      def __repr__(self):          """          Representation string for this object. diff --git a/mail/src/leap/mail/imap/fields.py b/mail/src/leap/mail/imap/fields.py deleted file mode 100644 index a751c6d..0000000 --- a/mail/src/leap/mail/imap/fields.py +++ /dev/null @@ -1,51 +0,0 @@ -# -*- coding: utf-8 -*- -# fields.py -# Copyright (C) 2013 LEAP -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program.  If not, see <http://www.gnu.org/licenses/>. -""" -Fields for Mailbox and Message. -""" - -# TODO deprecate !!! (move all to constants maybe?) -# Flags -> foo - - -class WithMsgFields(object): -    """ -    Container class for class-attributes to be shared by -    several message-related classes. -    """ -    # Mailbox specific keys -    CREATED_KEY = "created"  # used??? - -    RECENTFLAGS_KEY = "rct" -    HDOCS_SET_KEY = "hdocset" - -    # Flags in Mailbox and Message -    SEEN_FLAG = "\\Seen" -    RECENT_FLAG = "\\Recent" -    ANSWERED_FLAG = "\\Answered" -    FLAGGED_FLAG = "\\Flagged"  # yo dawg -    DELETED_FLAG = "\\Deleted" -    DRAFT_FLAG = "\\Draft" -    NOSELECT_FLAG = "\\Noselect" -    LIST_FLAG = "List"  # is this OK? (no \. ie, no system flag) - -    # Fields in mail object -    SUBJECT_FIELD = "Subject" -    DATE_FIELD = "Date" - - -fields = WithMsgFields  # alias for convenience diff --git a/mail/src/leap/mail/imap/interfaces.py b/mail/src/leap/mail/imap/interfaces.py deleted file mode 100644 index f8f25fa..0000000 --- a/mail/src/leap/mail/imap/interfaces.py +++ /dev/null @@ -1,96 +0,0 @@ -# -*- coding: utf-8 -*- -# interfaces.py -# Copyright (C) 2014 LEAP -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program.  If not, see <http://www.gnu.org/licenses/>. -""" -Interfaces for the IMAP module. -""" -from zope.interface import Interface, Attribute - - -# TODO remove  ---------------- -class IMessageContainer(Interface): -    """ -    I am a container around the different documents that a message -    is split into. -    """ -    fdoc = Attribute('The flags document for this message, if any.') -    hdoc = Attribute('The headers document for this message, if any.') -    cdocs = Attribute('The dict of content documents for this message, ' -                      'if any.') - -    def walk(self): -        """ -        Return an iterator to the docs for all the parts. - -        :rtype: iterator -        """ - - -# TODO remove -------------------- -class IMessageStore(Interface): -    """ -    I represent a generic storage for LEAP Messages. -    """ - -    def create_message(self, mbox, uid, message): -        """ -        Put the passed message into this IMessageStore. - -        :param mbox: the mbox this message belongs. -        :param uid: the UID that identifies this message in this mailbox. -        :param message: a IMessageContainer implementor. -        """ - -    def put_message(self, mbox, uid, message): -        """ -        Put the passed message into this IMessageStore. - -        :param mbox: the mbox this message belongs. -        :param uid: the UID that identifies this message in this mailbox. -        :param message: a IMessageContainer implementor. -        """ - -    def remove_message(self, mbox, uid): -        """ -        Remove the given message from this IMessageStore. - -        :param mbox: the mbox this message belongs. -        :param uid: the UID that identifies this message in this mailbox. -        """ - -    def get_message(self, mbox, uid): -        """ -        Get a IMessageContainer for the given mbox and uid combination. - -        :param mbox: the mbox this message belongs. -        :param uid: the UID that identifies this message in this mailbox. -        :return: IMessageContainer -        """ - - -class IMessageStoreWriter(Interface): -    """ -    I represent a storage that is able to write its contents to another -    different IMessageStore. -    """ - -    def write_messages(self, store): -        """ -        Write the documents in this IMessageStore to a different -        storage. Usually this will be done from a MemoryStorage to a DbStorage. - -        :param store: another IMessageStore implementor. -        """ diff --git a/mail/src/leap/mail/imap/mailbox.py b/mail/src/leap/mail/imap/mailbox.py index ea54d33..faeba9d 100644 --- a/mail/src/leap/mail/imap/mailbox.py +++ b/mail/src/leap/mail/imap/mailbox.py @@ -1,6 +1,6 @@  # *- coding: utf-8 -*-  # mailbox.py -# Copyright (C) 2013, 2014 LEAP +# Copyright (C) 2013-2015 LEAP  #  # This program is free software: you can redistribute it and/or modify  # it under the terms of the GNU General Public License as published by @@ -15,11 +15,9 @@  # You should have received a copy of the GNU General Public License  # along with this program.  If not, see <http://www.gnu.org/licenses/>.  """ -Soledad Mailbox. +IMAP Mailbox.  """ -import copy  import re -import threading  import logging  import StringIO  import cStringIO @@ -29,7 +27,6 @@ from collections import defaultdict  from twisted.internet import defer  from twisted.internet import reactor -from twisted.internet.task import deferLater  from twisted.python import log  from twisted.mail import imap4 @@ -38,17 +35,15 @@ from zope.interface import implements  from leap.common import events as leap_events  from leap.common.events.events_pb2 import IMAP_UNREAD_MAIL  from leap.common.check import leap_assert, leap_assert_type -from leap.mail.constants import INBOX_NAME -from leap.mail.decorators import deferred_to_thread -from leap.mail.utils import empty -from leap.mail.imap.fields import WithMsgFields, fields -from leap.mail.imap.messages import MessageCollection -from leap.mail.imap.messageparts import MessageWrapper +from leap.mail.constants import INBOX_NAME, MessageFlags  logger = logging.getLogger(__name__) -# TODO +# TODO LIST  # [ ] Restore profile_cmd instrumentation +# [ ] finish the implementation of IMailboxListener +# [ ] implement the rest of ISearchableMailbox +  """  If the environment variable `LEAP_SKIPNOTIFY` is set, we avoid @@ -75,16 +70,20 @@ if PROFILE_CMD:          d.addCallback(_debugProfiling, name, time.time())          d.addErrback(lambda f: log.msg(f.getTraceback())) +INIT_FLAGS = (MessageFlags.SEEN_FLAG, MessageFlags.ANSWERED_FLAG, +              MessageFlags.FLAGGED_FLAG, MessageFlags.DELETED_FLAG, +              MessageFlags.DRAFT_FLAG, MessageFlags.RECENT_FLAG, +              MessageFlags.LIST_FLAG) + -# TODO Rename to Mailbox -# TODO Remove WithMsgFields -class SoledadMailbox(WithMsgFields): +class IMAPMailbox(object):      """      A Soledad-backed IMAP mailbox.      Implements the high-level method needed for the Mailbox interfaces. -    The low-level database methods are contained in MessageCollection class, -    which we instantiate and make accessible in the `messages` attribute. +    The low-level database methods are contained in IMAPMessageCollection +    class, which we instantiate and make accessible in the `messages` +    attribute.      """      implements(          imap4.IMailbox, @@ -93,17 +92,7 @@ class SoledadMailbox(WithMsgFields):          imap4.ISearchableMailbox,          imap4.IMessageCopier) -    # XXX should finish the implementation of IMailboxListener -    # XXX should completely implement ISearchableMailbox too - -    messages = None -    _closed = False - -    INIT_FLAGS = (WithMsgFields.SEEN_FLAG, WithMsgFields.ANSWERED_FLAG, -                  WithMsgFields.FLAGGED_FLAG, WithMsgFields.DELETED_FLAG, -                  WithMsgFields.DRAFT_FLAG, WithMsgFields.RECENT_FLAG, -                  WithMsgFields.LIST_FLAG) -    flags = None +    init_flags = INIT_FLAGS      CMD_MSG = "MESSAGES"      CMD_RECENT = "RECENT" @@ -111,58 +100,31 @@ class SoledadMailbox(WithMsgFields):      CMD_UIDVALIDITY = "UIDVALIDITY"      CMD_UNSEEN = "UNSEEN" -    # FIXME we should turn this into a datastructure with limited capacity +    # TODO we should turn this into a datastructure with limited capacity      _listeners = defaultdict(set) -    next_uid_lock = threading.Lock() -    last_uid_lock = threading.Lock() - -    # TODO unify all the `primed` dicts -    _fdoc_primed = {} -    _last_uid_primed = {} -    _known_uids_primed = {} - -    # TODO pass the collection to the constructor -    # TODO pass the mbox_doc too -    def __init__(self, mbox, store, rw=1): +    def __init__(self, collection, rw=1):          """          SoledadMailbox constructor. Needs to get passed a name, plus a          Soledad instance. -        :param mbox: the mailbox name -        :type mbox: str - -        :param store: -        :type store: Soledad +        :param collection: instance of IMAPMessageCollection +        :type collection: IMAPMessageCollection          :param rw: read-and-write flag for this mailbox          :type rw: int          """ -        leap_assert(mbox, "Need a mailbox name to initialize") -        leap_assert(store, "Need a store instance to initialize") - -        self.mbox = normalize_mailbox(mbox)          self.rw = rw -        self.store = store - -        self.messages = MessageCollection(mbox=mbox, soledad=store)          self._uidvalidity = None +        self.collection = collection -        # XXX careful with this get/set (it would be -        # hitting db unconditionally, move to memstore too) -        # Now it's returning a fixed amount of flags from mem -        # as a workaround.          if not self.getFlags(): -            self.setFlags(self.INIT_FLAGS) - -        if self._memstore: -            self.prime_known_uids_to_memstore() -            self.prime_last_uid_to_memstore() -            self.prime_flag_docs_to_memstore() +            self.setFlags(self.init_flags) -        # purge memstore from empty fdocs. -        self._memstore.purge_fdoc_store(mbox) +    @property +    def mbox_name(self): +        return self.collection.mbox_name      @property      def listeners(self): @@ -175,11 +137,12 @@ class SoledadMailbox(WithMsgFields):          :rtype: set          """ -        return self._listeners[self.mbox] +        return self._listeners[self.mbox_name] -    # TODO this grows too crazily when many instances are fired, like +    # FIXME this grows too crazily when many instances are fired, like      # during imaptest stress testing. Should have a queue of limited size      # instead. +      def addListener(self, listener):          """          Add a listener to the listeners queue. @@ -204,16 +167,6 @@ class SoledadMailbox(WithMsgFields):          """          self.listeners.remove(listener) -    def _get_mbox_doc(self): -        """ -        Return mailbox document. - -        :return: A SoledadDocument containing this mailbox, or None if -                 the query failed. -        :rtype: SoledadDocument or None. -        """ -        return self._memstore.get_mbox_doc(self.mbox) -      def getFlags(self):          """          Returns the flags defined for this mailbox. @@ -221,10 +174,11 @@ class SoledadMailbox(WithMsgFields):          :returns: tuple of flags for this mailbox          :rtype: tuple of str          """ -        flags = self._memstore.get_mbox_flags(self.mbox) +        flags = self.collection.mbox_wrapper.flags          if not flags: -            flags = self.INIT_FLAGS -        return map(str, flags) +            flags = self.init_flags +        flags_str = map(str, flags) +        return flags_str      def setFlags(self, flags):          """ @@ -234,98 +188,31 @@ class SoledadMailbox(WithMsgFields):          :type flags: tuple of str          """          # XXX this is setting (overriding) old flags. +        # Better pass a mode flag          leap_assert(isinstance(flags, tuple),                      "flags expected to be a tuple") -        self._memstore.set_mbox_flags(self.mbox, flags) - -    # XXX SHOULD BETTER IMPLEMENT ADD_FLAG, REMOVE_FLAG. +        return self.collection.set_mbox_attr("flags", flags) -    def _get_closed(self): +    @property +    def is_closed(self):          """          Return the closed attribute for this mailbox.          :return: True if the mailbox is closed          :rtype: bool          """ -        return self._memstore.get_mbox_closed(self.mbox) +        return self.collection.get_mbox_attr("closed") -    def _set_closed(self, closed): +    def set_closed(self, closed):          """          Set the closed attribute for this mailbox.          :param closed: the state to be set          :type closed: bool -        """ -        self._memstore.set_mbox_closed(self.mbox, closed) - -    closed = property( -        _get_closed, _set_closed, doc="Closed attribute.") - -    def _get_last_uid(self): -        """ -        Return the last uid for this mailbox. -        If we have a memory store, the last UID will be the highest -        recorded UID in the message store, or a counter cached from -        the mailbox document in soledad if this is higher. - -        :return: the last uid for messages in this mailbox -        :rtype: int -        """ -        last = self._memstore.get_last_uid(self.mbox) -        logger.debug("last uid for %s: %s (from memstore)" % ( -            repr(self.mbox), last)) -        return last - -    last_uid = property( -        _get_last_uid, doc="Last_UID attribute.") -    def prime_last_uid_to_memstore(self): -        """ -        Prime memstore with last_uid value -        """ -        primed = self._last_uid_primed.get(self.mbox, False) -        if not primed: -            mbox = self._get_mbox_doc() -            if mbox is None: -                # memory-only store -                return -            last = mbox.content.get('lastuid', 0) -            logger.info("Priming Soledad last_uid to %s" % (last,)) -            self._memstore.set_last_soledad_uid(self.mbox, last) -            self._last_uid_primed[self.mbox] = True - -    def prime_known_uids_to_memstore(self): -        """ -        Prime memstore with the set of all known uids. - -        We do this to be able to filter the requests efficiently. -        """ -        primed = self._known_uids_primed.get(self.mbox, False) -        # XXX handle the maybeDeferred - -        def set_primed(known_uids): -            self._memstore.set_known_uids(self.mbox, known_uids) -            self._known_uids_primed[self.mbox] = True - -        if not primed: -            d = self.messages.all_soledad_uid_iter() -            d.addCallback(set_primed) -            return d - -    def prime_flag_docs_to_memstore(self): -        """ -        Prime memstore with all the flags documents. +        :rtype: Deferred          """ -        primed = self._fdoc_primed.get(self.mbox, False) - -        def set_flag_docs(flag_docs): -            self._memstore.load_flag_docs(self.mbox, flag_docs) -            self._fdoc_primed[self.mbox] = True - -        if not primed: -            d = self.messages.get_all_soledad_flag_docs() -            d.addCallback(set_flag_docs) -            return d +        return self.collection.set_mbox_attr("closed", closed)      def getUIDValidity(self):          """ @@ -334,12 +221,7 @@ class SoledadMailbox(WithMsgFields):          :return: unique validity identifier          :rtype: int          """ -        if self._uidvalidity is None: -            mbox = self._get_mbox_doc() -            if mbox is None: -                return 0 -            self._uidvalidity = mbox.content.get(self.CREATED_KEY, 1) -        return self._uidvalidity +        return self.collection.get_mbox_attr("created")      def getUID(self, message):          """ @@ -354,9 +236,9 @@ class SoledadMailbox(WithMsgFields):          :rtype: int          """ -        msg = self.messages.get_msg_by_uid(message) -        if msg is not None: -            return msg.getUID() +        d = self.collection.get_msg_by_uid(message) +        d.addCallback(lambda m: m.getUID()) +        return d      def getUIDNext(self):          """ @@ -364,23 +246,20 @@ class SoledadMailbox(WithMsgFields):          mailbox. Currently it returns the higher UID incremented by          one. -        We increment the next uid *each* time this function gets called. -        In this way, there will be gaps if the message with the allocated -        uid cannot be saved. But that is preferable to having race conditions -        if we get to parallel message adding. - -        :rtype: int +        :return: deferred with int +        :rtype: Deferred          """ -        with self.next_uid_lock: -            return self.last_uid + 1 +        d = self.collection.get_uid_next() +        return d      def getMessageCount(self):          """          Returns the total count of messages in this mailbox. -        :rtype: int +        :return: deferred with int +        :rtype: Deferred          """ -        return self.messages.count() +        return self.collection.count()      def getUnseenCount(self):          """ @@ -389,7 +268,7 @@ class SoledadMailbox(WithMsgFields):          :return: count of messages flagged `unseen`          :rtype: int          """ -        return self.messages.count_unseen() +        return self.collection.count_unseen()      def getRecentCount(self):          """ @@ -398,7 +277,7 @@ class SoledadMailbox(WithMsgFields):          :return: count of messages flagged `recent`          :rtype: int          """ -        return self.messages.count_recent() +        return self.collection.count_recent()      def isWriteable(self):          """ @@ -407,6 +286,8 @@ class SoledadMailbox(WithMsgFields):          :return: 1 if mailbox is read-writeable, 0 otherwise.          :rtype: int          """ +        # XXX We don't need to store it in the mbox doc, do we? +        # return int(self.collection.get_mbox_attr('rw'))          return self.rw      def getHierarchicalDelimiter(self): @@ -431,14 +312,14 @@ class SoledadMailbox(WithMsgFields):          if self.CMD_RECENT in names:              r[self.CMD_RECENT] = self.getRecentCount()          if self.CMD_UIDNEXT in names: -            r[self.CMD_UIDNEXT] = self.last_uid + 1 +            r[self.CMD_UIDNEXT] = self.getUIDNext()          if self.CMD_UIDVALIDITY in names:              r[self.CMD_UIDVALIDITY] = self.getUIDValidity()          if self.CMD_UNSEEN in names:              r[self.CMD_UNSEEN] = self.getUnseenCount()          return defer.succeed(r) -    def addMessage(self, message, flags, date=None, notify_on_disk=False): +    def addMessage(self, message, flags, date=None):          """          Adds a message to this mailbox. @@ -464,10 +345,8 @@ class SoledadMailbox(WithMsgFields):          else:              flags = tuple(str(flag) for flag in flags) -        d = self._do_add_message(message, flags=flags, date=date, -                                 notify_on_disk=notify_on_disk) -        #if PROFILE_CMD: -            #do_profile_cmd(d, "APPEND") +        # if PROFILE_CMD: +        # do_profile_cmd(d, "APPEND")          # XXX should review now that we're not using qtreactor.          # A better place for this would be  the COPY/APPEND dispatcher @@ -478,19 +357,11 @@ class SoledadMailbox(WithMsgFields):              reactor.callLater(0, self.notify_new)              return x +        d = self.collection.add_message(flags=flags, date=date)          d.addCallback(notifyCallback)          d.addErrback(lambda f: log.msg(f.getTraceback()))          return d -    def _do_add_message(self, message, flags, date, notify_on_disk=False): -        """ -        Calls to the messageCollection add_msg method. -        Invoked from addMessage. -        """ -        d = self.messages.add_msg(message, flags=flags, date=date, -                                  notify_on_disk=notify_on_disk) -        return d -      def notify_new(self, *args):          """          Notify of new messages to all the listeners. @@ -502,26 +373,34 @@ class SoledadMailbox(WithMsgFields):          def cbNotifyNew(result):              exists, recent = result -            for l in self.listeners: -                l.newMessages(exists, recent) +            for listener in self.listeners: +                listener.newMessages(exists, recent) +          d = self._get_notify_count()          d.addCallback(cbNotifyNew)          d.addCallback(self.cb_signal_unread_to_ui) -    @deferred_to_thread      def _get_notify_count(self):          """          Get message count and recent count for this mailbox          Executed in a separate thread. Called from notify_new. -        :return: number of messages and number of recent messages. -        :rtype: tuple +        :return: a deferred that will fire with a tuple, with number of +                 messages and number of recent messages. +        :rtype: Deferred          """ -        exists = self.getMessageCount() -        recent = self.getRecentCount() -        logger.debug("NOTIFY (%r): there are %s messages, %s recent" % ( -            self.mbox, exists, recent)) -        return exists, recent +        d_exists = self.getMessageCount() +        d_recent = self.getRecentCount() +        d_list = [d_exists, d_recent] + +        def log_num_msg(result): +            exists, recent = result +            logger.debug("NOTIFY (%r): there are %s messages, %s recent" % ( +                         self.mbox_name, exists, recent)) + +        d = defer.gatherResults(d_list) +        d.addCallback(log_num_msg) +        return d      # commands, do not rename methods @@ -533,27 +412,18 @@ class SoledadMailbox(WithMsgFields):          on the mailbox.          """ -        # XXX this will overwrite all the existing flags! +        # XXX this will overwrite all the existing flags          # should better simply addFlag -        self.setFlags((self.NOSELECT_FLAG,)) - -        # XXX removing the mailbox in situ for now, -        # we should postpone the removal - -        def remove_mbox_doc(ignored): -            # XXX move to memory store?? +        self.setFlags((MessageFlags.NOSELECT_FLAG,)) -            def _remove_mbox_doc(doc): -                if doc is None: -                    # memory-only store! -                    return defer.succeed(True) -                return self._soledad.delete_doc(doc) - -            doc = self._get_mbox_doc() -            return _remove_mbox_doc(doc) +        def remove_mbox(_): +            # FIXME collection does not have a delete_mbox method, +            # it's in account. +            # XXX should take care of deleting the uid table too. +            return self.collection.delete_mbox(self.mbox_name)          d = self.deleteAllDocs() -        d.addCallback(remove_mbox_doc) +        d.addCallback(remove_mbox)          return d      def _close_cb(self, result): @@ -574,9 +444,11 @@ class SoledadMailbox(WithMsgFields):          if not self.isWriteable():              raise imap4.ReadOnlyMailbox          d = defer.Deferred() -        self._memstore.expunge(self.mbox, d) +        # FIXME actually broken. +        # Iterate through index, and do a expunge.          return d +    # FIXME -- get last_uid from mbox_indexer      def _bound_seq(self, messages_asked):          """          Put an upper bound to a messages sequence if this is open. @@ -596,6 +468,7 @@ class SoledadMailbox(WithMsgFields):                      pass          return messages_asked +    # TODO -- needed? --- we can get the sequence from the indexer.      def _filter_msg_seq(self, messages_asked):          """          Filter a message sequence returning only the ones that do exist in the @@ -627,29 +500,6 @@ class SoledadMailbox(WithMsgFields):          :rtype: deferred          """ -        d = defer.Deferred() - -        # XXX do not need no thread... -        reactor.callInThread(self._do_fetch, messages_asked, uid, d) -        d.addCallback(self.cb_signal_unread_to_ui) -        return d - -    # called in thread -    def _do_fetch(self, messages_asked, uid, d): -        """ -        :param messages_asked: IDs of the messages to retrieve information -                               about -        :type messages_asked: MessageSet - -        :param uid: If true, the IDs are UIDs. They are message sequence IDs -                    otherwise. -        :type uid: bool -        :param d: deferred whose callback will be called with result. -        :type d: Deferred - -        :rtype: A tuple of two-tuples of message sequence numbers and -                LeapMessage -        """          # For the moment our UID is sequential, so we          # can treat them all the same.          # Change this to the flag that twisted expects when we @@ -660,18 +510,23 @@ class SoledadMailbox(WithMsgFields):          messages_asked = self._bound_seq(messages_asked)          seq_messg = self._filter_msg_seq(messages_asked) -        getmsg = lambda uid: self.messages.get_msg_by_uid(uid) +        getmsg = self.collection.get_msg_by_uid          # for sequence numbers (uid = 0)          if sequence:              logger.debug("Getting msg by index: INEFFICIENT call!") +            # TODO --- implement sequences in mailbox indexer              raise NotImplementedError          else:              got_msg = ((msgid, getmsg(msgid)) for msgid in seq_messg)              result = ((msgid, msg) for msgid, msg in got_msg                        if msg is not None) -            self.reactor.callLater(0, self.unset_recent_flags, seq_messg) -            self.reactor.callFromThread(d.callback, result) +            reactor.callLater(0, self.unset_recent_flags, seq_messg) + +        # TODO -- call signal_to_ui +        # d.addCallback(self.cb_signal_unread_to_ui) + +        return result      def fetch_flags(self, messages_asked, uid):          """ @@ -698,12 +553,11 @@ class SoledadMailbox(WithMsgFields):          :rtype: tuple          """          d = defer.Deferred() -        self.reactor.callInThread(self._do_fetch_flags, messages_asked, uid, d) +        reactor.callLater(0, self._do_fetch_flags, messages_asked, uid, d)          if PROFILE_CMD:              do_profile_cmd(d, "FETCH-ALL-FLAGS")          return d -    # called in thread      def _do_fetch_flags(self, messages_asked, uid, d):          """          :param messages_asked: IDs of the messages to retrieve information @@ -733,10 +587,11 @@ class SoledadMailbox(WithMsgFields):          messages_asked = self._bound_seq(messages_asked)          seq_messg = self._filter_msg_seq(messages_asked) -        all_flags = self._memstore.all_flags(self.mbox) +        # FIXME use deferreds here +        all_flags = self.collection.get_all_flags(self.mbox_name)          result = ((msgid, flagsPart(              msgid, all_flags.get(msgid, tuple()))) for msgid in seq_messg) -        self.reactor.callFromThread(d.callback, result) +        d.callback(result)      def fetch_headers(self, messages_asked, uid):          """ @@ -843,8 +698,8 @@ class SoledadMailbox(WithMsgFields):              raise imap4.ReadOnlyMailbox          d = defer.Deferred() -        self.reactor.callLater(0, self._do_store, messages_asked, flags, -                               mode, uid, d) +        reactor.callLater(0, self._do_store, messages_asked, flags, +                          mode, uid, d)          if PROFILE_CMD:              do_profile_cmd(d, "STORE")          d.addCallback(self.cb_signal_unread_to_ui) @@ -853,7 +708,7 @@ class SoledadMailbox(WithMsgFields):      def _do_store(self, messages_asked, flags, mode, uid, observer):          """ -        Helper method, invoke set_flags method in the MessageCollection. +        Helper method, invoke set_flags method in the IMAPMessageCollection.          See the documentation for the `store` method for the parameters. @@ -869,7 +724,8 @@ class SoledadMailbox(WithMsgFields):          flags = tuple(flags)          messages_asked = self._bound_seq(messages_asked)          seq_messg = self._filter_msg_seq(messages_asked) -        self.messages.set_flags(self.mbox, seq_messg, flags, mode, observer) +        self.collection.set_flags( +            self.mbox_name, seq_messg, flags, mode, observer)      # ISearchableMailbox @@ -908,6 +764,7 @@ class SoledadMailbox(WithMsgFields):                  msgid = str(query[3]).strip()                  logger.debug("Searching for %s" % (msgid,))                  d = self.messages._get_uid_from_msgid(str(msgid)) +                # XXX remove gatherResults                  d1 = defer.gatherResults([d])                  # we want a list, so return it all the same                  return d1 @@ -928,94 +785,18 @@ class SoledadMailbox(WithMsgFields):                   uid when the copy succeed.          :rtype: Deferred          """ -        d = defer.Deferred()          if PROFILE_CMD:              do_profile_cmd(d, "COPY")          # A better place for this would be  the COPY/APPEND dispatcher          # in server.py, but qtreactor hangs when I do that, so this seems          # to work fine for now. -        d.addCallback(lambda r: self.reactor.callLater(0, self.notify_new)) -        deferLater(self.reactor, 0, self._do_copy, message, d) -        return d +        #d.addCallback(lambda r: self.reactor.callLater(0, self.notify_new)) +        #deferLater(self.reactor, 0, self._do_copy, message, d) +        #return d -    def _do_copy(self, message, observer): -        """ -        Call invoked from the deferLater in `copy`. This will -        copy the flags and header documents, and pass them to the -        `create_message` method in the MemoryStore, together with -        the observer deferred that we've been passed along. - -        :param message: an IMessage implementor -        :type message: LeapMessage -        :param observer: the deferred that will fire with the -                         UID of the message -        :type observer: Deferred -        """ -        memstore = self._memstore - -        def createCopy(result): -            exist, new_fdoc = result -            if exist: -                # Should we signal error on the callback? -                logger.warning("Destination message already exists!") - -                # XXX I'm not sure if we should raise the -                # errback. This actually rases an ugly warning -                # in some muas like thunderbird. -                # UID 0 seems a good convention for no uid. -                observer.callback(0) -            else: -                mbox = self.mbox -                uid_next = memstore.increment_last_soledad_uid(mbox) - -                new_fdoc[self.UID_KEY] = uid_next -                new_fdoc[self.MBOX_KEY] = mbox - -                flags = list(new_fdoc[self.FLAGS_KEY]) -                flags.append(fields.RECENT_FLAG) -                new_fdoc[self.FLAGS_KEY] = tuple(set(flags)) - -                # FIXME set recent! - -                self._memstore.create_message( -                    self.mbox, uid_next, -                    MessageWrapper(new_fdoc), -                    observer=observer, -                    notify_on_disk=False) - -        d = self._get_msg_copy(message) -        d.addCallback(createCopy) -        d.addErrback(lambda f: log.msg(f.getTraceback())) - -    #@deferred_to_thread -    def _get_msg_copy(self, message): -        """ -        Get a copy of the fdoc for this message, and check whether -        it already exists. - -        :param message: an IMessage implementor -        :type message: LeapMessage -        :return: exist, new_fdoc -        :rtype: tuple -        """ -        # XXX  for clarity, this could be delegated to a -        # MessageCollection mixin that implements copy too, and -        # moved out of here. -        msg = message -        memstore = self._memstore - -        if empty(msg.fdoc): -            logger.warning("Tried to copy a MSG with no fdoc") -            return -        new_fdoc = copy.deepcopy(msg.fdoc.content) -        fdoc_chash = new_fdoc[fields.CONTENT_HASH_KEY] - -        dest_fdoc = memstore.get_fdoc_from_chash( -            fdoc_chash, self.mbox) - -        exist = not empty(dest_fdoc) -        return exist, new_fdoc +        # FIXME not implemented !!! --- +        return self.collection.copy_msg(message, self.mbox_name)      # convenience fun @@ -1023,29 +804,25 @@ class SoledadMailbox(WithMsgFields):          """          Delete all docs in this mailbox          """ -        def del_all_docs(docs): -            todelete = [] -            for doc in docs: -                d = self.messages._soledad.delete_doc(doc) -                todelete.append(d) -            return defer.gatherResults(todelete) - -        d = self.messages.get_all_docs() -        d.addCallback(del_all_docs) -        return d +        # FIXME not implemented +        return self.collection.delete_all_docs()      def unset_recent_flags(self, uid_seq):          """          Unset Recent flag for a sequence of UIDs.          """ -        self.messages.unset_recent_flags(uid_seq) +        # FIXME not implemented +        return self.collection.unset_recent_flags(uid_seq)      def __repr__(self):          """          Representation string for this mailbox.          """ -        return u"<SoledadMailbox: mbox '%s' (%s)>" % ( -            self.mbox, self.messages.count()) +        return u"<IMAPMailbox: mbox '%s' (%s)>" % ( +            self.mbox_name, self.messages.count()) + + +_INBOX_RE = re.compile(INBOX_NAME, re.IGNORECASE)  def normalize_mailbox(name): @@ -1060,7 +837,8 @@ def normalize_mailbox(name):      :rtype: unicode      """ -    _INBOX_RE = re.compile(INBOX_NAME, re.IGNORECASE) +    # XXX maybe it would make sense to normalize common folders too: +    # trash, sent, drafts, etc...      if _INBOX_RE.match(name):          # ensure inital INBOX is uppercase          return INBOX_NAME + name[len(INBOX_NAME):] diff --git a/mail/src/leap/mail/imap/memorystore.py b/mail/src/leap/mail/imap/memorystore.py deleted file mode 100644 index eda5b96..0000000 --- a/mail/src/leap/mail/imap/memorystore.py +++ /dev/null @@ -1,1340 +0,0 @@ - -# memorystore.py -# Copyright (C) 2014 LEAP -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program.  If not, see <http://www.gnu.org/licenses/>. -""" -In-memory transient store for a LEAPIMAPServer. -""" -import contextlib -import logging -import threading -import weakref - -from collections import defaultdict -from copy import copy - -from enum import Enum -from twisted.internet import defer -from twisted.internet import reactor -from twisted.internet.task import LoopingCall -from twisted.python import log -from zope.interface import implements - -from leap.common.check import leap_assert_type -from leap.mail import size -from leap.mail.utils import empty, phash_iter -from leap.mail.messageflow import MessageProducer -from leap.mail.imap import interfaces -from leap.mail.imap.fields import fields -from leap.mail.imap.messageparts import MessagePartType, MessagePartDoc -from leap.mail.imap.messageparts import RecentFlagsDoc -from leap.mail.imap.messageparts import MessageWrapper -from leap.mail.imap.messageparts import ReferenciableDict - -from leap.mail.decorators import deferred_to_thread - -logger = logging.getLogger(__name__) - - -# The default period to do writebacks to the permanent -# soledad storage, in seconds. -SOLEDAD_WRITE_PERIOD = 15 - -FDOC = MessagePartType.fdoc.name -HDOC = MessagePartType.hdoc.name -CDOCS = MessagePartType.cdocs.name -DOCS_ID = MessagePartType.docs_id.name - - -@contextlib.contextmanager -def set_bool_flag(obj, att): -    """ -    Set a boolean flag to True while we're doing our thing. -    Just to let the world know. -    """ -    setattr(obj, att, True) -    try: -        yield True -    except RuntimeError as exc: -        logger.exception(exc) -    finally: -        setattr(obj, att, False) - - -DirtyState = Enum("DirtyState", "none dirty new") - - -class MemoryStore(object): -    """ -    An in-memory store to where we can write the different parts that -    we split the messages into and buffer them until we write them to the -    permanent storage. - -    It uses MessageWrapper instances to represent the message-parts, which are -    indexed by mailbox name and UID. - -    It also can be passed a permanent storage as a paremeter (any implementor -    of IMessageStore, in this case a SoledadStore). In this case, a periodic -    dump of the messages stored in memory will be done. The period of the -    writes to the permanent storage is controled by the write_period parameter -    in the constructor. -    """ -    implements(interfaces.IMessageStore, -               interfaces.IMessageStoreWriter) - -    # TODO We will want to index by chash when we transition to local-only -    # UIDs. - -    WRITING_FLAG = "_writing" -    _last_uid_lock = threading.Lock() -    _fdoc_docid_lock = threading.Lock() - -    def __init__(self, permanent_store=None, -                 write_period=SOLEDAD_WRITE_PERIOD): -        """ -        Initialize a MemoryStore. - -        :param permanent_store: a IMessageStore implementor to dump -                                messages to. -        :type permanent_store: IMessageStore -        :param write_period: the interval to dump messages to disk, in seconds. -        :type write_period: int -        """ -        self._permanent_store = permanent_store -        self._write_period = write_period - -        if permanent_store is None: -            self._mbox_closed = defaultdict(lambda: False) - -        # Internal Storage: messages -        """ -        flags document store. -        _fdoc_store[mbox][uid] = { 'content': 'aaa' } -        """ -        self._fdoc_store = defaultdict(lambda: defaultdict( -            lambda: ReferenciableDict({}))) - -        # Sizes -        """ -        {'mbox, uid': <int>} -        """ -        self._sizes = {} - -        # Internal Storage: payload-hash -        """ -        fdocs:doc-id store, stores document IDs for putting -        the dirty flags-docs. -        """ -        self._fdoc_id_store = defaultdict(lambda: defaultdict( -            lambda: '')) - -        # Internal Storage: content-hash:hdoc -        """ -        hdoc-store keeps references to -        the header-documents indexed by content-hash. - -        {'chash': { dict-stuff } -        } -        """ -        self._hdoc_store = defaultdict(lambda: ReferenciableDict({})) - -        # Internal Storage: payload-hash:cdoc -        """ -        content-docs stored by payload-hash -        {'phash': { dict-stuff } } -        """ -        self._cdoc_store = defaultdict(lambda: ReferenciableDict({})) - -        # Internal Storage: content-hash:fdoc -        """ -        chash-fdoc-store keeps references to -        the flag-documents indexed by content-hash. - -        {'chash': {'mbox-a': weakref.proxy(dict), -                   'mbox-b': weakref.proxy(dict)} -        } -        """ -        self._chash_fdoc_store = defaultdict(lambda: defaultdict(lambda: None)) - -        # Internal Storage: recent-flags store -        """ -        recent-flags store keeps one dict per mailbox, -        with the document-id of the u1db document -        and the set of the UIDs that have the recent flag. - -        {'mbox-a': {'doc_id': 'deadbeef', -                    'set': {1,2,3,4} -                    } -        } -        """ -        # TODO this will have to transition to content-hash -        # indexes after we move to local-only UIDs. - -        self._rflags_store = defaultdict( -            lambda: {'doc_id': None, 'set': set([])}) - -        """ -        last-uid store keeps the count of the highest UID -        per mailbox. - -        {'mbox-a': 42, -         'mbox-b': 23} -        """ -        self._last_uid = defaultdict(lambda: 0) - -        """ -        known-uids keeps a count of the uids that soledad knows for a given -        mailbox - -        {'mbox-a': set([1,2,3])} -        """ -        self._known_uids = defaultdict(set) - -        """ -        mbox-flags is a dict containing flags for each mailbox. this is -        modified from mailbox.getFlags / mailbox.setFlags -        """ -        self._mbox_flags = defaultdict(set) - -        # New and dirty flags, to set MessageWrapper State. -        self._new = set([]) -        self._new_queue = set([]) -        self._new_deferreds = {} - -        self._dirty = set([]) -        self._dirty_queue = set([]) -        self._dirty_deferreds = {} - -        self._rflags_dirty = set([]) - -        # Flag for signaling we're busy writing to the disk storage. -        setattr(self, self.WRITING_FLAG, False) - -        if self._permanent_store is not None: -            # this producer spits its messages to the permanent store -            # consumer using a queue. We will use that to put -            # our messages to be written. -            self.producer = MessageProducer(permanent_store, -                                            period=0.1) -            # looping call for dumping to SoledadStore -            self._write_loop = LoopingCall(self.write_messages, -                                           permanent_store) - -            # We can start the write loop right now, why wait? -            self._start_write_loop() -        else: -            # We have a memory-only store. -            self.producer = None -            self._write_loop = None - -    # TODO -- remove -    def _start_write_loop(self): -        """ -        Start loop for writing to disk database. -        """ -        if self._write_loop is None: -            return -        if not self._write_loop.running: -            self._write_loop.start(self._write_period, now=True) - -    # TODO -- remove -    def _stop_write_loop(self): -        """ -        Stop loop for writing to disk database. -        """ -        if self._write_loop is None: -            return -        if self._write_loop.running: -            self._write_loop.stop() - -    # IMessageStore - -    # XXX this would work well for whole message operations. -    # We would have to add a put_flags operation to modify only -    # the flags doc (and set the dirty flag accordingly) - -    def create_message(self, mbox, uid, message, observer, -                       notify_on_disk=True): -        """ -        Create the passed message into this MemoryStore. - -        By default we consider that any message is a new message. - -        :param mbox: the mailbox -        :type mbox: str or unicode -        :param uid: the UID for the message -        :type uid: int -        :param message: a message to be added -        :type message: MessageWrapper -        :param observer: -            the deferred that will fire with the UID of the message. If -            notify_on_disk is True, this will happen when the message is -            written to Soledad. Otherwise it will fire as soon as we've added -            the message to the memory store. -        :type observer: Deferred -        :param notify_on_disk: -            whether the `observer` deferred should wait until the message is -            written to disk to be fired. -        :type notify_on_disk: bool -        """ -        # TODO -- return a deferred -        log.msg("Adding new doc to memstore %r (%r)" % (mbox, uid)) -        key = mbox, uid - -        self._add_message(mbox, uid, message, notify_on_disk) -        self._new.add(key) - -        if observer is not None: -            if notify_on_disk: -                # We store this deferred so we can keep track of the pending -                # operations internally. -                # TODO this should fire with the UID !!! -- change that in -                # the soledad store code. -                self._new_deferreds[key] = observer - -            else: -                # Caller does not care, just fired and forgot, so we pass -                # a defer that will inmediately have its callback triggered. -                reactor.callFromThread(observer.callback, uid) - -    def put_message(self, mbox, uid, message, notify_on_disk=True): -        """ -        Put an existing message. - -        This will also set the dirty flag on the MemoryStore. - -        :param mbox: the mailbox -        :type mbox: str or unicode -        :param uid: the UID for the message -        :type uid: int -        :param message: a message to be added -        :type message: MessageWrapper -        :param notify_on_disk: whether the deferred that is returned should -                               wait until the message is written to disk to -                               be fired. -        :type notify_on_disk: bool - -        :return: a Deferred. if notify_on_disk is True, will be fired -                 when written to the db on disk. -                 Otherwise will fire inmediately -        :rtype: Deferred -        """ -        key = mbox, uid -        d = defer.Deferred() -        d.addCallback(lambda result: log.msg("message PUT save: %s" % result)) - -        self._dirty.add(key) -        self._dirty_deferreds[key] = d -        self._add_message(mbox, uid, message, notify_on_disk) -        return d - -    def _add_message(self, mbox, uid, message, notify_on_disk=True): -        """ -        Helper method, called by both create_message and put_message. -        See those for parameter documentation. -        """ -        msg_dict = message.as_dict() - -        fdoc = msg_dict.get(FDOC, None) -        if fdoc is not None: -            fdoc_store = self._fdoc_store[mbox][uid] -            fdoc_store.update(fdoc) -            chash_fdoc_store = self._chash_fdoc_store - -            # content-hash indexing -            chash = fdoc.get(fields.CONTENT_HASH_KEY) -            chash_fdoc_store[chash][mbox] = weakref.proxy( -                self._fdoc_store[mbox][uid]) - -        hdoc = msg_dict.get(HDOC, None) -        if hdoc is not None: -            chash = hdoc.get(fields.CONTENT_HASH_KEY) -            hdoc_store = self._hdoc_store[chash] -            hdoc_store.update(hdoc) - -        cdocs = message.cdocs -        for cdoc in cdocs.values(): -            phash = cdoc.get(fields.PAYLOAD_HASH_KEY, None) -            if not phash: -                continue -            cdoc_store = self._cdoc_store[phash] -            cdoc_store.update(cdoc) - -        # Update memory store size -        # XXX this should use [mbox][uid] -        # TODO --- this has to be deferred to thread, -        # TODO add hdoc and cdocs sizes too -        # it's slowing things down here. -        # key = mbox, uid -        # self._sizes[key] = size.get_size(self._fdoc_store[key]) - -    def purge_fdoc_store(self, mbox): -        """ -        Purge the empty documents from a fdoc store. -        Called during initialization of the SoledadMailbox - -        :param mbox: the mailbox -        :type mbox: str or unicode -        """ -        # XXX This is really a workaround until I find the conditions -        # that are making the empty items remain there. -        # This happens, for instance, after running several times -        # the regression test, that issues a store deleted + expunge + select -        # The items are being correclty deleted, but in succesive appends -        # the empty items with previously deleted uids reappear as empty -        # documents. I suspect it's a timing condition with a previously -        # evaluated sequence being used after the items has been removed. - -        for uid, value in self._fdoc_store[mbox].items(): -            if empty(value): -                del self._fdoc_store[mbox][uid] - -    def get_docid_for_fdoc(self, mbox, uid): -        """ -        Return Soledad document id for the flags-doc for a given mbox and uid, -        or None of no flags document could be found. - -        :param mbox: the mailbox -        :type mbox: str or unicode -        :param uid: the message UID -        :type uid: int -        :rtype: unicode or None -        """ -        with self._fdoc_docid_lock: -            doc_id = self._fdoc_id_store[mbox][uid] - -        if empty(doc_id): -            fdoc = self._permanent_store.get_flags_doc(mbox, uid) -            if empty(fdoc) or empty(fdoc.content): -                return None -            doc_id = fdoc.doc_id -            self._fdoc_id_store[mbox][uid] = doc_id - -        return doc_id - -    def get_message(self, mbox, uid, dirtystate=DirtyState.none, -                    flags_only=False): -        """ -        Get a MessageWrapper for the given mbox and uid combination. - -        :param mbox: the mailbox -        :type mbox: str or unicode -        :param uid: the message UID -        :type uid: int -        :param dirtystate: DirtyState enum: one of `dirty`, `new` -                           or `none` (default) -        :type dirtystate: enum -        :param flags_only: whether the message should carry only a reference -                           to the flags document. -        :type flags_only: bool -        : - -        :return: MessageWrapper or None -        """ -        # TODO -- return deferred -        if dirtystate == DirtyState.dirty: -            flags_only = True - -        key = mbox, uid - -        fdoc = self._fdoc_store[mbox][uid] -        if empty(fdoc): -            return None - -        new, dirty = False, False -        if dirtystate == DirtyState.none: -            new, dirty = self._get_new_dirty_state(key) -        if dirtystate == DirtyState.dirty: -            new, dirty = False, True -        if dirtystate == DirtyState.new: -            new, dirty = True, False - -        if flags_only: -            return MessageWrapper(fdoc=fdoc, -                                  new=new, dirty=dirty, -                                  memstore=weakref.proxy(self)) -        else: -            chash = fdoc.get(fields.CONTENT_HASH_KEY) -            hdoc = self._hdoc_store[chash] -            if empty(hdoc): -                # XXX this will be a deferred -                hdoc = self._permanent_store.get_headers_doc(chash) -                if empty(hdoc): -                    return None -                if not empty(hdoc.content): -                    self._hdoc_store[chash] = hdoc.content -                    hdoc = hdoc.content -            cdocs = None - -            pmap = hdoc.get(fields.PARTS_MAP_KEY, None) -            if new and pmap is not None: -                # take the different cdocs for write... -                cdoc_store = self._cdoc_store -                cdocs_list = phash_iter(hdoc) -                cdocs = dict(enumerate( -                    [cdoc_store[phash] for phash in cdocs_list], 1)) - -            return MessageWrapper(fdoc=fdoc, hdoc=hdoc, cdocs=cdocs, -                                  new=new, dirty=dirty, -                                  memstore=weakref.proxy(self)) - -    def remove_message(self, mbox, uid): -        """ -        Remove a Message from this MemoryStore. - -        :param mbox: the mailbox -        :type mbox: str or unicode -        :param uid: the message UID -        :type uid: int -        """ -        # XXX For the moment we are only removing the flags and headers -        # docs. The rest we leave there polluting your hard disk, -        # until we think about a good way of deorphaning. - -        # XXX implement elijah's idea of using a PUT document as a -        # token to ensure consistency in the removal. - -        try: -            del self._fdoc_store[mbox][uid] -        except KeyError: -            pass - -        try: -            key = mbox, uid -            self._new.discard(key) -            self._dirty.discard(key) -            if key in self._sizes: -                del self._sizes[key] -            self._known_uids[mbox].discard(uid) -        except KeyError: -            pass -        except Exception as exc: -            logger.error("error while removing message!") -            logger.exception(exc) -        try: -            with self._fdoc_docid_lock: -                del self._fdoc_id_store[mbox][uid] -        except KeyError: -            pass -        except Exception as exc: -            logger.error("error while removing message!") -            logger.exception(exc) - -    # IMessageStoreWriter - -    # TODO -- I think we don't need this anymore. -    # instead, we can have -    def write_messages(self, store): -        """ -        Write the message documents in this MemoryStore to a different store. - -        :param store: the IMessageStore to write to -        :rtype: False if queue is not empty, None otherwise. -        """ -        # For now, we pass if the queue is not empty, to avoid duplicate -        # queuing. -        # We would better use a flag to know when we've already enqueued an -        # item. - -        # XXX this could return the deferred for all the enqueued operations - -        if not self.producer.is_queue_empty(): -            return False - -        if any(map(lambda i: not empty(i), (self._new, self._dirty))): -            logger.info("Writing messages to Soledad...") - -        # TODO change for lock, and make the property access -        # is accquired -        with set_bool_flag(self, self.WRITING_FLAG): -            for rflags_doc_wrapper in self.all_rdocs_iter(): -                self.producer.push(rflags_doc_wrapper, -                                   state=self.producer.STATE_DIRTY) -            for msg_wrapper in self.all_new_msg_iter(): -                self.producer.push(msg_wrapper, -                                   state=self.producer.STATE_NEW) -            for msg_wrapper in self.all_dirty_msg_iter(): -                self.producer.push(msg_wrapper, -                                   state=self.producer.STATE_DIRTY) - -    # MemoryStore specific methods. - -    def get_uids(self, mbox): -        """ -        Get all uids for a given mbox. - -        :param mbox: the mailbox -        :type mbox: str or unicode -        :rtype: list -        """ -        return self._fdoc_store[mbox].keys() - -    def get_soledad_known_uids(self, mbox): -        """ -        Get all uids that soledad knows about, from the memory cache. -        :param mbox: the mailbox -        :type mbox: str or unicode -        :rtype: list -        """ -        return self._known_uids.get(mbox, []) - -    # last_uid - -    def get_last_uid(self, mbox): -        """ -        Return the highest UID for a given mbox. -        It will be the highest between the highest uid in the message store for -        the mailbox, and the soledad integer cache. - -        :param mbox: the mailbox -        :type mbox: str or unicode -        :rtype: int -        """ -        uids = self.get_uids(mbox) -        last_mem_uid = uids and max(uids) or 0 -        last_soledad_uid = self.get_last_soledad_uid(mbox) -        return max(last_mem_uid, last_soledad_uid) - -    def get_last_soledad_uid(self, mbox): -        """ -        Get last uid for a given mbox from the soledad integer cache. - -        :param mbox: the mailbox -        :type mbox: str or unicode -        """ -        return self._last_uid.get(mbox, 0) - -    def set_last_soledad_uid(self, mbox, value): -        """ -        Set last uid for a given mbox in the soledad integer cache. -        SoledadMailbox should prime this value during initialization. -        Other methods (during message adding) SHOULD call -        `increment_last_soledad_uid` instead. - -        :param mbox: the mailbox -        :type mbox: str or unicode -        :param value: the value to set -        :type value: int -        """ -        # can be long??? -        # leap_assert_type(value, int) -        logger.info("setting last soledad uid for %s to %s" % -                    (mbox, value)) -        # if we already have a value here, don't do anything -        with self._last_uid_lock: -            if not self._last_uid.get(mbox, None): -                self._last_uid[mbox] = value - -    def set_known_uids(self, mbox, value): -        """ -        Set the value fo the known-uids set for this mbox. - -        :param mbox: the mailbox -        :type mbox: str or unicode -        :param value: a sequence of integers to be added to the set. -        :type value: tuple -        """ -        current = self._known_uids[mbox] -        self._known_uids[mbox] = current.union(set(value)) - -    def increment_last_soledad_uid(self, mbox): -        """ -        Increment by one the soledad integer cache for the last_uid for -        this mbox, and fire a defer-to-thread to update the soledad value. -        The caller should lock the call tho this method. - -        :param mbox: the mailbox -        :type mbox: str or unicode -        """ -        with self._last_uid_lock: -            self._last_uid[mbox] += 1 -            value = self._last_uid[mbox] -            reactor.callInThread(self.write_last_uid, mbox, value) -            return value - -    def write_last_uid(self, mbox, value): -        """ -        Increment the soledad integer cache for the highest uid value. - -        :param mbox: the mailbox -        :type mbox: str or unicode -        :param value: the value to set -        :type value: int -        """ -        leap_assert_type(value, int) -        if self._permanent_store: -            self._permanent_store.write_last_uid(mbox, value) - -    def load_flag_docs(self, mbox, flag_docs): -        """ -        Load the flag documents for the given mbox. -        Used during initial flag docs prefetch. - -        :param mbox: the mailbox -        :type mbox: str or unicode -        :param flag_docs: a dict with the content for the flag docs, indexed -                          by uid. -        :type flag_docs: dict -        """ -        # We can do direct assignments cause we know this will only -        # be called during initialization of the mailbox. -        # TODO could hook here a sanity-check -        # for duplicates - -        fdoc_store = self._fdoc_store[mbox] -        chash_fdoc_store = self._chash_fdoc_store -        for uid in flag_docs: -            rdict = ReferenciableDict(flag_docs[uid]) -            fdoc_store[uid] = rdict -            # populate chash dict too, to avoid fdoc duplication -            chash = flag_docs[uid]["chash"] -            chash_fdoc_store[chash][mbox] = weakref.proxy( -                self._fdoc_store[mbox][uid]) - -    def update_flags(self, mbox, uid, fdoc): -        """ -        Update the flag document for a given mbox and uid combination, -        and set the dirty flag. -        We could use put_message, but this is faster. - -        :param mbox: the mailbox -        :type mbox: str or unicode -        :param uid: the uid of the message -        :type uid: int - -        :param fdoc: a dict with the content for the flag docs -        :type fdoc: dict -        """ -        key = mbox, uid -        self._fdoc_store[mbox][uid].update(fdoc) -        self._dirty.add(key) - -    def load_header_docs(self, header_docs): -        """ -        Load the flag documents for the given mbox. -        Used during header docs prefetch, and during cache after -        a read from soledad if the hdoc property in message did not -        find its value in here. - -        :param flag_docs: a dict with the content for the flag docs. -        :type flag_docs: dict -        """ -        hdoc_store = self._hdoc_store -        for chash in header_docs: -            hdoc_store[chash] = ReferenciableDict(header_docs[chash]) - -    def all_flags(self, mbox): -        """ -        Return a dictionary with all the flags for a given mbox. - -        :param mbox: the mailbox -        :type mbox: str or unicode -        :rtype: dict -        """ -        fdict = {} -        uids = self.get_uids(mbox) -        fstore = self._fdoc_store[mbox] - -        for uid in uids: -            try: -                fdict[uid] = fstore[uid][fields.FLAGS_KEY] -            except KeyError: -                continue -        return fdict - -    def all_headers(self, mbox): -        """ -        Return a dictionary with all the header docs for a given mbox. - -        :param mbox: the mailbox -        :type mbox: str or unicode -        :rtype: dict -        """ -        headers_dict = {} -        uids = self.get_uids(mbox) -        fdoc_store = self._fdoc_store[mbox] -        hdoc_store = self._hdoc_store - -        for uid in uids: -            try: -                chash = fdoc_store[uid][fields.CONTENT_HASH_KEY] -                hdoc = hdoc_store[chash] -                if not empty(hdoc): -                    headers_dict[uid] = hdoc -            except KeyError: -                continue -        return headers_dict - -    # Counting sheeps... - -    def count_new_mbox(self, mbox): -        """ -        Count the new messages by mailbox. - -        :param mbox: the mailbox -        :type mbox: str or unicode -        :return: number of new messages -        :rtype: int -        """ -        return len([(m, uid) for m, uid in self._new if mbox == mbox]) - -    # XXX used at all? -    def count_new(self): -        """ -        Count all the new messages in the MemoryStore. - -        :rtype: int -        """ -        return len(self._new) - -    def count(self, mbox): -        """ -        Return the count of messages for a given mbox. - -        :param mbox: the mailbox -        :type mbox: str or unicode -        :return: number of messages -        :rtype: int -        """ -        return len(self._fdoc_store[mbox]) - -    def unseen_iter(self, mbox): -        """ -        Get an iterator for the message UIDs with no `seen` flag -        for a given mailbox. - -        :param mbox: the mailbox -        :type mbox: str or unicode -        :return: iterator through unseen message doc UIDs -        :rtype: iterable -        """ -        fdocs = self._fdoc_store[mbox] - -        return [uid for uid, value -                in fdocs.items() -                if fields.SEEN_FLAG not in value.get(fields.FLAGS_KEY, [])] - -    def get_cdoc_from_phash(self, phash): -        """ -        Return a content-document by its payload-hash. - -        :param phash: the payload hash to check against -        :type phash: str or unicode -        :rtype: MessagePartDoc -        """ -        doc = self._cdoc_store.get(phash, None) - -        # XXX return None for consistency? - -        # XXX have to keep a mapping between phash and its linkage -        # info, to know if this payload is been already saved or not. -        # We will be able to get this from the linkage-docs, -        # not yet implemented. -        new = True -        dirty = False -        return MessagePartDoc( -            new=new, dirty=dirty, store="mem", -            part=MessagePartType.cdoc, -            content=doc, -            doc_id=None) - -    def get_fdoc_from_chash(self, chash, mbox): -        """ -        Return a flags-document by its content-hash and a given mailbox. -        Used during content-duplication detection while copying or adding a -        message. - -        :param chash: the content hash to check against -        :type chash: str or unicode -        :param mbox: the mailbox -        :type mbox: str or unicode - -        :return: MessagePartDoc. It will return None if the flags document -                 has empty content or it is flagged as \\Deleted. -        """ -        fdoc = self._chash_fdoc_store[chash][mbox] - -        # a couple of special cases. -        # 1. We might have a doc with empty content... -        if empty(fdoc): -            return None - -        # 2. ...Or the message could exist, but being flagged for deletion. -        # We want to create a new one in this case. -        # Hmmm what if the deletion is un-done?? We would end with a -        # duplicate... -        if fdoc and fields.DELETED_FLAG in fdoc.get(fields.FLAGS_KEY, []): -            return None - -        uid = fdoc[fields.UID_KEY] -        key = mbox, uid -        new = key in self._new -        dirty = key in self._dirty - -        return MessagePartDoc( -            new=new, dirty=dirty, store="mem", -            part=MessagePartType.fdoc, -            content=fdoc, -            doc_id=None) - -    def iter_fdoc_keys(self): -        """ -        Return a generator through all the mbox, uid keys in the flags-doc -        store. -        """ -        fdoc_store = self._fdoc_store -        for mbox in fdoc_store: -            for uid in fdoc_store[mbox]: -                yield mbox, uid - -    def all_new_msg_iter(self): -        """ -        Return generator that iterates through all new messages. - -        :return: generator of MessageWrappers -        :rtype: generator -        """ -        gm = self.get_message -        # need to freeze, set can change during iteration -        new = [gm(*key, dirtystate=DirtyState.new) for key in tuple(self._new)] -        # move content from new set to the queue -        self._new_queue.update(self._new) -        self._new.difference_update(self._new) -        return new - -    def all_dirty_msg_iter(self): -        """ -        Return generator that iterates through all dirty messages. - -        :return: generator of MessageWrappers -        :rtype: generator -        """ -        gm = self.get_message -        # need to freeze, set can change during iteration -        dirty = [gm(*key, flags_only=True, dirtystate=DirtyState.dirty) -                 for key in tuple(self._dirty)] -        # move content from new and dirty sets to the queue - -        self._dirty_queue.update(self._dirty) -        self._dirty.difference_update(self._dirty) -        return dirty - -    def all_deleted_uid_iter(self, mbox): -        """ -        Return a list with the UIDs for all messags -        with deleted flag in a given mailbox. - -        :param mbox: the mailbox -        :type mbox: str or unicode -        :return: list of integers -        :rtype: list -        """ -        # This *needs* to return a fixed sequence. Otherwise the dictionary len -        # will change during iteration, when we modify it -        fdocs = self._fdoc_store[mbox] -        return [uid for uid, value -                in fdocs.items() -                if fields.DELETED_FLAG in value.get(fields.FLAGS_KEY, [])] - -    # new, dirty flags - -    def _get_new_dirty_state(self, key): -        """ -        Return `new` and `dirty` flags for a given message. - -        :param key: the key for the message, in the form mbox, uid -        :type key: tuple -        :return: tuple of bools -        :rtype: tuple -        """ -        # TODO change indexing of sets to [mbox][key] too. -        # XXX should return *first* the news, and *then* the dirty... - -        # TODO should query in queues too , true? -        # -        return map(lambda _set: key in _set, (self._new, self._dirty)) - -    def set_new_queued(self, key): -        """ -        Add the key value to the `new-queue` set. - -        :param key: the key for the message, in the form mbox, uid -        :type key: tuple -        """ -        self._new_queue.add(key) - -    def unset_new_queued(self, key): -        """ -        Remove the key value from the `new-queue` set. - -        :param key: the key for the message, in the form mbox, uid -        :type key: tuple -        """ -        self._new_queue.discard(key) -        deferreds = self._new_deferreds -        d = deferreds.get(key, None) -        if d: -            # XXX use a namedtuple for passing the result -            # when we check it in the other side. -            d.callback('%s, ok' % str(key)) -            deferreds.pop(key) - -    def set_dirty_queued(self, key): -        """ -        Add the key value to the `dirty-queue` set. - -        :param key: the key for the message, in the form mbox, uid -        :type key: tuple -        """ -        self._dirty_queue.add(key) - -    def unset_dirty_queued(self, key): -        """ -        Remove the key value from the `dirty-queue` set. - -        :param key: the key for the message, in the form mbox, uid -        :type key: tuple -        """ -        self._dirty_queue.discard(key) -        deferreds = self._dirty_deferreds -        d = deferreds.get(key, None) -        if d: -            # XXX use a namedtuple for passing the result -            # when we check it in the other side. -            d.callback('%s, ok' % str(key)) -            deferreds.pop(key) - -    # Recent Flags - -    def set_recent_flag(self, mbox, uid): -        """ -        Set the `Recent` flag for a given mailbox and UID. - -        :param mbox: the mailbox -        :type mbox: str or unicode -        :param uid: the message UID -        :type uid: int -        """ -        self._rflags_dirty.add(mbox) -        self._rflags_store[mbox]['set'].add(uid) - -    # TODO --- nice but unused -    def unset_recent_flag(self, mbox, uid): -        """ -        Unset the `Recent` flag for a given mailbox and UID. - -        :param mbox: the mailbox -        :type mbox: str or unicode -        :param uid: the message UID -        :type uid: int -        """ -        self._rflags_store[mbox]['set'].discard(uid) - -    def set_recent_flags(self, mbox, value): -        """ -        Set the value for the set of the recent flags. -        Used from the property in the MessageCollection. - -        :param mbox: the mailbox -        :type mbox: str or unicode -        :param value: a sequence of flags to set -        :type value: sequence -        """ -        self._rflags_dirty.add(mbox) -        self._rflags_store[mbox]['set'] = set(value) - -    def load_recent_flags(self, mbox, flags_doc): -        """ -        Load the passed flags document in the recent flags store, for a given -        mailbox. - -        :param mbox: the mailbox -        :type mbox: str or unicode -        :param flags_doc: A dictionary containing the `doc_id` of the Soledad -                          flags-document for this mailbox, and the `set` -                          of uids marked with that flag. -        """ -        self._rflags_store[mbox] = flags_doc - -    def get_recent_flags(self, mbox): -        """ -        Return the set of UIDs with the `Recent` flag for this mailbox. - -        :param mbox: the mailbox -        :type mbox: str or unicode -        :rtype: set, or None -        """ -        rflag_for_mbox = self._rflags_store.get(mbox, None) -        if not rflag_for_mbox: -            return None -        return self._rflags_store[mbox]['set'] - -    # XXX -- remove -    def all_rdocs_iter(self): -        """ -        Return an iterator through all in-memory recent flag dicts, wrapped -        under a RecentFlagsDoc namedtuple. -        Used for saving to disk. - -        :return: a generator of RecentFlagDoc -        :rtype: generator -        """ -        # XXX use enums -        DOC_ID = "doc_id" -        SET = "set" - -        rflags_store = self._rflags_store - -        def get_rdoc(mbox, rdict): -            mbox_rflag_set = rdict[SET] -            recent_set = copy(mbox_rflag_set) -            # zero it! -            mbox_rflag_set.difference_update(mbox_rflag_set) -            return RecentFlagsDoc( -                doc_id=rflags_store[mbox][DOC_ID], -                content={ -                    fields.TYPE_KEY: fields.TYPE_RECENT_VAL, -                    fields.MBOX_KEY: mbox, -                    fields.RECENTFLAGS_KEY: list(recent_set) -                }) - -        return (get_rdoc(mbox, rdict) for mbox, rdict in rflags_store.items() -                if not empty(rdict[SET])) - -    # Methods that mirror the IMailbox interface - -    def remove_all_deleted(self, mbox): -        """ -        Remove all messages flagged \\Deleted from this Memory Store only. -        Called from `expunge` - -        :param mbox: the mailbox -        :type mbox: str or unicode -        :return: a list of UIDs -        :rtype: list -        """ -        mem_deleted = self.all_deleted_uid_iter(mbox) -        for uid in mem_deleted: -            self.remove_message(mbox, uid) -        return mem_deleted - -    # TODO -- remove -    def stop_and_flush(self): -        """ -        Stop the write loop and trigger a write to the producer. -        """ -        self._stop_write_loop() -        if self._permanent_store is not None: -            # XXX we should check if we did get a True value on this -            # operation. If we got False we should retry! (queue was not empty) -            self.write_messages(self._permanent_store) -            self.producer.flush() - -    def expunge(self, mbox, observer): -        """ -        Remove all messages flagged \\Deleted, from the Memory Store -        and from the permanent store also. - -        It first queues up a last write, and wait for the deferreds to be done -        before continuing. - -        :param mbox: the mailbox -        :type mbox: str or unicode -        :param observer: a deferred that will be fired when expunge is done -        :type observer: Deferred -        """ -        soledad_store = self._permanent_store -        if soledad_store is None: -            # just-in memory store, easy then. -            self._delete_from_memory(mbox, observer) -            return - -        # We have a soledad storage. -        try: -            # Stop and trigger last write -            self.stop_and_flush() -            # Wait on the writebacks to finish - -            # XXX what if pending deferreds is empty? -            pending_deferreds = (self._new_deferreds.get(mbox, []) + -                                 self._dirty_deferreds.get(mbox, [])) -            d1 = defer.gatherResults(pending_deferreds, consumeErrors=True) -            d1.addCallback( -                self._delete_from_soledad_and_memory, mbox, observer) -        except Exception as exc: -            logger.exception(exc) - -    def _delete_from_memory(self, mbox, observer): -        """ -        Remove all messages marked as deleted from soledad and memory. - -        :param mbox: the mailbox -        :type mbox: str or unicode -        :param observer: a deferred that will be fired when expunge is done -        :type observer: Deferred -        """ -        mem_deleted = self.remove_all_deleted(mbox) -        # TODO return a DeferredList -        observer.callback(mem_deleted) - -    def _delete_from_soledad_and_memory(self, result, mbox, observer): -        """ -        Remove all messages marked as deleted from soledad and memory. - -        :param result: ignored. the result of the deferredList that triggers -                       this as a callback from `expunge`. -        :param mbox: the mailbox -        :type mbox: str or unicode -        :param observer: a deferred that will be fired when expunge is done -        :type observer: Deferred -        """ -        all_deleted = [] -        soledad_store = self._permanent_store - -        try: -            # 1. Delete all messages marked as deleted in soledad. -            logger.debug("DELETING FROM SOLEDAD ALL FOR %r" % (mbox,)) -            sol_deleted = soledad_store.remove_all_deleted(mbox) - -            try: -                self._known_uids[mbox].difference_update(set(sol_deleted)) -            except Exception as exc: -                logger.exception(exc) - -            # 2. Delete all messages marked as deleted in memory. -            logger.debug("DELETING FROM MEM ALL FOR %r" % (mbox,)) -            mem_deleted = self.remove_all_deleted(mbox) - -            all_deleted = set(mem_deleted).union(set(sol_deleted)) -            logger.debug("deleted %r" % all_deleted) -        except Exception as exc: -            logger.exception(exc) -        finally: -            self._start_write_loop() - -        observer.callback(all_deleted) - -    # Mailbox documents and attributes - -    # This could be also be cached in memstore, but proxying directly -    # to soledad since it's not too performance-critical. - -    def get_mbox_doc(self, mbox): -        """ -        Return the soledad document for a given mailbox. - -        :param mbox: the mailbox -        :type mbox: str or unicode -        :rtype: SoledadDocument or None. -        """ -        if self.permanent_store is not None: -            return self.permanent_store.get_mbox_document(mbox) -        else: -            return None - -    def get_mbox_closed(self, mbox): -        """ -        Return the closed attribute for a given mailbox. - -        :param mbox: the mailbox -        :type mbox: str or unicode -        :rtype: bool -        """ -        if self.permanent_store is not None: -            return self.permanent_store.get_mbox_closed(mbox) -        else: -            return self._mbox_closed[mbox] - -    def set_mbox_closed(self, mbox, closed): -        """ -        Set the closed attribute for a given mailbox. - -        :param mbox: the mailbox -        :type mbox: str or unicode -        """ -        if self.permanent_store is not None: -            self.permanent_store.set_mbox_closed(mbox, closed) -        else: -            self._mbox_closed[mbox] = closed - -    def get_mbox_flags(self, mbox): -        """ -        Get the flags for a given mbox. -        :rtype: list -        """ -        return sorted(self._mbox_flags[mbox]) - -    def set_mbox_flags(self, mbox, flags): -        """ -        Set the mbox flags -        """ -        self._mbox_flags[mbox] = set(flags) -        # TODO -        # This should write to the permanent store!!! - -    # Rename flag-documents - -    def rename_fdocs_mailbox(self, old_mbox, new_mbox): -        """ -        Change the mailbox name for all flag documents in a given mailbox. -        Used from account.rename - -        :param old_mbox: name for the old mbox -        :type old_mbox: str or unicode -        :param new_mbox: name for the new mbox -        :type new_mbox: str or unicode -        """ -        fs = self._fdoc_store -        keys = fs[old_mbox].keys() -        for k in keys: -            fdoc = fs[old_mbox][k] -            fdoc['mbox'] = new_mbox -            fs[new_mbox][k] = fdoc -            fs[old_mbox].pop(k) -            self._dirty.add((new_mbox, k)) - -    # Dump-to-disk controls. - -    @property -    def is_writing(self): -        """ -        Property that returns whether the store is currently writing its -        internal state to a permanent storage. - -        Used to evaluate whether the CHECK command can inform that the field -        is clear to proceed, or waiting for the write operations to complete -        is needed instead. - -        :rtype: bool -        """ -        # FIXME this should return a deferred !!! -        # TODO this should be moved to soledadStore instead -        # (all pending deferreds) -        return getattr(self, self.WRITING_FLAG) - -    @property -    def permanent_store(self): -        return self._permanent_store - -    # Memory management. - -    def get_size(self): -        """ -        Return the size of the internal storage. -        Use for calculating the limit beyond which we should flush the store. - -        :rtype: int -        """ -        return reduce(lambda x, y: x + y, self._sizes, 0) diff --git a/mail/src/leap/mail/imap/messageparts.py b/mail/src/leap/mail/imap/messageparts.py deleted file mode 100644 index fb1d75a..0000000 --- a/mail/src/leap/mail/imap/messageparts.py +++ /dev/null @@ -1,586 +0,0 @@ -# messageparts.py -# Copyright (C) 2014 LEAP -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program.  If not, see <http://www.gnu.org/licenses/>. -""" -MessagePart implementation. Used from LeapMessage. -""" -import logging -import StringIO -import weakref - -from collections import namedtuple - -from enum import Enum -from zope.interface import implements -from twisted.mail import imap4 - -from leap.common.decorators import memoized_method -from leap.common.mail import get_email_charset -from leap.mail.imap import interfaces -from leap.mail.imap.fields import fields -from leap.mail.utils import empty, first, find_charset - -MessagePartType = Enum("MessagePartType", "hdoc fdoc cdoc cdocs docs_id") - - -logger = logging.getLogger(__name__) - - -""" -A MessagePartDoc is a light wrapper around the dictionary-like -data that we pass along for message parts. It can be used almost everywhere -that you would expect a SoledadDocument, since it has a dict under the -`content` attribute. - -We also keep some metadata on it, relative in part to the message as a whole, -and sometimes to a part in particular only. - -* `new` indicates that the document has just been created. SoledadStore -  should just create a new doc for all the related message parts. -* `store` indicates the type of store a given MessagePartDoc lives in. -  We currently use this to indicate that  the document comes from memeory, -  but we should probably get rid of it as soon as we extend the use of the -  SoledadStore interface along LeapMessage, MessageCollection and Mailbox. -* `part` is one of the MessagePartType enums. - -* `dirty` indicates that, while we already have the document in Soledad, -  we have modified its state in memory, so we need to put_doc instead while -  dumping the MemoryStore contents. -  `dirty` attribute would only apply to flags-docs and linkage-docs. -* `doc_id` is the identifier for the document in the u1db database, if any. - -""" - -MessagePartDoc = namedtuple( -    'MessagePartDoc', -    ['new', 'dirty', 'part', 'store', 'content', 'doc_id']) - -""" -A RecentFlagsDoc is used to send the recent-flags document payload to the -SoledadWriter during dumps. -""" -RecentFlagsDoc = namedtuple( -    'RecentFlagsDoc', -    ['content', 'doc_id']) - - -class ReferenciableDict(dict): -    """ -    A dict that can be weak-referenced. - -    Some builtin objects are not weak-referenciable unless -    subclassed. So we do. - -    Used to return pointers to the items in the MemoryStore. -    """ - - -class MessageWrapper(object): -    """ -    A simple nested dictionary container around the different message subparts. -    """ -    implements(interfaces.IMessageContainer) - -    FDOC = "fdoc" -    HDOC = "hdoc" -    CDOCS = "cdocs" -    DOCS_ID = "docs_id" - -    # Using slots to limit some the memory use, -    # Add your attribute here. - -    __slots__ = ["_dict", "_new", "_dirty", "_storetype", "memstore"] - -    def __init__(self, fdoc=None, hdoc=None, cdocs=None, -                 from_dict=None, memstore=None, -                 new=True, dirty=False, docs_id={}): -        """ -        Initialize a MessageWrapper. -        """ -        # TODO add optional reference to original message in the incoming -        self._dict = {} -        self.memstore = memstore - -        self._new = new -        self._dirty = dirty - -        self._storetype = "mem" - -        if from_dict is not None: -            self.from_dict(from_dict) -        else: -            if fdoc is not None: -                self._dict[self.FDOC] = ReferenciableDict(fdoc) -            if hdoc is not None: -                self._dict[self.HDOC] = ReferenciableDict(hdoc) -            if cdocs is not None: -                self._dict[self.CDOCS] = ReferenciableDict(cdocs) - -        # This will keep references to the doc_ids to be able to put -        # messages to soledad. It will be populated during the walk() to avoid -        # the overhead of reading from the db. - -        # XXX it really *only* make sense for the FDOC, the other parts -        # should not be "dirty", just new...!!! -        self._dict[self.DOCS_ID] = docs_id - -    # properties - -    # TODO Could refactor new and dirty properties together. - -    def _get_new(self): -        """ -        Get the value for the `new` flag. - -        :rtype: bool -        """ -        return self._new - -    def _set_new(self, value=False): -        """ -        Set the value for the `new` flag, and propagate it -        to the memory store if any. - -        :param value: the value to set -        :type value: bool -        """ -        self._new = value -        if self.memstore: -            mbox = self.fdoc.content.get('mbox', None) -            uid = self.fdoc.content.get('uid', None) -            if not mbox or not uid: -                logger.warning("Malformed fdoc") -                return -            key = mbox, uid -            fun = [self.memstore.unset_new_queued, -                   self.memstore.set_new_queued][int(value)] -            fun(key) -        else: -            logger.warning("Could not find a memstore referenced from this " -                           "MessageWrapper. The value for new will not be " -                           "propagated") - -    new = property(_get_new, _set_new, -                   doc="The `new` flag for this MessageWrapper") - -    def _get_dirty(self): -        """ -        Get the value for the `dirty` flag. - -        :rtype: bool -        """ -        return self._dirty - -    def _set_dirty(self, value=True): -        """ -        Set the value for the `dirty` flag, and propagate it -        to the memory store if any. - -        :param value: the value to set -        :type value: bool -        """ -        self._dirty = value -        if self.memstore: -            mbox = self.fdoc.content.get('mbox', None) -            uid = self.fdoc.content.get('uid', None) -            if not mbox or not uid: -                logger.warning("Malformed fdoc") -                return -            key = mbox, uid -            fun = [self.memstore.unset_dirty_queued, -                   self.memstore.set_dirty_queued][int(value)] -            fun(key) -        else: -            logger.warning("Could not find a memstore referenced from this " -                           "MessageWrapper. The value for new will not be " -                           "propagated") - -    dirty = property(_get_dirty, _set_dirty) - -    # IMessageContainer - -    @property -    def fdoc(self): -        """ -        Return a MessagePartDoc wrapping around a weak reference to -        the flags-document in this MemoryStore, if any. - -        :rtype: MessagePartDoc -        """ -        _fdoc = self._dict.get(self.FDOC, None) -        if _fdoc: -            content_ref = weakref.proxy(_fdoc) -        else: -            logger.warning("NO FDOC!!!") -            content_ref = {} - -        return MessagePartDoc(new=self.new, dirty=self.dirty, -                              store=self._storetype, -                              part=MessagePartType.fdoc, -                              content=content_ref, -                              doc_id=self._dict[self.DOCS_ID].get( -                                  self.FDOC, None)) - -    @property -    def hdoc(self): -        """ -        Return a MessagePartDoc wrapping around a weak reference to -        the headers-document in this MemoryStore, if any. - -        :rtype: MessagePartDoc -        """ -        _hdoc = self._dict.get(self.HDOC, None) -        if _hdoc: -            content_ref = weakref.proxy(_hdoc) -        else: -            content_ref = {} -        return MessagePartDoc(new=self.new, dirty=self.dirty, -                              store=self._storetype, -                              part=MessagePartType.hdoc, -                              content=content_ref, -                              doc_id=self._dict[self.DOCS_ID].get( -                                  self.HDOC, None)) - -    @property -    def cdocs(self): -        """ -        Return a weak reference to a zero-indexed dict containing -        the content-documents, or an empty dict if none found. -        If you want access to the MessagePartDoc for the individual -        parts, use the generator returned by `walk` instead. - -        :rtype: dict -        """ -        _cdocs = self._dict.get(self.CDOCS, None) -        if _cdocs: -            return weakref.proxy(_cdocs) -        else: -            return {} - -    def walk(self): -        """ -        Generator that iterates through all the parts, returning -        MessagePartDoc. Used for writing to SoledadStore. - -        :rtype: generator -        """ -        if self._dirty: -            try: -                mbox = self.fdoc.content[fields.MBOX_KEY] -                uid = self.fdoc.content[fields.UID_KEY] -                docid_dict = self._dict[self.DOCS_ID] -                docid_dict[self.FDOC] = self.memstore.get_docid_for_fdoc( -                    mbox, uid) -            except Exception as exc: -                logger.debug("Error while walking message...") -                logger.exception(exc) - -        if not empty(self.fdoc.content) and 'uid' in self.fdoc.content: -            yield self.fdoc -        if not empty(self.hdoc.content): -            yield self.hdoc -        for cdoc in self.cdocs.values(): -            if not empty(cdoc): -                content_ref = weakref.proxy(cdoc) -                yield MessagePartDoc(new=self.new, dirty=self.dirty, -                                     store=self._storetype, -                                     part=MessagePartType.cdoc, -                                     content=content_ref, -                                     doc_id=None) - -    # i/o - -    def as_dict(self): -        """ -        Return a dict representation of the parts contained. - -        :rtype: dict -        """ -        return self._dict - -    def from_dict(self, msg_dict): -        """ -        Populate MessageWrapper parts from a dictionary. -        It expects the same format that we use in a -        MessageWrapper. - - -        :param msg_dict: a dictionary containing the parts to populate -                         the MessageWrapper from -        :type msg_dict: dict -        """ -        fdoc, hdoc, cdocs = map( -            lambda part: msg_dict.get(part, None), -            [self.FDOC, self.HDOC, self.CDOCS]) - -        for t, doc in ((self.FDOC, fdoc), (self.HDOC, hdoc), -                       (self.CDOCS, cdocs)): -            self._dict[t] = ReferenciableDict(doc) if doc else None - - -class MessagePart(object): -    """ -    IMessagePart implementor, to be passed to several methods -    of the IMAP4Server. -    It takes a subpart message and is able to find -    the inner parts. - -    See the interface documentation. -    """ - -    implements(imap4.IMessagePart) - -    def __init__(self, soledad, part_map): -        """ -        Initializes the MessagePart. - -        :param soledad: Soledad instance. -        :type soledad: Soledad -        :param part_map: a dictionary containing the parts map for this -                         message -        :type part_map: dict -        """ -        # TODO -        # It would be good to pass the uid/mailbox also -        # for references while debugging. - -        # We have a problem on bulk moves, and is -        # that when the fetch on the new mailbox is done -        # the parts maybe are not complete. -        # So we should be able to fail with empty -        # docs until we solve that. The ideal would be -        # to gather the results of the deferred operations -        # to signal the operation is complete. -        #leap_assert(part_map, "part map dict cannot be null") - -        self._soledad = soledad -        self._pmap = part_map - -    def getSize(self): -        """ -        Return the total size, in octets, of this message part. - -        :return: size of the message, in octets -        :rtype: int -        """ -        if empty(self._pmap): -            return 0 -        size = self._pmap.get('size', None) -        if size is None: -            logger.error("Message part cannot find size in the partmap") -            size = 0 -        return size - -    def getBodyFile(self): -        """ -        Retrieve a file object containing only the body of this message. - -        :return: file-like object opened for reading -        :rtype: StringIO -        """ -        fd = StringIO.StringIO() -        if not empty(self._pmap): -            multi = self._pmap.get('multi') -            if not multi: -                phash = self._pmap.get("phash", None) -            else: -                pmap = self._pmap.get('part_map') -                first_part = pmap.get('1', None) -                if not empty(first_part): -                    phash = first_part['phash'] -                else: -                    phash = None - -            if phash is None: -                logger.warning("Could not find phash for this subpart!") -                payload = "" -            else: -                payload = self._get_payload_from_document_memoized(phash) -                if empty(payload): -                    payload = self._get_payload_from_document(phash) - -        else: -            logger.warning("Message with no part_map!") -            payload = "" - -        if payload: -            content_type = self._get_ctype_from_document(phash) -            charset = find_charset(content_type) -            if charset is None: -                charset = self._get_charset(payload) -            try: -                if isinstance(payload, unicode): -                    payload = payload.encode(charset) -            except UnicodeError as exc: -                logger.error( -                    "Unicode error, using 'replace'. {0!r}".format(exc)) -                payload = payload.encode(charset, 'replace') - -        fd.write(payload) -        fd.seek(0) -        return fd - -    # TODO should memory-bound this memoize!!! -    @memoized_method -    def _get_payload_from_document_memoized(self, phash): -        """ -        Memoized method call around the regular method, to be able -        to call the non-memoized method in case we got a None. - -        :param phash: the payload hash to retrieve by. -        :type phash: str or unicode -        :rtype: str or unicode or None -        """ -        return self._get_payload_from_document(phash) - -    def _get_payload_from_document(self, phash): -        """ -        Return the message payload from the content document. - -        :param phash: the payload hash to retrieve by. -        :type phash: str or unicode -        :rtype: str or unicode or None -        """ -        cdocs = self._soledad.get_from_index( -            fields.TYPE_P_HASH_IDX, -            fields.TYPE_CONTENT_VAL, str(phash)) - -        cdoc = first(cdocs) -        if cdoc is None: -            logger.warning( -                "Could not find the content doc " -                "for phash %s" % (phash,)) -            payload = "" -        else: -            payload = cdoc.content.get(fields.RAW_KEY, "") -        return payload - -    # TODO should memory-bound this memoize!!! -    @memoized_method -    def _get_ctype_from_document(self, phash): -        """ -        Reeturn the content-type from the content document. - -        :param phash: the payload hash to retrieve by. -        :type phash: str or unicode -        :rtype: str or unicode -        """ -        cdocs = self._soledad.get_from_index( -            fields.TYPE_P_HASH_IDX, -            fields.TYPE_CONTENT_VAL, str(phash)) - -        cdoc = first(cdocs) -        if not cdoc: -            logger.warning( -                "Could not find the content doc " -                "for phash %s" % (phash,)) -        ctype = cdoc.content.get('ctype', "") -        return ctype - -    @memoized_method -    def _get_charset(self, stuff): -        # TODO put in a common class with LeapMessage -        """ -        Gets (guesses?) the charset of a payload. - -        :param stuff: the stuff to guess about. -        :type stuff: str or unicode -        :return: charset -        :rtype: unicode -        """ -        # XXX existential doubt 2. shouldn't we make the scope -        # of the decorator somewhat more persistent? -        # ah! yes! and put memory bounds. -        return get_email_charset(stuff) - -    def getHeaders(self, negate, *names): -        """ -        Retrieve a group of message headers. - -        :param names: The names of the headers to retrieve or omit. -        :type names: tuple of str - -        :param negate: If True, indicates that the headers listed in names -                       should be omitted from the return value, rather -                       than included. -        :type negate: bool - -        :return: A mapping of header field names to header field values -        :rtype: dict -        """ -        # XXX refactor together with MessagePart method -        if not self._pmap: -            logger.warning("No pmap in Subpart!") -            return {} -        headers = dict(self._pmap.get("headers", [])) - -        names = map(lambda s: s.upper(), names) -        if negate: -            cond = lambda key: key.upper() not in names -        else: -            cond = lambda key: key.upper() in names - -        # default to most likely standard -        charset = find_charset(headers, "utf-8") -        headers2 = dict() -        for key, value in headers.items(): -            # twisted imap server expects *some* headers to be lowercase -            # We could use a CaseInsensitiveDict here... -            if key.lower() == "content-type": -                key = key.lower() - -            if not isinstance(key, str): -                key = key.encode(charset, 'replace') -            if not isinstance(value, str): -                value = value.encode(charset, 'replace') - -            # filter original dict by negate-condition -            if cond(key): -                headers2[key] = value -        return headers2 - -    def isMultipart(self): -        """ -        Return True if this message is multipart. -        """ -        if empty(self._pmap): -            logger.warning("Could not get part map!") -            return False -        multi = self._pmap.get("multi", False) -        return multi - -    def getSubPart(self, part): -        """ -        Retrieve a MIME submessage - -        :type part: C{int} -        :param part: The number of the part to retrieve, indexed from 0. -        :raise IndexError: Raised if the specified part does not exist. -        :raise TypeError: Raised if this message is not multipart. -        :rtype: Any object implementing C{IMessagePart}. -        :return: The specified sub-part. -        """ -        if not self.isMultipart(): -            raise TypeError - -        sub_pmap = self._pmap.get("part_map", {}) -        try: -            part_map = sub_pmap[str(part + 1)] -        except KeyError: -            logger.debug("getSubpart for %s: KeyError" % (part,)) -            raise IndexError - -        # XXX check for validity -        return MessagePart(self._soledad, part_map) diff --git a/mail/src/leap/mail/imap/messages.py b/mail/src/leap/mail/imap/messages.py index d47c8eb..7e0f973 100644 --- a/mail/src/leap/mail/imap/messages.py +++ b/mail/src/leap/mail/imap/messages.py @@ -1,6 +1,6 @@  # -*- coding: utf-8 -*- -# messages.py -# Copyright (C) 2013, 2014 LEAP +# imap/messages.py +# Copyright (C) 2013-2015 LEAP  #  # This program is free software: you can redistribute it and/or modify  # it under the terms of the GNU General Public License as published by @@ -15,85 +15,41 @@  # You should have received a copy of the GNU General Public License  # along with this program.  If not, see <http://www.gnu.org/licenses/>.  """ -LeapMessage and MessageCollection. +IMAPMessage and IMAPMessageCollection.  """ -import copy  import logging -import threading -import StringIO - -from collections import defaultdict -from functools import partial - +# import StringIO  from twisted.mail import imap4 -from twisted.internet import reactor  from zope.interface import implements -from zope.proxy import sameProxiedObjects  from leap.common.check import leap_assert, leap_assert_type  from leap.common.decorators import memoized_method  from leap.common.mail import get_email_charset -from leap.mail.adaptors import soledad_indexes as indexes -from leap.mail.constants import INBOX_NAME -from leap.mail.utils import find_charset, empty -from leap.mail.imap.index import IndexedDB -from leap.mail.imap.fields import fields, WithMsgFields -from leap.mail.imap.messageparts import MessagePart, MessagePartDoc -from leap.mail.imap.parser import MBoxParser - -logger = logging.getLogger(__name__) - -# TODO ------------------------------------------------------------ -# [ ] Add ref to incoming message during add_msg -# [ ] Add linked-from info. -#     * Need a new type of documents: linkage info. -#     * HDOCS are linked from FDOCs (ref to chash) -#     * CDOCS are linked from HDOCS (ref to chash) +from leap.mail.utils import find_charset -# [ ] Delete incoming mail only after successful write! -# [ ] Remove UID from syncable db. Store only those indexes locally. +from leap.mail.imap.messageparts import MessagePart +# from leap.mail.imap.messagepargs import MessagePartDoc +logger = logging.getLogger(__name__) -def try_unique_query(curried): -    """ -    Try to execute a query that is expected to have a -    single outcome, and log a warning if more than one document found. - -    :param curried: a curried function -    :type curried: callable -    """ -    # XXX FIXME ---------- convert to deferreds -    leap_assert(callable(curried), "A callable is expected") -    try: -        query = curried() -        if query: -            if len(query) > 1: -                # TODO we could take action, like trigger a background -                # process to kill dupes. -                name = getattr(curried, 'expected', 'doc') -                logger.warning( -                    "More than one %s found for this mbox, " -                    "we got a duplicate!!" % (name,)) -            return query.pop() -        else: -            return None -    except Exception as exc: -        logger.exception("Unhandled error %r" % exc) - +# TODO ------------------------------------------------------------ -# FIXME remove-me -#fdoc_locks = defaultdict(lambda: defaultdict(lambda: threading.Lock())) +# [ ] Add ref to incoming message during add_msg. +# [ ] Delete incoming mail only after successful write. -class IMAPMessage(fields, MBoxParser): +class IMAPMessage(object):      """      The main representation of a message.      """      implements(imap4.IMessage) -    def __init__(self, soledad, uid, mbox): +    # TODO ---- see what should we pass here instead +    # where's UID added to the message? +    # def __init__(self, soledad, uid, mbox): +    def __init__(self, message, collection):          """          Initializes a LeapMessage. @@ -103,81 +59,14 @@ class IMAPMessage(fields, MBoxParser):          :type uid: int or basestring          :param mbox: the mbox this message belongs to          :type mbox: str or unicode -        :param collection: a reference to the parent collection object -        :type collection: MessageCollection -        :param container: a IMessageContainer implementor instance -        :type container: IMessageContainer          """ -        self._soledad = soledad -        self._uid = int(uid) if uid is not None else None -        self._mbox = self._parse_mailbox_name(mbox) - -        self.__chash = None -        self.__bdoc = None - -    # TODO collection and container are deprecated. - -    # TODO move to adaptor - -    #@property -    #def fdoc(self): -        #""" -        #An accessor to the flags document. -        #""" -        #if all(map(bool, (self._uid, self._mbox))): -            #fdoc = None -            #if self._container is not None: -                #fdoc = self._container.fdoc -            #if not fdoc: -                #fdoc = self._get_flags_doc() -            #if fdoc: -                #fdoc_content = fdoc.content -                #self.__chash = fdoc_content.get( -                    #fields.CONTENT_HASH_KEY, None) -            #return fdoc -# -    #@property -    #def hdoc(self): -        #""" -        #An accessor to the headers document. -        #""" -        #container = self._container -        #if container is not None: -            #hdoc = self._container.hdoc -            #if hdoc and not empty(hdoc.content): -                #return hdoc -        #hdoc = self._get_headers_doc() -# -        #if container and not empty(hdoc.content): -            # mem-cache it -            #hdoc_content = hdoc.content -            #chash = hdoc_content.get(fields.CONTENT_HASH_KEY) -            #hdocs = {chash: hdoc_content} -            #container.memstore.load_header_docs(hdocs) -        #return hdoc -# -    #@property -    #def chash(self): -        #""" -        #An accessor to the content hash for this message. -        #""" -        #if not self.fdoc: -            #return None -        #if not self.__chash and self.fdoc: -            #self.__chash = self.fdoc.content.get( -                #fields.CONTENT_HASH_KEY, None) -        #return self.__chash - -    #@property -    #def bdoc(self): -        #""" -        #An accessor to the body document. -        #""" -        #if not self.hdoc: -            #return None -        #if not self.__bdoc: -            #self.__bdoc = self._get_body_doc() -        #return self.__bdoc +        #self._uid = int(uid) if uid is not None else None +        #self._mbox = normalize_mailbox(mbox) + +        self.message = message + +        # TODO maybe not needed, see setFlags below +        self.collection = collection      # IMessage implementation @@ -188,12 +77,7 @@ class IMAPMessage(fields, MBoxParser):          :return: uid for this message          :rtype: int          """ -        # TODO ----> return lookup in local sqlcipher table. -        return self._uid - -    # -------------------------------------------------------------- -    # TODO -- from here on, all the methods should be proxied to the -    # instance of leap.mail.mail.Message +        return self.message.get_uid()      def getFlags(self):          """ @@ -202,24 +86,14 @@ class IMAPMessage(fields, MBoxParser):          :return: The flags, represented as strings          :rtype: tuple          """ -        uid = self._uid - -        flags = set([]) -        fdoc = self.fdoc -        if fdoc: -            flags = set(fdoc.content.get(self.FLAGS_KEY, None)) +        return self.message.get_flags() -        msgcol = self._collection +    # setFlags not in the interface spec but we use it with store command. -        # We treat the recent flag specially: gotten from -        # a mailbox-level document. -        if msgcol and uid in msgcol.recent_flags: -            flags.add(fields.RECENT_FLAG) -        if flags: -            flags = map(str, flags) -        return tuple(flags) +    # XXX if we can move it to a collection method, we don't need to pass +    # collection to the IMAPMessage -    # setFlags not in the interface spec but we use it with store command. +    # lookup method? IMAPMailbox?      def setFlags(self, flags, mode):          """ @@ -231,32 +105,11 @@ class IMAPMessage(fields, MBoxParser):          :type mode: int          """          leap_assert(isinstance(flags, tuple), "flags need to be a tuple") -        mbox, uid = self._mbox, self._uid - -        APPEND = 1 -        REMOVE = -1 -        SET = 0 - -        doc = self.fdoc -        if not doc: -            logger.warning( -                "Could not find FDOC for %r:%s while setting flags!" % -                (mbox, uid)) -            return -        current = doc.content[self.FLAGS_KEY] -        if mode == APPEND: -            newflags = tuple(set(tuple(current) + flags)) -        elif mode == REMOVE: -            newflags = tuple(set(current).difference(set(flags))) -        elif mode == SET: -            newflags = flags -        new_fdoc = { -            self.FLAGS_KEY: newflags, -            self.SEEN_KEY: self.SEEN_FLAG in newflags, -            self.DEL_KEY: self.DELETED_FLAG in newflags} -        self._collection.memstore.update_flags(mbox, uid, new_fdoc) - -        return map(str, newflags) +        # XXX +        # return new flags +        # map to str +        #self.message.set_flags(flags, mode) +        self.collection.update_flags(self.message, flags, mode)      def getInternalDate(self):          """ @@ -273,8 +126,7 @@ class IMAPMessage(fields, MBoxParser):          :return: An RFC822-formatted date string.          :rtype: str          """ -        date = self.hdoc.content.get(fields.DATE_KEY, '') -        return date +        return self.message.get_internal_date()      #      # IMessagePart @@ -290,42 +142,40 @@ class IMAPMessage(fields, MBoxParser):          :return: file-like object opened for reading          :rtype: StringIO          """ -        def write_fd(body): -            fd.write(body) -            fd.seek(0) -            return fd - +        #def write_fd(body): +            #fd.write(body) +            #fd.seek(0) +            #return fd +#          # TODO refactor with getBodyFile in MessagePart - -        fd = StringIO.StringIO() - -        if self.bdoc is not None: -            bdoc_content = self.bdoc.content -            if empty(bdoc_content): -                logger.warning("No BDOC content found for message!!!") -                return write_fd("") - -            body = bdoc_content.get(self.RAW_KEY, "") -            content_type = bdoc_content.get('content-type', "") -            charset = find_charset(content_type) -            if charset is None: -                charset = self._get_charset(body) -            try: -                if isinstance(body, unicode): -                    body = body.encode(charset) -            except UnicodeError as exc: -                logger.error( -                    "Unicode error, using 'replace'. {0!r}".format(exc)) -                logger.debug("Attempted to encode with: %s" % charset) -                body = body.encode(charset, 'replace') -            finally: -                return write_fd(body) - -        # We are still returning funky characters from here. -        else: -            logger.warning("No BDOC found for message.") -            return write_fd("") - +# +        #fd = StringIO.StringIO() +# +        #if self.bdoc is not None: +            #bdoc_content = self.bdoc.content +            #if empty(bdoc_content): +                #logger.warning("No BDOC content found for message!!!") +                #return write_fd("") +# +            #body = bdoc_content.get(self.RAW_KEY, "") +            #content_type = bdoc_content.get('content-type', "") +            #charset = find_charset(content_type) +            #if charset is None: +                #charset = self._get_charset(body) +            #try: +                #if isinstance(body, unicode): +                    #body = body.encode(charset) +            #except UnicodeError as exc: +                #logger.error( +                    #"Unicode error, using 'replace'. {0!r}".format(exc)) +                #logger.debug("Attempted to encode with: %s" % charset) +                #body = body.encode(charset, 'replace') +            #finally: +                #return write_fd(body) + +        return self.message.get_body_file() + +    # TODO move to mail.mail      @memoized_method      def _get_charset(self, stuff):          """ @@ -337,7 +187,7 @@ class IMAPMessage(fields, MBoxParser):          """          # XXX shouldn't we make the scope          # of the decorator somewhat more persistent? -        # ah! yes! and put memory bounds. +        # and put memory bounds.          return get_email_charset(stuff)      def getSize(self): @@ -347,17 +197,11 @@ class IMAPMessage(fields, MBoxParser):          :return: size of the message, in octets          :rtype: int          """ -        size = None -        if self.fdoc is not None: -            fdoc_content = self.fdoc.content -            size = fdoc_content.get(self.SIZE_KEY, False) -        else: -            logger.warning("No FLAGS doc for %s:%s" % (self._mbox, -                                                       self._uid)) -        #if not size: -            # XXX fallback, should remove when all migrated. -            #size = self.getBodyFile().len -        return size +        #size = None +        #fdoc_content = self.fdoc.content +        #size = fdoc_content.get(self.SIZE_KEY, False) +        #return size +        return self.message.get_size()      def getHeaders(self, negate, *names):          """ @@ -374,10 +218,10 @@ class IMAPMessage(fields, MBoxParser):          :return: A mapping of header field names to header field values          :rtype: dict          """ -        # TODO split in smaller methods +        # TODO split in smaller methods -- format_headers()?          # XXX refactor together with MessagePart method -        headers = self._get_headers() +        headers = self.message.get_headers()          # XXX keep this in the imap imessage implementation,          # because the server impl. expects content-type to be present. @@ -417,34 +261,15 @@ class IMAPMessage(fields, MBoxParser):                  headers2[key] = value          return headers2 -    def _get_headers(self): -        """ -        Return the headers dict for this message. -        """ -        if self.hdoc is not None: -            hdoc_content = self.hdoc.content -            headers = hdoc_content.get(self.HEADERS_KEY, {}) -            return headers - -        else: -            logger.warning( -                "No HEADERS doc for msg %s:%s" % ( -                    self._mbox, -                    self._uid)) -      def isMultipart(self):          """          Return True if this message is multipart.          """ -        if self.fdoc: -            fdoc_content = self.fdoc.content -            is_multipart = fdoc_content.get(self.MULTIPART_KEY, False) -            return is_multipart -        else: -            logger.warning( -                "No FLAGS doc for msg %s:%s" % ( -                    self._mbox, -                    self._uid)) +        #fdoc_content = self.fdoc.content +        #is_multipart = fdoc_content.get(self.MULTIPART_KEY, False) +        #return is_multipart + +        return self.message.fdoc.is_multi      def getSubPart(self, part):          """ @@ -463,12 +288,16 @@ class IMAPMessage(fields, MBoxParser):              pmap_dict = self._get_part_from_parts_map(part + 1)          except KeyError:              raise IndexError + +        # TODO move access to adaptor ----          return MessagePart(self._soledad, pmap_dict)      #      # accessors      # +    # FIXME +    # -- move to wrapper/adaptor      def _get_part_from_parts_map(self, part):          """          Get a part map from the headers doc @@ -476,100 +305,44 @@ class IMAPMessage(fields, MBoxParser):          :raises: KeyError if key does not exist          :rtype: dict          """ -        if not self.hdoc: -            logger.warning("Tried to get part but no HDOC found!") -            return None - -        hdoc_content = self.hdoc.content -        pmap = hdoc_content.get(fields.PARTS_MAP_KEY, {}) +        raise NotImplementedError() +        #hdoc_content = self.hdoc.content +        #pmap = hdoc_content.get(fields.PARTS_MAP_KEY, {}) +#          # remember, lads, soledad is using strings in its keys,          # not integers! -        return pmap[str(part)] +        #return pmap[str(part)] -    # XXX moved to memory store -    # move the rest too. ------------------------------------------ -    def _get_flags_doc(self): -        """ -        Return the document that keeps the flags for this -        message. -        """ -        def get_first_if_any(docs): -            result = first(docs) -            return result if result else {} - -        d = self._soledad.get_from_index( -            fields.TYPE_MBOX_UID_IDX, -            fields.TYPE_FLAGS_VAL, self._mbox, str(self._uid)) -        d.addCallback(get_first_if_any) -        return d - -    # TODO move to soledadstore instead of accessing soledad directly -    def _get_headers_doc(self): -        """ -        Return the document that keeps the headers for this -        message. -        """ -        d = self._soledad.get_from_index( -            fields.TYPE_C_HASH_IDX, -            fields.TYPE_HEADERS_VAL, str(self.chash)) -        d.addCallback(lambda docs: first(docs)) -        return d - -    # TODO move to soledadstore instead of accessing soledad directly +    # TODO move to wrapper/adaptor      def _get_body_doc(self):          """          Return the document that keeps the body for this          message.          """ -        # XXX FIXME --- this might need a maybedeferred -        # on the receiving side... -        hdoc_content = self.hdoc.content -        body_phash = hdoc_content.get( -            fields.BODY_KEY, None) -        if not body_phash: -            logger.warning("No body phash for this document!") -            return None - -        # XXX get from memstore too... -        # if memstore: memstore.get_phrash -        # memstore should keep a dict with weakrefs to the -        # phash doc... - -        if self._container is not None: -            bdoc = self._container.memstore.get_cdoc_from_phash(body_phash) -            if not empty(bdoc) and not empty(bdoc.content): -                return bdoc - +        # FIXME +        # -- just get the body and retrieve the cdoc P-<phash> +        #hdoc_content = self.hdoc.content +        #body_phash = hdoc_content.get( +            #fields.BODY_KEY, None) +        #if not body_phash: +            #logger.warning("No body phash for this document!") +            #return None +# +        #if self._container is not None: +            #bdoc = self._container.memstore.get_cdoc_from_phash(body_phash) +            #if not empty(bdoc) and not empty(bdoc.content): +                #return bdoc +#          # no memstore, or no body doc found there -        d = self._soledad.get_from_index( -            fields.TYPE_P_HASH_IDX, -            fields.TYPE_CONTENT_VAL, str(body_phash)) -        d.addCallback(lambda docs: first(docs)) -        return d - -    def __getitem__(self, key): -        """ -        Return an item from the content of the flags document, -        for convenience. - -        :param key: The key -        :type key: str - -        :return: The content value indexed by C{key} or None -        :rtype: str -        """ -        return self.fdoc.content.get(key, None) - -    def does_exist(self): -        """ -        Return True if there is actually a flags document for this -        UID and mbox. -        """ -        return not empty(self.fdoc) +        #d = self._soledad.get_from_index( +            #fields.TYPE_P_HASH_IDX, +            #fields.TYPE_CONTENT_VAL, str(body_phash)) +        #d.addCallback(lambda docs: first(docs)) +        #return d -class MessageCollection(WithMsgFields, IndexedDB, MBoxParser): +class IMAPMessageCollection(object):      """      A collection of messages, surprisingly. @@ -578,9 +351,15 @@ class MessageCollection(WithMsgFields, IndexedDB, MBoxParser):      database.      """ -    # XXX this should be able to produce a MessageSet methinks -    # could validate these kinds of objects turning them -    # into a template for the class. +    messageklass = IMAPMessage + +    # TODO +    # [ ] Add RECENT flags docs to mailbox-doc attributes (list-of-uids) +    # [ ] move Query for all the headers documents to Collection + +    # TODO this should be able to produce a MessageSet methinks +    # TODO --- reimplement, review and prune documentation below. +      FLAGS_DOC = "FLAGS"      HEADERS_DOC = "HEADERS"      CONTENT_DOC = "CONTENT" @@ -604,145 +383,40 @@ class MessageCollection(WithMsgFields, IndexedDB, MBoxParser):      """      HDOCS_SET_DOC = "HDOCS_SET" -    templates = { - -        # Mailbox Level - -        RECENT_DOC: { -            "type": indexes.RECENT, -            "mbox": INBOX_NAME, -            fields.RECENTFLAGS_KEY: [], -        }, - -        HDOCS_SET_DOC: { -            "type": indexes.HDOCS_SET, -            "mbox": INBOX_NAME, -            fields.HDOCS_SET_KEY: [], -        } - - -    } - -    # Different locks for wrapping both the u1db document getting/setting -    # and the property getting/settting in an atomic operation. - -    # TODO --- deprecate ! --- use SoledadDocumentWrapper + locks -    _rdoc_lock = defaultdict(lambda: threading.Lock()) -    _rdoc_write_lock = defaultdict(lambda: threading.Lock()) -    _rdoc_read_lock = defaultdict(lambda: threading.Lock()) -    _rdoc_property_lock = defaultdict(lambda: threading.Lock()) - -    _initialized = {} - -    def __init__(self, mbox=None, soledad=None, memstore=None): -        """ -        Constructor for MessageCollection. - -        On initialization, we ensure that we have a document for -        storing the recent flags. The nature of this flag make us wanting -        to store the set of the UIDs with this flag at the level of the -        MessageCollection for each mailbox, instead of treating them -        as a property of each message. - -        We are passed an instance of MemoryStore, the same for the -        SoledadBackedAccount, that we use as a read cache and a buffer -        for writes. - -        :param mbox: the name of the mailbox. It is the name -                     with which we filter the query over the -                     messages database. -        :type mbox: str -        :param soledad: Soledad database -        :type soledad: Soledad instance -        :param memstore: a MemoryStore instance -        :type memstore: MemoryStore +    def __init__(self, collection):          """ -        leap_assert(mbox, "Need a mailbox name to initialize") -        leap_assert(mbox.strip() != "", "mbox cannot be blank space") -        leap_assert(isinstance(mbox, (str, unicode)), -                    "mbox needs to be a string") -        leap_assert(soledad, "Need a soledad instance to initialize") +        Constructor for IMAPMessageCollection. -        # okay, all in order, keep going... - -        self.mbox = self._parse_mailbox_name(mbox) - -        # XXX get a SoledadStore passed instead -        self._soledad = soledad -        self.memstore = memstore - -        self.__rflags = None - -        if not self._initialized.get(mbox, False): -            try: -                self.initialize_db() -                # ensure that we have a recent-flags doc -                self._get_or_create_rdoc() -            except Exception: -                logger.debug("Error initializing %r" % (mbox,)) -            else: -                self._initialized[mbox] = True - -    def _get_empty_doc(self, _type=FLAGS_DOC): -        """ -        Returns an empty doc for storing different message parts. -        Defaults to returning a template for a flags document. -        :return: a dict with the template -        :rtype: dict -        """ -        if _type not in self.templates.keys(): -            raise TypeError("Improper type passed to _get_empty_doc") -        return copy.deepcopy(self.templates[_type]) - -    def _get_or_create_rdoc(self): -        """ -        Try to retrieve the recent-flags doc for this MessageCollection, -        and create one if not found. +        :param collection: an instance of a MessageCollection +        :type collection: MessageCollection          """ -        # XXX should move this to memstore too -        with self._rdoc_write_lock[self.mbox]: -            rdoc = self._get_recent_doc_from_soledad() -            if rdoc is None: -                rdoc = self._get_empty_doc(self.RECENT_DOC) -                if self.mbox != fields.INBOX_VAL: -                    rdoc[fields.MBOX_KEY] = self.mbox -                self._soledad.create_doc(rdoc) - -    # -------------------------------------------------------------------- +        leap_assert( +            collection.is_mailbox_collection(), +            "Need a mailbox name to initialize") +        mbox_name = collection.mbox_name +        leap_assert(mbox_name.strip() != "", "mbox cannot be blank space") +        leap_assert(isinstance(mbox_name, (str, unicode)), +                    "mbox needs to be a string") +        self.collection = collection -    # ----------------------------------------------------------------------- +        # XXX this has to be done in IMAPAccount +        # (Where the collection must be instantiated and passed to us) +        # self.mbox = normalize_mailbox(mbox) -    def _fdoc_already_exists(self, chash): +    @property +    def mbox_name(self):          """ -        Check whether we can find a flags doc for this mailbox with the -        given content-hash. It enforces that we can only have the same maessage -        listed once for a a given mailbox. - -        :param chash: the content-hash to check about. -        :type chash: basestring -        :return: False, if it does not exist, or UID. +        Return the string that identifies this mailbox.          """ -        exist = False -        exist = self.memstore.get_fdoc_from_chash(chash, self.mbox) +        return self.collection.mbox_name -        if not exist: -            exist = self._get_fdoc_from_chash(chash) -        if exist and exist.content is not None: -            return exist.content.get(fields.UID_KEY, "unknown-uid") -        else: -            return False - -    def add_msg(self, raw, subject=None, flags=None, date=None, -                notify_on_disk=False): +    def add_msg(self, raw, flags=None, date=None):          """          Creates a new message document.          :param raw: the raw message          :type raw: str -        :param subject: subject of the message. -        :type subject: str -          :param flags: flags          :type flags: list @@ -756,212 +430,30 @@ class MessageCollection(WithMsgFields, IndexedDB, MBoxParser):          if flags is None:              flags = tuple()          leap_assert_type(flags, tuple) +        return self.collection.add_msg(raw, flags, date) -        # TODO ---- proxy to MessageCollection addMessage - -        #observer = defer.Deferred() -        #d = self._do_parse(raw) -        #d.addCallback(lambda result: reactor.callInThread( -            #self._do_add_msg, result, flags, subject, date, -            #notify_on_disk, observer)) -        #return observer - -    # TODO --------------------------------------------------- -    # move this to leap.mail.adaptors.soledad - -    def _do_add_msg(self, parse_result, flags, subject, -                    date, notify_on_disk, observer): -        """ -        """ -        msg, parts, chash, size, multi = parse_result - -        # XXX move to SoledadAdaptor write operation ... ??? -        # check for uniqueness -------------------------------- -        # Watch out! We're reserving a UID right after this! -        existing_uid = self._fdoc_already_exists(chash) -        if existing_uid: -            msg = self.get_msg_by_uid(existing_uid) -            reactor.callFromThread(observer.callback, existing_uid) -            msg.setFlags((fields.DELETED_FLAG,), -1) -            return - -        # TODO move UID autoincrement to MessageCollection.addMessage(mailbox) -        # TODO S2 -- get FUCKING UID from autoincremental table -        #uid = self.memstore.increment_last_soledad_uid(self.mbox) -        #self.set_recent_flag(uid) - - -    # ------------------------------------------------------------ - -    # -    # getters: specific queries -    # - -    # recent flags - -    def _get_recent_flags(self): -        """ -        An accessor for the recent-flags set for this mailbox. +    def get_msg_by_uid(self, uid, absolute=True):          """ -        # XXX check if we should remove this -        if self.__rflags is not None: -            return self.__rflags - -        if self.memstore is not None: -            with self._rdoc_lock[self.mbox]: -                rflags = self.memstore.get_recent_flags(self.mbox) -                if not rflags: -                    # not loaded in the memory store yet. -                    # let's fetch them from soledad... -                    rdoc = self._get_recent_doc_from_soledad() -                    if rdoc is None: -                        return set([]) -                    rflags = set(rdoc.content.get( -                        fields.RECENTFLAGS_KEY, [])) -                    # ...and cache them now. -                    self.memstore.load_recent_flags( -                        self.mbox, -                        {'doc_id': rdoc.doc_id, 'set': rflags}) -            return rflags - -    def _set_recent_flags(self, value): -        """ -        Setter for the recent-flags set for this mailbox. -        """ -        if self.memstore is not None: -            self.memstore.set_recent_flags(self.mbox, value) - -    recent_flags = property( -        _get_recent_flags, _set_recent_flags, -        doc="Set of UIDs with the recent flag for this mailbox.") - -    def _get_recent_doc_from_soledad(self): -        """ -        Get recent-flags document from Soledad for this mailbox. -        :rtype: SoledadDocument or None -        """ -        # FIXME ----- use deferreds. -        curried = partial( -            self._soledad.get_from_index, -            fields.TYPE_MBOX_IDX, -            fields.TYPE_RECENT_VAL, self.mbox) -        curried.expected = "rdoc" -        with self._rdoc_read_lock[self.mbox]: -            return try_unique_query(curried) - -    # Property-set modification (protected by a different -    # lock to give atomicity to the read/write operation) - -    def unset_recent_flags(self, uids): -        """ -        Unset Recent flag for a sequence of uids. - -        :param uids: the uids to unset -        :type uid: sequence -        """ -        # FIXME ----- use deferreds. -        with self._rdoc_property_lock[self.mbox]: -            self.recent_flags.difference_update( -                set(uids)) - -    # Individual flags operations - -    def unset_recent_flag(self, uid): -        """ -        Unset Recent flag for a given uid. - -        :param uid: the uid to unset -        :type uid: int -        """ -        # FIXME ----- use deferreds. -        with self._rdoc_property_lock[self.mbox]: -            self.recent_flags.difference_update( -                set([uid])) - -    def set_recent_flag(self, uid): -        """ -        Set Recent flag for a given uid. +        Retrieves a IMAPMessage by UID. +        This is used primarity in the Mailbox fetch and store methods. -        :param uid: the uid to set +        :param uid: the message uid to query by          :type uid: int -        """ -        # FIXME ----- use deferreds. -        with self._rdoc_property_lock[self.mbox]: -            self.recent_flags = self.recent_flags.union( -                set([uid])) - -    # individual doc getters, message layer. -    def _get_fdoc_from_chash(self, chash): +        :rtype: IMAPMessage          """ -        Return a flags document for this mailbox with a given chash. +        def make_imap_msg(msg): +            kls = self.messageklass +            # TODO --- remove ref to collection +            return kls(msg, self.collection) -        :return: A SoledadDocument containing the Flags Document, or None if -                 the query failed. -        :rtype: SoledadDocument or None. -        """ -        # USED from: -        # [ ] duplicated fdoc detection -        # [ ] _get_uid_from_msgidCb - -        # FIXME ----- use deferreds. -        curried = partial( -            self._soledad.get_from_index, -            fields.TYPE_MBOX_C_HASH_IDX, -            fields.TYPE_FLAGS_VAL, self.mbox, chash) -        curried.expected = "fdoc" -        fdoc = try_unique_query(curried) -        if fdoc is not None: -            return fdoc -        else: -            # probably this should be the other way round, -            # ie, try fist on memstore... -            cf = self.memstore._chash_fdoc_store -            fdoc = cf[chash][self.mbox] -            # hey, I just needed to wrap fdoc thing into -            # a "content" attribute, look a better way... -            if not empty(fdoc): -                return MessagePartDoc( -                    new=None, dirty=None, part=None, -                    store=None, doc_id=None, -                    content=fdoc) - -    def _get_uid_from_msgidCb(self, msgid): -        hdoc = None -        curried = partial( -            self._soledad.get_from_index, -            fields.TYPE_MSGID_IDX, -            fields.TYPE_HEADERS_VAL, msgid) -        curried.expected = "hdoc" -        hdoc = try_unique_query(curried) - -        # XXX this is only a quick hack to avoid regression -        # on the "multiple copies of the draft" issue, but -        # this is currently broken since  it's not efficient to -        # look for this. Should lookup better. -        # FIXME! - -        if hdoc is not None: -            hdoc_dict = hdoc.content +        d = self.collection.get_msg_by_uid(uid, absolute=absolute) +        d.addCalback(make_imap_msg) +        return d -        else: -            hdocstore = self.memstore._hdoc_store -            match = [x for _, x in hdocstore.items() if x['msgid'] == msgid] -            hdoc_dict = first(match) - -        if hdoc_dict is None: -            logger.warning("Could not find hdoc for msgid %s" -                           % (msgid,)) -            return None -        msg_chash = hdoc_dict.get(fields.CONTENT_HASH_KEY) - -        fdoc = self._get_fdoc_from_chash(msg_chash) -        if not fdoc: -            logger.warning("Could not find fdoc for msgid %s" -                           % (msgid,)) -            return None -        return fdoc.content.get(fields.UID_KEY, None) +    # TODO -- move this to collection too +    # Used for the Search (Drafts) queries?      def _get_uid_from_msgid(self, msgid):          """          Return a UID for a given message-id. @@ -972,15 +464,10 @@ class MessageCollection(WithMsgFields, IndexedDB, MBoxParser):          :return: A UID, or None          """ -        # We need to wait a little bit, cause in some of the cases -        # the query is received right after we've saved the document, -        # and we cannot find it otherwise. This seems to be enough. - -        # XXX do a deferLater instead ?? -        # XXX is this working?          return self._get_uid_from_msgidCb(msgid) -    def set_flags(self, mbox, messages, flags, mode, observer): +    # TODO handle deferreds +    def set_flags(self, messages, flags, mode):          """          Set flags for a sequence of messages. @@ -1000,142 +487,27 @@ class MessageCollection(WithMsgFields, IndexedDB, MBoxParser):          getmsg = self.get_msg_by_uid          def set_flags(uid, flags, mode): -            msg = getmsg(uid, mem_only=True, flags_only=True) +            msg = getmsg(uid)              if msg is not None: +                # XXX IMAPMessage needs access to the collection +                # to be able to set flags. Better if we make use +                # of collection... here.                  return uid, msg.setFlags(flags, mode)          setted_flags = [set_flags(uid, flags, mode) for uid in messages]          result = dict(filter(None, setted_flags)) +        # XXX return gatherResults or something +        return result -        # TODO -- remove -        reactor.callFromThread(observer.callback, result) - -    # getters: generic for a mailbox - -    def get_msg_by_uid(self, uid, mem_only=False, flags_only=False): -        """ -        Retrieves a LeapMessage by UID. -        This is used primarity in the Mailbox fetch and store methods. - -        :param uid: the message uid to query by -        :type uid: int -        :param mem_only: a flag that indicates whether this Message should -                         pass a reference to soledad to retrieve missing pieces -                         or not. -        :type mem_only: bool -        :param flags_only: whether the message should carry only a reference -                           to the flags document. -        :type flags_only: bool - -        :return: A LeapMessage instance matching the query, -                 or None if not found. -        :rtype: LeapMessage -        """ -        msg_container = self.memstore.get_message( -            self.mbox, uid, flags_only=flags_only) - -        if msg_container is not None: -            if mem_only: -                msg = IMAPMessage(None, uid, self.mbox, collection=self, -                                  container=msg_container) -            else: -                # We pass a reference to soledad just to be able to retrieve -                # missing parts that cannot be found in the container, like -                # the content docs after a copy. -                msg = IMAPMessage(self._soledad, uid, self.mbox, -                                  collection=self, container=msg_container) -        else: -            msg = IMAPMessage(self._soledad, uid, self.mbox, collection=self) - -        if not msg.does_exist(): -            return None -        return msg - -    # FIXME --- used where ? --------------------------------------------- -    #def get_all_docs(self, _type=fields.TYPE_FLAGS_VAL): -        #""" -        #Get all documents for the selected mailbox of the -        #passed type. By default, it returns the flag docs. -# -        #If you want acess to the content, use __iter__ instead -# -        #:return: a Deferred, that will fire with a list of u1db documents -        #:rtype: Deferred (promise of list of SoledadDocument) -        #""" -        #if _type not in fields.__dict__.values(): -            #raise TypeError("Wrong type passed to get_all_docs") -# -        # FIXME ----- either raise or return a deferred wrapper. -        #if sameProxiedObjects(self._soledad, None): -            #logger.warning('Tried to get messages but soledad is None!') -            #return [] -# -        #def get_sorted_docs(docs): -            #all_docs = [doc for doc in docs] -            # inneficient, but first let's grok it and then -            # let's worry about efficiency. -            # XXX FIXINDEX -- should implement order by in soledad -            # FIXME ---------------------------------------------- -            #return sorted(all_docs, key=lambda item: item.content['uid']) -# -        #d = self._soledad.get_from_index( -            #fields.TYPE_MBOX_IDX, _type, self.mbox) -        #d.addCallback(get_sorted_docs) -        #return d - -    def all_soledad_uid_iter(self): -        """ -        Return an iterator through the UIDs of all messages, sorted in -        ascending order. -        """ -        # XXX FIXME ------ sorted??? - -        def get_uids(docs): -            return set([ -                doc.content[self.UID_KEY] for doc in docs if not empty(doc)]) - -        d = self._soledad.get_from_index( -            fields.TYPE_MBOX_IDX, fields.TYPE_FLAGS_VAL, self.mbox) -        d.addCallback(get_uids) -        return d - -    def all_uid_iter(self): -        """ -        Return an iterator through the UIDs of all messages, from memory. +    def count(self):          """ -        mem_uids = self.memstore.get_uids(self.mbox) -        soledad_known_uids = self.memstore.get_soledad_known_uids( -            self.mbox) -        combined = tuple(set(mem_uids).union(soledad_known_uids)) -        return combined +        Return the count of messages for this mailbox. -    def get_all_soledad_flag_docs(self): +        :rtype: int          """ -        Return a dict with the content of all the flag documents -        in soledad store for the given mbox. +        return self.collection.count() -        :param mbox: the mailbox -        :type mbox: str or unicode -        :rtype: dict -        """ -        # XXX we really could return a reduced version with -        # just {'uid': (flags-tuple,) since the prefetch is -        # only oriented to get the flag tuples. - -        def get_content(docs): -            all_docs = [( -                doc.content[self.UID_KEY], -                dict(doc.content)) -                for doc in docs -                if not empty(doc.content)] -            all_flags = dict(all_docs) -            return all_flags - -        d = self._soledad.get_from_index( -            fields.TYPE_MBOX_IDX, -            fields.TYPE_FLAGS_VAL, self.mbox) -        d.addCallback(get_content) -        return d +    # headers query      def all_headers(self):          """ @@ -1144,15 +516,9 @@ class MessageCollection(WithMsgFields, IndexedDB, MBoxParser):          :rtype: dict          """ -        return self.memstore.all_headers(self.mbox) - -    def count(self): -        """ -        Return the count of messages for this mailbox. - -        :rtype: int -        """ -        return self.memstore.count(self.mbox) +        # Use self.collection.mbox_indexer +        # and derive all the doc_ids for the hdocs +        raise NotImplementedError()      # unseen messages @@ -1164,7 +530,7 @@ class MessageCollection(WithMsgFields, IndexedDB, MBoxParser):          :return: iterator through unseen message doc UIDs          :rtype: iterable          """ -        return self.memstore.unseen_iter(self.mbox) +        raise NotImplementedError()      def count_unseen(self):          """ @@ -1182,13 +548,12 @@ class MessageCollection(WithMsgFields, IndexedDB, MBoxParser):          :returns: a list of LeapMessages          :rtype: list          """ -        return [IMAPMessage(self._soledad, docid, self.mbox, collection=self) -                for docid in self.unseen_iter()] +        raise NotImplementedError() +        #return [self.messageklass(self._soledad, doc_id, self.mbox) +                #for doc_id in self.unseen_iter()]      # recent messages -    # XXX take it from memstore -    # XXX Used somewhere?      def count_recent(self):          """          Count all messages with the `Recent` flag. @@ -1199,32 +564,22 @@ class MessageCollection(WithMsgFields, IndexedDB, MBoxParser):          :returns: count          :rtype: int          """ -        return len(self.recent_flags) +        raise NotImplementedError() + +    # magic      def __len__(self):          """          Returns the number of messages on this mailbox. -          :rtype: int          """          return self.count() -    def __iter__(self): -        """ -        Returns an iterator over all messages. - -        :returns: iterator of dicts with content for all messages. -        :rtype: iterable -        """ -        return (IMAPMessage(self._soledad, docuid, self.mbox, collection=self) -                for docuid in self.all_uid_iter()) -      def __repr__(self):          """          Representation string for this object.          """ -        return u"<MessageCollection: mbox '%s' (%s)>" % ( -            self.mbox, self.count()) +        return u"<IMAPMessageCollection: mbox '%s' (%s)>" % ( +            self.mbox_name, self.count()) -    # XXX should implement __eq__ also !!! -    # use chash... +    # TODO implement __iter__ ? diff --git a/mail/src/leap/mail/imap/soledadstore.py b/mail/src/leap/mail/imap/soledadstore.py deleted file mode 100644 index fc8ea55..0000000 --- a/mail/src/leap/mail/imap/soledadstore.py +++ /dev/null @@ -1,617 +0,0 @@ -# -*- coding: utf-8 -*- -# soledadstore.py -# Copyright (C) 2014 LEAP -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program.  If not, see <http://www.gnu.org/licenses/>. -""" -A MessageStore that writes to Soledad. -""" -import logging -import threading - -from collections import defaultdict -from itertools import chain - -from u1db import errors as u1db_errors -from twisted.python import log -from zope.interface import implements - -from leap.common.check import leap_assert_type, leap_assert -from leap.mail.decorators import deferred_to_thread -from leap.mail.imap.messageparts import MessagePartType -from leap.mail.imap.messageparts import MessageWrapper -from leap.mail.imap.messageparts import RecentFlagsDoc -from leap.mail.imap.fields import fields -from leap.mail.imap.interfaces import IMessageStore -from leap.mail.messageflow import IMessageConsumer -from leap.mail.utils import first, empty, accumulator_queue - -logger = logging.getLogger(__name__) - - -class ContentDedup(object): -    """ -    Message deduplication. - -    We do a query for the content hashes before writing to our beloved -    sqlcipher backend of Soledad. This means, by now, that: - -    1. We will not store the same body/attachment twice, only the hash of it. -    2. We will not store the same message header twice, only the hash of it. - -    The first case is useful if you are always receiving the same old memes -    from unwary friends that still have not discovered that 4chan is the -    generator of the internet. The second will save your day if you have -    initiated session with the same account in two different machines. I also -    wonder why would you do that, but let's respect each other choices, like -    with the religious celebrations, and assume that one day we'll be able -    to run Bitmask in completely free phones. Yes, I mean that, the whole GSM -    Stack. -    """ -    # TODO refactor using unique_query - -    def _header_does_exist(self, doc): -        """ -        Check whether we already have a header document for this -        content hash in our database. - -        :param doc: tentative header for document -        :type doc: dict -        :returns: True if it exists, False otherwise. -        """ -        if not doc: -            return False -        chash = doc[fields.CONTENT_HASH_KEY] -        header_docs = self._soledad.get_from_index( -            fields.TYPE_C_HASH_IDX, -            fields.TYPE_HEADERS_VAL, str(chash)) -        if not header_docs: -            return False - -        # FIXME enable only to debug this problem. -        #if len(header_docs) != 1: -            #logger.warning("Found more than one copy of chash %s!" -                           #% (chash,)) - -        #logger.debug("Found header doc with that hash! Skipping save!") -        return True - -    def _content_does_exist(self, doc): -        """ -        Check whether we already have a content document for a payload -        with this hash in our database. - -        :param doc: tentative content for document -        :type doc: dict -        :returns: True if it exists, False otherwise. -        """ -        if not doc: -            return False -        phash = doc[fields.PAYLOAD_HASH_KEY] -        attach_docs = self._soledad.get_from_index( -            fields.TYPE_P_HASH_IDX, -            fields.TYPE_CONTENT_VAL, str(phash)) -        if not attach_docs: -            return False - -        # FIXME enable only to debug this problem -        #if len(attach_docs) != 1: -            #logger.warning("Found more than one copy of phash %s!" -                           #% (phash,)) -        #logger.debug("Found attachment doc with that hash! Skipping save!") -        return True - - -class MsgWriteError(Exception): -    """ -    Raised if any exception is found while saving message parts. -    """ -    pass - - -""" -A lock per document. -""" -# TODO should bound the space of this!!! -# http://stackoverflow.com/a/2437645/1157664 -# Setting this to twice the number of threads in the threadpool -# should be safe. - -put_locks = defaultdict(lambda: threading.Lock()) -mbox_doc_locks = defaultdict(lambda: threading.Lock()) - - -class SoledadStore(ContentDedup): -    """ -    This will create docs in the local Soledad database. -    """ -    _remove_lock = threading.Lock() - -    implements(IMessageConsumer, IMessageStore) - -    def __init__(self, soledad): -        """ -        Initialize the permanent store that writes to Soledad database. - -        :param soledad: the soledad instance -        :type soledad: Soledad -        """ -        from twisted.internet import reactor -        self.reactor = reactor - -        self._soledad = soledad - -        self._CREATE_DOC_FUN = self._soledad.create_doc -        self._PUT_DOC_FUN = self._soledad.put_doc -        self._GET_DOC_FUN = self._soledad.get_doc - -        # we instantiate an accumulator to batch the notifications -        self.docs_notify_queue = accumulator_queue( -            lambda item: reactor.callFromThread(self._unset_new_dirty, item), -            20) - -    # IMessageStore - -    # ------------------------------------------------------------------- -    # We are not yet using this interface, but it would make sense -    # to implement it. - -    def create_message(self, mbox, uid, message): -        """ -        Create the passed message into this SoledadStore. - -        :param mbox: the mbox this message belongs. -        :type mbox: str or unicode -        :param uid: the UID that identifies this message in this mailbox. -        :type uid: int -        :param message: a IMessageContainer implementor. -        """ -        raise NotImplementedError() - -    def put_message(self, mbox, uid, message): -        """ -        Put the passed existing message into this SoledadStore. - -        :param mbox: the mbox this message belongs. -        :type mbox: str or unicode -        :param uid: the UID that identifies this message in this mailbox. -        :type uid: int -        :param message: a IMessageContainer implementor. -        """ -        raise NotImplementedError() - -    def remove_message(self, mbox, uid): -        """ -        Remove the given message from this SoledadStore. - -        :param mbox: the mbox this message belongs. -        :type mbox: str or unicode -        :param uid: the UID that identifies this message in this mailbox. -        :type uid: int -        """ -        raise NotImplementedError() - -    def get_message(self, mbox, uid): -        """ -        Get a IMessageContainer for the given mbox and uid combination. - -        :param mbox: the mbox this message belongs. -        :type mbox: str or unicode -        :param uid: the UID that identifies this message in this mailbox. -        :type uid: int -        """ -        raise NotImplementedError() - -    # IMessageConsumer - -    # TODO should handle the delete case -    # TODO should handle errors better -    # TODO could generalize this method into a generic consumer -    # and only implement `process` here - -    def consume(self, queue): -        """ -        Creates a new document in soledad db. - -        :param queue: a tuple of queues to get item from, with content of the -                      document to be inserted. -        :type queue: tuple of Queues -        """ -        new, dirty = queue -        while not new.empty(): -            doc_wrapper = new.get() -            self.reactor.callInThread(self._consume_doc, doc_wrapper, -                                      self.docs_notify_queue) -        while not dirty.empty(): -            doc_wrapper = dirty.get() -            self.reactor.callInThread(self._consume_doc, doc_wrapper, -                                      self.docs_notify_queue) - -        # Queue empty, flush the notifications queue. -        self.docs_notify_queue(None, flush=True) - -    def _unset_new_dirty(self, doc_wrapper): -        """ -        Unset the `new` and `dirty` flags for this document wrapper in the -        memory store. - -        :param doc_wrapper: a MessageWrapper instance -        :type doc_wrapper: MessageWrapper -        """ -        if isinstance(doc_wrapper, MessageWrapper): -            # XXX still needed for debug quite often -            #logger.info("unsetting new flag!") -            doc_wrapper.new = False -            doc_wrapper.dirty = False - -    @deferred_to_thread -    def _consume_doc(self, doc_wrapper, notify_queue): -        """ -        Consume each document wrapper in a separate thread. -        We pass an instance of an accumulator that handles the notifications -        to the memorystore when the write has been done. - -        :param doc_wrapper: a MessageWrapper or RecentFlagsDoc instance -        :type doc_wrapper: MessageWrapper or RecentFlagsDoc -        :param notify_queue: a callable that handles the writeback -                             notifications to the memstore. -        :type notify_queue: callable -        """ -        def queueNotifyBack(failed, doc_wrapper): -            if failed: -                log.msg("There was an error writing the mesage...") -            else: -                notify_queue(doc_wrapper) - -        def doSoledadCalls(items): -            # we prime the generator, that should return the -            # message or flags wrapper item in the first place. -            try: -                doc_wrapper = items.next() -            except StopIteration: -                pass -            else: -                failed = self._soledad_write_document_parts(items) -                queueNotifyBack(failed, doc_wrapper) - -        doSoledadCalls(self._iter_wrapper_subparts(doc_wrapper)) - -    # -    # SoledadStore specific methods. -    # - -    def _soledad_write_document_parts(self, items): -        """ -        Write the document parts to soledad in a separate thread. - -        :param items: the iterator through the different document wrappers -                      payloads. -        :type items: iterator -        :return: whether the write was successful or not -        :rtype: bool -        """ -        failed = False -        for item, call in items: -            if empty(item): -                continue -            try: -                self._try_call(call, item) -            except Exception as exc: -                logger.debug("ITEM WAS: %s" % repr(item)) -                if hasattr(item, 'content'): -                    logger.debug("ITEM CONTENT WAS: %s" % -                                 repr(item.content)) -                logger.exception(exc) -                failed = True -                continue -        return failed - -    def _iter_wrapper_subparts(self, doc_wrapper): -        """ -        Return an iterator that will yield the doc_wrapper in the first place, -        followed by the subparts item and the proper call type for every -        item in the queue, if any. - -        :param doc_wrapper: a MessageWrapper or RecentFlagsDoc instance -        :type doc_wrapper: MessageWrapper or RecentFlagsDoc -        """ -        if isinstance(doc_wrapper, MessageWrapper): -            return chain((doc_wrapper,), -                         self._get_calls_for_msg_parts(doc_wrapper)) -        elif isinstance(doc_wrapper, RecentFlagsDoc): -            return chain((doc_wrapper,), -                         self._get_calls_for_rflags_doc(doc_wrapper)) -        else: -            logger.warning("CANNOT PROCESS ITEM!") -            return (i for i in []) - -    def _try_call(self, call, item): -        """ -        Try to invoke a given call with item as a parameter. - -        :param call: the function to call -        :type call: callable -        :param item: the payload to pass to the call as argument -        :type item: object -        """ -        if call is None: -            return - -        if call == self._PUT_DOC_FUN: -            doc_id = item.doc_id -            if doc_id is None: -                logger.warning("BUG! Dirty doc but has no doc_id!") -                return -            with put_locks[doc_id]: -                doc = self._GET_DOC_FUN(doc_id) - -                if doc is None: -                    logger.warning("BUG! Dirty doc but could not " -                                   "find document %s" % (doc_id,)) -                    return - -                doc.content = dict(item.content) - -                item = doc -                try: -                    call(item) -                except u1db_errors.RevisionConflict as exc: -                    logger.exception("Error: %r" % (exc,)) -                    raise exc -                except Exception as exc: -                    logger.exception("Error: %r" % (exc,)) -                    raise exc - -        else: -            try: -                call(item) -            except u1db_errors.RevisionConflict as exc: -                logger.exception("Error: %r" % (exc,)) -                raise exc -            except Exception as exc: -                logger.exception("Error: %r" % (exc,)) -                raise exc - -    def _get_calls_for_msg_parts(self, msg_wrapper): -        """ -        Generator that return the proper call type for a given item. - -        :param msg_wrapper: A MessageWrapper -        :type msg_wrapper: IMessageContainer -        :return: a generator of tuples with recent-flags doc payload -                 and callable -        :rtype: generator -        """ -        call = None - -        if msg_wrapper.new: -            call = self._CREATE_DOC_FUN - -            # item is expected to be a MessagePartDoc -            for item in msg_wrapper.walk(): -                if item.part == MessagePartType.fdoc: -                    yield dict(item.content), call - -                elif item.part == MessagePartType.hdoc: -                    if not self._header_does_exist(item.content): -                        yield dict(item.content), call - -                elif item.part == MessagePartType.cdoc: -                    if not self._content_does_exist(item.content): -                        yield dict(item.content), call - -        # For now, the only thing that will be dirty is -        # the flags doc. - -        elif msg_wrapper.dirty: -            call = self._PUT_DOC_FUN -            # item is expected to be a MessagePartDoc -            for item in msg_wrapper.walk(): -                # XXX FIXME Give error if dirty and not doc_id !!! -                doc_id = item.doc_id  # defend! -                if not doc_id: -                    logger.warning("Dirty item but no doc_id!") -                    continue - -                if item.part == MessagePartType.fdoc: -                    yield item, call - -                # XXX also for linkage-doc !!! -        else: -            logger.error("Cannot delete documents yet from the queue...!") - -    def _get_calls_for_rflags_doc(self, rflags_wrapper): -        """ -        We always put these documents. - -        :param rflags_wrapper: A wrapper around recent flags doc. -        :type rflags_wrapper: RecentFlagsWrapper -        :return: a tuple with recent-flags doc payload and callable -        :rtype: tuple -        """ -        call = self._PUT_DOC_FUN - -        payload = rflags_wrapper.content -        if payload: -            logger.debug("Saving RFLAGS to Soledad...") -            yield rflags_wrapper, call - -    # Mbox documents and attributes - -    def get_mbox_document(self, mbox): -        """ -        Return mailbox document. - -        :param mbox: the mailbox -        :type mbox: str or unicode -        :return: A SoledadDocument containing this mailbox, or None if -                 the query failed. -        :rtype: SoledadDocument or None. -        """ -        with mbox_doc_locks[mbox]: -            return self._get_mbox_document(mbox) - -    def _get_mbox_document(self, mbox): -        """ -        Helper for returning the mailbox document. -        """ -        try: -            query = self._soledad.get_from_index( -                fields.TYPE_MBOX_IDX, -                fields.TYPE_MBOX_VAL, mbox) -            if query: -                return query.pop() -            else: -                logger.error("Could not find mbox document for %r" % -                             (mbox,)) -        except Exception as exc: -            logger.exception("Unhandled error %r" % exc) - -    def get_mbox_closed(self, mbox): -        """ -        Return the closed attribute for a given mailbox. - -        :param mbox: the mailbox -        :type mbox: str or unicode -        :rtype: bool -        """ -        mbox_doc = self.get_mbox_document() -        return mbox_doc.content.get(fields.CLOSED_KEY, False) - -    def set_mbox_closed(self, mbox, closed): -        """ -        Set the closed attribute for a given mailbox. - -        :param mbox: the mailbox -        :type mbox: str or unicode -        :param closed:  the value to be set -        :type closed: bool -        """ -        leap_assert(isinstance(closed, bool), "closed needs to be boolean") -        with mbox_doc_locks[mbox]: -            mbox_doc = self._get_mbox_document(mbox) -            if mbox_doc is None: -                logger.error( -                    "Could not find mbox document for %r" % (mbox,)) -                return -            mbox_doc.content[fields.CLOSED_KEY] = closed -            self._soledad.put_doc(mbox_doc) - -    def write_last_uid(self, mbox, value): -        """ -        Write the `last_uid` integer to the proper mailbox document -        in Soledad. -        This is called from the deferred triggered by -        memorystore.increment_last_soledad_uid, which is expected to -        run in a separate thread. - -        :param mbox: the mailbox -        :type mbox: str or unicode -        :param value: the value to set -        :type value: int -        """ -        leap_assert_type(value, int) -        key = fields.LAST_UID_KEY - -        # XXX use accumulator to reduce number of hits -        with mbox_doc_locks[mbox]: -            mbox_doc = self._get_mbox_document(mbox) -            old_val = mbox_doc.content[key] -            if value > old_val: -                mbox_doc.content[key] = value -                try: -                    self._soledad.put_doc(mbox_doc) -                except Exception as exc: -                    logger.error("Error while setting last_uid for %r" -                                 % (mbox,)) -                    logger.exception(exc) - -    def get_flags_doc(self, mbox, uid): -        """ -        Return the SoledadDocument for the given mbox and uid. - -        :param mbox: the mailbox -        :type mbox: str or unicode -        :param uid: the UID for the message -        :type uid: int -        :rtype: SoledadDocument or None -        """ -        # TODO -- inlineCallbacks -        result = None -        try: -            # TODO -- yield -            flag_docs = self._soledad.get_from_index( -                fields.TYPE_MBOX_UID_IDX, -                fields.TYPE_FLAGS_VAL, mbox, str(uid)) -            if len(flag_docs) != 1: -                logger.warning("More than one flag doc for %r:%s" % -                               (mbox, uid)) -            result = first(flag_docs) -        except Exception as exc: -            # ugh! Something's broken down there! -            logger.warning("ERROR while getting flags for UID: %s" % uid) -            logger.exception(exc) -        finally: -            return result - -    def get_headers_doc(self, chash): -        """ -        Return the document that keeps the headers for a message -        indexed by its content-hash. - -        :param chash: the content-hash to retrieve the document from. -        :type chash: str or unicode -        :rtype: SoledadDocument or None -        """ -        head_docs = self._soledad.get_from_index( -            fields.TYPE_C_HASH_IDX, -            fields.TYPE_HEADERS_VAL, str(chash)) -        return first(head_docs) - -    # deleted messages - -    def deleted_iter(self, mbox): -        """ -        Get an iterator for the the doc_id for SoledadDocuments for messages -        with \\Deleted flag for a given mailbox. - -        :param mbox: the mailbox -        :type mbox: str or unicode -        :return: iterator through deleted message docs -        :rtype: iterable -        """ -        return [doc.doc_id for doc in self._soledad.get_from_index( -                fields.TYPE_MBOX_DEL_IDX, -                fields.TYPE_FLAGS_VAL, mbox, '1')] - -    def remove_all_deleted(self, mbox): -        """ -        Remove from Soledad all messages flagged as deleted for a given -        mailbox. - -        :param mbox: the mailbox -        :type mbox: str or unicode -        """ -        deleted = [] -        for doc_id in self.deleted_iter(mbox): -            with self._remove_lock: -                doc = self._soledad.get_doc(doc_id) -                if doc is not None: -                    self._soledad.delete_doc(doc) -                    try: -                        deleted.append(doc.content[fields.UID_KEY]) -                    except TypeError: -                        # empty content -                        pass -        return deleted diff --git a/mail/src/leap/mail/mail.py b/mail/src/leap/mail/mail.py index ca07f67..482b64d 100644 --- a/mail/src/leap/mail/mail.py +++ b/mail/src/leap/mail/mail.py @@ -20,6 +20,7 @@ Generic Access to Mail objects: Public LEAP Mail API.  from twisted.internet import defer  from leap.mail.constants import INBOX_NAME +from leap.mail.constants import MessageFlags  from leap.mail.mailbox_indexer import MailboxIndexer  from leap.mail.adaptors.soledad import SoledadMailAdaptor @@ -61,6 +62,18 @@ class Message(object):      def get_internal_date(self):          """ +        Retrieve the date internally associated with this message + +        According to the spec, this is NOT the date and time in the +        RFC-822 header, but rather a date and time that reflects when the +        message was received. + +        * In SMTP, date and time of final delivery. +        * In COPY, internal date/time of the source message. +        * In APPEND, date/time specified. + +        :return: An RFC822-formatted date string. +        :rtype: str          """          return self._wrapper.fdoc.date @@ -99,6 +112,15 @@ class Message(object):          return tuple(self._wrapper.fdoc.tags) +class Flagsmode(object): +    """ +    Modes for setting the flags/tags. +    """ +    APPEND = 1 +    REMOVE = -1 +    SET = 0 + +  class MessageCollection(object):      """      A generic collection of messages. It can be messages sharing the same @@ -132,6 +154,7 @@ class MessageCollection(object):      def __init__(self, adaptor, store, mbox_indexer=None, mbox_wrapper=None):          """ +        Constructore for a MessageCollection.          """          self.adaptor = adaptor          self.store = store @@ -149,6 +172,20 @@ class MessageCollection(object):          """          return bool(self.mbox_wrapper) +    @property +    def mbox_name(self): +        wrapper = getattr(self, "mbox_wrapper", None) +        if not wrapper: +            return None +        return wrapper.mbox + +    def get_mbox_attr(self, attr): +        return getattr(self.mbox_wrapper, attr) + +    def set_mbox_attr(self, attr, value): +        setattr(self.mbox_wrapper, attr, value) +        return self.mbox_wrapper.update(self.store) +      # Get messages      def get_message_by_content_hash(self, chash, get_cdocs=False): @@ -162,7 +199,7 @@ class MessageCollection(object):              # or use the internal collection of pointers-to-docs.              raise NotImplementedError() -        metamsg_id = _get_mdoc_id(self.mbox_wrapper.mbox, chash) +        metamsg_id = _get_mdoc_id(self.mbox_name, chash)          return self.adaptor.get_msg_from_mdoc_id(              self.messageklass, self.store, @@ -181,25 +218,37 @@ class MessageCollection(object):              raise NotImplementedError("Does not support relative ids yet")          def get_msg_from_mdoc_id(doc_id): +            # XXX pass UID?              return self.adaptor.get_msg_from_mdoc_id(                  self.messageklass, self.store,                  doc_id, get_cdocs=get_cdocs) -        d = self.mbox_indexer.get_doc_id_from_uid(self.mbox_wrapper.mbox, uid) +        d = self.mbox_indexer.get_doc_id_from_uid(self.mbox_name, uid)          d.addCallback(get_msg_from_mdoc_id)          return d      def count(self):          """          Count the messages in this collection. -        :rtype: int +        :return: a Deferred that will fire with the integer for the count. +        :rtype: Deferred          """          if not self.is_mailbox_collection():              raise NotImplementedError() -        return self.mbox_indexer.count(self.mbox_wrapper.mbox) +        return self.mbox_indexer.count(self.mbox_name) + +    def get_uid_next(self): +        """ +        Get the next integer beyond the highest UID count for this mailbox. + +        :return: a Deferred that will fire with the integer for the next uid. +        :rtype: Deferred +        """ +        return self.mbox_indexer.get_uid_next(self.mbox_name)      # Manipulate messages +    # TODO pass flags, date too...      def add_msg(self, raw_msg):          """          Add a message to this collection. @@ -208,14 +257,14 @@ class MessageCollection(object):          wrapper = msg.get_wrapper()          if self.is_mailbox_collection(): -            mbox = self.mbox_wrapper.mbox +            mbox = self.mbox_name              wrapper.set_mbox(mbox)          def insert_mdoc_id(_):              # XXX does this work?              doc_id = wrapper.mdoc.doc_id              return self.mbox_indexer.insert_doc( -                self.mbox_wrapper.mbox, doc_id) +                self.mbox_name, doc_id)          d = wrapper.create(self.store)          d.addCallback(insert_mdoc_id) @@ -248,31 +297,45 @@ class MessageCollection(object):              # XXX does this work?              doc_id = wrapper.mdoc.doc_id              return self.mbox_indexer.delete_doc_by_hash( -                self.mbox_wrapper.mbox, doc_id) +                self.mbox_name, doc_id)          d = wrapper.delete(self.store)          d.addCallback(delete_mdoc_id)          return d      # TODO should add a delete-by-uid to collection? +    def _update_flags_or_tags(self, old, new, mode): +        if mode == Flagsmode.APPEND: +            final = list((set(tuple(old) + new))) +        elif mode == Flagsmode.REMOVE: +            final = list(set(old).difference(set(new))) +        elif mode == Flagsmode.SET: +            final = new +        return final +      def udpate_flags(self, msg, flags, mode):          """          Update flags for a given message.          """          wrapper = msg.get_wrapper() -        # 1. update the flags in the message wrapper --- stored where??? -        # 2. update the special flags in the wrapper (seen, etc) -        # 3. call adaptor.update_msg(store) -        pass +        current = wrapper.fdoc.flags +        newflags = self._update_flags_or_tags(current, flags, mode) +        wrapper.fdoc.flags = newflags + +        wrapper.fdoc.seen = MessageFlags.SEEN_FLAG in newflags +        wrapper.fdoc.deleted = MessageFlags.DELETED_FLAG in newflags + +        return self.adaptor.update_msg(self.store, msg)      def update_tags(self, msg, tags, mode):          """          Update tags for a given message.          """          wrapper = msg.get_wrapper() -        # 1. update the tags in the message wrapper --- stored where??? -        # 2. call adaptor.update_msg(store) -        pass +        current = wrapper.fdoc.tags +        newtags = self._update_flags_or_tags(current, tags, mode) +        wrapper.fdoc.tags = newtags +        return self.adaptor.update_msg(self.store, msg)  class Account(object): @@ -382,6 +445,8 @@ class Account(object):          d.addCallback(rename_uid_table_cb)          return d +    # Get Collections +      def get_collection_by_mailbox(self, name):          """          :rtype: MessageCollection diff --git a/mail/src/leap/mail/messageflow.py b/mail/src/leap/mail/messageflow.py deleted file mode 100644 index c8f224c..0000000 --- a/mail/src/leap/mail/messageflow.py +++ /dev/null @@ -1,200 +0,0 @@ -# -*- coding: utf-8 -*- -# messageflow.py -# Copyright (C) 2013 LEAP -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program.  If not, see <http://www.gnu.org/licenses/>. -""" -Message Producers and Consumers for flow control. -""" -import Queue - -from twisted.internet.task import LoopingCall - -from zope.interface import Interface, implements - - -class IMessageConsumer(Interface): -    """ -    I consume messages from a queue. -    """ - -    def consume(self, queue): -        """ -        Consumes the passed item. - -        :param item: a queue where we put the object to be consumed. -        :type item: object -        """ -        # TODO we could add an optional type to be passed -        # for doing type check. - -        # TODO in case of errors, we could return the object to -        # the queue, maybe wrapped in an object with a retries attribute. - - -class IMessageProducer(Interface): -    """ -    I produce messages and put them in a store to be consumed by other -    entities. -    """ - -    def push(self, item, state=None): -        """ -        Push a new item in the queue. -        """ - -    def start(self): -        """ -        Start producing items. -        """ - -    def stop(self): -        """ -        Stop producing items. -        """ - -    def flush(self): -        """ -        Flush queued messages to consumer. -        """ - - -class DummyMsgConsumer(object): - -    implements(IMessageConsumer) - -    def consume(self, queue): -        """ -        Just prints the passed item. -        """ -        if not queue.empty(): -            print "got item %s" % queue.get() - - -class MessageProducer(object): -    """ -    A Producer class that we can use to temporarily buffer the production -    of messages so that different objects can consume them. - -    This is useful for serializing the consumption of the messages stream -    in the case of an slow resource (db), or for returning early from a -    deferred chain and leave further processing detached from the calling loop, -    as in the case of smtp. -    """ -    implements(IMessageProducer) - -    # TODO this can be seen as a first step towards properly implementing -    # components that implement IPushProducer / IConsumer  interfaces. -    # However, I need to think more about how to pause the streaming. -    # In any case, the differential rate between message production -    # and consumption is not likely (?) to consume huge amounts of memory in -    # our current settings, so the need to pause the stream is not urgent now. - -    # TODO use enum -    STATE_NEW = 1 -    STATE_DIRTY = 2 - -    def __init__(self, consumer, queue=Queue.Queue, period=1): -        """ -        Initializes the MessageProducer - -        :param consumer: an instance of a IMessageConsumer that will consume -                         the new messages. -        :param queue: any queue implementation to be used as the temporary -                      buffer for new items. Default is a FIFO Queue. -        :param period: the period to check for new items, in seconds. -        """ -        # XXX should assert it implements IConsumer / IMailConsumer -        # it should implement a `consume` method -        self._consumer = consumer - -        self._queue_new = queue() -        self._queue_dirty = queue() -        self._period = period - -        self._loop = LoopingCall(self._check_for_new) - -    # private methods - -    def _check_for_new(self): -        """ -        Check for new items in the internal queue, and calls the consume -        method in the consumer. - -        If the queue is found empty, the loop is stopped. It will be started -        again after the addition of new items. -        """ -        self._consumer.consume((self._queue_new, self._queue_dirty)) -        if self.is_queue_empty(): -            self.stop() - -    def is_queue_empty(self): -        """ -        Return True if queue is empty, False otherwise. -        """ -        new = self._queue_new -        dirty = self._queue_dirty -        return new.empty() and dirty.empty() - -    # public methods: IMessageProducer - -    def push(self, item, state=None): -        """ -        Push a new item in the queue. - -        If the queue was empty, we will start the loop again. -        """ -        # XXX this might raise if the queue does not accept any new -        # items. what to do then? -        queue = self._queue_new - -        if state == self.STATE_NEW: -            queue = self._queue_new -        if state == self.STATE_DIRTY: -            queue = self._queue_dirty - -        queue.put(item) -        self.start() - -    def start(self): -        """ -        Start polling for new items. -        """ -        if not self._loop.running: -            self._loop.start(self._period, now=True) - -    def stop(self): -        """ -        Stop polling for new items. -        """ -        if self._loop.running: -            self._loop.stop() - -    def flush(self): -        """ -        Flush queued messages to consumer. -        """ -        self._check_for_new() - - -if __name__ == "__main__": -    from twisted.internet import reactor -    producer = MessageProducer(DummyMsgConsumer()) -    producer.start() - -    for delay, item in ((2, 1), (3, 2), (4, 3), -                        (6, 4), (7, 5), (8, 6), (8.2, 7), -                        (15, 'a'), (16, 'b'), (17, 'c')): -        reactor.callLater(delay, producer.put, item) -    reactor.run() | 
