From e5717e853af7d2f91ac69e66c1b2ee058289e78d Mon Sep 17 00:00:00 2001 From: Zara Gebru Date: Fri, 8 Jul 2016 11:55:55 +0200 Subject: [feature] keymanager: background update keys Port of the original commit: 8f1fe8dd4a54fd2bdda2fc78c339ce9b3d0fc331 by Zara Gebru that introduced updating keys in the background. This was made in the legacy leapcode/keymanager repo, but was lost in the merge to the unified bitmask-dev. Original commit message follows: -------------------------------- - refresh random key in random time - add get key by fingerprint - refactor nicknym methods to own file - tests - note this do not include a check for revoked key, since that need some changes in gnupg - Related: #6089 --- tests/integration/keymanager/test_keymanager.py | 61 ++++++++++++++++++++----- 1 file changed, 49 insertions(+), 12 deletions(-) (limited to 'tests/integration/keymanager/test_keymanager.py') diff --git a/tests/integration/keymanager/test_keymanager.py b/tests/integration/keymanager/test_keymanager.py index 443902a..9b5de83 100644 --- a/tests/integration/keymanager/test_keymanager.py +++ b/tests/integration/keymanager/test_keymanager.py @@ -27,12 +27,16 @@ from os import path from twisted.internet import defer from twisted.trial import unittest from twisted.web._responses import NOT_FOUND +from twisted.web import client import mock from leap.common import ca_bundle from leap.bitmask.keymanager import client from leap.bitmask.keymanager import errors from leap.bitmask.keymanager.keys import ( + +from leap.keymanager import errors +from leap.keymanager.keys import ( OpenPGPKey, is_address, build_key_from_dict, @@ -222,24 +226,42 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase): headers = {'Authorization': [str('Token token=%s' % token)]} headers['Content-Type'] = ['application/x-www-form-urlencoded'] url = '%s/%s/users/%s.json' % ('apiuri', 'apiver', 'myuid') - km._async_client_pinned.request.assert_called_once_with( + km._nicknym._async_client_pinned.request.assert_called_once_with( str(url), 'PUT', body=str(data), headers=headers ) def test_fetch_keys_from_server(self): """ - Test that the request is well formed when fetching keys from server. + Test that the request is well formed when fetching keys from server + with address. """ km = self._key_manager(url=NICKSERVER_URI) expected_url = NICKSERVER_URI + '?address=' + ADDRESS_2 def verify_the_call(_): - used_kwargs = km._async_client_pinned.request.call_args[1] - km._async_client_pinned.request.assert_called_once_with( + used_kwargs = km._nicknym._async_client_pinned.request.call_args[1] + km._nicknym._async_client_pinned.request.assert_called_once_with( expected_url, 'GET', **used_kwargs) - d = self._fetch_key(km, ADDRESS_2, PUBLIC_KEY_2) + d = self._fetch_key_with_address(km, ADDRESS_2, PUBLIC_KEY_2) + d.addCallback(verify_the_call) + return d + + def test_fetch_keys_from_server_with_fingerprint(self): + """ + Test that the request is well formed when fetching keys from server + with fingerprint. + """ + km = self._key_manager(url=NICKSERVER_URI) + expected_url = NICKSERVER_URI + '?fingerprint=' + KEY_FINGERPRINT + + def verify_the_call(_): + used_kwargs = km._nicknym._async_client_pinned.request.call_args[1] + km._nicknym._async_client_pinned.request.assert_called_once_with( + expected_url, 'GET', **used_kwargs) + + d = self._fetch_key_with_fingerprint(km, KEY_FINGERPRINT, PUBLIC_KEY) d.addCallback(verify_the_call) return d @@ -254,16 +276,16 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase): return_value=defer.succeed(None)) url = NICKSERVER_URI + '?address=' + INVALID_MAIL_ADDRESS - d = km._fetch_and_handle_404_from_nicknym(url, INVALID_MAIL_ADDRESS) + d = km._nicknym._fetch_and_handle_404_from_nicknym(url) def check_key_not_found_is_raised_if_404(_): - used_kwargs = km._async_client_pinned.request.call_args[1] + used_kwargs = km._nicknym._async_client_pinned.request.call_args[1] check_404_callback = used_kwargs['callback'] fake_response = mock.Mock() fake_response.code = NOT_FOUND with self.assertRaisesRegexp( errors.KeyNotFound, - '404: %s key not found.' % INVALID_MAIL_ADDRESS): + '404: Key not found. Request: %s' % url.replace('?', '\?')): check_404_callback(fake_response) d.addCallback(check_key_not_found_is_raised_if_404) @@ -282,7 +304,7 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase): def assert_key_not_found_raised(error): self.assertEqual(error.value, key_not_found_exception) - d = km._get_key_from_nicknym(INVALID_MAIL_ADDRESS) + d = km._nicknym.fetch_key_with_address(INVALID_MAIL_ADDRESS) d.addErrback(assert_key_not_found_raised) @defer.inlineCallbacks @@ -292,7 +314,7 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase): """ km = self._key_manager(url=NICKSERVER_URI) - key = yield self._fetch_key(km, ADDRESS, PUBLIC_KEY) + key = yield self._fetch_key_with_address(km, ADDRESS, PUBLIC_KEY) self.assertIsInstance(key, OpenPGPKey) self.assertTrue(ADDRESS in key.uids) self.assertEqual(key.validation, ValidationLevels.Provider_Trust) @@ -304,12 +326,12 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase): """ km = self._key_manager(url=NICKSERVER_URI) - key = yield self._fetch_key(km, ADDRESS_OTHER, PUBLIC_KEY_OTHER) + key = yield self._fetch_key_with_address(km, ADDRESS_OTHER, PUBLIC_KEY_OTHER) self.assertIsInstance(key, OpenPGPKey) self.assertTrue(ADDRESS_OTHER in key.uids) self.assertEqual(key.validation, ValidationLevels.Weak_Chain) - def _fetch_key(self, km, address, key): + def _fetch_key_with_address(self, km, address, key): """ :returns: a Deferred that will fire with the OpenPGPKey """ @@ -328,6 +350,21 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase): d.addCallback(lambda _: km.get_key(address)) return d + def _fetch_key_with_fingerprint(self, km, fingerprint, key): + """ + :returns: a Deferred that will fire with the OpenPGPKey + """ + data = json.dumps({'fingerprint': fingerprint, 'openpgp': key}) + + client.readBody = Mock(return_value=defer.succeed(data)) + + # mock the fetcher so it returns the key for KEY_FINGERPRINT + km._nicknym._async_client_pinned.request = Mock( + return_value=defer.succeed(None)) + km.ca_cert_path = 'cacertpath' + key = km._nicknym.fetch_key_with_fingerprint(fingerprint) + return key + @defer.inlineCallbacks def test_put_key_ascii(self): """ -- cgit v1.2.3