"""Selector event loop for Unix with signal handling."""

import errno
import io
import os
import selectors
import signal
import socket
import stat
import subprocess
import sys
import threading
import warnings


from . import base_events
from . import base_subprocess
from . import constants
from . import coroutines
from . import events
from . import futures
from . import selector_events
from . import tasks
from . import transports
from .log import logger


__all__ = (
    'SelectorEventLoop',
    'AbstractChildWatcher', 'SafeChildWatcher',
    'FastChildWatcher', 'DefaultEventLoopPolicy',
)


if sys.platform == 'win32':  # pragma: no cover
    raise ImportError('Signals are not really supported on Windows')


def _sighandler_noop(signum, frame):
    """Dummy signal handler."""
    pass


class _UnixSelectorEventLoop(selector_events.BaseSelectorEventLoop):
    """Unix event loop.

    Adds signal handling and UNIX Domain Socket support to SelectorEventLoop.
    """

    def __init__(self, selector=None):
        super().__init__(selector)
        # Maps signal number -> events.Handle wrapping the user callback.
        self._signal_handlers = {}

    def close(self):
        """Close the loop and remove every installed signal handler.

        During interpreter finalization the handlers are not removed;
        a ResourceWarning is emitted instead and the table is cleared.
        """
        super().close()
        if not sys.is_finalizing():
            for sig in list(self._signal_handlers):
                self.remove_signal_handler(sig)
        else:
            if self._signal_handlers:
                warnings.warn(f"Closing the loop {self!r} "
                              f"on interpreter shutdown "
                              f"stage, skipping signal handlers removal",
                              ResourceWarning,
                              source=self)
                self._signal_handlers.clear()

    def _process_self_data(self, data):
        """Dispatch each non-zero byte of *data* as a signal number."""
        for signum in data:
            if not signum:
                # ignore null bytes written by _write_to_self()
                continue
            self._handle_signal(signum)

    def add_signal_handler(self, sig, callback, *args):
        """Add a handler for a signal.  UNIX only.

        Raise TypeError if callback is a coroutine or coroutine function.
        Raise ValueError if the signal number is invalid or uncatchable.
        Raise RuntimeError if there is a problem setting up the handler.
        """
        if (coroutines.iscoroutine(callback) or
                coroutines.iscoroutinefunction(callback)):
            raise TypeError("coroutines cannot be used "
                            "with add_signal_handler()")
        self._check_signal(sig)
        self._check_closed()
        try:
            # set_wakeup_fd() raises ValueError if this is not the
            # main thread.  By calling it early we ensure that an
            # event loop running in another thread cannot add a signal
            # handler.
            signal.set_wakeup_fd(self._csock.fileno())
        except (ValueError, OSError) as exc:
            raise RuntimeError(str(exc))

        handle = events.Handle(callback, args, self, None)
        self._signal_handlers[sig] = handle

        try:
            # Register a dummy signal handler to ask Python to write the signal
            # number in the wakeup file descriptor. _process_self_data() will
            # read signal numbers from this file descriptor to handle signals.
            signal.signal(sig, _sighandler_noop)

            # Set SA_RESTART to limit EINTR occurrences.
            signal.siginterrupt(sig, False)
        except OSError as exc:
            # Roll back the registration made above.
            del self._signal_handlers[sig]
            if not self._signal_handlers:
                try:
                    signal.set_wakeup_fd(-1)
                except (ValueError, OSError) as nexc:
                    logger.info('set_wakeup_fd(-1) failed: %s', nexc)

            if exc.errno == errno.EINVAL:
                raise RuntimeError(f'sig {sig} cannot be caught')
            else:
                raise

    def _handle_signal(self, sig):
        """Internal helper that is the actual signal handler."""
        handle = self._signal_handlers.get(sig)
        if handle is None:
            return  # Assume it's some race condition.
        if handle._cancelled:
            self.remove_signal_handler(sig)  # Remove it properly.
        else:
            self._add_callback_signalsafe(handle)

    def remove_signal_handler(self, sig):
        """Remove a handler for a signal.  UNIX only.

        Return True if a signal handler was removed, False if not.
        """
        self._check_signal(sig)
        try:
            del self._signal_handlers[sig]
        except KeyError:
            return False

        # Restore the handler that is installed when none is registered:
        # default_int_handler for SIGINT, SIG_DFL for everything else.
        if sig == signal.SIGINT:
            handler = signal.default_int_handler
        else:
            handler = signal.SIG_DFL

        try:
            signal.signal(sig, handler)
        except OSError as exc:
            if exc.errno == errno.EINVAL:
                raise RuntimeError(f'sig {sig} cannot be caught')
            else:
                raise

        if not self._signal_handlers:
            try:
                signal.set_wakeup_fd(-1)
            except (ValueError, OSError) as exc:
                logger.info('set_wakeup_fd(-1) failed: %s', exc)

        return True

    def _check_signal(self, sig):
        """Internal helper to validate a signal.

        Raise TypeError if sig is not an int.
        Raise ValueError if the signal number is outside
        range(1, signal.NSIG).
        """
        if not isinstance(sig, int):
            raise TypeError(f'sig must be an int, not {sig!r}')

        if not (1 <= sig < signal.NSIG):
            raise ValueError(f'sig {sig} out of range(1, {signal.NSIG})')

    def _make_read_pipe_transport(self, pipe, protocol, waiter=None,
                                  extra=None):
        """Return a _UnixReadPipeTransport for *pipe*."""
        return _UnixReadPipeTransport(self, pipe, protocol, waiter, extra)

    def _make_write_pipe_transport(self, pipe, protocol, waiter=None,
                                   extra=None):
        """Return a _UnixWritePipeTransport for *pipe*."""
        return _UnixWritePipeTransport(self, pipe, protocol, waiter, extra)

    async def _make_subprocess_transport(self, protocol, args, shell,
                                         stdin, stdout, stderr, bufsize,
                                         extra=None, **kwargs):
        """Create a subprocess transport and register it with the watcher.

        The transport is created inside the child watcher's context so
        the watcher can track the new pid.  If waiting for the transport
        to become ready fails, the transport is closed and awaited
        before the exception is re-raised.
        """
        with events.get_child_watcher() as watcher:
            waiter = self.create_future()
            transp = _UnixSubprocessTransport(self, protocol, args, shell,
                                              stdin, stdout, stderr, bufsize,
                                              waiter=waiter, extra=extra,
                                              **kwargs)

            watcher.add_child_handler(transp.get_pid(),
                                      self._child_watcher_callback, transp)
            try:
                await waiter
            except Exception:
                transp.close()
                await transp._wait()
                raise

        return transp

    def _child_watcher_callback(self, pid, returncode, transp):
        """Forward a child's exit to its transport via the loop thread."""
        self.call_soon_threadsafe(transp._process_exited, returncode)

    async def create_unix_connection(
            self, protocol_factory, path=None, *,
            ssl=None, sock=None,
            server_hostname=None,
            ssl_handshake_timeout=None):
        """Connect to a UNIX domain stream socket.

        Exactly one of *path* and *sock* must be given.  Return a
        (transport, protocol) pair.  Raise ValueError on inconsistent
        arguments.
        """
        assert server_hostname is None or isinstance(server_hostname, str)
        if ssl:
            if server_hostname is None:
                raise ValueError(
                    'you have to pass server_hostname when using ssl')
        else:
            if server_hostname is not None:
                raise ValueError('server_hostname is only meaningful with ssl')
            if ssl_handshake_timeout is not None:
                raise ValueError(
                    'ssl_handshake_timeout is only meaningful with ssl')

        if path is not None:
            if sock is not None:
                raise ValueError(
                    'path and sock can not be specified at the same time')

            path = os.fspath(path)
            sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM, 0)
            try:
                sock.setblocking(False)
                await self.sock_connect(sock, path)
            except:
                # Close the socket we created, whatever went wrong
                # (including cancellation), then propagate.
                sock.close()
                raise

        else:
            if sock is None:
                raise ValueError('no path and sock were specified')
            if (sock.family != socket.AF_UNIX or
                    sock.type != socket.SOCK_STREAM):
                raise ValueError(
                    f'A UNIX Domain Stream Socket was expected, got {sock!r}')
            sock.setblocking(False)

        transport, protocol = await self._create_connection_transport(
            sock, protocol_factory, ssl, server_hostname,
            ssl_handshake_timeout=ssl_handshake_timeout)
        return transport, protocol

    async def create_unix_server(
            self, protocol_factory, path=None, *,
            sock=None, backlog=100, ssl=None,
            ssl_handshake_timeout=None,
            start_serving=True):
        """Create a server listening on a UNIX domain stream socket.

        Exactly one of *path* and *sock* must be given.  When *path*
        names an existing (non-abstract) socket file, it is removed
        before binding.  Return the base_events.Server object.
        """
        if isinstance(ssl, bool):
            raise TypeError('ssl argument must be an SSLContext or None')

        if ssl_handshake_timeout is not None and not ssl:
            raise ValueError(
                'ssl_handshake_timeout is only meaningful with ssl')

        if path is not None:
            if sock is not None:
                raise ValueError(
                    'path and sock can not be specified at the same time')

            path = os.fspath(path)
            sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)

            # Check for abstract socket. `str` and `bytes` paths are supported.
            if path[0] not in (0, '\x00'):
                try:
                    if stat.S_ISSOCK(os.stat(path).st_mode):
                        os.remove(path)
                except FileNotFoundError:
                    pass
                except OSError as err:
                    # Directory may have permissions only to create socket.
                    logger.error('Unable to check or remove stale UNIX socket '
                                 '%r: %r', path, err)

            try:
                sock.bind(path)
            except OSError as exc:
                sock.close()
                if exc.errno == errno.EADDRINUSE:
                    # Let's improve the error message by adding
                    # with what exact address it occurs.
                    msg = f'Address {path!r} is already in use'
                    raise OSError(errno.EADDRINUSE, msg) from None
                else:
                    raise
            except:
                sock.close()
                raise
        else:
            if sock is None:
                raise ValueError(
                    'path was not specified, and no sock specified')

            if (sock.family != socket.AF_UNIX or
                    sock.type != socket.SOCK_STREAM):
                raise ValueError(
                    f'A UNIX Domain Stream Socket was expected, got {sock!r}')

        sock.setblocking(False)
        server = base_events.Server(self, [sock], protocol_factory,
                                    ssl, backlog, ssl_handshake_timeout)
        if start_serving:
            server._start_serving()
            # Skip one loop iteration so that all 'loop.add_reader'
            # go through.
            await tasks.sleep(0, loop=self)

        return server

    async def _sock_sendfile_native(self, sock, file, offset, count):
        """Send *file* over *sock* using os.sendfile().

        Raise events.SendfileNotAvailableError when os.sendfile() is
        missing or *file* has no usable file descriptor.  Return the
        number of bytes sent (0 for an empty file).
        """
        try:
            os.sendfile
        except AttributeError:
            raise events.SendfileNotAvailableError(
                "os.sendfile() is not available")
        try:
            fileno = file.fileno()
        except (AttributeError, io.UnsupportedOperation):
            raise events.SendfileNotAvailableError("not a regular file")
        try:
            fsize = os.fstat(fileno).st_size
        except OSError:
            raise events.SendfileNotAvailableError("not a regular file")
        blocksize = count if count else fsize
        if not blocksize:
            return 0  # empty file

        fut = self.create_future()
        self._sock_sendfile_native_impl(fut, None, sock, fileno,
                                        offset, count, blocksize, 0)
        return await fut

    def _sock_sendfile_native_impl(self, fut, registered_fd, sock, fileno,
                                   offset, count, blocksize, total_sent):
        """Perform one os.sendfile() step and reschedule itself if needed.

        *registered_fd* is None on the first call and the socket fd on
        calls made through add_writer().  The outcome is reported
        through *fut*.
        """
        fd = sock.fileno()
        if registered_fd is not None:
            # Remove the callback early.  It should be rare that the
            # selector says the fd is ready but the call still returns
            # EAGAIN, and I am willing to take a hit in that case in
            # order to simplify the common case.
            self.remove_writer(registered_fd)
        if fut.cancelled():
            self._sock_sendfile_update_filepos(fileno, offset, total_sent)
            return
        if count:
            blocksize = count - total_sent
            if blocksize <= 0:
                self._sock_sendfile_update_filepos(fileno, offset, total_sent)
                fut.set_result(total_sent)
                return

        try:
            sent = os.sendfile(fd, fileno, offset, blocksize)
        except (BlockingIOError, InterruptedError):
            if registered_fd is None:
                self._sock_add_cancellation_callback(fut, sock)
            self.add_writer(fd, self._sock_sendfile_native_impl, fut,
                            fd, sock, fileno,
                            offset, count, blocksize, total_sent)
        except OSError as exc:
            if (registered_fd is not None and
                    exc.errno == errno.ENOTCONN and
                    type(exc) is not ConnectionError):
                # If we have an ENOTCONN and this isn't a first call to
                # sendfile(), i.e. the connection was closed in the middle
                # of the operation, normalize the error to ConnectionError
                # to make it consistent across all Posix systems.
                new_exc = ConnectionError(
                    "socket is not connected", errno.ENOTCONN)
                new_exc.__cause__ = exc
                exc = new_exc
            if total_sent == 0:
                # We can get here for different reasons, the main
                # one being 'file' is not a regular mmap(2)-like
                # file, in which case we'll fall back on using
                # plain send().
                err = events.SendfileNotAvailableError(
                    "os.sendfile call failed")
                self._sock_sendfile_update_filepos(fileno, offset, total_sent)
                fut.set_exception(err)
            else:
                self._sock_sendfile_update_filepos(fileno, offset, total_sent)
                fut.set_exception(exc)
        except Exception as exc:
            self._sock_sendfile_update_filepos(fileno, offset, total_sent)
            fut.set_exception(exc)
        else:
            if sent == 0:
                # EOF
                self._sock_sendfile_update_filepos(fileno, offset, total_sent)
                fut.set_result(total_sent)
            else:
                offset += sent
                total_sent += sent
                if registered_fd is None:
                    self._sock_add_cancellation_callback(fut, sock)
                self.add_writer(fd, self._sock_sendfile_native_impl, fut,
                                fd, sock, fileno,
                                offset, count, blocksize, total_sent)

    def _sock_sendfile_update_filepos(self, fileno, offset, total_sent):
        """Seek *fileno* to *offset* if any data was sent."""
        if total_sent > 0:
            os.lseek(fileno, offset, os.SEEK_SET)

    def _sock_add_cancellation_callback(self, fut, sock):
        """Unregister the writer for *sock* when *fut* is cancelled."""
        def cb(fut):
            if fut.cancelled():
                fd = sock.fileno()
                if fd != -1:
                    self.remove_writer(fd)
        fut.add_done_callback(cb)


