summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--changes/feature-6815_nicknym_validation_level1
-rw-r--r--src/leap/keymanager/__init__.py40
-rw-r--r--src/leap/keymanager/tests/test_keymanager.py96
3 files changed, 110 insertions, 27 deletions
diff --git a/changes/feature-6815_nicknym_validation_level b/changes/feature-6815_nicknym_validation_level
new file mode 100644
index 00000000..c82f7d46
--- /dev/null
+++ b/changes/feature-6815_nicknym_validation_level
@@ -0,0 +1 @@
+- fetched keys from other domain than its provider are set as 'Weak Chain' validation level (Closes: #6815)
diff --git a/src/leap/keymanager/__init__.py b/src/leap/keymanager/__init__.py
index 3ef63505..f7b19747 100644
--- a/src/leap/keymanager/__init__.py
+++ b/src/leap/keymanager/__init__.py
@@ -44,6 +44,7 @@ import logging
import requests
from twisted.internet import defer
+from urlparse import urlparse
from leap.common.check import leap_assert
from leap.common.events import signal
@@ -219,13 +220,21 @@ class KeyManager(object):
res = self._get(self._nickserver_uri, {'address': address})
res.raise_for_status()
server_keys = res.json()
+
# insert keys in local database
if self.OPENPGP_KEY in server_keys:
+ # nicknym server is authoritative for its own domain,
+ # for other domains the key might come from key servers.
+ validation_level = ValidationLevel.Weak_Chain
+ _, domain = _split_email(address)
+ if (domain == _get_domain(self._nickserver_uri)):
+ validation_level = ValidationLevel.Provider_Trust
+
d = self.put_raw_key(
server_keys['openpgp'],
OpenPGPKey,
address=address,
- validation=ValidationLevel.Provider_Trust)
+ validation=validation_level)
except requests.exceptions.HTTPError as e:
if e.response.status_code == 404:
d = defer.fail(KeyNotFound(address))
@@ -786,6 +795,35 @@ class KeyManager(object):
if ktype not in self._wrapper_map:
raise UnsupportedKeyTypeError(str(ktype))
+
+def _split_email(address):
+ """
+ Split username and domain from an email address
+
+ :param address: an email address
+ :type address: str
+
+ :return: username and domain from the email address
+ :rtype: (str, str)
+ """
+ if address.count("@") != 1:
+ return None
+ return address.split("@")
+
+
+def _get_domain(url):
+ """
+ Get the domain from an url
+
+ :param url: an url
+ :type url: str
+
+ :return: the domain part of the url
+ :rtype: str
+ """
+ return urlparse(url).hostname
+
+
from ._version import get_versions
__version__ = get_versions()['version']
del get_versions
diff --git a/src/leap/keymanager/tests/test_keymanager.py b/src/leap/keymanager/tests/test_keymanager.py
index 93bc42c7..55f892e9 100644
--- a/src/leap/keymanager/tests/test_keymanager.py
+++ b/src/leap/keymanager/tests/test_keymanager.py
@@ -52,6 +52,9 @@ from leap.keymanager.tests import (
)
+NICKSERVER_URI = "http://leap.se/"
+
+
class KeyManagerUtilTestCase(unittest.TestCase):
def test_is_address(self):
@@ -201,31 +204,16 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase):
"""
Test that the request is well formed when fetching keys from server.
"""
- km = self._key_manager(url='http://nickserver.domain')
-
- class Response(object):
- status_code = 200
- headers = {'content-type': 'application/json'}
-
- def json(self):
- return {'address': ADDRESS_2, 'openpgp': PUBLIC_KEY_2}
-
- def raise_for_status(self):
- pass
-
- # mock the fetcher so it returns the key for ADDRESS_2
- km._fetcher.get = Mock(
- return_value=Response())
- km.ca_cert_path = 'cacertpath'
+ km = self._key_manager(url=NICKSERVER_URI)
def verify_the_call(_):
km._fetcher.get.assert_called_once_with(
- 'http://nickserver.domain',
+ NICKSERVER_URI,
data={'address': ADDRESS_2},
verify='cacertpath',
)
- d = km._fetch_keys_from_server(ADDRESS_2)
+ d = self._fetch_key(km, ADDRESS_2, PUBLIC_KEY_2)
d.addCallback(verify_the_call)
return d
@@ -234,14 +222,35 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase):
"""
Test that getting a key successfuly fetches from server.
"""
- km = self._key_manager(url='http://nickserver.domain')
+ km = self._key_manager(url=NICKSERVER_URI)
+
+ key = yield self._fetch_key(km, ADDRESS, PUBLIC_KEY)
+ self.assertIsInstance(key, OpenPGPKey)
+ self.assertTrue(ADDRESS in key.address)
+ self.assertEqual(key.validation, ValidationLevel.Provider_Trust)
+
+ @inlineCallbacks
+ def test_get_key_fetches_other_domain(self):
+ """
+ Test that getting a key successfuly fetches from server.
+ """
+ km = self._key_manager(url=NICKSERVER_URI)
+
+ key = yield self._fetch_key(km, ADDRESS_OTHER, PUBLIC_KEY_OTHER)
+ self.assertIsInstance(key, OpenPGPKey)
+ self.assertTrue(ADDRESS_OTHER in key.address)
+ self.assertEqual(key.validation, ValidationLevel.Weak_Chain)
+ def _fetch_key(self, km, address, key):
+ """
+ :returns: a Deferred that will fire with the OpenPGPKey
+ """
class Response(object):
status_code = 200
headers = {'content-type': 'application/json'}
def json(self):
- return {'address': ADDRESS, 'openpgp': PUBLIC_KEY}
+ return {'address': address, 'openpgp': key}
def raise_for_status(self):
pass
@@ -250,19 +259,18 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase):
km._fetcher.get = Mock(return_value=Response())
km.ca_cert_path = 'cacertpath'
# try to key get without fetching from server
- d = km.get_key(ADDRESS, OpenPGPKey, fetch_remote=False)
- yield self.assertFailure(d, KeyNotFound)
+ d_fail = km.get_key(address, OpenPGPKey, fetch_remote=False)
+ d = self.assertFailure(d_fail, KeyNotFound)
# try to get key fetching from server.
- key = yield km.get_key(ADDRESS, OpenPGPKey)
- self.assertIsInstance(key, OpenPGPKey)
- self.assertTrue(ADDRESS in key.address)
+ d.addCallback(lambda _: km.get_key(address, OpenPGPKey))
+ return d
@inlineCallbacks
def test_put_key_ascii(self):
"""
Test that putting ascii key works
"""
- km = self._key_manager(url='http://nickserver.domain')
+ km = self._key_manager(url=NICKSERVER_URI)
yield km.put_raw_key(PUBLIC_KEY, OpenPGPKey, ADDRESS)
key = yield km.get_key(ADDRESS, OpenPGPKey)
@@ -387,3 +395,39 @@ class KeyManagerCryptoTestCase(KeyManagerWithSoledadTestCase):
import unittest
if __name__ == "__main__":
unittest.main()
+
+# key 0F91B402: someone@somedomain.org
+# 9420 EC7B 6DCB 867F 5592 E6D1 7504 C974 0F91 B402
+ADDRESS_OTHER = "someone@somedomain.org"
+PUBLIC_KEY_OTHER = """
+-----BEGIN PGP PUBLIC KEY BLOCK-----
+Version: GnuPG v1
+
+mQENBFUZFLwBCADRzTstykRAV3aWysLAV4O3DXdpXhV3Cww8Pfc6m1bVxAT2ifcL
+kLWEaIkOB48SYIHbYzqOi1/h5abJf+5n4uhaIks+FsjsXYo1XOiYpVCNf7+xLnUM
+jkmglKT5sASr61QDcFMqWfGTJ8iUTNVCJZ2k14QJ4Vss/ntnV9uB7Ef7wU7RZvxr
+wINH/0LfKPsGE9l2qNpKUAAmg2bHn9YdsHj1sqlW7eZpwvefYrQej4KBaL2oq3vt
+QQOdXGFqWYMe3cX+bQ1DAMG3ttTF6EGkY97BK7A18I/RJiLujWCEAkMzFr5SK9KU
+AOMj6MpjfTOE+GfUKsu7/gGt42eMBFsIOvsZABEBAAG0IFNvbWVvbmUgPHNvbWVv
+bmVAc29tZWRvbWFpbi5vcmc+iQE4BBMBAgAiBQJVGRS8AhsDBgsJCAcDAgYVCAIJ
+CgsEFgIDAQIeAQIXgAAKCRB1BMl0D5G0AlFsCAC33LhxBRwO64T6DgTb4/39aLpi
+9T3yAmXBAHC7Q+4f37IBX5fJBRKu4Lvfp6KherOl/I/Jj34yv8pm0j+kXeWktfxZ
+cW+mv2vjBHQVopiUSyMVh7caFSq9sKm+oQdo6oIl9DHSARegbkCn2+0b4VxgJpyj
+TZBMyUMD2AayivQU4QHOM3KCozhLNNDbpKy7LH0MSAUDmRaJsPk1zK15lQocK/7R
+Z5yF4rdrdzDWrVucZJc09yntSqTGECue3W2GBCaBlb/O1c9xei4MTb4nSHS5Gp/7
+hcjrvIrgPpehndk8ZRREN/Y8uk1W5fbWzx+5z8g31RCGWBQw4NAnG10NZ3oEuQEN
+BFUZFLwBCADocYZmLu1iXIE6gKqniR6Z8UDC5XnqgK+BEJwi1abe9zWhjgKeW9Vv
+u1i194wuCUiNkP/bMvwMBZLTslDzqxl32ETk9FvB3kWy80S8MDjQJ15IN4I622fq
+MEWwtQ0WrRay9VV6M8H2mIf71/1d5T9ysWK4XRyv+N7eRhfg7T2uhrpNyKdCZzjq
+2wlgpVkMY7gtxTqJseM+qS5UNiReGxtoOXFLzzmagFgbqK88eMeZJZt8yKf81xhP
+SWLTxaVaeBEAlajvEkxZJrrDQuc+maTwtMxmNUe815wJnpcRF8VD91GUpSLAN6EC
+1QuJUl6Lc2o2tcHeo6CGsDZ96o0J8pFhABEBAAGJAR8EGAECAAkFAlUZFLwCGwwA
+CgkQdQTJdA+RtAKcdwgApzHPhwwaZ9TBjgOytke/hPE0ht/EJ5nRiIda2PucoPh6
+DwnaI8nvmGXUfC4qFy6LM8/fJHof1BqLnMbx8MCLurnm5z30q8RhLE3YWM11zuMy
+6wkHGmi/6S1G4okC+Uu8AA4K//HBo8bLcqGVWRnFAmCqy6VMAofsQvmM7vHbRj56
+U919Bki/7I6kcxPEzO73Umh3o82VP/Hz3JMigRNBRfG3jPrX04RLJj3Ib5lhQIDw
+XrO8VHz9foOpY+rJnWj+6QAozxorzZYShu6H0GR1nIuqWMwli1nrx6BeIJAVz5cg
+QzEd9yAN+81fkIBaa6Y8LCBxV03JCc2J4eCUKXd1gg==
+=gDzy
+-----END PGP PUBLIC KEY BLOCK-----
+"""