diff options
| -rw-r--r-- | CHANGELOG | 4 | ||||
| -rw-r--r-- | pkg/requirements.pip | 2 | ||||
| -rw-r--r-- | src/leap/keymanager/__init__.py | 21 | ||||
| -rw-r--r-- | src/leap/keymanager/gpg.py | 398 | ||||
| -rw-r--r-- | src/leap/keymanager/openpgp.py | 65 | ||||
| -rw-r--r-- | src/leap/keymanager/tests/test_keymanager.py | 1 | 
6 files changed, 57 insertions, 434 deletions
| @@ -1,3 +1,7 @@ +0.3.3 Oct 4: +  o Add a sanity check for the correct version of gnupg. +  o Update code to use gnupg 1.2.2 python module. Closes #2342. +  0.3.2 Sep 6:    o Do not raise exception when a GET request doesn't return 2XX      code. Nickserver uses error codes for more verbosity in the diff --git a/pkg/requirements.pip b/pkg/requirements.pip index 9e16e95a..5ebd8038 100644 --- a/pkg/requirements.pip +++ b/pkg/requirements.pip @@ -1,4 +1,4 @@  leap.common>=0.3.0  simplejson  requests -python-gnupg +gnupg diff --git a/src/leap/keymanager/__init__.py b/src/leap/keymanager/__init__.py index 2f39ad9e..76be2262 100644 --- a/src/leap/keymanager/__init__.py +++ b/src/leap/keymanager/__init__.py @@ -14,11 +14,28 @@  #  # You should have received a copy of the GNU General Public License  # along with this program. If not, see <http://www.gnu.org/licenses/>. - -  """  Key Manager is a Nicknym agent for LEAP client.  """ +# let's do a little sanity check to see if we're using the wrong gnupg +import sys + +try: +    from gnupg.gnupg import GPGUtilities +    assert(GPGUtilities)  # pyflakes happy +    from gnupg import __version__ +    from distutils.version import LooseVersion as V +    assert(V(__version__) >= V('1.2.2')) + +except ImportError, AssertionError: +    print "Ooops! It looks like there is a conflict in the installed version " +    print "of gnupg." +    print "Disclaimer: Ideally, we would need to work a patch and propose the " +    print "merge to upstream. But until then do: " +    print +    print "% pip uninstall python-gnupg" +    print "% pip install gnupg" +    sys.exit(1)  import logging  import requests diff --git a/src/leap/keymanager/gpg.py b/src/leap/keymanager/gpg.py deleted file mode 100644 index b81b218f..00000000 --- a/src/leap/keymanager/gpg.py +++ /dev/null @@ -1,398 +0,0 @@ -# -*- coding: utf-8 -*- -# gpgwrapper.py -# Copyright (C) 2013 LEAP -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see <http://www.gnu.org/licenses/>. - - -""" -A GPG wrapper used to handle OpenPGP keys. - -This is a temporary class that will be superseded by the a revised version of -python-gnupg. -""" - - -import os -import gnupg -import re -from gnupg import ( -    logger, -    _is_sequence, -    _make_binary_stream, -) - - -class ListPackets(): -    """ -    Handle status messages for --list-packets. -    """ - -    def __init__(self, gpg): -        """ -        Initialize the packet listing handling class. - -        :param gpg: GPG object instance. -        :type gpg: gnupg.GPG -        """ -        self.gpg = gpg -        self.nodata = None -        self.key = None -        self.need_passphrase = None -        self.need_passphrase_sym = None -        self.userid_hint = None - -    def handle_status(self, key, value): -        """ -        Handle one line of the --list-packets status message. - -        :param key: The status message key. -        :type key: str -        :param value: The status message value. -        :type value: str -        """ -        # TODO: write tests for handle_status -        if key == 'NODATA': -            self.nodata = True -        if key == 'ENC_TO': -            # This will only capture keys in our keyring. In the future we -            # may want to include multiple unknown keys in this list. -            self.key, _, _ = value.split() -        if key == 'NEED_PASSPHRASE': -            self.need_passphrase = True -        if key == 'NEED_PASSPHRASE_SYM': -            self.need_passphrase_sym = True -        if key == 'USERID_HINT': -            self.userid_hint = value.strip().split() - - -class GPGWrapper(gnupg.GPG): -    """ -    This is a temporary class for handling GPG requests, and should be -    replaced by a more general class used throughout the project. -    """ - -    GNUPG_HOME = os.environ['HOME'] + "/.config/leap/gnupg" -    GNUPG_BINARY = "/usr/bin/gpg"  # this has to be changed based on OS - -    def __init__(self, gpgbinary=GNUPG_BINARY, gnupghome=GNUPG_HOME, -                 verbose=False, use_agent=False, keyring=None, options=None): -        """ -        Initialize a GnuPG process wrapper. - -        :param gpgbinary: Name for GnuPG binary executable. -        :type gpgbinary: C{str} -        :param gpghome: Full pathname to directory containing the public and -            private keyrings. -        :type gpghome: C{str} -        :param keyring: Name of alternative keyring file to use. If specified, -            the default keyring is not used. -        :param verbose: Should some verbose info be output? -        :type verbose: bool -        :param use_agent: Should pass `--use-agent` to GPG binary? -        :type use_agent: bool -        :param keyring: Path for the keyring to use. -        :type keyring: str -        @options: A list of additional options to pass to the GPG binary. -        :type options: list - -        @raise: RuntimeError with explanation message if there is a problem -            invoking gpg. -        """ -        # XXX: options isn't always supported, so removing for the time being -        gnupg.GPG.__init__(self, gnupghome=gnupghome, gpgbinary=gpgbinary, -                           verbose=verbose, use_agent=use_agent, -                           keyring=keyring)#, options=options) -        self.result_map['list-packets'] = ListPackets - -    def find_key_by_email(self, email, secret=False): -        """ -        Find user's key based on their email. - -        :param email: Email address of key being searched for. -        :type email: str -        :param secret: Should we search for a secret key? -        :type secret: bool - -        :return: The fingerprint of the found key. -        :rtype: str -        """ -        for key in self.list_keys(secret=secret): -            for uid in key['uids']: -                if re.search(email, uid): -                    return key -        raise LookupError("GnuPG public key for email %s not found!" % email) - -    def find_key_by_subkey(self, subkey, secret=False): -        """ -        Find user's key based on a subkey fingerprint. - -        :param email: Subkey fingerprint of the key being searched for. -        :type email: str -        :param secret: Should we search for a secret key? -        :type secret: bool - -        :return: The fingerprint of the found key. -        :rtype: str -        """ -        for key in self.list_keys(secret=secret): -            for sub in key['subkeys']: -                if sub[0] == subkey: -                    return key -        raise LookupError( -            "GnuPG public key for subkey %s not found!" % subkey) - -    def find_key_by_keyid(self, keyid, secret=False): -        """ -        Find user's key based on the key ID. - -        :param email: The key ID of the key being searched for. -        :type email: str -        :param secret: Should we search for a secret key? -        :type secret: bool - -        :return: The fingerprint of the found key. -        :rtype: str -        """ -        for key in self.list_keys(secret=secret): -            if keyid == key['keyid']: -                return key -        raise LookupError( -            "GnuPG public key for keyid %s not found!" % keyid) - -    def find_key_by_fingerprint(self, fingerprint, secret=False): -        """ -        Find user's key based on the key fingerprint. - -        :param email: The fingerprint of the key being searched for. -        :type email: str -        :param secret: Should we search for a secret key? -        :type secret: bool - -        :return: The fingerprint of the found key. -        :rtype: str -        """ -        for key in self.list_keys(secret=secret): -            if fingerprint == key['fingerprint']: -                return key -        raise LookupError( -            "GnuPG public key for fingerprint %s not found!" % fingerprint) - -    def encrypt(self, data, recipient, sign=None, always_trust=True, -                passphrase=None, symmetric=False): -        """ -        Encrypt data using GPG. - -        :param data: The data to be encrypted. -        :type data: str -        :param recipient: The address of the public key to be used. -        :type recipient: str -        :param sign: Should the encrypted content be signed? -        :type sign: bool -        :param always_trust: Skip key validation and assume that used keys -            are always fully trusted? -        :type always_trust: bool -        :param passphrase: The passphrase to be used if symmetric encryption -            is desired. -        :type passphrase: str -        :param symmetric: Should we encrypt to a password? -        :type symmetric: bool - -        :return: An object with encrypted result in the `data` field. -        :rtype: gnupg.Crypt -        """ -        # TODO: devise a way so we don't need to "always trust". -        return gnupg.GPG.encrypt(self, data, recipient, sign=sign, -                                 always_trust=always_trust, -                                 passphrase=passphrase, -                                 symmetric=symmetric, -                                 cipher_algo='AES256') - -    def decrypt(self, data, always_trust=True, passphrase=None): -        """ -        Decrypt data using GPG. - -        :param data: The data to be decrypted. -        :type data: str -        :param always_trust: Skip key validation and assume that used keys -            are always fully trusted? -        :type always_trust: bool -        :param passphrase: The passphrase to be used if symmetric encryption -            is desired. -        :type passphrase: str - -        :return: An object with decrypted result in the `data` field. -        :rtype: gnupg.Crypt -        """ -        # TODO: devise a way so we don't need to "always trust". -        return gnupg.GPG.decrypt(self, data, always_trust=always_trust, -                                 passphrase=passphrase) - -    def send_keys(self, keyserver, *keyids): -        """ -        Send keys to a keyserver - -        :param keyserver: The keyserver to send the keys to. -        :type keyserver: str -        :param keyids: The key ids to send. -        :type keyids: list - -        :return: A list of keys sent to server. -        :rtype: gnupg.ListKeys -        """ -        # TODO: write tests for this. -        # TODO: write a SendKeys class to handle status for this. -        result = self.result_map['list'](self) -        gnupg.logger.debug('send_keys: %r', keyids) -        data = gnupg._make_binary_stream("", self.encoding) -        args = ['--keyserver', keyserver, '--send-keys'] -        args.extend(keyids) -        self._handle_io(args, data, result, binary=True) -        gnupg.logger.debug('send_keys result: %r', result.__dict__) -        data.close() -        return result - -    def encrypt_file(self, file, recipients, sign=None, -                     always_trust=False, passphrase=None, -                     armor=True, output=None, symmetric=False, -                     cipher_algo=None): -        """ -        Encrypt the message read from the file-like object 'file'. - -        :param file: The file to be encrypted. -        :type data: file -        :param recipient: The address of the public key to be used. -        :type recipient: str -        :param sign: Should the encrypted content be signed? -        :type sign: bool -        :param always_trust: Skip key validation and assume that used keys -            are always fully trusted? -        :type always_trust: bool -        :param passphrase: The passphrase to be used if symmetric encryption -            is desired. -        :type passphrase: str -        :param armor: Create ASCII armored output? -        :type armor: bool -        :param output: Path of file to write results in. -        :type output: str -        :param symmetric: Should we encrypt to a password? -        :type symmetric: bool -        :param cipher_algo: Algorithm to use. -        :type cipher_algo: str - -        :return: An object with encrypted result in the `data` field. -        :rtype: gnupg.Crypt -        """ -        args = ['--encrypt'] -        if symmetric: -            args = ['--symmetric'] -            if cipher_algo: -                args.append('--cipher-algo %s' % cipher_algo) -        else: -            args = ['--encrypt'] -            if not _is_sequence(recipients): -                recipients = (recipients,) -            for recipient in recipients: -                args.append('--recipient "%s"' % recipient) -        if armor:  # create ascii-armored output - set to False for binary -            args.append('--armor') -        if output:  # write the output to a file with the specified name -            if os.path.exists(output): -                os.remove(output)  # to avoid overwrite confirmation message -            args.append('--output "%s"' % output) -        if sign: -            args.append('--sign --default-key "%s"' % sign) -        if always_trust: -            args.append("--always-trust") -        result = self.result_map['crypt'](self) -        self._handle_io(args, file, result, passphrase=passphrase, binary=True) -        logger.debug('encrypt result: %r', result.data) -        return result - -    def list_packets(self, data): -        """ -        List the sequence of packets. - -        :param data: The data to extract packets from. -        :type data: str - -        :return: An object with packet info. -        :rtype ListPackets -        """ -        args = ["--list-packets"] -        result = self.result_map['list-packets'](self) -        self._handle_io( -            args, -            _make_binary_stream(data, self.encoding), -            result, -        ) -        return result - -    def encrypted_to(self, data): -        """ -        Return the key to which data is encrypted to. - -        :param data: The data to be examined. -        :type data: str - -        :return: The fingerprint of the key to which data is encrypted to. -        :rtype: str -        """ -        # TODO: make this support multiple keys. -        result = self.list_packets(data) -        if not result.key: -            raise LookupError( -                "Content is not encrypted to a GnuPG key!") -        try: -            return self.find_key_by_keyid(result.key) -        except: -            return self.find_key_by_subkey(result.key) - -    def is_encrypted_sym(self, data): -        """ -        Say whether some chunk of data is encrypted to a symmetric key. - -        :param data: The data to be examined. -        :type data: str - -        :return: Whether data is encrypted to a symmetric key. -        :rtype: bool -        """ -        result = self.list_packets(data) -        return bool(result.need_passphrase_sym) - -    def is_encrypted_asym(self, data): -        """ -        Say whether some chunk of data is encrypted to a private key. - -        :param data: The data to be examined. -        :type data: str - -        :return: Whether data is encrypted to a private key. -        :rtype: bool -        """ -        result = self.list_packets(data) -        return bool(result.key) - -    def is_encrypted(self, data): -        """ -        Say whether some chunk of data is encrypted to a key. - -        :param data: The data to be examined. -        :type data: str - -        :return: Whether data is encrypted to a key. -        :rtype: bool -        """ -        return self.is_encrypted_asym(data) or self.is_encrypted_sym(data) diff --git a/src/leap/keymanager/openpgp.py b/src/leap/keymanager/openpgp.py index 7946db85..9d8d89a1 100644 --- a/src/leap/keymanager/openpgp.py +++ b/src/leap/keymanager/openpgp.py @@ -27,6 +27,8 @@ import re  import shutil  import tempfile +from gnupg import GPG +from gnupg.gnupg import GPGUtilities  from leap.common.check import leap_assert, leap_assert_type  from leap.keymanager import errors @@ -38,7 +40,6 @@ from leap.keymanager.keys import (      KEYMANAGER_KEY_TAG,      TAGS_ADDRESS_PRIVATE_INDEX,  ) -from leap.keymanager.gpg import GPGWrapper  logger = logging.getLogger(__name__) @@ -46,15 +47,15 @@ logger = logging.getLogger(__name__)  class TempGPGWrapper(object):      """ -    A context manager returning a temporary GPG wrapper keyring, which -    contains exactly zero or one pubkeys, and zero or one privkeys. - -    Temporary unitary keyrings allow the to use GPG's facilities for exactly -    one key. This function creates an empty temporary keyring and imports -    C{keys} if it is not None. +    A context manager that wraps a temporary GPG keyring which only contains +    the keys given at object creation.      """ +      def __init__(self, keys=None, gpgbinary=None):          """ +        Create an empty temporary keyring and import any given C{keys} into +        it. +          :param keys: OpenPGP key, or list of.          :type keys: OpenPGPKey or list of OpenPGPKeys          :param gpgbinary: Name for GnuPG binary executable. @@ -67,14 +68,15 @@ class TempGPGWrapper(object):          if not isinstance(keys, list):              keys = [keys]          self._keys = keys -        for key in filter(None, keys): +        for key in keys:              leap_assert_type(key, OpenPGPKey)      def __enter__(self):          """ -        Calls the unitary gpgwrapper initializer +        Build and return a GPG keyring containing the keys given on +        object creation. -        :return: A GPG wrapper with a unitary keyring. +        :return: A GPG instance containing the keys given on object creation.          :rtype: gnupg.GPG          """          self._build_keyring() @@ -82,19 +84,16 @@ class TempGPGWrapper(object):      def __exit__(self, exc_type, exc_value, traceback):          """ -        Ensures the gpgwrapper is properly destroyed. +        Ensure the gpg is properly destroyed.          """          # TODO handle exceptions and log here          self._destroy_keyring()      def _build_keyring(self):          """ -        Create an empty GPG keyring and import C{keys} into it. - -        :param keys: List of keys to add to the keyring. -        :type keys: list of OpenPGPKey +        Create a GPG keyring containing the keys given on object creation. -        :return: A GPG wrapper with a unitary keyring. +        :return: A GPG instance containing the keys given on object creation.          :rtype: gnupg.GPG          """          privkeys = [key for key in self._keys if key and key.private is True] @@ -111,14 +110,13 @@ class TempGPGWrapper(object):          listkeys = lambda: self._gpg.list_keys()          listsecretkeys = lambda: self._gpg.list_keys(secret=True) -        self._gpg = GPGWrapper( -            gnupghome=tempfile.mkdtemp(), -            gpgbinary=self._gpgbinary) +        self._gpg = GPG(binary=self._gpgbinary, +                        homedir=tempfile.mkdtemp())          leap_assert(len(listkeys()) is 0, 'Keyring not empty.')          # import keys into the keyring:          # concatenating ascii-armored keys, which is correctly -        # understood by the GPGWrapper. +        # understood by GPG.          self._gpg.import_keys("".join(              [x.key_data for x in publkeys + privkeys])) @@ -135,7 +133,7 @@ class TempGPGWrapper(object):      def _destroy_keyring(self):          """ -        Securely erase a unitary keyring. +        Securely erase the keyring.          """          # TODO: implement some kind of wiping of data or a more          # secure way that @@ -153,9 +151,9 @@ class TempGPGWrapper(object):              raise          finally: -            leap_assert(self._gpg.gnupghome != os.path.expanduser('~/.gnupg'), +            leap_assert(self._gpg.homedir != os.path.expanduser('~/.gnupg'),                          "watch out! Tried to remove default gnupg home!") -            shutil.rmtree(self._gpg.gnupghome) +            shutil.rmtree(self._gpg.homedir)  def _build_key_from_gpg(address, key, key_data): @@ -406,16 +404,16 @@ class OpenPGPScheme(EncryptionScheme):      def _temporary_gpgwrapper(self, keys=None):          """ -        Returns a unitary gpg wrapper that implements context manager -        protocol. +        Return a gpg wrapper that implements the context manager protocol and +        contains C{keys}.          :param key_data: ASCII armored key data.          :type key_data: str          :param gpgbinary: Name for GnuPG binary executable.          :type gpgbinary: C{str} -        :return: a GPGWrapper instance -        :rtype: GPGWrapper +        :return: a TempGPGWrapper instance +        :rtype: TempGPGWrapper          """          # TODO do here checks on key_data          return TempGPGWrapper( @@ -459,8 +457,9 @@ class OpenPGPScheme(EncryptionScheme):          with self._temporary_gpgwrapper(keys) as gpg:              result = gpg.encrypt(                  data, pubkey.fingerprint, -                sign=sign.key_id if sign else None, -                passphrase=passphrase, symmetric=False) +                default_key=sign.key_id if sign else None, +                passphrase=passphrase, symmetric=False, +                cipher_algo='AES256')              # Here we cannot assert for correctness of sig because the sig is              # in the ciphertext.              # result.ok    - (bool) indicates if the operation succeeded @@ -492,7 +491,8 @@ class OpenPGPScheme(EncryptionScheme):              leap_assert(verify.private is False)              keys.append(verify)          with self._temporary_gpgwrapper(keys) as gpg: -            result = gpg.decrypt(data, passphrase=passphrase) +            result = gpg.decrypt( +                data, passphrase=passphrase, always_trust=True)              self._assert_gpg_result_ok(result)              # verify signature              if (verify is not None): @@ -514,7 +514,8 @@ class OpenPGPScheme(EncryptionScheme):          :rtype: bool          """          with self._temporary_gpgwrapper() as gpg: -            return gpg.is_encrypted_asym(data) +            gpgutil = GPGUtilities(gpg) +            return gpgutil.is_encrypted_asym(data)      def sign(self, data, privkey):          """ @@ -535,7 +536,7 @@ class OpenPGPScheme(EncryptionScheme):          # result.fingerprint - contains the fingerprint of the key used to          #                      sign.          with self._temporary_gpgwrapper(privkey) as gpg: -            result = gpg.sign(data, keyid=privkey.key_id) +            result = gpg.sign(data, default_key=privkey.key_id)              rfprint = privkey.fingerprint              privkey = gpg.list_keys(secret=True).pop()              kfprint = privkey['fingerprint'] diff --git a/src/leap/keymanager/tests/test_keymanager.py b/src/leap/keymanager/tests/test_keymanager.py index 5f232fed..25126047 100644 --- a/src/leap/keymanager/tests/test_keymanager.py +++ b/src/leap/keymanager/tests/test_keymanager.py @@ -28,7 +28,6 @@ from leap.keymanager import (      KeyManager,      openpgp,      KeyNotFound, -    NoPasswordGiven,      errors,  )  from leap.keymanager.openpgp import OpenPGPKey | 
