1"""Tests for tasks.py."""
2
3import collections
4import contextlib
5import contextvars
6import functools
7import gc
8import io
9import random
10import re
11import sys
12import textwrap
13import traceback
14import types
15import unittest
16import weakref
17from unittest import mock
18
19import asyncio
20from asyncio import coroutines
21from asyncio import futures
22from asyncio import tasks
23from test.test_asyncio import utils as test_utils
24from test import support
25from test.support.script_helper import assert_python_ok
26
27
def tearDownModule():
    """Module-level teardown hook: reset the asyncio event loop policy."""
    # None is passed as the policy so that no policy installed while the
    # tests ran stays in place afterwards.
    reset_policy = None
    asyncio.set_event_loop_policy(reset_policy)
30
31
async def coroutine_function():
    """Coroutine function with an empty body; it completes with None."""
    return None
34
35
@contextlib.contextmanager
def set_coroutine_debug(enabled):
    """Temporarily set ``asyncio.coroutines._DEBUG`` to *enabled*.

    The previous value is restored when the ``with`` block exits, whether
    it exits normally or through an exception.
    """
    module = asyncio.coroutines
    saved = module._DEBUG
    try:
        module._DEBUG = enabled
        yield
    finally:
        module._DEBUG = saved
46
47
def format_coroutine(qualname, state, src, source_traceback, generator=False):
    """Build the ``coro=<...>`` fragment expected inside a Task repr.

    *state* gets a ``', defined'`` suffix unless *generator* is true.  When
    *source_traceback* is not None, the file name and line number of its
    last entry are appended as ``created at file:line``.
    """
    state_template = '%s' if generator else '%s, defined'
    state = state_template % state
    if source_traceback is None:
        return 'coro=<%s() %s at %s>' % (qualname, state, src)

    # Only the last traceback entry is reported.
    last_frame = source_traceback[-1]
    created_file, created_line = last_frame[0], last_frame[1]
    return ('coro=<%s() %s at %s> created at %s:%s'
            % (qualname, state, src, created_file, created_line))
59
60
def get_innermost_context(exc):
    """
    Return information about the innermost exception context in the chain.

    Follows ``exc.__context__`` links starting from *exc* and returns a
    ``(type, args, depth)`` tuple for the last exception reached, where
    *depth* is the number of links followed (0 if *exc* has no context).

    If the chain loops back onto an exception that was already visited,
    the walk stops at the last new exception instead of looping forever.
    """
    depth = 0
    # Identity-based bookkeeping so a cyclic __context__ chain cannot hang
    # the test run.
    seen = {id(exc)}
    while True:
        context = exc.__context__
        if context is None or id(context) in seen:
            break

        seen.add(id(context))
        exc = context
        depth += 1

    return (type(exc), exc.args, depth)
75
76
class Dummy:
    """Callable placeholder with a fixed repr.

    The tests register instances as task done-callbacks so that the
    callback part of a Task repr is predictable.
    """

    def __call__(self, *args):
        # Accepts any positional arguments and does nothing.
        return None

    def __repr__(self):
        return '<Dummy>'
84
85
class CoroLikeObject:
    """Hand-written object exposing send/throw/close/__await__."""

    def __await__(self):
        # The object serves as its own await iterator.
        return self

    def send(self, v):
        # Finishes on the first send, carrying 42 as the result value.
        raise StopIteration(42)

    def throw(self, *exc):
        # Ignores whatever is thrown in.
        return None

    def close(self):
        return None
98
99
# A very small positive timeout for use in tests: it passes a
# "timeout > 0" check, yet has almost no effect on how long the
# tests take to run.
_EPSILON = 0.0001
104
105
class BaseTaskTests:
    # Shared test bodies that are parameterised over the Task and Future
    # implementations.  new_task() and new_future() build objects through
    # these two class attributes; they are None here, so presumably
    # concrete subclasses set them -- confirm against the subclasses.

    Task = None
    Future = None
110
111    def new_task(self, loop, coro, name='TestTask'):
112        return self.__class__.Task(coro, loop=loop, name=name)
113
114    def new_future(self, loop):
115        return self.__class__.Future(loop=loop)
116
117    def setUp(self):
118        super().setUp()
119        self.loop = self.new_test_loop()
120        self.loop.set_task_factory(self.new_task)
121        self.loop.create_future = lambda: self.new_future(self.loop)
122
123    def test_task_cancel_message_getter(self):
124        async def coro():
125            pass
126        t = self.new_task(self.loop, coro())
127        self.assertTrue(hasattr(t, '_cancel_message'))
128        self.assertEqual(t._cancel_message, None)
129
130        t.cancel('my message')
131        self.assertEqual(t._cancel_message, 'my message')
132
133        with self.assertRaises(asyncio.CancelledError):
134            self.loop.run_until_complete(t)
135
136    def test_task_cancel_message_setter(self):
137        async def coro():
138            pass
139        t = self.new_task(self.loop, coro())
140        t.cancel('my message')
141        t._cancel_message = 'my new message'
142        self.assertEqual(t._cancel_message, 'my new message')
143
144        with self.assertRaises(asyncio.CancelledError):
145            self.loop.run_until_complete(t)
146
147    def test_task_del_collect(self):
148        class Evil:
149            def __del__(self):
150                gc.collect()
151
152        async def run():
153            return Evil()
154
155        self.loop.run_until_complete(
156            asyncio.gather(*[
157                self.new_task(self.loop, run()) for _ in range(100)
158            ], loop=self.loop))
159
160    def test_other_loop_future(self):
161        other_loop = asyncio.new_event_loop()
162        fut = self.new_future(other_loop)
163
164        async def run(fut):
165            await fut
166
167        try:
168            with self.assertRaisesRegex(RuntimeError,
169                                        r'Task .* got Future .* attached'):
170                self.loop.run_until_complete(run(fut))
171        finally:
172            other_loop.close()
173
174    def test_task_awaits_on_itself(self):
175
176        async def test():
177            await task
178
179        task = asyncio.ensure_future(test(), loop=self.loop)
180
181        with self.assertRaisesRegex(RuntimeError,
182                                    'Task cannot await on itself'):
183            self.loop.run_until_complete(task)
184
185    def test_task_class(self):
186        async def notmuch():
187            return 'ok'
188        t = self.new_task(self.loop, notmuch())
189        self.loop.run_until_complete(t)
190        self.assertTrue(t.done())
191        self.assertEqual(t.result(), 'ok')
192        self.assertIs(t._loop, self.loop)
193        self.assertIs(t.get_loop(), self.loop)
194
195        loop = asyncio.new_event_loop()
196        self.set_event_loop(loop)
197        t = self.new_task(loop, notmuch())
198        self.assertIs(t._loop, loop)
199        loop.run_until_complete(t)
200        loop.close()
201
202    def test_ensure_future_coroutine(self):
203        with self.assertWarns(DeprecationWarning):
204            @asyncio.coroutine
205            def notmuch():
206                return 'ok'
207        t = asyncio.ensure_future(notmuch(), loop=self.loop)
208        self.loop.run_until_complete(t)
209        self.assertTrue(t.done())
210        self.assertEqual(t.result(), 'ok')
211        self.assertIs(t._loop, self.loop)
212
213        loop = asyncio.new_event_loop()
214        self.set_event_loop(loop)
215        t = asyncio.ensure_future(notmuch(), loop=loop)
216        self.assertIs(t._loop, loop)
217        loop.run_until_complete(t)
218        loop.close()
219
220    def test_ensure_future_future(self):
221        f_orig = self.new_future(self.loop)
222        f_orig.set_result('ko')
223
224        f = asyncio.ensure_future(f_orig)
225        self.loop.run_until_complete(f)
226        self.assertTrue(f.done())
227        self.assertEqual(f.result(), 'ko')
228        self.assertIs(f, f_orig)
229
230        loop = asyncio.new_event_loop()
231        self.set_event_loop(loop)
232
233        with self.assertRaises(ValueError):
234            f = asyncio.ensure_future(f_orig, loop=loop)
235
236        loop.close()
237
238        f = asyncio.ensure_future(f_orig, loop=self.loop)
239        self.assertIs(f, f_orig)
240
241    def test_ensure_future_task(self):
242        async def notmuch():
243            return 'ok'
244        t_orig = self.new_task(self.loop, notmuch())
245        t = asyncio.ensure_future(t_orig)
246        self.loop.run_until_complete(t)
247        self.assertTrue(t.done())
248        self.assertEqual(t.result(), 'ok')
249        self.assertIs(t, t_orig)
250
251        loop = asyncio.new_event_loop()
252        self.set_event_loop(loop)
253
254        with self.assertRaises(ValueError):
255            t = asyncio.ensure_future(t_orig, loop=loop)
256
257        loop.close()
258
259        t = asyncio.ensure_future(t_orig, loop=self.loop)
260        self.assertIs(t, t_orig)
261
262    def test_ensure_future_awaitable(self):
263        class Aw:
264            def __init__(self, coro):
265                self.coro = coro
266            def __await__(self):
267                return (yield from self.coro)
268
269        with self.assertWarns(DeprecationWarning):
270            @asyncio.coroutine
271            def coro():
272                return 'ok'
273
274        loop = asyncio.new_event_loop()
275        self.set_event_loop(loop)
276        fut = asyncio.ensure_future(Aw(coro()), loop=loop)
277        loop.run_until_complete(fut)
278        assert fut.result() == 'ok'
279
280    def test_ensure_future_neither(self):
281        with self.assertRaises(TypeError):
282            asyncio.ensure_future('ok')
283
284    def test_ensure_future_error_msg(self):
285        loop = asyncio.new_event_loop()
286        f = self.new_future(self.loop)
287        with self.assertRaisesRegex(ValueError, 'The future belongs to a '
288                                    'different loop than the one specified as '
289                                    'the loop argument'):
290            asyncio.ensure_future(f, loop=loop)
291        loop.close()
292
    def test_get_stack(self):
        # Exercise Task.get_stack() and Task.print_stack() from inside the
        # coroutine chain that task T is running.
        T = None

        async def foo():
            await bar()

        async def bar():
            # test get_stack()
            # With limit=1 the single frame returned is expected to be
            # foo()'s, not bar()'s.
            f = T.get_stack(limit=1)
            try:
                self.assertEqual(f[0].f_code.co_name, 'foo')
            finally:
                # Drop the reference to the frame list once checked.
                f = None

            # test print_stack()
            file = io.StringIO()
            T.print_stack(limit=1, file=file)
            file.seek(0)
            tb = file.read()
            self.assertRegex(tb, r'foo\(\) running')

        async def runner():
            # T is assigned here so that bar() can reach the task object.
            nonlocal T
            T = asyncio.ensure_future(foo(), loop=self.loop)
            await T

        self.loop.run_until_complete(runner())
