From 8f1fe8dd4a54fd2bdda2fc78c339ce9b3d0fc331 Mon Sep 17 00:00:00 2001 From: Zara Gebru Date: Fri, 8 Jul 2016 11:55:55 +0200 Subject: [feature] keymanager - background update keys - refresh random key in random time - add get key by fingerprint - refactor nicknym methods to own file - tests - note this do not include a check for revoked key, since that need some changes in gnupg - Related: #6089 --- tests/test_keymanager.py | 67 +++++++++++++++------ tests/test_refresher.py | 149 +++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 199 insertions(+), 17 deletions(-) create mode 100644 tests/test_refresher.py (limited to 'tests') diff --git a/tests/test_keymanager.py b/tests/test_keymanager.py index b4ab805..118cd8c 100644 --- a/tests/test_keymanager.py +++ b/tests/test_keymanager.py @@ -32,7 +32,7 @@ from twisted.internet import defer from twisted.trial import unittest from twisted.web._responses import NOT_FOUND -from leap.keymanager import client +from twisted.web import client from leap.keymanager import errors from leap.keymanager.keys import ( @@ -208,7 +208,7 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase): token = "mytoken" km = self._key_manager(token=token) yield km._openpgp.put_raw_key(PUBLIC_KEY, ADDRESS) - km._async_client_pinned.request = Mock(return_value=defer.succeed('')) + km._nicknym._async_client_pinned.request = Mock(return_value=defer.succeed('')) # the following data will be used on the send km.ca_cert_path = 'capath' km.session_id = 'sessionid' @@ -224,24 +224,42 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase): headers = {'Authorization': [str('Token token=%s' % token)]} headers['Content-Type'] = ['application/x-www-form-urlencoded'] url = '%s/%s/users/%s.json' % ('apiuri', 'apiver', 'myuid') - km._async_client_pinned.request.assert_called_once_with( + km._nicknym._async_client_pinned.request.assert_called_once_with( str(url), 'PUT', body=str(data), headers=headers ) def test_fetch_keys_from_server(self): """ - Test that the request is well formed when fetching keys from server. + Test that the request is well formed when fetching keys from server + with address. """ km = self._key_manager(url=NICKSERVER_URI) expected_url = NICKSERVER_URI + '?address=' + ADDRESS_2 def verify_the_call(_): - used_kwargs = km._async_client_pinned.request.call_args[1] - km._async_client_pinned.request.assert_called_once_with( + used_kwargs = km._nicknym._async_client_pinned.request.call_args[1] + km._nicknym._async_client_pinned.request.assert_called_once_with( expected_url, 'GET', **used_kwargs) - d = self._fetch_key(km, ADDRESS_2, PUBLIC_KEY_2) + d = self._fetch_key_with_address(km, ADDRESS_2, PUBLIC_KEY_2) + d.addCallback(verify_the_call) + return d + + def test_fetch_keys_from_server_with_fingerprint(self): + """ + Test that the request is well formed when fetching keys from server + with fingerprint. + """ + km = self._key_manager(url=NICKSERVER_URI) + expected_url = NICKSERVER_URI + '?fingerprint=' + KEY_FINGERPRINT + + def verify_the_call(_): + used_kwargs = km._nicknym._async_client_pinned.request.call_args[1] + km._nicknym._async_client_pinned.request.assert_called_once_with( + expected_url, 'GET', **used_kwargs) + + d = self._fetch_key_with_fingerprint(km, KEY_FINGERPRINT, PUBLIC_KEY) d.addCallback(verify_the_call) return d @@ -252,20 +270,20 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase): """ km = self._key_manager(url=NICKSERVER_URI) client.readBody = Mock(return_value=defer.succeed(None)) - km._async_client_pinned.request = Mock( + km._nicknym._async_client_pinned.request = Mock( return_value=defer.succeed(None)) url = NICKSERVER_URI + '?address=' + INVALID_MAIL_ADDRESS - d = km._fetch_and_handle_404_from_nicknym(url, INVALID_MAIL_ADDRESS) + d = km._nicknym._fetch_and_handle_404_from_nicknym(url) def check_key_not_found_is_raised_if_404(_): - used_kwargs = km._async_client_pinned.request.call_args[1] + used_kwargs = km._nicknym._async_client_pinned.request.call_args[1] check_404_callback = used_kwargs['callback'] fake_response = Mock() fake_response.code = NOT_FOUND with self.assertRaisesRegexp( errors.KeyNotFound, - '404: %s key not found.' % INVALID_MAIL_ADDRESS): + '404: Key not found. Request: %s' % url.replace('?', '\?')): check_404_callback(fake_response) d.addCallback(check_key_not_found_is_raised_if_404) @@ -278,13 +296,13 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase): """ km = self._key_manager(url=NICKSERVER_URI) key_not_found_exception = errors.KeyNotFound('some message') - km._async_client_pinned.request = Mock( + km._nicknym._async_client_pinned.request = Mock( side_effect=key_not_found_exception) def assert_key_not_found_raised(error): self.assertEqual(error.value, key_not_found_exception) - d = km._get_key_from_nicknym(INVALID_MAIL_ADDRESS) + d = km._nicknym.fetch_key_with_address(INVALID_MAIL_ADDRESS) d.addErrback(assert_key_not_found_raised) @defer.inlineCallbacks @@ -294,7 +312,7 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase): """ km = self._key_manager(url=NICKSERVER_URI) - key = yield self._fetch_key(km, ADDRESS, PUBLIC_KEY) + key = yield self._fetch_key_with_address(km, ADDRESS, PUBLIC_KEY) self.assertIsInstance(key, OpenPGPKey) self.assertTrue(ADDRESS in key.uids) self.assertEqual(key.validation, ValidationLevels.Provider_Trust) @@ -306,12 +324,12 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase): """ km = self._key_manager(url=NICKSERVER_URI) - key = yield self._fetch_key(km, ADDRESS_OTHER, PUBLIC_KEY_OTHER) + key = yield self._fetch_key_with_address(km, ADDRESS_OTHER, PUBLIC_KEY_OTHER) self.assertIsInstance(key, OpenPGPKey) self.assertTrue(ADDRESS_OTHER in key.uids) self.assertEqual(key.validation, ValidationLevels.Weak_Chain) - def _fetch_key(self, km, address, key): + def _fetch_key_with_address(self, km, address, key): """ :returns: a Deferred that will fire with the OpenPGPKey """ @@ -320,7 +338,7 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase): client.readBody = Mock(return_value=defer.succeed(data)) # mock the fetcher so it returns the key for ADDRESS_2 - km._async_client_pinned.request = Mock( + km._nicknym._async_client_pinned.request = Mock( return_value=defer.succeed(None)) km.ca_cert_path = 'cacertpath' # try to key get without fetching from server @@ -330,6 +348,21 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase): d.addCallback(lambda _: km.get_key(address)) return d + def _fetch_key_with_fingerprint(self, km, fingerprint, key): + """ + :returns: a Deferred that will fire with the OpenPGPKey + """ + data = json.dumps({'fingerprint': fingerprint, 'openpgp': key}) + + client.readBody = Mock(return_value=defer.succeed(data)) + + # mock the fetcher so it returns the key for KEY_FINGERPRINT + km._nicknym._async_client_pinned.request = Mock( + return_value=defer.succeed(None)) + km.ca_cert_path = 'cacertpath' + key = km._nicknym.fetch_key_with_fingerprint(fingerprint) + return key + @defer.inlineCallbacks def test_put_key_ascii(self): """ diff --git a/tests/test_refresher.py b/tests/test_refresher.py new file mode 100644 index 0000000..13a46d4 --- /dev/null +++ b/tests/test_refresher.py @@ -0,0 +1,149 @@ +# -*- coding: utf-8 -*- +# test_refresher.py +# Copyright (C) 2016 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +""" +Tests for refreshing the key directory. +""" + +import logging +from datetime import datetime + +from mock import Mock, patch +from twisted.internet import defer + +from common import KeyManagerWithSoledadTestCase, KEY_FINGERPRINT +from leap.keymanager import openpgp +from leap.keymanager.keys import OpenPGPKey +from leap.keymanager.refresher import RandomRefreshPublicKey, MIN_RANDOM_INTERVAL_RANGE, DEBUG_START_REFRESH, \ + DEBUG_STOP_REFRESH, ERROR_UNEQUAL_FINGERPRINTS + +ANOTHER_FP = 'ANOTHERFINGERPRINT' + +logger = logging.getLogger(__name__) + + +class RandomRefreshPublicKeyTestCase(KeyManagerWithSoledadTestCase): + + @defer.inlineCallbacks + def test_get_random_address(self): + pgp = openpgp.OpenPGPScheme( + self._soledad, gpgbinary=self.gpg_binary_path) + rf = RandomRefreshPublicKey(pgp, self._key_manager()) + key = OpenPGPKey(address='user@leap.se') + key_another = OpenPGPKey(address='zara@leap.se') + + pgp.get_all_keys = Mock(return_value=defer.succeed([key, key_another])) + + random_key = yield rf._get_random_key() + self.assertTrue(random_key.address == key.address or random_key.address == key_another.address) + + @defer.inlineCallbacks + def test_do_not_throw_error_for_empty_key_dict(self): + pgp = openpgp.OpenPGPScheme( + self._soledad, gpgbinary=self.gpg_binary_path) + rf = RandomRefreshPublicKey(pgp, self._key_manager()) + + pgp.get_all_keys = Mock(return_value=defer.succeed([])) + random_address = yield rf._get_random_key() + self.assertTrue(random_address is None) + + @defer.inlineCallbacks + def test_log_error_if_fetch_by_fingerprint_returns_wrong_key(self): + pgp = openpgp.OpenPGPScheme( + self._soledad, gpgbinary=self.gpg_binary_path) + km = self._key_manager() + + with patch.object(logging.Logger, 'error') as mock_logger_error: + rf = RandomRefreshPublicKey(pgp, km) + rf._get_random_key = \ + Mock(return_value=defer.succeed(OpenPGPKey(fingerprint=KEY_FINGERPRINT))) + + km._nicknym.fetch_key_with_fingerprint = \ + Mock(return_value=defer.succeed(OpenPGPKey(fingerprint=ANOTHER_FP))) + + yield rf.maybe_refresh_key() + + mock_logger_error.assert_called_with(ERROR_UNEQUAL_FINGERPRINTS % + (KEY_FINGERPRINT, ANOTHER_FP)) + + @defer.inlineCallbacks + def test_put_new_key_in_local_storage(self): + pgp = openpgp.OpenPGPScheme( + self._soledad, gpgbinary=self.gpg_binary_path) + km = self._key_manager() + + rf = RandomRefreshPublicKey(pgp, km) + rf._get_random_key = Mock(return_value=defer.succeed(OpenPGPKey(fingerprint=KEY_FINGERPRINT))) + + km._nicknym.fetch_key_with_fingerprint = \ + Mock(return_value=defer.succeed(OpenPGPKey(fingerprint=ANOTHER_FP))) + + yield rf.maybe_refresh_key() + + @defer.inlineCallbacks + def test_key_expired_will_be_deactivatet(self): + pgp = openpgp.OpenPGPScheme( + self._soledad, gpgbinary=self.gpg_binary_path) + km = self._key_manager() + rf = RandomRefreshPublicKey(pgp, km) + key = OpenPGPKey(address='zara@leap.se', expiry_date=datetime.now()) + + self.assertTrue(key.address is 'zara@leap.se') + + km._openpgp.unactivate_key = Mock(return_value=defer.succeed(None)) + + yield rf._maybe_unactivate_key(key) + + self.assertTrue(key.address is None) + self.assertFalse(key.is_active()) + + def test_start_refreshing(self): + pgp = openpgp.OpenPGPScheme( + self._soledad, gpgbinary=self.gpg_binary_path) + + with patch.object(logging.Logger, 'debug') as mock_logger_start: + rf = RandomRefreshPublicKey(pgp, self._key_manager()) + rf.start() + mock_logger_start.assert_called_with(DEBUG_START_REFRESH) + rf.stop() + mock_logger_start.assert_called_with(DEBUG_STOP_REFRESH) + + def test_random_interval_is_set_properly(self): + pgp = openpgp.OpenPGPScheme( + self._soledad, gpgbinary=self.gpg_binary_path) + rf = RandomRefreshPublicKey(pgp, self._key_manager()) + self.assertTrue(rf._loop.interval >= MIN_RANDOM_INTERVAL_RANGE) + + def test_is_random_really_random(self): + pgp = openpgp.OpenPGPScheme( + self._soledad, gpgbinary=self.gpg_binary_path) + rf = RandomRefreshPublicKey(pgp, self._key_manager()) + + for x in range(0, 5): + random_numbers = [] + + for y in range(0, 5): + random_numbers.append(rf._random_interval_to_refersh()) + + # there are different numbers in the list + if len(random_numbers) == len(set(random_numbers)) \ + or len(random_numbers) == len(set(random_numbers)) + 1: + self.assertTrue(True) + else: + self.assertTrue(False) + + -- cgit v1.2.3