summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authordrebs <drebs@leap.se>2013-08-07 16:29:54 +0200
committerdrebs <drebs@leap.se>2013-08-08 15:44:43 +0200
commit5cb58ca086e88951b0313d7d4dc48a9f4c0c85b5 (patch)
treedd03b3e26081be9eb5da513986a7b944b26acdba
parent8c1200d745185cdee1d17b127797f8da2da29c80 (diff)
Support bundled GPG and change API.
- Move openpgp encrypt/decrypt/sign/verify API to inside OpenPGP class. - Add encrypt/decrypt/sign/verify API to KeyManager. - Add possibility of passing custom gpg binary to KeyManager and OpenPGPScheme. - Remove "_asym" suffix from method names. - Bump version to 0.2.1. New API is *not* backwards compatible.
-rw-r--r--changes/feature_3397-keymanager-should-support-bundled-gpg2
-rw-r--r--src/leap/keymanager/__init__.py97
-rw-r--r--src/leap/keymanager/keys.py69
-rw-r--r--src/leap/keymanager/openpgp.py475
-rw-r--r--src/leap/keymanager/tests/test_keymanager.py124
5 files changed, 458 insertions, 309 deletions
diff --git a/changes/feature_3397-keymanager-should-support-bundled-gpg b/changes/feature_3397-keymanager-should-support-bundled-gpg
new file mode 100644
index 0000000..6774dd8
--- /dev/null
+++ b/changes/feature_3397-keymanager-should-support-bundled-gpg
@@ -0,0 +1,2 @@
+ o Add support for bundled gpg. Closes #3397.
+ o Refactor API to include encrypt/decrypt/sign/verify in KeyManager.
diff --git a/src/leap/keymanager/__init__.py b/src/leap/keymanager/__init__.py
index e1f318c..d24e08e 100644
--- a/src/leap/keymanager/__init__.py
+++ b/src/leap/keymanager/__init__.py
@@ -22,12 +22,13 @@ Key Manager is a Nicknym agent for LEAP client.
import requests
-from leap.common.check import leap_assert
+from leap.common.check import leap_assert, leap_assert_type
from leap.keymanager.errors import (
KeyNotFound,
NoPasswordGiven,
)
from leap.keymanager.keys import (
+ EncryptionKey,
build_key_from_dict,
KEYMANAGER_KEY_TAG,
TAGS_PRIVATE_INDEX,
@@ -52,7 +53,8 @@ class KeyManager(object):
PUBKEY_KEY = "user[public_key]"
def __init__(self, address, nickserver_uri, soledad, session_id=None,
- ca_cert_path=None, api_uri=None, api_version=None, uid=None):
+ ca_cert_path=None, api_uri=None, api_version=None, uid=None,
+ gpgbinary=None):
"""
Initialize a Key Manager for user's C{address} with provider's
nickserver reachable in C{url}.
@@ -73,6 +75,8 @@ class KeyManager(object):
:type api_version: str
:param uid: The users' UID.
:type uid: str
+ :param gpgbinary: Name for GnuPG binary executable.
+ :type gpgbinary: C{str}
"""
self._address = address
self._nickserver_uri = nickserver_uri
@@ -84,7 +88,7 @@ class KeyManager(object):
self.uid = uid
# a dict to map key types to their handlers
self._wrapper_map = {
- OpenPGPKey: OpenPGPScheme(soledad),
+ OpenPGPKey: OpenPGPScheme(soledad, gpgbinary=gpgbinary),
# other types of key will be added to this mapper.
}
# the following are used to perform https requests
@@ -334,3 +338,90 @@ class KeyManager(object):
uid = property(
_get_uid, _set_uid, doc='The uid of the user.')
+
+ #
+ # encrypt/decrypt and sign/verify API
+ #
+
+ def encrypt(self, data, pubkey, passphrase=None, sign=None):
+ """
+ Encrypt C{data} using public @{key} and sign with C{sign} key.
+
+ :param data: The data to be encrypted.
+ :type data: str
+ :param pubkey: The key used to encrypt.
+ :type pubkey: EncryptionKey
+ :param sign: The key used for signing.
+ :type sign: EncryptionKey
+
+ :return: The encrypted data.
+ :rtype: str
+ """
+ leap_assert_type(pubkey, EncryptionKey)
+ leap_assert(pubkey.__class__ in self._wrapper_map, 'Unknown key type.')
+ leap_assert(pubkey.private is False, 'Key is not public.')
+ return self._wrapper_map[pubkey.__class__].encrypt(
+ data, pubkey, passphrase, sign)
+
+ def decrypt(self, data, privkey, passphrase=None, verify=None):
+ """
+ Decrypt C{data} using private @{privkey} and verify with C{verify} key.
+
+ :param data: The data to be decrypted.
+ :type data: str
+ :param privkey: The key used to decrypt.
+ :type privkey: OpenPGPKey
+ :param verify: The key used to verify a signature.
+ :type verify: OpenPGPKey
+
+ :return: The decrypted data.
+ :rtype: str
+
+ @raise InvalidSignature: Raised if unable to verify the signature with
+ C{verify} key.
+ """
+ leap_assert_type(privkey, EncryptionKey)
+ leap_assert(
+ privkey.__class__ in self._wrapper_map,
+ 'Unknown key type.')
+ leap_assert(privkey.private is True, 'Key is not private.')
+ return self._wrapper_map[privkey.__class__].decrypt(
+ data, privkey, passphrase, verify)
+
+ def sign(self, data, privkey):
+ """
+ Sign C{data} with C{privkey}.
+
+ :param data: The data to be signed.
+ :type data: str
+
+ :param privkey: The private key to be used to sign.
+ :type privkey: EncryptionKey
+
+ :return: The signed data.
+ :rtype: str
+ """
+ leap_assert_type(privkey, EncryptionKey)
+ leap_assert(
+ privkey.__class__ in self._wrapper_map,
+ 'Unknown key type.')
+ leap_assert(privkey.private is True, 'Key is not private.')
+ return self._wrapper_map[privkey.__class__].sign(data, privkey)
+
+ def verify(self, data, pubkey):
+ """
+ Verify signed C{data} with C{pubkey}.
+
+ :param data: The data to be verified.
+ :type data: str
+
+ :param pubkey: The public key to be used on verification.
+ :type pubkey: EncryptionKey
+
+ :return: The signed data.
+ :rtype: str
+ """
+ leap_assert_type(pubkey, EncryptionKey)
+ leap_assert(pubkey.__class__ in self._wrapper_map, 'Unknown key type.')
+ leap_assert(pubkey.private is False, 'Key is not public.')
+ return self._wrapper_map[pubkey.__class__].verify(data, pubkey)
diff --git a/src/leap/keymanager/keys.py b/src/leap/keymanager/keys.py
index 44bd587..1c33745 100644
--- a/src/leap/keymanager/keys.py
+++ b/src/leap/keymanager/keys.py
@@ -283,3 +283,72 @@ class EncryptionScheme(object):
:type key: EncryptionKey
"""
pass
+
+ @abstractmethod
+ def encrypt(self, data, pubkey, passphrase=None, sign=None):
+ """
+ Encrypt C{data} using public @{pubkey} and sign with C{sign} key.
+
+ :param data: The data to be encrypted.
+ :type data: str
+ :param pubkey: The key used to encrypt.
+ :type pubkey: EncryptionKey
+ :param sign: The key used for signing.
+ :type sign: EncryptionKey
+
+ :return: The encrypted data.
+ :rtype: str
+ """
+ pass
+
+ @abstractmethod
+ def decrypt(self, data, privkey, passphrase=None, verify=None):
+ """
+ Decrypt C{data} using private @{privkey} and verify with C{verify} key.
+
+ :param data: The data to be decrypted.
+ :type data: str
+ :param privkey: The key used to decrypt.
+ :type privkey: OpenPGPKey
+ :param verify: The key used to verify a signature.
+ :type verify: OpenPGPKey
+
+ :return: The decrypted data.
+ :rtype: str
+
+ @raise InvalidSignature: Raised if unable to verify the signature with
+ C{verify} key.
+ """
+ pass
+
+ @abstractmethod
+ def sign(self, data, privkey):
+ """
+ Sign C{data} with C{privkey}.
+
+ :param data: The data to be signed.
+ :type data: str
+
+ :param privkey: The private key to be used to sign.
+ :type privkey: EncryptionKey
+
+ :return: The signed data.
+ :rtype: str
+ """
+ pass
+
+ @abstractmethod
+ def verify(self, data, pubkey):
+ """
+ Verify signed C{data} with C{pubkey}.
+
+ :param data: The data to be verified.
+ :type data: str
+
+ :param pubkey: The public key to be used on verification.
+ :type pubkey: EncryptionKey
+
+ :return: The signed data.
+ :rtype: str
+ """
+ pass
diff --git a/src/leap/keymanager/openpgp.py b/src/leap/keymanager/openpgp.py
index 9d404cb..1e78c70 100644
--- a/src/leap/keymanager/openpgp.py
+++ b/src/leap/keymanager/openpgp.py
@@ -15,15 +15,19 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
"""
Infrastructure for using OpenPGP keys in Key Manager.
"""
+
+
import logging
import os
import re
import shutil
import tempfile
+
from leap.common.check import leap_assert, leap_assert_type
from leap.keymanager import errors
from leap.keymanager.keys import (
@@ -36,125 +40,8 @@ from leap.keymanager.keys import (
)
from leap.keymanager.gpg import GPGWrapper
-logger = logging.getLogger(__name__)
-
-
-#
-# gpg wrapper and decorator
-#
-
-def temporary_gpgwrapper(keys=None):
- """
- Returns a unitary gpg wrapper that implements context manager
- protocol.
-
- :param key_data: ASCII armored key data.
- :type key_data: str
-
- :return: a GPGWrapper instance
- :rtype: GPGWrapper
- """
- # TODO do here checks on key_data
- return TempGPGWrapper(keys=keys)
-
-
-def with_temporary_gpg(fun):
- """
- Decorator to add a temporary gpg wrapper as context
- to gpg related functions.
-
- Decorated functions are expected to return a function whose only
- argument is a gpgwrapper instance.
- """
- def wrapped(*args, **kwargs):
- """
- We extract the arguments passed to the wrapped function,
- run the function and do validations.
- We expect that the positional arguments are `data`,
- and an optional `key`.
- All the rest of arguments should be passed as named arguments
- to allow for a correct unpacking.
- """
- if len(args) == 2:
- keys = args[1] if isinstance(args[1], OpenPGPKey) else None
- else:
- keys = None
-
- # sign/verify keys passed as arguments
- sign = kwargs.get('sign', None)
- if sign:
- keys = [keys, sign]
-
- verify = kwargs.get('verify', None)
- if verify:
- keys = [keys, verify]
-
- # is the wrapped function sign or verify?
- fun_name = fun.__name__
- is_sign_function = True if fun_name == "sign" else False
- is_verify_function = True if fun_name == "verify" else False
-
- result = None
-
- with temporary_gpgwrapper(keys) as gpg:
- result = fun(*args, **kwargs)(gpg)
-
- # TODO: cleanup a little bit the
- # validation. maybe delegate to other
- # auxiliary functions for clarity.
-
- ok = getattr(result, 'ok', None)
-
- stderr = getattr(result, 'stderr', None)
- if stderr:
- logger.debug("%s" % (stderr,))
-
- if ok is False:
- raise errors.EncryptionDecryptionFailed(
- 'Failed to encrypt/decrypt in %s: %s' % (
- fun.__name__,
- stderr))
-
- if verify is not None:
- # A verify key has been passed
- if result.valid is False or \
- verify.fingerprint != result.pubkey_fingerprint:
- raise errors.InvalidSignature(
- 'Failed to verify signature with key %s: %s' %
- (verify.key_id, stderr))
-
- if is_sign_function:
- # Specific validation for sign function
- privkey = gpg.list_keys(secret=True).pop()
- rfprint = result.fingerprint
- kfprint = privkey['fingerprint']
- if result.fingerprint is None:
- raise errors.SignFailed(
- 'Failed to sign with key %s: %s' %
- (privkey['keyid'], stderr))
- leap_assert(
- result.fingerprint == kfprint,
- 'Signature and private key fingerprints mismatch: '
- '%s != %s' %
- (rfprint, kfprint))
-
- if is_verify_function:
- # Specific validation for verify function
- pubkey = gpg.list_keys().pop()
- valid = result.valid
- rfprint = result.fingerprint
- kfprint = pubkey['fingerprint']
- if valid is False or rfprint != kfprint:
- raise errors.InvalidSignature(
- 'Failed to verify signature '
- 'with key %s.' % pubkey['keyid'])
- result = result.valid
- # ok, enough checks. let's return data if available
- if hasattr(result, 'data'):
- result = result.data
- return result
- return wrapped
+logger = logging.getLogger(__name__)
class TempGPGWrapper(object):
@@ -166,12 +53,15 @@ class TempGPGWrapper(object):
one key. This function creates an empty temporary keyring and imports
C{keys} if it is not None.
"""
- def __init__(self, keys=None):
+ def __init__(self, keys=None, gpgbinary=None):
"""
:param keys: OpenPGP key, or list of.
:type keys: OpenPGPKey or list of OpenPGPKeys
+ :param gpgbinary: Name for GnuPG binary executable.
+ :type gpgbinary: C{str}
"""
self._gpg = None
+ self._gpgbinary = gpgbinary
if not keys:
keys = list()
if not isinstance(keys, list):
@@ -221,7 +111,9 @@ class TempGPGWrapper(object):
listkeys = lambda: self._gpg.list_keys()
listsecretkeys = lambda: self._gpg.list_keys(secret=True)
- self._gpg = GPGWrapper(gnupghome=tempfile.mkdtemp())
+ self._gpg = GPGWrapper(
+ gnupghome=tempfile.mkdtemp(),
+ gpgbinary=self._gpgbinary)
leap_assert(len(listkeys()) is 0, 'Keyring not empty.')
# import keys into the keyring:
@@ -266,144 +158,6 @@ class TempGPGWrapper(object):
shutil.rmtree(self._gpg.gnupghome)
-#
-# API functions
-#
-
-@with_temporary_gpg
-def encrypt_asym(data, key, passphrase=None, sign=None):
- """
- Encrypt C{data} using public @{key} and sign with C{sign} key.
-
- :param data: The data to be encrypted.
- :type data: str
- :param pubkey: The key used to encrypt.
- :type pubkey: OpenPGPKey
- :param sign: The key used for signing.
- :type sign: OpenPGPKey
-
- :return: The encrypted data.
- :rtype: str
- """
- leap_assert_type(key, OpenPGPKey)
- leap_assert(key.private is False, 'Key is not public.')
- if sign is not None:
- leap_assert_type(sign, OpenPGPKey)
- leap_assert(sign.private is True)
-
- # Here we cannot assert for correctness of sig because the sig is in
- # the ciphertext.
- # result.ok - (bool) indicates if the operation succeeded
- # result.data - (bool) contains the result of the operation
-
- return lambda gpg: gpg.encrypt(
- data, key.fingerprint,
- sign=sign.key_id if sign else None,
- passphrase=passphrase, symmetric=False)
-
-
-@with_temporary_gpg
-def decrypt_asym(data, key, passphrase=None, verify=None):
- """
- Decrypt C{data} using private @{key} and verify with C{verify} key.
-
- :param data: The data to be decrypted.
- :type data: str
- :param privkey: The key used to decrypt.
- :type privkey: OpenPGPKey
- :param verify: The key used to verify a signature.
- :type verify: OpenPGPKey
-
- :return: The decrypted data.
- :rtype: str
-
- @raise InvalidSignature: Raised if unable to verify the signature with
- C{verify} key.
- """
- leap_assert(key.private is True, 'Key is not private.')
- if verify is not None:
- leap_assert_type(verify, OpenPGPKey)
- leap_assert(verify.private is False)
-
- return lambda gpg: gpg.decrypt(
- data, passphrase=passphrase)
-
-
-@with_temporary_gpg
-def is_encrypted(data):
- """
- Return whether C{data} was encrypted using OpenPGP.
-
- :param data: The data we want to know about.
- :type data: str
-
- :return: Whether C{data} was encrypted using this wrapper.
- :rtype: bool
- """
- return lambda gpg: gpg.is_encrypted(data)
-
-
-@with_temporary_gpg
-def is_encrypted_asym(data):
- """
- Return whether C{data} was asymmetrically encrypted using OpenPGP.
-
- :param data: The data we want to know about.
- :type data: str
-
- :return: Whether C{data} was encrypted using this wrapper.
- :rtype: bool
- """
- return lambda gpg: gpg.is_encrypted_asym(data)
-
-
-@with_temporary_gpg
-def sign(data, privkey):
- """
- Sign C{data} with C{privkey}.
-
- :param data: The data to be signed.
- :type data: str
-
- :param privkey: The private key to be used to sign.
- :type privkey: OpenPGPKey
-
- :return: The ascii-armored signed data.
- :rtype: str
- """
- leap_assert_type(privkey, OpenPGPKey)
- leap_assert(privkey.private is True)
-
- # result.fingerprint - contains the fingerprint of the key used to
- # sign.
- return lambda gpg: gpg.sign(data, keyid=privkey.key_id)
-
-
-@with_temporary_gpg
-def verify(data, key):
- """
- Verify signed C{data} with C{pubkey}.
-
- :param data: The data to be verified.
- :type data: str
-
- :param pubkey: The public key to be used on verification.
- :type pubkey: OpenPGPKey
-
- :return: The ascii-armored signed data.
- :rtype: str
- """
- leap_assert_type(key, OpenPGPKey)
- leap_assert(key.private is False)
-
- return lambda gpg: gpg.verify(data)
-
-
-#
-# Helper functions
-#
-
-
def _build_key_from_gpg(address, key, key_data):
"""
Build an OpenPGPKey for C{address} based on C{key} from
@@ -445,17 +199,25 @@ class OpenPGPKey(EncryptionKey):
class OpenPGPScheme(EncryptionScheme):
"""
- A wrapper for OpenPGP keys.
+ A wrapper for OpenPGP keys management and use (encryption, decyption,
+ signing and verification).
"""
- def __init__(self, soledad):
+ def __init__(self, soledad, gpgbinary=None):
"""
Initialize the OpenPGP wrapper.
:param soledad: A Soledad instance for key storage.
:type soledad: leap.soledad.Soledad
+ :param gpgbinary: Name for GnuPG binary executable.
+ :type gpgbinary: C{str}
"""
EncryptionScheme.__init__(self, soledad)
+ self._gpgbinary = gpgbinary
+
+ #
+ # Keys management
+ #
def gen_key(self, address):
"""
@@ -475,7 +237,8 @@ class OpenPGPScheme(EncryptionScheme):
except errors.KeyNotFound:
logger.debug('Key for %s not found' % (address,))
- def _gen_key(gpg):
+ with self._temporary_gpgwrapper() as gpg:
+ # TODO: inspect result, or use decorator
params = gpg.gen_key_input(
key_type='RSA',
key_length=4096,
@@ -512,10 +275,6 @@ class OpenPGPScheme(EncryptionScheme):
gpg.export_keys(key['fingerprint'], secret=secret))
self.put_key(openpgp_key)
- with temporary_gpgwrapper() as gpg:
- # TODO: inspect result, or use decorator
- _gen_key(gpg)
-
return self.get_key(address, private=True)
def get_key(self, address, private=False):
@@ -548,7 +307,8 @@ class OpenPGPScheme(EncryptionScheme):
# TODO: add more checks for correct key data.
leap_assert(key_data is not None, 'Data does not represent a key.')
- def _put_ascii_key(gpg):
+ with self._temporary_gpgwrapper() as gpg:
+ # TODO: inspect result, or use decorator
gpg.import_keys(key_data)
privkey = None
pubkey = None
@@ -584,10 +344,6 @@ class OpenPGPScheme(EncryptionScheme):
gpg.export_keys(pubkey['fingerprint'], secret=False))
self.put_key(openpgp_pubkey)
- with temporary_gpgwrapper() as gpg:
- # TODO: inspect result, or use decorator
- _put_ascii_key(gpg)
-
def put_key(self, key):
"""
Put C{key} in local storage.
@@ -643,3 +399,182 @@ class OpenPGPScheme(EncryptionScheme):
raise errors.KeyAttributesDiffer(key)
doc = self._get_key_doc(key.address, key.private)
self._soledad.delete_doc(doc)
+
+ #
+ # Data encryption, decryption, signing and verifying
+ #
+
+ def _temporary_gpgwrapper(self, keys=None):
+ """
+ Returns a unitary gpg wrapper that implements context manager
+ protocol.
+
+ :param key_data: ASCII armored key data.
+ :type key_data: str
+ :param gpgbinary: Name for GnuPG binary executable.
+ :type gpgbinary: C{str}
+
+ :return: a GPGWrapper instance
+ :rtype: GPGWrapper
+ """
+ # TODO do here checks on key_data
+ return TempGPGWrapper(
+ keys=keys, gpgbinary=self._gpgbinary)
+
+ @staticmethod
+ def _assert_gpg_result_ok(result):
+ """
+ Check if GPG result is 'ok' and log stderr outputs.
+ :param result: The GPG results
+ :type result:
+ """
+ stderr = getattr(result, 'stderr', None)
+ if stderr:
+ logger.debug("%s" % (stderr,))
+ if getattr(result, 'ok', None) is not True:
+ raise errors.EncryptionDecryptionFailed(
+ 'Failed to encrypt/decrypt in %s: %s' % (
+ this.encrypt,
+ stderr))
+
+ def encrypt(self, data, pubkey, passphrase=None, sign=None):
+ """
+ Encrypt C{data} using public @{pubkey} and sign with C{sign} key.
+
+ :param data: The data to be encrypted.
+ :type data: str
+ :param pubkey: The key used to encrypt.
+ :type pubkey: OpenPGPKey
+ :param sign: The key used for signing.
+ :type sign: OpenPGPKey
+
+ :return: The encrypted data.
+ :rtype: str
+ """
+ leap_assert_type(pubkey, OpenPGPKey)
+ leap_assert(pubkey.private is False, 'Key is not public.')
+ keys = [pubkey]
+ if sign is not None:
+ leap_assert_type(sign, OpenPGPKey)
+ leap_assert(sign.private is True)
+ keys.append(sign)
+ with self._temporary_gpgwrapper(keys) as gpg:
+ result = gpg.encrypt(
+ data, pubkey.fingerprint,
+ sign=sign.key_id if sign else None,
+ passphrase=passphrase, symmetric=False)
+ # Here we cannot assert for correctness of sig because the sig is
+ # in the ciphertext.
+ # result.ok - (bool) indicates if the operation succeeded
+ # result.data - (bool) contains the result of the operation
+ self._assert_gpg_result_ok(result)
+ return result.data
+
+ def decrypt(self, data, privkey, passphrase=None, verify=None):
+ """
+ Decrypt C{data} using private @{privkey} and verify with C{verify} key.
+
+ :param data: The data to be decrypted.
+ :type data: str
+ :param privkey: The key used to decrypt.
+ :type privkey: OpenPGPKey
+ :param verify: The key used to verify a signature.
+ :type verify: OpenPGPKey
+
+ :return: The decrypted data.
+ :rtype: str
+
+ @raise InvalidSignature: Raised if unable to verify the signature with
+ C{verify} key.
+ """
+ leap_assert(privkey.private is True, 'Key is not private.')
+ keys = [privkey]
+ if verify is not None:
+ leap_assert_type(verify, OpenPGPKey)
+ leap_assert(verify.private is False)
+ keys.append(verify)
+ with self._temporary_gpgwrapper(keys) as gpg:
+ result = gpg.decrypt(data, passphrase=passphrase)
+ self._assert_gpg_result_ok(result)
+ # verify signature
+ if (verify is not None):
+ if result.valid is False or \
+ verify.fingerprint != result.pubkey_fingerprint:
+ raise errors.InvalidSignature(
+ 'Failed to verify signature with key %s: %s' %
+ (verify.key_id, stderr))
+ return result.data
+
+ def is_encrypted(self, data):
+ """
+ Return whether C{data} was asymmetrically encrypted using OpenPGP.
+
+ :param data: The data we want to know about.
+ :type data: str
+
+ :return: Whether C{data} was encrypted using this wrapper.
+ :rtype: bool
+ """
+ with self._temporary_gpgwrapper() as gpg:
+ return gpg.is_encrypted_asym(data)
+
+ def sign(self, data, privkey):
+ """
+ Sign C{data} with C{privkey}.
+
+ :param data: The data to be signed.
+ :type data: str
+
+ :param privkey: The private key to be used to sign.
+ :type privkey: OpenPGPKey
+
+ :return: The ascii-armored signed data.
+ :rtype: str
+ """
+ leap_assert_type(privkey, OpenPGPKey)
+ leap_assert(privkey.private is True)
+
+ # result.fingerprint - contains the fingerprint of the key used to
+ # sign.
+ with self._temporary_gpgwrapper(privkey) as gpg:
+ result = gpg.sign(data, keyid=privkey.key_id)
+ rfprint = privkey.fingerprint
+ privkey = gpg.list_keys(secret=True).pop()
+ kfprint = privkey['fingerprint']
+ if result.fingerprint is None:
+ raise errors.SignFailed(
+ 'Failed to sign with key %s: %s' %
+ (privkey['keyid'], stderr))
+ leap_assert(
+ result.fingerprint == kfprint,
+ 'Signature and private key fingerprints mismatch: '
+ '%s != %s' % (rfprint, kfprint))
+ return result.data
+
+ def verify(self, data, pubkey):
+ """
+ Verify signed C{data} with C{pubkey}.
+
+ :param data: The data to be verified.
+ :type data: str
+
+ :param pubkey: The public key to be used on verification.
+ :type pubkey: OpenPGPKey
+
+ :return: The ascii-armored signed data.
+ :rtype: str
+ """
+ leap_assert_type(pubkey, OpenPGPKey)
+ leap_assert(pubkey.private is False)
+ with self._temporary_gpgwrapper(pubkey) as gpg:
+ result = gpg.verify(data)
+ gpgpubkey = gpg.list_keys().pop()
+ valid = result.valid
+ rfprint = result.fingerprint
+ kfprint = gpgpubkey['fingerprint']
+ # raise in case sig is invalid
+ if valid is False or rfprint != kfprint:
+ raise errors.InvalidSignature(
+ 'Failed to verify signature '
+ 'with key %s.' % gpgpubkey['keyid'])
+ return True
diff --git a/src/leap/keymanager/tests/test_keymanager.py b/src/leap/keymanager/tests/test_keymanager.py
index 09a6183..a474e32 100644
--- a/src/leap/keymanager/tests/test_keymanager.py
+++ b/src/leap/keymanager/tests/test_keymanager.py
@@ -40,6 +40,7 @@ from leap.keymanager.keys import (
ADDRESS = 'leap@leap.se'
ADDRESS_2 = 'anotheruser@leap.se'
+GPG_BINARY_PATH = '/usr/bin/gpg'
class KeyManagerUtilTestCase(BaseLeapTest):
@@ -136,7 +137,8 @@ class KeyManagerWithSoledadTestCase(BaseLeapTest):
km._wrapper_map[key.__class__].delete_key(key)
def _key_manager(self, user=ADDRESS, url=''):
- return KeyManager(user, url, self._soledad)
+ return KeyManager(user, url, self._soledad,
+ gpgbinary=GPG_BINARY_PATH)
class OpenPGPCryptoTestCase(KeyManagerWithSoledadTestCase):
@@ -152,7 +154,8 @@ class OpenPGPCryptoTestCase(KeyManagerWithSoledadTestCase):
'4096', key.length, 'Wrong key length.')
def test_openpgp_put_delete_key(self):
- pgp = openpgp.OpenPGPScheme(self._soledad)
+ pgp = openpgp.OpenPGPScheme(
+ self._soledad, gpgbinary=GPG_BINARY_PATH)
self.assertRaises(KeyNotFound, pgp.get_key, ADDRESS)
pgp.put_ascii_key(PUBLIC_KEY)
key = pgp.get_key(ADDRESS, private=False)
@@ -160,7 +163,8 @@ class OpenPGPCryptoTestCase(KeyManagerWithSoledadTestCase):
self.assertRaises(KeyNotFound, pgp.get_key, ADDRESS)
def test_openpgp_put_ascii_key(self):
- pgp = openpgp.OpenPGPScheme(self._soledad)
+ pgp = openpgp.OpenPGPScheme(
+ self._soledad, gpgbinary=GPG_BINARY_PATH)
self.assertRaises(KeyNotFound, pgp.get_key, ADDRESS)
pgp.put_ascii_key(PUBLIC_KEY)
key = pgp.get_key(ADDRESS, private=False)
@@ -173,7 +177,8 @@ class OpenPGPCryptoTestCase(KeyManagerWithSoledadTestCase):
self.assertRaises(KeyNotFound, pgp.get_key, ADDRESS)
def test_get_public_key(self):
- pgp = openpgp.OpenPGPScheme(self._soledad)
+ pgp = openpgp.OpenPGPScheme(
+ self._soledad, gpgbinary=GPG_BINARY_PATH)
self.assertRaises(KeyNotFound, pgp.get_key, ADDRESS)
pgp.put_ascii_key(PUBLIC_KEY)
self.assertRaises(
@@ -185,24 +190,25 @@ class OpenPGPCryptoTestCase(KeyManagerWithSoledadTestCase):
pgp.delete_key(key)
self.assertRaises(KeyNotFound, pgp.get_key, ADDRESS)
- def test_openpgp_encrypt_decrypt_asym(self):
+ def test_openpgp_encrypt_decrypt(self):
# encrypt
- pgp = openpgp.OpenPGPScheme(self._soledad)
+ pgp = openpgp.OpenPGPScheme(
+ self._soledad, gpgbinary=GPG_BINARY_PATH)
pgp.put_ascii_key(PUBLIC_KEY)
pubkey = pgp.get_key(ADDRESS, private=False)
- cyphertext = openpgp.encrypt_asym('data', pubkey)
+ cyphertext = pgp.encrypt('data', pubkey)
# assert
self.assertTrue(cyphertext is not None)
self.assertTrue(cyphertext != '')
self.assertTrue(cyphertext != 'data')
- self.assertTrue(openpgp.is_encrypted_asym(cyphertext))
- self.assertTrue(openpgp.is_encrypted(cyphertext))
+ self.assertTrue(pgp.is_encrypted(cyphertext))
+ self.assertTrue(pgp.is_encrypted(cyphertext))
# decrypt
self.assertRaises(
KeyNotFound, pgp.get_key, ADDRESS, private=True)
pgp.put_ascii_key(PRIVATE_KEY)
privkey = pgp.get_key(ADDRESS, private=True)
- plaintext = openpgp.decrypt_asym(cyphertext, privkey)
+ plaintext = pgp.decrypt(cyphertext, privkey)
pgp.delete_key(pubkey)
pgp.delete_key(privkey)
self.assertRaises(
@@ -211,83 +217,91 @@ class OpenPGPCryptoTestCase(KeyManagerWithSoledadTestCase):
KeyNotFound, pgp.get_key, ADDRESS, private=True)
def test_verify_with_private_raises(self):
- pgp = openpgp.OpenPGPScheme(self._soledad)
+ pgp = openpgp.OpenPGPScheme(
+ self._soledad, gpgbinary=GPG_BINARY_PATH)
pgp.put_ascii_key(PRIVATE_KEY)
data = 'data'
privkey = pgp.get_key(ADDRESS, private=True)
- signed = openpgp.sign(data, privkey)
+ signed = pgp.sign(data, privkey)
self.assertRaises(
AssertionError,
- openpgp.verify, signed, privkey)
+ pgp.verify, signed, privkey)
def test_sign_with_public_raises(self):
- pgp = openpgp.OpenPGPScheme(self._soledad)
+ pgp = openpgp.OpenPGPScheme(
+ self._soledad, gpgbinary=GPG_BINARY_PATH)
pgp.put_ascii_key(PUBLIC_KEY)
data = 'data'
pubkey = pgp.get_key(ADDRESS, private=False)
self.assertRaises(
AssertionError,
- openpgp.sign, data, pubkey)
+ pgp.sign, data, pubkey)
def test_verify_with_wrong_key_raises(self):
- pgp = openpgp.OpenPGPScheme(self._soledad)
+ pgp = openpgp.OpenPGPScheme(
+ self._soledad, gpgbinary=GPG_BINARY_PATH)
pgp.put_ascii_key(PRIVATE_KEY)
data = 'data'
privkey = pgp.get_key(ADDRESS, private=True)
- signed = openpgp.sign(data, privkey)
+ signed = pgp.sign(data, privkey)
pgp.put_ascii_key(PUBLIC_KEY_2)
wrongkey = pgp.get_key(ADDRESS_2)
self.assertRaises(
errors.InvalidSignature,
- openpgp.verify, signed, wrongkey)
+ pgp.verify, signed, wrongkey)
- def test_encrypt_asym_sign_with_public_raises(self):
- pgp = openpgp.OpenPGPScheme(self._soledad)
+ def test_encrypt_sign_with_public_raises(self):
+ pgp = openpgp.OpenPGPScheme(
+ self._soledad, gpgbinary=GPG_BINARY_PATH)
pgp.put_ascii_key(PRIVATE_KEY)
data = 'data'
privkey = pgp.get_key(ADDRESS, private=True)
pubkey = pgp.get_key(ADDRESS, private=False)
self.assertRaises(
AssertionError,
- openpgp.encrypt_asym, data, privkey, sign=pubkey)
+ pgp.encrypt, data, privkey, sign=pubkey)
- def test_decrypt_asym_verify_with_private_raises(self):
- pgp = openpgp.OpenPGPScheme(self._soledad)
+ def test_decrypt_verify_with_private_raises(self):
+ pgp = openpgp.OpenPGPScheme(
+ self._soledad, gpgbinary=GPG_BINARY_PATH)
pgp.put_ascii_key(PRIVATE_KEY)
data = 'data'
privkey = pgp.get_key(ADDRESS, private=True)
pubkey = pgp.get_key(ADDRESS, private=False)
- encrypted_and_signed = openpgp.encrypt_asym(
+ encrypted_and_signed = pgp.encrypt(
data, pubkey, sign=privkey)
self.assertRaises(
AssertionError,
- openpgp.decrypt_asym,
+ pgp.decrypt,
encrypted_and_signed, privkey, verify=privkey)
- def test_decrypt_asym_verify_with_wrong_key_raises(self):
- pgp = openpgp.OpenPGPScheme(self._soledad)
+ def test_decrypt_verify_with_wrong_key_raises(self):
+ pgp = openpgp.OpenPGPScheme(
+ self._soledad, gpgbinary=GPG_BINARY_PATH)
pgp.put_ascii_key(PRIVATE_KEY)
data = 'data'
privkey = pgp.get_key(ADDRESS, private=True)
pubkey = pgp.get_key(ADDRESS, private=False)
- encrypted_and_signed = openpgp.encrypt_asym(data, pubkey, sign=privkey)
+ encrypted_and_signed = pgp.encrypt(data, pubkey, sign=privkey)
pgp.put_ascii_key(PUBLIC_KEY_2)
wrongkey = pgp.get_key(ADDRESS_2)
self.assertRaises(
errors.InvalidSignature,
- openpgp.verify, encrypted_and_signed, wrongkey)
+ pgp.verify, encrypted_and_signed, wrongkey)
def test_sign_verify(self):
- pgp = openpgp.OpenPGPScheme(self._soledad)
+ pgp = openpgp.OpenPGPScheme(
+ self._soledad, gpgbinary=GPG_BINARY_PATH)
pgp.put_ascii_key(PRIVATE_KEY)
data = 'data'
privkey = pgp.get_key(ADDRESS, private=True)
- signed = openpgp.sign(data, privkey)
+ signed = pgp.sign(data, privkey)
pubkey = pgp.get_key(ADDRESS, private=False)
- self.assertTrue(openpgp.verify(signed, pubkey))
+ self.assertTrue(pgp.verify(signed, pubkey))
- def test_encrypt_asym_sign_decrypt_verify(self):
- pgp = openpgp.OpenPGPScheme(self._soledad)
+ def test_encrypt_sign_decrypt_verify(self):
+ pgp = openpgp.OpenPGPScheme(
+ self._soledad, gpgbinary=GPG_BINARY_PATH)
pgp.put_ascii_key(PRIVATE_KEY)
pubkey = pgp.get_key(ADDRESS, private=False)
privkey = pgp.get_key(ADDRESS, private=True)
@@ -295,9 +309,9 @@ class OpenPGPCryptoTestCase(KeyManagerWithSoledadTestCase):
pubkey2 = pgp.get_key(ADDRESS_2, private=False)
privkey2 = pgp.get_key(ADDRESS_2, private=True)
data = 'data'
- encrypted_and_signed = openpgp.encrypt_asym(
+ encrypted_and_signed = pgp.encrypt(
data, pubkey2, sign=privkey)
- res = openpgp.decrypt_asym(
+ res = pgp.decrypt(
encrypted_and_signed, privkey2, verify=pubkey)
self.assertTrue(data, res)
@@ -448,6 +462,44 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase):
self.assertEqual(ADDRESS_2, key.address)
+class KeyManagerCryptoTestCase(KeyManagerWithSoledadTestCase):
+
+ RAW_DATA = 'data'
+
+ def test_keymanager_openpgp_encrypt_decrypt(self):
+ km = self._key_manager()
+ # put raw private key
+ km._wrapper_map[OpenPGPKey].put_ascii_key(PRIVATE_KEY)
+ # get public key
+ pubkey = km.get_key(
+ ADDRESS, OpenPGPKey, private=False, fetch_remote=False)
+ # encrypt
+ encdata = km.encrypt(self.RAW_DATA, pubkey)
+ self.assertNotEqual(self.RAW_DATA, encdata)
+ # get private key
+ privkey = km.get_key(
+ ADDRESS, OpenPGPKey, private=True, fetch_remote=False)
+ # decrypt
+ rawdata = km.decrypt(encdata, privkey)
+ self.assertEqual(self.RAW_DATA, rawdata)
+
+ def test_keymanager_openpgp_sign_verify(self):
+ km = self._key_manager()
+ # put raw private keys
+ km._wrapper_map[OpenPGPKey].put_ascii_key(PRIVATE_KEY)
+ # get private key for signing
+ privkey = km.get_key(
+ ADDRESS, OpenPGPKey, private=True, fetch_remote=False)
+ # encrypt
+ signdata = km.sign(self.RAW_DATA, privkey)
+ self.assertNotEqual(self.RAW_DATA, signdata)
+ # get public key for verifying
+ pubkey = km.get_key(
+ ADDRESS, OpenPGPKey, private=False, fetch_remote=False)
+ # decrypt
+ self.assertTrue(km.verify(signdata, pubkey))
+
+
# Key material for testing
# key 24D18DDF: public key "Leap Test Key <leap@leap.se>"