diff options
author | Folker Bernitt <fbernitt@thoughtworks.com> | 2015-09-18 17:03:14 +0200 |
---|---|---|
committer | Folker Bernitt <fbernitt@thoughtworks.com> | 2015-09-18 17:26:04 +0200 |
commit | 9546348c3603f390fdd6d5a119414142e9bd02ea (patch) | |
tree | a1b5223033795c9d659f86e96e1c0f51ad536eb5 /src/leap/keymanager/tests/test_keymanager.py | |
parent | 0b9f64faef0ba9c5cf2a9efe485794ef9b999fab (diff) |
[feature] Use ca_bundle when fetching keys by url
This is necessary as a fetch by url will talk to remote
sites or, for providers with a commercial cert, with
a cert that had not been signed with the provider CA.
- support lookup of local keys by url for providers
with a commercial cert
- combine ca_bundle with ca_cert_path if specified
- close soledad after each test
Diffstat (limited to 'src/leap/keymanager/tests/test_keymanager.py')
-rw-r--r-- | src/leap/keymanager/tests/test_keymanager.py | 68 |
1 files changed, 64 insertions, 4 deletions
diff --git a/src/leap/keymanager/tests/test_keymanager.py b/src/leap/keymanager/tests/test_keymanager.py index a12cac0e..984b037a 100644 --- a/src/leap/keymanager/tests/test_keymanager.py +++ b/src/leap/keymanager/tests/test_keymanager.py @@ -22,7 +22,9 @@ Tests for the Key Manager. from datetime import datetime -from mock import Mock +import tempfile +from leap.common import ca_bundle +from mock import Mock, MagicMock, patch from twisted.internet.defer import inlineCallbacks from twisted.trial import unittest @@ -50,6 +52,7 @@ from leap.keymanager.tests import ( NICKSERVER_URI = "http://leap.se/" +REMOTE_KEY_URL = "http://site.domain/key" class KeyManagerUtilTestCase(unittest.TestCase): @@ -287,7 +290,6 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase): content = PUBLIC_KEY km._fetcher.get = Mock(return_value=Response()) - km.ca_cert_path = 'cacertpath' yield km.fetch_key(ADDRESS, "http://site.domain/key", OpenPGPKey) key = yield km.get_key(ADDRESS, OpenPGPKey) @@ -304,7 +306,6 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase): content = "" km._fetcher.get = Mock(return_value=Response()) - km.ca_cert_path = 'cacertpath' d = km.fetch_key(ADDRESS, "http://site.domain/key", OpenPGPKey) return self.assertFailure(d, KeyNotFound) @@ -320,10 +321,69 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase): content = PUBLIC_KEY km._fetcher.get = Mock(return_value=Response()) - km.ca_cert_path = 'cacertpath' d = km.fetch_key(ADDRESS_2, "http://site.domain/key", OpenPGPKey) return self.assertFailure(d, KeyAddressMismatch) + def _mock_get_response(self, km, body): + class Response(object): + ok = True + content = body + + mock = MagicMock(return_value=Response()) + km._fetcher.get = mock + + return mock + + @inlineCallbacks + def test_fetch_key_uses_ca_bundle_if_none_specified(self): + ca_cert_path = None + km = self._key_manager(ca_cert_path=ca_cert_path) + get_mock = self._mock_get_response(km, PUBLIC_KEY_OTHER) + + yield km.fetch_key(ADDRESS_OTHER, REMOTE_KEY_URL, OpenPGPKey) + + get_mock.assert_called_once_with(REMOTE_KEY_URL, data=None, verify=ca_bundle.where()) + + @inlineCallbacks + def test_fetch_key_uses_default_ca_bundle_if_also_set_as_ca_cert_path(self): + ca_cert_path = ca_bundle.where() + km = self._key_manager(ca_cert_path=ca_cert_path) + get_mock = self._mock_get_response(km, PUBLIC_KEY_OTHER) + + yield km.fetch_key(ADDRESS_OTHER, REMOTE_KEY_URL, OpenPGPKey) + + get_mock.assert_called_once_with(REMOTE_KEY_URL, data=None, verify=ca_bundle.where()) + + @inlineCallbacks + def test_fetch_uses_combined_ca_bundle_otherwise(self): + with tempfile.NamedTemporaryFile() as tmp_input, tempfile.NamedTemporaryFile() as tmp_output: + ca_content = 'some\ncontent\n' + ca_cert_path = tmp_input.name + self._dump_to_file(ca_cert_path, ca_content) + + with patch('leap.keymanager.tempfile.NamedTemporaryFile') as mock: + mock.return_value = tmp_output + km = self._key_manager(ca_cert_path=ca_cert_path) + get_mock = self._mock_get_response(km, PUBLIC_KEY_OTHER) + + yield km.fetch_key(ADDRESS_OTHER, REMOTE_KEY_URL, OpenPGPKey) + + # assert that combined bundle file is passed to get call + get_mock.assert_called_once_with(REMOTE_KEY_URL, data=None, verify=tmp_output.name) + + # assert that files got appended + expected = self._slurp_file(ca_bundle.where()) + ca_content + self.assertEqual(expected, self._slurp_file(tmp_output.name)) + + def _dump_to_file(self, filename, content): + with open(filename, 'w') as out: + out.write(content) + + def _slurp_file(self, filename): + with open(filename) as f: + content = f.read() + return content + class KeyManagerCryptoTestCase(KeyManagerWithSoledadTestCase): |