class _UnixReadPipeTransport(transports.ReadTransport):
    """Read transport over a pipe, socket or character device fd."""

    max_size = 256 * 1024  # max bytes we read in one event loop iteration

    def __init__(self, loop, pipe, protocol, waiter=None, extra=None):
        super().__init__(extra)
        self._extra['pipe'] = pipe
        self._loop = loop
        self._pipe = pipe
        self._fileno = pipe.fileno()
        self._protocol = protocol
        self._closing = False

        mode = os.fstat(self._fileno).st_mode
        if not (stat.S_ISFIFO(mode) or
                stat.S_ISSOCK(mode) or
                stat.S_ISCHR(mode)):
            # Reset state so __del__ does not warn about / close the pipe.
            self._pipe = None
            self._fileno = None
            self._protocol = None
            raise ValueError("Pipe transport is for pipes/sockets only.")

        os.set_blocking(self._fileno, False)

        self._loop.call_soon(self._protocol.connection_made, self)
        # only start reading when connection_made() has been called
        self._loop.call_soon(self._loop._add_reader,
                             self._fileno, self._read_ready)
        if waiter is not None:
            # only wake up the waiter when connection_made() has been called
            self._loop.call_soon(futures._set_result_unless_cancelled,
                                 waiter, None)

    def __repr__(self):
        info = [self.__class__.__name__]
        if self._pipe is None:
            info.append('closed')
        elif self._closing:
            info.append('closing')
        info.append(f'fd={self._fileno}')
        selector = getattr(self._loop, '_selector', None)
        if self._pipe is not None and selector is not None:
            polling = selector_events._test_selector_event(
                selector, self._fileno, selectors.EVENT_READ)
            if polling:
                info.append('polling')
            else:
                info.append('idle')
        elif self._pipe is not None:
            info.append('open')
        else:
            info.append('closed')
        return '<{}>'.format(' '.join(info))

    def _read_ready(self):
        """Read available data and hand it to the protocol.

        An empty read means EOF: the transport starts closing and
        eof_received() / connection_lost() are scheduled.
        """
        try:
            data = os.read(self._fileno, self.max_size)
        except (BlockingIOError, InterruptedError):
            pass
        except OSError as exc:
            self._fatal_error(exc, 'Fatal read error on pipe transport')
        else:
            if data:
                self._protocol.data_received(data)
            else:
                if self._loop.get_debug():
                    logger.info("%r was closed by peer", self)
                self._closing = True
                self._loop._remove_reader(self._fileno)
                self._loop.call_soon(self._protocol.eof_received)
                self._loop.call_soon(self._call_connection_lost, None)

    def pause_reading(self):
        """Stop watching the pipe for readability.

        Does nothing when the transport is closing or closed.
        """
        # Once closing, the reader is already removed, and after
        # connection_lost() self._loop is None.
        if self._closing:
            return
        self._loop._remove_reader(self._fileno)

    def resume_reading(self):
        """Start watching the pipe for readability again.

        Does nothing when the transport is closing or closed.
        """
        # Never re-register a reader on an fd that is about to be closed.
        if self._closing:
            return
        self._loop._add_reader(self._fileno, self._read_ready)

    def set_protocol(self, protocol):
        self._protocol = protocol

    def get_protocol(self):
        return self._protocol

    def is_closing(self):
        return self._closing

    def close(self):
        if not self._closing:
            self._close(None)

    def __del__(self):
        if self._pipe is not None:
            warnings.warn(f"unclosed transport {self!r}", ResourceWarning,
                          source=self)
            self._pipe.close()

    def _fatal_error(self, exc, message='Fatal error on pipe transport'):
        # should be called by exception handler only
        if (isinstance(exc, OSError) and exc.errno == errno.EIO):
            if self._loop.get_debug():
                logger.debug("%r: %s", self, message, exc_info=True)
        else:
            self._loop.call_exception_handler({
                'message': message,
                'exception': exc,
                'transport': self,
                'protocol': self._protocol,
            })
        self._close(exc)

    def _close(self, exc):
        """Mark as closing, stop reading, schedule connection_lost(exc)."""
        self._closing = True
        self._loop._remove_reader(self._fileno)
        self._loop.call_soon(self._call_connection_lost, exc)

    def _call_connection_lost(self, exc):
        """Notify the protocol, then close the pipe and drop references."""
        try:
            self._protocol.connection_lost(exc)
        finally:
            self._pipe.close()
            self._pipe = None
            self._protocol = None
            self._loop = None


