1 /*
<lambda>null2  * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3  */
4 
5 @file:Suppress("INVISIBLE_REFERENCE", "INVISIBLE_MEMBER")
6 
7 package kotlinx.coroutines.reactive
8 
9 import kotlinx.atomicfu.*
10 import kotlinx.coroutines.*
11 import kotlinx.coroutines.channels.*
12 import kotlinx.coroutines.selects.*
13 import kotlinx.coroutines.sync.*
14 import org.reactivestreams.*
15 import kotlin.coroutines.*
16 import kotlin.internal.LowPriorityInOverloadResolution
17 
/**
 * Creates cold reactive [Publisher] that runs a given [block] in a coroutine.
 * Every time the returned publisher is subscribed, it starts a new coroutine in the specified [context].
 * Coroutine emits ([Subscriber.onNext]) values with `send`, completes ([Subscriber.onComplete])
 * when the coroutine completes or channel is explicitly closed and emits error ([Subscriber.onError])
 * if coroutine throws an exception or closes channel with a cause.
 * Unsubscribing cancels running coroutine.
 *
 * Invocations of `send` are suspended appropriately when subscribers apply back-pressure and to ensure that
 * `onNext` is not invoked concurrently.
 *
 * Coroutine context can be specified with [context] argument.
 * If the context does not have any dispatcher nor any other [ContinuationInterceptor], then [Dispatchers.Default] is used.
 * Method throws [IllegalArgumentException] if provided [context] contains a [Job] instance.
 *
 * **Note: This is an experimental api.** Behaviour of publishers that work as children in a parent scope with respect
 *        to cancellation and error handling may change in the future.
 *
 * @param context additional coroutine context elements; must not contain a [Job].
 * @param block the producer code, invoked once per subscription with a [ProducerScope] receiver.
 * @return a [Publisher] that starts a fresh coroutine for every subscriber.
 * @throws IllegalArgumentException if [context] contains a [Job].
 */
@ExperimentalCoroutinesApi
public fun <T> publish(
    context: CoroutineContext = EmptyCoroutineContext,
    @BuilderInference block: suspend ProducerScope<T>.() -> Unit
): Publisher<T> {
    // The lifecycle of the coroutine is driven by the subscription, so an externally supplied Job is rejected.
    require(context[Job] === null) {
        "Publisher context cannot contain job in it. " +
            "Its lifecycle should be managed via subscription. Had $context"
    }
    return publishInternal(GlobalScope, context, DEFAULT_HANDLER, block)
}
45 
/**
 * Legacy scope-receiver variant of the publisher builder.
 * It forwards to [publishInternal], passing the receiver scope and the default exception handler.
 */
@Deprecated(
    message = "CoroutineScope.publish is deprecated in favour of top-level publish",
    level = DeprecationLevel.ERROR,
    replaceWith = ReplaceWith("publish(context, block)")
) // Since 1.3.0, will be error in 1.3.1 and hidden in 1.4.0. Binary compatibility with Spring
@LowPriorityInOverloadResolution
public fun <T> CoroutineScope.publish(
    context: CoroutineContext = EmptyCoroutineContext,
    @BuilderInference block: suspend ProducerScope<T>.() -> Unit
): Publisher<T> {
    return publishInternal(this, context, DEFAULT_HANDLER, block)
}
56 
/** @suppress For internal use from other reactive integration modules only */
@InternalCoroutinesApi
public fun <T> publishInternal(
    scope: CoroutineScope, // support for legacy publish in scope
    context: CoroutineContext,
    exceptionOnCancelHandler: (Throwable, CoroutineContext) -> Unit,
    block: suspend ProducerScope<T>.() -> Unit
): Publisher<T> = Publisher { downstream ->
    // A null subscriber must be rejected with NPE, as the specification demands
    if (downstream == null) {
        throw NullPointerException("Subscriber cannot be null")
    }
    // One coroutine per subscription; it also serves as the Subscription given to the subscriber
    val publisherCoroutine = PublisherCoroutine(
        scope.newCoroutineContext(context),
        downstream,
        exceptionOnCancelHandler
    )
    // Hand over the subscription before the coroutine starts, to avoid unnecessary suspensions
    downstream.onSubscribe(publisherCoroutine)
    publisherCoroutine.start(CoroutineStart.DEFAULT, publisherCoroutine, block)
}
72 
// Negative marker values stored in PublisherCoroutine's requested-count field once sending is over
private const val CLOSED = -1L    // closed, but onComplete/onError has not been signalled yet
private const val SIGNALLED = -2L  // onComplete/onError has already been signalled to the subscriber

