summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRuben Pollan <meskio@sindominio.net>2016-06-14 19:06:45 +0200
committerRuben Pollan <meskio@sindominio.net>2016-06-17 21:25:07 +0200
commitb64591d39dbb03a76c5c0052ebb86d9fdb9915c7 (patch)
tree6d1ac36cd278e25ee66f2696ab5ef70979710480
parent2d69975880214e59d493b5d30655295f77a5cb0c (diff)
[feat] improve duplicated active documents fixup
Mostly for private keys we want to keep the latest one in the keyring. It would be desirable to check with one is set in nicknym, but this will need a big refactor that I'm not sure if it's worth it just for that. - Related: #6658, #8059
-rw-r--r--changes/next-changelog.txt1
-rw-r--r--src/leap/keymanager/openpgp.py35
-rw-r--r--src/leap/keymanager/tests/test_keymanager.py22
-rw-r--r--src/leap/keymanager/tests/test_openpgp.py67
4 files changed, 68 insertions, 57 deletions
diff --git a/changes/next-changelog.txt b/changes/next-changelog.txt
index 91ca8bd..a2ab4fb 100644
--- a/changes/next-changelog.txt
+++ b/changes/next-changelog.txt
@@ -12,6 +12,7 @@ Features
~~~~~~~~
- `#8031 <https://leap.se/code/issues/8031>`_: Remove support for multiple key types.
- `#8068 <https://leap.se/code/issues/8068>`_: make get_all_keys aware of active addresses.
+- `#6658 <https://leap.se/code/issues/6658>`_: Improve duplicated active documents fixup.
- `#1234 <https://leap.se/code/issues/1234>`_: Description of the new feature corresponding with issue #1234.
- New feature without related issue number.
diff --git a/src/leap/keymanager/openpgp.py b/src/leap/keymanager/openpgp.py
index 8658e9c..98ce464 100644
--- a/src/leap/keymanager/openpgp.py
+++ b/src/leap/keymanager/openpgp.py
@@ -47,8 +47,8 @@ from leap.keymanager.documents import (
TYPE_ADDRESS_PRIVATE_INDEX,
KEY_UIDS_KEY,
KEY_FINGERPRINT_KEY,
+ KEY_PRIVATE_KEY,
KEY_REFRESHED_AT_KEY,
- KEY_LAST_AUDITED_AT_KEY,
KEY_SIGN_USED_KEY,
KEY_ENCR_USED_KEY,
KEY_ADDRESS_KEY,
@@ -268,8 +268,8 @@ class OpenPGPScheme(object):
'1' if private else '0')
keys = []
+ fp = lambda doc: doc.content[KEY_FINGERPRINT_KEY]
for active in active_docs:
- fp = lambda doc: doc.content[KEY_FINGERPRINT_KEY]
fp_keys = filter(lambda k: fp(k) == fp(active), key_docs)
if len(fp_keys) == 0:
@@ -770,6 +770,7 @@ class OpenPGPScheme(object):
return self._repair_docs(doclist, cmp_key, log_key_doc)
+ @defer.inlineCallbacks
def _repair_active_docs(self, doclist):
"""
If there is more than one active doc for an address try to self-repair
@@ -779,23 +780,39 @@ class OpenPGPScheme(object):
all the deletions are completed
:rtype: Deferred
"""
+ keys = {}
+ for doc in doclist:
+ fp = doc.content[KEY_FINGERPRINT_KEY]
+ private = doc.content[KEY_PRIVATE_KEY]
+ try:
+ key = yield self._get_key_doc_from_fingerprint(fp, private)
+ keys[fp] = key
+ except Exception:
+ pass
+
def log_active_doc(doc):
logger.error("\t%s: %s" % (doc.content[KEY_ADDRESS_KEY],
doc.content[KEY_FINGERPRINT_KEY]))
def cmp_active(d1, d2):
- res = cmp(d1.content[KEY_LAST_AUDITED_AT_KEY],
- d2.content[KEY_LAST_AUDITED_AT_KEY])
- if res != 0:
- return res
-
+ # XXX: for private keys it will be nice to check which key is known
+ # by the nicknym server and keep this one. But this needs a
+ # refactor that might not be worth it.
used1 = (d1.content[KEY_SIGN_USED_KEY] +
d1.content[KEY_ENCR_USED_KEY])
used2 = (d2.content[KEY_SIGN_USED_KEY] +
d2.content[KEY_ENCR_USED_KEY])
- return cmp(used1, used2)
+ res = cmp(used1, used2)
+ if res != 0:
+ return res
+
+ key1 = keys[d1.content[KEY_FINGERPRINT_KEY]]
+ key2 = keys[d2.content[KEY_FINGERPRINT_KEY]]
+ return cmp(key1.content[KEY_REFRESHED_AT_KEY],
+ key2.content[KEY_REFRESHED_AT_KEY])
- return self._repair_docs(doclist, cmp_active, log_active_doc)
+ doc = yield self._repair_docs(doclist, cmp_active, log_active_doc)
+ defer.returnValue(doc)
def _repair_docs(self, doclist, cmp_func, log_func):
logger.error("BUG ---------------------------------------------------")
diff --git a/src/leap/keymanager/tests/test_keymanager.py b/src/leap/keymanager/tests/test_keymanager.py
index 9deaff1..c4d3fda 100644
--- a/src/leap/keymanager/tests/test_keymanager.py
+++ b/src/leap/keymanager/tests/test_keymanager.py
@@ -237,7 +237,8 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase):
def verify_the_call(_):
used_kwargs = km._async_client_pinned.request.call_args[1]
- km._async_client_pinned.request.assert_called_once_with(expected_url, 'GET', **used_kwargs)
+ km._async_client_pinned.request.assert_called_once_with(
+ expected_url, 'GET', **used_kwargs)
d = self._fetch_key(km, ADDRESS_2, PUBLIC_KEY_2)
d.addCallback(verify_the_call)
@@ -245,11 +246,13 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase):
def test_key_not_found_is_raised_if_key_search_responds_404(self):
"""
- Test if key search request comes back with a 404 response then KeyNotFound is raised, with corresponding error message.
+ Test if key search request comes back with a 404 response then
+ KeyNotFound is raised, with corresponding error message.
"""
km = self._key_manager(url=NICKSERVER_URI)
client.readBody = Mock(return_value=defer.succeed(None))
- km._async_client_pinned.request = Mock(return_value=defer.succeed(None))
+ km._async_client_pinned.request = Mock(
+ return_value=defer.succeed(None))
url = NICKSERVER_URI + '?address=' + INVALID_MAIL_ADDRESS
d = km._fetch_and_handle_404_from_nicknym(url, INVALID_MAIL_ADDRESS)
@@ -259,7 +262,9 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase):
check_404_callback = used_kwargs['callback']
fake_response = Mock()
fake_response.code = NOT_FOUND
- with self.assertRaisesRegexp(errors.KeyNotFound, '404: %s key not found.' % INVALID_MAIL_ADDRESS):
+ with self.assertRaisesRegexp(
+ errors.KeyNotFound,
+ '404: %s key not found.' % INVALID_MAIL_ADDRESS):
check_404_callback(fake_response)
d.addCallback(check_key_not_found_is_raised_if_404)
@@ -267,11 +272,13 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase):
def test_non_existing_key_from_nicknym_is_relayed(self):
"""
- Test if key search requests throws KeyNotFound, the same error is raised.
+ Test if key search requests throws KeyNotFound, the same error is
+ raised.
"""
km = self._key_manager(url=NICKSERVER_URI)
key_not_found_exception = errors.KeyNotFound('some message')
- km._async_client_pinned.request = Mock(side_effect=key_not_found_exception)
+ km._async_client_pinned.request = Mock(
+ side_effect=key_not_found_exception)
def assert_key_not_found_raised(error):
self.assertEqual(error.value, key_not_found_exception)
@@ -312,7 +319,8 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase):
client.readBody = Mock(return_value=defer.succeed(data))
# mock the fetcher so it returns the key for ADDRESS_2
- km._async_client_pinned.request = Mock(return_value=defer.succeed(None))
+ km._async_client_pinned.request = Mock(
+ return_value=defer.succeed(None))
km.ca_cert_path = 'cacertpath'
# try to key get without fetching from server
d_fail = km.get_key(address, fetch_remote=False)
diff --git a/src/leap/keymanager/tests/test_openpgp.py b/src/leap/keymanager/tests/test_openpgp.py
index 4c01676..17a6b79 100644
--- a/src/leap/keymanager/tests/test_openpgp.py
+++ b/src/leap/keymanager/tests/test_openpgp.py
@@ -309,52 +309,37 @@ class OpenPGPCryptoTestCase(KeyManagerWithSoledadTestCase):
self.assertEqual(self.count, 2)
@inlineCallbacks
- def test_self_repair_five_active_docs(self):
+ def test_self_repair_six_active_docs(self):
pgp = openpgp.OpenPGPScheme(
self._soledad, gpgbinary=self.gpg_binary_path)
- get_from_index = self._soledad.get_from_index
- delete_doc = self._soledad.delete_doc
-
- def my_get_from_index(*args):
- if (args[0] == TYPE_ADDRESS_PRIVATE_INDEX and
- args[2] == ADDRESS):
- k1 = OpenPGPKey(ADDRESS, fingerprint="1",
- last_audited_at=datetime(2005, 1, 1))
- k2 = OpenPGPKey(ADDRESS, fingerprint="2",
- last_audited_at=datetime(2007, 1, 1))
- k3 = OpenPGPKey(ADDRESS, fingerprint="3",
- last_audited_at=datetime(2007, 1, 1),
- encr_used=True, sign_used=True)
- k4 = OpenPGPKey(ADDRESS, fingerprint="4",
- last_audited_at=datetime(2007, 1, 1),
- sign_used=True)
- k5 = OpenPGPKey(ADDRESS, fingerprint="5",
- last_audited_at=datetime(2007, 1, 1),
- encr_used=True)
- deferreds = []
- for k in (k1, k2, k3, k4, k5):
- d = self._soledad.create_doc_from_json(
- k.get_active_json())
- deferreds.append(d)
- return gatherResults(deferreds)
- elif args[0] == TYPE_FINGERPRINT_PRIVATE_INDEX:
- fingerprint = args[2]
- self.assertEqual(fingerprint, "3")
- k = OpenPGPKey(ADDRESS, fingerprint="3")
- return succeed(
- [self._soledad.create_doc_from_json(k.get_json())])
- return get_from_index(*args)
+ k1 = OpenPGPKey(ADDRESS, fingerprint="1",
+ refreshed_at=datetime(2005, 1, 1))
+ k2 = OpenPGPKey(ADDRESS, fingerprint="2",
+ refreshed_at=datetime(2007, 1, 1))
+ k3 = OpenPGPKey(ADDRESS, fingerprint="3",
+ refreshed_at=datetime(2007, 1, 1),
+ encr_used=True, sign_used=True)
+ k4 = OpenPGPKey(ADDRESS, fingerprint="4",
+ refreshed_at=datetime(2007, 1, 1),
+ sign_used=True)
+ k5 = OpenPGPKey(ADDRESS, fingerprint="5",
+ refreshed_at=datetime(2007, 1, 1),
+ encr_used=True)
+ k6 = OpenPGPKey(ADDRESS, fingerprint="6",
+ refreshed_at=datetime(2006, 1, 1),
+ encr_used=True, sign_used=True)
+ keys = (k1, k2, k3, k4, k5, k6)
+ for key in keys:
+ yield self._soledad.create_doc_from_json(key.get_json())
+ yield self._soledad.create_doc_from_json(key.get_active_json())
- self._soledad.get_from_index = my_get_from_index
- self._soledad.delete_doc = Mock(return_value=succeed(None))
+ delete_doc = self._mock_delete_doc()
- try:
- yield pgp.get_key(ADDRESS, private=False)
- self.assertEqual(self._soledad.delete_doc.call_count, 4)
- finally:
- self._soledad.get_from_index = get_from_index
- self._soledad.delete_doc = delete_doc
+ key = yield pgp.get_key(ADDRESS, private=False)
+ self._soledad.delete_doc = delete_doc
+ self.assertEqual(self.count, 5)
+ self.assertEqual(key.fingerprint, "3")
def _assert_key_not_found(self, pgp, address, private=False):
d = pgp.get_key(address, private=private)