1 /*
2  * Copyright (C) 2023 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package com.android.adservices.service.signals;
18 
19 import static com.android.adservices.service.stats.AdsRelevanceStatusUtils.JS_RUN_STATUS_OTHER_FAILURE;
20 import static com.android.adservices.service.stats.AdsRelevanceStatusUtils.JS_RUN_STATUS_SUCCESS;
21 
22 import android.adservices.common.AdTechIdentifier;
23 import android.content.Context;
24 
25 import com.android.adservices.LoggerFactory;
26 import com.android.adservices.concurrency.AdServicesExecutors;
27 import com.android.adservices.data.enrollment.EnrollmentDao;
28 import com.android.adservices.data.signals.DBEncodedPayload;
29 import com.android.adservices.data.signals.DBEncoderLogicMetadata;
30 import com.android.adservices.data.signals.DBSignalsUpdateMetadata;
31 import com.android.adservices.data.signals.EncodedPayloadDao;
32 import com.android.adservices.data.signals.EncoderLogicHandler;
33 import com.android.adservices.data.signals.EncoderLogicMetadataDao;
34 import com.android.adservices.data.signals.ProtectedSignalsDao;
35 import com.android.adservices.data.signals.ProtectedSignalsDatabase;
36 import com.android.adservices.service.DebugFlags;
37 import com.android.adservices.service.Flags;
38 import com.android.adservices.service.FlagsFactory;
39 import com.android.adservices.service.common.RetryStrategy;
40 import com.android.adservices.service.common.RetryStrategyFactory;
41 import com.android.adservices.service.common.SingletonRunner;
42 import com.android.adservices.service.devapi.DevContext;
43 import com.android.adservices.service.stats.AdServicesLogger;
44 import com.android.adservices.service.stats.AdServicesLoggerImpl;
45 import com.android.adservices.service.stats.pas.EncodingExecutionLogHelper;
46 import com.android.adservices.service.stats.pas.EncodingExecutionLogHelperImpl;
47 import com.android.adservices.service.stats.pas.EncodingExecutionLogHelperNoOpImpl;
48 import com.android.adservices.service.stats.pas.EncodingJobRunStats;
49 import com.android.adservices.service.stats.pas.EncodingJobRunStatsLogger;
50 import com.android.adservices.service.stats.pas.EncodingJobRunStatsLoggerImpl;
51 import com.android.adservices.service.stats.pas.EncodingJobRunStatsLoggerNoLoggingImpl;
52 import com.android.adservices.shared.common.ApplicationContextSingleton;
53 import com.android.adservices.shared.util.Clock;
54 
55 import com.google.common.annotations.VisibleForTesting;
56 import com.google.common.util.concurrent.FluentFuture;
57 import com.google.common.util.concurrent.Futures;
58 import com.google.common.util.concurrent.ListenableFuture;
59 import com.google.common.util.concurrent.ListeningExecutorService;
60 
61 import java.time.Instant;
62 import java.time.temporal.ChronoUnit;
63 import java.util.List;
64 import java.util.Map;
65 import java.util.concurrent.TimeUnit;
66 import java.util.function.Supplier;
67 import java.util.stream.Collectors;
68 
69 /**
70  * Handles the periodic encoding responsibilities, such as fetching the raw signals and triggering
71  * the JS engine for encoding.
72  */
73 public final class PeriodicEncodingJobWorker {
74 
75     private static final LoggerFactory.Logger sLogger = LoggerFactory.getFledgeLogger();
76 
77     public static final String JOB_DESCRIPTION = "Protected-Signals Periodic Encoding";
78 
79     public static final String PAYLOAD_PERSISTENCE_ERROR_MSG = "Failed to persist encoded payload";
80 
81     private final int mPerBuyerEncodingTimeoutMs;
82 
83     private final int mEncodedPayLoadMaxSizeBytes;
84     private final int mEncoderLogicMaximumFailure;
85 
86     private final EncoderLogicHandler mEncoderLogicHandler;
87     private final EncoderLogicMetadataDao mEncoderLogicMetadataDao;
88     private final EncodedPayloadDao mEncodedPayloadDao;
89     private final SignalsProvider mSignalsProvider;
90     private final ProtectedSignalsDao mProtectedSignalsDao;
91     private final SignalsScriptEngine mScriptEngine;
92     private final ListeningExecutorService mBackgroundExecutor;
93     private final ListeningExecutorService mLightWeightExecutor;
94     private final EnrollmentDao mEnrollmentDao;
95     private final Clock mClock;
96     private final AdServicesLogger mAdServicesLogger;
97     private final SingletonRunner<Void> mSingletonRunner =
98             new SingletonRunner<>(JOB_DESCRIPTION, this::doRun);
99 
100     private final Flags mFlags;
101 
102     @VisibleForTesting
PeriodicEncodingJobWorker( EncoderLogicHandler encoderLogicHandler, EncoderLogicMetadataDao encoderLogicMetadataDao, EncodedPayloadDao encodedPayloadDao, SignalsProviderImpl signalStorageManager, ProtectedSignalsDao protectedSignalsDao, SignalsScriptEngine scriptEngine, ListeningExecutorService backgroundExecutor, ListeningExecutorService lightWeightExecutor, Flags flags, EnrollmentDao enrollmentDao, Clock clock, AdServicesLogger adServicesLogger)103     public PeriodicEncodingJobWorker(
104             EncoderLogicHandler encoderLogicHandler,
105             EncoderLogicMetadataDao encoderLogicMetadataDao,
106             EncodedPayloadDao encodedPayloadDao,
107             SignalsProviderImpl signalStorageManager,
108             ProtectedSignalsDao protectedSignalsDao,
109             SignalsScriptEngine scriptEngine,
110             ListeningExecutorService backgroundExecutor,
111             ListeningExecutorService lightWeightExecutor,
112             Flags flags,
113             EnrollmentDao enrollmentDao,
114             Clock clock,
115             AdServicesLogger adServicesLogger) {
116         mEncoderLogicHandler = encoderLogicHandler;
117         mEncoderLogicMetadataDao = encoderLogicMetadataDao;
118         mEncodedPayloadDao = encodedPayloadDao;
119         mSignalsProvider = signalStorageManager;
120         mProtectedSignalsDao = protectedSignalsDao;
121         mScriptEngine = scriptEngine;
122         mBackgroundExecutor = backgroundExecutor;
123         mLightWeightExecutor = lightWeightExecutor;
124         mFlags = flags;
125         mEncodedPayLoadMaxSizeBytes = mFlags.getProtectedSignalsEncodedPayloadMaxSizeBytes();
126         mEncoderLogicMaximumFailure =
127                 mFlags.getProtectedSignalsMaxJsFailureExecutionOnCertainVersionBeforeStop();
128         mEnrollmentDao = enrollmentDao;
129         mClock = clock;
130         mAdServicesLogger = adServicesLogger;
131         mPerBuyerEncodingTimeoutMs = mFlags.getPasScriptExecutionTimeoutMs();
132     }
133 
134     /**
135      * @return an instance of {@link PeriodicEncodingJobWorker}
136      */
getInstance()137     public static PeriodicEncodingJobWorker getInstance() {
138         return FieldHolder.sInstance;
139     }
140 
141     // Lazy initialization holder class idiom for static fields as described in Effective Java Item
142     // 83
143     private static final class FieldHolder {
144         private static final PeriodicEncodingJobWorker sInstance = computeFieldValue();
145 
computeFieldValue()146         private static PeriodicEncodingJobWorker computeFieldValue() {
147             Context context = ApplicationContextSingleton.get();
148             ProtectedSignalsDatabase signalsDatabase = ProtectedSignalsDatabase.getInstance();
149             Flags flags = FlagsFactory.getFlags();
150             RetryStrategy retryStrategy =
151                     RetryStrategyFactory.createInstance(
152                                     flags.getAdServicesRetryStrategyEnabled(),
153                                     AdServicesExecutors.getLightWeightExecutor())
154                             .createRetryStrategy(
155                                     flags.getAdServicesJsScriptEngineMaxRetryAttempts());
156             return new PeriodicEncodingJobWorker(
157                     new EncoderLogicHandler(context),
158                     signalsDatabase.getEncoderLogicMetadataDao(),
159                     signalsDatabase.getEncodedPayloadDao(),
160                     new SignalsProviderImpl(signalsDatabase.protectedSignalsDao()),
161                     signalsDatabase.protectedSignalsDao(),
162                     new SignalsScriptEngine(
163                             flags::getEnforceIsolateMaxHeapSize,
164                             flags::getIsolateMaxHeapSizeBytes,
165                             retryStrategy,
166                             () ->
167                                     DebugFlags.getInstance()
168                                             .getAdServicesJsIsolateConsoleMessagesInLogsEnabled()),
169                     AdServicesExecutors.getBackgroundExecutor(),
170                     AdServicesExecutors.getLightWeightExecutor(),
171                     flags,
172                     EnrollmentDao.getInstance(),
173                     Clock.getInstance(),
174                     AdServicesLoggerImpl.getInstance());
175         }
176     }
177 
178     /** Initiates the encoding of raw signals */
encodeProtectedSignals()179     public FluentFuture<Void> encodeProtectedSignals() {
180         sLogger.v("Starting %s", JOB_DESCRIPTION);
181         return mSingletonRunner.runSingleInstance();
182     }
183 
184     /** Requests that any ongoing work be stopped gracefully and waits for work to be stopped. */
stopWork()185     public void stopWork() {
186         mSingletonRunner.stopWork();
187     }
188 
189     /**
190      * Runs encoding for the buyers that have registered their encoding logic. Also updates the
191      * encoders for buyers that have the previous encoders downloaded outside the refresh window
192      */
doRun(Supplier<Boolean> shouldStop)193     private FluentFuture<Void> doRun(Supplier<Boolean> shouldStop) {
194         boolean pasExtendedMetricsEnabled = mFlags.getPasExtendedMetricsEnabled();
195         EncodingJobRunStatsLogger encodingJobRunStatsLogger =
196                 pasExtendedMetricsEnabled
197                         ? new EncodingJobRunStatsLoggerImpl(
198                         mAdServicesLogger, EncodingJobRunStats.builder())
199                         : new EncodingJobRunStatsLoggerNoLoggingImpl();
200 
201         FluentFuture<List<DBEncoderLogicMetadata>> buyersWithRegisteredEncoders =
202                 FluentFuture.from(
203                         mBackgroundExecutor.submit(mEncoderLogicHandler::getAllRegisteredEncoders));
204 
205         FluentFuture<Void> encodeSignalsFuture =
206                 buyersWithRegisteredEncoders.transformAsync(
207                         logicMetadata ->
208                                 doEncodingForRegisteredBuyers(
209                                         logicMetadata,
210                                         pasExtendedMetricsEnabled,
211                                         encodingJobRunStatsLogger),
212                         mBackgroundExecutor);
213 
214         // TODO(b/294900119) We should do the update of encoding logic in a separate job
215         // Once the encodings are done, we update the encoder logic asynchronously
216         final Instant timeForRefresh =
217                 Instant.now()
218                         .minus(
219                                 mFlags.getProtectedSignalsEncoderRefreshWindowSeconds(),
220                                 ChronoUnit.SECONDS);
221         return encodeSignalsFuture.transformAsync(
222                 unused -> {
223                     FluentFuture<List<AdTechIdentifier>> buyersWithEncodersReadyForRefresh =
224                             FluentFuture.from(
225                                     mBackgroundExecutor.submit(
226                                             () ->
227                                                     mEncoderLogicMetadataDao
228                                                             .getBuyersWithEncodersBeforeTime(
229                                                                     timeForRefresh)));
230                     encodingJobRunStatsLogger.logEncodingJobRunStats();
231 
232                     return buyersWithEncodersReadyForRefresh.transformAsync(
233                             b -> doUpdateEncodersForBuyers(b), mBackgroundExecutor);
234                 },
235                 mLightWeightExecutor);
236     }
237 
doEncodingForRegisteredBuyers( List<DBEncoderLogicMetadata> encoderLogicMetadataList, boolean extendedLoggingEnabled, EncodingJobRunStatsLogger encodingJobRunStatsLogger)238     private FluentFuture<Void> doEncodingForRegisteredBuyers(
239             List<DBEncoderLogicMetadata> encoderLogicMetadataList,
240             boolean extendedLoggingEnabled,
241             EncodingJobRunStatsLogger encodingJobRunStatsLogger) {
242 
243         List<ListenableFuture<Void>> buyerEncodings =
244                 encoderLogicMetadataList.stream()
245                         .map(
246                                 metadata ->
247                                         pickLoggerAndRunEncodingPerBuyer(
248                                                 metadata,
249                                                 extendedLoggingEnabled,
250                                                 encodingJobRunStatsLogger))
251                         .collect(Collectors.toList());
252         encodingJobRunStatsLogger.setSizeOfFilteredBuyerEncodingList(buyerEncodings.size());
253         return FluentFuture.from(Futures.successfulAsList(buyerEncodings))
254                 .transform(ignored -> null, mLightWeightExecutor);
255     }
256 
pickLoggerAndRunEncodingPerBuyer( DBEncoderLogicMetadata metadata, boolean extendedLoggingEnabled, EncodingJobRunStatsLogger encodingJobRunStatsLogger)257     private FluentFuture<Void> pickLoggerAndRunEncodingPerBuyer(
258             DBEncoderLogicMetadata metadata,
259             boolean extendedLoggingEnabled,
260             EncodingJobRunStatsLogger encodingJobRunStatsLogger) {
261         EncodingExecutionLogHelper logHelper;
262         if (extendedLoggingEnabled) {
263             logHelper =
264                     new EncodingExecutionLogHelperImpl(mAdServicesLogger, mClock, mEnrollmentDao);
265         } else {
266             logHelper = new EncodingExecutionLogHelperNoOpImpl();
267         }
268         int timeoutSeconds = mPerBuyerEncodingTimeoutMs / 1000;
269         return runEncodingPerBuyer(metadata, timeoutSeconds, logHelper, encodingJobRunStatsLogger)
270                 .catching(
271                         Exception.class,
272                         (e) -> {
273                             handleFailedPerBuyerEncoding(metadata);
274                             encodingJobRunStatsLogger.addOneSignalEncodingFailures();
275                             return null;
276                         },
277                         mLightWeightExecutor);
278     }
279 
280     // TODO(b/294900119) We should do the update of encoding logic in a separate job, & remove this
281     private FluentFuture<Void> doUpdateEncodersForBuyers(List<AdTechIdentifier> buyers) {
282         List<ListenableFuture<Boolean>> encoderUpdates =
283                 buyers.stream()
284                         .map(
285                                 buyer ->
286                                         mEncoderLogicHandler.downloadAndUpdate(
287                                                 buyer, DevContext.createForDevOptionsDisabled()))
288                         .collect(Collectors.toList());
289         return FluentFuture.from(Futures.successfulAsList(encoderUpdates))
290                 .transform(ignored -> null, mLightWeightExecutor);
291     }
292 
293     @VisibleForTesting
294     FluentFuture<Void> runEncodingPerBuyer(
295             DBEncoderLogicMetadata encoderLogicMetadata,
296             int timeout,
297             EncodingExecutionLogHelper logHelper,
298             EncodingJobRunStatsLogger encodingJobRunStatsLogger) {
299         AdTechIdentifier buyer = encoderLogicMetadata.getBuyer();
300         Map<String, List<ProtectedSignal>> signals = mSignalsProvider.getSignals(buyer);
301         if (signals.isEmpty()) {
302             mEncoderLogicHandler.deleteEncoderForBuyer(buyer);
303             return FluentFuture.from(Futures.immediateFuture(null));
304         }
305 
306         DBSignalsUpdateMetadata signalsUpdateMetadata =
307                 mProtectedSignalsDao.getSignalsUpdateMetadata(buyer);
308         DBEncodedPayload existingPayload = mEncodedPayloadDao.getEncodedPayload(buyer);
309         if (signalsUpdateMetadata != null && existingPayload != null) {
310             boolean isNoNewSignalUpdateAfterLastEncoding =
311                     signalsUpdateMetadata
312                             .getLastSignalsUpdatedTime()
313                             .isBefore(existingPayload.getCreationTime());
314             boolean isEncoderLogicNotUpdatedAfterLastEncoding =
315                     encoderLogicMetadata
316                             .getCreationTime()
317                             .isBefore(existingPayload.getCreationTime());
318             if (isNoNewSignalUpdateAfterLastEncoding && isEncoderLogicNotUpdatedAfterLastEncoding) {
319                 encodingJobRunStatsLogger.addOneSignalEncodingSkips();
320                 return FluentFuture.from(Futures.immediateFuture(null));
321             }
322         }
323 
324         int failedCount = encoderLogicMetadata.getFailedEncodingCount();
325         if (failedCount >= mEncoderLogicMaximumFailure) {
326             return FluentFuture.from(Futures.immediateFuture(null));
327         }
328         String encodingLogic = mEncoderLogicHandler.getEncoder(buyer);
329         int version = encoderLogicMetadata.getVersion();
330 
331         logHelper.setAdtech(buyer);
332 
333         return FluentFuture.from(
334                         mScriptEngine.encodeSignals(
335                                 encodingLogic, signals, mEncodedPayLoadMaxSizeBytes, logHelper))
336                 .transform(
337                         encodedPayload -> {
338                             validateAndPersistPayload(
339                                     encoderLogicMetadata, encodedPayload, version);
340                             logHelper.setStatus(JS_RUN_STATUS_SUCCESS);
341                             logHelper.finish();
342                             return (Void) null;
343                         },
344                         mBackgroundExecutor)
345                 .catching(
346                         Exception.class,
347                         e -> {
348                             sLogger.e(
349                                     e,
350                                     "Exception trying to validate and persist encoded payload for"
351                                             + " buyer: %s",
352                                     buyer);
353                             logHelper.setStatus(JS_RUN_STATUS_OTHER_FAILURE);
354                             logHelper.finish();
355                             throw new IllegalStateException(PAYLOAD_PERSISTENCE_ERROR_MSG, e);
356                         },
357                         mLightWeightExecutor)
358                 .withTimeout(timeout, TimeUnit.SECONDS, AdServicesExecutors.getScheduler());
359     }
360 
361     private void handleFailedPerBuyerEncoding(DBEncoderLogicMetadata logic) {
362         mEncoderLogicHandler.updateEncoderFailedCount(
363                 logic.getBuyer(), logic.getFailedEncodingCount() + 1);
364     }
365 
366     @VisibleForTesting
367     void validateAndPersistPayload(
368             DBEncoderLogicMetadata encoderLogicMetadata, byte[] encodedBytes, int version) {
369         AdTechIdentifier buyer = encoderLogicMetadata.getBuyer();
370         if (encodedBytes.length > mEncodedPayLoadMaxSizeBytes) {
371             // Do not persist encoded payload if the encoding logic violates the size constraints
372             sLogger.e("Buyer:%s encoded payload exceeded max size limit", buyer);
373             throw new IllegalArgumentException("Payload size exceeds limits.");
374         }
375 
376         DBEncodedPayload dbEncodedPayload =
377                 DBEncodedPayload.builder()
378                         .setBuyer(buyer)
379                         .setCreationTime(Instant.now())
380                         .setVersion(version)
381                         .setEncodedPayload(encodedBytes)
382                         .build();
383         sLogger.v("Persisting encoded payload for buyer: %s", buyer);
384         mEncodedPayloadDao.persistEncodedPayload(dbEncodedPayload);
385         if (encoderLogicMetadata.getFailedEncodingCount() > 0) {
386             mEncoderLogicHandler.updateEncoderFailedCount(buyer, 0);
387         }
388     }
389 }
390