Support bundled GPG and change API.
authordrebs <drebs@leap.se>
Wed, 7 Aug 2013 14:29:54 +0000 (16:29 +0200)
committerdrebs <drebs@leap.se>
Thu, 8 Aug 2013 13:44:43 +0000 (15:44 +0200)
- Move openpgp encrypt/decrypt/sign/verify  API to inside OpenPGP class.
- Add encrypt/decrypt/sign/verify API to KeyManager.
- Add possibility of passing custom gpg binary to KeyManager and
  OpenPGPScheme.
- Remove "_asym" suffix from method names.
- Bump version to 0.2.1. New API is *not* backwards compatible.

changes/feature_3397-keymanager-should-support-bundled-gpg [new file with mode: 0644]
src/leap/keymanager/__init__.py
src/leap/keymanager/keys.py
src/leap/keymanager/openpgp.py
src/leap/keymanager/tests/test_keymanager.py

diff --git a/changes/feature_3397-keymanager-should-support-bundled-gpg b/changes/feature_3397-keymanager-should-support-bundled-gpg
new file mode 100644 (file)
index 0000000..6774dd8
--- /dev/null
@@ -0,0 +1,2 @@
+  o Add support for bundled gpg. Closes #3397.
+  o Refactor API to include encrypt/decrypt/sign/verify in KeyManager.
index e1f318c..d24e08e 100644 (file)
@@ -22,12 +22,13 @@ Key Manager is a Nicknym agent for LEAP client.
 
 import requests
 
-from leap.common.check import leap_assert
+from leap.common.check import leap_assert, leap_assert_type
 from leap.keymanager.errors import (
     KeyNotFound,
     NoPasswordGiven,
 )
 from leap.keymanager.keys import (
+    EncryptionKey,
     build_key_from_dict,
     KEYMANAGER_KEY_TAG,
     TAGS_PRIVATE_INDEX,
@@ -52,7 +53,8 @@ class KeyManager(object):
     PUBKEY_KEY = "user[public_key]"
 
     def __init__(self, address, nickserver_uri, soledad, session_id=None,
-                 ca_cert_path=None, api_uri=None, api_version=None, uid=None):
+                 ca_cert_path=None, api_uri=None, api_version=None, uid=None,
+                 gpgbinary=None):
         """
         Initialize a Key Manager for user's C{address} with provider's
         nickserver reachable in C{url}.
@@ -73,6 +75,8 @@ class KeyManager(object):
         :type api_version: str
         :param uid: The users' UID.
         :type uid: str
+        :param gpgbinary: Name for GnuPG binary executable.
+        :type gpgbinary: C{str}
         """
         self._address = address
         self._nickserver_uri = nickserver_uri
@@ -84,7 +88,7 @@ class KeyManager(object):
         self.uid = uid
         # a dict to map key types to their handlers
         self._wrapper_map = {
-            OpenPGPKey: OpenPGPScheme(soledad),
+            OpenPGPKey: OpenPGPScheme(soledad, gpgbinary=gpgbinary),
             # other types of key will be added to this mapper.
         }
         # the following are used to perform https requests
@@ -334,3 +338,90 @@ class KeyManager(object):
 
     uid = property(
         _get_uid, _set_uid, doc='The uid of the user.')
