1 /*
2  * Copyright (C) 2012 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 //#define LOG_NDEBUG 0
18 #define LOG_TAG "PlaylistFetcher"
19 #include <utils/Log.h>
20 #include <utils/misc.h>
21 
22 #include "PlaylistFetcher.h"
23 #include "HTTPDownloader.h"
24 #include "LiveSession.h"
25 #include "M3UParser.h"
26 #include "include/avc_utils.h"
27 #include "include/ID3.h"
28 #include "mpeg2ts/AnotherPacketSource.h"
29 #include "mpeg2ts/HlsSampleDecryptor.h"
30 
31 #include <media/stagefright/foundation/ABitReader.h>
32 #include <media/stagefright/foundation/ABuffer.h>
33 #include <media/stagefright/foundation/ADebug.h>
34 #include <media/stagefright/MediaDefs.h>
35 #include <media/stagefright/MetaData.h>
36 #include <media/stagefright/Utils.h>
37 
38 #include <ctype.h>
39 #include <inttypes.h>
40 
41 #define FLOGV(fmt, ...) ALOGV("[fetcher-%d] " fmt, mFetcherID, ##__VA_ARGS__)
42 #define FSLOGV(stream, fmt, ...) ALOGV("[fetcher-%d] [%s] " fmt, mFetcherID, \
43          LiveSession::getNameForStream(stream), ##__VA_ARGS__)
44 
45 namespace android {
46 
47 // static
48 const int64_t PlaylistFetcher::kMinBufferedDurationUs = 30000000ll;
49 const int64_t PlaylistFetcher::kMaxMonitorDelayUs = 3000000ll;
50 // LCM of 188 (size of a TS packet) & 1k works well
51 const int32_t PlaylistFetcher::kDownloadBlockSize = 47 * 1024;
52 
53 struct PlaylistFetcher::DownloadState : public RefBase {
54     DownloadState();
55     void resetState();
56     bool hasSavedState() const;
57     void restoreState(
58             AString &uri,
59             sp<AMessage> &itemMeta,
60             sp<ABuffer> &buffer,
61             sp<ABuffer> &tsBuffer,
62             int32_t &firstSeqNumberInPlaylist,
63             int32_t &lastSeqNumberInPlaylist);
64     void saveState(
65             AString &uri,
66             sp<AMessage> &itemMeta,
67             sp<ABuffer> &buffer,
68             sp<ABuffer> &tsBuffer,
69             int32_t &firstSeqNumberInPlaylist,
70             int32_t &lastSeqNumberInPlaylist);
71 
72 private:
73     bool mHasSavedState;
74     AString mUri;
75     sp<AMessage> mItemMeta;
76     sp<ABuffer> mBuffer;
77     sp<ABuffer> mTsBuffer;
78     int32_t mFirstSeqNumberInPlaylist;
79     int32_t mLastSeqNumberInPlaylist;
80 };
81 
DownloadState()82 PlaylistFetcher::DownloadState::DownloadState() {
83     resetState();
84 }
85 
hasSavedState() const86 bool PlaylistFetcher::DownloadState::hasSavedState() const {
87     return mHasSavedState;
88 }
89 
resetState()90 void PlaylistFetcher::DownloadState::resetState() {
91     mHasSavedState = false;
92 
93     mUri.clear();
94     mItemMeta = NULL;
95     mBuffer = NULL;
96     mTsBuffer = NULL;
97     mFirstSeqNumberInPlaylist = 0;
98     mLastSeqNumberInPlaylist = 0;
99 }
100 
restoreState(AString & uri,sp<AMessage> & itemMeta,sp<ABuffer> & buffer,sp<ABuffer> & tsBuffer,int32_t & firstSeqNumberInPlaylist,int32_t & lastSeqNumberInPlaylist)101 void PlaylistFetcher::DownloadState::restoreState(
102         AString &uri,
103         sp<AMessage> &itemMeta,
104         sp<ABuffer> &buffer,
105         sp<ABuffer> &tsBuffer,
106         int32_t &firstSeqNumberInPlaylist,
107         int32_t &lastSeqNumberInPlaylist) {
108     if (!mHasSavedState) {
109         return;
110     }
111 
112     uri = mUri;
113     itemMeta = mItemMeta;
114     buffer = mBuffer;
115     tsBuffer = mTsBuffer;
116     firstSeqNumberInPlaylist = mFirstSeqNumberInPlaylist;
117     lastSeqNumberInPlaylist = mLastSeqNumberInPlaylist;
118 
119     resetState();
120 }
121 
saveState(AString & uri,sp<AMessage> & itemMeta,sp<ABuffer> & buffer,sp<ABuffer> & tsBuffer,int32_t & firstSeqNumberInPlaylist,int32_t & lastSeqNumberInPlaylist)122 void PlaylistFetcher::DownloadState::saveState(
123         AString &uri,
124         sp<AMessage> &itemMeta,
125         sp<ABuffer> &buffer,
126         sp<ABuffer> &tsBuffer,
127         int32_t &firstSeqNumberInPlaylist,
128         int32_t &lastSeqNumberInPlaylist) {
129     mHasSavedState = true;
130 
131     mUri = uri;
132     mItemMeta = itemMeta;
133     mBuffer = buffer;
134     mTsBuffer = tsBuffer;
135     mFirstSeqNumberInPlaylist = firstSeqNumberInPlaylist;
136     mLastSeqNumberInPlaylist = lastSeqNumberInPlaylist;
137 }
138 
PlaylistFetcher(const sp<AMessage> & notify,const sp<LiveSession> & session,const char * uri,int32_t id,int32_t subtitleGeneration)139 PlaylistFetcher::PlaylistFetcher(
140         const sp<AMessage> &notify,
141         const sp<LiveSession> &session,
142         const char *uri,
143         int32_t id,
144         int32_t subtitleGeneration)
145     : mNotify(notify),
146       mSession(session),
147       mURI(uri),
148       mFetcherID(id),
149       mStreamTypeMask(0),
150       mStartTimeUs(-1ll),
151       mSegmentStartTimeUs(-1ll),
152       mDiscontinuitySeq(-1ll),
153       mStartTimeUsRelative(false),
154       mLastPlaylistFetchTimeUs(-1ll),
155       mPlaylistTimeUs(-1ll),
156       mSeqNumber(-1),
157       mNumRetries(0),
158       mStartup(true),
159       mIDRFound(false),
160       mSeekMode(LiveSession::kSeekModeExactPosition),
161       mTimeChangeSignaled(false),
162       mNextPTSTimeUs(-1ll),
163       mMonitorQueueGeneration(0),
164       mSubtitleGeneration(subtitleGeneration),
165       mLastDiscontinuitySeq(-1ll),
166       mRefreshState(INITIAL_MINIMUM_RELOAD_DELAY),
167       mFirstPTSValid(false),
168       mFirstTimeUs(-1ll),
169       mVideoBuffer(new AnotherPacketSource(NULL)),
170       mSampleAesKeyItemChanged(false),
171       mThresholdRatio(-1.0f),
172       mDownloadState(new DownloadState()),
173       mHasMetadata(false) {
174     memset(mPlaylistHash, 0, sizeof(mPlaylistHash));
175     mHTTPDownloader = mSession->getHTTPDownloader();
176 
177     memset(mKeyData, 0, sizeof(mKeyData));
178     memset(mAESInitVec, 0, sizeof(mAESInitVec));
179 }
180 
~PlaylistFetcher()181 PlaylistFetcher::~PlaylistFetcher() {
182 }
183 
getFetcherID() const184 int32_t PlaylistFetcher::getFetcherID() const {
185     return mFetcherID;
186 }
187 
getSegmentStartTimeUs(int32_t seqNumber) const188 int64_t PlaylistFetcher::getSegmentStartTimeUs(int32_t seqNumber) const {
189     CHECK(mPlaylist != NULL);
190 
191     int32_t firstSeqNumberInPlaylist, lastSeqNumberInPlaylist;
192     mPlaylist->getSeqNumberRange(
193             &firstSeqNumberInPlaylist, &lastSeqNumberInPlaylist);
194 
195     CHECK_GE(seqNumber, firstSeqNumberInPlaylist);
196     CHECK_LE(seqNumber, lastSeqNumberInPlaylist);
197 
198     int64_t segmentStartUs = 0ll;
199     for (int32_t index = 0;
200             index < seqNumber - firstSeqNumberInPlaylist; ++index) {
201         sp<AMessage> itemMeta;
202         CHECK(mPlaylist->itemAt(
203                     index, NULL /* uri */, &itemMeta));
204 
205         int64_t itemDurationUs;
206         CHECK(itemMeta->findInt64("durationUs", &itemDurationUs));
207 
208         segmentStartUs += itemDurationUs;
209     }
210 
211     return segmentStartUs;
212 }
213 
getSegmentDurationUs(int32_t seqNumber) const214 int64_t PlaylistFetcher::getSegmentDurationUs(int32_t seqNumber) const {
215     CHECK(mPlaylist != NULL);
216 
217     int32_t firstSeqNumberInPlaylist, lastSeqNumberInPlaylist;
218     mPlaylist->getSeqNumberRange(
219             &firstSeqNumberInPlaylist, &lastSeqNumberInPlaylist);
220 
221     CHECK_GE(seqNumber, firstSeqNumberInPlaylist);
222     CHECK_LE(seqNumber, lastSeqNumberInPlaylist);
223 
224     int32_t index = seqNumber - firstSeqNumberInPlaylist;
225     sp<AMessage> itemMeta;
226     CHECK(mPlaylist->itemAt(
227                 index, NULL /* uri */, &itemMeta));
228 
229     int64_t itemDurationUs;
230     CHECK(itemMeta->findInt64("durationUs", &itemDurationUs));
231 
232     return itemDurationUs;
233 }
234 
delayUsToRefreshPlaylist() const235 int64_t PlaylistFetcher::delayUsToRefreshPlaylist() const {
236     int64_t nowUs = ALooper::GetNowUs();
237 
238     if (mPlaylist == NULL || mLastPlaylistFetchTimeUs < 0ll) {
239         CHECK_EQ((int)mRefreshState, (int)INITIAL_MINIMUM_RELOAD_DELAY);
240         return 0ll;
241     }
242 
243     if (mPlaylist->isComplete()) {
244         return (~0llu >> 1);
245     }
246 
247     int64_t targetDurationUs = mPlaylist->getTargetDuration();
248 
249     int64_t minPlaylistAgeUs;
250 
251     switch (mRefreshState) {
252         case INITIAL_MINIMUM_RELOAD_DELAY:
253         {
254             size_t n = mPlaylist->size();
255             if (n > 0) {
256                 sp<AMessage> itemMeta;
257                 CHECK(mPlaylist->itemAt(n - 1, NULL /* uri */, &itemMeta));
258 
259                 int64_t itemDurationUs;
260                 CHECK(itemMeta->findInt64("durationUs", &itemDurationUs));
261 
262                 minPlaylistAgeUs = itemDurationUs;
263                 break;
264             }
265 
266             // fall through
267         }
268 
269         case FIRST_UNCHANGED_RELOAD_ATTEMPT:
270         {
271             minPlaylistAgeUs = targetDurationUs / 2;
272             break;
273         }
274 
275         case SECOND_UNCHANGED_RELOAD_ATTEMPT:
276         {
277             minPlaylistAgeUs = (targetDurationUs * 3) / 2;
278             break;
279         }
280 
281         case THIRD_UNCHANGED_RELOAD_ATTEMPT:
282         {
283             minPlaylistAgeUs = targetDurationUs * 3;
284             break;
285         }
286 
287         default:
288             TRESPASS();
289             break;
290     }
291 
292     int64_t delayUs = mLastPlaylistFetchTimeUs + minPlaylistAgeUs - nowUs;
293     return delayUs > 0ll ? delayUs : 0ll;
294 }
295 
decryptBuffer(size_t playlistIndex,const sp<ABuffer> & buffer,bool first)296 status_t PlaylistFetcher::decryptBuffer(
297         size_t playlistIndex, const sp<ABuffer> &buffer,
298         bool first) {
299     sp<AMessage> itemMeta;
300     bool found = false;
301     AString method;
302 
303     for (ssize_t i = playlistIndex; i >= 0; --i) {
304         AString uri;
305         CHECK(mPlaylist->itemAt(i, &uri, &itemMeta));
306 
307         if (itemMeta->findString("cipher-method", &method)) {
308             found = true;
309             break;
310         }
311     }
312 
313     // TODO: Revise this when we add support for KEYFORMAT
314     // If method has changed (e.g., -> NONE); sufficient to check at the segment boundary
315     if (mSampleAesKeyItem != NULL && first && found && method != "SAMPLE-AES") {
316         ALOGI("decryptBuffer: resetting mSampleAesKeyItem(%p) with method %s",
317                 mSampleAesKeyItem.get(), method.c_str());
318         mSampleAesKeyItem = NULL;
319         mSampleAesKeyItemChanged = true;
320     }
321 
322     if (!found) {
323         method = "NONE";
324     }
325     buffer->meta()->setString("cipher-method", method.c_str());
326 
327     if (method == "NONE") {
328         return OK;
329     } else if (method == "SAMPLE-AES") {
330         ALOGV("decryptBuffer: Non-Widevine SAMPLE-AES is supported now.");
331     } else if (!(method == "AES-128")) {
332         ALOGE("Unsupported cipher method '%s'", method.c_str());
333         return ERROR_UNSUPPORTED;
334     }
335 
336     AString keyURI;
337     if (!itemMeta->findString("cipher-uri", &keyURI)) {
338         ALOGE("Missing key uri");
339         return ERROR_MALFORMED;
340     }
341 
342     ssize_t index = mAESKeyForURI.indexOfKey(keyURI);
343 
344     sp<ABuffer> key;
345     if (index >= 0) {
346         key = mAESKeyForURI.valueAt(index);
347     } else {
348         ssize_t err = mHTTPDownloader->fetchFile(keyURI.c_str(), &key);
349 
350         if (err == ERROR_NOT_CONNECTED) {
351             return ERROR_NOT_CONNECTED;
352         } else if (err < 0) {
353             ALOGE("failed to fetch cipher key from '%s'.", keyURI.c_str());
354             return ERROR_IO;
355         } else if (key->size() != 16) {
356             ALOGE("key file '%s' wasn't 16 bytes in size.", keyURI.c_str());
357             return ERROR_MALFORMED;
358         }
359 
360         mAESKeyForURI.add(keyURI, key);
361     }
362 
363     if (first) {
364         // If decrypting the first block in a file, read the iv from the manifest
365         // or derive the iv from the file's sequence number.
366 
367         unsigned char AESInitVec[AES_BLOCK_SIZE];
368         AString iv;
369         if (itemMeta->findString("cipher-iv", &iv)) {
370             if ((!iv.startsWith("0x") && !iv.startsWith("0X"))
371                     || iv.size() > 16 * 2 + 2) {
372                 ALOGE("malformed cipher IV '%s'.", iv.c_str());
373                 return ERROR_MALFORMED;
374             }
375 
376             while (iv.size() < 16 * 2 + 2) {
377                 iv.insert("0", 1, 2);
378             }
379 
380             memset(AESInitVec, 0, sizeof(AESInitVec));
381             for (size_t i = 0; i < 16; ++i) {
382                 char c1 = tolower(iv.c_str()[2 + 2 * i]);
383                 char c2 = tolower(iv.c_str()[3 + 2 * i]);
384                 if (!isxdigit(c1) || !isxdigit(c2)) {
385                     ALOGE("malformed cipher IV '%s'.", iv.c_str());
386                     return ERROR_MALFORMED;
387                 }
388                 uint8_t nibble1 = isdigit(c1) ? c1 - '0' : c1 - 'a' + 10;
389                 uint8_t nibble2 = isdigit(c2) ? c2 - '0' : c2 - 'a' + 10;
390 
391                 AESInitVec[i] = nibble1 << 4 | nibble2;
392             }
393         } else {
394             memset(AESInitVec, 0, sizeof(AESInitVec));
395             AESInitVec[15] = mSeqNumber & 0xff;
396             AESInitVec[14] = (mSeqNumber >> 8) & 0xff;
397             AESInitVec[13] = (mSeqNumber >> 16) & 0xff;
398             AESInitVec[12] = (mSeqNumber >> 24) & 0xff;
399         }
400 
401         bool newKey = memcmp(mKeyData, key->data(), AES_BLOCK_SIZE) != 0;
402         bool newInitVec = memcmp(mAESInitVec, AESInitVec, AES_BLOCK_SIZE) != 0;
403         bool newSampleAesKeyItem = newKey || newInitVec;
404         ALOGV("decryptBuffer: SAMPLE-AES newKeyItem %d/%d (Key %d initVec %d)",
405                 mSampleAesKeyItemChanged, newSampleAesKeyItem, newKey, newInitVec);
406 
407         if (newSampleAesKeyItem) {
408             memcpy(mKeyData, key->data(), AES_BLOCK_SIZE);
409             memcpy(mAESInitVec, AESInitVec, AES_BLOCK_SIZE);
410 
411             if (method == "SAMPLE-AES") {
412                 mSampleAesKeyItemChanged = true;
413 
414                 sp<ABuffer> keyDataBuffer = ABuffer::CreateAsCopy(mKeyData, sizeof(mKeyData));
415                 sp<ABuffer> initVecBuffer = ABuffer::CreateAsCopy(mAESInitVec, sizeof(mAESInitVec));
416 
417                 // always allocating a new one rather than updating the old message
418                 // lower layer might still have a reference to the old message
419                 mSampleAesKeyItem = new AMessage();
420                 mSampleAesKeyItem->setBuffer("keyData", keyDataBuffer);
421                 mSampleAesKeyItem->setBuffer("initVec", initVecBuffer);
422 
423                 ALOGV("decryptBuffer: New SampleAesKeyItem: Key: %s  IV: %s",
424                         HlsSampleDecryptor::aesBlockToStr(mKeyData).c_str(),
425                         HlsSampleDecryptor::aesBlockToStr(mAESInitVec).c_str());
426             } // SAMPLE-AES
427         } // newSampleAesKeyItem
428     } // first
429 
430     if (method == "SAMPLE-AES") {
431         ALOGV("decryptBuffer: skipping full-seg decrypt for SAMPLE-AES");
432         return OK;
433     }
434 
435 
436     AES_KEY aes_key;
437     if (AES_set_decrypt_key(key->data(), 128, &aes_key) != 0) {
438         ALOGE("failed to set AES decryption key.");
439         return UNKNOWN_ERROR;
440     }
441 
442     size_t n = buffer->size();
443     if (!n) {
444         return OK;
445     }
446 
447     if (n < 16 || n % 16) {
448         ALOGE("not enough or trailing bytes (%zu) in encrypted buffer", n);
449         return ERROR_MALFORMED;
450     }
451 
452     AES_cbc_encrypt(
453             buffer->data(), buffer->data(), buffer->size(),
454             &aes_key, mAESInitVec, AES_DECRYPT);
455 
456     return OK;
457 }
458 
checkDecryptPadding(const sp<ABuffer> & buffer)459 status_t PlaylistFetcher::checkDecryptPadding(const sp<ABuffer> &buffer) {
460     AString method;
461     CHECK(buffer->meta()->findString("cipher-method", &method));
462     if (method == "NONE" || method == "SAMPLE-AES") {
463         return OK;
464     }
465 
466     uint8_t padding = 0;
467     if (buffer->size() > 0) {
468         padding = buffer->data()[buffer->size() - 1];
469     }
470 
471     if (padding > 16) {
472         return ERROR_MALFORMED;
473     }
474 
475     for (size_t i = buffer->size() - padding; i < padding; i++) {
476         if (buffer->data()[i] != padding) {
477             return ERROR_MALFORMED;
478         }
479     }
480 
481     buffer->setRange(buffer->offset(), buffer->size() - padding);
482     return OK;
483 }
484 
postMonitorQueue(int64_t delayUs,int64_t minDelayUs)485 void PlaylistFetcher::postMonitorQueue(int64_t delayUs, int64_t minDelayUs) {
486     int64_t maxDelayUs = delayUsToRefreshPlaylist();
487     if (maxDelayUs < minDelayUs) {
488         maxDelayUs = minDelayUs;
489     }
490     if (delayUs > maxDelayUs) {
491         FLOGV("Need to refresh playlist in %lld", (long long)maxDelayUs);
492         delayUs = maxDelayUs;
493     }
494     sp<AMessage> msg = new AMessage(kWhatMonitorQueue, this);
495     msg->setInt32("generation", mMonitorQueueGeneration);
496     msg->post(delayUs);
497 }
498 
cancelMonitorQueue()499 void PlaylistFetcher::cancelMonitorQueue() {
500     ++mMonitorQueueGeneration;
501 }
502 
setStoppingThreshold(float thresholdRatio,bool disconnect)503 void PlaylistFetcher::setStoppingThreshold(float thresholdRatio, bool disconnect) {
504     {
505         AutoMutex _l(mThresholdLock);
506         mThresholdRatio = thresholdRatio;
507     }
508     if (disconnect) {
509         mHTTPDownloader->disconnect();
510     }
511 }
512 
resetStoppingThreshold(bool disconnect)513 void PlaylistFetcher::resetStoppingThreshold(bool disconnect) {
514     {
515         AutoMutex _l(mThresholdLock);
516         mThresholdRatio = -1.0f;
517     }
518     if (disconnect) {
519         mHTTPDownloader->disconnect();
520     } else {
521         // allow reconnect
522         mHTTPDownloader->reconnect();
523     }
524 }
525 
getStoppingThreshold()526 float PlaylistFetcher::getStoppingThreshold() {
527     AutoMutex _l(mThresholdLock);
528     return mThresholdRatio;
529 }
530 
startAsync(const sp<AnotherPacketSource> & audioSource,const sp<AnotherPacketSource> & videoSource,const sp<AnotherPacketSource> & subtitleSource,const sp<AnotherPacketSource> & metadataSource,int64_t startTimeUs,int64_t segmentStartTimeUs,int32_t startDiscontinuitySeq,LiveSession::SeekMode seekMode)531 void PlaylistFetcher::startAsync(
532         const sp<AnotherPacketSource> &audioSource,
533         const sp<AnotherPacketSource> &videoSource,
534         const sp<AnotherPacketSource> &subtitleSource,
535         const sp<AnotherPacketSource> &metadataSource,
536         int64_t startTimeUs,
537         int64_t segmentStartTimeUs,
538         int32_t startDiscontinuitySeq,
539         LiveSession::SeekMode seekMode) {
540     sp<AMessage> msg = new AMessage(kWhatStart, this);
541 
542     uint32_t streamTypeMask = 0ul;
543 
544     if (audioSource != NULL) {
545         msg->setPointer("audioSource", audioSource.get());
546         streamTypeMask |= LiveSession::STREAMTYPE_AUDIO;
547     }
548 
549     if (videoSource != NULL) {
550         msg->setPointer("videoSource", videoSource.get());
551         streamTypeMask |= LiveSession::STREAMTYPE_VIDEO;
552     }
553 
554     if (subtitleSource != NULL) {
555         msg->setPointer("subtitleSource", subtitleSource.get());
556         streamTypeMask |= LiveSession::STREAMTYPE_SUBTITLES;
557     }
558 
559     if (metadataSource != NULL) {
560         msg->setPointer("metadataSource", metadataSource.get());
561         // metadataSource does not affect streamTypeMask.
562     }
563 
564     msg->setInt32("streamTypeMask", streamTypeMask);
565     msg->setInt64("startTimeUs", startTimeUs);
566     msg->setInt64("segmentStartTimeUs", segmentStartTimeUs);
567     msg->setInt32("startDiscontinuitySeq", startDiscontinuitySeq);
568     msg->setInt32("seekMode", seekMode);
569     msg->post();
570 }
571 
572 /*
573  * pauseAsync
574  *
575  * threshold: 0.0f - pause after current fetch block (default 47Kbytes)
576  *           -1.0f - pause after finishing current segment
577  *        0.0~1.0f - pause if remaining of current segment exceeds threshold
578  */
pauseAsync(float thresholdRatio,bool disconnect)579 void PlaylistFetcher::pauseAsync(
580         float thresholdRatio, bool disconnect) {
581     setStoppingThreshold(thresholdRatio, disconnect);
582 
583     (new AMessage(kWhatPause, this))->post();
584 }
585 
stopAsync(bool clear)586 void PlaylistFetcher::stopAsync(bool clear) {
587     setStoppingThreshold(0.0f, true /* disconncect */);
588 
589     sp<AMessage> msg = new AMessage(kWhatStop, this);
590     msg->setInt32("clear", clear);
591     msg->post();
592 }
593 
resumeUntilAsync(const sp<AMessage> & params)594 void PlaylistFetcher::resumeUntilAsync(const sp<AMessage> &params) {
595     FLOGV("resumeUntilAsync: params=%s", params->debugString().c_str());
596 
597     AMessage* msg = new AMessage(kWhatResumeUntil, this);
598     msg->setMessage("params", params);
599     msg->post();
600 }
601 
fetchPlaylistAsync()602 void PlaylistFetcher::fetchPlaylistAsync() {
603     (new AMessage(kWhatFetchPlaylist, this))->post();
604 }
605 
onMessageReceived(const sp<AMessage> & msg)606 void PlaylistFetcher::onMessageReceived(const sp<AMessage> &msg) {
607     switch (msg->what()) {
608         case kWhatStart:
609         {
610             status_t err = onStart(msg);
611 
612             sp<AMessage> notify = mNotify->dup();
613             notify->setInt32("what", kWhatStarted);
614             notify->setInt32("err", err);
615             notify->post();
616             break;
617         }
618 
619         case kWhatPause:
620         {
621             onPause();
622 
623             sp<AMessage> notify = mNotify->dup();
624             notify->setInt32("what", kWhatPaused);
625             notify->setInt32("seekMode",
626                     mDownloadState->hasSavedState()
627                     ? LiveSession::kSeekModeNextSample
628                     : LiveSession::kSeekModeNextSegment);
629             notify->post();
630             break;
631         }
632 
633         case kWhatStop:
634         {
635             onStop(msg);
636 
637             sp<AMessage> notify = mNotify->dup();
638             notify->setInt32("what", kWhatStopped);
639             notify->post();
640             break;
641         }
642 
643         case kWhatFetchPlaylist:
644         {
645             bool unchanged;
646             sp<M3UParser> playlist = mHTTPDownloader->fetchPlaylist(
647                     mURI.c_str(), NULL /* curPlaylistHash */, &unchanged);
648 
649             sp<AMessage> notify = mNotify->dup();
650             notify->setInt32("what", kWhatPlaylistFetched);
651             notify->setObject("playlist", playlist);
652             notify->post();
653             break;
654         }
655 
656         case kWhatMonitorQueue:
657         case kWhatDownloadNext:
658         {
659             int32_t generation;
660             CHECK(msg->findInt32("generation", &generation));
661 
662             if (generation != mMonitorQueueGeneration) {
663                 // Stale event
664                 break;
665             }
666 
667             if (msg->what() == kWhatMonitorQueue) {
668                 onMonitorQueue();
669             } else {
670                 onDownloadNext();
671             }
672             break;
673         }
674 
675         case kWhatResumeUntil:
676         {
677             onResumeUntil(msg);
678             break;
679         }
680 
681         default:
682             TRESPASS();
683     }
684 }
685 
onStart(const sp<AMessage> & msg)686 status_t PlaylistFetcher::onStart(const sp<AMessage> &msg) {
687     mPacketSources.clear();
688     mStopParams.clear();
689     mStartTimeUsNotify = mNotify->dup();
690     mStartTimeUsNotify->setInt32("what", kWhatStartedAt);
691     mStartTimeUsNotify->setString("uri", mURI);
692 
693     uint32_t streamTypeMask;
694     CHECK(msg->findInt32("streamTypeMask", (int32_t *)&streamTypeMask));
695 
696     int64_t startTimeUs;
697     int64_t segmentStartTimeUs;
698     int32_t startDiscontinuitySeq;
699     int32_t seekMode;
700     CHECK(msg->findInt64("startTimeUs", &startTimeUs));
701     CHECK(msg->findInt64("segmentStartTimeUs", &segmentStartTimeUs));
702     CHECK(msg->findInt32("startDiscontinuitySeq", &startDiscontinuitySeq));
703     CHECK(msg->findInt32("seekMode", &seekMode));
704 
705     if (streamTypeMask & LiveSession::STREAMTYPE_AUDIO) {
706         void *ptr;
707         CHECK(msg->findPointer("audioSource", &ptr));
708 
709         mPacketSources.add(
710                 LiveSession::STREAMTYPE_AUDIO,
711                 static_cast<AnotherPacketSource *>(ptr));
712     }
713 
714     if (streamTypeMask & LiveSession::STREAMTYPE_VIDEO) {
715         void *ptr;
716         CHECK(msg->findPointer("videoSource", &ptr));
717 
718         mPacketSources.add(
719                 LiveSession::STREAMTYPE_VIDEO,
720                 static_cast<AnotherPacketSource *>(ptr));
721     }
722 
723     if (streamTypeMask & LiveSession::STREAMTYPE_SUBTITLES) {
724         void *ptr;
725         CHECK(msg->findPointer("subtitleSource", &ptr));
726 
727         mPacketSources.add(
728                 LiveSession::STREAMTYPE_SUBTITLES,
729                 static_cast<AnotherPacketSource *>(ptr));
730     }
731 
732     void *ptr;
733     // metadataSource is not part of streamTypeMask
734     if ((streamTypeMask & (LiveSession::STREAMTYPE_AUDIO | LiveSession::STREAMTYPE_VIDEO))
735             && msg->findPointer("metadataSource", &ptr)) {
736         mPacketSources.add(
737                 LiveSession::STREAMTYPE_METADATA,
738                 static_cast<AnotherPacketSource *>(ptr));
739     }
740 
741     mStreamTypeMask = streamTypeMask;
742 
743     mSegmentStartTimeUs = segmentStartTimeUs;
744 
745     if (startDiscontinuitySeq >= 0) {
746         mDiscontinuitySeq = startDiscontinuitySeq;
747     }
748 
749     mRefreshState = INITIAL_MINIMUM_RELOAD_DELAY;
750     mSeekMode = (LiveSession::SeekMode) seekMode;
751 
752     if (startTimeUs >= 0 || mSeekMode == LiveSession::kSeekModeNextSample) {
753         mStartup = true;
754         mIDRFound = false;
755         mVideoBuffer->clear();
756     }
757 
758     if (startTimeUs >= 0) {
759         mStartTimeUs = startTimeUs;
760         mFirstPTSValid = false;
761         mSeqNumber = -1;
762         mTimeChangeSignaled = false;
763         mDownloadState->resetState();
764     }
765 
766     postMonitorQueue();
767 
768     return OK;
769 }
770 
onPause()771 void PlaylistFetcher::onPause() {
772     cancelMonitorQueue();
773     mLastDiscontinuitySeq = mDiscontinuitySeq;
774 
775     resetStoppingThreshold(false /* disconnect */);
776 }
777 
onStop(const sp<AMessage> & msg)778 void PlaylistFetcher::onStop(const sp<AMessage> &msg) {
779     cancelMonitorQueue();
780 
781     int32_t clear;
782     CHECK(msg->findInt32("clear", &clear));
783     if (clear) {
784         for (size_t i = 0; i < mPacketSources.size(); i++) {
785             sp<AnotherPacketSource> packetSource = mPacketSources.valueAt(i);
786             packetSource->clear();
787         }
788     }
789 
790     mDownloadState->resetState();
791     mPacketSources.clear();
792     mStreamTypeMask = 0;
793 
794     resetStoppingThreshold(true /* disconnect */);
795 }
796 
797 // Resume until we have reached the boundary timestamps listed in `msg`; when
798 // the remaining time is too short (within a resume threshold) stop immediately
799 // instead.
onResumeUntil(const sp<AMessage> & msg)800 status_t PlaylistFetcher::onResumeUntil(const sp<AMessage> &msg) {
801     sp<AMessage> params;
802     CHECK(msg->findMessage("params", &params));
803 
804     mStopParams = params;
805     onDownloadNext();
806 
807     return OK;
808 }
809 
notifyStopReached()810 void PlaylistFetcher::notifyStopReached() {
811     sp<AMessage> notify = mNotify->dup();
812     notify->setInt32("what", kWhatStopReached);
813     notify->post();
814 }
815 
notifyError(status_t err)816 void PlaylistFetcher::notifyError(status_t err) {
817     sp<AMessage> notify = mNotify->dup();
818     notify->setInt32("what", kWhatError);
819     notify->setInt32("err", err);
820     notify->post();
821 }
822 
queueDiscontinuity(ATSParser::DiscontinuityType type,const sp<AMessage> & extra)823 void PlaylistFetcher::queueDiscontinuity(
824         ATSParser::DiscontinuityType type, const sp<AMessage> &extra) {
825     for (size_t i = 0; i < mPacketSources.size(); ++i) {
826         // do not discard buffer upon #EXT-X-DISCONTINUITY tag
827         // (seek will discard buffer by abandoning old fetchers)
828         mPacketSources.valueAt(i)->queueDiscontinuity(
829                 type, extra, false /* discard */);
830     }
831 }
832 
onMonitorQueue()833 void PlaylistFetcher::onMonitorQueue() {
834     // in the middle of an unfinished download, delay
835     // playlist refresh as it'll change seq numbers
836     if (!mDownloadState->hasSavedState()) {
837         refreshPlaylist();
838     }
839 
840     int64_t targetDurationUs = kMinBufferedDurationUs;
841     if (mPlaylist != NULL) {
842         targetDurationUs = mPlaylist->getTargetDuration();
843     }
844 
845     int64_t bufferedDurationUs = 0ll;
846     status_t finalResult = OK;
847     if (mStreamTypeMask == LiveSession::STREAMTYPE_SUBTITLES) {
848         sp<AnotherPacketSource> packetSource =
849             mPacketSources.valueFor(LiveSession::STREAMTYPE_SUBTITLES);
850 
851         bufferedDurationUs =
852                 packetSource->getBufferedDurationUs(&finalResult);
853     } else {
854         // Use min stream duration, but ignore streams that never have any packet
855         // enqueued to prevent us from waiting on a non-existent stream;
856         // when we cannot make out from the manifest what streams are included in
857         // a playlist we might assume extra streams.
858         bufferedDurationUs = -1ll;
859         for (size_t i = 0; i < mPacketSources.size(); ++i) {
860             if ((mStreamTypeMask & mPacketSources.keyAt(i)) == 0
861                     || mPacketSources[i]->getLatestEnqueuedMeta() == NULL) {
862                 continue;
863             }
864 
865             int64_t bufferedStreamDurationUs =
866                 mPacketSources.valueAt(i)->getBufferedDurationUs(&finalResult);
867 
868             FSLOGV(mPacketSources.keyAt(i), "buffered %lld", (long long)bufferedStreamDurationUs);
869 
870             if (bufferedDurationUs == -1ll
871                  || bufferedStreamDurationUs < bufferedDurationUs) {
872                 bufferedDurationUs = bufferedStreamDurationUs;
873             }
874         }
875         if (bufferedDurationUs == -1ll) {
876             bufferedDurationUs = 0ll;
877         }
878     }
879 
880     if (finalResult == OK && bufferedDurationUs < kMinBufferedDurationUs) {
881         FLOGV("monitoring, buffered=%lld < %lld",
882                 (long long)bufferedDurationUs, (long long)kMinBufferedDurationUs);
883 
884         // delay the next download slightly; hopefully this gives other concurrent fetchers
885         // a better chance to run.
886         // onDownloadNext();
887         sp<AMessage> msg = new AMessage(kWhatDownloadNext, this);
888         msg->setInt32("generation", mMonitorQueueGeneration);
889         msg->post(1000l);
890     } else {
891         // We'd like to maintain buffering above durationToBufferUs, so try
892         // again when buffer just about to go below durationToBufferUs
893         // (or after targetDurationUs / 2, whichever is smaller).
894         int64_t delayUs = bufferedDurationUs - kMinBufferedDurationUs + 1000000ll;
895         if (delayUs > targetDurationUs / 2) {
896             delayUs = targetDurationUs / 2;
897         }
898 
899         FLOGV("pausing for %lld, buffered=%lld > %lld",
900                 (long long)delayUs,
901                 (long long)bufferedDurationUs,
902                 (long long)kMinBufferedDurationUs);
903 
904         postMonitorQueue(delayUs);
905     }
906 }
907 
refreshPlaylist()908 status_t PlaylistFetcher::refreshPlaylist() {
909     if (delayUsToRefreshPlaylist() <= 0) {
910         bool unchanged;
911         sp<M3UParser> playlist = mHTTPDownloader->fetchPlaylist(
912                 mURI.c_str(), mPlaylistHash, &unchanged);
913 
914         if (playlist == NULL) {
915             if (unchanged) {
916                 // We succeeded in fetching the playlist, but it was
917                 // unchanged from the last time we tried.
918 
919                 if (mRefreshState != THIRD_UNCHANGED_RELOAD_ATTEMPT) {
920                     mRefreshState = (RefreshState)(mRefreshState + 1);
921                 }
922             } else {
923                 ALOGE("failed to load playlist at url '%s'", uriDebugString(mURI).c_str());
924                 return ERROR_IO;
925             }
926         } else {
927             mRefreshState = INITIAL_MINIMUM_RELOAD_DELAY;
928             mPlaylist = playlist;
929 
930             if (mPlaylist->isComplete() || mPlaylist->isEvent()) {
931                 updateDuration();
932             }
933             // Notify LiveSession to use target-duration based buffering level
934             // for up/down switch. Default LiveSession::kUpSwitchMark may not
935             // be reachable for live streams, as our max buffering amount is
936             // limited to 3 segments.
937             if (!mPlaylist->isComplete()) {
938                 updateTargetDuration();
939             }
940             mPlaylistTimeUs = ALooper::GetNowUs();
941         }
942 
943         mLastPlaylistFetchTimeUs = ALooper::GetNowUs();
944     }
945     return OK;
946 }
947 
948 // static
bufferStartsWithTsSyncByte(const sp<ABuffer> & buffer)949 bool PlaylistFetcher::bufferStartsWithTsSyncByte(const sp<ABuffer>& buffer) {
950     return buffer->size() > 0 && buffer->data()[0] == 0x47;
951 }
952 
shouldPauseDownload()953 bool PlaylistFetcher::shouldPauseDownload() {
954     if (mStreamTypeMask == LiveSession::STREAMTYPE_SUBTITLES) {
955         // doesn't apply to subtitles
956         return false;
957     }
958 
959     // Calculate threshold to abort current download
960     float thresholdRatio = getStoppingThreshold();
961 
962     if (thresholdRatio < 0.0f) {
963         // never abort
964         return false;
965     } else if (thresholdRatio == 0.0f) {
966         // immediately abort
967         return true;
968     }
969 
970     // now we have a positive thresholdUs, abort if remaining
971     // portion to download is over that threshold.
972     if (mSegmentFirstPTS < 0) {
973         // this means we haven't even find the first access unit,
974         // abort now as we must be very far away from the end.
975         return true;
976     }
977     int64_t lastEnqueueUs = mSegmentFirstPTS;
978     for (size_t i = 0; i < mPacketSources.size(); ++i) {
979         if ((mStreamTypeMask & mPacketSources.keyAt(i)) == 0) {
980             continue;
981         }
982         sp<AMessage> meta = mPacketSources[i]->getLatestEnqueuedMeta();
983         int32_t type;
984         if (meta == NULL || meta->findInt32("discontinuity", &type)) {
985             continue;
986         }
987         int64_t tmpUs;
988         CHECK(meta->findInt64("timeUs", &tmpUs));
989         if (tmpUs > lastEnqueueUs) {
990             lastEnqueueUs = tmpUs;
991         }
992     }
993     lastEnqueueUs -= mSegmentFirstPTS;
994 
995     int64_t targetDurationUs = mPlaylist->getTargetDuration();
996     int64_t thresholdUs = thresholdRatio * targetDurationUs;
997 
998     FLOGV("%spausing now, thresholdUs %lld, remaining %lld",
999             targetDurationUs - lastEnqueueUs > thresholdUs ? "" : "not ",
1000             (long long)thresholdUs,
1001             (long long)(targetDurationUs - lastEnqueueUs));
1002 
1003     if (targetDurationUs - lastEnqueueUs > thresholdUs) {
1004         return true;
1005     }
1006     return false;
1007 }
1008 
initSeqNumberForLiveStream(int32_t & firstSeqNumberInPlaylist,int32_t & lastSeqNumberInPlaylist)1009 void PlaylistFetcher::initSeqNumberForLiveStream(
1010         int32_t &firstSeqNumberInPlaylist,
1011         int32_t &lastSeqNumberInPlaylist) {
1012     // start at least 3 target durations from the end.
1013     int64_t timeFromEnd = 0;
1014     size_t index = mPlaylist->size();
1015     sp<AMessage> itemMeta;
1016     int64_t itemDurationUs;
1017     int32_t targetDuration;
1018     if (mPlaylist->meta()->findInt32("target-duration", &targetDuration)) {
1019         do {
1020             --index;
1021             if (!mPlaylist->itemAt(index, NULL /* uri */, &itemMeta)
1022                     || !itemMeta->findInt64("durationUs", &itemDurationUs)) {
1023                 ALOGW("item or itemDurationUs missing");
1024                 mSeqNumber = lastSeqNumberInPlaylist - 3;
1025                 break;
1026             }
1027 
1028             timeFromEnd += itemDurationUs;
1029             mSeqNumber = firstSeqNumberInPlaylist + index;
1030         } while (timeFromEnd < targetDuration * 3E6 && index > 0);
1031     } else {
1032         ALOGW("target-duration missing");
1033         mSeqNumber = lastSeqNumberInPlaylist - 3;
1034     }
1035 
1036     if (mSeqNumber < firstSeqNumberInPlaylist) {
1037         mSeqNumber = firstSeqNumberInPlaylist;
1038     }
1039 }
1040 
initDownloadState(AString & uri,sp<AMessage> & itemMeta,int32_t & firstSeqNumberInPlaylist,int32_t & lastSeqNumberInPlaylist)1041 bool PlaylistFetcher::initDownloadState(
1042         AString &uri,
1043         sp<AMessage> &itemMeta,
1044         int32_t &firstSeqNumberInPlaylist,
1045         int32_t &lastSeqNumberInPlaylist) {
1046     status_t err = refreshPlaylist();
1047     firstSeqNumberInPlaylist = 0;
1048     lastSeqNumberInPlaylist = 0;
1049     bool discontinuity = false;
1050 
1051     if (mPlaylist != NULL) {
1052         mPlaylist->getSeqNumberRange(
1053                 &firstSeqNumberInPlaylist, &lastSeqNumberInPlaylist);
1054 
1055         if (mDiscontinuitySeq < 0) {
1056             mDiscontinuitySeq = mPlaylist->getDiscontinuitySeq();
1057         }
1058     }
1059 
1060     mSegmentFirstPTS = -1ll;
1061 
1062     if (mPlaylist != NULL && mSeqNumber < 0) {
1063         CHECK_GE(mStartTimeUs, 0ll);
1064 
1065         if (mSegmentStartTimeUs < 0) {
1066             if (!mPlaylist->isComplete() && !mPlaylist->isEvent()) {
1067                 // this is a live session
1068                 initSeqNumberForLiveStream(firstSeqNumberInPlaylist, lastSeqNumberInPlaylist);
1069             } else {
1070                 // When seeking mSegmentStartTimeUs is unavailable (< 0), we
1071                 // use mStartTimeUs (client supplied timestamp) to determine both start segment
1072                 // and relative position inside a segment
1073                 mSeqNumber = getSeqNumberForTime(mStartTimeUs);
1074                 mStartTimeUs -= getSegmentStartTimeUs(mSeqNumber);
1075             }
1076             mStartTimeUsRelative = true;
1077             FLOGV("Initial sequence number for time %lld is %d from (%d .. %d)",
1078                     (long long)mStartTimeUs, mSeqNumber, firstSeqNumberInPlaylist,
1079                     lastSeqNumberInPlaylist);
1080         } else {
1081             // When adapting or track switching, mSegmentStartTimeUs (relative
1082             // to media time 0) is used to determine the start segment; mStartTimeUs (absolute
1083             // timestamps coming from the media container) is used to determine the position
1084             // inside a segments.
1085             if (mStreamTypeMask != LiveSession::STREAMTYPE_SUBTITLES
1086                     && mSeekMode != LiveSession::kSeekModeNextSample) {
1087                 // avoid double fetch/decode
1088                 // Use (mSegmentStartTimeUs + 1/2 * targetDurationUs) to search
1089                 // for the starting segment in new variant.
1090                 // If the two variants' segments are aligned, this gives the
1091                 // next segment. If they're not aligned, this gives the segment
1092                 // that overlaps no more than 1/2 * targetDurationUs.
1093                 mSeqNumber = getSeqNumberForTime(mSegmentStartTimeUs
1094                         + mPlaylist->getTargetDuration() / 2);
1095             } else {
1096                 mSeqNumber = getSeqNumberForTime(mSegmentStartTimeUs);
1097             }
1098             ssize_t minSeq = getSeqNumberForDiscontinuity(mDiscontinuitySeq);
1099             if (mSeqNumber < minSeq) {
1100                 mSeqNumber = minSeq;
1101             }
1102 
1103             if (mSeqNumber < firstSeqNumberInPlaylist) {
1104                 mSeqNumber = firstSeqNumberInPlaylist;
1105             }
1106 
1107             if (mSeqNumber > lastSeqNumberInPlaylist) {
1108                 mSeqNumber = lastSeqNumberInPlaylist;
1109             }
1110             FLOGV("Initial sequence number is %d from (%d .. %d)",
1111                     mSeqNumber, firstSeqNumberInPlaylist,
1112                     lastSeqNumberInPlaylist);
1113         }
1114     }
1115 
1116     // if mPlaylist is NULL then err must be non-OK; but the other way around might not be true
1117     if (mSeqNumber < firstSeqNumberInPlaylist
1118             || mSeqNumber > lastSeqNumberInPlaylist
1119             || err != OK) {
1120         if ((err != OK || !mPlaylist->isComplete()) && mNumRetries < kMaxNumRetries) {
1121             ++mNumRetries;
1122 
1123             if (mSeqNumber > lastSeqNumberInPlaylist || err != OK) {
1124                 // make sure we reach this retry logic on refresh failures
1125                 // by adding an err != OK clause to all enclosing if's.
1126 
1127                 // refresh in increasing fraction (1/2, 1/3, ...) of the
1128                 // playlist's target duration or 3 seconds, whichever is less
1129                 int64_t delayUs = kMaxMonitorDelayUs;
1130                 if (mPlaylist != NULL) {
1131                     delayUs = mPlaylist->size() * mPlaylist->getTargetDuration()
1132                             / (1 + mNumRetries);
1133                 }
1134                 if (delayUs > kMaxMonitorDelayUs) {
1135                     delayUs = kMaxMonitorDelayUs;
1136                 }
1137                 FLOGV("sequence number high: %d from (%d .. %d), "
1138                       "monitor in %lld (retry=%d)",
1139                         mSeqNumber, firstSeqNumberInPlaylist,
1140                         lastSeqNumberInPlaylist, (long long)delayUs, mNumRetries);
1141                 postMonitorQueue(delayUs);
1142                 return false;
1143             }
1144 
1145             if (err != OK) {
1146                 notifyError(err);
1147                 return false;
1148             }
1149 
1150             // we've missed the boat, let's start 3 segments prior to the latest sequence
1151             // number available and signal a discontinuity.
1152 
1153             ALOGI("We've missed the boat, restarting playback."
1154                   "  mStartup=%d, was  looking for %d in %d-%d",
1155                     mStartup, mSeqNumber, firstSeqNumberInPlaylist,
1156                     lastSeqNumberInPlaylist);
1157             if (mStopParams != NULL) {
1158                 // we should have kept on fetching until we hit the boundaries in mStopParams,
1159                 // but since the segments we are supposed to fetch have already rolled off
1160                 // the playlist, i.e. we have already missed the boat, we inevitably have to
1161                 // skip.
1162                 notifyStopReached();
1163                 return false;
1164             }
1165             mSeqNumber = lastSeqNumberInPlaylist - 3;
1166             if (mSeqNumber < firstSeqNumberInPlaylist) {
1167                 mSeqNumber = firstSeqNumberInPlaylist;
1168             }
1169             discontinuity = true;
1170 
1171             // fall through
1172         } else {
1173             if (mPlaylist != NULL) {
1174                 if (mSeqNumber >= firstSeqNumberInPlaylist + (int32_t)mPlaylist->size()
1175                         && !mPlaylist->isComplete()) {
1176                     // Live playlists
1177                     ALOGW("sequence number %d not yet available", mSeqNumber);
1178                     postMonitorQueue(delayUsToRefreshPlaylist());
1179                     return false;
1180                 }
1181                 ALOGE("Cannot find sequence number %d in playlist "
1182                      "(contains %d - %d)",
1183                      mSeqNumber, firstSeqNumberInPlaylist,
1184                       firstSeqNumberInPlaylist + (int32_t)mPlaylist->size() - 1);
1185 
1186                 if (mTSParser != NULL) {
1187                     mTSParser->signalEOS(ERROR_END_OF_STREAM);
1188                     // Use an empty buffer; we don't have any new data, just want to extract
1189                     // potential new access units after flush.  Reset mSeqNumber to
1190                     // lastSeqNumberInPlaylist such that we set the correct access unit
1191                     // properties in extractAndQueueAccessUnitsFromTs.
1192                     sp<ABuffer> buffer = new ABuffer(0);
1193                     mSeqNumber = lastSeqNumberInPlaylist;
1194                     extractAndQueueAccessUnitsFromTs(buffer);
1195                 }
1196                 notifyError(ERROR_END_OF_STREAM);
1197             } else {
1198                 // It's possible that we were never able to download the playlist.
1199                 // In this case we should notify error, instead of EOS, as EOS during
1200                 // prepare means we succeeded in downloading everything.
1201                 ALOGE("Failed to download playlist!");
1202                 notifyError(ERROR_IO);
1203             }
1204 
1205             return false;
1206         }
1207     }
1208 
1209     mNumRetries = 0;
1210 
1211     CHECK(mPlaylist->itemAt(
1212                 mSeqNumber - firstSeqNumberInPlaylist,
1213                 &uri,
1214                 &itemMeta));
1215 
1216     CHECK(itemMeta->findInt32("discontinuity-sequence", &mDiscontinuitySeq));
1217 
1218     int32_t val;
1219     if (itemMeta->findInt32("discontinuity", &val) && val != 0) {
1220         discontinuity = true;
1221     } else if (mLastDiscontinuitySeq >= 0
1222             && mDiscontinuitySeq != mLastDiscontinuitySeq) {
1223         // Seek jumped to a new discontinuity sequence. We need to signal
1224         // a format change to decoder. Decoder needs to shutdown and be
1225         // created again if seamless format change is unsupported.
1226         FLOGV("saw discontinuity: mStartup %d, mLastDiscontinuitySeq %d, "
1227                 "mDiscontinuitySeq %d, mStartTimeUs %lld",
1228                 mStartup, mLastDiscontinuitySeq, mDiscontinuitySeq, (long long)mStartTimeUs);
1229         discontinuity = true;
1230     }
1231     mLastDiscontinuitySeq = -1;
1232 
1233     // decrypt a junk buffer to prefetch key; since a session uses only one http connection,
1234     // this avoids interleaved connections to the key and segment file.
1235     {
1236         sp<ABuffer> junk = new ABuffer(16);
1237         junk->setRange(0, 16);
1238         status_t err = decryptBuffer(mSeqNumber - firstSeqNumberInPlaylist, junk,
1239                 true /* first */);
1240         if (err == ERROR_NOT_CONNECTED) {
1241             return false;
1242         } else if (err != OK) {
1243             notifyError(err);
1244             return false;
1245         }
1246     }
1247 
1248     if ((mStartup && !mTimeChangeSignaled) || discontinuity) {
1249         // We need to signal a time discontinuity to ATSParser on the
1250         // first segment after start, or on a discontinuity segment.
1251         // Setting mNextPTSTimeUs informs extractAndQueueAccessUnitsXX()
1252         // to send the time discontinuity.
1253         if (mPlaylist->isComplete() || mPlaylist->isEvent()) {
1254             // If this was a live event this made no sense since
1255             // we don't have access to all the segment before the current
1256             // one.
1257             mNextPTSTimeUs = getSegmentStartTimeUs(mSeqNumber);
1258         }
1259 
1260         // Setting mTimeChangeSignaled to true, so that if start time
1261         // searching goes into 2nd segment (without a discontinuity),
1262         // we don't reset time again. It causes corruption when pending
1263         // data in ATSParser is cleared.
1264         mTimeChangeSignaled = true;
1265     }
1266 
1267     if (discontinuity) {
1268         ALOGI("queueing discontinuity (explicit=%d)", discontinuity);
1269 
1270         // Signal a format discontinuity to ATSParser to clear partial data
1271         // from previous streams. Not doing this causes bitstream corruption.
1272         if (mTSParser != NULL) {
1273             mTSParser.clear();
1274         }
1275 
1276         queueDiscontinuity(
1277                 ATSParser::DISCONTINUITY_FORMAT_ONLY,
1278                 NULL /* extra */);
1279 
1280         if (mStartup && mStartTimeUsRelative && mFirstPTSValid) {
1281             // This means we guessed mStartTimeUs to be in the previous
1282             // segment (likely very close to the end), but either video or
1283             // audio has not found start by the end of that segment.
1284             //
1285             // If this new segment is not a discontinuity, keep searching.
1286             //
1287             // If this new segment even got a discontinuity marker, just
1288             // set mStartTimeUs=0, and take all samples from now on.
1289             mStartTimeUs = 0;
1290             mFirstPTSValid = false;
1291             mIDRFound = false;
1292             mVideoBuffer->clear();
1293         }
1294     }
1295 
1296     FLOGV("fetching segment %d from (%d .. %d)",
1297             mSeqNumber, firstSeqNumberInPlaylist, lastSeqNumberInPlaylist);
1298     return true;
1299 }
1300 
onDownloadNext()1301 void PlaylistFetcher::onDownloadNext() {
1302     AString uri;
1303     sp<AMessage> itemMeta;
1304     sp<ABuffer> buffer;
1305     sp<ABuffer> tsBuffer;
1306     int32_t firstSeqNumberInPlaylist = 0;
1307     int32_t lastSeqNumberInPlaylist = 0;
1308     bool connectHTTP = true;
1309 
1310     if (mDownloadState->hasSavedState()) {
1311         mDownloadState->restoreState(
1312                 uri,
1313                 itemMeta,
1314                 buffer,
1315                 tsBuffer,
1316                 firstSeqNumberInPlaylist,
1317                 lastSeqNumberInPlaylist);
1318         connectHTTP = false;
1319         FLOGV("resuming: '%s'", uri.c_str());
1320     } else {
1321         if (!initDownloadState(
1322                 uri,
1323                 itemMeta,
1324                 firstSeqNumberInPlaylist,
1325                 lastSeqNumberInPlaylist)) {
1326             return;
1327         }
1328         FLOGV("fetching: '%s'", uri.c_str());
1329     }
1330 
1331     int64_t range_offset, range_length;
1332     if (!itemMeta->findInt64("range-offset", &range_offset)
1333             || !itemMeta->findInt64("range-length", &range_length)) {
1334         range_offset = 0;
1335         range_length = -1;
1336     }
1337 
1338     // block-wise download
1339     bool shouldPause = false;
1340     ssize_t bytesRead;
1341     do {
1342         int64_t startUs = ALooper::GetNowUs();
1343         bytesRead = mHTTPDownloader->fetchBlock(
1344                 uri.c_str(), &buffer, range_offset, range_length, kDownloadBlockSize,
1345                 NULL /* actualURL */, connectHTTP);
1346         int64_t delayUs = ALooper::GetNowUs() - startUs;
1347 
1348         if (bytesRead == ERROR_NOT_CONNECTED) {
1349             return;
1350         }
1351         if (bytesRead < 0) {
1352             status_t err = bytesRead;
1353             ALOGE("failed to fetch .ts segment at url '%s'", uri.c_str());
1354             notifyError(err);
1355             return;
1356         }
1357 
1358         // add sample for bandwidth estimation, excluding samples from subtitles (as
1359         // its too small), or during startup/resumeUntil (when we could have more than
1360         // one connection open which affects bandwidth)
1361         if (!mStartup && mStopParams == NULL && bytesRead > 0
1362                 && (mStreamTypeMask
1363                         & (LiveSession::STREAMTYPE_AUDIO
1364                         | LiveSession::STREAMTYPE_VIDEO))) {
1365             mSession->addBandwidthMeasurement(bytesRead, delayUs);
1366             if (delayUs > 2000000ll) {
1367                 FLOGV("bytesRead %zd took %.2f seconds - abnormal bandwidth dip",
1368                         bytesRead, (double)delayUs / 1.0e6);
1369             }
1370         }
1371 
1372         connectHTTP = false;
1373 
1374         CHECK(buffer != NULL);
1375 
1376         size_t size = buffer->size();
1377         // Set decryption range.
1378         buffer->setRange(size - bytesRead, bytesRead);
1379         status_t err = decryptBuffer(mSeqNumber - firstSeqNumberInPlaylist, buffer,
1380                 buffer->offset() == 0 /* first */);
1381         // Unset decryption range.
1382         buffer->setRange(0, size);
1383 
1384         if (err != OK) {
1385             ALOGE("decryptBuffer failed w/ error %d", err);
1386 
1387             notifyError(err);
1388             return;
1389         }
1390 
1391         bool startUp = mStartup; // save current start up state
1392 
1393         err = OK;
1394         if (bufferStartsWithTsSyncByte(buffer)) {
1395             // Incremental extraction is only supported for MPEG2 transport streams.
1396             if (tsBuffer == NULL) {
1397                 tsBuffer = new ABuffer(buffer->data(), buffer->capacity());
1398                 tsBuffer->setRange(0, 0);
1399             } else if (tsBuffer->capacity() != buffer->capacity()) {
1400                 size_t tsOff = tsBuffer->offset(), tsSize = tsBuffer->size();
1401                 tsBuffer = new ABuffer(buffer->data(), buffer->capacity());
1402                 tsBuffer->setRange(tsOff, tsSize);
1403             }
1404             tsBuffer->setRange(tsBuffer->offset(), tsBuffer->size() + bytesRead);
1405             err = extractAndQueueAccessUnitsFromTs(tsBuffer);
1406         }
1407 
1408         if (err == -EAGAIN) {
1409             // starting sequence number too low/high
1410             mTSParser.clear();
1411             for (size_t i = 0; i < mPacketSources.size(); i++) {
1412                 sp<AnotherPacketSource> packetSource = mPacketSources.valueAt(i);
1413                 packetSource->clear();
1414             }
1415             postMonitorQueue();
1416             return;
1417         } else if (err == ERROR_OUT_OF_RANGE) {
1418             // reached stopping point
1419             notifyStopReached();
1420             return;
1421         } else if (err != OK) {
1422             notifyError(err);
1423             return;
1424         }
1425         // If we're switching, post start notification
1426         // this should only be posted when the last chunk is full processed by TSParser
1427         if (mSeekMode != LiveSession::kSeekModeExactPosition && startUp != mStartup) {
1428             CHECK(mStartTimeUsNotify != NULL);
1429             mStartTimeUsNotify->post();
1430             mStartTimeUsNotify.clear();
1431             shouldPause = true;
1432         }
1433         if (shouldPause || shouldPauseDownload()) {
1434             // save state and return if this is not the last chunk,
1435             // leaving the fetcher in paused state.
1436             if (bytesRead != 0) {
1437                 mDownloadState->saveState(
1438                         uri,
1439                         itemMeta,
1440                         buffer,
1441                         tsBuffer,
1442                         firstSeqNumberInPlaylist,
1443                         lastSeqNumberInPlaylist);
1444                 return;
1445             }
1446             shouldPause = true;
1447         }
1448     } while (bytesRead != 0);
1449 
1450     if (bufferStartsWithTsSyncByte(buffer)) {
1451         // If we don't see a stream in the program table after fetching a full ts segment
1452         // mark it as nonexistent.
1453         ATSParser::SourceType srcTypes[] =
1454                 { ATSParser::VIDEO, ATSParser::AUDIO };
1455         LiveSession::StreamType streamTypes[] =
1456                 { LiveSession::STREAMTYPE_VIDEO, LiveSession::STREAMTYPE_AUDIO };
1457         const size_t kNumTypes = NELEM(srcTypes);
1458 
1459         for (size_t i = 0; i < kNumTypes; i++) {
1460             ATSParser::SourceType srcType = srcTypes[i];
1461             LiveSession::StreamType streamType = streamTypes[i];
1462 
1463             sp<AnotherPacketSource> source =
1464                 static_cast<AnotherPacketSource *>(
1465                     mTSParser->getSource(srcType).get());
1466 
1467             if (!mTSParser->hasSource(srcType)) {
1468                 ALOGW("MPEG2 Transport stream does not contain %s data.",
1469                       srcType == ATSParser::VIDEO ? "video" : "audio");
1470 
1471                 mStreamTypeMask &= ~streamType;
1472                 mPacketSources.removeItem(streamType);
1473             }
1474         }
1475 
1476     }
1477 
1478     if (checkDecryptPadding(buffer) != OK) {
1479         ALOGE("Incorrect padding bytes after decryption.");
1480         notifyError(ERROR_MALFORMED);
1481         return;
1482     }
1483 
1484     if (tsBuffer != NULL) {
1485         AString method;
1486         CHECK(buffer->meta()->findString("cipher-method", &method));
1487         if ((tsBuffer->size() > 0 && method == "NONE")
1488                 || tsBuffer->size() > 16) {
1489             ALOGE("MPEG2 transport stream is not an even multiple of 188 "
1490                     "bytes in length.");
1491             notifyError(ERROR_MALFORMED);
1492             return;
1493         }
1494     }
1495 
1496     // bulk extract non-ts files
1497     bool startUp = mStartup;
1498     if (tsBuffer == NULL) {
1499         status_t err = extractAndQueueAccessUnits(buffer, itemMeta);
1500         if (err == -EAGAIN) {
1501             // starting sequence number too low/high
1502             postMonitorQueue();
1503             return;
1504         } else if (err == ERROR_OUT_OF_RANGE) {
1505             // reached stopping point
1506             notifyStopReached();
1507             return;
1508         } else if (err != OK) {
1509             notifyError(err);
1510             return;
1511         }
1512     }
1513 
1514     ++mSeqNumber;
1515 
1516     // if adapting, pause after found the next starting point
1517     if (mSeekMode != LiveSession::kSeekModeExactPosition && startUp != mStartup) {
1518         CHECK(mStartTimeUsNotify != NULL);
1519         mStartTimeUsNotify->post();
1520         mStartTimeUsNotify.clear();
1521         shouldPause = true;
1522     }
1523 
1524     if (!shouldPause) {
1525         postMonitorQueue();
1526     }
1527 }
1528 
1529 /*
1530  * returns true if we need to adjust mSeqNumber
1531  */
adjustSeqNumberWithAnchorTime(int64_t anchorTimeUs)1532 bool PlaylistFetcher::adjustSeqNumberWithAnchorTime(int64_t anchorTimeUs) {
1533     int32_t firstSeqNumberInPlaylist = mPlaylist->getFirstSeqNumber();
1534 
1535     int64_t minDiffUs, maxDiffUs;
1536     if (mSeekMode == LiveSession::kSeekModeNextSample) {
1537         // if the previous fetcher paused in the middle of a segment, we
1538         // want to start at a segment that overlaps the last sample
1539         minDiffUs = -mPlaylist->getTargetDuration();
1540         maxDiffUs = 0ll;
1541     } else {
1542         // if the previous fetcher paused at the end of a segment, ideally
1543         // we want to start at the segment that's roughly aligned with its
1544         // next segment, but if the two variants are not well aligned we
1545         // adjust the diff to within (-T/2, T/2)
1546         minDiffUs = -mPlaylist->getTargetDuration() / 2;
1547         maxDiffUs = mPlaylist->getTargetDuration() / 2;
1548     }
1549 
1550     int32_t oldSeqNumber = mSeqNumber;
1551     ssize_t index = mSeqNumber - firstSeqNumberInPlaylist;
1552 
1553     // adjust anchorTimeUs to within (minDiffUs, maxDiffUs) from mStartTimeUs
1554     int64_t diffUs = anchorTimeUs - mStartTimeUs;
1555     if (diffUs > maxDiffUs) {
1556         while (index > 0 && diffUs > maxDiffUs) {
1557             --index;
1558 
1559             sp<AMessage> itemMeta;
1560             CHECK(mPlaylist->itemAt(index, NULL /* uri */, &itemMeta));
1561 
1562             int64_t itemDurationUs;
1563             CHECK(itemMeta->findInt64("durationUs", &itemDurationUs));
1564 
1565             diffUs -= itemDurationUs;
1566         }
1567     } else if (diffUs < minDiffUs) {
1568         while (index + 1 < (ssize_t) mPlaylist->size()
1569                 && diffUs < minDiffUs) {
1570             ++index;
1571 
1572             sp<AMessage> itemMeta;
1573             CHECK(mPlaylist->itemAt(index, NULL /* uri */, &itemMeta));
1574 
1575             int64_t itemDurationUs;
1576             CHECK(itemMeta->findInt64("durationUs", &itemDurationUs));
1577 
1578             diffUs += itemDurationUs;
1579         }
1580     }
1581 
1582     mSeqNumber = firstSeqNumberInPlaylist + index;
1583 
1584     if (mSeqNumber != oldSeqNumber) {
1585         FLOGV("guessed wrong seg number: diff %lld out of [%lld, %lld]",
1586                 (long long) anchorTimeUs - mStartTimeUs,
1587                 (long long) minDiffUs,
1588                 (long long) maxDiffUs);
1589         return true;
1590     }
1591     return false;
1592 }
1593 
getSeqNumberForDiscontinuity(size_t discontinuitySeq) const1594 int32_t PlaylistFetcher::getSeqNumberForDiscontinuity(size_t discontinuitySeq) const {
1595     int32_t firstSeqNumberInPlaylist = mPlaylist->getFirstSeqNumber();
1596 
1597     size_t index = 0;
1598     while (index < mPlaylist->size()) {
1599         sp<AMessage> itemMeta;
1600         CHECK(mPlaylist->itemAt( index, NULL /* uri */, &itemMeta));
1601         size_t curDiscontinuitySeq;
1602         CHECK(itemMeta->findInt32("discontinuity-sequence", (int32_t *)&curDiscontinuitySeq));
1603         int32_t seqNumber = firstSeqNumberInPlaylist + index;
1604         if (curDiscontinuitySeq == discontinuitySeq) {
1605             return seqNumber;
1606         } else if (curDiscontinuitySeq > discontinuitySeq) {
1607             return seqNumber <= 0 ? 0 : seqNumber - 1;
1608         }
1609 
1610         ++index;
1611     }
1612 
1613     return firstSeqNumberInPlaylist + mPlaylist->size();
1614 }
1615 
getSeqNumberForTime(int64_t timeUs) const1616 int32_t PlaylistFetcher::getSeqNumberForTime(int64_t timeUs) const {
1617     size_t index = 0;
1618     int64_t segmentStartUs = 0;
1619     while (index < mPlaylist->size()) {
1620         sp<AMessage> itemMeta;
1621         CHECK(mPlaylist->itemAt(
1622                     index, NULL /* uri */, &itemMeta));
1623 
1624         int64_t itemDurationUs;
1625         CHECK(itemMeta->findInt64("durationUs", &itemDurationUs));
1626 
1627         if (timeUs < segmentStartUs + itemDurationUs) {
1628             break;
1629         }
1630 
1631         segmentStartUs += itemDurationUs;
1632         ++index;
1633     }
1634 
1635     if (index >= mPlaylist->size()) {
1636         index = mPlaylist->size() - 1;
1637     }
1638 
1639     return mPlaylist->getFirstSeqNumber() + index;
1640 }
1641 
setAccessUnitProperties(const sp<ABuffer> & accessUnit,const sp<AnotherPacketSource> & source,bool discard)1642 const sp<ABuffer> &PlaylistFetcher::setAccessUnitProperties(
1643         const sp<ABuffer> &accessUnit, const sp<AnotherPacketSource> &source, bool discard) {
1644     sp<MetaData> format = source->getFormat();
1645     if (format != NULL) {
1646         // for simplicity, store a reference to the format in each unit
1647         accessUnit->meta()->setObject("format", format);
1648     }
1649 
1650     if (discard) {
1651         accessUnit->meta()->setInt32("discard", discard);
1652     }
1653 
1654     accessUnit->meta()->setInt32("discontinuitySeq", mDiscontinuitySeq);
1655     accessUnit->meta()->setInt64("segmentStartTimeUs", getSegmentStartTimeUs(mSeqNumber));
1656     accessUnit->meta()->setInt64("segmentFirstTimeUs", mSegmentFirstPTS);
1657     accessUnit->meta()->setInt64("segmentDurationUs", getSegmentDurationUs(mSeqNumber));
1658     if (!mPlaylist->isComplete() && !mPlaylist->isEvent()) {
1659         accessUnit->meta()->setInt64("playlistTimeUs", mPlaylistTimeUs);
1660     }
1661     return accessUnit;
1662 }
1663 
isStartTimeReached(int64_t timeUs)1664 bool PlaylistFetcher::isStartTimeReached(int64_t timeUs) {
1665     if (!mFirstPTSValid) {
1666         mFirstTimeUs = timeUs;
1667         mFirstPTSValid = true;
1668     }
1669     bool startTimeReached = true;
1670     if (mStartTimeUsRelative) {
1671         FLOGV("startTimeUsRelative, timeUs (%lld) - %lld = %lld",
1672                 (long long)timeUs,
1673                 (long long)mFirstTimeUs,
1674                 (long long)(timeUs - mFirstTimeUs));
1675         timeUs -= mFirstTimeUs;
1676         if (timeUs < 0) {
1677             FLOGV("clamp negative timeUs to 0");
1678             timeUs = 0;
1679         }
1680         startTimeReached = (timeUs >= mStartTimeUs);
1681     }
1682     return startTimeReached;
1683 }
1684 
extractAndQueueAccessUnitsFromTs(const sp<ABuffer> & buffer)1685 status_t PlaylistFetcher::extractAndQueueAccessUnitsFromTs(const sp<ABuffer> &buffer) {
1686     if (mTSParser == NULL) {
1687         // Use TS_TIMESTAMPS_ARE_ABSOLUTE so pts carry over between fetchers.
1688         mTSParser = new ATSParser(ATSParser::TS_TIMESTAMPS_ARE_ABSOLUTE);
1689     }
1690 
1691     if (mNextPTSTimeUs >= 0ll) {
1692         sp<AMessage> extra = new AMessage;
1693         // Since we are using absolute timestamps, signal an offset of 0 to prevent
1694         // ATSParser from skewing the timestamps of access units.
1695         extra->setInt64(IStreamListener::kKeyMediaTimeUs, 0);
1696 
1697         // When adapting, signal a recent media time to the parser,
1698         // so that PTS wrap around is handled for the new variant.
1699         if (mStartTimeUs >= 0 && !mStartTimeUsRelative) {
1700             extra->setInt64(IStreamListener::kKeyRecentMediaTimeUs, mStartTimeUs);
1701         }
1702 
1703         mTSParser->signalDiscontinuity(
1704                 ATSParser::DISCONTINUITY_TIME, extra);
1705 
1706         mNextPTSTimeUs = -1ll;
1707     }
1708 
1709     if (mSampleAesKeyItemChanged) {
1710         mTSParser->signalNewSampleAesKey(mSampleAesKeyItem);
1711         mSampleAesKeyItemChanged = false;
1712     }
1713 
1714     size_t offset = 0;
1715     while (offset + 188 <= buffer->size()) {
1716         status_t err = mTSParser->feedTSPacket(buffer->data() + offset, 188);
1717 
1718         if (err != OK) {
1719             return err;
1720         }
1721 
1722         offset += 188;
1723     }
1724     // setRange to indicate consumed bytes.
1725     buffer->setRange(buffer->offset() + offset, buffer->size() - offset);
1726 
1727     if (mSegmentFirstPTS < 0ll) {
1728         // get the smallest first PTS from all streams present in this parser
1729         for (size_t i = mPacketSources.size(); i > 0;) {
1730             i--;
1731             const LiveSession::StreamType stream = mPacketSources.keyAt(i);
1732             if (stream == LiveSession::STREAMTYPE_SUBTITLES) {
1733                 ALOGE("MPEG2 Transport streams do not contain subtitles.");
1734                 return ERROR_MALFORMED;
1735             }
1736             if (stream == LiveSession::STREAMTYPE_METADATA) {
1737                 continue;
1738             }
1739             ATSParser::SourceType type =LiveSession::getSourceTypeForStream(stream);
1740             sp<AnotherPacketSource> source =
1741                 static_cast<AnotherPacketSource *>(
1742                         mTSParser->getSource(type).get());
1743 
1744             if (source == NULL) {
1745                 continue;
1746             }
1747             sp<AMessage> meta = source->getMetaAfterLastDequeued(0);
1748             if (meta != NULL) {
1749                 int64_t timeUs;
1750                 CHECK(meta->findInt64("timeUs", &timeUs));
1751                 if (mSegmentFirstPTS < 0ll || timeUs < mSegmentFirstPTS) {
1752                     mSegmentFirstPTS = timeUs;
1753                 }
1754             }
1755         }
1756         if (mSegmentFirstPTS < 0ll) {
1757             // didn't find any TS packet, can return early
1758             return OK;
1759         }
1760         if (!mStartTimeUsRelative) {
1761             // mStartup
1762             //   mStartup is true until we have queued a packet for all the streams
1763             //   we are fetching. We queue packets whose timestamps are greater than
1764             //   mStartTimeUs.
1765             // mSegmentStartTimeUs >= 0
1766             //   mSegmentStartTimeUs is non-negative when adapting or switching tracks
1767             // adjustSeqNumberWithAnchorTime(timeUs) == true
1768             //   we guessed a seq number that's either too large or too small.
1769             // If this happens, we'll adjust mSeqNumber and restart fetching from new
1770             // location. Note that we only want to adjust once, so set mSegmentStartTimeUs
1771             // to -1 so that we don't enter this chunk next time.
1772             if (mStartup && mSegmentStartTimeUs >= 0
1773                     && adjustSeqNumberWithAnchorTime(mSegmentFirstPTS)) {
1774                 mStartTimeUsNotify = mNotify->dup();
1775                 mStartTimeUsNotify->setInt32("what", kWhatStartedAt);
1776                 mStartTimeUsNotify->setString("uri", mURI);
1777                 mIDRFound = false;
1778                 mSegmentStartTimeUs = -1;
1779                 return -EAGAIN;
1780             }
1781         }
1782     }
1783 
1784     status_t err = OK;
1785     for (size_t i = mPacketSources.size(); i > 0;) {
1786         i--;
1787         sp<AnotherPacketSource> packetSource = mPacketSources.valueAt(i);
1788 
1789         const LiveSession::StreamType stream = mPacketSources.keyAt(i);
1790         if (stream == LiveSession::STREAMTYPE_SUBTITLES) {
1791             ALOGE("MPEG2 Transport streams do not contain subtitles.");
1792             return ERROR_MALFORMED;
1793         }
1794 
1795         const char *key = LiveSession::getKeyForStream(stream);
1796         ATSParser::SourceType type =LiveSession::getSourceTypeForStream(stream);
1797 
1798         sp<AnotherPacketSource> source =
1799             static_cast<AnotherPacketSource *>(
1800                     mTSParser->getSource(type).get());
1801 
1802         if (source == NULL) {
1803             continue;
1804         }
1805 
1806         const char *mime;
1807         sp<MetaData> format  = source->getFormat();
1808         bool isAvc = format != NULL && format->findCString(kKeyMIMEType, &mime)
1809                 && !strcasecmp(mime, MEDIA_MIMETYPE_VIDEO_AVC);
1810 
1811         sp<ABuffer> accessUnit;
1812         status_t finalResult;
1813         while (source->hasBufferAvailable(&finalResult)
1814                 && source->dequeueAccessUnit(&accessUnit) == OK) {
1815 
1816             int64_t timeUs;
1817             CHECK(accessUnit->meta()->findInt64("timeUs", &timeUs));
1818 
1819             if (mStartup) {
1820                 bool startTimeReached = isStartTimeReached(timeUs);
1821 
1822                 if (!startTimeReached || (isAvc && !mIDRFound)) {
1823                     // buffer up to the closest preceding IDR frame in the next segement,
1824                     // or the closest succeeding IDR frame after the exact position
1825                     FSLOGV(stream, "timeUs(%lld)-mStartTimeUs(%lld)=%lld, mIDRFound=%d",
1826                             (long long)timeUs,
1827                             (long long)mStartTimeUs,
1828                             (long long)timeUs - mStartTimeUs,
1829                             mIDRFound);
1830                     if (isAvc) {
1831                         if (IsIDR(accessUnit)) {
1832                             mVideoBuffer->clear();
1833                             FSLOGV(stream, "found IDR, clear mVideoBuffer");
1834                             mIDRFound = true;
1835                         }
1836                         if (mIDRFound && mStartTimeUsRelative && !startTimeReached) {
1837                             mVideoBuffer->queueAccessUnit(accessUnit);
1838                             FSLOGV(stream, "saving AVC video AccessUnit");
1839                         }
1840                     }
1841                     if (!startTimeReached || (isAvc && !mIDRFound)) {
1842                         continue;
1843                     }
1844                 }
1845             }
1846 
1847             if (mStartTimeUsNotify != NULL) {
1848                 uint32_t streamMask = 0;
1849                 mStartTimeUsNotify->findInt32("streamMask", (int32_t *) &streamMask);
1850                 if ((mStreamTypeMask & mPacketSources.keyAt(i))
1851                         && !(streamMask & mPacketSources.keyAt(i))) {
1852                     streamMask |= mPacketSources.keyAt(i);
1853                     mStartTimeUsNotify->setInt32("streamMask", streamMask);
1854                     FSLOGV(stream, "found start point, timeUs=%lld, streamMask becomes %x",
1855                             (long long)timeUs, streamMask);
1856 
1857                     if (streamMask == mStreamTypeMask) {
1858                         FLOGV("found start point for all streams");
1859                         mStartup = false;
1860                     }
1861                 }
1862             }
1863 
1864             if (mStopParams != NULL) {
1865                 int32_t discontinuitySeq;
1866                 int64_t stopTimeUs;
1867                 if (!mStopParams->findInt32("discontinuitySeq", &discontinuitySeq)
1868                         || discontinuitySeq > mDiscontinuitySeq
1869                         || !mStopParams->findInt64(key, &stopTimeUs)
1870                         || (discontinuitySeq == mDiscontinuitySeq
1871                                 && timeUs >= stopTimeUs)) {
1872                     FSLOGV(stream, "reached stop point, timeUs=%lld", (long long)timeUs);
1873                     mStreamTypeMask &= ~stream;
1874                     mPacketSources.removeItemsAt(i);
1875                     break;
1876                 }
1877             }
1878 
1879             if (stream == LiveSession::STREAMTYPE_VIDEO) {
1880                 const bool discard = true;
1881                 status_t status;
1882                 while (mVideoBuffer->hasBufferAvailable(&status)) {
1883                     sp<ABuffer> videoBuffer;
1884                     mVideoBuffer->dequeueAccessUnit(&videoBuffer);
1885                     setAccessUnitProperties(videoBuffer, source, discard);
1886                     packetSource->queueAccessUnit(videoBuffer);
1887                     int64_t bufferTimeUs;
1888                     CHECK(videoBuffer->meta()->findInt64("timeUs", &bufferTimeUs));
1889                     FSLOGV(stream, "queueAccessUnit (saved), timeUs=%lld",
1890                             (long long)bufferTimeUs);
1891                 }
1892             } else if (stream == LiveSession::STREAMTYPE_METADATA && !mHasMetadata) {
1893                 mHasMetadata = true;
1894                 sp<AMessage> notify = mNotify->dup();
1895                 notify->setInt32("what", kWhatMetadataDetected);
1896                 notify->post();
1897             }
1898 
1899             setAccessUnitProperties(accessUnit, source);
1900             packetSource->queueAccessUnit(accessUnit);
1901             FSLOGV(stream, "queueAccessUnit, timeUs=%lld", (long long)timeUs);
1902         }
1903 
1904         if (err != OK) {
1905             break;
1906         }
1907     }
1908 
1909     if (err != OK) {
1910         for (size_t i = mPacketSources.size(); i > 0;) {
1911             i--;
1912             sp<AnotherPacketSource> packetSource = mPacketSources.valueAt(i);
1913             packetSource->clear();
1914         }
1915         return err;
1916     }
1917 
1918     if (!mStreamTypeMask) {
1919         // Signal gap is filled between original and new stream.
1920         FLOGV("reached stop point for all streams");
1921         return ERROR_OUT_OF_RANGE;
1922     }
1923 
1924     return OK;
1925 }
1926 
1927 /* static */
bufferStartsWithWebVTTMagicSequence(const sp<ABuffer> & buffer)1928 bool PlaylistFetcher::bufferStartsWithWebVTTMagicSequence(
1929         const sp<ABuffer> &buffer) {
1930     size_t pos = 0;
1931 
1932     // skip possible BOM
1933     if (buffer->size() >= pos + 3 &&
1934             !memcmp("\xef\xbb\xbf", buffer->data() + pos, 3)) {
1935         pos += 3;
1936     }
1937 
1938     // accept WEBVTT followed by SPACE, TAB or (CR) LF
1939     if (buffer->size() < pos + 6 ||
1940             memcmp("WEBVTT", buffer->data() + pos, 6)) {
1941         return false;
1942     }
1943     pos += 6;
1944 
1945     if (buffer->size() == pos) {
1946         return true;
1947     }
1948 
1949     uint8_t sep = buffer->data()[pos];
1950     return sep == ' ' || sep == '\t' || sep == '\n' || sep == '\r';
1951 }
1952 
extractAndQueueAccessUnits(const sp<ABuffer> & buffer,const sp<AMessage> & itemMeta)1953 status_t PlaylistFetcher::extractAndQueueAccessUnits(
1954         const sp<ABuffer> &buffer, const sp<AMessage> &itemMeta) {
1955     if (bufferStartsWithWebVTTMagicSequence(buffer)) {
1956         if (mStreamTypeMask != LiveSession::STREAMTYPE_SUBTITLES) {
1957             ALOGE("This stream only contains subtitles.");
1958             return ERROR_MALFORMED;
1959         }
1960 
1961         const sp<AnotherPacketSource> packetSource =
1962             mPacketSources.valueFor(LiveSession::STREAMTYPE_SUBTITLES);
1963 
1964         int64_t durationUs;
1965         CHECK(itemMeta->findInt64("durationUs", &durationUs));
1966         buffer->meta()->setInt64("timeUs", getSegmentStartTimeUs(mSeqNumber));
1967         buffer->meta()->setInt64("durationUs", durationUs);
1968         buffer->meta()->setInt64("segmentStartTimeUs", getSegmentStartTimeUs(mSeqNumber));
1969         buffer->meta()->setInt32("discontinuitySeq", mDiscontinuitySeq);
1970         buffer->meta()->setInt32("subtitleGeneration", mSubtitleGeneration);
1971         packetSource->queueAccessUnit(buffer);
1972         return OK;
1973     }
1974 
1975     if (mNextPTSTimeUs >= 0ll) {
1976         mNextPTSTimeUs = -1ll;
1977     }
1978 
1979     // This better be an ISO 13818-7 (AAC) or ISO 13818-1 (MPEG) audio
1980     // stream prefixed by an ID3 tag.
1981 
1982     bool firstID3Tag = true;
1983     uint64_t PTS = 0;
1984 
1985     for (;;) {
1986         // Make sure to skip all ID3 tags preceding the audio data.
1987         // At least one must be present to provide the PTS timestamp.
1988 
1989         ID3 id3(buffer->data(), buffer->size(), true /* ignoreV1 */);
1990         if (!id3.isValid()) {
1991             if (firstID3Tag) {
1992                 ALOGE("Unable to parse ID3 tag.");
1993                 return ERROR_MALFORMED;
1994             } else {
1995                 break;
1996             }
1997         }
1998 
1999         if (firstID3Tag) {
2000             bool found = false;
2001 
2002             ID3::Iterator it(id3, "PRIV");
2003             while (!it.done()) {
2004                 size_t length;
2005                 const uint8_t *data = it.getData(&length);
2006                 if (!data) {
2007                     return ERROR_MALFORMED;
2008                 }
2009 
2010                 static const char *kMatchName =
2011                     "com.apple.streaming.transportStreamTimestamp";
2012                 static const size_t kMatchNameLen = strlen(kMatchName);
2013 
2014                 if (length == kMatchNameLen + 1 + 8
2015                         && !strncmp((const char *)data, kMatchName, kMatchNameLen)) {
2016                     found = true;
2017                     PTS = U64_AT(&data[kMatchNameLen + 1]);
2018                 }
2019 
2020                 it.next();
2021             }
2022 
2023             if (!found) {
2024                 ALOGE("Unable to extract transportStreamTimestamp from ID3 tag.");
2025                 return ERROR_MALFORMED;
2026             }
2027         }
2028 
2029         // skip the ID3 tag
2030         buffer->setRange(
2031                 buffer->offset() + id3.rawSize(), buffer->size() - id3.rawSize());
2032 
2033         firstID3Tag = false;
2034     }
2035 
2036     if (mStreamTypeMask != LiveSession::STREAMTYPE_AUDIO) {
2037         ALOGW("This stream only contains audio data!");
2038 
2039         mStreamTypeMask &= LiveSession::STREAMTYPE_AUDIO;
2040 
2041         if (mStreamTypeMask == 0) {
2042             return OK;
2043         }
2044     }
2045 
2046     sp<AnotherPacketSource> packetSource =
2047         mPacketSources.valueFor(LiveSession::STREAMTYPE_AUDIO);
2048 
2049     if (packetSource->getFormat() == NULL && buffer->size() >= 7) {
2050         ABitReader bits(buffer->data(), buffer->size());
2051 
2052         // adts_fixed_header
2053 
2054         CHECK_EQ(bits.getBits(12), 0xfffu);
2055         bits.skipBits(3);  // ID, layer
2056         bool protection_absent __unused = bits.getBits(1) != 0;
2057 
2058         unsigned profile = bits.getBits(2);
2059         CHECK_NE(profile, 3u);
2060         unsigned sampling_freq_index = bits.getBits(4);
2061         bits.getBits(1);  // private_bit
2062         unsigned channel_configuration = bits.getBits(3);
2063         CHECK_NE(channel_configuration, 0u);
2064         bits.skipBits(2);  // original_copy, home
2065 
2066         sp<MetaData> meta = MakeAACCodecSpecificData(
2067                 profile, sampling_freq_index, channel_configuration);
2068 
2069         meta->setInt32(kKeyIsADTS, true);
2070 
2071         packetSource->setFormat(meta);
2072     }
2073 
2074     int64_t numSamples = 0ll;
2075     int32_t sampleRate;
2076     CHECK(packetSource->getFormat()->findInt32(kKeySampleRate, &sampleRate));
2077 
2078     int64_t timeUs = (PTS * 100ll) / 9ll;
2079     if (mStartup && !mFirstPTSValid) {
2080         mFirstPTSValid = true;
2081         mFirstTimeUs = timeUs;
2082     }
2083 
2084     if (mSegmentFirstPTS < 0ll) {
2085         mSegmentFirstPTS = timeUs;
2086         if (!mStartTimeUsRelative) {
2087             // Duplicated logic from how we handle .ts playlists.
2088             if (mStartup && mSegmentStartTimeUs >= 0
2089                     && adjustSeqNumberWithAnchorTime(timeUs)) {
2090                 mSegmentStartTimeUs = -1;
2091                 return -EAGAIN;
2092             }
2093         }
2094     }
2095 
2096     sp<HlsSampleDecryptor> sampleDecryptor = NULL;
2097     if (mSampleAesKeyItem != NULL) {
2098         ALOGV("extractAndQueueAccessUnits[%d] SampleAesKeyItem: Key: %s  IV: %s",
2099                 mSeqNumber,
2100                 HlsSampleDecryptor::aesBlockToStr(mKeyData).c_str(),
2101                 HlsSampleDecryptor::aesBlockToStr(mAESInitVec).c_str());
2102 
2103         sampleDecryptor = new HlsSampleDecryptor(mSampleAesKeyItem);
2104     }
2105 
2106     int frameId = 0;
2107 
2108     size_t offset = 0;
2109     while (offset < buffer->size()) {
2110         const uint8_t *adtsHeader = buffer->data() + offset;
2111         CHECK_LT(offset + 5, buffer->size());
2112         // non-const pointer for decryption if needed
2113         uint8_t *adtsFrame = buffer->data() + offset;
2114 
2115         unsigned aac_frame_length =
2116             ((adtsHeader[3] & 3) << 11)
2117             | (adtsHeader[4] << 3)
2118             | (adtsHeader[5] >> 5);
2119 
2120         if (aac_frame_length == 0) {
2121             const uint8_t *id3Header = adtsHeader;
2122             if (!memcmp(id3Header, "ID3", 3)) {
2123                 ID3 id3(id3Header, buffer->size() - offset, true);
2124                 if (id3.isValid()) {
2125                     offset += id3.rawSize();
2126                     continue;
2127                 };
2128             }
2129             return ERROR_MALFORMED;
2130         }
2131 
2132         CHECK_LE(offset + aac_frame_length, buffer->size());
2133 
2134         int64_t unitTimeUs = timeUs + numSamples * 1000000ll / sampleRate;
2135         offset += aac_frame_length;
2136 
2137         // Each AAC frame encodes 1024 samples.
2138         numSamples += 1024;
2139 
2140         if (mStartup) {
2141             int64_t startTimeUs = unitTimeUs;
2142             if (mStartTimeUsRelative) {
2143                 startTimeUs -= mFirstTimeUs;
2144                 if (startTimeUs  < 0) {
2145                     startTimeUs = 0;
2146                 }
2147             }
2148             if (startTimeUs < mStartTimeUs) {
2149                 continue;
2150             }
2151 
2152             if (mStartTimeUsNotify != NULL) {
2153                 mStartTimeUsNotify->setInt32("streamMask", LiveSession::STREAMTYPE_AUDIO);
2154                 mStartup = false;
2155             }
2156         }
2157 
2158         if (mStopParams != NULL) {
2159             int32_t discontinuitySeq;
2160             int64_t stopTimeUs;
2161             if (!mStopParams->findInt32("discontinuitySeq", &discontinuitySeq)
2162                     || discontinuitySeq > mDiscontinuitySeq
2163                     || !mStopParams->findInt64("timeUsAudio", &stopTimeUs)
2164                     || (discontinuitySeq == mDiscontinuitySeq && unitTimeUs >= stopTimeUs)) {
2165                 mStreamTypeMask = 0;
2166                 mPacketSources.clear();
2167                 return ERROR_OUT_OF_RANGE;
2168             }
2169         }
2170 
2171         if (sampleDecryptor != NULL) {
2172             bool protection_absent = (adtsHeader[1] & 0x1);
2173             size_t headerSize = protection_absent ? 7 : 9;
2174             if (frameId == 0) {
2175                 ALOGV("extractAndQueueAAC[%d] protection_absent %d (%02x) headerSize %zu",
2176                         mSeqNumber, protection_absent, adtsHeader[1], headerSize);
2177             }
2178 
2179             sampleDecryptor->processAAC(headerSize, adtsFrame, aac_frame_length);
2180         }
2181         frameId++;
2182 
2183         sp<ABuffer> unit = new ABuffer(aac_frame_length);
2184         memcpy(unit->data(), adtsHeader, aac_frame_length);
2185 
2186         unit->meta()->setInt64("timeUs", unitTimeUs);
2187         setAccessUnitProperties(unit, packetSource);
2188         packetSource->queueAccessUnit(unit);
2189     }
2190 
2191     return OK;
2192 }
2193 
updateDuration()2194 void PlaylistFetcher::updateDuration() {
2195     int64_t durationUs = 0ll;
2196     for (size_t index = 0; index < mPlaylist->size(); ++index) {
2197         sp<AMessage> itemMeta;
2198         CHECK(mPlaylist->itemAt(
2199                     index, NULL /* uri */, &itemMeta));
2200 
2201         int64_t itemDurationUs;
2202         CHECK(itemMeta->findInt64("durationUs", &itemDurationUs));
2203 
2204         durationUs += itemDurationUs;
2205     }
2206 
2207     sp<AMessage> msg = mNotify->dup();
2208     msg->setInt32("what", kWhatDurationUpdate);
2209     msg->setInt64("durationUs", durationUs);
2210     msg->post();
2211 }
2212 
updateTargetDuration()2213 void PlaylistFetcher::updateTargetDuration() {
2214     sp<AMessage> msg = mNotify->dup();
2215     msg->setInt32("what", kWhatTargetDurationUpdate);
2216     msg->setInt64("targetDurationUs", mPlaylist->getTargetDuration());
2217     msg->post();
2218 }
2219 
2220 }  // namespace android
2221