diff options
author | NavaL <ayoyo@thoughtworks.com> | 2016-11-25 20:51:45 +0100 |
---|---|---|
committer | Ruben Pollan <meskio@sindominio.net> | 2017-12-03 20:43:31 +0100 |
commit | bf9fa332e270ce0775e62517da457c8fc54f77ba (patch) | |
tree | 1311b80a089f26a70befc0b761f0a994b502e062 /src/leap/bitmask/keymanager/__init__.py | |
parent | 95f8a39c2185d9cc3af4e6ed600aea98c13d1949 (diff) |
[feat] decryption interoperability, when the current key pair
is renewed
- there is only one private inactive key that is the key
expiring last among all inactive keys
- if there is an inactive key, decryption with it, is tried
if it fails with the current active key.
Diffstat (limited to 'src/leap/bitmask/keymanager/__init__.py')
-rw-r--r-- | src/leap/bitmask/keymanager/__init__.py | 65 |
1 files changed, 62 insertions, 3 deletions
diff --git a/src/leap/bitmask/keymanager/__init__.py b/src/leap/bitmask/keymanager/__init__.py index a273b87d..87f411e7 100644 --- a/src/leap/bitmask/keymanager/__init__.py +++ b/src/leap/bitmask/keymanager/__init__.py @@ -207,7 +207,47 @@ class KeyManager(object): yield self.put_raw_key( raw_key, address=address, validation=validation_level) - def get_key(self, address, private=False, fetch_remote=True): + @defer.inlineCallbacks + def get_key(self, address, private=False, active=True, fetch_remote=True): + """ + Return a key bound to address. + + For an active key: first, search for the key in local storage. + If it is not available, then try to fetch from nickserver. + The inactive key is fetched locally, for the case of multiple keys + for the same address. This can be used to attempt decryption + from multiple keys. + + :param address: The address bound to the key. + :type address: str + :param private: Look for a private key instead of a public one? + :type private: bool + :param active: Look for the current active key + :type private: bool + :param fetch_remote: If key not found in local storage try to fetch + from nickserver + :type fetch_remote: bool + + :return: A Deferred which fires with an EncryptionKey bound to address, + or which fails with KeyNotFound if no key was found neither + locally or in keyserver or fail with KeyVersionError if the + key has a format not supported by this version of KeyManager + :rtype: Deferred + + :raise UnsupportedKeyTypeError: if invalid key type + """ + + if active: + key = yield self._get_key(address, private, fetch_remote) + defer.returnValue(key) + all_keys = yield self.get_all_keys(private) + inactive_keys = filter(lambda _key: not _key.is_active(), all_keys) + if inactive_keys: + inactive_keys = sorted(inactive_keys, + key=lambda _key: _key.expiry_date) + defer.returnValue(inactive_keys[-1]) + + def _get_key(self, address, private=False, fetch_remote=True): """ Return a key bound to address. @@ -462,7 +502,8 @@ class KeyManager(object): fetch_remote=True): """ Decrypt data using private key from address and verify with public key - bound to verify address. + bound to verify address. If the decryption using the active private + key fails, then decription using the inactive key, if any, is tried. :param data: The data to be decrypted. :type data: str @@ -489,7 +530,7 @@ class KeyManager(object): """ @defer.inlineCallbacks - def decrypt(keys): + def _decrypt(keys): pubkey, privkey = keys decrypted, signed = yield self._openpgp.decrypt( data, privkey, passphrase=passphrase, verify=pubkey) @@ -507,6 +548,24 @@ class KeyManager(object): (pubkey.fingerprint,)) defer.returnValue((decrypted, signature)) + @defer.inlineCallbacks + def decrypt_with_inactive_key(keys, original_decrypt_error): + verify_key, active_key = keys + inactive_key = yield self.get_key(address, private=True, + active=False) + if inactive_key: + result = yield _decrypt([verify_key, inactive_key]) + defer.returnValue(result) + raise original_decrypt_error + + @defer.inlineCallbacks + def decrypt(keys): + try: + result = yield _decrypt(keys) + except keymanager_errors.DecryptError as e: + result = yield decrypt_with_inactive_key(keys, e) + defer.returnValue(result) + dpriv = self.get_key(address, private=True) dpub = defer.succeed(None) if verify is not None: |