summaryrefslogtreecommitdiff
path: root/src/leap/keymanager/tests
diff options
context:
space:
mode:
authorRuben Pollan <meskio@sindominio.net>2015-09-29 16:36:20 +0200
committerRuben Pollan <meskio@sindominio.net>2015-10-01 00:01:19 +0200
commit9a9c53eea49092e80737c84a2f850dd682c33ae3 (patch)
tree84534f413ad7d9b22d01abcbcdc74f4a503aa593 /src/leap/keymanager/tests
parent6a8cd66c656b810fb5052c75fa21002de5330273 (diff)
[feat] self-repair the keyring if keys get duplicated
In some cases in the past keys got stored twice in different documents. Hopefully this issue is solved now, this tries to self-repair the keyring if encounters that. This is not really solving the problem, if it keeps happening we need to investigate the source. - Resolves: #7498
Diffstat (limited to 'src/leap/keymanager/tests')
-rw-r--r--src/leap/keymanager/tests/__init__.py7
-rw-r--r--src/leap/keymanager/tests/test_openpgp.py104
2 files changed, 110 insertions, 1 deletions
diff --git a/src/leap/keymanager/tests/__init__.py b/src/leap/keymanager/tests/__init__.py
index 9b95e1ac..cd612c43 100644
--- a/src/leap/keymanager/tests/__init__.py
+++ b/src/leap/keymanager/tests/__init__.py
@@ -66,9 +66,15 @@ class KeyManagerWithSoledadTestCase(unittest.TestCase, BaseLeapTest):
for private in [True, False]:
d = km.get_all_keys(private=private)
d.addCallback(delete_keys)
+ d.addCallback(check_deleted, private)
deferreds.append(d)
return gatherResults(deferreds)
+ def check_deleted(_, private):
+ d = km.get_all_keys(private=private)
+ d.addCallback(lambda keys: self.assertEqual(keys, []))
+ return d
+
# wait for the indexes to be ready for the tear down
d = km._wrapper_map[OpenPGPKey].deferred_indexes
d.addCallback(get_and_delete_keys)
@@ -91,6 +97,7 @@ class KeyManagerWithSoledadTestCase(unittest.TestCase, BaseLeapTest):
# key 24D18DDF: public key "Leap Test Key <leap@leap.se>"
+KEY_ID = "2F455E2824D18DDF"
KEY_FINGERPRINT = "E36E738D69173C13D709E44F2F455E2824D18DDF"
PUBLIC_KEY = """
-----BEGIN PGP PUBLIC KEY BLOCK-----
diff --git a/src/leap/keymanager/tests/test_openpgp.py b/src/leap/keymanager/tests/test_openpgp.py
index 5f85c74b..bae83db7 100644
--- a/src/leap/keymanager/tests/test_openpgp.py
+++ b/src/leap/keymanager/tests/test_openpgp.py
@@ -21,12 +21,15 @@ Tests for the OpenPGP support on Key Manager.
"""
-from twisted.internet.defer import inlineCallbacks
+from datetime import datetime
+from mock import Mock
+from twisted.internet.defer import inlineCallbacks, gatherResults, succeed
from leap.keymanager import (
KeyNotFound,
openpgp,
)
+from leap.keymanager.keys import TYPE_ID_PRIVATE_INDEX
from leap.keymanager.openpgp import OpenPGPKey
from leap.keymanager.tests import (
KeyManagerWithSoledadTestCase,
@@ -34,6 +37,7 @@ from leap.keymanager.tests import (
ADDRESS_2,
KEY_FINGERPRINT,
PUBLIC_KEY,
+ KEY_ID,
PUBLIC_KEY_2,
PRIVATE_KEY,
PRIVATE_KEY_2,
@@ -247,6 +251,104 @@ class OpenPGPCryptoTestCase(KeyManagerWithSoledadTestCase):
validsign = pgp.verify(data, pubkey, detached_sig=signature)
self.assertTrue(validsign)
+ @inlineCallbacks
+ def test_self_repair_three_keys(self):
+ pgp = openpgp.OpenPGPScheme(
+ self._soledad, gpgbinary=self.gpg_binary_path)
+ yield pgp.put_ascii_key(PUBLIC_KEY, ADDRESS)
+
+ get_from_index = self._soledad.get_from_index
+ delete_doc = self._soledad.delete_doc
+
+ def my_get_from_index(*args):
+ if (args[0] == TYPE_ID_PRIVATE_INDEX and
+ args[2] == KEY_ID):
+ k1 = OpenPGPKey(ADDRESS, key_id="1",
+ refreshed_at=datetime(2005, 1, 1))
+ k2 = OpenPGPKey(ADDRESS, key_id="2",
+ refreshed_at=datetime(2007, 1, 1))
+ k3 = OpenPGPKey(ADDRESS, key_id="3",
+ refreshed_at=datetime(2001, 1, 1))
+ d1 = self._soledad.create_doc_from_json(k1.get_json())
+ d2 = self._soledad.create_doc_from_json(k2.get_json())
+ d3 = self._soledad.create_doc_from_json(k3.get_json())
+ return gatherResults([d1, d2, d3])
+ return get_from_index(*args)
+
+ self._soledad.get_from_index = my_get_from_index
+ self._soledad.delete_doc = Mock(return_value=succeed(None))
+
+ key = yield pgp.get_key(ADDRESS, private=False)
+
+ try:
+ self.assertEqual(key.key_id, "2")
+ self.assertEqual(self._soledad.delete_doc.call_count, 2)
+ finally:
+ self._soledad.get_from_index = get_from_index
+ self._soledad.delete_doc = delete_doc
+
+ @inlineCallbacks
+ def test_self_repair_no_keys(self):
+ pgp = openpgp.OpenPGPScheme(
+ self._soledad, gpgbinary=self.gpg_binary_path)
+ yield pgp.put_ascii_key(PUBLIC_KEY, ADDRESS)
+
+ get_from_index = self._soledad.get_from_index
+ delete_doc = self._soledad.delete_doc
+
+ def my_get_from_index(*args):
+ if (args[0] == TYPE_ID_PRIVATE_INDEX and
+ args[2] == KEY_ID):
+ return succeed([])
+ return get_from_index(*args)
+
+ self._soledad.get_from_index = my_get_from_index
+ self._soledad.delete_doc = Mock(return_value=succeed(None))
+
+ try:
+ yield self.assertFailure(pgp.get_key(ADDRESS, private=False),
+ KeyNotFound)
+ self.assertEqual(self._soledad.delete_doc.call_count, 1)
+ finally:
+ self._soledad.get_from_index = get_from_index
+ self._soledad.delete_doc = delete_doc
+
+ @inlineCallbacks
+ def test_self_repair_put_keys(self):
+ pgp = openpgp.OpenPGPScheme(
+ self._soledad, gpgbinary=self.gpg_binary_path)
+
+ get_from_index = self._soledad.get_from_index
+ delete_doc = self._soledad.delete_doc
+
+ def my_get_from_index(*args):
+ if (args[0] == TYPE_ID_PRIVATE_INDEX and
+ args[2] == KEY_ID):
+ k1 = OpenPGPKey(ADDRESS, key_id="1",
+ fingerprint=KEY_FINGERPRINT,
+ refreshed_at=datetime(2005, 1, 1))
+ k2 = OpenPGPKey(ADDRESS, key_id="2",
+ fingerprint=KEY_FINGERPRINT,
+ refreshed_at=datetime(2007, 1, 1))
+ k3 = OpenPGPKey(ADDRESS, key_id="3",
+ fingerprint=KEY_FINGERPRINT,
+ refreshed_at=datetime(2001, 1, 1))
+ d1 = self._soledad.create_doc_from_json(k1.get_json())
+ d2 = self._soledad.create_doc_from_json(k2.get_json())
+ d3 = self._soledad.create_doc_from_json(k3.get_json())
+ return gatherResults([d1, d2, d3])
+ return get_from_index(*args)
+
+ self._soledad.get_from_index = my_get_from_index
+ self._soledad.delete_doc = Mock(return_value=succeed(None))
+
+ try:
+ yield pgp.put_ascii_key(PUBLIC_KEY, ADDRESS)
+ self.assertEqual(self._soledad.delete_doc.call_count, 2)
+ finally:
+ self._soledad.get_from_index = get_from_index
+ self._soledad.delete_doc = delete_doc
+
def _assert_key_not_found(self, pgp, address, private=False):
d = pgp.get_key(address, private=private)
return self.assertFailure(d, KeyNotFound)