/*
 * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
 */
4 
5 package kotlinx.coroutines.rx2
6 
7 import io.reactivex.Observable
8 import io.reactivex.ObservableSource
9 import io.reactivex.Observer
10 import io.reactivex.disposables.Disposables
11 import io.reactivex.subjects.PublishSubject
12 import kotlinx.coroutines.*
13 import kotlinx.coroutines.channels.*
14 import kotlinx.coroutines.flow.*
15 import kotlin.test.*
16 
/**
 * Tests for collecting an RxJava 2 [ObservableSource] as a flow via `asFlow()`.
 *
 * The numbered `expect(n)` / `finish(n)` calls are inherited from [TestBase]; they are presumably
 * ordered checkpoints, so the numbers in each test spell out the interleaving the test requires
 * (confirm against `TestBase`).
 */
class ObservableAsFlowTest : TestBase() {
    /**
     * A collector that throws on the first element must observe exactly one element, report exactly
     * one error to `catch`, and make the producing `rxObservable` job complete with a
     * [CancellationException] exactly once.
     */
    @Test
    fun testCancellation() = runTest {
        var onNext = 0
        var onCancelled = 0
        var onError = 0

        val source = rxObservable(currentDispatcher()) {
            // Count completions of the producer job whose cause is a cancellation.
            coroutineContext[Job]?.invokeOnCompletion {
                if (it is CancellationException) ++onCancelled
            }

            repeat(100) {
                send(it)
            }
        }

        // The builder block wires the per-element action and the error handler before launching.
        source.asFlow().launchIn(CoroutineScope(Dispatchers.Unconfined)) {
            onEach {
                ++onNext
                throw RuntimeException()
            }
            catch<Throwable> {
                ++onError
            }
        }.join()

        assertEquals(1, onNext)
        assertEquals(1, onError)
        assertEquals(1, onCancelled)
    }

    /**
     * Values pushed into a [PublishSubject] reach an unconfined collector in between the
     * surrounding checkpoints, and `onComplete` lets `collect` return.
     */
    @Test
    fun testImmediateCollection() {
        val source = PublishSubject.create<Int>()
        val flow = source.asFlow()
        GlobalScope.launch(Dispatchers.Unconfined) {
            expect(1)
            // The emitted values (3 and 5) double as the checkpoint numbers.
            flow.collect { expect(it) }
            expect(6)
        }
        expect(2)
        source.onNext(3)
        expect(4)
        source.onNext(5)
        source.onComplete()
        finish(7)
    }

    /**
     * An error signalled through `onError` must terminate `collect` with an exception whose
     * `cause` is the very instance passed to the subject.
     */
    @Test
    fun testOnErrorCancellation() {
        val source = PublishSubject.create<Int>()
        val flow = source.asFlow()
        val exception = RuntimeException()
        GlobalScope.launch(Dispatchers.Unconfined) {
            try {
                expect(1)
                flow.collect { expect(it) }
                expectUnreached()
            }
            catch (e: Exception) {
                assertSame(exception, e.cause)
                expect(5)
            }
            expect(6)
        }
        expect(2)
        source.onNext(3)
        expect(4)
        source.onError(exception)
        finish(7)
    }

    /**
     * An exception thrown from the collector body must unsubscribe from the source
     * (`hasObservers()` flips from true to false) and surface with the thrown instance as `cause`.
     */
    @Test
    fun testUnsubscribeOnCollectionException() {
        val source = PublishSubject.create<Int>()
        val flow = source.asFlow()
        val exception = RuntimeException()
        GlobalScope.launch(Dispatchers.Unconfined) {
            try {
                expect(1)
                flow.collect {
                    expect(it)
                    if (it == 3) throw exception
                }
                expectUnreached()
            }
            catch (e: Exception) {
                assertSame(exception, e.cause)
                expect(4)
            }
            expect(5)
        }
        expect(2)
        assertTrue(source.hasObservers())
        source.onNext(3)
        assertFalse(source.hasObservers())
        finish(6)
    }

    /**
     * When the collecting job is cancelled before the source calls `onSubscribe`, a late
     * `onSubscribe` must dispose the supplied disposable.
     */
    @Test
    fun testLateOnSubscribe() {
        var observer: Observer<in Int>? = null
        // This source only captures its observer; it never calls onSubscribe on its own.
        val source = ObservableSource<Int> { observer = it }
        val flow = source.asFlow()
        // Merely creating the flow must not subscribe.
        assertNull(observer)
        val job = GlobalScope.launch(Dispatchers.Unconfined) {
            expect(1)
            flow.collect { expectUnreached() }
            expectUnreached()
        }
        expect(2)
        assertNotNull(observer)
        job.cancel()
        val disposable = Disposables.empty()
        // The test delivers the subscription by hand, after the cancellation above.
        checkNotNull(observer) { "source was never subscribed" }.onSubscribe(disposable)
        assertTrue(disposable.isDisposed)
        finish(3)
    }

    /**
     * With an unlimited buffer the checkpoint numbers require the producer to pass all of its
     * checkpoints (1..9) before the collector handles the first value; the sent values 10..17
     * then serve as the collector's checkpoints.
     */
    @Test
    fun testBufferUnlimited() = runTest {
        val source = rxObservable(currentDispatcher()) {
            expect(1); send(10)
            expect(2); send(11)
            expect(3); send(12)
            expect(4); send(13)
            expect(5); send(14)
            expect(6); send(15)
            expect(7); send(16)
            expect(8); send(17)
            expect(9)
        }
        source.asFlow().buffer(Channel.UNLIMITED).collect { expect(it) }
        finish(18)
    }

    /** A conflated flow over `range(1, 5)` is expected to yield only the first and last values. */
    @Test
    fun testConflated() = runTest {
        val source = Observable.range(1, 5)
        val list = source.asFlow().conflate().toList()
        assertEquals(listOf(1, 5), list)
    }

    /** All 10 000 elements of a long range must be counted by the flow. */
    @Test
    fun testLongRange() = runTest {
        val source = Observable.range(1, 10_000)
        val count = source.asFlow().count()
        assertEquals(10_000, count)
    }

    /**
     * `produceIn` must deliver every element of `range(0, 10)` for the default, unlimited,
     * size-2 and size-0 buffers, and only the first and last element when conflated.
     */
    @Test
    fun testProduce() = runTest {
        val source = Observable.range(0, 10)
        val flow = source.asFlow()
        check((0..9).toList(), flow.produceIn(this))
        check((0..9).toList(), flow.buffer(Channel.UNLIMITED).produceIn(this))
        check((0..9).toList(), flow.buffer(2).produceIn(this))
        check((0..9).toList(), flow.buffer(0).produceIn(this))
        check(listOf(0, 9), flow.conflate().produceIn(this))
    }

    /**
     * Consumes [channel] to the end and asserts that the received elements equal [expected],
     * in order.
     */
    private suspend fun check(expected: List<Int>, channel: ReceiveChannel<Int>) {
        val result = ArrayList<Int>(10)
        channel.consumeEach { result.add(it) }
        assertEquals(expected, result)
    }
}