// Default exceptionOnCancelHandler: everything except CancellationException goes to handleCoroutineException
private val DEFAULT_HANDLER: (Throwable, CoroutineContext) -> Unit = { error, context ->
    if (error !is CancellationException) handleCoroutineException(context, error)
}
76 
/**
 * Coroutine that serves one subscription: it is the [ProducerScope] in which the producer block runs,
 * the [SendChannel] that block sends into, and the [Subscription] passed to [subscriber].
 *
 * All `subscriber.onXXX` invocations happen while [mutex] is held. The mutex starts locked and is kept locked
 * while the requested count is zero, which is how `send` gets suspended under back-pressure.
 *
 * @param parentContext context the coroutine is created in.
 * @param subscriber the downstream subscriber that receives `onNext`/`onError`/`onComplete`.
 * @param exceptionOnCancelHandler receives a failure cause that could not be delivered to [subscriber]
 * (subscription already cancelled) or that is fatal, when it was not handled by the parent.
 */
@Suppress("CONFLICTING_JVM_DECLARATIONS", "RETURN_TYPE_MISMATCH_ON_INHERITANCE")
@InternalCoroutinesApi
public class PublisherCoroutine<in T>(
    parentContext: CoroutineContext,
    private val subscriber: Subscriber<T>,
    private val exceptionOnCancelHandler: (Throwable, CoroutineContext) -> Unit
) : AbstractCoroutine<Unit>(parentContext, true), ProducerScope<T>, Subscription, SelectClause2<T, SendChannel<T>> {
    override val channel: SendChannel<T> get() = this

    // Mutex is locked when either nRequested == 0 or while subscriber.onXXX is being invoked
    private val mutex = Mutex(locked = true)
    private val _nRequested = atomic(0L) // < 0 when closed (CLOSED or SIGNALLED)

    @Volatile
    private var cancelled = false // true when Subscription.cancel() is invoked

    override val isClosedForSend: Boolean get() = isCompleted

    // Must be a getter: a plain initializer would be evaluated once at construction, while the mutex is
    // still in its initial locked state, and would therefore report `true` forever.
    override val isFull: Boolean get() = mutex.isLocked

    /** Closing the channel is implemented as cancelling this coroutine with the given [cause]. */
    override fun close(cause: Throwable?): Boolean = cancelCoroutine(cause)

    /** Not supported by this channel implementation; always throws [UnsupportedOperationException]. */
    override fun invokeOnClose(handler: (Throwable?) -> Unit): Nothing =
        throw UnsupportedOperationException("PublisherCoroutine doesn't support invokeOnClose")

    /**
     * Tries to deliver [element] without suspension.
     *
     * @return `false` if the mutex could not be acquired immediately, `true` if the element was passed
     * to the subscriber.
     */
    override fun offer(element: T): Boolean {
        if (!mutex.tryLock()) return false
        doLockedNext(element)
        return true
    }

    /** Delivers [element] to the subscriber, suspending until the mutex can be acquired. */
    public override suspend fun send(element: T) {
        // fast-path -- try send without suspension
        if (offer(element)) return
        // slow-path does suspend
        return sendSuspend(element)
    }

    // Slow path of `send`: suspends on the mutex, then delivers the element under the lock
    private suspend fun sendSuspend(element: T) {
        mutex.lock()
        doLockedNext(element)
    }

    override val onSend: SelectClause2<T, SendChannel<T>>
        get() = this

    // registerSelectSend
    // Implemented on top of the mutex's onLock clause: once the lock is acquired the element is delivered
    // and then the user-supplied block is run.
    @Suppress("PARAMETER_NAME_CHANGED_ON_OVERRIDE")
    override fun <R> registerSelectClause2(select: SelectInstance<R>, element: T, block: suspend (SendChannel<T>) -> R) {
        mutex.onLock.registerSelectClause2(select, null) {
            doLockedNext(element)
            block(this)
        }
    }

    /*
     * This code is not trivial because of the two properties:
     * 1. It ensures conformance to the reactive specification that mandates that onXXX invocations should not
     *    be concurrent. It uses Mutex to protect all onXXX invocation and ensure conformance even when multiple
     *    coroutines are invoking `send` function.
     * 2. Normally, `onComplete/onError` notification is sent only when coroutine and all its children are complete.
     *    However, nothing prevents `publish` coroutine from leaking reference to it send channel to some
     *    globally-scoped coroutine that is invoking `send` outside of this context. Without extra precaution this may
     *    lead to `onNext` that is concurrent with `onComplete/onError`, so that is why signalling for
     *    `onComplete/onError` is also done under the same mutex.
     */

    // assert: mutex.isLocked()
    // Passes `elem` to subscriber.onNext and then decrements the requested count. The mutex is released
    // afterwards unless the count has just dropped to zero.
    private fun doLockedNext(elem: T) {
        // check if already closed for send, note that isActive becomes false as soon as cancel() is invoked,
        // because the job is cancelled, so this check also ensure conformance to the reactive specification's
        // requirement that after cancellation requested we don't call onXXX
        if (!isActive) {
            unlockAndCheckCompleted()
            throw getCancellationException()
        }
        // notify subscriber
        try {
            subscriber.onNext(elem)
        } catch (e: Throwable) {
            // If onNext fails with exception, then we cancel coroutine (with this exception) and then rethrow it
            // to abort the corresponding send/offer invocation. From the standpoint of coroutines machinery,
            // this failure is essentially equivalent to a failure of a child coroutine.
            cancelCoroutine(e)
            unlockAndCheckCompleted()
            throw e
        }
        // now update nRequested
        while (true) { // lock-free loop on nRequested
            val current = _nRequested.value
            if (current < 0) break // closed from inside onNext => unlock
            if (current == Long.MAX_VALUE) break // no back-pressure => unlock
            val updated = current - 1
            if (_nRequested.compareAndSet(current, updated)) {
                if (updated == 0L) {
                    // return to keep locked due to back-pressure
                    return
                }
                break // unlock if updated > 0
            }
        }
        unlockAndCheckCompleted()
    }

    // Releases the mutex and, if the coroutine has completed in the meantime, re-acquires it to signal completion
    private fun unlockAndCheckCompleted() {
       /*
        * There is no sense to check completion before doing `unlock`, because completion might
        * happen after this check and before `unlock` (see `signalCompleted` that does not do anything
        * if it fails to acquire the lock that we are still holding).
        * We have to recheck `isCompleted` after `unlock` anyway.
        */
        mutex.unlock()
        // check isCompleted and try to regain lock to signal completion
        if (isCompleted && mutex.tryLock()) {
            doLockedSignalCompleted(completionCause, completionCauseHandled)
        }
    }

    // assert: mutex.isLocked() & isCompleted
    // Sends the terminal signal (onError or onComplete) at most once and always releases the mutex on exit.
    private fun doLockedSignalCompleted(cause: Throwable?, handled: Boolean) {
        try {
            if (_nRequested.value >= CLOSED) {
                _nRequested.value = SIGNALLED // we'll signal onError/onCompleted (that the final state -- no CAS needed)
                // Specification requires that after cancellation requested we don't call onXXX
                if (cancelled) {
                    // If the parent had failed to handle our exception, then we must not lose this exception
                    if (cause != null && !handled) exceptionOnCancelHandler(cause, context)
                    return
                }

                try {
                    if (cause != null && cause !is CancellationException) {
                        /*
                         * Reactive frameworks have two types of exceptions: regular and fatal.
                         * Regular are passed to onError.
                         * Fatal can be passed to onError, but even the standard implementations **can just swallow it** (e.g. see #1297).
                         * Such behaviour is inconsistent, leads to silent failures and we can't possibly know whether
                         * the cause will be handled by onError (and moreover, it depends on whether a fatal exception was
                         * thrown by subscriber or upstream).
                         * To make behaviour consistent and least surprising, we always handle fatal exceptions
                         * by coroutines machinery, anyway, they should not be present in regular program flow,
                         * thus our goal here is just to expose it as soon as possible.
                         */
                        subscriber.onError(cause)
                        if (!handled && cause.isFatal()) {
                            exceptionOnCancelHandler(cause, context)
                        }
                    } else {
                        subscriber.onComplete()
                    }
                } catch (e: Throwable) {
                    // A failure thrown by the terminal callback itself is reported to the coroutine exception handler
                    handleCoroutineException(context, e)
                }
            }
        } finally {
            mutex.unlock()
        }
    }

    /**
     * [Subscription.request] implementation: adds [n] to the requested count, saturating at [Long.MAX_VALUE].
     * A non-positive [n] cancels the coroutine with an [IllegalArgumentException].
     */
    override fun request(n: Long) {
        if (n <= 0) {
            // Specification requires IAE for n <= 0
            cancelCoroutine(IllegalArgumentException("non-positive subscription request $n"))
            return
        }
        while (true) { // lock-free loop for nRequested
            val cur = _nRequested.value
            if (cur < 0) return // already closed for send, ignore requests
            var upd = cur + n
            // `upd < 0` means the addition overflowed; clamp to "unbounded"
            if (upd < 0 || n == Long.MAX_VALUE)
                upd = Long.MAX_VALUE
            if (cur == upd) return // nothing to do
            if (_nRequested.compareAndSet(cur, upd)) {
                // unlock the mutex when we don't have back-pressure anymore
                if (cur == 0L) {
                    unlockAndCheckCompleted()
                }
                return
            }
        }
    }

    // assert: isCompleted
    // Marks the requested count as CLOSED and signals completion if the mutex is (or can be) held
    private fun signalCompleted(cause: Throwable?, handled: Boolean) {
        while (true) { // lock-free loop for nRequested
            val current = _nRequested.value
            if (current == SIGNALLED) return // some other thread holding lock already signalled cancellation/completion
            check(current >= 0) // no other thread could have marked it as CLOSED, because onCompleted[Exceptionally] is invoked once
            if (!_nRequested.compareAndSet(current, CLOSED)) continue // retry on failed CAS
            // Ok -- marked as CLOSED, now can unlock the mutex if it was locked due to backpressure
            if (current == 0L) {
                doLockedSignalCompleted(cause, handled)
            } else {
                // otherwise mutex was either not locked or locked in concurrent onNext... try lock it to signal completion
                if (mutex.tryLock()) doLockedSignalCompleted(cause, handled)
                // Note: if failed `tryLock`, then `doLockedNext` will signal after performing `unlock`
            }
            return // done anyway
        }
    }

    // Normal completion of the coroutine: signal without a cause
    override fun onCompleted(value: Unit) {
        signalCompleted(null, false)
    }

    // Cancellation or failure of the coroutine: signal with the cause and whether it was already handled
    override fun onCancelled(cause: Throwable, handled: Boolean) {
        signalCompleted(cause, handled)
    }

    /** [Subscription.cancel] implementation: records the subscriber's cancellation and cancels the coroutine. */
    override fun cancel() {
        // Specification requires that after cancellation publisher stops signalling
        // This flag distinguishes subscription cancellation request from the job crash
        cancelled = true
        super.cancel(null)
    }

    // Errors of these types are additionally passed to exceptionOnCancelHandler after onError (when not handled)
    private fun Throwable.isFatal() = this is VirtualMachineError || this is ThreadDeath || this is LinkageError
}
292