+
+    #
+    # encrypt/decrypt and sign/verify API
+    #
+
+    def encrypt(self, data, pubkey, passphrase=None, sign=None):
+        """
+        Encrypt C{data} using public @{key} and sign with C{sign} key.
+
+        :param data: The data to be encrypted.
+        :type data: str
+        :param pubkey: The key used to encrypt.
+        :type pubkey: EncryptionKey
+        :param sign: The key used for signing.
+        :type sign: EncryptionKey
+
+        :return: The encrypted data.
+        :rtype: str
+        """
+        leap_assert_type(pubkey, EncryptionKey)
+        leap_assert(pubkey.__class__ in self._wrapper_map, 'Unknown key type.')
+        leap_assert(pubkey.private is False, 'Key is not public.')
+        return self._wrapper_map[pubkey.__class__].encrypt(
+            data, pubkey, passphrase, sign)
+
+    def decrypt(self, data, privkey, passphrase=None, verify=None):
+        """
+        Decrypt C{data} using private @{privkey} and verify with C{verify} key.
+
+        :param data: The data to be decrypted.
+        :type data: str
+        :param privkey: The key used to decrypt.
+        :type privkey: OpenPGPKey
+        :param verify: The key used to verify a signature.
+        :type verify: OpenPGPKey
+
+        :return: The decrypted data.
+        :rtype: str
+
+        @raise InvalidSignature: Raised if unable to verify the signature with
+            C{verify} key.
+        """
+        leap_assert_type(privkey, EncryptionKey)
+        leap_assert(
+            privkey.__class__ in self._wrapper_map,
+            'Unknown key type.')
+        leap_assert(privkey.private is True, 'Key is not private.')
+        return self._wrapper_map[privkey.__class__].decrypt(
+            data, privkey, passphrase, verify)
+
+    def sign(self, data, privkey):
+        """
+        Sign C{data} with C{privkey}.
+
+        :param data: The data to be signed.
+        :type data: str
+
+        :param privkey: The private key to be used to sign.
+        :type privkey: EncryptionKey
+
+        :return: The signed data.
+        :rtype: str
+        """
+        leap_assert_type(privkey, EncryptionKey)
+        leap_assert(
+            privkey.__class__ in self._wrapper_map,
+            'Unknown key type.')
+        leap_assert(privkey.private is True, 'Key is not private.')
+        return self._wrapper_map[privkey.__class__].sign(data, privkey)
+
+    def verify(self, data, pubkey):
+        """
+        Verify signed C{data} with C{pubkey}.
+
+        :param data: The data to be verified.
+        :type data: str
+
+        :param pubkey: The public key to be used on verification.
+        :type pubkey: EncryptionKey
+
+        :return: The signed data.
+        :rtype: str
+        """
+        leap_assert_type(pubkey, EncryptionKey)
+        leap_assert(pubkey.__class__ in self._wrapper_map, 'Unknown key type.')
+        leap_assert(pubkey.private is False, 'Key is not public.')
+        return self._wrapper_map[pubkey.__class__].verify(data, pubkey)
index 44bd587..1c33745 100644 (file)
@@ -283,3 +283,72 @@ class EncryptionScheme(object):
         :type key: EncryptionKey
         """
         pass
+
+    @abstractmethod
+    def encrypt(self, data, pubkey, passphrase=None, sign=None):
+        """
+        Encrypt C{data} using public @{pubkey} and sign with C{sign} key.
+
+        :param data: The data to be encrypted.
+        :type data: str
+        :param pubkey: The key used to encrypt.
+        :type pubkey: EncryptionKey
+        :param sign: The key used for signing.
+        :type sign: EncryptionKey
+
+        :return: The encrypted data.
+        :rtype: str
+        """
+        pass
+
+    @abstractmethod
+    def decrypt(self, data, privkey, passphrase=None, verify=None):
+        """
+        Decrypt C{data} using private @{privkey} and verify with C{verify} key.
+
+        :param data: The data to be decrypted.
+        :type data: str
+        :param privkey: The key used to decrypt.
+        :type privkey: OpenPGPKey
+        :param verify: The key used to verify a signature.
+        :type verify: OpenPGPKey
+
+        :return: The decrypted data.
+        :rtype: str
+
+        @raise InvalidSignature: Raised if unable to verify the signature with
+            C{verify} key.
+        """
+        pass
+
+    @abstractmethod
+    def sign(self, data, privkey):
+        """
+        Sign C{data} with C{privkey}.
+
+        :param data: The data to be signed.
+        :type data: str
+
+        :param privkey: The private key to be used to sign.
+        :type privkey: EncryptionKey
+
+        :return: The signed data.
+        :rtype: str
+        """
+        pass
+
+    @abstractmethod
+    def verify(self, data, pubkey):
+        """
+        Verify signed C{data} with C{pubkey}.
+
+        :param data: The data to be verified.
+        :type data: str
+
+        :param pubkey: The public key to be used on verification.
+        :type pubkey: EncryptionKey
+
+        :return: The signed data.
+        :rtype: str
+        """
+        pass
index 9d404cb..1e78c70 100644 (file)
 # You should have received a copy of the GNU General Public License
 # along with this program. If not, see <http://www.gnu.org/licenses/>.
 
+
 """
 Infrastructure for using OpenPGP keys in Key Manager.
 """
+
+
 import logging
 import os
 import re
 import shutil
 import tempfile
 
+
 from leap.common.check import leap_assert, leap_assert_type
 from leap.keymanager import errors
 from leap.keymanager.keys import (
@@ -36,125 +40,8 @@ from leap.keymanager.keys import (
 )
 from leap.keymanager.gpg import GPGWrapper
 
-logger = logging.getLogger(__name__)
-
-
-#
-# gpg wrapper and decorator
-#
-
-def temporary_gpgwrapper(keys=None):
-    """
-    Returns a unitary gpg wrapper that implements context manager
-    protocol.
-
-    :param key_data: ASCII armored key data.
-    :type key_data: str
-
-    :return: a GPGWrapper instance
-    :rtype: GPGWrapper
-    """
-    # TODO do here checks on key_data
-    return TempGPGWrapper(keys=keys)
-
-
-def with_temporary_gpg(fun):
-    """
-    Decorator to add a temporary gpg wrapper as context
-    to gpg related functions.
-
-    Decorated functions are expected to return a function whose only
-    argument is a gpgwrapper instance.
-    """
-    def wrapped(*args, **kwargs):
-        """
-        We extract the arguments passed to the wrapped function,
-        run the function and do validations.
-        We expect that the positional arguments are `data`,
-        and an optional `key`.
-        All the rest of arguments should be passed as named arguments
-        to allow for a correct unpacking.
-        """
-        if len(args) == 2:
-            keys = args[1] if isinstance(args[1], OpenPGPKey) else None
-        else:
-            keys = None
-
-        # sign/verify keys passed as arguments
-        sign = kwargs.get('sign', None)
-        if sign:
-            keys = [keys, sign]
-
-        verify = kwargs.get('verify', None)
-        if verify:
-            keys = [keys, verify]
-
-        # is the wrapped function sign or verify?
-        fun_name = fun.__name__
-        is_sign_function = True if fun_name == "sign" else False
-        is_verify_function = True if fun_name == "verify" else False
-
-        result = None
-
-        with temporary_gpgwrapper(keys) as gpg:
-            result = fun(*args, **kwargs)(gpg)
-
-            # TODO: cleanup a little bit the
-            # validation. maybe delegate to other
-            # auxiliary functions for clarity.
-
-            ok = getattr(result, 'ok', None)
-
-            stderr = getattr(result, 'stderr', None)
-            if stderr:
-                logger.debug("%s" % (stderr,))
-
-            if ok is False:
-                raise errors.EncryptionDecryptionFailed(
-                    'Failed to encrypt/decrypt in %s: %s' % (
-                        fun.__name__,
-                        stderr))
-
-            if verify is not None:
-                # A verify key has been passed
-                if result.valid is False or \
-                        verify.fingerprint != result.pubkey_fingerprint:
-                    raise errors.InvalidSignature(
-                        'Failed to verify signature with key %s: %s' %
-                        (verify.key_id, stderr))
-
-            if is_sign_function:
-                # Specific validation for sign function
-                privkey = gpg.list_keys(secret=True).pop()
-                rfprint = result.fingerprint
-                kfprint = privkey['fingerprint']
-                if result.fingerprint is None:
-                    raise errors.SignFailed(
-                        'Failed to sign with key %s: %s' %
-                        (privkey['keyid'], stderr))
-                leap_assert(
-                    result.fingerprint == kfprint,
-                    'Signature and private key fingerprints mismatch: '
-                    '%s != %s' %
-                    (rfprint, kfprint))
-
-            if is_verify_function:
-                # Specific validation for verify function
-                pubkey = gpg.list_keys().pop()
-                valid = result.valid
-                rfprint = result.fingerprint
-                kfprint = pubkey['fingerprint']
-                if valid is False or rfprint != kfprint:
-                    raise errors.InvalidSignature(
-                        'Failed to verify signature '
-                        'with key %s.' % pubkey['keyid'])
-                result = result.valid
 
-            # ok, enough checks. let's return data if available
-            if hasattr(result, 'data'):
-                result = result.data
-        return result
-    return wrapped
+logger = logging.getLogger(__name__)
 
 
 class TempGPGWrapper(object):
@@ -166,12 +53,15 @@ class TempGPGWrapper(object):
     one key. This function creates an empty temporary keyring and imports
     C{keys} if it is not None.
     """
