1 /*
<lambda>null2  * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3  */
4 
5 package kotlinx.coroutines.rx3
6 
7 import kotlinx.coroutines.*
8 import kotlinx.coroutines.channels.*
9 import kotlinx.coroutines.flow.consumeAsFlow
10 import org.junit.Assert
11 import org.junit.Test
12 import kotlin.test.*
13 
14 class ConvertTest : TestBase() {
15     @Test
16     fun testToCompletableSuccess() = runBlocking {
17         expect(1)
18         val job = launch {
19             expect(3)
20         }
21         val completable = job.asCompletable(coroutineContext.minusKey(Job))
22         completable.subscribe {
23             expect(4)
24         }
25         expect(2)
26         yield()
27         finish(5)
28     }
29 
30     @Test
31     fun testToCompletableFail() = runBlocking {
32         expect(1)
33         val job = async(NonCancellable) { // don't kill parent on exception
34             expect(3)
35             throw RuntimeException("OK")
36         }
37         val completable = job.asCompletable(coroutineContext.minusKey(Job))
38         completable.subscribe {
39             expect(4)
40         }
41         expect(2)
42         yield()
43         finish(5)
44     }
45 
46     @Test
47     fun testToMaybe() {
48         val d = GlobalScope.async {
49             delay(50)
50             "OK"
51         }
52         val maybe1 = d.asMaybe(Dispatchers.Unconfined)
53         checkMaybeValue(maybe1) {
54             assertEquals("OK", it)
55         }
56         val maybe2 = d.asMaybe(Dispatchers.Unconfined)
57         checkMaybeValue(maybe2) {
58             assertEquals("OK", it)
59         }
60     }
61 
62     @Test
63     fun testToMaybeEmpty() {
64         val d = GlobalScope.async {
65             delay(50)
66             null
67         }
68         val maybe1 = d.asMaybe(Dispatchers.Unconfined)
69         checkMaybeValue(maybe1, Assert::assertNull)
70         val maybe2 = d.asMaybe(Dispatchers.Unconfined)
71         checkMaybeValue(maybe2, Assert::assertNull)
72     }
73 
74     @Test
75     fun testToMaybeFail() {
76         val d = GlobalScope.async {
77             delay(50)
78             throw TestRuntimeException("OK")
79         }
80         val maybe1 = d.asMaybe(Dispatchers.Unconfined)
81         checkErroneous(maybe1) {
82             check(it is TestRuntimeException && it.message == "OK") { "$it" }
83         }
84         val maybe2 = d.asMaybe(Dispatchers.Unconfined)
85         checkErroneous(maybe2) {
86             check(it is TestRuntimeException && it.message == "OK") { "$it" }
87         }
88     }
89 
90     @Test
91     fun testToSingle() {
92         val d = GlobalScope.async {
93             delay(50)
94             "OK"
95         }
96         val single1 = d.asSingle(Dispatchers.Unconfined)
97         checkSingleValue(single1) {
98             assertEquals("OK", it)
99         }
100         val single2 = d.asSingle(Dispatchers.Unconfined)
101         checkSingleValue(single2) {
102             assertEquals("OK", it)
103         }
104     }
105 
106     @Test
107     fun testToSingleFail() {
108         val d = GlobalScope.async {
109             delay(50)
110             throw TestRuntimeException("OK")
111         }
112         val single1 = d.asSingle(Dispatchers.Unconfined)
113         checkErroneous(single1) {
114             check(it is TestRuntimeException && it.message == "OK") { "$it" }
115         }
116         val single2 = d.asSingle(Dispatchers.Unconfined)
117         checkErroneous(single2) {
118             check(it is TestRuntimeException && it.message == "OK") { "$it" }
119         }
120     }
121 
122     @Test
123     fun testToObservable() {
124         val c = GlobalScope.produce {
125             delay(50)
126             send("O")
127             delay(50)
128             send("K")
129         }
130         val observable = c.consumeAsFlow().asObservable()
131         checkSingleValue(observable.reduce { t1, t2 -> t1 + t2 }.toSingle()) {
132             assertEquals("OK", it)
133         }
134     }
135 
136     @Test
137     fun testToObservableFail() {
138         val c = GlobalScope.produce {
139             delay(50)
140             send("O")
141             delay(50)
142             throw TestException("K")
143         }
144         val observable = c.consumeAsFlow().asObservable()
145         val single = rxSingle(Dispatchers.Unconfined) {
146             var result = ""
147             try {
148                 observable.collect { result += it }
149             } catch(e: Throwable) {
150                 check(e is TestException)
151                 result += e.message
152             }
153             result
154         }
155         checkSingleValue(single) {
156             assertEquals("OK", it)
157         }
158     }
159 }
160