1 /*
2  * Copyright (C) 2022 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package com.android.adservices.concurrency;
18 
19 import android.annotation.NonNull;
20 import android.os.Process;
21 import android.os.StrictMode;
22 import android.os.StrictMode.ThreadPolicy;
23 
24 import com.android.internal.annotations.VisibleForTesting;
25 
26 import com.google.common.util.concurrent.ListeningExecutorService;
27 import com.google.common.util.concurrent.MoreExecutors;
28 import com.google.common.util.concurrent.ThreadFactoryBuilder;
29 
30 import java.util.Optional;
31 import java.util.concurrent.Executors;
32 import java.util.concurrent.LinkedBlockingQueue;
33 import java.util.concurrent.ScheduledThreadPoolExecutor;
34 import java.util.concurrent.ThreadFactory;
35 import java.util.concurrent.ThreadPoolExecutor;
36 import java.util.concurrent.TimeUnit;
37 
38 /**
39  * All executors of the PP API module.
40  *
41  * @hide
42  */
43 // TODO(b/224987182): set appropriate parameters (priority, size, etc..) for the shared thread pools
44 // after doing detailed analysis. Ideally the parameters should be backed by PH flags.
45 public final class AdServicesExecutors {
46     // We set the minimal number of threads for background executor to 4 and lightweight & scheduled
47     //  executors to 2 since Runtime.getRuntime().availableProcessors() may return 1 or 2 for
48     //  low-end devices. This may cause deadlock for starvation in those low-end devices.
49     private static final int MIN_BACKGROUND_EXECUTOR_THREADS = 4;
50     private static final int MIN_LIGHTWEIGHT_EXECUTOR_THREADS = 2;
51     private static final int MAX_SCHEDULED_EXECUTOR_THREADS = 4;
52 
53     private static final String LIGHTWEIGHT_NAME = "lightweight";
54     private static final String BACKGROUND_NAME = "background";
55     private static final String SCHEDULED_NAME = "scheduled";
56     private static final String BLOCKING_NAME = "blocking";
57 
createThreadFactory( String name, int priority, Optional<StrictMode.ThreadPolicy> policy)58     private static ThreadFactory createThreadFactory(
59             String name, int priority, Optional<StrictMode.ThreadPolicy> policy) {
60         return new ThreadFactoryBuilder()
61                 .setDaemon(true)
62                 .setNameFormat(name + "-%d")
63                 .setThreadFactory(
64                         new ThreadFactory() {
65                             @Override
66                             public Thread newThread(final Runnable runnable) {
67                                 return new Thread(
68                                         () -> {
69                                             if (policy.isPresent()) {
70                                                 StrictMode.setThreadPolicy(policy.get());
71                                             }
72                                             // Process class operates on the current thread.
73                                             Process.setThreadPriority(priority);
74                                             runnable.run();
75                                         });
76                             }
77                         })
78                 .build();
79     }
80 
81     private static final ListeningExecutorService sLightWeightExecutor =
82             // Always use at least two threads, so that clients can't depend on light-weight
83             // executor tasks executing sequentially
84             MoreExecutors.listeningDecorator(
85                     new ThreadPoolExecutor(
86                             /* corePoolSize= */ Math.max(
87                                     MIN_LIGHTWEIGHT_EXECUTOR_THREADS,
88                                     Runtime.getRuntime().availableProcessors() - 2),
89                             /* maximumPoolSize */
90                             Math.max(
91                                     MIN_LIGHTWEIGHT_EXECUTOR_THREADS,
92                                     Runtime.getRuntime().availableProcessors() - 2),
93                             /* keepAliveTime= */ 60L,
94                             TimeUnit.SECONDS,
95                             new LinkedBlockingQueue<>(),
96                             createThreadFactory(
97                                     LIGHTWEIGHT_NAME,
98                                     Process.THREAD_PRIORITY_DEFAULT,
99                                     Optional.of(getAsyncThreadPolicy()))));
100 
101     /**
102      * Functions that don't do direct I/O and that are fast (under ten milliseconds or thereabouts)
103      * should run on this Executor.
104      *
105      * <p>Most async code in an app should be written to run on this Executor.
106      */
107     @NonNull
108     public static ListeningExecutorService getLightWeightExecutor() {
109         return sLightWeightExecutor;
110     }
111 
112     private static final ListeningExecutorService sBackgroundExecutor =
113             MoreExecutors.listeningDecorator(
114                     new ThreadPoolExecutor(
115                             /* corePoolSize= */ Math.max(
116                                     MIN_BACKGROUND_EXECUTOR_THREADS,
117                                     Runtime.getRuntime().availableProcessors()),
118                             /* maximumPoolSize */ Math.max(
119                                     MIN_BACKGROUND_EXECUTOR_THREADS,
120                                     Runtime.getRuntime().availableProcessors()),
121                             /* keepAliveTime= */ 60L,
122                             TimeUnit.SECONDS,
123                             new LinkedBlockingQueue<>(),
124                             createThreadFactory(
125                                     BACKGROUND_NAME,
126                                     Process.THREAD_PRIORITY_BACKGROUND,
127                                     Optional.of(getIoThreadPolicy()))));
128 
129     /**
130      * Functions that directly execute disk I/O, or that are CPU bound and long-running (over ten
131      * milliseconds or thereabouts) should run on this Executor.
132      *
133      * <p>Examples include stepping through a database Cursor, or decoding an image into a Bitmap.
134      *
135      * <p>Functions that block on network I/O must run on BlockingExecutor.
136      */
137     @NonNull
138     public static ListeningExecutorService getBackgroundExecutor() {
139         return sBackgroundExecutor;
140     }
141 
142     private static final ScheduledThreadPoolExecutor sScheduler =
143             new ScheduledThreadPoolExecutor(
144                     /* corePoolSize= */ Math.min(
145                             MAX_SCHEDULED_EXECUTOR_THREADS,
146                             Runtime.getRuntime().availableProcessors()),
147                     createThreadFactory(
148                             SCHEDULED_NAME,
149                             Process.THREAD_PRIORITY_DEFAULT,
150                             Optional.of(getIoThreadPolicy())));
151 
152     /**
153      * Functions that require to be run with a delay, or have timed executions should run on this
154      * Executor.
155      *
156      * <p>Example includes having timeouts on Futures.
157      */
158     @NonNull
159     public static ScheduledThreadPoolExecutor getScheduler() {
160         return sScheduler;
161     }
162 
163     private static final ListeningExecutorService sBlockingExecutor =
164             MoreExecutors.listeningDecorator(
165                     Executors.newCachedThreadPool(
166                             createThreadFactory(
167                                     BLOCKING_NAME,
168                                     Process.THREAD_PRIORITY_BACKGROUND
169                                             + Process.THREAD_PRIORITY_LESS_FAVORABLE,
170                                     /* policy = */ Optional.empty())));
171 
172     /**
173      * Functions that directly execute network I/O, or that block their thread awaiting the progress
174      * of at least one other thread, must run on BlockingExecutor.
175      *
176      * <p>BlockingExecutor will launch as many threads as there are tasks available to run
177      * concurrently, stopping and freeing them when the concurrent task count drops again. This
178      * unbounded number of threads negatively impacts performance:
179      *
180      * <p>Extra threads add execution overhead and increase execution latency. Each thread consumes
181      * significant memory for thread-local state and stack, and may increase the total amount of
182      * space used by objects on the heap. Each additional BlockingExecutor thread reduces the time
183      * available to the fixed-size LightweightExecutor and BackgroundExecutor. While
184      * BlockingExecutor's threads have a lower priority to decrease this impact, the extra threads
185      * can still compete for resources. Always prefer to refactor a class or API to avoid blocking
186      * before falling back to using the blocking Executor.
187      */
188     @NonNull
189     public static ListeningExecutorService getBlockingExecutor() {
190         return sBlockingExecutor;
191     }
192 
193     @VisibleForTesting
194     static ThreadPolicy getAsyncThreadPolicy() {
195         return new ThreadPolicy.Builder().detectAll().penaltyLog().build();
196     }
197 
198     @VisibleForTesting
199     static ThreadPolicy getIoThreadPolicy() {
200         return new ThreadPolicy.Builder()
201                 .detectNetwork()
202                 .detectResourceMismatches()
203                 .detectUnbufferedIo()
204                 .penaltyLog()
205                 .build();
206     }
207 
208     private AdServicesExecutors() {}
209 }
210