1 /*
2  * Copyright (C) 2012 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 //#define LOG_NDEBUG 0
18 #define LOG_TAG "PlaylistFetcher"
19 #include <android-base/macros.h>
20 #include <utils/Log.h>
21 #include <utils/misc.h>
22 
23 #include "PlaylistFetcher.h"
24 #include "HTTPDownloader.h"
25 #include "LiveSession.h"
26 #include "M3UParser.h"
27 #include "include/ID3.h"
28 #include "mpeg2ts/AnotherPacketSource.h"
29 #include "mpeg2ts/HlsSampleDecryptor.h"
30 
31 #include <media/stagefright/foundation/ABitReader.h>
32 #include <media/stagefright/foundation/ABuffer.h>
33 #include <media/stagefright/foundation/ADebug.h>
34 #include <media/stagefright/foundation/ByteUtils.h>
35 #include <media/stagefright/foundation/MediaKeys.h>
36 #include <media/stagefright/foundation/avc_utils.h>
37 #include <media/stagefright/DataURISource.h>
38 #include <media/stagefright/MediaDefs.h>
39 #include <media/stagefright/MetaData.h>
40 #include <media/stagefright/MetaDataUtils.h>
41 #include <media/stagefright/Utils.h>
42 
43 #include <ctype.h>
44 #include <inttypes.h>
45 
46 #define FLOGV(fmt, ...) ALOGV("[fetcher-%d] " fmt, mFetcherID, ##__VA_ARGS__)
47 #define FSLOGV(stream, fmt, ...) ALOGV("[fetcher-%d] [%s] " fmt, mFetcherID, \
48          LiveSession::getNameForStream(stream), ##__VA_ARGS__)
49 
50 namespace android {
51 
52 // static
53 const int64_t PlaylistFetcher::kMinBufferedDurationUs = 30000000LL;
54 const int64_t PlaylistFetcher::kMaxMonitorDelayUs = 3000000LL;
55 // LCM of 188 (size of a TS packet) & 1k works well
56 const int32_t PlaylistFetcher::kDownloadBlockSize = 47 * 1024;
57 
58 struct PlaylistFetcher::DownloadState : public RefBase {
59     DownloadState();
60     void resetState();
61     bool hasSavedState() const;
62     void restoreState(
63             AString &uri,
64             sp<AMessage> &itemMeta,
65             sp<ABuffer> &buffer,
66             sp<ABuffer> &tsBuffer,
67             int32_t &firstSeqNumberInPlaylist,
68             int32_t &lastSeqNumberInPlaylist);
69     void saveState(
70             AString &uri,
71             sp<AMessage> &itemMeta,
72             sp<ABuffer> &buffer,
73             sp<ABuffer> &tsBuffer,
74             int32_t &firstSeqNumberInPlaylist,
75             int32_t &lastSeqNumberInPlaylist);
76 
77 private:
78     bool mHasSavedState;
79     AString mUri;
80     sp<AMessage> mItemMeta;
81     sp<ABuffer> mBuffer;
82     sp<ABuffer> mTsBuffer;
83     int32_t mFirstSeqNumberInPlaylist;
84     int32_t mLastSeqNumberInPlaylist;
85 };
86 
DownloadState()87 PlaylistFetcher::DownloadState::DownloadState() {
88     resetState();
89 }
90 
hasSavedState() const91 bool PlaylistFetcher::DownloadState::hasSavedState() const {
92     return mHasSavedState;
93 }
94 
resetState()95 void PlaylistFetcher::DownloadState::resetState() {
96     mHasSavedState = false;
97 
98     mUri.clear();
99     mItemMeta = NULL;
100     mBuffer = NULL;
101     mTsBuffer = NULL;
102     mFirstSeqNumberInPlaylist = 0;
103     mLastSeqNumberInPlaylist = 0;
104 }
105 
restoreState(AString & uri,sp<AMessage> & itemMeta,sp<ABuffer> & buffer,sp<ABuffer> & tsBuffer,int32_t & firstSeqNumberInPlaylist,int32_t & lastSeqNumberInPlaylist)106 void PlaylistFetcher::DownloadState::restoreState(
107         AString &uri,
108         sp<AMessage> &itemMeta,
109         sp<ABuffer> &buffer,
110         sp<ABuffer> &tsBuffer,
111         int32_t &firstSeqNumberInPlaylist,
112         int32_t &lastSeqNumberInPlaylist) {
113     if (!mHasSavedState) {
114         return;
115     }
116 
117     uri = mUri;
118     itemMeta = mItemMeta;
119     buffer = mBuffer;
120     tsBuffer = mTsBuffer;
121     firstSeqNumberInPlaylist = mFirstSeqNumberInPlaylist;
122     lastSeqNumberInPlaylist = mLastSeqNumberInPlaylist;
123 
124     resetState();
125 }
126 
saveState(AString & uri,sp<AMessage> & itemMeta,sp<ABuffer> & buffer,sp<ABuffer> & tsBuffer,int32_t & firstSeqNumberInPlaylist,int32_t & lastSeqNumberInPlaylist)127 void PlaylistFetcher::DownloadState::saveState(
128         AString &uri,
129         sp<AMessage> &itemMeta,
130         sp<ABuffer> &buffer,
131         sp<ABuffer> &tsBuffer,
132         int32_t &firstSeqNumberInPlaylist,
133         int32_t &lastSeqNumberInPlaylist) {
134     mHasSavedState = true;
135 
136     mUri = uri;
137     mItemMeta = itemMeta;
138     mBuffer = buffer;
139     mTsBuffer = tsBuffer;
140     mFirstSeqNumberInPlaylist = firstSeqNumberInPlaylist;
141     mLastSeqNumberInPlaylist = lastSeqNumberInPlaylist;
142 }
143 
PlaylistFetcher(const sp<AMessage> & notify,const sp<LiveSession> & session,const char * uri,int32_t id,int32_t subtitleGeneration)144 PlaylistFetcher::PlaylistFetcher(
145         const sp<AMessage> &notify,
146         const sp<LiveSession> &session,
147         const char *uri,
148         int32_t id,
149         int32_t subtitleGeneration)
150     : mNotify(notify),
151       mSession(session),
152       mURI(uri),
153       mFetcherID(id),
154       mStreamTypeMask(0),
155       mStartTimeUs(-1LL),
156       mSegmentStartTimeUs(-1LL),
157       mDiscontinuitySeq(-1LL),
158       mStartTimeUsRelative(false),
159       mLastPlaylistFetchTimeUs(-1LL),
160       mPlaylistTimeUs(-1LL),
161       mSeqNumber(-1),
162       mNumRetries(0),
163       mNumRetriesForMonitorQueue(0),
164       mStartup(true),
165       mIDRFound(false),
166       mSeekMode(LiveSession::kSeekModeExactPosition),
167       mTimeChangeSignaled(false),
168       mNextPTSTimeUs(-1LL),
169       mMonitorQueueGeneration(0),
170       mSubtitleGeneration(subtitleGeneration),
171       mLastDiscontinuitySeq(-1LL),
172       mRefreshState(INITIAL_MINIMUM_RELOAD_DELAY),
173       mFirstPTSValid(false),
174       mFirstTimeUs(-1LL),
175       mVideoBuffer(new AnotherPacketSource(NULL)),
176       mSampleAesKeyItemChanged(false),
177       mThresholdRatio(-1.0f),
178       mDownloadState(new DownloadState()),
179       mHasMetadata(false) {
180     memset(mPlaylistHash, 0, sizeof(mPlaylistHash));
181     mHTTPDownloader = mSession->getHTTPDownloader();
182 
183     memset(mKeyData, 0, sizeof(mKeyData));
184     memset(mAESInitVec, 0, sizeof(mAESInitVec));
185 }
186 
~PlaylistFetcher()187 PlaylistFetcher::~PlaylistFetcher() {
188 }
189 
getFetcherID() const190 int32_t PlaylistFetcher::getFetcherID() const {
191     return mFetcherID;
192 }
193 
getSegmentStartTimeUs(int32_t seqNumber) const194 int64_t PlaylistFetcher::getSegmentStartTimeUs(int32_t seqNumber) const {
195     CHECK(mPlaylist != NULL);
196 
197     int32_t firstSeqNumberInPlaylist, lastSeqNumberInPlaylist;
198     mPlaylist->getSeqNumberRange(
199             &firstSeqNumberInPlaylist, &lastSeqNumberInPlaylist);
200 
201     CHECK_GE(seqNumber, firstSeqNumberInPlaylist);
202     CHECK_LE(seqNumber, lastSeqNumberInPlaylist);
203 
204     int64_t segmentStartUs = 0LL;
205     for (int32_t index = 0;
206             index < seqNumber - firstSeqNumberInPlaylist; ++index) {
207         sp<AMessage> itemMeta;
208         CHECK(mPlaylist->itemAt(
209                     index, NULL /* uri */, &itemMeta));
210 
211         int64_t itemDurationUs;
212         CHECK(itemMeta->findInt64("durationUs", &itemDurationUs));
213 
214         segmentStartUs += itemDurationUs;
215     }
216 
217     return segmentStartUs;
218 }
219 
getSegmentDurationUs(int32_t seqNumber) const220 int64_t PlaylistFetcher::getSegmentDurationUs(int32_t seqNumber) const {
221     CHECK(mPlaylist != NULL);
222 
223     int32_t firstSeqNumberInPlaylist, lastSeqNumberInPlaylist;
224     mPlaylist->getSeqNumberRange(
225             &firstSeqNumberInPlaylist, &lastSeqNumberInPlaylist);
226 
227     CHECK_GE(seqNumber, firstSeqNumberInPlaylist);
228     CHECK_LE(seqNumber, lastSeqNumberInPlaylist);
229 
230     int32_t index = seqNumber - firstSeqNumberInPlaylist;
231     sp<AMessage> itemMeta;
232     CHECK(mPlaylist->itemAt(
233                 index, NULL /* uri */, &itemMeta));
234 
235     int64_t itemDurationUs;
236     CHECK(itemMeta->findInt64("durationUs", &itemDurationUs));
237 
238     return itemDurationUs;
239 }
240 
delayUsToRefreshPlaylist() const241 int64_t PlaylistFetcher::delayUsToRefreshPlaylist() const {
242     int64_t nowUs = ALooper::GetNowUs();
243 
244     if (mPlaylist == NULL || mLastPlaylistFetchTimeUs < 0LL) {
245         CHECK_EQ((int)mRefreshState, (int)INITIAL_MINIMUM_RELOAD_DELAY);
246         return 0LL;
247     }
248 
249     if (mPlaylist->isComplete()) {
250         return (~0LLU >> 1);
251     }
252 
253     int64_t targetDurationUs = mPlaylist->getTargetDuration();
254 
255     int64_t minPlaylistAgeUs;
256 
257     switch (mRefreshState) {
258         case INITIAL_MINIMUM_RELOAD_DELAY:
259         {
260             size_t n = mPlaylist->size();
261             if (n > 0) {
262                 sp<AMessage> itemMeta;
263                 CHECK(mPlaylist->itemAt(n - 1, NULL /* uri */, &itemMeta));
264 
265                 int64_t itemDurationUs;
266                 CHECK(itemMeta->findInt64("durationUs", &itemDurationUs));
267 
268                 minPlaylistAgeUs = itemDurationUs;
269                 break;
270             }
271 
272             FALLTHROUGH_INTENDED;
273         }
274 
275         case FIRST_UNCHANGED_RELOAD_ATTEMPT:
276         {
277             minPlaylistAgeUs = targetDurationUs / 2;
278             break;
279         }
280 
281         case SECOND_UNCHANGED_RELOAD_ATTEMPT:
282         {
283             minPlaylistAgeUs = (targetDurationUs * 3) / 2;
284             break;
285         }
286 
287         case THIRD_UNCHANGED_RELOAD_ATTEMPT:
288         {
289             minPlaylistAgeUs = targetDurationUs * 3;
290             break;
291         }
292 
293         default:
294             TRESPASS();
295             break;
296     }
297 
298     int64_t delayUs = mLastPlaylistFetchTimeUs + minPlaylistAgeUs - nowUs;
299     return delayUs > 0LL ? delayUs : 0LL;
300 }
301 
decryptBuffer(size_t playlistIndex,const sp<ABuffer> & buffer,bool first)302 status_t PlaylistFetcher::decryptBuffer(
303         size_t playlistIndex, const sp<ABuffer> &buffer,
304         bool first) {
305     sp<AMessage> itemMeta;
306     bool found = false;
307     AString method;
308 
309     for (ssize_t i = playlistIndex; i >= 0; --i) {
310         AString uri;
311         CHECK(mPlaylist->itemAt(i, &uri, &itemMeta));
312 
313         if (itemMeta->findString("cipher-method", &method)) {
314             found = true;
315             break;
316         }
317     }
318 
319     // TODO: Revise this when we add support for KEYFORMAT
320     // If method has changed (e.g., -> NONE); sufficient to check at the segment boundary
321     if (mSampleAesKeyItem != NULL && first && found && method != "SAMPLE-AES") {
322         ALOGI("decryptBuffer: resetting mSampleAesKeyItem(%p) with method %s",
323                 mSampleAesKeyItem.get(), method.c_str());
324         mSampleAesKeyItem = NULL;
325         mSampleAesKeyItemChanged = true;
326     }
327 
328     if (!found) {
329         method = "NONE";
330     }
331     buffer->meta()->setString("cipher-method", method.c_str());
332 
333     if (method == "NONE") {
334         return OK;
335     } else if (method == "SAMPLE-AES") {
336         ALOGV("decryptBuffer: Non-Widevine SAMPLE-AES is supported now.");
337     } else if (!(method == "AES-128")) {
338         ALOGE("Unsupported cipher method '%s'", method.c_str());
339         return ERROR_UNSUPPORTED;
340     }
341 
342     AString keyURI;
343     if (!itemMeta->findString("cipher-uri", &keyURI)) {
344         ALOGE("Missing key uri");
345         return ERROR_MALFORMED;
346     }
347 
348     ssize_t index = mAESKeyForURI.indexOfKey(keyURI);
349 
350     sp<ABuffer> key;
351     if (index >= 0) {
352         key = mAESKeyForURI.valueAt(index);
353     } else if (keyURI.startsWith("data:")) {
354         sp<DataSource> keySrc = DataURISource::Create(keyURI.c_str());
355         off64_t keyLen;
356         if (keySrc == NULL || keySrc->getSize(&keyLen) != OK || keyLen < 0) {
357             ALOGE("Malformed cipher key data uri.");
358             return ERROR_MALFORMED;
359         }
360         key = new ABuffer(keyLen);
361         keySrc->readAt(0, key->data(), keyLen);
362         key->setRange(0, keyLen);
363     } else {
364         ssize_t err = mHTTPDownloader->fetchFile(keyURI.c_str(), &key);
365 
366         if (err == ERROR_NOT_CONNECTED) {
367             return ERROR_NOT_CONNECTED;
368         } else if (err < 0) {
369             ALOGE("failed to fetch cipher key from '%s'.", uriDebugString(keyURI).c_str());
370             return ERROR_IO;
371         } else if (key->size() != 16) {
372             ALOGE("key file '%s' wasn't 16 bytes in size.", uriDebugString(keyURI).c_str());
373             return ERROR_MALFORMED;
374         }
375 
376         mAESKeyForURI.add(keyURI, key);
377     }
378 
379     if (first) {
380         // If decrypting the first block in a file, read the iv from the manifest
381         // or derive the iv from the file's sequence number.
382 
383         unsigned char AESInitVec[AES_BLOCK_SIZE];
384         AString iv;
385         if (itemMeta->findString("cipher-iv", &iv)) {
386             if ((!iv.startsWith("0x") && !iv.startsWith("0X"))
387                     || iv.size() > 16 * 2 + 2) {
388                 ALOGE("malformed cipher IV '%s'.", iv.c_str());
389                 return ERROR_MALFORMED;
390             }
391 
392             while (iv.size() < 16 * 2 + 2) {
393                 iv.insert("0", 1, 2);
394             }
395 
396             memset(AESInitVec, 0, sizeof(AESInitVec));
397             for (size_t i = 0; i < 16; ++i) {
398                 char c1 = tolower(iv.c_str()[2 + 2 * i]);
399                 char c2 = tolower(iv.c_str()[3 + 2 * i]);
400                 if (!isxdigit(c1) || !isxdigit(c2)) {
401                     ALOGE("malformed cipher IV '%s'.", iv.c_str());
402                     return ERROR_MALFORMED;
403                 }
404                 uint8_t nibble1 = isdigit(c1) ? c1 - '0' : c1 - 'a' + 10;
405                 uint8_t nibble2 = isdigit(c2) ? c2 - '0' : c2 - 'a' + 10;
406 
407                 AESInitVec[i] = nibble1 << 4 | nibble2;
408             }
409         } else {
410             memset(AESInitVec, 0, sizeof(AESInitVec));
411             AESInitVec[15] = mSeqNumber & 0xff;
412             AESInitVec[14] = (mSeqNumber >> 8) & 0xff;
413             AESInitVec[13] = (mSeqNumber >> 16) & 0xff;
414             AESInitVec[12] = (mSeqNumber >> 24) & 0xff;
415         }
416 
417         bool newKey = memcmp(mKeyData, key->data(), AES_BLOCK_SIZE) != 0;
418         bool newInitVec = memcmp(mAESInitVec, AESInitVec, AES_BLOCK_SIZE) != 0;
419         bool newSampleAesKeyItem = newKey || newInitVec;
420         ALOGV("decryptBuffer: SAMPLE-AES newKeyItem %d/%d (Key %d initVec %d)",
421                 mSampleAesKeyItemChanged, newSampleAesKeyItem, newKey, newInitVec);
422 
423         if (newSampleAesKeyItem) {
424             memcpy(mKeyData, key->data(), AES_BLOCK_SIZE);
425             memcpy(mAESInitVec, AESInitVec, AES_BLOCK_SIZE);
426 
427             if (method == "SAMPLE-AES") {
428                 mSampleAesKeyItemChanged = true;
429 
430                 sp<ABuffer> keyDataBuffer = ABuffer::CreateAsCopy(mKeyData, sizeof(mKeyData));
431                 sp<ABuffer> initVecBuffer = ABuffer::CreateAsCopy(mAESInitVec, sizeof(mAESInitVec));
432 
433                 // always allocating a new one rather than updating the old message
434                 // lower layer might still have a reference to the old message
435                 mSampleAesKeyItem = new AMessage();
436                 mSampleAesKeyItem->setBuffer("keyData", keyDataBuffer);
437                 mSampleAesKeyItem->setBuffer("initVec", initVecBuffer);
438 
439                 ALOGV("decryptBuffer: New SampleAesKeyItem: Key: %s  IV: %s",
440                         HlsSampleDecryptor::aesBlockToStr(mKeyData).c_str(),
441                         HlsSampleDecryptor::aesBlockToStr(mAESInitVec).c_str());
442             } // SAMPLE-AES
443         } // newSampleAesKeyItem
444     } // first
445 
446     if (method == "SAMPLE-AES") {
447         ALOGV("decryptBuffer: skipping full-seg decrypt for SAMPLE-AES");
448         return OK;
449     }
450 
451 
452     AES_KEY aes_key;
453     if (AES_set_decrypt_key(key->data(), 128, &aes_key) != 0) {
454         ALOGE("failed to set AES decryption key.");
455         return UNKNOWN_ERROR;
456     }
457 
458     size_t n = buffer->size();
459     if (!n) {
460         return OK;
461     }
462 
463     if (n < 16 || n % 16) {
464         ALOGE("not enough or trailing bytes (%zu) in encrypted buffer", n);
465         return ERROR_MALFORMED;
466     }
467 
468     AES_cbc_encrypt(
469             buffer->data(), buffer->data(), buffer->size(),
470             &aes_key, mAESInitVec, AES_DECRYPT);
471 
472     return OK;
473 }
474 
checkDecryptPadding(const sp<ABuffer> & buffer)475 status_t PlaylistFetcher::checkDecryptPadding(const sp<ABuffer> &buffer) {
476     AString method;
477     CHECK(buffer->meta()->findString("cipher-method", &method));
478     if (method == "NONE" || method == "SAMPLE-AES") {
479         return OK;
480     }
481 
482     uint8_t padding = 0;
483     if (buffer->size() > 0) {
484         padding = buffer->data()[buffer->size() - 1];
485     }
486 
487     if (padding > 16) {
488         return ERROR_MALFORMED;
489     }
490 
491     for (size_t i = buffer->size() - padding; i < padding; i++) {
492         if (buffer->data()[i] != padding) {
493             return ERROR_MALFORMED;
494         }
495     }
496 
497     buffer->setRange(buffer->offset(), buffer->size() - padding);
498     return OK;
499 }
500 
postMonitorQueue(int64_t delayUs,int64_t minDelayUs)501 void PlaylistFetcher::postMonitorQueue(int64_t delayUs, int64_t minDelayUs) {
502     int64_t maxDelayUs = delayUsToRefreshPlaylist();
503     if (maxDelayUs < minDelayUs) {
504         maxDelayUs = minDelayUs;
505     }
506     if (delayUs > maxDelayUs) {
507         FLOGV("Need to refresh playlist in %lld", (long long)maxDelayUs);
508         delayUs = maxDelayUs;
509     }
510     sp<AMessage> msg = new AMessage(kWhatMonitorQueue, this);
511     msg->setInt32("generation", mMonitorQueueGeneration);
512     msg->post(delayUs);
513 }
514 
cancelMonitorQueue()515 void PlaylistFetcher::cancelMonitorQueue() {
516     ++mMonitorQueueGeneration;
517 }
518 
setStoppingThreshold(float thresholdRatio,bool disconnect)519 void PlaylistFetcher::setStoppingThreshold(float thresholdRatio, bool disconnect) {
520     {
521         AutoMutex _l(mThresholdLock);
522         mThresholdRatio = thresholdRatio;
523     }
524     if (disconnect) {
525         mHTTPDownloader->disconnect();
526     }
527 }
528 
resetStoppingThreshold(bool disconnect)529 void PlaylistFetcher::resetStoppingThreshold(bool disconnect) {
530     {
531         AutoMutex _l(mThresholdLock);
532         mThresholdRatio = -1.0f;
533     }
534     if (disconnect) {
535         mHTTPDownloader->disconnect();
536     } else {
537         // allow reconnect
538         mHTTPDownloader->reconnect();
539     }
540 }
541 
getStoppingThreshold()542 float PlaylistFetcher::getStoppingThreshold() {
543     AutoMutex _l(mThresholdLock);
544     return mThresholdRatio;
545 }
546 
startAsync(const sp<AnotherPacketSource> & audioSource,const sp<AnotherPacketSource> & videoSource,const sp<AnotherPacketSource> & subtitleSource,const sp<AnotherPacketSource> & metadataSource,int64_t startTimeUs,int64_t segmentStartTimeUs,int32_t startDiscontinuitySeq,LiveSession::SeekMode seekMode)547 void PlaylistFetcher::startAsync(
548         const sp<AnotherPacketSource> &audioSource,
549         const sp<AnotherPacketSource> &videoSource,
550         const sp<AnotherPacketSource> &subtitleSource,
551         const sp<AnotherPacketSource> &metadataSource,
552         int64_t startTimeUs,
553         int64_t segmentStartTimeUs,
554         int32_t startDiscontinuitySeq,
555         LiveSession::SeekMode seekMode) {
556     sp<AMessage> msg = new AMessage(kWhatStart, this);
557 
558     uint32_t streamTypeMask = 0ul;
559 
560     if (audioSource != NULL) {
561         msg->setPointer("audioSource", audioSource.get());
562         streamTypeMask |= LiveSession::STREAMTYPE_AUDIO;
563     }
564 
565     if (videoSource != NULL) {
566         msg->setPointer("videoSource", videoSource.get());
567         streamTypeMask |= LiveSession::STREAMTYPE_VIDEO;
568     }
569 
570     if (subtitleSource != NULL) {
571         msg->setPointer("subtitleSource", subtitleSource.get());
572         streamTypeMask |= LiveSession::STREAMTYPE_SUBTITLES;
573     }
574 
575     if (metadataSource != NULL) {
576         msg->setPointer("metadataSource", metadataSource.get());
577         // metadataSource does not affect streamTypeMask.
578     }
579 
580     msg->setInt32("streamTypeMask", streamTypeMask);
581     msg->setInt64("startTimeUs", startTimeUs);
582     msg->setInt64("segmentStartTimeUs", segmentStartTimeUs);
583     msg->setInt32("startDiscontinuitySeq", startDiscontinuitySeq);
584     msg->setInt32("seekMode", seekMode);
585     msg->post();
586 }
587 
588 /*
589  * pauseAsync
590  *
591  * threshold: 0.0f - pause after current fetch block (default 47Kbytes)
592  *           -1.0f - pause after finishing current segment
593  *        0.0~1.0f - pause if remaining of current segment exceeds threshold
594  */
pauseAsync(float thresholdRatio,bool disconnect)595 void PlaylistFetcher::pauseAsync(
596         float thresholdRatio, bool disconnect) {
597     setStoppingThreshold(thresholdRatio, disconnect);
598 
599     (new AMessage(kWhatPause, this))->post();
600 }
601 
stopAsync(bool clear)602 void PlaylistFetcher::stopAsync(bool clear) {
603     setStoppingThreshold(0.0f, true /* disconncect */);
604 
605     sp<AMessage> msg = new AMessage(kWhatStop, this);
606     msg->setInt32("clear", clear);
607     msg->post();
608 }
609 
resumeUntilAsync(const sp<AMessage> & params)610 void PlaylistFetcher::resumeUntilAsync(const sp<AMessage> &params) {
611     FLOGV("resumeUntilAsync: params=%s", params->debugString().c_str());
612 
613     AMessage* msg = new AMessage(kWhatResumeUntil, this);
614     msg->setMessage("params", params);
615     msg->post();
616 }
617 
fetchPlaylistAsync()618 void PlaylistFetcher::fetchPlaylistAsync() {
619     (new AMessage(kWhatFetchPlaylist, this))->post();
620 }
621 
onMessageReceived(const sp<AMessage> & msg)622 void PlaylistFetcher::onMessageReceived(const sp<AMessage> &msg) {
623     switch (msg->what()) {
624         case kWhatStart:
625         {
626             status_t err = onStart(msg);
627 
628             sp<AMessage> notify = mNotify->dup();
629             notify->setInt32("what", kWhatStarted);
630             notify->setInt32("err", err);
631             notify->post();
632             break;
633         }
634 
635         case kWhatPause:
636         {
637             onPause();
638 
639             sp<AMessage> notify = mNotify->dup();
640             notify->setInt32("what", kWhatPaused);
641             notify->setInt32("seekMode",
642                     mDownloadState->hasSavedState()
643                     ? LiveSession::kSeekModeNextSample
644                     : LiveSession::kSeekModeNextSegment);
645             notify->post();
646             break;
647         }
648 
649         case kWhatStop:
650         {
651             onStop(msg);
652 
653             sp<AMessage> notify = mNotify->dup();
654             notify->setInt32("what", kWhatStopped);
655             notify->post();
656             break;
657         }
658 
659         case kWhatFetchPlaylist:
660         {
661             bool unchanged;
662             sp<M3UParser> playlist = mHTTPDownloader->fetchPlaylist(
663                     mURI.c_str(), NULL /* curPlaylistHash */, &unchanged);
664 
665             sp<AMessage> notify = mNotify->dup();
666             notify->setInt32("what", kWhatPlaylistFetched);
667             notify->setObject("playlist", playlist);
668             notify->post();
669             break;
670         }
671 
672         case kWhatMonitorQueue:
673         case kWhatDownloadNext:
674         {
675             int32_t generation;
676             CHECK(msg->findInt32("generation", &generation));
677 
678             if (generation != mMonitorQueueGeneration) {
679                 // Stale event
680                 break;
681             }
682 
683             if (msg->what() == kWhatMonitorQueue) {
684                 onMonitorQueue();
685             } else {
686                 onDownloadNext();
687             }
688             break;
689         }
690 
691         case kWhatResumeUntil:
692         {
693             onResumeUntil(msg);
694             break;
695         }
696 
697         default:
698             TRESPASS();
699     }
700 }
701 
onStart(const sp<AMessage> & msg)702 status_t PlaylistFetcher::onStart(const sp<AMessage> &msg) {
703     mPacketSources.clear();
704     mStopParams.clear();
705     mStartTimeUsNotify = mNotify->dup();
706     mStartTimeUsNotify->setInt32("what", kWhatStartedAt);
707     mStartTimeUsNotify->setString("uri", mURI);
708 
709     uint32_t streamTypeMask;
710     CHECK(msg->findInt32("streamTypeMask", (int32_t *)&streamTypeMask));
711 
712     int64_t startTimeUs;
713     int64_t segmentStartTimeUs;
714     int32_t startDiscontinuitySeq;
715     int32_t seekMode;
716     CHECK(msg->findInt64("startTimeUs", &startTimeUs));
717     CHECK(msg->findInt64("segmentStartTimeUs", &segmentStartTimeUs));
718     CHECK(msg->findInt32("startDiscontinuitySeq", &startDiscontinuitySeq));
719     CHECK(msg->findInt32("seekMode", &seekMode));
720 
721     if (streamTypeMask & LiveSession::STREAMTYPE_AUDIO) {
722         void *ptr;
723         CHECK(msg->findPointer("audioSource", &ptr));
724 
725         mPacketSources.add(
726                 LiveSession::STREAMTYPE_AUDIO,
727                 static_cast<AnotherPacketSource *>(ptr));
728     }
729 
730     if (streamTypeMask & LiveSession::STREAMTYPE_VIDEO) {
731         void *ptr;
732         CHECK(msg->findPointer("videoSource", &ptr));
733 
734         mPacketSources.add(
735                 LiveSession::STREAMTYPE_VIDEO,
736                 static_cast<AnotherPacketSource *>(ptr));
737     }
738 
739     if (streamTypeMask & LiveSession::STREAMTYPE_SUBTITLES) {
740         void *ptr;
741         CHECK(msg->findPointer("subtitleSource", &ptr));
742 
743         mPacketSources.add(
744                 LiveSession::STREAMTYPE_SUBTITLES,
745                 static_cast<AnotherPacketSource *>(ptr));
746     }
747 
748     void *ptr;
749     // metadataSource is not part of streamTypeMask
750     if ((streamTypeMask & (LiveSession::STREAMTYPE_AUDIO | LiveSession::STREAMTYPE_VIDEO))
751             && msg->findPointer("metadataSource", &ptr)) {
752         mPacketSources.add(
753                 LiveSession::STREAMTYPE_METADATA,
754                 static_cast<AnotherPacketSource *>(ptr));
755     }
756 
757     mStreamTypeMask = streamTypeMask;
758 
759     mSegmentStartTimeUs = segmentStartTimeUs;
760 
761     if (startDiscontinuitySeq >= 0) {
762         mDiscontinuitySeq = startDiscontinuitySeq;
763     }
764 
765     mRefreshState = INITIAL_MINIMUM_RELOAD_DELAY;
766     mSeekMode = (LiveSession::SeekMode) seekMode;
767 
768     if (startTimeUs >= 0 || mSeekMode == LiveSession::kSeekModeNextSample) {
769         mStartup = true;
770         mIDRFound = false;
771         mVideoBuffer->clear();
772     }
773 
774     if (startTimeUs >= 0) {
775         mStartTimeUs = startTimeUs;
776         mFirstPTSValid = false;
777         mSeqNumber = -1;
778         mTimeChangeSignaled = false;
779         mDownloadState->resetState();
780     }
781 
782     postMonitorQueue();
783 
784     return OK;
785 }
786 
onPause()787 void PlaylistFetcher::onPause() {
788     cancelMonitorQueue();
789     mLastDiscontinuitySeq = mDiscontinuitySeq;
790 
791     resetStoppingThreshold(false /* disconnect */);
792 }
793 
onStop(const sp<AMessage> & msg)794 void PlaylistFetcher::onStop(const sp<AMessage> &msg) {
795     cancelMonitorQueue();
796 
797     int32_t clear;
798     CHECK(msg->findInt32("clear", &clear));
799     if (clear) {
800         for (size_t i = 0; i < mPacketSources.size(); i++) {
801             sp<AnotherPacketSource> packetSource = mPacketSources.valueAt(i);
802             packetSource->clear();
803         }
804     }
805 
806     mDownloadState->resetState();
807     mPacketSources.clear();
808     mStreamTypeMask = 0;
809 
810     resetStoppingThreshold(true /* disconnect */);
811 }
812 
813 // Resume until we have reached the boundary timestamps listed in `msg`; when
814 // the remaining time is too short (within a resume threshold) stop immediately
815 // instead.
onResumeUntil(const sp<AMessage> & msg)816 status_t PlaylistFetcher::onResumeUntil(const sp<AMessage> &msg) {
817     sp<AMessage> params;
818     CHECK(msg->findMessage("params", &params));
819 
820     mStopParams = params;
821     onDownloadNext();
822 
823     return OK;
824 }
825 
notifyStopReached()826 void PlaylistFetcher::notifyStopReached() {
827     sp<AMessage> notify = mNotify->dup();
828     notify->setInt32("what", kWhatStopReached);
829     notify->post();
830 }
831 
notifyError(status_t err)832 void PlaylistFetcher::notifyError(status_t err) {
833     sp<AMessage> notify = mNotify->dup();
834     notify->setInt32("what", kWhatError);
835     notify->setInt32("err", err);
836     notify->post();
837 }
838 
queueDiscontinuity(ATSParser::DiscontinuityType type,const sp<AMessage> & extra)839 void PlaylistFetcher::queueDiscontinuity(
840         ATSParser::DiscontinuityType type, const sp<AMessage> &extra) {
841     for (size_t i = 0; i < mPacketSources.size(); ++i) {
842         // do not discard buffer upon #EXT-X-DISCONTINUITY tag
843         // (seek will discard buffer by abandoning old fetchers)
844         mPacketSources.valueAt(i)->queueDiscontinuity(
845                 type, extra, false /* discard */);
846     }
847 }
848 
onMonitorQueue()849 void PlaylistFetcher::onMonitorQueue() {
850     // in the middle of an unfinished download, delay
851     // playlist refresh as it'll change seq numbers
852     if (!mDownloadState->hasSavedState()) {
853         status_t err = refreshPlaylist();
854         if (err != OK) {
855             if (mNumRetriesForMonitorQueue < kMaxNumRetries) {
856                 ++mNumRetriesForMonitorQueue;
857             } else {
858                 notifyError(err);
859             }
860             return;
861         } else {
862             mNumRetriesForMonitorQueue = 0;
863         }
864     }
865 
866     int64_t targetDurationUs = kMinBufferedDurationUs;
867     if (mPlaylist != NULL) {
868         targetDurationUs = mPlaylist->getTargetDuration();
869     }
870 
871     int64_t bufferedDurationUs = 0LL;
872     status_t finalResult = OK;
873     if (mStreamTypeMask == LiveSession::STREAMTYPE_SUBTITLES) {
874         sp<AnotherPacketSource> packetSource =
875             mPacketSources.valueFor(LiveSession::STREAMTYPE_SUBTITLES);
876 
877         bufferedDurationUs =
878                 packetSource->getBufferedDurationUs(&finalResult);
879     } else {
880         // Use min stream duration, but ignore streams that never have any packet
881         // enqueued to prevent us from waiting on a non-existent stream;
882         // when we cannot make out from the manifest what streams are included in
883         // a playlist we might assume extra streams.
884         bufferedDurationUs = -1LL;
885         for (size_t i = 0; i < mPacketSources.size(); ++i) {
886             if ((mStreamTypeMask & mPacketSources.keyAt(i)) == 0
887                     || mPacketSources[i]->getLatestEnqueuedMeta() == NULL) {
888                 continue;
889             }
890 
891             int64_t bufferedStreamDurationUs =
892                 mPacketSources.valueAt(i)->getBufferedDurationUs(&finalResult);
893 
894             FSLOGV(mPacketSources.keyAt(i), "buffered %lld", (long long)bufferedStreamDurationUs);
895 
896             if (bufferedDurationUs == -1LL
897                  || bufferedStreamDurationUs < bufferedDurationUs) {
898                 bufferedDurationUs = bufferedStreamDurationUs;
899             }
900         }
901         if (bufferedDurationUs == -1LL) {
902             bufferedDurationUs = 0LL;
903         }
904     }
905 
906     if (finalResult == OK && bufferedDurationUs < kMinBufferedDurationUs) {
907         FLOGV("monitoring, buffered=%lld < %lld",
908                 (long long)bufferedDurationUs, (long long)kMinBufferedDurationUs);
909 
910         // delay the next download slightly; hopefully this gives other concurrent fetchers
911         // a better chance to run.
912         // onDownloadNext();
913         sp<AMessage> msg = new AMessage(kWhatDownloadNext, this);
914         msg->setInt32("generation", mMonitorQueueGeneration);
915         msg->post(1000L);
916     } else {
917         // We'd like to maintain buffering above durationToBufferUs, so try
918         // again when buffer just about to go below durationToBufferUs
919         // (or after targetDurationUs / 2, whichever is smaller).
920         int64_t delayUs = bufferedDurationUs - kMinBufferedDurationUs + 1000000LL;
921         if (delayUs > targetDurationUs / 2) {
922             delayUs = targetDurationUs / 2;
923         }
924 
925         FLOGV("pausing for %lld, buffered=%lld > %lld",
926                 (long long)delayUs,
927                 (long long)bufferedDurationUs,
928                 (long long)kMinBufferedDurationUs);
929 
930         postMonitorQueue(delayUs);
931     }
932 }
933 
refreshPlaylist()934 status_t PlaylistFetcher::refreshPlaylist() {
935     if (delayUsToRefreshPlaylist() <= 0) {
936         bool unchanged;
937         sp<M3UParser> playlist = mHTTPDownloader->fetchPlaylist(
938                 mURI.c_str(), mPlaylistHash, &unchanged);
939 
940         if (playlist == NULL) {
941             if (unchanged) {
942                 // We succeeded in fetching the playlist, but it was
943                 // unchanged from the last time we tried.
944 
945                 if (mRefreshState != THIRD_UNCHANGED_RELOAD_ATTEMPT) {
946                     mRefreshState = (RefreshState)(mRefreshState + 1);
947                 }
948             } else {
949                 ALOGE("failed to load playlist at url '%s'", uriDebugString(mURI).c_str());
950                 return ERROR_IO;
951             }
952         } else {
953             mRefreshState = INITIAL_MINIMUM_RELOAD_DELAY;
954             mPlaylist = playlist;
955 
956             if (mPlaylist->isComplete() || mPlaylist->isEvent()) {
957                 updateDuration();
958             }
959             // Notify LiveSession to use target-duration based buffering level
960             // for up/down switch. Default LiveSession::kUpSwitchMark may not
961             // be reachable for live streams, as our max buffering amount is
962             // limited to 3 segments.
963             if (!mPlaylist->isComplete()) {
964                 updateTargetDuration();
965             }
966             mPlaylistTimeUs = ALooper::GetNowUs();
967         }
968 
969         mLastPlaylistFetchTimeUs = ALooper::GetNowUs();
970     }
971     return OK;
972 }
973 
974 // static
bufferStartsWithTsSyncByte(const sp<ABuffer> & buffer)975 bool PlaylistFetcher::bufferStartsWithTsSyncByte(const sp<ABuffer>& buffer) {
976     return buffer->size() > 0 && buffer->data()[0] == 0x47;
977 }
978 
shouldPauseDownload()979 bool PlaylistFetcher::shouldPauseDownload() {
980     if (mStreamTypeMask == LiveSession::STREAMTYPE_SUBTITLES) {
981         // doesn't apply to subtitles
982         return false;
983     }
984 
985     // Calculate threshold to abort current download
986     float thresholdRatio = getStoppingThreshold();
987 
988     if (thresholdRatio < 0.0f) {
989         // never abort
990         return false;
991     } else if (thresholdRatio == 0.0f) {
992         // immediately abort
993         return true;
994     }
995 
996     // now we have a positive thresholdUs, abort if remaining
997     // portion to download is over that threshold.
998     if (mSegmentFirstPTS < 0) {
999         // this means we haven't even find the first access unit,
1000         // abort now as we must be very far away from the end.
1001         return true;
1002     }
1003     int64_t lastEnqueueUs = mSegmentFirstPTS;
1004     for (size_t i = 0; i < mPacketSources.size(); ++i) {
1005         if ((mStreamTypeMask & mPacketSources.keyAt(i)) == 0) {
1006             continue;
1007         }
1008         sp<AMessage> meta = mPacketSources[i]->getLatestEnqueuedMeta();
1009         int32_t type;
1010         if (meta == NULL || meta->findInt32("discontinuity", &type)) {
1011             continue;
1012         }
1013         int64_t tmpUs;
1014         CHECK(meta->findInt64("timeUs", &tmpUs));
1015         if (tmpUs > lastEnqueueUs) {
1016             lastEnqueueUs = tmpUs;
1017         }
1018     }
1019     lastEnqueueUs -= mSegmentFirstPTS;
1020 
1021     int64_t targetDurationUs = mPlaylist->getTargetDuration();
1022     int64_t thresholdUs = thresholdRatio * targetDurationUs;
1023 
1024     FLOGV("%spausing now, thresholdUs %lld, remaining %lld",
1025             targetDurationUs - lastEnqueueUs > thresholdUs ? "" : "not ",
1026             (long long)thresholdUs,
1027             (long long)(targetDurationUs - lastEnqueueUs));
1028 
1029     if (targetDurationUs - lastEnqueueUs > thresholdUs) {
1030         return true;
1031     }
1032     return false;
1033 }
1034 
initSeqNumberForLiveStream(int32_t & firstSeqNumberInPlaylist,int32_t & lastSeqNumberInPlaylist)1035 void PlaylistFetcher::initSeqNumberForLiveStream(
1036         int32_t &firstSeqNumberInPlaylist,
1037         int32_t &lastSeqNumberInPlaylist) {
1038     // start at least 3 target durations from the end.
1039     int64_t timeFromEnd = 0;
1040     size_t index = mPlaylist->size();
1041     sp<AMessage> itemMeta;
1042     int64_t itemDurationUs;
1043     int32_t targetDuration;
1044     if (mPlaylist->meta() != NULL
1045             && mPlaylist->meta()->findInt32("target-duration", &targetDuration)) {
1046         do {
1047             --index;
1048             if (!mPlaylist->itemAt(index, NULL /* uri */, &itemMeta)
1049                     || !itemMeta->findInt64("durationUs", &itemDurationUs)) {
1050                 ALOGW("item or itemDurationUs missing");
1051                 mSeqNumber = lastSeqNumberInPlaylist - 3;
1052                 break;
1053             }
1054 
1055             timeFromEnd += itemDurationUs;
1056             mSeqNumber = firstSeqNumberInPlaylist + index;
1057         } while (timeFromEnd < targetDuration * 3E6 && index > 0);
1058     } else {
1059         ALOGW("target-duration missing");
1060         mSeqNumber = lastSeqNumberInPlaylist - 3;
1061     }
1062 
1063     if (mSeqNumber < firstSeqNumberInPlaylist) {
1064         mSeqNumber = firstSeqNumberInPlaylist;
1065     }
1066 }
1067 
initDownloadState(AString & uri,sp<AMessage> & itemMeta,int32_t & firstSeqNumberInPlaylist,int32_t & lastSeqNumberInPlaylist)1068 bool PlaylistFetcher::initDownloadState(
1069         AString &uri,
1070         sp<AMessage> &itemMeta,
1071         int32_t &firstSeqNumberInPlaylist,
1072         int32_t &lastSeqNumberInPlaylist) {
1073     status_t err = refreshPlaylist();
1074     firstSeqNumberInPlaylist = 0;
1075     lastSeqNumberInPlaylist = 0;
1076     bool discontinuity = false;
1077 
1078     if (mPlaylist != NULL) {
1079         mPlaylist->getSeqNumberRange(
1080                 &firstSeqNumberInPlaylist, &lastSeqNumberInPlaylist);
1081 
1082         if (mDiscontinuitySeq < 0) {
1083             mDiscontinuitySeq = mPlaylist->getDiscontinuitySeq();
1084         }
1085     }
1086 
1087     mSegmentFirstPTS = -1LL;
1088 
1089     if (mPlaylist != NULL && mSeqNumber < 0) {
1090         CHECK_GE(mStartTimeUs, 0LL);
1091 
1092         if (mSegmentStartTimeUs < 0) {
1093             if (!mPlaylist->isComplete() && !mPlaylist->isEvent()) {
1094                 // this is a live session
1095                 initSeqNumberForLiveStream(firstSeqNumberInPlaylist, lastSeqNumberInPlaylist);
1096             } else {
1097                 // When seeking mSegmentStartTimeUs is unavailable (< 0), we
1098                 // use mStartTimeUs (client supplied timestamp) to determine both start segment
1099                 // and relative position inside a segment
1100                 mSeqNumber = getSeqNumberForTime(mStartTimeUs);
1101                 mStartTimeUs -= getSegmentStartTimeUs(mSeqNumber);
1102             }
1103             mStartTimeUsRelative = true;
1104             FLOGV("Initial sequence number for time %lld is %d from (%d .. %d)",
1105                     (long long)mStartTimeUs, mSeqNumber, firstSeqNumberInPlaylist,
1106                     lastSeqNumberInPlaylist);
1107         } else {
1108             // When adapting or track switching, mSegmentStartTimeUs (relative
1109             // to media time 0) is used to determine the start segment; mStartTimeUs (absolute
1110             // timestamps coming from the media container) is used to determine the position
1111             // inside a segments.
1112             if (mStreamTypeMask != LiveSession::STREAMTYPE_SUBTITLES
1113                     && mSeekMode != LiveSession::kSeekModeNextSample) {
1114                 // avoid double fetch/decode
1115                 // Use (mSegmentStartTimeUs + 1/2 * targetDurationUs) to search
1116                 // for the starting segment in new variant.
1117                 // If the two variants' segments are aligned, this gives the
1118                 // next segment. If they're not aligned, this gives the segment
1119                 // that overlaps no more than 1/2 * targetDurationUs.
1120                 mSeqNumber = getSeqNumberForTime(mSegmentStartTimeUs
1121                         + mPlaylist->getTargetDuration() / 2);
1122             } else {
1123                 mSeqNumber = getSeqNumberForTime(mSegmentStartTimeUs);
1124             }
1125             ssize_t minSeq = getSeqNumberForDiscontinuity(mDiscontinuitySeq);
1126             if (mSeqNumber < minSeq) {
1127                 mSeqNumber = minSeq;
1128             }
1129 
1130             if (mSeqNumber < firstSeqNumberInPlaylist) {
1131                 mSeqNumber = firstSeqNumberInPlaylist;
1132             }
1133 
1134             if (mSeqNumber > lastSeqNumberInPlaylist) {
1135                 mSeqNumber = lastSeqNumberInPlaylist;
1136             }
1137             FLOGV("Initial sequence number is %d from (%d .. %d)",
1138                     mSeqNumber, firstSeqNumberInPlaylist,
1139                     lastSeqNumberInPlaylist);
1140         }
1141     }
1142 
1143     // if mPlaylist is NULL then err must be non-OK; but the other way around might not be true
1144     if (mSeqNumber < firstSeqNumberInPlaylist
1145             || mSeqNumber > lastSeqNumberInPlaylist
1146             || err != OK) {
1147         if ((err != OK || !mPlaylist->isComplete()) && mNumRetries < kMaxNumRetries) {
1148             ++mNumRetries;
1149 
1150             if (mSeqNumber > lastSeqNumberInPlaylist || err != OK) {
1151                 // make sure we reach this retry logic on refresh failures
1152                 // by adding an err != OK clause to all enclosing if's.
1153 
1154                 // refresh in increasing fraction (1/2, 1/3, ...) of the
1155                 // playlist's target duration or 3 seconds, whichever is less
1156                 int64_t delayUs = kMaxMonitorDelayUs;
1157                 if (mPlaylist != NULL) {
1158                     delayUs = mPlaylist->size() * mPlaylist->getTargetDuration()
1159                             / (1 + mNumRetries);
1160                 }
1161                 if (delayUs > kMaxMonitorDelayUs) {
1162                     delayUs = kMaxMonitorDelayUs;
1163                 }
1164                 FLOGV("sequence number high: %d from (%d .. %d), "
1165                       "monitor in %lld (retry=%d)",
1166                         mSeqNumber, firstSeqNumberInPlaylist,
1167                         lastSeqNumberInPlaylist, (long long)delayUs, mNumRetries);
1168                 postMonitorQueue(delayUs);
1169                 return false;
1170             }
1171 
1172             if (err != OK) {
1173                 notifyError(err);
1174                 return false;
1175             }
1176 
1177             // we've missed the boat, let's start 3 segments prior to the latest sequence
1178             // number available and signal a discontinuity.
1179 
1180             ALOGI("We've missed the boat, restarting playback."
1181                   "  mStartup=%d, was  looking for %d in %d-%d",
1182                     mStartup, mSeqNumber, firstSeqNumberInPlaylist,
1183                     lastSeqNumberInPlaylist);
1184             if (mStopParams != NULL) {
1185                 // we should have kept on fetching until we hit the boundaries in mStopParams,
1186                 // but since the segments we are supposed to fetch have already rolled off
1187                 // the playlist, i.e. we have already missed the boat, we inevitably have to
1188                 // skip.
1189                 notifyStopReached();
1190                 return false;
1191             }
1192             mSeqNumber = lastSeqNumberInPlaylist - 3;
1193             if (mSeqNumber < firstSeqNumberInPlaylist) {
1194                 mSeqNumber = firstSeqNumberInPlaylist;
1195             }
1196             discontinuity = true;
1197 
1198             // fall through
1199         } else {
1200             if (mPlaylist != NULL) {
1201                 if (mSeqNumber >= firstSeqNumberInPlaylist + (int32_t)mPlaylist->size()
1202                         && !mPlaylist->isComplete()) {
1203                     // Live playlists
1204                     ALOGW("sequence number %d not yet available", mSeqNumber);
1205                     postMonitorQueue(delayUsToRefreshPlaylist());
1206                     return false;
1207                 }
1208                 ALOGE("Cannot find sequence number %d in playlist "
1209                      "(contains %d - %d)",
1210                      mSeqNumber, firstSeqNumberInPlaylist,
1211                       firstSeqNumberInPlaylist + (int32_t)mPlaylist->size() - 1);
1212 
1213                 if (mTSParser != NULL) {
1214                     mTSParser->signalEOS(ERROR_END_OF_STREAM);
1215                     // Use an empty buffer; we don't have any new data, just want to extract
1216                     // potential new access units after flush.  Reset mSeqNumber to
1217                     // lastSeqNumberInPlaylist such that we set the correct access unit
1218                     // properties in extractAndQueueAccessUnitsFromTs.
1219                     sp<ABuffer> buffer = new ABuffer(0);
1220                     mSeqNumber = lastSeqNumberInPlaylist;
1221                     extractAndQueueAccessUnitsFromTs(buffer);
1222                 }
1223                 notifyError(ERROR_END_OF_STREAM);
1224             } else {
1225                 // It's possible that we were never able to download the playlist.
1226                 // In this case we should notify error, instead of EOS, as EOS during
1227                 // prepare means we succeeded in downloading everything.
1228                 ALOGE("Failed to download playlist!");
1229                 notifyError(ERROR_IO);
1230             }
1231 
1232             return false;
1233         }
1234     }
1235 
1236     mNumRetries = 0;
1237 
1238     CHECK(mPlaylist->itemAt(
1239                 mSeqNumber - firstSeqNumberInPlaylist,
1240                 &uri,
1241                 &itemMeta));
1242 
1243     CHECK(itemMeta->findInt32("discontinuity-sequence", &mDiscontinuitySeq));
1244 
1245     int32_t val;
1246     if (itemMeta->findInt32("discontinuity", &val) && val != 0) {
1247         discontinuity = true;
1248     } else if (mLastDiscontinuitySeq >= 0
1249             && mDiscontinuitySeq != mLastDiscontinuitySeq) {
1250         // Seek jumped to a new discontinuity sequence. We need to signal
1251         // a format change to decoder. Decoder needs to shutdown and be
1252         // created again if seamless format change is unsupported.
1253         FLOGV("saw discontinuity: mStartup %d, mLastDiscontinuitySeq %d, "
1254                 "mDiscontinuitySeq %d, mStartTimeUs %lld",
1255                 mStartup, mLastDiscontinuitySeq, mDiscontinuitySeq, (long long)mStartTimeUs);
1256         discontinuity = true;
1257     }
1258     mLastDiscontinuitySeq = -1;
1259 
1260     // decrypt a junk buffer to prefetch key; since a session uses only one http connection,
1261     // this avoids interleaved connections to the key and segment file.
1262     {
1263         sp<ABuffer> junk = new ABuffer(16);
1264         junk->setRange(0, 16);
1265         status_t err = decryptBuffer(mSeqNumber - firstSeqNumberInPlaylist, junk,
1266                 true /* first */);
1267         if (err == ERROR_NOT_CONNECTED) {
1268             return false;
1269         } else if (err != OK) {
1270             notifyError(err);
1271             return false;
1272         }
1273     }
1274 
1275     if ((mStartup && !mTimeChangeSignaled) || discontinuity) {
1276         // We need to signal a time discontinuity to ATSParser on the
1277         // first segment after start, or on a discontinuity segment.
1278         // Setting mNextPTSTimeUs informs extractAndQueueAccessUnitsXX()
1279         // to send the time discontinuity.
1280         if (mPlaylist->isComplete() || mPlaylist->isEvent()) {
1281             // If this was a live event this made no sense since
1282             // we don't have access to all the segment before the current
1283             // one.
1284             mNextPTSTimeUs = getSegmentStartTimeUs(mSeqNumber);
1285         }
1286 
1287         // Setting mTimeChangeSignaled to true, so that if start time
1288         // searching goes into 2nd segment (without a discontinuity),
1289         // we don't reset time again. It causes corruption when pending
1290         // data in ATSParser is cleared.
1291         mTimeChangeSignaled = true;
1292     }
1293 
1294     if (discontinuity) {
1295         ALOGI("queueing discontinuity (explicit=%d)", discontinuity);
1296 
1297         // Signal a format discontinuity to ATSParser to clear partial data
1298         // from previous streams. Not doing this causes bitstream corruption.
1299         if (mTSParser != NULL) {
1300             mTSParser.clear();
1301         }
1302 
1303         queueDiscontinuity(
1304                 ATSParser::DISCONTINUITY_FORMAT_ONLY,
1305                 NULL /* extra */);
1306 
1307         if (mStartup && mStartTimeUsRelative && mFirstPTSValid) {
1308             // This means we guessed mStartTimeUs to be in the previous
1309             // segment (likely very close to the end), but either video or
1310             // audio has not found start by the end of that segment.
1311             //
1312             // If this new segment is not a discontinuity, keep searching.
1313             //
1314             // If this new segment even got a discontinuity marker, just
1315             // set mStartTimeUs=0, and take all samples from now on.
1316             mStartTimeUs = 0;
1317             mFirstPTSValid = false;
1318             mIDRFound = false;
1319             mVideoBuffer->clear();
1320         }
1321     }
1322 
1323     FLOGV("fetching segment %d from (%d .. %d)",
1324             mSeqNumber, firstSeqNumberInPlaylist, lastSeqNumberInPlaylist);
1325     return true;
1326 }
1327 
onDownloadNext()1328 void PlaylistFetcher::onDownloadNext() {
1329     AString uri;
1330     sp<AMessage> itemMeta;
1331     sp<ABuffer> buffer;
1332     sp<ABuffer> tsBuffer;
1333     int32_t firstSeqNumberInPlaylist = 0;
1334     int32_t lastSeqNumberInPlaylist = 0;
1335     bool connectHTTP = true;
1336 
1337     if (mDownloadState->hasSavedState()) {
1338         mDownloadState->restoreState(
1339                 uri,
1340                 itemMeta,
1341                 buffer,
1342                 tsBuffer,
1343                 firstSeqNumberInPlaylist,
1344                 lastSeqNumberInPlaylist);
1345         connectHTTP = false;
1346         FLOGV("resuming: '%s'", uri.c_str());
1347     } else {
1348         if (!initDownloadState(
1349                 uri,
1350                 itemMeta,
1351                 firstSeqNumberInPlaylist,
1352                 lastSeqNumberInPlaylist)) {
1353             return;
1354         }
1355         FLOGV("fetching: '%s'", uri.c_str());
1356     }
1357 
1358     int64_t range_offset, range_length;
1359     if (!itemMeta->findInt64("range-offset", &range_offset)
1360             || !itemMeta->findInt64("range-length", &range_length)) {
1361         range_offset = 0;
1362         range_length = -1;
1363     }
1364 
1365     // block-wise download
1366     bool shouldPause = false;
1367     ssize_t bytesRead;
1368     do {
1369         int64_t startUs = ALooper::GetNowUs();
1370         bytesRead = mHTTPDownloader->fetchBlock(
1371                 uri.c_str(), &buffer, range_offset, range_length, kDownloadBlockSize,
1372                 NULL /* actualURL */, connectHTTP);
1373         int64_t delayUs = ALooper::GetNowUs() - startUs;
1374 
1375         if (bytesRead == ERROR_NOT_CONNECTED) {
1376             return;
1377         }
1378         if (bytesRead < 0) {
1379             status_t err = bytesRead;
1380             ALOGE("failed to fetch .ts segment at url '%s'", uriDebugString(uri).c_str());
1381             notifyError(err);
1382             return;
1383         }
1384 
1385         // add sample for bandwidth estimation, excluding samples from subtitles (as
1386         // its too small), or during startup/resumeUntil (when we could have more than
1387         // one connection open which affects bandwidth)
1388         if (!mStartup && mStopParams == NULL && bytesRead > 0
1389                 && (mStreamTypeMask
1390                         & (LiveSession::STREAMTYPE_AUDIO
1391                         | LiveSession::STREAMTYPE_VIDEO))) {
1392             mSession->addBandwidthMeasurement(bytesRead, delayUs);
1393             if (delayUs > 2000000LL) {
1394                 FLOGV("bytesRead %zd took %.2f seconds - abnormal bandwidth dip",
1395                         bytesRead, (double)delayUs / 1.0e6);
1396             }
1397         }
1398 
1399         connectHTTP = false;
1400 
1401         CHECK(buffer != NULL);
1402 
1403         size_t size = buffer->size();
1404         // Set decryption range.
1405         buffer->setRange(size - bytesRead, bytesRead);
1406         status_t err = decryptBuffer(mSeqNumber - firstSeqNumberInPlaylist, buffer,
1407                 buffer->offset() == 0 /* first */);
1408         // Unset decryption range.
1409         buffer->setRange(0, size);
1410 
1411         if (err != OK) {
1412             ALOGE("decryptBuffer failed w/ error %d", err);
1413 
1414             notifyError(err);
1415             return;
1416         }
1417 
1418         bool startUp = mStartup; // save current start up state
1419 
1420         err = OK;
1421         if (bufferStartsWithTsSyncByte(buffer)) {
1422             // Incremental extraction is only supported for MPEG2 transport streams.
1423             if (tsBuffer == NULL) {
1424                 tsBuffer = new ABuffer(buffer->data(), buffer->capacity());
1425                 tsBuffer->setRange(0, 0);
1426             } else if (tsBuffer->capacity() != buffer->capacity()) {
1427                 size_t tsOff = tsBuffer->offset(), tsSize = tsBuffer->size();
1428                 tsBuffer = new ABuffer(buffer->data(), buffer->capacity());
1429                 tsBuffer->setRange(tsOff, tsSize);
1430             }
1431             tsBuffer->setRange(tsBuffer->offset(), tsBuffer->size() + bytesRead);
1432             err = extractAndQueueAccessUnitsFromTs(tsBuffer);
1433         }
1434 
1435         if (err == -EAGAIN) {
1436             // starting sequence number too low/high
1437             mTSParser.clear();
1438             for (size_t i = 0; i < mPacketSources.size(); i++) {
1439                 sp<AnotherPacketSource> packetSource = mPacketSources.valueAt(i);
1440                 packetSource->clear();
1441             }
1442             postMonitorQueue();
1443             return;
1444         } else if (err == ERROR_OUT_OF_RANGE) {
1445             // reached stopping point
1446             notifyStopReached();
1447             return;
1448         } else if (err != OK) {
1449             notifyError(err);
1450             return;
1451         }
1452         // If we're switching, post start notification
1453         // this should only be posted when the last chunk is full processed by TSParser
1454         if (mSeekMode != LiveSession::kSeekModeExactPosition && startUp != mStartup) {
1455             CHECK(mStartTimeUsNotify != NULL);
1456             mStartTimeUsNotify->post();
1457             mStartTimeUsNotify.clear();
1458             shouldPause = true;
1459         }
1460         if (shouldPause || shouldPauseDownload()) {
1461             // save state and return if this is not the last chunk,
1462             // leaving the fetcher in paused state.
1463             if (bytesRead != 0) {
1464                 mDownloadState->saveState(
1465                         uri,
1466                         itemMeta,
1467                         buffer,
1468                         tsBuffer,
1469                         firstSeqNumberInPlaylist,
1470                         lastSeqNumberInPlaylist);
1471                 return;
1472             }
1473             shouldPause = true;
1474         }
1475     } while (bytesRead != 0);
1476 
1477     if (bufferStartsWithTsSyncByte(buffer)) {
1478         // If we don't see a stream in the program table after fetching a full ts segment
1479         // mark it as nonexistent.
1480         ATSParser::SourceType srcTypes[] =
1481                 { ATSParser::VIDEO, ATSParser::AUDIO };
1482         LiveSession::StreamType streamTypes[] =
1483                 { LiveSession::STREAMTYPE_VIDEO, LiveSession::STREAMTYPE_AUDIO };
1484         const size_t kNumTypes = NELEM(srcTypes);
1485 
1486         for (size_t i = 0; i < kNumTypes; i++) {
1487             ATSParser::SourceType srcType = srcTypes[i];
1488             LiveSession::StreamType streamType = streamTypes[i];
1489 
1490             sp<AnotherPacketSource> source =
1491                 static_cast<AnotherPacketSource *>(
1492                     mTSParser->getSource(srcType).get());
1493 
1494             if (!mTSParser->hasSource(srcType)) {
1495                 ALOGW("MPEG2 Transport stream does not contain %s data.",
1496                       srcType == ATSParser::VIDEO ? "video" : "audio");
1497 
1498                 mStreamTypeMask &= ~streamType;
1499                 mPacketSources.removeItem(streamType);
1500             }
1501         }
1502 
1503     }
1504 
1505     if (checkDecryptPadding(buffer) != OK) {
1506         ALOGE("Incorrect padding bytes after decryption.");
1507         notifyError(ERROR_MALFORMED);
1508         return;
1509     }
1510 
1511     if (tsBuffer != NULL) {
1512         AString method;
1513         CHECK(buffer->meta()->findString("cipher-method", &method));
1514         if ((tsBuffer->size() > 0 && method == "NONE")
1515                 || tsBuffer->size() > 16) {
1516             ALOGE("MPEG2 transport stream is not an even multiple of 188 "
1517                     "bytes in length.");
1518             notifyError(ERROR_MALFORMED);
1519             return;
1520         }
1521     }
1522 
1523     // bulk extract non-ts files
1524     bool startUp = mStartup;
1525     if (tsBuffer == NULL) {
1526         status_t err = extractAndQueueAccessUnits(buffer, itemMeta);
1527         if (err == -EAGAIN) {
1528             // starting sequence number too low/high
1529             postMonitorQueue();
1530             return;
1531         } else if (err == ERROR_OUT_OF_RANGE) {
1532             // reached stopping point
1533             notifyStopReached();
1534             return;
1535         } else if (err != OK) {
1536             notifyError(err);
1537             return;
1538         }
1539     }
1540 
1541     ++mSeqNumber;
1542 
1543     // if adapting, pause after found the next starting point
1544     if (mSeekMode != LiveSession::kSeekModeExactPosition && startUp != mStartup) {
1545         CHECK(mStartTimeUsNotify != NULL);
1546         mStartTimeUsNotify->post();
1547         mStartTimeUsNotify.clear();
1548         shouldPause = true;
1549     }
1550 
1551     if (!shouldPause) {
1552         postMonitorQueue();
1553     }
1554 }
1555 
1556 /*
1557  * returns true if we need to adjust mSeqNumber
1558  */
adjustSeqNumberWithAnchorTime(int64_t anchorTimeUs)1559 bool PlaylistFetcher::adjustSeqNumberWithAnchorTime(int64_t anchorTimeUs) {
1560     int32_t firstSeqNumberInPlaylist = mPlaylist->getFirstSeqNumber();
1561 
1562     int64_t minDiffUs, maxDiffUs;
1563     if (mSeekMode == LiveSession::kSeekModeNextSample) {
1564         // if the previous fetcher paused in the middle of a segment, we
1565         // want to start at a segment that overlaps the last sample
1566         minDiffUs = -mPlaylist->getTargetDuration();
1567         maxDiffUs = 0LL;
1568     } else {
1569         // if the previous fetcher paused at the end of a segment, ideally
1570         // we want to start at the segment that's roughly aligned with its
1571         // next segment, but if the two variants are not well aligned we
1572         // adjust the diff to within (-T/2, T/2)
1573         minDiffUs = -mPlaylist->getTargetDuration() / 2;
1574         maxDiffUs = mPlaylist->getTargetDuration() / 2;
1575     }
1576 
1577     int32_t oldSeqNumber = mSeqNumber;
1578     ssize_t index = mSeqNumber - firstSeqNumberInPlaylist;
1579 
1580     // adjust anchorTimeUs to within (minDiffUs, maxDiffUs) from mStartTimeUs
1581     int64_t diffUs = anchorTimeUs - mStartTimeUs;
1582     if (diffUs > maxDiffUs) {
1583         while (index > 0 && diffUs > maxDiffUs) {
1584             --index;
1585 
1586             sp<AMessage> itemMeta;
1587             CHECK(mPlaylist->itemAt(index, NULL /* uri */, &itemMeta));
1588 
1589             int64_t itemDurationUs;
1590             CHECK(itemMeta->findInt64("durationUs", &itemDurationUs));
1591 
1592             diffUs -= itemDurationUs;
1593         }
1594     } else if (diffUs < minDiffUs) {
1595         while (index + 1 < (ssize_t) mPlaylist->size()
1596                 && diffUs < minDiffUs) {
1597             ++index;
1598 
1599             sp<AMessage> itemMeta;
1600             CHECK(mPlaylist->itemAt(index, NULL /* uri */, &itemMeta));
1601 
1602             int64_t itemDurationUs;
1603             CHECK(itemMeta->findInt64("durationUs", &itemDurationUs));
1604 
1605             diffUs += itemDurationUs;
1606         }
1607     }
1608 
1609     mSeqNumber = firstSeqNumberInPlaylist + index;
1610 
1611     if (mSeqNumber != oldSeqNumber) {
1612         FLOGV("guessed wrong seg number: diff %lld out of [%lld, %lld]",
1613                 (long long) anchorTimeUs - mStartTimeUs,
1614                 (long long) minDiffUs,
1615                 (long long) maxDiffUs);
1616         return true;
1617     }
1618     return false;
1619 }
1620 
getSeqNumberForDiscontinuity(size_t discontinuitySeq) const1621 int32_t PlaylistFetcher::getSeqNumberForDiscontinuity(size_t discontinuitySeq) const {
1622     int32_t firstSeqNumberInPlaylist = mPlaylist->getFirstSeqNumber();
1623 
1624     size_t index = 0;
1625     while (index < mPlaylist->size()) {
1626         sp<AMessage> itemMeta;
1627         CHECK(mPlaylist->itemAt( index, NULL /* uri */, &itemMeta));
1628         size_t curDiscontinuitySeq;
1629         CHECK(itemMeta->findInt32("discontinuity-sequence", (int32_t *)&curDiscontinuitySeq));
1630         int32_t seqNumber = firstSeqNumberInPlaylist + index;
1631         if (curDiscontinuitySeq == discontinuitySeq) {
1632             return seqNumber;
1633         } else if (curDiscontinuitySeq > discontinuitySeq) {
1634             return seqNumber <= 0 ? 0 : seqNumber - 1;
1635         }
1636 
1637         ++index;
1638     }
1639 
1640     return firstSeqNumberInPlaylist + mPlaylist->size();
1641 }
1642 
getSeqNumberForTime(int64_t timeUs) const1643 int32_t PlaylistFetcher::getSeqNumberForTime(int64_t timeUs) const {
1644     size_t index = 0;
1645     int64_t segmentStartUs = 0;
1646     while (index < mPlaylist->size()) {
1647         sp<AMessage> itemMeta;
1648         CHECK(mPlaylist->itemAt(
1649                     index, NULL /* uri */, &itemMeta));
1650 
1651         int64_t itemDurationUs;
1652         CHECK(itemMeta->findInt64("durationUs", &itemDurationUs));
1653 
1654         if (timeUs < segmentStartUs + itemDurationUs) {
1655             break;
1656         }
1657 
1658         segmentStartUs += itemDurationUs;
1659         ++index;
1660     }
1661 
1662     if (index >= mPlaylist->size()) {
1663         index = mPlaylist->size() - 1;
1664     }
1665 
1666     return mPlaylist->getFirstSeqNumber() + index;
1667 }
1668 
setAccessUnitProperties(const sp<ABuffer> & accessUnit,const sp<AnotherPacketSource> & source,bool discard)1669 const sp<ABuffer> &PlaylistFetcher::setAccessUnitProperties(
1670         const sp<ABuffer> &accessUnit, const sp<AnotherPacketSource> &source, bool discard) {
1671     sp<MetaData> format = source->getFormat();
1672     if (format != NULL) {
1673         // for simplicity, store a reference to the format in each unit
1674         accessUnit->meta()->setObject("format", format);
1675     }
1676 
1677     if (discard) {
1678         accessUnit->meta()->setInt32("discard", discard);
1679     }
1680 
1681     accessUnit->meta()->setInt32("discontinuitySeq", mDiscontinuitySeq);
1682     accessUnit->meta()->setInt64("segmentStartTimeUs", getSegmentStartTimeUs(mSeqNumber));
1683     accessUnit->meta()->setInt64("segmentFirstTimeUs", mSegmentFirstPTS);
1684     accessUnit->meta()->setInt64("segmentDurationUs", getSegmentDurationUs(mSeqNumber));
1685     if (!mPlaylist->isComplete() && !mPlaylist->isEvent()) {
1686         accessUnit->meta()->setInt64("playlistTimeUs", mPlaylistTimeUs);
1687     }
1688     return accessUnit;
1689 }
1690 
isStartTimeReached(int64_t timeUs)1691 bool PlaylistFetcher::isStartTimeReached(int64_t timeUs) {
1692     if (!mFirstPTSValid) {
1693         mFirstTimeUs = timeUs;
1694         mFirstPTSValid = true;
1695     }
1696     bool startTimeReached = true;
1697     if (mStartTimeUsRelative) {
1698         FLOGV("startTimeUsRelative, timeUs (%lld) - %lld = %lld",
1699                 (long long)timeUs,
1700                 (long long)mFirstTimeUs,
1701                 (long long)(timeUs - mFirstTimeUs));
1702         timeUs -= mFirstTimeUs;
1703         if (timeUs < 0) {
1704             FLOGV("clamp negative timeUs to 0");
1705             timeUs = 0;
1706         }
1707         startTimeReached = (timeUs >= mStartTimeUs);
1708     }
1709     return startTimeReached;
1710 }
1711 
extractAndQueueAccessUnitsFromTs(const sp<ABuffer> & buffer)1712 status_t PlaylistFetcher::extractAndQueueAccessUnitsFromTs(const sp<ABuffer> &buffer) {
1713     if (mTSParser == NULL) {
1714         // Use TS_TIMESTAMPS_ARE_ABSOLUTE so pts carry over between fetchers.
1715         mTSParser = new ATSParser(ATSParser::TS_TIMESTAMPS_ARE_ABSOLUTE);
1716     }
1717 
1718     if (mNextPTSTimeUs >= 0LL) {
1719         sp<AMessage> extra = new AMessage;
1720         // Since we are using absolute timestamps, signal an offset of 0 to prevent
1721         // ATSParser from skewing the timestamps of access units.
1722         extra->setInt64(kATSParserKeyMediaTimeUs, 0);
1723 
1724         // When adapting, signal a recent media time to the parser,
1725         // so that PTS wrap around is handled for the new variant.
1726         if (mStartTimeUs >= 0 && !mStartTimeUsRelative) {
1727             extra->setInt64(kATSParserKeyRecentMediaTimeUs, mStartTimeUs);
1728         }
1729 
1730         mTSParser->signalDiscontinuity(
1731                 ATSParser::DISCONTINUITY_TIME, extra);
1732 
1733         mNextPTSTimeUs = -1LL;
1734     }
1735 
1736     if (mSampleAesKeyItemChanged) {
1737         mTSParser->signalNewSampleAesKey(mSampleAesKeyItem);
1738         mSampleAesKeyItemChanged = false;
1739     }
1740 
1741     size_t offset = 0;
1742     while (offset + 188 <= buffer->size()) {
1743         status_t err = mTSParser->feedTSPacket(buffer->data() + offset, 188);
1744 
1745         if (err != OK) {
1746             return err;
1747         }
1748 
1749         offset += 188;
1750     }
1751     // setRange to indicate consumed bytes.
1752     buffer->setRange(buffer->offset() + offset, buffer->size() - offset);
1753 
1754     if (mSegmentFirstPTS < 0LL) {
1755         // get the smallest first PTS from all streams present in this parser
1756         for (size_t i = mPacketSources.size(); i > 0;) {
1757             i--;
1758             const LiveSession::StreamType stream = mPacketSources.keyAt(i);
1759             if (stream == LiveSession::STREAMTYPE_SUBTITLES) {
1760                 ALOGE("MPEG2 Transport streams do not contain subtitles.");
1761                 return ERROR_MALFORMED;
1762             }
1763             if (stream == LiveSession::STREAMTYPE_METADATA) {
1764                 continue;
1765             }
1766             ATSParser::SourceType type =LiveSession::getSourceTypeForStream(stream);
1767             sp<AnotherPacketSource> source =
1768                 static_cast<AnotherPacketSource *>(
1769                         mTSParser->getSource(type).get());
1770 
1771             if (source == NULL) {
1772                 continue;
1773             }
1774             sp<AMessage> meta = source->getMetaAfterLastDequeued(0);
1775             if (meta != NULL) {
1776                 int64_t timeUs;
1777                 CHECK(meta->findInt64("timeUs", &timeUs));
1778                 if (mSegmentFirstPTS < 0LL || timeUs < mSegmentFirstPTS) {
1779                     mSegmentFirstPTS = timeUs;
1780                 }
1781             }
1782         }
1783         if (mSegmentFirstPTS < 0LL) {
1784             // didn't find any TS packet, can return early
1785             return OK;
1786         }
1787         if (!mStartTimeUsRelative) {
1788             // mStartup
1789             //   mStartup is true until we have queued a packet for all the streams
1790             //   we are fetching. We queue packets whose timestamps are greater than
1791             //   mStartTimeUs.
1792             // mSegmentStartTimeUs >= 0
1793             //   mSegmentStartTimeUs is non-negative when adapting or switching tracks
1794             // adjustSeqNumberWithAnchorTime(timeUs) == true
1795             //   we guessed a seq number that's either too large or too small.
1796             // If this happens, we'll adjust mSeqNumber and restart fetching from new
1797             // location. Note that we only want to adjust once, so set mSegmentStartTimeUs
1798             // to -1 so that we don't enter this chunk next time.
1799             if (mStartup && mSegmentStartTimeUs >= 0
1800                     && adjustSeqNumberWithAnchorTime(mSegmentFirstPTS)) {
1801                 mStartTimeUsNotify = mNotify->dup();
1802                 mStartTimeUsNotify->setInt32("what", kWhatStartedAt);
1803                 mStartTimeUsNotify->setString("uri", mURI);
1804                 mIDRFound = false;
1805                 mSegmentStartTimeUs = -1;
1806                 return -EAGAIN;
1807             }
1808         }
1809     }
1810 
1811     status_t err = OK;
1812     for (size_t i = mPacketSources.size(); i > 0;) {
1813         i--;
1814         sp<AnotherPacketSource> packetSource = mPacketSources.valueAt(i);
1815 
1816         const LiveSession::StreamType stream = mPacketSources.keyAt(i);
1817         if (stream == LiveSession::STREAMTYPE_SUBTITLES) {
1818             ALOGE("MPEG2 Transport streams do not contain subtitles.");
1819             return ERROR_MALFORMED;
1820         }
1821 
1822         const char *key = LiveSession::getKeyForStream(stream);
1823         ATSParser::SourceType type =LiveSession::getSourceTypeForStream(stream);
1824 
1825         sp<AnotherPacketSource> source =
1826             static_cast<AnotherPacketSource *>(
1827                     mTSParser->getSource(type).get());
1828 
1829         if (source == NULL) {
1830             continue;
1831         }
1832 
1833         const char *mime;
1834         sp<MetaData> format  = source->getFormat();
1835         bool isAvc = format != NULL && format->findCString(kKeyMIMEType, &mime)
1836                 && !strcasecmp(mime, MEDIA_MIMETYPE_VIDEO_AVC);
1837 
1838         sp<ABuffer> accessUnit;
1839         status_t finalResult;
1840         while (source->hasBufferAvailable(&finalResult)
1841                 && source->dequeueAccessUnit(&accessUnit) == OK) {
1842 
1843             int64_t timeUs;
1844             CHECK(accessUnit->meta()->findInt64("timeUs", &timeUs));
1845 
1846             if (mStartup) {
1847                 bool startTimeReached = isStartTimeReached(timeUs);
1848 
1849                 if (!startTimeReached || (isAvc && !mIDRFound)) {
1850                     // buffer up to the closest preceding IDR frame in the next segement,
1851                     // or the closest succeeding IDR frame after the exact position
1852                     FSLOGV(stream, "timeUs(%lld)-mStartTimeUs(%lld)=%lld, mIDRFound=%d",
1853                             (long long)timeUs,
1854                             (long long)mStartTimeUs,
1855                             (long long)timeUs - mStartTimeUs,
1856                             mIDRFound);
1857                     if (isAvc) {
1858                         if (IsIDR(accessUnit->data(), accessUnit->size())) {
1859                             mVideoBuffer->clear();
1860                             FSLOGV(stream, "found IDR, clear mVideoBuffer");
1861                             mIDRFound = true;
1862                         }
1863                         if (mIDRFound && mStartTimeUsRelative && !startTimeReached) {
1864                             mVideoBuffer->queueAccessUnit(accessUnit);
1865                             FSLOGV(stream, "saving AVC video AccessUnit");
1866                         }
1867                     }
1868                     if (!startTimeReached || (isAvc && !mIDRFound)) {
1869                         continue;
1870                     }
1871                 }
1872             }
1873 
1874             if (mStartTimeUsNotify != NULL) {
1875                 uint32_t streamMask = 0;
1876                 mStartTimeUsNotify->findInt32("streamMask", (int32_t *) &streamMask);
1877                 if ((mStreamTypeMask & mPacketSources.keyAt(i))
1878                         && !(streamMask & mPacketSources.keyAt(i))) {
1879                     streamMask |= mPacketSources.keyAt(i);
1880                     mStartTimeUsNotify->setInt32("streamMask", streamMask);
1881                     FSLOGV(stream, "found start point, timeUs=%lld, streamMask becomes %x",
1882                             (long long)timeUs, streamMask);
1883 
1884                     if (streamMask == mStreamTypeMask) {
1885                         FLOGV("found start point for all streams");
1886                         mStartup = false;
1887                     }
1888                 }
1889             }
1890 
1891             if (mStopParams != NULL) {
1892                 int32_t discontinuitySeq;
1893                 int64_t stopTimeUs;
1894                 if (!mStopParams->findInt32("discontinuitySeq", &discontinuitySeq)
1895                         || discontinuitySeq > mDiscontinuitySeq
1896                         || !mStopParams->findInt64(key, &stopTimeUs)
1897                         || (discontinuitySeq == mDiscontinuitySeq
1898                                 && timeUs >= stopTimeUs)) {
1899                     FSLOGV(stream, "reached stop point, timeUs=%lld", (long long)timeUs);
1900                     mStreamTypeMask &= ~stream;
1901                     mPacketSources.removeItemsAt(i);
1902                     break;
1903                 }
1904             }
1905 
1906             if (stream == LiveSession::STREAMTYPE_VIDEO) {
1907                 const bool discard = true;
1908                 status_t status;
1909                 while (mVideoBuffer->hasBufferAvailable(&status)) {
1910                     sp<ABuffer> videoBuffer;
1911                     mVideoBuffer->dequeueAccessUnit(&videoBuffer);
1912                     setAccessUnitProperties(videoBuffer, source, discard);
1913                     packetSource->queueAccessUnit(videoBuffer);
1914                     int64_t bufferTimeUs;
1915                     CHECK(videoBuffer->meta()->findInt64("timeUs", &bufferTimeUs));
1916                     FSLOGV(stream, "queueAccessUnit (saved), timeUs=%lld",
1917                             (long long)bufferTimeUs);
1918                 }
1919             } else if (stream == LiveSession::STREAMTYPE_METADATA && !mHasMetadata) {
1920                 mHasMetadata = true;
1921                 sp<AMessage> notify = mNotify->dup();
1922                 notify->setInt32("what", kWhatMetadataDetected);
1923                 notify->post();
1924             }
1925 
1926             setAccessUnitProperties(accessUnit, source);
1927             packetSource->queueAccessUnit(accessUnit);
1928             FSLOGV(stream, "queueAccessUnit, timeUs=%lld", (long long)timeUs);
1929         }
1930 
1931         if (err != OK) {
1932             break;
1933         }
1934     }
1935 
1936     if (err != OK) {
1937         for (size_t i = mPacketSources.size(); i > 0;) {
1938             i--;
1939             sp<AnotherPacketSource> packetSource = mPacketSources.valueAt(i);
1940             packetSource->clear();
1941         }
1942         return err;
1943     }
1944 
1945     if (!mStreamTypeMask) {
1946         // Signal gap is filled between original and new stream.
1947         FLOGV("reached stop point for all streams");
1948         return ERROR_OUT_OF_RANGE;
1949     }
1950 
1951     return OK;
1952 }
1953 
1954 /* static */
bufferStartsWithWebVTTMagicSequence(const sp<ABuffer> & buffer)1955 bool PlaylistFetcher::bufferStartsWithWebVTTMagicSequence(
1956         const sp<ABuffer> &buffer) {
1957     size_t pos = 0;
1958 
1959     // skip possible BOM
1960     if (buffer->size() >= pos + 3 &&
1961             !memcmp("\xef\xbb\xbf", buffer->data() + pos, 3)) {
1962         pos += 3;
1963     }
1964 
1965     // accept WEBVTT followed by SPACE, TAB or (CR) LF
1966     if (buffer->size() < pos + 6 ||
1967             memcmp("WEBVTT", buffer->data() + pos, 6)) {
1968         return false;
1969     }
1970     pos += 6;
1971 
1972     if (buffer->size() == pos) {
1973         return true;
1974     }
1975 
1976     uint8_t sep = buffer->data()[pos];
1977     return sep == ' ' || sep == '\t' || sep == '\n' || sep == '\r';
1978 }
1979 
extractAndQueueAccessUnits(const sp<ABuffer> & buffer,const sp<AMessage> & itemMeta)1980 status_t PlaylistFetcher::extractAndQueueAccessUnits(
1981         const sp<ABuffer> &buffer, const sp<AMessage> &itemMeta) {
1982     if (bufferStartsWithWebVTTMagicSequence(buffer)) {
1983         if (mStreamTypeMask != LiveSession::STREAMTYPE_SUBTITLES) {
1984             ALOGE("This stream only contains subtitles.");
1985             return ERROR_MALFORMED;
1986         }
1987 
1988         const sp<AnotherPacketSource> packetSource =
1989             mPacketSources.valueFor(LiveSession::STREAMTYPE_SUBTITLES);
1990 
1991         int64_t durationUs;
1992         CHECK(itemMeta->findInt64("durationUs", &durationUs));
1993         buffer->meta()->setInt64("timeUs", getSegmentStartTimeUs(mSeqNumber));
1994         buffer->meta()->setInt64("durationUs", durationUs);
1995         buffer->meta()->setInt64("segmentStartTimeUs", getSegmentStartTimeUs(mSeqNumber));
1996         buffer->meta()->setInt32("discontinuitySeq", mDiscontinuitySeq);
1997         buffer->meta()->setInt32("subtitleGeneration", mSubtitleGeneration);
1998         packetSource->queueAccessUnit(buffer);
1999         return OK;
2000     }
2001 
2002     if (mNextPTSTimeUs >= 0LL) {
2003         mNextPTSTimeUs = -1LL;
2004     }
2005 
2006     // This better be an ISO 13818-7 (AAC) or ISO 13818-1 (MPEG) audio
2007     // stream prefixed by an ID3 tag.
2008 
2009     bool firstID3Tag = true;
2010     uint64_t PTS = 0;
2011 
2012     for (;;) {
2013         // Make sure to skip all ID3 tags preceding the audio data.
2014         // At least one must be present to provide the PTS timestamp.
2015 
2016         ID3 id3(buffer->data(), buffer->size(), true /* ignoreV1 */);
2017         if (!id3.isValid()) {
2018             if (firstID3Tag) {
2019                 ALOGE("Unable to parse ID3 tag.");
2020                 return ERROR_MALFORMED;
2021             } else {
2022                 break;
2023             }
2024         }
2025 
2026         if (firstID3Tag) {
2027             bool found = false;
2028 
2029             ID3::Iterator it(id3, "PRIV");
2030             while (!it.done()) {
2031                 size_t length;
2032                 const uint8_t *data = it.getData(&length);
2033                 if (!data) {
2034                     return ERROR_MALFORMED;
2035                 }
2036 
2037                 static const char *kMatchName =
2038                     "com.apple.streaming.transportStreamTimestamp";
2039                 static const size_t kMatchNameLen = strlen(kMatchName);
2040 
2041                 if (length == kMatchNameLen + 1 + 8
2042                         && !strncmp((const char *)data, kMatchName, kMatchNameLen)) {
2043                     found = true;
2044                     PTS = U64_AT(&data[kMatchNameLen + 1]);
2045                 }
2046 
2047                 it.next();
2048             }
2049 
2050             if (!found) {
2051                 ALOGE("Unable to extract transportStreamTimestamp from ID3 tag.");
2052                 return ERROR_MALFORMED;
2053             }
2054         }
2055 
2056         // skip the ID3 tag
2057         buffer->setRange(
2058                 buffer->offset() + id3.rawSize(), buffer->size() - id3.rawSize());
2059 
2060         firstID3Tag = false;
2061     }
2062 
2063     if (mStreamTypeMask != LiveSession::STREAMTYPE_AUDIO) {
2064         ALOGW("This stream only contains audio data!");
2065 
2066         mStreamTypeMask &= LiveSession::STREAMTYPE_AUDIO;
2067 
2068         if (mStreamTypeMask == 0) {
2069             return OK;
2070         }
2071     }
2072 
2073     sp<AnotherPacketSource> packetSource =
2074         mPacketSources.valueFor(LiveSession::STREAMTYPE_AUDIO);
2075 
2076     if (packetSource->getFormat() == NULL && buffer->size() >= 7) {
2077         ABitReader bits(buffer->data(), buffer->size());
2078 
2079         // adts_fixed_header
2080 
2081         CHECK_EQ(bits.getBits(12), 0xfffu);
2082         bits.skipBits(3);  // ID, layer
2083         bool protection_absent __unused = bits.getBits(1) != 0;
2084 
2085         unsigned profile = bits.getBits(2);
2086         CHECK_NE(profile, 3u);
2087         unsigned sampling_freq_index = bits.getBits(4);
2088         bits.getBits(1);  // private_bit
2089         unsigned channel_configuration = bits.getBits(3);
2090         CHECK_NE(channel_configuration, 0u);
2091         bits.skipBits(2);  // original_copy, home
2092 
2093         sp<MetaData> meta = new MetaData();
2094         MakeAACCodecSpecificData(*meta,
2095                 profile, sampling_freq_index, channel_configuration);
2096 
2097         meta->setInt32(kKeyIsADTS, true);
2098 
2099         packetSource->setFormat(meta);
2100     }
2101 
2102     int64_t numSamples = 0LL;
2103     int32_t sampleRate;
2104     CHECK(packetSource->getFormat()->findInt32(kKeySampleRate, &sampleRate));
2105 
2106     int64_t timeUs = (PTS * 100LL) / 9LL;
2107     if (mStartup && !mFirstPTSValid) {
2108         mFirstPTSValid = true;
2109         mFirstTimeUs = timeUs;
2110     }
2111 
2112     if (mSegmentFirstPTS < 0LL) {
2113         mSegmentFirstPTS = timeUs;
2114         if (!mStartTimeUsRelative) {
2115             // Duplicated logic from how we handle .ts playlists.
2116             if (mStartup && mSegmentStartTimeUs >= 0
2117                     && adjustSeqNumberWithAnchorTime(timeUs)) {
2118                 mSegmentStartTimeUs = -1;
2119                 return -EAGAIN;
2120             }
2121         }
2122     }
2123 
2124     sp<HlsSampleDecryptor> sampleDecryptor = NULL;
2125     if (mSampleAesKeyItem != NULL) {
2126         ALOGV("extractAndQueueAccessUnits[%d] SampleAesKeyItem: Key: %s  IV: %s",
2127                 mSeqNumber,
2128                 HlsSampleDecryptor::aesBlockToStr(mKeyData).c_str(),
2129                 HlsSampleDecryptor::aesBlockToStr(mAESInitVec).c_str());
2130 
2131         sampleDecryptor = new HlsSampleDecryptor(mSampleAesKeyItem);
2132     }
2133 
2134     int frameId = 0;
2135 
2136     size_t offset = 0;
2137     while (offset < buffer->size()) {
2138         const uint8_t *adtsHeader = buffer->data() + offset;
2139         if (buffer->size() <= offset+5) {
2140             ALOGV("buffer does not contain a complete header");
2141             return ERROR_MALFORMED;
2142         }
2143         // non-const pointer for decryption if needed
2144         uint8_t *adtsFrame = buffer->data() + offset;
2145 
2146         unsigned aac_frame_length =
2147             ((adtsHeader[3] & 3) << 11)
2148             | (adtsHeader[4] << 3)
2149             | (adtsHeader[5] >> 5);
2150 
2151         if (aac_frame_length == 0) {
2152             const uint8_t *id3Header = adtsHeader;
2153             if (!memcmp(id3Header, "ID3", 3)) {
2154                 ID3 id3(id3Header, buffer->size() - offset, true);
2155                 if (id3.isValid()) {
2156                     offset += id3.rawSize();
2157                     continue;
2158                 };
2159             }
2160             return ERROR_MALFORMED;
2161         }
2162 
2163         CHECK_LE(offset + aac_frame_length, buffer->size());
2164 
2165         int64_t unitTimeUs = timeUs + numSamples * 1000000LL / sampleRate;
2166         offset += aac_frame_length;
2167 
2168         // Each AAC frame encodes 1024 samples.
2169         numSamples += 1024;
2170 
2171         if (mStartup) {
2172             int64_t startTimeUs = unitTimeUs;
2173             if (mStartTimeUsRelative) {
2174                 startTimeUs -= mFirstTimeUs;
2175                 if (startTimeUs  < 0) {
2176                     startTimeUs = 0;
2177                 }
2178             }
2179             if (startTimeUs < mStartTimeUs) {
2180                 continue;
2181             }
2182 
2183             if (mStartTimeUsNotify != NULL) {
2184                 mStartTimeUsNotify->setInt32("streamMask", LiveSession::STREAMTYPE_AUDIO);
2185                 mStartup = false;
2186             }
2187         }
2188 
2189         if (mStopParams != NULL) {
2190             int32_t discontinuitySeq;
2191             int64_t stopTimeUs;
2192             if (!mStopParams->findInt32("discontinuitySeq", &discontinuitySeq)
2193                     || discontinuitySeq > mDiscontinuitySeq
2194                     || !mStopParams->findInt64("timeUsAudio", &stopTimeUs)
2195                     || (discontinuitySeq == mDiscontinuitySeq && unitTimeUs >= stopTimeUs)) {
2196                 mStreamTypeMask = 0;
2197                 mPacketSources.clear();
2198                 return ERROR_OUT_OF_RANGE;
2199             }
2200         }
2201 
2202         if (sampleDecryptor != NULL) {
2203             bool protection_absent = (adtsHeader[1] & 0x1);
2204             size_t headerSize = protection_absent ? 7 : 9;
2205             if (frameId == 0) {
2206                 ALOGV("extractAndQueueAAC[%d] protection_absent %d (%02x) headerSize %zu",
2207                         mSeqNumber, protection_absent, adtsHeader[1], headerSize);
2208             }
2209 
2210             sampleDecryptor->processAAC(headerSize, adtsFrame, aac_frame_length);
2211         }
2212         frameId++;
2213 
2214         sp<ABuffer> unit = new ABuffer(aac_frame_length);
2215         memcpy(unit->data(), adtsHeader, aac_frame_length);
2216 
2217         unit->meta()->setInt64("timeUs", unitTimeUs);
2218         setAccessUnitProperties(unit, packetSource);
2219         packetSource->queueAccessUnit(unit);
2220     }
2221 
2222     return OK;
2223 }
2224 
updateDuration()2225 void PlaylistFetcher::updateDuration() {
2226     int64_t durationUs = 0LL;
2227     for (size_t index = 0; index < mPlaylist->size(); ++index) {
2228         sp<AMessage> itemMeta;
2229         CHECK(mPlaylist->itemAt(
2230                     index, NULL /* uri */, &itemMeta));
2231 
2232         int64_t itemDurationUs;
2233         CHECK(itemMeta->findInt64("durationUs", &itemDurationUs));
2234 
2235         durationUs += itemDurationUs;
2236     }
2237 
2238     sp<AMessage> msg = mNotify->dup();
2239     msg->setInt32("what", kWhatDurationUpdate);
2240     msg->setInt64("durationUs", durationUs);
2241     msg->post();
2242 }
2243 
updateTargetDuration()2244 void PlaylistFetcher::updateTargetDuration() {
2245     sp<AMessage> msg = mNotify->dup();
2246     msg->setInt32("what", kWhatTargetDurationUpdate);
2247     msg->setInt64("targetDurationUs", mPlaylist->getTargetDuration());
2248     msg->post();
2249 }
2250 
2251 }  // namespace android
2252