Implement multi uid support
author Ruben Pollan <meskio@sindominio.net>
Tue, 11 Nov 2014 00:36:59 +0000 (18:36 -0600)
committer Ruben Pollan <meskio@sindominio.net>
Wed, 19 Nov 2014 17:25:50 +0000 (11:25 -0600)
changes/feature-6212_multi_uid_support [new file with mode: 0644]
src/leap/keymanager/__init__.py
src/leap/keymanager/keys.py
src/leap/keymanager/openpgp.py
src/leap/keymanager/tests/test_keymanager.py
src/leap/keymanager/tests/test_validation.py
src/leap/keymanager/validation.py

diff --git a/changes/feature-6212_multi_uid_support b/changes/feature-6212_multi_uid_support
new file mode 100644 (file)
index 0000000..495b8c4
--- /dev/null
@@ -0,0 +1 @@
+- Multi uid support (closes #6212)
diff --git a/src/leap/keymanager/__init__.py b/src/leap/keymanager/__init__.py
index 562bfbf..c64cdea 100644 (file)
@@ -319,7 +319,6 @@ class KeyManager(object):
         return map(
             lambda doc: build_key_from_dict(
                 self._key_class_from_type(doc.content['type']),
-                doc.content['address'][0],
                 doc.content),
             self._soledad.get_from_index(
                 TAGS_PRIVATE_INDEX,
@@ -519,26 +518,25 @@ class KeyManager(object):
         except IndexError as e:
             leap_assert(False, "Unsupported key type. Error {0!r}".format(e))
 
-    def put_key(self, key, address=None):
+    def put_key(self, key, address):
         """
         Put C{key} in local storage.
 
         :param key: The key to be stored
         :type key: EncryptionKey
-        :param address: address for which this key will be active. If not set
-                        all the uids will be activated
+        :param address: address for which this key will be active
         :type address: str
 
         :raises KeyAddressMismatch: if address doesn't match any uid on the key
         :raises KeyNotValidUpdate: if a key with the same uid exists and the
                                    new one is not a valid update for it
         """
-        if address is not None and address not in key.address:
+        if address not in key.address:
             raise KeyAddressMismatch("UID %s found, but expected %s"
                                      % (str(key.address), address))
 
         try:
-            old_key = self._wrapper_map[type(key)].get_key(key.address[0],
+            old_key = self._wrapper_map[type(key)].get_key(address,
                                                            private=key.private)
         except KeyNotFound:
             old_key = None
@@ -553,7 +551,7 @@ class KeyManager(object):
             raise KeyNotValidUpgrade("Key %s can not be upgraded by new key %s"
                                      % (old_key.key_id, key.key_id))
 
-    def put_raw_key(self, key, ktype, address=None,
+    def put_raw_key(self, key, ktype, address,
                     validation=ValidationLevel.Weak_Chain):
         """
         Put C{key} in local storage.
diff --git a/src/leap/keymanager/keys.py b/src/leap/keymanager/keys.py
index d1b5d93..2108aa2 100644 (file)
@@ -109,20 +109,15 @@ def is_address(address):
     return bool(re.match('[\w.-]+@[\w.-]+', address))
 
 
-def build_key_from_dict(kClass, address, kdict):
+def build_key_from_dict(kClass, kdict):
     """
-    Build an C{kClass} key bound to C{address} based on info in C{kdict}.
+    Build an C{kClass} key based on info in C{kdict}.
 
-    :param address: The address bound to the key.
-    :type address: str
     :param kdict: Dictionary with key data.
     :type kdict: dict
     :return: An instance of the key.
     :rtype: C{kClass}
     """
-    leap_assert(
-        address in kdict[KEY_ADDRESS_KEY],
-        'Wrong address in key data.')
     try:
         validation = toValidationLevel(kdict[KEY_VALIDATION_KEY])
     except ValueError:
@@ -135,7 +130,7 @@ def build_key_from_dict(kClass, address, kdict):
     refreshed_at = _to_datetime(kdict[KEY_REFRESHED_AT_KEY])
 
     return kClass(
-        [address],
+        kdict[KEY_ADDRESS_KEY],
         key_id=kdict[KEY_ID_KEY],
         fingerprint=kdict[KEY_FINGERPRINT_KEY],
         key_data=kdict[KEY_DATA_KEY],
@@ -315,12 +310,14 @@ class EncryptionScheme(object):
         pass
 
     @abstractmethod
-    def put_key(self, key):
+    def put_key(self, key, address):
         """
         Put a key in local storage.
 
         :param key: The key to be stored.
         :type key: EncryptionKey
+        :param address: address for which this key will be active.
+        :type address: str
         """
         pass
 
diff --git a/src/leap/keymanager/openpgp.py b/src/leap/keymanager/openpgp.py
index 52655d0..4f96574 100644 (file)
@@ -38,6 +38,7 @@ from leap.keymanager.keys import (
     build_key_from_dict,
     TYPE_ID_PRIVATE_INDEX,
     TYPE_ADDRESS_PRIVATE_INDEX,
+    KEY_ADDRESS_KEY,
     KEY_FINGERPRINT_KEY,
     KEY_DATA_KEY,
     KEY_ID_KEY,
@@ -110,9 +111,9 @@ class TempGPGWrapper(object):
         # itself is enough to also have the public key in the keyring,
         # and we want to count the keys afterwards.
 
-        privaddrs = map(lambda privkey: privkey.address[0], privkeys)
+        privids = map(lambda privkey: privkey.key_id, privkeys)
         publkeys = filter(
-            lambda pubkey: pubkey.address[0] not in privaddrs, publkeys)
+            lambda pubkey: pubkey.key_id not in privids, publkeys)
 
         listkeys = lambda: self._gpg.list_keys()
         listsecretkeys = lambda: self._gpg.list_keys(secret=True)
@@ -163,16 +164,13 @@ class TempGPGWrapper(object):
             shutil.rmtree(self._gpg.homedir)
 
 
-def _build_key_from_gpg(address, key, key_data):
+def _build_key_from_gpg(key, key_data):
     """
-    Build an OpenPGPKey for C{address} based on C{key} from
-    local gpg storage.
+    Build an OpenPGPKey based on C{key} from local gpg storage.
 
     ASCII armored GPG key data has to be queried independently in this
     wrapper, so we receive it in C{key_data}.
 
-    :param address: The address bound to the key.
-    :type address: str
     :param key: Key obtained from GPG storage.
     :type key: dict
     :param key_data: Key data obtained from GPG storage.
@@ -183,9 +181,12 @@ def _build_key_from_gpg(address, key, key_data):
     expiry_date = None
     if key['expires']:
         expiry_date = datetime.fromtimestamp(int(key['expires']))
+    address = []
+    for uid in key['uids']:
+        address.append(_parse_address(uid))
 
     return OpenPGPKey(
-        [address],
+        address,
         key_id=key['keyid'],
         fingerprint=key['fingerprint'],
         key_data=key_data,
@@ -198,14 +199,18 @@ def _build_key_from_gpg(address, key, key_data):
 
 def _parse_address(address):
     """
-    Remove the identity suffix after the '+' until the '@'
+    Remove name, '<', '>' and the identity suffix after the '+' until the '@'
     e.g.: test_user+something@provider.com becomes test_user@provider.com
     since the key belongs to the identity without the '+' suffix.
 
     :type address: str
     :rtype: str
     """
-    return re.sub(r'\+.*\@', '@', address)
+    mail_regex = r'(.*<)?([\w.-]+)(\+.*)?(@[\w.-]+)(>.*)?'
+    match = re.match(mail_regex, address)
+    if match is None:
+        return None  # the uid carries no recognizable mail address
+    return ''.join(match.group(2, 4))
 
 
 #
@@ -289,15 +294,17 @@ class OpenPGPScheme(EncryptionScheme):
             leap_assert(
                 len(key['uids']) is 1,  # with just one uid!
                 'Wrong number of uids for key: %d.' % len(key['uids']))
-            leap_assert(
-                re.match('.*<%s>$' % address, key['uids'][0]) is not None,
-                'Key not correctly bound to address.')
+            uid_match = False
+            for uid in key['uids']:
+                if re.match('.*<%s>$' % re.escape(address), uid) is not None:
+                    uid_match = True
+                    break  # stop scanning; the keys are stored further below
+            leap_assert(uid_match, 'Key not correctly bound to address.')
             # insert both public and private keys in storage
             for secret in [True, False]:
                 key = gpg.list_keys(secret=secret).pop()
                 openpgp_key = _build_key_from_gpg(
-                    address, key,
-                    gpg.export_keys(key['fingerprint'], secret=secret))
+                    key, gpg.export_keys(key['fingerprint'], secret=secret))
                 self.put_key(openpgp_key, address)
 
         return self.get_key(address, private=True)
@@ -320,7 +327,10 @@ class OpenPGPScheme(EncryptionScheme):
         doc = self._get_key_doc(address, private)
         if doc is None:
             raise errors.KeyNotFound(address)
-        return build_key_from_dict(OpenPGPKey, address, doc.content)
+        leap_assert(
+            address in doc.content[KEY_ADDRESS_KEY],
+            'Wrong address in key data.')
+        return build_key_from_dict(OpenPGPKey, doc.content)
 
     def parse_ascii_key(self, key_data):
         """
@@ -337,7 +347,6 @@ class OpenPGPScheme(EncryptionScheme):
         leap_assert_type(key_data, (str, unicode))
         # TODO: add more checks for correct key data.
         leap_assert(key_data is not None, 'Data does not represent a key.')
-        mail_regex = '.*<([\w.-]+@[\w.-]+)>.*'
 
         with self._temporary_gpgwrapper() as gpg:
             # TODO: inspect result, or use decorator
@@ -354,44 +363,30 @@ class OpenPGPScheme(EncryptionScheme):
             except IndexError:
                 return (None, None)
 
-            # extract adress from first uid on key
-            match = re.match(mail_regex, pubkey['uids'].pop())
-            leap_assert(match is not None, 'No user address in key data.')
-            address = match.group(1)
-
             openpgp_privkey = None
             if privkey is not None:
-                match = re.match(mail_regex, privkey['uids'].pop())
-                leap_assert(match is not None, 'No user address in key data.')
-                privaddress = match.group(1)
-
                 # build private key
                 openpgp_privkey = _build_key_from_gpg(
-                    privaddress, privkey,
+                    privkey,
                     gpg.export_keys(privkey['fingerprint'], secret=True))
-
-                leap_check(address == privaddress,
-                           'Addresses in public and private key differ.',
-                           errors.KeyAddressMismatch)
                 leap_check(pubkey['fingerprint'] == privkey['fingerprint'],
                            'Fingerprints for public and private key differ.',
                            errors.KeyFingerprintMismatch)
 
             # build public key
             openpgp_pubkey = _build_key_from_gpg(
-                address, pubkey,
+                pubkey,
                 gpg.export_keys(pubkey['fingerprint'], secret=False))
 
             return (openpgp_pubkey, openpgp_privkey)
 
-    def put_ascii_key(self, key_data, address=None):
+    def put_ascii_key(self, key_data, address):
         """
         Put key contained in ascii-armored C{key_data} in local storage.
 
         :param key_data: The key data to be stored.
         :type key_data: str or unicode
-        :param address: address for which this key will be active. If not set
-                        all the uids will be activated
+        :param address: address for which this key will be active
         :type address: str
         """
         leap_assert_type(key_data, (str, unicode))
@@ -407,23 +402,17 @@ class OpenPGPScheme(EncryptionScheme):
         if openpgp_privkey is not None:
             self.put_key(openpgp_privkey, address)
 
-    def put_key(self, key, address=None):
+    def put_key(self, key, address):
         """
         Put C{key} in local storage.
 
         :param key: The key to be stored.
         :type key: OpenPGPKey
-        :param address: address for which this key will be active. If not set
-                        all the uids will be activated
+        :param address: address for which this key will be active.
         :type address: str
         """
-        if address is not None:
-            active_address = [_parse_address(address)]
-        else:
-            active_address = key.address
-
         self._put_key_doc(key)
-        self._put_active_doc(key, active_address)
+        self._put_active_doc(key, address)
 
     def _put_key_doc(self, key):
         """
@@ -445,7 +434,7 @@ class OpenPGPScheme(EncryptionScheme):
                     gpg.import_keys(key.key_data)
                     gpgkey = gpg.list_keys(secret=key.private).pop()
                     key = _build_key_from_gpg(
-                        key.address[0], gpgkey,
+                        gpgkey,
                         gpg.export_keys(gpgkey['fingerprint'],
                                         secret=key.private))
                 doc.set_json(key.get_json())
@@ -458,31 +447,30 @@ class OpenPGPScheme(EncryptionScheme):
         else:
             self._soledad.create_doc_from_json(key.get_json())
 
-    def _put_active_doc(self, key, addresses):
+    def _put_active_doc(self, key, address):
         """
         Put active key document in soledad
 
         :type key: OpenPGPKey
-        :type addresses: list(str)
-        """
-        for address in addresses:
-            docs = self._soledad.get_from_index(
-                TYPE_ADDRESS_PRIVATE_INDEX,
-                self.ACTIVE_TYPE,
-                address,
-                '1' if key.private else '0')
-            if len(docs) == 1:
-                doc = docs.pop()
-                doc.set_json(key.get_active_json(address))
-                self._soledad.put_doc(doc)
-            else:
-                if len(docs) > 1:
-                    logger.error("There is more than one active key document "
-                                 "for the address %s" % (address,))
-                    for doc in docs:
-                        self._soledad.delete_doc(doc)
-                self._soledad.create_doc_from_json(
-                    key.get_active_json(address))
+        :type address: str
+        """
+        docs = self._soledad.get_from_index(
+            TYPE_ADDRESS_PRIVATE_INDEX,
+            self.ACTIVE_TYPE,
+            address,
+            '1' if key.private else '0')
+        if len(docs) == 1:
+            doc = docs.pop()
+            doc.set_json(key.get_active_json(address))
+            self._soledad.put_doc(doc)
+        else:
+            if len(docs) > 1:  # duplicates: delete them all, recreate below
+                logger.error("There is more than one active key document "
+                             "for the address %s" % (address,))
+                for doc in docs:
+                    self._soledad.delete_doc(doc)
+            self._soledad.create_doc_from_json(
+                key.get_active_json(address))
 
     def _get_key_doc(self, address, private=False):
         """
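diff --git a/src/leap/keymanager/tests/test_keymanager.py b/src/leap/keymanager/tests/test_keymanager.py
index 4daf346..6aeb67a 100644 (file)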
@@ -89,7 +89,7 @@ class KeyManagerUtilTestCase(BaseLeapTest):
             'encr_used': False,
             'sign_used': True,
         }
-        key = build_key_from_dict(OpenPGPKey, ADDRESS, kdict)
+        key = build_key_from_dict(OpenPGPKey, kdict)
         self.assertEqual(
             kdict['address'], key.address,
             'Wrong data in key.')
@@ -144,7 +144,7 @@ class OpenPGPCryptoTestCase(KeyManagerWithSoledadTestCase):
         pgp = openpgp.OpenPGPScheme(
             self._soledad, gpgbinary=GPG_BINARY_PATH)
         self.assertRaises(KeyNotFound, pgp.get_key, ADDRESS)
-        pgp.put_ascii_key(PUBLIC_KEY)
+        pgp.put_ascii_key(PUBLIC_KEY, ADDRESS)
         key = pgp.get_key(ADDRESS, private=False)
         pgp.delete_key(key)
         self.assertRaises(KeyNotFound, pgp.get_key, ADDRESS)
@@ -153,7 +153,7 @@ class OpenPGPCryptoTestCase(KeyManagerWithSoledadTestCase):
         pgp = openpgp.OpenPGPScheme(
             self._soledad, gpgbinary=GPG_BINARY_PATH)
         self.assertRaises(KeyNotFound, pgp.get_key, ADDRESS)
-        pgp.put_ascii_key(PUBLIC_KEY)
+        pgp.put_ascii_key(PUBLIC_KEY, ADDRESS)
         key = pgp.get_key(ADDRESS, private=False)
         self.assertIsInstance(key, openpgp.OpenPGPKey)
         self.assertTrue(
@@ -167,7 +167,7 @@ class OpenPGPCryptoTestCase(KeyManagerWithSoledadTestCase):
         pgp = openpgp.OpenPGPScheme(
             self._soledad, gpgbinary=GPG_BINARY_PATH)
         self.assertRaises(KeyNotFound, pgp.get_key, ADDRESS)
-        pgp.put_ascii_key(PUBLIC_KEY)
+        pgp.put_ascii_key(PUBLIC_KEY, ADDRESS)
         self.assertRaises(
             KeyNotFound, pgp.get_key, ADDRESS, private=True)
         key = pgp.get_key(ADDRESS, private=False)
@@ -181,7 +181,7 @@ class OpenPGPCryptoTestCase(KeyManagerWithSoledadTestCase):
         # encrypt
         pgp = openpgp.OpenPGPScheme(
             self._soledad, gpgbinary=GPG_BINARY_PATH)
-        pgp.put_ascii_key(PUBLIC_KEY)
+        pgp.put_ascii_key(PUBLIC_KEY, ADDRESS)
         pubkey = pgp.get_key(ADDRESS, private=False)
         cyphertext = pgp.encrypt('data', pubkey)
         # assert
@@ -193,7 +193,7 @@ class OpenPGPCryptoTestCase(KeyManagerWithSoledadTestCase):
         # decrypt
         self.assertRaises(
             KeyNotFound, pgp.get_key, ADDRESS, private=True)
-        pgp.put_ascii_key(PRIVATE_KEY)
+        pgp.put_ascii_key(PRIVATE_KEY, ADDRESS)
         privkey = pgp.get_key(ADDRESS, private=True)
         pgp.delete_key(pubkey)
         pgp.delete_key(privkey)
@@ -216,7 +216,7 @@ class OpenPGPCryptoTestCase(KeyManagerWithSoledadTestCase):
     def test_sign_with_public_raises(self):
         pgp = openpgp.OpenPGPScheme(
             self._soledad, gpgbinary=GPG_BINARY_PATH)
-        pgp.put_ascii_key(PUBLIC_KEY)
+        pgp.put_ascii_key(PUBLIC_KEY, ADDRESS)
         data = 'data'
         pubkey = pgp.get_key(ADDRESS, private=False)
         self.assertRaises(
@@ -226,11 +226,11 @@ class OpenPGPCryptoTestCase(KeyManagerWithSoledadTestCase):
     def test_verify_with_wrong_key_raises(self):
         pgp = openpgp.OpenPGPScheme(
             self._soledad, gpgbinary=GPG_BINARY_PATH)
-        pgp.put_ascii_key(PRIVATE_KEY)
+        pgp.put_ascii_key(PRIVATE_KEY, ADDRESS)
         data = 'data'
         privkey = pgp.get_key(ADDRESS, private=True)
         signed = pgp.sign(data, privkey)
-        pgp.put_ascii_key(PUBLIC_KEY_2)
+        pgp.put_ascii_key(PUBLIC_KEY_2, ADDRESS_2)
         wrongkey = pgp.get_key(ADDRESS_2)
         self.assertRaises(
             errors.InvalidSignature,
@@ -239,7 +239,7 @@ class OpenPGPCryptoTestCase(KeyManagerWithSoledadTestCase):
     def test_encrypt_sign_with_public_raises(self):
         pgp = openpgp.OpenPGPScheme(
             self._soledad, gpgbinary=GPG_BINARY_PATH)
-        pgp.put_ascii_key(PRIVATE_KEY)
+        pgp.put_ascii_key(PRIVATE_KEY, ADDRESS)
         data = 'data'
         privkey = pgp.get_key(ADDRESS, private=True)
         pubkey = pgp.get_key(ADDRESS, private=False)
@@ -250,7 +250,7 @@ class OpenPGPCryptoTestCase(KeyManagerWithSoledadTestCase):
     def test_decrypt_verify_with_private_raises(self):
         pgp = openpgp.OpenPGPScheme(
             self._soledad, gpgbinary=GPG_BINARY_PATH)
-        pgp.put_ascii_key(PRIVATE_KEY)
+        pgp.put_ascii_key(PRIVATE_KEY, ADDRESS)
         data = 'data'
         privkey = pgp.get_key(ADDRESS, private=True)
         pubkey = pgp.get_key(ADDRESS, private=False)
@@ -264,12 +264,12 @@ class OpenPGPCryptoTestCase(KeyManagerWithSoledadTestCase):
     def test_decrypt_verify_with_wrong_key_raises(self):
         pgp = openpgp.OpenPGPScheme(
             self._soledad, gpgbinary=GPG_BINARY_PATH)
-        pgp.put_ascii_key(PRIVATE_KEY)
+        pgp.put_ascii_key(PRIVATE_KEY, ADDRESS)
         data = 'data'
         privkey = pgp.get_key(ADDRESS, private=True)
         pubkey = pgp.get_key(ADDRESS, private=False)
         encrypted_and_signed = pgp.encrypt(data, pubkey, sign=privkey)
-        pgp.put_ascii_key(PUBLIC_KEY_2)
+        pgp.put_ascii_key(PUBLIC_KEY_2, ADDRESS_2)
         wrongkey = pgp.get_key(ADDRESS_2)
         self.assertRaises(
             errors.InvalidSignature,
@@ -278,7 +278,7 @@ class OpenPGPCryptoTestCase(KeyManagerWithSoledadTestCase):
     def test_sign_verify(self):
         pgp = openpgp.OpenPGPScheme(
             self._soledad, gpgbinary=GPG_BINARY_PATH)
-        pgp.put_ascii_key(PRIVATE_KEY)
+        pgp.put_ascii_key(PRIVATE_KEY, ADDRESS)
         data = 'data'
         privkey = pgp.get_key(ADDRESS, private=True)
         signed = pgp.sign(data, privkey, detach=False)
@@ -288,10 +288,10 @@ class OpenPGPCryptoTestCase(KeyManagerWithSoledadTestCase):
     def test_encrypt_sign_decrypt_verify(self):
         pgp = openpgp.OpenPGPScheme(
             self._soledad, gpgbinary=GPG_BINARY_PATH)
-        pgp.put_ascii_key(PRIVATE_KEY)
+        pgp.put_ascii_key(PRIVATE_KEY, ADDRESS)
         pubkey = pgp.get_key(ADDRESS, private=False)
         privkey = pgp.get_key(ADDRESS, private=True)
-        pgp.put_ascii_key(PRIVATE_KEY_2)
+        pgp.put_ascii_key(PRIVATE_KEY_2, ADDRESS_2)
         pubkey2 = pgp.get_key(ADDRESS_2, private=False)
         privkey2 = pgp.get_key(ADDRESS_2, private=True)
         data = 'data'
@@ -304,7 +304,7 @@ class OpenPGPCryptoTestCase(KeyManagerWithSoledadTestCase):
     def test_sign_verify_detached_sig(self):
         pgp = openpgp.OpenPGPScheme(
             self._soledad, gpgbinary=GPG_BINARY_PATH)
-        pgp.put_ascii_key(PRIVATE_KEY)
+        pgp.put_ascii_key(PRIVATE_KEY, ADDRESS)
         data = 'data'
         privkey = pgp.get_key(ADDRESS, private=True)
         signature = pgp.sign(data, privkey, detach=True)
@@ -316,7 +316,7 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase):
 
     def test_get_all_keys_in_db(self):
         km = self._key_manager()
-        km._wrapper_map[OpenPGPKey].put_ascii_key(PRIVATE_KEY)
+        km._wrapper_map[OpenPGPKey].put_ascii_key(PRIVATE_KEY, ADDRESS)
         # get public keys
         keys = km.get_all_keys(False)
         self.assertEqual(len(keys), 1, 'Wrong number of keys')
@@ -330,7 +330,7 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase):
 
     def test_get_public_key(self):
         km = self._key_manager()
-        km._wrapper_map[OpenPGPKey].put_ascii_key(PRIVATE_KEY)
+        km._wrapper_map[OpenPGPKey].put_ascii_key(PRIVATE_KEY, ADDRESS)
         # get the key
         key = km.get_key(ADDRESS, OpenPGPKey, private=False,
                          fetch_remote=False)
@@ -342,7 +342,7 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase):
 
     def test_get_private_key(self):
         km = self._key_manager()
-        km._wrapper_map[OpenPGPKey].put_ascii_key(PRIVATE_KEY)
+        km._wrapper_map[OpenPGPKey].put_ascii_key(PRIVATE_KEY, ADDRESS)
         # get the key
         key = km.get_key(ADDRESS, OpenPGPKey, private=True,
                          fetch_remote=False)
@@ -364,7 +364,7 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase):
         """
         token = "mytoken"
         km = self._key_manager(token=token)
-        km._wrapper_map[OpenPGPKey].put_ascii_key(PUBLIC_KEY)
+        km._wrapper_map[OpenPGPKey].put_ascii_key(PUBLIC_KEY, ADDRESS)
         km._fetcher.put = Mock()
         # the following data will be used on the send
         km.ca_cert_path = 'capath'
@@ -447,7 +447,7 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase):
         """
         km = self._key_manager(url='http://nickserver.domain')
 
-        km.put_raw_key(PUBLIC_KEY, OpenPGPKey)
+        km.put_raw_key(PUBLIC_KEY, OpenPGPKey, ADDRESS)
         key = km.get_key(ADDRESS, OpenPGPKey)
         self.assertIsInstance(key, OpenPGPKey)
         self.assertTrue(ADDRESS in key.address)
@@ -509,7 +509,7 @@ class KeyManagerCryptoTestCase(KeyManagerWithSoledadTestCase):
     def test_keymanager_openpgp_encrypt_decrypt(self):
         km = self._key_manager()
         # put raw private key
-        km._wrapper_map[OpenPGPKey].put_ascii_key(PRIVATE_KEY)
+        km._wrapper_map[OpenPGPKey].put_ascii_key(PRIVATE_KEY, ADDRESS)
         # get public key
         pubkey = km.get_key(
             ADDRESS, OpenPGPKey, private=False, fetch_remote=False)
@@ -526,7 +526,7 @@ class KeyManagerCryptoTestCase(KeyManagerWithSoledadTestCase):
     def test_keymanager_openpgp_sign_verify(self):
         km = self._key_manager()
         # put raw private keys
-        km._wrapper_map[OpenPGPKey].put_ascii_key(PRIVATE_KEY)
+        km._wrapper_map[OpenPGPKey].put_ascii_key(PRIVATE_KEY, ADDRESS)
         # get private key for signing
         privkey = km.get_key(
             ADDRESS, OpenPGPKey, private=True, fetch_remote=False)
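diff --git a/src/leap/keymanager/tests/test_validation.py b/src/leap/keymanager/tests/test_validation.py
index 3ae873d..400d36e 100644 (file)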
@@ -37,47 +37,48 @@ class ValidationLevelTestCase(KeyManagerWithSoledadTestCase):
 
     def test_none_old_key(self):
         km = self._key_manager()
-        km.put_raw_key(PUBLIC_KEY, OpenPGPKey)
+        km.put_raw_key(PUBLIC_KEY, OpenPGPKey, ADDRESS)
         key = km.get_key(ADDRESS, OpenPGPKey, fetch_remote=False)
         self.assertEqual(key.fingerprint, KEY_FINGERPRINT)
 
     def test_cant_upgrade(self):
         km = self._key_manager()
-        km.put_raw_key(PUBLIC_KEY, OpenPGPKey,
+        km.put_raw_key(PUBLIC_KEY, OpenPGPKey, ADDRESS,
                        validation=ValidationLevel.Provider_Trust)
         self.assertRaises(KeyNotValidUpgrade, km.put_raw_key, UNRELATED_KEY,
-                          OpenPGPKey)
+                          OpenPGPKey, ADDRESS)
 
     def test_fingerprint_level(self):
         km = self._key_manager()
-        km.put_raw_key(PUBLIC_KEY, OpenPGPKey)
-        km.put_raw_key(UNRELATED_KEY, OpenPGPKey,
+        km.put_raw_key(PUBLIC_KEY, OpenPGPKey, ADDRESS)
+        km.put_raw_key(UNRELATED_KEY, OpenPGPKey, ADDRESS,
                        validation=ValidationLevel.Fingerprint)
         key = km.get_key(ADDRESS, OpenPGPKey, fetch_remote=False)
         self.assertEqual(key.fingerprint, UNRELATED_FINGERPRINT)
 
     def test_expired_key(self):
         km = self._key_manager()
-        km.put_raw_key(EXPIRED_KEY, OpenPGPKey)
-        km.put_raw_key(UNRELATED_KEY, OpenPGPKey)
+        km.put_raw_key(EXPIRED_KEY, OpenPGPKey, ADDRESS)
+        km.put_raw_key(UNRELATED_KEY, OpenPGPKey, ADDRESS)
         key = km.get_key(ADDRESS, OpenPGPKey, fetch_remote=False)
         self.assertEqual(key.fingerprint, UNRELATED_FINGERPRINT)
 
     def test_expired_fail_lower_level(self):
         km = self._key_manager()
-        km.put_raw_key(EXPIRED_KEY, OpenPGPKey,
+        km.put_raw_key(EXPIRED_KEY, OpenPGPKey, ADDRESS,
                        validation=ValidationLevel.Third_Party_Endorsement)
         self.assertRaises(
             KeyNotValidUpgrade,
             km.put_raw_key,
             UNRELATED_KEY,
             OpenPGPKey,
+            ADDRESS,
             validation=ValidationLevel.Provider_Trust)
 
     def test_roll_back(self):
         km = self._key_manager()
-        km.put_raw_key(EXPIRED_KEY_UPDATED, OpenPGPKey)
-        km.put_raw_key(EXPIRED_KEY, OpenPGPKey)
+        km.put_raw_key(EXPIRED_KEY_UPDATED, OpenPGPKey, ADDRESS)
+        km.put_raw_key(EXPIRED_KEY, OpenPGPKey, ADDRESS)
         key = km.get_key(ADDRESS, OpenPGPKey, fetch_remote=False)
         self.assertEqual(key.expiry_date, EXPIRED_KEY_NEW_EXPIRY_DATE)
 
diff --git a/src/leap/keymanager/validation.py b/src/leap/keymanager/validation.py
index cf5b4a8..245013e 100644 (file)
@@ -67,10 +67,6 @@ def can_upgrade(new_key, old_key):
     if old_key is None:
         return True
 
-    if new_key.address != old_key.address:
-        # XXX how do we map multiple IDs? (#6212)
-        return False
-
     # An update of the same key
     if new_key.fingerprint == old_key.fingerprint:
         return True