1import errno
2import importlib
3import io
4import os
5import shutil
6import socket
7import stat
8import subprocess
9import sys
10import tempfile
11import textwrap
12import time
13import unittest
14from test import support
15from test.support import script_helper
16from test.support import socket_helper
17
18TESTFN = support.TESTFN
19
20
21class TestSupport(unittest.TestCase):
22
23    def test_import_module(self):
24        support.import_module("ftplib")
25        self.assertRaises(unittest.SkipTest, support.import_module, "foo")
26
27    def test_import_fresh_module(self):
28        support.import_fresh_module("ftplib")
29
30    def test_get_attribute(self):
31        self.assertEqual(support.get_attribute(self, "test_get_attribute"),
32                        self.test_get_attribute)
33        self.assertRaises(unittest.SkipTest, support.get_attribute, self, "foo")
34
35    @unittest.skip("failing buildbots")
36    def test_get_original_stdout(self):
37        self.assertEqual(support.get_original_stdout(), sys.stdout)
38
39    def test_unload(self):
40        import sched
41        self.assertIn("sched", sys.modules)
42        support.unload("sched")
43        self.assertNotIn("sched", sys.modules)
44
45    def test_unlink(self):
46        with open(TESTFN, "w") as f:
47            pass
48        support.unlink(TESTFN)
49        self.assertFalse(os.path.exists(TESTFN))
50        support.unlink(TESTFN)
51
52    def test_rmtree(self):
53        dirpath = support.TESTFN + 'd'
54        subdirpath = os.path.join(dirpath, 'subdir')
55        os.mkdir(dirpath)
56        os.mkdir(subdirpath)
57        support.rmtree(dirpath)
58        self.assertFalse(os.path.exists(dirpath))
59        with support.swap_attr(support, 'verbose', 0):
60            support.rmtree(dirpath)
61
62        os.mkdir(dirpath)
63        os.mkdir(subdirpath)
64        os.chmod(dirpath, stat.S_IRUSR|stat.S_IXUSR)
65        with support.swap_attr(support, 'verbose', 0):
66            support.rmtree(dirpath)
67        self.assertFalse(os.path.exists(dirpath))
68
69        os.mkdir(dirpath)
70        os.mkdir(subdirpath)
71        os.chmod(dirpath, 0)
72        with support.swap_attr(support, 'verbose', 0):
73            support.rmtree(dirpath)
74        self.assertFalse(os.path.exists(dirpath))
75
76    def test_forget(self):
77        mod_filename = TESTFN + '.py'
78        with open(mod_filename, 'w') as f:
79            print('foo = 1', file=f)
80        sys.path.insert(0, os.curdir)
81        importlib.invalidate_caches()
82        try:
83            mod = __import__(TESTFN)
84            self.assertIn(TESTFN, sys.modules)
85
86            support.forget(TESTFN)
87            self.assertNotIn(TESTFN, sys.modules)
88        finally:
89            del sys.path[0]
90            support.unlink(mod_filename)
91            support.rmtree('__pycache__')
92
93    def test_HOST(self):
94        s = socket.create_server((socket_helper.HOST, 0))
95        s.close()
96
97    def test_find_unused_port(self):
98        port = socket_helper.find_unused_port()
99        s = socket.create_server((socket_helper.HOST, port))
100        s.close()
101
102    def test_bind_port(self):
103        s = socket.socket()
104        socket_helper.bind_port(s)
105        s.listen()
106        s.close()
107
108    # Tests for temp_dir()
109
110    def test_temp_dir(self):
111        """Test that temp_dir() creates and destroys its directory."""
112        parent_dir = tempfile.mkdtemp()
113        parent_dir = os.path.realpath(parent_dir)
114
115        try:
116            path = os.path.join(parent_dir, 'temp')
117            self.assertFalse(os.path.isdir(path))
118            with support.temp_dir(path) as temp_path:
119                self.assertEqual(temp_path, path)
120                self.assertTrue(os.path.isdir(path))
121            self.assertFalse(os.path.isdir(path))
122        finally:
123            support.rmtree(parent_dir)
124
125    def test_temp_dir__path_none(self):
126        """Test passing no path."""
127        with support.temp_dir() as temp_path:
128            self.assertTrue(os.path.isdir(temp_path))
129        self.assertFalse(os.path.isdir(temp_path))
130
131    def test_temp_dir__existing_dir__quiet_default(self):
132        """Test passing a directory that already exists."""
133        def call_temp_dir(path):
134            with support.temp_dir(path) as temp_path:
135                raise Exception("should not get here")
136
137        path = tempfile.mkdtemp()
138        path = os.path.realpath(path)
139        try:
140            self.assertTrue(os.path.isdir(path))
141            self.assertRaises(FileExistsError, call_temp_dir, path)
142            # Make sure temp_dir did not delete the original directory.
143            self.assertTrue(os.path.isdir(path))
144        finally:
145            shutil.rmtree(path)
146
147    def test_temp_dir__existing_dir__quiet_true(self):
148        """Test passing a directory that already exists with quiet=True."""
149        path = tempfile.mkdtemp()
150        path = os.path.realpath(path)
151
152        try:
153            with support.check_warnings() as recorder:
154                with support.temp_dir(path, quiet=True) as temp_path:
155                    self.assertEqual(path, temp_path)
156                warnings = [str(w.message) for w in recorder.warnings]
157            # Make sure temp_dir did not delete the original directory.
158            self.assertTrue(os.path.isdir(path))
159        finally:
160            shutil.rmtree(path)
161
162        self.assertEqual(len(warnings), 1, warnings)
163        warn = warnings[0]
164        self.assertTrue(warn.startswith(f'tests may fail, unable to create '
165                                        f'temporary directory {path!r}: '),
166                        warn)
167
168    @unittest.skipUnless(hasattr(os, "fork"), "test requires os.fork")
169    def test_temp_dir__forked_child(self):
170        """Test that a forked child process does not remove the directory."""
171        # See bpo-30028 for details.
172        # Run the test as an external script, because it uses fork.
173        script_helper.assert_python_ok("-c", textwrap.dedent("""
174            import os
175            from test import support
176            with support.temp_cwd() as temp_path:
177                pid = os.fork()
178                if pid != 0:
179                    # parent process
180
181                    # wait for the child to terminate
182                    support.wait_process(pid, exitcode=0)
183
184                    # Make sure that temp_path is still present. When the child
185                    # process leaves the 'temp_cwd'-context, the __exit__()-
186                    # method of the context must not remove the temporary
187                    # directory.
188                    if not os.path.isdir(temp_path):
189                        raise AssertionError("Child removed temp_path.")
190        """))
191
192    # Tests for change_cwd()
193
194    def test_change_cwd(self):
195        original_cwd = os.getcwd()
196
197        with support.temp_dir() as temp_path:
198            with support.change_cwd(temp_path) as new_cwd:
199                self.assertEqual(new_cwd, temp_path)
200                self.assertEqual(os.getcwd(), new_cwd)
201
202        self.assertEqual(os.getcwd(), original_cwd)
203
204    def test_change_cwd__non_existent_dir(self):
205        """Test passing a non-existent directory."""
206        original_cwd = os.getcwd()
207
208        def call_change_cwd(path):
209            with support.change_cwd(path) as new_cwd:
210                raise Exception("should not get here")
211
212        with support.temp_dir() as parent_dir:
213            non_existent_dir = os.path.join(parent_dir, 'does_not_exist')
214            self.assertRaises(FileNotFoundError, call_change_cwd,
215                              non_existent_dir)
216
217        self.assertEqual(os.getcwd(), original_cwd)
218
219    def test_change_cwd__non_existent_dir__quiet_true(self):
220        """Test passing a non-existent directory with quiet=True."""
221        original_cwd = os.getcwd()
222
223        with support.temp_dir() as parent_dir:
224            bad_dir = os.path.join(parent_dir, 'does_not_exist')
225            with support.check_warnings() as recorder:
226                with support.change_cwd(bad_dir, quiet=True) as new_cwd:
227                    self.assertEqual(new_cwd, original_cwd)
228                    self.assertEqual(os.getcwd(), new_cwd)
229                warnings = [str(w.message) for w in recorder.warnings]
230
231        self.assertEqual(len(warnings), 1, warnings)
232        warn = warnings[0]
233        self.assertTrue(warn.startswith(f'tests may fail, unable to change '
234                                        f'the current working directory '
235                                        f'to {bad_dir!r}: '),
236                        warn)
237
238    # Tests for change_cwd()
239
240    def test_change_cwd__chdir_warning(self):
241        """Check the warning message when os.chdir() fails."""
242        path = TESTFN + '_does_not_exist'
243        with support.check_warnings() as recorder:
244            with support.change_cwd(path=path, quiet=True):
245                pass
246            messages = [str(w.message) for w in recorder.warnings]
247
248        self.assertEqual(len(messages), 1, messages)
249        msg = messages[0]
250        self.assertTrue(msg.startswith(f'tests may fail, unable to change '
251                                       f'the current working directory '
252                                       f'to {path!r}: '),
253                        msg)
254
255    # Tests for temp_cwd()
256
257    def test_temp_cwd(self):
258        here = os.getcwd()
259        with support.temp_cwd(name=TESTFN):
260            self.assertEqual(os.path.basename(os.getcwd()), TESTFN)
261        self.assertFalse(os.path.exists(TESTFN))
262        self.assertEqual(os.getcwd(), here)
263
264
265    def test_temp_cwd__name_none(self):
266        """Test passing None to temp_cwd()."""
267        original_cwd = os.getcwd()
268        with support.temp_cwd(name=None) as new_cwd:
269            self.assertNotEqual(new_cwd, original_cwd)
270            self.assertTrue(os.path.isdir(new_cwd))
271            self.assertEqual(os.getcwd(), new_cwd)
272        self.assertEqual(os.getcwd(), original_cwd)
273
274    def test_sortdict(self):
275        self.assertEqual(support.sortdict({3:3, 2:2, 1:1}), "{1: 1, 2: 2, 3: 3}")
276
277    def test_make_bad_fd(self):
278        fd = support.make_bad_fd()
279        with self.assertRaises(OSError) as cm:
280            os.write(fd, b"foo")
281        self.assertEqual(cm.exception.errno, errno.EBADF)
282
283    def test_check_syntax_error(self):
284        support.check_syntax_error(self, "def class", lineno=1, offset=5)
285        with self.assertRaises(AssertionError):
286            support.check_syntax_error(self, "x=1")
287
288    def test_CleanImport(self):
289        import importlib
290        with support.CleanImport("asyncore"):
291            importlib.import_module("asyncore")
292
293    def test_DirsOnSysPath(self):
294        with support.DirsOnSysPath('foo', 'bar'):
295            self.assertIn("foo", sys.path)
296            self.assertIn("bar", sys.path)
297        self.assertNotIn("foo", sys.path)
298        self.assertNotIn("bar", sys.path)
299
300    def test_captured_stdout(self):
301        with support.captured_stdout() as stdout:
302            print("hello")
303        self.assertEqual(stdout.getvalue(), "hello\n")
304
305    def test_captured_stderr(self):
306        with support.captured_stderr() as stderr:
307            print("hello", file=sys.stderr)
308        self.assertEqual(stderr.getvalue(), "hello\n")
309
310    def test_captured_stdin(self):
311        with support.captured_stdin() as stdin:
312            stdin.write('hello\n')
313            stdin.seek(0)
314            # call test code that consumes from sys.stdin
315            captured = input()
316        self.assertEqual(captured, "hello")
317
318    def test_gc_collect(self):
319        support.gc_collect()
320
321    def test_python_is_optimized(self):
322        self.assertIsInstance(support.python_is_optimized(), bool)
323
324    def test_swap_attr(self):
325        class Obj:
326            pass
327        obj = Obj()
328        obj.x = 1
329        with support.swap_attr(obj, "x", 5) as x:
330            self.assertEqual(obj.x, 5)
331            self.assertEqual(x, 1)
332        self.assertEqual(obj.x, 1)
333        with support.swap_attr(obj, "y", 5) as y:
334            self.assertEqual(obj.y, 5)
335            self.assertIsNone(y)
336        self.assertFalse(hasattr(obj, 'y'))
337        with support.swap_attr(obj, "y", 5):
338            del obj.y
339        self.assertFalse(hasattr(obj, 'y'))
340
341    def test_swap_item(self):
342        D = {"x":1}
343        with support.swap_item(D, "x", 5) as x:
344            self.assertEqual(D["x"], 5)
345            self.assertEqual(x, 1)
346        self.assertEqual(D["x"], 1)
347        with support.swap_item(D, "y", 5) as y:
348            self.assertEqual(D["y"], 5)
349            self.assertIsNone(y)
350        self.assertNotIn("y", D)
351        with support.swap_item(D, "y", 5):
352            del D["y"]
353        self.assertNotIn("y", D)
354
355    class RefClass:
356        attribute1 = None
357        attribute2 = None
358        _hidden_attribute1 = None
359        __magic_1__ = None
360
361    class OtherClass:
362        attribute2 = None
363        attribute3 = None
364        __magic_1__ = None
365        __magic_2__ = None
366
367    def test_detect_api_mismatch(self):
368        missing_items = support.detect_api_mismatch(self.RefClass,
369                                                    self.OtherClass)
370        self.assertEqual({'attribute1'}, missing_items)
371
372        missing_items = support.detect_api_mismatch(self.OtherClass,
373                                                    self.RefClass)
374        self.assertEqual({'attribute3', '__magic_2__'}, missing_items)
375
376    def test_detect_api_mismatch__ignore(self):
377        ignore = ['attribute1', 'attribute3', '__magic_2__', 'not_in_either']
378
379        missing_items = support.detect_api_mismatch(
380                self.RefClass, self.OtherClass, ignore=ignore)
381        self.assertEqual(set(), missing_items)
382
383        missing_items = support.detect_api_mismatch(
384                self.OtherClass, self.RefClass, ignore=ignore)
385        self.assertEqual(set(), missing_items)
386
387    def test_check__all__(self):
388        extra = {'tempdir'}
389        blacklist = {'template'}
390        support.check__all__(self,
391                             tempfile,
392                             extra=extra,
393                             blacklist=blacklist)
394
395        extra = {'TextTestResult', 'installHandler'}
396        blacklist = {'load_tests', "TestProgram", "BaseTestSuite"}
397
398        support.check__all__(self,
399                             unittest,
400                             ("unittest.result", "unittest.case",
401                              "unittest.suite", "unittest.loader",
402                              "unittest.main", "unittest.runner",
403                              "unittest.signals", "unittest.async_case"),
404                             extra=extra,
405                             blacklist=blacklist)
406
407        self.assertRaises(AssertionError, support.check__all__, self, unittest)
408
409    @unittest.skipUnless(hasattr(os, 'waitpid') and hasattr(os, 'WNOHANG'),
410                         'need os.waitpid() and os.WNOHANG')
411    def test_reap_children(self):
412        # Make sure that there is no other pending child process
413        support.reap_children()
414
415        # Create a child process
416        pid = os.fork()
417        if pid == 0:
418            # child process: do nothing, just exit
419            os._exit(0)
420
421        t0 = time.monotonic()
422        deadline = time.monotonic() + support.SHORT_TIMEOUT
423
424        was_altered = support.environment_altered
425        try:
426            support.environment_altered = False
427            stderr = io.StringIO()
428
429            while True:
430                if time.monotonic() > deadline:
431                    self.fail("timeout")
432
433                old_stderr = sys.__stderr__
434                try:
435                    sys.__stderr__ = stderr
436                    support.reap_children()
437                finally:
438                    sys.__stderr__ = old_stderr
439
440                # Use environment_altered to check if reap_children() found
441                # the child process
442                if support.environment_altered:
443                    break
444
445                # loop until the child process completed
446                time.sleep(0.100)
447
448            msg = "Warning -- reap_children() reaped child process %s" % pid
449            self.assertIn(msg, stderr.getvalue())
450            self.assertTrue(support.environment_altered)
451        finally:
452            support.environment_altered = was_altered
453
454        # Just in case, check again that there is no other
455        # pending child process
456        support.reap_children()
457
458    def check_options(self, args, func, expected=None):
459        code = f'from test.support import {func}; print(repr({func}()))'
460        cmd = [sys.executable, *args, '-c', code]
461        env = {key: value for key, value in os.environ.items()
462               if not key.startswith('PYTHON')}
463        proc = subprocess.run(cmd,
464                              stdout=subprocess.PIPE,
465                              stderr=subprocess.DEVNULL,
466                              universal_newlines=True,
467                              env=env)
468        if expected is None:
469            expected = args
470        self.assertEqual(proc.stdout.rstrip(), repr(expected))
471        self.assertEqual(proc.returncode, 0)
472
473    def test_args_from_interpreter_flags(self):
474        # Test test.support.args_from_interpreter_flags()
475        for opts in (
476            # no option
477            [],
478            # single option
479            ['-B'],
480            ['-s'],
481            ['-S'],
482            ['-E'],
483            ['-v'],
484            ['-b'],
485            ['-q'],
486            ['-I'],
487            # same option multiple times
488            ['-bb'],
489            ['-vvv'],
490            # -W options
491            ['-Wignore'],
492            # -X options
493            ['-X', 'dev'],
494            ['-Wignore', '-X', 'dev'],
495            ['-X', 'faulthandler'],
496            ['-X', 'importtime'],
497            ['-X', 'showrefcount'],
498            ['-X', 'tracemalloc'],
499            ['-X', 'tracemalloc=3'],
500        ):
501            with self.subTest(opts=opts):
502                self.check_options(opts, 'args_from_interpreter_flags')
503
504        self.check_options(['-I', '-E', '-s'], 'args_from_interpreter_flags',
505                           ['-I'])
506
507    def test_optim_args_from_interpreter_flags(self):
508        # Test test.support.optim_args_from_interpreter_flags()
509        for opts in (
510            # no option
511            [],
512            ['-O'],
513            ['-OO'],
514            ['-OOOO'],
515        ):
516            with self.subTest(opts=opts):
517                self.check_options(opts, 'optim_args_from_interpreter_flags')
518
519    def test_match_test(self):
520        class Test:
521            def __init__(self, test_id):
522                self.test_id = test_id
523
524            def id(self):
525                return self.test_id
526
527        test_access = Test('test.test_os.FileTests.test_access')
528        test_chdir = Test('test.test_os.Win32ErrorTests.test_chdir')
529
530        # Test acceptance
531        with support.swap_attr(support, '_match_test_func', None):
532            # match all
533            support.set_match_tests([])
534            self.assertTrue(support.match_test(test_access))
535            self.assertTrue(support.match_test(test_chdir))
536
537            # match all using None
538            support.set_match_tests(None, None)
539            self.assertTrue(support.match_test(test_access))
540            self.assertTrue(support.match_test(test_chdir))
541
542            # match the full test identifier
543            support.set_match_tests([test_access.id()], None)
544            self.assertTrue(support.match_test(test_access))
545            self.assertFalse(support.match_test(test_chdir))
546
547            # match the module name
548            support.set_match_tests(['test_os'], None)
549            self.assertTrue(support.match_test(test_access))
550            self.assertTrue(support.match_test(test_chdir))
551
552            # Test '*' pattern
553            support.set_match_tests(['test_*'], None)
554            self.assertTrue(support.match_test(test_access))
555            self.assertTrue(support.match_test(test_chdir))
556
557            # Test case sensitivity
558            support.set_match_tests(['filetests'], None)
559            self.assertFalse(support.match_test(test_access))
560            support.set_match_tests(['FileTests'], None)
561            self.assertTrue(support.match_test(test_access))
562
563            # Test pattern containing '.' and a '*' metacharacter
564            support.set_match_tests(['*test_os.*.test_*'], None)
565            self.assertTrue(support.match_test(test_access))
566            self.assertTrue(support.match_test(test_chdir))
567
568            # Multiple patterns
569            support.set_match_tests([test_access.id(), test_chdir.id()], None)
570            self.assertTrue(support.match_test(test_access))
571            self.assertTrue(support.match_test(test_chdir))
572
573            support.set_match_tests(['test_access', 'DONTMATCH'], None)
574            self.assertTrue(support.match_test(test_access))
575            self.assertFalse(support.match_test(test_chdir))
576
577        # Test rejection
578        with support.swap_attr(support, '_match_test_func', None):
579            # match all
580            support.set_match_tests(ignore_patterns=[])
581            self.assertTrue(support.match_test(test_access))
582            self.assertTrue(support.match_test(test_chdir))
583
584            # match all using None
585            support.set_match_tests(None, None)
586            self.assertTrue(support.match_test(test_access))
587            self.assertTrue(support.match_test(test_chdir))
588
589            # match the full test identifier
590            support.set_match_tests(None, [test_access.id()])
591            self.assertFalse(support.match_test(test_access))
592            self.assertTrue(support.match_test(test_chdir))
593
594            # match the module name
595            support.set_match_tests(None, ['test_os'])
596            self.assertFalse(support.match_test(test_access))
597            self.assertFalse(support.match_test(test_chdir))
598
599            # Test '*' pattern
600            support.set_match_tests(None, ['test_*'])
601            self.assertFalse(support.match_test(test_access))
602            self.assertFalse(support.match_test(test_chdir))
603
604            # Test case sensitivity
605            support.set_match_tests(None, ['filetests'])
606            self.assertTrue(support.match_test(test_access))
607            support.set_match_tests(None, ['FileTests'])
608            self.assertFalse(support.match_test(test_access))
609
610            # Test pattern containing '.' and a '*' metacharacter
611            support.set_match_tests(None, ['*test_os.*.test_*'])
612            self.assertFalse(support.match_test(test_access))
613            self.assertFalse(support.match_test(test_chdir))
614
615            # Multiple patterns
616            support.set_match_tests(None, [test_access.id(), test_chdir.id()])
617            self.assertFalse(support.match_test(test_access))
618            self.assertFalse(support.match_test(test_chdir))
619
620            support.set_match_tests(None, ['test_access', 'DONTMATCH'])
621            self.assertFalse(support.match_test(test_access))
622            self.assertTrue(support.match_test(test_chdir))
623
624    def test_fd_count(self):
625        # We cannot test the absolute value of fd_count(): on old Linux
626        # kernel or glibc versions, os.urandom() keeps a FD open on
627        # /dev/urandom device and Python has 4 FD opens instead of 3.
628        start = support.fd_count()
629        fd = os.open(__file__, os.O_RDONLY)
630        try:
631            more = support.fd_count()
632        finally:
633            os.close(fd)
634        self.assertEqual(more - start, 1)
635
636    def check_print_warning(self, msg, expected):
637        stderr = io.StringIO()
638
639        old_stderr = sys.__stderr__
640        try:
641            sys.__stderr__ = stderr
642            support.print_warning(msg)
643        finally:
644            sys.__stderr__ = old_stderr
645
646        self.assertEqual(stderr.getvalue(), expected)
647
648    def test_print_warning(self):
649        self.check_print_warning("msg",
650                                 "Warning -- msg\n")
651        self.check_print_warning("a\nb",
652                                 'Warning -- a\nWarning -- b\n')
653
654    # XXX -follows a list of untested API
655    # make_legacy_pyc
656    # is_resource_enabled
657    # requires
658    # fcmp
659    # umaks
660    # findfile
661    # check_warnings
662    # EnvironmentVarGuard
663    # TransientResource
664    # transient_internet
665    # run_with_locale
666    # set_memlimit
667    # bigmemtest
668    # precisionbigmemtest
669    # bigaddrspacetest
670    # requires_resource
671    # run_doctest
672    # threading_cleanup
673    # reap_threads
674    # can_symlink
675    # skip_unless_symlink
676    # SuppressCrashReport
677
678
679def test_main():
680    tests = [TestSupport]
681    support.run_unittest(*tests)
682
683if __name__ == '__main__':
684    test_main()
685