/*
 * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
 */
4
5 package kotlinx.coroutines.future
6
7 import kotlinx.coroutines.*
8 import kotlinx.coroutines.CancellationException
9 import java.util.concurrent.*
10 import java.util.function.*
11 import kotlin.coroutines.*
12
/**
 * Launches a new coroutine and exposes its outcome as a [CompletableFuture].
 * Cancelling the returned future, or completing it in any other way, cancels the running coroutine.
 *
 * The coroutine context is inherited from the receiver [CoroutineScope]; extra context elements can be
 * supplied through the [context] argument.
 * When the context has neither a dispatcher nor any other [ContinuationInterceptor], [Dispatchers.Default] is used.
 * The parent job is taken from the [CoroutineScope] as well, unless it is overridden
 * by the corresponding [context] element.
 *
 * By default the coroutine is scheduled for execution immediately.
 * Other behaviours can be selected with the `start` parameter; see [CoroutineStart] for details.
 * [CoroutineStart.LAZY] is not supported
 * (the `CompletableFuture` framework offers no corresponding capability) and
 * results in an [IllegalArgumentException].
 *
 * See [newCoroutineContext][CoroutineScope.newCoroutineContext] for the debugging facilities available to newly created coroutines.
 *
 * @param context context elements added on top of [CoroutineScope.coroutineContext].
 * @param start how the coroutine is started; defaults to [CoroutineStart.DEFAULT].
 * @param block the code the coroutine runs.
 */
public fun <T> CoroutineScope.future(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> T
): CompletableFuture<T> {
    require(!start.isLazy) { "$start start is not supported" }
    val mergedContext = newCoroutineContext(context)
    return CompletableFuture<T>().also { promise ->
        val worker = CompletableFutureCoroutine(mergedContext, promise)
        // The coroutine is also the future's completion callback, so completing
        // the future from outside cancels the coroutine.
        promise.whenComplete(worker)
        worker.start(start, worker, block)
    }
}
47
/**
 * Coroutine that pushes its own outcome into [future] and also serves as the
 * [BiConsumer] completion callback that the `future` builder registers on that same future.
 */
private class CompletableFutureCoroutine<T>(
    context: CoroutineContext,
    private val future: CompletableFuture<T>
) : AbstractCoroutine<T>(context), BiConsumer<T?, Throwable?> {
    /** Completes [future] with the value handed to this callback. */
    override fun onCompleted(value: T) {
        future.complete(value)
    }

    /**
     * Tries to complete [future] exceptionally with [cause]. If the future could not take it
     * and the exception was not [handled] elsewhere, it is reported via [handleCoroutineException]
     * so that it is not lost.
     */
    override fun onCancelled(cause: Throwable, handled: Boolean) {
        val delivered = future.completeExceptionally(cause)
        if (delivered || handled) return
        handleCoroutineException(context, cause)
    }

    /** Invoked when [future] completes, whatever the outcome: cancels this coroutine. */
    override fun accept(value: T?, exception: Throwable?) {
        cancel()
    }
}
67
/**
 * Exposes this deferred value as a [CompletableFuture].
 * Cancelling the returned future, or completing it in any other way, cancels the deferred value.
 */
public fun <T> Deferred<T>.asCompletableFuture(): CompletableFuture<T> {
    val promise = CompletableFuture<T>()
    setupCancellation(promise)
    invokeOnCompletion {
        // getCompleted() either yields the value or throws; a throw is forwarded to the future.
        try {
            val outcome = getCompleted()
            promise.complete(outcome)
        } catch (failure: Throwable) {
            promise.completeExceptionally(failure)
        }
    }
    return promise
}
84
/**
 * Exposes this job as a [CompletableFuture] that completes with [Unit].
 * Cancelling the returned future, or completing it in any other way, cancels the job.
 */
public fun Job.asCompletableFuture(): CompletableFuture<Unit> =
    CompletableFuture<Unit>().also { promise ->
        setupCancellation(promise)
        invokeOnCompletion { cause ->
            // A non-null cause is forwarded as the future's failure; otherwise the future gets Unit.
            if (cause != null) promise.completeExceptionally(cause) else promise.complete(Unit)
        }
    }
98
/**
 * Cancels this job as soon as [future] completes.
 *
 * A normal completion cancels without a cause. An exceptional completion passes the exception
 * along, wrapping it in a [CancellationException] unless it already is one.
 */
