diff options
| -rw-r--r-- | mail/src/leap/mail/imap/account.py | 2 | ||||
| -rw-r--r-- | mail/src/leap/mail/imap/service/imap.py | 30 | ||||
| -rw-r--r-- | mail/src/leap/mail/incoming/__init__.py | 0 | ||||
| -rw-r--r-- | mail/src/leap/mail/incoming/service.py (renamed from mail/src/leap/mail/imap/fetch.py) | 91 | ||||
| -rw-r--r-- | mail/src/leap/mail/incoming/tests/__init__.py | 0 | ||||
| -rw-r--r-- | mail/src/leap/mail/incoming/tests/test_incoming_mail.py (renamed from mail/src/leap/mail/imap/tests/test_incoming_mail.py) | 93 | ||||
| -rw-r--r-- | mail/src/leap/mail/mail.py | 17 | 
7 files changed, 111 insertions, 122 deletions
| diff --git a/mail/src/leap/mail/imap/account.py b/mail/src/leap/mail/imap/account.py index 8a6e87eb..146d0663 100644 --- a/mail/src/leap/mail/imap/account.py +++ b/mail/src/leap/mail/imap/account.py @@ -63,7 +63,7 @@ class IMAPAccount(object):      selected = None      closed = False -    def __init__(self, user_id, store, d=None): +    def __init__(self, user_id, store, d=defer.Deferred()):          """          Keeps track of the mailboxes and subscriptions handled by this account. diff --git a/mail/src/leap/mail/imap/service/imap.py b/mail/src/leap/mail/imap/service/imap.py index 5d88a79d..93e4d62a 100644 --- a/mail/src/leap/mail/imap/service/imap.py +++ b/mail/src/leap/mail/imap/service/imap.py @@ -30,10 +30,9 @@ logger = logging.getLogger(__name__)  from leap.common import events as leap_events  from leap.common.check import leap_assert, leap_assert_type, leap_check -from leap.keymanager import KeyManager  from leap.mail.imap.account import IMAPAccount -from leap.mail.imap.fetch import LeapIncomingMail  from leap.mail.imap.server import LEAPIMAPServer +from leap.mail.incoming import IncomingMail  from leap.soledad.client import Soledad  from leap.common.events.events_pb2 import IMAP_SERVICE_STARTED @@ -55,10 +54,6 @@ if DO_PROFILE:  # The default port in which imap service will run  IMAP_PORT = 1984 -# The period between succesive checks of the incoming mail -# queue (in seconds) -INCOMING_CHECK_PERIOD = 60 -  class IMAPAuthRealm(object):      """ @@ -132,21 +127,16 @@ def run_service(*args, **kwargs):      """      Main entry point to run the service from the client. -    :returns: the LoopingCall instance that will have to be stoppped -              before shutting down the client, the port as returned by -              the reactor when starts listening, and the factory for -              the protocol. +    :returns: the port as returned by the reactor when starts listening, and +              the factory for the protocol.      """      leap_assert(len(args) == 2) -    soledad, keymanager = args +    soledad = args      leap_assert_type(soledad, Soledad) -    leap_assert_type(keymanager, KeyManager)      port = kwargs.get('port', IMAP_PORT) -    check_period = kwargs.get('check_period', INCOMING_CHECK_PERIOD)      userid = kwargs.get('userid', None)      leap_check(userid is not None, "need an user id") -    offline = kwargs.get('offline', False)      uuid = soledad.uuid      factory = LeapIMAPFactory(uuid, userid, soledad) @@ -154,16 +144,6 @@ def run_service(*args, **kwargs):      try:          tport = reactor.listenTCP(port, factory,                                    interface="localhost") -        if not offline: -            # FIXME --- update after meskio's work -            fetcher = LeapIncomingMail( -                keymanager, -                soledad, -                factory.theAccount, -                check_period, -                userid) -        else: -            fetcher = None      except CannotListenError:          logger.error("IMAP Service failed to start: "                       "cannot listen in port %s" % (port,)) @@ -186,7 +166,7 @@ def run_service(*args, **kwargs):          leap_events.signal(IMAP_SERVICE_STARTED, str(port))          # FIXME -- change service signature -        return fetcher, tport, factory +        return tport, factory      # not ok, signal error.      leap_events.signal(IMAP_SERVICE_FAILED_TO_START, str(port)) diff --git a/mail/src/leap/mail/incoming/__init__.py b/mail/src/leap/mail/incoming/__init__.py new file mode 100644 index 00000000..e69de29b --- /dev/null +++ b/mail/src/leap/mail/incoming/__init__.py diff --git a/mail/src/leap/mail/imap/fetch.py b/mail/src/leap/mail/incoming/service.py index dbc726ac..e52c7272 100644 --- a/mail/src/leap/mail/imap/fetch.py +++ b/mail/src/leap/mail/incoming/service.py @@ -1,6 +1,6 @@  # -*- coding: utf-8 -*- -# fetch.py -# Copyright (C) 2013 LEAP +# service.py +# Copyright (C) 2015 LEAP  #  # This program is free software: you can redistribute it and/or modify  # it under the terms of the GNU General Public License as published by @@ -31,6 +31,7 @@ from email.utils import parseaddr  from StringIO import StringIO  from urlparse import urlparse +from twisted.application.service import Service  from twisted.python import log  from twisted.internet import defer, reactor  from twisted.internet.task import LoopingCall @@ -49,8 +50,8 @@ from leap.common.events.events_pb2 import SOLEDAD_INVALID_AUTH_TOKEN  from leap.common.mail import get_email_charset  from leap.keymanager import errors as keymanager_errors  from leap.keymanager.openpgp import OpenPGPKey +from leap.mail.adaptors import soledad_indexes as fields  from leap.mail.decorators import deferred_to_thread -from leap.mail.imap.fields import fields  from leap.mail.utils import json_loads, empty, first  from leap.soledad.client import Soledad  from leap.soledad.common.crypto import ENC_SCHEME_KEY, ENC_JSON_KEY @@ -64,6 +65,10 @@ MULTIPART_SIGNED = "multipart/signed"  PGP_BEGIN = "-----BEGIN PGP MESSAGE-----"  PGP_END = "-----END PGP MESSAGE-----" +# The period between succesive checks of the incoming mail +# queue (in seconds) +INCOMING_CHECK_PERIOD = 60 +  class MalformedMessage(Exception):      """ @@ -72,12 +77,13 @@ class MalformedMessage(Exception):      pass -class LeapIncomingMail(object): +class IncomingMail(Service):      """      Fetches and process mail from the incoming pool. -    This object has public methods start_loop and stop that will -    actually initiate a LoopingCall with check_period recurrency. +    This object implements IService interface, has public methods +    startService and stopService that will actually initiate a +    LoopingCall with check_period recurrency.      The LoopingCall itself will invoke the fetch method each time      that the check_period expires. @@ -85,6 +91,8 @@ class LeapIncomingMail(object):      process all the documents found tagged as incoming mail.      """ +    name = "IncomingMail" +      RECENT_FLAG = "\\Recent"      CONTENT_KEY = "content" @@ -100,11 +108,11 @@ class LeapIncomingMail(object):      fetching_lock = threading.Lock() -    def __init__(self, keymanager, soledad, imap_account, -                 check_period, userid): +    def __init__(self, keymanager, soledad, inbox, userid, +                 check_period=INCOMING_CHECK_PERIOD):          """ -        Initialize LeapIncomingMail.. +        Initialize IncomingMail..          :param keymanager: a keymanager instance          :type keymanager: keymanager.KeyManager @@ -112,8 +120,8 @@ class LeapIncomingMail(object):          :param soledad: a soledad instance          :type soledad: Soledad -        :param imap_account: the account to fetch periodically -        :type imap_account: SoledadBackedAccount +        :param inbox: the inbox where the new emails will be stored +        :type inbox: IMAPMailbox          :param check_period: the period to fetch new mail, in seconds.          :type check_period: int @@ -127,8 +135,7 @@ class LeapIncomingMail(object):          self._keymanager = keymanager          self._soledad = soledad -        self.imapAccount = imap_account -        self._inbox = self.imapAccount.getMailbox('inbox') +        self._inbox = inbox          self._userid = userid          self._loop = None @@ -148,21 +155,22 @@ class LeapIncomingMail(object):          Calls a deferred that will execute the fetch callback          in a separate thread          """ -        def syncSoledadCallback(result): -            # FIXME this needs a matching change in mx!!! -            # --> need to add ERROR_DECRYPTING_KEY = False -            # as default. -            try: -                doclist = self._soledad.get_from_index( -                    fields.JUST_MAIL_IDX, "*", "0") -            except u1db_errors.InvalidGlobbing: +        def mail_compat(failure): +            if failure.check(u1db_errors.InvalidGlobbing):                  # It looks like we are a dealing with an outdated                  # mx. Fallback to the version of the index                  warnings.warn("JUST_MAIL_COMPAT_IDX will be deprecated!",                                DeprecationWarning) -                doclist = self._soledad.get_from_index( +                return self._soledad.get_from_index(                      fields.JUST_MAIL_COMPAT_IDX, "*") -            return self._process_doclist(doclist) +            return failure + +        def syncSoledadCallback(_): +            d = self._soledad.get_from_index( +                fields.JUST_MAIL_IDX, "*", "0") +            d.addErrback(mail_compat) +            d.addCallback(self._process_doclist) +            return d          logger.debug("fetching mail for: %s %s" % (              self._soledad.uuid, self._userid)) @@ -175,24 +183,25 @@ class LeapIncomingMail(object):          else:              logger.debug("Already fetching mail.") -    def start_loop(self): +    def startService(self):          """          Starts a loop to fetch mail.          """ +        Service.startService(self)          if self._loop is None:              self._loop = LoopingCall(self.fetch)              self._loop.start(self._check_period)          else:              logger.warning("Tried to start an already running fetching loop.") -    def stop(self): -        # XXX change the name to stop_loop, for consistency. +    def stopService(self):          """          Stops the loop that fetches mail.          """          if self._loop and self._loop.running is True:              self._loop.stop()              self._loop = None +        Service.stopService(self)      #      # Private methods. @@ -296,7 +305,7 @@ class LeapIncomingMail(object):      # operations on individual messages      # -    @deferred_to_thread +    #FIXME: @deferred_to_thread      def _decrypt_doc(self, doc):          """          Decrypt the contents of a document. @@ -319,15 +328,14 @@ class LeapIncomingMail(object):                  success = False              leap_events.signal(IMAP_MSG_DECRYPTED, "1" if success else "0") - -            data = self._process_decrypted_doc(doc, decrdata) -            return doc, data +            return self._process_decrypted_doc(doc, decrdata)          d = self._keymanager.decrypt(              doc.content[ENC_JSON_KEY],              self._userid, OpenPGPKey)          d.addErrback(self._errback)          d.addCallback(process_decrypted) +        d.addCallback(lambda data: (doc, data))          return d      def _process_decrypted_doc(self, doc, data): @@ -340,8 +348,9 @@ class LeapIncomingMail(object):                       message          :type data: str -        :return: the processed data. -        :rtype: str +        :return: a Deferred that will be fired with an str of the proccessed +                 data. +        :rtype: Deferred          """          log.msg('processing decrypted doc') @@ -409,8 +418,10 @@ class LeapIncomingMail(object):          :param data: the text to be decrypted.          :type data: str -        :return: data, possibly decrypted. -        :rtype: str + +        :return: a Deferred that will be fired with an str of data, possibly +                 decrypted. +        :rtype: Deferred          """          leap_assert_type(data, str)          log.msg('maybe decrypting doc') @@ -426,7 +437,8 @@ class LeapIncomingMail(object):                   or msg.get_content_type() == MULTIPART_SIGNED)):                  senderAddress = parseaddr(fromHeader) -        def add_leap_header(decrmsg, signkey): +        def add_leap_header(ret): +            decrmsg, signkey = ret              if (senderAddress is None or                      isinstance(signkey, keymanager_errors.KeyNotFound)):                  decrmsg.add_header( @@ -596,11 +608,11 @@ class LeapIncomingMail(object):          _, fromAddress = parseaddr(msg['from'])          header = msg.get(OpenPGP_HEADER, None) -        dh = defer.success() +        dh = defer.succeed(None)          if header is not None:              dh = self._extract_openpgp_header(header, fromAddress) -        da = defer.success() +        da = defer.succeed(None)          if msg.is_multipart():              da = self._extract_attached_key(msg.get_payload(), fromAddress) @@ -620,7 +632,7 @@ class LeapIncomingMail(object):          :return: A Deferred that will be fired when header extraction is done          :rtype: Deferred          """ -        d = defer.success() +        d = defer.succeed(None)          fields = dict([f.strip(' ').split('=') for f in header.split(';')])          if 'url' in fields:              url = shlex.split(fields['url'])[0]  # remove quotations @@ -704,8 +716,7 @@ class LeapIncomingMail(object):                  deferLater(reactor, 0, self._delete_incoming_message, doc)                  leap_events.signal(IMAP_MSG_DELETED_INCOMING) -        d = self._inbox.addMessage(data, flags=(self.RECENT_FLAG,), -                                   notify_on_disk=True) +        d = self._inbox.addMessage(data, (self.RECENT_FLAG,))          d.addCallbacks(msgSavedCallback, self._errback)          return d diff --git a/mail/src/leap/mail/incoming/tests/__init__.py b/mail/src/leap/mail/incoming/tests/__init__.py new file mode 100644 index 00000000..e69de29b --- /dev/null +++ b/mail/src/leap/mail/incoming/tests/__init__.py diff --git a/mail/src/leap/mail/imap/tests/test_incoming_mail.py b/mail/src/leap/mail/incoming/tests/test_incoming_mail.py index 03c0164c..bf95b1d1 100644 --- a/mail/src/leap/mail/imap/tests/test_incoming_mail.py +++ b/mail/src/leap/mail/incoming/tests/test_incoming_mail.py @@ -1,6 +1,6 @@  # -*- coding: utf-8 -*- -# test_imap.py -# Copyright (C) 2014 LEAP +# test_incoming_mail.py +# Copyright (C) 2015 LEAP  #  # This program is free software: you can redistribute it and/or modify  # it under the terms of the GNU General Public License as published by @@ -15,7 +15,7 @@  # You should have received a copy of the GNU General Public License  # along with this program.  If not, see <http://www.gnu.org/licenses/>.  """ -Test case for leap.email.imap.fetch +Test case for leap.mail.incoming.service  @authors: Ruben Pollan, <meskio@sindominio.net> @@ -28,13 +28,13 @@ from email.mime.application import MIMEApplication  from email.mime.multipart import MIMEMultipart  from email.parser import Parser  from mock import Mock +from twisted.internet import defer  from leap.keymanager.openpgp import OpenPGPKey -from leap.mail.imap.account import SoledadBackedAccount -from leap.mail.imap.fetch import LeapIncomingMail -from leap.mail.imap.fields import fields -from leap.mail.imap.memorystore import MemoryStore -from leap.mail.imap.service.imap import INCOMING_CHECK_PERIOD +from leap.mail.adaptors import soledad_indexes as fields +from leap.mail.constants import INBOX_NAME +from leap.mail.imap.account import IMAPAccount +from leap.mail.incoming.service import IncomingMail  from leap.mail.tests import (      TestCaseWithKeyManager,      ADDRESS, @@ -47,7 +47,7 @@ from leap.soledad.common.crypto import (  ) -class LeapIncomingMailTestCase(TestCaseWithKeyManager): +class IncomingMailTestCase(TestCaseWithKeyManager):      """      Tests for the incoming mail parser      """ @@ -71,28 +71,31 @@ subject: independence of cyberspace      }      def setUp(self): -        super(LeapIncomingMailTestCase, self).setUp() - -        # Soledad sync makes trial block forever. The sync it's mocked to fix -        # this problem. _mock_soledad_get_from_index can be used from the tests -        # to provide documents. -        self._soledad.sync = Mock() - -        memstore = MemoryStore() -        theAccount = SoledadBackedAccount( -            ADDRESS, -            soledad=self._soledad, -            memstore=memstore) -        self.fetcher = LeapIncomingMail( -            self._km, -            self._soledad, -            theAccount, -            INCOMING_CHECK_PERIOD, -            ADDRESS) +        def getInbox(_): +            theAccount = IMAPAccount(ADDRESS, self._soledad) +            return theAccount.callWhenReady( +                lambda _: theAccount.getMailbox(INBOX_NAME)) + +        def setUpFetcher(inbox): +            # Soledad sync makes trial block forever. The sync it's mocked to +            # fix this problem. _mock_soledad_get_from_index can be used from +            # the tests to provide documents. +            self._soledad.sync = Mock() + +            self.fetcher = IncomingMail( +                self._km, +                self._soledad, +                inbox, +                ADDRESS) + +        d = super(IncomingMailTestCase, self).setUp() +        d.addCallback(getInbox) +        d.addCallback(setUpFetcher) +        return d      def tearDown(self):          del self.fetcher -        super(LeapIncomingMailTestCase, self).tearDown() +        return super(IncomingMailTestCase, self).tearDown()      def testExtractOpenPGPHeader(self):          """ @@ -103,15 +106,18 @@ subject: independence of cyberspace          message = Parser().parsestr(self.EMAIL)          message.add_header("OpenPGP", OpenPGP) -        email = self._create_incoming_email(message.as_string()) -        self._mock_soledad_get_from_index(fields.JUST_MAIL_IDX, [email]) -        self.fetcher._keymanager.fetch_key = Mock() +        self.fetcher._keymanager.fetch_key = Mock( +            return_value=defer.succeed(None))          def fetch_key_called(ret):              self.fetcher._keymanager.fetch_key.assert_called_once_with(                  self.FROM_ADDRESS, KEYURL, OpenPGPKey) -        d = self.fetcher.fetch() +        d = self._create_incoming_email(message.as_string()) +        d.addCallback( +            lambda email: +            self._mock_soledad_get_from_index(fields.JUST_MAIL_IDX, [email])) +        d.addCallback(lambda _: self.fetcher.fetch())          d.addCallback(fetch_key_called)          return d @@ -124,14 +130,16 @@ subject: independence of cyberspace          message = Parser().parsestr(self.EMAIL)          message.add_header("OpenPGP", OpenPGP) -        email = self._create_incoming_email(message.as_string()) -        self._mock_soledad_get_from_index(fields.JUST_MAIL_IDX, [email])          self.fetcher._keymanager.fetch_key = Mock()          def fetch_key_called(ret):              self.assertFalse(self.fetcher._keymanager.fetch_key.called) -        d = self.fetcher.fetch() +        d = self._create_incoming_email(message.as_string()) +        d.addCallback( +            lambda email: +            self._mock_soledad_get_from_index(fields.JUST_MAIL_IDX, [email])) +        d.addCallback(lambda _: self.fetcher.fetch())          d.addCallback(fetch_key_called)          return d @@ -146,12 +154,14 @@ subject: independence of cyberspace          key = MIMEApplication("", "pgp-keys")          key.set_payload(KEY)          message.attach(key) +        self.fetcher._keymanager.put_raw_key = Mock( +            return_value=defer.succeed(None)) -        def put_raw_key_called(ret): +        def put_raw_key_called(_):              self.fetcher._keymanager.put_raw_key.assert_called_once_with(                  KEY, OpenPGPKey, address=self.FROM_ADDRESS) -        d = self.mock_fetch(message.as_string()) +        d = self._mock_fetch(message.as_string())          d.addCallback(put_raw_key_called)          return d @@ -170,16 +180,15 @@ subject: independence of cyberspace              {"incoming": True, "content": email_str},              ensure_ascii=False) -        def set_email_content(pubkey): +        def set_email_content(encr_data):              email.content = {                  fields.INCOMING_KEY: True,                  fields.ERROR_DECRYPTING_KEY: False,                  ENC_SCHEME_KEY: EncryptionSchemes.PUBKEY, -                ENC_JSON_KEY: str(self._km.encrypt(data, pubkey)) +                ENC_JSON_KEY: encr_data              }              return email - -        d = self._km.get_key(ADDRESS, OpenPGPKey) +        d = self._km.encrypt(data, ADDRESS, OpenPGPKey, fetch_remote=False)          d.addCallback(set_email_content)          return d @@ -188,6 +197,6 @@ subject: independence of cyberspace          def soledad_mock(idx_name, *key_values):              if index_name == idx_name: -                return value +                return defer.succeed(value)              return get_from_index(idx_name, *key_values)          self.fetcher._soledad.get_from_index = Mock(side_effect=soledad_mock) diff --git a/mail/src/leap/mail/mail.py b/mail/src/leap/mail/mail.py index 8137f972..cb37d255 100644 --- a/mail/src/leap/mail/mail.py +++ b/mail/src/leap/mail/mail.py @@ -414,15 +414,10 @@ class MessageCollection(object):      # Manipulate messages -    def add_msg(self, raw_msg, flags=None, tags=None, date=None): +    def add_msg(self, raw_msg, flags=tuple(), tags=tuple(), date=""):          """          Add a message to this collection.          """ -        if not flags: -            flags = tuple() -        if not tags: -            tags = tuple() -          leap_assert_type(flags, tuple)          leap_assert_type(date, str) @@ -582,7 +577,6 @@ class Account(object):          self.mbox_indexer = MailboxIndexer(self.store)          self.deferred_initialization = defer.Deferred() -        self._initialized = False          self._ready_cb = ready_cb          self._init_d = self._initialize_storage() @@ -594,7 +588,6 @@ class Account(object):                  return self.add_mailbox(INBOX_NAME)          def finish_initialization(result): -            self._initialized = True              self.deferred_initialization.callback(None)              if self._ready_cb is not None:                  self._ready_cb() @@ -606,12 +599,8 @@ class Account(object):          return d      def callWhenReady(self, cb, *args, **kw): -        if self._initialized: -            cb(self, *args, **kw) -            return defer.succeed(None) -        else: -            self.deferred_initialization.addCallback(cb, *args, **kw) -            return self.deferred_initialization +        self.deferred_initialization.addCallback(cb, *args, **kw) +        return self.deferred_initialization      #      # Public API Starts | 
