1"""Tests for tasks.py."""
2
3import collections
4import contextlib
5import contextvars
6import functools
7import gc
8import io
9import random
10import re
11import sys
12import textwrap
13import types
14import unittest
15import weakref
16from unittest import mock
17
18import asyncio
19from asyncio import coroutines
20from asyncio import futures
21from asyncio import tasks
22from test.test_asyncio import utils as test_utils
23from test import support
24from test.support.script_helper import assert_python_ok
25
26
# Minimal coroutine function built with the legacy @asyncio.coroutine
# decorator; the body intentionally does nothing.
# NOTE(review): not referenced in the visible part of this file --
# presumably used by tests further down; confirm before removing.
@asyncio.coroutine
def coroutine_function():
    pass
30
31
@contextlib.contextmanager
def set_coroutine_debug(enabled):
    """Temporarily set ``asyncio.coroutines._DEBUG`` to *enabled*.

    The value the flag had on entry is put back when the ``with`` block
    exits, whether it finishes normally or raises.
    """
    module = asyncio.coroutines
    saved = module._DEBUG
    try:
        module._DEBUG = enabled
        yield
    finally:
        # Always restore, even if the managed block raised.
        module._DEBUG = saved
42
43
44
def format_coroutine(qualname, state, src, source_traceback, generator=False):
    """Build the ``coro=<...>`` fragment that the tests expect in a Task repr.

    qualname: qualified name of the coroutine.
    state: state text; ``', defined'`` is appended unless *generator* is true.
    src: source location text placed after ``at``.
    source_traceback: ``None``, or a sequence whose last entry supplies the
        two values used in the ``created at`` suffix (items 0 and 1).
    """
    if not generator:
        state = '%s, defined' % state
    else:
        state = '%s' % state

    if source_traceback is None:
        return 'coro=<%s() %s at %s>' % (qualname, state, src)

    # Only the last traceback entry is reported.
    last = source_traceback[-1]
    return ('coro=<%s() %s at %s> created at %s:%s'
            % (qualname, state, src, last[0], last[1]))
56
57
class Dummy:
    """Callable that accepts any positional arguments and has a fixed repr."""

    def __call__(self, *args):
        return None

    def __repr__(self):
        return '<Dummy>'
65
66
class CoroLikeObject:
    """Hand-written object exposing the coroutine methods.

    ``send()`` immediately raises ``StopIteration(42)``; ``throw()`` and
    ``close()`` do nothing; ``__await__()`` returns the object itself.
    """

    def __await__(self):
        return self

    def send(self, v):
        raise StopIteration(42)

    def throw(self, *exc):
        return None

    def close(self):
        return None
79
80
81class BaseTaskTests:
82
83    Task = None
84    Future = None
85
86    def new_task(self, loop, coro):
87        return self.__class__.Task(coro, loop=loop)
88
89    def new_future(self, loop):
90        return self.__class__.Future(loop=loop)
91
92    def setUp(self):
93        super().setUp()
94        self.loop = self.new_test_loop()
95        self.loop.set_task_factory(self.new_task)
96        self.loop.create_future = lambda: self.new_future(self.loop)
97
98    def test_task_del_collect(self):
99        class Evil:
100            def __del__(self):
101                gc.collect()
102
103        @asyncio.coroutine
104        def run():
105            return Evil()
106
107        self.loop.run_until_complete(
108            asyncio.gather(*[
109                self.new_task(self.loop, run()) for _ in range(100)
110            ], loop=self.loop))
111
112    def test_other_loop_future(self):
113        other_loop = asyncio.new_event_loop()
114        fut = self.new_future(other_loop)
115
116        async def run(fut):
117            await fut
118
119        try:
120            with self.assertRaisesRegex(RuntimeError,
121                                        r'Task .* got Future .* attached'):
122                self.loop.run_until_complete(run(fut))
123        finally:
124            other_loop.close()
125
126    def test_task_awaits_on_itself(self):
127
128        async def test():
129            await task
130
131        task = asyncio.ensure_future(test(), loop=self.loop)
132
133        with self.assertRaisesRegex(RuntimeError,
134                                    'Task cannot await on itself'):
135            self.loop.run_until_complete(task)
136
137    def test_task_class(self):
138        @asyncio.coroutine
139        def notmuch():
140            return 'ok'
141        t = self.new_task(self.loop, notmuch())
142        self.loop.run_until_complete(t)
143        self.assertTrue(t.done())
144        self.assertEqual(t.result(), 'ok')
145        self.assertIs(t._loop, self.loop)
146        self.assertIs(t.get_loop(), self.loop)
147
148        loop = asyncio.new_event_loop()
149        self.set_event_loop(loop)
150        t = self.new_task(loop, notmuch())
151        self.assertIs(t._loop, loop)
152        loop.run_until_complete(t)
153        loop.close()
154
155    def test_ensure_future_coroutine(self):
156        @asyncio.coroutine
157        def notmuch():
158            return 'ok'
159        t = asyncio.ensure_future(notmuch(), loop=self.loop)
160        self.loop.run_until_complete(t)
161        self.assertTrue(t.done())
162        self.assertEqual(t.result(), 'ok')
163        self.assertIs(t._loop, self.loop)
164
165        loop = asyncio.new_event_loop()
166        self.set_event_loop(loop)
167        t = asyncio.ensure_future(notmuch(), loop=loop)
168        self.assertIs(t._loop, loop)
169        loop.run_until_complete(t)
170        loop.close()
171
172    def test_ensure_future_future(self):
173        f_orig = self.new_future(self.loop)
174        f_orig.set_result('ko')
175
176        f = asyncio.ensure_future(f_orig)
177        self.loop.run_until_complete(f)
178        self.assertTrue(f.done())
179        self.assertEqual(f.result(), 'ko')
180        self.assertIs(f, f_orig)
181
182        loop = asyncio.new_event_loop()
183        self.set_event_loop(loop)
184
185        with self.assertRaises(ValueError):
186            f = asyncio.ensure_future(f_orig, loop=loop)
187
188        loop.close()
189
190        f = asyncio.ensure_future(f_orig, loop=self.loop)
191        self.assertIs(f, f_orig)
192
193    def test_ensure_future_task(self):
194        @asyncio.coroutine
195        def notmuch():
196            return 'ok'
197        t_orig = self.new_task(self.loop, notmuch())
198        t = asyncio.ensure_future(t_orig)
199        self.loop.run_until_complete(t)
200        self.assertTrue(t.done())
201        self.assertEqual(t.result(), 'ok')
202        self.assertIs(t, t_orig)
203
204        loop = asyncio.new_event_loop()
205        self.set_event_loop(loop)
206
207        with self.assertRaises(ValueError):
208            t = asyncio.ensure_future(t_orig, loop=loop)
209
210        loop.close()
211
212        t = asyncio.ensure_future(t_orig, loop=self.loop)
213        self.assertIs(t, t_orig)
214
215    def test_ensure_future_awaitable(self):
216        class Aw:
217            def __init__(self, coro):
218                self.coro = coro
219            def __await__(self):
220                return (yield from self.coro)
221
222        @asyncio.coroutine
223        def coro():
224            return 'ok'
225
226        loop = asyncio.new_event_loop()
227        self.set_event_loop(loop)
228        fut = asyncio.ensure_future(Aw(coro()), loop=loop)
229        loop.run_until_complete(fut)
230        assert fut.result() == 'ok'
231
232    def test_ensure_future_neither(self):
233        with self.assertRaises(TypeError):
234            asyncio.ensure_future('ok')
235
236    def test_get_stack(self):
237        T = None
238
239        async def foo():
240            await bar()
241
242        async def bar():
243            # test get_stack()
244            f = T.get_stack(limit=1)
245            try:
246                self.assertEqual(f[0].f_code.co_name, 'foo')
247            finally:
248                f = None
249
250            # test print_stack()
251            file = io.StringIO()
252            T.print_stack(limit=1, file=file)
253            file.seek(0)
254            tb = file.read()
255            self.assertRegex(tb, r'foo\(\) running')
256
257        async def runner():
258            nonlocal T
259            T = asyncio.ensure_future(foo(), loop=self.loop)
260            await T
261
262        self.loop.run_until_complete(runner())
263
264    def test_task_repr(self):
265        self.loop.set_debug(False)
266
267        @asyncio.coroutine
268        def notmuch():
269            yield from []
270            return 'abc'
271
272        # test coroutine function
273        self.assertEqual(notmuch.__name__, 'notmuch')
274        self.assertRegex(notmuch.__qualname__,
275                         r'\w+.test_task_repr.<locals>.notmuch')
276        self.assertEqual(notmuch.__module__, __name__)
277
278        filename, lineno = test_utils.get_function_source(notmuch)
279        src = "%s:%s" % (filename, lineno)
280
281        # test coroutine object
282        gen = notmuch()
283        coro_qualname = 'BaseTaskTests.test_task_repr.<locals>.notmuch'
284        self.assertEqual(gen.__name__, 'notmuch')
285        self.assertEqual(gen.__qualname__, coro_qualname)
286
287        # test pending Task
288        t = self.new_task(self.loop, gen)
289        t.add_done_callback(Dummy())
290
291        coro = format_coroutine(coro_qualname, 'running', src,
292                                t._source_traceback, generator=True)
293        self.assertEqual(repr(t),
294                         '<Task pending %s cb=[<Dummy>()]>' % coro)
295
296        # test cancelling Task
297        t.cancel()  # Does not take immediate effect!
298        self.assertEqual(repr(t),
299                         '<Task cancelling %s cb=[<Dummy>()]>' % coro)
300
301        # test cancelled Task
302        self.assertRaises(asyncio.CancelledError,
303                          self.loop.run_until_complete, t)
304        coro = format_coroutine(coro_qualname, 'done', src,
305                                t._source_traceback)
306        self.assertEqual(repr(t),
307                         '<Task cancelled %s>' % coro)
308
309        # test finished Task
310        t = self.new_task(self.loop, notmuch())
311        self.loop.run_until_complete(t)
312        coro = format_coroutine(coro_qualname, 'done', src,
313                                t._source_traceback)
314        self.assertEqual(repr(t),
315                         "<Task finished %s result='abc'>" % coro)
316
317    def test_task_repr_coro_decorator(self):
318        self.loop.set_debug(False)
319
320        @asyncio.coroutine
321        def notmuch():
322            # notmuch() function doesn't use yield from: it will be wrapped by
323            # @coroutine decorator
324            return 123
325
326        # test coroutine function
327        self.assertEqual(notmuch.__name__, 'notmuch')
328        self.assertRegex(notmuch.__qualname__,
329                         r'\w+.test_task_repr_coro_decorator'
330                         r'\.<locals>\.notmuch')
331        self.assertEqual(notmuch.__module__, __name__)
332
333        # test coroutine object
334        gen = notmuch()
335        # On Python >= 3.5, generators now inherit the name of the
336        # function, as expected, and have a qualified name (__qualname__
337        # attribute).
338        coro_name = 'notmuch'
339        coro_qualname = ('BaseTaskTests.test_task_repr_coro_decorator'
340                         '.<locals>.notmuch')
341        self.assertEqual(gen.__name__, coro_name)
342        self.assertEqual(gen.__qualname__, coro_qualname)
343
344        # test repr(CoroWrapper)
345        if coroutines._DEBUG:
346            # format the coroutine object
347            if coroutines._DEBUG:
348                filename, lineno = test_utils.get_function_source(notmuch)
349                frame = gen._source_traceback[-1]
350                coro = ('%s() running, defined at %s:%s, created at %s:%s'
351                        % (coro_qualname, filename, lineno,
352                           frame[0], frame[1]))
353            else:
354                code = gen.gi_code
355                coro = ('%s() running at %s:%s'
356                        % (coro_qualname, code.co_filename,
357                           code.co_firstlineno))
358
359            self.assertEqual(repr(gen), '<CoroWrapper %s>' % coro)
360
361        # test pending Task
362        t = self.new_task(self.loop, gen)
363        t.add_done_callback(Dummy())
364
365        # format the coroutine object
366        if coroutines._DEBUG:
367            src = '%s:%s' % test_utils.get_function_source(notmuch)
368        else:
369            code = gen.gi_code
370            src = '%s:%s' % (code.co_filename, code.co_firstlineno)
371        coro = format_coroutine(coro_qualname, 'running', src,
372                                t._source_traceback,
373                                generator=not coroutines._DEBUG)
374        self.assertEqual(repr(t),
375                         '<Task pending %s cb=[<Dummy>()]>' % coro)
376        self.loop.run_until_complete(t)
377
378    def test_task_repr_wait_for(self):
379        self.loop.set_debug(False)
380
381        async def wait_for(fut):
382            return await fut
383
384        fut = self.new_future(self.loop)
385        task = self.new_task(self.loop, wait_for(fut))
386        test_utils.run_briefly(self.loop)
387        self.assertRegex(repr(task),
388                         '<Task .* wait_for=%s>' % re.escape(repr(fut)))
389
390        fut.set_result(None)
391        self.loop.run_until_complete(task)
392
    def test_task_repr_partial_corowrapper(self):
        # Issue #222: repr(CoroWrapper) must not fail in debug mode if the
        # coroutine is a partial function
        #
        # The task is created while coroutine debug mode is forced on; the
        # repr itself is taken after the ``with`` block has restored the
        # previous debug setting.
        with set_coroutine_debug(True):
            self.loop.set_debug(True)

            async def func(x, y):
                await asyncio.sleep(0)

            # functools.partial binds x=1; the call below supplies y=2.
            partial_func = asyncio.coroutine(functools.partial(func, 1))
            task = self.loop.create_task(partial_func(2))

            # make warnings quiet
            task._log_destroy_pending = False
            # The task is never run, so close its coroutine at cleanup.
            self.addCleanup(task._coro.close)

        coro_repr = repr(task._coro)
        # The expected repr names the partial as "func(1)" followed by "()".
        expected = (
            r'<CoroWrapper \w+.test_task_repr_partial_corowrapper'
            r'\.<locals>\.func\(1\)\(\) running, '
        )
        self.assertRegex(coro_repr, expected)
