Add tests for key management remote methods.
author drebs <drebs@leap.se>
Fri, 3 May 2013 01:38:31 +0000 (22:38 -0300)
committer drebs <drebs@leap.se>
Fri, 3 May 2013 01:38:31 +0000 (22:38 -0300)
src/leap/common/keymanager/__init__.py
src/leap/common/keymanager/errors.py
src/leap/common/tests/test_keymanager.py

index 01dc0da..d6dbb8a 100644 (file)
@@ -30,6 +30,7 @@ except ImportError:
 from leap.common.check import leap_assert
 from leap.common.keymanager.errors import (
     KeyNotFound,
+    NoPasswordGiven,
 )
 from leap.common.keymanager.keys import (
     build_key_from_dict,
@@ -51,7 +52,7 @@ INDEXES = {
 
 class KeyManager(object):
 
-    def __init__(self, address, nickserver_url, soledad):
+    def __init__(self, address, nickserver_url, soledad, token=None):
         """
         Initialize a Key Manager for user's C{address} with provider's
         nickserver reachable in C{url}.
@@ -66,11 +67,13 @@ class KeyManager(object):
         self._address = address
         self._nickserver_url = nickserver_url
         self._soledad = soledad
+        self.token = token
         self._wrapper_map = {
             OpenPGPKey: OpenPGPScheme(soledad),
             # other types of key will be added to this mapper.
         }
         self._init_indexes()
+        self._fetcher = requests
 
     #
     # utilities
@@ -109,7 +112,7 @@ class KeyManager(object):
         Make a GET HTTP request and return a dictionary containing the
         response.
         """
-        response = requests.get(self._nickserver_url+path)
+        response = self._fetcher.get(self._nickserver_url+path)
         leap_assert(response.status_code == 200, 'Invalid response.')
         leap_assert(
             response.headers['content-type'].startswith('application/json')
@@ -142,24 +145,27 @@ class KeyManager(object):
             keyserver.
         """
         # prepare the public key bound to address
+        pubkey = self.get_key(
+            self._address, ktype, private=False, fetch_remote=False)
         data = {
             'address': self._address,
             'keys': [
-                json.loads(
-                    self.get_key(
-                        self._address, ktype, private=False).get_json()),
+                json.loads(pubkey.get_json()),
             ]
         }
         # prepare the private key bound to address
         if send_private:
-            privkey = json.loads(
-                self.get_key(self._address, ktype, private=True).get_json())
-            privkey.key_data = encrypt_sym(data, password)
+            if password is None or password == '':
+                raise NoPasswordGiven('Can\'t send unencrypted private keys!')
+            privkey = self.get_key(
+                self._address, ktype, private=True, fetch_remote=False)
+            # the decoded JSON is a plain dict (not a key object), so the
+            # key material has to be read and replaced by item access.
+            privkey = json.loads(privkey.get_json())
+            privkey['key_data'] = encrypt_sym(privkey['key_data'], password)
             data['keys'].append(privkey)
-        requests.put(
+        self._fetcher.put(
             self._nickserver_url + '/key/' + self._address,
             data=data,
-            auth=(self._address, None))  # TODO: replace for token-based auth.
+            auth=(self._address, self._token))
 
     def get_key(self, address, ktype, private=False, fetch_remote=True):
         """
@@ -248,7 +254,8 @@ class KeyManager(object):
         """
         addresses = set(map(
             lambda doc: doc.address,
-            self.get_all_keys_in_local_db(False)))
+            self.get_all_keys_in_local_db(private=False)))
+        # TODO: maybe we should not attempt to refresh our own public key?
         for address in addresses:
             for key in self.fetch_keys_from_server(address):
                 self._wrapper_map[key.__class__].put_key(key)
@@ -264,3 +271,16 @@ class KeyManager(object):
         @rtype: EncryptionKey
         """
         return self._wrapper_map[ktype].gen_key(self._address)
+
+    #
+    # Token setter/getter
+    #
+
+    def _get_token(self):
+        """
+        Return the auth token currently stored in this key manager.
+
+        @return: The value held in C{self._token}.
+        """
+        return self._token
+
+    def _set_token(self, token):
+        """
+        Store a new auth token in this key manager.
+
+        @param token: The value to keep in C{self._token}.
+        """
+        self._token = token
+
+    # public C{token} attribute backed by C{self._token}; reads and writes
+    # go through the two accessors above.
+    token = property(
+        _get_token, _set_token, doc='The auth token.')
index add6a38..1cf506e 100644 (file)
@@ -38,3 +38,9 @@ class KeyAttributesDiffer(Exception):
     Raised when trying to delete a key but the stored key differs from the key
     passed to the delete_key() method.
     """
+
+class NoPasswordGiven(Exception):
+    """
+    Raised when an action that requires a password is attempted without
+    providing one.
+    """
index 32bd1fd..1d7a382 100644 (file)
@@ -21,6 +21,13 @@ Tests for the Key Manager.
 """
 
 
+import mock
+try:
+    import simplejson as json
+except ImportError:
+    import json  # noqa
+
+
 from leap.common.testing.basetest import BaseLeapTest
 from leap.soledad import Soledad
 from leap.soledad.crypto import SoledadCrypto
@@ -30,6 +37,7 @@ from leap.common.keymanager import (
     KeyManager,
     openpgp,
     KeyNotFound,
+    NoPasswordGiven,
     TAGS_INDEX,
     TAGS_AND_PRIVATE_INDEX,
 )
@@ -42,6 +50,9 @@ from leap.common.keymanager.keys import (
 from leap.common.keymanager import errors
 
 
+ADDRESS = 'leap@leap.se'
+
+
 class KeyManagerUtilTestCase(BaseLeapTest):
 
     def setUp(self):
@@ -66,7 +77,7 @@ class KeyManagerUtilTestCase(BaseLeapTest):
 
     def test_build_key_from_dict(self):
         kdict = {
-            'address': 'leap@leap.se',
+            'address': ADDRESS,
             'key_id': 'key_id',
             'fingerprint': 'fingerprint',
             'key_data': 'key_data',
@@ -77,7 +88,7 @@ class KeyManagerUtilTestCase(BaseLeapTest):
             'last_audited_at': 'last_audited_at',
             'validation': 'validation',
         }
-        key = build_key_from_dict(OpenPGPKey, 'leap@leap.se', kdict)
+        key = build_key_from_dict(OpenPGPKey, ADDRESS, kdict)
         self.assertEqual(
             kdict['address'], key.address,
             'Wrong data in key.')
@@ -111,9 +122,9 @@ class KeyManagerUtilTestCase(BaseLeapTest):
 
     def test_keymanager_doc_id(self):
         doc_id1 = keymanager_doc_id(
-            OpenPGPKey, 'leap@leap.se', private=False)
+            OpenPGPKey, ADDRESS, private=False)
         doc_id2 = keymanager_doc_id(
-            OpenPGPKey, 'leap@leap.se', private=True)
+            OpenPGPKey, ADDRESS, private=True)
         doc_id3 = keymanager_doc_id(
             OpenPGPKey, 'user@leap.se', private=False)
         doc_id4 = keymanager_doc_id(
@@ -134,6 +145,8 @@ class KeyManagerWithSoledadTestCase(BaseLeapTest):
             "123456",
             secret_path=self.tempdir+"/secret.gpg",
             local_db_path=self.tempdir+"/soledad.u1db",
+            server_url='',
+            cert_file=None,
             bootstrap=False,
         )
         # initialize solead by hand for testing purposes
@@ -144,7 +157,14 @@ class KeyManagerWithSoledadTestCase(BaseLeapTest):
         self._soledad._init_db()
 
     def tearDown(self):
-        pass
+        km = self._key_manager()
+        for key in km.get_all_keys_in_local_db():
+            km._wrapper_map[key.__class__].delete_key(key)
+        for key in km.get_all_keys_in_local_db(private=True):
+            km._wrapper_map[key.__class__].delete_key(key)
+
+    def _key_manager(self, user=ADDRESS, url=''):
+        """
+        Build a KeyManager that uses this test case's Soledad instance.
+
+        @param user: The address the key manager is created for.
+        @param url: The nickserver url handed to the key manager.
+        @return: A newly created KeyManager.
+        """
+        return KeyManager(user, url, self._soledad)
 
 
 class OpenPGPCryptoTestCase(KeyManagerWithSoledadTestCase):
@@ -161,43 +181,43 @@ class OpenPGPCryptoTestCase(KeyManagerWithSoledadTestCase):
 
     def test_openpgp_put_delete_key(self):
         pgp = openpgp.OpenPGPScheme(self._soledad)
-        self.assertRaises(KeyNotFound, pgp.get_key, 'leap@leap.se')
+        self.assertRaises(KeyNotFound, pgp.get_key, ADDRESS)
         pgp.put_key_raw(PUBLIC_KEY)
-        key = pgp.get_key('leap@leap.se', private=False)
+        key = pgp.get_key(ADDRESS, private=False)
         pgp.delete_key(key)
-        self.assertRaises(KeyNotFound, pgp.get_key, 'leap@leap.se')
+        self.assertRaises(KeyNotFound, pgp.get_key, ADDRESS)
 
     def test_openpgp_put_key_raw(self):
         pgp = openpgp.OpenPGPScheme(self._soledad)
-        self.assertRaises(KeyNotFound, pgp.get_key, 'leap@leap.se')
+        self.assertRaises(KeyNotFound, pgp.get_key, ADDRESS)
         pgp.put_key_raw(PUBLIC_KEY)
-        key = pgp.get_key('leap@leap.se', private=False)
+        key = pgp.get_key(ADDRESS, private=False)
         self.assertIsInstance(key, openpgp.OpenPGPKey)
         self.assertEqual(
-            'leap@leap.se', key.address, 'Wrong address bound to key.')
+            ADDRESS, key.address, 'Wrong address bound to key.')
         self.assertEqual(
             '4096', key.length, 'Wrong key length.')
         pgp.delete_key(key)
-        self.assertRaises(KeyNotFound, pgp.get_key, 'leap@leap.se')
+        self.assertRaises(KeyNotFound, pgp.get_key, ADDRESS)
 
     def test_get_public_key(self):
         pgp = openpgp.OpenPGPScheme(self._soledad)
-        self.assertRaises(KeyNotFound, pgp.get_key, 'leap@leap.se')
+        self.assertRaises(KeyNotFound, pgp.get_key, ADDRESS)
         pgp.put_key_raw(PUBLIC_KEY)
         self.assertRaises(
-            KeyNotFound, pgp.get_key, 'leap@leap.se', private=True)
-        key = pgp.get_key('leap@leap.se', private=False)
-        self.assertEqual('leap@leap.se', key.address)
+            KeyNotFound, pgp.get_key, ADDRESS, private=True)
+        key = pgp.get_key(ADDRESS, private=False)
+        self.assertEqual(ADDRESS, key.address)
         self.assertFalse(key.private)
         self.assertEqual(KEY_FINGERPRINT, key.fingerprint)
         pgp.delete_key(key)
