/*
 * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
 */
4 
5 package kotlinx.coroutines.reactor
6 
7 import kotlinx.coroutines.*
8 import kotlinx.coroutines.reactive.*
9 import org.junit.*
10 import java.util.*
11 import kotlin.coroutines.*
12 
/**
 * Stress test that repeatedly builds a short flux of consecutive integers,
 * collects it, and verifies that every element arrives in order and that
 * collection finishes after exactly the expected number of elements.
 */
class FluxCompletionStressTest : TestBase() {
    /** Number of build-and-collect rounds performed by [testCompletion]. */
    private val rounds = 10_000 * stressTestMultiplier

    /**
     * Builds a flux in the given [context] that sends the integers
     * `start`, `start + 1`, ... up to (but excluding) `start + count`.
     */
    private fun CoroutineScope.range(context: CoroutineContext, start: Int, count: Int) = flux(context) {
        (start until start + count).forEach { send(it) }
    }

    /**
     * Collects a flux of [count] integers starting at 1 and fails with
     * [IllegalStateException] if an element is out of sequence or if the
     * number of collected elements differs from [count].
     */
    private suspend fun CoroutineScope.verifyCompletion(count: Int) {
        var seen = 0
        range(Dispatchers.Default, 1, count).collect { value ->
            seen += 1
            // Elements start at 1, so the n-th element must be equal to n.
            check(value == seen) { "$value != $seen" }
        }
        check(seen == count) { "$seen != $count" }
    }

    @Test
    fun testCompletion() {
        val random = Random()
        repeat(rounds) {
            // Random length in 0..4, so the empty flux is exercised too.
            val expected = random.nextInt(5)
            runBlocking {
                // Each round must finish within 5 seconds, otherwise withTimeout fails it.
                withTimeout(5000) {
                    verifyCompletion(expected)
                }
            }
        }
    }
}