415
416    def test_task_basics(self):
417
418        async def outer():
419            a = await inner1()
420            b = await inner2()
421            return a+b
422
423        async def inner1():
424            return 42
425
426        async def inner2():
427            return 1000
428
429        t = outer()
430        self.assertEqual(self.loop.run_until_complete(t), 1042)
431
432    def test_cancel(self):
433
434        def gen():
435            when = yield
436            self.assertAlmostEqual(10.0, when)
437            yield 0
438
439        loop = self.new_test_loop(gen)
440
441        async def task():
442            await asyncio.sleep(10.0, loop=loop)
443            return 12
444
445        t = self.new_task(loop, task())
446        loop.call_soon(t.cancel)
447        with self.assertRaises(asyncio.CancelledError):
448            loop.run_until_complete(t)
449        self.assertTrue(t.done())
450        self.assertTrue(t.cancelled())
451        self.assertFalse(t.cancel())
452
453    def test_cancel_yield(self):
454        @asyncio.coroutine
455        def task():
456            yield
457            yield
458            return 12
459
460        t = self.new_task(self.loop, task())
461        test_utils.run_briefly(self.loop)  # start coro
462        t.cancel()
463        self.assertRaises(
464            asyncio.CancelledError, self.loop.run_until_complete, t)
465        self.assertTrue(t.done())
466        self.assertTrue(t.cancelled())
467        self.assertFalse(t.cancel())
468
469    def test_cancel_inner_future(self):
470        f = self.new_future(self.loop)
471
472        async def task():
473            await f
474            return 12
475
476        t = self.new_task(self.loop, task())
477        test_utils.run_briefly(self.loop)  # start task
478        f.cancel()
479        with self.assertRaises(asyncio.CancelledError):
480            self.loop.run_until_complete(t)
481        self.assertTrue(f.cancelled())
482        self.assertTrue(t.cancelled())
483
484    def test_cancel_both_task_and_inner_future(self):
485        f = self.new_future(self.loop)
486
487        async def task():
488            await f
489            return 12
490
491        t = self.new_task(self.loop, task())
492        test_utils.run_briefly(self.loop)
493
494        f.cancel()
495        t.cancel()
496
497        with self.assertRaises(asyncio.CancelledError):
498            self.loop.run_until_complete(t)
499
500        self.assertTrue(t.done())
501        self.assertTrue(f.cancelled())
502        self.assertTrue(t.cancelled())
503
504    def test_cancel_task_catching(self):
505        fut1 = self.new_future(self.loop)
506        fut2 = self.new_future(self.loop)
507
508        async def task():
509            await fut1
510            try:
511                await fut2
512            except asyncio.CancelledError:
513                return 42
514
515        t = self.new_task(self.loop, task())
516        test_utils.run_briefly(self.loop)
517        self.assertIs(t._fut_waiter, fut1)  # White-box test.
518        fut1.set_result(None)
519        test_utils.run_briefly(self.loop)
520        self.assertIs(t._fut_waiter, fut2)  # White-box test.
521        t.cancel()
522        self.assertTrue(fut2.cancelled())
523        res = self.loop.run_until_complete(t)
524        self.assertEqual(res, 42)
525        self.assertFalse(t.cancelled())
526
527    def test_cancel_task_ignoring(self):
528        fut1 = self.new_future(self.loop)
529        fut2 = self.new_future(self.loop)
530        fut3 = self.new_future(self.loop)
531
532        async def task():
533            await fut1
534            try:
535                await fut2
536            except asyncio.CancelledError:
537                pass
538            res = await fut3
539            return res
540
541        t = self.new_task(self.loop, task())
542        test_utils.run_briefly(self.loop)
543        self.assertIs(t._fut_waiter, fut1)  # White-box test.
544        fut1.set_result(None)
545        test_utils.run_briefly(self.loop)
546        self.assertIs(t._fut_waiter, fut2)  # White-box test.
547        t.cancel()
548        self.assertTrue(fut2.cancelled())
549        test_utils.run_briefly(self.loop)
550        self.assertIs(t._fut_waiter, fut3)  # White-box test.
551        fut3.set_result(42)
552        res = self.loop.run_until_complete(t)
553        self.assertEqual(res, 42)
554        self.assertFalse(fut3.cancelled())
555        self.assertFalse(t.cancelled())
556
557    def test_cancel_current_task(self):
558        loop = asyncio.new_event_loop()
559        self.set_event_loop(loop)
560
561        async def task():
562            t.cancel()
563            self.assertTrue(t._must_cancel)  # White-box test.
564            # The sleep should be cancelled immediately.
565            await asyncio.sleep(100, loop=loop)
566            return 12
567
568        t = self.new_task(loop, task())
569        self.assertRaises(
570            asyncio.CancelledError, loop.run_until_complete, t)
571        self.assertTrue(t.done())
572        self.assertFalse(t._must_cancel)  # White-box test.
573        self.assertFalse(t.cancel())
574
575    def test_cancel_at_end(self):
576        """coroutine end right after task is cancelled"""
577        loop = asyncio.new_event_loop()
578        self.set_event_loop(loop)
579
580        @asyncio.coroutine
581        def task():
582            t.cancel()
583            self.assertTrue(t._must_cancel)  # White-box test.
584            return 12
585
586        t = self.new_task(loop, task())
587        self.assertRaises(
588            asyncio.CancelledError, loop.run_until_complete, t)
589        self.assertTrue(t.done())
590        self.assertFalse(t._must_cancel)  # White-box test.
591        self.assertFalse(t.cancel())
592
593    def test_cancel_awaited_task(self):
594        # This tests for a relatively rare condition when
595        # a task cancellation is requested for a task which is not
596        # currently blocked, such as a task cancelling itself.
597        # In this situation we must ensure that whatever next future
598        # or task the cancelled task blocks on is cancelled correctly
599        # as well.  See also bpo-34872.
600        loop = asyncio.new_event_loop()
601        self.addCleanup(lambda: loop.close())
602
603        task = nested_task = None
604        fut = self.new_future(loop)
605
606        async def nested():
607            await fut
608
609        async def coro():
610            nonlocal nested_task
611            # Create a sub-task and wait for it to run.
612            nested_task = self.new_task(loop, nested())
613            await asyncio.sleep(0)
614
615            # Request the current task to be cancelled.
616            task.cancel()
617            # Block on the nested task, which should be immediately
618            # cancelled.
619            await nested_task
620
621        task = self.new_task(loop, coro())
622        with self.assertRaises(asyncio.CancelledError):
623            loop.run_until_complete(task)
624
625        self.assertTrue(task.cancelled())
626        self.assertTrue(nested_task.cancelled())
627        self.assertTrue(fut.cancelled())
628
629    def test_stop_while_run_in_complete(self):
630
631        def gen():
632            when = yield
633            self.assertAlmostEqual(0.1, when)
634            when = yield 0.1
635            self.assertAlmostEqual(0.2, when)
636            when = yield 0.1
637            self.assertAlmostEqual(0.3, when)
638            yield 0.1
639
640        loop = self.new_test_loop(gen)
641
642        x = 0
643
644        async def task():
645            nonlocal x
646            while x < 10:
647                await asyncio.sleep(0.1, loop=loop)
648                x += 1
649                if x == 2:
650                    loop.stop()
651
652        t = self.new_task(loop, task())
653        with self.assertRaises(RuntimeError) as cm:
654            loop.run_until_complete(t)
655        self.assertEqual(str(cm.exception),
656                         'Event loop stopped before Future completed.')
657        self.assertFalse(t.done())
658        self.assertEqual(x, 2)
659        self.assertAlmostEqual(0.3, loop.time())
660
661        t.cancel()
662        self.assertRaises(asyncio.CancelledError, loop.run_until_complete, t)
663
664    def test_log_traceback(self):
665        async def coro():
666            pass
667
668        task = self.new_task(self.loop, coro())
669        with self.assertRaisesRegex(ValueError, 'can only be set to False'):
670            task._log_traceback = True
671        self.loop.run_until_complete(task)
672
673    def test_wait_for_timeout_less_then_0_or_0_future_done(self):
674        def gen():
675            when = yield
676            self.assertAlmostEqual(0, when)
677
678        loop = self.new_test_loop(gen)
679
680        fut = self.new_future(loop)
681        fut.set_result('done')
682
683        ret = loop.run_until_complete(asyncio.wait_for(fut, 0, loop=loop))
684
685        self.assertEqual(ret, 'done')
686        self.assertTrue(fut.done())
687        self.assertAlmostEqual(0, loop.time())
688
689    def test_wait_for_timeout_less_then_0_or_0_coroutine_do_not_started(self):
690        def gen():
691            when = yield
692            self.assertAlmostEqual(0, when)
693
694        loop = self.new_test_loop(gen)
695
696        foo_started = False
697
698        @asyncio.coroutine
699        def foo():
700            nonlocal foo_started
701            foo_started = True
702
703        with self.assertRaises(asyncio.TimeoutError):
704            loop.run_until_complete(asyncio.wait_for(foo(), 0, loop=loop))
705
706        self.assertAlmostEqual(0, loop.time())
707        self.assertEqual(foo_started, False)
708
709    def test_wait_for_timeout_less_then_0_or_0(self):
710        def gen():
711            when = yield
712            self.assertAlmostEqual(0.2, when)
713            when = yield 0
714            self.assertAlmostEqual(0, when)
715
716        for timeout in [0, -1]:
717            with self.subTest(timeout=timeout):
718                loop = self.new_test_loop(gen)
719
720                foo_running = None
721
722                async def foo():
723                    nonlocal foo_running
724                    foo_running = True
725                    try:
726                        await asyncio.sleep(0.2, loop=loop)
727                    finally:
728                        foo_running = False
729                    return 'done'
730
731                fut = self.new_task(loop, foo())
732
733                with self.assertRaises(asyncio.TimeoutError):
734                    loop.run_until_complete(asyncio.wait_for(
735                        fut, timeout, loop=loop))
736                self.assertTrue(fut.done())
737                # it should have been cancelled due to the timeout
738                self.assertTrue(fut.cancelled())
739                self.assertAlmostEqual(0, loop.time())
740                self.assertEqual(foo_running, False)
741
742    def test_wait_for(self):
743
744        def gen():
745            when = yield
746            self.assertAlmostEqual(0.2, when)
747            when = yield 0
748            self.assertAlmostEqual(0.1, when)
749            when = yield 0.1
750
751        loop = self.new_test_loop(gen)
752
753        foo_running = None
754
755        async def foo():
756            nonlocal foo_running
757            foo_running = True
758            try:
759                await asyncio.sleep(0.2, loop=loop)
760            finally:
761                foo_running = False
762            return 'done'
763
764        fut = self.new_task(loop, foo())
765
766        with self.assertRaises(asyncio.TimeoutError):
767            loop.run_until_complete(asyncio.wait_for(fut, 0.1, loop=loop))
768        self.assertTrue(fut.done())
769        # it should have been cancelled due to the timeout
770        self.assertTrue(fut.cancelled())
771        self.assertAlmostEqual(0.1, loop.time())
772        self.assertEqual(foo_running, False)
773
774    def test_wait_for_blocking(self):
775        loop = self.new_test_loop()
776
777        @asyncio.coroutine
778        def coro():
779            return 'done'
780
781        res = loop.run_until_complete(asyncio.wait_for(coro(),
782                                                       timeout=None,
783                                                       loop=loop))
784        self.assertEqual(res, 'done')
785
786    def test_wait_for_with_global_loop(self):
787
788        def gen():
789            when = yield
790            self.assertAlmostEqual(0.2, when)
791            when = yield 0
792            self.assertAlmostEqual(0.01, when)
793            yield 0.01
794
795        loop = self.new_test_loop(gen)
796
797        async def foo():
798            await asyncio.sleep(0.2, loop=loop)
799            return 'done'
800
801        asyncio.set_event_loop(loop)
802        try:
803            fut = self.new_task(loop, foo())
804            with self.assertRaises(asyncio.TimeoutError):
805                loop.run_until_complete(asyncio.wait_for(fut, 0.01))
806        finally:
807            asyncio.set_event_loop(None)
808
809        self.assertAlmostEqual(0.01, loop.time())
810        self.assertTrue(fut.done())
811        self.assertTrue(fut.cancelled())
812
813    def test_wait_for_race_condition(self):
814
815        def gen():
816            yield 0.1
817            yield 0.1
818            yield 0.1
819
820        loop = self.new_test_loop(gen)
821
822        fut = self.new_future(loop)
823        task = asyncio.wait_for(fut, timeout=0.2, loop=loop)
824        loop.call_later(0.1, fut.set_result, "ok")
825        res = loop.run_until_complete(task)
826        self.assertEqual(res, "ok")
827
828    def test_wait_for_waits_for_task_cancellation(self):
829        loop = asyncio.new_event_loop()
830        self.addCleanup(loop.close)
831
832        task_done = False
833
834        async def foo():
835            async def inner():
836                nonlocal task_done
837                try:
838                    await asyncio.sleep(0.2, loop=loop)
839                finally:
840                    task_done = True
841
842            inner_task = self.new_task(loop, inner())
843
844            with self.assertRaises(asyncio.TimeoutError):
845                await asyncio.wait_for(inner_task, timeout=0.1, loop=loop)
846
847            self.assertTrue(task_done)
848
849        loop.run_until_complete(foo())
850
    def test_wait_for_self_cancellation(self):
        # Cancel a task running wait_for() while the inner task it waits on
        # is catching CancelledError, and check that the outer task ends
        # with CancelledError while the inner task still returns 42.
        loop = asyncio.new_event_loop()
        self.addCleanup(loop.close)

        async def foo():
            async def inner():
                # Each CancelledError handler starts another 0.3s sleep, so
                # two cancellations are absorbed before returning.
                try:
                    await asyncio.sleep(0.3, loop=loop)
                except asyncio.CancelledError:
                    try:
                        await asyncio.sleep(0.3, loop=loop)
                    except asyncio.CancelledError:
                        await asyncio.sleep(0.3, loop=loop)

                return 42

            inner_task = self.new_task(loop, inner())

            wait = asyncio.wait_for(inner_task, timeout=0.1, loop=loop)

            # Test that wait_for itself is properly cancellable
            # even when the initial task holds up the initial cancellation.
            task = self.new_task(loop, wait)
            # Sleep past the 0.1s timeout before cancelling the waiter.
            await asyncio.sleep(0.2, loop=loop)
            task.cancel()

            with self.assertRaises(asyncio.CancelledError):
                await task

            self.assertEqual(await inner_task, 42)

        loop.run_until_complete(foo())
