1"""Selector event loop for Unix with signal handling."""
2
3import errno
4import io
5import os
6import selectors
7import signal
8import socket
9import stat
10import subprocess
11import sys
12import threading
13import warnings
14
15
16from . import base_events
17from . import base_subprocess
18from . import constants
19from . import coroutines
20from . import events
21from . import futures
22from . import selector_events
23from . import tasks
24from . import transports
25from .log import logger
26
27
# Public names exported by ``from <this module> import *``.
__all__ = (
    'SelectorEventLoop',
    'AbstractChildWatcher',
    'SafeChildWatcher',
    'FastChildWatcher',
    'DefaultEventLoopPolicy',
)


# Refuse to be imported on Windows: the signal handling this module is
# built around is not available there.
if sys.platform == 'win32':  # pragma: no cover
    raise ImportError('Signals are not really supported on Windows')
37
38
39def _sighandler_noop(signum, frame):
40    """Dummy signal handler."""
41    pass
42
43
class _UnixSelectorEventLoop(selector_events.BaseSelectorEventLoop):
    """Unix event loop.

    Adds signal handling and UNIX Domain Socket support to SelectorEventLoop.
    """

    def __init__(self, selector=None):
        super().__init__(selector)
        # Maps a signal number to the events.Handle wrapping its callback.
        self._signal_handlers = {}

    def close(self):
        """Close the loop and drop the signal handlers it registered.

        While the interpreter is finalizing the handlers are not
        uninstalled; a ResourceWarning is emitted instead (only if any
        handler is registered) and the registry is simply cleared.
        """
        super().close()
        if not sys.is_finalizing():
            # Iterate over a copy: remove_signal_handler() mutates the dict.
            for sig in list(self._signal_handlers):
                self.remove_signal_handler(sig)
        else:
            if self._signal_handlers:
                warnings.warn(f"Closing the loop {self!r} "
                              f"on interpreter shutdown "
                              f"stage, skipping signal handlers removal",
                              ResourceWarning,
                              source=self)
                self._signal_handlers.clear()

    def _process_self_data(self, data):
        """Treat every non-zero byte of *data* as a received signal number."""
        for signum in data:
            if not signum:
                # ignore null bytes written by _write_to_self()
                continue
            self._handle_signal(signum)

    def add_signal_handler(self, sig, callback, *args):
        """Add a handler for a signal.  UNIX only.

        Raise TypeError if *sig* is not an int or if *callback* is a
        coroutine or a coroutine function.
        Raise ValueError if the signal number is out of range.
        Raise RuntimeError if the wakeup file descriptor cannot be set
        (for example when not called from the main thread) or if the
        signal cannot be caught.
        """
        if (coroutines.iscoroutine(callback) or
                coroutines.iscoroutinefunction(callback)):
            raise TypeError("coroutines cannot be used "
                            "with add_signal_handler()")
        self._check_signal(sig)
        self._check_closed()
        try:
            # set_wakeup_fd() raises ValueError if this is not the
            # main thread.  By calling it early we ensure that an
            # event loop running in another thread cannot add a signal
            # handler.
            signal.set_wakeup_fd(self._csock.fileno())
        except (ValueError, OSError) as exc:
            raise RuntimeError(str(exc))

        handle = events.Handle(callback, args, self, None)
        self._signal_handlers[sig] = handle

        try:
            # Register a dummy signal handler to ask Python to write the signal
            # number in the wakeup file descriptor. _process_self_data() will
            # read signal numbers from this file descriptor to handle signals.
            signal.signal(sig, _sighandler_noop)

            # Set SA_RESTART to limit EINTR occurrences.
            signal.siginterrupt(sig, False)
        except OSError as exc:
            # Roll back the registration made above.
            del self._signal_handlers[sig]
            if not self._signal_handlers:
                # No handler left: stop using our socket as the wakeup fd.
                try:
                    signal.set_wakeup_fd(-1)
                except (ValueError, OSError) as nexc:
                    logger.info('set_wakeup_fd(-1) failed: %s', nexc)

            if exc.errno == errno.EINVAL:
                raise RuntimeError(f'sig {sig} cannot be caught')
            else:
                raise

    def _handle_signal(self, sig):
        """Internal helper that is the actual signal handler."""
        handle = self._signal_handlers.get(sig)
        if handle is None:
            return  # Assume it's some race condition.
        if handle._cancelled:
            self.remove_signal_handler(sig)  # Remove it properly.
        else:
            self._add_callback_signalsafe(handle)

    def remove_signal_handler(self, sig):
        """Remove a handler for a signal.  UNIX only.

        Return True if a signal handler was removed, False if not.

        The signal is reset to signal.default_int_handler for SIGINT and
        to SIG_DFL for every other signal.  Raise TypeError or ValueError
        if *sig* is not a valid signal number, RuntimeError if resetting
        the signal fails with EINVAL.
        """
        self._check_signal(sig)
        try:
            del self._signal_handlers[sig]
        except KeyError:
            return False

        if sig == signal.SIGINT:
            handler = signal.default_int_handler
        else:
            handler = signal.SIG_DFL

        try:
            signal.signal(sig, handler)
        except OSError as exc:
            if exc.errno == errno.EINVAL:
                raise RuntimeError(f'sig {sig} cannot be caught')
            else:
                raise

        if not self._signal_handlers:
            # Last handler removed: release the wakeup file descriptor.
            try:
                signal.set_wakeup_fd(-1)
            except (ValueError, OSError) as exc:
                logger.info('set_wakeup_fd(-1) failed: %s', exc)

        return True

    def _check_signal(self, sig):
        """Internal helper to validate a signal.

        Raise TypeError if *sig* is not an int.
        Raise ValueError if *sig* is outside range(1, signal.NSIG).
        """
        if not isinstance(sig, int):
            raise TypeError(f'sig must be an int, not {sig!r}')

        if not (1 <= sig < signal.NSIG):
            raise ValueError(f'sig {sig} out of range(1, {signal.NSIG})')

    def _make_read_pipe_transport(self, pipe, protocol, waiter=None,
                                  extra=None):
        """Return a _UnixReadPipeTransport reading from *pipe*."""
        return _UnixReadPipeTransport(self, pipe, protocol, waiter, extra)

    def _make_write_pipe_transport(self, pipe, protocol, waiter=None,
                                   extra=None):
        """Return a _UnixWritePipeTransport writing to *pipe*."""
        return _UnixWritePipeTransport(self, pipe, protocol, waiter, extra)

    async def _make_subprocess_transport(self, protocol, args, shell,
                                         stdin, stdout, stderr, bufsize,
                                         extra=None, **kwargs):
        """Create a subprocess transport and register it with the watcher.

        The transport is created and its pid registered inside the child
        watcher's ``with`` block.  If waiting on the transport's waiter
        future raises, the transport is closed and waited for before the
        exception is re-raised.
        """
        with events.get_child_watcher() as watcher:
            waiter = self.create_future()
            transp = _UnixSubprocessTransport(self, protocol, args, shell,
                                              stdin, stdout, stderr, bufsize,
                                              waiter=waiter, extra=extra,
                                              **kwargs)

            watcher.add_child_handler(transp.get_pid(),
                                      self._child_watcher_callback, transp)
            try:
                await waiter
            except Exception:
                transp.close()
                await transp._wait()
                raise

        return transp

    def _child_watcher_callback(self, pid, returncode, transp):
        """Forward a child's exit status to its transport via the loop."""
        self.call_soon_threadsafe(transp._process_exited, returncode)

    async def create_unix_connection(
            self, protocol_factory, path=None, *,
            ssl=None, sock=None,
            server_hostname=None,
            ssl_handshake_timeout=None):
        """Connect to a UNIX domain stream socket.

        Exactly one of *path* and *sock* must be given.  With *path* a new
        non-blocking AF_UNIX socket is created and connected; with *sock*
        the given socket (which must be an AF_UNIX stream socket) is put
        in non-blocking mode and used as is.

        Return a (transport, protocol) pair.

        Raise ValueError if the arguments are inconsistent: *ssl* without
        *server_hostname*, *server_hostname* or *ssl_handshake_timeout*
        without *ssl*, both or neither of *path* and *sock*, or a *sock*
        of the wrong family/type.
        """
        assert server_hostname is None or isinstance(server_hostname, str)
        if ssl:
            if server_hostname is None:
                raise ValueError(
                    'you have to pass server_hostname when using ssl')
        else:
            if server_hostname is not None:
                raise ValueError('server_hostname is only meaningful with ssl')
            if ssl_handshake_timeout is not None:
                raise ValueError(
                    'ssl_handshake_timeout is only meaningful with ssl')

        if path is not None:
            if sock is not None:
                raise ValueError(
                    'path and sock can not be specified at the same time')

            path = os.fspath(path)
            sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM, 0)
            try:
                sock.setblocking(False)
                await self.sock_connect(sock, path)
            except:
                # We created this socket, so we must not leak it.
                sock.close()
                raise

        else:
            if sock is None:
                raise ValueError('no path and sock were specified')
            if (sock.family != socket.AF_UNIX or
                    sock.type != socket.SOCK_STREAM):
                raise ValueError(
                    f'A UNIX Domain Stream Socket was expected, got {sock!r}')
            sock.setblocking(False)

        transport, protocol = await self._create_connection_transport(
            sock, protocol_factory, ssl, server_hostname,
            ssl_handshake_timeout=ssl_handshake_timeout)
        return transport, protocol

    async def create_unix_server(
            self, protocol_factory, path=None, *,
            sock=None, backlog=100, ssl=None,
            ssl_handshake_timeout=None,
            start_serving=True):
        """Create a server listening on a UNIX domain stream socket.

        Exactly one of *path* and *sock* must be given.  With *path*, a
        stale socket file at that location is removed (unless the path
        denotes an abstract socket) and a new socket is bound to it; with
        *sock* the given AF_UNIX stream socket is used.

        Return the base_events.Server object.  When *start_serving* is
        true the server is started before returning.

        Raise TypeError if *ssl* is a bool.  Raise ValueError if
        *ssl_handshake_timeout* is given without *ssl*, if both or neither
        of *path* and *sock* are given, or if *sock* has the wrong
        family/type.  Raise OSError if binding fails; for EADDRINUSE the
        message names the offending address.
        """
        if isinstance(ssl, bool):
            raise TypeError('ssl argument must be an SSLContext or None')

        if ssl_handshake_timeout is not None and not ssl:
            raise ValueError(
                'ssl_handshake_timeout is only meaningful with ssl')

        if path is not None:
            if sock is not None:
                raise ValueError(
                    'path and sock can not be specified at the same time')

            path = os.fspath(path)

            # Check for abstract socket. `str` and `bytes` paths are supported.
            # This runs before the socket is created so that an unexpected
            # failure here (e.g. IndexError on an empty path) cannot leak
            # a file descriptor.
            if path[0] not in (0, '\x00'):
                try:
                    if stat.S_ISSOCK(os.stat(path).st_mode):
                        os.remove(path)
                except FileNotFoundError:
                    pass
                except OSError as err:
                    # Directory may have permissions only to create socket.
                    logger.error('Unable to check or remove stale UNIX socket '
                                 '%r: %r', path, err)

            sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
            try:
                sock.bind(path)
            except OSError as exc:
                sock.close()
                if exc.errno == errno.EADDRINUSE:
                    # Let's improve the error message by adding
                    # with what exact address it occurs.
                    msg = f'Address {path!r} is already in use'
                    raise OSError(errno.EADDRINUSE, msg) from None
                else:
                    raise
            except:
                sock.close()
                raise
        else:
            if sock is None:
                raise ValueError(
                    'path was not specified, and no sock specified')

            if (sock.family != socket.AF_UNIX or
                    sock.type != socket.SOCK_STREAM):
                raise ValueError(
                    f'A UNIX Domain Stream Socket was expected, got {sock!r}')

        sock.setblocking(False)
        server = base_events.Server(self, [sock], protocol_factory,
                                    ssl, backlog, ssl_handshake_timeout)
        if start_serving:
            server._start_serving()
            # Skip one loop iteration so that all 'loop.add_reader'
            # go through.
            await tasks.sleep(0, loop=self)

        return server

    async def _sock_sendfile_native(self, sock, file, offset, count):
        """Send *file* over *sock* using os.sendfile().

        Return the total number of bytes sent (0 for an empty file when
        *count* is falsy).  Raise events.SendfileNotAvailableError if
        os.sendfile() does not exist, if *file* has no usable fileno(),
        or if os.fstat() on its descriptor fails.
        """
        try:
            os.sendfile
        except AttributeError:
            raise events.SendfileNotAvailableError(
                "os.sendfile() is not available")
        try:
            fileno = file.fileno()
        except (AttributeError, io.UnsupportedOperation):
            raise events.SendfileNotAvailableError("not a regular file")
        try:
            fsize = os.fstat(fileno).st_size
        except OSError:
            raise events.SendfileNotAvailableError("not a regular file")
        # Without an explicit count, try to send the whole file at once.
        blocksize = count if count else fsize
        if not blocksize:
            return 0  # empty file

        fut = self.create_future()
        self._sock_sendfile_native_impl(fut, None, sock, fileno,
                                        offset, count, blocksize, 0)
        return await fut

    def _sock_sendfile_native_impl(self, fut, registered_fd, sock, fileno,
                                   offset, count, blocksize, total_sent):
        """Perform one os.sendfile() step and reschedule itself if needed.

        Called directly for the first step (with *registered_fd* set to
        None) and afterwards as a writer callback, in which case
        *registered_fd* is the descriptor the callback was registered on.
        The outcome is reported through *fut*: its result is the total
        number of bytes sent, its exception is the failure.
        """
        fd = sock.fileno()
        if registered_fd is not None:
            # Remove the callback early.  It should be rare that the
            # selector says the fd is ready but the call still returns
            # EAGAIN, and I am willing to take a hit in that case in
            # order to simplify the common case.
            self.remove_writer(registered_fd)
        if fut.cancelled():
            self._sock_sendfile_update_filepos(fileno, offset, total_sent)
            return
        if count:
            # Never send more than what is left of the requested count.
            blocksize = count - total_sent
            if blocksize <= 0:
                self._sock_sendfile_update_filepos(fileno, offset, total_sent)
                fut.set_result(total_sent)
                return

        try:
            sent = os.sendfile(fd, fileno, offset, blocksize)
        except (BlockingIOError, InterruptedError):
            # Socket not ready: retry once it becomes writable.
            if registered_fd is None:
                self._sock_add_cancellation_callback(fut, sock)
            self.add_writer(fd, self._sock_sendfile_native_impl, fut,
                            fd, sock, fileno,
                            offset, count, blocksize, total_sent)
        except OSError as exc:
            if (registered_fd is not None and
                    exc.errno == errno.ENOTCONN and
                    type(exc) is not ConnectionError):
                # If we have an ENOTCONN and this isn't a first call to
                # sendfile(), i.e. the connection was closed in the middle
                # of the operation, normalize the error to ConnectionError
                # to make it consistent across all Posix systems.
                new_exc = ConnectionError(
                    "socket is not connected", errno.ENOTCONN)
                new_exc.__cause__ = exc
                exc = new_exc
            if total_sent == 0:
                # We can get here for different reasons, the main
                # one being 'file' is not a regular mmap(2)-like
                # file, in which case we'll fall back on using
                # plain send().
                err = events.SendfileNotAvailableError(
                    "os.sendfile call failed")
                self._sock_sendfile_update_filepos(fileno, offset, total_sent)
                fut.set_exception(err)
            else:
                self._sock_sendfile_update_filepos(fileno, offset, total_sent)
                fut.set_exception(exc)
        except Exception as exc:
            self._sock_sendfile_update_filepos(fileno, offset, total_sent)
            fut.set_exception(exc)
        else:
            if sent == 0:
                # EOF
                self._sock_sendfile_update_filepos(fileno, offset, total_sent)
                fut.set_result(total_sent)
            else:
                offset += sent
                total_sent += sent
                if registered_fd is None:
                    self._sock_add_cancellation_callback(fut, sock)
                self.add_writer(fd, self._sock_sendfile_native_impl, fut,
                                fd, sock, fileno,
                                offset, count, blocksize, total_sent)

    def _sock_sendfile_update_filepos(self, fileno, offset, total_sent):
        """Seek *fileno* to *offset* if any data has been sent."""
        if total_sent > 0:
            os.lseek(fileno, offset, os.SEEK_SET)

    def _sock_add_cancellation_callback(self, fut, sock):
        """Unregister the writer for *sock* if *fut* ends up cancelled."""
        def cb(fut):
            if fut.cancelled():
                fd = sock.fileno()
                # Skip removal when fileno() reports -1.
                if fd != -1:
                    self.remove_writer(fd)
        fut.add_done_callback(cb)
