"""
This test suite exercises some system calls subject to interruption with EINTR,
to check that it is actually handled transparently.
It is intended to be run by the main test suite within a child process, to
ensure there is no background thread running (so that signals are delivered to
the correct thread).
Signals are generated in-process using setitimer(ITIMER_REAL), which allows
sub-second periodicity (unlike signal.alarm(), which takes whole seconds).
"""

import contextlib
import faulthandler
import fcntl
import os
import platform
import select
import signal
import socket
import subprocess
import sys
import time
import unittest

from test import support


@contextlib.contextmanager
def kill_on_error(proc):
    """Context manager killing the subprocess if a Python exception is raised.

    *proc* is a subprocess.Popen object; it is used as a context manager
    itself, so it is waited for when the block exits.  Any exception raised
    in the block kills the child and is then re-raised unchanged.
    """
    with proc:
        try:
            yield proc
        except BaseException:
            # Deliberately catch everything (KeyboardInterrupt included):
            # the child must not outlive a failed test.  Always re-raised.
            proc.kill()
            raise


@unittest.skipUnless(hasattr(signal, "setitimer"), "requires setitimer()")
class EINTRBaseTest(unittest.TestCase):
    """ Base class for EINTR tests.

    setUp() arms a periodic SIGALRM timer so that blocking calls made by
    the tests are interrupted by signals; tearDown() disarms it.
    """

    # delay for initial signal delivery
    signal_delay = 0.1
    # signal delivery periodicity
    signal_period = 0.1
    # default sleep time for tests - should obviously have:
    # sleep_time > signal_period
    sleep_time = 0.2

    def sighandler(self, signum, frame):
        """SIGALRM handler: count the signals received by the test."""
        self.signals += 1

    def setUp(self):
        """Install the SIGALRM handler and start the periodic timer."""
        self.signals = 0
        self.orig_handler = signal.signal(signal.SIGALRM, self.sighandler)
        signal.setitimer(signal.ITIMER_REAL, self.signal_delay,
                         self.signal_period)

        # Use faulthandler as watchdog to debug when a test hangs
        # (timeout of 10 minutes)
        if hasattr(faulthandler, 'dump_traceback_later'):
            faulthandler.dump_traceback_later(10 * 60, exit=True,
                                              file=sys.__stderr__)

    @staticmethod
    def stop_alarm():
        """Disarm the ITIMER_REAL timer started by setUp()."""
        signal.setitimer(signal.ITIMER_REAL, 0, 0)

    def tearDown(self):
        """Stop the timer, restore the SIGALRM handler, cancel the watchdog."""
        self.stop_alarm()
        signal.signal(signal.SIGALRM, self.orig_handler)
        if hasattr(faulthandler, 'cancel_dump_traceback_later'):
            faulthandler.cancel_dump_traceback_later()

    def subprocess(self, *args, **kw):
        """Spawn ``sys.executable -c <args...>`` and return the Popen object.

        The first positional argument is the code to run; any further ones
        become the child's sys.argv[1:].  Keyword arguments are passed to
        subprocess.Popen unchanged.
        """
        cmd_args = (sys.executable, '-c') + args
        return subprocess.Popen(cmd_args, **kw)