883
884    def test_wait(self):
885
886        def gen():
887            when = yield
888            self.assertAlmostEqual(0.1, when)
889            when = yield 0
890            self.assertAlmostEqual(0.15, when)
891            yield 0.15
892
893        loop = self.new_test_loop(gen)
894
895        a = self.new_task(loop, asyncio.sleep(0.1, loop=loop))
896        b = self.new_task(loop, asyncio.sleep(0.15, loop=loop))
897
898        async def foo():
899            done, pending = await asyncio.wait([b, a], loop=loop)
900            self.assertEqual(done, set([a, b]))
901            self.assertEqual(pending, set())
902            return 42
903
904        res = loop.run_until_complete(self.new_task(loop, foo()))
905        self.assertEqual(res, 42)
906        self.assertAlmostEqual(0.15, loop.time())
907
908        # Doing it again should take no time and exercise a different path.
909        res = loop.run_until_complete(self.new_task(loop, foo()))
910        self.assertAlmostEqual(0.15, loop.time())
911        self.assertEqual(res, 42)
912
913    def test_wait_with_global_loop(self):
914
915        def gen():
916            when = yield
917            self.assertAlmostEqual(0.01, when)
918            when = yield 0
919            self.assertAlmostEqual(0.015, when)
920            yield 0.015
921
922        loop = self.new_test_loop(gen)
923
924        a = self.new_task(loop, asyncio.sleep(0.01, loop=loop))
925        b = self.new_task(loop, asyncio.sleep(0.015, loop=loop))
926
927        async def foo():
928            done, pending = await asyncio.wait([b, a])
929            self.assertEqual(done, set([a, b]))
930            self.assertEqual(pending, set())
931            return 42
932
933        asyncio.set_event_loop(loop)
934        res = loop.run_until_complete(
935            self.new_task(loop, foo()))
936
937        self.assertEqual(res, 42)
938
939    def test_wait_duplicate_coroutines(self):
940
941        @asyncio.coroutine
942        def coro(s):
943            return s
944        c = coro('test')
945
946        task =self.new_task(
947            self.loop,
948            asyncio.wait([c, c, coro('spam')], loop=self.loop))
949
950        done, pending = self.loop.run_until_complete(task)
951
952        self.assertFalse(pending)
953        self.assertEqual(set(f.result() for f in done), {'test', 'spam'})
954
955    def test_wait_errors(self):
956        self.assertRaises(
957            ValueError, self.loop.run_until_complete,
958            asyncio.wait(set(), loop=self.loop))
959
960        # -1 is an invalid return_when value
961        sleep_coro = asyncio.sleep(10.0, loop=self.loop)
962        wait_coro = asyncio.wait([sleep_coro], return_when=-1, loop=self.loop)
963        self.assertRaises(ValueError,
964                          self.loop.run_until_complete, wait_coro)
965
966        sleep_coro.close()
967
968    def test_wait_first_completed(self):
969
970        def gen():
971            when = yield
972            self.assertAlmostEqual(10.0, when)
973            when = yield 0
974            self.assertAlmostEqual(0.1, when)
975            yield 0.1
976
977        loop = self.new_test_loop(gen)
978
979        a = self.new_task(loop, asyncio.sleep(10.0, loop=loop))
980        b = self.new_task(loop, asyncio.sleep(0.1, loop=loop))
981        task = self.new_task(
982            loop,
983            asyncio.wait([b, a], return_when=asyncio.FIRST_COMPLETED,
984                         loop=loop))
985
986        done, pending = loop.run_until_complete(task)
987        self.assertEqual({b}, done)
988        self.assertEqual({a}, pending)
989        self.assertFalse(a.done())
990        self.assertTrue(b.done())
991        self.assertIsNone(b.result())
992        self.assertAlmostEqual(0.1, loop.time())
993
994        # move forward to close generator
995        loop.advance_time(10)
996        loop.run_until_complete(asyncio.wait([a, b], loop=loop))
997
998    def test_wait_really_done(self):
999        # there is possibility that some tasks in the pending list
1000        # became done but their callbacks haven't all been called yet
1001
1002        @asyncio.coroutine
1003        def coro1():
1004            yield
1005
1006        @asyncio.coroutine
1007        def coro2():
1008            yield
1009            yield
1010
1011        a = self.new_task(self.loop, coro1())
1012        b = self.new_task(self.loop, coro2())
1013        task = self.new_task(
1014            self.loop,
1015            asyncio.wait([b, a], return_when=asyncio.FIRST_COMPLETED,
1016                         loop=self.loop))
1017
1018        done, pending = self.loop.run_until_complete(task)
1019        self.assertEqual({a, b}, done)
1020        self.assertTrue(a.done())
1021        self.assertIsNone(a.result())
1022        self.assertTrue(b.done())
1023        self.assertIsNone(b.result())
1024
1025    def test_wait_first_exception(self):
1026
1027        def gen():
1028            when = yield
1029            self.assertAlmostEqual(10.0, when)
1030            yield 0
1031
1032        loop = self.new_test_loop(gen)
1033
1034        # first_exception, task already has exception
1035        a = self.new_task(loop, asyncio.sleep(10.0, loop=loop))
1036
1037        @asyncio.coroutine
1038        def exc():
1039            raise ZeroDivisionError('err')
1040
1041        b = self.new_task(loop, exc())
1042        task = self.new_task(
1043            loop,
1044            asyncio.wait([b, a], return_when=asyncio.FIRST_EXCEPTION,
1045                         loop=loop))
1046
1047        done, pending = loop.run_until_complete(task)
1048        self.assertEqual({b}, done)
1049        self.assertEqual({a}, pending)
1050        self.assertAlmostEqual(0, loop.time())
1051
1052        # move forward to close generator
1053        loop.advance_time(10)
1054        loop.run_until_complete(asyncio.wait([a, b], loop=loop))
1055
1056    def test_wait_first_exception_in_wait(self):
1057
1058        def gen():
1059            when = yield
1060            self.assertAlmostEqual(10.0, when)
1061            when = yield 0
1062            self.assertAlmostEqual(0.01, when)
1063            yield 0.01
1064
1065        loop = self.new_test_loop(gen)
1066
1067        # first_exception, exception during waiting
1068        a = self.new_task(loop, asyncio.sleep(10.0, loop=loop))
1069
1070        async def exc():
1071            await asyncio.sleep(0.01, loop=loop)
1072            raise ZeroDivisionError('err')
1073
1074        b = self.new_task(loop, exc())
1075        task = asyncio.wait([b, a], return_when=asyncio.FIRST_EXCEPTION,
1076                            loop=loop)
1077
1078        done, pending = loop.run_until_complete(task)
1079        self.assertEqual({b}, done)
1080        self.assertEqual({a}, pending)
1081        self.assertAlmostEqual(0.01, loop.time())
1082
1083        # move forward to close generator
1084        loop.advance_time(10)
1085        loop.run_until_complete(asyncio.wait([a, b], loop=loop))
1086
1087    def test_wait_with_exception(self):
1088
1089        def gen():
1090            when = yield
1091            self.assertAlmostEqual(0.1, when)
1092            when = yield 0
1093            self.assertAlmostEqual(0.15, when)
1094            yield 0.15
1095
1096        loop = self.new_test_loop(gen)
1097
1098        a = self.new_task(loop, asyncio.sleep(0.1, loop=loop))
1099
1100        @asyncio.coroutine
1101        def sleeper():
1102            yield from asyncio.sleep(0.15, loop=loop)
1103            raise ZeroDivisionError('really')
1104
1105        b = self.new_task(loop, sleeper())
1106
1107        async def foo():
1108            done, pending = await asyncio.wait([b, a], loop=loop)
1109            self.assertEqual(len(done), 2)
1110            self.assertEqual(pending, set())
1111            errors = set(f for f in done if f.exception() is not None)
1112            self.assertEqual(len(errors), 1)
1113
1114        loop.run_until_complete(self.new_task(loop, foo()))
1115        self.assertAlmostEqual(0.15, loop.time())
1116
1117        loop.run_until_complete(self.new_task(loop, foo()))
1118        self.assertAlmostEqual(0.15, loop.time())
1119
1120    def test_wait_with_timeout(self):
1121
1122        def gen():
1123            when = yield
1124            self.assertAlmostEqual(0.1, when)
1125            when = yield 0
1126            self.assertAlmostEqual(0.15, when)
1127            when = yield 0
1128            self.assertAlmostEqual(0.11, when)
1129            yield 0.11
1130
1131        loop = self.new_test_loop(gen)
1132
1133        a = self.new_task(loop, asyncio.sleep(0.1, loop=loop))
1134        b = self.new_task(loop, asyncio.sleep(0.15, loop=loop))
1135
1136        async def foo():
1137            done, pending = await asyncio.wait([b, a], timeout=0.11,
1138                                                    loop=loop)
1139            self.assertEqual(done, set([a]))
1140            self.assertEqual(pending, set([b]))
1141
1142        loop.run_until_complete(self.new_task(loop, foo()))
1143        self.assertAlmostEqual(0.11, loop.time())
1144
1145        # move forward to close generator
1146        loop.advance_time(10)
1147        loop.run_until_complete(asyncio.wait([a, b], loop=loop))
1148
1149    def test_wait_concurrent_complete(self):
1150
1151        def gen():
1152            when = yield
1153            self.assertAlmostEqual(0.1, when)
1154            when = yield 0
1155            self.assertAlmostEqual(0.15, when)
1156            when = yield 0
1157            self.assertAlmostEqual(0.1, when)
1158            yield 0.1
1159
1160        loop = self.new_test_loop(gen)
1161
1162        a = self.new_task(loop, asyncio.sleep(0.1, loop=loop))
1163        b = self.new_task(loop, asyncio.sleep(0.15, loop=loop))
1164
1165        done, pending = loop.run_until_complete(
1166            asyncio.wait([b, a], timeout=0.1, loop=loop))
1167
1168        self.assertEqual(done, set([a]))
1169        self.assertEqual(pending, set([b]))
1170        self.assertAlmostEqual(0.1, loop.time())
1171
1172        # move forward to close generator
1173        loop.advance_time(10)
1174        loop.run_until_complete(asyncio.wait([a, b], loop=loop))
1175
1176    def test_as_completed(self):
1177
1178        def gen():
1179            yield 0
1180            yield 0
1181            yield 0.01
1182            yield 0
1183
1184        loop = self.new_test_loop(gen)
1185        # disable "slow callback" warning
1186        loop.slow_callback_duration = 1.0
1187        completed = set()
1188        time_shifted = False
1189
1190        @asyncio.coroutine
1191        def sleeper(dt, x):
1192            nonlocal time_shifted
1193            yield from asyncio.sleep(dt, loop=loop)
1194            completed.add(x)
1195            if not time_shifted and 'a' in completed and 'b' in completed:
1196                time_shifted = True
1197                loop.advance_time(0.14)
1198            return x
1199
1200        a = sleeper(0.01, 'a')
1201        b = sleeper(0.01, 'b')
1202        c = sleeper(0.15, 'c')
1203
1204        @asyncio.coroutine
1205        def foo():
1206            values = []
1207            for f in asyncio.as_completed([b, c, a], loop=loop):
1208                values.append((yield from f))
1209            return values
1210
1211        res = loop.run_until_complete(self.new_task(loop, foo()))
1212        self.assertAlmostEqual(0.15, loop.time())
1213        self.assertTrue('a' in res[:2])
1214        self.assertTrue('b' in res[:2])
1215        self.assertEqual(res[2], 'c')
1216
1217        # Doing it again should take no time and exercise a different path.
1218        res = loop.run_until_complete(self.new_task(loop, foo()))
1219        self.assertAlmostEqual(0.15, loop.time())
1220
1221    def test_as_completed_with_timeout(self):
1222
1223        def gen():
1224            yield
1225            yield 0
1226            yield 0
1227            yield 0.1
1228
1229        loop = self.new_test_loop(gen)
1230
1231        a = loop.create_task(asyncio.sleep(0.1, 'a', loop=loop))
1232        b = loop.create_task(asyncio.sleep(0.15, 'b', loop=loop))
1233
1234        async def foo():
1235            values = []
1236            for f in asyncio.as_completed([a, b], timeout=0.12, loop=loop):
1237                if values:
1238                    loop.advance_time(0.02)
1239                try:
1240                    v = await f
1241                    values.append((1, v))
1242                except asyncio.TimeoutError as exc:
1243                    values.append((2, exc))
1244            return values
1245
1246        res = loop.run_until_complete(self.new_task(loop, foo()))
1247        self.assertEqual(len(res), 2, res)
1248        self.assertEqual(res[0], (1, 'a'))
1249        self.assertEqual(res[1][0], 2)
1250        self.assertIsInstance(res[1][1], asyncio.TimeoutError)
1251        self.assertAlmostEqual(0.12, loop.time())
1252
1253        # move forward to close generator
1254        loop.advance_time(10)
1255        loop.run_until_complete(asyncio.wait([a, b], loop=loop))
1256
1257    def test_as_completed_with_unused_timeout(self):
1258
1259        def gen():
1260            yield
1261            yield 0
1262            yield 0.01
1263
1264        loop = self.new_test_loop(gen)
1265
1266        a = asyncio.sleep(0.01, 'a', loop=loop)
1267
1268        async def foo():
1269            for f in asyncio.as_completed([a], timeout=1, loop=loop):
1270                v = await f
1271                self.assertEqual(v, 'a')
1272
1273        loop.run_until_complete(self.new_task(loop, foo()))
1274
1275    def test_as_completed_reverse_wait(self):
1276
1277        def gen():
1278            yield 0
1279            yield 0.05
1280            yield 0
1281
1282        loop = self.new_test_loop(gen)
1283
1284        a = asyncio.sleep(0.05, 'a', loop=loop)
1285        b = asyncio.sleep(0.10, 'b', loop=loop)
1286        fs = {a, b}
1287        futs = list(asyncio.as_completed(fs, loop=loop))
1288        self.assertEqual(len(futs), 2)
1289
1290        x = loop.run_until_complete(futs[1])
1291        self.assertEqual(x, 'a')
1292        self.assertAlmostEqual(0.05, loop.time())
1293        loop.advance_time(0.05)
1294        y = loop.run_until_complete(futs[0])
1295        self.assertEqual(y, 'b')
1296        self.assertAlmostEqual(0.10, loop.time())
1297
1298    def test_as_completed_concurrent(self):
1299
1300        def gen():
1301            when = yield
1302            self.assertAlmostEqual(0.05, when)
1303            when = yield 0
1304            self.assertAlmostEqual(0.05, when)
1305            yield 0.05
1306
1307        loop = self.new_test_loop(gen)
1308
1309        a = asyncio.sleep(0.05, 'a', loop=loop)
1310        b = asyncio.sleep(0.05, 'b', loop=loop)
1311        fs = {a, b}
1312        futs = list(asyncio.as_completed(fs, loop=loop))
1313        self.assertEqual(len(futs), 2)
1314        waiter = asyncio.wait(futs, loop=loop)
1315        done, pending = loop.run_until_complete(waiter)
1316        self.assertEqual(set(f.result() for f in done), {'a', 'b'})
1317
1318    def test_as_completed_duplicate_coroutines(self):
1319
1320        @asyncio.coroutine
1321        def coro(s):
1322            return s
1323
1324        @asyncio.coroutine
1325        def runner():
1326            result = []
1327            c = coro('ham')
1328            for f in asyncio.as_completed([c, c, coro('spam')],
1329                                          loop=self.loop):
1330                result.append((yield from f))
1331            return result
1332
1333        fut = self.new_task(self.loop, runner())
1334        self.loop.run_until_complete(fut)
1335        result = fut.result()
1336        self.assertEqual(set(result), {'ham', 'spam'})
1337        self.assertEqual(len(result), 2)
1338
1339    def test_sleep(self):
1340
1341        def gen():
1342            when = yield
1343            self.assertAlmostEqual(0.05, when)
1344            when = yield 0.05
1345            self.assertAlmostEqual(0.1, when)
1346            yield 0.05
1347
1348        loop = self.new_test_loop(gen)
1349
1350        @asyncio.coroutine
1351        def sleeper(dt, arg):
1352            yield from asyncio.sleep(dt/2, loop=loop)
1353            res = yield from asyncio.sleep(dt/2, arg, loop=loop)
1354            return res
1355
1356        t = self.new_task(loop, sleeper(0.1, 'yeah'))
1357        loop.run_until_complete(t)
1358        self.assertTrue(t.done())
1359        self.assertEqual(t.result(), 'yeah')
1360        self.assertAlmostEqual(0.1, loop.time())
1361
1362    def test_sleep_cancel(self):
1363
1364        def gen():
1365            when = yield
1366            self.assertAlmostEqual(10.0, when)
1367            yield 0
1368
1369        loop = self.new_test_loop(gen)
1370
1371        t = self.new_task(loop, asyncio.sleep(10.0, 'yeah', loop=loop))
1372
1373        handle = None
1374        orig_call_later = loop.call_later
1375
1376        def call_later(delay, callback, *args):
1377            nonlocal handle
1378            handle = orig_call_later(delay, callback, *args)
1379            return handle
1380
1381        loop.call_later = call_later
1382        test_utils.run_briefly(loop)
1383
1384        self.assertFalse(handle._cancelled)
1385
1386        t.cancel()
1387        test_utils.run_briefly(loop)
1388        self.assertTrue(handle._cancelled)
1389
1390    def test_task_cancel_sleeping_task(self):
1391
1392        def gen():
1393            when = yield
1394            self.assertAlmostEqual(0.1, when)
1395            when = yield 0
1396            self.assertAlmostEqual(5000, when)
1397            yield 0.1
1398
1399        loop = self.new_test_loop(gen)
1400
1401        @asyncio.coroutine
1402        def sleep(dt):
1403            yield from asyncio.sleep(dt, loop=loop)
1404
1405        @asyncio.coroutine
1406        def doit():
1407            sleeper = self.new_task(loop, sleep(5000))
1408            loop.call_later(0.1, sleeper.cancel)
1409            try:
1410                yield from sleeper
1411            except asyncio.CancelledError:
1412                return 'cancelled'
1413            else:
1414                return 'slept in'
1415
1416        doer = doit()
1417        self.assertEqual(loop.run_until_complete(doer), 'cancelled')
1418        self.assertAlmostEqual(0.1, loop.time())
1419
1420    def test_task_cancel_waiter_future(self):
1421        fut = self.new_future(self.loop)
1422
1423        @asyncio.coroutine
1424        def coro():
1425            yield from fut
1426
1427        task = self.new_task(self.loop, coro())
1428        test_utils.run_briefly(self.loop)
1429        self.assertIs(task._fut_waiter, fut)
1430
1431        task.cancel()
1432        test_utils.run_briefly(self.loop)
1433        self.assertRaises(
1434            asyncio.CancelledError, self.loop.run_until_complete, task)
1435        self.assertIsNone(task._fut_waiter)
1436        self.assertTrue(fut.cancelled())
1437
1438    def test_task_set_methods(self):
1439        @asyncio.coroutine
1440        def notmuch():
1441            return 'ko'
1442
1443        gen = notmuch()
1444        task = self.new_task(self.loop, gen)
1445
1446        with self.assertRaisesRegex(RuntimeError, 'not support set_result'):
1447            task.set_result('ok')
1448
1449        with self.assertRaisesRegex(RuntimeError, 'not support set_exception'):
1450            task.set_exception(ValueError())
1451
1452        self.assertEqual(
1453            self.loop.run_until_complete(task),
1454            'ko')
1455
1456    def test_step_result(self):
1457        @asyncio.coroutine
1458        def notmuch():
1459            yield None
1460            yield 1
1461            return 'ko'
1462
1463        self.assertRaises(
1464            RuntimeError, self.loop.run_until_complete, notmuch())
1465
1466    def test_step_result_future(self):
1467        # If coroutine returns future, task waits on this future.
1468
1469        class Fut(asyncio.Future):
1470            def __init__(self, *args, **kwds):
1471                self.cb_added = False
1472                super().__init__(*args, **kwds)
1473
1474            def add_done_callback(self, *args, **kwargs):
1475                self.cb_added = True
1476                super().add_done_callback(*args, **kwargs)
1477
1478        fut = Fut(loop=self.loop)
1479        result = None
1480
1481        @asyncio.coroutine
1482        def wait_for_future():
1483            nonlocal result
1484            result = yield from fut
1485
1486        t = self.new_task(self.loop, wait_for_future())
1487        test_utils.run_briefly(self.loop)
1488        self.assertTrue(fut.cb_added)
1489
1490        res = object()
1491        fut.set_result(res)
1492        test_utils.run_briefly(self.loop)
1493        self.assertIs(res, result)
1494        self.assertTrue(t.done())
1495        self.assertIsNone(t.result())
1496
1497    def test_baseexception_during_cancel(self):
1498
1499        def gen():
1500            when = yield
1501            self.assertAlmostEqual(10.0, when)
1502            yield 0
1503
1504        loop = self.new_test_loop(gen)
1505
1506        @asyncio.coroutine
1507        def sleeper():
1508            yield from asyncio.sleep(10, loop=loop)
1509
1510        base_exc = BaseException()
1511
1512        @asyncio.coroutine
1513        def notmutch():
1514            try:
1515                yield from sleeper()
1516            except asyncio.CancelledError:
1517                raise base_exc
1518
1519        task = self.new_task(loop, notmutch())
1520        test_utils.run_briefly(loop)
1521
1522        task.cancel()
1523        self.assertFalse(task.done())
1524
1525        self.assertRaises(BaseException, test_utils.run_briefly, loop)
1526
1527        self.assertTrue(task.done())
1528        self.assertFalse(task.cancelled())
1529        self.assertIs(task.exception(), base_exc)
1530
1531    def test_iscoroutinefunction(self):
1532        def fn():
1533            pass
1534
1535        self.assertFalse(asyncio.iscoroutinefunction(fn))
1536
1537        def fn1():
1538            yield
1539        self.assertFalse(asyncio.iscoroutinefunction(fn1))
1540
1541        @asyncio.coroutine
1542        def fn2():
1543            yield
1544        self.assertTrue(asyncio.iscoroutinefunction(fn2))
1545
1546        self.assertFalse(asyncio.iscoroutinefunction(mock.Mock()))
1547
1548    def test_yield_vs_yield_from(self):
1549        fut = self.new_future(self.loop)
1550
1551        @asyncio.coroutine
1552        def wait_for_future():
1553            yield fut
1554
1555        task = wait_for_future()
1556        with self.assertRaises(RuntimeError):
1557            self.loop.run_until_complete(task)
1558
1559        self.assertFalse(fut.done())
1560
1561    def test_yield_vs_yield_from_generator(self):
1562        @asyncio.coroutine
1563        def coro():
1564            yield
1565
1566        @asyncio.coroutine
1567        def wait_for_future():
1568            gen = coro()
1569            try:
1570                yield gen
1571            finally:
1572                gen.close()
1573
1574        task = wait_for_future()
1575        self.assertRaises(
1576            RuntimeError,
1577            self.loop.run_until_complete, task)
1578
1579    def test_coroutine_non_gen_function(self):
1580        @asyncio.coroutine
1581        def func():
1582            return 'test'
1583
1584        self.assertTrue(asyncio.iscoroutinefunction(func))
1585
1586        coro = func()
1587        self.assertTrue(asyncio.iscoroutine(coro))
1588
1589        res = self.loop.run_until_complete(coro)
1590        self.assertEqual(res, 'test')
1591
1592    def test_coroutine_non_gen_function_return_future(self):
1593        fut = self.new_future(self.loop)
1594
1595        @asyncio.coroutine
1596        def func():
1597            return fut
1598
1599        @asyncio.coroutine
1600        def coro():
1601            fut.set_result('test')
1602
1603        t1 = self.new_task(self.loop, func())
1604        t2 = self.new_task(self.loop, coro())
1605        res = self.loop.run_until_complete(t1)
1606        self.assertEqual(res, 'test')
1607        self.assertIsNone(t2.result())
1608
1609
1610    def test_current_task_deprecated(self):
1611        Task = self.__class__.Task
1612
1613        with self.assertWarns(PendingDeprecationWarning):
1614            self.assertIsNone(Task.current_task(loop=self.loop))
1615
1616        async def coro(loop):
1617            with self.assertWarns(PendingDeprecationWarning):
1618                self.assertIs(Task.current_task(loop=loop), task)
1619
1620            # See http://bugs.python.org/issue29271 for details:
1621            asyncio.set_event_loop(loop)
1622            try:
1623                with self.assertWarns(PendingDeprecationWarning):
1624                    self.assertIs(Task.current_task(None), task)
1625                with self.assertWarns(PendingDeprecationWarning):
1626                    self.assertIs(Task.current_task(), task)
1627            finally:
1628                asyncio.set_event_loop(None)
1629
1630        task = self.new_task(self.loop, coro(self.loop))
1631        self.loop.run_until_complete(task)
1632        with self.assertWarns(PendingDeprecationWarning):
1633            self.assertIsNone(Task.current_task(loop=self.loop))
1634
1635    def test_current_task(self):
1636        self.assertIsNone(asyncio.current_task(loop=self.loop))
1637
1638        async def coro(loop):
1639            self.assertIs(asyncio.current_task(loop=loop), task)
1640
1641            self.assertIs(asyncio.current_task(None), task)
1642            self.assertIs(asyncio.current_task(), task)
1643
1644        task = self.new_task(self.loop, coro(self.loop))
1645        self.loop.run_until_complete(task)
1646        self.assertIsNone(asyncio.current_task(loop=self.loop))
1647
1648    def test_current_task_with_interleaving_tasks(self):
1649        self.assertIsNone(asyncio.current_task(loop=self.loop))
1650
1651        fut1 = self.new_future(self.loop)
1652        fut2 = self.new_future(self.loop)
1653
1654        async def coro1(loop):
1655            self.assertTrue(asyncio.current_task(loop=loop) is task1)
1656            await fut1
1657            self.assertTrue(asyncio.current_task(loop=loop) is task1)
1658            fut2.set_result(True)
1659
1660        async def coro2(loop):
1661            self.assertTrue(asyncio.current_task(loop=loop) is task2)
1662            fut1.set_result(True)
1663            await fut2
1664            self.assertTrue(asyncio.current_task(loop=loop) is task2)
1665
1666        task1 = self.new_task(self.loop, coro1(self.loop))
1667        task2 = self.new_task(self.loop, coro2(self.loop))
1668
1669        self.loop.run_until_complete(asyncio.wait((task1, task2),
1670                                                  loop=self.loop))
1671        self.assertIsNone(asyncio.current_task(loop=self.loop))
1672
1673    # Some thorough tests for cancellation propagation through
1674    # coroutines, tasks and wait().
1675
1676    def test_yield_future_passes_cancel(self):
1677        # Cancelling outer() cancels inner() cancels waiter.
1678        proof = 0
1679        waiter = self.new_future(self.loop)
1680
1681        async def inner():
1682            nonlocal proof
1683            try:
1684                await waiter
1685            except asyncio.CancelledError:
1686                proof += 1
1687                raise
1688            else:
1689                self.fail('got past sleep() in inner()')
1690
1691        async def outer():
1692            nonlocal proof
1693            try:
1694                await inner()
1695            except asyncio.CancelledError:
1696                proof += 100  # Expect this path.
1697            else:
1698                proof += 10
1699
1700        f = asyncio.ensure_future(outer(), loop=self.loop)
1701        test_utils.run_briefly(self.loop)
1702        f.cancel()
1703        self.loop.run_until_complete(f)
1704        self.assertEqual(proof, 101)
1705        self.assertTrue(waiter.cancelled())
1706
1707    def test_yield_wait_does_not_shield_cancel(self):
1708        # Cancelling outer() makes wait() return early, leaves inner()
1709        # running.
1710        proof = 0
1711        waiter = self.new_future(self.loop)
1712
1713        async def inner():
1714            nonlocal proof
1715            await waiter
1716            proof += 1
1717
1718        async def outer():
1719            nonlocal proof
1720            d, p = await asyncio.wait([inner()], loop=self.loop)
1721            proof += 100
1722
1723        f = asyncio.ensure_future(outer(), loop=self.loop)
1724        test_utils.run_briefly(self.loop)
1725        f.cancel()
1726        self.assertRaises(
1727            asyncio.CancelledError, self.loop.run_until_complete, f)
1728        waiter.set_result(None)
1729        test_utils.run_briefly(self.loop)
1730        self.assertEqual(proof, 1)
1731
1732    def test_shield_result(self):
1733        inner = self.new_future(self.loop)
1734        outer = asyncio.shield(inner)
1735        inner.set_result(42)
1736        res = self.loop.run_until_complete(outer)
1737        self.assertEqual(res, 42)
1738
1739    def test_shield_exception(self):
1740        inner = self.new_future(self.loop)
1741        outer = asyncio.shield(inner)
1742        test_utils.run_briefly(self.loop)
1743        exc = RuntimeError('expected')
1744        inner.set_exception(exc)
1745        test_utils.run_briefly(self.loop)
1746        self.assertIs(outer.exception(), exc)
1747
1748    def test_shield_cancel(self):
1749        inner = self.new_future(self.loop)
1750        outer = asyncio.shield(inner)
1751        test_utils.run_briefly(self.loop)
1752        inner.cancel()
1753        test_utils.run_briefly(self.loop)
1754        self.assertTrue(outer.cancelled())
1755
1756    def test_shield_shortcut(self):
1757        fut = self.new_future(self.loop)
1758        fut.set_result(42)
1759        res = self.loop.run_until_complete(asyncio.shield(fut))
1760        self.assertEqual(res, 42)
1761
1762    def test_shield_effect(self):
1763        # Cancelling outer() does not affect inner().
1764        proof = 0
1765        waiter = self.new_future(self.loop)
1766
1767        async def inner():
1768            nonlocal proof
1769            await waiter
1770            proof += 1
1771
1772        async def outer():
1773            nonlocal proof
1774            await asyncio.shield(inner(), loop=self.loop)
1775            proof += 100
1776
1777        f = asyncio.ensure_future(outer(), loop=self.loop)
1778        test_utils.run_briefly(self.loop)
1779        f.cancel()
1780        with self.assertRaises(asyncio.CancelledError):
1781            self.loop.run_until_complete(f)
1782        waiter.set_result(None)
1783        test_utils.run_briefly(self.loop)
1784        self.assertEqual(proof, 1)
1785
1786    def test_shield_gather(self):
1787        child1 = self.new_future(self.loop)
1788        child2 = self.new_future(self.loop)
1789        parent = asyncio.gather(child1, child2, loop=self.loop)
1790        outer = asyncio.shield(parent, loop=self.loop)
1791        test_utils.run_briefly(self.loop)
1792        outer.cancel()
1793        test_utils.run_briefly(self.loop)
1794        self.assertTrue(outer.cancelled())
1795        child1.set_result(1)
1796        child2.set_result(2)
1797        test_utils.run_briefly(self.loop)
1798        self.assertEqual(parent.result(), [1, 2])
1799
1800    def test_gather_shield(self):
1801        child1 = self.new_future(self.loop)
1802        child2 = self.new_future(self.loop)
1803        inner1 = asyncio.shield(child1, loop=self.loop)
1804        inner2 = asyncio.shield(child2, loop=self.loop)
1805        parent = asyncio.gather(inner1, inner2, loop=self.loop)
1806        test_utils.run_briefly(self.loop)
1807        parent.cancel()
1808        # This should cancel inner1 and inner2 but bot child1 and child2.
1809        test_utils.run_briefly(self.loop)
1810        self.assertIsInstance(parent.exception(), asyncio.CancelledError)
1811        self.assertTrue(inner1.cancelled())
1812        self.assertTrue(inner2.cancelled())
1813        child1.set_result(1)
1814        child2.set_result(2)
1815        test_utils.run_briefly(self.loop)
1816
1817    def test_as_completed_invalid_args(self):
1818        fut = self.new_future(self.loop)
1819
1820        # as_completed() expects a list of futures, not a future instance
1821        self.assertRaises(TypeError, self.loop.run_until_complete,
1822            asyncio.as_completed(fut, loop=self.loop))
1823        coro = coroutine_function()
1824        self.assertRaises(TypeError, self.loop.run_until_complete,
1825            asyncio.as_completed(coro, loop=self.loop))
1826        coro.close()
1827
1828    def test_wait_invalid_args(self):
1829        fut = self.new_future(self.loop)
1830
1831        # wait() expects a list of futures, not a future instance
1832        self.assertRaises(TypeError, self.loop.run_until_complete,
1833            asyncio.wait(fut, loop=self.loop))
1834        coro = coroutine_function()
1835        self.assertRaises(TypeError, self.loop.run_until_complete,
1836            asyncio.wait(coro, loop=self.loop))
1837        coro.close()
1838
1839        # wait() expects at least a future
1840        self.assertRaises(ValueError, self.loop.run_until_complete,
1841            asyncio.wait([], loop=self.loop))
1842
1843    def test_corowrapper_mocks_generator(self):
1844
1845        def check():
1846            # A function that asserts various things.
1847            # Called twice, with different debug flag values.
1848
1849            @asyncio.coroutine
1850            def coro():
1851                # The actual coroutine.
1852                self.assertTrue(gen.gi_running)
1853                yield from fut
1854
1855            # A completed Future used to run the coroutine.
1856            fut = self.new_future(self.loop)
1857            fut.set_result(None)
1858
1859            # Call the coroutine.
1860            gen = coro()
1861
1862            # Check some properties.
1863            self.assertTrue(asyncio.iscoroutine(gen))
1864            self.assertIsInstance(gen.gi_frame, types.FrameType)
1865            self.assertFalse(gen.gi_running)
1866            self.assertIsInstance(gen.gi_code, types.CodeType)
1867
1868            # Run it.
1869            self.loop.run_until_complete(gen)
1870
1871            # The frame should have changed.
1872            self.assertIsNone(gen.gi_frame)
1873
1874        # Test with debug flag cleared.
1875        with set_coroutine_debug(False):
1876            check()
1877
1878        # Test with debug flag set.
1879        with set_coroutine_debug(True):
1880            check()
1881
1882    def test_yield_from_corowrapper(self):
1883        with set_coroutine_debug(True):
1884            @asyncio.coroutine
1885            def t1():
1886                return (yield from t2())
1887
1888            @asyncio.coroutine
1889            def t2():
1890                f = self.new_future(self.loop)
1891                self.new_task(self.loop, t3(f))
1892                return (yield from f)
1893
1894            @asyncio.coroutine
1895            def t3(f):
1896                f.set_result((1, 2, 3))
1897
1898            task = self.new_task(self.loop, t1())
1899            val = self.loop.run_until_complete(task)
1900            self.assertEqual(val, (1, 2, 3))
1901
1902    def test_yield_from_corowrapper_send(self):
1903        def foo():
1904            a = yield
1905            return a
1906
1907        def call(arg):
1908            cw = asyncio.coroutines.CoroWrapper(foo())
1909            cw.send(None)
1910            try:
1911                cw.send(arg)
1912            except StopIteration as ex:
1913                return ex.args[0]
1914            else:
1915                raise AssertionError('StopIteration was expected')
1916
1917        self.assertEqual(call((1, 2)), (1, 2))
1918        self.assertEqual(call('spam'), 'spam')
1919
1920    def test_corowrapper_weakref(self):
1921        wd = weakref.WeakValueDictionary()
1922        def foo(): yield from []
1923        cw = asyncio.coroutines.CoroWrapper(foo())
1924        wd['cw'] = cw  # Would fail without __weakref__ slot.
1925        cw.gen = None  # Suppress warning from __del__.
1926
1927    def test_corowrapper_throw(self):
1928        # Issue 429: CoroWrapper.throw must be compatible with gen.throw
1929        def foo():
1930            value = None
1931            while True:
1932                try:
1933                    value = yield value
1934                except Exception as e:
1935                    value = e
1936
1937        exception = Exception("foo")
1938        cw = asyncio.coroutines.CoroWrapper(foo())
1939        cw.send(None)
1940        self.assertIs(exception, cw.throw(exception))
1941
1942        cw = asyncio.coroutines.CoroWrapper(foo())
1943        cw.send(None)
1944        self.assertIs(exception, cw.throw(Exception, exception))
1945
1946        cw = asyncio.coroutines.CoroWrapper(foo())
1947        cw.send(None)
1948        exception = cw.throw(Exception, "foo")
1949        self.assertIsInstance(exception, Exception)
1950        self.assertEqual(exception.args, ("foo", ))
1951
1952        cw = asyncio.coroutines.CoroWrapper(foo())
1953        cw.send(None)
1954        exception = cw.throw(Exception, "foo", None)
1955        self.assertIsInstance(exception, Exception)
1956        self.assertEqual(exception.args, ("foo", ))
1957
1958    def test_all_tasks_deprecated(self):
1959        Task = self.__class__.Task
1960
1961        async def coro():
1962            with self.assertWarns(PendingDeprecationWarning):
1963                assert Task.all_tasks(self.loop) == {t}
1964
1965        t = self.new_task(self.loop, coro())
1966        self.loop.run_until_complete(t)
1967
    def test_log_destroyed_pending_task(self):
        # Checks that the loop's exception handler is invoked with the
        # 'Task was destroyed but it is pending!' message when a task that is
        # still waiting on a future gets garbage collected.
        # NOTE: the statement order below matters; the test works by dropping
        # references one by one before forcing a collection.
        Task = self.__class__.Task

        @asyncio.coroutine
        def kill_me(loop):
            future = self.new_future(loop)
            yield from future
            # at this point, the only reference to kill_me() task is
            # the Task._wakeup() method in future._callbacks
            raise Exception("code never reached")

        mock_handler = mock.Mock()
        # Debug mode is enabled before the task is created; presumably this is
        # what populates task._source_traceback read further down.
        self.loop.set_debug(True)
        self.loop.set_exception_handler(mock_handler)

        # schedule the task
        coro = kill_me(self.loop)
        task = asyncio.ensure_future(coro, loop=self.loop)

        self.assertEqual(asyncio.all_tasks(loop=self.loop), {task})

        # See http://bugs.python.org/issue29271 for details:
        # The deprecated Task.all_tasks() is called without a loop and with
        # None, so self.loop is temporarily installed as the current loop.
        asyncio.set_event_loop(self.loop)
        try:
            with self.assertWarns(PendingDeprecationWarning):
                self.assertEqual(Task.all_tasks(), {task})
            with self.assertWarns(PendingDeprecationWarning):
                self.assertEqual(Task.all_tasks(None), {task})
        finally:
            asyncio.set_event_loop(None)

        # execute the task so it waits for future
        self.loop._run_once()
        self.assertEqual(len(self.loop._ready), 0)

        # remove the future used in kill_me(), and references to the task
        del coro.gi_frame.f_locals['future']
        coro = None
        # Saved before the task reference is dropped; compared against the
        # handler's context below.
        source_traceback = task._source_traceback
        task = None

        # no more reference to kill_me() task: the task is destroyed by the GC
        support.gc_collect()

        self.assertEqual(asyncio.all_tasks(loop=self.loop), set())

        # The task object itself is gone, hence mock.ANY for the 'task' key.
        mock_handler.assert_called_with(self.loop, {
            'message': 'Task was destroyed but it is pending!',
            'task': mock.ANY,
            'source_traceback': source_traceback,
        })
        mock_handler.reset_mock()
