1"""Event loop and event loop policy."""
2
3__all__ = (
4    'AbstractEventLoopPolicy',
5    'AbstractEventLoop', 'AbstractServer',
6    'Handle', 'TimerHandle', 'SendfileNotAvailableError',
7    'get_event_loop_policy', 'set_event_loop_policy',
8    'get_event_loop', 'set_event_loop', 'new_event_loop',
9    'get_child_watcher', 'set_child_watcher',
10    '_set_running_loop', 'get_running_loop',
11    '_get_running_loop',
12)
13
14import contextvars
15import os
16import socket
17import subprocess
18import sys
19import threading
20
21from . import format_helpers
22
23
24class SendfileNotAvailableError(RuntimeError):
25    """Sendfile syscall is not available.
26
27    Raised if OS does not support sendfile syscall for given socket or
28    file type.
29    """
30
31
32class Handle:
33    """Object returned by callback registration methods."""
34
35    __slots__ = ('_callback', '_args', '_cancelled', '_loop',
36                 '_source_traceback', '_repr', '__weakref__',
37                 '_context')
38
39    def __init__(self, callback, args, loop, context=None):
40        if context is None:
41            context = contextvars.copy_context()
42        self._context = context
43        self._loop = loop
44        self._callback = callback
45        self._args = args
46        self._cancelled = False
47        self._repr = None
48        if self._loop.get_debug():
49            self._source_traceback = format_helpers.extract_stack(
50                sys._getframe(1))
51        else:
52            self._source_traceback = None
53
54    def _repr_info(self):
55        info = [self.__class__.__name__]
56        if self._cancelled:
57            info.append('cancelled')
58        if self._callback is not None:
59            info.append(format_helpers._format_callback_source(
60                self._callback, self._args))
61        if self._source_traceback:
62            frame = self._source_traceback[-1]
63            info.append(f'created at {frame[0]}:{frame[1]}')
64        return info
65
66    def __repr__(self):
67        if self._repr is not None:
68            return self._repr
69        info = self._repr_info()
70        return '<{}>'.format(' '.join(info))
71
72    def cancel(self):
73        if not self._cancelled:
74            self._cancelled = True
75            if self._loop.get_debug():
76                # Keep a representation in debug mode to keep callback and
77                # parameters. For example, to log the warning
78                # "Executing <Handle...> took 2.5 second"
79                self._repr = repr(self)
80            self._callback = None
81            self._args = None
82
83    def cancelled(self):
84        return self._cancelled
85
86    def _run(self):
87        try:
88            self._context.run(self._callback, *self._args)
89        except Exception as exc:
90            cb = format_helpers._format_callback_source(
91                self._callback, self._args)
92            msg = f'Exception in callback {cb}'
93            context = {
94                'message': msg,
95                'exception': exc,
96                'handle': self,
97            }
98            if self._source_traceback:
99                context['source_traceback'] = self._source_traceback
100            self._loop.call_exception_handler(context)
101        self = None  # Needed to break cycles when an exception occurs.
102
103
104class TimerHandle(Handle):
105    """Object returned by timed callback registration methods."""
106
107    __slots__ = ['_scheduled', '_when']
108
109    def __init__(self, when, callback, args, loop, context=None):
110        assert when is not None
111        super().__init__(callback, args, loop, context)
112        if self._source_traceback:
113            del self._source_traceback[-1]
114        self._when = when
115        self._scheduled = False
116
117    def _repr_info(self):
118        info = super()._repr_info()
119        pos = 2 if self._cancelled else 1
120        info.insert(pos, f'when={self._when}')
121        return info
122
123    def __hash__(self):
124        return hash(self._when)
125
126    def __lt__(self, other):
127        return self._when < other._when
128
129    def __le__(self, other):
130        if self._when < other._when:
131            return True
132        return self.__eq__(other)
133
134    def __gt__(self, other):
135        return self._when > other._when
136
137    def __ge__(self, other):
138        if self._when > other._when:
139            return True
140        return self.__eq__(other)
141
142    def __eq__(self, other):
143        if isinstance(other, TimerHandle):
144            return (self._when == other._when and
145                    self._callback == other._callback and
146                    self._args == other._args and
147                    self._cancelled == other._cancelled)
148        return NotImplemented
149
150    def __ne__(self, other):
151        equal = self.__eq__(other)
152        return NotImplemented if equal is NotImplemented else not equal
153
154    def cancel(self):
155        if not self._cancelled:
156            self._loop._timer_handle_cancelled(self)
157        super().cancel()
158
159    def when(self):
160        """Return a scheduled callback time.
161
162        The time is an absolute timestamp, using the same time
163        reference as loop.time().
164        """
165        return self._when
166
167
168class AbstractServer:
169    """Abstract server returned by create_server()."""
170
171    def close(self):
172        """Stop serving.  This leaves existing connections open."""
173        raise NotImplementedError
174
175    def get_loop(self):
176        """Get the event loop the Server object is attached to."""
177        raise NotImplementedError
178
179    def is_serving(self):
180        """Return True if the server is accepting connections."""
181        raise NotImplementedError
182
183    async def start_serving(self):
184        """Start accepting connections.
185
186        This method is idempotent, so it can be called when
187        the server is already being serving.
188        """
189        raise NotImplementedError
190
191    async def serve_forever(self):
192        """Start accepting connections until the coroutine is cancelled.
193
194        The server is closed when the coroutine is cancelled.
195        """
196        raise NotImplementedError
197
198    async def wait_closed(self):
199        """Coroutine to wait until service is closed."""
200        raise NotImplementedError
201
202    async def __aenter__(self):
203        return self
204
205    async def __aexit__(self, *exc):
206        self.close()
207        await self.wait_closed()
208
209
210class AbstractEventLoop:
211    """Abstract event loop."""
212
213    # Running and stopping the event loop.
214
215    def run_forever(self):
216        """Run the event loop until stop() is called."""
217        raise NotImplementedError
218
219    def run_until_complete(self, future):
220        """Run the event loop until a Future is done.
221
222        Return the Future's result, or raise its exception.
223        """
224        raise NotImplementedError
225
226    def stop(self):
227        """Stop the event loop as soon as reasonable.
228
229        Exactly how soon that is may depend on the implementation, but
230        no more I/O callbacks should be scheduled.
231        """
232        raise NotImplementedError
233
234    def is_running(self):
235        """Return whether the event loop is currently running."""
236        raise NotImplementedError
237
238    def is_closed(self):
239        """Returns True if the event loop was closed."""
240        raise NotImplementedError
241
242    def close(self):
243        """Close the loop.
244
245        The loop should not be running.
246
247        This is idempotent and irreversible.
248
249        No other methods should be called after this one.
250        """
251        raise NotImplementedError
252
253    async def shutdown_asyncgens(self):
254        """Shutdown all active asynchronous generators."""
255        raise NotImplementedError
256
257    # Methods scheduling callbacks.  All these return Handles.
258
259    def _timer_handle_cancelled(self, handle):
260        """Notification that a TimerHandle has been cancelled."""
261        raise NotImplementedError
262
263    def call_soon(self, callback, *args):
264        return self.call_later(0, callback, *args)
265
266    def call_later(self, delay, callback, *args):
267        raise NotImplementedError
268
269    def call_at(self, when, callback, *args):
270        raise NotImplementedError
271
272    def time(self):
273        raise NotImplementedError
274
275    def create_future(self):
276        raise NotImplementedError
277
278    # Method scheduling a coroutine object: create a task.
279
280    def create_task(self, coro):
281        raise NotImplementedError
282
283    # Methods for interacting with threads.
284
285    def call_soon_threadsafe(self, callback, *args):
286        raise NotImplementedError
287
288    async def run_in_executor(self, executor, func, *args):
289        raise NotImplementedError
290
291    def set_default_executor(self, executor):
292        raise NotImplementedError
293
294    # Network I/O methods returning Futures.
295
296    async def getaddrinfo(self, host, port, *,
297                          family=0, type=0, proto=0, flags=0):
298        raise NotImplementedError
299
300    async def getnameinfo(self, sockaddr, flags=0):
301        raise NotImplementedError
302
303    async def create_connection(
304            self, protocol_factory, host=None, port=None,
305            *, ssl=None, family=0, proto=0,
306            flags=0, sock=None, local_addr=None,
307            server_hostname=None,
308            ssl_handshake_timeout=None):
309        raise NotImplementedError
310
311    async def create_server(
312            self, protocol_factory, host=None, port=None,
313            *, family=socket.AF_UNSPEC,
314            flags=socket.AI_PASSIVE, sock=None, backlog=100,
315            ssl=None, reuse_address=None, reuse_port=None,
316            ssl_handshake_timeout=None,
317            start_serving=True):
318        """A coroutine which creates a TCP server bound to host and port.
319
320        The return value is a Server object which can be used to stop
321        the service.
322
323        If host is an empty string or None all interfaces are assumed
324        and a list of multiple sockets will be returned (most likely
325        one for IPv4 and another one for IPv6). The host parameter can also be
326        a sequence (e.g. list) of hosts to bind to.
327
328        family can be set to either AF_INET or AF_INET6 to force the
329        socket to use IPv4 or IPv6. If not set it will be determined
330        from host (defaults to AF_UNSPEC).
331
332        flags is a bitmask for getaddrinfo().
333
334        sock can optionally be specified in order to use a preexisting
335        socket object.
336
337        backlog is the maximum number of queued connections passed to
338        listen() (defaults to 100).
339
340        ssl can be set to an SSLContext to enable SSL over the
341        accepted connections.
342
343        reuse_address tells the kernel to reuse a local socket in
344        TIME_WAIT state, without waiting for its natural timeout to
345        expire. If not specified will automatically be set to True on
346        UNIX.
347
348        reuse_port tells the kernel to allow this endpoint to be bound to
349        the same port as other existing endpoints are bound to, so long as
350        they all set this flag when being created. This option is not
351        supported on Windows.
352
353        ssl_handshake_timeout is the time in seconds that an SSL server
354        will wait for completion of the SSL handshake before aborting the
355        connection. Default is 60s.
356
357        start_serving set to True (default) causes the created server
358        to start accepting connections immediately.  When set to False,
359        the user should await Server.start_serving() or Server.serve_forever()
360        to make the server to start accepting connections.
361        """
362        raise NotImplementedError
363
364    async def sendfile(self, transport, file, offset=0, count=None,
365                       *, fallback=True):
366        """Send a file through a transport.
367
368        Return an amount of sent bytes.
369        """
370        raise NotImplementedError
371
372    async def start_tls(self, transport, protocol, sslcontext, *,
373                        server_side=False,
374                        server_hostname=None,
375                        ssl_handshake_timeout=None):
376        """Upgrade a transport to TLS.
377
378        Return a new transport that *protocol* should start using
379        immediately.
380        """
381        raise NotImplementedError
382
383    async def create_unix_connection(
384            self, protocol_factory, path=None, *,
385            ssl=None, sock=None,
386            server_hostname=None,
387            ssl_handshake_timeout=None):
388        raise NotImplementedError
389
390    async def create_unix_server(
391            self, protocol_factory, path=None, *,
392            sock=None, backlog=100, ssl=None,
393            ssl_handshake_timeout=None,
394            start_serving=True):
395        """A coroutine which creates a UNIX Domain Socket server.
396
397        The return value is a Server object, which can be used to stop
398        the service.
399
400        path is a str, representing a file systsem path to bind the
401        server socket to.
402
403        sock can optionally be specified in order to use a preexisting
404        socket object.
405
406        backlog is the maximum number of queued connections passed to
407        listen() (defaults to 100).
408
409        ssl can be set to an SSLContext to enable SSL over the
410        accepted connections.
411
412        ssl_handshake_timeout is the time in seconds that an SSL server
413        will wait for the SSL handshake to complete (defaults to 60s).
414
415        start_serving set to True (default) causes the created server
416        to start accepting connections immediately.  When set to False,
417        the user should await Server.start_serving() or Server.serve_forever()
418        to make the server to start accepting connections.
419        """
420        raise NotImplementedError
421
422    async def create_datagram_endpoint(self, protocol_factory,
423                                       local_addr=None, remote_addr=None, *,
424                                       family=0, proto=0, flags=0,
425                                       reuse_address=None, reuse_port=None,
426                                       allow_broadcast=None, sock=None):
427        """A coroutine which creates a datagram endpoint.
428
429        This method will try to establish the endpoint in the background.
430        When successful, the coroutine returns a (transport, protocol) pair.
431
432        protocol_factory must be a callable returning a protocol instance.
433
434        socket family AF_INET, socket.AF_INET6 or socket.AF_UNIX depending on
435        host (or family if specified), socket type SOCK_DGRAM.
436
437        reuse_address tells the kernel to reuse a local socket in
438        TIME_WAIT state, without waiting for its natural timeout to
439        expire. If not specified it will automatically be set to True on
440        UNIX.
441
442        reuse_port tells the kernel to allow this endpoint to be bound to
443        the same port as other existing endpoints are bound to, so long as
444        they all set this flag when being created. This option is not
445        supported on Windows and some UNIX's. If the
446        :py:data:`~socket.SO_REUSEPORT` constant is not defined then this
447        capability is unsupported.
448
449        allow_broadcast tells the kernel to allow this endpoint to send
450        messages to the broadcast address.
451
452        sock can optionally be specified in order to use a preexisting
453        socket object.
454        """
455        raise NotImplementedError
456
457    # Pipes and subprocesses.
458
459    async def connect_read_pipe(self, protocol_factory, pipe):
460        """Register read pipe in event loop. Set the pipe to non-blocking mode.
461
462        protocol_factory should instantiate object with Protocol interface.
463        pipe is a file-like object.
464        Return pair (transport, protocol), where transport supports the
465        ReadTransport interface."""
466        # The reason to accept file-like object instead of just file descriptor
467        # is: we need to own pipe and close it at transport finishing
468        # Can got complicated errors if pass f.fileno(),
469        # close fd in pipe transport then close f and vise versa.
470        raise NotImplementedError
471
472    async def connect_write_pipe(self, protocol_factory, pipe):
473        """Register write pipe in event loop.
474
475        protocol_factory should instantiate object with BaseProtocol interface.
476        Pipe is file-like object already switched to nonblocking.
477        Return pair (transport, protocol), where transport support
478        WriteTransport interface."""
479        # The reason to accept file-like object instead of just file descriptor
480        # is: we need to own pipe and close it at transport finishing
481        # Can got complicated errors if pass f.fileno(),
482        # close fd in pipe transport then close f and vise versa.
483        raise NotImplementedError
484
485    async def subprocess_shell(self, protocol_factory, cmd, *,
486                               stdin=subprocess.PIPE,
487                               stdout=subprocess.PIPE,
488                               stderr=subprocess.PIPE,
489                               **kwargs):
490        raise NotImplementedError
491
492    async def subprocess_exec(self, protocol_factory, *args,
493                              stdin=subprocess.PIPE,
494                              stdout=subprocess.PIPE,
495                              stderr=subprocess.PIPE,
496                              **kwargs):
497        raise NotImplementedError
498
499    # Ready-based callback registration methods.
500    # The add_*() methods return None.
501    # The remove_*() methods return True if something was removed,
502    # False if there was nothing to delete.
503
504    def add_reader(self, fd, callback, *args):
505        raise NotImplementedError
506
507    def remove_reader(self, fd):
508        raise NotImplementedError
509
510    def add_writer(self, fd, callback, *args):
511        raise NotImplementedError
512
513    def remove_writer(self, fd):
514        raise NotImplementedError
515
516    # Completion based I/O methods returning Futures.
517
518    async def sock_recv(self, sock, nbytes):
519        raise NotImplementedError
520
521    async def sock_recv_into(self, sock, buf):
522        raise NotImplementedError
523
524    async def sock_sendall(self, sock, data):
525        raise NotImplementedError
526
527    async def sock_connect(self, sock, address):
528        raise NotImplementedError
529
530    async def sock_accept(self, sock):
531        raise NotImplementedError
532
533    async def sock_sendfile(self, sock, file, offset=0, count=None,
534                            *, fallback=None):
535        raise NotImplementedError
536
537    # Signal handling.
538
539    def add_signal_handler(self, sig, callback, *args):
540        raise NotImplementedError
541
542    def remove_signal_handler(self, sig):
543        raise NotImplementedError
544
545    # Task factory.
546
547    def set_task_factory(self, factory):
548        raise NotImplementedError
549
550    def get_task_factory(self):
551        raise NotImplementedError
552
553    # Error handlers.
554
555    def get_exception_handler(self):
556        raise NotImplementedError
557
558    def set_exception_handler(self, handler):
559        raise NotImplementedError
560
561    def default_exception_handler(self, context):
562        raise NotImplementedError
563
564    def call_exception_handler(self, context):
565        raise NotImplementedError
566
567    # Debug flag management.
568
569    def get_debug(self):
570        raise NotImplementedError
571
572    def set_debug(self, enabled):
573        raise NotImplementedError
574
575
576class AbstractEventLoopPolicy:
577    """Abstract policy for accessing the event loop."""
578
579    def get_event_loop(self):
580        """Get the event loop for the current context.
581
582        Returns an event loop object implementing the BaseEventLoop interface,
583        or raises an exception in case no event loop has been set for the
584        current context and the current policy does not specify to create one.
585
586        It should never return None."""
587        raise NotImplementedError
588
589    def set_event_loop(self, loop):
590        """Set the event loop for the current context to loop."""
591        raise NotImplementedError
592
593    def new_event_loop(self):
594        """Create and return a new event loop object according to this
595        policy's rules. If there's need to set this loop as the event loop for
596        the current context, set_event_loop must be called explicitly."""
597        raise NotImplementedError
598
599    # Child processes handling (Unix only).
600
601    def get_child_watcher(self):
602        "Get the watcher for child processes."
603        raise NotImplementedError
604
605    def set_child_watcher(self, watcher):
606        """Set the watcher for child processes."""
607        raise NotImplementedError
608
609
610class BaseDefaultEventLoopPolicy(AbstractEventLoopPolicy):
611    """Default policy implementation for accessing the event loop.
612
613    In this policy, each thread has its own event loop.  However, we
614    only automatically create an event loop by default for the main
615    thread; other threads by default have no event loop.
616
617    Other policies may have different rules (e.g. a single global
618    event loop, or automatically creating an event loop per thread, or
619    using some other notion of context to which an event loop is
620    associated).
621    """
622
623    _loop_factory = None
624
625    class _Local(threading.local):
626        _loop = None
627        _set_called = False
628
629    def __init__(self):
630        self._local = self._Local()
631
632    def get_event_loop(self):
633        """Get the event loop.
634
635        This may be None or an instance of EventLoop.
636        """
637        if (self._local._loop is None and
638                not self._local._set_called and
639                isinstance(threading.current_thread(), threading._MainThread)):
640            self.set_event_loop(self.new_event_loop())
641
642        if self._local._loop is None:
643            raise RuntimeError('There is no current event loop in thread %r.'
644                               % threading.current_thread().name)
645
646        return self._local._loop
647
648    def set_event_loop(self, loop):
649        """Set the event loop."""
650        self._local._set_called = True
651        assert loop is None or isinstance(loop, AbstractEventLoop)
652        self._local._loop = loop
653
654    def new_event_loop(self):
655        """Create a new event loop.
656
657        You must call set_event_loop() to make this the current event
658        loop.
659        """
660        return self._loop_factory()
661
662
663# Event loop policy.  The policy itself is always global, even if the
664# policy's rules say that there is an event loop per thread (or other
665# notion of context).  The default policy is installed by the first
666# call to get_event_loop_policy().
667_event_loop_policy = None
668
669# Lock for protecting the on-the-fly creation of the event loop policy.
670_lock = threading.Lock()
671
672
673# A TLS for the running event loop, used by _get_running_loop.
674class _RunningLoop(threading.local):
675    loop_pid = (None, None)
676
677
678_running_loop = _RunningLoop()
679
680
681def get_running_loop():
682    """Return the running event loop.  Raise a RuntimeError if there is none.
683
684    This function is thread-specific.
685    """
686    # NOTE: this function is implemented in C (see _asynciomodule.c)
687    loop = _get_running_loop()
688    if loop is None:
689        raise RuntimeError('no running event loop')
690    return loop
691
692
693def _get_running_loop():
694    """Return the running event loop or None.
695
696    This is a low-level function intended to be used by event loops.
697    This function is thread-specific.
698    """
699    # NOTE: this function is implemented in C (see _asynciomodule.c)
700    running_loop, pid = _running_loop.loop_pid
701    if running_loop is not None and pid == os.getpid():
702        return running_loop
703
704
705def _set_running_loop(loop):
706    """Set the running event loop.
707
708    This is a low-level function intended to be used by event loops.
709    This function is thread-specific.
710    """
711    # NOTE: this function is implemented in C (see _asynciomodule.c)
712    _running_loop.loop_pid = (loop, os.getpid())
713
714
715def _init_event_loop_policy():
716    global _event_loop_policy
717    with _lock:
718        if _event_loop_policy is None:  # pragma: no branch
719            from . import DefaultEventLoopPolicy
720            _event_loop_policy = DefaultEventLoopPolicy()
721
722
723def get_event_loop_policy():
724    """Get the current event loop policy."""
725    if _event_loop_policy is None:
726        _init_event_loop_policy()
727    return _event_loop_policy
728
729
730def set_event_loop_policy(policy):
731    """Set the current event loop policy.
732
733    If policy is None, the default policy is restored."""
734    global _event_loop_policy
735    assert policy is None or isinstance(policy, AbstractEventLoopPolicy)
736    _event_loop_policy = policy
737
738
739def get_event_loop():
740    """Return an asyncio event loop.
741
742    When called from a coroutine or a callback (e.g. scheduled with call_soon
743    or similar API), this function will always return the running event loop.
744
745    If there is no running event loop set, the function will return
746    the result of `get_event_loop_policy().get_event_loop()` call.
747    """
748    # NOTE: this function is implemented in C (see _asynciomodule.c)
749    current_loop = _get_running_loop()
750    if current_loop is not None:
751        return current_loop
752    return get_event_loop_policy().get_event_loop()
753
754
755def set_event_loop(loop):
756    """Equivalent to calling get_event_loop_policy().set_event_loop(loop)."""
757    get_event_loop_policy().set_event_loop(loop)
758
759
760def new_event_loop():
761    """Equivalent to calling get_event_loop_policy().new_event_loop()."""
762    return get_event_loop_policy().new_event_loop()
763
764
765def get_child_watcher():
766    """Equivalent to calling get_event_loop_policy().get_child_watcher()."""
767    return get_event_loop_policy().get_child_watcher()
768
769
770def set_child_watcher(watcher):
771    """Equivalent to calling
772    get_event_loop_policy().set_child_watcher(watcher)."""
773    return get_event_loop_policy().set_child_watcher(watcher)
774
775
776# Alias pure-Python implementations for testing purposes.
777_py__get_running_loop = _get_running_loop
778_py__set_running_loop = _set_running_loop
779_py_get_running_loop = get_running_loop
780_py_get_event_loop = get_event_loop
781
782
783try:
784    # get_event_loop() is one of the most frequently called
785    # functions in asyncio.  Pure Python implementation is
786    # about 4 times slower than C-accelerated.
787    from _asyncio import (_get_running_loop, _set_running_loop,
788                          get_running_loop, get_event_loop)
789except ImportError:
790    pass
791else:
792    # Alias C implementations for testing purposes.
793    _c__get_running_loop = _get_running_loop
794    _c__set_running_loop = _set_running_loop
795    _c_get_running_loop = get_running_loop
796    _c_get_event_loop = get_event_loop
797