summaryrefslogtreecommitdiff
path: root/src/leap/keymanager/tests
diff options
context:
space:
mode:
Diffstat (limited to 'src/leap/keymanager/tests')
-rw-r--r--src/leap/keymanager/tests/__init__.py14
-rw-r--r--src/leap/keymanager/tests/test_keymanager.py142
-rw-r--r--src/leap/keymanager/tests/test_openpgp.py104
-rw-r--r--src/leap/keymanager/tests/test_validation.py48
4 files changed, 278 insertions, 30 deletions
diff --git a/src/leap/keymanager/tests/__init__.py b/src/leap/keymanager/tests/__init__.py
index 7128d20..cd612c4 100644
--- a/src/leap/keymanager/tests/__init__.py
+++ b/src/leap/keymanager/tests/__init__.py
@@ -66,18 +66,27 @@ class KeyManagerWithSoledadTestCase(unittest.TestCase, BaseLeapTest):
for private in [True, False]:
d = km.get_all_keys(private=private)
d.addCallback(delete_keys)
+ d.addCallback(check_deleted, private)
deferreds.append(d)
return gatherResults(deferreds)
+ def check_deleted(_, private):
+ d = km.get_all_keys(private=private)
+ d.addCallback(lambda keys: self.assertEqual(keys, []))
+ return d
+
# wait for the indexes to be ready for the tear down
d = km._wrapper_map[OpenPGPKey].deferred_indexes
d.addCallback(get_and_delete_keys)
d.addCallback(lambda _: self.tearDownEnv())
+ d.addCallback(lambda _: self._soledad.close())
return d
- def _key_manager(self, user=ADDRESS, url='', token=None):
+ def _key_manager(self, user=ADDRESS, url='', token=None,
+ ca_cert_path=None):
return KeyManager(user, url, self._soledad, token=token,
- gpgbinary=self.gpg_binary_path)
+ gpgbinary=self.gpg_binary_path,
+ ca_cert_path=ca_cert_path)
def _find_gpg(self):
gpg_path = distutils.spawn.find_executable('gpg')
@@ -88,6 +97,7 @@ class KeyManagerWithSoledadTestCase(unittest.TestCase, BaseLeapTest):
# key 24D18DDF: public key "Leap Test Key <leap@leap.se>"
+KEY_ID = "2F455E2824D18DDF"
KEY_FINGERPRINT = "E36E738D69173C13D709E44F2F455E2824D18DDF"
PUBLIC_KEY = """
-----BEGIN PGP PUBLIC KEY BLOCK-----
diff --git a/src/leap/keymanager/tests/test_keymanager.py b/src/leap/keymanager/tests/test_keymanager.py
index 55f892e..856d6da 100644
--- a/src/leap/keymanager/tests/test_keymanager.py
+++ b/src/leap/keymanager/tests/test_keymanager.py
@@ -20,9 +20,11 @@
Tests for the Key Manager.
"""
-
+from os import path
from datetime import datetime
-from mock import Mock
+import tempfile
+from leap.common import ca_bundle
+from mock import Mock, MagicMock, patch
from twisted.internet.defer import inlineCallbacks
from twisted.trial import unittest
@@ -36,10 +38,7 @@ from leap.keymanager.keys import (
is_address,
build_key_from_dict,
)
-from leap.keymanager.validation import (
- ValidationLevel,
- toValidationLevel
-)
+from leap.keymanager.validation import ValidationLevels
from leap.keymanager.tests import (
KeyManagerWithSoledadTestCase,
ADDRESS,
@@ -53,6 +52,7 @@ from leap.keymanager.tests import (
NICKSERVER_URI = "http://leap.se/"
+REMOTE_KEY_URL = "http://site.domain/key"
class KeyManagerUtilTestCase(unittest.TestCase):
@@ -82,7 +82,7 @@ class KeyManagerUtilTestCase(unittest.TestCase):
'expiry_date': 0,
'last_audited_at': 0,
'refreshed_at': 1311239602,
- 'validation': ValidationLevel.Weak_Chain.name,
+ 'validation': str(ValidationLevels.Weak_Chain),
'encr_used': False,
'sign_used': True,
}
@@ -115,7 +115,7 @@ class KeyManagerUtilTestCase(unittest.TestCase):
datetime.fromtimestamp(kdict['refreshed_at']), key.refreshed_at,
'Wrong data in key.')
self.assertEqual(
- toValidationLevel(kdict['validation']), key.validation,
+ ValidationLevels.get(kdict['validation']), key.validation,
'Wrong data in key.')
self.assertEqual(
kdict['encr_used'], key.encr_used,
@@ -227,7 +227,7 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase):
key = yield self._fetch_key(km, ADDRESS, PUBLIC_KEY)
self.assertIsInstance(key, OpenPGPKey)
self.assertTrue(ADDRESS in key.address)
- self.assertEqual(key.validation, ValidationLevel.Provider_Trust)
+ self.assertEqual(key.validation, ValidationLevels.Provider_Trust)
@inlineCallbacks
def test_get_key_fetches_other_domain(self):
@@ -239,7 +239,7 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase):
key = yield self._fetch_key(km, ADDRESS_OTHER, PUBLIC_KEY_OTHER)
self.assertIsInstance(key, OpenPGPKey)
self.assertTrue(ADDRESS_OTHER in key.address)
- self.assertEqual(key.validation, ValidationLevel.Weak_Chain)
+ self.assertEqual(key.validation, ValidationLevels.Weak_Chain)
def _fetch_key(self, km, address, key):
"""
@@ -290,7 +290,6 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase):
content = PUBLIC_KEY
km._fetcher.get = Mock(return_value=Response())
- km.ca_cert_path = 'cacertpath'
yield km.fetch_key(ADDRESS, "http://site.domain/key", OpenPGPKey)
key = yield km.get_key(ADDRESS, OpenPGPKey)
@@ -307,7 +306,6 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase):
content = ""
km._fetcher.get = Mock(return_value=Response())
- km.ca_cert_path = 'cacertpath'
d = km.fetch_key(ADDRESS, "http://site.domain/key", OpenPGPKey)
return self.assertFailure(d, KeyNotFound)
@@ -323,10 +321,125 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase):
content = PUBLIC_KEY
km._fetcher.get = Mock(return_value=Response())
- km.ca_cert_path = 'cacertpath'
d = km.fetch_key(ADDRESS_2, "http://site.domain/key", OpenPGPKey)
return self.assertFailure(d, KeyAddressMismatch)
+ def _mock_get_response(self, km, body):
+ class Response(object):
+ ok = True
+ content = body
+
+ mock = MagicMock(return_value=Response())
+ km._fetcher.get = mock
+
+ return mock
+
+ @inlineCallbacks
+ def test_fetch_key_uses_ca_bundle_if_none_specified(self):
+ ca_cert_path = None
+ km = self._key_manager(ca_cert_path=ca_cert_path)
+ get_mock = self._mock_get_response(km, PUBLIC_KEY_OTHER)
+
+ yield km.fetch_key(ADDRESS_OTHER, REMOTE_KEY_URL, OpenPGPKey)
+
+ get_mock.assert_called_once_with(REMOTE_KEY_URL, data=None,
+ verify=ca_bundle.where())
+
+ @inlineCallbacks
+ def test_fetch_key_uses_ca_bundle_if_empty_string_specified(self):
+ ca_cert_path = ''
+ km = self._key_manager(ca_cert_path=ca_cert_path)
+ get_mock = self._mock_get_response(km, PUBLIC_KEY_OTHER)
+
+ yield km.fetch_key(ADDRESS_OTHER, REMOTE_KEY_URL, OpenPGPKey)
+
+ get_mock.assert_called_once_with(REMOTE_KEY_URL, data=None,
+ verify=ca_bundle.where())
+
+ @inlineCallbacks
+ def test_fetch_key_use_default_ca_bundle_if_set_as_ca_cert_path(self):
+ ca_cert_path = ca_bundle.where()
+ km = self._key_manager(ca_cert_path=ca_cert_path)
+ get_mock = self._mock_get_response(km, PUBLIC_KEY_OTHER)
+
+ yield km.fetch_key(ADDRESS_OTHER, REMOTE_KEY_URL, OpenPGPKey)
+
+ get_mock.assert_called_once_with(REMOTE_KEY_URL, data=None,
+ verify=ca_bundle.where())
+
+ @inlineCallbacks
+ def test_fetch_uses_combined_ca_bundle_otherwise(self):
+ with tempfile.NamedTemporaryFile() as tmp_input, \
+ tempfile.NamedTemporaryFile(delete=False) as tmp_output:
+ ca_content = 'some\ncontent\n'
+ ca_cert_path = tmp_input.name
+ self._dump_to_file(ca_cert_path, ca_content)
+
+ with patch('leap.keymanager.tempfile.NamedTemporaryFile') as mock:
+ mock.return_value = tmp_output
+ km = self._key_manager(ca_cert_path=ca_cert_path)
+ get_mock = self._mock_get_response(km, PUBLIC_KEY_OTHER)
+
+ yield km.fetch_key(ADDRESS_OTHER, REMOTE_KEY_URL, OpenPGPKey)
+
+ # assert that combined bundle file is passed to get call
+ get_mock.assert_called_once_with(REMOTE_KEY_URL, data=None,
+ verify=tmp_output.name)
+
+ # assert that files got appended
+ expected = self._slurp_file(ca_bundle.where()) + ca_content
+ self.assertEqual(expected, self._slurp_file(tmp_output.name))
+
+ del km # force km out of scope
+ self.assertFalse(path.exists(tmp_output.name))
+
+ def _dump_to_file(self, filename, content):
+ with open(filename, 'w') as out:
+ out.write(content)
+
+ def _slurp_file(self, filename):
+ with open(filename) as f:
+ content = f.read()
+ return content
+
+ @inlineCallbacks
+ def test_decrypt_updates_sign_used_for_signer(self):
+ # given
+ km = self._key_manager()
+ yield km._wrapper_map[OpenPGPKey].put_ascii_key(PRIVATE_KEY, ADDRESS)
+ yield km._wrapper_map[OpenPGPKey].put_ascii_key(
+ PRIVATE_KEY_2, ADDRESS_2)
+ encdata = yield km.encrypt('data', ADDRESS, OpenPGPKey,
+ sign=ADDRESS_2, fetch_remote=False)
+ yield km.decrypt(
+ encdata, ADDRESS, OpenPGPKey, verify=ADDRESS_2, fetch_remote=False)
+
+ # when
+ key = yield km.get_key(ADDRESS_2, OpenPGPKey, fetch_remote=False)
+
+ # then
+ self.assertEqual(True, key.sign_used)
+
+ @inlineCallbacks
+ def test_decrypt_does_not_update_sign_used_for_recipient(self):
+ # given
+ km = self._key_manager()
+ yield km._wrapper_map[OpenPGPKey].put_ascii_key(
+ PRIVATE_KEY, ADDRESS)
+ yield km._wrapper_map[OpenPGPKey].put_ascii_key(
+ PRIVATE_KEY_2, ADDRESS_2)
+ encdata = yield km.encrypt('data', ADDRESS, OpenPGPKey,
+ sign=ADDRESS_2, fetch_remote=False)
+ yield km.decrypt(
+ encdata, ADDRESS, OpenPGPKey, verify=ADDRESS_2, fetch_remote=False)
+
+ # when
+ key = yield km.get_key(
+ ADDRESS, OpenPGPKey, private=False, fetch_remote=False)
+
+ # then
+ self.assertEqual(False, key.sign_used)
+
class KeyManagerCryptoTestCase(KeyManagerWithSoledadTestCase):
@@ -391,9 +504,8 @@ class KeyManagerCryptoTestCase(KeyManagerWithSoledadTestCase):
sign=ADDRESS, fetch_remote=False))
return self.assertFailure(d, KeyNotFound)
-
-import unittest
if __name__ == "__main__":
+ import unittest
unittest.main()
# key 0F91B402: someone@somedomain.org
diff --git a/src/leap/keymanager/tests/test_openpgp.py b/src/leap/keymanager/tests/test_openpgp.py
index 5f85c74..bae83db 100644
--- a/src/leap/keymanager/tests/test_openpgp.py
+++ b/src/leap/keymanager/tests/test_openpgp.py
@@ -21,12 +21,15 @@ Tests for the OpenPGP support on Key Manager.
"""
-from twisted.internet.defer import inlineCallbacks
+from datetime import datetime
+from mock import Mock
+from twisted.internet.defer import inlineCallbacks, gatherResults, succeed
from leap.keymanager import (
KeyNotFound,
openpgp,
)
+from leap.keymanager.keys import TYPE_ID_PRIVATE_INDEX
from leap.keymanager.openpgp import OpenPGPKey
from leap.keymanager.tests import (
KeyManagerWithSoledadTestCase,
@@ -34,6 +37,7 @@ from leap.keymanager.tests import (
ADDRESS_2,
KEY_FINGERPRINT,
PUBLIC_KEY,
+ KEY_ID,
PUBLIC_KEY_2,
PRIVATE_KEY,
PRIVATE_KEY_2,
@@ -247,6 +251,104 @@ class OpenPGPCryptoTestCase(KeyManagerWithSoledadTestCase):
validsign = pgp.verify(data, pubkey, detached_sig=signature)
self.assertTrue(validsign)
+ @inlineCallbacks
+ def test_self_repair_three_keys(self):
+ pgp = openpgp.OpenPGPScheme(
+ self._soledad, gpgbinary=self.gpg_binary_path)
+ yield pgp.put_ascii_key(PUBLIC_KEY, ADDRESS)
+
+ get_from_index = self._soledad.get_from_index
+ delete_doc = self._soledad.delete_doc
+
+ def my_get_from_index(*args):
+ if (args[0] == TYPE_ID_PRIVATE_INDEX and
+ args[2] == KEY_ID):
+ k1 = OpenPGPKey(ADDRESS, key_id="1",
+ refreshed_at=datetime(2005, 1, 1))
+ k2 = OpenPGPKey(ADDRESS, key_id="2",
+ refreshed_at=datetime(2007, 1, 1))
+ k3 = OpenPGPKey(ADDRESS, key_id="3",
+ refreshed_at=datetime(2001, 1, 1))
+ d1 = self._soledad.create_doc_from_json(k1.get_json())
+ d2 = self._soledad.create_doc_from_json(k2.get_json())
+ d3 = self._soledad.create_doc_from_json(k3.get_json())
+ return gatherResults([d1, d2, d3])
+ return get_from_index(*args)
+
+ self._soledad.get_from_index = my_get_from_index
+ self._soledad.delete_doc = Mock(return_value=succeed(None))
+
+ key = yield pgp.get_key(ADDRESS, private=False)
+
+ try:
+ self.assertEqual(key.key_id, "2")
+ self.assertEqual(self._soledad.delete_doc.call_count, 2)
+ finally:
+ self._soledad.get_from_index = get_from_index
+ self._soledad.delete_doc = delete_doc
+
+ @inlineCallbacks
+ def test_self_repair_no_keys(self):
+ pgp = openpgp.OpenPGPScheme(
+ self._soledad, gpgbinary=self.gpg_binary_path)
+ yield pgp.put_ascii_key(PUBLIC_KEY, ADDRESS)
+
+ get_from_index = self._soledad.get_from_index
+ delete_doc = self._soledad.delete_doc
+
+ def my_get_from_index(*args):
+ if (args[0] == TYPE_ID_PRIVATE_INDEX and
+ args[2] == KEY_ID):
+ return succeed([])
+ return get_from_index(*args)
+
+ self._soledad.get_from_index = my_get_from_index
+ self._soledad.delete_doc = Mock(return_value=succeed(None))
+
+ try:
+ yield self.assertFailure(pgp.get_key(ADDRESS, private=False),
+ KeyNotFound)
+ self.assertEqual(self._soledad.delete_doc.call_count, 1)
+ finally:
+ self._soledad.get_from_index = get_from_index
+ self._soledad.delete_doc = delete_doc
+
+ @inlineCallbacks
+ def test_self_repair_put_keys(self):
+ pgp = openpgp.OpenPGPScheme(
+ self._soledad, gpgbinary=self.gpg_binary_path)
+
+ get_from_index = self._soledad.get_from_index
+ delete_doc = self._soledad.delete_doc
+
+ def my_get_from_index(*args):
+ if (args[0] == TYPE_ID_PRIVATE_INDEX and
+ args[2] == KEY_ID):
+ k1 = OpenPGPKey(ADDRESS, key_id="1",
+ fingerprint=KEY_FINGERPRINT,
+ refreshed_at=datetime(2005, 1, 1))
+ k2 = OpenPGPKey(ADDRESS, key_id="2",
+ fingerprint=KEY_FINGERPRINT,
+ refreshed_at=datetime(2007, 1, 1))
+ k3 = OpenPGPKey(ADDRESS, key_id="3",
+ fingerprint=KEY_FINGERPRINT,
+ refreshed_at=datetime(2001, 1, 1))
+ d1 = self._soledad.create_doc_from_json(k1.get_json())
+ d2 = self._soledad.create_doc_from_json(k2.get_json())
+ d3 = self._soledad.create_doc_from_json(k3.get_json())
+ return gatherResults([d1, d2, d3])
+ return get_from_index(*args)
+
+ self._soledad.get_from_index = my_get_from_index
+ self._soledad.delete_doc = Mock(return_value=succeed(None))
+
+ try:
+ yield pgp.put_ascii_key(PUBLIC_KEY, ADDRESS)
+ self.assertEqual(self._soledad.delete_doc.call_count, 2)
+ finally:
+ self._soledad.get_from_index = get_from_index
+ self._soledad.delete_doc = delete_doc
+
def _assert_key_not_found(self, pgp, address, private=False):
d = pgp.get_key(address, private=private)
return self.assertFailure(d, KeyNotFound)
diff --git a/src/leap/keymanager/tests/test_validation.py b/src/leap/keymanager/tests/test_validation.py
index 15e7d27..bcf41c4 100644
--- a/src/leap/keymanager/tests/test_validation.py
+++ b/src/leap/keymanager/tests/test_validation.py
@@ -18,6 +18,7 @@
Tests for the Validation Levels
"""
+import unittest
from datetime import datetime
from twisted.internet.defer import inlineCallbacks
@@ -29,12 +30,15 @@ from leap.keymanager.tests import (
KeyManagerWithSoledadTestCase,
ADDRESS,
PUBLIC_KEY,
+ ADDRESS_2,
+ PUBLIC_KEY_2,
+ PRIVATE_KEY_2,
KEY_FINGERPRINT
)
-from leap.keymanager.validation import ValidationLevel
+from leap.keymanager.validation import ValidationLevels
-class ValidationLevelTestCase(KeyManagerWithSoledadTestCase):
+class ValidationLevelsTestCase(KeyManagerWithSoledadTestCase):
@inlineCallbacks
def test_none_old_key(self):
@@ -47,7 +51,7 @@ class ValidationLevelTestCase(KeyManagerWithSoledadTestCase):
def test_cant_upgrade(self):
km = self._key_manager()
yield km.put_raw_key(PUBLIC_KEY, OpenPGPKey, ADDRESS,
- validation=ValidationLevel.Provider_Trust)
+ validation=ValidationLevels.Provider_Trust)
d = km.put_raw_key(UNRELATED_KEY, OpenPGPKey, ADDRESS)
yield self.assertFailure(d, KeyNotValidUpgrade)
@@ -56,7 +60,7 @@ class ValidationLevelTestCase(KeyManagerWithSoledadTestCase):
km = self._key_manager()
yield km.put_raw_key(PUBLIC_KEY, OpenPGPKey, ADDRESS)
yield km.put_raw_key(UNRELATED_KEY, OpenPGPKey, ADDRESS,
- validation=ValidationLevel.Fingerprint)
+ validation=ValidationLevels.Fingerprint)
key = yield km.get_key(ADDRESS, OpenPGPKey, fetch_remote=False)
self.assertEqual(key.fingerprint, UNRELATED_FINGERPRINT)
@@ -73,12 +77,12 @@ class ValidationLevelTestCase(KeyManagerWithSoledadTestCase):
km = self._key_manager()
yield km.put_raw_key(
EXPIRED_KEY, OpenPGPKey, ADDRESS,
- validation=ValidationLevel.Third_Party_Endorsement)
+ validation=ValidationLevels.Third_Party_Endorsement)
d = km.put_raw_key(
UNRELATED_KEY,
OpenPGPKey,
ADDRESS,
- validation=ValidationLevel.Provider_Trust)
+ validation=ValidationLevels.Provider_Trust)
yield self.assertFailure(d, KeyNotValidUpgrade)
@inlineCallbacks
@@ -93,14 +97,14 @@ class ValidationLevelTestCase(KeyManagerWithSoledadTestCase):
def test_not_used(self):
km = self._key_manager()
yield km.put_raw_key(UNEXPIRED_KEY, OpenPGPKey, ADDRESS,
- validation=ValidationLevel.Provider_Trust)
+ validation=ValidationLevels.Provider_Trust)
yield km.put_raw_key(UNRELATED_KEY, OpenPGPKey, ADDRESS,
- validation=ValidationLevel.Provider_Endorsement)
+ validation=ValidationLevels.Provider_Endorsement)
key = yield km.get_key(ADDRESS, OpenPGPKey, fetch_remote=False)
self.assertEqual(key.fingerprint, UNRELATED_FINGERPRINT)
@inlineCallbacks
- def test_used(self):
+ def test_used_with_verify(self):
TEXT = "some text"
km = self._key_manager()
@@ -114,7 +118,28 @@ class ValidationLevelTestCase(KeyManagerWithSoledadTestCase):
yield km.verify(TEXT, ADDRESS, OpenPGPKey, detached_sig=signature)
d = km.put_raw_key(
UNRELATED_KEY, OpenPGPKey, ADDRESS,
- validation=ValidationLevel.Provider_Endorsement)
+ validation=ValidationLevels.Provider_Endorsement)
+ yield self.assertFailure(d, KeyNotValidUpgrade)
+
+ @inlineCallbacks
+ def test_used_with_decrypt(self):
+ TEXT = "some text"
+
+ km = self._key_manager()
+ yield km.put_raw_key(UNEXPIRED_KEY, OpenPGPKey, ADDRESS)
+ yield km.put_raw_key(PRIVATE_KEY_2, OpenPGPKey, ADDRESS_2)
+ yield km.encrypt(TEXT, ADDRESS, OpenPGPKey)
+
+ km2 = self._key_manager()
+ yield km2.put_raw_key(UNEXPIRED_PRIVATE, OpenPGPKey, ADDRESS)
+ yield km2.put_raw_key(PUBLIC_KEY_2, OpenPGPKey, ADDRESS_2)
+ encrypted = yield km2.encrypt(TEXT, ADDRESS_2, OpenPGPKey,
+ sign=ADDRESS)
+
+ yield km.decrypt(encrypted, ADDRESS_2, OpenPGPKey, verify=ADDRESS)
+ d = km.put_raw_key(
+ UNRELATED_KEY, OpenPGPKey, ADDRESS,
+ validation=ValidationLevels.Provider_Endorsement)
yield self.assertFailure(d, KeyNotValidUpgrade)
@inlineCallbacks
@@ -339,7 +364,6 @@ X2+l7IOSt+31KQCBFN/VmhTySJOVQC1d2A56lSH2c/DWVClji+x3suzn
-----END PGP PUBLIC KEY BLOCK-----
"""
-
-import unittest
if __name__ == "__main__":
+ import unittest
unittest.main()