2020
2021    @mock.patch('asyncio.base_events.logger')
2022    def test_tb_logger_not_called_after_cancel(self, m_log):
2023        loop = asyncio.new_event_loop()
2024        self.set_event_loop(loop)
2025
2026        @asyncio.coroutine
2027        def coro():
2028            raise TypeError
2029
2030        @asyncio.coroutine
2031        def runner():
2032            task = self.new_task(loop, coro())
2033            yield from asyncio.sleep(0.05, loop=loop)
2034            task.cancel()
2035            task = None
2036
2037        loop.run_until_complete(runner())
2038        self.assertFalse(m_log.error.called)
2039
    @mock.patch('asyncio.coroutines.logger')
    def test_coroutine_never_yielded(self, m_log):
        # Creates a coroutine object under coroutine debug, discards it
        # without running it, and checks the error message that gets logged
        # once it is collected.
        with set_coroutine_debug(True):
            @asyncio.coroutine
            def coro_noop():
                pass

        tb_filename = __file__
        # NOTE: "+ 2" points at the coro_noop() call two lines below; the
        # comment and the call must stay exactly where they are relative to
        # this statement, and the call must keep the text "coro_noop()" since
        # the regex below matches the quoted source line.
        tb_lineno = sys._getframe().f_lineno + 2
        # create a coroutine object but don't use it
        coro_noop()
        support.gc_collect()

        self.assertTrue(m_log.error.called)
        # First positional argument of the logger.error() call.
        message = m_log.error.call_args[0][0]
        func_filename, func_lineno = test_utils.get_function_source(coro_noop)

        # Expected layout: the CoroWrapper repr with the function's source
        # location, the "was never yielded from" line, then a traceback whose
        # final entry is the coro_noop() call in this test.
        regex = (r'^<CoroWrapper %s\(?\)? .* at %s:%s, .*> '
                    r'was never yielded from\n'
                 r'Coroutine object created at \(most recent call last, truncated to \d+ last lines\):\n'
                 r'.*\n'
                 r'  File "%s", line %s, in test_coroutine_never_yielded\n'
                 r'    coro_noop\(\)$'
                 % (re.escape(coro_noop.__qualname__),
                    re.escape(func_filename), func_lineno,
                    re.escape(tb_filename), tb_lineno))

        # DOTALL lets '.*' span the intermediate traceback lines.
        self.assertRegex(message, re.compile(regex, re.DOTALL))
