summaryrefslogtreecommitdiff
path: root/tests/integration/mail/incoming
diff options
context:
space:
mode:
Diffstat (limited to 'tests/integration/mail/incoming')
-rw-r--r--tests/integration/mail/incoming/rfc822.multi-encrypt-signed.message61
-rw-r--r--tests/integration/mail/incoming/test_incoming_mail.py390
2 files changed, 451 insertions, 0 deletions
diff --git a/tests/integration/mail/incoming/rfc822.multi-encrypt-signed.message b/tests/integration/mail/incoming/rfc822.multi-encrypt-signed.message
new file mode 100644
index 00000000..98304f24
--- /dev/null
+++ b/tests/integration/mail/incoming/rfc822.multi-encrypt-signed.message
@@ -0,0 +1,61 @@
+Content-Type: multipart/encrypted;
+ boundary="Apple-Mail=_C01A1464-6C43-43BF-8F62-157335B7E25B";
+ protocol="application/pgp-encrypted";
+Subject: Enc signed
+Mime-Version: 1.0 (Mac OS X Mail 9.3 \(3124\))
+From: Leap Test Key <leap@leap.se>
+Date: Tue, 24 May 2016 11:47:24 -0300
+Content-Description: OpenPGP encrypted message
+To: leap@leap.se
+
+This is an OpenPGP/MIME encrypted message (RFC 2440 and 3156)
+--Apple-Mail=_C01A1464-6C43-43BF-8F62-157335B7E25B
+Content-Type: application/pgp-encrypted
+Content-Description: PGP/MIME Versions Identification
+
+--Apple-Mail=_C01A1464-6C43-43BF-8F62-157335B7E25B
+Content-Disposition: inline;
+ filename=encrypted.asc
+Content-Type: application/octet-stream;
+ name=encrypted.asc
+Content-Description: OpenPGP encrypted message
+
+-----BEGIN PGP MESSAGE-----
+Version: GnuPG v2
+
+hQIMAyj9aG/xtZOwAQ/9Gft0KmOpgzL6z4wmVlLm2aeAvHolXmxWb7N/ByL/dZ4n
+YZd/GPRj42X3BwUrDEL5aO3Mcp+rqq8ACh9hsZXiau0Q9cs1K7Gr55Y06qLrIjom
+2fLqwLFBxCL2sAX1dvClgStyfsRFk9Y/+5tX+IjWaD8dAoRdxCO8IbUDuYGnaKld
+bB9h0NMfKVddCAvuQvX1Zc1Nx0Yb3Hd+ocDD7i9BVgX1BBiGu4/ElS3d32TAVCFs
+Na3tjitWB2G472CYu1O6exY7h1F5V4FHfXH6iMRJSYnvV2Jr+oPZENzNdEEA5H/H
+fUbpWrpKzPafjho9S5rJBBM/tqtmBQFBIdgFVcBVb+bXO6DJ8SMTLiiGcVUvvm1b
+9N2VQIhsxtZ8DpcHHSqFVgT2Gt4UkSrEleSoReg36TzS1s8Uw0oU068PwTe3K0Gx
+2pLMdT9NA6X/t7movpXP6tih1l6P5z62dxFl6W12J9OcegISCt0Q7gex1gk/a8zM
+rzBJC3mVxRiFlvHPBgD6oUKarnTJPQx5f5dFXg8DXBWR1Eh/aFjPQIzhZBYpmOi8
+HqgjcAA+WhMQ7v5c0enJoJJS+8Xfai/MK2vTUGsfAT6HqHLw1HSIn6XQGEf4sQ/U
+NfLeFHHbe9rTk8QhyjrSl2vvek2H4EBQVLF08/FUrAfPELUttOFtysQfC3+M0+PS
+6QGyeIlUjKpBJG7HBd4ibuKMQ5vnA+ACsg/TySYeCO6P85xsN+Lmqlr8cAICn/hR
+ezFSzlibaIelRgfDEDJdjVyCsa7qBMjhRCvGYBdkyTzIRq53qwD9pkhrQ6nwWQrv
+bBzyLrl+NVR8CTEOwbeFLI6qf68kblojk3lwo3Qi3psmeMJdiaV9uevsHrgmEFTH
+lZ3rFECPWzmrkMSfVjWu5d8jJqMcqa4lnGzFQKaB76I8BzGhCWrnuvHPB9c9SVhI
+AnAwNw3gY5xgsbXMxZhnPgYeBSViPkQkgRCWl8Jz41eiAJ3Gtj8QSSFWGHpX+MgP
+ohBaPHz6Fnkhz7Lok97e2AcuRZrDVKV6i28r8mizI3B2Mah6ZV0Yuv0EYNtzBv/v
+yV3nu4DWuOOU0301CXBayxJGX0h07z1Ycv7jWD6LNiBXa1vahtbU4WSYNkF0OJaz
+nf8O3CZy5twMq5kQYoPacdNNLregAmWquvE1nxqWbtHFMjtXitP7czxzUTU/DE+C
+jr+irDoYEregEKg9xov91UCRPZgxL+TML71+tSYOMO3JG6lbGw77PQ8s2So7xore
+8+FeDFPaaJqh6uhF5LETRSx8x/haZiXLd+WtO7wF8S3+Vz7AJIFIe8MUadZrYwnH
+wfMAktQKbep3iHCeZ5jHYA461AOhnCca2y+GoyHZUDDFwS1pC1RN4lMkafSE1AgH
+cmEcjLYsw1gqT0+DfqrvjbXmMjGgkgnkMybJH7df5TKu36Q0Nqvcbc2XLFkalr5V
+Vk0SScqKYnKL+cJjabqA8rKkeAh22E2FBCpKPqxSS3te2bRb3XBX26bP0LshkJuy
+GPu6LKvwmUn0obPKCnLJvb9ImIGZToXu6Fb/Cd2c3DG1IK5PptQz4f7ZRW98huPO
+2w59Bswwt5q4lQqsMEzVRnIDH45MmnhEUeS4NaxqLTO7eJpMpb4VxT2u/Ac3XWKp
+o2RE6CbqTyJ+n8tY9OwBRMKzdVd9RFAMqMHTzWTAuU4BgW2vT2sHYZdAsX8sktBr
+5mo9P3MqvgdPNpg8+AOB03JlIv0dzrAFWCZxxLLGIIIz0eXsjghHzQ9QjGfr0xFH
+Z79AKDjsoRisWyWCnadS2oM9fdAg4T/h1STnfxc44o7N1+ym7u58ODICFi+Kg8IR
+JBHIp3CK02JLTLd/WFhUVyWgc6l8gn+oBK+r7Dw+FTWhqX2/ZHCO8qKK1ZK3NIMn
+MBcSVvHSnTPtppb+oND5nk38xazVVHnwxNHaIh7g3NxDB4hl5rBhrWsgTNuqDDRU
+w7ufvMYr1AOV+8e92cHCEKPM19nFKEgaBFECEptEObesGI3QZPAESlojzQ3cDeBa
+=tEyc
+-----END PGP MESSAGE-----
+
+--Apple-Mail=_C01A1464-6C43-43BF-8F62-157335B7E25B-- \ No newline at end of file
diff --git a/tests/integration/mail/incoming/test_incoming_mail.py b/tests/integration/mail/incoming/test_incoming_mail.py
new file mode 100644
index 00000000..b8d69f12
--- /dev/null
+++ b/tests/integration/mail/incoming/test_incoming_mail.py
@@ -0,0 +1,390 @@
+# -*- coding: utf-8 -*-
+# test_incoming_mail.py
+# Copyright (C) 2015-2016 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+"""
+Test case for leap.bitmask.mail.incoming.service
+
+@authors: Ruben Pollan, <meskio@sindominio.net>
+
+@license: GPLv3, see included LICENSE file
+"""
+import json
+import os
+import tempfile
+import uuid
+
+from email.mime.application import MIMEApplication
+from email.mime.multipart import MIMEMultipart
+from email.parser import Parser
+from mock import Mock
+
+from twisted.internet import defer
+from twisted.python import log
+
+from leap.bitmask.keymanager.errors import KeyAddressMismatch
+from leap.bitmask.mail.adaptors import soledad_indexes as fields
+from leap.bitmask.mail.adaptors.soledad import cleanup_deferred_locks
+from leap.bitmask.mail.adaptors.soledad import SoledadMailAdaptor
+from leap.bitmask.mail.mail import MessageCollection
+from leap.bitmask.mail.mailbox_indexer import MailboxIndexer
+
+from leap.bitmask.mail.incoming.service import IncomingMail
+from leap.bitmask.mail.rfc3156 import MultipartEncrypted, PGPEncrypted
+from leap.bitmask.mail.testing import KeyManagerWithSoledadTestCase
+from leap.bitmask.mail.testing import ADDRESS, ADDRESS_2
+from leap.soledad.common.document import SoledadDocument
+from leap.soledad.common.crypto import (
+ EncryptionSchemes,
+ ENC_JSON_KEY,
+ ENC_SCHEME_KEY,
+)
+
+HERE = os.path.split(os.path.abspath(__file__))[0]
+
+# TODO: add some tests for encrypted, unencrypted, signed and unsgined messages
+
+
+class IncomingMailTestCase(KeyManagerWithSoledadTestCase):
+ """
+ Tests for the incoming mail parser
+ """
+ NICKSERVER = "http://domain"
+ BODY = """
+Governments of the Industrial World, you weary giants of flesh and steel, I
+come from Cyberspace, the new home of Mind. On behalf of the future, I ask
+you of the past to leave us alone. You are not welcome among us. You have
+no sovereignty where we gather.
+ """
+ EMAIL = """from: Test from SomeDomain <%(from)s>
+to: %(to)s
+subject: independence of cyberspace
+
+%(body)s
+ """ % {
+ "from": ADDRESS_2,
+ "to": ADDRESS,
+ "body": BODY
+ }
+
+ def setUp(self):
+ cleanup_deferred_locks()
+ try:
+ del self._soledad
+ del self.km
+ except AttributeError:
+ pass
+
+ # pytest handles correctly the setupEnv for the class,
+ # but trial ignores it.
+ if not getattr(self, 'tempdir', None):
+ self.tempdir = tempfile.mkdtemp()
+
+ def getCollection(_):
+ adaptor = SoledadMailAdaptor()
+ store = self._soledad
+ adaptor.store = store
+ mbox_indexer = MailboxIndexer(store)
+ mbox_name = "INBOX"
+ mbox_uuid = str(uuid.uuid4())
+
+ def get_collection_from_mbox_wrapper(wrapper):
+ wrapper.uuid = mbox_uuid
+ return MessageCollection(
+ adaptor, store,
+ mbox_indexer=mbox_indexer, mbox_wrapper=wrapper)
+
+ d = adaptor.initialize_store(store)
+ d.addCallback(lambda _: mbox_indexer.create_table(mbox_uuid))
+ d.addCallback(
+ lambda _: adaptor.get_or_create_mbox(store, mbox_name))
+ d.addCallback(get_collection_from_mbox_wrapper)
+ return d
+
+ def setUpFetcher(inbox_collection):
+ self.fetcher = IncomingMail(
+ self.km,
+ self._soledad,
+ inbox_collection,
+ ADDRESS)
+
+ # The messages don't exist on soledad will fail on deletion
+ self.fetcher._delete_incoming_message = Mock(
+ return_value=defer.succeed(None))
+
+ d = KeyManagerWithSoledadTestCase.setUp(self)
+ d.addCallback(getCollection)
+ d.addCallback(setUpFetcher)
+ d.addErrback(log.err)
+ return d
+
+ def tearDown(self):
+ d = KeyManagerWithSoledadTestCase.tearDown(self)
+ return d
+
+ def testExtractOpenPGPHeader(self):
+ """
+ Test the OpenPGP header key extraction
+ """
+ KEYURL = "https://leap.se/key.txt"
+ OpenPGP = "id=12345678; url=\"%s\"; preference=signencrypt" % (KEYURL,)
+
+ message = Parser().parsestr(self.EMAIL)
+ message.add_header("OpenPGP", OpenPGP)
+ self.fetcher._keymanager.fetch_key = Mock(
+ return_value=defer.succeed(None))
+
+ def fetch_key_called(ret):
+ self.fetcher._keymanager.fetch_key.assert_called_once_with(
+ ADDRESS_2, KEYURL)
+
+ d = self._create_incoming_email(message.as_string())
+ d.addCallback(
+ lambda email:
+ self._mock_soledad_get_from_index(fields.JUST_MAIL_IDX, [email]))
+ d.addCallback(lambda _: self.fetcher.fetch())
+ d.addCallback(fetch_key_called)
+ return d
+
+ def testExtractOpenPGPHeaderInvalidUrl(self):
+ """
+ Test the OpenPGP header key extraction
+ """
+ KEYURL = "https://someotherdomain.com/key.txt"
+ OpenPGP = "id=12345678; url=\"%s\"; preference=signencrypt" % (KEYURL,)
+
+ message = Parser().parsestr(self.EMAIL)
+ message.add_header("OpenPGP", OpenPGP)
+ self.fetcher._keymanager.fetch_key = Mock()
+
+ def fetch_key_called(ret):
+ self.assertFalse(self.fetcher._keymanager.fetch_key.called)
+
+ d = self._create_incoming_email(message.as_string())
+ d.addCallback(
+ lambda email:
+ self._mock_soledad_get_from_index(fields.JUST_MAIL_IDX, [email]))
+ d.addCallback(lambda _: self.fetcher.fetch())
+ d.addCallback(fetch_key_called)
+ return d
+
+ def testExtractAttachedKey(self):
+ KEY = "-----BEGIN PGP PUBLIC KEY BLOCK-----\n..."
+
+ message = MIMEMultipart()
+ message.add_header("from", ADDRESS_2)
+ key = MIMEApplication("", "pgp-keys")
+ key.set_payload(KEY)
+ message.attach(key)
+ self.fetcher._keymanager.put_raw_key = Mock(
+ return_value=defer.succeed(None))
+
+ def put_raw_key_called(_):
+ self.fetcher._keymanager.put_raw_key.assert_called_once_with(
+ KEY, address=ADDRESS_2)
+
+ d = self._do_fetch(message.as_string())
+ d.addCallback(put_raw_key_called)
+ return d
+
+ def testExtractInvalidAttachedKey(self):
+ KEY = "-----BEGIN PGP PUBLIC KEY BLOCK-----\n..."
+
+ message = MIMEMultipart()
+ message.add_header("from", ADDRESS_2)
+ key = MIMEApplication("", "pgp-keys")
+ key.set_payload(KEY)
+ message.attach(key)
+ self.fetcher._keymanager.put_raw_key = Mock(
+ return_value=defer.fail(KeyAddressMismatch()))
+
+ def put_raw_key_called(_):
+ self.fetcher._keymanager.put_raw_key.assert_called_once_with(
+ KEY, address=ADDRESS_2)
+
+ d = self._do_fetch(message.as_string())
+ d.addCallback(put_raw_key_called)
+ d.addErrback(log.err)
+ return d
+
+ def testExtractAttachedKeyAndNotOpenPGPHeader(self):
+ KEY = "-----BEGIN PGP PUBLIC KEY BLOCK-----\n..."
+ KEYURL = "https://leap.se/key.txt"
+ OpenPGP = "id=12345678; url=\"%s\"; preference=signencrypt" % (KEYURL,)
+
+ message = MIMEMultipart()
+ message.add_header("from", ADDRESS_2)
+ message.add_header("OpenPGP", OpenPGP)
+ key = MIMEApplication("", "pgp-keys")
+ key.set_payload(KEY)
+ message.attach(key)
+
+ self.fetcher._keymanager.put_raw_key = Mock(
+ return_value=defer.succeed(None))
+ self.fetcher._keymanager.fetch_key = Mock()
+
+ def put_raw_key_called(_):
+ self.fetcher._keymanager.put_raw_key.assert_called_once_with(
+ KEY, address=ADDRESS_2)
+ self.assertFalse(self.fetcher._keymanager.fetch_key.called)
+
+ d = self._do_fetch(message.as_string())
+ d.addCallback(put_raw_key_called)
+ return d
+
+ def testExtractOpenPGPHeaderIfInvalidAttachedKey(self):
+ KEY = "-----BEGIN PGP PUBLIC KEY BLOCK-----\n..."
+ KEYURL = "https://leap.se/key.txt"
+ OpenPGP = "id=12345678; url=\"%s\"; preference=signencrypt" % (KEYURL,)
+
+ message = MIMEMultipart()
+ message.add_header("from", ADDRESS_2)
+ message.add_header("OpenPGP", OpenPGP)
+ key = MIMEApplication("", "pgp-keys")
+ key.set_payload(KEY)
+ message.attach(key)
+
+ self.fetcher._keymanager.put_raw_key = Mock(
+ return_value=defer.fail(KeyAddressMismatch()))
+ self.fetcher._keymanager.fetch_key = Mock()
+
+ def put_raw_key_called(_):
+ self.fetcher._keymanager.put_raw_key.assert_called_once_with(
+ KEY, address=ADDRESS_2)
+ self.fetcher._keymanager.fetch_key.assert_called_once_with(
+ ADDRESS_2, KEYURL)
+
+ d = self._do_fetch(message.as_string())
+ d.addCallback(put_raw_key_called)
+ return d
+
+ def testAddDecryptedHeader(self):
+ class DummyMsg():
+
+ def __init__(self):
+ self.headers = {}
+
+ def add_header(self, k, v):
+ self.headers[k] = v
+
+ msg = DummyMsg()
+ self.fetcher._add_decrypted_header(msg)
+
+ self.assertEquals(msg.headers['X-Leap-Encryption'], 'decrypted')
+
+ def testDecryptEmail(self):
+
+ self.fetcher._decryption_error = Mock()
+ self.fetcher._add_decrypted_header = Mock()
+
+ def create_encrypted_message(encstr):
+ message = Parser().parsestr(self.EMAIL)
+ newmsg = MultipartEncrypted('application/pgp-encrypted')
+ for hkey, hval in message.items():
+ newmsg.add_header(hkey, hval)
+
+ encmsg = MIMEApplication(
+ encstr, _subtype='octet-stream', _encoder=lambda x: x)
+ encmsg.add_header('content-disposition', 'attachment',
+ filename='msg.asc')
+ # create meta message
+ metamsg = PGPEncrypted()
+ metamsg.add_header('Content-Disposition', 'attachment')
+ # attach pgp message parts to new message
+ newmsg.attach(metamsg)
+ newmsg.attach(encmsg)
+ return newmsg
+
+ def decryption_error_not_called(_):
+ self.assertFalse(self.fetcher._decryption_error.called,
+ "There was some errors with decryption")
+
+ def add_decrypted_header_called(_):
+ self.assertTrue(self.fetcher._add_decrypted_header.called,
+ "There was some errors with decryption")
+
+ d = self.km.encrypt(self.EMAIL, ADDRESS, sign=ADDRESS_2)
+ d.addCallback(create_encrypted_message)
+ d.addCallback(
+ lambda message:
+ self._do_fetch(message.as_string()))
+ d.addCallback(decryption_error_not_called)
+ d.addCallback(add_decrypted_header_called)
+ return d
+
+ def testValidateSignatureFromEncryptedEmailFromAppleMail(self):
+ enc_signed_file = os.path.join(
+ HERE, 'rfc822.multi-encrypt-signed.message')
+ self.fetcher._add_verified_signature_header = Mock()
+
+ def add_verified_signature_header_called(_):
+ self.assertTrue(self.fetcher._add_verified_signature_header.called,
+ "There was some errors verifying signature")
+
+ with open(enc_signed_file) as f:
+ enc_signed_raw = f.read()
+
+ d = self._do_fetch(enc_signed_raw)
+ d.addCallback(add_verified_signature_header_called)
+ return d
+
+ def testListener(self):
+ self.called = False
+
+ def listener(uid):
+ self.called = True
+
+ def listener_called(_):
+ self.assertTrue(self.called)
+
+ self.fetcher.add_listener(listener)
+ d = self._do_fetch(self.EMAIL)
+ d.addCallback(listener_called)
+ return d
+
+ def _do_fetch(self, message):
+ d = self._create_incoming_email(message)
+ d.addCallback(
+ lambda email:
+ self._mock_soledad_get_from_index(fields.JUST_MAIL_IDX, [email]))
+ d.addCallback(lambda _: self.fetcher.fetch())
+ return d
+
+ def _create_incoming_email(self, email_str):
+ email = SoledadDocument()
+ data = json.dumps(
+ {"incoming": True, "content": email_str},
+ ensure_ascii=False)
+
+ def set_email_content(encr_data):
+ email.content = {
+ fields.INCOMING_KEY: True,
+ fields.ERROR_DECRYPTING_KEY: False,
+ ENC_SCHEME_KEY: EncryptionSchemes.PUBKEY,
+ ENC_JSON_KEY: encr_data
+ }
+ return email
+ d = self.km.encrypt(data, ADDRESS, fetch_remote=False)
+ d.addCallback(set_email_content)
+ return d
+
+ def _mock_soledad_get_from_index(self, index_name, value):
+ get_from_index = self._soledad.get_from_index
+
+ def soledad_mock(idx_name, *key_values):
+ if index_name == idx_name:
+ return defer.succeed(value)
+ return get_from_index(idx_name, *key_values)
+ self.fetcher._soledad.get_from_index = Mock(side_effect=soledad_mock)