summaryrefslogtreecommitdiff
path: root/tests/test_keymanager.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/test_keymanager.py')
-rw-r--r--tests/test_keymanager.py67
1 files changed, 50 insertions, 17 deletions
diff --git a/tests/test_keymanager.py b/tests/test_keymanager.py
index b4ab805..118cd8c 100644
--- a/tests/test_keymanager.py
+++ b/tests/test_keymanager.py
@@ -32,7 +32,7 @@ from twisted.internet import defer
from twisted.trial import unittest
from twisted.web._responses import NOT_FOUND
-from leap.keymanager import client
+from twisted.web import client
from leap.keymanager import errors
from leap.keymanager.keys import (
@@ -208,7 +208,7 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase):
token = "mytoken"
km = self._key_manager(token=token)
yield km._openpgp.put_raw_key(PUBLIC_KEY, ADDRESS)
- km._async_client_pinned.request = Mock(return_value=defer.succeed(''))
+ km._nicknym._async_client_pinned.request = Mock(return_value=defer.succeed(''))
# the following data will be used on the send
km.ca_cert_path = 'capath'
km.session_id = 'sessionid'
@@ -224,24 +224,42 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase):
headers = {'Authorization': [str('Token token=%s' % token)]}
headers['Content-Type'] = ['application/x-www-form-urlencoded']
url = '%s/%s/users/%s.json' % ('apiuri', 'apiver', 'myuid')
- km._async_client_pinned.request.assert_called_once_with(
+ km._nicknym._async_client_pinned.request.assert_called_once_with(
str(url), 'PUT', body=str(data),
headers=headers
)
def test_fetch_keys_from_server(self):
"""
- Test that the request is well formed when fetching keys from server.
+ Test that the request is well formed when fetching keys from server
+ with address.
"""
km = self._key_manager(url=NICKSERVER_URI)
expected_url = NICKSERVER_URI + '?address=' + ADDRESS_2
def verify_the_call(_):
- used_kwargs = km._async_client_pinned.request.call_args[1]
- km._async_client_pinned.request.assert_called_once_with(
+ used_kwargs = km._nicknym._async_client_pinned.request.call_args[1]
+ km._nicknym._async_client_pinned.request.assert_called_once_with(
expected_url, 'GET', **used_kwargs)
- d = self._fetch_key(km, ADDRESS_2, PUBLIC_KEY_2)
+ d = self._fetch_key_with_address(km, ADDRESS_2, PUBLIC_KEY_2)
+ d.addCallback(verify_the_call)
+ return d
+
+ def test_fetch_keys_from_server_with_fingerprint(self):
+ """
+ Test that the request is well formed when fetching keys from server
+ with fingerprint.
+ """
+ km = self._key_manager(url=NICKSERVER_URI)
+ expected_url = NICKSERVER_URI + '?fingerprint=' + KEY_FINGERPRINT
+
+ def verify_the_call(_):
+ used_kwargs = km._nicknym._async_client_pinned.request.call_args[1]
+ km._nicknym._async_client_pinned.request.assert_called_once_with(
+ expected_url, 'GET', **used_kwargs)
+
+ d = self._fetch_key_with_fingerprint(km, KEY_FINGERPRINT, PUBLIC_KEY)
d.addCallback(verify_the_call)
return d
@@ -252,20 +270,20 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase):
"""
km = self._key_manager(url=NICKSERVER_URI)
client.readBody = Mock(return_value=defer.succeed(None))
- km._async_client_pinned.request = Mock(
+ km._nicknym._async_client_pinned.request = Mock(
return_value=defer.succeed(None))
url = NICKSERVER_URI + '?address=' + INVALID_MAIL_ADDRESS
- d = km._fetch_and_handle_404_from_nicknym(url, INVALID_MAIL_ADDRESS)
+ d = km._nicknym._fetch_and_handle_404_from_nicknym(url)
def check_key_not_found_is_raised_if_404(_):
- used_kwargs = km._async_client_pinned.request.call_args[1]
+ used_kwargs = km._nicknym._async_client_pinned.request.call_args[1]
check_404_callback = used_kwargs['callback']
fake_response = Mock()
fake_response.code = NOT_FOUND
with self.assertRaisesRegexp(
errors.KeyNotFound,
- '404: %s key not found.' % INVALID_MAIL_ADDRESS):
+ '404: Key not found. Request: %s' % url.replace('?', '\?')):
check_404_callback(fake_response)
d.addCallback(check_key_not_found_is_raised_if_404)
@@ -278,13 +296,13 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase):
"""
km = self._key_manager(url=NICKSERVER_URI)
key_not_found_exception = errors.KeyNotFound('some message')
- km._async_client_pinned.request = Mock(
+ km._nicknym._async_client_pinned.request = Mock(
side_effect=key_not_found_exception)
def assert_key_not_found_raised(error):
self.assertEqual(error.value, key_not_found_exception)
- d = km._get_key_from_nicknym(INVALID_MAIL_ADDRESS)
+ d = km._nicknym.fetch_key_with_address(INVALID_MAIL_ADDRESS)
d.addErrback(assert_key_not_found_raised)
@defer.inlineCallbacks
@@ -294,7 +312,7 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase):
"""
km = self._key_manager(url=NICKSERVER_URI)
- key = yield self._fetch_key(km, ADDRESS, PUBLIC_KEY)
+ key = yield self._fetch_key_with_address(km, ADDRESS, PUBLIC_KEY)
self.assertIsInstance(key, OpenPGPKey)
self.assertTrue(ADDRESS in key.uids)
self.assertEqual(key.validation, ValidationLevels.Provider_Trust)
@@ -306,12 +324,12 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase):
"""
km = self._key_manager(url=NICKSERVER_URI)
- key = yield self._fetch_key(km, ADDRESS_OTHER, PUBLIC_KEY_OTHER)
+ key = yield self._fetch_key_with_address(km, ADDRESS_OTHER, PUBLIC_KEY_OTHER)
self.assertIsInstance(key, OpenPGPKey)
self.assertTrue(ADDRESS_OTHER in key.uids)
self.assertEqual(key.validation, ValidationLevels.Weak_Chain)
- def _fetch_key(self, km, address, key):
+ def _fetch_key_with_address(self, km, address, key):
"""
:returns: a Deferred that will fire with the OpenPGPKey
"""
@@ -320,7 +338,7 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase):
client.readBody = Mock(return_value=defer.succeed(data))
# mock the fetcher so it returns the key for ADDRESS_2
- km._async_client_pinned.request = Mock(
+ km._nicknym._async_client_pinned.request = Mock(
return_value=defer.succeed(None))
km.ca_cert_path = 'cacertpath'
# try to key get without fetching from server
@@ -330,6 +348,21 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase):
d.addCallback(lambda _: km.get_key(address))
return d
+ def _fetch_key_with_fingerprint(self, km, fingerprint, key):
+ """
+ :returns: a Deferred that will fire with the OpenPGPKey
+ """
+ data = json.dumps({'fingerprint': fingerprint, 'openpgp': key})
+
+ client.readBody = Mock(return_value=defer.succeed(data))
+
+ # mock the fetcher so it returns the key for KEY_FINGERPRINT
+ km._nicknym._async_client_pinned.request = Mock(
+ return_value=defer.succeed(None))
+ km.ca_cert_path = 'cacertpath'
+ key = km._nicknym.fetch_key_with_fingerprint(fingerprint)
+ return key
+
@defer.inlineCallbacks
def test_put_key_ascii(self):
"""