1"""Support for tasks, coroutines and the scheduler."""
2
# Names exported by a star-import of this module.  Besides the public
# API, the underscore-prefixed task registry hooks are listed explicitly.
__all__ = (
    'Task', 'create_task',
    'FIRST_COMPLETED', 'FIRST_EXCEPTION', 'ALL_COMPLETED',
    'wait', 'wait_for', 'as_completed', 'sleep',
    'gather', 'shield', 'ensure_future', 'run_coroutine_threadsafe',
    'current_task', 'all_tasks',
    '_register_task', '_unregister_task', '_enter_task', '_leave_task',
)
11
12import concurrent.futures
13import contextvars
14import functools
15import inspect
16import types
17import warnings
18import weakref
19
20from . import base_tasks
21from . import coroutines
22from . import events
23from . import futures
24from .coroutines import coroutine
25
26
def current_task(loop=None):
    """Return the task recorded as currently executing on *loop*.

    When *loop* is omitted, the loop returned by
    events.get_running_loop() is used.  The lookup goes through
    _current_tasks.get(), so None is returned when nothing is
    recorded for the loop.
    """
    target = events.get_running_loop() if loop is None else loop
    return _current_tasks.get(target)
32
33
def all_tasks(loop=None):
    """Return a set of all not yet finished tasks for the loop.

    When *loop* is omitted, the loop returned by
    events.get_running_loop() is used.  Tasks that belong to another
    loop, or that are already done, are left out.

    Raises RuntimeError if a stable snapshot of the task registry
    could not be taken after many attempts.
    """
    if loop is None:
        loop = events.get_running_loop()
    # Iterating the _all_tasks WeakSet directly is not safe
    # (https://bugs.python.org/issue34970), so work on a list snapshot.
    # Building that list iterates the WeakSet too and may raise
    # RuntimeError when the set is modified from another thread in the
    # meantime (https://bugs.python.org/issue36607); retry a bounded
    # number of times before giving up.
    attempts = 0
    while True:
        try:
            tasks = list(_all_tasks)
        except RuntimeError:
            attempts += 1
            if attempts >= 1000:
                raise
        else:
            break
    return {t for t in tasks
            if futures._get_loop(t) is loop and not t.done()}
42
43
def _all_tasks_compat(loop=None):
    """Return a set of *all* tasks for the loop, finished ones included.

    Differs from all_tasks() by also returning completed tasks, and by
    defaulting *loop* to events.get_event_loop().  Used to implement
    the deprecated Task.all_tasks() class method.

    Raises RuntimeError if a stable snapshot of the task registry
    could not be taken after many attempts.
    """
    if loop is None:
        loop = events.get_event_loop()
    # Iterating the _all_tasks WeakSet directly is not safe
    # (https://bugs.python.org/issue34970), so work on a list snapshot.
    # Building that list iterates the WeakSet too and may raise
    # RuntimeError when the set is modified from another thread in the
    # meantime (https://bugs.python.org/issue36607); retry a bounded
    # number of times before giving up.
    attempts = 0
    while True:
        try:
            tasks = list(_all_tasks)
        except RuntimeError:
            attempts += 1
            if attempts >= 1000:
                raise
        else:
            break
    return {t for t in tasks if futures._get_loop(t) is loop}
