1""" 2Tests for the threading module. 3""" 4 5import test.support 6from test.support import verbose, import_module, cpython_only 7from test.support.script_helper import assert_python_ok, assert_python_failure 8 9import random 10import sys 11import _thread 12import threading 13import time 14import unittest 15import weakref 16import os 17import subprocess 18import signal 19import textwrap 20 21from test import lock_tests 22from test import support 23 24 25# Between fork() and exec(), only async-safe functions are allowed (issues 26# #12316 and #11870), and fork() from a worker thread is known to trigger 27# problems with some operating systems (issue #3863): skip problematic tests 28# on platforms known to behave badly. 29platforms_to_skip = ('netbsd5', 'hp-ux11') 30 31 32# A trivial mutable counter. 33class Counter(object): 34 def __init__(self): 35 self.value = 0 36 def inc(self): 37 self.value += 1 38 def dec(self): 39 self.value -= 1 40 def get(self): 41 return self.value 42 43class TestThread(threading.Thread): 44 def __init__(self, name, testcase, sema, mutex, nrunning): 45 threading.Thread.__init__(self, name=name) 46 self.testcase = testcase 47 self.sema = sema 48 self.mutex = mutex 49 self.nrunning = nrunning 50 51 def run(self): 52 delay = random.random() / 10000.0 53 if verbose: 54 print('task %s will run for %.1f usec' % 55 (self.name, delay * 1e6)) 56 57 with self.sema: 58 with self.mutex: 59 self.nrunning.inc() 60 if verbose: 61 print(self.nrunning.get(), 'tasks are running') 62 self.testcase.assertLessEqual(self.nrunning.get(), 3) 63 64 time.sleep(delay) 65 if verbose: 66 print('task', self.name, 'done') 67 68 with self.mutex: 69 self.nrunning.dec() 70 self.testcase.assertGreaterEqual(self.nrunning.get(), 0) 71 if verbose: 72 print('%s is finished. %d tasks are running' % 73 (self.name, self.nrunning.get())) 74 75 76class BaseTestCase(unittest.TestCase): 77 def setUp(self): 78 self._threads = test.support.threading_setup() 79 80 def tearDown(self): 81 test.support.threading_cleanup(*self._threads) 82 test.support.reap_children() 83 84 85class ThreadTests(BaseTestCase): 86 87 # Create a bunch of threads, let each do some work, wait until all are 88 # done. 89 def test_various_ops(self): 90 # This takes about n/3 seconds to run (about n/3 clumps of tasks, 91 # times about 1 second per clump). 92 NUMTASKS = 10 93 94 # no more than 3 of the 10 can run at once 95 sema = threading.BoundedSemaphore(value=3) 96 mutex = threading.RLock() 97 numrunning = Counter() 98 99 threads = [] 100 101 for i in range(NUMTASKS): 102 t = TestThread("<thread %d>"%i, self, sema, mutex, numrunning) 103 threads.append(t) 104 self.assertIsNone(t.ident) 105 self.assertRegex(repr(t), r'^<TestThread\(.*, initial\)>$') 106 t.start() 107 108 if hasattr(threading, 'get_native_id'): 109 native_ids = set(t.native_id for t in threads) | {threading.get_native_id()} 110 self.assertNotIn(None, native_ids) 111 self.assertEqual(len(native_ids), NUMTASKS + 1) 112 113 if verbose: 114 print('waiting for all tasks to complete') 115 for t in threads: 116 t.join() 117 self.assertFalse(t.is_alive()) 118 self.assertNotEqual(t.ident, 0) 119 self.assertIsNotNone(t.ident) 120 self.assertRegex(repr(t), r'^<TestThread\(.*, stopped -?\d+\)>$') 121 if verbose: 122 print('all tasks done') 123 self.assertEqual(numrunning.get(), 0) 124 125 def test_ident_of_no_threading_threads(self): 126 # The ident still must work for the main thread and dummy threads. 127 self.assertIsNotNone(threading.currentThread().ident) 128 def f(): 129 ident.append(threading.currentThread().ident) 130 done.set() 131 done = threading.Event() 132 ident = [] 133 with support.wait_threads_exit(): 134 tid = _thread.start_new_thread(f, ()) 135 done.wait() 136 self.assertEqual(ident[0], tid) 137 # Kill the "immortal" _DummyThread 138 del threading._active[ident[0]] 139 140 # run with a small(ish) thread stack size (256 KiB) 141 def test_various_ops_small_stack(self): 142 if verbose: 143 print('with 256 KiB thread stack size...') 144 try: 145 threading.stack_size(262144) 146 except _thread.error: 147 raise unittest.SkipTest( 148 'platform does not support changing thread stack size') 149 self.test_various_ops() 150 threading.stack_size(0) 151 152 # run with a large thread stack size (1 MiB) 153 def test_various_ops_large_stack(self): 154 if verbose: 155 print('with 1 MiB thread stack size...') 156 try: 157 threading.stack_size(0x100000) 158 except _thread.error: 159 raise unittest.SkipTest( 160 'platform does not support changing thread stack size') 161 self.test_various_ops() 162 threading.stack_size(0) 163 164 def test_foreign_thread(self): 165 # Check that a "foreign" thread can use the threading module. 166 def f(mutex): 167 # Calling current_thread() forces an entry for the foreign 168 # thread to get made in the threading._active map. 169 threading.current_thread() 170 mutex.release() 171 172 mutex = threading.Lock() 173 mutex.acquire() 174 with support.wait_threads_exit(): 175 tid = _thread.start_new_thread(f, (mutex,)) 176 # Wait for the thread to finish. 177 mutex.acquire() 178 self.assertIn(tid, threading._active) 179 self.assertIsInstance(threading._active[tid], threading._DummyThread) 180 #Issue 29376 181 self.assertTrue(threading._active[tid].is_alive()) 182 self.assertRegex(repr(threading._active[tid]), '_DummyThread') 183 del threading._active[tid] 184 185 # PyThreadState_SetAsyncExc() is a CPython-only gimmick, not (currently) 186 # exposed at the Python level. This test relies on ctypes to get at it. 187 def test_PyThreadState_SetAsyncExc(self): 188 ctypes = import_module("ctypes") 189 190 set_async_exc = ctypes.pythonapi.PyThreadState_SetAsyncExc 191 set_async_exc.argtypes = (ctypes.c_ulong, ctypes.py_object) 192 193 class AsyncExc(Exception): 194 pass 195 196 exception = ctypes.py_object(AsyncExc) 197 198 # First check it works when setting the exception from the same thread. 199 tid = threading.get_ident() 200 self.assertIsInstance(tid, int) 201 self.assertGreater(tid, 0) 202 203 try: 204 result = set_async_exc(tid, exception) 205 # The exception is async, so we might have to keep the VM busy until 206 # it notices. 207 while True: 208 pass 209 except AsyncExc: 210 pass 211 else: 212 # This code is unreachable but it reflects the intent. If we wanted 213 # to be smarter the above loop wouldn't be infinite. 214 self.fail("AsyncExc not raised") 215 try: 216 self.assertEqual(result, 1) # one thread state modified 217 except UnboundLocalError: 218 # The exception was raised too quickly for us to get the result. 219 pass 220 221 # `worker_started` is set by the thread when it's inside a try/except 222 # block waiting to catch the asynchronously set AsyncExc exception. 223 # `worker_saw_exception` is set by the thread upon catching that 224 # exception. 225 worker_started = threading.Event() 226 worker_saw_exception = threading.Event() 227 228 class Worker(threading.Thread): 229 def run(self): 230 self.id = threading.get_ident() 231 self.finished = False 232 233 try: 234 while True: 235 worker_started.set() 236 time.sleep(0.1) 237 except AsyncExc: 238 self.finished = True 239 worker_saw_exception.set() 240 241 t = Worker() 242 t.daemon = True # so if this fails, we don't hang Python at shutdown 243 t.start() 244 if verbose: 245 print(" started worker thread") 246 247 # Try a thread id that doesn't make sense. 248 if verbose: 249 print(" trying nonsensical thread id") 250 result = set_async_exc(-1, exception) 251 self.assertEqual(result, 0) # no thread states modified 252 253 # Now raise an exception in the worker thread. 254 if verbose: 255 print(" waiting for worker thread to get started") 256 ret = worker_started.wait() 257 self.assertTrue(ret) 258 if verbose: 259 print(" verifying worker hasn't exited") 260 self.assertFalse(t.finished) 261 if verbose: 262 print(" attempting to raise asynch exception in worker") 263 result = set_async_exc(t.id, exception) 264 self.assertEqual(result, 1) # one thread state modified 265 if verbose: 266 print(" waiting for worker to say it caught the exception") 267 worker_saw_exception.wait(timeout=support.SHORT_TIMEOUT) 268 self.assertTrue(t.finished) 269 if verbose: 270 print(" all OK -- joining worker") 271 if t.finished: 272 t.join() 273 # else the thread is still running, and we have no way to kill it 274 275 def test_limbo_cleanup(self): 276 # Issue 7481: Failure to start thread should cleanup the limbo map. 277 def fail_new_thread(*args): 278 raise threading.ThreadError() 279 _start_new_thread = threading._start_new_thread 280 threading._start_new_thread = fail_new_thread 281 try: 282 t = threading.Thread(target=lambda: None) 283 self.assertRaises(threading.ThreadError, t.start) 284 self.assertFalse( 285 t in threading._limbo, 286 "Failed to cleanup _limbo map on failure of Thread.start().") 287 finally: 288 threading._start_new_thread = _start_new_thread 289 290 def test_finalize_running_thread(self): 291 # Issue 1402: the PyGILState_Ensure / _Release functions may be called 292 # very late on python exit: on deallocation of a running thread for 293 # example. 294 import_module("ctypes") 295 296 rc, out, err = assert_python_failure("-c", """if 1: 297 import ctypes, sys, time, _thread 298 299 # This lock is used as a simple event variable. 300 ready = _thread.allocate_lock() 301 ready.acquire() 302 303 # Module globals are cleared before __del__ is run 304 # So we save the functions in class dict 305 class C: 306 ensure = ctypes.pythonapi.PyGILState_Ensure 307 release = ctypes.pythonapi.PyGILState_Release 308 def __del__(self): 309 state = self.ensure() 310 self.release(state) 311 312 def waitingThread(): 313 x = C() 314 ready.release() 315 time.sleep(100) 316 317 _thread.start_new_thread(waitingThread, ()) 318 ready.acquire() # Be sure the other thread is waiting. 319 sys.exit(42) 320 """) 321 self.assertEqual(rc, 42) 322 323 def test_finalize_with_trace(self): 324 # Issue1733757 325 # Avoid a deadlock when sys.settrace steps into threading._shutdown 326 assert_python_ok("-c", """if 1: 327 import sys, threading 328 329 # A deadlock-killer, to prevent the 330 # testsuite to hang forever 331 def killer(): 332 import os, time 333 time.sleep(2) 334 print('program blocked; aborting') 335 os._exit(2) 336 t = threading.Thread(target=killer) 337 t.daemon = True 338 t.start() 339 340 # This is the trace function 341 def func(frame, event, arg): 342 threading.current_thread() 343 return func 344 345 sys.settrace(func) 346 """) 347 348 def test_join_nondaemon_on_shutdown(self): 349 # Issue 1722344 350 # Raising SystemExit skipped threading._shutdown 351 rc, out, err = assert_python_ok("-c", """if 1: 352 import threading 353 from time import sleep 354 355 def child(): 356 sleep(1) 357 # As a non-daemon thread we SHOULD wake up and nothing 358 # should be torn down yet 359 print("Woke up, sleep function is:", sleep) 360 361 threading.Thread(target=child).start() 362 raise SystemExit 363 """) 364 self.assertEqual(out.strip(), 365 b"Woke up, sleep function is: <built-in function sleep>") 366 self.assertEqual(err, b"") 367 368 def test_enumerate_after_join(self): 369 # Try hard to trigger #1703448: a thread is still returned in 370 # threading.enumerate() after it has been join()ed. 371 enum = threading.enumerate 372 old_interval = sys.getswitchinterval() 373 try: 374 for i in range(1, 100): 375 sys.setswitchinterval(i * 0.0002) 376 t = threading.Thread(target=lambda: None) 377 t.start() 378 t.join() 379 l = enum() 380 self.assertNotIn(t, l, 381 "#1703448 triggered after %d trials: %s" % (i, l)) 382 finally: 383 sys.setswitchinterval(old_interval) 384 385 def test_no_refcycle_through_target(self): 386 class RunSelfFunction(object): 387 def __init__(self, should_raise): 388 # The links in this refcycle from Thread back to self 389 # should be cleaned up when the thread completes. 390 self.should_raise = should_raise 391 self.thread = threading.Thread(target=self._run, 392 args=(self,), 393 kwargs={'yet_another':self}) 394 self.thread.start() 395 396 def _run(self, other_ref, yet_another): 397 if self.should_raise: 398 raise SystemExit 399 400 cyclic_object = RunSelfFunction(should_raise=False) 401 weak_cyclic_object = weakref.ref(cyclic_object) 402 cyclic_object.thread.join() 403 del cyclic_object 404 self.assertIsNone(weak_cyclic_object(), 405 msg=('%d references still around' % 406 sys.getrefcount(weak_cyclic_object()))) 407 408 raising_cyclic_object = RunSelfFunction(should_raise=True) 409 weak_raising_cyclic_object = weakref.ref(raising_cyclic_object) 410 raising_cyclic_object.thread.join() 411 del raising_cyclic_object 412 self.assertIsNone(weak_raising_cyclic_object(), 413 msg=('%d references still around' % 414 sys.getrefcount(weak_raising_cyclic_object()))) 415 416 def test_old_threading_api(self): 417 # Just a quick sanity check to make sure the old method names are 418 # still present 419 t = threading.Thread() 420 t.isDaemon() 421 t.setDaemon(True) 422 t.getName() 423 t.setName("name") 424 e = threading.Event() 425 e.isSet() 426 threading.activeCount() 427 428 def test_repr_daemon(self): 429 t = threading.Thread() 430 self.assertNotIn('daemon', repr(t)) 431 t.daemon = True 432 self.assertIn('daemon', repr(t)) 433 434 def test_daemon_param(self): 435 t = threading.Thread() 436 self.assertFalse(t.daemon) 437 t = threading.Thread(daemon=False) 438 self.assertFalse(t.daemon) 439 t = threading.Thread(daemon=True) 440 self.assertTrue(t.daemon) 441 442 @unittest.skipUnless(hasattr(os, 'fork'), 'needs os.fork()') 443 def test_fork_at_exit(self): 444 # bpo-42350: Calling os.fork() after threading._shutdown() must 445 # not log an error. 446 code = textwrap.dedent(""" 447 import atexit 448 import os 449 import sys 450 from test.support import wait_process 451 452 # Import the threading module to register its "at fork" callback 453 import threading 454 455 def exit_handler(): 456 pid = os.fork() 457 if not pid: 458 print("child process ok", file=sys.stderr, flush=True) 459 # child process 460 sys.exit() 461 else: 462 wait_process(pid, exitcode=0) 463 464 # exit_handler() will be called after threading._shutdown() 465 atexit.register(exit_handler) 466 """) 467 _, out, err = assert_python_ok("-c", code) 468 self.assertEqual(out, b'') 469 self.assertEqual(err.rstrip(), b'child process ok') 470 471 @unittest.skipUnless(hasattr(os, 'fork'), 'test needs fork()') 472 def test_dummy_thread_after_fork(self): 473 # Issue #14308: a dummy thread in the active list doesn't mess up 474 # the after-fork mechanism. 475 code = """if 1: 476 import _thread, threading, os, time 477 478 def background_thread(evt): 479 # Creates and registers the _DummyThread instance 480 threading.current_thread() 481 evt.set() 482 time.sleep(10) 483 484 evt = threading.Event() 485 _thread.start_new_thread(background_thread, (evt,)) 486 evt.wait() 487 assert threading.active_count() == 2, threading.active_count() 488 if os.fork() == 0: 489 assert threading.active_count() == 1, threading.active_count() 490 os._exit(0) 491 else: 492 os.wait() 493 """ 494 _, out, err = assert_python_ok("-c", code) 495 self.assertEqual(out, b'') 496 self.assertEqual(err, b'') 497 498 @unittest.skipUnless(hasattr(os, 'fork'), "needs os.fork()") 499 def test_is_alive_after_fork(self): 500 # Try hard to trigger #18418: is_alive() could sometimes be True on 501 # threads that vanished after a fork. 502 old_interval = sys.getswitchinterval() 503 self.addCleanup(sys.setswitchinterval, old_interval) 504 505 # Make the bug more likely to manifest. 506 test.support.setswitchinterval(1e-6) 507 508 for i in range(20): 509 t = threading.Thread(target=lambda: None) 510 t.start() 511 pid = os.fork() 512 if pid == 0: 513 os._exit(11 if t.is_alive() else 10) 514 else: 515 t.join() 516 517 support.wait_process(pid, exitcode=10) 518 519 def test_main_thread(self): 520 main = threading.main_thread() 521 self.assertEqual(main.name, 'MainThread') 522 self.assertEqual(main.ident, threading.current_thread().ident) 523 self.assertEqual(main.ident, threading.get_ident()) 524 525 def f(): 526 self.assertNotEqual(threading.main_thread().ident, 527 threading.current_thread().ident) 528 th = threading.Thread(target=f) 529 th.start() 530 th.join() 531 532 @unittest.skipUnless(hasattr(os, 'fork'), "test needs os.fork()") 533 @unittest.skipUnless(hasattr(os, 'waitpid'), "test needs os.waitpid()") 534 def test_main_thread_after_fork(self): 535 code = """if 1: 536 import os, threading 537 from test import support 538 539 pid = os.fork() 540 if pid == 0: 541 main = threading.main_thread() 542 print(main.name) 543 print(main.ident == threading.current_thread().ident) 544 print(main.ident == threading.get_ident()) 545 else: 546 support.wait_process(pid, exitcode=0) 547 """ 548 _, out, err = assert_python_ok("-c", code) 549 data = out.decode().replace('\r', '') 550 self.assertEqual(err, b"") 551 self.assertEqual(data, "MainThread\nTrue\nTrue\n") 552 553 @unittest.skipIf(sys.platform in platforms_to_skip, "due to known OS bug") 554 @unittest.skipUnless(hasattr(os, 'fork'), "test needs os.fork()") 555 @unittest.skipUnless(hasattr(os, 'waitpid'), "test needs os.waitpid()") 556 def test_main_thread_after_fork_from_nonmain_thread(self): 557 code = """if 1: 558 import os, threading, sys 559 from test import support 560 561 def f(): 562 pid = os.fork() 563 if pid == 0: 564 main = threading.main_thread() 565 print(main.name) 566 print(main.ident == threading.current_thread().ident) 567 print(main.ident == threading.get_ident()) 568 # stdout is fully buffered because not a tty, 569 # we have to flush before exit. 570 sys.stdout.flush() 571 else: 572 support.wait_process(pid, exitcode=0) 573 574 th = threading.Thread(target=f) 575 th.start() 576 th.join() 577 """ 578 _, out, err = assert_python_ok("-c", code) 579 data = out.decode().replace('\r', '') 580 self.assertEqual(err, b"") 581 self.assertEqual(data, "Thread-1\nTrue\nTrue\n") 582 583 def test_main_thread_during_shutdown(self): 584 # bpo-31516: current_thread() should still point to the main thread 585 # at shutdown 586 code = """if 1: 587 import gc, threading 588 589 main_thread = threading.current_thread() 590 assert main_thread is threading.main_thread() # sanity check 591 592 class RefCycle: 593 def __init__(self): 594 self.cycle = self 595 596 def __del__(self): 597 print("GC:", 598 threading.current_thread() is main_thread, 599 threading.main_thread() is main_thread, 600 threading.enumerate() == [main_thread]) 601 602 RefCycle() 603 gc.collect() # sanity check 604 x = RefCycle() 605 """ 606 _, out, err = assert_python_ok("-c", code) 607 data = out.decode() 608 self.assertEqual(err, b"") 609 self.assertEqual(data.splitlines(), 610 ["GC: True True True"] * 2) 611 612 def test_finalization_shutdown(self): 613 # bpo-36402: Py_Finalize() calls threading._shutdown() which must wait 614 # until Python thread states of all non-daemon threads get deleted. 615 # 616 # Test similar to SubinterpThreadingTests.test_threads_join_2(), but 617 # test the finalization of the main interpreter. 618 code = """if 1: 619 import os 620 import threading 621 import time 622 import random 623 624 def random_sleep(): 625 seconds = random.random() * 0.010 626 time.sleep(seconds) 627 628 class Sleeper: 629 def __del__(self): 630 random_sleep() 631 632 tls = threading.local() 633 634 def f(): 635 # Sleep a bit so that the thread is still running when 636 # Py_Finalize() is called. 637 random_sleep() 638 tls.x = Sleeper() 639 random_sleep() 640 641 threading.Thread(target=f).start() 642 random_sleep() 643 """ 644 rc, out, err = assert_python_ok("-c", code) 645 self.assertEqual(err, b"") 646 647 def test_tstate_lock(self): 648 # Test an implementation detail of Thread objects. 649 started = _thread.allocate_lock() 650 finish = _thread.allocate_lock() 651 started.acquire() 652 finish.acquire() 653 def f(): 654 started.release() 655 finish.acquire() 656 time.sleep(0.01) 657 # The tstate lock is None until the thread is started 658 t = threading.Thread(target=f) 659 self.assertIs(t._tstate_lock, None) 660 t.start() 661 started.acquire() 662 self.assertTrue(t.is_alive()) 663 # The tstate lock can't be acquired when the thread is running 664 # (or suspended). 665 tstate_lock = t._tstate_lock 666 self.assertFalse(tstate_lock.acquire(timeout=0), False) 667 finish.release() 668 # When the thread ends, the state_lock can be successfully 669 # acquired. 670 self.assertTrue(tstate_lock.acquire(timeout=support.SHORT_TIMEOUT), False) 671 # But is_alive() is still True: we hold _tstate_lock now, which 672 # prevents is_alive() from knowing the thread's end-of-life C code 673 # is done. 674 self.assertTrue(t.is_alive()) 675 # Let is_alive() find out the C code is done. 676 tstate_lock.release() 677 self.assertFalse(t.is_alive()) 678 # And verify the thread disposed of _tstate_lock. 679 self.assertIsNone(t._tstate_lock) 680 t.join() 681 682 def test_repr_stopped(self): 683 # Verify that "stopped" shows up in repr(Thread) appropriately. 684 started = _thread.allocate_lock() 685 finish = _thread.allocate_lock() 686 started.acquire() 687 finish.acquire() 688 def f(): 689 started.release() 690 finish.acquire() 691 t = threading.Thread(target=f) 692 t.start() 693 started.acquire() 694 self.assertIn("started", repr(t)) 695 finish.release() 696 # "stopped" should appear in the repr in a reasonable amount of time. 697 # Implementation detail: as of this writing, that's trivially true 698 # if .join() is called, and almost trivially true if .is_alive() is 699 # called. The detail we're testing here is that "stopped" shows up 700 # "all on its own". 701 LOOKING_FOR = "stopped" 702 for i in range(500): 703 if LOOKING_FOR in repr(t): 704 break 705 time.sleep(0.01) 706 self.assertIn(LOOKING_FOR, repr(t)) # we waited at least 5 seconds 707 t.join() 708 709 def test_BoundedSemaphore_limit(self): 710 # BoundedSemaphore should raise ValueError if released too often. 711 for limit in range(1, 10): 712 bs = threading.BoundedSemaphore(limit) 713 threads = [threading.Thread(target=bs.acquire) 714 for _ in range(limit)] 715 for t in threads: 716 t.start() 717 for t in threads: 718 t.join() 719 threads = [threading.Thread(target=bs.release) 720 for _ in range(limit)] 721 for t in threads: 722 t.start() 723 for t in threads: 724 t.join() 725 self.assertRaises(ValueError, bs.release) 726 727 @cpython_only 728 def test_frame_tstate_tracing(self): 729 # Issue #14432: Crash when a generator is created in a C thread that is 730 # destroyed while the generator is still used. The issue was that a 731 # generator contains a frame, and the frame kept a reference to the 732 # Python state of the destroyed C thread. The crash occurs when a trace 733 # function is setup. 734 735 def noop_trace(frame, event, arg): 736 # no operation 737 return noop_trace 738 739 def generator(): 740 while 1: 741 yield "generator" 742 743 def callback(): 744 if callback.gen is None: 745 callback.gen = generator() 746 return next(callback.gen) 747 callback.gen = None 748 749 old_trace = sys.gettrace() 750 sys.settrace(noop_trace) 751 try: 752 # Install a trace function 753 threading.settrace(noop_trace) 754 755 # Create a generator in a C thread which exits after the call 756 import _testcapi 757 _testcapi.call_in_temporary_c_thread(callback) 758 759 # Call the generator in a different Python thread, check that the 760 # generator didn't keep a reference to the destroyed thread state 761 for test in range(3): 762 # The trace function is still called here 763 callback() 764 finally: 765 sys.settrace(old_trace) 766 767 @cpython_only 768 def test_shutdown_locks(self): 769 for daemon in (False, True): 770 with self.subTest(daemon=daemon): 771 event = threading.Event() 772 thread = threading.Thread(target=event.wait, daemon=daemon) 773 774 # Thread.start() must add lock to _shutdown_locks, 775 # but only for non-daemon thread 776 thread.start() 777 tstate_lock = thread._tstate_lock 778 if not daemon: 779 self.assertIn(tstate_lock, threading._shutdown_locks) 780 else: 781 self.assertNotIn(tstate_lock, threading._shutdown_locks) 782 783 # unblock the thread and join it 784 event.set() 785 thread.join() 786 787 # Thread._stop() must remove tstate_lock from _shutdown_locks. 788 # Daemon threads must never add it to _shutdown_locks. 789 self.assertNotIn(tstate_lock, threading._shutdown_locks) 790 791 def test_locals_at_exit(self): 792 # bpo-19466: thread locals must not be deleted before destructors 793 # are called 794 rc, out, err = assert_python_ok("-c", """if 1: 795 import threading 796 797 class Atexit: 798 def __del__(self): 799 print("thread_dict.atexit = %r" % thread_dict.atexit) 800 801 thread_dict = threading.local() 802 thread_dict.atexit = "value" 803 804 atexit = Atexit() 805 """) 806 self.assertEqual(out.rstrip(), b"thread_dict.atexit = 'value'") 807 808 809class ThreadJoinOnShutdown(BaseTestCase): 810 811 def _run_and_join(self, script): 812 script = """if 1: 813 import sys, os, time, threading 814 815 # a thread, which waits for the main program to terminate 816 def joiningfunc(mainthread): 817 mainthread.join() 818 print('end of thread') 819 # stdout is fully buffered because not a tty, we have to flush 820 # before exit. 821 sys.stdout.flush() 822 \n""" + script 823 824 rc, out, err = assert_python_ok("-c", script) 825 data = out.decode().replace('\r', '') 826 self.assertEqual(data, "end of main\nend of thread\n") 827 828 def test_1_join_on_shutdown(self): 829 # The usual case: on exit, wait for a non-daemon thread 830 script = """if 1: 831 import os 832 t = threading.Thread(target=joiningfunc, 833 args=(threading.current_thread(),)) 834 t.start() 835 time.sleep(0.1) 836 print('end of main') 837 """ 838 self._run_and_join(script) 839 840 @unittest.skipUnless(hasattr(os, 'fork'), "needs os.fork()") 841 @unittest.skipIf(sys.platform in platforms_to_skip, "due to known OS bug") 842 def test_2_join_in_forked_process(self): 843 # Like the test above, but from a forked interpreter 844 script = """if 1: 845 from test import support 846 847 childpid = os.fork() 848 if childpid != 0: 849 # parent process 850 support.wait_process(childpid, exitcode=0) 851 sys.exit(0) 852 853 # child process 854 t = threading.Thread(target=joiningfunc, 855 args=(threading.current_thread(),)) 856 t.start() 857 print('end of main') 858 """ 859 self._run_and_join(script) 860 861 @unittest.skipUnless(hasattr(os, 'fork'), "needs os.fork()") 862 @unittest.skipIf(sys.platform in platforms_to_skip, "due to known OS bug") 863 def test_3_join_in_forked_from_thread(self): 864 # Like the test above, but fork() was called from a worker thread 865 # In the forked process, the main Thread object must be marked as stopped. 866 867 script = """if 1: 868 from test import support 869 870 main_thread = threading.current_thread() 871 def worker(): 872 childpid = os.fork() 873 if childpid != 0: 874 # parent process 875 support.wait_process(childpid, exitcode=0) 876 sys.exit(0) 877 878 # child process 879 t = threading.Thread(target=joiningfunc, 880 args=(main_thread,)) 881 print('end of main') 882 t.start() 883 t.join() # Should not block: main_thread is already stopped 884 885 w = threading.Thread(target=worker) 886 w.start() 887 """ 888 self._run_and_join(script) 889 890 @unittest.skipIf(sys.platform in platforms_to_skip, "due to known OS bug") 891 def test_4_daemon_threads(self): 892 # Check that a daemon thread cannot crash the interpreter on shutdown 893 # by manipulating internal structures that are being disposed of in 894 # the main thread. 895 script = """if True: 896 import os 897 import random 898 import sys 899 import time 900 import threading 901 902 thread_has_run = set() 903 904 def random_io(): 905 '''Loop for a while sleeping random tiny amounts and doing some I/O.''' 906 while True: 907 with open(os.__file__, 'rb') as in_f: 908 stuff = in_f.read(200) 909 with open(os.devnull, 'wb') as null_f: 910 null_f.write(stuff) 911 time.sleep(random.random() / 1995) 912 thread_has_run.add(threading.current_thread()) 913 914 def main(): 915 count = 0 916 for _ in range(40): 917 new_thread = threading.Thread(target=random_io) 918 new_thread.daemon = True 919 new_thread.start() 920 count += 1 921 while len(thread_has_run) < count: 922 time.sleep(0.001) 923 # Trigger process shutdown 924 sys.exit(0) 925 926 main() 927 """ 928 rc, out, err = assert_python_ok('-c', script) 929 self.assertFalse(err) 930 931 @unittest.skipUnless(hasattr(os, 'fork'), "needs os.fork()") 932 @unittest.skipIf(sys.platform in platforms_to_skip, "due to known OS bug") 933 def test_reinit_tls_after_fork(self): 934 # Issue #13817: fork() would deadlock in a multithreaded program with 935 # the ad-hoc TLS implementation. 936 937 def do_fork_and_wait(): 938 # just fork a child process and wait it 939 pid = os.fork() 940 if pid > 0: 941 support.wait_process(pid, exitcode=50) 942 else: 943 os._exit(50) 944 945 # start a bunch of threads that will fork() child processes 946 threads = [] 947 for i in range(16): 948 t = threading.Thread(target=do_fork_and_wait) 949 threads.append(t) 950 t.start() 951 952 for t in threads: 953 t.join() 954 955 @unittest.skipUnless(hasattr(os, 'fork'), "needs os.fork()") 956 def test_clear_threads_states_after_fork(self): 957 # Issue #17094: check that threads states are cleared after fork() 958 959 # start a bunch of threads 960 threads = [] 961 for i in range(16): 962 t = threading.Thread(target=lambda : time.sleep(0.3)) 963 threads.append(t) 964 t.start() 965 966 pid = os.fork() 967 if pid == 0: 968 # check that threads states have been cleared 969 if len(sys._current_frames()) == 1: 970 os._exit(51) 971 else: 972 os._exit(52) 973 else: 974 support.wait_process(pid, exitcode=51) 975 976 for t in threads: 977 t.join() 978 979 980class SubinterpThreadingTests(BaseTestCase): 981 def pipe(self): 982 r, w = os.pipe() 983 self.addCleanup(os.close, r) 984 self.addCleanup(os.close, w) 985 if hasattr(os, 'set_blocking'): 986 os.set_blocking(r, False) 987 return (r, w) 988 989 def test_threads_join(self): 990 # Non-daemon threads should be joined at subinterpreter shutdown 991 # (issue #18808) 992 r, w = self.pipe() 993 code = textwrap.dedent(r""" 994 import os 995 import random 996 import threading 997 import time 998 999 def random_sleep(): 1000 seconds = random.random() * 0.010 1001 time.sleep(seconds) 1002 1003 def f(): 1004 # Sleep a bit so that the thread is still running when 1005 # Py_EndInterpreter is called. 1006 random_sleep() 1007 os.write(%d, b"x") 1008 1009 threading.Thread(target=f).start() 1010 random_sleep() 1011 """ % (w,)) 1012 ret = test.support.run_in_subinterp(code) 1013 self.assertEqual(ret, 0) 1014 # The thread was joined properly. 1015 self.assertEqual(os.read(r, 1), b"x") 1016 1017 def test_threads_join_2(self): 1018 # Same as above, but a delay gets introduced after the thread's 1019 # Python code returned but before the thread state is deleted. 1020 # To achieve this, we register a thread-local object which sleeps 1021 # a bit when deallocated. 1022 r, w = self.pipe() 1023 code = textwrap.dedent(r""" 1024 import os 1025 import random 1026 import threading 1027 import time 1028 1029 def random_sleep(): 1030 seconds = random.random() * 0.010 1031 time.sleep(seconds) 1032 1033 class Sleeper: 1034 def __del__(self): 1035 random_sleep() 1036 1037 tls = threading.local() 1038 1039 def f(): 1040 # Sleep a bit so that the thread is still running when 1041 # Py_EndInterpreter is called. 1042 random_sleep() 1043 tls.x = Sleeper() 1044 os.write(%d, b"x") 1045 1046 threading.Thread(target=f).start() 1047 random_sleep() 1048 """ % (w,)) 1049 ret = test.support.run_in_subinterp(code) 1050 self.assertEqual(ret, 0) 1051 # The thread was joined properly. 1052 self.assertEqual(os.read(r, 1), b"x") 1053 1054 @cpython_only 1055 def test_daemon_threads_fatal_error(self): 1056 subinterp_code = f"""if 1: 1057 import os 1058 import threading 1059 import time 1060 1061 def f(): 1062 # Make sure the daemon thread is still running when 1063 # Py_EndInterpreter is called. 1064 time.sleep({test.support.SHORT_TIMEOUT}) 1065 threading.Thread(target=f, daemon=True).start() 1066 """ 1067 script = r"""if 1: 1068 import _testcapi 1069 1070 _testcapi.run_in_subinterp(%r) 1071 """ % (subinterp_code,) 1072 with test.support.SuppressCrashReport(): 1073 rc, out, err = assert_python_failure("-c", script) 1074 self.assertIn("Fatal Python error: Py_EndInterpreter: " 1075 "not the last thread", err.decode()) 1076 1077 1078class ThreadingExceptionTests(BaseTestCase): 1079 # A RuntimeError should be raised if Thread.start() is called 1080 # multiple times. 1081 def test_start_thread_again(self): 1082 thread = threading.Thread() 1083 thread.start() 1084 self.assertRaises(RuntimeError, thread.start) 1085 thread.join() 1086 1087 def test_joining_current_thread(self): 1088 current_thread = threading.current_thread() 1089 self.assertRaises(RuntimeError, current_thread.join); 1090 1091 def test_joining_inactive_thread(self): 1092 thread = threading.Thread() 1093 self.assertRaises(RuntimeError, thread.join) 1094 1095 def test_daemonize_active_thread(self): 1096 thread = threading.Thread() 1097 thread.start() 1098 self.assertRaises(RuntimeError, setattr, thread, "daemon", True) 1099 thread.join() 1100 1101 def test_releasing_unacquired_lock(self): 1102 lock = threading.Lock() 1103 self.assertRaises(RuntimeError, lock.release) 1104 1105 def test_recursion_limit(self): 1106 # Issue 9670 1107 # test that excessive recursion within a non-main thread causes 1108 # an exception rather than crashing the interpreter on platforms 1109 # like Mac OS X or FreeBSD which have small default stack sizes 1110 # for threads 1111 script = """if True: 1112 import threading 1113 1114 def recurse(): 1115 return recurse() 1116 1117 def outer(): 1118 try: 1119 recurse() 1120 except RecursionError: 1121 pass 1122 1123 w = threading.Thread(target=outer) 1124 w.start() 1125 w.join() 1126 print('end of main thread') 1127 """ 1128 expected_output = "end of main thread\n" 1129 p = subprocess.Popen([sys.executable, "-c", script], 1130 stdout=subprocess.PIPE, stderr=subprocess.PIPE) 1131 stdout, stderr = p.communicate() 1132 data = stdout.decode().replace('\r', '') 1133 self.assertEqual(p.returncode, 0, "Unexpected error: " + stderr.decode()) 1134 self.assertEqual(data, expected_output) 1135 1136 def test_print_exception(self): 1137 script = r"""if True: 1138 import threading 1139 import time 1140 1141 running = False 1142 def run(): 1143 global running 1144 running = True 1145 while running: 1146 time.sleep(0.01) 1147 1/0 1148 t = threading.Thread(target=run) 1149 t.start() 1150 while not running: 1151 time.sleep(0.01) 1152 running = False 1153 t.join() 1154 """ 1155 rc, out, err = assert_python_ok("-c", script) 1156 self.assertEqual(out, b'') 1157 err = err.decode() 1158 self.assertIn("Exception in thread", err) 1159 self.assertIn("Traceback (most recent call last):", err) 1160 self.assertIn("ZeroDivisionError", err) 1161 self.assertNotIn("Unhandled exception", err) 1162 1163 def test_print_exception_stderr_is_none_1(self): 1164 script = r"""if True: 1165 import sys 1166 import threading 1167 import time 1168 1169 running = False 1170 def run(): 1171 global running 1172 running = True 1173 while running: 1174 time.sleep(0.01) 1175 1/0 1176 t = threading.Thread(target=run) 1177 t.start() 1178 while not running: 1179 time.sleep(0.01) 1180 sys.stderr = None 1181 running = False 1182 t.join() 1183 """ 1184 rc, out, err = assert_python_ok("-c", script) 1185 self.assertEqual(out, b'') 1186 err = err.decode() 1187 self.assertIn("Exception in thread", err) 1188 self.assertIn("Traceback (most recent call last):", err) 1189 self.assertIn("ZeroDivisionError", err) 1190 self.assertNotIn("Unhandled exception", err) 1191 1192 def test_print_exception_stderr_is_none_2(self): 1193 script = r"""if True: 1194 import sys 1195 import threading 1196 import time 1197 1198 running = False 1199 def run(): 1200 global running 1201 running = True 1202 while running: 1203 time.sleep(0.01) 1204 1/0 1205 sys.stderr = None 1206 t = threading.Thread(target=run) 1207 t.start() 1208 while not running: 1209 time.sleep(0.01) 1210 running = False 1211 t.join() 1212 """ 1213 rc, out, err = assert_python_ok("-c", script) 1214 self.assertEqual(out, b'') 1215 self.assertNotIn("Unhandled exception", err.decode()) 1216 1217 def test_bare_raise_in_brand_new_thread(self): 1218 def bare_raise(): 1219 raise 1220 1221 class Issue27558(threading.Thread): 1222 exc = None 1223 1224 def run(self): 1225 try: 1226 bare_raise() 1227 except Exception as exc: 1228 self.exc = exc 1229 1230 thread = Issue27558() 1231 thread.start() 1232 thread.join() 1233 self.assertIsNotNone(thread.exc) 1234 self.assertIsInstance(thread.exc, RuntimeError) 1235 # explicitly break the reference cycle to not leak a dangling thread 1236 thread.exc = None 1237 1238 1239class ThreadRunFail(threading.Thread): 1240 def run(self): 1241 raise ValueError("run failed") 1242 1243 1244class ExceptHookTests(BaseTestCase): 1245 def test_excepthook(self): 1246 with support.captured_output("stderr") as stderr: 1247 thread = ThreadRunFail(name="excepthook thread") 1248 thread.start() 1249 thread.join() 1250 1251 stderr = stderr.getvalue().strip() 1252 self.assertIn(f'Exception in thread {thread.name}:\n', stderr) 1253 self.assertIn('Traceback (most recent call last):\n', stderr) 1254 self.assertIn(' raise ValueError("run failed")', stderr) 1255 self.assertIn('ValueError: run failed', stderr) 1256 1257 @support.cpython_only 1258 def test_excepthook_thread_None(self): 1259 # threading.excepthook called with thread=None: log the thread 1260 # identifier in this case. 1261 with support.captured_output("stderr") as stderr: 1262 try: 1263 raise ValueError("bug") 1264 except Exception as exc: 1265 args = threading.ExceptHookArgs([*sys.exc_info(), None]) 1266 try: 1267 threading.excepthook(args) 1268 finally: 1269 # Explicitly break a reference cycle 1270 args = None 1271 1272 stderr = stderr.getvalue().strip() 1273 self.assertIn(f'Exception in thread {threading.get_ident()}:\n', stderr) 1274 self.assertIn('Traceback (most recent call last):\n', stderr) 1275 self.assertIn(' raise ValueError("bug")', stderr) 1276 self.assertIn('ValueError: bug', stderr) 1277 1278 def test_system_exit(self): 1279 class ThreadExit(threading.Thread): 1280 def run(self): 1281 sys.exit(1) 1282 1283 # threading.excepthook() silently ignores SystemExit 1284 with support.captured_output("stderr") as stderr: 1285 thread = ThreadExit() 1286 thread.start() 1287 thread.join() 1288 1289 self.assertEqual(stderr.getvalue(), '') 1290 1291 def test_custom_excepthook(self): 1292 args = None 1293 1294 def hook(hook_args): 1295 nonlocal args 1296 args = hook_args 1297 1298 try: 1299 with support.swap_attr(threading, 'excepthook', hook): 1300 thread = ThreadRunFail() 1301 thread.start() 1302 thread.join() 1303 1304 self.assertEqual(args.exc_type, ValueError) 1305 self.assertEqual(str(args.exc_value), 'run failed') 1306 self.assertEqual(args.exc_traceback, args.exc_value.__traceback__) 1307 self.assertIs(args.thread, thread) 1308 finally: 1309 # Break reference cycle 1310 args = None 1311 1312 def test_custom_excepthook_fail(self): 1313 def threading_hook(args): 1314 raise ValueError("threading_hook failed") 1315 1316 err_str = None 1317 1318 def sys_hook(exc_type, exc_value, exc_traceback): 1319 nonlocal err_str 1320 err_str = str(exc_value) 1321 1322 with support.swap_attr(threading, 'excepthook', threading_hook), \ 1323 support.swap_attr(sys, 'excepthook', sys_hook), \ 1324 support.captured_output('stderr') as stderr: 1325 thread = ThreadRunFail() 1326 thread.start() 1327 thread.join() 1328 1329 self.assertEqual(stderr.getvalue(), 1330 'Exception in threading.excepthook:\n') 1331 self.assertEqual(err_str, 'threading_hook failed') 1332 1333 1334class TimerTests(BaseTestCase): 1335 1336 def setUp(self): 1337 BaseTestCase.setUp(self) 1338 self.callback_args = [] 1339 self.callback_event = threading.Event() 1340 1341 def test_init_immutable_default_args(self): 1342 # Issue 17435: constructor defaults were mutable objects, they could be 1343 # mutated via the object attributes and affect other Timer objects. 1344 timer1 = threading.Timer(0.01, self._callback_spy) 1345 timer1.start() 1346 self.callback_event.wait() 1347 timer1.args.append("blah") 1348 timer1.kwargs["foo"] = "bar" 1349 self.callback_event.clear() 1350 timer2 = threading.Timer(0.01, self._callback_spy) 1351 timer2.start() 1352 self.callback_event.wait() 1353 self.assertEqual(len(self.callback_args), 2) 1354 self.assertEqual(self.callback_args, [((), {}), ((), {})]) 1355 timer1.join() 1356 timer2.join() 1357 1358 def _callback_spy(self, *args, **kwargs): 1359 self.callback_args.append((args[:], kwargs.copy())) 1360 self.callback_event.set() 1361 1362class LockTests(lock_tests.LockTests): 1363 locktype = staticmethod(threading.Lock) 1364 1365class PyRLockTests(lock_tests.RLockTests): 1366 locktype = staticmethod(threading._PyRLock) 1367 1368@unittest.skipIf(threading._CRLock is None, 'RLock not implemented in C') 1369class CRLockTests(lock_tests.RLockTests): 1370 locktype = staticmethod(threading._CRLock) 1371 1372class EventTests(lock_tests.EventTests): 1373 eventtype = staticmethod(threading.Event) 1374 1375class ConditionAsRLockTests(lock_tests.RLockTests): 1376 # Condition uses an RLock by default and exports its API. 1377 locktype = staticmethod(threading.Condition) 1378 1379class ConditionTests(lock_tests.ConditionTests): 1380 condtype = staticmethod(threading.Condition) 1381 1382class SemaphoreTests(lock_tests.SemaphoreTests): 1383 semtype = staticmethod(threading.Semaphore) 1384 1385class BoundedSemaphoreTests(lock_tests.BoundedSemaphoreTests): 1386 semtype = staticmethod(threading.BoundedSemaphore) 1387 1388class BarrierTests(lock_tests.BarrierTests): 1389 barriertype = staticmethod(threading.Barrier) 1390 1391 1392class MiscTestCase(unittest.TestCase): 1393 def test__all__(self): 1394 extra = {"ThreadError"} 1395 blacklist = {'currentThread', 'activeCount'} 1396 support.check__all__(self, threading, ('threading', '_thread'), 1397 extra=extra, blacklist=blacklist) 1398 1399 1400class InterruptMainTests(unittest.TestCase): 1401 def test_interrupt_main_subthread(self): 1402 # Calling start_new_thread with a function that executes interrupt_main 1403 # should raise KeyboardInterrupt upon completion. 1404 def call_interrupt(): 1405 _thread.interrupt_main() 1406 t = threading.Thread(target=call_interrupt) 1407 with self.assertRaises(KeyboardInterrupt): 1408 t.start() 1409 t.join() 1410 t.join() 1411 1412 def test_interrupt_main_mainthread(self): 1413 # Make sure that if interrupt_main is called in main thread that 1414 # KeyboardInterrupt is raised instantly. 1415 with self.assertRaises(KeyboardInterrupt): 1416 _thread.interrupt_main() 1417 1418 def test_interrupt_main_noerror(self): 1419 handler = signal.getsignal(signal.SIGINT) 1420 try: 1421 # No exception should arise. 1422 signal.signal(signal.SIGINT, signal.SIG_IGN) 1423 _thread.interrupt_main() 1424 1425 signal.signal(signal.SIGINT, signal.SIG_DFL) 1426 _thread.interrupt_main() 1427 finally: 1428 # Restore original handler 1429 signal.signal(signal.SIGINT, handler) 1430 1431 1432class AtexitTests(unittest.TestCase): 1433 1434 def test_atexit_output(self): 1435 rc, out, err = assert_python_ok("-c", """if True: 1436 import threading 1437 1438 def run_last(): 1439 print('parrot') 1440 1441 threading._register_atexit(run_last) 1442 """) 1443 1444 self.assertFalse(err) 1445 self.assertEqual(out.strip(), b'parrot') 1446 1447 def test_atexit_called_once(self): 1448 rc, out, err = assert_python_ok("-c", """if True: 1449 import threading 1450 from unittest.mock import Mock 1451 1452 mock = Mock() 1453 threading._register_atexit(mock) 1454 mock.assert_not_called() 1455 # force early shutdown to ensure it was called once 1456 threading._shutdown() 1457 mock.assert_called_once() 1458 """) 1459 1460 self.assertFalse(err) 1461 1462 def test_atexit_after_shutdown(self): 1463 # The only way to do this is by registering an atexit within 1464 # an atexit, which is intended to raise an exception. 1465 rc, out, err = assert_python_ok("-c", """if True: 1466 import threading 1467 1468 def func(): 1469 pass 1470 1471 def run_last(): 1472 threading._register_atexit(func) 1473 1474 threading._register_atexit(run_last) 1475 """) 1476 1477 self.assertTrue(err) 1478 self.assertIn("RuntimeError: can't register atexit after shutdown", 1479 err.decode()) 1480 1481 1482if __name__ == "__main__": 1483 unittest.main() 1484