420
421
class _UnixReadPipeTransport(transports.ReadTransport):
    """Read transport over a FIFO, socket or character device.

    The descriptor is switched to non-blocking mode and registered with
    the loop as a reader; received data is handed to the protocol.
    """

    max_size = 256 * 1024  # max bytes we read in one event loop iteration

    def __init__(self, loop, pipe, protocol, waiter=None, extra=None):
        """Set up the transport and schedule protocol.connection_made().

        Raise ValueError if *pipe* is not a FIFO, a socket or a character
        device.  If *waiter* is given, it is resolved (unless cancelled)
        after connection_made() has been scheduled.
        """
        super().__init__(extra)
        self._extra['pipe'] = pipe
        self._loop = loop
        self._pipe = pipe
        self._fileno = pipe.fileno()
        self._protocol = protocol
        self._closing = False

        mode = os.fstat(self._fileno).st_mode
        if not (stat.S_ISFIFO(mode) or
                stat.S_ISSOCK(mode) or
                stat.S_ISCHR(mode)):
            # Reset the fields so that __del__() does not warn about, or
            # close, a pipe this transport never took over.
            self._pipe = None
            self._fileno = None
            self._protocol = None
            raise ValueError("Pipe transport is for pipes/sockets only.")

        os.set_blocking(self._fileno, False)

        self._loop.call_soon(self._protocol.connection_made, self)
        # only start reading when connection_made() has been called
        self._loop.call_soon(self._loop._add_reader,
                             self._fileno, self._read_ready)
        if waiter is not None:
            # only wake up the waiter when connection_made() has been called
            self._loop.call_soon(futures._set_result_unless_cancelled,
                                 waiter, None)

    def __repr__(self):
        info = [self.__class__.__name__]
        if self._pipe is None:
            info.append('closed')
        elif self._closing:
            info.append('closing')
        info.append(f'fd={self._fileno}')
        selector = getattr(self._loop, '_selector', None)
        if self._pipe is not None and selector is not None:
            polling = selector_events._test_selector_event(
                selector, self._fileno, selectors.EVENT_READ)
            if polling:
                info.append('polling')
            else:
                info.append('idle')
        elif self._pipe is not None:
            info.append('open')
        else:
            info.append('closed')
        return '<{}>'.format(' '.join(info))

    def _read_ready(self):
        """Reader callback: read once and dispatch data, EOF or errors."""
        try:
            data = os.read(self._fileno, self.max_size)
        except (BlockingIOError, InterruptedError):
            pass
        except OSError as exc:
            self._fatal_error(exc, 'Fatal read error on pipe transport')
        else:
            if data:
                self._protocol.data_received(data)
            else:
                # An empty read means the writing side was closed.
                if self._loop.get_debug():
                    logger.info("%r was closed by peer", self)
                self._closing = True
                self._loop._remove_reader(self._fileno)
                self._loop.call_soon(self._protocol.eof_received)
                self._loop.call_soon(self._call_connection_lost, None)

    def pause_reading(self):
        """Stop watching the pipe for incoming data.

        Does nothing once the transport is closing or closed: the reader
        has already been removed and the loop reference may be gone.
        """
        if self._closing:
            return
        self._loop._remove_reader(self._fileno)

    def resume_reading(self):
        """Start watching the pipe for incoming data again.

        Does nothing once the transport is closing or closed, so that a
        transport being torn down is never re-registered with the loop.
        """
        if self._closing:
            return
        self._loop._add_reader(self._fileno, self._read_ready)

    def set_protocol(self, protocol):
        """Replace the protocol receiving this transport's events."""
        self._protocol = protocol

    def get_protocol(self):
        """Return the current protocol."""
        return self._protocol

    def is_closing(self):
        """Return True if the transport is closing or closed."""
        return self._closing

    def close(self):
        """Close the transport; a no-op if it is already closing."""
        if not self._closing:
            self._close(None)

    def __del__(self):
        if self._pipe is not None:
            warnings.warn(f"unclosed transport {self!r}", ResourceWarning,
                          source=self)
            self._pipe.close()

    def _fatal_error(self, exc, message='Fatal error on pipe transport'):
        """Report *exc* and close the transport.

        An OSError with errno EIO is only logged (in debug mode); any
        other exception is passed to the loop's exception handler.
        """
        # should be called by exception handler only
        if (isinstance(exc, OSError) and exc.errno == errno.EIO):
            if self._loop.get_debug():
                logger.debug("%r: %s", self, message, exc_info=True)
        else:
            self._loop.call_exception_handler({
                'message': message,
                'exception': exc,
                'transport': self,
                'protocol': self._protocol,
            })
        self._close(exc)

    def _close(self, exc):
        """Stop reading and schedule connection_lost(exc)."""
        self._closing = True
        self._loop._remove_reader(self._fileno)
        self._loop.call_soon(self._call_connection_lost, exc)

    def _call_connection_lost(self, exc):
        """Notify the protocol, then close the pipe and drop references."""
        try:
            self._protocol.connection_lost(exc)
        finally:
            # Runs even if connection_lost() raises.
            self._pipe.close()
            self._pipe = None
            self._protocol = None
            self._loop = None
