From a5d46a4e38985be540b9127ddcd3d8e21bbecb9a Mon Sep 17 00:00:00 2001 From: Kali Kaneko Date: Mon, 8 Jun 2015 16:46:11 -0400 Subject: Imported Upstream version 2.0.2 --- docs/_build/html/_modules/gnupg.html | 148 --- docs/_build/html/_modules/gnupg/_meta.html | 981 ---------------- docs/_build/html/_modules/gnupg/_parsers.html | 1486 ------------------------- docs/_build/html/_modules/gnupg/_util.html | 718 ------------ docs/_build/html/_modules/index.html | 104 -- 5 files changed, 3437 deletions(-) delete mode 100644 docs/_build/html/_modules/gnupg.html delete mode 100644 docs/_build/html/_modules/gnupg/_meta.html delete mode 100644 docs/_build/html/_modules/gnupg/_parsers.html delete mode 100644 docs/_build/html/_modules/gnupg/_util.html delete mode 100644 docs/_build/html/_modules/index.html (limited to 'docs/_build/html/_modules') diff --git a/docs/_build/html/_modules/gnupg.html b/docs/_build/html/_modules/gnupg.html deleted file mode 100644 index 24c2170..0000000 --- a/docs/_build/html/_modules/gnupg.html +++ /dev/null @@ -1,148 +0,0 @@ - - - - - - - - gnupg — gnupg unknown documentation - - - - - - - - - - - - -
- -
- -
-
-
- -
-
-
- -

Source code for gnupg

-#!/usr/bin/env python
-# -*- coding: utf-8 -*-
-#
-# This file is part of python-gnupg, a Python interface to GnuPG.
-# Copyright © 2013 Isis Lovecruft, <isis@leap.se> 0xA3ADB67A2CDB8B35
-#           © 2013 Andrej B.
-#           © 2013 LEAP Encryption Access Project
-#           © 2008-2012 Vinay Sajip
-#           © 2005 Steve Traugott
-#           © 2004 A.M. Kuchling
-#
-# This program is free software: you can redistribute it and/or modify it
-# under the terms of the GNU General Public License as published by the Free
-# Software Foundation, either version 3 of the License, or (at your option)
-# any later version.
-#
-# This program is distributed in the hope that it will be useful, but WITHOUT
-# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
-# FITNESS FOR A PARTICULAR PURPOSE. See the included LICENSE file for details.
-
-from __future__ import absolute_import
-
-from . import gnupg
-from . import copyleft
-from . import _ansistrm
-from . import _logger
-from . import _meta
-from . import _parsers
-from . import _util
-from .gnupg import GPG
-from ._version import get_versions
-
-__version__ = get_versions()['version']
-__authors__ = copyleft.authors
-__license__ = copyleft.full_text
-__copyleft__ = copyleft.copyright
-
-## do not set __package__ = "gnupg", else we will end up with
-## gnupg.<*allofthethings*>
-__all__ = ["GPG", "_util", "_parsers", "_meta", "_logger"]
-
-## avoid the "from gnupg import gnupg" idiom
-del gnupg
-del absolute_import
-del copyleft
-del get_versions
-del _version
-
- -
-
-
-
- -
-
-
- - - - - \ No newline at end of file diff --git a/docs/_build/html/_modules/gnupg/_meta.html b/docs/_build/html/_modules/gnupg/_meta.html deleted file mode 100644 index 23ca880..0000000 --- a/docs/_build/html/_modules/gnupg/_meta.html +++ /dev/null @@ -1,981 +0,0 @@ - - - - - - - - gnupg._meta — gnupg unknown documentation - - - - - - - - - - - - -
- -
- -
-
-
- -
-
-
- -

Source code for gnupg._meta

