From 81e0e2bc82757425bebfb659e6c2cb873bc88ec9 Mon Sep 17 00:00:00 2001
From: kali <kali@leap.se>
Date: Wed, 8 Aug 2012 17:33:47 +0900
Subject: reset tests + run_tests script + very simple first test.

---
 tests/support_tests.py | 1725 ------------------------------------------------
 1 file changed, 1725 deletions(-)
 delete mode 100644 tests/support_tests.py

(limited to 'tests/support_tests.py')

diff --git a/tests/support_tests.py b/tests/support_tests.py
deleted file mode 100644
index 2c56e12d..00000000
--- a/tests/support_tests.py
+++ /dev/null
@@ -1,1725 +0,0 @@
-"""Supporting definitions for the Python regression tests."""
-
-if __name__ != 'test.support':
-    raise ImportError('support must be imported from the test package')
-
-import contextlib
-import errno
-import functools
-import gc
-import socket
-import sys
-import os
-import platform
-import shutil
-import warnings
-import unittest
-import importlib
-import collections.abc
-import re
-import subprocess
-import imp
-import time
-import sysconfig
-import fnmatch
-import logging.handlers
-import struct
-
-try:
-    import _thread, threading
-except ImportError:
-    _thread = None
-    threading = None
-try:
-    import multiprocessing.process
-except ImportError:
-    multiprocessing = None
-
-try:
-    import faulthandler
-except ImportError:
-    faulthandler = None
-
-try:
-    import zlib
-except ImportError:
-    zlib = None
-
-__all__ = [
-    "Error", "TestFailed", "ResourceDenied", "import_module",
-    "verbose", "use_resources", "max_memuse", "record_original_stdout",
-    "get_original_stdout", "unload", "unlink", "rmtree", "forget",
-    "is_resource_enabled", "requires", "requires_freebsd_version",
-    "requires_linux_version", "requires_mac_ver", "find_unused_port", "bind_port",
-    "IPV6_ENABLED", "is_jython", "TESTFN", "HOST", "SAVEDCWD", "temp_cwd",
-    "findfile", "create_empty_file", "sortdict", "check_syntax_error", "open_urlresource",
-    "check_warnings", "CleanImport", "EnvironmentVarGuard", "TransientResource",
-    "captured_stdout", "captured_stdin", "captured_stderr", "time_out",
-    "socket_peer_reset", "ioerror_peer_reset", "run_with_locale", 'temp_umask',
-    "transient_internet", "set_memlimit", "bigmemtest", "bigaddrspacetest",
-    "BasicTestRunner", "run_unittest", "run_doctest", "threading_setup",
-    "threading_cleanup", "reap_children", "cpython_only", "check_impl_detail",
-    "get_attribute", "swap_item", "swap_attr", "requires_IEEE_754",
-    "TestHandler", "Matcher", "can_symlink", "skip_unless_symlink",
-    "import_fresh_module", "requires_zlib", "PIPE_MAX_SIZE", "failfast",
-    "anticipate_failure"
-    ]
-
-class Error(Exception):
-    """Base class for regression test exceptions."""
-
-class TestFailed(Error):
-    """Test failed."""
-
-class ResourceDenied(unittest.SkipTest):
-    """Test skipped because it requested a disallowed resource.
-
-    This is raised when a test calls requires() for a resource that
-    has not be enabled.  It is used to distinguish between expected
-    and unexpected skips.
-    """
-
-@contextlib.contextmanager
-def _ignore_deprecated_imports(ignore=True):
-    """Context manager to suppress package and module deprecation
-    warnings when importing them.
-
-    If ignore is False, this context manager has no effect."""
-    if ignore:
-        with warnings.catch_warnings():
-            warnings.filterwarnings("ignore", ".+ (module|package)",
-                                    DeprecationWarning)
-            yield
-    else:
-        yield
-
-
-def import_module(name, deprecated=False):
-    """Import and return the module to be tested, raising SkipTest if
-    it is not available.
-
-    If deprecated is True, any module or package deprecation messages
-    will be suppressed."""
-    with _ignore_deprecated_imports(deprecated):
-        try:
-            return importlib.import_module(name)
-        except ImportError as msg:
-            raise unittest.SkipTest(str(msg))
-
-
-def _save_and_remove_module(name, orig_modules):
-    """Helper function to save and remove a module from sys.modules
-
-       Raise ImportError if the module can't be imported."""
-    # try to import the module and raise an error if it can't be imported
-    if name not in sys.modules:
-        __import__(name)
-        del sys.modules[name]
-    for modname in list(sys.modules):
-        if modname == name or modname.startswith(name + '.'):
-            orig_modules[modname] = sys.modules[modname]
-            del sys.modules[modname]
-
-def _save_and_block_module(name, orig_modules):
-    """Helper function to save and block a module in sys.modules
-
-       Return True if the module was in sys.modules, False otherwise."""
-    saved = True
-    try:
-        orig_modules[name] = sys.modules[name]
-    except KeyError:
-        saved = False
-    sys.modules[name] = None
-    return saved
-
-
-def anticipate_failure(condition):
-    """Decorator to mark a test that is known to be broken in some cases
-
-       Any use of this decorator should have a comment identifying the
-       associated tracker issue.
-    """
-    if condition:
-        return unittest.expectedFailure
-    return lambda f: f
-
-
-def import_fresh_module(name, fresh=(), blocked=(), deprecated=False):
-    """Imports and returns a module, deliberately bypassing the sys.modules cache
-    and importing a fresh copy of the module. Once the import is complete,
-    the sys.modules cache is restored to its original state.
-
-    Modules named in fresh are also imported anew if needed by the import.
-    If one of these modules can't be imported, None is returned.
-
-    Importing of modules named in blocked is prevented while the fresh import
-    takes place.
-
-    If deprecated is True, any module or package deprecation messages
-    will be suppressed."""
-    # NOTE: test_heapq, test_json and test_warnings include extra sanity checks
-    # to make sure that this utility function is working as expected
-    with _ignore_deprecated_imports(deprecated):
-        # Keep track of modules saved for later restoration as well
-        # as those which just need a blocking entry removed
-        orig_modules = {}
-        names_to_remove = []
-        _save_and_remove_module(name, orig_modules)
-        try:
-            for fresh_name in fresh:
-                _save_and_remove_module(fresh_name, orig_modules)
-            for blocked_name in blocked:
-                if not _save_and_block_module(blocked_name, orig_modules):
-                    names_to_remove.append(blocked_name)
-            fresh_module = importlib.import_module(name)
-        except ImportError:
-            fresh_module = None
-        finally:
-            for orig_name, module in orig_modules.items():
-                sys.modules[orig_name] = module
-            for name_to_remove in names_to_remove:
-                del sys.modules[name_to_remove]
-        return fresh_module
-
-
-def get_attribute(obj, name):
-    """Get an attribute, raising SkipTest if AttributeError is raised."""
-    try:
-        attribute = getattr(obj, name)
-    except AttributeError:
-        raise unittest.SkipTest("object %r has no attribute %r" % (obj, name))
-    else:
-        return attribute
-
-verbose = 1              # Flag set to 0 by regrtest.py
-use_resources = None     # Flag set to [] by regrtest.py
-max_memuse = 0           # Disable bigmem tests (they will still be run with
-                         # small sizes, to make sure they work.)
-real_max_memuse = 0
-failfast = False
-match_tests = None
-
-# _original_stdout is meant to hold stdout at the time regrtest began.
-# This may be "the real" stdout, or IDLE's emulation of stdout, or whatever.
-# The point is to have some flavor of stdout the user can actually see.
-_original_stdout = None
-def record_original_stdout(stdout):
-    global _original_stdout
-    _original_stdout = stdout
-
-def get_original_stdout():
-    return _original_stdout or sys.stdout
-
-def unload(name):
-    try:
-        del sys.modules[name]
-    except KeyError:
-        pass
-
-def unlink(filename):
-    try:
-        os.unlink(filename)
-    except OSError as error:
-        # The filename need not exist.
-        if error.errno not in (errno.ENOENT, errno.ENOTDIR):
-            raise
-
-def rmtree(path):
-    try:
-        shutil.rmtree(path)
-    except OSError as error:
-        if error.errno != errno.ENOENT:
-            raise
-
-def make_legacy_pyc(source):
-    """Move a PEP 3147 pyc/pyo file to its legacy pyc/pyo location.
-
-    The choice of .pyc or .pyo extension is done based on the __debug__ flag
-    value.
-
-    :param source: The file system path to the source file.  The source file
-        does not need to exist, however the PEP 3147 pyc file must exist.
-    :return: The file system path to the legacy pyc file.
-    """
-    pyc_file = imp.cache_from_source(source)
-    up_one = os.path.dirname(os.path.abspath(source))
-    legacy_pyc = os.path.join(up_one, source + ('c' if __debug__ else 'o'))
-    os.rename(pyc_file, legacy_pyc)
-    return legacy_pyc
-
-def forget(modname):
-    """'Forget' a module was ever imported.
-
-    This removes the module from sys.modules and deletes any PEP 3147 or
-    legacy .pyc and .pyo files.
-    """
-    unload(modname)
-    for dirname in sys.path:
-        source = os.path.join(dirname, modname + '.py')
-        # It doesn't matter if they exist or not, unlink all possible
-        # combinations of PEP 3147 and legacy pyc and pyo files.
-        unlink(source + 'c')
-        unlink(source + 'o')
-        unlink(imp.cache_from_source(source, debug_override=True))
-        unlink(imp.cache_from_source(source, debug_override=False))
-
-# On some platforms, should not run gui test even if it is allowed
-# in `use_resources'.
-if sys.platform.startswith('win'):
-    import ctypes
-    import ctypes.wintypes
-    def _is_gui_available():
-        UOI_FLAGS = 1
-        WSF_VISIBLE = 0x0001
-        class USEROBJECTFLAGS(ctypes.Structure):
-            _fields_ = [("fInherit", ctypes.wintypes.BOOL),
-                        ("fReserved", ctypes.wintypes.BOOL),
-                        ("dwFlags", ctypes.wintypes.DWORD)]
-        dll = ctypes.windll.user32
-        h = dll.GetProcessWindowStation()
-        if not h:
-            raise ctypes.WinError()
-        uof = USEROBJECTFLAGS()
-        needed = ctypes.wintypes.DWORD()
-        res = dll.GetUserObjectInformationW(h,
-            UOI_FLAGS,
-            ctypes.byref(uof),
-            ctypes.sizeof(uof),
-            ctypes.byref(needed))
-        if not res:
-            raise ctypes.WinError()
-        return bool(uof.dwFlags & WSF_VISIBLE)
-else:
-    def _is_gui_available():
-        return True
-
-def is_resource_enabled(resource):
-    """Test whether a resource is enabled.  Known resources are set by
-    regrtest.py."""
-    return use_resources is not None and resource in use_resources
-
-def requires(resource, msg=None):
-    """Raise ResourceDenied if the specified resource is not available.
-
-    If the caller's module is __main__ then automatically return True.  The
-    possibility of False being returned occurs when regrtest.py is
-    executing.
-    """
-    if resource == 'gui' and not _is_gui_available():
-        raise unittest.SkipTest("Cannot use the 'gui' resource")
-    # see if the caller's module is __main__ - if so, treat as if
-    # the resource was set
-    if sys._getframe(1).f_globals.get("__name__") == "__main__":
-        return
-    if not is_resource_enabled(resource):
-        if msg is None:
-            msg = "Use of the %r resource not enabled" % resource
-        raise ResourceDenied(msg)
-
-def _requires_unix_version(sysname, min_version):
-    """Decorator raising SkipTest if the OS is `sysname` and the version is less
-    than `min_version`.
-
-    For example, @_requires_unix_version('FreeBSD', (7, 2)) raises SkipTest if
-    the FreeBSD version is less than 7.2.
-    """
-    def decorator(func):
-        @functools.wraps(func)
-        def wrapper(*args, **kw):
-            if platform.system() == sysname:
-                version_txt = platform.release().split('-', 1)[0]
-                try:
-                    version = tuple(map(int, version_txt.split('.')))
-                except ValueError:
-                    pass
-                else:
-                    if version < min_version:
-                        min_version_txt = '.'.join(map(str, min_version))
-                        raise unittest.SkipTest(
-                            "%s version %s or higher required, not %s"
-                            % (sysname, min_version_txt, version_txt))
-        return wrapper
-    return decorator
-
-def requires_freebsd_version(*min_version):
-    """Decorator raising SkipTest if the OS is FreeBSD and the FreeBSD version is
-    less than `min_version`.
-
-    For example, @requires_freebsd_version(7, 2) raises SkipTest if the FreeBSD
-    version is less than 7.2.
-    """
-    return _requires_unix_version('FreeBSD', min_version)
-
-def requires_linux_version(*min_version):
-    """Decorator raising SkipTest if the OS is Linux and the Linux version is
-    less than `min_version`.
-
-    For example, @requires_linux_version(2, 6, 32) raises SkipTest if the Linux
-    version is less than 2.6.32.
-    """
-    return _requires_unix_version('Linux', min_version)
-
-def requires_mac_ver(*min_version):
-    """Decorator raising SkipTest if the OS is Mac OS X and the OS X
-    version if less than min_version.
-
-    For example, @requires_mac_ver(10, 5) raises SkipTest if the OS X version
-    is lesser than 10.5.
-    """
-    def decorator(func):
-        @functools.wraps(func)
-        def wrapper(*args, **kw):
-            if sys.platform == 'darwin':
-                version_txt = platform.mac_ver()[0]
-                try:
-                    version = tuple(map(int, version_txt.split('.')))
-                except ValueError:
-                    pass
-                else:
-                    if version < min_version:
-                        min_version_txt = '.'.join(map(str, min_version))
-                        raise unittest.SkipTest(
-                            "Mac OS X %s or higher required, not %s"
-                            % (min_version_txt, version_txt))
-            return func(*args, **kw)
-        wrapper.min_version = min_version
-        return wrapper
-    return decorator
-
-
-HOST = 'localhost'
-
-def find_unused_port(family=socket.AF_INET, socktype=socket.SOCK_STREAM):
-    """Returns an unused port that should be suitable for binding.  This is
-    achieved by creating a temporary socket with the same family and type as
-    the 'sock' parameter (default is AF_INET, SOCK_STREAM), and binding it to
-    the specified host address (defaults to 0.0.0.0) with the port set to 0,
-    eliciting an unused ephemeral port from the OS.  The temporary socket is
-    then closed and deleted, and the ephemeral port is returned.
-
-    Either this method or bind_port() should be used for any tests where a
-    server socket needs to be bound to a particular port for the duration of
-    the test.  Which one to use depends on whether the calling code is creating
-    a python socket, or if an unused port needs to be provided in a constructor
-    or passed to an external program (i.e. the -accept argument to openssl's
-    s_server mode).  Always prefer bind_port() over find_unused_port() where
-    possible.  Hard coded ports should *NEVER* be used.  As soon as a server
-    socket is bound to a hard coded port, the ability to run multiple instances
-    of the test simultaneously on the same host is compromised, which makes the
-    test a ticking time bomb in a buildbot environment. On Unix buildbots, this
-    may simply manifest as a failed test, which can be recovered from without
-    intervention in most cases, but on Windows, the entire python process can
-    completely and utterly wedge, requiring someone to log in to the buildbot
-    and manually kill the affected process.
-
-    (This is easy to reproduce on Windows, unfortunately, and can be traced to
-    the SO_REUSEADDR socket option having different semantics on Windows versus
-    Unix/Linux.  On Unix, you can't have two AF_INET SOCK_STREAM sockets bind,
-    listen and then accept connections on identical host/ports.  An EADDRINUSE
-    socket.error will be raised at some point (depending on the platform and
-    the order bind and listen were called on each socket).
-
-    However, on Windows, if SO_REUSEADDR is set on the sockets, no EADDRINUSE
-    will ever be raised when attempting to bind two identical host/ports. When
-    accept() is called on each socket, the second caller's process will steal
-    the port from the first caller, leaving them both in an awkwardly wedged
-    state where they'll no longer respond to any signals or graceful kills, and
-    must be forcibly killed via OpenProcess()/TerminateProcess().
-
-    The solution on Windows is to use the SO_EXCLUSIVEADDRUSE socket option
-    instead of SO_REUSEADDR, which effectively affords the same semantics as
-    SO_REUSEADDR on Unix.  Given the propensity of Unix developers in the Open
-    Source world compared to Windows ones, this is a common mistake.  A quick
-    look over OpenSSL's 0.9.8g source shows that they use SO_REUSEADDR when
-    openssl.exe is called with the 's_server' option, for example. See
-    http://bugs.python.org/issue2550 for more info.  The following site also
-    has a very thorough description about the implications of both REUSEADDR
-    and EXCLUSIVEADDRUSE on Windows:
-    http://msdn2.microsoft.com/en-us/library/ms740621(VS.85).aspx)
-
-    XXX: although this approach is a vast improvement on previous attempts to
-    elicit unused ports, it rests heavily on the assumption that the ephemeral
-    port returned to us by the OS won't immediately be dished back out to some
-    other process when we close and delete our temporary socket but before our
-    calling code has a chance to bind the returned port.  We can deal with this
-    issue if/when we come across it.
-    """
-
-    tempsock = socket.socket(family, socktype)
-    port = bind_port(tempsock)
-    tempsock.close()
-    del tempsock
-    return port
-
-def bind_port(sock, host=HOST):
-    """Bind the socket to a free port and return the port number.  Relies on
-    ephemeral ports in order to ensure we are using an unbound port.  This is
-    important as many tests may be running simultaneously, especially in a
-    buildbot environment.  This method raises an exception if the sock.family
-    is AF_INET and sock.type is SOCK_STREAM, *and* the socket has SO_REUSEADDR
-    or SO_REUSEPORT set on it.  Tests should *never* set these socket options
-    for TCP/IP sockets.  The only case for setting these options is testing
-    multicasting via multiple UDP sockets.
-
-    Additionally, if the SO_EXCLUSIVEADDRUSE socket option is available (i.e.
-    on Windows), it will be set on the socket.  This will prevent anyone else
-    from bind()'ing to our host/port for the duration of the test.
-    """
-
-    if sock.family == socket.AF_INET and sock.type == socket.SOCK_STREAM:
-        if hasattr(socket, 'SO_REUSEADDR'):
-            if sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR) == 1:
-                raise TestFailed("tests should never set the SO_REUSEADDR "   \
-                                 "socket option on TCP/IP sockets!")
-        if hasattr(socket, 'SO_REUSEPORT'):
-            if sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT) == 1:
-                raise TestFailed("tests should never set the SO_REUSEPORT "   \
-                                 "socket option on TCP/IP sockets!")
-        if hasattr(socket, 'SO_EXCLUSIVEADDRUSE'):
-            sock.setsockopt(socket.SOL_SOCKET, socket.SO_EXCLUSIVEADDRUSE, 1)
-
-    sock.bind((host, 0))
-    port = sock.getsockname()[1]
-    return port
-
-def _is_ipv6_enabled():
-    """Check whether IPv6 is enabled on this host."""
-    if socket.has_ipv6:
-        try:
-            sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
-            sock.bind(('::1', 0))
-        except (socket.error, socket.gaierror):
-            pass
-        else:
-            sock.close()
-            return True
-    return False
-
-IPV6_ENABLED = _is_ipv6_enabled()
-
-
-# A constant likely larger than the underlying OS pipe buffer size.
-# Windows limit seems to be around 512B, and most Unix kernels have a 64K pipe
-# buffer size: take 1M to be sure.
-PIPE_MAX_SIZE = 1024 * 1024
-
-
-# decorator for skipping tests on non-IEEE 754 platforms
-requires_IEEE_754 = unittest.skipUnless(
-    float.__getformat__("double").startswith("IEEE"),
-    "test requires IEEE 754 doubles")
-
-requires_zlib = unittest.skipUnless(zlib, 'requires zlib')
-
-is_jython = sys.platform.startswith('java')
-
-# Filename used for testing
-if os.name == 'java':
-    # Jython disallows @ in module names
-    TESTFN = '$test'
-else:
-    TESTFN = '@test'
-
-# Disambiguate TESTFN for parallel testing, while letting it remain a valid
-# module name.
-TESTFN = "{}_{}_tmp".format(TESTFN, os.getpid())
-
-
-# TESTFN_UNICODE is a non-ascii filename
-TESTFN_UNICODE = TESTFN + "-\xe0\xf2\u0258\u0141\u011f"
-if sys.platform == 'darwin':
-    # In Mac OS X's VFS API file names are, by definition, canonically
-    # decomposed Unicode, encoded using UTF-8. See QA1173:
-    # http://developer.apple.com/mac/library/qa/qa2001/qa1173.html
-    import unicodedata
-    TESTFN_UNICODE = unicodedata.normalize('NFD', TESTFN_UNICODE)
-TESTFN_ENCODING = sys.getfilesystemencoding()
-
-# TESTFN_UNENCODABLE is a filename (str type) that should *not* be able to be
-# encoded by the filesystem encoding (in strict mode). It can be None if we
-# cannot generate such filename.
-TESTFN_UNENCODABLE = None
-if os.name in ('nt', 'ce'):
-    # skip win32s (0) or Windows 9x/ME (1)
-    if sys.getwindowsversion().platform >= 2:
-        # Different kinds of characters from various languages to minimize the
-        # probability that the whole name is encodable to MBCS (issue #9819)
-        TESTFN_UNENCODABLE = TESTFN + "-\u5171\u0141\u2661\u0363\uDC80"
-        try:
-            TESTFN_UNENCODABLE.encode(TESTFN_ENCODING)
-        except UnicodeEncodeError:
-            pass
-        else:
-            print('WARNING: The filename %r CAN be encoded by the filesystem encoding (%s). '
-                  'Unicode filename tests may not be effective'
-                  % (TESTFN_UNENCODABLE, TESTFN_ENCODING))
-            TESTFN_UNENCODABLE = None
-# Mac OS X denies unencodable filenames (invalid utf-8)
-elif sys.platform != 'darwin':
-    try:
-        # ascii and utf-8 cannot encode the byte 0xff
-        b'\xff'.decode(TESTFN_ENCODING)
-    except UnicodeDecodeError:
-        # 0xff will be encoded using the surrogate character u+DCFF
-        TESTFN_UNENCODABLE = TESTFN \
-            + b'-\xff'.decode(TESTFN_ENCODING, 'surrogateescape')
-    else:
-        # File system encoding (eg. ISO-8859-* encodings) can encode
-        # the byte 0xff. Skip some unicode filename tests.
-        pass
-
-# Save the initial cwd
-SAVEDCWD = os.getcwd()
-
-@contextlib.contextmanager
-def temp_cwd(name='tempcwd', quiet=False, path=None):
-    """
-    Context manager that temporarily changes the CWD.
-
-    An existing path may be provided as *path*, in which case this
-    function makes no changes to the file system.
-
-    Otherwise, the new CWD is created in the current directory and it's
-    named *name*. If *quiet* is False (default) and it's not possible to
-    create or change the CWD, an error is raised.  If it's True, only a
-    warning is raised and the original CWD is used.
-    """
-    saved_dir = os.getcwd()
-    is_temporary = False
-    if path is None:
-        path = name
-        try:
-            os.mkdir(name)
-            is_temporary = True
-        except OSError:
-            if not quiet:
-                raise
-            warnings.warn('tests may fail, unable to create temp CWD ' + name,
-                          RuntimeWarning, stacklevel=3)
-    try:
-        os.chdir(path)
-    except OSError:
-        if not quiet:
-            raise
-        warnings.warn('tests may fail, unable to change the CWD to ' + name,
-                      RuntimeWarning, stacklevel=3)
-    try:
-        yield os.getcwd()
-    finally:
-        os.chdir(saved_dir)
-        if is_temporary:
-            rmtree(name)
-
-
-if hasattr(os, "umask"):
-    @contextlib.contextmanager
-    def temp_umask(umask):
-        """Context manager that temporarily sets the process umask."""
-        oldmask = os.umask(umask)
-        try:
-            yield
-        finally:
-            os.umask(oldmask)
-
-
-def findfile(file, here=__file__, subdir=None):
-    """Try to find a file on sys.path and the working directory.  If it is not
-    found the argument passed to the function is returned (this does not
-    necessarily signal failure; could still be the legitimate path)."""
-    if os.path.isabs(file):
-        return file
-    if subdir is not None:
-        file = os.path.join(subdir, file)
-    path = sys.path
-    path = [os.path.dirname(here)] + path
-    for dn in path:
-        fn = os.path.join(dn, file)
-        if os.path.exists(fn): return fn
-    return file
-
-def create_empty_file(filename):
-    """Create an empty file. If the file already exists, truncate it."""
-    fd = os.open(filename, os.O_WRONLY | os.O_CREAT | os.O_TRUNC)
-    os.close(fd)
-
-def sortdict(dict):
-    "Like repr(dict), but in sorted order."
-    items = sorted(dict.items())
-    reprpairs = ["%r: %r" % pair for pair in items]
-    withcommas = ", ".join(reprpairs)
-    return "{%s}" % withcommas
-
-def make_bad_fd():
-    """
-    Create an invalid file descriptor by opening and closing a file and return
-    its fd.
-    """
-    file = open(TESTFN, "wb")
-    try:
-        return file.fileno()
-    finally:
-        file.close()
-        unlink(TESTFN)
-
-def check_syntax_error(testcase, statement):
-    testcase.assertRaises(SyntaxError, compile, statement,
-                          '<test string>', 'exec')
-
-def open_urlresource(url, *args, **kw):
-    import urllib.request, urllib.parse
-
-    check = kw.pop('check', None)
-
-    filename = urllib.parse.urlparse(url)[2].split('/')[-1] # '/': it's URL!
-
-    fn = os.path.join(os.path.dirname(__file__), "data", filename)
-
-    def check_valid_file(fn):
-        f = open(fn, *args, **kw)
-        if check is None:
-            return f
-        elif check(f):
-            f.seek(0)
-            return f
-        f.close()
-
-    if os.path.exists(fn):
-        f = check_valid_file(fn)
-        if f is not None:
-            return f
-        unlink(fn)
-
-    # Verify the requirement before downloading the file
-    requires('urlfetch')
-
-    print('\tfetching %s ...' % url, file=get_original_stdout())
-    f = urllib.request.urlopen(url, timeout=15)
-    try:
-        with open(fn, "wb") as out:
-            s = f.read()
-            while s:
-                out.write(s)
-                s = f.read()
-    finally:
-        f.close()
-
-    f = check_valid_file(fn)
-    if f is not None:
-        return f
-    raise TestFailed('invalid resource %r' % fn)
-
-
-class WarningsRecorder(object):
-    """Convenience wrapper for the warnings list returned on
-       entry to the warnings.catch_warnings() context manager.
-    """
-    def __init__(self, warnings_list):
-        self._warnings = warnings_list
-        self._last = 0
-
-    def __getattr__(self, attr):
-        if len(self._warnings) > self._last:
-            return getattr(self._warnings[-1], attr)
-        elif attr in warnings.WarningMessage._WARNING_DETAILS:
-            return None
-        raise AttributeError("%r has no attribute %r" % (self, attr))
-
-    @property
-    def warnings(self):
-        return self._warnings[self._last:]
-
-    def reset(self):
-        self._last = len(self._warnings)
-
-
-def _filterwarnings(filters, quiet=False):
-    """Catch the warnings, then check if all the expected
-    warnings have been raised and re-raise unexpected warnings.
-    If 'quiet' is True, only re-raise the unexpected warnings.
-    """
-    # Clear the warning registry of the calling module
-    # in order to re-raise the warnings.
-    frame = sys._getframe(2)
-    registry = frame.f_globals.get('__warningregistry__')
-    if registry:
-        registry.clear()
-    with warnings.catch_warnings(record=True) as w:
-        # Set filter "always" to record all warnings.  Because
-        # test_warnings swap the module, we need to look up in
-        # the sys.modules dictionary.
-        sys.modules['warnings'].simplefilter("always")
-        yield WarningsRecorder(w)
-    # Filter the recorded warnings
-    reraise = list(w)
-    missing = []
-    for msg, cat in filters:
-        seen = False
-        for w in reraise[:]:
-            warning = w.message
-            # Filter out the matching messages
-            if (re.match(msg, str(warning), re.I) and
-                issubclass(warning.__class__, cat)):
-                seen = True
-                reraise.remove(w)
-        if not seen and not quiet:
-            # This filter caught nothing
-            missing.append((msg, cat.__name__))
-    if reraise:
-        raise AssertionError("unhandled warning %s" % reraise[0])
-    if missing:
-        raise AssertionError("filter (%r, %s) did not catch any warning" %
-                             missing[0])
-
-
-@contextlib.contextmanager
-def check_warnings(*filters, **kwargs):
-    """Context manager to silence warnings.
-
-    Accept 2-tuples as positional arguments:
-        ("message regexp", WarningCategory)
-
-    Optional argument:
-     - if 'quiet' is True, it does not fail if a filter catches nothing
-        (default True without argument,
-         default False if some filters are defined)
-
-    Without argument, it defaults to:
-        check_warnings(("", Warning), quiet=True)
-    """
-    quiet = kwargs.get('quiet')
-    if not filters:
-        filters = (("", Warning),)
-        # Preserve backward compatibility
-        if quiet is None:
-            quiet = True
-    return _filterwarnings(filters, quiet)
-
-
-class CleanImport(object):
-    """Context manager to force import to return a new module reference.
-
-    This is useful for testing module-level behaviours, such as
-    the emission of a DeprecationWarning on import.
-
-    Use like this:
-
-        with CleanImport("foo"):
-            importlib.import_module("foo") # new reference
-    """
-
-    def __init__(self, *module_names):
-        self.original_modules = sys.modules.copy()
-        for module_name in module_names:
-            if module_name in sys.modules:
-                module = sys.modules[module_name]
-                # It is possible that module_name is just an alias for
-                # another module (e.g. stub for modules renamed in 3.x).
-                # In that case, we also need delete the real module to clear
-                # the import cache.
-                if module.__name__ != module_name:
-                    del sys.modules[module.__name__]
-                del sys.modules[module_name]
-
-    def __enter__(self):
-        return self
-
-    def __exit__(self, *ignore_exc):
-        sys.modules.update(self.original_modules)
-
-
-class EnvironmentVarGuard(collections.abc.MutableMapping):
-
-    """Class to help protect the environment variable properly.  Can be used as
-    a context manager."""
-
-    def __init__(self):
-        self._environ = os.environ
-        self._changed = {}
-
-    def __getitem__(self, envvar):
-        return self._environ[envvar]
-
-    def __setitem__(self, envvar, value):
-        # Remember the initial value on the first access
-        if envvar not in self._changed:
-            self._changed[envvar] = self._environ.get(envvar)
-        self._environ[envvar] = value
-
-    def __delitem__(self, envvar):
-        # Remember the initial value on the first access
-        if envvar not in self._changed:
-            self._changed[envvar] = self._environ.get(envvar)
-        if envvar in self._environ:
-            del self._environ[envvar]
-
-    def keys(self):
-        return self._environ.keys()
-
-    def __iter__(self):
-        return iter(self._environ)
-
-    def __len__(self):
-        return len(self._environ)
-
-    def set(self, envvar, value):
-        self[envvar] = value
-
-    def unset(self, envvar):
-        del self[envvar]
-
-    def __enter__(self):
-        return self
-
-    def __exit__(self, *ignore_exc):
-        for (k, v) in self._changed.items():
-            if v is None:
-                if k in self._environ:
-                    del self._environ[k]
-            else:
-                self._environ[k] = v
-        os.environ = self._environ
-
-
-class DirsOnSysPath(object):
-    """Context manager to temporarily add directories to sys.path.
-
-    This makes a copy of sys.path, appends any directories given
-    as positional arguments, then reverts sys.path to the copied
-    settings when the context ends.
-
-    Note that *all* sys.path modifications in the body of the
-    context manager, including replacement of the object,
-    will be reverted at the end of the block.
-    """
-
-    def __init__(self, *paths):
-        self.original_value = sys.path[:]
-        self.original_object = sys.path
-        sys.path.extend(paths)
-
-    def __enter__(self):
-        return self
-
-    def __exit__(self, *ignore_exc):
-        sys.path = self.original_object
-        sys.path[:] = self.original_value
-
-
-class TransientResource(object):
-
-    """Raise ResourceDenied if an exception is raised while the context manager
-    is in effect that matches the specified exception and attributes."""
-
-    def __init__(self, exc, **kwargs):
-        self.exc = exc
-        self.attrs = kwargs
-
-    def __enter__(self):
-        return self
-
-    def __exit__(self, type_=None, value=None, traceback=None):
-        """If type_ is a subclass of self.exc and value has attributes matching
-        self.attrs, raise ResourceDenied.  Otherwise let the exception
-        propagate (if any)."""
-        if type_ is not None and issubclass(self.exc, type_):
-            for attr, attr_value in self.attrs.items():
-                if not hasattr(value, attr):
-                    break
-                if getattr(value, attr) != attr_value:
-                    break
-            else:
-                raise ResourceDenied("an optional resource is not available")
-
-# Context managers that raise ResourceDenied when various issues
-# with the Internet connection manifest themselves as exceptions.
-# XXX deprecate these and use transient_internet() instead
-time_out = TransientResource(IOError, errno=errno.ETIMEDOUT)
-socket_peer_reset = TransientResource(socket.error, errno=errno.ECONNRESET)
-ioerror_peer_reset = TransientResource(IOError, errno=errno.ECONNRESET)
-
-
-@contextlib.contextmanager
-def transient_internet(resource_name, *, timeout=30.0, errnos=()):
-    """Return a context manager that raises ResourceDenied when various issues
-    with the Internet connection manifest themselves as exceptions."""
-    default_errnos = [
-        ('ECONNREFUSED', 111),
-        ('ECONNRESET', 104),
-        ('EHOSTUNREACH', 113),
-        ('ENETUNREACH', 101),
-        ('ETIMEDOUT', 110),
-    ]
-    default_gai_errnos = [
-        ('EAI_AGAIN', -3),
-        ('EAI_FAIL', -4),
-        ('EAI_NONAME', -2),
-        ('EAI_NODATA', -5),
-        # Encountered when trying to resolve IPv6-only hostnames
-        ('WSANO_DATA', 11004),
-    ]
-
-    denied = ResourceDenied("Resource %r is not available" % resource_name)
-    captured_errnos = errnos
-    gai_errnos = []
-    if not captured_errnos:
-        captured_errnos = [getattr(errno, name, num)
-                           for (name, num) in default_errnos]
-        gai_errnos = [getattr(socket, name, num)
-                      for (name, num) in default_gai_errnos]
-
-    def filter_error(err):
-        n = getattr(err, 'errno', None)
-        if (isinstance(err, socket.timeout) or
-            (isinstance(err, socket.gaierror) and n in gai_errnos) or
-            n in captured_errnos):
-            if not verbose:
-                sys.stderr.write(denied.args[0] + "\n")
-            raise denied from err
-
-    old_timeout = socket.getdefaulttimeout()
-    try:
-        if timeout is not None:
-            socket.setdefaulttimeout(timeout)
-        yield
-    except IOError as err:
-        # urllib can wrap original socket errors multiple times (!), we must
-        # unwrap to get at the original error.
-        while True:
-            a = err.args
-            if len(a) >= 1 and isinstance(a[0], IOError):
-                err = a[0]
-            # The error can also be wrapped as args[1]:
-            #    except socket.error as msg:
-            #        raise IOError('socket error', msg).with_traceback(sys.exc_info()[2])
-            elif len(a) >= 2 and isinstance(a[1], IOError):
-                err = a[1]
-            else:
-                break
-        filter_error(err)
-        raise
-    # XXX should we catch generic exceptions and look for their
-    # __cause__ or __context__?
-    finally:
-        socket.setdefaulttimeout(old_timeout)
-
-
-@contextlib.contextmanager
-def captured_output(stream_name):
-    """Return a context manager used by captured_stdout/stdin/stderr
-    that temporarily replaces the sys stream *stream_name* with a StringIO."""
-    import io
-    orig_stdout = getattr(sys, stream_name)
-    setattr(sys, stream_name, io.StringIO())
-    try:
-        yield getattr(sys, stream_name)
-    finally:
-        setattr(sys, stream_name, orig_stdout)
-
-def captured_stdout():
-    """Capture the output of sys.stdout:
-
-       with captured_stdout() as s:
-           print("hello")
-       self.assertEqual(s.getvalue(), "hello")
-    """
-    return captured_output("stdout")
-
-def captured_stderr():
-    return captured_output("stderr")
-
-def captured_stdin():
-    return captured_output("stdin")
-
-
-def gc_collect():
-    """Force as many objects as possible to be collected.
-
-    In non-CPython implementations of Python, this is needed because timely
-    deallocation is not guaranteed by the garbage collector.  (Even in CPython
-    this can be the case in case of reference cycles.)  This means that __del__
-    methods may be called later than expected and weakrefs may remain alive for
-    longer than expected.  This function tries its best to force all garbage
-    objects to disappear.
-    """
-    gc.collect()
-    if is_jython:
-        time.sleep(0.1)
-    gc.collect()
-    gc.collect()
-
-@contextlib.contextmanager
-def disable_gc():
-    have_gc = gc.isenabled()
-    gc.disable()
-    try:
-        yield
-    finally:
-        if have_gc:
-            gc.enable()
-
-
-def python_is_optimized():
-    """Find if Python was built with optimizations."""
-    cflags = sysconfig.get_config_var('PY_CFLAGS') or ''
-    final_opt = ""
-    for opt in cflags.split():
-        if opt.startswith('-O'):
-            final_opt = opt
-    return final_opt != '' and final_opt != '-O0'
-
-
-#=======================================================================
-# Decorator for running a function in a different locale, correctly resetting
-# it afterwards.
-
-def run_with_locale(catstr, *locales):
-    def decorator(func):
-        def inner(*args, **kwds):
-            try:
-                import locale
-                category = getattr(locale, catstr)
-                orig_locale = locale.setlocale(category)
-            except AttributeError:
-                # if the test author gives us an invalid category string
-                raise
-            except:
-                # cannot retrieve original locale, so do nothing
-                locale = orig_locale = None
-            else:
-                for loc in locales:
-                    try:
-                        locale.setlocale(category, loc)
-                        break
-                    except:
-                        pass
-
-            # now run the function, resetting the locale on exceptions
-            try:
-                return func(*args, **kwds)
-            finally:
-                if locale and orig_locale:
-                    locale.setlocale(category, orig_locale)
-        inner.__name__ = func.__name__
-        inner.__doc__ = func.__doc__
-        return inner
-    return decorator
-
-#=======================================================================
-# Big-memory-test support. Separate from 'resources' because memory use
-# should be configurable.
-
-# Some handy shorthands. Note that these are used for byte-limits as well
-# as size-limits, in the various bigmem tests
-_1M = 1024*1024
-_1G = 1024 * _1M
-_2G = 2 * _1G
-_4G = 4 * _1G
-
-MAX_Py_ssize_t = sys.maxsize
-
-def set_memlimit(limit):
-    global max_memuse
-    global real_max_memuse
-    sizes = {
-        'k': 1024,
-        'm': _1M,
-        'g': _1G,
-        't': 1024*_1G,
-    }
-    m = re.match(r'(\d+(\.\d+)?) (K|M|G|T)b?$', limit,
-                 re.IGNORECASE | re.VERBOSE)
-    if m is None:
-        raise ValueError('Invalid memory limit %r' % (limit,))
-    memlimit = int(float(m.group(1)) * sizes[m.group(3).lower()])
-    real_max_memuse = memlimit
-    if memlimit > MAX_Py_ssize_t:
-        memlimit = MAX_Py_ssize_t
-    if memlimit < _2G - 1:
-        raise ValueError('Memory limit %r too low to be useful' % (limit,))
-    max_memuse = memlimit
-
-class _MemoryWatchdog:
-    """An object which periodically watches the process' memory consumption
-    and prints it out.
-    """
-
-    def __init__(self):
-        self.procfile = '/proc/{pid}/statm'.format(pid=os.getpid())
-        self.started = False
-        self.thread = None
-        try:
-            self.page_size = os.sysconf('SC_PAGESIZE')
-        except (ValueError, AttributeError):
-            try:
-                self.page_size = os.sysconf('SC_PAGE_SIZE')
-            except (ValueError, AttributeError):
-                self.page_size = 4096
-
-    def consumer(self, fd):
-        HEADER = "l"
-        header_size = struct.calcsize(HEADER)
-        try:
-            while True:
-                header = os.read(fd, header_size)
-                if len(header) < header_size:
-                    # Pipe closed on other end
-                    break
-                data_len, = struct.unpack(HEADER, header)
-                data = os.read(fd, data_len)
-                statm = data.decode('ascii')
-                data = int(statm.split()[5])
-                print(" ... process data size: {data:.1f}G"
-                       .format(data=data * self.page_size / (1024 ** 3)))
-        finally:
-            os.close(fd)
-
-    def start(self):
-        if not faulthandler or not hasattr(faulthandler, '_file_watchdog'):
-            return
-        try:
-            rfd = os.open(self.procfile, os.O_RDONLY)
-        except OSError as e:
-            warnings.warn('/proc not available for stats: {}'.format(e),
-                          RuntimeWarning)
-            sys.stderr.flush()
-            return
-        pipe_fd, wfd = os.pipe()
-        # _file_watchdog() doesn't take the GIL in its child thread, and
-        # therefore collects statistics timely
-        faulthandler._file_watchdog(rfd, wfd, 1.0)
-        self.started = True
-        self.thread = threading.Thread(target=self.consumer, args=(pipe_fd,))
-        self.thread.daemon = True
-        self.thread.start()
-
-    def stop(self):
-        if not self.started:
-            return
-        faulthandler._cancel_file_watchdog()
-        self.thread.join()
-
-
-def bigmemtest(size, memuse, dry_run=True):
-    """Decorator for bigmem tests.
-
-    'minsize' is the minimum useful size for the test (in arbitrary,
-    test-interpreted units.) 'memuse' is the number of 'bytes per size' for
-    the test, or a good estimate of it.
-
-    if 'dry_run' is False, it means the test doesn't support dummy runs
-    when -M is not specified.
-    """
-    def decorator(f):
-        def wrapper(self):
-            size = wrapper.size
-            memuse = wrapper.memuse
-            if not real_max_memuse:
-                maxsize = 5147
-            else:
-                maxsize = size
-
-            if ((real_max_memuse or not dry_run)
-                and real_max_memuse < maxsize * memuse):
-                raise unittest.SkipTest(
-                    "not enough memory: %.1fG minimum needed"
-                    % (size * memuse / (1024 ** 3)))
-
-            if real_max_memuse and verbose and faulthandler and threading:
-                print()
-                print(" ... expected peak memory use: {peak:.1f}G"
-                      .format(peak=size * memuse / (1024 ** 3)))
-                watchdog = _MemoryWatchdog()
-                watchdog.start()
-            else:
-                watchdog = None
-
-            try:
-                return f(self, maxsize)
-            finally:
-                if watchdog:
-                    watchdog.stop()
-
-        wrapper.size = size
-        wrapper.memuse = memuse
-        return wrapper
-    return decorator
-
-def bigaddrspacetest(f):
-    """Decorator for tests that fill the address space."""
-    def wrapper(self):
-        if max_memuse < MAX_Py_ssize_t:
-            if MAX_Py_ssize_t >= 2**63 - 1 and max_memuse >= 2**31:
-                raise unittest.SkipTest(
-                    "not enough memory: try a 32-bit build instead")
-            else:
-                raise unittest.SkipTest(
-                    "not enough memory: %.1fG minimum needed"
-                    % (MAX_Py_ssize_t / (1024 ** 3)))
-        else:
-            return f(self)
-    return wrapper
-
-#=======================================================================
-# unittest integration.
-
-class BasicTestRunner:
-    def run(self, test):
-        result = unittest.TestResult()
-        test(result)
-        return result
-
-def _id(obj):
-    return obj
-
-def requires_resource(resource):
-    if resource == 'gui' and not _is_gui_available():
-        return unittest.skip("resource 'gui' is not available")
-    if is_resource_enabled(resource):
-        return _id
-    else:
-        return unittest.skip("resource {0!r} is not enabled".format(resource))
-
-def cpython_only(test):
-    """
-    Decorator for tests only applicable on CPython.
-    """
-    return impl_detail(cpython=True)(test)
-
-def impl_detail(msg=None, **guards):
-    if check_impl_detail(**guards):
-        return _id
-    if msg is None:
-        guardnames, default = _parse_guards(guards)
-        if default:
-            msg = "implementation detail not available on {0}"
-        else:
-            msg = "implementation detail specific to {0}"
-        guardnames = sorted(guardnames.keys())
-        msg = msg.format(' or '.join(guardnames))
-    return unittest.skip(msg)
-
-def _parse_guards(guards):
-    # Returns a tuple ({platform_name: run_me}, default_value)
-    if not guards:
-        return ({'cpython': True}, False)
-    is_true = list(guards.values())[0]
-    assert list(guards.values()) == [is_true] * len(guards)   # all True or all False
-    return (guards, not is_true)
-
-# Use the following check to guard CPython's implementation-specific tests --
-# or to run them only on the implementation(s) guarded by the arguments.
-def check_impl_detail(**guards):
-    """This function returns True or False depending on the host platform.
-       Examples:
-          if check_impl_detail():               # only on CPython (default)
-          if check_impl_detail(jython=True):    # only on Jython
-          if check_impl_detail(cpython=False):  # everywhere except on CPython
-    """
-    guards, default = _parse_guards(guards)
-    return guards.get(platform.python_implementation().lower(), default)
-
-
-def no_tracing(func):
-    """Decorator to temporarily turn off tracing for the duration of a test."""
-    if not hasattr(sys, 'gettrace'):
-        return func
-    else:
-        @functools.wraps(func)
-        def wrapper(*args, **kwargs):
-            original_trace = sys.gettrace()
-            try:
-                sys.settrace(None)
-                return func(*args, **kwargs)
-            finally:
-                sys.settrace(original_trace)
-        return wrapper
-
-
-def refcount_test(test):
-    """Decorator for tests which involve reference counting.
-
-    To start, the decorator does not run the test if is not run by CPython.
-    After that, any trace function is unset during the test to prevent
-    unexpected refcounts caused by the trace function.
-
-    """
-    return no_tracing(cpython_only(test))
-
-
-def _filter_suite(suite, pred):
-    """Recursively filter test cases in a suite based on a predicate."""
-    newtests = []
-    for test in suite._tests:
-        if isinstance(test, unittest.TestSuite):
-            _filter_suite(test, pred)
-            newtests.append(test)
-        else:
-            if pred(test):
-                newtests.append(test)
-    suite._tests = newtests
-
-def _run_suite(suite):
-    """Run tests from a unittest.TestSuite-derived class."""
-    if verbose:
-        runner = unittest.TextTestRunner(sys.stdout, verbosity=2,
-                                         failfast=failfast)
-    else:
-        runner = BasicTestRunner()
-
-    result = runner.run(suite)
-    if not result.wasSuccessful():
-        if len(result.errors) == 1 and not result.failures:
-            err = result.errors[0][1]
-        elif len(result.failures) == 1 and not result.errors:
-            err = result.failures[0][1]
-        else:
-            err = "multiple errors occurred"
-            if not verbose: err += "; run in verbose mode for details"
-        raise TestFailed(err)
-
-
-def run_unittest(*classes):
-    """Run tests from unittest.TestCase-derived classes."""
-    valid_types = (unittest.TestSuite, unittest.TestCase)
-    suite = unittest.TestSuite()
-    for cls in classes:
-        if isinstance(cls, str):
-            if cls in sys.modules:
-                suite.addTest(unittest.findTestCases(sys.modules[cls]))
-            else:
-                raise ValueError("str arguments must be keys in sys.modules")
-        elif isinstance(cls, valid_types):
-            suite.addTest(cls)
-        else:
-            suite.addTest(unittest.makeSuite(cls))
-    def case_pred(test):
-        if match_tests is None:
-            return True
-        for name in test.id().split("."):
-            if fnmatch.fnmatchcase(name, match_tests):
-                return True
-        return False
-    _filter_suite(suite, case_pred)
-    _run_suite(suite)
-
-
-#=======================================================================
-# doctest driver.
-
-def run_doctest(module, verbosity=None):
-    """Run doctest on the given module.  Return (#failures, #tests).
-
-    If optional argument verbosity is not specified (or is None), pass
-    support's belief about verbosity on to doctest.  Else doctest's
-    usual behavior is used (it searches sys.argv for -v).
-    """
-
-    import doctest
-
-    if verbosity is None:
-        verbosity = verbose
-    else:
-        verbosity = None
-
-    f, t = doctest.testmod(module, verbose=verbosity)
-    if f:
-        raise TestFailed("%d of %d doctests failed" % (f, t))
-    if verbose:
-        print('doctest (%s) ... %d tests with zero failures' %
-              (module.__name__, t))
-    return f, t
-
-
-#=======================================================================
-# Support for saving and restoring the imported modules.
-
-def modules_setup():
-    return sys.modules.copy(),
-
-def modules_cleanup(oldmodules):
-    # Encoders/decoders are registered permanently within the internal
-    # codec cache. If we destroy the corresponding modules their
-    # globals will be set to None which will trip up the cached functions.
-    encodings = [(k, v) for k, v in sys.modules.items()
-                 if k.startswith('encodings.')]
-    sys.modules.clear()
-    sys.modules.update(encodings)
-    # XXX: This kind of problem can affect more than just encodings. In particular
-    # extension modules (such as _ssl) don't cope with reloading properly.
-    # Really, test modules should be cleaning out the test specific modules they
-    # know they added (ala test_runpy) rather than relying on this function (as
-    # test_importhooks and test_pkg do currently).
-    # Implicitly imported *real* modules should be left alone (see issue 10556).
-    sys.modules.update(oldmodules)
-
-#=======================================================================
-# Threading support to prevent reporting refleaks when running regrtest.py -R
-
-# NOTE: we use thread._count() rather than threading.enumerate() (or the
-# moral equivalent thereof) because a threading.Thread object is still alive
-# until its __bootstrap() method has returned, even after it has been
-# unregistered from the threading module.
-# thread._count(), on the other hand, only gets decremented *after* the
-# __bootstrap() method has returned, which gives us reliable reference counts
-# at the end of a test run.
-
-def threading_setup():
-    if _thread:
-        return _thread._count(), threading._dangling.copy()
-    else:
-        return 1, ()
-
-def threading_cleanup(*original_values):
-    if not _thread:
-        return
-    _MAX_COUNT = 10
-    for count in range(_MAX_COUNT):
-        values = _thread._count(), threading._dangling
-        if values == original_values:
-            break
-        time.sleep(0.1)
-        gc_collect()
-    # XXX print a warning in case of failure?
-
-def reap_threads(func):
-    """Use this function when threads are being used.  This will
-    ensure that the threads are cleaned up even when the test fails.
-    If threading is unavailable this function does nothing.
-    """
-    if not _thread:
-        return func
-
-    @functools.wraps(func)
-    def decorator(*args):
-        key = threading_setup()
-        try:
-            return func(*args)
-        finally:
-            threading_cleanup(*key)
-    return decorator
-
-def reap_children():
-    """Use this function at the end of test_main() whenever sub-processes
-    are started.  This will help ensure that no extra children (zombies)
-    stick around to hog resources and create problems when looking
-    for refleaks.
-    """
-
-    # Reap all our dead child processes so we don't leave zombies around.
-    # These hog resources and might be causing some of the buildbots to die.
-    if hasattr(os, 'waitpid'):
-        any_process = -1
-        while True:
-            try:
-                # This will raise an exception on Windows.  That's ok.
-                pid, status = os.waitpid(any_process, os.WNOHANG)
-                if pid == 0:
-                    break
-            except:
-                break
-
-@contextlib.contextmanager
-def swap_attr(obj, attr, new_val):
-    """Temporary swap out an attribute with a new object.
-
-    Usage:
-        with swap_attr(obj, "attr", 5):
-            ...
-
-        This will set obj.attr to 5 for the duration of the with: block,
-        restoring the old value at the end of the block. If `attr` doesn't
-        exist on `obj`, it will be created and then deleted at the end of the
-        block.
-    """
-    if hasattr(obj, attr):
-        real_val = getattr(obj, attr)
-        setattr(obj, attr, new_val)
-        try:
-            yield
-        finally:
-            setattr(obj, attr, real_val)
-    else:
-        setattr(obj, attr, new_val)
-        try:
-            yield
-        finally:
-            delattr(obj, attr)
-
-@contextlib.contextmanager
-def swap_item(obj, item, new_val):
-    """Temporary swap out an item with a new object.
-
-    Usage:
-        with swap_item(obj, "item", 5):
-            ...
-
-        This will set obj["item"] to 5 for the duration of the with: block,
-        restoring the old value at the end of the block. If `item` doesn't
-        exist on `obj`, it will be created and then deleted at the end of the
-        block.
-    """
-    if item in obj:
-        real_val = obj[item]
-        obj[item] = new_val
-        try:
-            yield
-        finally:
-            obj[item] = real_val
-    else:
-        obj[item] = new_val
-        try:
-            yield
-        finally:
-            del obj[item]
-
-def strip_python_stderr(stderr):
-    """Strip the stderr of a Python process from potential debug output
-    emitted by the interpreter.
-
-    This will typically be run on the result of the communicate() method
-    of a subprocess.Popen object.
-    """
-    stderr = re.sub(br"\[\d+ refs\]\r?\n?$", b"", stderr).strip()
-    return stderr
-
-def args_from_interpreter_flags():
-    """Return a list of command-line arguments reproducing the current
-    settings in sys.flags and sys.warnoptions."""
-    flag_opt_map = {
-        'bytes_warning': 'b',
-        'dont_write_bytecode': 'B',
-        'ignore_environment': 'E',
-        'no_user_site': 's',
-        'no_site': 'S',
-        'optimize': 'O',
-        'verbose': 'v',
-    }
-    args = []
-    for flag, opt in flag_opt_map.items():
-        v = getattr(sys.flags, flag)
-        if v > 0:
-            args.append('-' + opt * v)
-    for opt in sys.warnoptions:
-        args.append('-W' + opt)
-    return args
-
-#============================================================
-# Support for assertions about logging.
-#============================================================
-
-class TestHandler(logging.handlers.BufferingHandler):
-    def __init__(self, matcher):
-        # BufferingHandler takes a "capacity" argument
-        # so as to know when to flush. As we're overriding
-        # shouldFlush anyway, we can set a capacity of zero.
-        # You can call flush() manually to clear out the
-        # buffer.
-        logging.handlers.BufferingHandler.__init__(self, 0)
-        self.matcher = matcher
-
-    def shouldFlush(self):
-        return False
-
-    def emit(self, record):
-        self.format(record)
-        self.buffer.append(record.__dict__)
-
-    def matches(self, **kwargs):
-        """
-        Look for a saved dict whose keys/values match the supplied arguments.
-        """
-        result = False
-        for d in self.buffer:
-            if self.matcher.matches(d, **kwargs):
-                result = True
-                break
-        return result
-
-class Matcher(object):
-
-    _partial_matches = ('msg', 'message')
-
-    def matches(self, d, **kwargs):
-        """
-        Try to match a single dict with the supplied arguments.
-
-        Keys whose values are strings and which are in self._partial_matches
-        will be checked for partial (i.e. substring) matches. You can extend
-        this scheme to (for example) do regular expression matching, etc.
-        """
-        result = True
-        for k in kwargs:
-            v = kwargs[k]
-            dv = d.get(k)
-            if not self.match_value(k, dv, v):
-                result = False
-                break
-        return result
-
-    def match_value(self, k, dv, v):
-        """
-        Try to match a single stored value (dv) with a supplied value (v).
-        """
-        if type(v) != type(dv):
-            result = False
-        elif type(dv) is not str or k not in self._partial_matches:
-            result = (v == dv)
-        else:
-            result = dv.find(v) >= 0
-        return result
-
-
-_can_symlink = None
-def can_symlink():
-    global _can_symlink
-    if _can_symlink is not None:
-        return _can_symlink
-    symlink_path = TESTFN + "can_symlink"
-    try:
-        os.symlink(TESTFN, symlink_path)
-        can = True
-    except (OSError, NotImplementedError, AttributeError):
-        can = False
-    else:
-        os.remove(symlink_path)
-    _can_symlink = can
-    return can
-
-def skip_unless_symlink(test):
-    """Skip decorator for tests that require functional symlink"""
-    ok = can_symlink()
-    msg = "Requires functional symlink implementation"
-    return test if ok else unittest.skip(msg)(test)
-
-def patch(test_instance, object_to_patch, attr_name, new_value):
-    """Override 'object_to_patch'.'attr_name' with 'new_value'.
-
-    Also, add a cleanup procedure to 'test_instance' to restore
-    'object_to_patch' value for 'attr_name'.
-    The 'attr_name' should be a valid attribute for 'object_to_patch'.
-
-    """
-    # check that 'attr_name' is a real attribute for 'object_to_patch'
-    # will raise AttributeError if it does not exist
-    getattr(object_to_patch, attr_name)
-
-    # keep a copy of the old value
-    attr_is_local = False
-    try:
-        old_value = object_to_patch.__dict__[attr_name]
-    except (AttributeError, KeyError):
-        old_value = getattr(object_to_patch, attr_name, None)
-    else:
-        attr_is_local = True
-
-    # restore the value when the test is done
-    def cleanup():
-        if attr_is_local:
-            setattr(object_to_patch, attr_name, old_value)
-        else:
-            delattr(object_to_patch, attr_name)
-
-    test_instance.addCleanup(cleanup)
-
-    # actually override the attribute
-    setattr(object_to_patch, attr_name, new_value)
-- 
cgit v1.2.3