class _UnixWritePipeTransport(transports._FlowControlMixin,
                              transports.WriteTransport):
    """Write transport over a pipe, socket or character device fd."""

    def __init__(self, loop, pipe, protocol, waiter=None, extra=None):
        super().__init__(extra, loop)
        self._extra['pipe'] = pipe
        self._pipe = pipe
        self._fileno = pipe.fileno()
        self._protocol = protocol
        self._buffer = bytearray()
        self._conn_lost = 0
        self._closing = False  # Set when close() or write_eof() called.

        mode = os.fstat(self._fileno).st_mode
        is_char = stat.S_ISCHR(mode)
        is_fifo = stat.S_ISFIFO(mode)
        is_socket = stat.S_ISSOCK(mode)
        if not (is_char or is_fifo or is_socket):
            # Reset state so __del__ does not warn about / close the pipe.
            self._pipe = None
            self._fileno = None
            self._protocol = None
            raise ValueError("Pipe transport is only for "
                             "pipes, sockets and character devices")

        os.set_blocking(self._fileno, False)
        self._loop.call_soon(self._protocol.connection_made, self)

        # On AIX, the reader trick (to be notified when the read end of the
        # socket is closed) only works for sockets. On other platforms it
        # works for pipes and sockets. (Exception: OS X 10.4?  Issue #19294.)
        if is_socket or (is_fifo and not sys.platform.startswith("aix")):
            # only start reading when connection_made() has been called
            self._loop.call_soon(self._loop._add_reader,
                                 self._fileno, self._read_ready)

        if waiter is not None:
            # only wake up the waiter when connection_made() has been called
            self._loop.call_soon(futures._set_result_unless_cancelled,
                                 waiter, None)

    def __repr__(self):
        info = [self.__class__.__name__]
        if self._pipe is None:
            info.append('closed')
        elif self._closing:
            info.append('closing')
        info.append(f'fd={self._fileno}')
        selector = getattr(self._loop, '_selector', None)
        if self._pipe is not None and selector is not None:
            polling = selector_events._test_selector_event(
                selector, self._fileno, selectors.EVENT_WRITE)
            if polling:
                info.append('polling')
            else:
                info.append('idle')

            bufsize = self.get_write_buffer_size()
            info.append(f'bufsize={bufsize}')
        elif self._pipe is not None:
            info.append('open')
        else:
            info.append('closed')
        return '<{}>'.format(' '.join(info))

    def get_write_buffer_size(self):
        return len(self._buffer)

    def _read_ready(self):
        # Pipe was closed by peer.
        if self._loop.get_debug():
            logger.info("%r was closed by peer", self)
        if self._buffer:
            self._close(BrokenPipeError())
        else:
            self._close()

    def write(self, data):
        """Write *data*, buffering whatever cannot be written immediately."""
        assert isinstance(data, (bytes, bytearray, memoryview)), repr(data)
        if isinstance(data, bytearray):
            data = memoryview(data)
        if not data:
            return

        if self._conn_lost or self._closing:
            if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:
                logger.warning('pipe closed by peer or '
                               'os.write(pipe, data) raised exception.')
            self._conn_lost += 1
            return

        if not self._buffer:
            # Attempt to send it right away first.
            try:
                n = os.write(self._fileno, data)
            except (BlockingIOError, InterruptedError):
                n = 0
            except Exception as exc:
                self._conn_lost += 1
                self._fatal_error(exc, 'Fatal write error on pipe transport')
                return
            if n == len(data):
                return
            elif n > 0:
                data = memoryview(data)[n:]
            self._loop._add_writer(self._fileno, self._write_ready)

        self._buffer += data
        self._maybe_pause_protocol()

    def _write_ready(self):
        """Flush as much of the buffer as the pipe accepts."""
        assert self._buffer, 'Data should not be empty'

        try:
            n = os.write(self._fileno, self._buffer)
        except (BlockingIOError, InterruptedError):
            pass
        except Exception as exc:
            self._buffer.clear()
            self._conn_lost += 1
            # Remove the writer here; _fatal_error() -> _close() will not
            # do it because _buffer is now empty.
            self._loop._remove_writer(self._fileno)
            self._fatal_error(exc, 'Fatal write error on pipe transport')
        else:
            if n == len(self._buffer):
                self._buffer.clear()
                self._loop._remove_writer(self._fileno)
                self._maybe_resume_protocol()  # May append to buffer.
                if self._closing:
                    self._loop._remove_reader(self._fileno)
                    self._call_connection_lost(None)
                return
            elif n > 0:
                del self._buffer[:n]

    def can_write_eof(self):
        return True

    def write_eof(self):
        """Close the transport once buffered data has been flushed."""
        if self._closing:
            return
        assert self._pipe
        self._closing = True
        if not self._buffer:
            self._loop._remove_reader(self._fileno)
            self._loop.call_soon(self._call_connection_lost, None)

    def set_protocol(self, protocol):
        self._protocol = protocol

    def get_protocol(self):
        return self._protocol

    def is_closing(self):
        return self._closing

    def close(self):
        if self._pipe is not None and not self._closing:
            # write_eof() is all we need to close the write pipe
            self.write_eof()

    def __del__(self):
        if self._pipe is not None:
            warnings.warn(f"unclosed transport {self!r}", ResourceWarning,
                          source=self)
            self._pipe.close()

    def abort(self):
        self._close(None)

    def _fatal_error(self, exc, message='Fatal error on pipe transport'):
        # should be called by exception handler only
        if isinstance(exc, base_events._FATAL_ERROR_IGNORE):
            if self._loop.get_debug():
                logger.debug("%r: %s", self, message, exc_info=True)
        else:
            self._loop.call_exception_handler({
                'message': message,
                'exception': exc,
                'transport': self,
                'protocol': self._protocol,
            })
        self._close(exc)

    def _close(self, exc=None):
        """Discard buffered data, stop I/O, schedule connection_lost(exc)."""
        self._closing = True
        if self._buffer:
            self._loop._remove_writer(self._fileno)
        self._buffer.clear()
        self._loop._remove_reader(self._fileno)
        self._loop.call_soon(self._call_connection_lost, exc)

    def _call_connection_lost(self, exc):
        """Notify the protocol, then close the pipe and drop references."""
        try:
            self._protocol.connection_lost(exc)
        finally:
            self._pipe.close()
            self._pipe = None
            self._protocol = None
            self._loop = None