-# -*- coding: utf-8 -*-
-#
-# This file is part of python-gnupg, a Python interface to GnuPG.
-# Copyright © 2013 Isis Lovecruft, <isis@leap.se> 0xA3ADB67A2CDB8B35
-#           © 2013 Andrej B.
-#           © 2013 LEAP Encryption Access Project
-#           © 2008-2012 Vinay Sajip
-#           © 2005 Steve Traugott
-#           © 2004 A.M. Kuchling
-#
-# This program is free software: you can redistribute it and/or modify it
-# under the terms of the GNU General Public License as published by the Free
-# Software Foundation, either version 3 of the License, or (at your option)
-# any later version.
-#
-# This program is distributed in the hope that it will be useful, but WITHOUT
-# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
-# FITNESS FOR A PARTICULAR PURPOSE. See the included LICENSE file for details.
-
-'''Meta and base classes for hiding internal functions, and controlling
-attribute creation and handling.
-'''
-
-from __future__ import absolute_import
-
-import atexit
-import codecs
-import encodings
-## For AOS, the locale module will need to point to a wrapper around the
-## java.util.Locale class.
-## See https://code.patternsinthevoid.net/?p=android-locale-hack.git
-import locale
-import os
-import platform
-import psutil
-import shlex
-import subprocess
-import sys
-import threading
-
-from . import _parsers
-from . import _util
-
-from ._parsers import _check_preferences
-from ._parsers import _sanitise_list
-from ._util    import log
-
-
-
[docs]class GPGMeta(type): - """Metaclass for changing the :meth:GPG.__init__ initialiser. - - Detects running gpg-agent processes and the presence of a pinentry - program, and disables pinentry so that python-gnupg can write the - passphrase to the controlled GnuPG process without killing the agent. - - :attr _agent_proc: If a :program:`gpg-agent` process is currently running - for the effective userid, then **_agent_proc** will be - set to a ``psutil.Process`` for that process. - """ - -
[docs] def __new__(cls, name, bases, attrs): - """Construct the initialiser for GPG""" - log.debug("Metaclass __new__ constructor called for %r" % cls) - if cls._find_agent(): - ## call the normal GPG.__init__() initialiser: - attrs['init'] = cls.__init__ - attrs['_remove_agent'] = True - return super(GPGMeta, cls).__new__(cls, name, bases, attrs) -
- @classmethod -
[docs] def _find_agent(cls): - """Discover if a gpg-agent process for the current euid is running. - - If there is a matching gpg-agent process, set a :class:`psutil.Process` - instance containing the gpg-agent process' information to - ``cls._agent_proc``. - - :returns: True if there exists a gpg-agent process running under the - same effective user ID as that of this program. Otherwise, - returns None. - """ - identity = psutil.Process(os.getpid()).uids - for proc in psutil.process_iter(): - if (proc.name == "gpg-agent") and proc.is_running: - log.debug("Found gpg-agent process with pid %d" % proc.pid) - if proc.uids == identity: - log.debug( - "Effective UIDs of this process and gpg-agent match") - setattr(cls, '_agent_proc', proc) - return True - -
-
[docs]class GPGBase(object): - """Base class for storing properties and controlling process initialisation. - - :const _result_map: A *dict* containing classes from - :mod:`~gnupg._parsers`, used for parsing results - obtained from GnuPG commands. - :const _decode_errors: How to handle encoding errors. - """ - __metaclass__ = GPGMeta - _decode_errors = 'strict' - _result_map = { 'crypt': _parsers.Crypt, - 'delete': _parsers.DeleteResult, - 'generate': _parsers.GenKey, - 'import': _parsers.ImportResult, - 'list': _parsers.ListKeys, - 'sign': _parsers.Sign, - 'verify': _parsers.Verify, - 'packets': _parsers.ListPackets } - -
[docs] def __init__(self, binary=None, home=None, keyring=None, secring=None, - use_agent=False, default_preference_list=None, - verbose=False, options=None): - """Create a ``GPGBase``. - - This class is used to set up properties for controlling the behaviour - of configuring various options for GnuPG, such as setting GnuPG's - **homedir** , and the paths to its **binary** and **keyring** . - - :const binary: (:obj:`str`) The full path to the GnuPG binary. - - :ivar homedir: (:class:`~gnupg._util.InheritableProperty`) The full - path to the current setting for the GnuPG - ``--homedir``. - - :ivar _generated_keys: (:class:`~gnupg._util.InheritableProperty`) - Controls setting the directory for storing any - keys which are generated with - :meth:`~gnupg.GPG.gen_key`. - - :ivar str keyring: The filename in **homedir** to use as the keyring - file for public keys. - :ivar str secring: The filename in **homedir** to use as the keyring - file for secret keys. - """ - self.binary = _util._find_binary(binary) - self.homedir = home if home else _util._conf - pub = _parsers._fix_unsafe(keyring) if keyring else 'pubring.gpg' - sec = _parsers._fix_unsafe(secring) if secring else 'secring.gpg' - self.keyring = os.path.join(self._homedir, pub) - self.secring = os.path.join(self._homedir, sec) - self.options = _parsers._sanitise(options) if options else None - - if default_preference_list: - self._prefs = _check_preferences(default_preference_list, 'all') - else: - self._prefs = 'SHA512 SHA384 SHA256 AES256 CAMELLIA256 TWOFISH' - self._prefs += ' AES192 ZLIB ZIP Uncompressed' - - encoding = locale.getpreferredencoding() - if encoding is None: # This happens on Jython! - encoding = sys.stdin.encoding - self._encoding = encoding.lower().replace('-', '_') - self._filesystemencoding = encodings.normalize_encoding( - sys.getfilesystemencoding().lower()) - - self._keyserver = 'hkp://wwwkeys.pgp.net' - self.__generated_keys = os.path.join(self.homedir, 'generated-keys') - - try: - assert self.binary, "Could not find binary %s" % binary - assert isinstance(verbose, (bool, str, int)), \ - "'verbose' must be boolean, string, or 0 <= n <= 9" - assert isinstance(use_agent, bool), "'use_agent' must be boolean" - if self.options is not None: - assert isinstance(self.options, str), "options not string" - except (AssertionError, AttributeError) as ae: - log.error("GPGBase.__init__(): %s" % str(ae)) - raise RuntimeError(str(ae)) - else: - if verbose is True: - # The caller wants logging, but we need a valid --debug-level - # for gpg. Default to "basic", and warn about the ambiguity. - # (garrettr) - verbose = "basic" - log.warning('GPG(verbose=True) is ambiguous, defaulting to "basic" logging') - self.verbose = verbose - self.use_agent = use_agent - - if hasattr(self, '_agent_proc') \ - and getattr(self, '_remove_agent', None) is True: - if hasattr(self, '__remove_path__'): - self.__remove_path__('pinentry') -
-
[docs] def __remove_path__(self, prog=None, at_exit=True): - """Remove the directories containing a program from the system's - ``$PATH``. If ``GPGBase.binary`` is in a directory being removed, it - is linked to :file:'./gpg' in the current directory. - - :param str prog: The program to remove from ``$PATH``. - :param bool at_exit: Add the program back into the ``$PATH`` when the - Python interpreter exits, and delete any symlinks - to ``GPGBase.binary`` which were created. - """ - #: A list of ``$PATH`` entries which were removed to disable pinentry. - self._removed_path_entries = [] - - log.debug("Attempting to remove %s from system PATH" % str(prog)) - if (prog is None) or (not isinstance(prog, str)): return - - try: - program = _util._which(prog)[0] - except (OSError, IOError, IndexError) as err: - log.err(str(err)) - log.err("Cannot find program '%s', not changing PATH." % prog) - return - - ## __remove_path__ cannot be an @classmethod in GPGMeta, because - ## the use_agent attribute must be set by the instance. - if not self.use_agent: - program_base = os.path.dirname(prog) - gnupg_base = os.path.dirname(self.binary) - - ## symlink our gpg binary into $PWD if the path we are removing is - ## the one which contains our gpg executable: - new_gpg_location = os.path.join(os.getcwd(), 'gpg') - if gnupg_base == program_base: - os.symlink(self.binary, new_gpg_location) - self.binary = new_gpg_location - - ## copy the original environment so that we can put it back later: - env_copy = os.environ ## this one should not be touched - path_copy = os.environ.pop('PATH') - log.debug("Created a copy of system PATH: %r" % path_copy) - assert not os.environ.has_key('PATH'), "OS env kept $PATH anyway!" - - @staticmethod - def remove_program_from_path(path, prog_base): - """Remove all directories which contain a program from PATH. - - :param str path: The contents of the system environment's - ``$PATH``. - - :param str prog_base: The directory portion of a program's - location, without the trailing slash, - and without the program name. For - example, ``prog_base='/usr/bin'``. - """ - paths = path.split(':') - for directory in paths: - if directory == prog_base: - log.debug("Found directory with target program: %s" - % directory) - path.remove(directory) - self._removed_path_entries.append(directory) - log.debug("Deleted all found instance of %s." % directory) - log.debug("PATH is now:%s%s" % (os.linesep, path)) - new_path = ':'.join([p for p in path]) - return new_path - - @staticmethod - def update_path(environment, path): - """Add paths to the string at ``os.environ['PATH']``. - - :param str environment: The environment mapping to update. - :param list path: A list of strings to update the PATH with. - """ - log.debug("Updating system path...") - os.environ = environment - new_path = ':'.join([p for p in path]) - old = '' - if 'PATH' in os.environ: - new_path = ':'.join([os.environ['PATH'], new_path]) - os.environ.update({'PATH': new_path}) - log.debug("System $PATH: %s" % os.environ['PATH']) - - modified_path = remove_program_from_path(path_copy, program_base) - update_path(env_copy, modified_path) - - ## register an _exithandler with the python interpreter: - atexit.register(update_path, env_copy, path_copy) - - def remove_symlinked_binary(symlink): - if os.path.islink(symlink): - os.unlink(symlink) - log.debug("Removed binary symlink '%s'" % symlink) - atexit.register(remove_symlinked_binary, new_gpg_location) -
- @property - def default_preference_list(self): - """Get the default preference list.""" - return self._prefs - - @default_preference_list.setter - def default_preference_list(self, prefs): - """Set the default preference list. - - :param str prefs: A string containing the default preferences for - ciphers, digests, and compression algorithms. - """ - prefs = _check_preferences(prefs) - if prefs is not None: - self._prefs = prefs - - @default_preference_list.deleter -
[docs] def default_preference_list(self): - """Reset the default preference list to its original state. - - Note that "original state" does not mean the default preference - list for whichever version of GnuPG is being used. It means the - default preference list defined by :attr:`GPGBase._prefs`. - - Using BZIP2 is avoided due to not interacting well with some versions - of GnuPG>=2.0.0. - """ - self._prefs = 'SHA512 SHA384 SHA256 AES256 CAMELLIA256 TWOFISH ZLIB ZIP' -
- @property - def keyserver(self): - """Get the current keyserver setting.""" - return self._keyserver - - @keyserver.setter - def keyserver(self, location): - """Set the default keyserver to use for sending and receiving keys. - - The ``location`` is sent to :func:`_parsers._check_keyserver` when - option are parsed in :meth:`gnupg.GPG._make_options`. - - :param str location: A string containing the default keyserver. This - should contain the desired keyserver protocol - which is supported by the keyserver, for example, - ``'hkps://keys.mayfirst.org'``. The default - keyserver is ``'hkp://wwwkeys.pgp.net'``. - """ - self._keyserver = location - - @keyserver.deleter -
[docs] def keyserver(self): - """Reset the keyserver to the default setting.""" - self._keyserver = 'hkp://wwwkeys.pgp.net' -
-
[docs] def _homedir_getter(self): - """Get the directory currently being used as GnuPG's homedir. - - If unspecified, use :file:`~/.config/python-gnupg/` - - :rtype: str - :returns: The absolute path to the current GnuPG homedir. - """ - return self._homedir -
-
[docs] def _homedir_setter(self, directory): - """Set the directory to use as GnuPG's homedir. - - If unspecified, use $HOME/.config/python-gnupg. If specified, ensure - that the ``directory`` does not contain various shell escape - characters. If ``directory`` is not found, it will be automatically - created. Lastly, the ``direcory`` will be checked that the EUID has - read and write permissions for it. - - :param str directory: A relative or absolute path to the directory to - use for storing/accessing GnuPG's files, including - keyrings and the trustdb. - :raises: :exc:`~exceptions.RuntimeError` if unable to find a suitable - directory to use. - """ - if not directory: - log.debug("GPGBase._homedir_setter(): Using default homedir: '%s'" - % _util._conf) - directory = _util._conf - - hd = _parsers._fix_unsafe(directory) - log.debug("GPGBase._homedir_setter(): got directory '%s'" % hd) - - if hd: - log.debug("GPGBase._homedir_setter(): Check existence of '%s'" % hd) - _util._create_if_necessary(hd) - - try: - log.debug("GPGBase._homedir_setter(): checking permissions") - assert _util._has_readwrite(hd), \ - "Homedir '%s' needs read/write permissions" % hd - except AssertionError as ae: - msg = ("Unable to set '%s' as GnuPG homedir" % directory) - log.debug("GPGBase.homedir.setter(): %s" % msg) - log.debug(str(ae)) - raise RuntimeError(str(ae)) - else: - log.info("Setting homedir to '%s'" % hd) - self._homedir = hd -
- homedir = _util.InheritableProperty(_homedir_getter, _homedir_setter) - -
[docs] def _generated_keys_getter(self): - """Get the ``homedir`` subdirectory for storing generated keys. - - :rtype: str - :returns: The absolute path to the current GnuPG homedir. - """ - return self.__generated_keys -
-
[docs] def _generated_keys_setter(self, directory): - """Set the directory for storing generated keys. - - If unspecified, use - :meth:`~gnupg._meta.GPGBase.homedir`/generated-keys. If specified, - ensure that the ``directory`` does not contain various shell escape - characters. If ``directory`` isn't found, it will be automatically - created. Lastly, the ``directory`` will be checked to ensure that the - current EUID has read and write permissions for it. - - :param str directory: A relative or absolute path to the directory to - use for storing/accessing GnuPG's files, including keyrings and - the trustdb. - :raises: :exc:`~exceptions.RuntimeError` if unable to find a suitable - directory to use. - """ - if not directory: - directory = os.path.join(self.homedir, 'generated-keys') - log.debug("GPGBase._generated_keys_setter(): Using '%s'" - % directory) - - hd = _parsers._fix_unsafe(directory) - log.debug("GPGBase._generated_keys_setter(): got directory '%s'" % hd) - - if hd: - log.debug("GPGBase._generated_keys_setter(): Check exists '%s'" - % hd) - _util._create_if_necessary(hd) - - try: - log.debug("GPGBase._generated_keys_setter(): check permissions") - assert _util._has_readwrite(hd), \ - "Keys dir '%s' needs read/write permissions" % hd - except AssertionError as ae: - msg = ("Unable to set '%s' as generated keys dir" % directory) - log.debug("GPGBase._generated_keys_setter(): %s" % msg) - log.debug(str(ae)) - raise RuntimeError(str(ae)) - else: - log.info("Setting homedir to '%s'" % hd) - self.__generated_keys = hd -
- _generated_keys = _util.InheritableProperty(_generated_keys_getter, - _generated_keys_setter) - -
[docs] def _make_args(self, args, passphrase=False): - """Make a list of command line elements for GPG. - - The value of ``args`` will be appended only if it passes the checks in - :func:`gnupg._parsers._sanitise`. The ``passphrase`` argument needs to - be True if a passphrase will be sent to GnuPG, else False. - - :param list args: A list of strings of options and flags to pass to - ``GPG.binary``. This is input safe, meaning that - these values go through strict checks (see - ``parsers._sanitise_list``) before being passed to to - the input file descriptor for the GnuPG process. - Each string should be given exactly as it would be on - the commandline interface to GnuPG, - e.g. ["--cipher-algo AES256", "--default-key - A3ADB67A2CDB8B35"]. - - :param bool passphrase: If True, the passphrase will be sent to the - stdin file descriptor for the attached GnuPG - process. - """ - ## see TODO file, tag :io:makeargs: - cmd = [self.binary, - '--no-options --no-emit-version --no-tty --status-fd 2'] - - if self.homedir: cmd.append('--homedir "%s"' % self.homedir) - - if self.keyring: - cmd.append('--no-default-keyring --keyring %s' % self.keyring) - if self.secring: - cmd.append('--secret-keyring %s' % self.secring) - - if passphrase: cmd.append('--batch --passphrase-fd 0') - - if self.use_agent: cmd.append('--use-agent') - else: cmd.append('--no-use-agent') - - if self.options: - [cmd.append(opt) for opt in iter(_sanitise_list(self.options))] - if args: - [cmd.append(arg) for arg in iter(_sanitise_list(args))] - - if self.verbose: - cmd.append('--debug-all') - if ((isinstance(self.verbose, str) and - self.verbose in ['basic', 'advanced', 'expert', 'guru']) - or (isinstance(self.verbose, int) and (1<=self.verbose<=9))): - cmd.append('--debug-level %s' % self.verbose) - - return cmd -
-
[docs] def _open_subprocess(self, args=None, passphrase=False): - """Open a pipe to a GPG subprocess and return the file objects for - communicating with it. - - :param list args: A list of strings of options and flags to pass to - ``GPG.binary``. This is input safe, meaning that - these values go through strict checks (see - ``parsers._sanitise_list``) before being passed to to - the input file descriptor for the GnuPG process. - Each string should be given exactly as it would be on - the commandline interface to GnuPG, - e.g. ["--cipher-algo AES256", "--default-key - A3ADB67A2CDB8B35"]. - - :param bool passphrase: If True, the passphrase will be sent to the - stdin file descriptor for the attached GnuPG - process. - """ - ## see http://docs.python.org/2/library/subprocess.html#converting-an\ - ## -argument-sequence-to-a-string-on-windows - cmd = shlex.split(' '.join(self._make_args(args, passphrase))) - log.debug("Sending command to GnuPG process:%s%s" % (os.linesep, cmd)) - - if platform.system() == "Windows": - # TODO figure out what the hell is going on there. - expand_shell = True - else: - expand_shell = False - - return subprocess.Popen(cmd, shell=expand_shell, stdin=subprocess.PIPE, - stdout=subprocess.PIPE, stderr=subprocess.PIPE, - env={'LANGUAGE': 'en'}) -
-
[docs] def _read_response(self, stream, result): - """Reads all the stderr output from GPG, taking notice only of lines - that begin with the magic [GNUPG:] prefix. - - Calls methods on the response object for each valid token found, with - the arg being the remainder of the status line. - - :param stream: A byte-stream, file handle, or a - :data:`subprocess.PIPE` for parsing the status codes - from the GnuPG process. - - :param result: The result parser class from :mod:`~gnupg._parsers` ― - the ``handle_status()`` method of that class will be - called in order to parse the output of ``stream``. - """ - lines = [] - while True: - line = stream.readline() - if len(line) == 0: - break - lines.append(line) - line = line.rstrip() - - if line.startswith('[GNUPG:]'): - line = _util._deprefix(line, '[GNUPG:] ', log.status) - keyword, value = _util._separate_keyword(line) - result._handle_status(keyword, value) - elif line.startswith('gpg:'): - line = _util._deprefix(line, 'gpg: ') - keyword, value = _util._separate_keyword(line) - - # Log gpg's userland messages at our own levels: - if keyword.upper().startswith("WARNING"): - log.warn("%s" % value) - elif keyword.upper().startswith("FATAL"): - log.critical("%s" % value) - # Handle the gpg2 error where a missing trustdb.gpg is, - # for some stupid reason, considered fatal: - if value.find("trustdb.gpg") and value.find("No such file"): - result._handle_status('NEED_TRUSTDB', '') - else: - if self.verbose: - log.info("%s" % line) - else: - log.debug("%s" % line) - result.stderr = ''.join(lines) -
-
[docs] def _read_data(self, stream, result): - """Incrementally read from ``stream`` and store read data. - - All data gathered from calling ``stream.read()`` will be concatenated - and stored as ``result.data``. - - :param stream: An open file-like object to read() from. - :param result: An instance of one of the :ref:`result parsing classes - <parsers>` from :const:`~gnupg._meta.GPGBase._result_map`. - """ - chunks = [] - log.debug("Reading data from stream %r..." % stream.__repr__()) - - while True: - data = stream.read(1024) - if len(data) == 0: - break - chunks.append(data) - log.debug("Read %4d bytes" % len(data)) - - # Join using b'' or '', as appropriate - result.data = type(data)().join(chunks) - log.debug("Finishing reading from stream %r..." % stream.__repr__()) - log.debug("Read %4d bytes total" % len(result.data)) -
-
[docs] def _collect_output(self, process, result, writer=None, stdin=None): - """Drain the subprocesses output streams, writing the collected output - to the result. If a writer thread (writing to the subprocess) is given, - make sure it's joined before returning. If a stdin stream is given, - close it before returning. - """ - stderr = codecs.getreader(self._encoding)(process.stderr) - rr = threading.Thread(target=self._read_response, - args=(stderr, result)) - rr.setDaemon(True) - log.debug('stderr reader: %r', rr) - rr.start() - - stdout = process.stdout - dr = threading.Thread(target=self._read_data, args=(stdout, result)) - dr.setDaemon(True) - log.debug('stdout reader: %r', dr) - dr.start() - - dr.join() - rr.join() - if writer is not None: - writer.join() - process.wait() - if stdin is not None: - try: - stdin.close() - except IOError: - pass - stderr.close() - stdout.close() -
-
[docs] def _handle_io(self, args, file, result, passphrase=False, binary=False): - """Handle a call to GPG - pass input data, collect output data.""" - p = self._open_subprocess(args, passphrase) - if not binary: - stdin = codecs.getwriter(self._encoding)(p.stdin) - else: - stdin = p.stdin - if passphrase: - _util._write_passphrase(stdin, passphrase, self._encoding) - writer = _util._threaded_copy_data(file, stdin) - self._collect_output(p, result, writer, stdin) - return result -
-
[docs] def _recv_keys(self, keyids, keyserver=None): - """Import keys from a keyserver. - - :param str keyids: A space-delimited string containing the keyids to - request. - :param str keyserver: The keyserver to request the ``keyids`` from; - defaults to `gnupg.GPG.keyserver`. - """ - if not keyserver: - keyserver = self.keyserver - - args = ['--keyserver {0}'.format(keyserver), - '--recv-keys {0}'.format(keyids)] - log.info('Requesting keys from %s: %s' % (keyserver, keyids)) - - result = self._result_map['import'](self) - proc = self._open_subprocess(args) - self._collect_output(proc, result) - log.debug('recv_keys result: %r', result.__dict__) - return result -
-
[docs] def _sign_file(self, file, default_key=None, passphrase=None, - clearsign=True, detach=False, binary=False, - digest_algo='SHA512'): - """Create a signature for a file. - - :param file: The file stream (i.e. it's already been open()'d) to sign. - :param str default_key: The key to sign with. - :param str passphrase: The passphrase to pipe to stdin. - :param bool clearsign: If True, create a cleartext signature. - :param bool detach: If True, create a detached signature. - :param bool binary: If True, do not ascii armour the output. - :param str digest_algo: The hash digest to use. Again, to see which - hashes your GnuPG is capable of using, do: - ``$ gpg --with-colons --list-config - digestname``. The default, if unspecified, is - ``'SHA512'``. - """ - log.debug("_sign_file():") - if binary: - log.info("Creating binary signature for file %s" % file) - args = ['--sign'] - else: - log.info("Creating ascii-armoured signature for file %s" % file) - args = ['--sign --armor'] - - if clearsign: - args.append("--clearsign") - if detach: - log.warn("Cannot use both --clearsign and --detach-sign.") - log.warn("Using default GPG behaviour: --clearsign only.") - elif detach and not clearsign: - args.append("--detach-sign") - - if default_key: - args.append(str("--default-key %s" % default_key)) - - args.append(str("--digest-algo %s" % digest_algo)) - - ## We could use _handle_io here except for the fact that if the - ## passphrase is bad, gpg bails and you can't write the message. - result = self._result_map['sign'](self) - proc = self._open_subprocess(args, passphrase is not None) - try: - if passphrase: - _util._write_passphrase(proc.stdin, passphrase, self._encoding) - writer = _util._threaded_copy_data(file, proc.stdin) - except IOError as ioe: - log.exception("Error writing message: %s" % str(ioe)) - writer = None - self._collect_output(proc, result, writer, proc.stdin) - return result -
-
[docs] def _encrypt(self, data, recipients, - default_key=None, - passphrase=None, - armor=True, - encrypt=True, - symmetric=False, - always_trust=True, - output=None, - cipher_algo='AES256', - digest_algo='SHA512', - compress_algo='ZLIB'): - """Encrypt the message read from the file-like object **data**. - - :param str data: The file or bytestream to encrypt. - - :param str recipients: The recipients to encrypt to. Recipients must - be specified keyID/fingerprint. - - .. warning:: Care should be taken in Python2 to make sure that the - given fingerprints for **recipients** are in fact strings - and not unicode objects. - - :param str default_key: The keyID/fingerprint of the key to use for - signing. If given, **data** will be encrypted - *and* signed. - - :param str passphrase: If given, and **default_key** is also given, - use this passphrase to unlock the secret - portion of the **default_key** to sign the - encrypted **data**. Otherwise, if - **default_key** is not given, but **symmetric** - is ``True``, then use this passphrase as the - passphrase for symmetric encryption. Signing - and symmetric encryption should *not* be - combined when sending the **data** to other - recipients, else the passphrase to the secret - key would be shared with them. - - :param bool armor: If True, ascii armor the output; otherwise, the - output will be in binary format. (Default: True) - - :param bool encrypt: If True, encrypt the **data** using the - **recipients** public keys. (Default: True) - - :param bool symmetric: If True, encrypt the **data** to **recipients** - using a symmetric key. See the **passphrase** - parameter. Symmetric encryption and public key - encryption can be used simultaneously, and will - result in a ciphertext which is decryptable - with either the symmetric **passphrase** or one - of the corresponding private keys. - - :param bool always_trust: If True, ignore trust warnings on - **recipients** keys. If False, display trust - warnings. (default: True) - - :param str output: The output file to write to. If not specified, the - encrypted output is returned, and thus should be - stored as an object in Python. For example: - - - >>> import shutil - >>> import gnupg - >>> if os.path.exists("doctests"): - ... shutil.rmtree("doctests") - >>> gpg = gnupg.GPG(homedir="doctests") - >>> key_settings = gpg.gen_key_input(key_type='RSA', - ... key_length=1024, - ... key_usage='ESCA', - ... passphrase='foo') - >>> key = gpg.gen_key(key_settings) - >>> message = "The crow flies at midnight." - >>> encrypted = str(gpg.encrypt(message, key.printprint)) - >>> assert encrypted != message - >>> assert not encrypted.isspace() - >>> decrypted = str(gpg.decrypt(encrypted)) - >>> assert not decrypted.isspace() - >>> decrypted - 'The crow flies at midnight.' - - :param str cipher_algo: The cipher algorithm to use. To see available - algorithms with your version of GnuPG, do: - :command:`$ gpg --with-colons --list-config - ciphername`. The default **cipher_algo**, if - unspecified, is ``'AES256'``. - - :param str digest_algo: The hash digest to use. Again, to see which - hashes your GnuPG is capable of using, do: - :command:`$ gpg --with-colons --list-config - digestname`. The default, if unspecified, is - ``'SHA512'``. - - :param str compress_algo: The compression algorithm to use. Can be one - of ``'ZLIB'``, ``'BZIP2'``, ``'ZIP'``, or - ``'Uncompressed'``. - """ - args = [] - - if output: - if getattr(output, 'fileno', None) is not None: - ## avoid overwrite confirmation message - if getattr(output, 'name', None) is None: - if os.path.exists(output): - os.remove(output) - args.append('--output %s' % output) - else: - if os.path.exists(output.name): - os.remove(output.name) - args.append('--output %s' % output.name) - - if armor: args.append('--armor') - if always_trust: args.append('--always-trust') - if cipher_algo: args.append('--cipher-algo %s' % cipher_algo) - if compress_algo: args.append('--compress-algo %s' % compress_algo) - - if default_key: - args.append('--sign') - args.append('--default-key %s' % default_key) - if digest_algo: - args.append('--digest-algo %s' % digest_algo) - - ## both can be used at the same time for an encrypted file which - ## is decryptable with a passphrase or secretkey. - if symmetric: args.append('--symmetric') - if encrypt: args.append('--encrypt') - - if len(recipients) >= 1: - log.debug("GPG.encrypt() called for recipients '%s' with type '%s'" - % (recipients, type(recipients))) - - if isinstance(recipients, (list, tuple)): - for recp in recipients: - if not _util._py3k: - if isinstance(recp, unicode): - try: - assert _parsers._is_hex(str(recp)) - except AssertionError: - log.info("Can't accept recipient string: %s" - % recp) - else: - args.append('--recipient %s' % str(recp)) - continue - ## will give unicode in 2.x as '\uXXXX\uXXXX' - args.append('--recipient %r' % recp) - continue - if isinstance(recp, str): - args.append('--recipient %s' % recp) - - elif (not _util._py3k) and isinstance(recp, basestring): - for recp in recipients.split('\x20'): - args.append('--recipient %s' % recp) - - elif _util._py3k and isinstance(recp, str): - for recp in recipients.split(' '): - args.append('--recipient %s' % recp) - ## ...and now that we've proven py3k is better... - - else: - log.debug("Don't know what to do with recipients: '%s'" - % recipients) - - result = self._result_map['crypt'](self) - log.debug("Got data '%s' with type '%s'." - % (data, type(data))) - self._handle_io(args, data, result, - passphrase=passphrase, binary=True) - log.debug("\n%s" % result.data) - return result
-
- -
-
-
-
- -
-
-
- - - - - \ No newline at end of file diff --git a/docs/_build/html/_modules/gnupg/_parsers.html b/docs/_build/html/_modules/gnupg/_parsers.html deleted file mode 100644 index a203f11..0000000 --- a/docs/_build/html/_modules/gnupg/_parsers.html +++ /dev/null @@ -1,1486 +0,0 @@ - - - - - - - - gnupg._parsers — gnupg unknown documentation - - - - - - - - - - - - -
- -
- -
-
-
- -
-
-
- -

