From 5a3a2012bb8982ad0884ed659e61e969345e6fde Mon Sep 17 00:00:00 2001 From: "Kali Kaneko (leap communications)" Date: Mon, 29 Aug 2016 23:10:17 -0400 Subject: [pkg] move mail source to leap.bitmask.mail --- .../tests/rfc822.multi-encrypt-signed.message | 61 ++++ .../mail/incoming/tests/test_incoming_mail.py | 391 +++++++++++++++++++++ 2 files changed, 452 insertions(+) create mode 100644 src/leap/bitmask/mail/incoming/tests/rfc822.multi-encrypt-signed.message create mode 100644 src/leap/bitmask/mail/incoming/tests/test_incoming_mail.py (limited to 'src/leap/bitmask/mail/incoming/tests') diff --git a/src/leap/bitmask/mail/incoming/tests/rfc822.multi-encrypt-signed.message b/src/leap/bitmask/mail/incoming/tests/rfc822.multi-encrypt-signed.message new file mode 100644 index 0000000..98304f2 --- /dev/null +++ b/src/leap/bitmask/mail/incoming/tests/rfc822.multi-encrypt-signed.message @@ -0,0 +1,61 @@ +Content-Type: multipart/encrypted; + boundary="Apple-Mail=_C01A1464-6C43-43BF-8F62-157335B7E25B"; + protocol="application/pgp-encrypted"; +Subject: Enc signed +Mime-Version: 1.0 (Mac OS X Mail 9.3 \(3124\)) +From: Leap Test Key +Date: Tue, 24 May 2016 11:47:24 -0300 +Content-Description: OpenPGP encrypted message +To: leap@leap.se + +This is an OpenPGP/MIME encrypted message (RFC 2440 and 3156) +--Apple-Mail=_C01A1464-6C43-43BF-8F62-157335B7E25B +Content-Type: application/pgp-encrypted +Content-Description: PGP/MIME Versions Identification + +--Apple-Mail=_C01A1464-6C43-43BF-8F62-157335B7E25B +Content-Disposition: inline; + filename=encrypted.asc +Content-Type: application/octet-stream; + name=encrypted.asc +Content-Description: OpenPGP encrypted message + +-----BEGIN PGP MESSAGE----- +Version: GnuPG v2 + +hQIMAyj9aG/xtZOwAQ/9Gft0KmOpgzL6z4wmVlLm2aeAvHolXmxWb7N/ByL/dZ4n +YZd/GPRj42X3BwUrDEL5aO3Mcp+rqq8ACh9hsZXiau0Q9cs1K7Gr55Y06qLrIjom +2fLqwLFBxCL2sAX1dvClgStyfsRFk9Y/+5tX+IjWaD8dAoRdxCO8IbUDuYGnaKld +bB9h0NMfKVddCAvuQvX1Zc1Nx0Yb3Hd+ocDD7i9BVgX1BBiGu4/ElS3d32TAVCFs +Na3tjitWB2G472CYu1O6exY7h1F5V4FHfXH6iMRJSYnvV2Jr+oPZENzNdEEA5H/H +fUbpWrpKzPafjho9S5rJBBM/tqtmBQFBIdgFVcBVb+bXO6DJ8SMTLiiGcVUvvm1b +9N2VQIhsxtZ8DpcHHSqFVgT2Gt4UkSrEleSoReg36TzS1s8Uw0oU068PwTe3K0Gx +2pLMdT9NA6X/t7movpXP6tih1l6P5z62dxFl6W12J9OcegISCt0Q7gex1gk/a8zM +rzBJC3mVxRiFlvHPBgD6oUKarnTJPQx5f5dFXg8DXBWR1Eh/aFjPQIzhZBYpmOi8 +HqgjcAA+WhMQ7v5c0enJoJJS+8Xfai/MK2vTUGsfAT6HqHLw1HSIn6XQGEf4sQ/U +NfLeFHHbe9rTk8QhyjrSl2vvek2H4EBQVLF08/FUrAfPELUttOFtysQfC3+M0+PS +6QGyeIlUjKpBJG7HBd4ibuKMQ5vnA+ACsg/TySYeCO6P85xsN+Lmqlr8cAICn/hR +ezFSzlibaIelRgfDEDJdjVyCsa7qBMjhRCvGYBdkyTzIRq53qwD9pkhrQ6nwWQrv +bBzyLrl+NVR8CTEOwbeFLI6qf68kblojk3lwo3Qi3psmeMJdiaV9uevsHrgmEFTH +lZ3rFECPWzmrkMSfVjWu5d8jJqMcqa4lnGzFQKaB76I8BzGhCWrnuvHPB9c9SVhI +AnAwNw3gY5xgsbXMxZhnPgYeBSViPkQkgRCWl8Jz41eiAJ3Gtj8QSSFWGHpX+MgP +ohBaPHz6Fnkhz7Lok97e2AcuRZrDVKV6i28r8mizI3B2Mah6ZV0Yuv0EYNtzBv/v +yV3nu4DWuOOU0301CXBayxJGX0h07z1Ycv7jWD6LNiBXa1vahtbU4WSYNkF0OJaz +nf8O3CZy5twMq5kQYoPacdNNLregAmWquvE1nxqWbtHFMjtXitP7czxzUTU/DE+C +jr+irDoYEregEKg9xov91UCRPZgxL+TML71+tSYOMO3JG6lbGw77PQ8s2So7xore +8+FeDFPaaJqh6uhF5LETRSx8x/haZiXLd+WtO7wF8S3+Vz7AJIFIe8MUadZrYwnH +wfMAktQKbep3iHCeZ5jHYA461AOhnCca2y+GoyHZUDDFwS1pC1RN4lMkafSE1AgH +cmEcjLYsw1gqT0+DfqrvjbXmMjGgkgnkMybJH7df5TKu36Q0Nqvcbc2XLFkalr5V +Vk0SScqKYnKL+cJjabqA8rKkeAh22E2FBCpKPqxSS3te2bRb3XBX26bP0LshkJuy +GPu6LKvwmUn0obPKCnLJvb9ImIGZToXu6Fb/Cd2c3DG1IK5PptQz4f7ZRW98huPO +2w59Bswwt5q4lQqsMEzVRnIDH45MmnhEUeS4NaxqLTO7eJpMpb4VxT2u/Ac3XWKp +o2RE6CbqTyJ+n8tY9OwBRMKzdVd9RFAMqMHTzWTAuU4BgW2vT2sHYZdAsX8sktBr +5mo9P3MqvgdPNpg8+AOB03JlIv0dzrAFWCZxxLLGIIIz0eXsjghHzQ9QjGfr0xFH +Z79AKDjsoRisWyWCnadS2oM9fdAg4T/h1STnfxc44o7N1+ym7u58ODICFi+Kg8IR +JBHIp3CK02JLTLd/WFhUVyWgc6l8gn+oBK+r7Dw+FTWhqX2/ZHCO8qKK1ZK3NIMn +MBcSVvHSnTPtppb+oND5nk38xazVVHnwxNHaIh7g3NxDB4hl5rBhrWsgTNuqDDRU +w7ufvMYr1AOV+8e92cHCEKPM19nFKEgaBFECEptEObesGI3QZPAESlojzQ3cDeBa +=tEyc +-----END PGP MESSAGE----- + +--Apple-Mail=_C01A1464-6C43-43BF-8F62-157335B7E25B-- \ No newline at end of file diff --git a/src/leap/bitmask/mail/incoming/tests/test_incoming_mail.py b/src/leap/bitmask/mail/incoming/tests/test_incoming_mail.py new file mode 100644 index 0000000..29422ec --- /dev/null +++ b/src/leap/bitmask/mail/incoming/tests/test_incoming_mail.py @@ -0,0 +1,391 @@ +# -*- coding: utf-8 -*- +# test_incoming_mail.py +# Copyright (C) 2015 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +""" +Test case for leap.mail.incoming.service + +@authors: Ruben Pollan, + +@license: GPLv3, see included LICENSE file +""" + +import json +import os +import tempfile +import uuid + +from email.mime.application import MIMEApplication +from email.mime.multipart import MIMEMultipart +from email.parser import Parser +from mock import Mock + +from twisted.internet import defer +from twisted.python import log + +from leap.keymanager.errors import KeyAddressMismatch +from leap.mail.adaptors import soledad_indexes as fields +from leap.mail.adaptors.soledad import cleanup_deferred_locks +from leap.mail.adaptors.soledad import SoledadMailAdaptor +from leap.mail.mail import MessageCollection +from leap.mail.mailbox_indexer import MailboxIndexer + +from leap.mail.incoming.service import IncomingMail +from leap.mail.rfc3156 import MultipartEncrypted, PGPEncrypted +from leap.mail.testing import KeyManagerWithSoledadTestCase +from leap.mail.testing import ADDRESS, ADDRESS_2 +from leap.soledad.common.document import SoledadDocument +from leap.soledad.common.crypto import ( + EncryptionSchemes, + ENC_JSON_KEY, + ENC_SCHEME_KEY, +) + +HERE = os.path.split(os.path.abspath(__file__))[0] + +# TODO: add some tests for encrypted, unencrypted, signed and unsgined messages + + +class IncomingMailTestCase(KeyManagerWithSoledadTestCase): + """ + Tests for the incoming mail parser + """ + NICKSERVER = "http://domain" + BODY = """ +Governments of the Industrial World, you weary giants of flesh and steel, I +come from Cyberspace, the new home of Mind. On behalf of the future, I ask +you of the past to leave us alone. You are not welcome among us. You have +no sovereignty where we gather. + """ + EMAIL = """from: Test from SomeDomain <%(from)s> +to: %(to)s +subject: independence of cyberspace + +%(body)s + """ % { + "from": ADDRESS_2, + "to": ADDRESS, + "body": BODY + } + + def setUp(self): + cleanup_deferred_locks() + try: + del self._soledad + del self.km + except AttributeError: + pass + + # pytest handles correctly the setupEnv for the class, + # but trial ignores it. + if not getattr(self, 'tempdir', None): + self.tempdir = tempfile.mkdtemp() + + def getCollection(_): + adaptor = SoledadMailAdaptor() + store = self._soledad + adaptor.store = store + mbox_indexer = MailboxIndexer(store) + mbox_name = "INBOX" + mbox_uuid = str(uuid.uuid4()) + + def get_collection_from_mbox_wrapper(wrapper): + wrapper.uuid = mbox_uuid + return MessageCollection( + adaptor, store, + mbox_indexer=mbox_indexer, mbox_wrapper=wrapper) + + d = adaptor.initialize_store(store) + d.addCallback(lambda _: mbox_indexer.create_table(mbox_uuid)) + d.addCallback( + lambda _: adaptor.get_or_create_mbox(store, mbox_name)) + d.addCallback(get_collection_from_mbox_wrapper) + return d + + def setUpFetcher(inbox_collection): + self.fetcher = IncomingMail( + self.km, + self._soledad, + inbox_collection, + ADDRESS) + + # The messages don't exist on soledad will fail on deletion + self.fetcher._delete_incoming_message = Mock( + return_value=defer.succeed(None)) + + d = KeyManagerWithSoledadTestCase.setUp(self) + d.addCallback(getCollection) + d.addCallback(setUpFetcher) + d.addErrback(log.err) + return d + + def tearDown(self): + d = KeyManagerWithSoledadTestCase.tearDown(self) + return d + + def testExtractOpenPGPHeader(self): + """ + Test the OpenPGP header key extraction + """ + KEYURL = "https://leap.se/key.txt" + OpenPGP = "id=12345678; url=\"%s\"; preference=signencrypt" % (KEYURL,) + + message = Parser().parsestr(self.EMAIL) + message.add_header("OpenPGP", OpenPGP) + self.fetcher._keymanager.fetch_key = Mock( + return_value=defer.succeed(None)) + + def fetch_key_called(ret): + self.fetcher._keymanager.fetch_key.assert_called_once_with( + ADDRESS_2, KEYURL) + + d = self._create_incoming_email(message.as_string()) + d.addCallback( + lambda email: + self._mock_soledad_get_from_index(fields.JUST_MAIL_IDX, [email])) + d.addCallback(lambda _: self.fetcher.fetch()) + d.addCallback(fetch_key_called) + return d + + def testExtractOpenPGPHeaderInvalidUrl(self): + """ + Test the OpenPGP header key extraction + """ + KEYURL = "https://someotherdomain.com/key.txt" + OpenPGP = "id=12345678; url=\"%s\"; preference=signencrypt" % (KEYURL,) + + message = Parser().parsestr(self.EMAIL) + message.add_header("OpenPGP", OpenPGP) + self.fetcher._keymanager.fetch_key = Mock() + + def fetch_key_called(ret): + self.assertFalse(self.fetcher._keymanager.fetch_key.called) + + d = self._create_incoming_email(message.as_string()) + d.addCallback( + lambda email: + self._mock_soledad_get_from_index(fields.JUST_MAIL_IDX, [email])) + d.addCallback(lambda _: self.fetcher.fetch()) + d.addCallback(fetch_key_called) + return d + + def testExtractAttachedKey(self): + KEY = "-----BEGIN PGP PUBLIC KEY BLOCK-----\n..." + + message = MIMEMultipart() + message.add_header("from", ADDRESS_2) + key = MIMEApplication("", "pgp-keys") + key.set_payload(KEY) + message.attach(key) + self.fetcher._keymanager.put_raw_key = Mock( + return_value=defer.succeed(None)) + + def put_raw_key_called(_): + self.fetcher._keymanager.put_raw_key.assert_called_once_with( + KEY, address=ADDRESS_2) + + d = self._do_fetch(message.as_string()) + d.addCallback(put_raw_key_called) + return d + + def testExtractInvalidAttachedKey(self): + KEY = "-----BEGIN PGP PUBLIC KEY BLOCK-----\n..." + + message = MIMEMultipart() + message.add_header("from", ADDRESS_2) + key = MIMEApplication("", "pgp-keys") + key.set_payload(KEY) + message.attach(key) + self.fetcher._keymanager.put_raw_key = Mock( + return_value=defer.fail(KeyAddressMismatch())) + + def put_raw_key_called(_): + self.fetcher._keymanager.put_raw_key.assert_called_once_with( + KEY, address=ADDRESS_2) + + d = self._do_fetch(message.as_string()) + d.addCallback(put_raw_key_called) + d.addErrback(log.err) + return d + + def testExtractAttachedKeyAndNotOpenPGPHeader(self): + KEY = "-----BEGIN PGP PUBLIC KEY BLOCK-----\n..." + KEYURL = "https://leap.se/key.txt" + OpenPGP = "id=12345678; url=\"%s\"; preference=signencrypt" % (KEYURL,) + + message = MIMEMultipart() + message.add_header("from", ADDRESS_2) + message.add_header("OpenPGP", OpenPGP) + key = MIMEApplication("", "pgp-keys") + key.set_payload(KEY) + message.attach(key) + + self.fetcher._keymanager.put_raw_key = Mock( + return_value=defer.succeed(None)) + self.fetcher._keymanager.fetch_key = Mock() + + def put_raw_key_called(_): + self.fetcher._keymanager.put_raw_key.assert_called_once_with( + KEY, address=ADDRESS_2) + self.assertFalse(self.fetcher._keymanager.fetch_key.called) + + d = self._do_fetch(message.as_string()) + d.addCallback(put_raw_key_called) + return d + + def testExtractOpenPGPHeaderIfInvalidAttachedKey(self): + KEY = "-----BEGIN PGP PUBLIC KEY BLOCK-----\n..." + KEYURL = "https://leap.se/key.txt" + OpenPGP = "id=12345678; url=\"%s\"; preference=signencrypt" % (KEYURL,) + + message = MIMEMultipart() + message.add_header("from", ADDRESS_2) + message.add_header("OpenPGP", OpenPGP) + key = MIMEApplication("", "pgp-keys") + key.set_payload(KEY) + message.attach(key) + + self.fetcher._keymanager.put_raw_key = Mock( + return_value=defer.fail(KeyAddressMismatch())) + self.fetcher._keymanager.fetch_key = Mock() + + def put_raw_key_called(_): + self.fetcher._keymanager.put_raw_key.assert_called_once_with( + KEY, address=ADDRESS_2) + self.fetcher._keymanager.fetch_key.assert_called_once_with( + ADDRESS_2, KEYURL) + + d = self._do_fetch(message.as_string()) + d.addCallback(put_raw_key_called) + return d + + def testAddDecryptedHeader(self): + class DummyMsg(): + + def __init__(self): + self.headers = {} + + def add_header(self, k, v): + self.headers[k] = v + + msg = DummyMsg() + self.fetcher._add_decrypted_header(msg) + + self.assertEquals(msg.headers['X-Leap-Encryption'], 'decrypted') + + def testDecryptEmail(self): + + self.fetcher._decryption_error = Mock() + self.fetcher._add_decrypted_header = Mock() + + def create_encrypted_message(encstr): + message = Parser().parsestr(self.EMAIL) + newmsg = MultipartEncrypted('application/pgp-encrypted') + for hkey, hval in message.items(): + newmsg.add_header(hkey, hval) + + encmsg = MIMEApplication( + encstr, _subtype='octet-stream', _encoder=lambda x: x) + encmsg.add_header('content-disposition', 'attachment', + filename='msg.asc') + # create meta message + metamsg = PGPEncrypted() + metamsg.add_header('Content-Disposition', 'attachment') + # attach pgp message parts to new message + newmsg.attach(metamsg) + newmsg.attach(encmsg) + return newmsg + + def decryption_error_not_called(_): + self.assertFalse(self.fetcher._decryption_error.called, + "There was some errors with decryption") + + def add_decrypted_header_called(_): + self.assertTrue(self.fetcher._add_decrypted_header.called, + "There was some errors with decryption") + + d = self.km.encrypt(self.EMAIL, ADDRESS, sign=ADDRESS_2) + d.addCallback(create_encrypted_message) + d.addCallback( + lambda message: + self._do_fetch(message.as_string())) + d.addCallback(decryption_error_not_called) + d.addCallback(add_decrypted_header_called) + return d + + def testValidateSignatureFromEncryptedEmailFromAppleMail(self): + enc_signed_file = os.path.join( + HERE, 'rfc822.multi-encrypt-signed.message') + self.fetcher._add_verified_signature_header = Mock() + + def add_verified_signature_header_called(_): + self.assertTrue(self.fetcher._add_verified_signature_header.called, + "There was some errors verifying signature") + + with open(enc_signed_file) as f: + enc_signed_raw = f.read() + + d = self._do_fetch(enc_signed_raw) + d.addCallback(add_verified_signature_header_called) + return d + + def testListener(self): + self.called = False + + def listener(uid): + self.called = True + + def listener_called(_): + self.assertTrue(self.called) + + self.fetcher.add_listener(listener) + d = self._do_fetch(self.EMAIL) + d.addCallback(listener_called) + return d + + def _do_fetch(self, message): + d = self._create_incoming_email(message) + d.addCallback( + lambda email: + self._mock_soledad_get_from_index(fields.JUST_MAIL_IDX, [email])) + d.addCallback(lambda _: self.fetcher.fetch()) + return d + + def _create_incoming_email(self, email_str): + email = SoledadDocument() + data = json.dumps( + {"incoming": True, "content": email_str}, + ensure_ascii=False) + + def set_email_content(encr_data): + email.content = { + fields.INCOMING_KEY: True, + fields.ERROR_DECRYPTING_KEY: False, + ENC_SCHEME_KEY: EncryptionSchemes.PUBKEY, + ENC_JSON_KEY: encr_data + } + return email + d = self.km.encrypt(data, ADDRESS, fetch_remote=False) + d.addCallback(set_email_content) + return d + + def _mock_soledad_get_from_index(self, index_name, value): + get_from_index = self._soledad.get_from_index + + def soledad_mock(idx_name, *key_values): + if index_name == idx_name: + return defer.succeed(value) + return get_from_index(idx_name, *key_values) + self.fetcher._soledad.get_from_index = Mock(side_effect=soledad_mock) -- cgit v1.2.3