diff options
| author | drebs <drebs@leap.se> | 2013-05-02 22:38:31 -0300 | 
|---|---|---|
| committer | drebs <drebs@leap.se> | 2013-05-02 22:38:31 -0300 | 
| commit | 71a3f21d3b72566efa6cf024317dfc96624a10f7 (patch) | |
| tree | 40a36e4817d605439aeedce826751bc154d1bfa6 /src/leap | |
| parent | 170cd90f593a106ea7730babde310724410a585e (diff) | |
Add tests for key management remote methods.
Diffstat (limited to 'src/leap')
| -rw-r--r-- | src/leap/common/keymanager/__init__.py | 42 | ||||
| -rw-r--r-- | src/leap/common/keymanager/errors.py | 6 | ||||
| -rw-r--r-- | src/leap/common/tests/test_keymanager.py | 166 | 
3 files changed, 176 insertions, 38 deletions
| diff --git a/src/leap/common/keymanager/__init__.py b/src/leap/common/keymanager/__init__.py index 01dc0da..d6dbb8a 100644 --- a/src/leap/common/keymanager/__init__.py +++ b/src/leap/common/keymanager/__init__.py @@ -30,6 +30,7 @@ except ImportError:  from leap.common.check import leap_assert  from leap.common.keymanager.errors import (      KeyNotFound, +    NoPasswordGiven,  )  from leap.common.keymanager.keys import (      build_key_from_dict, @@ -51,7 +52,7 @@ INDEXES = {  class KeyManager(object): -    def __init__(self, address, nickserver_url, soledad): +    def __init__(self, address, nickserver_url, soledad, token=None):          """          Initialize a Key Manager for user's C{address} with provider's          nickserver reachable in C{url}. @@ -66,11 +67,13 @@ class KeyManager(object):          self._address = address          self._nickserver_url = nickserver_url          self._soledad = soledad +        self.token = token          self._wrapper_map = {              OpenPGPKey: OpenPGPScheme(soledad),              # other types of key will be added to this mapper.          }          self._init_indexes() +        self._fetcher = requests      #      # utilities @@ -109,7 +112,7 @@ class KeyManager(object):          Make a GET HTTP request and return a dictionary containing the          response.          """ -        response = requests.get(self._nickserver_url+path) +        response = self._fetcher.get(self._nickserver_url+path)          leap_assert(response.status_code == 200, 'Invalid response.')          leap_assert(              response.headers['content-type'].startswith('application/json') @@ -142,24 +145,27 @@ class KeyManager(object):              keyserver.          """          # prepare the public key bound to address +        pubkey = self.get_key( +            self._address, ktype, private=False, fetch_remote=False)          data = {              'address': self._address,              'keys': [ -                json.loads( -                    self.get_key( -                        self._address, ktype, private=False).get_json()), +                json.loads(pubkey.get_json()),              ]          }          # prepare the private key bound to address          if send_private: -            privkey = json.loads( -                self.get_key(self._address, ktype, private=True).get_json()) -            privkey.key_data = encrypt_sym(data, password) +            if password is None or password == '': +                raise NoPasswordGiven('Can\'t send unencrypted private keys!') +            privkey = self.get_key( +                self._address, ktype, private=True, fetch_remote=False) +            privkey = json.loads(privkey.get_json()) +            privkey.key_data = encrypt_sym(privkey.key_data, password)              data['keys'].append(privkey) -        requests.put( +        self._fetcher.put(              self._nickserver_url + '/key/' + self._address,              data=data, -            auth=(self._address, None))  # TODO: replace for token-based auth. +            auth=(self._address, self._token))      def get_key(self, address, ktype, private=False, fetch_remote=True):          """ @@ -248,7 +254,8 @@ class KeyManager(object):          """          addresses = set(map(              lambda doc: doc.address, -            self.get_all_keys_in_local_db(False))) +            self.get_all_keys_in_local_db(private=False))) +        # TODO: maybe we should not attempt to refresh our own public key?          for address in addresses:              for key in self.fetch_keys_from_server(address):                  self._wrapper_map[key.__class__].put_key(key) @@ -264,3 +271,16 @@ class KeyManager(object):          @rtype: EncryptionKey          """          return self._wrapper_map[ktype].gen_key(self._address) + +    # +    # Token setter/getter +    # + +    def _get_token(self): +        return self._token + +    def _set_token(self, token): +        self._token = token + +    token = property( +        _get_token, _set_token, doc='The auth token.') diff --git a/src/leap/common/keymanager/errors.py b/src/leap/common/keymanager/errors.py index add6a38..1cf506e 100644 --- a/src/leap/common/keymanager/errors.py +++ b/src/leap/common/keymanager/errors.py @@ -38,3 +38,9 @@ class KeyAttributesDiffer(Exception):      Raised when trying to delete a key but the stored key differs from the key      passed to the delete_key() method.      """ + +class NoPasswordGiven(Exception): +    """ +    Raised when trying to perform some action that needs a password without +    providing one. +    """ diff --git a/src/leap/common/tests/test_keymanager.py b/src/leap/common/tests/test_keymanager.py index 32bd1fd..1d7a382 100644 --- a/src/leap/common/tests/test_keymanager.py +++ b/src/leap/common/tests/test_keymanager.py @@ -21,6 +21,13 @@ Tests for the Key Manager.  """ +import mock +try: +    import simplejson as json +except ImportError: +    import json  # noqa + +  from leap.common.testing.basetest import BaseLeapTest  from leap.soledad import Soledad  from leap.soledad.crypto import SoledadCrypto @@ -30,6 +37,7 @@ from leap.common.keymanager import (      KeyManager,      openpgp,      KeyNotFound, +    NoPasswordGiven,      TAGS_INDEX,      TAGS_AND_PRIVATE_INDEX,  ) @@ -42,6 +50,9 @@ from leap.common.keymanager.keys import (  from leap.common.keymanager import errors +ADDRESS = 'leap@leap.se' + +  class KeyManagerUtilTestCase(BaseLeapTest):      def setUp(self): @@ -66,7 +77,7 @@ class KeyManagerUtilTestCase(BaseLeapTest):      def test_build_key_from_dict(self):          kdict = { -            'address': 'leap@leap.se', +            'address': ADDRESS,              'key_id': 'key_id',              'fingerprint': 'fingerprint',              'key_data': 'key_data', @@ -77,7 +88,7 @@ class KeyManagerUtilTestCase(BaseLeapTest):              'last_audited_at': 'last_audited_at',              'validation': 'validation',          } -        key = build_key_from_dict(OpenPGPKey, 'leap@leap.se', kdict) +        key = build_key_from_dict(OpenPGPKey, ADDRESS, kdict)          self.assertEqual(              kdict['address'], key.address,              'Wrong data in key.') @@ -111,9 +122,9 @@ class KeyManagerUtilTestCase(BaseLeapTest):      def test_keymanager_doc_id(self):          doc_id1 = keymanager_doc_id( -            OpenPGPKey, 'leap@leap.se', private=False) +            OpenPGPKey, ADDRESS, private=False)          doc_id2 = keymanager_doc_id( -            OpenPGPKey, 'leap@leap.se', private=True) +            OpenPGPKey, ADDRESS, private=True)          doc_id3 = keymanager_doc_id(              OpenPGPKey, 'user@leap.se', private=False)          doc_id4 = keymanager_doc_id( @@ -134,6 +145,8 @@ class KeyManagerWithSoledadTestCase(BaseLeapTest):              "123456",              secret_path=self.tempdir+"/secret.gpg",              local_db_path=self.tempdir+"/soledad.u1db", +            server_url='', +            cert_file=None,              bootstrap=False,          )          # initialize solead by hand for testing purposes @@ -144,7 +157,14 @@ class KeyManagerWithSoledadTestCase(BaseLeapTest):          self._soledad._init_db()      def tearDown(self): -        pass +        km = self._key_manager() +        for key in km.get_all_keys_in_local_db(): +            km._wrapper_map[key.__class__].delete_key(key) +        for key in km.get_all_keys_in_local_db(private=True): +            km._wrapper_map[key.__class__].delete_key(key) + +    def _key_manager(self, user=ADDRESS, url=''): +        return KeyManager(user, url, self._soledad)  class OpenPGPCryptoTestCase(KeyManagerWithSoledadTestCase): @@ -161,43 +181,43 @@ class OpenPGPCryptoTestCase(KeyManagerWithSoledadTestCase):      def test_openpgp_put_delete_key(self):          pgp = openpgp.OpenPGPScheme(self._soledad) -        self.assertRaises(KeyNotFound, pgp.get_key, 'leap@leap.se') +        self.assertRaises(KeyNotFound, pgp.get_key, ADDRESS)          pgp.put_key_raw(PUBLIC_KEY) -        key = pgp.get_key('leap@leap.se', private=False) +        key = pgp.get_key(ADDRESS, private=False)          pgp.delete_key(key) -        self.assertRaises(KeyNotFound, pgp.get_key, 'leap@leap.se') +        self.assertRaises(KeyNotFound, pgp.get_key, ADDRESS)      def test_openpgp_put_key_raw(self):          pgp = openpgp.OpenPGPScheme(self._soledad) -        self.assertRaises(KeyNotFound, pgp.get_key, 'leap@leap.se') +        self.assertRaises(KeyNotFound, pgp.get_key, ADDRESS)          pgp.put_key_raw(PUBLIC_KEY) -        key = pgp.get_key('leap@leap.se', private=False) +        key = pgp.get_key(ADDRESS, private=False)          self.assertIsInstance(key, openpgp.OpenPGPKey)          self.assertEqual( -            'leap@leap.se', key.address, 'Wrong address bound to key.') +            ADDRESS, key.address, 'Wrong address bound to key.')          self.assertEqual(              '4096', key.length, 'Wrong key length.')          pgp.delete_key(key) -        self.assertRaises(KeyNotFound, pgp.get_key, 'leap@leap.se') +        self.assertRaises(KeyNotFound, pgp.get_key, ADDRESS)      def test_get_public_key(self):          pgp = openpgp.OpenPGPScheme(self._soledad) -        self.assertRaises(KeyNotFound, pgp.get_key, 'leap@leap.se') +        self.assertRaises(KeyNotFound, pgp.get_key, ADDRESS)          pgp.put_key_raw(PUBLIC_KEY)          self.assertRaises( -            KeyNotFound, pgp.get_key, 'leap@leap.se', private=True) -        key = pgp.get_key('leap@leap.se', private=False) -        self.assertEqual('leap@leap.se', key.address) +            KeyNotFound, pgp.get_key, ADDRESS, private=True) +        key = pgp.get_key(ADDRESS, private=False) +        self.assertEqual(ADDRESS, key.address)          self.assertFalse(key.private)          self.assertEqual(KEY_FINGERPRINT, key.fingerprint)          pgp.delete_key(key) -        self.assertRaises(KeyNotFound, pgp.get_key, 'leap@leap.se') +        self.assertRaises(KeyNotFound, pgp.get_key, ADDRESS)      def test_openpgp_encrypt_decrypt_asym(self):          # encrypt          pgp = openpgp.OpenPGPScheme(self._soledad)          pgp.put_key_raw(PUBLIC_KEY) -        pubkey = pgp.get_key('leap@leap.se', private=False) +        pubkey = pgp.get_key(ADDRESS, private=False)          cyphertext = openpgp.encrypt_asym('data', pubkey)          # assert          self.assertTrue(cyphertext is not None) @@ -208,16 +228,16 @@ class OpenPGPCryptoTestCase(KeyManagerWithSoledadTestCase):          self.assertTrue(openpgp.is_encrypted(cyphertext))          # decrypt          self.assertRaises( -            KeyNotFound, pgp.get_key, 'leap@leap.se', private=True) +            KeyNotFound, pgp.get_key, ADDRESS, private=True)          pgp.put_key_raw(PRIVATE_KEY) -        privkey = pgp.get_key('leap@leap.se', private=True) +        privkey = pgp.get_key(ADDRESS, private=True)          plaintext = openpgp.decrypt_asym(cyphertext, privkey)          pgp.delete_key(pubkey)          pgp.delete_key(privkey)          self.assertRaises( -            KeyNotFound, pgp.get_key, 'leap@leap.se', private=False) +            KeyNotFound, pgp.get_key, ADDRESS, private=False)          self.assertRaises( -            KeyNotFound, pgp.get_key, 'leap@leap.se', private=True) +            KeyNotFound, pgp.get_key, ADDRESS, private=True)      def test_openpgp_encrypt_decrypt_sym(self):          cyphertext = openpgp.encrypt_sym('data', 'pass') @@ -234,23 +254,115 @@ class OpenPGPCryptoTestCase(KeyManagerWithSoledadTestCase):  class KeyManagerKeyManagementTestCase(      KeyManagerWithSoledadTestCase): -    def _key_manager(self, user='leap@leap.se', url=''): -        return KeyManager(user, url, self._soledad) -      def test_get_all_keys_in_db(self):          km = self._key_manager()          km._wrapper_map[OpenPGPKey].put_key_raw(PRIVATE_KEY)          # get public keys          keys = km.get_all_keys_in_local_db(False)          self.assertEqual(len(keys), 1, 'Wrong number of keys') -        self.assertEqual('leap@leap.se', keys[0].address) +        self.assertEqual(ADDRESS, keys[0].address)          self.assertFalse(keys[0].private)          # get private keys          keys = km.get_all_keys_in_local_db(True)          self.assertEqual(len(keys), 1, 'Wrong number of keys') -        self.assertEqual('leap@leap.se', keys[0].address) +        self.assertEqual(ADDRESS, keys[0].address)          self.assertTrue(keys[0].private) +    def test_get_public_key(self): +        km = self._key_manager() +        km._wrapper_map[OpenPGPKey].put_key_raw(PRIVATE_KEY) +        # get the key +        key = km.get_key(ADDRESS, OpenPGPKey, private=False, +                         fetch_remote=False) +        self.assertTrue(key is not None) +        self.assertEqual(key.address, ADDRESS) +        self.assertEqual( +            key.fingerprint.lower(),  KEY_FINGERPRINT.lower()) +        self.assertFalse(key.private) + +    def test_get_private_key(self): +        km = self._key_manager() +        km._wrapper_map[OpenPGPKey].put_key_raw(PRIVATE_KEY) +        # get the key +        key = km.get_key(ADDRESS, OpenPGPKey, private=True, +                         fetch_remote=False) +        self.assertTrue(key is not None) +        self.assertEqual(key.address, ADDRESS) +        self.assertEqual( +            key.fingerprint.lower(), KEY_FINGERPRINT.lower()) +        self.assertTrue(key.private) + +    def test_send_key_raises_key_not_found(self): +        km = self._key_manager() +        self.assertRaises( +            KeyNotFound, +            km.send_key, OpenPGPKey, send_private=False) + +    def test_send_private_key_raises_key_not_found(self): +        km = self._key_manager() +        km._wrapper_map[OpenPGPKey].put_key_raw(PUBLIC_KEY) +        self.assertRaises( +            KeyNotFound, +            km.send_key, OpenPGPKey, send_private=True, +            password='123') + +    def test_send_private_key_without_password_raises(self): +        km = self._key_manager() +        km._wrapper_map[OpenPGPKey].put_key_raw(PUBLIC_KEY) +        self.assertRaises( +            NoPasswordGiven, +            km.send_key, OpenPGPKey, send_private=True) + +    def test_send_public_key(self): +        km = self._key_manager() +        km._wrapper_map[OpenPGPKey].put_key_raw(PUBLIC_KEY) +        km._fetcher.put = mock.Mock() +        km.token = '123' +        km.send_key(OpenPGPKey, send_private=False) +        # setup args +        data = { +            'address': km._address, +            'keys': [ +                json.loads( +                    km.get_key( +                        km._address, OpenPGPKey).get_json()), +            ] +        } +        url = km._nickserver_url + '/key/' + km._address + +        km._fetcher.put.assert_called_once_with( +            url, data=data, auth=(km._address, '123') +        ) + +    def test_fetch_keys_from_server(self): +        km = self._key_manager() +        # setup mock + +        class Response(object): +            status_code = 200 +            headers = {'content-type': 'application/json'} +            def json(self): +                return {'address': 'anotheruser@leap.se', 'keys': []} + +        km._fetcher.get = mock.Mock( +            return_value=Response()) +        # do the fetch +        km.fetch_keys_from_server('anotheruser@leap.se') +        # and verify the call +        km._fetcher.get.assert_called_once_with( +           km._nickserver_url + '/key/' + 'anotheruser@leap.se', +        ) + +    def test_refresh_keys(self): +        # TODO: maybe we should not attempt to refresh our own public key? +        km = self._key_manager() +        km._wrapper_map[OpenPGPKey].put_key_raw(PUBLIC_KEY) +        km.fetch_keys_from_server = mock.Mock(return_value=[]) +        km.refresh_keys() +        km.fetch_keys_from_server.assert_called_once_with( +            'leap@leap.se' +        ) +  # Key material for testing  KEY_FINGERPRINT = "E36E738D69173C13D709E44F2F455E2824D18DDF" | 