Source code for gnupg._parsers

-# -*- coding: utf-8 -*-
-#
-# This file is part of python-gnupg, a Python interface to GnuPG.
-# Copyright © 2013 Isis Lovecruft, <isis@leap.se> 0xA3ADB67A2CDB8B35
-#           © 2013 Andrej B.
-#           © 2013 LEAP Encryption Access Project
-#           © 2008-2012 Vinay Sajip
-#           © 2005 Steve Traugott
-#           © 2004 A.M. Kuchling
-#
-# This program is free software: you can redistribute it and/or modify it
-# under the terms of the GNU General Public License as published by the Free
-# Software Foundation, either version 3 of the License, or (at your option)
-# any later version.
-#
-# This program is distributed in the hope that it will be useful, but WITHOUT
-# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
-# FITNESS FOR A PARTICULAR PURPOSE. See the included LICENSE file for details.
-
-'''Classes for parsing GnuPG status messages and sanitising commandline
-options.
-'''
-
-from __future__ import absolute_import
-from __future__ import print_function
-
-try:
-    from collections import OrderedDict
-except ImportError:
-    from ordereddict import OrderedDict
-
-import re
-
-from .      import _util
-from ._util import log
-
-
-ESCAPE_PATTERN = re.compile(r'\\x([0-9a-f][0-9a-f])', re.I)
-HEXIDECIMAL    = re.compile('([0-9A-Fa-f]{2})+')
-
-
-
[docs]class ProtectedOption(Exception): - """Raised when the option passed to GPG is disallowed.""" -
-
[docs]class UsageError(Exception): - """Raised when incorrect usage of the API occurs..""" - -
-
[docs]def _check_keyserver(location): - """Check that a given keyserver is a known protocol and does not contain - shell escape characters. - - :param str location: A string containing the default keyserver. This - should contain the desired keyserver protocol which - is supported by the keyserver, for example, the - default is ``'hkp://wwwkeys .pgp.net'``. - :rtype: :obj:`str` or :obj:`None` - :returns: A string specifying the protocol and keyserver hostname, if the - checks passed. If not, returns None. - """ - protocols = ['hkp://', 'hkps://', 'http://', 'https://', 'ldap://', - 'mailto:'] ## xxx feels like i´m forgetting one... - for proto in protocols: - if location.startswith(proto): - url = location.replace(proto, str()) - host, slash, extra = url.partition('/') - if extra: log.warn("URI text for %s: '%s'" % (host, extra)) - log.debug("Got host string for keyserver setting: '%s'" % host) - - host = _fix_unsafe(host) - if host: - log.debug("Cleaned host string: '%s'" % host) - keyserver = proto + host - return keyserver - return None -
-
[docs]def _check_preferences(prefs, pref_type=None): - """Check cipher, digest, and compression preference settings. - - MD5 is not allowed. This is `not 1994`__. SHA1 is allowed_ grudgingly_. - - __ http://www.cs.colorado.edu/~jrblack/papers/md5e-full.pdf - .. _allowed: http://eprint.iacr.org/2008/469.pdf - .. _grudgingly: https://www.schneier.com/blog/archives/2012/10/when_will_we_se.html - """ - if prefs is None: return - - cipher = frozenset(['AES256', 'AES192', 'AES128', - 'CAMELLIA256', 'CAMELLIA192', - 'TWOFISH', '3DES']) - digest = frozenset(['SHA512', 'SHA384', 'SHA256', 'SHA224', 'RMD160', - 'SHA1']) - compress = frozenset(['BZIP2', 'ZLIB', 'ZIP', 'Uncompressed']) - all = frozenset([cipher, digest, compress]) - - if isinstance(prefs, str): - prefs = set(prefs.split()) - elif isinstance(prefs, list): - prefs = set(prefs) - else: - msg = "prefs must be list of strings, or space-separated string" - log.error("parsers._check_preferences(): %s" % message) - raise TypeError(message) - - if not pref_type: - pref_type = 'all' - - allowed = str() - - if pref_type == 'cipher': - allowed += ' '.join(prefs.intersection(cipher)) - if pref_type == 'digest': - allowed += ' '.join(prefs.intersection(digest)) - if pref_type == 'compress': - allowed += ' '.join(prefs.intersection(compress)) - if pref_type == 'all': - allowed += ' '.join(prefs.intersection(all)) - - return allowed -
-
[docs]def _fix_unsafe(shell_input): - """Find characters used to escape from a string into a shell, and wrap them in - quotes if they exist. Regex pilfered from Python3 :mod:`shlex` module. - - :param str shell_input: The input intended for the GnuPG process. - """ - _unsafe = re.compile(r'[^\w@%+=:,./-]', 256) - try: - if len(_unsafe.findall(shell_input)) == 0: - return shell_input.strip() - else: - clean = "'" + shell_input.replace("'", "'\"'\"'") + "'" - return clean - except TypeError: - return None -
-
[docs]def _hyphenate(input, add_prefix=False): - """Change underscores to hyphens so that object attributes can be easily - tranlated to GPG option names. - - :param str input: The attribute to hyphenate. - :param bool add_prefix: If True, add leading hyphens to the input. - :rtype: str - :return: The ``input`` with underscores changed to hyphens. - """ - ret = '--' if add_prefix else '' - ret += input.replace('_', '-') - return ret -
-
[docs]def _is_allowed(input): - """Check that an option or argument given to GPG is in the set of allowed - options, the latter being a strict subset of the set of all options known - to GPG. - - :param str input: An input meant to be parsed as an option or flag to the - GnuPG process. Should be formatted the same as an option - or flag to the commandline gpg, i.e. "--encrypt-files". - - :ivar frozenset gnupg_options: All known GPG options and flags. - - :ivar frozenset allowed: All allowed GPG options and flags, e.g. all GPG - options and flags which we are willing to - acknowledge and parse. If we want to support a - new option, it will need to have its own parsing - class and its name will need to be added to this - set. - - :raises: :exc:`UsageError` if **input** is not a subset of the hard-coded - set of all GnuPG options in :func:`_get_all_gnupg_options`. - - :exc:`ProtectedOption` if **input** is not in the set of allowed - options. - - :rtype: str - :return: The original **input** parameter, unmodified and unsanitized, if - no errors occur. - """ - gnupg_options = _get_all_gnupg_options() - allowed = _get_options_group("allowed") - - ## these are the allowed options we will handle so far, all others should - ## be dropped. this dance is so that when new options are added later, we - ## merely add the to the _allowed list, and the `` _allowed.issubset`` - ## assertion will check that GPG will recognise them - try: - ## check that allowed is a subset of all gnupg_options - assert allowed.issubset(gnupg_options) - except AssertionError: - raise UsageError("'allowed' isn't a subset of known options, diff: %s" - % allowed.difference(gnupg_options)) - - ## if we got a list of args, join them - ## - ## see TODO file, tag :cleanup: - if not isinstance(input, str): - input = ' '.join([x for x in input]) - - if isinstance(input, str): - if input.find('_') > 0: - if not input.startswith('--'): - hyphenated = _hyphenate(input, add_prefix=True) - else: - hyphenated = _hyphenate(input) - else: - hyphenated = input - ## xxx we probably want to use itertools.dropwhile here - try: - assert hyphenated in allowed - except AssertionError as ae: - dropped = _fix_unsafe(hyphenated) - log.warn("_is_allowed(): Dropping option '%s'..." % dropped) - raise ProtectedOption("Option '%s' not supported." % dropped) - else: - return input - return None -
-
[docs]def _is_hex(string): - """Check that a string is hexidecimal, with alphabetic characters - capitalized and without whitespace. - - :param str string: The string to check. - """ - matched = HEXIDECIMAL.match(string) - if matched is not None and len(matched.group()) >= 2: - return True - return False -
-
[docs]def _is_string(thing): - """Python character arrays are a mess. - - If Python2, check if **thing** is an :obj:`unicode` or a :obj:`str`. - If Python3, check if **thing** is a :obj:`str`. - - :param thing: The thing to check. - :returns: ``True`` if **thing** is a string according to whichever version - of Python we're running in. - """ - if _util._py3k: return isinstance(thing, str) - else: return isinstance(thing, basestring) -
-
[docs]def _sanitise(*args): - """Take an arg or the key portion of a kwarg and check that it is in the - set of allowed GPG options and flags, and that it has the correct - type. Then, attempt to escape any unsafe characters. If an option is not - allowed, drop it with a logged warning. Returns a dictionary of all - sanitised, allowed options. - - Each new option that we support that is not a boolean, but instead has - some additional inputs following it, i.e. "--encrypt-file foo.txt", will - need some basic safety checks added here. - - GnuPG has three-hundred and eighteen commandline flags. Also, not all - implementations of OpenPGP parse PGP packets and headers in the same way, - so there is added potential there for messing with calls to GPG. - - For information on the PGP message format specification, see - :rfc:`1991`. - - If you're asking, "Is this *really* necessary?": No, not really -- we could - just follow the security precautions recommended by `this xkcd`__. - - __ https://xkcd.com/1181/ - - :param str args: (optional) The boolean arguments which will be passed to - the GnuPG process. - :rtype: str - :returns: ``sanitised`` - """ - - ## see TODO file, tag :cleanup:sanitise: - - def _check_option(arg, value): - """Check that a single ``arg`` is an allowed option. - - If it is allowed, quote out any escape characters in ``value``, and - add the pair to :ivar:`sanitised`. Otherwise, drop them. - - :param str arg: The arguments which will be passed to the GnuPG - process, and, optionally their corresponding values. - The values are any additional arguments following the - GnuPG option or flag. For example, if we wanted to - pass ``"--encrypt --recipient isis@leap.se"`` to - GnuPG, then ``"--encrypt"`` would be an arg without a - value, and ``"--recipient"`` would also be an arg, - with a value of ``"isis@leap.se"``. - - :ivar list checked: The sanitised, allowed options and values. - :rtype: str - :returns: A string of the items in ``checked``, delimited by spaces. - """ - checked = str() - none_options = _get_options_group("none_options") - hex_options = _get_options_group("hex_options") - hex_or_none_options = _get_options_group("hex_or_none_options") - - if not _util._py3k: - if not isinstance(arg, list) and isinstance(arg, unicode): - arg = str(arg) - - try: - flag = _is_allowed(arg) - assert flag is not None, "_check_option(): got None for flag" - except (AssertionError, ProtectedOption) as error: - log.warn("_check_option(): %s" % str(error)) - else: - checked += (flag + ' ') - - if _is_string(value): - values = value.split(' ') - for v in values: - ## these can be handled separately, without _fix_unsafe(), - ## because they are only allowed if they pass the regex - if (flag in none_options) and (v is None): - continue - - if flag in hex_options: - if _is_hex(v): checked += (v + " ") - else: - log.debug("'%s %s' not hex." % (flag, v)) - if (flag in hex_or_none_options) and (v is None): - log.debug("Allowing '%s' for all keys" % flag) - continue - - elif flag in ['--keyserver']: - host = _check_keyserver(v) - if host: - log.debug("Setting keyserver: %s" % host) - checked += (v + " ") - else: log.debug("Dropping keyserver: %s" % v) - continue - - ## the rest are strings, filenames, etc, and should be - ## shell escaped: - val = _fix_unsafe(v) - try: - assert not val is None - assert not val.isspace() - assert not v is None - assert not v.isspace() - except: - log.debug("Dropping %s %s" % (flag, v)) - continue - - if flag in ['--encrypt', '--encrypt-files', '--decrypt', - '--decrypt-files', '--import', '--verify']: - if ( (_util._is_file(val)) - or - ((flag == '--verify') and (val == '-')) ): - checked += (val + " ") - else: - log.debug("%s not file: %s" % (flag, val)) - - elif flag in ['--cipher-algo', '--personal-cipher-prefs', - '--personal-cipher-preferences']: - legit_algos = _check_preferences(val, 'cipher') - if legit_algos: checked += (legit_algos + " ") - else: log.debug("'%s' is not cipher" % val) - - elif flag in ['--compress-algo', '--compression-algo', - '--personal-compress-prefs', - '--personal-compress-preferences']: - legit_algos = _check_preferences(val, 'compress') - if legit_algos: checked += (legit_algos + " ") - else: log.debug("'%s' not compress algo" % val) - - else: - checked += (val + " ") - log.debug("_check_option(): No checks for %s" % val) - - return checked - - is_flag = lambda x: x.startswith('--') - - def _make_filo(args_string): - filo = arg.split(' ') - filo.reverse() - log.debug("_make_filo(): Converted to reverse list: %s" % filo) - return filo - - def _make_groups(filo): - groups = {} - while len(filo) >= 1: - last = filo.pop() - if is_flag(last): - log.debug("Got arg: %s" % last) - if last == '--verify': - groups[last] = str(filo.pop()) - ## accept the read-from-stdin arg: - if len(filo) >= 1 and filo[len(filo)-1] == '-': - groups[last] += str(' - ') ## gross hack - filo.pop() - else: - groups[last] = str() - while len(filo) > 1 and not is_flag(filo[len(filo)-1]): - log.debug("Got value: %s" % filo[len(filo)-1]) - groups[last] += (filo.pop() + " ") - else: - if len(filo) == 1 and not is_flag(filo[0]): - log.debug("Got value: %s" % filo[0]) - groups[last] += filo.pop() - else: - log.warn("_make_groups(): Got solitary value: %s" % last) - groups["xxx"] = last - return groups - - def _check_groups(groups): - log.debug("Got groups: %s" % groups) - checked_groups = [] - for a,v in groups.items(): - v = None if len(v) == 0 else v - safe = _check_option(a, v) - if safe is not None and not safe.strip() == "": - log.debug("Appending option: %s" % safe) - checked_groups.append(safe) - else: - log.warn("Dropped option: '%s %s'" % (a,v)) - return checked_groups - - if args is not None: - option_groups = {} - for arg in args: - ## if we're given a string with a bunch of options in it split - ## them up and deal with them separately - if (not _util._py3k and isinstance(arg, basestring)) \ - or (_util._py3k and isinstance(arg, str)): - log.debug("Got arg string: %s" % arg) - if arg.find(' ') > 0: - filo = _make_filo(arg) - option_groups.update(_make_groups(filo)) - else: - option_groups.update({ arg: "" }) - elif isinstance(arg, list): - log.debug("Got arg list: %s" % arg) - arg.reverse() - option_groups.update(_make_groups(arg)) - else: - log.warn("Got non-str/list arg: '%s', type '%s'" - % (arg, type(arg))) - checked = _check_groups(option_groups) - sanitised = ' '.join(x for x in checked) - return sanitised - else: - log.debug("Got None for args") -
-
[docs]def _sanitise_list(arg_list): - """A generator for iterating through a list of gpg options and sanitising - them. - - :param list arg_list: A list of options and flags for GnuPG. - :rtype: generator - :returns: A generator whose next() method returns each of the items in - ``arg_list`` after calling ``_sanitise()`` with that item as a - parameter. - """ - if isinstance(arg_list, list): - for arg in arg_list: - safe_arg = _sanitise(arg) - if safe_arg != "": - yield safe_arg -
-
[docs]def _get_options_group(group=None): - """Get a specific group of options which are allowed.""" - - #: These expect a hexidecimal keyid as their argument, and can be parsed - #: with :func:`_is_hex`. - hex_options = frozenset(['--check-sigs', - '--default-key', - '--default-recipient', - '--delete-keys', - '--delete-secret-keys', - '--delete-secret-and-public-keys', - '--desig-revoke', - '--export', - '--export-secret-keys', - '--export-secret-subkeys', - '--fingerprint', - '--gen-revoke', - '--list-key', - '--list-keys', - '--list-public-keys', - '--list-secret-keys', - '--list-sigs', - '--recipient', - '--recv-keys', - '--send-keys', - ]) - #: These options expect value which are left unchecked, though still run - #: through :func:`_fix_unsafe`. - unchecked_options = frozenset(['--list-options', - '--passphrase-fd', - '--status-fd', - '--verify-options', - ]) - #: These have their own parsers and don't really fit into a group - other_options = frozenset(['--debug-level', - '--keyserver', - - ]) - #: These should have a directory for an argument - dir_options = frozenset(['--homedir', - ]) - #: These expect a keyring or keyfile as their argument - keyring_options = frozenset(['--keyring', - '--primary-keyring', - '--secret-keyring', - '--trustdb-name', - ]) - #: These expect a filename (or the contents of a file as a string) or None - #: (meaning that they read from stdin) - file_or_none_options = frozenset(['--decrypt', - '--decrypt-files', - '--encrypt', - '--encrypt-files', - '--import', - '--verify', - '--verify-files', - ]) - #: These options expect a string. see :func:`_check_preferences`. - pref_options = frozenset(['--digest-algo', - '--cipher-algo', - '--compress-algo', - '--compression-algo', - '--cert-digest-algo', - '--personal-digest-prefs', - '--personal-digest-preferences', - '--personal-cipher-prefs', - '--personal-cipher-preferences', - '--personal-compress-prefs', - '--personal-compress-preferences', - '--print-md', - ]) - #: These options expect no arguments - none_options = frozenset(['--always-trust', - '--armor', - '--armour', - '--batch', - '--check-sigs', - '--check-trustdb', - '--clearsign', - '--debug-all', - '--default-recipient-self', - '--detach-sign', - '--export', - '--export-ownertrust', - '--export-secret-keys', - '--export-secret-subkeys', - '--fingerprint', - '--fixed-list-mode', - '--gen-key', - '--import-ownertrust', - '--list-config', - '--list-key', - '--list-keys', - '--list-packets', - '--list-public-keys', - '--list-secret-keys', - '--list-sigs', - '--no-default-keyring', - '--no-default-recipient', - '--no-emit-version', - '--no-options', - '--no-tty', - '--no-use-agent', - '--no-verbose', - '--print-mds', - '--quiet', - '--sign', - '--symmetric', - '--use-agent', - '--verbose', - '--version', - '--with-colons', - '--yes', - ]) - #: These options expect either None or a hex string - hex_or_none_options = hex_options.intersection(none_options) - allowed = hex_options.union(unchecked_options, other_options, dir_options, - keyring_options, file_or_none_options, - pref_options, none_options) - - if group and group in locals().keys(): - return locals()[group] -
-
[docs]def _get_all_gnupg_options(): - """Get all GnuPG options and flags. - - This is hardcoded within a local scope to reduce the chance of a tampered - GnuPG binary reporting falsified option sets, i.e. because certain options - (namedly the ``--no-options`` option, which prevents the usage of gpg.conf - files) are necessary and statically specified in - :meth:`gnupg._meta.GPGBase._make_args`, if the inputs into Python are - already controlled, and we were to summon the GnuPG binary to ask it for - its options, it would be possible to receive a falsified options set - missing the ``--no-options`` option in response. This seems unlikely, and - the method is stupid and ugly, but at least we'll never have to debug - whether or not an option *actually* disappeared in a different GnuPG - version, or some funny business is happening. - - These are the options as of GnuPG 1.4.12; the current stable branch of the - 2.1.x tree contains a few more -- if you need them you'll have to add them - in here. - - :type gnupg_options: frozenset - :ivar gnupg_options: All known GPG options and flags. - :rtype: frozenset - :returns: ``gnupg_options`` - """ - three_hundred_eighteen = (""" ---allow-freeform-uid --multifile ---allow-multiple-messages --no ---allow-multisig-verification --no-allow-freeform-uid ---allow-non-selfsigned-uid --no-allow-multiple-messages ---allow-secret-key-import --no-allow-non-selfsigned-uid ---always-trust --no-armor ---armor --no-armour ---armour --no-ask-cert-expire ---ask-cert-expire --no-ask-cert-level ---ask-cert-level --no-ask-sig-expire ---ask-sig-expire --no-auto-check-trustdb ---attribute-fd --no-auto-key-locate ---attribute-file --no-auto-key-retrieve ---auto-check-trustdb --no-batch ---auto-key-locate --no-comments ---auto-key-retrieve --no-default-keyring ---batch --no-default-recipient ---bzip2-compress-level --no-disable-mdc ---bzip2-decompress-lowmem --no-emit-version ---card-edit --no-encrypt-to ---card-status --no-escape-from-lines ---cert-digest-algo --no-expensive-trust-checks ---cert-notation --no-expert ---cert-policy-url --no-force-mdc ---change-pin --no-force-v3-sigs ---charset --no-force-v4-certs ---check-sig --no-for-your-eyes-only ---check-sigs --no-greeting ---check-trustdb --no-groups ---cipher-algo --no-literal ---clearsign --no-mangle-dos-filenames ---command-fd --no-mdc-warning ---command-file --no-options ---comment --no-permission-warning ---completes-needed --no-pgp2 ---compress-algo --no-pgp6 ---compression-algo --no-pgp7 ---compress-keys --no-pgp8 ---compress-level --no-random-seed-file ---compress-sigs --no-require-backsigs ---ctapi-driver --no-require-cross-certification ---dearmor --no-require-secmem ---dearmour --no-rfc2440-text ---debug --no-secmem-warning ---debug-all --no-show-notation ---debug-ccid-driver --no-show-photos ---debug-level --no-show-policy-url ---decrypt --no-sig-cache ---decrypt-files --no-sig-create-check ---default-cert-check-level --no-sk-comments ---default-cert-expire --no-strict ---default-cert-level --notation-data ---default-comment --not-dash-escaped ---default-key --no-textmode ---default-keyserver-url --no-throw-keyid ---default-preference-list --no-throw-keyids ---default-recipient --no-tty ---default-recipient-self --no-use-agent ---default-sig-expire --no-use-embedded-filename ---delete-keys --no-utf8-strings ---delete-secret-and-public-keys --no-verbose ---delete-secret-keys --no-version ---desig-revoke --openpgp ---detach-sign --options ---digest-algo --output ---disable-ccid --override-session-key ---disable-cipher-algo --passphrase ---disable-dsa2 --passphrase-fd ---disable-mdc --passphrase-file ---disable-pubkey-algo --passphrase-repeat ---display --pcsc-driver ---display-charset --personal-cipher-preferences ---dry-run --personal-cipher-prefs ---dump-options --personal-compress-preferences ---edit-key --personal-compress-prefs ---emit-version --personal-digest-preferences ---enable-dsa2 --personal-digest-prefs ---enable-progress-filter --pgp2 ---enable-special-filenames --pgp6 ---enarmor --pgp7 ---enarmour --pgp8 ---encrypt --photo-viewer ---encrypt-files --pipemode ---encrypt-to --preserve-permissions ---escape-from-lines --primary-keyring ---exec-path --print-md ---exit-on-status-write-error --print-mds ---expert --quick-random ---export --quiet ---export-options --reader-port ---export-ownertrust --rebuild-keydb-caches ---export-secret-keys --recipient ---export-secret-subkeys --recv-keys ---fast-import --refresh-keys ---fast-list-mode --remote-user ---fetch-keys --require-backsigs ---fingerprint --require-cross-certification ---fixed-list-mode --require-secmem ---fix-trustdb --rfc1991 ---force-mdc --rfc2440 ---force-ownertrust --rfc2440-text ---force-v3-sigs --rfc4880 ---force-v4-certs --run-as-shm-coprocess ---for-your-eyes-only --s2k-cipher-algo ---gen-key --s2k-count ---gen-prime --s2k-digest-algo ---gen-random --s2k-mode ---gen-revoke --search-keys ---gnupg --secret-keyring ---gpg-agent-info --send-keys ---gpgconf-list --set-filename ---gpgconf-test --set-filesize ---group --set-notation ---help --set-policy-url ---hidden-encrypt-to --show-keyring ---hidden-recipient --show-notation ---homedir --show-photos ---honor-http-proxy --show-policy-url ---ignore-crc-error --show-session-key ---ignore-mdc-error --sig-keyserver-url ---ignore-time-conflict --sign ---ignore-valid-from --sign-key ---import --sig-notation ---import-options --sign-with ---import-ownertrust --sig-policy-url ---interactive --simple-sk-checksum ---keyid-format --sk-comments ---keyring --skip-verify ---keyserver --status-fd ---keyserver-options --status-file ---lc-ctype --store ---lc-messages --strict ---limit-card-insert-tries --symmetric ---list-config --temp-directory ---list-key --textmode ---list-keys --throw-keyid ---list-only --throw-keyids ---list-options --trustdb-name ---list-ownertrust --trusted-key ---list-packets --trust-model ---list-public-keys --try-all-secrets ---list-secret-keys --ttyname ---list-sig --ttytype ---list-sigs --ungroup ---list-trustdb --update-trustdb ---load-extension --use-agent ---local-user --use-embedded-filename ---lock-multiple --user ---lock-never --utf8-strings ---lock-once --verbose ---logger-fd --verify ---logger-file --verify-files ---lsign-key --verify-options ---mangle-dos-filenames --version ---marginals-needed --warranty ---max-cert-depth --with-colons ---max-output --with-fingerprint ---merge-only --with-key-data ---min-cert-level --yes -""").split() - - # These are extra options which only exist for GnuPG>=2.0.0 - three_hundred_eighteen.append('--export-ownertrust') - three_hundred_eighteen.append('--import-ownertrust') - - gnupg_options = frozenset(three_hundred_eighteen) - return gnupg_options -
-
[docs]def nodata(status_code): - """Translate NODATA status codes from GnuPG to messages.""" - lookup = { - '1': 'No armored data.', - '2': 'Expected a packet but did not find one.', - '3': 'Invalid packet found, this may indicate a non OpenPGP message.', - '4': 'Signature expected but not found.' } - for key, value in lookup.items(): - if str(status_code) == key: - return value -
-
[docs]def progress(status_code): - """Translate PROGRESS status codes from GnuPG to messages.""" - lookup = { - 'pk_dsa': 'DSA key generation', - 'pk_elg': 'Elgamal key generation', - 'primegen': 'Prime generation', - 'need_entropy': 'Waiting for new entropy in the RNG', - 'tick': 'Generic tick without any special meaning - still working.', - 'starting_agent': 'A gpg-agent was started.', - 'learncard': 'gpg-agent or gpgsm is learning the smartcard data.', - 'card_busy': 'A smartcard is still working.' } - for key, value in lookup.items(): - if str(status_code) == key: - return value - -
-
[docs]class GenKey(object): - """Handle status messages for key generation. - - Calling the ``__str__()`` method of this class will return the generated - key's fingerprint, or a status string explaining the results. - """ - def __init__(self, gpg): - self._gpg = gpg - ## this should get changed to something more useful, like 'key_type' - #: 'P':= primary, 'S':= subkey, 'B':= both - self.type = None - self.fingerprint = None - self.status = None - self.subkey_created = False - self.primary_created = False - #: This will store the key's public keyring filename, if - #: :meth:`~gnupg.GPG.gen_key_input` was called with - #: ``separate_keyring=True``. - self.keyring = None - #: This will store the key's secret keyring filename, if : - #: :meth:`~gnupg.GPG.gen_key_input` was called with - #: ``separate_keyring=True``. - self.secring = None - - def __nonzero__(self): - if self.fingerprint: return True - return False - __bool__ = __nonzero__ - - def __str__(self): - if self.fingerprint: - return self.fingerprint - else: - if self.status is not None: - return self.status - else: - return False - -
[docs] def _handle_status(self, key, value): - """Parse a status code from the attached GnuPG process. - - :raises: :exc:`~exceptions.ValueError` if the status message is unknown. - """ - if key in ("GOOD_PASSPHRASE"): - pass - elif key == "KEY_NOT_CREATED": - self.status = 'key not created' - elif key == "KEY_CREATED": - (self.type, self.fingerprint) = value.split() - self.status = 'key created' - elif key == "NODATA": - self.status = nodata(value) - elif key == "PROGRESS": - self.status = progress(value.split(' ', 1)[0]) - else: - raise ValueError("Unknown status message: %r" % key) - - if self.type in ('B', 'P'): - self.primary_created = True - if self.type in ('B', 'S'): - self.subkey_created = True -
-
[docs]class DeleteResult(object): - """Handle status messages for --delete-keys and --delete-secret-keys""" - def __init__(self, gpg): - self._gpg = gpg - self.status = 'ok' - - def __str__(self): - return self.status - - problem_reason = { '1': 'No such key', - '2': 'Must delete secret key first', - '3': 'Ambigious specification', } - -
[docs] def _handle_status(self, key, value): - """Parse a status code from the attached GnuPG process. - - :raises: :exc:`~exceptions.ValueError` if the status message is unknown. - """ - if key == "DELETE_PROBLEM": - self.status = self.problem_reason.get(value, "Unknown error: %r" - % value) - else: - raise ValueError("Unknown status message: %r" % key) -
-
[docs]class Sign(object): - """Parse GnuPG status messages for signing operations. - - :param gpg: An instance of :class:`gnupg.GPG`. - """ - - #: The type of signature created. - sig_type = None - #: The algorithm used to create the signature. - sig_algo = None - #: The hash algorithm used to create the signature. - sig_hash_also = None - #: The fingerprint of the signing keyid. - fingerprint = None - #: The timestamp on the signature. - timestamp = None - #: xxx fill me in - what = None - - def __init__(self, gpg): - self._gpg = gpg - - def __nonzero__(self): - """Override the determination for truthfulness evaluation. - - :rtype: bool - :returns: True if we have a valid signature, False otherwise. - """ - return self.fingerprint is not None - __bool__ = __nonzero__ - - def __str__(self): - return self.data.decode(self._gpg._encoding, self._gpg._decode_errors) - -
[docs] def _handle_status(self, key, value): - """Parse a status code from the attached GnuPG process. - - :raises: :exc:`~exceptions.ValueError` if the status message is unknown. - """ - if key in ("USERID_HINT", "NEED_PASSPHRASE", "BAD_PASSPHRASE", - "GOOD_PASSPHRASE", "BEGIN_SIGNING", "CARDCTRL", - "INV_SGNR", "SIGEXPIRED"): - pass - elif key == "SIG_CREATED": - (self.sig_type, self.sig_algo, self.sig_hash_algo, - self.what, self.timestamp, self.fingerprint) = value.split() - elif key == "KEYEXPIRED": - self.status = "skipped signing key, key expired" - if (value is not None) and (len(value) > 0): - self.status += " on {}".format(str(value)) - elif key == "KEYREVOKED": - self.status = "skipped signing key, key revoked" - if (value is not None) and (len(value) > 0): - self.status += " on {}".format(str(value)) - elif key == "NODATA": - self.status = nodata(value) - else: - raise ValueError("Unknown status message: %r" % key) -
-
[docs]class ListKeys(list): - """Handle status messages for --list-keys. - - Handles pub and uid (relating the latter to the former). Don't care about - the following attributes/status messages (from doc/DETAILS): - - | crt = X.509 certificate - | crs = X.509 certificate and private key available - | ssb = secret subkey (secondary key) - | uat = user attribute (same as user id except for field 10). - | sig = signature - | rev = revocation signature - | pkd = public key data (special field format, see below) - | grp = reserved for gpgsm - | rvk = revocation key - """ - - def __init__(self, gpg): - super(ListKeys, self).__init__() - self._gpg = gpg - self.curkey = None - self.fingerprints = [] - self.uids = [] - -
[docs] def key(self, args): - vars = (""" - type trust length algo keyid date expires dummy ownertrust uid - """).split() - self.curkey = {} - for i in range(len(vars)): - self.curkey[vars[i]] = args[i] - self.curkey['uids'] = [] - if self.curkey['uid']: - self.curkey['uids'].append(self.curkey['uid']) - del self.curkey['uid'] - self.curkey['subkeys'] = [] - self.append(self.curkey) -
- pub = sec = key - -
[docs] def fpr(self, args): - self.curkey['fingerprint'] = args[9] - self.fingerprints.append(args[9]) -
-
[docs] def uid(self, args): - uid = args[9] - uid = ESCAPE_PATTERN.sub(lambda m: chr(int(m.group(1), 16)), uid) - self.curkey['uids'].append(uid) - self.uids.append(uid) -
-
[docs] def sub(self, args): - subkey = [args[4], args[11]] - self.curkey['subkeys'].append(subkey) -
-
[docs] def _handle_status(self, key, value): - pass - -
-
[docs]class ImportResult(object): - """Parse GnuPG status messages for key import operations. - - :type gpg: :class:`gnupg.GPG` - :param gpg: An instance of :class:`gnupg.GPG`. - """ - _ok_reason = {'0': 'Not actually changed', - '1': 'Entirely new key', - '2': 'New user IDs', - '4': 'New signatures', - '8': 'New subkeys', - '16': 'Contains private key', - '17': 'Contains private key',} - - _problem_reason = { '0': 'No specific reason given', - '1': 'Invalid Certificate', - '2': 'Issuer Certificate missing', - '3': 'Certificate Chain too long', - '4': 'Error storing certificate', } - - _fields = '''count no_user_id imported imported_rsa unchanged - n_uids n_subk n_sigs n_revoc sec_read sec_imported sec_dups - not_imported'''.split() - _counts = OrderedDict( - zip(_fields, [int(0) for x in range(len(_fields))]) ) - - #: A list of strings containing the fingerprints of the GnuPG keyIDs - #: imported. - fingerprints = list() - - #: A list containing dictionaries with information gathered on keys - #: imported. - results = list() - - def __init__(self, gpg): - self._gpg = gpg - self.counts = self._counts - - def __nonzero__(self): - """Override the determination for truthfulness evaluation. - - :rtype: bool - :returns: True if we have immport some keys, False otherwise. - """ - if self.counts.not_imported > 0: return False - if len(self.fingerprints) == 0: return False - return True - __bool__ = __nonzero__ - -
[docs] def _handle_status(self, key, value): - """Parse a status code from the attached GnuPG process. - - :raises: :exc:`~exceptions.ValueError` if the status message is unknown. - """ - if key == "IMPORTED": - # this duplicates info we already see in import_ok & import_problem - pass - elif key == "NODATA": - self.results.append({'fingerprint': None, - 'status': 'No valid data found'}) - elif key == "IMPORT_OK": - reason, fingerprint = value.split() - reasons = [] - for code, text in self._ok_reason.items(): - if int(reason) == int(code): - reasons.append(text) - reasontext = '\n'.join(reasons) + "\n" - self.results.append({'fingerprint': fingerprint, - 'status': reasontext}) - self.fingerprints.append(fingerprint) - elif key == "IMPORT_PROBLEM": - try: - reason, fingerprint = value.split() - except: - reason = value - fingerprint = '<unknown>' - self.results.append({'fingerprint': fingerprint, - 'status': self._problem_reason[reason]}) - elif key == "IMPORT_RES": - import_res = value.split() - for x in self.counts.keys(): - self.counts[x] = int(import_res.pop(0)) - elif key == "KEYEXPIRED": - res = {'fingerprint': None, - 'status': 'Key expired'} - self.results.append(res) - ## Accoring to docs/DETAILS L859, SIGEXPIRED is obsolete: - ## "Removed on 2011-02-04. This is deprecated in favor of KEYEXPIRED." - elif key == "SIGEXPIRED": - res = {'fingerprint': None, - 'status': 'Signature expired'} - self.results.append(res) - else: - raise ValueError("Unknown status message: %r" % key) -
-
[docs] def summary(self): - l = [] - l.append('%d imported' % self.counts['imported']) - if self.counts['not_imported']: - l.append('%d not imported' % self.counts['not_imported']) - return ', '.join(l) - -
-
[docs]class Verify(object): - """Parser for status messages from GnuPG for certifications and signature - verifications. - - People often mix these up, or think that they are the same thing. While it - is true that certifications and signatures *are* the same cryptographic - operation -- and also true that both are the same as the decryption - operation -- a distinction is made for important reasons. - - A certification: - * is made on a key, - * can help to validate or invalidate the key owner's identity, - * can assign trust levels to the key (or to uids and/or subkeys that - the key contains), - * and can be used in absense of in-person fingerprint checking to try - to build a path (through keys whose fingerprints have been checked) - to the key, so that the identity of the key's owner can be more - reliable without having to actually physically meet in person. - - A signature: - * is created for a file or other piece of data, - * can help to prove that the data hasn't been altered, - * and can help to prove that the data was sent by the person(s) in - possession of the private key that created the signature, and for - parsing portions of status messages from decryption operations. - - There are probably other things unique to each that have been - scatterbrainedly omitted due to the programmer sitting still and staring - at GnuPG debugging logs for too long without snacks, but that is the gist - of it. - """ - - TRUST_UNDEFINED = 0 - TRUST_NEVER = 1 - TRUST_MARGINAL = 2 - TRUST_FULLY = 3 - TRUST_ULTIMATE = 4 - - TRUST_LEVELS = {"TRUST_UNDEFINED" : TRUST_UNDEFINED, - "TRUST_NEVER" : TRUST_NEVER, - "TRUST_MARGINAL" : TRUST_MARGINAL, - "TRUST_FULLY" : TRUST_FULLY, - "TRUST_ULTIMATE" : TRUST_ULTIMATE,} - - def __init__(self, gpg): - """Create a parser for verification and certification commands. - - :param gpg: An instance of :class:`gnupg.GPG`. - """ - self._gpg = gpg - #: True if the signature is valid, False otherwise. - self.valid = False - #: A string describing the status of the signature verification. - #: Can be one of ``signature bad``, ``signature good``, - #: ``signature valid``, ``signature error``, ``decryption failed``, - #: ``no public key``, ``key exp``, or ``key rev``. - self.status = None - #: The fingerprint of the signing keyid. - self.fingerprint = None - #: The fingerprint of the corresponding public key, which may be - #: different if the signature was created with a subkey. - self.pubkey_fingerprint = None - #: The keyid of the signing key. - self.key_id = None - #: The id of the signature itself. - self.signature_id = None - #: The creation date of the signing key. - self.creation_date = None - #: The timestamp of the purported signature, if we are unable to parse - #: and/or validate it. - self.timestamp = None - #: The timestamp for when the valid signature was created. - self.sig_timestamp = None - #: The userid of the signing key which was used to create the - #: signature. - self.username = None - #: When the signing key is due to expire. - self.expire_timestamp = None - #: An integer 0-4 describing the trust level of the signature. - self.trust_level = None - #: The string corresponding to the ``trust_level`` number. - self.trust_text = None - - def __nonzero__(self): - """Override the determination for truthfulness evaluation. - - :rtype: bool - :returns: True if we have a valid signature, False otherwise. - """ - return self.valid - __bool__ = __nonzero__ - -
[docs] def _handle_status(self, key, value): - """Parse a status code from the attached GnuPG process. - - :raises: :exc:`~exceptions.ValueError` if the status message is unknown. - """ - if key in self.TRUST_LEVELS: - self.trust_text = key - self.trust_level = self.TRUST_LEVELS[key] - elif key in ("RSA_OR_IDEA", "NODATA", "IMPORT_RES", "PLAINTEXT", - "PLAINTEXT_LENGTH", "POLICY_URL", "DECRYPTION_INFO", - "DECRYPTION_OKAY", "INV_SGNR"): - pass - elif key == "BADSIG": - self.valid = False - self.status = 'signature bad' - self.key_id, self.username = value.split(None, 1) - elif key == "GOODSIG": - self.valid = True - self.status = 'signature good' - self.key_id, self.username = value.split(None, 1) - elif key == "VALIDSIG": - (self.fingerprint, - self.creation_date, - self.sig_timestamp, - self.expire_timestamp) = value.split()[:4] - # may be different if signature is made with a subkey - self.pubkey_fingerprint = value.split()[-1] - self.status = 'signature valid' - elif key == "SIG_ID": - (self.signature_id, - self.creation_date, self.timestamp) = value.split() - elif key == "ERRSIG": - self.valid = False - (self.key_id, - algo, hash_algo, - cls, - self.timestamp) = value.split()[:5] - self.status = 'signature error' - elif key == "DECRYPTION_FAILED": - self.valid = False - self.key_id = value - self.status = 'decryption failed' - elif key == "NO_PUBKEY": - self.valid = False - self.key_id = value - self.status = 'no public key' - elif key in ("KEYEXPIRED", "SIGEXPIRED"): - # these are useless in verify, since they are spit out for any - # pub/subkeys on the key, not just the one doing the signing. - # if we want to check for signatures with expired key, - # the relevant flag is EXPKEYSIG. - pass - elif key in ("EXPKEYSIG", "REVKEYSIG"): - # signed with expired or revoked key - self.valid = False - self.key_id = value.split()[0] - self.status = (('%s %s') % (key[:3], key[3:])).lower() - else: - raise ValueError("Unknown status message: %r" % key) - -
-
[docs]class Crypt(Verify): - """Parser for internal status messages from GnuPG for ``--encrypt``, - ``--decrypt``, and ``--decrypt-files``. - """ - def __init__(self, gpg): - Verify.__init__(self, gpg) - self._gpg = gpg - #: A string containing the encrypted or decrypted data. - self.data = '' - #: True if the decryption/encryption process turned out okay. - self.ok = False - #: A string describing the current processing status, or error, if one - #: has occurred. - self.status = None - self.data_format = None - self.data_timestamp = None - self.data_filename = None - - def __nonzero__(self): - if self.ok: return True - return False - __bool__ = __nonzero__ - - def __str__(self): - """The str() method for a :class:`Crypt` object will automatically return the - decoded data string, which stores the encryped or decrypted data. - - In other words, these two statements are equivalent: - - >>> assert decrypted.data == str(decrypted) - - """ - return self.data.decode(self._gpg._encoding, self._gpg._decode_errors) - -
[docs] def _handle_status(self, key, value): - """Parse a status code from the attached GnuPG process. - - :raises: :exc:`~exceptions.ValueError` if the status message is unknown. - """ - if key in ("ENC_TO", "USERID_HINT", "GOODMDC", "END_DECRYPTION", - "BEGIN_SIGNING", "NO_SECKEY", "ERROR", "NODATA", - "CARDCTRL"): - # in the case of ERROR, this is because a more specific error - # message will have come first - pass - elif key in ("NEED_PASSPHRASE", "BAD_PASSPHRASE", "GOOD_PASSPHRASE", - "MISSING_PASSPHRASE", "DECRYPTION_FAILED", - "KEY_NOT_CREATED"): - self.status = key.replace("_", " ").lower() - elif key == "NEED_TRUSTDB": - self._gpg._create_trustdb() - elif key == "NEED_PASSPHRASE_SYM": - self.status = 'need symmetric passphrase' - elif key == "BEGIN_DECRYPTION": - self.status = 'decryption incomplete' - elif key == "BEGIN_ENCRYPTION": - self.status = 'encryption incomplete' - elif key == "DECRYPTION_OKAY": - self.status = 'decryption ok' - self.ok = True - elif key == "END_ENCRYPTION": - self.status = 'encryption ok' - self.ok = True - elif key == "INV_RECP": - self.status = 'invalid recipient' - elif key == "KEYEXPIRED": - self.status = 'key expired' - elif key == "KEYREVOKED": - self.status = 'key revoked' - elif key == "SIG_CREATED": - self.status = 'sig created' - elif key == "SIGEXPIRED": - self.status = 'sig expired' - elif key == "PLAINTEXT": - fmt, dts = value.split(' ', 1) - if dts.find(' ') > 0: - self.data_timestamp, self.data_filename = dts.split(' ', 1) - else: - self.data_timestamp = dts - ## GnuPG gives us a hex byte for an ascii char corresponding to - ## the data format of the resulting plaintext, - ## i.e. '62'→'b':= binary data - self.data_format = chr(int(str(fmt), 16)) - else: - super(Crypt, self)._handle_status(key, value) -
-
[docs]class ListPackets(object): - """Handle status messages for --list-packets.""" - - def __init__(self, gpg): - self._gpg = gpg - #: A string describing the current processing status, or error, if one - #: has occurred. - self.status = None - #: True if the passphrase to a public/private keypair is required. - self.need_passphrase = None - #: True if a passphrase for a symmetric key is required. - self.need_passphrase_sym = None - #: The keyid and uid which this data is encrypted to. - self.userid_hint = None - -
[docs] def _handle_status(self, key, value): - """Parse a status code from the attached GnuPG process. - - :raises: :exc:`~exceptions.ValueError` if the status message is unknown. - """ - if key == 'NODATA': - self.status = nodata(value) - elif key == 'ENC_TO': - # This will only capture keys in our keyring. In the future we - # may want to include multiple unknown keys in this list. - self.key, _, _ = value.split() - elif key == 'NEED_PASSPHRASE': - self.need_passphrase = True - elif key == 'NEED_PASSPHRASE_SYM': - self.need_passphrase_sym = True - elif key == 'USERID_HINT': - self.userid_hint = value.strip().split() - elif key in ('NO_SECKEY', 'BEGIN_DECRYPTION', 'DECRYPTION_FAILED', - 'END_DECRYPTION'): - pass - else: - raise ValueError("Unknown status message: %r" % key)
-
- -
-
-
-
- -
-
-
- - - - - \ No newline at end of file diff --git a/docs/_build/html/_modules/gnupg/_util.html b/docs/_build/html/_modules/gnupg/_util.html deleted file mode 100644 index b6968c4..0000000 --- a/docs/_build/html/_modules/gnupg/_util.html +++ /dev/null @@ -1,718 +0,0 @@ - - - - - - - - gnupg._util — gnupg unknown documentation - - - - - - - - - - - - -
- -
- -
-
-
- -
-
-
- -

