/*
 * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
 */
4
5 package kotlinx.coroutines.rx2
6
7 import io.reactivex.*
8 import io.reactivex.disposables.Disposable
9 import kotlinx.coroutines.CancellableContinuation
10 import kotlinx.coroutines.CancellationException
11 import kotlinx.coroutines.Job
12 import kotlinx.coroutines.suspendCancellableCoroutine
13 import kotlin.coroutines.*
14
15 // ------------------------ CompletableSource ------------------------
16
/**
 * Suspends the calling coroutine until this completable terminates, without blocking a thread.
 * Returns `Unit` on completion, or rethrows the error the completable signalled.
 *
 * This suspending function is cancellable. If the [Job] of the calling coroutine is cancelled or completed while
 * this function is suspended, it immediately resumes with [CancellationException] and the subscription handed to
 * `onSubscribe` is disposed.
 */
public suspend fun CompletableSource.await(): Unit = suspendCancellableCoroutine { continuation ->
    val observer = object : CompletableObserver {
        // Tie the subscription's lifetime to the continuation: cancelling the coroutine disposes it.
        override fun onSubscribe(d: Disposable) = continuation.disposeOnCancellation(d)

        override fun onComplete() = continuation.resume(Unit)

        override fun onError(e: Throwable) = continuation.resumeWithException(e)
    }
    subscribe(observer)
}
31
32 // ------------------------ MaybeSource ------------------------
33
/**
 * Suspends the calling coroutine until this maybe terminates, without blocking a thread.
 * Returns the produced value, `null` if the maybe completed without a value, or rethrows the error the maybe
 * signalled.
 *
 * This suspending function is cancellable.
 * If the [Job] of the current coroutine is cancelled or completed while this function is waiting, it
 * immediately resumes with [CancellationException].
 */
@Suppress("UNCHECKED_CAST")
public suspend fun <T> MaybeSource<T>.await(): T? {
    // View the source as producing T? so that `null` is an acceptable default for awaitOrDefault.
    val nullableSource = this as MaybeSource<T?>
    return nullableSource.awaitOrDefault(null)
}
45
/**
 * Suspends the calling coroutine until this maybe terminates, without blocking a thread.
 * Returns the produced value, [default] if the maybe completed without a value, or rethrows the error the maybe
 * signalled.
 *
 * This suspending function is cancellable.
 * If the [Job] of the current coroutine is cancelled or completed while this function is waiting, it
 * immediately resumes with [CancellationException] and the subscription handed to `onSubscribe` is disposed.
 */
public suspend fun <T> MaybeSource<T>.awaitOrDefault(default: T): T = suspendCancellableCoroutine { continuation ->
    val observer = object : MaybeObserver<T> {
        // Tie the subscription's lifetime to the continuation: cancelling the coroutine disposes it.
        override fun onSubscribe(d: Disposable) = continuation.disposeOnCancellation(d)

        // Completion without a value: fall back to the caller-supplied default.
        override fun onComplete() = continuation.resume(default)

        override fun onSuccess(t: T) = continuation.resume(t)

        override fun onError(error: Throwable) = continuation.resumeWithException(error)
    }
    subscribe(observer)
}
63
64 // ------------------------ SingleSource ------------------------
65
/**
 * Suspends the calling coroutine until this single terminates, without blocking a thread.
 * Returns the produced value, or rethrows the error the single signalled.
 *
 * This suspending function is cancellable.
 * If the [Job] of the current coroutine is cancelled or completed while this function is waiting, it
 * immediately resumes with [CancellationException] and the subscription handed to `onSubscribe` is disposed.
 */
