1"""Supporting definitions for the Python regression tests."""
2
3if __name__ != 'test.support':
4    raise ImportError('test.support must be imported from the test package')
5
6import contextlib
7import errno
8import fnmatch
9import functools
10import gc
11import socket
12import stat
13import sys
14import os
15import platform
16import shutil
17import warnings
18import unittest
19import importlib
20import UserDict
21import re
22import time
23import struct
24import sysconfig
25import types
26
27try:
28    import thread
29except ImportError:
30    thread = None
31
32__all__ = ["Error", "TestFailed", "ResourceDenied", "import_module",
33           "verbose", "use_resources", "max_memuse", "record_original_stdout",
34           "get_original_stdout", "unload", "unlink", "rmtree", "forget",
35           "is_resource_enabled", "requires", "requires_mac_ver",
36           "find_unused_port", "bind_port",
37           "fcmp", "have_unicode", "is_jython", "TESTFN", "HOST", "FUZZ",
38           "SAVEDCWD", "temp_cwd", "findfile", "sortdict", "check_syntax_error",
39           "open_urlresource", "check_warnings", "check_py3k_warnings",
40           "CleanImport", "EnvironmentVarGuard", "captured_output",
41           "captured_stdout", "TransientResource", "transient_internet",
42           "run_with_locale", "set_memlimit", "bigmemtest", "bigaddrspacetest",
43           "BasicTestRunner", "run_unittest", "run_doctest", "threading_setup",
44           "threading_cleanup", "reap_threads", "start_threads", "cpython_only",
45           "check_impl_detail", "get_attribute", "py3k_bytes",
46           "import_fresh_module", "threading_cleanup", "reap_children",
47           "strip_python_stderr", "IPV6_ENABLED", "run_with_tz",
48           "SuppressCrashReport"]
49
50class Error(Exception):
51    """Base class for regression test exceptions."""
52
53class TestFailed(Error):
54    """Test failed."""
55
56class ResourceDenied(unittest.SkipTest):
57    """Test skipped because it requested a disallowed resource.
58
59    This is raised when a test calls requires() for a resource that
60    has not been enabled.  It is used to distinguish between expected
61    and unexpected skips.
62    """
63
64@contextlib.contextmanager
65def _ignore_deprecated_imports(ignore=True):
66    """Context manager to suppress package and module deprecation
67    warnings when importing them.
68
69    If ignore is False, this context manager has no effect."""
70    if ignore:
71        with warnings.catch_warnings():
72            warnings.filterwarnings("ignore", ".+ (module|package)",
73                                    DeprecationWarning)
74            yield
75    else:
76        yield
77
78
79def import_module(name, deprecated=False):
80    """Import and return the module to be tested, raising SkipTest if
81    it is not available.
82
83    If deprecated is True, any module or package deprecation messages
84    will be suppressed."""
85    with _ignore_deprecated_imports(deprecated):
86        try:
87            return importlib.import_module(name)
88        except ImportError, msg:
89            raise unittest.SkipTest(str(msg))
90
91
92def _save_and_remove_module(name, orig_modules):
93    """Helper function to save and remove a module from sys.modules
94
95       Raise ImportError if the module can't be imported."""
96    # try to import the module and raise an error if it can't be imported
97    if name not in sys.modules:
98        __import__(name)
99        del sys.modules[name]
100    for modname in list(sys.modules):
101        if modname == name or modname.startswith(name + '.'):
102            orig_modules[modname] = sys.modules[modname]
103            del sys.modules[modname]
104
105def _save_and_block_module(name, orig_modules):
106    """Helper function to save and block a module in sys.modules
107
108       Return True if the module was in sys.modules, False otherwise."""
109    saved = True
110    try:
111        orig_modules[name] = sys.modules[name]
112    except KeyError:
113        saved = False
114    sys.modules[name] = None
115    return saved
116
117
118def import_fresh_module(name, fresh=(), blocked=(), deprecated=False):
119    """Imports and returns a module, deliberately bypassing the sys.modules cache
120    and importing a fresh copy of the module. Once the import is complete,
121    the sys.modules cache is restored to its original state.
122
123    Modules named in fresh are also imported anew if needed by the import.
124    If one of these modules can't be imported, None is returned.
125
126    Importing of modules named in blocked is prevented while the fresh import
127    takes place.
128
129    If deprecated is True, any module or package deprecation messages
130    will be suppressed."""
131    # NOTE: test_heapq, test_json, and test_warnings include extra sanity
132    # checks to make sure that this utility function is working as expected
133    with _ignore_deprecated_imports(deprecated):
134        # Keep track of modules saved for later restoration as well
135        # as those which just need a blocking entry removed
136        orig_modules = {}
137        names_to_remove = []
138        _save_and_remove_module(name, orig_modules)
139        try:
140            for fresh_name in fresh:
141                _save_and_remove_module(fresh_name, orig_modules)
142            for blocked_name in blocked:
143                if not _save_and_block_module(blocked_name, orig_modules):
144                    names_to_remove.append(blocked_name)
145            fresh_module = importlib.import_module(name)
146        except ImportError:
147            fresh_module = None
148        finally:
149            for orig_name, module in orig_modules.items():
150                sys.modules[orig_name] = module
151            for name_to_remove in names_to_remove:
152                del sys.modules[name_to_remove]
153        return fresh_module
154
155
156def get_attribute(obj, name):
157    """Get an attribute, raising SkipTest if AttributeError is raised."""
158    try:
159        attribute = getattr(obj, name)
160    except AttributeError:
161        if isinstance(obj, types.ModuleType):
162            msg = "module %r has no attribute %r" % (obj.__name__, name)
163        elif isinstance(obj, types.ClassType):
164            msg = "class %s has no attribute %r" % (obj.__name__, name)
165        elif isinstance(obj, types.InstanceType):
166            msg = "%s instance has no attribute %r" % (obj.__class__.__name__, name)
167        elif isinstance(obj, type):
168            msg = "type object %r has no attribute %r" % (obj.__name__, name)
169        else:
170            msg = "%r object has no attribute %r" % (type(obj).__name__, name)
171        raise unittest.SkipTest(msg)
172    else:
173        return attribute
174
175
176verbose = 1              # Flag set to 0 by regrtest.py
177use_resources = None     # Flag set to [] by regrtest.py
178max_memuse = 0           # Disable bigmem tests (they will still be run with
179                         # small sizes, to make sure they work.)
180real_max_memuse = 0
181failfast = False
182
183# _original_stdout is meant to hold stdout at the time regrtest began.
184# This may be "the real" stdout, or IDLE's emulation of stdout, or whatever.
185# The point is to have some flavor of stdout the user can actually see.
186_original_stdout = None
187def record_original_stdout(stdout):
188    global _original_stdout
189    _original_stdout = stdout
190
191def get_original_stdout():
192    return _original_stdout or sys.stdout
193
194def unload(name):
195    try:
196        del sys.modules[name]
197    except KeyError:
198        pass
199
200def _force_run(path, func, *args):
201    try:
202        return func(*args)
203    except EnvironmentError as err:
204        if verbose >= 2:
205            print('%s: %s' % (err.__class__.__name__, err))
206            print('re-run %s%r' % (func.__name__, args))
207        os.chmod(path, stat.S_IRWXU)
208        return func(*args)
209
210if sys.platform.startswith("win"):
211    def _waitfor(func, pathname, waitall=False):
212        # Perform the operation
213        func(pathname)
214        # Now setup the wait loop
215        if waitall:
216            dirname = pathname
217        else:
218            dirname, name = os.path.split(pathname)
219            dirname = dirname or '.'
220        # Check for `pathname` to be removed from the filesystem.
221        # The exponential backoff of the timeout amounts to a total
222        # of ~1 second after which the deletion is probably an error
223        # anyway.
224        # Testing on an i7@4.3GHz shows that usually only 1 iteration is
225        # required when contention occurs.
226        timeout = 0.001
227        while timeout < 1.0:
228            # Note we are only testing for the existence of the file(s) in
229            # the contents of the directory regardless of any security or
230            # access rights.  If we have made it this far, we have sufficient
231            # permissions to do that much using Python's equivalent of the
232            # Windows API FindFirstFile.
233            # Other Windows APIs can fail or give incorrect results when
234            # dealing with files that are pending deletion.
235            L = os.listdir(dirname)
236            if not (L if waitall else name in L):
237                return
238            # Increase the timeout and try again
239            time.sleep(timeout)
240            timeout *= 2
241        warnings.warn('tests may fail, delete still pending for ' + pathname,
242                      RuntimeWarning, stacklevel=4)
243
244    def _unlink(filename):
245        _waitfor(os.unlink, filename)
246
247    def _rmdir(dirname):
248        _waitfor(os.rmdir, dirname)
249
250    def _rmtree(path):
251        def _rmtree_inner(path):
252            for name in _force_run(path, os.listdir, path):
253                fullname = os.path.join(path, name)
254                if os.path.isdir(fullname):
255                    _waitfor(_rmtree_inner, fullname, waitall=True)
256                    _force_run(fullname, os.rmdir, fullname)
257                else:
258                    _force_run(fullname, os.unlink, fullname)
259        _waitfor(_rmtree_inner, path, waitall=True)
260        _waitfor(lambda p: _force_run(p, os.rmdir, p), path)
261else:
262    _unlink = os.unlink
263    _rmdir = os.rmdir
264
265    def _rmtree(path):
266        try:
267            shutil.rmtree(path)
268            return
269        except EnvironmentError:
270            pass
271
272        def _rmtree_inner(path):
273            for name in _force_run(path, os.listdir, path):
274                fullname = os.path.join(path, name)
275                try:
276                    mode = os.lstat(fullname).st_mode
277                except EnvironmentError:
278                    mode = 0
279                if stat.S_ISDIR(mode):
280                    _rmtree_inner(fullname)
281                    _force_run(path, os.rmdir, fullname)
282                else:
283                    _force_run(path, os.unlink, fullname)
284        _rmtree_inner(path)
285        os.rmdir(path)
286
287def unlink(filename):
288    try:
289        _unlink(filename)
290    except OSError as exc:
291        if exc.errno not in (errno.ENOENT, errno.ENOTDIR):
292            raise
293
294def rmdir(dirname):
295    try:
296        _rmdir(dirname)
297    except OSError as error:
298        # The directory need not exist.
299        if error.errno != errno.ENOENT:
300            raise
301
302def rmtree(path):
303    try:
304        _rmtree(path)
305    except OSError, e:
306        # Unix returns ENOENT, Windows returns ESRCH.
307        if e.errno not in (errno.ENOENT, errno.ESRCH):
308            raise
309
310def forget(modname):
311    '''"Forget" a module was ever imported by removing it from sys.modules and
312    deleting any .pyc and .pyo files.'''
313    unload(modname)
314    for dirname in sys.path:
315        unlink(os.path.join(dirname, modname + os.extsep + 'pyc'))
316        # Deleting the .pyo file cannot be within the 'try' for the .pyc since
317        # the chance exists that there is no .pyc (and thus the 'try' statement
318        # is exited) but there is a .pyo file.
319        unlink(os.path.join(dirname, modname + os.extsep + 'pyo'))
320
321# Check whether a gui is actually available
322def _is_gui_available():
323    if hasattr(_is_gui_available, 'result'):
324        return _is_gui_available.result
325    reason = None
326    if sys.platform.startswith('win'):
327        # if Python is running as a service (such as the buildbot service),
328        # gui interaction may be disallowed
329        import ctypes
330        import ctypes.wintypes
331        UOI_FLAGS = 1
332        WSF_VISIBLE = 0x0001
333        class USEROBJECTFLAGS(ctypes.Structure):
334            _fields_ = [("fInherit", ctypes.wintypes.BOOL),
335                        ("fReserved", ctypes.wintypes.BOOL),
336                        ("dwFlags", ctypes.wintypes.DWORD)]
337        dll = ctypes.windll.user32
338        h = dll.GetProcessWindowStation()
339        if not h:
340            raise ctypes.WinError()
341        uof = USEROBJECTFLAGS()
342        needed = ctypes.wintypes.DWORD()
343        res = dll.GetUserObjectInformationW(h,
344            UOI_FLAGS,
345            ctypes.byref(uof),
346            ctypes.sizeof(uof),
347            ctypes.byref(needed))
348        if not res:
349            raise ctypes.WinError()
350        if not bool(uof.dwFlags & WSF_VISIBLE):
351            reason = "gui not available (WSF_VISIBLE flag not set)"
352    elif sys.platform == 'darwin':
353        # The Aqua Tk implementations on OS X can abort the process if
354        # being called in an environment where a window server connection
355        # cannot be made, for instance when invoked by a buildbot or ssh
356        # process not running under the same user id as the current console
357        # user.  To avoid that, raise an exception if the window manager
358        # connection is not available.
359        from ctypes import cdll, c_int, pointer, Structure
360        from ctypes.util import find_library
361
362        app_services = cdll.LoadLibrary(find_library("ApplicationServices"))
363
364        if app_services.CGMainDisplayID() == 0:
365            reason = "gui tests cannot run without OS X window manager"
366        else:
367            class ProcessSerialNumber(Structure):
368                _fields_ = [("highLongOfPSN", c_int),
369                            ("lowLongOfPSN", c_int)]
370            psn = ProcessSerialNumber()
371            psn_p = pointer(psn)
372            if (  (app_services.GetCurrentProcess(psn_p) < 0) or
373                  (app_services.SetFrontProcess(psn_p) < 0) ):
374                reason = "cannot run without OS X gui process"
375
376    # check on every platform whether tkinter can actually do anything
377    if not reason:
378        try:
379            from Tkinter import Tk
380            root = Tk()
381            root.withdraw()
382            root.update()
383            root.destroy()
384        except Exception as e:
385            err_string = str(e)
386            if len(err_string) > 50:
387                err_string = err_string[:50] + ' [...]'
388            reason = 'Tk unavailable due to {}: {}'.format(type(e).__name__,
389                                                           err_string)
390
391    _is_gui_available.reason = reason
392    _is_gui_available.result = not reason
393
394    return _is_gui_available.result
395
396def is_resource_enabled(resource):
397    """Test whether a resource is enabled.
398
399    Known resources are set by regrtest.py.  If not running under regrtest.py,
400    all resources are assumed enabled unless use_resources has been set.
401    """
402    return use_resources is None or resource in use_resources
403
404def requires(resource, msg=None):
405    """Raise ResourceDenied if the specified resource is not available."""
406    if not is_resource_enabled(resource):
407        if msg is None:
408            msg = "Use of the `%s' resource not enabled" % resource
409        raise ResourceDenied(msg)
410    if resource == 'gui' and not _is_gui_available():
411        raise ResourceDenied(_is_gui_available.reason)
412
413def requires_mac_ver(*min_version):
414    """Decorator raising SkipTest if the OS is Mac OS X and the OS X
415    version if less than min_version.
416
417    For example, @requires_mac_ver(10, 5) raises SkipTest if the OS X version
418    is lesser than 10.5.
419    """
420    def decorator(func):
421        @functools.wraps(func)
422        def wrapper(*args, **kw):
423            if sys.platform == 'darwin':
424                version_txt = platform.mac_ver()[0]
425                try:
426                    version = tuple(map(int, version_txt.split('.')))
427                except ValueError:
428                    pass
429                else:
430                    if version < min_version:
431                        min_version_txt = '.'.join(map(str, min_version))
432                        raise unittest.SkipTest(
433                            "Mac OS X %s or higher required, not %s"
434                            % (min_version_txt, version_txt))
435            return func(*args, **kw)
436        wrapper.min_version = min_version
437        return wrapper
438    return decorator
439
440
441# Don't use "localhost", since resolving it uses the DNS under recent
442# Windows versions (see issue #18792).
443HOST = "127.0.0.1"
444HOSTv6 = "::1"
445
446
447def find_unused_port(family=socket.AF_INET, socktype=socket.SOCK_STREAM):
448    """Returns an unused port that should be suitable for binding.  This is
449    achieved by creating a temporary socket with the same family and type as
450    the 'sock' parameter (default is AF_INET, SOCK_STREAM), and binding it to
451    the specified host address (defaults to 0.0.0.0) with the port set to 0,
452    eliciting an unused ephemeral port from the OS.  The temporary socket is
453    then closed and deleted, and the ephemeral port is returned.
454
455    Either this method or bind_port() should be used for any tests where a
456    server socket needs to be bound to a particular port for the duration of
457    the test.  Which one to use depends on whether the calling code is creating
458    a python socket, or if an unused port needs to be provided in a constructor
459    or passed to an external program (i.e. the -accept argument to openssl's
460    s_server mode).  Always prefer bind_port() over find_unused_port() where
461    possible.  Hard coded ports should *NEVER* be used.  As soon as a server
462    socket is bound to a hard coded port, the ability to run multiple instances
463    of the test simultaneously on the same host is compromised, which makes the
464    test a ticking time bomb in a buildbot environment. On Unix buildbots, this
465    may simply manifest as a failed test, which can be recovered from without
466    intervention in most cases, but on Windows, the entire python process can
467    completely and utterly wedge, requiring someone to log in to the buildbot
468    and manually kill the affected process.
469
470    (This is easy to reproduce on Windows, unfortunately, and can be traced to
471    the SO_REUSEADDR socket option having different semantics on Windows versus
472    Unix/Linux.  On Unix, you can't have two AF_INET SOCK_STREAM sockets bind,
473    listen and then accept connections on identical host/ports.  An EADDRINUSE
474    socket.error will be raised at some point (depending on the platform and
475    the order bind and listen were called on each socket).
476
477    However, on Windows, if SO_REUSEADDR is set on the sockets, no EADDRINUSE
478    will ever be raised when attempting to bind two identical host/ports. When
479    accept() is called on each socket, the second caller's process will steal
480    the port from the first caller, leaving them both in an awkwardly wedged
481    state where they'll no longer respond to any signals or graceful kills, and
482    must be forcibly killed via OpenProcess()/TerminateProcess().
483
484    The solution on Windows is to use the SO_EXCLUSIVEADDRUSE socket option
485    instead of SO_REUSEADDR, which effectively affords the same semantics as
486    SO_REUSEADDR on Unix.  Given the propensity of Unix developers in the Open
487    Source world compared to Windows ones, this is a common mistake.  A quick
488    look over OpenSSL's 0.9.8g source shows that they use SO_REUSEADDR when
489    openssl.exe is called with the 's_server' option, for example. See
490    http://bugs.python.org/issue2550 for more info.  The following site also
491    has a very thorough description about the implications of both REUSEADDR
492    and EXCLUSIVEADDRUSE on Windows:
493    http://msdn2.microsoft.com/en-us/library/ms740621(VS.85).aspx)
494
495    XXX: although this approach is a vast improvement on previous attempts to
496    elicit unused ports, it rests heavily on the assumption that the ephemeral
497    port returned to us by the OS won't immediately be dished back out to some
498    other process when we close and delete our temporary socket but before our
499    calling code has a chance to bind the returned port.  We can deal with this
500    issue if/when we come across it."""
501    tempsock = socket.socket(family, socktype)
502    port = bind_port(tempsock)
503    tempsock.close()
504    del tempsock
505    return port
506
507def bind_port(sock, host=HOST):
508    """Bind the socket to a free port and return the port number.  Relies on
509    ephemeral ports in order to ensure we are using an unbound port.  This is
510    important as many tests may be running simultaneously, especially in a
511    buildbot environment.  This method raises an exception if the sock.family
512    is AF_INET and sock.type is SOCK_STREAM, *and* the socket has SO_REUSEADDR
513    or SO_REUSEPORT set on it.  Tests should *never* set these socket options
514    for TCP/IP sockets.  The only case for setting these options is testing
515    multicasting via multiple UDP sockets.
516
517    Additionally, if the SO_EXCLUSIVEADDRUSE socket option is available (i.e.
518    on Windows), it will be set on the socket.  This will prevent anyone else
519    from bind()'ing to our host/port for the duration of the test.
520    """
521    if sock.family == socket.AF_INET and sock.type == socket.SOCK_STREAM:
522        if hasattr(socket, 'SO_REUSEADDR'):
523            if sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR) == 1:
524                raise TestFailed("tests should never set the SO_REUSEADDR "   \
525                                 "socket option on TCP/IP sockets!")
526        if hasattr(socket, 'SO_REUSEPORT'):
527            try:
528                if sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT) == 1:
529                    raise TestFailed("tests should never set the SO_REUSEPORT "   \
530                                     "socket option on TCP/IP sockets!")
531            except EnvironmentError:
532                # Python's socket module was compiled using modern headers
533                # thus defining SO_REUSEPORT but this process is running
534                # under an older kernel that does not support SO_REUSEPORT.
535                pass
536        if hasattr(socket, 'SO_EXCLUSIVEADDRUSE'):
537            sock.setsockopt(socket.SOL_SOCKET, socket.SO_EXCLUSIVEADDRUSE, 1)
538
539    sock.bind((host, 0))
540    port = sock.getsockname()[1]
541    return port
542
543def _is_ipv6_enabled():
544    """Check whether IPv6 is enabled on this host."""
545    if socket.has_ipv6:
546        sock = None
547        try:
548            sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
549            sock.bind((HOSTv6, 0))
550            return True
551        except socket.error:
552            pass
553        finally:
554            if sock:
555                sock.close()
556    return False
557
558IPV6_ENABLED = _is_ipv6_enabled()
559
560def system_must_validate_cert(f):
561    """Skip the test on TLS certificate validation failures."""
562    @functools.wraps(f)
563    def dec(*args, **kwargs):
564        try:
565            f(*args, **kwargs)
566        except IOError as e:
567            if "CERTIFICATE_VERIFY_FAILED" in str(e):
568                raise unittest.SkipTest("system does not contain "
569                                        "necessary certificates")
570            raise
571    return dec
572
573FUZZ = 1e-6
574
575def fcmp(x, y): # fuzzy comparison function
576    if isinstance(x, float) or isinstance(y, float):
577        try:
578            fuzz = (abs(x) + abs(y)) * FUZZ
579            if abs(x-y) <= fuzz:
580                return 0
581        except:
582            pass
583    elif type(x) == type(y) and isinstance(x, (tuple, list)):
584        for i in range(min(len(x), len(y))):
585            outcome = fcmp(x[i], y[i])
586            if outcome != 0:
587                return outcome
588        return (len(x) > len(y)) - (len(x) < len(y))
589    return (x > y) - (x < y)
590
591
592# A constant likely larger than the underlying OS pipe buffer size, to
593# make writes blocking.
594# Windows limit seems to be around 512 B, and many Unix kernels have a
595# 64 KiB pipe buffer size or 16 * PAGE_SIZE: take a few megs to be sure.
596# (see issue #17835 for a discussion of this number).
597PIPE_MAX_SIZE = 4 * 1024 * 1024 + 1
598
599# A constant likely larger than the underlying OS socket buffer size, to make
600# writes blocking.
601# The socket buffer sizes can usually be tuned system-wide (e.g. through sysctl
602# on Linux), or on a per-socket basis (SO_SNDBUF/SO_RCVBUF). See issue #18643
603# for a discussion of this number).
604SOCK_MAX_SIZE = 16 * 1024 * 1024 + 1
605
606is_jython = sys.platform.startswith('java')
607
608try:
609    unicode
610    have_unicode = True
611except NameError:
612    have_unicode = False
613
614requires_unicode = unittest.skipUnless(have_unicode, 'no unicode support')
615
616def u(s):
617    return unicode(s, 'unicode-escape')
618
619# FS_NONASCII: non-ASCII Unicode character encodable by
620# sys.getfilesystemencoding(), or None if there is no such character.
621FS_NONASCII = None
622if have_unicode:
623    for character in (
624        # First try printable and common characters to have a readable filename.
625        # For each character, the encoding list are just example of encodings able
626        # to encode the character (the list is not exhaustive).
627
628        # U+00E6 (Latin Small Letter Ae): cp1252, iso-8859-1
629        unichr(0x00E6),
630        # U+0130 (Latin Capital Letter I With Dot Above): cp1254, iso8859_3
631        unichr(0x0130),
632        # U+0141 (Latin Capital Letter L With Stroke): cp1250, cp1257
633        unichr(0x0141),
634        # U+03C6 (Greek Small Letter Phi): cp1253
635        unichr(0x03C6),
636        # U+041A (Cyrillic Capital Letter Ka): cp1251
637        unichr(0x041A),
638        # U+05D0 (Hebrew Letter Alef): Encodable to cp424
639        unichr(0x05D0),
640        # U+060C (Arabic Comma): cp864, cp1006, iso8859_6, mac_arabic
641        unichr(0x060C),
642        # U+062A (Arabic Letter Teh): cp720
643        unichr(0x062A),
644        # U+0E01 (Thai Character Ko Kai): cp874
645        unichr(0x0E01),
646
647        # Then try more "special" characters. "special" because they may be
648        # interpreted or displayed differently depending on the exact locale
649        # encoding and the font.
650
651        # U+00A0 (No-Break Space)
652        unichr(0x00A0),
653        # U+20AC (Euro Sign)
654        unichr(0x20AC),
655    ):
656        try:
657            character.encode(sys.getfilesystemencoding())\
658                     .decode(sys.getfilesystemencoding())
659        except UnicodeError:
660            pass
661        else:
662            FS_NONASCII = character
663            break
664
665# Filename used for testing
666if os.name == 'java':
667    # Jython disallows @ in module names
668    TESTFN = '$test'
669elif os.name == 'riscos':
670    TESTFN = 'testfile'
671else:
672    TESTFN = '@test'
673    # Unicode name only used if TEST_FN_ENCODING exists for the platform.
674    if have_unicode:
675        # Assuming sys.getfilesystemencoding()!=sys.getdefaultencoding()
676        # TESTFN_UNICODE is a filename that can be encoded using the
677        # file system encoding, but *not* with the default (ascii) encoding
678        if isinstance('', unicode):
679            # python -U
680            # XXX perhaps unicode() should accept Unicode strings?
681            TESTFN_UNICODE = "@test-\xe0\xf2"
682        else:
683            # 2 latin characters.
684            TESTFN_UNICODE = unicode("@test-\xe0\xf2", "latin-1")
685        TESTFN_ENCODING = sys.getfilesystemencoding()
686        # TESTFN_UNENCODABLE is a filename that should *not* be
687        # able to be encoded by *either* the default or filesystem encoding.
688        # This test really only makes sense on Windows NT platforms
689        # which have special Unicode support in posixmodule.
690        if (not hasattr(sys, "getwindowsversion") or
691                sys.getwindowsversion()[3] < 2): #  0=win32s or 1=9x/ME
692            TESTFN_UNENCODABLE = None
693        else:
694            # Japanese characters (I think - from bug 846133)
695            TESTFN_UNENCODABLE = eval('u"@test-\u5171\u6709\u3055\u308c\u308b"')
696            try:
697                # XXX - Note - should be using TESTFN_ENCODING here - but for
698                # Windows, "mbcs" currently always operates as if in
699                # errors=ignore' mode - hence we get '?' characters rather than
700                # the exception.  'Latin1' operates as we expect - ie, fails.
701                # See [ 850997 ] mbcs encoding ignores errors
702                TESTFN_UNENCODABLE.encode("Latin1")
703            except UnicodeEncodeError:
704                pass
705            else:
706                print \
707                'WARNING: The filename %r CAN be encoded by the filesystem.  ' \
708                'Unicode filename tests may not be effective' \
709                % TESTFN_UNENCODABLE
710
711
712# Disambiguate TESTFN for parallel testing, while letting it remain a valid
713# module name.
714TESTFN = "{}_{}_tmp".format(TESTFN, os.getpid())
715
716# Save the initial cwd
717SAVEDCWD = os.getcwd()
718
719@contextlib.contextmanager
720def temp_dir(path=None, quiet=False):
721    """Return a context manager that creates a temporary directory.
722
723    Arguments:
724
725      path: the directory to create temporarily.  If omitted or None,
726        defaults to creating a temporary directory using tempfile.mkdtemp.
727
728      quiet: if False (the default), the context manager raises an exception
729        on error.  Otherwise, if the path is specified and cannot be
730        created, only a warning is issued.
731
732    """
733    dir_created = False
734    if path is None:
735        import tempfile
736        path = tempfile.mkdtemp()
737        dir_created = True
738        path = os.path.realpath(path)
739    else:
740        if (have_unicode and isinstance(path, unicode) and
741            not os.path.supports_unicode_filenames):
742            try:
743                path = path.encode(sys.getfilesystemencoding() or 'ascii')
744            except UnicodeEncodeError:
745                if not quiet:
746                    raise unittest.SkipTest('unable to encode the cwd name with '
747                                            'the filesystem encoding.')
748        try:
749            os.mkdir(path)
750            dir_created = True
751        except OSError:
752            if not quiet:
753                raise
754            warnings.warn('tests may fail, unable to create temp dir: ' + path,
755                          RuntimeWarning, stacklevel=3)
756    if dir_created:
757        pid = os.getpid()
758    try:
759        yield path
760    finally:
761        # In case the process forks, let only the parent remove the
762        # directory. The child has a diffent process id. (bpo-30028)
763        if dir_created and pid == os.getpid():
764            rmtree(path)
765
766@contextlib.contextmanager
767def change_cwd(path, quiet=False):
768    """Return a context manager that changes the current working directory.
769
770    Arguments:
771
772      path: the directory to use as the temporary current working directory.
773
774      quiet: if False (the default), the context manager raises an exception
775        on error.  Otherwise, it issues only a warning and keeps the current
776        working directory the same.
777
778    """
779    saved_dir = os.getcwd()
780    try:
781        os.chdir(path)
782    except OSError:
783        if not quiet:
784            raise
785        warnings.warn('tests may fail, unable to change CWD to: ' + path,
786                      RuntimeWarning, stacklevel=3)
787    try:
788        yield os.getcwd()
789    finally:
790        os.chdir(saved_dir)
791
792
793@contextlib.contextmanager
794def temp_cwd(name='tempcwd', quiet=False):
795    """
796    Context manager that temporarily creates and changes the CWD.
797
798    The function temporarily changes the current working directory
799    after creating a temporary directory in the current directory with
800    name *name*.  If *name* is None, the temporary directory is
801    created using tempfile.mkdtemp.
802
803    If *quiet* is False (default) and it is not possible to
804    create or change the CWD, an error is raised.  If *quiet* is True,
805    only a warning is raised and the original CWD is used.
806
807    """
808    with temp_dir(path=name, quiet=quiet) as temp_path:
809        with change_cwd(temp_path, quiet=quiet) as cwd_dir:
810            yield cwd_dir
811
812# TEST_HOME_DIR refers to the top level directory of the "test" package
813# that contains Python's regression test suite
814TEST_SUPPORT_DIR = os.path.dirname(os.path.abspath(__file__))
815TEST_HOME_DIR = os.path.dirname(TEST_SUPPORT_DIR)
816
817# TEST_DATA_DIR is used as a target download location for remote resources
818TEST_DATA_DIR = os.path.join(TEST_HOME_DIR, "data")
819
820def findfile(file, subdir=None):
821    """Try to find a file on sys.path and the working directory.  If it is not
822    found the argument passed to the function is returned (this does not
823    necessarily signal failure; could still be the legitimate path)."""
824    if os.path.isabs(file):
825        return file
826    if subdir is not None:
827        file = os.path.join(subdir, file)
828    path = [TEST_HOME_DIR] + sys.path
829    for dn in path:
830        fn = os.path.join(dn, file)
831        if os.path.exists(fn): return fn
832    return file
833
834def sortdict(dict):
835    "Like repr(dict), but in sorted order."
836    items = dict.items()
837    items.sort()
838    reprpairs = ["%r: %r" % pair for pair in items]
839    withcommas = ", ".join(reprpairs)
840    return "{%s}" % withcommas
841
842def make_bad_fd():
843    """
844    Create an invalid file descriptor by opening and closing a file and return
845    its fd.
846    """
847    file = open(TESTFN, "wb")
848    try:
849        return file.fileno()
850    finally:
851        file.close()
852        unlink(TESTFN)
853
854def check_syntax_error(testcase, statement, errtext='', lineno=None, offset=None):
855    with testcase.assertRaisesRegexp(SyntaxError, errtext) as cm:
856        compile(statement, '<test string>', 'exec')
857    err = cm.exception
858    if lineno is not None:
859        testcase.assertEqual(err.lineno, lineno)
860    if offset is not None:
861        testcase.assertEqual(err.offset, offset)
862
863def open_urlresource(url, check=None):
864    import urlparse, urllib2
865
866    filename = urlparse.urlparse(url)[2].split('/')[-1] # '/': it's URL!
867
868    fn = os.path.join(TEST_DATA_DIR, filename)
869
870    def check_valid_file(fn):
871        f = open(fn)
872        if check is None:
873            return f
874        elif check(f):
875            f.seek(0)
876            return f
877        f.close()
878
879    if os.path.exists(fn):
880        f = check_valid_file(fn)
881        if f is not None:
882            return f
883        unlink(fn)
884
885    # Verify the requirement before downloading the file
886    requires('urlfetch')
887
888    print >> get_original_stdout(), '\tfetching %s ...' % url
889    f = urllib2.urlopen(url, timeout=15)
890    try:
891        with open(fn, "wb") as out:
892            s = f.read()
893            while s:
894                out.write(s)
895                s = f.read()
896    finally:
897        f.close()
898
899    f = check_valid_file(fn)
900    if f is not None:
901        return f
902    raise TestFailed('invalid resource "%s"' % fn)
903
904
905class WarningsRecorder(object):
906    """Convenience wrapper for the warnings list returned on
907       entry to the warnings.catch_warnings() context manager.
908    """
909    def __init__(self, warnings_list):
910        self._warnings = warnings_list
911        self._last = 0
912
913    def __getattr__(self, attr):
914        if len(self._warnings) > self._last:
915            return getattr(self._warnings[-1], attr)
916        elif attr in warnings.WarningMessage._WARNING_DETAILS:
917            return None
918        raise AttributeError("%r has no attribute %r" % (self, attr))
919
920    @property
921    def warnings(self):
922        return self._warnings[self._last:]
923
924    def reset(self):
925        self._last = len(self._warnings)
926
927
928def _filterwarnings(filters, quiet=False):
929    """Catch the warnings, then check if all the expected
930    warnings have been raised and re-raise unexpected warnings.
931    If 'quiet' is True, only re-raise the unexpected warnings.
932    """
933    # Clear the warning registry of the calling module
934    # in order to re-raise the warnings.
935    frame = sys._getframe(2)
936    registry = frame.f_globals.get('__warningregistry__')
937    if registry:
938        registry.clear()
939    with warnings.catch_warnings(record=True) as w:
940        # Set filter "always" to record all warnings.  Because
941        # test_warnings swap the module, we need to look up in
942        # the sys.modules dictionary.
943        sys.modules['warnings'].simplefilter("always")
944        yield WarningsRecorder(w)
945    # Filter the recorded warnings
946    reraise = [warning.message for warning in w]
947    missing = []
948    for msg, cat in filters:
949        seen = False
950        for exc in reraise[:]:
951            message = str(exc)
952            # Filter out the matching messages
953            if (re.match(msg, message, re.I) and
954                issubclass(exc.__class__, cat)):
955                seen = True
956                reraise.remove(exc)
957        if not seen and not quiet:
958            # This filter caught nothing
959            missing.append((msg, cat.__name__))
960    if reraise:
961        raise AssertionError("unhandled warning %r" % reraise[0])
962    if missing:
963        raise AssertionError("filter (%r, %s) did not catch any warning" %
964                             missing[0])
965
966
967@contextlib.contextmanager
968def check_warnings(*filters, **kwargs):
969    """Context manager to silence warnings.
970
971    Accept 2-tuples as positional arguments:
972        ("message regexp", WarningCategory)
973
974    Optional argument:
975     - if 'quiet' is True, it does not fail if a filter catches nothing
976        (default True without argument,
977         default False if some filters are defined)
978
979    Without argument, it defaults to:
980        check_warnings(("", Warning), quiet=True)
981    """
982    quiet = kwargs.get('quiet')
983    if not filters:
984        filters = (("", Warning),)
985        # Preserve backward compatibility
986        if quiet is None:
987            quiet = True
988    return _filterwarnings(filters, quiet)
989
990
991@contextlib.contextmanager
992def check_py3k_warnings(*filters, **kwargs):
993    """Context manager to silence py3k warnings.
994
995    Accept 2-tuples as positional arguments:
996        ("message regexp", WarningCategory)
997
998    Optional argument:
999     - if 'quiet' is True, it does not fail if a filter catches nothing
1000        (default False)
1001
1002    Without argument, it defaults to:
1003        check_py3k_warnings(("", DeprecationWarning), quiet=False)
1004    """
1005    if sys.py3kwarning:
1006        if not filters:
1007            filters = (("", DeprecationWarning),)
1008    else:
1009        # It should not raise any py3k warning
1010        filters = ()
1011    return _filterwarnings(filters, kwargs.get('quiet'))
1012
1013
1014class CleanImport(object):
1015    """Context manager to force import to return a new module reference.
1016
1017    This is useful for testing module-level behaviours, such as
1018    the emission of a DeprecationWarning on import.
1019
1020    Use like this:
1021
1022        with CleanImport("foo"):
1023            importlib.import_module("foo") # new reference
1024    """
1025
1026    def __init__(self, *module_names):
1027        self.original_modules = sys.modules.copy()
1028        for module_name in module_names:
1029            if module_name in sys.modules:
1030                module = sys.modules[module_name]
1031                # It is possible that module_name is just an alias for
1032                # another module (e.g. stub for modules renamed in 3.x).
1033                # In that case, we also need delete the real module to clear
1034                # the import cache.
1035                if module.__name__ != module_name:
1036                    del sys.modules[module.__name__]
1037                del sys.modules[module_name]
1038
1039    def __enter__(self):
1040        return self
1041
1042    def __exit__(self, *ignore_exc):
1043        sys.modules.update(self.original_modules)
1044
1045
1046class EnvironmentVarGuard(UserDict.DictMixin):
1047
1048    """Class to help protect the environment variable properly.  Can be used as
1049    a context manager."""
1050
1051    def __init__(self):
1052        self._environ = os.environ
1053        self._changed = {}
1054
1055    def __getitem__(self, envvar):
1056        return self._environ[envvar]
1057
1058    def __setitem__(self, envvar, value):
1059        # Remember the initial value on the first access
1060        if envvar not in self._changed:
1061            self._changed[envvar] = self._environ.get(envvar)
1062        self._environ[envvar] = value
1063
1064    def __delitem__(self, envvar):
1065        # Remember the initial value on the first access
1066        if envvar not in self._changed:
1067            self._changed[envvar] = self._environ.get(envvar)
1068        if envvar in self._environ:
1069            del self._environ[envvar]
1070
1071    def keys(self):
1072        return self._environ.keys()
1073
1074    def set(self, envvar, value):
1075        self[envvar] = value
1076
1077    def unset(self, envvar):
1078        del self[envvar]
1079
1080    def __enter__(self):
1081        return self
1082
1083    def __exit__(self, *ignore_exc):
1084        for (k, v) in self._changed.items():
1085            if v is None:
1086                if k in self._environ:
1087                    del self._environ[k]
1088            else:
1089                self._environ[k] = v
1090        os.environ = self._environ
1091
1092
1093class DirsOnSysPath(object):
1094    """Context manager to temporarily add directories to sys.path.
1095
1096    This makes a copy of sys.path, appends any directories given
1097    as positional arguments, then reverts sys.path to the copied
1098    settings when the context ends.
1099
1100    Note that *all* sys.path modifications in the body of the
1101    context manager, including replacement of the object,
1102    will be reverted at the end of the block.
1103    """
1104
1105    def __init__(self, *paths):
1106        self.original_value = sys.path[:]
1107        self.original_object = sys.path
1108        sys.path.extend(paths)
1109
1110    def __enter__(self):
1111        return self
1112
1113    def __exit__(self, *ignore_exc):
1114        sys.path = self.original_object
1115        sys.path[:] = self.original_value
1116
1117
1118class TransientResource(object):
1119
1120    """Raise ResourceDenied if an exception is raised while the context manager
1121    is in effect that matches the specified exception and attributes."""
1122
1123    def __init__(self, exc, **kwargs):
1124        self.exc = exc
1125        self.attrs = kwargs
1126
1127    def __enter__(self):
1128        return self
1129
1130    def __exit__(self, type_=None, value=None, traceback=None):
1131        """If type_ is a subclass of self.exc and value has attributes matching
1132        self.attrs, raise ResourceDenied.  Otherwise let the exception
1133        propagate (if any)."""
1134        if type_ is not None and issubclass(self.exc, type_):
1135            for attr, attr_value in self.attrs.iteritems():
1136                if not hasattr(value, attr):
1137                    break
1138                if getattr(value, attr) != attr_value:
1139                    break
1140            else:
1141                raise ResourceDenied("an optional resource is not available")
1142
1143
1144@contextlib.contextmanager
1145def transient_internet(resource_name, timeout=30.0, errnos=()):
1146    """Return a context manager that raises ResourceDenied when various issues
1147    with the Internet connection manifest themselves as exceptions."""
1148    default_errnos = [
1149        ('ECONNREFUSED', 111),
1150        ('ECONNRESET', 104),
1151        ('EHOSTUNREACH', 113),
1152        ('ENETUNREACH', 101),
1153        ('ETIMEDOUT', 110),
1154    ]
1155    default_gai_errnos = [
1156        ('EAI_AGAIN', -3),
1157        ('EAI_FAIL', -4),
1158        ('EAI_NONAME', -2),
1159        ('EAI_NODATA', -5),
1160        # Windows defines EAI_NODATA as 11001 but idiotic getaddrinfo()
1161        # implementation actually returns WSANO_DATA i.e. 11004.
1162        ('WSANO_DATA', 11004),
1163    ]
1164
1165    denied = ResourceDenied("Resource '%s' is not available" % resource_name)
1166    captured_errnos = errnos
1167    gai_errnos = []
1168    if not captured_errnos:
1169        captured_errnos = [getattr(errno, name, num)
1170                           for (name, num) in default_errnos]
1171        gai_errnos = [getattr(socket, name, num)
1172                      for (name, num) in default_gai_errnos]
1173
1174    def filter_error(err):
1175        n = getattr(err, 'errno', None)
1176        if (isinstance(err, socket.timeout) or
1177            (isinstance(err, socket.gaierror) and n in gai_errnos) or
1178            n in captured_errnos):
1179            if not verbose:
1180                sys.stderr.write(denied.args[0] + "\n")
1181            raise denied
1182
1183    old_timeout = socket.getdefaulttimeout()
1184    try:
1185        if timeout is not None:
1186            socket.setdefaulttimeout(timeout)
1187        yield
1188    except IOError as err:
1189        # urllib can wrap original socket errors multiple times (!), we must
1190        # unwrap to get at the original error.
1191        while True:
1192            a = err.args
1193            if len(a) >= 1 and isinstance(a[0], IOError):
1194                err = a[0]
1195            # The error can also be wrapped as args[1]:
1196            #    except socket.error as msg:
1197            #        raise IOError('socket error', msg).with_traceback(sys.exc_info()[2])
1198            elif len(a) >= 2 and isinstance(a[1], IOError):
1199                err = a[1]
1200            else:
1201                break
1202        filter_error(err)
1203        raise
1204    # XXX should we catch generic exceptions and look for their
1205    # __cause__ or __context__?
1206    finally:
1207        socket.setdefaulttimeout(old_timeout)
1208
1209
1210@contextlib.contextmanager
1211def captured_output(stream_name):
1212    """Return a context manager used by captured_stdout and captured_stdin
1213    that temporarily replaces the sys stream *stream_name* with a StringIO."""
1214    import StringIO
1215    orig_stdout = getattr(sys, stream_name)
1216    setattr(sys, stream_name, StringIO.StringIO())
1217    try:
1218        yield getattr(sys, stream_name)
1219    finally:
1220        setattr(sys, stream_name, orig_stdout)
1221
1222def captured_stdout():
1223    """Capture the output of sys.stdout:
1224
1225       with captured_stdout() as s:
1226           print "hello"
1227       self.assertEqual(s.getvalue(), "hello")
1228    """
1229    return captured_output("stdout")
1230
1231def captured_stderr():
1232    return captured_output("stderr")
1233
1234def captured_stdin():
1235    return captured_output("stdin")
1236
1237def gc_collect():
1238    """Force as many objects as possible to be collected.
1239
1240    In non-CPython implementations of Python, this is needed because timely
1241    deallocation is not guaranteed by the garbage collector.  (Even in CPython
1242    this can be the case in case of reference cycles.)  This means that __del__
1243    methods may be called later than expected and weakrefs may remain alive for
1244    longer than expected.  This function tries its best to force all garbage
1245    objects to disappear.
1246    """
1247    gc.collect()
1248    if is_jython:
1249        time.sleep(0.1)
1250    gc.collect()
1251    gc.collect()
1252
1253
1254_header = '2P'
1255if hasattr(sys, "gettotalrefcount"):
1256    _header = '2P' + _header
1257_vheader = _header + 'P'
1258
1259def calcobjsize(fmt):
1260    return struct.calcsize(_header + fmt + '0P')
1261
1262def calcvobjsize(fmt):
1263    return struct.calcsize(_vheader + fmt + '0P')
1264
1265
1266_TPFLAGS_HAVE_GC = 1<<14
1267_TPFLAGS_HEAPTYPE = 1<<9
1268
1269def check_sizeof(test, o, size):
1270    import _testcapi
1271    result = sys.getsizeof(o)
1272    # add GC header size
1273    if ((type(o) == type) and (o.__flags__ & _TPFLAGS_HEAPTYPE) or\
1274        ((type(o) != type) and (type(o).__flags__ & _TPFLAGS_HAVE_GC))):
1275        size += _testcapi.SIZEOF_PYGC_HEAD
1276    msg = 'wrong size for %s: got %d, expected %d' \
1277            % (type(o), result, size)
1278    test.assertEqual(result, size, msg)
1279
1280
1281#=======================================================================
1282# Decorator for running a function in a different locale, correctly resetting
1283# it afterwards.
1284
1285def run_with_locale(catstr, *locales):
1286    def decorator(func):
1287        def inner(*args, **kwds):
1288            try:
1289                import locale
1290                category = getattr(locale, catstr)
1291                orig_locale = locale.setlocale(category)
1292            except AttributeError:
1293                # if the test author gives us an invalid category string
1294                raise
1295            except:
1296                # cannot retrieve original locale, so do nothing
1297                locale = orig_locale = None
1298            else:
1299                for loc in locales:
1300                    try:
1301                        locale.setlocale(category, loc)
1302                        break
1303                    except:
1304                        pass
1305
1306            # now run the function, resetting the locale on exceptions
1307            try:
1308                return func(*args, **kwds)
1309            finally:
1310                if locale and orig_locale:
1311                    locale.setlocale(category, orig_locale)
1312        inner.func_name = func.func_name
1313        inner.__doc__ = func.__doc__
1314        return inner
1315    return decorator
1316
1317#=======================================================================
1318# Decorator for running a function in a specific timezone, correctly
1319# resetting it afterwards.
1320
1321def run_with_tz(tz):
1322    def decorator(func):
1323        def inner(*args, **kwds):
1324            try:
1325                tzset = time.tzset
1326            except AttributeError:
1327                raise unittest.SkipTest("tzset required")
1328            if 'TZ' in os.environ:
1329                orig_tz = os.environ['TZ']
1330            else:
1331                orig_tz = None
1332            os.environ['TZ'] = tz
1333            tzset()
1334
1335            # now run the function, resetting the tz on exceptions
1336            try:
1337                return func(*args, **kwds)
1338            finally:
1339                if orig_tz is None:
1340                    del os.environ['TZ']
1341                else:
1342                    os.environ['TZ'] = orig_tz
1343                time.tzset()
1344
1345        inner.__name__ = func.__name__
1346        inner.__doc__ = func.__doc__
1347        return inner
1348    return decorator
1349
1350#=======================================================================
1351# Big-memory-test support. Separate from 'resources' because memory use should be configurable.
1352
1353# Some handy shorthands. Note that these are used for byte-limits as well
1354# as size-limits, in the various bigmem tests
1355_1M = 1024*1024
1356_1G = 1024 * _1M
1357_2G = 2 * _1G
1358_4G = 4 * _1G
1359
1360MAX_Py_ssize_t = sys.maxsize
1361
1362def set_memlimit(limit):
1363    global max_memuse
1364    global real_max_memuse
1365    sizes = {
1366        'k': 1024,
1367        'm': _1M,
1368        'g': _1G,
1369        't': 1024*_1G,
1370    }
1371    m = re.match(r'(\d+(\.\d+)?) (K|M|G|T)b?$', limit,
1372                 re.IGNORECASE | re.VERBOSE)
1373    if m is None:
1374        raise ValueError('Invalid memory limit %r' % (limit,))
1375    memlimit = int(float(m.group(1)) * sizes[m.group(3).lower()])
1376    real_max_memuse = memlimit
1377    if memlimit > MAX_Py_ssize_t:
1378        memlimit = MAX_Py_ssize_t
1379    if memlimit < _2G - 1:
1380        raise ValueError('Memory limit %r too low to be useful' % (limit,))
1381    max_memuse = memlimit
1382
1383def bigmemtest(minsize, memuse, overhead=5*_1M):
1384    """Decorator for bigmem tests.
1385
1386    'minsize' is the minimum useful size for the test (in arbitrary,
1387    test-interpreted units.) 'memuse' is the number of 'bytes per size' for
1388    the test, or a good estimate of it. 'overhead' specifies fixed overhead,
1389    independent of the testsize, and defaults to 5Mb.
1390
1391    The decorator tries to guess a good value for 'size' and passes it to
1392    the decorated test function. If minsize * memuse is more than the
1393    allowed memory use (as defined by max_memuse), the test is skipped.
1394    Otherwise, minsize is adjusted upward to use up to max_memuse.
1395    """
1396    def decorator(f):
1397        def wrapper(self):
1398            if not max_memuse:
1399                # If max_memuse is 0 (the default),
1400                # we still want to run the tests with size set to a few kb,
1401                # to make sure they work. We still want to avoid using
1402                # too much memory, though, but we do that noisily.
1403                maxsize = 5147
1404                self.assertFalse(maxsize * memuse + overhead > 20 * _1M)
1405            else:
1406                maxsize = int((max_memuse - overhead) / memuse)
1407                if maxsize < minsize:
1408                    # Really ought to print 'test skipped' or something
1409                    if verbose:
1410                        sys.stderr.write("Skipping %s because of memory "
1411                                         "constraint\n" % (f.__name__,))
1412                    return
1413                # Try to keep some breathing room in memory use
1414                maxsize = max(maxsize - 50 * _1M, minsize)
1415            return f(self, maxsize)
1416        wrapper.minsize = minsize
1417        wrapper.memuse = memuse
1418        wrapper.overhead = overhead
1419        return wrapper
1420    return decorator
1421
1422def precisionbigmemtest(size, memuse, overhead=5*_1M, dry_run=True):
1423    def decorator(f):
1424        def wrapper(self):
1425            if not real_max_memuse:
1426                maxsize = 5147
1427            else:
1428                maxsize = size
1429
1430            if ((real_max_memuse or not dry_run)
1431                and real_max_memuse < maxsize * memuse):
1432                if verbose:
1433                    sys.stderr.write("Skipping %s because of memory "
1434                                     "constraint\n" % (f.__name__,))
1435                return
1436
1437            return f(self, maxsize)
1438        wrapper.size = size
1439        wrapper.memuse = memuse
1440        wrapper.overhead = overhead
1441        return wrapper
1442    return decorator
1443
1444def bigaddrspacetest(f):
1445    """Decorator for tests that fill the address space."""
1446    def wrapper(self):
1447        if max_memuse < MAX_Py_ssize_t:
1448            if verbose:
1449                sys.stderr.write("Skipping %s because of memory "
1450                                 "constraint\n" % (f.__name__,))
1451        else:
1452            return f(self)
1453    return wrapper
1454
1455#=======================================================================
1456# unittest integration.
1457
1458class BasicTestRunner:
1459    def run(self, test):
1460        result = unittest.TestResult()
1461        test(result)
1462        return result
1463
1464def _id(obj):
1465    return obj
1466
1467def requires_resource(resource):
1468    if resource == 'gui' and not _is_gui_available():
1469        return unittest.skip(_is_gui_available.reason)
1470    if is_resource_enabled(resource):
1471        return _id
1472    else:
1473        return unittest.skip("resource {0!r} is not enabled".format(resource))
1474
1475def cpython_only(test):
1476    """
1477    Decorator for tests only applicable on CPython.
1478    """
1479    return impl_detail(cpython=True)(test)
1480
1481def impl_detail(msg=None, **guards):
1482    if check_impl_detail(**guards):
1483        return _id
1484    if msg is None:
1485        guardnames, default = _parse_guards(guards)
1486        if default:
1487            msg = "implementation detail not available on {0}"
1488        else:
1489            msg = "implementation detail specific to {0}"
1490        guardnames = sorted(guardnames.keys())
1491        msg = msg.format(' or '.join(guardnames))
1492    return unittest.skip(msg)
1493
1494def _parse_guards(guards):
1495    # Returns a tuple ({platform_name: run_me}, default_value)
1496    if not guards:
1497        return ({'cpython': True}, False)
1498    is_true = guards.values()[0]
1499    assert guards.values() == [is_true] * len(guards)   # all True or all False
1500    return (guards, not is_true)
1501
1502# Use the following check to guard CPython's implementation-specific tests --
1503# or to run them only on the implementation(s) guarded by the arguments.
1504def check_impl_detail(**guards):
1505    """This function returns True or False depending on the host platform.
1506       Examples:
1507          if check_impl_detail():               # only on CPython (default)
1508          if check_impl_detail(jython=True):    # only on Jython
1509          if check_impl_detail(cpython=False):  # everywhere except on CPython
1510    """
1511    guards, default = _parse_guards(guards)
1512    return guards.get(platform.python_implementation().lower(), default)
1513
1514
1515def _filter_suite(suite, pred):
1516    """Recursively filter test cases in a suite based on a predicate."""
1517    newtests = []
1518    for test in suite._tests:
1519        if isinstance(test, unittest.TestSuite):
1520            _filter_suite(test, pred)
1521            newtests.append(test)
1522        else:
1523            if pred(test):
1524                newtests.append(test)
1525    suite._tests = newtests
1526
1527def _run_suite(suite):
1528    """Run tests from a unittest.TestSuite-derived class."""
1529    if verbose:
1530        runner = unittest.TextTestRunner(sys.stdout, verbosity=2,
1531                                         failfast=failfast)
1532    else:
1533        runner = BasicTestRunner()
1534
1535    result = runner.run(suite)
1536    if not result.wasSuccessful():
1537        if len(result.errors) == 1 and not result.failures:
1538            err = result.errors[0][1]
1539        elif len(result.failures) == 1 and not result.errors:
1540            err = result.failures[0][1]
1541        else:
1542            err = "multiple errors occurred"
1543            if not verbose:
1544                err += "; run in verbose mode for details"
1545        raise TestFailed(err)
1546
1547
1548# By default, don't filter tests
1549_match_test_func = None
1550_match_test_patterns = None
1551
1552
1553def match_test(test):
1554    # Function used by support.run_unittest() and regrtest --list-cases
1555    if _match_test_func is None:
1556        return True
1557    else:
1558        return _match_test_func(test.id())
1559
1560
1561def _is_full_match_test(pattern):
1562    # If a pattern contains at least one dot, it's considered
1563    # as a full test identifier.
1564    # Example: 'test.test_os.FileTests.test_access'.
1565    #
1566    # Reject patterns which contain fnmatch patterns: '*', '?', '[...]'
1567    # or '[!...]'. For example, reject 'test_access*'.
1568    return ('.' in pattern) and (not re.search(r'[?*\[\]]', pattern))
1569
1570
1571def set_match_tests(patterns):
1572    global _match_test_func, _match_test_patterns
1573
1574    if patterns == _match_test_patterns:
1575        # No change: no need to recompile patterns.
1576        return
1577
1578    if not patterns:
1579        func = None
1580        # set_match_tests(None) behaves as set_match_tests(())
1581        patterns = ()
1582    elif all(map(_is_full_match_test, patterns)):
1583        # Simple case: all patterns are full test identifier.
1584        # The test.bisect utility only uses such full test identifiers.
1585        func = set(patterns).__contains__
1586    else:
1587        regex = '|'.join(map(fnmatch.translate, patterns))
1588        # The search *is* case sensitive on purpose:
1589        # don't use flags=re.IGNORECASE
1590        regex_match = re.compile(regex).match
1591
1592        def match_test_regex(test_id):
1593            if regex_match(test_id):
1594                # The regex matchs the whole identifier like
1595                # 'test.test_os.FileTests.test_access'
1596                return True
1597            else:
1598                # Try to match parts of the test identifier.
1599                # For example, split 'test.test_os.FileTests.test_access'
1600                # into: 'test', 'test_os', 'FileTests' and 'test_access'.
1601                return any(map(regex_match, test_id.split(".")))
1602
1603        func = match_test_regex
1604
1605    # Create a copy since patterns can be mutable and so modified later
1606    _match_test_patterns = tuple(patterns)
1607    _match_test_func = func
1608
1609
1610
1611def run_unittest(*classes):
1612    """Run tests from unittest.TestCase-derived classes."""
1613    valid_types = (unittest.TestSuite, unittest.TestCase)
1614    suite = unittest.TestSuite()
1615    for cls in classes:
1616        if isinstance(cls, str):
1617            if cls in sys.modules:
1618                suite.addTest(unittest.findTestCases(sys.modules[cls]))
1619            else:
1620                raise ValueError("str arguments must be keys in sys.modules")
1621        elif isinstance(cls, valid_types):
1622            suite.addTest(cls)
1623        else:
1624            suite.addTest(unittest.makeSuite(cls))
1625    _filter_suite(suite, match_test)
1626    _run_suite(suite)
1627
1628#=======================================================================
1629# Check for the presence of docstrings.
1630
1631HAVE_DOCSTRINGS = (check_impl_detail(cpython=False) or
1632                   sys.platform == 'win32' or
1633                   sysconfig.get_config_var('WITH_DOC_STRINGS'))
1634
1635requires_docstrings = unittest.skipUnless(HAVE_DOCSTRINGS,
1636                                          "test requires docstrings")
1637
1638
1639#=======================================================================
1640# doctest driver.
1641
1642def run_doctest(module, verbosity=None):
1643    """Run doctest on the given module.  Return (#failures, #tests).
1644
1645    If optional argument verbosity is not specified (or is None), pass
1646    test.support's belief about verbosity on to doctest.  Else doctest's
1647    usual behavior is used (it searches sys.argv for -v).
1648    """
1649
1650    import doctest
1651
1652    if verbosity is None:
1653        verbosity = verbose
1654    else:
1655        verbosity = None
1656
1657    # Direct doctest output (normally just errors) to real stdout; doctest
1658    # output shouldn't be compared by regrtest.
1659    save_stdout = sys.stdout
1660    sys.stdout = get_original_stdout()
1661    try:
1662        f, t = doctest.testmod(module, verbose=verbosity)
1663        if f:
1664            raise TestFailed("%d of %d doctests failed" % (f, t))
1665    finally:
1666        sys.stdout = save_stdout
1667    if verbose:
1668        print 'doctest (%s) ... %d tests with zero failures' % (module.__name__, t)
1669    return f, t
1670
1671#=======================================================================
1672# Threading support to prevent reporting refleaks when running regrtest.py -R
1673
1674# Flag used by saved_test_environment of test.libregrtest.save_env,
1675# to check if a test modified the environment. The flag should be set to False
1676# before running a new test.
1677#
1678# For example, threading_cleanup() sets the flag is the function fails
1679# to cleanup threads.
1680environment_altered = False
1681
1682# NOTE: we use thread._count() rather than threading.enumerate() (or the
1683# moral equivalent thereof) because a threading.Thread object is still alive
1684# until its __bootstrap() method has returned, even after it has been
1685# unregistered from the threading module.
1686# thread._count(), on the other hand, only gets decremented *after* the
1687# __bootstrap() method has returned, which gives us reliable reference counts
1688# at the end of a test run.
1689
1690def threading_setup():
1691    if thread:
1692        return thread._count(),
1693    else:
1694        return 1,
1695
1696def threading_cleanup(nb_threads):
1697    if not thread:
1698        return
1699
1700    _MAX_COUNT = 10
1701    for count in range(_MAX_COUNT):
1702        n = thread._count()
1703        if n == nb_threads:
1704            break
1705        time.sleep(0.1)
1706    # XXX print a warning in case of failure?
1707
1708def reap_threads(func):
1709    """Use this function when threads are being used.  This will
1710    ensure that the threads are cleaned up even when the test fails.
1711    If threading is unavailable this function does nothing.
1712    """
1713    if not thread:
1714        return func
1715
1716    @functools.wraps(func)
1717    def decorator(*args):
1718        key = threading_setup()
1719        try:
1720            return func(*args)
1721        finally:
1722            threading_cleanup(*key)
1723    return decorator
1724
1725
1726@contextlib.contextmanager
1727def wait_threads_exit(timeout=60.0):
1728    """
1729    bpo-31234: Context manager to wait until all threads created in the with
1730    statement exit.
1731
1732    Use thread.count() to check if threads exited. Indirectly, wait until
1733    threads exit the internal t_bootstrap() C function of the thread module.
1734
1735    threading_setup() and threading_cleanup() are designed to emit a warning
1736    if a test leaves running threads in the background. This context manager
1737    is designed to cleanup threads started by the thread.start_new_thread()
1738    which doesn't allow to wait for thread exit, whereas thread.Thread has a
1739    join() method.
1740    """
1741    old_count = thread._count()
1742    try:
1743        yield
1744    finally:
1745        start_time = time.time()
1746        deadline = start_time + timeout
1747        while True:
1748            count = thread._count()
1749            if count <= old_count:
1750                break
1751            if time.time() > deadline:
1752                dt = time.time() - start_time
1753                msg = ("wait_threads() failed to cleanup %s "
1754                       "threads after %.1f seconds "
1755                       "(count: %s, old count: %s)"
1756                       % (count - old_count, dt, count, old_count))
1757                raise AssertionError(msg)
1758            time.sleep(0.010)
1759            gc_collect()
1760
1761
1762def reap_children():
1763    """Use this function at the end of test_main() whenever sub-processes
1764    are started.  This will help ensure that no extra children (zombies)
1765    stick around to hog resources and create problems when looking
1766    for refleaks.
1767    """
1768
1769    # Reap all our dead child processes so we don't leave zombies around.
1770    # These hog resources and might be causing some of the buildbots to die.
1771    if hasattr(os, 'waitpid'):
1772        any_process = -1
1773        while True:
1774            try:
1775                # This will raise an exception on Windows.  That's ok.
1776                pid, status = os.waitpid(any_process, os.WNOHANG)
1777                if pid == 0:
1778                    break
1779            except:
1780                break
1781
1782@contextlib.contextmanager
1783def start_threads(threads, unlock=None):
1784    threads = list(threads)
1785    started = []
1786    try:
1787        try:
1788            for t in threads:
1789                t.start()
1790                started.append(t)
1791        except:
1792            if verbose:
1793                print("Can't start %d threads, only %d threads started" %
1794                      (len(threads), len(started)))
1795            raise
1796        yield
1797    finally:
1798        if unlock:
1799            unlock()
1800        endtime = starttime = time.time()
1801        for timeout in range(1, 16):
1802            endtime += 60
1803            for t in started:
1804                t.join(max(endtime - time.time(), 0.01))
1805            started = [t for t in started if t.isAlive()]
1806            if not started:
1807                break
1808            if verbose:
1809                print('Unable to join %d threads during a period of '
1810                      '%d minutes' % (len(started), timeout))
1811    started = [t for t in started if t.isAlive()]
1812    if started:
1813        raise AssertionError('Unable to join %d threads' % len(started))
1814
1815@contextlib.contextmanager
1816def swap_attr(obj, attr, new_val):
1817    """Temporary swap out an attribute with a new object.
1818
1819    Usage:
1820        with swap_attr(obj, "attr", 5):
1821            ...
1822
1823        This will set obj.attr to 5 for the duration of the with: block,
1824        restoring the old value at the end of the block. If `attr` doesn't
1825        exist on `obj`, it will be created and then deleted at the end of the
1826        block.
1827
1828        The old value (or None if it doesn't exist) will be assigned to the
1829        target of the "as" clause, if there is one.
1830    """
1831    if hasattr(obj, attr):
1832        real_val = getattr(obj, attr)
1833        setattr(obj, attr, new_val)
1834        try:
1835            yield real_val
1836        finally:
1837            setattr(obj, attr, real_val)
1838    else:
1839        setattr(obj, attr, new_val)
1840        try:
1841            yield
1842        finally:
1843            if hasattr(obj, attr):
1844                delattr(obj, attr)
1845
1846@contextlib.contextmanager
1847def swap_item(obj, item, new_val):
1848    """Temporary swap out an item with a new object.
1849
1850    Usage:
1851        with swap_item(obj, "item", 5):
1852            ...
1853
1854        This will set obj["item"] to 5 for the duration of the with: block,
1855        restoring the old value at the end of the block. If `item` doesn't
1856        exist on `obj`, it will be created and then deleted at the end of the
1857        block.
1858
1859        The old value (or None if it doesn't exist) will be assigned to the
1860        target of the "as" clause, if there is one.
1861    """
1862    if item in obj:
1863        real_val = obj[item]
1864        obj[item] = new_val
1865        try:
1866            yield real_val
1867        finally:
1868            obj[item] = real_val
1869    else:
1870        obj[item] = new_val
1871        try:
1872            yield
1873        finally:
1874            if item in obj:
1875                del obj[item]
1876
1877def py3k_bytes(b):
1878    """Emulate the py3k bytes() constructor.
1879
1880    NOTE: This is only a best effort function.
1881    """
1882    try:
1883        # memoryview?
1884        return b.tobytes()
1885    except AttributeError:
1886        try:
1887            # iterable of ints?
1888            return b"".join(chr(x) for x in b)
1889        except TypeError:
1890            return bytes(b)
1891
1892requires_type_collecting = unittest.skipIf(hasattr(sys, 'getcounts'),
1893                        'types are immortal if COUNT_ALLOCS is defined')
1894
1895def args_from_interpreter_flags():
1896    """Return a list of command-line arguments reproducing the current
1897    settings in sys.flags."""
1898    import subprocess
1899    return subprocess._args_from_interpreter_flags()
1900
1901def strip_python_stderr(stderr):
1902    """Strip the stderr of a Python process from potential debug output
1903    emitted by the interpreter.
1904
1905    This will typically be run on the result of the communicate() method
1906    of a subprocess.Popen object.
1907    """
1908    stderr = re.sub(br"\[\d+ refs\]\r?\n?$", b"", stderr).strip()
1909    return stderr
1910
1911
1912def check_free_after_iterating(test, iter, cls, args=()):
1913    class A(cls):
1914        def __del__(self):
1915            done[0] = True
1916            try:
1917                next(it)
1918            except StopIteration:
1919                pass
1920
1921    done = [False]
1922    it = iter(A(*args))
1923    # Issue 26494: Shouldn't crash
1924    test.assertRaises(StopIteration, next, it)
1925    # The sequence should be deallocated just after the end of iterating
1926    gc_collect()
1927    test.assertTrue(done[0])
1928
1929@contextlib.contextmanager
1930def disable_gc():
1931    have_gc = gc.isenabled()
1932    gc.disable()
1933    try:
1934        yield
1935    finally:
1936        if have_gc:
1937            gc.enable()
1938
1939
1940def python_is_optimized():
1941    """Find if Python was built with optimizations."""
1942    cflags = sysconfig.get_config_var('PY_CFLAGS') or ''
1943    final_opt = ""
1944    for opt in cflags.split():
1945        if opt.startswith('-O'):
1946            final_opt = opt
1947    return final_opt not in ('', '-O0', '-Og')
1948
1949
1950class SuppressCrashReport:
1951    """Try to prevent a crash report from popping up.
1952
1953    On Windows, don't display the Windows Error Reporting dialog.  On UNIX,
1954    disable the creation of coredump file.
1955    """
1956    old_value = None
1957    old_modes = None
1958
1959    def __enter__(self):
1960        """On Windows, disable Windows Error Reporting dialogs using
1961        SetErrorMode.
1962
1963        On UNIX, try to save the previous core file size limit, then set
1964        soft limit to 0.
1965        """
1966        if sys.platform.startswith('win'):
1967            # see http://msdn.microsoft.com/en-us/library/windows/desktop/ms680621.aspx
1968            # GetErrorMode is not available on Windows XP and Windows Server 2003,
1969            # but SetErrorMode returns the previous value, so we can use that
1970            import ctypes
1971            self._k32 = ctypes.windll.kernel32
1972            SEM_NOGPFAULTERRORBOX = 0x02
1973            self.old_value = self._k32.SetErrorMode(SEM_NOGPFAULTERRORBOX)
1974            self._k32.SetErrorMode(self.old_value | SEM_NOGPFAULTERRORBOX)
1975
1976            # Suppress assert dialogs in debug builds
1977            # (see http://bugs.python.org/issue23314)
1978            try:
1979                import _testcapi
1980                _testcapi.CrtSetReportMode
1981            except (AttributeError, ImportError):
1982                # no _testcapi or a release build
1983                pass
1984            else:
1985                self.old_modes = {}
1986                for report_type in [_testcapi.CRT_WARN,
1987                                    _testcapi.CRT_ERROR,
1988                                    _testcapi.CRT_ASSERT]:
1989                    old_mode = _testcapi.CrtSetReportMode(report_type,
1990                            _testcapi.CRTDBG_MODE_FILE)
1991                    old_file = _testcapi.CrtSetReportFile(report_type,
1992                            _testcapi.CRTDBG_FILE_STDERR)
1993                    self.old_modes[report_type] = old_mode, old_file
1994
1995        else:
1996            try:
1997                import resource
1998            except ImportError:
1999                resource = None
2000
2001            if resource is not None:
2002                try:
2003                    self.old_value = resource.getrlimit(resource.RLIMIT_CORE)
2004                    resource.setrlimit(resource.RLIMIT_CORE,
2005                                       (0, self.old_value[1]))
2006                except (ValueError, OSError):
2007                    pass
2008
2009            if sys.platform == 'darwin':
2010                # Check if the 'Crash Reporter' on OSX was configured
2011                # in 'Developer' mode and warn that it will get triggered
2012                # when it is.
2013                #
2014                # This assumes that this context manager is used in tests
2015                # that might trigger the next manager.
2016                import subprocess
2017                cmd = ['/usr/bin/defaults', 'read',
2018                       'com.apple.CrashReporter', 'DialogType']
2019                proc = subprocess.Popen(cmd,
2020                                        stdout=subprocess.PIPE,
2021                                        stderr=subprocess.PIPE)
2022                stdout = proc.communicate()[0]
2023                if stdout.strip() == b'developer':
2024                    sys.stdout.write("this test triggers the Crash Reporter, "
2025                                     "that is intentional")
2026                    sys.stdout.flush()
2027
2028        return self
2029
2030    def __exit__(self, *ignore_exc):
2031        """Restore Windows ErrorMode or core file behavior to initial value."""
2032        if self.old_value is None:
2033            return
2034
2035        if sys.platform.startswith('win'):
2036            self._k32.SetErrorMode(self.old_value)
2037
2038            if self.old_modes:
2039                import _testcapi
2040                for report_type, (old_mode, old_file) in self.old_modes.items():
2041                    _testcapi.CrtSetReportMode(report_type, old_mode)
2042                    _testcapi.CrtSetReportFile(report_type, old_file)
2043        else:
2044            import resource
2045            try:
2046                resource.setrlimit(resource.RLIMIT_CORE, self.old_value)
2047            except (ValueError, OSError):
2048                pass
2049
2050
2051def _crash_python():
2052    """Deliberate crash of Python.
2053
2054    Python can be killed by a segmentation fault (SIGSEGV), a bus error
2055    (SIGBUS), or a different error depending on the platform.
2056
2057    Use SuppressCrashReport() to prevent a crash report from popping up.
2058    """
2059
2060    import _testcapi
2061    with SuppressCrashReport():
2062        _testcapi._read_null()
2063
2064
2065def fd_count():
2066    """Count the number of open file descriptors.
2067    """
2068    if sys.platform.startswith(('linux', 'freebsd')):
2069        try:
2070            names = os.listdir("/proc/self/fd")
2071            # Substract one because listdir() opens internally a file
2072            # descriptor to list the content of the /proc/self/fd/ directory.
2073            return len(names) - 1
2074        except OSError as exc:
2075            if exc.errno != errno.ENOENT:
2076                raise
2077
2078    MAXFD = 256
2079    if hasattr(os, 'sysconf'):
2080        try:
2081            MAXFD = os.sysconf("SC_OPEN_MAX")
2082        except OSError:
2083            pass
2084
2085    old_modes = None
2086    if sys.platform == 'win32':
2087        # bpo-25306, bpo-31009: Call CrtSetReportMode() to not kill the process
2088        # on invalid file descriptor if Python is compiled in debug mode
2089        try:
2090            import msvcrt
2091            msvcrt.CrtSetReportMode
2092        except (AttributeError, ImportError):
2093            # no msvcrt or a release build
2094            pass
2095        else:
2096            old_modes = {}
2097            for report_type in (msvcrt.CRT_WARN,
2098                                msvcrt.CRT_ERROR,
2099                                msvcrt.CRT_ASSERT):
2100                old_modes[report_type] = msvcrt.CrtSetReportMode(report_type, 0)
2101
2102    try:
2103        count = 0
2104        for fd in range(MAXFD):
2105            try:
2106                # Prefer dup() over fstat(). fstat() can require input/output
2107                # whereas dup() doesn't.
2108                fd2 = os.dup(fd)
2109            except OSError as e:
2110                if e.errno != errno.EBADF:
2111                    raise
2112            else:
2113                os.close(fd2)
2114                count += 1
2115    finally:
2116        if old_modes is not None:
2117            for report_type in (msvcrt.CRT_WARN,
2118                                msvcrt.CRT_ERROR,
2119                                msvcrt.CRT_ASSERT):
2120                msvcrt.CrtSetReportMode(report_type, old_modes[report_type])
2121
2122    return count
2123
2124
2125class SaveSignals:
2126    """
2127    Save an restore signal handlers.
2128
2129    This class is only able to save/restore signal handlers registered
2130    by the Python signal module: see bpo-13285 for "external" signal
2131    handlers.
2132    """
2133
2134    def __init__(self):
2135        import signal
2136        self.signal = signal
2137        self.signals = list(range(1, signal.NSIG))
2138        # SIGKILL and SIGSTOP signals cannot be ignored nor catched
2139        for signame in ('SIGKILL', 'SIGSTOP'):
2140            try:
2141                signum = getattr(signal, signame)
2142            except AttributeError:
2143                continue
2144            self.signals.remove(signum)
2145        self.handlers = {}
2146
2147    def save(self):
2148        for signum in self.signals:
2149            handler = self.signal.getsignal(signum)
2150            if handler is None:
2151                # getsignal() returns None if a signal handler was not
2152                # registered by the Python signal module,
2153                # and the handler is not SIG_DFL nor SIG_IGN.
2154                #
2155                # Ignore the signal: we cannot restore the handler.
2156                continue
2157            self.handlers[signum] = handler
2158
2159    def restore(self):
2160        for signum, handler in self.handlers.items():
2161            self.signal.signal(signum, handler)
2162