summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--keymanager/changes/next-changelog.txt1
-rw-r--r--keymanager/src/leap/keymanager/openpgp.py35
-rw-r--r--keymanager/src/leap/keymanager/tests/test_keymanager.py22
-rw-r--r--keymanager/src/leap/keymanager/tests/test_openpgp.py67
4 files changed, 68 insertions, 57 deletions
diff --git a/keymanager/changes/next-changelog.txt b/keymanager/changes/next-changelog.txt
index 91ca8bdd..a2ab4fb1 100644
--- a/keymanager/changes/next-changelog.txt
+++ b/keymanager/changes/next-changelog.txt
@@ -12,6 +12,7 @@ Features
~~~~~~~~
- `#8031 <https://leap.se/code/issues/8031>`_: Remove support for multiple key types.
- `#8068 <https://leap.se/code/issues/8068>`_: make get_all_keys aware of active addresses.
+- `#6658 <https://leap.se/code/issues/6658>`_: Improve duplicated active documents fixup.
- `#1234 <https://leap.se/code/issues/1234>`_: Description of the new feature corresponding with issue #1234.
- New feature without related issue number.
diff --git a/keymanager/src/leap/keymanager/openpgp.py b/keymanager/src/leap/keymanager/openpgp.py
index 8658e9c2..98ce4649 100644
--- a/keymanager/src/leap/keymanager/openpgp.py
+++ b/keymanager/src/leap/keymanager/openpgp.py
@@ -47,8 +47,8 @@ from leap.keymanager.documents import (
TYPE_ADDRESS_PRIVATE_INDEX,
KEY_UIDS_KEY,
KEY_FINGERPRINT_KEY,
+ KEY_PRIVATE_KEY,
KEY_REFRESHED_AT_KEY,
- KEY_LAST_AUDITED_AT_KEY,
KEY_SIGN_USED_KEY,
KEY_ENCR_USED_KEY,
KEY_ADDRESS_KEY,
@@ -268,8 +268,8 @@ class OpenPGPScheme(object):
'1' if private else '0')
keys = []
+ fp = lambda doc: doc.content[KEY_FINGERPRINT_KEY]
for active in active_docs:
- fp = lambda doc: doc.content[KEY_FINGERPRINT_KEY]
fp_keys = filter(lambda k: fp(k) == fp(active), key_docs)
if len(fp_keys) == 0:
@@ -770,6 +770,7 @@ class OpenPGPScheme(object):
return self._repair_docs(doclist, cmp_key, log_key_doc)
+ @defer.inlineCallbacks
def _repair_active_docs(self, doclist):
"""
If there is more than one active doc for an address try to self-repair
@@ -779,23 +780,39 @@ class OpenPGPScheme(object):
all the deletions are completed
:rtype: Deferred
"""
+ keys = {}
+ for doc in doclist:
+ fp = doc.content[KEY_FINGERPRINT_KEY]
+ private = doc.content[KEY_PRIVATE_KEY]
+ try:
+ key = yield self._get_key_doc_from_fingerprint(fp, private)
+ keys[fp] = key
+ except Exception:
+ pass
+
def log_active_doc(doc):
logger.error("\t%s: %s" % (doc.content[KEY_ADDRESS_KEY],
doc.content[KEY_FINGERPRINT_KEY]))
def cmp_active(d1, d2):
- res = cmp(d1.content[KEY_LAST_AUDITED_AT_KEY],
- d2.content[KEY_LAST_AUDITED_AT_KEY])
- if res != 0:
- return res
-
+ # XXX: for private keys it will be nice to check which key is known
+ # by the nicknym server and keep this one. But this needs a
+ # refactor that might not be worth it.
used1 = (d1.content[KEY_SIGN_USED_KEY] +
d1.content[KEY_ENCR_USED_KEY])
used2 = (d2.content[KEY_SIGN_USED_KEY] +
d2.content[KEY_ENCR_USED_KEY])
- return cmp(used1, used2)
+ res = cmp(used1, used2)
+ if res != 0:
+ return res
+
+ key1 = keys[d1.content[KEY_FINGERPRINT_KEY]]
+ key2 = keys[d2.content[KEY_FINGERPRINT_KEY]]
+ return cmp(key1.content[KEY_REFRESHED_AT_KEY],
+ key2.content[KEY_REFRESHED_AT_KEY])
- return self._repair_docs(doclist, cmp_active, log_active_doc)
+ doc = yield self._repair_docs(doclist, cmp_active, log_active_doc)
+ defer.returnValue(doc)
def _repair_docs(self, doclist, cmp_func, log_func):
logger.error("BUG ---------------------------------------------------")
diff --git a/keymanager/src/leap/keymanager/tests/test_keymanager.py b/keymanager/src/leap/keymanager/tests/test_keymanager.py
index 9deaff1e..c4d3fda3 100644
--- a/keymanager/src/leap/keymanager/tests/test_keymanager.py
+++ b/keymanager/src/leap/keymanager/tests/test_keymanager.py
@@ -237,7 +237,8 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase):
def verify_the_call(_):
used_kwargs = km._async_client_pinned.request.call_args[1]
- km._async_client_pinned.request.assert_called_once_with(expected_url, 'GET', **used_kwargs)
+ km._async_client_pinned.request.assert_called_once_with(
+ expected_url, 'GET', **used_kwargs)
d = self._fetch_key(km, ADDRESS_2, PUBLIC_KEY_2)
d.addCallback(verify_the_call)
@@ -245,11 +246,13 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase):
def test_key_not_found_is_raised_if_key_search_responds_404(self):
"""
- Test if key search request comes back with a 404 response then KeyNotFound is raised, with corresponding error message.
+ Test if key search request comes back with a 404 response then
+ KeyNotFound is raised, with corresponding error message.
"""
km = self._key_manager(url=NICKSERVER_URI)
client.readBody = Mock(return_value=defer.succeed(None))
- km._async_client_pinned.request = Mock(return_value=defer.succeed(None))
+ km._async_client_pinned.request = Mock(
+ return_value=defer.succeed(None))
url = NICKSERVER_URI + '?address=' + INVALID_MAIL_ADDRESS
d = km._fetch_and_handle_404_from_nicknym(url, INVALID_MAIL_ADDRESS)
@@ -259,7 +262,9 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase):
check_404_callback = used_kwargs['callback']
fake_response = Mock()
fake_response.code = NOT_FOUND
- with self.assertRaisesRegexp(errors.KeyNotFound, '404: %s key not found.' % INVALID_MAIL_ADDRESS):
+ with self.assertRaisesRegexp(
+ errors.KeyNotFound,
+ '404: %s key not found.' % INVALID_MAIL_ADDRESS):
check_404_callback(fake_response)
d.addCallback(check_key_not_found_is_raised_if_404)
@@ -267,11 +272,13 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase):
def test_non_existing_key_from_nicknym_is_relayed(self):
"""
- Test if key search requests throws KeyNotFound, the same error is raised.
+ Test if key search requests throws KeyNotFound, the same error is
+ raised.
"""
km = self._key_manager(url=NICKSERVER_URI)
key_not_found_exception = errors.KeyNotFound('some message')
- km._async_client_pinned.request = Mock(side_effect=key_not_found_exception)
+ km._async_client_pinned.request = Mock(
+ side_effect=key_not_found_exception)
def assert_key_not_found_raised(error):
self.assertEqual(error.value, key_not_found_exception)
@@ -312,7 +319,8 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase):
client.readBody = Mock(return_value=defer.succeed(data))
# mock the fetcher so it returns the key for ADDRESS_2
- km._async_client_pinned.request = Mock(return_value=defer.succeed(None))
+ km._async_client_pinned.request = Mock(
+ return_value=defer.succeed(None))
km.ca_cert_path = 'cacertpath'
# try to key get without fetching from server
d_fail = km.get_key(address, fetch_remote=False)
diff --git a/keymanager/src/leap/keymanager/tests/test_openpgp.py b/keymanager/src/leap/keymanager/tests/test_openpgp.py
index 4c016764..17a6b790 100644
--- a/keymanager/src/leap/keymanager/tests/test_openpgp.py
+++ b/keymanager/src/leap/keymanager/tests/test_openpgp.py
@@ -309,52 +309,37 @@ class OpenPGPCryptoTestCase(KeyManagerWithSoledadTestCase):
self.assertEqual(self.count, 2)
@inlineCallbacks
- def test_self_repair_five_active_docs(self):
+ def test_self_repair_six_active_docs(self):
pgp = openpgp.OpenPGPScheme(
self._soledad, gpgbinary=self.gpg_binary_path)
- get_from_index = self._soledad.get_from_index
- delete_doc = self._soledad.delete_doc
-
- def my_get_from_index(*args):
- if (args[0] == TYPE_ADDRESS_PRIVATE_INDEX and
- args[2] == ADDRESS):
- k1 = OpenPGPKey(ADDRESS, fingerprint="1",
- last_audited_at=datetime(2005, 1, 1))
- k2 = OpenPGPKey(ADDRESS, fingerprint="2",
- last_audited_at=datetime(2007, 1, 1))
- k3 = OpenPGPKey(ADDRESS, fingerprint="3",
- last_audited_at=datetime(2007, 1, 1),
- encr_used=True, sign_used=True)
- k4 = OpenPGPKey(ADDRESS, fingerprint="4",
- last_audited_at=datetime(2007, 1, 1),
- sign_used=True)
- k5 = OpenPGPKey(ADDRESS, fingerprint="5",
- last_audited_at=datetime(2007, 1, 1),
- encr_used=True)
- deferreds = []
- for k in (k1, k2, k3, k4, k5):
- d = self._soledad.create_doc_from_json(
- k.get_active_json())
- deferreds.append(d)
- return gatherResults(deferreds)
- elif args[0] == TYPE_FINGERPRINT_PRIVATE_INDEX:
- fingerprint = args[2]
- self.assertEqual(fingerprint, "3")
- k = OpenPGPKey(ADDRESS, fingerprint="3")
- return succeed(
- [self._soledad.create_doc_from_json(k.get_json())])
- return get_from_index(*args)
+ k1 = OpenPGPKey(ADDRESS, fingerprint="1",
+ refreshed_at=datetime(2005, 1, 1))
+ k2 = OpenPGPKey(ADDRESS, fingerprint="2",
+ refreshed_at=datetime(2007, 1, 1))
+ k3 = OpenPGPKey(ADDRESS, fingerprint="3",
+ refreshed_at=datetime(2007, 1, 1),
+ encr_used=True, sign_used=True)
+ k4 = OpenPGPKey(ADDRESS, fingerprint="4",
+ refreshed_at=datetime(2007, 1, 1),
+ sign_used=True)
+ k5 = OpenPGPKey(ADDRESS, fingerprint="5",
+ refreshed_at=datetime(2007, 1, 1),
+ encr_used=True)
+ k6 = OpenPGPKey(ADDRESS, fingerprint="6",
+ refreshed_at=datetime(2006, 1, 1),
+ encr_used=True, sign_used=True)
+ keys = (k1, k2, k3, k4, k5, k6)
+ for key in keys:
+ yield self._soledad.create_doc_from_json(key.get_json())
+ yield self._soledad.create_doc_from_json(key.get_active_json())
- self._soledad.get_from_index = my_get_from_index
- self._soledad.delete_doc = Mock(return_value=succeed(None))
+ delete_doc = self._mock_delete_doc()
- try:
- yield pgp.get_key(ADDRESS, private=False)
- self.assertEqual(self._soledad.delete_doc.call_count, 4)
- finally:
- self._soledad.get_from_index = get_from_index
- self._soledad.delete_doc = delete_doc
+ key = yield pgp.get_key(ADDRESS, private=False)
+ self._soledad.delete_doc = delete_doc
+ self.assertEqual(self.count, 5)
+ self.assertEqual(key.fingerprint, "3")
def _assert_key_not_found(self, pgp, address, private=False):
d = pgp.get_key(address, private=private)