1import errno 2import importlib 3import io 4import os 5import shutil 6import socket 7import stat 8import subprocess 9import sys 10import tempfile 11import textwrap 12import time 13import unittest 14from test import support 15from test.support import script_helper 16from test.support import socket_helper 17 18TESTFN = support.TESTFN 19 20 21class TestSupport(unittest.TestCase): 22 23 def test_import_module(self): 24 support.import_module("ftplib") 25 self.assertRaises(unittest.SkipTest, support.import_module, "foo") 26 27 def test_import_fresh_module(self): 28 support.import_fresh_module("ftplib") 29 30 def test_get_attribute(self): 31 self.assertEqual(support.get_attribute(self, "test_get_attribute"), 32 self.test_get_attribute) 33 self.assertRaises(unittest.SkipTest, support.get_attribute, self, "foo") 34 35 @unittest.skip("failing buildbots") 36 def test_get_original_stdout(self): 37 self.assertEqual(support.get_original_stdout(), sys.stdout) 38 39 def test_unload(self): 40 import sched 41 self.assertIn("sched", sys.modules) 42 support.unload("sched") 43 self.assertNotIn("sched", sys.modules) 44 45 def test_unlink(self): 46 with open(TESTFN, "w") as f: 47 pass 48 support.unlink(TESTFN) 49 self.assertFalse(os.path.exists(TESTFN)) 50 support.unlink(TESTFN) 51 52 def test_rmtree(self): 53 dirpath = support.TESTFN + 'd' 54 subdirpath = os.path.join(dirpath, 'subdir') 55 os.mkdir(dirpath) 56 os.mkdir(subdirpath) 57 support.rmtree(dirpath) 58 self.assertFalse(os.path.exists(dirpath)) 59 with support.swap_attr(support, 'verbose', 0): 60 support.rmtree(dirpath) 61 62 os.mkdir(dirpath) 63 os.mkdir(subdirpath) 64 os.chmod(dirpath, stat.S_IRUSR|stat.S_IXUSR) 65 with support.swap_attr(support, 'verbose', 0): 66 support.rmtree(dirpath) 67 self.assertFalse(os.path.exists(dirpath)) 68 69 os.mkdir(dirpath) 70 os.mkdir(subdirpath) 71 os.chmod(dirpath, 0) 72 with support.swap_attr(support, 'verbose', 0): 73 support.rmtree(dirpath) 74 self.assertFalse(os.path.exists(dirpath)) 75 76 def test_forget(self): 77 mod_filename = TESTFN + '.py' 78 with open(mod_filename, 'w') as f: 79 print('foo = 1', file=f) 80 sys.path.insert(0, os.curdir) 81 importlib.invalidate_caches() 82 try: 83 mod = __import__(TESTFN) 84 self.assertIn(TESTFN, sys.modules) 85 86 support.forget(TESTFN) 87 self.assertNotIn(TESTFN, sys.modules) 88 finally: 89 del sys.path[0] 90 support.unlink(mod_filename) 91 support.rmtree('__pycache__') 92 93 def test_HOST(self): 94 s = socket.create_server((socket_helper.HOST, 0)) 95 s.close() 96 97 def test_find_unused_port(self): 98 port = socket_helper.find_unused_port() 99 s = socket.create_server((socket_helper.HOST, port)) 100 s.close() 101 102 def test_bind_port(self): 103 s = socket.socket() 104 socket_helper.bind_port(s) 105 s.listen() 106 s.close() 107 108 # Tests for temp_dir() 109 110 def test_temp_dir(self): 111 """Test that temp_dir() creates and destroys its directory.""" 112 parent_dir = tempfile.mkdtemp() 113 parent_dir = os.path.realpath(parent_dir) 114 115 try: 116 path = os.path.join(parent_dir, 'temp') 117 self.assertFalse(os.path.isdir(path)) 118 with support.temp_dir(path) as temp_path: 119 self.assertEqual(temp_path, path) 120 self.assertTrue(os.path.isdir(path)) 121 self.assertFalse(os.path.isdir(path)) 122 finally: 123 support.rmtree(parent_dir) 124 125 def test_temp_dir__path_none(self): 126 """Test passing no path.""" 127 with support.temp_dir() as temp_path: 128 self.assertTrue(os.path.isdir(temp_path)) 129 self.assertFalse(os.path.isdir(temp_path)) 130 131 def test_temp_dir__existing_dir__quiet_default(self): 132 """Test passing a directory that already exists.""" 133 def call_temp_dir(path): 134 with support.temp_dir(path) as temp_path: 135 raise Exception("should not get here") 136 137 path = tempfile.mkdtemp() 138 path = os.path.realpath(path) 139 try: 140 self.assertTrue(os.path.isdir(path)) 141 self.assertRaises(FileExistsError, call_temp_dir, path) 142 # Make sure temp_dir did not delete the original directory. 143 self.assertTrue(os.path.isdir(path)) 144 finally: 145 shutil.rmtree(path) 146 147 def test_temp_dir__existing_dir__quiet_true(self): 148 """Test passing a directory that already exists with quiet=True.""" 149 path = tempfile.mkdtemp() 150 path = os.path.realpath(path) 151 152 try: 153 with support.check_warnings() as recorder: 154 with support.temp_dir(path, quiet=True) as temp_path: 155 self.assertEqual(path, temp_path) 156 warnings = [str(w.message) for w in recorder.warnings] 157 # Make sure temp_dir did not delete the original directory. 158 self.assertTrue(os.path.isdir(path)) 159 finally: 160 shutil.rmtree(path) 161 162 self.assertEqual(len(warnings), 1, warnings) 163 warn = warnings[0] 164 self.assertTrue(warn.startswith(f'tests may fail, unable to create ' 165 f'temporary directory {path!r}: '), 166 warn) 167 168 @unittest.skipUnless(hasattr(os, "fork"), "test requires os.fork") 169 def test_temp_dir__forked_child(self): 170 """Test that a forked child process does not remove the directory.""" 171 # See bpo-30028 for details. 172 # Run the test as an external script, because it uses fork. 173 script_helper.assert_python_ok("-c", textwrap.dedent(""" 174 import os 175 from test import support 176 with support.temp_cwd() as temp_path: 177 pid = os.fork() 178 if pid != 0: 179 # parent process 180 181 # wait for the child to terminate 182 support.wait_process(pid, exitcode=0) 183 184 # Make sure that temp_path is still present. When the child 185 # process leaves the 'temp_cwd'-context, the __exit__()- 186 # method of the context must not remove the temporary 187 # directory. 188 if not os.path.isdir(temp_path): 189 raise AssertionError("Child removed temp_path.") 190 """)) 191 192 # Tests for change_cwd() 193 194 def test_change_cwd(self): 195 original_cwd = os.getcwd() 196 197 with support.temp_dir() as temp_path: 198 with support.change_cwd(temp_path) as new_cwd: 199 self.assertEqual(new_cwd, temp_path) 200 self.assertEqual(os.getcwd(), new_cwd) 201 202 self.assertEqual(os.getcwd(), original_cwd) 203 204 def test_change_cwd__non_existent_dir(self): 205 """Test passing a non-existent directory.""" 206 original_cwd = os.getcwd() 207 208 def call_change_cwd(path): 209 with support.change_cwd(path) as new_cwd: 210 raise Exception("should not get here") 211 212 with support.temp_dir() as parent_dir: 213 non_existent_dir = os.path.join(parent_dir, 'does_not_exist') 214 self.assertRaises(FileNotFoundError, call_change_cwd, 215 non_existent_dir) 216 217 self.assertEqual(os.getcwd(), original_cwd) 218 219 def test_change_cwd__non_existent_dir__quiet_true(self): 220 """Test passing a non-existent directory with quiet=True.""" 221 original_cwd = os.getcwd() 222 223 with support.temp_dir() as parent_dir: 224 bad_dir = os.path.join(parent_dir, 'does_not_exist') 225 with support.check_warnings() as recorder: 226 with support.change_cwd(bad_dir, quiet=True) as new_cwd: 227 self.assertEqual(new_cwd, original_cwd) 228 self.assertEqual(os.getcwd(), new_cwd) 229 warnings = [str(w.message) for w in recorder.warnings] 230 231 self.assertEqual(len(warnings), 1, warnings) 232 warn = warnings[0] 233 self.assertTrue(warn.startswith(f'tests may fail, unable to change ' 234 f'the current working directory ' 235 f'to {bad_dir!r}: '), 236 warn) 237 238 # Tests for change_cwd() 239 240 def test_change_cwd__chdir_warning(self): 241 """Check the warning message when os.chdir() fails.""" 242 path = TESTFN + '_does_not_exist' 243 with support.check_warnings() as recorder: 244 with support.change_cwd(path=path, quiet=True): 245 pass 246 messages = [str(w.message) for w in recorder.warnings] 247 248 self.assertEqual(len(messages), 1, messages) 249 msg = messages[0] 250 self.assertTrue(msg.startswith(f'tests may fail, unable to change ' 251 f'the current working directory ' 252 f'to {path!r}: '), 253 msg) 254 255 # Tests for temp_cwd() 256 257 def test_temp_cwd(self): 258 here = os.getcwd() 259 with support.temp_cwd(name=TESTFN): 260 self.assertEqual(os.path.basename(os.getcwd()), TESTFN) 261 self.assertFalse(os.path.exists(TESTFN)) 262 self.assertEqual(os.getcwd(), here) 263 264 265 def test_temp_cwd__name_none(self): 266 """Test passing None to temp_cwd().""" 267 original_cwd = os.getcwd() 268 with support.temp_cwd(name=None) as new_cwd: 269 self.assertNotEqual(new_cwd, original_cwd) 270 self.assertTrue(os.path.isdir(new_cwd)) 271 self.assertEqual(os.getcwd(), new_cwd) 272 self.assertEqual(os.getcwd(), original_cwd) 273 274 def test_sortdict(self): 275 self.assertEqual(support.sortdict({3:3, 2:2, 1:1}), "{1: 1, 2: 2, 3: 3}") 276 277 def test_make_bad_fd(self): 278 fd = support.make_bad_fd() 279 with self.assertRaises(OSError) as cm: 280 os.write(fd, b"foo") 281 self.assertEqual(cm.exception.errno, errno.EBADF) 282 283 def test_check_syntax_error(self): 284 support.check_syntax_error(self, "def class", lineno=1, offset=5) 285 with self.assertRaises(AssertionError): 286 support.check_syntax_error(self, "x=1") 287 288 def test_CleanImport(self): 289 import importlib 290 with support.CleanImport("asyncore"): 291 importlib.import_module("asyncore") 292 293 def test_DirsOnSysPath(self): 294 with support.DirsOnSysPath('foo', 'bar'): 295 self.assertIn("foo", sys.path) 296 self.assertIn("bar", sys.path) 297 self.assertNotIn("foo", sys.path) 298 self.assertNotIn("bar", sys.path) 299 300 def test_captured_stdout(self): 301 with support.captured_stdout() as stdout: 302 print("hello") 303 self.assertEqual(stdout.getvalue(), "hello\n") 304 305 def test_captured_stderr(self): 306 with support.captured_stderr() as stderr: 307 print("hello", file=sys.stderr) 308 self.assertEqual(stderr.getvalue(), "hello\n") 309 310 def test_captured_stdin(self): 311 with support.captured_stdin() as stdin: 312 stdin.write('hello\n') 313 stdin.seek(0) 314 # call test code that consumes from sys.stdin 315 captured = input() 316 self.assertEqual(captured, "hello") 317 318 def test_gc_collect(self): 319 support.gc_collect() 320 321 def test_python_is_optimized(self): 322 self.assertIsInstance(support.python_is_optimized(), bool) 323 324 def test_swap_attr(self): 325 class Obj: 326 pass 327 obj = Obj() 328 obj.x = 1 329 with support.swap_attr(obj, "x", 5) as x: 330 self.assertEqual(obj.x, 5) 331 self.assertEqual(x, 1) 332 self.assertEqual(obj.x, 1) 333 with support.swap_attr(obj, "y", 5) as y: 334 self.assertEqual(obj.y, 5) 335 self.assertIsNone(y) 336 self.assertFalse(hasattr(obj, 'y')) 337 with support.swap_attr(obj, "y", 5): 338 del obj.y 339 self.assertFalse(hasattr(obj, 'y')) 340 341 def test_swap_item(self): 342 D = {"x":1} 343 with support.swap_item(D, "x", 5) as x: 344 self.assertEqual(D["x"], 5) 345 self.assertEqual(x, 1) 346 self.assertEqual(D["x"], 1) 347 with support.swap_item(D, "y", 5) as y: 348 self.assertEqual(D["y"], 5) 349 self.assertIsNone(y) 350 self.assertNotIn("y", D) 351 with support.swap_item(D, "y", 5): 352 del D["y"] 353 self.assertNotIn("y", D) 354 355 class RefClass: 356 attribute1 = None 357 attribute2 = None 358 _hidden_attribute1 = None 359 __magic_1__ = None 360 361 class OtherClass: 362 attribute2 = None 363 attribute3 = None 364 __magic_1__ = None 365 __magic_2__ = None 366 367 def test_detect_api_mismatch(self): 368 missing_items = support.detect_api_mismatch(self.RefClass, 369 self.OtherClass) 370 self.assertEqual({'attribute1'}, missing_items) 371 372 missing_items = support.detect_api_mismatch(self.OtherClass, 373 self.RefClass) 374 self.assertEqual({'attribute3', '__magic_2__'}, missing_items) 375 376 def test_detect_api_mismatch__ignore(self): 377 ignore = ['attribute1', 'attribute3', '__magic_2__', 'not_in_either'] 378 379 missing_items = support.detect_api_mismatch( 380 self.RefClass, self.OtherClass, ignore=ignore) 381 self.assertEqual(set(), missing_items) 382 383 missing_items = support.detect_api_mismatch( 384 self.OtherClass, self.RefClass, ignore=ignore) 385 self.assertEqual(set(), missing_items) 386 387 def test_check__all__(self): 388 extra = {'tempdir'} 389 blacklist = {'template'} 390 support.check__all__(self, 391 tempfile, 392 extra=extra, 393 blacklist=blacklist) 394 395 extra = {'TextTestResult', 'installHandler'} 396 blacklist = {'load_tests', "TestProgram", "BaseTestSuite"} 397 398 support.check__all__(self, 399 unittest, 400 ("unittest.result", "unittest.case", 401 "unittest.suite", "unittest.loader", 402 "unittest.main", "unittest.runner", 403 "unittest.signals", "unittest.async_case"), 404 extra=extra, 405 blacklist=blacklist) 406 407 self.assertRaises(AssertionError, support.check__all__, self, unittest) 408 409 @unittest.skipUnless(hasattr(os, 'waitpid') and hasattr(os, 'WNOHANG'), 410 'need os.waitpid() and os.WNOHANG') 411 def test_reap_children(self): 412 # Make sure that there is no other pending child process 413 support.reap_children() 414 415 # Create a child process 416 pid = os.fork() 417 if pid == 0: 418 # child process: do nothing, just exit 419 os._exit(0) 420 421 t0 = time.monotonic() 422 deadline = time.monotonic() + support.SHORT_TIMEOUT 423 424 was_altered = support.environment_altered 425 try: 426 support.environment_altered = False 427 stderr = io.StringIO() 428 429 while True: 430 if time.monotonic() > deadline: 431 self.fail("timeout") 432 433 old_stderr = sys.__stderr__ 434 try: 435 sys.__stderr__ = stderr 436 support.reap_children() 437 finally: 438 sys.__stderr__ = old_stderr 439 440 # Use environment_altered to check if reap_children() found 441 # the child process 442 if support.environment_altered: 443 break 444 445 # loop until the child process completed 446 time.sleep(0.100) 447 448 msg = "Warning -- reap_children() reaped child process %s" % pid 449 self.assertIn(msg, stderr.getvalue()) 450 self.assertTrue(support.environment_altered) 451 finally: 452 support.environment_altered = was_altered 453 454 # Just in case, check again that there is no other 455 # pending child process 456 support.reap_children() 457 458 def check_options(self, args, func, expected=None): 459 code = f'from test.support import {func}; print(repr({func}()))' 460 cmd = [sys.executable, *args, '-c', code] 461 env = {key: value for key, value in os.environ.items() 462 if not key.startswith('PYTHON')} 463 proc = subprocess.run(cmd, 464 stdout=subprocess.PIPE, 465 stderr=subprocess.DEVNULL, 466 universal_newlines=True, 467 env=env) 468 if expected is None: 469 expected = args 470 self.assertEqual(proc.stdout.rstrip(), repr(expected)) 471 self.assertEqual(proc.returncode, 0) 472 473 def test_args_from_interpreter_flags(self): 474 # Test test.support.args_from_interpreter_flags() 475 for opts in ( 476 # no option 477 [], 478 # single option 479 ['-B'], 480 ['-s'], 481 ['-S'], 482 ['-E'], 483 ['-v'], 484 ['-b'], 485 ['-q'], 486 ['-I'], 487 # same option multiple times 488 ['-bb'], 489 ['-vvv'], 490 # -W options 491 ['-Wignore'], 492 # -X options 493 ['-X', 'dev'], 494 ['-Wignore', '-X', 'dev'], 495 ['-X', 'faulthandler'], 496 ['-X', 'importtime'], 497 ['-X', 'showrefcount'], 498 ['-X', 'tracemalloc'], 499 ['-X', 'tracemalloc=3'], 500 ): 501 with self.subTest(opts=opts): 502 self.check_options(opts, 'args_from_interpreter_flags') 503 504 self.check_options(['-I', '-E', '-s'], 'args_from_interpreter_flags', 505 ['-I']) 506 507 def test_optim_args_from_interpreter_flags(self): 508 # Test test.support.optim_args_from_interpreter_flags() 509 for opts in ( 510 # no option 511 [], 512 ['-O'], 513 ['-OO'], 514 ['-OOOO'], 515 ): 516 with self.subTest(opts=opts): 517 self.check_options(opts, 'optim_args_from_interpreter_flags') 518 519 def test_match_test(self): 520 class Test: 521 def __init__(self, test_id): 522 self.test_id = test_id 523 524 def id(self): 525 return self.test_id 526 527 test_access = Test('test.test_os.FileTests.test_access') 528 test_chdir = Test('test.test_os.Win32ErrorTests.test_chdir') 529 530 # Test acceptance 531 with support.swap_attr(support, '_match_test_func', None): 532 # match all 533 support.set_match_tests([]) 534 self.assertTrue(support.match_test(test_access)) 535 self.assertTrue(support.match_test(test_chdir)) 536 537 # match all using None 538 support.set_match_tests(None, None) 539 self.assertTrue(support.match_test(test_access)) 540 self.assertTrue(support.match_test(test_chdir)) 541 542 # match the full test identifier 543 support.set_match_tests([test_access.id()], None) 544 self.assertTrue(support.match_test(test_access)) 545 self.assertFalse(support.match_test(test_chdir)) 546 547 # match the module name 548 support.set_match_tests(['test_os'], None) 549 self.assertTrue(support.match_test(test_access)) 550 self.assertTrue(support.match_test(test_chdir)) 551 552 # Test '*' pattern 553 support.set_match_tests(['test_*'], None) 554 self.assertTrue(support.match_test(test_access)) 555 self.assertTrue(support.match_test(test_chdir)) 556 557 # Test case sensitivity 558 support.set_match_tests(['filetests'], None) 559 self.assertFalse(support.match_test(test_access)) 560 support.set_match_tests(['FileTests'], None) 561 self.assertTrue(support.match_test(test_access)) 562 563 # Test pattern containing '.' and a '*' metacharacter 564 support.set_match_tests(['*test_os.*.test_*'], None) 565 self.assertTrue(support.match_test(test_access)) 566 self.assertTrue(support.match_test(test_chdir)) 567 568 # Multiple patterns 569 support.set_match_tests([test_access.id(), test_chdir.id()], None) 570 self.assertTrue(support.match_test(test_access)) 571 self.assertTrue(support.match_test(test_chdir)) 572 573 support.set_match_tests(['test_access', 'DONTMATCH'], None) 574 self.assertTrue(support.match_test(test_access)) 575 self.assertFalse(support.match_test(test_chdir)) 576 577 # Test rejection 578 with support.swap_attr(support, '_match_test_func', None): 579 # match all 580 support.set_match_tests(ignore_patterns=[]) 581 self.assertTrue(support.match_test(test_access)) 582 self.assertTrue(support.match_test(test_chdir)) 583 584 # match all using None 585 support.set_match_tests(None, None) 586 self.assertTrue(support.match_test(test_access)) 587 self.assertTrue(support.match_test(test_chdir)) 588 589 # match the full test identifier 590 support.set_match_tests(None, [test_access.id()]) 591 self.assertFalse(support.match_test(test_access)) 592 self.assertTrue(support.match_test(test_chdir)) 593 594 # match the module name 595 support.set_match_tests(None, ['test_os']) 596 self.assertFalse(support.match_test(test_access)) 597 self.assertFalse(support.match_test(test_chdir)) 598 599 # Test '*' pattern 600 support.set_match_tests(None, ['test_*']) 601 self.assertFalse(support.match_test(test_access)) 602 self.assertFalse(support.match_test(test_chdir)) 603 604 # Test case sensitivity 605 support.set_match_tests(None, ['filetests']) 606 self.assertTrue(support.match_test(test_access)) 607 support.set_match_tests(None, ['FileTests']) 608 self.assertFalse(support.match_test(test_access)) 609 610 # Test pattern containing '.' and a '*' metacharacter 611 support.set_match_tests(None, ['*test_os.*.test_*']) 612 self.assertFalse(support.match_test(test_access)) 613 self.assertFalse(support.match_test(test_chdir)) 614 615 # Multiple patterns 616 support.set_match_tests(None, [test_access.id(), test_chdir.id()]) 617 self.assertFalse(support.match_test(test_access)) 618 self.assertFalse(support.match_test(test_chdir)) 619 620 support.set_match_tests(None, ['test_access', 'DONTMATCH']) 621 self.assertFalse(support.match_test(test_access)) 622 self.assertTrue(support.match_test(test_chdir)) 623 624 def test_fd_count(self): 625 # We cannot test the absolute value of fd_count(): on old Linux 626 # kernel or glibc versions, os.urandom() keeps a FD open on 627 # /dev/urandom device and Python has 4 FD opens instead of 3. 628 start = support.fd_count() 629 fd = os.open(__file__, os.O_RDONLY) 630 try: 631 more = support.fd_count() 632 finally: 633 os.close(fd) 634 self.assertEqual(more - start, 1) 635 636 def check_print_warning(self, msg, expected): 637 stderr = io.StringIO() 638 639 old_stderr = sys.__stderr__ 640 try: 641 sys.__stderr__ = stderr 642 support.print_warning(msg) 643 finally: 644 sys.__stderr__ = old_stderr 645 646 self.assertEqual(stderr.getvalue(), expected) 647 648 def test_print_warning(self): 649 self.check_print_warning("msg", 650 "Warning -- msg\n") 651 self.check_print_warning("a\nb", 652 'Warning -- a\nWarning -- b\n') 653 654 # XXX -follows a list of untested API 655 # make_legacy_pyc 656 # is_resource_enabled 657 # requires 658 # fcmp 659 # umaks 660 # findfile 661 # check_warnings 662 # EnvironmentVarGuard 663 # TransientResource 664 # transient_internet 665 # run_with_locale 666 # set_memlimit 667 # bigmemtest 668 # precisionbigmemtest 669 # bigaddrspacetest 670 # requires_resource 671 # run_doctest 672 # threading_cleanup 673 # reap_threads 674 # can_symlink 675 # skip_unless_symlink 676 # SuppressCrashReport 677 678 679def test_main(): 680 tests = [TestSupport] 681 support.run_unittest(*tests) 682 683if __name__ == '__main__': 684 test_main() 685