From f07d407523e6b76076824fa53e4c3568bc88764f Mon Sep 17 00:00:00 2001 From: Ruben Pollan Date: Mon, 10 Nov 2014 18:36:59 -0600 Subject: Implement multi uid support --- src/leap/keymanager/__init__.py | 12 ++- src/leap/keymanager/keys.py | 15 ++-- src/leap/keymanager/openpgp.py | 120 ++++++++++++--------------- src/leap/keymanager/tests/test_keymanager.py | 48 +++++------ src/leap/keymanager/tests/test_validation.py | 21 ++--- src/leap/keymanager/validation.py | 4 - 6 files changed, 100 insertions(+), 120 deletions(-) (limited to 'src/leap/keymanager') diff --git a/src/leap/keymanager/__init__.py b/src/leap/keymanager/__init__.py index 562bfbfa..c64cdeab 100644 --- a/src/leap/keymanager/__init__.py +++ b/src/leap/keymanager/__init__.py @@ -319,7 +319,6 @@ class KeyManager(object): return map( lambda doc: build_key_from_dict( self._key_class_from_type(doc.content['type']), - doc.content['address'][0], doc.content), self._soledad.get_from_index( TAGS_PRIVATE_INDEX, @@ -519,26 +518,25 @@ class KeyManager(object): except IndexError as e: leap_assert(False, "Unsupported key type. Error {0!r}".format(e)) - def put_key(self, key, address=None): + def put_key(self, key, address): """ Put C{key} in local storage. :param key: The key to be stored :type key: EncryptionKey - :param address: address for which this key will be active. If not set - all the uids will be activated + :param address: address for which this key will be active :type address: str :raises KeyAddressMismatch: if address doesn't match any uid on the key :raises KeyNotValidUpdate: if a key with the same uid exists and the new one is not a valid update for it """ - if address is not None and address not in key.address: + if address not in key.address: raise KeyAddressMismatch("UID %s found, but expected %s" % (str(key.address), address)) try: - old_key = self._wrapper_map[type(key)].get_key(key.address[0], + old_key = self._wrapper_map[type(key)].get_key(address, private=key.private) except KeyNotFound: old_key = None @@ -553,7 +551,7 @@ class KeyManager(object): raise KeyNotValidUpgrade("Key %s can not be upgraded by new key %s" % (old_key.key_id, key.key_id)) - def put_raw_key(self, key, ktype, address=None, + def put_raw_key(self, key, ktype, address, validation=ValidationLevel.Weak_Chain): """ Put C{key} in local storage. diff --git a/src/leap/keymanager/keys.py b/src/leap/keymanager/keys.py index d1b5d933..2108aa26 100644 --- a/src/leap/keymanager/keys.py +++ b/src/leap/keymanager/keys.py @@ -109,20 +109,15 @@ def is_address(address): return bool(re.match('[\w.-]+@[\w.-]+', address)) -def build_key_from_dict(kClass, address, kdict): +def build_key_from_dict(kClass, kdict): """ - Build an C{kClass} key bound to C{address} based on info in C{kdict}. + Build an C{kClass} key based on info in C{kdict}. - :param address: The address bound to the key. - :type address: str :param kdict: Dictionary with key data. :type kdict: dict :return: An instance of the key. :rtype: C{kClass} """ - leap_assert( - address in kdict[KEY_ADDRESS_KEY], - 'Wrong address in key data.') try: validation = toValidationLevel(kdict[KEY_VALIDATION_KEY]) except ValueError: @@ -135,7 +130,7 @@ def build_key_from_dict(kClass, address, kdict): refreshed_at = _to_datetime(kdict[KEY_REFRESHED_AT_KEY]) return kClass( - [address], + kdict[KEY_ADDRESS_KEY], key_id=kdict[KEY_ID_KEY], fingerprint=kdict[KEY_FINGERPRINT_KEY], key_data=kdict[KEY_DATA_KEY], @@ -315,12 +310,14 @@ class EncryptionScheme(object): pass @abstractmethod - def put_key(self, key): + def put_key(self, key, address): """ Put a key in local storage. :param key: The key to be stored. :type key: EncryptionKey + :param address: address for which this key will be active. + :type address: str """ pass diff --git a/src/leap/keymanager/openpgp.py b/src/leap/keymanager/openpgp.py index 52655d0e..4f965745 100644 --- a/src/leap/keymanager/openpgp.py +++ b/src/leap/keymanager/openpgp.py @@ -38,6 +38,7 @@ from leap.keymanager.keys import ( build_key_from_dict, TYPE_ID_PRIVATE_INDEX, TYPE_ADDRESS_PRIVATE_INDEX, + KEY_ADDRESS_KEY, KEY_FINGERPRINT_KEY, KEY_DATA_KEY, KEY_ID_KEY, @@ -110,9 +111,9 @@ class TempGPGWrapper(object): # itself is enough to also have the public key in the keyring, # and we want to count the keys afterwards. - privaddrs = map(lambda privkey: privkey.address[0], privkeys) + privids = map(lambda privkey: privkey.key_id, privkeys) publkeys = filter( - lambda pubkey: pubkey.address[0] not in privaddrs, publkeys) + lambda pubkey: pubkey.key_id not in privids, publkeys) listkeys = lambda: self._gpg.list_keys() listsecretkeys = lambda: self._gpg.list_keys(secret=True) @@ -163,16 +164,13 @@ class TempGPGWrapper(object): shutil.rmtree(self._gpg.homedir) -def _build_key_from_gpg(address, key, key_data): +def _build_key_from_gpg(key, key_data): """ - Build an OpenPGPKey for C{address} based on C{key} from - local gpg storage. + Build an OpenPGPKey based on C{key} from local gpg storage. ASCII armored GPG key data has to be queried independently in this wrapper, so we receive it in C{key_data}. - :param address: The address bound to the key. - :type address: str :param key: Key obtained from GPG storage. :type key: dict :param key_data: Key data obtained from GPG storage. @@ -183,9 +181,12 @@ def _build_key_from_gpg(address, key, key_data): expiry_date = None if key['expires']: expiry_date = datetime.fromtimestamp(int(key['expires'])) + address = [] + for uid in key['uids']: + address.append(_parse_address(uid)) return OpenPGPKey( - [address], + address, key_id=key['keyid'], fingerprint=key['fingerprint'], key_data=key_data, @@ -198,14 +199,18 @@ def _build_key_from_gpg(address, key, key_data): def _parse_address(address): """ - Remove the identity suffix after the '+' until the '@' + Remove name, '<', '>' and the identity suffix after the '+' until the '@' e.g.: test_user+something@provider.com becomes test_user@provider.com since the key belongs to the identity without the '+' suffix. :type address: str :rtype: str """ - return re.sub(r'\+.*\@', '@', address) + mail_regex = '(.*<)?([\w.-]+)(\+.*)?(@[\w.-]+)(>.*)?' + match = re.match(mail_regex, address) + if match is None: + return None + return ''.join(match.group(2, 4)) # @@ -289,15 +294,17 @@ class OpenPGPScheme(EncryptionScheme): leap_assert( len(key['uids']) is 1, # with just one uid! 'Wrong number of uids for key: %d.' % len(key['uids'])) - leap_assert( - re.match('.*<%s>$' % address, key['uids'][0]) is not None, - 'Key not correctly bound to address.') + uid_match = False + for uid in key['uids']: + if re.match('.*<%s>$' % address, uid) is not None: + uid_match = True + return + leap_assert(uid_match, 'Key not correctly bound to address.') # insert both public and private keys in storage for secret in [True, False]: key = gpg.list_keys(secret=secret).pop() openpgp_key = _build_key_from_gpg( - address, key, - gpg.export_keys(key['fingerprint'], secret=secret)) + key, gpg.export_keys(key['fingerprint'], secret=secret)) self.put_key(openpgp_key, address) return self.get_key(address, private=True) @@ -320,7 +327,10 @@ class OpenPGPScheme(EncryptionScheme): doc = self._get_key_doc(address, private) if doc is None: raise errors.KeyNotFound(address) - return build_key_from_dict(OpenPGPKey, address, doc.content) + leap_assert( + address in doc.content[KEY_ADDRESS_KEY], + 'Wrong address in key data.') + return build_key_from_dict(OpenPGPKey, doc.content) def parse_ascii_key(self, key_data): """ @@ -337,7 +347,6 @@ class OpenPGPScheme(EncryptionScheme): leap_assert_type(key_data, (str, unicode)) # TODO: add more checks for correct key data. leap_assert(key_data is not None, 'Data does not represent a key.') - mail_regex = '.*<([\w.-]+@[\w.-]+)>.*' with self._temporary_gpgwrapper() as gpg: # TODO: inspect result, or use decorator @@ -354,44 +363,30 @@ class OpenPGPScheme(EncryptionScheme): except IndexError: return (None, None) - # extract adress from first uid on key - match = re.match(mail_regex, pubkey['uids'].pop()) - leap_assert(match is not None, 'No user address in key data.') - address = match.group(1) - openpgp_privkey = None if privkey is not None: - match = re.match(mail_regex, privkey['uids'].pop()) - leap_assert(match is not None, 'No user address in key data.') - privaddress = match.group(1) - # build private key openpgp_privkey = _build_key_from_gpg( - privaddress, privkey, + privkey, gpg.export_keys(privkey['fingerprint'], secret=True)) - - leap_check(address == privaddress, - 'Addresses in public and private key differ.', - errors.KeyAddressMismatch) leap_check(pubkey['fingerprint'] == privkey['fingerprint'], 'Fingerprints for public and private key differ.', errors.KeyFingerprintMismatch) # build public key openpgp_pubkey = _build_key_from_gpg( - address, pubkey, + pubkey, gpg.export_keys(pubkey['fingerprint'], secret=False)) return (openpgp_pubkey, openpgp_privkey) - def put_ascii_key(self, key_data, address=None): + def put_ascii_key(self, key_data, address): """ Put key contained in ascii-armored C{key_data} in local storage. :param key_data: The key data to be stored. :type key_data: str or unicode - :param address: address for which this key will be active. If not set - all the uids will be activated + :param address: address for which this key will be active :type address: str """ leap_assert_type(key_data, (str, unicode)) @@ -407,23 +402,17 @@ class OpenPGPScheme(EncryptionScheme): if openpgp_privkey is not None: self.put_key(openpgp_privkey, address) - def put_key(self, key, address=None): + def put_key(self, key, address): """ Put C{key} in local storage. :param key: The key to be stored. :type key: OpenPGPKey - :param address: address for which this key will be active. If not set - all the uids will be activated + :param address: address for which this key will be active. :type address: str """ - if address is not None: - active_address = [_parse_address(address)] - else: - active_address = key.address - self._put_key_doc(key) - self._put_active_doc(key, active_address) + self._put_active_doc(key, address) def _put_key_doc(self, key): """ @@ -445,7 +434,7 @@ class OpenPGPScheme(EncryptionScheme): gpg.import_keys(key.key_data) gpgkey = gpg.list_keys(secret=key.private).pop() key = _build_key_from_gpg( - key.address[0], gpgkey, + gpgkey, gpg.export_keys(gpgkey['fingerprint'], secret=key.private)) doc.set_json(key.get_json()) @@ -458,31 +447,30 @@ class OpenPGPScheme(EncryptionScheme): else: self._soledad.create_doc_from_json(key.get_json()) - def _put_active_doc(self, key, addresses): + def _put_active_doc(self, key, address): """ Put active key document in soledad :type key: OpenPGPKey - :type addresses: list(str) - """ - for address in addresses: - docs = self._soledad.get_from_index( - TYPE_ADDRESS_PRIVATE_INDEX, - self.ACTIVE_TYPE, - address, - '1' if key.private else '0') - if len(docs) == 1: - doc = docs.pop() - doc.set_json(key.get_active_json(address)) - self._soledad.put_doc(doc) - else: - if len(docs) > 1: - logger.error("There is more than one active key document " - "for the address %s" % (address,)) - for doc in docs: - self._soledad.delete_doc(doc) - self._soledad.create_doc_from_json( - key.get_active_json(address)) + :type addresses: str + """ + docs = self._soledad.get_from_index( + TYPE_ADDRESS_PRIVATE_INDEX, + self.ACTIVE_TYPE, + address, + '1' if key.private else '0') + if len(docs) == 1: + doc = docs.pop() + doc.set_json(key.get_active_json(address)) + self._soledad.put_doc(doc) + else: + if len(docs) > 1: + logger.error("There is more than one active key document " + "for the address %s" % (address,)) + for doc in docs: + self._soledad.delete_doc(doc) + self._soledad.create_doc_from_json( + key.get_active_json(address)) def _get_key_doc(self, address, private=False): """ diff --git a/src/leap/keymanager/tests/test_keymanager.py b/src/leap/keymanager/tests/test_keymanager.py index 4daf3465..6aeb67a8 100644 --- a/src/leap/keymanager/tests/test_keymanager.py +++ b/src/leap/keymanager/tests/test_keymanager.py @@ -89,7 +89,7 @@ class KeyManagerUtilTestCase(BaseLeapTest): 'encr_used': False, 'sign_used': True, } - key = build_key_from_dict(OpenPGPKey, ADDRESS, kdict) + key = build_key_from_dict(OpenPGPKey, kdict) self.assertEqual( kdict['address'], key.address, 'Wrong data in key.') @@ -144,7 +144,7 @@ class OpenPGPCryptoTestCase(KeyManagerWithSoledadTestCase): pgp = openpgp.OpenPGPScheme( self._soledad, gpgbinary=GPG_BINARY_PATH) self.assertRaises(KeyNotFound, pgp.get_key, ADDRESS) - pgp.put_ascii_key(PUBLIC_KEY) + pgp.put_ascii_key(PUBLIC_KEY, ADDRESS) key = pgp.get_key(ADDRESS, private=False) pgp.delete_key(key) self.assertRaises(KeyNotFound, pgp.get_key, ADDRESS) @@ -153,7 +153,7 @@ class OpenPGPCryptoTestCase(KeyManagerWithSoledadTestCase): pgp = openpgp.OpenPGPScheme( self._soledad, gpgbinary=GPG_BINARY_PATH) self.assertRaises(KeyNotFound, pgp.get_key, ADDRESS) - pgp.put_ascii_key(PUBLIC_KEY) + pgp.put_ascii_key(PUBLIC_KEY, ADDRESS) key = pgp.get_key(ADDRESS, private=False) self.assertIsInstance(key, openpgp.OpenPGPKey) self.assertTrue( @@ -167,7 +167,7 @@ class OpenPGPCryptoTestCase(KeyManagerWithSoledadTestCase): pgp = openpgp.OpenPGPScheme( self._soledad, gpgbinary=GPG_BINARY_PATH) self.assertRaises(KeyNotFound, pgp.get_key, ADDRESS) - pgp.put_ascii_key(PUBLIC_KEY) + pgp.put_ascii_key(PUBLIC_KEY, ADDRESS) self.assertRaises( KeyNotFound, pgp.get_key, ADDRESS, private=True) key = pgp.get_key(ADDRESS, private=False) @@ -181,7 +181,7 @@ class OpenPGPCryptoTestCase(KeyManagerWithSoledadTestCase): # encrypt pgp = openpgp.OpenPGPScheme( self._soledad, gpgbinary=GPG_BINARY_PATH) - pgp.put_ascii_key(PUBLIC_KEY) + pgp.put_ascii_key(PUBLIC_KEY, ADDRESS) pubkey = pgp.get_key(ADDRESS, private=False) cyphertext = pgp.encrypt('data', pubkey) # assert @@ -193,7 +193,7 @@ class OpenPGPCryptoTestCase(KeyManagerWithSoledadTestCase): # decrypt self.assertRaises( KeyNotFound, pgp.get_key, ADDRESS, private=True) - pgp.put_ascii_key(PRIVATE_KEY) + pgp.put_ascii_key(PRIVATE_KEY, ADDRESS) privkey = pgp.get_key(ADDRESS, private=True) pgp.delete_key(pubkey) pgp.delete_key(privkey) @@ -216,7 +216,7 @@ class OpenPGPCryptoTestCase(KeyManagerWithSoledadTestCase): def test_sign_with_public_raises(self): pgp = openpgp.OpenPGPScheme( self._soledad, gpgbinary=GPG_BINARY_PATH) - pgp.put_ascii_key(PUBLIC_KEY) + pgp.put_ascii_key(PUBLIC_KEY, ADDRESS) data = 'data' pubkey = pgp.get_key(ADDRESS, private=False) self.assertRaises( @@ -226,11 +226,11 @@ class OpenPGPCryptoTestCase(KeyManagerWithSoledadTestCase): def test_verify_with_wrong_key_raises(self): pgp = openpgp.OpenPGPScheme( self._soledad, gpgbinary=GPG_BINARY_PATH) - pgp.put_ascii_key(PRIVATE_KEY) + pgp.put_ascii_key(PRIVATE_KEY, ADDRESS) data = 'data' privkey = pgp.get_key(ADDRESS, private=True) signed = pgp.sign(data, privkey) - pgp.put_ascii_key(PUBLIC_KEY_2) + pgp.put_ascii_key(PUBLIC_KEY_2, ADDRESS_2) wrongkey = pgp.get_key(ADDRESS_2) self.assertRaises( errors.InvalidSignature, @@ -239,7 +239,7 @@ class OpenPGPCryptoTestCase(KeyManagerWithSoledadTestCase): def test_encrypt_sign_with_public_raises(self): pgp = openpgp.OpenPGPScheme( self._soledad, gpgbinary=GPG_BINARY_PATH) - pgp.put_ascii_key(PRIVATE_KEY) + pgp.put_ascii_key(PRIVATE_KEY, ADDRESS) data = 'data' privkey = pgp.get_key(ADDRESS, private=True) pubkey = pgp.get_key(ADDRESS, private=False) @@ -250,7 +250,7 @@ class OpenPGPCryptoTestCase(KeyManagerWithSoledadTestCase): def test_decrypt_verify_with_private_raises(self): pgp = openpgp.OpenPGPScheme( self._soledad, gpgbinary=GPG_BINARY_PATH) - pgp.put_ascii_key(PRIVATE_KEY) + pgp.put_ascii_key(PRIVATE_KEY, ADDRESS) data = 'data' privkey = pgp.get_key(ADDRESS, private=True) pubkey = pgp.get_key(ADDRESS, private=False) @@ -264,12 +264,12 @@ class OpenPGPCryptoTestCase(KeyManagerWithSoledadTestCase): def test_decrypt_verify_with_wrong_key_raises(self): pgp = openpgp.OpenPGPScheme( self._soledad, gpgbinary=GPG_BINARY_PATH) - pgp.put_ascii_key(PRIVATE_KEY) + pgp.put_ascii_key(PRIVATE_KEY, ADDRESS) data = 'data' privkey = pgp.get_key(ADDRESS, private=True) pubkey = pgp.get_key(ADDRESS, private=False) encrypted_and_signed = pgp.encrypt(data, pubkey, sign=privkey) - pgp.put_ascii_key(PUBLIC_KEY_2) + pgp.put_ascii_key(PUBLIC_KEY_2, ADDRESS_2) wrongkey = pgp.get_key(ADDRESS_2) self.assertRaises( errors.InvalidSignature, @@ -278,7 +278,7 @@ class OpenPGPCryptoTestCase(KeyManagerWithSoledadTestCase): def test_sign_verify(self): pgp = openpgp.OpenPGPScheme( self._soledad, gpgbinary=GPG_BINARY_PATH) - pgp.put_ascii_key(PRIVATE_KEY) + pgp.put_ascii_key(PRIVATE_KEY, ADDRESS) data = 'data' privkey = pgp.get_key(ADDRESS, private=True) signed = pgp.sign(data, privkey, detach=False) @@ -288,10 +288,10 @@ class OpenPGPCryptoTestCase(KeyManagerWithSoledadTestCase): def test_encrypt_sign_decrypt_verify(self): pgp = openpgp.OpenPGPScheme( self._soledad, gpgbinary=GPG_BINARY_PATH) - pgp.put_ascii_key(PRIVATE_KEY) + pgp.put_ascii_key(PRIVATE_KEY, ADDRESS) pubkey = pgp.get_key(ADDRESS, private=False) privkey = pgp.get_key(ADDRESS, private=True) - pgp.put_ascii_key(PRIVATE_KEY_2) + pgp.put_ascii_key(PRIVATE_KEY_2, ADDRESS_2) pubkey2 = pgp.get_key(ADDRESS_2, private=False) privkey2 = pgp.get_key(ADDRESS_2, private=True) data = 'data' @@ -304,7 +304,7 @@ class OpenPGPCryptoTestCase(KeyManagerWithSoledadTestCase): def test_sign_verify_detached_sig(self): pgp = openpgp.OpenPGPScheme( self._soledad, gpgbinary=GPG_BINARY_PATH) - pgp.put_ascii_key(PRIVATE_KEY) + pgp.put_ascii_key(PRIVATE_KEY, ADDRESS) data = 'data' privkey = pgp.get_key(ADDRESS, private=True) signature = pgp.sign(data, privkey, detach=True) @@ -316,7 +316,7 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase): def test_get_all_keys_in_db(self): km = self._key_manager() - km._wrapper_map[OpenPGPKey].put_ascii_key(PRIVATE_KEY) + km._wrapper_map[OpenPGPKey].put_ascii_key(PRIVATE_KEY, ADDRESS) # get public keys keys = km.get_all_keys(False) self.assertEqual(len(keys), 1, 'Wrong number of keys') @@ -330,7 +330,7 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase): def test_get_public_key(self): km = self._key_manager() - km._wrapper_map[OpenPGPKey].put_ascii_key(PRIVATE_KEY) + km._wrapper_map[OpenPGPKey].put_ascii_key(PRIVATE_KEY, ADDRESS) # get the key key = km.get_key(ADDRESS, OpenPGPKey, private=False, fetch_remote=False) @@ -342,7 +342,7 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase): def test_get_private_key(self): km = self._key_manager() - km._wrapper_map[OpenPGPKey].put_ascii_key(PRIVATE_KEY) + km._wrapper_map[OpenPGPKey].put_ascii_key(PRIVATE_KEY, ADDRESS) # get the key key = km.get_key(ADDRESS, OpenPGPKey, private=True, fetch_remote=False) @@ -364,7 +364,7 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase): """ token = "mytoken" km = self._key_manager(token=token) - km._wrapper_map[OpenPGPKey].put_ascii_key(PUBLIC_KEY) + km._wrapper_map[OpenPGPKey].put_ascii_key(PUBLIC_KEY, ADDRESS) km._fetcher.put = Mock() # the following data will be used on the send km.ca_cert_path = 'capath' @@ -447,7 +447,7 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase): """ km = self._key_manager(url='http://nickserver.domain') - km.put_raw_key(PUBLIC_KEY, OpenPGPKey) + km.put_raw_key(PUBLIC_KEY, OpenPGPKey, ADDRESS) key = km.get_key(ADDRESS, OpenPGPKey) self.assertIsInstance(key, OpenPGPKey) self.assertTrue(ADDRESS in key.address) @@ -509,7 +509,7 @@ class KeyManagerCryptoTestCase(KeyManagerWithSoledadTestCase): def test_keymanager_openpgp_encrypt_decrypt(self): km = self._key_manager() # put raw private key - km._wrapper_map[OpenPGPKey].put_ascii_key(PRIVATE_KEY) + km._wrapper_map[OpenPGPKey].put_ascii_key(PRIVATE_KEY, ADDRESS) # get public key pubkey = km.get_key( ADDRESS, OpenPGPKey, private=False, fetch_remote=False) @@ -526,7 +526,7 @@ class KeyManagerCryptoTestCase(KeyManagerWithSoledadTestCase): def test_keymanager_openpgp_sign_verify(self): km = self._key_manager() # put raw private keys - km._wrapper_map[OpenPGPKey].put_ascii_key(PRIVATE_KEY) + km._wrapper_map[OpenPGPKey].put_ascii_key(PRIVATE_KEY, ADDRESS) # get private key for signing privkey = km.get_key( ADDRESS, OpenPGPKey, private=True, fetch_remote=False) diff --git a/src/leap/keymanager/tests/test_validation.py b/src/leap/keymanager/tests/test_validation.py index 3ae873d4..400d36e8 100644 --- a/src/leap/keymanager/tests/test_validation.py +++ b/src/leap/keymanager/tests/test_validation.py @@ -37,47 +37,48 @@ class ValidationLevelTestCase(KeyManagerWithSoledadTestCase): def test_none_old_key(self): km = self._key_manager() - km.put_raw_key(PUBLIC_KEY, OpenPGPKey) + km.put_raw_key(PUBLIC_KEY, OpenPGPKey, ADDRESS) key = km.get_key(ADDRESS, OpenPGPKey, fetch_remote=False) self.assertEqual(key.fingerprint, KEY_FINGERPRINT) def test_cant_upgrade(self): km = self._key_manager() - km.put_raw_key(PUBLIC_KEY, OpenPGPKey, + km.put_raw_key(PUBLIC_KEY, OpenPGPKey, ADDRESS, validation=ValidationLevel.Provider_Trust) self.assertRaises(KeyNotValidUpgrade, km.put_raw_key, UNRELATED_KEY, - OpenPGPKey) + OpenPGPKey, ADDRESS) def test_fingerprint_level(self): km = self._key_manager() - km.put_raw_key(PUBLIC_KEY, OpenPGPKey) - km.put_raw_key(UNRELATED_KEY, OpenPGPKey, + km.put_raw_key(PUBLIC_KEY, OpenPGPKey, ADDRESS) + km.put_raw_key(UNRELATED_KEY, OpenPGPKey, ADDRESS, validation=ValidationLevel.Fingerprint) key = km.get_key(ADDRESS, OpenPGPKey, fetch_remote=False) self.assertEqual(key.fingerprint, UNRELATED_FINGERPRINT) def test_expired_key(self): km = self._key_manager() - km.put_raw_key(EXPIRED_KEY, OpenPGPKey) - km.put_raw_key(UNRELATED_KEY, OpenPGPKey) + km.put_raw_key(EXPIRED_KEY, OpenPGPKey, ADDRESS) + km.put_raw_key(UNRELATED_KEY, OpenPGPKey, ADDRESS) key = km.get_key(ADDRESS, OpenPGPKey, fetch_remote=False) self.assertEqual(key.fingerprint, UNRELATED_FINGERPRINT) def test_expired_fail_lower_level(self): km = self._key_manager() - km.put_raw_key(EXPIRED_KEY, OpenPGPKey, + km.put_raw_key(EXPIRED_KEY, OpenPGPKey, ADDRESS, validation=ValidationLevel.Third_Party_Endorsement) self.assertRaises( KeyNotValidUpgrade, km.put_raw_key, UNRELATED_KEY, OpenPGPKey, + ADDRESS, validation=ValidationLevel.Provider_Trust) def test_roll_back(self): km = self._key_manager() - km.put_raw_key(EXPIRED_KEY_UPDATED, OpenPGPKey) - km.put_raw_key(EXPIRED_KEY, OpenPGPKey) + km.put_raw_key(EXPIRED_KEY_UPDATED, OpenPGPKey, ADDRESS) + km.put_raw_key(EXPIRED_KEY, OpenPGPKey, ADDRESS) key = km.get_key(ADDRESS, OpenPGPKey, fetch_remote=False) self.assertEqual(key.expiry_date, EXPIRED_KEY_NEW_EXPIRY_DATE) diff --git a/src/leap/keymanager/validation.py b/src/leap/keymanager/validation.py index cf5b4a83..245013e5 100644 --- a/src/leap/keymanager/validation.py +++ b/src/leap/keymanager/validation.py @@ -67,10 +67,6 @@ def can_upgrade(new_key, old_key): if old_key is None: return True - if new_key.address != old_key.address: - # XXX how do we map multiple IDs? (#6212) - return False - # An update of the same key if new_key.fingerprint == old_key.fingerprint: return True -- cgit v1.2.3