Update to use gnupg 1.2.2 module.
authordrebs <drebs@leap.se>
Mon, 30 Sep 2013 18:55:01 +0000 (15:55 -0300)
committerdrebs <drebs@leap.se>
Wed, 2 Oct 2013 12:12:28 +0000 (09:12 -0300)
changes/feature_2342-use-new-gnupg-module [new file with mode: 0644]
pkg/requirements.pip
src/leap/keymanager/gpg.py [deleted file]
src/leap/keymanager/openpgp.py
src/leap/keymanager/tests/test_keymanager.py

diff --git a/changes/feature_2342-use-new-gnupg-module b/changes/feature_2342-use-new-gnupg-module
new file mode 100644 (file)
index 0000000..b83e8ec
--- /dev/null
@@ -0,0 +1 @@
+  o Update code to use gnupg 1.2.2 python module. Closes #2342.
index 9e16e95..5ebd803 100644 (file)
@@ -1,4 +1,4 @@
 leap.common>=0.3.0
 simplejson
 requests
-python-gnupg
+gnupg
diff --git a/src/leap/keymanager/gpg.py b/src/leap/keymanager/gpg.py
deleted file mode 100644 (file)
index b81b218..0000000
+++ /dev/null
@@ -1,398 +0,0 @@
-# -*- coding: utf-8 -*-
-# gpgwrapper.py
-# Copyright (C) 2013 LEAP
-#
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation, either version 3 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program. If not, see <http://www.gnu.org/licenses/>.
-
-
-"""
-A GPG wrapper used to handle OpenPGP keys.
-
-This is a temporary class that will be superseded by the a revised version of
-python-gnupg.
-"""
-
-
-import os
-import gnupg
-import re
-from gnupg import (
-    logger,
-    _is_sequence,
-    _make_binary_stream,
-)
-
-
-class ListPackets():
-    """
-    Handle status messages for --list-packets.
-    """
-
-    def __init__(self, gpg):
-        """
-        Initialize the packet listing handling class.
-
-        :param gpg: GPG object instance.
-        :type gpg: gnupg.GPG
-        """
-        self.gpg = gpg
-        self.nodata = None
-        self.key = None
-        self.need_passphrase = None
-        self.need_passphrase_sym = None
-        self.userid_hint = None
-
-    def handle_status(self, key, value):
-        """
-        Handle one line of the --list-packets status message.
-
-        :param key: The status message key.
-        :type key: str
-        :param value: The status message value.
-        :type value: str
-        """
-        # TODO: write tests for handle_status
-        if key == 'NODATA':
-            self.nodata = True
-        if key == 'ENC_TO':
-            # This will only capture keys in our keyring. In the future we
-            # may want to include multiple unknown keys in this list.
-            self.key, _, _ = value.split()
-        if key == 'NEED_PASSPHRASE':
-            self.need_passphrase = True
-        if key == 'NEED_PASSPHRASE_SYM':
-            self.need_passphrase_sym = True
-        if key == 'USERID_HINT':
-            self.userid_hint = value.strip().split()
-
-
-class GPGWrapper(gnupg.GPG):
-    """
-    This is a temporary class for handling GPG requests, and should be
-    replaced by a more general class used throughout the project.
-    """
-
-    GNUPG_HOME = os.environ['HOME'] + "/.config/leap/gnupg"
-    GNUPG_BINARY = "/usr/bin/gpg"  # this has to be changed based on OS
-
-    def __init__(self, gpgbinary=GNUPG_BINARY, gnupghome=GNUPG_HOME,
-                 verbose=False, use_agent=False, keyring=None, options=None):
-        """
-        Initialize a GnuPG process wrapper.
-
-        :param gpgbinary: Name for GnuPG binary executable.
-        :type gpgbinary: C{str}
-        :param gpghome: Full pathname to directory containing the public and
-            private keyrings.
-        :type gpghome: C{str}
-        :param keyring: Name of alternative keyring file to use. If specified,
-            the default keyring is not used.
-        :param verbose: Should some verbose info be output?
-        :type verbose: bool
-        :param use_agent: Should pass `--use-agent` to GPG binary?
-        :type use_agent: bool
-        :param keyring: Path for the keyring to use.
-        :type keyring: str
-        @options: A list of additional options to pass to the GPG binary.
-        :type options: list
-
-        @raise: RuntimeError with explanation message if there is a problem
-            invoking gpg.
-        """
-        # XXX: options isn't always supported, so removing for the time being
-        gnupg.GPG.__init__(self, gnupghome=gnupghome, gpgbinary=gpgbinary,
-                           verbose=verbose, use_agent=use_agent,
-                           keyring=keyring)#, options=options)
-        self.result_map['list-packets'] = ListPackets
-
-    def find_key_by_email(self, email, secret=False):
-        """
-        Find user's key based on their email.
-
-        :param email: Email address of key being searched for.
-        :type email: str
-        :param secret: Should we search for a secret key?
-        :type secret: bool
-
-        :return: The fingerprint of the found key.
-        :rtype: str
-        """
-        for key in self.list_keys(secret=secret):
-            for uid in key['uids']:
-                if re.search(email, uid):
-                    return key
-        raise LookupError("GnuPG public key for email %s not found!" % email)
-
-    def find_key_by_subkey(self, subkey, secret=False):
-        """
-        Find user's key based on a subkey fingerprint.
-
-        :param email: Subkey fingerprint of the key being searched for.
-        :type email: str
-        :param secret: Should we search for a secret key?
-        :type secret: bool
-
-        :return: The fingerprint of the found key.
-        :rtype: str
-        """
-        for key in self.list_keys(secret=secret):
-            for sub in key['subkeys']:
-                if sub[0] == subkey:
-                    return key
-        raise LookupError(
-            "GnuPG public key for subkey %s not found!" % subkey)
-
-    def find_key_by_keyid(self, keyid, secret=False):
-        """
-        Find user's key based on the key ID.
-
-        :param email: The key ID of the key being searched for.
-        :type email: str
-        :param secret: Should we search for a secret key?
-        :type secret: bool
-
-        :return: The fingerprint of the found key.
-        :rtype: str
-        """
-        for key in self.list_keys(secret=secret):
-            if keyid == key['keyid']:
-                return key
-        raise LookupError(
-            "GnuPG public key for keyid %s not found!" % keyid)
-
-    def find_key_by_fingerprint(self, fingerprint, secret=False):
-        """
-        Find user's key based on the key fingerprint.
-
-        :param email: The fingerprint of the key being searched for.
-        :type email: str
-        :param secret: Should we search for a secret key?
-        :type secret: bool
-
-        :return: The fingerprint of the found key.
-        :rtype: str
-        """
-        for key in self.list_keys(secret=secret):
-            if fingerprint == key['fingerprint']:
-                return key
-        raise LookupError(
-            "GnuPG public key for fingerprint %s not found!" % fingerprint)
-
-    def encrypt(self, data, recipient, sign=None, always_trust=True,
-                passphrase=None, symmetric=False):
-        """
-        Encrypt data using GPG.
-
-        :param data: The data to be encrypted.
-        :type data: str
-        :param recipient: The address of the public key to be used.
-        :type recipient: str
-        :param sign: Should the encrypted content be signed?
-        :type sign: bool
-        :param always_trust: Skip key validation and assume that used keys
-            are always fully trusted?
-        :type always_trust: bool
-        :param passphrase: The passphrase to be used if symmetric encryption
-            is desired.
-        :type passphrase: str
-        :param symmetric: Should we encrypt to a password?
-        :type symmetric: bool
-
-        :return: An object with encrypted result in the `data` field.
-        :rtype: gnupg.Crypt
-        """
-        # TODO: devise a way so we don't need to "always trust".
-        return gnupg.GPG.encrypt(self, data, recipient, sign=sign,
-                                 always_trust=always_trust,
-                                 passphrase=passphrase,
-                                 symmetric=symmetric,
-                                 cipher_algo='AES256')
-
-    def decrypt(self, data, always_trust=True, passphrase=None):
-        """
-        Decrypt data using GPG.
-
-        :param data: The data to be decrypted.
-        :type data: str
-        :param always_trust: Skip key validation and assume that used keys
-            are always fully trusted?
-        :type always_trust: bool
-        :param passphrase: The passphrase to be used if symmetric encryption
-            is desired.
-        :type passphrase: str
-
-        :return: An object with decrypted result in the `data` field.
-        :rtype: gnupg.Crypt
-        """
-        # TODO: devise a way so we don't need to "always trust".
-        return gnupg.GPG.decrypt(self, data, always_trust=always_trust,
-                                 passphrase=passphrase)
-
-    def send_keys(self, keyserver, *keyids):
-        """
-        Send keys to a keyserver
-
-        :param keyserver: The keyserver to send the keys to.
-        :type keyserver: str
-        :param keyids: The key ids to send.
-        :type keyids: list
-
-        :return: A list of keys sent to server.
-        :rtype: gnupg.ListKeys
-        """
-        # TODO: write tests for this.
-        # TODO: write a SendKeys class to handle status for this.
-        result = self.result_map['list'](self)
-        gnupg.logger.debug('send_keys: %r', keyids)
-        data = gnupg._make_binary_stream("", self.encoding)
-        args = ['--keyserver', keyserver, '--send-keys']
-        args.extend(keyids)
-        self._handle_io(args, data, result, binary=True)
-        gnupg.logger.debug('send_keys result: %r', result.__dict__)
-        data.close()
-        return result
-
-    def encrypt_file(self, file, recipients, sign=None,
-                     always_trust=False, passphrase=None,
-                     armor=True, output=None, symmetric=False,
-                     cipher_algo=None):
-        """
-        Encrypt the message read from the file-like object 'file'.
-
-        :param file: The file to be encrypted.
-        :type data: file
-        :param recipient: The address of the public key to be used.
-        :type recipient: str
-        :param sign: Should the encrypted content be signed?
-        :type sign: bool
-        :param always_trust: Skip key validation and assume that used keys
-            are always fully trusted?
-        :type always_trust: bool
-        :param passphrase: The passphrase to be used if symmetric encryption
-            is desired.
-        :type passphrase: str
-        :param armor: Create ASCII armored output?
-        :type armor: bool
-        :param output: Path of file to write results in.
-        :type output: str
-        :param symmetric: Should we encrypt to a password?
-        :type symmetric: bool
-        :param cipher_algo: Algorithm to use.
-        :type cipher_algo: str
-
-        :return: An object with encrypted result in the `data` field.
-        :rtype: gnupg.Crypt
-        """
-        args = ['--encrypt']
-        if symmetric:
-            args = ['--symmetric']
-            if cipher_algo:
-                args.append('--cipher-algo %s' % cipher_algo)
-        else:
-            args = ['--encrypt']
-            if not _is_sequence(recipients):
-                recipients = (recipients,)
-            for recipient in recipients:
-                args.append('--recipient "%s"' % recipient)
-        if armor:  # create ascii-armored output - set to False for binary
-            args.append('--armor')
-        if output:  # write the output to a file with the specified name
-            if os.path.exists(output):
-                os.remove(output)  # to avoid overwrite confirmation message
-            args.append('--output "%s"' % output)
-        if sign:
-            args.append('--sign --default-key "%s"' % sign)
-        if always_trust:
-            args.append("--always-trust")
-        result = self.result_map['crypt'](self)
-        self._handle_io(args, file, result, passphrase=passphrase, binary=True)
-        logger.debug('encrypt result: %r', result.data)
-        return result
-
-    def list_packets(self, data):
-        """
-        List the sequence of packets.
-
-        :param data: The data to extract packets from.
-        :type data: str
-
-        :return: An object with packet info.
-        :rtype ListPackets
-        """
-        args = ["--list-packets"]
-        result = self.result_map['list-packets'](self)
-        self._handle_io(
-            args,
-            _make_binary_stream(data, self.encoding),
-            result,
-        )
-        return result
-
-    def encrypted_to(self, data):
-        """
-        Return the key to which data is encrypted to.
-
-        :param data: The data to be examined.
-        :type data: str
-
-        :return: The fingerprint of the key to which data is encrypted to.
-        :rtype: str
-        """
-        # TODO: make this support multiple keys.
-        result = self.list_packets(data)
-        if not result.key:
-            raise LookupError(
-                "Content is not encrypted to a GnuPG key!")
-        try:
-            return self.find_key_by_keyid(result.key)
-        except:
-            return self.find_key_by_subkey(result.key)
-
-    def is_encrypted_sym(self, data):
-        """
-        Say whether some chunk of data is encrypted to a symmetric key.
-
-        :param data: The data to be examined.
-        :type data: str
-
-        :return: Whether data is encrypted to a symmetric key.
-        :rtype: bool
-        """
-        result = self.list_packets(data)
-        return bool(result.need_passphrase_sym)
-
-    def is_encrypted_asym(self, data):
-        """
-        Say whether some chunk of data is encrypted to a private key.
-
-        :param data: The data to be examined.
-        :type data: str
-
-        :return: Whether data is encrypted to a private key.
-        :rtype: bool
-        """
-        result = self.list_packets(data)
-        return bool(result.key)
-
-    def is_encrypted(self, data):
-        """
-        Say whether some chunk of data is encrypted to a key.
-
-        :param data: The data to be examined.
-        :type data: str
-
-        :return: Whether data is encrypted to a key.
-        :rtype: bool
-        """
-        return self.is_encrypted_asym(data) or self.is_encrypted_sym(data)
index 7946db8..9d8d89a 100644 (file)
@@ -27,6 +27,8 @@ import re
 import shutil
 import tempfile
 
