[feature] Use ca_bundle when fetching keys by url
authorFolker Bernitt <fbernitt@thoughtworks.com>
Fri, 18 Sep 2015 15:03:14 +0000 (17:03 +0200)
committerFolker Bernitt <fbernitt@thoughtworks.com>
Fri, 18 Sep 2015 15:26:04 +0000 (17:26 +0200)
This is necessary as a fetch by url will talk to remote
sites or, for providers with a commercial cert, with
a cert that had not been signed with the provider CA.

- support lookup of local keys by url for providers
  with a commercial cert
- combine ca_bundle with ca_cert_path if specified
- close soledad after each test

src/leap/keymanager/__init__.py
src/leap/keymanager/tests/__init__.py
src/leap/keymanager/tests/test_keymanager.py

index cf43004..1220402 100644 (file)
 Key Manager is a Nicknym agent for LEAP client.
 """
 # let's do a little sanity check to see if we're using the wrong gnupg
+import fileinput
 import sys
+import tempfile
+from leap.common import ca_bundle
 from ._version import get_versions
 
 try:
@@ -134,12 +137,30 @@ class KeyManager(object):
         }
         # the following are used to perform https requests
         self._fetcher = requests
-        self._session = self._fetcher.session()
+        self._combined_ca_bundle = self._create_combined_bundle_file()
 
     #
     # utilities
     #
 
+    def _create_combined_bundle_file(self):
+        leap_ca_bundle = ca_bundle.where()
+
+        if self._ca_cert_path == leap_ca_bundle:
+            return self._ca_cert_path   # don't merge file with itself
+        elif self._ca_cert_path is None:
+            return leap_ca_bundle
+
+        tmp_file = tempfile.NamedTemporaryFile(delete=True)  # file is auto deleted when python process ends
+
+        with open(tmp_file.name, 'w') as fout:
+            fin = fileinput.input(files=(leap_ca_bundle, self._ca_cert_path))
+            for line in fin:
+                fout.write(line)
+            fin.close()
+
+        return tmp_file.name
+
     def _key_class_from_type(self, ktype):
         """
         Return key class from string representation of key type.
@@ -176,6 +197,23 @@ class KeyManager(object):
         #     'Content-type is not JSON.')
         return res
 
