1 /*
2  * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3  */
4 
5 package kotlinx.coroutines
6 
7 import kotlinx.cinterop.*
8 import platform.posix.*
9 import kotlin.coroutines.*
10 
/**
 * Starts a new coroutine that runs [block] and **blocks** the calling thread until that coroutine completes,
 * then returns the coroutine's result.
 * This function should not be used from a coroutine. It is designed to bridge regular blocking code
 * to libraries that are written in suspending style, to be used in `main` functions and in tests.
 *
 * When [context] carries no [ContinuationInterceptor], the thread-local [EventLoop] is added to the context
 * and the blocked thread processes that loop until the coroutine completes.
 *
 * When [context] does carry a [ContinuationInterceptor], the coroutine runs with it while the current thread
 * is blocked. If that interceptor is an [EventLoop] that reports it should be processed from this context,
 * the blocked thread drives it via [processNextEvent][EventLoop.processNextEvent]. Otherwise an event loop
 * that already exists for the current thread (if any) is driven instead; a new one is not created.
 *
 * If the coroutine completes exceptionally, the recorded cause is thrown from this call.
 *
 * See [newCoroutineContext] for how the final coroutine context is derived from [context].
 *
 * @param context context of the coroutine. Defaults to [EmptyCoroutineContext], in which case the
 *   thread-local [EventLoop] is used as the dispatcher.
 * @param block the coroutine code.
 * @return the value produced by [block].
 */
public fun <T> runBlocking(context: CoroutineContext = EmptyCoroutineContext, block: suspend CoroutineScope.() -> T): T {
    val interceptor = context[ContinuationInterceptor]
    val loop: EventLoop? = if (interceptor == null) {
        // No dispatcher was supplied: create or reuse this thread's private event loop.
        ThreadLocalEventLoop.eventLoop
    } else {
        // Prefer the supplied interceptor when it is an event loop we are expected to drive (supports TestContext);
        // otherwise borrow an already existing thread-local loop so that it is not left blocked (never create one).
        (interceptor as? EventLoop)?.takeIf { it.shouldBeProcessedFromContext() }
            ?: ThreadLocalEventLoop.currentOrNull()
    }
    // The private loop becomes part of the context only when the caller supplied no dispatcher.
    // `loop` is never null in that case; the null check merely enables the smart cast.
    val seedContext = if (interceptor == null && loop != null) context + loop else context
    val blocking = BlockingCoroutine<T>(GlobalScope.newCoroutineContext(seedContext), loop)
    blocking.start(CoroutineStart.DEFAULT, blocking, block)
    return blocking.joinBlocking()
}
52 
/**
 * Coroutine whose completion is awaited by blocking the calling thread in [joinBlocking].
 *
 * @param parentContext context handed to [AbstractCoroutine].
 * @param eventLoop event loop to drive while waiting, or `null` when there is none to drive.
 */
private class BlockingCoroutine<T>(
    parentContext: CoroutineContext,
    private val eventLoop: EventLoop?
) : AbstractCoroutine<T>(parentContext, true) {
    override val isScopedCoroutine: Boolean get() = true

    /**
     * Blocks the current thread until this coroutine is completed, driving [eventLoop] (when present)
     * and sleeping with `nanosleep` between events.
     *
     * @return the final state of this coroutine cast to [T].
     * @throws Throwable the cause stored in the final state when it is a [CompletedExceptionally].
     */
    @Suppress("UNCHECKED_CAST")
    fun joinBlocking(): T = memScoped {
        val nanosPerSecond = 1000000000L
        try {
            eventLoop?.incrementUseCount()
            val parkRequest = alloc<timespec>()
            var nanosToPark = pollEventLoop()
            // note: processing the next event may lose the unpark flag, so completion is always
            // checked after polling the loop and before parking
            while (!isCompleted) {
                // split the delay into whole seconds and the nanosecond remainder
                parkRequest.tv_sec = (nanosToPark / nanosPerSecond).convert()
                parkRequest.tv_nsec = (nanosToPark % nanosPerSecond).convert()
                nanosleep(parkRequest.ptr, null)
                nanosToPark = pollEventLoop()
            }
        } finally { // paranoia: pair the use-count increment even if the loop above throws
            eventLoop?.decrementUseCount()
        }
        // completed: either rethrow the failure or hand back the result
        val finalState = state
        if (finalState is CompletedExceptionally) throw finalState.cause
        finalState as T
    }

    /**
     * Processes the next event of [eventLoop] and returns the delay it reports, which [joinBlocking]
     * uses as the sleep duration in nanoseconds; yields [Long.MAX_VALUE] when there is no event loop.
     */
    private fun pollEventLoop(): Long = eventLoop?.processNextEvent() ?: Long.MAX_VALUE
}
81