private fun Job.setupCancellation(future: CompletableFuture<*>) {
    future.whenComplete { _, exception ->
        val reason: CancellationException? = when (exception) {
            null -> null
            is CancellationException -> exception
            else -> CancellationException("CompletableFuture was completed exceptionally", exception)
        }
        cancel(reason)
    }
}
106
/**
 * Converts this completion stage to an instance of [Deferred].
 * When this completion stage is an instance of [Future], then it is cancelled when
 * the resulting deferred is cancelled.
 *
 * If the stage is a [Future] that is already done, the returned deferred is created already completed.
 * A failure is unwrapped first: the cause of an [ExecutionException] (fast path) or of a
 * [CompletionException] (callback path) is used when there is one.
 *
 * @return a [Deferred] that completes with the value or the failure of this stage.
 */
public fun <T> CompletionStage<T>.asDeferred(): Deferred<T> {
    // Fast path if already completed
    if (this is Future<*> && isDone()) {
        return try {
            @Suppress("UNCHECKED_CAST")
            CompletableDeferred(get() as T)
        } catch (e: Throwable) {
            // unwrap original cause from ExecutionException
            val original = (e as? ExecutionException)?.cause ?: e
            CompletableDeferred<T>().also { it.completeExceptionally(original) }
        }
    }
    val result = CompletableDeferred<T>()
    whenComplete { value, exception ->
        try {
            if (exception == null) {
                // the future has completed normally
                result.complete(value)
            } else {
                // the future has completed with an exception, unwrap it consistently with fast path
                // Note: In the fast-path the implementation of CompletableFuture.get() does unwrapping
                result.completeExceptionally((exception as? CompletionException)?.cause ?: exception)
            }
        } catch (e: Throwable) {
            // If completing the deferred throws, whenComplete would store that exception only in the
            // dependent stage it returns. That stage is discarded here, so report the failure
            // explicitly instead of losing it.
            handleCoroutineException(EmptyCoroutineContext, e)
        }
    }
    if (this is Future<*>) result.cancelFutureOnCompletion(this)
    return result
}
138
/**
 * Suspends until this completion stage completes, without blocking a thread, and returns its value.
 *
 * This suspending function is cancellable.
 * If the [Job] of the current coroutine is cancelled or completed while this function is waiting, it
 * stops waiting for the completion stage and immediately resumes with [CancellationException][kotlinx.coroutines.CancellationException].
 * The function is meant for one-shot futures: when the coroutine is cancelled, the completion stage is cancelled too, provided it is a [CompletableFuture].
 * When cancelling the stage is not wanted, use `stage.asDeferred().await()` instead.
 */
public suspend fun <T> CompletionStage<T>.await(): T {
    // Fast path: a Future that is already done yields its outcome without suspending
    if (this is Future<*> && isDone()) {
        try {
            @Suppress("UNCHECKED_CAST")
            return get() as T
        } catch (wrapped: ExecutionException) {
            // surface the original failure rather than the ExecutionException wrapper
            throw wrapped.cause ?: wrapped
        }
    }
    // Only a CompletableFuture gets cancelled together with the waiting coroutine
    val owner = this as? CompletableFuture<*>
    // Slow path: suspend until the stage calls back
    return suspendCancellableCoroutine { continuation: CancellableContinuation<T> ->
        val bridge = ContinuationConsumer(continuation)
        whenComplete(bridge)
        continuation.invokeOnCancellation {
            // mayInterruptIfRunning is not used
            owner?.cancel(false)
            bridge.cont = null // drop the continuation reference to aid GC
        }
    }
}
169
/**
 * [BiConsumer] that resumes [cont] with the outcome of a completion stage.
 * Once [cont] has been set to `null`, [accept] does nothing.
 */
private class ContinuationConsumer<T>(
    @Volatile @JvmField var cont: Continuation<T>?
) : BiConsumer<T?, Throwable?> {
    @Suppress("UNCHECKED_CAST")
    override fun accept(result: T?, exception: Throwable?) {
        // single volatile read; nothing to resume once the reference has been cleared
        val target = cont ?: return
        if (exception != null) {
            // resume with the original failure: use the cause of a CompletionException when present
            target.resumeWithException((exception as? CompletionException)?.cause ?: exception)
            return
        }
        // no exception: the stage completed normally
        target.resume(result as T)
    }
}
185