summaryrefslogtreecommitdiff
path: root/src/leap/keymanager/openpgp.py
diff options
context:
space:
mode:
authordrebs <drebs@leap.se>2013-08-07 16:29:54 +0200
committerdrebs <drebs@leap.se>2013-08-08 15:44:43 +0200
commit5cb58ca086e88951b0313d7d4dc48a9f4c0c85b5 (patch)
treedd03b3e26081be9eb5da513986a7b944b26acdba /src/leap/keymanager/openpgp.py
parent8c1200d745185cdee1d17b127797f8da2da29c80 (diff)
Support bundled GPG and change API.
- Move openpgp encrypt/decrypt/sign/verify API to inside OpenPGP class. - Add encrypt/decrypt/sign/verify API to KeyManager. - Add possibility of passing custom gpg binary to KeyManager and OpenPGPScheme. - Remove "_asym" suffix from method names. - Bump version to 0.2.1. New API is *not* backwards compatible.
Diffstat (limited to 'src/leap/keymanager/openpgp.py')
-rw-r--r--src/leap/keymanager/openpgp.py475
1 files changed, 205 insertions, 270 deletions
diff --git a/src/leap/keymanager/openpgp.py b/src/leap/keymanager/openpgp.py
index 9d404cb..1e78c70 100644
--- a/src/leap/keymanager/openpgp.py
+++ b/src/leap/keymanager/openpgp.py
@@ -15,15 +15,19 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
"""
Infrastructure for using OpenPGP keys in Key Manager.
"""
+
+
import logging
import os
import re
import shutil
import tempfile
+
from leap.common.check import leap_assert, leap_assert_type
from leap.keymanager import errors
from leap.keymanager.keys import (
@@ -36,125 +40,8 @@ from leap.keymanager.keys import (
)
from leap.keymanager.gpg import GPGWrapper
-logger = logging.getLogger(__name__)
-
-
-#
-# gpg wrapper and decorator
-#
-
-def temporary_gpgwrapper(keys=None):
- """
- Returns a unitary gpg wrapper that implements context manager
- protocol.
-
- :param key_data: ASCII armored key data.
- :type key_data: str
-
- :return: a GPGWrapper instance
- :rtype: GPGWrapper
- """
- # TODO do here checks on key_data
- return TempGPGWrapper(keys=keys)
-
-
-def with_temporary_gpg(fun):
- """
- Decorator to add a temporary gpg wrapper as context
- to gpg related functions.
-
- Decorated functions are expected to return a function whose only
- argument is a gpgwrapper instance.
- """
- def wrapped(*args, **kwargs):
- """
- We extract the arguments passed to the wrapped function,
- run the function and do validations.
- We expect that the positional arguments are `data`,
- and an optional `key`.
- All the rest of arguments should be passed as named arguments
- to allow for a correct unpacking.
- """
- if len(args) == 2:
- keys = args[1] if isinstance(args[1], OpenPGPKey) else None
- else:
- keys = None
-
- # sign/verify keys passed as arguments
- sign = kwargs.get('sign', None)
- if sign:
- keys = [keys, sign]
-
- verify = kwargs.get('verify', None)
- if verify:
- keys = [keys, verify]
-
- # is the wrapped function sign or verify?
- fun_name = fun.__name__
- is_sign_function = True if fun_name == "sign" else False
- is_verify_function = True if fun_name == "verify" else False
-
- result = None
-
- with temporary_gpgwrapper(keys) as gpg:
- result = fun(*args, **kwargs)(gpg)
-
- # TODO: cleanup a little bit the
- # validation. maybe delegate to other
- # auxiliary functions for clarity.
-
- ok = getattr(result, 'ok', None)
-
- stderr = getattr(result, 'stderr', None)
- if stderr:
- logger.debug("%s" % (stderr,))
-
- if ok is False:
- raise errors.EncryptionDecryptionFailed(
- 'Failed to encrypt/decrypt in %s: %s' % (
- fun.__name__,
- stderr))
-
- if verify is not None:
- # A verify key has been passed
- if result.valid is False or \
- verify.fingerprint != result.pubkey_fingerprint:
- raise errors.InvalidSignature(
- 'Failed to verify signature with key %s: %s' %
- (verify.key_id, stderr))
-
- if is_sign_function:
- # Specific validation for sign function
- privkey = gpg.list_keys(secret=True).pop()
- rfprint = result.fingerprint
- kfprint = privkey['fingerprint']
- if result.fingerprint is None:
- raise errors.SignFailed(
- 'Failed to sign with key %s: %s' %
- (privkey['keyid'], stderr))
- leap_assert(
- result.fingerprint == kfprint,
- 'Signature and private key fingerprints mismatch: '
- '%s != %s' %
- (rfprint, kfprint))
-
- if is_verify_function:
- # Specific validation for verify function
- pubkey = gpg.list_keys().pop()
- valid = result.valid
- rfprint = result.fingerprint
- kfprint = pubkey['fingerprint']
- if valid is False or rfprint != kfprint:
- raise errors.InvalidSignature(
- 'Failed to verify signature '
- 'with key %s.' % pubkey['keyid'])
- result = result.valid
- # ok, enough checks. let's return data if available
- if hasattr(result, 'data'):
- result = result.data
- return result
- return wrapped
+logger = logging.getLogger(__name__)
class TempGPGWrapper(object):
@@ -166,12 +53,15 @@ class TempGPGWrapper(object):
one key. This function creates an empty temporary keyring and imports
C{keys} if it is not None.
"""
- def __init__(self, keys=None):
+ def __init__(self, keys=None, gpgbinary=None):
"""
:param keys: OpenPGP key, or list of.
:type keys: OpenPGPKey or list of OpenPGPKeys
+ :param gpgbinary: Name for GnuPG binary executable.
+ :type gpgbinary: C{str}
"""
self._gpg = None
+ self._gpgbinary = gpgbinary
if not keys:
keys = list()
if not isinstance(keys, list):
@@ -221,7 +111,9 @@ class TempGPGWrapper(object):
listkeys = lambda: self._gpg.list_keys()
listsecretkeys = lambda: self._gpg.list_keys(secret=True)
- self._gpg = GPGWrapper(gnupghome=tempfile.mkdtemp())
+ self._gpg = GPGWrapper(
+ gnupghome=tempfile.mkdtemp(),
+ gpgbinary=self._gpgbinary)
leap_assert(len(listkeys()) is 0, 'Keyring not empty.')
# import keys into the keyring:
@@ -266,144 +158,6 @@ class TempGPGWrapper(object):
shutil.rmtree(self._gpg.gnupghome)
-#
-# API functions
-#
-
-@with_temporary_gpg
-def encrypt_asym(data, key, passphrase=None, sign=None):
- """
- Encrypt C{data} using public @{key} and sign with C{sign} key.
-
- :param data: The data to be encrypted.
- :type data: str
- :param pubkey: The key used to encrypt.
- :type pubkey: OpenPGPKey
- :param sign: The key used for signing.
- :type sign: OpenPGPKey
-
- :return: The encrypted data.
- :rtype: str
- """
- leap_assert_type(key, OpenPGPKey)
- leap_assert(key.private is False, 'Key is not public.')
- if sign is not None:
- leap_assert_type(sign, OpenPGPKey)
- leap_assert(sign.private is True)
-
- # Here we cannot assert for correctness of sig because the sig is in
- # the ciphertext.
- # result.ok - (bool) indicates if the operation succeeded
- # result.data - (bool) contains the result of the operation
-
- return lambda gpg: gpg.encrypt(
- data, key.fingerprint,
- sign=sign.key_id if sign else None,
- passphrase=passphrase, symmetric=False)
-
-
-@with_temporary_gpg
-def decrypt_asym(data, key, passphrase=None, verify=None):
- """
- Decrypt C{data} using private @{key} and verify with C{verify} key.
-
- :param data: The data to be decrypted.
- :type data: str
- :param privkey: The key used to decrypt.
- :type privkey: OpenPGPKey
- :param verify: The key used to verify a signature.
- :type verify: OpenPGPKey
-
- :return: The decrypted data.
- :rtype: str
-
- @raise InvalidSignature: Raised if unable to verify the signature with
- C{verify} key.
- """
- leap_assert(key.private is True, 'Key is not private.')
- if verify is not None:
- leap_assert_type(verify, OpenPGPKey)
- leap_assert(verify.private is False)
-
- return lambda gpg: gpg.decrypt(
- data, passphrase=passphrase)
-
-
-@with_temporary_gpg
-def is_encrypted(data):
- """
- Return whether C{data} was encrypted using OpenPGP.
-
- :param data: The data we want to know about.
- :type data: str
-
- :return: Whether C{data} was encrypted using this wrapper.
- :rtype: bool
- """
- return lambda gpg: gpg.is_encrypted(data)
-
-
-@with_temporary_gpg
-def is_encrypted_asym(data):
- """
- Return whether C{data} was asymmetrically encrypted using OpenPGP.
-
- :param data: The data we want to know about.
- :type data: str
-
- :return: Whether C{data} was encrypted using this wrapper.
- :rtype: bool
- """
- return lambda gpg: gpg.is_encrypted_asym(data)
-
-
-@with_temporary_gpg
-def sign(data, privkey):
- """
- Sign C{data} with C{privkey}.
-
- :param data: The data to be signed.
- :type data: str
-
- :param privkey: The private key to be used to sign.
- :type privkey: OpenPGPKey
-
- :return: The ascii-armored signed data.
- :rtype: str
- """
- leap_assert_type(privkey, OpenPGPKey)
- leap_assert(privkey.private is True)
-
- # result.fingerprint - contains the fingerprint of the key used to
- # sign.
- return lambda gpg: gpg.sign(data, keyid=privkey.key_id)
-
-
-@with_temporary_gpg
-def verify(data, key):
- """
- Verify signed C{data} with C{pubkey}.
-
- :param data: The data to be verified.
- :type data: str
-
- :param pubkey: The public key to be used on verification.
- :type pubkey: OpenPGPKey
-
- :return: The ascii-armored signed data.
- :rtype: str
- """
- leap_assert_type(key, OpenPGPKey)
- leap_assert(key.private is False)
-
- return lambda gpg: gpg.verify(data)
-
-
-#
-# Helper functions
-#
-
-
def _build_key_from_gpg(address, key, key_data):
"""
Build an OpenPGPKey for C{address} based on C{key} from
@@ -445,17 +199,25 @@ class OpenPGPKey(EncryptionKey):
class OpenPGPScheme(EncryptionScheme):
"""
- A wrapper for OpenPGP keys.
+ A wrapper for OpenPGP keys management and use (encryption, decyption,
+ signing and verification).
"""
- def __init__(self, soledad):
+ def __init__(self, soledad, gpgbinary=None):
"""
Initialize the OpenPGP wrapper.
:param soledad: A Soledad instance for key storage.
:type soledad: leap.soledad.Soledad
+ :param gpgbinary: Name for GnuPG binary executable.
+ :type gpgbinary: C{str}
"""
EncryptionScheme.__init__(self, soledad)
+ self._gpgbinary = gpgbinary
+
+ #
+ # Keys management
+ #
def gen_key(self, address):
"""
@@ -475,7 +237,8 @@ class OpenPGPScheme(EncryptionScheme):
except errors.KeyNotFound:
logger.debug('Key for %s not found' % (address,))
- def _gen_key(gpg):
+ with self._temporary_gpgwrapper() as gpg:
+ # TODO: inspect result, or use decorator
params = gpg.gen_key_input(
key_type='RSA',
key_length=4096,
@@ -512,10 +275,6 @@ class OpenPGPScheme(EncryptionScheme):
gpg.export_keys(key['fingerprint'], secret=secret))
self.put_key(openpgp_key)
- with temporary_gpgwrapper() as gpg:
- # TODO: inspect result, or use decorator
- _gen_key(gpg)
-
return self.get_key(address, private=True)
def get_key(self, address, private=False):
@@ -548,7 +307,8 @@ class OpenPGPScheme(EncryptionScheme):
# TODO: add more checks for correct key data.
leap_assert(key_data is not None, 'Data does not represent a key.')
- def _put_ascii_key(gpg):
+ with self._temporary_gpgwrapper() as gpg:
+ # TODO: inspect result, or use decorator
gpg.import_keys(key_data)
privkey = None
pubkey = None
@@ -584,10 +344,6 @@ class OpenPGPScheme(EncryptionScheme):
gpg.export_keys(pubkey['fingerprint'], secret=False))
self.put_key(openpgp_pubkey)
- with temporary_gpgwrapper() as gpg:
- # TODO: inspect result, or use decorator
- _put_ascii_key(gpg)
-
def put_key(self, key):
"""
Put C{key} in local storage.
@@ -643,3 +399,182 @@ class OpenPGPScheme(EncryptionScheme):
raise errors.KeyAttributesDiffer(key)
doc = self._get_key_doc(key.address, key.private)
self._soledad.delete_doc(doc)
+
+ #
+ # Data encryption, decryption, signing and verifying
+ #
+
+ def _temporary_gpgwrapper(self, keys=None):
+ """
+ Returns a unitary gpg wrapper that implements context manager
+ protocol.
+
+ :param key_data: ASCII armored key data.
+ :type key_data: str
+ :param gpgbinary: Name for GnuPG binary executable.
+ :type gpgbinary: C{str}
+
+ :return: a GPGWrapper instance
+ :rtype: GPGWrapper
+ """
+ # TODO do here checks on key_data
+ return TempGPGWrapper(
+ keys=keys, gpgbinary=self._gpgbinary)
+
+ @staticmethod
+ def _assert_gpg_result_ok(result):
+ """
+ Check if GPG result is 'ok' and log stderr outputs.
+ :param result: The GPG results
+ :type result:
+ """
+ stderr = getattr(result, 'stderr', None)
+ if stderr:
+ logger.debug("%s" % (stderr,))
+ if getattr(result, 'ok', None) is not True:
+ raise errors.EncryptionDecryptionFailed(
+ 'Failed to encrypt/decrypt in %s: %s' % (
+ this.encrypt,
+ stderr))
+
+ def encrypt(self, data, pubkey, passphrase=None, sign=None):
+ """
+ Encrypt C{data} using public @{pubkey} and sign with C{sign} key.
+
+ :param data: The data to be encrypted.
+ :type data: str
+ :param pubkey: The key used to encrypt.
+ :type pubkey: OpenPGPKey
+ :param sign: The key used for signing.
+ :type sign: OpenPGPKey
+
+ :return: The encrypted data.
+ :rtype: str
+ """
+ leap_assert_type(pubkey, OpenPGPKey)
+ leap_assert(pubkey.private is False, 'Key is not public.')
+ keys = [pubkey]
+ if sign is not None:
+ leap_assert_type(sign, OpenPGPKey)
+ leap_assert(sign.private is True)
+ keys.append(sign)
+ with self._temporary_gpgwrapper(keys) as gpg:
+ result = gpg.encrypt(
+ data, pubkey.fingerprint,
+ sign=sign.key_id if sign else None,
+ passphrase=passphrase, symmetric=False)
+ # Here we cannot assert for correctness of sig because the sig is
+ # in the ciphertext.
+ # result.ok - (bool) indicates if the operation succeeded
+ # result.data - (bool) contains the result of the operation
+ self._assert_gpg_result_ok(result)
+ return result.data
+
+ def decrypt(self, data, privkey, passphrase=None, verify=None):
+ """
+ Decrypt C{data} using private @{privkey} and verify with C{verify} key.
+
+ :param data: The data to be decrypted.
+ :type data: str
+ :param privkey: The key used to decrypt.
+ :type privkey: OpenPGPKey
+ :param verify: The key used to verify a signature.
+ :type verify: OpenPGPKey
+
+ :return: The decrypted data.
+ :rtype: str
+
+ @raise InvalidSignature: Raised if unable to verify the signature with
+ C{verify} key.
+ """
+ leap_assert(privkey.private is True, 'Key is not private.')
+ keys = [privkey]
+ if verify is not None:
+ leap_assert_type(verify, OpenPGPKey)
+ leap_assert(verify.private is False)
+ keys.append(verify)
+ with self._temporary_gpgwrapper(keys) as gpg:
+ result = gpg.decrypt(data, passphrase=passphrase)
+ self._assert_gpg_result_ok(result)
+ # verify signature
+ if (verify is not None):
+ if result.valid is False or \
+ verify.fingerprint != result.pubkey_fingerprint:
+ raise errors.InvalidSignature(
+ 'Failed to verify signature with key %s: %s' %
+ (verify.key_id, stderr))
+ return result.data
+
+ def is_encrypted(self, data):
+ """
+ Return whether C{data} was asymmetrically encrypted using OpenPGP.
+
+ :param data: The data we want to know about.
+ :type data: str
+
+ :return: Whether C{data} was encrypted using this wrapper.
+ :rtype: bool
+ """
+ with self._temporary_gpgwrapper() as gpg:
+ return gpg.is_encrypted_asym(data)
+
+ def sign(self, data, privkey):
+ """
+ Sign C{data} with C{privkey}.
+
+ :param data: The data to be signed.
+ :type data: str
+
+ :param privkey: The private key to be used to sign.
+ :type privkey: OpenPGPKey
+
+ :return: The ascii-armored signed data.
+ :rtype: str
+ """
+ leap_assert_type(privkey, OpenPGPKey)
+ leap_assert(privkey.private is True)
+
+ # result.fingerprint - contains the fingerprint of the key used to
+ # sign.
+ with self._temporary_gpgwrapper(privkey) as gpg:
+ result = gpg.sign(data, keyid=privkey.key_id)
+ rfprint = privkey.fingerprint
+ privkey = gpg.list_keys(secret=True).pop()
+ kfprint = privkey['fingerprint']
+ if result.fingerprint is None:
+ raise errors.SignFailed(
+ 'Failed to sign with key %s: %s' %
+ (privkey['keyid'], stderr))
+ leap_assert(
+ result.fingerprint == kfprint,
+ 'Signature and private key fingerprints mismatch: '
+ '%s != %s' % (rfprint, kfprint))
+ return result.data
+
+ def verify(self, data, pubkey):
+ """
+ Verify signed C{data} with C{pubkey}.
+
+ :param data: The data to be verified.
+ :type data: str
+
+ :param pubkey: The public key to be used on verification.
+ :type pubkey: OpenPGPKey
+
+ :return: The ascii-armored signed data.
+ :rtype: str
+ """
+ leap_assert_type(pubkey, OpenPGPKey)
+ leap_assert(pubkey.private is False)
+ with self._temporary_gpgwrapper(pubkey) as gpg:
+ result = gpg.verify(data)
+ gpgpubkey = gpg.list_keys().pop()
+ valid = result.valid
+ rfprint = result.fingerprint
+ kfprint = gpgpubkey['fingerprint']
+ # raise in case sig is invalid
+ if valid is False or rfprint != kfprint:
+ raise errors.InvalidSignature(
+ 'Failed to verify signature '
+ 'with key %s.' % gpgpubkey['keyid'])
+ return True