@unittest.skipUnless(hasattr(signal, "setitimer"), "requires setitimer()")
class OSEINTRTest(EINTRBaseTest):
    """ EINTR tests for the os module. """

    def new_sleep_process(self):
        """Start a child process that just sleeps for sleep_time seconds."""
        code = 'import time; time.sleep(%r)' % self.sleep_time
        return self.subprocess(code)

    def _test_wait_multiple(self, wait_func):
        """Start several sleeping children and call wait_func() once each."""
        num = 3
        processes = [self.new_sleep_process() for _ in range(num)]
        for _ in range(num):
            wait_func()
        # Call the Popen method to avoid a ResourceWarning
        for proc in processes:
            proc.wait()

    def test_wait(self):
        self._test_wait_multiple(os.wait)

    @unittest.skipUnless(hasattr(os, 'wait3'), 'requires wait3()')
    def test_wait3(self):
        self._test_wait_multiple(lambda: os.wait3(0))

    def _test_wait_single(self, wait_func):
        """Start one sleeping child and call wait_func(pid) on it."""
        proc = self.new_sleep_process()
        wait_func(proc.pid)
        # Call the Popen method to avoid a ResourceWarning
        proc.wait()

    def test_waitpid(self):
        self._test_wait_single(lambda pid: os.waitpid(pid, 0))

    @unittest.skipUnless(hasattr(os, 'wait4'), 'requires wait4()')
    def test_wait4(self):
        self._test_wait_single(lambda pid: os.wait4(pid, 0))

    def test_read(self):
        """os.read() on a pipe must block until the child writes."""
        rd, wr = os.pipe()
        self.addCleanup(os.close, rd)
        # wr closed explicitly by parent

        # the payloads below are smaller than PIPE_BUF, hence the writes
        # will be atomic
        datas = [b"hello", b"world", b"spam"]

        code = '\n'.join((
            'import os, sys, time',
            '',
            'wr = int(sys.argv[1])',
            'datas = %r' % datas,
            'sleep_time = %r' % self.sleep_time,
            '',
            'for data in datas:',
            '    # let the parent block on read()',
            '    time.sleep(sleep_time)',
            '    os.write(wr, data)',
        ))

        proc = self.subprocess(code, str(wr), pass_fds=[wr])
        with kill_on_error(proc):
            os.close(wr)
            for data in datas:
                self.assertEqual(data, os.read(rd, len(data)))
            self.assertEqual(proc.wait(), 0)

    def test_write(self):
        """os.write() on a full pipe must block until the child reads."""
        rd, wr = os.pipe()
        self.addCleanup(os.close, wr)
        # rd closed explicitly by parent

        # we must write enough data for the write() to block
        data = b"x" * support.PIPE_MAX_SIZE

        code = '\n'.join((
            'import io, os, sys, time',
            '',
            'rd = int(sys.argv[1])',
            'sleep_time = %r' % self.sleep_time,
            'data = b"x" * %s' % support.PIPE_MAX_SIZE,
            'data_len = len(data)',
            '',
            '# let the parent block on write()',
            'time.sleep(sleep_time)',
            '',
            'read_data = io.BytesIO()',
            'while len(read_data.getvalue()) < data_len:',
            '    chunk = os.read(rd, 2 * data_len)',
            '    read_data.write(chunk)',
            '',
            'value = read_data.getvalue()',
            'if value != data:',
            '    raise Exception("read error: %s vs %s bytes"',
            '                    % (len(value), data_len))',
        ))

        proc = self.subprocess(code, str(rd), pass_fds=[rd])
        with kill_on_error(proc):
            os.close(rd)
            written = 0
            while written < len(data):
                written += os.write(wr, memoryview(data)[written:])
            self.assertEqual(proc.wait(), 0)


