/* * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ package kotlinx.coroutines import kotlinx.atomicfu.* import org.junit.* import java.util.concurrent.* import kotlin.test.* import kotlin.test.Test class CancellableContinuationResumeCloseStressTest : TestBase() { @get:Rule public val dispatcher = ExecutorRule(2) private val startBarrier = CyclicBarrier(3) private val doneBarrier = CyclicBarrier(2) private val nRepeats = 1_000 * stressTestMultiplier private val closed = atomic(false) private var returnedOk = false @Test @Suppress("BlockingMethodInNonBlockingContext") fun testStress() = runTest { repeat(nRepeats) { closed.value = false returnedOk = false val job = testJob() startBarrier.await() job.cancel() // (1) cancel job job.join() // check consistency doneBarrier.await() if (returnedOk) { assertFalse(closed.value, "should not have closed resource -- returned Ok") } else { assertTrue(closed.value, "should have closed resource -- was cancelled") } } } private fun CoroutineScope.testJob(): Job = launch(dispatcher, start = CoroutineStart.ATOMIC) { val ok = resumeClose() // might be cancelled assertEquals("OK", ok) returnedOk = true } private suspend fun resumeClose() = suspendCancellableCoroutine { cont -> dispatcher.executor.execute { startBarrier.await() // (2) resume at the same time cont.resume("OK") { close() } doneBarrier.await() } startBarrier.await() // (3) return at the same time } fun close() { assertFalse(closed.getAndSet(true)) } }