320
321    def test_task_repr(self):
322        self.loop.set_debug(False)
323
324        async def notmuch():
325            return 'abc'
326
327        # test coroutine function
328        self.assertEqual(notmuch.__name__, 'notmuch')
329        self.assertRegex(notmuch.__qualname__,
330                         r'\w+.test_task_repr.<locals>.notmuch')
331        self.assertEqual(notmuch.__module__, __name__)
332
333        filename, lineno = test_utils.get_function_source(notmuch)
334        src = "%s:%s" % (filename, lineno)
335
336        # test coroutine object
337        gen = notmuch()
338        coro_qualname = 'BaseTaskTests.test_task_repr.<locals>.notmuch'
339        self.assertEqual(gen.__name__, 'notmuch')
340        self.assertEqual(gen.__qualname__, coro_qualname)
341
342        # test pending Task
343        t = self.new_task(self.loop, gen)
344        t.add_done_callback(Dummy())
345
346        coro = format_coroutine(coro_qualname, 'running', src,
347                                t._source_traceback, generator=True)
348        self.assertEqual(repr(t),
349                         "<Task pending name='TestTask' %s cb=[<Dummy>()]>" % coro)
350
351        # test cancelling Task
352        t.cancel()  # Does not take immediate effect!
353        self.assertEqual(repr(t),
354                         "<Task cancelling name='TestTask' %s cb=[<Dummy>()]>" % coro)
355
356        # test cancelled Task
357        self.assertRaises(asyncio.CancelledError,
358                          self.loop.run_until_complete, t)
359        coro = format_coroutine(coro_qualname, 'done', src,
360                                t._source_traceback)
361        self.assertEqual(repr(t),
362                         "<Task cancelled name='TestTask' %s>" % coro)
363
364        # test finished Task
365        t = self.new_task(self.loop, notmuch())
366        self.loop.run_until_complete(t)
367        coro = format_coroutine(coro_qualname, 'done', src,
368                                t._source_traceback)
369        self.assertEqual(repr(t),
370                         "<Task finished name='TestTask' %s result='abc'>" % coro)
371
372    def test_task_repr_autogenerated(self):
373        async def notmuch():
374            return 123
375
376        t1 = self.new_task(self.loop, notmuch(), None)
377        t2 = self.new_task(self.loop, notmuch(), None)
378        self.assertNotEqual(repr(t1), repr(t2))
379
380        match1 = re.match(r"^<Task pending name='Task-(\d+)'", repr(t1))
381        self.assertIsNotNone(match1)
382        match2 = re.match(r"^<Task pending name='Task-(\d+)'", repr(t2))
383        self.assertIsNotNone(match2)
384
385        # Autogenerated task names should have monotonically increasing numbers
386        self.assertLess(int(match1.group(1)), int(match2.group(1)))
387        self.loop.run_until_complete(t1)
388        self.loop.run_until_complete(t2)
389
390    def test_task_repr_name_not_str(self):
391        async def notmuch():
392            return 123
393
394        t = self.new_task(self.loop, notmuch())
395        t.set_name({6})
396        self.assertEqual(t.get_name(), '{6}')
397        self.loop.run_until_complete(t)
398
399    def test_task_repr_coro_decorator(self):
400        self.loop.set_debug(False)
401
402        with self.assertWarns(DeprecationWarning):
403            @asyncio.coroutine
404            def notmuch():
405                # notmuch() function doesn't use yield from: it will be wrapped by
406                # @coroutine decorator
407                return 123
408
409        # test coroutine function
410        self.assertEqual(notmuch.__name__, 'notmuch')
411        self.assertRegex(notmuch.__qualname__,
412                         r'\w+.test_task_repr_coro_decorator'
413                         r'\.<locals>\.notmuch')
414        self.assertEqual(notmuch.__module__, __name__)
415
416        # test coroutine object
417        gen = notmuch()
418        # On Python >= 3.5, generators now inherit the name of the
419        # function, as expected, and have a qualified name (__qualname__
420        # attribute).
421        coro_name = 'notmuch'
422        coro_qualname = ('BaseTaskTests.test_task_repr_coro_decorator'
423                         '.<locals>.notmuch')
424        self.assertEqual(gen.__name__, coro_name)
425        self.assertEqual(gen.__qualname__, coro_qualname)
426
427        # test repr(CoroWrapper)
428        if coroutines._DEBUG:
429            # format the coroutine object
430            if coroutines._DEBUG:
431                filename, lineno = test_utils.get_function_source(notmuch)
432                frame = gen._source_traceback[-1]
433                coro = ('%s() running, defined at %s:%s, created at %s:%s'
434                        % (coro_qualname, filename, lineno,
435                           frame[0], frame[1]))
436            else:
437                code = gen.gi_code
438                coro = ('%s() running at %s:%s'
439                        % (coro_qualname, code.co_filename,
440                           code.co_firstlineno))
441
442            self.assertEqual(repr(gen), '<CoroWrapper %s>' % coro)
443
444        # test pending Task
445        t = self.new_task(self.loop, gen)
446        t.add_done_callback(Dummy())
447
448        # format the coroutine object
449        if coroutines._DEBUG:
450            src = '%s:%s' % test_utils.get_function_source(notmuch)
451        else:
452            code = gen.gi_code
453            src = '%s:%s' % (code.co_filename, code.co_firstlineno)
454        coro = format_coroutine(coro_qualname, 'running', src,
455                                t._source_traceback,
456                                generator=not coroutines._DEBUG)
457        self.assertEqual(repr(t),
458                         "<Task pending name='TestTask' %s cb=[<Dummy>()]>" % coro)
459        self.loop.run_until_complete(t)
460
461    def test_task_repr_wait_for(self):
462        self.loop.set_debug(False)
463
464        async def wait_for(fut):
465            return await fut
466
467        fut = self.new_future(self.loop)
468        task = self.new_task(self.loop, wait_for(fut))
469        test_utils.run_briefly(self.loop)
470        self.assertRegex(repr(task),
471                         '<Task .* wait_for=%s>' % re.escape(repr(fut)))
472
473        fut.set_result(None)
474        self.loop.run_until_complete(task)
475
476    def test_task_repr_partial_corowrapper(self):
477        # Issue #222: repr(CoroWrapper) must not fail in debug mode if the
478        # coroutine is a partial function
479        with set_coroutine_debug(True):
480            self.loop.set_debug(True)
481
482            async def func(x, y):
483                await asyncio.sleep(0)
484
485            with self.assertWarns(DeprecationWarning):
486                partial_func = asyncio.coroutine(functools.partial(func, 1))
487            task = self.loop.create_task(partial_func(2))
488
489            # make warnings quiet
490            task._log_destroy_pending = False
491            self.addCleanup(task._coro.close)
492
493        coro_repr = repr(task._coro)
494        expected = (
495            r'<coroutine object \w+\.test_task_repr_partial_corowrapper'
496            r'\.<locals>\.func at'
497        )
498        self.assertRegex(coro_repr, expected)
499
500    def test_task_basics(self):
501
502        async def outer():
503            a = await inner1()
504            b = await inner2()
505            return a+b
506
507        async def inner1():
508            return 42
509
510        async def inner2():
511            return 1000
512
513        t = outer()
514        self.assertEqual(self.loop.run_until_complete(t), 1042)
515
516    def test_exception_chaining_after_await(self):
517        # Test that when awaiting on a task when an exception is already
518        # active, if the task raises an exception it will be chained
519        # with the original.
520        loop = asyncio.new_event_loop()
521        self.set_event_loop(loop)
522
523        async def raise_error():
524            raise ValueError
525
526        async def run():
527            try:
528                raise KeyError(3)
529            except Exception as exc:
530                task = self.new_task(loop, raise_error())
531                try:
532                    await task
533                except Exception as exc:
534                    self.assertEqual(type(exc), ValueError)
535                    chained = exc.__context__
536                    self.assertEqual((type(chained), chained.args),
537                        (KeyError, (3,)))
538
539        try:
540            task = self.new_task(loop, run())
541            loop.run_until_complete(task)
542        finally:
543            loop.close()
544
545    def test_exception_chaining_after_await_with_context_cycle(self):
546        # Check trying to create an exception context cycle:
547        # https://bugs.python.org/issue40696
548        has_cycle = None
549        loop = asyncio.new_event_loop()
550        self.set_event_loop(loop)
551
552        async def process_exc(exc):
553            raise exc
554
555        async def run():
556            nonlocal has_cycle
557            try:
558                raise KeyError('a')
559            except Exception as exc:
560                task = self.new_task(loop, process_exc(exc))
561                try:
562                    await task
563                except BaseException as exc:
564                    has_cycle = (exc is exc.__context__)
565                    # Prevent a hang if has_cycle is True.
566                    exc.__context__ = None
567
568        try:
569            task = self.new_task(loop, run())
570            loop.run_until_complete(task)
571        finally:
572            loop.close()
573        # This also distinguishes from the initial has_cycle=None.
574        self.assertEqual(has_cycle, False)
575
576    def test_cancel(self):
577
578        def gen():
579            when = yield
580            self.assertAlmostEqual(10.0, when)
581            yield 0
582
583        loop = self.new_test_loop(gen)
584
585        async def task():
586            await asyncio.sleep(10.0)
587            return 12
588
589        t = self.new_task(loop, task())
590        loop.call_soon(t.cancel)
591        with self.assertRaises(asyncio.CancelledError):
592            loop.run_until_complete(t)
593        self.assertTrue(t.done())
594        self.assertTrue(t.cancelled())
595        self.assertFalse(t.cancel())
596
597    def test_cancel_with_message_then_future_result(self):
598        # Test Future.result() after calling cancel() with a message.
599        cases = [
600            ((), ()),
601            ((None,), ()),
602            (('my message',), ('my message',)),
603            # Non-string values should roundtrip.
604            ((5,), (5,)),
605        ]
606        for cancel_args, expected_args in cases:
607            with self.subTest(cancel_args=cancel_args):
608                loop = asyncio.new_event_loop()
609                self.set_event_loop(loop)
610
611                async def sleep():
612                    await asyncio.sleep(10)
613
614                async def coro():
615                    task = self.new_task(loop, sleep())
616                    await asyncio.sleep(0)
617                    task.cancel(*cancel_args)
618                    done, pending = await asyncio.wait([task])
619                    task.result()
620
621                task = self.new_task(loop, coro())
622                with self.assertRaises(asyncio.CancelledError) as cm:
623                    loop.run_until_complete(task)
624                exc = cm.exception
625                self.assertEqual(exc.args, ())
626
627                actual = get_innermost_context(exc)
628                self.assertEqual(actual,
629                    (asyncio.CancelledError, expected_args, 2))
630
631    def test_cancel_with_message_then_future_exception(self):
632        # Test Future.exception() after calling cancel() with a message.
633        cases = [
634            ((), ()),
635            ((None,), ()),
636            (('my message',), ('my message',)),
637            # Non-string values should roundtrip.
638            ((5,), (5,)),
639        ]
640        for cancel_args, expected_args in cases:
641            with self.subTest(cancel_args=cancel_args):
642                loop = asyncio.new_event_loop()
643                self.set_event_loop(loop)
644
645                async def sleep():
646                    await asyncio.sleep(10)
647
648                async def coro():
649                    task = self.new_task(loop, sleep())
650                    await asyncio.sleep(0)
651                    task.cancel(*cancel_args)
652                    done, pending = await asyncio.wait([task])
653                    task.exception()
654
655                task = self.new_task(loop, coro())
656                with self.assertRaises(asyncio.CancelledError) as cm:
657                    loop.run_until_complete(task)
658                exc = cm.exception
659                self.assertEqual(exc.args, ())
660
661                actual = get_innermost_context(exc)
662                self.assertEqual(actual,
663                    (asyncio.CancelledError, expected_args, 2))
664
665    def test_cancel_with_message_before_starting_task(self):
666        loop = asyncio.new_event_loop()
667        self.set_event_loop(loop)
668
669        async def sleep():
670            await asyncio.sleep(10)
671
672        async def coro():
673            task = self.new_task(loop, sleep())
674            # We deliberately leave out the sleep here.
675            task.cancel('my message')
676            done, pending = await asyncio.wait([task])
677            task.exception()
678
679        task = self.new_task(loop, coro())
680        with self.assertRaises(asyncio.CancelledError) as cm:
681            loop.run_until_complete(task)
682        exc = cm.exception
683        self.assertEqual(exc.args, ())
684
685        actual = get_innermost_context(exc)
686        self.assertEqual(actual,
687            (asyncio.CancelledError, ('my message',), 2))
688
689    def test_cancel_yield(self):
690        with self.assertWarns(DeprecationWarning):
691            @asyncio.coroutine
692            def task():
693                yield
694                yield
695                return 12
696
697        t = self.new_task(self.loop, task())
698        test_utils.run_briefly(self.loop)  # start coro
699        t.cancel()
700        self.assertRaises(
701            asyncio.CancelledError, self.loop.run_until_complete, t)
702        self.assertTrue(t.done())
703        self.assertTrue(t.cancelled())
704        self.assertFalse(t.cancel())
705
706    def test_cancel_inner_future(self):
707        f = self.new_future(self.loop)
708
709        async def task():
710            await f
711            return 12
712
713        t = self.new_task(self.loop, task())
714        test_utils.run_briefly(self.loop)  # start task
715        f.cancel()
716        with self.assertRaises(asyncio.CancelledError):
717            self.loop.run_until_complete(t)
718        self.assertTrue(f.cancelled())
719        self.assertTrue(t.cancelled())
720
721    def test_cancel_both_task_and_inner_future(self):
722        f = self.new_future(self.loop)
723
724        async def task():
725            await f
726            return 12
727
728        t = self.new_task(self.loop, task())
729        test_utils.run_briefly(self.loop)
730
731        f.cancel()
732        t.cancel()
733
734        with self.assertRaises(asyncio.CancelledError):
735            self.loop.run_until_complete(t)
736
737        self.assertTrue(t.done())
738        self.assertTrue(f.cancelled())
739        self.assertTrue(t.cancelled())
740
    def test_cancel_task_catching(self):
        # A coroutine that catches CancelledError and returns a value: the
        # task finishes with that value and is not reported as cancelled.
        fut1 = self.new_future(self.loop)
        fut2 = self.new_future(self.loop)

        async def task():
            await fut1
            try:
                await fut2
            except asyncio.CancelledError:
                return 42

        t = self.new_task(self.loop, task())
        # Step the loop so the task blocks on fut1, then on fut2.
        test_utils.run_briefly(self.loop)
        self.assertIs(t._fut_waiter, fut1)  # White-box test.
        fut1.set_result(None)
        test_utils.run_briefly(self.loop)
        self.assertIs(t._fut_waiter, fut2)  # White-box test.
        t.cancel()
        # cancel() is expected to cancel the awaited future right away.
        self.assertTrue(fut2.cancelled())
        res = self.loop.run_until_complete(t)
        self.assertEqual(res, 42)
        self.assertFalse(t.cancelled())
763
    def test_cancel_task_ignoring(self):
        # A coroutine that swallows CancelledError and keeps going: the
        # task moves on to the next future and finishes normally.
        fut1 = self.new_future(self.loop)
        fut2 = self.new_future(self.loop)
        fut3 = self.new_future(self.loop)

        async def task():
            await fut1
            try:
                await fut2
            except asyncio.CancelledError:
                pass
            res = await fut3
            return res

        t = self.new_task(self.loop, task())
        # Step the loop so the task blocks on fut1, then on fut2.
        test_utils.run_briefly(self.loop)
        self.assertIs(t._fut_waiter, fut1)  # White-box test.
        fut1.set_result(None)
        test_utils.run_briefly(self.loop)
        self.assertIs(t._fut_waiter, fut2)  # White-box test.
        t.cancel()
        # cancel() is expected to cancel the awaited future right away.
        self.assertTrue(fut2.cancelled())
        test_utils.run_briefly(self.loop)
        # After ignoring the cancellation the task waits on fut3.
        self.assertIs(t._fut_waiter, fut3)  # White-box test.
        fut3.set_result(42)
        res = self.loop.run_until_complete(t)
        self.assertEqual(res, 42)
        self.assertFalse(fut3.cancelled())
        self.assertFalse(t.cancelled())
793
794    def test_cancel_current_task(self):
795        loop = asyncio.new_event_loop()
796        self.set_event_loop(loop)
797
798        async def task():
799            t.cancel()
800            self.assertTrue(t._must_cancel)  # White-box test.
801            # The sleep should be cancelled immediately.
802            await asyncio.sleep(100)
803            return 12
804
805        t = self.new_task(loop, task())
806        self.assertFalse(t.cancelled())
807        self.assertRaises(
808            asyncio.CancelledError, loop.run_until_complete, t)
809        self.assertTrue(t.done())
810        self.assertTrue(t.cancelled())
811        self.assertFalse(t._must_cancel)  # White-box test.
812        self.assertFalse(t.cancel())
813
    def test_cancel_at_end(self):
        """coroutine end right after task is cancelled"""
        # The coroutine cancels its own task and then returns without
        # awaiting anything; the task must still end up cancelled.
        loop = asyncio.new_event_loop()
        self.set_event_loop(loop)

        async def task():
            # t is bound below, before this coroutine first runs.
            t.cancel()
            self.assertTrue(t._must_cancel)  # White-box test.
            return 12

        t = self.new_task(loop, task())
        self.assertFalse(t.cancelled())
        self.assertRaises(
            asyncio.CancelledError, loop.run_until_complete, t)
        self.assertTrue(t.done())
        self.assertTrue(t.cancelled())
        self.assertFalse(t._must_cancel)  # White-box test.
        # Cancelling an already cancelled task reports False.
        self.assertFalse(t.cancel())
832
833    def test_cancel_awaited_task(self):
834        # This tests for a relatively rare condition when
835        # a task cancellation is requested for a task which is not
836        # currently blocked, such as a task cancelling itself.
837        # In this situation we must ensure that whatever next future
838        # or task the cancelled task blocks on is cancelled correctly
839        # as well.  See also bpo-34872.
840        loop = asyncio.new_event_loop()
841        self.addCleanup(lambda: loop.close())
842
843        task = nested_task = None
844        fut = self.new_future(loop)
845
846        async def nested():
847            await fut
848
849        async def coro():
850            nonlocal nested_task
851            # Create a sub-task and wait for it to run.
852            nested_task = self.new_task(loop, nested())
853            await asyncio.sleep(0)
854
855            # Request the current task to be cancelled.
856            task.cancel()
857            # Block on the nested task, which should be immediately
858            # cancelled.
859            await nested_task
860
861        task = self.new_task(loop, coro())
862        with self.assertRaises(asyncio.CancelledError):
863            loop.run_until_complete(task)
864
865        self.assertTrue(task.cancelled())
866        self.assertTrue(nested_task.cancelled())
867        self.assertTrue(fut.cancelled())
868
869    def assert_text_contains(self, text, substr):
870        if substr not in text:
871            raise RuntimeError(f'text {substr!r} not found in:\n>>>{text}<<<')
872
    def test_cancel_traceback_for_future_result(self):
        # When calling Future.result() on a cancelled task, check that the
        # line of code that was interrupted is included in the traceback.
        #
        # NOTE: the assertions below search the formatted traceback for
        # the exact source text of two lines in this method (including the
        # "# search target" comment), so those lines must stay as they are.
        loop = asyncio.new_event_loop()
        self.set_event_loop(loop)

        async def nested():
            # This will get cancelled immediately.
            await asyncio.sleep(10)

        async def coro():
            task = self.new_task(loop, nested())
            # Let nested() start so that it is suspended in its sleep.
            await asyncio.sleep(0)
            task.cancel()
            await task  # search target

        task = self.new_task(loop, coro())
        try:
            loop.run_until_complete(task)
        except asyncio.CancelledError:
            tb = traceback.format_exc()
            self.assert_text_contains(tb, "await asyncio.sleep(10)")
            # The intermediate await should also be included.
            self.assert_text_contains(tb, "await task  # search target")
        else:
            self.fail('CancelledError did not occur')
899
    def test_cancel_traceback_for_future_exception(self):
        # When calling Future.exception() on a cancelled task, check that the
        # line of code that was interrupted is included in the traceback.
        #
        # NOTE: the assertions below search the formatted traceback for
        # the exact source text of two lines in this method (including the
        # "# search target" comment), so those lines must stay as they are.
        loop = asyncio.new_event_loop()
        self.set_event_loop(loop)

        async def nested():
            # This will get cancelled immediately.
            await asyncio.sleep(10)

        async def coro():
            task = self.new_task(loop, nested())
            # Let nested() start so that it is suspended in its sleep.
            await asyncio.sleep(0)
            task.cancel()
            done, pending = await asyncio.wait([task])
            task.exception()  # search target

        task = self.new_task(loop, coro())
        try:
            loop.run_until_complete(task)
        except asyncio.CancelledError:
            tb = traceback.format_exc()
            self.assert_text_contains(tb, "await asyncio.sleep(10)")
            # The intermediate task.exception() call should also be
            # included.
            self.assert_text_contains(tb,
                "task.exception()  # search target")
        else:
            self.fail('CancelledError did not occur')
