1"""Event loop and event loop policy."""
2
3__all__ = (
4    'AbstractEventLoopPolicy',
5    'AbstractEventLoop', 'AbstractServer',
6    'Handle', 'TimerHandle',
7    'get_event_loop_policy', 'set_event_loop_policy',
8    'get_event_loop', 'set_event_loop', 'new_event_loop',
9    'get_child_watcher', 'set_child_watcher',
10    '_set_running_loop', 'get_running_loop',
11    '_get_running_loop',
12)
13
14import contextvars
15import os
16import socket
17import subprocess
18import sys
19import threading
20
21from . import format_helpers
22
23
24class Handle:
25    """Object returned by callback registration methods."""
26
27    __slots__ = ('_callback', '_args', '_cancelled', '_loop',
28                 '_source_traceback', '_repr', '__weakref__',
29                 '_context')
30
31    def __init__(self, callback, args, loop, context=None):
32        if context is None:
33            context = contextvars.copy_context()
34        self._context = context
35        self._loop = loop
36        self._callback = callback
37        self._args = args
38        self._cancelled = False
39        self._repr = None
40        if self._loop.get_debug():
41            self._source_traceback = format_helpers.extract_stack(
42                sys._getframe(1))
43        else:
44            self._source_traceback = None
45
46    def _repr_info(self):
47        info = [self.__class__.__name__]
48        if self._cancelled:
49            info.append('cancelled')
50        if self._callback is not None:
51            info.append(format_helpers._format_callback_source(
52                self._callback, self._args))
53        if self._source_traceback:
54            frame = self._source_traceback[-1]
55            info.append(f'created at {frame[0]}:{frame[1]}')
56        return info
57
58    def __repr__(self):
59        if self._repr is not None:
60            return self._repr
61        info = self._repr_info()
62        return '<{}>'.format(' '.join(info))
63
64    def cancel(self):
65        if not self._cancelled:
66            self._cancelled = True
67            if self._loop.get_debug():
68                # Keep a representation in debug mode to keep callback and
69                # parameters. For example, to log the warning
70                # "Executing <Handle...> took 2.5 second"
71                self._repr = repr(self)
72            self._callback = None
73            self._args = None
74
75    def cancelled(self):
76        return self._cancelled
77
78    def _run(self):
79        try:
80            self._context.run(self._callback, *self._args)
81        except (SystemExit, KeyboardInterrupt):
82            raise
83        except BaseException as exc:
84            cb = format_helpers._format_callback_source(
85                self._callback, self._args)
86            msg = f'Exception in callback {cb}'
87            context = {
88                'message': msg,
89                'exception': exc,
90                'handle': self,
91            }
92            if self._source_traceback:
93                context['source_traceback'] = self._source_traceback
94            self._loop.call_exception_handler(context)
95        self = None  # Needed to break cycles when an exception occurs.
96
97
98class TimerHandle(Handle):
99    """Object returned by timed callback registration methods."""
100
101    __slots__ = ['_scheduled', '_when']
102
103    def __init__(self, when, callback, args, loop, context=None):
104        assert when is not None
105        super().__init__(callback, args, loop, context)
106        if self._source_traceback:
107            del self._source_traceback[-1]
108        self._when = when
109        self._scheduled = False
110
111    def _repr_info(self):
112        info = super()._repr_info()
113        pos = 2 if self._cancelled else 1
114        info.insert(pos, f'when={self._when}')
115        return info
116
117    def __hash__(self):
118        return hash(self._when)
119
120    def __lt__(self, other):
121        if isinstance(other, TimerHandle):
122            return self._when < other._when
123        return NotImplemented
124
125    def __le__(self, other):
126        if isinstance(other, TimerHandle):
127            return self._when < other._when or self.__eq__(other)
128        return NotImplemented
129
130    def __gt__(self, other):
131        if isinstance(other, TimerHandle):
132            return self._when > other._when
133        return NotImplemented
134
135    def __ge__(self, other):
136        if isinstance(other, TimerHandle):
137            return self._when > other._when or self.__eq__(other)
138        return NotImplemented
139
140    def __eq__(self, other):
141        if isinstance(other, TimerHandle):
142            return (self._when == other._when and
143                    self._callback == other._callback and
144                    self._args == other._args and
145                    self._cancelled == other._cancelled)
146        return NotImplemented
147
148    def cancel(self):
149        if not self._cancelled:
150            self._loop._timer_handle_cancelled(self)
151        super().cancel()
152
153    def when(self):
154        """Return a scheduled callback time.
155
156        The time is an absolute timestamp, using the same time
157        reference as loop.time().
158        """
159        return self._when
160
161
162class AbstractServer:
163    """Abstract server returned by create_server()."""
164
165    def close(self):
166        """Stop serving.  This leaves existing connections open."""
167        raise NotImplementedError
168
169    def get_loop(self):
170        """Get the event loop the Server object is attached to."""
171        raise NotImplementedError
172
173    def is_serving(self):
174        """Return True if the server is accepting connections."""
175        raise NotImplementedError
176
177    async def start_serving(self):
178        """Start accepting connections.
179
180        This method is idempotent, so it can be called when
181        the server is already being serving.
182        """
183        raise NotImplementedError
184
185    async def serve_forever(self):
186        """Start accepting connections until the coroutine is cancelled.
187
188        The server is closed when the coroutine is cancelled.
189        """
190        raise NotImplementedError
191
192    async def wait_closed(self):
193        """Coroutine to wait until service is closed."""
194        raise NotImplementedError
195
196    async def __aenter__(self):
197        return self
198
199    async def __aexit__(self, *exc):
200        self.close()
201        await self.wait_closed()
202
203
204class AbstractEventLoop:
205    """Abstract event loop."""
206
207    # Running and stopping the event loop.
208
209    def run_forever(self):
210        """Run the event loop until stop() is called."""
211        raise NotImplementedError
212
213    def run_until_complete(self, future):
214        """Run the event loop until a Future is done.
215
216        Return the Future's result, or raise its exception.
217        """
218        raise NotImplementedError
219
220    def stop(self):
221        """Stop the event loop as soon as reasonable.
222
223        Exactly how soon that is may depend on the implementation, but
224        no more I/O callbacks should be scheduled.
225        """
226        raise NotImplementedError
227
228    def is_running(self):
229        """Return whether the event loop is currently running."""
230        raise NotImplementedError
231
232    def is_closed(self):
233        """Returns True if the event loop was closed."""
234        raise NotImplementedError
235
236    def close(self):
237        """Close the loop.
238
239        The loop should not be running.
240
241        This is idempotent and irreversible.
242
243        No other methods should be called after this one.
244        """
245        raise NotImplementedError
246
247    async def shutdown_asyncgens(self):
248        """Shutdown all active asynchronous generators."""
249        raise NotImplementedError
250
251    async def shutdown_default_executor(self):
252        """Schedule the shutdown of the default executor."""
253        raise NotImplementedError
254
255    # Methods scheduling callbacks.  All these return Handles.
256
257    def _timer_handle_cancelled(self, handle):
258        """Notification that a TimerHandle has been cancelled."""
259        raise NotImplementedError
260
261    def call_soon(self, callback, *args):
262        return self.call_later(0, callback, *args)
263
264    def call_later(self, delay, callback, *args):
265        raise NotImplementedError
266
267    def call_at(self, when, callback, *args):
268        raise NotImplementedError
269
270    def time(self):
271        raise NotImplementedError
272
273    def create_future(self):
274        raise NotImplementedError
275
276    # Method scheduling a coroutine object: create a task.
277
278    def create_task(self, coro, *, name=None):
279        raise NotImplementedError
280
281    # Methods for interacting with threads.
282
283    def call_soon_threadsafe(self, callback, *args):
284        raise NotImplementedError
285
286    def run_in_executor(self, executor, func, *args):
287        raise NotImplementedError
288
289    def set_default_executor(self, executor):
290        raise NotImplementedError
291
292    # Network I/O methods returning Futures.
293
294    async def getaddrinfo(self, host, port, *,
295                          family=0, type=0, proto=0, flags=0):
296        raise NotImplementedError
297
298    async def getnameinfo(self, sockaddr, flags=0):
299        raise NotImplementedError
300
301    async def create_connection(
302            self, protocol_factory, host=None, port=None,
303            *, ssl=None, family=0, proto=0,
304            flags=0, sock=None, local_addr=None,
305            server_hostname=None,
306            ssl_handshake_timeout=None,
307            happy_eyeballs_delay=None, interleave=None):
308        raise NotImplementedError
309
310    async def create_server(
311            self, protocol_factory, host=None, port=None,
312            *, family=socket.AF_UNSPEC,
313            flags=socket.AI_PASSIVE, sock=None, backlog=100,
314            ssl=None, reuse_address=None, reuse_port=None,
315            ssl_handshake_timeout=None,
316            start_serving=True):
317        """A coroutine which creates a TCP server bound to host and port.
318
319        The return value is a Server object which can be used to stop
320        the service.
321
322        If host is an empty string or None all interfaces are assumed
323        and a list of multiple sockets will be returned (most likely
324        one for IPv4 and another one for IPv6). The host parameter can also be
325        a sequence (e.g. list) of hosts to bind to.
326
327        family can be set to either AF_INET or AF_INET6 to force the
328        socket to use IPv4 or IPv6. If not set it will be determined
329        from host (defaults to AF_UNSPEC).
330
331        flags is a bitmask for getaddrinfo().
332
333        sock can optionally be specified in order to use a preexisting
334        socket object.
335
336        backlog is the maximum number of queued connections passed to
337        listen() (defaults to 100).
338
339        ssl can be set to an SSLContext to enable SSL over the
340        accepted connections.
341
342        reuse_address tells the kernel to reuse a local socket in
343        TIME_WAIT state, without waiting for its natural timeout to
344        expire. If not specified will automatically be set to True on
345        UNIX.
346
347        reuse_port tells the kernel to allow this endpoint to be bound to
348        the same port as other existing endpoints are bound to, so long as
349        they all set this flag when being created. This option is not
350        supported on Windows.
351
352        ssl_handshake_timeout is the time in seconds that an SSL server
353        will wait for completion of the SSL handshake before aborting the
354        connection. Default is 60s.
355
356        start_serving set to True (default) causes the created server
357        to start accepting connections immediately.  When set to False,
358        the user should await Server.start_serving() or Server.serve_forever()
359        to make the server to start accepting connections.
360        """
361        raise NotImplementedError
362
363    async def sendfile(self, transport, file, offset=0, count=None,
364                       *, fallback=True):
365        """Send a file through a transport.
366
367        Return an amount of sent bytes.
368        """
369        raise NotImplementedError
370
371    async def start_tls(self, transport, protocol, sslcontext, *,
372                        server_side=False,
373                        server_hostname=None,
374                        ssl_handshake_timeout=None):
375        """Upgrade a transport to TLS.
376
377        Return a new transport that *protocol* should start using
378        immediately.
379        """
380        raise NotImplementedError
381
382    async def create_unix_connection(
383            self, protocol_factory, path=None, *,
384            ssl=None, sock=None,
385            server_hostname=None,
386            ssl_handshake_timeout=None):
387        raise NotImplementedError
388
389    async def create_unix_server(
390            self, protocol_factory, path=None, *,
391            sock=None, backlog=100, ssl=None,
392            ssl_handshake_timeout=None,
393            start_serving=True):
394        """A coroutine which creates a UNIX Domain Socket server.
395
396        The return value is a Server object, which can be used to stop
397        the service.
398
399        path is a str, representing a file system path to bind the
400        server socket to.
401
402        sock can optionally be specified in order to use a preexisting
403        socket object.
404
405        backlog is the maximum number of queued connections passed to
406        listen() (defaults to 100).
407
408        ssl can be set to an SSLContext to enable SSL over the
409        accepted connections.
410
411        ssl_handshake_timeout is the time in seconds that an SSL server
412        will wait for the SSL handshake to complete (defaults to 60s).
413
414        start_serving set to True (default) causes the created server
415        to start accepting connections immediately.  When set to False,
416        the user should await Server.start_serving() or Server.serve_forever()
417        to make the server to start accepting connections.
418        """
419        raise NotImplementedError
420
421    async def create_datagram_endpoint(self, protocol_factory,
422                                       local_addr=None, remote_addr=None, *,
423                                       family=0, proto=0, flags=0,
424                                       reuse_address=None, reuse_port=None,
425                                       allow_broadcast=None, sock=None):
426        """A coroutine which creates a datagram endpoint.
427
428        This method will try to establish the endpoint in the background.
429        When successful, the coroutine returns a (transport, protocol) pair.
430
431        protocol_factory must be a callable returning a protocol instance.
432
433        socket family AF_INET, socket.AF_INET6 or socket.AF_UNIX depending on
434        host (or family if specified), socket type SOCK_DGRAM.
435
436        reuse_address tells the kernel to reuse a local socket in
437        TIME_WAIT state, without waiting for its natural timeout to
438        expire. If not specified it will automatically be set to True on
439        UNIX.
440
441        reuse_port tells the kernel to allow this endpoint to be bound to
442        the same port as other existing endpoints are bound to, so long as
443        they all set this flag when being created. This option is not
444        supported on Windows and some UNIX's. If the
445        :py:data:`~socket.SO_REUSEPORT` constant is not defined then this
446        capability is unsupported.
447
448        allow_broadcast tells the kernel to allow this endpoint to send
449        messages to the broadcast address.
450
451        sock can optionally be specified in order to use a preexisting
452        socket object.
453        """
454        raise NotImplementedError
455
456    # Pipes and subprocesses.
457
458    async def connect_read_pipe(self, protocol_factory, pipe):
459        """Register read pipe in event loop. Set the pipe to non-blocking mode.
460
461        protocol_factory should instantiate object with Protocol interface.
462        pipe is a file-like object.
463        Return pair (transport, protocol), where transport supports the
464        ReadTransport interface."""
465        # The reason to accept file-like object instead of just file descriptor
466        # is: we need to own pipe and close it at transport finishing
467        # Can got complicated errors if pass f.fileno(),
468        # close fd in pipe transport then close f and vise versa.
469        raise NotImplementedError
470
471    async def connect_write_pipe(self, protocol_factory, pipe):
472        """Register write pipe in event loop.
473
474        protocol_factory should instantiate object with BaseProtocol interface.
475        Pipe is file-like object already switched to nonblocking.
476        Return pair (transport, protocol), where transport support
477        WriteTransport interface."""
478        # The reason to accept file-like object instead of just file descriptor
479        # is: we need to own pipe and close it at transport finishing
480        # Can got complicated errors if pass f.fileno(),
481        # close fd in pipe transport then close f and vise versa.
482        raise NotImplementedError
483
484    async def subprocess_shell(self, protocol_factory, cmd, *,
485                               stdin=subprocess.PIPE,
486                               stdout=subprocess.PIPE,
487                               stderr=subprocess.PIPE,
488                               **kwargs):
489        raise NotImplementedError
490
491    async def subprocess_exec(self, protocol_factory, *args,
492                              stdin=subprocess.PIPE,
493                              stdout=subprocess.PIPE,
494                              stderr=subprocess.PIPE,
495                              **kwargs):
496        raise NotImplementedError
497
498    # Ready-based callback registration methods.
499    # The add_*() methods return None.
500    # The remove_*() methods return True if something was removed,
501    # False if there was nothing to delete.
502
503    def add_reader(self, fd, callback, *args):
504        raise NotImplementedError
505
506    def remove_reader(self, fd):
507        raise NotImplementedError
508
509    def add_writer(self, fd, callback, *args):
510        raise NotImplementedError
511
512    def remove_writer(self, fd):
513        raise NotImplementedError
514
515    # Completion based I/O methods returning Futures.
516
517    async def sock_recv(self, sock, nbytes):
518        raise NotImplementedError
519
520    async def sock_recv_into(self, sock, buf):
521        raise NotImplementedError
522
523    async def sock_sendall(self, sock, data):
524        raise NotImplementedError
525
526    async def sock_connect(self, sock, address):
527        raise NotImplementedError
528
529    async def sock_accept(self, sock):
530        raise NotImplementedError
531
532    async def sock_sendfile(self, sock, file, offset=0, count=None,
533                            *, fallback=None):
534        raise NotImplementedError
535
536    # Signal handling.
537
538    def add_signal_handler(self, sig, callback, *args):
539        raise NotImplementedError
540
541    def remove_signal_handler(self, sig):
542        raise NotImplementedError
543
544    # Task factory.
545
546    def set_task_factory(self, factory):
547        raise NotImplementedError
548
549    def get_task_factory(self):
550        raise NotImplementedError
551
552    # Error handlers.
553
554    def get_exception_handler(self):
555        raise NotImplementedError
556
557    def set_exception_handler(self, handler):
558        raise NotImplementedError
559
560    def default_exception_handler(self, context):
561        raise NotImplementedError
562
563    def call_exception_handler(self, context):
564        raise NotImplementedError
565
566    # Debug flag management.
567
568    def get_debug(self):
569        raise NotImplementedError
570
571    def set_debug(self, enabled):
572        raise NotImplementedError
573
574
575class AbstractEventLoopPolicy:
576    """Abstract policy for accessing the event loop."""
577
578    def get_event_loop(self):
579        """Get the event loop for the current context.
580
581        Returns an event loop object implementing the BaseEventLoop interface,
582        or raises an exception in case no event loop has been set for the
583        current context and the current policy does not specify to create one.
584
585        It should never return None."""
586        raise NotImplementedError
587
588    def set_event_loop(self, loop):
589        """Set the event loop for the current context to loop."""
590        raise NotImplementedError
591
592    def new_event_loop(self):
593        """Create and return a new event loop object according to this
594        policy's rules. If there's need to set this loop as the event loop for
595        the current context, set_event_loop must be called explicitly."""
596        raise NotImplementedError
597
598    # Child processes handling (Unix only).
599
600    def get_child_watcher(self):
601        "Get the watcher for child processes."
602        raise NotImplementedError
603
604    def set_child_watcher(self, watcher):
605        """Set the watcher for child processes."""
606        raise NotImplementedError
607
608
609class BaseDefaultEventLoopPolicy(AbstractEventLoopPolicy):
610    """Default policy implementation for accessing the event loop.
611
612    In this policy, each thread has its own event loop.  However, we
613    only automatically create an event loop by default for the main
614    thread; other threads by default have no event loop.
615
616    Other policies may have different rules (e.g. a single global
617    event loop, or automatically creating an event loop per thread, or
618    using some other notion of context to which an event loop is
619    associated).
620    """
621
622    _loop_factory = None
623
624    class _Local(threading.local):
625        _loop = None
626        _set_called = False
627
628    def __init__(self):
629        self._local = self._Local()
630
631    def get_event_loop(self):
632        """Get the event loop for the current context.
633
634        Returns an instance of EventLoop or raises an exception.
635        """
636        if (self._local._loop is None and
637                not self._local._set_called and
638                threading.current_thread() is threading.main_thread()):
639            self.set_event_loop(self.new_event_loop())
640
641        if self._local._loop is None:
642            raise RuntimeError('There is no current event loop in thread %r.'
643                               % threading.current_thread().name)
644
645        return self._local._loop
646
647    def set_event_loop(self, loop):
648        """Set the event loop."""
649        self._local._set_called = True
650        assert loop is None or isinstance(loop, AbstractEventLoop)
651        self._local._loop = loop
652
653    def new_event_loop(self):
654        """Create a new event loop.
655
656        You must call set_event_loop() to make this the current event
657        loop.
658        """
659        return self._loop_factory()
660
661
662# Event loop policy.  The policy itself is always global, even if the
663# policy's rules say that there is an event loop per thread (or other
664# notion of context).  The default policy is installed by the first
665# call to get_event_loop_policy().
666_event_loop_policy = None
667
668# Lock for protecting the on-the-fly creation of the event loop policy.
669_lock = threading.Lock()
670
671
672# A TLS for the running event loop, used by _get_running_loop.
673class _RunningLoop(threading.local):
674    loop_pid = (None, None)
675
676
677_running_loop = _RunningLoop()
678
679
680def get_running_loop():
681    """Return the running event loop.  Raise a RuntimeError if there is none.
682
683    This function is thread-specific.
684    """
685    # NOTE: this function is implemented in C (see _asynciomodule.c)
686    loop = _get_running_loop()
687    if loop is None:
688        raise RuntimeError('no running event loop')
689    return loop
690
691
692def _get_running_loop():
693    """Return the running event loop or None.
694
695    This is a low-level function intended to be used by event loops.
696    This function is thread-specific.
697    """
698    # NOTE: this function is implemented in C (see _asynciomodule.c)
699    running_loop, pid = _running_loop.loop_pid
700    if running_loop is not None and pid == os.getpid():
701        return running_loop
702
703
704def _set_running_loop(loop):
705    """Set the running event loop.
706
707    This is a low-level function intended to be used by event loops.
708    This function is thread-specific.
709    """
710    # NOTE: this function is implemented in C (see _asynciomodule.c)
711    _running_loop.loop_pid = (loop, os.getpid())
712
713
714def _init_event_loop_policy():
715    global _event_loop_policy
716    with _lock:
717        if _event_loop_policy is None:  # pragma: no branch
718            from . import DefaultEventLoopPolicy
719            _event_loop_policy = DefaultEventLoopPolicy()
720
721
722def get_event_loop_policy():
723    """Get the current event loop policy."""
724    if _event_loop_policy is None:
725        _init_event_loop_policy()
726    return _event_loop_policy
727
728
729def set_event_loop_policy(policy):
730    """Set the current event loop policy.
731
732    If policy is None, the default policy is restored."""
733    global _event_loop_policy
734    assert policy is None or isinstance(policy, AbstractEventLoopPolicy)
735    _event_loop_policy = policy
736
737
738def get_event_loop():
739    """Return an asyncio event loop.
740
741    When called from a coroutine or a callback (e.g. scheduled with call_soon
742    or similar API), this function will always return the running event loop.
743
744    If there is no running event loop set, the function will return
745    the result of `get_event_loop_policy().get_event_loop()` call.
746    """
747    # NOTE: this function is implemented in C (see _asynciomodule.c)
748    current_loop = _get_running_loop()
749    if current_loop is not None:
750        return current_loop
751    return get_event_loop_policy().get_event_loop()
752
753
754def set_event_loop(loop):
755    """Equivalent to calling get_event_loop_policy().set_event_loop(loop)."""
756    get_event_loop_policy().set_event_loop(loop)
757
758
759def new_event_loop():
760    """Equivalent to calling get_event_loop_policy().new_event_loop()."""
761    return get_event_loop_policy().new_event_loop()
762
763
764def get_child_watcher():
765    """Equivalent to calling get_event_loop_policy().get_child_watcher()."""
766    return get_event_loop_policy().get_child_watcher()
767
768
769def set_child_watcher(watcher):
770    """Equivalent to calling
771    get_event_loop_policy().set_child_watcher(watcher)."""
772    return get_event_loop_policy().set_child_watcher(watcher)
773
774
775# Alias pure-Python implementations for testing purposes.
776_py__get_running_loop = _get_running_loop
777_py__set_running_loop = _set_running_loop
778_py_get_running_loop = get_running_loop
779_py_get_event_loop = get_event_loop
780
781
782try:
783    # get_event_loop() is one of the most frequently called
784    # functions in asyncio.  Pure Python implementation is
785    # about 4 times slower than C-accelerated.
786    from _asyncio import (_get_running_loop, _set_running_loop,
787                          get_running_loop, get_event_loop)
788except ImportError:
789    pass
790else:
791    # Alias C implementations for testing purposes.
792    _c__get_running_loop = _get_running_loop
793    _c__set_running_loop = _set_running_loop
794    _c_get_running_loop = get_running_loop
795    _c_get_event_loop = get_event_loop
796