From 5cb58ca086e88951b0313d7d4dc48a9f4c0c85b5 Mon Sep 17 00:00:00 2001 From: drebs Date: Wed, 7 Aug 2013 16:29:54 +0200 Subject: Support bundled GPG and change API. - Move openpgp encrypt/decrypt/sign/verify API to inside OpenPGP class. - Add encrypt/decrypt/sign/verify API to KeyManager. - Add possibility of passing custom gpg binary to KeyManager and OpenPGPScheme. - Remove "_asym" suffix from method names. - Bump version to 0.2.1. New API is *not* backwards compatible. --- src/leap/keymanager/openpgp.py | 475 ++++++++++++++++++----------------------- 1 file changed, 205 insertions(+), 270 deletions(-) (limited to 'src/leap/keymanager/openpgp.py') diff --git a/src/leap/keymanager/openpgp.py b/src/leap/keymanager/openpgp.py index 9d404cb..1e78c70 100644 --- a/src/leap/keymanager/openpgp.py +++ b/src/leap/keymanager/openpgp.py @@ -15,15 +15,19 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see . + """ Infrastructure for using OpenPGP keys in Key Manager. """ + + import logging import os import re import shutil import tempfile + from leap.common.check import leap_assert, leap_assert_type from leap.keymanager import errors from leap.keymanager.keys import ( @@ -36,125 +40,8 @@ from leap.keymanager.keys import ( ) from leap.keymanager.gpg import GPGWrapper -logger = logging.getLogger(__name__) - - -# -# gpg wrapper and decorator -# - -def temporary_gpgwrapper(keys=None): - """ - Returns a unitary gpg wrapper that implements context manager - protocol. - - :param key_data: ASCII armored key data. - :type key_data: str - - :return: a GPGWrapper instance - :rtype: GPGWrapper - """ - # TODO do here checks on key_data - return TempGPGWrapper(keys=keys) - - -def with_temporary_gpg(fun): - """ - Decorator to add a temporary gpg wrapper as context - to gpg related functions. - - Decorated functions are expected to return a function whose only - argument is a gpgwrapper instance. - """ - def wrapped(*args, **kwargs): - """ - We extract the arguments passed to the wrapped function, - run the function and do validations. - We expect that the positional arguments are `data`, - and an optional `key`. - All the rest of arguments should be passed as named arguments - to allow for a correct unpacking. - """ - if len(args) == 2: - keys = args[1] if isinstance(args[1], OpenPGPKey) else None - else: - keys = None - - # sign/verify keys passed as arguments - sign = kwargs.get('sign', None) - if sign: - keys = [keys, sign] - - verify = kwargs.get('verify', None) - if verify: - keys = [keys, verify] - - # is the wrapped function sign or verify? - fun_name = fun.__name__ - is_sign_function = True if fun_name == "sign" else False - is_verify_function = True if fun_name == "verify" else False - - result = None - - with temporary_gpgwrapper(keys) as gpg: - result = fun(*args, **kwargs)(gpg) - - # TODO: cleanup a little bit the - # validation. maybe delegate to other - # auxiliary functions for clarity. - - ok = getattr(result, 'ok', None) - - stderr = getattr(result, 'stderr', None) - if stderr: - logger.debug("%s" % (stderr,)) - - if ok is False: - raise errors.EncryptionDecryptionFailed( - 'Failed to encrypt/decrypt in %s: %s' % ( - fun.__name__, - stderr)) - - if verify is not None: - # A verify key has been passed - if result.valid is False or \ - verify.fingerprint != result.pubkey_fingerprint: - raise errors.InvalidSignature( - 'Failed to verify signature with key %s: %s' % - (verify.key_id, stderr)) - - if is_sign_function: - # Specific validation for sign function - privkey = gpg.list_keys(secret=True).pop() - rfprint = result.fingerprint - kfprint = privkey['fingerprint'] - if result.fingerprint is None: - raise errors.SignFailed( - 'Failed to sign with key %s: %s' % - (privkey['keyid'], stderr)) - leap_assert( - result.fingerprint == kfprint, - 'Signature and private key fingerprints mismatch: ' - '%s != %s' % - (rfprint, kfprint)) - - if is_verify_function: - # Specific validation for verify function - pubkey = gpg.list_keys().pop() - valid = result.valid - rfprint = result.fingerprint - kfprint = pubkey['fingerprint'] - if valid is False or rfprint != kfprint: - raise errors.InvalidSignature( - 'Failed to verify signature ' - 'with key %s.' % pubkey['keyid']) - result = result.valid - # ok, enough checks. let's return data if available - if hasattr(result, 'data'): - result = result.data - return result - return wrapped +logger = logging.getLogger(__name__) class TempGPGWrapper(object): @@ -166,12 +53,15 @@ class TempGPGWrapper(object): one key. This function creates an empty temporary keyring and imports C{keys} if it is not None. """ - def __init__(self, keys=None): + def __init__(self, keys=None, gpgbinary=None): """ :param keys: OpenPGP key, or list of. :type keys: OpenPGPKey or list of OpenPGPKeys + :param gpgbinary: Name for GnuPG binary executable. + :type gpgbinary: C{str} """ self._gpg = None + self._gpgbinary = gpgbinary if not keys: keys = list() if not isinstance(keys, list): @@ -221,7 +111,9 @@ class TempGPGWrapper(object): listkeys = lambda: self._gpg.list_keys() listsecretkeys = lambda: self._gpg.list_keys(secret=True) - self._gpg = GPGWrapper(gnupghome=tempfile.mkdtemp()) + self._gpg = GPGWrapper( + gnupghome=tempfile.mkdtemp(), + gpgbinary=self._gpgbinary) leap_assert(len(listkeys()) is 0, 'Keyring not empty.') # import keys into the keyring: @@ -266,144 +158,6 @@ class TempGPGWrapper(object): shutil.rmtree(self._gpg.gnupghome) -# -# API functions -# - -@with_temporary_gpg -def encrypt_asym(data, key, passphrase=None, sign=None): - """ - Encrypt C{data} using public @{key} and sign with C{sign} key. - - :param data: The data to be encrypted. - :type data: str - :param pubkey: The key used to encrypt. - :type pubkey: OpenPGPKey - :param sign: The key used for signing. - :type sign: OpenPGPKey - - :return: The encrypted data. - :rtype: str - """ - leap_assert_type(key, OpenPGPKey) - leap_assert(key.private is False, 'Key is not public.') - if sign is not None: - leap_assert_type(sign, OpenPGPKey) - leap_assert(sign.private is True) - - # Here we cannot assert for correctness of sig because the sig is in - # the ciphertext. - # result.ok - (bool) indicates if the operation succeeded - # result.data - (bool) contains the result of the operation - - return lambda gpg: gpg.encrypt( - data, key.fingerprint, - sign=sign.key_id if sign else None, - passphrase=passphrase, symmetric=False) - - -@with_temporary_gpg -def decrypt_asym(data, key, passphrase=None, verify=None): - """ - Decrypt C{data} using private @{key} and verify with C{verify} key. - - :param data: The data to be decrypted. - :type data: str - :param privkey: The key used to decrypt. - :type privkey: OpenPGPKey - :param verify: The key used to verify a signature. - :type verify: OpenPGPKey - - :return: The decrypted data. - :rtype: str - - @raise InvalidSignature: Raised if unable to verify the signature with - C{verify} key. - """ - leap_assert(key.private is True, 'Key is not private.') - if verify is not None: - leap_assert_type(verify, OpenPGPKey) - leap_assert(verify.private is False) - - return lambda gpg: gpg.decrypt( - data, passphrase=passphrase) - - -@with_temporary_gpg -def is_encrypted(data): - """ - Return whether C{data} was encrypted using OpenPGP. - - :param data: The data we want to know about. - :type data: str - - :return: Whether C{data} was encrypted using this wrapper. - :rtype: bool - """ - return lambda gpg: gpg.is_encrypted(data) - - -@with_temporary_gpg -def is_encrypted_asym(data): - """ - Return whether C{data} was asymmetrically encrypted using OpenPGP. - - :param data: The data we want to know about. - :type data: str - - :return: Whether C{data} was encrypted using this wrapper. - :rtype: bool - """ - return lambda gpg: gpg.is_encrypted_asym(data) - - -@with_temporary_gpg -def sign(data, privkey): - """ - Sign C{data} with C{privkey}. - - :param data: The data to be signed. - :type data: str - - :param privkey: The private key to be used to sign. - :type privkey: OpenPGPKey - - :return: The ascii-armored signed data. - :rtype: str - """ - leap_assert_type(privkey, OpenPGPKey) - leap_assert(privkey.private is True) - - # result.fingerprint - contains the fingerprint of the key used to - # sign. - return lambda gpg: gpg.sign(data, keyid=privkey.key_id) - - -@with_temporary_gpg -def verify(data, key): - """ - Verify signed C{data} with C{pubkey}. - - :param data: The data to be verified. - :type data: str - - :param pubkey: The public key to be used on verification. - :type pubkey: OpenPGPKey - - :return: The ascii-armored signed data. - :rtype: str - """ - leap_assert_type(key, OpenPGPKey) - leap_assert(key.private is False) - - return lambda gpg: gpg.verify(data) - - -# -# Helper functions -# - - def _build_key_from_gpg(address, key, key_data): """ Build an OpenPGPKey for C{address} based on C{key} from @@ -445,17 +199,25 @@ class OpenPGPKey(EncryptionKey): class OpenPGPScheme(EncryptionScheme): """ - A wrapper for OpenPGP keys. + A wrapper for OpenPGP keys management and use (encryption, decyption, + signing and verification). """ - def __init__(self, soledad): + def __init__(self, soledad, gpgbinary=None): """ Initialize the OpenPGP wrapper. :param soledad: A Soledad instance for key storage. :type soledad: leap.soledad.Soledad + :param gpgbinary: Name for GnuPG binary executable. + :type gpgbinary: C{str} """ EncryptionScheme.__init__(self, soledad) + self._gpgbinary = gpgbinary + + # + # Keys management + # def gen_key(self, address): """ @@ -475,7 +237,8 @@ class OpenPGPScheme(EncryptionScheme): except errors.KeyNotFound: logger.debug('Key for %s not found' % (address,)) - def _gen_key(gpg): + with self._temporary_gpgwrapper() as gpg: + # TODO: inspect result, or use decorator params = gpg.gen_key_input( key_type='RSA', key_length=4096, @@ -512,10 +275,6 @@ class OpenPGPScheme(EncryptionScheme): gpg.export_keys(key['fingerprint'], secret=secret)) self.put_key(openpgp_key) - with temporary_gpgwrapper() as gpg: - # TODO: inspect result, or use decorator - _gen_key(gpg) - return self.get_key(address, private=True) def get_key(self, address, private=False): @@ -548,7 +307,8 @@ class OpenPGPScheme(EncryptionScheme): # TODO: add more checks for correct key data. leap_assert(key_data is not None, 'Data does not represent a key.') - def _put_ascii_key(gpg): + with self._temporary_gpgwrapper() as gpg: + # TODO: inspect result, or use decorator gpg.import_keys(key_data) privkey = None pubkey = None @@ -584,10 +344,6 @@ class OpenPGPScheme(EncryptionScheme): gpg.export_keys(pubkey['fingerprint'], secret=False)) self.put_key(openpgp_pubkey) - with temporary_gpgwrapper() as gpg: - # TODO: inspect result, or use decorator - _put_ascii_key(gpg) - def put_key(self, key): """ Put C{key} in local storage. @@ -643,3 +399,182 @@ class OpenPGPScheme(EncryptionScheme): raise errors.KeyAttributesDiffer(key) doc = self._get_key_doc(key.address, key.private) self._soledad.delete_doc(doc) + + # + # Data encryption, decryption, signing and verifying + # + + def _temporary_gpgwrapper(self, keys=None): + """ + Returns a unitary gpg wrapper that implements context manager + protocol. + + :param key_data: ASCII armored key data. + :type key_data: str + :param gpgbinary: Name for GnuPG binary executable. + :type gpgbinary: C{str} + + :return: a GPGWrapper instance + :rtype: GPGWrapper + """ + # TODO do here checks on key_data + return TempGPGWrapper( + keys=keys, gpgbinary=self._gpgbinary) + + @staticmethod + def _assert_gpg_result_ok(result): + """ + Check if GPG result is 'ok' and log stderr outputs. + :param result: The GPG results + :type result: + """ + stderr = getattr(result, 'stderr', None) + if stderr: + logger.debug("%s" % (stderr,)) + if getattr(result, 'ok', None) is not True: + raise errors.EncryptionDecryptionFailed( + 'Failed to encrypt/decrypt in %s: %s' % ( + this.encrypt, + stderr)) + + def encrypt(self, data, pubkey, passphrase=None, sign=None): + """ + Encrypt C{data} using public @{pubkey} and sign with C{sign} key. + + :param data: The data to be encrypted. + :type data: str + :param pubkey: The key used to encrypt. + :type pubkey: OpenPGPKey + :param sign: The key used for signing. + :type sign: OpenPGPKey + + :return: The encrypted data. + :rtype: str + """ + leap_assert_type(pubkey, OpenPGPKey) + leap_assert(pubkey.private is False, 'Key is not public.') + keys = [pubkey] + if sign is not None: + leap_assert_type(sign, OpenPGPKey) + leap_assert(sign.private is True) + keys.append(sign) + with self._temporary_gpgwrapper(keys) as gpg: + result = gpg.encrypt( + data, pubkey.fingerprint, + sign=sign.key_id if sign else None, + passphrase=passphrase, symmetric=False) + # Here we cannot assert for correctness of sig because the sig is + # in the ciphertext. + # result.ok - (bool) indicates if the operation succeeded + # result.data - (bool) contains the result of the operation + self._assert_gpg_result_ok(result) + return result.data + + def decrypt(self, data, privkey, passphrase=None, verify=None): + """ + Decrypt C{data} using private @{privkey} and verify with C{verify} key. + + :param data: The data to be decrypted. + :type data: str + :param privkey: The key used to decrypt. + :type privkey: OpenPGPKey + :param verify: The key used to verify a signature. + :type verify: OpenPGPKey + + :return: The decrypted data. + :rtype: str + + @raise InvalidSignature: Raised if unable to verify the signature with + C{verify} key. + """ + leap_assert(privkey.private is True, 'Key is not private.') + keys = [privkey] + if verify is not None: + leap_assert_type(verify, OpenPGPKey) + leap_assert(verify.private is False) + keys.append(verify) + with self._temporary_gpgwrapper(keys) as gpg: + result = gpg.decrypt(data, passphrase=passphrase) + self._assert_gpg_result_ok(result) + # verify signature + if (verify is not None): + if result.valid is False or \ + verify.fingerprint != result.pubkey_fingerprint: + raise errors.InvalidSignature( + 'Failed to verify signature with key %s: %s' % + (verify.key_id, stderr)) + return result.data + + def is_encrypted(self, data): + """ + Return whether C{data} was asymmetrically encrypted using OpenPGP. + + :param data: The data we want to know about. + :type data: str + + :return: Whether C{data} was encrypted using this wrapper. + :rtype: bool + """ + with self._temporary_gpgwrapper() as gpg: + return gpg.is_encrypted_asym(data) + + def sign(self, data, privkey): + """ + Sign C{data} with C{privkey}. + + :param data: The data to be signed. + :type data: str + + :param privkey: The private key to be used to sign. + :type privkey: OpenPGPKey + + :return: The ascii-armored signed data. + :rtype: str + """ + leap_assert_type(privkey, OpenPGPKey) + leap_assert(privkey.private is True) + + # result.fingerprint - contains the fingerprint of the key used to + # sign. + with self._temporary_gpgwrapper(privkey) as gpg: + result = gpg.sign(data, keyid=privkey.key_id) + rfprint = privkey.fingerprint + privkey = gpg.list_keys(secret=True).pop() + kfprint = privkey['fingerprint'] + if result.fingerprint is None: + raise errors.SignFailed( + 'Failed to sign with key %s: %s' % + (privkey['keyid'], stderr)) + leap_assert( + result.fingerprint == kfprint, + 'Signature and private key fingerprints mismatch: ' + '%s != %s' % (rfprint, kfprint)) + return result.data + + def verify(self, data, pubkey): + """ + Verify signed C{data} with C{pubkey}. + + :param data: The data to be verified. + :type data: str + + :param pubkey: The public key to be used on verification. + :type pubkey: OpenPGPKey + + :return: The ascii-armored signed data. + :rtype: str + """ + leap_assert_type(pubkey, OpenPGPKey) + leap_assert(pubkey.private is False) + with self._temporary_gpgwrapper(pubkey) as gpg: + result = gpg.verify(data) + gpgpubkey = gpg.list_keys().pop() + valid = result.valid + rfprint = result.fingerprint + kfprint = gpgpubkey['fingerprint'] + # raise in case sig is invalid + if valid is False or rfprint != kfprint: + raise errors.InvalidSignature( + 'Failed to verify signature ' + 'with key %s.' % gpgpubkey['keyid']) + return True -- cgit v1.2.3