1"""Tests for lock.py"""
2
3import unittest
4from unittest import mock
5import re
6
7import asyncio
8from test.test_asyncio import utils as test_utils
9
# Shape of repr() for the asyncio synchronization primitives tested in this
# module:
#   <CLASS object at ADDRESS [STATE[, value:N][, waiters:N]]>
# Named groups expose the class, the address and the bracketed extras.
STR_RGX_REPR = (
    r'^<(?P<class>.*?) object at (?P<address>.*?)'
    r'\[(?P<extras>'
    # ``value`` is a counter, so it may be more than one digit wide.
    r'(set|unset|locked|unlocked)(, value:\d+)?(, waiters:\d+)?'
    r')\]>\Z'
)
RGX_REPR = re.compile(STR_RGX_REPR)
17
18
class LockTests(test_utils.TestCase):
    """Exercise asyncio.Lock: construction, repr, acquire/release ordering,
    cancellation races, and the deprecated ``yield from lock`` protocol.
    """

    def setUp(self):
        super().setUp()
        self.loop = self.new_test_loop()

    def _legacy_acquire(self, lock):
        # Acquire *lock* through the deprecated ``yield from lock`` spelling,
        # requiring a DeprecationWarning, and hand back what it produced.
        @asyncio.coroutine
        def acquire_lock():
            with self.assertWarns(DeprecationWarning):
                return (yield from lock)

        return self.loop.run_until_complete(acquire_lock())

    def test_ctor_loop(self):
        # An explicitly supplied loop, mock or real, is stored on ``_loop``.
        for given in (mock.Mock(), self.loop):
            self.assertIs(asyncio.Lock(loop=given)._loop, given)

    def test_ctor_noloop(self):
        # Without a loop argument the current event loop is picked up.
        asyncio.set_event_loop(self.loop)
        self.assertIs(asyncio.Lock()._loop, self.loop)

    def test_repr(self):
        lock = asyncio.Lock(loop=self.loop)
        text = repr(lock)
        self.assertTrue(text.endswith('[unlocked]>'))
        self.assertIsNotNone(RGX_REPR.match(text))

        @asyncio.coroutine
        def take():
            with self.assertWarns(DeprecationWarning):
                yield from lock

        self.loop.run_until_complete(take())
        text = repr(lock)
        self.assertTrue(text.endswith('[locked]>'))
        self.assertIsNotNone(RGX_REPR.match(text))

    def test_lock(self):
        lock = asyncio.Lock(loop=self.loop)

        outcome = self._legacy_acquire(lock)

        self.assertTrue(outcome)
        self.assertTrue(lock.locked())

        lock.release()
        self.assertFalse(lock.locked())

    def test_lock_by_with_statement(self):
        loop = asyncio.new_event_loop()  # don't use TestLoop quirks
        self.set_event_loop(loop)
        primitives = [
            factory(loop=loop)
            for factory in (asyncio.Lock, asyncio.Condition,
                            asyncio.Semaphore, asyncio.BoundedSemaphore)
        ]

        @asyncio.coroutine
        def check(prim):
            # The primitive is held only inside the ``with`` body, and the
            # ``as`` target is bound to None.
            yield from asyncio.sleep(0.01, loop=loop)
            self.assertFalse(prim.locked())
            with self.assertWarns(DeprecationWarning):
                with (yield from prim) as bound:
                    self.assertIsNone(bound)
                    self.assertTrue(prim.locked())
                    yield from asyncio.sleep(0.01, loop=loop)
                    self.assertTrue(prim.locked())
                self.assertFalse(prim.locked())

        for prim in primitives:
            loop.run_until_complete(check(prim))
            self.assertFalse(prim.locked())

    def test_acquire(self):
        # Blocked contenders get the lock one per release(), in the order
        # their tasks were created.
        lock = asyncio.Lock(loop=self.loop)
        order = []

        self.assertTrue(self.loop.run_until_complete(lock.acquire()))

        async def contender(tag):
            if await lock.acquire():
                order.append(tag)
            return True

        first = asyncio.Task(contender(1), loop=self.loop)
        second = asyncio.Task(contender(2), loop=self.loop)

        test_utils.run_briefly(self.loop)
        self.assertEqual([], order)

        lock.release()
        test_utils.run_briefly(self.loop)
        self.assertEqual([1], order)

        test_utils.run_briefly(self.loop)
        self.assertEqual([1], order)

        third = asyncio.Task(contender(3), loop=self.loop)

        lock.release()
        test_utils.run_briefly(self.loop)
        self.assertEqual([1, 2], order)

        lock.release()
        test_utils.run_briefly(self.loop)
        self.assertEqual([1, 2, 3], order)

        for task in (first, second, third):
            self.assertTrue(task.done())
            self.assertTrue(task.result())

    def test_acquire_cancel(self):
        # A cancelled pending acquire() must not leave a waiter behind.
        lock = asyncio.Lock(loop=self.loop)
        self.assertTrue(self.loop.run_until_complete(lock.acquire()))

        pending = asyncio.Task(lock.acquire(), loop=self.loop)
        self.loop.call_soon(pending.cancel)
        with self.assertRaises(asyncio.CancelledError):
            self.loop.run_until_complete(pending)
        self.assertFalse(lock._waiters)

    def test_cancel_race(self):
        # Scenario with three tasks:
        # - A holds the lock
        # - B is blocked in acquire()
        # - C is blocked in acquire()
        #
        # Then, at the same time:
        # - B gets cancelled
        # - A releases the lock
        #
        # If B's waiter is already cancelled but still sitting in
        # _waiters, A's release() would crash trying to set B's waiter;
        # it has to skip ahead to C's waiter instead.

        # Setup: A owns the lock, B and C are queued behind it.
        lock = asyncio.Lock(loop=self.loop)

        async def hold(name, blocker):
            await lock.acquire()
            try:
                if blocker is not None:
                    await blocker
            finally:
                lock.release()

        gate = asyncio.Future(loop=self.loop)
        task_a = asyncio.Task(hold('A', gate), loop=self.loop)
        test_utils.run_briefly(self.loop)
        self.assertTrue(lock.locked())
        task_b = asyncio.Task(hold('B', None), loop=self.loop)
        test_utils.run_briefly(self.loop)
        self.assertEqual(len(lock._waiters), 1)
        task_c = asyncio.Task(hold('C', None), loop=self.loop)
        test_utils.run_briefly(self.loop)
        self.assertEqual(len(lock._waiters), 2)

        # Trigger the race and verify the outcome.
        # Before the fix, the final assert is the one that failed.
        gate.set_result(None)
        task_b.cancel()
        self.assertTrue(lock._waiters[0].cancelled())
        test_utils.run_briefly(self.loop)
        self.assertFalse(lock.locked())
        self.assertTrue(task_a.done())
        self.assertTrue(task_b.cancelled())
        self.assertTrue(task_c.done())

    def test_cancel_release_race(self):
        # Issue 32734
        # Acquire 4 locks, cancel second, release first
        # and 2 locks are taken at once.
        lock = asyncio.Lock(loop=self.loop)
        acquired = 0
        started = 0

        async def lockit():
            nonlocal acquired, started
            started += 1
            await lock.acquire()
            acquired += 1

        async def lockandtrigger():
            await lock.acquire()
            self.loop.call_soon(trigger)

        def trigger():
            # Cancel the first waiter and release in the same callback.
            victim.cancel()
            lock.release()

        owner = self.loop.create_task(lockandtrigger())
        victim = self.loop.create_task(lockit())
        winner = self.loop.create_task(lockit())
        straggler = self.loop.create_task(lockit())

        # First pass: every task reaches its acquire()
        test_utils.run_briefly(self.loop)
        self.assertTrue(owner.done())

        # Second pass: trigger() runs
        test_utils.run_briefly(self.loop)
        # Third pass: the cancellation is delivered
        test_utils.run_briefly(self.loop)

        # Exactly one lockit() got past acquire() ...
        self.assertEqual(acquired, 1)
        # ... although all three were started.
        self.assertEqual(started, 3)
        self.assertTrue(victim.cancelled())
        self.assertTrue(winner.done())

        # Tidy up the task still parked in acquire().
        straggler.cancel()
        test_utils.run_briefly(self.loop)
        self.assertTrue(straggler.cancelled())

    def test_finished_waiter_cancelled(self):
        lock = asyncio.Lock(loop=self.loop)

        task_a = asyncio.Task(lock.acquire(), loop=self.loop)
        test_utils.run_briefly(self.loop)
        self.assertTrue(lock.locked())

        task_b = asyncio.Task(lock.acquire(), loop=self.loop)
        test_utils.run_briefly(self.loop)
        self.assertEqual(len(lock._waiters), 1)

        # Queue a second waiter, wake the first one, then cancel it.
        # Before the fix the second waiter was never woken.
        task_c = asyncio.Task(lock.acquire(), loop=self.loop)
        lock.release()
        task_b.cancel()
        test_utils.run_briefly(self.loop)

        self.assertTrue(lock.locked())
        self.assertTrue(task_a.done())
        self.assertTrue(task_b.cancelled())

    def test_release_not_acquired(self):
        # Releasing a lock that is not held raises RuntimeError.
        lock = asyncio.Lock(loop=self.loop)

        with self.assertRaises(RuntimeError):
            lock.release()

    def test_release_no_waiters(self):
        lock = asyncio.Lock(loop=self.loop)
        self.loop.run_until_complete(lock.acquire())
        self.assertTrue(lock.locked())

        lock.release()
        self.assertFalse(lock.locked())

    def test_context_manager(self):
        lock = asyncio.Lock(loop=self.loop)

        with self._legacy_acquire(lock):
            self.assertTrue(lock.locked())

        self.assertFalse(lock.locked())

    def test_context_manager_cant_reuse(self):
        lock = asyncio.Lock(loop=self.loop)

        # This spells "yield from lock" outside a generator.
        cm = self._legacy_acquire(lock)
        with cm:
            self.assertTrue(lock.locked())

        self.assertFalse(lock.locked())

        # Entering the same context manager a second time must fail.
        with self.assertRaises(AttributeError):
            with cm:
                pass

    def test_context_manager_no_yield(self):
        # A plain ``with lock:`` is rejected and leaves the lock untouched.
        lock = asyncio.Lock(loop=self.loop)

        with self.assertRaises(RuntimeError) as caught:
            with lock:
                self.fail('RuntimeError is not raised in with expression')
        self.assertEqual(
            str(caught.exception),
            '"yield from" should be used as context manager expression')

        self.assertFalse(lock.locked())
