1import os 2import unittest 3import random 4from test import test_support 5thread = test_support.import_module('thread') 6import time 7import sys 8import weakref 9 10from test import lock_tests 11 12NUMTASKS = 10 13NUMTRIPS = 3 14 15 16_print_mutex = thread.allocate_lock() 17 18def verbose_print(arg): 19 """Helper function for printing out debugging output.""" 20 if test_support.verbose: 21 with _print_mutex: 22 print arg 23 24 25class BasicThreadTest(unittest.TestCase): 26 27 def setUp(self): 28 self.done_mutex = thread.allocate_lock() 29 self.done_mutex.acquire() 30 self.running_mutex = thread.allocate_lock() 31 self.random_mutex = thread.allocate_lock() 32 self.created = 0 33 self.running = 0 34 self.next_ident = 0 35 36 37class ThreadRunningTests(BasicThreadTest): 38 39 def newtask(self): 40 with self.running_mutex: 41 self.next_ident += 1 42 verbose_print("creating task %s" % self.next_ident) 43 thread.start_new_thread(self.task, (self.next_ident,)) 44 self.created += 1 45 self.running += 1 46 47 def task(self, ident): 48 with self.random_mutex: 49 delay = random.random() / 10000.0 50 verbose_print("task %s will run for %sus" % (ident, round(delay*1e6))) 51 time.sleep(delay) 52 verbose_print("task %s done" % ident) 53 with self.running_mutex: 54 self.running -= 1 55 if self.created == NUMTASKS and self.running == 0: 56 self.done_mutex.release() 57 58 def test_starting_threads(self): 59 # Basic test for thread creation. 60 for i in range(NUMTASKS): 61 self.newtask() 62 verbose_print("waiting for tasks to complete...") 63 self.done_mutex.acquire() 64 verbose_print("all tasks done") 65 66 def test_stack_size(self): 67 # Various stack size tests. 68 self.assertEqual(thread.stack_size(), 0, "initial stack size is not 0") 69 70 thread.stack_size(0) 71 self.assertEqual(thread.stack_size(), 0, "stack_size not reset to default") 72 73 if os.name not in ("nt", "os2", "posix"): 74 return 75 76 tss_supported = True 77 try: 78 thread.stack_size(4096) 79 except ValueError: 80 verbose_print("caught expected ValueError setting " 81 "stack_size(4096)") 82 except thread.error: 83 tss_supported = False 84 verbose_print("platform does not support changing thread stack " 85 "size") 86 87 if tss_supported: 88 fail_msg = "stack_size(%d) failed - should succeed" 89 for tss in (262144, 0x100000, 0): 90 thread.stack_size(tss) 91 self.assertEqual(thread.stack_size(), tss, fail_msg % tss) 92 verbose_print("successfully set stack_size(%d)" % tss) 93 94 for tss in (262144, 0x100000): 95 verbose_print("trying stack_size = (%d)" % tss) 96 self.next_ident = 0 97 self.created = 0 98 for i in range(NUMTASKS): 99 self.newtask() 100 101 verbose_print("waiting for all tasks to complete") 102 self.done_mutex.acquire() 103 verbose_print("all tasks done") 104 105 thread.stack_size(0) 106 107 def test__count(self): 108 # Test the _count() function. 109 orig = thread._count() 110 mut = thread.allocate_lock() 111 mut.acquire() 112 started = [] 113 def task(): 114 started.append(None) 115 mut.acquire() 116 mut.release() 117 thread.start_new_thread(task, ()) 118 while not started: 119 time.sleep(0.01) 120 self.assertEqual(thread._count(), orig + 1) 121 # Allow the task to finish. 122 mut.release() 123 # The only reliable way to be sure that the thread ended from the 124 # interpreter's point of view is to wait for the function object to be 125 # destroyed. 126 done = [] 127 wr = weakref.ref(task, lambda _: done.append(None)) 128 del task 129 while not done: 130 time.sleep(0.01) 131 self.assertEqual(thread._count(), orig) 132 133 134class Barrier: 135 def __init__(self, num_threads): 136 self.num_threads = num_threads 137 self.waiting = 0 138 self.checkin_mutex = thread.allocate_lock() 139 self.checkout_mutex = thread.allocate_lock() 140 self.checkout_mutex.acquire() 141 142 def enter(self): 143 self.checkin_mutex.acquire() 144 self.waiting = self.waiting + 1 145 if self.waiting == self.num_threads: 146 self.waiting = self.num_threads - 1 147 self.checkout_mutex.release() 148 return 149 self.checkin_mutex.release() 150 151 self.checkout_mutex.acquire() 152 self.waiting = self.waiting - 1 153 if self.waiting == 0: 154 self.checkin_mutex.release() 155 return 156 self.checkout_mutex.release() 157 158 159class BarrierTest(BasicThreadTest): 160 161 def test_barrier(self): 162 self.bar = Barrier(NUMTASKS) 163 self.running = NUMTASKS 164 for i in range(NUMTASKS): 165 thread.start_new_thread(self.task2, (i,)) 166 verbose_print("waiting for tasks to end") 167 self.done_mutex.acquire() 168 verbose_print("tasks done") 169 170 def task2(self, ident): 171 for i in range(NUMTRIPS): 172 if ident == 0: 173 # give it a good chance to enter the next 174 # barrier before the others are all out 175 # of the current one 176 delay = 0 177 else: 178 with self.random_mutex: 179 delay = random.random() / 10000.0 180 verbose_print("task %s will run for %sus" % 181 (ident, round(delay * 1e6))) 182 time.sleep(delay) 183 verbose_print("task %s entering %s" % (ident, i)) 184 self.bar.enter() 185 verbose_print("task %s leaving barrier" % ident) 186 with self.running_mutex: 187 self.running -= 1 188 # Must release mutex before releasing done, else the main thread can 189 # exit and set mutex to None as part of global teardown; then 190 # mutex.release() raises AttributeError. 191 finished = self.running == 0 192 if finished: 193 self.done_mutex.release() 194 195 196class LockTests(lock_tests.LockTests): 197 locktype = thread.allocate_lock 198 199 200class TestForkInThread(unittest.TestCase): 201 def setUp(self): 202 self.read_fd, self.write_fd = os.pipe() 203 204 @unittest.skipIf(sys.platform.startswith('win'), 205 "This test is only appropriate for POSIX-like systems.") 206 @test_support.reap_threads 207 def test_forkinthread(self): 208 def thread1(): 209 try: 210 pid = os.fork() # fork in a thread 211 except RuntimeError: 212 sys.exit(0) # exit the child 213 214 if pid == 0: # child 215 os.close(self.read_fd) 216 os.write(self.write_fd, "OK") 217 sys.exit(0) 218 else: # parent 219 os.close(self.write_fd) 220 221 thread.start_new_thread(thread1, ()) 222 self.assertEqual(os.read(self.read_fd, 2), "OK", 223 "Unable to fork() in thread") 224 225 def tearDown(self): 226 try: 227 os.close(self.read_fd) 228 except OSError: 229 pass 230 231 try: 232 os.close(self.write_fd) 233 except OSError: 234 pass 235 236 237def test_main(): 238 test_support.run_unittest(ThreadRunningTests, BarrierTest, LockTests, 239 TestForkInThread) 240 241if __name__ == "__main__": 242 test_main() 243