diff options
author | Ruben Pollan <meskio@sindominio.net> | 2015-01-14 13:50:02 -0600 |
---|---|---|
committer | Kali Kaneko <kali@leap.se> | 2015-02-11 14:05:43 -0400 |
commit | b481d239101e9bd090ffcca547294cf30738d028 (patch) | |
tree | b8c85184db6b711a3eb583ae7565c3fdcb72f5d5 | |
parent | a794d60f9985f5dc7507ada1d5dab65e9fe6874e (diff) |
Refactor fetch into leap.mail.incoming IService
-rw-r--r-- | mail/src/leap/mail/imap/account.py | 2 | ||||
-rw-r--r-- | mail/src/leap/mail/imap/service/imap.py | 30 | ||||
-rw-r--r-- | mail/src/leap/mail/incoming/__init__.py | 0 | ||||
-rw-r--r-- | mail/src/leap/mail/incoming/service.py (renamed from mail/src/leap/mail/imap/fetch.py) | 91 | ||||
-rw-r--r-- | mail/src/leap/mail/incoming/tests/__init__.py | 0 | ||||
-rw-r--r-- | mail/src/leap/mail/incoming/tests/test_incoming_mail.py (renamed from mail/src/leap/mail/imap/tests/test_incoming_mail.py) | 93 | ||||
-rw-r--r-- | mail/src/leap/mail/mail.py | 17 |
7 files changed, 111 insertions, 122 deletions
diff --git a/mail/src/leap/mail/imap/account.py b/mail/src/leap/mail/imap/account.py index 8a6e87e..146d066 100644 --- a/mail/src/leap/mail/imap/account.py +++ b/mail/src/leap/mail/imap/account.py @@ -63,7 +63,7 @@ class IMAPAccount(object): selected = None closed = False - def __init__(self, user_id, store, d=None): + def __init__(self, user_id, store, d=defer.Deferred()): """ Keeps track of the mailboxes and subscriptions handled by this account. diff --git a/mail/src/leap/mail/imap/service/imap.py b/mail/src/leap/mail/imap/service/imap.py index 5d88a79..93e4d62 100644 --- a/mail/src/leap/mail/imap/service/imap.py +++ b/mail/src/leap/mail/imap/service/imap.py @@ -30,10 +30,9 @@ logger = logging.getLogger(__name__) from leap.common import events as leap_events from leap.common.check import leap_assert, leap_assert_type, leap_check -from leap.keymanager import KeyManager from leap.mail.imap.account import IMAPAccount -from leap.mail.imap.fetch import LeapIncomingMail from leap.mail.imap.server import LEAPIMAPServer +from leap.mail.incoming import IncomingMail from leap.soledad.client import Soledad from leap.common.events.events_pb2 import IMAP_SERVICE_STARTED @@ -55,10 +54,6 @@ if DO_PROFILE: # The default port in which imap service will run IMAP_PORT = 1984 -# The period between succesive checks of the incoming mail -# queue (in seconds) -INCOMING_CHECK_PERIOD = 60 - class IMAPAuthRealm(object): """ @@ -132,21 +127,16 @@ def run_service(*args, **kwargs): """ Main entry point to run the service from the client. - :returns: the LoopingCall instance that will have to be stoppped - before shutting down the client, the port as returned by - the reactor when starts listening, and the factory for - the protocol. + :returns: the port as returned by the reactor when starts listening, and + the factory for the protocol. """ leap_assert(len(args) == 2) - soledad, keymanager = args + soledad = args leap_assert_type(soledad, Soledad) - leap_assert_type(keymanager, KeyManager) port = kwargs.get('port', IMAP_PORT) - check_period = kwargs.get('check_period', INCOMING_CHECK_PERIOD) userid = kwargs.get('userid', None) leap_check(userid is not None, "need an user id") - offline = kwargs.get('offline', False) uuid = soledad.uuid factory = LeapIMAPFactory(uuid, userid, soledad) @@ -154,16 +144,6 @@ def run_service(*args, **kwargs): try: tport = reactor.listenTCP(port, factory, interface="localhost") - if not offline: - # FIXME --- update after meskio's work - fetcher = LeapIncomingMail( - keymanager, - soledad, - factory.theAccount, - check_period, - userid) - else: - fetcher = None except CannotListenError: logger.error("IMAP Service failed to start: " "cannot listen in port %s" % (port,)) @@ -186,7 +166,7 @@ def run_service(*args, **kwargs): leap_events.signal(IMAP_SERVICE_STARTED, str(port)) # FIXME -- change service signature - return fetcher, tport, factory + return tport, factory # not ok, signal error. leap_events.signal(IMAP_SERVICE_FAILED_TO_START, str(port)) diff --git a/mail/src/leap/mail/incoming/__init__.py b/mail/src/leap/mail/incoming/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/mail/src/leap/mail/incoming/__init__.py diff --git a/mail/src/leap/mail/imap/fetch.py b/mail/src/leap/mail/incoming/service.py index dbc726a..e52c727 100644 --- a/mail/src/leap/mail/imap/fetch.py +++ b/mail/src/leap/mail/incoming/service.py @@ -1,6 +1,6 @@ # -*- coding: utf-8 -*- -# fetch.py -# Copyright (C) 2013 LEAP +# service.py +# Copyright (C) 2015 LEAP # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by @@ -31,6 +31,7 @@ from email.utils import parseaddr from StringIO import StringIO from urlparse import urlparse +from twisted.application.service import Service from twisted.python import log from twisted.internet import defer, reactor from twisted.internet.task import LoopingCall @@ -49,8 +50,8 @@ from leap.common.events.events_pb2 import SOLEDAD_INVALID_AUTH_TOKEN from leap.common.mail import get_email_charset from leap.keymanager import errors as keymanager_errors from leap.keymanager.openpgp import OpenPGPKey +from leap.mail.adaptors import soledad_indexes as fields from leap.mail.decorators import deferred_to_thread -from leap.mail.imap.fields import fields from leap.mail.utils import json_loads, empty, first from leap.soledad.client import Soledad from leap.soledad.common.crypto import ENC_SCHEME_KEY, ENC_JSON_KEY @@ -64,6 +65,10 @@ MULTIPART_SIGNED = "multipart/signed" PGP_BEGIN = "-----BEGIN PGP MESSAGE-----" PGP_END = "-----END PGP MESSAGE-----" +# The period between succesive checks of the incoming mail +# queue (in seconds) +INCOMING_CHECK_PERIOD = 60 + class MalformedMessage(Exception): """ @@ -72,12 +77,13 @@ class MalformedMessage(Exception): pass -class LeapIncomingMail(object): +class IncomingMail(Service): """ Fetches and process mail from the incoming pool. - This object has public methods start_loop and stop that will - actually initiate a LoopingCall with check_period recurrency. + This object implements IService interface, has public methods + startService and stopService that will actually initiate a + LoopingCall with check_period recurrency. The LoopingCall itself will invoke the fetch method each time that the check_period expires. @@ -85,6 +91,8 @@ class LeapIncomingMail(object): process all the documents found tagged as incoming mail. """ + name = "IncomingMail" + RECENT_FLAG = "\\Recent" CONTENT_KEY = "content" @@ -100,11 +108,11 @@ class LeapIncomingMail(object): fetching_lock = threading.Lock() - def __init__(self, keymanager, soledad, imap_account, - check_period, userid): + def __init__(self, keymanager, soledad, inbox, userid, + check_period=INCOMING_CHECK_PERIOD): """ - Initialize LeapIncomingMail.. + Initialize IncomingMail.. :param keymanager: a keymanager instance :type keymanager: keymanager.KeyManager @@ -112,8 +120,8 @@ class LeapIncomingMail(object): :param soledad: a soledad instance :type soledad: Soledad - :param imap_account: the account to fetch periodically - :type imap_account: SoledadBackedAccount + :param inbox: the inbox where the new emails will be stored + :type inbox: IMAPMailbox :param check_period: the period to fetch new mail, in seconds. :type check_period: int @@ -127,8 +135,7 @@ class LeapIncomingMail(object): self._keymanager = keymanager self._soledad = soledad - self.imapAccount = imap_account - self._inbox = self.imapAccount.getMailbox('inbox') + self._inbox = inbox self._userid = userid self._loop = None @@ -148,21 +155,22 @@ class LeapIncomingMail(object): Calls a deferred that will execute the fetch callback in a separate thread """ - def syncSoledadCallback(result): - # FIXME this needs a matching change in mx!!! - # --> need to add ERROR_DECRYPTING_KEY = False - # as default. - try: - doclist = self._soledad.get_from_index( - fields.JUST_MAIL_IDX, "*", "0") - except u1db_errors.InvalidGlobbing: + def mail_compat(failure): + if failure.check(u1db_errors.InvalidGlobbing): # It looks like we are a dealing with an outdated # mx. Fallback to the version of the index warnings.warn("JUST_MAIL_COMPAT_IDX will be deprecated!", DeprecationWarning) - doclist = self._soledad.get_from_index( + return self._soledad.get_from_index( fields.JUST_MAIL_COMPAT_IDX, "*") - return self._process_doclist(doclist) + return failure + + def syncSoledadCallback(_): + d = self._soledad.get_from_index( + fields.JUST_MAIL_IDX, "*", "0") + d.addErrback(mail_compat) + d.addCallback(self._process_doclist) + return d logger.debug("fetching mail for: %s %s" % ( self._soledad.uuid, self._userid)) @@ -175,24 +183,25 @@ class LeapIncomingMail(object): else: logger.debug("Already fetching mail.") - def start_loop(self): + def startService(self): """ Starts a loop to fetch mail. """ + Service.startService(self) if self._loop is None: self._loop = LoopingCall(self.fetch) self._loop.start(self._check_period) else: logger.warning("Tried to start an already running fetching loop.") - def stop(self): - # XXX change the name to stop_loop, for consistency. + def stopService(self): """ Stops the loop that fetches mail. """ if self._loop and self._loop.running is True: self._loop.stop() self._loop = None + Service.stopService(self) # # Private methods. @@ -296,7 +305,7 @@ class LeapIncomingMail(object): # operations on individual messages # - @deferred_to_thread + #FIXME: @deferred_to_thread def _decrypt_doc(self, doc): """ Decrypt the contents of a document. @@ -319,15 +328,14 @@ class LeapIncomingMail(object): success = False leap_events.signal(IMAP_MSG_DECRYPTED, "1" if success else "0") - - data = self._process_decrypted_doc(doc, decrdata) - return doc, data + return self._process_decrypted_doc(doc, decrdata) d = self._keymanager.decrypt( doc.content[ENC_JSON_KEY], self._userid, OpenPGPKey) d.addErrback(self._errback) d.addCallback(process_decrypted) + d.addCallback(lambda data: (doc, data)) return d def _process_decrypted_doc(self, doc, data): @@ -340,8 +348,9 @@ class LeapIncomingMail(object): message :type data: str - :return: the processed data. - :rtype: str + :return: a Deferred that will be fired with an str of the proccessed + data. + :rtype: Deferred """ log.msg('processing decrypted doc') @@ -409,8 +418,10 @@ class LeapIncomingMail(object): :param data: the text to be decrypted. :type data: str - :return: data, possibly decrypted. - :rtype: str + + :return: a Deferred that will be fired with an str of data, possibly + decrypted. + :rtype: Deferred """ leap_assert_type(data, str) log.msg('maybe decrypting doc') @@ -426,7 +437,8 @@ class LeapIncomingMail(object): or msg.get_content_type() == MULTIPART_SIGNED)): senderAddress = parseaddr(fromHeader) - def add_leap_header(decrmsg, signkey): + def add_leap_header(ret): + decrmsg, signkey = ret if (senderAddress is None or isinstance(signkey, keymanager_errors.KeyNotFound)): decrmsg.add_header( @@ -596,11 +608,11 @@ class LeapIncomingMail(object): _, fromAddress = parseaddr(msg['from']) header = msg.get(OpenPGP_HEADER, None) - dh = defer.success() + dh = defer.succeed(None) if header is not None: dh = self._extract_openpgp_header(header, fromAddress) - da = defer.success() + da = defer.succeed(None) if msg.is_multipart(): da = self._extract_attached_key(msg.get_payload(), fromAddress) @@ -620,7 +632,7 @@ class LeapIncomingMail(object): :return: A Deferred that will be fired when header extraction is done :rtype: Deferred """ - d = defer.success() + d = defer.succeed(None) fields = dict([f.strip(' ').split('=') for f in header.split(';')]) if 'url' in fields: url = shlex.split(fields['url'])[0] # remove quotations @@ -704,8 +716,7 @@ class LeapIncomingMail(object): deferLater(reactor, 0, self._delete_incoming_message, doc) leap_events.signal(IMAP_MSG_DELETED_INCOMING) - d = self._inbox.addMessage(data, flags=(self.RECENT_FLAG,), - notify_on_disk=True) + d = self._inbox.addMessage(data, (self.RECENT_FLAG,)) d.addCallbacks(msgSavedCallback, self._errback) return d diff --git a/mail/src/leap/mail/incoming/tests/__init__.py b/mail/src/leap/mail/incoming/tests/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/mail/src/leap/mail/incoming/tests/__init__.py diff --git a/mail/src/leap/mail/imap/tests/test_incoming_mail.py b/mail/src/leap/mail/incoming/tests/test_incoming_mail.py index 03c0164..bf95b1d 100644 --- a/mail/src/leap/mail/imap/tests/test_incoming_mail.py +++ b/mail/src/leap/mail/incoming/tests/test_incoming_mail.py @@ -1,6 +1,6 @@ # -*- coding: utf-8 -*- -# test_imap.py -# Copyright (C) 2014 LEAP +# test_incoming_mail.py +# Copyright (C) 2015 LEAP # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by @@ -15,7 +15,7 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. """ -Test case for leap.email.imap.fetch +Test case for leap.mail.incoming.service @authors: Ruben Pollan, <meskio@sindominio.net> @@ -28,13 +28,13 @@ from email.mime.application import MIMEApplication from email.mime.multipart import MIMEMultipart from email.parser import Parser from mock import Mock +from twisted.internet import defer from leap.keymanager.openpgp import OpenPGPKey -from leap.mail.imap.account import SoledadBackedAccount -from leap.mail.imap.fetch import LeapIncomingMail -from leap.mail.imap.fields import fields -from leap.mail.imap.memorystore import MemoryStore -from leap.mail.imap.service.imap import INCOMING_CHECK_PERIOD +from leap.mail.adaptors import soledad_indexes as fields +from leap.mail.constants import INBOX_NAME +from leap.mail.imap.account import IMAPAccount +from leap.mail.incoming.service import IncomingMail from leap.mail.tests import ( TestCaseWithKeyManager, ADDRESS, @@ -47,7 +47,7 @@ from leap.soledad.common.crypto import ( ) -class LeapIncomingMailTestCase(TestCaseWithKeyManager): +class IncomingMailTestCase(TestCaseWithKeyManager): """ Tests for the incoming mail parser """ @@ -71,28 +71,31 @@ subject: independence of cyberspace } def setUp(self): - super(LeapIncomingMailTestCase, self).setUp() - - # Soledad sync makes trial block forever. The sync it's mocked to fix - # this problem. _mock_soledad_get_from_index can be used from the tests - # to provide documents. - self._soledad.sync = Mock() - - memstore = MemoryStore() - theAccount = SoledadBackedAccount( - ADDRESS, - soledad=self._soledad, - memstore=memstore) - self.fetcher = LeapIncomingMail( - self._km, - self._soledad, - theAccount, - INCOMING_CHECK_PERIOD, - ADDRESS) + def getInbox(_): + theAccount = IMAPAccount(ADDRESS, self._soledad) + return theAccount.callWhenReady( + lambda _: theAccount.getMailbox(INBOX_NAME)) + + def setUpFetcher(inbox): + # Soledad sync makes trial block forever. The sync it's mocked to + # fix this problem. _mock_soledad_get_from_index can be used from + # the tests to provide documents. + self._soledad.sync = Mock() + + self.fetcher = IncomingMail( + self._km, + self._soledad, + inbox, + ADDRESS) + + d = super(IncomingMailTestCase, self).setUp() + d.addCallback(getInbox) + d.addCallback(setUpFetcher) + return d def tearDown(self): del self.fetcher - super(LeapIncomingMailTestCase, self).tearDown() + return super(IncomingMailTestCase, self).tearDown() def testExtractOpenPGPHeader(self): """ @@ -103,15 +106,18 @@ subject: independence of cyberspace message = Parser().parsestr(self.EMAIL) message.add_header("OpenPGP", OpenPGP) - email = self._create_incoming_email(message.as_string()) - self._mock_soledad_get_from_index(fields.JUST_MAIL_IDX, [email]) - self.fetcher._keymanager.fetch_key = Mock() + self.fetcher._keymanager.fetch_key = Mock( + return_value=defer.succeed(None)) def fetch_key_called(ret): self.fetcher._keymanager.fetch_key.assert_called_once_with( self.FROM_ADDRESS, KEYURL, OpenPGPKey) - d = self.fetcher.fetch() + d = self._create_incoming_email(message.as_string()) + d.addCallback( + lambda email: + self._mock_soledad_get_from_index(fields.JUST_MAIL_IDX, [email])) + d.addCallback(lambda _: self.fetcher.fetch()) d.addCallback(fetch_key_called) return d @@ -124,14 +130,16 @@ subject: independence of cyberspace message = Parser().parsestr(self.EMAIL) message.add_header("OpenPGP", OpenPGP) - email = self._create_incoming_email(message.as_string()) - self._mock_soledad_get_from_index(fields.JUST_MAIL_IDX, [email]) self.fetcher._keymanager.fetch_key = Mock() def fetch_key_called(ret): self.assertFalse(self.fetcher._keymanager.fetch_key.called) - d = self.fetcher.fetch() + d = self._create_incoming_email(message.as_string()) + d.addCallback( + lambda email: + self._mock_soledad_get_from_index(fields.JUST_MAIL_IDX, [email])) + d.addCallback(lambda _: self.fetcher.fetch()) d.addCallback(fetch_key_called) return d @@ -146,12 +154,14 @@ subject: independence of cyberspace key = MIMEApplication("", "pgp-keys") key.set_payload(KEY) message.attach(key) + self.fetcher._keymanager.put_raw_key = Mock( + return_value=defer.succeed(None)) - def put_raw_key_called(ret): + def put_raw_key_called(_): self.fetcher._keymanager.put_raw_key.assert_called_once_with( KEY, OpenPGPKey, address=self.FROM_ADDRESS) - d = self.mock_fetch(message.as_string()) + d = self._mock_fetch(message.as_string()) d.addCallback(put_raw_key_called) return d @@ -170,16 +180,15 @@ subject: independence of cyberspace {"incoming": True, "content": email_str}, ensure_ascii=False) - def set_email_content(pubkey): + def set_email_content(encr_data): email.content = { fields.INCOMING_KEY: True, fields.ERROR_DECRYPTING_KEY: False, ENC_SCHEME_KEY: EncryptionSchemes.PUBKEY, - ENC_JSON_KEY: str(self._km.encrypt(data, pubkey)) + ENC_JSON_KEY: encr_data } return email - - d = self._km.get_key(ADDRESS, OpenPGPKey) + d = self._km.encrypt(data, ADDRESS, OpenPGPKey, fetch_remote=False) d.addCallback(set_email_content) return d @@ -188,6 +197,6 @@ subject: independence of cyberspace def soledad_mock(idx_name, *key_values): if index_name == idx_name: - return value + return defer.succeed(value) return get_from_index(idx_name, *key_values) self.fetcher._soledad.get_from_index = Mock(side_effect=soledad_mock) diff --git a/mail/src/leap/mail/mail.py b/mail/src/leap/mail/mail.py index 8137f97..cb37d25 100644 --- a/mail/src/leap/mail/mail.py +++ b/mail/src/leap/mail/mail.py @@ -414,15 +414,10 @@ class MessageCollection(object): # Manipulate messages - def add_msg(self, raw_msg, flags=None, tags=None, date=None): + def add_msg(self, raw_msg, flags=tuple(), tags=tuple(), date=""): """ Add a message to this collection. """ - if not flags: - flags = tuple() - if not tags: - tags = tuple() - leap_assert_type(flags, tuple) leap_assert_type(date, str) @@ -582,7 +577,6 @@ class Account(object): self.mbox_indexer = MailboxIndexer(self.store) self.deferred_initialization = defer.Deferred() - self._initialized = False self._ready_cb = ready_cb self._init_d = self._initialize_storage() @@ -594,7 +588,6 @@ class Account(object): return self.add_mailbox(INBOX_NAME) def finish_initialization(result): - self._initialized = True self.deferred_initialization.callback(None) if self._ready_cb is not None: self._ready_cb() @@ -606,12 +599,8 @@ class Account(object): return d def callWhenReady(self, cb, *args, **kw): - if self._initialized: - cb(self, *args, **kw) - return defer.succeed(None) - else: - self.deferred_initialization.addCallback(cb, *args, **kw) - return self.deferred_initialization + self.deferred_initialization.addCallback(cb, *args, **kw) + return self.deferred_initialization # # Public API Starts |