/*
 * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
 */

package kotlinx.coroutines.rx2

import kotlinx.coroutines.*
import org.junit.*
import java.util.*
import kotlin.coroutines.*

/**
 * Stress test that repeatedly collects a short observable built with [rxObservable]
 * and verifies that every element arrives in order and that none is missing once
 * collection has returned.
 */
class ObservableCompletionStressTest : TestBase() {
    /** Number of test iterations, scaled by `stressTestMultiplier`. */
    private val N_REPEATS = 10_000 * stressTestMultiplier

    /**
     * Builds an observable in the given [context] that sends the consecutive integers
     * `start`, `start + 1`, ... up to, but not including, `start + count`.
     */
    private fun CoroutineScope.range(context: CoroutineContext, start: Int, count: Int) =
        rxObservable(context) {
            (start until start + count).forEach { send(it) }
        }

    /**
     * Runs [N_REPEATS] iterations. Each iteration picks a random element count from
     * `nextInt(5)`, collects a range starting at 1 that is produced on
     * [Dispatchers.Default], and fails if an element arrives out of sequence or if the
     * number of collected elements differs from the requested count. Every iteration
     * is wrapped in `withTimeout(5000)`.
     */
    @Test
    fun testCompletion() {
        val random = Random()
        repeat(N_REPEATS) {
            val expected = random.nextInt(5)
            runBlocking {
                withTimeout(5000) {
                    var seen = 0
                    range(Dispatchers.Default, 1, expected).collect { value ->
                        seen += 1
                        // The range starts at 1, so the n-th element must equal n.
                        check(value == seen) { "$value != $seen" }
                    }
                    // Collection has returned: all requested elements must have been seen.
                    check(seen == expected) { "$seen != $expected" }
                }
            }
        }
    }
}