53
54
class Task(futures._PyFuture):  # Inherit Python Task implementation
                                # from a Python Future implementation.

    """A coroutine wrapped in a Future.

    The wrapped coroutine is driven one step at a time by __step(),
    which is scheduled on the task's loop with call_soon() or invoked
    from __wakeup() when the future the coroutine waits on completes.
    """

    # An important invariant maintained while a Task is not done:
    #
    # - Either _fut_waiter is None, and _step() is scheduled;
    # - or _fut_waiter is some Future, and _step() is *not* scheduled.
    #
    # The only transition from the latter to the former is through
    # _wakeup().  When _fut_waiter is not None, one of its callbacks
    # must be _wakeup().

    # If False, don't log a message if the task is destroyed whereas its
    # status is still pending
    _log_destroy_pending = True

    @classmethod
    def current_task(cls, loop=None):
        """Return the currently running task in an event loop or None.

        By default the current task for the current event loop is returned.

        None is returned when called not in the context of a Task.

        Deprecated: emits PendingDeprecationWarning; use the
        module-level current_task() instead.
        """
        warnings.warn("Task.current_task() is deprecated, "
                      "use asyncio.current_task() instead",
                      PendingDeprecationWarning,
                      stacklevel=2)
        if loop is None:
            # Unlike the module-level function, fall back to
            # get_event_loop() rather than get_running_loop().
            loop = events.get_event_loop()
        return current_task(loop)

    @classmethod
    def all_tasks(cls, loop=None):
        """Return a set of all tasks for an event loop.

        By default all tasks for the current event loop are returned.

        Deprecated: emits PendingDeprecationWarning; use the
        module-level all_tasks() instead.  This variant delegates to
        _all_tasks_compat().
        """
        warnings.warn("Task.all_tasks() is deprecated, "
                      "use asyncio.all_tasks() instead",
                      PendingDeprecationWarning,
                      stacklevel=2)
        return _all_tasks_compat(loop)

    def __init__(self, coro, *, loop=None):
        """Wrap *coro* and schedule its first step on the loop.

        Raises TypeError when *coro* is not accepted by
        coroutines.iscoroutine().
        """
        super().__init__(loop=loop)
        if self._source_traceback:
            # Drop the newest entry of the recorded creation traceback
            # (presumably this constructor's own frame -- confirm
            # against the Future implementation).
            del self._source_traceback[-1]
        if not coroutines.iscoroutine(coro):
            # raise after Future.__init__(), attrs are required for __del__
            # prevent logging for pending task in __del__
            self._log_destroy_pending = False
            raise TypeError(f"a coroutine was expected, got {coro!r}")

        # Set by cancel() when the cancellation has to be delivered by
        # the next __step() call.
        self._must_cancel = False
        # The future the coroutine is currently blocked on, if any.
        self._fut_waiter = None
        self._coro = coro
        # Every step of the coroutine is scheduled with this context.
        self._context = contextvars.copy_context()

        self._loop.call_soon(self.__step, context=self._context)
        _register_task(self)

    def __del__(self):
        """Report a task that is destroyed while still pending.

        The report goes to the loop's exception handler and is skipped
        when _log_destroy_pending is false.
        """
        if self._state == futures._PENDING and self._log_destroy_pending:
            context = {
                'task': self,
                'message': 'Task was destroyed but it is pending!',
            }
            if self._source_traceback:
                context['source_traceback'] = self._source_traceback
            self._loop.call_exception_handler(context)
        super().__del__()

    def _repr_info(self):
        """Return the repr details computed by base_tasks._task_repr_info()."""
        return base_tasks._task_repr_info(self)

    def set_result(self, result):
        """Unsupported on Task; always raises RuntimeError."""
        raise RuntimeError('Task does not support set_result operation')

    def set_exception(self, exception):
        """Unsupported on Task; always raises RuntimeError."""
        raise RuntimeError('Task does not support set_exception operation')

    def get_stack(self, *, limit=None):
        """Return the list of stack frames for this task's coroutine.

        If the coroutine is not done, this returns the stack where it is
        suspended.  If the coroutine has completed successfully or was
        cancelled, this returns an empty list.  If the coroutine was
        terminated by an exception, this returns the list of traceback
        frames.

        The frames are always ordered from oldest to newest.

        The optional limit gives the maximum number of frames to
        return; by default all available frames are returned.  Its
        meaning differs depending on whether a stack or a traceback is
        returned: the newest frames of a stack are returned, but the
        oldest frames of a traceback are returned.  (This matches the
        behavior of the traceback module.)

        For reasons beyond our control, only one stack frame is
        returned for a suspended coroutine.
        """
        return base_tasks._task_get_stack(self, limit)

    def print_stack(self, *, limit=None, file=None):
        """Print the stack or traceback for this task's coroutine.

        This produces output similar to that of the traceback module,
        for the frames retrieved by get_stack().  The limit argument
        is passed to get_stack().  The file argument is an I/O stream
        to which the output is written; by default output is written
        to sys.stderr.
        """
        return base_tasks._task_print_stack(self, limit, file)

    def cancel(self):
        """Request that this task cancel itself.

        This arranges for a CancelledError to be thrown into the
        wrapped coroutine on the next cycle through the event loop.
        The coroutine then has a chance to clean up or even deny
        the request using try/except/finally.

        Unlike Future.cancel, this does not guarantee that the
        task will be cancelled: the exception might be caught and
        acted upon, delaying cancellation of the task or preventing
        cancellation completely.  The task may also return a value or
        raise a different exception.

        Immediately after this method is called, Task.cancelled() will
        not return True (unless the task was already cancelled).  A
        task will be marked as cancelled when the wrapped coroutine
        terminates with a CancelledError exception (even if cancel()
        was not called).

        Returns False if the task is already done, True otherwise.
        """
        self._log_traceback = False
        if self.done():
            return False
        if self._fut_waiter is not None:
            # Blocked on a future: try to cancel that one, which in turn
            # wakes this task up with the cancellation.
            if self._fut_waiter.cancel():
                # Leave self._fut_waiter; it may be a Task that
                # catches and ignores the cancellation so we may have
                # to cancel it again later.
                return True
        # It must be the case that self.__step is already scheduled.
        self._must_cancel = True
        return True

    def __step(self, exc=None):
        """Run the wrapped coroutine until its next suspension point.

        With *exc* set, the exception is thrown into the coroutine;
        otherwise the coroutine is resumed with send(None).  Depending
        on the outcome the task is completed, parked on the yielded
        future, or rescheduled.

        Raises futures.InvalidStateError if the task is already done.
        """
        if self.done():
            raise futures.InvalidStateError(
                f'_step(): already done: {self!r}, {exc!r}')
        if self._must_cancel:
            # A pending cancel() request overrides whatever was going to
            # be delivered, unless that already is a CancelledError.
            if not isinstance(exc, futures.CancelledError):
                exc = futures.CancelledError()
            self._must_cancel = False
        coro = self._coro
        self._fut_waiter = None

        _enter_task(self._loop, self)
        # Call either coro.throw(exc) or coro.send(None).
        try:
            if exc is None:
                # We use the `send` method directly, because coroutines
                # don't have `__iter__` and `__next__` methods.
                result = coro.send(None)
            else:
                result = coro.throw(exc)
        except StopIteration as exc:
            # The coroutine returned; exc.value carries its return value.
            if self._must_cancel:
                # Task is cancelled right before coro stops.
                self._must_cancel = False
                super().set_exception(futures.CancelledError())
            else:
                super().set_result(exc.value)
        except futures.CancelledError:
            super().cancel()  # I.e., Future.cancel(self).
        except Exception as exc:
            super().set_exception(exc)
        except BaseException as exc:
            # Record the exception on the task, but also let it
            # propagate to the caller of __step().
            super().set_exception(exc)
            raise
        else:
            # The coroutine yielded *result*; decide how to continue.
            blocking = getattr(result, '_asyncio_future_blocking', None)
            if blocking is not None:
                # Yielded Future must come from Future.__iter__().
                if futures._get_loop(result) is not self._loop:
                    new_exc = RuntimeError(
                        f'Task {self!r} got Future '
                        f'{result!r} attached to a different loop')
                    self._loop.call_soon(
                        self.__step, new_exc, context=self._context)
                elif blocking:
                    if result is self:
                        new_exc = RuntimeError(
                            f'Task cannot await on itself: {self!r}')
                        self._loop.call_soon(
                            self.__step, new_exc, context=self._context)
                    else:
                        # Park on the future: __wakeup() resumes the
                        # task once the future is done.
                        result._asyncio_future_blocking = False
                        result.add_done_callback(
                            self.__wakeup, context=self._context)
                        self._fut_waiter = result
                        if self._must_cancel:
                            # cancel() was called while the coroutine
                            # was running; forward it to the new waiter.
                            if self._fut_waiter.cancel():
                                self._must_cancel = False
                else:
                    new_exc = RuntimeError(
                        f'yield was used instead of yield from '
                        f'in task {self!r} with {result!r}')
                    self._loop.call_soon(
                        self.__step, new_exc, context=self._context)

            elif result is None:
                # Bare yield relinquishes control for one event loop iteration.
                self._loop.call_soon(self.__step, context=self._context)
            elif inspect.isgenerator(result):
                # Yielding a generator is just wrong.
                new_exc = RuntimeError(
                    f'yield was used instead of yield from for '
                    f'generator in task {self!r} with {result!r}')
                self._loop.call_soon(
                    self.__step, new_exc, context=self._context)
            else:
                # Yielding something else is an error.
                new_exc = RuntimeError(f'Task got bad yield: {result!r}')
                self._loop.call_soon(
                    self.__step, new_exc, context=self._context)
        finally:
            _leave_task(self._loop, self)
            self = None  # Needed to break cycles when an exception occurs.

    def __wakeup(self, future):
        """Done-callback of the awaited future: resume the coroutine.

        If *future* finished with an exception, that exception is
        passed on to __step(); otherwise __step() is called without
        arguments.
        """
        try:
            future.result()
        except Exception as exc:
            # This may also be a cancellation.
            self.__step(exc)
        else:
            # Don't pass the value of `future.result()` explicitly,
            # as `Future.__iter__` and `Future.__await__` don't need it.
            # If we call `_step(value, None)` instead of `_step()`,
            # Python eval loop would use `.send(value)` method call,
            # instead of `__next__()`, which is slower for futures
            # that return non-generator iterators from their `__iter__`.
            self.__step()
        self = None  # Needed to break cycles when an exception occurs.