928
929    def test_stop_while_run_in_complete(self):
930
931        def gen():
932            when = yield
933            self.assertAlmostEqual(0.1, when)
934            when = yield 0.1
935            self.assertAlmostEqual(0.2, when)
936            when = yield 0.1
937            self.assertAlmostEqual(0.3, when)
938            yield 0.1
939
940        loop = self.new_test_loop(gen)
941
942        x = 0
943
944        async def task():
945            nonlocal x
946            while x < 10:
947                await asyncio.sleep(0.1)
948                x += 1
949                if x == 2:
950                    loop.stop()
951
952        t = self.new_task(loop, task())
953        with self.assertRaises(RuntimeError) as cm:
954            loop.run_until_complete(t)
955        self.assertEqual(str(cm.exception),
956                         'Event loop stopped before Future completed.')
957        self.assertFalse(t.done())
958        self.assertEqual(x, 2)
959        self.assertAlmostEqual(0.3, loop.time())
960
961        t.cancel()
962        self.assertRaises(asyncio.CancelledError, loop.run_until_complete, t)
963
964    def test_log_traceback(self):
965        async def coro():
966            pass
967
968        task = self.new_task(self.loop, coro())
969        with self.assertRaisesRegex(ValueError, 'can only be set to False'):
970            task._log_traceback = True
971        self.loop.run_until_complete(task)
972
973    def test_wait_for_timeout_less_then_0_or_0_future_done(self):
974        def gen():
975            when = yield
976            self.assertAlmostEqual(0, when)
977
978        loop = self.new_test_loop(gen)
979
980        fut = self.new_future(loop)
981        fut.set_result('done')
982
983        ret = loop.run_until_complete(asyncio.wait_for(fut, 0))
984
985        self.assertEqual(ret, 'done')
986        self.assertTrue(fut.done())
987        self.assertAlmostEqual(0, loop.time())
988
989    def test_wait_for_timeout_less_then_0_or_0_coroutine_do_not_started(self):
990        def gen():
991            when = yield
992            self.assertAlmostEqual(0, when)
993
994        loop = self.new_test_loop(gen)
995
996        foo_started = False
997
998        async def foo():
999            nonlocal foo_started
1000            foo_started = True
1001
1002        with self.assertRaises(asyncio.TimeoutError):
1003            loop.run_until_complete(asyncio.wait_for(foo(), 0))
1004
1005        self.assertAlmostEqual(0, loop.time())
1006        self.assertEqual(foo_started, False)
1007
1008    def test_wait_for_timeout_less_then_0_or_0(self):
1009        def gen():
1010            when = yield
1011            self.assertAlmostEqual(0.2, when)
1012            when = yield 0
1013            self.assertAlmostEqual(0, when)
1014
1015        for timeout in [0, -1]:
1016            with self.subTest(timeout=timeout):
1017                loop = self.new_test_loop(gen)
1018
1019                foo_running = None
1020
1021                async def foo():
1022                    nonlocal foo_running
1023                    foo_running = True
1024                    try:
1025                        await asyncio.sleep(0.2)
1026                    finally:
1027                        foo_running = False
1028                    return 'done'
1029
1030                fut = self.new_task(loop, foo())
1031
1032                with self.assertRaises(asyncio.TimeoutError):
1033                    loop.run_until_complete(asyncio.wait_for(fut, timeout))
1034                self.assertTrue(fut.done())
1035                # it should have been cancelled due to the timeout
1036                self.assertTrue(fut.cancelled())
1037                self.assertAlmostEqual(0, loop.time())
1038                self.assertEqual(foo_running, False)
1039
1040    def test_wait_for(self):
1041
1042        def gen():
1043            when = yield
1044            self.assertAlmostEqual(0.2, when)
1045            when = yield 0
1046            self.assertAlmostEqual(0.1, when)
1047            when = yield 0.1
1048
1049        loop = self.new_test_loop(gen)
1050
1051        foo_running = None
1052
1053        async def foo():
1054            nonlocal foo_running
1055            foo_running = True
1056            try:
1057                await asyncio.sleep(0.2)
1058            finally:
1059                foo_running = False
1060            return 'done'
1061
1062        fut = self.new_task(loop, foo())
1063
1064        with self.assertRaises(asyncio.TimeoutError):
1065            loop.run_until_complete(asyncio.wait_for(fut, 0.1))
1066        self.assertTrue(fut.done())
1067        # it should have been cancelled due to the timeout
1068        self.assertTrue(fut.cancelled())
1069        self.assertAlmostEqual(0.1, loop.time())
1070        self.assertEqual(foo_running, False)
1071
1072    def test_wait_for_blocking(self):
1073        loop = self.new_test_loop()
1074
1075        async def coro():
1076            return 'done'
1077
1078        res = loop.run_until_complete(asyncio.wait_for(coro(), timeout=None))
1079        self.assertEqual(res, 'done')
1080
1081    def test_wait_for_with_global_loop(self):
1082
1083        def gen():
1084            when = yield
1085            self.assertAlmostEqual(0.2, when)
1086            when = yield 0
1087            self.assertAlmostEqual(0.01, when)
1088            yield 0.01
1089
1090        loop = self.new_test_loop(gen)
1091
1092        async def foo():
1093            await asyncio.sleep(0.2)
1094            return 'done'
1095
1096        asyncio.set_event_loop(loop)
1097        try:
1098            fut = self.new_task(loop, foo())
1099            with self.assertRaises(asyncio.TimeoutError):
1100                loop.run_until_complete(asyncio.wait_for(fut, 0.01))
1101        finally:
1102            asyncio.set_event_loop(None)
1103
1104        self.assertAlmostEqual(0.01, loop.time())
1105        self.assertTrue(fut.done())
1106        self.assertTrue(fut.cancelled())
1107
1108    def test_wait_for_race_condition(self):
1109
1110        def gen():
1111            yield 0.1
1112            yield 0.1
1113            yield 0.1
1114
1115        loop = self.new_test_loop(gen)
1116
1117        fut = self.new_future(loop)
1118        task = asyncio.wait_for(fut, timeout=0.2)
1119        loop.call_later(0.1, fut.set_result, "ok")
1120        res = loop.run_until_complete(task)
1121        self.assertEqual(res, "ok")
1122
1123    def test_wait_for_cancellation_race_condition(self):
1124        def gen():
1125            yield 0.1
1126            yield 0.1
1127            yield 0.1
1128            yield 0.1
1129
1130        loop = self.new_test_loop(gen)
1131
1132        fut = self.new_future(loop)
1133        loop.call_later(0.1, fut.set_result, "ok")
1134        task = loop.create_task(asyncio.wait_for(fut, timeout=1))
1135        loop.call_later(0.1, task.cancel)
1136        res = loop.run_until_complete(task)
1137        self.assertEqual(res, "ok")
1138
1139    def test_wait_for_waits_for_task_cancellation(self):
1140        loop = asyncio.new_event_loop()
1141        self.addCleanup(loop.close)
1142
1143        task_done = False
1144
1145        async def foo():
1146            async def inner():
1147                nonlocal task_done
1148                try:
1149                    await asyncio.sleep(0.2)
1150                except asyncio.CancelledError:
1151                    await asyncio.sleep(_EPSILON)
1152                    raise
1153                finally:
1154                    task_done = True
1155
1156            inner_task = self.new_task(loop, inner())
1157
1158            await asyncio.wait_for(inner_task, timeout=_EPSILON)
1159
1160        with self.assertRaises(asyncio.TimeoutError) as cm:
1161            loop.run_until_complete(foo())
1162
1163        self.assertTrue(task_done)
1164        chained = cm.exception.__context__
1165        self.assertEqual(type(chained), asyncio.CancelledError)
1166
1167    def test_wait_for_waits_for_task_cancellation_w_timeout_0(self):
1168        loop = asyncio.new_event_loop()
1169        self.addCleanup(loop.close)
1170
1171        task_done = False
1172
1173        async def foo():
1174            async def inner():
1175                nonlocal task_done
1176                try:
1177                    await asyncio.sleep(10)
1178                except asyncio.CancelledError:
1179                    await asyncio.sleep(_EPSILON)
1180                    raise
1181                finally:
1182                    task_done = True
1183
1184            inner_task = self.new_task(loop, inner())
1185            await asyncio.sleep(_EPSILON)
1186            await asyncio.wait_for(inner_task, timeout=0)
1187
1188        with self.assertRaises(asyncio.TimeoutError) as cm:
1189            loop.run_until_complete(foo())
1190
1191        self.assertTrue(task_done)
1192        chained = cm.exception.__context__
1193        self.assertEqual(type(chained), asyncio.CancelledError)
1194
1195    def test_wait_for_reraises_exception_during_cancellation(self):
1196        loop = asyncio.new_event_loop()
1197        self.addCleanup(loop.close)
1198
1199        class FooException(Exception):
1200            pass
1201
1202        async def foo():
1203            async def inner():
1204                try:
1205                    await asyncio.sleep(0.2)
1206                finally:
1207                    raise FooException
1208
1209            inner_task = self.new_task(loop, inner())
1210
1211            await asyncio.wait_for(inner_task, timeout=_EPSILON)
1212
1213        with self.assertRaises(FooException):
1214            loop.run_until_complete(foo())
1215
1216    def test_wait_for_raises_timeout_error_if_returned_during_cancellation(self):
1217        loop = asyncio.new_event_loop()
1218        self.addCleanup(loop.close)
1219
1220        async def foo():
1221            async def inner():
1222                try:
1223                    await asyncio.sleep(0.2)
1224                except asyncio.CancelledError:
1225                    return 42
1226
1227            inner_task = self.new_task(loop, inner())
1228
1229            await asyncio.wait_for(inner_task, timeout=_EPSILON)
1230
1231        with self.assertRaises(asyncio.TimeoutError):
1232            loop.run_until_complete(foo())
1233
1234    def test_wait_for_self_cancellation(self):
1235        loop = asyncio.new_event_loop()
1236        self.addCleanup(loop.close)
1237
1238        async def foo():
1239            async def inner():
1240                try:
1241                    await asyncio.sleep(0.3)
1242                except asyncio.CancelledError:
1243                    try:
1244                        await asyncio.sleep(0.3)
1245                    except asyncio.CancelledError:
1246                        await asyncio.sleep(0.3)
1247
1248                return 42
1249
1250            inner_task = self.new_task(loop, inner())
1251
1252            wait = asyncio.wait_for(inner_task, timeout=0.1)
1253
1254            # Test that wait_for itself is properly cancellable
1255            # even when the initial task holds up the initial cancellation.
1256            task = self.new_task(loop, wait)
1257            await asyncio.sleep(0.2)
1258            task.cancel()
1259
1260            with self.assertRaises(asyncio.CancelledError):
1261                await task
1262
1263            self.assertEqual(await inner_task, 42)
1264
1265        loop.run_until_complete(foo())
1266
1267    def test_wait(self):
1268
1269        def gen():
1270            when = yield
1271            self.assertAlmostEqual(0.1, when)
1272            when = yield 0
1273            self.assertAlmostEqual(0.15, when)
1274            yield 0.15
1275
1276        loop = self.new_test_loop(gen)
1277
1278        a = self.new_task(loop, asyncio.sleep(0.1))
1279        b = self.new_task(loop, asyncio.sleep(0.15))
1280
1281        async def foo():
1282            done, pending = await asyncio.wait([b, a])
1283            self.assertEqual(done, set([a, b]))
1284            self.assertEqual(pending, set())
1285            return 42
1286
1287        res = loop.run_until_complete(self.new_task(loop, foo()))
1288        self.assertEqual(res, 42)
1289        self.assertAlmostEqual(0.15, loop.time())
1290
1291        # Doing it again should take no time and exercise a different path.
1292        res = loop.run_until_complete(self.new_task(loop, foo()))
1293        self.assertAlmostEqual(0.15, loop.time())
1294        self.assertEqual(res, 42)
1295
1296    def test_wait_with_global_loop(self):
1297
1298        def gen():
1299            when = yield
1300            self.assertAlmostEqual(0.01, when)
1301            when = yield 0
1302            self.assertAlmostEqual(0.015, when)
1303            yield 0.015
1304
1305        loop = self.new_test_loop(gen)
1306
1307        a = self.new_task(loop, asyncio.sleep(0.01))
1308        b = self.new_task(loop, asyncio.sleep(0.015))
1309
1310        async def foo():
1311            done, pending = await asyncio.wait([b, a])
1312            self.assertEqual(done, set([a, b]))
1313            self.assertEqual(pending, set())
1314            return 42
1315
1316        asyncio.set_event_loop(loop)
1317        res = loop.run_until_complete(
1318            self.new_task(loop, foo()))
1319
1320        self.assertEqual(res, 42)
1321
1322    def test_wait_duplicate_coroutines(self):
1323
1324        with self.assertWarns(DeprecationWarning):
1325            @asyncio.coroutine
1326            def coro(s):
1327                return s
1328        c = coro('test')
1329        task = self.new_task(
1330            self.loop,
1331            asyncio.wait([c, c, coro('spam')]))
1332
1333        with self.assertWarns(DeprecationWarning):
1334            done, pending = self.loop.run_until_complete(task)
1335
1336        self.assertFalse(pending)
1337        self.assertEqual(set(f.result() for f in done), {'test', 'spam'})
1338
1339    def test_wait_errors(self):
1340        self.assertRaises(
1341            ValueError, self.loop.run_until_complete,
1342            asyncio.wait(set()))
1343
1344        # -1 is an invalid return_when value
1345        sleep_coro = asyncio.sleep(10.0)
1346        wait_coro = asyncio.wait([sleep_coro], return_when=-1)
1347        self.assertRaises(ValueError,
1348                          self.loop.run_until_complete, wait_coro)
1349
1350        sleep_coro.close()
1351
1352    def test_wait_first_completed(self):
1353
1354        def gen():
1355            when = yield
1356            self.assertAlmostEqual(10.0, when)
1357            when = yield 0
1358            self.assertAlmostEqual(0.1, when)
1359            yield 0.1
1360
1361        loop = self.new_test_loop(gen)
1362
1363        a = self.new_task(loop, asyncio.sleep(10.0))
1364        b = self.new_task(loop, asyncio.sleep(0.1))
1365        task = self.new_task(
1366            loop,
1367            asyncio.wait([b, a], return_when=asyncio.FIRST_COMPLETED))
1368
1369        done, pending = loop.run_until_complete(task)
1370        self.assertEqual({b}, done)
1371        self.assertEqual({a}, pending)
1372        self.assertFalse(a.done())
1373        self.assertTrue(b.done())
1374        self.assertIsNone(b.result())
1375        self.assertAlmostEqual(0.1, loop.time())
1376
1377        # move forward to close generator
1378        loop.advance_time(10)
1379        loop.run_until_complete(asyncio.wait([a, b]))
1380
1381    def test_wait_really_done(self):
1382        # there is possibility that some tasks in the pending list
1383        # became done but their callbacks haven't all been called yet
1384
1385        async def coro1():
1386            await asyncio.sleep(0)
1387
1388        async def coro2():
1389            await asyncio.sleep(0)
1390            await asyncio.sleep(0)
1391
1392        a = self.new_task(self.loop, coro1())
1393        b = self.new_task(self.loop, coro2())
1394        task = self.new_task(
1395            self.loop,
1396            asyncio.wait([b, a], return_when=asyncio.FIRST_COMPLETED))
1397
1398        done, pending = self.loop.run_until_complete(task)
1399        self.assertEqual({a, b}, done)
1400        self.assertTrue(a.done())
1401        self.assertIsNone(a.result())
1402        self.assertTrue(b.done())
1403        self.assertIsNone(b.result())
1404
1405    def test_wait_first_exception(self):
1406
1407        def gen():
1408            when = yield
1409            self.assertAlmostEqual(10.0, when)
1410            yield 0
1411
1412        loop = self.new_test_loop(gen)
1413
1414        # first_exception, task already has exception
1415        a = self.new_task(loop, asyncio.sleep(10.0))
1416
1417        async def exc():
1418            raise ZeroDivisionError('err')
1419
1420        b = self.new_task(loop, exc())
1421        task = self.new_task(
1422            loop,
1423            asyncio.wait([b, a], return_when=asyncio.FIRST_EXCEPTION))
1424
1425        done, pending = loop.run_until_complete(task)
1426        self.assertEqual({b}, done)
1427        self.assertEqual({a}, pending)
1428        self.assertAlmostEqual(0, loop.time())
1429
1430        # move forward to close generator
1431        loop.advance_time(10)
1432        loop.run_until_complete(asyncio.wait([a, b]))
1433
1434    def test_wait_first_exception_in_wait(self):
1435
1436        def gen():
1437            when = yield
1438            self.assertAlmostEqual(10.0, when)
1439            when = yield 0
1440            self.assertAlmostEqual(0.01, when)
1441            yield 0.01
1442
1443        loop = self.new_test_loop(gen)
1444
1445        # first_exception, exception during waiting
1446        a = self.new_task(loop, asyncio.sleep(10.0))
1447
1448        async def exc():
1449            await asyncio.sleep(0.01)
1450            raise ZeroDivisionError('err')
1451
1452        b = self.new_task(loop, exc())
1453        task = asyncio.wait([b, a], return_when=asyncio.FIRST_EXCEPTION)
1454
1455        done, pending = loop.run_until_complete(task)
1456        self.assertEqual({b}, done)
1457        self.assertEqual({a}, pending)
1458        self.assertAlmostEqual(0.01, loop.time())
1459
1460        # move forward to close generator
1461        loop.advance_time(10)
1462        loop.run_until_complete(asyncio.wait([a, b]))
1463
1464    def test_wait_with_exception(self):
1465
1466        def gen():
1467            when = yield
1468            self.assertAlmostEqual(0.1, when)
1469            when = yield 0
1470            self.assertAlmostEqual(0.15, when)
1471            yield 0.15
1472
1473        loop = self.new_test_loop(gen)
1474
1475        a = self.new_task(loop, asyncio.sleep(0.1))
1476
1477        async def sleeper():
1478            await asyncio.sleep(0.15)
1479            raise ZeroDivisionError('really')
1480
1481        b = self.new_task(loop, sleeper())
1482
1483        async def foo():
1484            done, pending = await asyncio.wait([b, a])
1485            self.assertEqual(len(done), 2)
1486            self.assertEqual(pending, set())
1487            errors = set(f for f in done if f.exception() is not None)
1488            self.assertEqual(len(errors), 1)
1489
1490        loop.run_until_complete(self.new_task(loop, foo()))
1491        self.assertAlmostEqual(0.15, loop.time())
1492
1493        loop.run_until_complete(self.new_task(loop, foo()))
1494        self.assertAlmostEqual(0.15, loop.time())
1495
1496    def test_wait_with_timeout(self):
1497
1498        def gen():
1499            when = yield
1500            self.assertAlmostEqual(0.1, when)
1501            when = yield 0
1502            self.assertAlmostEqual(0.15, when)
1503            when = yield 0
1504            self.assertAlmostEqual(0.11, when)
1505            yield 0.11
1506
1507        loop = self.new_test_loop(gen)
1508
1509        a = self.new_task(loop, asyncio.sleep(0.1))
1510        b = self.new_task(loop, asyncio.sleep(0.15))
1511
1512        async def foo():
1513            done, pending = await asyncio.wait([b, a], timeout=0.11)
1514            self.assertEqual(done, set([a]))
1515            self.assertEqual(pending, set([b]))
1516
1517        loop.run_until_complete(self.new_task(loop, foo()))
1518        self.assertAlmostEqual(0.11, loop.time())
1519
1520        # move forward to close generator
1521        loop.advance_time(10)
1522        loop.run_until_complete(asyncio.wait([a, b]))
1523
1524    def test_wait_concurrent_complete(self):
1525
1526        def gen():
1527            when = yield
1528            self.assertAlmostEqual(0.1, when)
1529            when = yield 0
1530            self.assertAlmostEqual(0.15, when)
1531            when = yield 0
1532            self.assertAlmostEqual(0.1, when)
1533            yield 0.1
1534
1535        loop = self.new_test_loop(gen)
1536
1537        a = self.new_task(loop, asyncio.sleep(0.1))
1538        b = self.new_task(loop, asyncio.sleep(0.15))
1539
1540        done, pending = loop.run_until_complete(
1541            asyncio.wait([b, a], timeout=0.1))
1542
1543        self.assertEqual(done, set([a]))
1544        self.assertEqual(pending, set([b]))
1545        self.assertAlmostEqual(0.1, loop.time())
1546
1547        # move forward to close generator
1548        loop.advance_time(10)
1549        loop.run_until_complete(asyncio.wait([a, b]))
1550
1551    def test_wait_with_iterator_of_tasks(self):
1552
1553        def gen():
1554            when = yield
1555            self.assertAlmostEqual(0.1, when)
1556            when = yield 0
1557            self.assertAlmostEqual(0.15, when)
1558            yield 0.15
1559
1560        loop = self.new_test_loop(gen)
1561
1562        a = self.new_task(loop, asyncio.sleep(0.1))
1563        b = self.new_task(loop, asyncio.sleep(0.15))
1564
1565        async def foo():
1566            done, pending = await asyncio.wait(iter([b, a]))
1567            self.assertEqual(done, set([a, b]))
1568            self.assertEqual(pending, set())
1569            return 42
1570
1571        res = loop.run_until_complete(self.new_task(loop, foo()))
1572        self.assertEqual(res, 42)
1573        self.assertAlmostEqual(0.15, loop.time())
1574
1575    def test_as_completed(self):
1576
1577        def gen():
1578            yield 0
1579            yield 0
1580            yield 0.01
1581            yield 0
1582
1583        loop = self.new_test_loop(gen)
1584        # disable "slow callback" warning
1585        loop.slow_callback_duration = 1.0
1586        completed = set()
1587        time_shifted = False
1588
1589        with self.assertWarns(DeprecationWarning):
1590            @asyncio.coroutine
1591            def sleeper(dt, x):
1592                nonlocal time_shifted
1593                yield from  asyncio.sleep(dt)
1594                completed.add(x)
1595                if not time_shifted and 'a' in completed and 'b' in completed:
1596                    time_shifted = True
1597                    loop.advance_time(0.14)
1598                return x
1599
1600        a = sleeper(0.01, 'a')
1601        b = sleeper(0.01, 'b')
1602        c = sleeper(0.15, 'c')
1603
1604        async def foo():
1605            values = []
1606            for f in asyncio.as_completed([b, c, a], loop=loop):
1607                values.append(await f)
1608            return values
1609        with self.assertWarns(DeprecationWarning):
1610            res = loop.run_until_complete(self.new_task(loop, foo()))
1611        self.assertAlmostEqual(0.15, loop.time())
1612        self.assertTrue('a' in res[:2])
1613        self.assertTrue('b' in res[:2])
1614        self.assertEqual(res[2], 'c')
1615
1616        # Doing it again should take no time and exercise a different path.
1617        with self.assertWarns(DeprecationWarning):
1618            res = loop.run_until_complete(self.new_task(loop, foo()))
1619        self.assertAlmostEqual(0.15, loop.time())
1620
1621    def test_as_completed_with_timeout(self):
1622
1623        def gen():
1624            yield
1625            yield 0
1626            yield 0
1627            yield 0.1
1628
1629        loop = self.new_test_loop(gen)
1630
1631        a = loop.create_task(asyncio.sleep(0.1, 'a'))
1632        b = loop.create_task(asyncio.sleep(0.15, 'b'))
1633
1634        async def foo():
1635            values = []
1636            for f in asyncio.as_completed([a, b], timeout=0.12, loop=loop):
1637                if values:
1638                    loop.advance_time(0.02)
1639                try:
1640                    v = await f
1641                    values.append((1, v))
1642                except asyncio.TimeoutError as exc:
1643                    values.append((2, exc))
1644            return values
1645
1646        with self.assertWarns(DeprecationWarning):
1647            res = loop.run_until_complete(self.new_task(loop, foo()))
1648        self.assertEqual(len(res), 2, res)
1649        self.assertEqual(res[0], (1, 'a'))
1650        self.assertEqual(res[1][0], 2)
1651        self.assertIsInstance(res[1][1], asyncio.TimeoutError)
1652        self.assertAlmostEqual(0.12, loop.time())
1653
1654        # move forward to close generator
1655        loop.advance_time(10)
1656        loop.run_until_complete(asyncio.wait([a, b]))
1657
1658    def test_as_completed_with_unused_timeout(self):
1659
1660        def gen():
1661            yield
1662            yield 0
1663            yield 0.01
1664
1665        loop = self.new_test_loop(gen)
1666
1667        a = asyncio.sleep(0.01, 'a')
1668
1669        async def foo():
1670            for f in asyncio.as_completed([a], timeout=1, loop=loop):
1671                v = await f
1672                self.assertEqual(v, 'a')
1673
1674        with self.assertWarns(DeprecationWarning):
1675            loop.run_until_complete(self.new_task(loop, foo()))
1676
1677    def test_as_completed_reverse_wait(self):
1678
1679        def gen():
1680            yield 0
1681            yield 0.05
1682            yield 0
1683
1684        loop = self.new_test_loop(gen)
1685
1686        a = asyncio.sleep(0.05, 'a')
1687        b = asyncio.sleep(0.10, 'b')
1688        fs = {a, b}
1689
1690        with self.assertWarns(DeprecationWarning):
1691            futs = list(asyncio.as_completed(fs, loop=loop))
1692        self.assertEqual(len(futs), 2)
1693
1694        x = loop.run_until_complete(futs[1])
1695        self.assertEqual(x, 'a')
1696        self.assertAlmostEqual(0.05, loop.time())
1697        loop.advance_time(0.05)
1698        y = loop.run_until_complete(futs[0])
1699        self.assertEqual(y, 'b')
1700        self.assertAlmostEqual(0.10, loop.time())
1701
1702    def test_as_completed_concurrent(self):
1703
1704        def gen():
1705            when = yield
1706            self.assertAlmostEqual(0.05, when)
1707            when = yield 0
1708            self.assertAlmostEqual(0.05, when)
1709            yield 0.05
1710
1711        loop = self.new_test_loop(gen)
1712
1713        a = asyncio.sleep(0.05, 'a')
1714        b = asyncio.sleep(0.05, 'b')
1715        fs = {a, b}
1716        with self.assertWarns(DeprecationWarning):
1717            futs = list(asyncio.as_completed(fs, loop=loop))
1718        self.assertEqual(len(futs), 2)
1719        waiter = asyncio.wait(futs)
1720        # Deprecation from passing coros in futs to asyncio.wait()
1721        with self.assertWarns(DeprecationWarning):
1722            done, pending = loop.run_until_complete(waiter)
1723        self.assertEqual(set(f.result() for f in done), {'a', 'b'})
1724
1725    def test_as_completed_duplicate_coroutines(self):
1726
1727        with self.assertWarns(DeprecationWarning):
1728            @asyncio.coroutine
1729            def coro(s):
1730                return s
1731
1732        with self.assertWarns(DeprecationWarning):
1733            @asyncio.coroutine
1734            def runner():
1735                result = []
1736                c = coro('ham')
1737                for f in asyncio.as_completed([c, c, coro('spam')],
1738                                              loop=self.loop):
1739                    result.append((yield from f))
1740                return result
1741
1742        with self.assertWarns(DeprecationWarning):
1743            fut = self.new_task(self.loop, runner())
1744            self.loop.run_until_complete(fut)
1745        result = fut.result()
1746        self.assertEqual(set(result), {'ham', 'spam'})
1747        self.assertEqual(len(result), 2)
1748
1749    def test_sleep(self):
1750
1751        def gen():
1752            when = yield
1753            self.assertAlmostEqual(0.05, when)
1754            when = yield 0.05
1755            self.assertAlmostEqual(0.1, when)
1756            yield 0.05
1757
1758        loop = self.new_test_loop(gen)
1759
1760        async def sleeper(dt, arg):
1761            await asyncio.sleep(dt/2)
1762            res = await asyncio.sleep(dt/2, arg)
1763            return res
1764
1765        t = self.new_task(loop, sleeper(0.1, 'yeah'))
1766        loop.run_until_complete(t)
1767        self.assertTrue(t.done())
1768        self.assertEqual(t.result(), 'yeah')
1769        self.assertAlmostEqual(0.1, loop.time())
1770
1771    def test_sleep_cancel(self):
1772
1773        def gen():
1774            when = yield
1775            self.assertAlmostEqual(10.0, when)
1776            yield 0
1777
1778        loop = self.new_test_loop(gen)
1779
1780        t = self.new_task(loop, asyncio.sleep(10.0, 'yeah'))
1781
1782        handle = None
1783        orig_call_later = loop.call_later
1784
1785        def call_later(delay, callback, *args):
1786            nonlocal handle
1787            handle = orig_call_later(delay, callback, *args)
1788            return handle
1789
1790        loop.call_later = call_later
1791        test_utils.run_briefly(loop)
1792
1793        self.assertFalse(handle._cancelled)
1794
1795        t.cancel()
1796        test_utils.run_briefly(loop)
1797        self.assertTrue(handle._cancelled)
1798
1799    def test_task_cancel_sleeping_task(self):
1800
1801        def gen():
1802            when = yield
1803            self.assertAlmostEqual(0.1, when)
1804            when = yield 0
1805            self.assertAlmostEqual(5000, when)
1806            yield 0.1
1807
1808        loop = self.new_test_loop(gen)
1809
1810        async def sleep(dt):
1811            await asyncio.sleep(dt)
1812
1813        async def doit():
1814            sleeper = self.new_task(loop, sleep(5000))
1815            loop.call_later(0.1, sleeper.cancel)
1816            try:
1817                await sleeper
1818            except asyncio.CancelledError:
1819                return 'cancelled'
1820            else:
1821                return 'slept in'
1822
1823        doer = doit()
1824        self.assertEqual(loop.run_until_complete(doer), 'cancelled')
1825        self.assertAlmostEqual(0.1, loop.time())
1826
1827    def test_task_cancel_waiter_future(self):
1828        fut = self.new_future(self.loop)
1829
1830        async def coro():
1831            await fut
1832
1833        task = self.new_task(self.loop, coro())
1834        test_utils.run_briefly(self.loop)
1835        self.assertIs(task._fut_waiter, fut)
1836
1837        task.cancel()
1838        test_utils.run_briefly(self.loop)
1839        self.assertRaises(
1840            asyncio.CancelledError, self.loop.run_until_complete, task)
1841        self.assertIsNone(task._fut_waiter)
1842        self.assertTrue(fut.cancelled())
1843
1844    def test_task_set_methods(self):
1845        async def notmuch():
1846            return 'ko'
1847
1848        gen = notmuch()
1849        task = self.new_task(self.loop, gen)
1850
1851        with self.assertRaisesRegex(RuntimeError, 'not support set_result'):
1852            task.set_result('ok')
1853
1854        with self.assertRaisesRegex(RuntimeError, 'not support set_exception'):
1855            task.set_exception(ValueError())
1856
1857        self.assertEqual(
1858            self.loop.run_until_complete(task),
1859            'ko')
1860
1861    def test_step_result(self):
1862        with self.assertWarns(DeprecationWarning):
1863            @asyncio.coroutine
1864            def notmuch():
1865                yield None
1866                yield 1
1867                return 'ko'
1868
1869        self.assertRaises(
1870            RuntimeError, self.loop.run_until_complete, notmuch())
1871
1872    def test_step_result_future(self):
1873        # If coroutine returns future, task waits on this future.
1874
1875        class Fut(asyncio.Future):
1876            def __init__(self, *args, **kwds):
1877                self.cb_added = False
1878                super().__init__(*args, **kwds)
1879
1880            def add_done_callback(self, *args, **kwargs):
1881                self.cb_added = True
1882                super().add_done_callback(*args, **kwargs)
1883
1884        fut = Fut(loop=self.loop)
1885        result = None
1886
1887        async def wait_for_future():
1888            nonlocal result
1889            result = await fut
1890
1891        t = self.new_task(self.loop, wait_for_future())
1892        test_utils.run_briefly(self.loop)
1893        self.assertTrue(fut.cb_added)
1894
1895        res = object()
1896        fut.set_result(res)
1897        test_utils.run_briefly(self.loop)
1898        self.assertIs(res, result)
1899        self.assertTrue(t.done())
1900        self.assertIsNone(t.result())
1901
1902    def test_baseexception_during_cancel(self):
1903
1904        def gen():
1905            when = yield
1906            self.assertAlmostEqual(10.0, when)
1907            yield 0
1908
1909        loop = self.new_test_loop(gen)
1910
1911        async def sleeper():
1912            await asyncio.sleep(10)
1913
1914        base_exc = SystemExit()
1915
1916        async def notmutch():
1917            try:
1918                await sleeper()
1919            except asyncio.CancelledError:
1920                raise base_exc
1921
1922        task = self.new_task(loop, notmutch())
1923        test_utils.run_briefly(loop)
1924
1925        task.cancel()
1926        self.assertFalse(task.done())
1927
1928        self.assertRaises(SystemExit, test_utils.run_briefly, loop)
1929
1930        self.assertTrue(task.done())
1931        self.assertFalse(task.cancelled())
1932        self.assertIs(task.exception(), base_exc)
1933
1934    def test_iscoroutinefunction(self):
1935        def fn():
1936            pass
1937
1938        self.assertFalse(asyncio.iscoroutinefunction(fn))
1939
1940        def fn1():
1941            yield
1942        self.assertFalse(asyncio.iscoroutinefunction(fn1))
1943
1944        with self.assertWarns(DeprecationWarning):
1945            @asyncio.coroutine
1946            def fn2():
1947                yield
1948        self.assertTrue(asyncio.iscoroutinefunction(fn2))
1949
1950        self.assertFalse(asyncio.iscoroutinefunction(mock.Mock()))
1951
1952    def test_yield_vs_yield_from(self):
1953        fut = self.new_future(self.loop)
1954
1955        with self.assertWarns(DeprecationWarning):
1956            @asyncio.coroutine
1957            def wait_for_future():
1958                yield fut
1959
1960        task = wait_for_future()
1961        with self.assertRaises(RuntimeError):
1962            self.loop.run_until_complete(task)
1963
1964        self.assertFalse(fut.done())
1965
1966    def test_yield_vs_yield_from_generator(self):
1967        with self.assertWarns(DeprecationWarning):
1968            @asyncio.coroutine
1969            def coro():
1970                yield
1971
1972        with self.assertWarns(DeprecationWarning):
1973            @asyncio.coroutine
1974            def wait_for_future():
1975                gen = coro()
1976                try:
1977                    yield gen
1978                finally:
1979                    gen.close()
1980
1981        task = wait_for_future()
1982        self.assertRaises(
1983            RuntimeError,
1984            self.loop.run_until_complete, task)
1985
1986    def test_coroutine_non_gen_function(self):
1987        with self.assertWarns(DeprecationWarning):
1988            @asyncio.coroutine
1989            def func():
1990                return 'test'
1991
1992        self.assertTrue(asyncio.iscoroutinefunction(func))
1993
1994        coro = func()
1995        self.assertTrue(asyncio.iscoroutine(coro))
1996
1997        res = self.loop.run_until_complete(coro)
1998        self.assertEqual(res, 'test')
1999
2000    def test_coroutine_non_gen_function_return_future(self):
2001        fut = self.new_future(self.loop)
2002
2003        with self.assertWarns(DeprecationWarning):
2004            @asyncio.coroutine
2005            def func():
2006                return fut
2007
2008        async def coro():
2009            fut.set_result('test')
2010
2011        t1 = self.new_task(self.loop, func())
2012        t2 = self.new_task(self.loop, coro())
2013        res = self.loop.run_until_complete(t1)
2014        self.assertEqual(res, 'test')
2015        self.assertIsNone(t2.result())
2016
2017    def test_current_task(self):
2018        self.assertIsNone(asyncio.current_task(loop=self.loop))
2019
2020        async def coro(loop):
2021            self.assertIs(asyncio.current_task(loop=loop), task)
2022
2023            self.assertIs(asyncio.current_task(None), task)
2024            self.assertIs(asyncio.current_task(), task)
2025
2026        task = self.new_task(self.loop, coro(self.loop))
2027        self.loop.run_until_complete(task)
2028        self.assertIsNone(asyncio.current_task(loop=self.loop))
2029
2030    def test_current_task_with_interleaving_tasks(self):
2031        self.assertIsNone(asyncio.current_task(loop=self.loop))
2032
2033        fut1 = self.new_future(self.loop)
2034        fut2 = self.new_future(self.loop)
2035
2036        async def coro1(loop):
2037            self.assertTrue(asyncio.current_task(loop=loop) is task1)
2038            await fut1
2039            self.assertTrue(asyncio.current_task(loop=loop) is task1)
2040            fut2.set_result(True)
2041
2042        async def coro2(loop):
2043            self.assertTrue(asyncio.current_task(loop=loop) is task2)
2044            fut1.set_result(True)
2045            await fut2
2046            self.assertTrue(asyncio.current_task(loop=loop) is task2)
2047
2048        task1 = self.new_task(self.loop, coro1(self.loop))
2049        task2 = self.new_task(self.loop, coro2(self.loop))
2050
2051        self.loop.run_until_complete(asyncio.wait((task1, task2)))
2052        self.assertIsNone(asyncio.current_task(loop=self.loop))
2053
2054    # Some thorough tests for cancellation propagation through
2055    # coroutines, tasks and wait().
2056
2057    def test_yield_future_passes_cancel(self):
2058        # Cancelling outer() cancels inner() cancels waiter.
2059        proof = 0
2060        waiter = self.new_future(self.loop)
2061
2062        async def inner():
2063            nonlocal proof
2064            try:
2065                await waiter
2066            except asyncio.CancelledError:
2067                proof += 1
2068                raise
2069            else:
2070                self.fail('got past sleep() in inner()')
2071
2072        async def outer():
2073            nonlocal proof
2074            try:
2075                await inner()
2076            except asyncio.CancelledError:
2077                proof += 100  # Expect this path.
2078            else:
2079                proof += 10
2080
2081        f = asyncio.ensure_future(outer(), loop=self.loop)
2082        test_utils.run_briefly(self.loop)
2083        f.cancel()
2084        self.loop.run_until_complete(f)
2085        self.assertEqual(proof, 101)
2086        self.assertTrue(waiter.cancelled())
2087
2088    def test_yield_wait_does_not_shield_cancel(self):
2089        # Cancelling outer() makes wait() return early, leaves inner()
2090        # running.
2091        proof = 0
2092        waiter = self.new_future(self.loop)
2093
2094        async def inner():
2095            nonlocal proof
2096            await waiter
2097            proof += 1
2098
2099        async def outer():
2100            nonlocal proof
2101            with self.assertWarns(DeprecationWarning):
2102                d, p = await asyncio.wait([inner()])
2103            proof += 100
2104
2105        f = asyncio.ensure_future(outer(), loop=self.loop)
2106        test_utils.run_briefly(self.loop)
2107        f.cancel()
2108        self.assertRaises(
2109            asyncio.CancelledError, self.loop.run_until_complete, f)
2110        waiter.set_result(None)
2111        test_utils.run_briefly(self.loop)
2112        self.assertEqual(proof, 1)
2113
2114    def test_shield_result(self):
2115        inner = self.new_future(self.loop)
2116        outer = asyncio.shield(inner)
2117        inner.set_result(42)
2118        res = self.loop.run_until_complete(outer)
2119        self.assertEqual(res, 42)
2120
2121    def test_shield_exception(self):
2122        inner = self.new_future(self.loop)
2123        outer = asyncio.shield(inner)
2124        test_utils.run_briefly(self.loop)
2125        exc = RuntimeError('expected')
2126        inner.set_exception(exc)
2127        test_utils.run_briefly(self.loop)
2128        self.assertIs(outer.exception(), exc)
2129
2130    def test_shield_cancel_inner(self):
2131        inner = self.new_future(self.loop)
2132        outer = asyncio.shield(inner)
2133        test_utils.run_briefly(self.loop)
2134        inner.cancel()
2135        test_utils.run_briefly(self.loop)
2136        self.assertTrue(outer.cancelled())
2137
2138    def test_shield_cancel_outer(self):
2139        inner = self.new_future(self.loop)
2140        outer = asyncio.shield(inner)
2141        test_utils.run_briefly(self.loop)
2142        outer.cancel()
2143        test_utils.run_briefly(self.loop)
2144        self.assertTrue(outer.cancelled())
2145        self.assertEqual(0, 0 if outer._callbacks is None else len(outer._callbacks))
2146
2147    def test_shield_shortcut(self):
2148        fut = self.new_future(self.loop)
2149        fut.set_result(42)
2150        res = self.loop.run_until_complete(asyncio.shield(fut))
2151        self.assertEqual(res, 42)
2152
2153    def test_shield_effect(self):
2154        # Cancelling outer() does not affect inner().
2155        proof = 0
2156        waiter = self.new_future(self.loop)
2157
2158        async def inner():
2159            nonlocal proof
2160            await waiter
2161            proof += 1
2162
2163        async def outer():
2164            nonlocal proof
2165            await asyncio.shield(inner())
2166            proof += 100
2167
2168        f = asyncio.ensure_future(outer(), loop=self.loop)
2169        test_utils.run_briefly(self.loop)
2170        f.cancel()
2171        with self.assertRaises(asyncio.CancelledError):
2172            self.loop.run_until_complete(f)
2173        waiter.set_result(None)
2174        test_utils.run_briefly(self.loop)
2175        self.assertEqual(proof, 1)
2176
2177    def test_shield_gather(self):
2178        child1 = self.new_future(self.loop)
2179        child2 = self.new_future(self.loop)
2180        parent = asyncio.gather(child1, child2)
2181        outer = asyncio.shield(parent)
2182        test_utils.run_briefly(self.loop)
2183        outer.cancel()
2184        test_utils.run_briefly(self.loop)
2185        self.assertTrue(outer.cancelled())
2186        child1.set_result(1)
2187        child2.set_result(2)
2188        test_utils.run_briefly(self.loop)
2189        self.assertEqual(parent.result(), [1, 2])
2190
2191    def test_gather_shield(self):
2192        child1 = self.new_future(self.loop)
2193        child2 = self.new_future(self.loop)
2194        inner1 = asyncio.shield(child1)
2195        inner2 = asyncio.shield(child2)
2196        parent = asyncio.gather(inner1, inner2)
2197        test_utils.run_briefly(self.loop)
2198        parent.cancel()
2199        # This should cancel inner1 and inner2 but bot child1 and child2.
2200        test_utils.run_briefly(self.loop)
2201        self.assertIsInstance(parent.exception(), asyncio.CancelledError)
2202        self.assertTrue(inner1.cancelled())
2203        self.assertTrue(inner2.cancelled())
2204        child1.set_result(1)
2205        child2.set_result(2)
2206        test_utils.run_briefly(self.loop)
2207
2208    def test_as_completed_invalid_args(self):
2209        fut = self.new_future(self.loop)
2210
2211        # as_completed() expects a list of futures, not a future instance
2212        self.assertRaises(TypeError, self.loop.run_until_complete,
2213            asyncio.as_completed(fut, loop=self.loop))
2214        coro = coroutine_function()
2215        self.assertRaises(TypeError, self.loop.run_until_complete,
2216            asyncio.as_completed(coro, loop=self.loop))
2217        coro.close()
2218
2219    def test_wait_invalid_args(self):
2220        fut = self.new_future(self.loop)
2221
2222        # wait() expects a list of futures, not a future instance
2223        self.assertRaises(TypeError, self.loop.run_until_complete,
2224            asyncio.wait(fut))
2225        coro = coroutine_function()
2226        self.assertRaises(TypeError, self.loop.run_until_complete,
2227            asyncio.wait(coro))
2228        coro.close()
2229
2230        # wait() expects at least a future
2231        self.assertRaises(ValueError, self.loop.run_until_complete,
2232            asyncio.wait([]))
2233
    def test_corowrapper_mocks_generator(self):
        # Check that the object returned by an @asyncio.coroutine function
        # exposes the generator attributes gi_frame, gi_running and
        # gi_code, with the coroutine debug flag both cleared and set.

        def check():
            # A function that asserts various things.
            # Called twice, with different debug flag values.

            with self.assertWarns(DeprecationWarning):
                @asyncio.coroutine
                def coro():
                    # The actual coroutine.
                    # `gen` and `fut` are closure variables that check()
                    # binds further down, before this body is run.
                    self.assertTrue(gen.gi_running)
                    yield from fut

            # A completed Future used to run the coroutine.
            fut = self.new_future(self.loop)
            fut.set_result(None)

            # Call the coroutine.
            gen = coro()

            # Check some properties.
            self.assertTrue(asyncio.iscoroutine(gen))
            self.assertIsInstance(gen.gi_frame, types.FrameType)
            self.assertFalse(gen.gi_running)
            self.assertIsInstance(gen.gi_code, types.CodeType)

            # Run it.
            self.loop.run_until_complete(gen)

            # The frame should have changed.
            self.assertIsNone(gen.gi_frame)

        # Test with debug flag cleared.
        with set_coroutine_debug(False):
            check()

        # Test with debug flag set.
        with set_coroutine_debug(True):
            check()
