summaryrefslogtreecommitdiff
path: root/src/leap/bitmask/mail/incoming
diff options
context:
space:
mode:
Diffstat (limited to 'src/leap/bitmask/mail/incoming')
-rw-r--r--src/leap/bitmask/mail/incoming/tests/rfc822.multi-encrypt-signed.message61
-rw-r--r--src/leap/bitmask/mail/incoming/tests/test_incoming_mail.py391
2 files changed, 0 insertions, 452 deletions
diff --git a/src/leap/bitmask/mail/incoming/tests/rfc822.multi-encrypt-signed.message b/src/leap/bitmask/mail/incoming/tests/rfc822.multi-encrypt-signed.message
deleted file mode 100644
index 98304f24..00000000
--- a/src/leap/bitmask/mail/incoming/tests/rfc822.multi-encrypt-signed.message
+++ /dev/null
@@ -1,61 +0,0 @@
-Content-Type: multipart/encrypted;
- boundary="Apple-Mail=_C01A1464-6C43-43BF-8F62-157335B7E25B";
- protocol="application/pgp-encrypted";
-Subject: Enc signed
-Mime-Version: 1.0 (Mac OS X Mail 9.3 \(3124\))
-From: Leap Test Key <leap@leap.se>
-Date: Tue, 24 May 2016 11:47:24 -0300
-Content-Description: OpenPGP encrypted message
-To: leap@leap.se
-
-This is an OpenPGP/MIME encrypted message (RFC 2440 and 3156)
---Apple-Mail=_C01A1464-6C43-43BF-8F62-157335B7E25B
-Content-Type: application/pgp-encrypted
-Content-Description: PGP/MIME Versions Identification
-
---Apple-Mail=_C01A1464-6C43-43BF-8F62-157335B7E25B
-Content-Disposition: inline;
- filename=encrypted.asc
-Content-Type: application/octet-stream;
- name=encrypted.asc
-Content-Description: OpenPGP encrypted message
-
------BEGIN PGP MESSAGE-----
-Version: GnuPG v2
-
-hQIMAyj9aG/xtZOwAQ/9Gft0KmOpgzL6z4wmVlLm2aeAvHolXmxWb7N/ByL/dZ4n
-YZd/GPRj42X3BwUrDEL5aO3Mcp+rqq8ACh9hsZXiau0Q9cs1K7Gr55Y06qLrIjom
-2fLqwLFBxCL2sAX1dvClgStyfsRFk9Y/+5tX+IjWaD8dAoRdxCO8IbUDuYGnaKld
-bB9h0NMfKVddCAvuQvX1Zc1Nx0Yb3Hd+ocDD7i9BVgX1BBiGu4/ElS3d32TAVCFs
-Na3tjitWB2G472CYu1O6exY7h1F5V4FHfXH6iMRJSYnvV2Jr+oPZENzNdEEA5H/H
-fUbpWrpKzPafjho9S5rJBBM/tqtmBQFBIdgFVcBVb+bXO6DJ8SMTLiiGcVUvvm1b
-9N2VQIhsxtZ8DpcHHSqFVgT2Gt4UkSrEleSoReg36TzS1s8Uw0oU068PwTe3K0Gx
-2pLMdT9NA6X/t7movpXP6tih1l6P5z62dxFl6W12J9OcegISCt0Q7gex1gk/a8zM
-rzBJC3mVxRiFlvHPBgD6oUKarnTJPQx5f5dFXg8DXBWR1Eh/aFjPQIzhZBYpmOi8
-HqgjcAA+WhMQ7v5c0enJoJJS+8Xfai/MK2vTUGsfAT6HqHLw1HSIn6XQGEf4sQ/U
-NfLeFHHbe9rTk8QhyjrSl2vvek2H4EBQVLF08/FUrAfPELUttOFtysQfC3+M0+PS
-6QGyeIlUjKpBJG7HBd4ibuKMQ5vnA+ACsg/TySYeCO6P85xsN+Lmqlr8cAICn/hR
-ezFSzlibaIelRgfDEDJdjVyCsa7qBMjhRCvGYBdkyTzIRq53qwD9pkhrQ6nwWQrv
-bBzyLrl+NVR8CTEOwbeFLI6qf68kblojk3lwo3Qi3psmeMJdiaV9uevsHrgmEFTH
-lZ3rFECPWzmrkMSfVjWu5d8jJqMcqa4lnGzFQKaB76I8BzGhCWrnuvHPB9c9SVhI
-AnAwNw3gY5xgsbXMxZhnPgYeBSViPkQkgRCWl8Jz41eiAJ3Gtj8QSSFWGHpX+MgP
-ohBaPHz6Fnkhz7Lok97e2AcuRZrDVKV6i28r8mizI3B2Mah6ZV0Yuv0EYNtzBv/v
-yV3nu4DWuOOU0301CXBayxJGX0h07z1Ycv7jWD6LNiBXa1vahtbU4WSYNkF0OJaz
-nf8O3CZy5twMq5kQYoPacdNNLregAmWquvE1nxqWbtHFMjtXitP7czxzUTU/DE+C
-jr+irDoYEregEKg9xov91UCRPZgxL+TML71+tSYOMO3JG6lbGw77PQ8s2So7xore
-8+FeDFPaaJqh6uhF5LETRSx8x/haZiXLd+WtO7wF8S3+Vz7AJIFIe8MUadZrYwnH
-wfMAktQKbep3iHCeZ5jHYA461AOhnCca2y+GoyHZUDDFwS1pC1RN4lMkafSE1AgH
-cmEcjLYsw1gqT0+DfqrvjbXmMjGgkgnkMybJH7df5TKu36Q0Nqvcbc2XLFkalr5V
-Vk0SScqKYnKL+cJjabqA8rKkeAh22E2FBCpKPqxSS3te2bRb3XBX26bP0LshkJuy
-GPu6LKvwmUn0obPKCnLJvb9ImIGZToXu6Fb/Cd2c3DG1IK5PptQz4f7ZRW98huPO
-2w59Bswwt5q4lQqsMEzVRnIDH45MmnhEUeS4NaxqLTO7eJpMpb4VxT2u/Ac3XWKp
-o2RE6CbqTyJ+n8tY9OwBRMKzdVd9RFAMqMHTzWTAuU4BgW2vT2sHYZdAsX8sktBr
-5mo9P3MqvgdPNpg8+AOB03JlIv0dzrAFWCZxxLLGIIIz0eXsjghHzQ9QjGfr0xFH
-Z79AKDjsoRisWyWCnadS2oM9fdAg4T/h1STnfxc44o7N1+ym7u58ODICFi+Kg8IR
-JBHIp3CK02JLTLd/WFhUVyWgc6l8gn+oBK+r7Dw+FTWhqX2/ZHCO8qKK1ZK3NIMn
-MBcSVvHSnTPtppb+oND5nk38xazVVHnwxNHaIh7g3NxDB4hl5rBhrWsgTNuqDDRU
-w7ufvMYr1AOV+8e92cHCEKPM19nFKEgaBFECEptEObesGI3QZPAESlojzQ3cDeBa
-=tEyc
------END PGP MESSAGE-----
-
---Apple-Mail=_C01A1464-6C43-43BF-8F62-157335B7E25B-- \ No newline at end of file
diff --git a/src/leap/bitmask/mail/incoming/tests/test_incoming_mail.py b/src/leap/bitmask/mail/incoming/tests/test_incoming_mail.py
deleted file mode 100644
index 29422ecc..00000000
--- a/src/leap/bitmask/mail/incoming/tests/test_incoming_mail.py
+++ /dev/null
@@ -1,391 +0,0 @@
-# -*- coding: utf-8 -*-
-# test_incoming_mail.py
-# Copyright (C) 2015 LEAP
-#
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation, either version 3 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program. If not, see <http://www.gnu.org/licenses/>.
-"""
-Test case for leap.mail.incoming.service
-
-@authors: Ruben Pollan, <meskio@sindominio.net>
-
-@license: GPLv3, see included LICENSE file
-"""
-
-import json
-import os
-import tempfile
-import uuid
-
-from email.mime.application import MIMEApplication
-from email.mime.multipart import MIMEMultipart
-from email.parser import Parser
-from mock import Mock
-
-from twisted.internet import defer
-from twisted.python import log
-
-from leap.keymanager.errors import KeyAddressMismatch
-from leap.mail.adaptors import soledad_indexes as fields
-from leap.mail.adaptors.soledad import cleanup_deferred_locks
-from leap.mail.adaptors.soledad import SoledadMailAdaptor
-from leap.mail.mail import MessageCollection
-from leap.mail.mailbox_indexer import MailboxIndexer
-
-from leap.mail.incoming.service import IncomingMail
-from leap.mail.rfc3156 import MultipartEncrypted, PGPEncrypted
-from leap.mail.testing import KeyManagerWithSoledadTestCase
-from leap.mail.testing import ADDRESS, ADDRESS_2
-from leap.soledad.common.document import SoledadDocument
-from leap.soledad.common.crypto import (
- EncryptionSchemes,
- ENC_JSON_KEY,
- ENC_SCHEME_KEY,
-)
-
-HERE = os.path.split(os.path.abspath(__file__))[0]
-
-# TODO: add some tests for encrypted, unencrypted, signed and unsgined messages
-
-
-class IncomingMailTestCase(KeyManagerWithSoledadTestCase):
- """
- Tests for the incoming mail parser
- """
- NICKSERVER = "http://domain"
- BODY = """
-Governments of the Industrial World, you weary giants of flesh and steel, I
-come from Cyberspace, the new home of Mind. On behalf of the future, I ask
-you of the past to leave us alone. You are not welcome among us. You have
-no sovereignty where we gather.
- """
- EMAIL = """from: Test from SomeDomain <%(from)s>
-to: %(to)s
-subject: independence of cyberspace
-
-%(body)s
- """ % {
- "from": ADDRESS_2,
- "to": ADDRESS,
- "body": BODY
- }
-
- def setUp(self):
- cleanup_deferred_locks()
- try:
- del self._soledad
- del self.km
- except AttributeError:
- pass
-
- # pytest handles correctly the setupEnv for the class,
- # but trial ignores it.
- if not getattr(self, 'tempdir', None):
- self.tempdir = tempfile.mkdtemp()
-
- def getCollection(_):
- adaptor = SoledadMailAdaptor()
- store = self._soledad
- adaptor.store = store
- mbox_indexer = MailboxIndexer(store)
- mbox_name = "INBOX"
- mbox_uuid = str(uuid.uuid4())
-
- def get_collection_from_mbox_wrapper(wrapper):
- wrapper.uuid = mbox_uuid
- return MessageCollection(
- adaptor, store,
- mbox_indexer=mbox_indexer, mbox_wrapper=wrapper)
-
- d = adaptor.initialize_store(store)
- d.addCallback(lambda _: mbox_indexer.create_table(mbox_uuid))
- d.addCallback(
- lambda _: adaptor.get_or_create_mbox(store, mbox_name))
- d.addCallback(get_collection_from_mbox_wrapper)
- return d
-
- def setUpFetcher(inbox_collection):
- self.fetcher = IncomingMail(
- self.km,
- self._soledad,
- inbox_collection,
- ADDRESS)
-
- # The messages don't exist on soledad will fail on deletion
- self.fetcher._delete_incoming_message = Mock(
- return_value=defer.succeed(None))
-
- d = KeyManagerWithSoledadTestCase.setUp(self)
- d.addCallback(getCollection)
- d.addCallback(setUpFetcher)
- d.addErrback(log.err)
- return d
-
- def tearDown(self):
- d = KeyManagerWithSoledadTestCase.tearDown(self)
- return d
-
- def testExtractOpenPGPHeader(self):
- """
- Test the OpenPGP header key extraction
- """
- KEYURL = "https://leap.se/key.txt"
- OpenPGP = "id=12345678; url=\"%s\"; preference=signencrypt" % (KEYURL,)
-
- message = Parser().parsestr(self.EMAIL)
- message.add_header("OpenPGP", OpenPGP)
- self.fetcher._keymanager.fetch_key = Mock(
- return_value=defer.succeed(None))
-
- def fetch_key_called(ret):
- self.fetcher._keymanager.fetch_key.assert_called_once_with(
- ADDRESS_2, KEYURL)
-
- d = self._create_incoming_email(message.as_string())
- d.addCallback(
- lambda email:
- self._mock_soledad_get_from_index(fields.JUST_MAIL_IDX, [email]))
- d.addCallback(lambda _: self.fetcher.fetch())
- d.addCallback(fetch_key_called)
- return d
-
- def testExtractOpenPGPHeaderInvalidUrl(self):
- """
- Test the OpenPGP header key extraction
- """
- KEYURL = "https://someotherdomain.com/key.txt"
- OpenPGP = "id=12345678; url=\"%s\"; preference=signencrypt" % (KEYURL,)
-
- message = Parser().parsestr(self.EMAIL)
- message.add_header("OpenPGP", OpenPGP)
- self.fetcher._keymanager.fetch_key = Mock()
-
- def fetch_key_called(ret):
- self.assertFalse(self.fetcher._keymanager.fetch_key.called)
-
- d = self._create_incoming_email(message.as_string())
- d.addCallback(
- lambda email:
- self._mock_soledad_get_from_index(fields.JUST_MAIL_IDX, [email]))
- d.addCallback(lambda _: self.fetcher.fetch())
- d.addCallback(fetch_key_called)
- return d
-
- def testExtractAttachedKey(self):
- KEY = "-----BEGIN PGP PUBLIC KEY BLOCK-----\n..."
-
- message = MIMEMultipart()
- message.add_header("from", ADDRESS_2)
- key = MIMEApplication("", "pgp-keys")
- key.set_payload(KEY)
- message.attach(key)
- self.fetcher._keymanager.put_raw_key = Mock(
- return_value=defer.succeed(None))
-
- def put_raw_key_called(_):
- self.fetcher._keymanager.put_raw_key.assert_called_once_with(
- KEY, address=ADDRESS_2)
-
- d = self._do_fetch(message.as_string())
- d.addCallback(put_raw_key_called)
- return d
-
- def testExtractInvalidAttachedKey(self):
- KEY = "-----BEGIN PGP PUBLIC KEY BLOCK-----\n..."
-
- message = MIMEMultipart()
- message.add_header("from", ADDRESS_2)
- key = MIMEApplication("", "pgp-keys")
- key.set_payload(KEY)
- message.attach(key)
- self.fetcher._keymanager.put_raw_key = Mock(
- return_value=defer.fail(KeyAddressMismatch()))
-
- def put_raw_key_called(_):
- self.fetcher._keymanager.put_raw_key.assert_called_once_with(
- KEY, address=ADDRESS_2)
-
- d = self._do_fetch(message.as_string())
- d.addCallback(put_raw_key_called)
- d.addErrback(log.err)
- return d
-
- def testExtractAttachedKeyAndNotOpenPGPHeader(self):
- KEY = "-----BEGIN PGP PUBLIC KEY BLOCK-----\n..."
- KEYURL = "https://leap.se/key.txt"
- OpenPGP = "id=12345678; url=\"%s\"; preference=signencrypt" % (KEYURL,)
-
- message = MIMEMultipart()
- message.add_header("from", ADDRESS_2)
- message.add_header("OpenPGP", OpenPGP)
- key = MIMEApplication("", "pgp-keys")
- key.set_payload(KEY)
- message.attach(key)
-
- self.fetcher._keymanager.put_raw_key = Mock(
- return_value=defer.succeed(None))
- self.fetcher._keymanager.fetch_key = Mock()
-
- def put_raw_key_called(_):
- self.fetcher._keymanager.put_raw_key.assert_called_once_with(
- KEY, address=ADDRESS_2)
- self.assertFalse(self.fetcher._keymanager.fetch_key.called)
-
- d = self._do_fetch(message.as_string())
- d.addCallback(put_raw_key_called)
- return d
-
- def testExtractOpenPGPHeaderIfInvalidAttachedKey(self):
- KEY = "-----BEGIN PGP PUBLIC KEY BLOCK-----\n..."
- KEYURL = "https://leap.se/key.txt"
- OpenPGP = "id=12345678; url=\"%s\"; preference=signencrypt" % (KEYURL,)
-
- message = MIMEMultipart()
- message.add_header("from", ADDRESS_2)
- message.add_header("OpenPGP", OpenPGP)
- key = MIMEApplication("", "pgp-keys")
- key.set_payload(KEY)
- message.attach(key)
-
- self.fetcher._keymanager.put_raw_key = Mock(
- return_value=defer.fail(KeyAddressMismatch()))
- self.fetcher._keymanager.fetch_key = Mock()
-
- def put_raw_key_called(_):
- self.fetcher._keymanager.put_raw_key.assert_called_once_with(
- KEY, address=ADDRESS_2)
- self.fetcher._keymanager.fetch_key.assert_called_once_with(
- ADDRESS_2, KEYURL)
-
- d = self._do_fetch(message.as_string())
- d.addCallback(put_raw_key_called)
- return d
-
- def testAddDecryptedHeader(self):
- class DummyMsg():
-
- def __init__(self):
- self.headers = {}
-
- def add_header(self, k, v):
- self.headers[k] = v
-
- msg = DummyMsg()
- self.fetcher._add_decrypted_header(msg)
-
- self.assertEquals(msg.headers['X-Leap-Encryption'], 'decrypted')
-
- def testDecryptEmail(self):
-
- self.fetcher._decryption_error = Mock()
- self.fetcher._add_decrypted_header = Mock()
-
- def create_encrypted_message(encstr):
- message = Parser().parsestr(self.EMAIL)
- newmsg = MultipartEncrypted('application/pgp-encrypted')
- for hkey, hval in message.items():
- newmsg.add_header(hkey, hval)
-
- encmsg = MIMEApplication(
- encstr, _subtype='octet-stream', _encoder=lambda x: x)
- encmsg.add_header('content-disposition', 'attachment',
- filename='msg.asc')
- # create meta message
- metamsg = PGPEncrypted()
- metamsg.add_header('Content-Disposition', 'attachment')
- # attach pgp message parts to new message
- newmsg.attach(metamsg)
- newmsg.attach(encmsg)
- return newmsg
-
- def decryption_error_not_called(_):
- self.assertFalse(self.fetcher._decryption_error.called,
- "There was some errors with decryption")
-
- def add_decrypted_header_called(_):
- self.assertTrue(self.fetcher._add_decrypted_header.called,
- "There was some errors with decryption")
-
- d = self.km.encrypt(self.EMAIL, ADDRESS, sign=ADDRESS_2)
- d.addCallback(create_encrypted_message)
- d.addCallback(
- lambda message:
- self._do_fetch(message.as_string()))
- d.addCallback(decryption_error_not_called)
- d.addCallback(add_decrypted_header_called)
- return d
-
- def testValidateSignatureFromEncryptedEmailFromAppleMail(self):
- enc_signed_file = os.path.join(
- HERE, 'rfc822.multi-encrypt-signed.message')
- self.fetcher._add_verified_signature_header = Mock()
-
- def add_verified_signature_header_called(_):
- self.assertTrue(self.fetcher._add_verified_signature_header.called,
- "There was some errors verifying signature")
-
- with open(enc_signed_file) as f:
- enc_signed_raw = f.read()
-
- d = self._do_fetch(enc_signed_raw)
- d.addCallback(add_verified_signature_header_called)
- return d
-
- def testListener(self):
- self.called = False
-
- def listener(uid):
- self.called = True
-
- def listener_called(_):
- self.assertTrue(self.called)
-
- self.fetcher.add_listener(listener)
- d = self._do_fetch(self.EMAIL)
- d.addCallback(listener_called)
- return d
-
- def _do_fetch(self, message):
- d = self._create_incoming_email(message)
- d.addCallback(
- lambda email:
- self._mock_soledad_get_from_index(fields.JUST_MAIL_IDX, [email]))
- d.addCallback(lambda _: self.fetcher.fetch())
- return d
-
- def _create_incoming_email(self, email_str):
- email = SoledadDocument()
- data = json.dumps(
- {"incoming": True, "content": email_str},
- ensure_ascii=False)
-
- def set_email_content(encr_data):
- email.content = {
- fields.INCOMING_KEY: True,
- fields.ERROR_DECRYPTING_KEY: False,
- ENC_SCHEME_KEY: EncryptionSchemes.PUBKEY,
- ENC_JSON_KEY: encr_data
- }
- return email
- d = self.km.encrypt(data, ADDRESS, fetch_remote=False)
- d.addCallback(set_email_content)
- return d
-
- def _mock_soledad_get_from_index(self, index_name, value):
- get_from_index = self._soledad.get_from_index
-
- def soledad_mock(idx_name, *key_values):
- if index_name == idx_name:
- return defer.succeed(value)
- return get_from_index(idx_name, *key_values)
- self.fetcher._soledad.get_from_index = Mock(side_effect=soledad_mock)