1 /*
<lambda>null2  * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3  */
4 
5 package kotlinx.coroutines.reactive
6 
7 import kotlinx.atomicfu.*
8 import kotlinx.coroutines.channels.*
9 import kotlinx.coroutines.flow.*
10 import kotlinx.coroutines.internal.*
11 import org.reactivestreams.*
12 
13 /**
14  * Subscribes to this [Publisher] and returns a channel to receive elements emitted by it.
15  * The resulting channel shall be [cancelled][ReceiveChannel.cancel] to unsubscribe from this publisher.
16 
17  * @param request how many items to request from publisher in advance (optional, one by default).
18  *
19  * This method is deprecated in the favor of [Flow].
20  * Instead of iterating over the resulting channel please use [collect][Flow.collect]:
21  * ```
22  * asFlow().collect { value ->
23  *     // process value
24  * }
25  * ```
26  */
27 @Deprecated(
28     message = "Transforming publisher to channel is deprecated, use asFlow() instead",
29     level = DeprecationLevel.WARNING) // Will be error in 1.4
30 public fun <T> Publisher<T>.openSubscription(request: Int = 1): ReceiveChannel<T> {
31     val channel = SubscriptionChannel<T>(request)
32     subscribe(channel)
33     return channel
34 }
35 
36 // Will be promoted to error in 1.3.0, removed in 1.4.0
37 @Deprecated(message = "Use collect instead", level = DeprecationLevel.ERROR, replaceWith = ReplaceWith("this.collect(action)"))
consumeEachnull38 public suspend inline fun <T> Publisher<T>.consumeEach(action: (T) -> Unit): Unit =
39     openSubscription().consumeEach(action)
40 
41 /**
42  * Subscribes to this [Publisher] and performs the specified action for each received element.
43  * Cancels subscription if any exception happens during collect.
44  */
45 public suspend inline fun <T> Publisher<T>.collect(action: (T) -> Unit): Unit =
46     openSubscription().consumeEach(action)
47 
48 @Suppress("INVISIBLE_REFERENCE", "INVISIBLE_MEMBER", "SubscriberImplementation")
49 private class SubscriptionChannel<T>(
50     private val request: Int
51 ) : LinkedListChannel<T>(null), Subscriber<T> {
52     init {
53         require(request >= 0) { "Invalid request size: $request" }
54     }
55 
56     private val _subscription = atomic<Subscription?>(null)
57 
58     // requested from subscription minus number of received minus number of enqueued receivers,
59     // can be negative if we have receivers, but no subscription yet
60     private val _requested = atomic(0)
61 
62     // AbstractChannel overrides
63     @Suppress("CANNOT_OVERRIDE_INVISIBLE_MEMBER")
64     override fun onReceiveEnqueued() {
65         _requested.loop { wasRequested ->
66             val subscription = _subscription.value
67             val needRequested = wasRequested - 1
68             if (subscription != null && needRequested < 0) { // need to request more from subscription
69                 // try to fixup by making request
70                 if (wasRequested != request && !_requested.compareAndSet(wasRequested, request))
71                     return@loop // continue looping if failed
72                 subscription.request((request - needRequested).toLong())
73                 return
74             }
75             // just do book-keeping
76             if (_requested.compareAndSet(wasRequested, needRequested)) return
77         }
78     }
79 
80     @Suppress("CANNOT_OVERRIDE_INVISIBLE_MEMBER")
81     override fun onReceiveDequeued() {
82         _requested.incrementAndGet()
83     }
84 
85     @Suppress("CANNOT_OVERRIDE_INVISIBLE_MEMBER")
86     override fun onClosedIdempotent(closed: LockFreeLinkedListNode) {
87         _subscription.getAndSet(null)?.cancel() // cancel exactly once
88     }
89 
90     // Subscriber overrides
91     override fun onSubscribe(s: Subscription) {
92         _subscription.value = s
93         while (true) { // lock-free loop on _requested
94             if (isClosedForSend) {
95                 s.cancel()
96                 return
97             }
98             val wasRequested = _requested.value
99             if (wasRequested >= request) return // ok -- normal story
100             // otherwise, receivers came before we had subscription or need to make initial request
101             // try to fixup by making request
102             if (!_requested.compareAndSet(wasRequested, request)) continue
103             s.request((request - wasRequested).toLong())
104             return
105         }
106     }
107 
108     override fun onNext(t: T) {
109         _requested.decrementAndGet()
110         offer(t)
111     }
112 
113     override fun onComplete() {
114         close(cause = null)
115     }
116 
117     override fun onError(e: Throwable) {
118         close(cause = e)
119     }
120 }
121 
122