1 /*
2  * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3  */
4 
5 package kotlinx.coroutines
6 
7 import kotlinx.atomicfu.*
8 import java.util.*
9 import java.util.concurrent.*
10 import kotlin.concurrent.*
11 import kotlin.test.*
12 
/**
 * Stress test that races installation and disposal of completion handlers against
 * cancellation of the job they are installed on.
 *
 * One "creator" thread and [nThreads] "handler" threads proceed in lock-step rounds that are
 * delimited by [cyclicBarrier]. In every round the creator publishes a fresh [Job] and later
 * cancels it, while each handler thread installs a completion handler on that job and then
 * disposes it. After the round, each handler thread records whether its handler ended up
 * being removed or fired.
 */
class JobHandlersUpgradeStressTest : TestBase() {
    private val nSeconds = 3 * stressTestMultiplier
    private val nThreads = 4

    // Parties: the creator thread plus every handler thread.
    private val cyclicBarrier = CyclicBarrier(1 + nThreads)
    private val threads = mutableListOf<Thread>()

    // Statistics: completed rounds, handlers that never fired, handlers that fired.
    private val inters = atomic(0)
    private val removed = atomic(0)
    private val fired = atomic(0)

    // Target of the busy loops in [burnTime].
    private val sink = atomic(0)

    // Set by the test thread once the time budget is used up.
    @Volatile
    private var done = false

    // Job of the current round; null tells the handler threads to stop.
    @Volatile
    private var job: Job? = null

    /**
     * Per-round state of a single handler:
     * 0 = not fired, 1 = fired, 2 = round is over (firing is no longer allowed).
     */
    class State {
        val state = atomic(0)
    }

    /**
     * Starts all threads, lets them run for [nSeconds] seconds while printing progress once
     * per second, then requests termination, joins the threads and prints the statistics.
     */
    @Test
    fun testStress() {
        println("--- JobHandlersUpgradeStressTest")
        threads += thread(name = "creator", start = false) { runCreator() }
        for (threadId in 0 until nThreads) {
            threads += thread(name = "handler-$threadId", start = false) { runHandler() }
        }
        for (worker in threads) worker.start()
        for (second in 1..nSeconds) {
            Thread.sleep(1000)
            println("$second: ${inters.value} iterations")
        }
        done = true
        for (worker in threads) worker.join()
        println("        Completed ${inters.value} iterations")
        println("  Removed handler ${removed.value} times")
        println("    Fired handler ${fired.value} times")
    }

    /**
     * Body of the creator thread: publishes a new job for each round (or `null` once [done]
     * is set), cancels it after a random delay and counts the completed round.
     */
    private fun runCreator() {
        val random = Random()
        while (true) {
            // publish the job for this round before releasing the handler threads
            job = if (done) null else Job()
            cyclicBarrier.await()
            // null was published -> this was the final rendezvous
            val current = job ?: return
            burnTime(random.nextInt(3000))
            current.cancel()
            // wait until every handler thread has disposed its handle
            cyclicBarrier.await()
            inters.incrementAndGet()
        }
    }

    /**
     * Body of a handler thread: in each round installs a completion handler with randomly
     * chosen `onCancelling` / `invokeImmediately` flags, disposes it after a random delay and,
     * once the round is over, records whether the handler was removed or fired.
     */
    private fun runHandler() {
        val random = Random()
        while (true) {
            // pick the flags before the barrier, in the same order on every round
            val onCancelling = random.nextBoolean()
            val invokeImmediately = random.nextBoolean()
            cyclicBarrier.await()
            // null was published -> stop
            val current = job ?: return
            val state = State()
            burnTime(random.nextInt(1000))
            val handle = current.invokeOnCompletion(
                onCancelling = onCancelling,
                invokeImmediately = invokeImmediately
            ) {
                // the only legal transition for the handler is 0 -> 1
                if (!state.state.compareAndSet(0, 1))
                    error("Fired more than once or too late: state=${state.state.value}")
            }
            burnTime(random.nextInt(1000))
            handle.dispose()
            // wait until the creator has cancelled the job and all handlers are disposed
            cyclicBarrier.await()
            val resultingState = state.state.value
            when (resultingState) {
                0 -> removed.incrementAndGet()
                1 -> fired.incrementAndGet()
                else -> error("Cannot happen")
            }
            // close the state: a handler firing from now on fails its own 0 -> 1 transition
            if (!state.state.compareAndSet(resultingState, 2))
                error("Cannot fire late: resultingState=$resultingState")
        }
    }

    /** Spins for [spins] iterations, incrementing [sink] on each one. */
    private fun burnTime(spins: Int) {
        repeat(spins) { sink.incrementAndGet() }
    }
}