summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorFolker Bernitt <fbernitt@thoughtworks.com>2015-09-18 17:03:14 +0200
committerFolker Bernitt <fbernitt@thoughtworks.com>2015-09-18 17:26:04 +0200
commit9546348c3603f390fdd6d5a119414142e9bd02ea (patch)
treea1b5223033795c9d659f86e96e1c0f51ad536eb5
parent0b9f64faef0ba9c5cf2a9efe485794ef9b999fab (diff)
[feature] Use ca_bundle when fetching keys by url
This is necessary as a fetch by url will talk to remote sites or, for providers with a commercial cert, with a cert that had not been signed with the provider CA. - support lookup of local keys by url for providers with a commercial cert - combine ca_bundle with ca_cert_path if specified - close soledad after each test
-rw-r--r--src/leap/keymanager/__init__.py42
-rw-r--r--src/leap/keymanager/tests/__init__.py5
-rw-r--r--src/leap/keymanager/tests/test_keymanager.py68
3 files changed, 107 insertions, 8 deletions
diff --git a/src/leap/keymanager/__init__.py b/src/leap/keymanager/__init__.py
index cf430043..1220402a 100644
--- a/src/leap/keymanager/__init__.py
+++ b/src/leap/keymanager/__init__.py
@@ -18,7 +18,10 @@
Key Manager is a Nicknym agent for LEAP client.
"""
# let's do a little sanity check to see if we're using the wrong gnupg
+import fileinput
import sys
+import tempfile
+from leap.common import ca_bundle
from ._version import get_versions
try:
@@ -134,12 +137,30 @@ class KeyManager(object):
}
# the following are used to perform https requests
self._fetcher = requests
- self._session = self._fetcher.session()
+ self._combined_ca_bundle = self._create_combined_bundle_file()
#
# utilities
#
+ def _create_combined_bundle_file(self):
+ leap_ca_bundle = ca_bundle.where()
+
+ if self._ca_cert_path == leap_ca_bundle:
+ return self._ca_cert_path # don't merge file with itself
+ elif self._ca_cert_path is None:
+ return leap_ca_bundle
+
+ tmp_file = tempfile.NamedTemporaryFile(delete=True) # file is auto deleted when python process ends
+
+ with open(tmp_file.name, 'w') as fout:
+ fin = fileinput.input(files=(leap_ca_bundle, self._ca_cert_path))
+ for line in fin:
+ fout.write(line)
+ fin.close()
+
+ return tmp_file.name
+
def _key_class_from_type(self, ktype):
"""
Return key class from string representation of key type.
@@ -176,6 +197,23 @@ class KeyManager(object):
# 'Content-type is not JSON.')
return res
+ def _get_with_combined_ca_bundle(self, uri, data=None):
+ """
+ Send a GET request to C{uri} containing C{data}.
+
+ Instead of using the ca_cert provided on construction time, this version also uses
+ the default certificates shipped with leap.common
+
+ :param uri: The URI of the request.
+ :type uri: str
+ :param data: The body of the request.
+ :type data: dict, str or file
+
+ :return: The response to the request.
+ :rtype: requests.Response
+ """
+ return self._fetcher.get(uri, data=data, verify=self._combined_ca_bundle)
+
def _put(self, uri, data=None):
"""
Send a PUT request to C{uri} containing C{data}.
@@ -780,7 +818,7 @@ class KeyManager(object):
self._assert_supported_key_type(ktype)
logger.info("Fetch key for %s from %s" % (address, uri))
- res = self._get(uri)
+ res = self._get_with_combined_ca_bundle(uri)
if not res.ok:
return defer.fail(KeyNotFound(uri))
diff --git a/src/leap/keymanager/tests/__init__.py b/src/leap/keymanager/tests/__init__.py
index 7128d20d..6b647a4c 100644
--- a/src/leap/keymanager/tests/__init__.py
+++ b/src/leap/keymanager/tests/__init__.py
@@ -73,11 +73,12 @@ class KeyManagerWithSoledadTestCase(unittest.TestCase, BaseLeapTest):
d = km._wrapper_map[OpenPGPKey].deferred_indexes
d.addCallback(get_and_delete_keys)
d.addCallback(lambda _: self.tearDownEnv())
+ d.addCallback(lambda _: self._soledad.close())
return d
- def _key_manager(self, user=ADDRESS, url='', token=None):
+ def _key_manager(self, user=ADDRESS, url='', token=None, ca_cert_path=None):
return KeyManager(user, url, self._soledad, token=token,
- gpgbinary=self.gpg_binary_path)
+ gpgbinary=self.gpg_binary_path, ca_cert_path=ca_cert_path)
def _find_gpg(self):
gpg_path = distutils.spawn.find_executable('gpg')
diff --git a/src/leap/keymanager/tests/test_keymanager.py b/src/leap/keymanager/tests/test_keymanager.py
index a12cac0e..984b037a 100644
--- a/src/leap/keymanager/tests/test_keymanager.py
+++ b/src/leap/keymanager/tests/test_keymanager.py
@@ -22,7 +22,9 @@ Tests for the Key Manager.
from datetime import datetime
-from mock import Mock
+import tempfile
+from leap.common import ca_bundle
+from mock import Mock, MagicMock, patch
from twisted.internet.defer import inlineCallbacks
from twisted.trial import unittest
@@ -50,6 +52,7 @@ from leap.keymanager.tests import (
NICKSERVER_URI = "http://leap.se/"
+REMOTE_KEY_URL = "http://site.domain/key"
class KeyManagerUtilTestCase(unittest.TestCase):
@@ -287,7 +290,6 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase):
content = PUBLIC_KEY
km._fetcher.get = Mock(return_value=Response())
- km.ca_cert_path = 'cacertpath'
yield km.fetch_key(ADDRESS, "http://site.domain/key", OpenPGPKey)
key = yield km.get_key(ADDRESS, OpenPGPKey)
@@ -304,7 +306,6 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase):
content = ""
km._fetcher.get = Mock(return_value=Response())
- km.ca_cert_path = 'cacertpath'
d = km.fetch_key(ADDRESS, "http://site.domain/key", OpenPGPKey)
return self.assertFailure(d, KeyNotFound)
@@ -320,10 +321,69 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase):
content = PUBLIC_KEY
km._fetcher.get = Mock(return_value=Response())
- km.ca_cert_path = 'cacertpath'
d = km.fetch_key(ADDRESS_2, "http://site.domain/key", OpenPGPKey)
return self.assertFailure(d, KeyAddressMismatch)
+ def _mock_get_response(self, km, body):
+ class Response(object):
+ ok = True
+ content = body
+
+ mock = MagicMock(return_value=Response())
+ km._fetcher.get = mock
+
+ return mock
+
+ @inlineCallbacks
+ def test_fetch_key_uses_ca_bundle_if_none_specified(self):
+ ca_cert_path = None
+ km = self._key_manager(ca_cert_path=ca_cert_path)
+ get_mock = self._mock_get_response(km, PUBLIC_KEY_OTHER)
+
+ yield km.fetch_key(ADDRESS_OTHER, REMOTE_KEY_URL, OpenPGPKey)
+
+ get_mock.assert_called_once_with(REMOTE_KEY_URL, data=None, verify=ca_bundle.where())
+
+ @inlineCallbacks
+ def test_fetch_key_uses_default_ca_bundle_if_also_set_as_ca_cert_path(self):
+ ca_cert_path = ca_bundle.where()
+ km = self._key_manager(ca_cert_path=ca_cert_path)
+ get_mock = self._mock_get_response(km, PUBLIC_KEY_OTHER)
+
+ yield km.fetch_key(ADDRESS_OTHER, REMOTE_KEY_URL, OpenPGPKey)
+
+ get_mock.assert_called_once_with(REMOTE_KEY_URL, data=None, verify=ca_bundle.where())
+
+ @inlineCallbacks
+ def test_fetch_uses_combined_ca_bundle_otherwise(self):
+ with tempfile.NamedTemporaryFile() as tmp_input, tempfile.NamedTemporaryFile() as tmp_output:
+ ca_content = 'some\ncontent\n'
+ ca_cert_path = tmp_input.name
+ self._dump_to_file(ca_cert_path, ca_content)
+
+ with patch('leap.keymanager.tempfile.NamedTemporaryFile') as mock:
+ mock.return_value = tmp_output
+ km = self._key_manager(ca_cert_path=ca_cert_path)
+ get_mock = self._mock_get_response(km, PUBLIC_KEY_OTHER)
+
+ yield km.fetch_key(ADDRESS_OTHER, REMOTE_KEY_URL, OpenPGPKey)
+
+ # assert that combined bundle file is passed to get call
+ get_mock.assert_called_once_with(REMOTE_KEY_URL, data=None, verify=tmp_output.name)
+
+ # assert that files got appended
+ expected = self._slurp_file(ca_bundle.where()) + ca_content
+ self.assertEqual(expected, self._slurp_file(tmp_output.name))
+
+ def _dump_to_file(self, filename, content):
+ with open(filename, 'w') as out:
+ out.write(content)
+
+ def _slurp_file(self, filename):
+ with open(filename) as f:
+ content = f.read()
+ return content
+
class KeyManagerCryptoTestCase(KeyManagerWithSoledadTestCase):