summaryrefslogtreecommitdiff
path: root/gnupg/_meta.py
diff options
context:
space:
mode:
Diffstat (limited to 'gnupg/_meta.py')
-rw-r--r--gnupg/_meta.py871
1 files changed, 871 insertions, 0 deletions
diff --git a/gnupg/_meta.py b/gnupg/_meta.py
new file mode 100644
index 0000000..f11310c
--- /dev/null
+++ b/gnupg/_meta.py
@@ -0,0 +1,871 @@
+# -*- coding: utf-8 -*-
+#
+# This file is part of python-gnupg, a Python interface to GnuPG.
+# Copyright © 2013 Isis Lovecruft, <isis@leap.se> 0xA3ADB67A2CDB8B35
+# © 2013 Andrej B.
+# © 2013 LEAP Encryption Access Project
+# © 2008-2012 Vinay Sajip
+# © 2005 Steve Traugott
+# © 2004 A.M. Kuchling
+#
+# This program is free software: you can redistribute it and/or modify it
+# under the terms of the GNU General Public License as published by the Free
+# Software Foundation, either version 3 of the License, or (at your option)
+# any later version.
+#
+# This program is distributed in the hope that it will be useful, but WITHOUT
+# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+# FITNESS FOR A PARTICULAR PURPOSE. See the included LICENSE file for details.
+
+'''Meta and base classes for hiding internal functions, and controlling
+attribute creation and handling.
+'''
+
+from __future__ import absolute_import
+
+import atexit
+import codecs
+import encodings
+## For AOS, the locale module will need to point to a wrapper around the
+## java.util.Locale class.
+## See https://code.patternsinthevoid.net/?p=android-locale-hack.git
+import locale
+import os
+import psutil
+import subprocess
+import sys
+import threading
+
+from . import _parsers
+from . import _util
+
+from ._parsers import _check_preferences
+from ._parsers import _sanitise_list
+from ._util import log
+
+
+class GPGMeta(type):
+ """Metaclass for changing the :meth:GPG.__init__ initialiser.
+
+ Detects running gpg-agent processes and the presence of a pinentry
+ program, and disables pinentry so that python-gnupg can write the
+ passphrase to the controlled GnuPG process without killing the agent.
+
+ :attr _agent_proc: If a :program:`gpg-agent` process is currently running
+ for the effective userid, then **_agent_proc** will be
+ set to a ``psutil.Process`` for that process.
+ """
+
+ def __new__(cls, name, bases, attrs):
+ """Construct the initialiser for GPG"""
+ log.debug("Metaclass __new__ constructor called for %r" % cls)
+ if cls._find_agent():
+ ## call the normal GPG.__init__() initialiser:
+ attrs['init'] = cls.__init__
+ attrs['_remove_agent'] = True
+ return super(GPGMeta, cls).__new__(cls, name, bases, attrs)
+
+ @classmethod
+ def _find_agent(cls):
+ """Discover if a gpg-agent process for the current euid is running.
+
+ If there is a matching gpg-agent process, set a :class:`psutil.Process`
+ instance containing the gpg-agent process' information to
+ ``cls._agent_proc``.
+
+ :returns: True if there exists a gpg-agent process running under the
+ same effective user ID as that of this program. Otherwise,
+ returns None.
+ """
+ identity = psutil.Process(os.getpid()).uids
+ for proc in psutil.process_iter():
+ if (proc.name == "gpg-agent") and proc.is_running:
+ log.debug("Found gpg-agent process with pid %d" % proc.pid)
+ if proc.uids == identity:
+ log.debug(
+ "Effective UIDs of this process and gpg-agent match")
+ setattr(cls, '_agent_proc', proc)
+ return True
+
+
+class GPGBase(object):
+ """Base class for storing properties and controlling process initialisation.
+
+ :const _result_map: A *dict* containing classes from
+ :mod:`~gnupg._parsers`, used for parsing results
+ obtained from GnuPG commands.
+ :const _decode_errors: How to handle encoding errors.
+ """
+ __metaclass__ = GPGMeta
+ _decode_errors = 'strict'
+ _result_map = { 'crypt': _parsers.Crypt,
+ 'delete': _parsers.DeleteResult,
+ 'generate': _parsers.GenKey,
+ 'import': _parsers.ImportResult,
+ 'list': _parsers.ListKeys,
+ 'sign': _parsers.Sign,
+ 'verify': _parsers.Verify,
+ 'packets': _parsers.ListPackets }
+
+ def __init__(self, binary=None, home=None, keyring=None, secring=None,
+ use_agent=False, default_preference_list=None,
+ verbose=False, options=None):
+ """Create a ``GPGBase``.
+
+ This class is used to set up properties for controlling the behaviour
+ of configuring various options for GnuPG, such as setting GnuPG's
+ **homedir** , and the paths to its **binary** and **keyring** .
+
+ :const binary: (:obj:`str`) The full path to the GnuPG binary.
+
+ :ivar homedir: (:class:`~gnupg._util.InheritableProperty`) The full
+ path to the current setting for the GnuPG
+ ``--homedir``.
+
+ :ivar _generated_keys: (:class:`~gnupg._util.InheritableProperty`)
+ Controls setting the directory for storing any
+ keys which are generated with
+ :meth:`~gnupg.GPG.gen_key`.
+
+ :ivar str keyring: The filename in **homedir** to use as the keyring
+ file for public keys.
+ :ivar str secring: The filename in **homedir** to use as the keyring
+ file for secret keys.
+ """
+ self.binary = _util._find_binary(binary)
+ self.homedir = home if home else _util._conf
+ pub = _parsers._fix_unsafe(keyring) if keyring else 'pubring.gpg'
+ sec = _parsers._fix_unsafe(secring) if secring else 'secring.gpg'
+ self.keyring = os.path.join(self._homedir, pub)
+ self.secring = os.path.join(self._homedir, sec)
+ self.options = _parsers._sanitise(options) if options else None
+
+ if default_preference_list:
+ self._prefs = _check_preferences(default_preference_list, 'all')
+ else:
+ self._prefs = 'SHA512 SHA384 SHA256 AES256 CAMELLIA256 TWOFISH'
+ self._prefs += ' AES192 ZLIB ZIP Uncompressed'
+
+ encoding = locale.getpreferredencoding()
+ if encoding is None: # This happens on Jython!
+ encoding = sys.stdin.encoding
+ self._encoding = encoding.lower().replace('-', '_')
+ self._filesystemencoding = encodings.normalize_encoding(
+ sys.getfilesystemencoding().lower())
+
+ self._keyserver = 'hkp://wwwkeys.pgp.net'
+ self.__generated_keys = os.path.join(self.homedir, 'generated-keys')
+
+ try:
+ assert self.binary, "Could not find binary %s" % binary
+ assert isinstance(verbose, (bool, str, int)), \
+ "'verbose' must be boolean, string, or 0 <= n <= 9"
+ assert isinstance(use_agent, bool), "'use_agent' must be boolean"
+ if self.options is not None:
+ assert isinstance(self.options, str), "options not string"
+ except (AssertionError, AttributeError) as ae:
+ log.error("GPGBase.__init__(): %s" % str(ae))
+ raise RuntimeError(str(ae))
+ else:
+ if verbose is True:
+ # The caller wants logging, but we need a valid --debug-level
+ # for gpg. Default to "basic", and warn about the ambiguity.
+ # (garrettr)
+ verbose = "basic"
+ log.warning('GPG(verbose=True) is ambiguous, defaulting to "basic" logging')
+ self.verbose = verbose
+ self.use_agent = use_agent
+
+ if hasattr(self, '_agent_proc') \
+ and getattr(self, '_remove_agent', None) is True:
+ if hasattr(self, '__remove_path__'):
+ self.__remove_path__('pinentry')
+
+ def __remove_path__(self, prog=None, at_exit=True):
+ """Remove the directories containing a program from the system's
+ ``$PATH``. If ``GPGBase.binary`` is in a directory being removed, it
+ is linked to :file:'./gpg' in the current directory.
+
+ :param str prog: The program to remove from ``$PATH``.
+ :param bool at_exit: Add the program back into the ``$PATH`` when the
+ Python interpreter exits, and delete any symlinks
+ to ``GPGBase.binary`` which were created.
+ """
+ #: A list of ``$PATH`` entries which were removed to disable pinentry.
+ self._removed_path_entries = []
+
+ log.debug("Attempting to remove %s from system PATH" % str(prog))
+ if (prog is None) or (not isinstance(prog, str)): return
+
+ try:
+ program = _util._which(prog)[0]
+ except (OSError, IOError, IndexError) as err:
+ log.err(str(err))
+ log.err("Cannot find program '%s', not changing PATH." % prog)
+ return
+
+ ## __remove_path__ cannot be an @classmethod in GPGMeta, because
+ ## the use_agent attribute must be set by the instance.
+ if not self.use_agent:
+ program_base = os.path.dirname(prog)
+ gnupg_base = os.path.dirname(self.binary)
+
+ ## symlink our gpg binary into $PWD if the path we are removing is
+ ## the one which contains our gpg executable:
+ new_gpg_location = os.path.join(os.getcwd(), 'gpg')
+ if gnupg_base == program_base:
+ os.symlink(self.binary, new_gpg_location)
+ self.binary = new_gpg_location
+
+ ## copy the original environment so that we can put it back later:
+ env_copy = os.environ ## this one should not be touched
+ path_copy = os.environ.pop('PATH')
+ log.debug("Created a copy of system PATH: %r" % path_copy)
+ assert not os.environ.has_key('PATH'), "OS env kept $PATH anyway!"
+
+ @staticmethod
+ def remove_program_from_path(path, prog_base):
+ """Remove all directories which contain a program from PATH.
+
+ :param str path: The contents of the system environment's
+ ``$PATH``.
+
+ :param str prog_base: The directory portion of a program's
+ location, without the trailing slash,
+ and without the program name. For
+ example, ``prog_base='/usr/bin'``.
+ """
+ paths = path.split(':')
+ for directory in paths:
+ if directory == prog_base:
+ log.debug("Found directory with target program: %s"
+ % directory)
+ path.remove(directory)
+ self._removed_path_entries.append(directory)
+ log.debug("Deleted all found instance of %s." % directory)
+ log.debug("PATH is now:%s%s" % (os.linesep, path))
+ new_path = ':'.join([p for p in path])
+ return new_path
+
+ @staticmethod
+ def update_path(environment, path):
+ """Add paths to the string at ``os.environ['PATH']``.
+
+ :param str environment: The environment mapping to update.
+ :param list path: A list of strings to update the PATH with.
+ """
+ log.debug("Updating system path...")
+ os.environ = environment
+ new_path = ':'.join([p for p in path])
+ old = ''
+ if 'PATH' in os.environ:
+ new_path = ':'.join([os.environ['PATH'], new_path])
+ os.environ.update({'PATH': new_path})
+ log.debug("System $PATH: %s" % os.environ['PATH'])
+
+ modified_path = remove_program_from_path(path_copy, program_base)
+ update_path(env_copy, modified_path)
+
+ ## register an _exithandler with the python interpreter:
+ atexit.register(update_path, env_copy, path_copy)
+
+ def remove_symlinked_binary(symlink):
+ if os.path.islink(symlink):
+ os.unlink(symlink)
+ log.debug("Removed binary symlink '%s'" % symlink)
+ atexit.register(remove_symlinked_binary, new_gpg_location)
+
+ @property
+ def default_preference_list(self):
+ """Get the default preference list."""
+ return self._prefs
+
+ @default_preference_list.setter
+ def default_preference_list(self, prefs):
+ """Set the default preference list.
+
+ :param str prefs: A string containing the default preferences for
+ ciphers, digests, and compression algorithms.
+ """
+ prefs = _check_preferences(prefs)
+ if prefs is not None:
+ self._prefs = prefs
+
+ @default_preference_list.deleter
+ def default_preference_list(self):
+ """Reset the default preference list to its original state.
+
+ Note that "original state" does not mean the default preference
+ list for whichever version of GnuPG is being used. It means the
+ default preference list defined by :attr:`GPGBase._prefs`.
+
+ Using BZIP2 is avoided due to not interacting well with some versions
+ of GnuPG>=2.0.0.
+ """
+ self._prefs = 'SHA512 SHA384 SHA256 AES256 CAMELLIA256 TWOFISH ZLIB ZIP'
+
+ @property
+ def keyserver(self):
+ """Get the current keyserver setting."""
+ return self._keyserver
+
+ @keyserver.setter
+ def keyserver(self, location):
+ """Set the default keyserver to use for sending and receiving keys.
+
+ The ``location`` is sent to :func:`_parsers._check_keyserver` when
+ option are parsed in :meth:`gnupg.GPG._make_options`.
+
+ :param str location: A string containing the default keyserver. This
+ should contain the desired keyserver protocol
+ which is supported by the keyserver, for example,
+ ``'hkps://keys.mayfirst.org'``. The default
+ keyserver is ``'hkp://wwwkeys.pgp.net'``.
+ """
+ self._keyserver = location
+
+ @keyserver.deleter
+ def keyserver(self):
+ """Reset the keyserver to the default setting."""
+ self._keyserver = 'hkp://wwwkeys.pgp.net'
+
+ def _homedir_getter(self):
+ """Get the directory currently being used as GnuPG's homedir.
+
+ If unspecified, use :file:`~/.config/python-gnupg/`
+
+ :rtype: str
+ :returns: The absolute path to the current GnuPG homedir.
+ """
+ return self._homedir
+
+ def _homedir_setter(self, directory):
+ """Set the directory to use as GnuPG's homedir.
+
+ If unspecified, use $HOME/.config/python-gnupg. If specified, ensure
+ that the ``directory`` does not contain various shell escape
+ characters. If ``directory`` is not found, it will be automatically
+ created. Lastly, the ``direcory`` will be checked that the EUID has
+ read and write permissions for it.
+
+ :param str directory: A relative or absolute path to the directory to
+ use for storing/accessing GnuPG's files, including
+ keyrings and the trustdb.
+ :raises: :exc:`~exceptions.RuntimeError` if unable to find a suitable
+ directory to use.
+ """
+ if not directory:
+ log.debug("GPGBase._homedir_setter(): Using default homedir: '%s'"
+ % _util._conf)
+ directory = _util._conf
+
+ hd = _parsers._fix_unsafe(directory)
+ log.debug("GPGBase._homedir_setter(): got directory '%s'" % hd)
+
+ if hd:
+ log.debug("GPGBase._homedir_setter(): Check existence of '%s'" % hd)
+ _util._create_if_necessary(hd)
+
+ try:
+ log.debug("GPGBase._homedir_setter(): checking permissions")
+ assert _util._has_readwrite(hd), \
+ "Homedir '%s' needs read/write permissions" % hd
+ except AssertionError as ae:
+ msg = ("Unable to set '%s' as GnuPG homedir" % directory)
+ log.debug("GPGBase.homedir.setter(): %s" % msg)
+ log.debug(str(ae))
+ raise RuntimeError(str(ae))
+ else:
+ log.info("Setting homedir to '%s'" % hd)
+ self._homedir = hd
+
+ homedir = _util.InheritableProperty(_homedir_getter, _homedir_setter)
+
+ def _generated_keys_getter(self):
+ """Get the ``homedir`` subdirectory for storing generated keys.
+
+ :rtype: str
+ :returns: The absolute path to the current GnuPG homedir.
+ """
+ return self.__generated_keys
+
+ def _generated_keys_setter(self, directory):
+ """Set the directory for storing generated keys.
+
+ If unspecified, use
+ :meth:`~gnupg._meta.GPGBase.homedir`/generated-keys. If specified,
+ ensure that the ``directory`` does not contain various shell escape
+ characters. If ``directory`` isn't found, it will be automatically
+ created. Lastly, the ``directory`` will be checked to ensure that the
+ current EUID has read and write permissions for it.
+
+ :param str directory: A relative or absolute path to the directory to
+ use for storing/accessing GnuPG's files, including keyrings and
+ the trustdb.
+ :raises: :exc:`~exceptions.RuntimeError` if unable to find a suitable
+ directory to use.
+ """
+ if not directory:
+ directory = os.path.join(self.homedir, 'generated-keys')
+ log.debug("GPGBase._generated_keys_setter(): Using '%s'"
+ % directory)
+
+ hd = _parsers._fix_unsafe(directory)
+ log.debug("GPGBase._generated_keys_setter(): got directory '%s'" % hd)
+
+ if hd:
+ log.debug("GPGBase._generated_keys_setter(): Check exists '%s'"
+ % hd)
+ _util._create_if_necessary(hd)
+
+ try:
+ log.debug("GPGBase._generated_keys_setter(): check permissions")
+ assert _util._has_readwrite(hd), \
+ "Keys dir '%s' needs read/write permissions" % hd
+ except AssertionError as ae:
+ msg = ("Unable to set '%s' as generated keys dir" % directory)
+ log.debug("GPGBase._generated_keys_setter(): %s" % msg)
+ log.debug(str(ae))
+ raise RuntimeError(str(ae))
+ else:
+ log.info("Setting homedir to '%s'" % hd)
+ self.__generated_keys = hd
+
+ _generated_keys = _util.InheritableProperty(_generated_keys_getter,
+ _generated_keys_setter)
+
+ def _make_args(self, args, passphrase=False):
+ """Make a list of command line elements for GPG.
+
+ The value of ``args`` will be appended only if it passes the checks in
+ :func:`gnupg._parsers._sanitise`. The ``passphrase`` argument needs to
+ be True if a passphrase will be sent to GnuPG, else False.
+
+ :param list args: A list of strings of options and flags to pass to
+ ``GPG.binary``. This is input safe, meaning that
+ these values go through strict checks (see
+ ``parsers._sanitise_list``) before being passed to to
+ the input file descriptor for the GnuPG process.
+ Each string should be given exactly as it would be on
+ the commandline interface to GnuPG,
+ e.g. ["--cipher-algo AES256", "--default-key
+ A3ADB67A2CDB8B35"].
+
+ :param bool passphrase: If True, the passphrase will be sent to the
+ stdin file descriptor for the attached GnuPG
+ process.
+ """
+ ## see TODO file, tag :io:makeargs:
+ cmd = [self.binary,
+ '--no-options --no-emit-version --no-tty --status-fd 2']
+
+ if self.homedir: cmd.append('--homedir "%s"' % self.homedir)
+
+ if self.keyring:
+ cmd.append('--no-default-keyring --keyring %s' % self.keyring)
+ if self.secring:
+ cmd.append('--secret-keyring %s' % self.secring)
+
+ if passphrase: cmd.append('--batch --passphrase-fd 0')
+
+ if self.use_agent: cmd.append('--use-agent')
+ else: cmd.append('--no-use-agent')
+
+ if self.options:
+ [cmd.append(opt) for opt in iter(_sanitise_list(self.options))]
+ if args:
+ [cmd.append(arg) for arg in iter(_sanitise_list(args))]
+
+ if self.verbose:
+ cmd.append('--debug-all')
+ if ((isinstance(self.verbose, str) and
+ self.verbose in ['basic', 'advanced', 'expert', 'guru'])
+ or (isinstance(self.verbose, int) and (1<=self.verbose<=9))):
+ cmd.append('--debug-level %s' % self.verbose)
+
+ return cmd
+
+ def _open_subprocess(self, args=None, passphrase=False):
+ """Open a pipe to a GPG subprocess and return the file objects for
+ communicating with it.
+
+ :param list args: A list of strings of options and flags to pass to
+ ``GPG.binary``. This is input safe, meaning that
+ these values go through strict checks (see
+ ``parsers._sanitise_list``) before being passed to to
+ the input file descriptor for the GnuPG process.
+ Each string should be given exactly as it would be on
+ the commandline interface to GnuPG,
+ e.g. ["--cipher-algo AES256", "--default-key
+ A3ADB67A2CDB8B35"].
+
+ :param bool passphrase: If True, the passphrase will be sent to the
+ stdin file descriptor for the attached GnuPG
+ process.
+ """
+ ## see http://docs.python.org/2/library/subprocess.html#converting-an\
+ ## -argument-sequence-to-a-string-on-windows
+ cmd = ' '.join(self._make_args(args, passphrase))
+ log.debug("Sending command to GnuPG process:%s%s" % (os.linesep, cmd))
+ return subprocess.Popen(cmd, shell=True, stdin=subprocess.PIPE,
+ stdout=subprocess.PIPE, stderr=subprocess.PIPE,
+ env={'LANGUAGE': 'en'})
+
+ def _read_response(self, stream, result):
+ """Reads all the stderr output from GPG, taking notice only of lines
+ that begin with the magic [GNUPG:] prefix.
+
+ Calls methods on the response object for each valid token found, with
+ the arg being the remainder of the status line.
+
+ :param stream: A byte-stream, file handle, or a
+ :data:`subprocess.PIPE` for parsing the status codes
+ from the GnuPG process.
+
+ :param result: The result parser class from :mod:`~gnupg._parsers` ―
+ the ``handle_status()`` method of that class will be
+ called in order to parse the output of ``stream``.
+ """
+ lines = []
+ while True:
+ line = stream.readline()
+ if len(line) == 0:
+ break
+ lines.append(line)
+ line = line.rstrip()
+
+ if line.startswith('[GNUPG:]'):
+ line = _util._deprefix(line, '[GNUPG:] ', log.status)
+ keyword, value = _util._separate_keyword(line)
+ result._handle_status(keyword, value)
+ elif line.startswith('gpg:'):
+ line = _util._deprefix(line, 'gpg: ')
+ keyword, value = _util._separate_keyword(line)
+
+ # Log gpg's userland messages at our own levels:
+ if keyword.upper().startswith("WARNING"):
+ log.warn("%s" % value)
+ elif keyword.upper().startswith("FATAL"):
+ log.critical("%s" % value)
+ # Handle the gpg2 error where a missing trustdb.gpg is,
+ # for some stupid reason, considered fatal:
+ if value.find("trustdb.gpg") and value.find("No such file"):
+ result._handle_status('NEED_TRUSTDB', '')
+ else:
+ if self.verbose:
+ log.info("%s" % line)
+ else:
+ log.debug("%s" % line)
+ result.stderr = ''.join(lines)
+
+ def _read_data(self, stream, result):
+ """Incrementally read from ``stream`` and store read data.
+
+ All data gathered from calling ``stream.read()`` will be concatenated
+ and stored as ``result.data``.
+
+ :param stream: An open file-like object to read() from.
+ :param result: An instance of one of the :ref:`result parsing classes
+ <parsers>` from :const:`~gnupg._meta.GPGBase._result_map`.
+ """
+ chunks = []
+ log.debug("Reading data from stream %r..." % stream.__repr__())
+
+ while True:
+ data = stream.read(1024)
+ if len(data) == 0:
+ break
+ chunks.append(data)
+ log.debug("Read %4d bytes" % len(data))
+
+ # Join using b'' or '', as appropriate
+ result.data = type(data)().join(chunks)
+ log.debug("Finishing reading from stream %r..." % stream.__repr__())
+ log.debug("Read %4d bytes total" % len(result.data))
+
+ def _collect_output(self, process, result, writer=None, stdin=None):
+ """Drain the subprocesses output streams, writing the collected output
+ to the result. If a writer thread (writing to the subprocess) is given,
+ make sure it's joined before returning. If a stdin stream is given,
+ close it before returning.
+ """
+ stderr = codecs.getreader(self._encoding)(process.stderr)
+ rr = threading.Thread(target=self._read_response,
+ args=(stderr, result))
+ rr.setDaemon(True)
+ log.debug('stderr reader: %r', rr)
+ rr.start()
+
+ stdout = process.stdout
+ dr = threading.Thread(target=self._read_data, args=(stdout, result))
+ dr.setDaemon(True)
+ log.debug('stdout reader: %r', dr)
+ dr.start()
+
+ dr.join()
+ rr.join()
+ if writer is not None:
+ writer.join()
+ process.wait()
+ if stdin is not None:
+ try:
+ stdin.close()
+ except IOError:
+ pass
+ stderr.close()
+ stdout.close()
+
+ def _handle_io(self, args, file, result, passphrase=False, binary=False):
+ """Handle a call to GPG - pass input data, collect output data."""
+ p = self._open_subprocess(args, passphrase)
+ if not binary:
+ stdin = codecs.getwriter(self._encoding)(p.stdin)
+ else:
+ stdin = p.stdin
+ if passphrase:
+ _util._write_passphrase(stdin, passphrase, self._encoding)
+ writer = _util._threaded_copy_data(file, stdin)
+ self._collect_output(p, result, writer, stdin)
+ return result
+
+ def _recv_keys(self, keyids, keyserver=None):
+ """Import keys from a keyserver.
+
+ :param str keyids: A space-delimited string containing the keyids to
+ request.
+ :param str keyserver: The keyserver to request the ``keyids`` from;
+ defaults to `gnupg.GPG.keyserver`.
+ """
+ if not keyserver:
+ keyserver = self.keyserver
+
+ args = ['--keyserver {0}'.format(keyserver),
+ '--recv-keys {0}'.format(keyids)]
+ log.info('Requesting keys from %s: %s' % (keyserver, keyids))
+
+ result = self._result_map['import'](self)
+ proc = self._open_subprocess(args)
+ self._collect_output(proc, result)
+ log.debug('recv_keys result: %r', result.__dict__)
+ return result
+
+ def _sign_file(self, file, default_key=None, passphrase=None,
+ clearsign=True, detach=False, binary=False,
+ digest_algo='SHA512'):
+ """Create a signature for a file.
+
+ :param file: The file stream (i.e. it's already been open()'d) to sign.
+ :param str default_key: The key to sign with.
+ :param str passphrase: The passphrase to pipe to stdin.
+ :param bool clearsign: If True, create a cleartext signature.
+ :param bool detach: If True, create a detached signature.
+ :param bool binary: If True, do not ascii armour the output.
+ :param str digest_algo: The hash digest to use. Again, to see which
+ hashes your GnuPG is capable of using, do:
+ ``$ gpg --with-colons --list-config
+ digestname``. The default, if unspecified, is
+ ``'SHA512'``.
+ """
+ log.debug("_sign_file():")
+ if binary:
+ log.info("Creating binary signature for file %s" % file)
+ args = ['--sign']
+ else:
+ log.info("Creating ascii-armoured signature for file %s" % file)
+ args = ['--sign --armor']
+
+ if clearsign:
+ args.append("--clearsign")
+ if detach:
+ log.warn("Cannot use both --clearsign and --detach-sign.")
+ log.warn("Using default GPG behaviour: --clearsign only.")
+ elif detach and not clearsign:
+ args.append("--detach-sign")
+
+ if default_key:
+ args.append(str("--default-key %s" % default_key))
+
+ args.append(str("--digest-algo %s" % digest_algo))
+
+ ## We could use _handle_io here except for the fact that if the
+ ## passphrase is bad, gpg bails and you can't write the message.
+ result = self._result_map['sign'](self)
+ proc = self._open_subprocess(args, passphrase is not None)
+ try:
+ if passphrase:
+ _util._write_passphrase(proc.stdin, passphrase, self._encoding)
+ writer = _util._threaded_copy_data(file, proc.stdin)
+ except IOError as ioe:
+ log.exception("Error writing message: %s" % str(ioe))
+ writer = None
+ self._collect_output(proc, result, writer, proc.stdin)
+ return result
+
+ def _encrypt(self, data, recipients,
+ default_key=None,
+ passphrase=None,
+ armor=True,
+ encrypt=True,
+ symmetric=False,
+ always_trust=True,
+ output=None,
+ cipher_algo='AES256',
+ digest_algo='SHA512',
+ compress_algo='ZLIB'):
+ """Encrypt the message read from the file-like object **data**.
+
+ :param str data: The file or bytestream to encrypt.
+
+ :param str recipients: The recipients to encrypt to. Recipients must
+ be specified keyID/fingerprint.
+
+ .. warning:: Care should be taken in Python2 to make sure that the
+ given fingerprints for **recipients** are in fact strings
+ and not unicode objects.
+
+ :param str default_key: The keyID/fingerprint of the key to use for
+ signing. If given, **data** will be encrypted
+ *and* signed.
+
+ :param str passphrase: If given, and **default_key** is also given,
+ use this passphrase to unlock the secret
+ portion of the **default_key** to sign the
+ encrypted **data**. Otherwise, if
+ **default_key** is not given, but **symmetric**
+ is ``True``, then use this passphrase as the
+ passphrase for symmetric encryption. Signing
+ and symmetric encryption should *not* be
+ combined when sending the **data** to other
+ recipients, else the passphrase to the secret
+ key would be shared with them.
+
+ :param bool armor: If True, ascii armor the output; otherwise, the
+ output will be in binary format. (Default: True)
+
+ :param bool encrypt: If True, encrypt the **data** using the
+ **recipients** public keys. (Default: True)
+
+ :param bool symmetric: If True, encrypt the **data** to **recipients**
+ using a symmetric key. See the **passphrase**
+ parameter. Symmetric encryption and public key
+ encryption can be used simultaneously, and will
+ result in a ciphertext which is decryptable
+ with either the symmetric **passphrase** or one
+ of the corresponding private keys.
+
+ :param bool always_trust: If True, ignore trust warnings on
+ **recipients** keys. If False, display trust
+ warnings. (default: True)
+
+ :param str output: The output file to write to. If not specified, the
+ encrypted output is returned, and thus should be
+ stored as an object in Python. For example:
+
+
+ >>> import shutil
+ >>> import gnupg
+ >>> if os.path.exists("doctests"):
+ ... shutil.rmtree("doctests")
+ >>> gpg = gnupg.GPG(homedir="doctests")
+ >>> key_settings = gpg.gen_key_input(key_type='RSA',
+ ... key_length=1024,
+ ... key_usage='ESCA',
+ ... passphrase='foo')
+ >>> key = gpg.gen_key(key_settings)
+ >>> message = "The crow flies at midnight."
+ >>> encrypted = str(gpg.encrypt(message, key.printprint))
+ >>> assert encrypted != message
+ >>> assert not encrypted.isspace()
+ >>> decrypted = str(gpg.decrypt(encrypted))
+ >>> assert not decrypted.isspace()
+ >>> decrypted
+ 'The crow flies at midnight.'
+
+ :param str cipher_algo: The cipher algorithm to use. To see available
+ algorithms with your version of GnuPG, do:
+ :command:`$ gpg --with-colons --list-config
+ ciphername`. The default **cipher_algo**, if
+ unspecified, is ``'AES256'``.
+
+ :param str digest_algo: The hash digest to use. Again, to see which
+ hashes your GnuPG is capable of using, do:
+ :command:`$ gpg --with-colons --list-config
+ digestname`. The default, if unspecified, is
+ ``'SHA512'``.
+
+ :param str compress_algo: The compression algorithm to use. Can be one
+ of ``'ZLIB'``, ``'BZIP2'``, ``'ZIP'``, or
+ ``'Uncompressed'``.
+ """
+ args = []
+
+ if output:
+ if getattr(output, 'fileno', None) is not None:
+ ## avoid overwrite confirmation message
+ if getattr(output, 'name', None) is None:
+ if os.path.exists(output):
+ os.remove(output)
+ args.append('--output %s' % output)
+ else:
+ if os.path.exists(output.name):
+ os.remove(output.name)
+ args.append('--output %s' % output.name)
+
+ if armor: args.append('--armor')
+ if always_trust: args.append('--always-trust')
+ if cipher_algo: args.append('--cipher-algo %s' % cipher_algo)
+ if compress_algo: args.append('--compress-algo %s' % compress_algo)
+
+ if default_key:
+ args.append('--sign')
+ args.append('--default-key %s' % default_key)
+ if digest_algo:
+ args.append('--digest-algo %s' % digest_algo)
+
+ ## both can be used at the same time for an encrypted file which
+ ## is decryptable with a passphrase or secretkey.
+ if symmetric: args.append('--symmetric')
+ if encrypt: args.append('--encrypt')
+
+ if len(recipients) >= 1:
+ log.debug("GPG.encrypt() called for recipients '%s' with type '%s'"
+ % (recipients, type(recipients)))
+
+ if isinstance(recipients, (list, tuple)):
+ for recp in recipients:
+ if not _util._py3k:
+ if isinstance(recp, unicode):
+ try:
+ assert _parsers._is_hex(str(recp))
+ except AssertionError:
+ log.info("Can't accept recipient string: %s"
+ % recp)
+ else:
+ args.append('--recipient %s' % str(recp))
+ continue
+ ## will give unicode in 2.x as '\uXXXX\uXXXX'
+ args.append('--recipient %r' % recp)
+ continue
+ if isinstance(recp, str):
+ args.append('--recipient %s' % recp)
+
+ elif (not _util._py3k) and isinstance(recp, basestring):
+ for recp in recipients.split('\x20'):
+ args.append('--recipient %s' % recp)
+
+ elif _util._py3k and isinstance(recp, str):
+ for recp in recipients.split(' '):
+ args.append('--recipient %s' % recp)
+ ## ...and now that we've proven py3k is better...
+
+ else:
+ log.debug("Don't know what to do with recipients: '%s'"
+ % recipients)
+
+ result = self._result_map['crypt'](self)
+ log.debug("Got data '%s' with type '%s'."
+ % (data, type(data)))
+ self._handle_io(args, data, result,
+ passphrase=passphrase, binary=True)
+ log.debug("\n%s" % result.data)
+ return result