330
331
class EventTests(test_utils.TestCase):
    """Exercise asyncio.Event: construction, repr, wait(), set() and clear()."""

    def setUp(self):
        super().setUp()
        self.loop = self.new_test_loop()

    def test_ctor_loop(self):
        # An explicitly supplied loop, mock or real, is stored on ``_loop``.
        for given in (mock.Mock(), self.loop):
            self.assertIs(asyncio.Event(loop=given)._loop, given)

    def test_ctor_noloop(self):
        # Without a loop argument the current event loop is picked up.
        asyncio.set_event_loop(self.loop)
        self.assertIs(asyncio.Event()._loop, self.loop)

    def test_repr(self):
        event = asyncio.Event(loop=self.loop)
        text = repr(event)
        self.assertTrue(text.endswith('[unset]>'))
        self.assertEqual(RGX_REPR.match(text).group('extras'), 'unset')

        event.set()
        text = repr(event)
        self.assertTrue(text.endswith('[set]>'))
        self.assertIsNotNone(RGX_REPR.match(text))

        event._waiters.append(mock.Mock())
        text = repr(event)
        self.assertIn('waiters:1', text)
        self.assertIsNotNone(RGX_REPR.match(text))

    def test_wait(self):
        event = asyncio.Event(loop=self.loop)
        self.assertFalse(event.is_set())

        seen = []

        async def waiter(tag):
            if await event.wait():
                seen.append(tag)

        first = asyncio.Task(waiter(1), loop=self.loop)
        second = asyncio.Task(waiter(2), loop=self.loop)

        test_utils.run_briefly(self.loop)
        self.assertEqual([], seen)

        # The third task is created but has not run when set() is called.
        third = asyncio.Task(waiter(3), loop=self.loop)

        event.set()
        test_utils.run_briefly(self.loop)
        self.assertEqual([3, 1, 2], seen)

        for task in (first, second, third):
            self.assertTrue(task.done())
            self.assertIsNone(task.result())

    def test_wait_on_set(self):
        # wait() on an event that is already set returns a true value.
        event = asyncio.Event(loop=self.loop)
        event.set()

        self.assertTrue(self.loop.run_until_complete(event.wait()))

    def test_wait_cancel(self):
        # A cancelled wait() must not leave a waiter behind.
        event = asyncio.Event(loop=self.loop)

        pending = asyncio.Task(event.wait(), loop=self.loop)
        self.loop.call_soon(pending.cancel)
        with self.assertRaises(asyncio.CancelledError):
            self.loop.run_until_complete(pending)
        self.assertFalse(event._waiters)

    def test_clear(self):
        event = asyncio.Event(loop=self.loop)
        self.assertFalse(event.is_set())

        event.set()
        self.assertTrue(event.is_set())

        event.clear()
        self.assertFalse(event.is_set())

    def test_clear_with_waiters(self):
        event = asyncio.Event(loop=self.loop)
        seen = []

        async def waiter():
            if await event.wait():
                seen.append(1)
            return True

        task = asyncio.Task(waiter(), loop=self.loop)
        test_utils.run_briefly(self.loop)
        self.assertEqual([], seen)

        event.set()
        event.clear()
        self.assertFalse(event.is_set())

        # Setting twice in a row keeps the single waiter registered until
        # the loop gets a chance to run it.
        event.set()
        event.set()
        self.assertEqual(1, len(event._waiters))

        test_utils.run_briefly(self.loop)
        self.assertEqual([1], seen)
        self.assertEqual(0, len(event._waiters))

        self.assertTrue(task.done())
        self.assertTrue(task.result())