@unittest.skipUnless(hasattr(signal, "setitimer"), "requires setitimer()")
class SocketEINTRTest(EINTRBaseTest):
    """ EINTR tests for the socket module. """

    @unittest.skipUnless(hasattr(socket, 'socketpair'), 'needs socketpair()')
    def _test_recv(self, recv_func):
        """Check that recv_func(sock, size) blocks until the child sends."""
        rd, wr = socket.socketpair()
        self.addCleanup(rd.close)
        # wr closed explicitly by parent

        # single-byte payloads guard us against partial recv
        datas = [b"x", b"y", b"z"]

        code = '\n'.join((
            'import os, socket, sys, time',
            '',
            'fd = int(sys.argv[1])',
            'family = %s' % int(wr.family),
            'sock_type = %s' % int(wr.type),
            'datas = %r' % datas,
            'sleep_time = %r' % self.sleep_time,
            '',
            'wr = socket.fromfd(fd, family, sock_type)',
            'os.close(fd)',
            '',
            'with wr:',
            '    for data in datas:',
            '        # let the parent block on recv()',
            '        time.sleep(sleep_time)',
            '        wr.sendall(data)',
        ))

        fd = wr.fileno()
        proc = self.subprocess(code, str(fd), pass_fds=[fd])
        with kill_on_error(proc):
            wr.close()
            for data in datas:
                self.assertEqual(data, recv_func(rd, len(data)))
            self.assertEqual(proc.wait(), 0)

    def test_recv(self):
        self._test_recv(socket.socket.recv)

    @unittest.skipUnless(hasattr(socket.socket, 'recvmsg'), 'needs recvmsg()')
    def test_recvmsg(self):
        self._test_recv(lambda sock, data: sock.recvmsg(data)[0])

    # Same guard as _test_recv(): both helpers call socket.socketpair().
    @unittest.skipUnless(hasattr(socket, 'socketpair'), 'needs socketpair()')
    def _test_send(self, send_func):
        """Check that send_func(sock, data) blocks until the child receives."""
        rd, wr = socket.socketpair()
        self.addCleanup(wr.close)
        # rd closed explicitly by parent

        # we must send enough data for the send() to block
        data = b"xyz" * (support.SOCK_MAX_SIZE // 3)

        code = '\n'.join((
            'import os, socket, sys, time',
            '',
            'fd = int(sys.argv[1])',
            'family = %s' % int(rd.family),
            'sock_type = %s' % int(rd.type),
            'sleep_time = %r' % self.sleep_time,
            'data = b"xyz" * %s' % (support.SOCK_MAX_SIZE // 3),
            'data_len = len(data)',
            '',
            'rd = socket.fromfd(fd, family, sock_type)',
            'os.close(fd)',
            '',
            'with rd:',
            '    # let the parent block on send()',
            '    time.sleep(sleep_time)',
            '',
            '    received_data = bytearray(data_len)',
            '    n = 0',
            '    while n < data_len:',
            '        n += rd.recv_into(memoryview(received_data)[n:])',
            '',
            'if received_data != data:',
            '    raise Exception("recv error: %s vs %s bytes"',
            '                    % (len(received_data), data_len))',
        ))

        fd = rd.fileno()
        proc = self.subprocess(code, str(fd), pass_fds=[fd])
        with kill_on_error(proc):
            rd.close()
            written = 0
            while written < len(data):
                sent = send_func(wr, memoryview(data)[written:])
                # sendall() returns None
                written += len(data) if sent is None else sent
            self.assertEqual(proc.wait(), 0)

    def test_send(self):
        self._test_send(socket.socket.send)

    def test_sendall(self):
        self._test_send(socket.socket.sendall)

    @unittest.skipUnless(hasattr(socket.socket, 'sendmsg'), 'needs sendmsg()')
    def test_sendmsg(self):
        self._test_send(lambda sock, data: sock.sendmsg([data]))

    def test_accept(self):
        """accept() must block until the child connects."""
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.addCleanup(sock.close)

        sock.bind((support.HOST, 0))
        port = sock.getsockname()[1]
        sock.listen()

        code = '\n'.join((
            'import socket, time',
            '',
            'host = %r' % support.HOST,
            'port = %s' % port,
            'sleep_time = %r' % self.sleep_time,
            '',
            '# let parent block on accept()',
            'time.sleep(sleep_time)',
            'with socket.create_connection((host, port)):',
            '    time.sleep(sleep_time)',
        ))

        proc = self.subprocess(code)
        with kill_on_error(proc):
            client_sock, _ = sock.accept()
            client_sock.close()
            self.assertEqual(proc.wait(), 0)

    # Issue #25122: There is a race condition in the FreeBSD kernel on
    # handling signals in the FIFO device. Skip the test until the bug is
    # fixed in the kernel.
    # https://bugs.freebsd.org/bugzilla/show_bug.cgi?id=203162
    @support.requires_freebsd_version(10, 3)
    @unittest.skipUnless(hasattr(os, 'mkfifo'), 'needs mkfifo()')
    def _test_open(self, do_open_close_reader, do_open_close_writer):
        """Check that opening a FIFO for writing blocks until it has a reader.

        do_open_close_reader is source code run in the child (with ``path``
        defined) to open and close the FIFO for reading;
        do_open_close_writer is a callable taking the path, run in the parent.
        """
        filename = support.TESTFN

        # Use a fifo: until the child opens it for reading, the parent will
        # block when trying to open it for writing.
        support.unlink(filename)
        try:
            os.mkfifo(filename)
        except PermissionError as e:
            self.skipTest('os.mkfifo(): %s' % e)
        self.addCleanup(support.unlink, filename)

        code = '\n'.join((
            'import os, time',
            '',
            'path = %a' % filename,
            'sleep_time = %r' % self.sleep_time,
            '',
            '# let the parent block',
            'time.sleep(sleep_time)',
            '',
            do_open_close_reader,
        ))

        proc = self.subprocess(code)
        with kill_on_error(proc):
            do_open_close_writer(filename)
            self.assertEqual(proc.wait(), 0)

    def python_open(self, path):
        """Open *path* for writing with the builtin open() and close it."""
        fp = open(path, 'w')
        fp.close()

    @unittest.skipIf(sys.platform == "darwin",
                     "hangs under macOS; see bpo-25234, bpo-35363")
    def test_open(self):
        self._test_open("fp = open(path, 'r')\nfp.close()",
                        self.python_open)

    def os_open(self, path):
        """Open *path* for writing with os.open() and close it."""
        fd = os.open(path, os.O_WRONLY)
        os.close(fd)

    @unittest.skipIf(sys.platform == "darwin",
                     "hangs under macOS; see bpo-25234, bpo-35363")
    def test_os_open(self):
        self._test_open("fd = os.open(path, os.O_RDONLY)\nos.close(fd)",
                        self.os_open)


@unittest.skipUnless(hasattr(signal, "setitimer"), "requires setitimer()")
class TimeEINTRTest(EINTRBaseTest):
    """ EINTR tests for the time module. """

    def test_sleep(self):
        """time.sleep() must sleep for the full duration despite signals."""
        t0 = time.monotonic()
        time.sleep(self.sleep_time)
        self.stop_alarm()
        dt = time.monotonic() - t0
        self.assertGreaterEqual(dt, self.sleep_time)


@unittest.skipUnless(hasattr(signal, "setitimer"), "requires setitimer()")
# bpo-30320: Need pthread_sigmask() to block the signal, otherwise the test
# is vulnerable to a race condition between the child and the parent processes.
@unittest.skipUnless(hasattr(signal, 'pthread_sigmask'),
                     'need signal.pthread_sigmask()')
class SignalEINTRTest(EINTRBaseTest):
    """ EINTR tests for the signal module. """

    def check_sigwait(self, wait_func):
        """Check that wait_func(signum) returns once the child sends SIGUSR1.

        SIGUSR1 is blocked in this process before the child is started, so
        the signal stays pending until wait_func() consumes it.
        """
        signum = signal.SIGUSR1
        pid = os.getpid()

        old_handler = signal.signal(signum, lambda *args: None)
        self.addCleanup(signal.signal, signum, old_handler)

        code = '\n'.join((
            'import os, time',
            'pid = %s' % pid,
            'signum = %s' % int(signum),
            'sleep_time = %r' % self.sleep_time,
            'time.sleep(sleep_time)',
            'os.kill(pid, signum)',
        ))

        old_mask = signal.pthread_sigmask(signal.SIG_BLOCK, [signum])
        # Restore the exact previous mask rather than unconditionally
        # unblocking signum, which may already have been blocked.
        self.addCleanup(signal.pthread_sigmask, signal.SIG_SETMASK, old_mask)

        proc = self.subprocess(code)
        with kill_on_error(proc):
            wait_func(signum)

        self.assertEqual(proc.wait(), 0)

    @unittest.skipUnless(hasattr(signal, 'sigwaitinfo'),
                         'need signal.sigwaitinfo()')
    def test_sigwaitinfo(self):
        def wait_func(signum):
            signal.sigwaitinfo([signum])

        self.check_sigwait(wait_func)

    @unittest.skipUnless(hasattr(signal, 'sigtimedwait'),
                         'need signal.sigtimedwait()')
    def test_sigtimedwait(self):
        def wait_func(signum):
            signal.sigtimedwait([signum], 120.0)

        self.check_sigwait(wait_func)


@unittest.skipUnless(hasattr(signal, "setitimer"), "requires setitimer()")
class SelectEINTRTest(EINTRBaseTest):
    """ EINTR tests for the select module.

    Each test waits on nothing with a timeout of sleep_time and checks that
    at least that much time elapsed despite the periodic signals.
    """

    def test_select(self):
        t0 = time.monotonic()
        select.select([], [], [], self.sleep_time)
        dt = time.monotonic() - t0
        self.stop_alarm()
        self.assertGreaterEqual(dt, self.sleep_time)

    @unittest.skipIf(sys.platform == "darwin",
                     "poll may fail on macOS; see issue #28087")
    @unittest.skipUnless(hasattr(select, 'poll'), 'need select.poll')
    def test_poll(self):
        poller = select.poll()

        t0 = time.monotonic()
        # poll() takes its timeout in milliseconds
        poller.poll(self.sleep_time * 1e3)
        dt = time.monotonic() - t0
        self.stop_alarm()
        self.assertGreaterEqual(dt, self.sleep_time)

    @unittest.skipUnless(hasattr(select, 'epoll'), 'need select.epoll')
    def test_epoll(self):
        poller = select.epoll()
        self.addCleanup(poller.close)

        t0 = time.monotonic()
        poller.poll(self.sleep_time)
        dt = time.monotonic() - t0
        self.stop_alarm()
        self.assertGreaterEqual(dt, self.sleep_time)

    @unittest.skipUnless(hasattr(select, 'kqueue'), 'need select.kqueue')
    def test_kqueue(self):
        kqueue = select.kqueue()
        self.addCleanup(kqueue.close)

        t0 = time.monotonic()
        kqueue.control(None, 1, self.sleep_time)
        dt = time.monotonic() - t0
        self.stop_alarm()
        self.assertGreaterEqual(dt, self.sleep_time)

    @unittest.skipUnless(hasattr(select, 'devpoll'), 'need select.devpoll')
    def test_devpoll(self):
        poller = select.devpoll()
        self.addCleanup(poller.close)

        t0 = time.monotonic()
        # devpoll.poll() takes its timeout in milliseconds
        poller.poll(self.sleep_time * 1e3)
        dt = time.monotonic() - t0
        self.stop_alarm()
        self.assertGreaterEqual(dt, self.sleep_time)


@unittest.skipUnless(hasattr(signal, "setitimer"), "requires setitimer()")
class FNTLEINTRTest(EINTRBaseTest):
    """ EINTR tests for the fcntl module. """

    def _lock(self, lock_func, lock_name):
        """Check that a blocking lock_func(f, LOCK_EX) waits for the child.

        lock_func is the fcntl function called in the parent; lock_name is
        the name of the same function, used to build the child's code.
        """
        self.addCleanup(support.unlink, support.TESTFN)
        code = '\n'.join((
            "import fcntl, time",
            # %a (not '%s') so that a quote or backslash in the file name
            # cannot break the generated source.
            "with open(%a, 'wb') as f:" % support.TESTFN,
            "    fcntl.%s(f, fcntl.LOCK_EX)" % lock_name,
            "    time.sleep(%r)" % self.sleep_time))
        start_time = time.monotonic()
        proc = self.subprocess(code)
        with kill_on_error(proc):
            with open(support.TESTFN, 'wb') as f:
                while True:  # synchronize the subprocess
                    dt = time.monotonic() - start_time
                    if dt > 60.0:
                        raise Exception("failed to sync child in %.1f sec" % dt)
                    try:
                        lock_func(f, fcntl.LOCK_EX | fcntl.LOCK_NB)
                        lock_func(f, fcntl.LOCK_UN)
                        time.sleep(0.01)
                    except BlockingIOError:
                        # the child now holds the lock
                        break
                # the child locked the file just a moment ago for 'sleep_time' seconds
                # that means that the lock below will block for 'sleep_time' minus some
                # potential context switch delay
                lock_func(f, fcntl.LOCK_EX)
                dt = time.monotonic() - start_time
                self.assertGreaterEqual(dt, self.sleep_time)
                self.stop_alarm()
            self.assertEqual(proc.wait(), 0)

    # Issue 35633: See https://bugs.python.org/issue35633#msg333662
    # skip test rather than accept PermissionError from all platforms
    @unittest.skipIf(platform.system() == "AIX", "AIX returns PermissionError")
    def test_lockf(self):
        self._lock(fcntl.lockf, "lockf")

    def test_flock(self):
        self._lock(fcntl.flock, "flock")


if __name__ == "__main__":
    unittest.main()