/*
 * Copyright (C) 2022 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.android.adservices.concurrency;

import android.annotation.NonNull;
import android.os.Process;
import android.os.StrictMode;
import android.os.StrictMode.ThreadPolicy;

import com.android.internal.annotations.VisibleForTesting;

import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;

import java.util.Optional;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * All executors of the PP API module.
 *
 * <p>Every executor is created once, eagerly, during class initialization and is shared by all
 * callers. All worker threads are daemon threads named {@code <pool-name>-<index>}.
 *
 * @hide
 */
// TODO(b/224987182): set appropriate parameters (priority, size, etc..) for the shared thread pools
// after doing detailed analysis. Ideally the parameters should be backed by PH flags.
public final class AdServicesExecutors {
    // Runtime.getRuntime().availableProcessors() may return 1 or 2 on low-end devices. To avoid
    // deadlock or starvation on those devices, the background executor always gets at least 4
    // threads and the lightweight executor at least 2. The scheduled executor is instead capped:
    // it never uses more than 4 core threads, and uses fewer when fewer processors are reported.
    private static final int MIN_BACKGROUND_EXECUTOR_THREADS = 4;
    private static final int MIN_LIGHTWEIGHT_EXECUTOR_THREADS = 2;
    private static final int MAX_SCHEDULED_EXECUTOR_THREADS = 4;

    private static final String LIGHTWEIGHT_NAME = "lightweight";
    private static final String BACKGROUND_NAME = "background";
    private static final String SCHEDULED_NAME = "scheduled";
    private static final String BLOCKING_NAME = "blocking";

    /**
     * Builds a factory for daemon threads that configure themselves before running their task.
     *
     * @param name prefix of the thread name; threads are named {@code name-<index>}
     * @param priority value passed to {@link Process#setThreadPriority(int)} on the new thread
     * @param policy StrictMode policy to install on the new thread, or empty to leave the thread's
     *     policy untouched
     * @return the configured thread factory
     */
    private static ThreadFactory createThreadFactory(
            String name, int priority, Optional<StrictMode.ThreadPolicy> policy) {
        return new ThreadFactoryBuilder()
                .setDaemon(true)
                .setNameFormat(name + "-%d")
                .setThreadFactory(
                        runnable ->
                                new Thread(
                                        () -> {
                                            // Both calls below act on the calling thread, so they
                                            // must run inside the new thread rather than in the
                                            // factory method itself.
                                            policy.ifPresent(StrictMode::setThreadPolicy);
                                            // Process class operates on the current thread.
                                            Process.setThreadPriority(priority);
                                            runnable.run();
                                        }))
                .build();
    }

    /**
     * Creates a listening executor backed by a pool of exactly {@code poolSize} threads and an
     * unbounded work queue.
     *
     * @param poolSize used as both the core and the maximum pool size
     * @param name thread name prefix, see {@link #createThreadFactory}
     * @param priority thread priority, see {@link #createThreadFactory}
     * @param policy StrictMode policy installed on every worker thread
     * @return the decorated executor
     */
    private static ListeningExecutorService createFixedSizeExecutor(
            int poolSize, String name, int priority, ThreadPolicy policy) {
        return MoreExecutors.listeningDecorator(
                new ThreadPoolExecutor(
                        /* corePoolSize= */ poolSize,
                        /* maximumPoolSize= */ poolSize,
                        // Core and maximum sizes are equal, so the keep-alive only takes effect
                        // if core-thread timeout is ever enabled on the pool.
                        /* keepAliveTime= */ 60L,
                        TimeUnit.SECONDS,
                        new LinkedBlockingQueue<>(),
                        createThreadFactory(name, priority, Optional.of(policy))));
    }

    private static final ListeningExecutorService sLightWeightExecutor =
            // Always use at least two threads, so that clients can't depend on light-weight
            // executor tasks executing sequentially
            createFixedSizeExecutor(
                    Math.max(
                            MIN_LIGHTWEIGHT_EXECUTOR_THREADS,
                            Runtime.getRuntime().availableProcessors() - 2),
                    LIGHTWEIGHT_NAME,
                    Process.THREAD_PRIORITY_DEFAULT,
                    getAsyncThreadPolicy());