2068
2069    def test_return_coroutine_from_coroutine(self):
2070        """Return of @asyncio.coroutine()-wrapped function generator object
2071        from @asyncio.coroutine()-wrapped function should have same effect as
2072        returning generator object or Future."""
2073        def check():
2074            @asyncio.coroutine
2075            def outer_coro():
2076                @asyncio.coroutine
2077                def inner_coro():
2078                    return 1
2079
2080                return inner_coro()
2081
2082            result = self.loop.run_until_complete(outer_coro())
2083            self.assertEqual(result, 1)
2084
2085        # Test with debug flag cleared.
2086        with set_coroutine_debug(False):
2087            check()
2088
2089        # Test with debug flag set.
2090        with set_coroutine_debug(True):
2091            check()
2092
    def test_task_source_traceback(self):
        # With loop debug enabled, the task is expected to carry a
        # _source_traceback list that points back at the line creating it.
        self.loop.set_debug(True)

        # NOTE: `lineno` is computed as "current line - 1", so the statement
        # after this one must remain directly below the new_task() call.
        task = self.new_task(self.loop, coroutine_function())
        lineno = sys._getframe().f_lineno - 1
        self.assertIsInstance(task._source_traceback, list)
        # The second-to-last entry must be this test's frame: (file, line,
        # function name).  Presumably the last entry belongs to new_task();
        # the index is tied to that exact call stack.
        self.assertEqual(task._source_traceback[-2][:3],
                         (__file__,
                          lineno,
                          'test_task_source_traceback'))
        self.loop.run_until_complete(task)
