1 /*
<lambda>null2  * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3  */
4 
5 package kotlinx.coroutines.rx3
6 
7 import io.reactivex.rxjava3.core.*
8 import kotlinx.coroutines.*
9 import kotlinx.coroutines.flow.consumeAsFlow
10 import org.junit.Test
11 import org.junit.runner.*
12 import org.junit.runners.*
13 import kotlin.coroutines.*
14 import kotlin.test.*
15 
16 @RunWith(Parameterized::class)
17 class IntegrationTest(
18     private val ctx: Ctx,
19     private val delay: Boolean
20 ) : TestBase() {
21 
22     enum class Ctx {
23         MAIN        { override fun invoke(context: CoroutineContext): CoroutineContext = context.minusKey(Job) },
24         DEFAULT     { override fun invoke(context: CoroutineContext): CoroutineContext = Dispatchers.Default },
25         UNCONFINED  { override fun invoke(context: CoroutineContext): CoroutineContext = Dispatchers.Unconfined };
26 
27         abstract operator fun invoke(context: CoroutineContext): CoroutineContext
28     }
29 
30     companion object {
31         @Parameterized.Parameters(name = "ctx={0}, delay={1}")
32         @JvmStatic
33         fun params(): Collection<Array<Any>> = Ctx.values().flatMap { ctx ->
34             listOf(false, true).map { delay ->
35                 arrayOf(ctx, delay)
36             }
37         }
38     }
39 
40     @Test
41     fun testEmpty(): Unit = runBlocking {
42         val observable = rxObservable<String>(ctx(coroutineContext)) {
43             if (delay) delay(1)
44             // does not send anything
45         }
46         assertFailsWith<NoSuchElementException> { observable.awaitFirst() }
47         assertEquals("OK", observable.awaitFirstOrDefault("OK"))
48         assertNull(observable.awaitFirstOrNull())
49         assertEquals("ELSE", observable.awaitFirstOrElse { "ELSE" })
50         assertFailsWith<NoSuchElementException> { observable.awaitLast() }
51         assertFailsWith<NoSuchElementException> { observable.awaitSingle() }
52         var cnt = 0
53         observable.collect {
54             cnt++
55         }
56         assertEquals(0, cnt)
57     }
58 
59     @Test
60     fun testSingle() = runBlocking {
61         val observable = rxObservable(ctx(coroutineContext)) {
62             if (delay) delay(1)
63             send("OK")
64         }
65         assertEquals("OK", observable.awaitFirst())
66         assertEquals("OK", observable.awaitFirstOrDefault("OK"))
67         assertEquals("OK", observable.awaitFirstOrNull())
68         assertEquals("OK", observable.awaitFirstOrElse { "ELSE" })
69         assertEquals("OK", observable.awaitLast())
70         assertEquals("OK", observable.awaitSingle())
71         var cnt = 0
72         observable.collect {
73             assertEquals("OK", it)
74             cnt++
75         }
76         assertEquals(1, cnt)
77     }
78 
79     @Test
80     fun testNumbers() = runBlocking<Unit> {
81         val n = 100 * stressTestMultiplier
82         val observable = rxObservable(ctx(coroutineContext)) {
83             for (i in 1..n) {
84                 send(i)
85                 if (delay) delay(1)
86             }
87         }
88         assertEquals(1, observable.awaitFirst())
89         assertEquals(1, observable.awaitFirstOrDefault(0))
90         assertEquals(1, observable.awaitFirstOrNull())
91         assertEquals(1, observable.awaitFirstOrElse { 0 })
92         assertEquals(n, observable.awaitLast())
93         assertFailsWith<IllegalArgumentException> { observable.awaitSingle() }
94         checkNumbers(n, observable)
95         val channel = observable.openSubscription()
96         ctx(coroutineContext)
97         checkNumbers(n, channel.consumeAsFlow().asObservable())
98         channel.cancel()
99     }
100 
101     @Test
102     fun testCancelWithoutValue() = runTest {
103         val job = launch(Job(), start = CoroutineStart.UNDISPATCHED) {
104             rxObservable<String> {
105                 hang {  }
106             }.awaitFirst()
107         }
108 
109         job.cancel()
110         job.join()
111     }
112 
113     @Test
114     fun testEmptySingle() = runTest(unhandled = listOf({e -> e is NoSuchElementException})) {
115         expect(1)
116         val job = launch(Job(), start = CoroutineStart.UNDISPATCHED) {
117             rxObservable<String> {
118                 yield()
119                 expect(2)
120                 // Nothing to emit
121             }.awaitFirst()
122         }
123 
124         job.join()
125         finish(3)
126     }
127 
128     private suspend fun checkNumbers(n: Int, observable: Observable<Int>) {
129         var last = 0
130         observable.collect {
131             assertEquals(++last, it)
132         }
133         assertEquals(n, last)
134     }
135 
136 }
137