[feat] set fetched keys as Weak Chain if they are not from the same domain
authorRuben Pollan <meskio@sindominio.net>
Mon, 30 Mar 2015 09:25:21 +0000 (11:25 +0200)
committerRuben Pollan <meskio@sindominio.net>
Mon, 30 Mar 2015 20:58:43 +0000 (22:58 +0200)
The nicknym server is authoritative for its own domain, but for other
domains it might retrieve keys from key servers. For keys whose address is
in the server's own domain we set the validation level to 'Provider Trust'.
For any other domain we set it to 'Weak Chain', as the key's source is unknown.

Resolves: #6815
Related: #6718
Releases: 0.4.0

changes/feature-6815_nicknym_validation_level [new file with mode: 0644]
src/leap/keymanager/__init__.py
src/leap/keymanager/tests/test_keymanager.py

diff --git a/changes/feature-6815_nicknym_validation_level b/changes/feature-6815_nicknym_validation_level
new file mode 100644 (file)
index 0000000..c82f7d4
--- /dev/null
@@ -0,0 +1 @@
+-  Keys fetched for a domain other than the provider's own are set to the 'Weak Chain' validation level (Closes: #6815)
index 3ef6350..f7b1974 100644 (file)
@@ -44,6 +44,7 @@ import logging
 import requests
 
 from twisted.internet import defer
+from urlparse import urlparse
 
 from leap.common.check import leap_assert
 from leap.common.events import signal
@@ -219,13 +220,21 @@ class KeyManager(object):
             res = self._get(self._nickserver_uri, {'address': address})
             res.raise_for_status()
             server_keys = res.json()
+
             # insert keys in local database
             if self.OPENPGP_KEY in server_keys:
+                # nicknym server is authoritative for its own domain,
+                # for other domains the key might come from key servers.
+                validation_level = ValidationLevel.Weak_Chain
+                _, domain = _split_email(address)
+                if (domain == _get_domain(self._nickserver_uri)):
+                    validation_level = ValidationLevel.Provider_Trust
+
                 d = self.put_raw_key(
                     server_keys['openpgp'],
                     OpenPGPKey,
                     address=address,
-                    validation=ValidationLevel.Provider_Trust)
+                    validation=validation_level)
         except requests.exceptions.HTTPError as e:
             if e.response.status_code == 404:
                 d = defer.fail(KeyNotFound(address))
@@ -786,6 +795,35 @@ class KeyManager(object):
         if ktype not in self._wrapper_map:
             raise UnsupportedKeyTypeError(str(ktype))
 
+
+def _split_email(address):
+    """
+    Split username and domain from an email address
+
+    :param address: an email address
+    :type address: str
+
+    :return: [username, domain], or None unless address has exactly one "@"
+    :rtype: list of str or None
+    """
+    if address.count("@") != 1:
+        return None
+    return address.split("@")
+
+
+def _get_domain(url):
+    """
+    Get the domain (hostname) from a URL
+
+    :param url: a URL
+    :type url: str
+
+    :return: the hostname part of the URL
+    :rtype: str
+    """
+    return urlparse(url).hostname
+
+
 from ._version import get_versions
 __version__ = get_versions()['version']
 del get_versions
index 93bc42c..55f892e 100644 (file)
@@ -52,6 +52,9 @@ from leap.keymanager.tests import (
 )
 
 
+NICKSERVER_URI = "http://leap.se/"
+
+
 class KeyManagerUtilTestCase(unittest.TestCase):
 
     def test_is_address(self):
@@ -201,31 +204,16 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase):
         """
         Test that the request is well formed when fetching keys from server.
         """