456
457
class ConditionTests(test_utils.TestCase):
    """Exercise asyncio.Condition: wait/notify, cancellation, repr and the
    deprecated ``yield from cond`` protocol.
    """

    def setUp(self):
        super().setUp()
        self.loop = self.new_test_loop()

    def test_ctor_loop(self):
        # An explicitly supplied loop, mock or real, is stored on ``_loop``.
        loop = mock.Mock()
        cond = asyncio.Condition(loop=loop)
        self.assertIs(cond._loop, loop)

        cond = asyncio.Condition(loop=self.loop)
        self.assertIs(cond._loop, self.loop)

    def test_ctor_noloop(self):
        # Without a loop argument the current event loop is picked up.
        asyncio.set_event_loop(self.loop)
        cond = asyncio.Condition()
        self.assertIs(cond._loop, self.loop)

    def test_wait(self):
        cond = asyncio.Condition(loop=self.loop)
        result = []

        async def waiter(tag):
            # Deliberately keeps the condition held after waking up; the
            # test body releases it to let the next waiter proceed.
            await cond.acquire()
            if await cond.wait():
                result.append(tag)
            return True

        t1 = asyncio.Task(waiter(1), loop=self.loop)
        t2 = asyncio.Task(waiter(2), loop=self.loop)
        t3 = asyncio.Task(waiter(3), loop=self.loop)

        test_utils.run_briefly(self.loop)
        self.assertEqual([], result)
        self.assertFalse(cond.locked())

        self.assertTrue(self.loop.run_until_complete(cond.acquire()))
        cond.notify()
        test_utils.run_briefly(self.loop)
        self.assertEqual([], result)
        self.assertTrue(cond.locked())

        cond.release()
        test_utils.run_briefly(self.loop)
        self.assertEqual([1], result)
        self.assertTrue(cond.locked())

        cond.notify(2)
        test_utils.run_briefly(self.loop)
        self.assertEqual([1], result)
        self.assertTrue(cond.locked())

        cond.release()
        test_utils.run_briefly(self.loop)
        self.assertEqual([1, 2], result)
        self.assertTrue(cond.locked())

        cond.release()
        test_utils.run_briefly(self.loop)
        self.assertEqual([1, 2, 3], result)
        self.assertTrue(cond.locked())

        for task in (t1, t2, t3):
            self.assertTrue(task.done())
            self.assertTrue(task.result())

    def test_wait_cancel(self):
        # A cancelled wait() leaves no waiter behind and holds the lock again.
        cond = asyncio.Condition(loop=self.loop)
        self.loop.run_until_complete(cond.acquire())

        wait = asyncio.Task(cond.wait(), loop=self.loop)
        self.loop.call_soon(wait.cancel)
        self.assertRaises(
            asyncio.CancelledError,
            self.loop.run_until_complete, wait)
        self.assertFalse(cond._waiters)
        self.assertTrue(cond.locked())

    def test_wait_cancel_contested(self):
        cond = asyncio.Condition(loop=self.loop)

        self.loop.run_until_complete(cond.acquire())
        self.assertTrue(cond.locked())

        wait_task = asyncio.Task(cond.wait(), loop=self.loop)
        test_utils.run_briefly(self.loop)
        self.assertFalse(cond.locked())

        # Notify, but contest the lock before cancelling
        self.loop.run_until_complete(cond.acquire())
        self.assertTrue(cond.locked())
        cond.notify()
        self.loop.call_soon(wait_task.cancel)
        self.loop.call_soon(cond.release)

        try:
            self.loop.run_until_complete(wait_task)
        except asyncio.CancelledError:
            # Should not happen, since no cancellation points
            pass

        # Whatever the outcome, the waiter must have re-taken the lock.
        self.assertTrue(cond.locked())

    def test_wait_cancel_after_notify(self):
        # See bpo-32841
        cond = asyncio.Condition(loop=self.loop)
        waited = False

        async def wait_on_cond():
            nonlocal waited
            async with cond:
                waited = True  # Make sure this area was reached
                await cond.wait()

        waiter = asyncio.ensure_future(wait_on_cond(), loop=self.loop)
        test_utils.run_briefly(self.loop)  # Start waiting

        self.loop.run_until_complete(cond.acquire())
        cond.notify()
        test_utils.run_briefly(self.loop)  # Get to acquire()
        waiter.cancel()
        test_utils.run_briefly(self.loop)  # Activate cancellation
        cond.release()
        test_utils.run_briefly(self.loop)  # Cancellation should occur

        self.assertTrue(waiter.cancelled())
        self.assertTrue(waited)

    def test_wait_unacquired(self):
        # wait() without holding the condition raises RuntimeError.
        cond = asyncio.Condition(loop=self.loop)
        self.assertRaises(
            RuntimeError,
            self.loop.run_until_complete, cond.wait())

    def test_wait_for(self):
        cond = asyncio.Condition(loop=self.loop)
        presult = False

        def predicate():
            # Reads the enclosing variable, which the test flips below.
            return presult

        result = []

        async def c1(result):
            await cond.acquire()
            if await cond.wait_for(predicate):
                result.append(1)
                cond.release()
            return True

        t = asyncio.Task(c1(result), loop=self.loop)

        test_utils.run_briefly(self.loop)
        self.assertEqual([], result)

        # A notification while the predicate is false does not end the wait.
        self.loop.run_until_complete(cond.acquire())
        cond.notify()
        cond.release()
        test_utils.run_briefly(self.loop)
        self.assertEqual([], result)

        presult = True
        self.loop.run_until_complete(cond.acquire())
        cond.notify()
        cond.release()
        test_utils.run_briefly(self.loop)
        self.assertEqual([1], result)

        self.assertTrue(t.done())
        self.assertTrue(t.result())

    def test_wait_for_unacquired(self):
        cond = asyncio.Condition(loop=self.loop)

        # predicate can return true immediately
        res = self.loop.run_until_complete(cond.wait_for(lambda: [1, 2, 3]))
        self.assertEqual([1, 2, 3], res)

        self.assertRaises(
            RuntimeError,
            self.loop.run_until_complete,
            cond.wait_for(lambda: False))

    def test_notify(self):
        cond = asyncio.Condition(loop=self.loop)
        result = []

        async def waiter(tag):
            await cond.acquire()
            if await cond.wait():
                result.append(tag)
                cond.release()
            return True

        t1 = asyncio.Task(waiter(1), loop=self.loop)
        t2 = asyncio.Task(waiter(2), loop=self.loop)
        t3 = asyncio.Task(waiter(3), loop=self.loop)

        test_utils.run_briefly(self.loop)
        self.assertEqual([], result)

        self.loop.run_until_complete(cond.acquire())
        cond.notify(1)
        cond.release()
        test_utils.run_briefly(self.loop)
        self.assertEqual([1], result)

        # Asking for more wake-ups than there are waiters is fine.
        self.loop.run_until_complete(cond.acquire())
        cond.notify(1)
        cond.notify(2048)
        cond.release()
        test_utils.run_briefly(self.loop)
        self.assertEqual([1, 2, 3], result)

        for task in (t1, t2, t3):
            self.assertTrue(task.done())
            self.assertTrue(task.result())

    def test_notify_all(self):
        cond = asyncio.Condition(loop=self.loop)

        result = []

        async def waiter(tag):
            await cond.acquire()
            if await cond.wait():
                result.append(tag)
                cond.release()
            return True

        t1 = asyncio.Task(waiter(1), loop=self.loop)
        t2 = asyncio.Task(waiter(2), loop=self.loop)

        test_utils.run_briefly(self.loop)
        self.assertEqual([], result)

        self.loop.run_until_complete(cond.acquire())
        cond.notify_all()
        cond.release()
        test_utils.run_briefly(self.loop)
        self.assertEqual([1, 2], result)

        for task in (t1, t2):
            self.assertTrue(task.done())
            self.assertTrue(task.result())

    def test_notify_unacquired(self):
        # notify() without holding the condition raises RuntimeError.
        cond = asyncio.Condition(loop=self.loop)
        self.assertRaises(RuntimeError, cond.notify)

    def test_notify_all_unacquired(self):
        # notify_all() without holding the condition raises RuntimeError.
        cond = asyncio.Condition(loop=self.loop)
        self.assertRaises(RuntimeError, cond.notify_all)

    def test_repr(self):
        cond = asyncio.Condition(loop=self.loop)
        self.assertIn('unlocked', repr(cond))
        self.assertTrue(RGX_REPR.match(repr(cond)))

        self.loop.run_until_complete(cond.acquire())
        # 'locked' is a substring of 'unlocked', so a containment check alone
        # passes in either state; rule out 'unlocked' explicitly.
        self.assertIn('locked', repr(cond))
        self.assertNotIn('unlocked', repr(cond))

        cond._waiters.append(mock.Mock())
        self.assertIn('waiters:1', repr(cond))
        self.assertTrue(RGX_REPR.match(repr(cond)))

        cond._waiters.append(mock.Mock())
        self.assertIn('waiters:2', repr(cond))
        self.assertTrue(RGX_REPR.match(repr(cond)))

    def test_context_manager(self):
        cond = asyncio.Condition(loop=self.loop)

        @asyncio.coroutine
        def acquire_cond():
            # The deprecated ``yield from cond`` form must warn.
            with self.assertWarns(DeprecationWarning):
                return (yield from cond)

        with self.loop.run_until_complete(acquire_cond()):
            self.assertTrue(cond.locked())

        self.assertFalse(cond.locked())

    def test_context_manager_no_yield(self):
        # A plain ``with cond:`` is rejected and leaves the lock untouched.
        cond = asyncio.Condition(loop=self.loop)

        try:
            with cond:
                self.fail('RuntimeError is not raised in with expression')
        except RuntimeError as err:
            self.assertEqual(
                str(err),
                '"yield from" should be used as context manager expression')

        self.assertFalse(cond.locked())

    def test_explicit_lock(self):
        # A lock passed in is used as-is and the loops agree.
        lock = asyncio.Lock(loop=self.loop)
        cond = asyncio.Condition(lock, loop=self.loop)

        self.assertIs(cond._lock, lock)
        self.assertIs(cond._loop, lock._loop)

    def test_ambiguous_loops(self):
        # A lock bound to one loop plus a different explicit loop is an error.
        loop = self.new_test_loop()
        self.addCleanup(loop.close)

        lock = asyncio.Lock(loop=self.loop)
        with self.assertRaises(ValueError):
            asyncio.Condition(lock, loop=loop)

    def test_timeout_in_block(self):
        loop = asyncio.new_event_loop()
        self.addCleanup(loop.close)

        async def task_timeout():
            # The timeout must surface as TimeoutError inside the
            # ``async with`` block.
            condition = asyncio.Condition(loop=loop)
            async with condition:
                with self.assertRaises(asyncio.TimeoutError):
                    await asyncio.wait_for(condition.wait(), timeout=0.5,
                                           loop=loop)

        loop.run_until_complete(task_timeout())
