1 /*
2  * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3  */
4 
5 package kotlinx.coroutines
6 
7 import java.util.concurrent.*
8 import kotlin.coroutines.*
9 
10 internal actual val DefaultDelay: Delay = DefaultExecutor
11 
12 @Suppress("PLATFORM_CLASS_MAPPED_TO_KOTLIN")
13 internal actual object DefaultExecutor : EventLoopImplBase(), Runnable {
14     const val THREAD_NAME = "kotlinx.coroutines.DefaultExecutor"
15 
16     init {
17         incrementUseCount() // this event loop is never completed
18     }
19 
20     private const val DEFAULT_KEEP_ALIVE = 1000L // in milliseconds
21 
22     private val KEEP_ALIVE_NANOS = TimeUnit.MILLISECONDS.toNanos(
23         try {
24             java.lang.Long.getLong("kotlinx.coroutines.DefaultExecutor.keepAlive", DEFAULT_KEEP_ALIVE)
25         } catch (e: SecurityException) {
26             DEFAULT_KEEP_ALIVE
27         })
28 
29     @Suppress("ObjectPropertyName")
30     @Volatile
31     private var _thread: Thread? = null
32 
33     override val thread: Thread
34         get() = _thread ?: createThreadSync()
35 
36     private const val FRESH = 0
37     private const val ACTIVE = 1
38     private const val SHUTDOWN_REQ = 2
39     private const val SHUTDOWN_ACK = 3
40 
41     @Volatile
42     private var debugStatus: Int = FRESH
43 
44     private val isShutdownRequested: Boolean get() {
45         val debugStatus = debugStatus
46         return debugStatus == SHUTDOWN_REQ || debugStatus == SHUTDOWN_ACK
47     }
48 
49     /**
50      * All event loops are using DefaultExecutor#invokeOnTimeout to avoid livelock on
51      * ```
52      * runBlocking(eventLoop) { withTimeout { while(isActive) { ... } } }
53      * ```
54      *
55      * Livelock is possible only if `runBlocking` is called on internal default executed (which is used by default [delay]),
56      * but it's not exposed as public API.
57      */
invokeOnTimeoutnull58     override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle =
59         scheduleInvokeOnTimeout(timeMillis, block)
60 
61     override fun run() {
62         ThreadLocalEventLoop.setEventLoop(this)
63         registerTimeLoopThread()
64         try {
65             var shutdownNanos = Long.MAX_VALUE
66             if (!notifyStartup()) return
67             while (true) {
68                 Thread.interrupted() // just reset interruption flag
69                 var parkNanos = processNextEvent()
70                 if (parkNanos == Long.MAX_VALUE) {
71                     // nothing to do, initialize shutdown timeout
72                     val now = nanoTime()
73                     if (shutdownNanos == Long.MAX_VALUE) shutdownNanos = now + KEEP_ALIVE_NANOS
74                     val tillShutdown = shutdownNanos - now
75                     if (tillShutdown <= 0) return // shut thread down
76                     parkNanos = parkNanos.coerceAtMost(tillShutdown)
77                 } else
78                     shutdownNanos = Long.MAX_VALUE
79                 if (parkNanos > 0) {
80                     // check if shutdown was requested and bail out in this case
81                     if (isShutdownRequested) return
82                     parkNanos(this, parkNanos)
83                 }
84             }
85         } finally {
86             _thread = null // this thread is dead
87             acknowledgeShutdownIfNeeded()
88             unregisterTimeLoopThread()
89             // recheck if queues are empty after _thread reference was set to null (!!!)
90             if (!isEmpty) thread // recreate thread if it is needed
91         }
92     }
93 
94     @Synchronized
createThreadSyncnull95     private fun createThreadSync(): Thread {
96         return _thread ?: Thread(this, THREAD_NAME).apply {
97             _thread = this
98             isDaemon = true
99             start()
100         }
101     }
102 
103     // used for tests
104     @Synchronized
ensureStartednull105     internal fun ensureStarted() {
106         assert { _thread == null } // ensure we are at a clean state
107         assert { debugStatus == FRESH || debugStatus == SHUTDOWN_ACK }
108         debugStatus = FRESH
109         createThreadSync() // create fresh thread
110         while (debugStatus == FRESH) (this as Object).wait()
111     }
112 
113     @Synchronized
notifyStartupnull114     private fun notifyStartup(): Boolean {
115         if (isShutdownRequested) return false
116         debugStatus = ACTIVE
117         (this as Object).notifyAll()
118         return true
119     }
120 
121     // used for tests
122     @Synchronized
shutdownnull123     fun shutdown(timeout: Long) {
124         val deadline = System.currentTimeMillis() + timeout
125         if (!isShutdownRequested) debugStatus = SHUTDOWN_REQ
126         // loop while there is anything to do immediately or deadline passes
127         while (debugStatus != SHUTDOWN_ACK && _thread != null) {
128             _thread?.let { unpark(it) } // wake up thread if present
129             val remaining = deadline - System.currentTimeMillis()
130             if (remaining <= 0) break
131             (this as Object).wait(timeout)
132         }
133         // restore fresh status
134         debugStatus = FRESH
135     }
136 
137     @Synchronized
acknowledgeShutdownIfNeedednull138     private fun acknowledgeShutdownIfNeeded() {
139         if (!isShutdownRequested) return
140         debugStatus = SHUTDOWN_ACK
141         resetAll() // clear queues
142         (this as Object).notifyAll()
143     }
144 
145     internal val isThreadPresent
146         get() = _thread != null
147 }
148