diff options
Diffstat (limited to 'keymanager')
| -rw-r--r-- | keymanager/changes/feature-6815_nicknym_validation_level | 1 | ||||
| -rw-r--r-- | keymanager/src/leap/keymanager/__init__.py | 40 | ||||
| -rw-r--r-- | keymanager/src/leap/keymanager/tests/test_keymanager.py | 96 | 
3 files changed, 110 insertions, 27 deletions
| diff --git a/keymanager/changes/feature-6815_nicknym_validation_level b/keymanager/changes/feature-6815_nicknym_validation_level new file mode 100644 index 0000000..c82f7d4 --- /dev/null +++ b/keymanager/changes/feature-6815_nicknym_validation_level @@ -0,0 +1 @@ +-  fetched keys from other domain than its provider are set as 'Weak Chain' validation level (Closes: #6815) diff --git a/keymanager/src/leap/keymanager/__init__.py b/keymanager/src/leap/keymanager/__init__.py index 3ef6350..f7b1974 100644 --- a/keymanager/src/leap/keymanager/__init__.py +++ b/keymanager/src/leap/keymanager/__init__.py @@ -44,6 +44,7 @@ import logging  import requests  from twisted.internet import defer +from urlparse import urlparse  from leap.common.check import leap_assert  from leap.common.events import signal @@ -219,13 +220,21 @@ class KeyManager(object):              res = self._get(self._nickserver_uri, {'address': address})              res.raise_for_status()              server_keys = res.json() +              # insert keys in local database              if self.OPENPGP_KEY in server_keys: +                # nicknym server is authoritative for its own domain, +                # for other domains the key might come from key servers. +                validation_level = ValidationLevel.Weak_Chain +                _, domain = _split_email(address) +                if (domain == _get_domain(self._nickserver_uri)): +                    validation_level = ValidationLevel.Provider_Trust +                  d = self.put_raw_key(                      server_keys['openpgp'],                      OpenPGPKey,                      address=address, -                    validation=ValidationLevel.Provider_Trust) +                    validation=validation_level)          except requests.exceptions.HTTPError as e:              if e.response.status_code == 404:                  d = defer.fail(KeyNotFound(address)) @@ -786,6 +795,35 @@ class KeyManager(object):          if ktype not in self._wrapper_map:              raise UnsupportedKeyTypeError(str(ktype)) + +def _split_email(address): +    """ +    Split username and domain from an email address + +    :param address: an email address +    :type address: str + +    :return: username and domain from the email address +    :rtype: (str, str) +    """ +    if address.count("@") != 1: +        return None +    return address.split("@") + + +def _get_domain(url): +    """ +    Get the domain from an url + +    :param url: an url +    :type url: str + +    :return: the domain part of the url +    :rtype: str +    """ +    return urlparse(url).hostname + +  from ._version import get_versions  __version__ = get_versions()['version']  del get_versions diff --git a/keymanager/src/leap/keymanager/tests/test_keymanager.py b/keymanager/src/leap/keymanager/tests/test_keymanager.py index 93bc42c..55f892e 100644 --- a/keymanager/src/leap/keymanager/tests/test_keymanager.py +++ b/keymanager/src/leap/keymanager/tests/test_keymanager.py @@ -52,6 +52,9 @@ from leap.keymanager.tests import (  ) +NICKSERVER_URI = "http://leap.se/" + +  class KeyManagerUtilTestCase(unittest.TestCase):      def test_is_address(self): @@ -201,31 +204,16 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase):          """          Test that the request is well formed when fetching keys from server.          """ -        km = self._key_manager(url='http://nickserver.domain') - -        class Response(object): -            status_code = 200 -            headers = {'content-type': 'application/json'} - -            def json(self): -                return {'address': ADDRESS_2, 'openpgp': PUBLIC_KEY_2} - -            def raise_for_status(self): -                pass - -        # mock the fetcher so it returns the key for ADDRESS_2 -        km._fetcher.get = Mock( -            return_value=Response()) -        km.ca_cert_path = 'cacertpath' +        km = self._key_manager(url=NICKSERVER_URI)          def verify_the_call(_):              km._fetcher.get.assert_called_once_with( -                'http://nickserver.domain', +                NICKSERVER_URI,                  data={'address': ADDRESS_2},                  verify='cacertpath',              ) -        d = km._fetch_keys_from_server(ADDRESS_2) +        d = self._fetch_key(km, ADDRESS_2, PUBLIC_KEY_2)          d.addCallback(verify_the_call)          return d @@ -234,14 +222,35 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase):          """          Test that getting a key successfuly fetches from server.          """ -        km = self._key_manager(url='http://nickserver.domain') +        km = self._key_manager(url=NICKSERVER_URI) + +        key = yield self._fetch_key(km, ADDRESS, PUBLIC_KEY) +        self.assertIsInstance(key, OpenPGPKey) +        self.assertTrue(ADDRESS in key.address) +        self.assertEqual(key.validation, ValidationLevel.Provider_Trust) + +    @inlineCallbacks +    def test_get_key_fetches_other_domain(self): +        """ +        Test that getting a key successfuly fetches from server. +        """ +        km = self._key_manager(url=NICKSERVER_URI) + +        key = yield self._fetch_key(km, ADDRESS_OTHER, PUBLIC_KEY_OTHER) +        self.assertIsInstance(key, OpenPGPKey) +        self.assertTrue(ADDRESS_OTHER in key.address) +        self.assertEqual(key.validation, ValidationLevel.Weak_Chain) +    def _fetch_key(self, km, address, key): +        """ +        :returns: a Deferred that will fire with the OpenPGPKey +        """          class Response(object):              status_code = 200              headers = {'content-type': 'application/json'}              def json(self): -                return {'address': ADDRESS, 'openpgp': PUBLIC_KEY} +                return {'address': address, 'openpgp': key}              def raise_for_status(self):                  pass @@ -250,19 +259,18 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase):          km._fetcher.get = Mock(return_value=Response())          km.ca_cert_path = 'cacertpath'          # try to key get without fetching from server -        d = km.get_key(ADDRESS, OpenPGPKey, fetch_remote=False) -        yield self.assertFailure(d, KeyNotFound) +        d_fail = km.get_key(address, OpenPGPKey, fetch_remote=False) +        d = self.assertFailure(d_fail, KeyNotFound)          # try to get key fetching from server. -        key = yield km.get_key(ADDRESS, OpenPGPKey) -        self.assertIsInstance(key, OpenPGPKey) -        self.assertTrue(ADDRESS in key.address) +        d.addCallback(lambda _: km.get_key(address, OpenPGPKey)) +        return d      @inlineCallbacks      def test_put_key_ascii(self):          """          Test that putting ascii key works          """ -        km = self._key_manager(url='http://nickserver.domain') +        km = self._key_manager(url=NICKSERVER_URI)          yield km.put_raw_key(PUBLIC_KEY, OpenPGPKey, ADDRESS)          key = yield km.get_key(ADDRESS, OpenPGPKey) @@ -387,3 +395,39 @@ class KeyManagerCryptoTestCase(KeyManagerWithSoledadTestCase):  import unittest  if __name__ == "__main__":      unittest.main() + +# key 0F91B402: someone@somedomain.org +# 9420 EC7B 6DCB 867F 5592  E6D1 7504 C974 0F91 B402 +ADDRESS_OTHER = "someone@somedomain.org" +PUBLIC_KEY_OTHER = """ +-----BEGIN PGP PUBLIC KEY BLOCK----- +Version: GnuPG v1 + +mQENBFUZFLwBCADRzTstykRAV3aWysLAV4O3DXdpXhV3Cww8Pfc6m1bVxAT2ifcL +kLWEaIkOB48SYIHbYzqOi1/h5abJf+5n4uhaIks+FsjsXYo1XOiYpVCNf7+xLnUM +jkmglKT5sASr61QDcFMqWfGTJ8iUTNVCJZ2k14QJ4Vss/ntnV9uB7Ef7wU7RZvxr +wINH/0LfKPsGE9l2qNpKUAAmg2bHn9YdsHj1sqlW7eZpwvefYrQej4KBaL2oq3vt +QQOdXGFqWYMe3cX+bQ1DAMG3ttTF6EGkY97BK7A18I/RJiLujWCEAkMzFr5SK9KU +AOMj6MpjfTOE+GfUKsu7/gGt42eMBFsIOvsZABEBAAG0IFNvbWVvbmUgPHNvbWVv +bmVAc29tZWRvbWFpbi5vcmc+iQE4BBMBAgAiBQJVGRS8AhsDBgsJCAcDAgYVCAIJ +CgsEFgIDAQIeAQIXgAAKCRB1BMl0D5G0AlFsCAC33LhxBRwO64T6DgTb4/39aLpi +9T3yAmXBAHC7Q+4f37IBX5fJBRKu4Lvfp6KherOl/I/Jj34yv8pm0j+kXeWktfxZ +cW+mv2vjBHQVopiUSyMVh7caFSq9sKm+oQdo6oIl9DHSARegbkCn2+0b4VxgJpyj +TZBMyUMD2AayivQU4QHOM3KCozhLNNDbpKy7LH0MSAUDmRaJsPk1zK15lQocK/7R +Z5yF4rdrdzDWrVucZJc09yntSqTGECue3W2GBCaBlb/O1c9xei4MTb4nSHS5Gp/7 +hcjrvIrgPpehndk8ZRREN/Y8uk1W5fbWzx+5z8g31RCGWBQw4NAnG10NZ3oEuQEN +BFUZFLwBCADocYZmLu1iXIE6gKqniR6Z8UDC5XnqgK+BEJwi1abe9zWhjgKeW9Vv +u1i194wuCUiNkP/bMvwMBZLTslDzqxl32ETk9FvB3kWy80S8MDjQJ15IN4I622fq +MEWwtQ0WrRay9VV6M8H2mIf71/1d5T9ysWK4XRyv+N7eRhfg7T2uhrpNyKdCZzjq +2wlgpVkMY7gtxTqJseM+qS5UNiReGxtoOXFLzzmagFgbqK88eMeZJZt8yKf81xhP +SWLTxaVaeBEAlajvEkxZJrrDQuc+maTwtMxmNUe815wJnpcRF8VD91GUpSLAN6EC +1QuJUl6Lc2o2tcHeo6CGsDZ96o0J8pFhABEBAAGJAR8EGAECAAkFAlUZFLwCGwwA +CgkQdQTJdA+RtAKcdwgApzHPhwwaZ9TBjgOytke/hPE0ht/EJ5nRiIda2PucoPh6 +DwnaI8nvmGXUfC4qFy6LM8/fJHof1BqLnMbx8MCLurnm5z30q8RhLE3YWM11zuMy +6wkHGmi/6S1G4okC+Uu8AA4K//HBo8bLcqGVWRnFAmCqy6VMAofsQvmM7vHbRj56 +U919Bki/7I6kcxPEzO73Umh3o82VP/Hz3JMigRNBRfG3jPrX04RLJj3Ib5lhQIDw +XrO8VHz9foOpY+rJnWj+6QAozxorzZYShu6H0GR1nIuqWMwli1nrx6BeIJAVz5cg +QzEd9yAN+81fkIBaa6Y8LCBxV03JCc2J4eCUKXd1gg== +=gDzy +-----END PGP PUBLIC KEY BLOCK----- +""" | 
