1 /*
<lambda>null2  * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3  */
4 
5 package kotlinx.coroutines.rx2
6 
7 import io.reactivex.*
8 import kotlinx.coroutines.*
9 import kotlinx.coroutines.flow.*
10 import org.junit.Test
11 import org.junit.runner.*
12 import org.junit.runners.*
13 import kotlin.coroutines.*
14 import kotlin.test.*
15 
16 @RunWith(Parameterized::class)
17 class IntegrationTest(
18     private val ctx: Ctx,
19     private val delay: Boolean
20 ) : TestBase() {
21 
22     enum class Ctx {
23         MAIN        { override fun invoke(context: CoroutineContext): CoroutineContext = context.minusKey(Job) },
24         DEFAULT     { override fun invoke(context: CoroutineContext): CoroutineContext = Dispatchers.Default },
25         UNCONFINED  { override fun invoke(context: CoroutineContext): CoroutineContext = Dispatchers.Unconfined };
26 
27         abstract operator fun invoke(context: CoroutineContext): CoroutineContext
28     }
29 
30     companion object {
31         @Parameterized.Parameters(name = "ctx={0}, delay={1}")
32         @JvmStatic
33         fun params(): Collection<Array<Any>> = Ctx.values().flatMap { ctx ->
34             listOf(false, true).map { delay ->
35                 arrayOf(ctx, delay)
36             }
37         }
38     }
39 
40     @Test
41     fun testEmpty(): Unit = runBlocking {
42         val observable = rxObservable<String>(ctx(coroutineContext)) {
43             if (delay) delay(1)
44             // does not send anything
45         }
46         assertFailsWith<NoSuchElementException> { observable.awaitFirst() }
47         assertEquals("OK", observable.awaitFirstOrDefault("OK"))
48         assertNull(observable.awaitFirstOrNull())
49         assertEquals("ELSE", observable.awaitFirstOrElse { "ELSE" })
50         assertFailsWith<NoSuchElementException> { observable.awaitLast() }
51         assertFailsWith<NoSuchElementException> { observable.awaitSingle() }
52         var cnt = 0
53         observable.collect {
54             cnt++
55         }
56         assertEquals(0, cnt)
57     }
58 
59     @Test
60     fun testSingle() = runBlocking {
61         val observable = rxObservable(ctx(coroutineContext)) {
62             if (delay) delay(1)
63             send("OK")
64         }
65         assertEquals("OK", observable.awaitFirst())
66         assertEquals("OK", observable.awaitFirstOrDefault("OK"))
67         assertEquals("OK", observable.awaitFirstOrNull())
68         assertEquals("OK", observable.awaitFirstOrElse { "ELSE" })
69         assertEquals("OK", observable.awaitLast())
70         assertEquals("OK", observable.awaitSingle())
71         var cnt = 0
72         observable.collect {
73             assertEquals("OK", it)
74             cnt++
75         }
76         assertEquals(1, cnt)
77     }
78 
79     @Test
80     fun testNumbers() = runBlocking<Unit> {
81         val n = 100 * stressTestMultiplier
82         val observable = rxObservable(ctx(coroutineContext)) {
83             for (i in 1..n) {
84                 send(i)
85                 if (delay) delay(1)
86             }
87         }
88         assertEquals(1, observable.awaitFirst())
89         assertEquals(1, observable.awaitFirstOrDefault(0))
90         assertEquals(1, observable.awaitFirstOrNull())
91         assertEquals(1, observable.awaitFirstOrElse { 0 })
92         assertEquals(n, observable.awaitLast())
93         assertFailsWith<IllegalArgumentException> { observable.awaitSingle() }
94         checkNumbers(n, observable)
95         val channel = observable.openSubscription()
96         checkNumbers(n, channel.consumeAsFlow().asObservable(ctx(coroutineContext)))
97         channel.cancel()
98     }
99 
100     @Test
101     fun testCancelWithoutValue() = runTest {
102         val job = launch(Job(), start = CoroutineStart.UNDISPATCHED) {
103             rxObservable<String> {
104                 hang {  }
105             }.awaitFirst()
106         }
107 
108         job.cancel()
109         job.join()
110     }
111 
112     @Test
113     fun testEmptySingle() = runTest(unhandled = listOf({e -> e is NoSuchElementException})) {
114         expect(1)
115         val job = launch(Job(), start = CoroutineStart.UNDISPATCHED) {
116             rxObservable<String> {
117                 yield()
118                 expect(2)
119                 // Nothing to emit
120             }.awaitFirst()
121         }
122 
123         job.join()
124         finish(3)
125     }
126 
127     private suspend fun checkNumbers(n: Int, observable: Observable<Int>) {
128         var last = 0
129         observable.collect {
130             assertEquals(++last, it)
131         }
132         assertEquals(n, last)
133     }
134 
135 }
136