diff options
| -rw-r--r-- | changes/feature-5932_fetch_keys_uri | 1 | ||||
| -rw-r--r-- | src/leap/keymanager/__init__.py | 31 | ||||
| -rw-r--r-- | src/leap/keymanager/openpgp.py | 5 | ||||
| -rw-r--r-- | src/leap/keymanager/tests/test_keymanager.py | 50 | 
4 files changed, 85 insertions, 2 deletions
diff --git a/changes/feature-5932_fetch_keys_uri b/changes/feature-5932_fetch_keys_uri new file mode 100644 index 0000000..6f628a6 --- /dev/null +++ b/changes/feature-5932_fetch_keys_uri @@ -0,0 +1 @@ +- Add 'fetch_key' method to fetch keys from a URI (closes #5932) diff --git a/src/leap/keymanager/__init__.py b/src/leap/keymanager/__init__.py index da679ac..c3423d9 100644 --- a/src/leap/keymanager/__init__.py +++ b/src/leap/keymanager/__init__.py @@ -48,7 +48,7 @@ from leap.common.events import signal  from leap.common.events import events_pb2 as proto  from leap.common.decorators import memoized_method -from leap.keymanager.errors import KeyNotFound +from leap.keymanager.errors import KeyNotFound, KeyAttributesDiffer  from leap.keymanager.keys import (      EncryptionKey, @@ -526,6 +526,35 @@ class KeyManager(object):          except IndexError as e:              leap_assert(False, "Unsupported key type. Error {0!r}".format(e)) +    def fetch_key(self, address, uri, ktype): +        """ +        Fetch a public key for C{address} from the network and put it in +        local storage. + +        Raises C{openpgp.errors.KeyNotFound} if not valid key on C{uri}. +        Raises C{openpgp.errors.KeyAttributesDiffer} if address don't match +        any uid on the key. + +        :param address: The email address of the key. +        :type address: str +        :param uri: The URI of the key. +        :type uri: str +        :param ktype: The type of the key. +        :type ktype: KeyType +        """ +        res = self._get(uri) +        if not res.ok: +            raise KeyNotFound(uri) + +        # XXX parse binary keys +        pubkey, _ = self._wrapper_map[ktype].parse_ascii_key(res.content) +        if pubkey is None: +            raise KeyNotFound(uri) +        if pubkey.address != address: +            raise KeyAttributesDiffer("UID %s found, but expected %s" +                                      % (pubkey.address, address)) +        self.put_key(pubkey) +  from ._version import get_versions  __version__ = get_versions()['version']  del get_versions diff --git a/src/leap/keymanager/openpgp.py b/src/leap/keymanager/openpgp.py index ee37a34..6a825cd 100644 --- a/src/leap/keymanager/openpgp.py +++ b/src/leap/keymanager/openpgp.py @@ -327,7 +327,10 @@ class OpenPGPScheme(EncryptionScheme):                  privkey = gpg.list_keys(secret=True).pop()              except IndexError:                  pass -            pubkey = gpg.list_keys(secret=False).pop()  # unitary keyring +            try: +                pubkey = gpg.list_keys(secret=False).pop()  # unitary keyring +            except IndexError: +                return (None, None)              # extract adress from first uid on key              match = re.match(mail_regex, pubkey['uids'].pop()) diff --git a/src/leap/keymanager/tests/test_keymanager.py b/src/leap/keymanager/tests/test_keymanager.py index 39b729d..65f8f39 100644 --- a/src/leap/keymanager/tests/test_keymanager.py +++ b/src/leap/keymanager/tests/test_keymanager.py @@ -28,6 +28,7 @@ from leap.keymanager import (      KeyManager,      openpgp,      KeyNotFound, +    KeyAttributesDiffer,      errors,  )  from leap.keymanager.openpgp import OpenPGPKey @@ -477,6 +478,55 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase):          self.assertIsInstance(key, OpenPGPKey)          self.assertEqual(ADDRESS, key.address) +    def test_fetch_uri_ascii_key(self): +        """ +        Test that fetch key downloads the ascii key and gets included in +        the local storage +        """ +        km = self._key_manager() + +        class Response(object): +            ok = True +            content = PUBLIC_KEY + +        km._fetcher.get = Mock(return_value=Response()) +        km.ca_cert_path = 'cacertpath' + +        km.fetch_key(ADDRESS, "http://site.domain/key", OpenPGPKey) +        key = km.get_key(ADDRESS, OpenPGPKey) +        self.assertEqual(KEY_FINGERPRINT, key.fingerprint) + +    def test_fetch_uri_empty_key(self): +        """ +        Test that fetch key raises KeyNotFound if no key in the url +        """ +        km = self._key_manager() + +        class Response(object): +            ok = True +            content = "" + +        km._fetcher.get = Mock(return_value=Response()) +        km.ca_cert_path = 'cacertpath' +        self.assertRaises(KeyNotFound, km.fetch_key, +                          ADDRESS, "http://site.domain/key", OpenPGPKey) + +    def test_fetch_uri_address_differ(self): +        """ +        Test that fetch key raises KeyAttributesDiffer if the address +        don't match +        """ +        km = self._key_manager() + +        class Response(object): +            ok = True +            content = PUBLIC_KEY + +        km._fetcher.get = Mock(return_value=Response()) +        km.ca_cert_path = 'cacertpath' +        self.assertRaises(KeyAttributesDiffer, km.fetch_key, +                          ADDRESS_2, "http://site.domain/key", OpenPGPKey) +  class KeyManagerCryptoTestCase(KeyManagerWithSoledadTestCase):  | 
