From a5d46a4e38985be540b9127ddcd3d8e21bbecb9a Mon Sep 17 00:00:00 2001 From: Kali Kaneko Date: Mon, 8 Jun 2015 16:46:11 -0400 Subject: Imported Upstream version 2.0.2 --- docs/_build/html/_modules/gnupg/_meta.html | 981 ----------------------------- 1 file changed, 981 deletions(-) delete mode 100644 docs/_build/html/_modules/gnupg/_meta.html (limited to 'docs/_build/html/_modules/gnupg/_meta.html') diff --git a/docs/_build/html/_modules/gnupg/_meta.html b/docs/_build/html/_modules/gnupg/_meta.html deleted file mode 100644 index 23ca880..0000000 --- a/docs/_build/html/_modules/gnupg/_meta.html +++ /dev/null @@ -1,981 +0,0 @@ - - - - - - - - gnupg._meta — gnupg unknown documentation - - - - - - - - - - - - -
- -
- -
-
-
- -
-
-
- -

Source code for gnupg._meta

-# -*- coding: utf-8 -*-
-#
-# This file is part of python-gnupg, a Python interface to GnuPG.
-# Copyright © 2013 Isis Lovecruft, <isis@leap.se> 0xA3ADB67A2CDB8B35
-#           © 2013 Andrej B.
-#           © 2013 LEAP Encryption Access Project
-#           © 2008-2012 Vinay Sajip
-#           © 2005 Steve Traugott
-#           © 2004 A.M. Kuchling
-#
-# This program is free software: you can redistribute it and/or modify it
-# under the terms of the GNU General Public License as published by the Free
-# Software Foundation, either version 3 of the License, or (at your option)
-# any later version.
-#
-# This program is distributed in the hope that it will be useful, but WITHOUT
-# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
-# FITNESS FOR A PARTICULAR PURPOSE. See the included LICENSE file for details.
-
-'''Meta and base classes for hiding internal functions, and controlling
-attribute creation and handling.
-'''
-
-from __future__ import absolute_import
-
-import atexit
-import codecs
-import encodings
-## For AOS, the locale module will need to point to a wrapper around the
-## java.util.Locale class.
-## See https://code.patternsinthevoid.net/?p=android-locale-hack.git
-import locale
-import os
-import platform
-import psutil
-import shlex
-import subprocess
-import sys
-import threading
-
-from . import _parsers
-from . import _util
-
-from ._parsers import _check_preferences
-from ._parsers import _sanitise_list
-from ._util    import log
-
-
-
[docs]class GPGMeta(type): - """Metaclass for changing the :meth:GPG.__init__ initialiser. - - Detects running gpg-agent processes and the presence of a pinentry - program, and disables pinentry so that python-gnupg can write the - passphrase to the controlled GnuPG process without killing the agent. - - :attr _agent_proc: If a :program:`gpg-agent` process is currently running - for the effective userid, then **_agent_proc** will be - set to a ``psutil.Process`` for that process. - """ - -
[docs] def __new__(cls, name, bases, attrs): - """Construct the initialiser for GPG""" - log.debug("Metaclass __new__ constructor called for %r" % cls) - if cls._find_agent(): - ## call the normal GPG.__init__() initialiser: - attrs['init'] = cls.__init__ - attrs['_remove_agent'] = True - return super(GPGMeta, cls).__new__(cls, name, bases, attrs) -
- @classmethod -
[docs] def _find_agent(cls): - """Discover if a gpg-agent process for the current euid is running. - - If there is a matching gpg-agent process, set a :class:`psutil.Process` - instance containing the gpg-agent process' information to - ``cls._agent_proc``. - - :returns: True if there exists a gpg-agent process running under the - same effective user ID as that of this program. Otherwise, - returns None. - """ - identity = psutil.Process(os.getpid()).uids - for proc in psutil.process_iter(): - if (proc.name == "gpg-agent") and proc.is_running: - log.debug("Found gpg-agent process with pid %d" % proc.pid) - if proc.uids == identity: - log.debug( - "Effective UIDs of this process and gpg-agent match") - setattr(cls, '_agent_proc', proc) - return True - -
-
[docs]class GPGBase(object): - """Base class for storing properties and controlling process initialisation. - - :const _result_map: A *dict* containing classes from - :mod:`~gnupg._parsers`, used for parsing results - obtained from GnuPG commands. - :const _decode_errors: How to handle encoding errors. - """ - __metaclass__ = GPGMeta - _decode_errors = 'strict' - _result_map = { 'crypt': _parsers.Crypt, - 'delete': _parsers.DeleteResult, - 'generate': _parsers.GenKey, - 'import': _parsers.ImportResult, - 'list': _parsers.ListKeys, - 'sign': _parsers.Sign, - 'verify': _parsers.Verify, - 'packets': _parsers.ListPackets } - -
[docs] def __init__(self, binary=None, home=None, keyring=None, secring=None, - use_agent=False, default_preference_list=None, - verbose=False, options=None): - """Create a ``GPGBase``. - - This class is used to set up properties for controlling the behaviour - of configuring various options for GnuPG, such as setting GnuPG's - **homedir** , and the paths to its **binary** and **keyring** . - - :const binary: (:obj:`str`) The full path to the GnuPG binary. - - :ivar homedir: (:class:`~gnupg._util.InheritableProperty`) The full - path to the current setting for the GnuPG - ``--homedir``. - - :ivar _generated_keys: (:class:`~gnupg._util.InheritableProperty`) - Controls setting the directory for storing any - keys which are generated with - :meth:`~gnupg.GPG.gen_key`. - - :ivar str keyring: The filename in **homedir** to use as the keyring - file for public keys. - :ivar str secring: The filename in **homedir** to use as the keyring - file for secret keys. - """ - self.binary = _util._find_binary(binary) - self.homedir = home if home else _util._conf - pub = _parsers._fix_unsafe(keyring) if keyring else 'pubring.gpg' - sec = _parsers._fix_unsafe(secring) if secring else 'secring.gpg' - self.keyring = os.path.join(self._homedir, pub) - self.secring = os.path.join(self._homedir, sec) - self.options = _parsers._sanitise(options) if options else None - - if default_preference_list: - self._prefs = _check_preferences(default_preference_list, 'all') - else: - self._prefs = 'SHA512 SHA384 SHA256 AES256 CAMELLIA256 TWOFISH' - self._prefs += ' AES192 ZLIB ZIP Uncompressed' - - encoding = locale.getpreferredencoding() - if encoding is None: # This happens on Jython! - encoding = sys.stdin.encoding - self._encoding = encoding.lower().replace('-', '_') - self._filesystemencoding = encodings.normalize_encoding( - sys.getfilesystemencoding().lower()) - - self._keyserver = 'hkp://wwwkeys.pgp.net' - self.__generated_keys = os.path.join(self.homedir, 'generated-keys') - - try: - assert self.binary, "Could not find binary %s" % binary - assert isinstance(verbose, (bool, str, int)), \ - "'verbose' must be boolean, string, or 0 <= n <= 9" - assert isinstance(use_agent, bool), "'use_agent' must be boolean" - if self.options is not None: - assert isinstance(self.options, str), "options not string" - except (AssertionError, AttributeError) as ae: - log.error("GPGBase.__init__(): %s" % str(ae)) - raise RuntimeError(str(ae)) - else: - if verbose is True: - # The caller wants logging, but we need a valid --debug-level - # for gpg. Default to "basic", and warn about the ambiguity. - # (garrettr) - verbose = "basic" - log.warning('GPG(verbose=True) is ambiguous, defaulting to "basic" logging') - self.verbose = verbose - self.use_agent = use_agent - - if hasattr(self, '_agent_proc') \ - and getattr(self, '_remove_agent', None) is True: - if hasattr(self, '__remove_path__'): - self.__remove_path__('pinentry') -
-
[docs] def __remove_path__(self, prog=None, at_exit=True): - """Remove the directories containing a program from the system's - ``$PATH``. If ``GPGBase.binary`` is in a directory being removed, it - is linked to :file:'./gpg' in the current directory. - - :param str prog: The program to remove from ``$PATH``. - :param bool at_exit: Add the program back into the ``$PATH`` when the - Python interpreter exits, and delete any symlinks - to ``GPGBase.binary`` which were created. - """ - #: A list of ``$PATH`` entries which were removed to disable pinentry. - self._removed_path_entries = [] - - log.debug("Attempting to remove %s from system PATH" % str(prog)) - if (prog is None) or (not isinstance(prog, str)): return - - try: - program = _util._which(prog)[0] - except (OSError, IOError, IndexError) as err: - log.err(str(err)) - log.err("Cannot find program '%s', not changing PATH." % prog) - return - - ## __remove_path__ cannot be an @classmethod in GPGMeta, because - ## the use_agent attribute must be set by the instance. - if not self.use_agent: - program_base = os.path.dirname(prog) - gnupg_base = os.path.dirname(self.binary) - - ## symlink our gpg binary into $PWD if the path we are removing is - ## the one which contains our gpg executable: - new_gpg_location = os.path.join(os.getcwd(), 'gpg') - if gnupg_base == program_base: - os.symlink(self.binary, new_gpg_location) - self.binary = new_gpg_location - - ## copy the original environment so that we can put it back later: - env_copy = os.environ ## this one should not be touched - path_copy = os.environ.pop('PATH') - log.debug("Created a copy of system PATH: %r" % path_copy) - assert not os.environ.has_key('PATH'), "OS env kept $PATH anyway!" - - @staticmethod - def remove_program_from_path(path, prog_base): - """Remove all directories which contain a program from PATH. - - :param str path: The contents of the system environment's - ``$PATH``. - - :param str prog_base: The directory portion of a program's - location, without the trailing slash, - and without the program name. For - example, ``prog_base='/usr/bin'``. - """ - paths = path.split(':') - for directory in paths: - if directory == prog_base: - log.debug("Found directory with target program: %s" - % directory) - path.remove(directory) - self._removed_path_entries.append(directory) - log.debug("Deleted all found instance of %s." % directory) - log.debug("PATH is now:%s%s" % (os.linesep, path)) - new_path = ':'.join([p for p in path]) - return new_path - - @staticmethod - def update_path(environment, path): - """Add paths to the string at ``os.environ['PATH']``. - - :param str environment: The environment mapping to update. - :param list path: A list of strings to update the PATH with. - """ - log.debug("Updating system path...") - os.environ = environment - new_path = ':'.join([p for p in path]) - old = '' - if 'PATH' in os.environ: - new_path = ':'.join([os.environ['PATH'], new_path]) - os.environ.update({'PATH': new_path}) - log.debug("System $PATH: %s" % os.environ['PATH']) - - modified_path = remove_program_from_path(path_copy, program_base) - update_path(env_copy, modified_path) - - ## register an _exithandler with the python interpreter: - atexit.register(update_path, env_copy, path_copy) - - def remove_symlinked_binary(symlink): - if os.path.islink(symlink): - os.unlink(symlink) - log.debug("Removed binary symlink '%s'" % symlink) - atexit.register(remove_symlinked_binary, new_gpg_location) -
- @property - def default_preference_list(self): - """Get the default preference list.""" - return self._prefs - - @default_preference_list.setter - def default_preference_list(self, prefs): - """Set the default preference list. - - :param str prefs: A string containing the default preferences for - ciphers, digests, and compression algorithms. - """ - prefs = _check_preferences(prefs) - if prefs is not None: - self._prefs = prefs - - @default_preference_list.deleter -
[docs] def default_preference_list(self): - """Reset the default preference list to its original state. - - Note that "original state" does not mean the default preference - list for whichever version of GnuPG is being used. It means the - default preference list defined by :attr:`GPGBase._prefs`. - - Using BZIP2 is avoided due to not interacting well with some versions - of GnuPG>=2.0.0. - """ - self._prefs = 'SHA512 SHA384 SHA256 AES256 CAMELLIA256 TWOFISH ZLIB ZIP' -
- @property - def keyserver(self): - """Get the current keyserver setting.""" - return self._keyserver - - @keyserver.setter - def keyserver(self, location): - """Set the default keyserver to use for sending and receiving keys. - - The ``location`` is sent to :func:`_parsers._check_keyserver` when - option are parsed in :meth:`gnupg.GPG._make_options`. - - :param str location: A string containing the default keyserver. This - should contain the desired keyserver protocol - which is supported by the keyserver, for example, - ``'hkps://keys.mayfirst.org'``. The default - keyserver is ``'hkp://wwwkeys.pgp.net'``. - """ - self._keyserver = location - - @keyserver.deleter -
[docs] def keyserver(self): - """Reset the keyserver to the default setting.""" - self._keyserver = 'hkp://wwwkeys.pgp.net' -
-
[docs] def _homedir_getter(self): - """Get the directory currently being used as GnuPG's homedir. - - If unspecified, use :file:`~/.config/python-gnupg/` - - :rtype: str - :returns: The absolute path to the current GnuPG homedir. - """ - return self._homedir -
-
[docs] def _homedir_setter(self, directory): - """Set the directory to use as GnuPG's homedir. - - If unspecified, use $HOME/.config/python-gnupg. If specified, ensure - that the ``directory`` does not contain various shell escape - characters. If ``directory`` is not found, it will be automatically - created. Lastly, the ``direcory`` will be checked that the EUID has - read and write permissions for it. - - :param str directory: A relative or absolute path to the directory to - use for storing/accessing GnuPG's files, including - keyrings and the trustdb. - :raises: :exc:`~exceptions.RuntimeError` if unable to find a suitable - directory to use. - """ - if not directory: - log.debug("GPGBase._homedir_setter(): Using default homedir: '%s'" - % _util._conf) - directory = _util._conf - - hd = _parsers._fix_unsafe(directory) - log.debug("GPGBase._homedir_setter(): got directory '%s'" % hd) - - if hd: - log.debug("GPGBase._homedir_setter(): Check existence of '%s'" % hd) - _util._create_if_necessary(hd) - - try: - log.debug("GPGBase._homedir_setter(): checking permissions") - assert _util._has_readwrite(hd), \ - "Homedir '%s' needs read/write permissions" % hd - except AssertionError as ae: - msg = ("Unable to set '%s' as GnuPG homedir" % directory) - log.debug("GPGBase.homedir.setter(): %s" % msg) - log.debug(str(ae)) - raise RuntimeError(str(ae)) - else: - log.info("Setting homedir to '%s'" % hd) - self._homedir = hd -
- homedir = _util.InheritableProperty(_homedir_getter, _homedir_setter) - -
[docs] def _generated_keys_getter(self): - """Get the ``homedir`` subdirectory for storing generated keys. - - :rtype: str - :returns: The absolute path to the current GnuPG homedir. - """ - return self.__generated_keys -
-
[docs] def _generated_keys_setter(self, directory): - """Set the directory for storing generated keys. - - If unspecified, use - :meth:`~gnupg._meta.GPGBase.homedir`/generated-keys. If specified, - ensure that the ``directory`` does not contain various shell escape - characters. If ``directory`` isn't found, it will be automatically - created. Lastly, the ``directory`` will be checked to ensure that the - current EUID has read and write permissions for it. - - :param str directory: A relative or absolute path to the directory to - use for storing/accessing GnuPG's files, including keyrings and - the trustdb. - :raises: :exc:`~exceptions.RuntimeError` if unable to find a suitable - directory to use. - """ - if not directory: - directory = os.path.join(self.homedir, 'generated-keys') - log.debug("GPGBase._generated_keys_setter(): Using '%s'" - % directory) - - hd = _parsers._fix_unsafe(directory) - log.debug("GPGBase._generated_keys_setter(): got directory '%s'" % hd) - - if hd: - log.debug("GPGBase._generated_keys_setter(): Check exists '%s'" - % hd) - _util._create_if_necessary(hd) - - try: - log.debug("GPGBase._generated_keys_setter(): check permissions") - assert _util._has_readwrite(hd), \ - "Keys dir '%s' needs read/write permissions" % hd - except AssertionError as ae: - msg = ("Unable to set '%s' as generated keys dir" % directory) - log.debug("GPGBase._generated_keys_setter(): %s" % msg) - log.debug(str(ae)) - raise RuntimeError(str(ae)) - else: - log.info("Setting homedir to '%s'" % hd) - self.__generated_keys = hd -
- _generated_keys = _util.InheritableProperty(_generated_keys_getter, - _generated_keys_setter) - -
[docs] def _make_args(self, args, passphrase=False): - """Make a list of command line elements for GPG. - - The value of ``args`` will be appended only if it passes the checks in - :func:`gnupg._parsers._sanitise`. The ``passphrase`` argument needs to - be True if a passphrase will be sent to GnuPG, else False. - - :param list args: A list of strings of options and flags to pass to - ``GPG.binary``. This is input safe, meaning that - these values go through strict checks (see - ``parsers._sanitise_list``) before being passed to to - the input file descriptor for the GnuPG process. - Each string should be given exactly as it would be on - the commandline interface to GnuPG, - e.g. ["--cipher-algo AES256", "--default-key - A3ADB67A2CDB8B35"]. - - :param bool passphrase: If True, the passphrase will be sent to the - stdin file descriptor for the attached GnuPG - process. - """ - ## see TODO file, tag :io:makeargs: - cmd = [self.binary, - '--no-options --no-emit-version --no-tty --status-fd 2'] - - if self.homedir: cmd.append('--homedir "%s"' % self.homedir) - - if self.keyring: - cmd.append('--no-default-keyring --keyring %s' % self.keyring) - if self.secring: - cmd.append('--secret-keyring %s' % self.secring) - - if passphrase: cmd.append('--batch --passphrase-fd 0') - - if self.use_agent: cmd.append('--use-agent') - else: cmd.append('--no-use-agent') - - if self.options: - [cmd.append(opt) for opt in iter(_sanitise_list(self.options))] - if args: - [cmd.append(arg) for arg in iter(_sanitise_list(args))] - - if self.verbose: - cmd.append('--debug-all') - if ((isinstance(self.verbose, str) and - self.verbose in ['basic', 'advanced', 'expert', 'guru']) - or (isinstance(self.verbose, int) and (1<=self.verbose<=9))): - cmd.append('--debug-level %s' % self.verbose) - - return cmd -
-
[docs] def _open_subprocess(self, args=None, passphrase=False): - """Open a pipe to a GPG subprocess and return the file objects for - communicating with it. - - :param list args: A list of strings of options and flags to pass to - ``GPG.binary``. This is input safe, meaning that - these values go through strict checks (see - ``parsers._sanitise_list``) before being passed to to - the input file descriptor for the GnuPG process. - Each string should be given exactly as it would be on - the commandline interface to GnuPG, - e.g. ["--cipher-algo AES256", "--default-key - A3ADB67A2CDB8B35"]. - - :param bool passphrase: If True, the passphrase will be sent to the - stdin file descriptor for the attached GnuPG - process. - """ - ## see http://docs.python.org/2/library/subprocess.html#converting-an\ - ## -argument-sequence-to-a-string-on-windows - cmd = shlex.split(' '.join(self._make_args(args, passphrase))) - log.debug("Sending command to GnuPG process:%s%s" % (os.linesep, cmd)) - - if platform.system() == "Windows": - # TODO figure out what the hell is going on there. - expand_shell = True - else: - expand_shell = False - - return subprocess.Popen(cmd, shell=expand_shell, stdin=subprocess.PIPE, - stdout=subprocess.PIPE, stderr=subprocess.PIPE, - env={'LANGUAGE': 'en'}) -
-
[docs] def _read_response(self, stream, result): - """Reads all the stderr output from GPG, taking notice only of lines - that begin with the magic [GNUPG:] prefix. - - Calls methods on the response object for each valid token found, with - the arg being the remainder of the status line. - - :param stream: A byte-stream, file handle, or a - :data:`subprocess.PIPE` for parsing the status codes - from the GnuPG process. - - :param result: The result parser class from :mod:`~gnupg._parsers` ― - the ``handle_status()`` method of that class will be - called in order to parse the output of ``stream``. - """ - lines = [] - while True: - line = stream.readline() - if len(line) == 0: - break - lines.append(line) - line = line.rstrip() - - if line.startswith('[GNUPG:]'): - line = _util._deprefix(line, '[GNUPG:] ', log.status) - keyword, value = _util._separate_keyword(line) - result._handle_status(keyword, value) - elif line.startswith('gpg:'): - line = _util._deprefix(line, 'gpg: ') - keyword, value = _util._separate_keyword(line) - - # Log gpg's userland messages at our own levels: - if keyword.upper().startswith("WARNING"): - log.warn("%s" % value) - elif keyword.upper().startswith("FATAL"): - log.critical("%s" % value) - # Handle the gpg2 error where a missing trustdb.gpg is, - # for some stupid reason, considered fatal: - if value.find("trustdb.gpg") and value.find("No such file"): - result._handle_status('NEED_TRUSTDB', '') - else: - if self.verbose: - log.info("%s" % line) - else: - log.debug("%s" % line) - result.stderr = ''.join(lines) -
-
[docs] def _read_data(self, stream, result): - """Incrementally read from ``stream`` and store read data. - - All data gathered from calling ``stream.read()`` will be concatenated - and stored as ``result.data``. - - :param stream: An open file-like object to read() from. - :param result: An instance of one of the :ref:`result parsing classes - <parsers>` from :const:`~gnupg._meta.GPGBase._result_map`. - """ - chunks = [] - log.debug("Reading data from stream %r..." % stream.__repr__()) - - while True: - data = stream.read(1024) - if len(data) == 0: - break - chunks.append(data) - log.debug("Read %4d bytes" % len(data)) - - # Join using b'' or '', as appropriate - result.data = type(data)().join(chunks) - log.debug("Finishing reading from stream %r..." % stream.__repr__()) - log.debug("Read %4d bytes total" % len(result.data)) -
-
[docs] def _collect_output(self, process, result, writer=None, stdin=None): - """Drain the subprocesses output streams, writing the collected output - to the result. If a writer thread (writing to the subprocess) is given, - make sure it's joined before returning. If a stdin stream is given, - close it before returning. - """ - stderr = codecs.getreader(self._encoding)(process.stderr) - rr = threading.Thread(target=self._read_response, - args=(stderr, result)) - rr.setDaemon(True) - log.debug('stderr reader: %r', rr) - rr.start() - - stdout = process.stdout - dr = threading.Thread(target=self._read_data, args=(stdout, result)) - dr.setDaemon(True) - log.debug('stdout reader: %r', dr) - dr.start() - - dr.join() - rr.join() - if writer is not None: - writer.join() - process.wait() - if stdin is not None: - try: - stdin.close() - except IOError: - pass - stderr.close() - stdout.close() -
-
[docs] def _handle_io(self, args, file, result, passphrase=False, binary=False): - """Handle a call to GPG - pass input data, collect output data.""" - p = self._open_subprocess(args, passphrase) - if not binary: - stdin = codecs.getwriter(self._encoding)(p.stdin) - else: - stdin = p.stdin - if passphrase: - _util._write_passphrase(stdin, passphrase, self._encoding) - writer = _util._threaded_copy_data(file, stdin) - self._collect_output(p, result, writer, stdin) - return result -
-
[docs] def _recv_keys(self, keyids, keyserver=None): - """Import keys from a keyserver. - - :param str keyids: A space-delimited string containing the keyids to - request. - :param str keyserver: The keyserver to request the ``keyids`` from; - defaults to `gnupg.GPG.keyserver`. - """ - if not keyserver: - keyserver = self.keyserver - - args = ['--keyserver {0}'.format(keyserver), - '--recv-keys {0}'.format(keyids)] - log.info('Requesting keys from %s: %s' % (keyserver, keyids)) - - result = self._result_map['import'](self) - proc = self._open_subprocess(args) - self._collect_output(proc, result) - log.debug('recv_keys result: %r', result.__dict__) - return result -
-
[docs] def _sign_file(self, file, default_key=None, passphrase=None, - clearsign=True, detach=False, binary=False, - digest_algo='SHA512'): - """Create a signature for a file. - - :param file: The file stream (i.e. it's already been open()'d) to sign. - :param str default_key: The key to sign with. - :param str passphrase: The passphrase to pipe to stdin. - :param bool clearsign: If True, create a cleartext signature. - :param bool detach: If True, create a detached signature. - :param bool binary: If True, do not ascii armour the output. - :param str digest_algo: The hash digest to use. Again, to see which - hashes your GnuPG is capable of using, do: - ``$ gpg --with-colons --list-config - digestname``. The default, if unspecified, is - ``'SHA512'``. - """ - log.debug("_sign_file():") - if binary: - log.info("Creating binary signature for file %s" % file) - args = ['--sign'] - else: - log.info("Creating ascii-armoured signature for file %s" % file) - args = ['--sign --armor'] - - if clearsign: - args.append("--clearsign") - if detach: - log.warn("Cannot use both --clearsign and --detach-sign.") - log.warn("Using default GPG behaviour: --clearsign only.") - elif detach and not clearsign: - args.append("--detach-sign") - - if default_key: - args.append(str("--default-key %s" % default_key)) - - args.append(str("--digest-algo %s" % digest_algo)) - - ## We could use _handle_io here except for the fact that if the - ## passphrase is bad, gpg bails and you can't write the message. - result = self._result_map['sign'](self) - proc = self._open_subprocess(args, passphrase is not None) - try: - if passphrase: - _util._write_passphrase(proc.stdin, passphrase, self._encoding) - writer = _util._threaded_copy_data(file, proc.stdin) - except IOError as ioe: - log.exception("Error writing message: %s" % str(ioe)) - writer = None - self._collect_output(proc, result, writer, proc.stdin) - return result -
-
[docs] def _encrypt(self, data, recipients, - default_key=None, - passphrase=None, - armor=True, - encrypt=True, - symmetric=False, - always_trust=True, - output=None, - cipher_algo='AES256', - digest_algo='SHA512', - compress_algo='ZLIB'): - """Encrypt the message read from the file-like object **data**. - - :param str data: The file or bytestream to encrypt. - - :param str recipients: The recipients to encrypt to. Recipients must - be specified keyID/fingerprint. - - .. warning:: Care should be taken in Python2 to make sure that the - given fingerprints for **recipients** are in fact strings - and not unicode objects. - - :param str default_key: The keyID/fingerprint of the key to use for - signing. If given, **data** will be encrypted - *and* signed. - - :param str passphrase: If given, and **default_key** is also given, - use this passphrase to unlock the secret - portion of the **default_key** to sign the - encrypted **data**. Otherwise, if - **default_key** is not given, but **symmetric** - is ``True``, then use this passphrase as the - passphrase for symmetric encryption. Signing - and symmetric encryption should *not* be - combined when sending the **data** to other - recipients, else the passphrase to the secret - key would be shared with them. - - :param bool armor: If True, ascii armor the output; otherwise, the - output will be in binary format. (Default: True) - - :param bool encrypt: If True, encrypt the **data** using the - **recipients** public keys. (Default: True) - - :param bool symmetric: If True, encrypt the **data** to **recipients** - using a symmetric key. See the **passphrase** - parameter. Symmetric encryption and public key - encryption can be used simultaneously, and will - result in a ciphertext which is decryptable - with either the symmetric **passphrase** or one - of the corresponding private keys. - - :param bool always_trust: If True, ignore trust warnings on - **recipients** keys. If False, display trust - warnings. (default: True) - - :param str output: The output file to write to. If not specified, the - encrypted output is returned, and thus should be - stored as an object in Python. For example: - - - >>> import shutil - >>> import gnupg - >>> if os.path.exists("doctests"): - ... shutil.rmtree("doctests") - >>> gpg = gnupg.GPG(homedir="doctests") - >>> key_settings = gpg.gen_key_input(key_type='RSA', - ... key_length=1024, - ... key_usage='ESCA', - ... passphrase='foo') - >>> key = gpg.gen_key(key_settings) - >>> message = "The crow flies at midnight." - >>> encrypted = str(gpg.encrypt(message, key.printprint)) - >>> assert encrypted != message - >>> assert not encrypted.isspace() - >>> decrypted = str(gpg.decrypt(encrypted)) - >>> assert not decrypted.isspace() - >>> decrypted - 'The crow flies at midnight.' - - :param str cipher_algo: The cipher algorithm to use. To see available - algorithms with your version of GnuPG, do: - :command:`$ gpg --with-colons --list-config - ciphername`. The default **cipher_algo**, if - unspecified, is ``'AES256'``. - - :param str digest_algo: The hash digest to use. Again, to see which - hashes your GnuPG is capable of using, do: - :command:`$ gpg --with-colons --list-config - digestname`. The default, if unspecified, is - ``'SHA512'``. - - :param str compress_algo: The compression algorithm to use. Can be one - of ``'ZLIB'``, ``'BZIP2'``, ``'ZIP'``, or - ``'Uncompressed'``. - """ - args = [] - - if output: - if getattr(output, 'fileno', None) is not None: - ## avoid overwrite confirmation message - if getattr(output, 'name', None) is None: - if os.path.exists(output): - os.remove(output) - args.append('--output %s' % output) - else: - if os.path.exists(output.name): - os.remove(output.name) - args.append('--output %s' % output.name) - - if armor: args.append('--armor') - if always_trust: args.append('--always-trust') - if cipher_algo: args.append('--cipher-algo %s' % cipher_algo) - if compress_algo: args.append('--compress-algo %s' % compress_algo) - - if default_key: - args.append('--sign') - args.append('--default-key %s' % default_key) - if digest_algo: - args.append('--digest-algo %s' % digest_algo) - - ## both can be used at the same time for an encrypted file which - ## is decryptable with a passphrase or secretkey. - if symmetric: args.append('--symmetric') - if encrypt: args.append('--encrypt') - - if len(recipients) >= 1: - log.debug("GPG.encrypt() called for recipients '%s' with type '%s'" - % (recipients, type(recipients))) - - if isinstance(recipients, (list, tuple)): - for recp in recipients: - if not _util._py3k: - if isinstance(recp, unicode): - try: - assert _parsers._is_hex(str(recp)) - except AssertionError: - log.info("Can't accept recipient string: %s" - % recp) - else: - args.append('--recipient %s' % str(recp)) - continue - ## will give unicode in 2.x as '\uXXXX\uXXXX' - args.append('--recipient %r' % recp) - continue - if isinstance(recp, str): - args.append('--recipient %s' % recp) - - elif (not _util._py3k) and isinstance(recp, basestring): - for recp in recipients.split('\x20'): - args.append('--recipient %s' % recp) - - elif _util._py3k and isinstance(recp, str): - for recp in recipients.split(' '): - args.append('--recipient %s' % recp) - ## ...and now that we've proven py3k is better... - - else: - log.debug("Don't know what to do with recipients: '%s'" - % recipients) - - result = self._result_map['crypt'](self) - log.debug("Got data '%s' with type '%s'." - % (data, type(data))) - self._handle_io(args, data, result, - passphrase=passphrase, binary=True) - log.debug("\n%s" % result.data) - return result
-
- -
-
-
-
- -
-
-
- - - - - \ No newline at end of file -- cgit v1.2.3