1 /*
2 * Copyright (C) 2023 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 package com.android.intentresolver.util
18
19 import android.os.SystemClock
20 import kotlinx.coroutines.CoroutineStart
21 import kotlinx.coroutines.Job
22 import kotlinx.coroutines.coroutineScope
23 import kotlinx.coroutines.delay
24 import kotlinx.coroutines.flow.Flow
25 import kotlinx.coroutines.flow.channelFlow
26 import kotlinx.coroutines.launch
27
28 /**
29 * Returns a flow that mirrors the original flow, but delays values following emitted values for the
30 * given [periodMs]. If the original flow emits more than one value during this period, only the
31 * latest value is emitted.
32 *
33 * Example:
34 * ```kotlin
35 * flow {
36 * emit(1) // t=0ms
37 * delay(90)
38 * emit(2) // t=90ms
39 * delay(90)
40 * emit(3) // t=180ms
41 * delay(1010)
42 * emit(4) // t=1190ms
43 * delay(1010)
44 * emit(5) // t=2200ms
45 * }.throttle(1000)
46 * ```
47 *
48 * produces the following emissions at the following times
49 *
50 * ```text
51 * 1 (t=0ms), 3 (t=1000ms), 4 (t=2000ms), 5 (t=3000ms)
52 * ```
53 */
54 // A SystemUI com.android.systemui.util.kotlin.throttle copy.
<lambda>null55 fun <T> Flow<T>.throttle(periodMs: Long): Flow<T> = channelFlow {
56 coroutineScope {
57 var previousEmitTimeMs = 0L
58 var delayJob: Job? = null
59 var sendJob: Job? = null
60 val outerScope = this
61
62 collect {
63 delayJob?.cancel()
64 sendJob?.join()
65 val currentTimeMs = SystemClock.elapsedRealtime()
66 val timeSinceLastEmit = currentTimeMs - previousEmitTimeMs
67 val timeUntilNextEmit = maxOf(0L, periodMs - timeSinceLastEmit)
68 if (timeUntilNextEmit > 0L) {
69 // We create delayJob to allow cancellation during the delay period
70 delayJob = launch {
71 delay(timeUntilNextEmit)
72 sendJob =
73 outerScope.launch(start = CoroutineStart.UNDISPATCHED) {
74 send(it)
75 previousEmitTimeMs = SystemClock.elapsedRealtime()
76 }
77 }
78 } else {
79 send(it)
80 previousEmitTimeMs = currentTimeMs
81 }
82 }
83 }
84 }
85