diff options
Diffstat (limited to 'gnupg')
-rw-r--r-- | gnupg/_meta.py | 250 | ||||
-rw-r--r-- | gnupg/_parsers.py | 276 | ||||
-rw-r--r-- | gnupg/_trust.py | 8 | ||||
-rw-r--r-- | gnupg/_util.py | 270 | ||||
-rw-r--r-- | gnupg/_version.py | 4 | ||||
-rw-r--r-- | gnupg/gnupg.py | 139 |
6 files changed, 725 insertions, 222 deletions
diff --git a/gnupg/_meta.py b/gnupg/_meta.py index 3aafacd..32ab287 100644 --- a/gnupg/_meta.py +++ b/gnupg/_meta.py @@ -32,14 +32,22 @@ import encodings import locale import os import platform -import psutil import shlex import subprocess import sys import threading +## Using psutil is recommended, but since the extension doesn't run with the +## PyPy interpreter, we'll run even if it's not present. +try: + import psutil +except ImportError: + psutil = None + from . import _parsers from . import _util +from ._util import b +from ._util import s from ._parsers import _check_preferences from ._parsers import _sanitise_list @@ -75,19 +83,49 @@ class GPGMeta(type): instance containing the gpg-agent process' information to ``cls._agent_proc``. + For Unix systems, we check that the effective UID of this + ``python-gnupg`` process is also the owner of the gpg-agent + process. For Windows, we check that the usernames of the owners are + the same. (Sorry Windows users; maybe you should switch to anything + else.) + + .. note: This function will only run if the psutil_ Python extension + is installed. Because psutil won't run with the PyPy interpreter, + use of it is optional (although highly recommended). + + .. _psutil: https://pypi.python.org/pypi/psutil + :returns: True if there exists a gpg-agent process running under the same effective user ID as that of this program. Otherwise, - returns None. + returns False. """ - identity = psutil.Process(os.getpid()).uids + if not psutil: + return False + + this_process = psutil.Process(os.getpid()) + ownership_match = False + + if _util._running_windows: + identity = this_process.username() + else: + identity = this_process.uids + for proc in psutil.process_iter(): if (proc.name == "gpg-agent") and proc.is_running: log.debug("Found gpg-agent process with pid %d" % proc.pid) - if proc.uids == identity: - log.debug( - "Effective UIDs of this process and gpg-agent match") - setattr(cls, '_agent_proc', proc) - return True + if _util._running_windows: + if proc.username() == identity: + ownership_match = True + else: + if proc.uids == identity: + ownership_match = True + + if ownership_match: + log.debug("Effective UIDs of this process and gpg-agent match") + setattr(cls, '_agent_proc', proc) + return True + + return False class GPGBase(object): @@ -111,7 +149,7 @@ class GPGBase(object): def __init__(self, binary=None, home=None, keyring=None, secring=None, use_agent=False, default_preference_list=None, - verbose=False, options=None): + ignore_homedir_permissions=False, verbose=False, options=None): """Create a ``GPGBase``. This class is used to set up properties for controlling the behaviour @@ -134,13 +172,18 @@ class GPGBase(object): :ivar str secring: The filename in **homedir** to use as the keyring file for secret keys. """ + self.ignore_homedir_permissions = ignore_homedir_permissions self.binary = _util._find_binary(binary) - self.homedir = home if home else _util._conf + self.homedir = os.path.expanduser(home) if home else _util._conf pub = _parsers._fix_unsafe(keyring) if keyring else 'pubring.gpg' sec = _parsers._fix_unsafe(secring) if secring else 'secring.gpg' self.keyring = os.path.join(self._homedir, pub) self.secring = os.path.join(self._homedir, sec) - self.options = _parsers._sanitise(options) if options else None + self.options = list(_parsers._sanitise_list(options)) if options else None + + #: The version string of our GnuPG binary + self.binary_version = '0.0.0' + self.verbose = False if default_preference_list: self._prefs = _check_preferences(default_preference_list, 'all') @@ -155,6 +198,14 @@ class GPGBase(object): self._filesystemencoding = encodings.normalize_encoding( sys.getfilesystemencoding().lower()) + # Issue #49: https://github.com/isislovecruft/python-gnupg/issues/49 + # + # During `line = stream.readline()` in `_read_response()`, the Python + # codecs module will choke on Unicode data, so we globally monkeypatch + # the "strict" error handler to use the builtin `replace_errors` + # handler: + codecs.register_error('strict', codecs.replace_errors) + self._keyserver = 'hkp://wwwkeys.pgp.net' self.__generated_keys = os.path.join(self.homedir, 'generated-keys') @@ -164,18 +215,12 @@ class GPGBase(object): "'verbose' must be boolean, string, or 0 <= n <= 9" assert isinstance(use_agent, bool), "'use_agent' must be boolean" if self.options is not None: - assert isinstance(self.options, str), "options not string" + assert isinstance(self.options, list), "options not list" except (AssertionError, AttributeError) as ae: log.error("GPGBase.__init__(): %s" % str(ae)) raise RuntimeError(str(ae)) else: - if verbose is True: - # The caller wants logging, but we need a valid --debug-level - # for gpg. Default to "basic", and warn about the ambiguity. - # (garrettr) - verbose = "basic" - log.warning('GPG(verbose=True) is ambiguous, defaulting to "basic" logging') - self.verbose = verbose + self._set_verbose(verbose) self.use_agent = use_agent if hasattr(self, '_agent_proc') \ @@ -183,6 +228,9 @@ class GPGBase(object): if hasattr(self, '__remove_path__'): self.__remove_path__('pinentry') + # Assign our self.binary_version attribute: + self._check_sane_and_get_gpg_version() + def __remove_path__(self, prog=None, at_exit=True): """Remove the directories containing a program from the system's ``$PATH``. If ``GPGBase.binary`` is in a directory being removed, it @@ -368,18 +416,21 @@ class GPGBase(object): log.debug("GPGBase._homedir_setter(): Check existence of '%s'" % hd) _util._create_if_necessary(hd) - try: - log.debug("GPGBase._homedir_setter(): checking permissions") - assert _util._has_readwrite(hd), \ - "Homedir '%s' needs read/write permissions" % hd - except AssertionError as ae: - msg = ("Unable to set '%s' as GnuPG homedir" % directory) - log.debug("GPGBase.homedir.setter(): %s" % msg) - log.debug(str(ae)) - raise RuntimeError(str(ae)) - else: - log.info("Setting homedir to '%s'" % hd) + if self.ignore_homedir_permissions: self._homedir = hd + else: + try: + log.debug("GPGBase._homedir_setter(): checking permissions") + assert _util._has_readwrite(hd), \ + "Homedir '%s' needs read/write permissions" % hd + except AssertionError as ae: + msg = ("Unable to set '%s' as GnuPG homedir" % directory) + log.debug("GPGBase.homedir.setter(): %s" % msg) + log.debug(str(ae)) + raise RuntimeError(str(ae)) + else: + log.info("Setting homedir to '%s'" % hd) + self._homedir = hd homedir = _util.InheritableProperty(_homedir_getter, _homedir_setter) @@ -436,6 +487,24 @@ class GPGBase(object): _generated_keys = _util.InheritableProperty(_generated_keys_getter, _generated_keys_setter) + def _check_sane_and_get_gpg_version(self): + """Check that everything runs alright, and grab the gpg binary's + version number while we're at it, storing it as :data:`binary_version`. + + :raises RuntimeError: if we cannot invoke the gpg binary. + """ + proc = self._open_subprocess(["--list-config", "--with-colons"]) + result = self._result_map['list'](self) + self._read_data(proc.stdout, result) + if proc.returncode: + raise RuntimeError("Error invoking gpg: %s" % result.data) + else: + proc.terminate() + + version_line = str(result.data).partition(':version:')[2] + self.binary_version = version_line.split('\n')[0] + log.debug("Using GnuPG version %s" % self.binary_version) + def _make_args(self, args, passphrase=False): """Make a list of command line elements for GPG. @@ -470,21 +539,29 @@ class GPGBase(object): if passphrase: cmd.append('--batch --passphrase-fd 0') - if self.use_agent: cmd.append('--use-agent') - else: cmd.append('--no-use-agent') + if self.use_agent is True: cmd.append('--use-agent') + elif self.use_agent is False: cmd.append('--no-use-agent') + + # The arguments for debugging and verbosity should be placed into the + # cmd list before the options/args in order to resolve Issue #76: + # https://github.com/isislovecruft/python-gnupg/issues/76 + if self.verbose: + cmd.append('--debug-all') + + if (isinstance(self.verbose, str) or + (isinstance(self.verbose, int) and (self.verbose >= 1))): + # GnuPG<=1.4.18 parses the `--debug-level` command in a way + # that is incompatible with all other GnuPG versions. :'( + if self.binary_version and (self.binary_version <= '1.4.18'): + cmd.append('--debug-level=%s' % self.verbose) + else: + cmd.append('--debug-level %s' % self.verbose) if self.options: [cmd.append(opt) for opt in iter(_sanitise_list(self.options))] if args: [cmd.append(arg) for arg in iter(_sanitise_list(args))] - if self.verbose: - cmd.append('--debug-all') - if ((isinstance(self.verbose, str) and - self.verbose in ['basic', 'advanced', 'expert', 'guru']) - or (isinstance(self.verbose, int) and (1<=self.verbose<=9))): - cmd.append('--debug-level %s' % self.verbose) - return cmd def _open_subprocess(self, args=None, passphrase=False): @@ -592,6 +669,36 @@ class GPGBase(object): log.debug("Finishing reading from stream %r..." % stream.__repr__()) log.debug("Read %4d bytes total" % len(result.data)) + def _set_verbose(self, verbose): + """Check and set our :data:`verbose` attribute. + The debug-level must be a string or an integer. If it is one of + the allowed strings, GnuPG will translate it internally to it's + corresponding integer level: + + basic = 1-2 + advanced = 3-5 + expert = 6-8 + guru = 9+ + + If it's not one of the recognised string levels, then then + entire argument is ignored by GnuPG. :( + + To fix that stupid behaviour, if they wanted debugging but typo'd + the string level (or specified ``verbose=True``), we'll default to + 'basic' logging. + """ + string_levels = ('basic', 'advanced', 'expert', 'guru') + + if verbose is True: + # The caller wants logging, but we need a valid --debug-level + # for gpg. Default to "basic", and warn about the ambiguity. + verbose = 'basic' + + if (isinstance(verbose, str) and not (verbose in string_levels)): + verbose = 'basic' + + self.verbose = verbose + def _collect_output(self, process, result, writer=None, stdin=None): """Drain the subprocesses output streams, writing the collected output to the result. If a writer thread (writing to the subprocess) is given, @@ -699,6 +806,19 @@ class GPGBase(object): ## We could use _handle_io here except for the fact that if the ## passphrase is bad, gpg bails and you can't write the message. result = self._result_map['sign'](self) + + ## If the passphrase is an empty string, the message up to and + ## including its first newline will be cut off before making it to the + ## GnuPG process. Therefore, if the passphrase='' or passphrase=b'', + ## we set passphrase=None. See Issue #82: + ## https://github.com/isislovecruft/python-gnupg/issues/82 + if _util._is_string(passphrase): + passphrase = passphrase if len(passphrase) > 0 else None + elif _util._is_bytes(passphrase): + passphrase = s(passphrase) if len(passphrase) > 0 else None + else: + passphrase = None + proc = self._open_subprocess(args, passphrase is not None) try: if passphrase: @@ -718,6 +838,8 @@ class GPGBase(object): symmetric=False, always_trust=True, output=None, + throw_keyids=False, + hidden_recipients=None, cipher_algo='AES256', digest_algo='SHA512', compress_algo='ZLIB'): @@ -790,6 +912,14 @@ class GPGBase(object): >>> decrypted 'The crow flies at midnight.' + + :param bool throw_keyids: If True, make all **recipients** keyids be + zero'd out in packet information. This is the same as using + **hidden_recipients** for all **recipients**. (Default: False). + + :param list hidden_recipients: A list of recipients that should have + their keyids zero'd out in packet information. + :param str cipher_algo: The cipher algorithm to use. To see available algorithms with your version of GnuPG, do: :command:`$ gpg --with-colons --list-config @@ -841,6 +971,7 @@ class GPGBase(object): ## is decryptable with a passphrase or secretkey. if symmetric: args.append('--symmetric') if encrypt: args.append('--encrypt') + if throw_keyids: args.append('--throw-keyids') if len(recipients) >= 1: log.debug("GPG.encrypt() called for recipients '%s' with type '%s'" @@ -856,39 +987,54 @@ class GPGBase(object): log.info("Can't accept recipient string: %s" % recp) else: - args.append('--recipient %s' % str(recp)) + self._add_recipient_string(args, hidden_recipients, str(recp)) continue ## will give unicode in 2.x as '\uXXXX\uXXXX' - args.append('--recipient %r' % recp) + if isinstance(hidden_recipients, (list, tuple)): + if [s for s in hidden_recipients if recp in str(s)]: + args.append('--hidden-recipient %r' % recp) + else: + args.append('--recipient %r' % recp) + else: + args.append('--recipient %r' % recp) continue if isinstance(recp, str): - args.append('--recipient %s' % recp) + self._add_recipient_string(args, hidden_recipients, recp) elif (not _util._py3k) and isinstance(recp, basestring): for recp in recipients.split('\x20'): - args.append('--recipient %s' % recp) + self._add_recipient_string(args, hidden_recipients, recp) elif _util._py3k and isinstance(recp, str): for recp in recipients.split(' '): - args.append('--recipient %s' % recp) + self._add_recipient_string(args, hidden_recipients, recp) ## ...and now that we've proven py3k is better... - else: - log.debug("Don't know what to do with recipients: '%s'" + log.debug("Don't know what to do with recipients: %r" % recipients) result = self._result_map['crypt'](self) - log.debug("Got data '%s' with type '%s'." - % (data, type(data))) - self._handle_io(args, data, result, - passphrase=passphrase, binary=True) - log.debug("\n%s" % result.data) + log.debug("Got data '%s' with type '%s'." % (data, type(data))) + self._handle_io(args, data, result, passphrase=passphrase, binary=True) + # Avoid writing raw encrypted bytes to terminal loggers and breaking + # them in that adorable way where they spew hieroglyphics until reset: + if armor: + log.debug("\n%s" % result.data) if output_filename: log.info("Writing encrypted output to file: %s" % output_filename) - with open(output_filename, 'w+') as fh: + with open(output_filename, 'wb') as fh: fh.write(result.data) fh.flush() log.info("Encrypted output written successfully.") return result + + def _add_recipient_string(self, args, hidden_recipients, recipient): + if isinstance(hidden_recipients, (list, tuple)): + if [s for s in hidden_recipients if recipient in str(s)]: + args.append('--hidden-recipient %s' % recipient) + else: + args.append('--recipient %s' % recipient) + else: + args.append('--recipient %s' % recipient) diff --git a/gnupg/_parsers.py b/gnupg/_parsers.py index 2e1767e..9de57d2 100644 --- a/gnupg/_parsers.py +++ b/gnupg/_parsers.py @@ -367,7 +367,7 @@ def _sanitise(*args): checked += (val + " ") log.debug("_check_option(): No checks for %s" % val) - return checked + return checked.rstrip(' ') is_flag = lambda x: x.startswith('--') @@ -475,6 +475,8 @@ def _get_options_group(group=None): '--export-secret-subkeys', '--fingerprint', '--gen-revoke', + '--hidden-encrypt-to', + '--hidden-recipient', '--list-key', '--list-keys', '--list-public-keys', @@ -514,6 +516,7 @@ def _get_options_group(group=None): '--import', '--verify', '--verify-files', + '--output', ]) #: These options expect a string. see :func:`_check_preferences`. pref_options = frozenset(['--digest-algo', @@ -555,6 +558,9 @@ def _get_options_group(group=None): '--list-public-keys', '--list-secret-keys', '--list-sigs', + '--lock-multiple', + '--lock-never', + '--lock-once', '--no-default-keyring', '--no-default-recipient', '--no-emit-version', @@ -566,6 +572,7 @@ def _get_options_group(group=None): '--quiet', '--sign', '--symmetric', + '--throw-keyids', '--use-agent', '--verbose', '--version', @@ -905,6 +912,7 @@ class Sign(object): timestamp = None #: xxx fill me in what = None + status = None def __init__(self, gpg): self._gpg = gpg @@ -927,9 +935,9 @@ class Sign(object): :raises: :exc:`~exceptions.ValueError` if the status message is unknown. """ if key in ("USERID_HINT", "NEED_PASSPHRASE", "BAD_PASSPHRASE", - "GOOD_PASSPHRASE", "BEGIN_SIGNING", "CARDCTRL", - "INV_SGNR", "SIGEXPIRED"): - pass + "GOOD_PASSPHRASE", "MISSING_PASSPHRASE", "PINENTRY_LAUNCHED", + "BEGIN_SIGNING", "CARDCTRL", "INV_SGNR", "SIGEXPIRED"): + self.status = key.replace("_", " ").lower() elif key == "SIG_CREATED": (self.sig_type, self.sig_algo, self.sig_hash_algo, self.what, self.timestamp, self.fingerprint) = value.split() @@ -946,6 +954,7 @@ class Sign(object): else: raise ValueError("Unknown status message: %r" % key) + class ListKeys(list): """Handle status messages for --list-keys. @@ -956,7 +965,6 @@ class ListKeys(list): | crs = X.509 certificate and private key available | ssb = secret subkey (secondary key) | uat = user attribute (same as user id except for field 10). - | sig = signature | rev = revocation signature | pkd = public key data (special field format, see below) | grp = reserved for gpgsm @@ -967,8 +975,10 @@ class ListKeys(list): super(ListKeys, self).__init__() self._gpg = gpg self.curkey = None + self.curuid = None self.fingerprints = [] self.uids = [] + self.sigs = {} def key(self, args): vars = (""" @@ -978,8 +988,12 @@ class ListKeys(list): for i in range(len(vars)): self.curkey[vars[i]] = args[i] self.curkey['uids'] = [] + self.curkey['sigs'] = {} if self.curkey['uid']: - self.curkey['uids'].append(self.curkey['uid']) + self.curuid = self.curkey['uid'] + self.curkey['uids'].append(self.curuid) + self.sigs[self.curuid] = set() + self.curkey['sigs'][self.curuid] = [] del self.curkey['uid'] self.curkey['subkeys'] = [] self.append(self.curkey) @@ -994,8 +1008,21 @@ class ListKeys(list): uid = args[9] uid = ESCAPE_PATTERN.sub(lambda m: chr(int(m.group(1), 16)), uid) self.curkey['uids'].append(uid) + self.curuid = uid + self.curkey['sigs'][uid] = [] + self.sigs[uid] = set() self.uids.append(uid) + def sig(self, args): + vars = (""" + type trust length algo keyid date expires dummy ownertrust uid + """).split() + sig = {} + for i in range(len(vars)): + sig[vars[i]] = args[i] + self.curkey['sigs'][self.curuid].append(sig) + self.sigs[self.curuid].add(sig['keyid']) + def sub(self, args): subkey = [args[4], args[11]] self.curkey['subkeys'].append(subkey) @@ -1005,42 +1032,52 @@ class ListKeys(list): class ImportResult(object): - """Parse GnuPG status messages for key import operations. - - :type gpg: :class:`gnupg.GPG` - :param gpg: An instance of :class:`gnupg.GPG`. - """ - _ok_reason = {'0': 'Not actually changed', - '1': 'Entirely new key', - '2': 'New user IDs', - '4': 'New signatures', - '8': 'New subkeys', - '16': 'Contains private key', - '17': 'Contains private key',} - - _problem_reason = { '0': 'No specific reason given', - '1': 'Invalid Certificate', - '2': 'Issuer Certificate missing', - '3': 'Certificate Chain too long', - '4': 'Error storing certificate', } - - _fields = '''count no_user_id imported imported_rsa unchanged - n_uids n_subk n_sigs n_revoc sec_read sec_imported sec_dups - not_imported'''.split() - _counts = OrderedDict( - zip(_fields, [int(0) for x in range(len(_fields))]) ) - - #: A list of strings containing the fingerprints of the GnuPG keyIDs - #: imported. - fingerprints = list() - - #: A list containing dictionaries with information gathered on keys - #: imported. - results = list() + """Parse GnuPG status messages for key import operations.""" def __init__(self, gpg): + """Start parsing the results of a key import operation. + + :type gpg: :class:`gnupg.GPG` + :param gpg: An instance of :class:`gnupg.GPG`. + """ self._gpg = gpg - self.counts = self._counts + + #: A map from GnuPG codes shown with the ``IMPORT_OK`` status message + #: to their human-meaningful English equivalents. + self._ok_reason = {'0': 'Not actually changed', + '1': 'Entirely new key', + '2': 'New user IDs', + '4': 'New signatures', + '8': 'New subkeys', + '16': 'Contains private key', + '17': 'Contains private key',} + + #: A map from GnuPG codes shown with the ``IMPORT_PROBLEM`` status + #: message to their human-meaningful English equivalents. + self._problem_reason = { '0': 'No specific reason given', + '1': 'Invalid Certificate', + '2': 'Issuer Certificate missing', + '3': 'Certificate Chain too long', + '4': 'Error storing certificate', } + + #: All the possible status messages pertaining to actions taken while + #: importing a key. + self._fields = '''count no_user_id imported imported_rsa unchanged + n_uids n_subk n_sigs n_revoc sec_read sec_imported sec_dups + not_imported'''.split() + + #: Counts of all the status message results, :data:`_fields` which + #: have appeared. + self.counts = OrderedDict( + zip(self._fields, [int(0) for x in range(len(self._fields))])) + + #: A list of strings containing the fingerprints of the GnuPG keyIDs + #: imported. + self.fingerprints = list() + + #: A list containing dictionaries with information gathered on keys + #: imported. + self.results = list() def __nonzero__(self): """Override the determination for truthfulness evaluation. @@ -1048,7 +1085,7 @@ class ImportResult(object): :rtype: bool :returns: True if we have immport some keys, False otherwise. """ - if self.counts.not_imported > 0: return False + if self.counts['not_imported'] > 0: return False if len(self.fingerprints) == 0: return False return True __bool__ = __nonzero__ @@ -1056,7 +1093,7 @@ class ImportResult(object): def _handle_status(self, key, value): """Parse a status code from the attached GnuPG process. - :raises: :exc:`~exceptions.ValueError` if the status message is unknown. + :raises ValueError: if the status message is unknown. """ if key == "IMPORTED": # this duplicates info we already see in import_ok & import_problem @@ -1189,6 +1226,37 @@ class Verify(object): self.trust_level = None #: The string corresponding to the ``trust_level`` number. self.trust_text = None + #: The subpackets. These are stored as a dictionary, in the following + #: form: + #: Verify.subpackets = {'SUBPACKET_NUMBER': {'flags': FLAGS, + #: 'length': LENGTH, + #: 'data': DATA}, + #: 'ANOTHER_SUBPACKET_NUMBER': {...}} + self.subpackets = {} + #: The signature or key notations. These are also stored as a + #: dictionary, in the following form: + #: + #: Verify.notations = {NOTATION_NAME: NOTATION_DATA} + #: + #: For example, the Bitcoin core developer, Peter Todd, encodes in + #: every signature the header of the latest block on the Bitcoin + #: blockchain (to prove that a GnuPG signature that Peter made was made + #: *after* a specific point in time). These look like: + #: + #: gpg: Signature notation: blockhash@bitcoin.org=000000000000000006f793d4461ee3e756ff04cc62581c96a42ed67dc233da3a + #: + #: Which python-gnupg would store as: + #: + #: Verify.notations['blockhash@bitcoin.org'] = '000000000000000006f793d4461ee3e756ff04cc62581c96a42ed67dc233da3a' + self.notations = {} + + #: This will be a str or None. If not None, it is the last + #: ``NOTATION_NAME`` we stored in the ``notations`` dict. Because we're + #: not assured that a ``NOTATION_DATA`` status will arrive *immediately* + #: after its corresponding ``NOTATION_NAME``, we store the latest + #: ``NOTATION_NAME`` here until we get its corresponding + #: ``NOTATION_DATA``. + self._last_notation_name = None def __nonzero__(self): """Override the determination for truthfulness evaluation. @@ -1209,7 +1277,8 @@ class Verify(object): self.trust_level = self.TRUST_LEVELS[key] elif key in ("RSA_OR_IDEA", "NODATA", "IMPORT_RES", "PLAINTEXT", "PLAINTEXT_LENGTH", "POLICY_URL", "DECRYPTION_INFO", - "DECRYPTION_OKAY", "INV_SGNR"): + "DECRYPTION_OKAY", "INV_SGNR", "PROGRESS", + "PINENTRY_LAUNCHED"): pass elif key == "BADSIG": self.valid = False @@ -1220,6 +1289,7 @@ class Verify(object): self.status = 'signature good' self.key_id, self.username = value.split(None, 1) elif key == "VALIDSIG": + self.valid = True (self.fingerprint, self.creation_date, self.sig_timestamp, @@ -1245,17 +1315,106 @@ class Verify(object): self.valid = False self.key_id = value self.status = 'no public key' + # These are useless in Verify, since they are spit out for any + # pub/subkeys on the key, not just the one doing the signing. + # if we want to check for signatures make with expired key, + # the relevant flags are REVKEYSIG and KEYREVOKED. elif key in ("KEYEXPIRED", "SIGEXPIRED"): - # these are useless in verify, since they are spit out for any - # pub/subkeys on the key, not just the one doing the signing. - # if we want to check for signatures with expired key, - # the relevant flag is EXPKEYSIG. pass + # The signature has an expiration date which has already passed + # (EXPKEYSIG), or the signature has been revoked (REVKEYSIG): elif key in ("EXPKEYSIG", "REVKEYSIG"): - # signed with expired or revoked key self.valid = False self.key_id = value.split()[0] self.status = (('%s %s') % (key[:3], key[3:])).lower() + # This is super annoying, and bad design on the part of GnuPG, in my + # opinion. + # + # This flag can get triggered if a valid signature is made, and then + # later the key (or subkey) which created the signature is + # revoked. When this happens, GnuPG will output: + # + # REVKEYSIG 075BFD18B365D34C Test Expired Key <test@python-gnupg.git> + # VALIDSIG DAB69B05F591640B7F4DCBEA075BFD18B365D34C 2014-09-26 1411700539 0 4 0 1 2 00 4BA800F77452A6C29447FF20F4AF76ACBBE22CE2 + # KEYREVOKED + # + # Meaning that we have a timestamp for when the signature was created, + # and we know that the signature is valid, but since GnuPG gives us no + # timestamp for when the key was revoked... we have no ability to + # determine if the valid signature was made *before* the signing key + # was revoked or *after*. Meaning that if you are like me and you sign + # all your software releases and git commits, and you also practice + # good opsec by doing regular key rotations, your old signatures made + # by your expired/revoked keys (even though they were created when the + # key was still good) are considered bad because GnuPG is a + # braindamaged piece of shit. + # + # Software engineering, motherfuckers, DO YOU SPEAK IT? + # + # The signing key which created the signature has since been revoked + # (KEYREVOKED), and we're going to ignore it (but add something to the + # status message): + elif key in ("KEYREVOKED"): + self.status = '\n'.join([self.status, "key revoked"]) + # SIG_SUBPACKET <type> <flags> <len> <data> + # This indicates that a signature subpacket was seen. The format is + # the same as the "spk" record above. + # + # [...] + # + # SPK - Signature subpacket records + # + # - Field 2 :: Subpacket number as per RFC-4880 and later. + # - Field 3 :: Flags in hex. Currently the only two bits assigned + # are 1, to indicate that the subpacket came from the + # hashed part of the signature, and 2, to indicate the + # subpacket was marked critical. + # - Field 4 :: Length of the subpacket. Note that this is the + # length of the subpacket, and not the length of field + # 5 below. Due to the need for %-encoding, the length + # of field 5 may be up to 3x this value. + # - Field 5 :: The subpacket data. Printable ASCII is shown as + # ASCII, but other values are rendered as %XX where XX + # is the hex value for the byte. + elif key in ("SIG_SUBPACKET"): + fields = value.split() + try: + subpacket_number = fields[0] + self.subpackets[subpacket_number] = {'flags': None, + 'length': None, + 'data': None} + except IndexError: + # We couldn't parse the subpacket type (an RFC4880 + # identifier), so we shouldn't continue parsing. + pass + else: + # Pull as much data as we can parse out of the subpacket: + try: + self.subpackets[subpacket_number]['flags'] = fields[1] + self.subpackets[subpacket_number]['length'] = fields[2] + self.subpackets[subpacket_number]['data'] = fields[3] + except IndexError: + pass + # NOTATION_ + # There are actually two related status codes to convey notation + # data: + # + # - NOTATION_NAME <name> + # - NOTATION_DATA <string> + # + # <name> and <string> are %XX escaped; the data may be split among + # several NOTATION_DATA lines. + elif key.startswith("NOTATION_"): + if key.endswith("NAME"): + self.notations[value] = str() + self._last_notation_name = value + elif key.endswith("DATA"): + if self._last_notation_name is not None: + # Append the NOTATION_DATA to any previous data we + # received for that NOTATION_NAME: + self.notations[self._last_notation_name] += value + else: + pass else: raise ValueError("Unknown status message: %r" % key) @@ -1360,26 +1519,33 @@ class ListPackets(object): self.need_passphrase_sym = None #: The keyid and uid which this data is encrypted to. self.userid_hint = None + #: The first key that we detected that a message was encrypted + #: to. This is provided for backwards compatibility. As of Issue #77_, + #: the ``encrypted_to`` attribute should be used instead. + self.key = None + #: A list of keyid's that the message has been encrypted to. + self.encrypted_to = [] def _handle_status(self, key, value): """Parse a status code from the attached GnuPG process. :raises: :exc:`~exceptions.ValueError` if the status message is unknown. """ - if key == 'NODATA': + if key in ('NO_SECKEY', 'BEGIN_DECRYPTION', 'DECRYPTION_FAILED', + 'END_DECRYPTION', 'GOOD_PASSPHRASE', 'BAD_PASSPHRASE'): + pass + elif key == 'NODATA': self.status = nodata(value) elif key == 'ENC_TO': - # This will only capture keys in our keyring. In the future we - # may want to include multiple unknown keys in this list. - self.key, _, _ = value.split() - elif key == 'NEED_PASSPHRASE': + key, _, _ = value.split() + if not self.key: + self.key = key + self.encrypted_to.append(key) + elif key == ('NEED_PASSPHRASE', 'MISSING_PASSPHRASE'): self.need_passphrase = True elif key == 'NEED_PASSPHRASE_SYM': self.need_passphrase_sym = True elif key == 'USERID_HINT': self.userid_hint = value.strip().split() - elif key in ('NO_SECKEY', 'BEGIN_DECRYPTION', 'DECRYPTION_FAILED', - 'END_DECRYPTION'): - pass else: raise ValueError("Unknown status message: %r" % key) diff --git a/gnupg/_trust.py b/gnupg/_trust.py index 514ae8c..224e7b6 100644 --- a/gnupg/_trust.py +++ b/gnupg/_trust.py @@ -57,7 +57,7 @@ def export_ownertrust(cls, trustdb=None): except (OSError, IOError) as err: log.debug(str(err)) - export_proc = cls._open_subprocess('--export-ownertrust') + export_proc = cls._open_subprocess(['--export-ownertrust']) tdb = open(trustdb, 'wb') _util._threaded_copy_data(export_proc.stdout, tdb) @@ -71,7 +71,7 @@ def import_ownertrust(self, trustdb=None): if trustdb is None: trustdb = os.path.join(cls.homedir, 'trustdb.gpg') - import_proc = cls._open_subprocess('--import-ownertrust') + import_proc = cls._open_subprocess(['--import-ownertrust']) tdb = open(trustdb, 'rb') _util._threaded_copy_data(tdb, import_proc.stdin) @@ -98,6 +98,6 @@ def fix_trustdb(cls, trustdb=None): """ if trustdb is None: trustdb = os.path.join(cls.homedir, 'trustdb.gpg') - export_proc = cls._open_subprocess('--export-ownertrust') - import_proc = cls._open_subprocess('--import-ownertrust') + export_proc = cls._open_subprocess(['--export-ownertrust']) + import_proc = cls._open_subprocess(['--import-ownertrust']) _util._threaded_copy_data(export_proc.stdout, import_proc.stdin) diff --git a/gnupg/_util.py b/gnupg/_util.py index e1e14ab..79855ac 100644 --- a/gnupg/_util.py +++ b/gnupg/_util.py @@ -28,18 +28,58 @@ from time import mktime import codecs import encodings import os -import psutil import threading import random import re import string import sys +# These are all the classes which are stream-like; they are used in +# :func:`_is_stream`. +_STREAMLIKE_TYPES = [] + +# These StringIO classes are actually utilised. try: + import io from io import StringIO from io import BytesIO except ImportError: from cStringIO import StringIO +else: + # The io.IOBase type covers the above example for an open file handle in + # Python3, as well as both io.BytesIO and io.StringIO. + _STREAMLIKE_TYPES.append(io.IOBase) + +# The remaining StringIO classes which are imported are used to determine if a +# object is a stream-like in :func:`_is_stream`. +if 2 == sys.version_info[0]: + # Import the StringIO class from the StringIO module since it is a + # commonly used stream class. It is distinct from either of the + # StringIO's that may be loaded in the above try/except clause, so the + # name is prefixed with an underscore to distinguish it. + from StringIO import StringIO as _StringIO_StringIO + _STREAMLIKE_TYPES.append(_StringIO_StringIO) + + # Import the cStringIO module to test for the cStringIO stream types, + # InputType and OutputType. See + # http://stackoverflow.com/questions/14735295/to-check-an-instance-is-stringio + import cStringIO as _cStringIO + _STREAMLIKE_TYPES.append(_cStringIO.InputType) + _STREAMLIKE_TYPES.append(_cStringIO.OutputType) + + # In Python2: + # + # >>> type(open('README.md', 'rb')) + # <open file 'README.md', mode 'rb' at 0x7f9493951d20> + # + # whereas, in Python3, the `file` builtin doesn't exist and instead we get: + # + # >>> type(open('README.md', 'rb')) + # <_io.BufferedReader name='README.md'> + # + # which is covered by the above addition of io.IOBase. + _STREAMLIKE_TYPES.append(file) + from . import _logger @@ -56,6 +96,9 @@ try: except NameError: _py3k = True +_running_windows = False +if "win" in sys.platform: + _running_windows = True ## Directory shortcuts: ## we don't want to use this one because it writes to the install dir: @@ -63,6 +106,20 @@ except NameError: _here = os.path.join(os.getcwd(), 'gnupg') ## current dir _test = os.path.join(os.path.join(_here, 'test'), 'tmp') ## ./tests/tmp _user = os.environ.get('HOME') ## $HOME + +# Fix for Issue #74: we shouldn't expect that a $HOME directory is set in all +# environs. https://github.com/isislovecruft/python-gnupg/issues/74 +if not _user: + _user = '/tmp/python-gnupg' + try: + os.makedirs(_user) + except (OSError, IOError): + _user = os.getcwd() + # If we can't use $HOME, but we have (or can create) a + # /tmp/python-gnupg/gnupghome directory, then we'll default to using + # that. Otherwise, we'll use the current directory + /gnupghome. + _user = os.path.sep.join([_user, 'gnupghome']) + _ugpg = os.path.join(_user, '.gnupg') ## $HOME/.gnupg _conf = os.path.join(os.path.join(_user, '.config'), 'python-gnupg') ## $HOME/.config/python-gnupg @@ -70,6 +127,9 @@ _conf = os.path.join(os.path.join(_user, '.config'), 'python-gnupg') ## Logger is disabled by default log = _logger.create_logger(0) +#: Compiled regex for determining a GnuPG binary's version: +_VERSION_STRING_REGEX = re.compile('(\d)*(\.)*(\d)*(\.)*(\d)*') + def find_encodings(enc=None, system=False): """Find functions for encoding translations for a specific codec. @@ -105,6 +165,51 @@ def find_encodings(enc=None, system=False): return coder + +if _py3k: + def b(x): + """See http://python3porting.com/problems.html#nicer-solutions""" + coder = find_encodings() + if isinstance(x, bytes): + return coder.encode(x.decode(coder.name))[0] + else: + return coder.encode(x)[0] + + def s(x): + if isinstance(x, str): + return x + elif isinstance(x, (bytes, bytearray)): + return x.decode(find_encodings().name) + else: + raise NotImplemented +else: + def b(x): + """See http://python3porting.com/problems.html#nicer-solutions""" + return x + + def s(x): + if isinstance(x, basestring): + return x + elif isinstance(x, (bytes, bytearray)): + return x.decode(find_encodings().name) + else: + raise NotImplemented + +def binary(data): + coder = find_encodings() + + if _py3k and isinstance(data, bytes): + encoded = coder.encode(data.decode(coder.name))[0] + elif _py3k and isinstance(data, str): + encoded = coder.encode(data)[0] + elif not _py3k and type(data) is not str: + encoded = coder.encode(data)[0] + else: + encoded = data + + return encoded + + def author_info(name, contact=None, public_key=None): """Easy object-oriented representation of contributor info. @@ -124,8 +229,6 @@ def _copy_data(instream, outstream): """ sent = 0 - coder = find_encodings() - while True: if ((_py3k and isinstance(instream, str)) or (not _py3k and isinstance(instream, basestring))): @@ -135,24 +238,64 @@ def _copy_data(instream, outstream): data = instream.read(1024) if len(data) == 0: break + sent += len(data) - log.debug("Sending chunk %d bytes:\n%s" - % (sent, data)) - try: - outstream.write(data) - except UnicodeError: + encoded = binary(data) + log.debug("Sending %d bytes of data..." % sent) + log.debug("Encoded data (type %s):\n%s" % (type(encoded), encoded)) + + if not _py3k: try: - outstream.write(coder.encode(data)) - except IOError: - log.exception("Error sending data: Broken pipe") + outstream.write(encoded) + except IOError as ioe: + # Can get 'broken pipe' errors even when all data was sent + if 'Broken pipe' in str(ioe): + log.error('Error sending data: Broken pipe') + else: + log.exception(ioe) break - except IOError as ioe: - # Can get 'broken pipe' errors even when all data was sent - if 'Broken pipe' in str(ioe): - log.error('Error sending data: Broken pipe') else: - log.exception(ioe) - break + log.debug("Wrote data type <type 'str'> to outstream.") + else: + try: + outstream.write(bytes(encoded)) + except TypeError as te: + # XXX FIXME This appears to happen because + # _threaded_copy_data() sometimes passes the `outstream` as an + # object with type <_io.BufferredWriter> and at other times + # with type <encodings.utf_8.StreamWriter>. We hit the + # following error when the `outstream` has type + # <encodings.utf_8.StreamWriter>. + if not "convert 'bytes' object to str implicitly" in str(te): + log.error(str(te)) + try: + outstream.write(encoded.decode()) + except TypeError as yate: + # We hit the "'str' does not support the buffer interface" + # error in Python3 when the `outstream` is an io.BytesIO and + # we try to write a str to it. We don't care about that + # error, we'll just try again with bytes. + if not "does not support the buffer interface" in str(yate): + log.error(str(yate)) + except IOError as ioe: + # Can get 'broken pipe' errors even when all data was sent + if 'Broken pipe' in str(ioe): + log.error('Error sending data: Broken pipe') + else: + log.exception(ioe) + break + else: + log.debug("Wrote data type <class 'str'> outstream.") + except IOError as ioe: + # Can get 'broken pipe' errors even when all data was sent + if 'Broken pipe' in str(ioe): + log.error('Error sending data: Broken pipe') + else: + log.exception(ioe) + break + else: + log.debug("Wrote data type <class 'bytes'> to outstream.") + try: outstream.close() except IOError as ioe: @@ -260,6 +403,8 @@ def _find_binary(binary=None): """ found = None if binary is not None: + if os.path.isabs(binary) and os.path.isfile(binary): + return binary if not os.path.isabs(binary): try: found = _which(binary) @@ -272,7 +417,7 @@ def _find_binary(binary=None): elif os.access(binary, os.X_OK): found = binary if found is None: - try: found = _which('gpg')[0] + try: found = _which('gpg', abspath_only=True, disallow_symlinks=True)[0] except IndexError as ie: log.error("Could not find binary for 'gpg'.") try: found = _which('gpg2')[0] @@ -281,14 +426,7 @@ def _find_binary(binary=None): if found is None: raise RuntimeError("GnuPG is not installed!") - try: - assert os.path.isabs(found), "Path to gpg binary not absolute" - assert not os.path.islink(found), "Path to gpg binary is symlink" - assert os.access(found, os.X_OK), "Lacking +x perms for gpg binary" - except (AssertionError, AttributeError) as ae: - log.error(str(ae)) - else: - return found + return found def _has_readwrite(path): """ @@ -335,7 +473,32 @@ def _is_stream(input): :rtype: bool :returns: True if :param:input is a stream, False if otherwise. """ - return isinstance(input, BytesIO) or isinstance(input, StringIO) + return isinstance(input, tuple(_STREAMLIKE_TYPES)) + +def _is_string(thing): + """Check that **thing** is a string. The definition of the latter depends + upon the Python version. + + :param thing: The thing to check if it's a string. + :rtype: bool + :returns: ``True`` if **thing** is string (or unicode in Python2). + """ + if (_py3k and isinstance(thing, str)): + return True + if (not _py3k and isinstance(thing, basestring)): + return True + return False + +def _is_bytes(thing): + """Check that **thing** is bytes. + + :param thing: The thing to check if it's bytes. + :rtype: bool + :returns: ``True`` if **thing** is bytes or a bytearray. + """ + if isinstance(thing, (bytes, bytearray)): + return True + return False def _is_list_or_tuple(instance): """Check that ``instance`` is a list or tuple. @@ -368,21 +531,26 @@ def _is_gpg2(version): return True return False -def _make_binary_stream(s, encoding): - """ - xxx fill me in +def _make_binary_stream(thing, encoding=None, armor=True): + """Encode **thing**, then make it stream/file-like. + + :param thing: The thing to turn into a encoded stream. + :rtype: ``io.BytesIO`` or ``io.StringIO``. + :returns: The encoded **thing**, wrapped in an ``io.BytesIO`` (if + available), otherwise wrapped in a ``io.StringIO``. """ + if _py3k: + if isinstance(thing, str): + thing = thing.encode(encoding) + else: + if type(thing) is not str: + thing = thing.encode(encoding) + try: - if _py3k: - if isinstance(s, str): - s = s.encode(encoding) - else: - if type(s) is not str: - s = s.encode(encoding) - from io import BytesIO - rv = BytesIO(s) - except ImportError: - rv = StringIO(s) + rv = BytesIO(thing) + except NameError: + rv = StringIO(thing) + return rv def _make_passphrase(length=None, save=False, file=None): @@ -403,7 +571,7 @@ def _make_passphrase(length=None, save=False, file=None): passphrase = _make_random_string(length) if save: - ruid, euid, suid = psutil.Process(os.getpid()).uids + ruid, euid, suid = os.getresuid() gid = os.getgid() now = mktime(localtime()) @@ -434,8 +602,7 @@ def _match_version_string(version): :param str version: A version string in the form x.x.x """ - regex = re.compile('(\d)*(\.)*(\d)*(\.)*(\d)*') - matched = regex.match(version) + matched = _VERSION_STRING_REGEX.match(version) g = matched.groups() major, minor, micro = int(g[0]), int(g[2]), int(g[4]) return (major, minor, micro) @@ -485,7 +652,7 @@ def _utc_epoch(): """Get the seconds since epoch.""" return int(mktime(localtime())) -def _which(executable, flags=os.X_OK): +def _which(executable, flags=os.X_OK, abspath_only=False, disallow_symlinks=False): """Borrowed from Twisted's :mod:twisted.python.proutils . Search PATH for executable files with the given name. @@ -508,6 +675,17 @@ def _which(executable, flags=os.X_OK): :returns: A list of the full paths to files found, in the order in which they were found. """ + def _can_allow(p): + if not os.access(p, flags): + return False + if abspath_only and not os.path.abspath(p): + log.warn('Ignoring %r (path is not absolute)', p) + return False + if disallow_symlinks and os.path.islink(p): + log.warn('Ignoring %r (path is a symlink)', p) + return False + return True + result = [] exts = filter(None, os.environ.get('PATHEXT', '').split(os.pathsep)) path = os.environ.get('PATH', None) @@ -515,11 +693,11 @@ def _which(executable, flags=os.X_OK): return [] for p in os.environ.get('PATH', '').split(os.pathsep): p = os.path.join(p, executable) - if os.access(p, flags): + if _can_allow(p): result.append(p) for e in exts: pext = p + e - if os.access(pext, flags): + if _can_allow(pext): result.append(pext) return result diff --git a/gnupg/_version.py b/gnupg/_version.py index 1a0cb9d..fede8ae 100644 --- a/gnupg/_version.py +++ b/gnupg/_version.py @@ -4,8 +4,8 @@ # unpacked source archive. Distribution tarballs contain a pre-generated copy # of this file. -version_version = '1.3.1' -version_full = '87928205a87afc66779b9760df039642eed291b2' +version_version = '2.0.2' +version_full = '4f1b1f6a8d16df9d4e1f29ba9223f05889131189' def get_versions(default={}, verbose=False): return {'version': version_version, 'full': version_full} diff --git a/gnupg/gnupg.py b/gnupg/gnupg.py index 9aa8232..215233e 100644 --- a/gnupg/gnupg.py +++ b/gnupg/gnupg.py @@ -36,13 +36,7 @@ import os import re import textwrap -try: - from io import StringIO -except ImportError: - from cStringIO import StringIO - #: see :pep:`328` http://docs.python.org/2.5/whatsnew/pep-328.html -from . import _parsers from . import _util from . import _trust from ._meta import GPGBase @@ -66,7 +60,7 @@ class GPG(GPGBase): def __init__(self, binary=None, homedir=None, verbose=False, use_agent=False, keyring=None, secring=None, - options=None): + ignore_homedir_permissions=False, options=None): """Initialize a GnuPG process wrapper. :param str binary: Name for GnuPG binary executable. If the absolute @@ -79,6 +73,10 @@ class GPG(GPGBase): and private keyrings. Default is whatever GnuPG defaults to. + :type ignore_homedir_permissions: :obj:`bool` + :param ignore_homedir_permissions: If true, bypass check that homedir + be writable. + :type verbose: :obj:`str` or :obj:`int` or :obj:`bool` :param verbose: String or numeric value to pass to GnuPG's ``--debug-level`` option. See the GnuPG man page for @@ -123,12 +121,16 @@ class GPG(GPGBase): secring=secring, options=options, verbose=verbose, - use_agent=use_agent,) + use_agent=use_agent, + ignore_homedir_permissions=ignore_homedir_permissions, + ) log.info(textwrap.dedent(""" Initialised settings: binary: %s + binary version: %s homedir: %s + ignore_homedir_permissions: %s keyring: %s secring: %s default_preference_list: %s @@ -136,9 +138,16 @@ class GPG(GPGBase): options: %s verbose: %s use_agent: %s - """ % (self.binary, self.homedir, self.keyring, self.secring, - self.default_preference_list, self.keyserver, self.options, - str(self.verbose), str(self.use_agent)))) + """ % (self.binary, + self.binary_version, + self.homedir, + self.ignore_homedir_permissions, + self.keyring, + self.secring, + self.default_preference_list, + self.keyserver, self.options, + str(self.verbose), + str(self.use_agent)))) self._batch_dir = os.path.join(self.homedir, 'batch-files') self._key_dir = os.path.join(self.homedir, 'generated-keys') @@ -147,58 +156,52 @@ class GPG(GPGBase): self.temp_keyring = None #: The secring used in the most recently created batch file self.temp_secring = None - #: The version string of our GnuPG binary - self.binary_version = str() - - ## check that everything runs alright, and grab the gpg binary's - ## version number while we're at it: - proc = self._open_subprocess(["--list-config", "--with-colons"]) - result = self._result_map['list'](self) - self._read_data(proc.stdout, result) - if proc.returncode: - raise RuntimeError("Error invoking gpg: %s" % result.data) - - version_line = str(result.data).partition(':version:')[2] - self.binary_version = version_line.split('\n')[0] - log.debug("Using GnuPG version %s" % self.binary_version) - if _util._is_gpg2: - # Make GnuPG>=2.0.0-only methods public: - self.fix_trustdb = self._fix_trustdb - self.import_ownertrust = self._import_ownertrust - self.export_ownertrust = self._export_ownertrust + # Make sure that the trustdb exists, or else GnuPG will exit with a + # fatal error (at least it does with GnuPG>=2.0.0): + self.create_trustdb() - # Make sure that the trustdb exists, or else GnuPG will exit with - # a fatal error (at least it does with GnuPG>=2.0.0): - self._create_trustdb() + # The --no-use-agent and --use-agent options were deprecated in GnuPG + # 2.x, so we should set use_agent to None here to avoid having + # GPGBase._make_args() add either one. + if self.is_gpg2(): + self.use_agent = None @functools.wraps(_trust._create_trustdb) - def _create_trustdb(self): + def create_trustdb(self): if self.is_gpg2(): _trust._create_trustdb(self) else: log.info("Creating the trustdb is only available with GnuPG>=2.x") + # For backward compatibility with python-gnupg<=1.3.1: + _create_trustdb = create_trustdb @functools.wraps(_trust.fix_trustdb) - def _fix_trustdb(self, trustdb=None): + def fix_trustdb(self, trustdb=None): if self.is_gpg2(): _trust.fix_trustdb(self) else: log.info("Fixing the trustdb is only available with GnuPG>=2.x") + # For backward compatibility with python-gnupg<=1.3.1: + _fix_trustdb = fix_trustdb @functools.wraps(_trust.import_ownertrust) - def _import_ownertrust(self, trustdb=None): + def import_ownertrust(self, trustdb=None): if self.is_gpg2(): _trust.import_ownertrust(self) else: log.info("Importing ownertrust is only available with GnuPG>=2.x") + # For backward compatibility with python-gnupg<=1.3.1: + _import_ownertrust = import_ownertrust @functools.wraps(_trust.export_ownertrust) - def _export_ownertrust(self, trustdb=None): + def export_ownertrust(self, trustdb=None): if self.is_gpg2(): _trust.export_ownertrust(self) else: log.info("Exporting ownertrust is only available with GnuPG>=2.x") + # For backward compatibility with python-gnupg<=1.3.1: + _export_ownertrust = export_ownertrust def is_gpg1(self): """Returns true if using GnuPG <= 1.x.""" @@ -284,15 +287,13 @@ class GPG(GPGBase): signatures. If using detached signatures, the file containing the detached signature should be specified as the ``sig_file``. - :param file file: A file descriptor object. Its type will be checked - with :func:`_util._is_file`. + :param file file: A file descriptor object. :param str sig_file: A file containing the GPG signature data for ``file``. If given, ``file`` is verified via this detached - signature. + signature. Its type will be checked with :func:`_util._is_file`. """ - fn = None result = self._result_map['verify'](self) if sig_file is None: @@ -307,19 +308,15 @@ class GPG(GPGBase): return result log.debug('verify_file(): Handling detached verification') sig_fh = None - data_fh = None try: sig_fh = open(sig_file, 'rb') - data_fh = open(file, 'rb') args = ["--verify %s -" % sig_fh.name] proc = self._open_subprocess(args) - writer = _util._threaded_copy_data(data_fh, proc.stdin) + writer = _util._threaded_copy_data(file, proc.stdin) self._collect_output(proc, result, writer, stdin=proc.stdin) finally: if sig_fh and not sig_fh.closed: sig_fh.close() - if data_fh and not data_fh.closed: - data_fh.close() return result def import_keys(self, key_data): @@ -488,19 +485,7 @@ class GPG(GPGBase): self._collect_output(p, result, stdin=p.stdin) lines = result.data.decode(self._encoding, self._decode_errors).splitlines() - valid_keywords = 'pub uid sec fpr sub'.split() - for line in lines: - if self.verbose: - print(line) - log.debug("%r", line.rstrip()) - if not line: - break - L = line.strip().split(':') - if not L: - continue - keyword = L[0] - if keyword in valid_keywords: - getattr(result, keyword)(L) + self._parse_keys(result) return result def list_packets(self, raw_data): @@ -521,8 +506,8 @@ class GPG(GPGBase): >>> assert key.fingerprint :rtype: dict - :returns: A dictionary whose keys are the original keyid parameters, - and whose values are lists of signatures. + :returns: res.sigs is a dictionary whose keys are the uids and whose + values are a set of signature keyids. """ if len(keyids) > self._batch_limit: raise ValueError( @@ -537,8 +522,26 @@ class GPG(GPGBase): proc = self._open_subprocess(args) result = self._result_map['list'](self) self._collect_output(proc, result, stdin=proc.stdin) + self._parse_keys(result) return result + def _parse_keys(self, result): + lines = result.data.decode(self._encoding, + self._decode_errors).splitlines() + valid_keywords = 'pub uid sec fpr sub sig'.split() + for line in lines: + if self.verbose: + print(line) + log.debug("%r", line.rstrip()) + if not line: + break + L = line.strip().split(':') + if not L: + continue + keyword = L[0] + if keyword in valid_keywords: + getattr(result, keyword)(L) + def gen_key(self, input): """Generate a GnuPG key through batch file key generation. See :meth:`GPG.gen_key_input()` for creating the control input. @@ -798,7 +801,7 @@ class GPG(GPGBase): key = key.replace('_','-').title() ## to set 'cert', 'Key-Usage' must be blank string if not key in ('Key-Usage', 'Subkey-Usage'): - if str(val).strip(): + if type('')(val).strip(): parms[key] = val ## if Key-Type is 'default', make Subkey-Type also be 'default' @@ -941,6 +944,13 @@ generate keys. Please see 'The crow flies at midnight.' + :param bool throw_keyids: If True, make all **recipients** keyids be + zero'd out in packet information. This is the same as using + **hidden_recipients** for all **recipients**. (Default: False). + + :param list hidden_recipients: A list of recipients that should have + their keyids zero'd out in packet information. + :param str cipher_algo: The cipher algorithm to use. To see available algorithms with your version of GnuPG, do: :command:`$ gpg --with-colons --list-config ciphername`. @@ -956,7 +966,10 @@ generate keys. Please see .. seealso:: :meth:`._encrypt` """ - stream = _make_binary_stream(data, self._encoding) + if _is_stream(data): + stream = data + else: + stream = _make_binary_stream(data, self._encoding) result = self._encrypt(stream, recipients, **kwargs) stream.close() return result |