1# Very rudimentary test of threading module
2
3import test.test_support
4from test.test_support import verbose
5import random
6import re
7import sys
8thread = test.test_support.import_module('thread')
9threading = test.test_support.import_module('threading')
10import time
11import unittest
12import weakref
13import os
14import subprocess
15
16from test import lock_tests
17
18# A trivial mutable counter.
class Counter(object):
    """Minimal mutable integer cell.

    The counter does no locking of its own.
    """

    def __init__(self):
        # Every counter starts out at zero.
        self.value = 0

    def inc(self):
        """Add one to the stored value."""
        self.value = self.value + 1

    def dec(self):
        """Subtract one from the stored value."""
        self.value = self.value - 1

    def get(self):
        """Return the current value."""
        current = self.value
        return current
28
29class TestThread(threading.Thread):
30    def __init__(self, name, testcase, sema, mutex, nrunning):
31        threading.Thread.__init__(self, name=name)
32        self.testcase = testcase
33        self.sema = sema
34        self.mutex = mutex
35        self.nrunning = nrunning
36
37    def run(self):
38        delay = random.random() / 10000.0
39        if verbose:
40            print 'task %s will run for %.1f usec' % (
41                self.name, delay * 1e6)
42
43        with self.sema:
44            with self.mutex:
45                self.nrunning.inc()
46                if verbose:
47                    print self.nrunning.get(), 'tasks are running'
48                self.testcase.assertTrue(self.nrunning.get() <= 3)
49
50            time.sleep(delay)
51            if verbose:
52                print 'task', self.name, 'done'
53
54            with self.mutex:
55                self.nrunning.dec()
56                self.testcase.assertTrue(self.nrunning.get() >= 0)
57                if verbose:
58                    print '%s is finished. %d tasks are running' % (
59                        self.name, self.nrunning.get())
60
class BaseTestCase(unittest.TestCase):
    """Shared fixture for the threading tests.

    setUp() stores the result of test_support.threading_setup(); tearDown()
    passes it to threading_cleanup() and then calls reap_children().
    """

    def setUp(self):
        support = test.test_support
        self._threads = support.threading_setup()

    def tearDown(self):
        support = test.test_support
        support.threading_cleanup(*self._threads)
        support.reap_children()
