1 /* 2 * Copyright (C) 2023 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package com.android.adservices.service.common; 18 19 import androidx.annotation.NonNull; 20 21 import com.android.adservices.LogUtil; 22 import com.android.adservices.concurrency.AdServicesExecutors; 23 24 import com.google.common.util.concurrent.FluentFuture; 25 26 import java.util.Objects; 27 import java.util.function.Supplier; 28 29 /** 30 * Utility class helping to ensure that only one thread is running a given task. 31 * 32 * @param <T> Type of the result returned by the task. 33 */ 34 public class SingletonRunner<T> { 35 /** 36 * Represents the runner of an interruptable task. The parameter {@code stopFlagChecker} can be 37 * invoked to check if a request to stop the task has been received. 38 * 39 * @param <T> the return type of the task. 40 */ 41 public interface InterruptableTaskRunner<T> { 42 /** 43 * Run the task. 44 * 45 * @param stopFlagChecker returns true if the task should be stopped. 46 */ run(Supplier<Boolean> stopFlagChecker)47 FluentFuture<T> run(Supplier<Boolean> stopFlagChecker); 48 } 49 50 // Critical region to guarantee that write changes to mWorkInProgress and 51 // mStopWorkRequestPending are logically consistent 52 // (i.e. we are checking that there is WIP before setting mStopWorkRequestPending to true) 53 private final Object mWorkStatusWriteLock = new Object(); 54 private final String mTaskDescription; 55 private volatile boolean mStopWorkRequestPending; 56 private FluentFuture<T> mRunningTaskResult = null; 57 InterruptableTaskRunner<T> mTaskRunner; 58 SingletonRunner( @onNull String taskDescription, @NonNull InterruptableTaskRunner<T> taskRunner)59 public SingletonRunner( 60 @NonNull String taskDescription, @NonNull InterruptableTaskRunner<T> taskRunner) { 61 Objects.requireNonNull(taskDescription); 62 Objects.requireNonNull(taskRunner); 63 64 mTaskDescription = taskDescription; 65 mStopWorkRequestPending = false; 66 mTaskRunner = taskRunner; 67 } 68 69 /** 70 * Ensures that there is only one thread running the task and that requests to stop are not 71 * interfering with new starting jobs 72 */ runSingleInstance()73 public FluentFuture<T> runSingleInstance() { 74 synchronized (mWorkStatusWriteLock) { 75 if (mRunningTaskResult != null) { 76 LogUtil.w("Already running %s, skipping call", mTaskDescription); 77 } else { 78 mRunningTaskResult = 79 mTaskRunner 80 .run(this::shouldStop) 81 // not using a callback to be sure that the status is reset 82 // when the future is completed. 83 .transform( 84 result -> { 85 signalWorkIsComplete(); 86 return result; 87 }, 88 AdServicesExecutors.getLightWeightExecutor()) 89 .catching( 90 RuntimeException.class, 91 e -> { 92 signalWorkIsComplete(); 93 throw e; 94 }, 95 AdServicesExecutors.getLightWeightExecutor()); 96 } 97 98 // Returning the value of mRunningTaskResult from inside the critical region 99 // to avoid returning null if the runner completes so early that it is actually 100 // resetting the mRunningTaskResult in this instance. 101 return mRunningTaskResult; 102 } 103 } 104 signalWorkIsComplete()105 private void signalWorkIsComplete() { 106 synchronized (mWorkStatusWriteLock) { 107 mRunningTaskResult = null; 108 mStopWorkRequestPending = false; 109 } 110 } 111 shouldStop()112 private boolean shouldStop() { 113 return mStopWorkRequestPending; 114 } 115 116 /** Requests that any ongoing work be stopped gracefully and waits for work to be stopped. */ stopWork()117 public void stopWork() { 118 LogUtil.d("%s stop work requested", mTaskDescription); 119 120 synchronized (mWorkStatusWriteLock) { 121 if (mRunningTaskResult == null) { 122 LogUtil.d("%s not running", mTaskDescription); 123 return; 124 } 125 126 mStopWorkRequestPending = true; 127 } 128 129 LogUtil.d("%s configured to shut down", mTaskDescription); 130 } 131 } 132