/*
 * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
 */
4 
5 package kotlinx.coroutines.rx2
6 
7 import io.reactivex.*
8 import io.reactivex.disposables.*
9 import kotlinx.coroutines.*
10 import kotlinx.coroutines.flow.*
11 import org.junit.Test
12 import java.util.concurrent.*
13 import kotlin.test.*
14 
/**
 * Tests for exposing a [Flow] as an RxJava 2 [Observable] through `asObservable`.
 *
 * Each test pins its execution order with the `expect(n)` / `finish(n)` / `expectUnreached()` helpers.
 * Those helpers are not defined in this file; they are presumably provided by [TestBase].
 */
class FlowAsObservableTest : TestBase() {
    /** A value emitted by the flow reaches the subscriber's `onNext` callback. */
    @Test
    fun testBasicSuccess() = runTest {
        expect(1)
        val upstream = flow<String> {
            expect(3)
            emit("OK")
        }
        val source = upstream.asObservable()

        expect(2)
        source.subscribe { received ->
            expect(4)
            assertEquals("OK", received)
        }

        finish(5)
    }

    /** An exception thrown inside the flow is delivered to the subscriber's error callback. */
    @Test
    fun testBasicFailure() = runTest {
        expect(1)
        val failing = flow<Int> {
            expect(3)
            throw RuntimeException("OK")
        }
        val source = failing.asObservable()

        expect(2)
        source.subscribe(
            { expectUnreached() },
            { failure ->
                expect(4)
                assertTrue(failure is RuntimeException)
                assertEquals("OK", failure.message)
            }
        )
        finish(5)
    }

    /**
     * Disposing the subscription while the flow is hanging must not invoke any subscriber callback.
     *
     * The block handed to `hang` is expected to run (step 4) before the test finishes.
     */
    @Test
    fun testBasicUnsubscribe() = runTest {
        expect(1)
        val hanging = flow<Int> {
            expect(3)
            hang {
                expect(4)
            }
        }
        val source = hanging.asObservable()

        expect(2)
        val subscription = source.subscribe({ expectUnreached() }, { expectUnreached() })
        subscription.dispose() // will cancel coroutine
        finish(5)
    }

    /**
     * Cancelling a coroutine that collects the observable triggers `doOnDispose` exactly once
     * (step 6), followed by the block handed to `hang` (step 7).
     */
    @Test
    fun testNotifyOnceOnCancellation() = runTest {
        val upstream = flow<String> {
            expect(3)
            emit("OK")
            hang {
                expect(7)
            }
        }
        val observable = upstream.asObservable()
            .doOnNext { item ->
                expect(4)
                assertEquals("OK", item)
            }
            .doOnDispose {
                expect(6) // notified once!
            }

        expect(1)
        val collector = launch(start = CoroutineStart.UNDISPATCHED) {
            expect(2)
            observable.collect { item ->
                expect(5)
                assertEquals("OK", item)
            }
        }

        yield()
        collector.cancelAndJoin()
        finish(8)
    }

    /**
     * An exception thrown by the collecting lambda propagates out of `collect`.
     *
     * The block handed to `hang` is expected to run (step 4) before the exception is caught (step 5).
     */
    @Test
    fun testFailingConsumer() = runTest {
        expect(1)
        val upstream = flow<String> {
            expect(2)
            emit("OK")
            hang {
                expect(4)
            }
        }
        val source = upstream.asObservable()

        try {
            source.collect {
                expect(3)
                throw TestException()
            }
        } catch (expected: TestException) {
            finish(5)
        }
    }

    /**
     * Subscribing and immediately disposing under [Dispatchers.Unconfined]: the flow body is still
     * expected to run once (step 1), while none of the subscriber callbacks may fire.
     */
    @Test
    fun testNonAtomicStart() = runTest {
        withContext(Dispatchers.Unconfined) {
            val emptyFlow = flow<Int> {
                expect(1)
            }
            val source = emptyFlow.asObservable()

            val subscription = source.subscribe(
                { expectUnreached() },
                { expectUnreached() },
                { expectUnreached() }
            )
            subscription.dispose()
        }
        finish(2)
    }

    /**
     * A flow that cancels its own coroutine context after emitting one value: the subscriber sees the
     * value (step 2) and then the completion callback (step 3), never the error callback.
     */
    @Test
    fun testFlowCancelledFromWithin() = runTest {
        val selfCancelling = flow<Int> {
            expect(1)
            emit(1)
            val context = kotlin.coroutines.coroutineContext
            context.cancel()
            context.ensureActive()
            expectUnreached()
        }
        val source = selfCancelling.asObservable()

        source.subscribe({ expect(2) }, { expectUnreached() }, { finish(3) })
    }

    /**
     * With no explicit context passed to `asObservable`, `onNext` and `onComplete` are asserted to
     * run on the thread that performed the subscription.
     */
    @Test
    fun testUnconfinedDefaultContext() {
        expect(1)
        val callerThread = Thread.currentThread()
        fun checkThread() = assertSame(callerThread, Thread.currentThread())

        val observer: Observer<Int> = object : Observer<Int> {
            override fun onSubscribe(d: Disposable) {
                expect(2)
            }

            override fun onNext(value: Int) {
                checkThread()
                expect(3)
                assertEquals(42, value)
            }

            override fun onComplete() {
                checkThread()
                expect(4)
            }

            override fun onError(e: Throwable) {
                expectUnreached()
            }
        }

        flowOf(42).asObservable().subscribe(observer)
        finish(5)
    }

    /**
     * With a single-thread dispatcher passed to `asObservable`, `onNext` and `onComplete` are asserted
     * to run on a thread whose name starts with the dispatcher's thread name.
     *
     * A [CountDownLatch] blocks the test thread until `onComplete` has been observed, so the dispatcher
     * is not closed before completion has been signalled.
     */
    @Test
    fun testConfinedContext() {
        expect(1)
        val threadName = "FlowAsObservableTest.testConfinedContext"
        fun checkThread() {
            val currentThread = Thread.currentThread()
            assertTrue(currentThread.name.startsWith(threadName), "Unexpected thread $currentThread")
        }

        val completed = CountDownLatch(1)
        val observer: Observer<Int> = object : Observer<Int> {
            override fun onSubscribe(d: Disposable) {
                expect(2)
            }

            override fun onNext(value: Int) {
                checkThread()
                expect(3)
                assertEquals(42, value)
            }

            override fun onComplete() {
                checkThread()
                expect(4)
                completed.countDown()
            }

            override fun onError(e: Throwable) {
                expectUnreached()
            }
        }

        newSingleThreadContext(threadName).use { dispatcher ->
            flowOf(42).asObservable(dispatcher).subscribe(observer)
            completed.await()
        }
        finish(5)
    }
}
212