546
547
class _UnixWritePipeTransport(transports._FlowControlMixin,
                              transports.WriteTransport):
    """Write transport over a pipe, socket or character device.

    Data that cannot be written right away is accumulated in a buffer
    and flushed by a writer callback registered with the loop.
    """

    def __init__(self, loop, pipe, protocol, waiter=None, extra=None):
        """Set up the transport and schedule protocol.connection_made().

        Raise ValueError if *pipe* is not a pipe, a socket or a character
        device.  If *waiter* is given, it is resolved (unless cancelled)
        after connection_made() has been scheduled.
        """
        super().__init__(extra, loop)
        self._extra['pipe'] = pipe
        self._pipe = pipe
        self._fileno = pipe.fileno()
        self._protocol = protocol
        self._buffer = bytearray()
        self._conn_lost = 0
        self._closing = False  # Set when close() or write_eof() called.

        st_mode = os.fstat(self._fileno).st_mode
        is_char = stat.S_ISCHR(st_mode)
        is_fifo = stat.S_ISFIFO(st_mode)
        is_socket = stat.S_ISSOCK(st_mode)
        if not any((is_char, is_fifo, is_socket)):
            # Unsupported file type: forget everything before bailing out.
            self._pipe = None
            self._fileno = None
            self._protocol = None
            raise ValueError("Pipe transport is only for "
                             "pipes, sockets and character devices")

        os.set_blocking(self._fileno, False)
        self._loop.call_soon(self._protocol.connection_made, self)

        # On AIX, the reader trick (to be notified when the read end of the
        # socket is closed) only works for sockets. On other platforms it
        # works for pipes and sockets. (Exception: OS X 10.4?  Issue #19294.)
        on_aix = sys.platform.startswith("aix")
        if is_socket or (is_fifo and not on_aix):
            # only start reading when connection_made() has been called
            self._loop.call_soon(self._loop._add_reader,
                                 self._fileno, self._read_ready)

        if waiter is not None:
            # only wake up the waiter when connection_made() has been called
            self._loop.call_soon(futures._set_result_unless_cancelled,
                                 waiter, None)

    def __repr__(self):
        parts = [self.__class__.__name__]
        if self._pipe is None:
            parts.append('closed')
        elif self._closing:
            parts.append('closing')
        parts.append(f'fd={self._fileno}')
        selector = getattr(self._loop, '_selector', None)
        if self._pipe is None:
            parts.append('closed')
        elif selector is None:
            parts.append('open')
        else:
            is_polling = selector_events._test_selector_event(
                selector, self._fileno, selectors.EVENT_WRITE)
            parts.append('polling' if is_polling else 'idle')
            parts.append(f'bufsize={self.get_write_buffer_size()}')
        return '<{}>'.format(' '.join(parts))

    def get_write_buffer_size(self):
        """Return the number of bytes waiting in the write buffer."""
        return len(self._buffer)

    def _read_ready(self):
        """Reader callback: the other end of the pipe was closed."""
        # Pipe was closed by peer.
        if self._loop.get_debug():
            logger.info("%r was closed by peer", self)
        # Unsent data can no longer be delivered: report a broken pipe.
        reason = BrokenPipeError() if self._buffer else None
        self._close(reason)

    def write(self, data):
        """Write *data* to the pipe, buffering whatever cannot go out now.

        Empty data is ignored.  Data written after the connection was
        lost or while closing is dropped (and counted in _conn_lost).
        """
        assert isinstance(data, (bytes, bytearray, memoryview)), repr(data)
        if isinstance(data, bytearray):
            data = memoryview(data)
        if not data:
            return

        if self._conn_lost or self._closing:
            if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:
                logger.warning('pipe closed by peer or '
                               'os.write(pipe, data) raised exception.')
            self._conn_lost += 1
            return

        if not self._buffer:
            # Attempt to send it right away first.
            try:
                written = os.write(self._fileno, data)
            except (BlockingIOError, InterruptedError):
                written = 0
            except Exception as exc:
                self._conn_lost += 1
                self._fatal_error(exc, 'Fatal write error on pipe transport')
                return
            if written == len(data):
                return
            if written > 0:
                # Keep only the part that still has to be sent.
                data = memoryview(data)[written:]
            self._loop._add_writer(self._fileno, self._write_ready)

        self._buffer += data
        self._maybe_pause_protocol()

    def _write_ready(self):
        """Writer callback: try to flush the buffer to the pipe."""
        assert self._buffer, 'Data should not be empty'

        try:
            written = os.write(self._fileno, self._buffer)
        except (BlockingIOError, InterruptedError):
            return
        except Exception as exc:
            self._buffer.clear()
            self._conn_lost += 1
            # Remove the writer here: _fatal_error() will not do it
            # because _buffer is empty by then.
            self._loop._remove_writer(self._fileno)
            self._fatal_error(exc, 'Fatal write error on pipe transport')
            return

        if written == len(self._buffer):
            # Everything went out.
            self._buffer.clear()
            self._loop._remove_writer(self._fileno)
            self._maybe_resume_protocol()  # May append to buffer.
            if self._closing:
                self._loop._remove_reader(self._fileno)
                self._call_connection_lost(None)
            return
        if written > 0:
            del self._buffer[:written]

    def can_write_eof(self):
        """Return True: this transport supports write_eof()."""
        return True

    def write_eof(self):
        """Close the write end once the buffered data has been flushed."""
        if self._closing:
            return
        assert self._pipe
        self._closing = True
        if self._buffer:
            # _write_ready() finishes the shutdown after the final flush.
            return
        self._loop._remove_reader(self._fileno)
        self._loop.call_soon(self._call_connection_lost, None)

    def set_protocol(self, protocol):
        """Replace the protocol receiving this transport's events."""
        self._protocol = protocol

    def get_protocol(self):
        """Return the current protocol."""
        return self._protocol

    def is_closing(self):
        """Return True if the transport is closing or closed."""
        return self._closing

    def close(self):
        """Close the transport after flushing buffered data."""
        if self._pipe is None or self._closing:
            return
        # write_eof() is all that is needed to close the write pipe
        self.write_eof()

    def __del__(self):
        if self._pipe is not None:
            warnings.warn(f"unclosed transport {self!r}", ResourceWarning,
                          source=self)
            self._pipe.close()

    def abort(self):
        """Close the transport immediately, discarding buffered data."""
        self._close(None)

    def _fatal_error(self, exc, message='Fatal error on pipe transport'):
        """Report *exc* and close the transport.

        Exceptions matching base_events._FATAL_ERROR_IGNORE are only
        logged (in debug mode); any other exception is passed to the
        loop's exception handler.
        """
        # should be called by exception handler only
        if not isinstance(exc, base_events._FATAL_ERROR_IGNORE):
            self._loop.call_exception_handler({
                'message': message,
                'exception': exc,
                'transport': self,
                'protocol': self._protocol,
            })
        elif self._loop.get_debug():
            logger.debug("%r: %s", self, message, exc_info=True)
        self._close(exc)

    def _close(self, exc=None):
        """Drop buffered data, stop watching the fd, schedule connection_lost."""
        self._closing = True
        # A writer callback is only registered while data is buffered.
        if self._buffer:
            self._loop._remove_writer(self._fileno)
        self._buffer.clear()
        self._loop._remove_reader(self._fileno)
        self._loop.call_soon(self._call_connection_lost, exc)

    def _call_connection_lost(self, exc):
        """Notify the protocol, then close the pipe and drop references."""
        try:
            self._protocol.connection_lost(exc)
        finally:
            # Runs even if connection_lost() raises.
            self._pipe.close()
            self._pipe = None
            self._protocol = None
            self._loop = None