+from gnupg import GPG
+from gnupg.gnupg import GPGUtilities
 
 from leap.common.check import leap_assert, leap_assert_type
 from leap.keymanager import errors
@@ -38,7 +40,6 @@ from leap.keymanager.keys import (
     KEYMANAGER_KEY_TAG,
     TAGS_ADDRESS_PRIVATE_INDEX,
 )
-from leap.keymanager.gpg import GPGWrapper
 
 
 logger = logging.getLogger(__name__)
@@ -46,15 +47,15 @@ logger = logging.getLogger(__name__)
 
 class TempGPGWrapper(object):
     """
-    A context manager returning a temporary GPG wrapper keyring, which
-    contains exactly zero or one pubkeys, and zero or one privkeys.
-
-    Temporary unitary keyrings allow the to use GPG's facilities for exactly
-    one key. This function creates an empty temporary keyring and imports
-    C{keys} if it is not None.
+    A context manager that wraps a temporary GPG keyring which only contains
+    the keys given at object creation.
     """
+
     def __init__(self, keys=None, gpgbinary=None):
         """
+        Create an empty temporary keyring and import any given C{keys} into
+        it.
+
         :param keys: OpenPGP key, or list of.
         :type keys: OpenPGPKey or list of OpenPGPKeys
         :param gpgbinary: Name for GnuPG binary executable.
@@ -67,14 +68,15 @@ class TempGPGWrapper(object):
         if not isinstance(keys, list):
             keys = [keys]
         self._keys = keys
-        for key in filter(None, keys):
+        for key in keys:
             leap_assert_type(key, OpenPGPKey)
 
     def __enter__(self):
         """
