1 /*
<lambda>null2  * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3  */
4 
5 @file:Suppress("INVISIBLE_REFERENCE", "INVISIBLE_MEMBER")
6 
7 package kotlinx.coroutines.rx2
8 
9 import io.reactivex.*
10 import kotlinx.coroutines.*
11 import kotlin.coroutines.*
12 import kotlin.internal.*
13 
/**
 * Creates a cold [single][Single] that runs the given [block] in a coroutine and emits its result.
 * Every time the returned single is subscribed, it starts a new coroutine.
 * Unsubscribing (disposing the subscription) cancels the running coroutine.
 * The coroutine context can be specified with the [context] argument.
 * If the context does not have any dispatcher nor any other [ContinuationInterceptor], then [Dispatchers.Default] is used.
 *
 * @param context additional coroutine context elements; must not contain a [Job].
 * @param block the suspending computation whose result is emitted by the single.
 * @return a [Single] that runs [block] for each subscription.
 * @throws IllegalArgumentException if the provided [context] contains a [Job] instance.
 */
public fun <T : Any> rxSingle(
    context: CoroutineContext = EmptyCoroutineContext,
    block: suspend CoroutineScope.() -> T
): Single<T> {
    // A Job supplied by the caller is rejected up front: the message directs users to the Disposable instead.
    require(context[Job] === null) {
        // Note the trailing space: the two literals are concatenated into a single sentence pair.
        "Single context cannot contain job in it. " +
            "Its lifecycle should be managed via Disposable handle. Had $context"
    }
    return rxSingleInternal(GlobalScope, context, block)
}
30 
/**
 * Legacy [CoroutineScope]-receiver flavour of `rxSingle`, retained only so that existing call sites
 * are reported as deprecated and redirected to the top-level function.
 *
 * The receiver scope is handed to `rxSingleInternal` together with [context] and [block];
 * unlike the top-level builder, no check for a [Job] in [context] is performed here.
 */
@Deprecated(
    message = "CoroutineScope.rxSingle is deprecated in favour of top-level rxSingle",
    replaceWith = ReplaceWith("rxSingle(context, block)"),
    level = DeprecationLevel.ERROR
) // Deprecated since 1.3.0; planned as an error in 1.3.1 (the level above is already ERROR) and hidden in 1.4.0
@LowPriorityInOverloadResolution
public fun <T : Any> CoroutineScope.rxSingle(
    context: CoroutineContext = EmptyCoroutineContext,
    block: suspend CoroutineScope.() -> T
): Single<T> {
    return rxSingleInternal(this, context, block)
}
41 
/**
 * Shared implementation behind both `rxSingle` entry points.
 *
 * For every subscription a fresh [RxSingleCoroutine] is created from [scope] combined with [context],
 * registered with the emitter as its cancellable, and then started with [block].
 *
 * @param scope the scope whose context is the basis for the new coroutine (supports the legacy scoped `rxSingle`).
 * @param context extra context elements merged in via `newCoroutineContext`.
 * @param block the suspending body run by the coroutine.
 */
private fun <T : Any> rxSingleInternal(
    scope: CoroutineScope,
    context: CoroutineContext,
    block: suspend CoroutineScope.() -> T
): Single<T> {
    return Single.create { emitter ->
        val singleCoroutine = RxSingleCoroutine(scope.newCoroutineContext(context), emitter)
        // Register the cancellable before starting the coroutine.
        // NOTE(review): RxCancellable presumably cancels the coroutine on dispose; it is defined outside this file.
        emitter.setCancellable(RxCancellable(singleCoroutine))
        singleCoroutine.start(CoroutineStart.DEFAULT, singleCoroutine, block)
    }
}
52 
/**
 * Coroutine that bridges its outcome to an RxJava [SingleEmitter]:
 * a successful result is passed to `onSuccess`, a failure/cancellation cause to `tryOnError`.
 * Anything that cannot be delivered to the emitter is routed to `handleUndeliverableException`.
 */
private class RxSingleCoroutine<T : Any>(
    parentContext: CoroutineContext,
    private val subscriber: SingleEmitter<T>
) : AbstractCoroutine<T>(parentContext, true) {

    /** Forwards [error] to `handleUndeliverableException` together with this coroutine's context. */
    private fun reportUndeliverable(error: Throwable) {
        handleUndeliverableException(error, context)
    }

    /** Emits [value] to the subscriber; an exception thrown by `onSuccess` is reported as undeliverable. */
    override fun onCompleted(value: T) {
        try {
            subscriber.onSuccess(value)
        } catch (failure: Throwable) {
            reportUndeliverable(failure)
        }
    }

    /**
     * Tries to signal [cause] to the subscriber. If the emitter does not accept it, [cause] itself is
     * reported as undeliverable; any exception thrown along the way is reported instead.
     * The [handled] flag is not consulted.
     */
    override fun onCancelled(cause: Throwable, handled: Boolean) {
        try {
            val delivered = subscriber.tryOnError(cause)
            if (delivered) return
            // Kept inside the try so that a failure while reporting is itself caught below.
            reportUndeliverable(cause)
        } catch (failure: Throwable) {
            reportUndeliverable(failure)
        }
    }
}
75