summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRuben Pollan <meskio@sindominio.net>2014-11-10 18:36:59 -0600
committerRuben Pollan <meskio@sindominio.net>2014-11-19 11:25:50 -0600
commit1d1274e57f5e466fb195c829a628f519ef51186a (patch)
treeed79bb7675c444e4b34026709f2fbf779ac90aaa
parent4ec8d345a32b9169300974941c5bdd87d85f3915 (diff)
Implement multi uid support
-rw-r--r--keymanager/changes/feature-6212_multi_uid_support1
-rw-r--r--keymanager/src/leap/keymanager/__init__.py12
-rw-r--r--keymanager/src/leap/keymanager/keys.py15
-rw-r--r--keymanager/src/leap/keymanager/openpgp.py120
-rw-r--r--keymanager/src/leap/keymanager/tests/test_keymanager.py48
-rw-r--r--keymanager/src/leap/keymanager/tests/test_validation.py21
-rw-r--r--keymanager/src/leap/keymanager/validation.py4
7 files changed, 101 insertions, 120 deletions
diff --git a/keymanager/changes/feature-6212_multi_uid_support b/keymanager/changes/feature-6212_multi_uid_support
new file mode 100644
index 00000000..495b8c48
--- /dev/null
+++ b/keymanager/changes/feature-6212_multi_uid_support
@@ -0,0 +1 @@
+- Multi uid support (closes #6212)
diff --git a/keymanager/src/leap/keymanager/__init__.py b/keymanager/src/leap/keymanager/__init__.py
index 562bfbfa..c64cdeab 100644
--- a/keymanager/src/leap/keymanager/__init__.py
+++ b/keymanager/src/leap/keymanager/__init__.py
@@ -319,7 +319,6 @@ class KeyManager(object):
return map(
lambda doc: build_key_from_dict(
self._key_class_from_type(doc.content['type']),
- doc.content['address'][0],
doc.content),
self._soledad.get_from_index(
TAGS_PRIVATE_INDEX,
@@ -519,26 +518,25 @@ class KeyManager(object):
except IndexError as e:
leap_assert(False, "Unsupported key type. Error {0!r}".format(e))
- def put_key(self, key, address=None):
+ def put_key(self, key, address):
"""
Put C{key} in local storage.
:param key: The key to be stored
:type key: EncryptionKey
- :param address: address for which this key will be active. If not set
- all the uids will be activated
+ :param address: address for which this key will be active
:type address: str
:raises KeyAddressMismatch: if address doesn't match any uid on the key
:raises KeyNotValidUpdate: if a key with the same uid exists and the
new one is not a valid update for it
"""
- if address is not None and address not in key.address:
+ if address not in key.address:
raise KeyAddressMismatch("UID %s found, but expected %s"
% (str(key.address), address))
try:
- old_key = self._wrapper_map[type(key)].get_key(key.address[0],
+ old_key = self._wrapper_map[type(key)].get_key(address,
private=key.private)
except KeyNotFound:
old_key = None
@@ -553,7 +551,7 @@ class KeyManager(object):
raise KeyNotValidUpgrade("Key %s can not be upgraded by new key %s"
% (old_key.key_id, key.key_id))
- def put_raw_key(self, key, ktype, address=None,
+ def put_raw_key(self, key, ktype, address,
validation=ValidationLevel.Weak_Chain):
"""
Put C{key} in local storage.
diff --git a/keymanager/src/leap/keymanager/keys.py b/keymanager/src/leap/keymanager/keys.py
index d1b5d933..2108aa26 100644
--- a/keymanager/src/leap/keymanager/keys.py
+++ b/keymanager/src/leap/keymanager/keys.py
@@ -109,20 +109,15 @@ def is_address(address):
return bool(re.match('[\w.-]+@[\w.-]+', address))
-def build_key_from_dict(kClass, address, kdict):
+def build_key_from_dict(kClass, kdict):
"""
- Build an C{kClass} key bound to C{address} based on info in C{kdict}.
+ Build an C{kClass} key based on info in C{kdict}.
- :param address: The address bound to the key.
- :type address: str
:param kdict: Dictionary with key data.
:type kdict: dict
:return: An instance of the key.
:rtype: C{kClass}
"""
- leap_assert(
- address in kdict[KEY_ADDRESS_KEY],
- 'Wrong address in key data.')
try:
validation = toValidationLevel(kdict[KEY_VALIDATION_KEY])
except ValueError:
@@ -135,7 +130,7 @@ def build_key_from_dict(kClass, address, kdict):
refreshed_at = _to_datetime(kdict[KEY_REFRESHED_AT_KEY])
return kClass(
- [address],
+ kdict[KEY_ADDRESS_KEY],
key_id=kdict[KEY_ID_KEY],
fingerprint=kdict[KEY_FINGERPRINT_KEY],
key_data=kdict[KEY_DATA_KEY],
@@ -315,12 +310,14 @@ class EncryptionScheme(object):
pass
@abstractmethod
- def put_key(self, key):
+ def put_key(self, key, address):
"""
Put a key in local storage.
:param key: The key to be stored.
:type key: EncryptionKey
+ :param address: address for which this key will be active.
+ :type address: str
"""
pass
diff --git a/keymanager/src/leap/keymanager/openpgp.py b/keymanager/src/leap/keymanager/openpgp.py
index 52655d0e..4f965745 100644
--- a/keymanager/src/leap/keymanager/openpgp.py
+++ b/keymanager/src/leap/keymanager/openpgp.py
@@ -38,6 +38,7 @@ from leap.keymanager.keys import (
build_key_from_dict,
TYPE_ID_PRIVATE_INDEX,
TYPE_ADDRESS_PRIVATE_INDEX,
+ KEY_ADDRESS_KEY,
KEY_FINGERPRINT_KEY,
KEY_DATA_KEY,
KEY_ID_KEY,
@@ -110,9 +111,9 @@ class TempGPGWrapper(object):
# itself is enough to also have the public key in the keyring,
# and we want to count the keys afterwards.
- privaddrs = map(lambda privkey: privkey.address[0], privkeys)
+ privids = map(lambda privkey: privkey.key_id, privkeys)
publkeys = filter(
- lambda pubkey: pubkey.address[0] not in privaddrs, publkeys)
+ lambda pubkey: pubkey.key_id not in privids, publkeys)
listkeys = lambda: self._gpg.list_keys()
listsecretkeys = lambda: self._gpg.list_keys(secret=True)
@@ -163,16 +164,13 @@ class TempGPGWrapper(object):
shutil.rmtree(self._gpg.homedir)
-def _build_key_from_gpg(address, key, key_data):
+def _build_key_from_gpg(key, key_data):
"""
- Build an OpenPGPKey for C{address} based on C{key} from
- local gpg storage.
+ Build an OpenPGPKey based on C{key} from local gpg storage.
ASCII armored GPG key data has to be queried independently in this
wrapper, so we receive it in C{key_data}.
- :param address: The address bound to the key.
- :type address: str
:param key: Key obtained from GPG storage.
:type key: dict
:param key_data: Key data obtained from GPG storage.
@@ -183,9 +181,12 @@ def _build_key_from_gpg(address, key, key_data):
expiry_date = None
if key['expires']:
expiry_date = datetime.fromtimestamp(int(key['expires']))
+ address = []
+ for uid in key['uids']:
+ address.append(_parse_address(uid))
return OpenPGPKey(
- [address],
+ address,
key_id=key['keyid'],
fingerprint=key['fingerprint'],
key_data=key_data,
@@ -198,14 +199,18 @@ def _build_key_from_gpg(address, key, key_data):
def _parse_address(address):
"""
- Remove the identity suffix after the '+' until the '@'
+ Remove name, '<', '>' and the identity suffix after the '+' until the '@'
e.g.: test_user+something@provider.com becomes test_user@provider.com
since the key belongs to the identity without the '+' suffix.
:type address: str
:rtype: str
"""
- return re.sub(r'\+.*\@', '@', address)
+ mail_regex = '(.*<)?([\w.-]+)(\+.*)?(@[\w.-]+)(>.*)?'
+ match = re.match(mail_regex, address)
+ if match is None:
+ return None
+ return ''.join(match.group(2, 4))
#
@@ -289,15 +294,17 @@ class OpenPGPScheme(EncryptionScheme):
leap_assert(
len(key['uids']) is 1, # with just one uid!
'Wrong number of uids for key: %d.' % len(key['uids']))
- leap_assert(
- re.match('.*<%s>$' % address, key['uids'][0]) is not None,
- 'Key not correctly bound to address.')
+ uid_match = False
+ for uid in key['uids']:
+ if re.match('.*<%s>$' % address, uid) is not None:
+ uid_match = True
+ return
+ leap_assert(uid_match, 'Key not correctly bound to address.')
# insert both public and private keys in storage
for secret in [True, False]:
key = gpg.list_keys(secret=secret).pop()
openpgp_key = _build_key_from_gpg(
- address, key,
- gpg.export_keys(key['fingerprint'], secret=secret))
+ key, gpg.export_keys(key['fingerprint'], secret=secret))
self.put_key(openpgp_key, address)
return self.get_key(address, private=True)
@@ -320,7 +327,10 @@ class OpenPGPScheme(EncryptionScheme):
doc = self._get_key_doc(address, private)
if doc is None:
raise errors.KeyNotFound(address)
- return build_key_from_dict(OpenPGPKey, address, doc.content)
+ leap_assert(
+ address in doc.content[KEY_ADDRESS_KEY],
+ 'Wrong address in key data.')
+ return build_key_from_dict(OpenPGPKey, doc.content)
def parse_ascii_key(self, key_data):
"""
@@ -337,7 +347,6 @@ class OpenPGPScheme(EncryptionScheme):
leap_assert_type(key_data, (str, unicode))
# TODO: add more checks for correct key data.
leap_assert(key_data is not None, 'Data does not represent a key.')
- mail_regex = '.*<([\w.-]+@[\w.-]+)>.*'
with self._temporary_gpgwrapper() as gpg:
# TODO: inspect result, or use decorator
@@ -354,44 +363,30 @@ class OpenPGPScheme(EncryptionScheme):
except IndexError:
return (None, None)
- # extract adress from first uid on key
- match = re.match(mail_regex, pubkey['uids'].pop())
- leap_assert(match is not None, 'No user address in key data.')
- address = match.group(1)
-
openpgp_privkey = None
if privkey is not None:
- match = re.match(mail_regex, privkey['uids'].pop())
- leap_assert(match is not None, 'No user address in key data.')
- privaddress = match.group(1)
-
# build private key
openpgp_privkey = _build_key_from_gpg(
- privaddress, privkey,
+ privkey,
gpg.export_keys(privkey['fingerprint'], secret=True))
-
- leap_check(address == privaddress,
- 'Addresses in public and private key differ.',
- errors.KeyAddressMismatch)
leap_check(pubkey['fingerprint'] == privkey['fingerprint'],
'Fingerprints for public and private key differ.',
errors.KeyFingerprintMismatch)
# build public key
openpgp_pubkey = _build_key_from_gpg(
- address, pubkey,
+ pubkey,
gpg.export_keys(pubkey['fingerprint'], secret=False))
return (openpgp_pubkey, openpgp_privkey)
- def put_ascii_key(self, key_data, address=None):
+ def put_ascii_key(self, key_data, address):
"""
Put key contained in ascii-armored C{key_data} in local storage.
:param key_data: The key data to be stored.
:type key_data: str or unicode
- :param address: address for which this key will be active. If not set
- all the uids will be activated
+ :param address: address for which this key will be active
:type address: str
"""
leap_assert_type(key_data, (str, unicode))
@@ -407,23 +402,17 @@ class OpenPGPScheme(EncryptionScheme):
if openpgp_privkey is not None:
self.put_key(openpgp_privkey, address)
- def put_key(self, key, address=None):
+ def put_key(self, key, address):
"""
Put C{key} in local storage.
:param key: The key to be stored.
:type key: OpenPGPKey
- :param address: address for which this key will be active. If not set
- all the uids will be activated
+ :param address: address for which this key will be active.
:type address: str
"""
- if address is not None:
- active_address = [_parse_address(address)]
- else:
- active_address = key.address
-
self._put_key_doc(key)
- self._put_active_doc(key, active_address)
+ self._put_active_doc(key, address)
def _put_key_doc(self, key):
"""
@@ -445,7 +434,7 @@ class OpenPGPScheme(EncryptionScheme):
gpg.import_keys(key.key_data)
gpgkey = gpg.list_keys(secret=key.private).pop()
key = _build_key_from_gpg(
- key.address[0], gpgkey,
+ gpgkey,
gpg.export_keys(gpgkey['fingerprint'],
secret=key.private))
doc.set_json(key.get_json())
@@ -458,31 +447,30 @@ class OpenPGPScheme(EncryptionScheme):
else:
self._soledad.create_doc_from_json(key.get_json())
- def _put_active_doc(self, key, addresses):
+ def _put_active_doc(self, key, address):
"""
Put active key document in soledad
:type key: OpenPGPKey
- :type addresses: list(str)
- """
- for address in addresses:
- docs = self._soledad.get_from_index(
- TYPE_ADDRESS_PRIVATE_INDEX,
- self.ACTIVE_TYPE,
- address,
- '1' if key.private else '0')
- if len(docs) == 1:
- doc = docs.pop()
- doc.set_json(key.get_active_json(address))
- self._soledad.put_doc(doc)
- else:
- if len(docs) > 1:
- logger.error("There is more than one active key document "
- "for the address %s" % (address,))
- for doc in docs:
- self._soledad.delete_doc(doc)
- self._soledad.create_doc_from_json(
- key.get_active_json(address))
+ :type addresses: str
+ """
+ docs = self._soledad.get_from_index(
+ TYPE_ADDRESS_PRIVATE_INDEX,
+ self.ACTIVE_TYPE,
+ address,
+ '1' if key.private else '0')
+ if len(docs) == 1:
+ doc = docs.pop()
+ doc.set_json(key.get_active_json(address))
+ self._soledad.put_doc(doc)
+ else:
+ if len(docs) > 1:
+ logger.error("There is more than one active key document "
+ "for the address %s" % (address,))
+ for doc in docs:
+ self._soledad.delete_doc(doc)
+ self._soledad.create_doc_from_json(
+ key.get_active_json(address))
def _get_key_doc(self, address, private=False):
"""
diff --git a/keymanager/src/leap/keymanager/tests/test_keymanager.py b/keymanager/src/leap/keymanager/tests/test_keymanager.py
index 4daf3465..6aeb67a8 100644
--- a/keymanager/src/leap/keymanager/tests/test_keymanager.py
+++ b/keymanager/src/leap/keymanager/tests/test_keymanager.py
@@ -89,7 +89,7 @@ class KeyManagerUtilTestCase(BaseLeapTest):
'encr_used': False,
'sign_used': True,
}
- key = build_key_from_dict(OpenPGPKey, ADDRESS, kdict)
+ key = build_key_from_dict(OpenPGPKey, kdict)
self.assertEqual(
kdict['address'], key.address,
'Wrong data in key.')
@@ -144,7 +144,7 @@ class OpenPGPCryptoTestCase(KeyManagerWithSoledadTestCase):
pgp = openpgp.OpenPGPScheme(
self._soledad, gpgbinary=GPG_BINARY_PATH)
self.assertRaises(KeyNotFound, pgp.get_key, ADDRESS)
- pgp.put_ascii_key(PUBLIC_KEY)
+ pgp.put_ascii_key(PUBLIC_KEY, ADDRESS)
key = pgp.get_key(ADDRESS, private=False)
pgp.delete_key(key)
self.assertRaises(KeyNotFound, pgp.get_key, ADDRESS)
@@ -153,7 +153,7 @@ class OpenPGPCryptoTestCase(KeyManagerWithSoledadTestCase):
pgp = openpgp.OpenPGPScheme(
self._soledad, gpgbinary=GPG_BINARY_PATH)
self.assertRaises(KeyNotFound, pgp.get_key, ADDRESS)
- pgp.put_ascii_key(PUBLIC_KEY)
+ pgp.put_ascii_key(PUBLIC_KEY, ADDRESS)
key = pgp.get_key(ADDRESS, private=False)
self.assertIsInstance(key, openpgp.OpenPGPKey)
self.assertTrue(
@@ -167,7 +167,7 @@ class OpenPGPCryptoTestCase(KeyManagerWithSoledadTestCase):
pgp = openpgp.OpenPGPScheme(
self._soledad, gpgbinary=GPG_BINARY_PATH)
self.assertRaises(KeyNotFound, pgp.get_key, ADDRESS)
- pgp.put_ascii_key(PUBLIC_KEY)
+ pgp.put_ascii_key(PUBLIC_KEY, ADDRESS)
self.assertRaises(
KeyNotFound, pgp.get_key, ADDRESS, private=True)
key = pgp.get_key(ADDRESS, private=False)
@@ -181,7 +181,7 @@ class OpenPGPCryptoTestCase(KeyManagerWithSoledadTestCase):
# encrypt
pgp = openpgp.OpenPGPScheme(
self._soledad, gpgbinary=GPG_BINARY_PATH)
- pgp.put_ascii_key(PUBLIC_KEY)
+ pgp.put_ascii_key(PUBLIC_KEY, ADDRESS)
pubkey = pgp.get_key(ADDRESS, private=False)
cyphertext = pgp.encrypt('data', pubkey)
# assert
@@ -193,7 +193,7 @@ class OpenPGPCryptoTestCase(KeyManagerWithSoledadTestCase):
# decrypt
self.assertRaises(
KeyNotFound, pgp.get_key, ADDRESS, private=True)
- pgp.put_ascii_key(PRIVATE_KEY)
+ pgp.put_ascii_key(PRIVATE_KEY, ADDRESS)
privkey = pgp.get_key(ADDRESS, private=True)
pgp.delete_key(pubkey)
pgp.delete_key(privkey)
@@ -216,7 +216,7 @@ class OpenPGPCryptoTestCase(KeyManagerWithSoledadTestCase):
def test_sign_with_public_raises(self):
pgp = openpgp.OpenPGPScheme(
self._soledad, gpgbinary=GPG_BINARY_PATH)
- pgp.put_ascii_key(PUBLIC_KEY)
+ pgp.put_ascii_key(PUBLIC_KEY, ADDRESS)
data = 'data'
pubkey = pgp.get_key(ADDRESS, private=False)
self.assertRaises(
@@ -226,11 +226,11 @@ class OpenPGPCryptoTestCase(KeyManagerWithSoledadTestCase):
def test_verify_with_wrong_key_raises(self):
pgp = openpgp.OpenPGPScheme(
self._soledad, gpgbinary=GPG_BINARY_PATH)
- pgp.put_ascii_key(PRIVATE_KEY)
+ pgp.put_ascii_key(PRIVATE_KEY, ADDRESS)
data = 'data'
privkey = pgp.get_key(ADDRESS, private=True)
signed = pgp.sign(data, privkey)
- pgp.put_ascii_key(PUBLIC_KEY_2)
+ pgp.put_ascii_key(PUBLIC_KEY_2, ADDRESS_2)
wrongkey = pgp.get_key(ADDRESS_2)
self.assertRaises(
errors.InvalidSignature,
@@ -239,7 +239,7 @@ class OpenPGPCryptoTestCase(KeyManagerWithSoledadTestCase):
def test_encrypt_sign_with_public_raises(self):
pgp = openpgp.OpenPGPScheme(
self._soledad, gpgbinary=GPG_BINARY_PATH)
- pgp.put_ascii_key(PRIVATE_KEY)
+ pgp.put_ascii_key(PRIVATE_KEY, ADDRESS)
data = 'data'
privkey = pgp.get_key(ADDRESS, private=True)
pubkey = pgp.get_key(ADDRESS, private=False)
@@ -250,7 +250,7 @@ class OpenPGPCryptoTestCase(KeyManagerWithSoledadTestCase):
def test_decrypt_verify_with_private_raises(self):
pgp = openpgp.OpenPGPScheme(
self._soledad, gpgbinary=GPG_BINARY_PATH)
- pgp.put_ascii_key(PRIVATE_KEY)
+ pgp.put_ascii_key(PRIVATE_KEY, ADDRESS)
data = 'data'
privkey = pgp.get_key(ADDRESS, private=True)
pubkey = pgp.get_key(ADDRESS, private=False)
@@ -264,12 +264,12 @@ class OpenPGPCryptoTestCase(KeyManagerWithSoledadTestCase):
def test_decrypt_verify_with_wrong_key_raises(self):
pgp = openpgp.OpenPGPScheme(
self._soledad, gpgbinary=GPG_BINARY_PATH)
- pgp.put_ascii_key(PRIVATE_KEY)
+ pgp.put_ascii_key(PRIVATE_KEY, ADDRESS)
data = 'data'
privkey = pgp.get_key(ADDRESS, private=True)
pubkey = pgp.get_key(ADDRESS, private=False)
encrypted_and_signed = pgp.encrypt(data, pubkey, sign=privkey)
- pgp.put_ascii_key(PUBLIC_KEY_2)
+ pgp.put_ascii_key(PUBLIC_KEY_2, ADDRESS_2)
wrongkey = pgp.get_key(ADDRESS_2)
self.assertRaises(
errors.InvalidSignature,
@@ -278,7 +278,7 @@ class OpenPGPCryptoTestCase(KeyManagerWithSoledadTestCase):
def test_sign_verify(self):
pgp = openpgp.OpenPGPScheme(
self._soledad, gpgbinary=GPG_BINARY_PATH)
- pgp.put_ascii_key(PRIVATE_KEY)
+ pgp.put_ascii_key(PRIVATE_KEY, ADDRESS)
data = 'data'
privkey = pgp.get_key(ADDRESS, private=True)
signed = pgp.sign(data, privkey, detach=False)
@@ -288,10 +288,10 @@ class OpenPGPCryptoTestCase(KeyManagerWithSoledadTestCase):
def test_encrypt_sign_decrypt_verify(self):
pgp = openpgp.OpenPGPScheme(
self._soledad, gpgbinary=GPG_BINARY_PATH)
- pgp.put_ascii_key(PRIVATE_KEY)
+ pgp.put_ascii_key(PRIVATE_KEY, ADDRESS)
pubkey = pgp.get_key(ADDRESS, private=False)
privkey = pgp.get_key(ADDRESS, private=True)
- pgp.put_ascii_key(PRIVATE_KEY_2)
+ pgp.put_ascii_key(PRIVATE_KEY_2, ADDRESS_2)
pubkey2 = pgp.get_key(ADDRESS_2, private=False)
privkey2 = pgp.get_key(ADDRESS_2, private=True)
data = 'data'
@@ -304,7 +304,7 @@ class OpenPGPCryptoTestCase(KeyManagerWithSoledadTestCase):
def test_sign_verify_detached_sig(self):
pgp = openpgp.OpenPGPScheme(
self._soledad, gpgbinary=GPG_BINARY_PATH)
- pgp.put_ascii_key(PRIVATE_KEY)
+ pgp.put_ascii_key(PRIVATE_KEY, ADDRESS)
data = 'data'
privkey = pgp.get_key(ADDRESS, private=True)
signature = pgp.sign(data, privkey, detach=True)
@@ -316,7 +316,7 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase):
def test_get_all_keys_in_db(self):
km = self._key_manager()
- km._wrapper_map[OpenPGPKey].put_ascii_key(PRIVATE_KEY)
+ km._wrapper_map[OpenPGPKey].put_ascii_key(PRIVATE_KEY, ADDRESS)
# get public keys
keys = km.get_all_keys(False)
self.assertEqual(len(keys), 1, 'Wrong number of keys')
@@ -330,7 +330,7 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase):
def test_get_public_key(self):
km = self._key_manager()
- km._wrapper_map[OpenPGPKey].put_ascii_key(PRIVATE_KEY)
+ km._wrapper_map[OpenPGPKey].put_ascii_key(PRIVATE_KEY, ADDRESS)
# get the key
key = km.get_key(ADDRESS, OpenPGPKey, private=False,
fetch_remote=False)
@@ -342,7 +342,7 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase):
def test_get_private_key(self):
km = self._key_manager()
- km._wrapper_map[OpenPGPKey].put_ascii_key(PRIVATE_KEY)
+ km._wrapper_map[OpenPGPKey].put_ascii_key(PRIVATE_KEY, ADDRESS)
# get the key
key = km.get_key(ADDRESS, OpenPGPKey, private=True,
fetch_remote=False)
@@ -364,7 +364,7 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase):
"""
token = "mytoken"
km = self._key_manager(token=token)
- km._wrapper_map[OpenPGPKey].put_ascii_key(PUBLIC_KEY)
+ km._wrapper_map[OpenPGPKey].put_ascii_key(PUBLIC_KEY, ADDRESS)
km._fetcher.put = Mock()
# the following data will be used on the send
km.ca_cert_path = 'capath'
@@ -447,7 +447,7 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase):
"""
km = self._key_manager(url='http://nickserver.domain')
- km.put_raw_key(PUBLIC_KEY, OpenPGPKey)
+ km.put_raw_key(PUBLIC_KEY, OpenPGPKey, ADDRESS)
key = km.get_key(ADDRESS, OpenPGPKey)
self.assertIsInstance(key, OpenPGPKey)
self.assertTrue(ADDRESS in key.address)
@@ -509,7 +509,7 @@ class KeyManagerCryptoTestCase(KeyManagerWithSoledadTestCase):
def test_keymanager_openpgp_encrypt_decrypt(self):
km = self._key_manager()
# put raw private key
- km._wrapper_map[OpenPGPKey].put_ascii_key(PRIVATE_KEY)
+ km._wrapper_map[OpenPGPKey].put_ascii_key(PRIVATE_KEY, ADDRESS)
# get public key
pubkey = km.get_key(
ADDRESS, OpenPGPKey, private=False, fetch_remote=False)
@@ -526,7 +526,7 @@ class KeyManagerCryptoTestCase(KeyManagerWithSoledadTestCase):
def test_keymanager_openpgp_sign_verify(self):
km = self._key_manager()
# put raw private keys
- km._wrapper_map[OpenPGPKey].put_ascii_key(PRIVATE_KEY)
+ km._wrapper_map[OpenPGPKey].put_ascii_key(PRIVATE_KEY, ADDRESS)
# get private key for signing
privkey = km.get_key(
ADDRESS, OpenPGPKey, private=True, fetch_remote=False)
diff --git a/keymanager/src/leap/keymanager/tests/test_validation.py b/keymanager/src/leap/keymanager/tests/test_validation.py
index 3ae873d4..400d36e8 100644
--- a/keymanager/src/leap/keymanager/tests/test_validation.py
+++ b/keymanager/src/leap/keymanager/tests/test_validation.py
@@ -37,47 +37,48 @@ class ValidationLevelTestCase(KeyManagerWithSoledadTestCase):
def test_none_old_key(self):
km = self._key_manager()
- km.put_raw_key(PUBLIC_KEY, OpenPGPKey)
+ km.put_raw_key(PUBLIC_KEY, OpenPGPKey, ADDRESS)
key = km.get_key(ADDRESS, OpenPGPKey, fetch_remote=False)
self.assertEqual(key.fingerprint, KEY_FINGERPRINT)
def test_cant_upgrade(self):
km = self._key_manager()
- km.put_raw_key(PUBLIC_KEY, OpenPGPKey,
+ km.put_raw_key(PUBLIC_KEY, OpenPGPKey, ADDRESS,
validation=ValidationLevel.Provider_Trust)
self.assertRaises(KeyNotValidUpgrade, km.put_raw_key, UNRELATED_KEY,
- OpenPGPKey)
+ OpenPGPKey, ADDRESS)
def test_fingerprint_level(self):
km = self._key_manager()
- km.put_raw_key(PUBLIC_KEY, OpenPGPKey)
- km.put_raw_key(UNRELATED_KEY, OpenPGPKey,
+ km.put_raw_key(PUBLIC_KEY, OpenPGPKey, ADDRESS)
+ km.put_raw_key(UNRELATED_KEY, OpenPGPKey, ADDRESS,
validation=ValidationLevel.Fingerprint)
key = km.get_key(ADDRESS, OpenPGPKey, fetch_remote=False)
self.assertEqual(key.fingerprint, UNRELATED_FINGERPRINT)
def test_expired_key(self):
km = self._key_manager()
- km.put_raw_key(EXPIRED_KEY, OpenPGPKey)
- km.put_raw_key(UNRELATED_KEY, OpenPGPKey)
+ km.put_raw_key(EXPIRED_KEY, OpenPGPKey, ADDRESS)
+ km.put_raw_key(UNRELATED_KEY, OpenPGPKey, ADDRESS)
key = km.get_key(ADDRESS, OpenPGPKey, fetch_remote=False)
self.assertEqual(key.fingerprint, UNRELATED_FINGERPRINT)
def test_expired_fail_lower_level(self):
km = self._key_manager()
- km.put_raw_key(EXPIRED_KEY, OpenPGPKey,
+ km.put_raw_key(EXPIRED_KEY, OpenPGPKey, ADDRESS,
validation=ValidationLevel.Third_Party_Endorsement)
self.assertRaises(
KeyNotValidUpgrade,
km.put_raw_key,
UNRELATED_KEY,
OpenPGPKey,
+ ADDRESS,
validation=ValidationLevel.Provider_Trust)
def test_roll_back(self):
km = self._key_manager()
- km.put_raw_key(EXPIRED_KEY_UPDATED, OpenPGPKey)
- km.put_raw_key(EXPIRED_KEY, OpenPGPKey)
+ km.put_raw_key(EXPIRED_KEY_UPDATED, OpenPGPKey, ADDRESS)
+ km.put_raw_key(EXPIRED_KEY, OpenPGPKey, ADDRESS)
key = km.get_key(ADDRESS, OpenPGPKey, fetch_remote=False)
self.assertEqual(key.expiry_date, EXPIRED_KEY_NEW_EXPIRY_DATE)
diff --git a/keymanager/src/leap/keymanager/validation.py b/keymanager/src/leap/keymanager/validation.py
index cf5b4a83..245013e5 100644
--- a/keymanager/src/leap/keymanager/validation.py
+++ b/keymanager/src/leap/keymanager/validation.py
@@ -67,10 +67,6 @@ def can_upgrade(new_key, old_key):
if old_key is None:
return True
- if new_key.address != old_key.address:
- # XXX how do we map multiple IDs? (#6212)
- return False
-
# An update of the same key
if new_key.fingerprint == old_key.fingerprint:
return True