From 4bd0fa843176a112c054929fbe6dd99f45d718a2 Mon Sep 17 00:00:00 2001 From: Kali Kaneko Date: Mon, 18 Aug 2014 12:52:50 -0500 Subject: Imported Upstream version 1.3.1 --- docs/_build/html/_modules/gnupg.html | 148 +++ docs/_build/html/_modules/gnupg/_meta.html | 981 ++++++++++++++++ docs/_build/html/_modules/gnupg/_parsers.html | 1486 +++++++++++++++++++++++++ docs/_build/html/_modules/gnupg/_util.html | 718 ++++++++++++ docs/_build/html/_modules/index.html | 104 ++ 5 files changed, 3437 insertions(+) create mode 100644 docs/_build/html/_modules/gnupg.html create mode 100644 docs/_build/html/_modules/gnupg/_meta.html create mode 100644 docs/_build/html/_modules/gnupg/_parsers.html create mode 100644 docs/_build/html/_modules/gnupg/_util.html create mode 100644 docs/_build/html/_modules/index.html (limited to 'docs/_build/html/_modules') diff --git a/docs/_build/html/_modules/gnupg.html b/docs/_build/html/_modules/gnupg.html new file mode 100644 index 0000000..24c2170 --- /dev/null +++ b/docs/_build/html/_modules/gnupg.html @@ -0,0 +1,148 @@ + + + + + + + + gnupg — gnupg unknown documentation + + + + + + + + + + + + +
+ +
+ +
+
+
+ +
+
+
+ +

Source code for gnupg

+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+#
+# This file is part of python-gnupg, a Python interface to GnuPG.
+# Copyright © 2013 Isis Lovecruft, <isis@leap.se> 0xA3ADB67A2CDB8B35
+#           © 2013 Andrej B.
+#           © 2013 LEAP Encryption Access Project
+#           © 2008-2012 Vinay Sajip
+#           © 2005 Steve Traugott
+#           © 2004 A.M. Kuchling
+#
+# This program is free software: you can redistribute it and/or modify it
+# under the terms of the GNU General Public License as published by the Free
+# Software Foundation, either version 3 of the License, or (at your option)
+# any later version.
+#
+# This program is distributed in the hope that it will be useful, but WITHOUT
+# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+# FITNESS FOR A PARTICULAR PURPOSE. See the included LICENSE file for details.
+
+from __future__ import absolute_import
+
+from . import gnupg
+from . import copyleft
+from . import _ansistrm
+from . import _logger
+from . import _meta
+from . import _parsers
+from . import _util
+from .gnupg import GPG
+from ._version import get_versions
+
+__version__ = get_versions()['version']
+__authors__ = copyleft.authors
+__license__ = copyleft.full_text
+__copyleft__ = copyleft.copyright
+
+## do not set __package__ = "gnupg", else we will end up with
+## gnupg.<*allofthethings*>
+__all__ = ["GPG", "_util", "_parsers", "_meta", "_logger"]
+
+## avoid the "from gnupg import gnupg" idiom
+del gnupg
+del absolute_import
+del copyleft
+del get_versions
+del _version
+
+ +
+
+
+
+ +
+
+
+ + + + + \ No newline at end of file diff --git a/docs/_build/html/_modules/gnupg/_meta.html b/docs/_build/html/_modules/gnupg/_meta.html new file mode 100644 index 0000000..23ca880 --- /dev/null +++ b/docs/_build/html/_modules/gnupg/_meta.html @@ -0,0 +1,981 @@ + + + + + + + + gnupg._meta — gnupg unknown documentation + + + + + + + + + + + + +
+ +
+ +
+
+
+ +
+
+
+ +

Source code for gnupg._meta

