1 /*
2  * Copyright (C) 2015 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package com.android.tv.data;
18 
19 import android.content.ContentResolver;
20 import android.content.Context;
21 import android.database.ContentObserver;
22 import android.database.Cursor;
23 import android.media.tv.TvContract;
24 import android.media.tv.TvContract.Programs;
25 import android.net.Uri;
26 import android.os.Handler;
27 import android.os.Looper;
28 import android.os.Message;
29 import android.support.annotation.MainThread;
30 import android.support.annotation.VisibleForTesting;
31 import android.util.ArraySet;
32 import android.util.Log;
33 import android.util.LongSparseArray;
34 import android.util.LruCache;
35 
36 import com.android.tv.common.MemoryManageable;
37 import com.android.tv.common.SoftPreconditions;
38 import com.android.tv.data.epg.EpgFetcher;
39 import com.android.tv.experiments.Experiments;
40 import com.android.tv.util.AsyncDbTask;
41 import com.android.tv.util.Clock;
42 import com.android.tv.util.MultiLongSparseArray;
43 import com.android.tv.util.Utils;
44 
45 import java.util.ArrayList;
46 import java.util.Collections;
47 import java.util.HashMap;
48 import java.util.HashSet;
49 import java.util.List;
50 import java.util.ListIterator;
51 import java.util.Map;
52 import java.util.Objects;
53 import java.util.Set;
54 import java.util.concurrent.TimeUnit;
55 
56 @MainThread
57 public class ProgramDataManager implements MemoryManageable {
58     private static final String TAG = "ProgramDataManager";
59     private static final boolean DEBUG = false;
60 
61     // To prevent from too many program update operations at the same time, we give random interval
62     // between PERIODIC_PROGRAM_UPDATE_MIN_MS and PERIODIC_PROGRAM_UPDATE_MAX_MS.
63     private static final long PERIODIC_PROGRAM_UPDATE_MIN_MS = TimeUnit.MINUTES.toMillis(5);
64     private static final long PERIODIC_PROGRAM_UPDATE_MAX_MS = TimeUnit.MINUTES.toMillis(10);
65     private static final long PROGRAM_PREFETCH_UPDATE_WAIT_MS = TimeUnit.SECONDS.toMillis(5);
66     // TODO: need to optimize consecutive DB updates.
67     private static final long CURRENT_PROGRAM_UPDATE_WAIT_MS = TimeUnit.SECONDS.toMillis(5);
68     @VisibleForTesting
69     static final long PROGRAM_GUIDE_SNAP_TIME_MS = TimeUnit.MINUTES.toMillis(30);
70     @VisibleForTesting
71     static final long PROGRAM_GUIDE_MAX_TIME_RANGE = TimeUnit.DAYS.toMillis(2);
72 
73     // TODO: Use TvContract constants, once they become public.
74     private static final String PARAM_START_TIME = "start_time";
75     private static final String PARAM_END_TIME = "end_time";
76     // COLUMN_CHANNEL_ID, COLUMN_END_TIME_UTC_MILLIS are added to detect duplicated programs.
77     // Duplicated programs are always consecutive by the sorting order.
78     private static final String SORT_BY_TIME = Programs.COLUMN_START_TIME_UTC_MILLIS + ", "
79             + Programs.COLUMN_CHANNEL_ID + ", " + Programs.COLUMN_END_TIME_UTC_MILLIS;
80 
81     private static final int MSG_UPDATE_CURRENT_PROGRAMS = 1000;
82     private static final int MSG_UPDATE_ONE_CURRENT_PROGRAM = 1001;
83     private static final int MSG_UPDATE_PREFETCH_PROGRAM = 1002;
84 
85     private final Clock mClock;
86     private final ContentResolver mContentResolver;
87     private boolean mStarted;
88     private ProgramsUpdateTask mProgramsUpdateTask;
89     private final LongSparseArray<UpdateCurrentProgramForChannelTask> mProgramUpdateTaskMap =
90             new LongSparseArray<>();
91     private final Map<Long, Program> mChannelIdCurrentProgramMap = new HashMap<>();
92     private final MultiLongSparseArray<OnCurrentProgramUpdatedListener>
93             mChannelId2ProgramUpdatedListeners = new MultiLongSparseArray<>();
94     private final Handler mHandler;
95     private final Set<Listener> mListeners = new ArraySet<>();
96 
97     private final ContentObserver mProgramObserver;
98 
99     private boolean mPrefetchEnabled;
100     private long mProgramPrefetchUpdateWaitMs;
101     private long mLastPrefetchTaskRunMs;
102     private ProgramsPrefetchTask mProgramsPrefetchTask;
103     private Map<Long, ArrayList<Program>> mChannelIdProgramCache = new HashMap<>();
104 
105     // Any program that ends prior to this time will be removed from the cache
106     // when a channel's current program is updated.
107     // Note that there's no limit for end time.
108     private long mPrefetchTimeRangeStartMs;
109 
110     private boolean mPauseProgramUpdate = false;
111     private final LruCache<Long, Program> mZeroLengthProgramCache = new LruCache<>(10);
112     private final EpgFetcher mEpgFetcher;
113 
ProgramDataManager(Context context)114     public ProgramDataManager(Context context) {
115         this(context.getContentResolver(), Clock.SYSTEM, Looper.myLooper(),
116                 EpgFetcher.getInstance(context));
117     }
118 
119     @VisibleForTesting
ProgramDataManager(ContentResolver contentResolver, Clock time, Looper looper, EpgFetcher epgFetcher)120     ProgramDataManager(ContentResolver contentResolver, Clock time, Looper looper,
121             EpgFetcher epgFetcher) {
122         mEpgFetcher = epgFetcher;
123         mClock = time;
124         mContentResolver = contentResolver;
125         mHandler = new MyHandler(looper);
126         mProgramObserver = new ContentObserver(mHandler) {
127             @Override
128             public void onChange(boolean selfChange) {
129                 if (!mHandler.hasMessages(MSG_UPDATE_CURRENT_PROGRAMS)) {
130                     mHandler.sendEmptyMessage(MSG_UPDATE_CURRENT_PROGRAMS);
131                 }
132                 if (isProgramUpdatePaused()) {
133                     return;
134                 }
135                 if (mPrefetchEnabled) {
136                     // The delay time of an existing MSG_UPDATE_PREFETCH_PROGRAM could be quite long
137                     // up to PROGRAM_GUIDE_SNAP_TIME_MS. So we need to remove the existing message
138                     // and send MSG_UPDATE_PREFETCH_PROGRAM again.
139                     mHandler.removeMessages(MSG_UPDATE_PREFETCH_PROGRAM);
140                     mHandler.sendEmptyMessage(MSG_UPDATE_PREFETCH_PROGRAM);
141                 }
142             }
143         };
144         mProgramPrefetchUpdateWaitMs = PROGRAM_PREFETCH_UPDATE_WAIT_MS;
145     }
146 
147     @VisibleForTesting
getContentObserver()148     ContentObserver getContentObserver() {
149         return mProgramObserver;
150     }
151 
152     /**
153      * Set the program prefetch update wait which gives the delay to query all programs from DB
154      * to prevent from too frequent DB queries.
155      * Default value is {@link #PROGRAM_PREFETCH_UPDATE_WAIT_MS}
156      */
157     @VisibleForTesting
setProgramPrefetchUpdateWait(long programPrefetchUpdateWaitMs)158     void setProgramPrefetchUpdateWait(long programPrefetchUpdateWaitMs) {
159         mProgramPrefetchUpdateWaitMs = programPrefetchUpdateWaitMs;
160     }
161 
162     /**
163      * Starts the manager.
164      */
start()165     public void start() {
166         if (mStarted) {
167             return;
168         }
169         mStarted = true;
170         // Should be called directly instead of posting MSG_UPDATE_CURRENT_PROGRAMS message
171         // to the handler. If not, another DB task can be executed before loading current programs.
172         handleUpdateCurrentPrograms();
173         if (mPrefetchEnabled) {
174             mHandler.sendEmptyMessage(MSG_UPDATE_PREFETCH_PROGRAM);
175         }
176         mContentResolver.registerContentObserver(Programs.CONTENT_URI,
177                 true, mProgramObserver);
178         if (mEpgFetcher != null && Experiments.CLOUD_EPG.get()) {
179             mEpgFetcher.start();
180         }
181     }
182 
183     /**
184      * Stops the manager. It clears manager states and runs pending DB operations. Added listeners
185      * aren't automatically removed by this method.
186      */
187     @VisibleForTesting
stop()188     public void stop() {
189         if (!mStarted) {
190             return;
191         }
192         mStarted = false;
193 
194         if (mEpgFetcher != null) {
195             mEpgFetcher.stop();
196         }
197         mContentResolver.unregisterContentObserver(mProgramObserver);
198         mHandler.removeCallbacksAndMessages(null);
199 
200         clearTask(mProgramUpdateTaskMap);
201         cancelPrefetchTask();
202         if (mProgramsUpdateTask != null) {
203             mProgramsUpdateTask.cancel(true);
204             mProgramsUpdateTask = null;
205         }
206     }
207 
208     /**
209      * Returns the current program at the specified channel.
210      */
getCurrentProgram(long channelId)211     public Program getCurrentProgram(long channelId) {
212         return mChannelIdCurrentProgramMap.get(channelId);
213     }
214 
215     /**
216      * Reloads program data.
217      */
reload()218     public void reload() {
219         if (!mHandler.hasMessages(MSG_UPDATE_CURRENT_PROGRAMS)) {
220             mHandler.sendEmptyMessage(MSG_UPDATE_CURRENT_PROGRAMS);
221         }
222         if (mPrefetchEnabled && !mHandler.hasMessages(MSG_UPDATE_PREFETCH_PROGRAM)) {
223             mHandler.sendEmptyMessage(MSG_UPDATE_PREFETCH_PROGRAM);
224         }
225     }
226 
227     /**
228      * A listener interface to receive notification on program data retrieval from DB.
229      */
230     public interface Listener {
231         /**
232          * Called when a Program data is now available through getProgram()
233          * after the DB operation is done which wasn't before.
234          * This would be called only if fetched data is around the selected program.
235          **/
onProgramUpdated()236         void onProgramUpdated();
237     }
238 
239     /**
240      * Adds the {@link Listener}.
241      */
addListener(Listener listener)242     public void addListener(Listener listener) {
243         mListeners.add(listener);
244     }
245 
246     /**
247      * Removes the {@link Listener}.
248      */
removeListener(Listener listener)249     public void removeListener(Listener listener) {
250         mListeners.remove(listener);
251     }
252 
253     /**
254      * Enables or Disables program prefetch.
255      */
setPrefetchEnabled(boolean enable)256     public void setPrefetchEnabled(boolean enable) {
257         if (mPrefetchEnabled == enable) {
258             return;
259         }
260         if (enable) {
261             mPrefetchEnabled = true;
262             mLastPrefetchTaskRunMs = 0;
263             if (mStarted) {
264                 mHandler.sendEmptyMessage(MSG_UPDATE_PREFETCH_PROGRAM);
265             }
266         } else {
267             mPrefetchEnabled = false;
268             cancelPrefetchTask();
269             mChannelIdProgramCache.clear();
270             mHandler.removeMessages(MSG_UPDATE_PREFETCH_PROGRAM);
271         }
272     }
273 
274     /**
275      * Returns the programs for the given channel which ends after the given start time.
276      *
277      * <p> Prefetch should be enabled to call it.
278      *
279      * @return {@link List} with Programs. It may includes dummy program if the entry needs DB
280      *         operations to get.
281      */
getPrograms(long channelId, long startTime)282     public List<Program> getPrograms(long channelId, long startTime) {
283         SoftPreconditions.checkState(mPrefetchEnabled, TAG, "Prefetch is disabled.");
284         ArrayList<Program> cachedPrograms = mChannelIdProgramCache.get(channelId);
285         if (cachedPrograms == null) {
286             return Collections.emptyList();
287         }
288         int startIndex = getProgramIndexAt(cachedPrograms, startTime);
289         return Collections.unmodifiableList(
290                 cachedPrograms.subList(startIndex, cachedPrograms.size()));
291     }
292 
293     // Returns the index of program that is played at the specified time.
294     // If there isn't, return the first program among programs that starts after the given time
295     // if returnNextProgram is {@code true}.
getProgramIndexAt(List<Program> programs, long time)296     private int getProgramIndexAt(List<Program> programs, long time) {
297         Program key = mZeroLengthProgramCache.get(time);
298         if (key == null) {
299             key = createDummyProgram(time, time);
300             mZeroLengthProgramCache.put(time, key);
301         }
302         int index = Collections.binarySearch(programs, key);
303         if (index < 0) {
304             index = -(index + 1); // change it to index to be added.
305             if (index > 0 && isProgramPlayedAt(programs.get(index - 1), time)) {
306                 // A program is played at that time.
307                 return index - 1;
308             }
309             return index;
310         }
311         return index;
312     }
313 
isProgramPlayedAt(Program program, long time)314     private boolean isProgramPlayedAt(Program program, long time) {
315         return program.getStartTimeUtcMillis() <= time && time <= program.getEndTimeUtcMillis();
316     }
317 
318     /**
319      * Adds the listener to be notified if current program is updated for a channel.
320      *
321      * @param channelId A channel ID to get notified. If it's {@link Channel#INVALID_ID}, the
322      *            listener would be called whenever a current program is updated.
323      */
addOnCurrentProgramUpdatedListener( long channelId, OnCurrentProgramUpdatedListener listener)324     public void addOnCurrentProgramUpdatedListener(
325             long channelId, OnCurrentProgramUpdatedListener listener) {
326         mChannelId2ProgramUpdatedListeners
327                 .put(channelId, listener);
328     }
329 
330     /**
331      * Removes the listener previously added by
332      * {@link #addOnCurrentProgramUpdatedListener(long, OnCurrentProgramUpdatedListener)}.
333      */
removeOnCurrentProgramUpdatedListener( long channelId, OnCurrentProgramUpdatedListener listener)334     public void removeOnCurrentProgramUpdatedListener(
335             long channelId, OnCurrentProgramUpdatedListener listener) {
336         mChannelId2ProgramUpdatedListeners
337                 .remove(channelId, listener);
338     }
339 
notifyCurrentProgramUpdate(long channelId, Program program)340     private void notifyCurrentProgramUpdate(long channelId, Program program) {
341 
342         for (OnCurrentProgramUpdatedListener listener : mChannelId2ProgramUpdatedListeners
343                 .get(channelId)) {
344             listener.onCurrentProgramUpdated(channelId, program);
345             }
346         for (OnCurrentProgramUpdatedListener listener : mChannelId2ProgramUpdatedListeners
347                 .get(Channel.INVALID_ID)) {
348             listener.onCurrentProgramUpdated(channelId, program);
349             }
350     }
351 
updateCurrentProgram(long channelId, Program program)352     private void updateCurrentProgram(long channelId, Program program) {
353         Program previousProgram = mChannelIdCurrentProgramMap.put(channelId, program);
354         if (!Objects.equals(program, previousProgram)) {
355             if (mPrefetchEnabled) {
356                 removePreviousProgramsAndUpdateCurrentProgramInCache(channelId, program);
357             }
358             notifyCurrentProgramUpdate(channelId, program);
359         }
360 
361         long delayedTime;
362         if (program == null) {
363             delayedTime = PERIODIC_PROGRAM_UPDATE_MIN_MS
364                     + (long) (Math.random() * (PERIODIC_PROGRAM_UPDATE_MAX_MS
365                             - PERIODIC_PROGRAM_UPDATE_MIN_MS));
366         } else {
367             delayedTime = program.getEndTimeUtcMillis() - mClock.currentTimeMillis();
368         }
369         mHandler.sendMessageDelayed(mHandler.obtainMessage(
370                 MSG_UPDATE_ONE_CURRENT_PROGRAM, channelId), delayedTime);
371     }
372 
removePreviousProgramsAndUpdateCurrentProgramInCache( long channelId, Program currentProgram)373     private void removePreviousProgramsAndUpdateCurrentProgramInCache(
374             long channelId, Program currentProgram) {
375         SoftPreconditions.checkState(mPrefetchEnabled, TAG, "Prefetch is disabled.");
376         if (!Program.isValid(currentProgram)) {
377             return;
378         }
379         ArrayList<Program> cachedPrograms = mChannelIdProgramCache.remove(channelId);
380         if (cachedPrograms == null) {
381             return;
382         }
383         ListIterator<Program> i = cachedPrograms.listIterator();
384         while (i.hasNext()) {
385             Program cachedProgram = i.next();
386             if (cachedProgram.getEndTimeUtcMillis() <= mPrefetchTimeRangeStartMs) {
387                 // Remove previous programs which will not be shown in program guide.
388                 i.remove();
389                 continue;
390             }
391 
392             if (cachedProgram.getEndTimeUtcMillis() <= currentProgram
393                     .getStartTimeUtcMillis()) {
394                 // Keep the programs that ends earlier than current program
395                 // but later than mPrefetchTimeRangeStartMs.
396                 continue;
397             }
398 
399             // Update dummy program around current program if any.
400             if (cachedProgram.getStartTimeUtcMillis() < currentProgram
401                     .getStartTimeUtcMillis()) {
402                 // The dummy program starts earlier than the current program. Adjust its end time.
403                 i.set(createDummyProgram(cachedProgram.getStartTimeUtcMillis(),
404                         currentProgram.getStartTimeUtcMillis()));
405                 i.add(currentProgram);
406             } else {
407                 i.set(currentProgram);
408             }
409             if (currentProgram.getEndTimeUtcMillis() < cachedProgram.getEndTimeUtcMillis()) {
410                 // The dummy program ends later than the current program. Adjust its start time.
411                 i.add(createDummyProgram(currentProgram.getEndTimeUtcMillis(),
412                         cachedProgram.getEndTimeUtcMillis()));
413             }
414             break;
415         }
416         if (cachedPrograms.isEmpty()) {
417             // If all the cached programs finish before mPrefetchTimeRangeStartMs, the
418             // currentProgram would not have a chance to be inserted to the cache.
419             cachedPrograms.add(currentProgram);
420         }
421         mChannelIdProgramCache.put(channelId, cachedPrograms);
422     }
423 
handleUpdateCurrentPrograms()424     private void handleUpdateCurrentPrograms() {
425         if (mProgramsUpdateTask != null) {
426             mHandler.sendEmptyMessageDelayed(MSG_UPDATE_CURRENT_PROGRAMS,
427                     CURRENT_PROGRAM_UPDATE_WAIT_MS);
428             return;
429         }
430         clearTask(mProgramUpdateTaskMap);
431         mHandler.removeMessages(MSG_UPDATE_ONE_CURRENT_PROGRAM);
432         mProgramsUpdateTask = new ProgramsUpdateTask(mContentResolver, mClock.currentTimeMillis());
433         mProgramsUpdateTask.executeOnDbThread();
434     }
435 
436     private class ProgramsPrefetchTask
437             extends AsyncDbTask<Void, Void, Map<Long, ArrayList<Program>>> {
438         private final long mStartTimeMs;
439         private final long mEndTimeMs;
440 
441         private boolean mSuccess;
442 
ProgramsPrefetchTask()443         public ProgramsPrefetchTask() {
444             long time = mClock.currentTimeMillis();
445             mStartTimeMs = Utils
446                     .floorTime(time - PROGRAM_GUIDE_SNAP_TIME_MS, PROGRAM_GUIDE_SNAP_TIME_MS);
447             mEndTimeMs = mStartTimeMs + PROGRAM_GUIDE_MAX_TIME_RANGE;
448             mSuccess = false;
449         }
450 
451         @Override
doInBackground(Void... params)452         protected Map<Long, ArrayList<Program>> doInBackground(Void... params) {
453             Map<Long, ArrayList<Program>> programMap = new HashMap<>();
454             if (DEBUG) {
455                 Log.d(TAG, "Starts programs prefetch. " + Utils.toTimeString(mStartTimeMs) + "-"
456                         + Utils.toTimeString(mEndTimeMs));
457             }
458             Uri uri = Programs.CONTENT_URI.buildUpon()
459                     .appendQueryParameter(PARAM_START_TIME, String.valueOf(mStartTimeMs))
460                     .appendQueryParameter(PARAM_END_TIME, String.valueOf(mEndTimeMs)).build();
461             final int RETRY_COUNT = 3;
462             Program lastReadProgram = null;
463             for (int retryCount = RETRY_COUNT; retryCount > 0; retryCount--) {
464                 if (isProgramUpdatePaused()) {
465                     return null;
466                 }
467                 programMap.clear();
468                 try (Cursor c = mContentResolver.query(uri, Program.PROJECTION, null, null,
469                         SORT_BY_TIME)) {
470                     if (c == null) {
471                         continue;
472                     }
473                     while (c.moveToNext()) {
474                         int duplicateCount = 0;
475                         if (isCancelled()) {
476                             if (DEBUG) {
477                                 Log.d(TAG, "ProgramsPrefetchTask canceled.");
478                             }
479                             return null;
480                         }
481                         Program program = Program.fromCursor(c);
482                         if (Program.isDuplicate(program, lastReadProgram)) {
483                             duplicateCount++;
484                             continue;
485                         } else {
486                             lastReadProgram = program;
487                         }
488                         ArrayList<Program> programs = programMap.get(program.getChannelId());
489                         if (programs == null) {
490                             programs = new ArrayList<>();
491                             programMap.put(program.getChannelId(), programs);
492                         }
493                         programs.add(program);
494                         if (duplicateCount > 0) {
495                             Log.w(TAG, "Found " + duplicateCount + " duplicate programs");
496                         }
497                     }
498                     mSuccess = true;
499                     break;
500                 } catch (IllegalStateException e) {
501                     if (DEBUG) {
502                         Log.d(TAG, "Database is changed while querying. Will retry.");
503                     }
504                 } catch (SecurityException e) {
505                     Log.d(TAG, "Security exception during program data query", e);
506                 }
507             }
508             if (DEBUG) {
509                 Log.d(TAG, "Ends programs prefetch for " + programMap.size() + " channels");
510             }
511             return programMap;
512         }
513 
514         @Override
onPostExecute(Map<Long, ArrayList<Program>> programs)515         protected void onPostExecute(Map<Long, ArrayList<Program>> programs) {
516             mProgramsPrefetchTask = null;
517             if (isProgramUpdatePaused()) {
518                 // ProgramsPrefetchTask will run again once setPauseProgramUpdate(false) is called.
519                 return;
520             }
521             long nextMessageDelayedTime;
522             if (mSuccess) {
523                 mChannelIdProgramCache = programs;
524                 notifyProgramUpdated();
525                 long currentTime = mClock.currentTimeMillis();
526                 mLastPrefetchTaskRunMs = currentTime;
527                 nextMessageDelayedTime =
528                         Utils.floorTime(mLastPrefetchTaskRunMs + PROGRAM_GUIDE_SNAP_TIME_MS,
529                                 PROGRAM_GUIDE_SNAP_TIME_MS) - currentTime;
530             } else {
531                 nextMessageDelayedTime = PERIODIC_PROGRAM_UPDATE_MIN_MS;
532             }
533             if (!mHandler.hasMessages(MSG_UPDATE_PREFETCH_PROGRAM)) {
534                 mHandler.sendEmptyMessageDelayed(MSG_UPDATE_PREFETCH_PROGRAM,
535                         nextMessageDelayedTime);
536             }
537         }
538     }
539 
notifyProgramUpdated()540     private void notifyProgramUpdated() {
541         for (Listener listener : mListeners) {
542             listener.onProgramUpdated();
543         }
544     }
545 
546     private class ProgramsUpdateTask extends AsyncDbTask.AsyncQueryTask<List<Program>> {
ProgramsUpdateTask(ContentResolver contentResolver, long time)547         public ProgramsUpdateTask(ContentResolver contentResolver, long time) {
548             super(contentResolver, Programs.CONTENT_URI.buildUpon()
549                             .appendQueryParameter(PARAM_START_TIME, String.valueOf(time))
550                             .appendQueryParameter(PARAM_END_TIME, String.valueOf(time)).build(),
551                     Program.PROJECTION, null, null, SORT_BY_TIME);
552         }
553 
554         @Override
onQuery(Cursor c)555         public List<Program> onQuery(Cursor c) {
556             final List<Program> programs = new ArrayList<>();
557             if (c != null) {
558                 int duplicateCount = 0;
559                 Program lastReadProgram = null;
560                 while (c.moveToNext()) {
561                     if (isCancelled()) {
562                         return programs;
563                     }
564                     Program program = Program.fromCursor(c);
565                     if (Program.isDuplicate(program, lastReadProgram)) {
566                         duplicateCount++;
567                         continue;
568                     } else {
569                         lastReadProgram = program;
570                     }
571                     programs.add(program);
572                 }
573                 if (duplicateCount > 0) {
574                     Log.w(TAG, "Found " + duplicateCount + " duplicate programs");
575                 }
576             }
577             return programs;
578         }
579 
580         @Override
onPostExecute(List<Program> programs)581         protected void onPostExecute(List<Program> programs) {
582             if (DEBUG) Log.d(TAG, "ProgramsUpdateTask done");
583             mProgramsUpdateTask = null;
584             if (programs == null) {
585                 return;
586             }
587             Set<Long> removedChannelIds = new HashSet<>(mChannelIdCurrentProgramMap.keySet());
588             for (Program program : programs) {
589                 long channelId = program.getChannelId();
590                 updateCurrentProgram(channelId, program);
591                 removedChannelIds.remove(channelId);
592             }
593             for (Long channelId : removedChannelIds) {
594                 if (mPrefetchEnabled) {
595                     mChannelIdProgramCache.remove(channelId);
596                 }
597                 mChannelIdCurrentProgramMap.remove(channelId);
598                 notifyCurrentProgramUpdate(channelId, null);
599             }
600         }
601     }
602 
603     private class UpdateCurrentProgramForChannelTask extends AsyncDbTask.AsyncQueryTask<Program> {
604         private final long mChannelId;
UpdateCurrentProgramForChannelTask(ContentResolver contentResolver, long channelId, long time)605         private UpdateCurrentProgramForChannelTask(ContentResolver contentResolver, long channelId,
606                 long time) {
607             super(contentResolver, TvContract.buildProgramsUriForChannel(channelId, time, time),
608                     Program.PROJECTION, null, null, SORT_BY_TIME);
609             mChannelId = channelId;
610         }
611 
612         @Override
onQuery(Cursor c)613         public Program onQuery(Cursor c) {
614             Program program = null;
615             if (c != null && c.moveToNext()) {
616                 program = Program.fromCursor(c);
617             }
618             return program;
619         }
620 
621         @Override
onPostExecute(Program program)622         protected void onPostExecute(Program program) {
623             mProgramUpdateTaskMap.remove(mChannelId);
624             updateCurrentProgram(mChannelId, program);
625         }
626     }
627 
628     private class MyHandler extends Handler {
MyHandler(Looper looper)629         public MyHandler(Looper looper) {
630             super(looper);
631         }
632 
633         @Override
handleMessage(Message msg)634         public void handleMessage(Message msg) {
635             switch (msg.what) {
636                 case MSG_UPDATE_CURRENT_PROGRAMS:
637                     handleUpdateCurrentPrograms();
638                     break;
639                 case MSG_UPDATE_ONE_CURRENT_PROGRAM: {
640                     long channelId = (Long) msg.obj;
641                     UpdateCurrentProgramForChannelTask oldTask = mProgramUpdateTaskMap
642                             .get(channelId);
643                     if (oldTask != null) {
644                         oldTask.cancel(true);
645                     }
646                     UpdateCurrentProgramForChannelTask
647                             task = new UpdateCurrentProgramForChannelTask(
648                             mContentResolver, channelId, mClock.currentTimeMillis());
649                     mProgramUpdateTaskMap.put(channelId, task);
650                     task.executeOnDbThread();
651                     break;
652                 }
653                 case MSG_UPDATE_PREFETCH_PROGRAM: {
654                     if (isProgramUpdatePaused()) {
655                         return;
656                     }
657                     if (mProgramsPrefetchTask != null) {
658                         mHandler.sendEmptyMessageDelayed(msg.what, mProgramPrefetchUpdateWaitMs);
659                         return;
660                     }
661                     long delayMillis = mLastPrefetchTaskRunMs + mProgramPrefetchUpdateWaitMs
662                             - mClock.currentTimeMillis();
663                     if (delayMillis > 0) {
664                         mHandler.sendEmptyMessageDelayed(MSG_UPDATE_PREFETCH_PROGRAM, delayMillis);
665                     } else {
666                         mProgramsPrefetchTask = new ProgramsPrefetchTask();
667                         mProgramsPrefetchTask.executeOnDbThread();
668                     }
669                     break;
670                 }
671             }
672         }
673     }
674 
675     /**
676      * Pause program update.
677      * Updating program data will result in UI refresh,
678      * but UI is fragile to handle it so we'd better disable it for a while.
679      *
680      * <p> Prefetch should be enabled to call it.
681      */
setPauseProgramUpdate(boolean pauseProgramUpdate)682     public void setPauseProgramUpdate(boolean pauseProgramUpdate) {
683         SoftPreconditions.checkState(mPrefetchEnabled, TAG, "Prefetch is disabled.");
684         if (mPauseProgramUpdate && !pauseProgramUpdate) {
685             if (!mHandler.hasMessages(MSG_UPDATE_PREFETCH_PROGRAM)) {
686                 // MSG_UPDATE_PRFETCH_PROGRAM can be empty
687                 // if prefetch task is launched while program update is paused.
688                 // Update immediately in that case.
689                 mHandler.sendEmptyMessage(MSG_UPDATE_PREFETCH_PROGRAM);
690             }
691         }
692         mPauseProgramUpdate = pauseProgramUpdate;
693     }
694 
isProgramUpdatePaused()695     private boolean isProgramUpdatePaused() {
696         // Although pause is requested, we need to keep updating if cache is empty.
697         return mPauseProgramUpdate && !mChannelIdProgramCache.isEmpty();
698     }
699 
700     /**
701      * Sets program data prefetch time range.
702      * Any program data that ends before the start time will be removed from the cache later.
703      * Note that there's no limit for end time.
704      *
705      * <p> Prefetch should be enabled to call it.
706      */
setPrefetchTimeRange(long startTimeMs)707     public void setPrefetchTimeRange(long startTimeMs) {
708         SoftPreconditions.checkState(mPrefetchEnabled, TAG, "Prefetch is disabled.");
709         if (mPrefetchTimeRangeStartMs > startTimeMs) {
710             // Fetch the programs immediately to re-create the cache.
711             if (!mHandler.hasMessages(MSG_UPDATE_PREFETCH_PROGRAM)) {
712                 mHandler.sendEmptyMessage(MSG_UPDATE_PREFETCH_PROGRAM);
713             }
714         }
715         mPrefetchTimeRangeStartMs = startTimeMs;
716     }
717 
clearTask(LongSparseArray<UpdateCurrentProgramForChannelTask> tasks)718     private void clearTask(LongSparseArray<UpdateCurrentProgramForChannelTask> tasks) {
719         for (int i = 0; i < tasks.size(); i++) {
720             tasks.valueAt(i).cancel(true);
721         }
722         tasks.clear();
723     }
724 
cancelPrefetchTask()725     private void cancelPrefetchTask() {
726         if (mProgramsPrefetchTask != null) {
727             mProgramsPrefetchTask.cancel(true);
728             mProgramsPrefetchTask = null;
729         }
730     }
731 
732     // Create dummy program which indicates data isn't loaded yet so DB query is required.
createDummyProgram(long startTimeMs, long endTimeMs)733     private Program createDummyProgram(long startTimeMs, long endTimeMs) {
734         return new Program.Builder()
735                 .setChannelId(Channel.INVALID_ID)
736                 .setStartTimeUtcMillis(startTimeMs)
737                 .setEndTimeUtcMillis(endTimeMs).build();
738     }
739 
740     @Override
performTrimMemory(int level)741     public void performTrimMemory(int level) {
742         mChannelId2ProgramUpdatedListeners.clearEmptyCache();
743     }
744 }
745