diff options
| author | Zara Gebru <zgebru@thoughtworks.com> | 2016-07-08 11:55:55 +0200 | 
|---|---|---|
| committer | Kali Kaneko (leap communications) <kali@leap.se> | 2016-11-22 15:57:03 +0100 | 
| commit | e5717e853af7d2f91ac69e66c1b2ee058289e78d (patch) | |
| tree | 1fd7058206dd934a8ad0444c0f40a82e3d95dd09 /tests | |
| parent | f02921a627e9ea0e6524e4b8e7744806e654a733 (diff) | |
[feature] keymanager: background update keys
Port of the original commit:
8f1fe8dd4a54fd2bdda2fc78c339ce9b3d0fc331
by Zara Gebru that introduced updating keys in the background.
This was made in the legacy leapcode/keymanager repo, but was lost in
the merge to the unified bitmask-dev.
Original commit message follows:
--------------------------------
- refresh random key in random time
- add get key by fingerprint
- refactor nicknym methods to own file
- tests
- note this do not include a check for
revoked key, since that need some changes
in gnupg
- Related: #6089
Diffstat (limited to 'tests')
| -rw-r--r-- | tests/integration/keymanager/test_keymanager.py | 61 | ||||
| -rw-r--r-- | tests/test_refresher.py | 149 | 
2 files changed, 198 insertions, 12 deletions
| diff --git a/tests/integration/keymanager/test_keymanager.py b/tests/integration/keymanager/test_keymanager.py index 443902ab..9b5de831 100644 --- a/tests/integration/keymanager/test_keymanager.py +++ b/tests/integration/keymanager/test_keymanager.py @@ -27,12 +27,16 @@ from os import path  from twisted.internet import defer  from twisted.trial import unittest  from twisted.web._responses import NOT_FOUND +from twisted.web import client  import mock  from leap.common import ca_bundle  from leap.bitmask.keymanager import client  from leap.bitmask.keymanager import errors  from leap.bitmask.keymanager.keys import ( + +from leap.keymanager import errors +from leap.keymanager.keys import (      OpenPGPKey,      is_address,      build_key_from_dict, @@ -222,24 +226,42 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase):          headers = {'Authorization': [str('Token token=%s' % token)]}          headers['Content-Type'] = ['application/x-www-form-urlencoded']          url = '%s/%s/users/%s.json' % ('apiuri', 'apiver', 'myuid') -        km._async_client_pinned.request.assert_called_once_with( +        km._nicknym._async_client_pinned.request.assert_called_once_with(              str(url), 'PUT', body=str(data),              headers=headers          )      def test_fetch_keys_from_server(self):          """ -        Test that the request is well formed when fetching keys from server. +        Test that the request is well formed when fetching keys from server +        with address.          """          km = self._key_manager(url=NICKSERVER_URI)          expected_url = NICKSERVER_URI + '?address=' + ADDRESS_2          def verify_the_call(_): -            used_kwargs = km._async_client_pinned.request.call_args[1] -            km._async_client_pinned.request.assert_called_once_with( +            used_kwargs = km._nicknym._async_client_pinned.request.call_args[1] +            km._nicknym._async_client_pinned.request.assert_called_once_with(                  expected_url, 'GET', **used_kwargs) -        d = self._fetch_key(km, ADDRESS_2, PUBLIC_KEY_2) +        d = self._fetch_key_with_address(km, ADDRESS_2, PUBLIC_KEY_2) +        d.addCallback(verify_the_call) +        return d + +    def test_fetch_keys_from_server_with_fingerprint(self): +        """ +        Test that the request is well formed when fetching keys from server +        with fingerprint. +        """ +        km = self._key_manager(url=NICKSERVER_URI) +        expected_url = NICKSERVER_URI + '?fingerprint=' + KEY_FINGERPRINT + +        def verify_the_call(_): +            used_kwargs = km._nicknym._async_client_pinned.request.call_args[1] +            km._nicknym._async_client_pinned.request.assert_called_once_with( +                expected_url, 'GET', **used_kwargs) + +        d = self._fetch_key_with_fingerprint(km, KEY_FINGERPRINT, PUBLIC_KEY)          d.addCallback(verify_the_call)          return d @@ -254,16 +276,16 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase):              return_value=defer.succeed(None))          url = NICKSERVER_URI + '?address=' + INVALID_MAIL_ADDRESS -        d = km._fetch_and_handle_404_from_nicknym(url, INVALID_MAIL_ADDRESS) +        d = km._nicknym._fetch_and_handle_404_from_nicknym(url)          def check_key_not_found_is_raised_if_404(_): -            used_kwargs = km._async_client_pinned.request.call_args[1] +            used_kwargs = km._nicknym._async_client_pinned.request.call_args[1]              check_404_callback = used_kwargs['callback']              fake_response = mock.Mock()              fake_response.code = NOT_FOUND              with self.assertRaisesRegexp(                      errors.KeyNotFound, -                    '404: %s key not found.' % INVALID_MAIL_ADDRESS): +                    '404: Key not found. Request: %s' % url.replace('?', '\?')):                  check_404_callback(fake_response)          d.addCallback(check_key_not_found_is_raised_if_404) @@ -282,7 +304,7 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase):          def assert_key_not_found_raised(error):              self.assertEqual(error.value, key_not_found_exception) -        d = km._get_key_from_nicknym(INVALID_MAIL_ADDRESS) +        d = km._nicknym.fetch_key_with_address(INVALID_MAIL_ADDRESS)          d.addErrback(assert_key_not_found_raised)      @defer.inlineCallbacks @@ -292,7 +314,7 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase):          """          km = self._key_manager(url=NICKSERVER_URI) -        key = yield self._fetch_key(km, ADDRESS, PUBLIC_KEY) +        key = yield self._fetch_key_with_address(km, ADDRESS, PUBLIC_KEY)          self.assertIsInstance(key, OpenPGPKey)          self.assertTrue(ADDRESS in key.uids)          self.assertEqual(key.validation, ValidationLevels.Provider_Trust) @@ -304,12 +326,12 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase):          """          km = self._key_manager(url=NICKSERVER_URI) -        key = yield self._fetch_key(km, ADDRESS_OTHER, PUBLIC_KEY_OTHER) +        key = yield self._fetch_key_with_address(km, ADDRESS_OTHER, PUBLIC_KEY_OTHER)          self.assertIsInstance(key, OpenPGPKey)          self.assertTrue(ADDRESS_OTHER in key.uids)          self.assertEqual(key.validation, ValidationLevels.Weak_Chain) -    def _fetch_key(self, km, address, key): +    def _fetch_key_with_address(self, km, address, key):          """          :returns: a Deferred that will fire with the OpenPGPKey          """ @@ -328,6 +350,21 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase):          d.addCallback(lambda _: km.get_key(address))          return d +    def _fetch_key_with_fingerprint(self, km, fingerprint, key): +        """ +        :returns: a Deferred that will fire with the OpenPGPKey +        """ +        data = json.dumps({'fingerprint': fingerprint, 'openpgp': key}) + +        client.readBody = Mock(return_value=defer.succeed(data)) + +        # mock the fetcher so it returns the key for KEY_FINGERPRINT +        km._nicknym._async_client_pinned.request = Mock( +            return_value=defer.succeed(None)) +        km.ca_cert_path = 'cacertpath' +        key = km._nicknym.fetch_key_with_fingerprint(fingerprint) +        return key +      @defer.inlineCallbacks      def test_put_key_ascii(self):          """ diff --git a/tests/test_refresher.py b/tests/test_refresher.py new file mode 100644 index 00000000..13a46d47 --- /dev/null +++ b/tests/test_refresher.py @@ -0,0 +1,149 @@ +# -*- coding: utf-8 -*- +# test_refresher.py +# Copyright (C) 2016 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +""" +Tests for refreshing the key directory. +""" + +import logging +from datetime import datetime + +from mock import Mock, patch +from twisted.internet import defer + +from common import KeyManagerWithSoledadTestCase, KEY_FINGERPRINT +from leap.keymanager import openpgp +from leap.keymanager.keys import OpenPGPKey +from leap.keymanager.refresher import RandomRefreshPublicKey, MIN_RANDOM_INTERVAL_RANGE, DEBUG_START_REFRESH, \ +    DEBUG_STOP_REFRESH, ERROR_UNEQUAL_FINGERPRINTS + +ANOTHER_FP = 'ANOTHERFINGERPRINT' + +logger = logging.getLogger(__name__) + + +class RandomRefreshPublicKeyTestCase(KeyManagerWithSoledadTestCase): + +    @defer.inlineCallbacks +    def test_get_random_address(self): +        pgp = openpgp.OpenPGPScheme( +            self._soledad, gpgbinary=self.gpg_binary_path) +        rf = RandomRefreshPublicKey(pgp, self._key_manager()) +        key = OpenPGPKey(address='user@leap.se') +        key_another = OpenPGPKey(address='zara@leap.se') + +        pgp.get_all_keys = Mock(return_value=defer.succeed([key, key_another])) + +        random_key = yield rf._get_random_key() +        self.assertTrue(random_key.address == key.address or random_key.address == key_another.address) + +    @defer.inlineCallbacks +    def test_do_not_throw_error_for_empty_key_dict(self): +        pgp = openpgp.OpenPGPScheme( +            self._soledad, gpgbinary=self.gpg_binary_path) +        rf = RandomRefreshPublicKey(pgp, self._key_manager()) + +        pgp.get_all_keys = Mock(return_value=defer.succeed([])) +        random_address = yield rf._get_random_key() +        self.assertTrue(random_address is None) + +    @defer.inlineCallbacks +    def test_log_error_if_fetch_by_fingerprint_returns_wrong_key(self): +        pgp = openpgp.OpenPGPScheme( +            self._soledad, gpgbinary=self.gpg_binary_path) +        km = self._key_manager() + +        with patch.object(logging.Logger, 'error') as mock_logger_error: +            rf = RandomRefreshPublicKey(pgp, km) +            rf._get_random_key = \ +                Mock(return_value=defer.succeed(OpenPGPKey(fingerprint=KEY_FINGERPRINT))) + +            km._nicknym.fetch_key_with_fingerprint = \ +                Mock(return_value=defer.succeed(OpenPGPKey(fingerprint=ANOTHER_FP))) + +            yield rf.maybe_refresh_key() + +            mock_logger_error.assert_called_with(ERROR_UNEQUAL_FINGERPRINTS % +                                                 (KEY_FINGERPRINT, ANOTHER_FP)) + +    @defer.inlineCallbacks +    def test_put_new_key_in_local_storage(self): +        pgp = openpgp.OpenPGPScheme( +            self._soledad, gpgbinary=self.gpg_binary_path) +        km = self._key_manager() + +        rf = RandomRefreshPublicKey(pgp, km) +        rf._get_random_key = Mock(return_value=defer.succeed(OpenPGPKey(fingerprint=KEY_FINGERPRINT))) + +        km._nicknym.fetch_key_with_fingerprint = \ +            Mock(return_value=defer.succeed(OpenPGPKey(fingerprint=ANOTHER_FP))) + +        yield rf.maybe_refresh_key() + +    @defer.inlineCallbacks +    def test_key_expired_will_be_deactivatet(self): +        pgp = openpgp.OpenPGPScheme( +            self._soledad, gpgbinary=self.gpg_binary_path) +        km = self._key_manager() +        rf = RandomRefreshPublicKey(pgp, km) +        key = OpenPGPKey(address='zara@leap.se', expiry_date=datetime.now()) + +        self.assertTrue(key.address is 'zara@leap.se') + +        km._openpgp.unactivate_key = Mock(return_value=defer.succeed(None)) + +        yield rf._maybe_unactivate_key(key) + +        self.assertTrue(key.address is None) +        self.assertFalse(key.is_active()) + +    def test_start_refreshing(self): +        pgp = openpgp.OpenPGPScheme( +            self._soledad, gpgbinary=self.gpg_binary_path) + +        with patch.object(logging.Logger, 'debug') as mock_logger_start: +            rf = RandomRefreshPublicKey(pgp, self._key_manager()) +            rf.start() +            mock_logger_start.assert_called_with(DEBUG_START_REFRESH) +            rf.stop() +            mock_logger_start.assert_called_with(DEBUG_STOP_REFRESH) + +    def test_random_interval_is_set_properly(self): +        pgp = openpgp.OpenPGPScheme( +            self._soledad, gpgbinary=self.gpg_binary_path) +        rf = RandomRefreshPublicKey(pgp, self._key_manager()) +        self.assertTrue(rf._loop.interval >= MIN_RANDOM_INTERVAL_RANGE) + +    def test_is_random_really_random(self): +        pgp = openpgp.OpenPGPScheme( +            self._soledad, gpgbinary=self.gpg_binary_path) +        rf = RandomRefreshPublicKey(pgp, self._key_manager()) + +        for x in range(0, 5): +            random_numbers = [] + +            for y in range(0, 5): +                random_numbers.append(rf._random_interval_to_refersh()) + +                # there are different numbers in the list +                if len(random_numbers) == len(set(random_numbers)) \ +                        or len(random_numbers) == len(set(random_numbers)) + 1: +                    self.assertTrue(True) +                else: +                    self.assertTrue(False) + + | 
