summaryrefslogtreecommitdiff
path: root/src/leap/mail/tests
diff options
context:
space:
mode:
authorRuben Pollan <meskio@sindominio.net>2014-12-09 12:18:40 -0600
committerKali Kaneko <kali@leap.se>2015-02-11 14:05:42 -0400
commitea82f75f5465de47c4a838fbd1dfe8b2030fd842 (patch)
tree03e50f397188e30352b9bcf646940f6619f3e2c1 /src/leap/mail/tests
parent80ed7b9b85686b5d10f359114ca703dba4a5820b (diff)
New keymanager async API
Diffstat (limited to 'src/leap/mail/tests')
-rw-r--r--src/leap/mail/tests/__init__.py88
-rw-r--r--src/leap/mail/tests/test_service.py216
2 files changed, 127 insertions, 177 deletions
diff --git a/src/leap/mail/tests/__init__.py b/src/leap/mail/tests/__init__.py
index 10bc5fe..b35107d 100644
--- a/src/leap/mail/tests/__init__.py
+++ b/src/leap/mail/tests/__init__.py
@@ -19,16 +19,14 @@ Base classes and keys for leap.mail tests.
"""
import os
import distutils.spawn
-import shutil
-import tempfile
from mock import Mock
+from twisted.internet.defer import gatherResults
+from twisted.trial import unittest
from leap.soledad.client import Soledad
-from leap.keymanager import (
- KeyManager,
- openpgp,
-)
+from leap.keymanager import KeyManager
+from leap.keymanager.openpgp import OpenPGPKey
from leap.common.testing.basetest import BaseLeapTest
@@ -39,22 +37,12 @@ def _find_gpg():
return os.path.realpath(gpg_path) if gpg_path is not None else "/usr/bin/gpg"
-class TestCaseWithKeyManager(BaseLeapTest):
+class TestCaseWithKeyManager(unittest.TestCase, BaseLeapTest):
GPG_BINARY_PATH = _find_gpg()
def setUp(self):
- # mimic BaseLeapTest.setUpClass behaviour, because this is deprecated
- # in Twisted: http://twistedmatrix.com/trac/ticket/1870
- self.old_path = os.environ['PATH']
- self.old_home = os.environ['HOME']
- self.tempdir = tempfile.mkdtemp(prefix="leap_tests-")
- self.home = self.tempdir
- bin_tdir = os.path.join(
- self.tempdir,
- 'bin')
- os.environ["PATH"] = bin_tdir
- os.environ["HOME"] = self.tempdir
+ self.setUpEnv()
# setup our own stuff
address = 'leap@leap.se' # user's address in the form user@provider
@@ -65,36 +53,7 @@ class TestCaseWithKeyManager(BaseLeapTest):
server_url = 'http://provider/'
cert_file = ''
- self._soledad = self._soledad_instance(
- uuid, passphrase, secrets_path, local_db_path, server_url,
- cert_file)
- self._km = self._keymanager_instance(address)
-
- def _soledad_instance(self, uuid, passphrase, secrets_path, local_db_path,
- server_url, cert_file):
- """
- Return a Soledad instance for tests.
- """
- # mock key fetching and storing so Soledad doesn't fail when trying to
- # reach the server.
- Soledad._fetch_keys_from_shared_db = Mock(return_value=None)
- Soledad._assert_keys_in_shared_db = Mock(return_value=None)
-
- # instantiate soledad
- def _put_doc_side_effect(doc):
- self._doc_put = doc
-
- class MockSharedDB(object):
-
- get_doc = Mock(return_value=None)
- put_doc = Mock(side_effect=_put_doc_side_effect)
- lock = Mock(return_value=('atoken', 300))
- unlock = Mock(return_value=True)
-
- def __call__(self):
- return self
-
- soledad = Soledad(
+ self._soledad = Soledad(
uuid,
passphrase,
secrets_path=secrets_path,
@@ -103,13 +62,11 @@ class TestCaseWithKeyManager(BaseLeapTest):
cert_file=cert_file,
syncable=False
)
+ return self._setup_keymanager(address)
- soledad._shared_db = MockSharedDB()
- return soledad
-
- def _keymanager_instance(self, address):
+ def _setup_keymanager(self, address):
"""
- Return a Key Manager instance for tests.
+ Set up Key Manager and return a Deferred that will be fired when done.
"""
self._config = {
'host': 'https://provider/',
@@ -132,26 +89,17 @@ class TestCaseWithKeyManager(BaseLeapTest):
pass
nickserver_url = '' # the url of the nickserver
- km = KeyManager(address, nickserver_url, self._soledad,
- ca_cert_path='', gpgbinary=self.GPG_BINARY_PATH)
- km._fetcher.put = Mock()
- km._fetcher.get = Mock(return_value=Response())
-
- # insert test keys in key manager.
- pgp = openpgp.OpenPGPScheme(
- self._soledad, gpgbinary=self.GPG_BINARY_PATH)
- pgp.put_ascii_key(PRIVATE_KEY)
- pgp.put_ascii_key(PRIVATE_KEY_2)
+ self._km = KeyManager(address, nickserver_url, self._soledad,
+ ca_cert_path='', gpgbinary=self.GPG_BINARY_PATH)
+ self._km._fetcher.put = Mock()
+ self._km._fetcher.get = Mock(return_value=Response())
- return km
+ d1 = self._km.put_raw_key(PRIVATE_KEY, OpenPGPKey, ADDRESS)
+ d2 = self._km.put_raw_key(PRIVATE_KEY_2, OpenPGPKey, ADDRESS_2)
+ return gatherResults([d1, d2])
def tearDown(self):
- # mimic LeapBaseTest.tearDownClass behaviour
- os.environ["PATH"] = self.old_path
- os.environ["HOME"] = self.old_home
- # safety check
- assert 'leap_tests-' in self.tempdir
- shutil.rmtree(self.tempdir)
+ self.tearDownEnv()
# Key material for testing
diff --git a/src/leap/mail/tests/test_service.py b/src/leap/mail/tests/test_service.py
index f0a807d..43f354d 100644
--- a/src/leap/mail/tests/test_service.py
+++ b/src/leap/mail/tests/test_service.py
@@ -22,7 +22,8 @@ SMTP gateway tests.
import re
from datetime import datetime
-from twisted.mail.smtp import User, Address
+from twisted.internet.defer import fail
+from twisted.mail.smtp import User
from mock import Mock
@@ -33,7 +34,7 @@ from leap.mail.tests import (
ADDRESS,
ADDRESS_2,
)
-from leap.keymanager import openpgp
+from leap.keymanager import openpgp, errors
class TestOutgoingMail(TestCaseWithKeyManager):
@@ -54,71 +55,126 @@ class TestOutgoingMail(TestCaseWithKeyManager):
'QUIT']
def setUp(self):
- TestCaseWithKeyManager.setUp(self)
self.lines = [line for line in self.EMAIL_DATA[4:12]]
self.lines.append('') # add a trailing newline
self.raw = '\r\n'.join(self.lines)
+ self.expected_body = ('\r\n'.join(self.EMAIL_DATA[9:12]) +
+ "\r\n\r\n--\r\nI prefer encrypted email - "
+ "https://leap.se/key/anotheruser\r\n")
self.fromAddr = ADDRESS_2
- self.outgoing_mail = OutgoingMail(self.fromAddr, self._km, self._config['cert'], self._config['key'],
- self._config['host'], self._config['port'])
- self.proto = SMTPFactory(
- u'anotheruser@leap.se',
- self._km,
- self._config['encrypted_only'],
- self.outgoing_mail).buildProtocol(('127.0.0.1', 0))
- self.dest = User(ADDRESS, 'gateway.leap.se', self.proto, ADDRESS)
-
- def test_openpgp_encrypt_decrypt(self):
- "Test if openpgp can encrypt and decrypt."
- text = "simple raw text"
- pubkey = self._km.get_key(
- ADDRESS, openpgp.OpenPGPKey, private=False)
- encrypted = self._km.encrypt(text, pubkey)
- self.assertNotEqual(
- text, encrypted, "Ciphertext is equal to plaintext.")
- privkey = self._km.get_key(
- ADDRESS, openpgp.OpenPGPKey, private=True)
- decrypted = self._km.decrypt(encrypted, privkey)
- self.assertEqual(text, decrypted,
- "Decrypted text differs from plaintext.")
+
+ def init_outgoing_and_proto(_):
+ self.outgoing_mail = OutgoingMail(
+ self.fromAddr, self._km, self._config['cert'],
+ self._config['key'], self._config['host'],
+ self._config['port'])
+ self.proto = SMTPFactory(
+ u'anotheruser@leap.se',
+ self._km,
+ self._config['encrypted_only'],
+ self.outgoing_mail).buildProtocol(('127.0.0.1', 0))
+ self.dest = User(ADDRESS, 'gateway.leap.se', self.proto, ADDRESS)
+
+ d = TestCaseWithKeyManager.setUp(self)
+ d.addCallback(init_outgoing_and_proto)
+ return d
def test_message_encrypt(self):
"""
Test if message gets encrypted to destination email.
"""
-
- message, _ = self.outgoing_mail._maybe_encrypt_and_sign(self.raw, self.dest)
-
- # assert structure of encrypted message
- self.assertTrue('Content-Type' in message)
- self.assertEqual('multipart/encrypted', message.get_content_type())
- self.assertEqual('application/pgp-encrypted',
- message.get_param('protocol'))
- self.assertEqual(2, len(message.get_payload()))
- self.assertEqual('application/pgp-encrypted',
- message.get_payload(0).get_content_type())
- self.assertEqual('application/octet-stream',
- message.get_payload(1).get_content_type())
- privkey = self._km.get_key(
- ADDRESS, openpgp.OpenPGPKey, private=True)
- decrypted = self._km.decrypt(
- message.get_payload(1).get_payload(), privkey)
-
- expected = '\n' + '\r\n'.join(
- self.EMAIL_DATA[9:12]) + '\r\n\r\n--\r\n' + 'I prefer encrypted email - https://leap.se/key/anotheruser\r\n'
- self.assertEqual(
- expected,
- decrypted,
- 'Decrypted text differs from plaintext.')
+ def check_decryption(res):
+ decrypted, _ = res
+ self.assertEqual(
+ '\n' + self.expected_body,
+ decrypted,
+ 'Decrypted text differs from plaintext.')
+
+ d = self.outgoing_mail._maybe_encrypt_and_sign(self.raw, self.dest)
+ d.addCallback(self._assert_encrypted)
+ d.addCallback(lambda message: self._km.decrypt(
+ message.get_payload(1).get_payload(), ADDRESS, openpgp.OpenPGPKey))
+ d.addCallback(check_decryption)
+ return d
def test_message_encrypt_sign(self):
"""
Test if message gets encrypted to destination email and signed with
sender key.
- """
- message, _ = self.outgoing_mail._maybe_encrypt_and_sign(self.raw, self.dest)
+ '"""
+ def check_decryption_and_verify(res):
+ decrypted, signkey = res
+ self.assertEqual(
+ '\n' + self.expected_body,
+ decrypted,
+ 'Decrypted text differs from plaintext.')
+ self.assertTrue(ADDRESS_2 in signkey.address,
+ "Verification failed")
+
+ d = self.outgoing_mail._maybe_encrypt_and_sign(self.raw, self.dest)
+ d.addCallback(self._assert_encrypted)
+ d.addCallback(lambda message: self._km.decrypt(
+ message.get_payload(1).get_payload(), ADDRESS, openpgp.OpenPGPKey,
+ verify=ADDRESS_2))
+ d.addCallback(check_decryption_and_verify)
+ return d
- # assert structure of encrypted message
+ def test_message_sign(self):
+ """
+ Test if message is signed with sender key.
+ """
+ # mock the key fetching
+ self._km._fetch_keys_from_server = Mock(
+ return_value=fail(errors.KeyNotFound()))
+ recipient = User('ihavenopubkey@nonleap.se',
+ 'gateway.leap.se', self.proto, ADDRESS)
+ self.outgoing_mail = OutgoingMail(
+ self.fromAddr, self._km, self._config['cert'], self._config['key'],
+ self._config['host'], self._config['port'])
+
+ def check_signed(res):
+ message, _ = res
+ self.assertTrue('Content-Type' in message)
+ self.assertEqual('multipart/signed', message.get_content_type())
+ self.assertEqual('application/pgp-signature',
+ message.get_param('protocol'))
+ self.assertEqual('pgp-sha512', message.get_param('micalg'))
+ # assert content of message
+ self.assertEqual(self.expected_body,
+ message.get_payload(0).get_payload(decode=True))
+ # assert content of signature
+ self.assertTrue(
+ message.get_payload(1).get_payload().startswith(
+ '-----BEGIN PGP SIGNATURE-----\n'),
+ 'Message does not start with signature header.')
+ self.assertTrue(
+ message.get_payload(1).get_payload().endswith(
+ '-----END PGP SIGNATURE-----\n'),
+ 'Message does not end with signature footer.')
+ return message
+
+ def verify(message):
+ # replace EOL before verifying (according to rfc3156)
+ signed_text = re.sub('\r?\n', '\r\n',
+ message.get_payload(0).as_string())
+
+ def assert_verify(key):
+ self.assertTrue(ADDRESS_2 in key.address,
+ 'Signature could not be verified.')
+
+ d = self._km.verify(
+ signed_text, ADDRESS_2, openpgp.OpenPGPKey,
+ detached_sig=message.get_payload(1).get_payload())
+ d.addCallback(assert_verify)
+ return d
+
+ d = self.outgoing_mail._maybe_encrypt_and_sign(self.raw, recipient)
+ d.addCallback(check_signed)
+ d.addCallback(verify)
+ return d
+
+ def _assert_encrypted(self, res):
+ message, _ = res
self.assertTrue('Content-Type' in message)
self.assertEqual('multipart/encrypted', message.get_content_type())
self.assertEqual('application/pgp-encrypted',
@@ -128,58 +184,4 @@ class TestOutgoingMail(TestCaseWithKeyManager):
message.get_payload(0).get_content_type())
self.assertEqual('application/octet-stream',
message.get_payload(1).get_content_type())
- # decrypt and verify
- privkey = self._km.get_key(
- ADDRESS, openpgp.OpenPGPKey, private=True)
- pubkey = self._km.get_key(ADDRESS_2, openpgp.OpenPGPKey)
- decrypted = self._km.decrypt(
- message.get_payload(1).get_payload(), privkey, verify=pubkey)
- self.assertEqual(
- '\n' + '\r\n'.join(self.EMAIL_DATA[9:12]) + '\r\n\r\n--\r\n' +
- 'I prefer encrypted email - https://leap.se/key/anotheruser\r\n',
- decrypted,
- 'Decrypted text differs from plaintext.')
-
- def test_message_sign(self):
- """
- Test if message is signed with sender key.
- """
- # mock the key fetching
- self._km.fetch_keys_from_server = Mock(return_value=[])
- recipient = User('ihavenopubkey@nonleap.se',
- 'gateway.leap.se', self.proto, ADDRESS)
- self.outgoing_mail = OutgoingMail(self.fromAddr, self._km, self._config['cert'], self._config['key'],
- self._config['host'], self._config['port'])
-
- message, _ = self.outgoing_mail._maybe_encrypt_and_sign(self.raw, recipient)
-
- # assert structure of signed message
- self.assertTrue('Content-Type' in message)
- self.assertEqual('multipart/signed', message.get_content_type())
- self.assertEqual('application/pgp-signature',
- message.get_param('protocol'))
- self.assertEqual('pgp-sha512', message.get_param('micalg'))
- # assert content of message
- self.assertEqual(
- '\r\n'.join(self.EMAIL_DATA[9:13]) + '\r\n--\r\n' +
- 'I prefer encrypted email - https://leap.se/key/anotheruser\r\n',
- message.get_payload(0).get_payload(decode=True))
- # assert content of signature
- self.assertTrue(
- message.get_payload(1).get_payload().startswith(
- '-----BEGIN PGP SIGNATURE-----\n'),
- 'Message does not start with signature header.')
- self.assertTrue(
- message.get_payload(1).get_payload().endswith(
- '-----END PGP SIGNATURE-----\n'),
- 'Message does not end with signature footer.')
- # assert signature is valid
- pubkey = self._km.get_key(ADDRESS_2, openpgp.OpenPGPKey)
- # replace EOL before verifying (according to rfc3156)
- signed_text = re.sub('\r?\n', '\r\n',
- message.get_payload(0).as_string())
- self.assertTrue(
- self._km.verify(signed_text,
- pubkey,
- detached_sig=message.get_payload(1).get_payload()),
- 'Signature could not be verified.')
+ return message