summaryrefslogtreecommitdiff
path: root/src/leap/bitmask/mail/incoming
diff options
context:
space:
mode:
authorKali Kaneko (leap communications) <kali@leap.se>2016-08-29 23:10:17 -0400
committerKali Kaneko (leap communications) <kali@leap.se>2016-08-29 23:11:41 -0400
commit5a3a2012bb8982ad0884ed659e61e969345e6fde (patch)
treefc2310d8d3244987bf5a1d2632cab99a60ba93f1 /src/leap/bitmask/mail/incoming
parent43df4205af42fce5d097f70bb0345b69e9d16f1c (diff)
[pkg] move mail source to leap.bitmask.mail
Diffstat (limited to 'src/leap/bitmask/mail/incoming')
-rw-r--r--src/leap/bitmask/mail/incoming/__init__.py0
-rw-r--r--src/leap/bitmask/mail/incoming/service.py844
-rw-r--r--src/leap/bitmask/mail/incoming/tests/rfc822.multi-encrypt-signed.message61
-rw-r--r--src/leap/bitmask/mail/incoming/tests/test_incoming_mail.py391
4 files changed, 1296 insertions, 0 deletions
diff --git a/src/leap/bitmask/mail/incoming/__init__.py b/src/leap/bitmask/mail/incoming/__init__.py
new file mode 100644
index 00000000..e69de29b
--- /dev/null
+++ b/src/leap/bitmask/mail/incoming/__init__.py
diff --git a/src/leap/bitmask/mail/incoming/service.py b/src/leap/bitmask/mail/incoming/service.py
new file mode 100644
index 00000000..1e20862b
--- /dev/null
+++ b/src/leap/bitmask/mail/incoming/service.py
@@ -0,0 +1,844 @@
+# -*- coding: utf-8 -*-
+# service.py
+# Copyright (C) 2015 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+"""
+Incoming mail fetcher.
+"""
+import copy
+import logging
+import shlex
+import time
+import warnings
+
+from email.parser import Parser
+from email.utils import parseaddr
+from email.utils import formatdate
+from StringIO import StringIO
+from urlparse import urlparse
+
+from twisted.application.service import Service
+from twisted.python import log
+from twisted.python.failure import Failure
+from twisted.internet import defer, reactor
+from twisted.internet.task import LoopingCall
+from twisted.internet.task import deferLater
+
+from leap.common.events import emit_async, catalog
+from leap.common.check import leap_assert, leap_assert_type
+from leap.common.mail import get_email_charset
+from leap.keymanager import errors as keymanager_errors
+from leap.mail.adaptors import soledad_indexes as fields
+from leap.mail.generator import Generator
+from leap.mail.utils import json_loads, empty
+from leap.soledad.client import Soledad
+from leap.soledad.common.crypto import ENC_SCHEME_KEY, ENC_JSON_KEY
+from leap.soledad.common.errors import InvalidAuthTokenError
+
+
+logger = logging.getLogger(__name__)
+
+MULTIPART_ENCRYPTED = "multipart/encrypted"
+MULTIPART_SIGNED = "multipart/signed"
+PGP_BEGIN = "-----BEGIN PGP MESSAGE-----"
+PGP_END = "-----END PGP MESSAGE-----"
+
+# The period between succesive checks of the incoming mail
+# queue (in seconds)
+INCOMING_CHECK_PERIOD = 60
+
+
+class MalformedMessage(Exception):
+ """
+ Raised when a given message is not well formed.
+ """
+ pass
+
+
+class IncomingMail(Service):
+ """
+ Fetches and process mail from the incoming pool.
+
+ This object implements IService interface, has public methods
+ startService and stopService that will actually initiate a
+ LoopingCall with check_period recurrency.
+ The LoopingCall itself will invoke the fetch method each time
+ that the check_period expires.
+
+ This loop will sync the soledad db with the remote server and
+ process all the documents found tagged as incoming mail.
+ """
+ # TODO implements IService?
+
+ name = "IncomingMail"
+
+ RECENT_FLAG = "\\Recent"
+ CONTENT_KEY = "content"
+
+ LEAP_SIGNATURE_HEADER = 'X-Leap-Signature'
+ LEAP_ENCRYPTION_HEADER = 'X-Leap-Encryption'
+ """
+ Header added to messages when they are decrypted by the fetcher,
+ which states the validity of an eventual signature that might be included
+ in the encrypted blob.
+ """
+ LEAP_SIGNATURE_VALID = 'valid'
+ LEAP_SIGNATURE_INVALID = 'invalid'
+ LEAP_SIGNATURE_COULD_NOT_VERIFY = 'could not verify'
+
+ LEAP_ENCRYPTION_DECRYPTED = 'decrypted'
+
+ def __init__(self, keymanager, soledad, inbox, userid,
+ check_period=INCOMING_CHECK_PERIOD):
+
+ """
+ Initialize IncomingMail..
+
+ :param keymanager: a keymanager instance
+ :type keymanager: keymanager.KeyManager
+
+ :param soledad: a soledad instance
+ :type soledad: Soledad
+
+ :param inbox: the collection for the inbox where the new emails will be
+ stored
+ :type inbox: MessageCollection
+
+ :param check_period: the period to fetch new mail, in seconds.
+ :type check_period: int
+ """
+ leap_assert(keymanager, "need a keymanager to initialize")
+ leap_assert_type(soledad, Soledad)
+ leap_assert(check_period, "need a period to check incoming mail")
+ leap_assert_type(check_period, int)
+ leap_assert(userid, "need a userid to initialize")
+
+ self._keymanager = keymanager
+ self._soledad = soledad
+ self._inbox_collection = inbox
+ self._userid = userid
+
+ self._listeners = []
+ self._loop = None
+ self._check_period = check_period
+
+ # initialize a mail parser only once
+ self._parser = Parser()
+
+ def add_listener(self, listener):
+ """
+ Add a listener to inbox insertions.
+
+ This listener function will be called for each message added to the
+ inbox with its uid as parameter. This function should not be blocking
+ or it will block the incoming queue.
+
+ :param listener: the listener function
+ :type listener: callable
+ """
+ self._listeners.append(listener)
+
+ #
+ # Public API: fetch, start_loop, stop.
+ #
+
+ def fetch(self):
+ """
+ Fetch incoming mail, to be called periodically.
+
+ Calls a deferred that will execute the fetch callback.
+ """
+ def _sync_errback(failure):
+ log.err(failure)
+
+ def syncSoledadCallback(_):
+ # XXX this should be moved to adaptors
+ d = self._soledad.get_from_index(
+ fields.JUST_MAIL_IDX, "1", "0")
+ d.addCallback(self._process_doclist)
+ d.addErrback(_sync_errback)
+ return d
+
+ logger.debug("fetching mail for: %s %s" % (
+ self._soledad.uuid, self._userid))
+ d = self._sync_soledad()
+ d.addCallbacks(syncSoledadCallback, self._errback)
+ d.addCallbacks(self._signal_fetch_to_ui, self._errback)
+ return d
+
+ def startService(self):
+ """
+ Starts a loop to fetch mail.
+
+ :returns: A Deferred whose callback will be invoked with
+ the LoopingCall instance when loop.stop is called, or
+ whose errback will be invoked when the function raises an
+ exception or returned a deferred that has its errback
+ invoked.
+ """
+ Service.startService(self)
+ if self._loop is None:
+ self._loop = LoopingCall(self.fetch)
+ stop_deferred = self._loop.start(self._check_period)
+ return stop_deferred
+ else:
+ logger.warning("Tried to start an already running fetching loop.")
+ return defer.fail(Failure('Already running loop.'))
+
+ def stopService(self):
+ """
+ Stops the loop that fetches mail.
+ """
+ if self._loop and self._loop.running is True:
+ self._loop.stop()
+ self._loop = None
+ Service.stopService(self)
+
+ #
+ # Private methods.
+ #
+
+ # synchronize incoming mail
+
+ def _errback(self, failure):
+ log.err(failure)
+
+ def _sync_soledad(self):
+ """
+ Synchronize with remote soledad.
+
+ :returns: a list of LeapDocuments, or None.
+ :rtype: iterable or None
+ """
+ def _log_synced(result):
+ log.msg('FETCH soledad SYNCED.')
+ return result
+
+ def _signal_invalid_auth(failure):
+ failure.trap(InvalidAuthTokenError)
+ # if the token is invalid, send an event so the GUI can
+ # disable mail and show an error message.
+ emit_async(catalog.SOLEDAD_INVALID_AUTH_TOKEN, self._userid)
+
+ log.msg('FETCH: syncing soledad...')
+ d = self._soledad.sync()
+ d.addCallbacks(_log_synced, _signal_invalid_auth)
+ return d
+
+ def _signal_fetch_to_ui(self, doclist):
+ """
+ Send leap events to ui.
+
+ :param doclist: iterable with msg documents.
+ :type doclist: iterable.
+ :returns: doclist
+ :rtype: iterable
+ """
+ if doclist:
+ fetched_ts = time.mktime(time.gmtime())
+ num_mails = len(doclist) if doclist is not None else 0
+ if num_mails != 0:
+ log.msg("there are %s mails" % (num_mails,))
+ emit_async(catalog.MAIL_FETCHED_INCOMING, self._userid,
+ str(num_mails), str(fetched_ts))
+ return doclist
+
+ def _signal_unread_to_ui(self, *args):
+ """
+ Sends unread event to ui.
+ """
+ emit_async(catalog.MAIL_UNREAD_MESSAGES, self._userid,
+ str(self._inbox_collection.count_unseen()))
+
+ # process incoming mail.
+
+ def _process_doclist(self, doclist):
+ """
+ Iterates through the doclist, checks if each doc
+ looks like a message, and yields a deferred that will decrypt and
+ process the message.
+
+ :param doclist: iterable with msg documents.
+ :type doclist: iterable.
+ :returns: a list of deferreds for individual messages.
+ """
+ log.msg('processing doclist')
+ if not doclist:
+ logger.debug("no docs found")
+ return
+ num_mails = len(doclist)
+
+ deferreds = []
+ for index, doc in enumerate(doclist):
+ logger.debug("processing doc %d of %d" % (index + 1, num_mails))
+ emit_async(catalog.MAIL_MSG_PROCESSING, self._userid,
+ str(index), str(num_mails))
+
+ keys = doc.content.keys()
+
+ # TODO Compatibility check with the index in pre-0.6 mx
+ # that does not write the ERROR_DECRYPTING_KEY
+ # This should be removed in 0.7
+
+ has_errors = doc.content.get(fields.ERROR_DECRYPTING_KEY, None)
+
+ if has_errors is None:
+ warnings.warn("JUST_MAIL_COMPAT_IDX will be deprecated!",
+ DeprecationWarning)
+
+ if has_errors:
+ logger.debug("skipping msg with decrypting errors...")
+ elif self._is_msg(keys):
+ # TODO this pipeline is a bit obscure!
+ d = self._decrypt_doc(doc)
+ d.addCallback(self._maybe_extract_keys)
+ d.addCallbacks(self._add_message_locally, self._errback)
+ deferreds.append(d)
+
+ d = defer.gatherResults(deferreds, consumeErrors=True)
+ d.addCallback(lambda _: doclist)
+ return d
+
+ #
+ # operations on individual messages
+ #
+
+ def _decrypt_doc(self, doc):
+ """
+ Decrypt the contents of a document.
+
+ :param doc: A document containing an encrypted message.
+ :type doc: SoledadDocument
+
+ :return: A Deferred that will be fired with the document and the
+ decrypted message.
+ :rtype: SoledadDocument, str
+ """
+ log.msg('decrypting msg')
+
+ def process_decrypted(res):
+ if isinstance(res, tuple):
+ decrdata, _ = res
+ success = True
+ else:
+ decrdata = ""
+ success = False
+
+ emit_async(catalog.MAIL_MSG_DECRYPTED, self._userid,
+ "1" if success else "0")
+ return self._process_decrypted_doc(doc, decrdata)
+
+ d = self._keymanager.decrypt(doc.content[ENC_JSON_KEY], self._userid)
+ d.addErrback(self._errback)
+ d.addCallback(process_decrypted)
+ d.addCallback(lambda data: (doc, data))
+ return d
+
+ def _process_decrypted_doc(self, doc, data):
+ """
+ Process a document containing a succesfully decrypted message.
+
+ :param doc: the incoming message
+ :type doc: SoledadDocument
+ :param data: the json-encoded, decrypted content of the incoming
+ message
+ :type data: str
+
+ :return: a Deferred that will be fired with an str of the proccessed
+ data.
+ :rtype: Deferred
+ """
+ log.msg('processing decrypted doc')
+
+ # XXX turn this into an errBack for each one of
+ # the deferreds that would process an individual document
+ try:
+ msg = json_loads(data)
+ except UnicodeError as exc:
+ logger.error("Error while decrypting %s" % (doc.doc_id,))
+ logger.exception(exc)
+
+ # we flag the message as "with decrypting errors",
+ # to avoid further decryption attempts during sync
+ # cycles until we're prepared to deal with that.
+ # What is the same, when Ivan deals with it...
+ # A new decrypting attempt event could be triggered by a
+ # future a library upgrade, or a cli flag to the client,
+ # we just `defer` that for now... :)
+ doc.content[fields.ERROR_DECRYPTING_KEY] = True
+ deferLater(reactor, 0, self._update_incoming_message, doc)
+
+ # FIXME this is just a dirty hack to delay the proper
+ # deferred organization here...
+ # and remember, boys, do not do this at home.
+ return []
+
+ if not isinstance(msg, dict):
+ defer.returnValue(False)
+ if not msg.get(fields.INCOMING_KEY, False):
+ defer.returnValue(False)
+
+ # ok, this is an incoming message
+ rawmsg = msg.get(self.CONTENT_KEY, None)
+ if rawmsg is None:
+ return ""
+ return self._maybe_decrypt_msg(rawmsg)
+
+ def _update_incoming_message(self, doc):
+ """
+ Do a put for a soledad document. This probably has been called only
+ in the case that we've needed to update the ERROR_DECRYPTING_KEY
+ flag in an incoming message, to get it out of the decrypting queue.
+
+ :param doc: the SoledadDocument to update
+ :type doc: SoledadDocument
+ """
+ log.msg("Updating Incoming MSG: SoledadDoc %s" % (doc.doc_id))
+ return self._soledad.put_doc(doc)
+
+ def _delete_incoming_message(self, doc):
+ """
+ Delete document.
+
+ :param doc: the SoledadDocument to delete
+ :type doc: SoledadDocument
+ """
+ log.msg("Deleting Incoming message: %s" % (doc.doc_id,))
+ return self._soledad.delete_doc(doc)
+
+ def _maybe_decrypt_msg(self, data):
+ """
+ Tries to decrypt a gpg message if data looks like one.
+
+ :param data: the text to be decrypted.
+ :type data: str
+
+ :return: a Deferred that will be fired with an str of data, possibly
+ decrypted.
+ :rtype: Deferred
+ """
+ leap_assert_type(data, str)
+ log.msg('maybe decrypting doc')
+
+ # parse the original message
+ encoding = get_email_charset(data)
+ msg = self._parser.parsestr(data)
+
+ fromHeader = msg.get('from', None)
+ senderAddress = None
+
+ if (fromHeader is not None and
+ (msg.get_content_type() == MULTIPART_ENCRYPTED or
+ msg.get_content_type() == MULTIPART_SIGNED)):
+ senderAddress = parseaddr(fromHeader)[1]
+
+ def add_leap_header(ret):
+ decrmsg, signkey = ret
+ if (senderAddress is None or signkey is None or
+ isinstance(signkey, keymanager_errors.KeyNotFound)):
+ decrmsg.add_header(
+ self.LEAP_SIGNATURE_HEADER,
+ self.LEAP_SIGNATURE_COULD_NOT_VERIFY)
+ elif isinstance(signkey, keymanager_errors.InvalidSignature):
+ decrmsg.add_header(
+ self.LEAP_SIGNATURE_HEADER,
+ self.LEAP_SIGNATURE_INVALID)
+ else:
+ self._add_verified_signature_header(decrmsg,
+ signkey.fingerprint)
+ return decrmsg.as_string()
+
+ if msg.get_content_type() == MULTIPART_ENCRYPTED:
+ d = self._decrypt_multipart_encrypted_msg(
+ msg, encoding, senderAddress)
+ elif msg.get_content_type() == MULTIPART_SIGNED:
+ d = self._verify_signature_not_encrypted_msg(msg, senderAddress)
+ else:
+ d = self._maybe_decrypt_inline_encrypted_msg(
+ msg, encoding, senderAddress)
+ d.addCallback(add_leap_header)
+ return d
+
+ def _add_verified_signature_header(self, decrmsg, fingerprint):
+ decrmsg.add_header(
+ self.LEAP_SIGNATURE_HEADER,
+ self.LEAP_SIGNATURE_VALID,
+ pubkey=fingerprint)
+
+ def _add_decrypted_header(self, msg):
+ msg.add_header(self.LEAP_ENCRYPTION_HEADER,
+ self.LEAP_ENCRYPTION_DECRYPTED)
+
+ def _decrypt_multipart_encrypted_msg(self, msg, encoding, senderAddress):
+ """
+ Decrypt a message with content-type 'multipart/encrypted'.
+
+ :param msg: The original encrypted message.
+ :type msg: Message
+ :param encoding: The encoding of the email message.
+ :type encoding: str
+ :param senderAddress: The email address of the sender of the message.
+ :type senderAddress: str
+
+ :return: A Deferred that will be fired with a tuple containing a
+ decrypted Message and the signing OpenPGPKey if the signature
+ is valid or InvalidSignature or KeyNotFound.
+ :rtype: Deferred
+ """
+ log.msg('decrypting multipart encrypted msg')
+ msg = copy.deepcopy(msg)
+ self._msg_multipart_sanity_check(msg)
+
+ # parse message and get encrypted content
+ pgpencmsg = msg.get_payload()[1]
+ encdata = pgpencmsg.get_payload()
+
+ # decrypt or fail gracefully
+ def build_msg(res):
+ decrdata, signkey = res
+
+ decrmsg = self._parser.parsestr(decrdata)
+ # remove original message's multipart/encrypted content-type
+ del(msg['content-type'])
+
+ # replace headers back in original message
+ for hkey, hval in decrmsg.items():
+ try:
+ # this will raise KeyError if header is not present
+ msg.replace_header(hkey, hval)
+ except KeyError:
+ msg[hkey] = hval
+
+ # all ok, replace payload by unencrypted payload
+ msg.set_payload(decrmsg.get_payload())
+ self._add_decrypted_header(msg)
+ return (msg, signkey)
+
+ def verify_signature_after_decrypt_an_email(res):
+ decrdata, signkey = res
+ if decrdata.get_content_type() == MULTIPART_SIGNED:
+ res = self._verify_signature_not_encrypted_msg(decrdata,
+ senderAddress)
+ return res
+
+ d = self._keymanager.decrypt(
+ encdata, self._userid, verify=senderAddress)
+ d.addCallbacks(build_msg, self._decryption_error, errbackArgs=(msg,))
+ d.addCallbacks(verify_signature_after_decrypt_an_email)
+ return d
+
+ def _maybe_decrypt_inline_encrypted_msg(self, origmsg, encoding,
+ senderAddress):
+ """
+ Possibly decrypt an inline OpenPGP encrypted message.
+
+ :param origmsg: The original, possibly encrypted message.
+ :type origmsg: Message
+ :param encoding: The encoding of the email message.
+ :type encoding: str
+ :param senderAddress: The email address of the sender of the message.
+ :type senderAddress: str
+
+ :return: A Deferred that will be fired with a tuple containing a
+ decrypted Message and the signing OpenPGPKey if the signature
+ is valid or InvalidSignature or KeyNotFound.
+ :rtype: Deferred
+ """
+ log.msg('maybe decrypting inline encrypted msg')
+
+ data = self._serialize_msg(origmsg)
+
+ def decrypted_data(res):
+ decrdata, signkey = res
+ replaced_data = data.replace(pgp_message, decrdata)
+ self._add_decrypted_header(origmsg)
+ return replaced_data, signkey
+
+ def encode_and_return(res):
+ data, signkey = res
+ if isinstance(data, unicode):
+ data = data.encode(encoding, 'replace')
+ return (self._parser.parsestr(data), signkey)
+
+ # handle exactly one inline PGP message
+ if PGP_BEGIN in data:
+ begin = data.find(PGP_BEGIN)
+ end = data.find(PGP_END)
+ pgp_message = data[begin:end + len(PGP_END)]
+ d = self._keymanager.decrypt(
+ pgp_message, self._userid, verify=senderAddress)
+ d.addCallbacks(decrypted_data, self._decryption_error,
+ errbackArgs=(data,))
+ else:
+ d = defer.succeed((data, None))
+ d.addCallback(encode_and_return)
+ return d
+
+ def _verify_signature_not_encrypted_msg(self, origmsg, sender_address):
+ """
+ Possibly decrypt an inline OpenPGP encrypted message.
+
+ :param origmsg: The original, possibly encrypted message.
+ :type origmsg: Message
+ :param sender_address: The email address of the sender of the message.
+ :type sender_address: str
+
+ :return: A Deferred that will be fired with a tuple containing a
+ signed Message and the signing OpenPGPKey if the signature
+ is valid or InvalidSignature.
+ :rtype: Deferred
+ """
+ msg = copy.deepcopy(origmsg)
+ data = self._serialize_msg(msg.get_payload(0))
+ detached_sig = self._extract_signature(msg)
+ d = self._keymanager.verify(data, sender_address, detached_sig)
+
+ d.addCallback(lambda sign_key: (msg, sign_key))
+ d.addErrback(lambda _: (msg, keymanager_errors.InvalidSignature()))
+ return d
+
+ def _serialize_msg(self, origmsg):
+ buf = StringIO()
+ g = Generator(buf)
+ g.flatten(origmsg)
+ return buf.getvalue()
+
+ def _extract_signature(self, msg):
+ body = msg.get_payload(0).get_payload()
+
+ if isinstance(body, str):
+ body = msg.get_payload(0)
+
+ detached_sig = msg.get_payload(1).get_payload()
+ msg.set_payload(body)
+ return detached_sig
+
+ def _decryption_error(self, failure, msg):
+ """
+ Check for known decryption errors
+ """
+ if failure.check(keymanager_errors.DecryptError):
+ logger.warning('Failed to decrypt encrypted message (%s). '
+ 'Storing message without modifications.'
+ % str(failure.value))
+ return (msg, None)
+ elif failure.check(keymanager_errors.KeyNotFound):
+ logger.error('Failed to find private key for decryption (%s). '
+ 'Storing message without modifications.'
+ % str(failure.value))
+ return (msg, None)
+ else:
+ return failure
+
+ @defer.inlineCallbacks
+ def _maybe_extract_keys(self, msgtuple):
+ """
+ Retrieve attached keys to the mesage and parse message headers for an
+ *OpenPGP* header as described on the `IETF draft
+ <http://tools.ietf.org/html/draft-josefsson-openpgp-mailnews-header-06>`
+ only urls with https and the same hostname than the email are supported
+ for security reasons.
+
+ :param msgtuple: a tuple consisting of a SoledadDocument
+ instance containing the incoming message
+ and data, the json-encoded, decrypted content of the
+ incoming message
+ :type msgtuple: (SoledadDocument, str)
+
+ :return: A Deferred that will be fired with msgtuple when key
+ extraction finishes
+ :rtype: Deferred
+ """
+ OpenPGP_HEADER = 'OpenPGP'
+ doc, data = msgtuple
+
+ # XXX the parsing of the message is done in mailbox.addMessage, maybe
+ # we should do it in this module so we don't need to parse it again
+ # here
+ msg = self._parser.parsestr(data)
+ _, fromAddress = parseaddr(msg['from'])
+
+ valid_attachment = False
+ if msg.is_multipart():
+ valid_attachment = yield self._maybe_extract_attached_key(
+ msg.get_payload(), fromAddress)
+
+ if not valid_attachment:
+ header = msg.get(OpenPGP_HEADER, None)
+ if header is not None:
+ yield self._maybe_extract_openpgp_header(header, fromAddress)
+
+ defer.returnValue(msgtuple)
+
+ def _maybe_extract_openpgp_header(self, header, address):
+ """
+ Import keys from the OpenPGP header
+
+ :param header: OpenPGP header string
+ :type header: str
+ :param address: email address in the from header
+ :type address: str
+
+ :return: A Deferred that will be fired when header extraction is done
+ :rtype: Deferred
+ """
+ d = defer.succeed(None)
+ fields = dict([f.strip(' ').split('=') for f in header.split(';')])
+ if 'url' in fields:
+ url = shlex.split(fields['url'])[0] # remove quotations
+ urlparts = urlparse(url)
+ addressHostname = address.split('@')[1]
+ if (
+ urlparts.scheme == 'https' and
+ urlparts.hostname == addressHostname
+ ):
+ def fetch_error(failure):
+ if failure.check(keymanager_errors.KeyNotFound):
+ logger.warning("Url from OpenPGP header %s failed"
+ % (url,))
+ elif failure.check(keymanager_errors.KeyAttributesDiffer):
+ logger.warning("Key from OpenPGP header url %s didn't "
+ "match the from address %s"
+ % (url, address))
+ else:
+ return failure
+
+ d = self._keymanager.fetch_key(address, url)
+ d.addCallback(
+ lambda _:
+ logger.info("Imported key from header %s" % (url,)))
+ d.addErrback(fetch_error)
+ else:
+ logger.debug("No valid url on OpenPGP header %s" % (url,))
+ else:
+ logger.debug("There is no url on the OpenPGP header: %s"
+ % (header,))
+ return d
+
+ def _maybe_extract_attached_key(self, attachments, address):
+ """
+ Import keys from the attachments
+
+ :param attachments: email attachment list
+ :type attachments: list(email.Message)
+ :param address: email address in the from header
+ :type address: str
+
+ :return: A Deferred that will be fired when all the keys are stored
+ with a boolean: True if there was a valid key attached, or
+ False otherwise.
+ :rtype: Deferred
+ """
+ MIME_KEY = "application/pgp-keys"
+
+ def log_key_added(ignored):
+ logger.debug('Added key found in attachment for %s' % address)
+ return True
+
+ def failed_put_key(failure):
+ logger.info("An error has ocurred adding attached key for %s: %s"
+ % (address, failure.getErrorMessage()))
+ return False
+
+ deferreds = []
+ for attachment in attachments:
+ if MIME_KEY == attachment.get_content_type():
+ d = self._keymanager.put_raw_key(
+ attachment.get_payload(decode=True), address=address)
+ d.addCallbacks(log_key_added, failed_put_key)
+ deferreds.append(d)
+ d = defer.gatherResults(deferreds)
+ d.addCallback(lambda result: any(result))
+ return d
+
+ def _add_message_locally(self, msgtuple):
+ """
+ Adds a message to local inbox and delete it from the incoming db
+ in soledad.
+
+ :param msgtuple: a tuple consisting of a SoledadDocument
+ instance containing the incoming message
+ and data, the json-encoded, decrypted content of the
+ incoming message
+ :type msgtuple: (SoledadDocument, str)
+
+ :return: A Deferred that will be fired when the messages is stored
+ :rtype: Defferred
+ """
+ doc, raw_data = msgtuple
+ insertion_date = formatdate(time.time())
+ log.msg('adding message %s to local db' % (doc.doc_id,))
+
+ def msgSavedCallback(result):
+ if empty(result):
+ return
+
+ for listener in self._listeners:
+ listener(result)
+
+ def signal_deleted(doc_id):
+ emit_async(catalog.MAIL_MSG_DELETED_INCOMING,
+ self._userid)
+ return doc_id
+
+ emit_async(catalog.MAIL_MSG_SAVED_LOCALLY, self._userid)
+ d = self._delete_incoming_message(doc)
+ d.addCallback(signal_deleted)
+ return d
+
+ d = self._inbox_collection.add_msg(
+ raw_data, (self.RECENT_FLAG,), date=insertion_date,
+ notify_just_mdoc=True)
+ d.addCallbacks(msgSavedCallback, self._errback)
+ return d
+
+ #
+ # helpers
+ #
+
+ def _msg_multipart_sanity_check(self, msg):
+ """
+ Performs a sanity check against a multipart encrypted msg
+
+ :param msg: The original encrypted message.
+ :type msg: Message
+ """
+ # sanity check
+ payload = msg.get_payload()
+ if len(payload) != 2:
+ raise MalformedMessage(
+ 'Multipart/encrypted messages should have exactly 2 body '
+ 'parts (instead of %d).' % len(payload))
+ if payload[0].get_content_type() != 'application/pgp-encrypted':
+ raise MalformedMessage(
+ "Multipart/encrypted messages' first body part should "
+ "have content type equal to 'application/pgp-encrypted' "
+ "(instead of %s)." % payload[0].get_content_type())
+ if payload[1].get_content_type() != 'application/octet-stream':
+ raise MalformedMessage(
+ "Multipart/encrypted messages' second body part should "
+ "have content type equal to 'octet-stream' (instead of "
+ "%s)." % payload[1].get_content_type())
+
+ def _is_msg(self, keys):
+ """
+ Checks if the keys of a dictionary match the signature
+ of the document type we use for messages.
+
+ :param keys: iterable containing the strings to match.
+ :type keys: iterable of strings.
+ :rtype: bool
+ """
+ return ENC_SCHEME_KEY in keys and ENC_JSON_KEY in keys
diff --git a/src/leap/bitmask/mail/incoming/tests/rfc822.multi-encrypt-signed.message b/src/leap/bitmask/mail/incoming/tests/rfc822.multi-encrypt-signed.message
new file mode 100644
index 00000000..98304f24
--- /dev/null
+++ b/src/leap/bitmask/mail/incoming/tests/rfc822.multi-encrypt-signed.message
@@ -0,0 +1,61 @@
+Content-Type: multipart/encrypted;
+ boundary="Apple-Mail=_C01A1464-6C43-43BF-8F62-157335B7E25B";
+ protocol="application/pgp-encrypted";
+Subject: Enc signed
+Mime-Version: 1.0 (Mac OS X Mail 9.3 \(3124\))
+From: Leap Test Key <leap@leap.se>
+Date: Tue, 24 May 2016 11:47:24 -0300
+Content-Description: OpenPGP encrypted message
+To: leap@leap.se
+
+This is an OpenPGP/MIME encrypted message (RFC 2440 and 3156)
+--Apple-Mail=_C01A1464-6C43-43BF-8F62-157335B7E25B
+Content-Type: application/pgp-encrypted
+Content-Description: PGP/MIME Versions Identification
+
+--Apple-Mail=_C01A1464-6C43-43BF-8F62-157335B7E25B
+Content-Disposition: inline;
+ filename=encrypted.asc
+Content-Type: application/octet-stream;
+ name=encrypted.asc
+Content-Description: OpenPGP encrypted message
+
+-----BEGIN PGP MESSAGE-----
+Version: GnuPG v2
+
+hQIMAyj9aG/xtZOwAQ/9Gft0KmOpgzL6z4wmVlLm2aeAvHolXmxWb7N/ByL/dZ4n
+YZd/GPRj42X3BwUrDEL5aO3Mcp+rqq8ACh9hsZXiau0Q9cs1K7Gr55Y06qLrIjom
+2fLqwLFBxCL2sAX1dvClgStyfsRFk9Y/+5tX+IjWaD8dAoRdxCO8IbUDuYGnaKld
+bB9h0NMfKVddCAvuQvX1Zc1Nx0Yb3Hd+ocDD7i9BVgX1BBiGu4/ElS3d32TAVCFs
+Na3tjitWB2G472CYu1O6exY7h1F5V4FHfXH6iMRJSYnvV2Jr+oPZENzNdEEA5H/H
+fUbpWrpKzPafjho9S5rJBBM/tqtmBQFBIdgFVcBVb+bXO6DJ8SMTLiiGcVUvvm1b
+9N2VQIhsxtZ8DpcHHSqFVgT2Gt4UkSrEleSoReg36TzS1s8Uw0oU068PwTe3K0Gx
+2pLMdT9NA6X/t7movpXP6tih1l6P5z62dxFl6W12J9OcegISCt0Q7gex1gk/a8zM
+rzBJC3mVxRiFlvHPBgD6oUKarnTJPQx5f5dFXg8DXBWR1Eh/aFjPQIzhZBYpmOi8
+HqgjcAA+WhMQ7v5c0enJoJJS+8Xfai/MK2vTUGsfAT6HqHLw1HSIn6XQGEf4sQ/U
+NfLeFHHbe9rTk8QhyjrSl2vvek2H4EBQVLF08/FUrAfPELUttOFtysQfC3+M0+PS
+6QGyeIlUjKpBJG7HBd4ibuKMQ5vnA+ACsg/TySYeCO6P85xsN+Lmqlr8cAICn/hR
+ezFSzlibaIelRgfDEDJdjVyCsa7qBMjhRCvGYBdkyTzIRq53qwD9pkhrQ6nwWQrv
+bBzyLrl+NVR8CTEOwbeFLI6qf68kblojk3lwo3Qi3psmeMJdiaV9uevsHrgmEFTH
+lZ3rFECPWzmrkMSfVjWu5d8jJqMcqa4lnGzFQKaB76I8BzGhCWrnuvHPB9c9SVhI
+AnAwNw3gY5xgsbXMxZhnPgYeBSViPkQkgRCWl8Jz41eiAJ3Gtj8QSSFWGHpX+MgP
+ohBaPHz6Fnkhz7Lok97e2AcuRZrDVKV6i28r8mizI3B2Mah6ZV0Yuv0EYNtzBv/v
+yV3nu4DWuOOU0301CXBayxJGX0h07z1Ycv7jWD6LNiBXa1vahtbU4WSYNkF0OJaz
+nf8O3CZy5twMq5kQYoPacdNNLregAmWquvE1nxqWbtHFMjtXitP7czxzUTU/DE+C
+jr+irDoYEregEKg9xov91UCRPZgxL+TML71+tSYOMO3JG6lbGw77PQ8s2So7xore
+8+FeDFPaaJqh6uhF5LETRSx8x/haZiXLd+WtO7wF8S3+Vz7AJIFIe8MUadZrYwnH
+wfMAktQKbep3iHCeZ5jHYA461AOhnCca2y+GoyHZUDDFwS1pC1RN4lMkafSE1AgH
+cmEcjLYsw1gqT0+DfqrvjbXmMjGgkgnkMybJH7df5TKu36Q0Nqvcbc2XLFkalr5V
+Vk0SScqKYnKL+cJjabqA8rKkeAh22E2FBCpKPqxSS3te2bRb3XBX26bP0LshkJuy
+GPu6LKvwmUn0obPKCnLJvb9ImIGZToXu6Fb/Cd2c3DG1IK5PptQz4f7ZRW98huPO
+2w59Bswwt5q4lQqsMEzVRnIDH45MmnhEUeS4NaxqLTO7eJpMpb4VxT2u/Ac3XWKp
+o2RE6CbqTyJ+n8tY9OwBRMKzdVd9RFAMqMHTzWTAuU4BgW2vT2sHYZdAsX8sktBr
+5mo9P3MqvgdPNpg8+AOB03JlIv0dzrAFWCZxxLLGIIIz0eXsjghHzQ9QjGfr0xFH
+Z79AKDjsoRisWyWCnadS2oM9fdAg4T/h1STnfxc44o7N1+ym7u58ODICFi+Kg8IR
+JBHIp3CK02JLTLd/WFhUVyWgc6l8gn+oBK+r7Dw+FTWhqX2/ZHCO8qKK1ZK3NIMn
+MBcSVvHSnTPtppb+oND5nk38xazVVHnwxNHaIh7g3NxDB4hl5rBhrWsgTNuqDDRU
+w7ufvMYr1AOV+8e92cHCEKPM19nFKEgaBFECEptEObesGI3QZPAESlojzQ3cDeBa
+=tEyc
+-----END PGP MESSAGE-----
+
+--Apple-Mail=_C01A1464-6C43-43BF-8F62-157335B7E25B-- \ No newline at end of file
diff --git a/src/leap/bitmask/mail/incoming/tests/test_incoming_mail.py b/src/leap/bitmask/mail/incoming/tests/test_incoming_mail.py
new file mode 100644
index 00000000..29422ecc
--- /dev/null
+++ b/src/leap/bitmask/mail/incoming/tests/test_incoming_mail.py
@@ -0,0 +1,391 @@
+# -*- coding: utf-8 -*-
+# test_incoming_mail.py
+# Copyright (C) 2015 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+"""
+Test case for leap.mail.incoming.service
+
+@authors: Ruben Pollan, <meskio@sindominio.net>
+
+@license: GPLv3, see included LICENSE file
+"""
+
+import json
+import os
+import tempfile
+import uuid
+
+from email.mime.application import MIMEApplication
+from email.mime.multipart import MIMEMultipart
+from email.parser import Parser
+from mock import Mock
+
+from twisted.internet import defer
+from twisted.python import log
+
+from leap.keymanager.errors import KeyAddressMismatch
+from leap.mail.adaptors import soledad_indexes as fields
+from leap.mail.adaptors.soledad import cleanup_deferred_locks
+from leap.mail.adaptors.soledad import SoledadMailAdaptor
+from leap.mail.mail import MessageCollection
+from leap.mail.mailbox_indexer import MailboxIndexer
+
+from leap.mail.incoming.service import IncomingMail
+from leap.mail.rfc3156 import MultipartEncrypted, PGPEncrypted
+from leap.mail.testing import KeyManagerWithSoledadTestCase
+from leap.mail.testing import ADDRESS, ADDRESS_2
+from leap.soledad.common.document import SoledadDocument
+from leap.soledad.common.crypto import (
+ EncryptionSchemes,
+ ENC_JSON_KEY,
+ ENC_SCHEME_KEY,
+)
+
+HERE = os.path.split(os.path.abspath(__file__))[0]
+
+# TODO: add some tests for encrypted, unencrypted, signed and unsgined messages
+
+
+class IncomingMailTestCase(KeyManagerWithSoledadTestCase):
+ """
+ Tests for the incoming mail parser
+ """
+ NICKSERVER = "http://domain"
+ BODY = """
+Governments of the Industrial World, you weary giants of flesh and steel, I
+come from Cyberspace, the new home of Mind. On behalf of the future, I ask
+you of the past to leave us alone. You are not welcome among us. You have
+no sovereignty where we gather.
+ """
+ EMAIL = """from: Test from SomeDomain <%(from)s>
+to: %(to)s
+subject: independence of cyberspace
+
+%(body)s
+ """ % {
+ "from": ADDRESS_2,
+ "to": ADDRESS,
+ "body": BODY
+ }
+
+ def setUp(self):
+ cleanup_deferred_locks()
+ try:
+ del self._soledad
+ del self.km
+ except AttributeError:
+ pass
+
+ # pytest handles correctly the setupEnv for the class,
+ # but trial ignores it.
+ if not getattr(self, 'tempdir', None):
+ self.tempdir = tempfile.mkdtemp()
+
+ def getCollection(_):
+ adaptor = SoledadMailAdaptor()
+ store = self._soledad
+ adaptor.store = store
+ mbox_indexer = MailboxIndexer(store)
+ mbox_name = "INBOX"
+ mbox_uuid = str(uuid.uuid4())
+
+ def get_collection_from_mbox_wrapper(wrapper):
+ wrapper.uuid = mbox_uuid
+ return MessageCollection(
+ adaptor, store,
+ mbox_indexer=mbox_indexer, mbox_wrapper=wrapper)
+
+ d = adaptor.initialize_store(store)
+ d.addCallback(lambda _: mbox_indexer.create_table(mbox_uuid))
+ d.addCallback(
+ lambda _: adaptor.get_or_create_mbox(store, mbox_name))
+ d.addCallback(get_collection_from_mbox_wrapper)
+ return d
+
+ def setUpFetcher(inbox_collection):
+ self.fetcher = IncomingMail(
+ self.km,
+ self._soledad,
+ inbox_collection,
+ ADDRESS)
+
+ # The messages don't exist on soledad will fail on deletion
+ self.fetcher._delete_incoming_message = Mock(
+ return_value=defer.succeed(None))
+
+ d = KeyManagerWithSoledadTestCase.setUp(self)
+ d.addCallback(getCollection)
+ d.addCallback(setUpFetcher)
+ d.addErrback(log.err)
+ return d
+
+ def tearDown(self):
+ d = KeyManagerWithSoledadTestCase.tearDown(self)
+ return d
+
+ def testExtractOpenPGPHeader(self):
+ """
+ Test the OpenPGP header key extraction
+ """
+ KEYURL = "https://leap.se/key.txt"
+ OpenPGP = "id=12345678; url=\"%s\"; preference=signencrypt" % (KEYURL,)
+
+ message = Parser().parsestr(self.EMAIL)
+ message.add_header("OpenPGP", OpenPGP)
+ self.fetcher._keymanager.fetch_key = Mock(
+ return_value=defer.succeed(None))
+
+ def fetch_key_called(ret):
+ self.fetcher._keymanager.fetch_key.assert_called_once_with(
+ ADDRESS_2, KEYURL)
+
+ d = self._create_incoming_email(message.as_string())
+ d.addCallback(
+ lambda email:
+ self._mock_soledad_get_from_index(fields.JUST_MAIL_IDX, [email]))
+ d.addCallback(lambda _: self.fetcher.fetch())
+ d.addCallback(fetch_key_called)
+ return d
+
+ def testExtractOpenPGPHeaderInvalidUrl(self):
+ """
+ Test the OpenPGP header key extraction
+ """
+ KEYURL = "https://someotherdomain.com/key.txt"
+ OpenPGP = "id=12345678; url=\"%s\"; preference=signencrypt" % (KEYURL,)
+
+ message = Parser().parsestr(self.EMAIL)
+ message.add_header("OpenPGP", OpenPGP)
+ self.fetcher._keymanager.fetch_key = Mock()
+
+ def fetch_key_called(ret):
+ self.assertFalse(self.fetcher._keymanager.fetch_key.called)
+
+ d = self._create_incoming_email(message.as_string())
+ d.addCallback(
+ lambda email:
+ self._mock_soledad_get_from_index(fields.JUST_MAIL_IDX, [email]))
+ d.addCallback(lambda _: self.fetcher.fetch())
+ d.addCallback(fetch_key_called)
+ return d
+
+ def testExtractAttachedKey(self):
+ KEY = "-----BEGIN PGP PUBLIC KEY BLOCK-----\n..."
+
+ message = MIMEMultipart()
+ message.add_header("from", ADDRESS_2)
+ key = MIMEApplication("", "pgp-keys")
+ key.set_payload(KEY)
+ message.attach(key)
+ self.fetcher._keymanager.put_raw_key = Mock(
+ return_value=defer.succeed(None))
+
+ def put_raw_key_called(_):
+ self.fetcher._keymanager.put_raw_key.assert_called_once_with(
+ KEY, address=ADDRESS_2)
+
+ d = self._do_fetch(message.as_string())
+ d.addCallback(put_raw_key_called)
+ return d
+
+ def testExtractInvalidAttachedKey(self):
+ KEY = "-----BEGIN PGP PUBLIC KEY BLOCK-----\n..."
+
+ message = MIMEMultipart()
+ message.add_header("from", ADDRESS_2)
+ key = MIMEApplication("", "pgp-keys")
+ key.set_payload(KEY)
+ message.attach(key)
+ self.fetcher._keymanager.put_raw_key = Mock(
+ return_value=defer.fail(KeyAddressMismatch()))
+
+ def put_raw_key_called(_):
+ self.fetcher._keymanager.put_raw_key.assert_called_once_with(
+ KEY, address=ADDRESS_2)
+
+ d = self._do_fetch(message.as_string())
+ d.addCallback(put_raw_key_called)
+ d.addErrback(log.err)
+ return d
+
+ def testExtractAttachedKeyAndNotOpenPGPHeader(self):
+ KEY = "-----BEGIN PGP PUBLIC KEY BLOCK-----\n..."
+ KEYURL = "https://leap.se/key.txt"
+ OpenPGP = "id=12345678; url=\"%s\"; preference=signencrypt" % (KEYURL,)
+
+ message = MIMEMultipart()
+ message.add_header("from", ADDRESS_2)
+ message.add_header("OpenPGP", OpenPGP)
+ key = MIMEApplication("", "pgp-keys")
+ key.set_payload(KEY)
+ message.attach(key)
+
+ self.fetcher._keymanager.put_raw_key = Mock(
+ return_value=defer.succeed(None))
+ self.fetcher._keymanager.fetch_key = Mock()
+
+ def put_raw_key_called(_):
+ self.fetcher._keymanager.put_raw_key.assert_called_once_with(
+ KEY, address=ADDRESS_2)
+ self.assertFalse(self.fetcher._keymanager.fetch_key.called)
+
+ d = self._do_fetch(message.as_string())
+ d.addCallback(put_raw_key_called)
+ return d
+
+ def testExtractOpenPGPHeaderIfInvalidAttachedKey(self):
+ KEY = "-----BEGIN PGP PUBLIC KEY BLOCK-----\n..."
+ KEYURL = "https://leap.se/key.txt"
+ OpenPGP = "id=12345678; url=\"%s\"; preference=signencrypt" % (KEYURL,)
+
+ message = MIMEMultipart()
+ message.add_header("from", ADDRESS_2)
+ message.add_header("OpenPGP", OpenPGP)
+ key = MIMEApplication("", "pgp-keys")
+ key.set_payload(KEY)
+ message.attach(key)
+
+ self.fetcher._keymanager.put_raw_key = Mock(
+ return_value=defer.fail(KeyAddressMismatch()))
+ self.fetcher._keymanager.fetch_key = Mock()
+
+ def put_raw_key_called(_):
+ self.fetcher._keymanager.put_raw_key.assert_called_once_with(
+ KEY, address=ADDRESS_2)
+ self.fetcher._keymanager.fetch_key.assert_called_once_with(
+ ADDRESS_2, KEYURL)
+
+ d = self._do_fetch(message.as_string())
+ d.addCallback(put_raw_key_called)
+ return d
+
+ def testAddDecryptedHeader(self):
+ class DummyMsg():
+
+ def __init__(self):
+ self.headers = {}
+
+ def add_header(self, k, v):
+ self.headers[k] = v
+
+ msg = DummyMsg()
+ self.fetcher._add_decrypted_header(msg)
+
+ self.assertEquals(msg.headers['X-Leap-Encryption'], 'decrypted')
+
+ def testDecryptEmail(self):
+
+ self.fetcher._decryption_error = Mock()
+ self.fetcher._add_decrypted_header = Mock()
+
+ def create_encrypted_message(encstr):
+ message = Parser().parsestr(self.EMAIL)
+ newmsg = MultipartEncrypted('application/pgp-encrypted')
+ for hkey, hval in message.items():
+ newmsg.add_header(hkey, hval)
+
+ encmsg = MIMEApplication(
+ encstr, _subtype='octet-stream', _encoder=lambda x: x)
+ encmsg.add_header('content-disposition', 'attachment',
+ filename='msg.asc')
+ # create meta message
+ metamsg = PGPEncrypted()
+ metamsg.add_header('Content-Disposition', 'attachment')
+ # attach pgp message parts to new message
+ newmsg.attach(metamsg)
+ newmsg.attach(encmsg)
+ return newmsg
+
+ def decryption_error_not_called(_):
+ self.assertFalse(self.fetcher._decryption_error.called,
+ "There was some errors with decryption")
+
+ def add_decrypted_header_called(_):
+ self.assertTrue(self.fetcher._add_decrypted_header.called,
+ "There was some errors with decryption")
+
+ d = self.km.encrypt(self.EMAIL, ADDRESS, sign=ADDRESS_2)
+ d.addCallback(create_encrypted_message)
+ d.addCallback(
+ lambda message:
+ self._do_fetch(message.as_string()))
+ d.addCallback(decryption_error_not_called)
+ d.addCallback(add_decrypted_header_called)
+ return d
+
+ def testValidateSignatureFromEncryptedEmailFromAppleMail(self):
+ enc_signed_file = os.path.join(
+ HERE, 'rfc822.multi-encrypt-signed.message')
+ self.fetcher._add_verified_signature_header = Mock()
+
+ def add_verified_signature_header_called(_):
+ self.assertTrue(self.fetcher._add_verified_signature_header.called,
+ "There was some errors verifying signature")
+
+ with open(enc_signed_file) as f:
+ enc_signed_raw = f.read()
+
+ d = self._do_fetch(enc_signed_raw)
+ d.addCallback(add_verified_signature_header_called)
+ return d
+
+ def testListener(self):
+ self.called = False
+
+ def listener(uid):
+ self.called = True
+
+ def listener_called(_):
+ self.assertTrue(self.called)
+
+ self.fetcher.add_listener(listener)
+ d = self._do_fetch(self.EMAIL)
+ d.addCallback(listener_called)
+ return d
+
+ def _do_fetch(self, message):
+ d = self._create_incoming_email(message)
+ d.addCallback(
+ lambda email:
+ self._mock_soledad_get_from_index(fields.JUST_MAIL_IDX, [email]))
+ d.addCallback(lambda _: self.fetcher.fetch())
+ return d
+
+ def _create_incoming_email(self, email_str):
+ email = SoledadDocument()
+ data = json.dumps(
+ {"incoming": True, "content": email_str},
+ ensure_ascii=False)
+
+ def set_email_content(encr_data):
+ email.content = {
+ fields.INCOMING_KEY: True,
+ fields.ERROR_DECRYPTING_KEY: False,
+ ENC_SCHEME_KEY: EncryptionSchemes.PUBKEY,
+ ENC_JSON_KEY: encr_data
+ }
+ return email
+ d = self.km.encrypt(data, ADDRESS, fetch_remote=False)
+ d.addCallback(set_email_content)
+ return d
+
+ def _mock_soledad_get_from_index(self, index_name, value):
+ get_from_index = self._soledad.get_from_index
+
+ def soledad_mock(idx_name, *key_values):
+ if index_name == idx_name:
+ return defer.succeed(value)
+ return get_from_index(idx_name, *key_values)
+ self.fetcher._soledad.get_from_index = Mock(side_effect=soledad_mock)