1 /*
2  * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3  */
4 
5 package kotlinx.coroutines.rx3
6 
7 import kotlinx.coroutines.*
8 import kotlinx.coroutines.selects.*
9 import org.junit.Test
10 import kotlin.test.*
11 
/**
 * Exercises [select] over two channel subscriptions that were opened on the same
 * [rxObservable] source.
 */
class ObservableSubscriptionSelectTest : TestBase() {
    /**
     * Opens two subscriptions on a source that sends the integers `0 until n` and drains them
     * in tandem: every loop iteration takes one element from whichever subscription `select`
     * picks and then one element from the other subscription.
     *
     * Each subscription is expected to deliver `0, 1, 2, …` in order (tracked by the counters
     * `a` and `b`), and by the time `select` observes a closed channel at least one of the
     * subscriptions must have delivered all `n` elements.
     */
    @Test
    fun testSelect() = runTest {
        // source with n ints
        val n = 1000 * stressTestMultiplier
        val source = rxObservable { repeat(n) { send(it) } }
        // a / b: number of elements received so far from channelA / channelB; since elements
        // are expected in order, each counter is also the next expected value.
        var a = 0
        var b = 0
        // open two subs
        val channelA = source.openSubscription()
        val channelB = source.openSubscription()
        try {
            loop@ while (true) {
                // 0 = the selected channel was closed, 1 = got an element from A, 2 = from B
                val done: Int = select {
                    channelA.onReceiveOrNull { value ->
                        if (value == null) {
                            0
                        } else {
                            assertEquals(a++, value)
                            1
                        }
                    }
                    channelB.onReceiveOrNull { value ->
                        if (value == null) {
                            0
                        } else {
                            assertEquals(b++, value)
                            2
                        }
                    }
                }
                when (done) {
                    0 -> break@loop
                    1 -> {
                        // A was just served, so take the next element from B (null if B is closed)
                        val r = channelB.receiveOrNull()
                        if (r != null) assertEquals(b++, r)
                    }
                    2 -> {
                        // B was just served, so take the next element from A (null if A is closed)
                        val r = channelA.receiveOrNull()
                        if (r != null) assertEquals(a++, r)
                    }
                }
            }
        } finally {
            // Cancel both subscriptions even when an assertion above throws, so that a failing
            // run does not leave them open.
            channelA.cancel()
            channelB.cancel()
        }
        // should receive one of them fully
        assertTrue(a == n || b == n, "Expected one subscription to be received fully: a=$a, b=$b, n=$n")
    }
}
52