summaryrefslogtreecommitdiff
path: root/src/leap/keymanager/tests
diff options
context:
space:
mode:
Diffstat (limited to 'src/leap/keymanager/tests')
-rw-r--r--src/leap/keymanager/tests/__init__.py5
-rw-r--r--src/leap/keymanager/tests/test_keymanager.py68
2 files changed, 67 insertions, 6 deletions
diff --git a/src/leap/keymanager/tests/__init__.py b/src/leap/keymanager/tests/__init__.py
index 7128d20d..6b647a4c 100644
--- a/src/leap/keymanager/tests/__init__.py
+++ b/src/leap/keymanager/tests/__init__.py
@@ -73,11 +73,12 @@ class KeyManagerWithSoledadTestCase(unittest.TestCase, BaseLeapTest):
d = km._wrapper_map[OpenPGPKey].deferred_indexes
d.addCallback(get_and_delete_keys)
d.addCallback(lambda _: self.tearDownEnv())
+ d.addCallback(lambda _: self._soledad.close())
return d
- def _key_manager(self, user=ADDRESS, url='', token=None):
+ def _key_manager(self, user=ADDRESS, url='', token=None, ca_cert_path=None):
return KeyManager(user, url, self._soledad, token=token,
- gpgbinary=self.gpg_binary_path)
+ gpgbinary=self.gpg_binary_path, ca_cert_path=ca_cert_path)
def _find_gpg(self):
gpg_path = distutils.spawn.find_executable('gpg')
diff --git a/src/leap/keymanager/tests/test_keymanager.py b/src/leap/keymanager/tests/test_keymanager.py
index a12cac0e..984b037a 100644
--- a/src/leap/keymanager/tests/test_keymanager.py
+++ b/src/leap/keymanager/tests/test_keymanager.py
@@ -22,7 +22,9 @@ Tests for the Key Manager.
from datetime import datetime
-from mock import Mock
+import tempfile
+from leap.common import ca_bundle
+from mock import Mock, MagicMock, patch
from twisted.internet.defer import inlineCallbacks
from twisted.trial import unittest
@@ -50,6 +52,7 @@ from leap.keymanager.tests import (
NICKSERVER_URI = "http://leap.se/"
+REMOTE_KEY_URL = "http://site.domain/key"
class KeyManagerUtilTestCase(unittest.TestCase):
@@ -287,7 +290,6 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase):
content = PUBLIC_KEY
km._fetcher.get = Mock(return_value=Response())
- km.ca_cert_path = 'cacertpath'
yield km.fetch_key(ADDRESS, "http://site.domain/key", OpenPGPKey)
key = yield km.get_key(ADDRESS, OpenPGPKey)
@@ -304,7 +306,6 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase):
content = ""
km._fetcher.get = Mock(return_value=Response())
- km.ca_cert_path = 'cacertpath'
d = km.fetch_key(ADDRESS, "http://site.domain/key", OpenPGPKey)
return self.assertFailure(d, KeyNotFound)
@@ -320,10 +321,69 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase):
content = PUBLIC_KEY
km._fetcher.get = Mock(return_value=Response())
- km.ca_cert_path = 'cacertpath'
d = km.fetch_key(ADDRESS_2, "http://site.domain/key", OpenPGPKey)
return self.assertFailure(d, KeyAddressMismatch)
+ def _mock_get_response(self, km, body):
+ class Response(object):
+ ok = True
+ content = body
+
+ mock = MagicMock(return_value=Response())
+ km._fetcher.get = mock
+
+ return mock
+
+ @inlineCallbacks
+ def test_fetch_key_uses_ca_bundle_if_none_specified(self):
+ ca_cert_path = None
+ km = self._key_manager(ca_cert_path=ca_cert_path)
+ get_mock = self._mock_get_response(km, PUBLIC_KEY_OTHER)
+
+ yield km.fetch_key(ADDRESS_OTHER, REMOTE_KEY_URL, OpenPGPKey)
+
+ get_mock.assert_called_once_with(REMOTE_KEY_URL, data=None, verify=ca_bundle.where())
+
+ @inlineCallbacks
+ def test_fetch_key_uses_default_ca_bundle_if_also_set_as_ca_cert_path(self):
+ ca_cert_path = ca_bundle.where()
+ km = self._key_manager(ca_cert_path=ca_cert_path)
+ get_mock = self._mock_get_response(km, PUBLIC_KEY_OTHER)
+
+ yield km.fetch_key(ADDRESS_OTHER, REMOTE_KEY_URL, OpenPGPKey)
+
+ get_mock.assert_called_once_with(REMOTE_KEY_URL, data=None, verify=ca_bundle.where())
+
+ @inlineCallbacks
+ def test_fetch_uses_combined_ca_bundle_otherwise(self):
+ with tempfile.NamedTemporaryFile() as tmp_input, tempfile.NamedTemporaryFile() as tmp_output:
+ ca_content = 'some\ncontent\n'
+ ca_cert_path = tmp_input.name
+ self._dump_to_file(ca_cert_path, ca_content)
+
+ with patch('leap.keymanager.tempfile.NamedTemporaryFile') as mock:
+ mock.return_value = tmp_output
+ km = self._key_manager(ca_cert_path=ca_cert_path)
+ get_mock = self._mock_get_response(km, PUBLIC_KEY_OTHER)
+
+ yield km.fetch_key(ADDRESS_OTHER, REMOTE_KEY_URL, OpenPGPKey)
+
+ # assert that combined bundle file is passed to get call
+ get_mock.assert_called_once_with(REMOTE_KEY_URL, data=None, verify=tmp_output.name)
+
+ # assert that files got appended
+ expected = self._slurp_file(ca_bundle.where()) + ca_content
+ self.assertEqual(expected, self._slurp_file(tmp_output.name))
+
+ def _dump_to_file(self, filename, content):
+ with open(filename, 'w') as out:
+ out.write(content)
+
+ def _slurp_file(self, filename):
+ with open(filename) as f:
+ content = f.read()
+ return content
+
class KeyManagerCryptoTestCase(KeyManagerWithSoledadTestCase):