summaryrefslogtreecommitdiff
path: root/mail/src
diff options
context:
space:
mode:
authorRuben Pollan <meskio@sindominio.net>2015-01-14 13:50:02 -0600
committerKali Kaneko <kali@leap.se>2015-02-11 14:05:43 -0400
commitb481d239101e9bd090ffcca547294cf30738d028 (patch)
treeb8c85184db6b711a3eb583ae7565c3fdcb72f5d5 /mail/src
parenta794d60f9985f5dc7507ada1d5dab65e9fe6874e (diff)
Refactor fetch into leap.mail.incoming IService
Diffstat (limited to 'mail/src')
-rw-r--r--mail/src/leap/mail/imap/account.py2
-rw-r--r--mail/src/leap/mail/imap/service/imap.py30
-rw-r--r--mail/src/leap/mail/incoming/__init__.py0
-rw-r--r--mail/src/leap/mail/incoming/service.py (renamed from mail/src/leap/mail/imap/fetch.py)91
-rw-r--r--mail/src/leap/mail/incoming/tests/__init__.py0
-rw-r--r--mail/src/leap/mail/incoming/tests/test_incoming_mail.py (renamed from mail/src/leap/mail/imap/tests/test_incoming_mail.py)93
-rw-r--r--mail/src/leap/mail/mail.py17
7 files changed, 111 insertions, 122 deletions
diff --git a/mail/src/leap/mail/imap/account.py b/mail/src/leap/mail/imap/account.py
index 8a6e87eb..146d0663 100644
--- a/mail/src/leap/mail/imap/account.py
+++ b/mail/src/leap/mail/imap/account.py
@@ -63,7 +63,7 @@ class IMAPAccount(object):
selected = None
closed = False
- def __init__(self, user_id, store, d=None):
+ def __init__(self, user_id, store, d=defer.Deferred()):
"""
Keeps track of the mailboxes and subscriptions handled by this account.
diff --git a/mail/src/leap/mail/imap/service/imap.py b/mail/src/leap/mail/imap/service/imap.py
index 5d88a79d..93e4d62a 100644
--- a/mail/src/leap/mail/imap/service/imap.py
+++ b/mail/src/leap/mail/imap/service/imap.py
@@ -30,10 +30,9 @@ logger = logging.getLogger(__name__)
from leap.common import events as leap_events
from leap.common.check import leap_assert, leap_assert_type, leap_check
-from leap.keymanager import KeyManager
from leap.mail.imap.account import IMAPAccount
-from leap.mail.imap.fetch import LeapIncomingMail
from leap.mail.imap.server import LEAPIMAPServer
+from leap.mail.incoming import IncomingMail
from leap.soledad.client import Soledad
from leap.common.events.events_pb2 import IMAP_SERVICE_STARTED
@@ -55,10 +54,6 @@ if DO_PROFILE:
# The default port in which imap service will run
IMAP_PORT = 1984
-# The period between succesive checks of the incoming mail
-# queue (in seconds)
-INCOMING_CHECK_PERIOD = 60
-
class IMAPAuthRealm(object):
"""
@@ -132,21 +127,16 @@ def run_service(*args, **kwargs):
"""
Main entry point to run the service from the client.
- :returns: the LoopingCall instance that will have to be stoppped
- before shutting down the client, the port as returned by
- the reactor when starts listening, and the factory for
- the protocol.
+ :returns: the port as returned by the reactor when starts listening, and
+ the factory for the protocol.
"""
leap_assert(len(args) == 2)
- soledad, keymanager = args
+ soledad = args
leap_assert_type(soledad, Soledad)
- leap_assert_type(keymanager, KeyManager)
port = kwargs.get('port', IMAP_PORT)
- check_period = kwargs.get('check_period', INCOMING_CHECK_PERIOD)
userid = kwargs.get('userid', None)
leap_check(userid is not None, "need an user id")
- offline = kwargs.get('offline', False)
uuid = soledad.uuid
factory = LeapIMAPFactory(uuid, userid, soledad)
@@ -154,16 +144,6 @@ def run_service(*args, **kwargs):
try:
tport = reactor.listenTCP(port, factory,
interface="localhost")
- if not offline:
- # FIXME --- update after meskio's work
- fetcher = LeapIncomingMail(
- keymanager,
- soledad,
- factory.theAccount,
- check_period,
- userid)
- else:
- fetcher = None
except CannotListenError:
logger.error("IMAP Service failed to start: "
"cannot listen in port %s" % (port,))
@@ -186,7 +166,7 @@ def run_service(*args, **kwargs):
leap_events.signal(IMAP_SERVICE_STARTED, str(port))
# FIXME -- change service signature
- return fetcher, tport, factory
+ return tport, factory
# not ok, signal error.
leap_events.signal(IMAP_SERVICE_FAILED_TO_START, str(port))
diff --git a/mail/src/leap/mail/incoming/__init__.py b/mail/src/leap/mail/incoming/__init__.py
new file mode 100644
index 00000000..e69de29b
--- /dev/null
+++ b/mail/src/leap/mail/incoming/__init__.py
diff --git a/mail/src/leap/mail/imap/fetch.py b/mail/src/leap/mail/incoming/service.py
index dbc726ac..e52c7272 100644
--- a/mail/src/leap/mail/imap/fetch.py
+++ b/mail/src/leap/mail/incoming/service.py
@@ -1,6 +1,6 @@
# -*- coding: utf-8 -*-
-# fetch.py
-# Copyright (C) 2013 LEAP
+# service.py
+# Copyright (C) 2015 LEAP
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
@@ -31,6 +31,7 @@ from email.utils import parseaddr
from StringIO import StringIO
from urlparse import urlparse
+from twisted.application.service import Service
from twisted.python import log
from twisted.internet import defer, reactor
from twisted.internet.task import LoopingCall
@@ -49,8 +50,8 @@ from leap.common.events.events_pb2 import SOLEDAD_INVALID_AUTH_TOKEN
from leap.common.mail import get_email_charset
from leap.keymanager import errors as keymanager_errors
from leap.keymanager.openpgp import OpenPGPKey
+from leap.mail.adaptors import soledad_indexes as fields
from leap.mail.decorators import deferred_to_thread
-from leap.mail.imap.fields import fields
from leap.mail.utils import json_loads, empty, first
from leap.soledad.client import Soledad
from leap.soledad.common.crypto import ENC_SCHEME_KEY, ENC_JSON_KEY
@@ -64,6 +65,10 @@ MULTIPART_SIGNED = "multipart/signed"
PGP_BEGIN = "-----BEGIN PGP MESSAGE-----"
PGP_END = "-----END PGP MESSAGE-----"
+# The period between succesive checks of the incoming mail
+# queue (in seconds)
+INCOMING_CHECK_PERIOD = 60
+
class MalformedMessage(Exception):
"""
@@ -72,12 +77,13 @@ class MalformedMessage(Exception):
pass
-class LeapIncomingMail(object):
+class IncomingMail(Service):
"""
Fetches and process mail from the incoming pool.
- This object has public methods start_loop and stop that will
- actually initiate a LoopingCall with check_period recurrency.
+ This object implements IService interface, has public methods
+ startService and stopService that will actually initiate a
+ LoopingCall with check_period recurrency.
The LoopingCall itself will invoke the fetch method each time
that the check_period expires.
@@ -85,6 +91,8 @@ class LeapIncomingMail(object):
process all the documents found tagged as incoming mail.
"""
+ name = "IncomingMail"
+
RECENT_FLAG = "\\Recent"
CONTENT_KEY = "content"
@@ -100,11 +108,11 @@ class LeapIncomingMail(object):
fetching_lock = threading.Lock()
- def __init__(self, keymanager, soledad, imap_account,
- check_period, userid):
+ def __init__(self, keymanager, soledad, inbox, userid,
+ check_period=INCOMING_CHECK_PERIOD):
"""
- Initialize LeapIncomingMail..
+ Initialize IncomingMail..
:param keymanager: a keymanager instance
:type keymanager: keymanager.KeyManager
@@ -112,8 +120,8 @@ class LeapIncomingMail(object):
:param soledad: a soledad instance
:type soledad: Soledad
- :param imap_account: the account to fetch periodically
- :type imap_account: SoledadBackedAccount
+ :param inbox: the inbox where the new emails will be stored
+ :type inbox: IMAPMailbox
:param check_period: the period to fetch new mail, in seconds.
:type check_period: int
@@ -127,8 +135,7 @@ class LeapIncomingMail(object):
self._keymanager = keymanager
self._soledad = soledad
- self.imapAccount = imap_account
- self._inbox = self.imapAccount.getMailbox('inbox')
+ self._inbox = inbox
self._userid = userid
self._loop = None
@@ -148,21 +155,22 @@ class LeapIncomingMail(object):
Calls a deferred that will execute the fetch callback
in a separate thread
"""
- def syncSoledadCallback(result):
- # FIXME this needs a matching change in mx!!!
- # --> need to add ERROR_DECRYPTING_KEY = False
- # as default.
- try:
- doclist = self._soledad.get_from_index(
- fields.JUST_MAIL_IDX, "*", "0")
- except u1db_errors.InvalidGlobbing:
+ def mail_compat(failure):
+ if failure.check(u1db_errors.InvalidGlobbing):
# It looks like we are a dealing with an outdated
# mx. Fallback to the version of the index
warnings.warn("JUST_MAIL_COMPAT_IDX will be deprecated!",
DeprecationWarning)
- doclist = self._soledad.get_from_index(
+ return self._soledad.get_from_index(
fields.JUST_MAIL_COMPAT_IDX, "*")
- return self._process_doclist(doclist)
+ return failure
+
+ def syncSoledadCallback(_):
+ d = self._soledad.get_from_index(
+ fields.JUST_MAIL_IDX, "*", "0")
+ d.addErrback(mail_compat)
+ d.addCallback(self._process_doclist)
+ return d
logger.debug("fetching mail for: %s %s" % (
self._soledad.uuid, self._userid))
@@ -175,24 +183,25 @@ class LeapIncomingMail(object):
else:
logger.debug("Already fetching mail.")
- def start_loop(self):
+ def startService(self):
"""
Starts a loop to fetch mail.
"""
+ Service.startService(self)
if self._loop is None:
self._loop = LoopingCall(self.fetch)
self._loop.start(self._check_period)
else:
logger.warning("Tried to start an already running fetching loop.")
- def stop(self):
- # XXX change the name to stop_loop, for consistency.
+ def stopService(self):
"""
Stops the loop that fetches mail.
"""
if self._loop and self._loop.running is True:
self._loop.stop()
self._loop = None
+ Service.stopService(self)
#
# Private methods.
@@ -296,7 +305,7 @@ class LeapIncomingMail(object):
# operations on individual messages
#
- @deferred_to_thread
+ #FIXME: @deferred_to_thread
def _decrypt_doc(self, doc):
"""
Decrypt the contents of a document.
@@ -319,15 +328,14 @@ class LeapIncomingMail(object):
success = False
leap_events.signal(IMAP_MSG_DECRYPTED, "1" if success else "0")
-
- data = self._process_decrypted_doc(doc, decrdata)
- return doc, data
+ return self._process_decrypted_doc(doc, decrdata)
d = self._keymanager.decrypt(
doc.content[ENC_JSON_KEY],
self._userid, OpenPGPKey)
d.addErrback(self._errback)
d.addCallback(process_decrypted)
+ d.addCallback(lambda data: (doc, data))
return d
def _process_decrypted_doc(self, doc, data):
@@ -340,8 +348,9 @@ class LeapIncomingMail(object):
message
:type data: str
- :return: the processed data.
- :rtype: str
+ :return: a Deferred that will be fired with an str of the proccessed
+ data.
+ :rtype: Deferred
"""
log.msg('processing decrypted doc')
@@ -409,8 +418,10 @@ class LeapIncomingMail(object):
:param data: the text to be decrypted.
:type data: str
- :return: data, possibly decrypted.
- :rtype: str
+
+ :return: a Deferred that will be fired with an str of data, possibly
+ decrypted.
+ :rtype: Deferred
"""
leap_assert_type(data, str)
log.msg('maybe decrypting doc')
@@ -426,7 +437,8 @@ class LeapIncomingMail(object):
or msg.get_content_type() == MULTIPART_SIGNED)):
senderAddress = parseaddr(fromHeader)
- def add_leap_header(decrmsg, signkey):
+ def add_leap_header(ret):
+ decrmsg, signkey = ret
if (senderAddress is None or
isinstance(signkey, keymanager_errors.KeyNotFound)):
decrmsg.add_header(
@@ -596,11 +608,11 @@ class LeapIncomingMail(object):
_, fromAddress = parseaddr(msg['from'])
header = msg.get(OpenPGP_HEADER, None)
- dh = defer.success()
+ dh = defer.succeed(None)
if header is not None:
dh = self._extract_openpgp_header(header, fromAddress)
- da = defer.success()
+ da = defer.succeed(None)
if msg.is_multipart():
da = self._extract_attached_key(msg.get_payload(), fromAddress)
@@ -620,7 +632,7 @@ class LeapIncomingMail(object):
:return: A Deferred that will be fired when header extraction is done
:rtype: Deferred
"""
- d = defer.success()
+ d = defer.succeed(None)
fields = dict([f.strip(' ').split('=') for f in header.split(';')])
if 'url' in fields:
url = shlex.split(fields['url'])[0] # remove quotations
@@ -704,8 +716,7 @@ class LeapIncomingMail(object):
deferLater(reactor, 0, self._delete_incoming_message, doc)
leap_events.signal(IMAP_MSG_DELETED_INCOMING)
- d = self._inbox.addMessage(data, flags=(self.RECENT_FLAG,),
- notify_on_disk=True)
+ d = self._inbox.addMessage(data, (self.RECENT_FLAG,))
d.addCallbacks(msgSavedCallback, self._errback)
return d
diff --git a/mail/src/leap/mail/incoming/tests/__init__.py b/mail/src/leap/mail/incoming/tests/__init__.py
new file mode 100644
index 00000000..e69de29b
--- /dev/null
+++ b/mail/src/leap/mail/incoming/tests/__init__.py
diff --git a/mail/src/leap/mail/imap/tests/test_incoming_mail.py b/mail/src/leap/mail/incoming/tests/test_incoming_mail.py
index 03c0164c..bf95b1d1 100644
--- a/mail/src/leap/mail/imap/tests/test_incoming_mail.py
+++ b/mail/src/leap/mail/incoming/tests/test_incoming_mail.py
@@ -1,6 +1,6 @@
# -*- coding: utf-8 -*-
-# test_imap.py
-# Copyright (C) 2014 LEAP
+# test_incoming_mail.py
+# Copyright (C) 2015 LEAP
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
@@ -15,7 +15,7 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
-Test case for leap.email.imap.fetch
+Test case for leap.mail.incoming.service
@authors: Ruben Pollan, <meskio@sindominio.net>
@@ -28,13 +28,13 @@ from email.mime.application import MIMEApplication
from email.mime.multipart import MIMEMultipart
from email.parser import Parser
from mock import Mock
+from twisted.internet import defer
from leap.keymanager.openpgp import OpenPGPKey
-from leap.mail.imap.account import SoledadBackedAccount
-from leap.mail.imap.fetch import LeapIncomingMail
-from leap.mail.imap.fields import fields
-from leap.mail.imap.memorystore import MemoryStore
-from leap.mail.imap.service.imap import INCOMING_CHECK_PERIOD
+from leap.mail.adaptors import soledad_indexes as fields
+from leap.mail.constants import INBOX_NAME
+from leap.mail.imap.account import IMAPAccount
+from leap.mail.incoming.service import IncomingMail
from leap.mail.tests import (
TestCaseWithKeyManager,
ADDRESS,
@@ -47,7 +47,7 @@ from leap.soledad.common.crypto import (
)
-class LeapIncomingMailTestCase(TestCaseWithKeyManager):
+class IncomingMailTestCase(TestCaseWithKeyManager):
"""
Tests for the incoming mail parser
"""
@@ -71,28 +71,31 @@ subject: independence of cyberspace
}
def setUp(self):
- super(LeapIncomingMailTestCase, self).setUp()
-
- # Soledad sync makes trial block forever. The sync it's mocked to fix
- # this problem. _mock_soledad_get_from_index can be used from the tests
- # to provide documents.
- self._soledad.sync = Mock()
-
- memstore = MemoryStore()
- theAccount = SoledadBackedAccount(
- ADDRESS,
- soledad=self._soledad,
- memstore=memstore)
- self.fetcher = LeapIncomingMail(
- self._km,
- self._soledad,
- theAccount,
- INCOMING_CHECK_PERIOD,
- ADDRESS)
+ def getInbox(_):
+ theAccount = IMAPAccount(ADDRESS, self._soledad)
+ return theAccount.callWhenReady(
+ lambda _: theAccount.getMailbox(INBOX_NAME))
+
+ def setUpFetcher(inbox):
+ # Soledad sync makes trial block forever. The sync it's mocked to
+ # fix this problem. _mock_soledad_get_from_index can be used from
+ # the tests to provide documents.
+ self._soledad.sync = Mock()
+
+ self.fetcher = IncomingMail(
+ self._km,
+ self._soledad,
+ inbox,
+ ADDRESS)
+
+ d = super(IncomingMailTestCase, self).setUp()
+ d.addCallback(getInbox)
+ d.addCallback(setUpFetcher)
+ return d
def tearDown(self):
del self.fetcher
- super(LeapIncomingMailTestCase, self).tearDown()
+ return super(IncomingMailTestCase, self).tearDown()
def testExtractOpenPGPHeader(self):
"""
@@ -103,15 +106,18 @@ subject: independence of cyberspace
message = Parser().parsestr(self.EMAIL)
message.add_header("OpenPGP", OpenPGP)
- email = self._create_incoming_email(message.as_string())
- self._mock_soledad_get_from_index(fields.JUST_MAIL_IDX, [email])
- self.fetcher._keymanager.fetch_key = Mock()
+ self.fetcher._keymanager.fetch_key = Mock(
+ return_value=defer.succeed(None))
def fetch_key_called(ret):
self.fetcher._keymanager.fetch_key.assert_called_once_with(
self.FROM_ADDRESS, KEYURL, OpenPGPKey)
- d = self.fetcher.fetch()
+ d = self._create_incoming_email(message.as_string())
+ d.addCallback(
+ lambda email:
+ self._mock_soledad_get_from_index(fields.JUST_MAIL_IDX, [email]))
+ d.addCallback(lambda _: self.fetcher.fetch())
d.addCallback(fetch_key_called)
return d
@@ -124,14 +130,16 @@ subject: independence of cyberspace
message = Parser().parsestr(self.EMAIL)
message.add_header("OpenPGP", OpenPGP)
- email = self._create_incoming_email(message.as_string())
- self._mock_soledad_get_from_index(fields.JUST_MAIL_IDX, [email])
self.fetcher._keymanager.fetch_key = Mock()
def fetch_key_called(ret):
self.assertFalse(self.fetcher._keymanager.fetch_key.called)
- d = self.fetcher.fetch()
+ d = self._create_incoming_email(message.as_string())
+ d.addCallback(
+ lambda email:
+ self._mock_soledad_get_from_index(fields.JUST_MAIL_IDX, [email]))
+ d.addCallback(lambda _: self.fetcher.fetch())
d.addCallback(fetch_key_called)
return d
@@ -146,12 +154,14 @@ subject: independence of cyberspace
key = MIMEApplication("", "pgp-keys")
key.set_payload(KEY)
message.attach(key)
+ self.fetcher._keymanager.put_raw_key = Mock(
+ return_value=defer.succeed(None))
- def put_raw_key_called(ret):
+ def put_raw_key_called(_):
self.fetcher._keymanager.put_raw_key.assert_called_once_with(
KEY, OpenPGPKey, address=self.FROM_ADDRESS)
- d = self.mock_fetch(message.as_string())
+ d = self._mock_fetch(message.as_string())
d.addCallback(put_raw_key_called)
return d
@@ -170,16 +180,15 @@ subject: independence of cyberspace
{"incoming": True, "content": email_str},
ensure_ascii=False)
- def set_email_content(pubkey):
+ def set_email_content(encr_data):
email.content = {
fields.INCOMING_KEY: True,
fields.ERROR_DECRYPTING_KEY: False,
ENC_SCHEME_KEY: EncryptionSchemes.PUBKEY,
- ENC_JSON_KEY: str(self._km.encrypt(data, pubkey))
+ ENC_JSON_KEY: encr_data
}
return email
-
- d = self._km.get_key(ADDRESS, OpenPGPKey)
+ d = self._km.encrypt(data, ADDRESS, OpenPGPKey, fetch_remote=False)
d.addCallback(set_email_content)
return d
@@ -188,6 +197,6 @@ subject: independence of cyberspace
def soledad_mock(idx_name, *key_values):
if index_name == idx_name:
- return value
+ return defer.succeed(value)
return get_from_index(idx_name, *key_values)
self.fetcher._soledad.get_from_index = Mock(side_effect=soledad_mock)
diff --git a/mail/src/leap/mail/mail.py b/mail/src/leap/mail/mail.py
index 8137f972..cb37d255 100644
--- a/mail/src/leap/mail/mail.py
+++ b/mail/src/leap/mail/mail.py
@@ -414,15 +414,10 @@ class MessageCollection(object):
# Manipulate messages
- def add_msg(self, raw_msg, flags=None, tags=None, date=None):
+ def add_msg(self, raw_msg, flags=tuple(), tags=tuple(), date=""):
"""
Add a message to this collection.
"""
- if not flags:
- flags = tuple()
- if not tags:
- tags = tuple()
-
leap_assert_type(flags, tuple)
leap_assert_type(date, str)
@@ -582,7 +577,6 @@ class Account(object):
self.mbox_indexer = MailboxIndexer(self.store)
self.deferred_initialization = defer.Deferred()
- self._initialized = False
self._ready_cb = ready_cb
self._init_d = self._initialize_storage()
@@ -594,7 +588,6 @@ class Account(object):
return self.add_mailbox(INBOX_NAME)
def finish_initialization(result):
- self._initialized = True
self.deferred_initialization.callback(None)
if self._ready_cb is not None:
self._ready_cb()
@@ -606,12 +599,8 @@ class Account(object):
return d
def callWhenReady(self, cb, *args, **kw):
- if self._initialized:
- cb(self, *args, **kw)
- return defer.succeed(None)
- else:
- self.deferred_initialization.addCallback(cb, *args, **kw)
- return self.deferred_initialization
+ self.deferred_initialization.addCallback(cb, *args, **kw)
+ return self.deferred_initialization
#
# Public API Starts