2104
2105    def _test_cancel_wait_for(self, timeout):
2106        loop = asyncio.new_event_loop()
2107        self.addCleanup(loop.close)
2108
2109        @asyncio.coroutine
2110        def blocking_coroutine():
2111            fut = self.new_future(loop)
2112            # Block: fut result is never set
2113            yield from fut
2114
2115        task = loop.create_task(blocking_coroutine())
2116
2117        wait = loop.create_task(asyncio.wait_for(task, timeout, loop=loop))
2118        loop.call_soon(wait.cancel)
2119
2120        self.assertRaises(asyncio.CancelledError,
2121                          loop.run_until_complete, wait)
2122
2123        # Python issue #23219: cancelling the wait must also cancel the task
2124        self.assertTrue(task.cancelled())
2125
2126    def test_cancel_blocking_wait_for(self):
2127        self._test_cancel_wait_for(None)
2128
2129    def test_cancel_wait_for(self):
2130        self._test_cancel_wait_for(60.0)
2131
2132    def test_cancel_gather_1(self):
2133        """Ensure that a gathering future refuses to be cancelled once all
2134        children are done"""
2135        loop = asyncio.new_event_loop()
2136        self.addCleanup(loop.close)
2137
2138        fut = self.new_future(loop)
2139        # The indirection fut->child_coro is needed since otherwise the
2140        # gathering task is done at the same time as the child future
2141        def child_coro():
2142            return (yield from fut)
2143        gather_future = asyncio.gather(child_coro(), loop=loop)
2144        gather_task = asyncio.ensure_future(gather_future, loop=loop)
2145
2146        cancel_result = None
2147        def cancelling_callback(_):
2148            nonlocal cancel_result
2149            cancel_result = gather_task.cancel()
2150        fut.add_done_callback(cancelling_callback)
2151
2152        fut.set_result(42) # calls the cancelling_callback after fut is done()
2153
2154        # At this point the task should complete.
2155        loop.run_until_complete(gather_task)
2156
2157        # Python issue #26923: asyncio.gather drops cancellation
2158        self.assertEqual(cancel_result, False)
2159        self.assertFalse(gather_task.cancelled())
2160        self.assertEqual(gather_task.result(), [42])
2161
2162    def test_cancel_gather_2(self):
2163        loop = asyncio.new_event_loop()
2164        self.addCleanup(loop.close)
2165
2166        async def test():
2167            time = 0
2168            while True:
2169                time += 0.05
2170                await asyncio.gather(asyncio.sleep(0.05, loop=loop),
2171                                     return_exceptions=True,
2172                                     loop=loop)
2173                if time > 1:
2174                    return
2175
2176        async def main():
2177            qwe = self.new_task(loop, test())
2178            await asyncio.sleep(0.2, loop=loop)
2179            qwe.cancel()
2180            try:
2181                await qwe
2182            except asyncio.CancelledError:
2183                pass
2184            else:
2185                self.fail('gather did not propagate the cancellation request')
2186
2187        loop.run_until_complete(main())
2188
2189    def test_exception_traceback(self):
2190        # See http://bugs.python.org/issue28843
2191
2192        @asyncio.coroutine
2193        def foo():
2194            1 / 0
2195
2196        @asyncio.coroutine
2197        def main():
2198            task = self.new_task(self.loop, foo())
2199            yield  # skip one loop iteration
2200            self.assertIsNotNone(task.exception().__traceback__)
2201
2202        self.loop.run_until_complete(main())
2203
2204    @mock.patch('asyncio.base_events.logger')
2205    def test_error_in_call_soon(self, m_log):
2206        def call_soon(callback, *args, **kwargs):
2207            raise ValueError
2208        self.loop.call_soon = call_soon
2209
2210        @asyncio.coroutine
2211        def coro():
2212            pass
2213
2214        self.assertFalse(m_log.error.called)
2215
2216        with self.assertRaises(ValueError):
2217            gen = coro()
2218            try:
2219                self.new_task(self.loop, gen)
2220            finally:
2221                gen.close()
2222
2223        self.assertTrue(m_log.error.called)
2224        message = m_log.error.call_args[0][0]
2225        self.assertIn('Task was destroyed but it is pending', message)
2226
2227        self.assertEqual(asyncio.all_tasks(self.loop), set())
2228
2229    def test_create_task_with_noncoroutine(self):
2230        with self.assertRaisesRegex(TypeError,
2231                                    "a coroutine was expected, got 123"):
2232            self.new_task(self.loop, 123)
2233
2234        # test it for the second time to ensure that caching
2235        # in asyncio.iscoroutine() doesn't break things.
2236        with self.assertRaisesRegex(TypeError,
2237                                    "a coroutine was expected, got 123"):
2238            self.new_task(self.loop, 123)
2239
2240    def test_create_task_with_oldstyle_coroutine(self):
2241
2242        @asyncio.coroutine
2243        def coro():
2244            pass
2245
2246        task = self.new_task(self.loop, coro())
2247        self.assertIsInstance(task, self.Task)
2248        self.loop.run_until_complete(task)
2249
2250        # test it for the second time to ensure that caching
2251        # in asyncio.iscoroutine() doesn't break things.
2252        task = self.new_task(self.loop, coro())
2253        self.assertIsInstance(task, self.Task)
2254        self.loop.run_until_complete(task)
2255
2256    def test_create_task_with_async_function(self):
2257
2258        async def coro():
2259            pass
2260
2261        task = self.new_task(self.loop, coro())
2262        self.assertIsInstance(task, self.Task)
2263        self.loop.run_until_complete(task)
2264
2265        # test it for the second time to ensure that caching
2266        # in asyncio.iscoroutine() doesn't break things.
2267        task = self.new_task(self.loop, coro())
2268        self.assertIsInstance(task, self.Task)
2269        self.loop.run_until_complete(task)
2270
2271    def test_create_task_with_asynclike_function(self):
2272        task = self.new_task(self.loop, CoroLikeObject())
2273        self.assertIsInstance(task, self.Task)
2274        self.assertEqual(self.loop.run_until_complete(task), 42)
2275
2276        # test it for the second time to ensure that caching
2277        # in asyncio.iscoroutine() doesn't break things.
2278        task = self.new_task(self.loop, CoroLikeObject())
2279        self.assertIsInstance(task, self.Task)
2280        self.assertEqual(self.loop.run_until_complete(task), 42)
2281
2282    def test_bare_create_task(self):
2283
2284        async def inner():
2285            return 1
2286
2287        async def coro():
2288            task = asyncio.create_task(inner())
2289            self.assertIsInstance(task, self.Task)
2290            ret = await task
2291            self.assertEqual(1, ret)
2292
2293        self.loop.run_until_complete(coro())
2294
2295    def test_context_1(self):
2296        cvar = contextvars.ContextVar('cvar', default='nope')
2297
2298        async def sub():
2299            await asyncio.sleep(0.01, loop=loop)
2300            self.assertEqual(cvar.get(), 'nope')
2301            cvar.set('something else')
2302
2303        async def main():
2304            self.assertEqual(cvar.get(), 'nope')
2305            subtask = self.new_task(loop, sub())
2306            cvar.set('yes')
2307            self.assertEqual(cvar.get(), 'yes')
2308            await subtask
2309            self.assertEqual(cvar.get(), 'yes')
2310
2311        loop = asyncio.new_event_loop()
2312        try:
2313            task = self.new_task(loop, main())
2314            loop.run_until_complete(task)
2315        finally:
2316            loop.close()
2317
2318    def test_context_2(self):
2319        cvar = contextvars.ContextVar('cvar', default='nope')
2320
2321        async def main():
2322            def fut_on_done(fut):
2323                # This change must not pollute the context
2324                # of the "main()" task.
2325                cvar.set('something else')
2326
2327            self.assertEqual(cvar.get(), 'nope')
2328
2329            for j in range(2):
2330                fut = self.new_future(loop)
2331                fut.add_done_callback(fut_on_done)
2332                cvar.set(f'yes{j}')
2333                loop.call_soon(fut.set_result, None)
2334                await fut
2335                self.assertEqual(cvar.get(), f'yes{j}')
2336
2337                for i in range(3):
2338                    # Test that task passed its context to add_done_callback:
2339                    cvar.set(f'yes{i}-{j}')
2340                    await asyncio.sleep(0.001, loop=loop)
2341                    self.assertEqual(cvar.get(), f'yes{i}-{j}')
2342
2343        loop = asyncio.new_event_loop()
2344        try:
2345            task = self.new_task(loop, main())
2346            loop.run_until_complete(task)
2347        finally:
2348            loop.close()
2349
2350        self.assertEqual(cvar.get(), 'nope')
2351
2352    def test_context_3(self):
2353        # Run 100 Tasks in parallel, each modifying cvar.
2354
2355        cvar = contextvars.ContextVar('cvar', default=-1)
2356
2357        async def sub(num):
2358            for i in range(10):
2359                cvar.set(num + i)
2360                await asyncio.sleep(
2361                    random.uniform(0.001, 0.05), loop=loop)
2362                self.assertEqual(cvar.get(), num + i)
2363
2364        async def main():
2365            tasks = []
2366            for i in range(100):
2367                task = loop.create_task(sub(random.randint(0, 10)))
2368                tasks.append(task)
2369
2370            await asyncio.gather(*tasks, loop=loop)
2371
2372        loop = asyncio.new_event_loop()
2373        try:
2374            loop.run_until_complete(main())
2375        finally:
2376            loop.close()
2377
2378        self.assertEqual(cvar.get(), -1)
2379
2380
def add_subclass_tests(cls):
    # Class decorator: replaces cls.Task and cls.Future with subclasses that
    # count add_done_callback() calls, attaches one extra test and disables
    # test_task_source_traceback.  Returns cls unchanged when either base
    # class is None.
    base_task, base_future = cls.Task, cls.Future

    if base_task is None or base_future is None:
        return cls

    class CommonFuture:
        # Mixin recording, per instance, how often each tracked method ran.
        def __init__(self, *args, **kwargs):
            self.calls = collections.defaultdict(int)
            super().__init__(*args, **kwargs)

        def add_done_callback(self, *args, **kwargs):
            self.calls['add_done_callback'] += 1
            return super().add_done_callback(*args, **kwargs)

    class Task(CommonFuture, base_task):
        pass

    class Future(CommonFuture, base_future):
        pass

    def test_subclasses_ctask_cfuture(self):
        fut = self.Future(loop=self.loop)

        async def func():
            self.loop.call_soon(fut.set_result, 'spam')
            return await fut

        task = self.Task(func(), loop=self.loop)
        self.assertEqual(self.loop.run_until_complete(task), 'spam')

        # Task and future must each have seen exactly one
        # add_done_callback() call.
        expected_calls = {'add_done_callback': 1}
        self.assertEqual(dict(task.calls), expected_calls)
        self.assertEqual(dict(fut.calls), expected_calls)

    # Add patched Task & Future back to the test case
    cls.Task, cls.Future = Task, Future

    # Add an extra unit-test
    cls.test_subclasses_ctask_cfuture = test_subclasses_ctask_cfuture

    # Disable the "test_task_source_traceback" test
    # (the test is hardcoded for a particular call stack, which
    # is slightly different for Task subclasses)
    cls.test_task_source_traceback = None

    return cls
2437
2438
class SetMethodsTest:
    # Mixin: Future.set_result() / Future.set_exception() applied directly
    # to a task whose coroutine has not finished.  Relies on the host class
    # for self.loop, self.new_task() and the Future class attribute.

    def test_set_result_causes_invalid_state(self):
        future_cls = type(self).Future
        handler = mock.Mock()
        self.loop.call_exception_handler = handler

        async def foo():
            await asyncio.sleep(0.1, loop=self.loop)
            return 10

        coro = foo()
        task = self.new_task(self.loop, coro)
        future_cls.set_result(task, 'spam')

        # The forced result is what the task ends up with.
        outcome = self.loop.run_until_complete(task)
        self.assertEqual(outcome, 'spam')

        # The exception handler is called exactly once, with an
        # InvalidStateError in its context.
        handler.assert_called_once()
        context = handler.call_args[0][0]
        reported = context['exception']
        with self.assertRaisesRegex(asyncio.InvalidStateError,
                                    r'step\(\): already done'):
            raise reported

        coro.close()

    def test_set_exception_causes_invalid_state(self):
        class MyExc(Exception):
            pass

        future_cls = type(self).Future
        handler = mock.Mock()
        self.loop.call_exception_handler = handler

        async def foo():
            await asyncio.sleep(0.1, loop=self.loop)
            return 10

        coro = foo()
        task = self.new_task(self.loop, coro)
        future_cls.set_exception(task, MyExc())

        # The forced exception is what the task ends up raising.
        with self.assertRaises(MyExc):
            self.loop.run_until_complete(task)

        # The exception handler is called exactly once, with an
        # InvalidStateError in its context.
        handler.assert_called_once()
        context = handler.call_args[0][0]
        reported = context['exception']
        with self.assertRaisesRegex(asyncio.InvalidStateError,
                                    r'step\(\): already done'):
            raise reported

        coro.close()
2490
2491
@unittest.skipUnless(hasattr(futures, '_CFuture') and
                     hasattr(tasks, '_CTask'),
                     'requires the C _asyncio module')
class CTask_CFuture_Tests(BaseTaskTests, SetMethodsTest,
                          test_utils.TestCase):
    # Runs the shared task tests with tasks._CTask and futures._CFuture, plus
    # two tests defined only here.  The whole class is skipped when either
    # attribute is missing.

    # getattr() with a None default keeps the class body importable even
    # when the attributes do not exist.
    Task = getattr(tasks, '_CTask', None)
    Future = getattr(futures, '_CFuture', None)

    @support.refcount_test
    def test_refleaks_in_task___init__(self):
        # Calls __init__() again and again on one existing task and checks
        # that the interpreter's total reference count stays (almost) flat.
        gettotalrefcount = support.get_attribute(sys, 'gettotalrefcount')
        @asyncio.coroutine
        def coro():
            pass
        task = self.new_task(self.loop, coro())
        self.loop.run_until_complete(task)
        # Baseline is taken only after one complete create-and-run cycle.
        refs_before = gettotalrefcount()
        for i in range(100):
            task.__init__(coro(), loop=self.loop)
            self.loop.run_until_complete(task)
        # A drift of up to 10 references is tolerated.
        self.assertAlmostEqual(gettotalrefcount() - refs_before, 0, delta=10)

    def test_del__log_destroy_pending_segfault(self):
        # Deleting the _log_destroy_pending attribute of a finished task
        # must raise AttributeError.
        @asyncio.coroutine
        def coro():
            pass
        task = self.new_task(self.loop, coro())
        self.loop.run_until_complete(task)
        with self.assertRaises(AttributeError):
            del task._log_destroy_pending
2523
2524
@unittest.skipUnless(
    hasattr(tasks, '_CTask') and hasattr(futures, '_CFuture'),
    'requires the C _asyncio module')
@add_subclass_tests
class CTask_CFuture_SubclassTests(BaseTaskTests, test_utils.TestCase):
    # Shared task tests; add_subclass_tests swaps in subclasses of
    # tasks._CTask and futures._CFuture when both are available.

    Future = getattr(futures, '_CFuture', None)
    Task = getattr(tasks, '_CTask', None)
2533
2534
@unittest.skipUnless(
    hasattr(tasks, '_CTask'),
    'requires the C _asyncio module')