822
823
class SemaphoreTests(test_utils.TestCase):
    """Exercise asyncio.Semaphore and BoundedSemaphore: counting, waiters,
    cancellation, repr and the deprecated ``yield from sem`` protocol.
    """

    def setUp(self):
        super().setUp()
        self.loop = self.new_test_loop()

    def test_ctor_loop(self):
        # An explicitly supplied loop, mock or real, is stored on ``_loop``.
        loop = mock.Mock()
        sem = asyncio.Semaphore(loop=loop)
        self.assertIs(sem._loop, loop)

        sem = asyncio.Semaphore(loop=self.loop)
        self.assertIs(sem._loop, self.loop)

    def test_ctor_noloop(self):
        # Without a loop argument the current event loop is picked up.
        asyncio.set_event_loop(self.loop)
        sem = asyncio.Semaphore()
        self.assertIs(sem._loop, self.loop)

    def test_initial_value_zero(self):
        # A semaphore created with value 0 reports itself as locked.
        sem = asyncio.Semaphore(0, loop=self.loop)
        self.assertTrue(sem.locked())

    def test_repr(self):
        sem = asyncio.Semaphore(loop=self.loop)
        self.assertTrue(repr(sem).endswith('[unlocked, value:1]>'))
        self.assertTrue(RGX_REPR.match(repr(sem)))

        self.loop.run_until_complete(sem.acquire())
        self.assertTrue(repr(sem).endswith('[locked]>'))
        self.assertNotIn('waiters', repr(sem))
        self.assertTrue(RGX_REPR.match(repr(sem)))

        sem._waiters.append(mock.Mock())
        self.assertIn('waiters:1', repr(sem))
        self.assertTrue(RGX_REPR.match(repr(sem)))

        sem._waiters.append(mock.Mock())
        self.assertIn('waiters:2', repr(sem))
        self.assertTrue(RGX_REPR.match(repr(sem)))

    def test_semaphore(self):
        sem = asyncio.Semaphore(loop=self.loop)
        self.assertEqual(1, sem._value)

        @asyncio.coroutine
        def acquire_lock():
            # The deprecated ``yield from sem`` form must warn.
            with self.assertWarns(DeprecationWarning):
                return (yield from sem)

        res = self.loop.run_until_complete(acquire_lock())

        self.assertTrue(res)
        self.assertTrue(sem.locked())
        self.assertEqual(0, sem._value)

        sem.release()
        self.assertFalse(sem.locked())
        self.assertEqual(1, sem._value)

    def test_semaphore_value(self):
        # A negative initial value is rejected.
        self.assertRaises(ValueError, asyncio.Semaphore, -1)

    def test_acquire(self):
        sem = asyncio.Semaphore(3, loop=self.loop)
        result = []

        self.assertTrue(self.loop.run_until_complete(sem.acquire()))
        self.assertTrue(self.loop.run_until_complete(sem.acquire()))
        self.assertFalse(sem.locked())

        async def worker(tag):
            await sem.acquire()
            result.append(tag)
            return True

        t1 = asyncio.Task(worker(1), loop=self.loop)
        t2 = asyncio.Task(worker(2), loop=self.loop)
        t3 = asyncio.Task(worker(3), loop=self.loop)

        # Only one slot was left, so just the first worker gets through.
        test_utils.run_briefly(self.loop)
        self.assertEqual([1], result)
        self.assertTrue(sem.locked())
        self.assertEqual(2, len(sem._waiters))
        self.assertEqual(0, sem._value)

        t4 = asyncio.Task(worker(4), loop=self.loop)

        sem.release()
        sem.release()
        self.assertEqual(2, sem._value)

        test_utils.run_briefly(self.loop)
        self.assertEqual(0, sem._value)
        self.assertEqual(3, len(result))
        self.assertTrue(sem.locked())
        self.assertEqual(1, len(sem._waiters))
        self.assertEqual(0, sem._value)

        self.assertTrue(t1.done())
        self.assertTrue(t1.result())
        # Two slots were freed, so exactly two of the three racing workers
        # must have finished; which two is not asserted.
        race_tasks = [t2, t3, t4]
        done_tasks = [t for t in race_tasks if t.done() and t.result()]
        self.assertEqual(2, len(done_tasks))

        # cleanup locked semaphore
        sem.release()
        self.loop.run_until_complete(asyncio.gather(*race_tasks))

    def test_acquire_cancel(self):
        sem = asyncio.Semaphore(loop=self.loop)
        self.loop.run_until_complete(sem.acquire())

        acquire = asyncio.Task(sem.acquire(), loop=self.loop)
        self.loop.call_soon(acquire.cancel)
        self.assertRaises(
            asyncio.CancelledError,
            self.loop.run_until_complete, acquire)
        # Any waiter still queued must at least be finished.
        self.assertTrue((not sem._waiters) or
                        all(waiter.done() for waiter in sem._waiters))

    def test_acquire_cancel_before_awoken(self):
        sem = asyncio.Semaphore(value=0, loop=self.loop)

        t1 = asyncio.Task(sem.acquire(), loop=self.loop)
        t2 = asyncio.Task(sem.acquire(), loop=self.loop)
        t3 = asyncio.Task(sem.acquire(), loop=self.loop)
        t4 = asyncio.Task(sem.acquire(), loop=self.loop)

        test_utils.run_briefly(self.loop)

        # One slot is released while the first two waiters are cancelled;
        # exactly one of the remaining two must pick it up.
        sem.release()
        t1.cancel()
        t2.cancel()

        test_utils.run_briefly(self.loop)
        num_done = sum(t.done() for t in [t3, t4])
        self.assertEqual(num_done, 1)

        t3.cancel()
        t4.cancel()
        test_utils.run_briefly(self.loop)

    def test_acquire_hang(self):
        sem = asyncio.Semaphore(value=0, loop=self.loop)

        t1 = asyncio.Task(sem.acquire(), loop=self.loop)
        t2 = asyncio.Task(sem.acquire(), loop=self.loop)

        test_utils.run_briefly(self.loop)

        sem.release()
        t1.cancel()

        # The released slot must end up taken, leaving the semaphore locked.
        test_utils.run_briefly(self.loop)
        self.assertTrue(sem.locked())

    def test_release_not_acquired(self):
        # Releasing a BoundedSemaphore past its initial value is an error.
        sem = asyncio.BoundedSemaphore(loop=self.loop)

        self.assertRaises(ValueError, sem.release)

    def test_release_no_waiters(self):
        sem = asyncio.Semaphore(loop=self.loop)
        self.loop.run_until_complete(sem.acquire())
        self.assertTrue(sem.locked())

        sem.release()
        self.assertFalse(sem.locked())

    def test_context_manager(self):
        sem = asyncio.Semaphore(2, loop=self.loop)

        @asyncio.coroutine
        def acquire_lock():
            # The deprecated ``yield from sem`` form must warn.
            with self.assertWarns(DeprecationWarning):
                return (yield from sem)

        with self.loop.run_until_complete(acquire_lock()):
            self.assertFalse(sem.locked())
            self.assertEqual(1, sem._value)

            with self.loop.run_until_complete(acquire_lock()):
                self.assertTrue(sem.locked())

        self.assertEqual(2, sem._value)

    def test_context_manager_no_yield(self):
        # A plain ``with sem:`` is rejected and leaves the value untouched.
        sem = asyncio.Semaphore(2, loop=self.loop)

        try:
            with sem:
                self.fail('RuntimeError is not raised in with expression')
        except RuntimeError as err:
            self.assertEqual(
                str(err),
                '"yield from" should be used as context manager expression')

        self.assertEqual(2, sem._value)
1038
1039
# Run the whole suite when this module is executed as a script.
if __name__ == "__main__":
    unittest.main()
1042