1 /*
2  * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3  */
4 
5 package kotlinx.coroutines.reactive
6 
7 import kotlinx.coroutines.*
8 import org.junit.*
9 import org.reactivestreams.*
10 
/**
 * Exercises a `publish` coroutine whose subscriber stops signalling demand and then
 * cancels the subscription while the coroutine is still trying to send.
 */
class PublisherBackpressureTest : TestBase() {
    /**
     * The subscriber requests exactly two items, the publisher attempts to send three, and the
     * subscription is cancelled before any further demand is signalled.
     *
     * The numbered `expect` / `finish` calls are not defined in this file (presumably inherited
     * from [TestBase] and used to assert execution order — confirm there). Read in ascending
     * order, they describe the intended interleaving of the test body, the publisher coroutine
     * and the subscriber callbacks.
     */
    @Test
    fun testCancelWhileBPSuspended() = runBlocking {
        expect(1)
        val publisher = publish(currentDispatcher()) {
            expect(5)
            send("A") // first requested item: demand is available, so no suspension is expected
            expect(7)
            send("B") // second (and last) requested item
            expect(9)
            try {
                send("C") // no demand left, so this is expected to suspend
            } finally {
                // Expected to run only after the test body has cancelled the subscription.
                expect(12)
            }
            // "C" must never be delivered, so the coroutine must not get past the send above.
            expectUnreached()
        }
        expect(2)
        // Filled in by onSubscribe so the test body can cancel the subscription later.
        var subscription: Subscription? = null
        val subscriber = object : Subscriber<String> {
            override fun onSubscribe(s: Subscription) {
                subscription = s
                expect(3)
                s.request(2) // demand for exactly two items
            }

            override fun onNext(item: String) {
                // Only the two requested items may arrive, each at its own step.
                val step = when (item) {
                    "A" -> 6
                    "B" -> 8
                    else -> error("Should not happen")
                }
                expect(step)
            }

            override fun onComplete() {
                // A cancelled subscription must not be signalled completion.
                expectUnreached()
            }

            override fun onError(cause: Throwable) {
                // A cancelled subscription must not be signalled an error either.
                expectUnreached()
            }
        }
        publisher.subscribe(subscriber)
        expect(4)
        yield() // hand control to the publisher coroutine
        expect(10)
        subscription!!.cancel() // unsubscribe: shall cancel the coroutine without signalling the subscriber
        expect(11)
        yield() // lets the coroutine run its finally block
        finish(13)
    }
}