From 4612ba27e83e478baac5a5f8b3037608774934d6 Mon Sep 17 00:00:00 2001 From: Ruben Pollan Date: Sun, 1 May 2016 15:23:15 -0300 Subject: [feat] remove the keytype support - Resolves: #8031 --- changes/next-changelog.txt | 2 + src/leap/keymanager/__init__.py | 161 +++++++-------------------- src/leap/keymanager/tests/__init__.py | 5 +- src/leap/keymanager/tests/test_keymanager.py | 147 +++++++++++------------- src/leap/keymanager/tests/test_validation.py | 94 ++++++++-------- 5 files changed, 154 insertions(+), 255 deletions(-) diff --git a/changes/next-changelog.txt b/changes/next-changelog.txt index 09b2e71..7fad645 100644 --- a/changes/next-changelog.txt +++ b/changes/next-changelog.txt @@ -10,6 +10,8 @@ I've added a new category `Misc` so we can track doc/style/packaging stuff. Features ~~~~~~~~ +- `#8031 `_: Remove support for multiple key types. + - `#1234 `_: Description of the new feature corresponding with issue #1234. - New feature without related issue number. diff --git a/src/leap/keymanager/__init__.py b/src/leap/keymanager/__init__.py index aa0a9ac..f72f403 100644 --- a/src/leap/keymanager/__init__.py +++ b/src/leap/keymanager/__init__.py @@ -71,9 +71,7 @@ from leap.common.decorators import memoized_method from leap.keymanager.errors import ( KeyNotFound, - KeyAddressMismatch, KeyNotValidUpgrade, - UnsupportedKeyTypeError, InvalidSignature ) from leap.keymanager.validation import ValidationLevels, can_upgrade @@ -138,11 +136,7 @@ class KeyManager(object): self.api_uri = api_uri self.api_version = api_version self.uid = uid - # a dict to map key types to their handlers - self._wrapper_map = { - OpenPGPKey: OpenPGPScheme(soledad, gpgbinary=gpgbinary), - # other types of key will be added to this mapper. - } + self._openpgp = OpenPGPScheme(soledad, gpgbinary=gpgbinary) # the following are used to perform https requests self._fetcher = requests self._combined_ca_bundle = self._create_combined_bundle_file() @@ -184,20 +178,6 @@ class KeyManager(object): return tmp_file.name - def _key_class_from_type(self, ktype): - """ - Given a class type, return a class - - :param ktype: string representation of a class name - :type ktype: str - - :return: A class with the matching name - :rtype: classobj or type - """ - return filter( - lambda klass: klass.__name__ == ktype, - self._wrapper_map).pop() - @defer.inlineCallbacks def _get_key_from_nicknym(self, address): """ @@ -349,7 +329,6 @@ class KeyManager(object): yield self.put_raw_key( server_keys['openpgp'], - OpenPGPKey, address=address, validation=validation_level) @@ -357,24 +336,19 @@ class KeyManager(object): # key management # - def send_key(self, ktype): + def send_key(self): """ - Send user's key of type ktype to provider. + Send user's key to provider. Public key bound to user's is sent to provider, which will sign it and replace any prior keys for the same address in its database. - :param ktype: The type of the key. - :type ktype: subclass of EncryptionKey - :return: A Deferred which fires when the key is sent, or which fails with KeyNotFound if the key was not found in local database. :rtype: Deferred :raise UnsupportedKeyTypeError: if invalid key type """ - self._assert_supported_key_type(ktype) - def send(pubkey): data = { self.PUBKEY_KEY: pubkey.key_data @@ -390,40 +364,33 @@ class KeyManager(object): return d d = self.get_key( - self._address, ktype, private=False, fetch_remote=False) + self._address, private=False, fetch_remote=False) d.addCallback(send) return d - def get_key(self, address, ktype, private=False, fetch_remote=True): + def get_key(self, address, private=False, fetch_remote=True): """ - Return a key of type ktype bound to address. + Return a key bound to address. First, search for the key in local storage. If it is not available, then try to fetch from nickserver. :param address: The address bound to the key. :type address: str - :param ktype: The type of the key. - :type ktype: subclass of EncryptionKey :param private: Look for a private key instead of a public one? :type private: bool :param fetch_remote: If key not found in local storage try to fetch from nickserver :type fetch_remote: bool - :return: A Deferred which fires with an EncryptionKey of type ktype - bound to address, or which fails with KeyNotFound if no key - was found neither locally or in keyserver. + :return: A Deferred which fires with an EncryptionKey bound to address, + or which fails with KeyNotFound if no key was found neither + locally or in keyserver. :rtype: Deferred :raise UnsupportedKeyTypeError: if invalid key type """ - self._assert_supported_key_type(ktype) logger.debug("getting key for %s" % (address,)) - leap_assert( - ktype in self._wrapper_map, - 'Unkown key type: %s.' % str(ktype)) - _keys = self._wrapper_map[ktype] emit_async(catalog.KEYMANAGER_LOOKING_FOR_KEY, address) @@ -445,12 +412,12 @@ class KeyManager(object): emit_async(catalog.KEYMANAGER_LOOKING_FOR_KEY, address) d = self._fetch_keys_from_server(address) d.addCallback( - lambda _: _keys.get_key(address, private=False)) + lambda _: self._openpgp.get_key(address, private=False)) d.addCallback(key_found) return d # return key if it exists in local database - d = _keys.get_key(address, private=private) + d = self._openpgp.get_key(address, private=private) d.addCallbacks(key_found, key_not_found) return d @@ -467,9 +434,7 @@ class KeyManager(object): # TODO: should it be based on activedocs? def build_keys(docs): return map( - lambda doc: build_key_from_dict( - self._key_class_from_type(doc.content['type']), - doc.content), + lambda doc: build_key_from_dict(OpenPGPKey, doc.content), docs) # XXX: there is no check that the soledad indexes are ready, as it @@ -484,20 +449,15 @@ class KeyManager(object): d.addCallback(build_keys) return d - def gen_key(self, ktype): + def gen_key(self): """ - Generate a key of type ktype bound to the user's address. - - :param ktype: The type of the key. - :type ktype: subclass of EncryptionKey + Generate a key bound to the user's address. :return: A Deferred which fires with the generated EncryptionKey. :rtype: Deferred :raise UnsupportedKeyTypeError: if invalid key type """ - self._assert_supported_key_type(ktype) - _keys = self._wrapper_map[ktype] def signal_finished(key): emit_async( @@ -506,7 +466,7 @@ class KeyManager(object): emit_async(catalog.KEYMANAGER_STARTED_KEY_GENERATION, self._address) - d = _keys.gen_key(self._address) + d = self.openpg.gen_key(self._address) d.addCallback(signal_finished) return d @@ -564,7 +524,7 @@ class KeyManager(object): # encrypt/decrypt and sign/verify API # - def encrypt(self, data, address, ktype, passphrase=None, sign=None, + def encrypt(self, data, address, passphrase=None, sign=None, cipher_algo='AES256', fetch_remote=True): """ Encrypt data with the public key bound to address and sign with with @@ -574,8 +534,6 @@ class KeyManager(object): :type data: str :param address: The address to encrypt it for. :type address: str - :param ktype: The type of the key. - :type ktype: subclass of EncryptionKey :param passphrase: The passphrase for the secret key used for the signature. :type passphrase: str @@ -595,30 +553,28 @@ class KeyManager(object): :raise UnsupportedKeyTypeError: if invalid key type """ - self._assert_supported_key_type(ktype) - _keys = self._wrapper_map[ktype] @defer.inlineCallbacks def encrypt(keys): pubkey, signkey = keys - encrypted = yield _keys.encrypt( + encrypted = yield self._openpgp.encrypt( data, pubkey, passphrase, sign=signkey, cipher_algo=cipher_algo) if not pubkey.encr_used: pubkey.encr_used = True - yield _keys.put_key(pubkey) + yield self._openpgp.put_key(pubkey) defer.returnValue(encrypted) - dpub = self.get_key(address, ktype, private=False, + dpub = self.get_key(address, private=False, fetch_remote=fetch_remote) dpriv = defer.succeed(None) if sign is not None: - dpriv = self.get_key(sign, ktype, private=True) + dpriv = self.get_key(sign, private=True) d = defer.gatherResults([dpub, dpriv], consumeErrors=True) d.addCallbacks(encrypt, self._extract_first_error) return d - def decrypt(self, data, address, ktype, passphrase=None, verify=None, + def decrypt(self, data, address, passphrase=None, verify=None, fetch_remote=True): """ Decrypt data using private key from address and verify with public key @@ -628,8 +584,6 @@ class KeyManager(object): :type data: str :param address: The address to whom data was encrypted. :type address: str - :param ktype: The type of the key. - :type ktype: subclass of EncryptionKey :param passphrase: The passphrase for the secret key used for decryption. :type passphrase: str @@ -649,13 +603,11 @@ class KeyManager(object): :raise UnsupportedKeyTypeError: if invalid key type """ - self._assert_supported_key_type(ktype) - _keys = self._wrapper_map[ktype] @defer.inlineCallbacks def decrypt(keys): pubkey, privkey = keys - decrypted, signed = yield _keys.decrypt( + decrypted, signed = yield self._openpgp.decrypt( data, privkey, passphrase=passphrase, verify=pubkey) if pubkey is None: signature = KeyNotFound(verify) @@ -663,7 +615,7 @@ class KeyManager(object): signature = pubkey if not pubkey.sign_used: pubkey.sign_used = True - yield _keys.put_key(pubkey) + yield self._openpgp.put_key(pubkey) defer.returnValue((decrypted, signature)) else: signature = InvalidSignature( @@ -671,10 +623,10 @@ class KeyManager(object): (pubkey.fingerprint,)) defer.returnValue((decrypted, signature)) - dpriv = self.get_key(address, ktype, private=True) + dpriv = self.get_key(address, private=True) dpub = defer.succeed(None) if verify is not None: - dpub = self.get_key(verify, ktype, private=False, + dpub = self.get_key(verify, private=False, fetch_remote=fetch_remote) dpub.addErrback(lambda f: None if f.check(KeyNotFound) else f) d = defer.gatherResults([dpub, dpriv], consumeErrors=True) @@ -684,7 +636,7 @@ class KeyManager(object): def _extract_first_error(self, failure): return failure.value.subFailure - def sign(self, data, address, ktype, digest_algo='SHA512', clearsign=False, + def sign(self, data, address, digest_algo='SHA512', clearsign=False, detach=True, binary=False): """ Sign data with private key bound to address. @@ -693,8 +645,6 @@ class KeyManager(object): :type data: str :param address: The address to be used to sign. :type address: EncryptionKey - :param ktype: The type of the key. - :type ktype: subclass of EncryptionKey :param digest_algo: The hash digest to use. :type digest_algo: str :param clearsign: If True, create a cleartext signature. @@ -712,19 +662,17 @@ class KeyManager(object): :raise UnsupportedKeyTypeError: if invalid key type """ - self._assert_supported_key_type(ktype) - _keys = self._wrapper_map[ktype] def sign(privkey): - return _keys.sign( + return self._openpgp.sign( data, privkey, digest_algo=digest_algo, clearsign=clearsign, detach=detach, binary=binary) - d = self.get_key(address, ktype, private=True) + d = self.get_key(address, private=True) d.addCallback(sign) return d - def verify(self, data, address, ktype, detached_sig=None, + def verify(self, data, address, detached_sig=None, fetch_remote=True): """ Verify signed data with private key bound to address, eventually using @@ -734,8 +682,6 @@ class KeyManager(object): :type data: str :param address: The address to be used to verify. :type address: EncryptionKey - :param ktype: The type of the key. - :type ktype: subclass of EncryptionKey :param detached_sig: A detached signature. If given, C{data} is verified using this detached signature. :type detached_sig: str @@ -751,16 +697,14 @@ class KeyManager(object): :raise UnsupportedKeyTypeError: if invalid key type """ - self._assert_supported_key_type(ktype) - _keys = self._wrapper_map[ktype] def verify(pubkey): - signed = _keys.verify( + signed = self._openpgp.verify( data, pubkey, detached_sig=detached_sig) if signed: if not pubkey.sign_used: pubkey.sign_used = True - d = _keys.put_key(pubkey) + d = self._openpgp.put_key(pubkey) d.addCallback(lambda _: pubkey) return d return pubkey @@ -769,7 +713,7 @@ class KeyManager(object): 'Failed to verify signature with key %s' % (pubkey.fingerprint,)) - d = self.get_key(address, ktype, private=False, + d = self.get_key(address, private=False, fetch_remote=fetch_remote) d.addCallback(verify) return d @@ -787,9 +731,7 @@ class KeyManager(object): :raise UnsupportedKeyTypeError: if invalid key type """ - self._assert_supported_key_type(type(key)) - _keys = self._wrapper_map[type(key)] - return _keys.delete_key(key) + return self._openpgp.delete_key(key) def put_key(self, key): """ @@ -805,9 +747,6 @@ class KeyManager(object): :raise UnsupportedKeyTypeError: if invalid key type """ - ktype = type(key) - self._assert_supported_key_type(ktype) - _keys = self._wrapper_map[ktype] def old_key_not_found(failure): if failure.check(KeyNotFound): @@ -817,26 +756,24 @@ class KeyManager(object): def check_upgrade(old_key): if key.private or can_upgrade(key, old_key): - return _keys.put_key(key) + return self._openpgp.put_key(key) else: raise KeyNotValidUpgrade( "Key %s can not be upgraded by new key %s" % (old_key.fingerprint, key.fingerprint)) - d = _keys.get_key(key.address, private=key.private) + d = self._openpgp.get_key(key.address, private=key.private) d.addErrback(old_key_not_found) d.addCallback(check_upgrade) return d - def put_raw_key(self, key, ktype, address, + def put_raw_key(self, key, address, validation=ValidationLevels.Weak_Chain): """ Put raw key bound to address in local storage. :param key: The ascii key to be stored :type key: str - :param ktype: the type of the key. - :type ktype: subclass of EncryptionKey :param address: address for which this key will be active :type address: str :param validation: validation level for this key @@ -853,10 +790,7 @@ class KeyManager(object): :raise UnsupportedKeyTypeError: if invalid key type """ - self._assert_supported_key_type(ktype) - _keys = self._wrapper_map[ktype] - - pubkey, privkey = _keys.parse_key(key, address) + pubkey, privkey = self._openpgp.parse_key(key, address) if pubkey is None: return defer.fail(KeyNotFound(key)) @@ -868,8 +802,7 @@ class KeyManager(object): return d @defer.inlineCallbacks - def fetch_key(self, address, uri, ktype, - validation=ValidationLevels.Weak_Chain): + def fetch_key(self, address, uri, validation=ValidationLevels.Weak_Chain): """ Fetch a public key bound to address from the network and put it in local storage. @@ -878,8 +811,6 @@ class KeyManager(object): :type address: str :param uri: The URI of the key. :type uri: str - :param ktype: the type of the key. - :type ktype: subclass of EncryptionKey :param validation: validation level for this key (default: 'Weak_Chain') :type validation: ValidationLevels @@ -893,32 +824,18 @@ class KeyManager(object): :raise UnsupportedKeyTypeError: if invalid key type """ - self._assert_supported_key_type(ktype) - _keys = self._wrapper_map[ktype] logger.info("Fetch key for %s from %s" % (address, uri)) ascii_content = yield self._get_with_combined_ca_bundle(uri) # XXX parse binary keys - pubkey, _ = _keys.parse_key(ascii_content, address) + pubkey, _ = self._openpgp.parse_key(ascii_content, address) if pubkey is None: raise KeyNotFound(uri) pubkey.validation = validation yield self.put_key(pubkey) - def _assert_supported_key_type(self, ktype): - """ - Check if ktype is one of the supported key types - - :param ktype: the type of the key. - :type ktype: subclass of EncryptionKey - - :raise UnsupportedKeyTypeError: if invalid key type - """ - if ktype not in self._wrapper_map: - raise UnsupportedKeyTypeError(str(ktype)) - def _split_email(address): """ diff --git a/src/leap/keymanager/tests/__init__.py b/src/leap/keymanager/tests/__init__.py index 2a6a3f1..a20e1fd 100644 --- a/src/leap/keymanager/tests/__init__.py +++ b/src/leap/keymanager/tests/__init__.py @@ -27,7 +27,6 @@ from twisted.trial import unittest from leap.common.testing.basetest import BaseLeapTest from leap.soledad.client import Soledad from leap.keymanager import KeyManager -from leap.keymanager.openpgp import OpenPGPKey PATH = os.path.dirname(os.path.realpath(__file__)) @@ -56,7 +55,7 @@ class KeyManagerWithSoledadTestCase(unittest.TestCase, BaseLeapTest): km = self._key_manager() # wait for the indexes to be ready for the tear down - d = km._wrapper_map[OpenPGPKey].deferred_init + d = km._openpgp.deferred_init d.addCallback(lambda _: self.delete_all_keys(km)) d.addCallback(lambda _: self.tearDownEnv()) d.addCallback(lambda _: self._soledad.close()) @@ -66,7 +65,7 @@ class KeyManagerWithSoledadTestCase(unittest.TestCase, BaseLeapTest): def delete_keys(keys): deferreds = [] for key in keys: - d = km._wrapper_map[key.__class__].delete_key(key) + d = km._openpgp.delete_key(key) deferreds.append(d) return gatherResults(deferreds) diff --git a/src/leap/keymanager/tests/test_keymanager.py b/src/leap/keymanager/tests/test_keymanager.py index 14f47f6..c55a3c3 100644 --- a/src/leap/keymanager/tests/test_keymanager.py +++ b/src/leap/keymanager/tests/test_keymanager.py @@ -34,13 +34,9 @@ from twisted.web._responses import NOT_FOUND from leap.keymanager import client -from leap.keymanager import ( - KeyNotFound, - KeyAddressMismatch, - errors -) -from leap.keymanager.openpgp import OpenPGPKey +from leap.keymanager import errors from leap.keymanager.keys import ( + OpenPGPKey, is_address, build_key_from_dict, ) @@ -140,7 +136,7 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase): @defer.inlineCallbacks def test_get_all_keys_in_db(self): km = self._key_manager() - yield km._wrapper_map[OpenPGPKey].put_raw_key(PRIVATE_KEY, ADDRESS) + yield km._openpgp.put_raw_key(PRIVATE_KEY, ADDRESS) # get public keys keys = yield km.get_all_keys(False) self.assertEqual(len(keys), 1, 'Wrong number of keys') @@ -155,10 +151,9 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase): @defer.inlineCallbacks def test_get_public_key(self): km = self._key_manager() - yield km._wrapper_map[OpenPGPKey].put_raw_key(PRIVATE_KEY, ADDRESS) + yield km._openpgp.put_raw_key(PRIVATE_KEY, ADDRESS) # get the key - key = yield km.get_key(ADDRESS, OpenPGPKey, private=False, - fetch_remote=False) + key = yield km.get_key(ADDRESS, private=False, fetch_remote=False) self.assertTrue(key is not None) self.assertTrue(ADDRESS in key.uids) self.assertEqual( @@ -168,10 +163,9 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase): @defer.inlineCallbacks def test_get_public_key_with_binary_private_key(self): km = self._key_manager() - yield km._wrapper_map[OpenPGPKey].put_raw_key(self.get_private_binary_key(), ADDRESS) + yield km._openpgp.put_raw_key(self.get_private_binary_key(), ADDRESS) # get the key - key = yield km.get_key(ADDRESS, OpenPGPKey, private=False, - fetch_remote=False) + key = yield km.get_key(ADDRESS, private=False, fetch_remote=False) self.assertTrue(key is not None) self.assertTrue(ADDRESS in key.uids) self.assertEqual( @@ -181,10 +175,9 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase): @defer.inlineCallbacks def test_get_private_key(self): km = self._key_manager() - yield km._wrapper_map[OpenPGPKey].put_raw_key(PRIVATE_KEY, ADDRESS) + yield km._openpgp.put_raw_key(PRIVATE_KEY, ADDRESS) # get the key - key = yield km.get_key(ADDRESS, OpenPGPKey, private=True, - fetch_remote=False) + key = yield km.get_key(ADDRESS, private=True, fetch_remote=False) self.assertTrue(key is not None) self.assertTrue(ADDRESS in key.uids) self.assertEqual( @@ -193,8 +186,8 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase): def test_send_key_raises_key_not_found(self): km = self._key_manager() - d = km.send_key(OpenPGPKey) - return self.assertFailure(d, KeyNotFound) + d = km.send_key() + return self.assertFailure(d, errors.KeyNotFound) @defer.inlineCallbacks def test_send_key(self): @@ -203,7 +196,7 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase): """ token = "mytoken" km = self._key_manager(token=token) - yield km._wrapper_map[OpenPGPKey].put_raw_key(PUBLIC_KEY, ADDRESS) + yield km._openpgp.put_raw_key(PUBLIC_KEY, ADDRESS) km._async_client_pinned.request = Mock(return_value=defer.succeed('')) # the following data will be used on the send km.ca_cert_path = 'capath' @@ -211,9 +204,9 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase): km.uid = 'myuid' km.api_uri = 'apiuri' km.api_version = 'apiver' - yield km.send_key(OpenPGPKey) + yield km.send_key() # setup expected args - pubkey = yield km.get_key(km._address, OpenPGPKey) + pubkey = yield km.get_key(km._address) data = urllib.urlencode({ km.PUBKEY_KEY: pubkey.key_data, }) @@ -312,10 +305,10 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase): km._async_client_pinned.request = Mock(return_value=defer.succeed(None)) km.ca_cert_path = 'cacertpath' # try to key get without fetching from server - d_fail = km.get_key(address, OpenPGPKey, fetch_remote=False) - d = self.assertFailure(d_fail, KeyNotFound) + d_fail = km.get_key(address, fetch_remote=False) + d = self.assertFailure(d_fail, errors.KeyNotFound) # try to get key fetching from server. - d.addCallback(lambda _: km.get_key(address, OpenPGPKey)) + d.addCallback(lambda _: km.get_key(address)) return d @defer.inlineCallbacks @@ -325,8 +318,8 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase): """ km = self._key_manager(url=NICKSERVER_URI) - yield km.put_raw_key(PUBLIC_KEY, OpenPGPKey, ADDRESS) - key = yield km.get_key(ADDRESS, OpenPGPKey) + yield km.put_raw_key(PUBLIC_KEY, ADDRESS) + key = yield km.get_key(ADDRESS) self.assertIsInstance(key, OpenPGPKey) self.assertTrue(ADDRESS in key.uids) @@ -337,8 +330,8 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase): """ km = self._key_manager(url=NICKSERVER_URI) - yield km.put_raw_key(self.get_public_binary_key(), OpenPGPKey, ADDRESS) - key = yield km.get_key(ADDRESS, OpenPGPKey) + yield km.put_raw_key(self.get_public_binary_key(), ADDRESS) + key = yield km.get_key(ADDRESS) self.assertIsInstance(key, OpenPGPKey) self.assertTrue(ADDRESS in key.uids) @@ -353,8 +346,8 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase): km._async_client.request = Mock(return_value=defer.succeed(PUBLIC_KEY)) - yield km.fetch_key(ADDRESS, "http://site.domain/key", OpenPGPKey) - key = yield km.get_key(ADDRESS, OpenPGPKey) + yield km.fetch_key(ADDRESS, "http://site.domain/key") + key = yield km.get_key(ADDRESS) self.assertEqual(KEY_FINGERPRINT, key.fingerprint) @defer.inlineCallbacks @@ -365,10 +358,11 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase): """ km = self._key_manager() - km._async_client.request = Mock(return_value=defer.succeed(self.get_public_binary_key())) + km._async_client.request = Mock( + return_value=defer.succeed(self.get_public_binary_key())) - yield km.fetch_key(ADDRESS, "http://site.domain/key", OpenPGPKey) - key = yield km.get_key(ADDRESS, OpenPGPKey) + yield km.fetch_key(ADDRESS, "http://site.domain/key") + key = yield km.get_key(ADDRESS) self.assertEqual(KEY_FINGERPRINT, key.fingerprint) def test_fetch_uri_empty_key(self): @@ -382,8 +376,8 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase): content = "" km._fetcher.get = Mock(return_value=Response()) - d = km.fetch_key(ADDRESS, "http://site.domain/key", OpenPGPKey) - return self.assertFailure(d, KeyNotFound) + d = km.fetch_key(ADDRESS, "http://site.domain/key") + return self.assertFailure(d, errors.KeyNotFound) def test_fetch_uri_address_differ(self): """ @@ -393,8 +387,8 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase): km = self._key_manager() km._async_client.request = Mock(return_value=defer.succeed(PUBLIC_KEY)) - d = km.fetch_key(ADDRESS_2, "http://site.domain/key", OpenPGPKey) - return self.assertFailure(d, KeyAddressMismatch) + d = km.fetch_key(ADDRESS_2, "http://site.domain/key") + return self.assertFailure(d, errors.KeyAddressMismatch) def _mock_get_response(self, km, body): km._async_client.request = MagicMock(return_value=defer.succeed(body)) @@ -407,7 +401,7 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase): km = self._key_manager(ca_cert_path=ca_cert_path) get_mock = self._mock_get_response(km, PUBLIC_KEY_OTHER) - yield km.fetch_key(ADDRESS_OTHER, REMOTE_KEY_URL, OpenPGPKey) + yield km.fetch_key(ADDRESS_OTHER, REMOTE_KEY_URL) get_mock.assert_called_once_with(REMOTE_KEY_URL, 'GET') @@ -417,7 +411,7 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase): km = self._key_manager(ca_cert_path=ca_cert_path) get_mock = self._mock_get_response(km, PUBLIC_KEY_OTHER) - yield km.fetch_key(ADDRESS_OTHER, REMOTE_KEY_URL, OpenPGPKey) + yield km.fetch_key(ADDRESS_OTHER, REMOTE_KEY_URL) get_mock.assert_called_once_with(REMOTE_KEY_URL, 'GET') @@ -427,7 +421,7 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase): km = self._key_manager(ca_cert_path=ca_cert_path) get_mock = self._mock_get_response(km, PUBLIC_KEY_OTHER) - yield km.fetch_key(ADDRESS_OTHER, REMOTE_KEY_URL, OpenPGPKey) + yield km.fetch_key(ADDRESS_OTHER, REMOTE_KEY_URL) get_mock.assert_called_once_with(REMOTE_KEY_URL, 'GET') @@ -445,7 +439,7 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase): km = self._key_manager(ca_cert_path=ca_cert_path) get_mock = self._mock_get_response(km, PUBLIC_KEY_OTHER) - yield km.fetch_key(ADDRESS_OTHER, REMOTE_KEY_URL, OpenPGPKey) + yield km.fetch_key(ADDRESS_OTHER, REMOTE_KEY_URL) # assert that combined bundle file is passed to get call get_mock.assert_called_once_with(REMOTE_KEY_URL, 'GET') @@ -470,16 +464,15 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase): def test_decrypt_updates_sign_used_for_signer(self): # given km = self._key_manager() - yield km._wrapper_map[OpenPGPKey].put_raw_key(PRIVATE_KEY, ADDRESS) - yield km._wrapper_map[OpenPGPKey].put_raw_key( - PRIVATE_KEY_2, ADDRESS_2) - encdata = yield km.encrypt('data', ADDRESS, OpenPGPKey, - sign=ADDRESS_2, fetch_remote=False) + yield km._openpgp.put_raw_key(PRIVATE_KEY, ADDRESS) + yield km._openpgp.put_raw_key(PRIVATE_KEY_2, ADDRESS_2) + encdata = yield km.encrypt('data', ADDRESS, sign=ADDRESS_2, + fetch_remote=False) yield km.decrypt( - encdata, ADDRESS, OpenPGPKey, verify=ADDRESS_2, fetch_remote=False) + encdata, ADDRESS, verify=ADDRESS_2, fetch_remote=False) # when - key = yield km.get_key(ADDRESS_2, OpenPGPKey, fetch_remote=False) + key = yield km.get_key(ADDRESS_2, fetch_remote=False) # then self.assertEqual(True, key.sign_used) @@ -488,18 +481,16 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase): def test_decrypt_does_not_update_sign_used_for_recipient(self): # given km = self._key_manager() - yield km._wrapper_map[OpenPGPKey].put_raw_key( - PRIVATE_KEY, ADDRESS) - yield km._wrapper_map[OpenPGPKey].put_raw_key( - PRIVATE_KEY_2, ADDRESS_2) - encdata = yield km.encrypt('data', ADDRESS, OpenPGPKey, - sign=ADDRESS_2, fetch_remote=False) + yield km._openpgp.put_raw_key(PRIVATE_KEY, ADDRESS) + yield km._openpgp.put_raw_key(PRIVATE_KEY_2, ADDRESS_2) + encdata = yield km.encrypt('data', ADDRESS, sign=ADDRESS_2, + fetch_remote=False) yield km.decrypt( - encdata, ADDRESS, OpenPGPKey, verify=ADDRESS_2, fetch_remote=False) + encdata, ADDRESS, verify=ADDRESS_2, fetch_remote=False) # when key = yield km.get_key( - ADDRESS, OpenPGPKey, private=False, fetch_remote=False) + ADDRESS, private=False, fetch_remote=False) # then self.assertEqual(False, key.sign_used) @@ -513,35 +504,32 @@ class KeyManagerCryptoTestCase(KeyManagerWithSoledadTestCase): def test_keymanager_openpgp_encrypt_decrypt(self): km = self._key_manager() # put raw private key - yield km._wrapper_map[OpenPGPKey].put_raw_key(PRIVATE_KEY, ADDRESS) - yield km._wrapper_map[OpenPGPKey].put_raw_key( - PRIVATE_KEY_2, ADDRESS_2) + yield km._openpgp.put_raw_key(PRIVATE_KEY, ADDRESS) + yield km._openpgp.put_raw_key(PRIVATE_KEY_2, ADDRESS_2) # encrypt - encdata = yield km.encrypt(self.RAW_DATA, ADDRESS, OpenPGPKey, - sign=ADDRESS_2, fetch_remote=False) + encdata = yield km.encrypt(self.RAW_DATA, ADDRESS, sign=ADDRESS_2, + fetch_remote=False) self.assertNotEqual(self.RAW_DATA, encdata) # decrypt rawdata, signingkey = yield km.decrypt( - encdata, ADDRESS, OpenPGPKey, verify=ADDRESS_2, fetch_remote=False) + encdata, ADDRESS, verify=ADDRESS_2, fetch_remote=False) self.assertEqual(self.RAW_DATA, rawdata) - key = yield km.get_key(ADDRESS_2, OpenPGPKey, private=False, - fetch_remote=False) + key = yield km.get_key(ADDRESS_2, private=False, fetch_remote=False) self.assertEqual(signingkey.fingerprint, key.fingerprint) @defer.inlineCallbacks def test_keymanager_openpgp_encrypt_decrypt_wrong_sign(self): km = self._key_manager() # put raw keys - yield km._wrapper_map[OpenPGPKey].put_raw_key(PRIVATE_KEY, ADDRESS) - yield km._wrapper_map[OpenPGPKey].put_raw_key( - PRIVATE_KEY_2, ADDRESS_2) + yield km._openpgp.put_raw_key(PRIVATE_KEY, ADDRESS) + yield km._openpgp.put_raw_key(PRIVATE_KEY_2, ADDRESS_2) # encrypt - encdata = yield km.encrypt(self.RAW_DATA, ADDRESS, OpenPGPKey, - sign=ADDRESS_2, fetch_remote=False) + encdata = yield km.encrypt(self.RAW_DATA, ADDRESS, sign=ADDRESS_2, + fetch_remote=False) self.assertNotEqual(self.RAW_DATA, encdata) # verify rawdata, signingkey = yield km.decrypt( - encdata, ADDRESS, OpenPGPKey, verify=ADDRESS, fetch_remote=False) + encdata, ADDRESS, verify=ADDRESS, fetch_remote=False) self.assertEqual(self.RAW_DATA, rawdata) self.assertTrue(isinstance(signingkey, errors.InvalidSignature)) @@ -549,24 +537,21 @@ class KeyManagerCryptoTestCase(KeyManagerWithSoledadTestCase): def test_keymanager_openpgp_sign_verify(self): km = self._key_manager() # put raw private keys - yield km._wrapper_map[OpenPGPKey].put_raw_key(PRIVATE_KEY, ADDRESS) - signdata = yield km.sign(self.RAW_DATA, ADDRESS, OpenPGPKey, - detach=False) + yield km._openpgp.put_raw_key(PRIVATE_KEY, ADDRESS) + signdata = yield km.sign(self.RAW_DATA, ADDRESS, detach=False) self.assertNotEqual(self.RAW_DATA, signdata) # verify - signingkey = yield km.verify(signdata, ADDRESS, OpenPGPKey, - fetch_remote=False) - key = yield km.get_key(ADDRESS, OpenPGPKey, private=False, - fetch_remote=False) + signingkey = yield km.verify(signdata, ADDRESS, fetch_remote=False) + key = yield km.get_key(ADDRESS, private=False, fetch_remote=False) self.assertEqual(signingkey.fingerprint, key.fingerprint) def test_keymanager_encrypt_key_not_found(self): km = self._key_manager() - d = km._wrapper_map[OpenPGPKey].put_raw_key(PRIVATE_KEY, ADDRESS) + d = km._openpgp.put_raw_key(PRIVATE_KEY, ADDRESS) d.addCallback( - lambda _: km.encrypt(self.RAW_DATA, ADDRESS_2, OpenPGPKey, - sign=ADDRESS, fetch_remote=False)) - return self.assertFailure(d, KeyNotFound) + lambda _: km.encrypt(self.RAW_DATA, ADDRESS_2, sign=ADDRESS, + fetch_remote=False)) + return self.assertFailure(d, errors.KeyNotFound) if __name__ == "__main__": import unittest diff --git a/src/leap/keymanager/tests/test_validation.py b/src/leap/keymanager/tests/test_validation.py index 44d6e39..954b1df 100644 --- a/src/leap/keymanager/tests/test_validation.py +++ b/src/leap/keymanager/tests/test_validation.py @@ -22,7 +22,6 @@ import unittest from datetime import datetime from twisted.internet.defer import inlineCallbacks -from leap.keymanager.openpgp import OpenPGPKey from leap.keymanager.errors import ( KeyNotValidUpgrade ) @@ -43,64 +42,62 @@ class ValidationLevelsTestCase(KeyManagerWithSoledadTestCase): @inlineCallbacks def test_none_old_key(self): km = self._key_manager() - yield km.put_raw_key(PUBLIC_KEY, OpenPGPKey, ADDRESS) - key = yield km.get_key(ADDRESS, OpenPGPKey, fetch_remote=False) + yield km.put_raw_key(PUBLIC_KEY, ADDRESS) + key = yield km.get_key(ADDRESS, fetch_remote=False) self.assertEqual(key.fingerprint, KEY_FINGERPRINT) @inlineCallbacks def test_cant_upgrade(self): km = self._key_manager() - yield km.put_raw_key(PUBLIC_KEY, OpenPGPKey, ADDRESS, + yield km.put_raw_key(PUBLIC_KEY, ADDRESS, validation=ValidationLevels.Provider_Trust) - d = km.put_raw_key(UNRELATED_KEY, OpenPGPKey, ADDRESS) + d = km.put_raw_key(UNRELATED_KEY, ADDRESS) yield self.assertFailure(d, KeyNotValidUpgrade) @inlineCallbacks def test_fingerprint_level(self): km = self._key_manager() - yield km.put_raw_key(PUBLIC_KEY, OpenPGPKey, ADDRESS) - yield km.put_raw_key(UNRELATED_KEY, OpenPGPKey, ADDRESS, + yield km.put_raw_key(PUBLIC_KEY, ADDRESS) + yield km.put_raw_key(UNRELATED_KEY, ADDRESS, validation=ValidationLevels.Fingerprint) - key = yield km.get_key(ADDRESS, OpenPGPKey, fetch_remote=False) + key = yield km.get_key(ADDRESS, fetch_remote=False) self.assertEqual(key.fingerprint, UNRELATED_FINGERPRINT) @inlineCallbacks def test_expired_key(self): km = self._key_manager() - yield km.put_raw_key(EXPIRED_KEY, OpenPGPKey, ADDRESS) - yield km.put_raw_key(UNRELATED_KEY, OpenPGPKey, ADDRESS) - key = yield km.get_key(ADDRESS, OpenPGPKey, fetch_remote=False) + yield km.put_raw_key(EXPIRED_KEY, ADDRESS) + yield km.put_raw_key(UNRELATED_KEY, ADDRESS) + key = yield km.get_key(ADDRESS, fetch_remote=False) self.assertEqual(key.fingerprint, UNRELATED_FINGERPRINT) @inlineCallbacks def test_expired_fail_lower_level(self): km = self._key_manager() yield km.put_raw_key( - EXPIRED_KEY, OpenPGPKey, ADDRESS, + EXPIRED_KEY, ADDRESS, validation=ValidationLevels.Third_Party_Endorsement) d = km.put_raw_key( - UNRELATED_KEY, - OpenPGPKey, - ADDRESS, + UNRELATED_KEY, ADDRESS, validation=ValidationLevels.Provider_Trust) yield self.assertFailure(d, KeyNotValidUpgrade) @inlineCallbacks def test_roll_back(self): km = self._key_manager() - yield km.put_raw_key(EXPIRED_KEY_UPDATED, OpenPGPKey, ADDRESS) - yield km.put_raw_key(EXPIRED_KEY, OpenPGPKey, ADDRESS) - key = yield km.get_key(ADDRESS, OpenPGPKey, fetch_remote=False) + yield km.put_raw_key(EXPIRED_KEY_UPDATED, ADDRESS) + yield km.put_raw_key(EXPIRED_KEY, ADDRESS) + key = yield km.get_key(ADDRESS, fetch_remote=False) self.assertEqual(key.expiry_date, EXPIRED_KEY_NEW_EXPIRY_DATE) @inlineCallbacks def test_not_used(self): km = self._key_manager() - yield km.put_raw_key(UNEXPIRED_KEY, OpenPGPKey, ADDRESS, + yield km.put_raw_key(UNEXPIRED_KEY, ADDRESS, validation=ValidationLevels.Provider_Trust) - yield km.put_raw_key(UNRELATED_KEY, OpenPGPKey, ADDRESS, + yield km.put_raw_key(UNRELATED_KEY, ADDRESS, validation=ValidationLevels.Provider_Endorsement) - key = yield km.get_key(ADDRESS, OpenPGPKey, fetch_remote=False) + key = yield km.get_key(ADDRESS, fetch_remote=False) self.assertEqual(key.fingerprint, UNRELATED_FINGERPRINT) @inlineCallbacks @@ -108,16 +105,16 @@ class ValidationLevelsTestCase(KeyManagerWithSoledadTestCase): TEXT = "some text" km = self._key_manager() - yield km.put_raw_key(UNEXPIRED_PRIVATE, OpenPGPKey, ADDRESS) - signature = yield km.sign(TEXT, ADDRESS, OpenPGPKey) + yield km.put_raw_key(UNEXPIRED_PRIVATE, ADDRESS) + signature = yield km.sign(TEXT, ADDRESS) yield self.delete_all_keys(km) - yield km.put_raw_key(UNEXPIRED_KEY, OpenPGPKey, ADDRESS) - yield km.encrypt(TEXT, ADDRESS, OpenPGPKey) - yield km.verify(TEXT, ADDRESS, OpenPGPKey, detached_sig=signature) + yield km.put_raw_key(UNEXPIRED_KEY, ADDRESS) + yield km.encrypt(TEXT, ADDRESS) + yield km.verify(TEXT, ADDRESS, detached_sig=signature) d = km.put_raw_key( - UNRELATED_KEY, OpenPGPKey, ADDRESS, + UNRELATED_KEY, ADDRESS, validation=ValidationLevels.Provider_Endorsement) yield self.assertFailure(d, KeyNotValidUpgrade) @@ -126,28 +123,27 @@ class ValidationLevelsTestCase(KeyManagerWithSoledadTestCase): TEXT = "some text" km = self._key_manager() - yield km.put_raw_key(UNEXPIRED_PRIVATE, OpenPGPKey, ADDRESS) - yield km.put_raw_key(PUBLIC_KEY_2, OpenPGPKey, ADDRESS_2) - encrypted = yield km.encrypt(TEXT, ADDRESS_2, OpenPGPKey, - sign=ADDRESS) + yield km.put_raw_key(UNEXPIRED_PRIVATE, ADDRESS) + yield km.put_raw_key(PUBLIC_KEY_2, ADDRESS_2) + encrypted = yield km.encrypt(TEXT, ADDRESS_2, sign=ADDRESS) yield self.delete_all_keys(km) - yield km.put_raw_key(UNEXPIRED_KEY, OpenPGPKey, ADDRESS) - yield km.put_raw_key(PRIVATE_KEY_2, OpenPGPKey, ADDRESS_2) - yield km.encrypt(TEXT, ADDRESS, OpenPGPKey) - yield km.decrypt(encrypted, ADDRESS_2, OpenPGPKey, verify=ADDRESS) + yield km.put_raw_key(UNEXPIRED_KEY, ADDRESS) + yield km.put_raw_key(PRIVATE_KEY_2, ADDRESS_2) + yield km.encrypt(TEXT, ADDRESS) + yield km.decrypt(encrypted, ADDRESS_2, verify=ADDRESS) d = km.put_raw_key( - UNRELATED_KEY, OpenPGPKey, ADDRESS, + UNRELATED_KEY, ADDRESS, validation=ValidationLevels.Provider_Endorsement) yield self.assertFailure(d, KeyNotValidUpgrade) @inlineCallbacks def test_signed_key(self): km = self._key_manager() - yield km.put_raw_key(PUBLIC_KEY, OpenPGPKey, ADDRESS) - yield km.put_raw_key(SIGNED_KEY, OpenPGPKey, ADDRESS) - key = yield km.get_key(ADDRESS, OpenPGPKey, fetch_remote=False) + yield km.put_raw_key(PUBLIC_KEY, ADDRESS) + yield km.put_raw_key(SIGNED_KEY, ADDRESS) + key = yield km.get_key(ADDRESS, fetch_remote=False) self.assertEqual(key.fingerprint, SIGNED_FINGERPRINT) @inlineCallbacks @@ -155,26 +151,26 @@ class ValidationLevelsTestCase(KeyManagerWithSoledadTestCase): TEXT = "some text" km = self._key_manager() - yield km.put_raw_key(UUIDS_PRIVATE, OpenPGPKey, ADDRESS_2) - signature = yield km.sign(TEXT, ADDRESS_2, OpenPGPKey) + yield km.put_raw_key(UUIDS_PRIVATE, ADDRESS_2) + signature = yield km.sign(TEXT, ADDRESS_2) yield self.delete_all_keys(km) - yield km.put_raw_key(UUIDS_KEY, OpenPGPKey, ADDRESS_2) - yield km.put_raw_key(UUIDS_KEY, OpenPGPKey, ADDRESS) - yield km.encrypt(TEXT, ADDRESS_2, OpenPGPKey) - yield km.verify(TEXT, ADDRESS_2, OpenPGPKey, detached_sig=signature) + yield km.put_raw_key(UUIDS_KEY, ADDRESS_2) + yield km.put_raw_key(UUIDS_KEY, ADDRESS) + yield km.encrypt(TEXT, ADDRESS_2) + yield km.verify(TEXT, ADDRESS_2, detached_sig=signature) d = km.put_raw_key( - PUBLIC_KEY_2, OpenPGPKey, ADDRESS_2, + PUBLIC_KEY_2, ADDRESS_2, validation=ValidationLevels.Provider_Endorsement) yield self.assertFailure(d, KeyNotValidUpgrade) - key = yield km.get_key(ADDRESS_2, OpenPGPKey, fetch_remote=False) + key = yield km.get_key(ADDRESS_2, fetch_remote=False) self.assertEqual(key.fingerprint, UUIDS_FINGERPRINT) yield km.put_raw_key( - PUBLIC_KEY, OpenPGPKey, ADDRESS, + PUBLIC_KEY, ADDRESS, validation=ValidationLevels.Provider_Endorsement) - key = yield km.get_key(ADDRESS, OpenPGPKey, fetch_remote=False) + key = yield km.get_key(ADDRESS, fetch_remote=False) self.assertEqual(key.fingerprint, KEY_FINGERPRINT) -- cgit v1.2.3