1 /*
2  * Copyright (C) 2023 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package com.android.intentresolver.util
18 
19 import android.os.SystemClock
20 import kotlinx.coroutines.CoroutineStart
21 import kotlinx.coroutines.Job
22 import kotlinx.coroutines.coroutineScope
23 import kotlinx.coroutines.delay
24 import kotlinx.coroutines.flow.Flow
25 import kotlinx.coroutines.flow.channelFlow
26 import kotlinx.coroutines.launch
27 
28 /**
29  * Returns a flow that mirrors the original flow, but delays values following emitted values for the
30  * given [periodMs]. If the original flow emits more than one value during this period, only the
31  * latest value is emitted.
32  *
33  * Example:
34  * ```kotlin
35  * flow {
36  *     emit(1)     // t=0ms
37  *     delay(90)
38  *     emit(2)     // t=90ms
39  *     delay(90)
40  *     emit(3)     // t=180ms
41  *     delay(1010)
42  *     emit(4)     // t=1190ms
43  *     delay(1010)
44  *     emit(5)     // t=2200ms
45  * }.throttle(1000)
46  * ```
47  *
48  * produces the following emissions at the following times
49  *
50  * ```text
51  * 1 (t=0ms), 3 (t=1000ms), 4 (t=2000ms), 5 (t=3000ms)
52  * ```
53  */
54 // A SystemUI com.android.systemui.util.kotlin.throttle copy.
<lambda>null55 fun <T> Flow<T>.throttle(periodMs: Long): Flow<T> = channelFlow {
56     coroutineScope {
57         var previousEmitTimeMs = 0L
58         var delayJob: Job? = null
59         var sendJob: Job? = null
60         val outerScope = this
61 
62         collect {
63             delayJob?.cancel()
64             sendJob?.join()
65             val currentTimeMs = SystemClock.elapsedRealtime()
66             val timeSinceLastEmit = currentTimeMs - previousEmitTimeMs
67             val timeUntilNextEmit = maxOf(0L, periodMs - timeSinceLastEmit)
68             if (timeUntilNextEmit > 0L) {
69                 // We create delayJob to allow cancellation during the delay period
70                 delayJob = launch {
71                     delay(timeUntilNextEmit)
72                     sendJob =
73                         outerScope.launch(start = CoroutineStart.UNDISPATCHED) {
74                             send(it)
75                             previousEmitTimeMs = SystemClock.elapsedRealtime()
76                         }
77                 }
78             } else {
79                 send(it)
80                 previousEmitTimeMs = currentTimeMs
81             }
82         }
83     }
84 }
85