1 /*
2  * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3  */
4 
5 package kotlinx.coroutines
6 
7 import kotlinx.atomicfu.*
8 import kotlinx.coroutines.internal.*
9 import kotlin.coroutines.*
10 import kotlin.jvm.*
11 
12 /**
13  * Extended by [CoroutineDispatcher] implementations that have event loop inside and can
14  * be asked to process next event from their event queue.
15  *
16  * It may optionally implement [Delay] interface and support time-scheduled tasks.
17  * It is created or pigged back onto (see [ThreadLocalEventLoop])
18  * by `runBlocking` and by [Dispatchers.Unconfined].
19  *
20  * @suppress **This an internal API and should not be used from general code.**
21  */
22 internal abstract class EventLoop : CoroutineDispatcher() {
23     /**
24      * Counts the number of nested `runBlocking` and [Dispatchers.Unconfined] that use this event loop.
25      */
26     private var useCount = 0L
27 
28     /**
29      * Set to true on any use by `runBlocking`, because it potentially leaks this loop to other threads, so
30      * this instance must be properly shutdown. We don't need to shutdown event loop that was used solely
31      * by [Dispatchers.Unconfined] -- it can be left as [ThreadLocalEventLoop] and reused next time.
32      */
33     private var shared = false
34 
35     /**
36      * Queue used by [Dispatchers.Unconfined] tasks.
37      * These tasks are thread-local for performance and take precedence over the rest of the queue.
38      */
39     private var unconfinedQueue: ArrayQueue<DispatchedTask<*>>? = null
40 
41     /**
42      * Processes next event in this event loop.
43      *
44      * The result of this function is to be interpreted like this:
45      * * `<= 0` -- there are potentially more events for immediate processing;
46      * * `> 0` -- a number of nanoseconds to wait for next scheduled event;
47      * * [Long.MAX_VALUE] -- no more events.
48      *
49      * **NOTE**: Must be invoked only from the event loop's thread
50      *          (no check for performance reasons, may be added in the future).
51      */
52     public open fun processNextEvent(): Long {
53         if (!processUnconfinedEvent()) return Long.MAX_VALUE
54         return nextTime
55     }
56 
57     protected open val isEmpty: Boolean get() = isUnconfinedQueueEmpty
58 
59     protected open val nextTime: Long
60         get() {
61             val queue = unconfinedQueue ?: return Long.MAX_VALUE
62             return if (queue.isEmpty) Long.MAX_VALUE else 0L
63         }
64 
65     public fun processUnconfinedEvent(): Boolean {
66         val queue = unconfinedQueue ?: return false
67         val task = queue.removeFirstOrNull() ?: return false
68         task.run()
69         return true
70     }
71     /**
72      * Returns `true` if the invoking `runBlocking(context) { ... }` that was passed this event loop in its context
73      * parameter should call [processNextEvent] for this event loop (otherwise, it will process thread-local one).
74      * By default, event loop implementation is thread-local and should not processed in the context
75      * (current thread's event loop should be processed instead).
76      */
77     public open fun shouldBeProcessedFromContext(): Boolean = false
78 
79     /**
80      * Dispatches task whose dispatcher returned `false` from [CoroutineDispatcher.isDispatchNeeded]
81      * into the current event loop.
82      */
83     public fun dispatchUnconfined(task: DispatchedTask<*>) {
84         val queue = unconfinedQueue ?:
85             ArrayQueue<DispatchedTask<*>>().also { unconfinedQueue = it }
86         queue.addLast(task)
87     }
88 
89     public val isActive: Boolean
90         get() = useCount > 0
91 
92     public val isUnconfinedLoopActive: Boolean
93         get() = useCount >= delta(unconfined = true)
94 
95     // May only be used from the event loop's thread
96     public val isUnconfinedQueueEmpty: Boolean
97         get() = unconfinedQueue?.isEmpty ?: true
98 
99     private fun delta(unconfined: Boolean) =
100         if (unconfined) (1L shl 32) else 1L
101 
102     fun incrementUseCount(unconfined: Boolean = false) {
103         useCount += delta(unconfined)
104         if (!unconfined) shared = true
105     }
106 
107     fun decrementUseCount(unconfined: Boolean = false) {
108         useCount -= delta(unconfined)
109         if (useCount > 0) return
110         assert { useCount == 0L } // "Extra decrementUseCount"
111         if (shared) {
112             // shut it down and remove from ThreadLocalEventLoop
113             shutdown()
114         }
115     }
116 
117     protected open fun shutdown() {}
118 }
119 
120 @NativeThreadLocal
121 internal object ThreadLocalEventLoop {
122     private val ref = CommonThreadLocal<EventLoop?>()
123 
124     internal val eventLoop: EventLoop
125         get() = ref.get() ?: createEventLoop().also { ref.set(it) }
126 
127     internal fun currentOrNull(): EventLoop? =
128         ref.get()
129 
130     internal fun resetEventLoop() {
131         ref.set(null)
132     }
133 
134     internal fun setEventLoop(eventLoop: EventLoop) {
135         ref.set(eventLoop)
136     }
137 }
138 
139 @SharedImmutable
140 private val DISPOSED_TASK = Symbol("REMOVED_TASK")
141 
142 // results for scheduleImpl
143 private const val SCHEDULE_OK = 0
144 private const val SCHEDULE_COMPLETED = 1
145 private const val SCHEDULE_DISPOSED = 2
146 
147 private const val MS_TO_NS = 1_000_000L
148 private const val MAX_MS = Long.MAX_VALUE / MS_TO_NS
149 
150 /**
151  * First-line overflow protection -- limit maximal delay.
152  * Delays longer than this one (~146 years) are considered to be delayed "forever".
153  */
154 private const val MAX_DELAY_NS = Long.MAX_VALUE / 2
155 
156 internal fun delayToNanos(timeMillis: Long): Long = when {
157     timeMillis <= 0 -> 0L
158     timeMillis >= MAX_MS -> Long.MAX_VALUE
159     else -> timeMillis * MS_TO_NS
160 }
161 
162 internal fun delayNanosToMillis(timeNanos: Long): Long =
163     timeNanos / MS_TO_NS
164 
165 @SharedImmutable
166 private val CLOSED_EMPTY = Symbol("CLOSED_EMPTY")
167 
168 private typealias Queue<T> = LockFreeTaskQueueCore<T>
169 
170 internal expect abstract class EventLoopImplPlatform() : EventLoop {
171     // Called to unpark this event loop's thread
172     protected fun unpark()
173 
174     // Called to reschedule to DefaultExecutor when this event loop is complete
175     protected fun reschedule(now: Long, delayedTask: EventLoopImplBase.DelayedTask)
176 }
177 
178 internal abstract class EventLoopImplBase: EventLoopImplPlatform(), Delay {
179     // null | CLOSED_EMPTY | task | Queue<Runnable>
180     private val _queue = atomic<Any?>(null)
181 
182     // Allocated only only once
183     private val _delayed = atomic<DelayedTaskQueue?>(null)
184 
185     @Volatile
186     private var isCompleted = false
187 
188     override val isEmpty: Boolean get() {
189         if (!isUnconfinedQueueEmpty) return false
190         val delayed = _delayed.value
191         if (delayed != null && !delayed.isEmpty) return false
192         val queue = _queue.value
193         return when (queue) {
194             null -> true
195             is Queue<*> -> queue.isEmpty
196             else -> queue === CLOSED_EMPTY
197         }
198     }
199 
200     protected override val nextTime: Long
201         get() {
202             if (super.nextTime == 0L) return 0L
203             val queue = _queue.value
204             when {
205                 queue === null -> {} // empty queue -- proceed
206                 queue is Queue<*> -> if (!queue.isEmpty) return 0 // non-empty queue
207                 queue === CLOSED_EMPTY -> return Long.MAX_VALUE // no more events -- closed
208                 else -> return 0 // non-empty queue
209             }
210             val nextDelayedTask = _delayed.value?.peek() ?: return Long.MAX_VALUE
211             return (nextDelayedTask.nanoTime - nanoTime()).coerceAtLeast(0)
212         }
213 
214     override fun shutdown() {
215         // Clean up thread-local reference here -- this event loop is shutting down
216         ThreadLocalEventLoop.resetEventLoop()
217         // We should signal that this event loop should not accept any more tasks
218         // and process queued events (that could have been added after last processNextEvent)
219         isCompleted = true
220         closeQueue()
221         // complete processing of all queued tasks
222         while (processNextEvent() <= 0) { /* spin */ }
223         // reschedule the rest of delayed tasks
224         rescheduleAllDelayed()
225     }
226 
227     public override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) {
228         val timeNanos = delayToNanos(timeMillis)
229         if (timeNanos < MAX_DELAY_NS) {
230             val now = nanoTime()
231             DelayedResumeTask(now + timeNanos, continuation).also { task ->
232                 continuation.disposeOnCancellation(task)
233                 schedule(now, task)
234             }
235         }
236     }
237 
238     protected fun scheduleInvokeOnTimeout(timeMillis: Long, block: Runnable): DisposableHandle {
239         val timeNanos = delayToNanos(timeMillis)
240         return if (timeNanos < MAX_DELAY_NS) {
241             val now = nanoTime()
242             DelayedRunnableTask(now + timeNanos, block).also { task ->
243                 schedule(now, task)
244             }
245         } else {
246             NonDisposableHandle
247         }
248     }
249 
250     override fun processNextEvent(): Long {
251         // unconfined events take priority
252         if (processUnconfinedEvent()) return nextTime
253         // queue all delayed tasks that are due to be executed
254         val delayed = _delayed.value
255         if (delayed != null && !delayed.isEmpty) {
256             val now = nanoTime()
257             while (true) {
258                 // make sure that moving from delayed to queue removes from delayed only after it is added to queue
259                 // to make sure that 'isEmpty' and `nextTime` that check both of them
260                 // do not transiently report that both delayed and queue are empty during move
261                 delayed.removeFirstIf {
262                     if (it.timeToExecute(now)) {
263                         enqueueImpl(it)
264                     } else
265                         false
266                 } ?: break // quit loop when nothing more to remove or enqueueImpl returns false on "isComplete"
267             }
268         }
269         // then process one event from queue
270         dequeue()?.run()
271         return nextTime
272     }
273 
274     public final override fun dispatch(context: CoroutineContext, block: Runnable) = enqueue(block)
275 
276     public fun enqueue(task: Runnable) {
277         if (enqueueImpl(task)) {
278             // todo: we should unpark only when this delayed task became first in the queue
279             unpark()
280         } else {
281             DefaultExecutor.enqueue(task)
282         }
283     }
284 
285     @Suppress("UNCHECKED_CAST")
286     private fun enqueueImpl(task: Runnable): Boolean {
287         _queue.loop { queue ->
288             if (isCompleted) return false // fail fast if already completed, may still add, but queues will close
289             when (queue) {
290                 null -> if (_queue.compareAndSet(null, task)) return true
291                 is Queue<*> -> {
292                     when ((queue as Queue<Runnable>).addLast(task)) {
293                         Queue.ADD_SUCCESS -> return true
294                         Queue.ADD_CLOSED -> return false
295                         Queue.ADD_FROZEN -> _queue.compareAndSet(queue, queue.next())
296                     }
297                 }
298                 else -> when {
299                     queue === CLOSED_EMPTY -> return false
300                     else -> {
301                         // update to full-blown queue to add one more
302                         val newQueue = Queue<Runnable>(Queue.INITIAL_CAPACITY, singleConsumer = true)
303                         newQueue.addLast(queue as Runnable)
304                         newQueue.addLast(task)
305                         if (_queue.compareAndSet(queue, newQueue)) return true
306                     }
307                 }
308             }
309         }
310     }
311 
312     @Suppress("UNCHECKED_CAST")
313     private fun dequeue(): Runnable? {
314         _queue.loop { queue ->
315             when (queue) {
316                 null -> return null
317                 is Queue<*> -> {
318                     val result = (queue as Queue<Runnable>).removeFirstOrNull()
319                     if (result !== Queue.REMOVE_FROZEN) return result as Runnable?
320                     _queue.compareAndSet(queue, queue.next())
321                 }
322                 else -> when {
323                     queue === CLOSED_EMPTY -> return null
324                     else -> if (_queue.compareAndSet(queue, null)) return queue as Runnable
325                 }
326             }
327         }
328     }
329 
330     private fun closeQueue() {
331         assert { isCompleted }
332         _queue.loop { queue ->
333             when (queue) {
334                 null -> if (_queue.compareAndSet(null, CLOSED_EMPTY)) return
335                 is Queue<*> -> {
336                     queue.close()
337                     return
338                 }
339                 else -> when {
340                     queue === CLOSED_EMPTY -> return
341                     else -> {
342                         // update to full-blown queue to close
343                         val newQueue = Queue<Runnable>(Queue.INITIAL_CAPACITY, singleConsumer = true)
344                         newQueue.addLast(queue as Runnable)
345                         if (_queue.compareAndSet(queue, newQueue)) return
346                     }
347                 }
348             }
349         }
350 
351     }
352 
353     public fun schedule(now: Long, delayedTask: DelayedTask) {
354         when (scheduleImpl(now, delayedTask)) {
355             SCHEDULE_OK -> if (shouldUnpark(delayedTask)) unpark()
356             SCHEDULE_COMPLETED -> reschedule(now, delayedTask)
357             SCHEDULE_DISPOSED -> {} // do nothing -- task was already disposed
358             else -> error("unexpected result")
359         }
360     }
361 
362     private fun shouldUnpark(task: DelayedTask): Boolean = _delayed.value?.peek() === task
363 
364     private fun scheduleImpl(now: Long, delayedTask: DelayedTask): Int {
365         if (isCompleted) return SCHEDULE_COMPLETED
366         val delayedQueue = _delayed.value ?: run {
367             _delayed.compareAndSet(null, DelayedTaskQueue(now))
368             _delayed.value!!
369         }
370         return delayedTask.scheduleTask(now, delayedQueue, this)
371     }
372 
373     // It performs "hard" shutdown for test cleanup purposes
374     protected fun resetAll() {
375         _queue.value = null
376         _delayed.value = null
377     }
378 
379     // This is a "soft" (normal) shutdown
380     private fun rescheduleAllDelayed() {
381         val now = nanoTime()
382         while (true) {
383             /*
384              * `removeFirstOrNull` below is the only operation on DelayedTask & ThreadSafeHeap that is not
385              * synchronized on DelayedTask itself. All other operation are synchronized both on
386              * DelayedTask & ThreadSafeHeap instances (in this order). It is still safe, because `dispose`
387              * first removes DelayedTask from the heap (under synchronization) then
388              * assign "_heap = DISPOSED_TASK", so there cannot be ever a race to _heap reference update.
389              */
390             val delayedTask = _delayed.value?.removeFirstOrNull() ?: break
391             reschedule(now, delayedTask)
392         }
393     }
394 
395     internal abstract class DelayedTask(
396         /**
397          * This field can be only modified in [scheduleTask] before putting this DelayedTask
398          * into heap to avoid overflow and corruption of heap data structure.
399          */
400         @JvmField var nanoTime: Long
401     ) : Runnable, Comparable<DelayedTask>, DisposableHandle, ThreadSafeHeapNode {
402         private var _heap: Any? = null // null | ThreadSafeHeap | DISPOSED_TASK
403 
404         override var heap: ThreadSafeHeap<*>?
405             get() = _heap as? ThreadSafeHeap<*>
406             set(value) {
407                 require(_heap !== DISPOSED_TASK) // this can never happen, it is always checked before adding/removing
408                 _heap = value
409             }
410 
411         override var index: Int = -1
412 
413         override fun compareTo(other: DelayedTask): Int {
414             val dTime = nanoTime - other.nanoTime
415             return when {
416                 dTime > 0 -> 1
417                 dTime < 0 -> -1
418                 else -> 0
419             }
420         }
421 
422         fun timeToExecute(now: Long): Boolean = now - nanoTime >= 0L
423 
424         @Synchronized
425         fun scheduleTask(now: Long, delayed: DelayedTaskQueue, eventLoop: EventLoopImplBase): Int {
426             if (_heap === DISPOSED_TASK) return SCHEDULE_DISPOSED // don't add -- was already disposed
427             delayed.addLastIf(this) { firstTask ->
428                 if (eventLoop.isCompleted) return SCHEDULE_COMPLETED // non-local return from scheduleTask
429                 /**
430                  * We are about to add new task and we have to make sure that [DelayedTaskQueue]
431                  * invariant is maintained. The code in this lambda is additionally executed under
432                  * the lock of [DelayedTaskQueue] and working with [DelayedTaskQueue.timeNow] here is thread-safe.
433                  */
434                 if (firstTask == null) {
435                     /**
436                      * When adding the first delayed task we simply update queue's [DelayedTaskQueue.timeNow] to
437                      * the current now time even if that means "going backwards in time". This makes the structure
438                      * self-correcting in spite of wild jumps in `nanoTime()` measurements once all delayed tasks
439                      * are removed from the delayed queue for execution.
440                      */
441                     delayed.timeNow = now
442                 } else {
443                     /**
444                      * Carefully update [DelayedTaskQueue.timeNow] so that it does not sweep past first's tasks time
445                      * and only goes forward in time. We cannot let it go backwards in time or invariant can be
446                      * violated for tasks that were already scheduled.
447                      */
448                     val firstTime = firstTask.nanoTime
449                     // compute min(now, firstTime) using a wrap-safe check
450                     val minTime = if (firstTime - now >= 0) now else firstTime
451                     // update timeNow only when going forward in time
452                     if (minTime - delayed.timeNow > 0) delayed.timeNow = minTime
453                 }
454                 /**
455                  * Here [DelayedTaskQueue.timeNow] was already modified and we have to double-check that newly added
456                  * task does not violate [DelayedTaskQueue] invariant because of that. Note also that this scheduleTask
457                  * function can be called to reschedule from one queue to another and this might be another reason
458                  * where new task's time might now violate invariant.
459                  * We correct invariant violation (if any) by simply changing this task's time to now.
460                  */
461                 if (nanoTime - delayed.timeNow < 0) nanoTime = delayed.timeNow
462                 true
463             }
464             return SCHEDULE_OK
465         }
466 
467         @Synchronized
468         final override fun dispose() {
469             val heap = _heap
470             if (heap === DISPOSED_TASK) return // already disposed
471             @Suppress("UNCHECKED_CAST")
472             (heap as? DelayedTaskQueue)?.remove(this) // remove if it is in heap (first)
473             _heap = DISPOSED_TASK // never add again to any heap
474         }
475 
476         override fun toString(): String = "Delayed[nanos=$nanoTime]"
477     }
478 
479     private inner class DelayedResumeTask(
480         nanoTime: Long,
481         private val cont: CancellableContinuation<Unit>
482     ) : DelayedTask(nanoTime) {
483         override fun run() { with(cont) { resumeUndispatched(Unit) } }
484         override fun toString(): String = super.toString() + cont.toString()
485     }
486 
487     private class DelayedRunnableTask(
488         nanoTime: Long,
489         private val block: Runnable
490     ) : DelayedTask(nanoTime) {
491         override fun run() { block.run() }
492         override fun toString(): String = super.toString() + block.toString()
493     }
494 
495     /**
496      * Delayed task queue maintains stable time-comparision invariant despite potential wraparounds in
497      * long nano time measurements by maintaining last observed [timeNow]. It protects the integrity of the
498      * heap data structure in spite of potential non-monotonicity of `nanoTime()` source.
499      * The invariant is that for every scheduled [DelayedTask]:
500      *
501      * ```
502      * delayedTask.nanoTime - timeNow >= 0
503      * ```
504      *
505      * So the comparison of scheduled tasks via [DelayedTask.compareTo] is always stable as
506      * scheduled [DelayedTask.nanoTime] can be at most [Long.MAX_VALUE] apart. This invariant is maintained when
507      * new tasks are added by [DelayedTask.scheduleTask] function and it cannot be violated when tasks are removed
508      * (so there is nothing special to do there).
509      */
510     internal class DelayedTaskQueue(
511         @JvmField var timeNow: Long
512     ) : ThreadSafeHeap<DelayedTask>()
513 }
514 
515 internal expect fun createEventLoop(): EventLoop
516 
517 internal expect fun nanoTime(): Long
518 
519 internal expect object DefaultExecutor {
520     public fun enqueue(task: Runnable)
521 }
522 
523