diff options
Diffstat (limited to 'src/leap')
-rw-r--r-- | src/leap/keymanager/__init__.py | 118 | ||||
-rw-r--r-- | src/leap/keymanager/gpg.py | 3 | ||||
-rw-r--r-- | src/leap/keymanager/keys.py | 69 | ||||
-rw-r--r-- | src/leap/keymanager/openpgp.py | 484 | ||||
-rw-r--r-- | src/leap/keymanager/tests/test_keymanager.py | 124 |
5 files changed, 481 insertions, 317 deletions
diff --git a/src/leap/keymanager/__init__.py b/src/leap/keymanager/__init__.py index e1f318c..e6122ff 100644 --- a/src/leap/keymanager/__init__.py +++ b/src/leap/keymanager/__init__.py @@ -20,14 +20,16 @@ Key Manager is a Nicknym agent for LEAP client. """ +import logging import requests -from leap.common.check import leap_assert +from leap.common.check import leap_assert, leap_assert_type from leap.keymanager.errors import ( KeyNotFound, NoPasswordGiven, ) from leap.keymanager.keys import ( + EncryptionKey, build_key_from_dict, KEYMANAGER_KEY_TAG, TAGS_PRIVATE_INDEX, @@ -37,6 +39,8 @@ from leap.keymanager.openpgp import ( OpenPGPScheme, ) +logger = logging.getLogger(__name__) + # # The Key Manager @@ -52,7 +56,8 @@ class KeyManager(object): PUBKEY_KEY = "user[public_key]" def __init__(self, address, nickserver_uri, soledad, session_id=None, - ca_cert_path=None, api_uri=None, api_version=None, uid=None): + ca_cert_path=None, api_uri=None, api_version=None, uid=None, + gpgbinary=None): """ Initialize a Key Manager for user's C{address} with provider's nickserver reachable in C{url}. @@ -73,6 +78,8 @@ class KeyManager(object): :type api_version: str :param uid: The users' UID. :type uid: str + :param gpgbinary: Name for GnuPG binary executable. + :type gpgbinary: C{str} """ self._address = address self._nickserver_uri = nickserver_uri @@ -84,7 +91,7 @@ class KeyManager(object): self.uid = uid # a dict to map key types to their handlers self._wrapper_map = { - OpenPGPKey: OpenPGPScheme(soledad), + OpenPGPKey: OpenPGPScheme(soledad, gpgbinary=gpgbinary), # other types of key will be added to this mapper. } # the following are used to perform https requests @@ -166,12 +173,18 @@ class KeyManager(object): @raise KeyNotFound: If the key was not found on nickserver. """ # request keys from the nickserver - server_keys = self._get( - self._nickserver_uri, {'address': address}).json() - # insert keys in local database - if self.OPENPGP_KEY in server_keys: - self._wrapper_map[OpenPGPKey].put_ascii_key( - server_keys['openpgp']) + res = None + try: + res = self._get(self._nickserver_uri, {'address': address}) + server_keys = res.json() + # insert keys in local database + if self.OPENPGP_KEY in server_keys: + self._wrapper_map[OpenPGPKey].put_ascii_key( + server_keys['openpgp']) + except Exception as e: + logger.warning("Error retrieving the keys: %r" % (e,)) + if res: + logger.warning("%s" % (res.content,)) # # key management @@ -334,3 +347,90 @@ class KeyManager(object): uid = property( _get_uid, _set_uid, doc='The uid of the user.') + + # + # encrypt/decrypt and sign/verify API + # + + def encrypt(self, data, pubkey, passphrase=None, sign=None): + """ + Encrypt C{data} using public @{key} and sign with C{sign} key. + + :param data: The data to be encrypted. + :type data: str + :param pubkey: The key used to encrypt. + :type pubkey: EncryptionKey + :param sign: The key used for signing. + :type sign: EncryptionKey + + :return: The encrypted data. + :rtype: str + """ + leap_assert_type(pubkey, EncryptionKey) + leap_assert(pubkey.__class__ in self._wrapper_map, 'Unknown key type.') + leap_assert(pubkey.private is False, 'Key is not public.') + return self._wrapper_map[pubkey.__class__].encrypt( + data, pubkey, passphrase, sign) + + def decrypt(self, data, privkey, passphrase=None, verify=None): + """ + Decrypt C{data} using private @{privkey} and verify with C{verify} key. + + :param data: The data to be decrypted. + :type data: str + :param privkey: The key used to decrypt. + :type privkey: OpenPGPKey + :param verify: The key used to verify a signature. + :type verify: OpenPGPKey + + :return: The decrypted data. + :rtype: str + + @raise InvalidSignature: Raised if unable to verify the signature with + C{verify} key. + """ + leap_assert_type(privkey, EncryptionKey) + leap_assert( + privkey.__class__ in self._wrapper_map, + 'Unknown key type.') + leap_assert(privkey.private is True, 'Key is not private.') + return self._wrapper_map[privkey.__class__].decrypt( + data, privkey, passphrase, verify) + + def sign(self, data, privkey): + """ + Sign C{data} with C{privkey}. + + :param data: The data to be signed. + :type data: str + + :param privkey: The private key to be used to sign. + :type privkey: EncryptionKey + + :return: The signed data. + :rtype: str + """ + leap_assert_type(privkey, EncryptionKey) + leap_assert( + privkey.__class__ in self._wrapper_map, + 'Unknown key type.') + leap_assert(privkey.private is True, 'Key is not private.') + return self._wrapper_map[privkey.__class__].sign(data, privkey) + + def verify(self, data, pubkey): + """ + Verify signed C{data} with C{pubkey}. + + :param data: The data to be verified. + :type data: str + + :param pubkey: The public key to be used on verification. + :type pubkey: EncryptionKey + + :return: The signed data. + :rtype: str + """ + leap_assert_type(pubkey, EncryptionKey) + leap_assert(pubkey.__class__ in self._wrapper_map, 'Unknown key type.') + leap_assert(pubkey.private is False, 'Key is not public.') + return self._wrapper_map[pubkey.__class__].verify(data, pubkey) diff --git a/src/leap/keymanager/gpg.py b/src/leap/keymanager/gpg.py index 15c1d9f..b81b218 100644 --- a/src/leap/keymanager/gpg.py +++ b/src/leap/keymanager/gpg.py @@ -110,9 +110,10 @@ class GPGWrapper(gnupg.GPG): @raise: RuntimeError with explanation message if there is a problem invoking gpg. """ + # XXX: options isn't always supported, so removing for the time being gnupg.GPG.__init__(self, gnupghome=gnupghome, gpgbinary=gpgbinary, verbose=verbose, use_agent=use_agent, - keyring=keyring, options=options) + keyring=keyring)#, options=options) self.result_map['list-packets'] = ListPackets def find_key_by_email(self, email, secret=False): diff --git a/src/leap/keymanager/keys.py b/src/leap/keymanager/keys.py index 44bd587..1c33745 100644 --- a/src/leap/keymanager/keys.py +++ b/src/leap/keymanager/keys.py @@ -283,3 +283,72 @@ class EncryptionScheme(object): :type key: EncryptionKey """ pass + + @abstractmethod + def encrypt(self, data, pubkey, passphrase=None, sign=None): + """ + Encrypt C{data} using public @{pubkey} and sign with C{sign} key. + + :param data: The data to be encrypted. + :type data: str + :param pubkey: The key used to encrypt. + :type pubkey: EncryptionKey + :param sign: The key used for signing. + :type sign: EncryptionKey + + :return: The encrypted data. + :rtype: str + """ + pass + + @abstractmethod + def decrypt(self, data, privkey, passphrase=None, verify=None): + """ + Decrypt C{data} using private @{privkey} and verify with C{verify} key. + + :param data: The data to be decrypted. + :type data: str + :param privkey: The key used to decrypt. + :type privkey: OpenPGPKey + :param verify: The key used to verify a signature. + :type verify: OpenPGPKey + + :return: The decrypted data. + :rtype: str + + @raise InvalidSignature: Raised if unable to verify the signature with + C{verify} key. + """ + pass + + @abstractmethod + def sign(self, data, privkey): + """ + Sign C{data} with C{privkey}. + + :param data: The data to be signed. + :type data: str + + :param privkey: The private key to be used to sign. + :type privkey: EncryptionKey + + :return: The signed data. + :rtype: str + """ + pass + + @abstractmethod + def verify(self, data, pubkey): + """ + Verify signed C{data} with C{pubkey}. + + :param data: The data to be verified. + :type data: str + + :param pubkey: The public key to be used on verification. + :type pubkey: EncryptionKey + + :return: The signed data. + :rtype: str + """ + pass diff --git a/src/leap/keymanager/openpgp.py b/src/leap/keymanager/openpgp.py index d19bb2b..aa04ed0 100644 --- a/src/leap/keymanager/openpgp.py +++ b/src/leap/keymanager/openpgp.py @@ -15,15 +15,19 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. + """ Infrastructure for using OpenPGP keys in Key Manager. """ + + import logging import os import re import shutil import tempfile + from leap.common.check import leap_assert, leap_assert_type from leap.keymanager import errors from leap.keymanager.keys import ( @@ -36,125 +40,8 @@ from leap.keymanager.keys import ( ) from leap.keymanager.gpg import GPGWrapper -logger = logging.getLogger(__name__) - -# -# gpg wrapper and decorator -# - -def temporary_gpgwrapper(keys=None): - """ - Returns a unitary gpg wrapper that implements context manager - protocol. - - :param key_data: ASCII armored key data. - :type key_data: str - - :return: a GPGWrapper instance - :rtype: GPGWrapper - """ - # TODO do here checks on key_data - return TempGPGWrapper(keys=keys) - - -def with_temporary_gpg(fun): - """ - Decorator to add a temporary gpg wrapper as context - to gpg related functions. - - Decorated functions are expected to return a function whose only - argument is a gpgwrapper instance. - """ - def wrapped(*args, **kwargs): - """ - We extract the arguments passed to the wrapped function, - run the function and do validations. - We expect that the positional arguments are `data`, - and an optional `key`. - All the rest of arguments should be passed as named arguments - to allow for a correct unpacking. - """ - if len(args) == 2: - keys = args[1] if isinstance(args[1], OpenPGPKey) else None - else: - keys = None - - # sign/verify keys passed as arguments - sign = kwargs.get('sign', None) - if sign: - keys = [keys, sign] - - verify = kwargs.get('verify', None) - if verify: - keys = [keys, verify] - - # is the wrapped function sign or verify? - fun_name = fun.__name__ - is_sign_function = True if fun_name == "sign" else False - is_verify_function = True if fun_name == "verify" else False - - result = None - - with temporary_gpgwrapper(keys) as gpg: - result = fun(*args, **kwargs)(gpg) - - # TODO: cleanup a little bit the - # validation. maybe delegate to other - # auxiliary functions for clarity. - - ok = getattr(result, 'ok', None) - - stderr = getattr(result, 'stderr', None) - if stderr: - logger.debug("%s" % (stderr,)) - - if ok is False: - raise errors.EncryptionDecryptionFailed( - 'Failed to encrypt/decrypt in %s: %s' % ( - fun.__name__, - stderr)) - - if verify is not None: - # A verify key has been passed - if result.valid is False or \ - verify.fingerprint != result.pubkey_fingerprint: - raise errors.InvalidSignature( - 'Failed to verify signature with key %s: %s' % - (verify.key_id, stderr)) - - if is_sign_function: - # Specific validation for sign function - privkey = gpg.list_keys(secret=True).pop() - rfprint = result.fingerprint - kfprint = privkey['fingerprint'] - if result.fingerprint is None: - raise errors.SignFailed( - 'Failed to sign with key %s: %s' % - (privkey['keyid'], stderr)) - leap_assert( - result.fingerprint == kfprint, - 'Signature and private key fingerprints mismatch: ' - '%s != %s' % - (rfprint, kfprint)) - - if is_verify_function: - # Specific validation for verify function - pubkey = gpg.list_keys().pop() - valid = result.valid - rfprint = result.fingerprint - kfprint = pubkey['fingerprint'] - if valid is False or rfprint != kfprint: - raise errors.InvalidSignature( - 'Failed to verify signature ' - 'with key %s.' % pubkey['keyid']) - result = result.valid - - # ok, enough checks. let's return data if available - if hasattr(result, 'data'): - result = result.data - return result - return wrapped +logger = logging.getLogger(__name__) class TempGPGWrapper(object): @@ -166,12 +53,15 @@ class TempGPGWrapper(object): one key. This function creates an empty temporary keyring and imports C{keys} if it is not None. """ - def __init__(self, keys=None): + def __init__(self, keys=None, gpgbinary=None): """ :param keys: OpenPGP key, or list of. :type keys: OpenPGPKey or list of OpenPGPKeys + :param gpgbinary: Name for GnuPG binary executable. + :type gpgbinary: C{str} """ self._gpg = None + self._gpgbinary = gpgbinary if not keys: keys = list() if not isinstance(keys, list): @@ -221,7 +111,9 @@ class TempGPGWrapper(object): listkeys = lambda: self._gpg.list_keys() listsecretkeys = lambda: self._gpg.list_keys(secret=True) - self._gpg = GPGWrapper(gnupghome=tempfile.mkdtemp()) + self._gpg = GPGWrapper( + gnupghome=tempfile.mkdtemp(), + gpgbinary=self._gpgbinary) leap_assert(len(listkeys()) is 0, 'Keyring not empty.') # import keys into the keyring: @@ -266,144 +158,6 @@ class TempGPGWrapper(object): shutil.rmtree(self._gpg.gnupghome) -# -# API functions -# - -@with_temporary_gpg -def encrypt_asym(data, key, passphrase=None, sign=None): - """ - Encrypt C{data} using public @{key} and sign with C{sign} key. - - :param data: The data to be encrypted. - :type data: str - :param pubkey: The key used to encrypt. - :type pubkey: OpenPGPKey - :param sign: The key used for signing. - :type sign: OpenPGPKey - - :return: The encrypted data. - :rtype: str - """ - leap_assert_type(key, OpenPGPKey) - leap_assert(key.private is False, 'Key is not public.') - if sign is not None: - leap_assert_type(sign, OpenPGPKey) - leap_assert(sign.private is True) - - # Here we cannot assert for correctness of sig because the sig is in - # the ciphertext. - # result.ok - (bool) indicates if the operation succeeded - # result.data - (bool) contains the result of the operation - - return lambda gpg: gpg.encrypt( - data, key.fingerprint, - sign=sign.key_id if sign else None, - passphrase=passphrase, symmetric=False) - - -@with_temporary_gpg -def decrypt_asym(data, key, passphrase=None, verify=None): - """ - Decrypt C{data} using private @{key} and verify with C{verify} key. - - :param data: The data to be decrypted. - :type data: str - :param privkey: The key used to decrypt. - :type privkey: OpenPGPKey - :param verify: The key used to verify a signature. - :type verify: OpenPGPKey - - :return: The decrypted data. - :rtype: str - - @raise InvalidSignature: Raised if unable to verify the signature with - C{verify} key. - """ - leap_assert(key.private is True, 'Key is not private.') - if verify is not None: - leap_assert_type(verify, OpenPGPKey) - leap_assert(verify.private is False) - - return lambda gpg: gpg.decrypt( - data, passphrase=passphrase) - - -@with_temporary_gpg -def is_encrypted(data): - """ - Return whether C{data} was encrypted using OpenPGP. - - :param data: The data we want to know about. - :type data: str - - :return: Whether C{data} was encrypted using this wrapper. - :rtype: bool - """ - return lambda gpg: gpg.is_encrypted(data) - - -@with_temporary_gpg -def is_encrypted_asym(data): - """ - Return whether C{data} was asymmetrically encrypted using OpenPGP. - - :param data: The data we want to know about. - :type data: str - - :return: Whether C{data} was encrypted using this wrapper. - :rtype: bool - """ - return lambda gpg: gpg.is_encrypted_asym(data) - - -@with_temporary_gpg -def sign(data, privkey): - """ - Sign C{data} with C{privkey}. - - :param data: The data to be signed. - :type data: str - - :param privkey: The private key to be used to sign. - :type privkey: OpenPGPKey - - :return: The ascii-armored signed data. - :rtype: str - """ - leap_assert_type(privkey, OpenPGPKey) - leap_assert(privkey.private is True) - - # result.fingerprint - contains the fingerprint of the key used to - # sign. - return lambda gpg: gpg.sign(data, keyid=privkey.key_id) - - -@with_temporary_gpg -def verify(data, key): - """ - Verify signed C{data} with C{pubkey}. - - :param data: The data to be verified. - :type data: str - - :param pubkey: The public key to be used on verification. - :type pubkey: OpenPGPKey - - :return: The ascii-armored signed data. - :rtype: str - """ - leap_assert_type(key, OpenPGPKey) - leap_assert(key.private is False) - - return lambda gpg: gpg.verify(data) - - -# -# Helper functions -# - - def _build_key_from_gpg(address, key, key_data): """ Build an OpenPGPKey for C{address} based on C{key} from @@ -445,17 +199,25 @@ class OpenPGPKey(EncryptionKey): class OpenPGPScheme(EncryptionScheme): """ - A wrapper for OpenPGP keys. + A wrapper for OpenPGP keys management and use (encryption, decyption, + signing and verification). """ - def __init__(self, soledad): + def __init__(self, soledad, gpgbinary=None): """ Initialize the OpenPGP wrapper. :param soledad: A Soledad instance for key storage. :type soledad: leap.soledad.Soledad + :param gpgbinary: Name for GnuPG binary executable. + :type gpgbinary: C{str} """ EncryptionScheme.__init__(self, soledad) + self._gpgbinary = gpgbinary + + # + # Keys management + # def gen_key(self, address): """ @@ -473,18 +235,28 @@ class OpenPGPScheme(EncryptionScheme): self.get_key(address) raise errors.KeyAlreadyExists(address) except errors.KeyNotFound: - pass + logger.debug('Key for %s not found' % (address,)) - def _gen_key(gpg): + with self._temporary_gpgwrapper() as gpg: + # TODO: inspect result, or use decorator params = gpg.gen_key_input( key_type='RSA', key_length=4096, name_real=address, name_email=address, name_comment='Generated by LEAP Key Manager.') + logger.info("About to generate keys... This might take SOME time.") gpg.gen_key(params) + logger.info("Keys for %s have been successfully " + "generated." % (address,)) pubkeys = gpg.list_keys() + # assert for new key characteristics + + # XXX This exception is not properly catched by the soledad + # bootstrapping, so if we do not finish generating the keys + # we end with a blocked thread -- kali + leap_assert( len(pubkeys) is 1, # a unitary keyring! 'Keyring has wrong number of keys: %d.' % len(pubkeys)) @@ -503,10 +275,6 @@ class OpenPGPScheme(EncryptionScheme): gpg.export_keys(key['fingerprint'], secret=secret)) self.put_key(openpgp_key) - with temporary_gpgwrapper() as gpg: - # TODO: inspect result, or use decorator - _gen_key(gpg) - return self.get_key(address, private=True) def get_key(self, address, private=False): @@ -539,7 +307,8 @@ class OpenPGPScheme(EncryptionScheme): # TODO: add more checks for correct key data. leap_assert(key_data is not None, 'Data does not represent a key.') - def _put_ascii_key(gpg): + with self._temporary_gpgwrapper() as gpg: + # TODO: inspect result, or use decorator gpg.import_keys(key_data) privkey = None pubkey = None @@ -575,10 +344,6 @@ class OpenPGPScheme(EncryptionScheme): gpg.export_keys(pubkey['fingerprint'], secret=False)) self.put_key(openpgp_pubkey) - with temporary_gpgwrapper() as gpg: - # TODO: inspect result, or use decorator - _put_ascii_key(gpg) - def put_key(self, key): """ Put C{key} in local storage. @@ -634,3 +399,180 @@ class OpenPGPScheme(EncryptionScheme): raise errors.KeyAttributesDiffer(key) doc = self._get_key_doc(key.address, key.private) self._soledad.delete_doc(doc) + + # + # Data encryption, decryption, signing and verifying + # + + def _temporary_gpgwrapper(self, keys=None): + """ + Returns a unitary gpg wrapper that implements context manager + protocol. + + :param key_data: ASCII armored key data. + :type key_data: str + :param gpgbinary: Name for GnuPG binary executable. + :type gpgbinary: C{str} + + :return: a GPGWrapper instance + :rtype: GPGWrapper + """ + # TODO do here checks on key_data + return TempGPGWrapper( + keys=keys, gpgbinary=self._gpgbinary) + + @staticmethod + def _assert_gpg_result_ok(result): + """ + Check if GPG result is 'ok' and log stderr outputs. + :param result: The GPG results + :type result: + """ + stderr = getattr(result, 'stderr', None) + if stderr: + logger.debug("%s" % (stderr,)) + if getattr(result, 'ok', None) is not True: + raise errors.EncryptionDecryptionFailed( + 'Failed to encrypt/decrypt: %s' % stderr) + + def encrypt(self, data, pubkey, passphrase=None, sign=None): + """ + Encrypt C{data} using public @{pubkey} and sign with C{sign} key. + + :param data: The data to be encrypted. + :type data: str + :param pubkey: The key used to encrypt. + :type pubkey: OpenPGPKey + :param sign: The key used for signing. + :type sign: OpenPGPKey + + :return: The encrypted data. + :rtype: str + """ + leap_assert_type(pubkey, OpenPGPKey) + leap_assert(pubkey.private is False, 'Key is not public.') + keys = [pubkey] + if sign is not None: + leap_assert_type(sign, OpenPGPKey) + leap_assert(sign.private is True) + keys.append(sign) + with self._temporary_gpgwrapper(keys) as gpg: + result = gpg.encrypt( + data, pubkey.fingerprint, + sign=sign.key_id if sign else None, + passphrase=passphrase, symmetric=False) + # Here we cannot assert for correctness of sig because the sig is + # in the ciphertext. + # result.ok - (bool) indicates if the operation succeeded + # result.data - (bool) contains the result of the operation + self._assert_gpg_result_ok(result) + return result.data + + def decrypt(self, data, privkey, passphrase=None, verify=None): + """ + Decrypt C{data} using private @{privkey} and verify with C{verify} key. + + :param data: The data to be decrypted. + :type data: str + :param privkey: The key used to decrypt. + :type privkey: OpenPGPKey + :param verify: The key used to verify a signature. + :type verify: OpenPGPKey + + :return: The decrypted data. + :rtype: str + + @raise InvalidSignature: Raised if unable to verify the signature with + C{verify} key. + """ + leap_assert(privkey.private is True, 'Key is not private.') + keys = [privkey] + if verify is not None: + leap_assert_type(verify, OpenPGPKey) + leap_assert(verify.private is False) + keys.append(verify) + with self._temporary_gpgwrapper(keys) as gpg: + result = gpg.decrypt(data, passphrase=passphrase) + self._assert_gpg_result_ok(result) + # verify signature + if (verify is not None): + if result.valid is False or \ + verify.fingerprint != result.pubkey_fingerprint: + raise errors.InvalidSignature( + 'Failed to verify signature with key %s: %s' % + (verify.key_id, stderr)) + return result.data + + def is_encrypted(self, data): + """ + Return whether C{data} was asymmetrically encrypted using OpenPGP. + + :param data: The data we want to know about. + :type data: str + + :return: Whether C{data} was encrypted using this wrapper. + :rtype: bool + """ + with self._temporary_gpgwrapper() as gpg: + return gpg.is_encrypted_asym(data) + + def sign(self, data, privkey): + """ + Sign C{data} with C{privkey}. + + :param data: The data to be signed. + :type data: str + + :param privkey: The private key to be used to sign. + :type privkey: OpenPGPKey + + :return: The ascii-armored signed data. + :rtype: str + """ + leap_assert_type(privkey, OpenPGPKey) + leap_assert(privkey.private is True) + + # result.fingerprint - contains the fingerprint of the key used to + # sign. + with self._temporary_gpgwrapper(privkey) as gpg: + result = gpg.sign(data, keyid=privkey.key_id) + rfprint = privkey.fingerprint + privkey = gpg.list_keys(secret=True).pop() + kfprint = privkey['fingerprint'] + if result.fingerprint is None: + raise errors.SignFailed( + 'Failed to sign with key %s: %s' % + (privkey['keyid'], stderr)) + leap_assert( + result.fingerprint == kfprint, + 'Signature and private key fingerprints mismatch: ' + '%s != %s' % (rfprint, kfprint)) + return result.data + + def verify(self, data, pubkey): + """ + Verify signed C{data} with C{pubkey}. + + :param data: The data to be verified. + :type data: str + + :param pubkey: The public key to be used on verification. + :type pubkey: OpenPGPKey + + :return: The ascii-armored signed data. + :rtype: str + """ + leap_assert_type(pubkey, OpenPGPKey) + leap_assert(pubkey.private is False) + with self._temporary_gpgwrapper(pubkey) as gpg: + result = gpg.verify(data) + gpgpubkey = gpg.list_keys().pop() + valid = result.valid + rfprint = result.fingerprint + kfprint = gpgpubkey['fingerprint'] + # raise in case sig is invalid + if valid is False or rfprint != kfprint: + raise errors.InvalidSignature( + 'Failed to verify signature ' + 'with key %s.' % gpgpubkey['keyid']) + return True diff --git a/src/leap/keymanager/tests/test_keymanager.py b/src/leap/keymanager/tests/test_keymanager.py index 09a6183..a474e32 100644 --- a/src/leap/keymanager/tests/test_keymanager.py +++ b/src/leap/keymanager/tests/test_keymanager.py @@ -40,6 +40,7 @@ from leap.keymanager.keys import ( ADDRESS = 'leap@leap.se' ADDRESS_2 = 'anotheruser@leap.se' +GPG_BINARY_PATH = '/usr/bin/gpg' class KeyManagerUtilTestCase(BaseLeapTest): @@ -136,7 +137,8 @@ class KeyManagerWithSoledadTestCase(BaseLeapTest): km._wrapper_map[key.__class__].delete_key(key) def _key_manager(self, user=ADDRESS, url=''): - return KeyManager(user, url, self._soledad) + return KeyManager(user, url, self._soledad, + gpgbinary=GPG_BINARY_PATH) class OpenPGPCryptoTestCase(KeyManagerWithSoledadTestCase): @@ -152,7 +154,8 @@ class OpenPGPCryptoTestCase(KeyManagerWithSoledadTestCase): '4096', key.length, 'Wrong key length.') def test_openpgp_put_delete_key(self): - pgp = openpgp.OpenPGPScheme(self._soledad) + pgp = openpgp.OpenPGPScheme( + self._soledad, gpgbinary=GPG_BINARY_PATH) self.assertRaises(KeyNotFound, pgp.get_key, ADDRESS) pgp.put_ascii_key(PUBLIC_KEY) key = pgp.get_key(ADDRESS, private=False) @@ -160,7 +163,8 @@ class OpenPGPCryptoTestCase(KeyManagerWithSoledadTestCase): self.assertRaises(KeyNotFound, pgp.get_key, ADDRESS) def test_openpgp_put_ascii_key(self): - pgp = openpgp.OpenPGPScheme(self._soledad) + pgp = openpgp.OpenPGPScheme( + self._soledad, gpgbinary=GPG_BINARY_PATH) self.assertRaises(KeyNotFound, pgp.get_key, ADDRESS) pgp.put_ascii_key(PUBLIC_KEY) key = pgp.get_key(ADDRESS, private=False) @@ -173,7 +177,8 @@ class OpenPGPCryptoTestCase(KeyManagerWithSoledadTestCase): self.assertRaises(KeyNotFound, pgp.get_key, ADDRESS) def test_get_public_key(self): - pgp = openpgp.OpenPGPScheme(self._soledad) + pgp = openpgp.OpenPGPScheme( + self._soledad, gpgbinary=GPG_BINARY_PATH) self.assertRaises(KeyNotFound, pgp.get_key, ADDRESS) pgp.put_ascii_key(PUBLIC_KEY) self.assertRaises( @@ -185,24 +190,25 @@ class OpenPGPCryptoTestCase(KeyManagerWithSoledadTestCase): pgp.delete_key(key) self.assertRaises(KeyNotFound, pgp.get_key, ADDRESS) - def test_openpgp_encrypt_decrypt_asym(self): + def test_openpgp_encrypt_decrypt(self): # encrypt - pgp = openpgp.OpenPGPScheme(self._soledad) + pgp = openpgp.OpenPGPScheme( + self._soledad, gpgbinary=GPG_BINARY_PATH) pgp.put_ascii_key(PUBLIC_KEY) pubkey = pgp.get_key(ADDRESS, private=False) - cyphertext = openpgp.encrypt_asym('data', pubkey) + cyphertext = pgp.encrypt('data', pubkey) # assert self.assertTrue(cyphertext is not None) self.assertTrue(cyphertext != '') self.assertTrue(cyphertext != 'data') - self.assertTrue(openpgp.is_encrypted_asym(cyphertext)) - self.assertTrue(openpgp.is_encrypted(cyphertext)) + self.assertTrue(pgp.is_encrypted(cyphertext)) + self.assertTrue(pgp.is_encrypted(cyphertext)) # decrypt self.assertRaises( KeyNotFound, pgp.get_key, ADDRESS, private=True) pgp.put_ascii_key(PRIVATE_KEY) privkey = pgp.get_key(ADDRESS, private=True) - plaintext = openpgp.decrypt_asym(cyphertext, privkey) + plaintext = pgp.decrypt(cyphertext, privkey) pgp.delete_key(pubkey) pgp.delete_key(privkey) self.assertRaises( @@ -211,83 +217,91 @@ class OpenPGPCryptoTestCase(KeyManagerWithSoledadTestCase): KeyNotFound, pgp.get_key, ADDRESS, private=True) def test_verify_with_private_raises(self): - pgp = openpgp.OpenPGPScheme(self._soledad) + pgp = openpgp.OpenPGPScheme( + self._soledad, gpgbinary=GPG_BINARY_PATH) pgp.put_ascii_key(PRIVATE_KEY) data = 'data' privkey = pgp.get_key(ADDRESS, private=True) - signed = openpgp.sign(data, privkey) + signed = pgp.sign(data, privkey) self.assertRaises( AssertionError, - openpgp.verify, signed, privkey) + pgp.verify, signed, privkey) def test_sign_with_public_raises(self): - pgp = openpgp.OpenPGPScheme(self._soledad) + pgp = openpgp.OpenPGPScheme( + self._soledad, gpgbinary=GPG_BINARY_PATH) pgp.put_ascii_key(PUBLIC_KEY) data = 'data' pubkey = pgp.get_key(ADDRESS, private=False) self.assertRaises( AssertionError, - openpgp.sign, data, pubkey) + pgp.sign, data, pubkey) def test_verify_with_wrong_key_raises(self): - pgp = openpgp.OpenPGPScheme(self._soledad) + pgp = openpgp.OpenPGPScheme( + self._soledad, gpgbinary=GPG_BINARY_PATH) pgp.put_ascii_key(PRIVATE_KEY) data = 'data' privkey = pgp.get_key(ADDRESS, private=True) - signed = openpgp.sign(data, privkey) + signed = pgp.sign(data, privkey) pgp.put_ascii_key(PUBLIC_KEY_2) wrongkey = pgp.get_key(ADDRESS_2) self.assertRaises( errors.InvalidSignature, - openpgp.verify, signed, wrongkey) + pgp.verify, signed, wrongkey) - def test_encrypt_asym_sign_with_public_raises(self): - pgp = openpgp.OpenPGPScheme(self._soledad) + def test_encrypt_sign_with_public_raises(self): + pgp = openpgp.OpenPGPScheme( + self._soledad, gpgbinary=GPG_BINARY_PATH) pgp.put_ascii_key(PRIVATE_KEY) data = 'data' privkey = pgp.get_key(ADDRESS, private=True) pubkey = pgp.get_key(ADDRESS, private=False) self.assertRaises( AssertionError, - openpgp.encrypt_asym, data, privkey, sign=pubkey) + pgp.encrypt, data, privkey, sign=pubkey) - def test_decrypt_asym_verify_with_private_raises(self): - pgp = openpgp.OpenPGPScheme(self._soledad) + def test_decrypt_verify_with_private_raises(self): + pgp = openpgp.OpenPGPScheme( + self._soledad, gpgbinary=GPG_BINARY_PATH) pgp.put_ascii_key(PRIVATE_KEY) data = 'data' privkey = pgp.get_key(ADDRESS, private=True) pubkey = pgp.get_key(ADDRESS, private=False) - encrypted_and_signed = openpgp.encrypt_asym( + encrypted_and_signed = pgp.encrypt( data, pubkey, sign=privkey) self.assertRaises( AssertionError, - openpgp.decrypt_asym, + pgp.decrypt, encrypted_and_signed, privkey, verify=privkey) - def test_decrypt_asym_verify_with_wrong_key_raises(self): - pgp = openpgp.OpenPGPScheme(self._soledad) + def test_decrypt_verify_with_wrong_key_raises(self): + pgp = openpgp.OpenPGPScheme( + self._soledad, gpgbinary=GPG_BINARY_PATH) pgp.put_ascii_key(PRIVATE_KEY) data = 'data' privkey = pgp.get_key(ADDRESS, private=True) pubkey = pgp.get_key(ADDRESS, private=False) - encrypted_and_signed = openpgp.encrypt_asym(data, pubkey, sign=privkey) + encrypted_and_signed = pgp.encrypt(data, pubkey, sign=privkey) pgp.put_ascii_key(PUBLIC_KEY_2) wrongkey = pgp.get_key(ADDRESS_2) self.assertRaises( errors.InvalidSignature, - openpgp.verify, encrypted_and_signed, wrongkey) + pgp.verify, encrypted_and_signed, wrongkey) def test_sign_verify(self): - pgp = openpgp.OpenPGPScheme(self._soledad) + pgp = openpgp.OpenPGPScheme( + self._soledad, gpgbinary=GPG_BINARY_PATH) pgp.put_ascii_key(PRIVATE_KEY) data = 'data' privkey = pgp.get_key(ADDRESS, private=True) - signed = openpgp.sign(data, privkey) + signed = pgp.sign(data, privkey) pubkey = pgp.get_key(ADDRESS, private=False) - self.assertTrue(openpgp.verify(signed, pubkey)) + self.assertTrue(pgp.verify(signed, pubkey)) - def test_encrypt_asym_sign_decrypt_verify(self): - pgp = openpgp.OpenPGPScheme(self._soledad) + def test_encrypt_sign_decrypt_verify(self): + pgp = openpgp.OpenPGPScheme( + self._soledad, gpgbinary=GPG_BINARY_PATH) pgp.put_ascii_key(PRIVATE_KEY) pubkey = pgp.get_key(ADDRESS, private=False) privkey = pgp.get_key(ADDRESS, private=True) @@ -295,9 +309,9 @@ class OpenPGPCryptoTestCase(KeyManagerWithSoledadTestCase): pubkey2 = pgp.get_key(ADDRESS_2, private=False) privkey2 = pgp.get_key(ADDRESS_2, private=True) data = 'data' - encrypted_and_signed = openpgp.encrypt_asym( + encrypted_and_signed = pgp.encrypt( data, pubkey2, sign=privkey) - res = openpgp.decrypt_asym( + res = pgp.decrypt( encrypted_and_signed, privkey2, verify=pubkey) self.assertTrue(data, res) @@ -448,6 +462,44 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase): self.assertEqual(ADDRESS_2, key.address) +class KeyManagerCryptoTestCase(KeyManagerWithSoledadTestCase): + + RAW_DATA = 'data' + + def test_keymanager_openpgp_encrypt_decrypt(self): + km = self._key_manager() + # put raw private key + km._wrapper_map[OpenPGPKey].put_ascii_key(PRIVATE_KEY) + # get public key + pubkey = km.get_key( + ADDRESS, OpenPGPKey, private=False, fetch_remote=False) + # encrypt + encdata = km.encrypt(self.RAW_DATA, pubkey) + self.assertNotEqual(self.RAW_DATA, encdata) + # get private key + privkey = km.get_key( + ADDRESS, OpenPGPKey, private=True, fetch_remote=False) + # decrypt + rawdata = km.decrypt(encdata, privkey) + self.assertEqual(self.RAW_DATA, rawdata) + + def test_keymanager_openpgp_sign_verify(self): + km = self._key_manager() + # put raw private keys + km._wrapper_map[OpenPGPKey].put_ascii_key(PRIVATE_KEY) + # get private key for signing + privkey = km.get_key( + ADDRESS, OpenPGPKey, private=True, fetch_remote=False) + # encrypt + signdata = km.sign(self.RAW_DATA, privkey) + self.assertNotEqual(self.RAW_DATA, signdata) + # get public key for verifying + pubkey = km.get_key( + ADDRESS, OpenPGPKey, private=False, fetch_remote=False) + # decrypt + self.assertTrue(km.verify(signdata, pubkey)) + + # Key material for testing # key 24D18DDF: public key "Leap Test Key <leap@leap.se>" |