summaryrefslogtreecommitdiff
path: root/tests
diff options
context:
space:
mode:
authorZara Gebru <zgebru@thoughtworks.com>2016-07-08 11:55:55 +0200
committerKali Kaneko (leap communications) <kali@leap.se>2016-11-22 15:57:03 +0100
commite5717e853af7d2f91ac69e66c1b2ee058289e78d (patch)
tree1fd7058206dd934a8ad0444c0f40a82e3d95dd09 /tests
parentf02921a627e9ea0e6524e4b8e7744806e654a733 (diff)
[feature] keymanager: background update keys
Port of the original commit: 8f1fe8dd4a54fd2bdda2fc78c339ce9b3d0fc331 by Zara Gebru that introduced updating keys in the background. This was made in the legacy leapcode/keymanager repo, but was lost in the merge to the unified bitmask-dev. Original commit message follows: -------------------------------- - refresh random key in random time - add get key by fingerprint - refactor nicknym methods to own file - tests - note this do not include a check for revoked key, since that need some changes in gnupg - Related: #6089
Diffstat (limited to 'tests')
-rw-r--r--tests/integration/keymanager/test_keymanager.py61
-rw-r--r--tests/test_refresher.py149
2 files changed, 198 insertions, 12 deletions
diff --git a/tests/integration/keymanager/test_keymanager.py b/tests/integration/keymanager/test_keymanager.py
index 443902a..9b5de83 100644
--- a/tests/integration/keymanager/test_keymanager.py
+++ b/tests/integration/keymanager/test_keymanager.py
@@ -27,12 +27,16 @@ from os import path
from twisted.internet import defer
from twisted.trial import unittest
from twisted.web._responses import NOT_FOUND
+from twisted.web import client
import mock
from leap.common import ca_bundle
from leap.bitmask.keymanager import client
from leap.bitmask.keymanager import errors
from leap.bitmask.keymanager.keys import (
+
+from leap.keymanager import errors
+from leap.keymanager.keys import (
OpenPGPKey,
is_address,
build_key_from_dict,
@@ -222,24 +226,42 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase):
headers = {'Authorization': [str('Token token=%s' % token)]}
headers['Content-Type'] = ['application/x-www-form-urlencoded']
url = '%s/%s/users/%s.json' % ('apiuri', 'apiver', 'myuid')
- km._async_client_pinned.request.assert_called_once_with(
+ km._nicknym._async_client_pinned.request.assert_called_once_with(
str(url), 'PUT', body=str(data),
headers=headers
)
def test_fetch_keys_from_server(self):
"""
- Test that the request is well formed when fetching keys from server.
+ Test that the request is well formed when fetching keys from server
+ with address.
"""
km = self._key_manager(url=NICKSERVER_URI)
expected_url = NICKSERVER_URI + '?address=' + ADDRESS_2
def verify_the_call(_):
- used_kwargs = km._async_client_pinned.request.call_args[1]
- km._async_client_pinned.request.assert_called_once_with(
+ used_kwargs = km._nicknym._async_client_pinned.request.call_args[1]
+ km._nicknym._async_client_pinned.request.assert_called_once_with(
expected_url, 'GET', **used_kwargs)
- d = self._fetch_key(km, ADDRESS_2, PUBLIC_KEY_2)
+ d = self._fetch_key_with_address(km, ADDRESS_2, PUBLIC_KEY_2)
+ d.addCallback(verify_the_call)
+ return d
+
+ def test_fetch_keys_from_server_with_fingerprint(self):
+ """
+ Test that the request is well formed when fetching keys from server
+ with fingerprint.
+ """
+ km = self._key_manager(url=NICKSERVER_URI)
+ expected_url = NICKSERVER_URI + '?fingerprint=' + KEY_FINGERPRINT
+
+ def verify_the_call(_):
+ used_kwargs = km._nicknym._async_client_pinned.request.call_args[1]
+ km._nicknym._async_client_pinned.request.assert_called_once_with(
+ expected_url, 'GET', **used_kwargs)
+
+ d = self._fetch_key_with_fingerprint(km, KEY_FINGERPRINT, PUBLIC_KEY)
d.addCallback(verify_the_call)
return d
@@ -254,16 +276,16 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase):
return_value=defer.succeed(None))
url = NICKSERVER_URI + '?address=' + INVALID_MAIL_ADDRESS
- d = km._fetch_and_handle_404_from_nicknym(url, INVALID_MAIL_ADDRESS)
+ d = km._nicknym._fetch_and_handle_404_from_nicknym(url)
def check_key_not_found_is_raised_if_404(_):
- used_kwargs = km._async_client_pinned.request.call_args[1]
+ used_kwargs = km._nicknym._async_client_pinned.request.call_args[1]
check_404_callback = used_kwargs['callback']
fake_response = mock.Mock()
fake_response.code = NOT_FOUND
with self.assertRaisesRegexp(
errors.KeyNotFound,
- '404: %s key not found.' % INVALID_MAIL_ADDRESS):
+ '404: Key not found. Request: %s' % url.replace('?', '\?')):
check_404_callback(fake_response)
d.addCallback(check_key_not_found_is_raised_if_404)
@@ -282,7 +304,7 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase):
def assert_key_not_found_raised(error):
self.assertEqual(error.value, key_not_found_exception)
- d = km._get_key_from_nicknym(INVALID_MAIL_ADDRESS)
+ d = km._nicknym.fetch_key_with_address(INVALID_MAIL_ADDRESS)
d.addErrback(assert_key_not_found_raised)
@defer.inlineCallbacks
@@ -292,7 +314,7 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase):
"""
km = self._key_manager(url=NICKSERVER_URI)
- key = yield self._fetch_key(km, ADDRESS, PUBLIC_KEY)
+ key = yield self._fetch_key_with_address(km, ADDRESS, PUBLIC_KEY)
self.assertIsInstance(key, OpenPGPKey)
self.assertTrue(ADDRESS in key.uids)
self.assertEqual(key.validation, ValidationLevels.Provider_Trust)
@@ -304,12 +326,12 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase):
"""
km = self._key_manager(url=NICKSERVER_URI)
- key = yield self._fetch_key(km, ADDRESS_OTHER, PUBLIC_KEY_OTHER)
+ key = yield self._fetch_key_with_address(km, ADDRESS_OTHER, PUBLIC_KEY_OTHER)
self.assertIsInstance(key, OpenPGPKey)
self.assertTrue(ADDRESS_OTHER in key.uids)
self.assertEqual(key.validation, ValidationLevels.Weak_Chain)
- def _fetch_key(self, km, address, key):
+ def _fetch_key_with_address(self, km, address, key):
"""
:returns: a Deferred that will fire with the OpenPGPKey
"""
@@ -328,6 +350,21 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase):
d.addCallback(lambda _: km.get_key(address))
return d
+ def _fetch_key_with_fingerprint(self, km, fingerprint, key):
+ """
+ :returns: a Deferred that will fire with the OpenPGPKey
+ """
+ data = json.dumps({'fingerprint': fingerprint, 'openpgp': key})
+
+ client.readBody = Mock(return_value=defer.succeed(data))
+
+ # mock the fetcher so it returns the key for KEY_FINGERPRINT
+ km._nicknym._async_client_pinned.request = Mock(
+ return_value=defer.succeed(None))
+ km.ca_cert_path = 'cacertpath'
+ key = km._nicknym.fetch_key_with_fingerprint(fingerprint)
+ return key
+
@defer.inlineCallbacks
def test_put_key_ascii(self):
"""
diff --git a/tests/test_refresher.py b/tests/test_refresher.py
new file mode 100644
index 0000000..13a46d4
--- /dev/null
+++ b/tests/test_refresher.py
@@ -0,0 +1,149 @@
+# -*- coding: utf-8 -*-
+# test_refresher.py
+# Copyright (C) 2016 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+"""
+Tests for refreshing the key directory.
+"""
+
+import logging
+from datetime import datetime
+
+from mock import Mock, patch
+from twisted.internet import defer
+
+from common import KeyManagerWithSoledadTestCase, KEY_FINGERPRINT
+from leap.keymanager import openpgp
+from leap.keymanager.keys import OpenPGPKey
+from leap.keymanager.refresher import RandomRefreshPublicKey, MIN_RANDOM_INTERVAL_RANGE, DEBUG_START_REFRESH, \
+ DEBUG_STOP_REFRESH, ERROR_UNEQUAL_FINGERPRINTS
+
+ANOTHER_FP = 'ANOTHERFINGERPRINT'
+
+logger = logging.getLogger(__name__)
+
+
+class RandomRefreshPublicKeyTestCase(KeyManagerWithSoledadTestCase):
+
+ @defer.inlineCallbacks
+ def test_get_random_address(self):
+ pgp = openpgp.OpenPGPScheme(
+ self._soledad, gpgbinary=self.gpg_binary_path)
+ rf = RandomRefreshPublicKey(pgp, self._key_manager())
+ key = OpenPGPKey(address='user@leap.se')
+ key_another = OpenPGPKey(address='zara@leap.se')
+
+ pgp.get_all_keys = Mock(return_value=defer.succeed([key, key_another]))
+
+ random_key = yield rf._get_random_key()
+ self.assertTrue(random_key.address == key.address or random_key.address == key_another.address)
+
+ @defer.inlineCallbacks
+ def test_do_not_throw_error_for_empty_key_dict(self):
+ pgp = openpgp.OpenPGPScheme(
+ self._soledad, gpgbinary=self.gpg_binary_path)
+ rf = RandomRefreshPublicKey(pgp, self._key_manager())
+
+ pgp.get_all_keys = Mock(return_value=defer.succeed([]))
+ random_address = yield rf._get_random_key()
+ self.assertTrue(random_address is None)
+
+ @defer.inlineCallbacks
+ def test_log_error_if_fetch_by_fingerprint_returns_wrong_key(self):
+ pgp = openpgp.OpenPGPScheme(
+ self._soledad, gpgbinary=self.gpg_binary_path)
+ km = self._key_manager()
+
+ with patch.object(logging.Logger, 'error') as mock_logger_error:
+ rf = RandomRefreshPublicKey(pgp, km)
+ rf._get_random_key = \
+ Mock(return_value=defer.succeed(OpenPGPKey(fingerprint=KEY_FINGERPRINT)))
+
+ km._nicknym.fetch_key_with_fingerprint = \
+ Mock(return_value=defer.succeed(OpenPGPKey(fingerprint=ANOTHER_FP)))
+
+ yield rf.maybe_refresh_key()
+
+ mock_logger_error.assert_called_with(ERROR_UNEQUAL_FINGERPRINTS %
+ (KEY_FINGERPRINT, ANOTHER_FP))
+
+ @defer.inlineCallbacks
+ def test_put_new_key_in_local_storage(self):
+ pgp = openpgp.OpenPGPScheme(
+ self._soledad, gpgbinary=self.gpg_binary_path)
+ km = self._key_manager()
+
+ rf = RandomRefreshPublicKey(pgp, km)
+ rf._get_random_key = Mock(return_value=defer.succeed(OpenPGPKey(fingerprint=KEY_FINGERPRINT)))
+
+ km._nicknym.fetch_key_with_fingerprint = \
+ Mock(return_value=defer.succeed(OpenPGPKey(fingerprint=ANOTHER_FP)))
+
+ yield rf.maybe_refresh_key()
+
+ @defer.inlineCallbacks
+ def test_key_expired_will_be_deactivatet(self):
+ pgp = openpgp.OpenPGPScheme(
+ self._soledad, gpgbinary=self.gpg_binary_path)
+ km = self._key_manager()
+ rf = RandomRefreshPublicKey(pgp, km)
+ key = OpenPGPKey(address='zara@leap.se', expiry_date=datetime.now())
+
+ self.assertTrue(key.address is 'zara@leap.se')
+
+ km._openpgp.unactivate_key = Mock(return_value=defer.succeed(None))
+
+ yield rf._maybe_unactivate_key(key)
+
+ self.assertTrue(key.address is None)
+ self.assertFalse(key.is_active())
+
+ def test_start_refreshing(self):
+ pgp = openpgp.OpenPGPScheme(
+ self._soledad, gpgbinary=self.gpg_binary_path)
+
+ with patch.object(logging.Logger, 'debug') as mock_logger_start:
+ rf = RandomRefreshPublicKey(pgp, self._key_manager())
+ rf.start()
+ mock_logger_start.assert_called_with(DEBUG_START_REFRESH)
+ rf.stop()
+ mock_logger_start.assert_called_with(DEBUG_STOP_REFRESH)
+
+ def test_random_interval_is_set_properly(self):
+ pgp = openpgp.OpenPGPScheme(
+ self._soledad, gpgbinary=self.gpg_binary_path)
+ rf = RandomRefreshPublicKey(pgp, self._key_manager())
+ self.assertTrue(rf._loop.interval >= MIN_RANDOM_INTERVAL_RANGE)
+
+ def test_is_random_really_random(self):
+ pgp = openpgp.OpenPGPScheme(
+ self._soledad, gpgbinary=self.gpg_binary_path)
+ rf = RandomRefreshPublicKey(pgp, self._key_manager())
+
+ for x in range(0, 5):
+ random_numbers = []
+
+ for y in range(0, 5):
+ random_numbers.append(rf._random_interval_to_refersh())
+
+ # there are different numbers in the list
+ if len(random_numbers) == len(set(random_numbers)) \
+ or len(random_numbers) == len(set(random_numbers)) + 1:
+ self.assertTrue(True)
+ else:
+ self.assertTrue(False)
+
+