summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--.gitignore34
-rw-r--r--CHANGELOG7
-rw-r--r--setup.py2
-rw-r--r--src/leap/keymanager/__init__.py118
-rw-r--r--src/leap/keymanager/gpg.py3
-rw-r--r--src/leap/keymanager/keys.py69
-rw-r--r--src/leap/keymanager/openpgp.py484
-rw-r--r--src/leap/keymanager/tests/test_keymanager.py124
8 files changed, 523 insertions, 318 deletions
diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..b0f9a13
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,34 @@
+*.swp
+*.swo
+*.pyc
+*.log
+*.*~
+.*
+*_rc.py
+ui_*.py
+!.coveragerc
+!.tx
+bin/
+build/
+core
+debian/python-leap-client/
+dist/
+docs/_build
+docs/covhtml
+include/
+lib/
+local/
+share/
+src/leap/util/reqs.txt
+src/leap.egg-info/
+src/leap_client.egg-info
+src/leap/_branding.py
+src/leap/certs/*.pem
+src/*.egg-info
+src/pysqlcipher
+pkg/osx/dist
+pkg/osx/build
+MANIFEST
+_trial_temp*
+config/*
+CHANGELOG~
diff --git a/CHANGELOG b/CHANGELOG
index 174318d..b7dc4a2 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -1,2 +1,9 @@
+0.3.0 Aug 9:
+ o If a nickserver request fails in any way, notify and continue.
+ o Options parameter in gnupg.GPG isn't supported by all versions, so
+ removing it for the time being.
+ o Add support for bundled gpg. Closes #3397.
+ o Refactor API to include encrypt/decrypt/sign/verify in KeyManager.
+
0.2.0 Jul 12:
o Move keymanager to its own package
diff --git a/setup.py b/setup.py
index 84780ed..88fe35f 100644
--- a/setup.py
+++ b/setup.py
@@ -38,7 +38,7 @@ tests_requirements = [
setup(
name='leap.keymanager',
- version='0.2.0',
+ version='0.3.0',
url='https://leap.se/',
license='GPLv3+',
description='LEAP\'s Key Manager',
diff --git a/src/leap/keymanager/__init__.py b/src/leap/keymanager/__init__.py
index e1f318c..e6122ff 100644
--- a/src/leap/keymanager/__init__.py
+++ b/src/leap/keymanager/__init__.py
@@ -20,14 +20,16 @@
Key Manager is a Nicknym agent for LEAP client.
"""
+import logging
import requests
-from leap.common.check import leap_assert
+from leap.common.check import leap_assert, leap_assert_type
from leap.keymanager.errors import (
KeyNotFound,
NoPasswordGiven,
)
from leap.keymanager.keys import (
+ EncryptionKey,
build_key_from_dict,
KEYMANAGER_KEY_TAG,
TAGS_PRIVATE_INDEX,
@@ -37,6 +39,8 @@ from leap.keymanager.openpgp import (
OpenPGPScheme,
)
+logger = logging.getLogger(__name__)
+
#
# The Key Manager
@@ -52,7 +56,8 @@ class KeyManager(object):
PUBKEY_KEY = "user[public_key]"
def __init__(self, address, nickserver_uri, soledad, session_id=None,
- ca_cert_path=None, api_uri=None, api_version=None, uid=None):
+ ca_cert_path=None, api_uri=None, api_version=None, uid=None,
+ gpgbinary=None):
"""
Initialize a Key Manager for user's C{address} with provider's
nickserver reachable in C{url}.
@@ -73,6 +78,8 @@ class KeyManager(object):
:type api_version: str
:param uid: The users' UID.
:type uid: str
+ :param gpgbinary: Name for GnuPG binary executable.
+ :type gpgbinary: C{str}
"""
self._address = address
self._nickserver_uri = nickserver_uri
@@ -84,7 +91,7 @@ class KeyManager(object):
self.uid = uid
# a dict to map key types to their handlers
self._wrapper_map = {
- OpenPGPKey: OpenPGPScheme(soledad),
+ OpenPGPKey: OpenPGPScheme(soledad, gpgbinary=gpgbinary),
# other types of key will be added to this mapper.
}
# the following are used to perform https requests
@@ -166,12 +173,18 @@ class KeyManager(object):
@raise KeyNotFound: If the key was not found on nickserver.
"""
# request keys from the nickserver
- server_keys = self._get(
- self._nickserver_uri, {'address': address}).json()
- # insert keys in local database
- if self.OPENPGP_KEY in server_keys:
- self._wrapper_map[OpenPGPKey].put_ascii_key(
- server_keys['openpgp'])
+ res = None
+ try:
+ res = self._get(self._nickserver_uri, {'address': address})
+ server_keys = res.json()
+ # insert keys in local database
+ if self.OPENPGP_KEY in server_keys:
+ self._wrapper_map[OpenPGPKey].put_ascii_key(
+ server_keys['openpgp'])
+ except Exception as e:
+ logger.warning("Error retrieving the keys: %r" % (e,))
+ if res:
+ logger.warning("%s" % (res.content,))
#
# key management
@@ -334,3 +347,90 @@ class KeyManager(object):
uid = property(
_get_uid, _set_uid, doc='The uid of the user.')
+
+ #
+ # encrypt/decrypt and sign/verify API
+ #
+
+ def encrypt(self, data, pubkey, passphrase=None, sign=None):
+ """
+ Encrypt C{data} using public @{key} and sign with C{sign} key.
+
+ :param data: The data to be encrypted.
+ :type data: str
+ :param pubkey: The key used to encrypt.
+ :type pubkey: EncryptionKey
+ :param sign: The key used for signing.
+ :type sign: EncryptionKey
+
+ :return: The encrypted data.
+ :rtype: str
+ """
+ leap_assert_type(pubkey, EncryptionKey)
+ leap_assert(pubkey.__class__ in self._wrapper_map, 'Unknown key type.')
+ leap_assert(pubkey.private is False, 'Key is not public.')
+ return self._wrapper_map[pubkey.__class__].encrypt(
+ data, pubkey, passphrase, sign)
+
+ def decrypt(self, data, privkey, passphrase=None, verify=None):
+ """
+ Decrypt C{data} using private @{privkey} and verify with C{verify} key.
+
+ :param data: The data to be decrypted.
+ :type data: str
+ :param privkey: The key used to decrypt.
+ :type privkey: OpenPGPKey
+ :param verify: The key used to verify a signature.
+ :type verify: OpenPGPKey
+
+ :return: The decrypted data.
+ :rtype: str
+
+ @raise InvalidSignature: Raised if unable to verify the signature with
+ C{verify} key.
+ """
+ leap_assert_type(privkey, EncryptionKey)
+ leap_assert(
+ privkey.__class__ in self._wrapper_map,
+ 'Unknown key type.')
+ leap_assert(privkey.private is True, 'Key is not private.')
+ return self._wrapper_map[privkey.__class__].decrypt(
+ data, privkey, passphrase, verify)
+
+ def sign(self, data, privkey):
+ """
+ Sign C{data} with C{privkey}.
+
+ :param data: The data to be signed.
+ :type data: str
+
+ :param privkey: The private key to be used to sign.
+ :type privkey: EncryptionKey
+
+ :return: The signed data.
+ :rtype: str
+ """
+ leap_assert_type(privkey, EncryptionKey)
+ leap_assert(
+ privkey.__class__ in self._wrapper_map,
+ 'Unknown key type.')
+ leap_assert(privkey.private is True, 'Key is not private.')
+ return self._wrapper_map[privkey.__class__].sign(data, privkey)
+
+ def verify(self, data, pubkey):
+ """
+ Verify signed C{data} with C{pubkey}.
+
+ :param data: The data to be verified.
+ :type data: str
+
+ :param pubkey: The public key to be used on verification.
+ :type pubkey: EncryptionKey
+
+ :return: The signed data.
+ :rtype: str
+ """
+ leap_assert_type(pubkey, EncryptionKey)
+ leap_assert(pubkey.__class__ in self._wrapper_map, 'Unknown key type.')
+ leap_assert(pubkey.private is False, 'Key is not public.')
+ return self._wrapper_map[pubkey.__class__].verify(data, pubkey)
diff --git a/src/leap/keymanager/gpg.py b/src/leap/keymanager/gpg.py
index 15c1d9f..b81b218 100644
--- a/src/leap/keymanager/gpg.py
+++ b/src/leap/keymanager/gpg.py
@@ -110,9 +110,10 @@ class GPGWrapper(gnupg.GPG):
@raise: RuntimeError with explanation message if there is a problem
invoking gpg.
"""
+ # XXX: options isn't always supported, so removing for the time being
gnupg.GPG.__init__(self, gnupghome=gnupghome, gpgbinary=gpgbinary,
verbose=verbose, use_agent=use_agent,
- keyring=keyring, options=options)
+ keyring=keyring)#, options=options)
self.result_map['list-packets'] = ListPackets
def find_key_by_email(self, email, secret=False):
diff --git a/src/leap/keymanager/keys.py b/src/leap/keymanager/keys.py
index 44bd587..1c33745 100644
--- a/src/leap/keymanager/keys.py
+++ b/src/leap/keymanager/keys.py
@@ -283,3 +283,72 @@ class EncryptionScheme(object):
:type key: EncryptionKey
"""
pass
+
+ @abstractmethod
+ def encrypt(self, data, pubkey, passphrase=None, sign=None):
+ """
+ Encrypt C{data} using public @{pubkey} and sign with C{sign} key.
+
+ :param data: The data to be encrypted.
+ :type data: str
+ :param pubkey: The key used to encrypt.
+ :type pubkey: EncryptionKey
+ :param sign: The key used for signing.
+ :type sign: EncryptionKey
+
+ :return: The encrypted data.
+ :rtype: str
+ """
+ pass
+
+ @abstractmethod
+ def decrypt(self, data, privkey, passphrase=None, verify=None):
+ """
+ Decrypt C{data} using private @{privkey} and verify with C{verify} key.
+
+ :param data: The data to be decrypted.
+ :type data: str
+ :param privkey: The key used to decrypt.
+ :type privkey: OpenPGPKey
+ :param verify: The key used to verify a signature.
+ :type verify: OpenPGPKey
+
+ :return: The decrypted data.
+ :rtype: str
+
+ @raise InvalidSignature: Raised if unable to verify the signature with
+ C{verify} key.
+ """
+ pass
+
+ @abstractmethod
+ def sign(self, data, privkey):
+ """
+ Sign C{data} with C{privkey}.
+
+ :param data: The data to be signed.
+ :type data: str
+
+ :param privkey: The private key to be used to sign.
+ :type privkey: EncryptionKey
+
+ :return: The signed data.
+ :rtype: str
+ """
+ pass
+
+ @abstractmethod
+ def verify(self, data, pubkey):
+ """
+ Verify signed C{data} with C{pubkey}.
+
+ :param data: The data to be verified.
+ :type data: str
+
+ :param pubkey: The public key to be used on verification.
+ :type pubkey: EncryptionKey
+
+ :return: The signed data.
+ :rtype: str
+ """
+ pass
diff --git a/src/leap/keymanager/openpgp.py b/src/leap/keymanager/openpgp.py
index d19bb2b..aa04ed0 100644
--- a/src/leap/keymanager/openpgp.py
+++ b/src/leap/keymanager/openpgp.py
@@ -15,15 +15,19 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
"""
Infrastructure for using OpenPGP keys in Key Manager.
"""
+
+
import logging
import os
import re
import shutil
import tempfile
+
from leap.common.check import leap_assert, leap_assert_type
from leap.keymanager import errors
from leap.keymanager.keys import (
@@ -36,125 +40,8 @@ from leap.keymanager.keys import (
)
from leap.keymanager.gpg import GPGWrapper
-logger = logging.getLogger(__name__)
-
-#
-# gpg wrapper and decorator
-#
-
-def temporary_gpgwrapper(keys=None):
- """
- Returns a unitary gpg wrapper that implements context manager
- protocol.
-
- :param key_data: ASCII armored key data.
- :type key_data: str
-
- :return: a GPGWrapper instance
- :rtype: GPGWrapper
- """
- # TODO do here checks on key_data
- return TempGPGWrapper(keys=keys)
-
-
-def with_temporary_gpg(fun):
- """
- Decorator to add a temporary gpg wrapper as context
- to gpg related functions.
-
- Decorated functions are expected to return a function whose only
- argument is a gpgwrapper instance.
- """
- def wrapped(*args, **kwargs):
- """
- We extract the arguments passed to the wrapped function,
- run the function and do validations.
- We expect that the positional arguments are `data`,
- and an optional `key`.
- All the rest of arguments should be passed as named arguments
- to allow for a correct unpacking.
- """
- if len(args) == 2:
- keys = args[1] if isinstance(args[1], OpenPGPKey) else None
- else:
- keys = None
-
- # sign/verify keys passed as arguments
- sign = kwargs.get('sign', None)
- if sign:
- keys = [keys, sign]
-
- verify = kwargs.get('verify', None)
- if verify:
- keys = [keys, verify]
-
- # is the wrapped function sign or verify?
- fun_name = fun.__name__
- is_sign_function = True if fun_name == "sign" else False
- is_verify_function = True if fun_name == "verify" else False
-
- result = None
-
- with temporary_gpgwrapper(keys) as gpg:
- result = fun(*args, **kwargs)(gpg)
-
- # TODO: cleanup a little bit the
- # validation. maybe delegate to other
- # auxiliary functions for clarity.
-
- ok = getattr(result, 'ok', None)
-
- stderr = getattr(result, 'stderr', None)
- if stderr:
- logger.debug("%s" % (stderr,))
-
- if ok is False:
- raise errors.EncryptionDecryptionFailed(
- 'Failed to encrypt/decrypt in %s: %s' % (
- fun.__name__,
- stderr))
-
- if verify is not None:
- # A verify key has been passed
- if result.valid is False or \
- verify.fingerprint != result.pubkey_fingerprint:
- raise errors.InvalidSignature(
- 'Failed to verify signature with key %s: %s' %
- (verify.key_id, stderr))
-
- if is_sign_function:
- # Specific validation for sign function
- privkey = gpg.list_keys(secret=True).pop()
- rfprint = result.fingerprint
- kfprint = privkey['fingerprint']
- if result.fingerprint is None:
- raise errors.SignFailed(
- 'Failed to sign with key %s: %s' %
- (privkey['keyid'], stderr))
- leap_assert(
- result.fingerprint == kfprint,
- 'Signature and private key fingerprints mismatch: '
- '%s != %s' %
- (rfprint, kfprint))
-
- if is_verify_function:
- # Specific validation for verify function
- pubkey = gpg.list_keys().pop()
- valid = result.valid
- rfprint = result.fingerprint
- kfprint = pubkey['fingerprint']
- if valid is False or rfprint != kfprint:
- raise errors.InvalidSignature(
- 'Failed to verify signature '
- 'with key %s.' % pubkey['keyid'])
- result = result.valid
-
- # ok, enough checks. let's return data if available
- if hasattr(result, 'data'):
- result = result.data
- return result
- return wrapped
+logger = logging.getLogger(__name__)
class TempGPGWrapper(object):
@@ -166,12 +53,15 @@ class TempGPGWrapper(object):
one key. This function creates an empty temporary keyring and imports
C{keys} if it is not None.
"""
- def __init__(self, keys=None):
+ def __init__(self, keys=None, gpgbinary=None):
"""
:param keys: OpenPGP key, or list of.
:type keys: OpenPGPKey or list of OpenPGPKeys
+ :param gpgbinary: Name for GnuPG binary executable.
+ :type gpgbinary: C{str}
"""
self._gpg = None
+ self._gpgbinary = gpgbinary
if not keys:
keys = list()
if not isinstance(keys, list):
@@ -221,7 +111,9 @@ class TempGPGWrapper(object):
listkeys = lambda: self._gpg.list_keys()
listsecretkeys = lambda: self._gpg.list_keys(secret=True)
- self._gpg = GPGWrapper(gnupghome=tempfile.mkdtemp())
+ self._gpg = GPGWrapper(
+ gnupghome=tempfile.mkdtemp(),
+ gpgbinary=self._gpgbinary)
leap_assert(len(listkeys()) is 0, 'Keyring not empty.')
# import keys into the keyring:
@@ -266,144 +158,6 @@ class TempGPGWrapper(object):
shutil.rmtree(self._gpg.gnupghome)
-#
-# API functions
-#
-
-@with_temporary_gpg
-def encrypt_asym(data, key, passphrase=None, sign=None):
- """
- Encrypt C{data} using public @{key} and sign with C{sign} key.
-
- :param data: The data to be encrypted.
- :type data: str
- :param pubkey: The key used to encrypt.
- :type pubkey: OpenPGPKey
- :param sign: The key used for signing.
- :type sign: OpenPGPKey
-
- :return: The encrypted data.
- :rtype: str
- """
- leap_assert_type(key, OpenPGPKey)
- leap_assert(key.private is False, 'Key is not public.')
- if sign is not None:
- leap_assert_type(sign, OpenPGPKey)
- leap_assert(sign.private is True)
-
- # Here we cannot assert for correctness of sig because the sig is in
- # the ciphertext.
- # result.ok - (bool) indicates if the operation succeeded
- # result.data - (bool) contains the result of the operation
-
- return lambda gpg: gpg.encrypt(
- data, key.fingerprint,
- sign=sign.key_id if sign else None,
- passphrase=passphrase, symmetric=False)
-
-
-@with_temporary_gpg
-def decrypt_asym(data, key, passphrase=None, verify=None):
- """
- Decrypt C{data} using private @{key} and verify with C{verify} key.
-
- :param data: The data to be decrypted.
- :type data: str
- :param privkey: The key used to decrypt.
- :type privkey: OpenPGPKey
- :param verify: The key used to verify a signature.
- :type verify: OpenPGPKey
-
- :return: The decrypted data.
- :rtype: str
-
- @raise InvalidSignature: Raised if unable to verify the signature with
- C{verify} key.
- """
- leap_assert(key.private is True, 'Key is not private.')
- if verify is not None:
- leap_assert_type(verify, OpenPGPKey)
- leap_assert(verify.private is False)
-
- return lambda gpg: gpg.decrypt(
- data, passphrase=passphrase)
-
-
-@with_temporary_gpg
-def is_encrypted(data):
- """
- Return whether C{data} was encrypted using OpenPGP.
-
- :param data: The data we want to know about.
- :type data: str
-
- :return: Whether C{data} was encrypted using this wrapper.
- :rtype: bool
- """
- return lambda gpg: gpg.is_encrypted(data)
-
-
-@with_temporary_gpg
-def is_encrypted_asym(data):
- """
- Return whether C{data} was asymmetrically encrypted using OpenPGP.
-
- :param data: The data we want to know about.
- :type data: str
-
- :return: Whether C{data} was encrypted using this wrapper.
- :rtype: bool
- """
- return lambda gpg: gpg.is_encrypted_asym(data)
-
-
-@with_temporary_gpg
-def sign(data, privkey):
- """
- Sign C{data} with C{privkey}.
-
- :param data: The data to be signed.
- :type data: str
-
- :param privkey: The private key to be used to sign.
- :type privkey: OpenPGPKey
-
- :return: The ascii-armored signed data.
- :rtype: str
- """
- leap_assert_type(privkey, OpenPGPKey)
- leap_assert(privkey.private is True)
-
- # result.fingerprint - contains the fingerprint of the key used to
- # sign.
- return lambda gpg: gpg.sign(data, keyid=privkey.key_id)
-
-
-@with_temporary_gpg
-def verify(data, key):
- """
- Verify signed C{data} with C{pubkey}.
-
- :param data: The data to be verified.
- :type data: str
-
- :param pubkey: The public key to be used on verification.
- :type pubkey: OpenPGPKey
-
- :return: The ascii-armored signed data.
- :rtype: str
- """
- leap_assert_type(key, OpenPGPKey)
- leap_assert(key.private is False)
-
- return lambda gpg: gpg.verify(data)
-
-
-#
-# Helper functions
-#
-
-
def _build_key_from_gpg(address, key, key_data):
"""
Build an OpenPGPKey for C{address} based on C{key} from
@@ -445,17 +199,25 @@ class OpenPGPKey(EncryptionKey):
class OpenPGPScheme(EncryptionScheme):
"""
- A wrapper for OpenPGP keys.
+ A wrapper for OpenPGP keys management and use (encryption, decyption,
+ signing and verification).
"""
- def __init__(self, soledad):
+ def __init__(self, soledad, gpgbinary=None):
"""
Initialize the OpenPGP wrapper.
:param soledad: A Soledad instance for key storage.
:type soledad: leap.soledad.Soledad
+ :param gpgbinary: Name for GnuPG binary executable.
+ :type gpgbinary: C{str}
"""
EncryptionScheme.__init__(self, soledad)
+ self._gpgbinary = gpgbinary
+
+ #
+ # Keys management
+ #
def gen_key(self, address):
"""
@@ -473,18 +235,28 @@ class OpenPGPScheme(EncryptionScheme):
self.get_key(address)
raise errors.KeyAlreadyExists(address)
except errors.KeyNotFound:
- pass
+ logger.debug('Key for %s not found' % (address,))
- def _gen_key(gpg):
+ with self._temporary_gpgwrapper() as gpg:
+ # TODO: inspect result, or use decorator
params = gpg.gen_key_input(
key_type='RSA',
key_length=4096,
name_real=address,
name_email=address,
name_comment='Generated by LEAP Key Manager.')
+ logger.info("About to generate keys... This might take SOME time.")
gpg.gen_key(params)
+ logger.info("Keys for %s have been successfully "
+ "generated." % (address,))
pubkeys = gpg.list_keys()
+
# assert for new key characteristics
+
+ # XXX This exception is not properly catched by the soledad
+ # bootstrapping, so if we do not finish generating the keys
+ # we end with a blocked thread -- kali
+
leap_assert(
len(pubkeys) is 1, # a unitary keyring!
'Keyring has wrong number of keys: %d.' % len(pubkeys))
@@ -503,10 +275,6 @@ class OpenPGPScheme(EncryptionScheme):
gpg.export_keys(key['fingerprint'], secret=secret))
self.put_key(openpgp_key)
- with temporary_gpgwrapper() as gpg:
- # TODO: inspect result, or use decorator
- _gen_key(gpg)
-
return self.get_key(address, private=True)
def get_key(self, address, private=False):
@@ -539,7 +307,8 @@ class OpenPGPScheme(EncryptionScheme):
# TODO: add more checks for correct key data.
leap_assert(key_data is not None, 'Data does not represent a key.')
- def _put_ascii_key(gpg):
+ with self._temporary_gpgwrapper() as gpg:
+ # TODO: inspect result, or use decorator
gpg.import_keys(key_data)
privkey = None
pubkey = None
@@ -575,10 +344,6 @@ class OpenPGPScheme(EncryptionScheme):
gpg.export_keys(pubkey['fingerprint'], secret=False))
self.put_key(openpgp_pubkey)
- with temporary_gpgwrapper() as gpg:
- # TODO: inspect result, or use decorator
- _put_ascii_key(gpg)
-
def put_key(self, key):
"""
Put C{key} in local storage.
@@ -634,3 +399,180 @@ class OpenPGPScheme(EncryptionScheme):
raise errors.KeyAttributesDiffer(key)
doc = self._get_key_doc(key.address, key.private)
self._soledad.delete_doc(doc)
+
+ #
+ # Data encryption, decryption, signing and verifying
+ #
+
+ def _temporary_gpgwrapper(self, keys=None):
+ """
+ Returns a unitary gpg wrapper that implements context manager
+ protocol.
+
+ :param key_data: ASCII armored key data.
+ :type key_data: str
+ :param gpgbinary: Name for GnuPG binary executable.
+ :type gpgbinary: C{str}
+
+ :return: a GPGWrapper instance
+ :rtype: GPGWrapper
+ """
+ # TODO do here checks on key_data
+ return TempGPGWrapper(
+ keys=keys, gpgbinary=self._gpgbinary)
+
+ @staticmethod
+ def _assert_gpg_result_ok(result):
+ """
+ Check if GPG result is 'ok' and log stderr outputs.
+ :param result: The GPG results
+ :type result:
+ """
+ stderr = getattr(result, 'stderr', None)
+ if stderr:
+ logger.debug("%s" % (stderr,))
+ if getattr(result, 'ok', None) is not True:
+ raise errors.EncryptionDecryptionFailed(
+ 'Failed to encrypt/decrypt: %s' % stderr)
+
+ def encrypt(self, data, pubkey, passphrase=None, sign=None):
+ """
+ Encrypt C{data} using public @{pubkey} and sign with C{sign} key.
+
+ :param data: The data to be encrypted.
+ :type data: str
+ :param pubkey: The key used to encrypt.
+ :type pubkey: OpenPGPKey
+ :param sign: The key used for signing.
+ :type sign: OpenPGPKey
+
+ :return: The encrypted data.
+ :rtype: str
+ """
+ leap_assert_type(pubkey, OpenPGPKey)
+ leap_assert(pubkey.private is False, 'Key is not public.')
+ keys = [pubkey]
+ if sign is not None:
+ leap_assert_type(sign, OpenPGPKey)
+ leap_assert(sign.private is True)
+ keys.append(sign)
+ with self._temporary_gpgwrapper(keys) as gpg:
+ result = gpg.encrypt(
+ data, pubkey.fingerprint,
+ sign=sign.key_id if sign else None,
+ passphrase=passphrase, symmetric=False)
+ # Here we cannot assert for correctness of sig because the sig is
+ # in the ciphertext.
+ # result.ok - (bool) indicates if the operation succeeded
+ # result.data - (bool) contains the result of the operation
+ self._assert_gpg_result_ok(result)
+ return result.data
+
+ def decrypt(self, data, privkey, passphrase=None, verify=None):
+ """
+ Decrypt C{data} using private @{privkey} and verify with C{verify} key.
+
+ :param data: The data to be decrypted.
+ :type data: str
+ :param privkey: The key used to decrypt.
+ :type privkey: OpenPGPKey
+ :param verify: The key used to verify a signature.
+ :type verify: OpenPGPKey
+
+ :return: The decrypted data.
+ :rtype: str
+
+ @raise InvalidSignature: Raised if unable to verify the signature with
+ C{verify} key.
+ """
+ leap_assert(privkey.private is True, 'Key is not private.')
+ keys = [privkey]
+ if verify is not None:
+ leap_assert_type(verify, OpenPGPKey)
+ leap_assert(verify.private is False)
+ keys.append(verify)
+ with self._temporary_gpgwrapper(keys) as gpg:
+ result = gpg.decrypt(data, passphrase=passphrase)
+ self._assert_gpg_result_ok(result)
+ # verify signature
+ if (verify is not None):
+ if result.valid is False or \
+ verify.fingerprint != result.pubkey_fingerprint:
+ raise errors.InvalidSignature(
+ 'Failed to verify signature with key %s: %s' %
+ (verify.key_id, stderr))
+ return result.data
+
+ def is_encrypted(self, data):
+ """
+ Return whether C{data} was asymmetrically encrypted using OpenPGP.
+
+ :param data: The data we want to know about.
+ :type data: str
+
+ :return: Whether C{data} was encrypted using this wrapper.
+ :rtype: bool
+ """
+ with self._temporary_gpgwrapper() as gpg:
+ return gpg.is_encrypted_asym(data)
+
+ def sign(self, data, privkey):
+ """
+ Sign C{data} with C{privkey}.
+
+ :param data: The data to be signed.
+ :type data: str
+
+ :param privkey: The private key to be used to sign.
+ :type privkey: OpenPGPKey
+
+ :return: The ascii-armored signed data.
+ :rtype: str
+ """
+ leap_assert_type(privkey, OpenPGPKey)
+ leap_assert(privkey.private is True)
+
+ # result.fingerprint - contains the fingerprint of the key used to
+ # sign.
+ with self._temporary_gpgwrapper(privkey) as gpg:
+ result = gpg.sign(data, keyid=privkey.key_id)
+ rfprint = privkey.fingerprint
+ privkey = gpg.list_keys(secret=True).pop()
+ kfprint = privkey['fingerprint']
+ if result.fingerprint is None:
+ raise errors.SignFailed(
+ 'Failed to sign with key %s: %s' %
+ (privkey['keyid'], stderr))
+ leap_assert(
+ result.fingerprint == kfprint,
+ 'Signature and private key fingerprints mismatch: '
+ '%s != %s' % (rfprint, kfprint))
+ return result.data
+
+ def verify(self, data, pubkey):
+ """
+ Verify signed C{data} with C{pubkey}.
+
+ :param data: The data to be verified.
+ :type data: str
+
+ :param pubkey: The public key to be used on verification.
+ :type pubkey: OpenPGPKey
+
+ :return: The ascii-armored signed data.
+ :rtype: str
+ """
+ leap_assert_type(pubkey, OpenPGPKey)
+ leap_assert(pubkey.private is False)
+ with self._temporary_gpgwrapper(pubkey) as gpg:
+ result = gpg.verify(data)
+ gpgpubkey = gpg.list_keys().pop()
+ valid = result.valid
+ rfprint = result.fingerprint
+ kfprint = gpgpubkey['fingerprint']
+ # raise in case sig is invalid
+ if valid is False or rfprint != kfprint:
+ raise errors.InvalidSignature(
+ 'Failed to verify signature '
+ 'with key %s.' % gpgpubkey['keyid'])
+ return True
diff --git a/src/leap/keymanager/tests/test_keymanager.py b/src/leap/keymanager/tests/test_keymanager.py
index 09a6183..a474e32 100644
--- a/src/leap/keymanager/tests/test_keymanager.py
+++ b/src/leap/keymanager/tests/test_keymanager.py
@@ -40,6 +40,7 @@ from leap.keymanager.keys import (
ADDRESS = 'leap@leap.se'
ADDRESS_2 = 'anotheruser@leap.se'
+GPG_BINARY_PATH = '/usr/bin/gpg'
class KeyManagerUtilTestCase(BaseLeapTest):
@@ -136,7 +137,8 @@ class KeyManagerWithSoledadTestCase(BaseLeapTest):
km._wrapper_map[key.__class__].delete_key(key)
def _key_manager(self, user=ADDRESS, url=''):
- return KeyManager(user, url, self._soledad)
+ return KeyManager(user, url, self._soledad,
+ gpgbinary=GPG_BINARY_PATH)
class OpenPGPCryptoTestCase(KeyManagerWithSoledadTestCase):
@@ -152,7 +154,8 @@ class OpenPGPCryptoTestCase(KeyManagerWithSoledadTestCase):
'4096', key.length, 'Wrong key length.')
def test_openpgp_put_delete_key(self):
- pgp = openpgp.OpenPGPScheme(self._soledad)
+ pgp = openpgp.OpenPGPScheme(
+ self._soledad, gpgbinary=GPG_BINARY_PATH)
self.assertRaises(KeyNotFound, pgp.get_key, ADDRESS)
pgp.put_ascii_key(PUBLIC_KEY)
key = pgp.get_key(ADDRESS, private=False)
@@ -160,7 +163,8 @@ class OpenPGPCryptoTestCase(KeyManagerWithSoledadTestCase):
self.assertRaises(KeyNotFound, pgp.get_key, ADDRESS)
def test_openpgp_put_ascii_key(self):
- pgp = openpgp.OpenPGPScheme(self._soledad)
+ pgp = openpgp.OpenPGPScheme(
+ self._soledad, gpgbinary=GPG_BINARY_PATH)
self.assertRaises(KeyNotFound, pgp.get_key, ADDRESS)
pgp.put_ascii_key(PUBLIC_KEY)
key = pgp.get_key(ADDRESS, private=False)
@@ -173,7 +177,8 @@ class OpenPGPCryptoTestCase(KeyManagerWithSoledadTestCase):
self.assertRaises(KeyNotFound, pgp.get_key, ADDRESS)
def test_get_public_key(self):
- pgp = openpgp.OpenPGPScheme(self._soledad)
+ pgp = openpgp.OpenPGPScheme(
+ self._soledad, gpgbinary=GPG_BINARY_PATH)
self.assertRaises(KeyNotFound, pgp.get_key, ADDRESS)
pgp.put_ascii_key(PUBLIC_KEY)
self.assertRaises(
@@ -185,24 +190,25 @@ class OpenPGPCryptoTestCase(KeyManagerWithSoledadTestCase):
pgp.delete_key(key)
self.assertRaises(KeyNotFound, pgp.get_key, ADDRESS)
- def test_openpgp_encrypt_decrypt_asym(self):
+ def test_openpgp_encrypt_decrypt(self):
# encrypt
- pgp = openpgp.OpenPGPScheme(self._soledad)
+ pgp = openpgp.OpenPGPScheme(
+ self._soledad, gpgbinary=GPG_BINARY_PATH)
pgp.put_ascii_key(PUBLIC_KEY)
pubkey = pgp.get_key(ADDRESS, private=False)
- cyphertext = openpgp.encrypt_asym('data', pubkey)
+ cyphertext = pgp.encrypt('data', pubkey)
# assert
self.assertTrue(cyphertext is not None)
self.assertTrue(cyphertext != '')
self.assertTrue(cyphertext != 'data')
- self.assertTrue(openpgp.is_encrypted_asym(cyphertext))
- self.assertTrue(openpgp.is_encrypted(cyphertext))
+ self.assertTrue(pgp.is_encrypted(cyphertext))
+ self.assertTrue(pgp.is_encrypted(cyphertext))
# decrypt
self.assertRaises(
KeyNotFound, pgp.get_key, ADDRESS, private=True)
pgp.put_ascii_key(PRIVATE_KEY)
privkey = pgp.get_key(ADDRESS, private=True)
- plaintext = openpgp.decrypt_asym(cyphertext, privkey)
+ plaintext = pgp.decrypt(cyphertext, privkey)
pgp.delete_key(pubkey)
pgp.delete_key(privkey)
self.assertRaises(
@@ -211,83 +217,91 @@ class OpenPGPCryptoTestCase(KeyManagerWithSoledadTestCase):
KeyNotFound, pgp.get_key, ADDRESS, private=True)
def test_verify_with_private_raises(self):
- pgp = openpgp.OpenPGPScheme(self._soledad)
+ pgp = openpgp.OpenPGPScheme(
+ self._soledad, gpgbinary=GPG_BINARY_PATH)
pgp.put_ascii_key(PRIVATE_KEY)
data = 'data'
privkey = pgp.get_key(ADDRESS, private=True)
- signed = openpgp.sign(data, privkey)
+ signed = pgp.sign(data, privkey)
self.assertRaises(
AssertionError,
- openpgp.verify, signed, privkey)
+ pgp.verify, signed, privkey)
def test_sign_with_public_raises(self):
- pgp = openpgp.OpenPGPScheme(self._soledad)
+ pgp = openpgp.OpenPGPScheme(
+ self._soledad, gpgbinary=GPG_BINARY_PATH)
pgp.put_ascii_key(PUBLIC_KEY)
data = 'data'
pubkey = pgp.get_key(ADDRESS, private=False)
self.assertRaises(
AssertionError,
- openpgp.sign, data, pubkey)
+ pgp.sign, data, pubkey)
def test_verify_with_wrong_key_raises(self):
- pgp = openpgp.OpenPGPScheme(self._soledad)
+ pgp = openpgp.OpenPGPScheme(
+ self._soledad, gpgbinary=GPG_BINARY_PATH)
pgp.put_ascii_key(PRIVATE_KEY)
data = 'data'
privkey = pgp.get_key(ADDRESS, private=True)
- signed = openpgp.sign(data, privkey)
+ signed = pgp.sign(data, privkey)
pgp.put_ascii_key(PUBLIC_KEY_2)
wrongkey = pgp.get_key(ADDRESS_2)
self.assertRaises(
errors.InvalidSignature,
- openpgp.verify, signed, wrongkey)
+ pgp.verify, signed, wrongkey)
- def test_encrypt_asym_sign_with_public_raises(self):
- pgp = openpgp.OpenPGPScheme(self._soledad)
+ def test_encrypt_sign_with_public_raises(self):
+ pgp = openpgp.OpenPGPScheme(
+ self._soledad, gpgbinary=GPG_BINARY_PATH)
pgp.put_ascii_key(PRIVATE_KEY)
data = 'data'
privkey = pgp.get_key(ADDRESS, private=True)
pubkey = pgp.get_key(ADDRESS, private=False)
self.assertRaises(
AssertionError,
- openpgp.encrypt_asym, data, privkey, sign=pubkey)
+ pgp.encrypt, data, privkey, sign=pubkey)
- def test_decrypt_asym_verify_with_private_raises(self):
- pgp = openpgp.OpenPGPScheme(self._soledad)
+ def test_decrypt_verify_with_private_raises(self):
+ pgp = openpgp.OpenPGPScheme(
+ self._soledad, gpgbinary=GPG_BINARY_PATH)
pgp.put_ascii_key(PRIVATE_KEY)
data = 'data'
privkey = pgp.get_key(ADDRESS, private=True)
pubkey = pgp.get_key(ADDRESS, private=False)
- encrypted_and_signed = openpgp.encrypt_asym(
+ encrypted_and_signed = pgp.encrypt(
data, pubkey, sign=privkey)
self.assertRaises(
AssertionError,
- openpgp.decrypt_asym,
+ pgp.decrypt,
encrypted_and_signed, privkey, verify=privkey)
- def test_decrypt_asym_verify_with_wrong_key_raises(self):
- pgp = openpgp.OpenPGPScheme(self._soledad)
+ def test_decrypt_verify_with_wrong_key_raises(self):
+ pgp = openpgp.OpenPGPScheme(
+ self._soledad, gpgbinary=GPG_BINARY_PATH)
pgp.put_ascii_key(PRIVATE_KEY)
data = 'data'
privkey = pgp.get_key(ADDRESS, private=True)
pubkey = pgp.get_key(ADDRESS, private=False)
- encrypted_and_signed = openpgp.encrypt_asym(data, pubkey, sign=privkey)
+ encrypted_and_signed = pgp.encrypt(data, pubkey, sign=privkey)
pgp.put_ascii_key(PUBLIC_KEY_2)
wrongkey = pgp.get_key(ADDRESS_2)
self.assertRaises(
errors.InvalidSignature,
- openpgp.verify, encrypted_and_signed, wrongkey)
+ pgp.verify, encrypted_and_signed, wrongkey)
def test_sign_verify(self):
- pgp = openpgp.OpenPGPScheme(self._soledad)
+ pgp = openpgp.OpenPGPScheme(
+ self._soledad, gpgbinary=GPG_BINARY_PATH)
pgp.put_ascii_key(PRIVATE_KEY)
data = 'data'
privkey = pgp.get_key(ADDRESS, private=True)
- signed = openpgp.sign(data, privkey)
+ signed = pgp.sign(data, privkey)
pubkey = pgp.get_key(ADDRESS, private=False)
- self.assertTrue(openpgp.verify(signed, pubkey))
+ self.assertTrue(pgp.verify(signed, pubkey))
- def test_encrypt_asym_sign_decrypt_verify(self):
- pgp = openpgp.OpenPGPScheme(self._soledad)
+ def test_encrypt_sign_decrypt_verify(self):
+ pgp = openpgp.OpenPGPScheme(
+ self._soledad, gpgbinary=GPG_BINARY_PATH)
pgp.put_ascii_key(PRIVATE_KEY)
pubkey = pgp.get_key(ADDRESS, private=False)
privkey = pgp.get_key(ADDRESS, private=True)
@@ -295,9 +309,9 @@ class OpenPGPCryptoTestCase(KeyManagerWithSoledadTestCase):
pubkey2 = pgp.get_key(ADDRESS_2, private=False)
privkey2 = pgp.get_key(ADDRESS_2, private=True)
data = 'data'
- encrypted_and_signed = openpgp.encrypt_asym(
+ encrypted_and_signed = pgp.encrypt(
data, pubkey2, sign=privkey)
- res = openpgp.decrypt_asym(
+ res = pgp.decrypt(
encrypted_and_signed, privkey2, verify=pubkey)
self.assertTrue(data, res)
@@ -448,6 +462,44 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase):
self.assertEqual(ADDRESS_2, key.address)
+class KeyManagerCryptoTestCase(KeyManagerWithSoledadTestCase):
+
+ RAW_DATA = 'data'
+
+ def test_keymanager_openpgp_encrypt_decrypt(self):
+ km = self._key_manager()
+ # put raw private key
+ km._wrapper_map[OpenPGPKey].put_ascii_key(PRIVATE_KEY)
+ # get public key
+ pubkey = km.get_key(
+ ADDRESS, OpenPGPKey, private=False, fetch_remote=False)
+ # encrypt
+ encdata = km.encrypt(self.RAW_DATA, pubkey)
+ self.assertNotEqual(self.RAW_DATA, encdata)
+ # get private key
+ privkey = km.get_key(
+ ADDRESS, OpenPGPKey, private=True, fetch_remote=False)
+ # decrypt
+ rawdata = km.decrypt(encdata, privkey)
+ self.assertEqual(self.RAW_DATA, rawdata)
+
+ def test_keymanager_openpgp_sign_verify(self):
+ km = self._key_manager()
+ # put raw private keys
+ km._wrapper_map[OpenPGPKey].put_ascii_key(PRIVATE_KEY)
+ # get private key for signing
+ privkey = km.get_key(
+ ADDRESS, OpenPGPKey, private=True, fetch_remote=False)
+ # encrypt
+ signdata = km.sign(self.RAW_DATA, privkey)
+ self.assertNotEqual(self.RAW_DATA, signdata)
+ # get public key for verifying
+ pubkey = km.get_key(
+ ADDRESS, OpenPGPKey, private=False, fetch_remote=False)
+ # decrypt
+ self.assertTrue(km.verify(signdata, pubkey))
+
+
# Key material for testing
# key 24D18DDF: public key "Leap Test Key <leap@leap.se>"