1 /*
2  * Copyright (C) 2024 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package com.android.adservices.service.measurement.reporting;
18 
19 import static com.android.adservices.service.measurement.util.JobLockHolder.Type.AGGREGATE_REPORTING;
20 import static com.android.adservices.service.measurement.util.JobLockHolder.Type.EVENT_REPORTING;
21 import static com.android.adservices.service.stats.AdServicesStatsLog.AD_SERVICES_BACKGROUND_JOBS_EXECUTION_REPORTED__EXECUTION_RESULT_CODE__SKIP_FOR_KILL_SWITCH_ON;
22 import static com.android.adservices.spe.AdServicesJobInfo.MEASUREMENT_REPORTING_JOB;
23 
24 import android.app.job.JobInfo;
25 import android.app.job.JobParameters;
26 import android.app.job.JobScheduler;
27 import android.app.job.JobService;
28 import android.content.ComponentName;
29 import android.content.Context;
30 
31 import com.android.adservices.LogUtil;
32 import com.android.adservices.LoggerFactory;
33 import com.android.adservices.concurrency.AdServicesExecutors;
34 import com.android.adservices.data.measurement.DatastoreManager;
35 import com.android.adservices.data.measurement.DatastoreManagerFactory;
36 import com.android.adservices.service.Flags;
37 import com.android.adservices.service.FlagsFactory;
38 import com.android.adservices.service.common.compat.ServiceCompatUtils;
39 import com.android.adservices.service.measurement.KeyValueData;
40 import com.android.adservices.service.measurement.aggregation.AggregateEncryptionKeyManager;
41 import com.android.adservices.service.measurement.util.JobLockHolder;
42 import com.android.adservices.service.stats.AdServicesLoggerImpl;
43 import com.android.adservices.spe.AdServicesJobServiceLogger;
44 import com.android.internal.annotations.VisibleForTesting;
45 
46 import com.google.common.util.concurrent.ListeningExecutorService;
47 
48 import java.util.Optional;
49 import java.util.concurrent.Future;
50 
51 /**
52  * Service for sending event and aggregate reports. Reporting logic contained in {@link
53  * EventReportingJobHandler} and {@link AggregateReportingJobHandler}.
54  *
55  * <p>Bug(b/342687912): This will eventually replace {@link EventReportingJobService} and {@link
56  * AggregateReportingJobService}.
57  */
58 public final class ReportingJobService extends JobService {
59     private static final ListeningExecutorService sBlockingExecutor =
60             AdServicesExecutors.getBlockingExecutor();
61     private Future mExecutorFuture;
62     private static final String JOB_NEXT_EXECUTION_TIME = "job_next_execution_time";
63     private static final String JOB_LAST_EXECUTION_TIME = "job_last_execution_time";
64 
65     @Override
onStartJob(JobParameters params)66     public boolean onStartJob(JobParameters params) {
67         // Always ensure that the first thing this job does is check if it should be running, and
68         // cancel itself if it's not supposed to be.
69         if (ServiceCompatUtils.shouldDisableExtServicesJobOnTPlus(this)) {
70             LogUtil.d(
71                     "Disabling %s job because it's running in" + " ExtServices on T+",
72                     this.getClass().getSimpleName());
73             return skipAndCancelBackgroundJob(params, /* skipReason= */ 0, /* doRecord= */ false);
74         }
75 
76         AdServicesJobServiceLogger.getInstance()
77                 .recordOnStartJob(MEASUREMENT_REPORTING_JOB.getJobId());
78 
79         if (!FlagsFactory.getFlags().getMeasurementReportingJobServiceEnabled()) {
80             LoggerFactory.getMeasurementLogger()
81                     .e("%s is disabled", this.getClass().getSimpleName());
82             return skipAndCancelBackgroundJob(
83                     params,
84                     AD_SERVICES_BACKGROUND_JOBS_EXECUTION_REPORTED__EXECUTION_RESULT_CODE__SKIP_FOR_KILL_SWITCH_ON,
85                     /* doRecord= */ true);
86         }
87 
88         LoggerFactory.getMeasurementLogger().d("%s.onStartJob", this.getClass().getSimpleName());
89         mExecutorFuture =
90                 sBlockingExecutor.submit(
91                         () -> {
92                             try {
93                                 saveExecutionStartTime();
94                                 processPendingAggregateReports();
95                                 processPendingEventReports();
96 
97                                 AdServicesJobServiceLogger.getInstance()
98                                         .recordJobFinished(
99                                                 MEASUREMENT_REPORTING_JOB.getJobId(),
100                                                 /* isSuccessful= */ true,
101                                                 /* shouldRetry= */ false);
102                                 jobFinished(params, /* wantsReschedule= */ false);
103                             } finally {
104                                 scheduleIfNeeded(
105                                         getApplicationContext(), /* forceSchedule= */ false);
106                             }
107                         });
108         return true;
109     }
110 
111     @Override
onStopJob(JobParameters params)112     public boolean onStopJob(JobParameters params) {
113         LoggerFactory.getMeasurementLogger().d("%s.onStopJob", this.getClass().getSimpleName());
114         boolean shouldRetry = true;
115         if (mExecutorFuture != null) {
116             shouldRetry = mExecutorFuture.cancel(/* mayInterruptIfRunning */ true);
117         }
118         AdServicesJobServiceLogger.getInstance()
119                 .recordOnStopJob(params, MEASUREMENT_REPORTING_JOB.getJobId(), shouldRetry);
120         return shouldRetry;
121     }
122 
123     /**
124      * Schedule execution of this job service based on pending reports in the database, either
125      * aggregate or event.
126      *
127      * <p>If there are no pending reports, this service will not be scheduled.
128      *
129      * <p>This job will be scheduled to the latest report within the batching window. The batching
130      * window is the window of time between the next earliest report and the window length specified
131      * in the flags.
132      *
133      * <p>Job scheduling will also be throttled by a minimum execution window specified in the
134      * flags.
135      *
136      * @param context application context
137      * @param forceSchedule true if the job is to be scheduled at the next pending report,
138      *     disregarding the minimum execution window. If there is no pending report, this job will
139      *     not be scheduled.
140      */
scheduleIfNeeded(Context context, boolean forceSchedule)141     public static void scheduleIfNeeded(Context context, boolean forceSchedule) {
142         Flags flags = FlagsFactory.getFlags();
143         if (!flags.getMeasurementReportingJobServiceEnabled()) {
144             LoggerFactory.getMeasurementLogger()
145                     .e("ReportingJobService is disabled, skip scheduling");
146             return;
147         }
148 
149         final JobScheduler jobScheduler = context.getSystemService(JobScheduler.class);
150         if (jobScheduler == null) {
151             LoggerFactory.getMeasurementLogger().e("JobScheduler not found");
152             return;
153         }
154 
155         Optional<Long> latestReportTimeInBatchOpt = getLastReportTimeInBatch(context, flags);
156         if (latestReportTimeInBatchOpt.isEmpty()) {
157             LoggerFactory.getMeasurementLogger()
158                     .d("ReportingJobService found no pending reports. Aborting scheduling.");
159             return;
160         }
161 
162         long latestReportTimeInBatch = latestReportTimeInBatchOpt.get();
163         long lastExecution = getLastExecution(context);
164         Long nextScheduledExecution = getNextScheduledExecution(context);
165         long minExecutionWindowEnd =
166                 lastExecution + flags.getMeasurementReportingJobServiceMinExecutionWindowMillis();
167 
168         final JobInfo scheduledJob =
169                 jobScheduler.getPendingJob(MEASUREMENT_REPORTING_JOB.getJobId());
170 
171         long nextExecutionTime =
172                 getNextExecutionTime(forceSchedule, latestReportTimeInBatch, minExecutionWindowEnd);
173         JobInfo jobInfo = buildJobInfo(context, flags, nextExecutionTime);
174         if (forceSchedule
175                 || !isNextReportScheduled(
176                         scheduledJob, nextScheduledExecution, latestReportTimeInBatch)) {
177             jobScheduler.schedule(jobInfo);
178             saveNextExecution(context, latestReportTimeInBatch);
179             LoggerFactory.getMeasurementLogger().d("Scheduled ReportingJobService");
180         }
181     }
182 
getNextExecutionTime( boolean forceSchedule, long latestReportTimeInBatch, long minExecutionWindowEnd)183     private static long getNextExecutionTime(
184             boolean forceSchedule, long latestReportTimeInBatch, long minExecutionWindowEnd) {
185         return forceSchedule
186                 ? latestReportTimeInBatch
187                 : Math.max(minExecutionWindowEnd, latestReportTimeInBatch);
188     }
189 
saveNextExecution(Context context, Long latestReportTimeInBatch)190     private static void saveNextExecution(Context context, Long latestReportTimeInBatch) {
191         DatastoreManager datastoreManager = DatastoreManagerFactory.getDatastoreManager(context);
192         datastoreManager.runInTransaction(getSaveNextExecutionConsumer(latestReportTimeInBatch));
193     }
194 
getSaveNextExecutionConsumer( Long latestReportTimeInBatch)195     private static DatastoreManager.ThrowingCheckedConsumer getSaveNextExecutionConsumer(
196             Long latestReportTimeInBatch) {
197         return measurementDao -> {
198             KeyValueData nextScheduledExecution =
199                     measurementDao.getKeyValueData(
200                             JOB_NEXT_EXECUTION_TIME, KeyValueData.DataType.JOB_NEXT_EXECUTION_TIME);
201             nextScheduledExecution.setReportingJobNextExecutionTime(latestReportTimeInBatch);
202             measurementDao.insertOrUpdateKeyValueData(nextScheduledExecution);
203         };
204     }
205 
isNextReportScheduled( JobInfo scheduledJob, Long nextScheduledExecution, long latestReportTimeInBatch)206     private static boolean isNextReportScheduled(
207             JobInfo scheduledJob, Long nextScheduledExecution, long latestReportTimeInBatch) {
208         return scheduledJob != null
209                 && nextScheduledExecution != null
210                 && nextScheduledExecution == latestReportTimeInBatch;
211     }
212 
getNextScheduledExecution(Context context)213     private static Long getNextScheduledExecution(Context context) {
214         DatastoreManager dataStoreManager = DatastoreManagerFactory.getDatastoreManager(context);
215 
216         KeyValueData kvData =
217                 dataStoreManager
218                         .runInTransactionWithResult(
219                                 measurementDao ->
220                                         measurementDao.getKeyValueData(
221                                                 JOB_NEXT_EXECUTION_TIME,
222                                                 KeyValueData.DataType.JOB_NEXT_EXECUTION_TIME))
223                         .orElseThrow();
224 
225         return kvData.getReportingJobNextExecutionTime();
226     }
227 
getLastReportTimeInBatch(Context context, Flags flags)228     private static Optional<Long> getLastReportTimeInBatch(Context context, Flags flags) {
229         DatastoreManager dataStoreManager = DatastoreManagerFactory.getDatastoreManager(context);
230 
231         return dataStoreManager.runInTransactionWithResult(
232                 measurementDao ->
233                         measurementDao.getLatestReportTimeInBatchWindow(
234                                 flags.getMeasurementReportingJobServiceBatchWindowMillis()));
235     }
236 
saveExecutionStartTime()237     private void saveExecutionStartTime() {
238         DatastoreManager datastoreManager =
239                 DatastoreManagerFactory.getDatastoreManager(getApplicationContext());
240         datastoreManager.runInTransaction(getSaveExecutionTimeConsumer());
241     }
242 
getSaveExecutionTimeConsumer()243     private DatastoreManager.ThrowingCheckedConsumer getSaveExecutionTimeConsumer() {
244         return measurementDao -> {
245             KeyValueData lastExecution =
246                     measurementDao.getKeyValueData(
247                             JOB_LAST_EXECUTION_TIME, KeyValueData.DataType.JOB_LAST_EXECUTION_TIME);
248 
249             lastExecution.setReportingJobLastExecutionTime(System.currentTimeMillis());
250             measurementDao.insertOrUpdateKeyValueData(lastExecution);
251         };
252     }
253 
254     private static long getLastExecution(Context context) {
255         DatastoreManager dataStoreManager = DatastoreManagerFactory.getDatastoreManager(context);
256 
257         KeyValueData lastExecution =
258                 dataStoreManager
259                         .runInTransactionWithResult(
260                                 measurementDao ->
261                                         measurementDao.getKeyValueData(
262                                                 JOB_LAST_EXECUTION_TIME,
263                                                 KeyValueData.DataType.JOB_LAST_EXECUTION_TIME))
264                         .orElseThrow();
265 
266         return lastExecution.getReportingJobLastExecutionTime() != null
267                 ? lastExecution.getReportingJobLastExecutionTime()
268                 : Long.MIN_VALUE;
269     }
270 
271     private boolean skipAndCancelBackgroundJob(
272             final JobParameters params, int skipReason, boolean doRecord) {
273         final JobScheduler jobScheduler = this.getSystemService(JobScheduler.class);
274         if (jobScheduler != null) {
275             jobScheduler.cancel(MEASUREMENT_REPORTING_JOB.getJobId());
276             saveNextExecution(getApplicationContext(), null);
277         }
278 
279         if (doRecord) {
280             AdServicesJobServiceLogger.getInstance()
281                     .recordJobSkipped(MEASUREMENT_REPORTING_JOB.getJobId(), skipReason);
282         }
283 
284         // Tell the JobScheduler that the job has completed and does not need to be rescheduled.
285         jobFinished(params, false);
286 
287         // Returning false means that this job has completed its work.
288         return false;
289     }
290 
291     private static JobInfo buildJobInfo(Context context, Flags flags, long nextExecutionTime) {
292         JobInfo.Builder builder =
293                 new JobInfo.Builder(
294                                 MEASUREMENT_REPORTING_JOB.getJobId(),
295                                 new ComponentName(context, ReportingJobService.class))
296                         .setRequiresBatteryNotLow(
297                                 flags.getMeasurementReportingJobRequiredBatteryNotLow())
298                         .setRequiredNetworkType(
299                                 flags.getMeasurementReportingJobRequiredNetworkType())
300                         .setPersisted(flags.getMeasurementReportingJobPersisted());
301         // nextExecutionTime could potentially be in the past, i.e. for Aggregate Reports with
302         // trigger context ids. Using such a timestamp would result in a negative minimum latency.
303         if (nextExecutionTime > System.currentTimeMillis()) {
304             builder.setMinimumLatency(nextExecutionTime - System.currentTimeMillis());
305         }
306 
307         return builder.build();
308     }
309 
310     @VisibleForTesting
311     void processPendingAggregateReports() {
312         final JobLockHolder lock = JobLockHolder.getInstance(AGGREGATE_REPORTING);
313         if (lock.tryLock()) {
314             try {
315                 long maxAggregateReportUploadRetryWindowMs =
316                         FlagsFactory.getFlags()
317                                 .getMeasurementMaxAggregateReportUploadRetryWindowMs();
318                 DatastoreManager datastoreManager =
319                         DatastoreManagerFactory.getDatastoreManager(getApplicationContext());
320                 new AggregateReportingJobHandler(
321                                 datastoreManager,
322                                 new AggregateEncryptionKeyManager(
323                                         datastoreManager, getApplicationContext()),
324                                 FlagsFactory.getFlags(),
325                                 AdServicesLoggerImpl.getInstance(),
326                                 ReportingStatus.ReportType.AGGREGATE,
327                                 ReportingStatus.UploadMethod.REGULAR,
328                                 getApplicationContext())
329                         .performScheduledPendingReportsInWindow(
330                                 System.currentTimeMillis() - maxAggregateReportUploadRetryWindowMs,
331                                 System.currentTimeMillis());
332                 return;
333             } finally {
334                 lock.unlock();
335             }
336         }
337         LoggerFactory.getMeasurementLogger()
338                 .d("ReportingJobService did not acquire the lock for Aggregate Reporting");
339     }
340 
341     @VisibleForTesting
342     void processPendingEventReports() {
343         final JobLockHolder lock = JobLockHolder.getInstance(EVENT_REPORTING);
344         if (lock.tryLock()) {
345             try {
346                 long maxEventReportUploadRetryWindowMs =
347                         FlagsFactory.getFlags().getMeasurementMaxEventReportUploadRetryWindowMs();
348                 new EventReportingJobHandler(
349                                 DatastoreManagerFactory.getDatastoreManager(
350                                         getApplicationContext()),
351                                 FlagsFactory.getFlags(),
352                                 AdServicesLoggerImpl.getInstance(),
353                                 ReportingStatus.ReportType.EVENT,
354                                 ReportingStatus.UploadMethod.REGULAR,
355                                 getApplicationContext())
356                         .performScheduledPendingReportsInWindow(
357                                 System.currentTimeMillis() - maxEventReportUploadRetryWindowMs,
358                                 System.currentTimeMillis());
359                 return;
360             } finally {
361                 lock.unlock();
362             }
363         }
364         LoggerFactory.getMeasurementLogger()
365                 .d("ReportingJobService did not acquire the lock for Event Reporting");
366     }
367 
368     @VisibleForTesting
369     static void schedule(JobScheduler jobScheduler, JobInfo jobInfo) {
370         jobScheduler.schedule(jobInfo);
371     }
372 
373     @VisibleForTesting
374     Future getFutureForTesting() {
375         return mExecutorFuture;
376     }
377 }
378