2273
2274    def test_yield_from_corowrapper(self):
2275        with set_coroutine_debug(True):
2276            with self.assertWarns(DeprecationWarning):
2277                @asyncio.coroutine
2278                def t1():
2279                    return (yield from t2())
2280
2281            with self.assertWarns(DeprecationWarning):
2282                @asyncio.coroutine
2283                def t2():
2284                    f = self.new_future(self.loop)
2285                    self.new_task(self.loop, t3(f))
2286                    return (yield from f)
2287
2288            with self.assertWarns(DeprecationWarning):
2289                @asyncio.coroutine
2290                def t3(f):
2291                    f.set_result((1, 2, 3))
2292
2293            task = self.new_task(self.loop, t1())
2294            val = self.loop.run_until_complete(task)
2295            self.assertEqual(val, (1, 2, 3))
2296
2297    def test_yield_from_corowrapper_send(self):
2298        def foo():
2299            a = yield
2300            return a
2301
2302        def call(arg):
2303            cw = asyncio.coroutines.CoroWrapper(foo())
2304            cw.send(None)
2305            try:
2306                cw.send(arg)
2307            except StopIteration as ex:
2308                return ex.args[0]
2309            else:
2310                raise AssertionError('StopIteration was expected')
2311
2312        self.assertEqual(call((1, 2)), (1, 2))
2313        self.assertEqual(call('spam'), 'spam')
2314
2315    def test_corowrapper_weakref(self):
2316        wd = weakref.WeakValueDictionary()
2317        def foo(): yield from []
2318        cw = asyncio.coroutines.CoroWrapper(foo())
2319        wd['cw'] = cw  # Would fail without __weakref__ slot.
2320        cw.gen = None  # Suppress warning from __del__.
2321
2322    def test_corowrapper_throw(self):
2323        # Issue 429: CoroWrapper.throw must be compatible with gen.throw
2324        def foo():
2325            value = None
2326            while True:
2327                try:
2328                    value = yield value
2329                except Exception as e:
2330                    value = e
2331
2332        exception = Exception("foo")
2333        cw = asyncio.coroutines.CoroWrapper(foo())
2334        cw.send(None)
2335        self.assertIs(exception, cw.throw(exception))
2336
2337        cw = asyncio.coroutines.CoroWrapper(foo())
2338        cw.send(None)
2339        self.assertIs(exception, cw.throw(Exception, exception))
2340
2341        cw = asyncio.coroutines.CoroWrapper(foo())
2342        cw.send(None)
2343        exception = cw.throw(Exception, "foo")
2344        self.assertIsInstance(exception, Exception)
2345        self.assertEqual(exception.args, ("foo", ))
2346
2347        cw = asyncio.coroutines.CoroWrapper(foo())
2348        cw.send(None)
2349        exception = cw.throw(Exception, "foo", None)
2350        self.assertIsInstance(exception, Exception)
2351        self.assertEqual(exception.args, ("foo", ))
2352
    def test_log_destroyed_pending_task(self):
        # Check that the loop's exception handler is called with the
        # 'Task was destroyed but it is pending!' message when a pending
        # task is garbage collected.
        # NOTE(review): `Task` is bound here but not used in this method.
        Task = self.__class__.Task

        with self.assertWarns(DeprecationWarning):
            @asyncio.coroutine
            def kill_me(loop):
                future = self.new_future(loop)
                yield from future
                # at this point, the only reference to kill_me() task is
                # the Task._wakeup() method in future._callbacks
                raise Exception("code never reached")

        # Route loop errors to a mock so the call can be inspected.
        mock_handler = mock.Mock()
        self.loop.set_debug(True)
        self.loop.set_exception_handler(mock_handler)

        # schedule the task
        coro = kill_me(self.loop)
        task = asyncio.ensure_future(coro, loop=self.loop)

        self.assertEqual(asyncio.all_tasks(loop=self.loop), {task})

        asyncio.set_event_loop(None)

        # execute the task so it waits for future
        self.loop._run_once()
        self.assertEqual(len(self.loop._ready), 0)

        # remove the future used in kill_me(), and references to the task
        del coro.gi_frame.f_locals['future']
        coro = None
        # Keep the traceback for the final assertion before dropping the
        # last local reference to the task.
        source_traceback = task._source_traceback
        task = None

        # no more reference to kill_me() task: the task is destroyed by the GC
        support.gc_collect()

        self.assertEqual(asyncio.all_tasks(loop=self.loop), set())

        # The task object itself is gone, hence mock.ANY for 'task'.
        mock_handler.assert_called_with(self.loop, {
            'message': 'Task was destroyed but it is pending!',
            'task': mock.ANY,
            'source_traceback': source_traceback,
        })
        mock_handler.reset_mock()