-        Calls the unitary gpgwrapper initializer
+        Build and return a GPG keyring containing the keys given on
+        object creation.
 
-        :return: A GPG wrapper with a unitary keyring.
+        :return: A GPG instance containing the keys given on object creation.
         :rtype: gnupg.GPG
         """
         self._build_keyring()
@@ -82,19 +84,16 @@ class TempGPGWrapper(object):
 
     def __exit__(self, exc_type, exc_value, traceback):
         """
-        Ensures the gpgwrapper is properly destroyed.
+        Ensure the gpg is properly destroyed.
         """
         # TODO handle exceptions and log here
         self._destroy_keyring()
 
     def _build_keyring(self):
         """
-        Create an empty GPG keyring and import C{keys} into it.
-
-        :param keys: List of keys to add to the keyring.
-        :type keys: list of OpenPGPKey
+        Create a GPG keyring containing the keys given on object creation.
 
-        :return: A GPG wrapper with a unitary keyring.
+        :return: A GPG instance containing the keys given on object creation.
         :rtype: gnupg.GPG
         """
         privkeys = [key for key in self._keys if key and key.private is True]
@@ -111,14 +110,13 @@ class TempGPGWrapper(object):
         listkeys = lambda: self._gpg.list_keys()
         listsecretkeys = lambda: self._gpg.list_keys(secret=True)
 
-        self._gpg = GPGWrapper(
-            gnupghome=tempfile.mkdtemp(),
-            gpgbinary=self._gpgbinary)
+        self._gpg = GPG(binary=self._gpgbinary,
+                        homedir=tempfile.mkdtemp())
         leap_assert(len(listkeys()) is 0, 'Keyring not empty.')
 
         # import keys into the keyring:
         # concatenating ascii-armored keys, which is correctly
-        # understood by the GPGWrapper.
+        # understood by GPG.
 
         self._gpg.import_keys("".join(
             [x.key_data for x in publkeys + privkeys]))
@@ -135,7 +133,7 @@ class TempGPGWrapper(object):
 
     def _destroy_keyring(self):
         """