class _UnixSubprocessTransport(base_subprocess.BaseSubprocessTransport):
    """Subprocess transport that uses a socket pair for the child's stdin."""

    def _start(self, args, shell, stdin, stdout, stderr, bufsize, **kwargs):
        """Spawn the child process and store it in self._proc.

        If spawning fails, both ends of the stdin socket pair (when one
        was created) are closed before the exception propagates.
        """
        stdin_w = None
        if stdin == subprocess.PIPE:
            # Use a socket pair for stdin, since not all platforms
            # support selecting read events on the write end of a
            # socket (which we use in order to detect closing of the
            # other end).  Notably this is needed on AIX, and works
            # just fine on other platforms.
            stdin, stdin_w = socket.socketpair()
        try:
            self._proc = subprocess.Popen(
                args, shell=shell, stdin=stdin, stdout=stdout, stderr=stderr,
                universal_newlines=False, bufsize=bufsize, **kwargs)
            if stdin_w is not None:
                stdin.close()
                self._proc.stdin = open(stdin_w.detach(), 'wb',
                                        buffering=bufsize)
                # Ownership of the fd moved to self._proc.stdin.
                stdin_w = None
        finally:
            if stdin_w is not None:
                # Popen() (or open()) failed: do not leak the socket pair.
                stdin.close()
                stdin_w.close()


