1 /*
2  * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3  */
4 
5 package kotlinx.coroutines
6 
7 import org.junit.*
8 import org.junit.Test
9 import java.util.concurrent.*
10 import kotlin.test.*
11 
/**
 * Stress tests that race `await()` on a failing `async` job against a concurrent cancellation of that job.
 *
 * Every round lines its participants up on a fresh [CyclicBarrier], so the failure, the await and the
 * cancellation are all released at the same moment.
 */
class JoinStressTest : TestBase() {

    // Number of rounds per test; `stressTestMultiplier` is declared outside this file.
    private val iterations = 50_000 * stressTestMultiplier

    // The second test parks three coroutines on a blocking barrier at once, so all three threads are needed.
    private val pool = newFixedThreadPoolContext(3, "JoinStressTest")

    /** Closes [pool] once a test has finished. */
    @After
    fun tearDown() {
        pool.close()
    }

    /**
     * Races `cancel()` against a job that throws [TestException] right after the barrier opens.
     *
     * The awaiter reports which exception `await()` surfaced; the test expects [TestException] in every
     * single round, i.e. the concurrent cancellation must never replace the job's own failure.
     */
    @Test
    fun testExceptionalJoinWithCancellation() = runBlocking {
        // Tally of awaiter outcomes, indexed by ORIGINAL_FAILURE / CANCELLATION.
        val outcomes = IntArray(2)

        for (round in 1..iterations) {
            // Three parties: the failing job, the awaiter and this (test) thread.
            val startGate = CyclicBarrier(3)

            val failing = async(pool + NonCancellable) {
                startGate.await()
                throw TestException()
            }

            val awaiter = async(pool) {
                startGate.await()
                try {
                    failing.await()
                } catch (e: TestException) {
                    ORIGINAL_FAILURE
                } catch (e: CancellationException) {
                    CANCELLATION
                }
            }

            startGate.await()
            failing.cancel()

            val outcome = awaiter.await()
            outcomes[outcome] += 1
        }

        // A concurrent cancel() of a job that throws TestException without suspending must not hide TestException.
        val tally = outcomes.toList().toString()
        assertEquals(iterations, outcomes[ORIGINAL_FAILURE], tally)
        assertEquals(0, outcomes[CANCELLATION], tally)
    }

    /**
     * Races `cancelInternal(TestException1())` against a job that throws [TestException] right after the
     * barrier opens.
     *
     * Whenever the awaiter observes [TestException1], the canceller must have reported `true`. Over the
     * whole run both exceptions have to be observed at least once, and the canceller has to report `true`
     * at least once.
     */
    @Test
    fun testExceptionalJoinWithMultipleCancellations() = runBlocking {
        // Tally of awaiter outcomes, indexed by ORIGINAL_FAILURE / CANCELLATION.
        val outcomes = IntArray(2)
        var acceptedCancellations = 0

        for (round in 1..iterations) {
            // Four parties: the failing job, the awaiter, the canceller and this (test) thread.
            val startGate = CyclicBarrier(4)

            val failing = async(pool + NonCancellable) {
                startGate.await()
                throw TestException()
            }

            val awaiter = async(pool) {
                startGate.await()
                try {
                    failing.await()
                } catch (e: TestException) {
                    ORIGINAL_FAILURE
                } catch (e: TestException1) {
                    CANCELLATION
                }
            }

            val concurrentCanceller = async(pool + NonCancellable) {
                startGate.await()
                // cast for test purposes only
                (failing as AbstractCoroutine<*>).cancelInternal(TestException1())
            }

            startGate.await()
            val awaiterOutcome = awaiter.await()
            val cancellationAccepted = concurrentCanceller.await()

            // TestException1 can only reach the awaiter if the canceller reported success.
            if (awaiterOutcome == CANCELLATION) {
                assertTrue(cancellationAccepted)
            }
            outcomes[awaiterOutcome] += 1

            if (cancellationAccepted) {
                acceptedCancellations += 1
            }
        }

        val tally = outcomes.toList().toString()
        assertTrue(outcomes[ORIGINAL_FAILURE] > 0, tally)
        assertTrue(outcomes[CANCELLATION] > 0, tally)
        require(acceptedCancellations > 0) { "Cancellation never succeeds, something wrong with stress test infra" }
    }

    private companion object {
        // Slot for rounds in which the awaiter saw the job's own TestException.
        const val ORIGINAL_FAILURE = 0

        // Slot for rounds in which the awaiter saw the exception introduced by the cancellation.
        const val CANCELLATION = 1
    }
}
102