1 /* <lambda>null2 * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. 3 */ 4 5 package kotlinx.coroutines.rx3 6 7 import kotlinx.coroutines.* 8 import kotlinx.coroutines.channels.* 9 import kotlinx.coroutines.flow.consumeAsFlow 10 import org.junit.Assert 11 import org.junit.Test 12 import kotlin.test.* 13 14 class ConvertTest : TestBase() { 15 @Test 16 fun testToCompletableSuccess() = runBlocking { 17 expect(1) 18 val job = launch { 19 expect(3) 20 } 21 val completable = job.asCompletable(coroutineContext.minusKey(Job)) 22 completable.subscribe { 23 expect(4) 24 } 25 expect(2) 26 yield() 27 finish(5) 28 } 29 30 @Test 31 fun testToCompletableFail() = runBlocking { 32 expect(1) 33 val job = async(NonCancellable) { // don't kill parent on exception 34 expect(3) 35 throw RuntimeException("OK") 36 } 37 val completable = job.asCompletable(coroutineContext.minusKey(Job)) 38 completable.subscribe { 39 expect(4) 40 } 41 expect(2) 42 yield() 43 finish(5) 44 } 45 46 @Test 47 fun testToMaybe() { 48 val d = GlobalScope.async { 49 delay(50) 50 "OK" 51 } 52 val maybe1 = d.asMaybe(Dispatchers.Unconfined) 53 checkMaybeValue(maybe1) { 54 assertEquals("OK", it) 55 } 56 val maybe2 = d.asMaybe(Dispatchers.Unconfined) 57 checkMaybeValue(maybe2) { 58 assertEquals("OK", it) 59 } 60 } 61 62 @Test 63 fun testToMaybeEmpty() { 64 val d = GlobalScope.async { 65 delay(50) 66 null 67 } 68 val maybe1 = d.asMaybe(Dispatchers.Unconfined) 69 checkMaybeValue(maybe1, Assert::assertNull) 70 val maybe2 = d.asMaybe(Dispatchers.Unconfined) 71 checkMaybeValue(maybe2, Assert::assertNull) 72 } 73 74 @Test 75 fun testToMaybeFail() { 76 val d = GlobalScope.async { 77 delay(50) 78 throw TestRuntimeException("OK") 79 } 80 val maybe1 = d.asMaybe(Dispatchers.Unconfined) 81 checkErroneous(maybe1) { 82 check(it is TestRuntimeException && it.message == "OK") { "$it" } 83 } 84 val maybe2 = d.asMaybe(Dispatchers.Unconfined) 85 checkErroneous(maybe2) { 86 check(it is TestRuntimeException && it.message == "OK") { "$it" } 87 } 88 } 89 90 @Test 91 fun testToSingle() { 92 val d = GlobalScope.async { 93 delay(50) 94 "OK" 95 } 96 val single1 = d.asSingle(Dispatchers.Unconfined) 97 checkSingleValue(single1) { 98 assertEquals("OK", it) 99 } 100 val single2 = d.asSingle(Dispatchers.Unconfined) 101 checkSingleValue(single2) { 102 assertEquals("OK", it) 103 } 104 } 105 106 @Test 107 fun testToSingleFail() { 108 val d = GlobalScope.async { 109 delay(50) 110 throw TestRuntimeException("OK") 111 } 112 val single1 = d.asSingle(Dispatchers.Unconfined) 113 checkErroneous(single1) { 114 check(it is TestRuntimeException && it.message == "OK") { "$it" } 115 } 116 val single2 = d.asSingle(Dispatchers.Unconfined) 117 checkErroneous(single2) { 118 check(it is TestRuntimeException && it.message == "OK") { "$it" } 119 } 120 } 121 122 @Test 123 fun testToObservable() { 124 val c = GlobalScope.produce { 125 delay(50) 126 send("O") 127 delay(50) 128 send("K") 129 } 130 val observable = c.consumeAsFlow().asObservable() 131 checkSingleValue(observable.reduce { t1, t2 -> t1 + t2 }.toSingle()) { 132 assertEquals("OK", it) 133 } 134 } 135 136 @Test 137 fun testToObservableFail() { 138 val c = GlobalScope.produce { 139 delay(50) 140 send("O") 141 delay(50) 142 throw TestException("K") 143 } 144 val observable = c.consumeAsFlow().asObservable() 145 val single = rxSingle(Dispatchers.Unconfined) { 146 var result = "" 147 try { 148 observable.collect { result += it } 149 } catch(e: Throwable) { 150 check(e is TestException) 151 result += e.message 152 } 153 result 154 } 155 checkSingleValue(single) { 156 assertEquals("OK", it) 157 } 158 } 159 } 160