@add_subclass_tests
class CTaskSubclass_PyFuture_Tests(BaseTaskTests, test_utils.TestCase):
    # Shared task tests; add_subclass_tests swaps in subclasses of
    # tasks._CTask (when available) and futures._PyFuture.

    Future = futures._PyFuture
    Task = getattr(tasks, '_CTask', None)
2542
2543
@unittest.skipUnless(
    hasattr(futures, '_CFuture'),
    'requires the C _asyncio module')
@add_subclass_tests
class PyTask_CFutureSubclass_Tests(BaseTaskTests, test_utils.TestCase):
    # Shared task tests; add_subclass_tests swaps in subclasses of
    # tasks._PyTask and futures._CFuture (when available).

    Task = tasks._PyTask
    Future = getattr(futures, '_CFuture', None)
2551
2552
@unittest.skipUnless(
    hasattr(tasks, '_CTask'),
    'requires the C _asyncio module')
class CTask_PyFuture_Tests(BaseTaskTests, test_utils.TestCase):
    # Shared task tests with tasks._CTask (when available) and
    # futures._PyFuture.

    Future = futures._PyFuture
    Task = getattr(tasks, '_CTask', None)
2559
2560
@unittest.skipUnless(
    hasattr(futures, '_CFuture'),
    'requires the C _asyncio module')
class PyTask_CFuture_Tests(BaseTaskTests, test_utils.TestCase):
    # Shared task tests with tasks._PyTask and futures._CFuture (when
    # available).

    Future = getattr(futures, '_CFuture', None)
    Task = tasks._PyTask
2567
2568
class PyTask_PyFuture_Tests(
        BaseTaskTests, SetMethodsTest, test_utils.TestCase):
    # Shared task tests plus the SetMethodsTest mixin, with tasks._PyTask
    # and futures._PyFuture.

    Future = futures._PyFuture
    Task = tasks._PyTask
2574
2575
@add_subclass_tests
class PyTask_PyFuture_SubclassTests(BaseTaskTests, test_utils.TestCase):
    # Shared task tests; add_subclass_tests swaps in subclasses of
    # tasks._PyTask and futures._PyFuture.
    Future = futures._PyFuture
    Task = tasks._PyTask
2580
2581
@unittest.skipUnless(
    hasattr(tasks, '_CTask'),
    'requires the C _asyncio module')
class CTask_Future_Tests(test_utils.TestCase):

    def test_foobar(self):
        # A task must be able to await a Future subclass whose get_loop
        # attribute raises AttributeError when accessed.
        class Fut(asyncio.Future):
            @property
            def get_loop(self):
                raise AttributeError

        async def coro():
            await awaited
            return 'spam'

        self.loop = loop = asyncio.new_event_loop()
        try:
            awaited = Fut(loop=loop)
            loop.call_later(0.1, awaited.set_result, 1)
            runner = asyncio.Task(coro(), loop=loop)
            outcome = loop.run_until_complete(runner)
        finally:
            loop.close()

        self.assertEqual(outcome, 'spam')
2606
2607
class BaseTaskIntrospectionTests:
    # Mixin for the register/unregister/enter/leave task hooks.  Concrete
    # classes bind the four attributes below to an implementation.
    _register_task = _unregister_task = None
    _enter_task = _leave_task = None

    def test__register_task_1(self):
        loop = mock.Mock()

        class TaskLike:
            # Exposes its loop through a `_loop` property only.
            @property
            def _loop(self):
                return loop

            def done(self):
                return False

        candidate = TaskLike()

        self.assertEqual(asyncio.all_tasks(loop), set())
        self._register_task(candidate)
        self.assertEqual(asyncio.all_tasks(loop), {candidate})
        self._unregister_task(candidate)

    def test__register_task_2(self):
        loop = mock.Mock()

        class TaskLike:
            # Exposes its loop through a get_loop() method.
            def get_loop(self):
                return loop

            def done(self):
                return False

        candidate = TaskLike()

        self.assertEqual(asyncio.all_tasks(loop), set())
        self._register_task(candidate)
        self.assertEqual(asyncio.all_tasks(loop), {candidate})
        self._unregister_task(candidate)

    def test__register_task_3(self):
        loop = mock.Mock()

        class TaskLike:
            # Reports itself as already done.
            def get_loop(self):
                return loop

            def done(self):
                return True

        candidate = TaskLike()

        self.assertEqual(asyncio.all_tasks(loop), set())
        self._register_task(candidate)
        # A done task is left out by asyncio.all_tasks() ...
        self.assertEqual(asyncio.all_tasks(loop), set())
        # ... while the deprecated Task.all_tasks() still includes it.
        with self.assertWarns(PendingDeprecationWarning):
            self.assertEqual(asyncio.Task.all_tasks(loop), {candidate})
        self._unregister_task(candidate)

    def test__enter_task(self):
        task, loop = mock.Mock(), mock.Mock()
        self.assertIsNone(asyncio.current_task(loop))
        self._enter_task(loop, task)
        self.assertIs(asyncio.current_task(loop), task)
        self._leave_task(loop, task)

    def test__enter_task_failure(self):
        first, second = mock.Mock(), mock.Mock()
        loop = mock.Mock()
        self._enter_task(loop, first)
        # Entering a second task while the first is current must fail and
        # leave the first one current.
        with self.assertRaises(RuntimeError):
            self._enter_task(loop, second)
        self.assertIs(asyncio.current_task(loop), first)
        self._leave_task(loop, first)

    def test__leave_task(self):
        task, loop = mock.Mock(), mock.Mock()
        self._enter_task(loop, task)
        self._leave_task(loop, task)
        self.assertIsNone(asyncio.current_task(loop))

    def test__leave_task_failure1(self):
        first, second = mock.Mock(), mock.Mock()
        loop = mock.Mock()
        self._enter_task(loop, first)
        # Leaving a task other than the current one must fail and leave the
        # current one untouched.
        with self.assertRaises(RuntimeError):
            self._leave_task(loop, second)
        self.assertIs(asyncio.current_task(loop), first)
        self._leave_task(loop, first)

    def test__leave_task_failure2(self):
        task, loop = mock.Mock(), mock.Mock()
        # Leaving when no task was entered must fail.
        with self.assertRaises(RuntimeError):
            self._leave_task(loop, task)
        self.assertIsNone(asyncio.current_task(loop))

    def test__unregister_task(self):
        task, loop = mock.Mock(), mock.Mock()
        task.get_loop = lambda: loop
        self._register_task(task)
        self._unregister_task(task)
        self.assertEqual(asyncio.all_tasks(loop), set())

    def test__unregister_task_not_registered(self):
        task, loop = mock.Mock(), mock.Mock()
        # Unregistering a task that was never registered must not raise.
        self._unregister_task(task)
        self.assertEqual(asyncio.all_tasks(loop), set())
2720
2721
class PyIntrospectionTests(unittest.TestCase, BaseTaskIntrospectionTests):
    # Introspection tests bound to the tasks._py_* hook functions.
    # staticmethod() keeps them from being bound to the test instance.
    _enter_task = staticmethod(tasks._py_enter_task)
    _leave_task = staticmethod(tasks._py_leave_task)
    _register_task = staticmethod(tasks._py_register_task)
    _unregister_task = staticmethod(tasks._py_unregister_task)
2727
2728
@unittest.skipUnless(
    hasattr(tasks, '_c_register_task'),
    'requires the C _asyncio module')
class CIntrospectionTests(unittest.TestCase, BaseTaskIntrospectionTests):
    # Introspection tests bound to the tasks._c_* hook functions.  The
    # class body runs even when the class is skipped, so the hooks fall
    # back to None when the functions are missing.
    if not hasattr(tasks, '_c_register_task'):
        _register_task = _unregister_task = _enter_task = _leave_task = None
    else:
        _enter_task = staticmethod(tasks._c_enter_task)
        _leave_task = staticmethod(tasks._c_leave_task)
        _register_task = staticmethod(tasks._c_register_task)
        _unregister_task = staticmethod(tasks._c_unregister_task)
2739
2740
class BaseCurrentLoopTests:
    # Mixin checking asyncio.current_task(); concrete classes provide
    # new_task() to choose the Task implementation.

    def setUp(self):
        super().setUp()
        fresh_loop = asyncio.new_event_loop()
        self.loop = fresh_loop
        asyncio.set_event_loop(fresh_loop)

    def tearDown(self):
        self.loop.close()
        asyncio.set_event_loop(None)
        super().tearDown()

    def new_task(self, coro):
        raise NotImplementedError

    def test_current_task_no_running_loop(self):
        # With an explicit loop that is not running there is no current task.
        current = asyncio.current_task(loop=self.loop)
        self.assertIsNone(current)

    def test_current_task_no_running_loop_implicit(self):
        # Without a loop argument and without a running loop the call fails.
        self.assertRaises(RuntimeError, asyncio.current_task)

    def test_current_task_with_implicit_loop(self):
        async def coro():
            # Explicit loop, explicit None and no argument at all must each
            # resolve to the task running this coroutine.
            self.assertIs(asyncio.current_task(loop=self.loop), running)

            self.assertIs(asyncio.current_task(None), running)
            self.assertIs(asyncio.current_task(), running)

        running = self.new_task(coro())
        self.loop.run_until_complete(running)
        # Once the loop has stopped there is no current task any more.
        self.assertIsNone(asyncio.current_task(loop=self.loop))
2773
2774
class PyCurrentLoopTests(BaseCurrentLoopTests, unittest.TestCase):
    """Run the current-loop checks against the pure-Python Task class."""

    def new_task(self, coro):
        task_cls = tasks._PyTask
        return task_cls(coro, loop=self.loop)
2779
2780
@unittest.skipUnless(hasattr(tasks, '_CTask'),
                     'requires the C _asyncio module')
class CCurrentLoopTests(BaseCurrentLoopTests, unittest.TestCase):
    """Run the current-loop checks against the C-accelerated Task class."""

    def new_task(self, coro):
        # The attribute is only looked up when a test actually runs, i.e.
        # when the skipUnless() guard above found tasks._CTask.
        task_cls = tasks._CTask
        return task_cls(coro, loop=self.loop)
2787
2788
class GenericTaskTests(test_utils.TestCase):
    """Sanity checks that do not depend on a particular Task implementation."""

    def test_future_subclass(self):
        is_future_subclass = issubclass(asyncio.Task, asyncio.Future)
        self.assertTrue(is_future_subclass)

    def test_asyncio_module_compiled(self):
        # Circular imports can easily make the _asyncio module
        # non-importable.  _functools serves as a probe for "C modules were
        # built at all": if it imports but _asyncio does not, the test fails.
        try:
            import _functools
        except ImportError:
            # No C extension modules available; nothing to verify.
            return
        try:
            import _asyncio
        except ImportError:
            self.fail('_asyncio module is missing')
2808
2809
class GatherTestsBase:
    """Shared asyncio.gather() scenarios.

    Subclasses define wrap_futures() to control what is handed to gather()
    for a given set of futures.
    """

    def setUp(self):
        super().setUp()
        self.one_loop = self.new_test_loop()
        self.other_loop = self.new_test_loop()
        self.set_event_loop(self.one_loop, cleanup=False)

    def _run_loop(self, loop):
        # Spin the loop until its ready queue is empty.
        while loop._ready:
            test_utils.run_briefly(loop)

    def _check_success(self, **kwargs):
        loop = self.one_loop
        first, second, third = [asyncio.Future(loop=loop) for _ in range(3)]
        gathered = asyncio.gather(*self.wrap_futures(first, second, third),
                                  **kwargs)
        done_cb = test_utils.MockCallback()
        gathered.add_done_callback(done_cb)
        # Complete two children out of order; the outer future must still
        # be pending afterwards.
        second.set_result(1)
        first.set_result(2)
        self._run_loop(loop)
        self.assertEqual(done_cb.called, False)
        self.assertFalse(gathered.done())
        third.set_result(3)
        self._run_loop(loop)
        done_cb.assert_called_once_with(gathered)
        # Results are ordered like the arguments, not like completion.
        self.assertEqual(gathered.result(), [2, 1, 3])

    def test_success(self):
        self._check_success()
        self._check_success(return_exceptions=False)

    def test_result_exception_success(self):
        self._check_success(return_exceptions=True)

    def test_one_exception(self):
        loop = self.one_loop
        ok, failing, late_result, late_cancel, late_error = [
            asyncio.Future(loop=loop) for _ in range(5)]
        gathered = asyncio.gather(*self.wrap_futures(
            ok, failing, late_result, late_cancel, late_error))
        done_cb = test_utils.MockCallback()
        gathered.add_done_callback(done_cb)
        error = ZeroDivisionError()
        ok.set_result(1)
        failing.set_exception(error)
        self._run_loop(loop)
        self.assertTrue(gathered.done())
        done_cb.assert_called_once_with(gathered)
        self.assertIs(gathered.exception(), error)
        # Does nothing: the outer future is already done at this point.
        late_result.set_result(3)
        late_cancel.cancel()
        late_error.set_exception(RuntimeError())
        late_error.exception()

    def test_return_exceptions(self):
        loop = self.one_loop
        first, second, third, fourth = [
            asyncio.Future(loop=loop) for _ in range(4)]
        gathered = asyncio.gather(
            *self.wrap_futures(first, second, third, fourth),
            return_exceptions=True)
        done_cb = test_utils.MockCallback()
        gathered.add_done_callback(done_cb)
        zero_div = ZeroDivisionError()
        runtime_err = RuntimeError()
        second.set_result(1)
        third.set_exception(zero_div)
        first.set_result(3)
        self._run_loop(loop)
        # One child is still pending, so the outer future is too.
        self.assertFalse(gathered.done())
        fourth.set_exception(runtime_err)
        self._run_loop(loop)
        self.assertTrue(gathered.done())
        done_cb.assert_called_once_with(gathered)
        self.assertEqual(gathered.result(), [3, 1, zero_div, runtime_err])

    def test_env_var_debug(self):
        script = '\n'.join((
            'import asyncio.coroutines',
            'print(asyncio.coroutines._DEBUG)'))

        # Test with -E to not fail if the unit test was run with
        # PYTHONASYNCIODEBUG set to a non-empty string
        _rc, out, _err = assert_python_ok('-E', '-c', script)
        self.assertEqual(out.rstrip(), b'False')

        # Empty variable: debug stays off.
        _rc, out, _err = assert_python_ok('-c', script,
                                          PYTHONASYNCIODEBUG='',
                                          PYTHONDEVMODE='')
        self.assertEqual(out.rstrip(), b'False')

        # Non-empty variable: debug is on.
        _rc, out, _err = assert_python_ok('-c', script,
                                          PYTHONASYNCIODEBUG='1',
                                          PYTHONDEVMODE='')
        self.assertEqual(out.rstrip(), b'True')

        # Same environment, but run with -E.
        _rc, out, _err = assert_python_ok('-E', '-c', script,
                                          PYTHONASYNCIODEBUG='1',
                                          PYTHONDEVMODE='')
        self.assertEqual(out.rstrip(), b'False')

        # -X dev
        _rc, out, _err = assert_python_ok('-E', '-X', 'dev',
                                          '-c', script)
        self.assertEqual(out.rstrip(), b'True')
