summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRuben Pollan <meskio@sindominio.net>2015-12-18 20:31:18 +0100
committerRuben Pollan <meskio@sindominio.net>2016-02-25 11:35:21 -0600
commit3d544f4a85930c5d1611d193500744fc97f0aee1 (patch)
treea86343d0ed93ccf20e36ce203d9499cb322382a3
parent81232da09286f7f1812f6d3d182cd57665feaa1f (diff)
[feat] Use fingerprints instead of key ids
- Resolves: #7500
-rw-r--r--changes/next-changelog.txt3
-rw-r--r--src/leap/keymanager/__init__.py6
-rw-r--r--src/leap/keymanager/keys.py21
-rw-r--r--src/leap/keymanager/migrator.py8
-rw-r--r--src/leap/keymanager/openpgp.py43
-rw-r--r--src/leap/keymanager/tests/__init__.py1
-rw-r--r--src/leap/keymanager/tests/test_keymanager.py5
-rw-r--r--src/leap/keymanager/tests/test_openpgp.py123
-rw-r--r--src/leap/keymanager/validation.py4
9 files changed, 94 insertions, 120 deletions
diff --git a/changes/next-changelog.txt b/changes/next-changelog.txt
index de837daa..a53a5d23 100644
--- a/changes/next-changelog.txt
+++ b/changes/next-changelog.txt
@@ -12,6 +12,7 @@ Features
~~~~~~~~
- `#7485 <https://leap.se/code/issues/7485>`_: Move validation, usage and audited date to the active document.
- `#7713 <https://leap.se/code/issues/7713>`_: Update soledad documents by adding versioning field.
+- `#7500 <https://leap.se/code/issues/7500>`_: Use fingerprints instead of key ids.
- `#1234 <https://leap.se/code/issues/1234>`_: Description of the new feature corresponding with issue #1234.
- New feature without related issue number.
@@ -23,6 +24,8 @@ Bugfixes
Misc
~~~~
+- This version includes changes in the Soledad Documents and minor modifications to the API.
+
- `#1236 <https://leap.se/code/issues/1236>`_: Description of the new feature corresponding with issue #1236.
- Some change without issue number.
diff --git a/src/leap/keymanager/__init__.py b/src/leap/keymanager/__init__.py
index 8b3487f9..8a4efbe9 100644
--- a/src/leap/keymanager/__init__.py
+++ b/src/leap/keymanager/__init__.py
@@ -642,7 +642,7 @@ class KeyManager(object):
else:
signature = InvalidSignature(
'Failed to verify signature with key %s' %
- (pubkey.key_id,))
+ (pubkey.fingerprint,))
defer.returnValue((decrypted, signature))
dpriv = self.get_key(address, ktype, private=True)
@@ -741,7 +741,7 @@ class KeyManager(object):
else:
raise InvalidSignature(
'Failed to verify signature with key %s' %
- (pubkey.key_id,))
+ (pubkey.fingerprint,))
d = self.get_key(address, ktype, private=False,
fetch_remote=fetch_remote)
@@ -804,7 +804,7 @@ class KeyManager(object):
else:
raise KeyNotValidUpgrade(
"Key %s can not be upgraded by new key %s"
- % (old_key.key_id, key.key_id))
+ % (old_key.fingerprint, key.fingerprint))
d = _keys.get_key(address, private=key.private)
d.addErrback(old_key_not_found)
diff --git a/src/leap/keymanager/keys.py b/src/leap/keymanager/keys.py
index a60c19d3..68e3fada 100644
--- a/src/leap/keymanager/keys.py
+++ b/src/leap/keymanager/keys.py
@@ -48,7 +48,6 @@ logger = logging.getLogger(__name__)
KEY_VERSION_KEY = 'version'
KEY_ADDRESS_KEY = 'address'
KEY_TYPE_KEY = 'type'
-KEY_ID_KEY = 'key_id'
KEY_FINGERPRINT_KEY = 'fingerprint'
KEY_DATA_KEY = 'key_data'
KEY_PRIVATE_KEY = 'private'
@@ -80,16 +79,16 @@ KEYMANAGER_DOC_VERSION = 1
#
TAGS_PRIVATE_INDEX = 'by-tags-private'
-TYPE_ID_PRIVATE_INDEX = 'by-type-id-private'
+TYPE_FINGERPRINT_PRIVATE_INDEX = 'by-type-fingerprint-private'
TYPE_ADDRESS_PRIVATE_INDEX = 'by-type-address-private'
INDEXES = {
TAGS_PRIVATE_INDEX: [
KEY_TAGS_KEY,
'bool(%s)' % KEY_PRIVATE_KEY,
],
- TYPE_ID_PRIVATE_INDEX: [
+ TYPE_FINGERPRINT_PRIVATE_INDEX: [
KEY_TYPE_KEY,
- KEY_ID_KEY,
+ KEY_FINGERPRINT_KEY,
'bool(%s)' % KEY_PRIVATE_KEY,
],
TYPE_ADDRESS_PRIVATE_INDEX: [
@@ -137,7 +136,8 @@ def build_key_from_dict(kClass, key, active=None):
validation = ValidationLevels.get(active[KEY_VALIDATION_KEY])
except ValueError:
logger.error("Not valid validation level (%s) for key %s",
- (active[KEY_VALIDATION_KEY], active[KEY_ID_KEY]))
+ (active[KEY_VALIDATION_KEY],
+ active[KEY_FINGERPRINT_KEY]))
last_audited_at = _to_datetime(active[KEY_LAST_AUDITED_AT_KEY])
encr_used = active[KEY_ENCR_USED_KEY]
sign_used = active[KEY_SIGN_USED_KEY]
@@ -147,7 +147,6 @@ def build_key_from_dict(kClass, key, active=None):
return kClass(
key[KEY_ADDRESS_KEY],
- key_id=key[KEY_ID_KEY],
fingerprint=key[KEY_FINGERPRINT_KEY],
key_data=key[KEY_DATA_KEY],
private=key[KEY_PRIVATE_KEY],
@@ -189,13 +188,12 @@ class EncryptionKey(object):
__metaclass__ = ABCMeta
- def __init__(self, address, key_id="", fingerprint="",
+ def __init__(self, address, fingerprint="",
key_data="", private=False, length=0, expiry_date=None,
validation=ValidationLevels.Weak_Chain, last_audited_at=None,
refreshed_at=None, encr_used=False, sign_used=False):
# TODO: it should know its own active address
self.address = address
- self.key_id = key_id
self.fingerprint = fingerprint
self.key_data = key_data
self.private = private
@@ -221,7 +219,6 @@ class EncryptionKey(object):
return json.dumps({
KEY_ADDRESS_KEY: self.address,
KEY_TYPE_KEY: self.__class__.__name__,
- KEY_ID_KEY: self.key_id,
KEY_FINGERPRINT_KEY: self.fingerprint,
KEY_DATA_KEY: self.key_data,
KEY_PRIVATE_KEY: self.private,
@@ -244,7 +241,7 @@ class EncryptionKey(object):
return json.dumps({
KEY_ADDRESS_KEY: address,
KEY_TYPE_KEY: self.__class__.__name__ + KEYMANAGER_ACTIVE_TYPE,
- KEY_ID_KEY: self.key_id,
+ KEY_FINGERPRINT_KEY: self.fingerprint,
KEY_PRIVATE_KEY: self.private,
KEY_VALIDATION_KEY: str(self.validation),
KEY_LAST_AUDITED_AT_KEY: last_audited_at,
@@ -260,7 +257,7 @@ class EncryptionKey(object):
"""
return u"<%s 0x%s (%s - %s)>" % (
self.__class__.__name__,
- self.key_id,
+ self.fingerprint,
self.address,
"priv" if self.private else "publ")
@@ -519,7 +516,7 @@ class EncryptionScheme(object):
"""
def log_active_doc(doc):
logger.error("\t%s: %s" % (doc.content[KEY_ADDRESS_KEY],
- doc.content[KEY_ID_KEY]))
+ doc.content[KEY_FINGERPRINT_KEY]))
def cmp_active(d1, d2):
res = cmp(d1.content[KEY_LAST_AUDITED_AT_KEY],
diff --git a/src/leap/keymanager/migrator.py b/src/leap/keymanager/migrator.py
index b59647a2..11cf2439 100644
--- a/src/leap/keymanager/migrator.py
+++ b/src/leap/keymanager/migrator.py
@@ -33,7 +33,7 @@ from leap.keymanager.keys import (
KEYMANAGER_DOC_VERSION,
KEY_VERSION_KEY,
- KEY_ID_KEY,
+ KEY_FINGERPRINT_KEY,
KEY_VALIDATION_KEY,
KEY_LAST_AUDITED_AT_KEY,
KEY_ENCR_USED_KEY,
@@ -42,6 +42,8 @@ from leap.keymanager.keys import (
from leap.keymanager.validation import ValidationLevels
+KEY_ID_KEY = 'key_id'
+
KeyDocs = namedtuple("KeyDocs", ['key', 'active'])
@@ -132,6 +134,7 @@ class KeyDocumentsMigrator(object):
last_audited = 0
encr_used = False
sign_used = False
+ fingerprint = key.content[KEY_FINGERPRINT_KEY]
if len(actives) == 1 and KEY_VERSION_KEY not in key.content:
# we can preserve the validation of the key if there is only one
# active address for the key
@@ -146,10 +149,12 @@ class KeyDocumentsMigrator(object):
continue
active.content[KEY_VERSION_KEY] = KEYMANAGER_DOC_VERSION
+ active.content[KEY_FINGERPRINT_KEY] = fingerprint
active.content[KEY_VALIDATION_KEY] = validation
active.content[KEY_LAST_AUDITED_AT_KEY] = last_audited
active.content[KEY_ENCR_USED_KEY] = encr_used
active.content[KEY_SIGN_USED_KEY] = sign_used
+ del active.content[KEY_ID_KEY]
d = self._soledad.put_doc(active)
deferreds.append(d)
return gatherResults(deferreds)
@@ -159,6 +164,7 @@ class KeyDocumentsMigrator(object):
return succeed(None)
key.content[KEY_VERSION_KEY] = KEYMANAGER_DOC_VERSION
+ del key.content[KEY_ID_KEY]
del key.content[KEY_VALIDATION_KEY]
del key.content[KEY_LAST_AUDITED_AT_KEY]
del key.content[KEY_ENCR_USED_KEY]
diff --git a/src/leap/keymanager/openpgp.py b/src/leap/keymanager/openpgp.py
index 3c8ac1e1..0f162969 100644
--- a/src/leap/keymanager/openpgp.py
+++ b/src/leap/keymanager/openpgp.py
@@ -39,10 +39,10 @@ from leap.keymanager.keys import (
EncryptionScheme,
is_address,
build_key_from_dict,
- TYPE_ID_PRIVATE_INDEX,
+ TYPE_FINGERPRINT_PRIVATE_INDEX,
TYPE_ADDRESS_PRIVATE_INDEX,
KEY_ADDRESS_KEY,
- KEY_ID_KEY,
+ KEY_FINGERPRINT_KEY,
KEYMANAGER_ACTIVE_TYPE,
)
@@ -122,9 +122,9 @@ class TempGPGWrapper(object):
# itself is enough to also have the public key in the keyring,
# and we want to count the keys afterwards.
- privids = map(lambda privkey: privkey.key_id, privkeys)
+ privfps = map(lambda privkey: privkey.fingerprint, privkeys)
publkeys = filter(
- lambda pubkey: pubkey.key_id not in privids, publkeys)
+ lambda pubkey: pubkey.fingerprint not in privfps, publkeys)
listkeys = lambda: self._gpg.list_keys()
listsecretkeys = lambda: self._gpg.list_keys(secret=True)
@@ -213,7 +213,7 @@ class OpenPGPKey(EncryptionKey):
:rtype: list(str)
"""
with TempGPGWrapper(keys=[self], gpgbinary=self._gpgbinary) as gpg:
- res = gpg.list_sigs(self.key_id)
+ res = gpg.list_sigs(self.fingerprint)
for uid, sigs in res.sigs.iteritems():
if _parse_address(uid) in self.address:
return sigs
@@ -370,7 +370,7 @@ class OpenPGPScheme(EncryptionScheme):
leap_assert(
address in keydoc.content[KEY_ADDRESS_KEY],
'Wrong address in key %s. Expected %s, found %s.'
- % (keydoc.content[KEY_ID_KEY], address,
+ % (keydoc.content[KEY_FINGERPRINT_KEY], address,
keydoc.content[KEY_ADDRESS_KEY]))
key = build_key_from_dict(OpenPGPKey, keydoc.content,
activedoc.content)
@@ -493,7 +493,7 @@ class OpenPGPScheme(EncryptionScheme):
deferreds.append(d)
return defer.gatherResults(deferreds)
- dk = self._get_key_doc_from_keyid(key.key_id, key.private)
+ dk = self._get_key_doc_from_fingerprint(key.fingerprint, key.private)
da = self._get_active_doc_from_address(address, key.private)
d = defer.gatherResults([dk, da])
d.addCallback(merge_and_put)
@@ -517,8 +517,8 @@ class OpenPGPScheme(EncryptionScheme):
def get_key_from_active_doc(activedoc):
if not activedoc:
return (None, None)
- key_id = activedoc.content[KEY_ID_KEY]
- d = self._get_key_doc_from_keyid(key_id, private)
+ fingerprint = activedoc.content[KEY_FINGERPRINT_KEY]
+ d = self._get_key_doc_from_fingerprint(fingerprint, private)
d.addCallback(delete_active_if_no_key, activedoc)
return d
@@ -573,17 +573,17 @@ class OpenPGPScheme(EncryptionScheme):
def get_key_docs(_):
return self._soledad.get_from_index(
- TYPE_ID_PRIVATE_INDEX,
+ TYPE_FINGERPRINT_PRIVATE_INDEX,
self.KEY_TYPE,
- key.key_id,
+ key.fingerprint,
'1' if key.private else '0')
def delete_key(docs):
if len(docs) == 0:
raise errors.KeyNotFound(key)
elif len(docs) > 1:
- logger.warning("There is more than one key for key_id %s"
- % key.key_id)
+ logger.warning("There is more than one key for fingerprint %s"
+ % key.fingerprint)
has_deleted = False
deferreds = []
@@ -597,9 +597,9 @@ class OpenPGPScheme(EncryptionScheme):
return defer.gatherResults(deferreds)
d = self._soledad.get_from_index(
- TYPE_ID_PRIVATE_INDEX,
+ TYPE_FINGERPRINT_PRIVATE_INDEX,
self.ACTIVE_TYPE,
- key.key_id,
+ key.fingerprint,
'1' if key.private else '0')
d.addCallback(delete_docs)
d.addCallback(get_key_docs)
@@ -659,7 +659,7 @@ class OpenPGPScheme(EncryptionScheme):
result = yield from_thread(
gpg.encrypt,
data, pubkey.fingerprint,
- default_key=sign.key_id if sign else None,
+ default_key=sign.fingerprint if sign else None,
passphrase=passphrase, symmetric=False,
cipher_algo=cipher_algo)
# Here we cannot assert for correctness of sig because the sig is
@@ -761,7 +761,7 @@ class OpenPGPScheme(EncryptionScheme):
# result.fingerprint - contains the fingerprint of the key used to
# sign.
with TempGPGWrapper(privkey, self._gpgbinary) as gpg:
- result = gpg.sign(data, default_key=privkey.key_id,
+ result = gpg.sign(data, default_key=privkey.fingerprint,
digest_algo=digest_algo, clearsign=clearsign,
detach=detach, binary=binary)
rfprint = privkey.fingerprint
@@ -770,7 +770,7 @@ class OpenPGPScheme(EncryptionScheme):
if result.fingerprint is None:
raise errors.SignFailed(
'Failed to sign with key %s: %s' %
- (privkey['keyid'], result.stderr))
+ (privkey['fingerprint'], result.stderr))
leap_assert(
result.fingerprint == kfprint,
'Signature and private key fingerprints mismatch: '
@@ -823,11 +823,11 @@ class OpenPGPScheme(EncryptionScheme):
d.addCallback(self._repair_and_get_doc, self._repair_active_docs)
return d
- def _get_key_doc_from_keyid(self, key_id, private):
+ def _get_key_doc_from_fingerprint(self, fingerprint, private):
d = self._soledad.get_from_index(
- TYPE_ID_PRIVATE_INDEX,
+ TYPE_FINGERPRINT_PRIVATE_INDEX,
self.KEY_TYPE,
- key_id,
+ fingerprint,
'1' if private else '0')
d.addCallback(self._repair_and_get_doc, self._repair_key_docs)
return d
@@ -863,7 +863,6 @@ def build_gpg_key(key_info, key_data, gpgbinary=None):
return OpenPGPKey(
address,
gpgbinary=gpgbinary,
- key_id=key_info['keyid'],
fingerprint=key_info['fingerprint'],
key_data=key_data,
private=True if key_info['type'] == 'sec' else False,
diff --git a/src/leap/keymanager/tests/__init__.py b/src/leap/keymanager/tests/__init__.py
index d02f187f..4fbf63ee 100644
--- a/src/leap/keymanager/tests/__init__.py
+++ b/src/leap/keymanager/tests/__init__.py
@@ -97,7 +97,6 @@ class KeyManagerWithSoledadTestCase(unittest.TestCase, BaseLeapTest):
# key 24D18DDF: public key "Leap Test Key <leap@leap.se>"
-KEY_ID = "2F455E2824D18DDF"
KEY_FINGERPRINT = "E36E738D69173C13D709E44F2F455E2824D18DDF"
PUBLIC_KEY = """
-----BEGIN PGP PUBLIC KEY BLOCK-----
diff --git a/src/leap/keymanager/tests/test_keymanager.py b/src/leap/keymanager/tests/test_keymanager.py
index e4e0d8b4..2fe9e4cd 100644
--- a/src/leap/keymanager/tests/test_keymanager.py
+++ b/src/leap/keymanager/tests/test_keymanager.py
@@ -77,7 +77,6 @@ class KeyManagerUtilTestCase(unittest.TestCase):
def test_build_key_from_dict(self):
kdict = {
'address': [ADDRESS],
- 'key_id': KEY_FINGERPRINT[-16:],
'fingerprint': KEY_FINGERPRINT,
'key_data': PUBLIC_KEY,
'private': False,
@@ -87,7 +86,6 @@ class KeyManagerUtilTestCase(unittest.TestCase):
}
adict = {
'address': ADDRESS,
- 'key_id': KEY_FINGERPRINT[-16:],
'private': False,
'last_audited_at': 0,
'validation': str(ValidationLevels.Weak_Chain),
@@ -99,9 +97,6 @@ class KeyManagerUtilTestCase(unittest.TestCase):
kdict['address'], key.address,
'Wrong data in key.')
self.assertEqual(
- kdict['key_id'], key.key_id,
- 'Wrong data in key.')
- self.assertEqual(
kdict['fingerprint'], key.fingerprint,
'Wrong data in key.')
self.assertEqual(
diff --git a/src/leap/keymanager/tests/test_openpgp.py b/src/leap/keymanager/tests/test_openpgp.py
index 66415916..8ed049f6 100644
--- a/src/leap/keymanager/tests/test_openpgp.py
+++ b/src/leap/keymanager/tests/test_openpgp.py
@@ -30,7 +30,7 @@ from leap.keymanager import (
openpgp,
)
from leap.keymanager.keys import (
- TYPE_ID_PRIVATE_INDEX,
+ TYPE_FINGERPRINT_PRIVATE_INDEX,
TYPE_ADDRESS_PRIVATE_INDEX,
)
from leap.keymanager.openpgp import OpenPGPKey
@@ -40,7 +40,6 @@ from leap.keymanager.tests import (
ADDRESS_2,
KEY_FINGERPRINT,
PUBLIC_KEY,
- KEY_ID,
PUBLIC_KEY_2,
PRIVATE_KEY,
PRIVATE_KEY_2,
@@ -256,39 +255,18 @@ class OpenPGPCryptoTestCase(KeyManagerWithSoledadTestCase):
@inlineCallbacks
def test_self_repair_three_keys(self):
+ refreshed_keep = datetime(2007, 1, 1)
+ self._insert_key_docs([datetime(2005, 1, 1),
+ refreshed_keep,
+ datetime(2001, 1, 1)])
+ delete_doc = self._mock_delete_doc()
+
pgp = openpgp.OpenPGPScheme(
self._soledad, gpgbinary=self.gpg_binary_path)
- yield pgp.put_ascii_key(PUBLIC_KEY, ADDRESS)
-
- get_from_index = self._soledad.get_from_index
- delete_doc = self._soledad.delete_doc
-
- def my_get_from_index(*args):
- if (args[0] == TYPE_ID_PRIVATE_INDEX and
- args[2] == KEY_ID):
- k1 = OpenPGPKey(ADDRESS, key_id="1",
- refreshed_at=datetime(2005, 1, 1))
- k2 = OpenPGPKey(ADDRESS, key_id="2",
- refreshed_at=datetime(2007, 1, 1))
- k3 = OpenPGPKey(ADDRESS, key_id="3",
- refreshed_at=datetime(2001, 1, 1))
- d1 = self._soledad.create_doc_from_json(k1.get_json())
- d2 = self._soledad.create_doc_from_json(k2.get_json())
- d3 = self._soledad.create_doc_from_json(k3.get_json())
- return gatherResults([d1, d2, d3])
- return get_from_index(*args)
-
- self._soledad.get_from_index = my_get_from_index
- self._soledad.delete_doc = Mock(return_value=succeed(None))
-
key = yield pgp.get_key(ADDRESS, private=False)
-
- try:
- self.assertEqual(key.key_id, "2")
- self.assertEqual(self._soledad.delete_doc.call_count, 2)
- finally:
- self._soledad.get_from_index = get_from_index
- self._soledad.delete_doc = delete_doc
+ self.assertEqual(key.refreshed_at, refreshed_keep)
+ self.assertEqual(self.count, 2)
+ self._soledad.delete_doc = delete_doc
@inlineCallbacks
def test_self_repair_no_keys(self):
@@ -300,8 +278,8 @@ class OpenPGPCryptoTestCase(KeyManagerWithSoledadTestCase):
delete_doc = self._soledad.delete_doc
def my_get_from_index(*args):
- if (args[0] == TYPE_ID_PRIVATE_INDEX and
- args[2] == KEY_ID):
+ if (args[0] == TYPE_FINGERPRINT_PRIVATE_INDEX and
+ args[2] == KEY_FINGERPRINT):
return succeed([])
return get_from_index(*args)
@@ -319,39 +297,16 @@ class OpenPGPCryptoTestCase(KeyManagerWithSoledadTestCase):
@inlineCallbacks
def test_self_repair_put_keys(self):
+ self._insert_key_docs([datetime(2005, 1, 1),
+ datetime(2007, 1, 1),
+ datetime(2001, 1, 1)])
+ delete_doc = self._mock_delete_doc()
+
pgp = openpgp.OpenPGPScheme(
self._soledad, gpgbinary=self.gpg_binary_path)
-
- get_from_index = self._soledad.get_from_index
- delete_doc = self._soledad.delete_doc
-
- def my_get_from_index(*args):
- if (args[0] == TYPE_ID_PRIVATE_INDEX and
- args[2] == KEY_ID):
- k1 = OpenPGPKey(ADDRESS, key_id="1",
- fingerprint=KEY_FINGERPRINT,
- refreshed_at=datetime(2005, 1, 1))
- k2 = OpenPGPKey(ADDRESS, key_id="2",
- fingerprint=KEY_FINGERPRINT,
- refreshed_at=datetime(2007, 1, 1))
- k3 = OpenPGPKey(ADDRESS, key_id="3",
- fingerprint=KEY_FINGERPRINT,
- refreshed_at=datetime(2001, 1, 1))
- d1 = self._soledad.create_doc_from_json(k1.get_json())
- d2 = self._soledad.create_doc_from_json(k2.get_json())
- d3 = self._soledad.create_doc_from_json(k3.get_json())
- return gatherResults([d1, d2, d3])
- return get_from_index(*args)
-
- self._soledad.get_from_index = my_get_from_index
- self._soledad.delete_doc = Mock(return_value=succeed(None))
-
- try:
- yield pgp.put_ascii_key(PUBLIC_KEY, ADDRESS)
- self.assertEqual(self._soledad.delete_doc.call_count, 2)
- finally:
- self._soledad.get_from_index = get_from_index
- self._soledad.delete_doc = delete_doc
+ yield pgp.put_ascii_key(PUBLIC_KEY, ADDRESS)
+ self.assertEqual(self.count, 2)
+ self._soledad.delete_doc = delete_doc
@inlineCallbacks
def test_self_repair_five_active_docs(self):
@@ -364,29 +319,29 @@ class OpenPGPCryptoTestCase(KeyManagerWithSoledadTestCase):
def my_get_from_index(*args):
if (args[0] == TYPE_ADDRESS_PRIVATE_INDEX and
args[2] == ADDRESS):
- k1 = OpenPGPKey(ADDRESS, key_id="1",
+ k1 = OpenPGPKey(ADDRESS, fingerprint="1",
last_audited_at=datetime(2005, 1, 1))
- k2 = OpenPGPKey(ADDRESS, key_id="2",
+ k2 = OpenPGPKey(ADDRESS, fingerprint="2",
last_audited_at=datetime(2007, 1, 1))
- k3 = OpenPGPKey(ADDRESS, key_id="3",
+ k3 = OpenPGPKey(ADDRESS, fingerprint="3",
last_audited_at=datetime(2007, 1, 1),
encr_used=True, sign_used=True)
- k4 = OpenPGPKey(ADDRESS, key_id="4",
+ k4 = OpenPGPKey(ADDRESS, fingerprint="4",
last_audited_at=datetime(2007, 1, 1),
sign_used=True)
- k5 = OpenPGPKey(ADDRESS, key_id="5",
+ k5 = OpenPGPKey(ADDRESS, fingerprint="5",
last_audited_at=datetime(2007, 1, 1),
encr_used=True)
deferreds = []
- for k in [k1, k2, k3, k4, k5]:
+ for k in (k1, k2, k3, k4, k5):
d = self._soledad.create_doc_from_json(
k.get_active_json(ADDRESS))
deferreds.append(d)
return gatherResults(deferreds)
- elif args[0] == TYPE_ID_PRIVATE_INDEX:
- key_id = args[2]
- self.assertEqual(key_id, "3")
- k = OpenPGPKey(ADDRESS, key_id="3")
+ elif args[0] == TYPE_FINGERPRINT_PRIVATE_INDEX:
+ fingerprint = args[2]
+ self.assertEqual(fingerprint, "3")
+ k = OpenPGPKey(ADDRESS, fingerprint="3")
return succeed(
[self._soledad.create_doc_from_json(k.get_json())])
return get_from_index(*args)
@@ -404,3 +359,21 @@ class OpenPGPCryptoTestCase(KeyManagerWithSoledadTestCase):
def _assert_key_not_found(self, pgp, address, private=False):
d = pgp.get_key(address, private=private)
return self.assertFailure(d, KeyNotFound)
+
+ @inlineCallbacks
+ def _insert_key_docs(self, refreshed_at):
+ for date in refreshed_at:
+ key = OpenPGPKey(ADDRESS, fingerprint=KEY_FINGERPRINT,
+ refreshed_at=date)
+ yield self._soledad.create_doc_from_json(key.get_json())
+ yield self._soledad.create_doc_from_json(key.get_active_json())
+
+ def _mock_delete_doc(self):
+ delete_doc = self._soledad.delete_doc
+ self.count = 0
+
+ def my_delete_doc(*args):
+ self.count += 1
+ return delete_doc(*args)
+ self._soledad.delete_doc = my_delete_doc
+ return delete_doc
diff --git a/src/leap/keymanager/validation.py b/src/leap/keymanager/validation.py
index 734cfce9..8cf96da1 100644
--- a/src/leap/keymanager/validation.py
+++ b/src/leap/keymanager/validation.py
@@ -118,7 +118,9 @@ def can_upgrade(new_key, old_key):
return True
# New key signed by the old key
- if old_key.key_id in new_key.signatures:
+ # XXX: signatures are using key-ids instead of fingerprints
+ key_id = old_key.fingerprint[-16:]
+ if key_id in new_key.signatures:
return True
return False