1 /*
<lambda>null2  * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3  */
4 
5 package kotlinx.coroutines.rx2
6 
7 import io.reactivex.*
8 import io.reactivex.disposables.Disposable
9 import kotlinx.coroutines.CancellableContinuation
10 import kotlinx.coroutines.CancellationException
11 import kotlinx.coroutines.Job
12 import kotlinx.coroutines.suspendCancellableCoroutine
13 import kotlin.coroutines.*
14 
15 // ------------------------ CompletableSource ------------------------
16 
17 /**
18  * Awaits for completion of this completable without blocking a thread.
19  * Returns `Unit` or throws the corresponding exception if this completable had produced error.
20  *
21  * This suspending function is cancellable. If the [Job] of the invoking coroutine is cancelled or completed while this
22  * suspending function is suspended, this function immediately resumes with [CancellationException].
23  */
24 public suspend fun CompletableSource.await(): Unit = suspendCancellableCoroutine { cont ->
25     subscribe(object : CompletableObserver {
26         override fun onSubscribe(d: Disposable) { cont.disposeOnCancellation(d) }
27         override fun onComplete() { cont.resume(Unit) }
28         override fun onError(e: Throwable) { cont.resumeWithException(e) }
29     })
30 }
31 
32 // ------------------------ MaybeSource ------------------------
33 
34 /**
35  * Awaits for completion of the maybe without blocking a thread.
36  * Returns the resulting value, null if no value was produced or throws the corresponding exception if this
37  * maybe had produced error.
38  *
39  * This suspending function is cancellable.
40  * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
41  * immediately resumes with [CancellationException].
42  */
43 @Suppress("UNCHECKED_CAST")
awaitnull44 public suspend fun <T> MaybeSource<T>.await(): T? = (this as MaybeSource<T?>).awaitOrDefault(null)
45 
46 /**
47  * Awaits for completion of the maybe without blocking a thread.
48  * Returns the resulting value, [default] if no value was produced or throws the corresponding exception if this
49  * maybe had produced error.
50  *
51  * This suspending function is cancellable.
52  * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
53  * immediately resumes with [CancellationException].
54  */
55 public suspend fun <T> MaybeSource<T>.awaitOrDefault(default: T): T = suspendCancellableCoroutine { cont ->
56     subscribe(object : MaybeObserver<T> {
57         override fun onSubscribe(d: Disposable) { cont.disposeOnCancellation(d) }
58         override fun onComplete() { cont.resume(default) }
59         override fun onSuccess(t: T) { cont.resume(t) }
60         override fun onError(error: Throwable) { cont.resumeWithException(error) }
61     })
62 }
63 
64 // ------------------------ SingleSource ------------------------
65 
66 /**
67  * Awaits for completion of the single value without blocking a thread.
68  * Returns the resulting value or throws the corresponding exception if this single had produced error.
69  *
70  * This suspending function is cancellable.
71  * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
72  * immediately resumes with [CancellationException].
73  */
awaitnull74 public suspend fun <T> SingleSource<T>.await(): T = suspendCancellableCoroutine { cont ->
75     subscribe(object : SingleObserver<T> {
76         override fun onSubscribe(d: Disposable) { cont.disposeOnCancellation(d) }
77         override fun onSuccess(t: T) { cont.resume(t) }
78         override fun onError(error: Throwable) { cont.resumeWithException(error) }
79     })
80 }
81 
82 // ------------------------ ObservableSource ------------------------
83 
84 /**
85  * Awaits for the first value from the given observable without blocking a thread.
86  * Returns the resulting value or throws the corresponding exception if this observable had produced error.
87  *
88  * This suspending function is cancellable.
89  * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
90  * immediately resumes with [CancellationException].
91  *
92  * @throws NoSuchElementException if observable does not emit any value
93  */
awaitFirstnull94 public suspend fun <T> ObservableSource<T>.awaitFirst(): T = awaitOne(Mode.FIRST)
95 
96 /**
97  * Awaits for the first value from the given observable or the [default] value if none is emitted without blocking a
98  * thread and returns the resulting value or throws the corresponding exception if this observable had produced error.
99  *
100  * This suspending function is cancellable.
101  * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
102  * immediately resumes with [CancellationException].
103  */
104 public suspend fun <T> ObservableSource<T>.awaitFirstOrDefault(default: T): T = awaitOne(Mode.FIRST_OR_DEFAULT, default)
105 
106 /**
107  * Awaits for the first value from the given observable or `null` value if none is emitted without blocking a
108  * thread and returns the resulting value or throws the corresponding exception if this observable had produced error.
109  *
110  * This suspending function is cancellable.
111  * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
112  * immediately resumes with [CancellationException].
113  */
114 public suspend fun <T> ObservableSource<T>.awaitFirstOrNull(): T? = awaitOne(Mode.FIRST_OR_DEFAULT)
115 
116 /**
117  * Awaits for the first value from the given observable or call [defaultValue] to get a value if none is emitted without blocking a
118  * thread and returns the resulting value or throws the corresponding exception if this observable had produced error.
119  *
120  * This suspending function is cancellable.
121  * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
122  * immediately resumes with [CancellationException].
123  */
124 public suspend fun <T> ObservableSource<T>.awaitFirstOrElse(defaultValue: () -> T): T = awaitOne(Mode.FIRST_OR_DEFAULT) ?: defaultValue()
125 
126 /**
127  * Awaits for the last value from the given observable without blocking a thread.
128  * Returns the resulting value or throws the corresponding exception if this observable had produced error.
129  *
130  * This suspending function is cancellable.
131  * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
132  * immediately resumes with [CancellationException].
133  *
134  * @throws NoSuchElementException if observable does not emit any value
135  */
136 public suspend fun <T> ObservableSource<T>.awaitLast(): T = awaitOne(Mode.LAST)
137 
138 /**
139  * Awaits for the single value from the given observable without blocking a thread.
140  * Returns the resulting value or throws the corresponding exception if this observable had produced error.
141  *
142  * This suspending function is cancellable.
143  * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
144  * immediately resumes with [CancellationException].
145  *
146  * @throws NoSuchElementException if observable does not emit any value
147  * @throws IllegalArgumentException if observable emits more than one value
148  */
149 public suspend fun <T> ObservableSource<T>.awaitSingle(): T = awaitOne(Mode.SINGLE)
150 
151 // ------------------------ private ------------------------
152 
153 internal fun CancellableContinuation<*>.disposeOnCancellation(d: Disposable) =
154     invokeOnCancellation { d.dispose() }
155 
156 private enum class Mode(val s: String) {
157     FIRST("awaitFirst"),
158     FIRST_OR_DEFAULT("awaitFirstOrDefault"),
159     LAST("awaitLast"),
160     SINGLE("awaitSingle");
toStringnull161     override fun toString(): String = s
162 }
163 
164 private suspend fun <T> ObservableSource<T>.awaitOne(
165     mode: Mode,
166     default: T? = null
167 ): T = suspendCancellableCoroutine { cont ->
168     subscribe(object : Observer<T> {
169         private lateinit var subscription: Disposable
170         private var value: T? = null
171         private var seenValue = false
172 
173         override fun onSubscribe(sub: Disposable) {
174             subscription = sub
175             cont.invokeOnCancellation { sub.dispose() }
176         }
177 
178         override fun onNext(t: T) {
179             when (mode) {
180                 Mode.FIRST, Mode.FIRST_OR_DEFAULT -> {
181                     if (!seenValue) {
182                         seenValue = true
183                         cont.resume(t)
184                         subscription.dispose()
185                     }
186                 }
187                 Mode.LAST, Mode.SINGLE -> {
188                     if (mode == Mode.SINGLE && seenValue) {
189                         if (cont.isActive)
190                             cont.resumeWithException(IllegalArgumentException("More than one onNext value for $mode"))
191                         subscription.dispose()
192                     } else {
193                         value = t
194                         seenValue = true
195                     }
196                 }
197             }
198         }
199 
200         @Suppress("UNCHECKED_CAST")
201         override fun onComplete() {
202             if (seenValue) {
203                 if (cont.isActive) cont.resume(value as T)
204                 return
205             }
206             when {
207                 mode == Mode.FIRST_OR_DEFAULT -> {
208                     cont.resume(default as T)
209                 }
210                 cont.isActive -> {
211                     cont.resumeWithException(NoSuchElementException("No value received via onNext for $mode"))
212                 }
213             }
214         }
215 
216         override fun onError(e: Throwable) {
217             cont.resumeWithException(e)
218         }
219     })
220 }
221 
222