class AbstractChildWatcher:
    """Abstract base class for monitoring child processes.

    Objects derived from this class monitor a collection of subprocesses and
    report their termination or interruption by a signal.

    New callbacks are registered with .add_child_handler(). Starting a new
    process must be done within a 'with' block to allow the watcher to suspend
    its activity until the new process is fully registered (this is needed to
    prevent a race condition in some implementations).

    Example:
        with watcher:
            proc = subprocess.Popen("sleep 1")
            watcher.add_child_handler(proc.pid, callback)

    Notes:
        Implementations of this class must be thread-safe.

        Since child watcher objects may catch the SIGCHLD signal and call
        waitpid(-1), there should be only one active object per process.
    """

    def add_child_handler(self, pid, callback, *args):
        """Register a new child handler.

        Arrange for callback(pid, returncode, *args) to be called when
        process 'pid' terminates. Specifying another callback for the same
        process replaces the previous handler.

        Note: callback() must be thread-safe.
        """
        raise NotImplementedError()

    def remove_child_handler(self, pid):
        """Removes the handler for process 'pid'.

        The function returns True if the handler was successfully removed,
        False if there was nothing to remove."""

        raise NotImplementedError()

    def attach_loop(self, loop):
        """Attach the watcher to an event loop.

        If the watcher was previously attached to an event loop, then it is
        first detached before attaching to the new loop.

        Note: loop may be None.
        """
        raise NotImplementedError()

    def close(self):
        """Close the watcher.

        This must be called to make sure that any underlying resource is freed.
        """
        raise NotImplementedError()

    def __enter__(self):
        """Enter the watcher's context and allow starting new processes

        This function must return self"""
        raise NotImplementedError()

    def __exit__(self, a, b, c):
        """Exit the watcher's context"""
        raise NotImplementedError()


