1"""This test case provides support for checking forking and wait behavior.
2
3To test different wait behavior, override the wait_impl method.
4
5We want fork1() semantics -- only the forking thread survives in the
6child after a fork().
7
8On some systems (e.g. Solaris without posix threads) we find that all
9active threads survive in the child after a fork(); this is an error.
10
11While BeOS doesn't officially support fork and native threading in
12the same application, the present example should work just fine.  DC
13"""
14
15import os, sys, time, unittest
16import test.support as support
17
18threading = support.import_module('threading')
19
20LONGSLEEP = 2
21SHORTSLEEP = 0.5
22NUM_THREADS = 4
23
24class ForkWait(unittest.TestCase):
25
26    def setUp(self):
27        self._threading_key = support.threading_setup()
28        self.alive = {}
29        self.stop = 0
30        self.threads = []
31
32    def tearDown(self):
33        # Stop threads
34        self.stop = 1
35        for thread in self.threads:
36            thread.join()
37        thread = None
38        del self.threads[:]
39        support.threading_cleanup(*self._threading_key)
40
41    def f(self, id):
42        while not self.stop:
43            self.alive[id] = os.getpid()
44            try:
45                time.sleep(SHORTSLEEP)
46            except IOError:
47                pass
48
49    def wait_impl(self, cpid):
50        for i in range(10):
51            # waitpid() shouldn't hang, but some of the buildbots seem to hang
52            # in the forking tests.  This is an attempt to fix the problem.
53            spid, status = os.waitpid(cpid, os.WNOHANG)
54            if spid == cpid:
55                break
56            time.sleep(2 * SHORTSLEEP)
57
58        self.assertEqual(spid, cpid)
59        self.assertEqual(status, 0, "cause = %d, exit = %d" % (status&0xff, status>>8))
60
61    def test_wait(self):
62        for i in range(NUM_THREADS):
63            thread = threading.Thread(target=self.f, args=(i,))
64            thread.start()
65            self.threads.append(thread)
66
67        time.sleep(LONGSLEEP)
68
69        a = self.alive.keys()
70        a.sort()
71        self.assertEqual(a, range(NUM_THREADS))
72
73        prefork_lives = self.alive.copy()
74
75        if sys.platform in ['unixware7']:
76            cpid = os.fork1()
77        else:
78            cpid = os.fork()
79
80        if cpid == 0:
81            # Child
82            time.sleep(LONGSLEEP)
83            n = 0
84            for key in self.alive:
85                if self.alive[key] != prefork_lives[key]:
86                    n += 1
87            os._exit(n)
88        else:
89            # Parent
90            self.wait_impl(cpid)
91