748
749
class _UnixSubprocessTransport(base_subprocess.BaseSubprocessTransport):
    """Subprocess transport that starts the child with subprocess.Popen."""

    def _start(self, args, shell, stdin, stdout, stderr, bufsize, **kwargs):
        """Spawn the child process and store the Popen object in _proc.

        When *stdin* is subprocess.PIPE, a socket pair is used in place of
        a pipe and the parent's end is exposed as ``self._proc.stdin``.
        Both ends of the pair are closed if starting the process fails.
        """
        stdin_w = None
        if stdin == subprocess.PIPE:
            # Use a socket pair for stdin, since not all platforms
            # support selecting read events on the write end of a
            # socket (which we use in order to detect closing of the
            # other end).  Notably this is needed on AIX, and works
            # just fine on other platforms.
            stdin, stdin_w = socket.socketpair()
        try:
            self._proc = subprocess.Popen(
                args, shell=shell, stdin=stdin, stdout=stdout, stderr=stderr,
                universal_newlines=False, bufsize=bufsize, **kwargs)
            if stdin_w is not None:
                # The child holds its own copy of the read end now.
                stdin.close()
                self._proc.stdin = open(stdin_w.detach(), 'wb',
                                        buffering=bufsize)
                # Ownership moved to the file object: nothing left to clean.
                stdin_w = None
        finally:
            if stdin_w is not None:
                # Popen() (or the hand-over above) failed: do not leak
                # the socket pair.
                stdin.close()
                stdin_w.close()
