1 /*
2  * Copyright (C) 2012 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 //#define LOG_NDEBUG 0
18 #define LOG_TAG "PlaylistFetcher"
19 #include <android-base/macros.h>
20 #include <utils/Log.h>
21 #include <utils/misc.h>
22 
23 #include "PlaylistFetcher.h"
24 #include "HTTPDownloader.h"
25 #include "LiveSession.h"
26 #include "M3UParser.h"
27 #include <ID3.h>
28 #include <mpeg2ts/AnotherPacketSource.h>
29 #include <mpeg2ts/HlsSampleDecryptor.h>
30 
31 #include <datasource/DataURISource.h>
32 #include <media/stagefright/foundation/ABitReader.h>
33 #include <media/stagefright/foundation/ABuffer.h>
34 #include <media/stagefright/foundation/ADebug.h>
35 #include <media/stagefright/foundation/ByteUtils.h>
36 #include <media/stagefright/foundation/MediaKeys.h>
37 #include <media/stagefright/foundation/avc_utils.h>
38 #include <media/stagefright/MediaDefs.h>
39 #include <media/stagefright/MetaData.h>
40 #include <media/stagefright/MetaDataUtils.h>
41 #include <media/stagefright/Utils.h>
42 #include <media/stagefright/FoundationUtils.h>
43 
44 #include <ctype.h>
45 #include <inttypes.h>
46 
47 #define FLOGV(fmt, ...) ALOGV("[fetcher-%d] " fmt, mFetcherID, ##__VA_ARGS__)
48 #define FSLOGV(stream, fmt, ...) ALOGV("[fetcher-%d] [%s] " fmt, mFetcherID, \
49          LiveSession::getNameForStream(stream), ##__VA_ARGS__)
50 
51 namespace android {
52 
53 // static
54 const int64_t PlaylistFetcher::kMinBufferedDurationUs = 30000000LL;
55 const int64_t PlaylistFetcher::kMaxMonitorDelayUs = 3000000LL;
56 // LCM of 188 (size of a TS packet) & 1k works well
57 const int32_t PlaylistFetcher::kDownloadBlockSize = 47 * 1024;
58 
59 struct PlaylistFetcher::DownloadState : public RefBase {
60     DownloadState();
61     void resetState();
62     bool hasSavedState() const;
63     void restoreState(
64             AString &uri,
65             sp<AMessage> &itemMeta,
66             sp<ABuffer> &buffer,
67             sp<ABuffer> &tsBuffer,
68             int32_t &firstSeqNumberInPlaylist,
69             int32_t &lastSeqNumberInPlaylist);
70     void saveState(
71             AString &uri,
72             sp<AMessage> &itemMeta,
73             sp<ABuffer> &buffer,
74             sp<ABuffer> &tsBuffer,
75             int32_t &firstSeqNumberInPlaylist,
76             int32_t &lastSeqNumberInPlaylist);
77 
78 private:
79     bool mHasSavedState;
80     AString mUri;
81     sp<AMessage> mItemMeta;
82     sp<ABuffer> mBuffer;
83     sp<ABuffer> mTsBuffer;
84     int32_t mFirstSeqNumberInPlaylist;
85     int32_t mLastSeqNumberInPlaylist;
86 };
87 
DownloadState()88 PlaylistFetcher::DownloadState::DownloadState() {
89     resetState();
90 }
91 
hasSavedState() const92 bool PlaylistFetcher::DownloadState::hasSavedState() const {
93     return mHasSavedState;
94 }
95 
resetState()96 void PlaylistFetcher::DownloadState::resetState() {
97     mHasSavedState = false;
98 
99     mUri.clear();
100     mItemMeta = NULL;
101     mBuffer = NULL;
102     mTsBuffer = NULL;
103     mFirstSeqNumberInPlaylist = 0;
104     mLastSeqNumberInPlaylist = 0;
105 }
106 
restoreState(AString & uri,sp<AMessage> & itemMeta,sp<ABuffer> & buffer,sp<ABuffer> & tsBuffer,int32_t & firstSeqNumberInPlaylist,int32_t & lastSeqNumberInPlaylist)107 void PlaylistFetcher::DownloadState::restoreState(
108         AString &uri,
109         sp<AMessage> &itemMeta,
110         sp<ABuffer> &buffer,
111         sp<ABuffer> &tsBuffer,
112         int32_t &firstSeqNumberInPlaylist,
113         int32_t &lastSeqNumberInPlaylist) {
114     if (!mHasSavedState) {
115         return;
116     }
117 
118     uri = mUri;
119     itemMeta = mItemMeta;
120     buffer = mBuffer;
121     tsBuffer = mTsBuffer;
122     firstSeqNumberInPlaylist = mFirstSeqNumberInPlaylist;
123     lastSeqNumberInPlaylist = mLastSeqNumberInPlaylist;
124 
125     resetState();
126 }
127 
saveState(AString & uri,sp<AMessage> & itemMeta,sp<ABuffer> & buffer,sp<ABuffer> & tsBuffer,int32_t & firstSeqNumberInPlaylist,int32_t & lastSeqNumberInPlaylist)128 void PlaylistFetcher::DownloadState::saveState(
129         AString &uri,
130         sp<AMessage> &itemMeta,
131         sp<ABuffer> &buffer,
132         sp<ABuffer> &tsBuffer,
133         int32_t &firstSeqNumberInPlaylist,
134         int32_t &lastSeqNumberInPlaylist) {
135     mHasSavedState = true;
136 
137     mUri = uri;
138     mItemMeta = itemMeta;
139     mBuffer = buffer;
140     mTsBuffer = tsBuffer;
141     mFirstSeqNumberInPlaylist = firstSeqNumberInPlaylist;
142     mLastSeqNumberInPlaylist = lastSeqNumberInPlaylist;
143 }
144 
PlaylistFetcher(const sp<AMessage> & notify,const sp<LiveSession> & session,const char * uri,int32_t id,int32_t subtitleGeneration)145 PlaylistFetcher::PlaylistFetcher(
146         const sp<AMessage> &notify,
147         const sp<LiveSession> &session,
148         const char *uri,
149         int32_t id,
150         int32_t subtitleGeneration)
151     : mNotify(notify),
152       mSession(session),
153       mURI(uri),
154       mFetcherID(id),
155       mStreamTypeMask(0),
156       mStartTimeUs(-1LL),
157       mSegmentStartTimeUs(-1LL),
158       mDiscontinuitySeq(-1LL),
159       mStartTimeUsRelative(false),
160       mLastPlaylistFetchTimeUs(-1LL),
161       mPlaylistTimeUs(-1LL),
162       mSeqNumber(-1),
163       mNumRetries(0),
164       mNumRetriesForMonitorQueue(0),
165       mStartup(true),
166       mIDRFound(false),
167       mSeekMode(LiveSession::kSeekModeExactPosition),
168       mTimeChangeSignaled(false),
169       mNextPTSTimeUs(-1LL),
170       mMonitorQueueGeneration(0),
171       mSubtitleGeneration(subtitleGeneration),
172       mLastDiscontinuitySeq(-1LL),
173       mRefreshState(INITIAL_MINIMUM_RELOAD_DELAY),
174       mFirstPTSValid(false),
175       mFirstTimeUs(-1LL),
176       mVideoBuffer(new AnotherPacketSource(NULL)),
177       mSampleAesKeyItemChanged(false),
178       mThresholdRatio(-1.0f),
179       mDownloadState(new DownloadState()),
180       mHasMetadata(false) {
181     memset(mPlaylistHash, 0, sizeof(mPlaylistHash));
182     mHTTPDownloader = mSession->getHTTPDownloader();
183 
184     memset(mKeyData, 0, sizeof(mKeyData));
185     memset(mAESInitVec, 0, sizeof(mAESInitVec));
186 }
187 
~PlaylistFetcher()188 PlaylistFetcher::~PlaylistFetcher() {
189 }
190 
getFetcherID() const191 int32_t PlaylistFetcher::getFetcherID() const {
192     return mFetcherID;
193 }
194 
getSegmentStartTimeUs(int32_t seqNumber) const195 int64_t PlaylistFetcher::getSegmentStartTimeUs(int32_t seqNumber) const {
196     CHECK(mPlaylist != NULL);
197 
198     int32_t firstSeqNumberInPlaylist, lastSeqNumberInPlaylist;
199     mPlaylist->getSeqNumberRange(
200             &firstSeqNumberInPlaylist, &lastSeqNumberInPlaylist);
201 
202     CHECK_GE(seqNumber, firstSeqNumberInPlaylist);
203     CHECK_LE(seqNumber, lastSeqNumberInPlaylist);
204 
205     int64_t segmentStartUs = 0LL;
206     for (int32_t index = 0;
207             index < seqNumber - firstSeqNumberInPlaylist; ++index) {
208         sp<AMessage> itemMeta;
209         CHECK(mPlaylist->itemAt(
210                     index, NULL /* uri */, &itemMeta));
211 
212         int64_t itemDurationUs;
213         CHECK(itemMeta->findInt64("durationUs", &itemDurationUs));
214 
215         segmentStartUs += itemDurationUs;
216     }
217 
218     return segmentStartUs;
219 }
220 
getSegmentDurationUs(int32_t seqNumber) const221 int64_t PlaylistFetcher::getSegmentDurationUs(int32_t seqNumber) const {
222     CHECK(mPlaylist != NULL);
223 
224     int32_t firstSeqNumberInPlaylist, lastSeqNumberInPlaylist;
225     mPlaylist->getSeqNumberRange(
226             &firstSeqNumberInPlaylist, &lastSeqNumberInPlaylist);
227 
228     CHECK_GE(seqNumber, firstSeqNumberInPlaylist);
229     CHECK_LE(seqNumber, lastSeqNumberInPlaylist);
230 
231     int32_t index = seqNumber - firstSeqNumberInPlaylist;
232     sp<AMessage> itemMeta;
233     CHECK(mPlaylist->itemAt(
234                 index, NULL /* uri */, &itemMeta));
235 
236     int64_t itemDurationUs;
237     CHECK(itemMeta->findInt64("durationUs", &itemDurationUs));
238 
239     return itemDurationUs;
240 }
241 
delayUsToRefreshPlaylist() const242 int64_t PlaylistFetcher::delayUsToRefreshPlaylist() const {
243     int64_t nowUs = ALooper::GetNowUs();
244 
245     if (mPlaylist == NULL || mLastPlaylistFetchTimeUs < 0LL) {
246         CHECK_EQ((int)mRefreshState, (int)INITIAL_MINIMUM_RELOAD_DELAY);
247         return 0LL;
248     }
249 
250     if (mPlaylist->isComplete()) {
251         return (~0LLU >> 1);
252     }
253 
254     int64_t targetDurationUs = mPlaylist->getTargetDuration();
255 
256     int64_t minPlaylistAgeUs;
257 
258     switch (mRefreshState) {
259         case INITIAL_MINIMUM_RELOAD_DELAY:
260         {
261             size_t n = mPlaylist->size();
262             if (n > 0) {
263                 sp<AMessage> itemMeta;
264                 CHECK(mPlaylist->itemAt(n - 1, NULL /* uri */, &itemMeta));
265 
266                 int64_t itemDurationUs;
267                 CHECK(itemMeta->findInt64("durationUs", &itemDurationUs));
268 
269                 minPlaylistAgeUs = itemDurationUs;
270                 break;
271             }
272 
273             FALLTHROUGH_INTENDED;
274         }
275 
276         case FIRST_UNCHANGED_RELOAD_ATTEMPT:
277         {
278             minPlaylistAgeUs = targetDurationUs / 2;
279             break;
280         }
281 
282         case SECOND_UNCHANGED_RELOAD_ATTEMPT:
283         {
284             minPlaylistAgeUs = (targetDurationUs * 3) / 2;
285             break;
286         }
287 
288         case THIRD_UNCHANGED_RELOAD_ATTEMPT:
289         {
290             minPlaylistAgeUs = targetDurationUs * 3;
291             break;
292         }
293 
294         default:
295             TRESPASS();
296             break;
297     }
298 
299     int64_t delayUs = mLastPlaylistFetchTimeUs + minPlaylistAgeUs - nowUs;
300     return delayUs > 0LL ? delayUs : 0LL;
301 }
302 
decryptBuffer(size_t playlistIndex,const sp<ABuffer> & buffer,bool first)303 status_t PlaylistFetcher::decryptBuffer(
304         size_t playlistIndex, const sp<ABuffer> &buffer,
305         bool first) {
306     sp<AMessage> itemMeta;
307     bool found = false;
308     AString method;
309 
310     for (ssize_t i = playlistIndex; i >= 0; --i) {
311         AString uri;
312         CHECK(mPlaylist->itemAt(i, &uri, &itemMeta));
313 
314         if (itemMeta->findString("cipher-method", &method)) {
315             found = true;
316             break;
317         }
318     }
319 
320     // TODO: Revise this when we add support for KEYFORMAT
321     // If method has changed (e.g., -> NONE); sufficient to check at the segment boundary
322     if (mSampleAesKeyItem != NULL && first && found && method != "SAMPLE-AES") {
323         ALOGI("decryptBuffer: resetting mSampleAesKeyItem(%p) with method %s",
324                 mSampleAesKeyItem.get(), method.c_str());
325         mSampleAesKeyItem = NULL;
326         mSampleAesKeyItemChanged = true;
327     }
328 
329     if (!found) {
330         method = "NONE";
331     }
332     buffer->meta()->setString("cipher-method", method.c_str());
333 
334     if (method == "NONE") {
335         return OK;
336     } else if (method == "SAMPLE-AES") {
337         ALOGV("decryptBuffer: Non-Widevine SAMPLE-AES is supported now.");
338     } else if (!(method == "AES-128")) {
339         ALOGE("Unsupported cipher method '%s'", method.c_str());
340         return ERROR_UNSUPPORTED;
341     }
342 
343     AString keyURI;
344     if (!itemMeta->findString("cipher-uri", &keyURI)) {
345         ALOGE("Missing key uri");
346         return ERROR_MALFORMED;
347     }
348     keyURI = mPlaylist->getFullCipherUri(keyURI);
349 
350     ssize_t index = mAESKeyForURI.indexOfKey(keyURI);
351 
352     sp<ABuffer> key;
353     if (index >= 0) {
354         key = mAESKeyForURI.valueAt(index);
355     } else if (keyURI.startsWith("data:")) {
356         sp<DataSource> keySrc = DataURISource::Create(keyURI.c_str());
357         off64_t keyLen;
358         if (keySrc == NULL || keySrc->getSize(&keyLen) != OK || keyLen < 0) {
359             ALOGE("Malformed cipher key data uri.");
360             return ERROR_MALFORMED;
361         }
362         key = new ABuffer(keyLen);
363         keySrc->readAt(0, key->data(), keyLen);
364         key->setRange(0, keyLen);
365     } else {
366         ssize_t err = mHTTPDownloader->fetchFile(keyURI.c_str(), &key);
367 
368         if (err == ERROR_NOT_CONNECTED) {
369             return ERROR_NOT_CONNECTED;
370         } else if (err < 0) {
371             ALOGE("failed to fetch cipher key from '%s'.", uriDebugString(keyURI).c_str());
372             return ERROR_IO;
373         } else if (key->size() != 16) {
374             ALOGE("key file '%s' wasn't 16 bytes in size.", uriDebugString(keyURI).c_str());
375             return ERROR_MALFORMED;
376         }
377 
378         mAESKeyForURI.add(keyURI, key);
379     }
380 
381     if (first) {
382         // If decrypting the first block in a file, read the iv from the manifest
383         // or derive the iv from the file's sequence number.
384 
385         unsigned char AESInitVec[AES_BLOCK_SIZE];
386         AString iv;
387         if (itemMeta->findString("cipher-iv", &iv)) {
388             if ((!iv.startsWith("0x") && !iv.startsWith("0X"))
389                     || iv.size() > 16 * 2 + 2) {
390                 ALOGE("malformed cipher IV '%s'.", iv.c_str());
391                 return ERROR_MALFORMED;
392             }
393 
394             while (iv.size() < 16 * 2 + 2) {
395                 iv.insert("0", 1, 2);
396             }
397 
398             memset(AESInitVec, 0, sizeof(AESInitVec));
399             for (size_t i = 0; i < 16; ++i) {
400                 char c1 = tolower(iv.c_str()[2 + 2 * i]);
401                 char c2 = tolower(iv.c_str()[3 + 2 * i]);
402                 if (!isxdigit(c1) || !isxdigit(c2)) {
403                     ALOGE("malformed cipher IV '%s'.", iv.c_str());
404                     return ERROR_MALFORMED;
405                 }
406                 uint8_t nibble1 = isdigit(c1) ? c1 - '0' : c1 - 'a' + 10;
407                 uint8_t nibble2 = isdigit(c2) ? c2 - '0' : c2 - 'a' + 10;
408 
409                 AESInitVec[i] = nibble1 << 4 | nibble2;
410             }
411         } else {
412             memset(AESInitVec, 0, sizeof(AESInitVec));
413             AESInitVec[15] = mSeqNumber & 0xff;
414             AESInitVec[14] = (mSeqNumber >> 8) & 0xff;
415             AESInitVec[13] = (mSeqNumber >> 16) & 0xff;
416             AESInitVec[12] = (mSeqNumber >> 24) & 0xff;
417         }
418 
419         bool newKey = memcmp(mKeyData, key->data(), AES_BLOCK_SIZE) != 0;
420         bool newInitVec = memcmp(mAESInitVec, AESInitVec, AES_BLOCK_SIZE) != 0;
421         bool newSampleAesKeyItem = newKey || newInitVec;
422         ALOGV("decryptBuffer: SAMPLE-AES newKeyItem %d/%d (Key %d initVec %d)",
423                 mSampleAesKeyItemChanged, newSampleAesKeyItem, newKey, newInitVec);
424 
425         if (newSampleAesKeyItem) {
426             memcpy(mKeyData, key->data(), AES_BLOCK_SIZE);
427             memcpy(mAESInitVec, AESInitVec, AES_BLOCK_SIZE);
428 
429             if (method == "SAMPLE-AES") {
430                 mSampleAesKeyItemChanged = true;
431 
432                 sp<ABuffer> keyDataBuffer = ABuffer::CreateAsCopy(mKeyData, sizeof(mKeyData));
433                 sp<ABuffer> initVecBuffer = ABuffer::CreateAsCopy(mAESInitVec, sizeof(mAESInitVec));
434 
435                 // always allocating a new one rather than updating the old message
436                 // lower layer might still have a reference to the old message
437                 mSampleAesKeyItem = new AMessage();
438                 mSampleAesKeyItem->setBuffer("keyData", keyDataBuffer);
439                 mSampleAesKeyItem->setBuffer("initVec", initVecBuffer);
440 
441                 ALOGV("decryptBuffer: New SampleAesKeyItem: Key: %s  IV: %s",
442                         HlsSampleDecryptor::aesBlockToStr(mKeyData).c_str(),
443                         HlsSampleDecryptor::aesBlockToStr(mAESInitVec).c_str());
444             } // SAMPLE-AES
445         } // newSampleAesKeyItem
446     } // first
447 
448     if (method == "SAMPLE-AES") {
449         ALOGV("decryptBuffer: skipping full-seg decrypt for SAMPLE-AES");
450         return OK;
451     }
452 
453 
454     AES_KEY aes_key;
455     if (AES_set_decrypt_key(key->data(), 128, &aes_key) != 0) {
456         ALOGE("failed to set AES decryption key.");
457         return UNKNOWN_ERROR;
458     }
459 
460     size_t n = buffer->size();
461     if (!n) {
462         return OK;
463     }
464 
465     if (n < 16 || n % 16) {
466         ALOGE("not enough or trailing bytes (%zu) in encrypted buffer", n);
467         return ERROR_MALFORMED;
468     }
469 
470     AES_cbc_encrypt(
471             buffer->data(), buffer->data(), buffer->size(),
472             &aes_key, mAESInitVec, AES_DECRYPT);
473 
474     return OK;
475 }
476 
checkDecryptPadding(const sp<ABuffer> & buffer)477 status_t PlaylistFetcher::checkDecryptPadding(const sp<ABuffer> &buffer) {
478     AString method;
479     CHECK(buffer->meta()->findString("cipher-method", &method));
480     if (method == "NONE" || method == "SAMPLE-AES") {
481         return OK;
482     }
483 
484     uint8_t padding = 0;
485     if (buffer->size() > 0) {
486         padding = buffer->data()[buffer->size() - 1];
487     }
488 
489     if (padding > 16) {
490         return ERROR_MALFORMED;
491     }
492 
493     for (size_t i = buffer->size() - padding; i < padding; i++) {
494         if (buffer->data()[i] != padding) {
495             return ERROR_MALFORMED;
496         }
497     }
498 
499     buffer->setRange(buffer->offset(), buffer->size() - padding);
500     return OK;
501 }
502 
postMonitorQueue(int64_t delayUs,int64_t minDelayUs)503 void PlaylistFetcher::postMonitorQueue(int64_t delayUs, int64_t minDelayUs) {
504     int64_t maxDelayUs = delayUsToRefreshPlaylist();
505     if (maxDelayUs < minDelayUs) {
506         maxDelayUs = minDelayUs;
507     }
508     if (delayUs > maxDelayUs) {
509         FLOGV("Need to refresh playlist in %lld", (long long)maxDelayUs);
510         delayUs = maxDelayUs;
511     }
512     sp<AMessage> msg = new AMessage(kWhatMonitorQueue, this);
513     msg->setInt32("generation", mMonitorQueueGeneration);
514     msg->post(delayUs);
515 }
516 
cancelMonitorQueue()517 void PlaylistFetcher::cancelMonitorQueue() {
518     ++mMonitorQueueGeneration;
519 }
520 
setStoppingThreshold(float thresholdRatio,bool disconnect)521 void PlaylistFetcher::setStoppingThreshold(float thresholdRatio, bool disconnect) {
522     {
523         AutoMutex _l(mThresholdLock);
524         mThresholdRatio = thresholdRatio;
525     }
526     if (disconnect) {
527         mHTTPDownloader->disconnect();
528     }
529 }
530 
resetStoppingThreshold(bool disconnect)531 void PlaylistFetcher::resetStoppingThreshold(bool disconnect) {
532     {
533         AutoMutex _l(mThresholdLock);
534         mThresholdRatio = -1.0f;
535     }
536     if (disconnect) {
537         mHTTPDownloader->disconnect();
538     } else {
539         // allow reconnect
540         mHTTPDownloader->reconnect();
541     }
542 }
543 
getStoppingThreshold()544 float PlaylistFetcher::getStoppingThreshold() {
545     AutoMutex _l(mThresholdLock);
546     return mThresholdRatio;
547 }
548 
startAsync(const sp<AnotherPacketSource> & audioSource,const sp<AnotherPacketSource> & videoSource,const sp<AnotherPacketSource> & subtitleSource,const sp<AnotherPacketSource> & metadataSource,int64_t startTimeUs,int64_t segmentStartTimeUs,int32_t startDiscontinuitySeq,LiveSession::SeekMode seekMode)549 void PlaylistFetcher::startAsync(
550         const sp<AnotherPacketSource> &audioSource,
551         const sp<AnotherPacketSource> &videoSource,
552         const sp<AnotherPacketSource> &subtitleSource,
553         const sp<AnotherPacketSource> &metadataSource,
554         int64_t startTimeUs,
555         int64_t segmentStartTimeUs,
556         int32_t startDiscontinuitySeq,
557         LiveSession::SeekMode seekMode) {
558     sp<AMessage> msg = new AMessage(kWhatStart, this);
559 
560     uint32_t streamTypeMask = 0ul;
561 
562     if (audioSource != NULL) {
563         msg->setPointer("audioSource", audioSource.get());
564         streamTypeMask |= LiveSession::STREAMTYPE_AUDIO;
565     }
566 
567     if (videoSource != NULL) {
568         msg->setPointer("videoSource", videoSource.get());
569         streamTypeMask |= LiveSession::STREAMTYPE_VIDEO;
570     }
571 
572     if (subtitleSource != NULL) {
573         msg->setPointer("subtitleSource", subtitleSource.get());
574         streamTypeMask |= LiveSession::STREAMTYPE_SUBTITLES;
575     }
576 
577     if (metadataSource != NULL) {
578         msg->setPointer("metadataSource", metadataSource.get());
579         // metadataSource does not affect streamTypeMask.
580     }
581 
582     msg->setInt32("streamTypeMask", streamTypeMask);
583     msg->setInt64("startTimeUs", startTimeUs);
584     msg->setInt64("segmentStartTimeUs", segmentStartTimeUs);
585     msg->setInt32("startDiscontinuitySeq", startDiscontinuitySeq);
586     msg->setInt32("seekMode", seekMode);
587     msg->post();
588 }
589 
590 /*
591  * pauseAsync
592  *
593  * threshold: 0.0f - pause after current fetch block (default 47Kbytes)
594  *           -1.0f - pause after finishing current segment
595  *        0.0~1.0f - pause if remaining of current segment exceeds threshold
596  */
pauseAsync(float thresholdRatio,bool disconnect)597 void PlaylistFetcher::pauseAsync(
598         float thresholdRatio, bool disconnect) {
599     setStoppingThreshold(thresholdRatio, disconnect);
600 
601     (new AMessage(kWhatPause, this))->post();
602 }
603 
stopAsync(bool clear)604 void PlaylistFetcher::stopAsync(bool clear) {
605     setStoppingThreshold(0.0f, true /* disconncect */);
606 
607     sp<AMessage> msg = new AMessage(kWhatStop, this);
608     msg->setInt32("clear", clear);
609     msg->post();
610 }
611 
resumeUntilAsync(const sp<AMessage> & params)612 void PlaylistFetcher::resumeUntilAsync(const sp<AMessage> &params) {
613     FLOGV("resumeUntilAsync: params=%s", params->debugString().c_str());
614 
615     AMessage* msg = new AMessage(kWhatResumeUntil, this);
616     msg->setMessage("params", params);
617     msg->post();
618 }
619 
fetchPlaylistAsync()620 void PlaylistFetcher::fetchPlaylistAsync() {
621     (new AMessage(kWhatFetchPlaylist, this))->post();
622 }
623 
onMessageReceived(const sp<AMessage> & msg)624 void PlaylistFetcher::onMessageReceived(const sp<AMessage> &msg) {
625     switch (msg->what()) {
626         case kWhatStart:
627         {
628             status_t err = onStart(msg);
629 
630             sp<AMessage> notify = mNotify->dup();
631             notify->setInt32("what", kWhatStarted);
632             notify->setInt32("err", err);
633             notify->post();
634             break;
635         }
636 
637         case kWhatPause:
638         {
639             onPause();
640 
641             sp<AMessage> notify = mNotify->dup();
642             notify->setInt32("what", kWhatPaused);
643             notify->setInt32("seekMode",
644                     mDownloadState->hasSavedState()
645                     ? LiveSession::kSeekModeNextSample
646                     : LiveSession::kSeekModeNextSegment);
647             notify->post();
648             break;
649         }
650 
651         case kWhatStop:
652         {
653             onStop(msg);
654 
655             sp<AMessage> notify = mNotify->dup();
656             notify->setInt32("what", kWhatStopped);
657             notify->post();
658             break;
659         }
660 
661         case kWhatFetchPlaylist:
662         {
663             bool unchanged;
664             sp<M3UParser> playlist = mHTTPDownloader->fetchPlaylist(
665                     mURI.c_str(), NULL /* curPlaylistHash */, &unchanged);
666 
667             sp<AMessage> notify = mNotify->dup();
668             notify->setInt32("what", kWhatPlaylistFetched);
669             notify->setObject("playlist", playlist);
670             notify->post();
671             break;
672         }
673 
674         case kWhatMonitorQueue:
675         case kWhatDownloadNext:
676         {
677             int32_t generation;
678             CHECK(msg->findInt32("generation", &generation));
679 
680             if (generation != mMonitorQueueGeneration) {
681                 // Stale event
682                 break;
683             }
684 
685             if (msg->what() == kWhatMonitorQueue) {
686                 onMonitorQueue();
687             } else {
688                 onDownloadNext();
689             }
690             break;
691         }
692 
693         case kWhatResumeUntil:
694         {
695             onResumeUntil(msg);
696             break;
697         }
698 
699         default:
700             TRESPASS();
701     }
702 }
703 
onStart(const sp<AMessage> & msg)704 status_t PlaylistFetcher::onStart(const sp<AMessage> &msg) {
705     mPacketSources.clear();
706     mStopParams.clear();
707     mStartTimeUsNotify = mNotify->dup();
708     mStartTimeUsNotify->setInt32("what", kWhatStartedAt);
709     mStartTimeUsNotify->setString("uri", mURI);
710 
711     uint32_t streamTypeMask;
712     CHECK(msg->findInt32("streamTypeMask", (int32_t *)&streamTypeMask));
713 
714     int64_t startTimeUs;
715     int64_t segmentStartTimeUs;
716     int32_t startDiscontinuitySeq;
717     int32_t seekMode;
718     CHECK(msg->findInt64("startTimeUs", &startTimeUs));
719     CHECK(msg->findInt64("segmentStartTimeUs", &segmentStartTimeUs));
720     CHECK(msg->findInt32("startDiscontinuitySeq", &startDiscontinuitySeq));
721     CHECK(msg->findInt32("seekMode", &seekMode));
722 
723     if (streamTypeMask & LiveSession::STREAMTYPE_AUDIO) {
724         void *ptr;
725         CHECK(msg->findPointer("audioSource", &ptr));
726 
727         mPacketSources.add(
728                 LiveSession::STREAMTYPE_AUDIO,
729                 static_cast<AnotherPacketSource *>(ptr));
730     }
731 
732     if (streamTypeMask & LiveSession::STREAMTYPE_VIDEO) {
733         void *ptr;
734         CHECK(msg->findPointer("videoSource", &ptr));
735 
736         mPacketSources.add(
737                 LiveSession::STREAMTYPE_VIDEO,
738                 static_cast<AnotherPacketSource *>(ptr));
739     }
740 
741     if (streamTypeMask & LiveSession::STREAMTYPE_SUBTITLES) {
742         void *ptr;
743         CHECK(msg->findPointer("subtitleSource", &ptr));
744 
745         mPacketSources.add(
746                 LiveSession::STREAMTYPE_SUBTITLES,
747                 static_cast<AnotherPacketSource *>(ptr));
748     }
749 
750     void *ptr;
751     // metadataSource is not part of streamTypeMask
752     if ((streamTypeMask & (LiveSession::STREAMTYPE_AUDIO | LiveSession::STREAMTYPE_VIDEO))
753             && msg->findPointer("metadataSource", &ptr)) {
754         mPacketSources.add(
755                 LiveSession::STREAMTYPE_METADATA,
756                 static_cast<AnotherPacketSource *>(ptr));
757     }
758 
759     mStreamTypeMask = streamTypeMask;
760 
761     mSegmentStartTimeUs = segmentStartTimeUs;
762 
763     if (startDiscontinuitySeq >= 0) {
764         mDiscontinuitySeq = startDiscontinuitySeq;
765     }
766 
767     mRefreshState = INITIAL_MINIMUM_RELOAD_DELAY;
768     mSeekMode = (LiveSession::SeekMode) seekMode;
769 
770     if (startTimeUs >= 0 || mSeekMode == LiveSession::kSeekModeNextSample) {
771         mStartup = true;
772         mIDRFound = false;
773         mVideoBuffer->clear();
774     }
775 
776     if (startTimeUs >= 0) {
777         mStartTimeUs = startTimeUs;
778         mFirstPTSValid = false;
779         mSeqNumber = -1;
780         mTimeChangeSignaled = false;
781         mDownloadState->resetState();
782     }
783 
784     postMonitorQueue();
785 
786     return OK;
787 }
788 
onPause()789 void PlaylistFetcher::onPause() {
790     cancelMonitorQueue();
791     mLastDiscontinuitySeq = mDiscontinuitySeq;
792 
793     resetStoppingThreshold(false /* disconnect */);
794 }
795 
onStop(const sp<AMessage> & msg)796 void PlaylistFetcher::onStop(const sp<AMessage> &msg) {
797     cancelMonitorQueue();
798 
799     int32_t clear;
800     CHECK(msg->findInt32("clear", &clear));
801     if (clear) {
802         for (size_t i = 0; i < mPacketSources.size(); i++) {
803             sp<AnotherPacketSource> packetSource = mPacketSources.valueAt(i);
804             packetSource->clear();
805         }
806     }
807 
808     mDownloadState->resetState();
809     mPacketSources.clear();
810     mStreamTypeMask = 0;
811 
812     resetStoppingThreshold(true /* disconnect */);
813 }
814 
815 // Resume until we have reached the boundary timestamps listed in `msg`; when
816 // the remaining time is too short (within a resume threshold) stop immediately
817 // instead.
onResumeUntil(const sp<AMessage> & msg)818 status_t PlaylistFetcher::onResumeUntil(const sp<AMessage> &msg) {
819     sp<AMessage> params;
820     CHECK(msg->findMessage("params", &params));
821 
822     mStopParams = params;
823     onDownloadNext();
824 
825     return OK;
826 }
827 
notifyStopReached()828 void PlaylistFetcher::notifyStopReached() {
829     sp<AMessage> notify = mNotify->dup();
830     notify->setInt32("what", kWhatStopReached);
831     notify->post();
832 }
833 
notifyError(status_t err)834 void PlaylistFetcher::notifyError(status_t err) {
835     sp<AMessage> notify = mNotify->dup();
836     notify->setInt32("what", kWhatError);
837     notify->setInt32("err", err);
838     notify->post();
839 }
840 
queueDiscontinuity(ATSParser::DiscontinuityType type,const sp<AMessage> & extra)841 void PlaylistFetcher::queueDiscontinuity(
842         ATSParser::DiscontinuityType type, const sp<AMessage> &extra) {
843     for (size_t i = 0; i < mPacketSources.size(); ++i) {
844         // do not discard buffer upon #EXT-X-DISCONTINUITY tag
845         // (seek will discard buffer by abandoning old fetchers)
846         mPacketSources.valueAt(i)->queueDiscontinuity(
847                 type, extra, false /* discard */);
848     }
849 }
850 
onMonitorQueue()851 void PlaylistFetcher::onMonitorQueue() {
852     // in the middle of an unfinished download, delay
853     // playlist refresh as it'll change seq numbers
854     if (!mDownloadState->hasSavedState()) {
855         status_t err = refreshPlaylist();
856         if (err != OK) {
857             if (mNumRetriesForMonitorQueue < kMaxNumRetries) {
858                 ++mNumRetriesForMonitorQueue;
859             } else {
860                 notifyError(err);
861             }
862             return;
863         } else {
864             mNumRetriesForMonitorQueue = 0;
865         }
866     }
867 
868     int64_t targetDurationUs = kMinBufferedDurationUs;
869     if (mPlaylist != NULL) {
870         targetDurationUs = mPlaylist->getTargetDuration();
871     }
872 
873     int64_t bufferedDurationUs = 0LL;
874     status_t finalResult = OK;
875     if (mStreamTypeMask == LiveSession::STREAMTYPE_SUBTITLES) {
876         sp<AnotherPacketSource> packetSource =
877             mPacketSources.valueFor(LiveSession::STREAMTYPE_SUBTITLES);
878 
879         bufferedDurationUs =
880                 packetSource->getBufferedDurationUs(&finalResult);
881     } else {
882         // Use min stream duration, but ignore streams that never have any packet
883         // enqueued to prevent us from waiting on a non-existent stream;
884         // when we cannot make out from the manifest what streams are included in
885         // a playlist we might assume extra streams.
886         bufferedDurationUs = -1LL;
887         for (size_t i = 0; i < mPacketSources.size(); ++i) {
888             if ((mStreamTypeMask & mPacketSources.keyAt(i)) == 0
889                     || mPacketSources[i]->getLatestEnqueuedMeta() == NULL) {
890                 continue;
891             }
892 
893             int64_t bufferedStreamDurationUs =
894                 mPacketSources.valueAt(i)->getBufferedDurationUs(&finalResult);
895 
896             FSLOGV(mPacketSources.keyAt(i), "buffered %lld", (long long)bufferedStreamDurationUs);
897 
898             if (bufferedDurationUs == -1LL
899                  || bufferedStreamDurationUs < bufferedDurationUs) {
900                 bufferedDurationUs = bufferedStreamDurationUs;
901             }
902         }
903         if (bufferedDurationUs == -1LL) {
904             bufferedDurationUs = 0LL;
905         }
906     }
907 
908     if (finalResult == OK && bufferedDurationUs < kMinBufferedDurationUs) {
909         FLOGV("monitoring, buffered=%lld < %lld",
910                 (long long)bufferedDurationUs, (long long)kMinBufferedDurationUs);
911 
912         // delay the next download slightly; hopefully this gives other concurrent fetchers
913         // a better chance to run.
914         // onDownloadNext();
915         sp<AMessage> msg = new AMessage(kWhatDownloadNext, this);
916         msg->setInt32("generation", mMonitorQueueGeneration);
917         msg->post(1000L);
918     } else {
919         // We'd like to maintain buffering above durationToBufferUs, so try
920         // again when buffer just about to go below durationToBufferUs
921         // (or after targetDurationUs / 2, whichever is smaller).
922         int64_t delayUs = bufferedDurationUs - kMinBufferedDurationUs + 1000000LL;
923         if (delayUs > targetDurationUs / 2) {
924             delayUs = targetDurationUs / 2;
925         }
926 
927         FLOGV("pausing for %lld, buffered=%lld > %lld",
928                 (long long)delayUs,
929                 (long long)bufferedDurationUs,
930                 (long long)kMinBufferedDurationUs);
931 
932         postMonitorQueue(delayUs);
933     }
934 }
935 
refreshPlaylist()936 status_t PlaylistFetcher::refreshPlaylist() {
937     if (delayUsToRefreshPlaylist() <= 0) {
938         bool unchanged;
939         sp<M3UParser> playlist = mHTTPDownloader->fetchPlaylist(
940                 mURI.c_str(), mPlaylistHash, &unchanged);
941 
942         if (playlist == NULL) {
943             if (unchanged) {
944                 // We succeeded in fetching the playlist, but it was
945                 // unchanged from the last time we tried.
946 
947                 if (mRefreshState != THIRD_UNCHANGED_RELOAD_ATTEMPT) {
948                     mRefreshState = (RefreshState)(mRefreshState + 1);
949                 }
950             } else {
951                 ALOGE("failed to load playlist at url '%s'", uriDebugString(mURI).c_str());
952                 return ERROR_IO;
953             }
954         } else {
955             mRefreshState = INITIAL_MINIMUM_RELOAD_DELAY;
956             mPlaylist = playlist;
957 
958             if (mPlaylist->isComplete() || mPlaylist->isEvent()) {
959                 updateDuration();
960             }
961             // Notify LiveSession to use target-duration based buffering level
962             // for up/down switch. Default LiveSession::kUpSwitchMark may not
963             // be reachable for live streams, as our max buffering amount is
964             // limited to 3 segments.
965             if (!mPlaylist->isComplete()) {
966                 updateTargetDuration();
967             }
968             mPlaylistTimeUs = ALooper::GetNowUs();
969         }
970 
971         mLastPlaylistFetchTimeUs = ALooper::GetNowUs();
972     }
973     return OK;
974 }
975 
976 // static
bufferStartsWithTsSyncByte(const sp<ABuffer> & buffer)977 bool PlaylistFetcher::bufferStartsWithTsSyncByte(const sp<ABuffer>& buffer) {
978     return buffer->size() > 0 && buffer->data()[0] == 0x47;
979 }
980 
shouldPauseDownload()981 bool PlaylistFetcher::shouldPauseDownload() {
982     if (mStreamTypeMask == LiveSession::STREAMTYPE_SUBTITLES) {
983         // doesn't apply to subtitles
984         return false;
985     }
986 
987     // Calculate threshold to abort current download
988     float thresholdRatio = getStoppingThreshold();
989 
990     if (thresholdRatio < 0.0f) {
991         // never abort
992         return false;
993     } else if (thresholdRatio == 0.0f) {
994         // immediately abort
995         return true;
996     }
997 
998     // now we have a positive thresholdUs, abort if remaining
999     // portion to download is over that threshold.
1000     if (mSegmentFirstPTS < 0) {
1001         // this means we haven't even find the first access unit,
1002         // abort now as we must be very far away from the end.
1003         return true;
1004     }
1005     int64_t lastEnqueueUs = mSegmentFirstPTS;
1006     for (size_t i = 0; i < mPacketSources.size(); ++i) {
1007         if ((mStreamTypeMask & mPacketSources.keyAt(i)) == 0) {
1008             continue;
1009         }
1010         sp<AMessage> meta = mPacketSources[i]->getLatestEnqueuedMeta();
1011         int32_t type;
1012         if (meta == NULL || meta->findInt32("discontinuity", &type)) {
1013             continue;
1014         }
1015         int64_t tmpUs;
1016         CHECK(meta->findInt64("timeUs", &tmpUs));
1017         if (tmpUs > lastEnqueueUs) {
1018             lastEnqueueUs = tmpUs;
1019         }
1020     }
1021     lastEnqueueUs -= mSegmentFirstPTS;
1022 
1023     int64_t targetDurationUs = mPlaylist->getTargetDuration();
1024     int64_t thresholdUs = thresholdRatio * targetDurationUs;
1025 
1026     FLOGV("%spausing now, thresholdUs %lld, remaining %lld",
1027             targetDurationUs - lastEnqueueUs > thresholdUs ? "" : "not ",
1028             (long long)thresholdUs,
1029             (long long)(targetDurationUs - lastEnqueueUs));
1030 
1031     if (targetDurationUs - lastEnqueueUs > thresholdUs) {
1032         return true;
1033     }
1034     return false;
1035 }
1036 
initSeqNumberForLiveStream(int32_t & firstSeqNumberInPlaylist,int32_t & lastSeqNumberInPlaylist)1037 void PlaylistFetcher::initSeqNumberForLiveStream(
1038         int32_t &firstSeqNumberInPlaylist,
1039         int32_t &lastSeqNumberInPlaylist) {
1040     // start at least 3 target durations from the end.
1041     int64_t timeFromEnd = 0;
1042     size_t index = mPlaylist->size();
1043     sp<AMessage> itemMeta;
1044     int64_t itemDurationUs;
1045     int32_t targetDuration;
1046     if (mPlaylist->meta() != NULL
1047             && mPlaylist->meta()->findInt32("target-duration", &targetDuration)) {
1048         do {
1049             --index;
1050             if (!mPlaylist->itemAt(index, NULL /* uri */, &itemMeta)
1051                     || !itemMeta->findInt64("durationUs", &itemDurationUs)) {
1052                 ALOGW("item or itemDurationUs missing");
1053                 mSeqNumber = lastSeqNumberInPlaylist - 3;
1054                 break;
1055             }
1056 
1057             timeFromEnd += itemDurationUs;
1058             mSeqNumber = firstSeqNumberInPlaylist + index;
1059         } while (timeFromEnd < targetDuration * 3E6 && index > 0);
1060     } else {
1061         ALOGW("target-duration missing");
1062         mSeqNumber = lastSeqNumberInPlaylist - 3;
1063     }
1064 
1065     if (mSeqNumber < firstSeqNumberInPlaylist) {
1066         mSeqNumber = firstSeqNumberInPlaylist;
1067     }
1068 }
1069 
initDownloadState(AString & uri,sp<AMessage> & itemMeta,int32_t & firstSeqNumberInPlaylist,int32_t & lastSeqNumberInPlaylist)1070 bool PlaylistFetcher::initDownloadState(
1071         AString &uri,
1072         sp<AMessage> &itemMeta,
1073         int32_t &firstSeqNumberInPlaylist,
1074         int32_t &lastSeqNumberInPlaylist) {
1075     status_t err = refreshPlaylist();
1076     firstSeqNumberInPlaylist = 0;
1077     lastSeqNumberInPlaylist = 0;
1078     bool discontinuity = false;
1079 
1080     if (mPlaylist != NULL) {
1081         mPlaylist->getSeqNumberRange(
1082                 &firstSeqNumberInPlaylist, &lastSeqNumberInPlaylist);
1083 
1084         if (mDiscontinuitySeq < 0) {
1085             mDiscontinuitySeq = mPlaylist->getDiscontinuitySeq();
1086         }
1087     }
1088 
1089     mSegmentFirstPTS = -1LL;
1090 
1091     if (mPlaylist != NULL && mSeqNumber < 0) {
1092         CHECK_GE(mStartTimeUs, 0LL);
1093 
1094         if (mSegmentStartTimeUs < 0) {
1095             if (!mPlaylist->isComplete() && !mPlaylist->isEvent()) {
1096                 // this is a live session
1097                 initSeqNumberForLiveStream(firstSeqNumberInPlaylist, lastSeqNumberInPlaylist);
1098             } else {
1099                 // When seeking mSegmentStartTimeUs is unavailable (< 0), we
1100                 // use mStartTimeUs (client supplied timestamp) to determine both start segment
1101                 // and relative position inside a segment
1102                 mSeqNumber = getSeqNumberForTime(mStartTimeUs);
1103                 mStartTimeUs -= getSegmentStartTimeUs(mSeqNumber);
1104             }
1105             mStartTimeUsRelative = true;
1106             FLOGV("Initial sequence number for time %lld is %d from (%d .. %d)",
1107                     (long long)mStartTimeUs, mSeqNumber, firstSeqNumberInPlaylist,
1108                     lastSeqNumberInPlaylist);
1109         } else {
1110             // When adapting or track switching, mSegmentStartTimeUs (relative
1111             // to media time 0) is used to determine the start segment; mStartTimeUs (absolute
1112             // timestamps coming from the media container) is used to determine the position
1113             // inside a segments.
1114             if (mStreamTypeMask != LiveSession::STREAMTYPE_SUBTITLES
1115                     && mSeekMode != LiveSession::kSeekModeNextSample) {
1116                 // avoid double fetch/decode
1117                 // Use (mSegmentStartTimeUs + 1/2 * targetDurationUs) to search
1118                 // for the starting segment in new variant.
1119                 // If the two variants' segments are aligned, this gives the
1120                 // next segment. If they're not aligned, this gives the segment
1121                 // that overlaps no more than 1/2 * targetDurationUs.
1122                 mSeqNumber = getSeqNumberForTime(mSegmentStartTimeUs
1123                         + mPlaylist->getTargetDuration() / 2);
1124             } else {
1125                 mSeqNumber = getSeqNumberForTime(mSegmentStartTimeUs);
1126             }
1127             ssize_t minSeq = getSeqNumberForDiscontinuity(mDiscontinuitySeq);
1128             if (mSeqNumber < minSeq) {
1129                 mSeqNumber = minSeq;
1130             }
1131 
1132             if (mSeqNumber < firstSeqNumberInPlaylist) {
1133                 mSeqNumber = firstSeqNumberInPlaylist;
1134             }
1135 
1136             if (mSeqNumber > lastSeqNumberInPlaylist) {
1137                 mSeqNumber = lastSeqNumberInPlaylist;
1138             }
1139             FLOGV("Initial sequence number is %d from (%d .. %d)",
1140                     mSeqNumber, firstSeqNumberInPlaylist,
1141                     lastSeqNumberInPlaylist);
1142         }
1143     }
1144 
1145     // if mPlaylist is NULL then err must be non-OK; but the other way around might not be true
1146     if (mSeqNumber < firstSeqNumberInPlaylist
1147             || mSeqNumber > lastSeqNumberInPlaylist
1148             || err != OK) {
1149         if ((err != OK || !mPlaylist->isComplete()) && mNumRetries < kMaxNumRetries) {
1150             ++mNumRetries;
1151 
1152             if (mSeqNumber > lastSeqNumberInPlaylist || err != OK) {
1153                 // make sure we reach this retry logic on refresh failures
1154                 // by adding an err != OK clause to all enclosing if's.
1155 
1156                 // refresh in increasing fraction (1/2, 1/3, ...) of the
1157                 // playlist's target duration or 3 seconds, whichever is less
1158                 int64_t delayUs = kMaxMonitorDelayUs;
1159                 if (mPlaylist != NULL) {
1160                     delayUs = mPlaylist->size() * mPlaylist->getTargetDuration()
1161                             / (1 + mNumRetries);
1162                 }
1163                 if (delayUs > kMaxMonitorDelayUs) {
1164                     delayUs = kMaxMonitorDelayUs;
1165                 }
1166                 FLOGV("sequence number high: %d from (%d .. %d), "
1167                       "monitor in %lld (retry=%d)",
1168                         mSeqNumber, firstSeqNumberInPlaylist,
1169                         lastSeqNumberInPlaylist, (long long)delayUs, mNumRetries);
1170                 postMonitorQueue(delayUs);
1171                 return false;
1172             }
1173 
1174             if (err != OK) {
1175                 notifyError(err);
1176                 return false;
1177             }
1178 
1179             // we've missed the boat, let's start 3 segments prior to the latest sequence
1180             // number available and signal a discontinuity.
1181 
1182             ALOGI("We've missed the boat, restarting playback."
1183                   "  mStartup=%d, was  looking for %d in %d-%d",
1184                     mStartup, mSeqNumber, firstSeqNumberInPlaylist,
1185                     lastSeqNumberInPlaylist);
1186             if (mStopParams != NULL) {
1187                 // we should have kept on fetching until we hit the boundaries in mStopParams,
1188                 // but since the segments we are supposed to fetch have already rolled off
1189                 // the playlist, i.e. we have already missed the boat, we inevitably have to
1190                 // skip.
1191                 notifyStopReached();
1192                 return false;
1193             }
1194             mSeqNumber = lastSeqNumberInPlaylist - 3;
1195             if (mSeqNumber < firstSeqNumberInPlaylist) {
1196                 mSeqNumber = firstSeqNumberInPlaylist;
1197             }
1198             discontinuity = true;
1199 
1200             // fall through
1201         } else {
1202             if (mPlaylist != NULL) {
1203                 if (mSeqNumber >= firstSeqNumberInPlaylist + (int32_t)mPlaylist->size()
1204                         && !mPlaylist->isComplete()) {
1205                     // Live playlists
1206                     ALOGW("sequence number %d not yet available", mSeqNumber);
1207                     postMonitorQueue(delayUsToRefreshPlaylist());
1208                     return false;
1209                 }
1210                 ALOGE("Cannot find sequence number %d in playlist "
1211                      "(contains %d - %d)",
1212                      mSeqNumber, firstSeqNumberInPlaylist,
1213                       firstSeqNumberInPlaylist + (int32_t)mPlaylist->size() - 1);
1214 
1215                 if (mTSParser != NULL) {
1216                     mTSParser->signalEOS(ERROR_END_OF_STREAM);
1217                     // Use an empty buffer; we don't have any new data, just want to extract
1218                     // potential new access units after flush.  Reset mSeqNumber to
1219                     // lastSeqNumberInPlaylist such that we set the correct access unit
1220                     // properties in extractAndQueueAccessUnitsFromTs.
1221                     sp<ABuffer> buffer = new ABuffer(0);
1222                     mSeqNumber = lastSeqNumberInPlaylist;
1223                     extractAndQueueAccessUnitsFromTs(buffer);
1224                 }
1225                 notifyError(ERROR_END_OF_STREAM);
1226             } else {
1227                 // It's possible that we were never able to download the playlist.
1228                 // In this case we should notify error, instead of EOS, as EOS during
1229                 // prepare means we succeeded in downloading everything.
1230                 ALOGE("Failed to download playlist!");
1231                 notifyError(ERROR_IO);
1232             }
1233 
1234             return false;
1235         }
1236     }
1237 
1238     mNumRetries = 0;
1239 
1240     CHECK(mPlaylist->itemAt(
1241                 mSeqNumber - firstSeqNumberInPlaylist,
1242                 &uri,
1243                 &itemMeta));
1244 
1245     CHECK(itemMeta->findInt32("discontinuity-sequence", &mDiscontinuitySeq));
1246 
1247     int32_t val;
1248     if (itemMeta->findInt32("discontinuity", &val) && val != 0) {
1249         discontinuity = true;
1250     } else if (mLastDiscontinuitySeq >= 0
1251             && mDiscontinuitySeq != mLastDiscontinuitySeq) {
1252         // Seek jumped to a new discontinuity sequence. We need to signal
1253         // a format change to decoder. Decoder needs to shutdown and be
1254         // created again if seamless format change is unsupported.
1255         FLOGV("saw discontinuity: mStartup %d, mLastDiscontinuitySeq %d, "
1256                 "mDiscontinuitySeq %d, mStartTimeUs %lld",
1257                 mStartup, mLastDiscontinuitySeq, mDiscontinuitySeq, (long long)mStartTimeUs);
1258         discontinuity = true;
1259     }
1260     mLastDiscontinuitySeq = -1;
1261 
1262     // decrypt a junk buffer to prefetch key; since a session uses only one http connection,
1263     // this avoids interleaved connections to the key and segment file.
1264     {
1265         sp<ABuffer> junk = new ABuffer(16);
1266         junk->setRange(0, 16);
1267         status_t err = decryptBuffer(mSeqNumber - firstSeqNumberInPlaylist, junk,
1268                 true /* first */);
1269         if (err == ERROR_NOT_CONNECTED) {
1270             return false;
1271         } else if (err != OK) {
1272             notifyError(err);
1273             return false;
1274         }
1275     }
1276 
1277     if ((mStartup && !mTimeChangeSignaled) || discontinuity) {
1278         // We need to signal a time discontinuity to ATSParser on the
1279         // first segment after start, or on a discontinuity segment.
1280         // Setting mNextPTSTimeUs informs extractAndQueueAccessUnitsXX()
1281         // to send the time discontinuity.
1282         if (mPlaylist->isComplete() || mPlaylist->isEvent()) {
1283             // If this was a live event this made no sense since
1284             // we don't have access to all the segment before the current
1285             // one.
1286             mNextPTSTimeUs = getSegmentStartTimeUs(mSeqNumber);
1287         }
1288 
1289         // Setting mTimeChangeSignaled to true, so that if start time
1290         // searching goes into 2nd segment (without a discontinuity),
1291         // we don't reset time again. It causes corruption when pending
1292         // data in ATSParser is cleared.
1293         mTimeChangeSignaled = true;
1294     }
1295 
1296     if (discontinuity) {
1297         ALOGI("queueing discontinuity (explicit=%d)", discontinuity);
1298 
1299         // Signal a format discontinuity to ATSParser to clear partial data
1300         // from previous streams. Not doing this causes bitstream corruption.
1301         if (mTSParser != NULL) {
1302             mTSParser.clear();
1303         }
1304 
1305         queueDiscontinuity(
1306                 ATSParser::DISCONTINUITY_FORMAT_ONLY,
1307                 NULL /* extra */);
1308 
1309         if (mStartup && mStartTimeUsRelative && mFirstPTSValid) {
1310             // This means we guessed mStartTimeUs to be in the previous
1311             // segment (likely very close to the end), but either video or
1312             // audio has not found start by the end of that segment.
1313             //
1314             // If this new segment is not a discontinuity, keep searching.
1315             //
1316             // If this new segment even got a discontinuity marker, just
1317             // set mStartTimeUs=0, and take all samples from now on.
1318             mStartTimeUs = 0;
1319             mFirstPTSValid = false;
1320             mIDRFound = false;
1321             mVideoBuffer->clear();
1322         }
1323     }
1324 
1325     FLOGV("fetching segment %d from (%d .. %d)",
1326             mSeqNumber, firstSeqNumberInPlaylist, lastSeqNumberInPlaylist);
1327     return true;
1328 }
1329 
onDownloadNext()1330 void PlaylistFetcher::onDownloadNext() {
1331     AString uri;
1332     sp<AMessage> itemMeta;
1333     sp<ABuffer> buffer;
1334     sp<ABuffer> tsBuffer;
1335     int32_t firstSeqNumberInPlaylist = 0;
1336     int32_t lastSeqNumberInPlaylist = 0;
1337     bool connectHTTP = true;
1338 
1339     if (mDownloadState->hasSavedState()) {
1340         mDownloadState->restoreState(
1341                 uri,
1342                 itemMeta,
1343                 buffer,
1344                 tsBuffer,
1345                 firstSeqNumberInPlaylist,
1346                 lastSeqNumberInPlaylist);
1347         connectHTTP = false;
1348         FLOGV("resuming: '%s'", uri.c_str());
1349     } else {
1350         if (!initDownloadState(
1351                 uri,
1352                 itemMeta,
1353                 firstSeqNumberInPlaylist,
1354                 lastSeqNumberInPlaylist)) {
1355             return;
1356         }
1357         FLOGV("fetching: '%s'", uri.c_str());
1358     }
1359 
1360     int64_t range_offset, range_length;
1361     if (!itemMeta->findInt64("range-offset", &range_offset)
1362             || !itemMeta->findInt64("range-length", &range_length)) {
1363         range_offset = 0;
1364         range_length = -1;
1365     }
1366 
1367     // block-wise download
1368     bool shouldPause = false;
1369     ssize_t bytesRead;
1370     do {
1371         int64_t startUs = ALooper::GetNowUs();
1372         bytesRead = mHTTPDownloader->fetchBlock(
1373                 uri.c_str(), &buffer, range_offset, range_length, kDownloadBlockSize,
1374                 NULL /* actualURL */, connectHTTP);
1375         int64_t delayUs = ALooper::GetNowUs() - startUs;
1376 
1377         if (bytesRead == ERROR_NOT_CONNECTED) {
1378             return;
1379         }
1380         if (bytesRead < 0) {
1381             status_t err = bytesRead;
1382             ALOGE("failed to fetch .ts segment at url '%s'", uriDebugString(uri).c_str());
1383             notifyError(err);
1384             return;
1385         }
1386 
1387         // add sample for bandwidth estimation, excluding samples from subtitles (as
1388         // its too small), or during startup/resumeUntil (when we could have more than
1389         // one connection open which affects bandwidth)
1390         if (!mStartup && mStopParams == NULL && bytesRead > 0
1391                 && (mStreamTypeMask
1392                         & (LiveSession::STREAMTYPE_AUDIO
1393                         | LiveSession::STREAMTYPE_VIDEO))) {
1394             mSession->addBandwidthMeasurement(bytesRead, delayUs);
1395             if (delayUs > 2000000LL) {
1396                 FLOGV("bytesRead %zd took %.2f seconds - abnormal bandwidth dip",
1397                         bytesRead, (double)delayUs / 1.0e6);
1398             }
1399         }
1400 
1401         connectHTTP = false;
1402 
1403         CHECK(buffer != NULL);
1404 
1405         size_t size = buffer->size();
1406         // Set decryption range.
1407         buffer->setRange(size - bytesRead, bytesRead);
1408         status_t err = decryptBuffer(mSeqNumber - firstSeqNumberInPlaylist, buffer,
1409                 buffer->offset() == 0 /* first */);
1410         // Unset decryption range.
1411         buffer->setRange(0, size);
1412 
1413         if (err != OK) {
1414             ALOGE("decryptBuffer failed w/ error %d", err);
1415 
1416             notifyError(err);
1417             return;
1418         }
1419 
1420         bool startUp = mStartup; // save current start up state
1421 
1422         err = OK;
1423         if (bufferStartsWithTsSyncByte(buffer)) {
1424             // Incremental extraction is only supported for MPEG2 transport streams.
1425             if (tsBuffer == NULL) {
1426                 tsBuffer = new ABuffer(buffer->data(), buffer->capacity());
1427                 tsBuffer->setRange(0, 0);
1428             } else if (tsBuffer->capacity() != buffer->capacity()) {
1429                 size_t tsOff = tsBuffer->offset(), tsSize = tsBuffer->size();
1430                 tsBuffer = new ABuffer(buffer->data(), buffer->capacity());
1431                 tsBuffer->setRange(tsOff, tsSize);
1432             }
1433             tsBuffer->setRange(tsBuffer->offset(), tsBuffer->size() + bytesRead);
1434             err = extractAndQueueAccessUnitsFromTs(tsBuffer);
1435         }
1436 
1437         if (err == -EAGAIN) {
1438             // starting sequence number too low/high
1439             mTSParser.clear();
1440             for (size_t i = 0; i < mPacketSources.size(); i++) {
1441                 sp<AnotherPacketSource> packetSource = mPacketSources.valueAt(i);
1442                 packetSource->clear();
1443             }
1444             postMonitorQueue();
1445             return;
1446         } else if (err == ERROR_OUT_OF_RANGE) {
1447             // reached stopping point
1448             notifyStopReached();
1449             return;
1450         } else if (err != OK) {
1451             notifyError(err);
1452             return;
1453         }
1454         // If we're switching, post start notification
1455         // this should only be posted when the last chunk is full processed by TSParser
1456         if (mSeekMode != LiveSession::kSeekModeExactPosition && startUp != mStartup) {
1457             CHECK(mStartTimeUsNotify != NULL);
1458             mStartTimeUsNotify->post();
1459             mStartTimeUsNotify.clear();
1460             shouldPause = true;
1461         }
1462         if (shouldPause || shouldPauseDownload()) {
1463             // save state and return if this is not the last chunk,
1464             // leaving the fetcher in paused state.
1465             if (bytesRead != 0) {
1466                 mDownloadState->saveState(
1467                         uri,
1468                         itemMeta,
1469                         buffer,
1470                         tsBuffer,
1471                         firstSeqNumberInPlaylist,
1472                         lastSeqNumberInPlaylist);
1473                 return;
1474             }
1475             shouldPause = true;
1476         }
1477     } while (bytesRead != 0);
1478 
1479     if (bufferStartsWithTsSyncByte(buffer)) {
1480         // If we don't see a stream in the program table after fetching a full ts segment
1481         // mark it as nonexistent.
1482         ATSParser::SourceType srcTypes[] =
1483                 { ATSParser::VIDEO, ATSParser::AUDIO };
1484         LiveSession::StreamType streamTypes[] =
1485                 { LiveSession::STREAMTYPE_VIDEO, LiveSession::STREAMTYPE_AUDIO };
1486         const size_t kNumTypes = NELEM(srcTypes);
1487 
1488         for (size_t i = 0; i < kNumTypes; i++) {
1489             ATSParser::SourceType srcType = srcTypes[i];
1490             LiveSession::StreamType streamType = streamTypes[i];
1491 
1492             sp<AnotherPacketSource> source =
1493                 static_cast<AnotherPacketSource *>(
1494                     mTSParser->getSource(srcType).get());
1495 
1496             if (!mTSParser->hasSource(srcType)) {
1497                 ALOGW("MPEG2 Transport stream does not contain %s data.",
1498                       srcType == ATSParser::VIDEO ? "video" : "audio");
1499 
1500                 mStreamTypeMask &= ~streamType;
1501                 mPacketSources.removeItem(streamType);
1502             }
1503         }
1504 
1505     }
1506 
1507     if (checkDecryptPadding(buffer) != OK) {
1508         ALOGE("Incorrect padding bytes after decryption.");
1509         notifyError(ERROR_MALFORMED);
1510         return;
1511     }
1512 
1513     if (tsBuffer != NULL) {
1514         AString method;
1515         CHECK(buffer->meta()->findString("cipher-method", &method));
1516         if ((tsBuffer->size() > 0 && method == "NONE")
1517                 || tsBuffer->size() > 16) {
1518             ALOGE("MPEG2 transport stream is not an even multiple of 188 "
1519                     "bytes in length.");
1520             notifyError(ERROR_MALFORMED);
1521             return;
1522         }
1523     }
1524 
1525     // bulk extract non-ts files
1526     bool startUp = mStartup;
1527     if (tsBuffer == NULL) {
1528         status_t err = extractAndQueueAccessUnits(buffer, itemMeta);
1529         if (err == -EAGAIN) {
1530             // starting sequence number too low/high
1531             postMonitorQueue();
1532             return;
1533         } else if (err == ERROR_OUT_OF_RANGE) {
1534             // reached stopping point
1535             notifyStopReached();
1536             return;
1537         } else if (err != OK) {
1538             notifyError(err);
1539             return;
1540         }
1541     }
1542 
1543     ++mSeqNumber;
1544 
1545     // if adapting, pause after found the next starting point
1546     if (mSeekMode != LiveSession::kSeekModeExactPosition && startUp != mStartup) {
1547         CHECK(mStartTimeUsNotify != NULL);
1548         mStartTimeUsNotify->post();
1549         mStartTimeUsNotify.clear();
1550         shouldPause = true;
1551     }
1552 
1553     if (!shouldPause) {
1554         postMonitorQueue();
1555     }
1556 }
1557 
1558 /*
1559  * returns true if we need to adjust mSeqNumber
1560  */
adjustSeqNumberWithAnchorTime(int64_t anchorTimeUs)1561 bool PlaylistFetcher::adjustSeqNumberWithAnchorTime(int64_t anchorTimeUs) {
1562     int32_t firstSeqNumberInPlaylist = mPlaylist->getFirstSeqNumber();
1563 
1564     int64_t minDiffUs, maxDiffUs;
1565     if (mSeekMode == LiveSession::kSeekModeNextSample) {
1566         // if the previous fetcher paused in the middle of a segment, we
1567         // want to start at a segment that overlaps the last sample
1568         minDiffUs = -mPlaylist->getTargetDuration();
1569         maxDiffUs = 0LL;
1570     } else {
1571         // if the previous fetcher paused at the end of a segment, ideally
1572         // we want to start at the segment that's roughly aligned with its
1573         // next segment, but if the two variants are not well aligned we
1574         // adjust the diff to within (-T/2, T/2)
1575         minDiffUs = -mPlaylist->getTargetDuration() / 2;
1576         maxDiffUs = mPlaylist->getTargetDuration() / 2;
1577     }
1578 
1579     int32_t oldSeqNumber = mSeqNumber;
1580     ssize_t index = mSeqNumber - firstSeqNumberInPlaylist;
1581 
1582     // adjust anchorTimeUs to within (minDiffUs, maxDiffUs) from mStartTimeUs
1583     int64_t diffUs = anchorTimeUs - mStartTimeUs;
1584     if (diffUs > maxDiffUs) {
1585         while (index > 0 && diffUs > maxDiffUs) {
1586             --index;
1587 
1588             sp<AMessage> itemMeta;
1589             CHECK(mPlaylist->itemAt(index, NULL /* uri */, &itemMeta));
1590 
1591             int64_t itemDurationUs;
1592             CHECK(itemMeta->findInt64("durationUs", &itemDurationUs));
1593 
1594             diffUs -= itemDurationUs;
1595         }
1596     } else if (diffUs < minDiffUs) {
1597         while (index + 1 < (ssize_t) mPlaylist->size()
1598                 && diffUs < minDiffUs) {
1599             ++index;
1600 
1601             sp<AMessage> itemMeta;
1602             CHECK(mPlaylist->itemAt(index, NULL /* uri */, &itemMeta));
1603 
1604             int64_t itemDurationUs;
1605             CHECK(itemMeta->findInt64("durationUs", &itemDurationUs));
1606 
1607             diffUs += itemDurationUs;
1608         }
1609     }
1610 
1611     mSeqNumber = firstSeqNumberInPlaylist + index;
1612 
1613     if (mSeqNumber != oldSeqNumber) {
1614         FLOGV("guessed wrong seg number: diff %lld out of [%lld, %lld]",
1615                 (long long) anchorTimeUs - mStartTimeUs,
1616                 (long long) minDiffUs,
1617                 (long long) maxDiffUs);
1618         return true;
1619     }
1620     return false;
1621 }
1622 
getSeqNumberForDiscontinuity(size_t discontinuitySeq) const1623 int32_t PlaylistFetcher::getSeqNumberForDiscontinuity(size_t discontinuitySeq) const {
1624     int32_t firstSeqNumberInPlaylist = mPlaylist->getFirstSeqNumber();
1625 
1626     size_t index = 0;
1627     while (index < mPlaylist->size()) {
1628         sp<AMessage> itemMeta;
1629         CHECK(mPlaylist->itemAt( index, NULL /* uri */, &itemMeta));
1630         size_t curDiscontinuitySeq;
1631         CHECK(itemMeta->findInt32("discontinuity-sequence", (int32_t *)&curDiscontinuitySeq));
1632         int32_t seqNumber = firstSeqNumberInPlaylist + index;
1633         if (curDiscontinuitySeq == discontinuitySeq) {
1634             return seqNumber;
1635         } else if (curDiscontinuitySeq > discontinuitySeq) {
1636             return seqNumber <= 0 ? 0 : seqNumber - 1;
1637         }
1638 
1639         ++index;
1640     }
1641 
1642     return firstSeqNumberInPlaylist + mPlaylist->size();
1643 }
1644 
getSeqNumberForTime(int64_t timeUs) const1645 int32_t PlaylistFetcher::getSeqNumberForTime(int64_t timeUs) const {
1646     size_t index = 0;
1647     int64_t segmentStartUs = 0;
1648     while (index < mPlaylist->size()) {
1649         sp<AMessage> itemMeta;
1650         CHECK(mPlaylist->itemAt(
1651                     index, NULL /* uri */, &itemMeta));
1652 
1653         int64_t itemDurationUs;
1654         CHECK(itemMeta->findInt64("durationUs", &itemDurationUs));
1655 
1656         if (timeUs < segmentStartUs + itemDurationUs) {
1657             break;
1658         }
1659 
1660         segmentStartUs += itemDurationUs;
1661         ++index;
1662     }
1663 
1664     if (index >= mPlaylist->size()) {
1665         index = mPlaylist->size() - 1;
1666     }
1667 
1668     return mPlaylist->getFirstSeqNumber() + index;
1669 }
1670 
setAccessUnitProperties(const sp<ABuffer> & accessUnit,const sp<AnotherPacketSource> & source,bool discard)1671 const sp<ABuffer> &PlaylistFetcher::setAccessUnitProperties(
1672         const sp<ABuffer> &accessUnit, const sp<AnotherPacketSource> &source, bool discard) {
1673     sp<MetaData> format = source->getFormat();
1674     if (format != NULL) {
1675         // for simplicity, store a reference to the format in each unit
1676         accessUnit->meta()->setObject("format", format);
1677     }
1678 
1679     if (discard) {
1680         accessUnit->meta()->setInt32("discard", discard);
1681     }
1682 
1683     accessUnit->meta()->setInt32("discontinuitySeq", mDiscontinuitySeq);
1684     accessUnit->meta()->setInt64("segmentStartTimeUs", getSegmentStartTimeUs(mSeqNumber));
1685     accessUnit->meta()->setInt64("segmentFirstTimeUs", mSegmentFirstPTS);
1686     accessUnit->meta()->setInt64("segmentDurationUs", getSegmentDurationUs(mSeqNumber));
1687     if (!mPlaylist->isComplete() && !mPlaylist->isEvent()) {
1688         accessUnit->meta()->setInt64("playlistTimeUs", mPlaylistTimeUs);
1689     }
1690     return accessUnit;
1691 }
1692 
isStartTimeReached(int64_t timeUs)1693 bool PlaylistFetcher::isStartTimeReached(int64_t timeUs) {
1694     if (!mFirstPTSValid) {
1695         mFirstTimeUs = timeUs;
1696         mFirstPTSValid = true;
1697     }
1698     bool startTimeReached = true;
1699     if (mStartTimeUsRelative) {
1700         FLOGV("startTimeUsRelative, timeUs (%lld) - %lld = %lld",
1701                 (long long)timeUs,
1702                 (long long)mFirstTimeUs,
1703                 (long long)(timeUs - mFirstTimeUs));
1704         timeUs -= mFirstTimeUs;
1705         if (timeUs < 0) {
1706             FLOGV("clamp negative timeUs to 0");
1707             timeUs = 0;
1708         }
1709         startTimeReached = (timeUs >= mStartTimeUs);
1710     }
1711     return startTimeReached;
1712 }
1713 
extractAndQueueAccessUnitsFromTs(const sp<ABuffer> & buffer)1714 status_t PlaylistFetcher::extractAndQueueAccessUnitsFromTs(const sp<ABuffer> &buffer) {
1715     if (mTSParser == NULL) {
1716         // Use TS_TIMESTAMPS_ARE_ABSOLUTE so pts carry over between fetchers.
1717         mTSParser = new ATSParser(ATSParser::TS_TIMESTAMPS_ARE_ABSOLUTE);
1718     }
1719 
1720     if (mNextPTSTimeUs >= 0LL) {
1721         sp<AMessage> extra = new AMessage;
1722         // Since we are using absolute timestamps, signal an offset of 0 to prevent
1723         // ATSParser from skewing the timestamps of access units.
1724         extra->setInt64(kATSParserKeyMediaTimeUs, 0);
1725 
1726         // When adapting, signal a recent media time to the parser,
1727         // so that PTS wrap around is handled for the new variant.
1728         if (mStartTimeUs >= 0 && !mStartTimeUsRelative) {
1729             extra->setInt64(kATSParserKeyRecentMediaTimeUs, mStartTimeUs);
1730         }
1731 
1732         mTSParser->signalDiscontinuity(
1733                 ATSParser::DISCONTINUITY_TIME, extra);
1734 
1735         mNextPTSTimeUs = -1LL;
1736     }
1737 
1738     if (mSampleAesKeyItemChanged) {
1739         mTSParser->signalNewSampleAesKey(mSampleAesKeyItem);
1740         mSampleAesKeyItemChanged = false;
1741     }
1742 
1743     size_t offset = 0;
1744     while (offset + 188 <= buffer->size()) {
1745         status_t err = mTSParser->feedTSPacket(buffer->data() + offset, 188);
1746 
1747         if (err != OK) {
1748             return err;
1749         }
1750 
1751         offset += 188;
1752     }
1753     // setRange to indicate consumed bytes.
1754     buffer->setRange(buffer->offset() + offset, buffer->size() - offset);
1755 
1756     if (mSegmentFirstPTS < 0LL) {
1757         // get the smallest first PTS from all streams present in this parser
1758         for (size_t i = mPacketSources.size(); i > 0;) {
1759             i--;
1760             const LiveSession::StreamType stream = mPacketSources.keyAt(i);
1761             if (stream == LiveSession::STREAMTYPE_SUBTITLES) {
1762                 ALOGE("MPEG2 Transport streams do not contain subtitles.");
1763                 return ERROR_MALFORMED;
1764             }
1765             if (stream == LiveSession::STREAMTYPE_METADATA) {
1766                 continue;
1767             }
1768             ATSParser::SourceType type =LiveSession::getSourceTypeForStream(stream);
1769             sp<AnotherPacketSource> source =
1770                 static_cast<AnotherPacketSource *>(
1771                         mTSParser->getSource(type).get());
1772 
1773             if (source == NULL) {
1774                 continue;
1775             }
1776             sp<AMessage> meta = source->getMetaAfterLastDequeued(0);
1777             if (meta != NULL) {
1778                 int64_t timeUs;
1779                 CHECK(meta->findInt64("timeUs", &timeUs));
1780                 if (mSegmentFirstPTS < 0LL || timeUs < mSegmentFirstPTS) {
1781                     mSegmentFirstPTS = timeUs;
1782                 }
1783             }
1784         }
1785         if (mSegmentFirstPTS < 0LL) {
1786             // didn't find any TS packet, can return early
1787             return OK;
1788         }
1789         if (!mStartTimeUsRelative) {
1790             // mStartup
1791             //   mStartup is true until we have queued a packet for all the streams
1792             //   we are fetching. We queue packets whose timestamps are greater than
1793             //   mStartTimeUs.
1794             // mSegmentStartTimeUs >= 0
1795             //   mSegmentStartTimeUs is non-negative when adapting or switching tracks
1796             // adjustSeqNumberWithAnchorTime(timeUs) == true
1797             //   we guessed a seq number that's either too large or too small.
1798             // If this happens, we'll adjust mSeqNumber and restart fetching from new
1799             // location. Note that we only want to adjust once, so set mSegmentStartTimeUs
1800             // to -1 so that we don't enter this chunk next time.
1801             if (mStartup && mSegmentStartTimeUs >= 0
1802                     && adjustSeqNumberWithAnchorTime(mSegmentFirstPTS)) {
1803                 mStartTimeUsNotify = mNotify->dup();
1804                 mStartTimeUsNotify->setInt32("what", kWhatStartedAt);
1805                 mStartTimeUsNotify->setString("uri", mURI);
1806                 mIDRFound = false;
1807                 mSegmentStartTimeUs = -1;
1808                 return -EAGAIN;
1809             }
1810         }
1811     }
1812 
1813     status_t err = OK;
1814     for (size_t i = mPacketSources.size(); i > 0;) {
1815         i--;
1816         sp<AnotherPacketSource> packetSource = mPacketSources.valueAt(i);
1817 
1818         const LiveSession::StreamType stream = mPacketSources.keyAt(i);
1819         if (stream == LiveSession::STREAMTYPE_SUBTITLES) {
1820             ALOGE("MPEG2 Transport streams do not contain subtitles.");
1821             return ERROR_MALFORMED;
1822         }
1823 
1824         const char *key = LiveSession::getKeyForStream(stream);
1825         ATSParser::SourceType type =LiveSession::getSourceTypeForStream(stream);
1826 
1827         sp<AnotherPacketSource> source =
1828             static_cast<AnotherPacketSource *>(
1829                     mTSParser->getSource(type).get());
1830 
1831         if (source == NULL) {
1832             continue;
1833         }
1834 
1835         const char *mime;
1836         sp<MetaData> format  = source->getFormat();
1837         bool isAvc = format != NULL && format->findCString(kKeyMIMEType, &mime)
1838                 && !strcasecmp(mime, MEDIA_MIMETYPE_VIDEO_AVC);
1839 
1840         sp<ABuffer> accessUnit;
1841         status_t finalResult;
1842         while (source->hasBufferAvailable(&finalResult)
1843                 && source->dequeueAccessUnit(&accessUnit) == OK) {
1844 
1845             int64_t timeUs;
1846             CHECK(accessUnit->meta()->findInt64("timeUs", &timeUs));
1847 
1848             if (mStartup) {
1849                 bool startTimeReached = isStartTimeReached(timeUs);
1850 
1851                 if (!startTimeReached || (isAvc && !mIDRFound)) {
1852                     // buffer up to the closest preceding IDR frame in the next segement,
1853                     // or the closest succeeding IDR frame after the exact position
1854                     FSLOGV(stream, "timeUs(%lld)-mStartTimeUs(%lld)=%lld, mIDRFound=%d",
1855                             (long long)timeUs,
1856                             (long long)mStartTimeUs,
1857                             (long long)timeUs - mStartTimeUs,
1858                             mIDRFound);
1859                     if (isAvc) {
1860                         if (IsIDR(accessUnit->data(), accessUnit->size())) {
1861                             mVideoBuffer->clear();
1862                             FSLOGV(stream, "found IDR, clear mVideoBuffer");
1863                             mIDRFound = true;
1864                         }
1865                         if (mIDRFound && mStartTimeUsRelative && !startTimeReached) {
1866                             mVideoBuffer->queueAccessUnit(accessUnit);
1867                             FSLOGV(stream, "saving AVC video AccessUnit");
1868                         }
1869                     }
1870                     if (!startTimeReached || (isAvc && !mIDRFound)) {
1871                         continue;
1872                     }
1873                 }
1874             }
1875 
1876             if (mStartTimeUsNotify != NULL) {
1877                 uint32_t streamMask = 0;
1878                 mStartTimeUsNotify->findInt32("streamMask", (int32_t *) &streamMask);
1879                 if ((mStreamTypeMask & mPacketSources.keyAt(i))
1880                         && !(streamMask & mPacketSources.keyAt(i))) {
1881                     streamMask |= mPacketSources.keyAt(i);
1882                     mStartTimeUsNotify->setInt32("streamMask", streamMask);
1883                     FSLOGV(stream, "found start point, timeUs=%lld, streamMask becomes %x",
1884                             (long long)timeUs, streamMask);
1885 
1886                     if (streamMask == mStreamTypeMask) {
1887                         FLOGV("found start point for all streams");
1888                         mStartup = false;
1889                     }
1890                 }
1891             }
1892 
1893             if (mStopParams != NULL) {
1894                 int32_t discontinuitySeq;
1895                 int64_t stopTimeUs;
1896                 if (!mStopParams->findInt32("discontinuitySeq", &discontinuitySeq)
1897                         || discontinuitySeq > mDiscontinuitySeq
1898                         || !mStopParams->findInt64(key, &stopTimeUs)
1899                         || (discontinuitySeq == mDiscontinuitySeq
1900                                 && timeUs >= stopTimeUs)) {
1901                     FSLOGV(stream, "reached stop point, timeUs=%lld", (long long)timeUs);
1902                     mStreamTypeMask &= ~stream;
1903                     mPacketSources.removeItemsAt(i);
1904                     break;
1905                 }
1906             }
1907 
1908             if (stream == LiveSession::STREAMTYPE_VIDEO) {
1909                 const bool discard = true;
1910                 status_t status;
1911                 while (mVideoBuffer->hasBufferAvailable(&status)) {
1912                     sp<ABuffer> videoBuffer;
1913                     mVideoBuffer->dequeueAccessUnit(&videoBuffer);
1914                     setAccessUnitProperties(videoBuffer, source, discard);
1915                     packetSource->queueAccessUnit(videoBuffer);
1916                     int64_t bufferTimeUs;
1917                     CHECK(videoBuffer->meta()->findInt64("timeUs", &bufferTimeUs));
1918                     FSLOGV(stream, "queueAccessUnit (saved), timeUs=%lld",
1919                             (long long)bufferTimeUs);
1920                 }
1921             } else if (stream == LiveSession::STREAMTYPE_METADATA && !mHasMetadata) {
1922                 mHasMetadata = true;
1923                 sp<AMessage> notify = mNotify->dup();
1924                 notify->setInt32("what", kWhatMetadataDetected);
1925                 notify->post();
1926             }
1927 
1928             setAccessUnitProperties(accessUnit, source);
1929             packetSource->queueAccessUnit(accessUnit);
1930             FSLOGV(stream, "queueAccessUnit, timeUs=%lld", (long long)timeUs);
1931         }
1932 
1933         if (err != OK) {
1934             break;
1935         }
1936     }
1937 
1938     if (err != OK) {
1939         for (size_t i = mPacketSources.size(); i > 0;) {
1940             i--;
1941             sp<AnotherPacketSource> packetSource = mPacketSources.valueAt(i);
1942             packetSource->clear();
1943         }
1944         return err;
1945     }
1946 
1947     if (!mStreamTypeMask) {
1948         // Signal gap is filled between original and new stream.
1949         FLOGV("reached stop point for all streams");
1950         return ERROR_OUT_OF_RANGE;
1951     }
1952 
1953     return OK;
1954 }
1955 
1956 /* static */
bufferStartsWithWebVTTMagicSequence(const sp<ABuffer> & buffer)1957 bool PlaylistFetcher::bufferStartsWithWebVTTMagicSequence(
1958         const sp<ABuffer> &buffer) {
1959     size_t pos = 0;
1960 
1961     // skip possible BOM
1962     if (buffer->size() >= pos + 3 &&
1963             !memcmp("\xef\xbb\xbf", buffer->data() + pos, 3)) {
1964         pos += 3;
1965     }
1966 
1967     // accept WEBVTT followed by SPACE, TAB or (CR) LF
1968     if (buffer->size() < pos + 6 ||
1969             memcmp("WEBVTT", buffer->data() + pos, 6)) {
1970         return false;
1971     }
1972     pos += 6;
1973 
1974     if (buffer->size() == pos) {
1975         return true;
1976     }
1977 
1978     uint8_t sep = buffer->data()[pos];
1979     return sep == ' ' || sep == '\t' || sep == '\n' || sep == '\r';
1980 }
1981 
extractAndQueueAccessUnits(const sp<ABuffer> & buffer,const sp<AMessage> & itemMeta)1982 status_t PlaylistFetcher::extractAndQueueAccessUnits(
1983         const sp<ABuffer> &buffer, const sp<AMessage> &itemMeta) {
1984     if (bufferStartsWithWebVTTMagicSequence(buffer)) {
1985         if (mStreamTypeMask != LiveSession::STREAMTYPE_SUBTITLES) {
1986             ALOGE("This stream only contains subtitles.");
1987             return ERROR_MALFORMED;
1988         }
1989 
1990         const sp<AnotherPacketSource> packetSource =
1991             mPacketSources.valueFor(LiveSession::STREAMTYPE_SUBTITLES);
1992 
1993         int64_t durationUs;
1994         CHECK(itemMeta->findInt64("durationUs", &durationUs));
1995         buffer->meta()->setInt64("timeUs", getSegmentStartTimeUs(mSeqNumber));
1996         buffer->meta()->setInt64("durationUs", durationUs);
1997         buffer->meta()->setInt64("segmentStartTimeUs", getSegmentStartTimeUs(mSeqNumber));
1998         buffer->meta()->setInt32("discontinuitySeq", mDiscontinuitySeq);
1999         buffer->meta()->setInt32("subtitleGeneration", mSubtitleGeneration);
2000         packetSource->queueAccessUnit(buffer);
2001         return OK;
2002     }
2003 
2004     if (mNextPTSTimeUs >= 0LL) {
2005         mNextPTSTimeUs = -1LL;
2006     }
2007 
2008     // This better be an ISO 13818-7 (AAC) or ISO 13818-1 (MPEG) audio
2009     // stream prefixed by an ID3 tag.
2010 
2011     bool firstID3Tag = true;
2012     uint64_t PTS = 0;
2013 
2014     for (;;) {
2015         // Make sure to skip all ID3 tags preceding the audio data.
2016         // At least one must be present to provide the PTS timestamp.
2017 
2018         ID3 id3(buffer->data(), buffer->size(), true /* ignoreV1 */);
2019         if (!id3.isValid()) {
2020             if (firstID3Tag) {
2021                 ALOGE("Unable to parse ID3 tag.");
2022                 return ERROR_MALFORMED;
2023             } else {
2024                 break;
2025             }
2026         }
2027 
2028         if (firstID3Tag) {
2029             bool found = false;
2030 
2031             ID3::Iterator it(id3, "PRIV");
2032             while (!it.done()) {
2033                 size_t length;
2034                 const uint8_t *data = it.getData(&length);
2035                 if (!data) {
2036                     return ERROR_MALFORMED;
2037                 }
2038 
2039                 static const char *kMatchName =
2040                     "com.apple.streaming.transportStreamTimestamp";
2041                 static const size_t kMatchNameLen = strlen(kMatchName);
2042 
2043                 if (length == kMatchNameLen + 1 + 8
2044                         && !strncmp((const char *)data, kMatchName, kMatchNameLen)) {
2045                     found = true;
2046                     PTS = U64_AT(&data[kMatchNameLen + 1]);
2047                 }
2048 
2049                 it.next();
2050             }
2051 
2052             if (!found) {
2053                 ALOGE("Unable to extract transportStreamTimestamp from ID3 tag.");
2054                 return ERROR_MALFORMED;
2055             }
2056         }
2057 
2058         // skip the ID3 tag
2059         buffer->setRange(
2060                 buffer->offset() + id3.rawSize(), buffer->size() - id3.rawSize());
2061 
2062         firstID3Tag = false;
2063     }
2064 
2065     if (mStreamTypeMask != LiveSession::STREAMTYPE_AUDIO) {
2066         ALOGW("This stream only contains audio data!");
2067 
2068         mStreamTypeMask &= LiveSession::STREAMTYPE_AUDIO;
2069 
2070         if (mStreamTypeMask == 0) {
2071             return OK;
2072         }
2073     }
2074 
2075     sp<AnotherPacketSource> packetSource =
2076         mPacketSources.valueFor(LiveSession::STREAMTYPE_AUDIO);
2077 
2078     if (packetSource->getFormat() == NULL && buffer->size() >= 7) {
2079         ABitReader bits(buffer->data(), buffer->size());
2080 
2081         // adts_fixed_header
2082 
2083         CHECK_EQ(bits.getBits(12), 0xfffu);
2084         bits.skipBits(3);  // ID, layer
2085         bool protection_absent __unused = bits.getBits(1) != 0;
2086 
2087         unsigned profile = bits.getBits(2);
2088         CHECK_NE(profile, 3u);
2089         unsigned sampling_freq_index = bits.getBits(4);
2090         bits.getBits(1);  // private_bit
2091         unsigned channel_configuration = bits.getBits(3);
2092         CHECK_NE(channel_configuration, 0u);
2093         bits.skipBits(2);  // original_copy, home
2094 
2095         sp<MetaData> meta = new MetaData();
2096         MakeAACCodecSpecificData(*meta,
2097                 profile, sampling_freq_index, channel_configuration);
2098 
2099         meta->setInt32(kKeyIsADTS, true);
2100 
2101         packetSource->setFormat(meta);
2102     }
2103 
2104     int64_t numSamples = 0LL;
2105     int32_t sampleRate;
2106     CHECK(packetSource->getFormat()->findInt32(kKeySampleRate, &sampleRate));
2107 
2108     int64_t timeUs = (PTS * 100LL) / 9LL;
2109     if (mStartup && !mFirstPTSValid) {
2110         mFirstPTSValid = true;
2111         mFirstTimeUs = timeUs;
2112     }
2113 
2114     if (mSegmentFirstPTS < 0LL) {
2115         mSegmentFirstPTS = timeUs;
2116         if (!mStartTimeUsRelative) {
2117             // Duplicated logic from how we handle .ts playlists.
2118             if (mStartup && mSegmentStartTimeUs >= 0
2119                     && adjustSeqNumberWithAnchorTime(timeUs)) {
2120                 mSegmentStartTimeUs = -1;
2121                 return -EAGAIN;
2122             }
2123         }
2124     }
2125 
2126     sp<HlsSampleDecryptor> sampleDecryptor = NULL;
2127     if (mSampleAesKeyItem != NULL) {
2128         ALOGV("extractAndQueueAccessUnits[%d] SampleAesKeyItem: Key: %s  IV: %s",
2129                 mSeqNumber,
2130                 HlsSampleDecryptor::aesBlockToStr(mKeyData).c_str(),
2131                 HlsSampleDecryptor::aesBlockToStr(mAESInitVec).c_str());
2132 
2133         sampleDecryptor = new HlsSampleDecryptor(mSampleAesKeyItem);
2134     }
2135 
2136     int frameId = 0;
2137 
2138     size_t offset = 0;
2139     while (offset < buffer->size()) {
2140         const uint8_t *adtsHeader = buffer->data() + offset;
2141         if (buffer->size() <= offset+5) {
2142             ALOGV("buffer does not contain a complete header");
2143             return ERROR_MALFORMED;
2144         }
2145         // non-const pointer for decryption if needed
2146         uint8_t *adtsFrame = buffer->data() + offset;
2147 
2148         unsigned aac_frame_length =
2149             ((adtsHeader[3] & 3) << 11)
2150             | (adtsHeader[4] << 3)
2151             | (adtsHeader[5] >> 5);
2152 
2153         if (aac_frame_length == 0) {
2154             const uint8_t *id3Header = adtsHeader;
2155             if (!memcmp(id3Header, "ID3", 3)) {
2156                 ID3 id3(id3Header, buffer->size() - offset, true);
2157                 if (id3.isValid()) {
2158                     offset += id3.rawSize();
2159                     continue;
2160                 };
2161             }
2162             return ERROR_MALFORMED;
2163         }
2164 
2165         if (aac_frame_length > buffer->size() - offset) {
2166             return ERROR_MALFORMED;
2167         }
2168 
2169         int64_t unitTimeUs = timeUs + numSamples * 1000000LL / sampleRate;
2170         offset += aac_frame_length;
2171 
2172         // Each AAC frame encodes 1024 samples.
2173         numSamples += 1024;
2174 
2175         if (mStartup) {
2176             int64_t startTimeUs = unitTimeUs;
2177             if (mStartTimeUsRelative) {
2178                 startTimeUs -= mFirstTimeUs;
2179                 if (startTimeUs  < 0) {
2180                     startTimeUs = 0;
2181                 }
2182             }
2183             if (startTimeUs < mStartTimeUs) {
2184                 continue;
2185             }
2186 
2187             if (mStartTimeUsNotify != NULL) {
2188                 mStartTimeUsNotify->setInt32("streamMask", LiveSession::STREAMTYPE_AUDIO);
2189                 mStartup = false;
2190             }
2191         }
2192 
2193         if (mStopParams != NULL) {
2194             int32_t discontinuitySeq;
2195             int64_t stopTimeUs;
2196             if (!mStopParams->findInt32("discontinuitySeq", &discontinuitySeq)
2197                     || discontinuitySeq > mDiscontinuitySeq
2198                     || !mStopParams->findInt64("timeUsAudio", &stopTimeUs)
2199                     || (discontinuitySeq == mDiscontinuitySeq && unitTimeUs >= stopTimeUs)) {
2200                 mStreamTypeMask = 0;
2201                 mPacketSources.clear();
2202                 return ERROR_OUT_OF_RANGE;
2203             }
2204         }
2205 
2206         if (sampleDecryptor != NULL) {
2207             bool protection_absent = (adtsHeader[1] & 0x1);
2208             size_t headerSize = protection_absent ? 7 : 9;
2209             if (frameId == 0) {
2210                 ALOGV("extractAndQueueAAC[%d] protection_absent %d (%02x) headerSize %zu",
2211                         mSeqNumber, protection_absent, adtsHeader[1], headerSize);
2212             }
2213 
2214             sampleDecryptor->processAAC(headerSize, adtsFrame, aac_frame_length);
2215         }
2216         frameId++;
2217 
2218         sp<ABuffer> unit = new ABuffer(aac_frame_length);
2219         memcpy(unit->data(), adtsHeader, aac_frame_length);
2220 
2221         unit->meta()->setInt64("timeUs", unitTimeUs);
2222         setAccessUnitProperties(unit, packetSource);
2223         packetSource->queueAccessUnit(unit);
2224     }
2225 
2226     return OK;
2227 }
2228 
updateDuration()2229 void PlaylistFetcher::updateDuration() {
2230     int64_t durationUs = 0LL;
2231     for (size_t index = 0; index < mPlaylist->size(); ++index) {
2232         sp<AMessage> itemMeta;
2233         CHECK(mPlaylist->itemAt(
2234                     index, NULL /* uri */, &itemMeta));
2235 
2236         int64_t itemDurationUs;
2237         CHECK(itemMeta->findInt64("durationUs", &itemDurationUs));
2238 
2239         durationUs += itemDurationUs;
2240     }
2241 
2242     sp<AMessage> msg = mNotify->dup();
2243     msg->setInt32("what", kWhatDurationUpdate);
2244     msg->setInt64("durationUs", durationUs);
2245     msg->post();
2246 }
2247 
updateTargetDuration()2248 void PlaylistFetcher::updateTargetDuration() {
2249     sp<AMessage> msg = mNotify->dup();
2250     msg->setInt32("what", kWhatTargetDurationUpdate);
2251     msg->setInt64("targetDurationUs", mPlaylist->getTargetDuration());
2252     msg->post();
2253 }
2254 
2255 }  // namespace android
2256