summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/leap/bitmask/keymanager/__init__.py71
-rw-r--r--tests/integration/keymanager/test_keymanager.py56
2 files changed, 74 insertions, 53 deletions
diff --git a/src/leap/bitmask/keymanager/__init__.py b/src/leap/bitmask/keymanager/__init__.py
index 87f411e7..e3d7fdd0 100644
--- a/src/leap/bitmask/keymanager/__init__.py
+++ b/src/leap/bitmask/keymanager/__init__.py
@@ -208,46 +208,24 @@ class KeyManager(object):
raw_key, address=address, validation=validation_level)
@defer.inlineCallbacks
- def get_key(self, address, private=False, active=True, fetch_remote=True):
+ def get_inactive_private_keys(self):
"""
- Return a key bound to address.
-
- For an active key: first, search for the key in local storage.
- If it is not available, then try to fetch from nickserver.
- The inactive key is fetched locally, for the case of multiple keys
- for the same address. This can be used to attempt decryption
- from multiple keys.
-
- :param address: The address bound to the key.
- :type address: str
- :param private: Look for a private key instead of a public one?
- :type private: bool
- :param active: Look for the current active key
- :type private: bool
- :param fetch_remote: If key not found in local storage try to fetch
- from nickserver
- :type fetch_remote: bool
+ Return all inactive private keys bound to address, that can are
+ stored locally.
+ This can be used to attempt decryption from multiple keys.
- :return: A Deferred which fires with an EncryptionKey bound to address,
- or which fails with KeyNotFound if no key was found neither
- locally or in keyserver or fail with KeyVersionError if the
- key has a format not supported by this version of KeyManager
+ :return: A Deferred which fires the list of inactive keys sorted
+ according to their expiry dates.
:rtype: Deferred
-
- :raise UnsupportedKeyTypeError: if invalid key type
"""
-
- if active:
- key = yield self._get_key(address, private, fetch_remote)
- defer.returnValue(key)
- all_keys = yield self.get_all_keys(private)
+ all_keys = yield self.get_all_keys(private=True)
inactive_keys = filter(lambda _key: not _key.is_active(), all_keys)
- if inactive_keys:
- inactive_keys = sorted(inactive_keys,
- key=lambda _key: _key.expiry_date)
- defer.returnValue(inactive_keys[-1])
- def _get_key(self, address, private=False, fetch_remote=True):
+ inactive_keys = \
+ sorted(inactive_keys, key=lambda _key: _key.expiry_date)
+ defer.returnValue(inactive_keys)
+
+ def get_key(self, address, private=False, fetch_remote=True):
"""
Return a key bound to address.
@@ -549,21 +527,30 @@ class KeyManager(object):
defer.returnValue((decrypted, signature))
@defer.inlineCallbacks
- def decrypt_with_inactive_key(keys, original_decrypt_error):
- verify_key, active_key = keys
- inactive_key = yield self.get_key(address, private=True,
- active=False)
- if inactive_key:
+ def decrypt_with_inactive_keys(inactive_keys, verify_key,
+ original_decrypt_err):
+ if not inactive_keys:
+ # when there are no more keys to go through
+ raise original_decrypt_err
+
+ try:
+ inactive_key = inactive_keys.pop()
result = yield _decrypt([verify_key, inactive_key])
- defer.returnValue(result)
- raise original_decrypt_error
+ except keymanager_errors.DecryptError:
+ result = yield decrypt_with_inactive_keys(inactive_keys,
+ verify_key,
+ original_decrypt_err)
+ defer.returnValue(result)
@defer.inlineCallbacks
def decrypt(keys):
try:
result = yield _decrypt(keys)
except keymanager_errors.DecryptError as e:
- result = yield decrypt_with_inactive_key(keys, e)
+ verify_key, active_key = keys
+ inactive_keys = yield self.get_inactive_private_keys()
+ result = yield decrypt_with_inactive_keys(inactive_keys,
+ verify_key, e)
defer.returnValue(result)
dpriv = self.get_key(address, private=True)
diff --git a/tests/integration/keymanager/test_keymanager.py b/tests/integration/keymanager/test_keymanager.py
index 7bacd050..ed734332 100644
--- a/tests/integration/keymanager/test_keymanager.py
+++ b/tests/integration/keymanager/test_keymanager.py
@@ -195,22 +195,22 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase):
self.assertTrue(key.private)
@defer.inlineCallbacks
- def test_create_and_get_two_private_keys_sets_first_key_inactive(self):
+ def test_get_inactive_private_key(self):
km = self._key_manager()
yield km._openpgp.put_raw_key(PRIVATE_KEY, ADDRESS)
yield km._openpgp.put_raw_key(DIFFERENT_PRIVATE_KEY, ADDRESS)
# get the key
- inactive_key = yield km.get_key(ADDRESS, private=True,
- active=False, fetch_remote=False)
- active_key = yield km.get_key(ADDRESS, private=True,
- active=True, fetch_remote=False)
+ inactive_keys = yield km.get_inactive_private_keys()
+ active_key = yield km.get_key(
+ ADDRESS, private=True, fetch_remote=False)
+ self.assertEqual(1, len(inactive_keys))
self.assertEqual(
- inactive_key.fingerprint.lower(), KEY_FINGERPRINT.lower())
+ inactive_keys[0].fingerprint.lower(), KEY_FINGERPRINT.lower())
self.assertEqual(
active_key.fingerprint.lower(), DIFFERENT_KEY_FPR.lower())
- self.assertTrue(inactive_key.private)
+ self.assertTrue(inactive_keys[0].private)
self.assertTrue(active_key.private)
- self.assertFalse(inactive_key.is_active())
+ self.assertFalse(inactive_keys[0].is_active())
self.assertTrue(active_key.is_active())
@defer.inlineCallbacks
@@ -603,10 +603,11 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase):
old_key = yield km.get_key(ADDRESS_EXPIRING)
new_key = yield km.regenerate_key()
- retrieved_old_key = yield km.get_key(ADDRESS_EXPIRING,
- private=True, active=False)
+ inactive_private_keys = yield km.get_inactive_private_keys()
renewed_public_key = yield km.get_key(ADDRESS_EXPIRING, private=False)
+ self.assertEqual(1, len(inactive_private_keys))
+ retrieved_old_key = inactive_private_keys[0]
self.assertEqual(old_key.fingerprint,
retrieved_old_key.fingerprint)
self.assertNotEqual(old_key.fingerprint,
@@ -668,7 +669,7 @@ class KeyManagerCryptoTestCase(KeyManagerWithSoledadTestCase):
self.assertEqual(signingkey.fingerprint, key.fingerprint)
@defer.inlineCallbacks
- def test_keymanager_openpgp_decryption_tries_inactive_valid_key(self):
+ def test_keymanager_decryption_tries_inactive_valid_key(self):
km = self._key_manager()
# put raw private key
yield km._openpgp.put_raw_key(PRIVATE_KEY, ADDRESS)
@@ -689,6 +690,21 @@ class KeyManagerCryptoTestCase(KeyManagerWithSoledadTestCase):
self.assertEqual(signingkey.fingerprint, key.fingerprint)
@defer.inlineCallbacks
+ def test_decrypt_throws_error_when_all_keys_fails(self):
+ km = self._key_manager()
+ # put raw private key
+ yield km._openpgp.put_raw_key(PRIVATE_KEY, ADDRESS)
+ yield km._openpgp.put_raw_key(PRIVATE_KEY_2, ADDRESS_2)
+
+ # renew key -- deactivate current key
+ yield km.regenerate_key()
+
+ # decrypt
+ with self.assertRaises(errors.DecryptError):
+ yield km.decrypt(ENCRYPTED_MESSAGE_FOR_DIFFERENT_KEY,
+ ADDRESS, verify=ADDRESS_2, fetch_remote=False)
+
+ @defer.inlineCallbacks
def test_keymanager_openpgp_encrypt_decrypt_wrong_sign(self):
km = self._key_manager()
# put raw keys
@@ -797,3 +813,21 @@ QzEd9yAN+81fkIBaa6Y8LCBxV03JCc2J4eCUKXd1gg==
=gDzy
-----END PGP PUBLIC KEY BLOCK-----
"""
+
+ENCRYPTED_MESSAGE_FOR_DIFFERENT_KEY = """
+-----BEGIN PGP MESSAGE-----
+
+hQEMAyIGRJSVjm17AQf/fyQrbcUKhy4Zv0UBsMFNdLj3h6YYkhkDecupmNeJzgSc
+IeW8E5Un5thGpJRCF1iC3XirzybQxCEDCqVZdibXY/K0D5eQAE95m3Bc2euZN3sm
+br4Ro/ybf/+0mt+cyPrvoaU/c/RKCWAXGDrTNCLe9f4UkwdiRj5tBpdC6WNEgsTD
+SJfpZF5xP+NMc0cBRmSnUZHMgspbBK1OYmQurxn8vjyxDXwJuJ9sWl+FrWop3WMW
+l/IMSSgyaJUjHvau6WNzRhKLujhuqyZKWo0WuJdBT0lPM0aQCJls4QVpDwE9mTZy
+Vm2M4VnrxP9/IMqCrevwJXQTIKgIz9ANif+iZdHWYNLALAGgnH+45wXeguhFP1vD
+x3SVIgOp8aAW7Plf5IO/bRQBs/LTvS1HWkD07WW14NJ29eMTPgoSR/lTGNMbHGYH
+EgqRxJIsH93A+fN+CQoPboaEW/0hhQVf0WO/b8soxhVwZDDPMI3qGAQBwrBD3N9z
+ksEUD5XNT+6mtMpTSpPr/0j0W7LjqR5QT+Bf2lUiFLH8XwekO0JK/vq5XkTydiAw
+ZZBCPpoBxM/gH3cMuFafZNbqE6KDd7UziKxZCR17SrDFjrK/BLMrRKXRSnZOQNsb
+WuLF0jIGxN6NiaduJ77gmrOieuBu0wKqv0iAvo8s
+=G2sp
+-----END PGP MESSAGE-----
+"""