1 /*
2  * Copyright (C) 2017 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License
15  */
16 
17 package com.android.voicemail.impl.scheduling;
18 
19 import android.annotation.TargetApi;
20 import android.content.Context;
21 import android.content.Intent;
22 import android.os.Build.VERSION_CODES;
23 import android.os.Bundle;
24 import android.os.Handler;
25 import android.os.HandlerThread;
26 import android.os.Looper;
27 import android.os.Message;
28 import android.support.annotation.MainThread;
29 import android.support.annotation.Nullable;
30 import android.support.annotation.VisibleForTesting;
31 import android.support.annotation.WorkerThread;
32 import com.android.voicemail.impl.Assert;
33 import com.android.voicemail.impl.NeededForTesting;
34 import com.android.voicemail.impl.VvmLog;
35 import com.android.voicemail.impl.scheduling.TaskQueue.NextTask;
36 import java.util.List;
37 
38 /**
39  * A singleton to queue and run {@link Task} with the {@link android.app.job.JobScheduler}. A task
40  * is queued by sending a broadcast to {@link TaskReceiver}. The intent should contain enough
41  * information in {@link Intent#getExtras()} to construct the task (see {@link
42  * Tasks#createIntent(Context, Class)}).
43  *
44  * <p>The executor will only exist when {@link TaskSchedulerJobService} is running.
45  *
46  * <p>All tasks are ran in the background with a wakelock being held by the {@link
47  * android.app.job.JobScheduler}, which is between {@link #onStartJob(Job, List)} and {@link
48  * #finishJobAsync()}. The {@link TaskSchedulerJobService} also has a {@link TaskQueue}, but the
49  * data is stored in the {@link android.app.job.JobScheduler} instead of the process memory, so if
50  * the process is killed the queued tasks will be restored. If a new task is added, a new {@link
51  * TaskSchedulerJobService} will be scheduled to run the task. If the job is already scheduled, the
52  * new task will be pushed into the queue of the scheduled job. If the job is already running, the
53  * job will be queued in process memory.
54  *
55  * <p>Only one task will be ran at a time, and same task cannot exist in the queue at the same time.
56  * Refer to {@link TaskQueue} for queuing and execution order.
57  *
58  * <p>If there are still tasks in the queue but none are executable immediately, the service will
59  * enter a "sleep", pushing all remaining task into a new job and end the current job.
60  *
61  * <p>The executor will be started when {@link TaskSchedulerJobService} is running, and stopped when
62  * there are no more tasks in the queue or when the executor is put to sleep.
63  *
64  * <p>{@link android.app.job.JobScheduler} is not used directly due to:
65  *
66  * <ul>
67  *   <li>The {@link android.telecom.PhoneAccountHandle} used to differentiate task can not be easily
68  *       mapped into an integer for job id
69  *   <li>A job cannot be mutated to store information such as retry count.
70  * </ul>
71  */
72 @TargetApi(VERSION_CODES.O)
73 final class TaskExecutor {
74 
75   /**
76    * An entity that holds execution resources for the {@link TaskExecutor} to run, usually a {@link
77    * android.app.job.JobService}.
78    */
79   interface Job {
80 
81     /**
82      * Signals to Job to end and release its' resources. This is an asynchronous call and may not
83      * take effect immediately.
84      */
85     @MainThread
finishAsync()86     void finishAsync();
87 
88     /** Whether the call to {@link #finishAsync()} has actually taken effect. */
89     @MainThread
isFinished()90     boolean isFinished();
91   }
92 
93   private static final String TAG = "VvmTaskExecutor";
94 
95   private static final int READY_TOLERANCE_MILLISECONDS = 100;
96 
97   /**
98    * Threshold to determine whether to do a short or long sleep when a task is scheduled in the
99    * future.
100    *
101    * <p>A short sleep will continue the job and use {@link Handler#postDelayed(Runnable, long)} to
102    * wait for the next task.
103    *
104    * <p>A long sleep will finish the job and schedule a new one. The exact execution time is
105    * subjected to {@link android.app.job.JobScheduler} battery optimization, and is not exact.
106    */
107   private static final int SHORT_SLEEP_THRESHOLD_MILLISECONDS = 10_000;
108   /**
109    * When there are no more tasks to be run the service should be stopped. But when all tasks has
110    * finished there might still be more tasks in the message queue waiting to be processed,
111    * especially the ones submitted in {@link Task#onCompleted()}. Wait for a while before stopping
112    * the service to make sure there are no pending messages.
113    */
114   private static final int STOP_DELAY_MILLISECONDS = 5_000;
115 
116   /** Interval between polling of whether the job is finished. */
117   private static final int TERMINATE_POLLING_INTERVAL_MILLISECONDS = 1_000;
118 
119   // The thread to run tasks on
120   private final WorkerThreadHandler workerThreadHandler;
121 
122   private static TaskExecutor instance;
123 
124   /**
125    * Used by tests to turn task handling into a single threaded process by calling {@link
126    * Handler#handleMessage(Message)} directly
127    */
128   private MessageSender messageSender = new MessageSender();
129 
130   private final MainThreadHandler mainThreadHandler;
131 
132   private final Context appContext;
133 
134   /** Main thread only, access through {@link #getTasks()} */
135   private final TaskQueue tasks = new TaskQueue();
136 
137   private boolean isWorkerThreadBusy = false;
138 
139   private boolean isTerminating = false;
140 
141   private Job job;
142 
143   private final Runnable stopServiceWithDelay =
144       new Runnable() {
145         @MainThread
146         @Override
147         public void run() {
148           VvmLog.i(TAG, "Stopping service");
149           if (!isJobRunning() || isTerminating()) {
150             VvmLog.e(TAG, "Service already stopped");
151             return;
152           }
153           scheduleJobAndTerminate(0, true);
154         }
155       };
156 
157   /**
158    * Reschedule the {@link TaskSchedulerJobService} and terminate the executor when the {@link Job}
159    * is truly finished. If the job is still not finished, this runnable will requeue itself on the
160    * main thread. The requeue is only expected to happen a few times.
161    */
162   private class JobFinishedPoller implements Runnable {
163 
164     private final long delayMillis;
165     private final boolean isNewJob;
166     private int invocationCounter = 0;
167 
JobFinishedPoller(long delayMillis, boolean isNewJob)168     JobFinishedPoller(long delayMillis, boolean isNewJob) {
169       this.delayMillis = delayMillis;
170       this.isNewJob = isNewJob;
171     }
172 
173     @Override
run()174     public void run() {
175       // The job should be finished relatively quickly. Assert to make sure this assumption is true.
176       Assert.isTrue(invocationCounter < 10);
177       invocationCounter++;
178       if (job.isFinished()) {
179         VvmLog.i("JobFinishedPoller.run", "Job finished");
180         if (!getTasks().isEmpty()) {
181           TaskSchedulerJobService.scheduleJob(
182               appContext, serializePendingTasks(), delayMillis, isNewJob);
183           tasks.clear();
184         }
185         terminate();
186         return;
187       }
188       VvmLog.w("JobFinishedPoller.run", "Job still running");
189       mainThreadHandler.postDelayed(this, TERMINATE_POLLING_INTERVAL_MILLISECONDS);
190     }
191   };
192 
193   /** Should attempt to run the next task when a task has finished or been added. */
194   private boolean taskAutoRunDisabledForTesting = false;
195 
196   /** Handles execution of the background task in teh worker thread. */
197   @VisibleForTesting
198   final class WorkerThreadHandler extends Handler {
199 
200     public WorkerThreadHandler(Looper looper) {
201       super(looper);
202     }
203     @Override
204     @WorkerThread
205     public void handleMessage(Message msg) {
206       Assert.isNotMainThread();
207       Task task = (Task) msg.obj;
208       try {
209         VvmLog.i(TAG, "executing task " + task);
210         task.onExecuteInBackgroundThread();
211       } catch (Throwable throwable) {
212         VvmLog.e(TAG, "Exception while executing task " + task + ":", throwable);
213       }
214 
215       Message schedulerMessage = mainThreadHandler.obtainMessage();
216       schedulerMessage.obj = task;
217       messageSender.send(schedulerMessage);
218     }
219   }
220 
221   /** Handles completion of the background task in the main thread. */
222   @VisibleForTesting
223   final class MainThreadHandler extends Handler {
224 
225     public MainThreadHandler(Looper looper) {
226       super(looper);
227     }
228 
229     @Override
230     @MainThread
231     public void handleMessage(Message msg) {
232       Assert.isMainThread();
233       Task task = (Task) msg.obj;
234       getTasks().remove(task);
235       task.onCompleted();
236       isWorkerThreadBusy = false;
237       if (!isJobRunning() || isTerminating()) {
238         // TaskExecutor was terminated when the task is running in background, don't need to run the
239         // next task or terminate again
240         return;
241       }
242       maybeRunNextTask();
243     }
244   }
245 
246   /** Starts a new TaskExecutor. May only be called by {@link TaskSchedulerJobService}. */
247   @MainThread
248   static void createRunningInstance(Context context) {
249     Assert.isMainThread();
250     Assert.isTrue(instance == null);
251     instance = new TaskExecutor(context);
252   }
253 
254   /** @return the currently running instance, or {@code null} if the executor is not running. */
255   @MainThread
256   @Nullable
257   static TaskExecutor getRunningInstance() {
258     return instance;
259   }
260 
261   private TaskExecutor(Context context) {
262     this.appContext = context.getApplicationContext();
263     HandlerThread thread = new HandlerThread("VvmTaskExecutor");
264     thread.start();
265 
266     workerThreadHandler = new WorkerThreadHandler(thread.getLooper());
267     mainThreadHandler = new MainThreadHandler(Looper.getMainLooper());
268   }
269 
270   @VisibleForTesting
271   void terminate() {
272     VvmLog.i(TAG, "terminated");
273     Assert.isMainThread();
274     job = null;
275     workerThreadHandler.getLooper().quit();
276     instance = null;
277     TaskReceiver.resendDeferredBroadcasts(appContext);
278   }
279 
280   @MainThread
281   void addTask(Task task) {
282     Assert.isMainThread();
283     getTasks().add(task);
284     VvmLog.i(TAG, task + " added");
285     mainThreadHandler.removeCallbacks(stopServiceWithDelay);
286     maybeRunNextTask();
287   }
288 
289   @MainThread
290   @VisibleForTesting
291   TaskQueue getTasks() {
292     Assert.isMainThread();
293     return tasks;
294   }
295 
296   @MainThread
297   private void maybeRunNextTask() {
298     Assert.isMainThread();
299 
300     if (isWorkerThreadBusy) {
301       return;
302     }
303     if (taskAutoRunDisabledForTesting) {
304       // If taskAutoRunDisabledForTesting is true, runNextTask() must be explicitly called
305       // to run the next task.
306       return;
307     }
308 
309     runNextTask();
310   }
311 
312   @VisibleForTesting
313   @MainThread
314   void runNextTask() {
315     Assert.isMainThread();
316     if (getTasks().isEmpty()) {
317       prepareStop();
318       return;
319     }
320     NextTask nextTask = getTasks().getNextTask(READY_TOLERANCE_MILLISECONDS);
321 
322     if (nextTask.task != null) {
323       nextTask.task.onBeforeExecute();
324       Message message = workerThreadHandler.obtainMessage();
325       message.obj = nextTask.task;
326       isWorkerThreadBusy = true;
327       messageSender.send(message);
328       return;
329     }
330     VvmLog.i(TAG, "minimal wait time:" + nextTask.minimalWaitTimeMillis);
331     if (!taskAutoRunDisabledForTesting && nextTask.minimalWaitTimeMillis != null) {
332       // No tasks are currently ready. Sleep until the next one should be.
333       // If a new task is added during the sleep the service will wake immediately.
334       sleep(nextTask.minimalWaitTimeMillis);
335     }
336   }
337 
338   @MainThread
339   private void sleep(long timeMillis) {
340     VvmLog.i(TAG, "sleep for " + timeMillis + " millis");
341     if (timeMillis < SHORT_SLEEP_THRESHOLD_MILLISECONDS) {
342       mainThreadHandler.postDelayed(
343           new Runnable() {
344             @Override
345             public void run() {
346               maybeRunNextTask();
347             }
348           },
349           timeMillis);
350       return;
351     }
352     scheduleJobAndTerminate(timeMillis, false);
353   }
354 
355   private List<Bundle> serializePendingTasks() {
356     return getTasks().toBundles();
357   }
358 
359   private void prepareStop() {
360     VvmLog.i(
361         TAG,
362         "no more tasks, stopping service if no task are added in "
363             + STOP_DELAY_MILLISECONDS
364             + " millis");
365     mainThreadHandler.postDelayed(stopServiceWithDelay, STOP_DELAY_MILLISECONDS);
366   }
367 
368   @NeededForTesting
369   static class MessageSender {
370 
371     public void send(Message message) {
372       message.sendToTarget();
373     }
374   }
375 
376   @NeededForTesting
377   void setTaskAutoRunDisabledForTest(boolean value) {
378     taskAutoRunDisabledForTesting = value;
379   }
380 
381   @NeededForTesting
382   void setMessageSenderForTest(MessageSender sender) {
383     messageSender = sender;
384   }
385 
386   /**
387    * The {@link TaskSchedulerJobService} has started and all queued task should be executed in the
388    * worker thread.
389    */
390   @MainThread
391   public void onStartJob(Job job, List<Bundle> pendingTasks) {
392     VvmLog.i(TAG, "onStartJob");
393     this.job = job;
394     tasks.fromBundles(appContext, pendingTasks);
395     maybeRunNextTask();
396   }
397 
398   /**
399    * The {@link TaskSchedulerJobService} is being terminated by the system (timeout or network
400    * lost). A new job will be queued to resume all pending tasks. The current unfinished job may be
401    * ran again.
402    */
403   @MainThread
404   public void onStopJob() {
405     VvmLog.e(TAG, "onStopJob");
406     if (isJobRunning() && !isTerminating()) {
407       scheduleJobAndTerminate(0, true);
408     }
409   }
410 
411   /**
412    * Send all pending tasks and schedule a new {@link TaskSchedulerJobService}. The current executor
413    * will start the termination process, but restarted when the scheduled job runs in the future.
414    *
415    * @param delayMillis the delay before stating the job, see {@link
416    *     android.app.job.JobInfo.Builder#setMinimumLatency(long)}. This must be 0 if {@code
417    *     isNewJob} is true.
418    * @param isNewJob a new job will be requested to run immediately, bypassing all requirements.
419    */
420   @MainThread
421   @VisibleForTesting
422   void scheduleJobAndTerminate(long delayMillis, boolean isNewJob) {
423     Assert.isMainThread();
424     finishJobAsync();
425     mainThreadHandler.post(new JobFinishedPoller(delayMillis, isNewJob));
426   }
427 
428   /**
429    * Whether the TaskExecutor is still terminating. {@link TaskReceiver} should defer all new task
430    * until {@link #getRunningInstance()} returns {@code null} so a new job can be started. {@link
431    * #scheduleJobAndTerminate(long, boolean)} does not run immediately because the job can only be
432    * scheduled after the main thread has returned. The TaskExecutor will be in a intermediate state
433    * between scheduleJobAndTerminate() and terminate(). In this state, {@link #getRunningInstance()}
434    * returns non-null because it has not been fully stopped yet, but the TaskExecutor cannot do
435    * anything. A new job should not be scheduled either because the current job might still be
436    * running.
437    */
438   @MainThread
439   public boolean isTerminating() {
440     return isTerminating;
441   }
442 
443   /**
444    * Signals {@link TaskSchedulerJobService} the current session of tasks has finished, and the wake
445    * lock can be released. Note: this only takes effect after the main thread has been returned. If
446    * a new job need to be scheduled, it should be posted on the main thread handler instead of
447    * calling directly.
448    */
449   @MainThread
450   private void finishJobAsync() {
451     Assert.isTrue(!isTerminating());
452     Assert.isMainThread();
453     VvmLog.i(TAG, "finishing Job");
454     job.finishAsync();
455     isTerminating = true;
456     mainThreadHandler.removeCallbacks(stopServiceWithDelay);
457   }
458 
459   private boolean isJobRunning() {
460     return job != null;
461   }
462 }
463