1import faulthandler
2import json
3import os
4import queue
5import sys
6import threading
7import time
8import traceback
9import types
10from test import support
11
12from test.libregrtest.runtest import (
13    runtest, INTERRUPTED, CHILD_ERROR, PROGRESS_MIN_TIME,
14    format_test_result)
15from test.libregrtest.setup import setup_tests
16from test.libregrtest.utils import format_duration
17
18
# Display the running tests if nothing has happened in the last N seconds
20PROGRESS_UPDATE = 30.0   # seconds
21
22# If interrupted, display the wait progress every N seconds
23WAIT_PROGRESS = 2.0   # seconds
24
25
def run_test_in_subprocess(testname, ns):
    """Run one test in a child regrtest process started with --worker-args.

    ns is the option Namespace parsed from the command line. Its attributes
    and the test name are serialized to JSON and handed to the child through
    the --worker-args option. Once the child has exited, the 3-tuple
    (return code, stdout, stderr) is returned.
    """
    from subprocess import Popen, PIPE

    # The child rebuilds the namespace and the test name from this JSON text.
    worker_args = json.dumps((vars(ns), testname))

    cmd = [sys.executable]
    cmd.extend(support.args_from_interpreter_flags())
    cmd.extend(['-u',    # Unbuffered stdout and stderr
                '-m', 'test.regrtest',
                '--worker-args', worker_args])
    if ns.pgo:
        cmd.append('--pgo')

    # Running the child from the same working directory as regrtest's original
    # invocation ensures that TEMPDIR for the child is the same when
    # sysconfig.is_python_build() is true. See issue 15300.
    close_fds = (os.name != 'nt')
    with Popen(cmd,
               stdout=PIPE, stderr=PIPE,
               universal_newlines=True,
               close_fds=close_fds,
               cwd=support.SAVEDCWD) as child:
        stdout, stderr = child.communicate()
        retcode = child.wait()
    return retcode, stdout, stderr
59
60
def run_tests_worker(worker_args):
    """Run a single test in a worker process and print its result.

    worker_args is a JSON document holding the pair (ns_dict, testname),
    where ns_dict contains the attributes of the option namespace. The
    namespace is rebuilt, the tests are set up, and the test is run.

    The result is always a 3-item sequence. It is written to stdout as
    JSON on a line of its own, after which the process exits with status 0.
    """
    ns_dict, testname = json.loads(worker_args)
    ns = types.SimpleNamespace(**ns_dict)

    setup_tests(ns)

    try:
        result = runtest(ns, testname)
    except KeyboardInterrupt:
        result = INTERRUPTED, '', None
    except BaseException as e:
        traceback.print_exc()
        # Use the same 3-item shape as the other results: the parent process
        # unpacks every result into three values, so a 2-item result would
        # break it instead of being reported as a child error.
        result = CHILD_ERROR, str(e), None

    print()   # Force a newline (just in case)
    print(json.dumps(result), flush=True)
    sys.exit(0)
78
79
80# We do not use a generator so multiple threads can call next().
class MultiprocessIterator:

    """Thread-safe iterator handing out tests in multiprocess mode."""

    def __init__(self, tests):
        self.tests = tests
        self.lock = threading.Lock()
        # Set to True to make every following next() call stop the iteration.
        self.interrupted = False

    def __iter__(self):
        return self

    def __next__(self):
        # Hold the lock for the whole step so that only one thread at a time
        # advances the underlying iterator.
        with self.lock:
            if not self.interrupted:
                return next(self.tests)
            raise StopIteration('tests interrupted')
98
99
class MultiprocessThread(threading.Thread):
    """Thread running pending tests one at a time in child processes.

    Each finished test is reported by putting a 4-tuple
    (test, stdout, stderr, result) into the output queue. When the thread
    stops, it puts (None, None, None, None) into the queue instead.
    """

    def __init__(self, pending, output, ns):
        super().__init__()
        self.pending = pending
        self.output = output
        self.ns = ns
        # Name of the test currently running in the child process, if any,
        # and the time.monotonic() value taken when it was started.
        self.current_test = None
        self.start_time = None

    @staticmethod
    def _parse_worker_stdout(stdout):
        """Split the stdout of a worker into test output and test result.

        The worker writes the JSON-encoded result as the last line of its
        stdout. Return a 3-tuple (stdout, result, err_msg): on success,
        stdout is the output preceding the result line, result is the decoded
        3-item result and err_msg is None. On failure, stdout is the whole
        stripped output, result is None and err_msg describes the problem.
        """
        text = stdout.strip()
        head, _, last_line = text.rpartition("\n")
        if not last_line:
            return text, None, "Failed to parse worker stdout"
        try:
            result = json.loads(last_line)
        except ValueError as exc:
            return text, None, "Failed to parse worker JSON: %s" % exc
        if not isinstance(result, list) or len(result) != 3:
            return text, None, f"Invalid result tuple: {result!r}"
        return head.rstrip(), result, None

    def _runtest(self):
        """Run the next pending test; return True if the thread must stop."""
        try:
            test = next(self.pending)
        except StopIteration:
            self.output.put((None, None, None, None))
            return True

        try:
            self.start_time = time.monotonic()
            self.current_test = test

            retcode, stdout, stderr = run_test_in_subprocess(test, self.ns)
        finally:
            self.current_test = None

        if retcode != 0:
            stdout = stdout.rstrip()
            err_msg = "Exit code %s" % retcode
        else:
            stdout, result, err_msg = self._parse_worker_stdout(stdout)

        if err_msg is not None:
            # Report the failure for this test rather than dropping the test
            # silently or crashing the thread on unexpected worker output.
            result = (CHILD_ERROR, err_msg, None)

        self.output.put((test, stdout, stderr.rstrip(), result))
        return False

    def run(self):
        try:
            stop = False
            while not stop:
                stop = self._runtest()
        except BaseException:
            # Tell the consumer of the queue that this thread is done, even
            # if it dies because of an unexpected error.
            self.output.put((None, None, None, None))
            raise
