diff options
Diffstat (limited to 'src/leap/keymanager/tests/test_keymanager.py')
-rw-r--r-- | src/leap/keymanager/tests/test_keymanager.py | 142 |
1 files changed, 127 insertions, 15 deletions
diff --git a/src/leap/keymanager/tests/test_keymanager.py b/src/leap/keymanager/tests/test_keymanager.py index 55f892e..856d6da 100644 --- a/src/leap/keymanager/tests/test_keymanager.py +++ b/src/leap/keymanager/tests/test_keymanager.py @@ -20,9 +20,11 @@ Tests for the Key Manager. """ - +from os import path from datetime import datetime -from mock import Mock +import tempfile +from leap.common import ca_bundle +from mock import Mock, MagicMock, patch from twisted.internet.defer import inlineCallbacks from twisted.trial import unittest @@ -36,10 +38,7 @@ from leap.keymanager.keys import ( is_address, build_key_from_dict, ) -from leap.keymanager.validation import ( - ValidationLevel, - toValidationLevel -) +from leap.keymanager.validation import ValidationLevels from leap.keymanager.tests import ( KeyManagerWithSoledadTestCase, ADDRESS, @@ -53,6 +52,7 @@ from leap.keymanager.tests import ( NICKSERVER_URI = "http://leap.se/" +REMOTE_KEY_URL = "http://site.domain/key" class KeyManagerUtilTestCase(unittest.TestCase): @@ -82,7 +82,7 @@ class KeyManagerUtilTestCase(unittest.TestCase): 'expiry_date': 0, 'last_audited_at': 0, 'refreshed_at': 1311239602, - 'validation': ValidationLevel.Weak_Chain.name, + 'validation': str(ValidationLevels.Weak_Chain), 'encr_used': False, 'sign_used': True, } @@ -115,7 +115,7 @@ class KeyManagerUtilTestCase(unittest.TestCase): datetime.fromtimestamp(kdict['refreshed_at']), key.refreshed_at, 'Wrong data in key.') self.assertEqual( - toValidationLevel(kdict['validation']), key.validation, + ValidationLevels.get(kdict['validation']), key.validation, 'Wrong data in key.') self.assertEqual( kdict['encr_used'], key.encr_used, @@ -227,7 +227,7 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase): key = yield self._fetch_key(km, ADDRESS, PUBLIC_KEY) self.assertIsInstance(key, OpenPGPKey) self.assertTrue(ADDRESS in key.address) - self.assertEqual(key.validation, ValidationLevel.Provider_Trust) + self.assertEqual(key.validation, ValidationLevels.Provider_Trust) @inlineCallbacks def test_get_key_fetches_other_domain(self): @@ -239,7 +239,7 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase): key = yield self._fetch_key(km, ADDRESS_OTHER, PUBLIC_KEY_OTHER) self.assertIsInstance(key, OpenPGPKey) self.assertTrue(ADDRESS_OTHER in key.address) - self.assertEqual(key.validation, ValidationLevel.Weak_Chain) + self.assertEqual(key.validation, ValidationLevels.Weak_Chain) def _fetch_key(self, km, address, key): """ @@ -290,7 +290,6 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase): content = PUBLIC_KEY km._fetcher.get = Mock(return_value=Response()) - km.ca_cert_path = 'cacertpath' yield km.fetch_key(ADDRESS, "http://site.domain/key", OpenPGPKey) key = yield km.get_key(ADDRESS, OpenPGPKey) @@ -307,7 +306,6 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase): content = "" km._fetcher.get = Mock(return_value=Response()) - km.ca_cert_path = 'cacertpath' d = km.fetch_key(ADDRESS, "http://site.domain/key", OpenPGPKey) return self.assertFailure(d, KeyNotFound) @@ -323,10 +321,125 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase): content = PUBLIC_KEY km._fetcher.get = Mock(return_value=Response()) - km.ca_cert_path = 'cacertpath' d = km.fetch_key(ADDRESS_2, "http://site.domain/key", OpenPGPKey) return self.assertFailure(d, KeyAddressMismatch) + def _mock_get_response(self, km, body): + class Response(object): + ok = True + content = body + + mock = MagicMock(return_value=Response()) + km._fetcher.get = mock + + return mock + + @inlineCallbacks + def test_fetch_key_uses_ca_bundle_if_none_specified(self): + ca_cert_path = None + km = self._key_manager(ca_cert_path=ca_cert_path) + get_mock = self._mock_get_response(km, PUBLIC_KEY_OTHER) + + yield km.fetch_key(ADDRESS_OTHER, REMOTE_KEY_URL, OpenPGPKey) + + get_mock.assert_called_once_with(REMOTE_KEY_URL, data=None, + verify=ca_bundle.where()) + + @inlineCallbacks + def test_fetch_key_uses_ca_bundle_if_empty_string_specified(self): + ca_cert_path = '' + km = self._key_manager(ca_cert_path=ca_cert_path) + get_mock = self._mock_get_response(km, PUBLIC_KEY_OTHER) + + yield km.fetch_key(ADDRESS_OTHER, REMOTE_KEY_URL, OpenPGPKey) + + get_mock.assert_called_once_with(REMOTE_KEY_URL, data=None, + verify=ca_bundle.where()) + + @inlineCallbacks + def test_fetch_key_use_default_ca_bundle_if_set_as_ca_cert_path(self): + ca_cert_path = ca_bundle.where() + km = self._key_manager(ca_cert_path=ca_cert_path) + get_mock = self._mock_get_response(km, PUBLIC_KEY_OTHER) + + yield km.fetch_key(ADDRESS_OTHER, REMOTE_KEY_URL, OpenPGPKey) + + get_mock.assert_called_once_with(REMOTE_KEY_URL, data=None, + verify=ca_bundle.where()) + + @inlineCallbacks + def test_fetch_uses_combined_ca_bundle_otherwise(self): + with tempfile.NamedTemporaryFile() as tmp_input, \ + tempfile.NamedTemporaryFile(delete=False) as tmp_output: + ca_content = 'some\ncontent\n' + ca_cert_path = tmp_input.name + self._dump_to_file(ca_cert_path, ca_content) + + with patch('leap.keymanager.tempfile.NamedTemporaryFile') as mock: + mock.return_value = tmp_output + km = self._key_manager(ca_cert_path=ca_cert_path) + get_mock = self._mock_get_response(km, PUBLIC_KEY_OTHER) + + yield km.fetch_key(ADDRESS_OTHER, REMOTE_KEY_URL, OpenPGPKey) + + # assert that combined bundle file is passed to get call + get_mock.assert_called_once_with(REMOTE_KEY_URL, data=None, + verify=tmp_output.name) + + # assert that files got appended + expected = self._slurp_file(ca_bundle.where()) + ca_content + self.assertEqual(expected, self._slurp_file(tmp_output.name)) + + del km # force km out of scope + self.assertFalse(path.exists(tmp_output.name)) + + def _dump_to_file(self, filename, content): + with open(filename, 'w') as out: + out.write(content) + + def _slurp_file(self, filename): + with open(filename) as f: + content = f.read() + return content + + @inlineCallbacks + def test_decrypt_updates_sign_used_for_signer(self): + # given + km = self._key_manager() + yield km._wrapper_map[OpenPGPKey].put_ascii_key(PRIVATE_KEY, ADDRESS) + yield km._wrapper_map[OpenPGPKey].put_ascii_key( + PRIVATE_KEY_2, ADDRESS_2) + encdata = yield km.encrypt('data', ADDRESS, OpenPGPKey, + sign=ADDRESS_2, fetch_remote=False) + yield km.decrypt( + encdata, ADDRESS, OpenPGPKey, verify=ADDRESS_2, fetch_remote=False) + + # when + key = yield km.get_key(ADDRESS_2, OpenPGPKey, fetch_remote=False) + + # then + self.assertEqual(True, key.sign_used) + + @inlineCallbacks + def test_decrypt_does_not_update_sign_used_for_recipient(self): + # given + km = self._key_manager() + yield km._wrapper_map[OpenPGPKey].put_ascii_key( + PRIVATE_KEY, ADDRESS) + yield km._wrapper_map[OpenPGPKey].put_ascii_key( + PRIVATE_KEY_2, ADDRESS_2) + encdata = yield km.encrypt('data', ADDRESS, OpenPGPKey, + sign=ADDRESS_2, fetch_remote=False) + yield km.decrypt( + encdata, ADDRESS, OpenPGPKey, verify=ADDRESS_2, fetch_remote=False) + + # when + key = yield km.get_key( + ADDRESS, OpenPGPKey, private=False, fetch_remote=False) + + # then + self.assertEqual(False, key.sign_used) + class KeyManagerCryptoTestCase(KeyManagerWithSoledadTestCase): @@ -391,9 +504,8 @@ class KeyManagerCryptoTestCase(KeyManagerWithSoledadTestCase): sign=ADDRESS, fetch_remote=False)) return self.assertFailure(d, KeyNotFound) - -import unittest if __name__ == "__main__": + import unittest unittest.main() # key 0F91B402: someone@somedomain.org |