diff options
-rw-r--r-- | keymanager/changes/next-changelog.txt | 1 | ||||
-rw-r--r-- | keymanager/src/leap/keymanager/openpgp.py | 35 | ||||
-rw-r--r-- | keymanager/src/leap/keymanager/tests/test_keymanager.py | 22 | ||||
-rw-r--r-- | keymanager/src/leap/keymanager/tests/test_openpgp.py | 67 |
4 files changed, 68 insertions, 57 deletions
diff --git a/keymanager/changes/next-changelog.txt b/keymanager/changes/next-changelog.txt index 91ca8bdd..a2ab4fb1 100644 --- a/keymanager/changes/next-changelog.txt +++ b/keymanager/changes/next-changelog.txt @@ -12,6 +12,7 @@ Features ~~~~~~~~ - `#8031 <https://leap.se/code/issues/8031>`_: Remove support for multiple key types. - `#8068 <https://leap.se/code/issues/8068>`_: make get_all_keys aware of active addresses. +- `#6658 <https://leap.se/code/issues/6658>`_: Improve duplicated active documents fixup. - `#1234 <https://leap.se/code/issues/1234>`_: Description of the new feature corresponding with issue #1234. - New feature without related issue number. diff --git a/keymanager/src/leap/keymanager/openpgp.py b/keymanager/src/leap/keymanager/openpgp.py index 8658e9c2..98ce4649 100644 --- a/keymanager/src/leap/keymanager/openpgp.py +++ b/keymanager/src/leap/keymanager/openpgp.py @@ -47,8 +47,8 @@ from leap.keymanager.documents import ( TYPE_ADDRESS_PRIVATE_INDEX, KEY_UIDS_KEY, KEY_FINGERPRINT_KEY, + KEY_PRIVATE_KEY, KEY_REFRESHED_AT_KEY, - KEY_LAST_AUDITED_AT_KEY, KEY_SIGN_USED_KEY, KEY_ENCR_USED_KEY, KEY_ADDRESS_KEY, @@ -268,8 +268,8 @@ class OpenPGPScheme(object): '1' if private else '0') keys = [] + fp = lambda doc: doc.content[KEY_FINGERPRINT_KEY] for active in active_docs: - fp = lambda doc: doc.content[KEY_FINGERPRINT_KEY] fp_keys = filter(lambda k: fp(k) == fp(active), key_docs) if len(fp_keys) == 0: @@ -770,6 +770,7 @@ class OpenPGPScheme(object): return self._repair_docs(doclist, cmp_key, log_key_doc) + @defer.inlineCallbacks def _repair_active_docs(self, doclist): """ If there is more than one active doc for an address try to self-repair @@ -779,23 +780,39 @@ class OpenPGPScheme(object): all the deletions are completed :rtype: Deferred """ + keys = {} + for doc in doclist: + fp = doc.content[KEY_FINGERPRINT_KEY] + private = doc.content[KEY_PRIVATE_KEY] + try: + key = yield self._get_key_doc_from_fingerprint(fp, private) + keys[fp] = key + except Exception: + pass + def log_active_doc(doc): logger.error("\t%s: %s" % (doc.content[KEY_ADDRESS_KEY], doc.content[KEY_FINGERPRINT_KEY])) def cmp_active(d1, d2): - res = cmp(d1.content[KEY_LAST_AUDITED_AT_KEY], - d2.content[KEY_LAST_AUDITED_AT_KEY]) - if res != 0: - return res - + # XXX: for private keys it will be nice to check which key is known + # by the nicknym server and keep this one. But this needs a + # refactor that might not be worth it. used1 = (d1.content[KEY_SIGN_USED_KEY] + d1.content[KEY_ENCR_USED_KEY]) used2 = (d2.content[KEY_SIGN_USED_KEY] + d2.content[KEY_ENCR_USED_KEY]) - return cmp(used1, used2) + res = cmp(used1, used2) + if res != 0: + return res + + key1 = keys[d1.content[KEY_FINGERPRINT_KEY]] + key2 = keys[d2.content[KEY_FINGERPRINT_KEY]] + return cmp(key1.content[KEY_REFRESHED_AT_KEY], + key2.content[KEY_REFRESHED_AT_KEY]) - return self._repair_docs(doclist, cmp_active, log_active_doc) + doc = yield self._repair_docs(doclist, cmp_active, log_active_doc) + defer.returnValue(doc) def _repair_docs(self, doclist, cmp_func, log_func): logger.error("BUG ---------------------------------------------------") diff --git a/keymanager/src/leap/keymanager/tests/test_keymanager.py b/keymanager/src/leap/keymanager/tests/test_keymanager.py index 9deaff1e..c4d3fda3 100644 --- a/keymanager/src/leap/keymanager/tests/test_keymanager.py +++ b/keymanager/src/leap/keymanager/tests/test_keymanager.py @@ -237,7 +237,8 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase): def verify_the_call(_): used_kwargs = km._async_client_pinned.request.call_args[1] - km._async_client_pinned.request.assert_called_once_with(expected_url, 'GET', **used_kwargs) + km._async_client_pinned.request.assert_called_once_with( + expected_url, 'GET', **used_kwargs) d = self._fetch_key(km, ADDRESS_2, PUBLIC_KEY_2) d.addCallback(verify_the_call) @@ -245,11 +246,13 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase): def test_key_not_found_is_raised_if_key_search_responds_404(self): """ - Test if key search request comes back with a 404 response then KeyNotFound is raised, with corresponding error message. + Test if key search request comes back with a 404 response then + KeyNotFound is raised, with corresponding error message. """ km = self._key_manager(url=NICKSERVER_URI) client.readBody = Mock(return_value=defer.succeed(None)) - km._async_client_pinned.request = Mock(return_value=defer.succeed(None)) + km._async_client_pinned.request = Mock( + return_value=defer.succeed(None)) url = NICKSERVER_URI + '?address=' + INVALID_MAIL_ADDRESS d = km._fetch_and_handle_404_from_nicknym(url, INVALID_MAIL_ADDRESS) @@ -259,7 +262,9 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase): check_404_callback = used_kwargs['callback'] fake_response = Mock() fake_response.code = NOT_FOUND - with self.assertRaisesRegexp(errors.KeyNotFound, '404: %s key not found.' % INVALID_MAIL_ADDRESS): + with self.assertRaisesRegexp( + errors.KeyNotFound, + '404: %s key not found.' % INVALID_MAIL_ADDRESS): check_404_callback(fake_response) d.addCallback(check_key_not_found_is_raised_if_404) @@ -267,11 +272,13 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase): def test_non_existing_key_from_nicknym_is_relayed(self): """ - Test if key search requests throws KeyNotFound, the same error is raised. + Test if key search requests throws KeyNotFound, the same error is + raised. """ km = self._key_manager(url=NICKSERVER_URI) key_not_found_exception = errors.KeyNotFound('some message') - km._async_client_pinned.request = Mock(side_effect=key_not_found_exception) + km._async_client_pinned.request = Mock( + side_effect=key_not_found_exception) def assert_key_not_found_raised(error): self.assertEqual(error.value, key_not_found_exception) @@ -312,7 +319,8 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase): client.readBody = Mock(return_value=defer.succeed(data)) # mock the fetcher so it returns the key for ADDRESS_2 - km._async_client_pinned.request = Mock(return_value=defer.succeed(None)) + km._async_client_pinned.request = Mock( + return_value=defer.succeed(None)) km.ca_cert_path = 'cacertpath' # try to key get without fetching from server d_fail = km.get_key(address, fetch_remote=False) diff --git a/keymanager/src/leap/keymanager/tests/test_openpgp.py b/keymanager/src/leap/keymanager/tests/test_openpgp.py index 4c016764..17a6b790 100644 --- a/keymanager/src/leap/keymanager/tests/test_openpgp.py +++ b/keymanager/src/leap/keymanager/tests/test_openpgp.py @@ -309,52 +309,37 @@ class OpenPGPCryptoTestCase(KeyManagerWithSoledadTestCase): self.assertEqual(self.count, 2) @inlineCallbacks - def test_self_repair_five_active_docs(self): + def test_self_repair_six_active_docs(self): pgp = openpgp.OpenPGPScheme( self._soledad, gpgbinary=self.gpg_binary_path) - get_from_index = self._soledad.get_from_index - delete_doc = self._soledad.delete_doc - - def my_get_from_index(*args): - if (args[0] == TYPE_ADDRESS_PRIVATE_INDEX and - args[2] == ADDRESS): - k1 = OpenPGPKey(ADDRESS, fingerprint="1", - last_audited_at=datetime(2005, 1, 1)) - k2 = OpenPGPKey(ADDRESS, fingerprint="2", - last_audited_at=datetime(2007, 1, 1)) - k3 = OpenPGPKey(ADDRESS, fingerprint="3", - last_audited_at=datetime(2007, 1, 1), - encr_used=True, sign_used=True) - k4 = OpenPGPKey(ADDRESS, fingerprint="4", - last_audited_at=datetime(2007, 1, 1), - sign_used=True) - k5 = OpenPGPKey(ADDRESS, fingerprint="5", - last_audited_at=datetime(2007, 1, 1), - encr_used=True) - deferreds = [] - for k in (k1, k2, k3, k4, k5): - d = self._soledad.create_doc_from_json( - k.get_active_json()) - deferreds.append(d) - return gatherResults(deferreds) - elif args[0] == TYPE_FINGERPRINT_PRIVATE_INDEX: - fingerprint = args[2] - self.assertEqual(fingerprint, "3") - k = OpenPGPKey(ADDRESS, fingerprint="3") - return succeed( - [self._soledad.create_doc_from_json(k.get_json())]) - return get_from_index(*args) + k1 = OpenPGPKey(ADDRESS, fingerprint="1", + refreshed_at=datetime(2005, 1, 1)) + k2 = OpenPGPKey(ADDRESS, fingerprint="2", + refreshed_at=datetime(2007, 1, 1)) + k3 = OpenPGPKey(ADDRESS, fingerprint="3", + refreshed_at=datetime(2007, 1, 1), + encr_used=True, sign_used=True) + k4 = OpenPGPKey(ADDRESS, fingerprint="4", + refreshed_at=datetime(2007, 1, 1), + sign_used=True) + k5 = OpenPGPKey(ADDRESS, fingerprint="5", + refreshed_at=datetime(2007, 1, 1), + encr_used=True) + k6 = OpenPGPKey(ADDRESS, fingerprint="6", + refreshed_at=datetime(2006, 1, 1), + encr_used=True, sign_used=True) + keys = (k1, k2, k3, k4, k5, k6) + for key in keys: + yield self._soledad.create_doc_from_json(key.get_json()) + yield self._soledad.create_doc_from_json(key.get_active_json()) - self._soledad.get_from_index = my_get_from_index - self._soledad.delete_doc = Mock(return_value=succeed(None)) + delete_doc = self._mock_delete_doc() - try: - yield pgp.get_key(ADDRESS, private=False) - self.assertEqual(self._soledad.delete_doc.call_count, 4) - finally: - self._soledad.get_from_index = get_from_index - self._soledad.delete_doc = delete_doc + key = yield pgp.get_key(ADDRESS, private=False) + self._soledad.delete_doc = delete_doc + self.assertEqual(self.count, 5) + self.assertEqual(key.fingerprint, "3") def _assert_key_not_found(self, pgp, address, private=False): d = pgp.get_key(address, private=private) |