From b1024172ed1ab137a2bd6efb687397fe9f56644a Mon Sep 17 00:00:00 2001 From: drebs Date: Mon, 30 Sep 2013 15:55:01 -0300 Subject: Update to use gnupg 1.2.2 module. --- changes/feature_2342-use-new-gnupg-module | 1 + pkg/requirements.pip | 2 +- src/leap/keymanager/gpg.py | 398 --------------------------- src/leap/keymanager/openpgp.py | 65 ++--- src/leap/keymanager/tests/test_keymanager.py | 1 - 5 files changed, 35 insertions(+), 432 deletions(-) create mode 100644 changes/feature_2342-use-new-gnupg-module delete mode 100644 src/leap/keymanager/gpg.py diff --git a/changes/feature_2342-use-new-gnupg-module b/changes/feature_2342-use-new-gnupg-module new file mode 100644 index 00000000..b83e8ec3 --- /dev/null +++ b/changes/feature_2342-use-new-gnupg-module @@ -0,0 +1 @@ + o Update code to use gnupg 1.2.2 python module. Closes #2342. diff --git a/pkg/requirements.pip b/pkg/requirements.pip index 9e16e95a..5ebd8038 100644 --- a/pkg/requirements.pip +++ b/pkg/requirements.pip @@ -1,4 +1,4 @@ leap.common>=0.3.0 simplejson requests -python-gnupg +gnupg diff --git a/src/leap/keymanager/gpg.py b/src/leap/keymanager/gpg.py deleted file mode 100644 index b81b218f..00000000 --- a/src/leap/keymanager/gpg.py +++ /dev/null @@ -1,398 +0,0 @@ -# -*- coding: utf-8 -*- -# gpgwrapper.py -# Copyright (C) 2013 LEAP -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see . - - -""" -A GPG wrapper used to handle OpenPGP keys. - -This is a temporary class that will be superseded by the a revised version of -python-gnupg. -""" - - -import os -import gnupg -import re -from gnupg import ( - logger, - _is_sequence, - _make_binary_stream, -) - - -class ListPackets(): - """ - Handle status messages for --list-packets. - """ - - def __init__(self, gpg): - """ - Initialize the packet listing handling class. - - :param gpg: GPG object instance. - :type gpg: gnupg.GPG - """ - self.gpg = gpg - self.nodata = None - self.key = None - self.need_passphrase = None - self.need_passphrase_sym = None - self.userid_hint = None - - def handle_status(self, key, value): - """ - Handle one line of the --list-packets status message. - - :param key: The status message key. - :type key: str - :param value: The status message value. - :type value: str - """ - # TODO: write tests for handle_status - if key == 'NODATA': - self.nodata = True - if key == 'ENC_TO': - # This will only capture keys in our keyring. In the future we - # may want to include multiple unknown keys in this list. - self.key, _, _ = value.split() - if key == 'NEED_PASSPHRASE': - self.need_passphrase = True - if key == 'NEED_PASSPHRASE_SYM': - self.need_passphrase_sym = True - if key == 'USERID_HINT': - self.userid_hint = value.strip().split() - - -class GPGWrapper(gnupg.GPG): - """ - This is a temporary class for handling GPG requests, and should be - replaced by a more general class used throughout the project. - """ - - GNUPG_HOME = os.environ['HOME'] + "/.config/leap/gnupg" - GNUPG_BINARY = "/usr/bin/gpg" # this has to be changed based on OS - - def __init__(self, gpgbinary=GNUPG_BINARY, gnupghome=GNUPG_HOME, - verbose=False, use_agent=False, keyring=None, options=None): - """ - Initialize a GnuPG process wrapper. - - :param gpgbinary: Name for GnuPG binary executable. - :type gpgbinary: C{str} - :param gpghome: Full pathname to directory containing the public and - private keyrings. - :type gpghome: C{str} - :param keyring: Name of alternative keyring file to use. If specified, - the default keyring is not used. - :param verbose: Should some verbose info be output? - :type verbose: bool - :param use_agent: Should pass `--use-agent` to GPG binary? - :type use_agent: bool - :param keyring: Path for the keyring to use. - :type keyring: str - @options: A list of additional options to pass to the GPG binary. - :type options: list - - @raise: RuntimeError with explanation message if there is a problem - invoking gpg. - """ - # XXX: options isn't always supported, so removing for the time being - gnupg.GPG.__init__(self, gnupghome=gnupghome, gpgbinary=gpgbinary, - verbose=verbose, use_agent=use_agent, - keyring=keyring)#, options=options) - self.result_map['list-packets'] = ListPackets - - def find_key_by_email(self, email, secret=False): - """ - Find user's key based on their email. - - :param email: Email address of key being searched for. - :type email: str - :param secret: Should we search for a secret key? - :type secret: bool - - :return: The fingerprint of the found key. - :rtype: str - """ - for key in self.list_keys(secret=secret): - for uid in key['uids']: - if re.search(email, uid): - return key - raise LookupError("GnuPG public key for email %s not found!" % email) - - def find_key_by_subkey(self, subkey, secret=False): - """ - Find user's key based on a subkey fingerprint. - - :param email: Subkey fingerprint of the key being searched for. - :type email: str - :param secret: Should we search for a secret key? - :type secret: bool - - :return: The fingerprint of the found key. - :rtype: str - """ - for key in self.list_keys(secret=secret): - for sub in key['subkeys']: - if sub[0] == subkey: - return key - raise LookupError( - "GnuPG public key for subkey %s not found!" % subkey) - - def find_key_by_keyid(self, keyid, secret=False): - """ - Find user's key based on the key ID. - - :param email: The key ID of the key being searched for. - :type email: str - :param secret: Should we search for a secret key? - :type secret: bool - - :return: The fingerprint of the found key. - :rtype: str - """ - for key in self.list_keys(secret=secret): - if keyid == key['keyid']: - return key - raise LookupError( - "GnuPG public key for keyid %s not found!" % keyid) - - def find_key_by_fingerprint(self, fingerprint, secret=False): - """ - Find user's key based on the key fingerprint. - - :param email: The fingerprint of the key being searched for. - :type email: str - :param secret: Should we search for a secret key? - :type secret: bool - - :return: The fingerprint of the found key. - :rtype: str - """ - for key in self.list_keys(secret=secret): - if fingerprint == key['fingerprint']: - return key - raise LookupError( - "GnuPG public key for fingerprint %s not found!" % fingerprint) - - def encrypt(self, data, recipient, sign=None, always_trust=True, - passphrase=None, symmetric=False): - """ - Encrypt data using GPG. - - :param data: The data to be encrypted. - :type data: str - :param recipient: The address of the public key to be used. - :type recipient: str - :param sign: Should the encrypted content be signed? - :type sign: bool - :param always_trust: Skip key validation and assume that used keys - are always fully trusted? - :type always_trust: bool - :param passphrase: The passphrase to be used if symmetric encryption - is desired. - :type passphrase: str - :param symmetric: Should we encrypt to a password? - :type symmetric: bool - - :return: An object with encrypted result in the `data` field. - :rtype: gnupg.Crypt - """ - # TODO: devise a way so we don't need to "always trust". - return gnupg.GPG.encrypt(self, data, recipient, sign=sign, - always_trust=always_trust, - passphrase=passphrase, - symmetric=symmetric, - cipher_algo='AES256') - - def decrypt(self, data, always_trust=True, passphrase=None): - """ - Decrypt data using GPG. - - :param data: The data to be decrypted. - :type data: str - :param always_trust: Skip key validation and assume that used keys - are always fully trusted? - :type always_trust: bool - :param passphrase: The passphrase to be used if symmetric encryption - is desired. - :type passphrase: str - - :return: An object with decrypted result in the `data` field. - :rtype: gnupg.Crypt - """ - # TODO: devise a way so we don't need to "always trust". - return gnupg.GPG.decrypt(self, data, always_trust=always_trust, - passphrase=passphrase) - - def send_keys(self, keyserver, *keyids): - """ - Send keys to a keyserver - - :param keyserver: The keyserver to send the keys to. - :type keyserver: str - :param keyids: The key ids to send. - :type keyids: list - - :return: A list of keys sent to server. - :rtype: gnupg.ListKeys - """ - # TODO: write tests for this. - # TODO: write a SendKeys class to handle status for this. - result = self.result_map['list'](self) - gnupg.logger.debug('send_keys: %r', keyids) - data = gnupg._make_binary_stream("", self.encoding) - args = ['--keyserver', keyserver, '--send-keys'] - args.extend(keyids) - self._handle_io(args, data, result, binary=True) - gnupg.logger.debug('send_keys result: %r', result.__dict__) - data.close() - return result - - def encrypt_file(self, file, recipients, sign=None, - always_trust=False, passphrase=None, - armor=True, output=None, symmetric=False, - cipher_algo=None): - """ - Encrypt the message read from the file-like object 'file'. - - :param file: The file to be encrypted. - :type data: file - :param recipient: The address of the public key to be used. - :type recipient: str - :param sign: Should the encrypted content be signed? - :type sign: bool - :param always_trust: Skip key validation and assume that used keys - are always fully trusted? - :type always_trust: bool - :param passphrase: The passphrase to be used if symmetric encryption - is desired. - :type passphrase: str - :param armor: Create ASCII armored output? - :type armor: bool - :param output: Path of file to write results in. - :type output: str - :param symmetric: Should we encrypt to a password? - :type symmetric: bool - :param cipher_algo: Algorithm to use. - :type cipher_algo: str - - :return: An object with encrypted result in the `data` field. - :rtype: gnupg.Crypt - """ - args = ['--encrypt'] - if symmetric: - args = ['--symmetric'] - if cipher_algo: - args.append('--cipher-algo %s' % cipher_algo) - else: - args = ['--encrypt'] - if not _is_sequence(recipients): - recipients = (recipients,) - for recipient in recipients: - args.append('--recipient "%s"' % recipient) - if armor: # create ascii-armored output - set to False for binary - args.append('--armor') - if output: # write the output to a file with the specified name - if os.path.exists(output): - os.remove(output) # to avoid overwrite confirmation message - args.append('--output "%s"' % output) - if sign: - args.append('--sign --default-key "%s"' % sign) - if always_trust: - args.append("--always-trust") - result = self.result_map['crypt'](self) - self._handle_io(args, file, result, passphrase=passphrase, binary=True) - logger.debug('encrypt result: %r', result.data) - return result - - def list_packets(self, data): - """ - List the sequence of packets. - - :param data: The data to extract packets from. - :type data: str - - :return: An object with packet info. - :rtype ListPackets - """ - args = ["--list-packets"] - result = self.result_map['list-packets'](self) - self._handle_io( - args, - _make_binary_stream(data, self.encoding), - result, - ) - return result - - def encrypted_to(self, data): - """ - Return the key to which data is encrypted to. - - :param data: The data to be examined. - :type data: str - - :return: The fingerprint of the key to which data is encrypted to. - :rtype: str - """ - # TODO: make this support multiple keys. - result = self.list_packets(data) - if not result.key: - raise LookupError( - "Content is not encrypted to a GnuPG key!") - try: - return self.find_key_by_keyid(result.key) - except: - return self.find_key_by_subkey(result.key) - - def is_encrypted_sym(self, data): - """ - Say whether some chunk of data is encrypted to a symmetric key. - - :param data: The data to be examined. - :type data: str - - :return: Whether data is encrypted to a symmetric key. - :rtype: bool - """ - result = self.list_packets(data) - return bool(result.need_passphrase_sym) - - def is_encrypted_asym(self, data): - """ - Say whether some chunk of data is encrypted to a private key. - - :param data: The data to be examined. - :type data: str - - :return: Whether data is encrypted to a private key. - :rtype: bool - """ - result = self.list_packets(data) - return bool(result.key) - - def is_encrypted(self, data): - """ - Say whether some chunk of data is encrypted to a key. - - :param data: The data to be examined. - :type data: str - - :return: Whether data is encrypted to a key. - :rtype: bool - """ - return self.is_encrypted_asym(data) or self.is_encrypted_sym(data) diff --git a/src/leap/keymanager/openpgp.py b/src/leap/keymanager/openpgp.py index 7946db85..9d8d89a1 100644 --- a/src/leap/keymanager/openpgp.py +++ b/src/leap/keymanager/openpgp.py @@ -27,6 +27,8 @@ import re import shutil import tempfile +from gnupg import GPG +from gnupg.gnupg import GPGUtilities from leap.common.check import leap_assert, leap_assert_type from leap.keymanager import errors @@ -38,7 +40,6 @@ from leap.keymanager.keys import ( KEYMANAGER_KEY_TAG, TAGS_ADDRESS_PRIVATE_INDEX, ) -from leap.keymanager.gpg import GPGWrapper logger = logging.getLogger(__name__) @@ -46,15 +47,15 @@ logger = logging.getLogger(__name__) class TempGPGWrapper(object): """ - A context manager returning a temporary GPG wrapper keyring, which - contains exactly zero or one pubkeys, and zero or one privkeys. - - Temporary unitary keyrings allow the to use GPG's facilities for exactly - one key. This function creates an empty temporary keyring and imports - C{keys} if it is not None. + A context manager that wraps a temporary GPG keyring which only contains + the keys given at object creation. """ + def __init__(self, keys=None, gpgbinary=None): """ + Create an empty temporary keyring and import any given C{keys} into + it. + :param keys: OpenPGP key, or list of. :type keys: OpenPGPKey or list of OpenPGPKeys :param gpgbinary: Name for GnuPG binary executable. @@ -67,14 +68,15 @@ class TempGPGWrapper(object): if not isinstance(keys, list): keys = [keys] self._keys = keys - for key in filter(None, keys): + for key in keys: leap_assert_type(key, OpenPGPKey) def __enter__(self): """ - Calls the unitary gpgwrapper initializer + Build and return a GPG keyring containing the keys given on + object creation. - :return: A GPG wrapper with a unitary keyring. + :return: A GPG instance containing the keys given on object creation. :rtype: gnupg.GPG """ self._build_keyring() @@ -82,19 +84,16 @@ class TempGPGWrapper(object): def __exit__(self, exc_type, exc_value, traceback): """ - Ensures the gpgwrapper is properly destroyed. + Ensure the gpg is properly destroyed. """ # TODO handle exceptions and log here self._destroy_keyring() def _build_keyring(self): """ - Create an empty GPG keyring and import C{keys} into it. - - :param keys: List of keys to add to the keyring. - :type keys: list of OpenPGPKey + Create a GPG keyring containing the keys given on object creation. - :return: A GPG wrapper with a unitary keyring. + :return: A GPG instance containing the keys given on object creation. :rtype: gnupg.GPG """ privkeys = [key for key in self._keys if key and key.private is True] @@ -111,14 +110,13 @@ class TempGPGWrapper(object): listkeys = lambda: self._gpg.list_keys() listsecretkeys = lambda: self._gpg.list_keys(secret=True) - self._gpg = GPGWrapper( - gnupghome=tempfile.mkdtemp(), - gpgbinary=self._gpgbinary) + self._gpg = GPG(binary=self._gpgbinary, + homedir=tempfile.mkdtemp()) leap_assert(len(listkeys()) is 0, 'Keyring not empty.') # import keys into the keyring: # concatenating ascii-armored keys, which is correctly - # understood by the GPGWrapper. + # understood by GPG. self._gpg.import_keys("".join( [x.key_data for x in publkeys + privkeys])) @@ -135,7 +133,7 @@ class TempGPGWrapper(object): def _destroy_keyring(self): """ - Securely erase a unitary keyring. + Securely erase the keyring. """ # TODO: implement some kind of wiping of data or a more # secure way that @@ -153,9 +151,9 @@ class TempGPGWrapper(object): raise finally: - leap_assert(self._gpg.gnupghome != os.path.expanduser('~/.gnupg'), + leap_assert(self._gpg.homedir != os.path.expanduser('~/.gnupg'), "watch out! Tried to remove default gnupg home!") - shutil.rmtree(self._gpg.gnupghome) + shutil.rmtree(self._gpg.homedir) def _build_key_from_gpg(address, key, key_data): @@ -406,16 +404,16 @@ class OpenPGPScheme(EncryptionScheme): def _temporary_gpgwrapper(self, keys=None): """ - Returns a unitary gpg wrapper that implements context manager - protocol. + Return a gpg wrapper that implements the context manager protocol and + contains C{keys}. :param key_data: ASCII armored key data. :type key_data: str :param gpgbinary: Name for GnuPG binary executable. :type gpgbinary: C{str} - :return: a GPGWrapper instance - :rtype: GPGWrapper + :return: a TempGPGWrapper instance + :rtype: TempGPGWrapper """ # TODO do here checks on key_data return TempGPGWrapper( @@ -459,8 +457,9 @@ class OpenPGPScheme(EncryptionScheme): with self._temporary_gpgwrapper(keys) as gpg: result = gpg.encrypt( data, pubkey.fingerprint, - sign=sign.key_id if sign else None, - passphrase=passphrase, symmetric=False) + default_key=sign.key_id if sign else None, + passphrase=passphrase, symmetric=False, + cipher_algo='AES256') # Here we cannot assert for correctness of sig because the sig is # in the ciphertext. # result.ok - (bool) indicates if the operation succeeded @@ -492,7 +491,8 @@ class OpenPGPScheme(EncryptionScheme): leap_assert(verify.private is False) keys.append(verify) with self._temporary_gpgwrapper(keys) as gpg: - result = gpg.decrypt(data, passphrase=passphrase) + result = gpg.decrypt( + data, passphrase=passphrase, always_trust=True) self._assert_gpg_result_ok(result) # verify signature if (verify is not None): @@ -514,7 +514,8 @@ class OpenPGPScheme(EncryptionScheme): :rtype: bool """ with self._temporary_gpgwrapper() as gpg: - return gpg.is_encrypted_asym(data) + gpgutil = GPGUtilities(gpg) + return gpgutil.is_encrypted_asym(data) def sign(self, data, privkey): """ @@ -535,7 +536,7 @@ class OpenPGPScheme(EncryptionScheme): # result.fingerprint - contains the fingerprint of the key used to # sign. with self._temporary_gpgwrapper(privkey) as gpg: - result = gpg.sign(data, keyid=privkey.key_id) + result = gpg.sign(data, default_key=privkey.key_id) rfprint = privkey.fingerprint privkey = gpg.list_keys(secret=True).pop() kfprint = privkey['fingerprint'] diff --git a/src/leap/keymanager/tests/test_keymanager.py b/src/leap/keymanager/tests/test_keymanager.py index 5f232fed..25126047 100644 --- a/src/leap/keymanager/tests/test_keymanager.py +++ b/src/leap/keymanager/tests/test_keymanager.py @@ -28,7 +28,6 @@ from leap.keymanager import ( KeyManager, openpgp, KeyNotFound, - NoPasswordGiven, errors, ) from leap.keymanager.openpgp import OpenPGPKey -- cgit v1.2.3