summaryrefslogtreecommitdiff
path: root/gnupg/_meta.py
diff options
context:
space:
mode:
Diffstat (limited to 'gnupg/_meta.py')
-rw-r--r--gnupg/_meta.py250
1 files changed, 198 insertions, 52 deletions
diff --git a/gnupg/_meta.py b/gnupg/_meta.py
index 3aafacd..32ab287 100644
--- a/gnupg/_meta.py
+++ b/gnupg/_meta.py
@@ -32,14 +32,22 @@ import encodings
import locale
import os
import platform
-import psutil
import shlex
import subprocess
import sys
import threading
+## Using psutil is recommended, but since the extension doesn't run with the
+## PyPy interpreter, we'll run even if it's not present.
+try:
+ import psutil
+except ImportError:
+ psutil = None
+
from . import _parsers
from . import _util
+from ._util import b
+from ._util import s
from ._parsers import _check_preferences
from ._parsers import _sanitise_list
@@ -75,19 +83,49 @@ class GPGMeta(type):
instance containing the gpg-agent process' information to
``cls._agent_proc``.
+ For Unix systems, we check that the effective UID of this
+ ``python-gnupg`` process is also the owner of the gpg-agent
+ process. For Windows, we check that the usernames of the owners are
+ the same. (Sorry Windows users; maybe you should switch to anything
+ else.)
+
+ .. note: This function will only run if the psutil_ Python extension
+ is installed. Because psutil won't run with the PyPy interpreter,
+ use of it is optional (although highly recommended).
+
+ .. _psutil: https://pypi.python.org/pypi/psutil
+
:returns: True if there exists a gpg-agent process running under the
same effective user ID as that of this program. Otherwise,
- returns None.
+ returns False.
"""
- identity = psutil.Process(os.getpid()).uids
+ if not psutil:
+ return False
+
+ this_process = psutil.Process(os.getpid())
+ ownership_match = False
+
+ if _util._running_windows:
+ identity = this_process.username()
+ else:
+ identity = this_process.uids
+
for proc in psutil.process_iter():
if (proc.name == "gpg-agent") and proc.is_running:
log.debug("Found gpg-agent process with pid %d" % proc.pid)
- if proc.uids == identity:
- log.debug(
- "Effective UIDs of this process and gpg-agent match")
- setattr(cls, '_agent_proc', proc)
- return True
+ if _util._running_windows:
+ if proc.username() == identity:
+ ownership_match = True
+ else:
+ if proc.uids == identity:
+ ownership_match = True
+
+ if ownership_match:
+ log.debug("Effective UIDs of this process and gpg-agent match")
+ setattr(cls, '_agent_proc', proc)
+ return True
+
+ return False
class GPGBase(object):
@@ -111,7 +149,7 @@ class GPGBase(object):
def __init__(self, binary=None, home=None, keyring=None, secring=None,
use_agent=False, default_preference_list=None,
- verbose=False, options=None):
+ ignore_homedir_permissions=False, verbose=False, options=None):
"""Create a ``GPGBase``.
This class is used to set up properties for controlling the behaviour
@@ -134,13 +172,18 @@ class GPGBase(object):
:ivar str secring: The filename in **homedir** to use as the keyring
file for secret keys.
"""
+ self.ignore_homedir_permissions = ignore_homedir_permissions
self.binary = _util._find_binary(binary)
- self.homedir = home if home else _util._conf
+ self.homedir = os.path.expanduser(home) if home else _util._conf
pub = _parsers._fix_unsafe(keyring) if keyring else 'pubring.gpg'
sec = _parsers._fix_unsafe(secring) if secring else 'secring.gpg'
self.keyring = os.path.join(self._homedir, pub)
self.secring = os.path.join(self._homedir, sec)
- self.options = _parsers._sanitise(options) if options else None
+ self.options = list(_parsers._sanitise_list(options)) if options else None
+
+ #: The version string of our GnuPG binary
+ self.binary_version = '0.0.0'
+ self.verbose = False
if default_preference_list:
self._prefs = _check_preferences(default_preference_list, 'all')
@@ -155,6 +198,14 @@ class GPGBase(object):
self._filesystemencoding = encodings.normalize_encoding(
sys.getfilesystemencoding().lower())
+ # Issue #49: https://github.com/isislovecruft/python-gnupg/issues/49
+ #
+ # During `line = stream.readline()` in `_read_response()`, the Python
+ # codecs module will choke on Unicode data, so we globally monkeypatch
+ # the "strict" error handler to use the builtin `replace_errors`
+ # handler:
+ codecs.register_error('strict', codecs.replace_errors)
+
self._keyserver = 'hkp://wwwkeys.pgp.net'
self.__generated_keys = os.path.join(self.homedir, 'generated-keys')
@@ -164,18 +215,12 @@ class GPGBase(object):
"'verbose' must be boolean, string, or 0 <= n <= 9"
assert isinstance(use_agent, bool), "'use_agent' must be boolean"
if self.options is not None:
- assert isinstance(self.options, str), "options not string"
+ assert isinstance(self.options, list), "options not list"
except (AssertionError, AttributeError) as ae:
log.error("GPGBase.__init__(): %s" % str(ae))
raise RuntimeError(str(ae))
else:
- if verbose is True:
- # The caller wants logging, but we need a valid --debug-level
- # for gpg. Default to "basic", and warn about the ambiguity.
- # (garrettr)
- verbose = "basic"
- log.warning('GPG(verbose=True) is ambiguous, defaulting to "basic" logging')
- self.verbose = verbose
+ self._set_verbose(verbose)
self.use_agent = use_agent
if hasattr(self, '_agent_proc') \
@@ -183,6 +228,9 @@ class GPGBase(object):
if hasattr(self, '__remove_path__'):
self.__remove_path__('pinentry')
+ # Assign our self.binary_version attribute:
+ self._check_sane_and_get_gpg_version()
+
def __remove_path__(self, prog=None, at_exit=True):
"""Remove the directories containing a program from the system's
``$PATH``. If ``GPGBase.binary`` is in a directory being removed, it
@@ -368,18 +416,21 @@ class GPGBase(object):
log.debug("GPGBase._homedir_setter(): Check existence of '%s'" % hd)
_util._create_if_necessary(hd)
- try:
- log.debug("GPGBase._homedir_setter(): checking permissions")
- assert _util._has_readwrite(hd), \
- "Homedir '%s' needs read/write permissions" % hd
- except AssertionError as ae:
- msg = ("Unable to set '%s' as GnuPG homedir" % directory)
- log.debug("GPGBase.homedir.setter(): %s" % msg)
- log.debug(str(ae))
- raise RuntimeError(str(ae))
- else:
- log.info("Setting homedir to '%s'" % hd)
+ if self.ignore_homedir_permissions:
self._homedir = hd
+ else:
+ try:
+ log.debug("GPGBase._homedir_setter(): checking permissions")
+ assert _util._has_readwrite(hd), \
+ "Homedir '%s' needs read/write permissions" % hd
+ except AssertionError as ae:
+ msg = ("Unable to set '%s' as GnuPG homedir" % directory)
+ log.debug("GPGBase.homedir.setter(): %s" % msg)
+ log.debug(str(ae))
+ raise RuntimeError(str(ae))
+ else:
+ log.info("Setting homedir to '%s'" % hd)
+ self._homedir = hd
homedir = _util.InheritableProperty(_homedir_getter, _homedir_setter)
@@ -436,6 +487,24 @@ class GPGBase(object):
_generated_keys = _util.InheritableProperty(_generated_keys_getter,
_generated_keys_setter)
+ def _check_sane_and_get_gpg_version(self):
+ """Check that everything runs alright, and grab the gpg binary's
+ version number while we're at it, storing it as :data:`binary_version`.
+
+ :raises RuntimeError: if we cannot invoke the gpg binary.
+ """
+ proc = self._open_subprocess(["--list-config", "--with-colons"])
+ result = self._result_map['list'](self)
+ self._read_data(proc.stdout, result)
+ if proc.returncode:
+ raise RuntimeError("Error invoking gpg: %s" % result.data)
+ else:
+ proc.terminate()
+
+ version_line = str(result.data).partition(':version:')[2]
+ self.binary_version = version_line.split('\n')[0]
+ log.debug("Using GnuPG version %s" % self.binary_version)
+
def _make_args(self, args, passphrase=False):
"""Make a list of command line elements for GPG.
@@ -470,21 +539,29 @@ class GPGBase(object):
if passphrase: cmd.append('--batch --passphrase-fd 0')
- if self.use_agent: cmd.append('--use-agent')
- else: cmd.append('--no-use-agent')
+ if self.use_agent is True: cmd.append('--use-agent')
+ elif self.use_agent is False: cmd.append('--no-use-agent')
+
+ # The arguments for debugging and verbosity should be placed into the
+ # cmd list before the options/args in order to resolve Issue #76:
+ # https://github.com/isislovecruft/python-gnupg/issues/76
+ if self.verbose:
+ cmd.append('--debug-all')
+
+ if (isinstance(self.verbose, str) or
+ (isinstance(self.verbose, int) and (self.verbose >= 1))):
+ # GnuPG<=1.4.18 parses the `--debug-level` command in a way
+ # that is incompatible with all other GnuPG versions. :'(
+ if self.binary_version and (self.binary_version <= '1.4.18'):
+ cmd.append('--debug-level=%s' % self.verbose)
+ else:
+ cmd.append('--debug-level %s' % self.verbose)
if self.options:
[cmd.append(opt) for opt in iter(_sanitise_list(self.options))]
if args:
[cmd.append(arg) for arg in iter(_sanitise_list(args))]
- if self.verbose:
- cmd.append('--debug-all')
- if ((isinstance(self.verbose, str) and
- self.verbose in ['basic', 'advanced', 'expert', 'guru'])
- or (isinstance(self.verbose, int) and (1<=self.verbose<=9))):
- cmd.append('--debug-level %s' % self.verbose)
-
return cmd
def _open_subprocess(self, args=None, passphrase=False):
@@ -592,6 +669,36 @@ class GPGBase(object):
log.debug("Finishing reading from stream %r..." % stream.__repr__())
log.debug("Read %4d bytes total" % len(result.data))
+ def _set_verbose(self, verbose):
+ """Check and set our :data:`verbose` attribute.
+ The debug-level must be a string or an integer. If it is one of
+ the allowed strings, GnuPG will translate it internally to it's
+ corresponding integer level:
+
+ basic = 1-2
+ advanced = 3-5
+ expert = 6-8
+ guru = 9+
+
+ If it's not one of the recognised string levels, then then
+ entire argument is ignored by GnuPG. :(
+
+ To fix that stupid behaviour, if they wanted debugging but typo'd
+ the string level (or specified ``verbose=True``), we'll default to
+ 'basic' logging.
+ """
+ string_levels = ('basic', 'advanced', 'expert', 'guru')
+
+ if verbose is True:
+ # The caller wants logging, but we need a valid --debug-level
+ # for gpg. Default to "basic", and warn about the ambiguity.
+ verbose = 'basic'
+
+ if (isinstance(verbose, str) and not (verbose in string_levels)):
+ verbose = 'basic'
+
+ self.verbose = verbose
+
def _collect_output(self, process, result, writer=None, stdin=None):
"""Drain the subprocesses output streams, writing the collected output
to the result. If a writer thread (writing to the subprocess) is given,
@@ -699,6 +806,19 @@ class GPGBase(object):
## We could use _handle_io here except for the fact that if the
## passphrase is bad, gpg bails and you can't write the message.
result = self._result_map['sign'](self)
+
+ ## If the passphrase is an empty string, the message up to and
+ ## including its first newline will be cut off before making it to the
+ ## GnuPG process. Therefore, if the passphrase='' or passphrase=b'',
+ ## we set passphrase=None. See Issue #82:
+ ## https://github.com/isislovecruft/python-gnupg/issues/82
+ if _util._is_string(passphrase):
+ passphrase = passphrase if len(passphrase) > 0 else None
+ elif _util._is_bytes(passphrase):
+ passphrase = s(passphrase) if len(passphrase) > 0 else None
+ else:
+ passphrase = None
+
proc = self._open_subprocess(args, passphrase is not None)
try:
if passphrase:
@@ -718,6 +838,8 @@ class GPGBase(object):
symmetric=False,
always_trust=True,
output=None,
+ throw_keyids=False,
+ hidden_recipients=None,
cipher_algo='AES256',
digest_algo='SHA512',
compress_algo='ZLIB'):
@@ -790,6 +912,14 @@ class GPGBase(object):
>>> decrypted
'The crow flies at midnight.'
+
+ :param bool throw_keyids: If True, make all **recipients** keyids be
+ zero'd out in packet information. This is the same as using
+ **hidden_recipients** for all **recipients**. (Default: False).
+
+ :param list hidden_recipients: A list of recipients that should have
+ their keyids zero'd out in packet information.
+
:param str cipher_algo: The cipher algorithm to use. To see available
algorithms with your version of GnuPG, do:
:command:`$ gpg --with-colons --list-config
@@ -841,6 +971,7 @@ class GPGBase(object):
## is decryptable with a passphrase or secretkey.
if symmetric: args.append('--symmetric')
if encrypt: args.append('--encrypt')
+ if throw_keyids: args.append('--throw-keyids')
if len(recipients) >= 1:
log.debug("GPG.encrypt() called for recipients '%s' with type '%s'"
@@ -856,39 +987,54 @@ class GPGBase(object):
log.info("Can't accept recipient string: %s"
% recp)
else:
- args.append('--recipient %s' % str(recp))
+ self._add_recipient_string(args, hidden_recipients, str(recp))
continue
## will give unicode in 2.x as '\uXXXX\uXXXX'
- args.append('--recipient %r' % recp)
+ if isinstance(hidden_recipients, (list, tuple)):
+ if [s for s in hidden_recipients if recp in str(s)]:
+ args.append('--hidden-recipient %r' % recp)
+ else:
+ args.append('--recipient %r' % recp)
+ else:
+ args.append('--recipient %r' % recp)
continue
if isinstance(recp, str):
- args.append('--recipient %s' % recp)
+ self._add_recipient_string(args, hidden_recipients, recp)
elif (not _util._py3k) and isinstance(recp, basestring):
for recp in recipients.split('\x20'):
- args.append('--recipient %s' % recp)
+ self._add_recipient_string(args, hidden_recipients, recp)
elif _util._py3k and isinstance(recp, str):
for recp in recipients.split(' '):
- args.append('--recipient %s' % recp)
+ self._add_recipient_string(args, hidden_recipients, recp)
## ...and now that we've proven py3k is better...
-
else:
- log.debug("Don't know what to do with recipients: '%s'"
+ log.debug("Don't know what to do with recipients: %r"
% recipients)
result = self._result_map['crypt'](self)
- log.debug("Got data '%s' with type '%s'."
- % (data, type(data)))
- self._handle_io(args, data, result,
- passphrase=passphrase, binary=True)
- log.debug("\n%s" % result.data)
+ log.debug("Got data '%s' with type '%s'." % (data, type(data)))
+ self._handle_io(args, data, result, passphrase=passphrase, binary=True)
+ # Avoid writing raw encrypted bytes to terminal loggers and breaking
+ # them in that adorable way where they spew hieroglyphics until reset:
+ if armor:
+ log.debug("\n%s" % result.data)
if output_filename:
log.info("Writing encrypted output to file: %s" % output_filename)
- with open(output_filename, 'w+') as fh:
+ with open(output_filename, 'wb') as fh:
fh.write(result.data)
fh.flush()
log.info("Encrypted output written successfully.")
return result
+
+ def _add_recipient_string(self, args, hidden_recipients, recipient):
+ if isinstance(hidden_recipients, (list, tuple)):
+ if [s for s in hidden_recipients if recipient in str(s)]:
+ args.append('--hidden-recipient %s' % recipient)
+ else:
+ args.append('--recipient %s' % recipient)
+ else:
+ args.append('--recipient %s' % recipient)