-    def __init__(self, keys=None):
+    def __init__(self, keys=None, gpgbinary=None):
         """
         :param keys: OpenPGP key, or list of.
         :type keys: OpenPGPKey or list of OpenPGPKeys
+        :param gpgbinary: Name for GnuPG binary executable.
+        :type gpgbinary: C{str}
         """
         self._gpg = None
+        self._gpgbinary = gpgbinary
         if not keys:
             keys = list()
         if not isinstance(keys, list):
@@ -221,7 +111,9 @@ class TempGPGWrapper(object):
         listkeys = lambda: self._gpg.list_keys()
         listsecretkeys = lambda: self._gpg.list_keys(secret=True)
 
-        self._gpg = GPGWrapper(gnupghome=tempfile.mkdtemp())
+        self._gpg = GPGWrapper(
+            gnupghome=tempfile.mkdtemp(),
+            gpgbinary=self._gpgbinary)
         leap_assert(len(listkeys()) is 0, 'Keyring not empty.')
 
         # import keys into the keyring:
@@ -266,144 +158,6 @@ class TempGPGWrapper(object):
             shutil.rmtree(self._gpg.gnupghome)
 
 
-#
-# API functions
-#
-
-@with_temporary_gpg
-def encrypt_asym(data, key, passphrase=None, sign=None):
-    """
-    Encrypt C{data} using public @{key} and sign with C{sign} key.
-
-    :param data: The data to be encrypted.
-    :type data: str
-    :param pubkey: The key used to encrypt.
-    :type pubkey: OpenPGPKey
-    :param sign: The key used for signing.
-    :type sign: OpenPGPKey
-
-    :return: The encrypted data.
-    :rtype: str
-    """
-    leap_assert_type(key, OpenPGPKey)
-    leap_assert(key.private is False, 'Key is not public.')
-    if sign is not None:
-        leap_assert_type(sign, OpenPGPKey)
-        leap_assert(sign.private is True)
-
-    # Here we cannot assert for correctness of sig because the sig is in
-    # the ciphertext.
-    # result.ok    - (bool) indicates if the operation succeeded
-    # result.data  - (bool) contains the result of the operation
-
-    return lambda gpg: gpg.encrypt(
-        data, key.fingerprint,
-        sign=sign.key_id if sign else None,
-        passphrase=passphrase, symmetric=False)
-
-
-@with_temporary_gpg
-def decrypt_asym(data, key, passphrase=None, verify=None):
-    """
-    Decrypt C{data} using private @{key} and verify with C{verify} key.
-
-    :param data: The data to be decrypted.
-    :type data: str
-    :param privkey: The key used to decrypt.
-    :type privkey: OpenPGPKey
-    :param verify: The key used to verify a signature.
-    :type verify: OpenPGPKey
-
-    :return: The decrypted data.
-    :rtype: str
-
-    @raise InvalidSignature: Raised if unable to verify the signature with
-        C{verify} key.
-    """
-    leap_assert(key.private is True, 'Key is not private.')
-    if verify is not None:
-        leap_assert_type(verify, OpenPGPKey)
-        leap_assert(verify.private is False)
-
-    return lambda gpg: gpg.decrypt(
-        data, passphrase=passphrase)
-
-
-@with_temporary_gpg
-def is_encrypted(data):
-    """
-    Return whether C{data} was encrypted using OpenPGP.
-
-    :param data: The data we want to know about.
-    :type data: str
-
-    :return: Whether C{data} was encrypted using this wrapper.
-    :rtype: bool
-    """
-    return lambda gpg: gpg.is_encrypted(data)
-
-
-@with_temporary_gpg
-def is_encrypted_asym(data):
-    """
-    Return whether C{data} was asymmetrically encrypted using OpenPGP.
-
-    :param data: The data we want to know about.
-    :type data: str
-
-    :return: Whether C{data} was encrypted using this wrapper.
-    :rtype: bool
-    """
-    return lambda gpg: gpg.is_encrypted_asym(data)
-
-
-@with_temporary_gpg
-def sign(data, privkey):
-    """
-    Sign C{data} with C{privkey}.
-
-    :param data: The data to be signed.
-    :type data: str
-
-    :param privkey: The private key to be used to sign.
-    :type privkey: OpenPGPKey
-
-    :return: The ascii-armored signed data.
-    :rtype: str
-    """
-    leap_assert_type(privkey, OpenPGPKey)
-    leap_assert(privkey.private is True)
-
-    # result.fingerprint - contains the fingerprint of the key used to
-    #                      sign.
-    return lambda gpg: gpg.sign(data, keyid=privkey.key_id)
-
-
-@with_temporary_gpg
-def verify(data, key):
-    """
-    Verify signed C{data} with C{pubkey}.
-
-    :param data: The data to be verified.
-    :type data: str
-
-    :param pubkey: The public key to be used on verification.
-    :type pubkey: OpenPGPKey
-
-    :return: The ascii-armored signed data.
-    :rtype: str
-    """
-    leap_assert_type(key, OpenPGPKey)
-    leap_assert(key.private is False)
-
-    return lambda gpg: gpg.verify(data)
-
-
-#
-# Helper functions
-#
-
-
 def _build_key_from_gpg(address, key, key_data):
     """
     Build an OpenPGPKey for C{address} based on C{key} from
@@ -445,17 +199,25 @@ class OpenPGPKey(EncryptionKey):
 
 class OpenPGPScheme(EncryptionScheme):
     """
