summaryrefslogtreecommitdiff
path: root/src/leap/bitmask/keymanager/__init__.py
diff options
context:
space:
mode:
authorNavaL <ayoyo@thoughtworks.com>2016-11-25 20:51:45 +0100
committerRuben Pollan <meskio@sindominio.net>2017-12-03 20:43:31 +0100
commitbf9fa332e270ce0775e62517da457c8fc54f77ba (patch)
tree1311b80a089f26a70befc0b761f0a994b502e062 /src/leap/bitmask/keymanager/__init__.py
parent95f8a39c2185d9cc3af4e6ed600aea98c13d1949 (diff)
[feat] decryption interoperability, when the current key pair
is renewed - there is only one private inactive key that is the key expiring last among all inactive keys - if there is an inactive key, decryption with it, is tried if it fails with the current active key.
Diffstat (limited to 'src/leap/bitmask/keymanager/__init__.py')
-rw-r--r--src/leap/bitmask/keymanager/__init__.py65
1 files changed, 62 insertions, 3 deletions
diff --git a/src/leap/bitmask/keymanager/__init__.py b/src/leap/bitmask/keymanager/__init__.py
index a273b87d..87f411e7 100644
--- a/src/leap/bitmask/keymanager/__init__.py
+++ b/src/leap/bitmask/keymanager/__init__.py
@@ -207,7 +207,47 @@ class KeyManager(object):
yield self.put_raw_key(
raw_key, address=address, validation=validation_level)
- def get_key(self, address, private=False, fetch_remote=True):
+ @defer.inlineCallbacks
+ def get_key(self, address, private=False, active=True, fetch_remote=True):
+ """
+ Return a key bound to address.
+
+ For an active key: first, search for the key in local storage.
+ If it is not available, then try to fetch from nickserver.
+ The inactive key is fetched locally, for the case of multiple keys
+ for the same address. This can be used to attempt decryption
+ from multiple keys.
+
+ :param address: The address bound to the key.
+ :type address: str
+ :param private: Look for a private key instead of a public one?
+ :type private: bool
+ :param active: Look for the current active key
+ :type private: bool
+ :param fetch_remote: If key not found in local storage try to fetch
+ from nickserver
+ :type fetch_remote: bool
+
+ :return: A Deferred which fires with an EncryptionKey bound to address,
+ or which fails with KeyNotFound if no key was found neither
+ locally or in keyserver or fail with KeyVersionError if the
+ key has a format not supported by this version of KeyManager
+ :rtype: Deferred
+
+ :raise UnsupportedKeyTypeError: if invalid key type
+ """
+
+ if active:
+ key = yield self._get_key(address, private, fetch_remote)
+ defer.returnValue(key)
+ all_keys = yield self.get_all_keys(private)
+ inactive_keys = filter(lambda _key: not _key.is_active(), all_keys)
+ if inactive_keys:
+ inactive_keys = sorted(inactive_keys,
+ key=lambda _key: _key.expiry_date)
+ defer.returnValue(inactive_keys[-1])
+
+ def _get_key(self, address, private=False, fetch_remote=True):
"""
Return a key bound to address.
@@ -462,7 +502,8 @@ class KeyManager(object):
fetch_remote=True):
"""
Decrypt data using private key from address and verify with public key
- bound to verify address.
+ bound to verify address. If the decryption using the active private
+ key fails, then decription using the inactive key, if any, is tried.
:param data: The data to be decrypted.
:type data: str
@@ -489,7 +530,7 @@ class KeyManager(object):
"""
@defer.inlineCallbacks
- def decrypt(keys):
+ def _decrypt(keys):
pubkey, privkey = keys
decrypted, signed = yield self._openpgp.decrypt(
data, privkey, passphrase=passphrase, verify=pubkey)
@@ -507,6 +548,24 @@ class KeyManager(object):
(pubkey.fingerprint,))
defer.returnValue((decrypted, signature))
+ @defer.inlineCallbacks
+ def decrypt_with_inactive_key(keys, original_decrypt_error):
+ verify_key, active_key = keys
+ inactive_key = yield self.get_key(address, private=True,
+ active=False)
+ if inactive_key:
+ result = yield _decrypt([verify_key, inactive_key])
+ defer.returnValue(result)
+ raise original_decrypt_error
+
+ @defer.inlineCallbacks
+ def decrypt(keys):
+ try:
+ result = yield _decrypt(keys)
+ except keymanager_errors.DecryptError as e:
+ result = yield decrypt_with_inactive_key(keys, e)
+ defer.returnValue(result)
+
dpriv = self.get_key(address, private=True)
dpub = defer.succeed(None)
if verify is not None: