1 /* <lambda>null2 * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. 3 */ 4 5 package kotlinx.coroutines.rx3 6 7 import io.reactivex.rxjava3.core.* 8 import kotlinx.coroutines.* 9 import kotlinx.coroutines.flow.consumeAsFlow 10 import org.junit.Test 11 import org.junit.runner.* 12 import org.junit.runners.* 13 import kotlin.coroutines.* 14 import kotlin.test.* 15 16 @RunWith(Parameterized::class) 17 class IntegrationTest( 18 private val ctx: Ctx, 19 private val delay: Boolean 20 ) : TestBase() { 21 22 enum class Ctx { 23 MAIN { override fun invoke(context: CoroutineContext): CoroutineContext = context.minusKey(Job) }, 24 DEFAULT { override fun invoke(context: CoroutineContext): CoroutineContext = Dispatchers.Default }, 25 UNCONFINED { override fun invoke(context: CoroutineContext): CoroutineContext = Dispatchers.Unconfined }; 26 27 abstract operator fun invoke(context: CoroutineContext): CoroutineContext 28 } 29 30 companion object { 31 @Parameterized.Parameters(name = "ctx={0}, delay={1}") 32 @JvmStatic 33 fun params(): Collection<Array<Any>> = Ctx.values().flatMap { ctx -> 34 listOf(false, true).map { delay -> 35 arrayOf(ctx, delay) 36 } 37 } 38 } 39 40 @Test 41 fun testEmpty(): Unit = runBlocking { 42 val observable = rxObservable<String>(ctx(coroutineContext)) { 43 if (delay) delay(1) 44 // does not send anything 45 } 46 assertFailsWith<NoSuchElementException> { observable.awaitFirst() } 47 assertEquals("OK", observable.awaitFirstOrDefault("OK")) 48 assertNull(observable.awaitFirstOrNull()) 49 assertEquals("ELSE", observable.awaitFirstOrElse { "ELSE" }) 50 assertFailsWith<NoSuchElementException> { observable.awaitLast() } 51 assertFailsWith<NoSuchElementException> { observable.awaitSingle() } 52 var cnt = 0 53 observable.collect { 54 cnt++ 55 } 56 assertEquals(0, cnt) 57 } 58 59 @Test 60 fun testSingle() = runBlocking { 61 val observable = rxObservable(ctx(coroutineContext)) { 62 if (delay) delay(1) 63 send("OK") 64 } 65 assertEquals("OK", observable.awaitFirst()) 66 assertEquals("OK", observable.awaitFirstOrDefault("OK")) 67 assertEquals("OK", observable.awaitFirstOrNull()) 68 assertEquals("OK", observable.awaitFirstOrElse { "ELSE" }) 69 assertEquals("OK", observable.awaitLast()) 70 assertEquals("OK", observable.awaitSingle()) 71 var cnt = 0 72 observable.collect { 73 assertEquals("OK", it) 74 cnt++ 75 } 76 assertEquals(1, cnt) 77 } 78 79 @Test 80 fun testNumbers() = runBlocking<Unit> { 81 val n = 100 * stressTestMultiplier 82 val observable = rxObservable(ctx(coroutineContext)) { 83 for (i in 1..n) { 84 send(i) 85 if (delay) delay(1) 86 } 87 } 88 assertEquals(1, observable.awaitFirst()) 89 assertEquals(1, observable.awaitFirstOrDefault(0)) 90 assertEquals(1, observable.awaitFirstOrNull()) 91 assertEquals(1, observable.awaitFirstOrElse { 0 }) 92 assertEquals(n, observable.awaitLast()) 93 assertFailsWith<IllegalArgumentException> { observable.awaitSingle() } 94 checkNumbers(n, observable) 95 val channel = observable.openSubscription() 96 ctx(coroutineContext) 97 checkNumbers(n, channel.consumeAsFlow().asObservable()) 98 channel.cancel() 99 } 100 101 @Test 102 fun testCancelWithoutValue() = runTest { 103 val job = launch(Job(), start = CoroutineStart.UNDISPATCHED) { 104 rxObservable<String> { 105 hang { } 106 }.awaitFirst() 107 } 108 109 job.cancel() 110 job.join() 111 } 112 113 @Test 114 fun testEmptySingle() = runTest(unhandled = listOf({e -> e is NoSuchElementException})) { 115 expect(1) 116 val job = launch(Job(), start = CoroutineStart.UNDISPATCHED) { 117 rxObservable<String> { 118 yield() 119 expect(2) 120 // Nothing to emit 121 }.awaitFirst() 122 } 123 124 job.join() 125 finish(3) 126 } 127 128 private suspend fun checkNumbers(n: Int, observable: Observable<Int>) { 129 var last = 0 130 observable.collect { 131 assertEquals(++last, it) 132 } 133 assertEquals(n, last) 134 } 135 136 } 137