1 /*
<lambda>null2  * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3  */
4 
5 package kotlinx.coroutines.rx2
6 
7 import kotlinx.coroutines.*
8 import org.junit.*
9 import java.util.*
10 import kotlin.coroutines.*
11 
12 class ObservableCompletionStressTest : TestBase() {
13     private val N_REPEATS = 10_000 * stressTestMultiplier
14 
15     private fun CoroutineScope.range(context: CoroutineContext, start: Int, count: Int) = rxObservable(context) {
16         for (x in start until start + count) send(x)
17     }
18 
19     @Test
20     fun testCompletion() {
21         val rnd = Random()
22         repeat(N_REPEATS) {
23             val count = rnd.nextInt(5)
24             runBlocking {
25                 withTimeout(5000) {
26                     var received = 0
27                     range(Dispatchers.Default, 1, count).collect { x ->
28                         received++
29                         if (x != received) error("$x != $received")
30                     }
31                     if (received != count) error("$received != $count")
32                 }
33             }
34         }
35     }
36 }