1 /*
2  * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3  */
4 
5 package kotlinx.coroutines.reactor
6 
7 import kotlinx.coroutines.*
8 import kotlinx.coroutines.channels.*
9 import reactor.core.publisher.*
10 import kotlin.coroutines.*
11 
12 /**
13  * Converts this job to the hot reactive mono that signals
14  * with [success][MonoSink.success] when the corresponding job completes.
15  *
16  * Every subscriber gets the signal at the same time.
17  * Unsubscribing from the resulting mono **does not** affect the original job in any way.
18  *
19  * **Note: This is an experimental api.** Conversion of coroutines primitives to reactive entities may change
20  *    in the future to account for the concept of structured concurrency.
21  *
22  * @param context -- the coroutine context from which the resulting mono is going to be signalled
23  */
24 @ExperimentalCoroutinesApi
<lambda>null25 public fun Job.asMono(context: CoroutineContext): Mono<Unit> = mono(context) { this@asMono.join() }
26 /**
27  * Converts this deferred value to the hot reactive mono that signals
28  * [success][MonoSink.success] or [error][MonoSink.error].
29  *
30  * Every subscriber gets the same completion value.
31  * Unsubscribing from the resulting mono **does not** affect the original deferred value in any way.
32  *
33  * **Note: This is an experimental api.** Conversion of coroutines primitives to reactive entities may change
34  *    in the future to account for the concept of structured concurrency.
35  *
36  * @param context -- the coroutine context from which the resulting mono is going to be signalled
37  */
38 @ExperimentalCoroutinesApi
<lambda>null39 public fun <T> Deferred<T?>.asMono(context: CoroutineContext): Mono<T> = mono(context) { this@asMono.await() }
40 
41 /**
42  * Converts a stream of elements received from the channel to the hot reactive flux.
43  *
44  * Every subscriber receives values from this channel in **fan-out** fashion. If the are multiple subscribers,
45  * they'll receive values in round-robin way.
46  * @param context -- the coroutine context from which the resulting flux is going to be signalled
47  */
48 @Deprecated(message = "Deprecated in the favour of consumeAsFlow()",
49     level = DeprecationLevel.WARNING,
50     replaceWith = ReplaceWith("this.consumeAsFlow().asFlux()"))
<lambda>null51 public fun <T> ReceiveChannel<T>.asFlux(context: CoroutineContext = EmptyCoroutineContext): Flux<T> = flux(context) {
52     for (t in this@asFlux)
53         send(t)
54 }
55