/*
 * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
 */

package kotlinx.coroutines.rx2

import io.reactivex.*
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import org.junit.Test
import org.junit.runner.*
import org.junit.runners.*
import kotlin.coroutines.*
import kotlin.test.*

/**
 * Exercises `rxObservable` together with the `await*` / `collect` extensions on [Observable].
 *
 * The class is parameterized: every test runs once for each combination of [Ctx] and [delay]
 * produced by [params].
 *
 * @param ctx strategy that picks the coroutine context handed to `rxObservable`.
 * @param delay when `true`, producers call `delay(1)` around emission; when `false` they do not.
 */
@RunWith(Parameterized::class)
class IntegrationTest(
    private val ctx: Ctx,
    private val delay: Boolean
) : TestBase() {

    /**
     * Maps the calling coroutine's context to the context the observable producer should run in.
     */
    enum class Ctx {
        /** Reuses the caller's context with its [Job] element removed. */
        MAIN { override fun invoke(context: CoroutineContext): CoroutineContext = context.minusKey(Job) },

        /** Ignores the caller's context and uses [Dispatchers.Default]. */
        DEFAULT { override fun invoke(context: CoroutineContext): CoroutineContext = Dispatchers.Default },

        /** Ignores the caller's context and uses [Dispatchers.Unconfined]. */
        UNCONFINED { override fun invoke(context: CoroutineContext): CoroutineContext = Dispatchers.Unconfined };

        /** Returns the context to use for a producer launched from [context]. */
        abstract operator fun invoke(context: CoroutineContext): CoroutineContext
    }

    companion object {
        /**
         * Builds the parameter matrix: every [Ctx] value paired with `delay = false` and `delay = true`.
         */
        @Parameterized.Parameters(name = "ctx={0}, delay={1}")
        @JvmStatic
        fun params(): Collection<Array<Any>> = Ctx.values().flatMap { ctx ->
            listOf(false, true).map { delay ->
                arrayOf(ctx, delay)
            }
        }
    }

    /**
     * An observable that completes without emitting: the strict `await*` variants must fail with
     * [NoSuchElementException], the lenient ones must return their fallback, and `collect` must
     * never invoke its action.
     */
    @Test
    fun testEmpty(): Unit = runBlocking {
        val observable = rxObservable<String>(ctx(coroutineContext)) {
            if (delay) delay(1)
            // does not send anything
        }
        assertFailsWith<NoSuchElementException> { observable.awaitFirst() }
        assertEquals("OK", observable.awaitFirstOrDefault("OK"))
        assertNull(observable.awaitFirstOrNull())
        assertEquals("ELSE", observable.awaitFirstOrElse { "ELSE" })
        assertFailsWith<NoSuchElementException> { observable.awaitLast() }
        assertFailsWith<NoSuchElementException> { observable.awaitSingle() }
        var cnt = 0
        observable.collect {
            cnt++
        }
        assertEquals(0, cnt)
    }

    /**
     * An observable that emits exactly one element: every `await*` variant must return that
     * element, and `collect` must see it exactly once.
     */
    @Test
    fun testSingle() = runBlocking<Unit> {
        val observable = rxObservable(ctx(coroutineContext)) {
            if (delay) delay(1)
            send("OK")
        }
        assertEquals("OK", observable.awaitFirst())
        // The default deliberately differs from the emitted value; otherwise this assertion would
        // also pass if the default were returned instead of the emitted element.
        assertEquals("OK", observable.awaitFirstOrDefault("ELSE"))
        assertEquals("OK", observable.awaitFirstOrNull())
        assertEquals("OK", observable.awaitFirstOrElse { "ELSE" })
        assertEquals("OK", observable.awaitLast())
        assertEquals("OK", observable.awaitSingle())
        var cnt = 0
        observable.collect {
            assertEquals("OK", it)
            cnt++
        }
        assertEquals(1, cnt)
    }

    /**
     * An observable that emits `1..n`: first/last accessors return the boundary values,
     * `awaitSingle` is expected to fail with [IllegalArgumentException], and the full sequence is
     * verified both directly and after a round trip through a channel and a flow.
     */
    @Test
    fun testNumbers() = runBlocking<Unit> {
        // stressTestMultiplier is not defined in this file (presumably inherited from TestBase).
        val n = 100 * stressTestMultiplier
        val observable = rxObservable(ctx(coroutineContext)) {
            for (i in 1..n) {
                send(i)
                if (delay) delay(1)
            }
        }
        assertEquals(1, observable.awaitFirst())
        assertEquals(1, observable.awaitFirstOrDefault(0))
        assertEquals(1, observable.awaitFirstOrNull())
        assertEquals(1, observable.awaitFirstOrElse { 0 })
        assertEquals(n, observable.awaitLast())
        assertFailsWith<IllegalArgumentException> { observable.awaitSingle() }
        checkNumbers(n, observable)
        // Round trip: Observable -> channel -> Flow -> Observable must preserve the sequence.
        val channel = observable.openSubscription()
        checkNumbers(n, channel.consumeAsFlow().asObservable(ctx(coroutineContext)))
        channel.cancel()
    }

    /**
     * Cancels a coroutine that is awaiting the first element of an observable which calls `hang`
     * instead of emitting, then joins it.
     */
    @Test
    fun testCancelWithoutValue() = runTest {
        // A fresh Job() detaches the launched coroutine from the test's own job hierarchy.
        val job = launch(Job(), start = CoroutineStart.UNDISPATCHED) {
            rxObservable<String> {
                // NOTE(review): `hang` is defined outside this file; presumably it suspends until cancelled.
                hang { }
            }.awaitFirst()
        }

        job.cancel()
        job.join()
    }

    /**
     * Awaits the first element of an observable that completes empty, inside a coroutine launched
     * with its own [Job]. The `unhandled` predicate passed to `runTest` matches a
     * [NoSuchElementException].
     */
    @Test
    fun testEmptySingle() = runTest(unhandled = listOf({e -> e is NoSuchElementException})) {
        // NOTE(review): `expect` / `finish` are defined outside this file; presumably they assert step ordering.
        expect(1)
        val job = launch(Job(), start = CoroutineStart.UNDISPATCHED) {
            rxObservable<String> {
                yield()
                expect(2)
                // Nothing to emit
            }.awaitFirst()
        }

        job.join()
        finish(3)
    }

    /**
     * Collects [observable] and asserts that it emits exactly the consecutive integers `1..n`.
     *
     * @param n the expected final (and therefore total number of) element(s).
     * @param observable the source to verify.
     */
    private suspend fun checkNumbers(n: Int, observable: Observable<Int>) {
        var last = 0
        observable.collect {
            // Each element must be exactly one greater than the previous one.
            assertEquals(++last, it)
        }
        assertEquals(n, last)
    }

}