1 /*
2  * Copyright (C) 2023 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package com.android.adservices.service.measurement.registration;
18 
19 import static com.android.adservices.service.measurement.registration.AsyncRegistrationQueueRunner.InsertSourcePermission.ALLOWED;
20 import static com.android.adservices.service.measurement.registration.AsyncRegistrationQueueRunner.InsertSourcePermission.ALLOWED_FIFO_SUCCESS;
21 import static com.android.adservices.service.measurement.registration.AsyncRegistrationQueueRunner.InsertSourcePermission.NOT_ALLOWED;
22 
23 import android.annotation.NonNull;
24 import android.annotation.Nullable;
25 import android.content.ContentProviderClient;
26 import android.content.ContentResolver;
27 import android.content.Context;
28 import android.net.Uri;
29 import android.os.RemoteException;
30 
31 import com.android.adservices.LoggerFactory;
32 import com.android.adservices.data.measurement.DatastoreException;
33 import com.android.adservices.data.measurement.DatastoreManager;
34 import com.android.adservices.data.measurement.DatastoreManagerFactory;
35 import com.android.adservices.data.measurement.IMeasurementDao;
36 import com.android.adservices.service.Flags;
37 import com.android.adservices.service.FlagsFactory;
38 import com.android.adservices.service.common.WebAddresses;
39 import com.android.adservices.service.measurement.Attribution;
40 import com.android.adservices.service.measurement.EventReport;
41 import com.android.adservices.service.measurement.EventSurfaceType;
42 import com.android.adservices.service.measurement.KeyValueData;
43 import com.android.adservices.service.measurement.KeyValueData.DataType;
44 import com.android.adservices.service.measurement.Source;
45 import com.android.adservices.service.measurement.Trigger;
46 import com.android.adservices.service.measurement.attribution.TriggerContentProvider;
47 import com.android.adservices.service.measurement.noising.SourceNoiseHandler;
48 import com.android.adservices.service.measurement.reporting.DebugReportApi;
49 import com.android.adservices.service.measurement.util.Applications;
50 import com.android.adservices.service.measurement.util.BaseUriExtractor;
51 import com.android.adservices.service.measurement.util.UnsignedLong;
52 import com.android.adservices.service.stats.AdServicesLogger;
53 import com.android.adservices.service.stats.AdServicesLoggerImpl;
54 import com.android.internal.annotations.VisibleForTesting;
55 
56 import java.util.HashSet;
57 import java.util.List;
58 import java.util.Map;
59 import java.util.Objects;
60 import java.util.Optional;
61 import java.util.Set;
62 import java.util.UUID;
63 import java.util.stream.Collectors;
64 
65 /** Runner for servicing queued registration requests */
66 public class AsyncRegistrationQueueRunner {
67     /**
68      * Single attribution entry is created for possibly multiple fake reports generated per source.
69      * Setting a value to such attributions will help identify them that they are associated to fake
70      * reports.
71      */
72     @VisibleForTesting static final String ATTRIBUTION_FAKE_REPORT_ID = "-1";
73 
74     private static AsyncRegistrationQueueRunner sAsyncRegistrationQueueRunner;
75     private final DatastoreManager mDatastoreManager;
76     private final AsyncSourceFetcher mAsyncSourceFetcher;
77     private final AsyncTriggerFetcher mAsyncTriggerFetcher;
78     private final ContentResolver mContentResolver;
79     private final DebugReportApi mDebugReportApi;
80     private final SourceNoiseHandler mSourceNoiseHandler;
81     private final Flags mFlags;
82     private final AdServicesLogger mLogger;
83     private final Context mContext;
84 
AsyncRegistrationQueueRunner(Context context)85     private AsyncRegistrationQueueRunner(Context context) {
86         mContext = context;
87         mDatastoreManager = DatastoreManagerFactory.getDatastoreManager(context);
88         mAsyncSourceFetcher = new AsyncSourceFetcher(context);
89         mAsyncTriggerFetcher = new AsyncTriggerFetcher(context);
90         mContentResolver = context.getContentResolver();
91         mFlags = FlagsFactory.getFlags();
92         mDebugReportApi = new DebugReportApi(context, mFlags);
93         mSourceNoiseHandler = new SourceNoiseHandler(mFlags);
94         mLogger = AdServicesLoggerImpl.getInstance();
95     }
96 
97     @VisibleForTesting
AsyncRegistrationQueueRunner( Context context, ContentResolver contentResolver, AsyncSourceFetcher asyncSourceFetcher, AsyncTriggerFetcher asyncTriggerFetcher, DatastoreManager datastoreManager, DebugReportApi debugReportApi, SourceNoiseHandler sourceNoiseHandler, Flags flags)98     public AsyncRegistrationQueueRunner(
99             Context context,
100             ContentResolver contentResolver,
101             AsyncSourceFetcher asyncSourceFetcher,
102             AsyncTriggerFetcher asyncTriggerFetcher,
103             DatastoreManager datastoreManager,
104             DebugReportApi debugReportApi,
105             SourceNoiseHandler sourceNoiseHandler,
106             Flags flags) {
107         this(
108                 context,
109                 contentResolver,
110                 asyncSourceFetcher,
111                 asyncTriggerFetcher,
112                 datastoreManager,
113                 debugReportApi,
114                 sourceNoiseHandler,
115                 flags,
116                 AdServicesLoggerImpl.getInstance());
117     }
118 
119     @VisibleForTesting
AsyncRegistrationQueueRunner( Context context, ContentResolver contentResolver, AsyncSourceFetcher asyncSourceFetcher, AsyncTriggerFetcher asyncTriggerFetcher, DatastoreManager datastoreManager, DebugReportApi debugReportApi, SourceNoiseHandler sourceNoiseHandler, Flags flags, AdServicesLogger logger)120     public AsyncRegistrationQueueRunner(
121             Context context,
122             ContentResolver contentResolver,
123             AsyncSourceFetcher asyncSourceFetcher,
124             AsyncTriggerFetcher asyncTriggerFetcher,
125             DatastoreManager datastoreManager,
126             DebugReportApi debugReportApi,
127             SourceNoiseHandler sourceNoiseHandler,
128             Flags flags,
129             AdServicesLogger logger) {
130         mContext = context;
131         mAsyncSourceFetcher = asyncSourceFetcher;
132         mAsyncTriggerFetcher = asyncTriggerFetcher;
133         mDatastoreManager = datastoreManager;
134         mContentResolver = contentResolver;
135         mDebugReportApi = debugReportApi;
136         mSourceNoiseHandler = sourceNoiseHandler;
137         mFlags = flags;
138         mLogger = logger;
139     }
140 
141     enum ProcessingResult {
142         THREAD_INTERRUPTED,
143         SUCCESS_WITH_PENDING_RECORDS,
144         SUCCESS_ALL_RECORDS_PROCESSED
145     }
146 
147     enum InsertSourcePermission {
148         NOT_ALLOWED(false),
149         ALLOWED(true),
150         ALLOWED_FIFO_SUCCESS(true);
151 
152         private final boolean mIsAllowed;
153 
InsertSourcePermission(boolean isAllowed)154         InsertSourcePermission(boolean isAllowed) {
155             mIsAllowed = isAllowed;
156         }
157 
isAllowed()158         public boolean isAllowed() {
159             return mIsAllowed;
160         }
161     }
162 
163     /**
164      * Returns an instance of AsyncRegistrationQueueRunner.
165      *
166      * @param context the current {@link Context}.
167      */
getInstance(Context context)168     public static synchronized AsyncRegistrationQueueRunner getInstance(Context context) {
169         Objects.requireNonNull(context);
170         if (sAsyncRegistrationQueueRunner == null) {
171             sAsyncRegistrationQueueRunner = new AsyncRegistrationQueueRunner(context);
172         }
173         return sAsyncRegistrationQueueRunner;
174     }
175 
176     /** Processes records in the AsyncRegistration Queue table. */
runAsyncRegistrationQueueWorker()177     public ProcessingResult runAsyncRegistrationQueueWorker() {
178         int recordServiceLimit = mFlags.getMeasurementMaxRegistrationsPerJobInvocation();
179         int retryLimit = mFlags.getMeasurementMaxRetriesPerRegistrationRequest();
180 
181         Set<Uri> failedOrigins = new HashSet<>();
182         for (int i = 0; i < recordServiceLimit; i++) {
183             // If the job service's requirements specified at runtime are no longer met, the job
184             // service will interrupt this thread.  If the thread has been interrupted, it will exit
185             // early.
186             if (Thread.currentThread().isInterrupted()) {
187                 LoggerFactory.getMeasurementLogger()
188                         .d(
189                                 "AsyncRegistrationQueueRunner runAsyncRegistrationQueueWorker "
190                                         + "thread interrupted, exiting early.");
191                 return ProcessingResult.THREAD_INTERRUPTED;
192             }
193 
194             AsyncRegistration asyncRegistration = fetchNext(retryLimit, failedOrigins);
195             if (null == asyncRegistration) {
196                 LoggerFactory.getMeasurementLogger()
197                         .d("AsyncRegistrationQueueRunner: no async registration fetched.");
198                 return ProcessingResult.SUCCESS_ALL_RECORDS_PROCESSED;
199             }
200 
201             processAsyncRecord(asyncRegistration, failedOrigins);
202         }
203 
204         return hasPendingRecords(retryLimit, failedOrigins);
205     }
206 
fetchNext(int retryLimit, Set<Uri> failedOrigins)207     private AsyncRegistration fetchNext(int retryLimit, Set<Uri> failedOrigins) {
208         return mDatastoreManager
209                 .runInTransactionWithResult(
210                         (dao) -> dao.fetchNextQueuedAsyncRegistration(retryLimit, failedOrigins))
211                 .orElse(null);
212     }
213 
processAsyncRecord(AsyncRegistration asyncRegistration, Set<Uri> failedOrigins)214     private void processAsyncRecord(AsyncRegistration asyncRegistration, Set<Uri> failedOrigins) {
215         if (asyncRegistration.isSourceRequest()) {
216             LoggerFactory.getMeasurementLogger()
217                     .d("AsyncRegistrationQueueRunner:" + " processing source");
218             processSourceRegistration(asyncRegistration, failedOrigins);
219         } else {
220             LoggerFactory.getMeasurementLogger()
221                     .d("AsyncRegistrationQueueRunner:" + " processing trigger");
222             processTriggerRegistration(asyncRegistration, failedOrigins);
223         }
224     }
225 
hasPendingRecords(int retryLimit, Set<Uri> failedOrigins)226     private ProcessingResult hasPendingRecords(int retryLimit, Set<Uri> failedOrigins) {
227         AsyncRegistration asyncRegistration = fetchNext(retryLimit, failedOrigins);
228         if (null == asyncRegistration) {
229             LoggerFactory.getMeasurementLogger()
230                     .d("AsyncRegistrationQueueRunner: no more pending async records.");
231             return ProcessingResult.SUCCESS_ALL_RECORDS_PROCESSED;
232         } else {
233             return ProcessingResult.SUCCESS_WITH_PENDING_RECORDS;
234         }
235     }
236 
isNavigationOriginAlreadyRegisteredForRegistration( @onNull Source source, IMeasurementDao dao, Flags flags)237     private static boolean isNavigationOriginAlreadyRegisteredForRegistration(
238             @NonNull Source source, IMeasurementDao dao, Flags flags) throws DatastoreException {
239         if (!flags.getMeasurementEnableNavigationReportingOriginCheck()
240                 || source.getSourceType() != Source.SourceType.NAVIGATION) {
241             return false;
242         }
243         return dao.countNavigationSourcesPerReportingOrigin(
244                         source.getRegistrationOrigin(), source.getRegistrationId())
245                 > 0;
246     }
247 
processSourceRegistration( AsyncRegistration asyncRegistration, Set<Uri> failedOrigins)248     private void processSourceRegistration(
249             AsyncRegistration asyncRegistration, Set<Uri> failedOrigins) {
250         AsyncFetchStatus asyncFetchStatus = new AsyncFetchStatus();
251         AsyncRedirects asyncRedirects = new AsyncRedirects();
252         long startTime = asyncRegistration.getRequestTime();
253         Optional<Source> resultSource =
254                 mAsyncSourceFetcher.fetchSource(
255                         asyncRegistration, asyncFetchStatus, asyncRedirects);
256         long endTime = System.currentTimeMillis();
257         asyncFetchStatus.setRegistrationDelay(endTime - startTime);
258 
259         boolean transactionResult =
260                 mDatastoreManager.runInTransaction(
261                         (dao) -> {
262                             if (asyncFetchStatus.isRequestSuccess()) {
263                                 if (resultSource.isPresent()) {
264                                     storeSource(resultSource.get(), asyncRegistration, dao);
265                                 }
266                                 handleSuccess(
267                                         asyncRegistration, asyncFetchStatus, asyncRedirects, dao);
268                             } else {
269                                 handleFailure(
270                                         asyncRegistration, asyncFetchStatus, failedOrigins, dao);
271                             }
272                         });
273 
274         if (!transactionResult) {
275             asyncFetchStatus.setEntityStatus(AsyncFetchStatus.EntityStatus.STORAGE_ERROR);
276         }
277 
278         asyncFetchStatus.setRetryCount(Long.valueOf(asyncRegistration.getRetryCount()).intValue());
279         FetcherUtil.emitHeaderMetrics(
280                 mFlags.getMaxResponseBasedRegistrationPayloadSizeBytes(),
281                 mLogger,
282                 asyncRegistration,
283                 asyncFetchStatus);
284     }
285 
286     /** Visible only for testing. */
287     @VisibleForTesting
storeSource( Source source, AsyncRegistration asyncRegistration, IMeasurementDao dao)288     public void storeSource(
289             Source source, AsyncRegistration asyncRegistration, IMeasurementDao dao)
290             throws DatastoreException {
291         Uri topOrigin =
292                 asyncRegistration.getType() == AsyncRegistration.RegistrationType.WEB_SOURCE
293                         ? asyncRegistration.getTopOrigin()
294                         : getPublisher(asyncRegistration);
295         @EventSurfaceType
296         int publisherType =
297                 asyncRegistration.getType() == AsyncRegistration.RegistrationType.WEB_SOURCE
298                         ? EventSurfaceType.WEB
299                         : EventSurfaceType.APP;
300         // TODO(b/336403550) : Refactor isSourceAllowedToInsert out of this class
301         InsertSourcePermission sourceAllowedToInsert =
302                 isSourceAllowedToInsert(source, topOrigin, publisherType, dao, mDebugReportApi);
303         if (sourceAllowedToInsert.isAllowed()) {
304             // If preinstall check is enabled and any app destinations are already installed,
305             // mark the source for deletion. Note the source is persisted so that the fake event
306             // report generated can be cleaned up after the source is deleted by
307             // DeleteExpiredJobService.
308             if (mFlags.getMeasurementEnablePreinstallCheck()
309                     && source.shouldDropSourceIfInstalled()
310                     && Applications.anyAppsInstalled(mContext, source.getAppDestinations())) {
311                 source.setStatus(Source.Status.MARKED_TO_DELETE);
312             }
313             Map<String, String> additionalDebugReportParams = null;
314             if (mFlags.getMeasurementEnableDestinationXPublisherXEnrollmentFifo()
315                     && ALLOWED_FIFO_SUCCESS.equals(sourceAllowedToInsert)) {
316                 int limit = mFlags.getMeasurementMaxDistinctDestinationsInActiveSource();
317                 additionalDebugReportParams =
318                         Map.of(DebugReportApi.Body.SOURCE_DESTINATION_LIMIT, String.valueOf(limit));
319             }
320             insertSourceFromTransaction(source, dao, additionalDebugReportParams);
321             mDebugReportApi.scheduleSourceSuccessDebugReport(
322                     source, dao, additionalDebugReportParams);
323         }
324     }
325 
processTriggerRegistration( AsyncRegistration asyncRegistration, Set<Uri> failedOrigins)326     private void processTriggerRegistration(
327             AsyncRegistration asyncRegistration, Set<Uri> failedOrigins) {
328         AsyncFetchStatus asyncFetchStatus = new AsyncFetchStatus();
329         AsyncRedirects asyncRedirects = new AsyncRedirects();
330         long startTime = asyncRegistration.getRequestTime();
331         Optional<Trigger> resultTrigger =
332                 mAsyncTriggerFetcher.fetchTrigger(
333                         asyncRegistration, asyncFetchStatus, asyncRedirects);
334         long endTime = System.currentTimeMillis();
335         asyncFetchStatus.setRegistrationDelay(endTime - startTime);
336 
337         boolean transactionResult =
338                 mDatastoreManager.runInTransaction(
339                         (dao) -> {
340                             if (asyncFetchStatus.isRequestSuccess()) {
341                                 if (resultTrigger.isPresent()) {
342                                     storeTrigger(resultTrigger.get(), dao);
343                                 }
344                                 handleSuccess(
345                                         asyncRegistration, asyncFetchStatus, asyncRedirects, dao);
346                             } else {
347                                 handleFailure(
348                                         asyncRegistration, asyncFetchStatus, failedOrigins, dao);
349                             }
350                         });
351 
352         if (!transactionResult) {
353             asyncFetchStatus.setEntityStatus(AsyncFetchStatus.EntityStatus.STORAGE_ERROR);
354         }
355 
356         asyncFetchStatus.setRetryCount(Long.valueOf(asyncRegistration.getRetryCount()).intValue());
357         long headerSizeLimitBytes =
358                 mFlags.getMeasurementEnableUpdateTriggerHeaderLimit()
359                         ? mFlags.getMaxTriggerRegistrationHeaderSizeBytes()
360                         : mFlags.getMaxResponseBasedRegistrationPayloadSizeBytes();
361         FetcherUtil.emitHeaderMetrics(
362                 headerSizeLimitBytes, mLogger, asyncRegistration, asyncFetchStatus);
363     }
364 
365     /** Visible only for testing. */
366     @VisibleForTesting
storeTrigger(Trigger trigger, IMeasurementDao dao)367     public void storeTrigger(Trigger trigger, IMeasurementDao dao) throws DatastoreException {
368         if (isTriggerAllowedToInsert(dao, trigger)) {
369             try {
370                 dao.insertTrigger(trigger);
371             } catch (DatastoreException e) {
372                 mDebugReportApi.scheduleTriggerNoMatchingSourceDebugReport(
373                         trigger, dao, DebugReportApi.Type.TRIGGER_UNKNOWN_ERROR);
374                 LoggerFactory.getMeasurementLogger()
375                         .e(e, "Insert trigger to DB error, generate trigger-unknown-error report");
376                 throw new DatastoreException(
377                         "Insert trigger to DB error, generate trigger-unknown-error report");
378             }
379             notifyTriggerContentProvider();
380         }
381     }
382 
383     /** Visible only for testing. */
384     @VisibleForTesting
isSourceAllowedToInsert( Source source, Uri topOrigin, @EventSurfaceType int publisherType, IMeasurementDao dao, DebugReportApi debugReportApi)385     InsertSourcePermission isSourceAllowedToInsert(
386             Source source,
387             Uri topOrigin,
388             @EventSurfaceType int publisherType,
389             IMeasurementDao dao,
390             DebugReportApi debugReportApi)
391             throws DatastoreException {
392         Flags flags = FlagsFactory.getFlags();
393         // Do not persist the navigation source if the same reporting origin has been registered
394         // for the registration.
395         if (isNavigationOriginAlreadyRegisteredForRegistration(source, dao, flags)) {
396             return NOT_ALLOWED;
397         }
398         long windowStartTime =
399                 source.getEventTime() - flags.getMeasurementRateLimitWindowMilliseconds();
400         Optional<Uri> publisher = getTopLevelPublisher(topOrigin, publisherType);
401         if (publisher.isEmpty()) {
402             LoggerFactory.getMeasurementLogger()
403                     .d("insertSources: getTopLevelPublisher failed, topOrigin: %s", topOrigin);
404             return NOT_ALLOWED;
405         }
406         if (flags.getMeasurementEnableDestinationRateLimit()) {
407             if (source.getAppDestinations() != null
408                     && !sourceIsWithinTimeBasedDestinationLimits(
409                             debugReportApi,
410                             source,
411                             publisher.get(),
412                             publisherType,
413                             source.getEnrollmentId(),
414                             source.getAppDestinations(),
415                             EventSurfaceType.APP,
416                             source.getEventTime(),
417                             dao)) {
418                 return NOT_ALLOWED;
419             }
420 
421             if (source.getWebDestinations() != null
422                     && !sourceIsWithinTimeBasedDestinationLimits(
423                             debugReportApi,
424                             source,
425                             publisher.get(),
426                             publisherType,
427                             source.getEnrollmentId(),
428                             source.getWebDestinations(),
429                             EventSurfaceType.WEB,
430                             source.getEventTime(),
431                             dao)) {
432                 return NOT_ALLOWED;
433             }
434         }
435         long numOfSourcesPerPublisher =
436                 dao.getNumSourcesPerPublisher(
437                         BaseUriExtractor.getBaseUri(topOrigin), publisherType);
438         if (numOfSourcesPerPublisher >= flags.getMeasurementMaxSourcesPerPublisher()) {
439             LoggerFactory.getMeasurementLogger().d(
440                     "insertSources: Max limit of %s sources for publisher - %s reached.",
441                     flags.getMeasurementMaxSourcesPerPublisher(), publisher);
442             debugReportApi.scheduleSourceStorageLimitDebugReport(
443                     source, String.valueOf(numOfSourcesPerPublisher), dao);
444             return NOT_ALLOWED;
445         }
446         if (source.getAppDestinations() != null
447                 && !isDestinationWithinBounds(
448                         debugReportApi,
449                         source,
450                         publisher.get(),
451                         publisherType,
452                         source.getEnrollmentId(),
453                         source.getAppDestinations(),
454                         EventSurfaceType.APP,
455                         windowStartTime,
456                         source.getEventTime(),
457                         dao)) {
458             return NOT_ALLOWED;
459         }
460 
461         if (source.getWebDestinations() != null
462                 && !isDestinationWithinBounds(
463                         debugReportApi,
464                         source,
465                         publisher.get(),
466                         publisherType,
467                         source.getEnrollmentId(),
468                         source.getWebDestinations(),
469                         EventSurfaceType.WEB,
470                         windowStartTime,
471                         source.getEventTime(),
472                         dao)) {
473             return NOT_ALLOWED;
474         }
475         int numOfOriginExcludingRegistrationOrigin =
476                 dao.countSourcesPerPublisherXEnrollmentExcludingRegOrigin(
477                         source.getRegistrationOrigin(),
478                         publisher.get(),
479                         publisherType,
480                         source.getEnrollmentId(),
481                         source.getEventTime(),
482                         flags.getMeasurementMinReportingOriginUpdateWindow());
483         if (numOfOriginExcludingRegistrationOrigin
484                 >= flags.getMeasurementMaxReportingOriginsPerSourceReportingSitePerWindow()) {
485             debugReportApi.scheduleSourceSuccessDebugReport(source, dao, null);
486             LoggerFactory.getMeasurementLogger()
487                     .d(
488                             "insertSources: Max limit of 1 reporting origin for publisher - %s and"
489                                     + " enrollment - %s reached.",
490                             publisher, source.getEnrollmentId());
491             return NOT_ALLOWED;
492         }
493         try {
494             if (!source.validateAndSetNumReportStates(flags)
495                     || !source.validateAndSetMaxEventStates(flags)
496                     || !source.hasValidInformationGain(flags)) {
497                 debugReportApi.scheduleSourceFlexibleEventReportApiDebugReport(source, dao);
498                 return NOT_ALLOWED;
499             }
500         } catch (ArithmeticException e) {
501             LoggerFactory.getMeasurementLogger()
502                     .e(e, "Calculating the number of report states overflowed.");
503             debugReportApi.scheduleSourceFlexibleEventReportApiDebugReport(source, dao);
504             return NOT_ALLOWED;
505         }
506 
507         if (flags.getMeasurementEnableDestinationXPublisherXEnrollmentFifo()) {
508             InsertSourcePermission appDestSourceAllowedToInsert =
509                     maybeDeleteSourcesForLruDestination(
510                             source,
511                             publisherType,
512                             dao,
513                             flags,
514                             publisher.get(),
515                             EventSurfaceType.APP,
516                             source.getAppDestinations());
517             InsertSourcePermission webDestSourceAllowedToInsert =
518                     maybeDeleteSourcesForLruDestination(
519                             source,
520                             publisherType,
521                             dao,
522                             flags,
523                             publisher.get(),
524                             EventSurfaceType.WEB,
525                             source.getWebDestinations());
526             // NOT_ALLOWED is not an expected response from maybeDeleteSourcesForLruDestination, so
527             // we are not handling that.
528             if (appDestSourceAllowedToInsert == ALLOWED_FIFO_SUCCESS
529                     || webDestSourceAllowedToInsert == ALLOWED_FIFO_SUCCESS) {
530                 // TODO(b/332647639): Handle debug success report for this case.
531                 return ALLOWED_FIFO_SUCCESS;
532             }
533         }
534         return ALLOWED;
535     }
536 
maybeDeleteSourcesForLruDestination( Source source, @EventSurfaceType int publisherType, IMeasurementDao dao, Flags flags, Uri publisher, @EventSurfaceType int destinationType, List<Uri> destinations)537     private static InsertSourcePermission maybeDeleteSourcesForLruDestination(
538             Source source,
539             @EventSurfaceType int publisherType,
540             IMeasurementDao dao,
541             Flags flags,
542             Uri publisher,
543             @EventSurfaceType int destinationType,
544             List<Uri> destinations)
545             throws DatastoreException {
546         if (destinations == null || destinations.isEmpty()) {
547             return InsertSourcePermission.ALLOWED;
548         }
549         int fifoLimit = flags.getMeasurementMaxDistinctDestinationsInActiveSource();
550         if (destinations.size() > fifoLimit) {
551             // This is an unexpected scenario, i.e. flags are configured incorrectly.
552             throw new IllegalStateException(
553                     "Incoming destinations: " + destinations.size() + "; FIFO limit:" + fifoLimit);
554         }
555         int distinctDestinations =
556                 dao.countDistinctDestinationsPerPubXEnrollmentInUnexpiredSource(
557                         publisher,
558                         publisherType,
559                         source.getEnrollmentId(),
560                         destinations,
561                         destinationType,
562                         source.getEventTime());
563         if (distinctDestinations + destinations.size() <= fifoLimit) {
564             return InsertSourcePermission
565                     .ALLOWED; // Source is allowed to be inserted without any deletion
566         }
567 
568         // Delete sources associated to the oldest destination per enrollment per publisher.
569         // The new source may have multiple app and web destination, because of which we might
570         // need to delete multiple oldest destinations - in FIFO manner, i.e. in a loop.
571         // Although it should not be more than 4 iterations because the new source can have
572         // at max 1 app destination and 3 web destinations (configurable).
573         while (distinctDestinations + destinations.size() > fifoLimit) {
574             // Delete sources for the oldest destination
575             List<String> sourceIdsToDelete =
576                     dao.fetchSourceIdsForLruDestinationXEnrollmentXPublisher(
577                             publisher,
578                             publisherType,
579                             source.getEnrollmentId(),
580                             destinations,
581                             destinationType,
582                             source.getEventTime());
583             if (sourceIdsToDelete.isEmpty()) {
584                 // If destination limit exceeds, the oldest destination deletion should be
585                 // successful. This is an unexpected state.
586                 throw new IllegalStateException(
587                         "No sources were deleted; incoming destinations: "
588                                 + destinations.size()
589                                 + "; FIFO limit:"
590                                 + fifoLimit);
591             }
592             dao.updateSourceStatus(sourceIdsToDelete, Source.Status.MARKED_TO_DELETE);
593             LoggerFactory.getMeasurementLogger()
594                     .d(
595                             "Deleted "
596                                     + sourceIdsToDelete.size()
597                                     + " sources to insert the new source.");
598             if (flags.getMeasurementEnableFifoDestinationsDeleteAggregateReports()) {
599                 dao.deletePendingAggregateReportsAndAttributionsForSources(sourceIdsToDelete);
600                 LoggerFactory.getMeasurementLogger()
601                         .d(
602                                 "Deleted pending aggregate reports of"
603                                         + sourceIdsToDelete.size()
604                                         + " sources to insert the new source.");
605             }
606             distinctDestinations =
607                     dao.countDistinctDestinationsPerPubXEnrollmentInUnexpiredSource(
608                             publisher,
609                             publisherType,
610                             source.getEnrollmentId(),
611                             destinations,
612                             destinationType,
613                             source.getEventTime());
614         }
615         return ALLOWED_FIFO_SUCCESS;
616     }
617 
sourceIsWithinTimeBasedDestinationLimits( DebugReportApi debugReportApi, Source source, Uri publisher, @EventSurfaceType int publisherType, String enrollmentId, List<Uri> destinations, @EventSurfaceType int destinationType, long requestTime, IMeasurementDao dao)618     private static boolean sourceIsWithinTimeBasedDestinationLimits(
619             DebugReportApi debugReportApi,
620             Source source,
621             Uri publisher,
622             @EventSurfaceType int publisherType,
623             String enrollmentId,
624             List<Uri> destinations,
625             @EventSurfaceType int destinationType,
626             long requestTime,
627             IMeasurementDao dao)
628             throws DatastoreException {
629         long windowStartTime = source.getEventTime()
630                 - FlagsFactory.getFlags().getMeasurementDestinationRateLimitWindow();
631         int destinationReportingCount =
632                 dao.countDistinctDestPerPubXEnrollmentInUnexpiredSourceInWindow(
633                         publisher,
634                         publisherType,
635                         enrollmentId,
636                         destinations,
637                         destinationType,
638                         windowStartTime,
639                         requestTime);
640         // Same reporting-site destination limit
641         int maxDistinctReportingDestinations =
642                 FlagsFactory.getFlags()
643                         .getMeasurementMaxDestPerPublisherXEnrollmentPerRateLimitWindow();
644         boolean hitSameReportingRateLimit =
645                 destinationReportingCount + destinations.size() > maxDistinctReportingDestinations;
646         if (hitSameReportingRateLimit) {
647             LoggerFactory.getMeasurementLogger().d(
648                     "AsyncRegistrationQueueRunner: "
649                             + (destinationType == EventSurfaceType.APP ? "App" : "Web")
650                             + " MaxDestPerPublisherXEnrollmentPerRateLimitWindow exceeded");
651         }
652 
653         // TODO(b/336628903): Move this check at the end of other checks to not leak cross site data
654         // Global destination limit
655         int destinationCount =
656                 dao.countDistinctDestinationsPerPublisherPerRateLimitWindow(
657                         publisher,
658                         publisherType,
659                         destinations,
660                         destinationType,
661                         windowStartTime,
662                         requestTime);
663         int maxDistinctDestinations =
664                 FlagsFactory.getFlags()
665                         .getMeasurementMaxDestinationsPerPublisherPerRateLimitWindow();
666         boolean hitRateLimit = destinationCount + destinations.size() > maxDistinctDestinations;
667         if (hitRateLimit) {
668             LoggerFactory.getMeasurementLogger()
669                     .d(
670                             "AsyncRegistrationQueueRunner: "
671                                     + (destinationType == EventSurfaceType.APP ? "App" : "Web")
672                                     + " MaxDestinationsPerPublisherPerRateLimitWindow exceeded");
673         }
674 
675         if (hitSameReportingRateLimit) {
676             debugReportApi.scheduleSourceDestinationRateLimitDebugReport(
677                     source, String.valueOf(maxDistinctReportingDestinations), dao);
678             return false;
679         } else if (hitRateLimit) {
680             debugReportApi.scheduleSourceSuccessDebugReport(source, dao, null);
681             return false;
682         }
683         return true;
684     }
685 
isDestinationWithinBounds( DebugReportApi debugReportApi, Source source, Uri publisher, @EventSurfaceType int publisherType, String enrollmentId, List<Uri> destinations, @EventSurfaceType int destinationType, long windowStartTime, long requestTime, IMeasurementDao dao)686     private static boolean isDestinationWithinBounds(
687             DebugReportApi debugReportApi,
688             Source source,
689             Uri publisher,
690             @EventSurfaceType int publisherType,
691             String enrollmentId,
692             List<Uri> destinations,
693             @EventSurfaceType int destinationType,
694             long windowStartTime,
695             long requestTime,
696             IMeasurementDao dao)
697             throws DatastoreException {
698         Flags flags = FlagsFactory.getFlags();
699 
700         // If FIFO is enabled, we push the least recently used destination out for the enrollment
701         // and publisher combination by deleting the sources.
702         if (!flags.getMeasurementEnableDestinationXPublisherXEnrollmentFifo()) {
703             int destinationCount;
704             if (flags.getMeasurementEnableDestinationRateLimit()) {
705                 destinationCount =
706                         dao.countDistinctDestinationsPerPubXEnrollmentInUnexpiredSource(
707                                 publisher,
708                                 publisherType,
709                                 enrollmentId,
710                                 destinations,
711                                 destinationType,
712                                 requestTime);
713             } else {
714                 destinationCount =
715                         dao.countDistinctDestPerPubXEnrollmentInUnexpiredSourceInWindow(
716                                 publisher,
717                                 publisherType,
718                                 enrollmentId,
719                                 destinations,
720                                 destinationType,
721                                 windowStartTime,
722                                 requestTime);
723             }
724             int maxDistinctDestinations =
725                     flags.getMeasurementMaxDistinctDestinationsInActiveSource();
726             if (destinationCount + destinations.size() > maxDistinctDestinations) {
727                 LoggerFactory.getMeasurementLogger()
728                         .d(
729                                 "AsyncRegistrationQueueRunner: "
730                                         + (destinationType == EventSurfaceType.APP ? "App" : "Web")
731                                         + " destination count >= MaxDistinctDestinations"
732                                         + "PerPublisherXEnrollmentInActiveSource");
733                 debugReportApi.scheduleSourceDestinationLimitDebugReport(
734                         source, String.valueOf(maxDistinctDestinations), dao);
735                 return false;
736             }
737         }
738 
739         int distinctReportingOriginCount =
740                 dao.countDistinctReportingOriginsPerPublisherXDestinationInSource(
741                         publisher,
742                         publisherType,
743                         destinations,
744                         source.getRegistrationOrigin(),
745                         windowStartTime,
746                         requestTime);
747         if (distinctReportingOriginCount
748                 >= flags.getMeasurementMaxDistinctRepOrigPerPublXDestInSource()) {
749             debugReportApi.scheduleSourceSuccessDebugReport(source, dao, null);
750             LoggerFactory.getMeasurementLogger()
751                     .d(
752                             "AsyncRegistrationQueueRunner: "
753                                     + (destinationType == EventSurfaceType.APP ? "App" : "Web")
754                                     + " distinct reporting origin count >= "
755                                     + "MaxDistinctRepOrigPerPublisherXDestInSource exceeded");
756             return false;
757         }
758         return true;
759     }
760 
761     @VisibleForTesting
isTriggerAllowedToInsert(IMeasurementDao dao, Trigger trigger)762     static boolean isTriggerAllowedToInsert(IMeasurementDao dao, Trigger trigger) {
763         long triggerInsertedPerDestination;
764         try {
765             triggerInsertedPerDestination =
766                     dao.getNumTriggersPerDestination(
767                             trigger.getAttributionDestination(), trigger.getDestinationType());
768         } catch (DatastoreException e) {
769             LoggerFactory.getMeasurementLogger()
770                     .e("Unable to fetch number of triggers currently registered per destination.");
771             return false;
772         }
773         return triggerInsertedPerDestination
774                 < FlagsFactory.getFlags().getMeasurementMaxTriggersPerDestination();
775     }
776 
createAsyncRegistrationFromRedirect( AsyncRegistration asyncRegistration, AsyncRedirect asyncRedirect)777     private AsyncRegistration createAsyncRegistrationFromRedirect(
778             AsyncRegistration asyncRegistration, AsyncRedirect asyncRedirect) {
779         return new AsyncRegistration.Builder()
780                 .setId(UUID.randomUUID().toString())
781                 .setRegistrationUri(asyncRedirect.getUri())
782                 .setWebDestination(asyncRegistration.getWebDestination())
783                 .setOsDestination(asyncRegistration.getOsDestination())
784                 .setRegistrant(asyncRegistration.getRegistrant())
785                 .setVerifiedDestination(asyncRegistration.getVerifiedDestination())
786                 .setTopOrigin(asyncRegistration.getTopOrigin())
787                 .setType(asyncRegistration.getType())
788                 .setSourceType(asyncRegistration.getSourceType())
789                 .setRequestTime(asyncRegistration.getRequestTime())
790                 .setRetryCount(0)
791                 .setDebugKeyAllowed(asyncRegistration.getDebugKeyAllowed())
792                 .setAdIdPermission(asyncRegistration.hasAdIdPermission())
793                 .setRegistrationId(asyncRegistration.getRegistrationId())
794                 .setRedirectBehavior(asyncRedirect.getRedirectBehavior())
795                 .build();
796     }
797 
generateFakeEventReports( String sourceId, Source source, List<Source.FakeReport> fakeReports)798     private List<EventReport> generateFakeEventReports(
799             String sourceId, Source source, List<Source.FakeReport> fakeReports) {
800         return fakeReports.stream()
801                 .map(
802                         fakeReport ->
803                                 new EventReport.Builder()
804                                         .setId(UUID.randomUUID().toString())
805                                         .setSourceId(sourceId)
806                                         .setSourceEventId(source.getEventId())
807                                         .setReportTime(fakeReport.getReportingTime())
808                                         .setTriggerData(fakeReport.getTriggerData())
809                                         .setAttributionDestinations(fakeReport.getDestinations())
810                                         .setEnrollmentId(source.getEnrollmentId())
811                                         .setTriggerTime(fakeReport.getTriggerTime())
812                                         .setTriggerPriority(0L)
813                                         .setTriggerDedupKey(null)
814                                         .setSourceType(source.getSourceType())
815                                         .setStatus(EventReport.Status.PENDING)
816                                         .setRandomizedTriggerRate(
817                                                 mSourceNoiseHandler.getRandomizedTriggerRate(
818                                                         source))
819                                         .setRegistrationOrigin(source.getRegistrationOrigin())
820                                         .setTriggerSummaryBucket(
821                                                 fakeReport.getTriggerSummaryBucket())
822                                         .setSourceDebugKey(getSourceDebugKeyForNoisedReport(source))
823                                         .build())
824                 .collect(Collectors.toList());
825     }
826 
827     @VisibleForTesting
insertSourceFromTransaction( Source source, IMeasurementDao dao, Map<String, String> additionalDebugReportParams)828     void insertSourceFromTransaction(
829             Source source, IMeasurementDao dao, Map<String, String> additionalDebugReportParams)
830             throws DatastoreException {
831         List<Source.FakeReport> fakeReports =
832                 mSourceNoiseHandler.assignAttributionModeAndGenerateFakeReports(source);
833 
834         final String sourceId = insertSource(source, dao);
835         if (sourceId == null) {
836             // Source was not saved due to DB size restrictions
837             return;
838         }
839 
840         if (mFlags.getMeasurementEnableAttributionScope()) {
841             dao.updateSourcesForAttributionScope(source);
842         }
843 
844         if (fakeReports != null) {
845             mDebugReportApi.scheduleSourceNoisedDebugReport(
846                     source, dao, additionalDebugReportParams);
847             for (EventReport report : generateFakeEventReports(sourceId, source, fakeReports)) {
848                 dao.insertEventReport(report);
849             }
850         }
851         // We want to account for attribution if fake report generation was considered
852         // based on the probability. In that case the attribution mode will be NEVER
853         // (empty fake reports state) or FALSELY (non-empty fake reports).
854         if (source.getAttributionMode() != Source.AttributionMode.TRUTHFULLY) {
855             // Attribution rate limits for app and web destinations are counted
856             // separately, so add a fake report entry for each type of destination if
857             // non-null.
858             if (!Objects.isNull(source.getAppDestinations())) {
859                 for (Uri destination : source.getAppDestinations()) {
860                     dao.insertAttribution(
861                             createFakeAttributionRateLimit(sourceId, source, destination));
862                 }
863             }
864 
865             if (!Objects.isNull(source.getWebDestinations())) {
866                 for (Uri destination : source.getWebDestinations()) {
867                     dao.insertAttribution(
868                             createFakeAttributionRateLimit(sourceId, source, destination));
869                 }
870             }
871         }
872     }
873 
insertSource(Source source, IMeasurementDao dao)874     private String insertSource(Source source, IMeasurementDao dao) throws DatastoreException {
875         try {
876             return dao.insertSource(source);
877         } catch (DatastoreException e) {
878             mDebugReportApi.scheduleSourceUnknownErrorDebugReport(source, dao);
879             LoggerFactory.getMeasurementLogger()
880                     .e(e, "Insert source to DB error, generate source-unknown-error report");
881             throw new DatastoreException(
882                     "Insert source to DB error, generate source-unknown-error report");
883         }
884     }
885 
handleSuccess( AsyncRegistration asyncRegistration, AsyncFetchStatus asyncFetchStatus, AsyncRedirects asyncRedirects, IMeasurementDao dao)886     private void handleSuccess(
887             AsyncRegistration asyncRegistration,
888             AsyncFetchStatus asyncFetchStatus,
889             AsyncRedirects asyncRedirects,
890             IMeasurementDao dao)
891             throws DatastoreException {
892         // deleteAsyncRegistration will throw an exception & rollback the transaction if the record
893         // is already deleted. This can happen if both fallback & regular job are running at the
894         // same time or if deletion job deletes the records.
895         dao.deleteAsyncRegistration(asyncRegistration.getId());
896         if (asyncRedirects.getRedirects().isEmpty()) {
897             return;
898         }
899         int maxRedirects = FlagsFactory.getFlags().getMeasurementMaxRegistrationRedirects();
900         KeyValueData keyValueData =
901                 dao.getKeyValueData(
902                         asyncRegistration.getRegistrationId(),
903                         DataType.REGISTRATION_REDIRECT_COUNT);
904         int currentCount = keyValueData.getRegistrationRedirectCount();
905         if (currentCount >= maxRedirects) {
906             asyncFetchStatus.setRedirectError(true);
907             return;
908         }
909 
910         for (AsyncRedirect asyncRedirect : asyncRedirects.getRedirects()) {
911             if (currentCount >= maxRedirects) {
912                 break;
913             }
914             dao.insertAsyncRegistration(
915                     createAsyncRegistrationFromRedirect(asyncRegistration, asyncRedirect));
916             currentCount++;
917         }
918         keyValueData.setRegistrationRedirectCount(currentCount);
919         dao.insertOrUpdateKeyValueData(keyValueData);
920     }
921 
handleFailure( AsyncRegistration asyncRegistration, AsyncFetchStatus asyncFetchStatus, Set<Uri> failedOrigins, IMeasurementDao dao)922     private void handleFailure(
923             AsyncRegistration asyncRegistration,
924             AsyncFetchStatus asyncFetchStatus,
925             Set<Uri> failedOrigins,
926             IMeasurementDao dao)
927             throws DatastoreException {
928         if (asyncFetchStatus.canRetry()) {
929             LoggerFactory.getMeasurementLogger()
930                     .d(
931                             "AsyncRegistrationQueueRunner: "
932                                     + "async "
933                                     + asyncRegistration.getType()
934                                     + " registration will be queued for retry "
935                                     + "Fetch Status : "
936                                     + asyncFetchStatus.getResponseStatus());
937             failedOrigins.add(BaseUriExtractor.getBaseUri(asyncRegistration.getRegistrationUri()));
938             asyncRegistration.incrementRetryCount();
939             dao.updateRetryCount(asyncRegistration);
940         } else {
941             LoggerFactory.getMeasurementLogger()
942                     .d(
943                             "AsyncRegistrationQueueRunner: "
944                                     + "async "
945                                     + asyncRegistration.getType()
946                                     + " registration will not be queued for retry. "
947                                     + "Fetch Status : "
948                                     + asyncFetchStatus.getResponseStatus());
949             dao.deleteAsyncRegistration(asyncRegistration.getId());
950         }
951     }
952 
953     /**
954      * {@link Attribution} generated from here will only be used for fake report attribution.
955      *
956      * @param source source to derive parameters from
957      * @param destination destination for attribution
958      * @return a fake {@link Attribution}
959      */
createFakeAttributionRateLimit( String sourceId, Source source, Uri destination)960     private Attribution createFakeAttributionRateLimit(
961             String sourceId, Source source, Uri destination) {
962         Optional<Uri> topLevelPublisher =
963                 getTopLevelPublisher(source.getPublisher(), source.getPublisherType());
964 
965         if (topLevelPublisher.isEmpty()) {
966             throw new IllegalArgumentException(
967                     String.format(
968                             "insertAttributionRateLimit: getSourceAndDestinationTopPrivateDomains"
969                                     + " failed. Publisher: %s; Attribution destination: %s",
970                             source.getPublisher(), destination));
971         }
972 
973         return new Attribution.Builder()
974                 .setSourceSite(topLevelPublisher.get().toString())
975                 .setSourceOrigin(source.getPublisher().toString())
976                 .setDestinationSite(destination.toString())
977                 .setDestinationOrigin(destination.toString())
978                 .setEnrollmentId(source.getEnrollmentId())
979                 .setTriggerTime(source.getEventTime())
980                 .setRegistrant(source.getRegistrant().toString())
981                 .setSourceId(sourceId)
982                 // Intentionally kept it as null because it's a fake attribution
983                 .setTriggerId(null)
984                 // Intentionally using source here since trigger is not available
985                 .setRegistrationOrigin(source.getRegistrationOrigin())
986                 .setReportId(ATTRIBUTION_FAKE_REPORT_ID)
987                 .build();
988     }
989 
getTopLevelPublisher( Uri topOrigin, @EventSurfaceType int publisherType)990     private static Optional<Uri> getTopLevelPublisher(
991             Uri topOrigin, @EventSurfaceType int publisherType) {
992         return publisherType == EventSurfaceType.APP
993                 ? Optional.of(topOrigin)
994                 : WebAddresses.topPrivateDomainAndScheme(topOrigin);
995     }
996 
getPublisher(AsyncRegistration request)997     private Uri getPublisher(AsyncRegistration request) {
998         return request.getRegistrant();
999     }
1000 
notifyTriggerContentProvider()1001     private void notifyTriggerContentProvider() {
1002         try (ContentProviderClient contentProviderClient =
1003                 mContentResolver.acquireContentProviderClient(TriggerContentProvider.TRIGGER_URI)) {
1004             if (contentProviderClient != null) {
1005                 contentProviderClient.insert(TriggerContentProvider.TRIGGER_URI, null);
1006             }
1007         } catch (RemoteException e) {
1008             LoggerFactory.getMeasurementLogger()
1009                     .e(e, "Trigger Content Provider invocation failed.");
1010         }
1011     }
1012 
1013     @Nullable
getSourceDebugKeyForNoisedReport(@onNull Source source)1014     private UnsignedLong getSourceDebugKeyForNoisedReport(@NonNull Source source) {
1015         if ((source.getPublisherType() == EventSurfaceType.APP && source.hasAdIdPermission())
1016                 || (source.getPublisherType() == EventSurfaceType.WEB
1017                         && source.hasArDebugPermission())) {
1018             return source.getDebugKey();
1019         }
1020         return null;
1021     }
1022 }
1023