1 /* 2 * Copyright (C) 2023 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package com.android.adservices.service.signals; 18 19 import static com.android.adservices.service.stats.AdsRelevanceStatusUtils.JS_RUN_STATUS_OTHER_FAILURE; 20 import static com.android.adservices.service.stats.AdsRelevanceStatusUtils.JS_RUN_STATUS_SUCCESS; 21 22 import android.adservices.common.AdTechIdentifier; 23 import android.content.Context; 24 25 import com.android.adservices.LoggerFactory; 26 import com.android.adservices.concurrency.AdServicesExecutors; 27 import com.android.adservices.data.enrollment.EnrollmentDao; 28 import com.android.adservices.data.signals.DBEncodedPayload; 29 import com.android.adservices.data.signals.DBEncoderLogicMetadata; 30 import com.android.adservices.data.signals.DBSignalsUpdateMetadata; 31 import com.android.adservices.data.signals.EncodedPayloadDao; 32 import com.android.adservices.data.signals.EncoderLogicHandler; 33 import com.android.adservices.data.signals.EncoderLogicMetadataDao; 34 import com.android.adservices.data.signals.ProtectedSignalsDao; 35 import com.android.adservices.data.signals.ProtectedSignalsDatabase; 36 import com.android.adservices.service.DebugFlags; 37 import com.android.adservices.service.Flags; 38 import com.android.adservices.service.FlagsFactory; 39 import com.android.adservices.service.common.RetryStrategy; 40 import com.android.adservices.service.common.RetryStrategyFactory; 41 import com.android.adservices.service.common.SingletonRunner; 42 import com.android.adservices.service.devapi.DevContext; 43 import com.android.adservices.service.stats.AdServicesLogger; 44 import com.android.adservices.service.stats.AdServicesLoggerImpl; 45 import com.android.adservices.service.stats.pas.EncodingExecutionLogHelper; 46 import com.android.adservices.service.stats.pas.EncodingExecutionLogHelperImpl; 47 import com.android.adservices.service.stats.pas.EncodingExecutionLogHelperNoOpImpl; 48 import com.android.adservices.service.stats.pas.EncodingJobRunStats; 49 import com.android.adservices.service.stats.pas.EncodingJobRunStatsLogger; 50 import com.android.adservices.service.stats.pas.EncodingJobRunStatsLoggerImpl; 51 import com.android.adservices.service.stats.pas.EncodingJobRunStatsLoggerNoLoggingImpl; 52 import com.android.adservices.shared.common.ApplicationContextSingleton; 53 import com.android.adservices.shared.util.Clock; 54 55 import com.google.common.annotations.VisibleForTesting; 56 import com.google.common.util.concurrent.FluentFuture; 57 import com.google.common.util.concurrent.Futures; 58 import com.google.common.util.concurrent.ListenableFuture; 59 import com.google.common.util.concurrent.ListeningExecutorService; 60 61 import java.time.Instant; 62 import java.time.temporal.ChronoUnit; 63 import java.util.List; 64 import java.util.Map; 65 import java.util.concurrent.TimeUnit; 66 import java.util.function.Supplier; 67 import java.util.stream.Collectors; 68 69 /** 70 * Handles the periodic encoding responsibilities, such as fetching the raw signals and triggering 71 * the JS engine for encoding. 72 */ 73 public final class PeriodicEncodingJobWorker { 74 75 private static final LoggerFactory.Logger sLogger = LoggerFactory.getFledgeLogger(); 76 77 public static final String JOB_DESCRIPTION = "Protected-Signals Periodic Encoding"; 78 79 public static final String PAYLOAD_PERSISTENCE_ERROR_MSG = "Failed to persist encoded payload"; 80 81 private final int mPerBuyerEncodingTimeoutMs; 82 83 private final int mEncodedPayLoadMaxSizeBytes; 84 private final int mEncoderLogicMaximumFailure; 85 86 private final EncoderLogicHandler mEncoderLogicHandler; 87 private final EncoderLogicMetadataDao mEncoderLogicMetadataDao; 88 private final EncodedPayloadDao mEncodedPayloadDao; 89 private final SignalsProvider mSignalsProvider; 90 private final ProtectedSignalsDao mProtectedSignalsDao; 91 private final SignalsScriptEngine mScriptEngine; 92 private final ListeningExecutorService mBackgroundExecutor; 93 private final ListeningExecutorService mLightWeightExecutor; 94 private final EnrollmentDao mEnrollmentDao; 95 private final Clock mClock; 96 private final AdServicesLogger mAdServicesLogger; 97 private final SingletonRunner<Void> mSingletonRunner = 98 new SingletonRunner<>(JOB_DESCRIPTION, this::doRun); 99 100 private final Flags mFlags; 101 102 @VisibleForTesting PeriodicEncodingJobWorker( EncoderLogicHandler encoderLogicHandler, EncoderLogicMetadataDao encoderLogicMetadataDao, EncodedPayloadDao encodedPayloadDao, SignalsProviderImpl signalStorageManager, ProtectedSignalsDao protectedSignalsDao, SignalsScriptEngine scriptEngine, ListeningExecutorService backgroundExecutor, ListeningExecutorService lightWeightExecutor, Flags flags, EnrollmentDao enrollmentDao, Clock clock, AdServicesLogger adServicesLogger)103 public PeriodicEncodingJobWorker( 104 EncoderLogicHandler encoderLogicHandler, 105 EncoderLogicMetadataDao encoderLogicMetadataDao, 106 EncodedPayloadDao encodedPayloadDao, 107 SignalsProviderImpl signalStorageManager, 108 ProtectedSignalsDao protectedSignalsDao, 109 SignalsScriptEngine scriptEngine, 110 ListeningExecutorService backgroundExecutor, 111 ListeningExecutorService lightWeightExecutor, 112 Flags flags, 113 EnrollmentDao enrollmentDao, 114 Clock clock, 115 AdServicesLogger adServicesLogger) { 116 mEncoderLogicHandler = encoderLogicHandler; 117 mEncoderLogicMetadataDao = encoderLogicMetadataDao; 118 mEncodedPayloadDao = encodedPayloadDao; 119 mSignalsProvider = signalStorageManager; 120 mProtectedSignalsDao = protectedSignalsDao; 121 mScriptEngine = scriptEngine; 122 mBackgroundExecutor = backgroundExecutor; 123 mLightWeightExecutor = lightWeightExecutor; 124 mFlags = flags; 125 mEncodedPayLoadMaxSizeBytes = mFlags.getProtectedSignalsEncodedPayloadMaxSizeBytes(); 126 mEncoderLogicMaximumFailure = 127 mFlags.getProtectedSignalsMaxJsFailureExecutionOnCertainVersionBeforeStop(); 128 mEnrollmentDao = enrollmentDao; 129 mClock = clock; 130 mAdServicesLogger = adServicesLogger; 131 mPerBuyerEncodingTimeoutMs = mFlags.getPasScriptExecutionTimeoutMs(); 132 } 133 134 /** 135 * @return an instance of {@link PeriodicEncodingJobWorker} 136 */ getInstance()137 public static PeriodicEncodingJobWorker getInstance() { 138 return FieldHolder.sInstance; 139 } 140 141 // Lazy initialization holder class idiom for static fields as described in Effective Java Item 142 // 83 143 private static final class FieldHolder { 144 private static final PeriodicEncodingJobWorker sInstance = computeFieldValue(); 145 computeFieldValue()146 private static PeriodicEncodingJobWorker computeFieldValue() { 147 Context context = ApplicationContextSingleton.get(); 148 ProtectedSignalsDatabase signalsDatabase = ProtectedSignalsDatabase.getInstance(); 149 Flags flags = FlagsFactory.getFlags(); 150 RetryStrategy retryStrategy = 151 RetryStrategyFactory.createInstance( 152 flags.getAdServicesRetryStrategyEnabled(), 153 AdServicesExecutors.getLightWeightExecutor()) 154 .createRetryStrategy( 155 flags.getAdServicesJsScriptEngineMaxRetryAttempts()); 156 return new PeriodicEncodingJobWorker( 157 new EncoderLogicHandler(context), 158 signalsDatabase.getEncoderLogicMetadataDao(), 159 signalsDatabase.getEncodedPayloadDao(), 160 new SignalsProviderImpl(signalsDatabase.protectedSignalsDao()), 161 signalsDatabase.protectedSignalsDao(), 162 new SignalsScriptEngine( 163 flags::getEnforceIsolateMaxHeapSize, 164 flags::getIsolateMaxHeapSizeBytes, 165 retryStrategy, 166 () -> 167 DebugFlags.getInstance() 168 .getAdServicesJsIsolateConsoleMessagesInLogsEnabled()), 169 AdServicesExecutors.getBackgroundExecutor(), 170 AdServicesExecutors.getLightWeightExecutor(), 171 flags, 172 EnrollmentDao.getInstance(), 173 Clock.getInstance(), 174 AdServicesLoggerImpl.getInstance()); 175 } 176 } 177 178 /** Initiates the encoding of raw signals */ encodeProtectedSignals()179 public FluentFuture<Void> encodeProtectedSignals() { 180 sLogger.v("Starting %s", JOB_DESCRIPTION); 181 return mSingletonRunner.runSingleInstance(); 182 } 183 184 /** Requests that any ongoing work be stopped gracefully and waits for work to be stopped. */ stopWork()185 public void stopWork() { 186 mSingletonRunner.stopWork(); 187 } 188 189 /** 190 * Runs encoding for the buyers that have registered their encoding logic. Also updates the 191 * encoders for buyers that have the previous encoders downloaded outside the refresh window 192 */ doRun(Supplier<Boolean> shouldStop)193 private FluentFuture<Void> doRun(Supplier<Boolean> shouldStop) { 194 boolean pasExtendedMetricsEnabled = mFlags.getPasExtendedMetricsEnabled(); 195 EncodingJobRunStatsLogger encodingJobRunStatsLogger = 196 pasExtendedMetricsEnabled 197 ? new EncodingJobRunStatsLoggerImpl( 198 mAdServicesLogger, EncodingJobRunStats.builder()) 199 : new EncodingJobRunStatsLoggerNoLoggingImpl(); 200 201 FluentFuture<List<DBEncoderLogicMetadata>> buyersWithRegisteredEncoders = 202 FluentFuture.from( 203 mBackgroundExecutor.submit(mEncoderLogicHandler::getAllRegisteredEncoders)); 204 205 FluentFuture<Void> encodeSignalsFuture = 206 buyersWithRegisteredEncoders.transformAsync( 207 logicMetadata -> 208 doEncodingForRegisteredBuyers( 209 logicMetadata, 210 pasExtendedMetricsEnabled, 211 encodingJobRunStatsLogger), 212 mBackgroundExecutor); 213 214 // TODO(b/294900119) We should do the update of encoding logic in a separate job 215 // Once the encodings are done, we update the encoder logic asynchronously 216 final Instant timeForRefresh = 217 Instant.now() 218 .minus( 219 mFlags.getProtectedSignalsEncoderRefreshWindowSeconds(), 220 ChronoUnit.SECONDS); 221 return encodeSignalsFuture.transformAsync( 222 unused -> { 223 FluentFuture<List<AdTechIdentifier>> buyersWithEncodersReadyForRefresh = 224 FluentFuture.from( 225 mBackgroundExecutor.submit( 226 () -> 227 mEncoderLogicMetadataDao 228 .getBuyersWithEncodersBeforeTime( 229 timeForRefresh))); 230 encodingJobRunStatsLogger.logEncodingJobRunStats(); 231 232 return buyersWithEncodersReadyForRefresh.transformAsync( 233 b -> doUpdateEncodersForBuyers(b), mBackgroundExecutor); 234 }, 235 mLightWeightExecutor); 236 } 237 doEncodingForRegisteredBuyers( List<DBEncoderLogicMetadata> encoderLogicMetadataList, boolean extendedLoggingEnabled, EncodingJobRunStatsLogger encodingJobRunStatsLogger)238 private FluentFuture<Void> doEncodingForRegisteredBuyers( 239 List<DBEncoderLogicMetadata> encoderLogicMetadataList, 240 boolean extendedLoggingEnabled, 241 EncodingJobRunStatsLogger encodingJobRunStatsLogger) { 242 243 List<ListenableFuture<Void>> buyerEncodings = 244 encoderLogicMetadataList.stream() 245 .map( 246 metadata -> 247 pickLoggerAndRunEncodingPerBuyer( 248 metadata, 249 extendedLoggingEnabled, 250 encodingJobRunStatsLogger)) 251 .collect(Collectors.toList()); 252 encodingJobRunStatsLogger.setSizeOfFilteredBuyerEncodingList(buyerEncodings.size()); 253 return FluentFuture.from(Futures.successfulAsList(buyerEncodings)) 254 .transform(ignored -> null, mLightWeightExecutor); 255 } 256 pickLoggerAndRunEncodingPerBuyer( DBEncoderLogicMetadata metadata, boolean extendedLoggingEnabled, EncodingJobRunStatsLogger encodingJobRunStatsLogger)257 private FluentFuture<Void> pickLoggerAndRunEncodingPerBuyer( 258 DBEncoderLogicMetadata metadata, 259 boolean extendedLoggingEnabled, 260 EncodingJobRunStatsLogger encodingJobRunStatsLogger) { 261 EncodingExecutionLogHelper logHelper; 262 if (extendedLoggingEnabled) { 263 logHelper = 264 new EncodingExecutionLogHelperImpl(mAdServicesLogger, mClock, mEnrollmentDao); 265 } else { 266 logHelper = new EncodingExecutionLogHelperNoOpImpl(); 267 } 268 int timeoutSeconds = mPerBuyerEncodingTimeoutMs / 1000; 269 return runEncodingPerBuyer(metadata, timeoutSeconds, logHelper, encodingJobRunStatsLogger) 270 .catching( 271 Exception.class, 272 (e) -> { 273 handleFailedPerBuyerEncoding(metadata); 274 encodingJobRunStatsLogger.addOneSignalEncodingFailures(); 275 return null; 276 }, 277 mLightWeightExecutor); 278 } 279 280 // TODO(b/294900119) We should do the update of encoding logic in a separate job, & remove this 281 private FluentFuture<Void> doUpdateEncodersForBuyers(List<AdTechIdentifier> buyers) { 282 List<ListenableFuture<Boolean>> encoderUpdates = 283 buyers.stream() 284 .map( 285 buyer -> 286 mEncoderLogicHandler.downloadAndUpdate( 287 buyer, DevContext.createForDevOptionsDisabled())) 288 .collect(Collectors.toList()); 289 return FluentFuture.from(Futures.successfulAsList(encoderUpdates)) 290 .transform(ignored -> null, mLightWeightExecutor); 291 } 292 293 @VisibleForTesting 294 FluentFuture<Void> runEncodingPerBuyer( 295 DBEncoderLogicMetadata encoderLogicMetadata, 296 int timeout, 297 EncodingExecutionLogHelper logHelper, 298 EncodingJobRunStatsLogger encodingJobRunStatsLogger) { 299 AdTechIdentifier buyer = encoderLogicMetadata.getBuyer(); 300 Map<String, List<ProtectedSignal>> signals = mSignalsProvider.getSignals(buyer); 301 if (signals.isEmpty()) { 302 mEncoderLogicHandler.deleteEncoderForBuyer(buyer); 303 return FluentFuture.from(Futures.immediateFuture(null)); 304 } 305 306 DBSignalsUpdateMetadata signalsUpdateMetadata = 307 mProtectedSignalsDao.getSignalsUpdateMetadata(buyer); 308 DBEncodedPayload existingPayload = mEncodedPayloadDao.getEncodedPayload(buyer); 309 if (signalsUpdateMetadata != null && existingPayload != null) { 310 boolean isNoNewSignalUpdateAfterLastEncoding = 311 signalsUpdateMetadata 312 .getLastSignalsUpdatedTime() 313 .isBefore(existingPayload.getCreationTime()); 314 boolean isEncoderLogicNotUpdatedAfterLastEncoding = 315 encoderLogicMetadata 316 .getCreationTime() 317 .isBefore(existingPayload.getCreationTime()); 318 if (isNoNewSignalUpdateAfterLastEncoding && isEncoderLogicNotUpdatedAfterLastEncoding) { 319 encodingJobRunStatsLogger.addOneSignalEncodingSkips(); 320 return FluentFuture.from(Futures.immediateFuture(null)); 321 } 322 } 323 324 int failedCount = encoderLogicMetadata.getFailedEncodingCount(); 325 if (failedCount >= mEncoderLogicMaximumFailure) { 326 return FluentFuture.from(Futures.immediateFuture(null)); 327 } 328 String encodingLogic = mEncoderLogicHandler.getEncoder(buyer); 329 int version = encoderLogicMetadata.getVersion(); 330 331 logHelper.setAdtech(buyer); 332 333 return FluentFuture.from( 334 mScriptEngine.encodeSignals( 335 encodingLogic, signals, mEncodedPayLoadMaxSizeBytes, logHelper)) 336 .transform( 337 encodedPayload -> { 338 validateAndPersistPayload( 339 encoderLogicMetadata, encodedPayload, version); 340 logHelper.setStatus(JS_RUN_STATUS_SUCCESS); 341 logHelper.finish(); 342 return (Void) null; 343 }, 344 mBackgroundExecutor) 345 .catching( 346 Exception.class, 347 e -> { 348 sLogger.e( 349 e, 350 "Exception trying to validate and persist encoded payload for" 351 + " buyer: %s", 352 buyer); 353 logHelper.setStatus(JS_RUN_STATUS_OTHER_FAILURE); 354 logHelper.finish(); 355 throw new IllegalStateException(PAYLOAD_PERSISTENCE_ERROR_MSG, e); 356 }, 357 mLightWeightExecutor) 358 .withTimeout(timeout, TimeUnit.SECONDS, AdServicesExecutors.getScheduler()); 359 } 360 361 private void handleFailedPerBuyerEncoding(DBEncoderLogicMetadata logic) { 362 mEncoderLogicHandler.updateEncoderFailedCount( 363 logic.getBuyer(), logic.getFailedEncodingCount() + 1); 364 } 365 366 @VisibleForTesting 367 void validateAndPersistPayload( 368 DBEncoderLogicMetadata encoderLogicMetadata, byte[] encodedBytes, int version) { 369 AdTechIdentifier buyer = encoderLogicMetadata.getBuyer(); 370 if (encodedBytes.length > mEncodedPayLoadMaxSizeBytes) { 371 // Do not persist encoded payload if the encoding logic violates the size constraints 372 sLogger.e("Buyer:%s encoded payload exceeded max size limit", buyer); 373 throw new IllegalArgumentException("Payload size exceeds limits."); 374 } 375 376 DBEncodedPayload dbEncodedPayload = 377 DBEncodedPayload.builder() 378 .setBuyer(buyer) 379 .setCreationTime(Instant.now()) 380 .setVersion(version) 381 .setEncodedPayload(encodedBytes) 382 .build(); 383 sLogger.v("Persisting encoded payload for buyer: %s", buyer); 384 mEncodedPayloadDao.persistEncodedPayload(dbEncodedPayload); 385 if (encoderLogicMetadata.getFailedEncodingCount() > 0) { 386 mEncoderLogicHandler.updateEncoderFailedCount(buyer, 0); 387 } 388 } 389 } 390