1 /*
<lambda>null2  * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3  */
4 
5 package kotlinx.coroutines.future
6 
7 import kotlinx.coroutines.*
8 import kotlinx.coroutines.CancellationException
9 import java.util.concurrent.*
10 import java.util.function.*
11 import kotlin.coroutines.*
12 
13 /**
14  * Starts a new coroutine and returns its result as an implementation of [CompletableFuture].
15  * The running coroutine is cancelled when the resulting future is cancelled or otherwise completed.
16  *
17  * The coroutine context is inherited from a [CoroutineScope], additional context elements can be specified with the [context] argument.
18  * If the context does not have any dispatcher nor any other [ContinuationInterceptor], then [Dispatchers.Default] is used.
19  * The parent job is inherited from a [CoroutineScope] as well, but it can also be overridden
20  * with corresponding [context] element.
21  *
22  * By default, the coroutine is immediately scheduled for execution.
23  * Other options can be specified via `start` parameter. See [CoroutineStart] for details.
24  * A value of [CoroutineStart.LAZY] is not supported
25  * (since `CompletableFuture` framework does not provide the corresponding capability) and
26  * produces [IllegalArgumentException].
27  *
28  * See [newCoroutineContext][CoroutineScope.newCoroutineContext] for a description of debugging facilities that are available for newly created coroutine.
29  *
30  * @param context additional to [CoroutineScope.coroutineContext] context of the coroutine.
31  * @param start coroutine start option. The default value is [CoroutineStart.DEFAULT].
32  * @param block the coroutine code.
33  */
34 public fun <T> CoroutineScope.future(
35     context: CoroutineContext = EmptyCoroutineContext,
36     start: CoroutineStart = CoroutineStart.DEFAULT,
37     block: suspend CoroutineScope.() -> T
38 ) : CompletableFuture<T> {
39     require(!start.isLazy) { "$start start is not supported" }
40     val newContext = this.newCoroutineContext(context)
41     val future = CompletableFuture<T>()
42     val coroutine = CompletableFutureCoroutine(newContext, future)
43     future.whenComplete(coroutine) // Cancel coroutine if future was completed externally
44     coroutine.start(start, coroutine, block)
45     return future
46 }
47 
48 private class CompletableFutureCoroutine<T>(
49     context: CoroutineContext,
50     private val future: CompletableFuture<T>
51 ) : AbstractCoroutine<T>(context), BiConsumer<T?, Throwable?> {
acceptnull52     override fun accept(value: T?, exception: Throwable?) {
53         cancel()
54     }
55 
onCompletednull56     override fun onCompleted(value: T) {
57         future.complete(value)
58     }
59 
onCancellednull60     override fun onCancelled(cause: Throwable, handled: Boolean) {
61         if (!future.completeExceptionally(cause) && !handled) {
62             // prevents loss of exception that was not handled by parent & could not be set to CompletableFuture
63             handleCoroutineException(context, cause)
64         }
65     }
66 }
67 
68 /**
69  * Converts this deferred value to the instance of [CompletableFuture].
70  * The deferred value is cancelled when the resulting future is cancelled or otherwise completed.
71  */
asCompletableFuturenull72 public fun <T> Deferred<T>.asCompletableFuture(): CompletableFuture<T> {
73     val future = CompletableFuture<T>()
74     setupCancellation(future)
75     invokeOnCompletion {
76         try {
77             future.complete(getCompleted())
78         } catch (t: Throwable) {
79             future.completeExceptionally(t)
80         }
81     }
82     return future
83 }
84 
85 /**
86  * Converts this job to the instance of [CompletableFuture].
87  * The job is cancelled when the resulting future is cancelled or otherwise completed.
88  */
asCompletableFuturenull89 public fun Job.asCompletableFuture(): CompletableFuture<Unit> {
90     val future = CompletableFuture<Unit>()
91     setupCancellation(future)
92     invokeOnCompletion { cause ->
93         if (cause === null) future.complete(Unit)
94         else future.completeExceptionally(cause)
95     }
96     return future
97 }
98 
setupCancellationnull99 private fun Job.setupCancellation(future: CompletableFuture<*>) {
100     future.whenComplete { _, exception ->
101         cancel(exception?.let {
102             it as? CancellationException ?: CancellationException("CompletableFuture was completed exceptionally", it)
103         })
104     }
105 }
106 
107 /**
108  * Converts this completion stage to an instance of [Deferred].
109  * When this completion stage is an instance of [Future], then it is cancelled when
110  * the resulting deferred is cancelled.
111  */
asDeferrednull112 public fun <T> CompletionStage<T>.asDeferred(): Deferred<T> {
113     // Fast path if already completed
114     if (this is Future<*> && isDone()){
115         return try {
116             @Suppress("UNCHECKED_CAST")
117             CompletableDeferred(get() as T)
118         } catch (e: Throwable) {
119             // unwrap original cause from ExecutionException
120             val original = (e as? ExecutionException)?.cause ?: e
121             CompletableDeferred<T>().also { it.completeExceptionally(original) }
122         }
123     }
124     val result = CompletableDeferred<T>()
125     whenComplete { value, exception ->
126         if (exception == null) {
127             // the future has completed normally
128             result.complete(value)
129         } else {
130             // the future has completed with an exception, unwrap it consistently with fast path
131             // Note: In the fast-path the implementation of CompletableFuture.get() does unwrapping
132             result.completeExceptionally((exception as? CompletionException)?.cause ?: exception)
133         }
134     }
135     if (this is Future<*>) result.cancelFutureOnCompletion(this)
136     return result
137 }
138 
139 /**
140  * Awaits for completion of the completion stage without blocking a thread.
141  *
142  * This suspending function is cancellable.
143  * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
144  * stops waiting for the completion stage and immediately resumes with [CancellationException][kotlinx.coroutines.CancellationException].
145  * This method is intended to be used with one-shot futures, so on coroutine cancellation completion stage is cancelled as well if it is instance of [CompletableFuture].
146  * If cancelling given stage is undesired, `stage.asDeferred().await()` should be used instead.
147  */
awaitnull148 public suspend fun <T> CompletionStage<T>.await(): T {
149     // fast path when CompletableFuture is already done (does not suspend)
150     if (this is Future<*> && isDone()) {
151         try {
152             @Suppress("UNCHECKED_CAST")
153             return get() as T
154         } catch (e: ExecutionException) {
155             throw e.cause ?: e // unwrap original cause from ExecutionException
156         }
157     }
158     // slow path -- suspend
159     return suspendCancellableCoroutine { cont: CancellableContinuation<T> ->
160         val consumer = ContinuationConsumer(cont)
161         whenComplete(consumer)
162         cont.invokeOnCancellation {
163             // mayInterruptIfRunning is not used
164             (this as? CompletableFuture<T>)?.cancel(false)
165             consumer.cont = null // shall clear reference to continuation to aid GC
166         }
167     }
168 }
169 
170 private class ContinuationConsumer<T>(
171     @Volatile @JvmField var cont: Continuation<T>?
172 ) : BiConsumer<T?, Throwable?> {
173     @Suppress("UNCHECKED_CAST")
acceptnull174     override fun accept(result: T?, exception: Throwable?) {
175         val cont = this.cont ?: return // atomically read current value unless null
176         if (exception == null) {
177             // the future has completed normally
178             cont.resume(result as T)
179         } else {
180             // the future has completed with an exception, unwrap it to provide consistent view of .await() result and to propagate only original exception
181             cont.resumeWithException((exception as? CompletionException)?.cause ?: exception)
182         }
183     }
184 }
185