305
306
# Keep a handle on the pure-Python implementation: the name ``Task``
# is rebound below when the C implementation can be imported.
_PyTask = Task


try:
    import _asyncio
except ImportError:
    # No _asyncio module available; keep using the Python Task.
    pass
else:
    # _CTask is needed for tests.
    Task = _CTask = _asyncio.Task
317
318
def create_task(coro):
    """Schedule the coroutine object *coro* for execution as a task.

    The task is created on the loop returned by
    events.get_running_loop() through its create_task() method.

    Return the Task object.
    """
    running_loop = events.get_running_loop()
    task = running_loop.create_task(coro)
    return task
326
327
# wait() and as_completed() similar to those in PEP 3148.

# Accepted values for the *return_when* argument of wait(); they are
# the very same objects as the concurrent.futures constants.
FIRST_COMPLETED = concurrent.futures.FIRST_COMPLETED
FIRST_EXCEPTION = concurrent.futures.FIRST_EXCEPTION
ALL_COMPLETED = concurrent.futures.ALL_COMPLETED
333
334
335async def wait(fs, *, loop=None, timeout=None, return_when=ALL_COMPLETED):
336    """Wait for the Futures and coroutines given by fs to complete.
337
338    The sequence futures must not be empty.
339
340    Coroutines will be wrapped in Tasks.
341
342    Returns two sets of Future: (done, pending).
343
344    Usage:
345
346        done, pending = await asyncio.wait(fs)
347
348    Note: This does not raise TimeoutError! Futures that aren't done
349    when the timeout occurs are returned in the second set.
350    """
351    if futures.isfuture(fs) or coroutines.iscoroutine(fs):
352        raise TypeError(f"expect a list of futures, not {type(fs).__name__}")
353    if not fs:
354        raise ValueError('Set of coroutines/Futures is empty.')
355    if return_when not in (FIRST_COMPLETED, FIRST_EXCEPTION, ALL_COMPLETED):
356        raise ValueError(f'Invalid return_when value: {return_when}')
357
358    if loop is None:
359        loop = events.get_event_loop()
360
361    fs = {ensure_future(f, loop=loop) for f in set(fs)}
362
363    return await _wait(fs, timeout, return_when, loop)
364
365
366def _release_waiter(waiter, *args):
367    if not waiter.done():
368        waiter.set_result(None)
369
370
async def wait_for(fut, timeout, *, loop=None):
    """Wait for the single Future or coroutine to complete, with timeout.

    Coroutine will be wrapped in Task.

    Returns result of the Future or coroutine.  When a timeout occurs,
    it cancels the task and raises TimeoutError.  To avoid the task
    cancellation, wrap it in shield().

    If the wait is cancelled, the task is also cancelled.

    A *timeout* of None waits without any limit.  A *timeout* that is
    zero or negative does not wait at all: the result is returned if
    *fut* is already done, otherwise *fut* is cancelled and
    TimeoutError is raised.

    This function is a coroutine.
    """
    if loop is None:
        loop = events.get_event_loop()

    if timeout is None:
        # No time limit: simply await the awaitable itself.
        return await fut

    if timeout <= 0:
        fut = ensure_future(fut, loop=loop)

        if fut.done():
            return fut.result()

        fut.cancel()
        raise futures.TimeoutError()

    # *waiter* is released either by the timer or by *fut* completing,
    # whichever happens first.
    waiter = loop.create_future()
    timeout_handle = loop.call_later(timeout, _release_waiter, waiter)
    cb = functools.partial(_release_waiter, waiter)

    fut = ensure_future(fut, loop=loop)
    fut.add_done_callback(cb)

    try:
        # wait until the future completes or the timeout
        try:
            await waiter
        except futures.CancelledError:
            # wait_for() itself was cancelled: propagate the
            # cancellation to the wrapped future.
            fut.remove_done_callback(cb)
            fut.cancel()
            raise

        if fut.done():
            return fut.result()
        else:
            # The timer fired first.
            fut.remove_done_callback(cb)
            # We must ensure that the task is not running
            # after wait_for() returns.
            # See https://bugs.python.org/issue32751
            await _cancel_and_wait(fut, loop=loop)
            raise futures.TimeoutError()
    finally:
        # Whatever the outcome, the timer is no longer needed.
        timeout_handle.cancel()
