diff options
| -rw-r--r-- | keymanager/src/leap/keymanager/__init__.py | 33 | ||||
| -rw-r--r-- | keymanager/src/leap/keymanager/tests/test_keymanager.py | 51 | 
2 files changed, 74 insertions, 10 deletions
| diff --git a/keymanager/src/leap/keymanager/__init__.py b/keymanager/src/leap/keymanager/__init__.py index 194a4ee..aa0a9ac 100644 --- a/keymanager/src/leap/keymanager/__init__.py +++ b/keymanager/src/leap/keymanager/__init__.py @@ -26,6 +26,8 @@ import json  import urllib  from leap.common import ca_bundle +from twisted.web import client +from twisted.web._responses import NOT_FOUND  from ._version import get_versions @@ -209,12 +211,12 @@ class KeyManager(object):          """          try:              uri = self._nickserver_uri + '?address=' + address -            content = yield self._async_client_pinned.request(str(uri), 'GET') +            content = yield self._fetch_and_handle_404_from_nicknym(uri, address)              json_content = json.loads(content) + +        except KeyNotFound: +            raise          except IOError as e: -            # FIXME: 404 doesnt raise today, but it wont produce json anyway -            # if e.response.status_code == 404: -                # raise KeyNotFound(address)              logger.warning("HTTP error retrieving key: %r" % (e,))              logger.warning("%s" % (content,))              raise KeyNotFound(e.message), None, sys.exc_info()[2] @@ -232,6 +234,29 @@ class KeyManager(object):          #     'Content-type is not JSON.')          defer.returnValue(json_content) +    def _fetch_and_handle_404_from_nicknym(self, uri, address): +        """ +        Send a GET request to C{uri} containing C{data}. + +        :param uri: The URI of the request. +        :type uri: str +        :param address: The email corresponding to the key. +        :type address: str + +        :return: A deferred that will be fired with GET content as json (dict) +        :rtype: Deferred +        """ +        def check_404(response): +            if response.code == NOT_FOUND: +                message = '%s: %s key not found.' % (response.code, address) +                logger.warning(message) +                raise KeyNotFound(message), None, sys.exc_info()[2] +            return response + +        d = self._async_client_pinned.request(str(uri), 'GET', callback=check_404) +        d.addCallback(client.readBody) +        return d +      @defer.inlineCallbacks      def _get_with_combined_ca_bundle(self, uri, data=None):          """ diff --git a/keymanager/src/leap/keymanager/tests/test_keymanager.py b/keymanager/src/leap/keymanager/tests/test_keymanager.py index 05c1cdd..14f47f6 100644 --- a/keymanager/src/leap/keymanager/tests/test_keymanager.py +++ b/keymanager/src/leap/keymanager/tests/test_keymanager.py @@ -30,6 +30,9 @@ from leap.common import ca_bundle  from mock import Mock, MagicMock, patch  from twisted.internet import defer  from twisted.trial import unittest +from twisted.web._responses import NOT_FOUND + +from leap.keymanager import client  from leap.keymanager import (      KeyNotFound, @@ -56,6 +59,7 @@ from leap.keymanager.tests import (  NICKSERVER_URI = "http://leap.se/"  REMOTE_KEY_URL = "http://site.domain/key" +INVALID_MAIL_ADDRESS = "notexistingemail@example.org"  class KeyManagerUtilTestCase(unittest.TestCase): @@ -229,15 +233,49 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase):          expected_url = NICKSERVER_URI + '?address=' + ADDRESS_2          def verify_the_call(_): -            km._async_client_pinned.request.assert_called_once_with( -                expected_url, -                'GET', -            ) +            used_kwargs = km._async_client_pinned.request.call_args[1] +            km._async_client_pinned.request.assert_called_once_with(expected_url, 'GET', **used_kwargs)          d = self._fetch_key(km, ADDRESS_2, PUBLIC_KEY_2)          d.addCallback(verify_the_call)          return d +    def test_key_not_found_is_raised_if_key_search_responds_404(self): +        """ +        Test if key search request comes back with a 404 response then KeyNotFound is raised, with corresponding error message. +        """ +        km = self._key_manager(url=NICKSERVER_URI) +        client.readBody = Mock(return_value=defer.succeed(None)) +        km._async_client_pinned.request = Mock(return_value=defer.succeed(None)) +        url = NICKSERVER_URI + '?address=' + INVALID_MAIL_ADDRESS + +        d = km._fetch_and_handle_404_from_nicknym(url, INVALID_MAIL_ADDRESS) + +        def check_key_not_found_is_raised_if_404(_): +            used_kwargs = km._async_client_pinned.request.call_args[1] +            check_404_callback = used_kwargs['callback'] +            fake_response = Mock() +            fake_response.code = NOT_FOUND +            with self.assertRaisesRegexp(KeyNotFound, '404: %s key not found.' % INVALID_MAIL_ADDRESS): +                check_404_callback(fake_response) + +        d.addCallback(check_key_not_found_is_raised_if_404) +        return d + +    def test_non_existing_key_from_nicknym_is_relayed(self): +        """ +        Test if key search requests throws KeyNotFound, the same error is raised. +        """ +        km = self._key_manager(url=NICKSERVER_URI) +        key_not_found_exception = KeyNotFound('some message') +        km._async_client_pinned.request = Mock(side_effect=key_not_found_exception) + +        def assert_key_not_found_raised(error): +            self.assertEqual(error.value, key_not_found_exception) + +        d = km._get_key_from_nicknym(INVALID_MAIL_ADDRESS) +        d.addErrback(assert_key_not_found_raised) +      @defer.inlineCallbacks      def test_get_key_fetches_from_server(self):          """ @@ -268,9 +306,10 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase):          """          data = json.dumps({'address': address, 'openpgp': key}) +        client.readBody = Mock(return_value=defer.succeed(data)) +          # mock the fetcher so it returns the key for ADDRESS_2 -        km._async_client_pinned.request = Mock( -            return_value=defer.succeed(data)) +        km._async_client_pinned.request = Mock(return_value=defer.succeed(None))          km.ca_cert_path = 'cacertpath'          # try to key get without fetching from server          d_fail = km.get_key(address, OpenPGPKey, fetch_remote=False) | 
