/*
 * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
 */

package kotlinx.coroutines.rx2

import io.reactivex.Observable
import io.reactivex.ObservableSource
import io.reactivex.Observer
import io.reactivex.disposables.Disposables
import io.reactivex.subjects.PublishSubject
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.flow.*
import kotlin.test.*

/**
 * Tests for the `asFlow()` adapter applied to RxJava 2 [ObservableSource] instances.
 *
 * `runTest`, `expect`, `expectUnreached`, `finish` and `currentDispatcher` are not defined in this file;
 * presumably they are inherited from [TestBase] and `expect(n)`/`finish(n)` assert that the numbered
 * checkpoints are hit in increasing order -- confirm against [TestBase].
 */
class ObservableAsFlowTest : TestBase() {
    /**
     * Collects a 100-element observable with a collector that throws on every element.
     *
     * Asserts that exactly one element reached `onEach`, that the `catch` handler ran once, and that the
     * producer's [Job] completed with a [CancellationException] exactly once.
     */
    @Test
    fun testCancellation() = runTest {
        var onNext = 0
        var onCancelled = 0
        var onError = 0

        val source = rxObservable(currentDispatcher()) {
            // Count completions of the producer coroutine that were caused by cancellation.
            coroutineContext[Job]?.invokeOnCompletion {
                if (it is CancellationException) ++onCancelled
            }

            repeat(100) {
                send(it)
            }
        }

        source.asFlow().launchIn(CoroutineScope(Dispatchers.Unconfined)) {
            onEach {
                ++onNext
                throw RuntimeException()
            }
            catch<Throwable> {
                ++onError
            }
        }.join()

        assertEquals(1, onNext)
        assertEquals(1, onError)
        assertEquals(1, onCancelled)
    }

    /**
     * Collects a [PublishSubject]-backed flow from an unconfined coroutine and pushes two values
     * followed by completion.
     *
     * The checkpoint numbers interleave the emitting code (2, 4, 7) with the collecting code
     * (1, 3, 5, 6), and the emitted values 3 and 5 double as the collector's checkpoints.
     */
    @Test
    fun testImmediateCollection() {
        val source = PublishSubject.create<Int>()
        val flow = source.asFlow()
        GlobalScope.launch(Dispatchers.Unconfined) {
            expect(1)
            flow.collect { expect(it) }
            expect(6)
        }
        expect(2)
        source.onNext(3)
        expect(4)
        source.onNext(5)
        source.onComplete()
        finish(7)
    }

    /**
     * Signals `onError` on the source while the flow is being collected.
     *
     * Asserts that `collect` does not return normally and that the exception caught by the collector
     * has the very same exception instance passed to `onError` as its `cause`.
     */
    @Test
    fun testOnErrorCancellation() {
        val source = PublishSubject.create<Int>()
        val flow = source.asFlow()
        val exception = RuntimeException()
        GlobalScope.launch(Dispatchers.Unconfined) {
            try {
                expect(1)
                flow.collect { expect(it) }
                expectUnreached()
            }
            catch (e: Exception) {
                assertSame(exception, e.cause)
                expect(5)
            }
            expect(6)
        }
        expect(2)
        source.onNext(3)
        expect(4)
        source.onError(exception)
        finish(7)
    }

    /**
     * Throws from the collector body when the value 3 arrives.
     *
     * Asserts that the subject has an observer before the value is pushed and none afterwards, and
     * that the exception caught around `collect` has the thrown instance as its `cause`.
     */
    @Test
    fun testUnsubscribeOnCollectionException() {
        val source = PublishSubject.create<Int>()
        val flow = source.asFlow()
        val exception = RuntimeException()
        GlobalScope.launch(Dispatchers.Unconfined) {
            try {
                expect(1)
                flow.collect {
                    expect(it)
                    if (it == 3) throw exception
                }
                expectUnreached()
            }
            catch (e: Exception) {
                assertSame(exception, e.cause)
                expect(4)
            }
            expect(5)
        }
        expect(2)
        assertTrue(source.hasObservers())
        source.onNext(3)
        assertFalse(source.hasObservers())
        finish(6)
    }

    /**
     * Uses a source whose `subscribe` only stores the observer and never calls `onSubscribe` itself.
     *
     * The collecting job is cancelled first; a fresh disposable is then handed to the stored observer
     * via `onSubscribe`, and the test asserts that this disposable ends up disposed.
     */
    @Test
    fun testLateOnSubscribe() {
        var observer: Observer<in Int>? = null
        val source = ObservableSource<Int> { observer = it }
        val flow = source.asFlow()
        // Building the flow alone must not subscribe to the source.
        assertNull(observer)
        val job = GlobalScope.launch(Dispatchers.Unconfined) {
            expect(1)
            flow.collect { expectUnreached() }
            expectUnreached()
        }
        expect(2)
        assertNotNull(observer)
        job.cancel()
        val disposable = Disposables.empty()
        observer!!.onSubscribe(disposable)
        assertTrue(disposable.isDisposed)
        finish(3)
    }

    /**
     * Collects an eight-element observable through `buffer(Channel.UNLIMITED)`.
     *
     * The producer passes checkpoints 1..9 around its sends of 10..17, the collector uses each
     * received value as its checkpoint, and the test finishes at 18.
     */
    @Test
    fun testBufferUnlimited() = runTest {
        val source = rxObservable(currentDispatcher()) {
            expect(1); send(10)
            expect(2); send(11)
            expect(3); send(12)
            expect(4); send(13)
            expect(5); send(14)
            expect(6); send(15)
            expect(7); send(16)
            expect(8); send(17)
            expect(9)
        }
        source.asFlow().buffer(Channel.UNLIMITED).collect { expect(it) }
        finish(18)
    }

    /**
     * Asserts that conflating the flow of `Observable.range(1, 5)` yields exactly `[1, 5]`.
     */
    @Test
    fun testConflated() = runTest {
        val source = Observable.range(1, 5)
        val list = source.asFlow().conflate().toList()
        assertEquals(listOf(1, 5), list)
    }

    /**
     * Asserts that all 10 000 elements of `Observable.range(1, 10_000)` are counted by the flow.
     */
    @Test
    fun testLongRange() = runTest {
        val source = Observable.range(1, 10_000)
        val count = source.asFlow().count()
        assertEquals(10_000, count)
    }

    /**
     * Converts the flow of `Observable.range(0, 10)` to channels with `produceIn` under several
     * buffering configurations and verifies the received elements with [check].
     *
     * Every configuration is expected to deliver 0..9, except the conflated one, which is expected to
     * deliver only `[0, 9]`.
     */
    @Test
    fun testProduce() = runTest {
        val source = Observable.range(0, 10)
        val flow = source.asFlow()
        check((0..9).toList(), flow.produceIn(this))
        check((0..9).toList(), flow.buffer(Channel.UNLIMITED).produceIn(this))
        check((0..9).toList(), flow.buffer(2).produceIn(this))
        check((0..9).toList(), flow.buffer(0).produceIn(this))
        check(listOf(0, 9), flow.conflate().produceIn(this))
    }

    /**
     * Consumes every element of [channel] into a list and asserts that it equals [expected].
     *
     * @param expected the elements the channel must deliver, in order.
     * @param channel the channel to consume.
     */
    private suspend fun check(expected: List<Int>, channel: ReceiveChannel<Int>) {
        val result = ArrayList<Int>(10)
        channel.consumeEach { result.add(it) }
        assertEquals(expected, result)
    }
}