1"""Tests for tasks.py.""" 2 3import collections 4import contextlib 5import contextvars 6import functools 7import gc 8import io 9import random 10import re 11import sys 12import textwrap 13import traceback 14import types 15import unittest 16import weakref 17from unittest import mock 18 19import asyncio 20from asyncio import coroutines 21from asyncio import futures 22from asyncio import tasks 23from test.test_asyncio import utils as test_utils 24from test import support 25from test.support.script_helper import assert_python_ok 26 27 28def tearDownModule(): 29 asyncio.set_event_loop_policy(None) 30 31 32async def coroutine_function(): 33 pass 34 35 36@contextlib.contextmanager 37def set_coroutine_debug(enabled): 38 coroutines = asyncio.coroutines 39 40 old_debug = coroutines._DEBUG 41 try: 42 coroutines._DEBUG = enabled 43 yield 44 finally: 45 coroutines._DEBUG = old_debug 46 47 48def format_coroutine(qualname, state, src, source_traceback, generator=False): 49 if generator: 50 state = '%s' % state 51 else: 52 state = '%s, defined' % state 53 if source_traceback is not None: 54 frame = source_traceback[-1] 55 return ('coro=<%s() %s at %s> created at %s:%s' 56 % (qualname, state, src, frame[0], frame[1])) 57 else: 58 return 'coro=<%s() %s at %s>' % (qualname, state, src) 59 60 61def get_innermost_context(exc): 62 """ 63 Return information about the innermost exception context in the chain. 64 """ 65 depth = 0 66 while True: 67 context = exc.__context__ 68 if context is None: 69 break 70 71 exc = context 72 depth += 1 73 74 return (type(exc), exc.args, depth) 75 76 77class Dummy: 78 79 def __repr__(self): 80 return '<Dummy>' 81 82 def __call__(self, *args): 83 pass 84 85 86class CoroLikeObject: 87 def send(self, v): 88 raise StopIteration(42) 89 90 def throw(self, *exc): 91 pass 92 93 def close(self): 94 pass 95 96 def __await__(self): 97 return self 98 99 100# The following value can be used as a very small timeout: 101# it passes check "timeout > 0", but has almost 102# no effect on the test performance 103_EPSILON = 0.0001 104 105 106class BaseTaskTests: 107 108 Task = None 109 Future = None 110 111 def new_task(self, loop, coro, name='TestTask'): 112 return self.__class__.Task(coro, loop=loop, name=name) 113 114 def new_future(self, loop): 115 return self.__class__.Future(loop=loop) 116 117 def setUp(self): 118 super().setUp() 119 self.loop = self.new_test_loop() 120 self.loop.set_task_factory(self.new_task) 121 self.loop.create_future = lambda: self.new_future(self.loop) 122 123 def test_task_cancel_message_getter(self): 124 async def coro(): 125 pass 126 t = self.new_task(self.loop, coro()) 127 self.assertTrue(hasattr(t, '_cancel_message')) 128 self.assertEqual(t._cancel_message, None) 129 130 t.cancel('my message') 131 self.assertEqual(t._cancel_message, 'my message') 132 133 with self.assertRaises(asyncio.CancelledError): 134 self.loop.run_until_complete(t) 135 136 def test_task_cancel_message_setter(self): 137 async def coro(): 138 pass 139 t = self.new_task(self.loop, coro()) 140 t.cancel('my message') 141 t._cancel_message = 'my new message' 142 self.assertEqual(t._cancel_message, 'my new message') 143 144 with self.assertRaises(asyncio.CancelledError): 145 self.loop.run_until_complete(t) 146 147 def test_task_del_collect(self): 148 class Evil: 149 def __del__(self): 150 gc.collect() 151 152 async def run(): 153 return Evil() 154 155 self.loop.run_until_complete( 156 asyncio.gather(*[ 157 self.new_task(self.loop, run()) for _ in range(100) 158 ], loop=self.loop)) 159 160 def test_other_loop_future(self): 161 other_loop = asyncio.new_event_loop() 162 fut = self.new_future(other_loop) 163 164 async def run(fut): 165 await fut 166 167 try: 168 with self.assertRaisesRegex(RuntimeError, 169 r'Task .* got Future .* attached'): 170 self.loop.run_until_complete(run(fut)) 171 finally: 172 other_loop.close() 173 174 def test_task_awaits_on_itself(self): 175 176 async def test(): 177 await task 178 179 task = asyncio.ensure_future(test(), loop=self.loop) 180 181 with self.assertRaisesRegex(RuntimeError, 182 'Task cannot await on itself'): 183 self.loop.run_until_complete(task) 184 185 def test_task_class(self): 186 async def notmuch(): 187 return 'ok' 188 t = self.new_task(self.loop, notmuch()) 189 self.loop.run_until_complete(t) 190 self.assertTrue(t.done()) 191 self.assertEqual(t.result(), 'ok') 192 self.assertIs(t._loop, self.loop) 193 self.assertIs(t.get_loop(), self.loop) 194 195 loop = asyncio.new_event_loop() 196 self.set_event_loop(loop) 197 t = self.new_task(loop, notmuch()) 198 self.assertIs(t._loop, loop) 199 loop.run_until_complete(t) 200 loop.close() 201 202 def test_ensure_future_coroutine(self): 203 with self.assertWarns(DeprecationWarning): 204 @asyncio.coroutine 205 def notmuch(): 206 return 'ok' 207 t = asyncio.ensure_future(notmuch(), loop=self.loop) 208 self.loop.run_until_complete(t) 209 self.assertTrue(t.done()) 210 self.assertEqual(t.result(), 'ok') 211 self.assertIs(t._loop, self.loop) 212 213 loop = asyncio.new_event_loop() 214 self.set_event_loop(loop) 215 t = asyncio.ensure_future(notmuch(), loop=loop) 216 self.assertIs(t._loop, loop) 217 loop.run_until_complete(t) 218 loop.close() 219 220 def test_ensure_future_future(self): 221 f_orig = self.new_future(self.loop) 222 f_orig.set_result('ko') 223 224 f = asyncio.ensure_future(f_orig) 225 self.loop.run_until_complete(f) 226 self.assertTrue(f.done()) 227 self.assertEqual(f.result(), 'ko') 228 self.assertIs(f, f_orig) 229 230 loop = asyncio.new_event_loop() 231 self.set_event_loop(loop) 232 233 with self.assertRaises(ValueError): 234 f = asyncio.ensure_future(f_orig, loop=loop) 235 236 loop.close() 237 238 f = asyncio.ensure_future(f_orig, loop=self.loop) 239 self.assertIs(f, f_orig) 240 241 def test_ensure_future_task(self): 242 async def notmuch(): 243 return 'ok' 244 t_orig = self.new_task(self.loop, notmuch()) 245 t = asyncio.ensure_future(t_orig) 246 self.loop.run_until_complete(t) 247 self.assertTrue(t.done()) 248 self.assertEqual(t.result(), 'ok') 249 self.assertIs(t, t_orig) 250 251 loop = asyncio.new_event_loop() 252 self.set_event_loop(loop) 253 254 with self.assertRaises(ValueError): 255 t = asyncio.ensure_future(t_orig, loop=loop) 256 257 loop.close() 258 259 t = asyncio.ensure_future(t_orig, loop=self.loop) 260 self.assertIs(t, t_orig) 261 262 def test_ensure_future_awaitable(self): 263 class Aw: 264 def __init__(self, coro): 265 self.coro = coro 266 def __await__(self): 267 return (yield from self.coro) 268 269 with self.assertWarns(DeprecationWarning): 270 @asyncio.coroutine 271 def coro(): 272 return 'ok' 273 274 loop = asyncio.new_event_loop() 275 self.set_event_loop(loop) 276 fut = asyncio.ensure_future(Aw(coro()), loop=loop) 277 loop.run_until_complete(fut) 278 assert fut.result() == 'ok' 279 280 def test_ensure_future_neither(self): 281 with self.assertRaises(TypeError): 282 asyncio.ensure_future('ok') 283 284 def test_ensure_future_error_msg(self): 285 loop = asyncio.new_event_loop() 286 f = self.new_future(self.loop) 287 with self.assertRaisesRegex(ValueError, 'The future belongs to a ' 288 'different loop than the one specified as ' 289 'the loop argument'): 290 asyncio.ensure_future(f, loop=loop) 291 loop.close() 292 293 def test_get_stack(self): 294 T = None 295 296 async def foo(): 297 await bar() 298 299 async def bar(): 300 # test get_stack() 301 f = T.get_stack(limit=1) 302 try: 303 self.assertEqual(f[0].f_code.co_name, 'foo') 304 finally: 305 f = None 306 307 # test print_stack() 308 file = io.StringIO() 309 T.print_stack(limit=1, file=file) 310 file.seek(0) 311 tb = file.read() 312 self.assertRegex(tb, r'foo\(\) running') 313 314 async def runner(): 315 nonlocal T 316 T = asyncio.ensure_future(foo(), loop=self.loop) 317 await T 318 319 self.loop.run_until_complete(runner()) 320 321 def test_task_repr(self): 322 self.loop.set_debug(False) 323 324 async def notmuch(): 325 return 'abc' 326 327 # test coroutine function 328 self.assertEqual(notmuch.__name__, 'notmuch') 329 self.assertRegex(notmuch.__qualname__, 330 r'\w+.test_task_repr.<locals>.notmuch') 331 self.assertEqual(notmuch.__module__, __name__) 332 333 filename, lineno = test_utils.get_function_source(notmuch) 334 src = "%s:%s" % (filename, lineno) 335 336 # test coroutine object 337 gen = notmuch() 338 coro_qualname = 'BaseTaskTests.test_task_repr.<locals>.notmuch' 339 self.assertEqual(gen.__name__, 'notmuch') 340 self.assertEqual(gen.__qualname__, coro_qualname) 341 342 # test pending Task 343 t = self.new_task(self.loop, gen) 344 t.add_done_callback(Dummy()) 345 346 coro = format_coroutine(coro_qualname, 'running', src, 347 t._source_traceback, generator=True) 348 self.assertEqual(repr(t), 349 "<Task pending name='TestTask' %s cb=[<Dummy>()]>" % coro) 350 351 # test cancelling Task 352 t.cancel() # Does not take immediate effect! 353 self.assertEqual(repr(t), 354 "<Task cancelling name='TestTask' %s cb=[<Dummy>()]>" % coro) 355 356 # test cancelled Task 357 self.assertRaises(asyncio.CancelledError, 358 self.loop.run_until_complete, t) 359 coro = format_coroutine(coro_qualname, 'done', src, 360 t._source_traceback) 361 self.assertEqual(repr(t), 362 "<Task cancelled name='TestTask' %s>" % coro) 363 364 # test finished Task 365 t = self.new_task(self.loop, notmuch()) 366 self.loop.run_until_complete(t) 367 coro = format_coroutine(coro_qualname, 'done', src, 368 t._source_traceback) 369 self.assertEqual(repr(t), 370 "<Task finished name='TestTask' %s result='abc'>" % coro) 371 372 def test_task_repr_autogenerated(self): 373 async def notmuch(): 374 return 123 375 376 t1 = self.new_task(self.loop, notmuch(), None) 377 t2 = self.new_task(self.loop, notmuch(), None) 378 self.assertNotEqual(repr(t1), repr(t2)) 379 380 match1 = re.match(r"^<Task pending name='Task-(\d+)'", repr(t1)) 381 self.assertIsNotNone(match1) 382 match2 = re.match(r"^<Task pending name='Task-(\d+)'", repr(t2)) 383 self.assertIsNotNone(match2) 384 385 # Autogenerated task names should have monotonically increasing numbers 386 self.assertLess(int(match1.group(1)), int(match2.group(1))) 387 self.loop.run_until_complete(t1) 388 self.loop.run_until_complete(t2) 389 390 def test_task_repr_name_not_str(self): 391 async def notmuch(): 392 return 123 393 394 t = self.new_task(self.loop, notmuch()) 395 t.set_name({6}) 396 self.assertEqual(t.get_name(), '{6}') 397 self.loop.run_until_complete(t) 398 399 def test_task_repr_coro_decorator(self): 400 self.loop.set_debug(False) 401 402 with self.assertWarns(DeprecationWarning): 403 @asyncio.coroutine 404 def notmuch(): 405 # notmuch() function doesn't use yield from: it will be wrapped by 406 # @coroutine decorator 407 return 123 408 409 # test coroutine function 410 self.assertEqual(notmuch.__name__, 'notmuch') 411 self.assertRegex(notmuch.__qualname__, 412 r'\w+.test_task_repr_coro_decorator' 413 r'\.<locals>\.notmuch') 414 self.assertEqual(notmuch.__module__, __name__) 415 416 # test coroutine object 417 gen = notmuch() 418 # On Python >= 3.5, generators now inherit the name of the 419 # function, as expected, and have a qualified name (__qualname__ 420 # attribute). 421 coro_name = 'notmuch' 422 coro_qualname = ('BaseTaskTests.test_task_repr_coro_decorator' 423 '.<locals>.notmuch') 424 self.assertEqual(gen.__name__, coro_name) 425 self.assertEqual(gen.__qualname__, coro_qualname) 426 427 # test repr(CoroWrapper) 428 if coroutines._DEBUG: 429 # format the coroutine object 430 if coroutines._DEBUG: 431 filename, lineno = test_utils.get_function_source(notmuch) 432 frame = gen._source_traceback[-1] 433 coro = ('%s() running, defined at %s:%s, created at %s:%s' 434 % (coro_qualname, filename, lineno, 435 frame[0], frame[1])) 436 else: 437 code = gen.gi_code 438 coro = ('%s() running at %s:%s' 439 % (coro_qualname, code.co_filename, 440 code.co_firstlineno)) 441 442 self.assertEqual(repr(gen), '<CoroWrapper %s>' % coro) 443 444 # test pending Task 445 t = self.new_task(self.loop, gen) 446 t.add_done_callback(Dummy()) 447 448 # format the coroutine object 449 if coroutines._DEBUG: 450 src = '%s:%s' % test_utils.get_function_source(notmuch) 451 else: 452 code = gen.gi_code 453 src = '%s:%s' % (code.co_filename, code.co_firstlineno) 454 coro = format_coroutine(coro_qualname, 'running', src, 455 t._source_traceback, 456 generator=not coroutines._DEBUG) 457 self.assertEqual(repr(t), 458 "<Task pending name='TestTask' %s cb=[<Dummy>()]>" % coro) 459 self.loop.run_until_complete(t) 460 461 def test_task_repr_wait_for(self): 462 self.loop.set_debug(False) 463 464 async def wait_for(fut): 465 return await fut 466 467 fut = self.new_future(self.loop) 468 task = self.new_task(self.loop, wait_for(fut)) 469 test_utils.run_briefly(self.loop) 470 self.assertRegex(repr(task), 471 '<Task .* wait_for=%s>' % re.escape(repr(fut))) 472 473 fut.set_result(None) 474 self.loop.run_until_complete(task) 475 476 def test_task_repr_partial_corowrapper(self): 477 # Issue #222: repr(CoroWrapper) must not fail in debug mode if the 478 # coroutine is a partial function 479 with set_coroutine_debug(True): 480 self.loop.set_debug(True) 481 482 async def func(x, y): 483 await asyncio.sleep(0) 484 485 with self.assertWarns(DeprecationWarning): 486 partial_func = asyncio.coroutine(functools.partial(func, 1)) 487 task = self.loop.create_task(partial_func(2)) 488 489 # make warnings quiet 490 task._log_destroy_pending = False 491 self.addCleanup(task._coro.close) 492 493 coro_repr = repr(task._coro) 494 expected = ( 495 r'<coroutine object \w+\.test_task_repr_partial_corowrapper' 496 r'\.<locals>\.func at' 497 ) 498 self.assertRegex(coro_repr, expected) 499 500 def test_task_basics(self): 501 502 async def outer(): 503 a = await inner1() 504 b = await inner2() 505 return a+b 506 507 async def inner1(): 508 return 42 509 510 async def inner2(): 511 return 1000 512 513 t = outer() 514 self.assertEqual(self.loop.run_until_complete(t), 1042) 515 516 def test_exception_chaining_after_await(self): 517 # Test that when awaiting on a task when an exception is already 518 # active, if the task raises an exception it will be chained 519 # with the original. 520 loop = asyncio.new_event_loop() 521 self.set_event_loop(loop) 522 523 async def raise_error(): 524 raise ValueError 525 526 async def run(): 527 try: 528 raise KeyError(3) 529 except Exception as exc: 530 task = self.new_task(loop, raise_error()) 531 try: 532 await task 533 except Exception as exc: 534 self.assertEqual(type(exc), ValueError) 535 chained = exc.__context__ 536 self.assertEqual((type(chained), chained.args), 537 (KeyError, (3,))) 538 539 try: 540 task = self.new_task(loop, run()) 541 loop.run_until_complete(task) 542 finally: 543 loop.close() 544 545 def test_exception_chaining_after_await_with_context_cycle(self): 546 # Check trying to create an exception context cycle: 547 # https://bugs.python.org/issue40696 548 has_cycle = None 549 loop = asyncio.new_event_loop() 550 self.set_event_loop(loop) 551 552 async def process_exc(exc): 553 raise exc 554 555 async def run(): 556 nonlocal has_cycle 557 try: 558 raise KeyError('a') 559 except Exception as exc: 560 task = self.new_task(loop, process_exc(exc)) 561 try: 562 await task 563 except BaseException as exc: 564 has_cycle = (exc is exc.__context__) 565 # Prevent a hang if has_cycle is True. 566 exc.__context__ = None 567 568 try: 569 task = self.new_task(loop, run()) 570 loop.run_until_complete(task) 571 finally: 572 loop.close() 573 # This also distinguishes from the initial has_cycle=None. 574 self.assertEqual(has_cycle, False) 575 576 def test_cancel(self): 577 578 def gen(): 579 when = yield 580 self.assertAlmostEqual(10.0, when) 581 yield 0 582 583 loop = self.new_test_loop(gen) 584 585 async def task(): 586 await asyncio.sleep(10.0) 587 return 12 588 589 t = self.new_task(loop, task()) 590 loop.call_soon(t.cancel) 591 with self.assertRaises(asyncio.CancelledError): 592 loop.run_until_complete(t) 593 self.assertTrue(t.done()) 594 self.assertTrue(t.cancelled()) 595 self.assertFalse(t.cancel()) 596 597 def test_cancel_with_message_then_future_result(self): 598 # Test Future.result() after calling cancel() with a message. 599 cases = [ 600 ((), ()), 601 ((None,), ()), 602 (('my message',), ('my message',)), 603 # Non-string values should roundtrip. 604 ((5,), (5,)), 605 ] 606 for cancel_args, expected_args in cases: 607 with self.subTest(cancel_args=cancel_args): 608 loop = asyncio.new_event_loop() 609 self.set_event_loop(loop) 610 611 async def sleep(): 612 await asyncio.sleep(10) 613 614 async def coro(): 615 task = self.new_task(loop, sleep()) 616 await asyncio.sleep(0) 617 task.cancel(*cancel_args) 618 done, pending = await asyncio.wait([task]) 619 task.result() 620 621 task = self.new_task(loop, coro()) 622 with self.assertRaises(asyncio.CancelledError) as cm: 623 loop.run_until_complete(task) 624 exc = cm.exception 625 self.assertEqual(exc.args, ()) 626 627 actual = get_innermost_context(exc) 628 self.assertEqual(actual, 629 (asyncio.CancelledError, expected_args, 2)) 630 631 def test_cancel_with_message_then_future_exception(self): 632 # Test Future.exception() after calling cancel() with a message. 633 cases = [ 634 ((), ()), 635 ((None,), ()), 636 (('my message',), ('my message',)), 637 # Non-string values should roundtrip. 638 ((5,), (5,)), 639 ] 640 for cancel_args, expected_args in cases: 641 with self.subTest(cancel_args=cancel_args): 642 loop = asyncio.new_event_loop() 643 self.set_event_loop(loop) 644 645 async def sleep(): 646 await asyncio.sleep(10) 647 648 async def coro(): 649 task = self.new_task(loop, sleep()) 650 await asyncio.sleep(0) 651 task.cancel(*cancel_args) 652 done, pending = await asyncio.wait([task]) 653 task.exception() 654 655 task = self.new_task(loop, coro()) 656 with self.assertRaises(asyncio.CancelledError) as cm: 657 loop.run_until_complete(task) 658 exc = cm.exception 659 self.assertEqual(exc.args, ()) 660 661 actual = get_innermost_context(exc) 662 self.assertEqual(actual, 663 (asyncio.CancelledError, expected_args, 2)) 664 665 def test_cancel_with_message_before_starting_task(self): 666 loop = asyncio.new_event_loop() 667 self.set_event_loop(loop) 668 669 async def sleep(): 670 await asyncio.sleep(10) 671 672 async def coro(): 673 task = self.new_task(loop, sleep()) 674 # We deliberately leave out the sleep here. 675 task.cancel('my message') 676 done, pending = await asyncio.wait([task]) 677 task.exception() 678 679 task = self.new_task(loop, coro()) 680 with self.assertRaises(asyncio.CancelledError) as cm: 681 loop.run_until_complete(task) 682 exc = cm.exception 683 self.assertEqual(exc.args, ()) 684 685 actual = get_innermost_context(exc) 686 self.assertEqual(actual, 687 (asyncio.CancelledError, ('my message',), 2)) 688 689 def test_cancel_yield(self): 690 with self.assertWarns(DeprecationWarning): 691 @asyncio.coroutine 692 def task(): 693 yield 694 yield 695 return 12 696 697 t = self.new_task(self.loop, task()) 698 test_utils.run_briefly(self.loop) # start coro 699 t.cancel() 700 self.assertRaises( 701 asyncio.CancelledError, self.loop.run_until_complete, t) 702 self.assertTrue(t.done()) 703 self.assertTrue(t.cancelled()) 704 self.assertFalse(t.cancel()) 705 706 def test_cancel_inner_future(self): 707 f = self.new_future(self.loop) 708 709 async def task(): 710 await f 711 return 12 712 713 t = self.new_task(self.loop, task()) 714 test_utils.run_briefly(self.loop) # start task 715 f.cancel() 716 with self.assertRaises(asyncio.CancelledError): 717 self.loop.run_until_complete(t) 718 self.assertTrue(f.cancelled()) 719 self.assertTrue(t.cancelled()) 720 721 def test_cancel_both_task_and_inner_future(self): 722 f = self.new_future(self.loop) 723 724 async def task(): 725 await f 726 return 12 727 728 t = self.new_task(self.loop, task()) 729 test_utils.run_briefly(self.loop) 730 731 f.cancel() 732 t.cancel() 733 734 with self.assertRaises(asyncio.CancelledError): 735 self.loop.run_until_complete(t) 736 737 self.assertTrue(t.done()) 738 self.assertTrue(f.cancelled()) 739 self.assertTrue(t.cancelled()) 740 741 def test_cancel_task_catching(self): 742 fut1 = self.new_future(self.loop) 743 fut2 = self.new_future(self.loop) 744 745 async def task(): 746 await fut1 747 try: 748 await fut2 749 except asyncio.CancelledError: 750 return 42 751 752 t = self.new_task(self.loop, task()) 753 test_utils.run_briefly(self.loop) 754 self.assertIs(t._fut_waiter, fut1) # White-box test. 755 fut1.set_result(None) 756 test_utils.run_briefly(self.loop) 757 self.assertIs(t._fut_waiter, fut2) # White-box test. 758 t.cancel() 759 self.assertTrue(fut2.cancelled()) 760 res = self.loop.run_until_complete(t) 761 self.assertEqual(res, 42) 762 self.assertFalse(t.cancelled()) 763 764 def test_cancel_task_ignoring(self): 765 fut1 = self.new_future(self.loop) 766 fut2 = self.new_future(self.loop) 767 fut3 = self.new_future(self.loop) 768 769 async def task(): 770 await fut1 771 try: 772 await fut2 773 except asyncio.CancelledError: 774 pass 775 res = await fut3 776 return res 777 778 t = self.new_task(self.loop, task()) 779 test_utils.run_briefly(self.loop) 780 self.assertIs(t._fut_waiter, fut1) # White-box test. 781 fut1.set_result(None) 782 test_utils.run_briefly(self.loop) 783 self.assertIs(t._fut_waiter, fut2) # White-box test. 784 t.cancel() 785 self.assertTrue(fut2.cancelled()) 786 test_utils.run_briefly(self.loop) 787 self.assertIs(t._fut_waiter, fut3) # White-box test. 788 fut3.set_result(42) 789 res = self.loop.run_until_complete(t) 790 self.assertEqual(res, 42) 791 self.assertFalse(fut3.cancelled()) 792 self.assertFalse(t.cancelled()) 793 794 def test_cancel_current_task(self): 795 loop = asyncio.new_event_loop() 796 self.set_event_loop(loop) 797 798 async def task(): 799 t.cancel() 800 self.assertTrue(t._must_cancel) # White-box test. 801 # The sleep should be cancelled immediately. 802 await asyncio.sleep(100) 803 return 12 804 805 t = self.new_task(loop, task()) 806 self.assertFalse(t.cancelled()) 807 self.assertRaises( 808 asyncio.CancelledError, loop.run_until_complete, t) 809 self.assertTrue(t.done()) 810 self.assertTrue(t.cancelled()) 811 self.assertFalse(t._must_cancel) # White-box test. 812 self.assertFalse(t.cancel()) 813 814 def test_cancel_at_end(self): 815 """coroutine end right after task is cancelled""" 816 loop = asyncio.new_event_loop() 817 self.set_event_loop(loop) 818 819 async def task(): 820 t.cancel() 821 self.assertTrue(t._must_cancel) # White-box test. 822 return 12 823 824 t = self.new_task(loop, task()) 825 self.assertFalse(t.cancelled()) 826 self.assertRaises( 827 asyncio.CancelledError, loop.run_until_complete, t) 828 self.assertTrue(t.done()) 829 self.assertTrue(t.cancelled()) 830 self.assertFalse(t._must_cancel) # White-box test. 831 self.assertFalse(t.cancel()) 832 833 def test_cancel_awaited_task(self): 834 # This tests for a relatively rare condition when 835 # a task cancellation is requested for a task which is not 836 # currently blocked, such as a task cancelling itself. 837 # In this situation we must ensure that whatever next future 838 # or task the cancelled task blocks on is cancelled correctly 839 # as well. See also bpo-34872. 840 loop = asyncio.new_event_loop() 841 self.addCleanup(lambda: loop.close()) 842 843 task = nested_task = None 844 fut = self.new_future(loop) 845 846 async def nested(): 847 await fut 848 849 async def coro(): 850 nonlocal nested_task 851 # Create a sub-task and wait for it to run. 852 nested_task = self.new_task(loop, nested()) 853 await asyncio.sleep(0) 854 855 # Request the current task to be cancelled. 856 task.cancel() 857 # Block on the nested task, which should be immediately 858 # cancelled. 859 await nested_task 860 861 task = self.new_task(loop, coro()) 862 with self.assertRaises(asyncio.CancelledError): 863 loop.run_until_complete(task) 864 865 self.assertTrue(task.cancelled()) 866 self.assertTrue(nested_task.cancelled()) 867 self.assertTrue(fut.cancelled()) 868 869 def assert_text_contains(self, text, substr): 870 if substr not in text: 871 raise RuntimeError(f'text {substr!r} not found in:\n>>>{text}<<<') 872 873 def test_cancel_traceback_for_future_result(self): 874 # When calling Future.result() on a cancelled task, check that the 875 # line of code that was interrupted is included in the traceback. 876 loop = asyncio.new_event_loop() 877 self.set_event_loop(loop) 878 879 async def nested(): 880 # This will get cancelled immediately. 881 await asyncio.sleep(10) 882 883 async def coro(): 884 task = self.new_task(loop, nested()) 885 await asyncio.sleep(0) 886 task.cancel() 887 await task # search target 888 889 task = self.new_task(loop, coro()) 890 try: 891 loop.run_until_complete(task) 892 except asyncio.CancelledError: 893 tb = traceback.format_exc() 894 self.assert_text_contains(tb, "await asyncio.sleep(10)") 895 # The intermediate await should also be included. 896 self.assert_text_contains(tb, "await task # search target") 897 else: 898 self.fail('CancelledError did not occur') 899 900 def test_cancel_traceback_for_future_exception(self): 901 # When calling Future.exception() on a cancelled task, check that the 902 # line of code that was interrupted is included in the traceback. 903 loop = asyncio.new_event_loop() 904 self.set_event_loop(loop) 905 906 async def nested(): 907 # This will get cancelled immediately. 908 await asyncio.sleep(10) 909 910 async def coro(): 911 task = self.new_task(loop, nested()) 912 await asyncio.sleep(0) 913 task.cancel() 914 done, pending = await asyncio.wait([task]) 915 task.exception() # search target 916 917 task = self.new_task(loop, coro()) 918 try: 919 loop.run_until_complete(task) 920 except asyncio.CancelledError: 921 tb = traceback.format_exc() 922 self.assert_text_contains(tb, "await asyncio.sleep(10)") 923 # The intermediate await should also be included. 924 self.assert_text_contains(tb, 925 "task.exception() # search target") 926 else: 927 self.fail('CancelledError did not occur') 928 929 def test_stop_while_run_in_complete(self): 930 931 def gen(): 932 when = yield 933 self.assertAlmostEqual(0.1, when) 934 when = yield 0.1 935 self.assertAlmostEqual(0.2, when) 936 when = yield 0.1 937 self.assertAlmostEqual(0.3, when) 938 yield 0.1 939 940 loop = self.new_test_loop(gen) 941 942 x = 0 943 944 async def task(): 945 nonlocal x 946 while x < 10: 947 await asyncio.sleep(0.1) 948 x += 1 949 if x == 2: 950 loop.stop() 951 952 t = self.new_task(loop, task()) 953 with self.assertRaises(RuntimeError) as cm: 954 loop.run_until_complete(t) 955 self.assertEqual(str(cm.exception), 956 'Event loop stopped before Future completed.') 957 self.assertFalse(t.done()) 958 self.assertEqual(x, 2) 959 self.assertAlmostEqual(0.3, loop.time()) 960 961 t.cancel() 962 self.assertRaises(asyncio.CancelledError, loop.run_until_complete, t) 963 964 def test_log_traceback(self): 965 async def coro(): 966 pass 967 968 task = self.new_task(self.loop, coro()) 969 with self.assertRaisesRegex(ValueError, 'can only be set to False'): 970 task._log_traceback = True 971 self.loop.run_until_complete(task) 972 973 def test_wait_for_timeout_less_then_0_or_0_future_done(self): 974 def gen(): 975 when = yield 976 self.assertAlmostEqual(0, when) 977 978 loop = self.new_test_loop(gen) 979 980 fut = self.new_future(loop) 981 fut.set_result('done') 982 983 ret = loop.run_until_complete(asyncio.wait_for(fut, 0)) 984 985 self.assertEqual(ret, 'done') 986 self.assertTrue(fut.done()) 987 self.assertAlmostEqual(0, loop.time()) 988 989 def test_wait_for_timeout_less_then_0_or_0_coroutine_do_not_started(self): 990 def gen(): 991 when = yield 992 self.assertAlmostEqual(0, when) 993 994 loop = self.new_test_loop(gen) 995 996 foo_started = False 997 998 async def foo(): 999 nonlocal foo_started 1000 foo_started = True 1001 1002 with self.assertRaises(asyncio.TimeoutError): 1003 loop.run_until_complete(asyncio.wait_for(foo(), 0)) 1004 1005 self.assertAlmostEqual(0, loop.time()) 1006 self.assertEqual(foo_started, False) 1007 1008 def test_wait_for_timeout_less_then_0_or_0(self): 1009 def gen(): 1010 when = yield 1011 self.assertAlmostEqual(0.2, when) 1012 when = yield 0 1013 self.assertAlmostEqual(0, when) 1014 1015 for timeout in [0, -1]: 1016 with self.subTest(timeout=timeout): 1017 loop = self.new_test_loop(gen) 1018 1019 foo_running = None 1020 1021 async def foo(): 1022 nonlocal foo_running 1023 foo_running = True 1024 try: 1025 await asyncio.sleep(0.2) 1026 finally: 1027 foo_running = False 1028 return 'done' 1029 1030 fut = self.new_task(loop, foo()) 1031 1032 with self.assertRaises(asyncio.TimeoutError): 1033 loop.run_until_complete(asyncio.wait_for(fut, timeout)) 1034 self.assertTrue(fut.done()) 1035 # it should have been cancelled due to the timeout 1036 self.assertTrue(fut.cancelled()) 1037 self.assertAlmostEqual(0, loop.time()) 1038 self.assertEqual(foo_running, False) 1039 1040 def test_wait_for(self): 1041 1042 def gen(): 1043 when = yield 1044 self.assertAlmostEqual(0.2, when) 1045 when = yield 0 1046 self.assertAlmostEqual(0.1, when) 1047 when = yield 0.1 1048 1049 loop = self.new_test_loop(gen) 1050 1051 foo_running = None 1052 1053 async def foo(): 1054 nonlocal foo_running 1055 foo_running = True 1056 try: 1057 await asyncio.sleep(0.2) 1058 finally: 1059 foo_running = False 1060 return 'done' 1061 1062 fut = self.new_task(loop, foo()) 1063 1064 with self.assertRaises(asyncio.TimeoutError): 1065 loop.run_until_complete(asyncio.wait_for(fut, 0.1)) 1066 self.assertTrue(fut.done()) 1067 # it should have been cancelled due to the timeout 1068 self.assertTrue(fut.cancelled()) 1069 self.assertAlmostEqual(0.1, loop.time()) 1070 self.assertEqual(foo_running, False) 1071 1072 def test_wait_for_blocking(self): 1073 loop = self.new_test_loop() 1074 1075 async def coro(): 1076 return 'done' 1077 1078 res = loop.run_until_complete(asyncio.wait_for(coro(), timeout=None)) 1079 self.assertEqual(res, 'done') 1080 1081 def test_wait_for_with_global_loop(self): 1082 1083 def gen(): 1084 when = yield 1085 self.assertAlmostEqual(0.2, when) 1086 when = yield 0 1087 self.assertAlmostEqual(0.01, when) 1088 yield 0.01 1089 1090 loop = self.new_test_loop(gen) 1091 1092 async def foo(): 1093 await asyncio.sleep(0.2) 1094 return 'done' 1095 1096 asyncio.set_event_loop(loop) 1097 try: 1098 fut = self.new_task(loop, foo()) 1099 with self.assertRaises(asyncio.TimeoutError): 1100 loop.run_until_complete(asyncio.wait_for(fut, 0.01)) 1101 finally: 1102 asyncio.set_event_loop(None) 1103 1104 self.assertAlmostEqual(0.01, loop.time()) 1105 self.assertTrue(fut.done()) 1106 self.assertTrue(fut.cancelled()) 1107 1108 def test_wait_for_race_condition(self): 1109 1110 def gen(): 1111 yield 0.1 1112 yield 0.1 1113 yield 0.1 1114 1115 loop = self.new_test_loop(gen) 1116 1117 fut = self.new_future(loop) 1118 task = asyncio.wait_for(fut, timeout=0.2) 1119 loop.call_later(0.1, fut.set_result, "ok") 1120 res = loop.run_until_complete(task) 1121 self.assertEqual(res, "ok") 1122 1123 def test_wait_for_cancellation_race_condition(self): 1124 def gen(): 1125 yield 0.1 1126 yield 0.1 1127 yield 0.1 1128 yield 0.1 1129 1130 loop = self.new_test_loop(gen) 1131 1132 fut = self.new_future(loop) 1133 loop.call_later(0.1, fut.set_result, "ok") 1134 task = loop.create_task(asyncio.wait_for(fut, timeout=1)) 1135 loop.call_later(0.1, task.cancel) 1136 res = loop.run_until_complete(task) 1137 self.assertEqual(res, "ok") 1138 1139 def test_wait_for_waits_for_task_cancellation(self): 1140 loop = asyncio.new_event_loop() 1141 self.addCleanup(loop.close) 1142 1143 task_done = False 1144 1145 async def foo(): 1146 async def inner(): 1147 nonlocal task_done 1148 try: 1149 await asyncio.sleep(0.2) 1150 except asyncio.CancelledError: 1151 await asyncio.sleep(_EPSILON) 1152 raise 1153 finally: 1154 task_done = True 1155 1156 inner_task = self.new_task(loop, inner()) 1157 1158 await asyncio.wait_for(inner_task, timeout=_EPSILON) 1159 1160 with self.assertRaises(asyncio.TimeoutError) as cm: 1161 loop.run_until_complete(foo()) 1162 1163 self.assertTrue(task_done) 1164 chained = cm.exception.__context__ 1165 self.assertEqual(type(chained), asyncio.CancelledError) 1166 1167 def test_wait_for_waits_for_task_cancellation_w_timeout_0(self): 1168 loop = asyncio.new_event_loop() 1169 self.addCleanup(loop.close) 1170 1171 task_done = False 1172 1173 async def foo(): 1174 async def inner(): 1175 nonlocal task_done 1176 try: 1177 await asyncio.sleep(10) 1178 except asyncio.CancelledError: 1179 await asyncio.sleep(_EPSILON) 1180 raise 1181 finally: 1182 task_done = True 1183 1184 inner_task = self.new_task(loop, inner()) 1185 await asyncio.sleep(_EPSILON) 1186 await asyncio.wait_for(inner_task, timeout=0) 1187 1188 with self.assertRaises(asyncio.TimeoutError) as cm: 1189 loop.run_until_complete(foo()) 1190 1191 self.assertTrue(task_done) 1192 chained = cm.exception.__context__ 1193 self.assertEqual(type(chained), asyncio.CancelledError) 1194 1195 def test_wait_for_reraises_exception_during_cancellation(self): 1196 loop = asyncio.new_event_loop() 1197 self.addCleanup(loop.close) 1198 1199 class FooException(Exception): 1200 pass 1201 1202 async def foo(): 1203 async def inner(): 1204 try: 1205 await asyncio.sleep(0.2) 1206 finally: 1207 raise FooException 1208 1209 inner_task = self.new_task(loop, inner()) 1210 1211 await asyncio.wait_for(inner_task, timeout=_EPSILON) 1212 1213 with self.assertRaises(FooException): 1214 loop.run_until_complete(foo()) 1215 1216 def test_wait_for_raises_timeout_error_if_returned_during_cancellation(self): 1217 loop = asyncio.new_event_loop() 1218 self.addCleanup(loop.close) 1219 1220 async def foo(): 1221 async def inner(): 1222 try: 1223 await asyncio.sleep(0.2) 1224 except asyncio.CancelledError: 1225 return 42 1226 1227 inner_task = self.new_task(loop, inner()) 1228 1229 await asyncio.wait_for(inner_task, timeout=_EPSILON) 1230 1231 with self.assertRaises(asyncio.TimeoutError): 1232 loop.run_until_complete(foo()) 1233 1234 def test_wait_for_self_cancellation(self): 1235 loop = asyncio.new_event_loop() 1236 self.addCleanup(loop.close) 1237 1238 async def foo(): 1239 async def inner(): 1240 try: 1241 await asyncio.sleep(0.3) 1242 except asyncio.CancelledError: 1243 try: 1244 await asyncio.sleep(0.3) 1245 except asyncio.CancelledError: 1246 await asyncio.sleep(0.3) 1247 1248 return 42 1249 1250 inner_task = self.new_task(loop, inner()) 1251 1252 wait = asyncio.wait_for(inner_task, timeout=0.1) 1253 1254 # Test that wait_for itself is properly cancellable 1255 # even when the initial task holds up the initial cancellation. 1256 task = self.new_task(loop, wait) 1257 await asyncio.sleep(0.2) 1258 task.cancel() 1259 1260 with self.assertRaises(asyncio.CancelledError): 1261 await task 1262 1263 self.assertEqual(await inner_task, 42) 1264 1265 loop.run_until_complete(foo()) 1266 1267 def test_wait(self): 1268 1269 def gen(): 1270 when = yield 1271 self.assertAlmostEqual(0.1, when) 1272 when = yield 0 1273 self.assertAlmostEqual(0.15, when) 1274 yield 0.15 1275 1276 loop = self.new_test_loop(gen) 1277 1278 a = self.new_task(loop, asyncio.sleep(0.1)) 1279 b = self.new_task(loop, asyncio.sleep(0.15)) 1280 1281 async def foo(): 1282 done, pending = await asyncio.wait([b, a]) 1283 self.assertEqual(done, set([a, b])) 1284 self.assertEqual(pending, set()) 1285 return 42 1286 1287 res = loop.run_until_complete(self.new_task(loop, foo())) 1288 self.assertEqual(res, 42) 1289 self.assertAlmostEqual(0.15, loop.time()) 1290 1291 # Doing it again should take no time and exercise a different path. 1292 res = loop.run_until_complete(self.new_task(loop, foo())) 1293 self.assertAlmostEqual(0.15, loop.time()) 1294 self.assertEqual(res, 42) 1295 1296 def test_wait_with_global_loop(self): 1297 1298 def gen(): 1299 when = yield 1300 self.assertAlmostEqual(0.01, when) 1301 when = yield 0 1302 self.assertAlmostEqual(0.015, when) 1303 yield 0.015 1304 1305 loop = self.new_test_loop(gen) 1306 1307 a = self.new_task(loop, asyncio.sleep(0.01)) 1308 b = self.new_task(loop, asyncio.sleep(0.015)) 1309 1310 async def foo(): 1311 done, pending = await asyncio.wait([b, a]) 1312 self.assertEqual(done, set([a, b])) 1313 self.assertEqual(pending, set()) 1314 return 42 1315 1316 asyncio.set_event_loop(loop) 1317 res = loop.run_until_complete( 1318 self.new_task(loop, foo())) 1319 1320 self.assertEqual(res, 42) 1321 1322 def test_wait_duplicate_coroutines(self): 1323 1324 with self.assertWarns(DeprecationWarning): 1325 @asyncio.coroutine 1326 def coro(s): 1327 return s 1328 c = coro('test') 1329 task = self.new_task( 1330 self.loop, 1331 asyncio.wait([c, c, coro('spam')])) 1332 1333 with self.assertWarns(DeprecationWarning): 1334 done, pending = self.loop.run_until_complete(task) 1335 1336 self.assertFalse(pending) 1337 self.assertEqual(set(f.result() for f in done), {'test', 'spam'}) 1338 1339 def test_wait_errors(self): 1340 self.assertRaises( 1341 ValueError, self.loop.run_until_complete, 1342 asyncio.wait(set())) 1343 1344 # -1 is an invalid return_when value 1345 sleep_coro = asyncio.sleep(10.0) 1346 wait_coro = asyncio.wait([sleep_coro], return_when=-1) 1347 self.assertRaises(ValueError, 1348 self.loop.run_until_complete, wait_coro) 1349 1350 sleep_coro.close() 1351 1352 def test_wait_first_completed(self): 1353 1354 def gen(): 1355 when = yield 1356 self.assertAlmostEqual(10.0, when) 1357 when = yield 0 1358 self.assertAlmostEqual(0.1, when) 1359 yield 0.1 1360 1361 loop = self.new_test_loop(gen) 1362 1363 a = self.new_task(loop, asyncio.sleep(10.0)) 1364 b = self.new_task(loop, asyncio.sleep(0.1)) 1365 task = self.new_task( 1366 loop, 1367 asyncio.wait([b, a], return_when=asyncio.FIRST_COMPLETED)) 1368 1369 done, pending = loop.run_until_complete(task) 1370 self.assertEqual({b}, done) 1371 self.assertEqual({a}, pending) 1372 self.assertFalse(a.done()) 1373 self.assertTrue(b.done()) 1374 self.assertIsNone(b.result()) 1375 self.assertAlmostEqual(0.1, loop.time()) 1376 1377 # move forward to close generator 1378 loop.advance_time(10) 1379 loop.run_until_complete(asyncio.wait([a, b])) 1380 1381 def test_wait_really_done(self): 1382 # there is possibility that some tasks in the pending list 1383 # became done but their callbacks haven't all been called yet 1384 1385 async def coro1(): 1386 await asyncio.sleep(0) 1387 1388 async def coro2(): 1389 await asyncio.sleep(0) 1390 await asyncio.sleep(0) 1391 1392 a = self.new_task(self.loop, coro1()) 1393 b = self.new_task(self.loop, coro2()) 1394 task = self.new_task( 1395 self.loop, 1396 asyncio.wait([b, a], return_when=asyncio.FIRST_COMPLETED)) 1397 1398 done, pending = self.loop.run_until_complete(task) 1399 self.assertEqual({a, b}, done) 1400 self.assertTrue(a.done()) 1401 self.assertIsNone(a.result()) 1402 self.assertTrue(b.done()) 1403 self.assertIsNone(b.result()) 1404 1405 def test_wait_first_exception(self): 1406 1407 def gen(): 1408 when = yield 1409 self.assertAlmostEqual(10.0, when) 1410 yield 0 1411 1412 loop = self.new_test_loop(gen) 1413 1414 # first_exception, task already has exception 1415 a = self.new_task(loop, asyncio.sleep(10.0)) 1416 1417 async def exc(): 1418 raise ZeroDivisionError('err') 1419 1420 b = self.new_task(loop, exc()) 1421 task = self.new_task( 1422 loop, 1423 asyncio.wait([b, a], return_when=asyncio.FIRST_EXCEPTION)) 1424 1425 done, pending = loop.run_until_complete(task) 1426 self.assertEqual({b}, done) 1427 self.assertEqual({a}, pending) 1428 self.assertAlmostEqual(0, loop.time()) 1429 1430 # move forward to close generator 1431 loop.advance_time(10) 1432 loop.run_until_complete(asyncio.wait([a, b])) 1433 1434 def test_wait_first_exception_in_wait(self): 1435 1436 def gen(): 1437 when = yield 1438 self.assertAlmostEqual(10.0, when) 1439 when = yield 0 1440 self.assertAlmostEqual(0.01, when) 1441 yield 0.01 1442 1443 loop = self.new_test_loop(gen) 1444 1445 # first_exception, exception during waiting 1446 a = self.new_task(loop, asyncio.sleep(10.0)) 1447 1448 async def exc(): 1449 await asyncio.sleep(0.01) 1450 raise ZeroDivisionError('err') 1451 1452 b = self.new_task(loop, exc()) 1453 task = asyncio.wait([b, a], return_when=asyncio.FIRST_EXCEPTION) 1454 1455 done, pending = loop.run_until_complete(task) 1456 self.assertEqual({b}, done) 1457 self.assertEqual({a}, pending) 1458 self.assertAlmostEqual(0.01, loop.time()) 1459 1460 # move forward to close generator 1461 loop.advance_time(10) 1462 loop.run_until_complete(asyncio.wait([a, b])) 1463 1464 def test_wait_with_exception(self): 1465 1466 def gen(): 1467 when = yield 1468 self.assertAlmostEqual(0.1, when) 1469 when = yield 0 1470 self.assertAlmostEqual(0.15, when) 1471 yield 0.15 1472 1473 loop = self.new_test_loop(gen) 1474 1475 a = self.new_task(loop, asyncio.sleep(0.1)) 1476 1477 async def sleeper(): 1478 await asyncio.sleep(0.15) 1479 raise ZeroDivisionError('really') 1480 1481 b = self.new_task(loop, sleeper()) 1482 1483 async def foo(): 1484 done, pending = await asyncio.wait([b, a]) 1485 self.assertEqual(len(done), 2) 1486 self.assertEqual(pending, set()) 1487 errors = set(f for f in done if f.exception() is not None) 1488 self.assertEqual(len(errors), 1) 1489 1490 loop.run_until_complete(self.new_task(loop, foo())) 1491 self.assertAlmostEqual(0.15, loop.time()) 1492 1493 loop.run_until_complete(self.new_task(loop, foo())) 1494 self.assertAlmostEqual(0.15, loop.time()) 1495 1496 def test_wait_with_timeout(self): 1497 1498 def gen(): 1499 when = yield 1500 self.assertAlmostEqual(0.1, when) 1501 when = yield 0 1502 self.assertAlmostEqual(0.15, when) 1503 when = yield 0 1504 self.assertAlmostEqual(0.11, when) 1505 yield 0.11 1506 1507 loop = self.new_test_loop(gen) 1508 1509 a = self.new_task(loop, asyncio.sleep(0.1)) 1510 b = self.new_task(loop, asyncio.sleep(0.15)) 1511 1512 async def foo(): 1513 done, pending = await asyncio.wait([b, a], timeout=0.11) 1514 self.assertEqual(done, set([a])) 1515 self.assertEqual(pending, set([b])) 1516 1517 loop.run_until_complete(self.new_task(loop, foo())) 1518 self.assertAlmostEqual(0.11, loop.time()) 1519 1520 # move forward to close generator 1521 loop.advance_time(10) 1522 loop.run_until_complete(asyncio.wait([a, b])) 1523 1524 def test_wait_concurrent_complete(self): 1525 1526 def gen(): 1527 when = yield 1528 self.assertAlmostEqual(0.1, when) 1529 when = yield 0 1530 self.assertAlmostEqual(0.15, when) 1531 when = yield 0 1532 self.assertAlmostEqual(0.1, when) 1533 yield 0.1 1534 1535 loop = self.new_test_loop(gen) 1536 1537 a = self.new_task(loop, asyncio.sleep(0.1)) 1538 b = self.new_task(loop, asyncio.sleep(0.15)) 1539 1540 done, pending = loop.run_until_complete( 1541 asyncio.wait([b, a], timeout=0.1)) 1542 1543 self.assertEqual(done, set([a])) 1544 self.assertEqual(pending, set([b])) 1545 self.assertAlmostEqual(0.1, loop.time()) 1546 1547 # move forward to close generator 1548 loop.advance_time(10) 1549 loop.run_until_complete(asyncio.wait([a, b])) 1550 1551 def test_wait_with_iterator_of_tasks(self): 1552 1553 def gen(): 1554 when = yield 1555 self.assertAlmostEqual(0.1, when) 1556 when = yield 0 1557 self.assertAlmostEqual(0.15, when) 1558 yield 0.15 1559 1560 loop = self.new_test_loop(gen) 1561 1562 a = self.new_task(loop, asyncio.sleep(0.1)) 1563 b = self.new_task(loop, asyncio.sleep(0.15)) 1564 1565 async def foo(): 1566 done, pending = await asyncio.wait(iter([b, a])) 1567 self.assertEqual(done, set([a, b])) 1568 self.assertEqual(pending, set()) 1569 return 42 1570 1571 res = loop.run_until_complete(self.new_task(loop, foo())) 1572 self.assertEqual(res, 42) 1573 self.assertAlmostEqual(0.15, loop.time()) 1574 1575 def test_as_completed(self): 1576 1577 def gen(): 1578 yield 0 1579 yield 0 1580 yield 0.01 1581 yield 0 1582 1583 loop = self.new_test_loop(gen) 1584 # disable "slow callback" warning 1585 loop.slow_callback_duration = 1.0 1586 completed = set() 1587 time_shifted = False 1588 1589 with self.assertWarns(DeprecationWarning): 1590 @asyncio.coroutine 1591 def sleeper(dt, x): 1592 nonlocal time_shifted 1593 yield from asyncio.sleep(dt) 1594 completed.add(x) 1595 if not time_shifted and 'a' in completed and 'b' in completed: 1596 time_shifted = True 1597 loop.advance_time(0.14) 1598 return x 1599 1600 a = sleeper(0.01, 'a') 1601 b = sleeper(0.01, 'b') 1602 c = sleeper(0.15, 'c') 1603 1604 async def foo(): 1605 values = [] 1606 for f in asyncio.as_completed([b, c, a], loop=loop): 1607 values.append(await f) 1608 return values 1609 with self.assertWarns(DeprecationWarning): 1610 res = loop.run_until_complete(self.new_task(loop, foo())) 1611 self.assertAlmostEqual(0.15, loop.time()) 1612 self.assertTrue('a' in res[:2]) 1613 self.assertTrue('b' in res[:2]) 1614 self.assertEqual(res[2], 'c') 1615 1616 # Doing it again should take no time and exercise a different path. 1617 with self.assertWarns(DeprecationWarning): 1618 res = loop.run_until_complete(self.new_task(loop, foo())) 1619 self.assertAlmostEqual(0.15, loop.time()) 1620 1621 def test_as_completed_with_timeout(self): 1622 1623 def gen(): 1624 yield 1625 yield 0 1626 yield 0 1627 yield 0.1 1628 1629 loop = self.new_test_loop(gen) 1630 1631 a = loop.create_task(asyncio.sleep(0.1, 'a')) 1632 b = loop.create_task(asyncio.sleep(0.15, 'b')) 1633 1634 async def foo(): 1635 values = [] 1636 for f in asyncio.as_completed([a, b], timeout=0.12, loop=loop): 1637 if values: 1638 loop.advance_time(0.02) 1639 try: 1640 v = await f 1641 values.append((1, v)) 1642 except asyncio.TimeoutError as exc: 1643 values.append((2, exc)) 1644 return values 1645 1646 with self.assertWarns(DeprecationWarning): 1647 res = loop.run_until_complete(self.new_task(loop, foo())) 1648 self.assertEqual(len(res), 2, res) 1649 self.assertEqual(res[0], (1, 'a')) 1650 self.assertEqual(res[1][0], 2) 1651 self.assertIsInstance(res[1][1], asyncio.TimeoutError) 1652 self.assertAlmostEqual(0.12, loop.time()) 1653 1654 # move forward to close generator 1655 loop.advance_time(10) 1656 loop.run_until_complete(asyncio.wait([a, b])) 1657 1658 def test_as_completed_with_unused_timeout(self): 1659 1660 def gen(): 1661 yield 1662 yield 0 1663 yield 0.01 1664 1665 loop = self.new_test_loop(gen) 1666 1667 a = asyncio.sleep(0.01, 'a') 1668 1669 async def foo(): 1670 for f in asyncio.as_completed([a], timeout=1, loop=loop): 1671 v = await f 1672 self.assertEqual(v, 'a') 1673 1674 with self.assertWarns(DeprecationWarning): 1675 loop.run_until_complete(self.new_task(loop, foo())) 1676 1677 def test_as_completed_reverse_wait(self): 1678 1679 def gen(): 1680 yield 0 1681 yield 0.05 1682 yield 0 1683 1684 loop = self.new_test_loop(gen) 1685 1686 a = asyncio.sleep(0.05, 'a') 1687 b = asyncio.sleep(0.10, 'b') 1688 fs = {a, b} 1689 1690 with self.assertWarns(DeprecationWarning): 1691 futs = list(asyncio.as_completed(fs, loop=loop)) 1692 self.assertEqual(len(futs), 2) 1693 1694 x = loop.run_until_complete(futs[1]) 1695 self.assertEqual(x, 'a') 1696 self.assertAlmostEqual(0.05, loop.time()) 1697 loop.advance_time(0.05) 1698 y = loop.run_until_complete(futs[0]) 1699 self.assertEqual(y, 'b') 1700 self.assertAlmostEqual(0.10, loop.time()) 1701 1702 def test_as_completed_concurrent(self): 1703 1704 def gen(): 1705 when = yield 1706 self.assertAlmostEqual(0.05, when) 1707 when = yield 0 1708 self.assertAlmostEqual(0.05, when) 1709 yield 0.05 1710 1711 loop = self.new_test_loop(gen) 1712 1713 a = asyncio.sleep(0.05, 'a') 1714 b = asyncio.sleep(0.05, 'b') 1715 fs = {a, b} 1716 with self.assertWarns(DeprecationWarning): 1717 futs = list(asyncio.as_completed(fs, loop=loop)) 1718 self.assertEqual(len(futs), 2) 1719 waiter = asyncio.wait(futs) 1720 # Deprecation from passing coros in futs to asyncio.wait() 1721 with self.assertWarns(DeprecationWarning): 1722 done, pending = loop.run_until_complete(waiter) 1723 self.assertEqual(set(f.result() for f in done), {'a', 'b'}) 1724 1725 def test_as_completed_duplicate_coroutines(self): 1726 1727 with self.assertWarns(DeprecationWarning): 1728 @asyncio.coroutine 1729 def coro(s): 1730 return s 1731 1732 with self.assertWarns(DeprecationWarning): 1733 @asyncio.coroutine 1734 def runner(): 1735 result = [] 1736 c = coro('ham') 1737 for f in asyncio.as_completed([c, c, coro('spam')], 1738 loop=self.loop): 1739 result.append((yield from f)) 1740 return result 1741 1742 with self.assertWarns(DeprecationWarning): 1743 fut = self.new_task(self.loop, runner()) 1744 self.loop.run_until_complete(fut) 1745 result = fut.result() 1746 self.assertEqual(set(result), {'ham', 'spam'}) 1747 self.assertEqual(len(result), 2) 1748 1749 def test_sleep(self): 1750 1751 def gen(): 1752 when = yield 1753 self.assertAlmostEqual(0.05, when) 1754 when = yield 0.05 1755 self.assertAlmostEqual(0.1, when) 1756 yield 0.05 1757 1758 loop = self.new_test_loop(gen) 1759 1760 async def sleeper(dt, arg): 1761 await asyncio.sleep(dt/2) 1762 res = await asyncio.sleep(dt/2, arg) 1763 return res 1764 1765 t = self.new_task(loop, sleeper(0.1, 'yeah')) 1766 loop.run_until_complete(t) 1767 self.assertTrue(t.done()) 1768 self.assertEqual(t.result(), 'yeah') 1769 self.assertAlmostEqual(0.1, loop.time()) 1770 1771 def test_sleep_cancel(self): 1772 1773 def gen(): 1774 when = yield 1775 self.assertAlmostEqual(10.0, when) 1776 yield 0 1777 1778 loop = self.new_test_loop(gen) 1779 1780 t = self.new_task(loop, asyncio.sleep(10.0, 'yeah')) 1781 1782 handle = None 1783 orig_call_later = loop.call_later 1784 1785 def call_later(delay, callback, *args): 1786 nonlocal handle 1787 handle = orig_call_later(delay, callback, *args) 1788 return handle 1789 1790 loop.call_later = call_later 1791 test_utils.run_briefly(loop) 1792 1793 self.assertFalse(handle._cancelled) 1794 1795 t.cancel() 1796 test_utils.run_briefly(loop) 1797 self.assertTrue(handle._cancelled) 1798 1799 def test_task_cancel_sleeping_task(self): 1800 1801 def gen(): 1802 when = yield 1803 self.assertAlmostEqual(0.1, when) 1804 when = yield 0 1805 self.assertAlmostEqual(5000, when) 1806 yield 0.1 1807 1808 loop = self.new_test_loop(gen) 1809 1810 async def sleep(dt): 1811 await asyncio.sleep(dt) 1812 1813 async def doit(): 1814 sleeper = self.new_task(loop, sleep(5000)) 1815 loop.call_later(0.1, sleeper.cancel) 1816 try: 1817 await sleeper 1818 except asyncio.CancelledError: 1819 return 'cancelled' 1820 else: 1821 return 'slept in' 1822 1823 doer = doit() 1824 self.assertEqual(loop.run_until_complete(doer), 'cancelled') 1825 self.assertAlmostEqual(0.1, loop.time()) 1826 1827 def test_task_cancel_waiter_future(self): 1828 fut = self.new_future(self.loop) 1829 1830 async def coro(): 1831 await fut 1832 1833 task = self.new_task(self.loop, coro()) 1834 test_utils.run_briefly(self.loop) 1835 self.assertIs(task._fut_waiter, fut) 1836 1837 task.cancel() 1838 test_utils.run_briefly(self.loop) 1839 self.assertRaises( 1840 asyncio.CancelledError, self.loop.run_until_complete, task) 1841 self.assertIsNone(task._fut_waiter) 1842 self.assertTrue(fut.cancelled()) 1843 1844 def test_task_set_methods(self): 1845 async def notmuch(): 1846 return 'ko' 1847 1848 gen = notmuch() 1849 task = self.new_task(self.loop, gen) 1850 1851 with self.assertRaisesRegex(RuntimeError, 'not support set_result'): 1852 task.set_result('ok') 1853 1854 with self.assertRaisesRegex(RuntimeError, 'not support set_exception'): 1855 task.set_exception(ValueError()) 1856 1857 self.assertEqual( 1858 self.loop.run_until_complete(task), 1859 'ko') 1860 1861 def test_step_result(self): 1862 with self.assertWarns(DeprecationWarning): 1863 @asyncio.coroutine 1864 def notmuch(): 1865 yield None 1866 yield 1 1867 return 'ko' 1868 1869 self.assertRaises( 1870 RuntimeError, self.loop.run_until_complete, notmuch()) 1871 1872 def test_step_result_future(self): 1873 # If coroutine returns future, task waits on this future. 1874 1875 class Fut(asyncio.Future): 1876 def __init__(self, *args, **kwds): 1877 self.cb_added = False 1878 super().__init__(*args, **kwds) 1879 1880 def add_done_callback(self, *args, **kwargs): 1881 self.cb_added = True 1882 super().add_done_callback(*args, **kwargs) 1883 1884 fut = Fut(loop=self.loop) 1885 result = None 1886 1887 async def wait_for_future(): 1888 nonlocal result 1889 result = await fut 1890 1891 t = self.new_task(self.loop, wait_for_future()) 1892 test_utils.run_briefly(self.loop) 1893 self.assertTrue(fut.cb_added) 1894 1895 res = object() 1896 fut.set_result(res) 1897 test_utils.run_briefly(self.loop) 1898 self.assertIs(res, result) 1899 self.assertTrue(t.done()) 1900 self.assertIsNone(t.result()) 1901 1902 def test_baseexception_during_cancel(self): 1903 1904 def gen(): 1905 when = yield 1906 self.assertAlmostEqual(10.0, when) 1907 yield 0 1908 1909 loop = self.new_test_loop(gen) 1910 1911 async def sleeper(): 1912 await asyncio.sleep(10) 1913 1914 base_exc = SystemExit() 1915 1916 async def notmutch(): 1917 try: 1918 await sleeper() 1919 except asyncio.CancelledError: 1920 raise base_exc 1921 1922 task = self.new_task(loop, notmutch()) 1923 test_utils.run_briefly(loop) 1924 1925 task.cancel() 1926 self.assertFalse(task.done()) 1927 1928 self.assertRaises(SystemExit, test_utils.run_briefly, loop) 1929 1930 self.assertTrue(task.done()) 1931 self.assertFalse(task.cancelled()) 1932 self.assertIs(task.exception(), base_exc) 1933 1934 def test_iscoroutinefunction(self): 1935 def fn(): 1936 pass 1937 1938 self.assertFalse(asyncio.iscoroutinefunction(fn)) 1939 1940 def fn1(): 1941 yield 1942 self.assertFalse(asyncio.iscoroutinefunction(fn1)) 1943 1944 with self.assertWarns(DeprecationWarning): 1945 @asyncio.coroutine 1946 def fn2(): 1947 yield 1948 self.assertTrue(asyncio.iscoroutinefunction(fn2)) 1949 1950 self.assertFalse(asyncio.iscoroutinefunction(mock.Mock())) 1951 1952 def test_yield_vs_yield_from(self): 1953 fut = self.new_future(self.loop) 1954 1955 with self.assertWarns(DeprecationWarning): 1956 @asyncio.coroutine 1957 def wait_for_future(): 1958 yield fut 1959 1960 task = wait_for_future() 1961 with self.assertRaises(RuntimeError): 1962 self.loop.run_until_complete(task) 1963 1964 self.assertFalse(fut.done()) 1965 1966 def test_yield_vs_yield_from_generator(self): 1967 with self.assertWarns(DeprecationWarning): 1968 @asyncio.coroutine 1969 def coro(): 1970 yield 1971 1972 with self.assertWarns(DeprecationWarning): 1973 @asyncio.coroutine 1974 def wait_for_future(): 1975 gen = coro() 1976 try: 1977 yield gen 1978 finally: 1979 gen.close() 1980 1981 task = wait_for_future() 1982 self.assertRaises( 1983 RuntimeError, 1984 self.loop.run_until_complete, task) 1985 1986 def test_coroutine_non_gen_function(self): 1987 with self.assertWarns(DeprecationWarning): 1988 @asyncio.coroutine 1989 def func(): 1990 return 'test' 1991 1992 self.assertTrue(asyncio.iscoroutinefunction(func)) 1993 1994 coro = func() 1995 self.assertTrue(asyncio.iscoroutine(coro)) 1996 1997 res = self.loop.run_until_complete(coro) 1998 self.assertEqual(res, 'test') 1999 2000 def test_coroutine_non_gen_function_return_future(self): 2001 fut = self.new_future(self.loop) 2002 2003 with self.assertWarns(DeprecationWarning): 2004 @asyncio.coroutine 2005 def func(): 2006 return fut 2007 2008 async def coro(): 2009 fut.set_result('test') 2010 2011 t1 = self.new_task(self.loop, func()) 2012 t2 = self.new_task(self.loop, coro()) 2013 res = self.loop.run_until_complete(t1) 2014 self.assertEqual(res, 'test') 2015 self.assertIsNone(t2.result()) 2016 2017 def test_current_task(self): 2018 self.assertIsNone(asyncio.current_task(loop=self.loop)) 2019 2020 async def coro(loop): 2021 self.assertIs(asyncio.current_task(loop=loop), task) 2022 2023 self.assertIs(asyncio.current_task(None), task) 2024 self.assertIs(asyncio.current_task(), task) 2025 2026 task = self.new_task(self.loop, coro(self.loop)) 2027 self.loop.run_until_complete(task) 2028 self.assertIsNone(asyncio.current_task(loop=self.loop)) 2029 2030 def test_current_task_with_interleaving_tasks(self): 2031 self.assertIsNone(asyncio.current_task(loop=self.loop)) 2032 2033 fut1 = self.new_future(self.loop) 2034 fut2 = self.new_future(self.loop) 2035 2036 async def coro1(loop): 2037 self.assertTrue(asyncio.current_task(loop=loop) is task1) 2038 await fut1 2039 self.assertTrue(asyncio.current_task(loop=loop) is task1) 2040 fut2.set_result(True) 2041 2042 async def coro2(loop): 2043 self.assertTrue(asyncio.current_task(loop=loop) is task2) 2044 fut1.set_result(True) 2045 await fut2 2046 self.assertTrue(asyncio.current_task(loop=loop) is task2) 2047 2048 task1 = self.new_task(self.loop, coro1(self.loop)) 2049 task2 = self.new_task(self.loop, coro2(self.loop)) 2050 2051 self.loop.run_until_complete(asyncio.wait((task1, task2))) 2052 self.assertIsNone(asyncio.current_task(loop=self.loop)) 2053 2054 # Some thorough tests for cancellation propagation through 2055 # coroutines, tasks and wait(). 2056 2057 def test_yield_future_passes_cancel(self): 2058 # Cancelling outer() cancels inner() cancels waiter. 2059 proof = 0 2060 waiter = self.new_future(self.loop) 2061 2062 async def inner(): 2063 nonlocal proof 2064 try: 2065 await waiter 2066 except asyncio.CancelledError: 2067 proof += 1 2068 raise 2069 else: 2070 self.fail('got past sleep() in inner()') 2071 2072 async def outer(): 2073 nonlocal proof 2074 try: 2075 await inner() 2076 except asyncio.CancelledError: 2077 proof += 100 # Expect this path. 2078 else: 2079 proof += 10 2080 2081 f = asyncio.ensure_future(outer(), loop=self.loop) 2082 test_utils.run_briefly(self.loop) 2083 f.cancel() 2084 self.loop.run_until_complete(f) 2085 self.assertEqual(proof, 101) 2086 self.assertTrue(waiter.cancelled()) 2087 2088 def test_yield_wait_does_not_shield_cancel(self): 2089 # Cancelling outer() makes wait() return early, leaves inner() 2090 # running. 2091 proof = 0 2092 waiter = self.new_future(self.loop) 2093 2094 async def inner(): 2095 nonlocal proof 2096 await waiter 2097 proof += 1 2098 2099 async def outer(): 2100 nonlocal proof 2101 with self.assertWarns(DeprecationWarning): 2102 d, p = await asyncio.wait([inner()]) 2103 proof += 100 2104 2105 f = asyncio.ensure_future(outer(), loop=self.loop) 2106 test_utils.run_briefly(self.loop) 2107 f.cancel() 2108 self.assertRaises( 2109 asyncio.CancelledError, self.loop.run_until_complete, f) 2110 waiter.set_result(None) 2111 test_utils.run_briefly(self.loop) 2112 self.assertEqual(proof, 1) 2113 2114 def test_shield_result(self): 2115 inner = self.new_future(self.loop) 2116 outer = asyncio.shield(inner) 2117 inner.set_result(42) 2118 res = self.loop.run_until_complete(outer) 2119 self.assertEqual(res, 42) 2120 2121 def test_shield_exception(self): 2122 inner = self.new_future(self.loop) 2123 outer = asyncio.shield(inner) 2124 test_utils.run_briefly(self.loop) 2125 exc = RuntimeError('expected') 2126 inner.set_exception(exc) 2127 test_utils.run_briefly(self.loop) 2128 self.assertIs(outer.exception(), exc) 2129 2130 def test_shield_cancel_inner(self): 2131 inner = self.new_future(self.loop) 2132 outer = asyncio.shield(inner) 2133 test_utils.run_briefly(self.loop) 2134 inner.cancel() 2135 test_utils.run_briefly(self.loop) 2136 self.assertTrue(outer.cancelled()) 2137 2138 def test_shield_cancel_outer(self): 2139 inner = self.new_future(self.loop) 2140 outer = asyncio.shield(inner) 2141 test_utils.run_briefly(self.loop) 2142 outer.cancel() 2143 test_utils.run_briefly(self.loop) 2144 self.assertTrue(outer.cancelled()) 2145 self.assertEqual(0, 0 if outer._callbacks is None else len(outer._callbacks)) 2146 2147 def test_shield_shortcut(self): 2148 fut = self.new_future(self.loop) 2149 fut.set_result(42) 2150 res = self.loop.run_until_complete(asyncio.shield(fut)) 2151 self.assertEqual(res, 42) 2152 2153 def test_shield_effect(self): 2154 # Cancelling outer() does not affect inner(). 2155 proof = 0 2156 waiter = self.new_future(self.loop) 2157 2158 async def inner(): 2159 nonlocal proof 2160 await waiter 2161 proof += 1 2162 2163 async def outer(): 2164 nonlocal proof 2165 await asyncio.shield(inner()) 2166 proof += 100 2167 2168 f = asyncio.ensure_future(outer(), loop=self.loop) 2169 test_utils.run_briefly(self.loop) 2170 f.cancel() 2171 with self.assertRaises(asyncio.CancelledError): 2172 self.loop.run_until_complete(f) 2173 waiter.set_result(None) 2174 test_utils.run_briefly(self.loop) 2175 self.assertEqual(proof, 1) 2176 2177 def test_shield_gather(self): 2178 child1 = self.new_future(self.loop) 2179 child2 = self.new_future(self.loop) 2180 parent = asyncio.gather(child1, child2) 2181 outer = asyncio.shield(parent) 2182 test_utils.run_briefly(self.loop) 2183 outer.cancel() 2184 test_utils.run_briefly(self.loop) 2185 self.assertTrue(outer.cancelled()) 2186 child1.set_result(1) 2187 child2.set_result(2) 2188 test_utils.run_briefly(self.loop) 2189 self.assertEqual(parent.result(), [1, 2]) 2190 2191 def test_gather_shield(self): 2192 child1 = self.new_future(self.loop) 2193 child2 = self.new_future(self.loop) 2194 inner1 = asyncio.shield(child1) 2195 inner2 = asyncio.shield(child2) 2196 parent = asyncio.gather(inner1, inner2) 2197 test_utils.run_briefly(self.loop) 2198 parent.cancel() 2199 # This should cancel inner1 and inner2 but bot child1 and child2. 2200 test_utils.run_briefly(self.loop) 2201 self.assertIsInstance(parent.exception(), asyncio.CancelledError) 2202 self.assertTrue(inner1.cancelled()) 2203 self.assertTrue(inner2.cancelled()) 2204 child1.set_result(1) 2205 child2.set_result(2) 2206 test_utils.run_briefly(self.loop) 2207 2208 def test_as_completed_invalid_args(self): 2209 fut = self.new_future(self.loop) 2210 2211 # as_completed() expects a list of futures, not a future instance 2212 self.assertRaises(TypeError, self.loop.run_until_complete, 2213 asyncio.as_completed(fut, loop=self.loop)) 2214 coro = coroutine_function() 2215 self.assertRaises(TypeError, self.loop.run_until_complete, 2216 asyncio.as_completed(coro, loop=self.loop)) 2217 coro.close() 2218 2219 def test_wait_invalid_args(self): 2220 fut = self.new_future(self.loop) 2221 2222 # wait() expects a list of futures, not a future instance 2223 self.assertRaises(TypeError, self.loop.run_until_complete, 2224 asyncio.wait(fut)) 2225 coro = coroutine_function() 2226 self.assertRaises(TypeError, self.loop.run_until_complete, 2227 asyncio.wait(coro)) 2228 coro.close() 2229 2230 # wait() expects at least a future 2231 self.assertRaises(ValueError, self.loop.run_until_complete, 2232 asyncio.wait([])) 2233 2234 def test_corowrapper_mocks_generator(self): 2235 2236 def check(): 2237 # A function that asserts various things. 2238 # Called twice, with different debug flag values. 2239 2240 with self.assertWarns(DeprecationWarning): 2241 @asyncio.coroutine 2242 def coro(): 2243 # The actual coroutine. 2244 self.assertTrue(gen.gi_running) 2245 yield from fut 2246 2247 # A completed Future used to run the coroutine. 2248 fut = self.new_future(self.loop) 2249 fut.set_result(None) 2250 2251 # Call the coroutine. 2252 gen = coro() 2253 2254 # Check some properties. 2255 self.assertTrue(asyncio.iscoroutine(gen)) 2256 self.assertIsInstance(gen.gi_frame, types.FrameType) 2257 self.assertFalse(gen.gi_running) 2258 self.assertIsInstance(gen.gi_code, types.CodeType) 2259 2260 # Run it. 2261 self.loop.run_until_complete(gen) 2262 2263 # The frame should have changed. 2264 self.assertIsNone(gen.gi_frame) 2265 2266 # Test with debug flag cleared. 2267 with set_coroutine_debug(False): 2268 check() 2269 2270 # Test with debug flag set. 2271 with set_coroutine_debug(True): 2272 check() 2273 2274 def test_yield_from_corowrapper(self): 2275 with set_coroutine_debug(True): 2276 with self.assertWarns(DeprecationWarning): 2277 @asyncio.coroutine 2278 def t1(): 2279 return (yield from t2()) 2280 2281 with self.assertWarns(DeprecationWarning): 2282 @asyncio.coroutine 2283 def t2(): 2284 f = self.new_future(self.loop) 2285 self.new_task(self.loop, t3(f)) 2286 return (yield from f) 2287 2288 with self.assertWarns(DeprecationWarning): 2289 @asyncio.coroutine 2290 def t3(f): 2291 f.set_result((1, 2, 3)) 2292 2293 task = self.new_task(self.loop, t1()) 2294 val = self.loop.run_until_complete(task) 2295 self.assertEqual(val, (1, 2, 3)) 2296 2297 def test_yield_from_corowrapper_send(self): 2298 def foo(): 2299 a = yield 2300 return a 2301 2302 def call(arg): 2303 cw = asyncio.coroutines.CoroWrapper(foo()) 2304 cw.send(None) 2305 try: 2306 cw.send(arg) 2307 except StopIteration as ex: 2308 return ex.args[0] 2309 else: 2310 raise AssertionError('StopIteration was expected') 2311 2312 self.assertEqual(call((1, 2)), (1, 2)) 2313 self.assertEqual(call('spam'), 'spam') 2314 2315 def test_corowrapper_weakref(self): 2316 wd = weakref.WeakValueDictionary() 2317 def foo(): yield from [] 2318 cw = asyncio.coroutines.CoroWrapper(foo()) 2319 wd['cw'] = cw # Would fail without __weakref__ slot. 2320 cw.gen = None # Suppress warning from __del__. 2321 2322 def test_corowrapper_throw(self): 2323 # Issue 429: CoroWrapper.throw must be compatible with gen.throw 2324 def foo(): 2325 value = None 2326 while True: 2327 try: 2328 value = yield value 2329 except Exception as e: 2330 value = e 2331 2332 exception = Exception("foo") 2333 cw = asyncio.coroutines.CoroWrapper(foo()) 2334 cw.send(None) 2335 self.assertIs(exception, cw.throw(exception)) 2336 2337 cw = asyncio.coroutines.CoroWrapper(foo()) 2338 cw.send(None) 2339 self.assertIs(exception, cw.throw(Exception, exception)) 2340 2341 cw = asyncio.coroutines.CoroWrapper(foo()) 2342 cw.send(None) 2343 exception = cw.throw(Exception, "foo") 2344 self.assertIsInstance(exception, Exception) 2345 self.assertEqual(exception.args, ("foo", )) 2346 2347 cw = asyncio.coroutines.CoroWrapper(foo()) 2348 cw.send(None) 2349 exception = cw.throw(Exception, "foo", None) 2350 self.assertIsInstance(exception, Exception) 2351 self.assertEqual(exception.args, ("foo", )) 2352 2353 def test_log_destroyed_pending_task(self): 2354 Task = self.__class__.Task 2355 2356 with self.assertWarns(DeprecationWarning): 2357 @asyncio.coroutine 2358 def kill_me(loop): 2359 future = self.new_future(loop) 2360 yield from future 2361 # at this point, the only reference to kill_me() task is 2362 # the Task._wakeup() method in future._callbacks 2363 raise Exception("code never reached") 2364 2365 mock_handler = mock.Mock() 2366 self.loop.set_debug(True) 2367 self.loop.set_exception_handler(mock_handler) 2368 2369 # schedule the task 2370 coro = kill_me(self.loop) 2371 task = asyncio.ensure_future(coro, loop=self.loop) 2372 2373 self.assertEqual(asyncio.all_tasks(loop=self.loop), {task}) 2374 2375 asyncio.set_event_loop(None) 2376 2377 # execute the task so it waits for future 2378 self.loop._run_once() 2379 self.assertEqual(len(self.loop._ready), 0) 2380 2381 # remove the future used in kill_me(), and references to the task 2382 del coro.gi_frame.f_locals['future'] 2383 coro = None 2384 source_traceback = task._source_traceback 2385 task = None 2386 2387 # no more reference to kill_me() task: the task is destroyed by the GC 2388 support.gc_collect() 2389 2390 self.assertEqual(asyncio.all_tasks(loop=self.loop), set()) 2391 2392 mock_handler.assert_called_with(self.loop, { 2393 'message': 'Task was destroyed but it is pending!', 2394 'task': mock.ANY, 2395 'source_traceback': source_traceback, 2396 }) 2397 mock_handler.reset_mock() 2398 2399 @mock.patch('asyncio.base_events.logger') 2400 def test_tb_logger_not_called_after_cancel(self, m_log): 2401 loop = asyncio.new_event_loop() 2402 self.set_event_loop(loop) 2403 2404 async def coro(): 2405 raise TypeError 2406 2407 async def runner(): 2408 task = self.new_task(loop, coro()) 2409 await asyncio.sleep(0.05) 2410 task.cancel() 2411 task = None 2412 2413 loop.run_until_complete(runner()) 2414 self.assertFalse(m_log.error.called) 2415 2416 @mock.patch('asyncio.coroutines.logger') 2417 def test_coroutine_never_yielded(self, m_log): 2418 with set_coroutine_debug(True): 2419 with self.assertWarns(DeprecationWarning): 2420 @asyncio.coroutine 2421 def coro_noop(): 2422 pass 2423 2424 tb_filename = __file__ 2425 tb_lineno = sys._getframe().f_lineno + 2 2426 # create a coroutine object but don't use it 2427 coro_noop() 2428 support.gc_collect() 2429 2430 self.assertTrue(m_log.error.called) 2431 message = m_log.error.call_args[0][0] 2432 func_filename, func_lineno = test_utils.get_function_source(coro_noop) 2433 2434 regex = (r'^<CoroWrapper %s\(?\)? .* at %s:%s, .*> ' 2435 r'was never yielded from\n' 2436 r'Coroutine object created at \(most recent call last, truncated to \d+ last lines\):\n' 2437 r'.*\n' 2438 r' File "%s", line %s, in test_coroutine_never_yielded\n' 2439 r' coro_noop\(\)$' 2440 % (re.escape(coro_noop.__qualname__), 2441 re.escape(func_filename), func_lineno, 2442 re.escape(tb_filename), tb_lineno)) 2443 2444 self.assertRegex(message, re.compile(regex, re.DOTALL)) 2445 2446 def test_return_coroutine_from_coroutine(self): 2447 """Return of @asyncio.coroutine()-wrapped function generator object 2448 from @asyncio.coroutine()-wrapped function should have same effect as 2449 returning generator object or Future.""" 2450 def check(): 2451 with self.assertWarns(DeprecationWarning): 2452 @asyncio.coroutine 2453 def outer_coro(): 2454 with self.assertWarns(DeprecationWarning): 2455 @asyncio.coroutine 2456 def inner_coro(): 2457 return 1 2458 2459 return inner_coro() 2460 2461 result = self.loop.run_until_complete(outer_coro()) 2462 self.assertEqual(result, 1) 2463 2464 # Test with debug flag cleared. 2465 with set_coroutine_debug(False): 2466 check() 2467 2468 # Test with debug flag set. 2469 with set_coroutine_debug(True): 2470 check() 2471 2472 def test_task_source_traceback(self): 2473 self.loop.set_debug(True) 2474 2475 task = self.new_task(self.loop, coroutine_function()) 2476 lineno = sys._getframe().f_lineno - 1 2477 self.assertIsInstance(task._source_traceback, list) 2478 self.assertEqual(task._source_traceback[-2][:3], 2479 (__file__, 2480 lineno, 2481 'test_task_source_traceback')) 2482 self.loop.run_until_complete(task) 2483 2484 def _test_cancel_wait_for(self, timeout): 2485 loop = asyncio.new_event_loop() 2486 self.addCleanup(loop.close) 2487 2488 async def blocking_coroutine(): 2489 fut = self.new_future(loop) 2490 # Block: fut result is never set 2491 await fut 2492 2493 task = loop.create_task(blocking_coroutine()) 2494 2495 wait = loop.create_task(asyncio.wait_for(task, timeout)) 2496 loop.call_soon(wait.cancel) 2497 2498 self.assertRaises(asyncio.CancelledError, 2499 loop.run_until_complete, wait) 2500 2501 # Python issue #23219: cancelling the wait must also cancel the task 2502 self.assertTrue(task.cancelled()) 2503 2504 def test_cancel_blocking_wait_for(self): 2505 self._test_cancel_wait_for(None) 2506 2507 def test_cancel_wait_for(self): 2508 self._test_cancel_wait_for(60.0) 2509 2510 def test_cancel_gather_1(self): 2511 """Ensure that a gathering future refuses to be cancelled once all 2512 children are done""" 2513 loop = asyncio.new_event_loop() 2514 self.addCleanup(loop.close) 2515 2516 fut = self.new_future(loop) 2517 # The indirection fut->child_coro is needed since otherwise the 2518 # gathering task is done at the same time as the child future 2519 def child_coro(): 2520 return (yield from fut) 2521 gather_future = asyncio.gather(child_coro(), loop=loop) 2522 gather_task = asyncio.ensure_future(gather_future, loop=loop) 2523 2524 cancel_result = None 2525 def cancelling_callback(_): 2526 nonlocal cancel_result 2527 cancel_result = gather_task.cancel() 2528 fut.add_done_callback(cancelling_callback) 2529 2530 fut.set_result(42) # calls the cancelling_callback after fut is done() 2531 2532 # At this point the task should complete. 2533 loop.run_until_complete(gather_task) 2534 2535 # Python issue #26923: asyncio.gather drops cancellation 2536 self.assertEqual(cancel_result, False) 2537 self.assertFalse(gather_task.cancelled()) 2538 self.assertEqual(gather_task.result(), [42]) 2539 2540 def test_cancel_gather_2(self): 2541 cases = [ 2542 ((), ()), 2543 ((None,), ()), 2544 (('my message',), ('my message',)), 2545 # Non-string values should roundtrip. 2546 ((5,), (5,)), 2547 ] 2548 for cancel_args, expected_args in cases: 2549 with self.subTest(cancel_args=cancel_args): 2550 loop = asyncio.new_event_loop() 2551 self.addCleanup(loop.close) 2552 2553 async def test(): 2554 time = 0 2555 while True: 2556 time += 0.05 2557 await asyncio.gather(asyncio.sleep(0.05), 2558 return_exceptions=True, 2559 loop=loop) 2560 if time > 1: 2561 return 2562 2563 async def main(): 2564 qwe = self.new_task(loop, test()) 2565 await asyncio.sleep(0.2) 2566 qwe.cancel(*cancel_args) 2567 await qwe 2568 2569 try: 2570 loop.run_until_complete(main()) 2571 except asyncio.CancelledError as exc: 2572 self.assertEqual(exc.args, ()) 2573 exc_type, exc_args, depth = get_innermost_context(exc) 2574 self.assertEqual((exc_type, exc_args), 2575 (asyncio.CancelledError, expected_args)) 2576 # The exact traceback seems to vary in CI. 2577 self.assertIn(depth, (2, 3)) 2578 else: 2579 self.fail('gather did not propagate the cancellation ' 2580 'request') 2581 2582 def test_exception_traceback(self): 2583 # See http://bugs.python.org/issue28843 2584 2585 async def foo(): 2586 1 / 0 2587 2588 async def main(): 2589 task = self.new_task(self.loop, foo()) 2590 await asyncio.sleep(0) # skip one loop iteration 2591 self.assertIsNotNone(task.exception().__traceback__) 2592 2593 self.loop.run_until_complete(main()) 2594 2595 @mock.patch('asyncio.base_events.logger') 2596 def test_error_in_call_soon(self, m_log): 2597 def call_soon(callback, *args, **kwargs): 2598 raise ValueError 2599 self.loop.call_soon = call_soon 2600 2601 with self.assertWarns(DeprecationWarning): 2602 @asyncio.coroutine 2603 def coro(): 2604 pass 2605 2606 self.assertFalse(m_log.error.called) 2607 2608 with self.assertRaises(ValueError): 2609 gen = coro() 2610 try: 2611 self.new_task(self.loop, gen) 2612 finally: 2613 gen.close() 2614 2615 self.assertTrue(m_log.error.called) 2616 message = m_log.error.call_args[0][0] 2617 self.assertIn('Task was destroyed but it is pending', message) 2618 2619 self.assertEqual(asyncio.all_tasks(self.loop), set()) 2620 2621 def test_create_task_with_noncoroutine(self): 2622 with self.assertRaisesRegex(TypeError, 2623 "a coroutine was expected, got 123"): 2624 self.new_task(self.loop, 123) 2625 2626 # test it for the second time to ensure that caching 2627 # in asyncio.iscoroutine() doesn't break things. 2628 with self.assertRaisesRegex(TypeError, 2629 "a coroutine was expected, got 123"): 2630 self.new_task(self.loop, 123) 2631 2632 def test_create_task_with_oldstyle_coroutine(self): 2633 2634 with self.assertWarns(DeprecationWarning): 2635 @asyncio.coroutine 2636 def coro(): 2637 pass 2638 2639 task = self.new_task(self.loop, coro()) 2640 self.assertIsInstance(task, self.Task) 2641 self.loop.run_until_complete(task) 2642 2643 # test it for the second time to ensure that caching 2644 # in asyncio.iscoroutine() doesn't break things. 2645 task = self.new_task(self.loop, coro()) 2646 self.assertIsInstance(task, self.Task) 2647 self.loop.run_until_complete(task) 2648 2649 def test_create_task_with_async_function(self): 2650 2651 async def coro(): 2652 pass 2653 2654 task = self.new_task(self.loop, coro()) 2655 self.assertIsInstance(task, self.Task) 2656 self.loop.run_until_complete(task) 2657 2658 # test it for the second time to ensure that caching 2659 # in asyncio.iscoroutine() doesn't break things. 2660 task = self.new_task(self.loop, coro()) 2661 self.assertIsInstance(task, self.Task) 2662 self.loop.run_until_complete(task) 2663 2664 def test_create_task_with_asynclike_function(self): 2665 task = self.new_task(self.loop, CoroLikeObject()) 2666 self.assertIsInstance(task, self.Task) 2667 self.assertEqual(self.loop.run_until_complete(task), 42) 2668 2669 # test it for the second time to ensure that caching 2670 # in asyncio.iscoroutine() doesn't break things. 2671 task = self.new_task(self.loop, CoroLikeObject()) 2672 self.assertIsInstance(task, self.Task) 2673 self.assertEqual(self.loop.run_until_complete(task), 42) 2674 2675 def test_bare_create_task(self): 2676 2677 async def inner(): 2678 return 1 2679 2680 async def coro(): 2681 task = asyncio.create_task(inner()) 2682 self.assertIsInstance(task, self.Task) 2683 ret = await task 2684 self.assertEqual(1, ret) 2685 2686 self.loop.run_until_complete(coro()) 2687 2688 def test_bare_create_named_task(self): 2689 2690 async def coro_noop(): 2691 pass 2692 2693 async def coro(): 2694 task = asyncio.create_task(coro_noop(), name='No-op') 2695 self.assertEqual(task.get_name(), 'No-op') 2696 await task 2697 2698 self.loop.run_until_complete(coro()) 2699 2700 def test_context_1(self): 2701 cvar = contextvars.ContextVar('cvar', default='nope') 2702 2703 async def sub(): 2704 await asyncio.sleep(0.01) 2705 self.assertEqual(cvar.get(), 'nope') 2706 cvar.set('something else') 2707 2708 async def main(): 2709 self.assertEqual(cvar.get(), 'nope') 2710 subtask = self.new_task(loop, sub()) 2711 cvar.set('yes') 2712 self.assertEqual(cvar.get(), 'yes') 2713 await subtask 2714 self.assertEqual(cvar.get(), 'yes') 2715 2716 loop = asyncio.new_event_loop() 2717 try: 2718 task = self.new_task(loop, main()) 2719 loop.run_until_complete(task) 2720 finally: 2721 loop.close() 2722 2723 def test_context_2(self): 2724 cvar = contextvars.ContextVar('cvar', default='nope') 2725 2726 async def main(): 2727 def fut_on_done(fut): 2728 # This change must not pollute the context 2729 # of the "main()" task. 2730 cvar.set('something else') 2731 2732 self.assertEqual(cvar.get(), 'nope') 2733 2734 for j in range(2): 2735 fut = self.new_future(loop) 2736 fut.add_done_callback(fut_on_done) 2737 cvar.set(f'yes{j}') 2738 loop.call_soon(fut.set_result, None) 2739 await fut 2740 self.assertEqual(cvar.get(), f'yes{j}') 2741 2742 for i in range(3): 2743 # Test that task passed its context to add_done_callback: 2744 cvar.set(f'yes{i}-{j}') 2745 await asyncio.sleep(0.001) 2746 self.assertEqual(cvar.get(), f'yes{i}-{j}') 2747 2748 loop = asyncio.new_event_loop() 2749 try: 2750 task = self.new_task(loop, main()) 2751 loop.run_until_complete(task) 2752 finally: 2753 loop.close() 2754 2755 self.assertEqual(cvar.get(), 'nope') 2756 2757 def test_context_3(self): 2758 # Run 100 Tasks in parallel, each modifying cvar. 2759 2760 cvar = contextvars.ContextVar('cvar', default=-1) 2761 2762 async def sub(num): 2763 for i in range(10): 2764 cvar.set(num + i) 2765 await asyncio.sleep(random.uniform(0.001, 0.05)) 2766 self.assertEqual(cvar.get(), num + i) 2767 2768 async def main(): 2769 tasks = [] 2770 for i in range(100): 2771 task = loop.create_task(sub(random.randint(0, 10))) 2772 tasks.append(task) 2773 2774 await asyncio.gather(*tasks, loop=loop) 2775 2776 loop = asyncio.new_event_loop() 2777 try: 2778 loop.run_until_complete(main()) 2779 finally: 2780 loop.close() 2781 2782 self.assertEqual(cvar.get(), -1) 2783 2784 def test_get_coro(self): 2785 loop = asyncio.new_event_loop() 2786 coro = coroutine_function() 2787 try: 2788 task = self.new_task(loop, coro) 2789 loop.run_until_complete(task) 2790 self.assertIs(task.get_coro(), coro) 2791 finally: 2792 loop.close() 2793 2794 2795def add_subclass_tests(cls): 2796 BaseTask = cls.Task 2797 BaseFuture = cls.Future 2798 2799 if BaseTask is None or BaseFuture is None: 2800 return cls 2801 2802 class CommonFuture: 2803 def __init__(self, *args, **kwargs): 2804 self.calls = collections.defaultdict(lambda: 0) 2805 super().__init__(*args, **kwargs) 2806 2807 def add_done_callback(self, *args, **kwargs): 2808 self.calls['add_done_callback'] += 1 2809 return super().add_done_callback(*args, **kwargs) 2810 2811 class Task(CommonFuture, BaseTask): 2812 pass 2813 2814 class Future(CommonFuture, BaseFuture): 2815 pass 2816 2817 def test_subclasses_ctask_cfuture(self): 2818 fut = self.Future(loop=self.loop) 2819 2820 async def func(): 2821 self.loop.call_soon(lambda: fut.set_result('spam')) 2822 return await fut 2823 2824 task = self.Task(func(), loop=self.loop) 2825 2826 result = self.loop.run_until_complete(task) 2827 2828 self.assertEqual(result, 'spam') 2829 2830 self.assertEqual( 2831 dict(task.calls), 2832 {'add_done_callback': 1}) 2833 2834 self.assertEqual( 2835 dict(fut.calls), 2836 {'add_done_callback': 1}) 2837 2838 # Add patched Task & Future back to the test case 2839 cls.Task = Task 2840 cls.Future = Future 2841 2842 # Add an extra unit-test 2843 cls.test_subclasses_ctask_cfuture = test_subclasses_ctask_cfuture 2844 2845 # Disable the "test_task_source_traceback" test 2846 # (the test is hardcoded for a particular call stack, which 2847 # is slightly different for Task subclasses) 2848 cls.test_task_source_traceback = None 2849 2850 return cls 2851 2852 2853class SetMethodsTest: 2854 2855 def test_set_result_causes_invalid_state(self): 2856 Future = type(self).Future 2857 self.loop.call_exception_handler = exc_handler = mock.Mock() 2858 2859 async def foo(): 2860 await asyncio.sleep(0.1) 2861 return 10 2862 2863 coro = foo() 2864 task = self.new_task(self.loop, coro) 2865 Future.set_result(task, 'spam') 2866 2867 self.assertEqual( 2868 self.loop.run_until_complete(task), 2869 'spam') 2870 2871 exc_handler.assert_called_once() 2872 exc = exc_handler.call_args[0][0]['exception'] 2873 with self.assertRaisesRegex(asyncio.InvalidStateError, 2874 r'step\(\): already done'): 2875 raise exc 2876 2877 coro.close() 2878 2879 def test_set_exception_causes_invalid_state(self): 2880 class MyExc(Exception): 2881 pass 2882 2883 Future = type(self).Future 2884 self.loop.call_exception_handler = exc_handler = mock.Mock() 2885 2886 async def foo(): 2887 await asyncio.sleep(0.1) 2888 return 10 2889 2890 coro = foo() 2891 task = self.new_task(self.loop, coro) 2892 Future.set_exception(task, MyExc()) 2893 2894 with self.assertRaises(MyExc): 2895 self.loop.run_until_complete(task) 2896 2897 exc_handler.assert_called_once() 2898 exc = exc_handler.call_args[0][0]['exception'] 2899 with self.assertRaisesRegex(asyncio.InvalidStateError, 2900 r'step\(\): already done'): 2901 raise exc 2902 2903 coro.close() 2904 2905 2906@unittest.skipUnless(hasattr(futures, '_CFuture') and 2907 hasattr(tasks, '_CTask'), 2908 'requires the C _asyncio module') 2909class CTask_CFuture_Tests(BaseTaskTests, SetMethodsTest, 2910 test_utils.TestCase): 2911 2912 Task = getattr(tasks, '_CTask', None) 2913 Future = getattr(futures, '_CFuture', None) 2914 2915 @support.refcount_test 2916 def test_refleaks_in_task___init__(self): 2917 gettotalrefcount = support.get_attribute(sys, 'gettotalrefcount') 2918 async def coro(): 2919 pass 2920 task = self.new_task(self.loop, coro()) 2921 self.loop.run_until_complete(task) 2922 refs_before = gettotalrefcount() 2923 for i in range(100): 2924 task.__init__(coro(), loop=self.loop) 2925 self.loop.run_until_complete(task) 2926 self.assertAlmostEqual(gettotalrefcount() - refs_before, 0, delta=10) 2927 2928 def test_del__log_destroy_pending_segfault(self): 2929 async def coro(): 2930 pass 2931 task = self.new_task(self.loop, coro()) 2932 self.loop.run_until_complete(task) 2933 with self.assertRaises(AttributeError): 2934 del task._log_destroy_pending 2935 2936 2937@unittest.skipUnless(hasattr(futures, '_CFuture') and 2938 hasattr(tasks, '_CTask'), 2939 'requires the C _asyncio module') 2940@add_subclass_tests 2941class CTask_CFuture_SubclassTests(BaseTaskTests, test_utils.TestCase): 2942 2943 Task = getattr(tasks, '_CTask', None) 2944 Future = getattr(futures, '_CFuture', None) 2945 2946 2947@unittest.skipUnless(hasattr(tasks, '_CTask'), 2948 'requires the C _asyncio module') 2949@add_subclass_tests 2950class CTaskSubclass_PyFuture_Tests(BaseTaskTests, test_utils.TestCase): 2951 2952 Task = getattr(tasks, '_CTask', None) 2953 Future = futures._PyFuture 2954 2955 2956@unittest.skipUnless(hasattr(futures, '_CFuture'), 2957 'requires the C _asyncio module') 2958@add_subclass_tests 2959class PyTask_CFutureSubclass_Tests(BaseTaskTests, test_utils.TestCase): 2960 2961 Future = getattr(futures, '_CFuture', None) 2962 Task = tasks._PyTask 2963 2964 2965@unittest.skipUnless(hasattr(tasks, '_CTask'), 2966 'requires the C _asyncio module') 2967class CTask_PyFuture_Tests(BaseTaskTests, test_utils.TestCase): 2968 2969 Task = getattr(tasks, '_CTask', None) 2970 Future = futures._PyFuture 2971 2972 2973@unittest.skipUnless(hasattr(futures, '_CFuture'), 2974 'requires the C _asyncio module') 2975class PyTask_CFuture_Tests(BaseTaskTests, test_utils.TestCase): 2976 2977 Task = tasks._PyTask 2978 Future = getattr(futures, '_CFuture', None) 2979 2980 2981class PyTask_PyFuture_Tests(BaseTaskTests, SetMethodsTest, 2982 test_utils.TestCase): 2983 2984 Task = tasks._PyTask 2985 Future = futures._PyFuture 2986 2987 2988@add_subclass_tests 2989class PyTask_PyFuture_SubclassTests(BaseTaskTests, test_utils.TestCase): 2990 Task = tasks._PyTask 2991 Future = futures._PyFuture 2992 2993 2994@unittest.skipUnless(hasattr(tasks, '_CTask'), 2995 'requires the C _asyncio module') 2996class CTask_Future_Tests(test_utils.TestCase): 2997 2998 def test_foobar(self): 2999 class Fut(asyncio.Future): 3000 @property 3001 def get_loop(self): 3002 raise AttributeError 3003 3004 async def coro(): 3005 await fut 3006 return 'spam' 3007 3008 self.loop = asyncio.new_event_loop() 3009 try: 3010 fut = Fut(loop=self.loop) 3011 self.loop.call_later(0.1, fut.set_result, 1) 3012 task = self.loop.create_task(coro()) 3013 res = self.loop.run_until_complete(task) 3014 finally: 3015 self.loop.close() 3016 3017 self.assertEqual(res, 'spam') 3018 3019 3020class BaseTaskIntrospectionTests: 3021 _register_task = None 3022 _unregister_task = None 3023 _enter_task = None 3024 _leave_task = None 3025 3026 def test__register_task_1(self): 3027 class TaskLike: 3028 @property 3029 def _loop(self): 3030 return loop 3031 3032 def done(self): 3033 return False 3034 3035 task = TaskLike() 3036 loop = mock.Mock() 3037 3038 self.assertEqual(asyncio.all_tasks(loop), set()) 3039 self._register_task(task) 3040 self.assertEqual(asyncio.all_tasks(loop), {task}) 3041 self._unregister_task(task) 3042 3043 def test__register_task_2(self): 3044 class TaskLike: 3045 def get_loop(self): 3046 return loop 3047 3048 def done(self): 3049 return False 3050 3051 task = TaskLike() 3052 loop = mock.Mock() 3053 3054 self.assertEqual(asyncio.all_tasks(loop), set()) 3055 self._register_task(task) 3056 self.assertEqual(asyncio.all_tasks(loop), {task}) 3057 self._unregister_task(task) 3058 3059 def test__register_task_3(self): 3060 class TaskLike: 3061 def get_loop(self): 3062 return loop 3063 3064 def done(self): 3065 return True 3066 3067 task = TaskLike() 3068 loop = mock.Mock() 3069 3070 self.assertEqual(asyncio.all_tasks(loop), set()) 3071 self._register_task(task) 3072 self.assertEqual(asyncio.all_tasks(loop), set()) 3073 self._unregister_task(task) 3074 3075 def test__enter_task(self): 3076 task = mock.Mock() 3077 loop = mock.Mock() 3078 self.assertIsNone(asyncio.current_task(loop)) 3079 self._enter_task(loop, task) 3080 self.assertIs(asyncio.current_task(loop), task) 3081 self._leave_task(loop, task) 3082 3083 def test__enter_task_failure(self): 3084 task1 = mock.Mock() 3085 task2 = mock.Mock() 3086 loop = mock.Mock() 3087 self._enter_task(loop, task1) 3088 with self.assertRaises(RuntimeError): 3089 self._enter_task(loop, task2) 3090 self.assertIs(asyncio.current_task(loop), task1) 3091 self._leave_task(loop, task1) 3092 3093 def test__leave_task(self): 3094 task = mock.Mock() 3095 loop = mock.Mock() 3096 self._enter_task(loop, task) 3097 self._leave_task(loop, task) 3098 self.assertIsNone(asyncio.current_task(loop)) 3099 3100 def test__leave_task_failure1(self): 3101 task1 = mock.Mock() 3102 task2 = mock.Mock() 3103 loop = mock.Mock() 3104 self._enter_task(loop, task1) 3105 with self.assertRaises(RuntimeError): 3106 self._leave_task(loop, task2) 3107 self.assertIs(asyncio.current_task(loop), task1) 3108 self._leave_task(loop, task1) 3109 3110 def test__leave_task_failure2(self): 3111 task = mock.Mock() 3112 loop = mock.Mock() 3113 with self.assertRaises(RuntimeError): 3114 self._leave_task(loop, task) 3115 self.assertIsNone(asyncio.current_task(loop)) 3116 3117 def test__unregister_task(self): 3118 task = mock.Mock() 3119 loop = mock.Mock() 3120 task.get_loop = lambda: loop 3121 self._register_task(task) 3122 self._unregister_task(task) 3123 self.assertEqual(asyncio.all_tasks(loop), set()) 3124 3125 def test__unregister_task_not_registered(self): 3126 task = mock.Mock() 3127 loop = mock.Mock() 3128 self._unregister_task(task) 3129 self.assertEqual(asyncio.all_tasks(loop), set()) 3130 3131 3132class PyIntrospectionTests(test_utils.TestCase, BaseTaskIntrospectionTests): 3133 _register_task = staticmethod(tasks._py_register_task) 3134 _unregister_task = staticmethod(tasks._py_unregister_task) 3135 _enter_task = staticmethod(tasks._py_enter_task) 3136 _leave_task = staticmethod(tasks._py_leave_task) 3137 3138 3139@unittest.skipUnless(hasattr(tasks, '_c_register_task'), 3140 'requires the C _asyncio module') 3141class CIntrospectionTests(test_utils.TestCase, BaseTaskIntrospectionTests): 3142 if hasattr(tasks, '_c_register_task'): 3143 _register_task = staticmethod(tasks._c_register_task) 3144 _unregister_task = staticmethod(tasks._c_unregister_task) 3145 _enter_task = staticmethod(tasks._c_enter_task) 3146 _leave_task = staticmethod(tasks._c_leave_task) 3147 else: 3148 _register_task = _unregister_task = _enter_task = _leave_task = None 3149 3150 3151class BaseCurrentLoopTests: 3152 3153 def setUp(self): 3154 super().setUp() 3155 self.loop = asyncio.new_event_loop() 3156 self.set_event_loop(self.loop) 3157 3158 def new_task(self, coro): 3159 raise NotImplementedError 3160 3161 def test_current_task_no_running_loop(self): 3162 self.assertIsNone(asyncio.current_task(loop=self.loop)) 3163 3164 def test_current_task_no_running_loop_implicit(self): 3165 with self.assertRaises(RuntimeError): 3166 asyncio.current_task() 3167 3168 def test_current_task_with_implicit_loop(self): 3169 async def coro(): 3170 self.assertIs(asyncio.current_task(loop=self.loop), task) 3171 3172 self.assertIs(asyncio.current_task(None), task) 3173 self.assertIs(asyncio.current_task(), task) 3174 3175 task = self.new_task(coro()) 3176 self.loop.run_until_complete(task) 3177 self.assertIsNone(asyncio.current_task(loop=self.loop)) 3178 3179 3180class PyCurrentLoopTests(BaseCurrentLoopTests, test_utils.TestCase): 3181 3182 def new_task(self, coro): 3183 return tasks._PyTask(coro, loop=self.loop) 3184 3185 3186@unittest.skipUnless(hasattr(tasks, '_CTask'), 3187 'requires the C _asyncio module') 3188class CCurrentLoopTests(BaseCurrentLoopTests, test_utils.TestCase): 3189 3190 def new_task(self, coro): 3191 return getattr(tasks, '_CTask')(coro, loop=self.loop) 3192 3193 3194class GenericTaskTests(test_utils.TestCase): 3195 3196 def test_future_subclass(self): 3197 self.assertTrue(issubclass(asyncio.Task, asyncio.Future)) 3198 3199 def test_asyncio_module_compiled(self): 3200 # Because of circular imports it's easy to make _asyncio 3201 # module non-importable. This is a simple test that will 3202 # fail on systems where C modules were successfully compiled 3203 # (hence the test for _functools), but _asyncio somehow didn't. 3204 try: 3205 import _functools 3206 except ImportError: 3207 pass 3208 else: 3209 try: 3210 import _asyncio 3211 except ImportError: 3212 self.fail('_asyncio module is missing') 3213 3214 3215class GatherTestsBase: 3216 3217 def setUp(self): 3218 super().setUp() 3219 self.one_loop = self.new_test_loop() 3220 self.other_loop = self.new_test_loop() 3221 self.set_event_loop(self.one_loop, cleanup=False) 3222 3223 def _run_loop(self, loop): 3224 while loop._ready: 3225 test_utils.run_briefly(loop) 3226 3227 def _check_success(self, **kwargs): 3228 a, b, c = [self.one_loop.create_future() for i in range(3)] 3229 fut = asyncio.gather(*self.wrap_futures(a, b, c), **kwargs) 3230 cb = test_utils.MockCallback() 3231 fut.add_done_callback(cb) 3232 b.set_result(1) 3233 a.set_result(2) 3234 self._run_loop(self.one_loop) 3235 self.assertEqual(cb.called, False) 3236 self.assertFalse(fut.done()) 3237 c.set_result(3) 3238 self._run_loop(self.one_loop) 3239 cb.assert_called_once_with(fut) 3240 self.assertEqual(fut.result(), [2, 1, 3]) 3241 3242 def test_success(self): 3243 self._check_success() 3244 self._check_success(return_exceptions=False) 3245 3246 def test_result_exception_success(self): 3247 self._check_success(return_exceptions=True) 3248 3249 def test_one_exception(self): 3250 a, b, c, d, e = [self.one_loop.create_future() for i in range(5)] 3251 fut = asyncio.gather(*self.wrap_futures(a, b, c, d, e)) 3252 cb = test_utils.MockCallback() 3253 fut.add_done_callback(cb) 3254 exc = ZeroDivisionError() 3255 a.set_result(1) 3256 b.set_exception(exc) 3257 self._run_loop(self.one_loop) 3258 self.assertTrue(fut.done()) 3259 cb.assert_called_once_with(fut) 3260 self.assertIs(fut.exception(), exc) 3261 # Does nothing 3262 c.set_result(3) 3263 d.cancel() 3264 e.set_exception(RuntimeError()) 3265 e.exception() 3266 3267 def test_return_exceptions(self): 3268 a, b, c, d = [self.one_loop.create_future() for i in range(4)] 3269 fut = asyncio.gather(*self.wrap_futures(a, b, c, d), 3270 return_exceptions=True) 3271 cb = test_utils.MockCallback() 3272 fut.add_done_callback(cb) 3273 exc = ZeroDivisionError() 3274 exc2 = RuntimeError() 3275 b.set_result(1) 3276 c.set_exception(exc) 3277 a.set_result(3) 3278 self._run_loop(self.one_loop) 3279 self.assertFalse(fut.done()) 3280 d.set_exception(exc2) 3281 self._run_loop(self.one_loop) 3282 self.assertTrue(fut.done()) 3283 cb.assert_called_once_with(fut) 3284 self.assertEqual(fut.result(), [3, 1, exc, exc2]) 3285 3286 def test_env_var_debug(self): 3287 code = '\n'.join(( 3288 'import asyncio.coroutines', 3289 'print(asyncio.coroutines._DEBUG)')) 3290 3291 # Test with -E to not fail if the unit test was run with 3292 # PYTHONASYNCIODEBUG set to a non-empty string 3293 sts, stdout, stderr = assert_python_ok('-E', '-c', code) 3294 self.assertEqual(stdout.rstrip(), b'False') 3295 3296 sts, stdout, stderr = assert_python_ok('-c', code, 3297 PYTHONASYNCIODEBUG='', 3298 PYTHONDEVMODE='') 3299 self.assertEqual(stdout.rstrip(), b'False') 3300 3301 sts, stdout, stderr = assert_python_ok('-c', code, 3302 PYTHONASYNCIODEBUG='1', 3303 PYTHONDEVMODE='') 3304 self.assertEqual(stdout.rstrip(), b'True') 3305 3306 sts, stdout, stderr = assert_python_ok('-E', '-c', code, 3307 PYTHONASYNCIODEBUG='1', 3308 PYTHONDEVMODE='') 3309 self.assertEqual(stdout.rstrip(), b'False') 3310 3311 # -X dev 3312 sts, stdout, stderr = assert_python_ok('-E', '-X', 'dev', 3313 '-c', code) 3314 self.assertEqual(stdout.rstrip(), b'True') 3315 3316 3317class FutureGatherTests(GatherTestsBase, test_utils.TestCase): 3318 3319 def wrap_futures(self, *futures): 3320 return futures 3321 3322 def _check_empty_sequence(self, seq_or_iter): 3323 asyncio.set_event_loop(self.one_loop) 3324 self.addCleanup(asyncio.set_event_loop, None) 3325 fut = asyncio.gather(*seq_or_iter) 3326 self.assertIsInstance(fut, asyncio.Future) 3327 self.assertIs(fut._loop, self.one_loop) 3328 self._run_loop(self.one_loop) 3329 self.assertTrue(fut.done()) 3330 self.assertEqual(fut.result(), []) 3331 with self.assertWarns(DeprecationWarning): 3332 fut = asyncio.gather(*seq_or_iter, loop=self.other_loop) 3333 self.assertIs(fut._loop, self.other_loop) 3334 3335 def test_constructor_empty_sequence(self): 3336 self._check_empty_sequence([]) 3337 self._check_empty_sequence(()) 3338 self._check_empty_sequence(set()) 3339 self._check_empty_sequence(iter("")) 3340 3341 def test_constructor_heterogenous_futures(self): 3342 fut1 = self.one_loop.create_future() 3343 fut2 = self.other_loop.create_future() 3344 with self.assertRaises(ValueError): 3345 asyncio.gather(fut1, fut2) 3346 with self.assertRaises(ValueError): 3347 asyncio.gather(fut1, loop=self.other_loop) 3348 3349 def test_constructor_homogenous_futures(self): 3350 children = [self.other_loop.create_future() for i in range(3)] 3351 fut = asyncio.gather(*children) 3352 self.assertIs(fut._loop, self.other_loop) 3353 self._run_loop(self.other_loop) 3354 self.assertFalse(fut.done()) 3355 fut = asyncio.gather(*children, loop=self.other_loop) 3356 self.assertIs(fut._loop, self.other_loop) 3357 self._run_loop(self.other_loop) 3358 self.assertFalse(fut.done()) 3359 3360 def test_one_cancellation(self): 3361 a, b, c, d, e = [self.one_loop.create_future() for i in range(5)] 3362 fut = asyncio.gather(a, b, c, d, e) 3363 cb = test_utils.MockCallback() 3364 fut.add_done_callback(cb) 3365 a.set_result(1) 3366 b.cancel() 3367 self._run_loop(self.one_loop) 3368 self.assertTrue(fut.done()) 3369 cb.assert_called_once_with(fut) 3370 self.assertFalse(fut.cancelled()) 3371 self.assertIsInstance(fut.exception(), asyncio.CancelledError) 3372 # Does nothing 3373 c.set_result(3) 3374 d.cancel() 3375 e.set_exception(RuntimeError()) 3376 e.exception() 3377 3378 def test_result_exception_one_cancellation(self): 3379 a, b, c, d, e, f = [self.one_loop.create_future() 3380 for i in range(6)] 3381 fut = asyncio.gather(a, b, c, d, e, f, return_exceptions=True) 3382 cb = test_utils.MockCallback() 3383 fut.add_done_callback(cb) 3384 a.set_result(1) 3385 zde = ZeroDivisionError() 3386 b.set_exception(zde) 3387 c.cancel() 3388 self._run_loop(self.one_loop) 3389 self.assertFalse(fut.done()) 3390 d.set_result(3) 3391 e.cancel() 3392 rte = RuntimeError() 3393 f.set_exception(rte) 3394 res = self.one_loop.run_until_complete(fut) 3395 self.assertIsInstance(res[2], asyncio.CancelledError) 3396 self.assertIsInstance(res[4], asyncio.CancelledError) 3397 res[2] = res[4] = None 3398 self.assertEqual(res, [1, zde, None, 3, None, rte]) 3399 cb.assert_called_once_with(fut) 3400 3401 3402class CoroutineGatherTests(GatherTestsBase, test_utils.TestCase): 3403 3404 def setUp(self): 3405 super().setUp() 3406 asyncio.set_event_loop(self.one_loop) 3407 3408 def wrap_futures(self, *futures): 3409 coros = [] 3410 for fut in futures: 3411 async def coro(fut=fut): 3412 return await fut 3413 coros.append(coro()) 3414 return coros 3415 3416 def test_constructor_loop_selection(self): 3417 async def coro(): 3418 return 'abc' 3419 gen1 = coro() 3420 gen2 = coro() 3421 fut = asyncio.gather(gen1, gen2) 3422 self.assertIs(fut._loop, self.one_loop) 3423 self.one_loop.run_until_complete(fut) 3424 3425 self.set_event_loop(self.other_loop, cleanup=False) 3426 gen3 = coro() 3427 gen4 = coro() 3428 fut2 = asyncio.gather(gen3, gen4, loop=self.other_loop) 3429 self.assertIs(fut2._loop, self.other_loop) 3430 self.other_loop.run_until_complete(fut2) 3431 3432 def test_duplicate_coroutines(self): 3433 with self.assertWarns(DeprecationWarning): 3434 @asyncio.coroutine 3435 def coro(s): 3436 return s 3437 c = coro('abc') 3438 fut = asyncio.gather(c, c, coro('def'), c, loop=self.one_loop) 3439 self._run_loop(self.one_loop) 3440 self.assertEqual(fut.result(), ['abc', 'abc', 'def', 'abc']) 3441 3442 def test_cancellation_broadcast(self): 3443 # Cancelling outer() cancels all children. 3444 proof = 0 3445 waiter = self.one_loop.create_future() 3446 3447 async def inner(): 3448 nonlocal proof 3449 await waiter 3450 proof += 1 3451 3452 child1 = asyncio.ensure_future(inner(), loop=self.one_loop) 3453 child2 = asyncio.ensure_future(inner(), loop=self.one_loop) 3454 gatherer = None 3455 3456 async def outer(): 3457 nonlocal proof, gatherer 3458 gatherer = asyncio.gather(child1, child2, loop=self.one_loop) 3459 await gatherer 3460 proof += 100 3461 3462 f = asyncio.ensure_future(outer(), loop=self.one_loop) 3463 test_utils.run_briefly(self.one_loop) 3464 self.assertTrue(f.cancel()) 3465 with self.assertRaises(asyncio.CancelledError): 3466 self.one_loop.run_until_complete(f) 3467 self.assertFalse(gatherer.cancel()) 3468 self.assertTrue(waiter.cancelled()) 3469 self.assertTrue(child1.cancelled()) 3470 self.assertTrue(child2.cancelled()) 3471 test_utils.run_briefly(self.one_loop) 3472 self.assertEqual(proof, 0) 3473 3474 def test_exception_marking(self): 3475 # Test for the first line marked "Mark exception retrieved." 3476 3477 async def inner(f): 3478 await f 3479 raise RuntimeError('should not be ignored') 3480 3481 a = self.one_loop.create_future() 3482 b = self.one_loop.create_future() 3483 3484 async def outer(): 3485 await asyncio.gather(inner(a), inner(b), loop=self.one_loop) 3486 3487 f = asyncio.ensure_future(outer(), loop=self.one_loop) 3488 test_utils.run_briefly(self.one_loop) 3489 a.set_result(None) 3490 test_utils.run_briefly(self.one_loop) 3491 b.set_result(None) 3492 test_utils.run_briefly(self.one_loop) 3493 self.assertIsInstance(f.exception(), RuntimeError) 3494 3495 3496class RunCoroutineThreadsafeTests(test_utils.TestCase): 3497 """Test case for asyncio.run_coroutine_threadsafe.""" 3498 3499 def setUp(self): 3500 super().setUp() 3501 self.loop = asyncio.new_event_loop() 3502 self.set_event_loop(self.loop) # Will cleanup properly 3503 3504 async def add(self, a, b, fail=False, cancel=False): 3505 """Wait 0.05 second and return a + b.""" 3506 await asyncio.sleep(0.05) 3507 if fail: 3508 raise RuntimeError("Fail!") 3509 if cancel: 3510 asyncio.current_task(self.loop).cancel() 3511 await asyncio.sleep(0) 3512 return a + b 3513 3514 def target(self, fail=False, cancel=False, timeout=None, 3515 advance_coro=False): 3516 """Run add coroutine in the event loop.""" 3517 coro = self.add(1, 2, fail=fail, cancel=cancel) 3518 future = asyncio.run_coroutine_threadsafe(coro, self.loop) 3519 if advance_coro: 3520 # this is for test_run_coroutine_threadsafe_task_factory_exception; 3521 # otherwise it spills errors and breaks **other** unittests, since 3522 # 'target' is interacting with threads. 3523 3524 # With this call, `coro` will be advanced, so that 3525 # CoroWrapper.__del__ won't do anything when asyncio tests run 3526 # in debug mode. 3527 self.loop.call_soon_threadsafe(coro.send, None) 3528 try: 3529 return future.result(timeout) 3530 finally: 3531 future.done() or future.cancel() 3532 3533 def test_run_coroutine_threadsafe(self): 3534 """Test coroutine submission from a thread to an event loop.""" 3535 future = self.loop.run_in_executor(None, self.target) 3536 result = self.loop.run_until_complete(future) 3537 self.assertEqual(result, 3) 3538 3539 def test_run_coroutine_threadsafe_with_exception(self): 3540 """Test coroutine submission from a thread to an event loop 3541 when an exception is raised.""" 3542 future = self.loop.run_in_executor(None, self.target, True) 3543 with self.assertRaises(RuntimeError) as exc_context: 3544 self.loop.run_until_complete(future) 3545 self.assertIn("Fail!", exc_context.exception.args) 3546 3547 def test_run_coroutine_threadsafe_with_timeout(self): 3548 """Test coroutine submission from a thread to an event loop 3549 when a timeout is raised.""" 3550 callback = lambda: self.target(timeout=0) 3551 future = self.loop.run_in_executor(None, callback) 3552 with self.assertRaises(asyncio.TimeoutError): 3553 self.loop.run_until_complete(future) 3554 test_utils.run_briefly(self.loop) 3555 # Check that there's no pending task (add has been cancelled) 3556 for task in asyncio.all_tasks(self.loop): 3557 self.assertTrue(task.done()) 3558 3559 def test_run_coroutine_threadsafe_task_cancelled(self): 3560 """Test coroutine submission from a tread to an event loop 3561 when the task is cancelled.""" 3562 callback = lambda: self.target(cancel=True) 3563 future = self.loop.run_in_executor(None, callback) 3564 with self.assertRaises(asyncio.CancelledError): 3565 self.loop.run_until_complete(future) 3566 3567 def test_run_coroutine_threadsafe_task_factory_exception(self): 3568 """Test coroutine submission from a tread to an event loop 3569 when the task factory raise an exception.""" 3570 3571 def task_factory(loop, coro): 3572 raise NameError 3573 3574 run = self.loop.run_in_executor( 3575 None, lambda: self.target(advance_coro=True)) 3576 3577 # Set exception handler 3578 callback = test_utils.MockCallback() 3579 self.loop.set_exception_handler(callback) 3580 3581 # Set corrupted task factory 3582 self.addCleanup(self.loop.set_task_factory, 3583 self.loop.get_task_factory()) 3584 self.loop.set_task_factory(task_factory) 3585 3586 # Run event loop 3587 with self.assertRaises(NameError) as exc_context: 3588 self.loop.run_until_complete(run) 3589 3590 # Check exceptions 3591 self.assertEqual(len(callback.call_args_list), 1) 3592 (loop, context), kwargs = callback.call_args 3593 self.assertEqual(context['exception'], exc_context.exception) 3594 3595 3596class SleepTests(test_utils.TestCase): 3597 def setUp(self): 3598 super().setUp() 3599 self.loop = asyncio.new_event_loop() 3600 self.set_event_loop(self.loop) 3601 3602 def tearDown(self): 3603 self.loop.close() 3604 self.loop = None 3605 super().tearDown() 3606 3607 def test_sleep_zero(self): 3608 result = 0 3609 3610 def inc_result(num): 3611 nonlocal result 3612 result += num 3613 3614 async def coro(): 3615 self.loop.call_soon(inc_result, 1) 3616 self.assertEqual(result, 0) 3617 num = await asyncio.sleep(0, result=10) 3618 self.assertEqual(result, 1) # inc'ed by call_soon 3619 inc_result(num) # num should be 11 3620 3621 self.loop.run_until_complete(coro()) 3622 self.assertEqual(result, 11) 3623 3624 def test_loop_argument_is_deprecated(self): 3625 # Remove test when loop argument is removed in Python 3.10 3626 with self.assertWarns(DeprecationWarning): 3627 self.loop.run_until_complete(asyncio.sleep(0.01, loop=self.loop)) 3628 3629 3630class WaitTests(test_utils.TestCase): 3631 def setUp(self): 3632 super().setUp() 3633 self.loop = asyncio.new_event_loop() 3634 self.set_event_loop(self.loop) 3635 3636 def tearDown(self): 3637 self.loop.close() 3638 self.loop = None 3639 super().tearDown() 3640 3641 def test_loop_argument_is_deprecated_in_wait(self): 3642 # Remove test when loop argument is removed in Python 3.10 3643 with self.assertWarns(DeprecationWarning): 3644 self.loop.run_until_complete( 3645 asyncio.wait([coroutine_function()], loop=self.loop)) 3646 3647 def test_loop_argument_is_deprecated_in_wait_for(self): 3648 # Remove test when loop argument is removed in Python 3.10 3649 with self.assertWarns(DeprecationWarning): 3650 self.loop.run_until_complete( 3651 asyncio.wait_for(coroutine_function(), 0.01, loop=self.loop)) 3652 3653 def test_coro_is_deprecated_in_wait(self): 3654 # Remove test when passing coros to asyncio.wait() is removed in 3.11 3655 with self.assertWarns(DeprecationWarning): 3656 self.loop.run_until_complete( 3657 asyncio.wait([coroutine_function()])) 3658 3659 task = self.loop.create_task(coroutine_function()) 3660 with self.assertWarns(DeprecationWarning): 3661 self.loop.run_until_complete( 3662 asyncio.wait([task, coroutine_function()])) 3663 3664 3665class CompatibilityTests(test_utils.TestCase): 3666 # Tests for checking a bridge between old-styled coroutines 3667 # and async/await syntax 3668 3669 def setUp(self): 3670 super().setUp() 3671 self.loop = asyncio.new_event_loop() 3672 self.set_event_loop(self.loop) 3673 3674 def tearDown(self): 3675 self.loop.close() 3676 self.loop = None 3677 super().tearDown() 3678 3679 def test_yield_from_awaitable(self): 3680 3681 with self.assertWarns(DeprecationWarning): 3682 @asyncio.coroutine 3683 def coro(): 3684 yield from asyncio.sleep(0) 3685 return 'ok' 3686 3687 result = self.loop.run_until_complete(coro()) 3688 self.assertEqual('ok', result) 3689 3690 def test_await_old_style_coro(self): 3691 3692 with self.assertWarns(DeprecationWarning): 3693 @asyncio.coroutine 3694 def coro1(): 3695 return 'ok1' 3696 3697 with self.assertWarns(DeprecationWarning): 3698 @asyncio.coroutine 3699 def coro2(): 3700 yield from asyncio.sleep(0) 3701 return 'ok2' 3702 3703 async def inner(): 3704 return await asyncio.gather(coro1(), coro2(), loop=self.loop) 3705 3706 result = self.loop.run_until_complete(inner()) 3707 self.assertEqual(['ok1', 'ok2'], result) 3708 3709 def test_debug_mode_interop(self): 3710 # https://bugs.python.org/issue32636 3711 code = textwrap.dedent(""" 3712 import asyncio 3713 3714 async def native_coro(): 3715 pass 3716 3717 @asyncio.coroutine 3718 def old_style_coro(): 3719 yield from native_coro() 3720 3721 asyncio.run(old_style_coro()) 3722 """) 3723 3724 assert_python_ok("-Wignore::DeprecationWarning", "-c", code, 3725 PYTHONASYNCIODEBUG="1") 3726 3727 3728if __name__ == '__main__': 3729 unittest.main() 3730