767
768
class AbstractChildWatcher:
    """Interface for objects that monitor child processes.

    A concrete watcher keeps track of a set of subprocesses and reports
    when one of them terminates or is interrupted by a signal.

    Callbacks are registered with .add_child_handler().  A new process
    must be started inside a 'with' block on the watcher, so that the
    watcher can suspend its activity until the new process is fully
    registered (some implementations need this to avoid a race condition).

    Example:
        with watcher:
            proc = subprocess.Popen(["sleep", "1"])
            watcher.add_child_handler(proc.pid, callback)

    Notes:
        Implementations of this class must be thread-safe.

        Child watcher objects may catch the SIGCHLD signal and call
        waitpid(-1), so only one of them should be active per process.
    """

    def add_child_handler(self, pid, callback, *args):
        """Register a handler for the termination of process 'pid'.

        callback(pid, returncode, *args) is to be called when the process
        terminates.  Registering a second callback for the same process
        replaces the first one.

        Note: callback() must be thread-safe.
        """
        raise NotImplementedError

    def remove_child_handler(self, pid):
        """Remove the handler registered for process 'pid'.

        Return True if a handler was removed, False if there was nothing
        to remove.
        """
        raise NotImplementedError

    def attach_loop(self, loop):
        """Attach the watcher to an event loop.

        A watcher that is already attached to a loop is detached from it
        first.

        Note: loop may be None.
        """
        raise NotImplementedError

    def close(self):
        """Close the watcher.

        Must be called so that any underlying resource gets freed.
        """
        raise NotImplementedError

    def __enter__(self):
        """Enter the watcher's context; new processes may be started in it.

        Implementations must return self.
        """
        raise NotImplementedError

    def __exit__(self, a, b, c):
        """Leave the watcher's context."""
        raise NotImplementedError