149
150
def run_tests_multiprocess(regrtest):
    """Run the tests of regrtest in parallel using worker threads.

    regrtest.ns.use_mp threads are started; each of them runs tests in child
    processes and reports the results through a queue. The results are
    accumulated in regrtest and the progress is displayed as they arrive.
    If the run is interrupted, no new test is handed out and the function
    waits until the tests still running are complete.
    """
    ns = regrtest.ns
    results = queue.Queue()
    pending = MultiprocessIterator(regrtest.tests)
    test_timeout = ns.timeout
    use_timeout = test_timeout is not None

    workers = []
    for _ in range(ns.use_mp):
        workers.append(MultiprocessThread(pending, results, ns))
    print("Run tests in parallel using %s child processes"
          % len(workers))
    for thread in workers:
        thread.start()

    def get_running(threads):
        # Describe the tests running for at least PROGRESS_MIN_TIME seconds.
        descriptions = []
        for thread in threads:
            # Read the attribute only once: the thread resets it to None
            # when its test completes.
            name = thread.current_test
            if name:
                elapsed = time.monotonic() - thread.start_time
                if elapsed >= PROGRESS_MIN_TIME:
                    descriptions.append(
                        '%s (%s)' % (name, format_duration(elapsed)))
        return descriptions

    done_workers = 0
    test_index = 1
    poll_timeout = max(PROGRESS_UPDATE, PROGRESS_MIN_TIME)
    try:
        # Every worker puts a (None, None, None, None) item when it stops.
        while done_workers < ns.use_mp:
            if use_timeout:
                # Re-arm the watchdog on each iteration.
                faulthandler.dump_traceback_later(test_timeout, exit=True)

            try:
                item = results.get(timeout=poll_timeout)
            except queue.Empty:
                # Nothing completed recently: show what is still running.
                running = get_running(workers)
                if running and not ns.pgo:
                    print('running: %s' % ', '.join(running), flush=True)
                continue

            test, stdout, stderr, result = item
            if test is None:
                done_workers += 1
                continue
            regrtest.accumulate_result(test, result)

            # Display progress
            ok, test_time, _xml_data = result
            text = format_test_result(test, ok)
            if ok == CHILD_ERROR:
                # For a child error, the second item is the error message.
                text = '%s (%s)' % (text, test_time)
            elif (ok != INTERRUPTED
                  and test_time >= PROGRESS_MIN_TIME
                  and not ns.pgo):
                text += ' (%s)' % format_duration(test_time)
            running = get_running(workers)
            if running and not ns.pgo:
                text += ' -- running: %s' % ', '.join(running)
            regrtest.display_progress(test_index, text)

            # Copy stdout and stderr from the child process
            if stdout:
                print(stdout, flush=True)
            if stderr and not ns.pgo:
                print(stderr, file=sys.stderr, flush=True)

            if ok == INTERRUPTED:
                raise KeyboardInterrupt
            test_index += 1
    except KeyboardInterrupt:
        regrtest.interrupted = True
        pending.interrupted = True
        print()
    finally:
        if use_timeout:
            faulthandler.cancel_dump_traceback_later()

    # If tests are interrupted, wait until tests complete
    wait_start = time.monotonic()
    while True:
        current_tests = (thread.current_test for thread in workers)
        running = [name for name in current_tests if name]
        if not running:
            break

        waited = time.monotonic() - wait_start
        line = "Waiting for %s (%s tests)" % (', '.join(running), len(running))
        if waited >= WAIT_PROGRESS:
            line = "%s since %.0f sec" % (line, waited)
        print(line, flush=True)
        for thread in workers:
            thread.join(WAIT_PROGRESS)
244