1 /*
2  * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3  */
4 
5 package kotlinx.coroutines
6 
7 import org.junit.*
8 import org.junit.Test
9 import java.util.concurrent.*
10 import kotlin.test.*
11 
/**
 * Stress tests that race `await()` on a job that always throws `TestException`
 * against concurrent cancellation of that same job.
 */
class JoinStressTest : TestBase() {

    // Number of rounds each test performs, scaled by stressTestMultiplier.
    private val iterations = 50_000 * stressTestMultiplier

    // Context created with 3 threads, used by every coroutine started in the tests; closed in tearDown().
    private val pool = newFixedThreadPoolContext(3, "JoinStressTest")

    /** Closes [pool] after each test. */
    @After
    fun tearDown() {
        pool.close()
    }

    /**
     * In every round a throwing job and an awaiter are released through a shared barrier,
     * after which the test coroutine cancels the throwing job. The awaiter is expected to
     * observe `TestException` in every single round, never `CancellationException`.
     */
    @Test
    fun testExceptionalJoinWithCancellation() = runBlocking {
        // Slot 0 counts rounds where the awaiter caught TestException,
        // slot 1 counts rounds where it caught CancellationException.
        val outcomes = IntArray(2)

        for (round in 1..iterations) {
            // Three parties: the throwing job, the awaiter and this test coroutine.
            val startGate = CyclicBarrier(3)

            val failing = async(pool + NonCancellable) {
                startGate.await()
                throw TestException()
            }

            val awaiter = async(pool) {
                startGate.await()
                try {
                    failing.await()
                } catch (e: TestException) {
                    0
                } catch (e: CancellationException) {
                    1
                }
            }

            startGate.await()
            failing.cancel()
            val outcome = awaiter.await()
            outcomes[outcome]++
        }

        // Cancelling a job that throws TestException without ever suspending
        // must not hide the TestException from the awaiter.
        val summary = outcomes.toList().toString()
        assertEquals(iterations, outcomes[0], summary)
        assertEquals(0, outcomes[1], summary)
    }

    /**
     * In every round a throwing job, an awaiter and a canceller (which calls `cancelInternal`
     * with `TestException1` on the throwing job) are released through a shared barrier.
     * Across all rounds the awaiter must have observed each of the two exceptions at least once.
     */
    @Test
    fun testExceptionalJoinWithMultipleCancellations() = runBlocking {
        // Slot 0 counts rounds where the awaiter caught TestException,
        // slot 1 counts rounds where it caught TestException1.
        val outcomes = IntArray(2)

        for (round in 1..iterations) {
            // Four parties: the throwing job, the awaiter, the canceller and this test coroutine.
            val startGate = CyclicBarrier(4)

            val failing = async<Unit>(pool + NonCancellable) {
                startGate.await()
                throw TestException()
            }

            val awaiter = async(pool) {
                startGate.await()
                try {
                    failing.await()
                    // Only produced if await() returns normally; the tally array has no slot for it.
                    2
                } catch (e: TestException) {
                    0
                } catch (e: TestException1) {
                    1
                }
            }

            val canceller = async(pool + NonCancellable) {
                startGate.await()
                // The cast exists purely so the test can reach cancelInternal.
                (failing as AbstractCoroutine<*>).cancelInternal(TestException1())
            }

            startGate.await()
            val outcome = awaiter.await()
            canceller.await()
            outcomes[outcome]++
        }

        val summary = outcomes.toList().toString()
        assertTrue(outcomes[0] > 0, summary)
        assertTrue(outcomes[1] > 0, summary)
    }
}
94