1 /*
2  * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3  */
4 
5 package kotlinx.coroutines
6 
7 import kotlinx.coroutines.channels.*
8 import kotlinx.coroutines.selects.*
9 import org.junit.Test
10 import kotlin.coroutines.*
11 import kotlin.test.*
12 
13 class ReusableCancellableContinuationTest : TestBase() {
14     @Test
<lambda>null15     fun testReusable() = runTest {
16         testContinuationsCount(10, 1, ::suspendCancellableCoroutineReusable)
17     }
18 
19     @Test
<lambda>null20     fun testRegular() = runTest {
21         testContinuationsCount(10, 10, ::suspendCancellableCoroutine)
22     }
23 
testContinuationsCountnull24     private suspend inline fun CoroutineScope.testContinuationsCount(
25         iterations: Int,
26         expectedInstances: Int,
27         suspender: suspend ((CancellableContinuation<Unit>) -> Unit) -> Unit
28     ) {
29         val result = mutableSetOf<CancellableContinuation<*>>()
30         val job = coroutineContext[Job]!!
31         val channel = Channel<Continuation<Unit>>(1)
32         launch {
33             channel.consumeEach {
34                 val f = FieldWalker.walk(job)
35                 result.addAll(f.filterIsInstance<CancellableContinuation<*>>())
36                 it.resumeWith(Result.success(Unit))
37             }
38         }
39 
40         repeat(iterations) {
41             suspender {
42                 assertTrue(channel.offer(it))
43             }
44         }
45         channel.close()
46         assertEquals(expectedInstances, result.size - 1)
47     }
48 
49     @Test
<lambda>null50     fun testCancelledOnClaimedCancel() = runTest {
51         expect(1)
52         try {
53             suspendCancellableCoroutineReusable<Unit> {
54                 it.cancel()
55             }
56             expectUnreached()
57         } catch (e: CancellationException) {
58             finish(2)
59         }
60     }
61 
62     @Test
<lambda>null63     fun testNotCancelledOnClaimedResume() = runTest({ it is CancellationException }) {
64         expect(1)
65         // Bind child at first
66         var continuation: Continuation<*>? = null
<lambda>null67         suspendCancellableCoroutineReusable<Unit> {
68             expect(2)
69             continuation = it
70             launch {  // Attach to the parent, avoid fast path
71                 expect(3)
72                 it.resume(Unit)
73             }
74         }
75         expect(4)
76         ensureActive()
77         // Verify child was bound
<lambda>null78         FieldWalker.assertReachableCount(1, coroutineContext[Job]) { it === continuation }
79         try {
<lambda>null80             suspendCancellableCoroutineReusable<Unit> {
81                 expect(5)
82                 coroutineContext[Job]!!.cancel()
83                 it.resume(Unit) // will not dispatch, will get CancellationException
84             }
85         } catch (e: CancellationException) {
86             assertFalse(isActive)
87             finish(6)
88         }
89     }
90 
91     @Test
<lambda>null92     fun testResumeReusablePreservesReference() = runTest {
93         expect(1)
94         var cont: Continuation<Unit>? = null
95         launch {
96             cont!!.resumeWith(Result.success(Unit))
97         }
98         suspendCancellableCoroutineReusable<Unit> {
99             cont = it
100         }
101         ensureActive()
102         assertTrue { FieldWalker.walk(coroutineContext[Job]).contains(cont!!) }
103         finish(2)
104     }
105 
106     @Test
<lambda>null107     fun testResumeRegularDoesntPreservesReference() = runTest {
108         expect(1)
109         var cont: Continuation<Unit>? = null
110         launch { // Attach to the parent, avoid fast path
111             cont!!.resumeWith(Result.success(Unit))
112         }
113         suspendCancellableCoroutine<Unit> {
114             cont = it
115         }
116         ensureActive()
117         FieldWalker.assertReachableCount(0, coroutineContext[Job]) { it === cont }
118         finish(2)
119     }
120 
121     @Test
testDetachedOnCancelnull122     fun testDetachedOnCancel() = runTest {
123         expect(1)
124         var cont: Continuation<*>? = null
125         try {
126             suspendCancellableCoroutineReusable<Unit> {
127                 cont = it
128                 it.cancel()
129             }
130             expectUnreached()
131         } catch (e: CancellationException) {
132             FieldWalker.assertReachableCount(0, coroutineContext[Job]) { it === cont }
133             finish(2)
134         }
135     }
136 
137     @Test
<lambda>null138     fun testPropagatedCancel() = runTest({it is CancellationException}) {
139         val currentJob = coroutineContext[Job]!!
140         expect(1)
141         // Bind child at first
<lambda>null142         suspendCancellableCoroutineReusable<Unit> {
143             expect(2)
144             // Attach to the parent, avoid fast path
145             launch {
146                 expect(3)
147                 it.resume(Unit)
148             }
149         }
150         expect(4)
151         ensureActive()
152         // Verify child was bound
<lambda>null153         FieldWalker.assertReachableCount(1, currentJob) { it is CancellableContinuation<*> }
154         currentJob.cancel()
155         assertFalse(isActive)
156         // Child detached
<lambda>null157         FieldWalker.assertReachableCount(0, currentJob) { it is CancellableContinuation<*> }
158         expect(5)
159         try {
160             // Resume is non-atomic, so it throws cancellation exception
<lambda>null161             suspendCancellableCoroutineReusable<Unit> {
162                 expect(6) // but the code inside the block is executed
163                 it.resume(Unit)
164             }
165         } catch (e: CancellationException) {
<lambda>null166             FieldWalker.assertReachableCount(0, currentJob) { it is CancellableContinuation<*> }
167             expect(7)
168         }
169         try {
170             // No resume -- still cancellation exception
<lambda>null171             suspendCancellableCoroutineReusable<Unit> {}
172         } catch (e: CancellationException) {
<lambda>null173             FieldWalker.assertReachableCount(0, currentJob) { it is CancellableContinuation<*> }
174             finish(8)
175         }
176     }
177 
178     @Test
<lambda>null179     fun testChannelMemoryLeak() = runTest {
180         val iterations = 100
181         val channel = Channel<Unit>()
182         launch {
183             repeat(iterations) {
184                 select {
185                     channel.onSend(Unit) {}
186                 }
187             }
188         }
189 
190         val receiver = launch {
191             repeat(iterations) {
192                 channel.receive()
193             }
194             expect(2)
195             val job = coroutineContext[Job]!!
196             // 1 for reusable CC, another one for outer joiner
197             FieldWalker.assertReachableCount(2, job) { it is CancellableContinuation<*> }
198         }
199         expect(1)
200         receiver.join()
201         // Reference should be claimed at this point
202         FieldWalker.assertReachableCount(0, receiver) { it is CancellableContinuation<*> }
203         finish(3)
204     }
205 
206     @Test
<lambda>null207     fun testReusableAndRegularSuspendCancellableCoroutineMemoryLeak() = runTest {
208         val channel =  produce {
209             repeat(10) {
210                 send(Unit)
211             }
212         }
213         for (value in channel) {
214             delay(1)
215         }
216         FieldWalker.assertReachableCount(1, coroutineContext[Job]) { it is ChildContinuation }
217     }
218 }
219