/*
 * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
 */
4
5 package kotlinx.coroutines.reactive
6
7 import kotlinx.atomicfu.*
8 import kotlinx.coroutines.channels.*
9 import kotlinx.coroutines.flow.*
10 import kotlinx.coroutines.internal.*
11 import org.reactivestreams.*
12
/**
 * Subscribes to this [Publisher] and hands back a channel through which the emitted elements are received.
 * To unsubscribe from the publisher, [cancel][ReceiveChannel.cancel] the returned channel.
 *
 * @param request how many items to request from the publisher in advance (optional, one by default).
 *
 * This method is deprecated in favor of [Flow].
 * Rather than iterating over the resulting channel, use [collect][Flow.collect]:
 * ```
 * asFlow().collect { value ->
 *     // process value
 * }
 * ```
 */
@Deprecated(
    message = "Transforming publisher to channel is deprecated, use asFlow() instead",
    level = DeprecationLevel.WARNING
) // Will be error in 1.4
public fun <T> Publisher<T>.openSubscription(request: Int = 1): ReceiveChannel<T> =
    // Build the channel first (this validates `request`), then register it as the subscriber.
    SubscriptionChannel<T>(request).also { subscriber -> subscribe(subscriber) }
35
// Already deprecated at ERROR level; the original note schedules removal for 1.4.0.
// Kept only so that existing call sites get the `collect` replacement suggested to them.
@Deprecated(message = "Use collect instead", level = DeprecationLevel.ERROR, replaceWith = ReplaceWith("this.collect(action)"))
public suspend inline fun <T> Publisher<T>.consumeEach(action: (T) -> Unit) {
    openSubscription().consumeEach(action)
}
40
/**
 * Subscribes to this [Publisher] and invokes [action] for every element it delivers.
 * If any exception is thrown while collecting, the subscription is cancelled.
 */
public suspend inline fun <T> Publisher<T>.collect(action: (T) -> Unit) {
    // Open a channel over the publisher with the default request size and drain it into `action`.
    val subscriptionChannel = openSubscription()
    subscriptionChannel.consumeEach(action)
}
47
/**
 * Channel that is simultaneously the [Subscriber] registered with a publisher and the
 * receive side handed to the consumer: elements delivered through [onNext] are offered into the
 * channel, completion and errors close it, and closing the channel cancels the subscription.
 *
 * Demand is driven by receivers: the [_requested] counter tracks how far ahead of the receivers
 * the subscription has been asked to produce, and it is topped up to [request] whenever a receiver
 * would otherwise be left without outstanding demand.
 *
 * @param request how many items to keep requested from the subscription in advance.
 * @throws IllegalArgumentException if [request] is negative.
 */
@Suppress("INVISIBLE_REFERENCE", "INVISIBLE_MEMBER", "SubscriberImplementation")
private class SubscriptionChannel<T>(
    private val request: Int
) : LinkedListChannel<T>(null), Subscriber<T> {
    init {
        require(request >= 0) { "Invalid request size: $request" }
    }

    // The subscription received in onSubscribe; reset to null when the channel is closed.
    private val _subscription = atomic<Subscription?>(null)

    // requested from subscription minus number of received minus number of enqueued receivers,
    // can be negative if we have receivers, but no subscription yet
    private val _requested = atomic(0)

    // AbstractChannel overrides

    /**
     * Accounts for one more enqueued receiver. When a subscription is present and the counter
     * would drop below zero, resets the counter to [request] and asks the subscription for the
     * deficit plus [request] more items; otherwise only decrements the counter.
     */
    @Suppress("CANNOT_OVERRIDE_INVISIBLE_MEMBER")
    override fun onReceiveEnqueued() {
        _requested.loop { wasRequested ->
            val subscription = _subscription.value
            val needRequested = wasRequested - 1
            if (subscription != null && needRequested < 0) { // need to request more from subscription
                // try to fixup by making request
                // (the CAS is skipped when the counter already equals `request`, i.e. both are zero)
                if (wasRequested != request && !_requested.compareAndSet(wasRequested, request))
                    return@loop // continue looping if failed
                // needRequested is negative here, so this asks for `request` plus the shortfall
                subscription.request((request - needRequested).toLong())
                return
            }
            // just do book-keeping
            if (_requested.compareAndSet(wasRequested, needRequested)) return
        }
    }

    /** Undoes the book-keeping of [onReceiveEnqueued] for a receiver that left the queue. */
    @Suppress("CANNOT_OVERRIDE_INVISIBLE_MEMBER")
    override fun onReceiveDequeued() {
        _requested.incrementAndGet()
    }

    /** Cancels the current subscription, if any, when the channel gets closed. */
    @Suppress("CANNOT_OVERRIDE_INVISIBLE_MEMBER")
    override fun onClosedIdempotent(closed: LockFreeLinkedListNode) {
        _subscription.getAndSet(null)?.cancel() // cancel exactly once
    }

    // Subscriber overrides

    /**
     * Stores the subscription and issues the initial request, covering any receivers that were
     * enqueued before the subscription arrived. If the channel is already closed the subscription
     * is cancelled instead.
     *
     * A subscription signalled while another one is still held is cancelled right away and
     * otherwise ignored, as required by Reactive Streams rule 2.5; previously it silently replaced
     * the held subscription, which then could never be cancelled.
     */
    override fun onSubscribe(s: Subscription) {
        if (!_subscription.compareAndSet(null, s)) {
            // Already subscribed: keep the existing subscription and reject the redundant one.
            s.cancel()
            return
        }
        while (true) { // lock-free loop on _requested
            if (isClosedForSend) {
                s.cancel()
                return
            }
            val wasRequested = _requested.value
            if (wasRequested >= request) return // ok -- normal story
            // otherwise, receivers came before we had subscription or need to make initial request
            // try to fixup by making request
            if (!_requested.compareAndSet(wasRequested, request)) continue
            s.request((request - wasRequested).toLong())
            return
        }
    }

    /** Consumes one unit of outstanding demand and passes the element on to the channel. */
    override fun onNext(t: T) {
        _requested.decrementAndGet()
        offer(t) // the Boolean result of offer is intentionally not inspected
    }

    /** Closes the channel without a cause once the publisher completes. */
    override fun onComplete() {
        close(cause = null)
    }

    /** Closes the channel with [e] as the cause when the publisher signals an error. */
    override fun onError(e: Throwable) {
        close(cause = e)
    }
}
121
122