1"""
2This test suite exercises some system calls subject to interruption with EINTR,
3to check that it is actually handled transparently.
4It is intended to be run by the main test suite within a child process, to
5ensure there is no background thread running (so that signals are delivered to
6the correct thread).
7Signals are generated in-process using setitimer(ITIMER_REAL), which allows
8sub-second periodicity (contrarily to signal()).
9"""
10
11import contextlib
12import faulthandler
13import fcntl
14import os
15import platform
16import select
17import signal
18import socket
19import subprocess
20import sys
21import time
22import unittest
23
24from test import support
25
26@contextlib.contextmanager
27def kill_on_error(proc):
28    """Context manager killing the subprocess if a Python exception is raised."""
29    with proc:
30        try:
31            yield proc
32        except:
33            proc.kill()
34            raise
35
36
37@unittest.skipUnless(hasattr(signal, "setitimer"), "requires setitimer()")
38class EINTRBaseTest(unittest.TestCase):
39    """ Base class for EINTR tests. """
40
41    # delay for initial signal delivery
42    signal_delay = 0.1
43    # signal delivery periodicity
44    signal_period = 0.1
45    # default sleep time for tests - should obviously have:
46    # sleep_time > signal_period
47    sleep_time = 0.2
48
49    def sighandler(self, signum, frame):
50        self.signals += 1
51
52    def setUp(self):
53        self.signals = 0
54        self.orig_handler = signal.signal(signal.SIGALRM, self.sighandler)
55        signal.setitimer(signal.ITIMER_REAL, self.signal_delay,
56                         self.signal_period)
57
58        # Use faulthandler as watchdog to debug when a test hangs
59        # (timeout of 10 minutes)
60        if hasattr(faulthandler, 'dump_traceback_later'):
61            faulthandler.dump_traceback_later(10 * 60, exit=True,
62                                              file=sys.__stderr__)
63
64    @staticmethod
65    def stop_alarm():
66        signal.setitimer(signal.ITIMER_REAL, 0, 0)
67
68    def tearDown(self):
69        self.stop_alarm()
70        signal.signal(signal.SIGALRM, self.orig_handler)
71        if hasattr(faulthandler, 'cancel_dump_traceback_later'):
72            faulthandler.cancel_dump_traceback_later()
73
74    def subprocess(self, *args, **kw):
75        cmd_args = (sys.executable, '-c') + args
76        return subprocess.Popen(cmd_args, **kw)
77
78
79@unittest.skipUnless(hasattr(signal, "setitimer"), "requires setitimer()")
80class OSEINTRTest(EINTRBaseTest):
81    """ EINTR tests for the os module. """
82
83    def new_sleep_process(self):
84        code = 'import time; time.sleep(%r)' % self.sleep_time
85        return self.subprocess(code)
86
87    def _test_wait_multiple(self, wait_func):
88        num = 3
89        processes = [self.new_sleep_process() for _ in range(num)]
90        for _ in range(num):
91            wait_func()
92        # Call the Popen method to avoid a ResourceWarning
93        for proc in processes:
94            proc.wait()
95
96    def test_wait(self):
97        self._test_wait_multiple(os.wait)
98
99    @unittest.skipUnless(hasattr(os, 'wait3'), 'requires wait3()')
100    def test_wait3(self):
101        self._test_wait_multiple(lambda: os.wait3(0))
102
103    def _test_wait_single(self, wait_func):
104        proc = self.new_sleep_process()
105        wait_func(proc.pid)
106        # Call the Popen method to avoid a ResourceWarning
107        proc.wait()
108
109    def test_waitpid(self):
110        self._test_wait_single(lambda pid: os.waitpid(pid, 0))
111
112    @unittest.skipUnless(hasattr(os, 'wait4'), 'requires wait4()')
113    def test_wait4(self):
114        self._test_wait_single(lambda pid: os.wait4(pid, 0))
115
116    def test_read(self):
117        rd, wr = os.pipe()
118        self.addCleanup(os.close, rd)
119        # wr closed explicitly by parent
120
121        # the payload below are smaller than PIPE_BUF, hence the writes will be
122        # atomic
123        datas = [b"hello", b"world", b"spam"]
124
125        code = '\n'.join((
126            'import os, sys, time',
127            '',
128            'wr = int(sys.argv[1])',
129            'datas = %r' % datas,
130            'sleep_time = %r' % self.sleep_time,
131            '',
132            'for data in datas:',
133            '    # let the parent block on read()',
134            '    time.sleep(sleep_time)',
135            '    os.write(wr, data)',
136        ))
137
138        proc = self.subprocess(code, str(wr), pass_fds=[wr])
139        with kill_on_error(proc):
140            os.close(wr)
141            for data in datas:
142                self.assertEqual(data, os.read(rd, len(data)))
143            self.assertEqual(proc.wait(), 0)
144
145    def test_write(self):
146        rd, wr = os.pipe()
147        self.addCleanup(os.close, wr)
148        # rd closed explicitly by parent
149
150        # we must write enough data for the write() to block
151        data = b"x" * support.PIPE_MAX_SIZE
152
153        code = '\n'.join((
154            'import io, os, sys, time',
155            '',
156            'rd = int(sys.argv[1])',
157            'sleep_time = %r' % self.sleep_time,
158            'data = b"x" * %s' % support.PIPE_MAX_SIZE,
159            'data_len = len(data)',
160            '',
161            '# let the parent block on write()',
162            'time.sleep(sleep_time)',
163            '',
164            'read_data = io.BytesIO()',
165            'while len(read_data.getvalue()) < data_len:',
166            '    chunk = os.read(rd, 2 * data_len)',
167            '    read_data.write(chunk)',
168            '',
169            'value = read_data.getvalue()',
170            'if value != data:',
171            '    raise Exception("read error: %s vs %s bytes"',
172            '                    % (len(value), data_len))',
173        ))
174
175        proc = self.subprocess(code, str(rd), pass_fds=[rd])
176        with kill_on_error(proc):
177            os.close(rd)
178            written = 0
179            while written < len(data):
180                written += os.write(wr, memoryview(data)[written:])
181            self.assertEqual(proc.wait(), 0)
182
183
184@unittest.skipUnless(hasattr(signal, "setitimer"), "requires setitimer()")
185class SocketEINTRTest(EINTRBaseTest):
186    """ EINTR tests for the socket module. """
187
188    @unittest.skipUnless(hasattr(socket, 'socketpair'), 'needs socketpair()')
189    def _test_recv(self, recv_func):
190        rd, wr = socket.socketpair()
191        self.addCleanup(rd.close)
192        # wr closed explicitly by parent
193
194        # single-byte payload guard us against partial recv
195        datas = [b"x", b"y", b"z"]
196
197        code = '\n'.join((
198            'import os, socket, sys, time',
199            '',
200            'fd = int(sys.argv[1])',
201            'family = %s' % int(wr.family),
202            'sock_type = %s' % int(wr.type),
203            'datas = %r' % datas,
204            'sleep_time = %r' % self.sleep_time,
205            '',
206            'wr = socket.fromfd(fd, family, sock_type)',
207            'os.close(fd)',
208            '',
209            'with wr:',
210            '    for data in datas:',
211            '        # let the parent block on recv()',
212            '        time.sleep(sleep_time)',
213            '        wr.sendall(data)',
214        ))
215
216        fd = wr.fileno()
217        proc = self.subprocess(code, str(fd), pass_fds=[fd])
218        with kill_on_error(proc):
219            wr.close()
220            for data in datas:
221                self.assertEqual(data, recv_func(rd, len(data)))
222            self.assertEqual(proc.wait(), 0)
223
224    def test_recv(self):
225        self._test_recv(socket.socket.recv)
226
227    @unittest.skipUnless(hasattr(socket.socket, 'recvmsg'), 'needs recvmsg()')
228    def test_recvmsg(self):
229        self._test_recv(lambda sock, data: sock.recvmsg(data)[0])
230
231    def _test_send(self, send_func):
232        rd, wr = socket.socketpair()
233        self.addCleanup(wr.close)
234        # rd closed explicitly by parent
235
236        # we must send enough data for the send() to block
237        data = b"xyz" * (support.SOCK_MAX_SIZE // 3)
238
239        code = '\n'.join((
240            'import os, socket, sys, time',
241            '',
242            'fd = int(sys.argv[1])',
243            'family = %s' % int(rd.family),
244            'sock_type = %s' % int(rd.type),
245            'sleep_time = %r' % self.sleep_time,
246            'data = b"xyz" * %s' % (support.SOCK_MAX_SIZE // 3),
247            'data_len = len(data)',
248            '',
249            'rd = socket.fromfd(fd, family, sock_type)',
250            'os.close(fd)',
251            '',
252            'with rd:',
253            '    # let the parent block on send()',
254            '    time.sleep(sleep_time)',
255            '',
256            '    received_data = bytearray(data_len)',
257            '    n = 0',
258            '    while n < data_len:',
259            '        n += rd.recv_into(memoryview(received_data)[n:])',
260            '',
261            'if received_data != data:',
262            '    raise Exception("recv error: %s vs %s bytes"',
263            '                    % (len(received_data), data_len))',
264        ))
265
266        fd = rd.fileno()
267        proc = self.subprocess(code, str(fd), pass_fds=[fd])
268        with kill_on_error(proc):
269            rd.close()
270            written = 0
271            while written < len(data):
272                sent = send_func(wr, memoryview(data)[written:])
273                # sendall() returns None
274                written += len(data) if sent is None else sent
275            self.assertEqual(proc.wait(), 0)
276
277    def test_send(self):
278        self._test_send(socket.socket.send)
279
280    def test_sendall(self):
281        self._test_send(socket.socket.sendall)
282
283    @unittest.skipUnless(hasattr(socket.socket, 'sendmsg'), 'needs sendmsg()')
284    def test_sendmsg(self):
285        self._test_send(lambda sock, data: sock.sendmsg([data]))
286
287    def test_accept(self):
288        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
289        self.addCleanup(sock.close)
290
291        sock.bind((support.HOST, 0))
292        port = sock.getsockname()[1]
293        sock.listen()
294
295        code = '\n'.join((
296            'import socket, time',
297            '',
298            'host = %r' % support.HOST,
299            'port = %s' % port,
300            'sleep_time = %r' % self.sleep_time,
301            '',
302            '# let parent block on accept()',
303            'time.sleep(sleep_time)',
304            'with socket.create_connection((host, port)):',
305            '    time.sleep(sleep_time)',
306        ))
307
308        proc = self.subprocess(code)
309        with kill_on_error(proc):
310            client_sock, _ = sock.accept()
311            client_sock.close()
312            self.assertEqual(proc.wait(), 0)
313
314    # Issue #25122: There is a race condition in the FreeBSD kernel on
315    # handling signals in the FIFO device. Skip the test until the bug is
316    # fixed in the kernel.
317    # https://bugs.freebsd.org/bugzilla/show_bug.cgi?id=203162
318    @support.requires_freebsd_version(10, 3)
319    @unittest.skipUnless(hasattr(os, 'mkfifo'), 'needs mkfifo()')
320    def _test_open(self, do_open_close_reader, do_open_close_writer):
321        filename = support.TESTFN
322
323        # Use a fifo: until the child opens it for reading, the parent will
324        # block when trying to open it for writing.
325        support.unlink(filename)
326        try:
327            os.mkfifo(filename)
328        except PermissionError as e:
329            self.skipTest('os.mkfifo(): %s' % e)
330        self.addCleanup(support.unlink, filename)
331
332        code = '\n'.join((
333            'import os, time',
334            '',
335            'path = %a' % filename,
336            'sleep_time = %r' % self.sleep_time,
337            '',
338            '# let the parent block',
339            'time.sleep(sleep_time)',
340            '',
341            do_open_close_reader,
342        ))
343
344        proc = self.subprocess(code)
345        with kill_on_error(proc):
346            do_open_close_writer(filename)
347            self.assertEqual(proc.wait(), 0)
348
349    def python_open(self, path):
350        fp = open(path, 'w')
351        fp.close()
352
353    @unittest.skipIf(sys.platform == "darwin",
354                     "hangs under macOS; see bpo-25234, bpo-35363")
355    def test_open(self):
356        self._test_open("fp = open(path, 'r')\nfp.close()",
357                        self.python_open)
358
359    def os_open(self, path):
360        fd = os.open(path, os.O_WRONLY)
361        os.close(fd)
362
363    @unittest.skipIf(sys.platform == "darwin",
364                     "hangs under macOS; see bpo-25234, bpo-35363")
365    def test_os_open(self):
366        self._test_open("fd = os.open(path, os.O_RDONLY)\nos.close(fd)",
367                        self.os_open)
368
369
370@unittest.skipUnless(hasattr(signal, "setitimer"), "requires setitimer()")
371class TimeEINTRTest(EINTRBaseTest):
372    """ EINTR tests for the time module. """
373
374    def test_sleep(self):
375        t0 = time.monotonic()
376        time.sleep(self.sleep_time)
377        self.stop_alarm()
378        dt = time.monotonic() - t0
379        self.assertGreaterEqual(dt, self.sleep_time)
380
381
382@unittest.skipUnless(hasattr(signal, "setitimer"), "requires setitimer()")
383# bpo-30320: Need pthread_sigmask() to block the signal, otherwise the test
384# is vulnerable to a race condition between the child and the parent processes.
385@unittest.skipUnless(hasattr(signal, 'pthread_sigmask'),
386                     'need signal.pthread_sigmask()')
387class SignalEINTRTest(EINTRBaseTest):
388    """ EINTR tests for the signal module. """
389
390    def check_sigwait(self, wait_func):
391        signum = signal.SIGUSR1
392        pid = os.getpid()
393
394        old_handler = signal.signal(signum, lambda *args: None)
395        self.addCleanup(signal.signal, signum, old_handler)
396
397        code = '\n'.join((
398            'import os, time',
399            'pid = %s' % os.getpid(),
400            'signum = %s' % int(signum),
401            'sleep_time = %r' % self.sleep_time,
402            'time.sleep(sleep_time)',
403            'os.kill(pid, signum)',
404        ))
405
406        old_mask = signal.pthread_sigmask(signal.SIG_BLOCK, [signum])
407        self.addCleanup(signal.pthread_sigmask, signal.SIG_UNBLOCK, [signum])
408
409        t0 = time.monotonic()
410        proc = self.subprocess(code)
411        with kill_on_error(proc):
412            wait_func(signum)
413            dt = time.monotonic() - t0
414
415        self.assertEqual(proc.wait(), 0)
416
417    @unittest.skipUnless(hasattr(signal, 'sigwaitinfo'),
418                         'need signal.sigwaitinfo()')
419    def test_sigwaitinfo(self):
420        def wait_func(signum):
421            signal.sigwaitinfo([signum])
422
423        self.check_sigwait(wait_func)
424
425    @unittest.skipUnless(hasattr(signal, 'sigtimedwait'),
426                         'need signal.sigwaitinfo()')
427    def test_sigtimedwait(self):
428        def wait_func(signum):
429            signal.sigtimedwait([signum], 120.0)
430
431        self.check_sigwait(wait_func)
432
433
434@unittest.skipUnless(hasattr(signal, "setitimer"), "requires setitimer()")
435class SelectEINTRTest(EINTRBaseTest):
436    """ EINTR tests for the select module. """
437
438    def test_select(self):
439        t0 = time.monotonic()
440        select.select([], [], [], self.sleep_time)
441        dt = time.monotonic() - t0
442        self.stop_alarm()
443        self.assertGreaterEqual(dt, self.sleep_time)
444
445    @unittest.skipIf(sys.platform == "darwin",
446                     "poll may fail on macOS; see issue #28087")
447    @unittest.skipUnless(hasattr(select, 'poll'), 'need select.poll')
448    def test_poll(self):
449        poller = select.poll()
450
451        t0 = time.monotonic()
452        poller.poll(self.sleep_time * 1e3)
453        dt = time.monotonic() - t0
454        self.stop_alarm()
455        self.assertGreaterEqual(dt, self.sleep_time)
456
457    @unittest.skipUnless(hasattr(select, 'epoll'), 'need select.epoll')
458    def test_epoll(self):
459        poller = select.epoll()
460        self.addCleanup(poller.close)
461
462        t0 = time.monotonic()
463        poller.poll(self.sleep_time)
464        dt = time.monotonic() - t0
465        self.stop_alarm()
466        self.assertGreaterEqual(dt, self.sleep_time)
467
468    @unittest.skipUnless(hasattr(select, 'kqueue'), 'need select.kqueue')
469    def test_kqueue(self):
470        kqueue = select.kqueue()
471        self.addCleanup(kqueue.close)
472
473        t0 = time.monotonic()
474        kqueue.control(None, 1, self.sleep_time)
475        dt = time.monotonic() - t0
476        self.stop_alarm()
477        self.assertGreaterEqual(dt, self.sleep_time)
478
479    @unittest.skipUnless(hasattr(select, 'devpoll'), 'need select.devpoll')
480    def test_devpoll(self):
481        poller = select.devpoll()
482        self.addCleanup(poller.close)
483
484        t0 = time.monotonic()
485        poller.poll(self.sleep_time * 1e3)
486        dt = time.monotonic() - t0
487        self.stop_alarm()
488        self.assertGreaterEqual(dt, self.sleep_time)
489
490
491class FNTLEINTRTest(EINTRBaseTest):
492    def _lock(self, lock_func, lock_name):
493        self.addCleanup(support.unlink, support.TESTFN)
494        code = '\n'.join((
495            "import fcntl, time",
496            "with open('%s', 'wb') as f:" % support.TESTFN,
497            "   fcntl.%s(f, fcntl.LOCK_EX)" % lock_name,
498            "   time.sleep(%s)" % self.sleep_time))
499        start_time = time.monotonic()
500        proc = self.subprocess(code)
501        with kill_on_error(proc):
502            with open(support.TESTFN, 'wb') as f:
503                while True:  # synchronize the subprocess
504                    dt = time.monotonic() - start_time
505                    if dt > 60.0:
506                        raise Exception("failed to sync child in %.1f sec" % dt)
507                    try:
508                        lock_func(f, fcntl.LOCK_EX | fcntl.LOCK_NB)
509                        lock_func(f, fcntl.LOCK_UN)
510                        time.sleep(0.01)
511                    except BlockingIOError:
512                        break
513                # the child locked the file just a moment ago for 'sleep_time' seconds
514                # that means that the lock below will block for 'sleep_time' minus some
515                # potential context switch delay
516                lock_func(f, fcntl.LOCK_EX)
517                dt = time.monotonic() - start_time
518                self.assertGreaterEqual(dt, self.sleep_time)
519                self.stop_alarm()
520            proc.wait()
521
522    # Issue 35633: See https://bugs.python.org/issue35633#msg333662
523    # skip test rather than accept PermissionError from all platforms
524    @unittest.skipIf(platform.system() == "AIX", "AIX returns PermissionError")
525    def test_lockf(self):
526        self._lock(fcntl.lockf, "lockf")
527
528    def test_flock(self):
529        self._lock(fcntl.flock, "flock")
530
531
532if __name__ == "__main__":
533    unittest.main()
534