837
838
class BaseChildWatcher(AbstractChildWatcher):
    """Shared machinery for the SIGCHLD-driven child watchers.

    Holds the attached event loop in '_loop' and the registered handlers in
    '_callbacks'.  Subclasses supply the reaping strategy by implementing
    _do_waitpid() and _do_waitpid_all().
    """

    def __init__(self):
        self._callbacks = {}
        self._loop = None

    def close(self):
        """Detach the watcher from whatever loop it is attached to."""
        self.attach_loop(None)

    def _do_waitpid(self, expected_pid):
        """Poll a single child; to be implemented by subclasses."""
        raise NotImplementedError

    def _do_waitpid_all(self):
        """Poll the watched children; to be implemented by subclasses."""
        raise NotImplementedError

    def attach_loop(self, loop):
        """Move the SIGCHLD handler from the current loop to 'loop'.

        'loop' may be None, which only detaches the watcher.  A
        RuntimeWarning is emitted when the watcher is detached while
        handlers are still registered.
        """
        assert loop is None or isinstance(loop, events.AbstractEventLoop)

        previous = self._loop
        if previous is not None:
            if loop is None and self._callbacks:
                warnings.warn(
                    'A loop is being detached '
                    'from a child watcher with pending handlers',
                    RuntimeWarning)
            previous.remove_signal_handler(signal.SIGCHLD)

        self._loop = loop
        if loop is None:
            return

        loop.add_signal_handler(signal.SIGCHLD, self._sig_chld)

        # A child may have terminated while the handler was being moved;
        # poll right away so that its exit is not missed.
        self._do_waitpid_all()

    def _sig_chld(self):
        """SIGCHLD callback: reap children and report unexpected errors."""
        try:
            self._do_waitpid_all()
        except Exception as exc:
            # This method is only installed as a signal handler from
            # attach_loop(), so self._loop is expected to be set here.
            context = {
                'message': 'Unknown exception in SIGCHLD handler',
                'exception': exc,
            }
            self._loop.call_exception_handler(context)

    def _compute_returncode(self, status):
        """Translate a waitpid() status into a returncode.

        Return the negated signal number when the child was killed by a
        signal, the exit status when it exited, and the raw status
        otherwise.
        """
        if os.WIFSIGNALED(status):
            # The child process died because of a signal.
            return -os.WTERMSIG(status)
        if os.WIFEXITED(status):
            # The child process exited (e.g sys.exit()).
            return os.WEXITSTATUS(status)
        # The status is neither of the above.  This shouldn't happen, but
        # if it does, hand the raw status back; it may help debugging.
        return status
