/*
 * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
 */

package kotlinx.coroutines.rx3

import kotlinx.coroutines.*
import kotlinx.coroutines.selects.*
import org.junit.Test
import kotlin.test.*

/**
 * Exercises `select` over two channel subscriptions opened on the same observable.
 */
class ObservableSubscriptionSelectTest : TestBase() {
    /**
     * Opens two subscriptions on one observable that emits `0 until total` and drains them
     * in an alternating fashion: a `select` takes an element from whichever subscription
     * wins, then a plain receive is issued on the other one.
     *
     * Every non-null element is checked against that subscription's own running counter,
     * so each subscription is expected to see the sequence in order starting from 0.
     * The loop ends as soon as the `select` itself yields `null`; afterwards at least one
     * of the subscriptions must have delivered all `total` elements.
     */
    @Test
    fun testSelect() = runTest {
        // Number of ints the source emits.
        val total = 1000 * stressTestMultiplier
        val observable = rxObservable { repeat(total) { send(it) } }
        // Two independent subscriptions, each with its own count of elements seen so far.
        val channelA = observable.openSubscription()
        val channelB = observable.openSubscription()
        var seenA = 0
        var seenB = 0
        while (true) {
            // Result code: 0 = selected clause received null, 1 = element from A, 2 = element from B.
            val winner = select<Int> {
                channelA.onReceiveOrNull { value ->
                    if (value == null) {
                        0
                    } else {
                        assertEquals(seenA++, value)
                        1
                    }
                }
                channelB.onReceiveOrNull { value ->
                    if (value == null) {
                        0
                    } else {
                        assertEquals(seenB++, value)
                        2
                    }
                }
            }
            // A null from the selected clause terminates the whole loop.
            if (winner == 0) break
            // Otherwise follow up with a direct receive on the subscription that did NOT win.
            when (winner) {
                1 -> channelB.receiveOrNull()?.let { assertEquals(seenB++, it) }
                2 -> channelA.receiveOrNull()?.let { assertEquals(seenA++, it) }
            }
        }
        channelA.cancel()
        channelB.cancel()
        // At least one subscription should have been received in full.
        assertTrue(seenA == total || seenB == total)
    }
}