426
427
428async def _wait(fs, timeout, return_when, loop):
429    """Internal helper for wait().
430
431    The fs argument must be a collection of Futures.
432    """
433    assert fs, 'Set of Futures is empty.'
434    waiter = loop.create_future()
435    timeout_handle = None
436    if timeout is not None:
437        timeout_handle = loop.call_later(timeout, _release_waiter, waiter)
438    counter = len(fs)
439
440    def _on_completion(f):
441        nonlocal counter
442        counter -= 1
443        if (counter <= 0 or
444            return_when == FIRST_COMPLETED or
445            return_when == FIRST_EXCEPTION and (not f.cancelled() and
446                                                f.exception() is not None)):
447            if timeout_handle is not None:
448                timeout_handle.cancel()
449            if not waiter.done():
450                waiter.set_result(None)
451
452    for f in fs:
453        f.add_done_callback(_on_completion)
454
455    try:
456        await waiter
457    finally:
458        if timeout_handle is not None:
459            timeout_handle.cancel()
460
461    done, pending = set(), set()
462    for f in fs:
463        f.remove_done_callback(_on_completion)
464        if f.done():
465            done.add(f)
466        else:
467            pending.add(f)
468    return done, pending
469
470
async def _cancel_and_wait(fut, loop):
    """Cancel the *fut* future or task and wait until it completes."""

    finished = loop.create_future()
    release = functools.partial(_release_waiter, finished)
    fut.add_done_callback(release)

    try:
        fut.cancel()
        # Awaiting *fut* directly is avoided on purpose: waiting on a
        # separate future keeps _cancel_and_wait itself reliably
        # cancellable.
        await finished
    finally:
        fut.remove_done_callback(release)