-        km = self._key_manager(url='http://nickserver.domain')
-
-        class Response(object):
-            status_code = 200
-            headers = {'content-type': 'application/json'}
-
-            def json(self):
-                return {'address': ADDRESS_2, 'openpgp': PUBLIC_KEY_2}
-
-            def raise_for_status(self):
-                pass
-
-        # mock the fetcher so it returns the key for ADDRESS_2
-        km._fetcher.get = Mock(
-            return_value=Response())
-        km.ca_cert_path = 'cacertpath'
+        km = self._key_manager(url=NICKSERVER_URI)
 
         def verify_the_call(_):
             km._fetcher.get.assert_called_once_with(
-                'http://nickserver.domain',
+                NICKSERVER_URI,
                 data={'address': ADDRESS_2},
                 verify='cacertpath',
             )
 
-        d = km._fetch_keys_from_server(ADDRESS_2)
+        d = self._fetch_key(km, ADDRESS_2, PUBLIC_KEY_2)
         d.addCallback(verify_the_call)
         return d
 
@@ -234,14 +222,35 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase):
         """
         Test that getting a key successfuly fetches from server.
         """
-        km = self._key_manager(url='http://nickserver.domain')
+        km = self._key_manager(url=NICKSERVER_URI)
+
+        key = yield self._fetch_key(km, ADDRESS, PUBLIC_KEY)
+        self.assertIsInstance(key, OpenPGPKey)
+        self.assertTrue(ADDRESS in key.address)
+        self.assertEqual(key.validation, ValidationLevel.Provider_Trust)
+
+    @inlineCallbacks
+    def test_get_key_fetches_other_domain(self):
+        """
+        Test that a key fetched for another domain is set as Weak Chain.
+        """
+        km = self._key_manager(url=NICKSERVER_URI)
+
+        key = yield self._fetch_key(km, ADDRESS_OTHER, PUBLIC_KEY_OTHER)
+        self.assertIsInstance(key, OpenPGPKey)
+        self.assertTrue(ADDRESS_OTHER in key.address)
+        self.assertEqual(key.validation, ValidationLevel.Weak_Chain)
 
+    def _fetch_key(self, km, address, key):
+        """
+        :returns: a Deferred that will fire with the OpenPGPKey
+        """
         class Response(object):
             status_code = 200
             headers = {'content-type': 'application/json'}
 
             def json(self):
-                return {'address': ADDRESS, 'openpgp': PUBLIC_KEY}
+                return {'address': address, 'openpgp': key}
 
             def raise_for_status(self):
                 pass
@@ -250,19 +259,18 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase):
         km._fetcher.get = Mock(return_value=Response())
         km.ca_cert_path = 'cacertpath'
         # try to key get without fetching from server
-        d = km.get_key(ADDRESS, OpenPGPKey, fetch_remote=False)
-        yield self.assertFailure(d, KeyNotFound)
+        d_fail = km.get_key(address, OpenPGPKey, fetch_remote=False)
+        d = self.assertFailure(d_fail, KeyNotFound)
         # try to get key fetching from server.
-        key = yield km.get_key(ADDRESS, OpenPGPKey)
-        self.assertIsInstance(key, OpenPGPKey)
-        self.assertTrue(ADDRESS in key.address)
+        d.addCallback(lambda _: km.get_key(address, OpenPGPKey))
+        return d
 
     @inlineCallbacks
     def test_put_key_ascii(self):
         """
         Test that putting ascii key works
         """
-        km = self._key_manager(url='http://nickserver.domain')
+        km = self._key_manager(url=NICKSERVER_URI)
 
         yield km.put_raw_key(PUBLIC_KEY, OpenPGPKey, ADDRESS)
         key = yield km.get_key(ADDRESS, OpenPGPKey)
@@ -387,3 +395,39 @@ class KeyManagerCryptoTestCase(KeyManagerWithSoledadTestCase):
 import unittest
 if __name__ == "__main__":
     unittest.main()
