From a5d46a4e38985be540b9127ddcd3d8e21bbecb9a Mon Sep 17 00:00:00 2001 From: Kali Kaneko Date: Mon, 8 Jun 2015 16:46:11 -0400 Subject: Imported Upstream version 2.0.2 --- gnupg/_meta.py | 250 +++++++++++++++++++++++++++++++++++++++++++++------------ 1 file changed, 198 insertions(+), 52 deletions(-) (limited to 'gnupg/_meta.py') diff --git a/gnupg/_meta.py b/gnupg/_meta.py index 3aafacd..32ab287 100644 --- a/gnupg/_meta.py +++ b/gnupg/_meta.py @@ -32,14 +32,22 @@ import encodings import locale import os import platform -import psutil import shlex import subprocess import sys import threading +## Using psutil is recommended, but since the extension doesn't run with the +## PyPy interpreter, we'll run even if it's not present. +try: + import psutil +except ImportError: + psutil = None + from . import _parsers from . import _util +from ._util import b +from ._util import s from ._parsers import _check_preferences from ._parsers import _sanitise_list @@ -75,19 +83,49 @@ class GPGMeta(type): instance containing the gpg-agent process' information to ``cls._agent_proc``. + For Unix systems, we check that the effective UID of this + ``python-gnupg`` process is also the owner of the gpg-agent + process. For Windows, we check that the usernames of the owners are + the same. (Sorry Windows users; maybe you should switch to anything + else.) + + .. note: This function will only run if the psutil_ Python extension + is installed. Because psutil won't run with the PyPy interpreter, + use of it is optional (although highly recommended). + + .. _psutil: https://pypi.python.org/pypi/psutil + :returns: True if there exists a gpg-agent process running under the same effective user ID as that of this program. Otherwise, - returns None. + returns False. """ - identity = psutil.Process(os.getpid()).uids + if not psutil: + return False + + this_process = psutil.Process(os.getpid()) + ownership_match = False + + if _util._running_windows: + identity = this_process.username() + else: + identity = this_process.uids + for proc in psutil.process_iter(): if (proc.name == "gpg-agent") and proc.is_running: log.debug("Found gpg-agent process with pid %d" % proc.pid) - if proc.uids == identity: - log.debug( - "Effective UIDs of this process and gpg-agent match") - setattr(cls, '_agent_proc', proc) - return True + if _util._running_windows: + if proc.username() == identity: + ownership_match = True + else: + if proc.uids == identity: + ownership_match = True + + if ownership_match: + log.debug("Effective UIDs of this process and gpg-agent match") + setattr(cls, '_agent_proc', proc) + return True + + return False class GPGBase(object): @@ -111,7 +149,7 @@ class GPGBase(object): def __init__(self, binary=None, home=None, keyring=None, secring=None, use_agent=False, default_preference_list=None, - verbose=False, options=None): + ignore_homedir_permissions=False, verbose=False, options=None): """Create a ``GPGBase``. This class is used to set up properties for controlling the behaviour @@ -134,13 +172,18 @@ class GPGBase(object): :ivar str secring: The filename in **homedir** to use as the keyring file for secret keys. """ + self.ignore_homedir_permissions = ignore_homedir_permissions self.binary = _util._find_binary(binary) - self.homedir = home if home else _util._conf + self.homedir = os.path.expanduser(home) if home else _util._conf pub = _parsers._fix_unsafe(keyring) if keyring else 'pubring.gpg' sec = _parsers._fix_unsafe(secring) if secring else 'secring.gpg' self.keyring = os.path.join(self._homedir, pub) self.secring = os.path.join(self._homedir, sec) - self.options = _parsers._sanitise(options) if options else None + self.options = list(_parsers._sanitise_list(options)) if options else None + + #: The version string of our GnuPG binary + self.binary_version = '0.0.0' + self.verbose = False if default_preference_list: self._prefs = _check_preferences(default_preference_list, 'all') @@ -155,6 +198,14 @@ class GPGBase(object): self._filesystemencoding = encodings.normalize_encoding( sys.getfilesystemencoding().lower()) + # Issue #49: https://github.com/isislovecruft/python-gnupg/issues/49 + # + # During `line = stream.readline()` in `_read_response()`, the Python + # codecs module will choke on Unicode data, so we globally monkeypatch + # the "strict" error handler to use the builtin `replace_errors` + # handler: + codecs.register_error('strict', codecs.replace_errors) + self._keyserver = 'hkp://wwwkeys.pgp.net' self.__generated_keys = os.path.join(self.homedir, 'generated-keys') @@ -164,18 +215,12 @@ class GPGBase(object): "'verbose' must be boolean, string, or 0 <= n <= 9" assert isinstance(use_agent, bool), "'use_agent' must be boolean" if self.options is not None: - assert isinstance(self.options, str), "options not string" + assert isinstance(self.options, list), "options not list" except (AssertionError, AttributeError) as ae: log.error("GPGBase.__init__(): %s" % str(ae)) raise RuntimeError(str(ae)) else: - if verbose is True: - # The caller wants logging, but we need a valid --debug-level - # for gpg. Default to "basic", and warn about the ambiguity. - # (garrettr) - verbose = "basic" - log.warning('GPG(verbose=True) is ambiguous, defaulting to "basic" logging') - self.verbose = verbose + self._set_verbose(verbose) self.use_agent = use_agent if hasattr(self, '_agent_proc') \ @@ -183,6 +228,9 @@ class GPGBase(object): if hasattr(self, '__remove_path__'): self.__remove_path__('pinentry') + # Assign our self.binary_version attribute: + self._check_sane_and_get_gpg_version() + def __remove_path__(self, prog=None, at_exit=True): """Remove the directories containing a program from the system's ``$PATH``. If ``GPGBase.binary`` is in a directory being removed, it @@ -368,18 +416,21 @@ class GPGBase(object): log.debug("GPGBase._homedir_setter(): Check existence of '%s'" % hd) _util._create_if_necessary(hd) - try: - log.debug("GPGBase._homedir_setter(): checking permissions") - assert _util._has_readwrite(hd), \ - "Homedir '%s' needs read/write permissions" % hd - except AssertionError as ae: - msg = ("Unable to set '%s' as GnuPG homedir" % directory) - log.debug("GPGBase.homedir.setter(): %s" % msg) - log.debug(str(ae)) - raise RuntimeError(str(ae)) - else: - log.info("Setting homedir to '%s'" % hd) + if self.ignore_homedir_permissions: self._homedir = hd + else: + try: + log.debug("GPGBase._homedir_setter(): checking permissions") + assert _util._has_readwrite(hd), \ + "Homedir '%s' needs read/write permissions" % hd + except AssertionError as ae: + msg = ("Unable to set '%s' as GnuPG homedir" % directory) + log.debug("GPGBase.homedir.setter(): %s" % msg) + log.debug(str(ae)) + raise RuntimeError(str(ae)) + else: + log.info("Setting homedir to '%s'" % hd) + self._homedir = hd homedir = _util.InheritableProperty(_homedir_getter, _homedir_setter) @@ -436,6 +487,24 @@ class GPGBase(object): _generated_keys = _util.InheritableProperty(_generated_keys_getter, _generated_keys_setter) + def _check_sane_and_get_gpg_version(self): + """Check that everything runs alright, and grab the gpg binary's + version number while we're at it, storing it as :data:`binary_version`. + + :raises RuntimeError: if we cannot invoke the gpg binary. + """ + proc = self._open_subprocess(["--list-config", "--with-colons"]) + result = self._result_map['list'](self) + self._read_data(proc.stdout, result) + if proc.returncode: + raise RuntimeError("Error invoking gpg: %s" % result.data) + else: + proc.terminate() + + version_line = str(result.data).partition(':version:')[2] + self.binary_version = version_line.split('\n')[0] + log.debug("Using GnuPG version %s" % self.binary_version) + def _make_args(self, args, passphrase=False): """Make a list of command line elements for GPG. @@ -470,21 +539,29 @@ class GPGBase(object): if passphrase: cmd.append('--batch --passphrase-fd 0') - if self.use_agent: cmd.append('--use-agent') - else: cmd.append('--no-use-agent') + if self.use_agent is True: cmd.append('--use-agent') + elif self.use_agent is False: cmd.append('--no-use-agent') + + # The arguments for debugging and verbosity should be placed into the + # cmd list before the options/args in order to resolve Issue #76: + # https://github.com/isislovecruft/python-gnupg/issues/76 + if self.verbose: + cmd.append('--debug-all') + + if (isinstance(self.verbose, str) or + (isinstance(self.verbose, int) and (self.verbose >= 1))): + # GnuPG<=1.4.18 parses the `--debug-level` command in a way + # that is incompatible with all other GnuPG versions. :'( + if self.binary_version and (self.binary_version <= '1.4.18'): + cmd.append('--debug-level=%s' % self.verbose) + else: + cmd.append('--debug-level %s' % self.verbose) if self.options: [cmd.append(opt) for opt in iter(_sanitise_list(self.options))] if args: [cmd.append(arg) for arg in iter(_sanitise_list(args))] - if self.verbose: - cmd.append('--debug-all') - if ((isinstance(self.verbose, str) and - self.verbose in ['basic', 'advanced', 'expert', 'guru']) - or (isinstance(self.verbose, int) and (1<=self.verbose<=9))): - cmd.append('--debug-level %s' % self.verbose) - return cmd def _open_subprocess(self, args=None, passphrase=False): @@ -592,6 +669,36 @@ class GPGBase(object): log.debug("Finishing reading from stream %r..." % stream.__repr__()) log.debug("Read %4d bytes total" % len(result.data)) + def _set_verbose(self, verbose): + """Check and set our :data:`verbose` attribute. + The debug-level must be a string or an integer. If it is one of + the allowed strings, GnuPG will translate it internally to it's + corresponding integer level: + + basic = 1-2 + advanced = 3-5 + expert = 6-8 + guru = 9+ + + If it's not one of the recognised string levels, then then + entire argument is ignored by GnuPG. :( + + To fix that stupid behaviour, if they wanted debugging but typo'd + the string level (or specified ``verbose=True``), we'll default to + 'basic' logging. + """ + string_levels = ('basic', 'advanced', 'expert', 'guru') + + if verbose is True: + # The caller wants logging, but we need a valid --debug-level + # for gpg. Default to "basic", and warn about the ambiguity. + verbose = 'basic' + + if (isinstance(verbose, str) and not (verbose in string_levels)): + verbose = 'basic' + + self.verbose = verbose + def _collect_output(self, process, result, writer=None, stdin=None): """Drain the subprocesses output streams, writing the collected output to the result. If a writer thread (writing to the subprocess) is given, @@ -699,6 +806,19 @@ class GPGBase(object): ## We could use _handle_io here except for the fact that if the ## passphrase is bad, gpg bails and you can't write the message. result = self._result_map['sign'](self) + + ## If the passphrase is an empty string, the message up to and + ## including its first newline will be cut off before making it to the + ## GnuPG process. Therefore, if the passphrase='' or passphrase=b'', + ## we set passphrase=None. See Issue #82: + ## https://github.com/isislovecruft/python-gnupg/issues/82 + if _util._is_string(passphrase): + passphrase = passphrase if len(passphrase) > 0 else None + elif _util._is_bytes(passphrase): + passphrase = s(passphrase) if len(passphrase) > 0 else None + else: + passphrase = None + proc = self._open_subprocess(args, passphrase is not None) try: if passphrase: @@ -718,6 +838,8 @@ class GPGBase(object): symmetric=False, always_trust=True, output=None, + throw_keyids=False, + hidden_recipients=None, cipher_algo='AES256', digest_algo='SHA512', compress_algo='ZLIB'): @@ -790,6 +912,14 @@ class GPGBase(object): >>> decrypted 'The crow flies at midnight.' + + :param bool throw_keyids: If True, make all **recipients** keyids be + zero'd out in packet information. This is the same as using + **hidden_recipients** for all **recipients**. (Default: False). + + :param list hidden_recipients: A list of recipients that should have + their keyids zero'd out in packet information. + :param str cipher_algo: The cipher algorithm to use. To see available algorithms with your version of GnuPG, do: :command:`$ gpg --with-colons --list-config @@ -841,6 +971,7 @@ class GPGBase(object): ## is decryptable with a passphrase or secretkey. if symmetric: args.append('--symmetric') if encrypt: args.append('--encrypt') + if throw_keyids: args.append('--throw-keyids') if len(recipients) >= 1: log.debug("GPG.encrypt() called for recipients '%s' with type '%s'" @@ -856,39 +987,54 @@ class GPGBase(object): log.info("Can't accept recipient string: %s" % recp) else: - args.append('--recipient %s' % str(recp)) + self._add_recipient_string(args, hidden_recipients, str(recp)) continue ## will give unicode in 2.x as '\uXXXX\uXXXX' - args.append('--recipient %r' % recp) + if isinstance(hidden_recipients, (list, tuple)): + if [s for s in hidden_recipients if recp in str(s)]: + args.append('--hidden-recipient %r' % recp) + else: + args.append('--recipient %r' % recp) + else: + args.append('--recipient %r' % recp) continue if isinstance(recp, str): - args.append('--recipient %s' % recp) + self._add_recipient_string(args, hidden_recipients, recp) elif (not _util._py3k) and isinstance(recp, basestring): for recp in recipients.split('\x20'): - args.append('--recipient %s' % recp) + self._add_recipient_string(args, hidden_recipients, recp) elif _util._py3k and isinstance(recp, str): for recp in recipients.split(' '): - args.append('--recipient %s' % recp) + self._add_recipient_string(args, hidden_recipients, recp) ## ...and now that we've proven py3k is better... - else: - log.debug("Don't know what to do with recipients: '%s'" + log.debug("Don't know what to do with recipients: %r" % recipients) result = self._result_map['crypt'](self) - log.debug("Got data '%s' with type '%s'." - % (data, type(data))) - self._handle_io(args, data, result, - passphrase=passphrase, binary=True) - log.debug("\n%s" % result.data) + log.debug("Got data '%s' with type '%s'." % (data, type(data))) + self._handle_io(args, data, result, passphrase=passphrase, binary=True) + # Avoid writing raw encrypted bytes to terminal loggers and breaking + # them in that adorable way where they spew hieroglyphics until reset: + if armor: + log.debug("\n%s" % result.data) if output_filename: log.info("Writing encrypted output to file: %s" % output_filename) - with open(output_filename, 'w+') as fh: + with open(output_filename, 'wb') as fh: fh.write(result.data) fh.flush() log.info("Encrypted output written successfully.") return result + + def _add_recipient_string(self, args, hidden_recipients, recipient): + if isinstance(hidden_recipients, (list, tuple)): + if [s for s in hidden_recipients if recipient in str(s)]: + args.append('--hidden-recipient %s' % recipient) + else: + args.append('--recipient %s' % recipient) + else: + args.append('--recipient %s' % recipient) -- cgit v1.2.3