485
486
# This is *not* a @coroutine!  It is a plain generator whose items are
# coroutines to be awaited by the caller.
def as_completed(fs, *, loop=None, timeout=None):
    """Return an iterator whose values are coroutines.

    Awaiting the yielded coroutines gives the results (or raises the
    exceptions!) of the original Futures (or coroutines), in the order
    in which and as soon as they complete.

    This differs from PEP 3148; the proper way to use this is:

        for f in as_completed(fs):
            result = await f  # The 'await' may raise.
            # Use result.

    If a timeout is specified, the 'await' will raise
    TimeoutError when the timeout occurs before all Futures are done.

    Note: The futures 'f' are not necessarily members of fs.
    """
    if futures.isfuture(fs) or coroutines.iscoroutine(fs):
        raise TypeError(f"expect a list of futures, not {type(fs).__name__}")
    if loop is None:
        loop = events.get_event_loop()
    todo = {ensure_future(item, loop=loop) for item in set(fs)}
    from .queues import Queue  # Import here to avoid circular import problem.
    done = Queue(loop=loop)
    timeout_handle = None

    def _on_timeout():
        # One dummy entry per unfinished future, so that every
        # outstanding _wait_for_one() is woken up.
        for pending in todo:
            pending.remove_done_callback(_on_completion)
            done.put_nowait(None)  # Queue a dummy value for _wait_for_one().
        todo.clear()  # Can't do todo.remove(pending) in the loop.

    def _on_completion(f):
        # An empty todo set means _on_timeout() was here first.
        if todo:
            todo.remove(f)
            done.put_nowait(f)
            if timeout_handle is not None and not todo:
                timeout_handle.cancel()

    async def _wait_for_one():
        finished = await done.get()
        if finished is None:
            # Dummy value from _on_timeout().
            raise futures.TimeoutError
        return finished.result()  # May raise finished.exception().

    for item in todo:
        item.add_done_callback(_on_completion)
    if todo and timeout is not None:
        timeout_handle = loop.call_later(timeout, _on_timeout)
    for _ in range(len(todo)):
        yield _wait_for_one()
