1"""This test checks for correct fork() behavior.
2"""
3
4import _imp as imp
5import os
6import signal
7import sys
8import time
9import unittest
10
11from test.fork_wait import ForkWait
12from test.support import (reap_children, get_attribute,
13                          import_module, verbose)
14
15threading = import_module('threading')
16
17# Skip test if fork does not exist.
18get_attribute(os, 'fork')
19
20class ForkTest(ForkWait):
21    def wait_impl(self, cpid):
22        deadline = time.monotonic() + 10.0
23        while time.monotonic() <= deadline:
24            # waitpid() shouldn't hang, but some of the buildbots seem to hang
25            # in the forking tests.  This is an attempt to fix the problem.
26            spid, status = os.waitpid(cpid, os.WNOHANG)
27            if spid == cpid:
28                break
29            time.sleep(0.1)
30
31        self.assertEqual(spid, cpid)
32        self.assertEqual(status, 0, "cause = %d, exit = %d" % (status&0xff, status>>8))
33
34    def test_threaded_import_lock_fork(self):
35        """Check fork() in main thread works while a subthread is doing an import"""
36        import_started = threading.Event()
37        fake_module_name = "fake test module"
38        partial_module = "partial"
39        complete_module = "complete"
40        def importer():
41            imp.acquire_lock()
42            sys.modules[fake_module_name] = partial_module
43            import_started.set()
44            time.sleep(0.01) # Give the other thread time to try and acquire.
45            sys.modules[fake_module_name] = complete_module
46            imp.release_lock()
47        t = threading.Thread(target=importer)
48        t.start()
49        import_started.wait()
50        pid = os.fork()
51        try:
52            # PyOS_BeforeFork should have waited for the import to complete
53            # before forking, so the child can recreate the import lock
54            # correctly, but also won't see a partially initialised module
55            if not pid:
56                m = __import__(fake_module_name)
57                if m == complete_module:
58                    os._exit(0)
59                else:
60                    if verbose > 1:
61                        print("Child encountered partial module")
62                    os._exit(1)
63            else:
64                t.join()
65                # Exitcode 1 means the child got a partial module (bad.) No
66                # exitcode (but a hang, which manifests as 'got pid 0')
67                # means the child deadlocked (also bad.)
68                self.wait_impl(pid)
69        finally:
70            try:
71                os.kill(pid, signal.SIGKILL)
72            except OSError:
73                pass
74
75
76    def test_nested_import_lock_fork(self):
77        """Check fork() in main thread works while the main thread is doing an import"""
78        # Issue 9573: this used to trigger RuntimeError in the child process
79        def fork_with_import_lock(level):
80            release = 0
81            in_child = False
82            try:
83                try:
84                    for i in range(level):
85                        imp.acquire_lock()
86                        release += 1
87                    pid = os.fork()
88                    in_child = not pid
89                finally:
90                    for i in range(release):
91                        imp.release_lock()
92            except RuntimeError:
93                if in_child:
94                    if verbose > 1:
95                        print("RuntimeError in child")
96                    os._exit(1)
97                raise
98            if in_child:
99                os._exit(0)
100            self.wait_impl(pid)
101
102        # Check this works with various levels of nested
103        # import in the main thread
104        for level in range(5):
105            fork_with_import_lock(level)
106
107
108def tearDownModule():
109    reap_children()
110
111if __name__ == "__main__":
112    unittest.main()
113