898
899
class SafeChildWatcher(BaseChildWatcher):
    """'Safe' child watcher implementation.

    Each watched process is polled explicitly from the SIGCHLD handler
    rather than through os.waitpid(-1), so other code that spawns
    processes is not disrupted.

    The price of that safety is a significant overhead when many children
    are watched (O(n) each time SIGCHLD is raised).
    """

    def close(self):
        """Forget every registered handler, then detach from the loop."""
        self._callbacks.clear()
        super().close()

    def __enter__(self):
        """Return self; this watcher needs no setup to start processes."""
        return self

    def __exit__(self, a, b, c):
        """Leave the context; there is nothing to undo."""
        return None

    def add_child_handler(self, pid, callback, *args):
        """Register callback(pid, returncode, *args) for process 'pid'.

        Raises RuntimeError when no loop is attached to the watcher.
        """
        if self._loop is None:
            raise RuntimeError(
                "Cannot add child handler, "
                "the child watcher does not have a loop attached")

        self._callbacks[pid] = (callback, args)

        # The child may already be gone; poll it once right now so that
        # its termination is not missed.
        self._do_waitpid(pid)

    def remove_child_handler(self, pid):
        """Drop the handler for 'pid'; return whether one was registered."""
        try:
            del self._callbacks[pid]
        except KeyError:
            return False
        return True

    def _do_waitpid_all(self):
        """Poll every process that still has a handler registered."""
        # Iterate over a snapshot: _do_waitpid() removes entries.
        for watched_pid in tuple(self._callbacks):
            self._do_waitpid(watched_pid)

    def _do_waitpid(self, expected_pid):
        """Poll 'expected_pid' without blocking and fire its handler."""
        assert expected_pid > 0

        try:
            pid, status = os.waitpid(expected_pid, os.WNOHANG)
        except ChildProcessError:
            # Somebody else already reaped the child (waitpid() may have
            # been called elsewhere), so its real status is lost.
            pid, returncode = expected_pid, 255
            logger.warning(
                "Unknown child process pid %d, will report returncode 255",
                pid)
        else:
            if pid == 0:
                # Still running: nothing to report yet.
                return

            returncode = self._compute_returncode(status)
            if self._loop.get_debug():
                logger.debug('process %s exited with returncode %s',
                             expected_pid, returncode)

        try:
            callback, args = self._callbacks.pop(pid)
        except KeyError:  # pragma: no cover
            # The handler can disappear when .remove_child_handler() is
            # called after os.waitpid() returns.  The warning is issued
            # inside this except block so that exc_info=True still captures
            # the KeyError.
            if self._loop.get_debug():
                logger.warning("Child watcher got an unexpected pid: %r",
                               pid, exc_info=True)
            return

        callback(pid, returncode, *args)
