summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--keymanager/changes/feature-5932_fetch_keys_uri1
-rw-r--r--keymanager/src/leap/keymanager/__init__.py31
-rw-r--r--keymanager/src/leap/keymanager/openpgp.py5
-rw-r--r--keymanager/src/leap/keymanager/tests/test_keymanager.py50
4 files changed, 85 insertions, 2 deletions
diff --git a/keymanager/changes/feature-5932_fetch_keys_uri b/keymanager/changes/feature-5932_fetch_keys_uri
new file mode 100644
index 0000000..6f628a6
--- /dev/null
+++ b/keymanager/changes/feature-5932_fetch_keys_uri
@@ -0,0 +1 @@
+- Add 'fetch_key' method to fetch keys from a URI (closes #5932)
diff --git a/keymanager/src/leap/keymanager/__init__.py b/keymanager/src/leap/keymanager/__init__.py
index da679ac..c3423d9 100644
--- a/keymanager/src/leap/keymanager/__init__.py
+++ b/keymanager/src/leap/keymanager/__init__.py
@@ -48,7 +48,7 @@ from leap.common.events import signal
from leap.common.events import events_pb2 as proto
from leap.common.decorators import memoized_method
-from leap.keymanager.errors import KeyNotFound
+from leap.keymanager.errors import KeyNotFound, KeyAttributesDiffer
from leap.keymanager.keys import (
EncryptionKey,
@@ -526,6 +526,35 @@ class KeyManager(object):
except IndexError as e:
leap_assert(False, "Unsupported key type. Error {0!r}".format(e))
+ def fetch_key(self, address, uri, ktype):
+ """
+ Fetch a public key for C{address} from the network and put it in
+ local storage.
+
+ Raises C{openpgp.errors.KeyNotFound} if not valid key on C{uri}.
+ Raises C{openpgp.errors.KeyAttributesDiffer} if address don't match
+ any uid on the key.
+
+ :param address: The email address of the key.
+ :type address: str
+ :param uri: The URI of the key.
+ :type uri: str
+ :param ktype: The type of the key.
+ :type ktype: KeyType
+ """
+ res = self._get(uri)
+ if not res.ok:
+ raise KeyNotFound(uri)
+
+ # XXX parse binary keys
+ pubkey, _ = self._wrapper_map[ktype].parse_ascii_key(res.content)
+ if pubkey is None:
+ raise KeyNotFound(uri)
+ if pubkey.address != address:
+ raise KeyAttributesDiffer("UID %s found, but expected %s"
+ % (pubkey.address, address))
+ self.put_key(pubkey)
+
from ._version import get_versions
__version__ = get_versions()['version']
del get_versions
diff --git a/keymanager/src/leap/keymanager/openpgp.py b/keymanager/src/leap/keymanager/openpgp.py
index ee37a34..6a825cd 100644
--- a/keymanager/src/leap/keymanager/openpgp.py
+++ b/keymanager/src/leap/keymanager/openpgp.py
@@ -327,7 +327,10 @@ class OpenPGPScheme(EncryptionScheme):
privkey = gpg.list_keys(secret=True).pop()
except IndexError:
pass
- pubkey = gpg.list_keys(secret=False).pop() # unitary keyring
+ try:
+ pubkey = gpg.list_keys(secret=False).pop() # unitary keyring
+ except IndexError:
+ return (None, None)
# extract adress from first uid on key
match = re.match(mail_regex, pubkey['uids'].pop())
diff --git a/keymanager/src/leap/keymanager/tests/test_keymanager.py b/keymanager/src/leap/keymanager/tests/test_keymanager.py
index 39b729d..65f8f39 100644
--- a/keymanager/src/leap/keymanager/tests/test_keymanager.py
+++ b/keymanager/src/leap/keymanager/tests/test_keymanager.py
@@ -28,6 +28,7 @@ from leap.keymanager import (
KeyManager,
openpgp,
KeyNotFound,
+ KeyAttributesDiffer,
errors,
)
from leap.keymanager.openpgp import OpenPGPKey
@@ -477,6 +478,55 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase):
self.assertIsInstance(key, OpenPGPKey)
self.assertEqual(ADDRESS, key.address)
+ def test_fetch_uri_ascii_key(self):
+ """
+ Test that fetch key downloads the ascii key and gets included in
+ the local storage
+ """
+ km = self._key_manager()
+
+ class Response(object):
+ ok = True
+ content = PUBLIC_KEY
+
+ km._fetcher.get = Mock(return_value=Response())
+ km.ca_cert_path = 'cacertpath'
+
+ km.fetch_key(ADDRESS, "http://site.domain/key", OpenPGPKey)
+ key = km.get_key(ADDRESS, OpenPGPKey)
+ self.assertEqual(KEY_FINGERPRINT, key.fingerprint)
+
+ def test_fetch_uri_empty_key(self):
+ """
+ Test that fetch key raises KeyNotFound if no key in the url
+ """
+ km = self._key_manager()
+
+ class Response(object):
+ ok = True
+ content = ""
+
+ km._fetcher.get = Mock(return_value=Response())
+ km.ca_cert_path = 'cacertpath'
+ self.assertRaises(KeyNotFound, km.fetch_key,
+ ADDRESS, "http://site.domain/key", OpenPGPKey)
+
+ def test_fetch_uri_address_differ(self):
+ """
+ Test that fetch key raises KeyAttributesDiffer if the address
+ don't match
+ """
+ km = self._key_manager()
+
+ class Response(object):
+ ok = True
+ content = PUBLIC_KEY
+
+ km._fetcher.get = Mock(return_value=Response())
+ km.ca_cert_path = 'cacertpath'
+ self.assertRaises(KeyAttributesDiffer, km.fetch_key,
+ ADDRESS_2, "http://site.domain/key", OpenPGPKey)
+
class KeyManagerCryptoTestCase(KeyManagerWithSoledadTestCase):