Implement 'fetch_key' for ascii keys
author Ruben Pollan <meskio@sindominio.net>
Fri, 1 Aug 2014 15:20:27 +0000 (10:20 -0500)
committer Ruben Pollan <meskio@sindominio.net>
Thu, 9 Oct 2014 17:40:56 +0000 (12:40 -0500)
Binary key support is still missing.

changes/feature-5932_fetch_keys_uri [new file with mode: 0644]
src/leap/keymanager/__init__.py
src/leap/keymanager/openpgp.py
src/leap/keymanager/tests/test_keymanager.py

diff --git a/changes/feature-5932_fetch_keys_uri b/changes/feature-5932_fetch_keys_uri
new file mode 100644 (file)
index 0000000..6f628a6
--- /dev/null
@@ -0,0 +1 @@
+- Add 'fetch_key' method to fetch keys from a URI (closes #5932)
diff --git a/src/leap/keymanager/__init__.py b/src/leap/keymanager/__init__.py
index da679ac..c3423d9 100644 (file)
@@ -48,7 +48,7 @@ from leap.common.events import signal
 from leap.common.events import events_pb2 as proto
 from leap.common.decorators import memoized_method
 
-from leap.keymanager.errors import KeyNotFound
+from leap.keymanager.errors import KeyNotFound, KeyAttributesDiffer
 
 from leap.keymanager.keys import (
     EncryptionKey,
@@ -526,6 +526,35 @@ class KeyManager(object):
         except IndexError as e:
             leap_assert(False, "Unsupported key type. Error {0!r}".format(e))
 
+    def fetch_key(self, address, uri, ktype):
+        """
+        Fetch a public key for C{address} from the network and put it in
+        local storage.
+
+        Raises C{leap.keymanager.errors.KeyNotFound} if no valid key is
+        found at C{uri}. Raises C{leap.keymanager.errors.KeyAttributesDiffer}
+        if C{address} doesn't match the address of the fetched key.
+
+        :param address: The email address of the key.
+        :type address: str
+        :param uri: The URI of the key.
+        :type uri: str
+        :param ktype: The type of the key.
+        :type ktype: KeyType
+        """
+        res = self._get(uri)
+        if not res.ok:
+            raise KeyNotFound(uri)
+
+        # XXX only ascii keys are parsed; binary keys are not supported yet
+        pubkey, _ = self._wrapper_map[ktype].parse_ascii_key(res.content)
+        if pubkey is None:
+            raise KeyNotFound(uri)
+        if pubkey.address != address:
+            raise KeyAttributesDiffer("UID %s found, but expected %s"
+                                      % (pubkey.address, address))
+        self.put_key(pubkey)
+
 from ._version import get_versions
 __version__ = get_versions()['version']
 del get_versions
diff --git a/src/leap/keymanager/openpgp.py b/src/leap/keymanager/openpgp.py
index ee37a34..6a825cd 100644 (file)
@@ -327,7 +327,10 @@ class OpenPGPScheme(EncryptionScheme):
                 privkey = gpg.list_keys(secret=True).pop()
             except IndexError:
                 pass
-            pubkey = gpg.list_keys(secret=False).pop()  # unitary keyring
+            try:
+                pubkey = gpg.list_keys(secret=False).pop()  # unitary keyring
+            except IndexError:
+                return (None, None)
 
             # extract adress from first uid on key
             match = re.match(mail_regex, pubkey['uids'].pop())
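diff --git a/src/leap/keymanager/tests/test_keymanager.py b/src/leap/keymanager/tests/test_keymanager.py
index 39b729d..65f8f39 100644 (file)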
@@ -28,6 +28,7 @@ from leap.keymanager import (
     KeyManager,
     openpgp,
     KeyNotFound,
+    KeyAttributesDiffer,
     errors,
 )
 from leap.keymanager.openpgp import OpenPGPKey
@@ -477,6 +478,55 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase):
         self.assertIsInstance(key, OpenPGPKey)
         self.assertEqual(ADDRESS, key.address)
 
+    def test_fetch_uri_ascii_key(self):
+        """
+        Test that fetch_key downloads the ascii key and that the key gets
+        included in the local storage
+        """
+        km = self._key_manager()
+
+        class Response(object):
+            ok = True
+            content = PUBLIC_KEY
+
+        km._fetcher.get = Mock(return_value=Response())
+        km.ca_cert_path = 'cacertpath'
+
+        km.fetch_key(ADDRESS, "http://site.domain/key", OpenPGPKey)
+        key = km.get_key(ADDRESS, OpenPGPKey)
+        self.assertEqual(KEY_FINGERPRINT, key.fingerprint)
+
+    def test_fetch_uri_empty_key(self):
+        """
+        Test that fetch_key raises KeyNotFound if there is no key at the url
+        """
+        km = self._key_manager()
+
+        class Response(object):
+            ok = True
+            content = ""
+
+        km._fetcher.get = Mock(return_value=Response())
+        km.ca_cert_path = 'cacertpath'
+        self.assertRaises(KeyNotFound, km.fetch_key,
+                          ADDRESS, "http://site.domain/key", OpenPGPKey)
+
+    def test_fetch_uri_address_differ(self):
+        """
+        Test that fetch_key raises KeyAttributesDiffer if the address
+        doesn't match
+        """
+        km = self._key_manager()
+
+        class Response(object):
+            ok = True
+            content = PUBLIC_KEY
+
+        km._fetcher.get = Mock(return_value=Response())
+        km.ca_cert_path = 'cacertpath'
+        self.assertRaises(KeyAttributesDiffer, km.fetch_key,
+                          ADDRESS_2, "http://site.domain/key", OpenPGPKey)
+
 
 class KeyManagerCryptoTestCase(KeyManagerWithSoledadTestCase):