# -*- coding: utf-8 -*- # gpgwrapper.py # Copyright (C) 2013 LEAP # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see . """ A GPG wrapper used to handle OpenPGP keys. This is a temporary class that will be superseded by the a revised version of python-gnupg. """ import os import gnupg import re from gnupg import ( logger, _is_sequence, _make_binary_stream, ) class ListPackets(): """ Handle status messages for --list-packets. """ def __init__(self, gpg): """ Initialize the packet listing handling class. :param gpg: GPG object instance. :type gpg: gnupg.GPG """ self.gpg = gpg self.nodata = None self.key = None self.need_passphrase = None self.need_passphrase_sym = None self.userid_hint = None def handle_status(self, key, value): """ Handle one line of the --list-packets status message. :param key: The status message key. :type key: str :param value: The status message value. :type value: str """ # TODO: write tests for handle_status if key == 'NODATA': self.nodata = True if key == 'ENC_TO': # This will only capture keys in our keyring. In the future we # may want to include multiple unknown keys in this list. self.key, _, _ = value.split() if key == 'NEED_PASSPHRASE': self.need_passphrase = True if key == 'NEED_PASSPHRASE_SYM': self.need_passphrase_sym = True if key == 'USERID_HINT': self.userid_hint = value.strip().split() class GPGWrapper(gnupg.GPG): """ This is a temporary class for handling GPG requests, and should be replaced by a more general class used throughout the project. """ GNUPG_HOME = os.environ['HOME'] + "/.config/leap/gnupg" GNUPG_BINARY = "/usr/bin/gpg" # this has to be changed based on OS def __init__(self, gpgbinary=GNUPG_BINARY, gnupghome=GNUPG_HOME, verbose=False, use_agent=False, keyring=None, options=None): """ Initialize a GnuPG process wrapper. :param gpgbinary: Name for GnuPG binary executable. :type gpgbinary: C{str} :param gpghome: Full pathname to directory containing the public and private keyrings. :type gpghome: C{str} :param keyring: Name of alternative keyring file to use. If specified, the default keyring is not used. :param verbose: Should some verbose info be output? :type verbose: bool :param use_agent: Should pass `--use-agent` to GPG binary? :type use_agent: bool :param keyring: Path for the keyring to use. :type keyring: str @options: A list of additional options to pass to the GPG binary. :type options: list @raise: RuntimeError with explanation message if there is a problem invoking gpg. """ # XXX: options isn't always supported, so removing for the time being gnupg.GPG.__init__(self, gnupghome=gnupghome, gpgbinary=gpgbinary, verbose=verbose, use_agent=use_agent, keyring=keyring)#, options=options) self.result_map['list-packets'] = ListPackets def find_key_by_email(self, email, secret=False): """ Find user's key based on their email. :param email: Email address of key being searched for. :type email: str :param secret: Should we search for a secret key? :type secret: bool :return: The fingerprint of the found key. :rtype: str """ for key in self.list_keys(secret=secret): for uid in key['uids']: if re.search(email, uid): return key raise LookupError("GnuPG public key for email %s not found!" % email) def find_key_by_subkey(self, subkey, secret=False): """ Find user's key based on a subkey fingerprint. :param email: Subkey fingerprint of the key being searched for. :type email: str :param secret: Should we search for a secret key? :type secret: bool :return: The fingerprint of the found key. :rtype: str """ for key in self.list_keys(secret=secret): for sub in key['subkeys']: if sub[0] == subkey: return key raise LookupError( "GnuPG public key for subkey %s not found!" % subkey) def find_key_by_keyid(self, keyid, secret=False): """ Find user's key based on the key ID. :param email: The key ID of the key being searched for. :type email: str :param secret: Should we search for a secret key? :type secret: bool :return: The fingerprint of the found key. :rtype: str """ for key in self.list_keys(secret=secret): if keyid == key['keyid']: return key raise LookupError( "GnuPG public key for keyid %s not found!" % keyid) def find_key_by_fingerprint(self, fingerprint, secret=False): """ Find user's key based on the key fingerprint. :param email: The fingerprint of the key being searched for. :type email: str :param secret: Should we search for a secret key? :type secret: bool :return: The fingerprint of the found key. :rtype: str """ for key in self.list_keys(secret=secret): if fingerprint == key['fingerprint']: return key raise LookupError( "GnuPG public key for fingerprint %s not found!" % fingerprint) def encrypt(self, data, recipient, sign=None, always_trust=True, passphrase=None, symmetric=False): """ Encrypt data using GPG. :param data: The data to be encrypted. :type data: str :param recipient: The address of the public key to be used. :type recipient: str :param sign: Should the encrypted content be signed? :type sign: bool :param always_trust: Skip key validation and assume that used keys are always fully trusted? :type always_trust: bool :param passphrase: The passphrase to be used if symmetric encryption is desired. :type passphrase: str :param symmetric: Should we encrypt to a password? :type symmetric: bool :return: An object with encrypted result in the `data` field. :rtype: gnupg.Crypt """ # TODO: devise a way so we don't need to "always trust". return gnupg.GPG.encrypt(self, data, recipient, sign=sign, always_trust=always_trust, passphrase=passphrase, symmetric=symmetric, cipher_algo='AES256') def decrypt(self, data, always_trust=True, passphrase=None): """ Decrypt data using GPG. :param data: The data to be decrypted. :type data: str :param always_trust: Skip key validation and assume that used keys are always fully trusted? :type always_trust: bool :param passphrase: The passphrase to be used if symmetric encryption is desired. :type passphrase: str :return: An object with decrypted result in the `data` field. :rtype: gnupg.Crypt """ # TODO: devise a way so we don't need to "always trust". return gnupg.GPG.decrypt(self, data, always_trust=always_trust, passphrase=passphrase) def send_keys(self, keyserver, *keyids): """ Send keys to a keyserver :param keyserver: The keyserver to send the keys to. :type keyserver: str :param keyids: The key ids to send. :type keyids: list :return: A list of keys sent to server. :rtype: gnupg.ListKeys """ # TODO: write tests for this. # TODO: write a SendKeys class to handle status for this. result = self.result_map['list'](self) gnupg.logger.debug('send_keys: %r', keyids) data = gnupg._make_binary_stream("", self.encoding) args = ['--keyserver', keyserver, '--send-keys'] args.extend(keyids) self._handle_io(args, data, result, binary=True) gnupg.logger.debug('send_keys result: %r', result.__dict__) data.close() return result def encrypt_file(self, file, recipients, sign=None, always_trust=False, passphrase=None, armor=True, output=None, symmetric=False, cipher_algo=None): """ Encrypt the message read from the file-like object 'file'. :param file: The file to be encrypted. :type data: file :param recipient: The address of the public key to be used. :type recipient: str :param sign: Should the encrypted content be signed? :type sign: bool :param always_trust: Skip key validation and assume that used keys are always fully trusted? :type always_trust: bool :param passphrase: The passphrase to be used if symmetric encryption is desired. :type passphrase: str :param armor: Create ASCII armored output? :type armor: bool :param output: Path of file to write results in. :type output: str :param symmetric: Should we encrypt to a password? :type symmetric: bool :param cipher_algo: Algorithm to use. :type cipher_algo: str :return: An object with encrypted result in the `data` field. :rtype: gnupg.Crypt """ args = ['--encrypt'] if symmetric: args = ['--symmetric'] if cipher_algo: args.append('--cipher-algo %s' % cipher_algo) else: args = ['--encrypt'] if not _is_sequence(recipients): recipients = (recipients,) for recipient in recipients: args.append('--recipient "%s"' % recipient) if armor: # create ascii-armored output - set to False for binary args.append('--armor') if output: # write the output to a file with the specified name if os.path.exists(output): os.remove(output) # to avoid overwrite confirmation message args.append('--output "%s"' % output) if sign: args.append('--sign --default-key "%s"' % sign) if always_trust: args.append("--always-trust") result = self.result_map['crypt'](self) self._handle_io(args, file, result, passphrase=passphrase, binary=True) logger.debug('encrypt result: %r', result.data) return result def list_packets(self, data): """ List the sequence of packets. :param data: The data to extract packets from. :type data: str :return: An object with packet info. :rtype ListPackets """ args = ["--list-packets"] result = self.result_map['list-packets'](self) self._handle_io( args, _make_binary_stream(data, self.encoding), result, ) return result def encrypted_to(self, data): """ Return the key to which data is encrypted to. :param data: The data to be examined. :type data: str :return: The fingerprint of the key to which data is encrypted to. :rtype: str """ # TODO: make this support multiple keys. result = self.list_packets(data) if not result.key: raise LookupError( "Content is not encrypted to a GnuPG key!") try: return self.find_key_by_keyid(result.key) except: return self.find_key_by_subkey(result.key) def is_encrypted_sym(self, data): """ Say whether some chunk of data is encrypted to a symmetric key. :param data: The data to be examined. :type data: str :return: Whether data is encrypted to a symmetric key. :rtype: bool """ result = self.list_packets(data) return bool(result.need_passphrase_sym) def is_encrypted_asym(self, data): """ Say whether some chunk of data is encrypted to a private key. :param data: The data to be examined. :type data: str :return: Whether data is encrypted to a private key. :rtype: bool """ result = self.list_packets(data) return bool(result.key) def is_encrypted(self, data): """ Say whether some chunk of data is encrypted to a key. :param data: The data to be examined. :type data: str :return: Whether data is encrypted to a key. :rtype: bool """ return self.is_encrypted_asym(data) or self.is_encrypted_sym(data)