class BaseChildWatcher(AbstractChildWatcher):
    """Shared SIGCHLD plumbing for the concrete child watchers."""

    def __init__(self):
        self._loop = None
        # Maps pid -> (callback, args).
        self._callbacks = {}

    def close(self):
        self.attach_loop(None)

    def _do_waitpid(self, expected_pid):
        raise NotImplementedError()

    def _do_waitpid_all(self):
        raise NotImplementedError()

    def attach_loop(self, loop):
        """Move the SIGCHLD handler from the current loop to *loop*.

        Warn (RuntimeWarning) when detaching while handlers are pending.
        """
        assert loop is None or isinstance(loop, events.AbstractEventLoop)

        if self._loop is not None and loop is None and self._callbacks:
            warnings.warn(
                'A loop is being detached '
                'from a child watcher with pending handlers',
                RuntimeWarning)

        if self._loop is not None:
            self._loop.remove_signal_handler(signal.SIGCHLD)

        self._loop = loop
        if loop is not None:
            loop.add_signal_handler(signal.SIGCHLD, self._sig_chld)

            # Prevent a race condition in case a child terminated
            # during the switch.
            self._do_waitpid_all()

    def _sig_chld(self):
        """SIGCHLD callback: reap children, routing errors to the loop."""
        try:
            self._do_waitpid_all()
        except Exception as exc:
            # self._loop should always be available here
            # as '_sig_chld' is added as a signal handler
            # in 'attach_loop'
            self._loop.call_exception_handler({
                'message': 'Unknown exception in SIGCHLD handler',
                'exception': exc,
            })

    def _compute_returncode(self, status):
        """Translate a waitpid() status into a return code."""
        if os.WIFSIGNALED(status):
            # The child process died because of a signal.
            return -os.WTERMSIG(status)
        elif os.WIFEXITED(status):
            # The child process exited (e.g sys.exit()).
            return os.WEXITSTATUS(status)
        else:
            # The child exited, but we don't understand its status.
            # This shouldn't happen, but if it does, let's just
            # return that status; perhaps that helps debug it.
            return status