+# -*- coding: utf-8 -*-
+#
+# This file is part of python-gnupg, a Python interface to GnuPG.
+# Copyright © 2013 Isis Lovecruft, <isis@leap.se> 0xA3ADB67A2CDB8B35
+#           © 2013 Andrej B.
+#           © 2013 LEAP Encryption Access Project
+#           © 2008-2012 Vinay Sajip
+#           © 2005 Steve Traugott
+#           © 2004 A.M. Kuchling
+#
+# This program is free software: you can redistribute it and/or modify it
+# under the terms of the GNU General Public License as published by the Free
+# Software Foundation, either version 3 of the License, or (at your option)
+# any later version.
+#
+# This program is distributed in the hope that it will be useful, but WITHOUT
+# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+# FITNESS FOR A PARTICULAR PURPOSE. See the included LICENSE file for details.
+
+'''Meta and base classes for hiding internal functions, and controlling
+attribute creation and handling.
+'''
+
+from __future__ import absolute_import
+
+import atexit
+import codecs
+import encodings
+## For AOS, the locale module will need to point to a wrapper around the
+## java.util.Locale class.
+## See https://code.patternsinthevoid.net/?p=android-locale-hack.git
+import locale
+import os
+import platform
+import psutil
+import shlex
+import subprocess
+import sys
+import threading
+
+from . import _parsers
+from . import _util
+
+from ._parsers import _check_preferences
+from ._parsers import _sanitise_list
+from ._util    import log
+
+
+
[docs]class GPGMeta(type): + """Metaclass for changing the :meth:GPG.__init__ initialiser. + + Detects running gpg-agent processes and the presence of a pinentry + program, and disables pinentry so that python-gnupg can write the + passphrase to the controlled GnuPG process without killing the agent. + + :attr _agent_proc: If a :program:`gpg-agent` process is currently running + for the effective userid, then **_agent_proc** will be + set to a ``psutil.Process`` for that process. + """ + +
[docs] def __new__(cls, name, bases, attrs): + """Construct the initialiser for GPG""" + log.debug("Metaclass __new__ constructor called for %r" % cls) + if cls._find_agent(): + ## call the normal GPG.__init__() initialiser: + attrs['init'] = cls.__init__ + attrs['_remove_agent'] = True + return super(GPGMeta, cls).__new__(cls, name, bases, attrs) +
+ @classmethod +
[docs] def _find_agent(cls): + """Discover if a gpg-agent process for the current euid is running. + + If there is a matching gpg-agent process, set a :class:`psutil.Process` + instance containing the gpg-agent process' information to + ``cls._agent_proc``. + + :returns: True if there exists a gpg-agent process running under the + same effective user ID as that of this program. Otherwise, + returns None. + """ + identity = psutil.Process(os.getpid()).uids + for proc in psutil.process_iter(): + if (proc.name == "gpg-agent") and proc.is_running: + log.debug("Found gpg-agent process with pid %d" % proc.pid) + if proc.uids == identity: + log.debug( + "Effective UIDs of this process and gpg-agent match") + setattr(cls, '_agent_proc', proc) + return True + +
+
[docs]class GPGBase(object): + """Base class for storing properties and controlling process initialisation. + + :const _result_map: A *dict* containing classes from + :mod:`~gnupg._parsers`, used for parsing results + obtained from GnuPG commands. + :const _decode_errors: How to handle encoding errors. + """ + __metaclass__ = GPGMeta + _decode_errors = 'strict' + _result_map = { 'crypt': _parsers.Crypt, + 'delete': _parsers.DeleteResult, + 'generate': _parsers.GenKey, + 'import': _parsers.ImportResult, + 'list': _parsers.ListKeys, + 'sign': _parsers.Sign, + 'verify': _parsers.Verify, + 'packets': _parsers.ListPackets } + +
[docs] def __init__(self, binary=None, home=None, keyring=None, secring=None, + use_agent=False, default_preference_list=None, + verbose=False, options=None): + """Create a ``GPGBase``. + + This class is used to set up properties for controlling the behaviour + of configuring various options for GnuPG, such as setting GnuPG's + **homedir** , and the paths to its **binary** and **keyring** . + + :const binary: (:obj:`str`) The full path to the GnuPG binary. + + :ivar homedir: (:class:`~gnupg._util.InheritableProperty`) The full + path to the current setting for the GnuPG + ``--homedir``. + + :ivar _generated_keys: (:class:`~gnupg._util.InheritableProperty`) + Controls setting the directory for storing any + keys which are generated with + :meth:`~gnupg.GPG.gen_key`. + + :ivar str keyring: The filename in **homedir** to use as the keyring + file for public keys. + :ivar str secring: The filename in **homedir** to use as the keyring + file for secret keys. + """ + self.binary = _util._find_binary(binary) + self.homedir = home if home else _util._conf + pub = _parsers._fix_unsafe(keyring) if keyring else 'pubring.gpg' + sec = _parsers._fix_unsafe(secring) if secring else 'secring.gpg' + self.keyring = os.path.join(self._homedir, pub) + self.secring = os.path.join(self._homedir, sec) + self.options = _parsers._sanitise(options) if options else None + + if default_preference_list: + self._prefs = _check_preferences(default_preference_list, 'all') + else: + self._prefs = 'SHA512 SHA384 SHA256 AES256 CAMELLIA256 TWOFISH' + self._prefs += ' AES192 ZLIB ZIP Uncompressed' + + encoding = locale.getpreferredencoding() + if encoding is None: # This happens on Jython! + encoding = sys.stdin.encoding + self._encoding = encoding.lower().replace('-', '_') + self._filesystemencoding = encodings.normalize_encoding( + sys.getfilesystemencoding().lower()) + + self._keyserver = 'hkp://wwwkeys.pgp.net' + self.__generated_keys = os.path.join(self.homedir, 'generated-keys') + + try: + assert self.binary, "Could not find binary %s" % binary + assert isinstance(verbose, (bool, str, int)), \ + "'verbose' must be boolean, string, or 0 <= n <= 9" + assert isinstance(use_agent, bool), "'use_agent' must be boolean" + if self.options is not None: + assert isinstance(self.options, str), "options not string" + except (AssertionError, AttributeError) as ae: + log.error("GPGBase.__init__(): %s" % str(ae)) + raise RuntimeError(str(ae)) + else: + if verbose is True: + # The caller wants logging, but we need a valid --debug-level + # for gpg. Default to "basic", and warn about the ambiguity. + # (garrettr) + verbose = "basic" + log.warning('GPG(verbose=True) is ambiguous, defaulting to "basic" logging') + self.verbose = verbose + self.use_agent = use_agent + + if hasattr(self, '_agent_proc') \ + and getattr(self, '_remove_agent', None) is True: + if hasattr(self, '__remove_path__'): + self.__remove_path__('pinentry') +
+
[docs] def __remove_path__(self, prog=None, at_exit=True): + """Remove the directories containing a program from the system's + ``$PATH``. If ``GPGBase.binary`` is in a directory being removed, it + is linked to :file:'./gpg' in the current directory. + + :param str prog: The program to remove from ``$PATH``. + :param bool at_exit: Add the program back into the ``$PATH`` when the + Python interpreter exits, and delete any symlinks + to ``GPGBase.binary`` which were created. + """ + #: A list of ``$PATH`` entries which were removed to disable pinentry. + self._removed_path_entries = [] + + log.debug("Attempting to remove %s from system PATH" % str(prog)) + if (prog is None) or (not isinstance(prog, str)): return + + try: + program = _util._which(prog)[0] + except (OSError, IOError, IndexError) as err: + log.err(str(err)) + log.err("Cannot find program '%s', not changing PATH." % prog) + return + + ## __remove_path__ cannot be an @classmethod in GPGMeta, because + ## the use_agent attribute must be set by the instance. + if not self.use_agent: + program_base = os.path.dirname(prog) + gnupg_base = os.path.dirname(self.binary) + + ## symlink our gpg binary into $PWD if the path we are removing is + ## the one which contains our gpg executable: + new_gpg_location = os.path.join(os.getcwd(), 'gpg') + if gnupg_base == program_base: + os.symlink(self.binary, new_gpg_location) + self.binary = new_gpg_location + + ## copy the original environment so that we can put it back later: + env_copy = os.environ ## this one should not be touched + path_copy = os.environ.pop('PATH') + log.debug("Created a copy of system PATH: %r" % path_copy) + assert not os.environ.has_key('PATH'), "OS env kept $PATH anyway!" + + @staticmethod + def remove_program_from_path(path, prog_base): + """Remove all directories which contain a program from PATH. + + :param str path: The contents of the system environment's + ``$PATH``. + + :param str prog_base: The directory portion of a program's + location, without the trailing slash, + and without the program name. For + example, ``prog_base='/usr/bin'``. + """ + paths = path.split(':') + for directory in paths: + if directory == prog_base: + log.debug("Found directory with target program: %s" + % directory) + path.remove(directory) + self._removed_path_entries.append(directory) + log.debug("Deleted all found instance of %s." % directory) + log.debug("PATH is now:%s%s" % (os.linesep, path)) + new_path = ':'.join([p for p in path]) + return new_path + + @staticmethod + def update_path(environment, path): + """Add paths to the string at ``os.environ['PATH']``. + + :param str environment: The environment mapping to update. + :param list path: A list of strings to update the PATH with. + """ + log.debug("Updating system path...") + os.environ = environment + new_path = ':'.join([p for p in path]) + old = '' + if 'PATH' in os.environ: + new_path = ':'.join([os.environ['PATH'], new_path]) + os.environ.update({'PATH': new_path}) + log.debug("System $PATH: %s" % os.environ['PATH']) + + modified_path = remove_program_from_path(path_copy, program_base) + update_path(env_copy, modified_path) + + ## register an _exithandler with the python interpreter: + atexit.register(update_path, env_copy, path_copy) + + def remove_symlinked_binary(symlink): + if os.path.islink(symlink): + os.unlink(symlink) + log.debug("Removed binary symlink '%s'" % symlink) + atexit.register(remove_symlinked_binary, new_gpg_location) +
+ @property + def default_preference_list(self): + """Get the default preference list.""" + return self._prefs + + @default_preference_list.setter + def default_preference_list(self, prefs): + """Set the default preference list. + + :param str prefs: A string containing the default preferences for + ciphers, digests, and compression algorithms. + """ + prefs = _check_preferences(prefs) + if prefs is not None: + self._prefs = prefs + + @default_preference_list.deleter +
[docs] def default_preference_list(self): + """Reset the default preference list to its original state. + + Note that "original state" does not mean the default preference + list for whichever version of GnuPG is being used. It means the + default preference list defined by :attr:`GPGBase._prefs`. + + Using BZIP2 is avoided due to not interacting well with some versions + of GnuPG>=2.0.0. + """ + self._prefs = 'SHA512 SHA384 SHA256 AES256 CAMELLIA256 TWOFISH ZLIB ZIP' +
+ @property + def keyserver(self): + """Get the current keyserver setting.""" + return self._keyserver + + @keyserver.setter + def keyserver(self, location): + """Set the default keyserver to use for sending and receiving keys. + + The ``location`` is sent to :func:`_parsers._check_keyserver` when + option are parsed in :meth:`gnupg.GPG._make_options`. + + :param str location: A string containing the default keyserver. This + should contain the desired keyserver protocol + which is supported by the keyserver, for example, + ``'hkps://keys.mayfirst.org'``. The default + keyserver is ``'hkp://wwwkeys.pgp.net'``. + """ + self._keyserver = location + + @keyserver.deleter +
[docs] def keyserver(self): + """Reset the keyserver to the default setting.""" + self._keyserver = 'hkp://wwwkeys.pgp.net' +
+
[docs] def _homedir_getter(self): + """Get the directory currently being used as GnuPG's homedir. + + If unspecified, use :file:`~/.config/python-gnupg/` + + :rtype: str + :returns: The absolute path to the current GnuPG homedir. + """ + return self._homedir +
+
[docs] def _homedir_setter(self, directory): + """Set the directory to use as GnuPG's homedir. + + If unspecified, use $HOME/.config/python-gnupg. If specified, ensure + that the ``directory`` does not contain various shell escape + characters. If ``directory`` is not found, it will be automatically + created. Lastly, the ``direcory`` will be checked that the EUID has + read and write permissions for it. + + :param str directory: A relative or absolute path to the directory to + use for storing/accessing GnuPG's files, including + keyrings and the trustdb. + :raises: :exc:`~exceptions.RuntimeError` if unable to find a suitable + directory to use. + """ + if not directory: + log.debug("GPGBase._homedir_setter(): Using default homedir: '%s'" + % _util._conf) + directory = _util._conf + + hd = _parsers._fix_unsafe(directory) + log.debug("GPGBase._homedir_setter(): got directory '%s'" % hd) + + if hd: + log.debug("GPGBase._homedir_setter(): Check existence of '%s'" % hd) + _util._create_if_necessary(hd) + + try: + log.debug("GPGBase._homedir_setter(): checking permissions") + assert _util._has_readwrite(hd), \ + "Homedir '%s' needs read/write permissions" % hd + except AssertionError as ae: + msg = ("Unable to set '%s' as GnuPG homedir" % directory) + log.debug("GPGBase.homedir.setter(): %s" % msg) + log.debug(str(ae)) + raise RuntimeError(str(ae)) + else: + log.info("Setting homedir to '%s'" % hd) + self._homedir = hd +
+ homedir = _util.InheritableProperty(_homedir_getter, _homedir_setter) + +
[docs] def _generated_keys_getter(self): + """Get the ``homedir`` subdirectory for storing generated keys. + + :rtype: str + :returns: The absolute path to the current GnuPG homedir. + """ + return self.__generated_keys +
+
[docs] def _generated_keys_setter(self, directory): + """Set the directory for storing generated keys. + + If unspecified, use + :meth:`~gnupg._meta.GPGBase.homedir`/generated-keys. If specified, + ensure that the ``directory`` does not contain various shell escape + characters. If ``directory`` isn't found, it will be automatically + created. Lastly, the ``directory`` will be checked to ensure that the + current EUID has read and write permissions for it. + + :param str directory: A relative or absolute path to the directory to + use for storing/accessing GnuPG's files, including keyrings and + the trustdb. + :raises: :exc:`~exceptions.RuntimeError` if unable to find a suitable + directory to use. + """ + if not directory: + directory = os.path.join(self.homedir, 'generated-keys') + log.debug("GPGBase._generated_keys_setter(): Using '%s'" + % directory) + + hd = _parsers._fix_unsafe(directory) + log.debug("GPGBase._generated_keys_setter(): got directory '%s'" % hd) + + if hd: + log.debug("GPGBase._generated_keys_setter(): Check exists '%s'" + % hd) + _util._create_if_necessary(hd) + + try: + log.debug("GPGBase._generated_keys_setter(): check permissions") + assert _util._has_readwrite(hd), \ + "Keys dir '%s' needs read/write permissions" % hd + except AssertionError as ae: + msg = ("Unable to set '%s' as generated keys dir" % directory) + log.debug("GPGBase._generated_keys_setter(): %s" % msg) + log.debug(str(ae)) + raise RuntimeError(str(ae)) + else: + log.info("Setting homedir to '%s'" % hd) + self.__generated_keys = hd +
+ _generated_keys = _util.InheritableProperty(_generated_keys_getter, + _generated_keys_setter) + +
[docs] def _make_args(self, args, passphrase=False): + """Make a list of command line elements for GPG. + + The value of ``args`` will be appended only if it passes the checks in + :func:`gnupg._parsers._sanitise`. The ``passphrase`` argument needs to + be True if a passphrase will be sent to GnuPG, else False. + + :param list args: A list of strings of options and flags to pass to + ``GPG.binary``. This is input safe, meaning that + these values go through strict checks (see + ``parsers._sanitise_list``) before being passed to to + the input file descriptor for the GnuPG process. + Each string should be given exactly as it would be on + the commandline interface to GnuPG, + e.g. ["--cipher-algo AES256", "--default-key + A3ADB67A2CDB8B35"]. + + :param bool passphrase: If True, the passphrase will be sent to the + stdin file descriptor for the attached GnuPG + process. + """ + ## see TODO file, tag :io:makeargs: + cmd = [self.binary, + '--no-options --no-emit-version --no-tty --status-fd 2'] + + if self.homedir: cmd.append('--homedir "%s"' % self.homedir) + + if self.keyring: + cmd.append('--no-default-keyring --keyring %s' % self.keyring) + if self.secring: + cmd.append('--secret-keyring %s' % self.secring) + + if passphrase: cmd.append('--batch --passphrase-fd 0') + + if self.use_agent: cmd.append('--use-agent') + else: cmd.append('--no-use-agent') + + if self.options: + [cmd.append(opt) for opt in iter(_sanitise_list(self.options))] + if args: + [cmd.append(arg) for arg in iter(_sanitise_list(args))] + + if self.verbose: + cmd.append('--debug-all') + if ((isinstance(self.verbose, str) and + self.verbose in ['basic', 'advanced', 'expert', 'guru']) + or (isinstance(self.verbose, int) and (1<=self.verbose<=9))): + cmd.append('--debug-level %s' % self.verbose) + + return cmd +
+
[docs] def _open_subprocess(self, args=None, passphrase=False): + """Open a pipe to a GPG subprocess and return the file objects for + communicating with it. + + :param list args: A list of strings of options and flags to pass to + ``GPG.binary``. This is input safe, meaning that + these values go through strict checks (see + ``parsers._sanitise_list``) before being passed to to + the input file descriptor for the GnuPG process. + Each string should be given exactly as it would be on + the commandline interface to GnuPG, + e.g. ["--cipher-algo AES256", "--default-key + A3ADB67A2CDB8B35"]. + + :param bool passphrase: If True, the passphrase will be sent to the + stdin file descriptor for the attached GnuPG + process. + """ + ## see http://docs.python.org/2/library/subprocess.html#converting-an\ + ## -argument-sequence-to-a-string-on-windows + cmd = shlex.split(' '.join(self._make_args(args, passphrase))) + log.debug("Sending command to GnuPG process:%s%s" % (os.linesep, cmd)) + + if platform.system() == "Windows": + # TODO figure out what the hell is going on there. + expand_shell = True + else: + expand_shell = False + + return subprocess.Popen(cmd, shell=expand_shell, stdin=subprocess.PIPE, + stdout=subprocess.PIPE, stderr=subprocess.PIPE, + env={'LANGUAGE': 'en'}) +
+
[docs] def _read_response(self, stream, result): + """Reads all the stderr output from GPG, taking notice only of lines + that begin with the magic [GNUPG:] prefix. + + Calls methods on the response object for each valid token found, with + the arg being the remainder of the status line. + + :param stream: A byte-stream, file handle, or a + :data:`subprocess.PIPE` for parsing the status codes + from the GnuPG process. + + :param result: The result parser class from :mod:`~gnupg._parsers` ― + the ``handle_status()`` method of that class will be + called in order to parse the output of ``stream``. + """ + lines = [] + while True: + line = stream.readline() + if len(line) == 0: + break + lines.append(line) + line = line.rstrip() + + if line.startswith('[GNUPG:]'): + line = _util._deprefix(line, '[GNUPG:] ', log.status) + keyword, value = _util._separate_keyword(line) + result._handle_status(keyword, value) + elif line.startswith('gpg:'): + line = _util._deprefix(line, 'gpg: ') + keyword, value = _util._separate_keyword(line) + + # Log gpg's userland messages at our own levels: + if keyword.upper().startswith("WARNING"): + log.warn("%s" % value) + elif keyword.upper().startswith("FATAL"): + log.critical("%s" % value) + # Handle the gpg2 error where a missing trustdb.gpg is, + # for some stupid reason, considered fatal: + if value.find("trustdb.gpg") and value.find("No such file"): + result._handle_status('NEED_TRUSTDB', '') + else: + if self.verbose: + log.info("%s" % line) + else: + log.debug("%s" % line) + result.stderr = ''.join(lines) +
+
[docs] def _read_data(self, stream, result): + """Incrementally read from ``stream`` and store read data. + + All data gathered from calling ``stream.read()`` will be concatenated + and stored as ``result.data``. + + :param stream: An open file-like object to read() from. + :param result: An instance of one of the :ref:`result parsing classes + <parsers>` from :const:`~gnupg._meta.GPGBase._result_map`. + """ + chunks = [] + log.debug("Reading data from stream %r..." % stream.__repr__()) + + while True: + data = stream.read(1024) + if len(data) == 0: + break + chunks.append(data) + log.debug("Read %4d bytes" % len(data)) + + # Join using b'' or '', as appropriate + result.data = type(data)().join(chunks) + log.debug("Finishing reading from stream %r..." % stream.__repr__()) + log.debug("Read %4d bytes total" % len(result.data)) +
+
[docs] def _collect_output(self, process, result, writer=None, stdin=None): + """Drain the subprocesses output streams, writing the collected output + to the result. If a writer thread (writing to the subprocess) is given, + make sure it's joined before returning. If a stdin stream is given, + close it before returning. + """ + stderr = codecs.getreader(self._encoding)(process.stderr) + rr = threading.Thread(target=self._read_response, + args=(stderr, result)) + rr.setDaemon(True) + log.debug('stderr reader: %r', rr) + rr.start() + + stdout = process.stdout + dr = threading.Thread(target=self._read_data, args=(stdout, result)) + dr.setDaemon(True) + log.debug('stdout reader: %r', dr) + dr.start() + + dr.join() + rr.join() + if writer is not None: + writer.join() + process.wait() + if stdin is not None: + try: + stdin.close() + except IOError: + pass + stderr.close() + stdout.close() +
+
[docs] def _handle_io(self, args, file, result, passphrase=False, binary=False): + """Handle a call to GPG - pass input data, collect output data.""" + p = self._open_subprocess(args, passphrase) + if not binary: + stdin = codecs.getwriter(self._encoding)(p.stdin) + else: + stdin = p.stdin + if passphrase: + _util._write_passphrase(stdin, passphrase, self._encoding) + writer = _util._threaded_copy_data(file, stdin) + self._collect_output(p, result, writer, stdin) + return result +
+
[docs] def _recv_keys(self, keyids, keyserver=None): + """Import keys from a keyserver. + + :param str keyids: A space-delimited string containing the keyids to + request. + :param str keyserver: The keyserver to request the ``keyids`` from; + defaults to `gnupg.GPG.keyserver`. + """ + if not keyserver: + keyserver = self.keyserver + + args = ['--keyserver {0}'.format(keyserver), + '--recv-keys {0}'.format(keyids)] + log.info('Requesting keys from %s: %s' % (keyserver, keyids)) + + result = self._result_map['import'](self) + proc = self._open_subprocess(args) + self._collect_output(proc, result) + log.debug('recv_keys result: %r', result.__dict__) + return result +
+
[docs] def _sign_file(self, file, default_key=None, passphrase=None, + clearsign=True, detach=False, binary=False, + digest_algo='SHA512'): + """Create a signature for a file. + + :param file: The file stream (i.e. it's already been open()'d) to sign. + :param str default_key: The key to sign with. + :param str passphrase: The passphrase to pipe to stdin. + :param bool clearsign: If True, create a cleartext signature. + :param bool detach: If True, create a detached signature. + :param bool binary: If True, do not ascii armour the output. + :param str digest_algo: The hash digest to use. Again, to see which + hashes your GnuPG is capable of using, do: + ``$ gpg --with-colons --list-config + digestname``. The default, if unspecified, is + ``'SHA512'``. + """ + log.debug("_sign_file():") + if binary: + log.info("Creating binary signature for file %s" % file) + args = ['--sign'] + else: + log.info("Creating ascii-armoured signature for file %s" % file) + args = ['--sign --armor'] + + if clearsign: + args.append("--clearsign") + if detach: + log.warn("Cannot use both --clearsign and --detach-sign.") + log.warn("Using default GPG behaviour: --clearsign only.") + elif detach and not clearsign: + args.append("--detach-sign") + + if default_key: + args.append(str("--default-key %s" % default_key)) + + args.append(str("--digest-algo %s" % digest_algo)) + + ## We could use _handle_io here except for the fact that if the + ## passphrase is bad, gpg bails and you can't write the message. + result = self._result_map['sign'](self) + proc = self._open_subprocess(args, passphrase is not None) + try: + if passphrase: + _util._write_passphrase(proc.stdin, passphrase, self._encoding) + writer = _util._threaded_copy_data(file, proc.stdin) + except IOError as ioe: + log.exception("Error writing message: %s" % str(ioe)) + writer = None + self._collect_output(proc, result, writer, proc.stdin) + return result +
+
[docs] def _encrypt(self, data, recipients, + default_key=None, + passphrase=None, + armor=True, + encrypt=True, + symmetric=False, + always_trust=True, + output=None, + cipher_algo='AES256', + digest_algo='SHA512', + compress_algo='ZLIB'): + """Encrypt the message read from the file-like object **data**. + + :param str data: The file or bytestream to encrypt. + + :param str recipients: The recipients to encrypt to. Recipients must + be specified keyID/fingerprint. + + .. warning:: Care should be taken in Python2 to make sure that the + given fingerprints for **recipients** are in fact strings + and not unicode objects. + + :param str default_key: The keyID/fingerprint of the key to use for + signing. If given, **data** will be encrypted + *and* signed. + + :param str passphrase: If given, and **default_key** is also given, + use this passphrase to unlock the secret + portion of the **default_key** to sign the + encrypted **data**. Otherwise, if + **default_key** is not given, but **symmetric** + is ``True``, then use this passphrase as the + passphrase for symmetric encryption. Signing + and symmetric encryption should *not* be + combined when sending the **data** to other + recipients, else the passphrase to the secret + key would be shared with them. + + :param bool armor: If True, ascii armor the output; otherwise, the + output will be in binary format. (Default: True) + + :param bool encrypt: If True, encrypt the **data** using the + **recipients** public keys. (Default: True) + + :param bool symmetric: If True, encrypt the **data** to **recipients** + using a symmetric key. See the **passphrase** + parameter. Symmetric encryption and public key + encryption can be used simultaneously, and will + result in a ciphertext which is decryptable + with either the symmetric **passphrase** or one + of the corresponding private keys. + + :param bool always_trust: If True, ignore trust warnings on + **recipients** keys. If False, display trust + warnings. (default: True) + + :param str output: The output file to write to. If not specified, the + encrypted output is returned, and thus should be + stored as an object in Python. For example: + + + >>> import shutil + >>> import gnupg + >>> if os.path.exists("doctests"): + ... shutil.rmtree("doctests") + >>> gpg = gnupg.GPG(homedir="doctests") + >>> key_settings = gpg.gen_key_input(key_type='RSA', + ... key_length=1024, + ... key_usage='ESCA', + ... passphrase='foo') + >>> key = gpg.gen_key(key_settings) + >>> message = "The crow flies at midnight." + >>> encrypted = str(gpg.encrypt(message, key.printprint)) + >>> assert encrypted != message + >>> assert not encrypted.isspace() + >>> decrypted = str(gpg.decrypt(encrypted)) + >>> assert not decrypted.isspace() + >>> decrypted + 'The crow flies at midnight.' + + :param str cipher_algo: The cipher algorithm to use. To see available + algorithms with your version of GnuPG, do: + :command:`$ gpg --with-colons --list-config + ciphername`. The default **cipher_algo**, if + unspecified, is ``'AES256'``. + + :param str digest_algo: The hash digest to use. Again, to see which + hashes your GnuPG is capable of using, do: + :command:`$ gpg --with-colons --list-config + digestname`. The default, if unspecified, is + ``'SHA512'``. + + :param str compress_algo: The compression algorithm to use. Can be one + of ``'ZLIB'``, ``'BZIP2'``, ``'ZIP'``, or + ``'Uncompressed'``. + """ + args = [] + + if output: + if getattr(output, 'fileno', None) is not None: + ## avoid overwrite confirmation message + if getattr(output, 'name', None) is None: + if os.path.exists(output): + os.remove(output) + args.append('--output %s' % output) + else: + if os.path.exists(output.name): + os.remove(output.name) + args.append('--output %s' % output.name) + + if armor: args.append('--armor') + if always_trust: args.append('--always-trust') + if cipher_algo: args.append('--cipher-algo %s' % cipher_algo) + if compress_algo: args.append('--compress-algo %s' % compress_algo) + + if default_key: + args.append('--sign') + args.append('--default-key %s' % default_key) + if digest_algo: + args.append('--digest-algo %s' % digest_algo) + + ## both can be used at the same time for an encrypted file which + ## is decryptable with a passphrase or secretkey. + if symmetric: args.append('--symmetric') + if encrypt: args.append('--encrypt') + + if len(recipients) >= 1: + log.debug("GPG.encrypt() called for recipients '%s' with type '%s'" + % (recipients, type(recipients))) + + if isinstance(recipients, (list, tuple)): + for recp in recipients: + if not _util._py3k: + if isinstance(recp, unicode): + try: + assert _parsers._is_hex(str(recp)) + except AssertionError: + log.info("Can't accept recipient string: %s" + % recp) + else: + args.append('--recipient %s' % str(recp)) + continue + ## will give unicode in 2.x as '\uXXXX\uXXXX' + args.append('--recipient %r' % recp) + continue + if isinstance(recp, str): + args.append('--recipient %s' % recp) + + elif (not _util._py3k) and isinstance(recp, basestring): + for recp in recipients.split('\x20'): + args.append('--recipient %s' % recp) + + elif _util._py3k and isinstance(recp, str): + for recp in recipients.split(' '): + args.append('--recipient %s' % recp) + ## ...and now that we've proven py3k is better... + + else: + log.debug("Don't know what to do with recipients: '%s'" + % recipients) + + result = self._result_map['crypt'](self) + log.debug("Got data '%s' with type '%s'." + % (data, type(data))) + self._handle_io(args, data, result, + passphrase=passphrase, binary=True) + log.debug("\n%s" % result.data) + return result
+
+ +
+
+
+
+ +
+
+
+ + + + + \ No newline at end of file diff --git a/docs/_build/html/_modules/gnupg/_parsers.html b/docs/_build/html/_modules/gnupg/_parsers.html new file mode 100644 index 0000000..a203f11 --- /dev/null +++ b/docs/_build/html/_modules/gnupg/_parsers.html @@ -0,0 +1,1486 @@ + + + + + + + + gnupg._parsers — gnupg unknown documentation + + + + + + + + + + + + +
+ +
+ +
+
+
+ +
+
+
+ +

