1 /*
<lambda>null2  * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3  */
4 
5 package kotlinx.coroutines.rx3
6 
7 import io.reactivex.rxjava3.core.*
8 import io.reactivex.rxjava3.plugins.*
9 import kotlinx.coroutines.*
10 import kotlinx.coroutines.CancellationException
11 import org.junit.*
12 import org.junit.Test
13 import java.util.concurrent.*
14 import kotlin.test.*
15 
16 class ObservableTest : TestBase() {
17     @Before
18     fun setup() {
19         ignoreLostThreads("RxComputationThreadPool-", "RxCachedWorkerPoolEvictor-", "RxSchedulerPurge-")
20     }
21 
22     @Test
23     fun testBasicSuccess() = runBlocking {
24         expect(1)
25         val observable = rxObservable(currentDispatcher()) {
26             expect(4)
27             send("OK")
28         }
29         expect(2)
30         observable.subscribe { value ->
31             expect(5)
32             assertEquals("OK", value)
33         }
34         expect(3)
35         yield() // to started coroutine
36         finish(6)
37     }
38 
39     @Test
40     fun testBasicFailure() = runBlocking {
41         expect(1)
42         val observable = rxObservable<String>(currentDispatcher()) {
43             expect(4)
44             throw RuntimeException("OK")
45         }
46         expect(2)
47         observable.subscribe({
48             expectUnreached()
49         }, { error ->
50             expect(5)
51             assertTrue(error is RuntimeException)
52             assertEquals("OK", error.message)
53         })
54         expect(3)
55         yield() // to started coroutine
56         finish(6)
57     }
58 
59     @Test
60     fun testBasicUnsubscribe() = runBlocking {
61         expect(1)
62         val observable = rxObservable<String>(currentDispatcher()) {
63             expect(4)
64             yield() // back to main, will get cancelled
65             expectUnreached()
66         }
67         expect(2)
68         val sub = observable.subscribe({
69             expectUnreached()
70         }, {
71             expectUnreached()
72         })
73         expect(3)
74         yield() // to started coroutine
75         expect(5)
76         sub.dispose() // will cancel coroutine
77         yield()
78         finish(6)
79     }
80 
81     @Test
82     fun testNotifyOnceOnCancellation() = runTest {
83         expect(1)
84         val observable =
85             rxObservable(currentDispatcher()) {
86                 expect(5)
87                 send("OK")
88                 try {
89                     delay(Long.MAX_VALUE)
90                 } catch (e: CancellationException) {
91                     expect(11)
92                 }
93             }
94             .doOnNext {
95                 expect(6)
96                 assertEquals("OK", it)
97             }
98             .doOnDispose {
99                 expect(10) // notified once!
100             }
101         expect(2)
102         val job = launch(start = CoroutineStart.UNDISPATCHED) {
103             expect(3)
104             observable.collect {
105                 expect(8)
106                 assertEquals("OK", it)
107             }
108         }
109         expect(4)
110         yield() // to observable code
111         expect(7)
112         yield() // to consuming coroutines
113         expect(9)
114         job.cancel()
115         job.join()
116         finish(12)
117     }
118 
119     @Test
120     fun testFailingConsumer() = runTest {
121         expect(1)
122         val pub = rxObservable(currentDispatcher()) {
123             expect(2)
124             send("OK")
125             try {
126                 delay(Long.MAX_VALUE)
127             } catch (e: CancellationException) {
128                 finish(5)
129             }
130         }
131         try {
132             pub.collect {
133                 expect(3)
134                 throw TestException()
135             }
136         } catch (e: TestException) {
137             expect(4)
138         }
139     }
140 
141     @Test
142     fun testExceptionAfterCancellation() {
143         // Test that no exceptions were reported to the global EH (it will fail the test if so)
144         val handler = { e: Throwable ->
145             assertFalse(e is CancellationException)
146         }
147         withExceptionHandler(handler) {
148             RxJavaPlugins.setErrorHandler {
149                 require(it !is CancellationException)
150             }
151             Observable
152                 .interval(1, TimeUnit.MILLISECONDS)
153                 .take(1000)
154                 .switchMapSingle {
155                     rxSingle {
156                         timeBomb().await()
157                     }
158                 }
159                 .blockingSubscribe({}, {})
160         }
161     }
162 
163     private fun timeBomb() = Single.timer(1, TimeUnit.MILLISECONDS).doOnSuccess { throw TestException() }
164 }
165