class SafeChildWatcher(BaseChildWatcher):
    """'Safe' child watcher implementation.

    This implementation avoids disrupting other code spawning processes by
    polling explicitly each process in the SIGCHLD handler instead of calling
    os.waitpid(-1).

    This is a safe solution but it has a significant overhead when handling a
    big number of children (O(n) each time SIGCHLD is raised)
    """

    def close(self):
        self._callbacks.clear()
        super().close()

    def __enter__(self):
        return self

    def __exit__(self, a, b, c):
        pass

    def add_child_handler(self, pid, callback, *args):
        if self._loop is None:
            raise RuntimeError(
                "Cannot add child handler, "
                "the child watcher does not have a loop attached")

        self._callbacks[pid] = (callback, args)

        # Prevent a race condition in case the child is already terminated.
        self._do_waitpid(pid)

    def remove_child_handler(self, pid):
        try:
            del self._callbacks[pid]
            return True
        except KeyError:
            return False

    def _do_waitpid_all(self):
        # Iterate over a copy: _do_waitpid() pops entries from _callbacks.
        for pid in list(self._callbacks):
            self._do_waitpid(pid)

    def _do_waitpid(self, expected_pid):
        """Poll *expected_pid* and fire its callback if it has exited."""
        assert expected_pid > 0

        try:
            pid, status = os.waitpid(expected_pid, os.WNOHANG)
        except ChildProcessError:
            # The child process is already reaped
            # (may happen if waitpid() is called elsewhere).
            pid = expected_pid
            returncode = 255
            logger.warning(
                "Unknown child process pid %d, will report returncode 255",
                pid)
        else:
            if pid == 0:
                # The child process is still alive.
                return

            returncode = self._compute_returncode(status)
            if self._loop.get_debug():
                logger.debug('process %s exited with returncode %s',
                             expected_pid, returncode)

        try:
            callback, args = self._callbacks.pop(pid)
        except KeyError:  # pragma: no cover
            # May happen if .remove_child_handler() is called
            # after os.waitpid() returns.
            if self._loop.get_debug():
                logger.warning("Child watcher got an unexpected pid: %r",
                               pid, exc_info=True)
        else:
            callback(pid, returncode, *args)