+
+# key 0F91B402: someone@somedomain.org
+# 9420 EC7B 6DCB 867F 5592  E6D1 7504 C974 0F91 B402
+ADDRESS_OTHER = "someone@somedomain.org"
+PUBLIC_KEY_OTHER = """
+-----BEGIN PGP PUBLIC KEY BLOCK-----
+Version: GnuPG v1
+
+mQENBFUZFLwBCADRzTstykRAV3aWysLAV4O3DXdpXhV3Cww8Pfc6m1bVxAT2ifcL
+kLWEaIkOB48SYIHbYzqOi1/h5abJf+5n4uhaIks+FsjsXYo1XOiYpVCNf7+xLnUM
+jkmglKT5sASr61QDcFMqWfGTJ8iUTNVCJZ2k14QJ4Vss/ntnV9uB7Ef7wU7RZvxr
+wINH/0LfKPsGE9l2qNpKUAAmg2bHn9YdsHj1sqlW7eZpwvefYrQej4KBaL2oq3vt
+QQOdXGFqWYMe3cX+bQ1DAMG3ttTF6EGkY97BK7A18I/RJiLujWCEAkMzFr5SK9KU
+AOMj6MpjfTOE+GfUKsu7/gGt42eMBFsIOvsZABEBAAG0IFNvbWVvbmUgPHNvbWVv
+bmVAc29tZWRvbWFpbi5vcmc+iQE4BBMBAgAiBQJVGRS8AhsDBgsJCAcDAgYVCAIJ
+CgsEFgIDAQIeAQIXgAAKCRB1BMl0D5G0AlFsCAC33LhxBRwO64T6DgTb4/39aLpi
+9T3yAmXBAHC7Q+4f37IBX5fJBRKu4Lvfp6KherOl/I/Jj34yv8pm0j+kXeWktfxZ
+cW+mv2vjBHQVopiUSyMVh7caFSq9sKm+oQdo6oIl9DHSARegbkCn2+0b4VxgJpyj
+TZBMyUMD2AayivQU4QHOM3KCozhLNNDbpKy7LH0MSAUDmRaJsPk1zK15lQocK/7R
+Z5yF4rdrdzDWrVucZJc09yntSqTGECue3W2GBCaBlb/O1c9xei4MTb4nSHS5Gp/7
+hcjrvIrgPpehndk8ZRREN/Y8uk1W5fbWzx+5z8g31RCGWBQw4NAnG10NZ3oEuQEN
+BFUZFLwBCADocYZmLu1iXIE6gKqniR6Z8UDC5XnqgK+BEJwi1abe9zWhjgKeW9Vv
+u1i194wuCUiNkP/bMvwMBZLTslDzqxl32ETk9FvB3kWy80S8MDjQJ15IN4I622fq
+MEWwtQ0WrRay9VV6M8H2mIf71/1d5T9ysWK4XRyv+N7eRhfg7T2uhrpNyKdCZzjq
+2wlgpVkMY7gtxTqJseM+qS5UNiReGxtoOXFLzzmagFgbqK88eMeZJZt8yKf81xhP
+SWLTxaVaeBEAlajvEkxZJrrDQuc+maTwtMxmNUe815wJnpcRF8VD91GUpSLAN6EC
+1QuJUl6Lc2o2tcHeo6CGsDZ96o0J8pFhABEBAAGJAR8EGAECAAkFAlUZFLwCGwwA
+CgkQdQTJdA+RtAKcdwgApzHPhwwaZ9TBjgOytke/hPE0ht/EJ5nRiIda2PucoPh6
+DwnaI8nvmGXUfC4qFy6LM8/fJHof1BqLnMbx8MCLurnm5z30q8RhLE3YWM11zuMy
+6wkHGmi/6S1G4okC+Uu8AA4K//HBo8bLcqGVWRnFAmCqy6VMAofsQvmM7vHbRj56
+U919Bki/7I6kcxPEzO73Umh3o82VP/Hz3JMigRNBRfG3jPrX04RLJj3Ib5lhQIDw
+XrO8VHz9foOpY+rJnWj+6QAozxorzZYShu6H0GR1nIuqWMwli1nrx6BeIJAVz5cg
+QzEd9yAN+81fkIBaa6Y8LCBxV03JCc2J4eCUKXd1gg==
+=gDzy
+-----END PGP PUBLIC KEY BLOCK-----
+"""