2398
2399    @mock.patch('asyncio.base_events.logger')
2400    def test_tb_logger_not_called_after_cancel(self, m_log):
2401        loop = asyncio.new_event_loop()
2402        self.set_event_loop(loop)
2403
2404        async def coro():
2405            raise TypeError
2406
2407        async def runner():
2408            task = self.new_task(loop, coro())
2409            await asyncio.sleep(0.05)
2410            task.cancel()
2411            task = None
2412
2413        loop.run_until_complete(runner())
2414        self.assertFalse(m_log.error.called)
2415
    @mock.patch('asyncio.coroutines.logger')
    def test_coroutine_never_yielded(self, m_log):
        # In coroutine debug mode, a coroutine object that is created and
        # then discarded must be reported through the (mocked) coroutines
        # logger, including where the object was created.
        with set_coroutine_debug(True):
            with self.assertWarns(DeprecationWarning):
                @asyncio.coroutine
                def coro_noop():
                    pass

        tb_filename = __file__
        # "+ 2" points at the coro_noop() call two lines below; do not
        # insert or remove lines between here and that call.
        tb_lineno = sys._getframe().f_lineno + 2
        # create a coroutine object but don't use it
        coro_noop()
        support.gc_collect()

        self.assertTrue(m_log.error.called)
        message = m_log.error.call_args[0][0]
        func_filename, func_lineno = test_utils.get_function_source(coro_noop)

        # The last two regex lines match this method's name and the
        # literal source text of the call above.
        regex = (r'^<CoroWrapper %s\(?\)? .* at %s:%s, .*> '
                    r'was never yielded from\n'
                 r'Coroutine object created at \(most recent call last, truncated to \d+ last lines\):\n'
                 r'.*\n'
                 r'  File "%s", line %s, in test_coroutine_never_yielded\n'
                 r'    coro_noop\(\)$'
                 % (re.escape(coro_noop.__qualname__),
                    re.escape(func_filename), func_lineno,
                    re.escape(tb_filename), tb_lineno))

        self.assertRegex(message, re.compile(regex, re.DOTALL))
