1 /*
2  * Copyright (C) 2023 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package com.android.adservices.service.common;
18 
19 import androidx.annotation.NonNull;
20 
21 import com.android.adservices.LogUtil;
22 import com.android.adservices.concurrency.AdServicesExecutors;
23 
24 import com.google.common.util.concurrent.FluentFuture;
25 
26 import java.util.Objects;
27 import java.util.function.Supplier;
28 
29 /**
30  * Utility class helping to ensure that only one thread is running a given task.
31  *
32  * @param <T> Type of the result returned by the task.
33  */
34 public class SingletonRunner<T> {
35     /**
36      * Represents the runner of an interruptable task. The parameter {@code stopFlagChecker} can be
37      * invoked to check if a request to stop the task has been received.
38      *
39      * @param <T> the return type of the task.
40      */
41     public interface InterruptableTaskRunner<T> {
42         /**
43          * Run the task.
44          *
45          * @param stopFlagChecker returns true if the task should be stopped.
46          */
run(Supplier<Boolean> stopFlagChecker)47         FluentFuture<T> run(Supplier<Boolean> stopFlagChecker);
48     }
49 
50     // Critical region to guarantee that write changes to mWorkInProgress and
51     // mStopWorkRequestPending are logically consistent
52     // (i.e. we are checking that there is WIP before setting mStopWorkRequestPending to true)
53     private final Object mWorkStatusWriteLock = new Object();
54     private final String mTaskDescription;
55     private volatile boolean mStopWorkRequestPending;
56     private FluentFuture<T> mRunningTaskResult = null;
57     InterruptableTaskRunner<T> mTaskRunner;
58 
SingletonRunner( @onNull String taskDescription, @NonNull InterruptableTaskRunner<T> taskRunner)59     public SingletonRunner(
60             @NonNull String taskDescription, @NonNull InterruptableTaskRunner<T> taskRunner) {
61         Objects.requireNonNull(taskDescription);
62         Objects.requireNonNull(taskRunner);
63 
64         mTaskDescription = taskDescription;
65         mStopWorkRequestPending = false;
66         mTaskRunner = taskRunner;
67     }
68 
69     /**
70      * Ensures that there is only one thread running the task and that requests to stop are not
71      * interfering with new starting jobs
72      */
runSingleInstance()73     public FluentFuture<T> runSingleInstance() {
74         synchronized (mWorkStatusWriteLock) {
75             if (mRunningTaskResult != null) {
76                 LogUtil.w("Already running %s, skipping call", mTaskDescription);
77             } else {
78                 mRunningTaskResult =
79                         mTaskRunner
80                                 .run(this::shouldStop)
81                                 // not using a callback to be sure that the status is reset
82                                 // when the future is completed.
83                                 .transform(
84                                         result -> {
85                                             signalWorkIsComplete();
86                                             return result;
87                                         },
88                                         AdServicesExecutors.getLightWeightExecutor())
89                                 .catching(
90                                         RuntimeException.class,
91                                         e -> {
92                                             signalWorkIsComplete();
93                                             throw e;
94                                         },
95                                         AdServicesExecutors.getLightWeightExecutor());
96             }
97 
98             // Returning the value of mRunningTaskResult from inside the critical region
99             // to avoid returning null if the runner completes so early that it is actually
100             // resetting the mRunningTaskResult in this instance.
101             return mRunningTaskResult;
102         }
103     }
104 
signalWorkIsComplete()105     private void signalWorkIsComplete() {
106         synchronized (mWorkStatusWriteLock) {
107             mRunningTaskResult = null;
108             mStopWorkRequestPending = false;
109         }
110     }
111 
shouldStop()112     private boolean shouldStop() {
113         return mStopWorkRequestPending;
114     }
115 
116     /** Requests that any ongoing work be stopped gracefully and waits for work to be stopped. */
stopWork()117     public void stopWork() {
118         LogUtil.d("%s stop work requested", mTaskDescription);
119 
120         synchronized (mWorkStatusWriteLock) {
121             if (mRunningTaskResult == null) {
122                 LogUtil.d("%s not running", mTaskDescription);
123                 return;
124             }
125 
126             mStopWorkRequestPending = true;
127         }
128 
129         LogUtil.d("%s configured to shut down", mTaskDescription);
130     }
131 }
132