1 /*
2  * Copyright (C) 2012 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 //#define LOG_NDEBUG 0
18 #define LOG_TAG "PlaylistFetcher"
19 #include <utils/Log.h>
20 #include <utils/misc.h>
21 
22 #include "PlaylistFetcher.h"
23 #include "HTTPDownloader.h"
24 #include "LiveSession.h"
25 #include "M3UParser.h"
26 #include "include/ID3.h"
27 #include "mpeg2ts/AnotherPacketSource.h"
28 #include "mpeg2ts/HlsSampleDecryptor.h"
29 
30 #include <media/stagefright/foundation/ABitReader.h>
31 #include <media/stagefright/foundation/ABuffer.h>
32 #include <media/stagefright/foundation/ADebug.h>
33 #include <media/stagefright/foundation/ByteUtils.h>
34 #include <media/stagefright/foundation/MediaKeys.h>
35 #include <media/stagefright/foundation/avc_utils.h>
36 #include <media/stagefright/DataURISource.h>
37 #include <media/stagefright/MediaDefs.h>
38 #include <media/stagefright/MetaData.h>
39 #include <media/stagefright/MetaDataUtils.h>
40 #include <media/stagefright/Utils.h>
41 
42 #include <ctype.h>
43 #include <inttypes.h>
44 
45 #define FLOGV(fmt, ...) ALOGV("[fetcher-%d] " fmt, mFetcherID, ##__VA_ARGS__)
46 #define FSLOGV(stream, fmt, ...) ALOGV("[fetcher-%d] [%s] " fmt, mFetcherID, \
47          LiveSession::getNameForStream(stream), ##__VA_ARGS__)
48 
49 namespace android {
50 
51 // static
52 const int64_t PlaylistFetcher::kMinBufferedDurationUs = 30000000ll;
53 const int64_t PlaylistFetcher::kMaxMonitorDelayUs = 3000000ll;
54 // LCM of 188 (size of a TS packet) & 1k works well
55 const int32_t PlaylistFetcher::kDownloadBlockSize = 47 * 1024;
56 
57 struct PlaylistFetcher::DownloadState : public RefBase {
58     DownloadState();
59     void resetState();
60     bool hasSavedState() const;
61     void restoreState(
62             AString &uri,
63             sp<AMessage> &itemMeta,
64             sp<ABuffer> &buffer,
65             sp<ABuffer> &tsBuffer,
66             int32_t &firstSeqNumberInPlaylist,
67             int32_t &lastSeqNumberInPlaylist);
68     void saveState(
69             AString &uri,
70             sp<AMessage> &itemMeta,
71             sp<ABuffer> &buffer,
72             sp<ABuffer> &tsBuffer,
73             int32_t &firstSeqNumberInPlaylist,
74             int32_t &lastSeqNumberInPlaylist);
75 
76 private:
77     bool mHasSavedState;
78     AString mUri;
79     sp<AMessage> mItemMeta;
80     sp<ABuffer> mBuffer;
81     sp<ABuffer> mTsBuffer;
82     int32_t mFirstSeqNumberInPlaylist;
83     int32_t mLastSeqNumberInPlaylist;
84 };
85 
DownloadState()86 PlaylistFetcher::DownloadState::DownloadState() {
87     resetState();
88 }
89 
hasSavedState() const90 bool PlaylistFetcher::DownloadState::hasSavedState() const {
91     return mHasSavedState;
92 }
93 
resetState()94 void PlaylistFetcher::DownloadState::resetState() {
95     mHasSavedState = false;
96 
97     mUri.clear();
98     mItemMeta = NULL;
99     mBuffer = NULL;
100     mTsBuffer = NULL;
101     mFirstSeqNumberInPlaylist = 0;
102     mLastSeqNumberInPlaylist = 0;
103 }
104 
restoreState(AString & uri,sp<AMessage> & itemMeta,sp<ABuffer> & buffer,sp<ABuffer> & tsBuffer,int32_t & firstSeqNumberInPlaylist,int32_t & lastSeqNumberInPlaylist)105 void PlaylistFetcher::DownloadState::restoreState(
106         AString &uri,
107         sp<AMessage> &itemMeta,
108         sp<ABuffer> &buffer,
109         sp<ABuffer> &tsBuffer,
110         int32_t &firstSeqNumberInPlaylist,
111         int32_t &lastSeqNumberInPlaylist) {
112     if (!mHasSavedState) {
113         return;
114     }
115 
116     uri = mUri;
117     itemMeta = mItemMeta;
118     buffer = mBuffer;
119     tsBuffer = mTsBuffer;
120     firstSeqNumberInPlaylist = mFirstSeqNumberInPlaylist;
121     lastSeqNumberInPlaylist = mLastSeqNumberInPlaylist;
122 
123     resetState();
124 }
125 
saveState(AString & uri,sp<AMessage> & itemMeta,sp<ABuffer> & buffer,sp<ABuffer> & tsBuffer,int32_t & firstSeqNumberInPlaylist,int32_t & lastSeqNumberInPlaylist)126 void PlaylistFetcher::DownloadState::saveState(
127         AString &uri,
128         sp<AMessage> &itemMeta,
129         sp<ABuffer> &buffer,
130         sp<ABuffer> &tsBuffer,
131         int32_t &firstSeqNumberInPlaylist,
132         int32_t &lastSeqNumberInPlaylist) {
133     mHasSavedState = true;
134 
135     mUri = uri;
136     mItemMeta = itemMeta;
137     mBuffer = buffer;
138     mTsBuffer = tsBuffer;
139     mFirstSeqNumberInPlaylist = firstSeqNumberInPlaylist;
140     mLastSeqNumberInPlaylist = lastSeqNumberInPlaylist;
141 }
142 
PlaylistFetcher(const sp<AMessage> & notify,const sp<LiveSession> & session,const char * uri,int32_t id,int32_t subtitleGeneration)143 PlaylistFetcher::PlaylistFetcher(
144         const sp<AMessage> &notify,
145         const sp<LiveSession> &session,
146         const char *uri,
147         int32_t id,
148         int32_t subtitleGeneration)
149     : mNotify(notify),
150       mSession(session),
151       mURI(uri),
152       mFetcherID(id),
153       mStreamTypeMask(0),
154       mStartTimeUs(-1ll),
155       mSegmentStartTimeUs(-1ll),
156       mDiscontinuitySeq(-1ll),
157       mStartTimeUsRelative(false),
158       mLastPlaylistFetchTimeUs(-1ll),
159       mPlaylistTimeUs(-1ll),
160       mSeqNumber(-1),
161       mNumRetries(0),
162       mStartup(true),
163       mIDRFound(false),
164       mSeekMode(LiveSession::kSeekModeExactPosition),
165       mTimeChangeSignaled(false),
166       mNextPTSTimeUs(-1ll),
167       mMonitorQueueGeneration(0),
168       mSubtitleGeneration(subtitleGeneration),
169       mLastDiscontinuitySeq(-1ll),
170       mRefreshState(INITIAL_MINIMUM_RELOAD_DELAY),
171       mFirstPTSValid(false),
172       mFirstTimeUs(-1ll),
173       mVideoBuffer(new AnotherPacketSource(NULL)),
174       mSampleAesKeyItemChanged(false),
175       mThresholdRatio(-1.0f),
176       mDownloadState(new DownloadState()),
177       mHasMetadata(false) {
178     memset(mPlaylistHash, 0, sizeof(mPlaylistHash));
179     mHTTPDownloader = mSession->getHTTPDownloader();
180 
181     memset(mKeyData, 0, sizeof(mKeyData));
182     memset(mAESInitVec, 0, sizeof(mAESInitVec));
183 }
184 
~PlaylistFetcher()185 PlaylistFetcher::~PlaylistFetcher() {
186 }
187 
getFetcherID() const188 int32_t PlaylistFetcher::getFetcherID() const {
189     return mFetcherID;
190 }
191 
getSegmentStartTimeUs(int32_t seqNumber) const192 int64_t PlaylistFetcher::getSegmentStartTimeUs(int32_t seqNumber) const {
193     CHECK(mPlaylist != NULL);
194 
195     int32_t firstSeqNumberInPlaylist, lastSeqNumberInPlaylist;
196     mPlaylist->getSeqNumberRange(
197             &firstSeqNumberInPlaylist, &lastSeqNumberInPlaylist);
198 
199     CHECK_GE(seqNumber, firstSeqNumberInPlaylist);
200     CHECK_LE(seqNumber, lastSeqNumberInPlaylist);
201 
202     int64_t segmentStartUs = 0ll;
203     for (int32_t index = 0;
204             index < seqNumber - firstSeqNumberInPlaylist; ++index) {
205         sp<AMessage> itemMeta;
206         CHECK(mPlaylist->itemAt(
207                     index, NULL /* uri */, &itemMeta));
208 
209         int64_t itemDurationUs;
210         CHECK(itemMeta->findInt64("durationUs", &itemDurationUs));
211 
212         segmentStartUs += itemDurationUs;
213     }
214 
215     return segmentStartUs;
216 }
217 
getSegmentDurationUs(int32_t seqNumber) const218 int64_t PlaylistFetcher::getSegmentDurationUs(int32_t seqNumber) const {
219     CHECK(mPlaylist != NULL);
220 
221     int32_t firstSeqNumberInPlaylist, lastSeqNumberInPlaylist;
222     mPlaylist->getSeqNumberRange(
223             &firstSeqNumberInPlaylist, &lastSeqNumberInPlaylist);
224 
225     CHECK_GE(seqNumber, firstSeqNumberInPlaylist);
226     CHECK_LE(seqNumber, lastSeqNumberInPlaylist);
227 
228     int32_t index = seqNumber - firstSeqNumberInPlaylist;
229     sp<AMessage> itemMeta;
230     CHECK(mPlaylist->itemAt(
231                 index, NULL /* uri */, &itemMeta));
232 
233     int64_t itemDurationUs;
234     CHECK(itemMeta->findInt64("durationUs", &itemDurationUs));
235 
236     return itemDurationUs;
237 }
238 
delayUsToRefreshPlaylist() const239 int64_t PlaylistFetcher::delayUsToRefreshPlaylist() const {
240     int64_t nowUs = ALooper::GetNowUs();
241 
242     if (mPlaylist == NULL || mLastPlaylistFetchTimeUs < 0ll) {
243         CHECK_EQ((int)mRefreshState, (int)INITIAL_MINIMUM_RELOAD_DELAY);
244         return 0ll;
245     }
246 
247     if (mPlaylist->isComplete()) {
248         return (~0llu >> 1);
249     }
250 
251     int64_t targetDurationUs = mPlaylist->getTargetDuration();
252 
253     int64_t minPlaylistAgeUs;
254 
255     switch (mRefreshState) {
256         case INITIAL_MINIMUM_RELOAD_DELAY:
257         {
258             size_t n = mPlaylist->size();
259             if (n > 0) {
260                 sp<AMessage> itemMeta;
261                 CHECK(mPlaylist->itemAt(n - 1, NULL /* uri */, &itemMeta));
262 
263                 int64_t itemDurationUs;
264                 CHECK(itemMeta->findInt64("durationUs", &itemDurationUs));
265 
266                 minPlaylistAgeUs = itemDurationUs;
267                 break;
268             }
269 
270             // fall through
271         }
272 
273         case FIRST_UNCHANGED_RELOAD_ATTEMPT:
274         {
275             minPlaylistAgeUs = targetDurationUs / 2;
276             break;
277         }
278 
279         case SECOND_UNCHANGED_RELOAD_ATTEMPT:
280         {
281             minPlaylistAgeUs = (targetDurationUs * 3) / 2;
282             break;
283         }
284 
285         case THIRD_UNCHANGED_RELOAD_ATTEMPT:
286         {
287             minPlaylistAgeUs = targetDurationUs * 3;
288             break;
289         }
290 
291         default:
292             TRESPASS();
293             break;
294     }
295 
296     int64_t delayUs = mLastPlaylistFetchTimeUs + minPlaylistAgeUs - nowUs;
297     return delayUs > 0ll ? delayUs : 0ll;
298 }
299 
decryptBuffer(size_t playlistIndex,const sp<ABuffer> & buffer,bool first)300 status_t PlaylistFetcher::decryptBuffer(
301         size_t playlistIndex, const sp<ABuffer> &buffer,
302         bool first) {
303     sp<AMessage> itemMeta;
304     bool found = false;
305     AString method;
306 
307     for (ssize_t i = playlistIndex; i >= 0; --i) {
308         AString uri;
309         CHECK(mPlaylist->itemAt(i, &uri, &itemMeta));
310 
311         if (itemMeta->findString("cipher-method", &method)) {
312             found = true;
313             break;
314         }
315     }
316 
317     // TODO: Revise this when we add support for KEYFORMAT
318     // If method has changed (e.g., -> NONE); sufficient to check at the segment boundary
319     if (mSampleAesKeyItem != NULL && first && found && method != "SAMPLE-AES") {
320         ALOGI("decryptBuffer: resetting mSampleAesKeyItem(%p) with method %s",
321                 mSampleAesKeyItem.get(), method.c_str());
322         mSampleAesKeyItem = NULL;
323         mSampleAesKeyItemChanged = true;
324     }
325 
326     if (!found) {
327         method = "NONE";
328     }
329     buffer->meta()->setString("cipher-method", method.c_str());
330 
331     if (method == "NONE") {
332         return OK;
333     } else if (method == "SAMPLE-AES") {
334         ALOGV("decryptBuffer: Non-Widevine SAMPLE-AES is supported now.");
335     } else if (!(method == "AES-128")) {
336         ALOGE("Unsupported cipher method '%s'", method.c_str());
337         return ERROR_UNSUPPORTED;
338     }
339 
340     AString keyURI;
341     if (!itemMeta->findString("cipher-uri", &keyURI)) {
342         ALOGE("Missing key uri");
343         return ERROR_MALFORMED;
344     }
345 
346     ssize_t index = mAESKeyForURI.indexOfKey(keyURI);
347 
348     sp<ABuffer> key;
349     if (index >= 0) {
350         key = mAESKeyForURI.valueAt(index);
351     } else if (keyURI.startsWith("data:")) {
352         sp<DataSource> keySrc = DataURISource::Create(keyURI.c_str());
353         off64_t keyLen;
354         if (keySrc == NULL || keySrc->getSize(&keyLen) != OK || keyLen < 0) {
355             ALOGE("Malformed cipher key data uri.");
356             return ERROR_MALFORMED;
357         }
358         key = new ABuffer(keyLen);
359         keySrc->readAt(0, key->data(), keyLen);
360         key->setRange(0, keyLen);
361     } else {
362         ssize_t err = mHTTPDownloader->fetchFile(keyURI.c_str(), &key);
363 
364         if (err == ERROR_NOT_CONNECTED) {
365             return ERROR_NOT_CONNECTED;
366         } else if (err < 0) {
367             ALOGE("failed to fetch cipher key from '%s'.", keyURI.c_str());
368             return ERROR_IO;
369         } else if (key->size() != 16) {
370             ALOGE("key file '%s' wasn't 16 bytes in size.", keyURI.c_str());
371             return ERROR_MALFORMED;
372         }
373 
374         mAESKeyForURI.add(keyURI, key);
375     }
376 
377     if (first) {
378         // If decrypting the first block in a file, read the iv from the manifest
379         // or derive the iv from the file's sequence number.
380 
381         unsigned char AESInitVec[AES_BLOCK_SIZE];
382         AString iv;
383         if (itemMeta->findString("cipher-iv", &iv)) {
384             if ((!iv.startsWith("0x") && !iv.startsWith("0X"))
385                     || iv.size() > 16 * 2 + 2) {
386                 ALOGE("malformed cipher IV '%s'.", iv.c_str());
387                 return ERROR_MALFORMED;
388             }
389 
390             while (iv.size() < 16 * 2 + 2) {
391                 iv.insert("0", 1, 2);
392             }
393 
394             memset(AESInitVec, 0, sizeof(AESInitVec));
395             for (size_t i = 0; i < 16; ++i) {
396                 char c1 = tolower(iv.c_str()[2 + 2 * i]);
397                 char c2 = tolower(iv.c_str()[3 + 2 * i]);
398                 if (!isxdigit(c1) || !isxdigit(c2)) {
399                     ALOGE("malformed cipher IV '%s'.", iv.c_str());
400                     return ERROR_MALFORMED;
401                 }
402                 uint8_t nibble1 = isdigit(c1) ? c1 - '0' : c1 - 'a' + 10;
403                 uint8_t nibble2 = isdigit(c2) ? c2 - '0' : c2 - 'a' + 10;
404 
405                 AESInitVec[i] = nibble1 << 4 | nibble2;
406             }
407         } else {
408             memset(AESInitVec, 0, sizeof(AESInitVec));
409             AESInitVec[15] = mSeqNumber & 0xff;
410             AESInitVec[14] = (mSeqNumber >> 8) & 0xff;
411             AESInitVec[13] = (mSeqNumber >> 16) & 0xff;
412             AESInitVec[12] = (mSeqNumber >> 24) & 0xff;
413         }
414 
415         bool newKey = memcmp(mKeyData, key->data(), AES_BLOCK_SIZE) != 0;
416         bool newInitVec = memcmp(mAESInitVec, AESInitVec, AES_BLOCK_SIZE) != 0;
417         bool newSampleAesKeyItem = newKey || newInitVec;
418         ALOGV("decryptBuffer: SAMPLE-AES newKeyItem %d/%d (Key %d initVec %d)",
419                 mSampleAesKeyItemChanged, newSampleAesKeyItem, newKey, newInitVec);
420 
421         if (newSampleAesKeyItem) {
422             memcpy(mKeyData, key->data(), AES_BLOCK_SIZE);
423             memcpy(mAESInitVec, AESInitVec, AES_BLOCK_SIZE);
424 
425             if (method == "SAMPLE-AES") {
426                 mSampleAesKeyItemChanged = true;
427 
428                 sp<ABuffer> keyDataBuffer = ABuffer::CreateAsCopy(mKeyData, sizeof(mKeyData));
429                 sp<ABuffer> initVecBuffer = ABuffer::CreateAsCopy(mAESInitVec, sizeof(mAESInitVec));
430 
431                 // always allocating a new one rather than updating the old message
432                 // lower layer might still have a reference to the old message
433                 mSampleAesKeyItem = new AMessage();
434                 mSampleAesKeyItem->setBuffer("keyData", keyDataBuffer);
435                 mSampleAesKeyItem->setBuffer("initVec", initVecBuffer);
436 
437                 ALOGV("decryptBuffer: New SampleAesKeyItem: Key: %s  IV: %s",
438                         HlsSampleDecryptor::aesBlockToStr(mKeyData).c_str(),
439                         HlsSampleDecryptor::aesBlockToStr(mAESInitVec).c_str());
440             } // SAMPLE-AES
441         } // newSampleAesKeyItem
442     } // first
443 
444     if (method == "SAMPLE-AES") {
445         ALOGV("decryptBuffer: skipping full-seg decrypt for SAMPLE-AES");
446         return OK;
447     }
448 
449 
450     AES_KEY aes_key;
451     if (AES_set_decrypt_key(key->data(), 128, &aes_key) != 0) {
452         ALOGE("failed to set AES decryption key.");
453         return UNKNOWN_ERROR;
454     }
455 
456     size_t n = buffer->size();
457     if (!n) {
458         return OK;
459     }
460 
461     if (n < 16 || n % 16) {
462         ALOGE("not enough or trailing bytes (%zu) in encrypted buffer", n);
463         return ERROR_MALFORMED;
464     }
465 
466     AES_cbc_encrypt(
467             buffer->data(), buffer->data(), buffer->size(),
468             &aes_key, mAESInitVec, AES_DECRYPT);
469 
470     return OK;
471 }
472 
checkDecryptPadding(const sp<ABuffer> & buffer)473 status_t PlaylistFetcher::checkDecryptPadding(const sp<ABuffer> &buffer) {
474     AString method;
475     CHECK(buffer->meta()->findString("cipher-method", &method));
476     if (method == "NONE" || method == "SAMPLE-AES") {
477         return OK;
478     }
479 
480     uint8_t padding = 0;
481     if (buffer->size() > 0) {
482         padding = buffer->data()[buffer->size() - 1];
483     }
484 
485     if (padding > 16) {
486         return ERROR_MALFORMED;
487     }
488 
489     for (size_t i = buffer->size() - padding; i < padding; i++) {
490         if (buffer->data()[i] != padding) {
491             return ERROR_MALFORMED;
492         }
493     }
494 
495     buffer->setRange(buffer->offset(), buffer->size() - padding);
496     return OK;
497 }
498 
postMonitorQueue(int64_t delayUs,int64_t minDelayUs)499 void PlaylistFetcher::postMonitorQueue(int64_t delayUs, int64_t minDelayUs) {
500     int64_t maxDelayUs = delayUsToRefreshPlaylist();
501     if (maxDelayUs < minDelayUs) {
502         maxDelayUs = minDelayUs;
503     }
504     if (delayUs > maxDelayUs) {
505         FLOGV("Need to refresh playlist in %lld", (long long)maxDelayUs);
506         delayUs = maxDelayUs;
507     }
508     sp<AMessage> msg = new AMessage(kWhatMonitorQueue, this);
509     msg->setInt32("generation", mMonitorQueueGeneration);
510     msg->post(delayUs);
511 }
512 
cancelMonitorQueue()513 void PlaylistFetcher::cancelMonitorQueue() {
514     ++mMonitorQueueGeneration;
515 }
516 
setStoppingThreshold(float thresholdRatio,bool disconnect)517 void PlaylistFetcher::setStoppingThreshold(float thresholdRatio, bool disconnect) {
518     {
519         AutoMutex _l(mThresholdLock);
520         mThresholdRatio = thresholdRatio;
521     }
522     if (disconnect) {
523         mHTTPDownloader->disconnect();
524     }
525 }
526 
resetStoppingThreshold(bool disconnect)527 void PlaylistFetcher::resetStoppingThreshold(bool disconnect) {
528     {
529         AutoMutex _l(mThresholdLock);
530         mThresholdRatio = -1.0f;
531     }
532     if (disconnect) {
533         mHTTPDownloader->disconnect();
534     } else {
535         // allow reconnect
536         mHTTPDownloader->reconnect();
537     }
538 }
539 
getStoppingThreshold()540 float PlaylistFetcher::getStoppingThreshold() {
541     AutoMutex _l(mThresholdLock);
542     return mThresholdRatio;
543 }
544 
startAsync(const sp<AnotherPacketSource> & audioSource,const sp<AnotherPacketSource> & videoSource,const sp<AnotherPacketSource> & subtitleSource,const sp<AnotherPacketSource> & metadataSource,int64_t startTimeUs,int64_t segmentStartTimeUs,int32_t startDiscontinuitySeq,LiveSession::SeekMode seekMode)545 void PlaylistFetcher::startAsync(
546         const sp<AnotherPacketSource> &audioSource,
547         const sp<AnotherPacketSource> &videoSource,
548         const sp<AnotherPacketSource> &subtitleSource,
549         const sp<AnotherPacketSource> &metadataSource,
550         int64_t startTimeUs,
551         int64_t segmentStartTimeUs,
552         int32_t startDiscontinuitySeq,
553         LiveSession::SeekMode seekMode) {
554     sp<AMessage> msg = new AMessage(kWhatStart, this);
555 
556     uint32_t streamTypeMask = 0ul;
557 
558     if (audioSource != NULL) {
559         msg->setPointer("audioSource", audioSource.get());
560         streamTypeMask |= LiveSession::STREAMTYPE_AUDIO;
561     }
562 
563     if (videoSource != NULL) {
564         msg->setPointer("videoSource", videoSource.get());
565         streamTypeMask |= LiveSession::STREAMTYPE_VIDEO;
566     }
567 
568     if (subtitleSource != NULL) {
569         msg->setPointer("subtitleSource", subtitleSource.get());
570         streamTypeMask |= LiveSession::STREAMTYPE_SUBTITLES;
571     }
572 
573     if (metadataSource != NULL) {
574         msg->setPointer("metadataSource", metadataSource.get());
575         // metadataSource does not affect streamTypeMask.
576     }
577 
578     msg->setInt32("streamTypeMask", streamTypeMask);
579     msg->setInt64("startTimeUs", startTimeUs);
580     msg->setInt64("segmentStartTimeUs", segmentStartTimeUs);
581     msg->setInt32("startDiscontinuitySeq", startDiscontinuitySeq);
582     msg->setInt32("seekMode", seekMode);
583     msg->post();
584 }
585 
586 /*
587  * pauseAsync
588  *
589  * threshold: 0.0f - pause after current fetch block (default 47Kbytes)
590  *           -1.0f - pause after finishing current segment
591  *        0.0~1.0f - pause if remaining of current segment exceeds threshold
592  */
pauseAsync(float thresholdRatio,bool disconnect)593 void PlaylistFetcher::pauseAsync(
594         float thresholdRatio, bool disconnect) {
595     setStoppingThreshold(thresholdRatio, disconnect);
596 
597     (new AMessage(kWhatPause, this))->post();
598 }
599 
stopAsync(bool clear)600 void PlaylistFetcher::stopAsync(bool clear) {
601     setStoppingThreshold(0.0f, true /* disconncect */);
602 
603     sp<AMessage> msg = new AMessage(kWhatStop, this);
604     msg->setInt32("clear", clear);
605     msg->post();
606 }
607 
resumeUntilAsync(const sp<AMessage> & params)608 void PlaylistFetcher::resumeUntilAsync(const sp<AMessage> &params) {
609     FLOGV("resumeUntilAsync: params=%s", params->debugString().c_str());
610 
611     AMessage* msg = new AMessage(kWhatResumeUntil, this);
612     msg->setMessage("params", params);
613     msg->post();
614 }
615 
fetchPlaylistAsync()616 void PlaylistFetcher::fetchPlaylistAsync() {
617     (new AMessage(kWhatFetchPlaylist, this))->post();
618 }
619 
onMessageReceived(const sp<AMessage> & msg)620 void PlaylistFetcher::onMessageReceived(const sp<AMessage> &msg) {
621     switch (msg->what()) {
622         case kWhatStart:
623         {
624             status_t err = onStart(msg);
625 
626             sp<AMessage> notify = mNotify->dup();
627             notify->setInt32("what", kWhatStarted);
628             notify->setInt32("err", err);
629             notify->post();
630             break;
631         }
632 
633         case kWhatPause:
634         {
635             onPause();
636 
637             sp<AMessage> notify = mNotify->dup();
638             notify->setInt32("what", kWhatPaused);
639             notify->setInt32("seekMode",
640                     mDownloadState->hasSavedState()
641                     ? LiveSession::kSeekModeNextSample
642                     : LiveSession::kSeekModeNextSegment);
643             notify->post();
644             break;
645         }
646 
647         case kWhatStop:
648         {
649             onStop(msg);
650 
651             sp<AMessage> notify = mNotify->dup();
652             notify->setInt32("what", kWhatStopped);
653             notify->post();
654             break;
655         }
656 
657         case kWhatFetchPlaylist:
658         {
659             bool unchanged;
660             sp<M3UParser> playlist = mHTTPDownloader->fetchPlaylist(
661                     mURI.c_str(), NULL /* curPlaylistHash */, &unchanged);
662 
663             sp<AMessage> notify = mNotify->dup();
664             notify->setInt32("what", kWhatPlaylistFetched);
665             notify->setObject("playlist", playlist);
666             notify->post();
667             break;
668         }
669 
670         case kWhatMonitorQueue:
671         case kWhatDownloadNext:
672         {
673             int32_t generation;
674             CHECK(msg->findInt32("generation", &generation));
675 
676             if (generation != mMonitorQueueGeneration) {
677                 // Stale event
678                 break;
679             }
680 
681             if (msg->what() == kWhatMonitorQueue) {
682                 onMonitorQueue();
683             } else {
684                 onDownloadNext();
685             }
686             break;
687         }
688 
689         case kWhatResumeUntil:
690         {
691             onResumeUntil(msg);
692             break;
693         }
694 
695         default:
696             TRESPASS();
697     }
698 }
699 
onStart(const sp<AMessage> & msg)700 status_t PlaylistFetcher::onStart(const sp<AMessage> &msg) {
701     mPacketSources.clear();
702     mStopParams.clear();
703     mStartTimeUsNotify = mNotify->dup();
704     mStartTimeUsNotify->setInt32("what", kWhatStartedAt);
705     mStartTimeUsNotify->setString("uri", mURI);
706 
707     uint32_t streamTypeMask;
708     CHECK(msg->findInt32("streamTypeMask", (int32_t *)&streamTypeMask));
709 
710     int64_t startTimeUs;
711     int64_t segmentStartTimeUs;
712     int32_t startDiscontinuitySeq;
713     int32_t seekMode;
714     CHECK(msg->findInt64("startTimeUs", &startTimeUs));
715     CHECK(msg->findInt64("segmentStartTimeUs", &segmentStartTimeUs));
716     CHECK(msg->findInt32("startDiscontinuitySeq", &startDiscontinuitySeq));
717     CHECK(msg->findInt32("seekMode", &seekMode));
718 
719     if (streamTypeMask & LiveSession::STREAMTYPE_AUDIO) {
720         void *ptr;
721         CHECK(msg->findPointer("audioSource", &ptr));
722 
723         mPacketSources.add(
724                 LiveSession::STREAMTYPE_AUDIO,
725                 static_cast<AnotherPacketSource *>(ptr));
726     }
727 
728     if (streamTypeMask & LiveSession::STREAMTYPE_VIDEO) {
729         void *ptr;
730         CHECK(msg->findPointer("videoSource", &ptr));
731 
732         mPacketSources.add(
733                 LiveSession::STREAMTYPE_VIDEO,
734                 static_cast<AnotherPacketSource *>(ptr));
735     }
736 
737     if (streamTypeMask & LiveSession::STREAMTYPE_SUBTITLES) {
738         void *ptr;
739         CHECK(msg->findPointer("subtitleSource", &ptr));
740 
741         mPacketSources.add(
742                 LiveSession::STREAMTYPE_SUBTITLES,
743                 static_cast<AnotherPacketSource *>(ptr));
744     }
745 
746     void *ptr;
747     // metadataSource is not part of streamTypeMask
748     if ((streamTypeMask & (LiveSession::STREAMTYPE_AUDIO | LiveSession::STREAMTYPE_VIDEO))
749             && msg->findPointer("metadataSource", &ptr)) {
750         mPacketSources.add(
751                 LiveSession::STREAMTYPE_METADATA,
752                 static_cast<AnotherPacketSource *>(ptr));
753     }
754 
755     mStreamTypeMask = streamTypeMask;
756 
757     mSegmentStartTimeUs = segmentStartTimeUs;
758 
759     if (startDiscontinuitySeq >= 0) {
760         mDiscontinuitySeq = startDiscontinuitySeq;
761     }
762 
763     mRefreshState = INITIAL_MINIMUM_RELOAD_DELAY;
764     mSeekMode = (LiveSession::SeekMode) seekMode;
765 
766     if (startTimeUs >= 0 || mSeekMode == LiveSession::kSeekModeNextSample) {
767         mStartup = true;
768         mIDRFound = false;
769         mVideoBuffer->clear();
770     }
771 
772     if (startTimeUs >= 0) {
773         mStartTimeUs = startTimeUs;
774         mFirstPTSValid = false;
775         mSeqNumber = -1;
776         mTimeChangeSignaled = false;
777         mDownloadState->resetState();
778     }
779 
780     postMonitorQueue();
781 
782     return OK;
783 }
784 
onPause()785 void PlaylistFetcher::onPause() {
786     cancelMonitorQueue();
787     mLastDiscontinuitySeq = mDiscontinuitySeq;
788 
789     resetStoppingThreshold(false /* disconnect */);
790 }
791 
onStop(const sp<AMessage> & msg)792 void PlaylistFetcher::onStop(const sp<AMessage> &msg) {
793     cancelMonitorQueue();
794 
795     int32_t clear;
796     CHECK(msg->findInt32("clear", &clear));
797     if (clear) {
798         for (size_t i = 0; i < mPacketSources.size(); i++) {
799             sp<AnotherPacketSource> packetSource = mPacketSources.valueAt(i);
800             packetSource->clear();
801         }
802     }
803 
804     mDownloadState->resetState();
805     mPacketSources.clear();
806     mStreamTypeMask = 0;
807 
808     resetStoppingThreshold(true /* disconnect */);
809 }
810 
811 // Resume until we have reached the boundary timestamps listed in `msg`; when
812 // the remaining time is too short (within a resume threshold) stop immediately
813 // instead.
onResumeUntil(const sp<AMessage> & msg)814 status_t PlaylistFetcher::onResumeUntil(const sp<AMessage> &msg) {
815     sp<AMessage> params;
816     CHECK(msg->findMessage("params", &params));
817 
818     mStopParams = params;
819     onDownloadNext();
820 
821     return OK;
822 }
823 
notifyStopReached()824 void PlaylistFetcher::notifyStopReached() {
825     sp<AMessage> notify = mNotify->dup();
826     notify->setInt32("what", kWhatStopReached);
827     notify->post();
828 }
829 
notifyError(status_t err)830 void PlaylistFetcher::notifyError(status_t err) {
831     sp<AMessage> notify = mNotify->dup();
832     notify->setInt32("what", kWhatError);
833     notify->setInt32("err", err);
834     notify->post();
835 }
836 
queueDiscontinuity(ATSParser::DiscontinuityType type,const sp<AMessage> & extra)837 void PlaylistFetcher::queueDiscontinuity(
838         ATSParser::DiscontinuityType type, const sp<AMessage> &extra) {
839     for (size_t i = 0; i < mPacketSources.size(); ++i) {
840         // do not discard buffer upon #EXT-X-DISCONTINUITY tag
841         // (seek will discard buffer by abandoning old fetchers)
842         mPacketSources.valueAt(i)->queueDiscontinuity(
843                 type, extra, false /* discard */);
844     }
845 }
846 
onMonitorQueue()847 void PlaylistFetcher::onMonitorQueue() {
848     // in the middle of an unfinished download, delay
849     // playlist refresh as it'll change seq numbers
850     if (!mDownloadState->hasSavedState()) {
851         refreshPlaylist();
852     }
853 
854     int64_t targetDurationUs = kMinBufferedDurationUs;
855     if (mPlaylist != NULL) {
856         targetDurationUs = mPlaylist->getTargetDuration();
857     }
858 
859     int64_t bufferedDurationUs = 0ll;
860     status_t finalResult = OK;
861     if (mStreamTypeMask == LiveSession::STREAMTYPE_SUBTITLES) {
862         sp<AnotherPacketSource> packetSource =
863             mPacketSources.valueFor(LiveSession::STREAMTYPE_SUBTITLES);
864 
865         bufferedDurationUs =
866                 packetSource->getBufferedDurationUs(&finalResult);
867     } else {
868         // Use min stream duration, but ignore streams that never have any packet
869         // enqueued to prevent us from waiting on a non-existent stream;
870         // when we cannot make out from the manifest what streams are included in
871         // a playlist we might assume extra streams.
872         bufferedDurationUs = -1ll;
873         for (size_t i = 0; i < mPacketSources.size(); ++i) {
874             if ((mStreamTypeMask & mPacketSources.keyAt(i)) == 0
875                     || mPacketSources[i]->getLatestEnqueuedMeta() == NULL) {
876                 continue;
877             }
878 
879             int64_t bufferedStreamDurationUs =
880                 mPacketSources.valueAt(i)->getBufferedDurationUs(&finalResult);
881 
882             FSLOGV(mPacketSources.keyAt(i), "buffered %lld", (long long)bufferedStreamDurationUs);
883 
884             if (bufferedDurationUs == -1ll
885                  || bufferedStreamDurationUs < bufferedDurationUs) {
886                 bufferedDurationUs = bufferedStreamDurationUs;
887             }
888         }
889         if (bufferedDurationUs == -1ll) {
890             bufferedDurationUs = 0ll;
891         }
892     }
893 
894     if (finalResult == OK && bufferedDurationUs < kMinBufferedDurationUs) {
895         FLOGV("monitoring, buffered=%lld < %lld",
896                 (long long)bufferedDurationUs, (long long)kMinBufferedDurationUs);
897 
898         // delay the next download slightly; hopefully this gives other concurrent fetchers
899         // a better chance to run.
900         // onDownloadNext();
901         sp<AMessage> msg = new AMessage(kWhatDownloadNext, this);
902         msg->setInt32("generation", mMonitorQueueGeneration);
903         msg->post(1000l);
904     } else {
905         // We'd like to maintain buffering above durationToBufferUs, so try
906         // again when buffer just about to go below durationToBufferUs
907         // (or after targetDurationUs / 2, whichever is smaller).
908         int64_t delayUs = bufferedDurationUs - kMinBufferedDurationUs + 1000000ll;
909         if (delayUs > targetDurationUs / 2) {
910             delayUs = targetDurationUs / 2;
911         }
912 
913         FLOGV("pausing for %lld, buffered=%lld > %lld",
914                 (long long)delayUs,
915                 (long long)bufferedDurationUs,
916                 (long long)kMinBufferedDurationUs);
917 
918         postMonitorQueue(delayUs);
919     }
920 }
921 
refreshPlaylist()922 status_t PlaylistFetcher::refreshPlaylist() {
923     if (delayUsToRefreshPlaylist() <= 0) {
924         bool unchanged;
925         sp<M3UParser> playlist = mHTTPDownloader->fetchPlaylist(
926                 mURI.c_str(), mPlaylistHash, &unchanged);
927 
928         if (playlist == NULL) {
929             if (unchanged) {
930                 // We succeeded in fetching the playlist, but it was
931                 // unchanged from the last time we tried.
932 
933                 if (mRefreshState != THIRD_UNCHANGED_RELOAD_ATTEMPT) {
934                     mRefreshState = (RefreshState)(mRefreshState + 1);
935                 }
936             } else {
937                 ALOGE("failed to load playlist at url '%s'", uriDebugString(mURI).c_str());
938                 return ERROR_IO;
939             }
940         } else {
941             mRefreshState = INITIAL_MINIMUM_RELOAD_DELAY;
942             mPlaylist = playlist;
943 
944             if (mPlaylist->isComplete() || mPlaylist->isEvent()) {
945                 updateDuration();
946             }
947             // Notify LiveSession to use target-duration based buffering level
948             // for up/down switch. Default LiveSession::kUpSwitchMark may not
949             // be reachable for live streams, as our max buffering amount is
950             // limited to 3 segments.
951             if (!mPlaylist->isComplete()) {
952                 updateTargetDuration();
953             }
954             mPlaylistTimeUs = ALooper::GetNowUs();
955         }
956 
957         mLastPlaylistFetchTimeUs = ALooper::GetNowUs();
958     }
959     return OK;
960 }
961 
962 // static
bufferStartsWithTsSyncByte(const sp<ABuffer> & buffer)963 bool PlaylistFetcher::bufferStartsWithTsSyncByte(const sp<ABuffer>& buffer) {
964     return buffer->size() > 0 && buffer->data()[0] == 0x47;
965 }
966 
shouldPauseDownload()967 bool PlaylistFetcher::shouldPauseDownload() {
968     if (mStreamTypeMask == LiveSession::STREAMTYPE_SUBTITLES) {
969         // doesn't apply to subtitles
970         return false;
971     }
972 
973     // Calculate threshold to abort current download
974     float thresholdRatio = getStoppingThreshold();
975 
976     if (thresholdRatio < 0.0f) {
977         // never abort
978         return false;
979     } else if (thresholdRatio == 0.0f) {
980         // immediately abort
981         return true;
982     }
983 
984     // now we have a positive thresholdUs, abort if remaining
985     // portion to download is over that threshold.
986     if (mSegmentFirstPTS < 0) {
987         // this means we haven't even find the first access unit,
988         // abort now as we must be very far away from the end.
989         return true;
990     }
991     int64_t lastEnqueueUs = mSegmentFirstPTS;
992     for (size_t i = 0; i < mPacketSources.size(); ++i) {
993         if ((mStreamTypeMask & mPacketSources.keyAt(i)) == 0) {
994             continue;
995         }
996         sp<AMessage> meta = mPacketSources[i]->getLatestEnqueuedMeta();
997         int32_t type;
998         if (meta == NULL || meta->findInt32("discontinuity", &type)) {
999             continue;
1000         }
1001         int64_t tmpUs;
1002         CHECK(meta->findInt64("timeUs", &tmpUs));
1003         if (tmpUs > lastEnqueueUs) {
1004             lastEnqueueUs = tmpUs;
1005         }
1006     }
1007     lastEnqueueUs -= mSegmentFirstPTS;
1008 
1009     int64_t targetDurationUs = mPlaylist->getTargetDuration();
1010     int64_t thresholdUs = thresholdRatio * targetDurationUs;
1011 
1012     FLOGV("%spausing now, thresholdUs %lld, remaining %lld",
1013             targetDurationUs - lastEnqueueUs > thresholdUs ? "" : "not ",
1014             (long long)thresholdUs,
1015             (long long)(targetDurationUs - lastEnqueueUs));
1016 
1017     if (targetDurationUs - lastEnqueueUs > thresholdUs) {
1018         return true;
1019     }
1020     return false;
1021 }
1022 
initSeqNumberForLiveStream(int32_t & firstSeqNumberInPlaylist,int32_t & lastSeqNumberInPlaylist)1023 void PlaylistFetcher::initSeqNumberForLiveStream(
1024         int32_t &firstSeqNumberInPlaylist,
1025         int32_t &lastSeqNumberInPlaylist) {
1026     // start at least 3 target durations from the end.
1027     int64_t timeFromEnd = 0;
1028     size_t index = mPlaylist->size();
1029     sp<AMessage> itemMeta;
1030     int64_t itemDurationUs;
1031     int32_t targetDuration;
1032     if (mPlaylist->meta() != NULL
1033             && mPlaylist->meta()->findInt32("target-duration", &targetDuration)) {
1034         do {
1035             --index;
1036             if (!mPlaylist->itemAt(index, NULL /* uri */, &itemMeta)
1037                     || !itemMeta->findInt64("durationUs", &itemDurationUs)) {
1038                 ALOGW("item or itemDurationUs missing");
1039                 mSeqNumber = lastSeqNumberInPlaylist - 3;
1040                 break;
1041             }
1042 
1043             timeFromEnd += itemDurationUs;
1044             mSeqNumber = firstSeqNumberInPlaylist + index;
1045         } while (timeFromEnd < targetDuration * 3E6 && index > 0);
1046     } else {
1047         ALOGW("target-duration missing");
1048         mSeqNumber = lastSeqNumberInPlaylist - 3;
1049     }
1050 
1051     if (mSeqNumber < firstSeqNumberInPlaylist) {
1052         mSeqNumber = firstSeqNumberInPlaylist;
1053     }
1054 }
1055 
initDownloadState(AString & uri,sp<AMessage> & itemMeta,int32_t & firstSeqNumberInPlaylist,int32_t & lastSeqNumberInPlaylist)1056 bool PlaylistFetcher::initDownloadState(
1057         AString &uri,
1058         sp<AMessage> &itemMeta,
1059         int32_t &firstSeqNumberInPlaylist,
1060         int32_t &lastSeqNumberInPlaylist) {
1061     status_t err = refreshPlaylist();
1062     firstSeqNumberInPlaylist = 0;
1063     lastSeqNumberInPlaylist = 0;
1064     bool discontinuity = false;
1065 
1066     if (mPlaylist != NULL) {
1067         mPlaylist->getSeqNumberRange(
1068                 &firstSeqNumberInPlaylist, &lastSeqNumberInPlaylist);
1069 
1070         if (mDiscontinuitySeq < 0) {
1071             mDiscontinuitySeq = mPlaylist->getDiscontinuitySeq();
1072         }
1073     }
1074 
1075     mSegmentFirstPTS = -1ll;
1076 
1077     if (mPlaylist != NULL && mSeqNumber < 0) {
1078         CHECK_GE(mStartTimeUs, 0ll);
1079 
1080         if (mSegmentStartTimeUs < 0) {
1081             if (!mPlaylist->isComplete() && !mPlaylist->isEvent()) {
1082                 // this is a live session
1083                 initSeqNumberForLiveStream(firstSeqNumberInPlaylist, lastSeqNumberInPlaylist);
1084             } else {
1085                 // When seeking mSegmentStartTimeUs is unavailable (< 0), we
1086                 // use mStartTimeUs (client supplied timestamp) to determine both start segment
1087                 // and relative position inside a segment
1088                 mSeqNumber = getSeqNumberForTime(mStartTimeUs);
1089                 mStartTimeUs -= getSegmentStartTimeUs(mSeqNumber);
1090             }
1091             mStartTimeUsRelative = true;
1092             FLOGV("Initial sequence number for time %lld is %d from (%d .. %d)",
1093                     (long long)mStartTimeUs, mSeqNumber, firstSeqNumberInPlaylist,
1094                     lastSeqNumberInPlaylist);
1095         } else {
1096             // When adapting or track switching, mSegmentStartTimeUs (relative
1097             // to media time 0) is used to determine the start segment; mStartTimeUs (absolute
1098             // timestamps coming from the media container) is used to determine the position
1099             // inside a segments.
1100             if (mStreamTypeMask != LiveSession::STREAMTYPE_SUBTITLES
1101                     && mSeekMode != LiveSession::kSeekModeNextSample) {
1102                 // avoid double fetch/decode
1103                 // Use (mSegmentStartTimeUs + 1/2 * targetDurationUs) to search
1104                 // for the starting segment in new variant.
1105                 // If the two variants' segments are aligned, this gives the
1106                 // next segment. If they're not aligned, this gives the segment
1107                 // that overlaps no more than 1/2 * targetDurationUs.
1108                 mSeqNumber = getSeqNumberForTime(mSegmentStartTimeUs
1109                         + mPlaylist->getTargetDuration() / 2);
1110             } else {
1111                 mSeqNumber = getSeqNumberForTime(mSegmentStartTimeUs);
1112             }
1113             ssize_t minSeq = getSeqNumberForDiscontinuity(mDiscontinuitySeq);
1114             if (mSeqNumber < minSeq) {
1115                 mSeqNumber = minSeq;
1116             }
1117 
1118             if (mSeqNumber < firstSeqNumberInPlaylist) {
1119                 mSeqNumber = firstSeqNumberInPlaylist;
1120             }
1121 
1122             if (mSeqNumber > lastSeqNumberInPlaylist) {
1123                 mSeqNumber = lastSeqNumberInPlaylist;
1124             }
1125             FLOGV("Initial sequence number is %d from (%d .. %d)",
1126                     mSeqNumber, firstSeqNumberInPlaylist,
1127                     lastSeqNumberInPlaylist);
1128         }
1129     }
1130 
1131     // if mPlaylist is NULL then err must be non-OK; but the other way around might not be true
1132     if (mSeqNumber < firstSeqNumberInPlaylist
1133             || mSeqNumber > lastSeqNumberInPlaylist
1134             || err != OK) {
1135         if ((err != OK || !mPlaylist->isComplete()) && mNumRetries < kMaxNumRetries) {
1136             ++mNumRetries;
1137 
1138             if (mSeqNumber > lastSeqNumberInPlaylist || err != OK) {
1139                 // make sure we reach this retry logic on refresh failures
1140                 // by adding an err != OK clause to all enclosing if's.
1141 
1142                 // refresh in increasing fraction (1/2, 1/3, ...) of the
1143                 // playlist's target duration or 3 seconds, whichever is less
1144                 int64_t delayUs = kMaxMonitorDelayUs;
1145                 if (mPlaylist != NULL) {
1146                     delayUs = mPlaylist->size() * mPlaylist->getTargetDuration()
1147                             / (1 + mNumRetries);
1148                 }
1149                 if (delayUs > kMaxMonitorDelayUs) {
1150                     delayUs = kMaxMonitorDelayUs;
1151                 }
1152                 FLOGV("sequence number high: %d from (%d .. %d), "
1153                       "monitor in %lld (retry=%d)",
1154                         mSeqNumber, firstSeqNumberInPlaylist,
1155                         lastSeqNumberInPlaylist, (long long)delayUs, mNumRetries);
1156                 postMonitorQueue(delayUs);
1157                 return false;
1158             }
1159 
1160             if (err != OK) {
1161                 notifyError(err);
1162                 return false;
1163             }
1164 
1165             // we've missed the boat, let's start 3 segments prior to the latest sequence
1166             // number available and signal a discontinuity.
1167 
1168             ALOGI("We've missed the boat, restarting playback."
1169                   "  mStartup=%d, was  looking for %d in %d-%d",
1170                     mStartup, mSeqNumber, firstSeqNumberInPlaylist,
1171                     lastSeqNumberInPlaylist);
1172             if (mStopParams != NULL) {
1173                 // we should have kept on fetching until we hit the boundaries in mStopParams,
1174                 // but since the segments we are supposed to fetch have already rolled off
1175                 // the playlist, i.e. we have already missed the boat, we inevitably have to
1176                 // skip.
1177                 notifyStopReached();
1178                 return false;
1179             }
1180             mSeqNumber = lastSeqNumberInPlaylist - 3;
1181             if (mSeqNumber < firstSeqNumberInPlaylist) {
1182                 mSeqNumber = firstSeqNumberInPlaylist;
1183             }
1184             discontinuity = true;
1185 
1186             // fall through
1187         } else {
1188             if (mPlaylist != NULL) {
1189                 if (mSeqNumber >= firstSeqNumberInPlaylist + (int32_t)mPlaylist->size()
1190                         && !mPlaylist->isComplete()) {
1191                     // Live playlists
1192                     ALOGW("sequence number %d not yet available", mSeqNumber);
1193                     postMonitorQueue(delayUsToRefreshPlaylist());
1194                     return false;
1195                 }
1196                 ALOGE("Cannot find sequence number %d in playlist "
1197                      "(contains %d - %d)",
1198                      mSeqNumber, firstSeqNumberInPlaylist,
1199                       firstSeqNumberInPlaylist + (int32_t)mPlaylist->size() - 1);
1200 
1201                 if (mTSParser != NULL) {
1202                     mTSParser->signalEOS(ERROR_END_OF_STREAM);
1203                     // Use an empty buffer; we don't have any new data, just want to extract
1204                     // potential new access units after flush.  Reset mSeqNumber to
1205                     // lastSeqNumberInPlaylist such that we set the correct access unit
1206                     // properties in extractAndQueueAccessUnitsFromTs.
1207                     sp<ABuffer> buffer = new ABuffer(0);
1208                     mSeqNumber = lastSeqNumberInPlaylist;
1209                     extractAndQueueAccessUnitsFromTs(buffer);
1210                 }
1211                 notifyError(ERROR_END_OF_STREAM);
1212             } else {
1213                 // It's possible that we were never able to download the playlist.
1214                 // In this case we should notify error, instead of EOS, as EOS during
1215                 // prepare means we succeeded in downloading everything.
1216                 ALOGE("Failed to download playlist!");
1217                 notifyError(ERROR_IO);
1218             }
1219 
1220             return false;
1221         }
1222     }
1223 
1224     mNumRetries = 0;
1225 
1226     CHECK(mPlaylist->itemAt(
1227                 mSeqNumber - firstSeqNumberInPlaylist,
1228                 &uri,
1229                 &itemMeta));
1230 
1231     CHECK(itemMeta->findInt32("discontinuity-sequence", &mDiscontinuitySeq));
1232 
1233     int32_t val;
1234     if (itemMeta->findInt32("discontinuity", &val) && val != 0) {
1235         discontinuity = true;
1236     } else if (mLastDiscontinuitySeq >= 0
1237             && mDiscontinuitySeq != mLastDiscontinuitySeq) {
1238         // Seek jumped to a new discontinuity sequence. We need to signal
1239         // a format change to decoder. Decoder needs to shutdown and be
1240         // created again if seamless format change is unsupported.
1241         FLOGV("saw discontinuity: mStartup %d, mLastDiscontinuitySeq %d, "
1242                 "mDiscontinuitySeq %d, mStartTimeUs %lld",
1243                 mStartup, mLastDiscontinuitySeq, mDiscontinuitySeq, (long long)mStartTimeUs);
1244         discontinuity = true;
1245     }
1246     mLastDiscontinuitySeq = -1;
1247 
1248     // decrypt a junk buffer to prefetch key; since a session uses only one http connection,
1249     // this avoids interleaved connections to the key and segment file.
1250     {
1251         sp<ABuffer> junk = new ABuffer(16);
1252         junk->setRange(0, 16);
1253         status_t err = decryptBuffer(mSeqNumber - firstSeqNumberInPlaylist, junk,
1254                 true /* first */);
1255         if (err == ERROR_NOT_CONNECTED) {
1256             return false;
1257         } else if (err != OK) {
1258             notifyError(err);
1259             return false;
1260         }
1261     }
1262 
1263     if ((mStartup && !mTimeChangeSignaled) || discontinuity) {
1264         // We need to signal a time discontinuity to ATSParser on the
1265         // first segment after start, or on a discontinuity segment.
1266         // Setting mNextPTSTimeUs informs extractAndQueueAccessUnitsXX()
1267         // to send the time discontinuity.
1268         if (mPlaylist->isComplete() || mPlaylist->isEvent()) {
1269             // If this was a live event this made no sense since
1270             // we don't have access to all the segment before the current
1271             // one.
1272             mNextPTSTimeUs = getSegmentStartTimeUs(mSeqNumber);
1273         }
1274 
1275         // Setting mTimeChangeSignaled to true, so that if start time
1276         // searching goes into 2nd segment (without a discontinuity),
1277         // we don't reset time again. It causes corruption when pending
1278         // data in ATSParser is cleared.
1279         mTimeChangeSignaled = true;
1280     }
1281 
1282     if (discontinuity) {
1283         ALOGI("queueing discontinuity (explicit=%d)", discontinuity);
1284 
1285         // Signal a format discontinuity to ATSParser to clear partial data
1286         // from previous streams. Not doing this causes bitstream corruption.
1287         if (mTSParser != NULL) {
1288             mTSParser.clear();
1289         }
1290 
1291         queueDiscontinuity(
1292                 ATSParser::DISCONTINUITY_FORMAT_ONLY,
1293                 NULL /* extra */);
1294 
1295         if (mStartup && mStartTimeUsRelative && mFirstPTSValid) {
1296             // This means we guessed mStartTimeUs to be in the previous
1297             // segment (likely very close to the end), but either video or
1298             // audio has not found start by the end of that segment.
1299             //
1300             // If this new segment is not a discontinuity, keep searching.
1301             //
1302             // If this new segment even got a discontinuity marker, just
1303             // set mStartTimeUs=0, and take all samples from now on.
1304             mStartTimeUs = 0;
1305             mFirstPTSValid = false;
1306             mIDRFound = false;
1307             mVideoBuffer->clear();
1308         }
1309     }
1310 
1311     FLOGV("fetching segment %d from (%d .. %d)",
1312             mSeqNumber, firstSeqNumberInPlaylist, lastSeqNumberInPlaylist);
1313     return true;
1314 }
1315 
onDownloadNext()1316 void PlaylistFetcher::onDownloadNext() {
1317     AString uri;
1318     sp<AMessage> itemMeta;
1319     sp<ABuffer> buffer;
1320     sp<ABuffer> tsBuffer;
1321     int32_t firstSeqNumberInPlaylist = 0;
1322     int32_t lastSeqNumberInPlaylist = 0;
1323     bool connectHTTP = true;
1324 
1325     if (mDownloadState->hasSavedState()) {
1326         mDownloadState->restoreState(
1327                 uri,
1328                 itemMeta,
1329                 buffer,
1330                 tsBuffer,
1331                 firstSeqNumberInPlaylist,
1332                 lastSeqNumberInPlaylist);
1333         connectHTTP = false;
1334         FLOGV("resuming: '%s'", uri.c_str());
1335     } else {
1336         if (!initDownloadState(
1337                 uri,
1338                 itemMeta,
1339                 firstSeqNumberInPlaylist,
1340                 lastSeqNumberInPlaylist)) {
1341             return;
1342         }
1343         FLOGV("fetching: '%s'", uri.c_str());
1344     }
1345 
1346     int64_t range_offset, range_length;
1347     if (!itemMeta->findInt64("range-offset", &range_offset)
1348             || !itemMeta->findInt64("range-length", &range_length)) {
1349         range_offset = 0;
1350         range_length = -1;
1351     }
1352 
1353     // block-wise download
1354     bool shouldPause = false;
1355     ssize_t bytesRead;
1356     do {
1357         int64_t startUs = ALooper::GetNowUs();
1358         bytesRead = mHTTPDownloader->fetchBlock(
1359                 uri.c_str(), &buffer, range_offset, range_length, kDownloadBlockSize,
1360                 NULL /* actualURL */, connectHTTP);
1361         int64_t delayUs = ALooper::GetNowUs() - startUs;
1362 
1363         if (bytesRead == ERROR_NOT_CONNECTED) {
1364             return;
1365         }
1366         if (bytesRead < 0) {
1367             status_t err = bytesRead;
1368             ALOGE("failed to fetch .ts segment at url '%s'", uri.c_str());
1369             notifyError(err);
1370             return;
1371         }
1372 
1373         // add sample for bandwidth estimation, excluding samples from subtitles (as
1374         // its too small), or during startup/resumeUntil (when we could have more than
1375         // one connection open which affects bandwidth)
1376         if (!mStartup && mStopParams == NULL && bytesRead > 0
1377                 && (mStreamTypeMask
1378                         & (LiveSession::STREAMTYPE_AUDIO
1379                         | LiveSession::STREAMTYPE_VIDEO))) {
1380             mSession->addBandwidthMeasurement(bytesRead, delayUs);
1381             if (delayUs > 2000000ll) {
1382                 FLOGV("bytesRead %zd took %.2f seconds - abnormal bandwidth dip",
1383                         bytesRead, (double)delayUs / 1.0e6);
1384             }
1385         }
1386 
1387         connectHTTP = false;
1388 
1389         CHECK(buffer != NULL);
1390 
1391         size_t size = buffer->size();
1392         // Set decryption range.
1393         buffer->setRange(size - bytesRead, bytesRead);
1394         status_t err = decryptBuffer(mSeqNumber - firstSeqNumberInPlaylist, buffer,
1395                 buffer->offset() == 0 /* first */);
1396         // Unset decryption range.
1397         buffer->setRange(0, size);
1398 
1399         if (err != OK) {
1400             ALOGE("decryptBuffer failed w/ error %d", err);
1401 
1402             notifyError(err);
1403             return;
1404         }
1405 
1406         bool startUp = mStartup; // save current start up state
1407 
1408         err = OK;
1409         if (bufferStartsWithTsSyncByte(buffer)) {
1410             // Incremental extraction is only supported for MPEG2 transport streams.
1411             if (tsBuffer == NULL) {
1412                 tsBuffer = new ABuffer(buffer->data(), buffer->capacity());
1413                 tsBuffer->setRange(0, 0);
1414             } else if (tsBuffer->capacity() != buffer->capacity()) {
1415                 size_t tsOff = tsBuffer->offset(), tsSize = tsBuffer->size();
1416                 tsBuffer = new ABuffer(buffer->data(), buffer->capacity());
1417                 tsBuffer->setRange(tsOff, tsSize);
1418             }
1419             tsBuffer->setRange(tsBuffer->offset(), tsBuffer->size() + bytesRead);
1420             err = extractAndQueueAccessUnitsFromTs(tsBuffer);
1421         }
1422 
1423         if (err == -EAGAIN) {
1424             // starting sequence number too low/high
1425             mTSParser.clear();
1426             for (size_t i = 0; i < mPacketSources.size(); i++) {
1427                 sp<AnotherPacketSource> packetSource = mPacketSources.valueAt(i);
1428                 packetSource->clear();
1429             }
1430             postMonitorQueue();
1431             return;
1432         } else if (err == ERROR_OUT_OF_RANGE) {
1433             // reached stopping point
1434             notifyStopReached();
1435             return;
1436         } else if (err != OK) {
1437             notifyError(err);
1438             return;
1439         }
1440         // If we're switching, post start notification
1441         // this should only be posted when the last chunk is full processed by TSParser
1442         if (mSeekMode != LiveSession::kSeekModeExactPosition && startUp != mStartup) {
1443             CHECK(mStartTimeUsNotify != NULL);
1444             mStartTimeUsNotify->post();
1445             mStartTimeUsNotify.clear();
1446             shouldPause = true;
1447         }
1448         if (shouldPause || shouldPauseDownload()) {
1449             // save state and return if this is not the last chunk,
1450             // leaving the fetcher in paused state.
1451             if (bytesRead != 0) {
1452                 mDownloadState->saveState(
1453                         uri,
1454                         itemMeta,
1455                         buffer,
1456                         tsBuffer,
1457                         firstSeqNumberInPlaylist,
1458                         lastSeqNumberInPlaylist);
1459                 return;
1460             }
1461             shouldPause = true;
1462         }
1463     } while (bytesRead != 0);
1464 
1465     if (bufferStartsWithTsSyncByte(buffer)) {
1466         // If we don't see a stream in the program table after fetching a full ts segment
1467         // mark it as nonexistent.
1468         ATSParser::SourceType srcTypes[] =
1469                 { ATSParser::VIDEO, ATSParser::AUDIO };
1470         LiveSession::StreamType streamTypes[] =
1471                 { LiveSession::STREAMTYPE_VIDEO, LiveSession::STREAMTYPE_AUDIO };
1472         const size_t kNumTypes = NELEM(srcTypes);
1473 
1474         for (size_t i = 0; i < kNumTypes; i++) {
1475             ATSParser::SourceType srcType = srcTypes[i];
1476             LiveSession::StreamType streamType = streamTypes[i];
1477 
1478             sp<AnotherPacketSource> source =
1479                 static_cast<AnotherPacketSource *>(
1480                     mTSParser->getSource(srcType).get());
1481 
1482             if (!mTSParser->hasSource(srcType)) {
1483                 ALOGW("MPEG2 Transport stream does not contain %s data.",
1484                       srcType == ATSParser::VIDEO ? "video" : "audio");
1485 
1486                 mStreamTypeMask &= ~streamType;
1487                 mPacketSources.removeItem(streamType);
1488             }
1489         }
1490 
1491     }
1492 
1493     if (checkDecryptPadding(buffer) != OK) {
1494         ALOGE("Incorrect padding bytes after decryption.");
1495         notifyError(ERROR_MALFORMED);
1496         return;
1497     }
1498 
1499     if (tsBuffer != NULL) {
1500         AString method;
1501         CHECK(buffer->meta()->findString("cipher-method", &method));
1502         if ((tsBuffer->size() > 0 && method == "NONE")
1503                 || tsBuffer->size() > 16) {
1504             ALOGE("MPEG2 transport stream is not an even multiple of 188 "
1505                     "bytes in length.");
1506             notifyError(ERROR_MALFORMED);
1507             return;
1508         }
1509     }
1510 
1511     // bulk extract non-ts files
1512     bool startUp = mStartup;
1513     if (tsBuffer == NULL) {
1514         status_t err = extractAndQueueAccessUnits(buffer, itemMeta);
1515         if (err == -EAGAIN) {
1516             // starting sequence number too low/high
1517             postMonitorQueue();
1518             return;
1519         } else if (err == ERROR_OUT_OF_RANGE) {
1520             // reached stopping point
1521             notifyStopReached();
1522             return;
1523         } else if (err != OK) {
1524             notifyError(err);
1525             return;
1526         }
1527     }
1528 
1529     ++mSeqNumber;
1530 
1531     // if adapting, pause after found the next starting point
1532     if (mSeekMode != LiveSession::kSeekModeExactPosition && startUp != mStartup) {
1533         CHECK(mStartTimeUsNotify != NULL);
1534         mStartTimeUsNotify->post();
1535         mStartTimeUsNotify.clear();
1536         shouldPause = true;
1537     }
1538 
1539     if (!shouldPause) {
1540         postMonitorQueue();
1541     }
1542 }
1543 
1544 /*
1545  * returns true if we need to adjust mSeqNumber
1546  */
adjustSeqNumberWithAnchorTime(int64_t anchorTimeUs)1547 bool PlaylistFetcher::adjustSeqNumberWithAnchorTime(int64_t anchorTimeUs) {
1548     int32_t firstSeqNumberInPlaylist = mPlaylist->getFirstSeqNumber();
1549 
1550     int64_t minDiffUs, maxDiffUs;
1551     if (mSeekMode == LiveSession::kSeekModeNextSample) {
1552         // if the previous fetcher paused in the middle of a segment, we
1553         // want to start at a segment that overlaps the last sample
1554         minDiffUs = -mPlaylist->getTargetDuration();
1555         maxDiffUs = 0ll;
1556     } else {
1557         // if the previous fetcher paused at the end of a segment, ideally
1558         // we want to start at the segment that's roughly aligned with its
1559         // next segment, but if the two variants are not well aligned we
1560         // adjust the diff to within (-T/2, T/2)
1561         minDiffUs = -mPlaylist->getTargetDuration() / 2;
1562         maxDiffUs = mPlaylist->getTargetDuration() / 2;
1563     }
1564 
1565     int32_t oldSeqNumber = mSeqNumber;
1566     ssize_t index = mSeqNumber - firstSeqNumberInPlaylist;
1567 
1568     // adjust anchorTimeUs to within (minDiffUs, maxDiffUs) from mStartTimeUs
1569     int64_t diffUs = anchorTimeUs - mStartTimeUs;
1570     if (diffUs > maxDiffUs) {
1571         while (index > 0 && diffUs > maxDiffUs) {
1572             --index;
1573 
1574             sp<AMessage> itemMeta;
1575             CHECK(mPlaylist->itemAt(index, NULL /* uri */, &itemMeta));
1576 
1577             int64_t itemDurationUs;
1578             CHECK(itemMeta->findInt64("durationUs", &itemDurationUs));
1579 
1580             diffUs -= itemDurationUs;
1581         }
1582     } else if (diffUs < minDiffUs) {
1583         while (index + 1 < (ssize_t) mPlaylist->size()
1584                 && diffUs < minDiffUs) {
1585             ++index;
1586 
1587             sp<AMessage> itemMeta;
1588             CHECK(mPlaylist->itemAt(index, NULL /* uri */, &itemMeta));
1589 
1590             int64_t itemDurationUs;
1591             CHECK(itemMeta->findInt64("durationUs", &itemDurationUs));
1592 
1593             diffUs += itemDurationUs;
1594         }
1595     }
1596 
1597     mSeqNumber = firstSeqNumberInPlaylist + index;
1598 
1599     if (mSeqNumber != oldSeqNumber) {
1600         FLOGV("guessed wrong seg number: diff %lld out of [%lld, %lld]",
1601                 (long long) anchorTimeUs - mStartTimeUs,
1602                 (long long) minDiffUs,
1603                 (long long) maxDiffUs);
1604         return true;
1605     }
1606     return false;
1607 }
1608 
getSeqNumberForDiscontinuity(size_t discontinuitySeq) const1609 int32_t PlaylistFetcher::getSeqNumberForDiscontinuity(size_t discontinuitySeq) const {
1610     int32_t firstSeqNumberInPlaylist = mPlaylist->getFirstSeqNumber();
1611 
1612     size_t index = 0;
1613     while (index < mPlaylist->size()) {
1614         sp<AMessage> itemMeta;
1615         CHECK(mPlaylist->itemAt( index, NULL /* uri */, &itemMeta));
1616         size_t curDiscontinuitySeq;
1617         CHECK(itemMeta->findInt32("discontinuity-sequence", (int32_t *)&curDiscontinuitySeq));
1618         int32_t seqNumber = firstSeqNumberInPlaylist + index;
1619         if (curDiscontinuitySeq == discontinuitySeq) {
1620             return seqNumber;
1621         } else if (curDiscontinuitySeq > discontinuitySeq) {
1622             return seqNumber <= 0 ? 0 : seqNumber - 1;
1623         }
1624 
1625         ++index;
1626     }
1627 
1628     return firstSeqNumberInPlaylist + mPlaylist->size();
1629 }
1630 
getSeqNumberForTime(int64_t timeUs) const1631 int32_t PlaylistFetcher::getSeqNumberForTime(int64_t timeUs) const {
1632     size_t index = 0;
1633     int64_t segmentStartUs = 0;
1634     while (index < mPlaylist->size()) {
1635         sp<AMessage> itemMeta;
1636         CHECK(mPlaylist->itemAt(
1637                     index, NULL /* uri */, &itemMeta));
1638 
1639         int64_t itemDurationUs;
1640         CHECK(itemMeta->findInt64("durationUs", &itemDurationUs));
1641 
1642         if (timeUs < segmentStartUs + itemDurationUs) {
1643             break;
1644         }
1645 
1646         segmentStartUs += itemDurationUs;
1647         ++index;
1648     }
1649 
1650     if (index >= mPlaylist->size()) {
1651         index = mPlaylist->size() - 1;
1652     }
1653 
1654     return mPlaylist->getFirstSeqNumber() + index;
1655 }
1656 
setAccessUnitProperties(const sp<ABuffer> & accessUnit,const sp<AnotherPacketSource> & source,bool discard)1657 const sp<ABuffer> &PlaylistFetcher::setAccessUnitProperties(
1658         const sp<ABuffer> &accessUnit, const sp<AnotherPacketSource> &source, bool discard) {
1659     sp<MetaData> format = source->getFormat();
1660     if (format != NULL) {
1661         // for simplicity, store a reference to the format in each unit
1662         accessUnit->meta()->setObject("format", format);
1663     }
1664 
1665     if (discard) {
1666         accessUnit->meta()->setInt32("discard", discard);
1667     }
1668 
1669     accessUnit->meta()->setInt32("discontinuitySeq", mDiscontinuitySeq);
1670     accessUnit->meta()->setInt64("segmentStartTimeUs", getSegmentStartTimeUs(mSeqNumber));
1671     accessUnit->meta()->setInt64("segmentFirstTimeUs", mSegmentFirstPTS);
1672     accessUnit->meta()->setInt64("segmentDurationUs", getSegmentDurationUs(mSeqNumber));
1673     if (!mPlaylist->isComplete() && !mPlaylist->isEvent()) {
1674         accessUnit->meta()->setInt64("playlistTimeUs", mPlaylistTimeUs);
1675     }
1676     return accessUnit;
1677 }
1678 
isStartTimeReached(int64_t timeUs)1679 bool PlaylistFetcher::isStartTimeReached(int64_t timeUs) {
1680     if (!mFirstPTSValid) {
1681         mFirstTimeUs = timeUs;
1682         mFirstPTSValid = true;
1683     }
1684     bool startTimeReached = true;
1685     if (mStartTimeUsRelative) {
1686         FLOGV("startTimeUsRelative, timeUs (%lld) - %lld = %lld",
1687                 (long long)timeUs,
1688                 (long long)mFirstTimeUs,
1689                 (long long)(timeUs - mFirstTimeUs));
1690         timeUs -= mFirstTimeUs;
1691         if (timeUs < 0) {
1692             FLOGV("clamp negative timeUs to 0");
1693             timeUs = 0;
1694         }
1695         startTimeReached = (timeUs >= mStartTimeUs);
1696     }
1697     return startTimeReached;
1698 }
1699 
extractAndQueueAccessUnitsFromTs(const sp<ABuffer> & buffer)1700 status_t PlaylistFetcher::extractAndQueueAccessUnitsFromTs(const sp<ABuffer> &buffer) {
1701     if (mTSParser == NULL) {
1702         // Use TS_TIMESTAMPS_ARE_ABSOLUTE so pts carry over between fetchers.
1703         mTSParser = new ATSParser(ATSParser::TS_TIMESTAMPS_ARE_ABSOLUTE);
1704     }
1705 
1706     if (mNextPTSTimeUs >= 0ll) {
1707         sp<AMessage> extra = new AMessage;
1708         // Since we are using absolute timestamps, signal an offset of 0 to prevent
1709         // ATSParser from skewing the timestamps of access units.
1710         extra->setInt64(kATSParserKeyMediaTimeUs, 0);
1711 
1712         // When adapting, signal a recent media time to the parser,
1713         // so that PTS wrap around is handled for the new variant.
1714         if (mStartTimeUs >= 0 && !mStartTimeUsRelative) {
1715             extra->setInt64(kATSParserKeyRecentMediaTimeUs, mStartTimeUs);
1716         }
1717 
1718         mTSParser->signalDiscontinuity(
1719                 ATSParser::DISCONTINUITY_TIME, extra);
1720 
1721         mNextPTSTimeUs = -1ll;
1722     }
1723 
1724     if (mSampleAesKeyItemChanged) {
1725         mTSParser->signalNewSampleAesKey(mSampleAesKeyItem);
1726         mSampleAesKeyItemChanged = false;
1727     }
1728 
1729     size_t offset = 0;
1730     while (offset + 188 <= buffer->size()) {
1731         status_t err = mTSParser->feedTSPacket(buffer->data() + offset, 188);
1732 
1733         if (err != OK) {
1734             return err;
1735         }
1736 
1737         offset += 188;
1738     }
1739     // setRange to indicate consumed bytes.
1740     buffer->setRange(buffer->offset() + offset, buffer->size() - offset);
1741 
1742     if (mSegmentFirstPTS < 0ll) {
1743         // get the smallest first PTS from all streams present in this parser
1744         for (size_t i = mPacketSources.size(); i > 0;) {
1745             i--;
1746             const LiveSession::StreamType stream = mPacketSources.keyAt(i);
1747             if (stream == LiveSession::STREAMTYPE_SUBTITLES) {
1748                 ALOGE("MPEG2 Transport streams do not contain subtitles.");
1749                 return ERROR_MALFORMED;
1750             }
1751             if (stream == LiveSession::STREAMTYPE_METADATA) {
1752                 continue;
1753             }
1754             ATSParser::SourceType type =LiveSession::getSourceTypeForStream(stream);
1755             sp<AnotherPacketSource> source =
1756                 static_cast<AnotherPacketSource *>(
1757                         mTSParser->getSource(type).get());
1758 
1759             if (source == NULL) {
1760                 continue;
1761             }
1762             sp<AMessage> meta = source->getMetaAfterLastDequeued(0);
1763             if (meta != NULL) {
1764                 int64_t timeUs;
1765                 CHECK(meta->findInt64("timeUs", &timeUs));
1766                 if (mSegmentFirstPTS < 0ll || timeUs < mSegmentFirstPTS) {
1767                     mSegmentFirstPTS = timeUs;
1768                 }
1769             }
1770         }
1771         if (mSegmentFirstPTS < 0ll) {
1772             // didn't find any TS packet, can return early
1773             return OK;
1774         }
1775         if (!mStartTimeUsRelative) {
1776             // mStartup
1777             //   mStartup is true until we have queued a packet for all the streams
1778             //   we are fetching. We queue packets whose timestamps are greater than
1779             //   mStartTimeUs.
1780             // mSegmentStartTimeUs >= 0
1781             //   mSegmentStartTimeUs is non-negative when adapting or switching tracks
1782             // adjustSeqNumberWithAnchorTime(timeUs) == true
1783             //   we guessed a seq number that's either too large or too small.
1784             // If this happens, we'll adjust mSeqNumber and restart fetching from new
1785             // location. Note that we only want to adjust once, so set mSegmentStartTimeUs
1786             // to -1 so that we don't enter this chunk next time.
1787             if (mStartup && mSegmentStartTimeUs >= 0
1788                     && adjustSeqNumberWithAnchorTime(mSegmentFirstPTS)) {
1789                 mStartTimeUsNotify = mNotify->dup();
1790                 mStartTimeUsNotify->setInt32("what", kWhatStartedAt);
1791                 mStartTimeUsNotify->setString("uri", mURI);
1792                 mIDRFound = false;
1793                 mSegmentStartTimeUs = -1;
1794                 return -EAGAIN;
1795             }
1796         }
1797     }
1798 
1799     status_t err = OK;
1800     for (size_t i = mPacketSources.size(); i > 0;) {
1801         i--;
1802         sp<AnotherPacketSource> packetSource = mPacketSources.valueAt(i);
1803 
1804         const LiveSession::StreamType stream = mPacketSources.keyAt(i);
1805         if (stream == LiveSession::STREAMTYPE_SUBTITLES) {
1806             ALOGE("MPEG2 Transport streams do not contain subtitles.");
1807             return ERROR_MALFORMED;
1808         }
1809 
1810         const char *key = LiveSession::getKeyForStream(stream);
1811         ATSParser::SourceType type =LiveSession::getSourceTypeForStream(stream);
1812 
1813         sp<AnotherPacketSource> source =
1814             static_cast<AnotherPacketSource *>(
1815                     mTSParser->getSource(type).get());
1816 
1817         if (source == NULL) {
1818             continue;
1819         }
1820 
1821         const char *mime;
1822         sp<MetaData> format  = source->getFormat();
1823         bool isAvc = format != NULL && format->findCString(kKeyMIMEType, &mime)
1824                 && !strcasecmp(mime, MEDIA_MIMETYPE_VIDEO_AVC);
1825 
1826         sp<ABuffer> accessUnit;
1827         status_t finalResult;
1828         while (source->hasBufferAvailable(&finalResult)
1829                 && source->dequeueAccessUnit(&accessUnit) == OK) {
1830 
1831             int64_t timeUs;
1832             CHECK(accessUnit->meta()->findInt64("timeUs", &timeUs));
1833 
1834             if (mStartup) {
1835                 bool startTimeReached = isStartTimeReached(timeUs);
1836 
1837                 if (!startTimeReached || (isAvc && !mIDRFound)) {
1838                     // buffer up to the closest preceding IDR frame in the next segement,
1839                     // or the closest succeeding IDR frame after the exact position
1840                     FSLOGV(stream, "timeUs(%lld)-mStartTimeUs(%lld)=%lld, mIDRFound=%d",
1841                             (long long)timeUs,
1842                             (long long)mStartTimeUs,
1843                             (long long)timeUs - mStartTimeUs,
1844                             mIDRFound);
1845                     if (isAvc) {
1846                         if (IsIDR(accessUnit->data(), accessUnit->size())) {
1847                             mVideoBuffer->clear();
1848                             FSLOGV(stream, "found IDR, clear mVideoBuffer");
1849                             mIDRFound = true;
1850                         }
1851                         if (mIDRFound && mStartTimeUsRelative && !startTimeReached) {
1852                             mVideoBuffer->queueAccessUnit(accessUnit);
1853                             FSLOGV(stream, "saving AVC video AccessUnit");
1854                         }
1855                     }
1856                     if (!startTimeReached || (isAvc && !mIDRFound)) {
1857                         continue;
1858                     }
1859                 }
1860             }
1861 
1862             if (mStartTimeUsNotify != NULL) {
1863                 uint32_t streamMask = 0;
1864                 mStartTimeUsNotify->findInt32("streamMask", (int32_t *) &streamMask);
1865                 if ((mStreamTypeMask & mPacketSources.keyAt(i))
1866                         && !(streamMask & mPacketSources.keyAt(i))) {
1867                     streamMask |= mPacketSources.keyAt(i);
1868                     mStartTimeUsNotify->setInt32("streamMask", streamMask);
1869                     FSLOGV(stream, "found start point, timeUs=%lld, streamMask becomes %x",
1870                             (long long)timeUs, streamMask);
1871 
1872                     if (streamMask == mStreamTypeMask) {
1873                         FLOGV("found start point for all streams");
1874                         mStartup = false;
1875                     }
1876                 }
1877             }
1878 
1879             if (mStopParams != NULL) {
1880                 int32_t discontinuitySeq;
1881                 int64_t stopTimeUs;
1882                 if (!mStopParams->findInt32("discontinuitySeq", &discontinuitySeq)
1883                         || discontinuitySeq > mDiscontinuitySeq
1884                         || !mStopParams->findInt64(key, &stopTimeUs)
1885                         || (discontinuitySeq == mDiscontinuitySeq
1886                                 && timeUs >= stopTimeUs)) {
1887                     FSLOGV(stream, "reached stop point, timeUs=%lld", (long long)timeUs);
1888                     mStreamTypeMask &= ~stream;
1889                     mPacketSources.removeItemsAt(i);
1890                     break;
1891                 }
1892             }
1893 
1894             if (stream == LiveSession::STREAMTYPE_VIDEO) {
1895                 const bool discard = true;
1896                 status_t status;
1897                 while (mVideoBuffer->hasBufferAvailable(&status)) {
1898                     sp<ABuffer> videoBuffer;
1899                     mVideoBuffer->dequeueAccessUnit(&videoBuffer);
1900                     setAccessUnitProperties(videoBuffer, source, discard);
1901                     packetSource->queueAccessUnit(videoBuffer);
1902                     int64_t bufferTimeUs;
1903                     CHECK(videoBuffer->meta()->findInt64("timeUs", &bufferTimeUs));
1904                     FSLOGV(stream, "queueAccessUnit (saved), timeUs=%lld",
1905                             (long long)bufferTimeUs);
1906                 }
1907             } else if (stream == LiveSession::STREAMTYPE_METADATA && !mHasMetadata) {
1908                 mHasMetadata = true;
1909                 sp<AMessage> notify = mNotify->dup();
1910                 notify->setInt32("what", kWhatMetadataDetected);
1911                 notify->post();
1912             }
1913 
1914             setAccessUnitProperties(accessUnit, source);
1915             packetSource->queueAccessUnit(accessUnit);
1916             FSLOGV(stream, "queueAccessUnit, timeUs=%lld", (long long)timeUs);
1917         }
1918 
1919         if (err != OK) {
1920             break;
1921         }
1922     }
1923 
1924     if (err != OK) {
1925         for (size_t i = mPacketSources.size(); i > 0;) {
1926             i--;
1927             sp<AnotherPacketSource> packetSource = mPacketSources.valueAt(i);
1928             packetSource->clear();
1929         }
1930         return err;
1931     }
1932 
1933     if (!mStreamTypeMask) {
1934         // Signal gap is filled between original and new stream.
1935         FLOGV("reached stop point for all streams");
1936         return ERROR_OUT_OF_RANGE;
1937     }
1938 
1939     return OK;
1940 }
1941 
1942 /* static */
bufferStartsWithWebVTTMagicSequence(const sp<ABuffer> & buffer)1943 bool PlaylistFetcher::bufferStartsWithWebVTTMagicSequence(
1944         const sp<ABuffer> &buffer) {
1945     size_t pos = 0;
1946 
1947     // skip possible BOM
1948     if (buffer->size() >= pos + 3 &&
1949             !memcmp("\xef\xbb\xbf", buffer->data() + pos, 3)) {
1950         pos += 3;
1951     }
1952 
1953     // accept WEBVTT followed by SPACE, TAB or (CR) LF
1954     if (buffer->size() < pos + 6 ||
1955             memcmp("WEBVTT", buffer->data() + pos, 6)) {
1956         return false;
1957     }
1958     pos += 6;
1959 
1960     if (buffer->size() == pos) {
1961         return true;
1962     }
1963 
1964     uint8_t sep = buffer->data()[pos];
1965     return sep == ' ' || sep == '\t' || sep == '\n' || sep == '\r';
1966 }
1967 
extractAndQueueAccessUnits(const sp<ABuffer> & buffer,const sp<AMessage> & itemMeta)1968 status_t PlaylistFetcher::extractAndQueueAccessUnits(
1969         const sp<ABuffer> &buffer, const sp<AMessage> &itemMeta) {
1970     if (bufferStartsWithWebVTTMagicSequence(buffer)) {
1971         if (mStreamTypeMask != LiveSession::STREAMTYPE_SUBTITLES) {
1972             ALOGE("This stream only contains subtitles.");
1973             return ERROR_MALFORMED;
1974         }
1975 
1976         const sp<AnotherPacketSource> packetSource =
1977             mPacketSources.valueFor(LiveSession::STREAMTYPE_SUBTITLES);
1978 
1979         int64_t durationUs;
1980         CHECK(itemMeta->findInt64("durationUs", &durationUs));
1981         buffer->meta()->setInt64("timeUs", getSegmentStartTimeUs(mSeqNumber));
1982         buffer->meta()->setInt64("durationUs", durationUs);
1983         buffer->meta()->setInt64("segmentStartTimeUs", getSegmentStartTimeUs(mSeqNumber));
1984         buffer->meta()->setInt32("discontinuitySeq", mDiscontinuitySeq);
1985         buffer->meta()->setInt32("subtitleGeneration", mSubtitleGeneration);
1986         packetSource->queueAccessUnit(buffer);
1987         return OK;
1988     }
1989 
1990     if (mNextPTSTimeUs >= 0ll) {
1991         mNextPTSTimeUs = -1ll;
1992     }
1993 
1994     // This better be an ISO 13818-7 (AAC) or ISO 13818-1 (MPEG) audio
1995     // stream prefixed by an ID3 tag.
1996 
1997     bool firstID3Tag = true;
1998     uint64_t PTS = 0;
1999 
2000     for (;;) {
2001         // Make sure to skip all ID3 tags preceding the audio data.
2002         // At least one must be present to provide the PTS timestamp.
2003 
2004         ID3 id3(buffer->data(), buffer->size(), true /* ignoreV1 */);
2005         if (!id3.isValid()) {
2006             if (firstID3Tag) {
2007                 ALOGE("Unable to parse ID3 tag.");
2008                 return ERROR_MALFORMED;
2009             } else {
2010                 break;
2011             }
2012         }
2013 
2014         if (firstID3Tag) {
2015             bool found = false;
2016 
2017             ID3::Iterator it(id3, "PRIV");
2018             while (!it.done()) {
2019                 size_t length;
2020                 const uint8_t *data = it.getData(&length);
2021                 if (!data) {
2022                     return ERROR_MALFORMED;
2023                 }
2024 
2025                 static const char *kMatchName =
2026                     "com.apple.streaming.transportStreamTimestamp";
2027                 static const size_t kMatchNameLen = strlen(kMatchName);
2028 
2029                 if (length == kMatchNameLen + 1 + 8
2030                         && !strncmp((const char *)data, kMatchName, kMatchNameLen)) {
2031                     found = true;
2032                     PTS = U64_AT(&data[kMatchNameLen + 1]);
2033                 }
2034 
2035                 it.next();
2036             }
2037 
2038             if (!found) {
2039                 ALOGE("Unable to extract transportStreamTimestamp from ID3 tag.");
2040                 return ERROR_MALFORMED;
2041             }
2042         }
2043 
2044         // skip the ID3 tag
2045         buffer->setRange(
2046                 buffer->offset() + id3.rawSize(), buffer->size() - id3.rawSize());
2047 
2048         firstID3Tag = false;
2049     }
2050 
2051     if (mStreamTypeMask != LiveSession::STREAMTYPE_AUDIO) {
2052         ALOGW("This stream only contains audio data!");
2053 
2054         mStreamTypeMask &= LiveSession::STREAMTYPE_AUDIO;
2055 
2056         if (mStreamTypeMask == 0) {
2057             return OK;
2058         }
2059     }
2060 
2061     sp<AnotherPacketSource> packetSource =
2062         mPacketSources.valueFor(LiveSession::STREAMTYPE_AUDIO);
2063 
2064     if (packetSource->getFormat() == NULL && buffer->size() >= 7) {
2065         ABitReader bits(buffer->data(), buffer->size());
2066 
2067         // adts_fixed_header
2068 
2069         CHECK_EQ(bits.getBits(12), 0xfffu);
2070         bits.skipBits(3);  // ID, layer
2071         bool protection_absent __unused = bits.getBits(1) != 0;
2072 
2073         unsigned profile = bits.getBits(2);
2074         CHECK_NE(profile, 3u);
2075         unsigned sampling_freq_index = bits.getBits(4);
2076         bits.getBits(1);  // private_bit
2077         unsigned channel_configuration = bits.getBits(3);
2078         CHECK_NE(channel_configuration, 0u);
2079         bits.skipBits(2);  // original_copy, home
2080 
2081         sp<MetaData> meta = new MetaData();
2082         MakeAACCodecSpecificData(*meta,
2083                 profile, sampling_freq_index, channel_configuration);
2084 
2085         meta->setInt32(kKeyIsADTS, true);
2086 
2087         packetSource->setFormat(meta);
2088     }
2089 
2090     int64_t numSamples = 0ll;
2091     int32_t sampleRate;
2092     CHECK(packetSource->getFormat()->findInt32(kKeySampleRate, &sampleRate));
2093 
2094     int64_t timeUs = (PTS * 100ll) / 9ll;
2095     if (mStartup && !mFirstPTSValid) {
2096         mFirstPTSValid = true;
2097         mFirstTimeUs = timeUs;
2098     }
2099 
2100     if (mSegmentFirstPTS < 0ll) {
2101         mSegmentFirstPTS = timeUs;
2102         if (!mStartTimeUsRelative) {
2103             // Duplicated logic from how we handle .ts playlists.
2104             if (mStartup && mSegmentStartTimeUs >= 0
2105                     && adjustSeqNumberWithAnchorTime(timeUs)) {
2106                 mSegmentStartTimeUs = -1;
2107                 return -EAGAIN;
2108             }
2109         }
2110     }
2111 
2112     sp<HlsSampleDecryptor> sampleDecryptor = NULL;
2113     if (mSampleAesKeyItem != NULL) {
2114         ALOGV("extractAndQueueAccessUnits[%d] SampleAesKeyItem: Key: %s  IV: %s",
2115                 mSeqNumber,
2116                 HlsSampleDecryptor::aesBlockToStr(mKeyData).c_str(),
2117                 HlsSampleDecryptor::aesBlockToStr(mAESInitVec).c_str());
2118 
2119         sampleDecryptor = new HlsSampleDecryptor(mSampleAesKeyItem);
2120     }
2121 
2122     int frameId = 0;
2123 
2124     size_t offset = 0;
2125     while (offset < buffer->size()) {
2126         const uint8_t *adtsHeader = buffer->data() + offset;
2127         CHECK_LT(offset + 5, buffer->size());
2128         // non-const pointer for decryption if needed
2129         uint8_t *adtsFrame = buffer->data() + offset;
2130 
2131         unsigned aac_frame_length =
2132             ((adtsHeader[3] & 3) << 11)
2133             | (adtsHeader[4] << 3)
2134             | (adtsHeader[5] >> 5);
2135 
2136         if (aac_frame_length == 0) {
2137             const uint8_t *id3Header = adtsHeader;
2138             if (!memcmp(id3Header, "ID3", 3)) {
2139                 ID3 id3(id3Header, buffer->size() - offset, true);
2140                 if (id3.isValid()) {
2141                     offset += id3.rawSize();
2142                     continue;
2143                 };
2144             }
2145             return ERROR_MALFORMED;
2146         }
2147 
2148         CHECK_LE(offset + aac_frame_length, buffer->size());
2149 
2150         int64_t unitTimeUs = timeUs + numSamples * 1000000ll / sampleRate;
2151         offset += aac_frame_length;
2152 
2153         // Each AAC frame encodes 1024 samples.
2154         numSamples += 1024;
2155 
2156         if (mStartup) {
2157             int64_t startTimeUs = unitTimeUs;
2158             if (mStartTimeUsRelative) {
2159                 startTimeUs -= mFirstTimeUs;
2160                 if (startTimeUs  < 0) {
2161                     startTimeUs = 0;
2162                 }
2163             }
2164             if (startTimeUs < mStartTimeUs) {
2165                 continue;
2166             }
2167 
2168             if (mStartTimeUsNotify != NULL) {
2169                 mStartTimeUsNotify->setInt32("streamMask", LiveSession::STREAMTYPE_AUDIO);
2170                 mStartup = false;
2171             }
2172         }
2173 
2174         if (mStopParams != NULL) {
2175             int32_t discontinuitySeq;
2176             int64_t stopTimeUs;
2177             if (!mStopParams->findInt32("discontinuitySeq", &discontinuitySeq)
2178                     || discontinuitySeq > mDiscontinuitySeq
2179                     || !mStopParams->findInt64("timeUsAudio", &stopTimeUs)
2180                     || (discontinuitySeq == mDiscontinuitySeq && unitTimeUs >= stopTimeUs)) {
2181                 mStreamTypeMask = 0;
2182                 mPacketSources.clear();
2183                 return ERROR_OUT_OF_RANGE;
2184             }
2185         }
2186 
2187         if (sampleDecryptor != NULL) {
2188             bool protection_absent = (adtsHeader[1] & 0x1);
2189             size_t headerSize = protection_absent ? 7 : 9;
2190             if (frameId == 0) {
2191                 ALOGV("extractAndQueueAAC[%d] protection_absent %d (%02x) headerSize %zu",
2192                         mSeqNumber, protection_absent, adtsHeader[1], headerSize);
2193             }
2194 
2195             sampleDecryptor->processAAC(headerSize, adtsFrame, aac_frame_length);
2196         }
2197         frameId++;
2198 
2199         sp<ABuffer> unit = new ABuffer(aac_frame_length);
2200         memcpy(unit->data(), adtsHeader, aac_frame_length);
2201 
2202         unit->meta()->setInt64("timeUs", unitTimeUs);
2203         setAccessUnitProperties(unit, packetSource);
2204         packetSource->queueAccessUnit(unit);
2205     }
2206 
2207     return OK;
2208 }
2209 
updateDuration()2210 void PlaylistFetcher::updateDuration() {
2211     int64_t durationUs = 0ll;
2212     for (size_t index = 0; index < mPlaylist->size(); ++index) {
2213         sp<AMessage> itemMeta;
2214         CHECK(mPlaylist->itemAt(
2215                     index, NULL /* uri */, &itemMeta));
2216 
2217         int64_t itemDurationUs;
2218         CHECK(itemMeta->findInt64("durationUs", &itemDurationUs));
2219 
2220         durationUs += itemDurationUs;
2221     }
2222 
2223     sp<AMessage> msg = mNotify->dup();
2224     msg->setInt32("what", kWhatDurationUpdate);
2225     msg->setInt64("durationUs", durationUs);
2226     msg->post();
2227 }
2228 
updateTargetDuration()2229 void PlaylistFetcher::updateTargetDuration() {
2230     sp<AMessage> msg = mNotify->dup();
2231     msg->setInt32("what", kWhatTargetDurationUpdate);
2232     msg->setInt64("targetDurationUs", mPlaylist->getTargetDuration());
2233     msg->post();
2234 }
2235 
2236 }  // namespace android
2237