977
978
class FastChildWatcher(BaseChildWatcher):
    """'Fast' child watcher implementation.

    Every terminated process is reaped by calling os.waitpid(-1) directly,
    which may break other code that spawns processes and waits for their
    termination.

    There is no noticeable overhead when handling a big number of children
    (O(1) each time a child terminates).
    """

    def __init__(self):
        super().__init__()
        self._forks = 0
        self._zombies = {}
        self._lock = threading.Lock()

    def close(self):
        """Drop all handlers and recorded zombies, then detach the loop."""
        self._callbacks.clear()
        self._zombies.clear()
        super().close()

    def __enter__(self):
        """Count one more 'with' block in progress and return self."""
        with self._lock:
            self._forks += 1
        return self

    def __exit__(self, a, b, c):
        """Leave a 'with' block; report leftover zombies on the last exit."""
        unclaimed = None
        with self._lock:
            self._forks -= 1

            # Only the outermost exit reports, and only if children were
            # reaped that nobody registered a handler for.
            if not self._forks and self._zombies:
                unclaimed = str(self._zombies)
                self._zombies.clear()

        if unclaimed is None:
            return

        logger.warning(
            "Caught subprocesses termination from unknown pids: %s",
            unclaimed)

    def add_child_handler(self, pid, callback, *args):
        """Register callback(pid, returncode, *args) for process 'pid'.

        Must be called inside the watcher's 'with' block.  When the child
        was already reaped, the callback is invoked immediately.  Raises
        RuntimeError when no loop is attached to the watcher.
        """
        assert self._forks, "Must use the context manager"

        if self._loop is None:
            raise RuntimeError(
                "Cannot add child handler, "
                "the child watcher does not have a loop attached")

        not_reaped = object()
        with self._lock:
            returncode = self._zombies.pop(pid, not_reaped)
            if returncode is not_reaped:
                # The child is running.
                self._callbacks[pid] = (callback, args)
                return

        # The child is dead already. We can fire the callback.
        callback(pid, returncode, *args)

    def remove_child_handler(self, pid):
        """Drop the handler for 'pid'; return whether one was registered."""
        absent = object()
        return self._callbacks.pop(pid, absent) is not absent

    def _do_waitpid_all(self):
        """Reap every terminated child and dispatch its handler."""
        unregistered = object()

        # Signals coalesce, so keep calling waitpid() for as long as it
        # hands back a terminated child.
        while True:
            try:
                pid, status = os.waitpid(-1, os.WNOHANG)
            except ChildProcessError:
                # No more child processes exist.
                return

            if pid == 0:
                # A child process is still alive.
                return

            returncode = self._compute_returncode(status)

            with self._lock:
                handler = self._callbacks.pop(pid, unregistered)
                if handler is not unregistered:
                    callback, args = handler
                    if self._loop.get_debug():
                        logger.debug(
                            'process %s exited with returncode %s',
                            pid, returncode)
                elif self._forks:
                    # A 'with' block is in progress, so the handler may
                    # simply not be registered yet: remember the result.
                    self._zombies[pid] = returncode
                    if self._loop.get_debug():
                        logger.debug(
                            'unknown process %s exited with returncode %s',
                            pid, returncode)
                    continue
                else:
                    # unknown child
                    callback = None

            if callback is None:
                logger.warning(
                    "Caught subprocess termination from unknown pid: "
                    "%d -> %d", pid, returncode)
            else:
                callback(pid, returncode, *args)
1087
1088
class _UnixDefaultEventLoopPolicy(events.BaseDefaultEventLoopPolicy):
    """UNIX event loop policy with a watcher for child processes."""
    _loop_factory = _UnixSelectorEventLoop

    def __init__(self):
        super().__init__()
        # Child watcher shared by the policy; created lazily by
        # get_child_watcher() or installed through set_child_watcher().
        self._watcher = None

    def _init_watcher(self):
        """Create the default SafeChildWatcher if none is set yet.

        The check is repeated under events._lock so that concurrent callers
        create a single watcher.  The watcher is attached to a loop only
        when this runs in the main thread.
        """
        with events._lock:
            if self._watcher is None:  # pragma: no branch
                self._watcher = SafeChildWatcher()
                # Use the public threading API rather than an isinstance()
                # check against the private threading._MainThread class.
                if threading.current_thread() is threading.main_thread():
                    # NOTE(review): self._local is presumably set up by the
                    # base policy class; it is not defined in this class.
                    self._watcher.attach_loop(self._local._loop)

    def set_event_loop(self, loop):
        """Set the event loop.

        As a side effect, if a child watcher was set before, then calling
        .set_event_loop() from the main thread will call .attach_loop(loop) on
        the child watcher.
        """

        super().set_event_loop(loop)

        if (self._watcher is not None and
                threading.current_thread() is threading.main_thread()):
            self._watcher.attach_loop(loop)

    def get_child_watcher(self):
        """Get the watcher for child processes.

        If not yet set, a SafeChildWatcher object is automatically created.
        """
        if self._watcher is None:
            self._init_watcher()

        return self._watcher

    def set_child_watcher(self, watcher):
        """Set the watcher for child processes.

        'watcher' may be None.  A previously set watcher is closed before
        it is replaced.
        """

        assert watcher is None or isinstance(watcher, AbstractChildWatcher)

        if self._watcher is not None:
            self._watcher.close()

        self._watcher = watcher
1138
1139
# Public aliases: bind the Unix-specific implementations to the
# platform-neutral names exported by this module.
DefaultEventLoopPolicy = _UnixDefaultEventLoopPolicy
SelectorEventLoop = _UnixSelectorEventLoop
1142