1"""Tests for lock.py""" 2 3import unittest 4from unittest import mock 5import re 6 7import asyncio 8from test.test_asyncio import utils as test_utils 9 10STR_RGX_REPR = ( 11 r'^<(?P<class>.*?) object at (?P<address>.*?)' 12 r'\[(?P<extras>' 13 r'(set|unset|locked|unlocked)(, value:\d)?(, waiters:\d+)?' 14 r')\]>\Z' 15) 16RGX_REPR = re.compile(STR_RGX_REPR) 17 18 19class LockTests(test_utils.TestCase): 20 21 def setUp(self): 22 super().setUp() 23 self.loop = self.new_test_loop() 24 25 def test_ctor_loop(self): 26 loop = mock.Mock() 27 lock = asyncio.Lock(loop=loop) 28 self.assertIs(lock._loop, loop) 29 30 lock = asyncio.Lock(loop=self.loop) 31 self.assertIs(lock._loop, self.loop) 32 33 def test_ctor_noloop(self): 34 asyncio.set_event_loop(self.loop) 35 lock = asyncio.Lock() 36 self.assertIs(lock._loop, self.loop) 37 38 def test_repr(self): 39 lock = asyncio.Lock(loop=self.loop) 40 self.assertTrue(repr(lock).endswith('[unlocked]>')) 41 self.assertTrue(RGX_REPR.match(repr(lock))) 42 43 @asyncio.coroutine 44 def acquire_lock(): 45 with self.assertWarns(DeprecationWarning): 46 yield from lock 47 48 self.loop.run_until_complete(acquire_lock()) 49 self.assertTrue(repr(lock).endswith('[locked]>')) 50 self.assertTrue(RGX_REPR.match(repr(lock))) 51 52 def test_lock(self): 53 lock = asyncio.Lock(loop=self.loop) 54 55 @asyncio.coroutine 56 def acquire_lock(): 57 with self.assertWarns(DeprecationWarning): 58 return (yield from lock) 59 60 res = self.loop.run_until_complete(acquire_lock()) 61 62 self.assertTrue(res) 63 self.assertTrue(lock.locked()) 64 65 lock.release() 66 self.assertFalse(lock.locked()) 67 68 def test_lock_by_with_statement(self): 69 loop = asyncio.new_event_loop() # don't use TestLoop quirks 70 self.set_event_loop(loop) 71 primitives = [ 72 asyncio.Lock(loop=loop), 73 asyncio.Condition(loop=loop), 74 asyncio.Semaphore(loop=loop), 75 asyncio.BoundedSemaphore(loop=loop), 76 ] 77 78 @asyncio.coroutine 79 def test(lock): 80 yield from asyncio.sleep(0.01, loop=loop) 81 self.assertFalse(lock.locked()) 82 with self.assertWarns(DeprecationWarning): 83 with (yield from lock) as _lock: 84 self.assertIs(_lock, None) 85 self.assertTrue(lock.locked()) 86 yield from asyncio.sleep(0.01, loop=loop) 87 self.assertTrue(lock.locked()) 88 self.assertFalse(lock.locked()) 89 90 for primitive in primitives: 91 loop.run_until_complete(test(primitive)) 92 self.assertFalse(primitive.locked()) 93 94 def test_acquire(self): 95 lock = asyncio.Lock(loop=self.loop) 96 result = [] 97 98 self.assertTrue(self.loop.run_until_complete(lock.acquire())) 99 100 async def c1(result): 101 if await lock.acquire(): 102 result.append(1) 103 return True 104 105 async def c2(result): 106 if await lock.acquire(): 107 result.append(2) 108 return True 109 110 async def c3(result): 111 if await lock.acquire(): 112 result.append(3) 113 return True 114 115 t1 = asyncio.Task(c1(result), loop=self.loop) 116 t2 = asyncio.Task(c2(result), loop=self.loop) 117 118 test_utils.run_briefly(self.loop) 119 self.assertEqual([], result) 120 121 lock.release() 122 test_utils.run_briefly(self.loop) 123 self.assertEqual([1], result) 124 125 test_utils.run_briefly(self.loop) 126 self.assertEqual([1], result) 127 128 t3 = asyncio.Task(c3(result), loop=self.loop) 129 130 lock.release() 131 test_utils.run_briefly(self.loop) 132 self.assertEqual([1, 2], result) 133 134 lock.release() 135 test_utils.run_briefly(self.loop) 136 self.assertEqual([1, 2, 3], result) 137 138 self.assertTrue(t1.done()) 139 self.assertTrue(t1.result()) 140 self.assertTrue(t2.done()) 141 self.assertTrue(t2.result()) 142 self.assertTrue(t3.done()) 143 self.assertTrue(t3.result()) 144 145 def test_acquire_cancel(self): 146 lock = asyncio.Lock(loop=self.loop) 147 self.assertTrue(self.loop.run_until_complete(lock.acquire())) 148 149 task = asyncio.Task(lock.acquire(), loop=self.loop) 150 self.loop.call_soon(task.cancel) 151 self.assertRaises( 152 asyncio.CancelledError, 153 self.loop.run_until_complete, task) 154 self.assertFalse(lock._waiters) 155 156 def test_cancel_race(self): 157 # Several tasks: 158 # - A acquires the lock 159 # - B is blocked in acquire() 160 # - C is blocked in acquire() 161 # 162 # Now, concurrently: 163 # - B is cancelled 164 # - A releases the lock 165 # 166 # If B's waiter is marked cancelled but not yet removed from 167 # _waiters, A's release() call will crash when trying to set 168 # B's waiter; instead, it should move on to C's waiter. 169 170 # Setup: A has the lock, b and c are waiting. 171 lock = asyncio.Lock(loop=self.loop) 172 173 async def lockit(name, blocker): 174 await lock.acquire() 175 try: 176 if blocker is not None: 177 await blocker 178 finally: 179 lock.release() 180 181 fa = asyncio.Future(loop=self.loop) 182 ta = asyncio.Task(lockit('A', fa), loop=self.loop) 183 test_utils.run_briefly(self.loop) 184 self.assertTrue(lock.locked()) 185 tb = asyncio.Task(lockit('B', None), loop=self.loop) 186 test_utils.run_briefly(self.loop) 187 self.assertEqual(len(lock._waiters), 1) 188 tc = asyncio.Task(lockit('C', None), loop=self.loop) 189 test_utils.run_briefly(self.loop) 190 self.assertEqual(len(lock._waiters), 2) 191 192 # Create the race and check. 193 # Without the fix this failed at the last assert. 194 fa.set_result(None) 195 tb.cancel() 196 self.assertTrue(lock._waiters[0].cancelled()) 197 test_utils.run_briefly(self.loop) 198 self.assertFalse(lock.locked()) 199 self.assertTrue(ta.done()) 200 self.assertTrue(tb.cancelled()) 201 self.assertTrue(tc.done()) 202 203 def test_cancel_release_race(self): 204 # Issue 32734 205 # Acquire 4 locks, cancel second, release first 206 # and 2 locks are taken at once. 207 lock = asyncio.Lock(loop=self.loop) 208 lock_count = 0 209 call_count = 0 210 211 async def lockit(): 212 nonlocal lock_count 213 nonlocal call_count 214 call_count += 1 215 await lock.acquire() 216 lock_count += 1 217 218 async def lockandtrigger(): 219 await lock.acquire() 220 self.loop.call_soon(trigger) 221 222 def trigger(): 223 t1.cancel() 224 lock.release() 225 226 t0 = self.loop.create_task(lockandtrigger()) 227 t1 = self.loop.create_task(lockit()) 228 t2 = self.loop.create_task(lockit()) 229 t3 = self.loop.create_task(lockit()) 230 231 # First loop acquires all 232 test_utils.run_briefly(self.loop) 233 self.assertTrue(t0.done()) 234 235 # Second loop calls trigger 236 test_utils.run_briefly(self.loop) 237 # Third loop calls cancellation 238 test_utils.run_briefly(self.loop) 239 240 # Make sure only one lock was taken 241 self.assertEqual(lock_count, 1) 242 # While 3 calls were made to lockit() 243 self.assertEqual(call_count, 3) 244 self.assertTrue(t1.cancelled() and t2.done()) 245 246 # Cleanup the task that is stuck on acquire. 247 t3.cancel() 248 test_utils.run_briefly(self.loop) 249 self.assertTrue(t3.cancelled()) 250 251 def test_finished_waiter_cancelled(self): 252 lock = asyncio.Lock(loop=self.loop) 253 254 ta = asyncio.Task(lock.acquire(), loop=self.loop) 255 test_utils.run_briefly(self.loop) 256 self.assertTrue(lock.locked()) 257 258 tb = asyncio.Task(lock.acquire(), loop=self.loop) 259 test_utils.run_briefly(self.loop) 260 self.assertEqual(len(lock._waiters), 1) 261 262 # Create a second waiter, wake up the first, and cancel it. 263 # Without the fix, the second was not woken up. 264 tc = asyncio.Task(lock.acquire(), loop=self.loop) 265 lock.release() 266 tb.cancel() 267 test_utils.run_briefly(self.loop) 268 269 self.assertTrue(lock.locked()) 270 self.assertTrue(ta.done()) 271 self.assertTrue(tb.cancelled()) 272 273 def test_release_not_acquired(self): 274 lock = asyncio.Lock(loop=self.loop) 275 276 self.assertRaises(RuntimeError, lock.release) 277 278 def test_release_no_waiters(self): 279 lock = asyncio.Lock(loop=self.loop) 280 self.loop.run_until_complete(lock.acquire()) 281 self.assertTrue(lock.locked()) 282 283 lock.release() 284 self.assertFalse(lock.locked()) 285 286 def test_context_manager(self): 287 lock = asyncio.Lock(loop=self.loop) 288 289 @asyncio.coroutine 290 def acquire_lock(): 291 with self.assertWarns(DeprecationWarning): 292 return (yield from lock) 293 294 with self.loop.run_until_complete(acquire_lock()): 295 self.assertTrue(lock.locked()) 296 297 self.assertFalse(lock.locked()) 298 299 def test_context_manager_cant_reuse(self): 300 lock = asyncio.Lock(loop=self.loop) 301 302 @asyncio.coroutine 303 def acquire_lock(): 304 with self.assertWarns(DeprecationWarning): 305 return (yield from lock) 306 307 # This spells "yield from lock" outside a generator. 308 cm = self.loop.run_until_complete(acquire_lock()) 309 with cm: 310 self.assertTrue(lock.locked()) 311 312 self.assertFalse(lock.locked()) 313 314 with self.assertRaises(AttributeError): 315 with cm: 316 pass 317 318 def test_context_manager_no_yield(self): 319 lock = asyncio.Lock(loop=self.loop) 320 321 try: 322 with lock: 323 self.fail('RuntimeError is not raised in with expression') 324 except RuntimeError as err: 325 self.assertEqual( 326 str(err), 327 '"yield from" should be used as context manager expression') 328 329 self.assertFalse(lock.locked()) 330 331 332class EventTests(test_utils.TestCase): 333 334 def setUp(self): 335 super().setUp() 336 self.loop = self.new_test_loop() 337 338 def test_ctor_loop(self): 339 loop = mock.Mock() 340 ev = asyncio.Event(loop=loop) 341 self.assertIs(ev._loop, loop) 342 343 ev = asyncio.Event(loop=self.loop) 344 self.assertIs(ev._loop, self.loop) 345 346 def test_ctor_noloop(self): 347 asyncio.set_event_loop(self.loop) 348 ev = asyncio.Event() 349 self.assertIs(ev._loop, self.loop) 350 351 def test_repr(self): 352 ev = asyncio.Event(loop=self.loop) 353 self.assertTrue(repr(ev).endswith('[unset]>')) 354 match = RGX_REPR.match(repr(ev)) 355 self.assertEqual(match.group('extras'), 'unset') 356 357 ev.set() 358 self.assertTrue(repr(ev).endswith('[set]>')) 359 self.assertTrue(RGX_REPR.match(repr(ev))) 360 361 ev._waiters.append(mock.Mock()) 362 self.assertTrue('waiters:1' in repr(ev)) 363 self.assertTrue(RGX_REPR.match(repr(ev))) 364 365 def test_wait(self): 366 ev = asyncio.Event(loop=self.loop) 367 self.assertFalse(ev.is_set()) 368 369 result = [] 370 371 async def c1(result): 372 if await ev.wait(): 373 result.append(1) 374 375 async def c2(result): 376 if await ev.wait(): 377 result.append(2) 378 379 async def c3(result): 380 if await ev.wait(): 381 result.append(3) 382 383 t1 = asyncio.Task(c1(result), loop=self.loop) 384 t2 = asyncio.Task(c2(result), loop=self.loop) 385 386 test_utils.run_briefly(self.loop) 387 self.assertEqual([], result) 388 389 t3 = asyncio.Task(c3(result), loop=self.loop) 390 391 ev.set() 392 test_utils.run_briefly(self.loop) 393 self.assertEqual([3, 1, 2], result) 394 395 self.assertTrue(t1.done()) 396 self.assertIsNone(t1.result()) 397 self.assertTrue(t2.done()) 398 self.assertIsNone(t2.result()) 399 self.assertTrue(t3.done()) 400 self.assertIsNone(t3.result()) 401 402 def test_wait_on_set(self): 403 ev = asyncio.Event(loop=self.loop) 404 ev.set() 405 406 res = self.loop.run_until_complete(ev.wait()) 407 self.assertTrue(res) 408 409 def test_wait_cancel(self): 410 ev = asyncio.Event(loop=self.loop) 411 412 wait = asyncio.Task(ev.wait(), loop=self.loop) 413 self.loop.call_soon(wait.cancel) 414 self.assertRaises( 415 asyncio.CancelledError, 416 self.loop.run_until_complete, wait) 417 self.assertFalse(ev._waiters) 418 419 def test_clear(self): 420 ev = asyncio.Event(loop=self.loop) 421 self.assertFalse(ev.is_set()) 422 423 ev.set() 424 self.assertTrue(ev.is_set()) 425 426 ev.clear() 427 self.assertFalse(ev.is_set()) 428 429 def test_clear_with_waiters(self): 430 ev = asyncio.Event(loop=self.loop) 431 result = [] 432 433 async def c1(result): 434 if await ev.wait(): 435 result.append(1) 436 return True 437 438 t = asyncio.Task(c1(result), loop=self.loop) 439 test_utils.run_briefly(self.loop) 440 self.assertEqual([], result) 441 442 ev.set() 443 ev.clear() 444 self.assertFalse(ev.is_set()) 445 446 ev.set() 447 ev.set() 448 self.assertEqual(1, len(ev._waiters)) 449 450 test_utils.run_briefly(self.loop) 451 self.assertEqual([1], result) 452 self.assertEqual(0, len(ev._waiters)) 453 454 self.assertTrue(t.done()) 455 self.assertTrue(t.result()) 456 457 458class ConditionTests(test_utils.TestCase): 459 460 def setUp(self): 461 super().setUp() 462 self.loop = self.new_test_loop() 463 464 def test_ctor_loop(self): 465 loop = mock.Mock() 466 cond = asyncio.Condition(loop=loop) 467 self.assertIs(cond._loop, loop) 468 469 cond = asyncio.Condition(loop=self.loop) 470 self.assertIs(cond._loop, self.loop) 471 472 def test_ctor_noloop(self): 473 asyncio.set_event_loop(self.loop) 474 cond = asyncio.Condition() 475 self.assertIs(cond._loop, self.loop) 476 477 def test_wait(self): 478 cond = asyncio.Condition(loop=self.loop) 479 result = [] 480 481 async def c1(result): 482 await cond.acquire() 483 if await cond.wait(): 484 result.append(1) 485 return True 486 487 async def c2(result): 488 await cond.acquire() 489 if await cond.wait(): 490 result.append(2) 491 return True 492 493 async def c3(result): 494 await cond.acquire() 495 if await cond.wait(): 496 result.append(3) 497 return True 498 499 t1 = asyncio.Task(c1(result), loop=self.loop) 500 t2 = asyncio.Task(c2(result), loop=self.loop) 501 t3 = asyncio.Task(c3(result), loop=self.loop) 502 503 test_utils.run_briefly(self.loop) 504 self.assertEqual([], result) 505 self.assertFalse(cond.locked()) 506 507 self.assertTrue(self.loop.run_until_complete(cond.acquire())) 508 cond.notify() 509 test_utils.run_briefly(self.loop) 510 self.assertEqual([], result) 511 self.assertTrue(cond.locked()) 512 513 cond.release() 514 test_utils.run_briefly(self.loop) 515 self.assertEqual([1], result) 516 self.assertTrue(cond.locked()) 517 518 cond.notify(2) 519 test_utils.run_briefly(self.loop) 520 self.assertEqual([1], result) 521 self.assertTrue(cond.locked()) 522 523 cond.release() 524 test_utils.run_briefly(self.loop) 525 self.assertEqual([1, 2], result) 526 self.assertTrue(cond.locked()) 527 528 cond.release() 529 test_utils.run_briefly(self.loop) 530 self.assertEqual([1, 2, 3], result) 531 self.assertTrue(cond.locked()) 532 533 self.assertTrue(t1.done()) 534 self.assertTrue(t1.result()) 535 self.assertTrue(t2.done()) 536 self.assertTrue(t2.result()) 537 self.assertTrue(t3.done()) 538 self.assertTrue(t3.result()) 539 540 def test_wait_cancel(self): 541 cond = asyncio.Condition(loop=self.loop) 542 self.loop.run_until_complete(cond.acquire()) 543 544 wait = asyncio.Task(cond.wait(), loop=self.loop) 545 self.loop.call_soon(wait.cancel) 546 self.assertRaises( 547 asyncio.CancelledError, 548 self.loop.run_until_complete, wait) 549 self.assertFalse(cond._waiters) 550 self.assertTrue(cond.locked()) 551 552 def test_wait_cancel_contested(self): 553 cond = asyncio.Condition(loop=self.loop) 554 555 self.loop.run_until_complete(cond.acquire()) 556 self.assertTrue(cond.locked()) 557 558 wait_task = asyncio.Task(cond.wait(), loop=self.loop) 559 test_utils.run_briefly(self.loop) 560 self.assertFalse(cond.locked()) 561 562 # Notify, but contest the lock before cancelling 563 self.loop.run_until_complete(cond.acquire()) 564 self.assertTrue(cond.locked()) 565 cond.notify() 566 self.loop.call_soon(wait_task.cancel) 567 self.loop.call_soon(cond.release) 568 569 try: 570 self.loop.run_until_complete(wait_task) 571 except asyncio.CancelledError: 572 # Should not happen, since no cancellation points 573 pass 574 575 self.assertTrue(cond.locked()) 576 577 def test_wait_cancel_after_notify(self): 578 # See bpo-32841 579 cond = asyncio.Condition(loop=self.loop) 580 waited = False 581 582 async def wait_on_cond(): 583 nonlocal waited 584 async with cond: 585 waited = True # Make sure this area was reached 586 await cond.wait() 587 588 waiter = asyncio.ensure_future(wait_on_cond(), loop=self.loop) 589 test_utils.run_briefly(self.loop) # Start waiting 590 591 self.loop.run_until_complete(cond.acquire()) 592 cond.notify() 593 test_utils.run_briefly(self.loop) # Get to acquire() 594 waiter.cancel() 595 test_utils.run_briefly(self.loop) # Activate cancellation 596 cond.release() 597 test_utils.run_briefly(self.loop) # Cancellation should occur 598 599 self.assertTrue(waiter.cancelled()) 600 self.assertTrue(waited) 601 602 def test_wait_unacquired(self): 603 cond = asyncio.Condition(loop=self.loop) 604 self.assertRaises( 605 RuntimeError, 606 self.loop.run_until_complete, cond.wait()) 607 608 def test_wait_for(self): 609 cond = asyncio.Condition(loop=self.loop) 610 presult = False 611 612 def predicate(): 613 return presult 614 615 result = [] 616 617 async def c1(result): 618 await cond.acquire() 619 if await cond.wait_for(predicate): 620 result.append(1) 621 cond.release() 622 return True 623 624 t = asyncio.Task(c1(result), loop=self.loop) 625 626 test_utils.run_briefly(self.loop) 627 self.assertEqual([], result) 628 629 self.loop.run_until_complete(cond.acquire()) 630 cond.notify() 631 cond.release() 632 test_utils.run_briefly(self.loop) 633 self.assertEqual([], result) 634 635 presult = True 636 self.loop.run_until_complete(cond.acquire()) 637 cond.notify() 638 cond.release() 639 test_utils.run_briefly(self.loop) 640 self.assertEqual([1], result) 641 642 self.assertTrue(t.done()) 643 self.assertTrue(t.result()) 644 645 def test_wait_for_unacquired(self): 646 cond = asyncio.Condition(loop=self.loop) 647 648 # predicate can return true immediately 649 res = self.loop.run_until_complete(cond.wait_for(lambda: [1, 2, 3])) 650 self.assertEqual([1, 2, 3], res) 651 652 self.assertRaises( 653 RuntimeError, 654 self.loop.run_until_complete, 655 cond.wait_for(lambda: False)) 656 657 def test_notify(self): 658 cond = asyncio.Condition(loop=self.loop) 659 result = [] 660 661 async def c1(result): 662 await cond.acquire() 663 if await cond.wait(): 664 result.append(1) 665 cond.release() 666 return True 667 668 async def c2(result): 669 await cond.acquire() 670 if await cond.wait(): 671 result.append(2) 672 cond.release() 673 return True 674 675 async def c3(result): 676 await cond.acquire() 677 if await cond.wait(): 678 result.append(3) 679 cond.release() 680 return True 681 682 t1 = asyncio.Task(c1(result), loop=self.loop) 683 t2 = asyncio.Task(c2(result), loop=self.loop) 684 t3 = asyncio.Task(c3(result), loop=self.loop) 685 686 test_utils.run_briefly(self.loop) 687 self.assertEqual([], result) 688 689 self.loop.run_until_complete(cond.acquire()) 690 cond.notify(1) 691 cond.release() 692 test_utils.run_briefly(self.loop) 693 self.assertEqual([1], result) 694 695 self.loop.run_until_complete(cond.acquire()) 696 cond.notify(1) 697 cond.notify(2048) 698 cond.release() 699 test_utils.run_briefly(self.loop) 700 self.assertEqual([1, 2, 3], result) 701 702 self.assertTrue(t1.done()) 703 self.assertTrue(t1.result()) 704 self.assertTrue(t2.done()) 705 self.assertTrue(t2.result()) 706 self.assertTrue(t3.done()) 707 self.assertTrue(t3.result()) 708 709 def test_notify_all(self): 710 cond = asyncio.Condition(loop=self.loop) 711 712 result = [] 713 714 async def c1(result): 715 await cond.acquire() 716 if await cond.wait(): 717 result.append(1) 718 cond.release() 719 return True 720 721 async def c2(result): 722 await cond.acquire() 723 if await cond.wait(): 724 result.append(2) 725 cond.release() 726 return True 727 728 t1 = asyncio.Task(c1(result), loop=self.loop) 729 t2 = asyncio.Task(c2(result), loop=self.loop) 730 731 test_utils.run_briefly(self.loop) 732 self.assertEqual([], result) 733 734 self.loop.run_until_complete(cond.acquire()) 735 cond.notify_all() 736 cond.release() 737 test_utils.run_briefly(self.loop) 738 self.assertEqual([1, 2], result) 739 740 self.assertTrue(t1.done()) 741 self.assertTrue(t1.result()) 742 self.assertTrue(t2.done()) 743 self.assertTrue(t2.result()) 744 745 def test_notify_unacquired(self): 746 cond = asyncio.Condition(loop=self.loop) 747 self.assertRaises(RuntimeError, cond.notify) 748 749 def test_notify_all_unacquired(self): 750 cond = asyncio.Condition(loop=self.loop) 751 self.assertRaises(RuntimeError, cond.notify_all) 752 753 def test_repr(self): 754 cond = asyncio.Condition(loop=self.loop) 755 self.assertTrue('unlocked' in repr(cond)) 756 self.assertTrue(RGX_REPR.match(repr(cond))) 757 758 self.loop.run_until_complete(cond.acquire()) 759 self.assertTrue('locked' in repr(cond)) 760 761 cond._waiters.append(mock.Mock()) 762 self.assertTrue('waiters:1' in repr(cond)) 763 self.assertTrue(RGX_REPR.match(repr(cond))) 764 765 cond._waiters.append(mock.Mock()) 766 self.assertTrue('waiters:2' in repr(cond)) 767 self.assertTrue(RGX_REPR.match(repr(cond))) 768 769 def test_context_manager(self): 770 cond = asyncio.Condition(loop=self.loop) 771 772 @asyncio.coroutine 773 def acquire_cond(): 774 with self.assertWarns(DeprecationWarning): 775 return (yield from cond) 776 777 with self.loop.run_until_complete(acquire_cond()): 778 self.assertTrue(cond.locked()) 779 780 self.assertFalse(cond.locked()) 781 782 def test_context_manager_no_yield(self): 783 cond = asyncio.Condition(loop=self.loop) 784 785 try: 786 with cond: 787 self.fail('RuntimeError is not raised in with expression') 788 except RuntimeError as err: 789 self.assertEqual( 790 str(err), 791 '"yield from" should be used as context manager expression') 792 793 self.assertFalse(cond.locked()) 794 795 def test_explicit_lock(self): 796 lock = asyncio.Lock(loop=self.loop) 797 cond = asyncio.Condition(lock, loop=self.loop) 798 799 self.assertIs(cond._lock, lock) 800 self.assertIs(cond._loop, lock._loop) 801 802 def test_ambiguous_loops(self): 803 loop = self.new_test_loop() 804 self.addCleanup(loop.close) 805 806 lock = asyncio.Lock(loop=self.loop) 807 with self.assertRaises(ValueError): 808 asyncio.Condition(lock, loop=loop) 809 810 def test_timeout_in_block(self): 811 loop = asyncio.new_event_loop() 812 self.addCleanup(loop.close) 813 814 async def task_timeout(): 815 condition = asyncio.Condition(loop=loop) 816 async with condition: 817 with self.assertRaises(asyncio.TimeoutError): 818 await asyncio.wait_for(condition.wait(), timeout=0.5, 819 loop=loop) 820 821 loop.run_until_complete(task_timeout()) 822 823 824class SemaphoreTests(test_utils.TestCase): 825 826 def setUp(self): 827 super().setUp() 828 self.loop = self.new_test_loop() 829 830 def test_ctor_loop(self): 831 loop = mock.Mock() 832 sem = asyncio.Semaphore(loop=loop) 833 self.assertIs(sem._loop, loop) 834 835 sem = asyncio.Semaphore(loop=self.loop) 836 self.assertIs(sem._loop, self.loop) 837 838 def test_ctor_noloop(self): 839 asyncio.set_event_loop(self.loop) 840 sem = asyncio.Semaphore() 841 self.assertIs(sem._loop, self.loop) 842 843 def test_initial_value_zero(self): 844 sem = asyncio.Semaphore(0, loop=self.loop) 845 self.assertTrue(sem.locked()) 846 847 def test_repr(self): 848 sem = asyncio.Semaphore(loop=self.loop) 849 self.assertTrue(repr(sem).endswith('[unlocked, value:1]>')) 850 self.assertTrue(RGX_REPR.match(repr(sem))) 851 852 self.loop.run_until_complete(sem.acquire()) 853 self.assertTrue(repr(sem).endswith('[locked]>')) 854 self.assertTrue('waiters' not in repr(sem)) 855 self.assertTrue(RGX_REPR.match(repr(sem))) 856 857 sem._waiters.append(mock.Mock()) 858 self.assertTrue('waiters:1' in repr(sem)) 859 self.assertTrue(RGX_REPR.match(repr(sem))) 860 861 sem._waiters.append(mock.Mock()) 862 self.assertTrue('waiters:2' in repr(sem)) 863 self.assertTrue(RGX_REPR.match(repr(sem))) 864 865 def test_semaphore(self): 866 sem = asyncio.Semaphore(loop=self.loop) 867 self.assertEqual(1, sem._value) 868 869 @asyncio.coroutine 870 def acquire_lock(): 871 with self.assertWarns(DeprecationWarning): 872 return (yield from sem) 873 874 res = self.loop.run_until_complete(acquire_lock()) 875 876 self.assertTrue(res) 877 self.assertTrue(sem.locked()) 878 self.assertEqual(0, sem._value) 879 880 sem.release() 881 self.assertFalse(sem.locked()) 882 self.assertEqual(1, sem._value) 883 884 def test_semaphore_value(self): 885 self.assertRaises(ValueError, asyncio.Semaphore, -1) 886 887 def test_acquire(self): 888 sem = asyncio.Semaphore(3, loop=self.loop) 889 result = [] 890 891 self.assertTrue(self.loop.run_until_complete(sem.acquire())) 892 self.assertTrue(self.loop.run_until_complete(sem.acquire())) 893 self.assertFalse(sem.locked()) 894 895 async def c1(result): 896 await sem.acquire() 897 result.append(1) 898 return True 899 900 async def c2(result): 901 await sem.acquire() 902 result.append(2) 903 return True 904 905 async def c3(result): 906 await sem.acquire() 907 result.append(3) 908 return True 909 910 async def c4(result): 911 await sem.acquire() 912 result.append(4) 913 return True 914 915 t1 = asyncio.Task(c1(result), loop=self.loop) 916 t2 = asyncio.Task(c2(result), loop=self.loop) 917 t3 = asyncio.Task(c3(result), loop=self.loop) 918 919 test_utils.run_briefly(self.loop) 920 self.assertEqual([1], result) 921 self.assertTrue(sem.locked()) 922 self.assertEqual(2, len(sem._waiters)) 923 self.assertEqual(0, sem._value) 924 925 t4 = asyncio.Task(c4(result), loop=self.loop) 926 927 sem.release() 928 sem.release() 929 self.assertEqual(2, sem._value) 930 931 test_utils.run_briefly(self.loop) 932 self.assertEqual(0, sem._value) 933 self.assertEqual(3, len(result)) 934 self.assertTrue(sem.locked()) 935 self.assertEqual(1, len(sem._waiters)) 936 self.assertEqual(0, sem._value) 937 938 self.assertTrue(t1.done()) 939 self.assertTrue(t1.result()) 940 race_tasks = [t2, t3, t4] 941 done_tasks = [t for t in race_tasks if t.done() and t.result()] 942 self.assertTrue(2, len(done_tasks)) 943 944 # cleanup locked semaphore 945 sem.release() 946 self.loop.run_until_complete(asyncio.gather(*race_tasks)) 947 948 def test_acquire_cancel(self): 949 sem = asyncio.Semaphore(loop=self.loop) 950 self.loop.run_until_complete(sem.acquire()) 951 952 acquire = asyncio.Task(sem.acquire(), loop=self.loop) 953 self.loop.call_soon(acquire.cancel) 954 self.assertRaises( 955 asyncio.CancelledError, 956 self.loop.run_until_complete, acquire) 957 self.assertTrue((not sem._waiters) or 958 all(waiter.done() for waiter in sem._waiters)) 959 960 def test_acquire_cancel_before_awoken(self): 961 sem = asyncio.Semaphore(value=0, loop=self.loop) 962 963 t1 = asyncio.Task(sem.acquire(), loop=self.loop) 964 t2 = asyncio.Task(sem.acquire(), loop=self.loop) 965 t3 = asyncio.Task(sem.acquire(), loop=self.loop) 966 t4 = asyncio.Task(sem.acquire(), loop=self.loop) 967 968 test_utils.run_briefly(self.loop) 969 970 sem.release() 971 t1.cancel() 972 t2.cancel() 973 974 test_utils.run_briefly(self.loop) 975 num_done = sum(t.done() for t in [t3, t4]) 976 self.assertEqual(num_done, 1) 977 978 t3.cancel() 979 t4.cancel() 980 test_utils.run_briefly(self.loop) 981 982 def test_acquire_hang(self): 983 sem = asyncio.Semaphore(value=0, loop=self.loop) 984 985 t1 = asyncio.Task(sem.acquire(), loop=self.loop) 986 t2 = asyncio.Task(sem.acquire(), loop=self.loop) 987 988 test_utils.run_briefly(self.loop) 989 990 sem.release() 991 t1.cancel() 992 993 test_utils.run_briefly(self.loop) 994 self.assertTrue(sem.locked()) 995 996 def test_release_not_acquired(self): 997 sem = asyncio.BoundedSemaphore(loop=self.loop) 998 999 self.assertRaises(ValueError, sem.release) 1000 1001 def test_release_no_waiters(self): 1002 sem = asyncio.Semaphore(loop=self.loop) 1003 self.loop.run_until_complete(sem.acquire()) 1004 self.assertTrue(sem.locked()) 1005 1006 sem.release() 1007 self.assertFalse(sem.locked()) 1008 1009 def test_context_manager(self): 1010 sem = asyncio.Semaphore(2, loop=self.loop) 1011 1012 @asyncio.coroutine 1013 def acquire_lock(): 1014 with self.assertWarns(DeprecationWarning): 1015 return (yield from sem) 1016 1017 with self.loop.run_until_complete(acquire_lock()): 1018 self.assertFalse(sem.locked()) 1019 self.assertEqual(1, sem._value) 1020 1021 with self.loop.run_until_complete(acquire_lock()): 1022 self.assertTrue(sem.locked()) 1023 1024 self.assertEqual(2, sem._value) 1025 1026 def test_context_manager_no_yield(self): 1027 sem = asyncio.Semaphore(2, loop=self.loop) 1028 1029 try: 1030 with sem: 1031 self.fail('RuntimeError is not raised in with expression') 1032 except RuntimeError as err: 1033 self.assertEqual( 1034 str(err), 1035 '"yield from" should be used as context manager expression') 1036 1037 self.assertEqual(2, sem._value) 1038 1039 1040if __name__ == '__main__': 1041 unittest.main() 1042