2445
2446    def test_return_coroutine_from_coroutine(self):
2447        """Return of @asyncio.coroutine()-wrapped function generator object
2448        from @asyncio.coroutine()-wrapped function should have same effect as
2449        returning generator object or Future."""
2450        def check():
2451            with self.assertWarns(DeprecationWarning):
2452                @asyncio.coroutine
2453                def outer_coro():
2454                    with self.assertWarns(DeprecationWarning):
2455                        @asyncio.coroutine
2456                        def inner_coro():
2457                            return 1
2458
2459                    return inner_coro()
2460
2461            result = self.loop.run_until_complete(outer_coro())
2462            self.assertEqual(result, 1)
2463
2464        # Test with debug flag cleared.
2465        with set_coroutine_debug(False):
2466            check()
2467
2468        # Test with debug flag set.
2469        with set_coroutine_debug(True):
2470            check()
2471
    def test_task_source_traceback(self):
        # With loop debug enabled, a task's _source_traceback must be a
        # list whose second-to-last entry points at the line in this
        # method that created the task.
        self.loop.set_debug(True)

        task = self.new_task(self.loop, coroutine_function())
        # "- 1" refers to the new_task() call on the previous line; keep
        # these two statements adjacent.
        lineno = sys._getframe().f_lineno - 1
        self.assertIsInstance(task._source_traceback, list)
        # Compare only (filename, lineno, function name) of the entry.
        self.assertEqual(task._source_traceback[-2][:3],
                         (__file__,
                          lineno,
                          'test_task_source_traceback'))
        self.loop.run_until_complete(task)
2483
2484    def _test_cancel_wait_for(self, timeout):
2485        loop = asyncio.new_event_loop()
2486        self.addCleanup(loop.close)
2487
2488        async def blocking_coroutine():
2489            fut = self.new_future(loop)
2490            # Block: fut result is never set
2491            await fut
2492
2493        task = loop.create_task(blocking_coroutine())
2494
2495        wait = loop.create_task(asyncio.wait_for(task, timeout))
2496        loop.call_soon(wait.cancel)
2497
2498        self.assertRaises(asyncio.CancelledError,
2499                          loop.run_until_complete, wait)
2500
2501        # Python issue #23219: cancelling the wait must also cancel the task
2502        self.assertTrue(task.cancelled())
2503
2504    def test_cancel_blocking_wait_for(self):
2505        self._test_cancel_wait_for(None)
2506
2507    def test_cancel_wait_for(self):
2508        self._test_cancel_wait_for(60.0)
2509
2510    def test_cancel_gather_1(self):
2511        """Ensure that a gathering future refuses to be cancelled once all
2512        children are done"""
2513        loop = asyncio.new_event_loop()
2514        self.addCleanup(loop.close)
2515
2516        fut = self.new_future(loop)
2517        # The indirection fut->child_coro is needed since otherwise the
2518        # gathering task is done at the same time as the child future
2519        def child_coro():
2520            return (yield from fut)
2521        gather_future = asyncio.gather(child_coro(), loop=loop)
2522        gather_task = asyncio.ensure_future(gather_future, loop=loop)
2523
2524        cancel_result = None
2525        def cancelling_callback(_):
2526            nonlocal cancel_result
2527            cancel_result = gather_task.cancel()
2528        fut.add_done_callback(cancelling_callback)
2529
2530        fut.set_result(42) # calls the cancelling_callback after fut is done()
2531
2532        # At this point the task should complete.
2533        loop.run_until_complete(gather_task)
2534
2535        # Python issue #26923: asyncio.gather drops cancellation
2536        self.assertEqual(cancel_result, False)
2537        self.assertFalse(gather_task.cancelled())
2538        self.assertEqual(gather_task.result(), [42])
2539
    def test_cancel_gather_2(self):
        # Cancel a task that is awaiting gather() and check that the
        # CancelledError reaching the caller has empty args, while the
        # innermost exception in its __context__ chain carries the
        # arguments that were passed to cancel().
        # Each case is (arguments for cancel(), expected innermost args).
        cases = [
            ((), ()),
            ((None,), ()),
            (('my message',), ('my message',)),
            # Non-string values should roundtrip.
            ((5,), (5,)),
        ]
        for cancel_args, expected_args in cases:
            with self.subTest(cancel_args=cancel_args):
                # A fresh loop per case, closed at test cleanup.
                loop = asyncio.new_event_loop()
                self.addCleanup(loop.close)

                async def test():
                    # Keep awaiting short gather() calls until roughly
                    # one second of sleeps has accumulated.
                    time = 0
                    while True:
                        time += 0.05
                        await asyncio.gather(asyncio.sleep(0.05),
                                             return_exceptions=True,
                                             loop=loop)
                        if time > 1:
                            return

                async def main():
                    qwe = self.new_task(loop, test())
                    await asyncio.sleep(0.2)
                    qwe.cancel(*cancel_args)
                    await qwe

                try:
                    loop.run_until_complete(main())
                except asyncio.CancelledError as exc:
                    self.assertEqual(exc.args, ())
                    # Walk __context__ down to the innermost exception.
                    exc_type, exc_args, depth = get_innermost_context(exc)
                    self.assertEqual((exc_type, exc_args),
                        (asyncio.CancelledError, expected_args))
                    # The exact traceback seems to vary in CI.
                    self.assertIn(depth, (2, 3))
                else:
                    self.fail('gather did not propagate the cancellation '
                              'request')
2581
2582    def test_exception_traceback(self):
2583        # See http://bugs.python.org/issue28843
2584
2585        async def foo():
2586            1 / 0
2587
2588        async def main():
2589            task = self.new_task(self.loop, foo())
2590            await asyncio.sleep(0)  # skip one loop iteration
2591            self.assertIsNotNone(task.exception().__traceback__)
2592
2593        self.loop.run_until_complete(main())
2594
    @mock.patch('asyncio.base_events.logger')
    def test_error_in_call_soon(self, m_log):
        # If loop.call_soon() raises while a task is being created, the
        # error must propagate, the (mocked) logger must receive a
        # 'Task was destroyed but it is pending' error, and the task must
        # not remain registered in all_tasks().
        def call_soon(callback, *args, **kwargs):
            raise ValueError
        # Replace the loop's call_soon with one that always fails.
        self.loop.call_soon = call_soon

        with self.assertWarns(DeprecationWarning):
            @asyncio.coroutine
            def coro():
                pass

        self.assertFalse(m_log.error.called)

        with self.assertRaises(ValueError):
            gen = coro()
            try:
                self.new_task(self.loop, gen)
            finally:
                # Close the coroutine object, which was never run.
                gen.close()

        self.assertTrue(m_log.error.called)
        message = m_log.error.call_args[0][0]
        self.assertIn('Task was destroyed but it is pending', message)

        self.assertEqual(asyncio.all_tasks(self.loop), set())
2620
2621    def test_create_task_with_noncoroutine(self):
2622        with self.assertRaisesRegex(TypeError,
2623                                    "a coroutine was expected, got 123"):
2624            self.new_task(self.loop, 123)
2625
2626        # test it for the second time to ensure that caching
2627        # in asyncio.iscoroutine() doesn't break things.
2628        with self.assertRaisesRegex(TypeError,
2629                                    "a coroutine was expected, got 123"):
2630            self.new_task(self.loop, 123)
2631
2632    def test_create_task_with_oldstyle_coroutine(self):
2633
2634        with self.assertWarns(DeprecationWarning):
2635            @asyncio.coroutine
2636            def coro():
2637                pass
2638
2639        task = self.new_task(self.loop, coro())
2640        self.assertIsInstance(task, self.Task)
2641        self.loop.run_until_complete(task)
2642
2643        # test it for the second time to ensure that caching
2644        # in asyncio.iscoroutine() doesn't break things.
2645        task = self.new_task(self.loop, coro())
2646        self.assertIsInstance(task, self.Task)
2647        self.loop.run_until_complete(task)
2648
2649    def test_create_task_with_async_function(self):
2650
2651        async def coro():
2652            pass
2653
2654        task = self.new_task(self.loop, coro())
2655        self.assertIsInstance(task, self.Task)
2656        self.loop.run_until_complete(task)
2657
2658        # test it for the second time to ensure that caching
2659        # in asyncio.iscoroutine() doesn't break things.
2660        task = self.new_task(self.loop, coro())
2661        self.assertIsInstance(task, self.Task)
2662        self.loop.run_until_complete(task)
2663
2664    def test_create_task_with_asynclike_function(self):
2665        task = self.new_task(self.loop, CoroLikeObject())
2666        self.assertIsInstance(task, self.Task)
2667        self.assertEqual(self.loop.run_until_complete(task), 42)
2668
2669        # test it for the second time to ensure that caching
2670        # in asyncio.iscoroutine() doesn't break things.
2671        task = self.new_task(self.loop, CoroLikeObject())
2672        self.assertIsInstance(task, self.Task)
2673        self.assertEqual(self.loop.run_until_complete(task), 42)
2674
2675    def test_bare_create_task(self):
2676
2677        async def inner():
2678            return 1
2679
2680        async def coro():
2681            task = asyncio.create_task(inner())
2682            self.assertIsInstance(task, self.Task)
2683            ret = await task
2684            self.assertEqual(1, ret)
2685
2686        self.loop.run_until_complete(coro())
2687
2688    def test_bare_create_named_task(self):
2689
2690        async def coro_noop():
2691            pass
2692
2693        async def coro():
2694            task = asyncio.create_task(coro_noop(), name='No-op')
2695            self.assertEqual(task.get_name(), 'No-op')
2696            await task
2697
2698        self.loop.run_until_complete(coro())
2699
2700    def test_context_1(self):
2701        cvar = contextvars.ContextVar('cvar', default='nope')
2702
2703        async def sub():
2704            await asyncio.sleep(0.01)
2705            self.assertEqual(cvar.get(), 'nope')
2706            cvar.set('something else')
2707
2708        async def main():
2709            self.assertEqual(cvar.get(), 'nope')
2710            subtask = self.new_task(loop, sub())
2711            cvar.set('yes')
2712            self.assertEqual(cvar.get(), 'yes')
2713            await subtask
2714            self.assertEqual(cvar.get(), 'yes')
2715
2716        loop = asyncio.new_event_loop()
2717        try:
2718            task = self.new_task(loop, main())
2719            loop.run_until_complete(task)
2720        finally:
2721            loop.close()
2722
2723    def test_context_2(self):
2724        cvar = contextvars.ContextVar('cvar', default='nope')
2725
2726        async def main():
2727            def fut_on_done(fut):
2728                # This change must not pollute the context
2729                # of the "main()" task.
2730                cvar.set('something else')
2731
2732            self.assertEqual(cvar.get(), 'nope')
2733
2734            for j in range(2):
2735                fut = self.new_future(loop)
2736                fut.add_done_callback(fut_on_done)
2737                cvar.set(f'yes{j}')
2738                loop.call_soon(fut.set_result, None)
2739                await fut
2740                self.assertEqual(cvar.get(), f'yes{j}')
2741
2742                for i in range(3):
2743                    # Test that task passed its context to add_done_callback:
2744                    cvar.set(f'yes{i}-{j}')
2745                    await asyncio.sleep(0.001)
2746                    self.assertEqual(cvar.get(), f'yes{i}-{j}')
2747
2748        loop = asyncio.new_event_loop()
2749        try:
2750            task = self.new_task(loop, main())
2751            loop.run_until_complete(task)
2752        finally:
2753            loop.close()
2754
2755        self.assertEqual(cvar.get(), 'nope')
2756
2757    def test_context_3(self):
2758        # Run 100 Tasks in parallel, each modifying cvar.
2759
2760        cvar = contextvars.ContextVar('cvar', default=-1)
2761
2762        async def sub(num):
2763            for i in range(10):
2764                cvar.set(num + i)
2765                await asyncio.sleep(random.uniform(0.001, 0.05))
2766                self.assertEqual(cvar.get(), num + i)
2767
2768        async def main():
2769            tasks = []
2770            for i in range(100):
2771                task = loop.create_task(sub(random.randint(0, 10)))
2772                tasks.append(task)
2773
2774            await asyncio.gather(*tasks, loop=loop)
2775
2776        loop = asyncio.new_event_loop()
2777        try:
2778            loop.run_until_complete(main())
2779        finally:
2780            loop.close()
2781
2782        self.assertEqual(cvar.get(), -1)
2783
2784    def test_get_coro(self):
2785        loop = asyncio.new_event_loop()
2786        coro = coroutine_function()
2787        try:
2788            task = self.new_task(loop, coro)
2789            loop.run_until_complete(task)
2790            self.assertIs(task.get_coro(), coro)
2791        finally:
2792            loop.close()
2793
2794
def add_subclass_tests(cls):
    """Class decorator that makes *cls* test Task/Future subclasses.

    ``cls.Task`` and ``cls.Future`` are replaced with subclasses that count
    ``add_done_callback()`` calls in a ``calls`` mapping, an extra
    ``test_subclasses_ctask_cfuture`` test is attached, and
    ``test_task_source_traceback`` is disabled.  When either ``cls.Task`` or
    ``cls.Future`` is ``None``, *cls* is returned unchanged.
    """
    base_task, base_future = cls.Task, cls.Future
    if base_task is None or base_future is None:
        return cls

    class CommonFuture:
        # Listed first in the bases below, so these methods run before the
        # real Task/Future implementation.

        def __init__(self, *args, **kwargs):
            # The counter must exist before the real constructor runs.
            self.calls = collections.defaultdict(int)
            super().__init__(*args, **kwargs)

        def add_done_callback(self, *args, **kwargs):
            self.calls['add_done_callback'] += 1
            return super().add_done_callback(*args, **kwargs)

    class Task(CommonFuture, base_task):
        pass

    class Future(CommonFuture, base_future):
        pass

    def test_subclasses_ctask_cfuture(self):
        fut = self.Future(loop=self.loop)

        async def func():
            self.loop.call_soon(fut.set_result, 'spam')
            return await fut

        task = self.Task(func(), loop=self.loop)
        self.assertEqual(self.loop.run_until_complete(task), 'spam')

        # Both the task and the future it awaited must have had exactly one
        # done callback added.
        expected_calls = {'add_done_callback': 1}
        self.assertEqual(dict(task.calls), expected_calls)
        self.assertEqual(dict(fut.calls), expected_calls)

    # Add patched Task & Future back to the test case
    cls.Task, cls.Future = Task, Future

    # Add an extra unit-test
    cls.test_subclasses_ctask_cfuture = test_subclasses_ctask_cfuture

    # Disable the "test_task_source_traceback" test
    # (the test is hardcoded for a particular call stack, which
    # is slightly different for Task subclasses)
    cls.test_task_source_traceback = None

    return cls
2851
2852
class SetMethodsTest:
    """Mixin: Future.set_result()/set_exception() applied to a pending Task.

    The host test case is expected to provide ``Future``, ``loop`` and
    ``new_task()``.
    """

    def _check_step_already_done(self, exc_handler):
        # The loop's exception handler must have been called exactly once,
        # with an InvalidStateError whose message matches
        # "step(): already done".
        exc_handler.assert_called_once()
        context = exc_handler.call_args[0][0]
        exc = context['exception']
        with self.assertRaisesRegex(asyncio.InvalidStateError,
                                    r'step\(\): already done'):
            raise exc

    def test_set_result_causes_invalid_state(self):
        future_cls = type(self).Future
        exc_handler = mock.Mock()
        self.loop.call_exception_handler = exc_handler

        async def foo():
            await asyncio.sleep(0.1)
            return 10

        coro = foo()
        task = self.new_task(self.loop, coro)
        # Force a result onto the task through the Future base class.
        future_cls.set_result(task, 'spam')

        forced = self.loop.run_until_complete(task)
        self.assertEqual(forced, 'spam')

        self._check_step_already_done(exc_handler)

        coro.close()

    def test_set_exception_causes_invalid_state(self):
        class MyExc(Exception):
            pass

        future_cls = type(self).Future
        exc_handler = mock.Mock()
        self.loop.call_exception_handler = exc_handler

        async def foo():
            await asyncio.sleep(0.1)
            return 10

        coro = foo()
        task = self.new_task(self.loop, coro)
        # Force an exception onto the task through the Future base class.
        future_cls.set_exception(task, MyExc())

        with self.assertRaises(MyExc):
            self.loop.run_until_complete(task)

        self._check_step_already_done(exc_handler)

        coro.close()