68
69
70class ThreadTests(BaseTestCase):
71
72    # Create a bunch of threads, let each do some work, wait until all are
73    # done.
74    def test_various_ops(self):
75        # This takes about n/3 seconds to run (about n/3 clumps of tasks,
76        # times about 1 second per clump).
77        NUMTASKS = 10
78
79        # no more than 3 of the 10 can run at once
80        sema = threading.BoundedSemaphore(value=3)
81        mutex = threading.RLock()
82        numrunning = Counter()
83
84        threads = []
85
86        for i in range(NUMTASKS):
87            t = TestThread("<thread %d>"%i, self, sema, mutex, numrunning)
88            threads.append(t)
89            self.assertEqual(t.ident, None)
90            self.assertTrue(re.match('<TestThread\(.*, initial\)>', repr(t)))
91            t.start()
92
93        if verbose:
94            print 'waiting for all tasks to complete'
95        for t in threads:
96            t.join(NUMTASKS)
97            self.assertTrue(not t.is_alive())
98            self.assertNotEqual(t.ident, 0)
99            self.assertFalse(t.ident is None)
100            self.assertTrue(re.match('<TestThread\(.*, \w+ -?\d+\)>', repr(t)))
101        if verbose:
102            print 'all tasks done'
103        self.assertEqual(numrunning.get(), 0)
104
105    def test_ident_of_no_threading_threads(self):
106        # The ident still must work for the main thread and dummy threads.
107        self.assertFalse(threading.currentThread().ident is None)
108        def f():
109            ident.append(threading.currentThread().ident)
110            done.set()
111        done = threading.Event()
112        ident = []
113        thread.start_new_thread(f, ())
114        done.wait()
115        self.assertFalse(ident[0] is None)
116        # Kill the "immortal" _DummyThread
117        del threading._active[ident[0]]
118
119    # run with a small(ish) thread stack size (256kB)
120    def test_various_ops_small_stack(self):
121        if verbose:
122            print 'with 256kB thread stack size...'
123        try:
124            threading.stack_size(262144)
125        except thread.error:
126            if verbose:
127                print 'platform does not support changing thread stack size'
128            return
129        self.test_various_ops()
130        threading.stack_size(0)
131
132    # run with a large thread stack size (1MB)
133    def test_various_ops_large_stack(self):
134        if verbose:
135            print 'with 1MB thread stack size...'
136        try:
137            threading.stack_size(0x100000)
138        except thread.error:
139            if verbose:
140                print 'platform does not support changing thread stack size'
141            return
142        self.test_various_ops()
143        threading.stack_size(0)
144
145    def test_foreign_thread(self):
146        # Check that a "foreign" thread can use the threading module.
147        def f(mutex):
148            # Calling current_thread() forces an entry for the foreign
149            # thread to get made in the threading._active map.
150            threading.current_thread()
151            mutex.release()
152
153        mutex = threading.Lock()
154        mutex.acquire()
155        tid = thread.start_new_thread(f, (mutex,))
156        # Wait for the thread to finish.
157        mutex.acquire()
158        self.assertIn(tid, threading._active)
159        self.assertIsInstance(threading._active[tid], threading._DummyThread)
160        del threading._active[tid]
161
162    # PyThreadState_SetAsyncExc() is a CPython-only gimmick, not (currently)
163    # exposed at the Python level.  This test relies on ctypes to get at it.
    def test_PyThreadState_SetAsyncExc(self):
        # Calls PyThreadState_SetAsyncExc through ctypes three times: on the
        # calling thread, on a thread id of -1 (expects 0 thread states
        # modified), and on a running worker thread (expects 1).
        try:
            import ctypes
        except ImportError:
            if verbose:
                print "test_PyThreadState_SetAsyncExc can't import ctypes"
            return  # can't do anything

        set_async_exc = ctypes.pythonapi.PyThreadState_SetAsyncExc

        # Private exception type, so nothing else can raise it by accident.
        class AsyncExc(Exception):
            pass

        exception = ctypes.py_object(AsyncExc)

        # First check it works when setting the exception from the same thread.
        tid = thread.get_ident()

        try:
            result = set_async_exc(ctypes.c_long(tid), exception)
            # The exception is async, so we might have to keep the VM busy until
            # it notices.
            while True:
                pass
        except AsyncExc:
            pass
        else:
            # This code is unreachable but it reflects the intent. If we wanted
            # to be smarter the above loop wouldn't be infinite.
            self.fail("AsyncExc not raised")
        try:
            self.assertEqual(result, 1) # one thread state modified
        except UnboundLocalError:
            # The exception was raised too quickly for us to get the result.
            pass

        # `worker_started` is set by the thread when it's inside a try/except
        # block waiting to catch the asynchronously set AsyncExc exception.
        # `worker_saw_exception` is set by the thread upon catching that
        # exception.
        worker_started = threading.Event()
        worker_saw_exception = threading.Event()

        class Worker(threading.Thread):
            def run(self):
                # Publish this thread's id so the test can target it, and
                # start out as "not finished".
                self.id = thread.get_ident()
                self.finished = False

                try:
                    # Loop until the asynchronous exception arrives.
                    while True:
                        worker_started.set()
                        time.sleep(0.1)
                except AsyncExc:
                    self.finished = True
                    worker_saw_exception.set()

        t = Worker()
        t.daemon = True # so if this fails, we don't hang Python at shutdown
        t.start()
        if verbose:
            print "    started worker thread"

        # Try a thread id that doesn't make sense.
        if verbose:
            print "    trying nonsensical thread id"
        result = set_async_exc(ctypes.c_long(-1), exception)
        self.assertEqual(result, 0)  # no thread states modified

        # Now raise an exception in the worker thread.
        if verbose:
            print "    waiting for worker thread to get started"
        # No timeout here: this blocks until the worker sets the event.
        ret = worker_started.wait()
        self.assertTrue(ret)
        if verbose:
            print "    verifying worker hasn't exited"
        self.assertTrue(not t.finished)
        if verbose:
            print "    attempting to raise asynch exception in worker"
        result = set_async_exc(ctypes.c_long(t.id), exception)
        self.assertEqual(result, 1) # one thread state modified
        if verbose:
            print "    waiting for worker to say it caught the exception"
        # Bounded wait; the assertion below fails if the worker never
        # reported the exception within the timeout.
        worker_saw_exception.wait(timeout=10)
        self.assertTrue(t.finished)
        if verbose:
            print "    all OK -- joining worker"
        if t.finished:
            t.join()
        # else the thread is still running, and we have no way to kill it
