From 56b12a84635bd99ed8cca4db9baf5e4160e8d13b Mon Sep 17 00:00:00 2001 From: Ruben Pollan Date: Fri, 18 Dec 2015 20:31:18 +0100 Subject: [feat] Use fingerprints instead of key ids - Resolves: #7500 --- keymanager/src/leap/keymanager/__init__.py | 6 +- keymanager/src/leap/keymanager/keys.py | 21 ++-- keymanager/src/leap/keymanager/migrator.py | 8 +- keymanager/src/leap/keymanager/openpgp.py | 43 ++++--- keymanager/src/leap/keymanager/tests/__init__.py | 1 - .../src/leap/keymanager/tests/test_keymanager.py | 5 - .../src/leap/keymanager/tests/test_openpgp.py | 123 ++++++++------------- keymanager/src/leap/keymanager/validation.py | 4 +- 8 files changed, 91 insertions(+), 120 deletions(-) (limited to 'keymanager/src/leap') diff --git a/keymanager/src/leap/keymanager/__init__.py b/keymanager/src/leap/keymanager/__init__.py index 8b3487f9..8a4efbe9 100644 --- a/keymanager/src/leap/keymanager/__init__.py +++ b/keymanager/src/leap/keymanager/__init__.py @@ -642,7 +642,7 @@ class KeyManager(object): else: signature = InvalidSignature( 'Failed to verify signature with key %s' % - (pubkey.key_id,)) + (pubkey.fingerprint,)) defer.returnValue((decrypted, signature)) dpriv = self.get_key(address, ktype, private=True) @@ -741,7 +741,7 @@ class KeyManager(object): else: raise InvalidSignature( 'Failed to verify signature with key %s' % - (pubkey.key_id,)) + (pubkey.fingerprint,)) d = self.get_key(address, ktype, private=False, fetch_remote=fetch_remote) @@ -804,7 +804,7 @@ class KeyManager(object): else: raise KeyNotValidUpgrade( "Key %s can not be upgraded by new key %s" - % (old_key.key_id, key.key_id)) + % (old_key.fingerprint, key.fingerprint)) d = _keys.get_key(address, private=key.private) d.addErrback(old_key_not_found) diff --git a/keymanager/src/leap/keymanager/keys.py b/keymanager/src/leap/keymanager/keys.py index a60c19d3..68e3fada 100644 --- a/keymanager/src/leap/keymanager/keys.py +++ b/keymanager/src/leap/keymanager/keys.py @@ -48,7 +48,6 @@ logger = logging.getLogger(__name__) KEY_VERSION_KEY = 'version' KEY_ADDRESS_KEY = 'address' KEY_TYPE_KEY = 'type' -KEY_ID_KEY = 'key_id' KEY_FINGERPRINT_KEY = 'fingerprint' KEY_DATA_KEY = 'key_data' KEY_PRIVATE_KEY = 'private' @@ -80,16 +79,16 @@ KEYMANAGER_DOC_VERSION = 1 # TAGS_PRIVATE_INDEX = 'by-tags-private' -TYPE_ID_PRIVATE_INDEX = 'by-type-id-private' +TYPE_FINGERPRINT_PRIVATE_INDEX = 'by-type-fingerprint-private' TYPE_ADDRESS_PRIVATE_INDEX = 'by-type-address-private' INDEXES = { TAGS_PRIVATE_INDEX: [ KEY_TAGS_KEY, 'bool(%s)' % KEY_PRIVATE_KEY, ], - TYPE_ID_PRIVATE_INDEX: [ + TYPE_FINGERPRINT_PRIVATE_INDEX: [ KEY_TYPE_KEY, - KEY_ID_KEY, + KEY_FINGERPRINT_KEY, 'bool(%s)' % KEY_PRIVATE_KEY, ], TYPE_ADDRESS_PRIVATE_INDEX: [ @@ -137,7 +136,8 @@ def build_key_from_dict(kClass, key, active=None): validation = ValidationLevels.get(active[KEY_VALIDATION_KEY]) except ValueError: logger.error("Not valid validation level (%s) for key %s", - (active[KEY_VALIDATION_KEY], active[KEY_ID_KEY])) + (active[KEY_VALIDATION_KEY], + active[KEY_FINGERPRINT_KEY])) last_audited_at = _to_datetime(active[KEY_LAST_AUDITED_AT_KEY]) encr_used = active[KEY_ENCR_USED_KEY] sign_used = active[KEY_SIGN_USED_KEY] @@ -147,7 +147,6 @@ def build_key_from_dict(kClass, key, active=None): return kClass( key[KEY_ADDRESS_KEY], - key_id=key[KEY_ID_KEY], fingerprint=key[KEY_FINGERPRINT_KEY], key_data=key[KEY_DATA_KEY], private=key[KEY_PRIVATE_KEY], @@ -189,13 +188,12 @@ class EncryptionKey(object): __metaclass__ = ABCMeta - def __init__(self, address, key_id="", fingerprint="", + def __init__(self, address, fingerprint="", key_data="", private=False, length=0, expiry_date=None, validation=ValidationLevels.Weak_Chain, last_audited_at=None, refreshed_at=None, encr_used=False, sign_used=False): # TODO: it should know its own active address self.address = address - self.key_id = key_id self.fingerprint = fingerprint self.key_data = key_data self.private = private @@ -221,7 +219,6 @@ class EncryptionKey(object): return json.dumps({ KEY_ADDRESS_KEY: self.address, KEY_TYPE_KEY: self.__class__.__name__, - KEY_ID_KEY: self.key_id, KEY_FINGERPRINT_KEY: self.fingerprint, KEY_DATA_KEY: self.key_data, KEY_PRIVATE_KEY: self.private, @@ -244,7 +241,7 @@ class EncryptionKey(object): return json.dumps({ KEY_ADDRESS_KEY: address, KEY_TYPE_KEY: self.__class__.__name__ + KEYMANAGER_ACTIVE_TYPE, - KEY_ID_KEY: self.key_id, + KEY_FINGERPRINT_KEY: self.fingerprint, KEY_PRIVATE_KEY: self.private, KEY_VALIDATION_KEY: str(self.validation), KEY_LAST_AUDITED_AT_KEY: last_audited_at, @@ -260,7 +257,7 @@ class EncryptionKey(object): """ return u"<%s 0x%s (%s - %s)>" % ( self.__class__.__name__, - self.key_id, + self.fingerprint, self.address, "priv" if self.private else "publ") @@ -519,7 +516,7 @@ class EncryptionScheme(object): """ def log_active_doc(doc): logger.error("\t%s: %s" % (doc.content[KEY_ADDRESS_KEY], - doc.content[KEY_ID_KEY])) + doc.content[KEY_FINGERPRINT_KEY])) def cmp_active(d1, d2): res = cmp(d1.content[KEY_LAST_AUDITED_AT_KEY], diff --git a/keymanager/src/leap/keymanager/migrator.py b/keymanager/src/leap/keymanager/migrator.py index b59647a2..11cf2439 100644 --- a/keymanager/src/leap/keymanager/migrator.py +++ b/keymanager/src/leap/keymanager/migrator.py @@ -33,7 +33,7 @@ from leap.keymanager.keys import ( KEYMANAGER_DOC_VERSION, KEY_VERSION_KEY, - KEY_ID_KEY, + KEY_FINGERPRINT_KEY, KEY_VALIDATION_KEY, KEY_LAST_AUDITED_AT_KEY, KEY_ENCR_USED_KEY, @@ -42,6 +42,8 @@ from leap.keymanager.keys import ( from leap.keymanager.validation import ValidationLevels +KEY_ID_KEY = 'key_id' + KeyDocs = namedtuple("KeyDocs", ['key', 'active']) @@ -132,6 +134,7 @@ class KeyDocumentsMigrator(object): last_audited = 0 encr_used = False sign_used = False + fingerprint = key.content[KEY_FINGERPRINT_KEY] if len(actives) == 1 and KEY_VERSION_KEY not in key.content: # we can preserve the validation of the key if there is only one # active address for the key @@ -146,10 +149,12 @@ class KeyDocumentsMigrator(object): continue active.content[KEY_VERSION_KEY] = KEYMANAGER_DOC_VERSION + active.content[KEY_FINGERPRINT_KEY] = fingerprint active.content[KEY_VALIDATION_KEY] = validation active.content[KEY_LAST_AUDITED_AT_KEY] = last_audited active.content[KEY_ENCR_USED_KEY] = encr_used active.content[KEY_SIGN_USED_KEY] = sign_used + del active.content[KEY_ID_KEY] d = self._soledad.put_doc(active) deferreds.append(d) return gatherResults(deferreds) @@ -159,6 +164,7 @@ class KeyDocumentsMigrator(object): return succeed(None) key.content[KEY_VERSION_KEY] = KEYMANAGER_DOC_VERSION + del key.content[KEY_ID_KEY] del key.content[KEY_VALIDATION_KEY] del key.content[KEY_LAST_AUDITED_AT_KEY] del key.content[KEY_ENCR_USED_KEY] diff --git a/keymanager/src/leap/keymanager/openpgp.py b/keymanager/src/leap/keymanager/openpgp.py index 3c8ac1e1..0f162969 100644 --- a/keymanager/src/leap/keymanager/openpgp.py +++ b/keymanager/src/leap/keymanager/openpgp.py @@ -39,10 +39,10 @@ from leap.keymanager.keys import ( EncryptionScheme, is_address, build_key_from_dict, - TYPE_ID_PRIVATE_INDEX, + TYPE_FINGERPRINT_PRIVATE_INDEX, TYPE_ADDRESS_PRIVATE_INDEX, KEY_ADDRESS_KEY, - KEY_ID_KEY, + KEY_FINGERPRINT_KEY, KEYMANAGER_ACTIVE_TYPE, ) @@ -122,9 +122,9 @@ class TempGPGWrapper(object): # itself is enough to also have the public key in the keyring, # and we want to count the keys afterwards. - privids = map(lambda privkey: privkey.key_id, privkeys) + privfps = map(lambda privkey: privkey.fingerprint, privkeys) publkeys = filter( - lambda pubkey: pubkey.key_id not in privids, publkeys) + lambda pubkey: pubkey.fingerprint not in privfps, publkeys) listkeys = lambda: self._gpg.list_keys() listsecretkeys = lambda: self._gpg.list_keys(secret=True) @@ -213,7 +213,7 @@ class OpenPGPKey(EncryptionKey): :rtype: list(str) """ with TempGPGWrapper(keys=[self], gpgbinary=self._gpgbinary) as gpg: - res = gpg.list_sigs(self.key_id) + res = gpg.list_sigs(self.fingerprint) for uid, sigs in res.sigs.iteritems(): if _parse_address(uid) in self.address: return sigs @@ -370,7 +370,7 @@ class OpenPGPScheme(EncryptionScheme): leap_assert( address in keydoc.content[KEY_ADDRESS_KEY], 'Wrong address in key %s. Expected %s, found %s.' - % (keydoc.content[KEY_ID_KEY], address, + % (keydoc.content[KEY_FINGERPRINT_KEY], address, keydoc.content[KEY_ADDRESS_KEY])) key = build_key_from_dict(OpenPGPKey, keydoc.content, activedoc.content) @@ -493,7 +493,7 @@ class OpenPGPScheme(EncryptionScheme): deferreds.append(d) return defer.gatherResults(deferreds) - dk = self._get_key_doc_from_keyid(key.key_id, key.private) + dk = self._get_key_doc_from_fingerprint(key.fingerprint, key.private) da = self._get_active_doc_from_address(address, key.private) d = defer.gatherResults([dk, da]) d.addCallback(merge_and_put) @@ -517,8 +517,8 @@ class OpenPGPScheme(EncryptionScheme): def get_key_from_active_doc(activedoc): if not activedoc: return (None, None) - key_id = activedoc.content[KEY_ID_KEY] - d = self._get_key_doc_from_keyid(key_id, private) + fingerprint = activedoc.content[KEY_FINGERPRINT_KEY] + d = self._get_key_doc_from_fingerprint(fingerprint, private) d.addCallback(delete_active_if_no_key, activedoc) return d @@ -573,17 +573,17 @@ class OpenPGPScheme(EncryptionScheme): def get_key_docs(_): return self._soledad.get_from_index( - TYPE_ID_PRIVATE_INDEX, + TYPE_FINGERPRINT_PRIVATE_INDEX, self.KEY_TYPE, - key.key_id, + key.fingerprint, '1' if key.private else '0') def delete_key(docs): if len(docs) == 0: raise errors.KeyNotFound(key) elif len(docs) > 1: - logger.warning("There is more than one key for key_id %s" - % key.key_id) + logger.warning("There is more than one key for fingerprint %s" + % key.fingerprint) has_deleted = False deferreds = [] @@ -597,9 +597,9 @@ class OpenPGPScheme(EncryptionScheme): return defer.gatherResults(deferreds) d = self._soledad.get_from_index( - TYPE_ID_PRIVATE_INDEX, + TYPE_FINGERPRINT_PRIVATE_INDEX, self.ACTIVE_TYPE, - key.key_id, + key.fingerprint, '1' if key.private else '0') d.addCallback(delete_docs) d.addCallback(get_key_docs) @@ -659,7 +659,7 @@ class OpenPGPScheme(EncryptionScheme): result = yield from_thread( gpg.encrypt, data, pubkey.fingerprint, - default_key=sign.key_id if sign else None, + default_key=sign.fingerprint if sign else None, passphrase=passphrase, symmetric=False, cipher_algo=cipher_algo) # Here we cannot assert for correctness of sig because the sig is @@ -761,7 +761,7 @@ class OpenPGPScheme(EncryptionScheme): # result.fingerprint - contains the fingerprint of the key used to # sign. with TempGPGWrapper(privkey, self._gpgbinary) as gpg: - result = gpg.sign(data, default_key=privkey.key_id, + result = gpg.sign(data, default_key=privkey.fingerprint, digest_algo=digest_algo, clearsign=clearsign, detach=detach, binary=binary) rfprint = privkey.fingerprint @@ -770,7 +770,7 @@ class OpenPGPScheme(EncryptionScheme): if result.fingerprint is None: raise errors.SignFailed( 'Failed to sign with key %s: %s' % - (privkey['keyid'], result.stderr)) + (privkey['fingerprint'], result.stderr)) leap_assert( result.fingerprint == kfprint, 'Signature and private key fingerprints mismatch: ' @@ -823,11 +823,11 @@ class OpenPGPScheme(EncryptionScheme): d.addCallback(self._repair_and_get_doc, self._repair_active_docs) return d - def _get_key_doc_from_keyid(self, key_id, private): + def _get_key_doc_from_fingerprint(self, fingerprint, private): d = self._soledad.get_from_index( - TYPE_ID_PRIVATE_INDEX, + TYPE_FINGERPRINT_PRIVATE_INDEX, self.KEY_TYPE, - key_id, + fingerprint, '1' if private else '0') d.addCallback(self._repair_and_get_doc, self._repair_key_docs) return d @@ -863,7 +863,6 @@ def build_gpg_key(key_info, key_data, gpgbinary=None): return OpenPGPKey( address, gpgbinary=gpgbinary, - key_id=key_info['keyid'], fingerprint=key_info['fingerprint'], key_data=key_data, private=True if key_info['type'] == 'sec' else False, diff --git a/keymanager/src/leap/keymanager/tests/__init__.py b/keymanager/src/leap/keymanager/tests/__init__.py index d02f187f..4fbf63ee 100644 --- a/keymanager/src/leap/keymanager/tests/__init__.py +++ b/keymanager/src/leap/keymanager/tests/__init__.py @@ -97,7 +97,6 @@ class KeyManagerWithSoledadTestCase(unittest.TestCase, BaseLeapTest): # key 24D18DDF: public key "Leap Test Key " -KEY_ID = "2F455E2824D18DDF" KEY_FINGERPRINT = "E36E738D69173C13D709E44F2F455E2824D18DDF" PUBLIC_KEY = """ -----BEGIN PGP PUBLIC KEY BLOCK----- diff --git a/keymanager/src/leap/keymanager/tests/test_keymanager.py b/keymanager/src/leap/keymanager/tests/test_keymanager.py index e4e0d8b4..2fe9e4cd 100644 --- a/keymanager/src/leap/keymanager/tests/test_keymanager.py +++ b/keymanager/src/leap/keymanager/tests/test_keymanager.py @@ -77,7 +77,6 @@ class KeyManagerUtilTestCase(unittest.TestCase): def test_build_key_from_dict(self): kdict = { 'address': [ADDRESS], - 'key_id': KEY_FINGERPRINT[-16:], 'fingerprint': KEY_FINGERPRINT, 'key_data': PUBLIC_KEY, 'private': False, @@ -87,7 +86,6 @@ class KeyManagerUtilTestCase(unittest.TestCase): } adict = { 'address': ADDRESS, - 'key_id': KEY_FINGERPRINT[-16:], 'private': False, 'last_audited_at': 0, 'validation': str(ValidationLevels.Weak_Chain), @@ -98,9 +96,6 @@ class KeyManagerUtilTestCase(unittest.TestCase): self.assertEqual( kdict['address'], key.address, 'Wrong data in key.') - self.assertEqual( - kdict['key_id'], key.key_id, - 'Wrong data in key.') self.assertEqual( kdict['fingerprint'], key.fingerprint, 'Wrong data in key.') diff --git a/keymanager/src/leap/keymanager/tests/test_openpgp.py b/keymanager/src/leap/keymanager/tests/test_openpgp.py index 66415916..8ed049f6 100644 --- a/keymanager/src/leap/keymanager/tests/test_openpgp.py +++ b/keymanager/src/leap/keymanager/tests/test_openpgp.py @@ -30,7 +30,7 @@ from leap.keymanager import ( openpgp, ) from leap.keymanager.keys import ( - TYPE_ID_PRIVATE_INDEX, + TYPE_FINGERPRINT_PRIVATE_INDEX, TYPE_ADDRESS_PRIVATE_INDEX, ) from leap.keymanager.openpgp import OpenPGPKey @@ -40,7 +40,6 @@ from leap.keymanager.tests import ( ADDRESS_2, KEY_FINGERPRINT, PUBLIC_KEY, - KEY_ID, PUBLIC_KEY_2, PRIVATE_KEY, PRIVATE_KEY_2, @@ -256,39 +255,18 @@ class OpenPGPCryptoTestCase(KeyManagerWithSoledadTestCase): @inlineCallbacks def test_self_repair_three_keys(self): + refreshed_keep = datetime(2007, 1, 1) + self._insert_key_docs([datetime(2005, 1, 1), + refreshed_keep, + datetime(2001, 1, 1)]) + delete_doc = self._mock_delete_doc() + pgp = openpgp.OpenPGPScheme( self._soledad, gpgbinary=self.gpg_binary_path) - yield pgp.put_ascii_key(PUBLIC_KEY, ADDRESS) - - get_from_index = self._soledad.get_from_index - delete_doc = self._soledad.delete_doc - - def my_get_from_index(*args): - if (args[0] == TYPE_ID_PRIVATE_INDEX and - args[2] == KEY_ID): - k1 = OpenPGPKey(ADDRESS, key_id="1", - refreshed_at=datetime(2005, 1, 1)) - k2 = OpenPGPKey(ADDRESS, key_id="2", - refreshed_at=datetime(2007, 1, 1)) - k3 = OpenPGPKey(ADDRESS, key_id="3", - refreshed_at=datetime(2001, 1, 1)) - d1 = self._soledad.create_doc_from_json(k1.get_json()) - d2 = self._soledad.create_doc_from_json(k2.get_json()) - d3 = self._soledad.create_doc_from_json(k3.get_json()) - return gatherResults([d1, d2, d3]) - return get_from_index(*args) - - self._soledad.get_from_index = my_get_from_index - self._soledad.delete_doc = Mock(return_value=succeed(None)) - key = yield pgp.get_key(ADDRESS, private=False) - - try: - self.assertEqual(key.key_id, "2") - self.assertEqual(self._soledad.delete_doc.call_count, 2) - finally: - self._soledad.get_from_index = get_from_index - self._soledad.delete_doc = delete_doc + self.assertEqual(key.refreshed_at, refreshed_keep) + self.assertEqual(self.count, 2) + self._soledad.delete_doc = delete_doc @inlineCallbacks def test_self_repair_no_keys(self): @@ -300,8 +278,8 @@ class OpenPGPCryptoTestCase(KeyManagerWithSoledadTestCase): delete_doc = self._soledad.delete_doc def my_get_from_index(*args): - if (args[0] == TYPE_ID_PRIVATE_INDEX and - args[2] == KEY_ID): + if (args[0] == TYPE_FINGERPRINT_PRIVATE_INDEX and + args[2] == KEY_FINGERPRINT): return succeed([]) return get_from_index(*args) @@ -319,39 +297,16 @@ class OpenPGPCryptoTestCase(KeyManagerWithSoledadTestCase): @inlineCallbacks def test_self_repair_put_keys(self): + self._insert_key_docs([datetime(2005, 1, 1), + datetime(2007, 1, 1), + datetime(2001, 1, 1)]) + delete_doc = self._mock_delete_doc() + pgp = openpgp.OpenPGPScheme( self._soledad, gpgbinary=self.gpg_binary_path) - - get_from_index = self._soledad.get_from_index - delete_doc = self._soledad.delete_doc - - def my_get_from_index(*args): - if (args[0] == TYPE_ID_PRIVATE_INDEX and - args[2] == KEY_ID): - k1 = OpenPGPKey(ADDRESS, key_id="1", - fingerprint=KEY_FINGERPRINT, - refreshed_at=datetime(2005, 1, 1)) - k2 = OpenPGPKey(ADDRESS, key_id="2", - fingerprint=KEY_FINGERPRINT, - refreshed_at=datetime(2007, 1, 1)) - k3 = OpenPGPKey(ADDRESS, key_id="3", - fingerprint=KEY_FINGERPRINT, - refreshed_at=datetime(2001, 1, 1)) - d1 = self._soledad.create_doc_from_json(k1.get_json()) - d2 = self._soledad.create_doc_from_json(k2.get_json()) - d3 = self._soledad.create_doc_from_json(k3.get_json()) - return gatherResults([d1, d2, d3]) - return get_from_index(*args) - - self._soledad.get_from_index = my_get_from_index - self._soledad.delete_doc = Mock(return_value=succeed(None)) - - try: - yield pgp.put_ascii_key(PUBLIC_KEY, ADDRESS) - self.assertEqual(self._soledad.delete_doc.call_count, 2) - finally: - self._soledad.get_from_index = get_from_index - self._soledad.delete_doc = delete_doc + yield pgp.put_ascii_key(PUBLIC_KEY, ADDRESS) + self.assertEqual(self.count, 2) + self._soledad.delete_doc = delete_doc @inlineCallbacks def test_self_repair_five_active_docs(self): @@ -364,29 +319,29 @@ class OpenPGPCryptoTestCase(KeyManagerWithSoledadTestCase): def my_get_from_index(*args): if (args[0] == TYPE_ADDRESS_PRIVATE_INDEX and args[2] == ADDRESS): - k1 = OpenPGPKey(ADDRESS, key_id="1", + k1 = OpenPGPKey(ADDRESS, fingerprint="1", last_audited_at=datetime(2005, 1, 1)) - k2 = OpenPGPKey(ADDRESS, key_id="2", + k2 = OpenPGPKey(ADDRESS, fingerprint="2", last_audited_at=datetime(2007, 1, 1)) - k3 = OpenPGPKey(ADDRESS, key_id="3", + k3 = OpenPGPKey(ADDRESS, fingerprint="3", last_audited_at=datetime(2007, 1, 1), encr_used=True, sign_used=True) - k4 = OpenPGPKey(ADDRESS, key_id="4", + k4 = OpenPGPKey(ADDRESS, fingerprint="4", last_audited_at=datetime(2007, 1, 1), sign_used=True) - k5 = OpenPGPKey(ADDRESS, key_id="5", + k5 = OpenPGPKey(ADDRESS, fingerprint="5", last_audited_at=datetime(2007, 1, 1), encr_used=True) deferreds = [] - for k in [k1, k2, k3, k4, k5]: + for k in (k1, k2, k3, k4, k5): d = self._soledad.create_doc_from_json( k.get_active_json(ADDRESS)) deferreds.append(d) return gatherResults(deferreds) - elif args[0] == TYPE_ID_PRIVATE_INDEX: - key_id = args[2] - self.assertEqual(key_id, "3") - k = OpenPGPKey(ADDRESS, key_id="3") + elif args[0] == TYPE_FINGERPRINT_PRIVATE_INDEX: + fingerprint = args[2] + self.assertEqual(fingerprint, "3") + k = OpenPGPKey(ADDRESS, fingerprint="3") return succeed( [self._soledad.create_doc_from_json(k.get_json())]) return get_from_index(*args) @@ -404,3 +359,21 @@ class OpenPGPCryptoTestCase(KeyManagerWithSoledadTestCase): def _assert_key_not_found(self, pgp, address, private=False): d = pgp.get_key(address, private=private) return self.assertFailure(d, KeyNotFound) + + @inlineCallbacks + def _insert_key_docs(self, refreshed_at): + for date in refreshed_at: + key = OpenPGPKey(ADDRESS, fingerprint=KEY_FINGERPRINT, + refreshed_at=date) + yield self._soledad.create_doc_from_json(key.get_json()) + yield self._soledad.create_doc_from_json(key.get_active_json()) + + def _mock_delete_doc(self): + delete_doc = self._soledad.delete_doc + self.count = 0 + + def my_delete_doc(*args): + self.count += 1 + return delete_doc(*args) + self._soledad.delete_doc = my_delete_doc + return delete_doc diff --git a/keymanager/src/leap/keymanager/validation.py b/keymanager/src/leap/keymanager/validation.py index 734cfce9..8cf96da1 100644 --- a/keymanager/src/leap/keymanager/validation.py +++ b/keymanager/src/leap/keymanager/validation.py @@ -118,7 +118,9 @@ def can_upgrade(new_key, old_key): return True # New key signed by the old key - if old_key.key_id in new_key.signatures: + # XXX: signatures are using key-ids instead of fingerprints + key_id = old_key.fingerprint[-16:] + if key_id in new_key.signatures: return True return False -- cgit v1.2.3