1import builtins
2import locale
3import logging
4import os
5import shutil
6import sys
7import sysconfig
8import threading
9import warnings
10from test import support
11try:
12    import _multiprocessing, multiprocessing.process
13except ImportError:
14    multiprocessing = None
15
16
17# Unit tests are supposed to leave the execution environment unchanged
18# once they complete.  But sometimes tests have bugs, especially when
19# tests fail, and the changes to environment go on to mess up other
20# tests.  This can cause issues with buildbot stability, since tests
21# are run in random order and so problems may appear to come and go.
22# There are a few things we can save and restore to mitigate this, and
23# the following context manager handles this task.
24
class saved_test_environment:
    """Save bits of the test environment and restore them at block exit.

        with saved_test_environment(testname, verbose, quiet):
            #stuff

    Unless quiet or pgo is true, a warning is printed to stderr for every
    saved item that was changed by the test, followed by the before and
    after state of that item.  The attribute 'changed' is initially
    False, but is set to True if a change is detected or if
    support.environment_altered is true at block exit.

    verbose is stored on the instance but is not consulted when
    reporting: the before/after state is printed regardless of its value.
    """

    changed = False

    def __init__(self, testname, verbose=0, quiet=False, *, pgo=False):
        """Record the test name and the reporting options.

        testname is only used in the warning message; quiet and pgo each
        suppress the warning output when true.
        """
        self.testname = testname
        self.verbose = verbose
        self.quiet = quiet
        self.pgo = pgo

    # To add things to save and restore, add a name XXX to the resources list
    # and add corresponding get_XXX/restore_XXX functions.  get_XXX should
    # return the value to be saved and compared against a second call to the
    # get function when test execution completes.  restore_XXX should accept
    # the saved value and restore the resource using it.  It will be called if
    # and only if a change in the value is detected.
    #
    # Note: XXX will have any '.' replaced with '_' characters when determining
    # the corresponding method names.
    #
    # Many getters below return a 3-tuple (id(obj), obj, copy_of_obj): the id
    # makes a rebinding of the global to a different object compare unequal,
    # the copy makes an in-place mutation compare unequal, and keeping obj
    # itself lets the restore function rebind the very same object.

    resources = ('sys.argv', 'cwd', 'sys.stdin', 'sys.stdout', 'sys.stderr',
                 'os.environ', 'sys.path', 'sys.path_hooks', '__import__',
                 'warnings.filters', 'asyncore.socket_map',
                 'logging._handlers', 'logging._handlerList', 'sys.gettrace',
                 'sys.warnoptions',
                 # multiprocessing.process._cleanup() may release ref
                 # to a thread, so check processes first.
                 'multiprocessing.process._dangling', 'threading._dangling',
                 'sysconfig._CONFIG_VARS', 'sysconfig._INSTALL_SCHEMES',
                 'files', 'locale', 'warnings.showwarning',
                 'shutil_archive_formats', 'shutil_unpack_formats',
                )

    def get_sys_argv(self):
        """Return (id, list object, copy of contents) for sys.argv."""
        return id(sys.argv), sys.argv, sys.argv[:]

    def restore_sys_argv(self, saved_argv):
        """Rebind sys.argv to the saved list and reset its contents."""
        sys.argv = saved_argv[1]
        sys.argv[:] = saved_argv[2]

    def get_cwd(self):
        """Return the current working directory."""
        return os.getcwd()

    def restore_cwd(self, saved_cwd):
        """Change back to the saved working directory."""
        os.chdir(saved_cwd)

    def get_sys_stdout(self):
        """Return the object currently bound to sys.stdout."""
        return sys.stdout

    def restore_sys_stdout(self, saved_stdout):
        """Rebind sys.stdout to the saved object."""
        sys.stdout = saved_stdout

    def get_sys_stderr(self):
        """Return the object currently bound to sys.stderr."""
        return sys.stderr

    def restore_sys_stderr(self, saved_stderr):
        """Rebind sys.stderr to the saved object."""
        sys.stderr = saved_stderr

    def get_sys_stdin(self):
        """Return the object currently bound to sys.stdin."""
        return sys.stdin

    def restore_sys_stdin(self, saved_stdin):
        """Rebind sys.stdin to the saved object."""
        sys.stdin = saved_stdin

    def get_os_environ(self):
        """Return (id, mapping object, dict copy) for os.environ."""
        return id(os.environ), os.environ, dict(os.environ)

    def restore_os_environ(self, saved_environ):
        """Rebind os.environ to the saved mapping and reset its contents."""
        os.environ = saved_environ[1]
        os.environ.clear()
        os.environ.update(saved_environ[2])

    def get_sys_path(self):
        """Return (id, list object, copy of contents) for sys.path."""
        return id(sys.path), sys.path, sys.path[:]

    def restore_sys_path(self, saved_path):
        """Rebind sys.path to the saved list and reset its contents."""
        sys.path = saved_path[1]
        sys.path[:] = saved_path[2]

    def get_sys_path_hooks(self):
        """Return (id, list object, copy of contents) for sys.path_hooks."""
        return id(sys.path_hooks), sys.path_hooks, sys.path_hooks[:]

    def restore_sys_path_hooks(self, saved_hooks):
        """Rebind sys.path_hooks to the saved list and reset its contents."""
        sys.path_hooks = saved_hooks[1]
        sys.path_hooks[:] = saved_hooks[2]

    def get_sys_gettrace(self):
        """Return the trace function reported by sys.gettrace()."""
        return sys.gettrace()

    def restore_sys_gettrace(self, trace_fxn):
        """Reinstall the saved trace function with sys.settrace()."""
        sys.settrace(trace_fxn)

    def get___import__(self):
        """Return the object currently bound to builtins.__import__."""
        return builtins.__import__

    def restore___import__(self, import_):
        """Rebind builtins.__import__ to the saved object."""
        builtins.__import__ = import_

    def get_warnings_filters(self):
        """Return (id, list object, copy of contents) for warnings.filters."""
        return id(warnings.filters), warnings.filters, warnings.filters[:]

    def restore_warnings_filters(self, saved_filters):
        """Rebind warnings.filters to the saved list and reset its contents."""
        warnings.filters = saved_filters[1]
        warnings.filters[:] = saved_filters[2]

    def get_asyncore_socket_map(self):
        """Return a copy of asyncore.socket_map, or {} if asyncore is not loaded.

        The module is looked up in sys.modules rather than imported, so
        asyncore is only inspected when something else already loaded it.
        """
        asyncore = sys.modules.get('asyncore')
        if asyncore is None:
            return {}
        # XXX Making a copy keeps objects alive until __exit__ gets called.
        return asyncore.socket_map.copy()

    def restore_asyncore_socket_map(self, saved_map):
        """Close all asyncore channels and put the saved entries back."""
        asyncore = sys.modules.get('asyncore')
        if asyncore is not None:
            asyncore.close_all(ignore_all=True)
            asyncore.socket_map.update(saved_map)

    def get_shutil_archive_formats(self):
        """Return (registry object, copy) for shutil._ARCHIVE_FORMATS."""
        # we could call get_archives_formats() but that only returns the
        # registry keys; we want to check the values too (the functions that
        # are registered)
        return shutil._ARCHIVE_FORMATS, shutil._ARCHIVE_FORMATS.copy()

    def restore_shutil_archive_formats(self, saved):
        """Rebind shutil._ARCHIVE_FORMATS and reset its contents."""
        shutil._ARCHIVE_FORMATS = saved[0]
        shutil._ARCHIVE_FORMATS.clear()
        shutil._ARCHIVE_FORMATS.update(saved[1])

    def get_shutil_unpack_formats(self):
        """Return (registry object, copy) for shutil._UNPACK_FORMATS."""
        return shutil._UNPACK_FORMATS, shutil._UNPACK_FORMATS.copy()

    def restore_shutil_unpack_formats(self, saved):
        """Rebind shutil._UNPACK_FORMATS and reset its contents."""
        shutil._UNPACK_FORMATS = saved[0]
        shutil._UNPACK_FORMATS.clear()
        shutil._UNPACK_FORMATS.update(saved[1])

    def get_logging__handlers(self):
        """Return (id, mapping object, copy) for logging._handlers."""
        # _handlers is a WeakValueDictionary
        return id(logging._handlers), logging._handlers, logging._handlers.copy()

    def restore_logging__handlers(self, saved_handlers):
        """Do nothing: a change is reported but not reverted."""
        # Can't easily revert the logging state
        pass

    def get_logging__handlerList(self):
        """Return (id, list object, copy) for logging._handlerList."""
        # _handlerList is a list of weakrefs to handlers
        return id(logging._handlerList), logging._handlerList, logging._handlerList[:]

    def restore_logging__handlerList(self, saved_handlerList):
        """Do nothing: a change is reported but not reverted."""
        # Can't easily revert the logging state
        pass

    def get_sys_warnoptions(self):
        """Return (id, list object, copy of contents) for sys.warnoptions."""
        return id(sys.warnoptions), sys.warnoptions, sys.warnoptions[:]

    def restore_sys_warnoptions(self, saved_options):
        """Rebind sys.warnoptions to the saved list and reset its contents."""
        sys.warnoptions = saved_options[1]
        sys.warnoptions[:] = saved_options[2]

    # Controlling dangling references to Thread objects can make it easier
    # to track reference leaks.
    def get_threading__dangling(self):
        """Return a copy of threading._dangling."""
        # This copies the weakrefs without making any strong reference
        return threading._dangling.copy()

    def restore_threading__dangling(self, saved):
        """Reset threading._dangling in place to the saved contents."""
        threading._dangling.clear()
        threading._dangling.update(saved)

    # Same for Process objects
    def get_multiprocessing_process__dangling(self):
        """Return a copy of multiprocessing.process._dangling.

        Returns None when multiprocessing could not be imported.
        """
        if not multiprocessing:
            return None
        # Unjoined process objects can survive after process exits
        multiprocessing.process._cleanup()
        # This copies the weakrefs without making any strong reference
        return multiprocessing.process._dangling.copy()

    def restore_multiprocessing_process__dangling(self, saved):
        """Reset multiprocessing.process._dangling to the saved contents.

        Does nothing when multiprocessing could not be imported.
        """
        if not multiprocessing:
            return
        multiprocessing.process._dangling.clear()
        multiprocessing.process._dangling.update(saved)

    def get_sysconfig__CONFIG_VARS(self):
        """Return (id, dict object, copy) for sysconfig._CONFIG_VARS."""
        # make sure the dict is initialized
        sysconfig.get_config_var('prefix')
        return (id(sysconfig._CONFIG_VARS), sysconfig._CONFIG_VARS,
                dict(sysconfig._CONFIG_VARS))

    def restore_sysconfig__CONFIG_VARS(self, saved):
        """Rebind sysconfig._CONFIG_VARS and reset its contents."""
        sysconfig._CONFIG_VARS = saved[1]
        sysconfig._CONFIG_VARS.clear()
        sysconfig._CONFIG_VARS.update(saved[2])

    def get_sysconfig__INSTALL_SCHEMES(self):
        """Return (id, dict object, copy) for sysconfig._INSTALL_SCHEMES."""
        return (id(sysconfig._INSTALL_SCHEMES), sysconfig._INSTALL_SCHEMES,
                sysconfig._INSTALL_SCHEMES.copy())

    def restore_sysconfig__INSTALL_SCHEMES(self, saved):
        """Rebind sysconfig._INSTALL_SCHEMES and reset its contents."""
        sysconfig._INSTALL_SCHEMES = saved[1]
        sysconfig._INSTALL_SCHEMES.clear()
        sysconfig._INSTALL_SCHEMES.update(saved[2])

    def get_files(self):
        """Return the sorted entries of the current directory.

        Directory names get a trailing '/' so that a file replaced by a
        directory of the same name (or the reverse) compares unequal.
        """
        return sorted(fn + ('/' if os.path.isdir(fn) else '')
                      for fn in os.listdir())

    def restore_files(self, saved_value):
        """Remove support.TESTFN if it was not present in the saved listing.

        Only that one name is cleaned up; other new entries are left alone.
        """
        fn = support.TESTFN
        if fn not in saved_value and (fn + '/') not in saved_value:
            if os.path.isfile(fn):
                support.unlink(fn)
            elif os.path.isdir(fn):
                support.rmtree(fn)

    # Every LC_* attribute exposed by the locale module.
    _lc = [getattr(locale, lc) for lc in dir(locale)
           if lc.startswith('LC_')]

    def get_locale(self):
        """Return a list of (category, current setting) pairs.

        Categories for which the query raises TypeError or ValueError are
        left out of the result.
        """
        pairings = []
        for lc in self._lc:
            try:
                pairings.append((lc, locale.setlocale(lc, None)))
            except (TypeError, ValueError):
                continue
        return pairings

    def restore_locale(self, saved):
        """Set every saved locale category back to its saved setting."""
        for lc, setting in saved:
            locale.setlocale(lc, setting)

    def get_warnings_showwarning(self):
        """Return the object currently bound to warnings.showwarning."""
        return warnings.showwarning

    def restore_warnings_showwarning(self, fxn):
        """Rebind warnings.showwarning to the saved object."""
        warnings.showwarning = fxn

    def resource_info(self):
        """Yield (name, getter, restorer) for each entry in resources.

        The method names are derived from the resource name by replacing
        every '.' with '_' and prefixing 'get_' / 'restore_'.
        """
        for name in self.resources:
            method_suffix = name.replace('.', '_')
            get_name = 'get_' + method_suffix
            restore_name = 'restore_' + method_suffix
            yield name, getattr(self, get_name), getattr(self, restore_name)

    def __enter__(self):
        """Snapshot every resource into self.saved_values and return self."""
        self.saved_values = {name: get()
                             for name, get, restore in self.resource_info()}
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Compare each resource with its snapshot and restore what changed.

        Always returns False, so an exception raised inside the block is
        never suppressed.
        """
        saved_values = self.saved_values
        del self.saved_values

        # Some resources use weak references
        support.gc_collect()

        # Read support.environment_altered, set by support helper functions
        self.changed |= support.environment_altered

        for name, get, restore in self.resource_info():
            current = get()
            original = saved_values.pop(name)
            # Check for changes to the resource's value
            if current != original:
                self.changed = True
                restore(original)
                if not self.quiet and not self.pgo:
                    print(f"Warning -- {name} was modified by {self.testname}",
                          file=sys.stderr, flush=True)
                    print(f"  Before: {original}\n  After:  {current} ",
                          file=sys.stderr, flush=True)
        return False
284