Source code for gnupg._parsers

+# -*- coding: utf-8 -*-
+#
+# This file is part of python-gnupg, a Python interface to GnuPG.
+# Copyright © 2013 Isis Lovecruft, <isis@leap.se> 0xA3ADB67A2CDB8B35
+#           © 2013 Andrej B.
+#           © 2013 LEAP Encryption Access Project
+#           © 2008-2012 Vinay Sajip
+#           © 2005 Steve Traugott
+#           © 2004 A.M. Kuchling
+#
+# This program is free software: you can redistribute it and/or modify it
+# under the terms of the GNU General Public License as published by the Free
+# Software Foundation, either version 3 of the License, or (at your option)
+# any later version.
+#
+# This program is distributed in the hope that it will be useful, but WITHOUT
+# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+# FITNESS FOR A PARTICULAR PURPOSE. See the included LICENSE file for details.
+
+'''Classes for parsing GnuPG status messages and sanitising commandline
+options.
+'''
+
+from __future__ import absolute_import
+from __future__ import print_function
+
+try:
+    from collections import OrderedDict
+except ImportError:
+    from ordereddict import OrderedDict
+
+import re
+
+from .      import _util
+from ._util import log
+
+
+ESCAPE_PATTERN = re.compile(r'\\x([0-9a-f][0-9a-f])', re.I)
+HEXIDECIMAL    = re.compile('([0-9A-Fa-f]{2})+')
+
+
+
[docs]class ProtectedOption(Exception): + """Raised when the option passed to GPG is disallowed.""" +
+
[docs]class UsageError(Exception): + """Raised when incorrect usage of the API occurs..""" + +
+
[docs]def _check_keyserver(location): + """Check that a given keyserver is a known protocol and does not contain + shell escape characters. + + :param str location: A string containing the default keyserver. This + should contain the desired keyserver protocol which + is supported by the keyserver, for example, the + default is ``'hkp://wwwkeys .pgp.net'``. + :rtype: :obj:`str` or :obj:`None` + :returns: A string specifying the protocol and keyserver hostname, if the + checks passed. If not, returns None. + """ + protocols = ['hkp://', 'hkps://', 'http://', 'https://', 'ldap://', + 'mailto:'] ## xxx feels like i´m forgetting one... + for proto in protocols: + if location.startswith(proto): + url = location.replace(proto, str()) + host, slash, extra = url.partition('/') + if extra: log.warn("URI text for %s: '%s'" % (host, extra)) + log.debug("Got host string for keyserver setting: '%s'" % host) + + host = _fix_unsafe(host) + if host: + log.debug("Cleaned host string: '%s'" % host) + keyserver = proto + host + return keyserver + return None +
+
[docs]def _check_preferences(prefs, pref_type=None): + """Check cipher, digest, and compression preference settings. + + MD5 is not allowed. This is `not 1994`__. SHA1 is allowed_ grudgingly_. + + __ http://www.cs.colorado.edu/~jrblack/papers/md5e-full.pdf + .. _allowed: http://eprint.iacr.org/2008/469.pdf + .. _grudgingly: https://www.schneier.com/blog/archives/2012/10/when_will_we_se.html + """ + if prefs is None: return + + cipher = frozenset(['AES256', 'AES192', 'AES128', + 'CAMELLIA256', 'CAMELLIA192', + 'TWOFISH', '3DES']) + digest = frozenset(['SHA512', 'SHA384', 'SHA256', 'SHA224', 'RMD160', + 'SHA1']) + compress = frozenset(['BZIP2', 'ZLIB', 'ZIP', 'Uncompressed']) + all = frozenset([cipher, digest, compress]) + + if isinstance(prefs, str): + prefs = set(prefs.split()) + elif isinstance(prefs, list): + prefs = set(prefs) + else: + msg = "prefs must be list of strings, or space-separated string" + log.error("parsers._check_preferences(): %s" % message) + raise TypeError(message) + + if not pref_type: + pref_type = 'all' + + allowed = str() + + if pref_type == 'cipher': + allowed += ' '.join(prefs.intersection(cipher)) + if pref_type == 'digest': + allowed += ' '.join(prefs.intersection(digest)) + if pref_type == 'compress': + allowed += ' '.join(prefs.intersection(compress)) + if pref_type == 'all': + allowed += ' '.join(prefs.intersection(all)) + + return allowed +
+
[docs]def _fix_unsafe(shell_input): + """Find characters used to escape from a string into a shell, and wrap them in + quotes if they exist. Regex pilfered from Python3 :mod:`shlex` module. + + :param str shell_input: The input intended for the GnuPG process. + """ + _unsafe = re.compile(r'[^\w@%+=:,./-]', 256) + try: + if len(_unsafe.findall(shell_input)) == 0: + return shell_input.strip() + else: + clean = "'" + shell_input.replace("'", "'\"'\"'") + "'" + return clean + except TypeError: + return None +
+
[docs]def _hyphenate(input, add_prefix=False): + """Change underscores to hyphens so that object attributes can be easily + tranlated to GPG option names. + + :param str input: The attribute to hyphenate. + :param bool add_prefix: If True, add leading hyphens to the input. + :rtype: str + :return: The ``input`` with underscores changed to hyphens. + """ + ret = '--' if add_prefix else '' + ret += input.replace('_', '-') + return ret +
+
[docs]def _is_allowed(input): + """Check that an option or argument given to GPG is in the set of allowed + options, the latter being a strict subset of the set of all options known + to GPG. + + :param str input: An input meant to be parsed as an option or flag to the + GnuPG process. Should be formatted the same as an option + or flag to the commandline gpg, i.e. "--encrypt-files". + + :ivar frozenset gnupg_options: All known GPG options and flags. + + :ivar frozenset allowed: All allowed GPG options and flags, e.g. all GPG + options and flags which we are willing to + acknowledge and parse. If we want to support a + new option, it will need to have its own parsing + class and its name will need to be added to this + set. + + :raises: :exc:`UsageError` if **input** is not a subset of the hard-coded + set of all GnuPG options in :func:`_get_all_gnupg_options`. + + :exc:`ProtectedOption` if **input** is not in the set of allowed + options. + + :rtype: str + :return: The original **input** parameter, unmodified and unsanitized, if + no errors occur. + """ + gnupg_options = _get_all_gnupg_options() + allowed = _get_options_group("allowed") + + ## these are the allowed options we will handle so far, all others should + ## be dropped. this dance is so that when new options are added later, we + ## merely add the to the _allowed list, and the `` _allowed.issubset`` + ## assertion will check that GPG will recognise them + try: + ## check that allowed is a subset of all gnupg_options + assert allowed.issubset(gnupg_options) + except AssertionError: + raise UsageError("'allowed' isn't a subset of known options, diff: %s" + % allowed.difference(gnupg_options)) + + ## if we got a list of args, join them + ## + ## see TODO file, tag :cleanup: + if not isinstance(input, str): + input = ' '.join([x for x in input]) + + if isinstance(input, str): + if input.find('_') > 0: + if not input.startswith('--'): + hyphenated = _hyphenate(input, add_prefix=True) + else: + hyphenated = _hyphenate(input) + else: + hyphenated = input + ## xxx we probably want to use itertools.dropwhile here + try: + assert hyphenated in allowed + except AssertionError as ae: + dropped = _fix_unsafe(hyphenated) + log.warn("_is_allowed(): Dropping option '%s'..." % dropped) + raise ProtectedOption("Option '%s' not supported." % dropped) + else: + return input + return None +
+
[docs]def _is_hex(string): + """Check that a string is hexidecimal, with alphabetic characters + capitalized and without whitespace. + + :param str string: The string to check. + """ + matched = HEXIDECIMAL.match(string) + if matched is not None and len(matched.group()) >= 2: + return True + return False +
+
[docs]def _is_string(thing): + """Python character arrays are a mess. + + If Python2, check if **thing** is an :obj:`unicode` or a :obj:`str`. + If Python3, check if **thing** is a :obj:`str`. + + :param thing: The thing to check. + :returns: ``True`` if **thing** is a string according to whichever version + of Python we're running in. + """ + if _util._py3k: return isinstance(thing, str) + else: return isinstance(thing, basestring) +
+
[docs]def _sanitise(*args): + """Take an arg or the key portion of a kwarg and check that it is in the + set of allowed GPG options and flags, and that it has the correct + type. Then, attempt to escape any unsafe characters. If an option is not + allowed, drop it with a logged warning. Returns a dictionary of all + sanitised, allowed options. + + Each new option that we support that is not a boolean, but instead has + some additional inputs following it, i.e. "--encrypt-file foo.txt", will + need some basic safety checks added here. + + GnuPG has three-hundred and eighteen commandline flags. Also, not all + implementations of OpenPGP parse PGP packets and headers in the same way, + so there is added potential there for messing with calls to GPG. + + For information on the PGP message format specification, see + :rfc:`1991`. + + If you're asking, "Is this *really* necessary?": No, not really -- we could + just follow the security precautions recommended by `this xkcd`__. + + __ https://xkcd.com/1181/ + + :param str args: (optional) The boolean arguments which will be passed to + the GnuPG process. + :rtype: str + :returns: ``sanitised`` + """ + + ## see TODO file, tag :cleanup:sanitise: + + def _check_option(arg, value): + """Check that a single ``arg`` is an allowed option. + + If it is allowed, quote out any escape characters in ``value``, and + add the pair to :ivar:`sanitised`. Otherwise, drop them. + + :param str arg: The arguments which will be passed to the GnuPG + process, and, optionally their corresponding values. + The values are any additional arguments following the + GnuPG option or flag. For example, if we wanted to + pass ``"--encrypt --recipient isis@leap.se"`` to + GnuPG, then ``"--encrypt"`` would be an arg without a + value, and ``"--recipient"`` would also be an arg, + with a value of ``"isis@leap.se"``. + + :ivar list checked: The sanitised, allowed options and values. + :rtype: str + :returns: A string of the items in ``checked``, delimited by spaces. + """ + checked = str() + none_options = _get_options_group("none_options") + hex_options = _get_options_group("hex_options") + hex_or_none_options = _get_options_group("hex_or_none_options") + + if not _util._py3k: + if not isinstance(arg, list) and isinstance(arg, unicode): + arg = str(arg) + + try: + flag = _is_allowed(arg) + assert flag is not None, "_check_option(): got None for flag" + except (AssertionError, ProtectedOption) as error: + log.warn("_check_option(): %s" % str(error)) + else: + checked += (flag + ' ') + + if _is_string(value): + values = value.split(' ') + for v in values: + ## these can be handled separately, without _fix_unsafe(), + ## because they are only allowed if they pass the regex + if (flag in none_options) and (v is None): + continue + + if flag in hex_options: + if _is_hex(v): checked += (v + " ") + else: + log.debug("'%s %s' not hex." % (flag, v)) + if (flag in hex_or_none_options) and (v is None): + log.debug("Allowing '%s' for all keys" % flag) + continue + + elif flag in ['--keyserver']: + host = _check_keyserver(v) + if host: + log.debug("Setting keyserver: %s" % host) + checked += (v + " ") + else: log.debug("Dropping keyserver: %s" % v) + continue + + ## the rest are strings, filenames, etc, and should be + ## shell escaped: + val = _fix_unsafe(v) + try: + assert not val is None + assert not val.isspace() + assert not v is None + assert not v.isspace() + except: + log.debug("Dropping %s %s" % (flag, v)) + continue + + if flag in ['--encrypt', '--encrypt-files', '--decrypt', + '--decrypt-files', '--import', '--verify']: + if ( (_util._is_file(val)) + or + ((flag == '--verify') and (val == '-')) ): + checked += (val + " ") + else: + log.debug("%s not file: %s" % (flag, val)) + + elif flag in ['--cipher-algo', '--personal-cipher-prefs', + '--personal-cipher-preferences']: + legit_algos = _check_preferences(val, 'cipher') + if legit_algos: checked += (legit_algos + " ") + else: log.debug("'%s' is not cipher" % val) + + elif flag in ['--compress-algo', '--compression-algo', + '--personal-compress-prefs', + '--personal-compress-preferences']: + legit_algos = _check_preferences(val, 'compress') + if legit_algos: checked += (legit_algos + " ") + else: log.debug("'%s' not compress algo" % val) + + else: + checked += (val + " ") + log.debug("_check_option(): No checks for %s" % val) + + return checked + + is_flag = lambda x: x.startswith('--') + + def _make_filo(args_string): + filo = arg.split(' ') + filo.reverse() + log.debug("_make_filo(): Converted to reverse list: %s" % filo) + return filo + + def _make_groups(filo): + groups = {} + while len(filo) >= 1: + last = filo.pop() + if is_flag(last): + log.debug("Got arg: %s" % last) + if last == '--verify': + groups[last] = str(filo.pop()) + ## accept the read-from-stdin arg: + if len(filo) >= 1 and filo[len(filo)-1] == '-': + groups[last] += str(' - ') ## gross hack + filo.pop() + else: + groups[last] = str() + while len(filo) > 1 and not is_flag(filo[len(filo)-1]): + log.debug("Got value: %s" % filo[len(filo)-1]) + groups[last] += (filo.pop() + " ") + else: + if len(filo) == 1 and not is_flag(filo[0]): + log.debug("Got value: %s" % filo[0]) + groups[last] += filo.pop() + else: + log.warn("_make_groups(): Got solitary value: %s" % last) + groups["xxx"] = last + return groups + + def _check_groups(groups): + log.debug("Got groups: %s" % groups) + checked_groups = [] + for a,v in groups.items(): + v = None if len(v) == 0 else v + safe = _check_option(a, v) + if safe is not None and not safe.strip() == "": + log.debug("Appending option: %s" % safe) + checked_groups.append(safe) + else: + log.warn("Dropped option: '%s %s'" % (a,v)) + return checked_groups + + if args is not None: + option_groups = {} + for arg in args: + ## if we're given a string with a bunch of options in it split + ## them up and deal with them separately + if (not _util._py3k and isinstance(arg, basestring)) \ + or (_util._py3k and isinstance(arg, str)): + log.debug("Got arg string: %s" % arg) + if arg.find(' ') > 0: + filo = _make_filo(arg) + option_groups.update(_make_groups(filo)) + else: + option_groups.update({ arg: "" }) + elif isinstance(arg, list): + log.debug("Got arg list: %s" % arg) + arg.reverse() + option_groups.update(_make_groups(arg)) + else: + log.warn("Got non-str/list arg: '%s', type '%s'" + % (arg, type(arg))) + checked = _check_groups(option_groups) + sanitised = ' '.join(x for x in checked) + return sanitised + else: + log.debug("Got None for args") +
+
[docs]def _sanitise_list(arg_list): + """A generator for iterating through a list of gpg options and sanitising + them. + + :param list arg_list: A list of options and flags for GnuPG. + :rtype: generator + :returns: A generator whose next() method returns each of the items in + ``arg_list`` after calling ``_sanitise()`` with that item as a + parameter. + """ + if isinstance(arg_list, list): + for arg in arg_list: + safe_arg = _sanitise(arg) + if safe_arg != "": + yield safe_arg +
+
[docs]def _get_options_group(group=None): + """Get a specific group of options which are allowed.""" + + #: These expect a hexidecimal keyid as their argument, and can be parsed + #: with :func:`_is_hex`. + hex_options = frozenset(['--check-sigs', + '--default-key', + '--default-recipient', + '--delete-keys', + '--delete-secret-keys', + '--delete-secret-and-public-keys', + '--desig-revoke', + '--export', + '--export-secret-keys', + '--export-secret-subkeys', + '--fingerprint', + '--gen-revoke', + '--list-key', + '--list-keys', + '--list-public-keys', + '--list-secret-keys', + '--list-sigs', + '--recipient', + '--recv-keys', + '--send-keys', + ]) + #: These options expect value which are left unchecked, though still run + #: through :func:`_fix_unsafe`. + unchecked_options = frozenset(['--list-options', + '--passphrase-fd', + '--status-fd', + '--verify-options', + ]) + #: These have their own parsers and don't really fit into a group + other_options = frozenset(['--debug-level', + '--keyserver', + + ]) + #: These should have a directory for an argument + dir_options = frozenset(['--homedir', + ]) + #: These expect a keyring or keyfile as their argument + keyring_options = frozenset(['--keyring', + '--primary-keyring', + '--secret-keyring', + '--trustdb-name', + ]) + #: These expect a filename (or the contents of a file as a string) or None + #: (meaning that they read from stdin) + file_or_none_options = frozenset(['--decrypt', + '--decrypt-files', + '--encrypt', + '--encrypt-files', + '--import', + '--verify', + '--verify-files', + ]) + #: These options expect a string. see :func:`_check_preferences`. + pref_options = frozenset(['--digest-algo', + '--cipher-algo', + '--compress-algo', + '--compression-algo', + '--cert-digest-algo', + '--personal-digest-prefs', + '--personal-digest-preferences', + '--personal-cipher-prefs', + '--personal-cipher-preferences', + '--personal-compress-prefs', + '--personal-compress-preferences', + '--print-md', + ]) + #: These options expect no arguments + none_options = frozenset(['--always-trust', + '--armor', + '--armour', + '--batch', + '--check-sigs', + '--check-trustdb', + '--clearsign', + '--debug-all', + '--default-recipient-self', + '--detach-sign', + '--export', + '--export-ownertrust', + '--export-secret-keys', + '--export-secret-subkeys', + '--fingerprint', + '--fixed-list-mode', + '--gen-key', + '--import-ownertrust', + '--list-config', + '--list-key', + '--list-keys', + '--list-packets', + '--list-public-keys', + '--list-secret-keys', + '--list-sigs', + '--no-default-keyring', + '--no-default-recipient', + '--no-emit-version', + '--no-options', + '--no-tty', + '--no-use-agent', + '--no-verbose', + '--print-mds', + '--quiet', + '--sign', + '--symmetric', + '--use-agent', + '--verbose', + '--version', + '--with-colons', + '--yes', + ]) + #: These options expect either None or a hex string + hex_or_none_options = hex_options.intersection(none_options) + allowed = hex_options.union(unchecked_options, other_options, dir_options, + keyring_options, file_or_none_options, + pref_options, none_options) + + if group and group in locals().keys(): + return locals()[group] +
+
[docs]def _get_all_gnupg_options(): + """Get all GnuPG options and flags. + + This is hardcoded within a local scope to reduce the chance of a tampered + GnuPG binary reporting falsified option sets, i.e. because certain options + (namedly the ``--no-options`` option, which prevents the usage of gpg.conf + files) are necessary and statically specified in + :meth:`gnupg._meta.GPGBase._make_args`, if the inputs into Python are + already controlled, and we were to summon the GnuPG binary to ask it for + its options, it would be possible to receive a falsified options set + missing the ``--no-options`` option in response. This seems unlikely, and + the method is stupid and ugly, but at least we'll never have to debug + whether or not an option *actually* disappeared in a different GnuPG + version, or some funny business is happening. + + These are the options as of GnuPG 1.4.12; the current stable branch of the + 2.1.x tree contains a few more -- if you need them you'll have to add them + in here. + + :type gnupg_options: frozenset + :ivar gnupg_options: All known GPG options and flags. + :rtype: frozenset + :returns: ``gnupg_options`` + """ + three_hundred_eighteen = (""" +--allow-freeform-uid --multifile +--allow-multiple-messages --no +--allow-multisig-verification --no-allow-freeform-uid +--allow-non-selfsigned-uid --no-allow-multiple-messages +--allow-secret-key-import --no-allow-non-selfsigned-uid +--always-trust --no-armor +--armor --no-armour +--armour --no-ask-cert-expire +--ask-cert-expire --no-ask-cert-level +--ask-cert-level --no-ask-sig-expire +--ask-sig-expire --no-auto-check-trustdb +--attribute-fd --no-auto-key-locate +--attribute-file --no-auto-key-retrieve +--auto-check-trustdb --no-batch +--auto-key-locate --no-comments +--auto-key-retrieve --no-default-keyring +--batch --no-default-recipient +--bzip2-compress-level --no-disable-mdc +--bzip2-decompress-lowmem --no-emit-version +--card-edit --no-encrypt-to +--card-status --no-escape-from-lines +--cert-digest-algo --no-expensive-trust-checks +--cert-notation --no-expert +--cert-policy-url --no-force-mdc +--change-pin --no-force-v3-sigs +--charset --no-force-v4-certs +--check-sig --no-for-your-eyes-only +--check-sigs --no-greeting +--check-trustdb --no-groups +--cipher-algo --no-literal +--clearsign --no-mangle-dos-filenames +--command-fd --no-mdc-warning +--command-file --no-options +--comment --no-permission-warning +--completes-needed --no-pgp2 +--compress-algo --no-pgp6 +--compression-algo --no-pgp7 +--compress-keys --no-pgp8 +--compress-level --no-random-seed-file +--compress-sigs --no-require-backsigs +--ctapi-driver --no-require-cross-certification +--dearmor --no-require-secmem +--dearmour --no-rfc2440-text +--debug --no-secmem-warning +--debug-all --no-show-notation +--debug-ccid-driver --no-show-photos +--debug-level --no-show-policy-url +--decrypt --no-sig-cache +--decrypt-files --no-sig-create-check +--default-cert-check-level --no-sk-comments +--default-cert-expire --no-strict +--default-cert-level --notation-data +--default-comment --not-dash-escaped +--default-key --no-textmode +--default-keyserver-url --no-throw-keyid +--default-preference-list --no-throw-keyids +--default-recipient --no-tty +--default-recipient-self --no-use-agent +--default-sig-expire --no-use-embedded-filename +--delete-keys --no-utf8-strings +--delete-secret-and-public-keys --no-verbose +--delete-secret-keys --no-version +--desig-revoke --openpgp +--detach-sign --options +--digest-algo --output +--disable-ccid --override-session-key +--disable-cipher-algo --passphrase +--disable-dsa2 --passphrase-fd +--disable-mdc --passphrase-file +--disable-pubkey-algo --passphrase-repeat +--display --pcsc-driver +--display-charset --personal-cipher-preferences +--dry-run --personal-cipher-prefs +--dump-options --personal-compress-preferences +--edit-key --personal-compress-prefs +--emit-version --personal-digest-preferences +--enable-dsa2 --personal-digest-prefs +--enable-progress-filter --pgp2 +--enable-special-filenames --pgp6 +--enarmor --pgp7 +--enarmour --pgp8 +--encrypt --photo-viewer +--encrypt-files --pipemode +--encrypt-to --preserve-permissions +--escape-from-lines --primary-keyring +--exec-path --print-md +--exit-on-status-write-error --print-mds +--expert --quick-random +--export --quiet +--export-options --reader-port +--export-ownertrust --rebuild-keydb-caches +--export-secret-keys --recipient +--export-secret-subkeys --recv-keys +--fast-import --refresh-keys +--fast-list-mode --remote-user +--fetch-keys --require-backsigs +--fingerprint --require-cross-certification +--fixed-list-mode --require-secmem +--fix-trustdb --rfc1991 +--force-mdc --rfc2440 +--force-ownertrust --rfc2440-text +--force-v3-sigs --rfc4880 +--force-v4-certs --run-as-shm-coprocess +--for-your-eyes-only --s2k-cipher-algo +--gen-key --s2k-count +--gen-prime --s2k-digest-algo +--gen-random --s2k-mode +--gen-revoke --search-keys +--gnupg --secret-keyring +--gpg-agent-info --send-keys +--gpgconf-list --set-filename +--gpgconf-test --set-filesize +--group --set-notation +--help --set-policy-url +--hidden-encrypt-to --show-keyring +--hidden-recipient --show-notation +--homedir --show-photos +--honor-http-proxy --show-policy-url +--ignore-crc-error --show-session-key +--ignore-mdc-error --sig-keyserver-url +--ignore-time-conflict --sign +--ignore-valid-from --sign-key +--import --sig-notation +--import-options --sign-with +--import-ownertrust --sig-policy-url +--interactive --simple-sk-checksum +--keyid-format --sk-comments +--keyring --skip-verify +--keyserver --status-fd +--keyserver-options --status-file +--lc-ctype --store +--lc-messages --strict +--limit-card-insert-tries --symmetric +--list-config --temp-directory +--list-key --textmode +--list-keys --throw-keyid +--list-only --throw-keyids +--list-options --trustdb-name +--list-ownertrust --trusted-key +--list-packets --trust-model +--list-public-keys --try-all-secrets +--list-secret-keys --ttyname +--list-sig --ttytype +--list-sigs --ungroup +--list-trustdb --update-trustdb +--load-extension --use-agent +--local-user --use-embedded-filename +--lock-multiple --user +--lock-never --utf8-strings +--lock-once --verbose +--logger-fd --verify +--logger-file --verify-files +--lsign-key --verify-options +--mangle-dos-filenames --version +--marginals-needed --warranty +--max-cert-depth --with-colons +--max-output --with-fingerprint +--merge-only --with-key-data +--min-cert-level --yes +""").split() + + # These are extra options which only exist for GnuPG>=2.0.0 + three_hundred_eighteen.append('--export-ownertrust') + three_hundred_eighteen.append('--import-ownertrust') + + gnupg_options = frozenset(three_hundred_eighteen) + return gnupg_options +
+
[docs]def nodata(status_code): + """Translate NODATA status codes from GnuPG to messages.""" + lookup = { + '1': 'No armored data.', + '2': 'Expected a packet but did not find one.', + '3': 'Invalid packet found, this may indicate a non OpenPGP message.', + '4': 'Signature expected but not found.' } + for key, value in lookup.items(): + if str(status_code) == key: + return value +
+
[docs]def progress(status_code): + """Translate PROGRESS status codes from GnuPG to messages.""" + lookup = { + 'pk_dsa': 'DSA key generation', + 'pk_elg': 'Elgamal key generation', + 'primegen': 'Prime generation', + 'need_entropy': 'Waiting for new entropy in the RNG', + 'tick': 'Generic tick without any special meaning - still working.', + 'starting_agent': 'A gpg-agent was started.', + 'learncard': 'gpg-agent or gpgsm is learning the smartcard data.', + 'card_busy': 'A smartcard is still working.' } + for key, value in lookup.items(): + if str(status_code) == key: + return value + +
+
[docs]class GenKey(object): + """Handle status messages for key generation. + + Calling the ``__str__()`` method of this class will return the generated + key's fingerprint, or a status string explaining the results. + """ + def __init__(self, gpg): + self._gpg = gpg + ## this should get changed to something more useful, like 'key_type' + #: 'P':= primary, 'S':= subkey, 'B':= both + self.type = None + self.fingerprint = None + self.status = None + self.subkey_created = False + self.primary_created = False + #: This will store the key's public keyring filename, if + #: :meth:`~gnupg.GPG.gen_key_input` was called with + #: ``separate_keyring=True``. + self.keyring = None + #: This will store the key's secret keyring filename, if : + #: :meth:`~gnupg.GPG.gen_key_input` was called with + #: ``separate_keyring=True``. + self.secring = None + + def __nonzero__(self): + if self.fingerprint: return True + return False + __bool__ = __nonzero__ + + def __str__(self): + if self.fingerprint: + return self.fingerprint + else: + if self.status is not None: + return self.status + else: + return False + +
[docs] def _handle_status(self, key, value): + """Parse a status code from the attached GnuPG process. + + :raises: :exc:`~exceptions.ValueError` if the status message is unknown. + """ + if key in ("GOOD_PASSPHRASE"): + pass + elif key == "KEY_NOT_CREATED": + self.status = 'key not created' + elif key == "KEY_CREATED": + (self.type, self.fingerprint) = value.split() + self.status = 'key created' + elif key == "NODATA": + self.status = nodata(value) + elif key == "PROGRESS": + self.status = progress(value.split(' ', 1)[0]) + else: + raise ValueError("Unknown status message: %r" % key) + + if self.type in ('B', 'P'): + self.primary_created = True + if self.type in ('B', 'S'): + self.subkey_created = True +
+
[docs]class DeleteResult(object): + """Handle status messages for --delete-keys and --delete-secret-keys""" + def __init__(self, gpg): + self._gpg = gpg + self.status = 'ok' + + def __str__(self): + return self.status + + problem_reason = { '1': 'No such key', + '2': 'Must delete secret key first', + '3': 'Ambigious specification', } + +
[docs] def _handle_status(self, key, value): + """Parse a status code from the attached GnuPG process. + + :raises: :exc:`~exceptions.ValueError` if the status message is unknown. + """ + if key == "DELETE_PROBLEM": + self.status = self.problem_reason.get(value, "Unknown error: %r" + % value) + else: + raise ValueError("Unknown status message: %r" % key) +
+
[docs]class Sign(object): + """Parse GnuPG status messages for signing operations. + + :param gpg: An instance of :class:`gnupg.GPG`. + """ + + #: The type of signature created. + sig_type = None + #: The algorithm used to create the signature. + sig_algo = None + #: The hash algorithm used to create the signature. + sig_hash_also = None + #: The fingerprint of the signing keyid. + fingerprint = None + #: The timestamp on the signature. + timestamp = None + #: xxx fill me in + what = None + + def __init__(self, gpg): + self._gpg = gpg + + def __nonzero__(self): + """Override the determination for truthfulness evaluation. + + :rtype: bool + :returns: True if we have a valid signature, False otherwise. + """ + return self.fingerprint is not None + __bool__ = __nonzero__ + + def __str__(self): + return self.data.decode(self._gpg._encoding, self._gpg._decode_errors) + +
[docs] def _handle_status(self, key, value): + """Parse a status code from the attached GnuPG process. + + :raises: :exc:`~exceptions.ValueError` if the status message is unknown. + """ + if key in ("USERID_HINT", "NEED_PASSPHRASE", "BAD_PASSPHRASE", + "GOOD_PASSPHRASE", "BEGIN_SIGNING", "CARDCTRL", + "INV_SGNR", "SIGEXPIRED"): + pass + elif key == "SIG_CREATED": + (self.sig_type, self.sig_algo, self.sig_hash_algo, + self.what, self.timestamp, self.fingerprint) = value.split() + elif key == "KEYEXPIRED": + self.status = "skipped signing key, key expired" + if (value is not None) and (len(value) > 0): + self.status += " on {}".format(str(value)) + elif key == "KEYREVOKED": + self.status = "skipped signing key, key revoked" + if (value is not None) and (len(value) > 0): + self.status += " on {}".format(str(value)) + elif key == "NODATA": + self.status = nodata(value) + else: + raise ValueError("Unknown status message: %r" % key) +
+
[docs]class ListKeys(list): + """Handle status messages for --list-keys. + + Handles pub and uid (relating the latter to the former). Don't care about + the following attributes/status messages (from doc/DETAILS): + + | crt = X.509 certificate + | crs = X.509 certificate and private key available + | ssb = secret subkey (secondary key) + | uat = user attribute (same as user id except for field 10). + | sig = signature + | rev = revocation signature + | pkd = public key data (special field format, see below) + | grp = reserved for gpgsm + | rvk = revocation key + """ + + def __init__(self, gpg): + super(ListKeys, self).__init__() + self._gpg = gpg + self.curkey = None + self.fingerprints = [] + self.uids = [] + +
[docs] def key(self, args): + vars = (""" + type trust length algo keyid date expires dummy ownertrust uid + """).split() + self.curkey = {} + for i in range(len(vars)): + self.curkey[vars[i]] = args[i] + self.curkey['uids'] = [] + if self.curkey['uid']: + self.curkey['uids'].append(self.curkey['uid']) + del self.curkey['uid'] + self.curkey['subkeys'] = [] + self.append(self.curkey) +
+ pub = sec = key + +
[docs] def fpr(self, args): + self.curkey['fingerprint'] = args[9] + self.fingerprints.append(args[9]) +
+
[docs] def uid(self, args): + uid = args[9] + uid = ESCAPE_PATTERN.sub(lambda m: chr(int(m.group(1), 16)), uid) + self.curkey['uids'].append(uid) + self.uids.append(uid) +
+
[docs] def sub(self, args): + subkey = [args[4], args[11]] + self.curkey['subkeys'].append(subkey) +
+
[docs] def _handle_status(self, key, value): + pass + +
+
[docs]class ImportResult(object): + """Parse GnuPG status messages for key import operations. + + :type gpg: :class:`gnupg.GPG` + :param gpg: An instance of :class:`gnupg.GPG`. + """ + _ok_reason = {'0': 'Not actually changed', + '1': 'Entirely new key', + '2': 'New user IDs', + '4': 'New signatures', + '8': 'New subkeys', + '16': 'Contains private key', + '17': 'Contains private key',} + + _problem_reason = { '0': 'No specific reason given', + '1': 'Invalid Certificate', + '2': 'Issuer Certificate missing', + '3': 'Certificate Chain too long', + '4': 'Error storing certificate', } + + _fields = '''count no_user_id imported imported_rsa unchanged + n_uids n_subk n_sigs n_revoc sec_read sec_imported sec_dups + not_imported'''.split() + _counts = OrderedDict( + zip(_fields, [int(0) for x in range(len(_fields))]) ) + + #: A list of strings containing the fingerprints of the GnuPG keyIDs + #: imported. + fingerprints = list() + + #: A list containing dictionaries with information gathered on keys + #: imported. + results = list() + + def __init__(self, gpg): + self._gpg = gpg + self.counts = self._counts + + def __nonzero__(self): + """Override the determination for truthfulness evaluation. + + :rtype: bool + :returns: True if we have immport some keys, False otherwise. + """ + if self.counts.not_imported > 0: return False + if len(self.fingerprints) == 0: return False + return True + __bool__ = __nonzero__ + +
[docs] def _handle_status(self, key, value): + """Parse a status code from the attached GnuPG process. + + :raises: :exc:`~exceptions.ValueError` if the status message is unknown. + """ + if key == "IMPORTED": + # this duplicates info we already see in import_ok & import_problem + pass + elif key == "NODATA": + self.results.append({'fingerprint': None, + 'status': 'No valid data found'}) + elif key == "IMPORT_OK": + reason, fingerprint = value.split() + reasons = [] + for code, text in self._ok_reason.items(): + if int(reason) == int(code): + reasons.append(text) + reasontext = '\n'.join(reasons) + "\n" + self.results.append({'fingerprint': fingerprint, + 'status': reasontext}) + self.fingerprints.append(fingerprint) + elif key == "IMPORT_PROBLEM": + try: + reason, fingerprint = value.split() + except: + reason = value + fingerprint = '<unknown>' + self.results.append({'fingerprint': fingerprint, + 'status': self._problem_reason[reason]}) + elif key == "IMPORT_RES": + import_res = value.split() + for x in self.counts.keys(): + self.counts[x] = int(import_res.pop(0)) + elif key == "KEYEXPIRED": + res = {'fingerprint': None, + 'status': 'Key expired'} + self.results.append(res) + ## Accoring to docs/DETAILS L859, SIGEXPIRED is obsolete: + ## "Removed on 2011-02-04. This is deprecated in favor of KEYEXPIRED." + elif key == "SIGEXPIRED": + res = {'fingerprint': None, + 'status': 'Signature expired'} + self.results.append(res) + else: + raise ValueError("Unknown status message: %r" % key) +
+
[docs] def summary(self): + l = [] + l.append('%d imported' % self.counts['imported']) + if self.counts['not_imported']: + l.append('%d not imported' % self.counts['not_imported']) + return ', '.join(l) + +
+
[docs]class Verify(object): + """Parser for status messages from GnuPG for certifications and signature + verifications. + + People often mix these up, or think that they are the same thing. While it + is true that certifications and signatures *are* the same cryptographic + operation -- and also true that both are the same as the decryption + operation -- a distinction is made for important reasons. + + A certification: + * is made on a key, + * can help to validate or invalidate the key owner's identity, + * can assign trust levels to the key (or to uids and/or subkeys that + the key contains), + * and can be used in absense of in-person fingerprint checking to try + to build a path (through keys whose fingerprints have been checked) + to the key, so that the identity of the key's owner can be more + reliable without having to actually physically meet in person. + + A signature: + * is created for a file or other piece of data, + * can help to prove that the data hasn't been altered, + * and can help to prove that the data was sent by the person(s) in + possession of the private key that created the signature, and for + parsing portions of status messages from decryption operations. + + There are probably other things unique to each that have been + scatterbrainedly omitted due to the programmer sitting still and staring + at GnuPG debugging logs for too long without snacks, but that is the gist + of it. + """ + + TRUST_UNDEFINED = 0 + TRUST_NEVER = 1 + TRUST_MARGINAL = 2 + TRUST_FULLY = 3 + TRUST_ULTIMATE = 4 + + TRUST_LEVELS = {"TRUST_UNDEFINED" : TRUST_UNDEFINED, + "TRUST_NEVER" : TRUST_NEVER, + "TRUST_MARGINAL" : TRUST_MARGINAL, + "TRUST_FULLY" : TRUST_FULLY, + "TRUST_ULTIMATE" : TRUST_ULTIMATE,} + + def __init__(self, gpg): + """Create a parser for verification and certification commands. + + :param gpg: An instance of :class:`gnupg.GPG`. + """ + self._gpg = gpg + #: True if the signature is valid, False otherwise. + self.valid = False + #: A string describing the status of the signature verification. + #: Can be one of ``signature bad``, ``signature good``, + #: ``signature valid``, ``signature error``, ``decryption failed``, + #: ``no public key``, ``key exp``, or ``key rev``. + self.status = None + #: The fingerprint of the signing keyid. + self.fingerprint = None + #: The fingerprint of the corresponding public key, which may be + #: different if the signature was created with a subkey. + self.pubkey_fingerprint = None + #: The keyid of the signing key. + self.key_id = None + #: The id of the signature itself. + self.signature_id = None + #: The creation date of the signing key. + self.creation_date = None + #: The timestamp of the purported signature, if we are unable to parse + #: and/or validate it. + self.timestamp = None + #: The timestamp for when the valid signature was created. + self.sig_timestamp = None + #: The userid of the signing key which was used to create the + #: signature. + self.username = None + #: When the signing key is due to expire. + self.expire_timestamp = None + #: An integer 0-4 describing the trust level of the signature. + self.trust_level = None + #: The string corresponding to the ``trust_level`` number. + self.trust_text = None + + def __nonzero__(self): + """Override the determination for truthfulness evaluation. + + :rtype: bool + :returns: True if we have a valid signature, False otherwise. + """ + return self.valid + __bool__ = __nonzero__ + +
[docs] def _handle_status(self, key, value): + """Parse a status code from the attached GnuPG process. + + :raises: :exc:`~exceptions.ValueError` if the status message is unknown. + """ + if key in self.TRUST_LEVELS: + self.trust_text = key + self.trust_level = self.TRUST_LEVELS[key] + elif key in ("RSA_OR_IDEA", "NODATA", "IMPORT_RES", "PLAINTEXT", + "PLAINTEXT_LENGTH", "POLICY_URL", "DECRYPTION_INFO", + "DECRYPTION_OKAY", "INV_SGNR"): + pass + elif key == "BADSIG": + self.valid = False + self.status = 'signature bad' + self.key_id, self.username = value.split(None, 1) + elif key == "GOODSIG": + self.valid = True + self.status = 'signature good' + self.key_id, self.username = value.split(None, 1) + elif key == "VALIDSIG": + (self.fingerprint, + self.creation_date, + self.sig_timestamp, + self.expire_timestamp) = value.split()[:4] + # may be different if signature is made with a subkey + self.pubkey_fingerprint = value.split()[-1] + self.status = 'signature valid' + elif key == "SIG_ID": + (self.signature_id, + self.creation_date, self.timestamp) = value.split() + elif key == "ERRSIG": + self.valid = False + (self.key_id, + algo, hash_algo, + cls, + self.timestamp) = value.split()[:5] + self.status = 'signature error' + elif key == "DECRYPTION_FAILED": + self.valid = False + self.key_id = value + self.status = 'decryption failed' + elif key == "NO_PUBKEY": + self.valid = False + self.key_id = value + self.status = 'no public key' + elif key in ("KEYEXPIRED", "SIGEXPIRED"): + # these are useless in verify, since they are spit out for any + # pub/subkeys on the key, not just the one doing the signing. + # if we want to check for signatures with expired key, + # the relevant flag is EXPKEYSIG. + pass + elif key in ("EXPKEYSIG", "REVKEYSIG"): + # signed with expired or revoked key + self.valid = False + self.key_id = value.split()[0] + self.status = (('%s %s') % (key[:3], key[3:])).lower() + else: + raise ValueError("Unknown status message: %r" % key) + +
+
[docs]class Crypt(Verify): + """Parser for internal status messages from GnuPG for ``--encrypt``, + ``--decrypt``, and ``--decrypt-files``. + """ + def __init__(self, gpg): + Verify.__init__(self, gpg) + self._gpg = gpg + #: A string containing the encrypted or decrypted data. + self.data = '' + #: True if the decryption/encryption process turned out okay. + self.ok = False + #: A string describing the current processing status, or error, if one + #: has occurred. + self.status = None + self.data_format = None + self.data_timestamp = None + self.data_filename = None + + def __nonzero__(self): + if self.ok: return True + return False + __bool__ = __nonzero__ + + def __str__(self): + """The str() method for a :class:`Crypt` object will automatically return the + decoded data string, which stores the encryped or decrypted data. + + In other words, these two statements are equivalent: + + >>> assert decrypted.data == str(decrypted) + + """ + return self.data.decode(self._gpg._encoding, self._gpg._decode_errors) + +
[docs] def _handle_status(self, key, value): + """Parse a status code from the attached GnuPG process. + + :raises: :exc:`~exceptions.ValueError` if the status message is unknown. + """ + if key in ("ENC_TO", "USERID_HINT", "GOODMDC", "END_DECRYPTION", + "BEGIN_SIGNING", "NO_SECKEY", "ERROR", "NODATA", + "CARDCTRL"): + # in the case of ERROR, this is because a more specific error + # message will have come first + pass + elif key in ("NEED_PASSPHRASE", "BAD_PASSPHRASE", "GOOD_PASSPHRASE", + "MISSING_PASSPHRASE", "DECRYPTION_FAILED", + "KEY_NOT_CREATED"): + self.status = key.replace("_", " ").lower() + elif key == "NEED_TRUSTDB": + self._gpg._create_trustdb() + elif key == "NEED_PASSPHRASE_SYM": + self.status = 'need symmetric passphrase' + elif key == "BEGIN_DECRYPTION": + self.status = 'decryption incomplete' + elif key == "BEGIN_ENCRYPTION": + self.status = 'encryption incomplete' + elif key == "DECRYPTION_OKAY": + self.status = 'decryption ok' + self.ok = True + elif key == "END_ENCRYPTION": + self.status = 'encryption ok' + self.ok = True + elif key == "INV_RECP": + self.status = 'invalid recipient' + elif key == "KEYEXPIRED": + self.status = 'key expired' + elif key == "KEYREVOKED": + self.status = 'key revoked' + elif key == "SIG_CREATED": + self.status = 'sig created' + elif key == "SIGEXPIRED": + self.status = 'sig expired' + elif key == "PLAINTEXT": + fmt, dts = value.split(' ', 1) + if dts.find(' ') > 0: + self.data_timestamp, self.data_filename = dts.split(' ', 1) + else: + self.data_timestamp = dts + ## GnuPG gives us a hex byte for an ascii char corresponding to + ## the data format of the resulting plaintext, + ## i.e. '62'→'b':= binary data + self.data_format = chr(int(str(fmt), 16)) + else: + super(Crypt, self)._handle_status(key, value) +
+
[docs]class ListPackets(object): + """Handle status messages for --list-packets.""" + + def __init__(self, gpg): + self._gpg = gpg + #: A string describing the current processing status, or error, if one + #: has occurred. + self.status = None + #: True if the passphrase to a public/private keypair is required. + self.need_passphrase = None + #: True if a passphrase for a symmetric key is required. + self.need_passphrase_sym = None + #: The keyid and uid which this data is encrypted to. + self.userid_hint = None + +
[docs] def _handle_status(self, key, value): + """Parse a status code from the attached GnuPG process. + + :raises: :exc:`~exceptions.ValueError` if the status message is unknown. + """ + if key == 'NODATA': + self.status = nodata(value) + elif key == 'ENC_TO': + # This will only capture keys in our keyring. In the future we + # may want to include multiple unknown keys in this list. + self.key, _, _ = value.split() + elif key == 'NEED_PASSPHRASE': + self.need_passphrase = True + elif key == 'NEED_PASSPHRASE_SYM': + self.need_passphrase_sym = True + elif key == 'USERID_HINT': + self.userid_hint = value.strip().split() + elif key in ('NO_SECKEY', 'BEGIN_DECRYPTION', 'DECRYPTION_FAILED', + 'END_DECRYPTION'): + pass + else: + raise ValueError("Unknown status message: %r" % key)
+
+ +
+
+
+
+ +
+
+
+ + + + + \ No newline at end of file diff --git a/docs/_build/html/_modules/gnupg/_util.html b/docs/_build/html/_modules/gnupg/_util.html new file mode 100644 index 0000000..b6968c4 --- /dev/null +++ b/docs/_build/html/_modules/gnupg/_util.html @@ -0,0 +1,718 @@ + + + + + + + + gnupg._util — gnupg unknown documentation + + + + + + + + + + + + +
+ +
+ +
+
+
+ +
+
+
+ +