-        self.assertRaises(KeyNotFound, pgp.get_key, 'leap@leap.se')
+        self.assertRaises(KeyNotFound, pgp.get_key, ADDRESS)
 
     def test_openpgp_encrypt_decrypt_asym(self):
         # encrypt
         pgp = openpgp.OpenPGPScheme(self._soledad)
         pgp.put_key_raw(PUBLIC_KEY)
-        pubkey = pgp.get_key('leap@leap.se', private=False)
+        pubkey = pgp.get_key(ADDRESS, private=False)
         cyphertext = openpgp.encrypt_asym('data', pubkey)
         # assert
         self.assertTrue(cyphertext is not None)
@@ -208,16 +228,16 @@ class OpenPGPCryptoTestCase(KeyManagerWithSoledadTestCase):
         self.assertTrue(openpgp.is_encrypted(cyphertext))
         # decrypt
         self.assertRaises(
-            KeyNotFound, pgp.get_key, 'leap@leap.se', private=True)
+            KeyNotFound, pgp.get_key, ADDRESS, private=True)
         pgp.put_key_raw(PRIVATE_KEY)
-        privkey = pgp.get_key('leap@leap.se', private=True)
+        privkey = pgp.get_key(ADDRESS, private=True)
         plaintext = openpgp.decrypt_asym(cyphertext, privkey)
         pgp.delete_key(pubkey)
         pgp.delete_key(privkey)
         self.assertRaises(
-            KeyNotFound, pgp.get_key, 'leap@leap.se', private=False)
+            KeyNotFound, pgp.get_key, ADDRESS, private=False)
         self.assertRaises(
-            KeyNotFound, pgp.get_key, 'leap@leap.se', private=True)
+            KeyNotFound, pgp.get_key, ADDRESS, private=True)
 
     def test_openpgp_encrypt_decrypt_sym(self):
         cyphertext = openpgp.encrypt_sym('data', 'pass')