2904
2905
@unittest.skipUnless(hasattr(futures, '_CFuture') and
                     hasattr(tasks, '_CTask'),
                     'requires the C _asyncio module')
class CTask_CFuture_Tests(BaseTaskTests, SetMethodsTest,
                          test_utils.TestCase):
    """BaseTaskTests and SetMethodsTest run with the C Task and C Future."""

    # getattr() with a default keeps the class body importable when the C
    # accelerator is missing; the class is skipped in that case anyway.
    Task = getattr(tasks, '_CTask', None)
    Future = getattr(futures, '_CFuture', None)

    @support.refcount_test
    def test_refleaks_in_task___init__(self):
        # Calling __init__() again on an existing task must not leak
        # references.
        # NOTE(review): sys.gettotalrefcount presumably only exists on debug
        # builds and support.get_attribute() skips the test otherwise --
        # confirm against test.support.
        gettotalrefcount = support.get_attribute(sys, 'gettotalrefcount')
        async def coro():
            pass
        task = self.new_task(self.loop, coro())
        self.loop.run_until_complete(task)
        refs_before = gettotalrefcount()
        for i in range(100):
            task.__init__(coro(), loop=self.loop)
            self.loop.run_until_complete(task)
        # Allow a little noise in the total, but no steady growth.
        self.assertAlmostEqual(gettotalrefcount() - refs_before, 0, delta=10)

    def test_del__log_destroy_pending_segfault(self):
        # Deleting the _log_destroy_pending attribute must raise
        # AttributeError (the test name refers to a segfault).
        async def coro():
            pass
        task = self.new_task(self.loop, coro())
        self.loop.run_until_complete(task)
        with self.assertRaises(AttributeError):
            del task._log_destroy_pending
2935
2936
@unittest.skipUnless(
    hasattr(futures, '_CFuture') and hasattr(tasks, '_CTask'),
    'requires the C _asyncio module')
@add_subclass_tests
class CTask_CFuture_SubclassTests(BaseTaskTests, test_utils.TestCase):
    """BaseTaskTests with subclasses of the C Task and the C Future.

    add_subclass_tests swaps the two attributes below for subclasses.
    """

    Future = getattr(futures, '_CFuture', None)
    Task = getattr(tasks, '_CTask', None)
2945
2946
@unittest.skipUnless(
    hasattr(tasks, '_CTask'),
    'requires the C _asyncio module')
@add_subclass_tests
class CTaskSubclass_PyFuture_Tests(BaseTaskTests, test_utils.TestCase):
    """BaseTaskTests with subclasses of the C Task and the Python Future.

    add_subclass_tests swaps the two attributes below for subclasses.
    """

    Future = futures._PyFuture
    Task = getattr(tasks, '_CTask', None)
2954
2955
@unittest.skipUnless(
    hasattr(futures, '_CFuture'),
    'requires the C _asyncio module')
@add_subclass_tests
class PyTask_CFutureSubclass_Tests(BaseTaskTests, test_utils.TestCase):
    """BaseTaskTests with subclasses of the Python Task and the C Future.

    add_subclass_tests swaps the two attributes below for subclasses.
    """

    Task = tasks._PyTask
    Future = getattr(futures, '_CFuture', None)
2963
2964
@unittest.skipUnless(
    hasattr(tasks, '_CTask'),
    'requires the C _asyncio module')
class CTask_PyFuture_Tests(BaseTaskTests, test_utils.TestCase):
    """BaseTaskTests run with the C Task and the Python Future."""

    Future = futures._PyFuture
    Task = getattr(tasks, '_CTask', None)
2971
2972
@unittest.skipUnless(
    hasattr(futures, '_CFuture'),
    'requires the C _asyncio module')
class PyTask_CFuture_Tests(BaseTaskTests, test_utils.TestCase):
    """BaseTaskTests run with the Python Task and the C Future."""

    Future = getattr(futures, '_CFuture', None)
    Task = tasks._PyTask
2979
2980
class PyTask_PyFuture_Tests(
        BaseTaskTests, SetMethodsTest, test_utils.TestCase):
    """BaseTaskTests and SetMethodsTest with the Python Task and Future."""

    Future = futures._PyFuture
    Task = tasks._PyTask
2986
2987
@add_subclass_tests
class PyTask_PyFuture_SubclassTests(BaseTaskTests, test_utils.TestCase):
    """BaseTaskTests with subclasses of the Python Task and Future.

    add_subclass_tests swaps the two attributes below for subclasses.
    """

    Future = futures._PyFuture
    Task = tasks._PyTask
2992
2993
@unittest.skipUnless(
    hasattr(tasks, '_CTask'),
    'requires the C _asyncio module')
class CTask_Future_Tests(test_utils.TestCase):
    """Task awaiting a Future subclass with an unusable ``get_loop``."""

    def test_foobar(self):
        # ``get_loop`` is turned into a property that raises AttributeError,
        # so merely looking the attribute up on the future fails.  A task
        # awaiting such a future must still run to completion.
        # NOTE(review): presumably this guards the C Task's loop lookup for
        # awaited futures -- confirm against the C implementation.
        class Fut(asyncio.Future):
            @property
            def get_loop(self):
                raise AttributeError

        async def coro():
            await fut
            return 'spam'

        loop = self.loop = asyncio.new_event_loop()
        # closing() calls loop.close() on exit, even if the run fails.
        with contextlib.closing(loop):
            fut = Fut(loop=loop)
            loop.call_later(0.1, fut.set_result, 1)
            task = loop.create_task(coro())
            res = loop.run_until_complete(task)

        self.assertEqual(res, 'spam')
3018
3019
class BaseTaskIntrospectionTests:
    """Tests for the task registration and enter/leave helpers.

    The four helpers default to ``None`` here; concrete subclasses bind
    them to the functions actually being tested.
    """

    _register_task = _unregister_task = _enter_task = _leave_task = None

    def test__register_task_1(self):
        # The task-like object exposes its loop only via a ``_loop``
        # property.
        loop = mock.Mock()

        class TaskLike:
            @property
            def _loop(self):
                return loop

            def done(self):
                return False

        pending = TaskLike()

        self.assertEqual(asyncio.all_tasks(loop), set())
        self._register_task(pending)
        self.assertEqual(asyncio.all_tasks(loop), {pending})
        self._unregister_task(pending)

    def test__register_task_2(self):
        # The task-like object exposes its loop via a get_loop() method.
        loop = mock.Mock()

        class TaskLike:
            def get_loop(self):
                return loop

            def done(self):
                return False

        pending = TaskLike()

        self.assertEqual(asyncio.all_tasks(loop), set())
        self._register_task(pending)
        self.assertEqual(asyncio.all_tasks(loop), {pending})
        self._unregister_task(pending)

    def test__register_task_3(self):
        # A registered task that reports done() must not be listed by
        # all_tasks().
        loop = mock.Mock()

        class TaskLike:
            def get_loop(self):
                return loop

            def done(self):
                return True

        finished = TaskLike()

        self.assertEqual(asyncio.all_tasks(loop), set())
        self._register_task(finished)
        self.assertEqual(asyncio.all_tasks(loop), set())
        self._unregister_task(finished)

    def test__enter_task(self):
        loop, task = mock.Mock(), mock.Mock()
        self.assertIsNone(asyncio.current_task(loop))
        self._enter_task(loop, task)
        self.assertIs(asyncio.current_task(loop), task)
        self._leave_task(loop, task)

    def test__enter_task_failure(self):
        # Entering a second task while one is current must fail and leave
        # the first task current.
        loop, current, intruder = mock.Mock(), mock.Mock(), mock.Mock()
        self._enter_task(loop, current)
        with self.assertRaises(RuntimeError):
            self._enter_task(loop, intruder)
        self.assertIs(asyncio.current_task(loop), current)
        self._leave_task(loop, current)

    def test__leave_task(self):
        loop, task = mock.Mock(), mock.Mock()
        self._enter_task(loop, task)
        self._leave_task(loop, task)
        self.assertIsNone(asyncio.current_task(loop))

    def test__leave_task_failure1(self):
        # Leaving a task other than the current one must fail and leave the
        # current task in place.
        loop, current, other = mock.Mock(), mock.Mock(), mock.Mock()
        self._enter_task(loop, current)
        with self.assertRaises(RuntimeError):
            self._leave_task(loop, other)
        self.assertIs(asyncio.current_task(loop), current)
        self._leave_task(loop, current)

    def test__leave_task_failure2(self):
        # Leaving a task when none was entered must fail.
        loop, task = mock.Mock(), mock.Mock()
        with self.assertRaises(RuntimeError):
            self._leave_task(loop, task)
        self.assertIsNone(asyncio.current_task(loop))

    def test__unregister_task(self):
        loop, task = mock.Mock(), mock.Mock()
        task.get_loop = lambda: loop
        self._register_task(task)
        self._unregister_task(task)
        self.assertEqual(asyncio.all_tasks(loop), set())

    def test__unregister_task_not_registered(self):
        # Unregistering a task that was never registered must not raise.
        loop, task = mock.Mock(), mock.Mock()
        self._unregister_task(task)
        self.assertEqual(asyncio.all_tasks(loop), set())
3130
3131
class PyIntrospectionTests(test_utils.TestCase, BaseTaskIntrospectionTests):
    """Introspection tests bound to the ``tasks._py_*`` helpers."""

    # staticmethod() keeps the plain functions from being bound to the test
    # instance when accessed through ``self``.
    _enter_task = staticmethod(tasks._py_enter_task)
    _leave_task = staticmethod(tasks._py_leave_task)
    _register_task = staticmethod(tasks._py_register_task)
    _unregister_task = staticmethod(tasks._py_unregister_task)
3137
3138
@unittest.skipUnless(
    hasattr(tasks, '_c_register_task'),
    'requires the C _asyncio module')
class CIntrospectionTests(test_utils.TestCase, BaseTaskIntrospectionTests):
    """Introspection tests bound to the ``tasks._c_*`` helpers."""

    # The class body runs even when the class ends up skipped, so the C
    # helpers may only be looked up if they exist.
    if not hasattr(tasks, '_c_register_task'):
        _register_task = _unregister_task = _enter_task = _leave_task = None
    else:
        _enter_task = staticmethod(tasks._c_enter_task)
        _leave_task = staticmethod(tasks._c_leave_task)
        _register_task = staticmethod(tasks._c_register_task)
        _unregister_task = staticmethod(tasks._c_unregister_task)
3149
3150
class BaseCurrentLoopTests:
    """asyncio.current_task() tests shared by several Task classes.

    Subclasses override new_task() to build a task from a coroutine.
    """

    def setUp(self):
        super().setUp()
        fresh_loop = asyncio.new_event_loop()
        self.loop = fresh_loop
        self.set_event_loop(fresh_loop)

    def new_task(self, coro):
        raise NotImplementedError

    def test_current_task_no_running_loop(self):
        # With an explicit loop that is not running there is no current
        # task.
        current = asyncio.current_task(loop=self.loop)
        self.assertIsNone(current)

    def test_current_task_no_running_loop_implicit(self):
        # Without an explicit loop and with no loop running, the call fails.
        self.assertRaises(RuntimeError, asyncio.current_task)

    def test_current_task_with_implicit_loop(self):
        async def body():
            # Explicit loop, explicit None and omitted loop must all report
            # the task that is executing this coroutine.
            self.assertIs(asyncio.current_task(loop=self.loop), running)

            self.assertIs(asyncio.current_task(None), running)
            self.assertIs(asyncio.current_task(), running)

        running = self.new_task(body())
        self.loop.run_until_complete(running)
        # Once the task has finished, no task is current any more.
        self.assertIsNone(asyncio.current_task(loop=self.loop))
3178
3179
class PyCurrentLoopTests(BaseCurrentLoopTests, test_utils.TestCase):
    """current_task() tests run with ``tasks._PyTask``."""

    def new_task(self, coro):
        task_cls = tasks._PyTask
        return task_cls(coro, loop=self.loop)
3184
3185
@unittest.skipUnless(
    hasattr(tasks, '_CTask'),
    'requires the C _asyncio module')
class CCurrentLoopTests(BaseCurrentLoopTests, test_utils.TestCase):
    """current_task() tests run with ``tasks._CTask``."""

    def new_task(self, coro):
        # Looked up at call time only, so the module still imports when
        # the C accelerator is missing.
        task_cls = tasks._CTask
        return task_cls(coro, loop=self.loop)
3192
3193
class GenericTaskTests(test_utils.TestCase):
    """Checks that do not depend on a particular Task implementation."""

    def test_future_subclass(self):
        is_future_subclass = issubclass(asyncio.Task, asyncio.Future)
        self.assertTrue(is_future_subclass)

    def test_asyncio_module_compiled(self):
        # Because of circular imports it's easy to make _asyncio
        # module non-importable.  This is a simple test that will
        # fail on systems where C modules were successfully compiled
        # (hence the test for _functools), but _asyncio somehow didn't.
        try:
            import _functools
        except ImportError:
            # No C modules available at all: nothing to verify.
            return

        try:
            import _asyncio
        except ImportError:
            self.fail('_asyncio module is missing')
3213
3214
class GatherTestsBase:
    """asyncio.gather() tests shared by the future and coroutine variants.

    Subclasses provide wrap_futures(), which turns the futures driven by a
    test into the awaitables that are passed to gather().
    """

    def setUp(self):
        super().setUp()
        self.one_loop = self.new_test_loop()
        self.other_loop = self.new_test_loop()
        self.set_event_loop(self.one_loop, cleanup=False)

    def _run_loop(self, loop):
        # Keep iterating for as long as the loop has ready callbacks.
        while loop._ready:
            test_utils.run_briefly(loop)

    def _check_success(self, **kwargs):
        first, second, third = (self.one_loop.create_future()
                                for _ in range(3))
        outer = asyncio.gather(*self.wrap_futures(first, second, third),
                               **kwargs)
        done_cb = test_utils.MockCallback()
        outer.add_done_callback(done_cb)
        # Complete the children out of order; the results must still come
        # back in argument order.
        second.set_result(1)
        first.set_result(2)
        self._run_loop(self.one_loop)
        self.assertEqual(done_cb.called, False)
        self.assertFalse(outer.done())
        third.set_result(3)
        self._run_loop(self.one_loop)
        done_cb.assert_called_once_with(outer)
        self.assertEqual(outer.result(), [2, 1, 3])

    def test_success(self):
        for options in ({}, {'return_exceptions': False}):
            self._check_success(**options)

    def test_result_exception_success(self):
        self._check_success(return_exceptions=True)

    def test_one_exception(self):
        a, b, c, d, e = (self.one_loop.create_future() for _ in range(5))
        outer = asyncio.gather(*self.wrap_futures(a, b, c, d, e))
        done_cb = test_utils.MockCallback()
        outer.add_done_callback(done_cb)
        failure = ZeroDivisionError()
        a.set_result(1)
        b.set_exception(failure)
        self._run_loop(self.one_loop)
        # The first exception completes the outer future right away.
        self.assertTrue(outer.done())
        done_cb.assert_called_once_with(outer)
        self.assertIs(outer.exception(), failure)
        # Does nothing
        c.set_result(3)
        d.cancel()
        e.set_exception(RuntimeError())
        e.exception()

    def test_return_exceptions(self):
        a, b, c, d = (self.one_loop.create_future() for _ in range(4))
        outer = asyncio.gather(*self.wrap_futures(a, b, c, d),
                               return_exceptions=True)
        done_cb = test_utils.MockCallback()
        outer.add_done_callback(done_cb)
        first_error = ZeroDivisionError()
        second_error = RuntimeError()
        b.set_result(1)
        c.set_exception(first_error)
        a.set_result(3)
        self._run_loop(self.one_loop)
        # An exception does not finish the outer future here; it waits for
        # every child.
        self.assertFalse(outer.done())
        d.set_exception(second_error)
        self._run_loop(self.one_loop)
        self.assertTrue(outer.done())
        done_cb.assert_called_once_with(outer)
        self.assertEqual(outer.result(), [3, 1, first_error, second_error])

    def test_env_var_debug(self):
        code = ('import asyncio.coroutines\n'
                'print(asyncio.coroutines._DEBUG)')

        def reported_debug(*interp_args, **env_vars):
            # Run ``code`` in a child interpreter and return what it printed.
            _, stdout, _ = assert_python_ok(*interp_args, '-c', code,
                                            **env_vars)
            return stdout.rstrip()

        # Test with -E to not fail if the unit test was run with
        # PYTHONASYNCIODEBUG set to a non-empty string
        self.assertEqual(reported_debug('-E'), b'False')

        self.assertEqual(
            reported_debug(PYTHONASYNCIODEBUG='', PYTHONDEVMODE=''),
            b'False')

        self.assertEqual(
            reported_debug(PYTHONASYNCIODEBUG='1', PYTHONDEVMODE=''),
            b'True')

        self.assertEqual(
            reported_debug('-E', PYTHONASYNCIODEBUG='1', PYTHONDEVMODE=''),
            b'False')

        # -X dev
        self.assertEqual(reported_debug('-E', '-X', 'dev'), b'True')
