/*
 * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
 */

package kotlinx.coroutines

import kotlinx.atomicfu.*
import kotlinx.coroutines.internal.*
import kotlin.coroutines.*
import kotlin.jvm.*

/**
 * Extended by [CoroutineDispatcher] implementations that have an event loop inside and can
 * be asked to process the next event from their event queue.
 *
 * It may optionally implement the [Delay] interface and support time-scheduled tasks.
 * It is created or piggybacked onto (see [ThreadLocalEventLoop])
 * by `runBlocking` and by [Dispatchers.Unconfined].
 *
 * @suppress **This is an internal API and should not be used from general code.**
 */
internal abstract class EventLoop : CoroutineDispatcher() {
    /**
     * Counts the number of nested `runBlocking` and [Dispatchers.Unconfined] usages that use this event loop.
     */
    private var useCount = 0L

    /**
     * Set to true on any use by `runBlocking`, because it potentially leaks this loop to other threads, so
     * this instance must be properly shut down. We don't need to shut down an event loop that was used solely
     * by [Dispatchers.Unconfined] -- it can be left as [ThreadLocalEventLoop] and reused next time.
     */
    private var shared = false

    /**
     * Queue used by [Dispatchers.Unconfined] tasks.
     * These tasks are thread-local for performance and take precedence over the rest of the queue.
     */
    private var unconfinedQueue: ArrayQueue<DispatchedTask<*>>? = null

    /**
     * Processes the next event in this event loop.
     *
     * The result of this function is to be interpreted like this:
     * * `<= 0` -- there are potentially more events for immediate processing;
     * * `> 0` -- a number of nanoseconds to wait for the next scheduled event;
     * * [Long.MAX_VALUE] -- no more events.
     *
     * **NOTE**: Must be invoked only from the event loop's thread
     *          (no check for performance reasons, may be added in the future).
     */
    public open fun processNextEvent(): Long {
        if (!processUnconfinedEvent()) return Long.MAX_VALUE
        return nextTime
    }

    protected open val isEmpty: Boolean get() = isUnconfinedQueueEmpty

    protected open val nextTime: Long
        get() {
            val queue = unconfinedQueue ?: return Long.MAX_VALUE
            return if (queue.isEmpty) Long.MAX_VALUE else 0L
        }

    public fun processUnconfinedEvent(): Boolean {
        val queue = unconfinedQueue ?: return false
        val task = queue.removeFirstOrNull() ?: return false
        task.run()
        return true
    }

    /**
     * Returns `true` if the invoking `runBlocking(context) { ... }` that was passed this event loop in its context
     * parameter should call [processNextEvent] for this event loop (otherwise, it will process the thread-local one).
     * By default, the event loop implementation is thread-local and should not be processed in the context
     * (the current thread's event loop should be processed instead).
     */
    public open fun shouldBeProcessedFromContext(): Boolean = false

    /**
     * Dispatches a task whose dispatcher returned `false` from [CoroutineDispatcher.isDispatchNeeded]
     * into the current event loop.
     */
    public fun dispatchUnconfined(task: DispatchedTask<*>) {
        val queue = unconfinedQueue ?:
            ArrayQueue<DispatchedTask<*>>().also { unconfinedQueue = it }
        queue.addLast(task)
    }

    public val isActive: Boolean
        get() = useCount > 0

    public val isUnconfinedLoopActive: Boolean
        get() = useCount >= delta(unconfined = true)

    // May only be used from the event loop's thread
    public val isUnconfinedQueueEmpty: Boolean
        get() = unconfinedQueue?.isEmpty ?: true

    private fun delta(unconfined: Boolean) =
        if (unconfined) (1L shl 32) else 1L

    fun incrementUseCount(unconfined: Boolean = false) {
        useCount += delta(unconfined)
        if (!unconfined) shared = true
    }

    fun decrementUseCount(unconfined: Boolean = false) {
        useCount -= delta(unconfined)
        if (useCount > 0) return
        assert { useCount == 0L } // "Extra decrementUseCount"
        if (shared) {
            // shut it down and remove from ThreadLocalEventLoop
            shutdown()
        }
    }

    protected open fun shutdown() {}
}
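/*
 * Illustrative sketch only, not part of the library: roughly how a `runBlocking`-style caller
 * could drive an event loop through the [EventLoop.processNextEvent] contract above. The
 * `isDone` and `parkNanos` parameters are hypothetical placeholders for the caller's completion
 * check and thread-parking primitive.
 */
private fun eventLoopDriverSketch(isDone: () -> Boolean, parkNanos: (Long) -> Unit) {
    val eventLoop = ThreadLocalEventLoop.eventLoop // get or create this thread's event loop
    eventLoop.incrementUseCount() // non-unconfined use: the loop is shut down once unused
    try {
        while (!isDone()) {
            // <= 0 -- more events may be immediately available, loop again;
            // > 0  -- nanoseconds until the next scheduled event, park for that long;
            // Long.MAX_VALUE -- nothing queued, park until some other thread unparks us.
            val waitNanos = eventLoop.processNextEvent()
            if (waitNanos > 0L) parkNanos(waitNanos)
        }
    } finally {
        eventLoop.decrementUseCount()
    }
}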
@NativeThreadLocal
internal object ThreadLocalEventLoop {
    private val ref = CommonThreadLocal<EventLoop?>()

    internal val eventLoop: EventLoop
        get() = ref.get() ?: createEventLoop().also { ref.set(it) }

    internal fun currentOrNull(): EventLoop? =
        ref.get()

    internal fun resetEventLoop() {
        ref.set(null)
    }

    internal fun setEventLoop(eventLoop: EventLoop) {
        ref.set(eventLoop)
    }
}

@SharedImmutable
private val DISPOSED_TASK = Symbol("REMOVED_TASK")

// results for scheduleImpl
private const val SCHEDULE_OK = 0
private const val SCHEDULE_COMPLETED = 1
private const val SCHEDULE_DISPOSED = 2

private const val MS_TO_NS = 1_000_000L
private const val MAX_MS = Long.MAX_VALUE / MS_TO_NS

/**
 * First-line overflow protection -- limit maximal delay.
 * Delays longer than this one (~146 years) are considered to be delayed "forever".
 */
private const val MAX_DELAY_NS = Long.MAX_VALUE / 2

internal fun delayToNanos(timeMillis: Long): Long = when {
    timeMillis <= 0 -> 0L
    timeMillis >= MAX_MS -> Long.MAX_VALUE
    else -> timeMillis * MS_TO_NS
}

internal fun delayNanosToMillis(timeNanos: Long): Long =
    timeNanos / MS_TO_NS
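/*
 * Illustrative sketch only, not part of the library: worked examples of the conversions above.
 * MAX_DELAY_NS = Long.MAX_VALUE / 2 is roughly 4.6e18 ns, i.e. about 146 years, which is why
 * anything longer is treated as a delay that never fires.
 */
private fun delayConversionExamples() {
    check(delayToNanos(-5L) == 0L) // non-positive delays are clamped to zero
    check(delayToNanos(1L) == 1_000_000L) // 1 ms is one million nanoseconds
    check(delayToNanos(Long.MAX_VALUE) == Long.MAX_VALUE) // delays >= MAX_MS saturate at MAX_VALUE
    check(delayNanosToMillis(2_500_000L) == 2L) // nanos -> millis is a truncating division
}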
@SharedImmutable
private val CLOSED_EMPTY = Symbol("CLOSED_EMPTY")

private typealias Queue<T> = LockFreeTaskQueueCore<T>

internal expect abstract class EventLoopImplPlatform() : EventLoop {
    // Called to unpark this event loop's thread
    protected fun unpark()

    // Called to reschedule to DefaultExecutor when this event loop is complete
    protected fun reschedule(now: Long, delayedTask: EventLoopImplBase.DelayedTask)
}

internal abstract class EventLoopImplBase : EventLoopImplPlatform(), Delay {
    // null | CLOSED_EMPTY | task | Queue<Runnable>
    private val _queue = atomic<Any?>(null)

    // Allocated only once
    private val _delayed = atomic<DelayedTaskQueue?>(null)

    @Volatile
    private var isCompleted = false

    override val isEmpty: Boolean get() {
        if (!isUnconfinedQueueEmpty) return false
        val delayed = _delayed.value
        if (delayed != null && !delayed.isEmpty) return false
        val queue = _queue.value
        return when (queue) {
            null -> true
            is Queue<*> -> queue.isEmpty
            else -> queue === CLOSED_EMPTY
        }
    }

    protected override val nextTime: Long
        get() {
            if (super.nextTime == 0L) return 0L
            val queue = _queue.value
            when {
                queue === null -> {} // empty queue -- proceed
                queue is Queue<*> -> if (!queue.isEmpty) return 0 // non-empty queue
                queue === CLOSED_EMPTY -> return Long.MAX_VALUE // no more events -- closed
                else -> return 0 // non-empty queue
            }
            val nextDelayedTask = _delayed.value?.peek() ?: return Long.MAX_VALUE
            return (nextDelayedTask.nanoTime - nanoTime()).coerceAtLeast(0)
        }

    override fun shutdown() {
        // Clean up thread-local reference here -- this event loop is shutting down
        ThreadLocalEventLoop.resetEventLoop()
        // We should signal that this event loop should not accept any more tasks
        // and process queued events (that could have been added after the last processNextEvent)
        isCompleted = true
        closeQueue()
        // complete processing of all queued tasks
        while (processNextEvent() <= 0) { /* spin */ }
        // reschedule the rest of the delayed tasks
        rescheduleAllDelayed()
    }

    public override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) {
        val timeNanos = delayToNanos(timeMillis)
        if (timeNanos < MAX_DELAY_NS) {
            val now = nanoTime()
            DelayedResumeTask(now + timeNanos, continuation).also { task ->
                continuation.disposeOnCancellation(task)
                schedule(now, task)
            }
        }
    }

    protected fun scheduleInvokeOnTimeout(timeMillis: Long, block: Runnable): DisposableHandle {
        val timeNanos = delayToNanos(timeMillis)
        return if (timeNanos < MAX_DELAY_NS) {
            val now = nanoTime()
            DelayedRunnableTask(now + timeNanos, block).also { task ->
                schedule(now, task)
            }
        } else {
            NonDisposableHandle
        }
    }
    override fun processNextEvent(): Long {
        // unconfined events take priority
        if (processUnconfinedEvent()) return nextTime
        // queue all delayed tasks that are due to be executed
        val delayed = _delayed.value
        if (delayed != null && !delayed.isEmpty) {
            val now = nanoTime()
            while (true) {
                // Remove a task from the delayed queue only after it was added to the main queue,
                // so that `isEmpty` and `nextTime` (which check both of them) never transiently
                // report that both the delayed queue and the main queue are empty during the move.
                delayed.removeFirstIf {
                    if (it.timeToExecute(now)) {
                        enqueueImpl(it)
                    } else
                        false
                } ?: break // quit the loop when there is nothing more to remove or enqueueImpl returns false on isCompleted
            }
        }
        // then process one event from the queue
        dequeue()?.run()
        return nextTime
    }

    public final override fun dispatch(context: CoroutineContext, block: Runnable) = enqueue(block)

    public fun enqueue(task: Runnable) {
        if (enqueueImpl(task)) {
            // todo: we should unpark only when this delayed task became first in the queue
            unpark()
        } else {
            DefaultExecutor.enqueue(task)
        }
    }

    @Suppress("UNCHECKED_CAST")
    private fun enqueueImpl(task: Runnable): Boolean {
        _queue.loop { queue ->
            if (isCompleted) return false // fail fast if already completed (a racing add may still slip in, but the queue will be closed)
            when (queue) {
                null -> if (_queue.compareAndSet(null, task)) return true
                is Queue<*> -> {
                    when ((queue as Queue<Runnable>).addLast(task)) {
                        Queue.ADD_SUCCESS -> return true
                        Queue.ADD_CLOSED -> return false
                        Queue.ADD_FROZEN -> _queue.compareAndSet(queue, queue.next())
                    }
                }
                else -> when {
                    queue === CLOSED_EMPTY -> return false
                    else -> {
                        // update to a full-blown queue to add one more task
                        val newQueue = Queue<Runnable>(Queue.INITIAL_CAPACITY, singleConsumer = true)
                        newQueue.addLast(queue as Runnable)
                        newQueue.addLast(task)
                        if (_queue.compareAndSet(queue, newQueue)) return true
                    }
                }
            }
        }
    }
    @Suppress("UNCHECKED_CAST")
    private fun dequeue(): Runnable? {
        _queue.loop { queue ->
            when (queue) {
                null -> return null
                is Queue<*> -> {
                    val result = (queue as Queue<Runnable>).removeFirstOrNull()
                    if (result !== Queue.REMOVE_FROZEN) return result as Runnable?
                    _queue.compareAndSet(queue, queue.next())
                }
                else -> when {
                    queue === CLOSED_EMPTY -> return null
                    else -> if (_queue.compareAndSet(queue, null)) return queue as Runnable
                }
            }
        }
    }

    private fun closeQueue() {
        assert { isCompleted }
        _queue.loop { queue ->
            when (queue) {
                null -> if (_queue.compareAndSet(null, CLOSED_EMPTY)) return
                is Queue<*> -> {
                    queue.close()
                    return
                }
                else -> when {
                    queue === CLOSED_EMPTY -> return
                    else -> {
                        // update to a full-blown queue to close it
                        val newQueue = Queue<Runnable>(Queue.INITIAL_CAPACITY, singleConsumer = true)
                        newQueue.addLast(queue as Runnable)
                        if (_queue.compareAndSet(queue, newQueue)) return
                    }
                }
            }
        }
    }

    public fun schedule(now: Long, delayedTask: DelayedTask) {
        when (scheduleImpl(now, delayedTask)) {
            SCHEDULE_OK -> if (shouldUnpark(delayedTask)) unpark()
            SCHEDULE_COMPLETED -> reschedule(now, delayedTask)
            SCHEDULE_DISPOSED -> {} // do nothing -- the task was already disposed
            else -> error("unexpected result")
        }
    }

    private fun shouldUnpark(task: DelayedTask): Boolean = _delayed.value?.peek() === task

    private fun scheduleImpl(now: Long, delayedTask: DelayedTask): Int {
        if (isCompleted) return SCHEDULE_COMPLETED
        val delayedQueue = _delayed.value ?: run {
            _delayed.compareAndSet(null, DelayedTaskQueue(now))
            _delayed.value!!
        }
        return delayedTask.scheduleTask(now, delayedQueue, this)
    }

    // Performs a "hard" shutdown for test cleanup purposes
    protected fun resetAll() {
        _queue.value = null
        _delayed.value = null
    }

    // This is a "soft" (normal) shutdown
    private fun rescheduleAllDelayed() {
        val now = nanoTime()
        while (true) {
            /*
             * `removeFirstOrNull` below is the only operation on DelayedTask & ThreadSafeHeap that is not
             * synchronized on DelayedTask itself. All other operations are synchronized both on
             * DelayedTask & ThreadSafeHeap instances (in this order). It is still safe, because `dispose`
             * first removes the DelayedTask from the heap (under synchronization) and only then
             * assigns `_heap = DISPOSED_TASK`, so there can never be a race on the `_heap` reference update.
             */
            val delayedTask = _delayed.value?.removeFirstOrNull() ?: break
            reschedule(now, delayedTask)
        }
    }
    internal abstract class DelayedTask(
        /**
         * This field may only be modified in [scheduleTask] before this [DelayedTask] is put
         * into the heap, to avoid overflow and corruption of the heap data structure.
         */
        @JvmField var nanoTime: Long
    ) : Runnable, Comparable<DelayedTask>, DisposableHandle, ThreadSafeHeapNode {
        private var _heap: Any? = null // null | ThreadSafeHeap | DISPOSED_TASK

        override var heap: ThreadSafeHeap<*>?
            get() = _heap as? ThreadSafeHeap<*>
            set(value) {
                require(_heap !== DISPOSED_TASK) // this can never happen, it is always checked before adding/removing
                _heap = value
            }

        override var index: Int = -1

        override fun compareTo(other: DelayedTask): Int {
            val dTime = nanoTime - other.nanoTime
            return when {
                dTime > 0 -> 1
                dTime < 0 -> -1
                else -> 0
            }
        }

        fun timeToExecute(now: Long): Boolean = now - nanoTime >= 0L

        @Synchronized
        fun scheduleTask(now: Long, delayed: DelayedTaskQueue, eventLoop: EventLoopImplBase): Int {
            if (_heap === DISPOSED_TASK) return SCHEDULE_DISPOSED // don't add -- it was already disposed
            delayed.addLastIf(this) { firstTask ->
                if (eventLoop.isCompleted) return SCHEDULE_COMPLETED // non-local return from scheduleTask
                /**
                 * We are about to add a new task and we have to make sure that the [DelayedTaskQueue]
                 * invariant is maintained. The code in this lambda is additionally executed under
                 * the lock of [DelayedTaskQueue], so working with [DelayedTaskQueue.timeNow] here is thread-safe.
                 */
                if (firstTask == null) {
                    /**
                     * When adding the first delayed task we simply update the queue's [DelayedTaskQueue.timeNow] to
                     * the current now time even if that means "going backwards in time". This makes the structure
                     * self-correcting in spite of wild jumps in `nanoTime()` measurements once all delayed tasks
                     * are removed from the delayed queue for execution.
                     */
                    delayed.timeNow = now
                } else {
                    /**
                     * Carefully update [DelayedTaskQueue.timeNow] so that it does not sweep past the first task's time
                     * and only goes forward in time. We cannot let it go backwards in time or the invariant could be
                     * violated for tasks that were already scheduled.
                     */
                    val firstTime = firstTask.nanoTime
                    // compute min(now, firstTime) using a wrap-safe check
                    val minTime = if (firstTime - now >= 0) now else firstTime
                    // update timeNow only when going forward in time
                    if (minTime - delayed.timeNow > 0) delayed.timeNow = minTime
                }
                /**
                 * Here [DelayedTaskQueue.timeNow] was already modified and we have to double-check that the newly
                 * added task does not violate the [DelayedTaskQueue] invariant because of that. Note also that this
                 * [scheduleTask] function can be called to reschedule a task from one queue to another, which is
                 * another reason why the new task's time might violate the invariant.
                 * We correct an invariant violation (if any) by simply pulling this task's time up to [DelayedTaskQueue.timeNow].
                 */
                if (nanoTime - delayed.timeNow < 0) nanoTime = delayed.timeNow
                true
            }
            return SCHEDULE_OK
        }
        @Synchronized
        final override fun dispose() {
            val heap = _heap
            if (heap === DISPOSED_TASK) return // already disposed
            @Suppress("UNCHECKED_CAST")
            (heap as? DelayedTaskQueue)?.remove(this) // remove from the heap if it is still there (do this first)
            _heap = DISPOSED_TASK // never add again to any heap
        }

        override fun toString(): String = "Delayed[nanos=$nanoTime]"
    }

    private inner class DelayedResumeTask(
        nanoTime: Long,
        private val cont: CancellableContinuation<Unit>
    ) : DelayedTask(nanoTime) {
        override fun run() { with(cont) { resumeUndispatched(Unit) } }
        override fun toString(): String = super.toString() + cont.toString()
    }

    private class DelayedRunnableTask(
        nanoTime: Long,
        private val block: Runnable
    ) : DelayedTask(nanoTime) {
        override fun run() { block.run() }
        override fun toString(): String = super.toString() + block.toString()
    }

    /**
     * The delayed task queue maintains a stable time-comparison invariant despite potential wraparounds in
     * long nano time measurements by maintaining the last observed [timeNow]. It protects the integrity of the
     * heap data structure in spite of potential non-monotonicity of the `nanoTime()` source.
     * The invariant is that for every scheduled [DelayedTask]:
     *
     * ```
     * delayedTask.nanoTime - timeNow >= 0
     * ```
     *
     * So the comparison of scheduled tasks via [DelayedTask.compareTo] is always stable, as the
     * scheduled [DelayedTask.nanoTime] values can be at most [Long.MAX_VALUE] apart. This invariant is maintained when
     * new tasks are added by the [DelayedTask.scheduleTask] function and it cannot be violated when tasks are removed
     * (so there is nothing special to do there).
     */
    internal class DelayedTaskQueue(
        @JvmField var timeNow: Long
    ) : ThreadSafeHeap<DelayedTask>()
}

internal expect fun createEventLoop(): EventLoop

internal expect fun nanoTime(): Long

internal expect object DefaultExecutor {
    public fun enqueue(task: Runnable)
}
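/*
 * Illustrative sketch only, not part of the library: the delayed-task machinery above never
 * compares absolute nano-time readings with `<`; it compares differences (`a - b >= 0`), as in
 * DelayedTask.timeToExecute and DelayedTask.compareTo. This stays correct even when the clock
 * value wraps around Long.MAX_VALUE, as long as the two readings are less than Long.MAX_VALUE apart.
 * The values below are hypothetical clock readings, not real measurements.
 */
private fun wrapSafeTimeComparisonExample() {
    val earlier = Long.MAX_VALUE - 10 // reading taken just before the counter wraps
    val later = earlier + 20 // 20 ns later; the Long has wrapped to a negative value
    check(later < earlier) // a naive comparison now gives the wrong answer...
    check(later - earlier >= 0L) // ...but the difference-based check still reports that `later` is not before `earlier`
}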