1 /* 2 * Copyright (C) 2024 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package com.android.adservices.service.measurement.reporting; 18 19 import static com.android.adservices.service.measurement.util.JobLockHolder.Type.AGGREGATE_REPORTING; 20 import static com.android.adservices.service.measurement.util.JobLockHolder.Type.EVENT_REPORTING; 21 import static com.android.adservices.service.stats.AdServicesStatsLog.AD_SERVICES_BACKGROUND_JOBS_EXECUTION_REPORTED__EXECUTION_RESULT_CODE__SKIP_FOR_KILL_SWITCH_ON; 22 import static com.android.adservices.spe.AdServicesJobInfo.MEASUREMENT_REPORTING_JOB; 23 24 import android.app.job.JobInfo; 25 import android.app.job.JobParameters; 26 import android.app.job.JobScheduler; 27 import android.app.job.JobService; 28 import android.content.ComponentName; 29 import android.content.Context; 30 31 import com.android.adservices.LogUtil; 32 import com.android.adservices.LoggerFactory; 33 import com.android.adservices.concurrency.AdServicesExecutors; 34 import com.android.adservices.data.measurement.DatastoreManager; 35 import com.android.adservices.data.measurement.DatastoreManagerFactory; 36 import com.android.adservices.service.Flags; 37 import com.android.adservices.service.FlagsFactory; 38 import com.android.adservices.service.common.compat.ServiceCompatUtils; 39 import com.android.adservices.service.measurement.KeyValueData; 40 import com.android.adservices.service.measurement.aggregation.AggregateEncryptionKeyManager; 41 import com.android.adservices.service.measurement.util.JobLockHolder; 42 import com.android.adservices.service.stats.AdServicesLoggerImpl; 43 import com.android.adservices.spe.AdServicesJobServiceLogger; 44 import com.android.internal.annotations.VisibleForTesting; 45 46 import com.google.common.util.concurrent.ListeningExecutorService; 47 48 import java.util.Optional; 49 import java.util.concurrent.Future; 50 51 /** 52 * Service for sending event and aggregate reports. Reporting logic contained in {@link 53 * EventReportingJobHandler} and {@link AggregateReportingJobHandler}. 54 * 55 * <p>Bug(b/342687912): This will eventually replace {@link EventReportingJobService} and {@link 56 * AggregateReportingJobService}. 57 */ 58 public final class ReportingJobService extends JobService { 59 private static final ListeningExecutorService sBlockingExecutor = 60 AdServicesExecutors.getBlockingExecutor(); 61 private Future mExecutorFuture; 62 private static final String JOB_NEXT_EXECUTION_TIME = "job_next_execution_time"; 63 private static final String JOB_LAST_EXECUTION_TIME = "job_last_execution_time"; 64 65 @Override onStartJob(JobParameters params)66 public boolean onStartJob(JobParameters params) { 67 // Always ensure that the first thing this job does is check if it should be running, and 68 // cancel itself if it's not supposed to be. 69 if (ServiceCompatUtils.shouldDisableExtServicesJobOnTPlus(this)) { 70 LogUtil.d( 71 "Disabling %s job because it's running in" + " ExtServices on T+", 72 this.getClass().getSimpleName()); 73 return skipAndCancelBackgroundJob(params, /* skipReason= */ 0, /* doRecord= */ false); 74 } 75 76 AdServicesJobServiceLogger.getInstance() 77 .recordOnStartJob(MEASUREMENT_REPORTING_JOB.getJobId()); 78 79 if (!FlagsFactory.getFlags().getMeasurementReportingJobServiceEnabled()) { 80 LoggerFactory.getMeasurementLogger() 81 .e("%s is disabled", this.getClass().getSimpleName()); 82 return skipAndCancelBackgroundJob( 83 params, 84 AD_SERVICES_BACKGROUND_JOBS_EXECUTION_REPORTED__EXECUTION_RESULT_CODE__SKIP_FOR_KILL_SWITCH_ON, 85 /* doRecord= */ true); 86 } 87 88 LoggerFactory.getMeasurementLogger().d("%s.onStartJob", this.getClass().getSimpleName()); 89 mExecutorFuture = 90 sBlockingExecutor.submit( 91 () -> { 92 try { 93 saveExecutionStartTime(); 94 processPendingAggregateReports(); 95 processPendingEventReports(); 96 97 AdServicesJobServiceLogger.getInstance() 98 .recordJobFinished( 99 MEASUREMENT_REPORTING_JOB.getJobId(), 100 /* isSuccessful= */ true, 101 /* shouldRetry= */ false); 102 jobFinished(params, /* wantsReschedule= */ false); 103 } finally { 104 scheduleIfNeeded( 105 getApplicationContext(), /* forceSchedule= */ false); 106 } 107 }); 108 return true; 109 } 110 111 @Override onStopJob(JobParameters params)112 public boolean onStopJob(JobParameters params) { 113 LoggerFactory.getMeasurementLogger().d("%s.onStopJob", this.getClass().getSimpleName()); 114 boolean shouldRetry = true; 115 if (mExecutorFuture != null) { 116 shouldRetry = mExecutorFuture.cancel(/* mayInterruptIfRunning */ true); 117 } 118 AdServicesJobServiceLogger.getInstance() 119 .recordOnStopJob(params, MEASUREMENT_REPORTING_JOB.getJobId(), shouldRetry); 120 return shouldRetry; 121 } 122 123 /** 124 * Schedule execution of this job service based on pending reports in the database, either 125 * aggregate or event. 126 * 127 * <p>If there are no pending reports, this service will not be scheduled. 128 * 129 * <p>This job will be scheduled to the latest report within the batching window. The batching 130 * window is the window of time between the next earliest report and the window length specified 131 * in the flags. 132 * 133 * <p>Job scheduling will also be throttled by a minimum execution window specified in the 134 * flags. 135 * 136 * @param context application context 137 * @param forceSchedule true if the job is to be scheduled at the next pending report, 138 * disregarding the minimum execution window. If there is no pending report, this job will 139 * not be scheduled. 140 */ scheduleIfNeeded(Context context, boolean forceSchedule)141 public static void scheduleIfNeeded(Context context, boolean forceSchedule) { 142 Flags flags = FlagsFactory.getFlags(); 143 if (!flags.getMeasurementReportingJobServiceEnabled()) { 144 LoggerFactory.getMeasurementLogger() 145 .e("ReportingJobService is disabled, skip scheduling"); 146 return; 147 } 148 149 final JobScheduler jobScheduler = context.getSystemService(JobScheduler.class); 150 if (jobScheduler == null) { 151 LoggerFactory.getMeasurementLogger().e("JobScheduler not found"); 152 return; 153 } 154 155 Optional<Long> latestReportTimeInBatchOpt = getLastReportTimeInBatch(context, flags); 156 if (latestReportTimeInBatchOpt.isEmpty()) { 157 LoggerFactory.getMeasurementLogger() 158 .d("ReportingJobService found no pending reports. Aborting scheduling."); 159 return; 160 } 161 162 long latestReportTimeInBatch = latestReportTimeInBatchOpt.get(); 163 long lastExecution = getLastExecution(context); 164 Long nextScheduledExecution = getNextScheduledExecution(context); 165 long minExecutionWindowEnd = 166 lastExecution + flags.getMeasurementReportingJobServiceMinExecutionWindowMillis(); 167 168 final JobInfo scheduledJob = 169 jobScheduler.getPendingJob(MEASUREMENT_REPORTING_JOB.getJobId()); 170 171 long nextExecutionTime = 172 getNextExecutionTime(forceSchedule, latestReportTimeInBatch, minExecutionWindowEnd); 173 JobInfo jobInfo = buildJobInfo(context, flags, nextExecutionTime); 174 if (forceSchedule 175 || !isNextReportScheduled( 176 scheduledJob, nextScheduledExecution, latestReportTimeInBatch)) { 177 jobScheduler.schedule(jobInfo); 178 saveNextExecution(context, latestReportTimeInBatch); 179 LoggerFactory.getMeasurementLogger().d("Scheduled ReportingJobService"); 180 } 181 } 182 getNextExecutionTime( boolean forceSchedule, long latestReportTimeInBatch, long minExecutionWindowEnd)183 private static long getNextExecutionTime( 184 boolean forceSchedule, long latestReportTimeInBatch, long minExecutionWindowEnd) { 185 return forceSchedule 186 ? latestReportTimeInBatch 187 : Math.max(minExecutionWindowEnd, latestReportTimeInBatch); 188 } 189 saveNextExecution(Context context, Long latestReportTimeInBatch)190 private static void saveNextExecution(Context context, Long latestReportTimeInBatch) { 191 DatastoreManager datastoreManager = DatastoreManagerFactory.getDatastoreManager(context); 192 datastoreManager.runInTransaction(getSaveNextExecutionConsumer(latestReportTimeInBatch)); 193 } 194 getSaveNextExecutionConsumer( Long latestReportTimeInBatch)195 private static DatastoreManager.ThrowingCheckedConsumer getSaveNextExecutionConsumer( 196 Long latestReportTimeInBatch) { 197 return measurementDao -> { 198 KeyValueData nextScheduledExecution = 199 measurementDao.getKeyValueData( 200 JOB_NEXT_EXECUTION_TIME, KeyValueData.DataType.JOB_NEXT_EXECUTION_TIME); 201 nextScheduledExecution.setReportingJobNextExecutionTime(latestReportTimeInBatch); 202 measurementDao.insertOrUpdateKeyValueData(nextScheduledExecution); 203 }; 204 } 205 isNextReportScheduled( JobInfo scheduledJob, Long nextScheduledExecution, long latestReportTimeInBatch)206 private static boolean isNextReportScheduled( 207 JobInfo scheduledJob, Long nextScheduledExecution, long latestReportTimeInBatch) { 208 return scheduledJob != null 209 && nextScheduledExecution != null 210 && nextScheduledExecution == latestReportTimeInBatch; 211 } 212 getNextScheduledExecution(Context context)213 private static Long getNextScheduledExecution(Context context) { 214 DatastoreManager dataStoreManager = DatastoreManagerFactory.getDatastoreManager(context); 215 216 KeyValueData kvData = 217 dataStoreManager 218 .runInTransactionWithResult( 219 measurementDao -> 220 measurementDao.getKeyValueData( 221 JOB_NEXT_EXECUTION_TIME, 222 KeyValueData.DataType.JOB_NEXT_EXECUTION_TIME)) 223 .orElseThrow(); 224 225 return kvData.getReportingJobNextExecutionTime(); 226 } 227 getLastReportTimeInBatch(Context context, Flags flags)228 private static Optional<Long> getLastReportTimeInBatch(Context context, Flags flags) { 229 DatastoreManager dataStoreManager = DatastoreManagerFactory.getDatastoreManager(context); 230 231 return dataStoreManager.runInTransactionWithResult( 232 measurementDao -> 233 measurementDao.getLatestReportTimeInBatchWindow( 234 flags.getMeasurementReportingJobServiceBatchWindowMillis())); 235 } 236 saveExecutionStartTime()237 private void saveExecutionStartTime() { 238 DatastoreManager datastoreManager = 239 DatastoreManagerFactory.getDatastoreManager(getApplicationContext()); 240 datastoreManager.runInTransaction(getSaveExecutionTimeConsumer()); 241 } 242 getSaveExecutionTimeConsumer()243 private DatastoreManager.ThrowingCheckedConsumer getSaveExecutionTimeConsumer() { 244 return measurementDao -> { 245 KeyValueData lastExecution = 246 measurementDao.getKeyValueData( 247 JOB_LAST_EXECUTION_TIME, KeyValueData.DataType.JOB_LAST_EXECUTION_TIME); 248 249 lastExecution.setReportingJobLastExecutionTime(System.currentTimeMillis()); 250 measurementDao.insertOrUpdateKeyValueData(lastExecution); 251 }; 252 } 253 254 private static long getLastExecution(Context context) { 255 DatastoreManager dataStoreManager = DatastoreManagerFactory.getDatastoreManager(context); 256 257 KeyValueData lastExecution = 258 dataStoreManager 259 .runInTransactionWithResult( 260 measurementDao -> 261 measurementDao.getKeyValueData( 262 JOB_LAST_EXECUTION_TIME, 263 KeyValueData.DataType.JOB_LAST_EXECUTION_TIME)) 264 .orElseThrow(); 265 266 return lastExecution.getReportingJobLastExecutionTime() != null 267 ? lastExecution.getReportingJobLastExecutionTime() 268 : Long.MIN_VALUE; 269 } 270 271 private boolean skipAndCancelBackgroundJob( 272 final JobParameters params, int skipReason, boolean doRecord) { 273 final JobScheduler jobScheduler = this.getSystemService(JobScheduler.class); 274 if (jobScheduler != null) { 275 jobScheduler.cancel(MEASUREMENT_REPORTING_JOB.getJobId()); 276 saveNextExecution(getApplicationContext(), null); 277 } 278 279 if (doRecord) { 280 AdServicesJobServiceLogger.getInstance() 281 .recordJobSkipped(MEASUREMENT_REPORTING_JOB.getJobId(), skipReason); 282 } 283 284 // Tell the JobScheduler that the job has completed and does not need to be rescheduled. 285 jobFinished(params, false); 286 287 // Returning false means that this job has completed its work. 288 return false; 289 } 290 291 private static JobInfo buildJobInfo(Context context, Flags flags, long nextExecutionTime) { 292 JobInfo.Builder builder = 293 new JobInfo.Builder( 294 MEASUREMENT_REPORTING_JOB.getJobId(), 295 new ComponentName(context, ReportingJobService.class)) 296 .setRequiresBatteryNotLow( 297 flags.getMeasurementReportingJobRequiredBatteryNotLow()) 298 .setRequiredNetworkType( 299 flags.getMeasurementReportingJobRequiredNetworkType()) 300 .setPersisted(flags.getMeasurementReportingJobPersisted()); 301 // nextExecutionTime could potentially be in the past, i.e. for Aggregate Reports with 302 // trigger context ids. Using such a timestamp would result in a negative minimum latency. 303 if (nextExecutionTime > System.currentTimeMillis()) { 304 builder.setMinimumLatency(nextExecutionTime - System.currentTimeMillis()); 305 } 306 307 return builder.build(); 308 } 309 310 @VisibleForTesting 311 void processPendingAggregateReports() { 312 final JobLockHolder lock = JobLockHolder.getInstance(AGGREGATE_REPORTING); 313 if (lock.tryLock()) { 314 try { 315 long maxAggregateReportUploadRetryWindowMs = 316 FlagsFactory.getFlags() 317 .getMeasurementMaxAggregateReportUploadRetryWindowMs(); 318 DatastoreManager datastoreManager = 319 DatastoreManagerFactory.getDatastoreManager(getApplicationContext()); 320 new AggregateReportingJobHandler( 321 datastoreManager, 322 new AggregateEncryptionKeyManager( 323 datastoreManager, getApplicationContext()), 324 FlagsFactory.getFlags(), 325 AdServicesLoggerImpl.getInstance(), 326 ReportingStatus.ReportType.AGGREGATE, 327 ReportingStatus.UploadMethod.REGULAR, 328 getApplicationContext()) 329 .performScheduledPendingReportsInWindow( 330 System.currentTimeMillis() - maxAggregateReportUploadRetryWindowMs, 331 System.currentTimeMillis()); 332 return; 333 } finally { 334 lock.unlock(); 335 } 336 } 337 LoggerFactory.getMeasurementLogger() 338 .d("ReportingJobService did not acquire the lock for Aggregate Reporting"); 339 } 340 341 @VisibleForTesting 342 void processPendingEventReports() { 343 final JobLockHolder lock = JobLockHolder.getInstance(EVENT_REPORTING); 344 if (lock.tryLock()) { 345 try { 346 long maxEventReportUploadRetryWindowMs = 347 FlagsFactory.getFlags().getMeasurementMaxEventReportUploadRetryWindowMs(); 348 new EventReportingJobHandler( 349 DatastoreManagerFactory.getDatastoreManager( 350 getApplicationContext()), 351 FlagsFactory.getFlags(), 352 AdServicesLoggerImpl.getInstance(), 353 ReportingStatus.ReportType.EVENT, 354 ReportingStatus.UploadMethod.REGULAR, 355 getApplicationContext()) 356 .performScheduledPendingReportsInWindow( 357 System.currentTimeMillis() - maxEventReportUploadRetryWindowMs, 358 System.currentTimeMillis()); 359 return; 360 } finally { 361 lock.unlock(); 362 } 363 } 364 LoggerFactory.getMeasurementLogger() 365 .d("ReportingJobService did not acquire the lock for Event Reporting"); 366 } 367 368 @VisibleForTesting 369 static void schedule(JobScheduler jobScheduler, JobInfo jobInfo) { 370 jobScheduler.schedule(jobInfo); 371 } 372 373 @VisibleForTesting 374 Future getFutureForTesting() { 375 return mExecutorFuture; 376 } 377 } 378