From 9546348c3603f390fdd6d5a119414142e9bd02ea Mon Sep 17 00:00:00 2001 From: Folker Bernitt Date: Fri, 18 Sep 2015 17:03:14 +0200 Subject: [feature] Use ca_bundle when fetching keys by url This is necessary as a fetch by url will talk to remote sites or, for providers with a commercial cert, with a cert that had not been signed with the provider CA. - support lookup of local keys by url for providers with a commercial cert - combine ca_bundle with ca_cert_path if specified - close soledad after each test --- src/leap/keymanager/__init__.py | 42 ++++++++++++++++- src/leap/keymanager/tests/__init__.py | 5 +- src/leap/keymanager/tests/test_keymanager.py | 68 ++++++++++++++++++++++++++-- 3 files changed, 107 insertions(+), 8 deletions(-) (limited to 'src/leap/keymanager') diff --git a/src/leap/keymanager/__init__.py b/src/leap/keymanager/__init__.py index cf430043..1220402a 100644 --- a/src/leap/keymanager/__init__.py +++ b/src/leap/keymanager/__init__.py @@ -18,7 +18,10 @@ Key Manager is a Nicknym agent for LEAP client. """ # let's do a little sanity check to see if we're using the wrong gnupg +import fileinput import sys +import tempfile +from leap.common import ca_bundle from ._version import get_versions try: @@ -134,12 +137,30 @@ class KeyManager(object): } # the following are used to perform https requests self._fetcher = requests - self._session = self._fetcher.session() + self._combined_ca_bundle = self._create_combined_bundle_file() # # utilities # + def _create_combined_bundle_file(self): + leap_ca_bundle = ca_bundle.where() + + if self._ca_cert_path == leap_ca_bundle: + return self._ca_cert_path # don't merge file with itself + elif self._ca_cert_path is None: + return leap_ca_bundle + + tmp_file = tempfile.NamedTemporaryFile(delete=True) # file is auto deleted when python process ends + + with open(tmp_file.name, 'w') as fout: + fin = fileinput.input(files=(leap_ca_bundle, self._ca_cert_path)) + for line in fin: + fout.write(line) + fin.close() + + return tmp_file.name + def _key_class_from_type(self, ktype): """ Return key class from string representation of key type. @@ -176,6 +197,23 @@ class KeyManager(object): # 'Content-type is not JSON.') return res + def _get_with_combined_ca_bundle(self, uri, data=None): + """ + Send a GET request to C{uri} containing C{data}. + + Instead of using the ca_cert provided on construction time, this version also uses + the default certificates shipped with leap.common + + :param uri: The URI of the request. + :type uri: str + :param data: The body of the request. + :type data: dict, str or file + + :return: The response to the request. + :rtype: requests.Response + """ + return self._fetcher.get(uri, data=data, verify=self._combined_ca_bundle) + def _put(self, uri, data=None): """ Send a PUT request to C{uri} containing C{data}. @@ -780,7 +818,7 @@ class KeyManager(object): self._assert_supported_key_type(ktype) logger.info("Fetch key for %s from %s" % (address, uri)) - res = self._get(uri) + res = self._get_with_combined_ca_bundle(uri) if not res.ok: return defer.fail(KeyNotFound(uri)) diff --git a/src/leap/keymanager/tests/__init__.py b/src/leap/keymanager/tests/__init__.py index 7128d20d..6b647a4c 100644 --- a/src/leap/keymanager/tests/__init__.py +++ b/src/leap/keymanager/tests/__init__.py @@ -73,11 +73,12 @@ class KeyManagerWithSoledadTestCase(unittest.TestCase, BaseLeapTest): d = km._wrapper_map[OpenPGPKey].deferred_indexes d.addCallback(get_and_delete_keys) d.addCallback(lambda _: self.tearDownEnv()) + d.addCallback(lambda _: self._soledad.close()) return d - def _key_manager(self, user=ADDRESS, url='', token=None): + def _key_manager(self, user=ADDRESS, url='', token=None, ca_cert_path=None): return KeyManager(user, url, self._soledad, token=token, - gpgbinary=self.gpg_binary_path) + gpgbinary=self.gpg_binary_path, ca_cert_path=ca_cert_path) def _find_gpg(self): gpg_path = distutils.spawn.find_executable('gpg') diff --git a/src/leap/keymanager/tests/test_keymanager.py b/src/leap/keymanager/tests/test_keymanager.py index a12cac0e..984b037a 100644 --- a/src/leap/keymanager/tests/test_keymanager.py +++ b/src/leap/keymanager/tests/test_keymanager.py @@ -22,7 +22,9 @@ Tests for the Key Manager. from datetime import datetime -from mock import Mock +import tempfile +from leap.common import ca_bundle +from mock import Mock, MagicMock, patch from twisted.internet.defer import inlineCallbacks from twisted.trial import unittest @@ -50,6 +52,7 @@ from leap.keymanager.tests import ( NICKSERVER_URI = "http://leap.se/" +REMOTE_KEY_URL = "http://site.domain/key" class KeyManagerUtilTestCase(unittest.TestCase): @@ -287,7 +290,6 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase): content = PUBLIC_KEY km._fetcher.get = Mock(return_value=Response()) - km.ca_cert_path = 'cacertpath' yield km.fetch_key(ADDRESS, "http://site.domain/key", OpenPGPKey) key = yield km.get_key(ADDRESS, OpenPGPKey) @@ -304,7 +306,6 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase): content = "" km._fetcher.get = Mock(return_value=Response()) - km.ca_cert_path = 'cacertpath' d = km.fetch_key(ADDRESS, "http://site.domain/key", OpenPGPKey) return self.assertFailure(d, KeyNotFound) @@ -320,10 +321,69 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase): content = PUBLIC_KEY km._fetcher.get = Mock(return_value=Response()) - km.ca_cert_path = 'cacertpath' d = km.fetch_key(ADDRESS_2, "http://site.domain/key", OpenPGPKey) return self.assertFailure(d, KeyAddressMismatch) + def _mock_get_response(self, km, body): + class Response(object): + ok = True + content = body + + mock = MagicMock(return_value=Response()) + km._fetcher.get = mock + + return mock + + @inlineCallbacks + def test_fetch_key_uses_ca_bundle_if_none_specified(self): + ca_cert_path = None + km = self._key_manager(ca_cert_path=ca_cert_path) + get_mock = self._mock_get_response(km, PUBLIC_KEY_OTHER) + + yield km.fetch_key(ADDRESS_OTHER, REMOTE_KEY_URL, OpenPGPKey) + + get_mock.assert_called_once_with(REMOTE_KEY_URL, data=None, verify=ca_bundle.where()) + + @inlineCallbacks + def test_fetch_key_uses_default_ca_bundle_if_also_set_as_ca_cert_path(self): + ca_cert_path = ca_bundle.where() + km = self._key_manager(ca_cert_path=ca_cert_path) + get_mock = self._mock_get_response(km, PUBLIC_KEY_OTHER) + + yield km.fetch_key(ADDRESS_OTHER, REMOTE_KEY_URL, OpenPGPKey) + + get_mock.assert_called_once_with(REMOTE_KEY_URL, data=None, verify=ca_bundle.where()) + + @inlineCallbacks + def test_fetch_uses_combined_ca_bundle_otherwise(self): + with tempfile.NamedTemporaryFile() as tmp_input, tempfile.NamedTemporaryFile() as tmp_output: + ca_content = 'some\ncontent\n' + ca_cert_path = tmp_input.name + self._dump_to_file(ca_cert_path, ca_content) + + with patch('leap.keymanager.tempfile.NamedTemporaryFile') as mock: + mock.return_value = tmp_output + km = self._key_manager(ca_cert_path=ca_cert_path) + get_mock = self._mock_get_response(km, PUBLIC_KEY_OTHER) + + yield km.fetch_key(ADDRESS_OTHER, REMOTE_KEY_URL, OpenPGPKey) + + # assert that combined bundle file is passed to get call + get_mock.assert_called_once_with(REMOTE_KEY_URL, data=None, verify=tmp_output.name) + + # assert that files got appended + expected = self._slurp_file(ca_bundle.where()) + ca_content + self.assertEqual(expected, self._slurp_file(tmp_output.name)) + + def _dump_to_file(self, filename, content): + with open(filename, 'w') as out: + out.write(content) + + def _slurp_file(self, filename): + with open(filename) as f: + content = f.read() + return content + class KeyManagerCryptoTestCase(KeyManagerWithSoledadTestCase): -- cgit v1.2.3