1 /* 2 * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. 3 */ 4 5 package kotlinx.coroutines 6 7 import java.util.concurrent.* 8 import kotlin.coroutines.* 9 10 internal actual val DefaultDelay: Delay = DefaultExecutor 11 12 @Suppress("PLATFORM_CLASS_MAPPED_TO_KOTLIN") 13 internal actual object DefaultExecutor : EventLoopImplBase(), Runnable { 14 const val THREAD_NAME = "kotlinx.coroutines.DefaultExecutor" 15 16 init { 17 incrementUseCount() // this event loop is never completed 18 } 19 20 private const val DEFAULT_KEEP_ALIVE = 1000L // in milliseconds 21 22 private val KEEP_ALIVE_NANOS = TimeUnit.MILLISECONDS.toNanos( 23 try { 24 java.lang.Long.getLong("kotlinx.coroutines.DefaultExecutor.keepAlive", DEFAULT_KEEP_ALIVE) 25 } catch (e: SecurityException) { 26 DEFAULT_KEEP_ALIVE 27 }) 28 29 @Suppress("ObjectPropertyName") 30 @Volatile 31 private var _thread: Thread? = null 32 33 override val thread: Thread 34 get() = _thread ?: createThreadSync() 35 36 private const val FRESH = 0 37 private const val ACTIVE = 1 38 private const val SHUTDOWN_REQ = 2 39 private const val SHUTDOWN_ACK = 3 40 41 @Volatile 42 private var debugStatus: Int = FRESH 43 44 private val isShutdownRequested: Boolean get() { 45 val debugStatus = debugStatus 46 return debugStatus == SHUTDOWN_REQ || debugStatus == SHUTDOWN_ACK 47 } 48 49 /** 50 * All event loops are using DefaultExecutor#invokeOnTimeout to avoid livelock on 51 * ``` 52 * runBlocking(eventLoop) { withTimeout { while(isActive) { ... } } } 53 * ``` 54 * 55 * Livelock is possible only if `runBlocking` is called on internal default executed (which is used by default [delay]), 56 * but it's not exposed as public API. 57 */ invokeOnTimeoutnull58 override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle = 59 scheduleInvokeOnTimeout(timeMillis, block) 60 61 override fun run() { 62 ThreadLocalEventLoop.setEventLoop(this) 63 registerTimeLoopThread() 64 try { 65 var shutdownNanos = Long.MAX_VALUE 66 if (!notifyStartup()) return 67 while (true) { 68 Thread.interrupted() // just reset interruption flag 69 var parkNanos = processNextEvent() 70 if (parkNanos == Long.MAX_VALUE) { 71 // nothing to do, initialize shutdown timeout 72 val now = nanoTime() 73 if (shutdownNanos == Long.MAX_VALUE) shutdownNanos = now + KEEP_ALIVE_NANOS 74 val tillShutdown = shutdownNanos - now 75 if (tillShutdown <= 0) return // shut thread down 76 parkNanos = parkNanos.coerceAtMost(tillShutdown) 77 } else 78 shutdownNanos = Long.MAX_VALUE 79 if (parkNanos > 0) { 80 // check if shutdown was requested and bail out in this case 81 if (isShutdownRequested) return 82 parkNanos(this, parkNanos) 83 } 84 } 85 } finally { 86 _thread = null // this thread is dead 87 acknowledgeShutdownIfNeeded() 88 unregisterTimeLoopThread() 89 // recheck if queues are empty after _thread reference was set to null (!!!) 90 if (!isEmpty) thread // recreate thread if it is needed 91 } 92 } 93 94 @Synchronized createThreadSyncnull95 private fun createThreadSync(): Thread { 96 return _thread ?: Thread(this, THREAD_NAME).apply { 97 _thread = this 98 isDaemon = true 99 start() 100 } 101 } 102 103 // used for tests 104 @Synchronized ensureStartednull105 internal fun ensureStarted() { 106 assert { _thread == null } // ensure we are at a clean state 107 assert { debugStatus == FRESH || debugStatus == SHUTDOWN_ACK } 108 debugStatus = FRESH 109 createThreadSync() // create fresh thread 110 while (debugStatus == FRESH) (this as Object).wait() 111 } 112 113 @Synchronized notifyStartupnull114 private fun notifyStartup(): Boolean { 115 if (isShutdownRequested) return false 116 debugStatus = ACTIVE 117 (this as Object).notifyAll() 118 return true 119 } 120 121 // used for tests 122 @Synchronized shutdownnull123 fun shutdown(timeout: Long) { 124 val deadline = System.currentTimeMillis() + timeout 125 if (!isShutdownRequested) debugStatus = SHUTDOWN_REQ 126 // loop while there is anything to do immediately or deadline passes 127 while (debugStatus != SHUTDOWN_ACK && _thread != null) { 128 _thread?.let { unpark(it) } // wake up thread if present 129 val remaining = deadline - System.currentTimeMillis() 130 if (remaining <= 0) break 131 (this as Object).wait(timeout) 132 } 133 // restore fresh status 134 debugStatus = FRESH 135 } 136 137 @Synchronized acknowledgeShutdownIfNeedednull138 private fun acknowledgeShutdownIfNeeded() { 139 if (!isShutdownRequested) return 140 debugStatus = SHUTDOWN_ACK 141 resetAll() // clear queues 142 (this as Object).notifyAll() 143 } 144 145 internal val isThreadPresent 146 get() = _thread != null 147 } 148