+    def _get_with_combined_ca_bundle(self, uri, data=None):
+        """
+        Send a GET request to C{uri} containing C{data}.
+
+        Instead of using the ca_cert provided on construction time, this version also uses
+        the default certificates shipped with leap.common
+
+        :param uri: The URI of the request.
+        :type uri: str
+        :param data: The body of the request.
+        :type data: dict, str or file
+
+        :return: The response to the request.
+        :rtype: requests.Response
+        """
+        return self._fetcher.get(uri, data=data, verify=self._combined_ca_bundle)
+
     def _put(self, uri, data=None):
         """
         Send a PUT request to C{uri} containing C{data}.
@@ -780,7 +818,7 @@ class KeyManager(object):
         self._assert_supported_key_type(ktype)
 
         logger.info("Fetch key for %s from %s" % (address, uri))
-        res = self._get(uri)
+        res = self._get_with_combined_ca_bundle(uri)
         if not res.ok:
             return defer.fail(KeyNotFound(uri))
 
index 7128d20..6b647a4 100644 (file)
@@ -73,11 +73,12 @@ class KeyManagerWithSoledadTestCase(unittest.TestCase, BaseLeapTest):
         d = km._wrapper_map[OpenPGPKey].deferred_indexes
         d.addCallback(get_and_delete_keys)
         d.addCallback(lambda _: self.tearDownEnv())
+        d.addCallback(lambda _: self._soledad.close())
         return d
 
-    def _key_manager(self, user=ADDRESS, url='', token=None):
+    def _key_manager(self, user=ADDRESS, url='', token=None, ca_cert_path=None):
         return KeyManager(user, url, self._soledad, token=token,
-                          gpgbinary=self.gpg_binary_path)
+                          gpgbinary=self.gpg_binary_path, ca_cert_path=ca_cert_path)
 
     def _find_gpg(self):
         gpg_path = distutils.spawn.find_executable('gpg')
index a12cac0..984b037 100644 (file)
@@ -22,7 +22,9 @@ Tests for the Key Manager.
 
 
 from datetime import datetime
-from mock import Mock
+import tempfile
+from leap.common import ca_bundle
+from mock import Mock, MagicMock, patch
 from twisted.internet.defer import inlineCallbacks
 from twisted.trial import unittest
 
@@ -50,6 +52,7 @@ from leap.keymanager.tests import (
 
 
 NICKSERVER_URI = "http://leap.se/"
+REMOTE_KEY_URL = "http://site.domain/key"
 
 
 class KeyManagerUtilTestCase(unittest.TestCase):
@@ -287,7 +290,6 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase):
             content = PUBLIC_KEY
 
         km._fetcher.get = Mock(return_value=Response())
-        km.ca_cert_path = 'cacertpath'
 
         yield km.fetch_key(ADDRESS, "http://site.domain/key", OpenPGPKey)
         key = yield km.get_key(ADDRESS, OpenPGPKey)
@@ -304,7 +306,6 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase):
             content = ""
 
         km._fetcher.get = Mock(return_value=Response())
-        km.ca_cert_path = 'cacertpath'
         d = km.fetch_key(ADDRESS, "http://site.domain/key", OpenPGPKey)
         return self.assertFailure(d, KeyNotFound)
 
@@ -320,10 +321,69 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase):
             content = PUBLIC_KEY
 
         km._fetcher.get = Mock(return_value=Response())
-        km.ca_cert_path = 'cacertpath'
         d = km.fetch_key(ADDRESS_2, "http://site.domain/key", OpenPGPKey)
         return self.assertFailure(d, KeyAddressMismatch)
 
+    def _mock_get_response(self, km, body):
+        class Response(object):
+            ok = True
+            content = body
+
+        mock = MagicMock(return_value=Response())
+        km._fetcher.get = mock
+
+        return mock
+
+    @inlineCallbacks
+    def test_fetch_key_uses_ca_bundle_if_none_specified(self):
+        ca_cert_path = None
+        km = self._key_manager(ca_cert_path=ca_cert_path)
+        get_mock = self._mock_get_response(km, PUBLIC_KEY_OTHER)
+
+        yield km.fetch_key(ADDRESS_OTHER, REMOTE_KEY_URL, OpenPGPKey)
+
+        get_mock.assert_called_once_with(REMOTE_KEY_URL, data=None, verify=ca_bundle.where())
+
+    @inlineCallbacks
+    def test_fetch_key_uses_default_ca_bundle_if_also_set_as_ca_cert_path(self):
+        ca_cert_path = ca_bundle.where()
+        km = self._key_manager(ca_cert_path=ca_cert_path)
+        get_mock = self._mock_get_response(km, PUBLIC_KEY_OTHER)
+
+        yield km.fetch_key(ADDRESS_OTHER, REMOTE_KEY_URL, OpenPGPKey)
+
+        get_mock.assert_called_once_with(REMOTE_KEY_URL, data=None, verify=ca_bundle.where())
+
+    @inlineCallbacks
+    def test_fetch_uses_combined_ca_bundle_otherwise(self):
+        with tempfile.NamedTemporaryFile() as tmp_input, tempfile.NamedTemporaryFile() as tmp_output:
+            ca_content = 'some\ncontent\n'
+            ca_cert_path = tmp_input.name
+            self._dump_to_file(ca_cert_path, ca_content)
+
+            with patch('leap.keymanager.tempfile.NamedTemporaryFile') as mock:
+                mock.return_value = tmp_output
+                km = self._key_manager(ca_cert_path=ca_cert_path)
+                get_mock = self._mock_get_response(km, PUBLIC_KEY_OTHER)
+
+                yield km.fetch_key(ADDRESS_OTHER, REMOTE_KEY_URL, OpenPGPKey)
+
+                # assert that combined bundle file is passed to get call
+                get_mock.assert_called_once_with(REMOTE_KEY_URL, data=None, verify=tmp_output.name)
+
+                # assert that files got appended
+                expected = self._slurp_file(ca_bundle.where()) + ca_content
+                self.assertEqual(expected, self._slurp_file(tmp_output.name))
+
+    def _dump_to_file(self, filename, content):
+            with open(filename, 'w') as out:
+                out.write(content)
+
+    def _slurp_file(self, filename):
+        with open(filename) as f:
+            content = f.read()
+        return content
+
 
 class KeyManagerCryptoTestCase(KeyManagerWithSoledadTestCase):