1 /*
2  * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3  */
4 
5 package kotlinx.coroutines.rx3
6 
7 import io.reactivex.rxjava3.core.*
8 import kotlinx.coroutines.*
9 import org.junit.*
10 import org.junit.Test
11 import java.util.concurrent.*
12 import kotlin.test.*
13 
14 class ObservableSingleTest : TestBase() {
15     @Before
setupnull16     fun setup() {
17         ignoreLostThreads("RxComputationThreadPool-", "RxCachedWorkerPoolEvictor-", "RxSchedulerPurge-")
18     }
19 
20     @Test
testSingleNoWaitnull21     fun testSingleNoWait() {
22         val observable = rxObservable {
23             send("OK")
24         }
25 
26         checkSingleValue(observable) {
27             assertEquals("OK", it)
28         }
29     }
30 
31     @Test
<lambda>null32     fun testSingleAwait() = runBlocking {
33         assertEquals("OK", Observable.just("O").awaitSingle() + "K")
34     }
35 
36     @Test
testSingleEmitAndAwaitnull37     fun testSingleEmitAndAwait() {
38         val observable = rxObservable {
39             send(Observable.just("O").awaitSingle() + "K")
40         }
41 
42         checkSingleValue(observable) {
43             assertEquals("OK", it)
44         }
45     }
46 
47     @Test
testSingleWithDelaynull48     fun testSingleWithDelay() {
49         val observable = rxObservable {
50             send(Observable.timer(50, TimeUnit.MILLISECONDS).map { "O" }.awaitSingle() + "K")
51         }
52 
53         checkSingleValue(observable) {
54             assertEquals("OK", it)
55         }
56     }
57 
58     @Test
testSingleExceptionnull59     fun testSingleException() {
60         val observable = rxObservable {
61             send(Observable.just("O", "K").awaitSingle() + "K")
62         }
63 
64         checkErroneous(observable) {
65             assertTrue(it is IllegalArgumentException)
66         }
67     }
68 
69     @Test
testAwaitFirstnull70     fun testAwaitFirst() {
71         val observable = rxObservable {
72             send(Observable.just("O", "#").awaitFirst() + "K")
73         }
74 
75         checkSingleValue(observable) {
76             assertEquals("OK", it)
77         }
78     }
79 
80     @Test
testAwaitFirstOrDefaultnull81     fun testAwaitFirstOrDefault() {
82         val observable = rxObservable {
83             send(Observable.empty<String>().awaitFirstOrDefault("O") + "K")
84         }
85 
86         checkSingleValue(observable) {
87             assertEquals("OK", it)
88         }
89     }
90 
91     @Test
testAwaitFirstOrDefaultWithValuesnull92     fun testAwaitFirstOrDefaultWithValues() {
93         val observable = rxObservable {
94             send(Observable.just("O", "#").awaitFirstOrDefault("!") + "K")
95         }
96 
97         checkSingleValue(observable) {
98             assertEquals("OK", it)
99         }
100     }
101 
102     @Test
testAwaitFirstOrNullnull103     fun testAwaitFirstOrNull() {
104         val observable = rxObservable<String> {
105             send(Observable.empty<String>().awaitFirstOrNull() ?: "OK")
106         }
107 
108         checkSingleValue(observable) {
109             assertEquals("OK", it)
110         }
111     }
112 
113     @Test
testAwaitFirstOrNullWithValuesnull114     fun testAwaitFirstOrNullWithValues() {
115         val observable = rxObservable {
116             send((Observable.just("O", "#").awaitFirstOrNull() ?: "!") + "K")
117         }
118 
119         checkSingleValue(observable) {
120             assertEquals("OK", it)
121         }
122     }
123 
124     @Test
testAwaitFirstOrElsenull125     fun testAwaitFirstOrElse() {
126         val observable = rxObservable {
127             send(Observable.empty<String>().awaitFirstOrElse { "O" } + "K")
128         }
129 
130         checkSingleValue(observable) {
131             assertEquals("OK", it)
132         }
133     }
134 
135     @Test
testAwaitFirstOrElseWithValuesnull136     fun testAwaitFirstOrElseWithValues() {
137         val observable = rxObservable {
138             send(Observable.just("O", "#").awaitFirstOrElse { "!" } + "K")
139         }
140 
141         checkSingleValue(observable) {
142             assertEquals("OK", it)
143         }
144     }
145 
146     @Test
testAwaitLastnull147     fun testAwaitLast() {
148         val observable = rxObservable {
149             send(Observable.just("#", "O").awaitLast() + "K")
150         }
151 
152         checkSingleValue(observable) {
153             assertEquals("OK", it)
154         }
155     }
156 
157     @Test
testExceptionFromObservablenull158     fun testExceptionFromObservable() {
159         val observable = rxObservable {
160             try {
161                 send(Observable.error<String>(RuntimeException("O")).awaitFirst())
162             } catch (e: RuntimeException) {
163                 send(Observable.just(e.message!!).awaitLast() + "K")
164             }
165         }
166 
167         checkSingleValue(observable) {
168             assertEquals("OK", it)
169         }
170     }
171 
172     @Test
testExceptionFromCoroutinenull173     fun testExceptionFromCoroutine() {
174         val observable = rxObservable<String> {
175             throw IllegalStateException(Observable.just("O").awaitSingle() + "K")
176         }
177 
178         checkErroneous(observable) {
179             assertTrue(it is IllegalStateException)
180             assertEquals("OK", it.message)
181         }
182     }
183 
184     @Test
testObservableIterationnull185     fun testObservableIteration() {
186         val observable = rxObservable {
187             var result = ""
188             Observable.just("O", "K").collect { result += it }
189             send(result)
190         }
191 
192         checkSingleValue(observable) {
193             assertEquals("OK", it)
194         }
195     }
196 
197     @Test
testObservableIterationFailurenull198     fun testObservableIterationFailure() {
199         val observable = rxObservable {
200             try {
201                 Observable.error<String>(RuntimeException("OK")).collect { fail("Should not be here") }
202                 send("Fail")
203             } catch (e: RuntimeException) {
204                 send(e.message!!)
205             }
206         }
207 
208         checkSingleValue(observable) {
209             assertEquals("OK", it)
210         }
211     }
212 }
213