3315
3316
class FutureGatherTests(GatherTestsBase, test_utils.TestCase):
    """gather() tests in which the futures are passed to gather() as-is."""

    def wrap_futures(self, *futures):
        # Futures go to gather() unchanged.
        return futures

    def _check_empty_sequence(self, seq_or_iter):
        asyncio.set_event_loop(self.one_loop)
        self.addCleanup(asyncio.set_event_loop, None)
        # Without arguments gather() must use the current event loop and
        # complete with an empty list.
        outer = asyncio.gather(*seq_or_iter)
        self.assertIsInstance(outer, asyncio.Future)
        self.assertIs(outer._loop, self.one_loop)
        self._run_loop(self.one_loop)
        self.assertTrue(outer.done())
        self.assertEqual(outer.result(), [])
        # An explicit loop is honoured but deprecated.
        with self.assertWarns(DeprecationWarning):
            outer = asyncio.gather(*seq_or_iter, loop=self.other_loop)
        self.assertIs(outer._loop, self.other_loop)

    def test_constructor_empty_sequence(self):
        for empty in ([], (), set(), iter("")):
            self._check_empty_sequence(empty)

    def test_constructor_heterogenous_futures(self):
        # Futures from different loops, or a loop argument that disagrees
        # with the future's loop, must be rejected.
        on_one = self.one_loop.create_future()
        on_other = self.other_loop.create_future()
        with self.assertRaises(ValueError):
            asyncio.gather(on_one, on_other)
        with self.assertRaises(ValueError):
            asyncio.gather(on_one, loop=self.other_loop)

    def test_constructor_homogenous_futures(self):
        children = [self.other_loop.create_future() for _ in range(3)]
        # First let gather() infer the loop, then pass it explicitly.
        for loop_option in ({}, {'loop': self.other_loop}):
            outer = asyncio.gather(*children, **loop_option)
            self.assertIs(outer._loop, self.other_loop)
            self._run_loop(self.other_loop)
            self.assertFalse(outer.done())

    def test_one_cancellation(self):
        a, b, c, d, e = (self.one_loop.create_future() for _ in range(5))
        outer = asyncio.gather(a, b, c, d, e)
        done_cb = test_utils.MockCallback()
        outer.add_done_callback(done_cb)
        a.set_result(1)
        b.cancel()
        self._run_loop(self.one_loop)
        self.assertTrue(outer.done())
        done_cb.assert_called_once_with(outer)
        # The outer future is not cancelled itself; it carries a
        # CancelledError as its exception.
        self.assertFalse(outer.cancelled())
        self.assertIsInstance(outer.exception(), asyncio.CancelledError)
        # Does nothing
        c.set_result(3)
        d.cancel()
        e.set_exception(RuntimeError())
        e.exception()

    def test_result_exception_one_cancellation(self):
        a, b, c, d, e, f = (self.one_loop.create_future()
                            for _ in range(6))
        outer = asyncio.gather(a, b, c, d, e, f, return_exceptions=True)
        done_cb = test_utils.MockCallback()
        outer.add_done_callback(done_cb)
        a.set_result(1)
        zde = ZeroDivisionError()
        b.set_exception(zde)
        c.cancel()
        self._run_loop(self.one_loop)
        self.assertFalse(outer.done())
        d.set_result(3)
        e.cancel()
        rte = RuntimeError()
        f.set_exception(rte)
        results = self.one_loop.run_until_complete(outer)
        for cancelled_index in (2, 4):
            self.assertIsInstance(results[cancelled_index],
                                  asyncio.CancelledError)
        # Blank out the CancelledError instances so that the rest of the
        # list can be compared by equality.
        results[2] = results[4] = None
        self.assertEqual(results, [1, zde, None, 3, None, rte])
        done_cb.assert_called_once_with(outer)
3400
3401
class CoroutineGatherTests(GatherTestsBase, test_utils.TestCase):
    """gather() tests in which each future is wrapped in a coroutine."""

    def setUp(self):
        super().setUp()
        asyncio.set_event_loop(self.one_loop)

    def wrap_futures(self, *futures):
        # One coroutine object per future, each simply awaiting its future.
        async def relay(fut):
            return await fut

        return [relay(fut) for fut in futures]

    def test_constructor_loop_selection(self):
        async def coro():
            return 'abc'

        # Without a loop argument the coroutines run on the current loop.
        outer = asyncio.gather(coro(), coro())
        self.assertIs(outer._loop, self.one_loop)
        self.one_loop.run_until_complete(outer)

        self.set_event_loop(self.other_loop, cleanup=False)
        # With a loop argument that loop is used.
        outer2 = asyncio.gather(coro(), coro(), loop=self.other_loop)
        self.assertIs(outer2._loop, self.other_loop)
        self.other_loop.run_until_complete(outer2)

    def test_duplicate_coroutines(self):
        with self.assertWarns(DeprecationWarning):
            @asyncio.coroutine
            def coro(s):
                return s
        repeated = coro('abc')
        # The same coroutine object passed several times must yield its
        # result in every one of those positions.
        outer = asyncio.gather(repeated, repeated, coro('def'), repeated,
                               loop=self.one_loop)
        self._run_loop(self.one_loop)
        self.assertEqual(outer.result(), ['abc', 'abc', 'def', 'abc'])

    def test_cancellation_broadcast(self):
        # Cancelling outer() cancels all children.
        proof = 0
        waiter = self.one_loop.create_future()

        async def inner():
            nonlocal proof
            await waiter
            proof += 1

        child1 = asyncio.ensure_future(inner(), loop=self.one_loop)
        child2 = asyncio.ensure_future(inner(), loop=self.one_loop)
        gatherer = None

        async def outer():
            nonlocal proof, gatherer
            gatherer = asyncio.gather(child1, child2, loop=self.one_loop)
            await gatherer
            proof += 100

        outer_task = asyncio.ensure_future(outer(), loop=self.one_loop)
        test_utils.run_briefly(self.one_loop)
        self.assertTrue(outer_task.cancel())
        with self.assertRaises(asyncio.CancelledError):
            self.one_loop.run_until_complete(outer_task)
        # The gathering future is finished by now, so cancel() returns
        # False.
        self.assertFalse(gatherer.cancel())
        for cancelled in (waiter, child1, child2):
            self.assertTrue(cancelled.cancelled())
        test_utils.run_briefly(self.one_loop)
        # Neither inner() nor outer() got past its await.
        self.assertEqual(proof, 0)

    def test_exception_marking(self):
        # Test for the first line marked "Mark exception retrieved."

        async def inner(f):
            await f
            raise RuntimeError('should not be ignored')

        a = self.one_loop.create_future()
        b = self.one_loop.create_future()

        async def outer():
            await asyncio.gather(inner(a), inner(b), loop=self.one_loop)

        outer_task = asyncio.ensure_future(outer(), loop=self.one_loop)
        # Release the two inner coroutines one at a time, letting the loop
        # run in between.
        test_utils.run_briefly(self.one_loop)
        for gate in (a, b):
            gate.set_result(None)
            test_utils.run_briefly(self.one_loop)
        self.assertIsInstance(outer_task.exception(), RuntimeError)
3494
3495
class RunCoroutineThreadsafeTests(test_utils.TestCase):
    """Test case for asyncio.run_coroutine_threadsafe."""

    def setUp(self):
        super().setUp()
        loop = asyncio.new_event_loop()
        self.loop = loop
        self.set_event_loop(loop) # Will cleanup properly

    async def add(self, a, b, fail=False, cancel=False):
        """Wait 0.05 second and return a + b.

        Raise RuntimeError if *fail* is true; cancel the current task if
        *cancel* is true.
        """
        await asyncio.sleep(0.05)
        if fail:
            raise RuntimeError("Fail!")
        if cancel:
            asyncio.current_task(self.loop).cancel()
            # Yield to the loop once after requesting the cancellation.
            await asyncio.sleep(0)
        return a + b

    def target(self, fail=False, cancel=False, timeout=None,
               advance_coro=False):
        """Run add coroutine in the event loop and return its result."""
        coro = self.add(1, 2, fail=fail, cancel=cancel)
        future = asyncio.run_coroutine_threadsafe(coro, self.loop)
        if advance_coro:
            # this is for test_run_coroutine_threadsafe_task_factory_exception;
            # otherwise it spills errors and breaks **other** unittests, since
            # 'target' is interacting with threads.

            # With this call, `coro` will be advanced, so that
            # CoroWrapper.__del__ won't do anything when asyncio tests run
            # in debug mode.
            self.loop.call_soon_threadsafe(coro.send, None)
        try:
            return future.result(timeout)
        finally:
            # Don't leave the coroutine pending if result() gave up early.
            if not future.done():
                future.cancel()

    def test_run_coroutine_threadsafe(self):
        """Test coroutine submission from a thread to an event loop."""
        worker = self.loop.run_in_executor(None, self.target)
        self.assertEqual(self.loop.run_until_complete(worker), 3)

    def test_run_coroutine_threadsafe_with_exception(self):
        """Test coroutine submission from a thread to an event loop
        when an exception is raised."""
        worker = self.loop.run_in_executor(None, self.target, True)
        with self.assertRaises(RuntimeError) as exc_context:
            self.loop.run_until_complete(worker)
        self.assertIn("Fail!", exc_context.exception.args)

    def test_run_coroutine_threadsafe_with_timeout(self):
        """Test coroutine submission from a thread to an event loop
        when a timeout is raised."""
        worker = self.loop.run_in_executor(
            None, functools.partial(self.target, timeout=0))
        with self.assertRaises(asyncio.TimeoutError):
            self.loop.run_until_complete(worker)
        test_utils.run_briefly(self.loop)
        # Check that there's no pending task (add has been cancelled)
        for task in asyncio.all_tasks(self.loop):
            self.assertTrue(task.done())

    def test_run_coroutine_threadsafe_task_cancelled(self):
        """Test coroutine submission from a thread to an event loop
        when the task is cancelled."""
        worker = self.loop.run_in_executor(
            None, functools.partial(self.target, cancel=True))
        with self.assertRaises(asyncio.CancelledError):
            self.loop.run_until_complete(worker)

    def test_run_coroutine_threadsafe_task_factory_exception(self):
        """Test coroutine submission from a thread to an event loop
        when the task factory raises an exception."""

        def task_factory(loop, coro):
            raise NameError

        worker = self.loop.run_in_executor(
            None, functools.partial(self.target, advance_coro=True))

        # Set exception handler
        handler = test_utils.MockCallback()
        self.loop.set_exception_handler(handler)

        # Set corrupted task factory
        previous_factory = self.loop.get_task_factory()
        self.addCleanup(self.loop.set_task_factory, previous_factory)
        self.loop.set_task_factory(task_factory)

        # Run event loop
        with self.assertRaises(NameError) as exc_context:
            self.loop.run_until_complete(worker)

        # Check exceptions
        self.assertEqual(len(handler.call_args_list), 1)
        positional, kwargs = handler.call_args
        loop, context = positional
        self.assertEqual(context['exception'], exc_context.exception)
3594
3595
class SleepTests(test_utils.TestCase):
    """Tests for asyncio.sleep(), each run on a newly created event loop."""

    def setUp(self):
        super().setUp()
        fresh_loop = asyncio.new_event_loop()
        self.loop = fresh_loop
        self.set_event_loop(fresh_loop)

    def tearDown(self):
        self.loop.close()
        self.loop = None
        super().tearDown()

    def test_sleep_zero(self):
        """sleep(0) lets a call_soon callback run and returns `result`."""
        total = 0

        def bump(amount):
            nonlocal total
            total += amount

        async def scenario():
            self.loop.call_soon(bump, 1)
            # Scheduling the callback must not have run it yet.
            self.assertEqual(total, 0)
            returned = await asyncio.sleep(0, result=10)
            # The call_soon callback ran while this coroutine was waiting.
            self.assertEqual(total, 1)
            # returned is 10, which brings the running total to 11.
            bump(returned)

        self.loop.run_until_complete(scenario())
        self.assertEqual(total, 11)

    def test_loop_argument_is_deprecated(self):
        # Remove test when loop argument is removed in Python 3.10
        with self.assertWarns(DeprecationWarning):
            nap = asyncio.sleep(0.01, loop=self.loop)
            self.loop.run_until_complete(nap)
3628
3629
class WaitTests(test_utils.TestCase):
    """Deprecation checks for asyncio.wait() and asyncio.wait_for()."""

    def setUp(self):
        super().setUp()
        fresh_loop = asyncio.new_event_loop()
        self.loop = fresh_loop
        self.set_event_loop(fresh_loop)

    def tearDown(self):
        self.loop.close()
        self.loop = None
        super().tearDown()

    def test_loop_argument_is_deprecated_in_wait(self):
        # Remove test when loop argument is removed in Python 3.10
        with self.assertWarns(DeprecationWarning):
            waiter = asyncio.wait([coroutine_function()], loop=self.loop)
            self.loop.run_until_complete(waiter)

    def test_loop_argument_is_deprecated_in_wait_for(self):
        # Remove test when loop argument is removed in Python 3.10
        with self.assertWarns(DeprecationWarning):
            waiter = asyncio.wait_for(
                coroutine_function(), 0.01, loop=self.loop)
            self.loop.run_until_complete(waiter)

    def test_coro_is_deprecated_in_wait(self):
        # Remove test when passing coros to asyncio.wait() is removed in 3.11

        # A bare coroutine on its own triggers the warning.
        with self.assertWarns(DeprecationWarning):
            only_coro = asyncio.wait([coroutine_function()])
            self.loop.run_until_complete(only_coro)

        # So does a bare coroutine passed alongside a real task.
        task = self.loop.create_task(coroutine_function())
        with self.assertWarns(DeprecationWarning):
            task_and_coro = asyncio.wait([task, coroutine_function()])
            self.loop.run_until_complete(task_and_coro)
3663
3664
class CompatibilityTests(test_utils.TestCase):
    # Tests for checking a bridge between old-styled coroutines
    # and async/await syntax

    def setUp(self):
        super().setUp()
        fresh_loop = asyncio.new_event_loop()
        self.loop = fresh_loop
        self.set_event_loop(fresh_loop)

    def tearDown(self):
        self.loop.close()
        self.loop = None
        super().tearDown()

    def test_yield_from_awaitable(self):
        """A generator-based coroutine can `yield from` asyncio.sleep()."""

        # Applying the decorator is what emits the DeprecationWarning.
        with self.assertWarns(DeprecationWarning):
            @asyncio.coroutine
            def legacy():
                yield from asyncio.sleep(0)
                return 'ok'

        outcome = self.loop.run_until_complete(legacy())
        self.assertEqual('ok', outcome)

    def test_await_old_style_coro(self):
        """Generator-based coroutines can be gathered from `async def`."""

        with self.assertWarns(DeprecationWarning):
            @asyncio.coroutine
            def legacy_plain():
                return 'ok1'

        with self.assertWarns(DeprecationWarning):
            @asyncio.coroutine
            def legacy_sleeping():
                yield from asyncio.sleep(0)
                return 'ok2'

        async def gather_both():
            gathered = await asyncio.gather(
                legacy_plain(), legacy_sleeping(), loop=self.loop)
            return gathered

        outcome = self.loop.run_until_complete(gather_both())
        self.assertEqual(['ok1', 'ok2'], outcome)

    def test_debug_mode_interop(self):
        # https://bugs.python.org/issue32636
        # The script below is run in a child interpreter with asyncio
        # debug mode switched on through the environment.
        script = textwrap.dedent("""
            import asyncio

            async def native_coro():
                pass

            @asyncio.coroutine
            def old_style_coro():
                yield from native_coro()

            asyncio.run(old_style_coro())
        """)

        interpreter_args = ("-Wignore::DeprecationWarning", "-c", script)
        assert_python_ok(*interpreter_args, PYTHONASYNCIODEBUG="1")
3726
3727
# Run the tests in this module when the file is executed as a script.
if __name__ == '__main__':
    unittest.main()
3730