Source code for gnupg._util

-# -*- coding: utf-8 -*-
-#
-# This file is part of python-gnupg, a Python interface to GnuPG.
-# Copyright © 2013 Isis Lovecruft, <isis@leap.se> 0xA3ADB67A2CDB8B35
-#           © 2013 Andrej B.
-#           © 2013 LEAP Encryption Access Project
-#           © 2008-2012 Vinay Sajip
-#           © 2005 Steve Traugott
-#           © 2004 A.M. Kuchling
-#
-# This program is free software: you can redistribute it and/or modify it
-# under the terms of the GNU General Public License as published by the Free
-# Software Foundation, either version 3 of the License, or (at your option)
-# any later version.
-#
-# This program is distributed in the hope that it will be useful, but WITHOUT
-# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
-# FITNESS FOR A PARTICULAR PURPOSE. See the included LICENSE file for details.
-
-'''Extra utilities for python-gnupg.'''
-
-from __future__ import absolute_import
-from datetime   import datetime
-from socket     import gethostname
-from time       import localtime
-from time       import mktime
-
-import codecs
-import encodings
-import os
-import psutil
-import threading
-import random
-import re
-import string
-import sys
-
-try:
-    from io import StringIO
-    from io import BytesIO
-except ImportError:
-    from cStringIO import StringIO
-
-from . import _logger
-
-
-try:
-    unicode
-    _py3k = False
-    try:
-        isinstance(__name__, basestring)
-    except NameError:
-        msg  = "Sorry, python-gnupg requires a Python version with proper"
-        msg += " unicode support. Please upgrade to Python>=2.6."
-        raise SystemExit(msg)
-except NameError:
-    _py3k = True
-
-
-## Directory shortcuts:
-## we don't want to use this one because it writes to the install dir:
-#_here = getabsfile(currentframe()).rsplit(os.path.sep, 1)[0]
-_here = os.path.join(os.getcwd(), 'gnupg')                   ## current dir
-_test = os.path.join(os.path.join(_here, 'test'), 'tmp')     ## ./tests/tmp
-_user = os.environ.get('HOME')                               ## $HOME
-_ugpg = os.path.join(_user, '.gnupg')                        ## $HOME/.gnupg
-_conf = os.path.join(os.path.join(_user, '.config'), 'python-gnupg')
-                                     ## $HOME/.config/python-gnupg
-
-## Logger is disabled by default
-log = _logger.create_logger(0)
-
-
-
[docs]def find_encodings(enc=None, system=False): - """Find functions for encoding translations for a specific codec. - - :param str enc: The codec to find translation functions for. It will be - normalized by converting to lowercase, excluding - everything which is not ascii, and hyphens will be - converted to underscores. - - :param bool system: If True, find encodings based on the system's stdin - encoding, otherwise assume utf-8. - - :raises: :exc:LookupError if the normalized codec, ``enc``, cannot be - found in Python's encoding translation map. - """ - if not enc: - enc = 'utf-8' - - if system: - if getattr(sys.stdin, 'encoding', None) is None: - enc = sys.stdin.encoding - log.debug("Obtained encoding from stdin: %s" % enc) - else: - enc = 'ascii' - - ## have to have lowercase to work, see - ## http://docs.python.org/dev/library/codecs.html#standard-encodings - enc = enc.lower() - codec_alias = encodings.normalize_encoding(enc) - - codecs.register(encodings.search_function) - coder = codecs.lookup(codec_alias) - - return coder -
-
[docs]def author_info(name, contact=None, public_key=None): - """Easy object-oriented representation of contributor info. - - :param str name: The contributor´s name. - :param str contact: The contributor´s email address or contact - information, if given. - :param str public_key: The contributor´s public keyid, if given. - """ - return Storage(name=name, contact=contact, public_key=public_key) -
-
[docs]def _copy_data(instream, outstream): - """Copy data from one stream to another. - - :type instream: :class:`io.BytesIO` or :class:`io.StringIO` or file - :param instream: A byte stream or open file to read from. - :param file outstream: The file descriptor of a tmpfile to write to. - """ - sent = 0 - - coder = find_encodings() - - while True: - if ((_py3k and isinstance(instream, str)) or - (not _py3k and isinstance(instream, basestring))): - data = instream[:1024] - instream = instream[1024:] - else: - data = instream.read(1024) - if len(data) == 0: - break - sent += len(data) - log.debug("Sending chunk %d bytes:\n%s" - % (sent, data)) - try: - outstream.write(data) - except UnicodeError: - try: - outstream.write(coder.encode(data)) - except IOError: - log.exception("Error sending data: Broken pipe") - break - except IOError as ioe: - # Can get 'broken pipe' errors even when all data was sent - if 'Broken pipe' in str(ioe): - log.error('Error sending data: Broken pipe') - else: - log.exception(ioe) - break - try: - outstream.close() - except IOError as ioe: - log.error("Unable to close outstream %s:\r\t%s" % (outstream, ioe)) - else: - log.debug("Closed outstream: %d bytes sent." % sent) -
-
[docs]def _create_if_necessary(directory): - """Create the specified directory, if necessary. - - :param str directory: The directory to use. - :rtype: bool - :returns: True if no errors occurred and the directory was created or - existed beforehand, False otherwise. - """ - - if not os.path.isabs(directory): - log.debug("Got non-absolute path: %s" % directory) - directory = os.path.abspath(directory) - - if not os.path.isdir(directory): - log.info("Creating directory: %s" % directory) - try: - os.makedirs(directory, 0x1C0) - except OSError as ose: - log.error(ose, exc_info=1) - return False - else: - log.debug("Created directory.") - return True -
-
[docs]def create_uid_email(username=None, hostname=None): - """Create an email address suitable for a UID on a GnuPG key. - - :param str username: The username portion of an email address. If None, - defaults to the username of the running Python - process. - - :param str hostname: The FQDN portion of an email address. If None, the - hostname is obtained from gethostname(2). - - :rtype: str - :returns: A string formatted as <username>@<hostname>. - """ - if hostname: - hostname = hostname.replace(' ', '_') - if not username: - try: username = os.environ['LOGNAME'] - except KeyError: username = os.environ['USERNAME'] - - if not hostname: hostname = gethostname() - - uid = "%s@%s" % (username.replace(' ', '_'), hostname) - else: - username = username.replace(' ', '_') - if (not hostname) and (username.find('@') == 0): - uid = "%s@%s" % (username, gethostname()) - elif hostname: - uid = "%s@%s" % (username, hostname) - else: - uid = username - - return uid -
-
[docs]def _deprefix(line, prefix, callback=None): - """Remove the prefix string from the beginning of line, if it exists. - - :param string line: A line, such as one output by GnuPG's status-fd. - :param string prefix: A substring to remove from the beginning of - ``line``. Case insensitive. - :type callback: callable - :param callback: Function to call if the prefix is found. The signature to - callback will be only one argument, the ``line`` without the ``prefix``, i.e. - ``callback(line)``. - :rtype: string - :returns: If the prefix was found, the ``line`` without the prefix is - returned. Otherwise, the original ``line`` is returned. - """ - try: - assert line.upper().startswith(u''.join(prefix).upper()) - except AssertionError: - log.debug("Line doesn't start with prefix '%s':\n%s" % (prefix, line)) - return line - else: - newline = line[len(prefix):] - if callback is not None: - try: - callback(newline) - except Exception as exc: - log.exception(exc) - return newline -
-
[docs]def _find_binary(binary=None): - """Find the absolute path to the GnuPG binary. - - Also run checks that the binary is not a symlink, and check that - our process real uid has exec permissions. - - :param str binary: The path to the GnuPG binary. - :raises: :exc:`~exceptions.RuntimeError` if it appears that GnuPG is not - installed. - :rtype: str - :returns: The absolute path to the GnuPG binary to use, if no exceptions - occur. - """ - found = None - if binary is not None: - if not os.path.isabs(binary): - try: - found = _which(binary) - log.debug("Found potential binary paths: %s" - % '\n'.join([path for path in found])) - found = found[0] - except IndexError as ie: - log.info("Could not determine absolute path of binary: '%s'" - % binary) - elif os.access(binary, os.X_OK): - found = binary - if found is None: - try: found = _which('gpg')[0] - except IndexError as ie: - log.error("Could not find binary for 'gpg'.") - try: found = _which('gpg2')[0] - except IndexError as ie: - log.error("Could not find binary for 'gpg2'.") - if found is None: - raise RuntimeError("GnuPG is not installed!") - - try: - assert os.path.isabs(found), "Path to gpg binary not absolute" - assert not os.path.islink(found), "Path to gpg binary is symlink" - assert os.access(found, os.X_OK), "Lacking +x perms for gpg binary" - except (AssertionError, AttributeError) as ae: - log.error(str(ae)) - else: - return found -
-
[docs]def _has_readwrite(path): - """ - Determine if the real uid/gid of the executing user has read and write - permissions for a directory or a file. - - :param str path: The path to the directory or file to check permissions - for. - :rtype: bool - :returns: True if real uid/gid has read+write permissions, False otherwise. - """ - return os.access(path, os.R_OK ^ os.W_OK) -
-
[docs]def _is_file(filename): - """Check that the size of the thing which is supposed to be a filename has - size greater than zero, without following symbolic links or using - :func:os.path.isfile. - - :param filename: An object to check. - :rtype: bool - :returns: True if **filename** is file-like, False otherwise. - """ - try: - statinfo = os.lstat(filename) - log.debug("lstat(%r) with type=%s gave us %r" - % (repr(filename), type(filename), repr(statinfo))) - if not (statinfo.st_size > 0): - raise ValueError("'%s' appears to be an empty file!" % filename) - except OSError as oserr: - log.error(oserr) - if filename == '-': - log.debug("Got '-' for filename, assuming sys.stdin...") - return True - except (ValueError, TypeError, IOError) as err: - log.error(err) - else: - return True - return False -
-
[docs]def _is_stream(input): - """Check that the input is a byte stream. - - :param input: An object provided for reading from or writing to. - :rtype: bool - :returns: True if :param:input is a stream, False if otherwise. - """ - return isinstance(input, BytesIO) or isinstance(input, StringIO) -
-
[docs]def _is_list_or_tuple(instance): - """Check that ``instance`` is a list or tuple. - - :param instance: The object to type check. - :rtype: bool - :returns: True if ``instance`` is a list or tuple, False otherwise. - """ - return isinstance(instance, (list, tuple,)) -
-
[docs]def _is_gpg1(version): - """Returns True if using GnuPG version 1.x. - - :param tuple version: A tuple of three integers indication major, minor, - and micro version numbers. - """ - (major, minor, micro) = _match_version_string(version) - if major == 1: - return True - return False -
-
[docs]def _is_gpg2(version): - """Returns True if using GnuPG version 2.x. - - :param tuple version: A tuple of three integers indication major, minor, - and micro version numbers. - """ - (major, minor, micro) = _match_version_string(version) - if major == 2: - return True - return False -
-
[docs]def _make_binary_stream(s, encoding): - """ - xxx fill me in - """ - try: - if _py3k: - if isinstance(s, str): - s = s.encode(encoding) - else: - if type(s) is not str: - s = s.encode(encoding) - from io import BytesIO - rv = BytesIO(s) - except ImportError: - rv = StringIO(s) - return rv -
-
[docs]def _make_passphrase(length=None, save=False, file=None): - """Create a passphrase and write it to a file that only the user can read. - - This is not very secure, and should not be relied upon for actual key - passphrases. - - :param int length: The length in bytes of the string to generate. - - :param file file: The file to save the generated passphrase in. If not - given, defaults to 'passphrase-<the real user id>-<seconds since - epoch>' in the top-level directory. - """ - if not length: - length = 40 - - passphrase = _make_random_string(length) - - if save: - ruid, euid, suid = psutil.Process(os.getpid()).uids - gid = os.getgid() - now = mktime(localtime()) - - if not file: - filename = str('passphrase-%s-%s' % uid, now) - file = os.path.join(_repo, filename) - - with open(file, 'a') as fh: - fh.write(passphrase) - fh.flush() - fh.close() - os.chmod(file, stat.S_IRUSR | stat.S_IWUSR) - os.chown(file, ruid, gid) - - log.warn("Generated passphrase saved to %s" % file) - return passphrase -
-
[docs]def _make_random_string(length): - """Returns a random lowercase, uppercase, alphanumerical string. - - :param int length: The length in bytes of the string to generate. - """ - chars = string.ascii_lowercase + string.ascii_uppercase + string.digits - return ''.join(random.choice(chars) for x in range(length)) -
-
[docs]def _match_version_string(version): - """Sort a binary version string into major, minor, and micro integers. - - :param str version: A version string in the form x.x.x - """ - regex = re.compile('(\d)*(\.)*(\d)*(\.)*(\d)*') - matched = regex.match(version) - g = matched.groups() - major, minor, micro = int(g[0]), int(g[2]), int(g[4]) - return (major, minor, micro) -
-
[docs]def _next_year(): - """Get the date of today plus one year. - - :rtype: str - :returns: The date of this day next year, in the format '%Y-%m-%d'. - """ - now = datetime.now().__str__() - date = now.split(' ', 1)[0] - year, month, day = date.split('-', 2) - next_year = str(int(year)+1) - return '-'.join((next_year, month, day)) -
-
[docs]def _now(): - """Get a timestamp for right now, formatted according to ISO 8601.""" - return datetime.isoformat(datetime.now()) -
-
[docs]def _separate_keyword(line): - """Split the line, and return (first_word, the_rest).""" - try: - first, rest = line.split(None, 1) - except ValueError: - first = line.strip() - rest = '' - return first, rest -
-
[docs]def _threaded_copy_data(instream, outstream): - """Copy data from one stream to another in a separate thread. - - Wraps ``_copy_data()`` in a :class:`threading.Thread`. - - :type instream: :class:`io.BytesIO` or :class:`io.StringIO` - :param instream: A byte stream to read from. - :param file outstream: The file descriptor of a tmpfile to write to. - """ - copy_thread = threading.Thread(target=_copy_data, - args=(instream, outstream)) - copy_thread.setDaemon(True) - log.debug('%r, %r, %r', copy_thread, instream, outstream) - copy_thread.start() - return copy_thread -
-
[docs]def _utc_epoch(): - """Get the seconds since epoch.""" - return int(mktime(localtime())) -
-
[docs]def _which(executable, flags=os.X_OK): - """Borrowed from Twisted's :mod:twisted.python.proutils . - - Search PATH for executable files with the given name. - - On newer versions of MS-Windows, the PATHEXT environment variable will be - set to the list of file extensions for files considered executable. This - will normally include things like ".EXE". This fuction will also find files - with the given name ending with any of these extensions. - - On MS-Windows the only flag that has any meaning is os.F_OK. Any other - flags will be ignored. - - Note: This function does not help us prevent an attacker who can already - manipulate the environment's PATH settings from placing malicious code - higher in the PATH. It also does happily follows links. - - :param str name: The name for which to search. - :param int flags: Arguments to L{os.access}. - :rtype: list - :returns: A list of the full paths to files found, in the order in which - they were found. - """ - result = [] - exts = filter(None, os.environ.get('PATHEXT', '').split(os.pathsep)) - path = os.environ.get('PATH', None) - if path is None: - return [] - for p in os.environ.get('PATH', '').split(os.pathsep): - p = os.path.join(p, executable) - if os.access(p, flags): - result.append(p) - for e in exts: - pext = p + e - if os.access(pext, flags): - result.append(pext) - return result -
-
[docs]def _write_passphrase(stream, passphrase, encoding): - """Write the passphrase from memory to the GnuPG process' stdin. - - :type stream: file, :class:`~io.BytesIO`, or :class:`~io.StringIO` - :param stream: The input file descriptor to write the password to. - :param str passphrase: The passphrase for the secret key material. - :param str encoding: The data encoding expected by GnuPG. Usually, this - is ``sys.getfilesystemencoding()``. - """ - passphrase = '%s\n' % passphrase - passphrase = passphrase.encode(encoding) - stream.write(passphrase) - log.debug("Wrote passphrase on stdin.") - -
-
[docs]class InheritableProperty(object): - """Based on the emulation of PyProperty_Type() in Objects/descrobject.c""" - - def __init__(self, fget=None, fset=None, fdel=None, doc=None): - self.fget = fget - self.fset = fset - self.fdel = fdel - self.__doc__ = doc - - def __get__(self, obj, objtype=None): - if obj is None: - return self - if self.fget is None: - raise AttributeError("unreadable attribute") - if self.fget.__name__ == '<lambda>' or not self.fget.__name__: - return self.fget(obj) - else: - return getattr(obj, self.fget.__name__)() - - def __set__(self, obj, value): - if self.fset is None: - raise AttributeError("can't set attribute") - if self.fset.__name__ == '<lambda>' or not self.fset.__name__: - self.fset(obj, value) - else: - getattr(obj, self.fset.__name__)(value) - - def __delete__(self, obj): - if self.fdel is None: - raise AttributeError("can't delete attribute") - if self.fdel.__name__ == '<lambda>' or not self.fdel.__name__: - self.fdel(obj) - else: - getattr(obj, self.fdel.__name__)() - -
-
[docs]class Storage(dict): - """A dictionary where keys are stored as class attributes. - - For example, ``obj.foo`` can be used in addition to ``obj['foo']``: - - >>> o = Storage(a=1) - >>> o.a - 1 - >>> o['a'] - 1 - >>> o.a = 2 - >>> o['a'] - 2 - >>> del o.a - >>> o.a - None - """ - def __getattr__(self, key): - try: - return self[key] - except KeyError as k: - return None - - def __setattr__(self, key, value): - self[key] = value - - def __delattr__(self, key): - try: - del self[key] - except KeyError as k: - raise AttributeError(k.args[0]) - - def __repr__(self): - return '<Storage ' + dict.__repr__(self) + '>' - - def __getstate__(self): - return dict(self) - - def __setstate__(self, value): - for (k, v) in value.items(): - self[k] = v
-
- -
-
-
-
- -
-
-
- - - - - \ No newline at end of file diff --git a/docs/_build/html/_modules/index.html b/docs/_build/html/_modules/index.html deleted file mode 100644 index d032746..0000000 --- a/docs/_build/html/_modules/index.html +++ /dev/null @@ -1,104 +0,0 @@ - - - - - - - - Overview: module code — gnupg unknown documentation - - - - - - - - - - - -
- -
- -
-
-
- -
-
-
- -

All modules for which code is available

- - -
-
-
-
- -
-
-
- - - - - \ No newline at end of file -- cgit v1.2.3