diff options
| author | kali <kali@leap.se> | 2012-08-08 20:06:14 +0900 | 
|---|---|---|
| committer | kali <kali@leap.se> | 2012-08-08 20:06:14 +0900 | 
| commit | b1e025a343d0c6019127871acb47d7b295d342e9 (patch) | |
| tree | b5d175e1bf8b625433ba8d8ec83cb45b0ccc8618 /tests/support_tests.py | |
| parent | c1fa99be4dc4174a34620324ec5056b793196b53 (diff) | |
| parent | 60a51aed9c1ee9249a79b3d996ae86d93a9532de (diff) | |
Merge branch 'tests-cleanup' into develop
moved out old broken stuff;
copied a run_scripts entry point for tests;
created a bunch of (mostly stubs) simple tests on secondary
functions.
Diffstat (limited to 'tests/support_tests.py')
| -rw-r--r-- | tests/support_tests.py | 1725 | 
1 files changed, 0 insertions, 1725 deletions
| diff --git a/tests/support_tests.py b/tests/support_tests.py deleted file mode 100644 index 2c56e12d..00000000 --- a/tests/support_tests.py +++ /dev/null @@ -1,1725 +0,0 @@ -"""Supporting definitions for the Python regression tests.""" - -if __name__ != 'test.support': -    raise ImportError('support must be imported from the test package') - -import contextlib -import errno -import functools -import gc -import socket -import sys -import os -import platform -import shutil -import warnings -import unittest -import importlib -import collections.abc -import re -import subprocess -import imp -import time -import sysconfig -import fnmatch -import logging.handlers -import struct - -try: -    import _thread, threading -except ImportError: -    _thread = None -    threading = None -try: -    import multiprocessing.process -except ImportError: -    multiprocessing = None - -try: -    import faulthandler -except ImportError: -    faulthandler = None - -try: -    import zlib -except ImportError: -    zlib = None - -__all__ = [ -    "Error", "TestFailed", "ResourceDenied", "import_module", -    "verbose", "use_resources", "max_memuse", "record_original_stdout", -    "get_original_stdout", "unload", "unlink", "rmtree", "forget", -    "is_resource_enabled", "requires", "requires_freebsd_version", -    "requires_linux_version", "requires_mac_ver", "find_unused_port", "bind_port", -    "IPV6_ENABLED", "is_jython", "TESTFN", "HOST", "SAVEDCWD", "temp_cwd", -    "findfile", "create_empty_file", "sortdict", "check_syntax_error", "open_urlresource", -    "check_warnings", "CleanImport", "EnvironmentVarGuard", "TransientResource", -    "captured_stdout", "captured_stdin", "captured_stderr", "time_out", -    "socket_peer_reset", "ioerror_peer_reset", "run_with_locale", 'temp_umask', -    "transient_internet", "set_memlimit", "bigmemtest", "bigaddrspacetest", -    "BasicTestRunner", "run_unittest", "run_doctest", "threading_setup", -    "threading_cleanup", "reap_children", "cpython_only", "check_impl_detail", -    "get_attribute", "swap_item", "swap_attr", "requires_IEEE_754", -    "TestHandler", "Matcher", "can_symlink", "skip_unless_symlink", -    "import_fresh_module", "requires_zlib", "PIPE_MAX_SIZE", "failfast", -    "anticipate_failure" -    ] - -class Error(Exception): -    """Base class for regression test exceptions.""" - -class TestFailed(Error): -    """Test failed.""" - -class ResourceDenied(unittest.SkipTest): -    """Test skipped because it requested a disallowed resource. - -    This is raised when a test calls requires() for a resource that -    has not be enabled.  It is used to distinguish between expected -    and unexpected skips. -    """ - -@contextlib.contextmanager -def _ignore_deprecated_imports(ignore=True): -    """Context manager to suppress package and module deprecation -    warnings when importing them. - -    If ignore is False, this context manager has no effect.""" -    if ignore: -        with warnings.catch_warnings(): -            warnings.filterwarnings("ignore", ".+ (module|package)", -                                    DeprecationWarning) -            yield -    else: -        yield - - -def import_module(name, deprecated=False): -    """Import and return the module to be tested, raising SkipTest if -    it is not available. - -    If deprecated is True, any module or package deprecation messages -    will be suppressed.""" -    with _ignore_deprecated_imports(deprecated): -        try: -            return importlib.import_module(name) -        except ImportError as msg: -            raise unittest.SkipTest(str(msg)) - - -def _save_and_remove_module(name, orig_modules): -    """Helper function to save and remove a module from sys.modules - -       Raise ImportError if the module can't be imported.""" -    # try to import the module and raise an error if it can't be imported -    if name not in sys.modules: -        __import__(name) -        del sys.modules[name] -    for modname in list(sys.modules): -        if modname == name or modname.startswith(name + '.'): -            orig_modules[modname] = sys.modules[modname] -            del sys.modules[modname] - -def _save_and_block_module(name, orig_modules): -    """Helper function to save and block a module in sys.modules - -       Return True if the module was in sys.modules, False otherwise.""" -    saved = True -    try: -        orig_modules[name] = sys.modules[name] -    except KeyError: -        saved = False -    sys.modules[name] = None -    return saved - - -def anticipate_failure(condition): -    """Decorator to mark a test that is known to be broken in some cases - -       Any use of this decorator should have a comment identifying the -       associated tracker issue. -    """ -    if condition: -        return unittest.expectedFailure -    return lambda f: f - - -def import_fresh_module(name, fresh=(), blocked=(), deprecated=False): -    """Imports and returns a module, deliberately bypassing the sys.modules cache -    and importing a fresh copy of the module. Once the import is complete, -    the sys.modules cache is restored to its original state. - -    Modules named in fresh are also imported anew if needed by the import. -    If one of these modules can't be imported, None is returned. - -    Importing of modules named in blocked is prevented while the fresh import -    takes place. - -    If deprecated is True, any module or package deprecation messages -    will be suppressed.""" -    # NOTE: test_heapq, test_json and test_warnings include extra sanity checks -    # to make sure that this utility function is working as expected -    with _ignore_deprecated_imports(deprecated): -        # Keep track of modules saved for later restoration as well -        # as those which just need a blocking entry removed -        orig_modules = {} -        names_to_remove = [] -        _save_and_remove_module(name, orig_modules) -        try: -            for fresh_name in fresh: -                _save_and_remove_module(fresh_name, orig_modules) -            for blocked_name in blocked: -                if not _save_and_block_module(blocked_name, orig_modules): -                    names_to_remove.append(blocked_name) -            fresh_module = importlib.import_module(name) -        except ImportError: -            fresh_module = None -        finally: -            for orig_name, module in orig_modules.items(): -                sys.modules[orig_name] = module -            for name_to_remove in names_to_remove: -                del sys.modules[name_to_remove] -        return fresh_module - - -def get_attribute(obj, name): -    """Get an attribute, raising SkipTest if AttributeError is raised.""" -    try: -        attribute = getattr(obj, name) -    except AttributeError: -        raise unittest.SkipTest("object %r has no attribute %r" % (obj, name)) -    else: -        return attribute - -verbose = 1              # Flag set to 0 by regrtest.py -use_resources = None     # Flag set to [] by regrtest.py -max_memuse = 0           # Disable bigmem tests (they will still be run with -                         # small sizes, to make sure they work.) -real_max_memuse = 0 -failfast = False -match_tests = None - -# _original_stdout is meant to hold stdout at the time regrtest began. -# This may be "the real" stdout, or IDLE's emulation of stdout, or whatever. -# The point is to have some flavor of stdout the user can actually see. -_original_stdout = None -def record_original_stdout(stdout): -    global _original_stdout -    _original_stdout = stdout - -def get_original_stdout(): -    return _original_stdout or sys.stdout - -def unload(name): -    try: -        del sys.modules[name] -    except KeyError: -        pass - -def unlink(filename): -    try: -        os.unlink(filename) -    except OSError as error: -        # The filename need not exist. -        if error.errno not in (errno.ENOENT, errno.ENOTDIR): -            raise - -def rmtree(path): -    try: -        shutil.rmtree(path) -    except OSError as error: -        if error.errno != errno.ENOENT: -            raise - -def make_legacy_pyc(source): -    """Move a PEP 3147 pyc/pyo file to its legacy pyc/pyo location. - -    The choice of .pyc or .pyo extension is done based on the __debug__ flag -    value. - -    :param source: The file system path to the source file.  The source file -        does not need to exist, however the PEP 3147 pyc file must exist. -    :return: The file system path to the legacy pyc file. -    """ -    pyc_file = imp.cache_from_source(source) -    up_one = os.path.dirname(os.path.abspath(source)) -    legacy_pyc = os.path.join(up_one, source + ('c' if __debug__ else 'o')) -    os.rename(pyc_file, legacy_pyc) -    return legacy_pyc - -def forget(modname): -    """'Forget' a module was ever imported. - -    This removes the module from sys.modules and deletes any PEP 3147 or -    legacy .pyc and .pyo files. -    """ -    unload(modname) -    for dirname in sys.path: -        source = os.path.join(dirname, modname + '.py') -        # It doesn't matter if they exist or not, unlink all possible -        # combinations of PEP 3147 and legacy pyc and pyo files. -        unlink(source + 'c') -        unlink(source + 'o') -        unlink(imp.cache_from_source(source, debug_override=True)) -        unlink(imp.cache_from_source(source, debug_override=False)) - -# On some platforms, should not run gui test even if it is allowed -# in `use_resources'. -if sys.platform.startswith('win'): -    import ctypes -    import ctypes.wintypes -    def _is_gui_available(): -        UOI_FLAGS = 1 -        WSF_VISIBLE = 0x0001 -        class USEROBJECTFLAGS(ctypes.Structure): -            _fields_ = [("fInherit", ctypes.wintypes.BOOL), -                        ("fReserved", ctypes.wintypes.BOOL), -                        ("dwFlags", ctypes.wintypes.DWORD)] -        dll = ctypes.windll.user32 -        h = dll.GetProcessWindowStation() -        if not h: -            raise ctypes.WinError() -        uof = USEROBJECTFLAGS() -        needed = ctypes.wintypes.DWORD() -        res = dll.GetUserObjectInformationW(h, -            UOI_FLAGS, -            ctypes.byref(uof), -            ctypes.sizeof(uof), -            ctypes.byref(needed)) -        if not res: -            raise ctypes.WinError() -        return bool(uof.dwFlags & WSF_VISIBLE) -else: -    def _is_gui_available(): -        return True - -def is_resource_enabled(resource): -    """Test whether a resource is enabled.  Known resources are set by -    regrtest.py.""" -    return use_resources is not None and resource in use_resources - -def requires(resource, msg=None): -    """Raise ResourceDenied if the specified resource is not available. - -    If the caller's module is __main__ then automatically return True.  The -    possibility of False being returned occurs when regrtest.py is -    executing. -    """ -    if resource == 'gui' and not _is_gui_available(): -        raise unittest.SkipTest("Cannot use the 'gui' resource") -    # see if the caller's module is __main__ - if so, treat as if -    # the resource was set -    if sys._getframe(1).f_globals.get("__name__") == "__main__": -        return -    if not is_resource_enabled(resource): -        if msg is None: -            msg = "Use of the %r resource not enabled" % resource -        raise ResourceDenied(msg) - -def _requires_unix_version(sysname, min_version): -    """Decorator raising SkipTest if the OS is `sysname` and the version is less -    than `min_version`. - -    For example, @_requires_unix_version('FreeBSD', (7, 2)) raises SkipTest if -    the FreeBSD version is less than 7.2. -    """ -    def decorator(func): -        @functools.wraps(func) -        def wrapper(*args, **kw): -            if platform.system() == sysname: -                version_txt = platform.release().split('-', 1)[0] -                try: -                    version = tuple(map(int, version_txt.split('.'))) -                except ValueError: -                    pass -                else: -                    if version < min_version: -                        min_version_txt = '.'.join(map(str, min_version)) -                        raise unittest.SkipTest( -                            "%s version %s or higher required, not %s" -                            % (sysname, min_version_txt, version_txt)) -        return wrapper -    return decorator - -def requires_freebsd_version(*min_version): -    """Decorator raising SkipTest if the OS is FreeBSD and the FreeBSD version is -    less than `min_version`. - -    For example, @requires_freebsd_version(7, 2) raises SkipTest if the FreeBSD -    version is less than 7.2. -    """ -    return _requires_unix_version('FreeBSD', min_version) - -def requires_linux_version(*min_version): -    """Decorator raising SkipTest if the OS is Linux and the Linux version is -    less than `min_version`. - -    For example, @requires_linux_version(2, 6, 32) raises SkipTest if the Linux -    version is less than 2.6.32. -    """ -    return _requires_unix_version('Linux', min_version) - -def requires_mac_ver(*min_version): -    """Decorator raising SkipTest if the OS is Mac OS X and the OS X -    version if less than min_version. - -    For example, @requires_mac_ver(10, 5) raises SkipTest if the OS X version -    is lesser than 10.5. -    """ -    def decorator(func): -        @functools.wraps(func) -        def wrapper(*args, **kw): -            if sys.platform == 'darwin': -                version_txt = platform.mac_ver()[0] -                try: -                    version = tuple(map(int, version_txt.split('.'))) -                except ValueError: -                    pass -                else: -                    if version < min_version: -                        min_version_txt = '.'.join(map(str, min_version)) -                        raise unittest.SkipTest( -                            "Mac OS X %s or higher required, not %s" -                            % (min_version_txt, version_txt)) -            return func(*args, **kw) -        wrapper.min_version = min_version -        return wrapper -    return decorator - - -HOST = 'localhost' - -def find_unused_port(family=socket.AF_INET, socktype=socket.SOCK_STREAM): -    """Returns an unused port that should be suitable for binding.  This is -    achieved by creating a temporary socket with the same family and type as -    the 'sock' parameter (default is AF_INET, SOCK_STREAM), and binding it to -    the specified host address (defaults to 0.0.0.0) with the port set to 0, -    eliciting an unused ephemeral port from the OS.  The temporary socket is -    then closed and deleted, and the ephemeral port is returned. - -    Either this method or bind_port() should be used for any tests where a -    server socket needs to be bound to a particular port for the duration of -    the test.  Which one to use depends on whether the calling code is creating -    a python socket, or if an unused port needs to be provided in a constructor -    or passed to an external program (i.e. the -accept argument to openssl's -    s_server mode).  Always prefer bind_port() over find_unused_port() where -    possible.  Hard coded ports should *NEVER* be used.  As soon as a server -    socket is bound to a hard coded port, the ability to run multiple instances -    of the test simultaneously on the same host is compromised, which makes the -    test a ticking time bomb in a buildbot environment. On Unix buildbots, this -    may simply manifest as a failed test, which can be recovered from without -    intervention in most cases, but on Windows, the entire python process can -    completely and utterly wedge, requiring someone to log in to the buildbot -    and manually kill the affected process. - -    (This is easy to reproduce on Windows, unfortunately, and can be traced to -    the SO_REUSEADDR socket option having different semantics on Windows versus -    Unix/Linux.  On Unix, you can't have two AF_INET SOCK_STREAM sockets bind, -    listen and then accept connections on identical host/ports.  An EADDRINUSE -    socket.error will be raised at some point (depending on the platform and -    the order bind and listen were called on each socket). - -    However, on Windows, if SO_REUSEADDR is set on the sockets, no EADDRINUSE -    will ever be raised when attempting to bind two identical host/ports. When -    accept() is called on each socket, the second caller's process will steal -    the port from the first caller, leaving them both in an awkwardly wedged -    state where they'll no longer respond to any signals or graceful kills, and -    must be forcibly killed via OpenProcess()/TerminateProcess(). - -    The solution on Windows is to use the SO_EXCLUSIVEADDRUSE socket option -    instead of SO_REUSEADDR, which effectively affords the same semantics as -    SO_REUSEADDR on Unix.  Given the propensity of Unix developers in the Open -    Source world compared to Windows ones, this is a common mistake.  A quick -    look over OpenSSL's 0.9.8g source shows that they use SO_REUSEADDR when -    openssl.exe is called with the 's_server' option, for example. See -    http://bugs.python.org/issue2550 for more info.  The following site also -    has a very thorough description about the implications of both REUSEADDR -    and EXCLUSIVEADDRUSE on Windows: -    http://msdn2.microsoft.com/en-us/library/ms740621(VS.85).aspx) - -    XXX: although this approach is a vast improvement on previous attempts to -    elicit unused ports, it rests heavily on the assumption that the ephemeral -    port returned to us by the OS won't immediately be dished back out to some -    other process when we close and delete our temporary socket but before our -    calling code has a chance to bind the returned port.  We can deal with this -    issue if/when we come across it. -    """ - -    tempsock = socket.socket(family, socktype) -    port = bind_port(tempsock) -    tempsock.close() -    del tempsock -    return port - -def bind_port(sock, host=HOST): -    """Bind the socket to a free port and return the port number.  Relies on -    ephemeral ports in order to ensure we are using an unbound port.  This is -    important as many tests may be running simultaneously, especially in a -    buildbot environment.  This method raises an exception if the sock.family -    is AF_INET and sock.type is SOCK_STREAM, *and* the socket has SO_REUSEADDR -    or SO_REUSEPORT set on it.  Tests should *never* set these socket options -    for TCP/IP sockets.  The only case for setting these options is testing -    multicasting via multiple UDP sockets. - -    Additionally, if the SO_EXCLUSIVEADDRUSE socket option is available (i.e. -    on Windows), it will be set on the socket.  This will prevent anyone else -    from bind()'ing to our host/port for the duration of the test. -    """ - -    if sock.family == socket.AF_INET and sock.type == socket.SOCK_STREAM: -        if hasattr(socket, 'SO_REUSEADDR'): -            if sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR) == 1: -                raise TestFailed("tests should never set the SO_REUSEADDR "   \ -                                 "socket option on TCP/IP sockets!") -        if hasattr(socket, 'SO_REUSEPORT'): -            if sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT) == 1: -                raise TestFailed("tests should never set the SO_REUSEPORT "   \ -                                 "socket option on TCP/IP sockets!") -        if hasattr(socket, 'SO_EXCLUSIVEADDRUSE'): -            sock.setsockopt(socket.SOL_SOCKET, socket.SO_EXCLUSIVEADDRUSE, 1) - -    sock.bind((host, 0)) -    port = sock.getsockname()[1] -    return port - -def _is_ipv6_enabled(): -    """Check whether IPv6 is enabled on this host.""" -    if socket.has_ipv6: -        try: -            sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM) -            sock.bind(('::1', 0)) -        except (socket.error, socket.gaierror): -            pass -        else: -            sock.close() -            return True -    return False - -IPV6_ENABLED = _is_ipv6_enabled() - - -# A constant likely larger than the underlying OS pipe buffer size. -# Windows limit seems to be around 512B, and most Unix kernels have a 64K pipe -# buffer size: take 1M to be sure. -PIPE_MAX_SIZE = 1024 * 1024 - - -# decorator for skipping tests on non-IEEE 754 platforms -requires_IEEE_754 = unittest.skipUnless( -    float.__getformat__("double").startswith("IEEE"), -    "test requires IEEE 754 doubles") - -requires_zlib = unittest.skipUnless(zlib, 'requires zlib') - -is_jython = sys.platform.startswith('java') - -# Filename used for testing -if os.name == 'java': -    # Jython disallows @ in module names -    TESTFN = '$test' -else: -    TESTFN = '@test' - -# Disambiguate TESTFN for parallel testing, while letting it remain a valid -# module name. -TESTFN = "{}_{}_tmp".format(TESTFN, os.getpid()) - - -# TESTFN_UNICODE is a non-ascii filename -TESTFN_UNICODE = TESTFN + "-\xe0\xf2\u0258\u0141\u011f" -if sys.platform == 'darwin': -    # In Mac OS X's VFS API file names are, by definition, canonically -    # decomposed Unicode, encoded using UTF-8. See QA1173: -    # http://developer.apple.com/mac/library/qa/qa2001/qa1173.html -    import unicodedata -    TESTFN_UNICODE = unicodedata.normalize('NFD', TESTFN_UNICODE) -TESTFN_ENCODING = sys.getfilesystemencoding() - -# TESTFN_UNENCODABLE is a filename (str type) that should *not* be able to be -# encoded by the filesystem encoding (in strict mode). It can be None if we -# cannot generate such filename. -TESTFN_UNENCODABLE = None -if os.name in ('nt', 'ce'): -    # skip win32s (0) or Windows 9x/ME (1) -    if sys.getwindowsversion().platform >= 2: -        # Different kinds of characters from various languages to minimize the -        # probability that the whole name is encodable to MBCS (issue #9819) -        TESTFN_UNENCODABLE = TESTFN + "-\u5171\u0141\u2661\u0363\uDC80" -        try: -            TESTFN_UNENCODABLE.encode(TESTFN_ENCODING) -        except UnicodeEncodeError: -            pass -        else: -            print('WARNING: The filename %r CAN be encoded by the filesystem encoding (%s). ' -                  'Unicode filename tests may not be effective' -                  % (TESTFN_UNENCODABLE, TESTFN_ENCODING)) -            TESTFN_UNENCODABLE = None -# Mac OS X denies unencodable filenames (invalid utf-8) -elif sys.platform != 'darwin': -    try: -        # ascii and utf-8 cannot encode the byte 0xff -        b'\xff'.decode(TESTFN_ENCODING) -    except UnicodeDecodeError: -        # 0xff will be encoded using the surrogate character u+DCFF -        TESTFN_UNENCODABLE = TESTFN \ -            + b'-\xff'.decode(TESTFN_ENCODING, 'surrogateescape') -    else: -        # File system encoding (eg. ISO-8859-* encodings) can encode -        # the byte 0xff. Skip some unicode filename tests. -        pass - -# Save the initial cwd -SAVEDCWD = os.getcwd() - -@contextlib.contextmanager -def temp_cwd(name='tempcwd', quiet=False, path=None): -    """ -    Context manager that temporarily changes the CWD. - -    An existing path may be provided as *path*, in which case this -    function makes no changes to the file system. - -    Otherwise, the new CWD is created in the current directory and it's -    named *name*. If *quiet* is False (default) and it's not possible to -    create or change the CWD, an error is raised.  If it's True, only a -    warning is raised and the original CWD is used. -    """ -    saved_dir = os.getcwd() -    is_temporary = False -    if path is None: -        path = name -        try: -            os.mkdir(name) -            is_temporary = True -        except OSError: -            if not quiet: -                raise -            warnings.warn('tests may fail, unable to create temp CWD ' + name, -                          RuntimeWarning, stacklevel=3) -    try: -        os.chdir(path) -    except OSError: -        if not quiet: -            raise -        warnings.warn('tests may fail, unable to change the CWD to ' + name, -                      RuntimeWarning, stacklevel=3) -    try: -        yield os.getcwd() -    finally: -        os.chdir(saved_dir) -        if is_temporary: -            rmtree(name) - - -if hasattr(os, "umask"): -    @contextlib.contextmanager -    def temp_umask(umask): -        """Context manager that temporarily sets the process umask.""" -        oldmask = os.umask(umask) -        try: -            yield -        finally: -            os.umask(oldmask) - - -def findfile(file, here=__file__, subdir=None): -    """Try to find a file on sys.path and the working directory.  If it is not -    found the argument passed to the function is returned (this does not -    necessarily signal failure; could still be the legitimate path).""" -    if os.path.isabs(file): -        return file -    if subdir is not None: -        file = os.path.join(subdir, file) -    path = sys.path -    path = [os.path.dirname(here)] + path -    for dn in path: -        fn = os.path.join(dn, file) -        if os.path.exists(fn): return fn -    return file - -def create_empty_file(filename): -    """Create an empty file. If the file already exists, truncate it.""" -    fd = os.open(filename, os.O_WRONLY | os.O_CREAT | os.O_TRUNC) -    os.close(fd) - -def sortdict(dict): -    "Like repr(dict), but in sorted order." -    items = sorted(dict.items()) -    reprpairs = ["%r: %r" % pair for pair in items] -    withcommas = ", ".join(reprpairs) -    return "{%s}" % withcommas - -def make_bad_fd(): -    """ -    Create an invalid file descriptor by opening and closing a file and return -    its fd. -    """ -    file = open(TESTFN, "wb") -    try: -        return file.fileno() -    finally: -        file.close() -        unlink(TESTFN) - -def check_syntax_error(testcase, statement): -    testcase.assertRaises(SyntaxError, compile, statement, -                          '<test string>', 'exec') - -def open_urlresource(url, *args, **kw): -    import urllib.request, urllib.parse - -    check = kw.pop('check', None) - -    filename = urllib.parse.urlparse(url)[2].split('/')[-1] # '/': it's URL! - -    fn = os.path.join(os.path.dirname(__file__), "data", filename) - -    def check_valid_file(fn): -        f = open(fn, *args, **kw) -        if check is None: -            return f -        elif check(f): -            f.seek(0) -            return f -        f.close() - -    if os.path.exists(fn): -        f = check_valid_file(fn) -        if f is not None: -            return f -        unlink(fn) - -    # Verify the requirement before downloading the file -    requires('urlfetch') - -    print('\tfetching %s ...' % url, file=get_original_stdout()) -    f = urllib.request.urlopen(url, timeout=15) -    try: -        with open(fn, "wb") as out: -            s = f.read() -            while s: -                out.write(s) -                s = f.read() -    finally: -        f.close() - -    f = check_valid_file(fn) -    if f is not None: -        return f -    raise TestFailed('invalid resource %r' % fn) - - -class WarningsRecorder(object): -    """Convenience wrapper for the warnings list returned on -       entry to the warnings.catch_warnings() context manager. -    """ -    def __init__(self, warnings_list): -        self._warnings = warnings_list -        self._last = 0 - -    def __getattr__(self, attr): -        if len(self._warnings) > self._last: -            return getattr(self._warnings[-1], attr) -        elif attr in warnings.WarningMessage._WARNING_DETAILS: -            return None -        raise AttributeError("%r has no attribute %r" % (self, attr)) - -    @property -    def warnings(self): -        return self._warnings[self._last:] - -    def reset(self): -        self._last = len(self._warnings) - - -def _filterwarnings(filters, quiet=False): -    """Catch the warnings, then check if all the expected -    warnings have been raised and re-raise unexpected warnings. -    If 'quiet' is True, only re-raise the unexpected warnings. -    """ -    # Clear the warning registry of the calling module -    # in order to re-raise the warnings. -    frame = sys._getframe(2) -    registry = frame.f_globals.get('__warningregistry__') -    if registry: -        registry.clear() -    with warnings.catch_warnings(record=True) as w: -        # Set filter "always" to record all warnings.  Because -        # test_warnings swap the module, we need to look up in -        # the sys.modules dictionary. -        sys.modules['warnings'].simplefilter("always") -        yield WarningsRecorder(w) -    # Filter the recorded warnings -    reraise = list(w) -    missing = [] -    for msg, cat in filters: -        seen = False -        for w in reraise[:]: -            warning = w.message -            # Filter out the matching messages -            if (re.match(msg, str(warning), re.I) and -                issubclass(warning.__class__, cat)): -                seen = True -                reraise.remove(w) -        if not seen and not quiet: -            # This filter caught nothing -            missing.append((msg, cat.__name__)) -    if reraise: -        raise AssertionError("unhandled warning %s" % reraise[0]) -    if missing: -        raise AssertionError("filter (%r, %s) did not catch any warning" % -                             missing[0]) - - -@contextlib.contextmanager -def check_warnings(*filters, **kwargs): -    """Context manager to silence warnings. - -    Accept 2-tuples as positional arguments: -        ("message regexp", WarningCategory) - -    Optional argument: -     - if 'quiet' is True, it does not fail if a filter catches nothing -        (default True without argument, -         default False if some filters are defined) - -    Without argument, it defaults to: -        check_warnings(("", Warning), quiet=True) -    """ -    quiet = kwargs.get('quiet') -    if not filters: -        filters = (("", Warning),) -        # Preserve backward compatibility -        if quiet is None: -            quiet = True -    return _filterwarnings(filters, quiet) - - -class CleanImport(object): -    """Context manager to force import to return a new module reference. - -    This is useful for testing module-level behaviours, such as -    the emission of a DeprecationWarning on import. - -    Use like this: - -        with CleanImport("foo"): -            importlib.import_module("foo") # new reference -    """ - -    def __init__(self, *module_names): -        self.original_modules = sys.modules.copy() -        for module_name in module_names: -            if module_name in sys.modules: -                module = sys.modules[module_name] -                # It is possible that module_name is just an alias for -                # another module (e.g. stub for modules renamed in 3.x). -                # In that case, we also need delete the real module to clear -                # the import cache. -                if module.__name__ != module_name: -                    del sys.modules[module.__name__] -                del sys.modules[module_name] - -    def __enter__(self): -        return self - -    def __exit__(self, *ignore_exc): -        sys.modules.update(self.original_modules) - - -class EnvironmentVarGuard(collections.abc.MutableMapping): - -    """Class to help protect the environment variable properly.  Can be used as -    a context manager.""" - -    def __init__(self): -        self._environ = os.environ -        self._changed = {} - -    def __getitem__(self, envvar): -        return self._environ[envvar] - -    def __setitem__(self, envvar, value): -        # Remember the initial value on the first access -        if envvar not in self._changed: -            self._changed[envvar] = self._environ.get(envvar) -        self._environ[envvar] = value - -    def __delitem__(self, envvar): -        # Remember the initial value on the first access -        if envvar not in self._changed: -            self._changed[envvar] = self._environ.get(envvar) -        if envvar in self._environ: -            del self._environ[envvar] - -    def keys(self): -        return self._environ.keys() - -    def __iter__(self): -        return iter(self._environ) - -    def __len__(self): -        return len(self._environ) - -    def set(self, envvar, value): -        self[envvar] = value - -    def unset(self, envvar): -        del self[envvar] - -    def __enter__(self): -        return self - -    def __exit__(self, *ignore_exc): -        for (k, v) in self._changed.items(): -            if v is None: -                if k in self._environ: -                    del self._environ[k] -            else: -                self._environ[k] = v -        os.environ = self._environ - - -class DirsOnSysPath(object): -    """Context manager to temporarily add directories to sys.path. - -    This makes a copy of sys.path, appends any directories given -    as positional arguments, then reverts sys.path to the copied -    settings when the context ends. - -    Note that *all* sys.path modifications in the body of the -    context manager, including replacement of the object, -    will be reverted at the end of the block. -    """ - -    def __init__(self, *paths): -        self.original_value = sys.path[:] -        self.original_object = sys.path -        sys.path.extend(paths) - -    def __enter__(self): -        return self - -    def __exit__(self, *ignore_exc): -        sys.path = self.original_object -        sys.path[:] = self.original_value - - -class TransientResource(object): - -    """Raise ResourceDenied if an exception is raised while the context manager -    is in effect that matches the specified exception and attributes.""" - -    def __init__(self, exc, **kwargs): -        self.exc = exc -        self.attrs = kwargs - -    def __enter__(self): -        return self - -    def __exit__(self, type_=None, value=None, traceback=None): -        """If type_ is a subclass of self.exc and value has attributes matching -        self.attrs, raise ResourceDenied.  Otherwise let the exception -        propagate (if any).""" -        if type_ is not None and issubclass(self.exc, type_): -            for attr, attr_value in self.attrs.items(): -                if not hasattr(value, attr): -                    break -                if getattr(value, attr) != attr_value: -                    break -            else: -                raise ResourceDenied("an optional resource is not available") - -# Context managers that raise ResourceDenied when various issues -# with the Internet connection manifest themselves as exceptions. -# XXX deprecate these and use transient_internet() instead -time_out = TransientResource(IOError, errno=errno.ETIMEDOUT) -socket_peer_reset = TransientResource(socket.error, errno=errno.ECONNRESET) -ioerror_peer_reset = TransientResource(IOError, errno=errno.ECONNRESET) - - -@contextlib.contextmanager -def transient_internet(resource_name, *, timeout=30.0, errnos=()): -    """Return a context manager that raises ResourceDenied when various issues -    with the Internet connection manifest themselves as exceptions.""" -    default_errnos = [ -        ('ECONNREFUSED', 111), -        ('ECONNRESET', 104), -        ('EHOSTUNREACH', 113), -        ('ENETUNREACH', 101), -        ('ETIMEDOUT', 110), -    ] -    default_gai_errnos = [ -        ('EAI_AGAIN', -3), -        ('EAI_FAIL', -4), -        ('EAI_NONAME', -2), -        ('EAI_NODATA', -5), -        # Encountered when trying to resolve IPv6-only hostnames -        ('WSANO_DATA', 11004), -    ] - -    denied = ResourceDenied("Resource %r is not available" % resource_name) -    captured_errnos = errnos -    gai_errnos = [] -    if not captured_errnos: -        captured_errnos = [getattr(errno, name, num) -                           for (name, num) in default_errnos] -        gai_errnos = [getattr(socket, name, num) -                      for (name, num) in default_gai_errnos] - -    def filter_error(err): -        n = getattr(err, 'errno', None) -        if (isinstance(err, socket.timeout) or -            (isinstance(err, socket.gaierror) and n in gai_errnos) or -            n in captured_errnos): -            if not verbose: -                sys.stderr.write(denied.args[0] + "\n") -            raise denied from err - -    old_timeout = socket.getdefaulttimeout() -    try: -        if timeout is not None: -            socket.setdefaulttimeout(timeout) -        yield -    except IOError as err: -        # urllib can wrap original socket errors multiple times (!), we must -        # unwrap to get at the original error. -        while True: -            a = err.args -            if len(a) >= 1 and isinstance(a[0], IOError): -                err = a[0] -            # The error can also be wrapped as args[1]: -            #    except socket.error as msg: -            #        raise IOError('socket error', msg).with_traceback(sys.exc_info()[2]) -            elif len(a) >= 2 and isinstance(a[1], IOError): -                err = a[1] -            else: -                break -        filter_error(err) -        raise -    # XXX should we catch generic exceptions and look for their -    # __cause__ or __context__? -    finally: -        socket.setdefaulttimeout(old_timeout) - - -@contextlib.contextmanager -def captured_output(stream_name): -    """Return a context manager used by captured_stdout/stdin/stderr -    that temporarily replaces the sys stream *stream_name* with a StringIO.""" -    import io -    orig_stdout = getattr(sys, stream_name) -    setattr(sys, stream_name, io.StringIO()) -    try: -        yield getattr(sys, stream_name) -    finally: -        setattr(sys, stream_name, orig_stdout) - -def captured_stdout(): -    """Capture the output of sys.stdout: - -       with captured_stdout() as s: -           print("hello") -       self.assertEqual(s.getvalue(), "hello") -    """ -    return captured_output("stdout") - -def captured_stderr(): -    return captured_output("stderr") - -def captured_stdin(): -    return captured_output("stdin") - - -def gc_collect(): -    """Force as many objects as possible to be collected. - -    In non-CPython implementations of Python, this is needed because timely -    deallocation is not guaranteed by the garbage collector.  (Even in CPython -    this can be the case in case of reference cycles.)  This means that __del__ -    methods may be called later than expected and weakrefs may remain alive for -    longer than expected.  This function tries its best to force all garbage -    objects to disappear. -    """ -    gc.collect() -    if is_jython: -        time.sleep(0.1) -    gc.collect() -    gc.collect() - -@contextlib.contextmanager -def disable_gc(): -    have_gc = gc.isenabled() -    gc.disable() -    try: -        yield -    finally: -        if have_gc: -            gc.enable() - - -def python_is_optimized(): -    """Find if Python was built with optimizations.""" -    cflags = sysconfig.get_config_var('PY_CFLAGS') or '' -    final_opt = "" -    for opt in cflags.split(): -        if opt.startswith('-O'): -            final_opt = opt -    return final_opt != '' and final_opt != '-O0' - - -#======================================================================= -# Decorator for running a function in a different locale, correctly resetting -# it afterwards. - -def run_with_locale(catstr, *locales): -    def decorator(func): -        def inner(*args, **kwds): -            try: -                import locale -                category = getattr(locale, catstr) -                orig_locale = locale.setlocale(category) -            except AttributeError: -                # if the test author gives us an invalid category string -                raise -            except: -                # cannot retrieve original locale, so do nothing -                locale = orig_locale = None -            else: -                for loc in locales: -                    try: -                        locale.setlocale(category, loc) -                        break -                    except: -                        pass - -            # now run the function, resetting the locale on exceptions -            try: -                return func(*args, **kwds) -            finally: -                if locale and orig_locale: -                    locale.setlocale(category, orig_locale) -        inner.__name__ = func.__name__ -        inner.__doc__ = func.__doc__ -        return inner -    return decorator - -#======================================================================= -# Big-memory-test support. Separate from 'resources' because memory use -# should be configurable. - -# Some handy shorthands. Note that these are used for byte-limits as well -# as size-limits, in the various bigmem tests -_1M = 1024*1024 -_1G = 1024 * _1M -_2G = 2 * _1G -_4G = 4 * _1G - -MAX_Py_ssize_t = sys.maxsize - -def set_memlimit(limit): -    global max_memuse -    global real_max_memuse -    sizes = { -        'k': 1024, -        'm': _1M, -        'g': _1G, -        't': 1024*_1G, -    } -    m = re.match(r'(\d+(\.\d+)?) (K|M|G|T)b?$', limit, -                 re.IGNORECASE | re.VERBOSE) -    if m is None: -        raise ValueError('Invalid memory limit %r' % (limit,)) -    memlimit = int(float(m.group(1)) * sizes[m.group(3).lower()]) -    real_max_memuse = memlimit -    if memlimit > MAX_Py_ssize_t: -        memlimit = MAX_Py_ssize_t -    if memlimit < _2G - 1: -        raise ValueError('Memory limit %r too low to be useful' % (limit,)) -    max_memuse = memlimit - -class _MemoryWatchdog: -    """An object which periodically watches the process' memory consumption -    and prints it out. -    """ - -    def __init__(self): -        self.procfile = '/proc/{pid}/statm'.format(pid=os.getpid()) -        self.started = False -        self.thread = None -        try: -            self.page_size = os.sysconf('SC_PAGESIZE') -        except (ValueError, AttributeError): -            try: -                self.page_size = os.sysconf('SC_PAGE_SIZE') -            except (ValueError, AttributeError): -                self.page_size = 4096 - -    def consumer(self, fd): -        HEADER = "l" -        header_size = struct.calcsize(HEADER) -        try: -            while True: -                header = os.read(fd, header_size) -                if len(header) < header_size: -                    # Pipe closed on other end -                    break -                data_len, = struct.unpack(HEADER, header) -                data = os.read(fd, data_len) -                statm = data.decode('ascii') -                data = int(statm.split()[5]) -                print(" ... process data size: {data:.1f}G" -                       .format(data=data * self.page_size / (1024 ** 3))) -        finally: -            os.close(fd) - -    def start(self): -        if not faulthandler or not hasattr(faulthandler, '_file_watchdog'): -            return -        try: -            rfd = os.open(self.procfile, os.O_RDONLY) -        except OSError as e: -            warnings.warn('/proc not available for stats: {}'.format(e), -                          RuntimeWarning) -            sys.stderr.flush() -            return -        pipe_fd, wfd = os.pipe() -        # _file_watchdog() doesn't take the GIL in its child thread, and -        # therefore collects statistics timely -        faulthandler._file_watchdog(rfd, wfd, 1.0) -        self.started = True -        self.thread = threading.Thread(target=self.consumer, args=(pipe_fd,)) -        self.thread.daemon = True -        self.thread.start() - -    def stop(self): -        if not self.started: -            return -        faulthandler._cancel_file_watchdog() -        self.thread.join() - - -def bigmemtest(size, memuse, dry_run=True): -    """Decorator for bigmem tests. - -    'minsize' is the minimum useful size for the test (in arbitrary, -    test-interpreted units.) 'memuse' is the number of 'bytes per size' for -    the test, or a good estimate of it. - -    if 'dry_run' is False, it means the test doesn't support dummy runs -    when -M is not specified. -    """ -    def decorator(f): -        def wrapper(self): -            size = wrapper.size -            memuse = wrapper.memuse -            if not real_max_memuse: -                maxsize = 5147 -            else: -                maxsize = size - -            if ((real_max_memuse or not dry_run) -                and real_max_memuse < maxsize * memuse): -                raise unittest.SkipTest( -                    "not enough memory: %.1fG minimum needed" -                    % (size * memuse / (1024 ** 3))) - -            if real_max_memuse and verbose and faulthandler and threading: -                print() -                print(" ... expected peak memory use: {peak:.1f}G" -                      .format(peak=size * memuse / (1024 ** 3))) -                watchdog = _MemoryWatchdog() -                watchdog.start() -            else: -                watchdog = None - -            try: -                return f(self, maxsize) -            finally: -                if watchdog: -                    watchdog.stop() - -        wrapper.size = size -        wrapper.memuse = memuse -        return wrapper -    return decorator - -def bigaddrspacetest(f): -    """Decorator for tests that fill the address space.""" -    def wrapper(self): -        if max_memuse < MAX_Py_ssize_t: -            if MAX_Py_ssize_t >= 2**63 - 1 and max_memuse >= 2**31: -                raise unittest.SkipTest( -                    "not enough memory: try a 32-bit build instead") -            else: -                raise unittest.SkipTest( -                    "not enough memory: %.1fG minimum needed" -                    % (MAX_Py_ssize_t / (1024 ** 3))) -        else: -            return f(self) -    return wrapper - -#======================================================================= -# unittest integration. - -class BasicTestRunner: -    def run(self, test): -        result = unittest.TestResult() -        test(result) -        return result - -def _id(obj): -    return obj - -def requires_resource(resource): -    if resource == 'gui' and not _is_gui_available(): -        return unittest.skip("resource 'gui' is not available") -    if is_resource_enabled(resource): -        return _id -    else: -        return unittest.skip("resource {0!r} is not enabled".format(resource)) - -def cpython_only(test): -    """ -    Decorator for tests only applicable on CPython. -    """ -    return impl_detail(cpython=True)(test) - -def impl_detail(msg=None, **guards): -    if check_impl_detail(**guards): -        return _id -    if msg is None: -        guardnames, default = _parse_guards(guards) -        if default: -            msg = "implementation detail not available on {0}" -        else: -            msg = "implementation detail specific to {0}" -        guardnames = sorted(guardnames.keys()) -        msg = msg.format(' or '.join(guardnames)) -    return unittest.skip(msg) - -def _parse_guards(guards): -    # Returns a tuple ({platform_name: run_me}, default_value) -    if not guards: -        return ({'cpython': True}, False) -    is_true = list(guards.values())[0] -    assert list(guards.values()) == [is_true] * len(guards)   # all True or all False -    return (guards, not is_true) - -# Use the following check to guard CPython's implementation-specific tests -- -# or to run them only on the implementation(s) guarded by the arguments. -def check_impl_detail(**guards): -    """This function returns True or False depending on the host platform. -       Examples: -          if check_impl_detail():               # only on CPython (default) -          if check_impl_detail(jython=True):    # only on Jython -          if check_impl_detail(cpython=False):  # everywhere except on CPython -    """ -    guards, default = _parse_guards(guards) -    return guards.get(platform.python_implementation().lower(), default) - - -def no_tracing(func): -    """Decorator to temporarily turn off tracing for the duration of a test.""" -    if not hasattr(sys, 'gettrace'): -        return func -    else: -        @functools.wraps(func) -        def wrapper(*args, **kwargs): -            original_trace = sys.gettrace() -            try: -                sys.settrace(None) -                return func(*args, **kwargs) -            finally: -                sys.settrace(original_trace) -        return wrapper - - -def refcount_test(test): -    """Decorator for tests which involve reference counting. - -    To start, the decorator does not run the test if is not run by CPython. -    After that, any trace function is unset during the test to prevent -    unexpected refcounts caused by the trace function. - -    """ -    return no_tracing(cpython_only(test)) - - -def _filter_suite(suite, pred): -    """Recursively filter test cases in a suite based on a predicate.""" -    newtests = [] -    for test in suite._tests: -        if isinstance(test, unittest.TestSuite): -            _filter_suite(test, pred) -            newtests.append(test) -        else: -            if pred(test): -                newtests.append(test) -    suite._tests = newtests - -def _run_suite(suite): -    """Run tests from a unittest.TestSuite-derived class.""" -    if verbose: -        runner = unittest.TextTestRunner(sys.stdout, verbosity=2, -                                         failfast=failfast) -    else: -        runner = BasicTestRunner() - -    result = runner.run(suite) -    if not result.wasSuccessful(): -        if len(result.errors) == 1 and not result.failures: -            err = result.errors[0][1] -        elif len(result.failures) == 1 and not result.errors: -            err = result.failures[0][1] -        else: -            err = "multiple errors occurred" -            if not verbose: err += "; run in verbose mode for details" -        raise TestFailed(err) - - -def run_unittest(*classes): -    """Run tests from unittest.TestCase-derived classes.""" -    valid_types = (unittest.TestSuite, unittest.TestCase) -    suite = unittest.TestSuite() -    for cls in classes: -        if isinstance(cls, str): -            if cls in sys.modules: -                suite.addTest(unittest.findTestCases(sys.modules[cls])) -            else: -                raise ValueError("str arguments must be keys in sys.modules") -        elif isinstance(cls, valid_types): -            suite.addTest(cls) -        else: -            suite.addTest(unittest.makeSuite(cls)) -    def case_pred(test): -        if match_tests is None: -            return True -        for name in test.id().split("."): -            if fnmatch.fnmatchcase(name, match_tests): -                return True -        return False -    _filter_suite(suite, case_pred) -    _run_suite(suite) - - -#======================================================================= -# doctest driver. - -def run_doctest(module, verbosity=None): -    """Run doctest on the given module.  Return (#failures, #tests). - -    If optional argument verbosity is not specified (or is None), pass -    support's belief about verbosity on to doctest.  Else doctest's -    usual behavior is used (it searches sys.argv for -v). -    """ - -    import doctest - -    if verbosity is None: -        verbosity = verbose -    else: -        verbosity = None - -    f, t = doctest.testmod(module, verbose=verbosity) -    if f: -        raise TestFailed("%d of %d doctests failed" % (f, t)) -    if verbose: -        print('doctest (%s) ... %d tests with zero failures' % -              (module.__name__, t)) -    return f, t - - -#======================================================================= -# Support for saving and restoring the imported modules. - -def modules_setup(): -    return sys.modules.copy(), - -def modules_cleanup(oldmodules): -    # Encoders/decoders are registered permanently within the internal -    # codec cache. If we destroy the corresponding modules their -    # globals will be set to None which will trip up the cached functions. -    encodings = [(k, v) for k, v in sys.modules.items() -                 if k.startswith('encodings.')] -    sys.modules.clear() -    sys.modules.update(encodings) -    # XXX: This kind of problem can affect more than just encodings. In particular -    # extension modules (such as _ssl) don't cope with reloading properly. -    # Really, test modules should be cleaning out the test specific modules they -    # know they added (ala test_runpy) rather than relying on this function (as -    # test_importhooks and test_pkg do currently). -    # Implicitly imported *real* modules should be left alone (see issue 10556). -    sys.modules.update(oldmodules) - -#======================================================================= -# Threading support to prevent reporting refleaks when running regrtest.py -R - -# NOTE: we use thread._count() rather than threading.enumerate() (or the -# moral equivalent thereof) because a threading.Thread object is still alive -# until its __bootstrap() method has returned, even after it has been -# unregistered from the threading module. -# thread._count(), on the other hand, only gets decremented *after* the -# __bootstrap() method has returned, which gives us reliable reference counts -# at the end of a test run. - -def threading_setup(): -    if _thread: -        return _thread._count(), threading._dangling.copy() -    else: -        return 1, () - -def threading_cleanup(*original_values): -    if not _thread: -        return -    _MAX_COUNT = 10 -    for count in range(_MAX_COUNT): -        values = _thread._count(), threading._dangling -        if values == original_values: -            break -        time.sleep(0.1) -        gc_collect() -    # XXX print a warning in case of failure? - -def reap_threads(func): -    """Use this function when threads are being used.  This will -    ensure that the threads are cleaned up even when the test fails. -    If threading is unavailable this function does nothing. -    """ -    if not _thread: -        return func - -    @functools.wraps(func) -    def decorator(*args): -        key = threading_setup() -        try: -            return func(*args) -        finally: -            threading_cleanup(*key) -    return decorator - -def reap_children(): -    """Use this function at the end of test_main() whenever sub-processes -    are started.  This will help ensure that no extra children (zombies) -    stick around to hog resources and create problems when looking -    for refleaks. -    """ - -    # Reap all our dead child processes so we don't leave zombies around. -    # These hog resources and might be causing some of the buildbots to die. -    if hasattr(os, 'waitpid'): -        any_process = -1 -        while True: -            try: -                # This will raise an exception on Windows.  That's ok. -                pid, status = os.waitpid(any_process, os.WNOHANG) -                if pid == 0: -                    break -            except: -                break - -@contextlib.contextmanager -def swap_attr(obj, attr, new_val): -    """Temporary swap out an attribute with a new object. - -    Usage: -        with swap_attr(obj, "attr", 5): -            ... - -        This will set obj.attr to 5 for the duration of the with: block, -        restoring the old value at the end of the block. If `attr` doesn't -        exist on `obj`, it will be created and then deleted at the end of the -        block. -    """ -    if hasattr(obj, attr): -        real_val = getattr(obj, attr) -        setattr(obj, attr, new_val) -        try: -            yield -        finally: -            setattr(obj, attr, real_val) -    else: -        setattr(obj, attr, new_val) -        try: -            yield -        finally: -            delattr(obj, attr) - -@contextlib.contextmanager -def swap_item(obj, item, new_val): -    """Temporary swap out an item with a new object. - -    Usage: -        with swap_item(obj, "item", 5): -            ... - -        This will set obj["item"] to 5 for the duration of the with: block, -        restoring the old value at the end of the block. If `item` doesn't -        exist on `obj`, it will be created and then deleted at the end of the -        block. -    """ -    if item in obj: -        real_val = obj[item] -        obj[item] = new_val -        try: -            yield -        finally: -            obj[item] = real_val -    else: -        obj[item] = new_val -        try: -            yield -        finally: -            del obj[item] - -def strip_python_stderr(stderr): -    """Strip the stderr of a Python process from potential debug output -    emitted by the interpreter. - -    This will typically be run on the result of the communicate() method -    of a subprocess.Popen object. -    """ -    stderr = re.sub(br"\[\d+ refs\]\r?\n?$", b"", stderr).strip() -    return stderr - -def args_from_interpreter_flags(): -    """Return a list of command-line arguments reproducing the current -    settings in sys.flags and sys.warnoptions.""" -    flag_opt_map = { -        'bytes_warning': 'b', -        'dont_write_bytecode': 'B', -        'ignore_environment': 'E', -        'no_user_site': 's', -        'no_site': 'S', -        'optimize': 'O', -        'verbose': 'v', -    } -    args = [] -    for flag, opt in flag_opt_map.items(): -        v = getattr(sys.flags, flag) -        if v > 0: -            args.append('-' + opt * v) -    for opt in sys.warnoptions: -        args.append('-W' + opt) -    return args - -#============================================================ -# Support for assertions about logging. -#============================================================ - -class TestHandler(logging.handlers.BufferingHandler): -    def __init__(self, matcher): -        # BufferingHandler takes a "capacity" argument -        # so as to know when to flush. As we're overriding -        # shouldFlush anyway, we can set a capacity of zero. -        # You can call flush() manually to clear out the -        # buffer. -        logging.handlers.BufferingHandler.__init__(self, 0) -        self.matcher = matcher - -    def shouldFlush(self): -        return False - -    def emit(self, record): -        self.format(record) -        self.buffer.append(record.__dict__) - -    def matches(self, **kwargs): -        """ -        Look for a saved dict whose keys/values match the supplied arguments. -        """ -        result = False -        for d in self.buffer: -            if self.matcher.matches(d, **kwargs): -                result = True -                break -        return result - -class Matcher(object): - -    _partial_matches = ('msg', 'message') - -    def matches(self, d, **kwargs): -        """ -        Try to match a single dict with the supplied arguments. - -        Keys whose values are strings and which are in self._partial_matches -        will be checked for partial (i.e. substring) matches. You can extend -        this scheme to (for example) do regular expression matching, etc. -        """ -        result = True -        for k in kwargs: -            v = kwargs[k] -            dv = d.get(k) -            if not self.match_value(k, dv, v): -                result = False -                break -        return result - -    def match_value(self, k, dv, v): -        """ -        Try to match a single stored value (dv) with a supplied value (v). -        """ -        if type(v) != type(dv): -            result = False -        elif type(dv) is not str or k not in self._partial_matches: -            result = (v == dv) -        else: -            result = dv.find(v) >= 0 -        return result - - -_can_symlink = None -def can_symlink(): -    global _can_symlink -    if _can_symlink is not None: -        return _can_symlink -    symlink_path = TESTFN + "can_symlink" -    try: -        os.symlink(TESTFN, symlink_path) -        can = True -    except (OSError, NotImplementedError, AttributeError): -        can = False -    else: -        os.remove(symlink_path) -    _can_symlink = can -    return can - -def skip_unless_symlink(test): -    """Skip decorator for tests that require functional symlink""" -    ok = can_symlink() -    msg = "Requires functional symlink implementation" -    return test if ok else unittest.skip(msg)(test) - -def patch(test_instance, object_to_patch, attr_name, new_value): -    """Override 'object_to_patch'.'attr_name' with 'new_value'. - -    Also, add a cleanup procedure to 'test_instance' to restore -    'object_to_patch' value for 'attr_name'. -    The 'attr_name' should be a valid attribute for 'object_to_patch'. - -    """ -    # check that 'attr_name' is a real attribute for 'object_to_patch' -    # will raise AttributeError if it does not exist -    getattr(object_to_patch, attr_name) - -    # keep a copy of the old value -    attr_is_local = False -    try: -        old_value = object_to_patch.__dict__[attr_name] -    except (AttributeError, KeyError): -        old_value = getattr(object_to_patch, attr_name, None) -    else: -        attr_is_local = True - -    # restore the value when the test is done -    def cleanup(): -        if attr_is_local: -            setattr(object_to_patch, attr_name, old_value) -        else: -            delattr(object_to_patch, attr_name) - -    test_instance.addCleanup(cleanup) - -    # actually override the attribute -    setattr(object_to_patch, attr_name, new_value) | 
