/*
 * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
 */

package kotlinx.coroutines.rx3

import io.reactivex.rxjava3.core.*
import io.reactivex.rxjava3.plugins.*
import kotlinx.coroutines.*
import kotlinx.coroutines.CancellationException
import org.junit.*
import org.junit.Test
import java.util.concurrent.*
import kotlin.test.*

/**
 * Tests for observables created with the `rxObservable` builder and for consuming
 * observables from coroutines via `collect`.
 *
 * Every test numbers its steps with `expect(n)` / `finish(n)` from [TestBase]
 * (presumably these assert that the steps run in exactly that order), so the
 * statement order inside each test is significant and must not be rearranged.
 */
class ObservableTest : TestBase() {
    /** Registers the RxJava worker-thread name prefixes that are allowed to outlive a test. */
    @Before
    fun setup() {
        ignoreLostThreads("RxComputationThreadPool-", "RxCachedWorkerPoolEvictor-", "RxSchedulerPurge-")
    }

    /** A value sent from the `rxObservable` block reaches the subscriber's `onNext` callback. */
    @Test
    fun testBasicSuccess() = runBlocking {
        expect(1)
        val source = rxObservable(currentDispatcher()) {
            expect(4)
            send("OK")
        }
        expect(2)
        source.subscribe { received ->
            expect(5)
            assertEquals("OK", received)
        }
        expect(3)
        yield() // to started coroutine
        finish(6)
    }

    /** An exception thrown from the `rxObservable` block reaches the subscriber's error callback. */
    @Test
    fun testBasicFailure() = runBlocking {
        expect(1)
        val source = rxObservable<String>(currentDispatcher()) {
            expect(4)
            throw RuntimeException("OK")
        }
        expect(2)
        source.subscribe(
            {
                // no element is ever sent, so onNext must not fire
                expectUnreached()
            },
            { failure ->
                expect(5)
                assertTrue(failure is RuntimeException)
                assertEquals("OK", failure.message)
            }
        )
        expect(3)
        yield() // to started coroutine
        finish(6)
    }

    /** Disposing the subscription cancels the producing coroutine; neither callback is invoked. */
    @Test
    fun testBasicUnsubscribe() = runBlocking {
        expect(1)
        val source = rxObservable<String>(currentDispatcher()) {
            expect(4)
            yield() // back to main, will get cancelled
            expectUnreached()
        }
        expect(2)
        val subscription = source.subscribe(
            { expectUnreached() },
            { expectUnreached() }
        )
        expect(3)
        yield() // to started coroutine
        expect(5)
        subscription.dispose() // will cancel coroutine
        yield()
        finish(6)
    }

    /**
     * Cancelling the collecting job triggers `doOnDispose` exactly once (step 10) and then
     * cancels the producer, which observes a [CancellationException] (step 11).
     */
    @Test
    fun testNotifyOnceOnCancellation() = runTest {
        expect(1)
        val source =
            rxObservable(currentDispatcher()) {
                expect(5)
                send("OK")
                try {
                    delay(Long.MAX_VALUE)
                } catch (ignored: CancellationException) {
                    expect(11)
                }
            }
                .doOnNext { emitted ->
                    expect(6)
                    assertEquals("OK", emitted)
                }
                .doOnDispose {
                    expect(10) // notified once!
                }
        expect(2)
        val consumer = launch(start = CoroutineStart.UNDISPATCHED) {
            expect(3)
            source.collect { collected ->
                expect(8)
                assertEquals("OK", collected)
            }
        }
        expect(4)
        yield() // to observable code
        expect(7)
        yield() // to consuming coroutines
        expect(9)
        consumer.cancel()
        consumer.join()
        finish(12)
    }

    /**
     * A collector that throws [TestException] propagates it to the caller of `collect` (step 4),
     * after which the producer is cancelled and finishes the test from its catch block (step 5).
     */
    @Test
    fun testFailingConsumer() = runTest {
        expect(1)
        val source = rxObservable(currentDispatcher()) {
            expect(2)
            send("OK")
            try {
                delay(Long.MAX_VALUE)
            } catch (ignored: CancellationException) {
                finish(5)
            }
        }
        try {
            source.collect {
                expect(3)
                throw TestException()
            }
        } catch (ignored: TestException) {
            expect(4)
        }
    }

    /**
     * Repeatedly switches between `rxSingle` coroutines that await a failing [timeBomb] and checks
     * that no [CancellationException] is handed to either installed error handler.
     */
    @Test
    fun testExceptionAfterCancellation() {
        // Test that no exceptions were reported to the global EH (it will fail the test if so)
        val handler: (Throwable) -> Unit = { reported ->
            assertFalse(reported is CancellationException)
        }
        withExceptionHandler(handler) {
            RxJavaPlugins.setErrorHandler { undelivered ->
                require(undelivered !is CancellationException)
            }
            Observable
                .interval(1, TimeUnit.MILLISECONDS)
                .take(1000)
                .switchMapSingle {
                    rxSingle {
                        timeBomb().await()
                    }
                }
                .blockingSubscribe({}, {})
        }
    }

    /** Builds a `Single` on a 1 ms timer whose `doOnSuccess` callback throws [TestException]. */
    private fun timeBomb() =
        Single.timer(1, TimeUnit.MILLISECONDS)
            .doOnSuccess { throw TestException() }
}