1"""
2This test suite exercises some system calls subject to interruption with EINTR,
3to check that it is actually handled transparently.
4It is intended to be run by the main test suite within a child process, to
5ensure there is no background thread running (so that signals are delivered to
6the correct thread).
7Signals are generated in-process using setitimer(ITIMER_REAL), which allows
8sub-second periodicity (contrarily to signal()).
9"""
10
11import contextlib
12import faulthandler
13import os
14import select
15import signal
16import socket
17import subprocess
18import sys
19import time
20import unittest
21
22from test import support
23android_not_root = support.android_not_root
24
25@contextlib.contextmanager
26def kill_on_error(proc):
27    """Context manager killing the subprocess if a Python exception is raised."""
28    with proc:
29        try:
30            yield proc
31        except:
32            proc.kill()
33            raise
34
35
36@unittest.skipUnless(hasattr(signal, "setitimer"), "requires setitimer()")
37class EINTRBaseTest(unittest.TestCase):
38    """ Base class for EINTR tests. """
39
40    # delay for initial signal delivery
41    signal_delay = 0.1
42    # signal delivery periodicity
43    signal_period = 0.1
44    # default sleep time for tests - should obviously have:
45    # sleep_time > signal_period
46    sleep_time = 0.2
47
48    @classmethod
49    def setUpClass(cls):
50        cls.orig_handler = signal.signal(signal.SIGALRM, lambda *args: None)
51        signal.setitimer(signal.ITIMER_REAL, cls.signal_delay,
52                         cls.signal_period)
53
54        # Issue #25277: Use faulthandler to try to debug a hang on FreeBSD
55        if hasattr(faulthandler, 'dump_traceback_later'):
56            faulthandler.dump_traceback_later(10 * 60, exit=True)
57
58    @classmethod
59    def stop_alarm(cls):
60        signal.setitimer(signal.ITIMER_REAL, 0, 0)
61
62    @classmethod
63    def tearDownClass(cls):
64        cls.stop_alarm()
65        signal.signal(signal.SIGALRM, cls.orig_handler)
66        if hasattr(faulthandler, 'cancel_dump_traceback_later'):
67            faulthandler.cancel_dump_traceback_later()
68
69    def subprocess(self, *args, **kw):
70        cmd_args = (sys.executable, '-c') + args
71        return subprocess.Popen(cmd_args, **kw)
72
73
74@unittest.skipUnless(hasattr(signal, "setitimer"), "requires setitimer()")
75class OSEINTRTest(EINTRBaseTest):
76    """ EINTR tests for the os module. """
77
78    def new_sleep_process(self):
79        code = 'import time; time.sleep(%r)' % self.sleep_time
80        return self.subprocess(code)
81
82    def _test_wait_multiple(self, wait_func):
83        num = 3
84        processes = [self.new_sleep_process() for _ in range(num)]
85        for _ in range(num):
86            wait_func()
87        # Call the Popen method to avoid a ResourceWarning
88        for proc in processes:
89            proc.wait()
90
91    def test_wait(self):
92        self._test_wait_multiple(os.wait)
93
94    @unittest.skipUnless(hasattr(os, 'wait3'), 'requires wait3()')
95    def test_wait3(self):
96        self._test_wait_multiple(lambda: os.wait3(0))
97
98    def _test_wait_single(self, wait_func):
99        proc = self.new_sleep_process()
100        wait_func(proc.pid)
101        # Call the Popen method to avoid a ResourceWarning
102        proc.wait()
103
104    def test_waitpid(self):
105        self._test_wait_single(lambda pid: os.waitpid(pid, 0))
106
107    @unittest.skipUnless(hasattr(os, 'wait4'), 'requires wait4()')
108    def test_wait4(self):
109        self._test_wait_single(lambda pid: os.wait4(pid, 0))
110
111    def test_read(self):
112        rd, wr = os.pipe()
113        self.addCleanup(os.close, rd)
114        # wr closed explicitly by parent
115
116        # the payload below are smaller than PIPE_BUF, hence the writes will be
117        # atomic
118        datas = [b"hello", b"world", b"spam"]
119
120        code = '\n'.join((
121            'import os, sys, time',
122            '',
123            'wr = int(sys.argv[1])',
124            'datas = %r' % datas,
125            'sleep_time = %r' % self.sleep_time,
126            '',
127            'for data in datas:',
128            '    # let the parent block on read()',
129            '    time.sleep(sleep_time)',
130            '    os.write(wr, data)',
131        ))
132
133        proc = self.subprocess(code, str(wr), pass_fds=[wr])
134        with kill_on_error(proc):
135            os.close(wr)
136            for data in datas:
137                self.assertEqual(data, os.read(rd, len(data)))
138            self.assertEqual(proc.wait(), 0)
139
140    def test_write(self):
141        rd, wr = os.pipe()
142        self.addCleanup(os.close, wr)
143        # rd closed explicitly by parent
144
145        # we must write enough data for the write() to block
146        data = b"x" * support.PIPE_MAX_SIZE
147
148        code = '\n'.join((
149            'import io, os, sys, time',
150            '',
151            'rd = int(sys.argv[1])',
152            'sleep_time = %r' % self.sleep_time,
153            'data = b"x" * %s' % support.PIPE_MAX_SIZE,
154            'data_len = len(data)',
155            '',
156            '# let the parent block on write()',
157            'time.sleep(sleep_time)',
158            '',
159            'read_data = io.BytesIO()',
160            'while len(read_data.getvalue()) < data_len:',
161            '    chunk = os.read(rd, 2 * data_len)',
162            '    read_data.write(chunk)',
163            '',
164            'value = read_data.getvalue()',
165            'if value != data:',
166            '    raise Exception("read error: %s vs %s bytes"',
167            '                    % (len(value), data_len))',
168        ))
169
170        proc = self.subprocess(code, str(rd), pass_fds=[rd])
171        with kill_on_error(proc):
172            os.close(rd)
173            written = 0
174            while written < len(data):
175                written += os.write(wr, memoryview(data)[written:])
176            self.assertEqual(proc.wait(), 0)
177
178
179@unittest.skipUnless(hasattr(signal, "setitimer"), "requires setitimer()")
180class SocketEINTRTest(EINTRBaseTest):
181    """ EINTR tests for the socket module. """
182
183    @unittest.skipUnless(hasattr(socket, 'socketpair'), 'needs socketpair()')
184    def _test_recv(self, recv_func):
185        rd, wr = socket.socketpair()
186        self.addCleanup(rd.close)
187        # wr closed explicitly by parent
188
189        # single-byte payload guard us against partial recv
190        datas = [b"x", b"y", b"z"]
191
192        code = '\n'.join((
193            'import os, socket, sys, time',
194            '',
195            'fd = int(sys.argv[1])',
196            'family = %s' % int(wr.family),
197            'sock_type = %s' % int(wr.type),
198            'datas = %r' % datas,
199            'sleep_time = %r' % self.sleep_time,
200            '',
201            'wr = socket.fromfd(fd, family, sock_type)',
202            'os.close(fd)',
203            '',
204            'with wr:',
205            '    for data in datas:',
206            '        # let the parent block on recv()',
207            '        time.sleep(sleep_time)',
208            '        wr.sendall(data)',
209        ))
210
211        fd = wr.fileno()
212        proc = self.subprocess(code, str(fd), pass_fds=[fd])
213        with kill_on_error(proc):
214            wr.close()
215            for data in datas:
216                self.assertEqual(data, recv_func(rd, len(data)))
217            self.assertEqual(proc.wait(), 0)
218
219    def test_recv(self):
220        self._test_recv(socket.socket.recv)
221
222    @unittest.skipUnless(hasattr(socket.socket, 'recvmsg'), 'needs recvmsg()')
223    def test_recvmsg(self):
224        self._test_recv(lambda sock, data: sock.recvmsg(data)[0])
225
226    def _test_send(self, send_func):
227        rd, wr = socket.socketpair()
228        self.addCleanup(wr.close)
229        # rd closed explicitly by parent
230
231        # we must send enough data for the send() to block
232        data = b"xyz" * (support.SOCK_MAX_SIZE // 3)
233
234        code = '\n'.join((
235            'import os, socket, sys, time',
236            '',
237            'fd = int(sys.argv[1])',
238            'family = %s' % int(rd.family),
239            'sock_type = %s' % int(rd.type),
240            'sleep_time = %r' % self.sleep_time,
241            'data = b"xyz" * %s' % (support.SOCK_MAX_SIZE // 3),
242            'data_len = len(data)',
243            '',
244            'rd = socket.fromfd(fd, family, sock_type)',
245            'os.close(fd)',
246            '',
247            'with rd:',
248            '    # let the parent block on send()',
249            '    time.sleep(sleep_time)',
250            '',
251            '    received_data = bytearray(data_len)',
252            '    n = 0',
253            '    while n < data_len:',
254            '        n += rd.recv_into(memoryview(received_data)[n:])',
255            '',
256            'if received_data != data:',
257            '    raise Exception("recv error: %s vs %s bytes"',
258            '                    % (len(received_data), data_len))',
259        ))
260
261        fd = rd.fileno()
262        proc = self.subprocess(code, str(fd), pass_fds=[fd])
263        with kill_on_error(proc):
264            rd.close()
265            written = 0
266            while written < len(data):
267                sent = send_func(wr, memoryview(data)[written:])
268                # sendall() returns None
269                written += len(data) if sent is None else sent
270            self.assertEqual(proc.wait(), 0)
271
272    def test_send(self):
273        self._test_send(socket.socket.send)
274
275    def test_sendall(self):
276        self._test_send(socket.socket.sendall)
277
278    @unittest.skipUnless(hasattr(socket.socket, 'sendmsg'), 'needs sendmsg()')
279    def test_sendmsg(self):
280        self._test_send(lambda sock, data: sock.sendmsg([data]))
281
282    def test_accept(self):
283        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
284        self.addCleanup(sock.close)
285
286        sock.bind((support.HOST, 0))
287        port = sock.getsockname()[1]
288        sock.listen()
289
290        code = '\n'.join((
291            'import socket, time',
292            '',
293            'host = %r' % support.HOST,
294            'port = %s' % port,
295            'sleep_time = %r' % self.sleep_time,
296            '',
297            '# let parent block on accept()',
298            'time.sleep(sleep_time)',
299            'with socket.create_connection((host, port)):',
300            '    time.sleep(sleep_time)',
301        ))
302
303        proc = self.subprocess(code)
304        with kill_on_error(proc):
305            client_sock, _ = sock.accept()
306            client_sock.close()
307            self.assertEqual(proc.wait(), 0)
308
309    # Issue #25122: There is a race condition in the FreeBSD kernel on
310    # handling signals in the FIFO device. Skip the test until the bug is
311    # fixed in the kernel.
312    # https://bugs.freebsd.org/bugzilla/show_bug.cgi?id=203162
313    @support.requires_freebsd_version(10, 3)
314    @unittest.skipUnless(hasattr(os, 'mkfifo'), 'needs mkfifo()')
315    @unittest.skipIf(android_not_root, "mkfifo not allowed, non root user")
316    def _test_open(self, do_open_close_reader, do_open_close_writer):
317        filename = support.TESTFN
318
319        # Use a fifo: until the child opens it for reading, the parent will
320        # block when trying to open it for writing.
321        support.unlink(filename)
322        os.mkfifo(filename)
323        self.addCleanup(support.unlink, filename)
324
325        code = '\n'.join((
326            'import os, time',
327            '',
328            'path = %a' % filename,
329            'sleep_time = %r' % self.sleep_time,
330            '',
331            '# let the parent block',
332            'time.sleep(sleep_time)',
333            '',
334            do_open_close_reader,
335        ))
336
337        proc = self.subprocess(code)
338        with kill_on_error(proc):
339            do_open_close_writer(filename)
340            self.assertEqual(proc.wait(), 0)
341
342    def python_open(self, path):
343        fp = open(path, 'w')
344        fp.close()
345
346    def test_open(self):
347        self._test_open("fp = open(path, 'r')\nfp.close()",
348                        self.python_open)
349
350    @unittest.skipIf(sys.platform == 'darwin', "hangs under OS X; see issue #25234")
351    def os_open(self, path):
352        fd = os.open(path, os.O_WRONLY)
353        os.close(fd)
354
355    @unittest.skipIf(sys.platform == "darwin", "hangs under OS X; see issue #25234")
356    def test_os_open(self):
357        self._test_open("fd = os.open(path, os.O_RDONLY)\nos.close(fd)",
358                        self.os_open)
359
360
361@unittest.skipUnless(hasattr(signal, "setitimer"), "requires setitimer()")
362class TimeEINTRTest(EINTRBaseTest):
363    """ EINTR tests for the time module. """
364
365    def test_sleep(self):
366        t0 = time.monotonic()
367        time.sleep(self.sleep_time)
368        self.stop_alarm()
369        dt = time.monotonic() - t0
370        self.assertGreaterEqual(dt, self.sleep_time)
371
372
373@unittest.skipUnless(hasattr(signal, "setitimer"), "requires setitimer()")
374class SignalEINTRTest(EINTRBaseTest):
375    """ EINTR tests for the signal module. """
376
377    @unittest.skipUnless(hasattr(signal, 'sigtimedwait'),
378                         'need signal.sigtimedwait()')
379    def test_sigtimedwait(self):
380        t0 = time.monotonic()
381        signal.sigtimedwait([signal.SIGUSR1], self.sleep_time)
382        dt = time.monotonic() - t0
383        self.assertGreaterEqual(dt, self.sleep_time)
384
385    @unittest.skipUnless(hasattr(signal, 'sigwaitinfo'),
386                         'need signal.sigwaitinfo()')
387    def test_sigwaitinfo(self):
388        # Issue #25277, #25868: give a few milliseconds to the parent process
389        # between os.write() and signal.sigwaitinfo() to works around a race
390        # condition
391        self.sleep_time = 0.100
392
393        signum = signal.SIGUSR1
394        pid = os.getpid()
395
396        old_handler = signal.signal(signum, lambda *args: None)
397        self.addCleanup(signal.signal, signum, old_handler)
398
399        rpipe, wpipe = os.pipe()
400
401        code = '\n'.join((
402            'import os, time',
403            'pid = %s' % os.getpid(),
404            'signum = %s' % int(signum),
405            'sleep_time = %r' % self.sleep_time,
406            'rpipe = %r' % rpipe,
407            'os.read(rpipe, 1)',
408            'os.close(rpipe)',
409            'time.sleep(sleep_time)',
410            'os.kill(pid, signum)',
411        ))
412
413        t0 = time.monotonic()
414        proc = self.subprocess(code, pass_fds=(rpipe,))
415        os.close(rpipe)
416        with kill_on_error(proc):
417            # sync child-parent
418            os.write(wpipe, b'x')
419            os.close(wpipe)
420
421            # parent
422            signal.sigwaitinfo([signum])
423            dt = time.monotonic() - t0
424            self.assertEqual(proc.wait(), 0)
425
426        self.assertGreaterEqual(dt, self.sleep_time)
427
428
429@unittest.skipUnless(hasattr(signal, "setitimer"), "requires setitimer()")
430class SelectEINTRTest(EINTRBaseTest):
431    """ EINTR tests for the select module. """
432
433    def test_select(self):
434        t0 = time.monotonic()
435        select.select([], [], [], self.sleep_time)
436        dt = time.monotonic() - t0
437        self.stop_alarm()
438        self.assertGreaterEqual(dt, self.sleep_time)
439
440    @unittest.skipIf(sys.platform == "darwin",
441                     "poll may fail on macOS; see issue #28087")
442    @unittest.skipUnless(hasattr(select, 'poll'), 'need select.poll')
443    def test_poll(self):
444        poller = select.poll()
445
446        t0 = time.monotonic()
447        poller.poll(self.sleep_time * 1e3)
448        dt = time.monotonic() - t0
449        self.stop_alarm()
450        self.assertGreaterEqual(dt, self.sleep_time)
451
452    @unittest.skipUnless(hasattr(select, 'epoll'), 'need select.epoll')
453    def test_epoll(self):
454        poller = select.epoll()
455        self.addCleanup(poller.close)
456
457        t0 = time.monotonic()
458        poller.poll(self.sleep_time)
459        dt = time.monotonic() - t0
460        self.stop_alarm()
461        self.assertGreaterEqual(dt, self.sleep_time)
462
463    @unittest.skipUnless(hasattr(select, 'kqueue'), 'need select.kqueue')
464    def test_kqueue(self):
465        kqueue = select.kqueue()
466        self.addCleanup(kqueue.close)
467
468        t0 = time.monotonic()
469        kqueue.control(None, 1, self.sleep_time)
470        dt = time.monotonic() - t0
471        self.stop_alarm()
472        self.assertGreaterEqual(dt, self.sleep_time)
473
474    @unittest.skipUnless(hasattr(select, 'devpoll'), 'need select.devpoll')
475    def test_devpoll(self):
476        poller = select.devpoll()
477        self.addCleanup(poller.close)
478
479        t0 = time.monotonic()
480        poller.poll(self.sleep_time * 1e3)
481        dt = time.monotonic() - t0
482        self.stop_alarm()
483        self.assertGreaterEqual(dt, self.sleep_time)
484
485
486def test_main():
487    support.run_unittest(
488        OSEINTRTest,
489        SocketEINTRTest,
490        TimeEINTRTest,
491        SignalEINTRTest,
492        SelectEINTRTest)
493
494
495if __name__ == "__main__":
496    test_main()
497