public suspend fun <T> SingleSource<T>.await(): T = suspendCancellableCoroutine { continuation ->
    val observer = object : SingleObserver<T> {
        // Tie the subscription's lifetime to the continuation: cancelling the coroutine disposes it.
        override fun onSubscribe(d: Disposable) = continuation.disposeOnCancellation(d)

        override fun onSuccess(t: T) = continuation.resume(t)

        override fun onError(error: Throwable) = continuation.resumeWithException(error)
    }
    subscribe(observer)
}
81
82 // ------------------------ ObservableSource ------------------------
83
/**
 * Suspends until the given observable emits its first value, without blocking a thread, and returns that value.
 * Rethrows the error if the observable signals one before emitting.
 *
 * This suspending function is cancellable.
 * If the [Job] of the current coroutine is cancelled or completed while this function is waiting, it
 * immediately resumes with [CancellationException].
 *
 * @throws NoSuchElementException if the observable completes without emitting any value
 */
public suspend fun <T> ObservableSource<T>.awaitFirst(): T {
    return awaitOne(mode = Mode.FIRST)
}
95
/**
 * Suspends until the given observable emits its first value, without blocking a thread, and returns that value.
 * Returns [default] if the observable completes without emitting anything, and rethrows the error if the
 * observable signals one before emitting.
 *
 * This suspending function is cancellable.
 * If the [Job] of the current coroutine is cancelled or completed while this function is waiting, it
 * immediately resumes with [CancellationException].
 */
public suspend fun <T> ObservableSource<T>.awaitFirstOrDefault(default: T): T {
    return awaitOne(mode = Mode.FIRST_OR_DEFAULT, default = default)
}
105
/**
 * Suspends until the given observable emits its first value, without blocking a thread, and returns that value.
 * Returns `null` if the observable completes without emitting anything, and rethrows the error if the
 * observable signals one before emitting.
 *
 * This suspending function is cancellable.
 * If the [Job] of the current coroutine is cancelled or completed while this function is waiting, it
 * immediately resumes with [CancellationException].
 */
public suspend fun <T> ObservableSource<T>.awaitFirstOrNull(): T? {
    // No explicit default is needed: awaitOne's `default` parameter is already `null` unless overridden.
    return awaitOne(mode = Mode.FIRST_OR_DEFAULT)
}
115
/**
 * Suspends until the given observable emits its first value, without blocking a thread, and returns that value.
 * If the observable completes without emitting anything, calls [defaultValue] and returns its result instead.
 * Rethrows the error if the observable signals one before emitting.
 *
 * This suspending function is cancellable.
 * If the [Job] of the current coroutine is cancelled or completed while this function is waiting, it
 * immediately resumes with [CancellationException].
 */
public suspend fun <T> ObservableSource<T>.awaitFirstOrElse(defaultValue: () -> T): T {
    // An empty observable yields awaitOne's `null` default, which is what triggers the fallback below.
    val firstOrNull: T? = awaitOne(mode = Mode.FIRST_OR_DEFAULT)
    return firstOrNull ?: defaultValue()
}
125
/**
 * Suspends until the given observable completes, without blocking a thread, and returns the last value it
 * emitted. Rethrows the error if the observable signals one.
 *
 * This suspending function is cancellable.
 * If the [Job] of the current coroutine is cancelled or completed while this function is waiting, it
 * immediately resumes with [CancellationException].
 *
 * @throws NoSuchElementException if the observable completes without emitting any value
 */
public suspend fun <T> ObservableSource<T>.awaitLast(): T {
    return awaitOne(mode = Mode.LAST)
}
137
/**
 * Suspends until the given observable completes, without blocking a thread, and returns the one value it
 * emitted. Rethrows the error if the observable signals one.
 *
 * This suspending function is cancellable.
 * If the [Job] of the current coroutine is cancelled or completed while this function is waiting, it
 * immediately resumes with [CancellationException].
 *
 * @throws NoSuchElementException if the observable completes without emitting any value
 * @throws IllegalArgumentException if the observable emits more than one value
 */
