diff options
| -rw-r--r-- | keymanager/changes/feature-6212_multi_uid_support | 1 | ||||
| -rw-r--r-- | keymanager/src/leap/keymanager/__init__.py | 12 | ||||
| -rw-r--r-- | keymanager/src/leap/keymanager/keys.py | 15 | ||||
| -rw-r--r-- | keymanager/src/leap/keymanager/openpgp.py | 120 | ||||
| -rw-r--r-- | keymanager/src/leap/keymanager/tests/test_keymanager.py | 48 | ||||
| -rw-r--r-- | keymanager/src/leap/keymanager/tests/test_validation.py | 21 | ||||
| -rw-r--r-- | keymanager/src/leap/keymanager/validation.py | 4 | 
7 files changed, 101 insertions, 120 deletions
| diff --git a/keymanager/changes/feature-6212_multi_uid_support b/keymanager/changes/feature-6212_multi_uid_support new file mode 100644 index 0000000..495b8c4 --- /dev/null +++ b/keymanager/changes/feature-6212_multi_uid_support @@ -0,0 +1 @@ +- Multi uid support (closes #6212) diff --git a/keymanager/src/leap/keymanager/__init__.py b/keymanager/src/leap/keymanager/__init__.py index 562bfbf..c64cdea 100644 --- a/keymanager/src/leap/keymanager/__init__.py +++ b/keymanager/src/leap/keymanager/__init__.py @@ -319,7 +319,6 @@ class KeyManager(object):          return map(              lambda doc: build_key_from_dict(                  self._key_class_from_type(doc.content['type']), -                doc.content['address'][0],                  doc.content),              self._soledad.get_from_index(                  TAGS_PRIVATE_INDEX, @@ -519,26 +518,25 @@ class KeyManager(object):          except IndexError as e:              leap_assert(False, "Unsupported key type. Error {0!r}".format(e)) -    def put_key(self, key, address=None): +    def put_key(self, key, address):          """          Put C{key} in local storage.          :param key: The key to be stored          :type key: EncryptionKey -        :param address: address for which this key will be active. If not set -                        all the uids will be activated +        :param address: address for which this key will be active          :type address: str          :raises KeyAddressMismatch: if address doesn't match any uid on the key          :raises KeyNotValidUpdate: if a key with the same uid exists and the                                     new one is not a valid update for it          """ -        if address is not None and address not in key.address: +        if address not in key.address:              raise KeyAddressMismatch("UID %s found, but expected %s"                                       % (str(key.address), address))          try: -            old_key = self._wrapper_map[type(key)].get_key(key.address[0], +            old_key = self._wrapper_map[type(key)].get_key(address,                                                             private=key.private)          except KeyNotFound:              old_key = None @@ -553,7 +551,7 @@ class KeyManager(object):              raise KeyNotValidUpgrade("Key %s can not be upgraded by new key %s"                                       % (old_key.key_id, key.key_id)) -    def put_raw_key(self, key, ktype, address=None, +    def put_raw_key(self, key, ktype, address,                      validation=ValidationLevel.Weak_Chain):          """          Put C{key} in local storage. diff --git a/keymanager/src/leap/keymanager/keys.py b/keymanager/src/leap/keymanager/keys.py index d1b5d93..2108aa2 100644 --- a/keymanager/src/leap/keymanager/keys.py +++ b/keymanager/src/leap/keymanager/keys.py @@ -109,20 +109,15 @@ def is_address(address):      return bool(re.match('[\w.-]+@[\w.-]+', address)) -def build_key_from_dict(kClass, address, kdict): +def build_key_from_dict(kClass, kdict):      """ -    Build an C{kClass} key bound to C{address} based on info in C{kdict}. +    Build an C{kClass} key based on info in C{kdict}. -    :param address: The address bound to the key. -    :type address: str      :param kdict: Dictionary with key data.      :type kdict: dict      :return: An instance of the key.      :rtype: C{kClass}      """ -    leap_assert( -        address in kdict[KEY_ADDRESS_KEY], -        'Wrong address in key data.')      try:          validation = toValidationLevel(kdict[KEY_VALIDATION_KEY])      except ValueError: @@ -135,7 +130,7 @@ def build_key_from_dict(kClass, address, kdict):      refreshed_at = _to_datetime(kdict[KEY_REFRESHED_AT_KEY])      return kClass( -        [address], +        kdict[KEY_ADDRESS_KEY],          key_id=kdict[KEY_ID_KEY],          fingerprint=kdict[KEY_FINGERPRINT_KEY],          key_data=kdict[KEY_DATA_KEY], @@ -315,12 +310,14 @@ class EncryptionScheme(object):          pass      @abstractmethod -    def put_key(self, key): +    def put_key(self, key, address):          """          Put a key in local storage.          :param key: The key to be stored.          :type key: EncryptionKey +        :param address: address for which this key will be active. +        :type address: str          """          pass diff --git a/keymanager/src/leap/keymanager/openpgp.py b/keymanager/src/leap/keymanager/openpgp.py index 52655d0..4f96574 100644 --- a/keymanager/src/leap/keymanager/openpgp.py +++ b/keymanager/src/leap/keymanager/openpgp.py @@ -38,6 +38,7 @@ from leap.keymanager.keys import (      build_key_from_dict,      TYPE_ID_PRIVATE_INDEX,      TYPE_ADDRESS_PRIVATE_INDEX, +    KEY_ADDRESS_KEY,      KEY_FINGERPRINT_KEY,      KEY_DATA_KEY,      KEY_ID_KEY, @@ -110,9 +111,9 @@ class TempGPGWrapper(object):          # itself is enough to also have the public key in the keyring,          # and we want to count the keys afterwards. -        privaddrs = map(lambda privkey: privkey.address[0], privkeys) +        privids = map(lambda privkey: privkey.key_id, privkeys)          publkeys = filter( -            lambda pubkey: pubkey.address[0] not in privaddrs, publkeys) +            lambda pubkey: pubkey.key_id not in privids, publkeys)          listkeys = lambda: self._gpg.list_keys()          listsecretkeys = lambda: self._gpg.list_keys(secret=True) @@ -163,16 +164,13 @@ class TempGPGWrapper(object):              shutil.rmtree(self._gpg.homedir) -def _build_key_from_gpg(address, key, key_data): +def _build_key_from_gpg(key, key_data):      """ -    Build an OpenPGPKey for C{address} based on C{key} from -    local gpg storage. +    Build an OpenPGPKey based on C{key} from local gpg storage.      ASCII armored GPG key data has to be queried independently in this      wrapper, so we receive it in C{key_data}. -    :param address: The address bound to the key. -    :type address: str      :param key: Key obtained from GPG storage.      :type key: dict      :param key_data: Key data obtained from GPG storage. @@ -183,9 +181,12 @@ def _build_key_from_gpg(address, key, key_data):      expiry_date = None      if key['expires']:          expiry_date = datetime.fromtimestamp(int(key['expires'])) +    address = [] +    for uid in key['uids']: +        address.append(_parse_address(uid))      return OpenPGPKey( -        [address], +        address,          key_id=key['keyid'],          fingerprint=key['fingerprint'],          key_data=key_data, @@ -198,14 +199,18 @@ def _build_key_from_gpg(address, key, key_data):  def _parse_address(address):      """ -    Remove the identity suffix after the '+' until the '@' +    Remove name, '<', '>' and the identity suffix after the '+' until the '@'      e.g.: test_user+something@provider.com becomes test_user@provider.com      since the key belongs to the identity without the '+' suffix.      :type address: str      :rtype: str      """ -    return re.sub(r'\+.*\@', '@', address) +    mail_regex = '(.*<)?([\w.-]+)(\+.*)?(@[\w.-]+)(>.*)?' +    match = re.match(mail_regex, address) +    if match is None: +        return None +    return ''.join(match.group(2, 4))  # @@ -289,15 +294,17 @@ class OpenPGPScheme(EncryptionScheme):              leap_assert(                  len(key['uids']) is 1,  # with just one uid!                  'Wrong number of uids for key: %d.' % len(key['uids'])) -            leap_assert( -                re.match('.*<%s>$' % address, key['uids'][0]) is not None, -                'Key not correctly bound to address.') +            uid_match = False +            for uid in key['uids']: +                if re.match('.*<%s>$' % address, uid) is not None: +                    uid_match = True +                    return +            leap_assert(uid_match, 'Key not correctly bound to address.')              # insert both public and private keys in storage              for secret in [True, False]:                  key = gpg.list_keys(secret=secret).pop()                  openpgp_key = _build_key_from_gpg( -                    address, key, -                    gpg.export_keys(key['fingerprint'], secret=secret)) +                    key, gpg.export_keys(key['fingerprint'], secret=secret))                  self.put_key(openpgp_key, address)          return self.get_key(address, private=True) @@ -320,7 +327,10 @@ class OpenPGPScheme(EncryptionScheme):          doc = self._get_key_doc(address, private)          if doc is None:              raise errors.KeyNotFound(address) -        return build_key_from_dict(OpenPGPKey, address, doc.content) +        leap_assert( +            address in doc.content[KEY_ADDRESS_KEY], +            'Wrong address in key data.') +        return build_key_from_dict(OpenPGPKey, doc.content)      def parse_ascii_key(self, key_data):          """ @@ -337,7 +347,6 @@ class OpenPGPScheme(EncryptionScheme):          leap_assert_type(key_data, (str, unicode))          # TODO: add more checks for correct key data.          leap_assert(key_data is not None, 'Data does not represent a key.') -        mail_regex = '.*<([\w.-]+@[\w.-]+)>.*'          with self._temporary_gpgwrapper() as gpg:              # TODO: inspect result, or use decorator @@ -354,44 +363,30 @@ class OpenPGPScheme(EncryptionScheme):              except IndexError:                  return (None, None) -            # extract adress from first uid on key -            match = re.match(mail_regex, pubkey['uids'].pop()) -            leap_assert(match is not None, 'No user address in key data.') -            address = match.group(1) -              openpgp_privkey = None              if privkey is not None: -                match = re.match(mail_regex, privkey['uids'].pop()) -                leap_assert(match is not None, 'No user address in key data.') -                privaddress = match.group(1) -                  # build private key                  openpgp_privkey = _build_key_from_gpg( -                    privaddress, privkey, +                    privkey,                      gpg.export_keys(privkey['fingerprint'], secret=True)) - -                leap_check(address == privaddress, -                           'Addresses in public and private key differ.', -                           errors.KeyAddressMismatch)                  leap_check(pubkey['fingerprint'] == privkey['fingerprint'],                             'Fingerprints for public and private key differ.',                             errors.KeyFingerprintMismatch)              # build public key              openpgp_pubkey = _build_key_from_gpg( -                address, pubkey, +                pubkey,                  gpg.export_keys(pubkey['fingerprint'], secret=False))              return (openpgp_pubkey, openpgp_privkey) -    def put_ascii_key(self, key_data, address=None): +    def put_ascii_key(self, key_data, address):          """          Put key contained in ascii-armored C{key_data} in local storage.          :param key_data: The key data to be stored.          :type key_data: str or unicode -        :param address: address for which this key will be active. If not set -                        all the uids will be activated +        :param address: address for which this key will be active          :type address: str          """          leap_assert_type(key_data, (str, unicode)) @@ -407,23 +402,17 @@ class OpenPGPScheme(EncryptionScheme):          if openpgp_privkey is not None:              self.put_key(openpgp_privkey, address) -    def put_key(self, key, address=None): +    def put_key(self, key, address):          """          Put C{key} in local storage.          :param key: The key to be stored.          :type key: OpenPGPKey -        :param address: address for which this key will be active. If not set -                        all the uids will be activated +        :param address: address for which this key will be active.          :type address: str          """ -        if address is not None: -            active_address = [_parse_address(address)] -        else: -            active_address = key.address -          self._put_key_doc(key) -        self._put_active_doc(key, active_address) +        self._put_active_doc(key, address)      def _put_key_doc(self, key):          """ @@ -445,7 +434,7 @@ class OpenPGPScheme(EncryptionScheme):                      gpg.import_keys(key.key_data)                      gpgkey = gpg.list_keys(secret=key.private).pop()                      key = _build_key_from_gpg( -                        key.address[0], gpgkey, +                        gpgkey,                          gpg.export_keys(gpgkey['fingerprint'],                                          secret=key.private))                  doc.set_json(key.get_json()) @@ -458,31 +447,30 @@ class OpenPGPScheme(EncryptionScheme):          else:              self._soledad.create_doc_from_json(key.get_json()) -    def _put_active_doc(self, key, addresses): +    def _put_active_doc(self, key, address):          """          Put active key document in soledad          :type key: OpenPGPKey -        :type addresses: list(str) -        """ -        for address in addresses: -            docs = self._soledad.get_from_index( -                TYPE_ADDRESS_PRIVATE_INDEX, -                self.ACTIVE_TYPE, -                address, -                '1' if key.private else '0') -            if len(docs) == 1: -                doc = docs.pop() -                doc.set_json(key.get_active_json(address)) -                self._soledad.put_doc(doc) -            else: -                if len(docs) > 1: -                    logger.error("There is more than one active key document " -                                 "for the address %s" % (address,)) -                    for doc in docs: -                        self._soledad.delete_doc(doc) -                self._soledad.create_doc_from_json( -                    key.get_active_json(address)) +        :type addresses: str +        """ +        docs = self._soledad.get_from_index( +            TYPE_ADDRESS_PRIVATE_INDEX, +            self.ACTIVE_TYPE, +            address, +            '1' if key.private else '0') +        if len(docs) == 1: +            doc = docs.pop() +            doc.set_json(key.get_active_json(address)) +            self._soledad.put_doc(doc) +        else: +            if len(docs) > 1: +                logger.error("There is more than one active key document " +                             "for the address %s" % (address,)) +                for doc in docs: +                    self._soledad.delete_doc(doc) +            self._soledad.create_doc_from_json( +                key.get_active_json(address))      def _get_key_doc(self, address, private=False):          """ diff --git a/keymanager/src/leap/keymanager/tests/test_keymanager.py b/keymanager/src/leap/keymanager/tests/test_keymanager.py index 4daf346..6aeb67a 100644 --- a/keymanager/src/leap/keymanager/tests/test_keymanager.py +++ b/keymanager/src/leap/keymanager/tests/test_keymanager.py @@ -89,7 +89,7 @@ class KeyManagerUtilTestCase(BaseLeapTest):              'encr_used': False,              'sign_used': True,          } -        key = build_key_from_dict(OpenPGPKey, ADDRESS, kdict) +        key = build_key_from_dict(OpenPGPKey, kdict)          self.assertEqual(              kdict['address'], key.address,              'Wrong data in key.') @@ -144,7 +144,7 @@ class OpenPGPCryptoTestCase(KeyManagerWithSoledadTestCase):          pgp = openpgp.OpenPGPScheme(              self._soledad, gpgbinary=GPG_BINARY_PATH)          self.assertRaises(KeyNotFound, pgp.get_key, ADDRESS) -        pgp.put_ascii_key(PUBLIC_KEY) +        pgp.put_ascii_key(PUBLIC_KEY, ADDRESS)          key = pgp.get_key(ADDRESS, private=False)          pgp.delete_key(key)          self.assertRaises(KeyNotFound, pgp.get_key, ADDRESS) @@ -153,7 +153,7 @@ class OpenPGPCryptoTestCase(KeyManagerWithSoledadTestCase):          pgp = openpgp.OpenPGPScheme(              self._soledad, gpgbinary=GPG_BINARY_PATH)          self.assertRaises(KeyNotFound, pgp.get_key, ADDRESS) -        pgp.put_ascii_key(PUBLIC_KEY) +        pgp.put_ascii_key(PUBLIC_KEY, ADDRESS)          key = pgp.get_key(ADDRESS, private=False)          self.assertIsInstance(key, openpgp.OpenPGPKey)          self.assertTrue( @@ -167,7 +167,7 @@ class OpenPGPCryptoTestCase(KeyManagerWithSoledadTestCase):          pgp = openpgp.OpenPGPScheme(              self._soledad, gpgbinary=GPG_BINARY_PATH)          self.assertRaises(KeyNotFound, pgp.get_key, ADDRESS) -        pgp.put_ascii_key(PUBLIC_KEY) +        pgp.put_ascii_key(PUBLIC_KEY, ADDRESS)          self.assertRaises(              KeyNotFound, pgp.get_key, ADDRESS, private=True)          key = pgp.get_key(ADDRESS, private=False) @@ -181,7 +181,7 @@ class OpenPGPCryptoTestCase(KeyManagerWithSoledadTestCase):          # encrypt          pgp = openpgp.OpenPGPScheme(              self._soledad, gpgbinary=GPG_BINARY_PATH) -        pgp.put_ascii_key(PUBLIC_KEY) +        pgp.put_ascii_key(PUBLIC_KEY, ADDRESS)          pubkey = pgp.get_key(ADDRESS, private=False)          cyphertext = pgp.encrypt('data', pubkey)          # assert @@ -193,7 +193,7 @@ class OpenPGPCryptoTestCase(KeyManagerWithSoledadTestCase):          # decrypt          self.assertRaises(              KeyNotFound, pgp.get_key, ADDRESS, private=True) -        pgp.put_ascii_key(PRIVATE_KEY) +        pgp.put_ascii_key(PRIVATE_KEY, ADDRESS)          privkey = pgp.get_key(ADDRESS, private=True)          pgp.delete_key(pubkey)          pgp.delete_key(privkey) @@ -216,7 +216,7 @@ class OpenPGPCryptoTestCase(KeyManagerWithSoledadTestCase):      def test_sign_with_public_raises(self):          pgp = openpgp.OpenPGPScheme(              self._soledad, gpgbinary=GPG_BINARY_PATH) -        pgp.put_ascii_key(PUBLIC_KEY) +        pgp.put_ascii_key(PUBLIC_KEY, ADDRESS)          data = 'data'          pubkey = pgp.get_key(ADDRESS, private=False)          self.assertRaises( @@ -226,11 +226,11 @@ class OpenPGPCryptoTestCase(KeyManagerWithSoledadTestCase):      def test_verify_with_wrong_key_raises(self):          pgp = openpgp.OpenPGPScheme(              self._soledad, gpgbinary=GPG_BINARY_PATH) -        pgp.put_ascii_key(PRIVATE_KEY) +        pgp.put_ascii_key(PRIVATE_KEY, ADDRESS)          data = 'data'          privkey = pgp.get_key(ADDRESS, private=True)          signed = pgp.sign(data, privkey) -        pgp.put_ascii_key(PUBLIC_KEY_2) +        pgp.put_ascii_key(PUBLIC_KEY_2, ADDRESS_2)          wrongkey = pgp.get_key(ADDRESS_2)          self.assertRaises(              errors.InvalidSignature, @@ -239,7 +239,7 @@ class OpenPGPCryptoTestCase(KeyManagerWithSoledadTestCase):      def test_encrypt_sign_with_public_raises(self):          pgp = openpgp.OpenPGPScheme(              self._soledad, gpgbinary=GPG_BINARY_PATH) -        pgp.put_ascii_key(PRIVATE_KEY) +        pgp.put_ascii_key(PRIVATE_KEY, ADDRESS)          data = 'data'          privkey = pgp.get_key(ADDRESS, private=True)          pubkey = pgp.get_key(ADDRESS, private=False) @@ -250,7 +250,7 @@ class OpenPGPCryptoTestCase(KeyManagerWithSoledadTestCase):      def test_decrypt_verify_with_private_raises(self):          pgp = openpgp.OpenPGPScheme(              self._soledad, gpgbinary=GPG_BINARY_PATH) -        pgp.put_ascii_key(PRIVATE_KEY) +        pgp.put_ascii_key(PRIVATE_KEY, ADDRESS)          data = 'data'          privkey = pgp.get_key(ADDRESS, private=True)          pubkey = pgp.get_key(ADDRESS, private=False) @@ -264,12 +264,12 @@ class OpenPGPCryptoTestCase(KeyManagerWithSoledadTestCase):      def test_decrypt_verify_with_wrong_key_raises(self):          pgp = openpgp.OpenPGPScheme(              self._soledad, gpgbinary=GPG_BINARY_PATH) -        pgp.put_ascii_key(PRIVATE_KEY) +        pgp.put_ascii_key(PRIVATE_KEY, ADDRESS)          data = 'data'          privkey = pgp.get_key(ADDRESS, private=True)          pubkey = pgp.get_key(ADDRESS, private=False)          encrypted_and_signed = pgp.encrypt(data, pubkey, sign=privkey) -        pgp.put_ascii_key(PUBLIC_KEY_2) +        pgp.put_ascii_key(PUBLIC_KEY_2, ADDRESS_2)          wrongkey = pgp.get_key(ADDRESS_2)          self.assertRaises(              errors.InvalidSignature, @@ -278,7 +278,7 @@ class OpenPGPCryptoTestCase(KeyManagerWithSoledadTestCase):      def test_sign_verify(self):          pgp = openpgp.OpenPGPScheme(              self._soledad, gpgbinary=GPG_BINARY_PATH) -        pgp.put_ascii_key(PRIVATE_KEY) +        pgp.put_ascii_key(PRIVATE_KEY, ADDRESS)          data = 'data'          privkey = pgp.get_key(ADDRESS, private=True)          signed = pgp.sign(data, privkey, detach=False) @@ -288,10 +288,10 @@ class OpenPGPCryptoTestCase(KeyManagerWithSoledadTestCase):      def test_encrypt_sign_decrypt_verify(self):          pgp = openpgp.OpenPGPScheme(              self._soledad, gpgbinary=GPG_BINARY_PATH) -        pgp.put_ascii_key(PRIVATE_KEY) +        pgp.put_ascii_key(PRIVATE_KEY, ADDRESS)          pubkey = pgp.get_key(ADDRESS, private=False)          privkey = pgp.get_key(ADDRESS, private=True) -        pgp.put_ascii_key(PRIVATE_KEY_2) +        pgp.put_ascii_key(PRIVATE_KEY_2, ADDRESS_2)          pubkey2 = pgp.get_key(ADDRESS_2, private=False)          privkey2 = pgp.get_key(ADDRESS_2, private=True)          data = 'data' @@ -304,7 +304,7 @@ class OpenPGPCryptoTestCase(KeyManagerWithSoledadTestCase):      def test_sign_verify_detached_sig(self):          pgp = openpgp.OpenPGPScheme(              self._soledad, gpgbinary=GPG_BINARY_PATH) -        pgp.put_ascii_key(PRIVATE_KEY) +        pgp.put_ascii_key(PRIVATE_KEY, ADDRESS)          data = 'data'          privkey = pgp.get_key(ADDRESS, private=True)          signature = pgp.sign(data, privkey, detach=True) @@ -316,7 +316,7 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase):      def test_get_all_keys_in_db(self):          km = self._key_manager() -        km._wrapper_map[OpenPGPKey].put_ascii_key(PRIVATE_KEY) +        km._wrapper_map[OpenPGPKey].put_ascii_key(PRIVATE_KEY, ADDRESS)          # get public keys          keys = km.get_all_keys(False)          self.assertEqual(len(keys), 1, 'Wrong number of keys') @@ -330,7 +330,7 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase):      def test_get_public_key(self):          km = self._key_manager() -        km._wrapper_map[OpenPGPKey].put_ascii_key(PRIVATE_KEY) +        km._wrapper_map[OpenPGPKey].put_ascii_key(PRIVATE_KEY, ADDRESS)          # get the key          key = km.get_key(ADDRESS, OpenPGPKey, private=False,                           fetch_remote=False) @@ -342,7 +342,7 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase):      def test_get_private_key(self):          km = self._key_manager() -        km._wrapper_map[OpenPGPKey].put_ascii_key(PRIVATE_KEY) +        km._wrapper_map[OpenPGPKey].put_ascii_key(PRIVATE_KEY, ADDRESS)          # get the key          key = km.get_key(ADDRESS, OpenPGPKey, private=True,                           fetch_remote=False) @@ -364,7 +364,7 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase):          """          token = "mytoken"          km = self._key_manager(token=token) -        km._wrapper_map[OpenPGPKey].put_ascii_key(PUBLIC_KEY) +        km._wrapper_map[OpenPGPKey].put_ascii_key(PUBLIC_KEY, ADDRESS)          km._fetcher.put = Mock()          # the following data will be used on the send          km.ca_cert_path = 'capath' @@ -447,7 +447,7 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase):          """          km = self._key_manager(url='http://nickserver.domain') -        km.put_raw_key(PUBLIC_KEY, OpenPGPKey) +        km.put_raw_key(PUBLIC_KEY, OpenPGPKey, ADDRESS)          key = km.get_key(ADDRESS, OpenPGPKey)          self.assertIsInstance(key, OpenPGPKey)          self.assertTrue(ADDRESS in key.address) @@ -509,7 +509,7 @@ class KeyManagerCryptoTestCase(KeyManagerWithSoledadTestCase):      def test_keymanager_openpgp_encrypt_decrypt(self):          km = self._key_manager()          # put raw private key -        km._wrapper_map[OpenPGPKey].put_ascii_key(PRIVATE_KEY) +        km._wrapper_map[OpenPGPKey].put_ascii_key(PRIVATE_KEY, ADDRESS)          # get public key          pubkey = km.get_key(              ADDRESS, OpenPGPKey, private=False, fetch_remote=False) @@ -526,7 +526,7 @@ class KeyManagerCryptoTestCase(KeyManagerWithSoledadTestCase):      def test_keymanager_openpgp_sign_verify(self):          km = self._key_manager()          # put raw private keys -        km._wrapper_map[OpenPGPKey].put_ascii_key(PRIVATE_KEY) +        km._wrapper_map[OpenPGPKey].put_ascii_key(PRIVATE_KEY, ADDRESS)          # get private key for signing          privkey = km.get_key(              ADDRESS, OpenPGPKey, private=True, fetch_remote=False) diff --git a/keymanager/src/leap/keymanager/tests/test_validation.py b/keymanager/src/leap/keymanager/tests/test_validation.py index 3ae873d..400d36e 100644 --- a/keymanager/src/leap/keymanager/tests/test_validation.py +++ b/keymanager/src/leap/keymanager/tests/test_validation.py @@ -37,47 +37,48 @@ class ValidationLevelTestCase(KeyManagerWithSoledadTestCase):      def test_none_old_key(self):          km = self._key_manager() -        km.put_raw_key(PUBLIC_KEY, OpenPGPKey) +        km.put_raw_key(PUBLIC_KEY, OpenPGPKey, ADDRESS)          key = km.get_key(ADDRESS, OpenPGPKey, fetch_remote=False)          self.assertEqual(key.fingerprint, KEY_FINGERPRINT)      def test_cant_upgrade(self):          km = self._key_manager() -        km.put_raw_key(PUBLIC_KEY, OpenPGPKey, +        km.put_raw_key(PUBLIC_KEY, OpenPGPKey, ADDRESS,                         validation=ValidationLevel.Provider_Trust)          self.assertRaises(KeyNotValidUpgrade, km.put_raw_key, UNRELATED_KEY, -                          OpenPGPKey) +                          OpenPGPKey, ADDRESS)      def test_fingerprint_level(self):          km = self._key_manager() -        km.put_raw_key(PUBLIC_KEY, OpenPGPKey) -        km.put_raw_key(UNRELATED_KEY, OpenPGPKey, +        km.put_raw_key(PUBLIC_KEY, OpenPGPKey, ADDRESS) +        km.put_raw_key(UNRELATED_KEY, OpenPGPKey, ADDRESS,                         validation=ValidationLevel.Fingerprint)          key = km.get_key(ADDRESS, OpenPGPKey, fetch_remote=False)          self.assertEqual(key.fingerprint, UNRELATED_FINGERPRINT)      def test_expired_key(self):          km = self._key_manager() -        km.put_raw_key(EXPIRED_KEY, OpenPGPKey) -        km.put_raw_key(UNRELATED_KEY, OpenPGPKey) +        km.put_raw_key(EXPIRED_KEY, OpenPGPKey, ADDRESS) +        km.put_raw_key(UNRELATED_KEY, OpenPGPKey, ADDRESS)          key = km.get_key(ADDRESS, OpenPGPKey, fetch_remote=False)          self.assertEqual(key.fingerprint, UNRELATED_FINGERPRINT)      def test_expired_fail_lower_level(self):          km = self._key_manager() -        km.put_raw_key(EXPIRED_KEY, OpenPGPKey, +        km.put_raw_key(EXPIRED_KEY, OpenPGPKey, ADDRESS,                         validation=ValidationLevel.Third_Party_Endorsement)          self.assertRaises(              KeyNotValidUpgrade,              km.put_raw_key,              UNRELATED_KEY,              OpenPGPKey, +            ADDRESS,              validation=ValidationLevel.Provider_Trust)      def test_roll_back(self):          km = self._key_manager() -        km.put_raw_key(EXPIRED_KEY_UPDATED, OpenPGPKey) -        km.put_raw_key(EXPIRED_KEY, OpenPGPKey) +        km.put_raw_key(EXPIRED_KEY_UPDATED, OpenPGPKey, ADDRESS) +        km.put_raw_key(EXPIRED_KEY, OpenPGPKey, ADDRESS)          key = km.get_key(ADDRESS, OpenPGPKey, fetch_remote=False)          self.assertEqual(key.expiry_date, EXPIRED_KEY_NEW_EXPIRY_DATE) diff --git a/keymanager/src/leap/keymanager/validation.py b/keymanager/src/leap/keymanager/validation.py index cf5b4a8..245013e 100644 --- a/keymanager/src/leap/keymanager/validation.py +++ b/keymanager/src/leap/keymanager/validation.py @@ -67,10 +67,6 @@ def can_upgrade(new_key, old_key):      if old_key is None:          return True -    if new_key.address != old_key.address: -        # XXX how do we map multiple IDs? (#6212) -        return False -      # An update of the same key      if new_key.fingerprint == old_key.fingerprint:          return True | 
