diff options
Diffstat (limited to 'src/leap/common/tests')
| -rw-r--r-- | src/leap/common/tests/test_keymanager.py | 166 | 
1 files changed, 139 insertions, 27 deletions
| diff --git a/src/leap/common/tests/test_keymanager.py b/src/leap/common/tests/test_keymanager.py index 32bd1fd..1d7a382 100644 --- a/src/leap/common/tests/test_keymanager.py +++ b/src/leap/common/tests/test_keymanager.py @@ -21,6 +21,13 @@ Tests for the Key Manager.  """ +import mock +try: +    import simplejson as json +except ImportError: +    import json  # noqa + +  from leap.common.testing.basetest import BaseLeapTest  from leap.soledad import Soledad  from leap.soledad.crypto import SoledadCrypto @@ -30,6 +37,7 @@ from leap.common.keymanager import (      KeyManager,      openpgp,      KeyNotFound, +    NoPasswordGiven,      TAGS_INDEX,      TAGS_AND_PRIVATE_INDEX,  ) @@ -42,6 +50,9 @@ from leap.common.keymanager.keys import (  from leap.common.keymanager import errors +ADDRESS = 'leap@leap.se' + +  class KeyManagerUtilTestCase(BaseLeapTest):      def setUp(self): @@ -66,7 +77,7 @@ class KeyManagerUtilTestCase(BaseLeapTest):      def test_build_key_from_dict(self):          kdict = { -            'address': 'leap@leap.se', +            'address': ADDRESS,              'key_id': 'key_id',              'fingerprint': 'fingerprint',              'key_data': 'key_data', @@ -77,7 +88,7 @@ class KeyManagerUtilTestCase(BaseLeapTest):              'last_audited_at': 'last_audited_at',              'validation': 'validation',          } -        key = build_key_from_dict(OpenPGPKey, 'leap@leap.se', kdict) +        key = build_key_from_dict(OpenPGPKey, ADDRESS, kdict)          self.assertEqual(              kdict['address'], key.address,              'Wrong data in key.') @@ -111,9 +122,9 @@ class KeyManagerUtilTestCase(BaseLeapTest):      def test_keymanager_doc_id(self):          doc_id1 = keymanager_doc_id( -            OpenPGPKey, 'leap@leap.se', private=False) +            OpenPGPKey, ADDRESS, private=False)          doc_id2 = keymanager_doc_id( -            OpenPGPKey, 'leap@leap.se', private=True) +            OpenPGPKey, ADDRESS, private=True)          doc_id3 = keymanager_doc_id(              OpenPGPKey, 'user@leap.se', private=False)          doc_id4 = keymanager_doc_id( @@ -134,6 +145,8 @@ class KeyManagerWithSoledadTestCase(BaseLeapTest):              "123456",              secret_path=self.tempdir+"/secret.gpg",              local_db_path=self.tempdir+"/soledad.u1db", +            server_url='', +            cert_file=None,              bootstrap=False,          )          # initialize solead by hand for testing purposes @@ -144,7 +157,14 @@ class KeyManagerWithSoledadTestCase(BaseLeapTest):          self._soledad._init_db()      def tearDown(self): -        pass +        km = self._key_manager() +        for key in km.get_all_keys_in_local_db(): +            km._wrapper_map[key.__class__].delete_key(key) +        for key in km.get_all_keys_in_local_db(private=True): +            km._wrapper_map[key.__class__].delete_key(key) + +    def _key_manager(self, user=ADDRESS, url=''): +        return KeyManager(user, url, self._soledad)  class OpenPGPCryptoTestCase(KeyManagerWithSoledadTestCase): @@ -161,43 +181,43 @@ class OpenPGPCryptoTestCase(KeyManagerWithSoledadTestCase):      def test_openpgp_put_delete_key(self):          pgp = openpgp.OpenPGPScheme(self._soledad) -        self.assertRaises(KeyNotFound, pgp.get_key, 'leap@leap.se') +        self.assertRaises(KeyNotFound, pgp.get_key, ADDRESS)          pgp.put_key_raw(PUBLIC_KEY) -        key = pgp.get_key('leap@leap.se', private=False) +        key = pgp.get_key(ADDRESS, private=False)          pgp.delete_key(key) -        self.assertRaises(KeyNotFound, pgp.get_key, 'leap@leap.se') +        self.assertRaises(KeyNotFound, pgp.get_key, ADDRESS)      def test_openpgp_put_key_raw(self):          pgp = openpgp.OpenPGPScheme(self._soledad) -        self.assertRaises(KeyNotFound, pgp.get_key, 'leap@leap.se') +        self.assertRaises(KeyNotFound, pgp.get_key, ADDRESS)          pgp.put_key_raw(PUBLIC_KEY) -        key = pgp.get_key('leap@leap.se', private=False) +        key = pgp.get_key(ADDRESS, private=False)          self.assertIsInstance(key, openpgp.OpenPGPKey)          self.assertEqual( -            'leap@leap.se', key.address, 'Wrong address bound to key.') +            ADDRESS, key.address, 'Wrong address bound to key.')          self.assertEqual(              '4096', key.length, 'Wrong key length.')          pgp.delete_key(key) -        self.assertRaises(KeyNotFound, pgp.get_key, 'leap@leap.se') +        self.assertRaises(KeyNotFound, pgp.get_key, ADDRESS)      def test_get_public_key(self):          pgp = openpgp.OpenPGPScheme(self._soledad) -        self.assertRaises(KeyNotFound, pgp.get_key, 'leap@leap.se') +        self.assertRaises(KeyNotFound, pgp.get_key, ADDRESS)          pgp.put_key_raw(PUBLIC_KEY)          self.assertRaises( -            KeyNotFound, pgp.get_key, 'leap@leap.se', private=True) -        key = pgp.get_key('leap@leap.se', private=False) -        self.assertEqual('leap@leap.se', key.address) +            KeyNotFound, pgp.get_key, ADDRESS, private=True) +        key = pgp.get_key(ADDRESS, private=False) +        self.assertEqual(ADDRESS, key.address)          self.assertFalse(key.private)          self.assertEqual(KEY_FINGERPRINT, key.fingerprint)          pgp.delete_key(key) -        self.assertRaises(KeyNotFound, pgp.get_key, 'leap@leap.se') +        self.assertRaises(KeyNotFound, pgp.get_key, ADDRESS)      def test_openpgp_encrypt_decrypt_asym(self):          # encrypt          pgp = openpgp.OpenPGPScheme(self._soledad)          pgp.put_key_raw(PUBLIC_KEY) -        pubkey = pgp.get_key('leap@leap.se', private=False) +        pubkey = pgp.get_key(ADDRESS, private=False)          cyphertext = openpgp.encrypt_asym('data', pubkey)          # assert          self.assertTrue(cyphertext is not None) @@ -208,16 +228,16 @@ class OpenPGPCryptoTestCase(KeyManagerWithSoledadTestCase):          self.assertTrue(openpgp.is_encrypted(cyphertext))          # decrypt          self.assertRaises( -            KeyNotFound, pgp.get_key, 'leap@leap.se', private=True) +            KeyNotFound, pgp.get_key, ADDRESS, private=True)          pgp.put_key_raw(PRIVATE_KEY) -        privkey = pgp.get_key('leap@leap.se', private=True) +        privkey = pgp.get_key(ADDRESS, private=True)          plaintext = openpgp.decrypt_asym(cyphertext, privkey)          pgp.delete_key(pubkey)          pgp.delete_key(privkey)          self.assertRaises( -            KeyNotFound, pgp.get_key, 'leap@leap.se', private=False) +            KeyNotFound, pgp.get_key, ADDRESS, private=False)          self.assertRaises( -            KeyNotFound, pgp.get_key, 'leap@leap.se', private=True) +            KeyNotFound, pgp.get_key, ADDRESS, private=True)      def test_openpgp_encrypt_decrypt_sym(self):          cyphertext = openpgp.encrypt_sym('data', 'pass') @@ -234,23 +254,115 @@ class OpenPGPCryptoTestCase(KeyManagerWithSoledadTestCase):  class KeyManagerKeyManagementTestCase(      KeyManagerWithSoledadTestCase): -    def _key_manager(self, user='leap@leap.se', url=''): -        return KeyManager(user, url, self._soledad) -      def test_get_all_keys_in_db(self):          km = self._key_manager()          km._wrapper_map[OpenPGPKey].put_key_raw(PRIVATE_KEY)          # get public keys          keys = km.get_all_keys_in_local_db(False)          self.assertEqual(len(keys), 1, 'Wrong number of keys') -        self.assertEqual('leap@leap.se', keys[0].address) +        self.assertEqual(ADDRESS, keys[0].address)          self.assertFalse(keys[0].private)          # get private keys          keys = km.get_all_keys_in_local_db(True)          self.assertEqual(len(keys), 1, 'Wrong number of keys') -        self.assertEqual('leap@leap.se', keys[0].address) +        self.assertEqual(ADDRESS, keys[0].address)          self.assertTrue(keys[0].private) +    def test_get_public_key(self): +        km = self._key_manager() +        km._wrapper_map[OpenPGPKey].put_key_raw(PRIVATE_KEY) +        # get the key +        key = km.get_key(ADDRESS, OpenPGPKey, private=False, +                         fetch_remote=False) +        self.assertTrue(key is not None) +        self.assertEqual(key.address, ADDRESS) +        self.assertEqual( +            key.fingerprint.lower(),  KEY_FINGERPRINT.lower()) +        self.assertFalse(key.private) + +    def test_get_private_key(self): +        km = self._key_manager() +        km._wrapper_map[OpenPGPKey].put_key_raw(PRIVATE_KEY) +        # get the key +        key = km.get_key(ADDRESS, OpenPGPKey, private=True, +                         fetch_remote=False) +        self.assertTrue(key is not None) +        self.assertEqual(key.address, ADDRESS) +        self.assertEqual( +            key.fingerprint.lower(), KEY_FINGERPRINT.lower()) +        self.assertTrue(key.private) + +    def test_send_key_raises_key_not_found(self): +        km = self._key_manager() +        self.assertRaises( +            KeyNotFound, +            km.send_key, OpenPGPKey, send_private=False) + +    def test_send_private_key_raises_key_not_found(self): +        km = self._key_manager() +        km._wrapper_map[OpenPGPKey].put_key_raw(PUBLIC_KEY) +        self.assertRaises( +            KeyNotFound, +            km.send_key, OpenPGPKey, send_private=True, +            password='123') + +    def test_send_private_key_without_password_raises(self): +        km = self._key_manager() +        km._wrapper_map[OpenPGPKey].put_key_raw(PUBLIC_KEY) +        self.assertRaises( +            NoPasswordGiven, +            km.send_key, OpenPGPKey, send_private=True) + +    def test_send_public_key(self): +        km = self._key_manager() +        km._wrapper_map[OpenPGPKey].put_key_raw(PUBLIC_KEY) +        km._fetcher.put = mock.Mock() +        km.token = '123' +        km.send_key(OpenPGPKey, send_private=False) +        # setup args +        data = { +            'address': km._address, +            'keys': [ +                json.loads( +                    km.get_key( +                        km._address, OpenPGPKey).get_json()), +            ] +        } +        url = km._nickserver_url + '/key/' + km._address + +        km._fetcher.put.assert_called_once_with( +            url, data=data, auth=(km._address, '123') +        ) + +    def test_fetch_keys_from_server(self): +        km = self._key_manager() +        # setup mock + +        class Response(object): +            status_code = 200 +            headers = {'content-type': 'application/json'} +            def json(self): +                return {'address': 'anotheruser@leap.se', 'keys': []} + +        km._fetcher.get = mock.Mock( +            return_value=Response()) +        # do the fetch +        km.fetch_keys_from_server('anotheruser@leap.se') +        # and verify the call +        km._fetcher.get.assert_called_once_with( +           km._nickserver_url + '/key/' + 'anotheruser@leap.se', +        ) + +    def test_refresh_keys(self): +        # TODO: maybe we should not attempt to refresh our own public key? +        km = self._key_manager() +        km._wrapper_map[OpenPGPKey].put_key_raw(PUBLIC_KEY) +        km.fetch_keys_from_server = mock.Mock(return_value=[]) +        km.refresh_keys() +        km.fetch_keys_from_server.assert_called_once_with( +            'leap@leap.se' +        ) +  # Key material for testing  KEY_FINGERPRINT = "E36E738D69173C13D709E44F2F455E2824D18DDF" | 
