diff options
Diffstat (limited to 'src/leap/bitmask/mail/incoming')
-rw-r--r-- | src/leap/bitmask/mail/incoming/tests/rfc822.multi-encrypt-signed.message | 61 | ||||
-rw-r--r-- | src/leap/bitmask/mail/incoming/tests/test_incoming_mail.py | 391 |
2 files changed, 0 insertions, 452 deletions
diff --git a/src/leap/bitmask/mail/incoming/tests/rfc822.multi-encrypt-signed.message b/src/leap/bitmask/mail/incoming/tests/rfc822.multi-encrypt-signed.message deleted file mode 100644 index 98304f2..0000000 --- a/src/leap/bitmask/mail/incoming/tests/rfc822.multi-encrypt-signed.message +++ /dev/null @@ -1,61 +0,0 @@ -Content-Type: multipart/encrypted; - boundary="Apple-Mail=_C01A1464-6C43-43BF-8F62-157335B7E25B"; - protocol="application/pgp-encrypted"; -Subject: Enc signed -Mime-Version: 1.0 (Mac OS X Mail 9.3 \(3124\)) -From: Leap Test Key <leap@leap.se> -Date: Tue, 24 May 2016 11:47:24 -0300 -Content-Description: OpenPGP encrypted message -To: leap@leap.se - -This is an OpenPGP/MIME encrypted message (RFC 2440 and 3156) ---Apple-Mail=_C01A1464-6C43-43BF-8F62-157335B7E25B -Content-Type: application/pgp-encrypted -Content-Description: PGP/MIME Versions Identification - ---Apple-Mail=_C01A1464-6C43-43BF-8F62-157335B7E25B -Content-Disposition: inline; - filename=encrypted.asc -Content-Type: application/octet-stream; - name=encrypted.asc -Content-Description: OpenPGP encrypted message - ------BEGIN PGP MESSAGE----- -Version: GnuPG v2 - -hQIMAyj9aG/xtZOwAQ/9Gft0KmOpgzL6z4wmVlLm2aeAvHolXmxWb7N/ByL/dZ4n -YZd/GPRj42X3BwUrDEL5aO3Mcp+rqq8ACh9hsZXiau0Q9cs1K7Gr55Y06qLrIjom -2fLqwLFBxCL2sAX1dvClgStyfsRFk9Y/+5tX+IjWaD8dAoRdxCO8IbUDuYGnaKld -bB9h0NMfKVddCAvuQvX1Zc1Nx0Yb3Hd+ocDD7i9BVgX1BBiGu4/ElS3d32TAVCFs -Na3tjitWB2G472CYu1O6exY7h1F5V4FHfXH6iMRJSYnvV2Jr+oPZENzNdEEA5H/H -fUbpWrpKzPafjho9S5rJBBM/tqtmBQFBIdgFVcBVb+bXO6DJ8SMTLiiGcVUvvm1b -9N2VQIhsxtZ8DpcHHSqFVgT2Gt4UkSrEleSoReg36TzS1s8Uw0oU068PwTe3K0Gx -2pLMdT9NA6X/t7movpXP6tih1l6P5z62dxFl6W12J9OcegISCt0Q7gex1gk/a8zM -rzBJC3mVxRiFlvHPBgD6oUKarnTJPQx5f5dFXg8DXBWR1Eh/aFjPQIzhZBYpmOi8 -HqgjcAA+WhMQ7v5c0enJoJJS+8Xfai/MK2vTUGsfAT6HqHLw1HSIn6XQGEf4sQ/U -NfLeFHHbe9rTk8QhyjrSl2vvek2H4EBQVLF08/FUrAfPELUttOFtysQfC3+M0+PS -6QGyeIlUjKpBJG7HBd4ibuKMQ5vnA+ACsg/TySYeCO6P85xsN+Lmqlr8cAICn/hR -ezFSzlibaIelRgfDEDJdjVyCsa7qBMjhRCvGYBdkyTzIRq53qwD9pkhrQ6nwWQrv -bBzyLrl+NVR8CTEOwbeFLI6qf68kblojk3lwo3Qi3psmeMJdiaV9uevsHrgmEFTH -lZ3rFECPWzmrkMSfVjWu5d8jJqMcqa4lnGzFQKaB76I8BzGhCWrnuvHPB9c9SVhI -AnAwNw3gY5xgsbXMxZhnPgYeBSViPkQkgRCWl8Jz41eiAJ3Gtj8QSSFWGHpX+MgP -ohBaPHz6Fnkhz7Lok97e2AcuRZrDVKV6i28r8mizI3B2Mah6ZV0Yuv0EYNtzBv/v -yV3nu4DWuOOU0301CXBayxJGX0h07z1Ycv7jWD6LNiBXa1vahtbU4WSYNkF0OJaz -nf8O3CZy5twMq5kQYoPacdNNLregAmWquvE1nxqWbtHFMjtXitP7czxzUTU/DE+C -jr+irDoYEregEKg9xov91UCRPZgxL+TML71+tSYOMO3JG6lbGw77PQ8s2So7xore -8+FeDFPaaJqh6uhF5LETRSx8x/haZiXLd+WtO7wF8S3+Vz7AJIFIe8MUadZrYwnH -wfMAktQKbep3iHCeZ5jHYA461AOhnCca2y+GoyHZUDDFwS1pC1RN4lMkafSE1AgH -cmEcjLYsw1gqT0+DfqrvjbXmMjGgkgnkMybJH7df5TKu36Q0Nqvcbc2XLFkalr5V -Vk0SScqKYnKL+cJjabqA8rKkeAh22E2FBCpKPqxSS3te2bRb3XBX26bP0LshkJuy -GPu6LKvwmUn0obPKCnLJvb9ImIGZToXu6Fb/Cd2c3DG1IK5PptQz4f7ZRW98huPO -2w59Bswwt5q4lQqsMEzVRnIDH45MmnhEUeS4NaxqLTO7eJpMpb4VxT2u/Ac3XWKp -o2RE6CbqTyJ+n8tY9OwBRMKzdVd9RFAMqMHTzWTAuU4BgW2vT2sHYZdAsX8sktBr -5mo9P3MqvgdPNpg8+AOB03JlIv0dzrAFWCZxxLLGIIIz0eXsjghHzQ9QjGfr0xFH -Z79AKDjsoRisWyWCnadS2oM9fdAg4T/h1STnfxc44o7N1+ym7u58ODICFi+Kg8IR -JBHIp3CK02JLTLd/WFhUVyWgc6l8gn+oBK+r7Dw+FTWhqX2/ZHCO8qKK1ZK3NIMn -MBcSVvHSnTPtppb+oND5nk38xazVVHnwxNHaIh7g3NxDB4hl5rBhrWsgTNuqDDRU -w7ufvMYr1AOV+8e92cHCEKPM19nFKEgaBFECEptEObesGI3QZPAESlojzQ3cDeBa -=tEyc ------END PGP MESSAGE----- - ---Apple-Mail=_C01A1464-6C43-43BF-8F62-157335B7E25B--
\ No newline at end of file diff --git a/src/leap/bitmask/mail/incoming/tests/test_incoming_mail.py b/src/leap/bitmask/mail/incoming/tests/test_incoming_mail.py deleted file mode 100644 index 29422ec..0000000 --- a/src/leap/bitmask/mail/incoming/tests/test_incoming_mail.py +++ /dev/null @@ -1,391 +0,0 @@ -# -*- coding: utf-8 -*- -# test_incoming_mail.py -# Copyright (C) 2015 LEAP -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see <http://www.gnu.org/licenses/>. -""" -Test case for leap.mail.incoming.service - -@authors: Ruben Pollan, <meskio@sindominio.net> - -@license: GPLv3, see included LICENSE file -""" - -import json -import os -import tempfile -import uuid - -from email.mime.application import MIMEApplication -from email.mime.multipart import MIMEMultipart -from email.parser import Parser -from mock import Mock - -from twisted.internet import defer -from twisted.python import log - -from leap.keymanager.errors import KeyAddressMismatch -from leap.mail.adaptors import soledad_indexes as fields -from leap.mail.adaptors.soledad import cleanup_deferred_locks -from leap.mail.adaptors.soledad import SoledadMailAdaptor -from leap.mail.mail import MessageCollection -from leap.mail.mailbox_indexer import MailboxIndexer - -from leap.mail.incoming.service import IncomingMail -from leap.mail.rfc3156 import MultipartEncrypted, PGPEncrypted -from leap.mail.testing import KeyManagerWithSoledadTestCase -from leap.mail.testing import ADDRESS, ADDRESS_2 -from leap.soledad.common.document import SoledadDocument -from leap.soledad.common.crypto import ( - EncryptionSchemes, - ENC_JSON_KEY, - ENC_SCHEME_KEY, -) - -HERE = os.path.split(os.path.abspath(__file__))[0] - -# TODO: add some tests for encrypted, unencrypted, signed and unsgined messages - - -class IncomingMailTestCase(KeyManagerWithSoledadTestCase): - """ - Tests for the incoming mail parser - """ - NICKSERVER = "http://domain" - BODY = """ -Governments of the Industrial World, you weary giants of flesh and steel, I -come from Cyberspace, the new home of Mind. On behalf of the future, I ask -you of the past to leave us alone. You are not welcome among us. You have -no sovereignty where we gather. - """ - EMAIL = """from: Test from SomeDomain <%(from)s> -to: %(to)s -subject: independence of cyberspace - -%(body)s - """ % { - "from": ADDRESS_2, - "to": ADDRESS, - "body": BODY - } - - def setUp(self): - cleanup_deferred_locks() - try: - del self._soledad - del self.km - except AttributeError: - pass - - # pytest handles correctly the setupEnv for the class, - # but trial ignores it. - if not getattr(self, 'tempdir', None): - self.tempdir = tempfile.mkdtemp() - - def getCollection(_): - adaptor = SoledadMailAdaptor() - store = self._soledad - adaptor.store = store - mbox_indexer = MailboxIndexer(store) - mbox_name = "INBOX" - mbox_uuid = str(uuid.uuid4()) - - def get_collection_from_mbox_wrapper(wrapper): - wrapper.uuid = mbox_uuid - return MessageCollection( - adaptor, store, - mbox_indexer=mbox_indexer, mbox_wrapper=wrapper) - - d = adaptor.initialize_store(store) - d.addCallback(lambda _: mbox_indexer.create_table(mbox_uuid)) - d.addCallback( - lambda _: adaptor.get_or_create_mbox(store, mbox_name)) - d.addCallback(get_collection_from_mbox_wrapper) - return d - - def setUpFetcher(inbox_collection): - self.fetcher = IncomingMail( - self.km, - self._soledad, - inbox_collection, - ADDRESS) - - # The messages don't exist on soledad will fail on deletion - self.fetcher._delete_incoming_message = Mock( - return_value=defer.succeed(None)) - - d = KeyManagerWithSoledadTestCase.setUp(self) - d.addCallback(getCollection) - d.addCallback(setUpFetcher) - d.addErrback(log.err) - return d - - def tearDown(self): - d = KeyManagerWithSoledadTestCase.tearDown(self) - return d - - def testExtractOpenPGPHeader(self): - """ - Test the OpenPGP header key extraction - """ - KEYURL = "https://leap.se/key.txt" - OpenPGP = "id=12345678; url=\"%s\"; preference=signencrypt" % (KEYURL,) - - message = Parser().parsestr(self.EMAIL) - message.add_header("OpenPGP", OpenPGP) - self.fetcher._keymanager.fetch_key = Mock( - return_value=defer.succeed(None)) - - def fetch_key_called(ret): - self.fetcher._keymanager.fetch_key.assert_called_once_with( - ADDRESS_2, KEYURL) - - d = self._create_incoming_email(message.as_string()) - d.addCallback( - lambda email: - self._mock_soledad_get_from_index(fields.JUST_MAIL_IDX, [email])) - d.addCallback(lambda _: self.fetcher.fetch()) - d.addCallback(fetch_key_called) - return d - - def testExtractOpenPGPHeaderInvalidUrl(self): - """ - Test the OpenPGP header key extraction - """ - KEYURL = "https://someotherdomain.com/key.txt" - OpenPGP = "id=12345678; url=\"%s\"; preference=signencrypt" % (KEYURL,) - - message = Parser().parsestr(self.EMAIL) - message.add_header("OpenPGP", OpenPGP) - self.fetcher._keymanager.fetch_key = Mock() - - def fetch_key_called(ret): - self.assertFalse(self.fetcher._keymanager.fetch_key.called) - - d = self._create_incoming_email(message.as_string()) - d.addCallback( - lambda email: - self._mock_soledad_get_from_index(fields.JUST_MAIL_IDX, [email])) - d.addCallback(lambda _: self.fetcher.fetch()) - d.addCallback(fetch_key_called) - return d - - def testExtractAttachedKey(self): - KEY = "-----BEGIN PGP PUBLIC KEY BLOCK-----\n..." - - message = MIMEMultipart() - message.add_header("from", ADDRESS_2) - key = MIMEApplication("", "pgp-keys") - key.set_payload(KEY) - message.attach(key) - self.fetcher._keymanager.put_raw_key = Mock( - return_value=defer.succeed(None)) - - def put_raw_key_called(_): - self.fetcher._keymanager.put_raw_key.assert_called_once_with( - KEY, address=ADDRESS_2) - - d = self._do_fetch(message.as_string()) - d.addCallback(put_raw_key_called) - return d - - def testExtractInvalidAttachedKey(self): - KEY = "-----BEGIN PGP PUBLIC KEY BLOCK-----\n..." - - message = MIMEMultipart() - message.add_header("from", ADDRESS_2) - key = MIMEApplication("", "pgp-keys") - key.set_payload(KEY) - message.attach(key) - self.fetcher._keymanager.put_raw_key = Mock( - return_value=defer.fail(KeyAddressMismatch())) - - def put_raw_key_called(_): - self.fetcher._keymanager.put_raw_key.assert_called_once_with( - KEY, address=ADDRESS_2) - - d = self._do_fetch(message.as_string()) - d.addCallback(put_raw_key_called) - d.addErrback(log.err) - return d - - def testExtractAttachedKeyAndNotOpenPGPHeader(self): - KEY = "-----BEGIN PGP PUBLIC KEY BLOCK-----\n..." - KEYURL = "https://leap.se/key.txt" - OpenPGP = "id=12345678; url=\"%s\"; preference=signencrypt" % (KEYURL,) - - message = MIMEMultipart() - message.add_header("from", ADDRESS_2) - message.add_header("OpenPGP", OpenPGP) - key = MIMEApplication("", "pgp-keys") - key.set_payload(KEY) - message.attach(key) - - self.fetcher._keymanager.put_raw_key = Mock( - return_value=defer.succeed(None)) - self.fetcher._keymanager.fetch_key = Mock() - - def put_raw_key_called(_): - self.fetcher._keymanager.put_raw_key.assert_called_once_with( - KEY, address=ADDRESS_2) - self.assertFalse(self.fetcher._keymanager.fetch_key.called) - - d = self._do_fetch(message.as_string()) - d.addCallback(put_raw_key_called) - return d - - def testExtractOpenPGPHeaderIfInvalidAttachedKey(self): - KEY = "-----BEGIN PGP PUBLIC KEY BLOCK-----\n..." - KEYURL = "https://leap.se/key.txt" - OpenPGP = "id=12345678; url=\"%s\"; preference=signencrypt" % (KEYURL,) - - message = MIMEMultipart() - message.add_header("from", ADDRESS_2) - message.add_header("OpenPGP", OpenPGP) - key = MIMEApplication("", "pgp-keys") - key.set_payload(KEY) - message.attach(key) - - self.fetcher._keymanager.put_raw_key = Mock( - return_value=defer.fail(KeyAddressMismatch())) - self.fetcher._keymanager.fetch_key = Mock() - - def put_raw_key_called(_): - self.fetcher._keymanager.put_raw_key.assert_called_once_with( - KEY, address=ADDRESS_2) - self.fetcher._keymanager.fetch_key.assert_called_once_with( - ADDRESS_2, KEYURL) - - d = self._do_fetch(message.as_string()) - d.addCallback(put_raw_key_called) - return d - - def testAddDecryptedHeader(self): - class DummyMsg(): - - def __init__(self): - self.headers = {} - - def add_header(self, k, v): - self.headers[k] = v - - msg = DummyMsg() - self.fetcher._add_decrypted_header(msg) - - self.assertEquals(msg.headers['X-Leap-Encryption'], 'decrypted') - - def testDecryptEmail(self): - - self.fetcher._decryption_error = Mock() - self.fetcher._add_decrypted_header = Mock() - - def create_encrypted_message(encstr): - message = Parser().parsestr(self.EMAIL) - newmsg = MultipartEncrypted('application/pgp-encrypted') - for hkey, hval in message.items(): - newmsg.add_header(hkey, hval) - - encmsg = MIMEApplication( - encstr, _subtype='octet-stream', _encoder=lambda x: x) - encmsg.add_header('content-disposition', 'attachment', - filename='msg.asc') - # create meta message - metamsg = PGPEncrypted() - metamsg.add_header('Content-Disposition', 'attachment') - # attach pgp message parts to new message - newmsg.attach(metamsg) - newmsg.attach(encmsg) - return newmsg - - def decryption_error_not_called(_): - self.assertFalse(self.fetcher._decryption_error.called, - "There was some errors with decryption") - - def add_decrypted_header_called(_): - self.assertTrue(self.fetcher._add_decrypted_header.called, - "There was some errors with decryption") - - d = self.km.encrypt(self.EMAIL, ADDRESS, sign=ADDRESS_2) - d.addCallback(create_encrypted_message) - d.addCallback( - lambda message: - self._do_fetch(message.as_string())) - d.addCallback(decryption_error_not_called) - d.addCallback(add_decrypted_header_called) - return d - - def testValidateSignatureFromEncryptedEmailFromAppleMail(self): - enc_signed_file = os.path.join( - HERE, 'rfc822.multi-encrypt-signed.message') - self.fetcher._add_verified_signature_header = Mock() - - def add_verified_signature_header_called(_): - self.assertTrue(self.fetcher._add_verified_signature_header.called, - "There was some errors verifying signature") - - with open(enc_signed_file) as f: - enc_signed_raw = f.read() - - d = self._do_fetch(enc_signed_raw) - d.addCallback(add_verified_signature_header_called) - return d - - def testListener(self): - self.called = False - - def listener(uid): - self.called = True - - def listener_called(_): - self.assertTrue(self.called) - - self.fetcher.add_listener(listener) - d = self._do_fetch(self.EMAIL) - d.addCallback(listener_called) - return d - - def _do_fetch(self, message): - d = self._create_incoming_email(message) - d.addCallback( - lambda email: - self._mock_soledad_get_from_index(fields.JUST_MAIL_IDX, [email])) - d.addCallback(lambda _: self.fetcher.fetch()) - return d - - def _create_incoming_email(self, email_str): - email = SoledadDocument() - data = json.dumps( - {"incoming": True, "content": email_str}, - ensure_ascii=False) - - def set_email_content(encr_data): - email.content = { - fields.INCOMING_KEY: True, - fields.ERROR_DECRYPTING_KEY: False, - ENC_SCHEME_KEY: EncryptionSchemes.PUBKEY, - ENC_JSON_KEY: encr_data - } - return email - d = self.km.encrypt(data, ADDRESS, fetch_remote=False) - d.addCallback(set_email_content) - return d - - def _mock_soledad_get_from_index(self, index_name, value): - get_from_index = self._soledad.get_from_index - - def soledad_mock(idx_name, *key_values): - if index_name == idx_name: - return defer.succeed(value) - return get_from_index(idx_name, *key_values) - self.fetcher._soledad.get_from_index = Mock(side_effect=soledad_mock) |