-        Securely erase a unitary keyring.
+        Securely erase the keyring.
         """
         # TODO: implement some kind of wiping of data or a more
         # secure way that
@@ -153,9 +151,9 @@ class TempGPGWrapper(object):
             raise
 
         finally:
-            leap_assert(self._gpg.gnupghome != os.path.expanduser('~/.gnupg'),
+            leap_assert(self._gpg.homedir != os.path.expanduser('~/.gnupg'),
                         "watch out! Tried to remove default gnupg home!")
-            shutil.rmtree(self._gpg.gnupghome)
+            shutil.rmtree(self._gpg.homedir)
 
 
 def _build_key_from_gpg(address, key, key_data):
@@ -406,16 +404,16 @@ class OpenPGPScheme(EncryptionScheme):
 
     def _temporary_gpgwrapper(self, keys=None):
         """
-        Returns a unitary gpg wrapper that implements context manager
-        protocol.
+        Return a gpg wrapper that implements the context manager protocol and
+        contains C{keys}.
 
         :param key_data: ASCII armored key data.
         :type key_data: str
         :param gpgbinary: Name for GnuPG binary executable.
         :type gpgbinary: C{str}
 
-        :return: a GPGWrapper instance
-        :rtype: GPGWrapper
+        :return: a TempGPGWrapper instance
+        :rtype: TempGPGWrapper
         """
         # TODO do here checks on key_data
         return TempGPGWrapper(
@@ -459,8 +457,9 @@ class OpenPGPScheme(EncryptionScheme):
         with self._temporary_gpgwrapper(keys) as gpg:
             result = gpg.encrypt(
                 data, pubkey.fingerprint,
-                sign=sign.key_id if sign else None,
-                passphrase=passphrase, symmetric=False)
+                default_key=sign.key_id if sign else None,
+                passphrase=passphrase, symmetric=False,
+                cipher_algo='AES256')
             # Here we cannot assert for correctness of sig because the sig is
             # in the ciphertext.
             # result.ok    - (bool) indicates if the operation succeeded
@@ -492,7 +491,8 @@ class OpenPGPScheme(EncryptionScheme):
             leap_assert(verify.private is False)
             keys.append(verify)
         with self._temporary_gpgwrapper(keys) as gpg:
-            result = gpg.decrypt(data, passphrase=passphrase)
+            result = gpg.decrypt(
+                data, passphrase=passphrase, always_trust=True)
             self._assert_gpg_result_ok(result)
             # verify signature
             if (verify is not None):
@@ -514,7 +514,8 @@ class OpenPGPScheme(EncryptionScheme):
         :rtype: bool
         """
         with self._temporary_gpgwrapper() as gpg:
-            return gpg.is_encrypted_asym(data)
+            gpgutil = GPGUtilities(gpg)
+            return gpgutil.is_encrypted_asym(data)
 
     def sign(self, data, privkey):
         """
@@ -535,7 +536,7 @@ class OpenPGPScheme(EncryptionScheme):
         # result.fingerprint - contains the fingerprint of the key used to
         #                      sign.
         with self._temporary_gpgwrapper(privkey) as gpg:
-            result = gpg.sign(data, keyid=privkey.key_id)
+            result = gpg.sign(data, default_key=privkey.key_id)
             rfprint = privkey.fingerprint
             privkey = gpg.list_keys(secret=True).pop()
             kfprint = privkey['fingerprint']
index 5f232fe..2512604 100644 (file)
@@ -28,7 +28,6 @@ from leap.keymanager import (
     KeyManager,
     openpgp,
     KeyNotFound,
-    NoPasswordGiven,
     errors,
 )
 from leap.keymanager.openpgp import OpenPGPKey