/*
 * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
 */

package kotlinx.coroutines

import kotlinx.cinterop.*
import platform.posix.*
import kotlin.coroutines.*

/**
 * Starts a new coroutine and **blocks** the calling thread until that coroutine completes.
 * It is not meant to be called from a coroutine: its purpose is to bridge regular blocking code
 * to libraries written in suspending style, e.g. in `main` functions and in tests.
 *
 * Dispatcher selection:
 * - When [context] carries no [ContinuationInterceptor], the thread-local [EventLoop]
 *   (`ThreadLocalEventLoop.eventLoop`) is added to the context and is pumped by the blocked thread
 *   until the coroutine completes.
 * - When [context] carries an interceptor, the coroutine runs in the given context. If that interceptor
 *   is an [EventLoop] whose `shouldBeProcessedFromContext()` returns `true`, that loop is pumped while
 *   waiting; otherwise an already existing thread-local event loop is pumped if there is one
 *   (none is created in this case).
 *
 * The coroutine context is built with [newCoroutineContext] and the coroutine is started with
 * [CoroutineStart.DEFAULT].
 *
 * Note: the waiting loop in this file has no interruption handling; the result of `nanosleep` is ignored
 * and completion is re-checked after every wake-up.
 *
 * @param context context of the coroutine. The default value is [EmptyCoroutineContext], in which case
 *   the thread-local [EventLoop] is used as the dispatcher.
 * @param block the coroutine code.
 * @return the value produced by [block]; if the coroutine completed exceptionally, its cause is thrown instead.
 */
public fun <T> runBlocking(context: CoroutineContext = EmptyCoroutineContext, block: suspend CoroutineScope.() -> T): T {
    val interceptor = context[ContinuationInterceptor]
    val loop: EventLoop?
    // Pre-initialised `var` instead of a deferred `val`: kludge for a data flow analysis error.
    var effectiveContext: CoroutineContext = context
    if (interceptor != null) {
        // Use the context's interceptor when it is an event loop that we shall process
        // (to support TestContext), or fall back to an existing thread-local event loop
        // to avoid blocking it (but never create one here).
        val candidate = interceptor as? EventLoop
        loop = if (candidate != null && candidate.shouldBeProcessedFromContext()) {
            candidate
        } else {
            ThreadLocalEventLoop.currentOrNull()
        }
        effectiveContext = GlobalScope.newCoroutineContext(context)
    } else {
        // No dispatcher specified: create or reuse the private thread-local event loop.
        loop = ThreadLocalEventLoop.eventLoop
        effectiveContext = GlobalScope.newCoroutineContext(context + loop)
    }
    val blocking = BlockingCoroutine<T>(effectiveContext, loop)
    blocking.start(CoroutineStart.DEFAULT, blocking, block)
    return blocking.joinBlocking()
}

/**
 * Coroutine whose completion is awaited by parking the current thread in [joinBlocking].
 *
 * @param parentContext context passed to the [AbstractCoroutine] base class.
 * @param eventLoop event loop to pump while waiting, or `null` when there is nothing to pump.
 */
private class BlockingCoroutine<T>(
    parentContext: CoroutineContext,
    private val eventLoop: EventLoop?
) : AbstractCoroutine<T>(parentContext, true) {
    override val isScopedCoroutine: Boolean get() = true

    /**
     * Blocks the current thread until this coroutine is completed, processing [eventLoop] events
     * (when a loop is present) between sleeps.
     *
     * @return the completed state cast to [T].
     * @throws Throwable the `cause` of the final state when that state is [CompletedExceptionally].
     */
    @Suppress("UNCHECKED_CAST")
    fun joinBlocking(): T {
        return memScoped {
            val nanosPerSecond = 1000000000L
            try {
                eventLoop?.incrementUseCount()
                // Scratch timespec, allocated once in this memScoped block and reused on each iteration.
                val sleepSpec = alloc<timespec>()
                while (true) {
                    // Without an event loop there is no deadline to wake up for.
                    val parkNanos = if (eventLoop != null) eventLoop.processNextEvent() else Long.MAX_VALUE
                    // note: processNextEvent may lose the unpark flag, so check for completion before parking
                    if (isCompleted) break
                    sleepSpec.tv_sec = (parkNanos / nanosPerSecond).convert() // whole seconds
                    sleepSpec.tv_nsec = (parkNanos % nanosPerSecond).convert() // remaining nanoseconds
                    nanosleep(sleepSpec.ptr, null)
                }
            } finally { // paranoia
                eventLoop?.decrementUseCount()
            }
            // The coroutine is complete: rethrow its failure or hand back the result.
            val finalState = state
            if (finalState is CompletedExceptionally) throw finalState.cause
            finalState as T
        }
    }
}