public suspend fun <T> ObservableSource<T>.awaitSingle(): T {
    return awaitOne(mode = Mode.SINGLE)
}
150
151 // ------------------------ private ------------------------
152
/**
 * Registers a cancellation handler on this continuation that disposes [d].
 */
internal fun CancellableContinuation<*>.disposeOnCancellation(d: Disposable) {
    invokeOnCancellation { d.dispose() }
}
155
/**
 * Selects which element of an observable `awaitOne` delivers to its caller.
 *
 * Each entry carries, in [s], the text that [toString] returns; `awaitOne` interpolates the mode into its
 * exception messages, so this is the text that appears there.
 */
private enum class Mode(val s: String) {
    FIRST(s = "awaitFirst"),
    FIRST_OR_DEFAULT(s = "awaitFirstOrDefault"),
    LAST(s = "awaitLast"),
    SINGLE(s = "awaitSingle");

    /** Returns the text stored in [s] rather than the enum constant's name. */
    override fun toString(): String {
        return s
    }
}
163
/**
 * Shared implementation behind the `awaitFirst*`, `awaitLast` and `awaitSingle` extensions: subscribes to this
 * observable and resumes the suspended caller with one value chosen according to [mode].
 *
 * - [Mode.FIRST] / [Mode.FIRST_OR_DEFAULT]: resumes with the first emitted value and then disposes the
 *   subscription.
 * - [Mode.LAST]: remembers the most recent value and resumes with it on completion.
 * - [Mode.SINGLE]: like [Mode.LAST], but a second value resumes with [IllegalArgumentException] and disposes
 *   the subscription.
 *
 * If the observable completes without emitting, [Mode.FIRST_OR_DEFAULT] resumes with [default]; the other modes
 * resume with [NoSuchElementException]. An error signalled by the observable is passed to the caller as is.
 * Cancelling the continuation disposes the subscription.
 *
 * @param mode which element to deliver and how to treat empty or multi-valued observables
 * @param default value delivered by [Mode.FIRST_OR_DEFAULT] when nothing was emitted; cast to `T` unchecked
 */
private suspend fun <T> ObservableSource<T>.awaitOne(
    mode: Mode,
    default: T? = null
): T = suspendCancellableCoroutine { continuation ->
    val observer = object : Observer<T> {
        private lateinit var upstream: Disposable
        private var latest: T? = null
        private var hasValue = false

        override fun onSubscribe(sub: Disposable) {
            upstream = sub
            continuation.invokeOnCancellation { sub.dispose() }
        }

        override fun onNext(t: T) {
            val wantsFirst = mode == Mode.FIRST || mode == Mode.FIRST_OR_DEFAULT
            when {
                wantsFirst -> {
                    // Only the very first element matters; anything delivered afterwards is ignored.
                    if (hasValue) return
                    hasValue = true
                    // Resume before disposing, keeping the original ordering of the two calls.
                    continuation.resume(t)
                    upstream.dispose()
                }
                hasValue && mode == Mode.SINGLE -> {
                    // A second element violates the single-value contract; fail the caller once.
                    if (continuation.isActive) {
                        continuation.resumeWithException(
                            IllegalArgumentException("More than one onNext value for $mode")
                        )
                    }
                    upstream.dispose()
                }
                else -> {
                    // LAST, or the first element in SINGLE mode: remember it until completion.
                    latest = t
                    hasValue = true
                }
            }
        }

        @Suppress("UNCHECKED_CAST")
        override fun onComplete() {
            when {
                // The isActive guard skips the resume when onNext has already resumed the continuation.
                hasValue -> if (continuation.isActive) continuation.resume(latest as T)
                mode == Mode.FIRST_OR_DEFAULT -> continuation.resume(default as T)
                continuation.isActive ->
                    continuation.resumeWithException(NoSuchElementException("No value received via onNext for $mode"))
            }
        }

        override fun onError(e: Throwable) {
            continuation.resumeWithException(e)
        }
    }
    subscribe(observer)
}
221
222