-    A wrapper for OpenPGP keys.
+    A wrapper for OpenPGP keys management and use (encryption, decyption,
+    signing and verification).
     """
 
-    def __init__(self, soledad):
+    def __init__(self, soledad, gpgbinary=None):
         """
         Initialize the OpenPGP wrapper.
 
         :param soledad: A Soledad instance for key storage.
         :type soledad: leap.soledad.Soledad
+        :param gpgbinary: Name for GnuPG binary executable.
+        :type gpgbinary: C{str}
         """
         EncryptionScheme.__init__(self, soledad)
+        self._gpgbinary = gpgbinary
+
+    #
+    # Keys management
+    #
 
     def gen_key(self, address):
         """
@@ -475,7 +237,8 @@ class OpenPGPScheme(EncryptionScheme):
         except errors.KeyNotFound:
             logger.debug('Key for %s not found' % (address,))
 
-        def _gen_key(gpg):
+        with self._temporary_gpgwrapper() as gpg:
+            # TODO: inspect result, or use decorator
             params = gpg.gen_key_input(
                 key_type='RSA',
                 key_length=4096,
@@ -512,10 +275,6 @@ class OpenPGPScheme(EncryptionScheme):
                     gpg.export_keys(key['fingerprint'], secret=secret))
                 self.put_key(openpgp_key)
 
-        with temporary_gpgwrapper() as gpg:
-            # TODO: inspect result, or use decorator
-            _gen_key(gpg)
-
         return self.get_key(address, private=True)
 
     def get_key(self, address, private=False):
@@ -548,7 +307,8 @@ class OpenPGPScheme(EncryptionScheme):
         # TODO: add more checks for correct key data.
         leap_assert(key_data is not None, 'Data does not represent a key.')
 
-        def _put_ascii_key(gpg):
+        with self._temporary_gpgwrapper() as gpg:
+            # TODO: inspect result, or use decorator
             gpg.import_keys(key_data)
             privkey = None
             pubkey = None
