From 4612ba27e83e478baac5a5f8b3037608774934d6 Mon Sep 17 00:00:00 2001 From: Ruben Pollan Date: Sun, 1 May 2016 15:23:15 -0300 Subject: [feat] remove the keytype support - Resolves: #8031 --- src/leap/keymanager/__init__.py | 161 ++++++++++------------------------------ 1 file changed, 39 insertions(+), 122 deletions(-) (limited to 'src/leap/keymanager/__init__.py') diff --git a/src/leap/keymanager/__init__.py b/src/leap/keymanager/__init__.py index aa0a9ac..f72f403 100644 --- a/src/leap/keymanager/__init__.py +++ b/src/leap/keymanager/__init__.py @@ -71,9 +71,7 @@ from leap.common.decorators import memoized_method from leap.keymanager.errors import ( KeyNotFound, - KeyAddressMismatch, KeyNotValidUpgrade, - UnsupportedKeyTypeError, InvalidSignature ) from leap.keymanager.validation import ValidationLevels, can_upgrade @@ -138,11 +136,7 @@ class KeyManager(object): self.api_uri = api_uri self.api_version = api_version self.uid = uid - # a dict to map key types to their handlers - self._wrapper_map = { - OpenPGPKey: OpenPGPScheme(soledad, gpgbinary=gpgbinary), - # other types of key will be added to this mapper. - } + self._openpgp = OpenPGPScheme(soledad, gpgbinary=gpgbinary) # the following are used to perform https requests self._fetcher = requests self._combined_ca_bundle = self._create_combined_bundle_file() @@ -184,20 +178,6 @@ class KeyManager(object): return tmp_file.name - def _key_class_from_type(self, ktype): - """ - Given a class type, return a class - - :param ktype: string representation of a class name - :type ktype: str - - :return: A class with the matching name - :rtype: classobj or type - """ - return filter( - lambda klass: klass.__name__ == ktype, - self._wrapper_map).pop() - @defer.inlineCallbacks def _get_key_from_nicknym(self, address): """ @@ -349,7 +329,6 @@ class KeyManager(object): yield self.put_raw_key( server_keys['openpgp'], - OpenPGPKey, address=address, validation=validation_level) @@ -357,24 +336,19 @@ class KeyManager(object): # key management # - def send_key(self, ktype): + def send_key(self): """ - Send user's key of type ktype to provider. + Send user's key to provider. Public key bound to user's is sent to provider, which will sign it and replace any prior keys for the same address in its database. - :param ktype: The type of the key. - :type ktype: subclass of EncryptionKey - :return: A Deferred which fires when the key is sent, or which fails with KeyNotFound if the key was not found in local database. :rtype: Deferred :raise UnsupportedKeyTypeError: if invalid key type """ - self._assert_supported_key_type(ktype) - def send(pubkey): data = { self.PUBKEY_KEY: pubkey.key_data @@ -390,40 +364,33 @@ class KeyManager(object): return d d = self.get_key( - self._address, ktype, private=False, fetch_remote=False) + self._address, private=False, fetch_remote=False) d.addCallback(send) return d - def get_key(self, address, ktype, private=False, fetch_remote=True): + def get_key(self, address, private=False, fetch_remote=True): """ - Return a key of type ktype bound to address. + Return a key bound to address. First, search for the key in local storage. If it is not available, then try to fetch from nickserver. :param address: The address bound to the key. :type address: str - :param ktype: The type of the key. - :type ktype: subclass of EncryptionKey :param private: Look for a private key instead of a public one? :type private: bool :param fetch_remote: If key not found in local storage try to fetch from nickserver :type fetch_remote: bool - :return: A Deferred which fires with an EncryptionKey of type ktype - bound to address, or which fails with KeyNotFound if no key - was found neither locally or in keyserver. + :return: A Deferred which fires with an EncryptionKey bound to address, + or which fails with KeyNotFound if no key was found neither + locally or in keyserver. :rtype: Deferred :raise UnsupportedKeyTypeError: if invalid key type """ - self._assert_supported_key_type(ktype) logger.debug("getting key for %s" % (address,)) - leap_assert( - ktype in self._wrapper_map, - 'Unkown key type: %s.' % str(ktype)) - _keys = self._wrapper_map[ktype] emit_async(catalog.KEYMANAGER_LOOKING_FOR_KEY, address) @@ -445,12 +412,12 @@ class KeyManager(object): emit_async(catalog.KEYMANAGER_LOOKING_FOR_KEY, address) d = self._fetch_keys_from_server(address) d.addCallback( - lambda _: _keys.get_key(address, private=False)) + lambda _: self._openpgp.get_key(address, private=False)) d.addCallback(key_found) return d # return key if it exists in local database - d = _keys.get_key(address, private=private) + d = self._openpgp.get_key(address, private=private) d.addCallbacks(key_found, key_not_found) return d @@ -467,9 +434,7 @@ class KeyManager(object): # TODO: should it be based on activedocs? def build_keys(docs): return map( - lambda doc: build_key_from_dict( - self._key_class_from_type(doc.content['type']), - doc.content), + lambda doc: build_key_from_dict(OpenPGPKey, doc.content), docs) # XXX: there is no check that the soledad indexes are ready, as it @@ -484,20 +449,15 @@ class KeyManager(object): d.addCallback(build_keys) return d - def gen_key(self, ktype): + def gen_key(self): """ - Generate a key of type ktype bound to the user's address. - - :param ktype: The type of the key. - :type ktype: subclass of EncryptionKey + Generate a key bound to the user's address. :return: A Deferred which fires with the generated EncryptionKey. :rtype: Deferred :raise UnsupportedKeyTypeError: if invalid key type """ - self._assert_supported_key_type(ktype) - _keys = self._wrapper_map[ktype] def signal_finished(key): emit_async( @@ -506,7 +466,7 @@ class KeyManager(object): emit_async(catalog.KEYMANAGER_STARTED_KEY_GENERATION, self._address) - d = _keys.gen_key(self._address) + d = self.openpg.gen_key(self._address) d.addCallback(signal_finished) return d @@ -564,7 +524,7 @@ class KeyManager(object): # encrypt/decrypt and sign/verify API # - def encrypt(self, data, address, ktype, passphrase=None, sign=None, + def encrypt(self, data, address, passphrase=None, sign=None, cipher_algo='AES256', fetch_remote=True): """ Encrypt data with the public key bound to address and sign with with @@ -574,8 +534,6 @@ class KeyManager(object): :type data: str :param address: The address to encrypt it for. :type address: str - :param ktype: The type of the key. - :type ktype: subclass of EncryptionKey :param passphrase: The passphrase for the secret key used for the signature. :type passphrase: str @@ -595,30 +553,28 @@ class KeyManager(object): :raise UnsupportedKeyTypeError: if invalid key type """ - self._assert_supported_key_type(ktype) - _keys = self._wrapper_map[ktype] @defer.inlineCallbacks def encrypt(keys): pubkey, signkey = keys - encrypted = yield _keys.encrypt( + encrypted = yield self._openpgp.encrypt( data, pubkey, passphrase, sign=signkey, cipher_algo=cipher_algo) if not pubkey.encr_used: pubkey.encr_used = True - yield _keys.put_key(pubkey) + yield self._openpgp.put_key(pubkey) defer.returnValue(encrypted) - dpub = self.get_key(address, ktype, private=False, + dpub = self.get_key(address, private=False, fetch_remote=fetch_remote) dpriv = defer.succeed(None) if sign is not None: - dpriv = self.get_key(sign, ktype, private=True) + dpriv = self.get_key(sign, private=True) d = defer.gatherResults([dpub, dpriv], consumeErrors=True) d.addCallbacks(encrypt, self._extract_first_error) return d - def decrypt(self, data, address, ktype, passphrase=None, verify=None, + def decrypt(self, data, address, passphrase=None, verify=None, fetch_remote=True): """ Decrypt data using private key from address and verify with public key @@ -628,8 +584,6 @@ class KeyManager(object): :type data: str :param address: The address to whom data was encrypted. :type address: str - :param ktype: The type of the key. - :type ktype: subclass of EncryptionKey :param passphrase: The passphrase for the secret key used for decryption. :type passphrase: str @@ -649,13 +603,11 @@ class KeyManager(object): :raise UnsupportedKeyTypeError: if invalid key type """ - self._assert_supported_key_type(ktype) - _keys = self._wrapper_map[ktype] @defer.inlineCallbacks def decrypt(keys): pubkey, privkey = keys - decrypted, signed = yield _keys.decrypt( + decrypted, signed = yield self._openpgp.decrypt( data, privkey, passphrase=passphrase, verify=pubkey) if pubkey is None: signature = KeyNotFound(verify) @@ -663,7 +615,7 @@ class KeyManager(object): signature = pubkey if not pubkey.sign_used: pubkey.sign_used = True - yield _keys.put_key(pubkey) + yield self._openpgp.put_key(pubkey) defer.returnValue((decrypted, signature)) else: signature = InvalidSignature( @@ -671,10 +623,10 @@ class KeyManager(object): (pubkey.fingerprint,)) defer.returnValue((decrypted, signature)) - dpriv = self.get_key(address, ktype, private=True) + dpriv = self.get_key(address, private=True) dpub = defer.succeed(None) if verify is not None: - dpub = self.get_key(verify, ktype, private=False, + dpub = self.get_key(verify, private=False, fetch_remote=fetch_remote) dpub.addErrback(lambda f: None if f.check(KeyNotFound) else f) d = defer.gatherResults([dpub, dpriv], consumeErrors=True) @@ -684,7 +636,7 @@ class KeyManager(object): def _extract_first_error(self, failure): return failure.value.subFailure - def sign(self, data, address, ktype, digest_algo='SHA512', clearsign=False, + def sign(self, data, address, digest_algo='SHA512', clearsign=False, detach=True, binary=False): """ Sign data with private key bound to address. @@ -693,8 +645,6 @@ class KeyManager(object): :type data: str :param address: The address to be used to sign. :type address: EncryptionKey - :param ktype: The type of the key. - :type ktype: subclass of EncryptionKey :param digest_algo: The hash digest to use. :type digest_algo: str :param clearsign: If True, create a cleartext signature. @@ -712,19 +662,17 @@ class KeyManager(object): :raise UnsupportedKeyTypeError: if invalid key type """ - self._assert_supported_key_type(ktype) - _keys = self._wrapper_map[ktype] def sign(privkey): - return _keys.sign( + return self._openpgp.sign( data, privkey, digest_algo=digest_algo, clearsign=clearsign, detach=detach, binary=binary) - d = self.get_key(address, ktype, private=True) + d = self.get_key(address, private=True) d.addCallback(sign) return d - def verify(self, data, address, ktype, detached_sig=None, + def verify(self, data, address, detached_sig=None, fetch_remote=True): """ Verify signed data with private key bound to address, eventually using @@ -734,8 +682,6 @@ class KeyManager(object): :type data: str :param address: The address to be used to verify. :type address: EncryptionKey - :param ktype: The type of the key. - :type ktype: subclass of EncryptionKey :param detached_sig: A detached signature. If given, C{data} is verified using this detached signature. :type detached_sig: str @@ -751,16 +697,14 @@ class KeyManager(object): :raise UnsupportedKeyTypeError: if invalid key type """ - self._assert_supported_key_type(ktype) - _keys = self._wrapper_map[ktype] def verify(pubkey): - signed = _keys.verify( + signed = self._openpgp.verify( data, pubkey, detached_sig=detached_sig) if signed: if not pubkey.sign_used: pubkey.sign_used = True - d = _keys.put_key(pubkey) + d = self._openpgp.put_key(pubkey) d.addCallback(lambda _: pubkey) return d return pubkey @@ -769,7 +713,7 @@ class KeyManager(object): 'Failed to verify signature with key %s' % (pubkey.fingerprint,)) - d = self.get_key(address, ktype, private=False, + d = self.get_key(address, private=False, fetch_remote=fetch_remote) d.addCallback(verify) return d @@ -787,9 +731,7 @@ class KeyManager(object): :raise UnsupportedKeyTypeError: if invalid key type """ - self._assert_supported_key_type(type(key)) - _keys = self._wrapper_map[type(key)] - return _keys.delete_key(key) + return self._openpgp.delete_key(key) def put_key(self, key): """ @@ -805,9 +747,6 @@ class KeyManager(object): :raise UnsupportedKeyTypeError: if invalid key type """ - ktype = type(key) - self._assert_supported_key_type(ktype) - _keys = self._wrapper_map[ktype] def old_key_not_found(failure): if failure.check(KeyNotFound): @@ -817,26 +756,24 @@ class KeyManager(object): def check_upgrade(old_key): if key.private or can_upgrade(key, old_key): - return _keys.put_key(key) + return self._openpgp.put_key(key) else: raise KeyNotValidUpgrade( "Key %s can not be upgraded by new key %s" % (old_key.fingerprint, key.fingerprint)) - d = _keys.get_key(key.address, private=key.private) + d = self._openpgp.get_key(key.address, private=key.private) d.addErrback(old_key_not_found) d.addCallback(check_upgrade) return d - def put_raw_key(self, key, ktype, address, + def put_raw_key(self, key, address, validation=ValidationLevels.Weak_Chain): """ Put raw key bound to address in local storage. :param key: The ascii key to be stored :type key: str - :param ktype: the type of the key. - :type ktype: subclass of EncryptionKey :param address: address for which this key will be active :type address: str :param validation: validation level for this key @@ -853,10 +790,7 @@ class KeyManager(object): :raise UnsupportedKeyTypeError: if invalid key type """ - self._assert_supported_key_type(ktype) - _keys = self._wrapper_map[ktype] - - pubkey, privkey = _keys.parse_key(key, address) + pubkey, privkey = self._openpgp.parse_key(key, address) if pubkey is None: return defer.fail(KeyNotFound(key)) @@ -868,8 +802,7 @@ class KeyManager(object): return d @defer.inlineCallbacks - def fetch_key(self, address, uri, ktype, - validation=ValidationLevels.Weak_Chain): + def fetch_key(self, address, uri, validation=ValidationLevels.Weak_Chain): """ Fetch a public key bound to address from the network and put it in local storage. @@ -878,8 +811,6 @@ class KeyManager(object): :type address: str :param uri: The URI of the key. :type uri: str - :param ktype: the type of the key. - :type ktype: subclass of EncryptionKey :param validation: validation level for this key (default: 'Weak_Chain') :type validation: ValidationLevels @@ -893,32 +824,18 @@ class KeyManager(object): :raise UnsupportedKeyTypeError: if invalid key type """ - self._assert_supported_key_type(ktype) - _keys = self._wrapper_map[ktype] logger.info("Fetch key for %s from %s" % (address, uri)) ascii_content = yield self._get_with_combined_ca_bundle(uri) # XXX parse binary keys - pubkey, _ = _keys.parse_key(ascii_content, address) + pubkey, _ = self._openpgp.parse_key(ascii_content, address) if pubkey is None: raise KeyNotFound(uri) pubkey.validation = validation yield self.put_key(pubkey) - def _assert_supported_key_type(self, ktype): - """ - Check if ktype is one of the supported key types - - :param ktype: the type of the key. - :type ktype: subclass of EncryptionKey - - :raise UnsupportedKeyTypeError: if invalid key type - """ - if ktype not in self._wrapper_map: - raise UnsupportedKeyTypeError(str(ktype)) - def _split_email(address): """ -- cgit v1.2.3