541
542
@types.coroutine
def __sleep0():
    """Skip one event loop run cycle.

    This is a private helper for 'asyncio.sleep()', used
    when the 'delay' is set to 0.  It uses a bare 'yield'
    expression (which Task.__step knows how to handle)
    instead of creating a Future object.
    """
    # The bare yield hands None to the driving Task, which reacts by
    # rescheduling its next step with call_soon().
    yield
553
554
async def sleep(delay, result=None, *, loop=None):
    """Coroutine that completes after a given time (in seconds).

    *result* is handed back to the caller once the delay has passed.
    A *delay* that is zero or negative only yields control once via
    __sleep0() and never touches the event loop's timers.  When
    *loop* is omitted, events.get_event_loop() is used.
    """
    if delay <= 0:
        await __sleep0()
        return result

    event_loop = events.get_event_loop() if loop is None else loop
    wakeup = event_loop.create_future()
    timer = event_loop.call_later(
        delay, futures._set_result_unless_cancelled, wakeup, result)
    try:
        return await wakeup
    finally:
        # Also reached when the sleep is interrupted: drop the timer.
        timer.cancel()
571
572
def ensure_future(coro_or_future, *, loop=None):
    """Wrap a coroutine or an awaitable in a future.

    If the argument is a Future, it is returned directly.  A coroutine
    is turned into a task on *loop* (or on events.get_event_loop()
    when *loop* is None); any other awaitable is first wrapped by
    _wrap_awaitable().

    Raises ValueError if *loop* is given and differs from the loop of
    the passed Future, and TypeError for unsupported arguments.
    """
    if coroutines.iscoroutine(coro_or_future):
        target_loop = events.get_event_loop() if loop is None else loop
        task = target_loop.create_task(coro_or_future)
        if task._source_traceback:
            del task._source_traceback[-1]
        return task

    if futures.isfuture(coro_or_future):
        if loop is not None and futures._get_loop(coro_or_future) is not loop:
            raise ValueError('loop argument must agree with Future')
        return coro_or_future

    if not inspect.isawaitable(coro_or_future):
        raise TypeError('An asyncio.Future, a coroutine or an awaitable is '
                        'required')
    return ensure_future(_wrap_awaitable(coro_or_future), loop=loop)
594
595
@coroutine
def _wrap_awaitable(awaitable):
    """Helper for asyncio.ensure_future().

    Wraps awaitable (an object with __await__) into a coroutine
    that will later be wrapped in a Task by ensure_future().

    The value returned by the wrapped awaitable becomes the return
    value of the coroutine.
    """
    # Delegate to the iterator obtained from __await__().
    return (yield from awaitable.__await__())