2910
2911
class FutureGatherTests(GatherTestsBase, test_utils.TestCase):
    """gather() scenarios where the futures are passed through unchanged."""

    def wrap_futures(self, *futures):
        # Hand the futures to gather() as they are.
        return futures

    def _check_empty_sequence(self, seq_or_iter):
        asyncio.set_event_loop(self.one_loop)
        self.addCleanup(asyncio.set_event_loop, None)
        # Without an explicit loop the result is bound to the current loop.
        gathered = asyncio.gather(*seq_or_iter)
        self.assertIsInstance(gathered, asyncio.Future)
        self.assertIs(gathered._loop, self.one_loop)
        self._run_loop(self.one_loop)
        self.assertTrue(gathered.done())
        self.assertEqual(gathered.result(), [])
        # An explicit loop argument is honoured.
        gathered = asyncio.gather(*seq_or_iter, loop=self.other_loop)
        self.assertIs(gathered._loop, self.other_loop)

    def test_constructor_empty_sequence(self):
        self._check_empty_sequence([])
        self._check_empty_sequence(())
        self._check_empty_sequence(set())
        self._check_empty_sequence(iter(""))

    def test_constructor_heterogenous_futures(self):
        on_one = asyncio.Future(loop=self.one_loop)
        on_other = asyncio.Future(loop=self.other_loop)
        # Children bound to different loops are rejected.
        with self.assertRaises(ValueError):
            asyncio.gather(on_one, on_other)
        # So is a loop argument that differs from the child's loop.
        with self.assertRaises(ValueError):
            asyncio.gather(on_one, loop=self.other_loop)

    def test_constructor_homogenous_futures(self):
        other = self.other_loop
        children = [asyncio.Future(loop=other) for _ in range(3)]
        gathered = asyncio.gather(*children)
        self.assertIs(gathered._loop, other)
        self._run_loop(other)
        self.assertFalse(gathered.done())
        gathered = asyncio.gather(*children, loop=other)
        self.assertIs(gathered._loop, other)
        self._run_loop(other)
        self.assertFalse(gathered.done())

    def test_one_cancellation(self):
        loop = self.one_loop
        ok, cancelled, late_result, late_cancel, late_error = [
            asyncio.Future(loop=loop) for _ in range(5)]
        gathered = asyncio.gather(
            ok, cancelled, late_result, late_cancel, late_error)
        done_cb = test_utils.MockCallback()
        gathered.add_done_callback(done_cb)
        ok.set_result(1)
        cancelled.cancel()
        self._run_loop(loop)
        self.assertTrue(gathered.done())
        done_cb.assert_called_once_with(gathered)
        # The outer future is not cancelled itself; it carries the
        # CancelledError as its exception.
        self.assertFalse(gathered.cancelled())
        self.assertIsInstance(gathered.exception(), asyncio.CancelledError)
        # Does nothing: the outer future is already done at this point.
        late_result.set_result(3)
        late_cancel.cancel()
        late_error.set_exception(RuntimeError())
        late_error.exception()

    def test_result_exception_one_cancellation(self):
        loop = self.one_loop
        children = [asyncio.Future(loop=loop) for _ in range(6)]
        a, b, c, d, e, f = children
        gathered = asyncio.gather(a, b, c, d, e, f, return_exceptions=True)
        done_cb = test_utils.MockCallback()
        gathered.add_done_callback(done_cb)
        a.set_result(1)
        zero_div = ZeroDivisionError()
        b.set_exception(zero_div)
        c.cancel()
        self._run_loop(loop)
        self.assertFalse(gathered.done())
        d.set_result(3)
        e.cancel()
        runtime_err = RuntimeError()
        f.set_exception(runtime_err)
        results = loop.run_until_complete(gathered)
        self.assertIsInstance(results[2], asyncio.CancelledError)
        self.assertIsInstance(results[4], asyncio.CancelledError)
        # Blank out the CancelledError slots so the rest of the list can be
        # compared with a plain equality check.
        results[2] = results[4] = None
        self.assertEqual(results, [1, zero_div, None, 3, None, runtime_err])
        done_cb.assert_called_once_with(gathered)
2994
2995
class CoroutineGatherTests(GatherTestsBase, test_utils.TestCase):
    """gather() scenarios where each future is wrapped in a coroutine."""

    def setUp(self):
        super().setUp()
        asyncio.set_event_loop(self.one_loop)

    def wrap_futures(self, *futures):
        def waiter_for(fut):
            # Build a generator-based coroutine that simply waits on `fut`.
            @asyncio.coroutine
            def coro():
                return (yield from fut)
            return coro()

        return [waiter_for(fut) for fut in futures]

    def test_constructor_loop_selection(self):
        @asyncio.coroutine
        def coro():
            return 'abc'

        # No loop argument: the result is bound to the current loop.
        first_pair = (coro(), coro())
        gathered = asyncio.gather(*first_pair)
        self.assertIs(gathered._loop, self.one_loop)
        self.one_loop.run_until_complete(gathered)

        # Explicit loop argument.
        self.set_event_loop(self.other_loop, cleanup=False)
        second_pair = (coro(), coro())
        gathered_other = asyncio.gather(*second_pair, loop=self.other_loop)
        self.assertIs(gathered_other._loop, self.other_loop)
        self.other_loop.run_until_complete(gathered_other)

    def test_duplicate_coroutines(self):
        @asyncio.coroutine
        def coro(s):
            return s

        shared = coro('abc')
        gathered = asyncio.gather(shared, shared, coro('def'), shared,
                                  loop=self.one_loop)
        self._run_loop(self.one_loop)
        self.assertEqual(gathered.result(), ['abc', 'abc', 'def', 'abc'])

    def test_cancellation_broadcast(self):
        # Cancelling outer() cancels all children.
        loop = self.one_loop
        proof = 0
        waiter = asyncio.Future(loop=loop)

        @asyncio.coroutine
        def inner():
            nonlocal proof
            yield from waiter
            proof += 1

        child1 = asyncio.ensure_future(inner(), loop=loop)
        child2 = asyncio.ensure_future(inner(), loop=loop)
        gatherer = None

        @asyncio.coroutine
        def outer():
            nonlocal proof, gatherer
            gatherer = asyncio.gather(child1, child2, loop=loop)
            yield from gatherer
            proof += 100

        outer_task = asyncio.ensure_future(outer(), loop=loop)
        test_utils.run_briefly(loop)
        self.assertTrue(outer_task.cancel())
        with self.assertRaises(asyncio.CancelledError):
            loop.run_until_complete(outer_task)
        self.assertFalse(gatherer.cancel())
        self.assertTrue(waiter.cancelled())
        self.assertTrue(child1.cancelled())
        self.assertTrue(child2.cancelled())
        test_utils.run_briefly(loop)
        # Neither inner() nor outer() got past its yield.
        self.assertEqual(proof, 0)

    def test_exception_marking(self):
        # Test for the first line marked "Mark exception retrieved."
        loop = self.one_loop

        @asyncio.coroutine
        def inner(f):
            yield from f
            raise RuntimeError('should not be ignored')

        a = asyncio.Future(loop=loop)
        b = asyncio.Future(loop=loop)

        @asyncio.coroutine
        def outer():
            yield from asyncio.gather(inner(a), inner(b), loop=loop)

        outer_task = asyncio.ensure_future(outer(), loop=loop)
        test_utils.run_briefly(loop)
        a.set_result(None)
        test_utils.run_briefly(loop)
        b.set_result(None)
        test_utils.run_briefly(loop)
        self.assertIsInstance(outer_task.exception(), RuntimeError)
3093
3094
class RunCoroutineThreadsafeTests(test_utils.TestCase):
    """Test case for asyncio.run_coroutine_threadsafe."""

    def setUp(self):
        super().setUp()
        self.loop = asyncio.new_event_loop()
        self.set_event_loop(self.loop)  # Will cleanup properly

    @asyncio.coroutine
    def add(self, a, b, fail=False, cancel=False):
        """Wait 0.05 second and return a + b."""
        yield from asyncio.sleep(0.05, loop=self.loop)
        if fail:
            raise RuntimeError("Fail!")
        if cancel:
            # Cancel the task running this coroutine, then yield once.
            asyncio.current_task(self.loop).cancel()
            yield
        return a + b

    def target(self, fail=False, cancel=False, timeout=None,
               advance_coro=False):
        """Run add coroutine in the event loop."""
        coro = self.add(1, 2, fail=fail, cancel=cancel)
        future = asyncio.run_coroutine_threadsafe(coro, self.loop)
        if advance_coro:
            # Needed by test_run_coroutine_threadsafe_task_factory_exception;
            # otherwise it spills errors and breaks **other** unittests,
            # since 'target' is interacting with threads.

            # Advancing `coro` here means CoroWrapper.__del__ won't do
            # anything when asyncio tests run in debug mode.
            self.loop.call_soon_threadsafe(coro.send, None)
        try:
            return future.result(timeout)
        finally:
            # Do not leave a pending future behind.
            if not future.done():
                future.cancel()

    def test_run_coroutine_threadsafe(self):
        """Test coroutine submission from a thread to an event loop."""
        submitted = self.loop.run_in_executor(None, self.target)
        total = self.loop.run_until_complete(submitted)
        self.assertEqual(total, 3)

    def test_run_coroutine_threadsafe_with_exception(self):
        """Test coroutine submission from a thread to an event loop
        when an exception is raised."""
        submitted = self.loop.run_in_executor(None, self.target, True)
        with self.assertRaises(RuntimeError) as exc_context:
            self.loop.run_until_complete(submitted)
        self.assertIn("Fail!", exc_context.exception.args)

    def test_run_coroutine_threadsafe_with_timeout(self):
        """Test coroutine submission from a thread to an event loop
        when a timeout is raised."""
        def run_with_zero_timeout():
            return self.target(timeout=0)

        submitted = self.loop.run_in_executor(None, run_with_zero_timeout)
        with self.assertRaises(asyncio.TimeoutError):
            self.loop.run_until_complete(submitted)
        test_utils.run_briefly(self.loop)
        # Check that there's no pending task (add has been cancelled)
        for task in asyncio.all_tasks(self.loop):
            self.assertTrue(task.done())

    def test_run_coroutine_threadsafe_task_cancelled(self):
        """Test coroutine submission from a thread to an event loop
        when the task is cancelled."""
        def run_and_cancel():
            return self.target(cancel=True)

        submitted = self.loop.run_in_executor(None, run_and_cancel)
        with self.assertRaises(asyncio.CancelledError):
            self.loop.run_until_complete(submitted)

    def test_run_coroutine_threadsafe_task_factory_exception(self):
        """Test coroutine submission from a thread to an event loop
        when the task factory raises an exception."""

        def task_factory(loop, coro):
            raise NameError

        def run_advancing_coro():
            return self.target(advance_coro=True)

        submitted = self.loop.run_in_executor(None, run_advancing_coro)

        # Set exception handler
        handler = test_utils.MockCallback()
        self.loop.set_exception_handler(handler)

        # Set corrupted task factory
        self.loop.set_task_factory(task_factory)

        # Run event loop
        with self.assertRaises(NameError) as exc_context:
            self.loop.run_until_complete(submitted)

        # Check exceptions: the handler saw exactly one call, carrying the
        # same exception that propagated out of run_until_complete().
        self.assertEqual(len(handler.call_args_list), 1)
        positional, _kwargs = handler.call_args
        _loop, context = positional
        self.assertEqual(context['exception'], exc_context.exception)
3192
3193
class SleepTests(test_utils.TestCase):
    """Tests for asyncio.sleep() using an explicitly passed loop."""

    def setUp(self):
        super().setUp()
        self.loop = asyncio.new_event_loop()
        # No current event loop is installed; the loop is passed explicitly.
        asyncio.set_event_loop(None)

    def tearDown(self):
        self.loop.close()
        self.loop = None
        super().tearDown()

    def test_sleep_zero(self):
        total = 0

        def bump(amount):
            nonlocal total
            total += amount

        @asyncio.coroutine
        def coro():
            self.loop.call_soon(bump, 1)
            self.assertEqual(total, 0)
            slept = yield from asyncio.sleep(0, loop=self.loop, result=10)
            self.assertEqual(total, 1)  # bumped by the call_soon callback
            bump(slept)  # slept is 10, which brings the total to 11

        self.loop.run_until_complete(coro())
        self.assertEqual(total, 11)
3222
3223
class CompatibilityTests(test_utils.TestCase):
    # Tests for checking a bridge between old-styled coroutines
    # and async/await syntax

    def setUp(self):
        super().setUp()
        self.loop = asyncio.new_event_loop()
        # No current event loop is installed; the loop is passed explicitly.
        asyncio.set_event_loop(None)

    def tearDown(self):
        self.loop.close()
        self.loop = None
        super().tearDown()

    def test_yield_from_awaitable(self):
        # A generator-based coroutine can `yield from` asyncio.sleep().

        @asyncio.coroutine
        def coro():
            yield from asyncio.sleep(0, loop=self.loop)
            return 'ok'

        outcome = self.loop.run_until_complete(coro())
        self.assertEqual('ok', outcome)

    def test_await_old_style_coro(self):
        # A native coroutine can await a gather() of generator-based ones.

        @asyncio.coroutine
        def coro1():
            return 'ok1'

        @asyncio.coroutine
        def coro2():
            yield from asyncio.sleep(0, loop=self.loop)
            return 'ok2'

        async def inner():
            return await asyncio.gather(coro1(), coro2(), loop=self.loop)

        outcome = self.loop.run_until_complete(inner())
        self.assertEqual(['ok1', 'ok2'], outcome)

    def test_debug_mode_interop(self):
        # https://bugs.python.org/issue32636
        # The script runs in a subprocess with PYTHONASYNCIODEBUG=1 and only
        # has to exit successfully.
        script = textwrap.dedent("""
            import asyncio

            async def native_coro():
                pass

            @asyncio.coroutine
            def old_style_coro():
                yield from native_coro()

            asyncio.run(old_style_coro())
        """)
        assert_python_ok("-c", script, PYTHONASYNCIODEBUG="1")
3280
3281
# Run the tests through unittest when this file is executed as a script.
if __name__ == '__main__':
    unittest.main()
3284