diff options
| -rw-r--r-- | src/leap/bitmask/keymanager/__init__.py | 59 | ||||
| -rw-r--r-- | src/leap/bitmask/keymanager/keys.py | 22 | ||||
| -rw-r--r-- | src/leap/bitmask/keymanager/nicknym.py | 181 | ||||
| -rw-r--r-- | src/leap/bitmask/keymanager/openpgp.py | 15 | ||||
| -rw-r--r-- | src/leap/bitmask/keymanager/refresher.py | 132 | ||||
| -rw-r--r-- | tests/integration/keymanager/test_keymanager.py | 61 | ||||
| -rw-r--r-- | tests/test_refresher.py | 149 | 
7 files changed, 587 insertions, 32 deletions
| diff --git a/src/leap/bitmask/keymanager/__init__.py b/src/leap/bitmask/keymanager/__init__.py index 27c9853..637391c 100644 --- a/src/leap/bitmask/keymanager/__init__.py +++ b/src/leap/bitmask/keymanager/__init__.py @@ -22,8 +22,6 @@ import fileinput  import os  import sys  import tempfile -import json -import urllib  from urlparse import urlparse @@ -34,9 +32,10 @@ from twisted.web._responses import NOT_FOUND  from leap.common import ca_bundle  from leap.common.check import leap_assert +from leap.common.decorators import memoized_method  from leap.common.http import HTTPClient  from leap.common.events import emit_async, catalog -from leap.common.decorators import memoized_method +from leap.bitmask.keymanager.nicknym import Nicknym  from leap.bitmask.keymanager.errors import (      KeyNotFound, @@ -105,12 +104,20 @@ class KeyManager(object):              self._combined_ca_bundle = ''          self._async_client = HTTPClient(self._combined_ca_bundle) -        self._async_client_pinned = HTTPClient(self._ca_cert_path) +        self._nicknym = Nicknym(self._nickserver_uri, self._ca_cert_path, self._token) +        self.refresher = None      #      # utilities      # +    def start_refresher(self): +        self.refresher = RandomRefreshPublicKey(self._openpgp, self) +        self.refresher.start() + +    def stop_refresher(self): +        self.refresher.stop() +      def _create_combined_bundle_file(self):          leap_ca_bundle = ca_bundle.where() @@ -303,14 +310,7 @@ class KeyManager(object):          :raise UnsupportedKeyTypeError: if invalid key type          """          def send(pubkey): -            data = { -                self.PUBKEY_KEY: pubkey.key_data -            } -            uri = "%s/%s/users/%s.json" % ( -                self._api_uri, -                self._api_version, -                self._uid) -            d = self._put(uri, data) +            d = self._nicknym.put_key(self.uid, pubkey.key_data, self._api_uri, self._api_version)              d.addCallback(lambda _:                            emit_async(catalog.KEYMANAGER_DONE_UPLOADING_KEYS,                                       self._address)) @@ -321,6 +321,36 @@ class KeyManager(object):          d.addCallback(send)          return d +    @defer.inlineCallbacks +    def _fetch_keys_from_server_and_store_local(self, address): +        """ +        Fetch keys  from nickserver and insert them in locale database. + +        :param address: The address bound to the keys. +        :type address: str + +        :return: A Deferred which fires when the key is in the storage, +                     or which fails with KeyNotFound if the key was not found on +                     nickserver. +            :rtype: Deferred + +        """ +        server_keys = yield self._nicknym.fetch_key_with_address(address) + +        # insert keys in local database +        if self.OPENPGP_KEY in server_keys: +            # nicknym server is authoritative for its own domain, +            # for other domains the key might come from key servers. +            validation_level = ValidationLevels.Weak_Chain +            _, domain = _split_email(address) +            if (domain == _get_domain(self._nickserver_uri)): +                validation_level = ValidationLevels.Provider_Trust + +        yield self.put_raw_key( +            server_keys['openpgp'], +            address=address, +            validation=validation_level) +      def get_key(self, address, private=False, fetch_remote=True):          """          Return a key bound to address. @@ -364,7 +394,7 @@ class KeyManager(object):                  return failure              emit_async(catalog.KEYMANAGER_LOOKING_FOR_KEY, address) -            d = self._fetch_keys_from_server(address) +            d = self._fetch_keys_from_server_and_store_local(address)              d.addCallback(                  lambda _: self._openpgp.get_key(address, private=False))              d.addCallback(key_found) @@ -396,7 +426,6 @@ class KeyManager(object):          :raise UnsupportedKeyTypeError: if invalid key type          """ -          def signal_finished(key):              emit_async(                  catalog.KEYMANAGER_FINISHED_KEY_GENERATION, self._address) @@ -636,7 +665,6 @@ class KeyManager(object):          :raise UnsupportedKeyTypeError: if invalid key type          """ -          def verify(pubkey):              signed = self._openpgp.verify(                  data, pubkey, detached_sig=detached_sig) @@ -686,7 +714,6 @@ class KeyManager(object):          :raise UnsupportedKeyTypeError: if invalid key type          """ -          def old_key_not_found(failure):              if failure.check(KeyNotFound):                  return None diff --git a/src/leap/bitmask/keymanager/keys.py b/src/leap/bitmask/keymanager/keys.py index 45a7d72..1b236c3 100644 --- a/src/leap/bitmask/keymanager/keys.py +++ b/src/leap/bitmask/keymanager/keys.py @@ -272,6 +272,28 @@ class OpenPGPKey(object):              self.address,              "priv" if self.private else "publ") +    def is_active(self): +        """ +        Indicates if a key is active. +        :return: True if key is active. +        :rtype: bool +        """ +        return True if self.address is not None else False + +    def set_unactive(self): +        """ +        Sets a key as unactive. +        """ +        self.address = None + +    def is_expired(self): +        """ +        Indicates if a key is expired. +        :return: True if key expired. +        :rtype: bool +        """ +        return False if self.expiry_date is None else self.expiry_date < datetime.now() +  def parse_address(address):      """ diff --git a/src/leap/bitmask/keymanager/nicknym.py b/src/leap/bitmask/keymanager/nicknym.py new file mode 100644 index 0000000..7af49d6 --- /dev/null +++ b/src/leap/bitmask/keymanager/nicknym.py @@ -0,0 +1,181 @@ +# -*- coding: utf-8 -*- +# nicknym.py +# Copyright (C) 2016 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +import json +import sys +import logging +import urllib + +from twisted.internet import defer +from twisted.web import client +from twisted.web._responses import NOT_FOUND + +from leap.bitmask.keymanager.errors import KeyNotFound +from leap.common.check import leap_assert +from leap.common.http import HTTPClient +from leap.common.decorators import memoized_method + + +logger = logging.getLogger(__name__) + + +class Nicknym(object): +    """ +    Responsible for communication to the nicknym server. +    """ + +    PUBKEY_KEY = "user[public_key]" + +    def __init__(self, nickserver_uri, ca_cert_path, token): +        self._nickserver_uri = nickserver_uri +        self._async_client_pinned = HTTPClient(ca_cert_path) +        self._token = token + +    @defer.inlineCallbacks +    def put_key(self, uid, key_data, api_uri, api_version): +        """ +        Send a PUT request to C{uri} containing C{data}. + +        The request will be sent using the configured CA certificate path to +        verify the server certificate and the configured session id for +        authentication. + +        :param uid: The URI of the request. +        :type uid: str +        :param key_data: The body of the request. +        :type key_data: dict, str or file + +        :return: A deferred that will be fired when PUT request finishes +        :rtype: Deferred +        """ +        data = { +            self.PUBKEY_KEY: key_data +        } + +        uri = "%s/%s/users/%s.json" % ( +            api_uri, +            api_version, +            uid) + +        leap_assert( +            self._token is not None, +            'We need a token to interact with webapp!') +        if type(data) == dict: +            data = urllib.urlencode(data) +        headers = {'Authorization': [str('Token token=%s' % self._token)]} +        headers['Content-Type'] = ['application/x-www-form-urlencoded'] +        try: +            res = yield self._async_client_pinned.request(str(uri), 'PUT', +                                                          body=str(data), +                                                          headers=headers) +        except Exception as e: +            logger.warning("Error uploading key: %r" % (e,)) +            raise e +        if 'error' in res: +            # FIXME: That's a workaround for 500, +            # we need to implement a readBody to assert response code +            logger.warning("Error uploading key: %r" % (res,)) +            raise Exception(res) + +    @defer.inlineCallbacks +    def _get_key_from_nicknym(self, uri): +        """ +        Send a GET request to C{uri} containing C{data}. + +        :param uri: The URI of the request. +        :type uri: str + +        :return: A deferred that will be fired with GET content as json (dict) +        :rtype: Deferred +        """ +        try: +            content = yield self._fetch_and_handle_404_from_nicknym(uri) +            json_content = json.loads(content) + +        except KeyNotFound: +            raise +        except IOError as e: +            logger.warning("HTTP error retrieving key: %r" % (e,)) +            logger.warning("%s" % (content,)) +            raise KeyNotFound(e.message), None, sys.exc_info()[2] +        except ValueError as v: +            logger.warning("Invalid JSON data from key: %s" % (uri,)) +            raise KeyNotFound(v.message + ' - ' + uri), None, sys.exc_info()[2] + +        except Exception as e: +            logger.warning("Error retrieving key: %r" % (e,)) +            raise KeyNotFound(e.message), None, sys.exc_info()[2] +        # Responses are now text/plain, although it's json anyway, but +        # this will fail when it shouldn't +        # leap_assert( +        #     res.headers['content-type'].startswith('application/json'), +        #     'Content-type is not JSON.') +        defer.returnValue(json_content) + +    def _fetch_and_handle_404_from_nicknym(self, uri): +        """ +        Send a GET request to C{uri} containing C{data}. + +        :param uri: The URI of the request. +        :type uri: str + +        :return: A deferred that will be fired with GET content as json (dict) +        :rtype: Deferred +        """ + +        def check_404(response): +            if response.code == NOT_FOUND: +                message = ' %s: Key not found. Request: %s' % (response.code, uri) +                logger.warning(message) +                raise KeyNotFound(message), None, sys.exc_info()[2] +            return response + +        d = self._async_client_pinned.request(str(uri), 'GET', callback=check_404) +        d.addCallback(client.readBody) +        return d + +    @memoized_method(invalidation=300) +    def fetch_key_with_address(self, address): +        """ +        Fetch keys bound to address from nickserver. + +        :param address: The address bound to the keys. +        :type address: str + +        :return: A Deferred which fires when the key is in the storage, +                 or which fails with KeyNotFound if the key was not found on +                 nickserver. +        :rtype: Deferred + +        """ +        return self._get_key_from_nicknym(self._nickserver_uri + '?address=' + address) + +    @memoized_method(invalidation=300) +    def fetch_key_with_fingerprint(self, fingerprint): +        """ +        Fetch keys bound to fingerprint from nickserver. + +        :param fingerprint: The fingerprint bound to the keys. +        :type fingerprint: str + +        :return: A Deferred which fires when the key is in the storage, +                 or which fails with KeyNotFound if the key was not found on +                 nickserver. +        :rtype: Deferred + +        """ +        return self._get_key_from_nicknym(self._nickserver_uri + '?fingerprint=' + fingerprint) diff --git a/src/leap/bitmask/keymanager/openpgp.py b/src/leap/bitmask/keymanager/openpgp.py index 02b5456..b256ff0 100644 --- a/src/leap/bitmask/keymanager/openpgp.py +++ b/src/leap/bitmask/keymanager/openpgp.py @@ -410,10 +410,9 @@ class OpenPGPScheme(object):              active_json = key.get_active_json()              if activedoc:                  activedoc.set_json(active_json) -                d = self._soledad.put_doc(activedoc) -            else: -                d = self._soledad.create_doc_from_json(active_json) -            return d +                return self._soledad.put_doc(activedoc) +            elif key.is_active(): +                return self._soledad.create_doc_from_json(active_json)          def get_active_doc(keydoc):              d = self._get_active_doc_from_address(key.address, key.private) @@ -534,6 +533,14 @@ class OpenPGPScheme(object):          d.addCallback(delete_key)          return d +    def unactivate_key(self, address): +        """ +        Mark a active doc as deleted. +        :param address: The unique address for the active content. +        """ +        active_doc = self._get_active_doc_from_address(address, False) +        yield self._soledad.delete_doc(active_doc) +      #      # Data encryption, decryption, signing and verifying      # diff --git a/src/leap/bitmask/keymanager/refresher.py b/src/leap/bitmask/keymanager/refresher.py new file mode 100644 index 0000000..cd9db28 --- /dev/null +++ b/src/leap/bitmask/keymanager/refresher.py @@ -0,0 +1,132 @@ +# -*- coding: utf-8 -*- +# refresher.py +# Copyright (C) 2016 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + + +""" +A service which continuous refreshes the (public) key directories randomly in a random time interval. +""" + +import logging + +from twisted.internet.task import LoopingCall +from twisted.internet import defer +from random import choice, randrange + +DEBUG_STOP_REFRESH = "Stop to refresh the key directory ..." +DEBUG_START_REFRESH = "Start to refresh the key directory ..." +ERROR_UNEQUAL_FINGERPRINTS = "[WARNING] Your provider might be cheat on you, " \ +                             "and gave a wrong key back. Fingerprints are unequal, old %s new %s " + +MIN_RANDOM_INTERVAL_RANGE = 4 * 60  # four minutes +MAX_RANDOM_INTERVAL_RANGE = 6 * 60  # six minutes + +logger = logging.getLogger(__name__) + + +class RandomRefreshPublicKey(object): + +    def __init__(self, openpgp, keymanager): +        """ +        Initialize the RandomRefreshPublicKey. +        :param openpgp: Openpgp object. +        :param keymanager: The key manager. +        """ +        self._openpgp = openpgp +        self._keymanger = keymanager +        self._loop = LoopingCall(self._refresh_continuous) +        self._loop.interval = self._random_interval_to_refersh() + +    def start(self): +        """ +        Start the looping call with random interval +        [MIN_RANDOM_INTERVAL_RANGE, MAX_RANDOM_INTERVAL_RANGE] +        :return: LoopingCall to start the service. +        :rtype: A deferred. +        """ +        self._loop.start(self._random_interval_to_refersh(), False) +        logger.debug(DEBUG_START_REFRESH) + +    def stop(self): +        """ +        Stop the looping call with random interval. +        """ +        self._loop.stop() +        logger.debug(DEBUG_STOP_REFRESH) + +    @defer.inlineCallbacks +    def _get_random_key(self): +        """ +        Get a random key of all the keys in a users key doc. +        :return: A random key. +        :rtype: A deferred. +        """ +        # TODO maybe make a check first if key is active and get another one then. +        keys = yield self._openpgp.get_all_keys() +        defer.returnValue(None if keys is None or keys == [] else choice(keys)) + +    @defer.inlineCallbacks +    def _refresh_continuous(self): +        """ +        The LoopingCall to refresh the key doc continuously. +        """ +        self._loop.interval = self._random_interval_to_refersh() +        yield self.maybe_refresh_key() + +    @defer.inlineCallbacks +    def _maybe_unactivate_key(self, key): +        """ +        Unactivate a given key. +        :param key: The key to be unactivated. +        """ +        if key.is_expired() and key.is_active():  # TODO or is_revoked +            yield self._openpgp.unactivate_key(key.address) +            key.set_unactive() + +    @defer.inlineCallbacks +    def maybe_refresh_key(self): +        """ +        Get key from nicknym and try to refresh. +        """ +        old_key = yield self._get_random_key() + +        if old_key is None: +            defer.returnValue(None) + +        old_updated_key = yield self._keymanger._nicknym.\ +            fetch_key_with_fingerprint(old_key.fingerprint) + +        if old_updated_key.fingerprint != old_key.fingerprint: +            logger.error(ERROR_UNEQUAL_FINGERPRINTS % +                         (old_key.fingerprint, old_updated_key.fingerprint)) +            defer.returnValue(None) + +        yield self._maybe_unactivate_key(old_updated_key) +        yield self._openpgp.put_key(old_updated_key) + +        # No new fetch by address needed, bc that will happen before sending an email +        # could be discussed since fetching before sending an email leaks information. + +    def _random_interval_to_refersh(self): +        """ +        Random interval. +        :return: A number in this interval. +        """ +        return randrange(MIN_RANDOM_INTERVAL_RANGE, MAX_RANDOM_INTERVAL_RANGE) + + + + diff --git a/tests/integration/keymanager/test_keymanager.py b/tests/integration/keymanager/test_keymanager.py index 443902a..9b5de83 100644 --- a/tests/integration/keymanager/test_keymanager.py +++ b/tests/integration/keymanager/test_keymanager.py @@ -27,12 +27,16 @@ from os import path  from twisted.internet import defer  from twisted.trial import unittest  from twisted.web._responses import NOT_FOUND +from twisted.web import client  import mock  from leap.common import ca_bundle  from leap.bitmask.keymanager import client  from leap.bitmask.keymanager import errors  from leap.bitmask.keymanager.keys import ( + +from leap.keymanager import errors +from leap.keymanager.keys import (      OpenPGPKey,      is_address,      build_key_from_dict, @@ -222,24 +226,42 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase):          headers = {'Authorization': [str('Token token=%s' % token)]}          headers['Content-Type'] = ['application/x-www-form-urlencoded']          url = '%s/%s/users/%s.json' % ('apiuri', 'apiver', 'myuid') -        km._async_client_pinned.request.assert_called_once_with( +        km._nicknym._async_client_pinned.request.assert_called_once_with(              str(url), 'PUT', body=str(data),              headers=headers          )      def test_fetch_keys_from_server(self):          """ -        Test that the request is well formed when fetching keys from server. +        Test that the request is well formed when fetching keys from server +        with address.          """          km = self._key_manager(url=NICKSERVER_URI)          expected_url = NICKSERVER_URI + '?address=' + ADDRESS_2          def verify_the_call(_): -            used_kwargs = km._async_client_pinned.request.call_args[1] -            km._async_client_pinned.request.assert_called_once_with( +            used_kwargs = km._nicknym._async_client_pinned.request.call_args[1] +            km._nicknym._async_client_pinned.request.assert_called_once_with(                  expected_url, 'GET', **used_kwargs) -        d = self._fetch_key(km, ADDRESS_2, PUBLIC_KEY_2) +        d = self._fetch_key_with_address(km, ADDRESS_2, PUBLIC_KEY_2) +        d.addCallback(verify_the_call) +        return d + +    def test_fetch_keys_from_server_with_fingerprint(self): +        """ +        Test that the request is well formed when fetching keys from server +        with fingerprint. +        """ +        km = self._key_manager(url=NICKSERVER_URI) +        expected_url = NICKSERVER_URI + '?fingerprint=' + KEY_FINGERPRINT + +        def verify_the_call(_): +            used_kwargs = km._nicknym._async_client_pinned.request.call_args[1] +            km._nicknym._async_client_pinned.request.assert_called_once_with( +                expected_url, 'GET', **used_kwargs) + +        d = self._fetch_key_with_fingerprint(km, KEY_FINGERPRINT, PUBLIC_KEY)          d.addCallback(verify_the_call)          return d @@ -254,16 +276,16 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase):              return_value=defer.succeed(None))          url = NICKSERVER_URI + '?address=' + INVALID_MAIL_ADDRESS -        d = km._fetch_and_handle_404_from_nicknym(url, INVALID_MAIL_ADDRESS) +        d = km._nicknym._fetch_and_handle_404_from_nicknym(url)          def check_key_not_found_is_raised_if_404(_): -            used_kwargs = km._async_client_pinned.request.call_args[1] +            used_kwargs = km._nicknym._async_client_pinned.request.call_args[1]              check_404_callback = used_kwargs['callback']              fake_response = mock.Mock()              fake_response.code = NOT_FOUND              with self.assertRaisesRegexp(                      errors.KeyNotFound, -                    '404: %s key not found.' % INVALID_MAIL_ADDRESS): +                    '404: Key not found. Request: %s' % url.replace('?', '\?')):                  check_404_callback(fake_response)          d.addCallback(check_key_not_found_is_raised_if_404) @@ -282,7 +304,7 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase):          def assert_key_not_found_raised(error):              self.assertEqual(error.value, key_not_found_exception) -        d = km._get_key_from_nicknym(INVALID_MAIL_ADDRESS) +        d = km._nicknym.fetch_key_with_address(INVALID_MAIL_ADDRESS)          d.addErrback(assert_key_not_found_raised)      @defer.inlineCallbacks @@ -292,7 +314,7 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase):          """          km = self._key_manager(url=NICKSERVER_URI) -        key = yield self._fetch_key(km, ADDRESS, PUBLIC_KEY) +        key = yield self._fetch_key_with_address(km, ADDRESS, PUBLIC_KEY)          self.assertIsInstance(key, OpenPGPKey)          self.assertTrue(ADDRESS in key.uids)          self.assertEqual(key.validation, ValidationLevels.Provider_Trust) @@ -304,12 +326,12 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase):          """          km = self._key_manager(url=NICKSERVER_URI) -        key = yield self._fetch_key(km, ADDRESS_OTHER, PUBLIC_KEY_OTHER) +        key = yield self._fetch_key_with_address(km, ADDRESS_OTHER, PUBLIC_KEY_OTHER)          self.assertIsInstance(key, OpenPGPKey)          self.assertTrue(ADDRESS_OTHER in key.uids)          self.assertEqual(key.validation, ValidationLevels.Weak_Chain) -    def _fetch_key(self, km, address, key): +    def _fetch_key_with_address(self, km, address, key):          """          :returns: a Deferred that will fire with the OpenPGPKey          """ @@ -328,6 +350,21 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase):          d.addCallback(lambda _: km.get_key(address))          return d +    def _fetch_key_with_fingerprint(self, km, fingerprint, key): +        """ +        :returns: a Deferred that will fire with the OpenPGPKey +        """ +        data = json.dumps({'fingerprint': fingerprint, 'openpgp': key}) + +        client.readBody = Mock(return_value=defer.succeed(data)) + +        # mock the fetcher so it returns the key for KEY_FINGERPRINT +        km._nicknym._async_client_pinned.request = Mock( +            return_value=defer.succeed(None)) +        km.ca_cert_path = 'cacertpath' +        key = km._nicknym.fetch_key_with_fingerprint(fingerprint) +        return key +      @defer.inlineCallbacks      def test_put_key_ascii(self):          """ diff --git a/tests/test_refresher.py b/tests/test_refresher.py new file mode 100644 index 0000000..13a46d4 --- /dev/null +++ b/tests/test_refresher.py @@ -0,0 +1,149 @@ +# -*- coding: utf-8 -*- +# test_refresher.py +# Copyright (C) 2016 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +""" +Tests for refreshing the key directory. +""" + +import logging +from datetime import datetime + +from mock import Mock, patch +from twisted.internet import defer + +from common import KeyManagerWithSoledadTestCase, KEY_FINGERPRINT +from leap.keymanager import openpgp +from leap.keymanager.keys import OpenPGPKey +from leap.keymanager.refresher import RandomRefreshPublicKey, MIN_RANDOM_INTERVAL_RANGE, DEBUG_START_REFRESH, \ +    DEBUG_STOP_REFRESH, ERROR_UNEQUAL_FINGERPRINTS + +ANOTHER_FP = 'ANOTHERFINGERPRINT' + +logger = logging.getLogger(__name__) + + +class RandomRefreshPublicKeyTestCase(KeyManagerWithSoledadTestCase): + +    @defer.inlineCallbacks +    def test_get_random_address(self): +        pgp = openpgp.OpenPGPScheme( +            self._soledad, gpgbinary=self.gpg_binary_path) +        rf = RandomRefreshPublicKey(pgp, self._key_manager()) +        key = OpenPGPKey(address='user@leap.se') +        key_another = OpenPGPKey(address='zara@leap.se') + +        pgp.get_all_keys = Mock(return_value=defer.succeed([key, key_another])) + +        random_key = yield rf._get_random_key() +        self.assertTrue(random_key.address == key.address or random_key.address == key_another.address) + +    @defer.inlineCallbacks +    def test_do_not_throw_error_for_empty_key_dict(self): +        pgp = openpgp.OpenPGPScheme( +            self._soledad, gpgbinary=self.gpg_binary_path) +        rf = RandomRefreshPublicKey(pgp, self._key_manager()) + +        pgp.get_all_keys = Mock(return_value=defer.succeed([])) +        random_address = yield rf._get_random_key() +        self.assertTrue(random_address is None) + +    @defer.inlineCallbacks +    def test_log_error_if_fetch_by_fingerprint_returns_wrong_key(self): +        pgp = openpgp.OpenPGPScheme( +            self._soledad, gpgbinary=self.gpg_binary_path) +        km = self._key_manager() + +        with patch.object(logging.Logger, 'error') as mock_logger_error: +            rf = RandomRefreshPublicKey(pgp, km) +            rf._get_random_key = \ +                Mock(return_value=defer.succeed(OpenPGPKey(fingerprint=KEY_FINGERPRINT))) + +            km._nicknym.fetch_key_with_fingerprint = \ +                Mock(return_value=defer.succeed(OpenPGPKey(fingerprint=ANOTHER_FP))) + +            yield rf.maybe_refresh_key() + +            mock_logger_error.assert_called_with(ERROR_UNEQUAL_FINGERPRINTS % +                                                 (KEY_FINGERPRINT, ANOTHER_FP)) + +    @defer.inlineCallbacks +    def test_put_new_key_in_local_storage(self): +        pgp = openpgp.OpenPGPScheme( +            self._soledad, gpgbinary=self.gpg_binary_path) +        km = self._key_manager() + +        rf = RandomRefreshPublicKey(pgp, km) +        rf._get_random_key = Mock(return_value=defer.succeed(OpenPGPKey(fingerprint=KEY_FINGERPRINT))) + +        km._nicknym.fetch_key_with_fingerprint = \ +            Mock(return_value=defer.succeed(OpenPGPKey(fingerprint=ANOTHER_FP))) + +        yield rf.maybe_refresh_key() + +    @defer.inlineCallbacks +    def test_key_expired_will_be_deactivatet(self): +        pgp = openpgp.OpenPGPScheme( +            self._soledad, gpgbinary=self.gpg_binary_path) +        km = self._key_manager() +        rf = RandomRefreshPublicKey(pgp, km) +        key = OpenPGPKey(address='zara@leap.se', expiry_date=datetime.now()) + +        self.assertTrue(key.address is 'zara@leap.se') + +        km._openpgp.unactivate_key = Mock(return_value=defer.succeed(None)) + +        yield rf._maybe_unactivate_key(key) + +        self.assertTrue(key.address is None) +        self.assertFalse(key.is_active()) + +    def test_start_refreshing(self): +        pgp = openpgp.OpenPGPScheme( +            self._soledad, gpgbinary=self.gpg_binary_path) + +        with patch.object(logging.Logger, 'debug') as mock_logger_start: +            rf = RandomRefreshPublicKey(pgp, self._key_manager()) +            rf.start() +            mock_logger_start.assert_called_with(DEBUG_START_REFRESH) +            rf.stop() +            mock_logger_start.assert_called_with(DEBUG_STOP_REFRESH) + +    def test_random_interval_is_set_properly(self): +        pgp = openpgp.OpenPGPScheme( +            self._soledad, gpgbinary=self.gpg_binary_path) +        rf = RandomRefreshPublicKey(pgp, self._key_manager()) +        self.assertTrue(rf._loop.interval >= MIN_RANDOM_INTERVAL_RANGE) + +    def test_is_random_really_random(self): +        pgp = openpgp.OpenPGPScheme( +            self._soledad, gpgbinary=self.gpg_binary_path) +        rf = RandomRefreshPublicKey(pgp, self._key_manager()) + +        for x in range(0, 5): +            random_numbers = [] + +            for y in range(0, 5): +                random_numbers.append(rf._random_interval_to_refersh()) + +                # there are different numbers in the list +                if len(random_numbers) == len(set(random_numbers)) \ +                        or len(random_numbers) == len(set(random_numbers)) + 1: +                    self.assertTrue(True) +                else: +                    self.assertTrue(False) + + | 
