/*
 * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
 */

package kotlinx.coroutines

import kotlinx.coroutines.channels.*
import kotlinx.coroutines.selects.*
import org.junit.Test
import kotlin.coroutines.*
import kotlin.test.*

/**
 * Tests for `suspendCancellableCoroutineReusable`, compared against the regular
 * `suspendCancellableCoroutine`.
 *
 * Most checks use [FieldWalker] to count which continuation instances are reachable
 * from a given [Job], so the exact statement order and the set of local variables in
 * each test are significant: do not reorder or introduce extra captured references
 * without re-validating the expected counts.
 */
class ReusableCancellableContinuationTest : TestBase() {
    /** Ten suspensions through the reusable suspender are expected to yield a single counted instance. */
    @Test
    fun testReusable() = runTest {
        testContinuationsCount(10, 1, ::suspendCancellableCoroutineReusable)
    }

    /** Ten suspensions through the regular suspender are expected to yield ten counted instances. */
    @Test
    fun testRegular() = runTest {
        testContinuationsCount(10, 10, ::suspendCancellableCoroutine)
    }

    /**
     * Suspends [iterations] times via [suspender] and asserts how many distinct
     * [CancellableContinuation] instances were observed in the object graph of the current job.
     *
     * On every suspension the continuation is offered to a channel of capacity 1; a launched
     * consumer walks the graph reachable from the job, records every [CancellableContinuation]
     * it finds, and only then resumes the continuation.
     *
     * @param iterations number of times [suspender] is invoked
     * @param expectedInstances expected value of `result.size - 1` after all iterations
     * @param suspender the suspension primitive under test
     */
    private suspend inline fun CoroutineScope.testContinuationsCount(
        iterations: Int,
        expectedInstances: Int,
        suspender: suspend ((CancellableContinuation<Unit>) -> Unit) -> Unit
    ) {
        val result = mutableSetOf<CancellableContinuation<*>>()
        val job = coroutineContext[Job]!!
        val channel = Channel<Continuation<Unit>>(1)
        launch {
            channel.consumeEach {
                // Snapshot the reachable continuations before resuming the suspended caller
                val f = FieldWalker.walk(job)
                result.addAll(f.filterIsInstance<CancellableContinuation<*>>())
                it.resumeWith(Result.success(Unit))
            }
        }

        repeat(iterations) {
            suspender {
                assertTrue(channel.offer(it))
            }
        }
        channel.close()
        // NOTE(review): the "- 1" presumably discounts one continuation that is always
        // reachable regardless of the suspender used -- confirm which one it is.
        assertEquals(expectedInstances, result.size - 1)
    }

    /** Cancelling the continuation inside the block makes the suspension throw [CancellationException]. */
    @Test
    fun testCancelledOnClaimedCancel() = runTest {
        expect(1)
        try {
            suspendCancellableCoroutineReusable<Unit> {
                it.cancel()
            }
            expectUnreached()
        } catch (e: CancellationException) {
            finish(2)
        }
    }

    /**
     * After a first suspension that leaves the continuation reachable from the job, a second
     * suspension whose block cancels the job and then resumes is expected to throw
     * [CancellationException], with the coroutine no longer active.
     */
    @Test
    fun testNotCancelledOnClaimedResume() = runTest({ it is CancellationException }) {
        expect(1)
        // Bind child at first
        var continuation: Continuation<*>? = null
        suspendCancellableCoroutineReusable<Unit> {
            expect(2)
            continuation = it
            launch { // Attach to the parent, avoid fast path
                expect(3)
                it.resume(Unit)
            }
        }
        expect(4)
        ensureActive()
        // Verify child was bound
        FieldWalker.assertReachableCount(1, coroutineContext[Job]) { it === continuation }
        try {
            suspendCancellableCoroutineReusable<Unit> {
                expect(5)
                coroutineContext[Job]!!.cancel()
                it.resume(Unit) // will not dispatch, will get CancellationException
            }
        } catch (e: CancellationException) {
            assertFalse(isActive)
            finish(6)
        }
    }

    /** After a reusable suspension is resumed, its continuation is still reachable from the job. */
    @Test
    fun testResumeReusablePreservesReference() = runTest {
        expect(1)
        var cont: Continuation<Unit>? = null
        launch {
            cont!!.resumeWith(Result.success(Unit))
        }
        suspendCancellableCoroutineReusable<Unit> {
            cont = it
        }
        ensureActive()
        assertTrue { FieldWalker.walk(coroutineContext[Job]).contains(cont!!) }
        finish(2)
    }

