1 /*
2  * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3  */
4 
5 package kotlinx.coroutines.rx3
6 
7 import io.reactivex.rxjava3.core.*
8 import kotlinx.coroutines.*
9 import kotlinx.coroutines.flow.*
10 import kotlinx.coroutines.reactive.*
11 import org.junit.*
12 import org.junit.Test
13 import kotlin.test.*
14 
/**
 * Verifies which threads are involved when an Rx [Flowable] is exposed as a flow
 * and its upstream is shifted onto a dedicated single-threaded dispatcher with `flowOn`.
 */
class FlowableContextTest : TestBase() {
    // Dedicated single-threaded context; released after every test in [tearDown].
    private val dispatcher = newSingleThreadContext("FlowableContextTest")

    /** Closes the single-threaded [dispatcher] once a test has finished. */
    @After
    fun tearDown() {
        dispatcher.close()
    }

    /**
     * Asserts that the `Flowable.create` source callback runs on the [dispatcher] thread,
     * while the collector keeps running on the thread that started the test body.
     */
    @Test
    fun testFlowableCreateAsFlowThread() = runTest {
        expect(1)
        val callerThread = Thread.currentThread()
        // Hop onto the dispatcher once just to capture the identity of its thread.
        val workerThread = withContext(dispatcher) { Thread.currentThread() }
        assertTrue(workerThread != callerThread)

        val source = Flowable.create<String>({ emitter ->
            // The source callback is expected to be invoked on the dispatcher's thread.
            assertEquals(workerThread, Thread.currentThread())
            emitter.onNext("OK")
            emitter.onComplete()
        }, BackpressureStrategy.BUFFER)

        source
            .asFlow()
            .flowOn(dispatcher)
            .collect { received ->
                expect(2)
                assertEquals("OK", received)
                // Collection itself must remain on the original caller thread.
                assertEquals(callerThread, Thread.currentThread())
            }
        finish(3)
    }
}
44