summaryrefslogtreecommitdiff
path: root/gnupg
diff options
context:
space:
mode:
Diffstat (limited to 'gnupg')
-rw-r--r--gnupg/_meta.py250
-rw-r--r--gnupg/_parsers.py276
-rw-r--r--gnupg/_trust.py8
-rw-r--r--gnupg/_util.py270
-rw-r--r--gnupg/_version.py4
-rw-r--r--gnupg/gnupg.py139
6 files changed, 725 insertions, 222 deletions
diff --git a/gnupg/_meta.py b/gnupg/_meta.py
index 3aafacd..32ab287 100644
--- a/gnupg/_meta.py
+++ b/gnupg/_meta.py
@@ -32,14 +32,22 @@ import encodings
import locale
import os
import platform
-import psutil
import shlex
import subprocess
import sys
import threading
+## Using psutil is recommended, but since the extension doesn't run with the
+## PyPy interpreter, we'll run even if it's not present.
+try:
+ import psutil
+except ImportError:
+ psutil = None
+
from . import _parsers
from . import _util
+from ._util import b
+from ._util import s
from ._parsers import _check_preferences
from ._parsers import _sanitise_list
@@ -75,19 +83,49 @@ class GPGMeta(type):
instance containing the gpg-agent process' information to
``cls._agent_proc``.
+ For Unix systems, we check that the effective UID of this
+ ``python-gnupg`` process is also the owner of the gpg-agent
+ process. For Windows, we check that the usernames of the owners are
+ the same. (Sorry Windows users; maybe you should switch to anything
+ else.)
+
+ .. note: This function will only run if the psutil_ Python extension
+ is installed. Because psutil won't run with the PyPy interpreter,
+ use of it is optional (although highly recommended).
+
+ .. _psutil: https://pypi.python.org/pypi/psutil
+
:returns: True if there exists a gpg-agent process running under the
same effective user ID as that of this program. Otherwise,
- returns None.
+ returns False.
"""
- identity = psutil.Process(os.getpid()).uids
+ if not psutil:
+ return False
+
+ this_process = psutil.Process(os.getpid())
+ ownership_match = False
+
+ if _util._running_windows:
+ identity = this_process.username()
+ else:
+ identity = this_process.uids
+
for proc in psutil.process_iter():
if (proc.name == "gpg-agent") and proc.is_running:
log.debug("Found gpg-agent process with pid %d" % proc.pid)
- if proc.uids == identity:
- log.debug(
- "Effective UIDs of this process and gpg-agent match")
- setattr(cls, '_agent_proc', proc)
- return True
+ if _util._running_windows:
+ if proc.username() == identity:
+ ownership_match = True
+ else:
+ if proc.uids == identity:
+ ownership_match = True
+
+ if ownership_match:
+ log.debug("Effective UIDs of this process and gpg-agent match")
+ setattr(cls, '_agent_proc', proc)
+ return True
+
+ return False
class GPGBase(object):
@@ -111,7 +149,7 @@ class GPGBase(object):
def __init__(self, binary=None, home=None, keyring=None, secring=None,
use_agent=False, default_preference_list=None,
- verbose=False, options=None):
+ ignore_homedir_permissions=False, verbose=False, options=None):
"""Create a ``GPGBase``.
This class is used to set up properties for controlling the behaviour
@@ -134,13 +172,18 @@ class GPGBase(object):
:ivar str secring: The filename in **homedir** to use as the keyring
file for secret keys.
"""
+ self.ignore_homedir_permissions = ignore_homedir_permissions
self.binary = _util._find_binary(binary)
- self.homedir = home if home else _util._conf
+ self.homedir = os.path.expanduser(home) if home else _util._conf
pub = _parsers._fix_unsafe(keyring) if keyring else 'pubring.gpg'
sec = _parsers._fix_unsafe(secring) if secring else 'secring.gpg'
self.keyring = os.path.join(self._homedir, pub)
self.secring = os.path.join(self._homedir, sec)
- self.options = _parsers._sanitise(options) if options else None
+ self.options = list(_parsers._sanitise_list(options)) if options else None
+
+ #: The version string of our GnuPG binary
+ self.binary_version = '0.0.0'
+ self.verbose = False
if default_preference_list:
self._prefs = _check_preferences(default_preference_list, 'all')
@@ -155,6 +198,14 @@ class GPGBase(object):
self._filesystemencoding = encodings.normalize_encoding(
sys.getfilesystemencoding().lower())
+ # Issue #49: https://github.com/isislovecruft/python-gnupg/issues/49
+ #
+ # During `line = stream.readline()` in `_read_response()`, the Python
+ # codecs module will choke on Unicode data, so we globally monkeypatch
+ # the "strict" error handler to use the builtin `replace_errors`
+ # handler:
+ codecs.register_error('strict', codecs.replace_errors)
+
self._keyserver = 'hkp://wwwkeys.pgp.net'
self.__generated_keys = os.path.join(self.homedir, 'generated-keys')
@@ -164,18 +215,12 @@ class GPGBase(object):
"'verbose' must be boolean, string, or 0 <= n <= 9"
assert isinstance(use_agent, bool), "'use_agent' must be boolean"
if self.options is not None:
- assert isinstance(self.options, str), "options not string"
+ assert isinstance(self.options, list), "options not list"
except (AssertionError, AttributeError) as ae:
log.error("GPGBase.__init__(): %s" % str(ae))
raise RuntimeError(str(ae))
else:
- if verbose is True:
- # The caller wants logging, but we need a valid --debug-level
- # for gpg. Default to "basic", and warn about the ambiguity.
- # (garrettr)
- verbose = "basic"
- log.warning('GPG(verbose=True) is ambiguous, defaulting to "basic" logging')
- self.verbose = verbose
+ self._set_verbose(verbose)
self.use_agent = use_agent
if hasattr(self, '_agent_proc') \
@@ -183,6 +228,9 @@ class GPGBase(object):
if hasattr(self, '__remove_path__'):
self.__remove_path__('pinentry')
+ # Assign our self.binary_version attribute:
+ self._check_sane_and_get_gpg_version()
+
def __remove_path__(self, prog=None, at_exit=True):
"""Remove the directories containing a program from the system's
``$PATH``. If ``GPGBase.binary`` is in a directory being removed, it
@@ -368,18 +416,21 @@ class GPGBase(object):
log.debug("GPGBase._homedir_setter(): Check existence of '%s'" % hd)
_util._create_if_necessary(hd)
- try:
- log.debug("GPGBase._homedir_setter(): checking permissions")
- assert _util._has_readwrite(hd), \
- "Homedir '%s' needs read/write permissions" % hd
- except AssertionError as ae:
- msg = ("Unable to set '%s' as GnuPG homedir" % directory)
- log.debug("GPGBase.homedir.setter(): %s" % msg)
- log.debug(str(ae))
- raise RuntimeError(str(ae))
- else:
- log.info("Setting homedir to '%s'" % hd)
+ if self.ignore_homedir_permissions:
self._homedir = hd
+ else:
+ try:
+ log.debug("GPGBase._homedir_setter(): checking permissions")
+ assert _util._has_readwrite(hd), \
+ "Homedir '%s' needs read/write permissions" % hd
+ except AssertionError as ae:
+ msg = ("Unable to set '%s' as GnuPG homedir" % directory)
+ log.debug("GPGBase.homedir.setter(): %s" % msg)
+ log.debug(str(ae))
+ raise RuntimeError(str(ae))
+ else:
+ log.info("Setting homedir to '%s'" % hd)
+ self._homedir = hd
homedir = _util.InheritableProperty(_homedir_getter, _homedir_setter)
@@ -436,6 +487,24 @@ class GPGBase(object):
_generated_keys = _util.InheritableProperty(_generated_keys_getter,
_generated_keys_setter)
+ def _check_sane_and_get_gpg_version(self):
+ """Check that everything runs alright, and grab the gpg binary's
+ version number while we're at it, storing it as :data:`binary_version`.
+
+ :raises RuntimeError: if we cannot invoke the gpg binary.
+ """
+ proc = self._open_subprocess(["--list-config", "--with-colons"])
+ result = self._result_map['list'](self)
+ self._read_data(proc.stdout, result)
+ if proc.returncode:
+ raise RuntimeError("Error invoking gpg: %s" % result.data)
+ else:
+ proc.terminate()
+
+ version_line = str(result.data).partition(':version:')[2]
+ self.binary_version = version_line.split('\n')[0]
+ log.debug("Using GnuPG version %s" % self.binary_version)
+
def _make_args(self, args, passphrase=False):
"""Make a list of command line elements for GPG.
@@ -470,21 +539,29 @@ class GPGBase(object):
if passphrase: cmd.append('--batch --passphrase-fd 0')
- if self.use_agent: cmd.append('--use-agent')
- else: cmd.append('--no-use-agent')
+ if self.use_agent is True: cmd.append('--use-agent')
+ elif self.use_agent is False: cmd.append('--no-use-agent')
+
+ # The arguments for debugging and verbosity should be placed into the
+ # cmd list before the options/args in order to resolve Issue #76:
+ # https://github.com/isislovecruft/python-gnupg/issues/76
+ if self.verbose:
+ cmd.append('--debug-all')
+
+ if (isinstance(self.verbose, str) or
+ (isinstance(self.verbose, int) and (self.verbose >= 1))):
+ # GnuPG<=1.4.18 parses the `--debug-level` command in a way
+ # that is incompatible with all other GnuPG versions. :'(
+ if self.binary_version and (self.binary_version <= '1.4.18'):
+ cmd.append('--debug-level=%s' % self.verbose)
+ else:
+ cmd.append('--debug-level %s' % self.verbose)
if self.options:
[cmd.append(opt) for opt in iter(_sanitise_list(self.options))]
if args:
[cmd.append(arg) for arg in iter(_sanitise_list(args))]
- if self.verbose:
- cmd.append('--debug-all')
- if ((isinstance(self.verbose, str) and
- self.verbose in ['basic', 'advanced', 'expert', 'guru'])
- or (isinstance(self.verbose, int) and (1<=self.verbose<=9))):
- cmd.append('--debug-level %s' % self.verbose)
-
return cmd
def _open_subprocess(self, args=None, passphrase=False):
@@ -592,6 +669,36 @@ class GPGBase(object):
log.debug("Finishing reading from stream %r..." % stream.__repr__())
log.debug("Read %4d bytes total" % len(result.data))
+ def _set_verbose(self, verbose):
+ """Check and set our :data:`verbose` attribute.
+ The debug-level must be a string or an integer. If it is one of
+ the allowed strings, GnuPG will translate it internally to it's
+ corresponding integer level:
+
+ basic = 1-2
+ advanced = 3-5
+ expert = 6-8
+ guru = 9+
+
+ If it's not one of the recognised string levels, then then
+ entire argument is ignored by GnuPG. :(
+
+ To fix that stupid behaviour, if they wanted debugging but typo'd
+ the string level (or specified ``verbose=True``), we'll default to
+ 'basic' logging.
+ """
+ string_levels = ('basic', 'advanced', 'expert', 'guru')
+
+ if verbose is True:
+ # The caller wants logging, but we need a valid --debug-level
+ # for gpg. Default to "basic", and warn about the ambiguity.
+ verbose = 'basic'
+
+ if (isinstance(verbose, str) and not (verbose in string_levels)):
+ verbose = 'basic'
+
+ self.verbose = verbose
+
def _collect_output(self, process, result, writer=None, stdin=None):
"""Drain the subprocesses output streams, writing the collected output
to the result. If a writer thread (writing to the subprocess) is given,
@@ -699,6 +806,19 @@ class GPGBase(object):
## We could use _handle_io here except for the fact that if the
## passphrase is bad, gpg bails and you can't write the message.
result = self._result_map['sign'](self)
+
+ ## If the passphrase is an empty string, the message up to and
+ ## including its first newline will be cut off before making it to the
+ ## GnuPG process. Therefore, if the passphrase='' or passphrase=b'',
+ ## we set passphrase=None. See Issue #82:
+ ## https://github.com/isislovecruft/python-gnupg/issues/82
+ if _util._is_string(passphrase):
+ passphrase = passphrase if len(passphrase) > 0 else None
+ elif _util._is_bytes(passphrase):
+ passphrase = s(passphrase) if len(passphrase) > 0 else None
+ else:
+ passphrase = None
+
proc = self._open_subprocess(args, passphrase is not None)
try:
if passphrase:
@@ -718,6 +838,8 @@ class GPGBase(object):
symmetric=False,
always_trust=True,
output=None,
+ throw_keyids=False,
+ hidden_recipients=None,
cipher_algo='AES256',
digest_algo='SHA512',
compress_algo='ZLIB'):
@@ -790,6 +912,14 @@ class GPGBase(object):
>>> decrypted
'The crow flies at midnight.'
+
+ :param bool throw_keyids: If True, make all **recipients** keyids be
+ zero'd out in packet information. This is the same as using
+ **hidden_recipients** for all **recipients**. (Default: False).
+
+ :param list hidden_recipients: A list of recipients that should have
+ their keyids zero'd out in packet information.
+
:param str cipher_algo: The cipher algorithm to use. To see available
algorithms with your version of GnuPG, do:
:command:`$ gpg --with-colons --list-config
@@ -841,6 +971,7 @@ class GPGBase(object):
## is decryptable with a passphrase or secretkey.
if symmetric: args.append('--symmetric')
if encrypt: args.append('--encrypt')
+ if throw_keyids: args.append('--throw-keyids')
if len(recipients) >= 1:
log.debug("GPG.encrypt() called for recipients '%s' with type '%s'"
@@ -856,39 +987,54 @@ class GPGBase(object):
log.info("Can't accept recipient string: %s"
% recp)
else:
- args.append('--recipient %s' % str(recp))
+ self._add_recipient_string(args, hidden_recipients, str(recp))
continue
## will give unicode in 2.x as '\uXXXX\uXXXX'
- args.append('--recipient %r' % recp)
+ if isinstance(hidden_recipients, (list, tuple)):
+ if [s for s in hidden_recipients if recp in str(s)]:
+ args.append('--hidden-recipient %r' % recp)
+ else:
+ args.append('--recipient %r' % recp)
+ else:
+ args.append('--recipient %r' % recp)
continue
if isinstance(recp, str):
- args.append('--recipient %s' % recp)
+ self._add_recipient_string(args, hidden_recipients, recp)
elif (not _util._py3k) and isinstance(recp, basestring):
for recp in recipients.split('\x20'):
- args.append('--recipient %s' % recp)
+ self._add_recipient_string(args, hidden_recipients, recp)
elif _util._py3k and isinstance(recp, str):
for recp in recipients.split(' '):
- args.append('--recipient %s' % recp)
+ self._add_recipient_string(args, hidden_recipients, recp)
## ...and now that we've proven py3k is better...
-
else:
- log.debug("Don't know what to do with recipients: '%s'"
+ log.debug("Don't know what to do with recipients: %r"
% recipients)
result = self._result_map['crypt'](self)
- log.debug("Got data '%s' with type '%s'."
- % (data, type(data)))
- self._handle_io(args, data, result,
- passphrase=passphrase, binary=True)
- log.debug("\n%s" % result.data)
+ log.debug("Got data '%s' with type '%s'." % (data, type(data)))
+ self._handle_io(args, data, result, passphrase=passphrase, binary=True)
+ # Avoid writing raw encrypted bytes to terminal loggers and breaking
+ # them in that adorable way where they spew hieroglyphics until reset:
+ if armor:
+ log.debug("\n%s" % result.data)
if output_filename:
log.info("Writing encrypted output to file: %s" % output_filename)
- with open(output_filename, 'w+') as fh:
+ with open(output_filename, 'wb') as fh:
fh.write(result.data)
fh.flush()
log.info("Encrypted output written successfully.")
return result
+
+ def _add_recipient_string(self, args, hidden_recipients, recipient):
+ if isinstance(hidden_recipients, (list, tuple)):
+ if [s for s in hidden_recipients if recipient in str(s)]:
+ args.append('--hidden-recipient %s' % recipient)
+ else:
+ args.append('--recipient %s' % recipient)
+ else:
+ args.append('--recipient %s' % recipient)
diff --git a/gnupg/_parsers.py b/gnupg/_parsers.py
index 2e1767e..9de57d2 100644
--- a/gnupg/_parsers.py
+++ b/gnupg/_parsers.py
@@ -367,7 +367,7 @@ def _sanitise(*args):
checked += (val + " ")
log.debug("_check_option(): No checks for %s" % val)
- return checked
+ return checked.rstrip(' ')
is_flag = lambda x: x.startswith('--')
@@ -475,6 +475,8 @@ def _get_options_group(group=None):
'--export-secret-subkeys',
'--fingerprint',
'--gen-revoke',
+ '--hidden-encrypt-to',
+ '--hidden-recipient',
'--list-key',
'--list-keys',
'--list-public-keys',
@@ -514,6 +516,7 @@ def _get_options_group(group=None):
'--import',
'--verify',
'--verify-files',
+ '--output',
])
#: These options expect a string. see :func:`_check_preferences`.
pref_options = frozenset(['--digest-algo',
@@ -555,6 +558,9 @@ def _get_options_group(group=None):
'--list-public-keys',
'--list-secret-keys',
'--list-sigs',
+ '--lock-multiple',
+ '--lock-never',
+ '--lock-once',
'--no-default-keyring',
'--no-default-recipient',
'--no-emit-version',
@@ -566,6 +572,7 @@ def _get_options_group(group=None):
'--quiet',
'--sign',
'--symmetric',
+ '--throw-keyids',
'--use-agent',
'--verbose',
'--version',
@@ -905,6 +912,7 @@ class Sign(object):
timestamp = None
#: xxx fill me in
what = None
+ status = None
def __init__(self, gpg):
self._gpg = gpg
@@ -927,9 +935,9 @@ class Sign(object):
:raises: :exc:`~exceptions.ValueError` if the status message is unknown.
"""
if key in ("USERID_HINT", "NEED_PASSPHRASE", "BAD_PASSPHRASE",
- "GOOD_PASSPHRASE", "BEGIN_SIGNING", "CARDCTRL",
- "INV_SGNR", "SIGEXPIRED"):
- pass
+ "GOOD_PASSPHRASE", "MISSING_PASSPHRASE", "PINENTRY_LAUNCHED",
+ "BEGIN_SIGNING", "CARDCTRL", "INV_SGNR", "SIGEXPIRED"):
+ self.status = key.replace("_", " ").lower()
elif key == "SIG_CREATED":
(self.sig_type, self.sig_algo, self.sig_hash_algo,
self.what, self.timestamp, self.fingerprint) = value.split()
@@ -946,6 +954,7 @@ class Sign(object):
else:
raise ValueError("Unknown status message: %r" % key)
+
class ListKeys(list):
"""Handle status messages for --list-keys.
@@ -956,7 +965,6 @@ class ListKeys(list):
| crs = X.509 certificate and private key available
| ssb = secret subkey (secondary key)
| uat = user attribute (same as user id except for field 10).
- | sig = signature
| rev = revocation signature
| pkd = public key data (special field format, see below)
| grp = reserved for gpgsm
@@ -967,8 +975,10 @@ class ListKeys(list):
super(ListKeys, self).__init__()
self._gpg = gpg
self.curkey = None
+ self.curuid = None
self.fingerprints = []
self.uids = []
+ self.sigs = {}
def key(self, args):
vars = ("""
@@ -978,8 +988,12 @@ class ListKeys(list):
for i in range(len(vars)):
self.curkey[vars[i]] = args[i]
self.curkey['uids'] = []
+ self.curkey['sigs'] = {}
if self.curkey['uid']:
- self.curkey['uids'].append(self.curkey['uid'])
+ self.curuid = self.curkey['uid']
+ self.curkey['uids'].append(self.curuid)
+ self.sigs[self.curuid] = set()
+ self.curkey['sigs'][self.curuid] = []
del self.curkey['uid']
self.curkey['subkeys'] = []
self.append(self.curkey)
@@ -994,8 +1008,21 @@ class ListKeys(list):
uid = args[9]
uid = ESCAPE_PATTERN.sub(lambda m: chr(int(m.group(1), 16)), uid)
self.curkey['uids'].append(uid)
+ self.curuid = uid
+ self.curkey['sigs'][uid] = []
+ self.sigs[uid] = set()
self.uids.append(uid)
+ def sig(self, args):
+ vars = ("""
+ type trust length algo keyid date expires dummy ownertrust uid
+ """).split()
+ sig = {}
+ for i in range(len(vars)):
+ sig[vars[i]] = args[i]
+ self.curkey['sigs'][self.curuid].append(sig)
+ self.sigs[self.curuid].add(sig['keyid'])
+
def sub(self, args):
subkey = [args[4], args[11]]
self.curkey['subkeys'].append(subkey)
@@ -1005,42 +1032,52 @@ class ListKeys(list):
class ImportResult(object):
- """Parse GnuPG status messages for key import operations.
-
- :type gpg: :class:`gnupg.GPG`
- :param gpg: An instance of :class:`gnupg.GPG`.
- """
- _ok_reason = {'0': 'Not actually changed',
- '1': 'Entirely new key',
- '2': 'New user IDs',
- '4': 'New signatures',
- '8': 'New subkeys',
- '16': 'Contains private key',
- '17': 'Contains private key',}
-
- _problem_reason = { '0': 'No specific reason given',
- '1': 'Invalid Certificate',
- '2': 'Issuer Certificate missing',
- '3': 'Certificate Chain too long',
- '4': 'Error storing certificate', }
-
- _fields = '''count no_user_id imported imported_rsa unchanged
- n_uids n_subk n_sigs n_revoc sec_read sec_imported sec_dups
- not_imported'''.split()
- _counts = OrderedDict(
- zip(_fields, [int(0) for x in range(len(_fields))]) )
-
- #: A list of strings containing the fingerprints of the GnuPG keyIDs
- #: imported.
- fingerprints = list()
-
- #: A list containing dictionaries with information gathered on keys
- #: imported.
- results = list()
+ """Parse GnuPG status messages for key import operations."""
def __init__(self, gpg):
+ """Start parsing the results of a key import operation.
+
+ :type gpg: :class:`gnupg.GPG`
+ :param gpg: An instance of :class:`gnupg.GPG`.
+ """
self._gpg = gpg
- self.counts = self._counts
+
+ #: A map from GnuPG codes shown with the ``IMPORT_OK`` status message
+ #: to their human-meaningful English equivalents.
+ self._ok_reason = {'0': 'Not actually changed',
+ '1': 'Entirely new key',
+ '2': 'New user IDs',
+ '4': 'New signatures',
+ '8': 'New subkeys',
+ '16': 'Contains private key',
+ '17': 'Contains private key',}
+
+ #: A map from GnuPG codes shown with the ``IMPORT_PROBLEM`` status
+ #: message to their human-meaningful English equivalents.
+ self._problem_reason = { '0': 'No specific reason given',
+ '1': 'Invalid Certificate',
+ '2': 'Issuer Certificate missing',
+ '3': 'Certificate Chain too long',
+ '4': 'Error storing certificate', }
+
+ #: All the possible status messages pertaining to actions taken while
+ #: importing a key.
+ self._fields = '''count no_user_id imported imported_rsa unchanged
+ n_uids n_subk n_sigs n_revoc sec_read sec_imported sec_dups
+ not_imported'''.split()
+
+ #: Counts of all the status message results, :data:`_fields` which
+ #: have appeared.
+ self.counts = OrderedDict(
+ zip(self._fields, [int(0) for x in range(len(self._fields))]))
+
+ #: A list of strings containing the fingerprints of the GnuPG keyIDs
+ #: imported.
+ self.fingerprints = list()
+
+ #: A list containing dictionaries with information gathered on keys
+ #: imported.
+ self.results = list()
def __nonzero__(self):
"""Override the determination for truthfulness evaluation.
@@ -1048,7 +1085,7 @@ class ImportResult(object):
:rtype: bool
:returns: True if we have immport some keys, False otherwise.
"""
- if self.counts.not_imported > 0: return False
+ if self.counts['not_imported'] > 0: return False
if len(self.fingerprints) == 0: return False
return True
__bool__ = __nonzero__
@@ -1056,7 +1093,7 @@ class ImportResult(object):
def _handle_status(self, key, value):
"""Parse a status code from the attached GnuPG process.
- :raises: :exc:`~exceptions.ValueError` if the status message is unknown.
+ :raises ValueError: if the status message is unknown.
"""
if key == "IMPORTED":
# this duplicates info we already see in import_ok & import_problem
@@ -1189,6 +1226,37 @@ class Verify(object):
self.trust_level = None
#: The string corresponding to the ``trust_level`` number.
self.trust_text = None
+ #: The subpackets. These are stored as a dictionary, in the following
+ #: form:
+ #: Verify.subpackets = {'SUBPACKET_NUMBER': {'flags': FLAGS,
+ #: 'length': LENGTH,
+ #: 'data': DATA},
+ #: 'ANOTHER_SUBPACKET_NUMBER': {...}}
+ self.subpackets = {}
+ #: The signature or key notations. These are also stored as a
+ #: dictionary, in the following form:
+ #:
+ #: Verify.notations = {NOTATION_NAME: NOTATION_DATA}
+ #:
+ #: For example, the Bitcoin core developer, Peter Todd, encodes in
+ #: every signature the header of the latest block on the Bitcoin
+ #: blockchain (to prove that a GnuPG signature that Peter made was made
+ #: *after* a specific point in time). These look like:
+ #:
+ #: gpg: Signature notation: blockhash@bitcoin.org=000000000000000006f793d4461ee3e756ff04cc62581c96a42ed67dc233da3a
+ #:
+ #: Which python-gnupg would store as:
+ #:
+ #: Verify.notations['blockhash@bitcoin.org'] = '000000000000000006f793d4461ee3e756ff04cc62581c96a42ed67dc233da3a'
+ self.notations = {}
+
+ #: This will be a str or None. If not None, it is the last
+ #: ``NOTATION_NAME`` we stored in the ``notations`` dict. Because we're
+ #: not assured that a ``NOTATION_DATA`` status will arrive *immediately*
+ #: after its corresponding ``NOTATION_NAME``, we store the latest
+ #: ``NOTATION_NAME`` here until we get its corresponding
+ #: ``NOTATION_DATA``.
+ self._last_notation_name = None
def __nonzero__(self):
"""Override the determination for truthfulness evaluation.
@@ -1209,7 +1277,8 @@ class Verify(object):
self.trust_level = self.TRUST_LEVELS[key]
elif key in ("RSA_OR_IDEA", "NODATA", "IMPORT_RES", "PLAINTEXT",
"PLAINTEXT_LENGTH", "POLICY_URL", "DECRYPTION_INFO",
- "DECRYPTION_OKAY", "INV_SGNR"):
+ "DECRYPTION_OKAY", "INV_SGNR", "PROGRESS",
+ "PINENTRY_LAUNCHED"):
pass
elif key == "BADSIG":
self.valid = False
@@ -1220,6 +1289,7 @@ class Verify(object):
self.status = 'signature good'
self.key_id, self.username = value.split(None, 1)
elif key == "VALIDSIG":
+ self.valid = True
(self.fingerprint,
self.creation_date,
self.sig_timestamp,
@@ -1245,17 +1315,106 @@ class Verify(object):
self.valid = False
self.key_id = value
self.status = 'no public key'
+ # These are useless in Verify, since they are spit out for any
+ # pub/subkeys on the key, not just the one doing the signing.
+ # if we want to check for signatures make with expired key,
+ # the relevant flags are REVKEYSIG and KEYREVOKED.
elif key in ("KEYEXPIRED", "SIGEXPIRED"):
- # these are useless in verify, since they are spit out for any
- # pub/subkeys on the key, not just the one doing the signing.
- # if we want to check for signatures with expired key,
- # the relevant flag is EXPKEYSIG.
pass
+ # The signature has an expiration date which has already passed
+ # (EXPKEYSIG), or the signature has been revoked (REVKEYSIG):
elif key in ("EXPKEYSIG", "REVKEYSIG"):
- # signed with expired or revoked key
self.valid = False
self.key_id = value.split()[0]
self.status = (('%s %s') % (key[:3], key[3:])).lower()
+ # This is super annoying, and bad design on the part of GnuPG, in my
+ # opinion.
+ #
+ # This flag can get triggered if a valid signature is made, and then
+ # later the key (or subkey) which created the signature is
+ # revoked. When this happens, GnuPG will output:
+ #
+ # REVKEYSIG 075BFD18B365D34C Test Expired Key <test@python-gnupg.git>
+ # VALIDSIG DAB69B05F591640B7F4DCBEA075BFD18B365D34C 2014-09-26 1411700539 0 4 0 1 2 00 4BA800F77452A6C29447FF20F4AF76ACBBE22CE2
+ # KEYREVOKED
+ #
+ # Meaning that we have a timestamp for when the signature was created,
+ # and we know that the signature is valid, but since GnuPG gives us no
+ # timestamp for when the key was revoked... we have no ability to
+ # determine if the valid signature was made *before* the signing key
+ # was revoked or *after*. Meaning that if you are like me and you sign
+ # all your software releases and git commits, and you also practice
+ # good opsec by doing regular key rotations, your old signatures made
+ # by your expired/revoked keys (even though they were created when the
+ # key was still good) are considered bad because GnuPG is a
+ # braindamaged piece of shit.
+ #
+ # Software engineering, motherfuckers, DO YOU SPEAK IT?
+ #
+ # The signing key which created the signature has since been revoked
+ # (KEYREVOKED), and we're going to ignore it (but add something to the
+ # status message):
+ elif key in ("KEYREVOKED"):
+ self.status = '\n'.join([self.status, "key revoked"])
+ # SIG_SUBPACKET <type> <flags> <len> <data>
+ # This indicates that a signature subpacket was seen. The format is
+ # the same as the "spk" record above.
+ #
+ # [...]
+ #
+ # SPK - Signature subpacket records
+ #
+ # - Field 2 :: Subpacket number as per RFC-4880 and later.
+ # - Field 3 :: Flags in hex. Currently the only two bits assigned
+ # are 1, to indicate that the subpacket came from the
+ # hashed part of the signature, and 2, to indicate the
+ # subpacket was marked critical.
+ # - Field 4 :: Length of the subpacket. Note that this is the
+ # length of the subpacket, and not the length of field
+ # 5 below. Due to the need for %-encoding, the length
+ # of field 5 may be up to 3x this value.
+ # - Field 5 :: The subpacket data. Printable ASCII is shown as
+ # ASCII, but other values are rendered as %XX where XX
+ # is the hex value for the byte.
+ elif key in ("SIG_SUBPACKET"):
+ fields = value.split()
+ try:
+ subpacket_number = fields[0]
+ self.subpackets[subpacket_number] = {'flags': None,
+ 'length': None,
+ 'data': None}
+ except IndexError:
+ # We couldn't parse the subpacket type (an RFC4880
+ # identifier), so we shouldn't continue parsing.
+ pass
+ else:
+ # Pull as much data as we can parse out of the subpacket:
+ try:
+ self.subpackets[subpacket_number]['flags'] = fields[1]
+ self.subpackets[subpacket_number]['length'] = fields[2]
+ self.subpackets[subpacket_number]['data'] = fields[3]
+ except IndexError:
+ pass
+ # NOTATION_
+ # There are actually two related status codes to convey notation
+ # data:
+ #
+ # - NOTATION_NAME <name>
+ # - NOTATION_DATA <string>
+ #
+ # <name> and <string> are %XX escaped; the data may be split among
+ # several NOTATION_DATA lines.
+ elif key.startswith("NOTATION_"):
+ if key.endswith("NAME"):
+ self.notations[value] = str()
+ self._last_notation_name = value
+ elif key.endswith("DATA"):
+ if self._last_notation_name is not None:
+ # Append the NOTATION_DATA to any previous data we
+ # received for that NOTATION_NAME:
+ self.notations[self._last_notation_name] += value
+ else:
+ pass
else:
raise ValueError("Unknown status message: %r" % key)
@@ -1360,26 +1519,33 @@ class ListPackets(object):
self.need_passphrase_sym = None
#: The keyid and uid which this data is encrypted to.
self.userid_hint = None
+ #: The first key that we detected that a message was encrypted
+ #: to. This is provided for backwards compatibility. As of Issue #77_,
+ #: the ``encrypted_to`` attribute should be used instead.
+ self.key = None
+ #: A list of keyid's that the message has been encrypted to.
+ self.encrypted_to = []
def _handle_status(self, key, value):
"""Parse a status code from the attached GnuPG process.
:raises: :exc:`~exceptions.ValueError` if the status message is unknown.
"""
- if key == 'NODATA':
+ if key in ('NO_SECKEY', 'BEGIN_DECRYPTION', 'DECRYPTION_FAILED',
+ 'END_DECRYPTION', 'GOOD_PASSPHRASE', 'BAD_PASSPHRASE'):
+ pass
+ elif key == 'NODATA':
self.status = nodata(value)
elif key == 'ENC_TO':
- # This will only capture keys in our keyring. In the future we
- # may want to include multiple unknown keys in this list.
- self.key, _, _ = value.split()
- elif key == 'NEED_PASSPHRASE':
+ key, _, _ = value.split()
+ if not self.key:
+ self.key = key
+ self.encrypted_to.append(key)
+ elif key == ('NEED_PASSPHRASE', 'MISSING_PASSPHRASE'):
self.need_passphrase = True
elif key == 'NEED_PASSPHRASE_SYM':
self.need_passphrase_sym = True
elif key == 'USERID_HINT':
self.userid_hint = value.strip().split()
- elif key in ('NO_SECKEY', 'BEGIN_DECRYPTION', 'DECRYPTION_FAILED',
- 'END_DECRYPTION'):
- pass
else:
raise ValueError("Unknown status message: %r" % key)
diff --git a/gnupg/_trust.py b/gnupg/_trust.py
index 514ae8c..224e7b6 100644
--- a/gnupg/_trust.py
+++ b/gnupg/_trust.py
@@ -57,7 +57,7 @@ def export_ownertrust(cls, trustdb=None):
except (OSError, IOError) as err:
log.debug(str(err))
- export_proc = cls._open_subprocess('--export-ownertrust')
+ export_proc = cls._open_subprocess(['--export-ownertrust'])
tdb = open(trustdb, 'wb')
_util._threaded_copy_data(export_proc.stdout, tdb)
@@ -71,7 +71,7 @@ def import_ownertrust(self, trustdb=None):
if trustdb is None:
trustdb = os.path.join(cls.homedir, 'trustdb.gpg')
- import_proc = cls._open_subprocess('--import-ownertrust')
+ import_proc = cls._open_subprocess(['--import-ownertrust'])
tdb = open(trustdb, 'rb')
_util._threaded_copy_data(tdb, import_proc.stdin)
@@ -98,6 +98,6 @@ def fix_trustdb(cls, trustdb=None):
"""
if trustdb is None:
trustdb = os.path.join(cls.homedir, 'trustdb.gpg')
- export_proc = cls._open_subprocess('--export-ownertrust')
- import_proc = cls._open_subprocess('--import-ownertrust')
+ export_proc = cls._open_subprocess(['--export-ownertrust'])
+ import_proc = cls._open_subprocess(['--import-ownertrust'])
_util._threaded_copy_data(export_proc.stdout, import_proc.stdin)
diff --git a/gnupg/_util.py b/gnupg/_util.py
index e1e14ab..79855ac 100644
--- a/gnupg/_util.py
+++ b/gnupg/_util.py
@@ -28,18 +28,58 @@ from time import mktime
import codecs
import encodings
import os
-import psutil
import threading
import random
import re
import string
import sys
+# These are all the classes which are stream-like; they are used in
+# :func:`_is_stream`.
+_STREAMLIKE_TYPES = []
+
+# These StringIO classes are actually utilised.
try:
+ import io
from io import StringIO
from io import BytesIO
except ImportError:
from cStringIO import StringIO
+else:
+ # The io.IOBase type covers the above example for an open file handle in
+ # Python3, as well as both io.BytesIO and io.StringIO.
+ _STREAMLIKE_TYPES.append(io.IOBase)
+
+# The remaining StringIO classes which are imported are used to determine if a
+# object is a stream-like in :func:`_is_stream`.
+if 2 == sys.version_info[0]:
+ # Import the StringIO class from the StringIO module since it is a
+ # commonly used stream class. It is distinct from either of the
+ # StringIO's that may be loaded in the above try/except clause, so the
+ # name is prefixed with an underscore to distinguish it.
+ from StringIO import StringIO as _StringIO_StringIO
+ _STREAMLIKE_TYPES.append(_StringIO_StringIO)
+
+ # Import the cStringIO module to test for the cStringIO stream types,
+ # InputType and OutputType. See
+ # http://stackoverflow.com/questions/14735295/to-check-an-instance-is-stringio
+ import cStringIO as _cStringIO
+ _STREAMLIKE_TYPES.append(_cStringIO.InputType)
+ _STREAMLIKE_TYPES.append(_cStringIO.OutputType)
+
+ # In Python2:
+ #
+ # >>> type(open('README.md', 'rb'))
+ # <open file 'README.md', mode 'rb' at 0x7f9493951d20>
+ #
+ # whereas, in Python3, the `file` builtin doesn't exist and instead we get:
+ #
+ # >>> type(open('README.md', 'rb'))
+ # <_io.BufferedReader name='README.md'>
+ #
+ # which is covered by the above addition of io.IOBase.
+ _STREAMLIKE_TYPES.append(file)
+
from . import _logger
@@ -56,6 +96,9 @@ try:
except NameError:
_py3k = True
+_running_windows = False
+if "win" in sys.platform:
+ _running_windows = True
## Directory shortcuts:
## we don't want to use this one because it writes to the install dir:
@@ -63,6 +106,20 @@ except NameError:
_here = os.path.join(os.getcwd(), 'gnupg') ## current dir
_test = os.path.join(os.path.join(_here, 'test'), 'tmp') ## ./tests/tmp
_user = os.environ.get('HOME') ## $HOME
+
+# Fix for Issue #74: we shouldn't expect that a $HOME directory is set in all
+# environs. https://github.com/isislovecruft/python-gnupg/issues/74
+if not _user:
+ _user = '/tmp/python-gnupg'
+ try:
+ os.makedirs(_user)
+ except (OSError, IOError):
+ _user = os.getcwd()
+ # If we can't use $HOME, but we have (or can create) a
+ # /tmp/python-gnupg/gnupghome directory, then we'll default to using
+ # that. Otherwise, we'll use the current directory + /gnupghome.
+ _user = os.path.sep.join([_user, 'gnupghome'])
+
_ugpg = os.path.join(_user, '.gnupg') ## $HOME/.gnupg
_conf = os.path.join(os.path.join(_user, '.config'), 'python-gnupg')
## $HOME/.config/python-gnupg
@@ -70,6 +127,9 @@ _conf = os.path.join(os.path.join(_user, '.config'), 'python-gnupg')
## Logger is disabled by default
log = _logger.create_logger(0)
+#: Compiled regex for determining a GnuPG binary's version:
+_VERSION_STRING_REGEX = re.compile('(\d)*(\.)*(\d)*(\.)*(\d)*')
+
def find_encodings(enc=None, system=False):
"""Find functions for encoding translations for a specific codec.
@@ -105,6 +165,51 @@ def find_encodings(enc=None, system=False):
return coder
+
+if _py3k:
+ def b(x):
+ """See http://python3porting.com/problems.html#nicer-solutions"""
+ coder = find_encodings()
+ if isinstance(x, bytes):
+ return coder.encode(x.decode(coder.name))[0]
+ else:
+ return coder.encode(x)[0]
+
+ def s(x):
+ if isinstance(x, str):
+ return x
+ elif isinstance(x, (bytes, bytearray)):
+ return x.decode(find_encodings().name)
+ else:
+ raise NotImplemented
+else:
+ def b(x):
+ """See http://python3porting.com/problems.html#nicer-solutions"""
+ return x
+
+ def s(x):
+ if isinstance(x, basestring):
+ return x
+ elif isinstance(x, (bytes, bytearray)):
+ return x.decode(find_encodings().name)
+ else:
+ raise NotImplemented
+
+def binary(data):
+ coder = find_encodings()
+
+ if _py3k and isinstance(data, bytes):
+ encoded = coder.encode(data.decode(coder.name))[0]
+ elif _py3k and isinstance(data, str):
+ encoded = coder.encode(data)[0]
+ elif not _py3k and type(data) is not str:
+ encoded = coder.encode(data)[0]
+ else:
+ encoded = data
+
+ return encoded
+
+
def author_info(name, contact=None, public_key=None):
"""Easy object-oriented representation of contributor info.
@@ -124,8 +229,6 @@ def _copy_data(instream, outstream):
"""
sent = 0
- coder = find_encodings()
-
while True:
if ((_py3k and isinstance(instream, str)) or
(not _py3k and isinstance(instream, basestring))):
@@ -135,24 +238,64 @@ def _copy_data(instream, outstream):
data = instream.read(1024)
if len(data) == 0:
break
+
sent += len(data)
- log.debug("Sending chunk %d bytes:\n%s"
- % (sent, data))
- try:
- outstream.write(data)
- except UnicodeError:
+ encoded = binary(data)
+ log.debug("Sending %d bytes of data..." % sent)
+ log.debug("Encoded data (type %s):\n%s" % (type(encoded), encoded))
+
+ if not _py3k:
try:
- outstream.write(coder.encode(data))
- except IOError:
- log.exception("Error sending data: Broken pipe")
+ outstream.write(encoded)
+ except IOError as ioe:
+ # Can get 'broken pipe' errors even when all data was sent
+ if 'Broken pipe' in str(ioe):
+ log.error('Error sending data: Broken pipe')
+ else:
+ log.exception(ioe)
break
- except IOError as ioe:
- # Can get 'broken pipe' errors even when all data was sent
- if 'Broken pipe' in str(ioe):
- log.error('Error sending data: Broken pipe')
else:
- log.exception(ioe)
- break
+ log.debug("Wrote data type <type 'str'> to outstream.")
+ else:
+ try:
+ outstream.write(bytes(encoded))
+ except TypeError as te:
+ # XXX FIXME This appears to happen because
+ # _threaded_copy_data() sometimes passes the `outstream` as an
+ # object with type <_io.BufferredWriter> and at other times
+ # with type <encodings.utf_8.StreamWriter>. We hit the
+ # following error when the `outstream` has type
+ # <encodings.utf_8.StreamWriter>.
+ if not "convert 'bytes' object to str implicitly" in str(te):
+ log.error(str(te))
+ try:
+ outstream.write(encoded.decode())
+ except TypeError as yate:
+ # We hit the "'str' does not support the buffer interface"
+ # error in Python3 when the `outstream` is an io.BytesIO and
+ # we try to write a str to it. We don't care about that
+ # error, we'll just try again with bytes.
+ if not "does not support the buffer interface" in str(yate):
+ log.error(str(yate))
+ except IOError as ioe:
+ # Can get 'broken pipe' errors even when all data was sent
+ if 'Broken pipe' in str(ioe):
+ log.error('Error sending data: Broken pipe')
+ else:
+ log.exception(ioe)
+ break
+ else:
+ log.debug("Wrote data type <class 'str'> outstream.")
+ except IOError as ioe:
+ # Can get 'broken pipe' errors even when all data was sent
+ if 'Broken pipe' in str(ioe):
+ log.error('Error sending data: Broken pipe')
+ else:
+ log.exception(ioe)
+ break
+ else:
+ log.debug("Wrote data type <class 'bytes'> to outstream.")
+
try:
outstream.close()
except IOError as ioe:
@@ -260,6 +403,8 @@ def _find_binary(binary=None):
"""
found = None
if binary is not None:
+ if os.path.isabs(binary) and os.path.isfile(binary):
+ return binary
if not os.path.isabs(binary):
try:
found = _which(binary)
@@ -272,7 +417,7 @@ def _find_binary(binary=None):
elif os.access(binary, os.X_OK):
found = binary
if found is None:
- try: found = _which('gpg')[0]
+ try: found = _which('gpg', abspath_only=True, disallow_symlinks=True)[0]
except IndexError as ie:
log.error("Could not find binary for 'gpg'.")
try: found = _which('gpg2')[0]
@@ -281,14 +426,7 @@ def _find_binary(binary=None):
if found is None:
raise RuntimeError("GnuPG is not installed!")
- try:
- assert os.path.isabs(found), "Path to gpg binary not absolute"
- assert not os.path.islink(found), "Path to gpg binary is symlink"
- assert os.access(found, os.X_OK), "Lacking +x perms for gpg binary"
- except (AssertionError, AttributeError) as ae:
- log.error(str(ae))
- else:
- return found
+ return found
def _has_readwrite(path):
"""
@@ -335,7 +473,32 @@ def _is_stream(input):
:rtype: bool
:returns: True if :param:input is a stream, False if otherwise.
"""
- return isinstance(input, BytesIO) or isinstance(input, StringIO)
+ return isinstance(input, tuple(_STREAMLIKE_TYPES))
+
+def _is_string(thing):
+ """Check that **thing** is a string. The definition of the latter depends
+ upon the Python version.
+
+ :param thing: The thing to check if it's a string.
+ :rtype: bool
+ :returns: ``True`` if **thing** is string (or unicode in Python2).
+ """
+ if (_py3k and isinstance(thing, str)):
+ return True
+ if (not _py3k and isinstance(thing, basestring)):
+ return True
+ return False
+
+def _is_bytes(thing):
+ """Check that **thing** is bytes.
+
+ :param thing: The thing to check if it's bytes.
+ :rtype: bool
+ :returns: ``True`` if **thing** is bytes or a bytearray.
+ """
+ if isinstance(thing, (bytes, bytearray)):
+ return True
+ return False
def _is_list_or_tuple(instance):
"""Check that ``instance`` is a list or tuple.
@@ -368,21 +531,26 @@ def _is_gpg2(version):
return True
return False
-def _make_binary_stream(s, encoding):
- """
- xxx fill me in
+def _make_binary_stream(thing, encoding=None, armor=True):
+ """Encode **thing**, then make it stream/file-like.
+
+ :param thing: The thing to turn into a encoded stream.
+ :rtype: ``io.BytesIO`` or ``io.StringIO``.
+ :returns: The encoded **thing**, wrapped in an ``io.BytesIO`` (if
+ available), otherwise wrapped in a ``io.StringIO``.
"""
+ if _py3k:
+ if isinstance(thing, str):
+ thing = thing.encode(encoding)
+ else:
+ if type(thing) is not str:
+ thing = thing.encode(encoding)
+
try:
- if _py3k:
- if isinstance(s, str):
- s = s.encode(encoding)
- else:
- if type(s) is not str:
- s = s.encode(encoding)
- from io import BytesIO
- rv = BytesIO(s)
- except ImportError:
- rv = StringIO(s)
+ rv = BytesIO(thing)
+ except NameError:
+ rv = StringIO(thing)
+
return rv
def _make_passphrase(length=None, save=False, file=None):
@@ -403,7 +571,7 @@ def _make_passphrase(length=None, save=False, file=None):
passphrase = _make_random_string(length)
if save:
- ruid, euid, suid = psutil.Process(os.getpid()).uids
+ ruid, euid, suid = os.getresuid()
gid = os.getgid()
now = mktime(localtime())
@@ -434,8 +602,7 @@ def _match_version_string(version):
:param str version: A version string in the form x.x.x
"""
- regex = re.compile('(\d)*(\.)*(\d)*(\.)*(\d)*')
- matched = regex.match(version)
+ matched = _VERSION_STRING_REGEX.match(version)
g = matched.groups()
major, minor, micro = int(g[0]), int(g[2]), int(g[4])
return (major, minor, micro)
@@ -485,7 +652,7 @@ def _utc_epoch():
"""Get the seconds since epoch."""
return int(mktime(localtime()))
-def _which(executable, flags=os.X_OK):
+def _which(executable, flags=os.X_OK, abspath_only=False, disallow_symlinks=False):
"""Borrowed from Twisted's :mod:twisted.python.proutils .
Search PATH for executable files with the given name.
@@ -508,6 +675,17 @@ def _which(executable, flags=os.X_OK):
:returns: A list of the full paths to files found, in the order in which
they were found.
"""
+ def _can_allow(p):
+ if not os.access(p, flags):
+ return False
+ if abspath_only and not os.path.abspath(p):
+ log.warn('Ignoring %r (path is not absolute)', p)
+ return False
+ if disallow_symlinks and os.path.islink(p):
+ log.warn('Ignoring %r (path is a symlink)', p)
+ return False
+ return True
+
result = []
exts = filter(None, os.environ.get('PATHEXT', '').split(os.pathsep))
path = os.environ.get('PATH', None)
@@ -515,11 +693,11 @@ def _which(executable, flags=os.X_OK):
return []
for p in os.environ.get('PATH', '').split(os.pathsep):
p = os.path.join(p, executable)
- if os.access(p, flags):
+ if _can_allow(p):
result.append(p)
for e in exts:
pext = p + e
- if os.access(pext, flags):
+ if _can_allow(pext):
result.append(pext)
return result
diff --git a/gnupg/_version.py b/gnupg/_version.py
index 1a0cb9d..fede8ae 100644
--- a/gnupg/_version.py
+++ b/gnupg/_version.py
@@ -4,8 +4,8 @@
# unpacked source archive. Distribution tarballs contain a pre-generated copy
# of this file.
-version_version = '1.3.1'
-version_full = '87928205a87afc66779b9760df039642eed291b2'
+version_version = '2.0.2'
+version_full = '4f1b1f6a8d16df9d4e1f29ba9223f05889131189'
def get_versions(default={}, verbose=False):
return {'version': version_version, 'full': version_full}
diff --git a/gnupg/gnupg.py b/gnupg/gnupg.py
index 9aa8232..215233e 100644
--- a/gnupg/gnupg.py
+++ b/gnupg/gnupg.py
@@ -36,13 +36,7 @@ import os
import re
import textwrap
-try:
- from io import StringIO
-except ImportError:
- from cStringIO import StringIO
-
#: see :pep:`328` http://docs.python.org/2.5/whatsnew/pep-328.html
-from . import _parsers
from . import _util
from . import _trust
from ._meta import GPGBase
@@ -66,7 +60,7 @@ class GPG(GPGBase):
def __init__(self, binary=None, homedir=None, verbose=False,
use_agent=False, keyring=None, secring=None,
- options=None):
+ ignore_homedir_permissions=False, options=None):
"""Initialize a GnuPG process wrapper.
:param str binary: Name for GnuPG binary executable. If the absolute
@@ -79,6 +73,10 @@ class GPG(GPGBase):
and private keyrings. Default is whatever GnuPG
defaults to.
+ :type ignore_homedir_permissions: :obj:`bool`
+ :param ignore_homedir_permissions: If true, bypass check that homedir
+ be writable.
+
:type verbose: :obj:`str` or :obj:`int` or :obj:`bool`
:param verbose: String or numeric value to pass to GnuPG's
``--debug-level`` option. See the GnuPG man page for
@@ -123,12 +121,16 @@ class GPG(GPGBase):
secring=secring,
options=options,
verbose=verbose,
- use_agent=use_agent,)
+ use_agent=use_agent,
+ ignore_homedir_permissions=ignore_homedir_permissions,
+ )
log.info(textwrap.dedent("""
Initialised settings:
binary: %s
+ binary version: %s
homedir: %s
+ ignore_homedir_permissions: %s
keyring: %s
secring: %s
default_preference_list: %s
@@ -136,9 +138,16 @@ class GPG(GPGBase):
options: %s
verbose: %s
use_agent: %s
- """ % (self.binary, self.homedir, self.keyring, self.secring,
- self.default_preference_list, self.keyserver, self.options,
- str(self.verbose), str(self.use_agent))))
+ """ % (self.binary,
+ self.binary_version,
+ self.homedir,
+ self.ignore_homedir_permissions,
+ self.keyring,
+ self.secring,
+ self.default_preference_list,
+ self.keyserver, self.options,
+ str(self.verbose),
+ str(self.use_agent))))
self._batch_dir = os.path.join(self.homedir, 'batch-files')
self._key_dir = os.path.join(self.homedir, 'generated-keys')
@@ -147,58 +156,52 @@ class GPG(GPGBase):
self.temp_keyring = None
#: The secring used in the most recently created batch file
self.temp_secring = None
- #: The version string of our GnuPG binary
- self.binary_version = str()
-
- ## check that everything runs alright, and grab the gpg binary's
- ## version number while we're at it:
- proc = self._open_subprocess(["--list-config", "--with-colons"])
- result = self._result_map['list'](self)
- self._read_data(proc.stdout, result)
- if proc.returncode:
- raise RuntimeError("Error invoking gpg: %s" % result.data)
-
- version_line = str(result.data).partition(':version:')[2]
- self.binary_version = version_line.split('\n')[0]
- log.debug("Using GnuPG version %s" % self.binary_version)
- if _util._is_gpg2:
- # Make GnuPG>=2.0.0-only methods public:
- self.fix_trustdb = self._fix_trustdb
- self.import_ownertrust = self._import_ownertrust
- self.export_ownertrust = self._export_ownertrust
+ # Make sure that the trustdb exists, or else GnuPG will exit with a
+ # fatal error (at least it does with GnuPG>=2.0.0):
+ self.create_trustdb()
- # Make sure that the trustdb exists, or else GnuPG will exit with
- # a fatal error (at least it does with GnuPG>=2.0.0):
- self._create_trustdb()
+ # The --no-use-agent and --use-agent options were deprecated in GnuPG
+ # 2.x, so we should set use_agent to None here to avoid having
+ # GPGBase._make_args() add either one.
+ if self.is_gpg2():
+ self.use_agent = None
@functools.wraps(_trust._create_trustdb)
- def _create_trustdb(self):
+ def create_trustdb(self):
if self.is_gpg2():
_trust._create_trustdb(self)
else:
log.info("Creating the trustdb is only available with GnuPG>=2.x")
+ # For backward compatibility with python-gnupg<=1.3.1:
+ _create_trustdb = create_trustdb
@functools.wraps(_trust.fix_trustdb)
- def _fix_trustdb(self, trustdb=None):
+ def fix_trustdb(self, trustdb=None):
if self.is_gpg2():
_trust.fix_trustdb(self)
else:
log.info("Fixing the trustdb is only available with GnuPG>=2.x")
+ # For backward compatibility with python-gnupg<=1.3.1:
+ _fix_trustdb = fix_trustdb
@functools.wraps(_trust.import_ownertrust)
- def _import_ownertrust(self, trustdb=None):
+ def import_ownertrust(self, trustdb=None):
if self.is_gpg2():
_trust.import_ownertrust(self)
else:
log.info("Importing ownertrust is only available with GnuPG>=2.x")
+ # For backward compatibility with python-gnupg<=1.3.1:
+ _import_ownertrust = import_ownertrust
@functools.wraps(_trust.export_ownertrust)
- def _export_ownertrust(self, trustdb=None):
+ def export_ownertrust(self, trustdb=None):
if self.is_gpg2():
_trust.export_ownertrust(self)
else:
log.info("Exporting ownertrust is only available with GnuPG>=2.x")
+ # For backward compatibility with python-gnupg<=1.3.1:
+ _export_ownertrust = export_ownertrust
def is_gpg1(self):
"""Returns true if using GnuPG <= 1.x."""
@@ -284,15 +287,13 @@ class GPG(GPGBase):
signatures. If using detached signatures, the file containing the
detached signature should be specified as the ``sig_file``.
- :param file file: A file descriptor object. Its type will be checked
- with :func:`_util._is_file`.
+ :param file file: A file descriptor object.
:param str sig_file: A file containing the GPG signature data for
``file``. If given, ``file`` is verified via this detached
- signature.
+ signature. Its type will be checked with :func:`_util._is_file`.
"""
- fn = None
result = self._result_map['verify'](self)
if sig_file is None:
@@ -307,19 +308,15 @@ class GPG(GPGBase):
return result
log.debug('verify_file(): Handling detached verification')
sig_fh = None
- data_fh = None
try:
sig_fh = open(sig_file, 'rb')
- data_fh = open(file, 'rb')
args = ["--verify %s -" % sig_fh.name]
proc = self._open_subprocess(args)
- writer = _util._threaded_copy_data(data_fh, proc.stdin)
+ writer = _util._threaded_copy_data(file, proc.stdin)
self._collect_output(proc, result, writer, stdin=proc.stdin)
finally:
if sig_fh and not sig_fh.closed:
sig_fh.close()
- if data_fh and not data_fh.closed:
- data_fh.close()
return result
def import_keys(self, key_data):
@@ -488,19 +485,7 @@ class GPG(GPGBase):
self._collect_output(p, result, stdin=p.stdin)
lines = result.data.decode(self._encoding,
self._decode_errors).splitlines()
- valid_keywords = 'pub uid sec fpr sub'.split()
- for line in lines:
- if self.verbose:
- print(line)
- log.debug("%r", line.rstrip())
- if not line:
- break
- L = line.strip().split(':')
- if not L:
- continue
- keyword = L[0]
- if keyword in valid_keywords:
- getattr(result, keyword)(L)
+ self._parse_keys(result)
return result
def list_packets(self, raw_data):
@@ -521,8 +506,8 @@ class GPG(GPGBase):
>>> assert key.fingerprint
:rtype: dict
- :returns: A dictionary whose keys are the original keyid parameters,
- and whose values are lists of signatures.
+ :returns: res.sigs is a dictionary whose keys are the uids and whose
+ values are a set of signature keyids.
"""
if len(keyids) > self._batch_limit:
raise ValueError(
@@ -537,8 +522,26 @@ class GPG(GPGBase):
proc = self._open_subprocess(args)
result = self._result_map['list'](self)
self._collect_output(proc, result, stdin=proc.stdin)
+ self._parse_keys(result)
return result
+ def _parse_keys(self, result):
+ lines = result.data.decode(self._encoding,
+ self._decode_errors).splitlines()
+ valid_keywords = 'pub uid sec fpr sub sig'.split()
+ for line in lines:
+ if self.verbose:
+ print(line)
+ log.debug("%r", line.rstrip())
+ if not line:
+ break
+ L = line.strip().split(':')
+ if not L:
+ continue
+ keyword = L[0]
+ if keyword in valid_keywords:
+ getattr(result, keyword)(L)
+
def gen_key(self, input):
"""Generate a GnuPG key through batch file key generation. See
:meth:`GPG.gen_key_input()` for creating the control input.
@@ -798,7 +801,7 @@ class GPG(GPGBase):
key = key.replace('_','-').title()
## to set 'cert', 'Key-Usage' must be blank string
if not key in ('Key-Usage', 'Subkey-Usage'):
- if str(val).strip():
+ if type('')(val).strip():
parms[key] = val
## if Key-Type is 'default', make Subkey-Type also be 'default'
@@ -941,6 +944,13 @@ generate keys. Please see
'The crow flies at midnight.'
+ :param bool throw_keyids: If True, make all **recipients** keyids be
+ zero'd out in packet information. This is the same as using
+ **hidden_recipients** for all **recipients**. (Default: False).
+
+ :param list hidden_recipients: A list of recipients that should have
+ their keyids zero'd out in packet information.
+
:param str cipher_algo: The cipher algorithm to use. To see available
algorithms with your version of GnuPG, do:
:command:`$ gpg --with-colons --list-config ciphername`.
@@ -956,7 +966,10 @@ generate keys. Please see
.. seealso:: :meth:`._encrypt`
"""
- stream = _make_binary_stream(data, self._encoding)
+ if _is_stream(data):
+ stream = data
+ else:
+ stream = _make_binary_stream(data, self._encoding)
result = self._encrypt(stream, recipients, **kwargs)
stream.close()
return result