class FastChildWatcher(BaseChildWatcher):
    """'Fast' child watcher implementation.

    This implementation reaps every terminated processes by calling
    os.waitpid(-1) directly, possibly breaking other code spawning processes
    and waiting for their termination.

    There is no noticeable overhead when handling a big number of children
    (O(1) each time a child terminates).
    """
    def __init__(self):
        super().__init__()
        self._lock = threading.Lock()
        # Maps pid -> returncode for children reaped before registration.
        self._zombies = {}
        # Number of 'with watcher:' blocks currently entered.
        self._forks = 0

    def close(self):
        self._callbacks.clear()
        self._zombies.clear()
        super().close()

    def __enter__(self):
        with self._lock:
            self._forks += 1

            return self

    def __exit__(self, a, b, c):
        with self._lock:
            self._forks -= 1

            if self._forks or not self._zombies:
                return

            collateral_victims = str(self._zombies)
            self._zombies.clear()

        logger.warning(
            "Caught subprocesses termination from unknown pids: %s",
            collateral_victims)

    def add_child_handler(self, pid, callback, *args):
        assert self._forks, "Must use the context manager"

        if self._loop is None:
            raise RuntimeError(
                "Cannot add child handler, "
                "the child watcher does not have a loop attached")

        with self._lock:
            try:
                returncode = self._zombies.pop(pid)
            except KeyError:
                # The child is running.
                self._callbacks[pid] = callback, args
                return

        # The child is dead already. We can fire the callback.
        callback(pid, returncode, *args)

    def remove_child_handler(self, pid):
        try:
            del self._callbacks[pid]
            return True
        except KeyError:
            return False

    def _do_waitpid_all(self):
        # Because of signal coalescing, we must keep calling waitpid() as
        # long as we're able to reap a child.
        while True:
            try:
                pid, status = os.waitpid(-1, os.WNOHANG)
            except ChildProcessError:
                # No more child processes exist.
                return
            else:
                if pid == 0:
                    # A child process is still alive.
                    return

                returncode = self._compute_returncode(status)

            with self._lock:
                try:
                    callback, args = self._callbacks.pop(pid)
                except KeyError:
                    # unknown child
                    if self._forks:
                        # It may not be registered yet.
                        self._zombies[pid] = returncode
                        if self._loop.get_debug():
                            logger.debug('unknown process %s exited '
                                         'with returncode %s',
                                         pid, returncode)
                        continue
                    callback = None
                else:
                    if self._loop.get_debug():
                        logger.debug('process %s exited with returncode %s',
                                     pid, returncode)

            if callback is None:
                logger.warning(
                    "Caught subprocess termination from unknown pid: "
                    "%d -> %d", pid, returncode)
            else:
                callback(pid, returncode, *args)


class _UnixDefaultEventLoopPolicy(events.BaseDefaultEventLoopPolicy):
    """UNIX event loop policy with a watcher for child processes."""
    _loop_factory = _UnixSelectorEventLoop

    def __init__(self):
        super().__init__()
        self._watcher = None

    def _init_watcher(self):
        """Create the default SafeChildWatcher if none is set yet.

        When called from the main thread, the watcher is attached to
        this thread's current loop.
        """
        with events._lock:
            if self._watcher is None:  # pragma: no branch
                self._watcher = SafeChildWatcher()
                if threading.current_thread() is threading.main_thread():
                    self._watcher.attach_loop(self._local._loop)

    def set_event_loop(self, loop):
        """Set the event loop.

        As a side effect, if a child watcher was set before, then calling
        .set_event_loop() from the main thread will call .attach_loop(loop) on
        the child watcher.
        """

        super().set_event_loop(loop)

        if (self._watcher is not None and
                threading.current_thread() is threading.main_thread()):
            self._watcher.attach_loop(loop)

    def get_child_watcher(self):
        """Get the watcher for child processes.

        If not yet set, a SafeChildWatcher object is automatically created.
        """
        if self._watcher is None:
            self._init_watcher()

        return self._watcher

    def set_child_watcher(self, watcher):
        """Set the watcher for child processes."""

        assert watcher is None or isinstance(watcher, AbstractChildWatcher)

        if self._watcher is not None:
            self._watcher.close()

        self._watcher = watcher


SelectorEventLoop = _UnixSelectorEventLoop
DefaultEventLoopPolicy = _UnixDefaultEventLoopPolicy