diff options
author | Kali Kaneko <kali@leap.se> | 2015-02-11 14:01:20 -0400 |
---|---|---|
committer | Kali Kaneko <kali@leap.se> | 2015-02-11 14:01:20 -0400 |
commit | 8d8d5f2a3d9dbefcf98a5e3d7245a5f13360a88e (patch) | |
tree | 4e91d829a7185be9cb127592747eb72866d47f77 /src | |
parent | c794991d65d987b59014d067e730976684307775 (diff) | |
parent | 82d027b1f471517213bdcdc773dc8eea677fe330 (diff) |
Merge branch 'feature/async-api' into develop
Diffstat (limited to 'src')
-rw-r--r-- | src/leap/keymanager/__init__.py | 518 | ||||
-rw-r--r-- | src/leap/keymanager/errors.py | 6 | ||||
-rw-r--r-- | src/leap/keymanager/keys.py | 95 | ||||
-rw-r--r-- | src/leap/keymanager/openpgp.py | 489 | ||||
-rw-r--r-- | src/leap/keymanager/tests/__init__.py | 124 | ||||
-rw-r--r-- | src/leap/keymanager/tests/test_keymanager.py | 418 | ||||
-rw-r--r-- | src/leap/keymanager/tests/test_openpgp.py | 252 | ||||
-rw-r--r-- | src/leap/keymanager/tests/test_validation.py | 222 | ||||
-rw-r--r-- | src/leap/keymanager/validation.py | 35 |
9 files changed, 1377 insertions, 782 deletions
diff --git a/src/leap/keymanager/__init__.py b/src/leap/keymanager/__init__.py index e4be9c47..a1a59f54 100644 --- a/src/leap/keymanager/__init__.py +++ b/src/leap/keymanager/__init__.py @@ -25,7 +25,7 @@ try: assert(GPGUtilities) # pyflakes happy from gnupg import __version__ as _gnupg_version from pkg_resources import parse_version - assert(parse_version(_gnupg_version) >= parse_version('1.2.3')) + assert(parse_version(_gnupg_version) >= parse_version('1.4.0')) except (ImportError, AssertionError): print "*******" @@ -43,7 +43,9 @@ except (ImportError, AssertionError): import logging import requests -from leap.common.check import leap_assert, leap_assert_type +from twisted.internet import defer + +from leap.common.check import leap_assert from leap.common.events import signal from leap.common.events import events_pb2 as proto from leap.common.decorators import memoized_method @@ -51,12 +53,13 @@ from leap.common.decorators import memoized_method from leap.keymanager.errors import ( KeyNotFound, KeyAddressMismatch, - KeyNotValidUpgrade + KeyNotValidUpgrade, + UnsupportedKeyTypeError, + InvalidSignature ) from leap.keymanager.validation import ValidationLevel, can_upgrade from leap.keymanager.keys import ( - EncryptionKey, build_key_from_dict, KEYMANAGER_KEY_TAG, TAGS_PRIVATE_INDEX, @@ -197,15 +200,20 @@ class KeyManager(object): @memoized_method(invalidation=300) def _fetch_keys_from_server(self, address): """ - Fetch keys bound to C{address} from nickserver and insert them in + Fetch keys bound to address from nickserver and insert them in local database. :param address: The address bound to the keys. :type address: str - :raise KeyNotFound: If the key was not found on nickserver. + :return: A Deferred which fires when the key is in the storage, + or which fails with KeyNotFound if the key was not found on + nickserver. + :rtype: Deferred + """ # request keys from the nickserver + d = defer.succeed(None) res = None try: res = self._get(self._nickserver_uri, {'address': address}) @@ -213,18 +221,22 @@ class KeyManager(object): server_keys = res.json() # insert keys in local database if self.OPENPGP_KEY in server_keys: - self.put_raw_key( + d = self.put_raw_key( server_keys['openpgp'], OpenPGPKey, address=address, validation=ValidationLevel.Provider_Trust) except requests.exceptions.HTTPError as e: if e.response.status_code == 404: - raise KeyNotFound(address) + d = defer.fail(KeyNotFound(address)) + else: + d = defer.fail(e) logger.warning("HTTP error retrieving key: %r" % (e,)) logger.warning("%s" % (res.content,)) except Exception as e: + d = defer.fail(e) logger.warning("Error retrieving key: %r" % (e,)) + return d # # key management @@ -232,7 +244,7 @@ class KeyManager(object): def send_key(self, ktype): """ - Send user's key of type C{ktype} to provider. + Send user's key of type ktype to provider. Public key bound to user's is sent to provider, which will sign it and replace any prior keys for the same address in its database. @@ -240,27 +252,33 @@ class KeyManager(object): :param ktype: The type of the key. :type ktype: subclass of EncryptionKey - :raise KeyNotFound: If the key was not found in local database. + :return: A Deferred which fires when the key is sent, or which fails + with KeyNotFound if the key was not found in local database. + :rtype: Deferred + + :raise UnsupportedKeyTypeError: if invalid key type """ - leap_assert( - ktype is OpenPGPKey, - 'For now we only know how to send OpenPGP public keys.') - # prepare the public key bound to address - pubkey = self.get_key( + self._assert_supported_key_type(ktype) + + def send(pubkey): + data = { + self.PUBKEY_KEY: pubkey.key_data + } + uri = "%s/%s/users/%s.json" % ( + self._api_uri, + self._api_version, + self._uid) + self._put(uri, data) + signal(proto.KEYMANAGER_DONE_UPLOADING_KEYS, self._address) + + d = self.get_key( self._address, ktype, private=False, fetch_remote=False) - data = { - self.PUBKEY_KEY: pubkey.key_data - } - uri = "%s/%s/users/%s.json" % ( - self._api_uri, - self._api_version, - self._uid) - self._put(uri, data) - signal(proto.KEYMANAGER_DONE_UPLOADING_KEYS, self._address) + d.addCallback(send) + return d def get_key(self, address, ktype, private=False, fetch_remote=True): """ - Return a key of type C{ktype} bound to C{address}. + Return a key of type ktype bound to address. First, search for the key in local storage. If it is not available, then try to fetch from nickserver. @@ -275,36 +293,47 @@ class KeyManager(object): from nickserver :type fetch_remote: bool - :return: A key of type C{ktype} bound to C{address}. - :rtype: EncryptionKey - :raise KeyNotFound: If the key was not found both locally and in - keyserver. + :return: A Deferred which fires with an EncryptionKey of type ktype + bound to address, or which fails with KeyNotFound if no key + was found neither locally or in keyserver. + :rtype: Deferred + + :raise UnsupportedKeyTypeError: if invalid key type """ + self._assert_supported_key_type(ktype) logger.debug("getting key for %s" % (address,)) leap_assert( ktype in self._wrapper_map, 'Unkown key type: %s.' % str(ktype)) - try: - signal(proto.KEYMANAGER_LOOKING_FOR_KEY, address) - # return key if it exists in local database - key = self._wrapper_map[ktype].get_key(address, private=private) - signal(proto.KEYMANAGER_KEY_FOUND, address) + signal(proto.KEYMANAGER_LOOKING_FOR_KEY, address) + def key_found(key): + signal(proto.KEYMANAGER_KEY_FOUND, address) return key - except KeyNotFound: + + def key_not_found(failure): + if not failure.check(KeyNotFound): + return failure + signal(proto.KEYMANAGER_KEY_NOT_FOUND, address) # we will only try to fetch a key from nickserver if fetch_remote # is True and the key is not private. if fetch_remote is False or private is True: - raise + return failure signal(proto.KEYMANAGER_LOOKING_FOR_KEY, address) - self._fetch_keys_from_server(address) # might raise KeyNotFound - key = self._wrapper_map[ktype].get_key(address, private=False) - signal(proto.KEYMANAGER_KEY_FOUND, address) - - return key + d = self._fetch_keys_from_server(address) + d.addCallback( + lambda _: + self._wrapper_map[ktype].get_key(address, private=False)) + d.addCallback(key_found) + return d + + # return key if it exists in local database + d = self._wrapper_map[ktype].get_key(address, private=private) + d.addCallbacks(key_found, key_not_found) + return d def get_all_keys(self, private=False): """ @@ -313,33 +342,50 @@ class KeyManager(object): :param private: Include private keys :type private: bool - :return: A list with all keys in local db. - :rtype: list - """ - return map( - lambda doc: build_key_from_dict( - self._key_class_from_type(doc.content['type']), - doc.content), - self._soledad.get_from_index( - TAGS_PRIVATE_INDEX, - KEYMANAGER_KEY_TAG, - '1' if private else '0')) + :return: A Deferred which fires with a list of all keys in local db. + :rtype: Deferred + """ + def build_keys(docs): + return map( + lambda doc: build_key_from_dict( + self._key_class_from_type(doc.content['type']), + doc.content), + docs) + + # XXX: there is no check that the soledad indexes are ready, as it + # happens with EncryptionScheme. + # The usecases right now are not problematic. This could be solve + # adding a keytype to this funciont and moving the soledad request + # to the EncryptionScheme. + d = self._soledad.get_from_index( + TAGS_PRIVATE_INDEX, + KEYMANAGER_KEY_TAG, + '1' if private else '0') + d.addCallback(build_keys) + return d def gen_key(self, ktype): """ - Generate a key of type C{ktype} bound to the user's address. + Generate a key of type ktype bound to the user's address. :param ktype: The type of the key. :type ktype: subclass of EncryptionKey - :return: The generated key. - :rtype: EncryptionKey + :return: A Deferred which fires with the generated EncryptionKey. + :rtype: Deferred + + :raise UnsupportedKeyTypeError: if invalid key type """ - signal(proto.KEYMANAGER_STARTED_KEY_GENERATION, self._address) - key = self._wrapper_map[ktype].gen_key(self._address) - signal(proto.KEYMANAGER_FINISHED_KEY_GENERATION, self._address) + self._assert_supported_key_type(ktype) - return key + def signal_finished(key): + signal(proto.KEYMANAGER_FINISHED_KEY_GENERATION, self._address) + return key + + signal(proto.KEYMANAGER_STARTED_KEY_GENERATION, self._address) + d = self._wrapper_map[ktype].gen_key(self._address) + d.addCallback(signal_finished) + return d # # Setters/getters @@ -395,70 +441,132 @@ class KeyManager(object): # encrypt/decrypt and sign/verify API # - def encrypt(self, data, pubkey, passphrase=None, sign=None, - cipher_algo='AES256'): + def encrypt(self, data, address, ktype, passphrase=None, sign=None, + cipher_algo='AES256', fetch_remote=True): """ - Encrypt C{data} using public @{key} and sign with C{sign} key. + Encrypt data with the public key bound to address and sign with with + the private key bound to sign address. :param data: The data to be encrypted. :type data: str - :param pubkey: The key used to encrypt. - :type pubkey: EncryptionKey + :param address: The address to encrypt it for. + :type address: str + :param ktype: The type of the key. + :type ktype: subclass of EncryptionKey :param passphrase: The passphrase for the secret key used for the signature. :type passphrase: str - :param sign: The key used for signing. - :type sign: EncryptionKey + :param sign: The address to be used for signature. + :type sign: str :param cipher_algo: The cipher algorithm to use. :type cipher_algo: str + :param fetch_remote: If key is not found in local storage try to fetch + from nickserver + :type fetch_remote: bool - :return: The encrypted data. - :rtype: str - """ - leap_assert_type(pubkey, EncryptionKey) - leap_assert(pubkey.__class__ in self._wrapper_map, 'Unknown key type.') - leap_assert(pubkey.private is False, 'Key is not public.') - return self._wrapper_map[pubkey.__class__].encrypt( - data, pubkey, passphrase, sign, cipher_algo=cipher_algo) - - def decrypt(self, data, privkey, passphrase=None, verify=None): - """ - Decrypt C{data} using private @{privkey} and verify with C{verify} key. + :return: A Deferred which fires with the encrypted data as str, or + which fails with KeyNotFound if no keys were found neither + locally or in keyserver or fails with EncryptError if failed + encrypting for some reason. + :rtype: Deferred + + :raise UnsupportedKeyTypeError: if invalid key type + """ + self._assert_supported_key_type(ktype) + + def encrypt(keys): + pubkey, signkey = keys + encrypted = self._wrapper_map[ktype].encrypt( + data, pubkey, passphrase, sign=signkey, + cipher_algo=cipher_algo) + pubkey.encr_used = True + d = self._wrapper_map[ktype].put_key(pubkey, address) + d.addCallback(lambda _: encrypted) + return d + + dpub = self.get_key(address, ktype, private=False, + fetch_remote=fetch_remote) + dpriv = defer.succeed(None) + if sign is not None: + dpriv = self.get_key(sign, ktype, private=True) + d = defer.gatherResults([dpub, dpriv], consumeErrors=True) + d.addCallbacks(encrypt, self._extract_first_error) + return d + + def decrypt(self, data, address, ktype, passphrase=None, verify=None, + fetch_remote=True): + """ + Decrypt data using private key from address and verify with public key + bound to verify address. :param data: The data to be decrypted. :type data: str - :param privkey: The key used to decrypt. - :type privkey: OpenPGPKey + :param address: The address to whom data was encrypted. + :type address: str + :param ktype: The type of the key. + :type ktype: subclass of EncryptionKey :param passphrase: The passphrase for the secret key used for decryption. :type passphrase: str - :param verify: The key used to verify a signature. - :type verify: OpenPGPKey - - :return: The decrypted data. - :rtype: str - - :raise InvalidSignature: Raised if unable to verify the signature with - C{verify} key. - """ - leap_assert_type(privkey, EncryptionKey) - leap_assert( - privkey.__class__ in self._wrapper_map, - 'Unknown key type.') - leap_assert(privkey.private is True, 'Key is not private.') - return self._wrapper_map[privkey.__class__].decrypt( - data, privkey, passphrase, verify) + :param verify: The address to be used for signature. + :type verify: str + :param fetch_remote: If key for verify not found in local storage try + to fetch from nickserver + :type fetch_remote: bool - def sign(self, data, privkey, digest_algo='SHA512', clearsign=False, + :return: A Deferred which fires with: + * (decripted str, signing key) if validation works + * (decripted str, KeyNotFound) if signing key not found + * (decripted str, InvalidSignature) if signature is invalid + * KeyNotFound failure if private key not found + * DecryptError failure if decription failed + :rtype: Deferred + + :raise UnsupportedKeyTypeError: if invalid key type + """ + self._assert_supported_key_type(ktype) + + def decrypt(keys): + pubkey, privkey = keys + decrypted, signed = self._wrapper_map[ktype].decrypt( + data, privkey, passphrase=passphrase, verify=pubkey) + if pubkey is None: + signature = KeyNotFound(verify) + elif signed: + pubkey.sign_used = True + d = self._wrapper_map[ktype].put_key(pubkey, address) + d.addCallback(lambda _: (decrypted, pubkey)) + return d + else: + signature = InvalidSignature( + 'Failed to verify signature with key %s' % + (pubkey.key_id,)) + return (decrypted, signature) + + dpriv = self.get_key(address, ktype, private=True) + dpub = defer.succeed(None) + if verify is not None: + dpub = self.get_key(verify, ktype, private=False, + fetch_remote=fetch_remote) + dpub.addErrback(lambda f: None if f.check(KeyNotFound) else f) + d = defer.gatherResults([dpub, dpriv], consumeErrors=True) + d.addCallbacks(decrypt, self._extract_first_error) + return d + + def _extract_first_error(self, failure): + return failure.value.subFailure + + def sign(self, data, address, ktype, digest_algo='SHA512', clearsign=False, detach=True, binary=False): """ - Sign C{data} with C{privkey}. + Sign data with private key bound to address. :param data: The data to be signed. :type data: str - - :param privkey: The private key to be used to sign. - :type privkey: EncryptionKey + :param address: The address to be used to sign. + :type address: EncryptionKey + :param ktype: The type of the key. + :type ktype: subclass of EncryptionKey :param digest_algo: The hash digest to use. :type digest_algo: str :param clearsign: If True, create a cleartext signature. @@ -468,93 +576,137 @@ class KeyManager(object): :param binary: If True, do not ascii armour the output. :type binary: bool - :return: The signed data. - :rtype: str + :return: A Deferred which fires with the signed data as str or fails + with KeyNotFound if no key was found neither locally or in + keyserver or fails with SignFailed if there was any error + signing. + :rtype: Deferred + + :raise UnsupportedKeyTypeError: if invalid key type """ - leap_assert_type(privkey, EncryptionKey) - leap_assert( - privkey.__class__ in self._wrapper_map, - 'Unknown key type.') - leap_assert(privkey.private is True, 'Key is not private.') - return self._wrapper_map[privkey.__class__].sign( - data, privkey, digest_algo=digest_algo, clearsign=clearsign, - detach=detach, binary=binary) + self._assert_supported_key_type(ktype) - def verify(self, data, pubkey, detached_sig=None): + def sign(privkey): + return self._wrapper_map[ktype].sign( + data, privkey, digest_algo=digest_algo, clearsign=clearsign, + detach=detach, binary=binary) + + d = self.get_key(address, ktype, private=True) + d.addCallback(sign) + return d + + def verify(self, data, address, ktype, detached_sig=None, + fetch_remote=True): """ - Verify signed C{data} with C{pubkey}, eventually using - C{detached_sig}. + Verify signed data with private key bound to address, eventually using + detached_sig. :param data: The data to be verified. :type data: str - :param pubkey: The public key to be used on verification. - :type pubkey: EncryptionKey + :param address: The address to be used to verify. + :type address: EncryptionKey + :param ktype: The type of the key. + :type ktype: subclass of EncryptionKey :param detached_sig: A detached signature. If given, C{data} is verified using this detached signature. :type detached_sig: str + :param fetch_remote: If key for verify not found in local storage try + to fetch from nickserver + :type fetch_remote: bool - :return: signature matches - :rtype: bool - """ - leap_assert_type(pubkey, EncryptionKey) - leap_assert(pubkey.__class__ in self._wrapper_map, 'Unknown key type.') - leap_assert(pubkey.private is False, 'Key is not public.') - return self._wrapper_map[pubkey.__class__].verify( - data, pubkey, detached_sig=detached_sig) + :return: A Deferred which fires with the signing EncryptionKey if + signature verifies, or which fails with InvalidSignature if + signature don't verifies or fails with KeyNotFound if no key + was found neither locally or in keyserver. + :rtype: Deferred + + :raise UnsupportedKeyTypeError: if invalid key type + """ + self._assert_supported_key_type(ktype) + + def verify(pubkey): + signed = self._wrapper_map[ktype].verify( + data, pubkey, detached_sig=detached_sig) + if signed: + pubkey.sign_used = True + d = self._wrapper_map[ktype].put_key(pubkey, address) + d.addCallback(lambda _: pubkey) + return d + else: + raise InvalidSignature( + 'Failed to verify signature with key %s' % + (pubkey.key_id,)) + + d = self.get_key(address, ktype, private=False, + fetch_remote=fetch_remote) + d.addCallback(verify) + return d def delete_key(self, key): """ - Remove C{key} from storage. - - May raise: - openpgp.errors.KeyNotFound - openpgp.errors.KeyAttributesDiffer + Remove key from storage. :param key: The key to be removed. :type key: EncryptionKey + + :return: A Deferred which fires when the key is deleted, or which fails + KeyNotFound if the key was not found on local storage. + :rtype: Deferred + + :raise UnsupportedKeyTypeError: if invalid key type """ - try: - self._wrapper_map[type(key)].delete_key(key) - except IndexError as e: - leap_assert(False, "Unsupported key type. Error {0!r}".format(e)) + self._assert_supported_key_type(type(key)) + return self._wrapper_map[type(key)].delete_key(key) def put_key(self, key, address): """ - Put C{key} in local storage. + Put key bound to address in local storage. :param key: The key to be stored :type key: EncryptionKey :param address: address for which this key will be active :type address: str - :raises KeyAddressMismatch: if address doesn't match any uid on the key - :raises KeyNotValidUpdate: if a key with the same uid exists and the - new one is not a valid update for it + :return: A Deferred which fires when the key is in the storage, or + which fails with KeyAddressMismatch if address doesn't match + any uid on the key or fails with KeyNotValidUpdate if a key + with the same uid exists and the new one is not a valid update + for it. + :rtype: Deferred + + :raise UnsupportedKeyTypeError: if invalid key type """ - if address not in key.address: - raise KeyAddressMismatch("UID %s found, but expected %s" - % (str(key.address), address)) + self._assert_supported_key_type(type(key)) - try: - old_key = self._wrapper_map[type(key)].get_key(address, - private=key.private) - except KeyNotFound: - old_key = None - - if key.private or can_upgrade(key, old_key): - try: - self._wrapper_map[type(key)].put_key(key, address) - except IndexError as e: - leap_assert( - False, "Unsupported key type. Error {0!r}".format(e)) - else: - raise KeyNotValidUpgrade("Key %s can not be upgraded by new key %s" - % (old_key.key_id, key.key_id)) + if address not in key.address: + return defer.fail( + KeyAddressMismatch("UID %s found, but expected %s" + % (str(key.address), address))) + + def old_key_not_found(failure): + if failure.check(KeyNotFound): + return None + else: + return failure + + def check_upgrade(old_key): + if key.private or can_upgrade(key, old_key): + return self._wrapper_map[type(key)].put_key(key, address) + else: + raise KeyNotValidUpgrade( + "Key %s can not be upgraded by new key %s" + % (old_key.key_id, key.key_id)) + + d = self._wrapper_map[type(key)].get_key(address, + private=key.private) + d.addErrback(old_key_not_found) + d.addCallback(check_upgrade) + return d def put_raw_key(self, key, ktype, address, validation=ValidationLevel.Weak_Chain): """ - Put C{key} in local storage. + Put raw key bound to address in local storage. :param key: The ascii key to be stored :type key: str @@ -566,19 +718,27 @@ class KeyManager(object): (default: 'Weak_Chain') :type validation: ValidationLevel - :raises KeyAddressMismatch: if address doesn't match any uid on the key - :raises KeyNotValidUpdate: if a key with the same uid exists and the - new one is not a valid update for it - """ - pubkey, _ = self._wrapper_map[ktype].parse_ascii_key(key) + :return: A Deferred which fires when the key is in the storage, or + which fails with KeyAddressMismatch if address doesn't match + any uid on the key or fails with KeyNotValidUpdate if a key + with the same uid exists and the new one is not a valid update + for it. + :rtype: Deferred + :raise UnsupportedKeyTypeError: if invalid key type + """ + self._assert_supported_key_type(ktype) + pubkey, privkey = self._wrapper_map[ktype].parse_ascii_key(key) pubkey.validation = validation - self.put_key(pubkey, address) + d = self.put_key(pubkey, address) + if privkey is not None: + d.addCallback(lambda _: self.put_key(privkey, address)) + return d def fetch_key(self, address, uri, ktype, validation=ValidationLevel.Weak_Chain): """ - Fetch a public key for C{address} from the network and put it in + Fetch a public key bound to address from the network and put it in local storage. :param address: The email address of the key. @@ -591,22 +751,40 @@ class KeyManager(object): (default: 'Weak_Chain') :type validation: ValidationLevel - :raises KeyNotFound: if not valid key on C{uri} - :raises KeyAddressMismatch: if address doesn't match any uid on the key - :raises KeyNotValidUpdate: if a key with the same uid exists and the - new one is not a valid update for it + :return: A Deferred which fires when the key is in the storage, or + which fails with KeyNotFound: if not valid key on uri or fails + with KeyAddressMismatch if address doesn't match any uid on + the key or fails with KeyNotValidUpdate if a key with the same + uid exists and the new one is not a valid update for it. + :rtype: Deferred + + :raise UnsupportedKeyTypeError: if invalid key type """ + self._assert_supported_key_type(ktype) + res = self._get(uri) if not res.ok: - raise KeyNotFound(uri) + return defer.fail(KeyNotFound(uri)) # XXX parse binary keys pubkey, _ = self._wrapper_map[ktype].parse_ascii_key(res.content) if pubkey is None: - raise KeyNotFound(uri) + return defer.fail(KeyNotFound(uri)) pubkey.validation = validation - self.put_key(pubkey, address) + return self.put_key(pubkey, address) + + def _assert_supported_key_type(self, ktype): + """ + Check if ktype is one of the supported key types + + :param ktype: the type of the key. + :type ktype: subclass of EncryptionKey + + :raise UnsupportedKeyTypeError: if invalid key type + """ + if ktype not in self._wrapper_map: + raise UnsupportedKeyTypeError(str(ktype)) from ._version import get_versions __version__ = get_versions()['version'] diff --git a/src/leap/keymanager/errors.py b/src/leap/keymanager/errors.py index f8965823..8a9fb3c7 100644 --- a/src/leap/keymanager/errors.py +++ b/src/leap/keymanager/errors.py @@ -102,3 +102,9 @@ class KeyNotValidUpgrade(Exception): """ Already existing key can not be upgraded with the new key """ + + +class UnsupportedKeyTypeError(Exception): + """ + Invalid key type + """ diff --git a/src/leap/keymanager/keys.py b/src/leap/keymanager/keys.py index 7c732e35..562c0a9b 100644 --- a/src/leap/keymanager/keys.py +++ b/src/leap/keymanager/keys.py @@ -33,6 +33,7 @@ import time from abc import ABCMeta, abstractmethod from datetime import datetime from leap.common.check import leap_assert +from twisted.internet import defer from leap.keymanager.validation import ValidationLevel, toValidationLevel @@ -212,7 +213,7 @@ class EncryptionKey(object): KEY_EXPIRY_DATE_KEY: expiry_date, KEY_LAST_AUDITED_AT_KEY: last_audited_at, KEY_REFRESHED_AT_KEY: refreshed_at, - KEY_VALIDATION_KEY: str(self.validation), + KEY_VALIDATION_KEY: self.validation.name, KEY_ENCR_USED_KEY: self.encr_used, KEY_SIGN_USED_KEY: self.sign_used, KEY_TAGS_KEY: [KEYMANAGER_KEY_TAG], @@ -277,21 +278,61 @@ class EncryptionScheme(object): """ leap_assert(self._soledad is not None, "Cannot init indexes with null soledad") - # Ask the database for currently existing indexes. - db_indexes = dict(self._soledad.list_indexes()) - # Loop through the indexes we expect to find. - for name, expression in INDEXES.items(): - if name not in db_indexes: - # The index does not yet exist. - self._soledad.create_index(name, *expression) - continue - if expression == db_indexes[name]: - # The index exists and is up to date. - continue - # The index exists but the definition is not what expected, so we - # delete it and add the proper index expression. - self._soledad.delete_index(name) - self._soledad.create_index(name, *expression) + + def init_idexes(indexes): + deferreds = [] + db_indexes = dict(indexes) + # Loop through the indexes we expect to find. + for name, expression in INDEXES.items(): + if name not in db_indexes: + # The index does not yet exist. + d = self._soledad.create_index(name, *expression) + deferreds.append(d) + elif expression != db_indexes[name]: + # The index exists but the definition is not what expected, + # so we delete it and add the proper index expression. + d = self._soledad.delete_index(name) + d.addCallback( + lambda _: + self._soledad.create_index(name, *expression)) + deferreds.append(d) + return defer.gatherResults(deferreds, consumeErrors=True) + + self.deferred_indexes = self._soledad.list_indexes() + self.deferred_indexes.addCallback(init_idexes) + + def _wait_indexes(self, *methods): + """ + Methods that need to wait for the indexes to be ready. + + Heavily based on + http://blogs.fluidinfo.com/terry/2009/05/11/a-mixin-class-allowing-python-__init__-methods-to-work-with-twisted-deferreds/ + + :param methods: methods that need to wait for the indexes to be ready + :type methods: tuple(str) + """ + self.waiting = [] + self.stored = {} + + def restore(_): + for method in self.stored: + setattr(self, method, self.stored[method]) + for d in self.waiting: + d.callback(None) + + def makeWrapper(method): + def wrapper(*args, **kw): + d = defer.Deferred() + d.addCallback(lambda _: self.stored[method](*args, **kw)) + self.waiting.append(d) + return d + return wrapper + + for method in methods: + self.stored[method] = getattr(self, method) + setattr(self, method, makeWrapper(method)) + + self.deferred_indexes.addCallback(restore) @abstractmethod def get_key(self, address, private=False): @@ -303,9 +344,10 @@ class EncryptionScheme(object): :param private: Look for a private key instead of a public one? :type private: bool - :return: The key bound to C{address}. - :rtype: EncryptionKey - @raise KeyNotFound: If the key was not found on local storage. + :return: A Deferred which fires with the EncryptionKey bound to + address, or which fails with KeyNotFound if the key was not + found on local storage. + :rtype: Deferred """ pass @@ -318,6 +360,9 @@ class EncryptionScheme(object): :type key: EncryptionKey :param address: address for which this key will be active. :type address: str + + :return: A Deferred which fires when the key is in the storage. + :rtype: Deferred """ pass @@ -341,6 +386,11 @@ class EncryptionScheme(object): :param key: The key to be removed. :type key: EncryptionKey + + :return: A Deferred which fires when the key is deleted, or which + fails with KeyNotFound if the key was not found on local + storage. + :rtype: Deferred """ pass @@ -373,11 +423,10 @@ class EncryptionScheme(object): :param verify: The key used to verify a signature. :type verify: OpenPGPKey - :return: The decrypted data. - :rtype: str + :return: The decrypted data and if signature verifies + :rtype: (unicode, bool) - @raise InvalidSignature: Raised if unable to verify the signature with - C{verify} key. + :raise DecryptError: Raised if failed decrypting for some reason. """ pass diff --git a/src/leap/keymanager/openpgp.py b/src/leap/keymanager/openpgp.py index 3f298f71..794a0ec2 100644 --- a/src/leap/keymanager/openpgp.py +++ b/src/leap/keymanager/openpgp.py @@ -28,6 +28,7 @@ import io from datetime import datetime from gnupg import GPG from gnupg.gnupg import GPGUtilities +from twisted.internet import defer from leap.common.check import leap_assert, leap_assert_type, leap_check from leap.keymanager import errors @@ -39,8 +40,6 @@ from leap.keymanager.keys import ( TYPE_ID_PRIVATE_INDEX, TYPE_ADDRESS_PRIVATE_INDEX, KEY_ADDRESS_KEY, - KEY_FINGERPRINT_KEY, - KEY_DATA_KEY, KEY_ID_KEY, KEYMANAGER_ACTIVE_TYPE, ) @@ -164,39 +163,6 @@ class TempGPGWrapper(object): shutil.rmtree(self._gpg.homedir) -def _build_key_from_gpg(key, key_data): - """ - Build an OpenPGPKey based on C{key} from local gpg storage. - - ASCII armored GPG key data has to be queried independently in this - wrapper, so we receive it in C{key_data}. - - :param key: Key obtained from GPG storage. - :type key: dict - :param key_data: Key data obtained from GPG storage. - :type key_data: str - :return: An instance of the key. - :rtype: OpenPGPKey - """ - expiry_date = None - if key['expires']: - expiry_date = datetime.fromtimestamp(int(key['expires'])) - address = [] - for uid in key['uids']: - address.append(_parse_address(uid)) - - return OpenPGPKey( - address, - key_id=key['keyid'], - fingerprint=key['fingerprint'], - key_data=key_data, - private=True if key['type'] == 'sec' else False, - length=int(key['length']), - expiry_date=expiry_date, - refreshed_at=datetime.now(), - ) - - def _parse_address(address): """ Remove name, '<', '>' and the identity suffix after the '+' until the '@' @@ -222,6 +188,26 @@ class OpenPGPKey(EncryptionKey): Base class for OpenPGP keys. """ + def __init__(self, address, gpgbinary=None, **kwargs): + self._gpgbinary = gpgbinary + super(OpenPGPKey, self).__init__(address, **kwargs) + + @property + def signatures(self): + """ + Get the key signatures + + :return: the key IDs that have signed the key + :rtype: list(str) + """ + with TempGPGWrapper(keys=[self], gpgbinary=self._gpgbinary) as gpg: + res = gpg.list_sigs(self.key_id) + for uid, sigs in res.sigs.iteritems(): + if _parse_address(uid) in self.address: + return sigs + + return [] + class OpenPGPScheme(EncryptionScheme): """ @@ -243,6 +229,7 @@ class OpenPGPScheme(EncryptionScheme): :type gpgbinary: C{str} """ EncryptionScheme.__init__(self, soledad) + self._wait_indexes("get_key", "put_key") self._gpgbinary = gpgbinary # @@ -255,59 +242,63 @@ class OpenPGPScheme(EncryptionScheme): :param address: The address bound to the key. :type address: str - :return: The key bound to C{address}. - :rtype: OpenPGPKey - @raise KeyAlreadyExists: If key already exists in local database. + + :return: A Deferred which fires with the key bound to address, or fails + with KeyAlreadyExists if key already exists in local database. + :rtype: Deferred """ # make sure the key does not already exist leap_assert(is_address(address), 'Not an user address: %s' % address) - try: - self.get_key(address) - raise errors.KeyAlreadyExists(address) - except errors.KeyNotFound: - logger.debug('Key for %s not found' % (address,)) - with self._temporary_gpgwrapper() as gpg: - # TODO: inspect result, or use decorator - params = gpg.gen_key_input( - key_type='RSA', - key_length=4096, - name_real=address, - name_email=address, - name_comment='') - logger.info("About to generate keys... This might take SOME time.") - gpg.gen_key(params) - logger.info("Keys for %s have been successfully " - "generated." % (address,)) - pubkeys = gpg.list_keys() - - # assert for new key characteristics - - # XXX This exception is not properly catched by the soledad - # bootstrapping, so if we do not finish generating the keys - # we end with a blocked thread -- kali - - leap_assert( - len(pubkeys) is 1, # a unitary keyring! - 'Keyring has wrong number of keys: %d.' % len(pubkeys)) - key = gpg.list_keys(secret=True).pop() - leap_assert( - len(key['uids']) is 1, # with just one uid! - 'Wrong number of uids for key: %d.' % len(key['uids'])) - uid_match = False - for uid in key['uids']: - if re.match('.*<%s>$' % address, uid) is not None: - uid_match = True - return - leap_assert(uid_match, 'Key not correctly bound to address.') - # insert both public and private keys in storage - for secret in [True, False]: - key = gpg.list_keys(secret=secret).pop() - openpgp_key = _build_key_from_gpg( - key, gpg.export_keys(key['fingerprint'], secret=secret)) - self.put_key(openpgp_key, address) + def _gen_key(_): + with self._temporary_gpgwrapper() as gpg: + # TODO: inspect result, or use decorator + params = gpg.gen_key_input( + key_type='RSA', + key_length=4096, + name_real=address, + name_email=address, + name_comment='') + logger.info("About to generate keys... " + "This might take SOME time.") + gpg.gen_key(params) + logger.info("Keys for %s have been successfully " + "generated." % (address,)) + pubkeys = gpg.list_keys() + + # assert for new key characteristics + leap_assert( + len(pubkeys) is 1, # a unitary keyring! + 'Keyring has wrong number of keys: %d.' % len(pubkeys)) + key = gpg.list_keys(secret=True).pop() + leap_assert( + len(key['uids']) is 1, # with just one uid! + 'Wrong number of uids for key: %d.' % len(key['uids'])) + uid_match = False + for uid in key['uids']: + if re.match('.*<%s>$' % address, uid) is not None: + uid_match = True + break + leap_assert(uid_match, 'Key not correctly bound to address.') + + # insert both public and private keys in storage + deferreds = [] + for secret in [True, False]: + key = gpg.list_keys(secret=secret).pop() + openpgp_key = self._build_key_from_gpg( + key, + gpg.export_keys(key['fingerprint'], secret=secret)) + d = self.put_key(openpgp_key, address) + deferreds.append(d) + return defer.gatherResults(deferreds) + + def key_already_exists(_): + raise errors.KeyAlreadyExists(address) - return self.get_key(address, private=True) + d = self.get_key(address) + d.addCallbacks(key_already_exists, _gen_key) + d.addCallback(lambda _: self.get_key(address, private=True)) + return d def get_key(self, address, private=False): """ @@ -318,19 +309,26 @@ class OpenPGPScheme(EncryptionScheme): :param private: Look for a private key instead of a public one? :type private: bool - :return: The key bound to C{address}. - :rtype: OpenPGPKey - @raise KeyNotFound: If the key was not found on local storage. + :return: A Deferred which fires with the OpenPGPKey bound to address, + or which fails with KeyNotFound if the key was not found on + local storage. + :rtype: Deferred """ address = _parse_address(address) - doc = self._get_key_doc(address, private) - if doc is None: - raise errors.KeyNotFound(address) - leap_assert( - address in doc.content[KEY_ADDRESS_KEY], - 'Wrong address in key data.') - return build_key_from_dict(OpenPGPKey, doc.content) + def build_key(doc): + if doc is None: + raise errors.KeyNotFound(address) + leap_assert( + address in doc.content[KEY_ADDRESS_KEY], + 'Wrong address in key data.') + key = build_key_from_dict(OpenPGPKey, doc.content) + key._gpgbinary = self._gpgbinary + return key + + d = self._get_key_doc(address, private) + d.addCallback(build_key) + return d def parse_ascii_key(self, key_data): """ @@ -366,7 +364,7 @@ class OpenPGPScheme(EncryptionScheme): openpgp_privkey = None if privkey is not None: # build private key - openpgp_privkey = _build_key_from_gpg( + openpgp_privkey = self._build_key_from_gpg( privkey, gpg.export_keys(privkey['fingerprint'], secret=True)) leap_check(pubkey['fingerprint'] == privkey['fingerprint'], @@ -374,7 +372,7 @@ class OpenPGPScheme(EncryptionScheme): errors.KeyFingerprintMismatch) # build public key - openpgp_pubkey = _build_key_from_gpg( + openpgp_pubkey = self._build_key_from_gpg( pubkey, gpg.export_keys(pubkey['fingerprint'], secret=False)) @@ -388,6 +386,9 @@ class OpenPGPScheme(EncryptionScheme): :type key_data: str or unicode :param address: address for which this key will be active :type address: str + + :return: A Deferred which fires when the OpenPGPKey is in the storage. + :rtype: Deferred """ leap_assert_type(key_data, (str, unicode)) @@ -395,12 +396,17 @@ class OpenPGPScheme(EncryptionScheme): try: openpgp_pubkey, openpgp_privkey = self.parse_ascii_key(key_data) except (errors.KeyAddressMismatch, errors.KeyFingerprintMismatch) as e: - leap_assert(False, repr(e)) + return defer.fail(e) + def put_key(_, key): + return self.put_key(key, address) + + d = defer.succeed(None) if openpgp_pubkey is not None: - self.put_key(openpgp_pubkey, address) + d.addCallback(put_key, openpgp_pubkey) if openpgp_privkey is not None: - self.put_key(openpgp_privkey, address) + d.addCallback(put_key, openpgp_privkey) + return d def put_key(self, key, address): """ @@ -410,42 +416,66 @@ class OpenPGPScheme(EncryptionScheme): :type key: OpenPGPKey :param address: address for which this key will be active. :type address: str + + :return: A Deferred which fires when the key is in the storage. + :rtype: Deferred """ - self._put_key_doc(key) - self._put_active_doc(key, address) + d = self._put_key_doc(key) + d.addCallback(lambda _: self._put_active_doc(key, address)) + return d def _put_key_doc(self, key): """ Put key document in soledad :type key: OpenPGPKey - """ - docs = self._soledad.get_from_index( + :rtype: Deferred + """ + def check_and_put(docs, key): + if len(docs) == 1: + doc = docs.pop() + oldkey = build_key_from_dict(OpenPGPKey, doc.content) + if key.fingerprint == oldkey.fingerprint: + # in case of an update of the key merge them with gnupg + with self._temporary_gpgwrapper() as gpg: + gpg.import_keys(oldkey.key_data) + gpg.import_keys(key.key_data) + gpgkey = gpg.list_keys(secret=key.private).pop() + mergedkey = self._build_key_from_gpg( + gpgkey, + gpg.export_keys(gpgkey['fingerprint'], + secret=key.private)) + mergedkey.validation = max( + [key.validation, oldkey.validation]) + mergedkey.last_audited_at = oldkey.last_audited_at + mergedkey.refreshed_at = key.refreshed_at + mergedkey.encr_used = key.encr_used or oldkey.encr_used + mergedkey.sign_used = key.sign_used or oldkey.sign_used + doc.set_json(mergedkey.get_json()) + d = self._soledad.put_doc(doc) + else: + logger.critical( + "Can't put a key whith the same key_id and different " + "fingerprint: %s, %s" + % (key.fingerprint, oldkey.fingerprint)) + d = defer.fail( + errors.KeyFingerprintMismatch(key.fingerprint)) + elif len(docs) > 1: + logger.critical( + "There is more than one key with the same key_id %s" + % (key.key_id,)) + d = defer.fail(errors.KeyAttributesDiffer(key.key_id)) + else: + d = self._soledad.create_doc_from_json(key.get_json()) + return d + + d = self._soledad.get_from_index( TYPE_ID_PRIVATE_INDEX, self.KEY_TYPE, key.key_id, '1' if key.private else '0') - if len(docs) != 0: - doc = docs.pop() - if key.fingerprint == doc.content[KEY_FINGERPRINT_KEY]: - # in case of an update of the key merge them with gnupg - with self._temporary_gpgwrapper() as gpg: - gpg.import_keys(doc.content[KEY_DATA_KEY]) - gpg.import_keys(key.key_data) - gpgkey = gpg.list_keys(secret=key.private).pop() - key = _build_key_from_gpg( - gpgkey, - gpg.export_keys(gpgkey['fingerprint'], - secret=key.private)) - doc.set_json(key.get_json()) - self._soledad.put_doc(doc) - else: - logger.critical( - "Can't put a key whith the same key_id and different " - "fingerprint: %s, %s" - % (key.fingerprint, doc.content[KEY_FINGERPRINT_KEY])) - else: - self._soledad.create_doc_from_json(key.get_json()) + d.addCallback(check_and_put, key) + return d def _put_active_doc(self, key, address): """ @@ -453,24 +483,37 @@ class OpenPGPScheme(EncryptionScheme): :type key: OpenPGPKey :type addresses: str + :rtype: Deferred """ - docs = self._soledad.get_from_index( + def check_and_put(docs): + if len(docs) == 1: + doc = docs.pop() + doc.set_json(key.get_active_json(address)) + d = self._soledad.put_doc(doc) + else: + if len(docs) > 1: + logger.error("There is more than one active key document " + "for the address %s" % (address,)) + deferreds = [] + for doc in docs: + delete = self._soledad.delete_doc(doc) + deferreds.append(delete) + d = defer.gatherResults(deferreds, consumeErrors=True) + else: + d = defer.succeed(None) + + d.addCallback( + lambda _: self._soledad.create_doc_from_json( + key.get_active_json(address))) + return d + + d = self._soledad.get_from_index( TYPE_ADDRESS_PRIVATE_INDEX, self.ACTIVE_TYPE, address, '1' if key.private else '0') - if len(docs) == 1: - doc = docs.pop() - doc.set_json(key.get_active_json(address)) - self._soledad.put_doc(doc) - else: - if len(docs) > 1: - logger.error("There is more than one active key document " - "for the address %s" % (address,)) - for doc in docs: - self._soledad.delete_doc(doc) - self._soledad.create_doc_from_json( - key.get_active_json(address)) + d.addCallback(check_and_put) + return d def _get_key_doc(self, address, private=False): """ @@ -482,69 +525,128 @@ class OpenPGPScheme(EncryptionScheme): :type address: str :param private: Whether to look for a private key. :type private: bool - :return: The document with the key or None if it does not exist. - :rtype: leap.soledad.document.SoledadDocument + + :return: A Deferred which fires with the SoledadDocument with the key + or None if it does not exist. + :rtype: Deferred """ - activedoc = self._soledad.get_from_index( + def get_key_from_active_doc(activedoc): + if len(activedoc) is 0: + return None + leap_assert( + len(activedoc) is 1, + 'Found more than one key for address %s!' % (address,)) + + key_id = activedoc[0].content[KEY_ID_KEY] + d = self._soledad.get_from_index( + TYPE_ID_PRIVATE_INDEX, + self.KEY_TYPE, + key_id, + '1' if private else '0') + d.addCallback(get_doc, key_id) + return d + + def get_doc(doclist, key_id): + leap_assert( + len(doclist) is 1, + 'There is %d keys for id %s!' % (len(doclist), key_id)) + return doclist.pop() + + d = self._soledad.get_from_index( TYPE_ADDRESS_PRIVATE_INDEX, self.ACTIVE_TYPE, address, '1' if private else '0') - if len(activedoc) is 0: - return None - leap_assert( - len(activedoc) is 1, - 'Found more than one key for address %s!' % (address,)) + d.addCallback(get_key_from_active_doc) + return d - key_id = activedoc[0].content[KEY_ID_KEY] - doclist = self._soledad.get_from_index( - TYPE_ID_PRIVATE_INDEX, - self.KEY_TYPE, - key_id, - '1' if private else '0') - leap_assert( - len(doclist) is 1, - 'There is %d keys for id %s!' % (len(doclist), key_id)) - return doclist.pop() + def _build_key_from_gpg(self, key, key_data): + """ + Build an OpenPGPKey for C{address} based on C{key} from + local gpg storage. + + ASCII armored GPG key data has to be queried independently in this + wrapper, so we receive it in C{key_data}. + + :param key: Key obtained from GPG storage. + :type key: dict + :param key_data: Key data obtained from GPG storage. + :type key_data: str + :return: An instance of the key. + :rtype: OpenPGPKey + """ + expiry_date = None + if key['expires']: + expiry_date = datetime.fromtimestamp(int(key['expires'])) + address = [] + for uid in key['uids']: + address.append(_parse_address(uid)) + + return OpenPGPKey( + address, + gpgbinary=self._gpgbinary, + key_id=key['keyid'], + fingerprint=key['fingerprint'], + key_data=key_data, + private=True if key['type'] == 'sec' else False, + length=int(key['length']), + expiry_date=expiry_date, + refreshed_at=datetime.now(), + ) def delete_key(self, key): """ Remove C{key} from storage. - May raise: - errors.KeyNotFound - :param key: The key to be removed. :type key: EncryptionKey + + :return: A Deferred which fires when the key is deleted, or which + fails with KeyNotFound if the key was not found on local + storage. + :rtype: Deferred """ leap_assert_type(key, OpenPGPKey) - activedocs = self._soledad.get_from_index( - TYPE_ID_PRIVATE_INDEX, - self.ACTIVE_TYPE, - key.key_id, - '1' if key.private else '0') - for doc in activedocs: - self._soledad.delete_doc(doc) - docs = self._soledad.get_from_index( + def delete_docs(activedocs): + deferreds = [] + for doc in activedocs: + d = self._soledad.delete_doc(doc) + deferreds.append(d) + return defer.gatherResults(deferreds) + + def get_key_docs(_): + return self._soledad.get_from_index( + TYPE_ID_PRIVATE_INDEX, + self.KEY_TYPE, + key.key_id, + '1' if key.private else '0') + + def delete_key(docs): + if len(docs) == 0: + raise errors.KeyNotFound(key) + if len(docs) > 1: + logger.critical("There is more than one key for key_id %s" + % key.key_id) + + doc = None + for d in docs: + if d.content['fingerprint'] == key.fingerprint: + doc = d + break + if doc is None: + raise errors.KeyNotFound(key) + return self._soledad.delete_doc(doc) + + d = self._soledad.get_from_index( TYPE_ID_PRIVATE_INDEX, - self.KEY_TYPE, + self.ACTIVE_TYPE, key.key_id, '1' if key.private else '0') - if len(docs) == 0: - raise errors.KeyNotFound(key) - if len(docs) > 1: - logger.critical("There is more than one key for key_id %s" - % key.key_id) - - doc = None - for d in docs: - if d.content['fingerprint'] == key.fingerprint: - doc = d - break - if doc is None: - raise errors.KeyNotFound(key) - self._soledad.delete_doc(doc) + d.addCallback(delete_docs) + d.addCallback(get_key_docs) + d.addCallback(delete_key) + return d # # Data encryption, decryption, signing and verifying @@ -640,12 +742,10 @@ class OpenPGPScheme(EncryptionScheme): :param verify: The key used to verify a signature. :type verify: OpenPGPKey - :return: The decrypted data. - :rtype: unicode + :return: The decrypted data and if signature verifies + :rtype: (unicode, bool) :raise DecryptError: Raised if failed decrypting for some reason. - :raise InvalidSignature: Raised if unable to verify the signature with - C{verify} key. """ leap_assert(privkey.private is True, 'Key is not private.') keys = [privkey] @@ -658,15 +758,15 @@ class OpenPGPScheme(EncryptionScheme): result = gpg.decrypt( data, passphrase=passphrase, always_trust=True) self._assert_gpg_result_ok(result) + # verify signature - if (verify is not None): - if result.valid is False or \ - verify.fingerprint != result.pubkey_fingerprint: - raise errors.InvalidSignature( - 'Failed to verify signature with key %s: %s' % - (verify.key_id, result.stderr)) + sign_valid = False + if (verify is not None and + result.valid is True and + verify.fingerprint == result.pubkey_fingerprint): + sign_valid = True - return result.data + return (result.data, sign_valid) except errors.GPGError as e: logger.error('Failed to decrypt: %s.' % str(e)) raise errors.DecryptError(str(e)) @@ -764,9 +864,4 @@ class OpenPGPScheme(EncryptionScheme): valid = result.valid rfprint = result.fingerprint kfprint = gpgpubkey['fingerprint'] - # raise in case sig is invalid - if valid is False or rfprint != kfprint: - raise errors.InvalidSignature( - 'Failed to verify signature ' - 'with key %s.' % gpgpubkey['keyid']) - return True + return valid and rfprint == kfprint diff --git a/src/leap/keymanager/tests/__init__.py b/src/leap/keymanager/tests/__init__.py index 1ea33b57..7128d20d 100644 --- a/src/leap/keymanager/tests/__init__.py +++ b/src/leap/keymanager/tests/__init__.py @@ -18,37 +18,27 @@ Base classes for the Key Manager tests. """ -from mock import Mock +import distutils.spawn +import os.path + +from twisted.internet.defer import gatherResults +from twisted.trial import unittest from leap.common.testing.basetest import BaseLeapTest from leap.soledad.client import Soledad from leap.keymanager import KeyManager +from leap.keymanager.openpgp import OpenPGPKey ADDRESS = 'leap@leap.se' -# XXX discover the gpg binary path -GPG_BINARY_PATH = '/usr/bin/gpg' +ADDRESS_2 = 'anotheruser@leap.se' -class KeyManagerWithSoledadTestCase(BaseLeapTest): +class KeyManagerWithSoledadTestCase(unittest.TestCase, BaseLeapTest): def setUp(self): - # mock key fetching and storing so Soledad doesn't fail when trying to - # reach the server. - Soledad._get_secrets_from_shared_db = Mock(return_value=None) - Soledad._put_secrets_in_shared_db = Mock(return_value=None) - - class MockSharedDB(object): - - get_doc = Mock(return_value=None) - put_doc = Mock() - lock = Mock(return_value=('atoken', 300)) - unlock = Mock(return_value=True) - - def __call__(self): - return self - - Soledad._shared_db = MockSharedDB() + self.setUpEnv() + self.gpg_binary_path = self._find_gpg() self._soledad = Soledad( u"leap@leap.se", @@ -58,18 +48,43 @@ class KeyManagerWithSoledadTestCase(BaseLeapTest): server_url='', cert_file=None, auth_token=None, + syncable=False ) def tearDown(self): km = self._key_manager() - for key in km.get_all_keys(): - km._wrapper_map[key.__class__].delete_key(key) - for key in km.get_all_keys(private=True): - km._wrapper_map[key.__class__].delete_key(key) + + def delete_keys(keys): + deferreds = [] + for key in keys: + d = km._wrapper_map[key.__class__].delete_key(key) + deferreds.append(d) + return gatherResults(deferreds) + + def get_and_delete_keys(_): + deferreds = [] + for private in [True, False]: + d = km.get_all_keys(private=private) + d.addCallback(delete_keys) + deferreds.append(d) + return gatherResults(deferreds) + + # wait for the indexes to be ready for the tear down + d = km._wrapper_map[OpenPGPKey].deferred_indexes + d.addCallback(get_and_delete_keys) + d.addCallback(lambda _: self.tearDownEnv()) + return d def _key_manager(self, user=ADDRESS, url='', token=None): return KeyManager(user, url, self._soledad, token=token, - gpgbinary=GPG_BINARY_PATH) + gpgbinary=self.gpg_binary_path) + + def _find_gpg(self): + gpg_path = distutils.spawn.find_executable('gpg') + if gpg_path is not None: + return os.path.realpath(gpg_path) + else: + return "/usr/bin/gpg" # key 24D18DDF: public key "Leap Test Key <leap@leap.se>" @@ -234,3 +249,62 @@ RZXoH+FTg9UAW87eqU610npOkT6cRaBxaMK/mDtGNdc= =JTFu -----END PGP PRIVATE KEY BLOCK----- """ + +# key 7FEE575A: public key "anotheruser <anotheruser@leap.se>" +PUBLIC_KEY_2 = """ +-----BEGIN PGP PUBLIC KEY BLOCK----- +Version: GnuPG v1.4.10 (GNU/Linux) + +mI0EUYwJXgEEAMbTKHuPJ5/Gk34l9Z06f+0WCXTDXdte1UBoDtZ1erAbudgC4MOR +gquKqoj3Hhw0/ILqJ88GcOJmKK/bEoIAuKaqlzDF7UAYpOsPZZYmtRfPC2pTCnXq +Z1vdeqLwTbUspqXflkCkFtfhGKMq5rH8GV5a3tXZkRWZhdNwhVXZagC3ABEBAAG0 +IWFub3RoZXJ1c2VyIDxhbm90aGVydXNlckBsZWFwLnNlPoi4BBMBAgAiBQJRjAle +AhsDBgsJCAcDAgYVCAIJCgsEFgIDAQIeAQIXgAAKCRB/nfpof+5XWotuA/4tLN4E +gUr7IfLy2HkHAxzw7A4rqfMN92DIM9mZrDGaWRrOn3aVF7VU1UG7MDkHfPvp/cFw +ezoCw4s4IoHVc/pVlOkcHSyt4/Rfh248tYEJmFCJXGHpkK83VIKYJAithNccJ6Q4 +JE/o06Mtf4uh/cA1HUL4a4ceqUhtpLJULLeKo7iNBFGMCV4BBADsyQI7GR0wSAxz +VayLjuPzgT+bjbFeymIhjuxKIEwnIKwYkovztW+4bbOcQs785k3Lp6RzvigTpQQt +Z/hwcLOqZbZw8t/24+D+Pq9mMP2uUvCFFqLlVvA6D3vKSQ/XNN+YB919WQ04jh63 +yuRe94WenT1RJd6xU1aaUff4rKizuQARAQABiJ8EGAECAAkFAlGMCV4CGwwACgkQ +f536aH/uV1rPZQQAqCzRysOlu8ez7PuiBD4SebgRqWlxa1TF1ujzfLmuPivROZ2X +Kw5aQstxgGSjoB7tac49s0huh4X8XK+BtJBfU84JS8Jc2satlfwoyZ35LH6sDZck +I+RS/3we6zpMfHs3vvp9xgca6ZupQxivGtxlJs294TpJorx+mFFqbV17AzQ= +=Thdu +-----END PGP PUBLIC KEY BLOCK----- +""" + +PRIVATE_KEY_2 = """ +-----BEGIN PGP PRIVATE KEY BLOCK----- +Version: GnuPG v1.4.10 (GNU/Linux) + +lQHYBFGMCV4BBADG0yh7jyefxpN+JfWdOn/tFgl0w13bXtVAaA7WdXqwG7nYAuDD +kYKriqqI9x4cNPyC6ifPBnDiZiiv2xKCALimqpcwxe1AGKTrD2WWJrUXzwtqUwp1 +6mdb3Xqi8E21LKal35ZApBbX4RijKuax/BleWt7V2ZEVmYXTcIVV2WoAtwARAQAB +AAP7BLuSAx7tOohnimEs74ks8l/L6dOcsFQZj2bqs4AoY3jFe7bV0tHr4llypb/8 +H3/DYvpf6DWnCjyUS1tTnXSW8JXtx01BUKaAufSmMNg9blKV6GGHlT/Whe9uVyks +7XHk/+9mebVMNJ/kNlqq2k+uWqJohzC8WWLRK+d1tBeqDsECANZmzltPaqUsGV5X +C3zszE3tUBgptV/mKnBtopKi+VH+t7K6fudGcG+bAcZDUoH/QVde52mIIjjIdLje +uajJuHUCAO1mqh+vPoGv4eBLV7iBo3XrunyGXiys4a39eomhxTy3YktQanjjx+ty +GltAGCs5PbWGO6/IRjjvd46wh53kzvsCAO0J97gsWhzLuFnkxFAJSPk7RRlyl7lI +1XS/x0Og6j9XHCyY1OYkfBm0to3UlCfkgirzCYlTYObCofzdKFIPDmSqHbQhYW5v +dGhlcnVzZXIgPGFub3RoZXJ1c2VyQGxlYXAuc2U+iLgEEwECACIFAlGMCV4CGwMG +CwkIBwMCBhUIAgkKCwQWAgMBAh4BAheAAAoJEH+d+mh/7ldai24D/i0s3gSBSvsh +8vLYeQcDHPDsDiup8w33YMgz2ZmsMZpZGs6fdpUXtVTVQbswOQd8++n9wXB7OgLD +izgigdVz+lWU6RwdLK3j9F+Hbjy1gQmYUIlcYemQrzdUgpgkCK2E1xwnpDgkT+jT +oy1/i6H9wDUdQvhrhx6pSG2kslQst4qjnQHYBFGMCV4BBADsyQI7GR0wSAxzVayL +juPzgT+bjbFeymIhjuxKIEwnIKwYkovztW+4bbOcQs785k3Lp6RzvigTpQQtZ/hw +cLOqZbZw8t/24+D+Pq9mMP2uUvCFFqLlVvA6D3vKSQ/XNN+YB919WQ04jh63yuRe +94WenT1RJd6xU1aaUff4rKizuQARAQABAAP9EyElqJ3dq3EErXwwT4mMnbd1SrVC +rUJrNWQZL59mm5oigS00uIyR0SvusOr+UzTtd8ysRuwHy5d/LAZsbjQStaOMBILx +77TJveOel0a1QK0YSMF2ywZMCKvquvjli4hAtWYz/EwfuzQN3t23jc5ny+GqmqD2 +3FUxLJosFUfLNmECAO9KhVmJi+L9dswIs+2Dkjd1eiRQzNOEVffvYkGYZyKxNiXF +UA5kvyZcB4iAN9sWCybE4WHZ9jd4myGB0MPDGxkCAP1RsXJbbuD6zS7BXe5gwunO +2q4q7ptdSl/sJYQuTe1KNP5d/uGsvlcFfsYjpsopasPjFBIncc/2QThMKlhoEaEB +/0mVAxpT6SrEvUbJ18z7kna24SgMPr3OnPMxPGfvNLJY/Xv/A17YfoqjmByCvsKE +JCDjopXtmbcrZyoEZbEht9mko4ifBBgBAgAJBQJRjAleAhsMAAoJEH+d+mh/7lda +z2UEAKgs0crDpbvHs+z7ogQ+Enm4EalpcWtUxdbo83y5rj4r0TmdlysOWkLLcYBk +o6Ae7WnOPbNIboeF/FyvgbSQX1POCUvCXNrGrZX8KMmd+Sx+rA2XJCPkUv98Hus6 +THx7N776fcYHGumbqUMYrxrcZSbNveE6SaK8fphRam1dewM0 +=a5gs +-----END PGP PRIVATE KEY BLOCK----- +""" diff --git a/src/leap/keymanager/tests/test_keymanager.py b/src/leap/keymanager/tests/test_keymanager.py index 319d2e11..93bc42c7 100644 --- a/src/leap/keymanager/tests/test_keymanager.py +++ b/src/leap/keymanager/tests/test_keymanager.py @@ -23,12 +23,13 @@ Tests for the Key Manager. from datetime import datetime from mock import Mock -from leap.common.testing.basetest import BaseLeapTest +from twisted.internet.defer import inlineCallbacks +from twisted.trial import unittest + from leap.keymanager import ( - openpgp, KeyNotFound, KeyAddressMismatch, - errors, + errors ) from leap.keymanager.openpgp import OpenPGPKey from leap.keymanager.keys import ( @@ -42,23 +43,16 @@ from leap.keymanager.validation import ( from leap.keymanager.tests import ( KeyManagerWithSoledadTestCase, ADDRESS, + ADDRESS_2, KEY_FINGERPRINT, PUBLIC_KEY, + PUBLIC_KEY_2, PRIVATE_KEY, - GPG_BINARY_PATH + PRIVATE_KEY_2, ) -ADDRESS_2 = 'anotheruser@leap.se' - - -class KeyManagerUtilTestCase(BaseLeapTest): - - def setUp(self): - pass - - def tearDown(self): - pass +class KeyManagerUtilTestCase(unittest.TestCase): def test_is_address(self): self.assertTrue( @@ -85,7 +79,7 @@ class KeyManagerUtilTestCase(BaseLeapTest): 'expiry_date': 0, 'last_audited_at': 0, 'refreshed_at': 1311239602, - 'validation': str(ValidationLevel.Weak_Chain), + 'validation': ValidationLevel.Weak_Chain.name, 'encr_used': False, 'sign_used': True, } @@ -128,224 +122,43 @@ class KeyManagerUtilTestCase(BaseLeapTest): 'Wrong data in key.') -class OpenPGPCryptoTestCase(KeyManagerWithSoledadTestCase): - - def _test_openpgp_gen_key(self): - pgp = openpgp.OpenPGPScheme(self._soledad) - self.assertRaises(KeyNotFound, pgp.get_key, 'user@leap.se') - key = pgp.gen_key('user@leap.se') - self.assertIsInstance(key, openpgp.OpenPGPKey) - self.assertEqual( - ['user@leap.se'], key.address, 'Wrong address bound to key.') - self.assertEqual( - 4096, key.length, 'Wrong key length.') - - def test_openpgp_put_delete_key(self): - pgp = openpgp.OpenPGPScheme( - self._soledad, gpgbinary=GPG_BINARY_PATH) - self.assertRaises(KeyNotFound, pgp.get_key, ADDRESS) - pgp.put_ascii_key(PUBLIC_KEY, ADDRESS) - key = pgp.get_key(ADDRESS, private=False) - pgp.delete_key(key) - self.assertRaises(KeyNotFound, pgp.get_key, ADDRESS) - - def test_openpgp_put_ascii_key(self): - pgp = openpgp.OpenPGPScheme( - self._soledad, gpgbinary=GPG_BINARY_PATH) - self.assertRaises(KeyNotFound, pgp.get_key, ADDRESS) - pgp.put_ascii_key(PUBLIC_KEY, ADDRESS) - key = pgp.get_key(ADDRESS, private=False) - self.assertIsInstance(key, openpgp.OpenPGPKey) - self.assertTrue( - ADDRESS in key.address, 'Wrong address bound to key.') - self.assertEqual( - 4096, key.length, 'Wrong key length.') - pgp.delete_key(key) - self.assertRaises(KeyNotFound, pgp.get_key, ADDRESS) - - def test_get_public_key(self): - pgp = openpgp.OpenPGPScheme( - self._soledad, gpgbinary=GPG_BINARY_PATH) - self.assertRaises(KeyNotFound, pgp.get_key, ADDRESS) - pgp.put_ascii_key(PUBLIC_KEY, ADDRESS) - self.assertRaises( - KeyNotFound, pgp.get_key, ADDRESS, private=True) - key = pgp.get_key(ADDRESS, private=False) - self.assertTrue(ADDRESS in key.address) - self.assertFalse(key.private) - self.assertEqual(KEY_FINGERPRINT, key.fingerprint) - pgp.delete_key(key) - self.assertRaises(KeyNotFound, pgp.get_key, ADDRESS) - - def test_openpgp_encrypt_decrypt(self): - # encrypt - pgp = openpgp.OpenPGPScheme( - self._soledad, gpgbinary=GPG_BINARY_PATH) - pgp.put_ascii_key(PUBLIC_KEY, ADDRESS) - pubkey = pgp.get_key(ADDRESS, private=False) - cyphertext = pgp.encrypt('data', pubkey) - # assert - self.assertTrue(cyphertext is not None) - self.assertTrue(cyphertext != '') - self.assertTrue(cyphertext != 'data') - self.assertTrue(pgp.is_encrypted(cyphertext)) - self.assertTrue(pgp.is_encrypted(cyphertext)) - # decrypt - self.assertRaises( - KeyNotFound, pgp.get_key, ADDRESS, private=True) - pgp.put_ascii_key(PRIVATE_KEY, ADDRESS) - privkey = pgp.get_key(ADDRESS, private=True) - pgp.delete_key(pubkey) - pgp.delete_key(privkey) - self.assertRaises( - KeyNotFound, pgp.get_key, ADDRESS, private=False) - self.assertRaises( - KeyNotFound, pgp.get_key, ADDRESS, private=True) - - def test_verify_with_private_raises(self): - pgp = openpgp.OpenPGPScheme( - self._soledad, gpgbinary=GPG_BINARY_PATH) - pgp.put_ascii_key(PRIVATE_KEY, ADDRESS) - data = 'data' - privkey = pgp.get_key(ADDRESS, private=True) - signed = pgp.sign(data, privkey) - self.assertRaises( - AssertionError, - pgp.verify, signed, privkey) - - def test_sign_with_public_raises(self): - pgp = openpgp.OpenPGPScheme( - self._soledad, gpgbinary=GPG_BINARY_PATH) - pgp.put_ascii_key(PUBLIC_KEY, ADDRESS) - data = 'data' - pubkey = pgp.get_key(ADDRESS, private=False) - self.assertRaises( - AssertionError, - pgp.sign, data, pubkey) - - def test_verify_with_wrong_key_raises(self): - pgp = openpgp.OpenPGPScheme( - self._soledad, gpgbinary=GPG_BINARY_PATH) - pgp.put_ascii_key(PRIVATE_KEY, ADDRESS) - data = 'data' - privkey = pgp.get_key(ADDRESS, private=True) - signed = pgp.sign(data, privkey) - pgp.put_ascii_key(PUBLIC_KEY_2, ADDRESS_2) - wrongkey = pgp.get_key(ADDRESS_2) - self.assertRaises( - errors.InvalidSignature, - pgp.verify, signed, wrongkey) - - def test_encrypt_sign_with_public_raises(self): - pgp = openpgp.OpenPGPScheme( - self._soledad, gpgbinary=GPG_BINARY_PATH) - pgp.put_ascii_key(PRIVATE_KEY, ADDRESS) - data = 'data' - privkey = pgp.get_key(ADDRESS, private=True) - pubkey = pgp.get_key(ADDRESS, private=False) - self.assertRaises( - AssertionError, - pgp.encrypt, data, privkey, sign=pubkey) - - def test_decrypt_verify_with_private_raises(self): - pgp = openpgp.OpenPGPScheme( - self._soledad, gpgbinary=GPG_BINARY_PATH) - pgp.put_ascii_key(PRIVATE_KEY, ADDRESS) - data = 'data' - privkey = pgp.get_key(ADDRESS, private=True) - pubkey = pgp.get_key(ADDRESS, private=False) - encrypted_and_signed = pgp.encrypt( - data, pubkey, sign=privkey) - self.assertRaises( - AssertionError, - pgp.decrypt, - encrypted_and_signed, privkey, verify=privkey) - - def test_decrypt_verify_with_wrong_key_raises(self): - pgp = openpgp.OpenPGPScheme( - self._soledad, gpgbinary=GPG_BINARY_PATH) - pgp.put_ascii_key(PRIVATE_KEY, ADDRESS) - data = 'data' - privkey = pgp.get_key(ADDRESS, private=True) - pubkey = pgp.get_key(ADDRESS, private=False) - encrypted_and_signed = pgp.encrypt(data, pubkey, sign=privkey) - pgp.put_ascii_key(PUBLIC_KEY_2, ADDRESS_2) - wrongkey = pgp.get_key(ADDRESS_2) - self.assertRaises( - errors.InvalidSignature, - pgp.verify, encrypted_and_signed, wrongkey) - - def test_sign_verify(self): - pgp = openpgp.OpenPGPScheme( - self._soledad, gpgbinary=GPG_BINARY_PATH) - pgp.put_ascii_key(PRIVATE_KEY, ADDRESS) - data = 'data' - privkey = pgp.get_key(ADDRESS, private=True) - signed = pgp.sign(data, privkey, detach=False) - pubkey = pgp.get_key(ADDRESS, private=False) - self.assertTrue(pgp.verify(signed, pubkey)) - - def test_encrypt_sign_decrypt_verify(self): - pgp = openpgp.OpenPGPScheme( - self._soledad, gpgbinary=GPG_BINARY_PATH) - pgp.put_ascii_key(PRIVATE_KEY, ADDRESS) - pubkey = pgp.get_key(ADDRESS, private=False) - privkey = pgp.get_key(ADDRESS, private=True) - pgp.put_ascii_key(PRIVATE_KEY_2, ADDRESS_2) - pubkey2 = pgp.get_key(ADDRESS_2, private=False) - privkey2 = pgp.get_key(ADDRESS_2, private=True) - data = 'data' - encrypted_and_signed = pgp.encrypt( - data, pubkey2, sign=privkey) - res = pgp.decrypt( - encrypted_and_signed, privkey2, verify=pubkey) - self.assertTrue(data, res) - - def test_sign_verify_detached_sig(self): - pgp = openpgp.OpenPGPScheme( - self._soledad, gpgbinary=GPG_BINARY_PATH) - pgp.put_ascii_key(PRIVATE_KEY, ADDRESS) - data = 'data' - privkey = pgp.get_key(ADDRESS, private=True) - signature = pgp.sign(data, privkey, detach=True) - pubkey = pgp.get_key(ADDRESS, private=False) - self.assertTrue(pgp.verify(data, pubkey, detached_sig=signature)) - - class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase): + @inlineCallbacks def test_get_all_keys_in_db(self): km = self._key_manager() - km._wrapper_map[OpenPGPKey].put_ascii_key(PRIVATE_KEY, ADDRESS) + yield km._wrapper_map[OpenPGPKey].put_ascii_key(PRIVATE_KEY, ADDRESS) # get public keys - keys = km.get_all_keys(False) + keys = yield km.get_all_keys(False) self.assertEqual(len(keys), 1, 'Wrong number of keys') self.assertTrue(ADDRESS in keys[0].address) self.assertFalse(keys[0].private) # get private keys - keys = km.get_all_keys(True) + keys = yield km.get_all_keys(True) self.assertEqual(len(keys), 1, 'Wrong number of keys') self.assertTrue(ADDRESS in keys[0].address) self.assertTrue(keys[0].private) + @inlineCallbacks def test_get_public_key(self): km = self._key_manager() - km._wrapper_map[OpenPGPKey].put_ascii_key(PRIVATE_KEY, ADDRESS) + yield km._wrapper_map[OpenPGPKey].put_ascii_key(PRIVATE_KEY, ADDRESS) # get the key - key = km.get_key(ADDRESS, OpenPGPKey, private=False, - fetch_remote=False) + key = yield km.get_key(ADDRESS, OpenPGPKey, private=False, + fetch_remote=False) self.assertTrue(key is not None) self.assertTrue(ADDRESS in key.address) self.assertEqual( key.fingerprint.lower(), KEY_FINGERPRINT.lower()) self.assertFalse(key.private) + @inlineCallbacks def test_get_private_key(self): km = self._key_manager() - km._wrapper_map[OpenPGPKey].put_ascii_key(PRIVATE_KEY, ADDRESS) + yield km._wrapper_map[OpenPGPKey].put_ascii_key(PRIVATE_KEY, ADDRESS) # get the key - key = km.get_key(ADDRESS, OpenPGPKey, private=True, - fetch_remote=False) + key = yield km.get_key(ADDRESS, OpenPGPKey, private=True, + fetch_remote=False) self.assertTrue(key is not None) self.assertTrue(ADDRESS in key.address) self.assertEqual( @@ -354,17 +167,17 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase): def test_send_key_raises_key_not_found(self): km = self._key_manager() - self.assertRaises( - KeyNotFound, - km.send_key, OpenPGPKey) + d = km.send_key(OpenPGPKey) + return self.assertFailure(d, KeyNotFound) + @inlineCallbacks def test_send_key(self): """ Test that request is well formed when sending keys to server. """ token = "mytoken" km = self._key_manager(token=token) - km._wrapper_map[OpenPGPKey].put_ascii_key(PUBLIC_KEY, ADDRESS) + yield km._wrapper_map[OpenPGPKey].put_ascii_key(PUBLIC_KEY, ADDRESS) km._fetcher.put = Mock() # the following data will be used on the send km.ca_cert_path = 'capath' @@ -372,10 +185,11 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase): km.uid = 'myuid' km.api_uri = 'apiuri' km.api_version = 'apiver' - km.send_key(OpenPGPKey) + yield km.send_key(OpenPGPKey) # setup expected args + pubkey = yield km.get_key(km._address, OpenPGPKey) data = { - km.PUBKEY_KEY: km.get_key(km._address, OpenPGPKey).key_data, + km.PUBKEY_KEY: pubkey.key_data, } url = '%s/%s/users/%s.json' % ('apiuri', 'apiver', 'myuid') km._fetcher.put.assert_called_once_with( @@ -383,7 +197,7 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase): headers={'Authorization': 'Token token=%s' % token}, ) - def test__fetch_keys_from_server(self): + def test_fetch_keys_from_server(self): """ Test that the request is well formed when fetching keys from server. """ @@ -403,15 +217,19 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase): km._fetcher.get = Mock( return_value=Response()) km.ca_cert_path = 'cacertpath' - # do the fetch - km._fetch_keys_from_server(ADDRESS_2) - # and verify the call - km._fetcher.get.assert_called_once_with( - 'http://nickserver.domain', - data={'address': ADDRESS_2}, - verify='cacertpath', - ) + def verify_the_call(_): + km._fetcher.get.assert_called_once_with( + 'http://nickserver.domain', + data={'address': ADDRESS_2}, + verify='cacertpath', + ) + + d = km._fetch_keys_from_server(ADDRESS_2) + d.addCallback(verify_the_call) + return d + + @inlineCallbacks def test_get_key_fetches_from_server(self): """ Test that getting a key successfuly fetches from server. @@ -432,26 +250,26 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase): km._fetcher.get = Mock(return_value=Response()) km.ca_cert_path = 'cacertpath' # try to key get without fetching from server - self.assertRaises( - KeyNotFound, km.get_key, ADDRESS, OpenPGPKey, - fetch_remote=False - ) + d = km.get_key(ADDRESS, OpenPGPKey, fetch_remote=False) + yield self.assertFailure(d, KeyNotFound) # try to get key fetching from server. - key = km.get_key(ADDRESS, OpenPGPKey) + key = yield km.get_key(ADDRESS, OpenPGPKey) self.assertIsInstance(key, OpenPGPKey) self.assertTrue(ADDRESS in key.address) + @inlineCallbacks def test_put_key_ascii(self): """ Test that putting ascii key works """ km = self._key_manager(url='http://nickserver.domain') - km.put_raw_key(PUBLIC_KEY, OpenPGPKey, ADDRESS) - key = km.get_key(ADDRESS, OpenPGPKey) + yield km.put_raw_key(PUBLIC_KEY, OpenPGPKey, ADDRESS) + key = yield km.get_key(ADDRESS, OpenPGPKey) self.assertIsInstance(key, OpenPGPKey) self.assertTrue(ADDRESS in key.address) + @inlineCallbacks def test_fetch_uri_ascii_key(self): """ Test that fetch key downloads the ascii key and gets included in @@ -466,8 +284,8 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase): km._fetcher.get = Mock(return_value=Response()) km.ca_cert_path = 'cacertpath' - km.fetch_key(ADDRESS, "http://site.domain/key", OpenPGPKey) - key = km.get_key(ADDRESS, OpenPGPKey) + yield km.fetch_key(ADDRESS, "http://site.domain/key", OpenPGPKey) + key = yield km.get_key(ADDRESS, OpenPGPKey) self.assertEqual(KEY_FINGERPRINT, key.fingerprint) def test_fetch_uri_empty_key(self): @@ -482,8 +300,8 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase): km._fetcher.get = Mock(return_value=Response()) km.ca_cert_path = 'cacertpath' - self.assertRaises(KeyNotFound, km.fetch_key, - ADDRESS, "http://site.domain/key", OpenPGPKey) + d = km.fetch_key(ADDRESS, "http://site.domain/key", OpenPGPKey) + return self.assertFailure(d, KeyNotFound) def test_fetch_uri_address_differ(self): """ @@ -498,108 +316,74 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase): km._fetcher.get = Mock(return_value=Response()) km.ca_cert_path = 'cacertpath' - self.assertRaises(KeyAddressMismatch, km.fetch_key, - ADDRESS_2, "http://site.domain/key", OpenPGPKey) + d = km.fetch_key(ADDRESS_2, "http://site.domain/key", OpenPGPKey) + return self.assertFailure(d, KeyAddressMismatch) class KeyManagerCryptoTestCase(KeyManagerWithSoledadTestCase): RAW_DATA = 'data' + @inlineCallbacks def test_keymanager_openpgp_encrypt_decrypt(self): km = self._key_manager() # put raw private key - km._wrapper_map[OpenPGPKey].put_ascii_key(PRIVATE_KEY, ADDRESS) - # get public key - pubkey = km.get_key( - ADDRESS, OpenPGPKey, private=False, fetch_remote=False) + yield km._wrapper_map[OpenPGPKey].put_ascii_key(PRIVATE_KEY, ADDRESS) + yield km._wrapper_map[OpenPGPKey].put_ascii_key( + PRIVATE_KEY_2, ADDRESS_2) # encrypt - encdata = km.encrypt(self.RAW_DATA, pubkey) + encdata = yield km.encrypt(self.RAW_DATA, ADDRESS, OpenPGPKey, + sign=ADDRESS_2, fetch_remote=False) self.assertNotEqual(self.RAW_DATA, encdata) - # get private key - privkey = km.get_key( - ADDRESS, OpenPGPKey, private=True, fetch_remote=False) # decrypt - rawdata = km.decrypt(encdata, privkey) + rawdata, signingkey = yield km.decrypt( + encdata, ADDRESS, OpenPGPKey, verify=ADDRESS_2, fetch_remote=False) self.assertEqual(self.RAW_DATA, rawdata) + key = yield km.get_key(ADDRESS_2, OpenPGPKey, private=False, + fetch_remote=False) + self.assertEqual(signingkey.fingerprint, key.fingerprint) + @inlineCallbacks + def test_keymanager_openpgp_encrypt_decrypt_wrong_sign(self): + km = self._key_manager() + # put raw keys + yield km._wrapper_map[OpenPGPKey].put_ascii_key(PRIVATE_KEY, ADDRESS) + yield km._wrapper_map[OpenPGPKey].put_ascii_key( + PRIVATE_KEY_2, ADDRESS_2) + # encrypt + encdata = yield km.encrypt(self.RAW_DATA, ADDRESS, OpenPGPKey, + sign=ADDRESS_2, fetch_remote=False) + self.assertNotEqual(self.RAW_DATA, encdata) + # verify + rawdata, signingkey = yield km.decrypt( + encdata, ADDRESS, OpenPGPKey, verify=ADDRESS, fetch_remote=False) + self.assertEqual(self.RAW_DATA, rawdata) + self.assertTrue(isinstance(signingkey, errors.InvalidSignature)) + + @inlineCallbacks def test_keymanager_openpgp_sign_verify(self): km = self._key_manager() # put raw private keys - km._wrapper_map[OpenPGPKey].put_ascii_key(PRIVATE_KEY, ADDRESS) - # get private key for signing - privkey = km.get_key( - ADDRESS, OpenPGPKey, private=True, fetch_remote=False) - # encrypt - signdata = km.sign(self.RAW_DATA, privkey, detach=False) + yield km._wrapper_map[OpenPGPKey].put_ascii_key(PRIVATE_KEY, ADDRESS) + signdata = yield km.sign(self.RAW_DATA, ADDRESS, OpenPGPKey, + detach=False) self.assertNotEqual(self.RAW_DATA, signdata) - # get public key for verifying - pubkey = km.get_key( - ADDRESS, OpenPGPKey, private=False, fetch_remote=False) - # decrypt - self.assertTrue(km.verify(signdata, pubkey)) - - -# Key material for testing - -# key 7FEE575A: public key "anotheruser <anotheruser@leap.se>" -PUBLIC_KEY_2 = """ ------BEGIN PGP PUBLIC KEY BLOCK----- -Version: GnuPG v1.4.10 (GNU/Linux) - -mI0EUYwJXgEEAMbTKHuPJ5/Gk34l9Z06f+0WCXTDXdte1UBoDtZ1erAbudgC4MOR -gquKqoj3Hhw0/ILqJ88GcOJmKK/bEoIAuKaqlzDF7UAYpOsPZZYmtRfPC2pTCnXq -Z1vdeqLwTbUspqXflkCkFtfhGKMq5rH8GV5a3tXZkRWZhdNwhVXZagC3ABEBAAG0 -IWFub3RoZXJ1c2VyIDxhbm90aGVydXNlckBsZWFwLnNlPoi4BBMBAgAiBQJRjAle -AhsDBgsJCAcDAgYVCAIJCgsEFgIDAQIeAQIXgAAKCRB/nfpof+5XWotuA/4tLN4E -gUr7IfLy2HkHAxzw7A4rqfMN92DIM9mZrDGaWRrOn3aVF7VU1UG7MDkHfPvp/cFw -ezoCw4s4IoHVc/pVlOkcHSyt4/Rfh248tYEJmFCJXGHpkK83VIKYJAithNccJ6Q4 -JE/o06Mtf4uh/cA1HUL4a4ceqUhtpLJULLeKo7iNBFGMCV4BBADsyQI7GR0wSAxz -VayLjuPzgT+bjbFeymIhjuxKIEwnIKwYkovztW+4bbOcQs785k3Lp6RzvigTpQQt -Z/hwcLOqZbZw8t/24+D+Pq9mMP2uUvCFFqLlVvA6D3vKSQ/XNN+YB919WQ04jh63 -yuRe94WenT1RJd6xU1aaUff4rKizuQARAQABiJ8EGAECAAkFAlGMCV4CGwwACgkQ -f536aH/uV1rPZQQAqCzRysOlu8ez7PuiBD4SebgRqWlxa1TF1ujzfLmuPivROZ2X -Kw5aQstxgGSjoB7tac49s0huh4X8XK+BtJBfU84JS8Jc2satlfwoyZ35LH6sDZck -I+RS/3we6zpMfHs3vvp9xgca6ZupQxivGtxlJs294TpJorx+mFFqbV17AzQ= -=Thdu ------END PGP PUBLIC KEY BLOCK----- -""" + # verify + signingkey = yield km.verify(signdata, ADDRESS, OpenPGPKey, + fetch_remote=False) + key = yield km.get_key(ADDRESS, OpenPGPKey, private=False, + fetch_remote=False) + self.assertEqual(signingkey.fingerprint, key.fingerprint) + + def test_keymanager_encrypt_key_not_found(self): + km = self._key_manager() + d = km._wrapper_map[OpenPGPKey].put_ascii_key(PRIVATE_KEY, ADDRESS) + d.addCallback( + lambda _: km.encrypt(self.RAW_DATA, ADDRESS_2, OpenPGPKey, + sign=ADDRESS, fetch_remote=False)) + return self.assertFailure(d, KeyNotFound) + -PRIVATE_KEY_2 = """ ------BEGIN PGP PRIVATE KEY BLOCK----- -Version: GnuPG v1.4.10 (GNU/Linux) - -lQHYBFGMCV4BBADG0yh7jyefxpN+JfWdOn/tFgl0w13bXtVAaA7WdXqwG7nYAuDD -kYKriqqI9x4cNPyC6ifPBnDiZiiv2xKCALimqpcwxe1AGKTrD2WWJrUXzwtqUwp1 -6mdb3Xqi8E21LKal35ZApBbX4RijKuax/BleWt7V2ZEVmYXTcIVV2WoAtwARAQAB -AAP7BLuSAx7tOohnimEs74ks8l/L6dOcsFQZj2bqs4AoY3jFe7bV0tHr4llypb/8 -H3/DYvpf6DWnCjyUS1tTnXSW8JXtx01BUKaAufSmMNg9blKV6GGHlT/Whe9uVyks -7XHk/+9mebVMNJ/kNlqq2k+uWqJohzC8WWLRK+d1tBeqDsECANZmzltPaqUsGV5X -C3zszE3tUBgptV/mKnBtopKi+VH+t7K6fudGcG+bAcZDUoH/QVde52mIIjjIdLje -uajJuHUCAO1mqh+vPoGv4eBLV7iBo3XrunyGXiys4a39eomhxTy3YktQanjjx+ty -GltAGCs5PbWGO6/IRjjvd46wh53kzvsCAO0J97gsWhzLuFnkxFAJSPk7RRlyl7lI -1XS/x0Og6j9XHCyY1OYkfBm0to3UlCfkgirzCYlTYObCofzdKFIPDmSqHbQhYW5v -dGhlcnVzZXIgPGFub3RoZXJ1c2VyQGxlYXAuc2U+iLgEEwECACIFAlGMCV4CGwMG -CwkIBwMCBhUIAgkKCwQWAgMBAh4BAheAAAoJEH+d+mh/7ldai24D/i0s3gSBSvsh -8vLYeQcDHPDsDiup8w33YMgz2ZmsMZpZGs6fdpUXtVTVQbswOQd8++n9wXB7OgLD -izgigdVz+lWU6RwdLK3j9F+Hbjy1gQmYUIlcYemQrzdUgpgkCK2E1xwnpDgkT+jT -oy1/i6H9wDUdQvhrhx6pSG2kslQst4qjnQHYBFGMCV4BBADsyQI7GR0wSAxzVayL -juPzgT+bjbFeymIhjuxKIEwnIKwYkovztW+4bbOcQs785k3Lp6RzvigTpQQtZ/hw -cLOqZbZw8t/24+D+Pq9mMP2uUvCFFqLlVvA6D3vKSQ/XNN+YB919WQ04jh63yuRe -94WenT1RJd6xU1aaUff4rKizuQARAQABAAP9EyElqJ3dq3EErXwwT4mMnbd1SrVC -rUJrNWQZL59mm5oigS00uIyR0SvusOr+UzTtd8ysRuwHy5d/LAZsbjQStaOMBILx -77TJveOel0a1QK0YSMF2ywZMCKvquvjli4hAtWYz/EwfuzQN3t23jc5ny+GqmqD2 -3FUxLJosFUfLNmECAO9KhVmJi+L9dswIs+2Dkjd1eiRQzNOEVffvYkGYZyKxNiXF -UA5kvyZcB4iAN9sWCybE4WHZ9jd4myGB0MPDGxkCAP1RsXJbbuD6zS7BXe5gwunO -2q4q7ptdSl/sJYQuTe1KNP5d/uGsvlcFfsYjpsopasPjFBIncc/2QThMKlhoEaEB -/0mVAxpT6SrEvUbJ18z7kna24SgMPr3OnPMxPGfvNLJY/Xv/A17YfoqjmByCvsKE -JCDjopXtmbcrZyoEZbEht9mko4ifBBgBAgAJBQJRjAleAhsMAAoJEH+d+mh/7lda -z2UEAKgs0crDpbvHs+z7ogQ+Enm4EalpcWtUxdbo83y5rj4r0TmdlysOWkLLcYBk -o6Ae7WnOPbNIboeF/FyvgbSQX1POCUvCXNrGrZX8KMmd+Sx+rA2XJCPkUv98Hus6 -THx7N776fcYHGumbqUMYrxrcZSbNveE6SaK8fphRam1dewM0 -=a5gs ------END PGP PRIVATE KEY BLOCK----- -""" import unittest if __name__ == "__main__": unittest.main() diff --git a/src/leap/keymanager/tests/test_openpgp.py b/src/leap/keymanager/tests/test_openpgp.py new file mode 100644 index 00000000..5f85c74b --- /dev/null +++ b/src/leap/keymanager/tests/test_openpgp.py @@ -0,0 +1,252 @@ +# -*- coding: utf-8 -*- +# test_keymanager.py +# Copyright (C) 2014 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + + +""" +Tests for the OpenPGP support on Key Manager. +""" + + +from twisted.internet.defer import inlineCallbacks + +from leap.keymanager import ( + KeyNotFound, + openpgp, +) +from leap.keymanager.openpgp import OpenPGPKey +from leap.keymanager.tests import ( + KeyManagerWithSoledadTestCase, + ADDRESS, + ADDRESS_2, + KEY_FINGERPRINT, + PUBLIC_KEY, + PUBLIC_KEY_2, + PRIVATE_KEY, + PRIVATE_KEY_2, +) + + +class OpenPGPCryptoTestCase(KeyManagerWithSoledadTestCase): + + # set the trial timeout to 20min, needed by the key generation test + timeout = 1200 + + @inlineCallbacks + def _test_openpgp_gen_key(self): + pgp = openpgp.OpenPGPScheme( + self._soledad, gpgbinary=self.gpg_binary_path) + yield self._assert_key_not_found(pgp, 'user@leap.se') + key = yield pgp.gen_key('user@leap.se') + self.assertIsInstance(key, openpgp.OpenPGPKey) + self.assertEqual( + ['user@leap.se'], key.address, 'Wrong address bound to key.') + self.assertEqual( + 4096, key.length, 'Wrong key length.') + + @inlineCallbacks + def test_openpgp_put_delete_key(self): + pgp = openpgp.OpenPGPScheme( + self._soledad, gpgbinary=self.gpg_binary_path) + yield self._assert_key_not_found(pgp, ADDRESS) + yield pgp.put_ascii_key(PUBLIC_KEY, ADDRESS) + key = yield pgp.get_key(ADDRESS, private=False) + yield pgp.delete_key(key) + yield self._assert_key_not_found(pgp, ADDRESS) + + @inlineCallbacks + def test_openpgp_put_ascii_key(self): + pgp = openpgp.OpenPGPScheme( + self._soledad, gpgbinary=self.gpg_binary_path) + yield self._assert_key_not_found(pgp, ADDRESS) + yield pgp.put_ascii_key(PUBLIC_KEY, ADDRESS) + key = yield pgp.get_key(ADDRESS, private=False) + self.assertIsInstance(key, openpgp.OpenPGPKey) + self.assertTrue( + ADDRESS in key.address, 'Wrong address bound to key.') + self.assertEqual( + 4096, key.length, 'Wrong key length.') + yield pgp.delete_key(key) + yield self._assert_key_not_found(pgp, ADDRESS) + + @inlineCallbacks + def test_get_public_key(self): + pgp = openpgp.OpenPGPScheme( + self._soledad, gpgbinary=self.gpg_binary_path) + yield self._assert_key_not_found(pgp, ADDRESS) + yield pgp.put_ascii_key(PUBLIC_KEY, ADDRESS) + yield self._assert_key_not_found(pgp, ADDRESS, private=True) + key = yield pgp.get_key(ADDRESS, private=False) + self.assertTrue(ADDRESS in key.address) + self.assertFalse(key.private) + self.assertEqual(KEY_FINGERPRINT, key.fingerprint) + yield pgp.delete_key(key) + yield self._assert_key_not_found(pgp, ADDRESS) + + @inlineCallbacks + def test_openpgp_encrypt_decrypt(self): + data = 'data' + pgp = openpgp.OpenPGPScheme( + self._soledad, gpgbinary=self.gpg_binary_path) + + # encrypt + yield pgp.put_ascii_key(PUBLIC_KEY, ADDRESS) + pubkey = yield pgp.get_key(ADDRESS, private=False) + cyphertext = pgp.encrypt(data, pubkey) + + self.assertTrue(cyphertext is not None) + self.assertTrue(cyphertext != '') + self.assertTrue(cyphertext != data) + self.assertTrue(pgp.is_encrypted(cyphertext)) + self.assertTrue(pgp.is_encrypted(cyphertext)) + + # decrypt + yield self._assert_key_not_found(pgp, ADDRESS, private=True) + yield pgp.put_ascii_key(PRIVATE_KEY, ADDRESS) + privkey = yield pgp.get_key(ADDRESS, private=True) + decrypted, _ = pgp.decrypt(cyphertext, privkey) + self.assertEqual(decrypted, data) + + yield pgp.delete_key(pubkey) + yield pgp.delete_key(privkey) + yield self._assert_key_not_found(pgp, ADDRESS, private=False) + yield self._assert_key_not_found(pgp, ADDRESS, private=True) + + @inlineCallbacks + def test_verify_with_private_raises(self): + data = 'data' + pgp = openpgp.OpenPGPScheme( + self._soledad, gpgbinary=self.gpg_binary_path) + yield pgp.put_ascii_key(PRIVATE_KEY, ADDRESS) + privkey = yield pgp.get_key(ADDRESS, private=True) + signed = pgp.sign(data, privkey) + self.assertRaises( + AssertionError, + pgp.verify, signed, privkey) + + @inlineCallbacks + def test_sign_with_public_raises(self): + data = 'data' + pgp = openpgp.OpenPGPScheme( + self._soledad, gpgbinary=self.gpg_binary_path) + yield pgp.put_ascii_key(PUBLIC_KEY, ADDRESS) + self.assertRaises( + AssertionError, + pgp.sign, data, ADDRESS, OpenPGPKey) + + @inlineCallbacks + def test_verify_with_wrong_key_raises(self): + data = 'data' + pgp = openpgp.OpenPGPScheme( + self._soledad, gpgbinary=self.gpg_binary_path) + yield pgp.put_ascii_key(PRIVATE_KEY, ADDRESS) + privkey = yield pgp.get_key(ADDRESS, private=True) + signed = pgp.sign(data, privkey) + yield pgp.put_ascii_key(PUBLIC_KEY_2, ADDRESS_2) + wrongkey = yield pgp.get_key(ADDRESS_2) + self.assertFalse(pgp.verify(signed, wrongkey)) + + @inlineCallbacks + def test_encrypt_sign_with_public_raises(self): + data = 'data' + pgp = openpgp.OpenPGPScheme( + self._soledad, gpgbinary=self.gpg_binary_path) + yield pgp.put_ascii_key(PRIVATE_KEY, ADDRESS) + privkey = yield pgp.get_key(ADDRESS, private=True) + pubkey = yield pgp.get_key(ADDRESS, private=False) + self.assertRaises( + AssertionError, + pgp.encrypt, data, privkey, sign=pubkey) + + @inlineCallbacks + def test_decrypt_verify_with_private_raises(self): + data = 'data' + pgp = openpgp.OpenPGPScheme( + self._soledad, gpgbinary=self.gpg_binary_path) + yield pgp.put_ascii_key(PRIVATE_KEY, ADDRESS) + privkey = yield pgp.get_key(ADDRESS, private=True) + pubkey = yield pgp.get_key(ADDRESS, private=False) + encrypted_and_signed = pgp.encrypt( + data, pubkey, sign=privkey) + self.assertRaises( + AssertionError, + pgp.decrypt, + encrypted_and_signed, privkey, verify=privkey) + + @inlineCallbacks + def test_decrypt_verify_with_wrong_key(self): + data = 'data' + pgp = openpgp.OpenPGPScheme( + self._soledad, gpgbinary=self.gpg_binary_path) + yield pgp.put_ascii_key(PRIVATE_KEY, ADDRESS) + privkey = yield pgp.get_key(ADDRESS, private=True) + pubkey = yield pgp.get_key(ADDRESS, private=False) + encrypted_and_signed = pgp.encrypt(data, pubkey, sign=privkey) + yield pgp.put_ascii_key(PUBLIC_KEY_2, ADDRESS_2) + wrongkey = yield pgp.get_key(ADDRESS_2) + decrypted, validsign = pgp.decrypt(encrypted_and_signed, privkey, + verify=wrongkey) + self.assertEqual(decrypted, data) + self.assertFalse(validsign) + + @inlineCallbacks + def test_sign_verify(self): + data = 'data' + pgp = openpgp.OpenPGPScheme( + self._soledad, gpgbinary=self.gpg_binary_path) + yield pgp.put_ascii_key(PRIVATE_KEY, ADDRESS) + privkey = yield pgp.get_key(ADDRESS, private=True) + signed = pgp.sign(data, privkey, detach=False) + pubkey = yield pgp.get_key(ADDRESS, private=False) + validsign = pgp.verify(signed, pubkey) + self.assertTrue(validsign) + + @inlineCallbacks + def test_encrypt_sign_decrypt_verify(self): + pgp = openpgp.OpenPGPScheme( + self._soledad, gpgbinary=self.gpg_binary_path) + + yield pgp.put_ascii_key(PRIVATE_KEY, ADDRESS) + pubkey = yield pgp.get_key(ADDRESS, private=False) + privkey = yield pgp.get_key(ADDRESS, private=True) + + yield pgp.put_ascii_key(PRIVATE_KEY_2, ADDRESS_2) + pubkey2 = yield pgp.get_key(ADDRESS_2, private=False) + privkey2 = yield pgp.get_key(ADDRESS_2, private=True) + + data = 'data' + encrypted_and_signed = pgp.encrypt( + data, pubkey2, sign=privkey) + res, validsign = pgp.decrypt( + encrypted_and_signed, privkey2, verify=pubkey) + self.assertEqual(data, res) + self.assertTrue(validsign) + + @inlineCallbacks + def test_sign_verify_detached_sig(self): + data = 'data' + pgp = openpgp.OpenPGPScheme( + self._soledad, gpgbinary=self.gpg_binary_path) + yield pgp.put_ascii_key(PRIVATE_KEY, ADDRESS) + privkey = yield pgp.get_key(ADDRESS, private=True) + signature = yield pgp.sign(data, privkey, detach=True) + pubkey = yield pgp.get_key(ADDRESS, private=False) + validsign = pgp.verify(data, pubkey, detached_sig=signature) + self.assertTrue(validsign) + + def _assert_key_not_found(self, pgp, address, private=False): + d = pgp.get_key(address, private=private) + return self.assertFailure(d, KeyNotFound) diff --git a/src/leap/keymanager/tests/test_validation.py b/src/leap/keymanager/tests/test_validation.py index 400d36e8..15e7d27d 100644 --- a/src/leap/keymanager/tests/test_validation.py +++ b/src/leap/keymanager/tests/test_validation.py @@ -19,6 +19,7 @@ Tests for the Validation Levels """ from datetime import datetime +from twisted.internet.defer import inlineCallbacks from leap.keymanager.openpgp import OpenPGPKey from leap.keymanager.errors import ( @@ -35,53 +36,95 @@ from leap.keymanager.validation import ValidationLevel class ValidationLevelTestCase(KeyManagerWithSoledadTestCase): + @inlineCallbacks def test_none_old_key(self): km = self._key_manager() - km.put_raw_key(PUBLIC_KEY, OpenPGPKey, ADDRESS) - key = km.get_key(ADDRESS, OpenPGPKey, fetch_remote=False) + yield km.put_raw_key(PUBLIC_KEY, OpenPGPKey, ADDRESS) + key = yield km.get_key(ADDRESS, OpenPGPKey, fetch_remote=False) self.assertEqual(key.fingerprint, KEY_FINGERPRINT) + @inlineCallbacks def test_cant_upgrade(self): km = self._key_manager() - km.put_raw_key(PUBLIC_KEY, OpenPGPKey, ADDRESS, - validation=ValidationLevel.Provider_Trust) - self.assertRaises(KeyNotValidUpgrade, km.put_raw_key, UNRELATED_KEY, - OpenPGPKey, ADDRESS) + yield km.put_raw_key(PUBLIC_KEY, OpenPGPKey, ADDRESS, + validation=ValidationLevel.Provider_Trust) + d = km.put_raw_key(UNRELATED_KEY, OpenPGPKey, ADDRESS) + yield self.assertFailure(d, KeyNotValidUpgrade) + @inlineCallbacks def test_fingerprint_level(self): km = self._key_manager() - km.put_raw_key(PUBLIC_KEY, OpenPGPKey, ADDRESS) - km.put_raw_key(UNRELATED_KEY, OpenPGPKey, ADDRESS, - validation=ValidationLevel.Fingerprint) - key = km.get_key(ADDRESS, OpenPGPKey, fetch_remote=False) + yield km.put_raw_key(PUBLIC_KEY, OpenPGPKey, ADDRESS) + yield km.put_raw_key(UNRELATED_KEY, OpenPGPKey, ADDRESS, + validation=ValidationLevel.Fingerprint) + key = yield km.get_key(ADDRESS, OpenPGPKey, fetch_remote=False) self.assertEqual(key.fingerprint, UNRELATED_FINGERPRINT) + @inlineCallbacks def test_expired_key(self): km = self._key_manager() - km.put_raw_key(EXPIRED_KEY, OpenPGPKey, ADDRESS) - km.put_raw_key(UNRELATED_KEY, OpenPGPKey, ADDRESS) - key = km.get_key(ADDRESS, OpenPGPKey, fetch_remote=False) + yield km.put_raw_key(EXPIRED_KEY, OpenPGPKey, ADDRESS) + yield km.put_raw_key(UNRELATED_KEY, OpenPGPKey, ADDRESS) + key = yield km.get_key(ADDRESS, OpenPGPKey, fetch_remote=False) self.assertEqual(key.fingerprint, UNRELATED_FINGERPRINT) + @inlineCallbacks def test_expired_fail_lower_level(self): km = self._key_manager() - km.put_raw_key(EXPIRED_KEY, OpenPGPKey, ADDRESS, - validation=ValidationLevel.Third_Party_Endorsement) - self.assertRaises( - KeyNotValidUpgrade, - km.put_raw_key, + yield km.put_raw_key( + EXPIRED_KEY, OpenPGPKey, ADDRESS, + validation=ValidationLevel.Third_Party_Endorsement) + d = km.put_raw_key( UNRELATED_KEY, OpenPGPKey, ADDRESS, validation=ValidationLevel.Provider_Trust) + yield self.assertFailure(d, KeyNotValidUpgrade) + @inlineCallbacks def test_roll_back(self): km = self._key_manager() - km.put_raw_key(EXPIRED_KEY_UPDATED, OpenPGPKey, ADDRESS) - km.put_raw_key(EXPIRED_KEY, OpenPGPKey, ADDRESS) - key = km.get_key(ADDRESS, OpenPGPKey, fetch_remote=False) + yield km.put_raw_key(EXPIRED_KEY_UPDATED, OpenPGPKey, ADDRESS) + yield km.put_raw_key(EXPIRED_KEY, OpenPGPKey, ADDRESS) + key = yield km.get_key(ADDRESS, OpenPGPKey, fetch_remote=False) self.assertEqual(key.expiry_date, EXPIRED_KEY_NEW_EXPIRY_DATE) + @inlineCallbacks + def test_not_used(self): + km = self._key_manager() + yield km.put_raw_key(UNEXPIRED_KEY, OpenPGPKey, ADDRESS, + validation=ValidationLevel.Provider_Trust) + yield km.put_raw_key(UNRELATED_KEY, OpenPGPKey, ADDRESS, + validation=ValidationLevel.Provider_Endorsement) + key = yield km.get_key(ADDRESS, OpenPGPKey, fetch_remote=False) + self.assertEqual(key.fingerprint, UNRELATED_FINGERPRINT) + + @inlineCallbacks + def test_used(self): + TEXT = "some text" + + km = self._key_manager() + yield km.put_raw_key(UNEXPIRED_KEY, OpenPGPKey, ADDRESS) + yield km.encrypt(TEXT, ADDRESS, OpenPGPKey) + + km2 = self._key_manager() + yield km2.put_raw_key(UNEXPIRED_PRIVATE, OpenPGPKey, ADDRESS) + signature = yield km2.sign(TEXT, ADDRESS, OpenPGPKey) + + yield km.verify(TEXT, ADDRESS, OpenPGPKey, detached_sig=signature) + d = km.put_raw_key( + UNRELATED_KEY, OpenPGPKey, ADDRESS, + validation=ValidationLevel.Provider_Endorsement) + yield self.assertFailure(d, KeyNotValidUpgrade) + + @inlineCallbacks + def test_signed_key(self): + km = self._key_manager() + yield km.put_raw_key(PUBLIC_KEY, OpenPGPKey, ADDRESS) + yield km.put_raw_key(SIGNED_KEY, OpenPGPKey, ADDRESS) + key = yield km.get_key(ADDRESS, OpenPGPKey, fetch_remote=False) + self.assertEqual(key.fingerprint, SIGNED_FINGERPRINT) + # Key material for testing @@ -155,7 +198,7 @@ Osuse7+NkyUHgMXMVW7cz+nU7iO+ht2rkBtv+Z5LGlzgHTeFjKci -----END PGP PUBLIC KEY BLOCK----- """ # updated expiration date -EXPIRED_KEY_NEW_EXPIRY_DATE = datetime.fromtimestamp(2045319180) +EXPIRED_KEY_NEW_EXPIRY_DATE = datetime.fromtimestamp(2049717872) EXPIRED_KEY_UPDATED = """ -----BEGIN PGP PUBLIC KEY BLOCK----- Version: GnuPG v1.4.12 (GNU/Linux) @@ -167,25 +210,132 @@ acDh9vNtPxDCg5RdI0bfdIEBGgHTfsda3kWGvo1wH5SgrTRq0+EcTI7aJgkMmM/A IhnpACE52NvGdG9eB3x7xyQFsQqK8F0XvEev2UJH4SR7vb+Z7FNTJKCy6likYbSV wGGFuowFSESnzXuUI6PcjyuO6FUbMgeM5euFABEBAAG0HExlYXAgVGVzdCBLZXkg PGxlYXBAbGVhcC5zZT6JAT4EEwECACgCGwMGCwkIBwMCBhUIAgkKCwQWAgMBAh4B -AheABQJUURIXBQld/ZovAAoJEG8V8AShiFp8xUcIALcAHZbaxvyhHRGOrwDddbH0 -fFDK0AqKTsIT7y4D/HLFCP5zG3Ck7qGPZdkHXZfzq8rIb+zUjW3oJIVI1IucHxG2 -T5kppa8RFCBAFlRWYf6R3isX3YL0d3QSragjoxRNPcHNU8ALHcvfSonFHBoi4fH4 -4rvgksAiT68SsdPaoXDlabx5T15evu/7T5e/DGMQVPMxiaSuSQhbOKuMk2wcFdmL -tBYHLZPa54hHPNhEDyxLgtKKph0gObk9ojKfH9kPvLveIcpS5CqTJfN/kqBz7CJW -wEeAi2iG3H1OEB25aCUdTxXSRNlGqEgcWPaWxtc1RzlARu7LB64OUZuRy4puiAG5 +AheABQJUlDCSBQleQLiTAAoJEG8V8AShiFp8t3QH/1eqkVIScXmqaCVeno3VSKiH +HqnxiHcEgtpNRfUlP6tLD4H6QPEpvoUI9S/8HSYi3nbDGXEX8ycKlnwxjdIqWSOW +xj91/7uQAo+dP9QaVJ6xgaAiqzN1x3JzX3Js1wTodmNV0TfmGjxwnC5up/xK7/pd +KuDP3woDsRlwy8Lgj67mkn49xfAFHo6hI6SD36UBDAC/ELq6kZaba4Kk0fEVHCEz +HX0B09ZIY9fmf305cEB3dNh6SMQgKtH0wKozaqI2UM2B+cs3z08bC+YuUUh7UJTH +yr+hI7vF4/WEeJB3fuhP3xsumLhV8P47DaJ7oivmtsDEbAJFKqvigEqNES73Xpy5 AQ0EG+t93QEIAKqRq/2sBDW4g3FU+11LhixT+GosrfVvnitz3S9k2tBXok/wYpI1 XeA+kTHiF0LaqoaciDRvkA9DvhDbSrNM1yeuYRyZiHlTmoPZ/Fkl60oA2cyLd1L5 sXbuipY3TEiakugdSU4rzgi0hFycm6Go6yq2G6eC6UALvD9CTMdZHw40TadG9xpm 4thYPuJ1kPH8/bkbTi9sLHoApYgL+7ssje8w4epr0qD4IGxeKwJPf/tbTRpnd8w3 leldixHHKAutNt49p0pkXlORAHRpUmp+KMZhFvCvIPwe9o5mYtMR7sDRxjY61ZEQ -KLyKoh5wsJsaPXBjdG7cf6G/cBcwvnQVUHcAEQEAAYkBJQQYAQIADwUCG+t93QIb -DAUJAAFRgAAKCRBvFfAEoYhafOPgB/9z4YCyT/N0262HtegHykhsyykuqEeNb1LV -D9INcP+RbCX/0IjFgP4DTMPP7qqF1OBwR276maALT321Gqxc5HN5YrwxGdmoyBLm -unaQJJlD+7B1C+jnO6r4m44obvJ/NMERxVyzkXap3J2VgRIO1wNLI9I0sH6Kj5/j -Mgy06OwXDcqIc+jB4sIJ3Tnm8LZ3phJzNEm9mI8Ak0oJ7IEcMndR6DzmRt1rJQcq -K/D7hOG02zvyRhxF27U1qR1MxeU/gNnOx8q4dnVyWB+EiV1sFl4iTOyYHEsoyd7W -Osuse7+NkyUHgMXMVW7cz+nU7iO+ht2rkBtv+Z5LGlzgHTeFjKci -=79Ll +KLyKoh5wsJsaPXBjdG7cf6G/cBcwvnQVUHcAEQEAAYkBJQQYAQIADwIbDAUCVJQw +3QUJXkC4/QAKCRBvFfAEoYhafEtiB/9hMfSFNMxtlIJDJArG4JwR7sBOatYUT858 +qZnTgGETZN8wXpeEpXWKdDdmCX9aeE9jsDNgSQ5WWpqU21bGMXh1IGjAzmqTqq3/ +ik1vALuaVfr6OqjTzrJVQujT61CGed26xpP3Zh8hLKyKa+dXnX/VpgZS42wZLPx2 +wcODfANmTfE2AhMap/RyDy21q4nau+z2hMEOKdtF8dpP+pEvzoN5ZexYP1hfT+Av +oFPyVB5YtEMfxTEshDKRPjbdgNmw4faKXd5Cbelo4YxxpO16FHb6gzIdjOX15vQ+ +KwcVXzg9xk4D3cr1mnTCops/iv6TXvcw4Wbo70rrKXwkjl8LKjOP +=sHoe +-----END PGP PUBLIC KEY BLOCK----- +""" +UNEXPIRED_KEY = EXPIRED_KEY_UPDATED +UNEXPIRED_PRIVATE = """ +-----BEGIN PGP PRIVATE KEY BLOCK----- +Version: GnuPG v1.4.12 (GNU/Linux) + +lQOYBBvrfd0BCADGNpspaNhsbhSjKioCWrE2MTTYC+Sdpes22RabdhQyOCWvlSbj +b8p0y3kmnMOtVBT+c22/w7eu2YBfIpS4RswgE5ypr/1kZLFQueVe/cp29GjPvLwJ +82A3EOHcmXs8rSJ76h2bnkySvbJawz9rwCcaXhpdAwC+sjWvbqiwZYEL+90I4Xp3 +acDh9vNtPxDCg5RdI0bfdIEBGgHTfsda3kWGvo1wH5SgrTRq0+EcTI7aJgkMmM/A +IhnpACE52NvGdG9eB3x7xyQFsQqK8F0XvEev2UJH4SR7vb+Z7FNTJKCy6likYbSV +wGGFuowFSESnzXuUI6PcjyuO6FUbMgeM5euFABEBAAEAB/0cwelrGEdmG+Z/RxZx +4anvpzNNMRSJ0Xu508SVk4vElCQrlaPfFZC1t0ZW1XcHsQ5Gsy/gxaA4YbK1RXV2 +8uvvWh5oTsdLByzj/cSLLp5u+cYxyuaBOb/jiAiCPVEFnEec23pQ4fumwpebgX5f +FLGCVYAqWc2EMqOFVgnAEJ9TbIWRnCkN04r1WSc7eLcUlH+vTp4HUPd6PQj56zSr +J5beeviHgYB76M6mcM/BRzLmcl4M7bgx5olp8A0Wz7ub+hXICmNQyqpE8qZeyGjq +v4T/6BSpsp5yEGDMkahFyO7OwB7UI6SZGkdnWKGeXOWG48so6cFdZ8dxRGx49gFL +1rP1BADfYjQDfmBpB6tC1MyATb1MUK/1CN7wC5w7fXCtPbYNiqc9s27W9NXQReHD +GOU04weU+ZJsV6Fwlt3oRD2j05vNdhbqKseLdsm27/kg2GWZvjamriHqZ94sw6yk +fg3MqPb4JdFzBZVHqD50AHASx2rMshBeMVo27LhcADCWM9P8bwQA4yeRonbIAUls +yAwWIRCMel2JY1u/zmJrg8FFAG2LYx+pYaxkRxjSJNlQQV7o6aYiU3Yw+nXvj5Pz +IdOdimWfFb8eZ3U6tbognJxjwU8vV3ili40O7SENgloeM/nzg+nQjIaS9utfE8Et +juV7f9OWi8Fo+xzSOvUGwoL/zW5t+UsD/0bm+5ch53Sm1ITCn7yfMrp0YaT+YC3Y +mNNfrfbFpEd20ky4K9COIFDFCJmMyKLx/jSajcf4JqrxB/mOmHHAF9CeL7LUy/XV +O8Ec5lkovicDIDT1b+pQYEYvh5UBJmoq1R5nbNLo70gFtGP6b4+t27Gxks5VLhF/ +BVvxK7xjmkBETnq0HExlYXAgVGVzdCBLZXkgPGxlYXBAbGVhcC5zZT6JAT4EEwEC +ACgCGwMGCwkIBwMCBhUIAgkKCwQWAgMBAh4BAheABQJUURIXBQld/ZovAAoJEG8V +8AShiFp8xUcIALcAHZbaxvyhHRGOrwDddbH0fFDK0AqKTsIT7y4D/HLFCP5zG3Ck +7qGPZdkHXZfzq8rIb+zUjW3oJIVI1IucHxG2T5kppa8RFCBAFlRWYf6R3isX3YL0 +d3QSragjoxRNPcHNU8ALHcvfSonFHBoi4fH44rvgksAiT68SsdPaoXDlabx5T15e +vu/7T5e/DGMQVPMxiaSuSQhbOKuMk2wcFdmLtBYHLZPa54hHPNhEDyxLgtKKph0g +Obk9ojKfH9kPvLveIcpS5CqTJfN/kqBz7CJWwEeAi2iG3H1OEB25aCUdTxXSRNlG +qEgcWPaWxtc1RzlARu7LB64OUZuRy4puiAGdA5gEG+t93QEIAKqRq/2sBDW4g3FU ++11LhixT+GosrfVvnitz3S9k2tBXok/wYpI1XeA+kTHiF0LaqoaciDRvkA9DvhDb +SrNM1yeuYRyZiHlTmoPZ/Fkl60oA2cyLd1L5sXbuipY3TEiakugdSU4rzgi0hFyc +m6Go6yq2G6eC6UALvD9CTMdZHw40TadG9xpm4thYPuJ1kPH8/bkbTi9sLHoApYgL ++7ssje8w4epr0qD4IGxeKwJPf/tbTRpnd8w3leldixHHKAutNt49p0pkXlORAHRp +Ump+KMZhFvCvIPwe9o5mYtMR7sDRxjY61ZEQKLyKoh5wsJsaPXBjdG7cf6G/cBcw +vnQVUHcAEQEAAQAH/A0TCHNz3Yi+oXis8m2WzeyU7Sw6S4VOLnoXMgOhf/JLXVoy +S2P4qj73nMqNkYni2AJkej5GtOyunSGOpZ2zzKQyhigajq76HRRxP5oXwX7VLNy0 +bguSrys2IrJb/8Fq88rN/+H5kpvxNlog+P79wzTta5Y9/yIVJDNXIip/ptVARhA7 +CrdDyE4EaPjcWCS3/9a4R8JDZl19PlTE23DD5ffZv5wNEX38oZkDCK4Si+kqhvm7 +g0Upr49hnvqRPXoi46OBAoUh9yVTKaNDMsRWblvno7k3+MF0CCnix5p5JR74bGnZ +8kS14qXXkAa58uMaAIcv86+mHNovXhaxcog+8k0EAM8wWyWPjdO2xbwwB0hS2E9i +IO/X26uhLY3/tozbOekvqXKvwIy/xdWNVHr7eukAS+oIY10iczgKkMgquoqvzR4q +UY5WI0iC0iMLUGV7xdxusPl+aCbGKomtN/H3JR2Wecgje7K/3o5BtUDM6Fr2KPFb ++uf/gqVkoMmp3O/DjhDlBADSwMHuhfStF+eDXwSQ4WJ3uXP8n4M4t9J2zXO366BB +CAJg8enzwQi62YB+AOhP9NiY5ZrEySk0xGsnVgex2e7V5ilm1wd1z2el3g9ecfVj +yu9mwqHKT811xsLjqQC84JN+qHM/7t7TSgczY2vD8ho2O8bBZzuoiX+QIPYUXkDy +KwP8DTeHjnI6vAP2uVRnaY+bO53llyO5DDp4pnpr45yL47geciElq3m3jXFjHwos +mmkOlYAL07JXeZK+LwbhxmbrwLxXNJB//P7l8ByRsmIrWvPuPzzcKig1KnFqvFO1 +5wGU0Pso2qWZU+idrhCdG+D8LRSQ0uibOFCcjFdM0JOJ7e1RdIkBJQQYAQIADwUC +G+t93QIbDAUJAAFRgAAKCRBvFfAEoYhafOPgB/9z4YCyT/N0262HtegHykhsyyku +qEeNb1LVD9INcP+RbCX/0IjFgP4DTMPP7qqF1OBwR276maALT321Gqxc5HN5Yrwx +GdmoyBLmunaQJJlD+7B1C+jnO6r4m44obvJ/NMERxVyzkXap3J2VgRIO1wNLI9I0 +sH6Kj5/jMgy06OwXDcqIc+jB4sIJ3Tnm8LZ3phJzNEm9mI8Ak0oJ7IEcMndR6Dzm +Rt1rJQcqK/D7hOG02zvyRhxF27U1qR1MxeU/gNnOx8q4dnVyWB+EiV1sFl4iTOyY +HEsoyd7WOsuse7+NkyUHgMXMVW7cz+nU7iO+ht2rkBtv+Z5LGlzgHTeFjKci +=dZE8 +-----END PGP PRIVATE KEY BLOCK----- +""" + +# key CA1AD31E: public key "Leap Test Key <leap@leap.se>" +# signed by E36E738D69173C13D709E44F2F455E2824D18DDF +SIGNED_FINGERPRINT = "6704CF3362087DA23E3D2DF8ED81DFD1CA1AD31E" +SIGNED_KEY = """ +-----BEGIN PGP PUBLIC KEY BLOCK----- +Version: GnuPG v1.4.12 (GNU/Linux) + +mQENBFQ9DHMBCADJXyNVzTQ+NnmSDbR6q8jjDsnqk/IgKrMBkpjNxUa/0HQ4o0Yh +pklzR1hIc/jsdgq42A0++pqdfQFeRc2NVw/NnE/9uzW73YuaWg5XnWGjuAP3UeRI +3xjL/cscEFmGfGkuGvFpIVa7GBPqz1SMBXWULJbkCE1pnHfgqh0R7oc5u0omnsln +0zIrmLX1ufpDRSUedjSgIfd6VqbkPm3NJuZE4NVn6spHG3zTxqcaPCG0xLfHw7eS +qgUdz0BFaxqtQiXffBpA3KvGJW0792VjDh4M6kDvpeYpKRmB9oEYlT3n3KvQrdPE +B3N5KrzJj1QIL990q4NQdqjg+jUE5zCJsTdzABEBAAG0HExlYXAgVGVzdCBLZXkg +PGxlYXBAbGVhcC5zZT6JATgEEwECACIFAlQ9DHMCGwMGCwkIBwMCBhUIAgkKCwQW +AgMBAh4BAheAAAoJEO2B39HKGtMeI/4H/0/OG1OqtQEoYscvJ+BZ3ZrM2pEk7KDd +7AEEf6QIGSd38GFyn/pve24cpRLv7phKNy9dX9VJhTDobpKvK0ZT/yQO3FVlySAN +NVpu93/jrLnrW51J3p/GP952NtUAEP5l1uyYIKZ1W3RLWws72Lh34HTaHAWC94oF +vnS42IYdTn4y6lfizL+wYD6CnfrIpHm8v3NABEQZ8e/jllrRK0pnOxAdFv/TpWEl +8AnTZXcejSBgCG6UmDtrRKfgoQlGJEIH61QSqHpRIwkepQVYexUwgcLFAZPI9Hvw +N5pZQ5Z+XcsYTGtLNEpF7YW6ykLDRTAv6LiBQPkBX8TDKhkh95Cs3sKJAhwEEAEC +AAYFAlQ9DgIACgkQL0VeKCTRjd/pABAAsNPbiGwuZ469NxwTgf3+EMoWZNHf5ZRa +ZbzKKesLFEElMdX3Q/MkVc1T8Rsy9Fdn1tf/H6glwuKyqWeXNkxa86VT6gck9WV6 +bslFIl/vJpb3dcmiCCM1tSCYpX0yE0fqebMihcfvNqDw3GdZBUo7R0pWN6BEh4iM +YYWTAtuPCrbsv+2bSid1ZLIO6FIF5kskg60h/IbSr+A+DSBgyyjf9fbUv6MoyMw8 +08GtCAx6VGJhTTC/RkWIA+N3n83W5XQFszOOg/PAAg0JMUXUBGvjfYJ5fcB8cfuw +1XZe9uWsDmYpwfVEtDajrLbatkXAu22pjIJnB4cVqiD+4hHbBCFkeZIfdRsPEINO +UacsjVZV5/EPDN9OpkvZbkrLJ6eaQnmQZnFclquNHUCqFI0QYUml0BXXaZq+aEJ9 +N9x00kdYV1xW6zkL+MGgxdViC5n6dwJcU3MANrykV8Cc5/x+wmwY8AXbHzU7MxvY +nGlAYsAZHhf4ZlEdAO6C329VotMxBLFd5DJZZoN+ysaOpsUNRl0JO41+6bbI141l +DCmzWUG4iTI70zxsgzZGgEt0HlMDoIxElPcy/jDKi1IfEDmveK+QR9WphM40Ayvx +VTeA6g9WagmoHopQs/D/Kbi3Q8izFDfXTwA52DUxTjyUEFn0jEOiG9BFmnIkQ6LE +3WkIJFd3D0+5AQ0EVD0McwEIALRXukBsOrcA/rNJ4SV4I64cGdN8q9Gl5RpLl8cS +L5+SGHp6KoCL4daBqpbxdhE/Ylb3QmPt2SBZbTkwJ2yuczELOyhH6R13OWRWCgkd +dYLZsm/sEzu7dVoFQ4peKTGDzI8mQ/s157wRkz/8iSUYjJjeM3g0NI55FVcefibN +pOOFRaYGUh8itofRFOu7ipZ9F8zRJdBwqISe1gemNBR+O3G3Srm34PYu6sZRsdLU +Nf+81+/ynQWQseVpbz8X93sx/onIYIY0w+kxIE0oR/gBBjdsMOp7EfcvtbGTgplQ ++Zxln/pV2bVFkGkjKniFEEfi4eCPknCj0+67qKRt/Fc9n98AEQEAAYkBHwQYAQIA +CQUCVD0McwIbDAAKCRDtgd/RyhrTHmmcCACpTbjOYjvvr8/rD60bDyhnfcKvpavO +1/gZtMeEmw5gysp10mOXwhc2XuC3R1A6wVbVgGuOp5RxlxI4w8xwwxMFSp6u2rS5 +jm5slXBKB2i3KE6Sao9uZKP2K4nS8Qc+DhathfgafI39vPtBmsb1SJd5W1njNnYY +hARRpViUcVdfvW3VRpDACZ79PBs4ZQQ022NsNAPwm/AJvAw2S42Y9tjTnaLVRLfH +lzdErcGTBnfEIi0mQF/11k/THBJxx7vaFt8YXiDlWLUkg5XW3xK9mkETbaTv+/tB +X2+l7IOSt+31KQCBFN/VmhTySJOVQC1d2A56lSH2c/DWVClji+x3suzn +=xprj -----END PGP PUBLIC KEY BLOCK----- """ diff --git a/src/leap/keymanager/validation.py b/src/leap/keymanager/validation.py index 245013e5..c81d533a 100644 --- a/src/leap/keymanager/validation.py +++ b/src/leap/keymanager/validation.py @@ -24,17 +24,17 @@ See: from datetime import datetime -from enum import Enum +from enum import IntEnum -ValidationLevel = Enum( - "Weak_Chain", - "Provider_Trust", - "Provider_Endorsement", - "Third_Party_Endorsement", - "Third_Party_Consensus", - "Historically_Auditing", - "Known_Key", +ValidationLevel = IntEnum("ValidationLevel", + "Weak_Chain " + "Provider_Trust " + "Provider_Endorsement " + "Third_Party_Endorsement " + "Third_Party_Consensus " + "Historically_Auditing " + "Known_Key " "Fingerprint") @@ -49,7 +49,7 @@ def toValidationLevel(value): :raises ValueError: if C{value} is not a validation level """ for level in ValidationLevel: - if value == str(level): + if value == level.name: return level raise ValueError("Not valid validation level: %s" % (value,)) @@ -60,9 +60,6 @@ def can_upgrade(new_key, old_key): :type old_key: EncryptionKey :rtype: bool """ - # XXX not succesfully used and strict high validation level (#6211) - # XXX implement key signature checking (#6120) - # First contact if old_key is None: return True @@ -82,7 +79,17 @@ def can_upgrade(new_key, old_key): return True # No expiration date and higher validation level - elif new_key.validation >= old_key.validation: + if (old_key.expiry_date is None and + new_key.validation > old_key.validation): + return True + + # Not successfully used and strict high validation level + if (not (old_key.sign_used and old_key.encr_used) and + new_key.validation > old_key.validation): + return True + + # New key signed by the old key + if old_key.key_id in new_key.signatures: return True return False |