1from test import test_support
2import unittest
3import dummy_threading as _threading
4import time
5
6class DummyThreadingTestCase(unittest.TestCase):
7
8    class TestThread(_threading.Thread):
9
10        def run(self):
11            global running
12            global sema
13            global mutex
14            # Uncomment if testing another module, such as the real 'threading'
15            # module.
16            #delay = random.random() * 2
17            delay = 0
18            if test_support.verbose:
19                print 'task', self.name, 'will run for', delay, 'sec'
20            sema.acquire()
21            mutex.acquire()
22            running += 1
23            if test_support.verbose:
24                print running, 'tasks are running'
25            mutex.release()
26            time.sleep(delay)
27            if test_support.verbose:
28                print 'task', self.name, 'done'
29            mutex.acquire()
30            running -= 1
31            if test_support.verbose:
32                print self.name, 'is finished.', running, 'tasks are running'
33            mutex.release()
34            sema.release()
35
36    def setUp(self):
37        self.numtasks = 10
38        global sema
39        sema = _threading.BoundedSemaphore(value=3)
40        global mutex
41        mutex = _threading.RLock()
42        global running
43        running = 0
44        self.threads = []
45
46    def test_tasks(self):
47        for i in range(self.numtasks):
48            t = self.TestThread(name="<thread %d>"%i)
49            self.threads.append(t)
50            t.start()
51
52        if test_support.verbose:
53            print 'waiting for all tasks to complete'
54        for t in self.threads:
55            t.join()
56        if test_support.verbose:
57            print 'all tasks done'
58
59def test_main():
60    test_support.run_unittest(DummyThreadingTestCase)
61
62if __name__ == '__main__':
63    test_main()
64