604
605
606class _GatheringFuture(futures.Future):
607    """Helper for gather().
608
609    This overrides cancel() to cancel all the children and act more
610    like Task.cancel(), which doesn't immediately mark itself as
611    cancelled.
612    """
613
614    def __init__(self, children, *, loop=None):
615        super().__init__(loop=loop)
616        self._children = children
617        self._cancel_requested = False
618
619    def cancel(self):
620        if self.done():
621            return False
622        ret = False
623        for child in self._children:
624            if child.cancel():
625                ret = True
626        if ret:
627            # If any child tasks were actually cancelled, we should
628            # propagate the cancellation request regardless of
629            # *return_exceptions* argument.  See issue 32684.
630            self._cancel_requested = True
631        return ret
632
633
def gather(*coros_or_futures, loop=None, return_exceptions=False):
    """Return a future aggregating results from the given coroutines/futures.

    Coroutines will be wrapped in a future and scheduled in the event
    loop. They will not necessarily be scheduled in the same order as
    passed in.

    All futures must share the same event loop.  If all the tasks are
    done successfully, the returned future's result is the list of
    results (in the order of the original sequence, not necessarily
    the order of results arrival).  If *return_exceptions* is True,
    exceptions in the tasks are treated the same as successful
    results, and gathered in the result list; otherwise, the first
    raised exception will be immediately propagated to the returned
    future.

    An argument passed more than once is awaited only once; its result
    is repeated at each of its positions.  Without any argument the
    returned future is already completed with an empty list.

    Cancellation: if the outer Future is cancelled, all children (that
    have not completed yet) are also cancelled.  If any child is
    cancelled, this is treated as if it raised CancelledError --
    the outer Future is *not* cancelled in this case.  (This is to
    prevent the cancellation of one child to cause other children to
    be cancelled.)
    """
    if not coros_or_futures:
        if loop is None:
            loop = events.get_event_loop()
        outer = loop.create_future()
        outer.set_result([])
        return outer

    def _outcome(child):
        # Look at cancelled() first: exception() *raises*
        # CancelledError for a cancelled future instead of
        # returning it.
        if child.cancelled():
            return futures.CancelledError()
        error = child.exception()
        if error is None:
            return child.result()
        return error

    def _done_callback(fut):
        nonlocal completed
        completed += 1

        if outer.done():
            if not fut.cancelled():
                # Mark exception retrieved.
                fut.exception()
            return

        if not return_exceptions:
            # The first cancellation or failure settles the outer
            # future right away.
            if fut.cancelled():
                outer.set_exception(futures.CancelledError())
                return
            failure = fut.exception()
            if failure is not None:
                outer.set_exception(failure)
                return

        if completed != expected:
            return

        # All futures are done; build the result list in argument
        # order and hand it to the 'outer' future.
        results = [_outcome(child) for child in children]

        if outer._cancel_requested:
            # If gather is being cancelled we must propagate the
            # cancellation regardless of *return_exceptions* argument.
            # See issue 32684.
            outer.set_exception(futures.CancelledError())
        else:
            outer.set_result(results)

    seen = {}
    children = []
    expected = 0
    completed = 0
    for arg in coros_or_futures:
        fut = seen.get(arg)
        if fut is None:
            # First occurrence of this argument.
            fut = ensure_future(arg, loop=loop)
            if loop is None:
                loop = futures._get_loop(fut)
            if fut is not arg:
                # 'arg' was not a Future, therefore, 'fut' is a new
                # Future created specifically for 'arg'.  Since the caller
                # can't control it, disable the "destroy pending task"
                # warning.
                fut._log_destroy_pending = False

            expected += 1
            seen[arg] = fut
            fut.add_done_callback(_done_callback)

        # A duplicate argument reuses the future made for its first
        # occurrence.
        children.append(fut)

    outer = _GatheringFuture(children, loop=loop)
    return outer