    /** After a regular suspension is resumed, its continuation is no longer reachable from the job. */
    @Test
    fun testResumeRegularDoesntPreservesReference() = runTest {
        expect(1)
        var cont: Continuation<Unit>? = null
        launch { // Attach to the parent, avoid fast path
            cont!!.resumeWith(Result.success(Unit))
        }
        suspendCancellableCoroutine<Unit> {
            cont = it
        }
        ensureActive()
        FieldWalker.assertReachableCount(0, coroutineContext[Job]) { it === cont }
        finish(2)
    }

    /** A reusable continuation cancelled inside its block is not reachable from the job afterwards. */
    @Test
    fun testDetachedOnCancel() = runTest {
        expect(1)
        var cont: Continuation<*>? = null
        try {
            suspendCancellableCoroutineReusable<Unit> {
                cont = it
                it.cancel()
            }
            expectUnreached()
        } catch (e: CancellationException) {
            FieldWalker.assertReachableCount(0, coroutineContext[Job]) { it === cont }
            finish(2)
        }
    }

    /**
     * Cancelling the job after a reusable suspension leaves no [CancellableContinuation]
     * reachable from it, and every later reusable suspension throws [CancellationException]
     * (whether or not its block resumes) without making one reachable again.
     */
    @Test
    fun testPropagatedCancel() = runTest({ it is CancellationException }) {
        val currentJob = coroutineContext[Job]!!
        expect(1)
        // Bind child at first
        suspendCancellableCoroutineReusable<Unit> {
            expect(2)
            // Attach to the parent, avoid fast path
            launch {
                expect(3)
                it.resume(Unit)
            }
        }
        expect(4)
        ensureActive()
        // Verify child was bound
        FieldWalker.assertReachableCount(1, currentJob) { it is CancellableContinuation<*> }
        currentJob.cancel()
        assertFalse(isActive)
        // Child detached
        FieldWalker.assertReachableCount(0, currentJob) { it is CancellableContinuation<*> }
        expect(5)
        try {
            // Resume is non-atomic, so it throws cancellation exception
            suspendCancellableCoroutineReusable<Unit> {
                expect(6) // but the code inside the block is executed
                it.resume(Unit)
            }
        } catch (e: CancellationException) {
            FieldWalker.assertReachableCount(0, currentJob) { it is CancellableContinuation<*> }
            expect(7)
        }
        try {
            // No resume -- still cancellation exception
            suspendCancellableCoroutineReusable<Unit> {}
        } catch (e: CancellationException) {
            FieldWalker.assertReachableCount(0, currentJob) { it is CancellableContinuation<*> }
            finish(8)
        }
    }

    /**
     * Exchanges 100 elements over a rendezvous channel (sender uses `select`/`onSend`) and checks
     * the number of [CancellableContinuation] instances reachable from the receiver job, both
     * while it is still running and after it has been joined.
     */
    @Test
    fun testChannelMemoryLeak() = runTest {
        val iterations = 100
        val channel = Channel<Unit>()
        launch {
            repeat(iterations) {
                select {
                    channel.onSend(Unit) {}
                }
            }
        }

        val receiver = launch {
            repeat(iterations) {
                channel.receive()
            }
            expect(2)
            val job = coroutineContext[Job]!!
            // 1 for reusable CC, another one for outer joiner
            FieldWalker.assertReachableCount(2, job) { it is CancellableContinuation<*> }
        }
        expect(1)
        receiver.join()
        // Reference should be claimed at this point
        FieldWalker.assertReachableCount(0, receiver) { it is CancellableContinuation<*> }
        finish(3)
    }

    /**
     * Consumes a ten-element `produce` channel with a `delay(1)` per element, then checks that
     * exactly one `ChildContinuation` is reachable from the job.
     */
    @Test
    fun testReusableAndRegularSuspendCancellableCoroutineMemoryLeak() = runTest {
        val channel = produce {
            repeat(10) {
                send(Unit)
            }
        }
        for (value in channel) {
            delay(1)
        }
        FieldWalker.assertReachableCount(1, coroutineContext[Job]) { it is ChildContinuation }
    }
}