summaryrefslogtreecommitdiff
path: root/src/leap/keymanager/__init__.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/leap/keymanager/__init__.py')
-rw-r--r--src/leap/keymanager/__init__.py161
1 files changed, 39 insertions, 122 deletions
diff --git a/src/leap/keymanager/__init__.py b/src/leap/keymanager/__init__.py
index aa0a9ac..f72f403 100644
--- a/src/leap/keymanager/__init__.py
+++ b/src/leap/keymanager/__init__.py
@@ -71,9 +71,7 @@ from leap.common.decorators import memoized_method
from leap.keymanager.errors import (
KeyNotFound,
- KeyAddressMismatch,
KeyNotValidUpgrade,
- UnsupportedKeyTypeError,
InvalidSignature
)
from leap.keymanager.validation import ValidationLevels, can_upgrade
@@ -138,11 +136,7 @@ class KeyManager(object):
self.api_uri = api_uri
self.api_version = api_version
self.uid = uid
- # a dict to map key types to their handlers
- self._wrapper_map = {
- OpenPGPKey: OpenPGPScheme(soledad, gpgbinary=gpgbinary),
- # other types of key will be added to this mapper.
- }
+ self._openpgp = OpenPGPScheme(soledad, gpgbinary=gpgbinary)
# the following are used to perform https requests
self._fetcher = requests
self._combined_ca_bundle = self._create_combined_bundle_file()
@@ -184,20 +178,6 @@ class KeyManager(object):
return tmp_file.name
- def _key_class_from_type(self, ktype):
- """
- Given a class type, return a class
-
- :param ktype: string representation of a class name
- :type ktype: str
-
- :return: A class with the matching name
- :rtype: classobj or type
- """
- return filter(
- lambda klass: klass.__name__ == ktype,
- self._wrapper_map).pop()
-
@defer.inlineCallbacks
def _get_key_from_nicknym(self, address):
"""
@@ -349,7 +329,6 @@ class KeyManager(object):
yield self.put_raw_key(
server_keys['openpgp'],
- OpenPGPKey,
address=address,
validation=validation_level)
@@ -357,24 +336,19 @@ class KeyManager(object):
# key management
#
- def send_key(self, ktype):
+ def send_key(self):
"""
- Send user's key of type ktype to provider.
+ Send user's key to provider.
Public key bound to user's is sent to provider, which will sign it and
replace any prior keys for the same address in its database.
- :param ktype: The type of the key.
- :type ktype: subclass of EncryptionKey
-
:return: A Deferred which fires when the key is sent, or which fails
with KeyNotFound if the key was not found in local database.
:rtype: Deferred
:raise UnsupportedKeyTypeError: if invalid key type
"""
- self._assert_supported_key_type(ktype)
-
def send(pubkey):
data = {
self.PUBKEY_KEY: pubkey.key_data
@@ -390,40 +364,33 @@ class KeyManager(object):
return d
d = self.get_key(
- self._address, ktype, private=False, fetch_remote=False)
+ self._address, private=False, fetch_remote=False)
d.addCallback(send)
return d
- def get_key(self, address, ktype, private=False, fetch_remote=True):
+ def get_key(self, address, private=False, fetch_remote=True):
"""
- Return a key of type ktype bound to address.
+ Return a key bound to address.
First, search for the key in local storage. If it is not available,
then try to fetch from nickserver.
:param address: The address bound to the key.
:type address: str
- :param ktype: The type of the key.
- :type ktype: subclass of EncryptionKey
:param private: Look for a private key instead of a public one?
:type private: bool
:param fetch_remote: If key not found in local storage try to fetch
from nickserver
:type fetch_remote: bool
- :return: A Deferred which fires with an EncryptionKey of type ktype
- bound to address, or which fails with KeyNotFound if no key
- was found neither locally or in keyserver.
+ :return: A Deferred which fires with an EncryptionKey bound to address,
+ or which fails with KeyNotFound if no key was found neither
+ locally or in keyserver.
:rtype: Deferred
:raise UnsupportedKeyTypeError: if invalid key type
"""
- self._assert_supported_key_type(ktype)
logger.debug("getting key for %s" % (address,))
- leap_assert(
- ktype in self._wrapper_map,
- 'Unkown key type: %s.' % str(ktype))
- _keys = self._wrapper_map[ktype]
emit_async(catalog.KEYMANAGER_LOOKING_FOR_KEY, address)
@@ -445,12 +412,12 @@ class KeyManager(object):
emit_async(catalog.KEYMANAGER_LOOKING_FOR_KEY, address)
d = self._fetch_keys_from_server(address)
d.addCallback(
- lambda _: _keys.get_key(address, private=False))
+ lambda _: self._openpgp.get_key(address, private=False))
d.addCallback(key_found)
return d
# return key if it exists in local database
- d = _keys.get_key(address, private=private)
+ d = self._openpgp.get_key(address, private=private)
d.addCallbacks(key_found, key_not_found)
return d
@@ -467,9 +434,7 @@ class KeyManager(object):
# TODO: should it be based on activedocs?
def build_keys(docs):
return map(
- lambda doc: build_key_from_dict(
- self._key_class_from_type(doc.content['type']),
- doc.content),
+ lambda doc: build_key_from_dict(OpenPGPKey, doc.content),
docs)
# XXX: there is no check that the soledad indexes are ready, as it
@@ -484,20 +449,15 @@ class KeyManager(object):
d.addCallback(build_keys)
return d
- def gen_key(self, ktype):
+ def gen_key(self):
"""
- Generate a key of type ktype bound to the user's address.
-
- :param ktype: The type of the key.
- :type ktype: subclass of EncryptionKey
+ Generate a key bound to the user's address.
:return: A Deferred which fires with the generated EncryptionKey.
:rtype: Deferred
:raise UnsupportedKeyTypeError: if invalid key type
"""
- self._assert_supported_key_type(ktype)
- _keys = self._wrapper_map[ktype]
def signal_finished(key):
emit_async(
@@ -506,7 +466,7 @@ class KeyManager(object):
emit_async(catalog.KEYMANAGER_STARTED_KEY_GENERATION, self._address)
- d = _keys.gen_key(self._address)
+ d = self.openpg.gen_key(self._address)
d.addCallback(signal_finished)
return d
@@ -564,7 +524,7 @@ class KeyManager(object):
# encrypt/decrypt and sign/verify API
#
- def encrypt(self, data, address, ktype, passphrase=None, sign=None,
+ def encrypt(self, data, address, passphrase=None, sign=None,
cipher_algo='AES256', fetch_remote=True):
"""
Encrypt data with the public key bound to address and sign with with
@@ -574,8 +534,6 @@ class KeyManager(object):
:type data: str
:param address: The address to encrypt it for.
:type address: str
- :param ktype: The type of the key.
- :type ktype: subclass of EncryptionKey
:param passphrase: The passphrase for the secret key used for the
signature.
:type passphrase: str
@@ -595,30 +553,28 @@ class KeyManager(object):
:raise UnsupportedKeyTypeError: if invalid key type
"""
- self._assert_supported_key_type(ktype)
- _keys = self._wrapper_map[ktype]
@defer.inlineCallbacks
def encrypt(keys):
pubkey, signkey = keys
- encrypted = yield _keys.encrypt(
+ encrypted = yield self._openpgp.encrypt(
data, pubkey, passphrase, sign=signkey,
cipher_algo=cipher_algo)
if not pubkey.encr_used:
pubkey.encr_used = True
- yield _keys.put_key(pubkey)
+ yield self._openpgp.put_key(pubkey)
defer.returnValue(encrypted)
- dpub = self.get_key(address, ktype, private=False,
+ dpub = self.get_key(address, private=False,
fetch_remote=fetch_remote)
dpriv = defer.succeed(None)
if sign is not None:
- dpriv = self.get_key(sign, ktype, private=True)
+ dpriv = self.get_key(sign, private=True)
d = defer.gatherResults([dpub, dpriv], consumeErrors=True)
d.addCallbacks(encrypt, self._extract_first_error)
return d
- def decrypt(self, data, address, ktype, passphrase=None, verify=None,
+ def decrypt(self, data, address, passphrase=None, verify=None,
fetch_remote=True):
"""
Decrypt data using private key from address and verify with public key
@@ -628,8 +584,6 @@ class KeyManager(object):
:type data: str
:param address: The address to whom data was encrypted.
:type address: str
- :param ktype: The type of the key.
- :type ktype: subclass of EncryptionKey
:param passphrase: The passphrase for the secret key used for
decryption.
:type passphrase: str
@@ -649,13 +603,11 @@ class KeyManager(object):
:raise UnsupportedKeyTypeError: if invalid key type
"""
- self._assert_supported_key_type(ktype)
- _keys = self._wrapper_map[ktype]
@defer.inlineCallbacks
def decrypt(keys):
pubkey, privkey = keys
- decrypted, signed = yield _keys.decrypt(
+ decrypted, signed = yield self._openpgp.decrypt(
data, privkey, passphrase=passphrase, verify=pubkey)
if pubkey is None:
signature = KeyNotFound(verify)
@@ -663,7 +615,7 @@ class KeyManager(object):
signature = pubkey
if not pubkey.sign_used:
pubkey.sign_used = True
- yield _keys.put_key(pubkey)
+ yield self._openpgp.put_key(pubkey)
defer.returnValue((decrypted, signature))
else:
signature = InvalidSignature(
@@ -671,10 +623,10 @@ class KeyManager(object):
(pubkey.fingerprint,))
defer.returnValue((decrypted, signature))
- dpriv = self.get_key(address, ktype, private=True)
+ dpriv = self.get_key(address, private=True)
dpub = defer.succeed(None)
if verify is not None:
- dpub = self.get_key(verify, ktype, private=False,
+ dpub = self.get_key(verify, private=False,
fetch_remote=fetch_remote)
dpub.addErrback(lambda f: None if f.check(KeyNotFound) else f)
d = defer.gatherResults([dpub, dpriv], consumeErrors=True)
@@ -684,7 +636,7 @@ class KeyManager(object):
def _extract_first_error(self, failure):
return failure.value.subFailure
- def sign(self, data, address, ktype, digest_algo='SHA512', clearsign=False,
+ def sign(self, data, address, digest_algo='SHA512', clearsign=False,
detach=True, binary=False):
"""
Sign data with private key bound to address.
@@ -693,8 +645,6 @@ class KeyManager(object):
:type data: str
:param address: The address to be used to sign.
:type address: EncryptionKey
- :param ktype: The type of the key.
- :type ktype: subclass of EncryptionKey
:param digest_algo: The hash digest to use.
:type digest_algo: str
:param clearsign: If True, create a cleartext signature.
@@ -712,19 +662,17 @@ class KeyManager(object):
:raise UnsupportedKeyTypeError: if invalid key type
"""
- self._assert_supported_key_type(ktype)
- _keys = self._wrapper_map[ktype]
def sign(privkey):
- return _keys.sign(
+ return self._openpgp.sign(
data, privkey, digest_algo=digest_algo, clearsign=clearsign,
detach=detach, binary=binary)
- d = self.get_key(address, ktype, private=True)
+ d = self.get_key(address, private=True)
d.addCallback(sign)
return d
- def verify(self, data, address, ktype, detached_sig=None,
+ def verify(self, data, address, detached_sig=None,
fetch_remote=True):
"""
Verify signed data with private key bound to address, eventually using
@@ -734,8 +682,6 @@ class KeyManager(object):
:type data: str
:param address: The address to be used to verify.
:type address: EncryptionKey
- :param ktype: The type of the key.
- :type ktype: subclass of EncryptionKey
:param detached_sig: A detached signature. If given, C{data} is
verified using this detached signature.
:type detached_sig: str
@@ -751,16 +697,14 @@ class KeyManager(object):
:raise UnsupportedKeyTypeError: if invalid key type
"""
- self._assert_supported_key_type(ktype)
- _keys = self._wrapper_map[ktype]
def verify(pubkey):
- signed = _keys.verify(
+ signed = self._openpgp.verify(
data, pubkey, detached_sig=detached_sig)
if signed:
if not pubkey.sign_used:
pubkey.sign_used = True
- d = _keys.put_key(pubkey)
+ d = self._openpgp.put_key(pubkey)
d.addCallback(lambda _: pubkey)
return d
return pubkey
@@ -769,7 +713,7 @@ class KeyManager(object):
'Failed to verify signature with key %s' %
(pubkey.fingerprint,))
- d = self.get_key(address, ktype, private=False,
+ d = self.get_key(address, private=False,
fetch_remote=fetch_remote)
d.addCallback(verify)
return d
@@ -787,9 +731,7 @@ class KeyManager(object):
:raise UnsupportedKeyTypeError: if invalid key type
"""
- self._assert_supported_key_type(type(key))
- _keys = self._wrapper_map[type(key)]
- return _keys.delete_key(key)
+ return self._openpgp.delete_key(key)
def put_key(self, key):
"""
@@ -805,9 +747,6 @@ class KeyManager(object):
:raise UnsupportedKeyTypeError: if invalid key type
"""
- ktype = type(key)
- self._assert_supported_key_type(ktype)
- _keys = self._wrapper_map[ktype]
def old_key_not_found(failure):
if failure.check(KeyNotFound):
@@ -817,26 +756,24 @@ class KeyManager(object):
def check_upgrade(old_key):
if key.private or can_upgrade(key, old_key):
- return _keys.put_key(key)
+ return self._openpgp.put_key(key)
else:
raise KeyNotValidUpgrade(
"Key %s can not be upgraded by new key %s"
% (old_key.fingerprint, key.fingerprint))
- d = _keys.get_key(key.address, private=key.private)
+ d = self._openpgp.get_key(key.address, private=key.private)
d.addErrback(old_key_not_found)
d.addCallback(check_upgrade)
return d
- def put_raw_key(self, key, ktype, address,
+ def put_raw_key(self, key, address,
validation=ValidationLevels.Weak_Chain):
"""
Put raw key bound to address in local storage.
:param key: The ascii key to be stored
:type key: str
- :param ktype: the type of the key.
- :type ktype: subclass of EncryptionKey
:param address: address for which this key will be active
:type address: str
:param validation: validation level for this key
@@ -853,10 +790,7 @@ class KeyManager(object):
:raise UnsupportedKeyTypeError: if invalid key type
"""
- self._assert_supported_key_type(ktype)
- _keys = self._wrapper_map[ktype]
-
- pubkey, privkey = _keys.parse_key(key, address)
+ pubkey, privkey = self._openpgp.parse_key(key, address)
if pubkey is None:
return defer.fail(KeyNotFound(key))
@@ -868,8 +802,7 @@ class KeyManager(object):
return d
@defer.inlineCallbacks
- def fetch_key(self, address, uri, ktype,
- validation=ValidationLevels.Weak_Chain):
+ def fetch_key(self, address, uri, validation=ValidationLevels.Weak_Chain):
"""
Fetch a public key bound to address from the network and put it in
local storage.
@@ -878,8 +811,6 @@ class KeyManager(object):
:type address: str
:param uri: The URI of the key.
:type uri: str
- :param ktype: the type of the key.
- :type ktype: subclass of EncryptionKey
:param validation: validation level for this key
(default: 'Weak_Chain')
:type validation: ValidationLevels
@@ -893,32 +824,18 @@ class KeyManager(object):
:raise UnsupportedKeyTypeError: if invalid key type
"""
- self._assert_supported_key_type(ktype)
- _keys = self._wrapper_map[ktype]
logger.info("Fetch key for %s from %s" % (address, uri))
ascii_content = yield self._get_with_combined_ca_bundle(uri)
# XXX parse binary keys
- pubkey, _ = _keys.parse_key(ascii_content, address)
+ pubkey, _ = self._openpgp.parse_key(ascii_content, address)
if pubkey is None:
raise KeyNotFound(uri)
pubkey.validation = validation
yield self.put_key(pubkey)
- def _assert_supported_key_type(self, ktype):
- """
- Check if ktype is one of the supported key types
-
- :param ktype: the type of the key.
- :type ktype: subclass of EncryptionKey
-
- :raise UnsupportedKeyTypeError: if invalid key type
- """
- if ktype not in self._wrapper_map:
- raise UnsupportedKeyTypeError(str(ktype))
-
def _split_email(address):
"""