1 /* <lambda>null2 * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. 3 */ 4 5 package kotlinx.coroutines 6 7 import kotlinx.atomicfu.* 8 import org.junit.* 9 import java.util.concurrent.* 10 import kotlin.test.* 11 import kotlin.test.Test 12 13 class CancellableContinuationResumeCloseStressTest : TestBase() { 14 @get:Rule 15 public val dispatcher = ExecutorRule(2) 16 17 private val startBarrier = CyclicBarrier(3) 18 private val doneBarrier = CyclicBarrier(2) 19 private val nRepeats = 1_000 * stressTestMultiplier 20 21 private val closed = atomic(false) 22 private var returnedOk = false 23 24 @Test 25 @Suppress("BlockingMethodInNonBlockingContext") 26 fun testStress() = runTest { 27 repeat(nRepeats) { 28 closed.value = false 29 returnedOk = false 30 val job = testJob() 31 startBarrier.await() 32 job.cancel() // (1) cancel job 33 job.join() 34 // check consistency 35 doneBarrier.await() 36 if (returnedOk) { 37 assertFalse(closed.value, "should not have closed resource -- returned Ok") 38 } else { 39 assertTrue(closed.value, "should have closed resource -- was cancelled") 40 } 41 } 42 } 43 44 private fun CoroutineScope.testJob(): Job = launch(dispatcher, start = CoroutineStart.ATOMIC) { 45 val ok = resumeClose() // might be cancelled 46 assertEquals("OK", ok) 47 returnedOk = true 48 } 49 50 private suspend fun resumeClose() = suspendCancellableCoroutine<String> { cont -> 51 dispatcher.executor.execute { 52 startBarrier.await() // (2) resume at the same time 53 cont.resume("OK") { 54 close() 55 } 56 doneBarrier.await() 57 } 58 startBarrier.await() // (3) return at the same time 59 } 60 61 fun close() { 62 assertFalse(closed.getAndSet(true)) 63 } 64 } 65