    /**
     * Functions that don't do direct I/O and that are fast (under ten milliseconds or thereabouts)
     * should run on this Executor.
     *
     * <p>Most async code in an app should be written to run on this Executor.
     */
    @NonNull
    public static ListeningExecutorService getLightWeightExecutor() {
        return sLightWeightExecutor;
    }

    private static final ListeningExecutorService sBackgroundExecutor =
            createFixedSizeExecutor(
                    Math.max(
                            MIN_BACKGROUND_EXECUTOR_THREADS,
                            Runtime.getRuntime().availableProcessors()),
                    BACKGROUND_NAME,
                    Process.THREAD_PRIORITY_BACKGROUND,
                    getIoThreadPolicy());

    /**
     * Functions that directly execute disk I/O, or that are CPU bound and long-running (over ten
     * milliseconds or thereabouts) should run on this Executor.
     *
     * <p>Examples include stepping through a database Cursor, or decoding an image into a Bitmap.
     *
     * <p>Functions that block on network I/O must run on BlockingExecutor.
     */
    @NonNull
    public static ListeningExecutorService getBackgroundExecutor() {
        return sBackgroundExecutor;
    }

    private static final ScheduledThreadPoolExecutor sScheduler =
            new ScheduledThreadPoolExecutor(
                    /* corePoolSize= */ Math.min(
                            MAX_SCHEDULED_EXECUTOR_THREADS,
                            Runtime.getRuntime().availableProcessors()),
                    createThreadFactory(
                            SCHEDULED_NAME,
                            Process.THREAD_PRIORITY_DEFAULT,
                            Optional.of(getIoThreadPolicy())));

    /**
     * Functions that require to be run with a delay, or have timed executions should run on this
     * Executor.
     *
     * <p>Example includes having timeouts on Futures.
     */
    @NonNull
    public static ScheduledThreadPoolExecutor getScheduler() {
        return sScheduler;
    }

    private static final ListeningExecutorService sBlockingExecutor =
            MoreExecutors.listeningDecorator(
                    Executors.newCachedThreadPool(
                            createThreadFactory(
                                    BLOCKING_NAME,
                                    Process.THREAD_PRIORITY_BACKGROUND
                                            + Process.THREAD_PRIORITY_LESS_FAVORABLE,
                                    /* policy= */ Optional.empty())));

    /**
     * Functions that directly execute network I/O, or that block their thread awaiting the progress
     * of at least one other thread, must run on BlockingExecutor.
     *
     * <p>BlockingExecutor will launch as many threads as there are tasks available to run
     * concurrently, stopping and freeing them when the concurrent task count drops again. This
     * unbounded number of threads negatively impacts performance:
     *
     * <p>Extra threads add execution overhead and increase execution latency. Each thread consumes
     * significant memory for thread-local state and stack, and may increase the total amount of
     * space used by objects on the heap. Each additional BlockingExecutor thread reduces the time
     * available to the fixed-size LightweightExecutor and BackgroundExecutor. While
     * BlockingExecutor's threads have a lower priority to decrease this impact, the extra threads
     * can still compete for resources. Always prefer to refactor a class or API to avoid blocking
     * before falling back to using the blocking Executor.
     */
    @NonNull
    public static ListeningExecutorService getBlockingExecutor() {
        return sBlockingExecutor;
    }

    /**
     * Returns the StrictMode policy installed on lightweight executor threads: every detectable
     * violation is reported to the log.
     */
    @VisibleForTesting
    static ThreadPolicy getAsyncThreadPolicy() {
        return new ThreadPolicy.Builder().detectAll().penaltyLog().build();
    }

    /**
     * Returns the StrictMode policy installed on background and scheduled executor threads:
     * network access, resource mismatches and unbuffered I/O are reported to the log.
     */
    @VisibleForTesting
    static ThreadPolicy getIoThreadPolicy() {
        return new ThreadPolicy.Builder()
                .detectNetwork()
                .detectResourceMismatches()
                .detectUnbufferedIo()
                .penaltyLog()
                .build();
    }

    /** Not instantiable: this class only exposes static accessors. */
    private AdServicesExecutors() {}
}