1 /* 2 * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. 3 */ 4 5 package kotlinx.coroutines 6 7 import org.junit.* 8 import org.junit.Test 9 import java.util.concurrent.* 10 import kotlin.test.* 11 12 class JoinStressTest : TestBase() { 13 14 private val iterations = 50_000 * stressTestMultiplier 15 private val pool = newFixedThreadPoolContext(3, "JoinStressTest") 16 17 @After 18 fun tearDown() { 19 pool.close() 20 } 21 22 @Test 23 fun testExceptionalJoinWithCancellation() = runBlocking { 24 val results = IntArray(2) 25 26 repeat(iterations) { 27 val barrier = CyclicBarrier(3) 28 val exceptionalJob = async(pool + NonCancellable) { 29 barrier.await() 30 throw TestException() 31 } 32 33 34 val awaiterJob = async(pool) { 35 barrier.await() 36 try { 37 exceptionalJob.await() 38 } catch (e: TestException) { 39 0 40 } catch (e: CancellationException) { 41 1 42 } 43 } 44 45 barrier.await() 46 exceptionalJob.cancel() 47 ++results[awaiterJob.await()] 48 } 49 50 // Check that concurrent cancellation of job which throws TestException without suspends doesn't suppress TestException 51 assertEquals(iterations, results[0], results.toList().toString()) 52 assertEquals(0, results[1], results.toList().toString()) 53 } 54 55 @Test 56 fun testExceptionalJoinWithMultipleCancellations() = runBlocking { 57 val results = IntArray(2) 58 var successfulCancellations = 0 59 60 repeat(iterations) { 61 val barrier = CyclicBarrier(4) 62 val exceptionalJob = async(pool + NonCancellable) { 63 barrier.await() 64 throw TestException() 65 } 66 67 val awaiterJob = async(pool) { 68 barrier.await() 69 try { 70 exceptionalJob.await() 71 } catch (e: TestException) { 72 0 73 } catch (e: TestException1) { 74 1 75 } 76 } 77 78 val canceller = async(pool + NonCancellable) { 79 barrier.await() 80 // cast for test purposes only 81 (exceptionalJob as AbstractCoroutine<*>).cancelInternal(TestException1()) 82 } 83 84 barrier.await() 85 val awaiterResult = awaiterJob.await() 86 val cancellerResult = canceller.await() 87 if (awaiterResult == 1) { 88 assertTrue(cancellerResult) 89 } 90 ++results[awaiterResult] 91 92 if (cancellerResult) { 93 ++successfulCancellations 94 } 95 } 96 97 assertTrue(results[0] > 0, results.toList().toString()) 98 assertTrue(results[1] > 0, results.toList().toString()) 99 require(successfulCancellations > 0) { "Cancellation never succeeds, something wrong with stress test infra" } 100 } 101 } 102