diff options
Diffstat (limited to 'keymanager/src/leap')
| -rw-r--r-- | keymanager/src/leap/keymanager/__init__.py | 42 | ||||
| -rw-r--r-- | keymanager/src/leap/keymanager/tests/__init__.py | 5 | ||||
| -rw-r--r-- | keymanager/src/leap/keymanager/tests/test_keymanager.py | 68 | 
3 files changed, 107 insertions, 8 deletions
diff --git a/keymanager/src/leap/keymanager/__init__.py b/keymanager/src/leap/keymanager/__init__.py index cf43004..1220402 100644 --- a/keymanager/src/leap/keymanager/__init__.py +++ b/keymanager/src/leap/keymanager/__init__.py @@ -18,7 +18,10 @@  Key Manager is a Nicknym agent for LEAP client.  """  # let's do a little sanity check to see if we're using the wrong gnupg +import fileinput  import sys +import tempfile +from leap.common import ca_bundle  from ._version import get_versions  try: @@ -134,12 +137,30 @@ class KeyManager(object):          }          # the following are used to perform https requests          self._fetcher = requests -        self._session = self._fetcher.session() +        self._combined_ca_bundle = self._create_combined_bundle_file()      #      # utilities      # +    def _create_combined_bundle_file(self): +        leap_ca_bundle = ca_bundle.where() + +        if self._ca_cert_path == leap_ca_bundle: +            return self._ca_cert_path   # don't merge file with itself +        elif self._ca_cert_path is None: +            return leap_ca_bundle + +        tmp_file = tempfile.NamedTemporaryFile(delete=True)  # file is auto deleted when python process ends + +        with open(tmp_file.name, 'w') as fout: +            fin = fileinput.input(files=(leap_ca_bundle, self._ca_cert_path)) +            for line in fin: +                fout.write(line) +            fin.close() + +        return tmp_file.name +      def _key_class_from_type(self, ktype):          """          Return key class from string representation of key type. @@ -176,6 +197,23 @@ class KeyManager(object):          #     'Content-type is not JSON.')          return res +    def _get_with_combined_ca_bundle(self, uri, data=None): +        """ +        Send a GET request to C{uri} containing C{data}. + +        Instead of using the ca_cert provided on construction time, this version also uses +        the default certificates shipped with leap.common + +        :param uri: The URI of the request. +        :type uri: str +        :param data: The body of the request. +        :type data: dict, str or file + +        :return: The response to the request. +        :rtype: requests.Response +        """ +        return self._fetcher.get(uri, data=data, verify=self._combined_ca_bundle) +      def _put(self, uri, data=None):          """          Send a PUT request to C{uri} containing C{data}. @@ -780,7 +818,7 @@ class KeyManager(object):          self._assert_supported_key_type(ktype)          logger.info("Fetch key for %s from %s" % (address, uri)) -        res = self._get(uri) +        res = self._get_with_combined_ca_bundle(uri)          if not res.ok:              return defer.fail(KeyNotFound(uri)) diff --git a/keymanager/src/leap/keymanager/tests/__init__.py b/keymanager/src/leap/keymanager/tests/__init__.py index 7128d20..6b647a4 100644 --- a/keymanager/src/leap/keymanager/tests/__init__.py +++ b/keymanager/src/leap/keymanager/tests/__init__.py @@ -73,11 +73,12 @@ class KeyManagerWithSoledadTestCase(unittest.TestCase, BaseLeapTest):          d = km._wrapper_map[OpenPGPKey].deferred_indexes          d.addCallback(get_and_delete_keys)          d.addCallback(lambda _: self.tearDownEnv()) +        d.addCallback(lambda _: self._soledad.close())          return d -    def _key_manager(self, user=ADDRESS, url='', token=None): +    def _key_manager(self, user=ADDRESS, url='', token=None, ca_cert_path=None):          return KeyManager(user, url, self._soledad, token=token, -                          gpgbinary=self.gpg_binary_path) +                          gpgbinary=self.gpg_binary_path, ca_cert_path=ca_cert_path)      def _find_gpg(self):          gpg_path = distutils.spawn.find_executable('gpg') diff --git a/keymanager/src/leap/keymanager/tests/test_keymanager.py b/keymanager/src/leap/keymanager/tests/test_keymanager.py index a12cac0..984b037 100644 --- a/keymanager/src/leap/keymanager/tests/test_keymanager.py +++ b/keymanager/src/leap/keymanager/tests/test_keymanager.py @@ -22,7 +22,9 @@ Tests for the Key Manager.  from datetime import datetime -from mock import Mock +import tempfile +from leap.common import ca_bundle +from mock import Mock, MagicMock, patch  from twisted.internet.defer import inlineCallbacks  from twisted.trial import unittest @@ -50,6 +52,7 @@ from leap.keymanager.tests import (  NICKSERVER_URI = "http://leap.se/" +REMOTE_KEY_URL = "http://site.domain/key"  class KeyManagerUtilTestCase(unittest.TestCase): @@ -287,7 +290,6 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase):              content = PUBLIC_KEY          km._fetcher.get = Mock(return_value=Response()) -        km.ca_cert_path = 'cacertpath'          yield km.fetch_key(ADDRESS, "http://site.domain/key", OpenPGPKey)          key = yield km.get_key(ADDRESS, OpenPGPKey) @@ -304,7 +306,6 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase):              content = ""          km._fetcher.get = Mock(return_value=Response()) -        km.ca_cert_path = 'cacertpath'          d = km.fetch_key(ADDRESS, "http://site.domain/key", OpenPGPKey)          return self.assertFailure(d, KeyNotFound) @@ -320,10 +321,69 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase):              content = PUBLIC_KEY          km._fetcher.get = Mock(return_value=Response()) -        km.ca_cert_path = 'cacertpath'          d = km.fetch_key(ADDRESS_2, "http://site.domain/key", OpenPGPKey)          return self.assertFailure(d, KeyAddressMismatch) +    def _mock_get_response(self, km, body): +        class Response(object): +            ok = True +            content = body + +        mock = MagicMock(return_value=Response()) +        km._fetcher.get = mock + +        return mock + +    @inlineCallbacks +    def test_fetch_key_uses_ca_bundle_if_none_specified(self): +        ca_cert_path = None +        km = self._key_manager(ca_cert_path=ca_cert_path) +        get_mock = self._mock_get_response(km, PUBLIC_KEY_OTHER) + +        yield km.fetch_key(ADDRESS_OTHER, REMOTE_KEY_URL, OpenPGPKey) + +        get_mock.assert_called_once_with(REMOTE_KEY_URL, data=None, verify=ca_bundle.where()) + +    @inlineCallbacks +    def test_fetch_key_uses_default_ca_bundle_if_also_set_as_ca_cert_path(self): +        ca_cert_path = ca_bundle.where() +        km = self._key_manager(ca_cert_path=ca_cert_path) +        get_mock = self._mock_get_response(km, PUBLIC_KEY_OTHER) + +        yield km.fetch_key(ADDRESS_OTHER, REMOTE_KEY_URL, OpenPGPKey) + +        get_mock.assert_called_once_with(REMOTE_KEY_URL, data=None, verify=ca_bundle.where()) + +    @inlineCallbacks +    def test_fetch_uses_combined_ca_bundle_otherwise(self): +        with tempfile.NamedTemporaryFile() as tmp_input, tempfile.NamedTemporaryFile() as tmp_output: +            ca_content = 'some\ncontent\n' +            ca_cert_path = tmp_input.name +            self._dump_to_file(ca_cert_path, ca_content) + +            with patch('leap.keymanager.tempfile.NamedTemporaryFile') as mock: +                mock.return_value = tmp_output +                km = self._key_manager(ca_cert_path=ca_cert_path) +                get_mock = self._mock_get_response(km, PUBLIC_KEY_OTHER) + +                yield km.fetch_key(ADDRESS_OTHER, REMOTE_KEY_URL, OpenPGPKey) + +                # assert that combined bundle file is passed to get call +                get_mock.assert_called_once_with(REMOTE_KEY_URL, data=None, verify=tmp_output.name) + +                # assert that files got appended +                expected = self._slurp_file(ca_bundle.where()) + ca_content +                self.assertEqual(expected, self._slurp_file(tmp_output.name)) + +    def _dump_to_file(self, filename, content): +            with open(filename, 'w') as out: +                out.write(content) + +    def _slurp_file(self, filename): +        with open(filename) as f: +            content = f.read() +        return content +  class KeyManagerCryptoTestCase(KeyManagerWithSoledadTestCase):  | 