@@ -234,23 +254,115 @@ class OpenPGPCryptoTestCase(KeyManagerWithSoledadTestCase):
 class KeyManagerKeyManagementTestCase(
     KeyManagerWithSoledadTestCase):
 
-    def _key_manager(self, user='leap@leap.se', url=''):
-        return KeyManager(user, url, self._soledad)
-
     def test_get_all_keys_in_db(self):
         km = self._key_manager()
         km._wrapper_map[OpenPGPKey].put_key_raw(PRIVATE_KEY)
         # get public keys
         keys = km.get_all_keys_in_local_db(False)
         self.assertEqual(len(keys), 1, 'Wrong number of keys')
-        self.assertEqual('leap@leap.se', keys[0].address)
+        self.assertEqual(ADDRESS, keys[0].address)
         self.assertFalse(keys[0].private)
         # get private keys
         keys = km.get_all_keys_in_local_db(True)
         self.assertEqual(len(keys), 1, 'Wrong number of keys')
-        self.assertEqual('leap@leap.se', keys[0].address)
+        self.assertEqual(ADDRESS, keys[0].address)
         self.assertTrue(keys[0].private)
 
+    def test_get_public_key(self):
+        """
+        Check that the public key bound to ADDRESS can be read from local
+        storage with remote fetching disabled.
+        """
+        km = self._key_manager()
+        # only the private key material is stored here; the lookup below
+        # asks for the public counterpart.
+        km._wrapper_map[OpenPGPKey].put_key_raw(PRIVATE_KEY)
+        # get the key
+        key = km.get_key(ADDRESS, OpenPGPKey, private=False,
+                         fetch_remote=False)
+        self.assertTrue(key is not None)
+        self.assertEqual(key.address, ADDRESS)
+        # fingerprints are compared case-insensitively
+        self.assertEqual(
+            key.fingerprint.lower(),  KEY_FINGERPRINT.lower())
+        self.assertFalse(key.private)
+
+    def test_get_private_key(self):
+        """
+        Check that the private key bound to ADDRESS can be read from local
+        storage with remote fetching disabled.
+        """
+        km = self._key_manager()
+        km._wrapper_map[OpenPGPKey].put_key_raw(PRIVATE_KEY)
+        # get the key
+        key = km.get_key(ADDRESS, OpenPGPKey, private=True,
+                         fetch_remote=False)
+        self.assertTrue(key is not None)
+        self.assertEqual(key.address, ADDRESS)
+        # fingerprints are compared case-insensitively
+        self.assertEqual(
+            key.fingerprint.lower(), KEY_FINGERPRINT.lower())
+        self.assertTrue(key.private)
+
+    def test_send_key_raises_key_not_found(self):
+        """
+        Check that sending the public key raises KeyNotFound when no key
+        has been stored for the key manager's address.
+        """
+        km = self._key_manager()
+        self.assertRaises(
+            KeyNotFound,
+            km.send_key, OpenPGPKey, send_private=False)
+
+    def test_send_private_key_raises_key_not_found(self):
+        """
+        Check that asking to send the private key raises KeyNotFound when
+        only the public key has been stored.
+        """
+        km = self._key_manager()
+        # store the public key only, so the private key lookup must fail
+        km._wrapper_map[OpenPGPKey].put_key_raw(PUBLIC_KEY)
+        self.assertRaises(
+            KeyNotFound,
+            km.send_key, OpenPGPKey, send_private=True,
+            password='123')
+
+    def test_send_private_key_without_password_raises(self):
+        """
+        Check that asking to send the private key without giving a password
+        raises NoPasswordGiven.
+        """
+        km = self._key_manager()
+        km._wrapper_map[OpenPGPKey].put_key_raw(PUBLIC_KEY)
+        # no password argument is passed to send_key here
+        self.assertRaises(
+            NoPasswordGiven,
+            km.send_key, OpenPGPKey, send_private=True)
+
+    def test_send_public_key(self):
+        """
+        Check that sending only the public key results in exactly one PUT
+        to the address' key url, carrying the JSON form of the stored public
+        key and authenticated with the address and the auth token.
+        """
+        km = self._key_manager()
+        km._wrapper_map[OpenPGPKey].put_key_raw(PUBLIC_KEY)
+        # replace the fetcher as a whole: by default it is the shared
+        # requests module, so assigning to its put attribute would leak the
+        # mock into every other user of requests.
+        km._fetcher = mock.Mock()
+        km.token = '123'
+        km.send_key(OpenPGPKey, send_private=False)
+        # setup args, looking the key up the same way send_key does
+        pubkey = km.get_key(
+            km._address, OpenPGPKey, private=False, fetch_remote=False)
+        data = {
+            'address': km._address,
+            'keys': [
+                json.loads(pubkey.get_json()),
+            ]
+        }
+        url = km._nickserver_url + '/key/' + km._address
+
+        km._fetcher.put.assert_called_once_with(
+            url, data=data, auth=(km._address, '123')
+        )
+
+    def test_fetch_keys_from_server(self):
+        """
+        Check that fetching the keys of an address results in exactly one
+        GET to that address' key url.
+        """
+        km = self._key_manager()
+        # setup mock
+
+        class Response(object):
+            # minimal stand-in for the HTTP response handed back by the
+            # fetcher
+            status_code = 200
+            headers = {'content-type': 'application/json'}
+
+            def json(self):
+                return {'address': 'anotheruser@leap.se', 'keys': []}
+
+        # replace the fetcher as a whole: by default it is the shared
+        # requests module, so assigning to its get attribute would leak the
+        # mock into every other user of requests.
+        km._fetcher = mock.Mock()
+        km._fetcher.get.return_value = Response()
+        # do the fetch
+        km.fetch_keys_from_server('anotheruser@leap.se')
+        # and verify the call
+        km._fetcher.get.assert_called_once_with(
+            km._nickserver_url + '/key/' + 'anotheruser@leap.se',
+        )
+
+    def test_refresh_keys(self):
+        """
+        Check that refreshing keys asks the server exactly once for the keys
+        of the only address that has a public key in local storage.
+        """
+        # TODO: maybe we should not attempt to refresh our own public key?
+        km = self._key_manager()
+        km._wrapper_map[OpenPGPKey].put_key_raw(PUBLIC_KEY)
+        # stub the server lookup on this instance so nothing is fetched
+        km.fetch_keys_from_server = mock.Mock(return_value=[])
+        km.refresh_keys()
+        km.fetch_keys_from_server.assert_called_once_with(
+            ADDRESS
+        )
+
 
 # Key material for testing
 KEY_FINGERPRINT = "E36E738D69173C13D709E44F2F455E2824D18DDF"