1"""Base implementation of event loop.
2
3The event loop can be broken up into a multiplexer (the part
4responsible for notifying us of I/O events) and the event loop proper,
5which wraps a multiplexer with functionality for scheduling callbacks,
6immediately or at a given time in the future.
7
8Whenever a public API takes a callback, subsequent positional
9arguments will be passed to the callback if/when it is called.  This
10avoids the proliferation of trivial lambdas implementing closures.
11Keyword arguments for the callback are not supported; this is a
12conscious design decision, leaving the door open for keyword arguments
13to modify the meaning of the API call itself.
14"""
15
16import collections
17import collections.abc
18import concurrent.futures
19import heapq
20import itertools
21import logging
22import os
23import socket
24import subprocess
25import threading
26import time
27import traceback
28import sys
29import warnings
30import weakref
31
32try:
33    import ssl
34except ImportError:  # pragma: no cover
35    ssl = None
36
37from . import constants
38from . import coroutines
39from . import events
40from . import futures
41from . import protocols
42from . import sslproto
43from . import tasks
44from . import transports
45from .log import logger
46
47
48__all__ = 'BaseEventLoop',
49
50
51# Minimum number of _scheduled timer handles before cleanup of
52# cancelled handles is performed.
53_MIN_SCHEDULED_TIMER_HANDLES = 100
54
55# Minimum fraction of _scheduled timer handles that are cancelled
56# before cleanup of cancelled handles is performed.
57_MIN_CANCELLED_TIMER_HANDLES_FRACTION = 0.5
58
59# Exceptions which must not call the exception handler in fatal error
60# methods (_fatal_error())
61_FATAL_ERROR_IGNORE = (BrokenPipeError,
62                       ConnectionResetError, ConnectionAbortedError)
63
64_HAS_IPv6 = hasattr(socket, 'AF_INET6')
65
66# Maximum timeout passed to select to avoid OS limitations
67MAXIMUM_SELECT_TIMEOUT = 24 * 3600
68
69
70def _format_handle(handle):
71    cb = handle._callback
72    if isinstance(getattr(cb, '__self__', None), tasks.Task):
73        # format the task
74        return repr(cb.__self__)
75    else:
76        return str(handle)
77
78
79def _format_pipe(fd):
80    if fd == subprocess.PIPE:
81        return '<pipe>'
82    elif fd == subprocess.STDOUT:
83        return '<stdout>'
84    else:
85        return repr(fd)
86
87
88def _set_reuseport(sock):
89    if not hasattr(socket, 'SO_REUSEPORT'):
90        raise ValueError('reuse_port not supported by socket module')
91    else:
92        try:
93            sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
94        except OSError:
95            raise ValueError('reuse_port not supported by socket module, '
96                             'SO_REUSEPORT defined but not implemented.')
97
98
99def _ipaddr_info(host, port, family, type, proto):
100    # Try to skip getaddrinfo if "host" is already an IP. Users might have
101    # handled name resolution in their own code and pass in resolved IPs.
102    if not hasattr(socket, 'inet_pton'):
103        return
104
105    if proto not in {0, socket.IPPROTO_TCP, socket.IPPROTO_UDP} or \
106            host is None:
107        return None
108
109    if type == socket.SOCK_STREAM:
110        proto = socket.IPPROTO_TCP
111    elif type == socket.SOCK_DGRAM:
112        proto = socket.IPPROTO_UDP
113    else:
114        return None
115
116    if port is None:
117        port = 0
118    elif isinstance(port, bytes) and port == b'':
119        port = 0
120    elif isinstance(port, str) and port == '':
121        port = 0
122    else:
123        # If port's a service name like "http", don't skip getaddrinfo.
124        try:
125            port = int(port)
126        except (TypeError, ValueError):
127            return None
128
129    if family == socket.AF_UNSPEC:
130        afs = [socket.AF_INET]
131        if _HAS_IPv6:
132            afs.append(socket.AF_INET6)
133    else:
134        afs = [family]
135
136    if isinstance(host, bytes):
137        host = host.decode('idna')
138    if '%' in host:
139        # Linux's inet_pton doesn't accept an IPv6 zone index after host,
140        # like '::1%lo0'.
141        return None
142
143    for af in afs:
144        try:
145            socket.inet_pton(af, host)
146            # The host has already been resolved.
147            if _HAS_IPv6 and af == socket.AF_INET6:
148                return af, type, proto, '', (host, port, 0, 0)
149            else:
150                return af, type, proto, '', (host, port)
151        except OSError:
152            pass
153
154    # "host" is not an IP address.
155    return None
156
157
158def _run_until_complete_cb(fut):
159    if not fut.cancelled():
160        exc = fut.exception()
161        if isinstance(exc, BaseException) and not isinstance(exc, Exception):
162            # Issue #22429: run_forever() already finished, no need to
163            # stop it.
164            return
165    futures._get_loop(fut).stop()
166
167
168if hasattr(socket, 'TCP_NODELAY'):
169    def _set_nodelay(sock):
170        if (sock.family in {socket.AF_INET, socket.AF_INET6} and
171                sock.type == socket.SOCK_STREAM and
172                sock.proto == socket.IPPROTO_TCP):
173            sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
174else:
175    def _set_nodelay(sock):
176        pass
177
178
179class _SendfileFallbackProtocol(protocols.Protocol):
180    def __init__(self, transp):
181        if not isinstance(transp, transports._FlowControlMixin):
182            raise TypeError("transport should be _FlowControlMixin instance")
183        self._transport = transp
184        self._proto = transp.get_protocol()
185        self._should_resume_reading = transp.is_reading()
186        self._should_resume_writing = transp._protocol_paused
187        transp.pause_reading()
188        transp.set_protocol(self)
189        if self._should_resume_writing:
190            self._write_ready_fut = self._transport._loop.create_future()
191        else:
192            self._write_ready_fut = None
193
194    async def drain(self):
195        if self._transport.is_closing():
196            raise ConnectionError("Connection closed by peer")
197        fut = self._write_ready_fut
198        if fut is None:
199            return
200        await fut
201
202    def connection_made(self, transport):
203        raise RuntimeError("Invalid state: "
204                           "connection should have been established already.")
205
206    def connection_lost(self, exc):
207        if self._write_ready_fut is not None:
208            # Never happens if peer disconnects after sending the whole content
209            # Thus disconnection is always an exception from user perspective
210            if exc is None:
211                self._write_ready_fut.set_exception(
212                    ConnectionError("Connection is closed by peer"))
213            else:
214                self._write_ready_fut.set_exception(exc)
215        self._proto.connection_lost(exc)
216
217    def pause_writing(self):
218        if self._write_ready_fut is not None:
219            return
220        self._write_ready_fut = self._transport._loop.create_future()
221
222    def resume_writing(self):
223        if self._write_ready_fut is None:
224            return
225        self._write_ready_fut.set_result(False)
226        self._write_ready_fut = None
227
228    def data_received(self, data):
229        raise RuntimeError("Invalid state: reading should be paused")
230
231    def eof_received(self):
232        raise RuntimeError("Invalid state: reading should be paused")
233
234    async def restore(self):
235        self._transport.set_protocol(self._proto)
236        if self._should_resume_reading:
237            self._transport.resume_reading()
238        if self._write_ready_fut is not None:
239            # Cancel the future.
240            # Basically it has no effect because protocol is switched back,
241            # no code should wait for it anymore.
242            self._write_ready_fut.cancel()
243        if self._should_resume_writing:
244            self._proto.resume_writing()
245
246
247class Server(events.AbstractServer):
248
249    def __init__(self, loop, sockets, protocol_factory, ssl_context, backlog,
250                 ssl_handshake_timeout):
251        self._loop = loop
252        self._sockets = sockets
253        self._active_count = 0
254        self._waiters = []
255        self._protocol_factory = protocol_factory
256        self._backlog = backlog
257        self._ssl_context = ssl_context
258        self._ssl_handshake_timeout = ssl_handshake_timeout
259        self._serving = False
260        self._serving_forever_fut = None
261
262    def __repr__(self):
263        return f'<{self.__class__.__name__} sockets={self.sockets!r}>'
264
265    def _attach(self):
266        assert self._sockets is not None
267        self._active_count += 1
268
269    def _detach(self):
270        assert self._active_count > 0
271        self._active_count -= 1
272        if self._active_count == 0 and self._sockets is None:
273            self._wakeup()
274
275    def _wakeup(self):
276        waiters = self._waiters
277        self._waiters = None
278        for waiter in waiters:
279            if not waiter.done():
280                waiter.set_result(waiter)
281
282    def _start_serving(self):
283        if self._serving:
284            return
285        self._serving = True
286        for sock in self._sockets:
287            sock.listen(self._backlog)
288            self._loop._start_serving(
289                self._protocol_factory, sock, self._ssl_context,
290                self, self._backlog, self._ssl_handshake_timeout)
291
292    def get_loop(self):
293        return self._loop
294
295    def is_serving(self):
296        return self._serving
297
298    @property
299    def sockets(self):
300        if self._sockets is None:
301            return []
302        return list(self._sockets)
303
304    def close(self):
305        sockets = self._sockets
306        if sockets is None:
307            return
308        self._sockets = None
309
310        for sock in sockets:
311            self._loop._stop_serving(sock)
312
313        self._serving = False
314
315        if (self._serving_forever_fut is not None and
316                not self._serving_forever_fut.done()):
317            self._serving_forever_fut.cancel()
318            self._serving_forever_fut = None
319
320        if self._active_count == 0:
321            self._wakeup()
322
323    async def start_serving(self):
324        self._start_serving()
325        # Skip one loop iteration so that all 'loop.add_reader'
326        # go through.
327        await tasks.sleep(0, loop=self._loop)
328
329    async def serve_forever(self):
330        if self._serving_forever_fut is not None:
331            raise RuntimeError(
332                f'server {self!r} is already being awaited on serve_forever()')
333        if self._sockets is None:
334            raise RuntimeError(f'server {self!r} is closed')
335
336        self._start_serving()
337        self._serving_forever_fut = self._loop.create_future()
338
339        try:
340            await self._serving_forever_fut
341        except futures.CancelledError:
342            try:
343                self.close()
344                await self.wait_closed()
345            finally:
346                raise
347        finally:
348            self._serving_forever_fut = None
349
350    async def wait_closed(self):
351        if self._sockets is None or self._waiters is None:
352            return
353        waiter = self._loop.create_future()
354        self._waiters.append(waiter)
355        await waiter
356
357
358class BaseEventLoop(events.AbstractEventLoop):
359
360    def __init__(self):
361        self._timer_cancelled_count = 0
362        self._closed = False
363        self._stopping = False
364        self._ready = collections.deque()
365        self._scheduled = []
366        self._default_executor = None
367        self._internal_fds = 0
368        # Identifier of the thread running the event loop, or None if the
369        # event loop is not running
370        self._thread_id = None
371        self._clock_resolution = time.get_clock_info('monotonic').resolution
372        self._exception_handler = None
373        self.set_debug(coroutines._is_debug_mode())
374        # In debug mode, if the execution of a callback or a step of a task
375        # exceed this duration in seconds, the slow callback/task is logged.
376        self.slow_callback_duration = 0.1
377        self._current_handle = None
378        self._task_factory = None
379        self._coroutine_origin_tracking_enabled = False
380        self._coroutine_origin_tracking_saved_depth = None
381
382        # A weak set of all asynchronous generators that are
383        # being iterated by the loop.
384        self._asyncgens = weakref.WeakSet()
385        # Set to True when `loop.shutdown_asyncgens` is called.
386        self._asyncgens_shutdown_called = False
387
388    def __repr__(self):
389        return (
390            f'<{self.__class__.__name__} running={self.is_running()} '
391            f'closed={self.is_closed()} debug={self.get_debug()}>'
392        )
393
394    def create_future(self):
395        """Create a Future object attached to the loop."""
396        return futures.Future(loop=self)
397
398    def create_task(self, coro):
399        """Schedule a coroutine object.
400
401        Return a task object.
402        """
403        self._check_closed()
404        if self._task_factory is None:
405            task = tasks.Task(coro, loop=self)
406            if task._source_traceback:
407                del task._source_traceback[-1]
408        else:
409            task = self._task_factory(self, coro)
410        return task
411
412    def set_task_factory(self, factory):
413        """Set a task factory that will be used by loop.create_task().
414
415        If factory is None the default task factory will be set.
416
417        If factory is a callable, it should have a signature matching
418        '(loop, coro)', where 'loop' will be a reference to the active
419        event loop, 'coro' will be a coroutine object.  The callable
420        must return a Future.
421        """
422        if factory is not None and not callable(factory):
423            raise TypeError('task factory must be a callable or None')
424        self._task_factory = factory
425
426    def get_task_factory(self):
427        """Return a task factory, or None if the default one is in use."""
428        return self._task_factory
429
430    def _make_socket_transport(self, sock, protocol, waiter=None, *,
431                               extra=None, server=None):
432        """Create socket transport."""
433        raise NotImplementedError
434
435    def _make_ssl_transport(
436            self, rawsock, protocol, sslcontext, waiter=None,
437            *, server_side=False, server_hostname=None,
438            extra=None, server=None,
439            ssl_handshake_timeout=None,
440            call_connection_made=True):
441        """Create SSL transport."""
442        raise NotImplementedError
443
444    def _make_datagram_transport(self, sock, protocol,
445                                 address=None, waiter=None, extra=None):
446        """Create datagram transport."""
447        raise NotImplementedError
448
449    def _make_read_pipe_transport(self, pipe, protocol, waiter=None,
450                                  extra=None):
451        """Create read pipe transport."""
452        raise NotImplementedError
453
454    def _make_write_pipe_transport(self, pipe, protocol, waiter=None,
455                                   extra=None):
456        """Create write pipe transport."""
457        raise NotImplementedError
458
459    async def _make_subprocess_transport(self, protocol, args, shell,
460                                         stdin, stdout, stderr, bufsize,
461                                         extra=None, **kwargs):
462        """Create subprocess transport."""
463        raise NotImplementedError
464
465    def _write_to_self(self):
466        """Write a byte to self-pipe, to wake up the event loop.
467
468        This may be called from a different thread.
469
470        The subclass is responsible for implementing the self-pipe.
471        """
472        raise NotImplementedError
473
474    def _process_events(self, event_list):
475        """Process selector events."""
476        raise NotImplementedError
477
478    def _check_closed(self):
479        if self._closed:
480            raise RuntimeError('Event loop is closed')
481
482    def _asyncgen_finalizer_hook(self, agen):
483        self._asyncgens.discard(agen)
484        if not self.is_closed():
485            self.call_soon_threadsafe(self.create_task, agen.aclose())
486
487    def _asyncgen_firstiter_hook(self, agen):
488        if self._asyncgens_shutdown_called:
489            warnings.warn(
490                f"asynchronous generator {agen!r} was scheduled after "
491                f"loop.shutdown_asyncgens() call",
492                ResourceWarning, source=self)
493
494        self._asyncgens.add(agen)
495
496    async def shutdown_asyncgens(self):
497        """Shutdown all active asynchronous generators."""
498        self._asyncgens_shutdown_called = True
499
500        if not len(self._asyncgens):
501            # If Python version is <3.6 or we don't have any asynchronous
502            # generators alive.
503            return
504
505        closing_agens = list(self._asyncgens)
506        self._asyncgens.clear()
507
508        results = await tasks.gather(
509            *[ag.aclose() for ag in closing_agens],
510            return_exceptions=True,
511            loop=self)
512
513        for result, agen in zip(results, closing_agens):
514            if isinstance(result, Exception):
515                self.call_exception_handler({
516                    'message': f'an error occurred during closing of '
517                               f'asynchronous generator {agen!r}',
518                    'exception': result,
519                    'asyncgen': agen
520                })
521
522    def run_forever(self):
523        """Run until stop() is called."""
524        self._check_closed()
525        if self.is_running():
526            raise RuntimeError('This event loop is already running')
527        if events._get_running_loop() is not None:
528            raise RuntimeError(
529                'Cannot run the event loop while another loop is running')
530        self._set_coroutine_origin_tracking(self._debug)
531        self._thread_id = threading.get_ident()
532
533        old_agen_hooks = sys.get_asyncgen_hooks()
534        sys.set_asyncgen_hooks(firstiter=self._asyncgen_firstiter_hook,
535                               finalizer=self._asyncgen_finalizer_hook)
536        try:
537            events._set_running_loop(self)
538            while True:
539                self._run_once()
540                if self._stopping:
541                    break
542        finally:
543            self._stopping = False
544            self._thread_id = None
545            events._set_running_loop(None)
546            self._set_coroutine_origin_tracking(False)
547            sys.set_asyncgen_hooks(*old_agen_hooks)
548
549    def run_until_complete(self, future):
550        """Run until the Future is done.
551
552        If the argument is a coroutine, it is wrapped in a Task.
553
554        WARNING: It would be disastrous to call run_until_complete()
555        with the same coroutine twice -- it would wrap it in two
556        different Tasks and that can't be good.
557
558        Return the Future's result, or raise its exception.
559        """
560        self._check_closed()
561
562        new_task = not futures.isfuture(future)
563        future = tasks.ensure_future(future, loop=self)
564        if new_task:
565            # An exception is raised if the future didn't complete, so there
566            # is no need to log the "destroy pending task" message
567            future._log_destroy_pending = False
568
569        future.add_done_callback(_run_until_complete_cb)
570        try:
571            self.run_forever()
572        except:
573            if new_task and future.done() and not future.cancelled():
574                # The coroutine raised a BaseException. Consume the exception
575                # to not log a warning, the caller doesn't have access to the
576                # local task.
577                future.exception()
578            raise
579        finally:
580            future.remove_done_callback(_run_until_complete_cb)
581        if not future.done():
582            raise RuntimeError('Event loop stopped before Future completed.')
583
584        return future.result()
585
586    def stop(self):
587        """Stop running the event loop.
588
589        Every callback already scheduled will still run.  This simply informs
590        run_forever to stop looping after a complete iteration.
591        """
592        self._stopping = True
593
594    def close(self):
595        """Close the event loop.
596
597        This clears the queues and shuts down the executor,
598        but does not wait for the executor to finish.
599
600        The event loop must not be running.
601        """
602        if self.is_running():
603            raise RuntimeError("Cannot close a running event loop")
604        if self._closed:
605            return
606        if self._debug:
607            logger.debug("Close %r", self)
608        self._closed = True
609        self._ready.clear()
610        self._scheduled.clear()
611        executor = self._default_executor
612        if executor is not None:
613            self._default_executor = None
614            executor.shutdown(wait=False)
615
616    def is_closed(self):
617        """Returns True if the event loop was closed."""
618        return self._closed
619
620    def __del__(self):
621        if not self.is_closed():
622            warnings.warn(f"unclosed event loop {self!r}", ResourceWarning,
623                          source=self)
624            if not self.is_running():
625                self.close()
626
627    def is_running(self):
628        """Returns True if the event loop is running."""
629        return (self._thread_id is not None)
630
631    def time(self):
632        """Return the time according to the event loop's clock.
633
634        This is a float expressed in seconds since an epoch, but the
635        epoch, precision, accuracy and drift are unspecified and may
636        differ per event loop.
637        """
638        return time.monotonic()
639
640    def call_later(self, delay, callback, *args, context=None):
641        """Arrange for a callback to be called at a given time.
642
643        Return a Handle: an opaque object with a cancel() method that
644        can be used to cancel the call.
645
646        The delay can be an int or float, expressed in seconds.  It is
647        always relative to the current time.
648
649        Each callback will be called exactly once.  If two callbacks
650        are scheduled for exactly the same time, it undefined which
651        will be called first.
652
653        Any positional arguments after the callback will be passed to
654        the callback when it is called.
655        """
656        timer = self.call_at(self.time() + delay, callback, *args,
657                             context=context)
658        if timer._source_traceback:
659            del timer._source_traceback[-1]
660        return timer
661
662    def call_at(self, when, callback, *args, context=None):
663        """Like call_later(), but uses an absolute time.
664
665        Absolute time corresponds to the event loop's time() method.
666        """
667        self._check_closed()
668        if self._debug:
669            self._check_thread()
670            self._check_callback(callback, 'call_at')
671        timer = events.TimerHandle(when, callback, args, self, context)
672        if timer._source_traceback:
673            del timer._source_traceback[-1]
674        heapq.heappush(self._scheduled, timer)
675        timer._scheduled = True
676        return timer
677
678    def call_soon(self, callback, *args, context=None):
679        """Arrange for a callback to be called as soon as possible.
680
681        This operates as a FIFO queue: callbacks are called in the
682        order in which they are registered.  Each callback will be
683        called exactly once.
684
685        Any positional arguments after the callback will be passed to
686        the callback when it is called.
687        """
688        self._check_closed()
689        if self._debug:
690            self._check_thread()
691            self._check_callback(callback, 'call_soon')
692        handle = self._call_soon(callback, args, context)
693        if handle._source_traceback:
694            del handle._source_traceback[-1]
695        return handle
696
697    def _check_callback(self, callback, method):
698        if (coroutines.iscoroutine(callback) or
699                coroutines.iscoroutinefunction(callback)):
700            raise TypeError(
701                f"coroutines cannot be used with {method}()")
702        if not callable(callback):
703            raise TypeError(
704                f'a callable object was expected by {method}(), '
705                f'got {callback!r}')
706
707    def _call_soon(self, callback, args, context):
708        handle = events.Handle(callback, args, self, context)
709        if handle._source_traceback:
710            del handle._source_traceback[-1]
711        self._ready.append(handle)
712        return handle
713
714    def _check_thread(self):
715        """Check that the current thread is the thread running the event loop.
716
717        Non-thread-safe methods of this class make this assumption and will
718        likely behave incorrectly when the assumption is violated.
719
720        Should only be called when (self._debug == True).  The caller is
721        responsible for checking this condition for performance reasons.
722        """
723        if self._thread_id is None:
724            return
725        thread_id = threading.get_ident()
726        if thread_id != self._thread_id:
727            raise RuntimeError(
728                "Non-thread-safe operation invoked on an event loop other "
729                "than the current one")
730
731    def call_soon_threadsafe(self, callback, *args, context=None):
732        """Like call_soon(), but thread-safe."""
733        self._check_closed()
734        if self._debug:
735            self._check_callback(callback, 'call_soon_threadsafe')
736        handle = self._call_soon(callback, args, context)
737        if handle._source_traceback:
738            del handle._source_traceback[-1]
739        self._write_to_self()
740        return handle
741
742    def run_in_executor(self, executor, func, *args):
743        self._check_closed()
744        if self._debug:
745            self._check_callback(func, 'run_in_executor')
746        if executor is None:
747            executor = self._default_executor
748            if executor is None:
749                executor = concurrent.futures.ThreadPoolExecutor()
750                self._default_executor = executor
751        return futures.wrap_future(
752            executor.submit(func, *args), loop=self)
753
754    def set_default_executor(self, executor):
755        self._default_executor = executor
756
757    def _getaddrinfo_debug(self, host, port, family, type, proto, flags):
758        msg = [f"{host}:{port!r}"]
759        if family:
760            msg.append(f'family={family!r}')
761        if type:
762            msg.append(f'type={type!r}')
763        if proto:
764            msg.append(f'proto={proto!r}')
765        if flags:
766            msg.append(f'flags={flags!r}')
767        msg = ', '.join(msg)
768        logger.debug('Get address info %s', msg)
769
770        t0 = self.time()
771        addrinfo = socket.getaddrinfo(host, port, family, type, proto, flags)
772        dt = self.time() - t0
773
774        msg = f'Getting address info {msg} took {dt * 1e3:.3f}ms: {addrinfo!r}'
775        if dt >= self.slow_callback_duration:
776            logger.info(msg)
777        else:
778            logger.debug(msg)
779        return addrinfo
780
781    async def getaddrinfo(self, host, port, *,
782                          family=0, type=0, proto=0, flags=0):
783        if self._debug:
784            getaddr_func = self._getaddrinfo_debug
785        else:
786            getaddr_func = socket.getaddrinfo
787
788        return await self.run_in_executor(
789            None, getaddr_func, host, port, family, type, proto, flags)
790
791    async def getnameinfo(self, sockaddr, flags=0):
792        return await self.run_in_executor(
793            None, socket.getnameinfo, sockaddr, flags)
794
795    async def sock_sendfile(self, sock, file, offset=0, count=None,
796                            *, fallback=True):
797        if self._debug and sock.gettimeout() != 0:
798            raise ValueError("the socket must be non-blocking")
799        self._check_sendfile_params(sock, file, offset, count)
800        try:
801            return await self._sock_sendfile_native(sock, file,
802                                                    offset, count)
803        except events.SendfileNotAvailableError as exc:
804            if not fallback:
805                raise
806        return await self._sock_sendfile_fallback(sock, file,
807                                                  offset, count)
808
809    async def _sock_sendfile_native(self, sock, file, offset, count):
810        # NB: sendfile syscall is not supported for SSL sockets and
811        # non-mmap files even if sendfile is supported by OS
812        raise events.SendfileNotAvailableError(
813            f"syscall sendfile is not available for socket {sock!r} "
814            "and file {file!r} combination")
815
816    async def _sock_sendfile_fallback(self, sock, file, offset, count):
817        if offset:
818            file.seek(offset)
819        blocksize = (
820            min(count, constants.SENDFILE_FALLBACK_READBUFFER_SIZE)
821            if count else constants.SENDFILE_FALLBACK_READBUFFER_SIZE
822        )
823        buf = bytearray(blocksize)
824        total_sent = 0
825        try:
826            while True:
827                if count:
828                    blocksize = min(count - total_sent, blocksize)
829                    if blocksize <= 0:
830                        break
831                view = memoryview(buf)[:blocksize]
832                read = await self.run_in_executor(None, file.readinto, view)
833                if not read:
834                    break  # EOF
835                await self.sock_sendall(sock, view)
836                total_sent += read
837            return total_sent
838        finally:
839            if total_sent > 0 and hasattr(file, 'seek'):
840                file.seek(offset + total_sent)
841
842    def _check_sendfile_params(self, sock, file, offset, count):
843        if 'b' not in getattr(file, 'mode', 'b'):
844            raise ValueError("file should be opened in binary mode")
845        if not sock.type == socket.SOCK_STREAM:
846            raise ValueError("only SOCK_STREAM type sockets are supported")
847        if count is not None:
848            if not isinstance(count, int):
849                raise TypeError(
850                    "count must be a positive integer (got {!r})".format(count))
851            if count <= 0:
852                raise ValueError(
853                    "count must be a positive integer (got {!r})".format(count))
854        if not isinstance(offset, int):
855            raise TypeError(
856                "offset must be a non-negative integer (got {!r})".format(
857                    offset))
858        if offset < 0:
859            raise ValueError(
860                "offset must be a non-negative integer (got {!r})".format(
861                    offset))
862
863    async def create_connection(
864            self, protocol_factory, host=None, port=None,
865            *, ssl=None, family=0,
866            proto=0, flags=0, sock=None,
867            local_addr=None, server_hostname=None,
868            ssl_handshake_timeout=None):
869        """Connect to a TCP server.
870
871        Create a streaming transport connection to a given Internet host and
872        port: socket family AF_INET or socket.AF_INET6 depending on host (or
873        family if specified), socket type SOCK_STREAM. protocol_factory must be
874        a callable returning a protocol instance.
875
876        This method is a coroutine which will try to establish the connection
877        in the background.  When successful, the coroutine returns a
878        (transport, protocol) pair.
879        """
880        if server_hostname is not None and not ssl:
881            raise ValueError('server_hostname is only meaningful with ssl')
882
883        if server_hostname is None and ssl:
884            # Use host as default for server_hostname.  It is an error
885            # if host is empty or not set, e.g. when an
886            # already-connected socket was passed or when only a port
887            # is given.  To avoid this error, you can pass
888            # server_hostname='' -- this will bypass the hostname
889            # check.  (This also means that if host is a numeric
890            # IP/IPv6 address, we will attempt to verify that exact
891            # address; this will probably fail, but it is possible to
892            # create a certificate for a specific IP address, so we
893            # don't judge it here.)
894            if not host:
895                raise ValueError('You must set server_hostname '
896                                 'when using ssl without a host')
897            server_hostname = host
898
899        if ssl_handshake_timeout is not None and not ssl:
900            raise ValueError(
901                'ssl_handshake_timeout is only meaningful with ssl')
902
903        if host is not None or port is not None:
904            if sock is not None:
905                raise ValueError(
906                    'host/port and sock can not be specified at the same time')
907
908            infos = await self._ensure_resolved(
909                (host, port), family=family,
910                type=socket.SOCK_STREAM, proto=proto, flags=flags, loop=self)
911            if not infos:
912                raise OSError('getaddrinfo() returned empty list')
913
914            if local_addr is not None:
915                laddr_infos = await self._ensure_resolved(
916                    local_addr, family=family,
917                    type=socket.SOCK_STREAM, proto=proto,
918                    flags=flags, loop=self)
919                if not laddr_infos:
920                    raise OSError('getaddrinfo() returned empty list')
921
922            exceptions = []
923            for family, type, proto, cname, address in infos:
924                try:
925                    sock = socket.socket(family=family, type=type, proto=proto)
926                    sock.setblocking(False)
927                    if local_addr is not None:
928                        for _, _, _, _, laddr in laddr_infos:
929                            try:
930                                sock.bind(laddr)
931                                break
932                            except OSError as exc:
933                                msg = (
934                                    f'error while attempting to bind on '
935                                    f'address {laddr!r}: '
936                                    f'{exc.strerror.lower()}'
937                                )
938                                exc = OSError(exc.errno, msg)
939                                exceptions.append(exc)
940                        else:
941                            sock.close()
942                            sock = None
943                            continue
944                    if self._debug:
945                        logger.debug("connect %r to %r", sock, address)
946                    await self.sock_connect(sock, address)
947                except OSError as exc:
948                    if sock is not None:
949                        sock.close()
950                    exceptions.append(exc)
951                except:
952                    if sock is not None:
953                        sock.close()
954                    raise
955                else:
956                    break
957            else:
958                if len(exceptions) == 1:
959                    raise exceptions[0]
960                else:
961                    # If they all have the same str(), raise one.
962                    model = str(exceptions[0])
963                    if all(str(exc) == model for exc in exceptions):
964                        raise exceptions[0]
965                    # Raise a combined exception so the user can see all
966                    # the various error messages.
967                    raise OSError('Multiple exceptions: {}'.format(
968                        ', '.join(str(exc) for exc in exceptions)))
969
970        else:
971            if sock is None:
972                raise ValueError(
973                    'host and port was not specified and no sock specified')
974            if sock.type != socket.SOCK_STREAM:
975                # We allow AF_INET, AF_INET6, AF_UNIX as long as they
976                # are SOCK_STREAM.
977                # We support passing AF_UNIX sockets even though we have
978                # a dedicated API for that: create_unix_connection.
979                # Disallowing AF_UNIX in this method, breaks backwards
980                # compatibility.
981                raise ValueError(
982                    f'A Stream Socket was expected, got {sock!r}')
983
984        transport, protocol = await self._create_connection_transport(
985            sock, protocol_factory, ssl, server_hostname,
986            ssl_handshake_timeout=ssl_handshake_timeout)
987        if self._debug:
988            # Get the socket from the transport because SSL transport closes
989            # the old socket and creates a new SSL socket
990            sock = transport.get_extra_info('socket')
991            logger.debug("%r connected to %s:%r: (%r, %r)",
992                         sock, host, port, transport, protocol)
993        return transport, protocol
994
995    async def _create_connection_transport(
996            self, sock, protocol_factory, ssl,
997            server_hostname, server_side=False,
998            ssl_handshake_timeout=None):
999
1000        sock.setblocking(False)
1001
1002        protocol = protocol_factory()
1003        waiter = self.create_future()
1004        if ssl:
1005            sslcontext = None if isinstance(ssl, bool) else ssl
1006            transport = self._make_ssl_transport(
1007                sock, protocol, sslcontext, waiter,
1008                server_side=server_side, server_hostname=server_hostname,
1009                ssl_handshake_timeout=ssl_handshake_timeout)
1010        else:
1011            transport = self._make_socket_transport(sock, protocol, waiter)
1012
1013        try:
1014            await waiter
1015        except:
1016            transport.close()
1017            raise
1018
1019        return transport, protocol
1020
1021    async def sendfile(self, transport, file, offset=0, count=None,
1022                       *, fallback=True):
1023        """Send a file to transport.
1024
1025        Return the total number of bytes which were sent.
1026
1027        The method uses high-performance os.sendfile if available.
1028
1029        file must be a regular file object opened in binary mode.
1030
1031        offset tells from where to start reading the file. If specified,
1032        count is the total number of bytes to transmit as opposed to
1033        sending the file until EOF is reached. File position is updated on
1034        return or also in case of error in which case file.tell()
1035        can be used to figure out the number of bytes
1036        which were sent.
1037
1038        fallback set to True makes asyncio to manually read and send
1039        the file when the platform does not support the sendfile syscall
1040        (e.g. Windows or SSL socket on Unix).
1041
1042        Raise SendfileNotAvailableError if the system does not support
1043        sendfile syscall and fallback is False.
1044        """
1045        if transport.is_closing():
1046            raise RuntimeError("Transport is closing")
1047        mode = getattr(transport, '_sendfile_compatible',
1048                       constants._SendfileMode.UNSUPPORTED)
1049        if mode is constants._SendfileMode.UNSUPPORTED:
1050            raise RuntimeError(
1051                f"sendfile is not supported for transport {transport!r}")
1052        if mode is constants._SendfileMode.TRY_NATIVE:
1053            try:
1054                return await self._sendfile_native(transport, file,
1055                                                   offset, count)
1056            except events.SendfileNotAvailableError as exc:
1057                if not fallback:
1058                    raise
1059
1060        if not fallback:
1061            raise RuntimeError(
1062                f"fallback is disabled and native sendfile is not "
1063                f"supported for transport {transport!r}")
1064
1065        return await self._sendfile_fallback(transport, file,
1066                                             offset, count)
1067
1068    async def _sendfile_native(self, transp, file, offset, count):
1069        raise events.SendfileNotAvailableError(
1070            "sendfile syscall is not supported")
1071
1072    async def _sendfile_fallback(self, transp, file, offset, count):
1073        if offset:
1074            file.seek(offset)
1075        blocksize = min(count, 16384) if count else 16384
1076        buf = bytearray(blocksize)
1077        total_sent = 0
1078        proto = _SendfileFallbackProtocol(transp)
1079        try:
1080            while True:
1081                if count:
1082                    blocksize = min(count - total_sent, blocksize)
1083                    if blocksize <= 0:
1084                        return total_sent
1085                view = memoryview(buf)[:blocksize]
1086                read = file.readinto(view)
1087                if not read:
1088                    return total_sent  # EOF
1089                await proto.drain()
1090                transp.write(view)
1091                total_sent += read
1092        finally:
1093            if total_sent > 0 and hasattr(file, 'seek'):
1094                file.seek(offset + total_sent)
1095            await proto.restore()
1096
1097    async def start_tls(self, transport, protocol, sslcontext, *,
1098                        server_side=False,
1099                        server_hostname=None,
1100                        ssl_handshake_timeout=None):
1101        """Upgrade transport to TLS.
1102
1103        Return a new transport that *protocol* should start using
1104        immediately.
1105        """
1106        if ssl is None:
1107            raise RuntimeError('Python ssl module is not available')
1108
1109        if not isinstance(sslcontext, ssl.SSLContext):
1110            raise TypeError(
1111                f'sslcontext is expected to be an instance of ssl.SSLContext, '
1112                f'got {sslcontext!r}')
1113
1114        if not getattr(transport, '_start_tls_compatible', False):
1115            raise TypeError(
1116                f'transport {transport!r} is not supported by start_tls()')
1117
1118        waiter = self.create_future()
1119        ssl_protocol = sslproto.SSLProtocol(
1120            self, protocol, sslcontext, waiter,
1121            server_side, server_hostname,
1122            ssl_handshake_timeout=ssl_handshake_timeout,
1123            call_connection_made=False)
1124
1125        # Pause early so that "ssl_protocol.data_received()" doesn't
1126        # have a chance to get called before "ssl_protocol.connection_made()".
1127        transport.pause_reading()
1128
1129        transport.set_protocol(ssl_protocol)
1130        conmade_cb = self.call_soon(ssl_protocol.connection_made, transport)
1131        resume_cb = self.call_soon(transport.resume_reading)
1132
1133        try:
1134            await waiter
1135        except Exception:
1136            transport.close()
1137            conmade_cb.cancel()
1138            resume_cb.cancel()
1139            raise
1140
1141        return ssl_protocol._app_transport
1142
1143    async def create_datagram_endpoint(self, protocol_factory,
1144                                       local_addr=None, remote_addr=None, *,
1145                                       family=0, proto=0, flags=0,
1146                                       reuse_address=None, reuse_port=None,
1147                                       allow_broadcast=None, sock=None):
1148        """Create datagram connection."""
1149        if sock is not None:
1150            if sock.type != socket.SOCK_DGRAM:
1151                raise ValueError(
1152                    f'A UDP Socket was expected, got {sock!r}')
1153            if (local_addr or remote_addr or
1154                    family or proto or flags or
1155                    reuse_address or reuse_port or allow_broadcast):
1156                # show the problematic kwargs in exception msg
1157                opts = dict(local_addr=local_addr, remote_addr=remote_addr,
1158                            family=family, proto=proto, flags=flags,
1159                            reuse_address=reuse_address, reuse_port=reuse_port,
1160                            allow_broadcast=allow_broadcast)
1161                problems = ', '.join(f'{k}={v}' for k, v in opts.items() if v)
1162                raise ValueError(
1163                    f'socket modifier keyword arguments can not be used '
1164                    f'when sock is specified. ({problems})')
1165            sock.setblocking(False)
1166            r_addr = None
1167        else:
1168            if not (local_addr or remote_addr):
1169                if family == 0:
1170                    raise ValueError('unexpected address family')
1171                addr_pairs_info = (((family, proto), (None, None)),)
1172            elif hasattr(socket, 'AF_UNIX') and family == socket.AF_UNIX:
1173                for addr in (local_addr, remote_addr):
1174                    if addr is not None and not isinstance(addr, str):
1175                        raise TypeError('string is expected')
1176                addr_pairs_info = (((family, proto),
1177                                    (local_addr, remote_addr)), )
1178            else:
1179                # join address by (family, protocol)
1180                addr_infos = collections.OrderedDict()
1181                for idx, addr in ((0, local_addr), (1, remote_addr)):
1182                    if addr is not None:
1183                        assert isinstance(addr, tuple) and len(addr) == 2, (
1184                            '2-tuple is expected')
1185
1186                        infos = await self._ensure_resolved(
1187                            addr, family=family, type=socket.SOCK_DGRAM,
1188                            proto=proto, flags=flags, loop=self)
1189                        if not infos:
1190                            raise OSError('getaddrinfo() returned empty list')
1191
1192                        for fam, _, pro, _, address in infos:
1193                            key = (fam, pro)
1194                            if key not in addr_infos:
1195                                addr_infos[key] = [None, None]
1196                            addr_infos[key][idx] = address
1197
1198                # each addr has to have info for each (family, proto) pair
1199                addr_pairs_info = [
1200                    (key, addr_pair) for key, addr_pair in addr_infos.items()
1201                    if not ((local_addr and addr_pair[0] is None) or
1202                            (remote_addr and addr_pair[1] is None))]
1203
1204                if not addr_pairs_info:
1205                    raise ValueError('can not get address information')
1206
1207            exceptions = []
1208
1209            if reuse_address is None:
1210                reuse_address = os.name == 'posix' and sys.platform != 'cygwin'
1211
1212            for ((family, proto),
1213                 (local_address, remote_address)) in addr_pairs_info:
1214                sock = None
1215                r_addr = None
1216                try:
1217                    sock = socket.socket(
1218                        family=family, type=socket.SOCK_DGRAM, proto=proto)
1219                    if reuse_address:
1220                        sock.setsockopt(
1221                            socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
1222                    if reuse_port:
1223                        _set_reuseport(sock)
1224                    if allow_broadcast:
1225                        sock.setsockopt(
1226                            socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
1227                    sock.setblocking(False)
1228
1229                    if local_addr:
1230                        sock.bind(local_address)
1231                    if remote_addr:
1232                        await self.sock_connect(sock, remote_address)
1233                        r_addr = remote_address
1234                except OSError as exc:
1235                    if sock is not None:
1236                        sock.close()
1237                    exceptions.append(exc)
1238                except:
1239                    if sock is not None:
1240                        sock.close()
1241                    raise
1242                else:
1243                    break
1244            else:
1245                raise exceptions[0]
1246
1247        protocol = protocol_factory()
1248        waiter = self.create_future()
1249        transport = self._make_datagram_transport(
1250            sock, protocol, r_addr, waiter)
1251        if self._debug:
1252            if local_addr:
1253                logger.info("Datagram endpoint local_addr=%r remote_addr=%r "
1254                            "created: (%r, %r)",
1255                            local_addr, remote_addr, transport, protocol)
1256            else:
1257                logger.debug("Datagram endpoint remote_addr=%r created: "
1258                             "(%r, %r)",
1259                             remote_addr, transport, protocol)
1260
1261        try:
1262            await waiter
1263        except:
1264            transport.close()
1265            raise
1266
1267        return transport, protocol
1268
1269    async def _ensure_resolved(self, address, *,
1270                               family=0, type=socket.SOCK_STREAM,
1271                               proto=0, flags=0, loop):
1272        host, port = address[:2]
1273        info = _ipaddr_info(host, port, family, type, proto)
1274        if info is not None:
1275            # "host" is already a resolved IP.
1276            return [info]
1277        else:
1278            return await loop.getaddrinfo(host, port, family=family, type=type,
1279                                          proto=proto, flags=flags)
1280
1281    async def _create_server_getaddrinfo(self, host, port, family, flags):
1282        infos = await self._ensure_resolved((host, port), family=family,
1283                                            type=socket.SOCK_STREAM,
1284                                            flags=flags, loop=self)
1285        if not infos:
1286            raise OSError(f'getaddrinfo({host!r}) returned empty list')
1287        return infos
1288
1289    async def create_server(
1290            self, protocol_factory, host=None, port=None,
1291            *,
1292            family=socket.AF_UNSPEC,
1293            flags=socket.AI_PASSIVE,
1294            sock=None,
1295            backlog=100,
1296            ssl=None,
1297            reuse_address=None,
1298            reuse_port=None,
1299            ssl_handshake_timeout=None,
1300            start_serving=True):
1301        """Create a TCP server.
1302
1303        The host parameter can be a string, in that case the TCP server is
1304        bound to host and port.
1305
1306        The host parameter can also be a sequence of strings and in that case
1307        the TCP server is bound to all hosts of the sequence. If a host
1308        appears multiple times (possibly indirectly e.g. when hostnames
1309        resolve to the same IP address), the server is only bound once to that
1310        host.
1311
1312        Return a Server object which can be used to stop the service.
1313
1314        This method is a coroutine.
1315        """
1316        if isinstance(ssl, bool):
1317            raise TypeError('ssl argument must be an SSLContext or None')
1318
1319        if ssl_handshake_timeout is not None and ssl is None:
1320            raise ValueError(
1321                'ssl_handshake_timeout is only meaningful with ssl')
1322
1323        if host is not None or port is not None:
1324            if sock is not None:
1325                raise ValueError(
1326                    'host/port and sock can not be specified at the same time')
1327
1328            if reuse_address is None:
1329                reuse_address = os.name == 'posix' and sys.platform != 'cygwin'
1330            sockets = []
1331            if host == '':
1332                hosts = [None]
1333            elif (isinstance(host, str) or
1334                  not isinstance(host, collections.abc.Iterable)):
1335                hosts = [host]
1336            else:
1337                hosts = host
1338
1339            fs = [self._create_server_getaddrinfo(host, port, family=family,
1340                                                  flags=flags)
1341                  for host in hosts]
1342            infos = await tasks.gather(*fs, loop=self)
1343            infos = set(itertools.chain.from_iterable(infos))
1344
1345            completed = False
1346            try:
1347                for res in infos:
1348                    af, socktype, proto, canonname, sa = res
1349                    try:
1350                        sock = socket.socket(af, socktype, proto)
1351                    except socket.error:
1352                        # Assume it's a bad family/type/protocol combination.
1353                        if self._debug:
1354                            logger.warning('create_server() failed to create '
1355                                           'socket.socket(%r, %r, %r)',
1356                                           af, socktype, proto, exc_info=True)
1357                        continue
1358                    sockets.append(sock)
1359                    if reuse_address:
1360                        sock.setsockopt(
1361                            socket.SOL_SOCKET, socket.SO_REUSEADDR, True)
1362                    if reuse_port:
1363                        _set_reuseport(sock)
1364                    # Disable IPv4/IPv6 dual stack support (enabled by
1365                    # default on Linux) which makes a single socket
1366                    # listen on both address families.
1367                    if (_HAS_IPv6 and
1368                            af == socket.AF_INET6 and
1369                            hasattr(socket, 'IPPROTO_IPV6')):
1370                        sock.setsockopt(socket.IPPROTO_IPV6,
1371                                        socket.IPV6_V6ONLY,
1372                                        True)
1373                    try:
1374                        sock.bind(sa)
1375                    except OSError as err:
1376                        raise OSError(err.errno, 'error while attempting '
1377                                      'to bind on address %r: %s'
1378                                      % (sa, err.strerror.lower())) from None
1379                completed = True
1380            finally:
1381                if not completed:
1382                    for sock in sockets:
1383                        sock.close()
1384        else:
1385            if sock is None:
1386                raise ValueError('Neither host/port nor sock were specified')
1387            if sock.type != socket.SOCK_STREAM:
1388                raise ValueError(f'A Stream Socket was expected, got {sock!r}')
1389            sockets = [sock]
1390
1391        for sock in sockets:
1392            sock.setblocking(False)
1393
1394        server = Server(self, sockets, protocol_factory,
1395                        ssl, backlog, ssl_handshake_timeout)
1396        if start_serving:
1397            server._start_serving()
1398            # Skip one loop iteration so that all 'loop.add_reader'
1399            # go through.
1400            await tasks.sleep(0, loop=self)
1401
1402        if self._debug:
1403            logger.info("%r is serving", server)
1404        return server
1405
1406    async def connect_accepted_socket(
1407            self, protocol_factory, sock,
1408            *, ssl=None,
1409            ssl_handshake_timeout=None):
1410        """Handle an accepted connection.
1411
1412        This is used by servers that accept connections outside of
1413        asyncio but that use asyncio to handle connections.
1414
1415        This method is a coroutine.  When completed, the coroutine
1416        returns a (transport, protocol) pair.
1417        """
1418        if sock.type != socket.SOCK_STREAM:
1419            raise ValueError(f'A Stream Socket was expected, got {sock!r}')
1420
1421        if ssl_handshake_timeout is not None and not ssl:
1422            raise ValueError(
1423                'ssl_handshake_timeout is only meaningful with ssl')
1424
1425        transport, protocol = await self._create_connection_transport(
1426            sock, protocol_factory, ssl, '', server_side=True,
1427            ssl_handshake_timeout=ssl_handshake_timeout)
1428        if self._debug:
1429            # Get the socket from the transport because SSL transport closes
1430            # the old socket and creates a new SSL socket
1431            sock = transport.get_extra_info('socket')
1432            logger.debug("%r handled: (%r, %r)", sock, transport, protocol)
1433        return transport, protocol
1434
1435    async def connect_read_pipe(self, protocol_factory, pipe):
1436        protocol = protocol_factory()
1437        waiter = self.create_future()
1438        transport = self._make_read_pipe_transport(pipe, protocol, waiter)
1439
1440        try:
1441            await waiter
1442        except:
1443            transport.close()
1444            raise
1445
1446        if self._debug:
1447            logger.debug('Read pipe %r connected: (%r, %r)',
1448                         pipe.fileno(), transport, protocol)
1449        return transport, protocol
1450
1451    async def connect_write_pipe(self, protocol_factory, pipe):
1452        protocol = protocol_factory()
1453        waiter = self.create_future()
1454        transport = self._make_write_pipe_transport(pipe, protocol, waiter)
1455
1456        try:
1457            await waiter
1458        except:
1459            transport.close()
1460            raise
1461
1462        if self._debug:
1463            logger.debug('Write pipe %r connected: (%r, %r)',
1464                         pipe.fileno(), transport, protocol)
1465        return transport, protocol
1466
1467    def _log_subprocess(self, msg, stdin, stdout, stderr):
1468        info = [msg]
1469        if stdin is not None:
1470            info.append(f'stdin={_format_pipe(stdin)}')
1471        if stdout is not None and stderr == subprocess.STDOUT:
1472            info.append(f'stdout=stderr={_format_pipe(stdout)}')
1473        else:
1474            if stdout is not None:
1475                info.append(f'stdout={_format_pipe(stdout)}')
1476            if stderr is not None:
1477                info.append(f'stderr={_format_pipe(stderr)}')
1478        logger.debug(' '.join(info))
1479
1480    async def subprocess_shell(self, protocol_factory, cmd, *,
1481                               stdin=subprocess.PIPE,
1482                               stdout=subprocess.PIPE,
1483                               stderr=subprocess.PIPE,
1484                               universal_newlines=False,
1485                               shell=True, bufsize=0,
1486                               **kwargs):
1487        if not isinstance(cmd, (bytes, str)):
1488            raise ValueError("cmd must be a string")
1489        if universal_newlines:
1490            raise ValueError("universal_newlines must be False")
1491        if not shell:
1492            raise ValueError("shell must be True")
1493        if bufsize != 0:
1494            raise ValueError("bufsize must be 0")
1495        protocol = protocol_factory()
1496        debug_log = None
1497        if self._debug:
1498            # don't log parameters: they may contain sensitive information
1499            # (password) and may be too long
1500            debug_log = 'run shell command %r' % cmd
1501            self._log_subprocess(debug_log, stdin, stdout, stderr)
1502        transport = await self._make_subprocess_transport(
1503            protocol, cmd, True, stdin, stdout, stderr, bufsize, **kwargs)
1504        if self._debug and debug_log is not None:
1505            logger.info('%s: %r', debug_log, transport)
1506        return transport, protocol
1507
1508    async def subprocess_exec(self, protocol_factory, program, *args,
1509                              stdin=subprocess.PIPE, stdout=subprocess.PIPE,
1510                              stderr=subprocess.PIPE, universal_newlines=False,
1511                              shell=False, bufsize=0, **kwargs):
1512        if universal_newlines:
1513            raise ValueError("universal_newlines must be False")
1514        if shell:
1515            raise ValueError("shell must be False")
1516        if bufsize != 0:
1517            raise ValueError("bufsize must be 0")
1518        popen_args = (program,) + args
1519        for arg in popen_args:
1520            if not isinstance(arg, (str, bytes)):
1521                raise TypeError(
1522                    f"program arguments must be a bytes or text string, "
1523                    f"not {type(arg).__name__}")
1524        protocol = protocol_factory()
1525        debug_log = None
1526        if self._debug:
1527            # don't log parameters: they may contain sensitive information
1528            # (password) and may be too long
1529            debug_log = f'execute program {program!r}'
1530            self._log_subprocess(debug_log, stdin, stdout, stderr)
1531        transport = await self._make_subprocess_transport(
1532            protocol, popen_args, False, stdin, stdout, stderr,
1533            bufsize, **kwargs)
1534        if self._debug and debug_log is not None:
1535            logger.info('%s: %r', debug_log, transport)
1536        return transport, protocol
1537
1538    def get_exception_handler(self):
1539        """Return an exception handler, or None if the default one is in use.
1540        """
1541        return self._exception_handler
1542
1543    def set_exception_handler(self, handler):
1544        """Set handler as the new event loop exception handler.
1545
1546        If handler is None, the default exception handler will
1547        be set.
1548
1549        If handler is a callable object, it should have a
1550        signature matching '(loop, context)', where 'loop'
1551        will be a reference to the active event loop, 'context'
1552        will be a dict object (see `call_exception_handler()`
1553        documentation for details about context).
1554        """
1555        if handler is not None and not callable(handler):
1556            raise TypeError(f'A callable object or None is expected, '
1557                            f'got {handler!r}')
1558        self._exception_handler = handler
1559
1560    def default_exception_handler(self, context):
1561        """Default exception handler.
1562
1563        This is called when an exception occurs and no exception
1564        handler is set, and can be called by a custom exception
1565        handler that wants to defer to the default behavior.
1566
1567        This default handler logs the error message and other
1568        context-dependent information.  In debug mode, a truncated
1569        stack trace is also appended showing where the given object
1570        (e.g. a handle or future or task) was created, if any.
1571
1572        The context parameter has the same meaning as in
1573        `call_exception_handler()`.
1574        """
1575        message = context.get('message')
1576        if not message:
1577            message = 'Unhandled exception in event loop'
1578
1579        exception = context.get('exception')
1580        if exception is not None:
1581            exc_info = (type(exception), exception, exception.__traceback__)
1582        else:
1583            exc_info = False
1584
1585        if ('source_traceback' not in context and
1586                self._current_handle is not None and
1587                self._current_handle._source_traceback):
1588            context['handle_traceback'] = \
1589                self._current_handle._source_traceback
1590
1591        log_lines = [message]
1592        for key in sorted(context):
1593            if key in {'message', 'exception'}:
1594                continue
1595            value = context[key]
1596            if key == 'source_traceback':
1597                tb = ''.join(traceback.format_list(value))
1598                value = 'Object created at (most recent call last):\n'
1599                value += tb.rstrip()
1600            elif key == 'handle_traceback':
1601                tb = ''.join(traceback.format_list(value))
1602                value = 'Handle created at (most recent call last):\n'
1603                value += tb.rstrip()
1604            else:
1605                value = repr(value)
1606            log_lines.append(f'{key}: {value}')
1607
1608        logger.error('\n'.join(log_lines), exc_info=exc_info)
1609
1610    def call_exception_handler(self, context):
1611        """Call the current event loop's exception handler.
1612
1613        The context argument is a dict containing the following keys:
1614
1615        - 'message': Error message;
1616        - 'exception' (optional): Exception object;
1617        - 'future' (optional): Future instance;
1618        - 'task' (optional): Task instance;
1619        - 'handle' (optional): Handle instance;
1620        - 'protocol' (optional): Protocol instance;
1621        - 'transport' (optional): Transport instance;
1622        - 'socket' (optional): Socket instance;
1623        - 'asyncgen' (optional): Asynchronous generator that caused
1624                                 the exception.
1625
1626        New keys maybe introduced in the future.
1627
1628        Note: do not overload this method in an event loop subclass.
1629        For custom exception handling, use the
1630        `set_exception_handler()` method.
1631        """
1632        if self._exception_handler is None:
1633            try:
1634                self.default_exception_handler(context)
1635            except Exception:
1636                # Second protection layer for unexpected errors
1637                # in the default implementation, as well as for subclassed
1638                # event loops with overloaded "default_exception_handler".
1639                logger.error('Exception in default exception handler',
1640                             exc_info=True)
1641        else:
1642            try:
1643                self._exception_handler(self, context)
1644            except Exception as exc:
1645                # Exception in the user set custom exception handler.
1646                try:
1647                    # Let's try default handler.
1648                    self.default_exception_handler({
1649                        'message': 'Unhandled error in exception handler',
1650                        'exception': exc,
1651                        'context': context,
1652                    })
1653                except Exception:
1654                    # Guard 'default_exception_handler' in case it is
1655                    # overloaded.
1656                    logger.error('Exception in default exception handler '
1657                                 'while handling an unexpected error '
1658                                 'in custom exception handler',
1659                                 exc_info=True)
1660
1661    def _add_callback(self, handle):
1662        """Add a Handle to _scheduled (TimerHandle) or _ready."""
1663        assert isinstance(handle, events.Handle), 'A Handle is required here'
1664        if handle._cancelled:
1665            return
1666        assert not isinstance(handle, events.TimerHandle)
1667        self._ready.append(handle)
1668
1669    def _add_callback_signalsafe(self, handle):
1670        """Like _add_callback() but called from a signal handler."""
1671        self._add_callback(handle)
1672        self._write_to_self()
1673
1674    def _timer_handle_cancelled(self, handle):
1675        """Notification that a TimerHandle has been cancelled."""
1676        if handle._scheduled:
1677            self._timer_cancelled_count += 1
1678
1679    def _run_once(self):
1680        """Run one full iteration of the event loop.
1681
1682        This calls all currently ready callbacks, polls for I/O,
1683        schedules the resulting callbacks, and finally schedules
1684        'call_later' callbacks.
1685        """
1686
1687        sched_count = len(self._scheduled)
1688        if (sched_count > _MIN_SCHEDULED_TIMER_HANDLES and
1689            self._timer_cancelled_count / sched_count >
1690                _MIN_CANCELLED_TIMER_HANDLES_FRACTION):
1691            # Remove delayed calls that were cancelled if their number
1692            # is too high
1693            new_scheduled = []
1694            for handle in self._scheduled:
1695                if handle._cancelled:
1696                    handle._scheduled = False
1697                else:
1698                    new_scheduled.append(handle)
1699
1700            heapq.heapify(new_scheduled)
1701            self._scheduled = new_scheduled
1702            self._timer_cancelled_count = 0
1703        else:
1704            # Remove delayed calls that were cancelled from head of queue.
1705            while self._scheduled and self._scheduled[0]._cancelled:
1706                self._timer_cancelled_count -= 1
1707                handle = heapq.heappop(self._scheduled)
1708                handle._scheduled = False
1709
1710        timeout = None
1711        if self._ready or self._stopping:
1712            timeout = 0
1713        elif self._scheduled:
1714            # Compute the desired timeout.
1715            when = self._scheduled[0]._when
1716            timeout = min(max(0, when - self.time()), MAXIMUM_SELECT_TIMEOUT)
1717
1718        if self._debug and timeout != 0:
1719            t0 = self.time()
1720            event_list = self._selector.select(timeout)
1721            dt = self.time() - t0
1722            if dt >= 1.0:
1723                level = logging.INFO
1724            else:
1725                level = logging.DEBUG
1726            nevent = len(event_list)
1727            if timeout is None:
1728                logger.log(level, 'poll took %.3f ms: %s events',
1729                           dt * 1e3, nevent)
1730            elif nevent:
1731                logger.log(level,
1732                           'poll %.3f ms took %.3f ms: %s events',
1733                           timeout * 1e3, dt * 1e3, nevent)
1734            elif dt >= 1.0:
1735                logger.log(level,
1736                           'poll %.3f ms took %.3f ms: timeout',
1737                           timeout * 1e3, dt * 1e3)
1738        else:
1739            event_list = self._selector.select(timeout)
1740        self._process_events(event_list)
1741
1742        # Handle 'later' callbacks that are ready.
1743        end_time = self.time() + self._clock_resolution
1744        while self._scheduled:
1745            handle = self._scheduled[0]
1746            if handle._when >= end_time:
1747                break
1748            handle = heapq.heappop(self._scheduled)
1749            handle._scheduled = False
1750            self._ready.append(handle)
1751
1752        # This is the only place where callbacks are actually *called*.
1753        # All other places just add them to ready.
1754        # Note: We run all currently scheduled callbacks, but not any
1755        # callbacks scheduled by callbacks run this time around --
1756        # they will be run the next time (after another I/O poll).
1757        # Use an idiom that is thread-safe without using locks.
1758        ntodo = len(self._ready)
1759        for i in range(ntodo):
1760            handle = self._ready.popleft()
1761            if handle._cancelled:
1762                continue
1763            if self._debug:
1764                try:
1765                    self._current_handle = handle
1766                    t0 = self.time()
1767                    handle._run()
1768                    dt = self.time() - t0
1769                    if dt >= self.slow_callback_duration:
1770                        logger.warning('Executing %s took %.3f seconds',
1771                                       _format_handle(handle), dt)
1772                finally:
1773                    self._current_handle = None
1774            else:
1775                handle._run()
1776        handle = None  # Needed to break cycles when an exception occurs.
1777
1778    def _set_coroutine_origin_tracking(self, enabled):
1779        if bool(enabled) == bool(self._coroutine_origin_tracking_enabled):
1780            return
1781
1782        if enabled:
1783            self._coroutine_origin_tracking_saved_depth = (
1784                sys.get_coroutine_origin_tracking_depth())
1785            sys.set_coroutine_origin_tracking_depth(
1786                constants.DEBUG_STACK_DEPTH)
1787        else:
1788            sys.set_coroutine_origin_tracking_depth(
1789                self._coroutine_origin_tracking_saved_depth)
1790
1791        self._coroutine_origin_tracking_enabled = enabled
1792
1793    def get_debug(self):
1794        return self._debug
1795
1796    def set_debug(self, enabled):
1797        self._debug = enabled
1798
1799        if self.is_running():
1800            self.call_soon_threadsafe(self._set_coroutine_origin_tracking, enabled)
1801