1 /* 2 * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. 3 */ 4 5 package kotlinx.coroutines 6 7 import kotlinx.atomicfu.* 8 import java.util.* 9 import java.util.concurrent.* 10 import kotlin.concurrent.* 11 import kotlin.test.* 12 13 class JobHandlersUpgradeStressTest : TestBase() { 14 private val nSeconds = 3 * stressTestMultiplier 15 private val nThreads = 4 16 17 private val cyclicBarrier = CyclicBarrier(1 + nThreads) 18 private val threads = mutableListOf<Thread>() 19 20 private val inters = atomic(0) 21 private val removed = atomic(0) 22 private val fired = atomic(0) 23 24 private val sink = atomic(0) 25 26 @Volatile 27 private var done = false 28 29 @Volatile 30 private var job: Job? = null 31 32 class State { 33 val state = atomic(0) 34 } 35 36 @Test 37 fun testStress() { 38 println("--- JobHandlersUpgradeStressTest") 39 threads += thread(name = "creator", start = false) { 40 val rnd = Random() 41 while (true) { 42 job = if (done) null else Job() 43 cyclicBarrier.await() 44 val job = job ?: break 45 // burn some time 46 repeat(rnd.nextInt(3000)) { sink.incrementAndGet() } 47 // cancel job 48 job.cancel() 49 cyclicBarrier.await() 50 inters.incrementAndGet() 51 } 52 } 53 threads += List(nThreads) { threadId -> 54 thread(name = "handler-$threadId", start = false) { 55 val rnd = Random() 56 while (true) { 57 val onCancelling = rnd.nextBoolean() 58 val invokeImmediately: Boolean = rnd.nextBoolean() 59 cyclicBarrier.await() 60 val job = job ?: break 61 val state = State() 62 // burn some time 63 repeat(rnd.nextInt(1000)) { sink.incrementAndGet() } 64 val handle = 65 job.invokeOnCompletion(onCancelling = onCancelling, invokeImmediately = invokeImmediately) { 66 if (!state.state.compareAndSet(0, 1)) 67 error("Fired more than once or too late: state=${state.state.value}") 68 } 69 // burn some time 70 repeat(rnd.nextInt(1000)) { sink.incrementAndGet() } 71 // dispose 72 handle.dispose() 73 cyclicBarrier.await() 74 val resultingState = state.state.value 75 when (resultingState) { 76 0 -> removed.incrementAndGet() 77 1 -> fired.incrementAndGet() 78 else -> error("Cannot happen") 79 } 80 if (!state.state.compareAndSet(resultingState, 2)) 81 error("Cannot fire late: resultingState=$resultingState") 82 } 83 } 84 } 85 threads.forEach { it.start() } 86 repeat(nSeconds) { second -> 87 Thread.sleep(1000) 88 println("${second + 1}: ${inters.value} iterations") 89 } 90 done = true 91 threads.forEach { it.join() } 92 println(" Completed ${inters.value} iterations") 93 println(" Removed handler ${removed.value} times") 94 println(" Fired handler ${fired.value} times") 95 96 } 97 }