741
742
def shield(arg, *, loop=None):
    """Wait for a future, shielding it from cancellation.

    The statement

        res = await shield(something())

    is exactly equivalent to the statement

        res = await something()

    *except* that if the coroutine containing it is cancelled, the
    task running in something() is not cancelled.  From the POV of
    something(), the cancellation did not happen.  But its caller is
    still cancelled, so the yield-from expression still raises
    CancelledError.  Note: If something() is cancelled by other means
    this will still cancel shield().

    If you want to completely ignore cancellation (not recommended)
    you can combine shield() with a try/except clause, as follows:

        try:
            res = await shield(something())
        except CancelledError:
            res = None

    If the wrapped future is already done it is returned as is.
    """
    inner = ensure_future(arg, loop=loop)
    if inner.done():
        # Shortcut.
        return inner
    outer = futures._get_loop(inner).create_future()

    def _done_callback(finished):
        if outer.cancelled():
            if not finished.cancelled():
                # Mark the inner result as retrieved.
                finished.exception()
            return

        # Mirror the final state of the inner future on the outer one.
        if finished.cancelled():
            outer.cancel()
            return
        error = finished.exception()
        if error is None:
            outer.set_result(finished.result())
        else:
            outer.set_exception(error)

    inner.add_done_callback(_done_callback)
    return outer
794
795
def run_coroutine_threadsafe(coro, loop):
    """Submit a coroutine object to a given event loop.

    The coroutine is wrapped with ensure_future() from a callback that
    is scheduled with loop.call_soon_threadsafe().

    Return a concurrent.futures.Future to access the result.

    Raises TypeError if *coro* is not a coroutine object.
    """
    if not coroutines.iscoroutine(coro):
        raise TypeError('A coroutine object is required')
    result_future = concurrent.futures.Future()

    def callback():
        try:
            task = ensure_future(coro, loop=loop)
            futures._chain_future(task, result_future)
        except Exception as exc:
            # Hand the failure to whoever waits on the concurrent
            # future, then let it propagate as well.
            if result_future.set_running_or_notify_cancel():
                result_future.set_exception(exc)
            raise

    loop.call_soon_threadsafe(callback)
    return result_future
815
816
# WeakSet containing all alive tasks.  Entries are weak references, so
# the registry by itself does not keep a task alive.
_all_tasks = weakref.WeakSet()

# Dictionary containing tasks that are currently active in
# all running event loops.  {EventLoop: Task}
# An entry is added by _enter_task() and removed by _leave_task().
_current_tasks = {}
823
824
def _register_task(task):
    """Register a new task in asyncio as executed by loop.

    The task is added to the module-level _all_tasks registry.
    """
    _all_tasks.add(task)
828
829
def _enter_task(loop, task):
    """Record *task* as the task currently executing on *loop*.

    Raises RuntimeError if another task is already recorded for
    *loop*.
    """
    running = _current_tasks.get(loop)
    if running is None:
        _current_tasks[loop] = task
        return
    raise RuntimeError(f"Cannot enter into task {task!r} while another "
                       f"task {running!r} is being executed.")
836
837
def _leave_task(loop, task):
    """Forget *task* as the task currently executing on *loop*.

    Raises RuntimeError if *task* is not the task recorded for *loop*.
    """
    running = _current_tasks.get(loop)
    if running is task:
        del _current_tasks[loop]
        return
    raise RuntimeError(f"Leaving task {task!r} does not match "
                       f"the current task {running!r}.")
844
845
def _unregister_task(task):
    """Unregister a task.

    Uses discard(), so a task that is not in the registry is
    silently ignored.
    """
    _all_tasks.discard(task)
849
850
# Keep the pure-Python registry functions reachable under _py_* names;
# the plain names may be rebound by the import below.
_py_register_task = _register_task
_py_unregister_task = _unregister_task
_py_enter_task = _enter_task
_py_leave_task = _leave_task


try:
    # When _asyncio can be imported, its functions and registries
    # replace the ones defined above.
    from _asyncio import (_register_task, _unregister_task,
                          _enter_task, _leave_task,
                          _all_tasks, _current_tasks)
except ImportError:
    pass
else:
    # Counterparts of the _py_* aliases for the imported versions.
    _c_register_task = _register_task
    _c_unregister_task = _unregister_task
    _c_enter_task = _enter_task
    _c_leave_task = _leave_task
868