diff options
| author | Tomás Touceda <chiiph@leap.se> | 2013-08-08 11:08:18 -0300 | 
|---|---|---|
| committer | Tomás Touceda <chiiph@leap.se> | 2013-08-08 11:08:18 -0300 | 
| commit | 52178f62cc880c19593a4fe15efbff812ad0b47d (patch) | |
| tree | 020095665c8a27db54d3c64a772bf22913b437c2 /keymanager/src | |
| parent | 5a77f97b954f0286d3da7d227a6f4e130ab25f5f (diff) | |
| parent | 2623cf55c275bda37ad7249e4cf33d448ea7f246 (diff) | |
Merge remote-tracking branch 'drebs/feature/3397-keymanager-should-support-bundled-gpg' into develop
Diffstat (limited to 'keymanager/src')
| -rw-r--r-- | keymanager/src/leap/keymanager/__init__.py | 97 | ||||
| -rw-r--r-- | keymanager/src/leap/keymanager/keys.py | 69 | ||||
| -rw-r--r-- | keymanager/src/leap/keymanager/openpgp.py | 475 | ||||
| -rw-r--r-- | keymanager/src/leap/keymanager/tests/test_keymanager.py | 124 | 
4 files changed, 456 insertions, 309 deletions
| diff --git a/keymanager/src/leap/keymanager/__init__.py b/keymanager/src/leap/keymanager/__init__.py index e1f318c..d24e08e 100644 --- a/keymanager/src/leap/keymanager/__init__.py +++ b/keymanager/src/leap/keymanager/__init__.py @@ -22,12 +22,13 @@ Key Manager is a Nicknym agent for LEAP client.  import requests -from leap.common.check import leap_assert +from leap.common.check import leap_assert, leap_assert_type  from leap.keymanager.errors import (      KeyNotFound,      NoPasswordGiven,  )  from leap.keymanager.keys import ( +    EncryptionKey,      build_key_from_dict,      KEYMANAGER_KEY_TAG,      TAGS_PRIVATE_INDEX, @@ -52,7 +53,8 @@ class KeyManager(object):      PUBKEY_KEY = "user[public_key]"      def __init__(self, address, nickserver_uri, soledad, session_id=None, -                 ca_cert_path=None, api_uri=None, api_version=None, uid=None): +                 ca_cert_path=None, api_uri=None, api_version=None, uid=None, +                 gpgbinary=None):          """          Initialize a Key Manager for user's C{address} with provider's          nickserver reachable in C{url}. @@ -73,6 +75,8 @@ class KeyManager(object):          :type api_version: str          :param uid: The users' UID.          :type uid: str +        :param gpgbinary: Name for GnuPG binary executable. +        :type gpgbinary: C{str}          """          self._address = address          self._nickserver_uri = nickserver_uri @@ -84,7 +88,7 @@ class KeyManager(object):          self.uid = uid          # a dict to map key types to their handlers          self._wrapper_map = { -            OpenPGPKey: OpenPGPScheme(soledad), +            OpenPGPKey: OpenPGPScheme(soledad, gpgbinary=gpgbinary),              # other types of key will be added to this mapper.          }          # the following are used to perform https requests @@ -334,3 +338,90 @@ class KeyManager(object):      uid = property(          _get_uid, _set_uid, doc='The uid of the user.') + +    # +    # encrypt/decrypt and sign/verify API +    # + +    def encrypt(self, data, pubkey, passphrase=None, sign=None): +        """ +        Encrypt C{data} using public @{key} and sign with C{sign} key. + +        :param data: The data to be encrypted. +        :type data: str +        :param pubkey: The key used to encrypt. +        :type pubkey: EncryptionKey +        :param sign: The key used for signing. +        :type sign: EncryptionKey + +        :return: The encrypted data. +        :rtype: str +        """ +        leap_assert_type(pubkey, EncryptionKey) +        leap_assert(pubkey.__class__ in self._wrapper_map, 'Unknown key type.') +        leap_assert(pubkey.private is False, 'Key is not public.') +        return self._wrapper_map[pubkey.__class__].encrypt( +            data, pubkey, passphrase, sign) + +    def decrypt(self, data, privkey, passphrase=None, verify=None): +        """ +        Decrypt C{data} using private @{privkey} and verify with C{verify} key. + +        :param data: The data to be decrypted. +        :type data: str +        :param privkey: The key used to decrypt. +        :type privkey: OpenPGPKey +        :param verify: The key used to verify a signature. +        :type verify: OpenPGPKey + +        :return: The decrypted data. +        :rtype: str + +        @raise InvalidSignature: Raised if unable to verify the signature with +            C{verify} key. +        """ +        leap_assert_type(privkey, EncryptionKey) +        leap_assert( +            privkey.__class__ in self._wrapper_map, +            'Unknown key type.') +        leap_assert(privkey.private is True, 'Key is not private.') +        return self._wrapper_map[privkey.__class__].decrypt( +            data, privkey, passphrase, verify) + +    def sign(self, data, privkey): +        """ +        Sign C{data} with C{privkey}. + +        :param data: The data to be signed. +        :type data: str + +        :param privkey: The private key to be used to sign. +        :type privkey: EncryptionKey + +        :return: The signed data. +        :rtype: str +        """ +        leap_assert_type(privkey, EncryptionKey) +        leap_assert( +            privkey.__class__ in self._wrapper_map, +            'Unknown key type.') +        leap_assert(privkey.private is True, 'Key is not private.') +        return self._wrapper_map[privkey.__class__].sign(data, privkey) + +    def verify(self, data, pubkey): +        """ +        Verify signed C{data} with C{pubkey}. + +        :param data: The data to be verified. +        :type data: str + +        :param pubkey: The public key to be used on verification. +        :type pubkey: EncryptionKey + +        :return: The signed data. +        :rtype: str +        """ +        leap_assert_type(pubkey, EncryptionKey) +        leap_assert(pubkey.__class__ in self._wrapper_map, 'Unknown key type.') +        leap_assert(pubkey.private is False, 'Key is not public.') +        return self._wrapper_map[pubkey.__class__].verify(data, pubkey) diff --git a/keymanager/src/leap/keymanager/keys.py b/keymanager/src/leap/keymanager/keys.py index 44bd587..1c33745 100644 --- a/keymanager/src/leap/keymanager/keys.py +++ b/keymanager/src/leap/keymanager/keys.py @@ -283,3 +283,72 @@ class EncryptionScheme(object):          :type key: EncryptionKey          """          pass + +    @abstractmethod +    def encrypt(self, data, pubkey, passphrase=None, sign=None): +        """ +        Encrypt C{data} using public @{pubkey} and sign with C{sign} key. + +        :param data: The data to be encrypted. +        :type data: str +        :param pubkey: The key used to encrypt. +        :type pubkey: EncryptionKey +        :param sign: The key used for signing. +        :type sign: EncryptionKey + +        :return: The encrypted data. +        :rtype: str +        """ +        pass + +    @abstractmethod +    def decrypt(self, data, privkey, passphrase=None, verify=None): +        """ +        Decrypt C{data} using private @{privkey} and verify with C{verify} key. + +        :param data: The data to be decrypted. +        :type data: str +        :param privkey: The key used to decrypt. +        :type privkey: OpenPGPKey +        :param verify: The key used to verify a signature. +        :type verify: OpenPGPKey + +        :return: The decrypted data. +        :rtype: str + +        @raise InvalidSignature: Raised if unable to verify the signature with +            C{verify} key. +        """ +        pass + +    @abstractmethod +    def sign(self, data, privkey): +        """ +        Sign C{data} with C{privkey}. + +        :param data: The data to be signed. +        :type data: str + +        :param privkey: The private key to be used to sign. +        :type privkey: EncryptionKey + +        :return: The signed data. +        :rtype: str +        """ +        pass + +    @abstractmethod +    def verify(self, data, pubkey): +        """ +        Verify signed C{data} with C{pubkey}. + +        :param data: The data to be verified. +        :type data: str + +        :param pubkey: The public key to be used on verification. +        :type pubkey: EncryptionKey + +        :return: The signed data. +        :rtype: str +        """ +        pass diff --git a/keymanager/src/leap/keymanager/openpgp.py b/keymanager/src/leap/keymanager/openpgp.py index 9d404cb..1e78c70 100644 --- a/keymanager/src/leap/keymanager/openpgp.py +++ b/keymanager/src/leap/keymanager/openpgp.py @@ -15,15 +15,19 @@  # You should have received a copy of the GNU General Public License  # along with this program. If not, see <http://www.gnu.org/licenses/>. +  """  Infrastructure for using OpenPGP keys in Key Manager.  """ + +  import logging  import os  import re  import shutil  import tempfile +  from leap.common.check import leap_assert, leap_assert_type  from leap.keymanager import errors  from leap.keymanager.keys import ( @@ -36,125 +40,8 @@ from leap.keymanager.keys import (  )  from leap.keymanager.gpg import GPGWrapper -logger = logging.getLogger(__name__) - - -# -# gpg wrapper and decorator -# - -def temporary_gpgwrapper(keys=None): -    """ -    Returns a unitary gpg wrapper that implements context manager -    protocol. - -    :param key_data: ASCII armored key data. -    :type key_data: str - -    :return: a GPGWrapper instance -    :rtype: GPGWrapper -    """ -    # TODO do here checks on key_data -    return TempGPGWrapper(keys=keys) - - -def with_temporary_gpg(fun): -    """ -    Decorator to add a temporary gpg wrapper as context -    to gpg related functions. - -    Decorated functions are expected to return a function whose only -    argument is a gpgwrapper instance. -    """ -    def wrapped(*args, **kwargs): -        """ -        We extract the arguments passed to the wrapped function, -        run the function and do validations. -        We expect that the positional arguments are `data`, -        and an optional `key`. -        All the rest of arguments should be passed as named arguments -        to allow for a correct unpacking. -        """ -        if len(args) == 2: -            keys = args[1] if isinstance(args[1], OpenPGPKey) else None -        else: -            keys = None - -        # sign/verify keys passed as arguments -        sign = kwargs.get('sign', None) -        if sign: -            keys = [keys, sign] - -        verify = kwargs.get('verify', None) -        if verify: -            keys = [keys, verify] - -        # is the wrapped function sign or verify? -        fun_name = fun.__name__ -        is_sign_function = True if fun_name == "sign" else False -        is_verify_function = True if fun_name == "verify" else False - -        result = None - -        with temporary_gpgwrapper(keys) as gpg: -            result = fun(*args, **kwargs)(gpg) - -            # TODO: cleanup a little bit the -            # validation. maybe delegate to other -            # auxiliary functions for clarity. - -            ok = getattr(result, 'ok', None) - -            stderr = getattr(result, 'stderr', None) -            if stderr: -                logger.debug("%s" % (stderr,)) - -            if ok is False: -                raise errors.EncryptionDecryptionFailed( -                    'Failed to encrypt/decrypt in %s: %s' % ( -                        fun.__name__, -                        stderr)) - -            if verify is not None: -                # A verify key has been passed -                if result.valid is False or \ -                        verify.fingerprint != result.pubkey_fingerprint: -                    raise errors.InvalidSignature( -                        'Failed to verify signature with key %s: %s' % -                        (verify.key_id, stderr)) - -            if is_sign_function: -                # Specific validation for sign function -                privkey = gpg.list_keys(secret=True).pop() -                rfprint = result.fingerprint -                kfprint = privkey['fingerprint'] -                if result.fingerprint is None: -                    raise errors.SignFailed( -                        'Failed to sign with key %s: %s' % -                        (privkey['keyid'], stderr)) -                leap_assert( -                    result.fingerprint == kfprint, -                    'Signature and private key fingerprints mismatch: ' -                    '%s != %s' % -                    (rfprint, kfprint)) - -            if is_verify_function: -                # Specific validation for verify function -                pubkey = gpg.list_keys().pop() -                valid = result.valid -                rfprint = result.fingerprint -                kfprint = pubkey['fingerprint'] -                if valid is False or rfprint != kfprint: -                    raise errors.InvalidSignature( -                        'Failed to verify signature ' -                        'with key %s.' % pubkey['keyid']) -                result = result.valid -            # ok, enough checks. let's return data if available -            if hasattr(result, 'data'): -                result = result.data -        return result -    return wrapped +logger = logging.getLogger(__name__)  class TempGPGWrapper(object): @@ -166,12 +53,15 @@ class TempGPGWrapper(object):      one key. This function creates an empty temporary keyring and imports      C{keys} if it is not None.      """ -    def __init__(self, keys=None): +    def __init__(self, keys=None, gpgbinary=None):          """          :param keys: OpenPGP key, or list of.          :type keys: OpenPGPKey or list of OpenPGPKeys +        :param gpgbinary: Name for GnuPG binary executable. +        :type gpgbinary: C{str}          """          self._gpg = None +        self._gpgbinary = gpgbinary          if not keys:              keys = list()          if not isinstance(keys, list): @@ -221,7 +111,9 @@ class TempGPGWrapper(object):          listkeys = lambda: self._gpg.list_keys()          listsecretkeys = lambda: self._gpg.list_keys(secret=True) -        self._gpg = GPGWrapper(gnupghome=tempfile.mkdtemp()) +        self._gpg = GPGWrapper( +            gnupghome=tempfile.mkdtemp(), +            gpgbinary=self._gpgbinary)          leap_assert(len(listkeys()) is 0, 'Keyring not empty.')          # import keys into the keyring: @@ -266,144 +158,6 @@ class TempGPGWrapper(object):              shutil.rmtree(self._gpg.gnupghome) -# -# API functions -# - -@with_temporary_gpg -def encrypt_asym(data, key, passphrase=None, sign=None): -    """ -    Encrypt C{data} using public @{key} and sign with C{sign} key. - -    :param data: The data to be encrypted. -    :type data: str -    :param pubkey: The key used to encrypt. -    :type pubkey: OpenPGPKey -    :param sign: The key used for signing. -    :type sign: OpenPGPKey - -    :return: The encrypted data. -    :rtype: str -    """ -    leap_assert_type(key, OpenPGPKey) -    leap_assert(key.private is False, 'Key is not public.') -    if sign is not None: -        leap_assert_type(sign, OpenPGPKey) -        leap_assert(sign.private is True) - -    # Here we cannot assert for correctness of sig because the sig is in -    # the ciphertext. -    # result.ok    - (bool) indicates if the operation succeeded -    # result.data  - (bool) contains the result of the operation - -    return lambda gpg: gpg.encrypt( -        data, key.fingerprint, -        sign=sign.key_id if sign else None, -        passphrase=passphrase, symmetric=False) - - -@with_temporary_gpg -def decrypt_asym(data, key, passphrase=None, verify=None): -    """ -    Decrypt C{data} using private @{key} and verify with C{verify} key. - -    :param data: The data to be decrypted. -    :type data: str -    :param privkey: The key used to decrypt. -    :type privkey: OpenPGPKey -    :param verify: The key used to verify a signature. -    :type verify: OpenPGPKey - -    :return: The decrypted data. -    :rtype: str - -    @raise InvalidSignature: Raised if unable to verify the signature with -        C{verify} key. -    """ -    leap_assert(key.private is True, 'Key is not private.') -    if verify is not None: -        leap_assert_type(verify, OpenPGPKey) -        leap_assert(verify.private is False) - -    return lambda gpg: gpg.decrypt( -        data, passphrase=passphrase) - - -@with_temporary_gpg -def is_encrypted(data): -    """ -    Return whether C{data} was encrypted using OpenPGP. - -    :param data: The data we want to know about. -    :type data: str - -    :return: Whether C{data} was encrypted using this wrapper. -    :rtype: bool -    """ -    return lambda gpg: gpg.is_encrypted(data) - - -@with_temporary_gpg -def is_encrypted_asym(data): -    """ -    Return whether C{data} was asymmetrically encrypted using OpenPGP. - -    :param data: The data we want to know about. -    :type data: str - -    :return: Whether C{data} was encrypted using this wrapper. -    :rtype: bool -    """ -    return lambda gpg: gpg.is_encrypted_asym(data) - - -@with_temporary_gpg -def sign(data, privkey): -    """ -    Sign C{data} with C{privkey}. - -    :param data: The data to be signed. -    :type data: str - -    :param privkey: The private key to be used to sign. -    :type privkey: OpenPGPKey - -    :return: The ascii-armored signed data. -    :rtype: str -    """ -    leap_assert_type(privkey, OpenPGPKey) -    leap_assert(privkey.private is True) - -    # result.fingerprint - contains the fingerprint of the key used to -    #                      sign. -    return lambda gpg: gpg.sign(data, keyid=privkey.key_id) - - -@with_temporary_gpg -def verify(data, key): -    """ -    Verify signed C{data} with C{pubkey}. - -    :param data: The data to be verified. -    :type data: str - -    :param pubkey: The public key to be used on verification. -    :type pubkey: OpenPGPKey - -    :return: The ascii-armored signed data. -    :rtype: str -    """ -    leap_assert_type(key, OpenPGPKey) -    leap_assert(key.private is False) - -    return lambda gpg: gpg.verify(data) - - -# -# Helper functions -# - -  def _build_key_from_gpg(address, key, key_data):      """      Build an OpenPGPKey for C{address} based on C{key} from @@ -445,17 +199,25 @@ class OpenPGPKey(EncryptionKey):  class OpenPGPScheme(EncryptionScheme):      """ -    A wrapper for OpenPGP keys. +    A wrapper for OpenPGP keys management and use (encryption, decyption, +    signing and verification).      """ -    def __init__(self, soledad): +    def __init__(self, soledad, gpgbinary=None):          """          Initialize the OpenPGP wrapper.          :param soledad: A Soledad instance for key storage.          :type soledad: leap.soledad.Soledad +        :param gpgbinary: Name for GnuPG binary executable. +        :type gpgbinary: C{str}          """          EncryptionScheme.__init__(self, soledad) +        self._gpgbinary = gpgbinary + +    # +    # Keys management +    #      def gen_key(self, address):          """ @@ -475,7 +237,8 @@ class OpenPGPScheme(EncryptionScheme):          except errors.KeyNotFound:              logger.debug('Key for %s not found' % (address,)) -        def _gen_key(gpg): +        with self._temporary_gpgwrapper() as gpg: +            # TODO: inspect result, or use decorator              params = gpg.gen_key_input(                  key_type='RSA',                  key_length=4096, @@ -512,10 +275,6 @@ class OpenPGPScheme(EncryptionScheme):                      gpg.export_keys(key['fingerprint'], secret=secret))                  self.put_key(openpgp_key) -        with temporary_gpgwrapper() as gpg: -            # TODO: inspect result, or use decorator -            _gen_key(gpg) -          return self.get_key(address, private=True)      def get_key(self, address, private=False): @@ -548,7 +307,8 @@ class OpenPGPScheme(EncryptionScheme):          # TODO: add more checks for correct key data.          leap_assert(key_data is not None, 'Data does not represent a key.') -        def _put_ascii_key(gpg): +        with self._temporary_gpgwrapper() as gpg: +            # TODO: inspect result, or use decorator              gpg.import_keys(key_data)              privkey = None              pubkey = None @@ -584,10 +344,6 @@ class OpenPGPScheme(EncryptionScheme):                  gpg.export_keys(pubkey['fingerprint'], secret=False))              self.put_key(openpgp_pubkey) -        with temporary_gpgwrapper() as gpg: -            # TODO: inspect result, or use decorator -            _put_ascii_key(gpg) -      def put_key(self, key):          """          Put C{key} in local storage. @@ -643,3 +399,182 @@ class OpenPGPScheme(EncryptionScheme):              raise errors.KeyAttributesDiffer(key)          doc = self._get_key_doc(key.address, key.private)          self._soledad.delete_doc(doc) + +    # +    # Data encryption, decryption, signing and verifying +    # + +    def _temporary_gpgwrapper(self, keys=None): +        """ +        Returns a unitary gpg wrapper that implements context manager +        protocol. + +        :param key_data: ASCII armored key data. +        :type key_data: str +        :param gpgbinary: Name for GnuPG binary executable. +        :type gpgbinary: C{str} + +        :return: a GPGWrapper instance +        :rtype: GPGWrapper +        """ +        # TODO do here checks on key_data +        return TempGPGWrapper( +            keys=keys, gpgbinary=self._gpgbinary) + +    @staticmethod +    def _assert_gpg_result_ok(result): +        """ +        Check if GPG result is 'ok' and log stderr outputs. +        :param result: The GPG results +        :type result: +        """ +        stderr = getattr(result, 'stderr', None) +        if stderr: +            logger.debug("%s" % (stderr,)) +        if getattr(result, 'ok', None) is not True: +            raise errors.EncryptionDecryptionFailed( +                'Failed to encrypt/decrypt in %s: %s' % ( +                    this.encrypt, +                    stderr)) + +    def encrypt(self, data, pubkey, passphrase=None, sign=None): +        """ +        Encrypt C{data} using public @{pubkey} and sign with C{sign} key. + +        :param data: The data to be encrypted. +        :type data: str +        :param pubkey: The key used to encrypt. +        :type pubkey: OpenPGPKey +        :param sign: The key used for signing. +        :type sign: OpenPGPKey + +        :return: The encrypted data. +        :rtype: str +        """ +        leap_assert_type(pubkey, OpenPGPKey) +        leap_assert(pubkey.private is False, 'Key is not public.') +        keys = [pubkey] +        if sign is not None: +            leap_assert_type(sign, OpenPGPKey) +            leap_assert(sign.private is True) +            keys.append(sign) +        with self._temporary_gpgwrapper(keys) as gpg: +            result = gpg.encrypt( +                data, pubkey.fingerprint, +                sign=sign.key_id if sign else None, +                passphrase=passphrase, symmetric=False) +            # Here we cannot assert for correctness of sig because the sig is +            # in the ciphertext. +            # result.ok    - (bool) indicates if the operation succeeded +            # result.data  - (bool) contains the result of the operation +            self._assert_gpg_result_ok(result) +            return result.data + +    def decrypt(self, data, privkey, passphrase=None, verify=None): +        """ +        Decrypt C{data} using private @{privkey} and verify with C{verify} key. + +        :param data: The data to be decrypted. +        :type data: str +        :param privkey: The key used to decrypt. +        :type privkey: OpenPGPKey +        :param verify: The key used to verify a signature. +        :type verify: OpenPGPKey + +        :return: The decrypted data. +        :rtype: str + +        @raise InvalidSignature: Raised if unable to verify the signature with +            C{verify} key. +        """ +        leap_assert(privkey.private is True, 'Key is not private.') +        keys = [privkey] +        if verify is not None: +            leap_assert_type(verify, OpenPGPKey) +            leap_assert(verify.private is False) +            keys.append(verify) +        with self._temporary_gpgwrapper(keys) as gpg: +            result = gpg.decrypt(data, passphrase=passphrase) +            self._assert_gpg_result_ok(result) +            # verify signature +            if (verify is not None): +                if result.valid is False or \ +                        verify.fingerprint != result.pubkey_fingerprint: +                    raise errors.InvalidSignature( +                        'Failed to verify signature with key %s: %s' % +                        (verify.key_id, stderr)) +            return result.data + +    def is_encrypted(self, data): +        """ +        Return whether C{data} was asymmetrically encrypted using OpenPGP. + +        :param data: The data we want to know about. +        :type data: str + +        :return: Whether C{data} was encrypted using this wrapper. +        :rtype: bool +        """ +        with self._temporary_gpgwrapper() as gpg: +            return gpg.is_encrypted_asym(data) + +    def sign(self, data, privkey): +        """ +        Sign C{data} with C{privkey}. + +        :param data: The data to be signed. +        :type data: str + +        :param privkey: The private key to be used to sign. +        :type privkey: OpenPGPKey + +        :return: The ascii-armored signed data. +        :rtype: str +        """ +        leap_assert_type(privkey, OpenPGPKey) +        leap_assert(privkey.private is True) + +        # result.fingerprint - contains the fingerprint of the key used to +        #                      sign. +        with self._temporary_gpgwrapper(privkey) as gpg: +            result = gpg.sign(data, keyid=privkey.key_id) +            rfprint = privkey.fingerprint +            privkey = gpg.list_keys(secret=True).pop() +            kfprint = privkey['fingerprint'] +            if result.fingerprint is None: +                raise errors.SignFailed( +                    'Failed to sign with key %s: %s' % +                    (privkey['keyid'], stderr)) +            leap_assert( +                result.fingerprint == kfprint, +                'Signature and private key fingerprints mismatch: ' +                '%s != %s' % (rfprint, kfprint)) +        return result.data + +    def verify(self, data, pubkey): +        """ +        Verify signed C{data} with C{pubkey}. + +        :param data: The data to be verified. +        :type data: str + +        :param pubkey: The public key to be used on verification. +        :type pubkey: OpenPGPKey + +        :return: The ascii-armored signed data. +        :rtype: str +        """ +        leap_assert_type(pubkey, OpenPGPKey) +        leap_assert(pubkey.private is False) +        with self._temporary_gpgwrapper(pubkey) as gpg: +            result = gpg.verify(data) +            gpgpubkey = gpg.list_keys().pop() +            valid = result.valid +            rfprint = result.fingerprint +            kfprint = gpgpubkey['fingerprint'] +            # raise in case sig is invalid +            if valid is False or rfprint != kfprint: +                raise errors.InvalidSignature( +                    'Failed to verify signature ' +                    'with key %s.' % gpgpubkey['keyid']) +            return True diff --git a/keymanager/src/leap/keymanager/tests/test_keymanager.py b/keymanager/src/leap/keymanager/tests/test_keymanager.py index 09a6183..a474e32 100644 --- a/keymanager/src/leap/keymanager/tests/test_keymanager.py +++ b/keymanager/src/leap/keymanager/tests/test_keymanager.py @@ -40,6 +40,7 @@ from leap.keymanager.keys import (  ADDRESS = 'leap@leap.se'  ADDRESS_2 = 'anotheruser@leap.se' +GPG_BINARY_PATH = '/usr/bin/gpg'  class KeyManagerUtilTestCase(BaseLeapTest): @@ -136,7 +137,8 @@ class KeyManagerWithSoledadTestCase(BaseLeapTest):              km._wrapper_map[key.__class__].delete_key(key)      def _key_manager(self, user=ADDRESS, url=''): -        return KeyManager(user, url, self._soledad) +        return KeyManager(user, url, self._soledad, +                          gpgbinary=GPG_BINARY_PATH)  class OpenPGPCryptoTestCase(KeyManagerWithSoledadTestCase): @@ -152,7 +154,8 @@ class OpenPGPCryptoTestCase(KeyManagerWithSoledadTestCase):              '4096', key.length, 'Wrong key length.')      def test_openpgp_put_delete_key(self): -        pgp = openpgp.OpenPGPScheme(self._soledad) +        pgp = openpgp.OpenPGPScheme( +            self._soledad, gpgbinary=GPG_BINARY_PATH)          self.assertRaises(KeyNotFound, pgp.get_key, ADDRESS)          pgp.put_ascii_key(PUBLIC_KEY)          key = pgp.get_key(ADDRESS, private=False) @@ -160,7 +163,8 @@ class OpenPGPCryptoTestCase(KeyManagerWithSoledadTestCase):          self.assertRaises(KeyNotFound, pgp.get_key, ADDRESS)      def test_openpgp_put_ascii_key(self): -        pgp = openpgp.OpenPGPScheme(self._soledad) +        pgp = openpgp.OpenPGPScheme( +            self._soledad, gpgbinary=GPG_BINARY_PATH)          self.assertRaises(KeyNotFound, pgp.get_key, ADDRESS)          pgp.put_ascii_key(PUBLIC_KEY)          key = pgp.get_key(ADDRESS, private=False) @@ -173,7 +177,8 @@ class OpenPGPCryptoTestCase(KeyManagerWithSoledadTestCase):          self.assertRaises(KeyNotFound, pgp.get_key, ADDRESS)      def test_get_public_key(self): -        pgp = openpgp.OpenPGPScheme(self._soledad) +        pgp = openpgp.OpenPGPScheme( +            self._soledad, gpgbinary=GPG_BINARY_PATH)          self.assertRaises(KeyNotFound, pgp.get_key, ADDRESS)          pgp.put_ascii_key(PUBLIC_KEY)          self.assertRaises( @@ -185,24 +190,25 @@ class OpenPGPCryptoTestCase(KeyManagerWithSoledadTestCase):          pgp.delete_key(key)          self.assertRaises(KeyNotFound, pgp.get_key, ADDRESS) -    def test_openpgp_encrypt_decrypt_asym(self): +    def test_openpgp_encrypt_decrypt(self):          # encrypt -        pgp = openpgp.OpenPGPScheme(self._soledad) +        pgp = openpgp.OpenPGPScheme( +            self._soledad, gpgbinary=GPG_BINARY_PATH)          pgp.put_ascii_key(PUBLIC_KEY)          pubkey = pgp.get_key(ADDRESS, private=False) -        cyphertext = openpgp.encrypt_asym('data', pubkey) +        cyphertext = pgp.encrypt('data', pubkey)          # assert          self.assertTrue(cyphertext is not None)          self.assertTrue(cyphertext != '')          self.assertTrue(cyphertext != 'data') -        self.assertTrue(openpgp.is_encrypted_asym(cyphertext)) -        self.assertTrue(openpgp.is_encrypted(cyphertext)) +        self.assertTrue(pgp.is_encrypted(cyphertext)) +        self.assertTrue(pgp.is_encrypted(cyphertext))          # decrypt          self.assertRaises(              KeyNotFound, pgp.get_key, ADDRESS, private=True)          pgp.put_ascii_key(PRIVATE_KEY)          privkey = pgp.get_key(ADDRESS, private=True) -        plaintext = openpgp.decrypt_asym(cyphertext, privkey) +        plaintext = pgp.decrypt(cyphertext, privkey)          pgp.delete_key(pubkey)          pgp.delete_key(privkey)          self.assertRaises( @@ -211,83 +217,91 @@ class OpenPGPCryptoTestCase(KeyManagerWithSoledadTestCase):              KeyNotFound, pgp.get_key, ADDRESS, private=True)      def test_verify_with_private_raises(self): -        pgp = openpgp.OpenPGPScheme(self._soledad) +        pgp = openpgp.OpenPGPScheme( +            self._soledad, gpgbinary=GPG_BINARY_PATH)          pgp.put_ascii_key(PRIVATE_KEY)          data = 'data'          privkey = pgp.get_key(ADDRESS, private=True) -        signed = openpgp.sign(data, privkey) +        signed = pgp.sign(data, privkey)          self.assertRaises(              AssertionError, -            openpgp.verify, signed, privkey) +            pgp.verify, signed, privkey)      def test_sign_with_public_raises(self): -        pgp = openpgp.OpenPGPScheme(self._soledad) +        pgp = openpgp.OpenPGPScheme( +            self._soledad, gpgbinary=GPG_BINARY_PATH)          pgp.put_ascii_key(PUBLIC_KEY)          data = 'data'          pubkey = pgp.get_key(ADDRESS, private=False)          self.assertRaises(              AssertionError, -            openpgp.sign, data, pubkey) +            pgp.sign, data, pubkey)      def test_verify_with_wrong_key_raises(self): -        pgp = openpgp.OpenPGPScheme(self._soledad) +        pgp = openpgp.OpenPGPScheme( +            self._soledad, gpgbinary=GPG_BINARY_PATH)          pgp.put_ascii_key(PRIVATE_KEY)          data = 'data'          privkey = pgp.get_key(ADDRESS, private=True) -        signed = openpgp.sign(data, privkey) +        signed = pgp.sign(data, privkey)          pgp.put_ascii_key(PUBLIC_KEY_2)          wrongkey = pgp.get_key(ADDRESS_2)          self.assertRaises(              errors.InvalidSignature, -            openpgp.verify, signed, wrongkey) +            pgp.verify, signed, wrongkey) -    def test_encrypt_asym_sign_with_public_raises(self): -        pgp = openpgp.OpenPGPScheme(self._soledad) +    def test_encrypt_sign_with_public_raises(self): +        pgp = openpgp.OpenPGPScheme( +            self._soledad, gpgbinary=GPG_BINARY_PATH)          pgp.put_ascii_key(PRIVATE_KEY)          data = 'data'          privkey = pgp.get_key(ADDRESS, private=True)          pubkey = pgp.get_key(ADDRESS, private=False)          self.assertRaises(              AssertionError, -            openpgp.encrypt_asym, data, privkey, sign=pubkey) +            pgp.encrypt, data, privkey, sign=pubkey) -    def test_decrypt_asym_verify_with_private_raises(self): -        pgp = openpgp.OpenPGPScheme(self._soledad) +    def test_decrypt_verify_with_private_raises(self): +        pgp = openpgp.OpenPGPScheme( +            self._soledad, gpgbinary=GPG_BINARY_PATH)          pgp.put_ascii_key(PRIVATE_KEY)          data = 'data'          privkey = pgp.get_key(ADDRESS, private=True)          pubkey = pgp.get_key(ADDRESS, private=False) -        encrypted_and_signed = openpgp.encrypt_asym( +        encrypted_and_signed = pgp.encrypt(              data, pubkey, sign=privkey)          self.assertRaises(              AssertionError, -            openpgp.decrypt_asym, +            pgp.decrypt,              encrypted_and_signed, privkey, verify=privkey) -    def test_decrypt_asym_verify_with_wrong_key_raises(self): -        pgp = openpgp.OpenPGPScheme(self._soledad) +    def test_decrypt_verify_with_wrong_key_raises(self): +        pgp = openpgp.OpenPGPScheme( +            self._soledad, gpgbinary=GPG_BINARY_PATH)          pgp.put_ascii_key(PRIVATE_KEY)          data = 'data'          privkey = pgp.get_key(ADDRESS, private=True)          pubkey = pgp.get_key(ADDRESS, private=False) -        encrypted_and_signed = openpgp.encrypt_asym(data, pubkey, sign=privkey) +        encrypted_and_signed = pgp.encrypt(data, pubkey, sign=privkey)          pgp.put_ascii_key(PUBLIC_KEY_2)          wrongkey = pgp.get_key(ADDRESS_2)          self.assertRaises(              errors.InvalidSignature, -            openpgp.verify, encrypted_and_signed, wrongkey) +            pgp.verify, encrypted_and_signed, wrongkey)      def test_sign_verify(self): -        pgp = openpgp.OpenPGPScheme(self._soledad) +        pgp = openpgp.OpenPGPScheme( +            self._soledad, gpgbinary=GPG_BINARY_PATH)          pgp.put_ascii_key(PRIVATE_KEY)          data = 'data'          privkey = pgp.get_key(ADDRESS, private=True) -        signed = openpgp.sign(data, privkey) +        signed = pgp.sign(data, privkey)          pubkey = pgp.get_key(ADDRESS, private=False) -        self.assertTrue(openpgp.verify(signed, pubkey)) +        self.assertTrue(pgp.verify(signed, pubkey)) -    def test_encrypt_asym_sign_decrypt_verify(self): -        pgp = openpgp.OpenPGPScheme(self._soledad) +    def test_encrypt_sign_decrypt_verify(self): +        pgp = openpgp.OpenPGPScheme( +            self._soledad, gpgbinary=GPG_BINARY_PATH)          pgp.put_ascii_key(PRIVATE_KEY)          pubkey = pgp.get_key(ADDRESS, private=False)          privkey = pgp.get_key(ADDRESS, private=True) @@ -295,9 +309,9 @@ class OpenPGPCryptoTestCase(KeyManagerWithSoledadTestCase):          pubkey2 = pgp.get_key(ADDRESS_2, private=False)          privkey2 = pgp.get_key(ADDRESS_2, private=True)          data = 'data' -        encrypted_and_signed = openpgp.encrypt_asym( +        encrypted_and_signed = pgp.encrypt(              data, pubkey2, sign=privkey) -        res = openpgp.decrypt_asym( +        res = pgp.decrypt(              encrypted_and_signed, privkey2, verify=pubkey)          self.assertTrue(data, res) @@ -448,6 +462,44 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase):          self.assertEqual(ADDRESS_2, key.address) +class KeyManagerCryptoTestCase(KeyManagerWithSoledadTestCase): + +    RAW_DATA = 'data' + +    def test_keymanager_openpgp_encrypt_decrypt(self): +        km = self._key_manager() +        # put raw private key +        km._wrapper_map[OpenPGPKey].put_ascii_key(PRIVATE_KEY) +        # get public key +        pubkey = km.get_key( +            ADDRESS, OpenPGPKey, private=False, fetch_remote=False) +        # encrypt +        encdata = km.encrypt(self.RAW_DATA, pubkey) +        self.assertNotEqual(self.RAW_DATA, encdata) +        # get private key +        privkey = km.get_key( +            ADDRESS, OpenPGPKey, private=True, fetch_remote=False) +        # decrypt +        rawdata = km.decrypt(encdata, privkey) +        self.assertEqual(self.RAW_DATA, rawdata) + +    def test_keymanager_openpgp_sign_verify(self): +        km = self._key_manager() +        # put raw private keys +        km._wrapper_map[OpenPGPKey].put_ascii_key(PRIVATE_KEY) +        # get private key for signing +        privkey = km.get_key( +            ADDRESS, OpenPGPKey, private=True, fetch_remote=False) +        # encrypt +        signdata = km.sign(self.RAW_DATA, privkey) +        self.assertNotEqual(self.RAW_DATA, signdata) +        # get public key for verifying +        pubkey = km.get_key( +            ADDRESS, OpenPGPKey, private=False, fetch_remote=False) +        # decrypt +        self.assertTrue(km.verify(signdata, pubkey)) + +  # Key material for testing  # key 24D18DDF: public key "Leap Test Key <leap@leap.se>" | 
