/*
 * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
 */

package kotlinx.coroutines

import org.junit.Test
import kotlin.concurrent.thread

/**
 * Tests concurrent cancel & dispose of the jobs.
 *
 * Three threads run until [done] is set: a "creator" that keeps publishing a fresh job together with
 * its completion handle, a "canceller" that cancels the most recently published job, and a "disposer"
 * that disposes the most recently published handle. Any exception that escapes one of these threads
 * is recorded in [exception] and rethrown from the test method so that the test fails.
 */
class JobDisposeStressTest: TestBase() {
    private val TEST_DURATION = 3 * stressTestMultiplier // seconds

    // Set by the test thread to ask all worker threads to leave their loops.
    @Volatile
    private var done = false

    // Latest job published by the creator thread; read by the canceller thread.
    @Volatile
    private var job: TestJob? = null

    // Latest handle published by the creator thread; read by the disposer thread.
    @Volatile
    private var handle: DisposableHandle? = null

    // Most recent exception that escaped a worker thread, or null if none did.
    @Volatile
    private var exception: Throwable? = null

    /**
     * Creates (but does not start) a thread with the given [name] that runs [block].
     *
     * The thread's uncaught exception handler stores the failure in [exception], prints it and
     * dumps its stack trace, so that the test thread can observe the failure.
     */
    private fun testThread(name: String, block: () -> Unit): Thread =
        thread(start = false, name = name, block = block).apply {
            setUncaughtExceptionHandler { t, e ->
                exception = e
                println("Exception in ${t.name}: $e")
                e.printStackTrace()
            }
        }

    /**
     * Runs the creator, canceller and disposer threads concurrently for [TEST_DURATION] seconds
     * (or until one of them fails), then stops them and rethrows the recorded failure, if any.
     */
    @Test
    fun testConcurrentDispose() {
        // create threads
        val threads = mutableListOf<Thread>()
        threads += testThread("creator") {
            while (!done) {
                val job = TestJob()
                val handle = job.invokeOnCompletion(onCancelling = true) { /* nothing */ }
                this.job = job // post job to cancelling thread
                this.handle = handle // post handle to concurrent disposer thread
                handle.dispose() // dispose of handle from this thread (concurrently with other disposer)
            }
        }

        threads += testThread("canceller") {
            while (!done) {
                val job = this.job ?: continue
                job.cancel()
                // Always returns true, TestJob never completes
            }
        }

        threads += testThread("disposer") {
            while (!done) {
                handle?.dispose()
            }
        }

        // start threads
        threads.forEach { it.start() }
        // wait
        for (i in 1..TEST_DURATION) {
            println("$i: Running")
            Thread.sleep(1000)
            if (exception != null) break // a worker has already failed, no point in waiting longer
        }
        // done
        done = true
        // join threads
        threads.forEach { it.join() }
        // rethrow exception if any, otherwise a failure in a worker thread would go unnoticed
        exception?.let { throw it }
    }

    @Suppress("DEPRECATION_ERROR")
    private class TestJob : JobSupport(active = true)
}