@@ -584,10 +344,6 @@ class OpenPGPScheme(EncryptionScheme):
                 gpg.export_keys(pubkey['fingerprint'], secret=False))
             self.put_key(openpgp_pubkey)
 
-        with temporary_gpgwrapper() as gpg:
-            # TODO: inspect result, or use decorator
-            _put_ascii_key(gpg)
-
     def put_key(self, key):
         """
         Put C{key} in local storage.
@@ -643,3 +399,182 @@ class OpenPGPScheme(EncryptionScheme):
             raise errors.KeyAttributesDiffer(key)
         doc = self._get_key_doc(key.address, key.private)
         self._soledad.delete_doc(doc)
+
+    #
+    # Data encryption, decryption, signing and verifying
+    #
+
+    def _temporary_gpgwrapper(self, keys=None):
+        """
+        Returns a unitary gpg wrapper that implements context manager
+        protocol.
+
+        :param key_data: ASCII armored key data.
+        :type key_data: str
+        :param gpgbinary: Name for GnuPG binary executable.
+        :type gpgbinary: C{str}
+
+        :return: a GPGWrapper instance
+        :rtype: GPGWrapper
+        """
+        # TODO do here checks on key_data
+        return TempGPGWrapper(
+            keys=keys, gpgbinary=self._gpgbinary)
+
+    @staticmethod
+    def _assert_gpg_result_ok(result):
+        """
+        Check if GPG result is 'ok' and log stderr outputs.
+        :param result: The GPG results
+        :type result:
+        """
+        stderr = getattr(result, 'stderr', None)
+        if stderr:
+            logger.debug("%s" % (stderr,))
+        if getattr(result, 'ok', None) is not True:
+            raise errors.EncryptionDecryptionFailed(
+                'Failed to encrypt/decrypt in %s: %s' % (
+                    this.encrypt,
+                    stderr))
+
+    def encrypt(self, data, pubkey, passphrase=None, sign=None):
+        """
+        Encrypt C{data} using public @{pubkey} and sign with C{sign} key.
+
+        :param data: The data to be encrypted.
+        :type data: str
+        :param pubkey: The key used to encrypt.
+        :type pubkey: OpenPGPKey
+        :param sign: The key used for signing.
+        :type sign: OpenPGPKey
+
+        :return: The encrypted data.
+        :rtype: str
+        """
+        leap_assert_type(pubkey, OpenPGPKey)
+        leap_assert(pubkey.private is False, 'Key is not public.')
+        keys = [pubkey]
+        if sign is not None:
+            leap_assert_type(sign, OpenPGPKey)
+            leap_assert(sign.private is True)
+            keys.append(sign)
+        with self._temporary_gpgwrapper(keys) as gpg:
+            result = gpg.encrypt(
+                data, pubkey.fingerprint,
+                sign=sign.key_id if sign else None,
+                passphrase=passphrase, symmetric=False)
+            # Here we cannot assert for correctness of sig because the sig is
+            # in the ciphertext.
+            # result.ok    - (bool) indicates if the operation succeeded
+            # result.data  - (bool) contains the result of the operation
+            self._assert_gpg_result_ok(result)
+            return result.data
+
+    def decrypt(self, data, privkey, passphrase=None, verify=None):
+        """
+        Decrypt C{data} using private @{privkey} and verify with C{verify} key.
+
+        :param data: The data to be decrypted.
+        :type data: str
+        :param privkey: The key used to decrypt.
+        :type privkey: OpenPGPKey
+        :param verify: The key used to verify a signature.
+        :type verify: OpenPGPKey
+
+        :return: The decrypted data.
+        :rtype: str
+
+        @raise InvalidSignature: Raised if unable to verify the signature with
+            C{verify} key.
+        """
+        leap_assert(privkey.private is True, 'Key is not private.')
+        keys = [privkey]
+        if verify is not None:
+            leap_assert_type(verify, OpenPGPKey)
+            leap_assert(verify.private is False)
+            keys.append(verify)
+        with self._temporary_gpgwrapper(keys) as gpg:
+            result = gpg.decrypt(data, passphrase=passphrase)
+            self._assert_gpg_result_ok(result)
+            # verify signature
+            if (verify is not None):
+                if result.valid is False or \
+                        verify.fingerprint != result.pubkey_fingerprint:
+                    raise errors.InvalidSignature(
+                        'Failed to verify signature with key %s: %s' %
+                        (verify.key_id, stderr))
+            return result.data
+
+    def is_encrypted(self, data):
+        """
+        Return whether C{data} was asymmetrically encrypted using OpenPGP.
+
+        :param data: The data we want to know about.
+        :type data: str
+
+        :return: Whether C{data} was encrypted using this wrapper.
+        :rtype: bool
+        """
+        with self._temporary_gpgwrapper() as gpg:
+            return gpg.is_encrypted_asym(data)
+
+    def sign(self, data, privkey):
+        """
+        Sign C{data} with C{privkey}.
+
+        :param data: The data to be signed.
+        :type data: str
+
+        :param privkey: The private key to be used to sign.
+        :type privkey: OpenPGPKey
+
+        :return: The ascii-armored signed data.
+        :rtype: str
+        """
+        leap_assert_type(privkey, OpenPGPKey)
+        leap_assert(privkey.private is True)
+
+        # result.fingerprint - contains the fingerprint of the key used to
+        #                      sign.
+        with self._temporary_gpgwrapper(privkey) as gpg:
+            result = gpg.sign(data, keyid=privkey.key_id)
+            rfprint = privkey.fingerprint
+            privkey = gpg.list_keys(secret=True).pop()
+            kfprint = privkey['fingerprint']
+            if result.fingerprint is None:
+                raise errors.SignFailed(
+                    'Failed to sign with key %s: %s' %
+                    (privkey['keyid'], stderr))
+            leap_assert(
+                result.fingerprint == kfprint,
+                'Signature and private key fingerprints mismatch: '
+                '%s != %s' % (rfprint, kfprint))
+        return result.data
+
+    def verify(self, data, pubkey):
+        """
+        Verify signed C{data} with C{pubkey}.
+
+        :param data: The data to be verified.
+        :type data: str
+
+        :param pubkey: The public key to be used on verification.
+        :type pubkey: OpenPGPKey
+
+        :return: The ascii-armored signed data.
+        :rtype: str
+        """
+        leap_assert_type(pubkey, OpenPGPKey)
+        leap_assert(pubkey.private is False)
+        with self._temporary_gpgwrapper(pubkey) as gpg:
+            result = gpg.verify(data)
+            gpgpubkey = gpg.list_keys().pop()
+            valid = result.valid
+            rfprint = result.fingerprint
+            kfprint = gpgpubkey['fingerprint']
+            # raise in case sig is invalid
+            if valid is False or rfprint != kfprint:
+                raise errors.InvalidSignature(
+                    'Failed to verify signature '
+                    'with key %s.' % gpgpubkey['keyid'])
+            return True
index 09a6183..a474e32 100644 (file)
@@ -40,6 +40,7 @@ from leap.keymanager.keys import (
 
 ADDRESS = 'leap@leap.se'
 ADDRESS_2 = 'anotheruser@leap.se'
+GPG_BINARY_PATH = '/usr/bin/gpg'
 
 
 class KeyManagerUtilTestCase(BaseLeapTest):
@@ -136,7 +137,8 @@ class KeyManagerWithSoledadTestCase(BaseLeapTest):
             km._wrapper_map[key.__class__].delete_key(key)
 
     def _key_manager(self, user=ADDRESS, url=''):
-        return KeyManager(user, url, self._soledad)
+        return KeyManager(user, url, self._soledad,
+                          gpgbinary=GPG_BINARY_PATH)
 
 
 class OpenPGPCryptoTestCase(KeyManagerWithSoledadTestCase):
@@ -152,7 +154,8 @@ class OpenPGPCryptoTestCase(KeyManagerWithSoledadTestCase):
             '4096', key.length, 'Wrong key length.')
 
     def test_openpgp_put_delete_key(self):
-        pgp = openpgp.OpenPGPScheme(self._soledad)
+        pgp = openpgp.OpenPGPScheme(
+            self._soledad, gpgbinary=GPG_BINARY_PATH)
         self.assertRaises(KeyNotFound, pgp.get_key, ADDRESS)
         pgp.put_ascii_key(PUBLIC_KEY)
         key = pgp.get_key(ADDRESS, private=False)
@@ -160,7 +163,8 @@ class OpenPGPCryptoTestCase(KeyManagerWithSoledadTestCase):
         self.assertRaises(KeyNotFound, pgp.get_key, ADDRESS)
 
     def test_openpgp_put_ascii_key(self):
-        pgp = openpgp.OpenPGPScheme(self._soledad)
+        pgp = openpgp.OpenPGPScheme(
+            self._soledad, gpgbinary=GPG_BINARY_PATH)
         self.assertRaises(KeyNotFound, pgp.get_key, ADDRESS)
         pgp.put_ascii_key(PUBLIC_KEY)
         key = pgp.get_key(ADDRESS, private=False)
@@ -173,7 +177,8 @@ class OpenPGPCryptoTestCase(KeyManagerWithSoledadTestCase):
         self.assertRaises(KeyNotFound, pgp.get_key, ADDRESS)
 
     def test_get_public_key(self):
-        pgp = openpgp.OpenPGPScheme(self._soledad)
+        pgp = openpgp.OpenPGPScheme(
+            self._soledad, gpgbinary=GPG_BINARY_PATH)
         self.assertRaises(KeyNotFound, pgp.get_key, ADDRESS)
         pgp.put_ascii_key(PUBLIC_KEY)
         self.assertRaises(
@@ -185,24 +190,25 @@ class OpenPGPCryptoTestCase(KeyManagerWithSoledadTestCase):
         pgp.delete_key(key)
         self.assertRaises(KeyNotFound, pgp.get_key, ADDRESS)
 
-    def test_openpgp_encrypt_decrypt_asym(self):
+    def test_openpgp_encrypt_decrypt(self):
         # encrypt
-        pgp = openpgp.OpenPGPScheme(self._soledad)
+        pgp = openpgp.OpenPGPScheme(
+            self._soledad, gpgbinary=GPG_BINARY_PATH)
         pgp.put_ascii_key(PUBLIC_KEY)
         pubkey = pgp.get_key(ADDRESS, private=False)
-        cyphertext = openpgp.encrypt_asym('data', pubkey)
+        cyphertext = pgp.encrypt('data', pubkey)
         # assert
         self.assertTrue(cyphertext is not None)
         self.assertTrue(cyphertext != '')
         self.assertTrue(cyphertext != 'data')
-        self.assertTrue(openpgp.is_encrypted_asym(cyphertext))
-        self.assertTrue(openpgp.is_encrypted(cyphertext))
+        self.assertTrue(pgp.is_encrypted(cyphertext))
+        self.assertTrue(pgp.is_encrypted(cyphertext))
         # decrypt
         self.assertRaises(
             KeyNotFound, pgp.get_key, ADDRESS, private=True)
         pgp.put_ascii_key(PRIVATE_KEY)
         privkey = pgp.get_key(ADDRESS, private=True)
-        plaintext = openpgp.decrypt_asym(cyphertext, privkey)
+        plaintext = pgp.decrypt(cyphertext, privkey)
         pgp.delete_key(pubkey)
         pgp.delete_key(privkey)
         self.assertRaises(
@@ -211,83 +217,91 @@ class OpenPGPCryptoTestCase(KeyManagerWithSoledadTestCase):
             KeyNotFound, pgp.get_key, ADDRESS, private=True)
 
     def test_verify_with_private_raises(self):
-        pgp = openpgp.OpenPGPScheme(self._soledad)
+        pgp = openpgp.OpenPGPScheme(
+            self._soledad, gpgbinary=GPG_BINARY_PATH)
         pgp.put_ascii_key(PRIVATE_KEY)
         data = 'data'
         privkey = pgp.get_key(ADDRESS, private=True)
-        signed = openpgp.sign(data, privkey)
+        signed = pgp.sign(data, privkey)
         self.assertRaises(
             AssertionError,
-            openpgp.verify, signed, privkey)
+            pgp.verify, signed, privkey)
 
     def test_sign_with_public_raises(self):
-        pgp = openpgp.OpenPGPScheme(self._soledad)
+        pgp = openpgp.OpenPGPScheme(
+            self._soledad, gpgbinary=GPG_BINARY_PATH)
         pgp.put_ascii_key(PUBLIC_KEY)
         data = 'data'
         pubkey = pgp.get_key(ADDRESS, private=False)
         self.assertRaises(
             AssertionError,
-            openpgp.sign, data, pubkey)
+            pgp.sign, data, pubkey)
 
     def test_verify_with_wrong_key_raises(self):
-        pgp = openpgp.OpenPGPScheme(self._soledad)
+        pgp = openpgp.OpenPGPScheme(
+            self._soledad, gpgbinary=GPG_BINARY_PATH)
         pgp.put_ascii_key(PRIVATE_KEY)
         data = 'data'
         privkey = pgp.get_key(ADDRESS, private=True)
-        signed = openpgp.sign(data, privkey)
+        signed = pgp.sign(data, privkey)
         pgp.put_ascii_key(PUBLIC_KEY_2)
         wrongkey = pgp.get_key(ADDRESS_2)
         self.assertRaises(
             errors.InvalidSignature,
-            openpgp.verify, signed, wrongkey)
+            pgp.verify, signed, wrongkey)
 
-    def test_encrypt_asym_sign_with_public_raises(self):
-        pgp = openpgp.OpenPGPScheme(self._soledad)
+    def test_encrypt_sign_with_public_raises(self):
+        pgp = openpgp.OpenPGPScheme(
+            self._soledad, gpgbinary=GPG_BINARY_PATH)
         pgp.put_ascii_key(PRIVATE_KEY)
         data = 'data'
         privkey = pgp.get_key(ADDRESS, private=True)
         pubkey = pgp.get_key(ADDRESS, private=False)
         self.assertRaises(
             AssertionError,
-            openpgp.encrypt_asym, data, privkey, sign=pubkey)
+            pgp.encrypt, data, privkey, sign=pubkey)
 
-    def test_decrypt_asym_verify_with_private_raises(self):
-        pgp = openpgp.OpenPGPScheme(self._soledad)
+    def test_decrypt_verify_with_private_raises(self):
+        pgp = openpgp.OpenPGPScheme(
+            self._soledad, gpgbinary=GPG_BINARY_PATH)
         pgp.put_ascii_key(PRIVATE_KEY)
         data = 'data'
         privkey = pgp.get_key(ADDRESS, private=True)
         pubkey = pgp.get_key(ADDRESS, private=False)
-        encrypted_and_signed = openpgp.encrypt_asym(
+        encrypted_and_signed = pgp.encrypt(
             data, pubkey, sign=privkey)
         self.assertRaises(
             AssertionError,
-            openpgp.decrypt_asym,
+            pgp.decrypt,
             encrypted_and_signed, privkey, verify=privkey)
 
-    def test_decrypt_asym_verify_with_wrong_key_raises(self):
-        pgp = openpgp.OpenPGPScheme(self._soledad)
+    def test_decrypt_verify_with_wrong_key_raises(self):
+        pgp = openpgp.OpenPGPScheme(
+            self._soledad, gpgbinary=GPG_BINARY_PATH)
         pgp.put_ascii_key(PRIVATE_KEY)
         data = 'data'
         privkey = pgp.get_key(ADDRESS, private=True)
         pubkey = pgp.get_key(ADDRESS, private=False)
-        encrypted_and_signed = openpgp.encrypt_asym(data, pubkey, sign=privkey)
+        encrypted_and_signed = pgp.encrypt(data, pubkey, sign=privkey)
         pgp.put_ascii_key(PUBLIC_KEY_2)
         wrongkey = pgp.get_key(ADDRESS_2)
         self.assertRaises(
             errors.InvalidSignature,
-            openpgp.verify, encrypted_and_signed, wrongkey)
+            pgp.verify, encrypted_and_signed, wrongkey)
 
     def test_sign_verify(self):
-        pgp = openpgp.OpenPGPScheme(self._soledad)
+        pgp = openpgp.OpenPGPScheme(
+            self._soledad, gpgbinary=GPG_BINARY_PATH)
         pgp.put_ascii_key(PRIVATE_KEY)
         data = 'data'
         privkey = pgp.get_key(ADDRESS, private=True)
-        signed = openpgp.sign(data, privkey)
+        signed = pgp.sign(data, privkey)
         pubkey = pgp.get_key(ADDRESS, private=False)
-        self.assertTrue(openpgp.verify(signed, pubkey))
+        self.assertTrue(pgp.verify(signed, pubkey))
 
-    def test_encrypt_asym_sign_decrypt_verify(self):
-        pgp = openpgp.OpenPGPScheme(self._soledad)
+    def test_encrypt_sign_decrypt_verify(self):
+        pgp = openpgp.OpenPGPScheme(
+            self._soledad, gpgbinary=GPG_BINARY_PATH)
         pgp.put_ascii_key(PRIVATE_KEY)
         pubkey = pgp.get_key(ADDRESS, private=False)
         privkey = pgp.get_key(ADDRESS, private=True)
@@ -295,9 +309,9 @@ class OpenPGPCryptoTestCase(KeyManagerWithSoledadTestCase):
         pubkey2 = pgp.get_key(ADDRESS_2, private=False)
         privkey2 = pgp.get_key(ADDRESS_2, private=True)
         data = 'data'
-        encrypted_and_signed = openpgp.encrypt_asym(
+        encrypted_and_signed = pgp.encrypt(
             data, pubkey2, sign=privkey)
-        res = openpgp.decrypt_asym(
+        res = pgp.decrypt(
             encrypted_and_signed, privkey2, verify=pubkey)
         self.assertTrue(data, res)
 
@@ -448,6 +462,44 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase):
         self.assertEqual(ADDRESS_2, key.address)
 
 
+class KeyManagerCryptoTestCase(KeyManagerWithSoledadTestCase):
+
+    RAW_DATA = 'data'
+
+    def test_keymanager_openpgp_encrypt_decrypt(self):
+        km = self._key_manager()
+        # put raw private key
+        km._wrapper_map[OpenPGPKey].put_ascii_key(PRIVATE_KEY)
+        # get public key
+        pubkey = km.get_key(
+            ADDRESS, OpenPGPKey, private=False, fetch_remote=False)
+        # encrypt
+        encdata = km.encrypt(self.RAW_DATA, pubkey)
+        self.assertNotEqual(self.RAW_DATA, encdata)
+        # get private key
+        privkey = km.get_key(
+            ADDRESS, OpenPGPKey, private=True, fetch_remote=False)
+        # decrypt
+        rawdata = km.decrypt(encdata, privkey)
+        self.assertEqual(self.RAW_DATA, rawdata)
+
+    def test_keymanager_openpgp_sign_verify(self):
+        km = self._key_manager()
+        # put raw private keys
+        km._wrapper_map[OpenPGPKey].put_ascii_key(PRIVATE_KEY)
+        # get private key for signing
+        privkey = km.get_key(
+            ADDRESS, OpenPGPKey, private=True, fetch_remote=False)
+        # encrypt
+        signdata = km.sign(self.RAW_DATA, privkey)
+        self.assertNotEqual(self.RAW_DATA, signdata)
+        # get public key for verifying
+        pubkey = km.get_key(
+            ADDRESS, OpenPGPKey, private=False, fetch_remote=False)
+        # decrypt
+        self.assertTrue(km.verify(signdata, pubkey))
+
+
 # Key material for testing
 
 # key 24D18DDF: public key "Leap Test Key <leap@leap.se>"