From 9a9c53eea49092e80737c84a2f850dd682c33ae3 Mon Sep 17 00:00:00 2001 From: Ruben Pollan Date: Tue, 29 Sep 2015 16:36:20 +0200 Subject: [feat] self-repair the keyring if keys get duplicated In some cases in the past keys got stored twice in different documents. Hopefully this issue is solved now, this tries to self-repair the keyring if encounters that. This is not really solving the problem, if it keeps happening we need to investigate the source. - Resolves: #7498 --- src/leap/keymanager/tests/__init__.py | 7 ++ src/leap/keymanager/tests/test_openpgp.py | 104 +++++++++++++++++++++++++++++- 2 files changed, 110 insertions(+), 1 deletion(-) (limited to 'src/leap/keymanager/tests') diff --git a/src/leap/keymanager/tests/__init__.py b/src/leap/keymanager/tests/__init__.py index 9b95e1a..cd612c4 100644 --- a/src/leap/keymanager/tests/__init__.py +++ b/src/leap/keymanager/tests/__init__.py @@ -66,9 +66,15 @@ class KeyManagerWithSoledadTestCase(unittest.TestCase, BaseLeapTest): for private in [True, False]: d = km.get_all_keys(private=private) d.addCallback(delete_keys) + d.addCallback(check_deleted, private) deferreds.append(d) return gatherResults(deferreds) + def check_deleted(_, private): + d = km.get_all_keys(private=private) + d.addCallback(lambda keys: self.assertEqual(keys, [])) + return d + # wait for the indexes to be ready for the tear down d = km._wrapper_map[OpenPGPKey].deferred_indexes d.addCallback(get_and_delete_keys) @@ -91,6 +97,7 @@ class KeyManagerWithSoledadTestCase(unittest.TestCase, BaseLeapTest): # key 24D18DDF: public key "Leap Test Key " +KEY_ID = "2F455E2824D18DDF" KEY_FINGERPRINT = "E36E738D69173C13D709E44F2F455E2824D18DDF" PUBLIC_KEY = """ -----BEGIN PGP PUBLIC KEY BLOCK----- diff --git a/src/leap/keymanager/tests/test_openpgp.py b/src/leap/keymanager/tests/test_openpgp.py index 5f85c74..bae83db 100644 --- a/src/leap/keymanager/tests/test_openpgp.py +++ b/src/leap/keymanager/tests/test_openpgp.py @@ -21,12 +21,15 @@ Tests for the OpenPGP support on Key Manager. """ -from twisted.internet.defer import inlineCallbacks +from datetime import datetime +from mock import Mock +from twisted.internet.defer import inlineCallbacks, gatherResults, succeed from leap.keymanager import ( KeyNotFound, openpgp, ) +from leap.keymanager.keys import TYPE_ID_PRIVATE_INDEX from leap.keymanager.openpgp import OpenPGPKey from leap.keymanager.tests import ( KeyManagerWithSoledadTestCase, @@ -34,6 +37,7 @@ from leap.keymanager.tests import ( ADDRESS_2, KEY_FINGERPRINT, PUBLIC_KEY, + KEY_ID, PUBLIC_KEY_2, PRIVATE_KEY, PRIVATE_KEY_2, @@ -247,6 +251,104 @@ class OpenPGPCryptoTestCase(KeyManagerWithSoledadTestCase): validsign = pgp.verify(data, pubkey, detached_sig=signature) self.assertTrue(validsign) + @inlineCallbacks + def test_self_repair_three_keys(self): + pgp = openpgp.OpenPGPScheme( + self._soledad, gpgbinary=self.gpg_binary_path) + yield pgp.put_ascii_key(PUBLIC_KEY, ADDRESS) + + get_from_index = self._soledad.get_from_index + delete_doc = self._soledad.delete_doc + + def my_get_from_index(*args): + if (args[0] == TYPE_ID_PRIVATE_INDEX and + args[2] == KEY_ID): + k1 = OpenPGPKey(ADDRESS, key_id="1", + refreshed_at=datetime(2005, 1, 1)) + k2 = OpenPGPKey(ADDRESS, key_id="2", + refreshed_at=datetime(2007, 1, 1)) + k3 = OpenPGPKey(ADDRESS, key_id="3", + refreshed_at=datetime(2001, 1, 1)) + d1 = self._soledad.create_doc_from_json(k1.get_json()) + d2 = self._soledad.create_doc_from_json(k2.get_json()) + d3 = self._soledad.create_doc_from_json(k3.get_json()) + return gatherResults([d1, d2, d3]) + return get_from_index(*args) + + self._soledad.get_from_index = my_get_from_index + self._soledad.delete_doc = Mock(return_value=succeed(None)) + + key = yield pgp.get_key(ADDRESS, private=False) + + try: + self.assertEqual(key.key_id, "2") + self.assertEqual(self._soledad.delete_doc.call_count, 2) + finally: + self._soledad.get_from_index = get_from_index + self._soledad.delete_doc = delete_doc + + @inlineCallbacks + def test_self_repair_no_keys(self): + pgp = openpgp.OpenPGPScheme( + self._soledad, gpgbinary=self.gpg_binary_path) + yield pgp.put_ascii_key(PUBLIC_KEY, ADDRESS) + + get_from_index = self._soledad.get_from_index + delete_doc = self._soledad.delete_doc + + def my_get_from_index(*args): + if (args[0] == TYPE_ID_PRIVATE_INDEX and + args[2] == KEY_ID): + return succeed([]) + return get_from_index(*args) + + self._soledad.get_from_index = my_get_from_index + self._soledad.delete_doc = Mock(return_value=succeed(None)) + + try: + yield self.assertFailure(pgp.get_key(ADDRESS, private=False), + KeyNotFound) + self.assertEqual(self._soledad.delete_doc.call_count, 1) + finally: + self._soledad.get_from_index = get_from_index + self._soledad.delete_doc = delete_doc + + @inlineCallbacks + def test_self_repair_put_keys(self): + pgp = openpgp.OpenPGPScheme( + self._soledad, gpgbinary=self.gpg_binary_path) + + get_from_index = self._soledad.get_from_index + delete_doc = self._soledad.delete_doc + + def my_get_from_index(*args): + if (args[0] == TYPE_ID_PRIVATE_INDEX and + args[2] == KEY_ID): + k1 = OpenPGPKey(ADDRESS, key_id="1", + fingerprint=KEY_FINGERPRINT, + refreshed_at=datetime(2005, 1, 1)) + k2 = OpenPGPKey(ADDRESS, key_id="2", + fingerprint=KEY_FINGERPRINT, + refreshed_at=datetime(2007, 1, 1)) + k3 = OpenPGPKey(ADDRESS, key_id="3", + fingerprint=KEY_FINGERPRINT, + refreshed_at=datetime(2001, 1, 1)) + d1 = self._soledad.create_doc_from_json(k1.get_json()) + d2 = self._soledad.create_doc_from_json(k2.get_json()) + d3 = self._soledad.create_doc_from_json(k3.get_json()) + return gatherResults([d1, d2, d3]) + return get_from_index(*args) + + self._soledad.get_from_index = my_get_from_index + self._soledad.delete_doc = Mock(return_value=succeed(None)) + + try: + yield pgp.put_ascii_key(PUBLIC_KEY, ADDRESS) + self.assertEqual(self._soledad.delete_doc.call_count, 2) + finally: + self._soledad.get_from_index = get_from_index + self._soledad.delete_doc = delete_doc + def _assert_key_not_found(self, pgp, address, private=False): d = pgp.get_key(address, private=private) return self.assertFailure(d, KeyNotFound) -- cgit v1.2.3