From ea82f75f5465de47c4a838fbd1dfe8b2030fd842 Mon Sep 17 00:00:00 2001 From: Ruben Pollan Date: Tue, 9 Dec 2014 12:18:40 -0600 Subject: New keymanager async API --- src/leap/mail/tests/__init__.py | 88 +++------------ src/leap/mail/tests/test_service.py | 216 ++++++++++++++++++------------------ 2 files changed, 127 insertions(+), 177 deletions(-) (limited to 'src/leap/mail/tests') diff --git a/src/leap/mail/tests/__init__.py b/src/leap/mail/tests/__init__.py index 10bc5fe..b35107d 100644 --- a/src/leap/mail/tests/__init__.py +++ b/src/leap/mail/tests/__init__.py @@ -19,16 +19,14 @@ Base classes and keys for leap.mail tests. """ import os import distutils.spawn -import shutil -import tempfile from mock import Mock +from twisted.internet.defer import gatherResults +from twisted.trial import unittest from leap.soledad.client import Soledad -from leap.keymanager import ( - KeyManager, - openpgp, -) +from leap.keymanager import KeyManager +from leap.keymanager.openpgp import OpenPGPKey from leap.common.testing.basetest import BaseLeapTest @@ -39,22 +37,12 @@ def _find_gpg(): return os.path.realpath(gpg_path) if gpg_path is not None else "/usr/bin/gpg" -class TestCaseWithKeyManager(BaseLeapTest): +class TestCaseWithKeyManager(unittest.TestCase, BaseLeapTest): GPG_BINARY_PATH = _find_gpg() def setUp(self): - # mimic BaseLeapTest.setUpClass behaviour, because this is deprecated - # in Twisted: http://twistedmatrix.com/trac/ticket/1870 - self.old_path = os.environ['PATH'] - self.old_home = os.environ['HOME'] - self.tempdir = tempfile.mkdtemp(prefix="leap_tests-") - self.home = self.tempdir - bin_tdir = os.path.join( - self.tempdir, - 'bin') - os.environ["PATH"] = bin_tdir - os.environ["HOME"] = self.tempdir + self.setUpEnv() # setup our own stuff address = 'leap@leap.se' # user's address in the form user@provider @@ -65,36 +53,7 @@ class TestCaseWithKeyManager(BaseLeapTest): server_url = 'http://provider/' cert_file = '' - self._soledad = self._soledad_instance( - uuid, passphrase, secrets_path, local_db_path, server_url, - cert_file) - self._km = self._keymanager_instance(address) - - def _soledad_instance(self, uuid, passphrase, secrets_path, local_db_path, - server_url, cert_file): - """ - Return a Soledad instance for tests. - """ - # mock key fetching and storing so Soledad doesn't fail when trying to - # reach the server. - Soledad._fetch_keys_from_shared_db = Mock(return_value=None) - Soledad._assert_keys_in_shared_db = Mock(return_value=None) - - # instantiate soledad - def _put_doc_side_effect(doc): - self._doc_put = doc - - class MockSharedDB(object): - - get_doc = Mock(return_value=None) - put_doc = Mock(side_effect=_put_doc_side_effect) - lock = Mock(return_value=('atoken', 300)) - unlock = Mock(return_value=True) - - def __call__(self): - return self - - soledad = Soledad( + self._soledad = Soledad( uuid, passphrase, secrets_path=secrets_path, @@ -103,13 +62,11 @@ class TestCaseWithKeyManager(BaseLeapTest): cert_file=cert_file, syncable=False ) + return self._setup_keymanager(address) - soledad._shared_db = MockSharedDB() - return soledad - - def _keymanager_instance(self, address): + def _setup_keymanager(self, address): """ - Return a Key Manager instance for tests. + Set up Key Manager and return a Deferred that will be fired when done. """ self._config = { 'host': 'https://provider/', @@ -132,26 +89,17 @@ class TestCaseWithKeyManager(BaseLeapTest): pass nickserver_url = '' # the url of the nickserver - km = KeyManager(address, nickserver_url, self._soledad, - ca_cert_path='', gpgbinary=self.GPG_BINARY_PATH) - km._fetcher.put = Mock() - km._fetcher.get = Mock(return_value=Response()) - - # insert test keys in key manager. - pgp = openpgp.OpenPGPScheme( - self._soledad, gpgbinary=self.GPG_BINARY_PATH) - pgp.put_ascii_key(PRIVATE_KEY) - pgp.put_ascii_key(PRIVATE_KEY_2) + self._km = KeyManager(address, nickserver_url, self._soledad, + ca_cert_path='', gpgbinary=self.GPG_BINARY_PATH) + self._km._fetcher.put = Mock() + self._km._fetcher.get = Mock(return_value=Response()) - return km + d1 = self._km.put_raw_key(PRIVATE_KEY, OpenPGPKey, ADDRESS) + d2 = self._km.put_raw_key(PRIVATE_KEY_2, OpenPGPKey, ADDRESS_2) + return gatherResults([d1, d2]) def tearDown(self): - # mimic LeapBaseTest.tearDownClass behaviour - os.environ["PATH"] = self.old_path - os.environ["HOME"] = self.old_home - # safety check - assert 'leap_tests-' in self.tempdir - shutil.rmtree(self.tempdir) + self.tearDownEnv() # Key material for testing diff --git a/src/leap/mail/tests/test_service.py b/src/leap/mail/tests/test_service.py index f0a807d..43f354d 100644 --- a/src/leap/mail/tests/test_service.py +++ b/src/leap/mail/tests/test_service.py @@ -22,7 +22,8 @@ SMTP gateway tests. import re from datetime import datetime -from twisted.mail.smtp import User, Address +from twisted.internet.defer import fail +from twisted.mail.smtp import User from mock import Mock @@ -33,7 +34,7 @@ from leap.mail.tests import ( ADDRESS, ADDRESS_2, ) -from leap.keymanager import openpgp +from leap.keymanager import openpgp, errors class TestOutgoingMail(TestCaseWithKeyManager): @@ -54,71 +55,126 @@ class TestOutgoingMail(TestCaseWithKeyManager): 'QUIT'] def setUp(self): - TestCaseWithKeyManager.setUp(self) self.lines = [line for line in self.EMAIL_DATA[4:12]] self.lines.append('') # add a trailing newline self.raw = '\r\n'.join(self.lines) + self.expected_body = ('\r\n'.join(self.EMAIL_DATA[9:12]) + + "\r\n\r\n--\r\nI prefer encrypted email - " + "https://leap.se/key/anotheruser\r\n") self.fromAddr = ADDRESS_2 - self.outgoing_mail = OutgoingMail(self.fromAddr, self._km, self._config['cert'], self._config['key'], - self._config['host'], self._config['port']) - self.proto = SMTPFactory( - u'anotheruser@leap.se', - self._km, - self._config['encrypted_only'], - self.outgoing_mail).buildProtocol(('127.0.0.1', 0)) - self.dest = User(ADDRESS, 'gateway.leap.se', self.proto, ADDRESS) - - def test_openpgp_encrypt_decrypt(self): - "Test if openpgp can encrypt and decrypt." - text = "simple raw text" - pubkey = self._km.get_key( - ADDRESS, openpgp.OpenPGPKey, private=False) - encrypted = self._km.encrypt(text, pubkey) - self.assertNotEqual( - text, encrypted, "Ciphertext is equal to plaintext.") - privkey = self._km.get_key( - ADDRESS, openpgp.OpenPGPKey, private=True) - decrypted = self._km.decrypt(encrypted, privkey) - self.assertEqual(text, decrypted, - "Decrypted text differs from plaintext.") + + def init_outgoing_and_proto(_): + self.outgoing_mail = OutgoingMail( + self.fromAddr, self._km, self._config['cert'], + self._config['key'], self._config['host'], + self._config['port']) + self.proto = SMTPFactory( + u'anotheruser@leap.se', + self._km, + self._config['encrypted_only'], + self.outgoing_mail).buildProtocol(('127.0.0.1', 0)) + self.dest = User(ADDRESS, 'gateway.leap.se', self.proto, ADDRESS) + + d = TestCaseWithKeyManager.setUp(self) + d.addCallback(init_outgoing_and_proto) + return d def test_message_encrypt(self): """ Test if message gets encrypted to destination email. """ - - message, _ = self.outgoing_mail._maybe_encrypt_and_sign(self.raw, self.dest) - - # assert structure of encrypted message - self.assertTrue('Content-Type' in message) - self.assertEqual('multipart/encrypted', message.get_content_type()) - self.assertEqual('application/pgp-encrypted', - message.get_param('protocol')) - self.assertEqual(2, len(message.get_payload())) - self.assertEqual('application/pgp-encrypted', - message.get_payload(0).get_content_type()) - self.assertEqual('application/octet-stream', - message.get_payload(1).get_content_type()) - privkey = self._km.get_key( - ADDRESS, openpgp.OpenPGPKey, private=True) - decrypted = self._km.decrypt( - message.get_payload(1).get_payload(), privkey) - - expected = '\n' + '\r\n'.join( - self.EMAIL_DATA[9:12]) + '\r\n\r\n--\r\n' + 'I prefer encrypted email - https://leap.se/key/anotheruser\r\n' - self.assertEqual( - expected, - decrypted, - 'Decrypted text differs from plaintext.') + def check_decryption(res): + decrypted, _ = res + self.assertEqual( + '\n' + self.expected_body, + decrypted, + 'Decrypted text differs from plaintext.') + + d = self.outgoing_mail._maybe_encrypt_and_sign(self.raw, self.dest) + d.addCallback(self._assert_encrypted) + d.addCallback(lambda message: self._km.decrypt( + message.get_payload(1).get_payload(), ADDRESS, openpgp.OpenPGPKey)) + d.addCallback(check_decryption) + return d def test_message_encrypt_sign(self): """ Test if message gets encrypted to destination email and signed with sender key. - """ - message, _ = self.outgoing_mail._maybe_encrypt_and_sign(self.raw, self.dest) + '""" + def check_decryption_and_verify(res): + decrypted, signkey = res + self.assertEqual( + '\n' + self.expected_body, + decrypted, + 'Decrypted text differs from plaintext.') + self.assertTrue(ADDRESS_2 in signkey.address, + "Verification failed") + + d = self.outgoing_mail._maybe_encrypt_and_sign(self.raw, self.dest) + d.addCallback(self._assert_encrypted) + d.addCallback(lambda message: self._km.decrypt( + message.get_payload(1).get_payload(), ADDRESS, openpgp.OpenPGPKey, + verify=ADDRESS_2)) + d.addCallback(check_decryption_and_verify) + return d - # assert structure of encrypted message + def test_message_sign(self): + """ + Test if message is signed with sender key. + """ + # mock the key fetching + self._km._fetch_keys_from_server = Mock( + return_value=fail(errors.KeyNotFound())) + recipient = User('ihavenopubkey@nonleap.se', + 'gateway.leap.se', self.proto, ADDRESS) + self.outgoing_mail = OutgoingMail( + self.fromAddr, self._km, self._config['cert'], self._config['key'], + self._config['host'], self._config['port']) + + def check_signed(res): + message, _ = res + self.assertTrue('Content-Type' in message) + self.assertEqual('multipart/signed', message.get_content_type()) + self.assertEqual('application/pgp-signature', + message.get_param('protocol')) + self.assertEqual('pgp-sha512', message.get_param('micalg')) + # assert content of message + self.assertEqual(self.expected_body, + message.get_payload(0).get_payload(decode=True)) + # assert content of signature + self.assertTrue( + message.get_payload(1).get_payload().startswith( + '-----BEGIN PGP SIGNATURE-----\n'), + 'Message does not start with signature header.') + self.assertTrue( + message.get_payload(1).get_payload().endswith( + '-----END PGP SIGNATURE-----\n'), + 'Message does not end with signature footer.') + return message + + def verify(message): + # replace EOL before verifying (according to rfc3156) + signed_text = re.sub('\r?\n', '\r\n', + message.get_payload(0).as_string()) + + def assert_verify(key): + self.assertTrue(ADDRESS_2 in key.address, + 'Signature could not be verified.') + + d = self._km.verify( + signed_text, ADDRESS_2, openpgp.OpenPGPKey, + detached_sig=message.get_payload(1).get_payload()) + d.addCallback(assert_verify) + return d + + d = self.outgoing_mail._maybe_encrypt_and_sign(self.raw, recipient) + d.addCallback(check_signed) + d.addCallback(verify) + return d + + def _assert_encrypted(self, res): + message, _ = res self.assertTrue('Content-Type' in message) self.assertEqual('multipart/encrypted', message.get_content_type()) self.assertEqual('application/pgp-encrypted', @@ -128,58 +184,4 @@ class TestOutgoingMail(TestCaseWithKeyManager): message.get_payload(0).get_content_type()) self.assertEqual('application/octet-stream', message.get_payload(1).get_content_type()) - # decrypt and verify - privkey = self._km.get_key( - ADDRESS, openpgp.OpenPGPKey, private=True) - pubkey = self._km.get_key(ADDRESS_2, openpgp.OpenPGPKey) - decrypted = self._km.decrypt( - message.get_payload(1).get_payload(), privkey, verify=pubkey) - self.assertEqual( - '\n' + '\r\n'.join(self.EMAIL_DATA[9:12]) + '\r\n\r\n--\r\n' + - 'I prefer encrypted email - https://leap.se/key/anotheruser\r\n', - decrypted, - 'Decrypted text differs from plaintext.') - - def test_message_sign(self): - """ - Test if message is signed with sender key. - """ - # mock the key fetching - self._km.fetch_keys_from_server = Mock(return_value=[]) - recipient = User('ihavenopubkey@nonleap.se', - 'gateway.leap.se', self.proto, ADDRESS) - self.outgoing_mail = OutgoingMail(self.fromAddr, self._km, self._config['cert'], self._config['key'], - self._config['host'], self._config['port']) - - message, _ = self.outgoing_mail._maybe_encrypt_and_sign(self.raw, recipient) - - # assert structure of signed message - self.assertTrue('Content-Type' in message) - self.assertEqual('multipart/signed', message.get_content_type()) - self.assertEqual('application/pgp-signature', - message.get_param('protocol')) - self.assertEqual('pgp-sha512', message.get_param('micalg')) - # assert content of message - self.assertEqual( - '\r\n'.join(self.EMAIL_DATA[9:13]) + '\r\n--\r\n' + - 'I prefer encrypted email - https://leap.se/key/anotheruser\r\n', - message.get_payload(0).get_payload(decode=True)) - # assert content of signature - self.assertTrue( - message.get_payload(1).get_payload().startswith( - '-----BEGIN PGP SIGNATURE-----\n'), - 'Message does not start with signature header.') - self.assertTrue( - message.get_payload(1).get_payload().endswith( - '-----END PGP SIGNATURE-----\n'), - 'Message does not end with signature footer.') - # assert signature is valid - pubkey = self._km.get_key(ADDRESS_2, openpgp.OpenPGPKey) - # replace EOL before verifying (according to rfc3156) - signed_text = re.sub('\r?\n', '\r\n', - message.get_payload(0).as_string()) - self.assertTrue( - self._km.verify(signed_text, - pubkey, - detached_sig=message.get_payload(1).get_payload()), - 'Signature could not be verified.') + return message -- cgit v1.2.3