1 /*
 * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3  */
4 
5 package kotlinx.coroutines.rx2
6 
7 import kotlinx.coroutines.*
8 import kotlinx.coroutines.reactive.*
9 import org.junit.*
10 import org.junit.Test
11 import kotlin.test.*
12 
/**
 * Tests for flowables created with [rxFlowable]: delivery of an item, delivery of a failure,
 * disposal of a subscription, cancellation of a collecting coroutine, and a consumer that throws.
 *
 * The numbered `expect(n)` / `finish(n)` calls are helpers inherited from [TestBase] (not shown
 * in this file); each test uses them as ordered checkpoints for the interleaving it expects.
 */
class FlowableTest : TestBase() {
    /** An item sent from the builder block reaches the subscriber's onNext callback. */
    @Test
    fun testBasicSuccess() = runBlocking {
        expect(1)
        val flowable = rxFlowable(currentDispatcher()) {
            expect(4)
            send("OK")
        }
        expect(2)
        flowable.subscribe { item ->
            expect(5)
            assertEquals("OK", item)
        }
        expect(3)
        yield() // hand control to the producer coroutine
        finish(6)
    }

    /** An exception thrown from the builder block reaches the subscriber's error callback. */
    @Test
    fun testBasicFailure() = runBlocking {
        expect(1)
        val flowable = rxFlowable<String>(currentDispatcher()) {
            expect(4)
            throw RuntimeException("OK")
        }
        expect(2)
        flowable.subscribe({
            // No item is ever sent, so onNext must stay silent.
            expectUnreached()
        }, { cause ->
            expect(5)
            assertTrue(cause is RuntimeException)
            assertEquals("OK", cause.message)
        })
        expect(3)
        yield() // hand control to the producer coroutine
        finish(6)
    }

    /** Disposing the subscription while the producer is suspended stops the producer. */
    @Test
    fun testBasicUnsubscribe() = runBlocking {
        expect(1)
        val flowable = rxFlowable<String>(currentDispatcher()) {
            expect(4)
            yield() // returns control to the test body; the producer is cancelled before resuming
            expectUnreached()
        }
        expect(2)
        val disposable = flowable.subscribe({
            expectUnreached()
        }, {
            expectUnreached()
        })
        expect(3)
        yield() // hand control to the producer coroutine
        expect(5)
        disposable.dispose() // cancels the producer coroutine
        yield()
        finish(6)
    }

    /**
     * Cancelling the collecting coroutine triggers the `doOnCancel` hook (checkpoint 10) and then
     * cancels the producer, which observes a [CancellationException] (checkpoint 11).
     */
    @Test
    fun testNotifyOnceOnCancellation() = runTest {
        expect(1)
        val source = rxFlowable(currentDispatcher()) {
            expect(5)
            send("OK")
            try {
                delay(Long.MAX_VALUE)
            } catch (cancelled: CancellationException) {
                // Deliberately not rethrown: the test only records that cancellation arrived.
                expect(11)
            }
        }
        val onNextChecked = source.doOnNext { item ->
            expect(6)
            assertEquals("OK", item)
        }
        val flowable = onNextChecked.doOnCancel {
            expect(10) // the cancel notification arrives once
        }
        expect(2)
        val collector = launch(start = CoroutineStart.UNDISPATCHED) {
            expect(3)
            flowable.collect { item ->
                expect(8)
                assertEquals("OK", item)
            }
        }
        expect(4)
        yield() // hand control to the producer code
        expect(7)
        yield() // hand control to the collecting coroutine
        expect(9)
        collector.cancel()
        collector.join()
        finish(12)
    }

    /** A [TestException] thrown by the collecting lambda propagates out of `collect`. */
    @Test
    fun testFailingConsumer() = runTest {
        val publisher = rxFlowable(currentDispatcher()) {
            repeat(3) { index ->
                expect(index + 1) // checkpoints 1 and 2 *should* be reached
                send(index)
            }
        }
        try {
            publisher.collect {
                throw TestException()
            }
        } catch (expected: TestException) {
            finish(3)
        }
    }
}