253
254    def test_limbo_cleanup(self):
255        # Issue 7481: Failure to start thread should cleanup the limbo map.
256        def fail_new_thread(*args):
257            raise thread.error()
258        _start_new_thread = threading._start_new_thread
259        threading._start_new_thread = fail_new_thread
260        try:
261            t = threading.Thread(target=lambda: None)
262            self.assertRaises(thread.error, t.start)
263            self.assertFalse(
264                t in threading._limbo,
265                "Failed to cleanup _limbo map on failure of Thread.start().")
266        finally:
267            threading._start_new_thread = _start_new_thread
268
269    def test_finalize_runnning_thread(self):
270        # Issue 1402: the PyGILState_Ensure / _Release functions may be called
271        # very late on python exit: on deallocation of a running thread for
272        # example.
273        try:
274            import ctypes
275        except ImportError:
276            if verbose:
277                print("test_finalize_with_runnning_thread can't import ctypes")
278            return  # can't do anything
279
280        rc = subprocess.call([sys.executable, "-c", """if 1:
281            import ctypes, sys, time, thread
282
283            # This lock is used as a simple event variable.
284            ready = thread.allocate_lock()
285            ready.acquire()
286
287            # Module globals are cleared before __del__ is run
288            # So we save the functions in class dict
289            class C:
290                ensure = ctypes.pythonapi.PyGILState_Ensure
291                release = ctypes.pythonapi.PyGILState_Release
292                def __del__(self):
293                    state = self.ensure()
294                    self.release(state)
295
296            def waitingThread():
297                x = C()
298                ready.release()
299                time.sleep(100)
300
301            thread.start_new_thread(waitingThread, ())
302            ready.acquire()  # Be sure the other thread is waiting.
303            sys.exit(42)
304            """])
305        self.assertEqual(rc, 42)
306
307    def test_finalize_with_trace(self):
308        # Issue1733757
309        # Avoid a deadlock when sys.settrace steps into threading._shutdown
310        p = subprocess.Popen([sys.executable, "-c", """if 1:
311            import sys, threading
312
313            # A deadlock-killer, to prevent the
314            # testsuite to hang forever
315            def killer():
316                import os, time
317                time.sleep(2)
318                print 'program blocked; aborting'
319                os._exit(2)
320            t = threading.Thread(target=killer)
321            t.daemon = True
322            t.start()
323
324            # This is the trace function
325            def func(frame, event, arg):
326                threading.current_thread()
327                return func
328
329            sys.settrace(func)
330            """],
331            stdout=subprocess.PIPE,
332            stderr=subprocess.PIPE)
333        self.addCleanup(p.stdout.close)
334        self.addCleanup(p.stderr.close)
335        stdout, stderr = p.communicate()
336        rc = p.returncode
337        self.assertFalse(rc == 2, "interpreted was blocked")
338        self.assertTrue(rc == 0,
339                        "Unexpected error: " + repr(stderr))
340
341    def test_join_nondaemon_on_shutdown(self):
342        # Issue 1722344
343        # Raising SystemExit skipped threading._shutdown
344        p = subprocess.Popen([sys.executable, "-c", """if 1:
345                import threading
346                from time import sleep
347
348                def child():
349                    sleep(1)
350                    # As a non-daemon thread we SHOULD wake up and nothing
351                    # should be torn down yet
352                    print "Woke up, sleep function is:", sleep
353
354                threading.Thread(target=child).start()
355                raise SystemExit
356            """],
357            stdout=subprocess.PIPE,
358            stderr=subprocess.PIPE)
359        self.addCleanup(p.stdout.close)
360        self.addCleanup(p.stderr.close)
361        stdout, stderr = p.communicate()
362        self.assertEqual(stdout.strip(),
363            "Woke up, sleep function is: <built-in function sleep>")
364        stderr = re.sub(r"^\[\d+ refs\]", "", stderr, re.MULTILINE).strip()
365        self.assertEqual(stderr, "")
366
367    def test_enumerate_after_join(self):
368        # Try hard to trigger #1703448: a thread is still returned in
369        # threading.enumerate() after it has been join()ed.
370        enum = threading.enumerate
371        old_interval = sys.getcheckinterval()
372        try:
373            for i in xrange(1, 100):
374                # Try a couple times at each thread-switching interval
375                # to get more interleavings.
376                sys.setcheckinterval(i // 5)
377                t = threading.Thread(target=lambda: None)
378                t.start()
379                t.join()
380                l = enum()
381                self.assertNotIn(t, l,
382                    "#1703448 triggered after %d trials: %s" % (i, l))
383        finally:
384            sys.setcheckinterval(old_interval)
385
386    def test_no_refcycle_through_target(self):
387        class RunSelfFunction(object):
388            def __init__(self, should_raise):
389                # The links in this refcycle from Thread back to self
390                # should be cleaned up when the thread completes.
391                self.should_raise = should_raise
392                self.thread = threading.Thread(target=self._run,
393                                               args=(self,),
394                                               kwargs={'yet_another':self})
395                self.thread.start()
396
397            def _run(self, other_ref, yet_another):
398                if self.should_raise:
399                    raise SystemExit
400
401        cyclic_object = RunSelfFunction(should_raise=False)
402        weak_cyclic_object = weakref.ref(cyclic_object)
403        cyclic_object.thread.join()
404        del cyclic_object
405        self.assertEqual(None, weak_cyclic_object(),
406                         msg=('%d references still around' %
407                              sys.getrefcount(weak_cyclic_object())))
408
409        raising_cyclic_object = RunSelfFunction(should_raise=True)
410        weak_raising_cyclic_object = weakref.ref(raising_cyclic_object)
411        raising_cyclic_object.thread.join()
412        del raising_cyclic_object
413        self.assertEqual(None, weak_raising_cyclic_object(),
414                         msg=('%d references still around' %
415                              sys.getrefcount(weak_raising_cyclic_object())))
416
417
418class ThreadJoinOnShutdown(BaseTestCase):
419
420    def _run_and_join(self, script):
421        script = """if 1:
422            import sys, os, time, threading
423
424            # a thread, which waits for the main program to terminate
425            def joiningfunc(mainthread):
426                mainthread.join()
427                print 'end of thread'
428        \n""" + script
429
430        p = subprocess.Popen([sys.executable, "-c", script], stdout=subprocess.PIPE)
431        rc = p.wait()
432        data = p.stdout.read().replace('\r', '')
433        p.stdout.close()
434        self.assertEqual(data, "end of main\nend of thread\n")
435        self.assertFalse(rc == 2, "interpreter was blocked")
436        self.assertTrue(rc == 0, "Unexpected error")
437
438    def test_1_join_on_shutdown(self):
439        # The usual case: on exit, wait for a non-daemon thread
440        script = """if 1:
441            import os
442            t = threading.Thread(target=joiningfunc,
443                                 args=(threading.current_thread(),))
444            t.start()
445            time.sleep(0.1)
446            print 'end of main'
447            """
448        self._run_and_join(script)
449
450
451    def test_2_join_in_forked_process(self):
452        # Like the test above, but from a forked interpreter
453        import os
454        if not hasattr(os, 'fork'):
455            return
456        script = """if 1:
457            childpid = os.fork()
458            if childpid != 0:
459                os.waitpid(childpid, 0)
460                sys.exit(0)
461
462            t = threading.Thread(target=joiningfunc,
463                                 args=(threading.current_thread(),))
464            t.start()
465            print 'end of main'
466            """
467        self._run_and_join(script)
468
469    def test_3_join_in_forked_from_thread(self):
470        # Like the test above, but fork() was called from a worker thread
471        # In the forked process, the main Thread object must be marked as stopped.
472        import os
473        if not hasattr(os, 'fork'):
474            return
475        # Skip platforms with known problems forking from a worker thread.
476        # See http://bugs.python.org/issue3863.
477        if sys.platform in ('freebsd4', 'freebsd5', 'freebsd6', 'netbsd5',
478                           'os2emx'):
479            print >>sys.stderr, ('Skipping test_3_join_in_forked_from_thread'
480                                 ' due to known OS bugs on'), sys.platform
481            return
482        script = """if 1:
483            main_thread = threading.current_thread()
484            def worker():
485                childpid = os.fork()
486                if childpid != 0:
487                    os.waitpid(childpid, 0)
488                    sys.exit(0)
489
490                t = threading.Thread(target=joiningfunc,
491                                     args=(main_thread,))
492                print 'end of main'
493                t.start()
494                t.join() # Should not block: main_thread is already stopped
495
496            w = threading.Thread(target=worker)
497            w.start()
498            """
499        self._run_and_join(script)
500
501    def assertScriptHasOutput(self, script, expected_output):
502        p = subprocess.Popen([sys.executable, "-c", script],
503                             stdout=subprocess.PIPE)
504        rc = p.wait()
505        data = p.stdout.read().decode().replace('\r', '')
506        self.assertEqual(rc, 0, "Unexpected error")
507        self.assertEqual(data, expected_output)
508
    @unittest.skipUnless(hasattr(os, 'fork'), "needs os.fork()")
    def test_4_joining_across_fork_in_worker_thread(self):
        # There used to be a possible deadlock when forking from a child
        # thread.  See http://bugs.python.org/issue6643.
        #
        # The script below runs in a child interpreter; the test passes when
        # it prints "end of main" and exits with status 0.

        # Skip platforms with known problems forking from a worker thread.
        # See http://bugs.python.org/issue3863.
        if sys.platform in ('freebsd4', 'freebsd5', 'freebsd6', 'os2emx'):
            raise unittest.SkipTest('due to known OS bugs on ' + sys.platform)

        # The script takes the following steps:
        # - The main thread in the parent process starts a new thread and then
        #   tries to join it.
        # - The join operation acquires the Lock inside the thread's _block
        #   Condition.  (See threading.py:Thread.join().)
        # - We stub out the acquire method on the condition to force it to wait
        #   until the child thread forks.  (See LOCK ACQUIRED HERE)
        # - The child thread forks.  (See LOCK HELD and WORKER THREAD FORKS
        #   HERE)
        # - The main thread of the parent process enters Condition.wait(),
        #   which releases the lock on the child thread.
        # - The child process returns.  Without the necessary fix, when the
        #   main thread of the child process (which used to be the child thread
        #   in the parent process) attempts to exit, it will try to acquire the
        #   lock in the Thread._block Condition object and hang, because the
        #   lock was held across the fork.

        script = """if 1:
            import os, time, threading

            finish_join = False
            start_fork = False

            def worker():
                # Wait until this thread's lock is acquired before forking to
                # create the deadlock.
                global finish_join
                while not start_fork:
                    time.sleep(0.01)
                # LOCK HELD: Main thread holds lock across this call.
                childpid = os.fork()
                finish_join = True
                if childpid != 0:
                    # Parent process just waits for child.
                    os.waitpid(childpid, 0)
                # Child process should just return.

            w = threading.Thread(target=worker)

            # Stub out the private condition variable's lock acquire method.
            # This acquires the lock and then waits until the child has forked
            # before returning, which will release the lock soon after.  If
            # someone else tries to fix this test case by acquiring this lock
            # before forking instead of resetting it, the test case will
            # deadlock when it shouldn't.
            condition = w._block
            orig_acquire = condition.acquire
            call_count_lock = threading.Lock()
            call_count = 0
            def my_acquire():
                global call_count
                global start_fork
                orig_acquire()  # LOCK ACQUIRED HERE
                start_fork = True
                if call_count == 0:
                    while not finish_join:
                        time.sleep(0.01)  # WORKER THREAD FORKS HERE
                with call_count_lock:
                    call_count += 1
            condition.acquire = my_acquire

            w.start()
            w.join()
            print('end of main')
            """
        # Expected stdout is a single "end of main" line.
        self.assertScriptHasOutput(script, "end of main\n")
585
    @unittest.skipUnless(hasattr(os, 'fork'), "needs os.fork()")
    def test_5_clear_waiter_locks_to_avoid_crash(self):
        # Check that a spawned thread that forks doesn't segfault on certain
        # platforms, namely OS X.  This used to happen if there was a waiter
        # lock in the thread's condition variable's waiters list.  Even though
        # we know the lock will be held across the fork, it is not safe to
        # release locks held across forks on all platforms, so releasing the
        # waiter lock caused a segfault on OS X.  Furthermore, since locks on
        # OS X are (as of this writing) implemented with a mutex + condition
        # variable instead of a semaphore, while we know that the Python-level
        # lock will be acquired, we can't know if the internal mutex will be
        # acquired at the time of the fork.
        #
        # The script below runs in a child interpreter; the test passes when
        # it prints the two expected lines and exits with status 0.

        # Skip platforms with known problems forking from a worker thread.
        # See http://bugs.python.org/issue3863.
        if sys.platform in ('freebsd4', 'freebsd5', 'freebsd6', 'os2emx'):
            raise unittest.SkipTest('due to known OS bugs on ' + sys.platform)
        script = """if True:
            import os, time, threading

            start_fork = False

            def worker():
                # Wait until the main thread has attempted to join this thread
                # before continuing.
                while not start_fork:
                    time.sleep(0.01)
                childpid = os.fork()
                if childpid != 0:
                    # Parent process just waits for child.
                    (cpid, rc) = os.waitpid(childpid, 0)
                    assert cpid == childpid
                    assert rc == 0
                    print('end of worker thread')
                else:
                    # Child process should just return.
                    pass

            w = threading.Thread(target=worker)

            # Stub out the private condition variable's _release_save method.
            # This releases the condition's lock and flips the global that
            # causes the worker to fork.  At this point, the problematic waiter
            # lock has been acquired once by the waiter and has been put onto
            # the waiters list.
            condition = w._block
            orig_release_save = condition._release_save
            def my_release_save():
                global start_fork
                orig_release_save()
                # Waiter lock held here, condition lock released.
                start_fork = True
            condition._release_save = my_release_save

            w.start()
            w.join()
            print('end of main thread')
            """
        # The worker's line must come before the main thread's line.
        output = "end of worker thread\nend of main thread\n"
        self.assertScriptHasOutput(script, output)
646
647
class ThreadingExceptionTests(BaseTestCase):
    """Thread operations that are expected to raise RuntimeError."""

    # A RuntimeError should be raised if Thread.start() is called
    # multiple times.
    def test_start_thread_again(self):
        started = threading.Thread()
        started.start()
        self.assertRaises(RuntimeError, started.start)

    # Joining the thread that is currently running.
    def test_joining_current_thread(self):
        me = threading.current_thread()
        self.assertRaises(RuntimeError, me.join)

    # Joining a thread that was never started.
    def test_joining_inactive_thread(self):
        never_started = threading.Thread()
        self.assertRaises(RuntimeError, never_started.join)

    # Setting the daemon flag after start() has been called.
    def test_daemonize_active_thread(self):
        running = threading.Thread()
        running.start()
        self.assertRaises(RuntimeError, setattr, running, "daemon", True)
668
669
# The classes below subclass the suites in lock_tests and set the factory
# attribute each one declares (locktype, eventtype, condtype, semtype) to
# the corresponding threading object.

class LockTests(lock_tests.LockTests):
    # Factory: threading.Lock
    locktype = staticmethod(threading.Lock)

class RLockTests(lock_tests.RLockTests):
    # Factory: threading.RLock
    locktype = staticmethod(threading.RLock)

class EventTests(lock_tests.EventTests):
    # Factory: threading.Event
    eventtype = staticmethod(threading.Event)

class ConditionAsRLockTests(lock_tests.RLockTests):
    # A Condition uses an RLock by default and exports its API.
    locktype = staticmethod(threading.Condition)

class ConditionTests(lock_tests.ConditionTests):
    # Factory: threading.Condition
    condtype = staticmethod(threading.Condition)

class SemaphoreTests(lock_tests.SemaphoreTests):
    # Factory: threading.Semaphore
    semtype = staticmethod(threading.Semaphore)
688
class BoundedSemaphoreTests(lock_tests.BoundedSemaphoreTests):
    # Factory: threading.BoundedSemaphore
    semtype = staticmethod(threading.BoundedSemaphore)

    # NOTE(review): this test is about recursion depth inside a thread, not
    # about semaphores -- confirm it is meant to live in this class.
    @unittest.skipUnless(sys.platform == 'darwin', 'test macosx problem')
    def test_recursion_limit(self):
        # Issue 9670
        # test that excessive recursion within a non-main thread causes
        # an exception rather than crashing the interpreter on platforms
        # like Mac OS X or FreeBSD which have small default stack sizes
        # for threads
        script = """if True:
            import threading

            def recurse():
                return recurse()

            def outer():
                try:
                    recurse()
                except RuntimeError:
                    pass

            w = threading.Thread(target=outer)
            w.start()
            w.join()
            print('end of main thread')
            """
        command = [sys.executable, "-c", script]
        child = subprocess.Popen(command, stdout=subprocess.PIPE)
        # stderr is not piped, so the second item of the pair is unused.
        out, _ = child.communicate()
        text = out.decode().replace('\r', '')
        self.assertEqual(child.returncode, 0, "Unexpected error")
        self.assertEqual(text, "end of main thread\n")
723
def test_main():
    """Pass this module's test classes to test_support.run_unittest()."""
    suites = (
        LockTests, RLockTests, EventTests,
        ConditionAsRLockTests, ConditionTests,
        SemaphoreTests, BoundedSemaphoreTests,
        ThreadTests,
        ThreadJoinOnShutdown,
        ThreadingExceptionTests,
    )
    test.test_support.run_unittest(*suites)
732
733if __name__ == "__main__":
734    test_main()
735