summaryrefslogtreecommitdiff
path: root/gnupg/_util.py
diff options
context:
space:
mode:
Diffstat (limited to 'gnupg/_util.py')
-rw-r--r--gnupg/_util.py270
1 files changed, 224 insertions, 46 deletions
diff --git a/gnupg/_util.py b/gnupg/_util.py
index e1e14ab..79855ac 100644
--- a/gnupg/_util.py
+++ b/gnupg/_util.py
@@ -28,18 +28,58 @@ from time import mktime
import codecs
import encodings
import os
-import psutil
import threading
import random
import re
import string
import sys
+# These are all the classes which are stream-like; they are used in
+# :func:`_is_stream`.
+_STREAMLIKE_TYPES = []
+
+# These StringIO classes are actually utilised.
try:
+ import io
from io import StringIO
from io import BytesIO
except ImportError:
from cStringIO import StringIO
+else:
+ # The io.IOBase type covers the above example for an open file handle in
+ # Python3, as well as both io.BytesIO and io.StringIO.
+ _STREAMLIKE_TYPES.append(io.IOBase)
+
+# The remaining StringIO classes which are imported are used to determine if a
+# object is a stream-like in :func:`_is_stream`.
+if 2 == sys.version_info[0]:
+ # Import the StringIO class from the StringIO module since it is a
+ # commonly used stream class. It is distinct from either of the
+ # StringIO's that may be loaded in the above try/except clause, so the
+ # name is prefixed with an underscore to distinguish it.
+ from StringIO import StringIO as _StringIO_StringIO
+ _STREAMLIKE_TYPES.append(_StringIO_StringIO)
+
+ # Import the cStringIO module to test for the cStringIO stream types,
+ # InputType and OutputType. See
+ # http://stackoverflow.com/questions/14735295/to-check-an-instance-is-stringio
+ import cStringIO as _cStringIO
+ _STREAMLIKE_TYPES.append(_cStringIO.InputType)
+ _STREAMLIKE_TYPES.append(_cStringIO.OutputType)
+
+ # In Python2:
+ #
+ # >>> type(open('README.md', 'rb'))
+ # <open file 'README.md', mode 'rb' at 0x7f9493951d20>
+ #
+ # whereas, in Python3, the `file` builtin doesn't exist and instead we get:
+ #
+ # >>> type(open('README.md', 'rb'))
+ # <_io.BufferedReader name='README.md'>
+ #
+ # which is covered by the above addition of io.IOBase.
+ _STREAMLIKE_TYPES.append(file)
+
from . import _logger
@@ -56,6 +96,9 @@ try:
except NameError:
_py3k = True
+_running_windows = False
+if "win" in sys.platform:
+ _running_windows = True
## Directory shortcuts:
## we don't want to use this one because it writes to the install dir:
@@ -63,6 +106,20 @@ except NameError:
_here = os.path.join(os.getcwd(), 'gnupg') ## current dir
_test = os.path.join(os.path.join(_here, 'test'), 'tmp') ## ./tests/tmp
_user = os.environ.get('HOME') ## $HOME
+
+# Fix for Issue #74: we shouldn't expect that a $HOME directory is set in all
+# environs. https://github.com/isislovecruft/python-gnupg/issues/74
+if not _user:
+ _user = '/tmp/python-gnupg'
+ try:
+ os.makedirs(_user)
+ except (OSError, IOError):
+ _user = os.getcwd()
+ # If we can't use $HOME, but we have (or can create) a
+ # /tmp/python-gnupg/gnupghome directory, then we'll default to using
+ # that. Otherwise, we'll use the current directory + /gnupghome.
+ _user = os.path.sep.join([_user, 'gnupghome'])
+
_ugpg = os.path.join(_user, '.gnupg') ## $HOME/.gnupg
_conf = os.path.join(os.path.join(_user, '.config'), 'python-gnupg')
## $HOME/.config/python-gnupg
@@ -70,6 +127,9 @@ _conf = os.path.join(os.path.join(_user, '.config'), 'python-gnupg')
## Logger is disabled by default
log = _logger.create_logger(0)
+#: Compiled regex for determining a GnuPG binary's version:
+_VERSION_STRING_REGEX = re.compile('(\d)*(\.)*(\d)*(\.)*(\d)*')
+
def find_encodings(enc=None, system=False):
"""Find functions for encoding translations for a specific codec.
@@ -105,6 +165,51 @@ def find_encodings(enc=None, system=False):
return coder
+
+if _py3k:
+ def b(x):
+ """See http://python3porting.com/problems.html#nicer-solutions"""
+ coder = find_encodings()
+ if isinstance(x, bytes):
+ return coder.encode(x.decode(coder.name))[0]
+ else:
+ return coder.encode(x)[0]
+
+ def s(x):
+ if isinstance(x, str):
+ return x
+ elif isinstance(x, (bytes, bytearray)):
+ return x.decode(find_encodings().name)
+ else:
+ raise NotImplemented
+else:
+ def b(x):
+ """See http://python3porting.com/problems.html#nicer-solutions"""
+ return x
+
+ def s(x):
+ if isinstance(x, basestring):
+ return x
+ elif isinstance(x, (bytes, bytearray)):
+ return x.decode(find_encodings().name)
+ else:
+ raise NotImplemented
+
+def binary(data):
+ coder = find_encodings()
+
+ if _py3k and isinstance(data, bytes):
+ encoded = coder.encode(data.decode(coder.name))[0]
+ elif _py3k and isinstance(data, str):
+ encoded = coder.encode(data)[0]
+ elif not _py3k and type(data) is not str:
+ encoded = coder.encode(data)[0]
+ else:
+ encoded = data
+
+ return encoded
+
+
def author_info(name, contact=None, public_key=None):
"""Easy object-oriented representation of contributor info.
@@ -124,8 +229,6 @@ def _copy_data(instream, outstream):
"""
sent = 0
- coder = find_encodings()
-
while True:
if ((_py3k and isinstance(instream, str)) or
(not _py3k and isinstance(instream, basestring))):
@@ -135,24 +238,64 @@ def _copy_data(instream, outstream):
data = instream.read(1024)
if len(data) == 0:
break
+
sent += len(data)
- log.debug("Sending chunk %d bytes:\n%s"
- % (sent, data))
- try:
- outstream.write(data)
- except UnicodeError:
+ encoded = binary(data)
+ log.debug("Sending %d bytes of data..." % sent)
+ log.debug("Encoded data (type %s):\n%s" % (type(encoded), encoded))
+
+ if not _py3k:
try:
- outstream.write(coder.encode(data))
- except IOError:
- log.exception("Error sending data: Broken pipe")
+ outstream.write(encoded)
+ except IOError as ioe:
+ # Can get 'broken pipe' errors even when all data was sent
+ if 'Broken pipe' in str(ioe):
+ log.error('Error sending data: Broken pipe')
+ else:
+ log.exception(ioe)
break
- except IOError as ioe:
- # Can get 'broken pipe' errors even when all data was sent
- if 'Broken pipe' in str(ioe):
- log.error('Error sending data: Broken pipe')
else:
- log.exception(ioe)
- break
+ log.debug("Wrote data type <type 'str'> to outstream.")
+ else:
+ try:
+ outstream.write(bytes(encoded))
+ except TypeError as te:
+ # XXX FIXME This appears to happen because
+ # _threaded_copy_data() sometimes passes the `outstream` as an
+ # object with type <_io.BufferredWriter> and at other times
+ # with type <encodings.utf_8.StreamWriter>. We hit the
+ # following error when the `outstream` has type
+ # <encodings.utf_8.StreamWriter>.
+ if not "convert 'bytes' object to str implicitly" in str(te):
+ log.error(str(te))
+ try:
+ outstream.write(encoded.decode())
+ except TypeError as yate:
+ # We hit the "'str' does not support the buffer interface"
+ # error in Python3 when the `outstream` is an io.BytesIO and
+ # we try to write a str to it. We don't care about that
+ # error, we'll just try again with bytes.
+ if not "does not support the buffer interface" in str(yate):
+ log.error(str(yate))
+ except IOError as ioe:
+ # Can get 'broken pipe' errors even when all data was sent
+ if 'Broken pipe' in str(ioe):
+ log.error('Error sending data: Broken pipe')
+ else:
+ log.exception(ioe)
+ break
+ else:
+ log.debug("Wrote data type <class 'str'> outstream.")
+ except IOError as ioe:
+ # Can get 'broken pipe' errors even when all data was sent
+ if 'Broken pipe' in str(ioe):
+ log.error('Error sending data: Broken pipe')
+ else:
+ log.exception(ioe)
+ break
+ else:
+ log.debug("Wrote data type <class 'bytes'> to outstream.")
+
try:
outstream.close()
except IOError as ioe:
@@ -260,6 +403,8 @@ def _find_binary(binary=None):
"""
found = None
if binary is not None:
+ if os.path.isabs(binary) and os.path.isfile(binary):
+ return binary
if not os.path.isabs(binary):
try:
found = _which(binary)
@@ -272,7 +417,7 @@ def _find_binary(binary=None):
elif os.access(binary, os.X_OK):
found = binary
if found is None:
- try: found = _which('gpg')[0]
+ try: found = _which('gpg', abspath_only=True, disallow_symlinks=True)[0]
except IndexError as ie:
log.error("Could not find binary for 'gpg'.")
try: found = _which('gpg2')[0]
@@ -281,14 +426,7 @@ def _find_binary(binary=None):
if found is None:
raise RuntimeError("GnuPG is not installed!")
- try:
- assert os.path.isabs(found), "Path to gpg binary not absolute"
- assert not os.path.islink(found), "Path to gpg binary is symlink"
- assert os.access(found, os.X_OK), "Lacking +x perms for gpg binary"
- except (AssertionError, AttributeError) as ae:
- log.error(str(ae))
- else:
- return found
+ return found
def _has_readwrite(path):
"""
@@ -335,7 +473,32 @@ def _is_stream(input):
:rtype: bool
:returns: True if :param:input is a stream, False if otherwise.
"""
- return isinstance(input, BytesIO) or isinstance(input, StringIO)
+ return isinstance(input, tuple(_STREAMLIKE_TYPES))
+
+def _is_string(thing):
+ """Check that **thing** is a string. The definition of the latter depends
+ upon the Python version.
+
+ :param thing: The thing to check if it's a string.
+ :rtype: bool
+ :returns: ``True`` if **thing** is string (or unicode in Python2).
+ """
+ if (_py3k and isinstance(thing, str)):
+ return True
+ if (not _py3k and isinstance(thing, basestring)):
+ return True
+ return False
+
+def _is_bytes(thing):
+ """Check that **thing** is bytes.
+
+ :param thing: The thing to check if it's bytes.
+ :rtype: bool
+ :returns: ``True`` if **thing** is bytes or a bytearray.
+ """
+ if isinstance(thing, (bytes, bytearray)):
+ return True
+ return False
def _is_list_or_tuple(instance):
"""Check that ``instance`` is a list or tuple.
@@ -368,21 +531,26 @@ def _is_gpg2(version):
return True
return False
-def _make_binary_stream(s, encoding):
- """
- xxx fill me in
+def _make_binary_stream(thing, encoding=None, armor=True):
+ """Encode **thing**, then make it stream/file-like.
+
+ :param thing: The thing to turn into a encoded stream.
+ :rtype: ``io.BytesIO`` or ``io.StringIO``.
+ :returns: The encoded **thing**, wrapped in an ``io.BytesIO`` (if
+ available), otherwise wrapped in a ``io.StringIO``.
"""
+ if _py3k:
+ if isinstance(thing, str):
+ thing = thing.encode(encoding)
+ else:
+ if type(thing) is not str:
+ thing = thing.encode(encoding)
+
try:
- if _py3k:
- if isinstance(s, str):
- s = s.encode(encoding)
- else:
- if type(s) is not str:
- s = s.encode(encoding)
- from io import BytesIO
- rv = BytesIO(s)
- except ImportError:
- rv = StringIO(s)
+ rv = BytesIO(thing)
+ except NameError:
+ rv = StringIO(thing)
+
return rv
def _make_passphrase(length=None, save=False, file=None):
@@ -403,7 +571,7 @@ def _make_passphrase(length=None, save=False, file=None):
passphrase = _make_random_string(length)
if save:
- ruid, euid, suid = psutil.Process(os.getpid()).uids
+ ruid, euid, suid = os.getresuid()
gid = os.getgid()
now = mktime(localtime())
@@ -434,8 +602,7 @@ def _match_version_string(version):
:param str version: A version string in the form x.x.x
"""
- regex = re.compile('(\d)*(\.)*(\d)*(\.)*(\d)*')
- matched = regex.match(version)
+ matched = _VERSION_STRING_REGEX.match(version)
g = matched.groups()
major, minor, micro = int(g[0]), int(g[2]), int(g[4])
return (major, minor, micro)
@@ -485,7 +652,7 @@ def _utc_epoch():
"""Get the seconds since epoch."""
return int(mktime(localtime()))
-def _which(executable, flags=os.X_OK):
+def _which(executable, flags=os.X_OK, abspath_only=False, disallow_symlinks=False):
"""Borrowed from Twisted's :mod:twisted.python.proutils .
Search PATH for executable files with the given name.
@@ -508,6 +675,17 @@ def _which(executable, flags=os.X_OK):
:returns: A list of the full paths to files found, in the order in which
they were found.
"""
+ def _can_allow(p):
+ if not os.access(p, flags):
+ return False
+ if abspath_only and not os.path.abspath(p):
+ log.warn('Ignoring %r (path is not absolute)', p)
+ return False
+ if disallow_symlinks and os.path.islink(p):
+ log.warn('Ignoring %r (path is a symlink)', p)
+ return False
+ return True
+
result = []
exts = filter(None, os.environ.get('PATHEXT', '').split(os.pathsep))
path = os.environ.get('PATH', None)
@@ -515,11 +693,11 @@ def _which(executable, flags=os.X_OK):
return []
for p in os.environ.get('PATH', '').split(os.pathsep):
p = os.path.join(p, executable)
- if os.access(p, flags):
+ if _can_allow(p):
result.append(p)
for e in exts:
pext = p + e
- if os.access(pext, flags):
+ if _can_allow(pext):
result.append(pext)
return result