/*
 * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
 */

package kotlinx.coroutines.rx2

import kotlinx.coroutines.*
import kotlinx.coroutines.reactive.*
import org.junit.*
import org.junit.Test
import kotlin.test.*

/**
 * Tests for flowables created with [rxFlowable].
 *
 * Every test is written as a numbered script: the `expect(n)` / `finish(n)` calls mark the order
 * in which the test body, the producer block and the subscriber callbacks are meant to run.
 */
class FlowableTest : TestBase() {
    /**
     * A value sent from the producer block reaches the subscriber.
     * The producer block (step 4) runs only after the test body yields.
     */
    @Test
    fun testBasicSuccess() = runBlocking {
        expect(1)
        val flowable = rxFlowable(currentDispatcher()) {
            expect(4)
            send("OK")
        }
        expect(2)
        flowable.subscribe { item ->
            expect(5)
            assertEquals("OK", item)
        }
        expect(3)
        yield() // let the producer coroutine run
        finish(6)
    }

    /**
     * An exception thrown inside the producer block is delivered to the error callback;
     * the value callback must never be invoked.
     */
    @Test
    fun testBasicFailure() = runBlocking {
        expect(1)
        val flowable = rxFlowable<String>(currentDispatcher()) {
            expect(4)
            throw RuntimeException("OK")
        }
        expect(2)
        flowable.subscribe(
            { expectUnreached() },
            { failure ->
                expect(5)
                assertTrue(failure is RuntimeException)
                assertEquals("OK", failure.message)
            }
        )
        expect(3)
        yield() // let the producer coroutine run
        finish(6)
    }

    /**
     * Disposing the subscription while the producer is suspended cancels it:
     * the code after the producer's `yield()` and both callbacks are marked as unreachable.
     */
    @Test
    fun testBasicUnsubscribe() = runBlocking {
        expect(1)
        val flowable = rxFlowable<String>(currentDispatcher()) {
            expect(4)
            yield() // hands control back to the test body; the producer is cancelled before resuming
            expectUnreached()
        }
        expect(2)
        val subscription = flowable.subscribe(
            { expectUnreached() },
            { expectUnreached() }
        )
        expect(3)
        yield() // let the producer coroutine run
        expect(5)
        subscription.dispose() // cancels the producer coroutine
        yield()
        finish(6)
    }

    /**
     * Cancelling the collecting job triggers `doOnCancel` exactly once (step 10) and then
     * cancels the producer, which observes a [CancellationException] (step 11).
     */
    @Test
    fun testNotifyOnceOnCancellation() = runTest {
        expect(1)
        val producer = rxFlowable(currentDispatcher()) {
            expect(5)
            send("OK")
            try {
                delay(Long.MAX_VALUE)
            } catch (cancelled: CancellationException) {
                // Deliberately not rethrown: the block ends right here anyway.
                expect(11)
            }
        }
        val observable = producer
            .doOnNext { item ->
                expect(6)
                assertEquals("OK", item)
            }
            .doOnCancel {
                expect(10) // must be reported a single time
            }
        expect(2)
        val consumer = launch(start = CoroutineStart.UNDISPATCHED) {
            expect(3)
            observable.collect { item ->
                expect(8)
                assertEquals("OK", item)
            }
        }
        expect(4)
        yield() // let the producer code run
        expect(7)
        yield() // let the consuming coroutine run
        expect(9)
        consumer.cancel()
        consumer.join()
        finish(12)
    }

    /**
     * A consumer that throws from `collect` gets its own exception back in the calling coroutine.
     */
    @Test
    fun testFailingConsumer() = runTest {
        val publisher = rxFlowable(currentDispatcher()) {
            for (index in 0 until 3) {
                expect(index + 1) // only expect(1) and expect(2) are meant to be reached
                send(index)
            }
        }
        try {
            publisher.collect {
                throw TestException()
            }
        } catch (e: TestException) {
            finish(3)
        }
    }
}