Source code for gnupg._util

+# -*- coding: utf-8 -*-
+#
+# This file is part of python-gnupg, a Python interface to GnuPG.
+# Copyright © 2013 Isis Lovecruft, <isis@leap.se> 0xA3ADB67A2CDB8B35
+#           © 2013 Andrej B.
+#           © 2013 LEAP Encryption Access Project
+#           © 2008-2012 Vinay Sajip
+#           © 2005 Steve Traugott
+#           © 2004 A.M. Kuchling
+#
+# This program is free software: you can redistribute it and/or modify it
+# under the terms of the GNU General Public License as published by the Free
+# Software Foundation, either version 3 of the License, or (at your option)
+# any later version.
+#
+# This program is distributed in the hope that it will be useful, but WITHOUT
+# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+# FITNESS FOR A PARTICULAR PURPOSE. See the included LICENSE file for details.
+
+'''Extra utilities for python-gnupg.'''
+
+from __future__ import absolute_import
+from datetime   import datetime
+from socket     import gethostname
+from time       import localtime
+from time       import mktime
+
+import codecs
+import encodings
+import os
+import psutil
+import threading
+import random
+import re
+import string
+import sys
+
+try:
+    from io import StringIO
+    from io import BytesIO
+except ImportError:
+    from cStringIO import StringIO
+
+from . import _logger
+
+
+try:
+    unicode
+    _py3k = False
+    try:
+        isinstance(__name__, basestring)
+    except NameError:
+        msg  = "Sorry, python-gnupg requires a Python version with proper"
+        msg += " unicode support. Please upgrade to Python>=2.6."
+        raise SystemExit(msg)
+except NameError:
+    _py3k = True
+
+
+## Directory shortcuts:
+## we don't want to use this one because it writes to the install dir:
+#_here = getabsfile(currentframe()).rsplit(os.path.sep, 1)[0]
+_here = os.path.join(os.getcwd(), 'gnupg')                   ## current dir
+_test = os.path.join(os.path.join(_here, 'test'), 'tmp')     ## ./tests/tmp
+_user = os.environ.get('HOME')                               ## $HOME
+_ugpg = os.path.join(_user, '.gnupg')                        ## $HOME/.gnupg
+_conf = os.path.join(os.path.join(_user, '.config'), 'python-gnupg')
+                                     ## $HOME/.config/python-gnupg
+
+## Logger is disabled by default
+log = _logger.create_logger(0)
+
+
+
[docs]def find_encodings(enc=None, system=False): + """Find functions for encoding translations for a specific codec. + + :param str enc: The codec to find translation functions for. It will be + normalized by converting to lowercase, excluding + everything which is not ascii, and hyphens will be + converted to underscores. + + :param bool system: If True, find encodings based on the system's stdin + encoding, otherwise assume utf-8. + + :raises: :exc:LookupError if the normalized codec, ``enc``, cannot be + found in Python's encoding translation map. + """ + if not enc: + enc = 'utf-8' + + if system: + if getattr(sys.stdin, 'encoding', None) is None: + enc = sys.stdin.encoding + log.debug("Obtained encoding from stdin: %s" % enc) + else: + enc = 'ascii' + + ## have to have lowercase to work, see + ## http://docs.python.org/dev/library/codecs.html#standard-encodings + enc = enc.lower() + codec_alias = encodings.normalize_encoding(enc) + + codecs.register(encodings.search_function) + coder = codecs.lookup(codec_alias) + + return coder +
+
[docs]def author_info(name, contact=None, public_key=None): + """Easy object-oriented representation of contributor info. + + :param str name: The contributor´s name. + :param str contact: The contributor´s email address or contact + information, if given. + :param str public_key: The contributor´s public keyid, if given. + """ + return Storage(name=name, contact=contact, public_key=public_key) +
+
[docs]def _copy_data(instream, outstream): + """Copy data from one stream to another. + + :type instream: :class:`io.BytesIO` or :class:`io.StringIO` or file + :param instream: A byte stream or open file to read from. + :param file outstream: The file descriptor of a tmpfile to write to. + """ + sent = 0 + + coder = find_encodings() + + while True: + if ((_py3k and isinstance(instream, str)) or + (not _py3k and isinstance(instream, basestring))): + data = instream[:1024] + instream = instream[1024:] + else: + data = instream.read(1024) + if len(data) == 0: + break + sent += len(data) + log.debug("Sending chunk %d bytes:\n%s" + % (sent, data)) + try: + outstream.write(data) + except UnicodeError: + try: + outstream.write(coder.encode(data)) + except IOError: + log.exception("Error sending data: Broken pipe") + break + except IOError as ioe: + # Can get 'broken pipe' errors even when all data was sent + if 'Broken pipe' in str(ioe): + log.error('Error sending data: Broken pipe') + else: + log.exception(ioe) + break + try: + outstream.close() + except IOError as ioe: + log.error("Unable to close outstream %s:\r\t%s" % (outstream, ioe)) + else: + log.debug("Closed outstream: %d bytes sent." % sent) +
+
[docs]def _create_if_necessary(directory): + """Create the specified directory, if necessary. + + :param str directory: The directory to use. + :rtype: bool + :returns: True if no errors occurred and the directory was created or + existed beforehand, False otherwise. + """ + + if not os.path.isabs(directory): + log.debug("Got non-absolute path: %s" % directory) + directory = os.path.abspath(directory) + + if not os.path.isdir(directory): + log.info("Creating directory: %s" % directory) + try: + os.makedirs(directory, 0x1C0) + except OSError as ose: + log.error(ose, exc_info=1) + return False + else: + log.debug("Created directory.") + return True +
+
[docs]def create_uid_email(username=None, hostname=None): + """Create an email address suitable for a UID on a GnuPG key. + + :param str username: The username portion of an email address. If None, + defaults to the username of the running Python + process. + + :param str hostname: The FQDN portion of an email address. If None, the + hostname is obtained from gethostname(2). + + :rtype: str + :returns: A string formatted as <username>@<hostname>. + """ + if hostname: + hostname = hostname.replace(' ', '_') + if not username: + try: username = os.environ['LOGNAME'] + except KeyError: username = os.environ['USERNAME'] + + if not hostname: hostname = gethostname() + + uid = "%s@%s" % (username.replace(' ', '_'), hostname) + else: + username = username.replace(' ', '_') + if (not hostname) and (username.find('@') == 0): + uid = "%s@%s" % (username, gethostname()) + elif hostname: + uid = "%s@%s" % (username, hostname) + else: + uid = username + + return uid +
+
[docs]def _deprefix(line, prefix, callback=None): + """Remove the prefix string from the beginning of line, if it exists. + + :param string line: A line, such as one output by GnuPG's status-fd. + :param string prefix: A substring to remove from the beginning of + ``line``. Case insensitive. + :type callback: callable + :param callback: Function to call if the prefix is found. The signature to + callback will be only one argument, the ``line`` without the ``prefix``, i.e. + ``callback(line)``. + :rtype: string + :returns: If the prefix was found, the ``line`` without the prefix is + returned. Otherwise, the original ``line`` is returned. + """ + try: + assert line.upper().startswith(u''.join(prefix).upper()) + except AssertionError: + log.debug("Line doesn't start with prefix '%s':\n%s" % (prefix, line)) + return line + else: + newline = line[len(prefix):] + if callback is not None: + try: + callback(newline) + except Exception as exc: + log.exception(exc) + return newline +
+
[docs]def _find_binary(binary=None): + """Find the absolute path to the GnuPG binary. + + Also run checks that the binary is not a symlink, and check that + our process real uid has exec permissions. + + :param str binary: The path to the GnuPG binary. + :raises: :exc:`~exceptions.RuntimeError` if it appears that GnuPG is not + installed. + :rtype: str + :returns: The absolute path to the GnuPG binary to use, if no exceptions + occur. + """ + found = None + if binary is not None: + if not os.path.isabs(binary): + try: + found = _which(binary) + log.debug("Found potential binary paths: %s" + % '\n'.join([path for path in found])) + found = found[0] + except IndexError as ie: + log.info("Could not determine absolute path of binary: '%s'" + % binary) + elif os.access(binary, os.X_OK): + found = binary + if found is None: + try: found = _which('gpg')[0] + except IndexError as ie: + log.error("Could not find binary for 'gpg'.") + try: found = _which('gpg2')[0] + except IndexError as ie: + log.error("Could not find binary for 'gpg2'.") + if found is None: + raise RuntimeError("GnuPG is not installed!") + + try: + assert os.path.isabs(found), "Path to gpg binary not absolute" + assert not os.path.islink(found), "Path to gpg binary is symlink" + assert os.access(found, os.X_OK), "Lacking +x perms for gpg binary" + except (AssertionError, AttributeError) as ae: + log.error(str(ae)) + else: + return found +
+
[docs]def _has_readwrite(path): + """ + Determine if the real uid/gid of the executing user has read and write + permissions for a directory or a file. + + :param str path: The path to the directory or file to check permissions + for. + :rtype: bool + :returns: True if real uid/gid has read+write permissions, False otherwise. + """ + return os.access(path, os.R_OK ^ os.W_OK) +
+
[docs]def _is_file(filename): + """Check that the size of the thing which is supposed to be a filename has + size greater than zero, without following symbolic links or using + :func:os.path.isfile. + + :param filename: An object to check. + :rtype: bool + :returns: True if **filename** is file-like, False otherwise. + """ + try: + statinfo = os.lstat(filename) + log.debug("lstat(%r) with type=%s gave us %r" + % (repr(filename), type(filename), repr(statinfo))) + if not (statinfo.st_size > 0): + raise ValueError("'%s' appears to be an empty file!" % filename) + except OSError as oserr: + log.error(oserr) + if filename == '-': + log.debug("Got '-' for filename, assuming sys.stdin...") + return True + except (ValueError, TypeError, IOError) as err: + log.error(err) + else: + return True + return False +
+
[docs]def _is_stream(input): + """Check that the input is a byte stream. + + :param input: An object provided for reading from or writing to. + :rtype: bool + :returns: True if :param:input is a stream, False if otherwise. + """ + return isinstance(input, BytesIO) or isinstance(input, StringIO) +
+
[docs]def _is_list_or_tuple(instance): + """Check that ``instance`` is a list or tuple. + + :param instance: The object to type check. + :rtype: bool + :returns: True if ``instance`` is a list or tuple, False otherwise. + """ + return isinstance(instance, (list, tuple,)) +
+
[docs]def _is_gpg1(version): + """Returns True if using GnuPG version 1.x. + + :param tuple version: A tuple of three integers indication major, minor, + and micro version numbers. + """ + (major, minor, micro) = _match_version_string(version) + if major == 1: + return True + return False +
+
[docs]def _is_gpg2(version): + """Returns True if using GnuPG version 2.x. + + :param tuple version: A tuple of three integers indication major, minor, + and micro version numbers. + """ + (major, minor, micro) = _match_version_string(version) + if major == 2: + return True + return False +
+
[docs]def _make_binary_stream(s, encoding): + """ + xxx fill me in + """ + try: + if _py3k: + if isinstance(s, str): + s = s.encode(encoding) + else: + if type(s) is not str: + s = s.encode(encoding) + from io import BytesIO + rv = BytesIO(s) + except ImportError: + rv = StringIO(s) + return rv +
+
[docs]def _make_passphrase(length=None, save=False, file=None): + """Create a passphrase and write it to a file that only the user can read. + + This is not very secure, and should not be relied upon for actual key + passphrases. + + :param int length: The length in bytes of the string to generate. + + :param file file: The file to save the generated passphrase in. If not + given, defaults to 'passphrase-<the real user id>-<seconds since + epoch>' in the top-level directory. + """ + if not length: + length = 40 + + passphrase = _make_random_string(length) + + if save: + ruid, euid, suid = psutil.Process(os.getpid()).uids + gid = os.getgid() + now = mktime(localtime()) + + if not file: + filename = str('passphrase-%s-%s' % uid, now) + file = os.path.join(_repo, filename) + + with open(file, 'a') as fh: + fh.write(passphrase) + fh.flush() + fh.close() + os.chmod(file, stat.S_IRUSR | stat.S_IWUSR) + os.chown(file, ruid, gid) + + log.warn("Generated passphrase saved to %s" % file) + return passphrase +
+
[docs]def _make_random_string(length): + """Returns a random lowercase, uppercase, alphanumerical string. + + :param int length: The length in bytes of the string to generate. + """ + chars = string.ascii_lowercase + string.ascii_uppercase + string.digits + return ''.join(random.choice(chars) for x in range(length)) +
+
[docs]def _match_version_string(version): + """Sort a binary version string into major, minor, and micro integers. + + :param str version: A version string in the form x.x.x + """ + regex = re.compile('(\d)*(\.)*(\d)*(\.)*(\d)*') + matched = regex.match(version) + g = matched.groups() + major, minor, micro = int(g[0]), int(g[2]), int(g[4]) + return (major, minor, micro) +
+
[docs]def _next_year(): + """Get the date of today plus one year. + + :rtype: str + :returns: The date of this day next year, in the format '%Y-%m-%d'. + """ + now = datetime.now().__str__() + date = now.split(' ', 1)[0] + year, month, day = date.split('-', 2) + next_year = str(int(year)+1) + return '-'.join((next_year, month, day)) +
+
[docs]def _now(): + """Get a timestamp for right now, formatted according to ISO 8601.""" + return datetime.isoformat(datetime.now()) +
+
[docs]def _separate_keyword(line): + """Split the line, and return (first_word, the_rest).""" + try: + first, rest = line.split(None, 1) + except ValueError: + first = line.strip() + rest = '' + return first, rest +
+
[docs]def _threaded_copy_data(instream, outstream): + """Copy data from one stream to another in a separate thread. + + Wraps ``_copy_data()`` in a :class:`threading.Thread`. + + :type instream: :class:`io.BytesIO` or :class:`io.StringIO` + :param instream: A byte stream to read from. + :param file outstream: The file descriptor of a tmpfile to write to. + """ + copy_thread = threading.Thread(target=_copy_data, + args=(instream, outstream)) + copy_thread.setDaemon(True) + log.debug('%r, %r, %r', copy_thread, instream, outstream) + copy_thread.start() + return copy_thread +
+
[docs]def _utc_epoch(): + """Get the seconds since epoch.""" + return int(mktime(localtime())) +
+
[docs]def _which(executable, flags=os.X_OK): + """Borrowed from Twisted's :mod:twisted.python.proutils . + + Search PATH for executable files with the given name. + + On newer versions of MS-Windows, the PATHEXT environment variable will be + set to the list of file extensions for files considered executable. This + will normally include things like ".EXE". This fuction will also find files + with the given name ending with any of these extensions. + + On MS-Windows the only flag that has any meaning is os.F_OK. Any other + flags will be ignored. + + Note: This function does not help us prevent an attacker who can already + manipulate the environment's PATH settings from placing malicious code + higher in the PATH. It also does happily follows links. + + :param str name: The name for which to search. + :param int flags: Arguments to L{os.access}. + :rtype: list + :returns: A list of the full paths to files found, in the order in which + they were found. + """ + result = [] + exts = filter(None, os.environ.get('PATHEXT', '').split(os.pathsep)) + path = os.environ.get('PATH', None) + if path is None: + return [] + for p in os.environ.get('PATH', '').split(os.pathsep): + p = os.path.join(p, executable) + if os.access(p, flags): + result.append(p) + for e in exts: + pext = p + e + if os.access(pext, flags): + result.append(pext) + return result +
+
[docs]def _write_passphrase(stream, passphrase, encoding): + """Write the passphrase from memory to the GnuPG process' stdin. + + :type stream: file, :class:`~io.BytesIO`, or :class:`~io.StringIO` + :param stream: The input file descriptor to write the password to. + :param str passphrase: The passphrase for the secret key material. + :param str encoding: The data encoding expected by GnuPG. Usually, this + is ``sys.getfilesystemencoding()``. + """ + passphrase = '%s\n' % passphrase + passphrase = passphrase.encode(encoding) + stream.write(passphrase) + log.debug("Wrote passphrase on stdin.") + +
+
[docs]class InheritableProperty(object): + """Based on the emulation of PyProperty_Type() in Objects/descrobject.c""" + + def __init__(self, fget=None, fset=None, fdel=None, doc=None): + self.fget = fget + self.fset = fset + self.fdel = fdel + self.__doc__ = doc + + def __get__(self, obj, objtype=None): + if obj is None: + return self + if self.fget is None: + raise AttributeError("unreadable attribute") + if self.fget.__name__ == '<lambda>' or not self.fget.__name__: + return self.fget(obj) + else: + return getattr(obj, self.fget.__name__)() + + def __set__(self, obj, value): + if self.fset is None: + raise AttributeError("can't set attribute") + if self.fset.__name__ == '<lambda>' or not self.fset.__name__: + self.fset(obj, value) + else: + getattr(obj, self.fset.__name__)(value) + + def __delete__(self, obj): + if self.fdel is None: + raise AttributeError("can't delete attribute") + if self.fdel.__name__ == '<lambda>' or not self.fdel.__name__: + self.fdel(obj) + else: + getattr(obj, self.fdel.__name__)() + +
+
[docs]class Storage(dict): + """A dictionary where keys are stored as class attributes. + + For example, ``obj.foo`` can be used in addition to ``obj['foo']``: + + >>> o = Storage(a=1) + >>> o.a + 1 + >>> o['a'] + 1 + >>> o.a = 2 + >>> o['a'] + 2 + >>> del o.a + >>> o.a + None + """ + def __getattr__(self, key): + try: + return self[key] + except KeyError as k: + return None + + def __setattr__(self, key, value): + self[key] = value + + def __delattr__(self, key): + try: + del self[key] + except KeyError as k: + raise AttributeError(k.args[0]) + + def __repr__(self): + return '<Storage ' + dict.__repr__(self) + '>' + + def __getstate__(self): + return dict(self) + + def __setstate__(self, value): + for (k, v) in value.items(): + self[k] = v
+
+ +
+
+
+
+ +
+
+
+ + + + + \ No newline at end of file diff --git a/docs/_build/html/_modules/index.html b/docs/_build/html/_modules/index.html new file mode 100644 index 0000000..d032746 --- /dev/null +++ b/docs/_build/html/_modules/index.html @@ -0,0 +1,104 @@ + + + + + + + + Overview: module code — gnupg unknown documentation + + + + + + + + + + + +
+ +
+ +
+
+
+ +
+
+
+ +

All modules for which code is available

+ + +
+
+
+
+ +
+
+
+ + + + + \ No newline at end of file -- cgit v1.2.3