1 /*
2  * Copyright (C) 2012 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 //#define LOG_NDEBUG 0
18 #define LOG_TAG "PlaylistFetcher"
19 #include <utils/Log.h>
20 #include <utils/misc.h>
21 
22 #include "PlaylistFetcher.h"
23 #include "HTTPDownloader.h"
24 #include "LiveSession.h"
25 #include "M3UParser.h"
26 #include "include/avc_utils.h"
27 #include "include/ID3.h"
28 #include "mpeg2ts/AnotherPacketSource.h"
29 
30 #include <media/stagefright/foundation/ABitReader.h>
31 #include <media/stagefright/foundation/ABuffer.h>
32 #include <media/stagefright/foundation/ADebug.h>
33 #include <media/stagefright/MediaDefs.h>
34 #include <media/stagefright/MetaData.h>
35 #include <media/stagefright/Utils.h>
36 
37 #include <ctype.h>
38 #include <inttypes.h>
39 #include <openssl/aes.h>
40 
41 #define FLOGV(fmt, ...) ALOGV("[fetcher-%d] " fmt, mFetcherID, ##__VA_ARGS__)
42 #define FSLOGV(stream, fmt, ...) ALOGV("[fetcher-%d] [%s] " fmt, mFetcherID, \
43          LiveSession::getNameForStream(stream), ##__VA_ARGS__)
44 
45 namespace android {
46 
47 // static
48 const int64_t PlaylistFetcher::kMinBufferedDurationUs = 30000000ll;
49 const int64_t PlaylistFetcher::kMaxMonitorDelayUs = 3000000ll;
50 // LCM of 188 (size of a TS packet) & 1k works well
51 const int32_t PlaylistFetcher::kDownloadBlockSize = 47 * 1024;
52 
53 struct PlaylistFetcher::DownloadState : public RefBase {
54     DownloadState();
55     void resetState();
56     bool hasSavedState() const;
57     void restoreState(
58             AString &uri,
59             sp<AMessage> &itemMeta,
60             sp<ABuffer> &buffer,
61             sp<ABuffer> &tsBuffer,
62             int32_t &firstSeqNumberInPlaylist,
63             int32_t &lastSeqNumberInPlaylist);
64     void saveState(
65             AString &uri,
66             sp<AMessage> &itemMeta,
67             sp<ABuffer> &buffer,
68             sp<ABuffer> &tsBuffer,
69             int32_t &firstSeqNumberInPlaylist,
70             int32_t &lastSeqNumberInPlaylist);
71 
72 private:
73     bool mHasSavedState;
74     AString mUri;
75     sp<AMessage> mItemMeta;
76     sp<ABuffer> mBuffer;
77     sp<ABuffer> mTsBuffer;
78     int32_t mFirstSeqNumberInPlaylist;
79     int32_t mLastSeqNumberInPlaylist;
80 };
81 
DownloadState()82 PlaylistFetcher::DownloadState::DownloadState() {
83     resetState();
84 }
85 
hasSavedState() const86 bool PlaylistFetcher::DownloadState::hasSavedState() const {
87     return mHasSavedState;
88 }
89 
resetState()90 void PlaylistFetcher::DownloadState::resetState() {
91     mHasSavedState = false;
92 
93     mUri.clear();
94     mItemMeta = NULL;
95     mBuffer = NULL;
96     mTsBuffer = NULL;
97     mFirstSeqNumberInPlaylist = 0;
98     mLastSeqNumberInPlaylist = 0;
99 }
100 
restoreState(AString & uri,sp<AMessage> & itemMeta,sp<ABuffer> & buffer,sp<ABuffer> & tsBuffer,int32_t & firstSeqNumberInPlaylist,int32_t & lastSeqNumberInPlaylist)101 void PlaylistFetcher::DownloadState::restoreState(
102         AString &uri,
103         sp<AMessage> &itemMeta,
104         sp<ABuffer> &buffer,
105         sp<ABuffer> &tsBuffer,
106         int32_t &firstSeqNumberInPlaylist,
107         int32_t &lastSeqNumberInPlaylist) {
108     if (!mHasSavedState) {
109         return;
110     }
111 
112     uri = mUri;
113     itemMeta = mItemMeta;
114     buffer = mBuffer;
115     tsBuffer = mTsBuffer;
116     firstSeqNumberInPlaylist = mFirstSeqNumberInPlaylist;
117     lastSeqNumberInPlaylist = mLastSeqNumberInPlaylist;
118 
119     resetState();
120 }
121 
saveState(AString & uri,sp<AMessage> & itemMeta,sp<ABuffer> & buffer,sp<ABuffer> & tsBuffer,int32_t & firstSeqNumberInPlaylist,int32_t & lastSeqNumberInPlaylist)122 void PlaylistFetcher::DownloadState::saveState(
123         AString &uri,
124         sp<AMessage> &itemMeta,
125         sp<ABuffer> &buffer,
126         sp<ABuffer> &tsBuffer,
127         int32_t &firstSeqNumberInPlaylist,
128         int32_t &lastSeqNumberInPlaylist) {
129     mHasSavedState = true;
130 
131     mUri = uri;
132     mItemMeta = itemMeta;
133     mBuffer = buffer;
134     mTsBuffer = tsBuffer;
135     mFirstSeqNumberInPlaylist = firstSeqNumberInPlaylist;
136     mLastSeqNumberInPlaylist = lastSeqNumberInPlaylist;
137 }
138 
PlaylistFetcher(const sp<AMessage> & notify,const sp<LiveSession> & session,const char * uri,int32_t id,int32_t subtitleGeneration)139 PlaylistFetcher::PlaylistFetcher(
140         const sp<AMessage> &notify,
141         const sp<LiveSession> &session,
142         const char *uri,
143         int32_t id,
144         int32_t subtitleGeneration)
145     : mNotify(notify),
146       mSession(session),
147       mURI(uri),
148       mFetcherID(id),
149       mStreamTypeMask(0),
150       mStartTimeUs(-1ll),
151       mSegmentStartTimeUs(-1ll),
152       mDiscontinuitySeq(-1ll),
153       mStartTimeUsRelative(false),
154       mLastPlaylistFetchTimeUs(-1ll),
155       mPlaylistTimeUs(-1ll),
156       mSeqNumber(-1),
157       mNumRetries(0),
158       mStartup(true),
159       mIDRFound(false),
160       mSeekMode(LiveSession::kSeekModeExactPosition),
161       mTimeChangeSignaled(false),
162       mNextPTSTimeUs(-1ll),
163       mMonitorQueueGeneration(0),
164       mSubtitleGeneration(subtitleGeneration),
165       mLastDiscontinuitySeq(-1ll),
166       mRefreshState(INITIAL_MINIMUM_RELOAD_DELAY),
167       mFirstPTSValid(false),
168       mFirstTimeUs(-1ll),
169       mVideoBuffer(new AnotherPacketSource(NULL)),
170       mThresholdRatio(-1.0f),
171       mDownloadState(new DownloadState()),
172       mHasMetadata(false) {
173     memset(mPlaylistHash, 0, sizeof(mPlaylistHash));
174     mHTTPDownloader = mSession->getHTTPDownloader();
175 }
176 
~PlaylistFetcher()177 PlaylistFetcher::~PlaylistFetcher() {
178 }
179 
getFetcherID() const180 int32_t PlaylistFetcher::getFetcherID() const {
181     return mFetcherID;
182 }
183 
getSegmentStartTimeUs(int32_t seqNumber) const184 int64_t PlaylistFetcher::getSegmentStartTimeUs(int32_t seqNumber) const {
185     CHECK(mPlaylist != NULL);
186 
187     int32_t firstSeqNumberInPlaylist, lastSeqNumberInPlaylist;
188     mPlaylist->getSeqNumberRange(
189             &firstSeqNumberInPlaylist, &lastSeqNumberInPlaylist);
190 
191     CHECK_GE(seqNumber, firstSeqNumberInPlaylist);
192     CHECK_LE(seqNumber, lastSeqNumberInPlaylist);
193 
194     int64_t segmentStartUs = 0ll;
195     for (int32_t index = 0;
196             index < seqNumber - firstSeqNumberInPlaylist; ++index) {
197         sp<AMessage> itemMeta;
198         CHECK(mPlaylist->itemAt(
199                     index, NULL /* uri */, &itemMeta));
200 
201         int64_t itemDurationUs;
202         CHECK(itemMeta->findInt64("durationUs", &itemDurationUs));
203 
204         segmentStartUs += itemDurationUs;
205     }
206 
207     return segmentStartUs;
208 }
209 
getSegmentDurationUs(int32_t seqNumber) const210 int64_t PlaylistFetcher::getSegmentDurationUs(int32_t seqNumber) const {
211     CHECK(mPlaylist != NULL);
212 
213     int32_t firstSeqNumberInPlaylist, lastSeqNumberInPlaylist;
214     mPlaylist->getSeqNumberRange(
215             &firstSeqNumberInPlaylist, &lastSeqNumberInPlaylist);
216 
217     CHECK_GE(seqNumber, firstSeqNumberInPlaylist);
218     CHECK_LE(seqNumber, lastSeqNumberInPlaylist);
219 
220     int32_t index = seqNumber - firstSeqNumberInPlaylist;
221     sp<AMessage> itemMeta;
222     CHECK(mPlaylist->itemAt(
223                 index, NULL /* uri */, &itemMeta));
224 
225     int64_t itemDurationUs;
226     CHECK(itemMeta->findInt64("durationUs", &itemDurationUs));
227 
228     return itemDurationUs;
229 }
230 
delayUsToRefreshPlaylist() const231 int64_t PlaylistFetcher::delayUsToRefreshPlaylist() const {
232     int64_t nowUs = ALooper::GetNowUs();
233 
234     if (mPlaylist == NULL || mLastPlaylistFetchTimeUs < 0ll) {
235         CHECK_EQ((int)mRefreshState, (int)INITIAL_MINIMUM_RELOAD_DELAY);
236         return 0ll;
237     }
238 
239     if (mPlaylist->isComplete()) {
240         return (~0llu >> 1);
241     }
242 
243     int64_t targetDurationUs = mPlaylist->getTargetDuration();
244 
245     int64_t minPlaylistAgeUs;
246 
247     switch (mRefreshState) {
248         case INITIAL_MINIMUM_RELOAD_DELAY:
249         {
250             size_t n = mPlaylist->size();
251             if (n > 0) {
252                 sp<AMessage> itemMeta;
253                 CHECK(mPlaylist->itemAt(n - 1, NULL /* uri */, &itemMeta));
254 
255                 int64_t itemDurationUs;
256                 CHECK(itemMeta->findInt64("durationUs", &itemDurationUs));
257 
258                 minPlaylistAgeUs = itemDurationUs;
259                 break;
260             }
261 
262             // fall through
263         }
264 
265         case FIRST_UNCHANGED_RELOAD_ATTEMPT:
266         {
267             minPlaylistAgeUs = targetDurationUs / 2;
268             break;
269         }
270 
271         case SECOND_UNCHANGED_RELOAD_ATTEMPT:
272         {
273             minPlaylistAgeUs = (targetDurationUs * 3) / 2;
274             break;
275         }
276 
277         case THIRD_UNCHANGED_RELOAD_ATTEMPT:
278         {
279             minPlaylistAgeUs = targetDurationUs * 3;
280             break;
281         }
282 
283         default:
284             TRESPASS();
285             break;
286     }
287 
288     int64_t delayUs = mLastPlaylistFetchTimeUs + minPlaylistAgeUs - nowUs;
289     return delayUs > 0ll ? delayUs : 0ll;
290 }
291 
decryptBuffer(size_t playlistIndex,const sp<ABuffer> & buffer,bool first)292 status_t PlaylistFetcher::decryptBuffer(
293         size_t playlistIndex, const sp<ABuffer> &buffer,
294         bool first) {
295     sp<AMessage> itemMeta;
296     bool found = false;
297     AString method;
298 
299     for (ssize_t i = playlistIndex; i >= 0; --i) {
300         AString uri;
301         CHECK(mPlaylist->itemAt(i, &uri, &itemMeta));
302 
303         if (itemMeta->findString("cipher-method", &method)) {
304             found = true;
305             break;
306         }
307     }
308 
309     if (!found) {
310         method = "NONE";
311     }
312     buffer->meta()->setString("cipher-method", method.c_str());
313 
314     if (method == "NONE") {
315         return OK;
316     } else if (!(method == "AES-128")) {
317         ALOGE("Unsupported cipher method '%s'", method.c_str());
318         return ERROR_UNSUPPORTED;
319     }
320 
321     AString keyURI;
322     if (!itemMeta->findString("cipher-uri", &keyURI)) {
323         ALOGE("Missing key uri");
324         return ERROR_MALFORMED;
325     }
326 
327     ssize_t index = mAESKeyForURI.indexOfKey(keyURI);
328 
329     sp<ABuffer> key;
330     if (index >= 0) {
331         key = mAESKeyForURI.valueAt(index);
332     } else {
333         ssize_t err = mHTTPDownloader->fetchFile(keyURI.c_str(), &key);
334 
335         if (err == ERROR_NOT_CONNECTED) {
336             return ERROR_NOT_CONNECTED;
337         } else if (err < 0) {
338             ALOGE("failed to fetch cipher key from '%s'.", keyURI.c_str());
339             return ERROR_IO;
340         } else if (key->size() != 16) {
341             ALOGE("key file '%s' wasn't 16 bytes in size.", keyURI.c_str());
342             return ERROR_MALFORMED;
343         }
344 
345         mAESKeyForURI.add(keyURI, key);
346     }
347 
348     AES_KEY aes_key;
349     if (AES_set_decrypt_key(key->data(), 128, &aes_key) != 0) {
350         ALOGE("failed to set AES decryption key.");
351         return UNKNOWN_ERROR;
352     }
353 
354     size_t n = buffer->size();
355     if (!n) {
356         return OK;
357     }
358     CHECK(n % 16 == 0);
359 
360     if (first) {
361         // If decrypting the first block in a file, read the iv from the manifest
362         // or derive the iv from the file's sequence number.
363 
364         AString iv;
365         if (itemMeta->findString("cipher-iv", &iv)) {
366             if ((!iv.startsWith("0x") && !iv.startsWith("0X"))
367                     || iv.size() != 16 * 2 + 2) {
368                 ALOGE("malformed cipher IV '%s'.", iv.c_str());
369                 return ERROR_MALFORMED;
370             }
371 
372             memset(mAESInitVec, 0, sizeof(mAESInitVec));
373             for (size_t i = 0; i < 16; ++i) {
374                 char c1 = tolower(iv.c_str()[2 + 2 * i]);
375                 char c2 = tolower(iv.c_str()[3 + 2 * i]);
376                 if (!isxdigit(c1) || !isxdigit(c2)) {
377                     ALOGE("malformed cipher IV '%s'.", iv.c_str());
378                     return ERROR_MALFORMED;
379                 }
380                 uint8_t nibble1 = isdigit(c1) ? c1 - '0' : c1 - 'a' + 10;
381                 uint8_t nibble2 = isdigit(c2) ? c2 - '0' : c2 - 'a' + 10;
382 
383                 mAESInitVec[i] = nibble1 << 4 | nibble2;
384             }
385         } else {
386             memset(mAESInitVec, 0, sizeof(mAESInitVec));
387             mAESInitVec[15] = mSeqNumber & 0xff;
388             mAESInitVec[14] = (mSeqNumber >> 8) & 0xff;
389             mAESInitVec[13] = (mSeqNumber >> 16) & 0xff;
390             mAESInitVec[12] = (mSeqNumber >> 24) & 0xff;
391         }
392     }
393 
394     AES_cbc_encrypt(
395             buffer->data(), buffer->data(), buffer->size(),
396             &aes_key, mAESInitVec, AES_DECRYPT);
397 
398     return OK;
399 }
400 
checkDecryptPadding(const sp<ABuffer> & buffer)401 status_t PlaylistFetcher::checkDecryptPadding(const sp<ABuffer> &buffer) {
402     AString method;
403     CHECK(buffer->meta()->findString("cipher-method", &method));
404     if (method == "NONE") {
405         return OK;
406     }
407 
408     uint8_t padding = 0;
409     if (buffer->size() > 0) {
410         padding = buffer->data()[buffer->size() - 1];
411     }
412 
413     if (padding > 16) {
414         return ERROR_MALFORMED;
415     }
416 
417     for (size_t i = buffer->size() - padding; i < padding; i++) {
418         if (buffer->data()[i] != padding) {
419             return ERROR_MALFORMED;
420         }
421     }
422 
423     buffer->setRange(buffer->offset(), buffer->size() - padding);
424     return OK;
425 }
426 
postMonitorQueue(int64_t delayUs,int64_t minDelayUs)427 void PlaylistFetcher::postMonitorQueue(int64_t delayUs, int64_t minDelayUs) {
428     int64_t maxDelayUs = delayUsToRefreshPlaylist();
429     if (maxDelayUs < minDelayUs) {
430         maxDelayUs = minDelayUs;
431     }
432     if (delayUs > maxDelayUs) {
433         FLOGV("Need to refresh playlist in %lld", (long long)maxDelayUs);
434         delayUs = maxDelayUs;
435     }
436     sp<AMessage> msg = new AMessage(kWhatMonitorQueue, this);
437     msg->setInt32("generation", mMonitorQueueGeneration);
438     msg->post(delayUs);
439 }
440 
cancelMonitorQueue()441 void PlaylistFetcher::cancelMonitorQueue() {
442     ++mMonitorQueueGeneration;
443 }
444 
setStoppingThreshold(float thresholdRatio,bool disconnect)445 void PlaylistFetcher::setStoppingThreshold(float thresholdRatio, bool disconnect) {
446     {
447         AutoMutex _l(mThresholdLock);
448         mThresholdRatio = thresholdRatio;
449     }
450     if (disconnect) {
451         mHTTPDownloader->disconnect();
452     }
453 }
454 
resetStoppingThreshold(bool disconnect)455 void PlaylistFetcher::resetStoppingThreshold(bool disconnect) {
456     {
457         AutoMutex _l(mThresholdLock);
458         mThresholdRatio = -1.0f;
459     }
460     if (disconnect) {
461         mHTTPDownloader->disconnect();
462     } else {
463         // allow reconnect
464         mHTTPDownloader->reconnect();
465     }
466 }
467 
getStoppingThreshold()468 float PlaylistFetcher::getStoppingThreshold() {
469     AutoMutex _l(mThresholdLock);
470     return mThresholdRatio;
471 }
472 
startAsync(const sp<AnotherPacketSource> & audioSource,const sp<AnotherPacketSource> & videoSource,const sp<AnotherPacketSource> & subtitleSource,const sp<AnotherPacketSource> & metadataSource,int64_t startTimeUs,int64_t segmentStartTimeUs,int32_t startDiscontinuitySeq,LiveSession::SeekMode seekMode)473 void PlaylistFetcher::startAsync(
474         const sp<AnotherPacketSource> &audioSource,
475         const sp<AnotherPacketSource> &videoSource,
476         const sp<AnotherPacketSource> &subtitleSource,
477         const sp<AnotherPacketSource> &metadataSource,
478         int64_t startTimeUs,
479         int64_t segmentStartTimeUs,
480         int32_t startDiscontinuitySeq,
481         LiveSession::SeekMode seekMode) {
482     sp<AMessage> msg = new AMessage(kWhatStart, this);
483 
484     uint32_t streamTypeMask = 0ul;
485 
486     if (audioSource != NULL) {
487         msg->setPointer("audioSource", audioSource.get());
488         streamTypeMask |= LiveSession::STREAMTYPE_AUDIO;
489     }
490 
491     if (videoSource != NULL) {
492         msg->setPointer("videoSource", videoSource.get());
493         streamTypeMask |= LiveSession::STREAMTYPE_VIDEO;
494     }
495 
496     if (subtitleSource != NULL) {
497         msg->setPointer("subtitleSource", subtitleSource.get());
498         streamTypeMask |= LiveSession::STREAMTYPE_SUBTITLES;
499     }
500 
501     if (metadataSource != NULL) {
502         msg->setPointer("metadataSource", metadataSource.get());
503         // metadataSource does not affect streamTypeMask.
504     }
505 
506     msg->setInt32("streamTypeMask", streamTypeMask);
507     msg->setInt64("startTimeUs", startTimeUs);
508     msg->setInt64("segmentStartTimeUs", segmentStartTimeUs);
509     msg->setInt32("startDiscontinuitySeq", startDiscontinuitySeq);
510     msg->setInt32("seekMode", seekMode);
511     msg->post();
512 }
513 
514 /*
515  * pauseAsync
516  *
517  * threshold: 0.0f - pause after current fetch block (default 47Kbytes)
518  *           -1.0f - pause after finishing current segment
519  *        0.0~1.0f - pause if remaining of current segment exceeds threshold
520  */
pauseAsync(float thresholdRatio,bool disconnect)521 void PlaylistFetcher::pauseAsync(
522         float thresholdRatio, bool disconnect) {
523     setStoppingThreshold(thresholdRatio, disconnect);
524 
525     (new AMessage(kWhatPause, this))->post();
526 }
527 
stopAsync(bool clear)528 void PlaylistFetcher::stopAsync(bool clear) {
529     setStoppingThreshold(0.0f, true /* disconncect */);
530 
531     sp<AMessage> msg = new AMessage(kWhatStop, this);
532     msg->setInt32("clear", clear);
533     msg->post();
534 }
535 
resumeUntilAsync(const sp<AMessage> & params)536 void PlaylistFetcher::resumeUntilAsync(const sp<AMessage> &params) {
537     FLOGV("resumeUntilAsync: params=%s", params->debugString().c_str());
538 
539     AMessage* msg = new AMessage(kWhatResumeUntil, this);
540     msg->setMessage("params", params);
541     msg->post();
542 }
543 
fetchPlaylistAsync()544 void PlaylistFetcher::fetchPlaylistAsync() {
545     (new AMessage(kWhatFetchPlaylist, this))->post();
546 }
547 
onMessageReceived(const sp<AMessage> & msg)548 void PlaylistFetcher::onMessageReceived(const sp<AMessage> &msg) {
549     switch (msg->what()) {
550         case kWhatStart:
551         {
552             status_t err = onStart(msg);
553 
554             sp<AMessage> notify = mNotify->dup();
555             notify->setInt32("what", kWhatStarted);
556             notify->setInt32("err", err);
557             notify->post();
558             break;
559         }
560 
561         case kWhatPause:
562         {
563             onPause();
564 
565             sp<AMessage> notify = mNotify->dup();
566             notify->setInt32("what", kWhatPaused);
567             notify->setInt32("seekMode",
568                     mDownloadState->hasSavedState()
569                     ? LiveSession::kSeekModeNextSample
570                     : LiveSession::kSeekModeNextSegment);
571             notify->post();
572             break;
573         }
574 
575         case kWhatStop:
576         {
577             onStop(msg);
578 
579             sp<AMessage> notify = mNotify->dup();
580             notify->setInt32("what", kWhatStopped);
581             notify->post();
582             break;
583         }
584 
585         case kWhatFetchPlaylist:
586         {
587             bool unchanged;
588             sp<M3UParser> playlist = mHTTPDownloader->fetchPlaylist(
589                     mURI.c_str(), NULL /* curPlaylistHash */, &unchanged);
590 
591             sp<AMessage> notify = mNotify->dup();
592             notify->setInt32("what", kWhatPlaylistFetched);
593             notify->setObject("playlist", playlist);
594             notify->post();
595             break;
596         }
597 
598         case kWhatMonitorQueue:
599         case kWhatDownloadNext:
600         {
601             int32_t generation;
602             CHECK(msg->findInt32("generation", &generation));
603 
604             if (generation != mMonitorQueueGeneration) {
605                 // Stale event
606                 break;
607             }
608 
609             if (msg->what() == kWhatMonitorQueue) {
610                 onMonitorQueue();
611             } else {
612                 onDownloadNext();
613             }
614             break;
615         }
616 
617         case kWhatResumeUntil:
618         {
619             onResumeUntil(msg);
620             break;
621         }
622 
623         default:
624             TRESPASS();
625     }
626 }
627 
onStart(const sp<AMessage> & msg)628 status_t PlaylistFetcher::onStart(const sp<AMessage> &msg) {
629     mPacketSources.clear();
630     mStopParams.clear();
631     mStartTimeUsNotify = mNotify->dup();
632     mStartTimeUsNotify->setInt32("what", kWhatStartedAt);
633     mStartTimeUsNotify->setString("uri", mURI);
634 
635     uint32_t streamTypeMask;
636     CHECK(msg->findInt32("streamTypeMask", (int32_t *)&streamTypeMask));
637 
638     int64_t startTimeUs;
639     int64_t segmentStartTimeUs;
640     int32_t startDiscontinuitySeq;
641     int32_t seekMode;
642     CHECK(msg->findInt64("startTimeUs", &startTimeUs));
643     CHECK(msg->findInt64("segmentStartTimeUs", &segmentStartTimeUs));
644     CHECK(msg->findInt32("startDiscontinuitySeq", &startDiscontinuitySeq));
645     CHECK(msg->findInt32("seekMode", &seekMode));
646 
647     if (streamTypeMask & LiveSession::STREAMTYPE_AUDIO) {
648         void *ptr;
649         CHECK(msg->findPointer("audioSource", &ptr));
650 
651         mPacketSources.add(
652                 LiveSession::STREAMTYPE_AUDIO,
653                 static_cast<AnotherPacketSource *>(ptr));
654     }
655 
656     if (streamTypeMask & LiveSession::STREAMTYPE_VIDEO) {
657         void *ptr;
658         CHECK(msg->findPointer("videoSource", &ptr));
659 
660         mPacketSources.add(
661                 LiveSession::STREAMTYPE_VIDEO,
662                 static_cast<AnotherPacketSource *>(ptr));
663     }
664 
665     if (streamTypeMask & LiveSession::STREAMTYPE_SUBTITLES) {
666         void *ptr;
667         CHECK(msg->findPointer("subtitleSource", &ptr));
668 
669         mPacketSources.add(
670                 LiveSession::STREAMTYPE_SUBTITLES,
671                 static_cast<AnotherPacketSource *>(ptr));
672     }
673 
674     void *ptr;
675     // metadataSource is not part of streamTypeMask
676     if ((streamTypeMask & (LiveSession::STREAMTYPE_AUDIO | LiveSession::STREAMTYPE_VIDEO))
677             && msg->findPointer("metadataSource", &ptr)) {
678         mPacketSources.add(
679                 LiveSession::STREAMTYPE_METADATA,
680                 static_cast<AnotherPacketSource *>(ptr));
681     }
682 
683     mStreamTypeMask = streamTypeMask;
684 
685     mSegmentStartTimeUs = segmentStartTimeUs;
686 
687     if (startDiscontinuitySeq >= 0) {
688         mDiscontinuitySeq = startDiscontinuitySeq;
689     }
690 
691     mRefreshState = INITIAL_MINIMUM_RELOAD_DELAY;
692     mSeekMode = (LiveSession::SeekMode) seekMode;
693 
694     if (startTimeUs >= 0 || mSeekMode == LiveSession::kSeekModeNextSample) {
695         mStartup = true;
696         mIDRFound = false;
697         mVideoBuffer->clear();
698     }
699 
700     if (startTimeUs >= 0) {
701         mStartTimeUs = startTimeUs;
702         mFirstPTSValid = false;
703         mSeqNumber = -1;
704         mTimeChangeSignaled = false;
705         mDownloadState->resetState();
706     }
707 
708     postMonitorQueue();
709 
710     return OK;
711 }
712 
onPause()713 void PlaylistFetcher::onPause() {
714     cancelMonitorQueue();
715     mLastDiscontinuitySeq = mDiscontinuitySeq;
716 
717     resetStoppingThreshold(false /* disconnect */);
718 }
719 
onStop(const sp<AMessage> & msg)720 void PlaylistFetcher::onStop(const sp<AMessage> &msg) {
721     cancelMonitorQueue();
722 
723     int32_t clear;
724     CHECK(msg->findInt32("clear", &clear));
725     if (clear) {
726         for (size_t i = 0; i < mPacketSources.size(); i++) {
727             sp<AnotherPacketSource> packetSource = mPacketSources.valueAt(i);
728             packetSource->clear();
729         }
730     }
731 
732     mDownloadState->resetState();
733     mPacketSources.clear();
734     mStreamTypeMask = 0;
735 
736     resetStoppingThreshold(true /* disconnect */);
737 }
738 
739 // Resume until we have reached the boundary timestamps listed in `msg`; when
740 // the remaining time is too short (within a resume threshold) stop immediately
741 // instead.
onResumeUntil(const sp<AMessage> & msg)742 status_t PlaylistFetcher::onResumeUntil(const sp<AMessage> &msg) {
743     sp<AMessage> params;
744     CHECK(msg->findMessage("params", &params));
745 
746     mStopParams = params;
747     onDownloadNext();
748 
749     return OK;
750 }
751 
notifyStopReached()752 void PlaylistFetcher::notifyStopReached() {
753     sp<AMessage> notify = mNotify->dup();
754     notify->setInt32("what", kWhatStopReached);
755     notify->post();
756 }
757 
notifyError(status_t err)758 void PlaylistFetcher::notifyError(status_t err) {
759     sp<AMessage> notify = mNotify->dup();
760     notify->setInt32("what", kWhatError);
761     notify->setInt32("err", err);
762     notify->post();
763 }
764 
queueDiscontinuity(ATSParser::DiscontinuityType type,const sp<AMessage> & extra)765 void PlaylistFetcher::queueDiscontinuity(
766         ATSParser::DiscontinuityType type, const sp<AMessage> &extra) {
767     for (size_t i = 0; i < mPacketSources.size(); ++i) {
768         // do not discard buffer upon #EXT-X-DISCONTINUITY tag
769         // (seek will discard buffer by abandoning old fetchers)
770         mPacketSources.valueAt(i)->queueDiscontinuity(
771                 type, extra, false /* discard */);
772     }
773 }
774 
onMonitorQueue()775 void PlaylistFetcher::onMonitorQueue() {
776     // in the middle of an unfinished download, delay
777     // playlist refresh as it'll change seq numbers
778     if (!mDownloadState->hasSavedState()) {
779         refreshPlaylist();
780     }
781 
782     int64_t targetDurationUs = kMinBufferedDurationUs;
783     if (mPlaylist != NULL) {
784         targetDurationUs = mPlaylist->getTargetDuration();
785     }
786 
787     int64_t bufferedDurationUs = 0ll;
788     status_t finalResult = OK;
789     if (mStreamTypeMask == LiveSession::STREAMTYPE_SUBTITLES) {
790         sp<AnotherPacketSource> packetSource =
791             mPacketSources.valueFor(LiveSession::STREAMTYPE_SUBTITLES);
792 
793         bufferedDurationUs =
794                 packetSource->getBufferedDurationUs(&finalResult);
795     } else {
796         // Use min stream duration, but ignore streams that never have any packet
797         // enqueued to prevent us from waiting on a non-existent stream;
798         // when we cannot make out from the manifest what streams are included in
799         // a playlist we might assume extra streams.
800         bufferedDurationUs = -1ll;
801         for (size_t i = 0; i < mPacketSources.size(); ++i) {
802             if ((mStreamTypeMask & mPacketSources.keyAt(i)) == 0
803                     || mPacketSources[i]->getLatestEnqueuedMeta() == NULL) {
804                 continue;
805             }
806 
807             int64_t bufferedStreamDurationUs =
808                 mPacketSources.valueAt(i)->getBufferedDurationUs(&finalResult);
809 
810             FSLOGV(mPacketSources.keyAt(i), "buffered %lld", (long long)bufferedStreamDurationUs);
811 
812             if (bufferedDurationUs == -1ll
813                  || bufferedStreamDurationUs < bufferedDurationUs) {
814                 bufferedDurationUs = bufferedStreamDurationUs;
815             }
816         }
817         if (bufferedDurationUs == -1ll) {
818             bufferedDurationUs = 0ll;
819         }
820     }
821 
822     if (finalResult == OK && bufferedDurationUs < kMinBufferedDurationUs) {
823         FLOGV("monitoring, buffered=%lld < %lld",
824                 (long long)bufferedDurationUs, (long long)kMinBufferedDurationUs);
825 
826         // delay the next download slightly; hopefully this gives other concurrent fetchers
827         // a better chance to run.
828         // onDownloadNext();
829         sp<AMessage> msg = new AMessage(kWhatDownloadNext, this);
830         msg->setInt32("generation", mMonitorQueueGeneration);
831         msg->post(1000l);
832     } else {
833         // We'd like to maintain buffering above durationToBufferUs, so try
834         // again when buffer just about to go below durationToBufferUs
835         // (or after targetDurationUs / 2, whichever is smaller).
836         int64_t delayUs = bufferedDurationUs - kMinBufferedDurationUs + 1000000ll;
837         if (delayUs > targetDurationUs / 2) {
838             delayUs = targetDurationUs / 2;
839         }
840 
841         FLOGV("pausing for %lld, buffered=%lld > %lld",
842                 (long long)delayUs,
843                 (long long)bufferedDurationUs,
844                 (long long)kMinBufferedDurationUs);
845 
846         postMonitorQueue(delayUs);
847     }
848 }
849 
refreshPlaylist()850 status_t PlaylistFetcher::refreshPlaylist() {
851     if (delayUsToRefreshPlaylist() <= 0) {
852         bool unchanged;
853         sp<M3UParser> playlist = mHTTPDownloader->fetchPlaylist(
854                 mURI.c_str(), mPlaylistHash, &unchanged);
855 
856         if (playlist == NULL) {
857             if (unchanged) {
858                 // We succeeded in fetching the playlist, but it was
859                 // unchanged from the last time we tried.
860 
861                 if (mRefreshState != THIRD_UNCHANGED_RELOAD_ATTEMPT) {
862                     mRefreshState = (RefreshState)(mRefreshState + 1);
863                 }
864             } else {
865                 ALOGE("failed to load playlist at url '%s'", uriDebugString(mURI).c_str());
866                 return ERROR_IO;
867             }
868         } else {
869             mRefreshState = INITIAL_MINIMUM_RELOAD_DELAY;
870             mPlaylist = playlist;
871 
872             if (mPlaylist->isComplete() || mPlaylist->isEvent()) {
873                 updateDuration();
874             }
875             // Notify LiveSession to use target-duration based buffering level
876             // for up/down switch. Default LiveSession::kUpSwitchMark may not
877             // be reachable for live streams, as our max buffering amount is
878             // limited to 3 segments.
879             if (!mPlaylist->isComplete()) {
880                 updateTargetDuration();
881             }
882             mPlaylistTimeUs = ALooper::GetNowUs();
883         }
884 
885         mLastPlaylistFetchTimeUs = ALooper::GetNowUs();
886     }
887     return OK;
888 }
889 
890 // static
bufferStartsWithTsSyncByte(const sp<ABuffer> & buffer)891 bool PlaylistFetcher::bufferStartsWithTsSyncByte(const sp<ABuffer>& buffer) {
892     return buffer->size() > 0 && buffer->data()[0] == 0x47;
893 }
894 
shouldPauseDownload()895 bool PlaylistFetcher::shouldPauseDownload() {
896     if (mStreamTypeMask == LiveSession::STREAMTYPE_SUBTITLES) {
897         // doesn't apply to subtitles
898         return false;
899     }
900 
901     // Calculate threshold to abort current download
902     float thresholdRatio = getStoppingThreshold();
903 
904     if (thresholdRatio < 0.0f) {
905         // never abort
906         return false;
907     } else if (thresholdRatio == 0.0f) {
908         // immediately abort
909         return true;
910     }
911 
912     // now we have a positive thresholdUs, abort if remaining
913     // portion to download is over that threshold.
914     if (mSegmentFirstPTS < 0) {
915         // this means we haven't even find the first access unit,
916         // abort now as we must be very far away from the end.
917         return true;
918     }
919     int64_t lastEnqueueUs = mSegmentFirstPTS;
920     for (size_t i = 0; i < mPacketSources.size(); ++i) {
921         if ((mStreamTypeMask & mPacketSources.keyAt(i)) == 0) {
922             continue;
923         }
924         sp<AMessage> meta = mPacketSources[i]->getLatestEnqueuedMeta();
925         int32_t type;
926         if (meta == NULL || meta->findInt32("discontinuity", &type)) {
927             continue;
928         }
929         int64_t tmpUs;
930         CHECK(meta->findInt64("timeUs", &tmpUs));
931         if (tmpUs > lastEnqueueUs) {
932             lastEnqueueUs = tmpUs;
933         }
934     }
935     lastEnqueueUs -= mSegmentFirstPTS;
936 
937     int64_t targetDurationUs = mPlaylist->getTargetDuration();
938     int64_t thresholdUs = thresholdRatio * targetDurationUs;
939 
940     FLOGV("%spausing now, thresholdUs %lld, remaining %lld",
941             targetDurationUs - lastEnqueueUs > thresholdUs ? "" : "not ",
942             (long long)thresholdUs,
943             (long long)(targetDurationUs - lastEnqueueUs));
944 
945     if (targetDurationUs - lastEnqueueUs > thresholdUs) {
946         return true;
947     }
948     return false;
949 }
950 
initDownloadState(AString & uri,sp<AMessage> & itemMeta,int32_t & firstSeqNumberInPlaylist,int32_t & lastSeqNumberInPlaylist)951 bool PlaylistFetcher::initDownloadState(
952         AString &uri,
953         sp<AMessage> &itemMeta,
954         int32_t &firstSeqNumberInPlaylist,
955         int32_t &lastSeqNumberInPlaylist) {
956     status_t err = refreshPlaylist();
957     firstSeqNumberInPlaylist = 0;
958     lastSeqNumberInPlaylist = 0;
959     bool discontinuity = false;
960 
961     if (mPlaylist != NULL) {
962         mPlaylist->getSeqNumberRange(
963                 &firstSeqNumberInPlaylist, &lastSeqNumberInPlaylist);
964 
965         if (mDiscontinuitySeq < 0) {
966             mDiscontinuitySeq = mPlaylist->getDiscontinuitySeq();
967         }
968     }
969 
970     mSegmentFirstPTS = -1ll;
971 
972     if (mPlaylist != NULL && mSeqNumber < 0) {
973         CHECK_GE(mStartTimeUs, 0ll);
974 
975         if (mSegmentStartTimeUs < 0) {
976             if (!mPlaylist->isComplete() && !mPlaylist->isEvent()) {
977                 // If this is a live session, start 3 segments from the end on connect
978                 mSeqNumber = lastSeqNumberInPlaylist - 3;
979                 if (mSeqNumber < firstSeqNumberInPlaylist) {
980                     mSeqNumber = firstSeqNumberInPlaylist;
981                 }
982             } else {
983                 // When seeking mSegmentStartTimeUs is unavailable (< 0), we
984                 // use mStartTimeUs (client supplied timestamp) to determine both start segment
985                 // and relative position inside a segment
986                 mSeqNumber = getSeqNumberForTime(mStartTimeUs);
987                 mStartTimeUs -= getSegmentStartTimeUs(mSeqNumber);
988             }
989             mStartTimeUsRelative = true;
990             FLOGV("Initial sequence number for time %lld is %d from (%d .. %d)",
991                     (long long)mStartTimeUs, mSeqNumber, firstSeqNumberInPlaylist,
992                     lastSeqNumberInPlaylist);
993         } else {
994             // When adapting or track switching, mSegmentStartTimeUs (relative
995             // to media time 0) is used to determine the start segment; mStartTimeUs (absolute
996             // timestamps coming from the media container) is used to determine the position
997             // inside a segments.
998             if (mStreamTypeMask != LiveSession::STREAMTYPE_SUBTITLES
999                     && mSeekMode != LiveSession::kSeekModeNextSample) {
1000                 // avoid double fetch/decode
1001                 // Use (mSegmentStartTimeUs + 1/2 * targetDurationUs) to search
1002                 // for the starting segment in new variant.
1003                 // If the two variants' segments are aligned, this gives the
1004                 // next segment. If they're not aligned, this gives the segment
1005                 // that overlaps no more than 1/2 * targetDurationUs.
1006                 mSeqNumber = getSeqNumberForTime(mSegmentStartTimeUs
1007                         + mPlaylist->getTargetDuration() / 2);
1008             } else {
1009                 mSeqNumber = getSeqNumberForTime(mSegmentStartTimeUs);
1010             }
1011             ssize_t minSeq = getSeqNumberForDiscontinuity(mDiscontinuitySeq);
1012             if (mSeqNumber < minSeq) {
1013                 mSeqNumber = minSeq;
1014             }
1015 
1016             if (mSeqNumber < firstSeqNumberInPlaylist) {
1017                 mSeqNumber = firstSeqNumberInPlaylist;
1018             }
1019 
1020             if (mSeqNumber > lastSeqNumberInPlaylist) {
1021                 mSeqNumber = lastSeqNumberInPlaylist;
1022             }
1023             FLOGV("Initial sequence number is %d from (%d .. %d)",
1024                     mSeqNumber, firstSeqNumberInPlaylist,
1025                     lastSeqNumberInPlaylist);
1026         }
1027     }
1028 
1029     // if mPlaylist is NULL then err must be non-OK; but the other way around might not be true
1030     if (mSeqNumber < firstSeqNumberInPlaylist
1031             || mSeqNumber > lastSeqNumberInPlaylist
1032             || err != OK) {
1033         if ((err != OK || !mPlaylist->isComplete()) && mNumRetries < kMaxNumRetries) {
1034             ++mNumRetries;
1035 
1036             if (mSeqNumber > lastSeqNumberInPlaylist || err != OK) {
1037                 // make sure we reach this retry logic on refresh failures
1038                 // by adding an err != OK clause to all enclosing if's.
1039 
1040                 // refresh in increasing fraction (1/2, 1/3, ...) of the
1041                 // playlist's target duration or 3 seconds, whichever is less
1042                 int64_t delayUs = kMaxMonitorDelayUs;
1043                 if (mPlaylist != NULL) {
1044                     delayUs = mPlaylist->size() * mPlaylist->getTargetDuration()
1045                             / (1 + mNumRetries);
1046                 }
1047                 if (delayUs > kMaxMonitorDelayUs) {
1048                     delayUs = kMaxMonitorDelayUs;
1049                 }
1050                 FLOGV("sequence number high: %d from (%d .. %d), "
1051                       "monitor in %lld (retry=%d)",
1052                         mSeqNumber, firstSeqNumberInPlaylist,
1053                         lastSeqNumberInPlaylist, (long long)delayUs, mNumRetries);
1054                 postMonitorQueue(delayUs);
1055                 return false;
1056             }
1057 
1058             if (err != OK) {
1059                 notifyError(err);
1060                 return false;
1061             }
1062 
1063             // we've missed the boat, let's start 3 segments prior to the latest sequence
1064             // number available and signal a discontinuity.
1065 
1066             ALOGI("We've missed the boat, restarting playback."
1067                   "  mStartup=%d, was  looking for %d in %d-%d",
1068                     mStartup, mSeqNumber, firstSeqNumberInPlaylist,
1069                     lastSeqNumberInPlaylist);
1070             if (mStopParams != NULL) {
1071                 // we should have kept on fetching until we hit the boundaries in mStopParams,
1072                 // but since the segments we are supposed to fetch have already rolled off
1073                 // the playlist, i.e. we have already missed the boat, we inevitably have to
1074                 // skip.
1075                 notifyStopReached();
1076                 return false;
1077             }
1078             mSeqNumber = lastSeqNumberInPlaylist - 3;
1079             if (mSeqNumber < firstSeqNumberInPlaylist) {
1080                 mSeqNumber = firstSeqNumberInPlaylist;
1081             }
1082             discontinuity = true;
1083 
1084             // fall through
1085         } else {
1086             if (mPlaylist != NULL) {
1087                 ALOGE("Cannot find sequence number %d in playlist "
1088                      "(contains %d - %d)",
1089                      mSeqNumber, firstSeqNumberInPlaylist,
1090                       firstSeqNumberInPlaylist + (int32_t)mPlaylist->size() - 1);
1091 
1092                 if (mTSParser != NULL) {
1093                     mTSParser->signalEOS(ERROR_END_OF_STREAM);
1094                     // Use an empty buffer; we don't have any new data, just want to extract
1095                     // potential new access units after flush.  Reset mSeqNumber to
1096                     // lastSeqNumberInPlaylist such that we set the correct access unit
1097                     // properties in extractAndQueueAccessUnitsFromTs.
1098                     sp<ABuffer> buffer = new ABuffer(0);
1099                     mSeqNumber = lastSeqNumberInPlaylist;
1100                     extractAndQueueAccessUnitsFromTs(buffer);
1101                 }
1102                 notifyError(ERROR_END_OF_STREAM);
1103             } else {
1104                 // It's possible that we were never able to download the playlist.
1105                 // In this case we should notify error, instead of EOS, as EOS during
1106                 // prepare means we succeeded in downloading everything.
1107                 ALOGE("Failed to download playlist!");
1108                 notifyError(ERROR_IO);
1109             }
1110 
1111             return false;
1112         }
1113     }
1114 
1115     mNumRetries = 0;
1116 
1117     CHECK(mPlaylist->itemAt(
1118                 mSeqNumber - firstSeqNumberInPlaylist,
1119                 &uri,
1120                 &itemMeta));
1121 
1122     CHECK(itemMeta->findInt32("discontinuity-sequence", &mDiscontinuitySeq));
1123 
1124     int32_t val;
1125     if (itemMeta->findInt32("discontinuity", &val) && val != 0) {
1126         discontinuity = true;
1127     } else if (mLastDiscontinuitySeq >= 0
1128             && mDiscontinuitySeq != mLastDiscontinuitySeq) {
1129         // Seek jumped to a new discontinuity sequence. We need to signal
1130         // a format change to decoder. Decoder needs to shutdown and be
1131         // created again if seamless format change is unsupported.
1132         FLOGV("saw discontinuity: mStartup %d, mLastDiscontinuitySeq %d, "
1133                 "mDiscontinuitySeq %d, mStartTimeUs %lld",
1134                 mStartup, mLastDiscontinuitySeq, mDiscontinuitySeq, (long long)mStartTimeUs);
1135         discontinuity = true;
1136     }
1137     mLastDiscontinuitySeq = -1;
1138 
1139     // decrypt a junk buffer to prefetch key; since a session uses only one http connection,
1140     // this avoids interleaved connections to the key and segment file.
1141     {
1142         sp<ABuffer> junk = new ABuffer(16);
1143         junk->setRange(0, 16);
1144         status_t err = decryptBuffer(mSeqNumber - firstSeqNumberInPlaylist, junk,
1145                 true /* first */);
1146         if (err == ERROR_NOT_CONNECTED) {
1147             return false;
1148         } else if (err != OK) {
1149             notifyError(err);
1150             return false;
1151         }
1152     }
1153 
1154     if ((mStartup && !mTimeChangeSignaled) || discontinuity) {
1155         // We need to signal a time discontinuity to ATSParser on the
1156         // first segment after start, or on a discontinuity segment.
1157         // Setting mNextPTSTimeUs informs extractAndQueueAccessUnitsXX()
1158         // to send the time discontinuity.
1159         if (mPlaylist->isComplete() || mPlaylist->isEvent()) {
1160             // If this was a live event this made no sense since
1161             // we don't have access to all the segment before the current
1162             // one.
1163             mNextPTSTimeUs = getSegmentStartTimeUs(mSeqNumber);
1164         }
1165 
1166         // Setting mTimeChangeSignaled to true, so that if start time
1167         // searching goes into 2nd segment (without a discontinuity),
1168         // we don't reset time again. It causes corruption when pending
1169         // data in ATSParser is cleared.
1170         mTimeChangeSignaled = true;
1171     }
1172 
1173     if (discontinuity) {
1174         ALOGI("queueing discontinuity (explicit=%d)", discontinuity);
1175 
1176         // Signal a format discontinuity to ATSParser to clear partial data
1177         // from previous streams. Not doing this causes bitstream corruption.
1178         if (mTSParser != NULL) {
1179             mTSParser->signalDiscontinuity(
1180                     ATSParser::DISCONTINUITY_FORMATCHANGE, NULL /* extra */);
1181         }
1182 
1183         queueDiscontinuity(
1184                 ATSParser::DISCONTINUITY_FORMAT_ONLY,
1185                 NULL /* extra */);
1186 
1187         if (mStartup && mStartTimeUsRelative && mFirstPTSValid) {
1188             // This means we guessed mStartTimeUs to be in the previous
1189             // segment (likely very close to the end), but either video or
1190             // audio has not found start by the end of that segment.
1191             //
1192             // If this new segment is not a discontinuity, keep searching.
1193             //
1194             // If this new segment even got a discontinuity marker, just
1195             // set mStartTimeUs=0, and take all samples from now on.
1196             mStartTimeUs = 0;
1197             mFirstPTSValid = false;
1198             mIDRFound = false;
1199             mVideoBuffer->clear();
1200         }
1201     }
1202 
1203     FLOGV("fetching segment %d from (%d .. %d)",
1204             mSeqNumber, firstSeqNumberInPlaylist, lastSeqNumberInPlaylist);
1205     return true;
1206 }
1207 
onDownloadNext()1208 void PlaylistFetcher::onDownloadNext() {
1209     AString uri;
1210     sp<AMessage> itemMeta;
1211     sp<ABuffer> buffer;
1212     sp<ABuffer> tsBuffer;
1213     int32_t firstSeqNumberInPlaylist = 0;
1214     int32_t lastSeqNumberInPlaylist = 0;
1215     bool connectHTTP = true;
1216 
1217     if (mDownloadState->hasSavedState()) {
1218         mDownloadState->restoreState(
1219                 uri,
1220                 itemMeta,
1221                 buffer,
1222                 tsBuffer,
1223                 firstSeqNumberInPlaylist,
1224                 lastSeqNumberInPlaylist);
1225         connectHTTP = false;
1226         FLOGV("resuming: '%s'", uri.c_str());
1227     } else {
1228         if (!initDownloadState(
1229                 uri,
1230                 itemMeta,
1231                 firstSeqNumberInPlaylist,
1232                 lastSeqNumberInPlaylist)) {
1233             return;
1234         }
1235         FLOGV("fetching: '%s'", uri.c_str());
1236     }
1237 
1238     int64_t range_offset, range_length;
1239     if (!itemMeta->findInt64("range-offset", &range_offset)
1240             || !itemMeta->findInt64("range-length", &range_length)) {
1241         range_offset = 0;
1242         range_length = -1;
1243     }
1244 
1245     // block-wise download
1246     bool shouldPause = false;
1247     ssize_t bytesRead;
1248     do {
1249         int64_t startUs = ALooper::GetNowUs();
1250         bytesRead = mHTTPDownloader->fetchBlock(
1251                 uri.c_str(), &buffer, range_offset, range_length, kDownloadBlockSize,
1252                 NULL /* actualURL */, connectHTTP);
1253         int64_t delayUs = ALooper::GetNowUs() - startUs;
1254 
1255         if (bytesRead == ERROR_NOT_CONNECTED) {
1256             return;
1257         }
1258         if (bytesRead < 0) {
1259             status_t err = bytesRead;
1260             ALOGE("failed to fetch .ts segment at url '%s'", uri.c_str());
1261             notifyError(err);
1262             return;
1263         }
1264 
1265         // add sample for bandwidth estimation, excluding samples from subtitles (as
1266         // its too small), or during startup/resumeUntil (when we could have more than
1267         // one connection open which affects bandwidth)
1268         if (!mStartup && mStopParams == NULL && bytesRead > 0
1269                 && (mStreamTypeMask
1270                         & (LiveSession::STREAMTYPE_AUDIO
1271                         | LiveSession::STREAMTYPE_VIDEO))) {
1272             mSession->addBandwidthMeasurement(bytesRead, delayUs);
1273             if (delayUs > 2000000ll) {
1274                 FLOGV("bytesRead %zd took %.2f seconds - abnormal bandwidth dip",
1275                         bytesRead, (double)delayUs / 1.0e6);
1276             }
1277         }
1278 
1279         connectHTTP = false;
1280 
1281         CHECK(buffer != NULL);
1282 
1283         size_t size = buffer->size();
1284         // Set decryption range.
1285         buffer->setRange(size - bytesRead, bytesRead);
1286         status_t err = decryptBuffer(mSeqNumber - firstSeqNumberInPlaylist, buffer,
1287                 buffer->offset() == 0 /* first */);
1288         // Unset decryption range.
1289         buffer->setRange(0, size);
1290 
1291         if (err != OK) {
1292             ALOGE("decryptBuffer failed w/ error %d", err);
1293 
1294             notifyError(err);
1295             return;
1296         }
1297 
1298         bool startUp = mStartup; // save current start up state
1299 
1300         err = OK;
1301         if (bufferStartsWithTsSyncByte(buffer)) {
1302             // Incremental extraction is only supported for MPEG2 transport streams.
1303             if (tsBuffer == NULL) {
1304                 tsBuffer = new ABuffer(buffer->data(), buffer->capacity());
1305                 tsBuffer->setRange(0, 0);
1306             } else if (tsBuffer->capacity() != buffer->capacity()) {
1307                 size_t tsOff = tsBuffer->offset(), tsSize = tsBuffer->size();
1308                 tsBuffer = new ABuffer(buffer->data(), buffer->capacity());
1309                 tsBuffer->setRange(tsOff, tsSize);
1310             }
1311             tsBuffer->setRange(tsBuffer->offset(), tsBuffer->size() + bytesRead);
1312             err = extractAndQueueAccessUnitsFromTs(tsBuffer);
1313         }
1314 
1315         if (err == -EAGAIN) {
1316             // starting sequence number too low/high
1317             mTSParser.clear();
1318             for (size_t i = 0; i < mPacketSources.size(); i++) {
1319                 sp<AnotherPacketSource> packetSource = mPacketSources.valueAt(i);
1320                 packetSource->clear();
1321             }
1322             postMonitorQueue();
1323             return;
1324         } else if (err == ERROR_OUT_OF_RANGE) {
1325             // reached stopping point
1326             notifyStopReached();
1327             return;
1328         } else if (err != OK) {
1329             notifyError(err);
1330             return;
1331         }
1332         // If we're switching, post start notification
1333         // this should only be posted when the last chunk is full processed by TSParser
1334         if (mSeekMode != LiveSession::kSeekModeExactPosition && startUp != mStartup) {
1335             CHECK(mStartTimeUsNotify != NULL);
1336             mStartTimeUsNotify->post();
1337             mStartTimeUsNotify.clear();
1338             shouldPause = true;
1339         }
1340         if (shouldPause || shouldPauseDownload()) {
1341             // save state and return if this is not the last chunk,
1342             // leaving the fetcher in paused state.
1343             if (bytesRead != 0) {
1344                 mDownloadState->saveState(
1345                         uri,
1346                         itemMeta,
1347                         buffer,
1348                         tsBuffer,
1349                         firstSeqNumberInPlaylist,
1350                         lastSeqNumberInPlaylist);
1351                 return;
1352             }
1353             shouldPause = true;
1354         }
1355     } while (bytesRead != 0);
1356 
1357     if (bufferStartsWithTsSyncByte(buffer)) {
1358         // If we don't see a stream in the program table after fetching a full ts segment
1359         // mark it as nonexistent.
1360         ATSParser::SourceType srcTypes[] =
1361                 { ATSParser::VIDEO, ATSParser::AUDIO };
1362         LiveSession::StreamType streamTypes[] =
1363                 { LiveSession::STREAMTYPE_VIDEO, LiveSession::STREAMTYPE_AUDIO };
1364         const size_t kNumTypes = NELEM(srcTypes);
1365 
1366         for (size_t i = 0; i < kNumTypes; i++) {
1367             ATSParser::SourceType srcType = srcTypes[i];
1368             LiveSession::StreamType streamType = streamTypes[i];
1369 
1370             sp<AnotherPacketSource> source =
1371                 static_cast<AnotherPacketSource *>(
1372                     mTSParser->getSource(srcType).get());
1373 
1374             if (!mTSParser->hasSource(srcType)) {
1375                 ALOGW("MPEG2 Transport stream does not contain %s data.",
1376                       srcType == ATSParser::VIDEO ? "video" : "audio");
1377 
1378                 mStreamTypeMask &= ~streamType;
1379                 mPacketSources.removeItem(streamType);
1380             }
1381         }
1382 
1383     }
1384 
1385     if (checkDecryptPadding(buffer) != OK) {
1386         ALOGE("Incorrect padding bytes after decryption.");
1387         notifyError(ERROR_MALFORMED);
1388         return;
1389     }
1390 
1391     if (tsBuffer != NULL) {
1392         AString method;
1393         CHECK(buffer->meta()->findString("cipher-method", &method));
1394         if ((tsBuffer->size() > 0 && method == "NONE")
1395                 || tsBuffer->size() > 16) {
1396             ALOGE("MPEG2 transport stream is not an even multiple of 188 "
1397                     "bytes in length.");
1398             notifyError(ERROR_MALFORMED);
1399             return;
1400         }
1401     }
1402 
1403     // bulk extract non-ts files
1404     bool startUp = mStartup;
1405     if (tsBuffer == NULL) {
1406         status_t err = extractAndQueueAccessUnits(buffer, itemMeta);
1407         if (err == -EAGAIN) {
1408             // starting sequence number too low/high
1409             postMonitorQueue();
1410             return;
1411         } else if (err == ERROR_OUT_OF_RANGE) {
1412             // reached stopping point
1413             notifyStopReached();
1414             return;
1415         } else if (err != OK) {
1416             notifyError(err);
1417             return;
1418         }
1419     }
1420 
1421     ++mSeqNumber;
1422 
1423     // if adapting, pause after found the next starting point
1424     if (mSeekMode != LiveSession::kSeekModeExactPosition && startUp != mStartup) {
1425         CHECK(mStartTimeUsNotify != NULL);
1426         mStartTimeUsNotify->post();
1427         mStartTimeUsNotify.clear();
1428         shouldPause = true;
1429     }
1430 
1431     if (!shouldPause) {
1432         postMonitorQueue();
1433     }
1434 }
1435 
1436 /*
1437  * returns true if we need to adjust mSeqNumber
1438  */
adjustSeqNumberWithAnchorTime(int64_t anchorTimeUs)1439 bool PlaylistFetcher::adjustSeqNumberWithAnchorTime(int64_t anchorTimeUs) {
1440     int32_t firstSeqNumberInPlaylist = mPlaylist->getFirstSeqNumber();
1441 
1442     int64_t minDiffUs, maxDiffUs;
1443     if (mSeekMode == LiveSession::kSeekModeNextSample) {
1444         // if the previous fetcher paused in the middle of a segment, we
1445         // want to start at a segment that overlaps the last sample
1446         minDiffUs = -mPlaylist->getTargetDuration();
1447         maxDiffUs = 0ll;
1448     } else {
1449         // if the previous fetcher paused at the end of a segment, ideally
1450         // we want to start at the segment that's roughly aligned with its
1451         // next segment, but if the two variants are not well aligned we
1452         // adjust the diff to within (-T/2, T/2)
1453         minDiffUs = -mPlaylist->getTargetDuration() / 2;
1454         maxDiffUs = mPlaylist->getTargetDuration() / 2;
1455     }
1456 
1457     int32_t oldSeqNumber = mSeqNumber;
1458     ssize_t index = mSeqNumber - firstSeqNumberInPlaylist;
1459 
1460     // adjust anchorTimeUs to within (minDiffUs, maxDiffUs) from mStartTimeUs
1461     int64_t diffUs = anchorTimeUs - mStartTimeUs;
1462     if (diffUs > maxDiffUs) {
1463         while (index > 0 && diffUs > maxDiffUs) {
1464             --index;
1465 
1466             sp<AMessage> itemMeta;
1467             CHECK(mPlaylist->itemAt(index, NULL /* uri */, &itemMeta));
1468 
1469             int64_t itemDurationUs;
1470             CHECK(itemMeta->findInt64("durationUs", &itemDurationUs));
1471 
1472             diffUs -= itemDurationUs;
1473         }
1474     } else if (diffUs < minDiffUs) {
1475         while (index + 1 < (ssize_t) mPlaylist->size()
1476                 && diffUs < minDiffUs) {
1477             ++index;
1478 
1479             sp<AMessage> itemMeta;
1480             CHECK(mPlaylist->itemAt(index, NULL /* uri */, &itemMeta));
1481 
1482             int64_t itemDurationUs;
1483             CHECK(itemMeta->findInt64("durationUs", &itemDurationUs));
1484 
1485             diffUs += itemDurationUs;
1486         }
1487     }
1488 
1489     mSeqNumber = firstSeqNumberInPlaylist + index;
1490 
1491     if (mSeqNumber != oldSeqNumber) {
1492         FLOGV("guessed wrong seg number: diff %lld out of [%lld, %lld]",
1493                 (long long) anchorTimeUs - mStartTimeUs,
1494                 (long long) minDiffUs,
1495                 (long long) maxDiffUs);
1496         return true;
1497     }
1498     return false;
1499 }
1500 
getSeqNumberForDiscontinuity(size_t discontinuitySeq) const1501 int32_t PlaylistFetcher::getSeqNumberForDiscontinuity(size_t discontinuitySeq) const {
1502     int32_t firstSeqNumberInPlaylist = mPlaylist->getFirstSeqNumber();
1503 
1504     size_t index = 0;
1505     while (index < mPlaylist->size()) {
1506         sp<AMessage> itemMeta;
1507         CHECK(mPlaylist->itemAt( index, NULL /* uri */, &itemMeta));
1508         size_t curDiscontinuitySeq;
1509         CHECK(itemMeta->findInt32("discontinuity-sequence", (int32_t *)&curDiscontinuitySeq));
1510         int32_t seqNumber = firstSeqNumberInPlaylist + index;
1511         if (curDiscontinuitySeq == discontinuitySeq) {
1512             return seqNumber;
1513         } else if (curDiscontinuitySeq > discontinuitySeq) {
1514             return seqNumber <= 0 ? 0 : seqNumber - 1;
1515         }
1516 
1517         ++index;
1518     }
1519 
1520     return firstSeqNumberInPlaylist + mPlaylist->size();
1521 }
1522 
getSeqNumberForTime(int64_t timeUs) const1523 int32_t PlaylistFetcher::getSeqNumberForTime(int64_t timeUs) const {
1524     size_t index = 0;
1525     int64_t segmentStartUs = 0;
1526     while (index < mPlaylist->size()) {
1527         sp<AMessage> itemMeta;
1528         CHECK(mPlaylist->itemAt(
1529                     index, NULL /* uri */, &itemMeta));
1530 
1531         int64_t itemDurationUs;
1532         CHECK(itemMeta->findInt64("durationUs", &itemDurationUs));
1533 
1534         if (timeUs < segmentStartUs + itemDurationUs) {
1535             break;
1536         }
1537 
1538         segmentStartUs += itemDurationUs;
1539         ++index;
1540     }
1541 
1542     if (index >= mPlaylist->size()) {
1543         index = mPlaylist->size() - 1;
1544     }
1545 
1546     return mPlaylist->getFirstSeqNumber() + index;
1547 }
1548 
setAccessUnitProperties(const sp<ABuffer> & accessUnit,const sp<AnotherPacketSource> & source,bool discard)1549 const sp<ABuffer> &PlaylistFetcher::setAccessUnitProperties(
1550         const sp<ABuffer> &accessUnit, const sp<AnotherPacketSource> &source, bool discard) {
1551     sp<MetaData> format = source->getFormat();
1552     if (format != NULL) {
1553         // for simplicity, store a reference to the format in each unit
1554         accessUnit->meta()->setObject("format", format);
1555     }
1556 
1557     if (discard) {
1558         accessUnit->meta()->setInt32("discard", discard);
1559     }
1560 
1561     accessUnit->meta()->setInt32("discontinuitySeq", mDiscontinuitySeq);
1562     accessUnit->meta()->setInt64("segmentStartTimeUs", getSegmentStartTimeUs(mSeqNumber));
1563     accessUnit->meta()->setInt64("segmentFirstTimeUs", mSegmentFirstPTS);
1564     accessUnit->meta()->setInt64("segmentDurationUs", getSegmentDurationUs(mSeqNumber));
1565     if (!mPlaylist->isComplete() && !mPlaylist->isEvent()) {
1566         accessUnit->meta()->setInt64("playlistTimeUs", mPlaylistTimeUs);
1567     }
1568     return accessUnit;
1569 }
1570 
isStartTimeReached(int64_t timeUs)1571 bool PlaylistFetcher::isStartTimeReached(int64_t timeUs) {
1572     if (!mFirstPTSValid) {
1573         mFirstTimeUs = timeUs;
1574         mFirstPTSValid = true;
1575     }
1576     bool startTimeReached = true;
1577     if (mStartTimeUsRelative) {
1578         FLOGV("startTimeUsRelative, timeUs (%lld) - %lld = %lld",
1579                 (long long)timeUs,
1580                 (long long)mFirstTimeUs,
1581                 (long long)(timeUs - mFirstTimeUs));
1582         timeUs -= mFirstTimeUs;
1583         if (timeUs < 0) {
1584             FLOGV("clamp negative timeUs to 0");
1585             timeUs = 0;
1586         }
1587         startTimeReached = (timeUs >= mStartTimeUs);
1588     }
1589     return startTimeReached;
1590 }
1591 
extractAndQueueAccessUnitsFromTs(const sp<ABuffer> & buffer)1592 status_t PlaylistFetcher::extractAndQueueAccessUnitsFromTs(const sp<ABuffer> &buffer) {
1593     if (mTSParser == NULL) {
1594         // Use TS_TIMESTAMPS_ARE_ABSOLUTE so pts carry over between fetchers.
1595         mTSParser = new ATSParser(ATSParser::TS_TIMESTAMPS_ARE_ABSOLUTE);
1596     }
1597 
1598     if (mNextPTSTimeUs >= 0ll) {
1599         sp<AMessage> extra = new AMessage;
1600         // Since we are using absolute timestamps, signal an offset of 0 to prevent
1601         // ATSParser from skewing the timestamps of access units.
1602         extra->setInt64(IStreamListener::kKeyMediaTimeUs, 0);
1603 
1604         // When adapting, signal a recent media time to the parser,
1605         // so that PTS wrap around is handled for the new variant.
1606         if (mStartTimeUs >= 0 && !mStartTimeUsRelative) {
1607             extra->setInt64(IStreamListener::kKeyRecentMediaTimeUs, mStartTimeUs);
1608         }
1609 
1610         mTSParser->signalDiscontinuity(
1611                 ATSParser::DISCONTINUITY_TIME, extra);
1612 
1613         mNextPTSTimeUs = -1ll;
1614     }
1615 
1616     size_t offset = 0;
1617     while (offset + 188 <= buffer->size()) {
1618         status_t err = mTSParser->feedTSPacket(buffer->data() + offset, 188);
1619 
1620         if (err != OK) {
1621             return err;
1622         }
1623 
1624         offset += 188;
1625     }
1626     // setRange to indicate consumed bytes.
1627     buffer->setRange(buffer->offset() + offset, buffer->size() - offset);
1628 
1629     if (mSegmentFirstPTS < 0ll) {
1630         // get the smallest first PTS from all streams present in this parser
1631         for (size_t i = mPacketSources.size(); i-- > 0;) {
1632             const LiveSession::StreamType stream = mPacketSources.keyAt(i);
1633             if (stream == LiveSession::STREAMTYPE_SUBTITLES) {
1634                 ALOGE("MPEG2 Transport streams do not contain subtitles.");
1635                 return ERROR_MALFORMED;
1636             }
1637             if (stream == LiveSession::STREAMTYPE_METADATA) {
1638                 continue;
1639             }
1640             ATSParser::SourceType type =LiveSession::getSourceTypeForStream(stream);
1641             sp<AnotherPacketSource> source =
1642                 static_cast<AnotherPacketSource *>(
1643                         mTSParser->getSource(type).get());
1644 
1645             if (source == NULL) {
1646                 continue;
1647             }
1648             sp<AMessage> meta = source->getMetaAfterLastDequeued(0);
1649             if (meta != NULL) {
1650                 int64_t timeUs;
1651                 CHECK(meta->findInt64("timeUs", &timeUs));
1652                 if (mSegmentFirstPTS < 0ll || timeUs < mSegmentFirstPTS) {
1653                     mSegmentFirstPTS = timeUs;
1654                 }
1655             }
1656         }
1657         if (mSegmentFirstPTS < 0ll) {
1658             // didn't find any TS packet, can return early
1659             return OK;
1660         }
1661         if (!mStartTimeUsRelative) {
1662             // mStartup
1663             //   mStartup is true until we have queued a packet for all the streams
1664             //   we are fetching. We queue packets whose timestamps are greater than
1665             //   mStartTimeUs.
1666             // mSegmentStartTimeUs >= 0
1667             //   mSegmentStartTimeUs is non-negative when adapting or switching tracks
1668             // adjustSeqNumberWithAnchorTime(timeUs) == true
1669             //   we guessed a seq number that's either too large or too small.
1670             // If this happens, we'll adjust mSeqNumber and restart fetching from new
1671             // location. Note that we only want to adjust once, so set mSegmentStartTimeUs
1672             // to -1 so that we don't enter this chunk next time.
1673             if (mStartup && mSegmentStartTimeUs >= 0
1674                     && adjustSeqNumberWithAnchorTime(mSegmentFirstPTS)) {
1675                 mStartTimeUsNotify = mNotify->dup();
1676                 mStartTimeUsNotify->setInt32("what", kWhatStartedAt);
1677                 mStartTimeUsNotify->setString("uri", mURI);
1678                 mIDRFound = false;
1679                 mSegmentStartTimeUs = -1;
1680                 return -EAGAIN;
1681             }
1682         }
1683     }
1684 
1685     status_t err = OK;
1686     for (size_t i = mPacketSources.size(); i-- > 0;) {
1687         sp<AnotherPacketSource> packetSource = mPacketSources.valueAt(i);
1688 
1689         const LiveSession::StreamType stream = mPacketSources.keyAt(i);
1690         if (stream == LiveSession::STREAMTYPE_SUBTITLES) {
1691             ALOGE("MPEG2 Transport streams do not contain subtitles.");
1692             return ERROR_MALFORMED;
1693         }
1694 
1695         const char *key = LiveSession::getKeyForStream(stream);
1696         ATSParser::SourceType type =LiveSession::getSourceTypeForStream(stream);
1697 
1698         sp<AnotherPacketSource> source =
1699             static_cast<AnotherPacketSource *>(
1700                     mTSParser->getSource(type).get());
1701 
1702         if (source == NULL) {
1703             continue;
1704         }
1705 
1706         const char *mime;
1707         sp<MetaData> format  = source->getFormat();
1708         bool isAvc = format != NULL && format->findCString(kKeyMIMEType, &mime)
1709                 && !strcasecmp(mime, MEDIA_MIMETYPE_VIDEO_AVC);
1710 
1711         sp<ABuffer> accessUnit;
1712         status_t finalResult;
1713         while (source->hasBufferAvailable(&finalResult)
1714                 && source->dequeueAccessUnit(&accessUnit) == OK) {
1715 
1716             int64_t timeUs;
1717             CHECK(accessUnit->meta()->findInt64("timeUs", &timeUs));
1718 
1719             if (mStartup) {
1720                 bool startTimeReached = isStartTimeReached(timeUs);
1721 
1722                 if (!startTimeReached || (isAvc && !mIDRFound)) {
1723                     // buffer up to the closest preceding IDR frame in the next segement,
1724                     // or the closest succeeding IDR frame after the exact position
1725                     FSLOGV(stream, "timeUs(%lld)-mStartTimeUs(%lld)=%lld, mIDRFound=%d",
1726                             (long long)timeUs,
1727                             (long long)mStartTimeUs,
1728                             (long long)timeUs - mStartTimeUs,
1729                             mIDRFound);
1730                     if (isAvc) {
1731                         if (IsIDR(accessUnit)) {
1732                             mVideoBuffer->clear();
1733                             FSLOGV(stream, "found IDR, clear mVideoBuffer");
1734                             mIDRFound = true;
1735                         }
1736                         if (mIDRFound && mStartTimeUsRelative && !startTimeReached) {
1737                             mVideoBuffer->queueAccessUnit(accessUnit);
1738                             FSLOGV(stream, "saving AVC video AccessUnit");
1739                         }
1740                     }
1741                     if (!startTimeReached || (isAvc && !mIDRFound)) {
1742                         continue;
1743                     }
1744                 }
1745             }
1746 
1747             if (mStartTimeUsNotify != NULL) {
1748                 uint32_t streamMask = 0;
1749                 mStartTimeUsNotify->findInt32("streamMask", (int32_t *) &streamMask);
1750                 if ((mStreamTypeMask & mPacketSources.keyAt(i))
1751                         && !(streamMask & mPacketSources.keyAt(i))) {
1752                     streamMask |= mPacketSources.keyAt(i);
1753                     mStartTimeUsNotify->setInt32("streamMask", streamMask);
1754                     FSLOGV(stream, "found start point, timeUs=%lld, streamMask becomes %x",
1755                             (long long)timeUs, streamMask);
1756 
1757                     if (streamMask == mStreamTypeMask) {
1758                         FLOGV("found start point for all streams");
1759                         mStartup = false;
1760                     }
1761                 }
1762             }
1763 
1764             if (mStopParams != NULL) {
1765                 int32_t discontinuitySeq;
1766                 int64_t stopTimeUs;
1767                 if (!mStopParams->findInt32("discontinuitySeq", &discontinuitySeq)
1768                         || discontinuitySeq > mDiscontinuitySeq
1769                         || !mStopParams->findInt64(key, &stopTimeUs)
1770                         || (discontinuitySeq == mDiscontinuitySeq
1771                                 && timeUs >= stopTimeUs)) {
1772                     FSLOGV(stream, "reached stop point, timeUs=%lld", (long long)timeUs);
1773                     mStreamTypeMask &= ~stream;
1774                     mPacketSources.removeItemsAt(i);
1775                     break;
1776                 }
1777             }
1778 
1779             if (stream == LiveSession::STREAMTYPE_VIDEO) {
1780                 const bool discard = true;
1781                 status_t status;
1782                 while (mVideoBuffer->hasBufferAvailable(&status)) {
1783                     sp<ABuffer> videoBuffer;
1784                     mVideoBuffer->dequeueAccessUnit(&videoBuffer);
1785                     setAccessUnitProperties(videoBuffer, source, discard);
1786                     packetSource->queueAccessUnit(videoBuffer);
1787                     int64_t bufferTimeUs;
1788                     CHECK(videoBuffer->meta()->findInt64("timeUs", &bufferTimeUs));
1789                     FSLOGV(stream, "queueAccessUnit (saved), timeUs=%lld",
1790                             (long long)bufferTimeUs);
1791                 }
1792             } else if (stream == LiveSession::STREAMTYPE_METADATA && !mHasMetadata) {
1793                 mHasMetadata = true;
1794                 sp<AMessage> notify = mNotify->dup();
1795                 notify->setInt32("what", kWhatMetadataDetected);
1796                 notify->post();
1797             }
1798 
1799             setAccessUnitProperties(accessUnit, source);
1800             packetSource->queueAccessUnit(accessUnit);
1801             FSLOGV(stream, "queueAccessUnit, timeUs=%lld", (long long)timeUs);
1802         }
1803 
1804         if (err != OK) {
1805             break;
1806         }
1807     }
1808 
1809     if (err != OK) {
1810         for (size_t i = mPacketSources.size(); i-- > 0;) {
1811             sp<AnotherPacketSource> packetSource = mPacketSources.valueAt(i);
1812             packetSource->clear();
1813         }
1814         return err;
1815     }
1816 
1817     if (!mStreamTypeMask) {
1818         // Signal gap is filled between original and new stream.
1819         FLOGV("reached stop point for all streams");
1820         return ERROR_OUT_OF_RANGE;
1821     }
1822 
1823     return OK;
1824 }
1825 
1826 /* static */
bufferStartsWithWebVTTMagicSequence(const sp<ABuffer> & buffer)1827 bool PlaylistFetcher::bufferStartsWithWebVTTMagicSequence(
1828         const sp<ABuffer> &buffer) {
1829     size_t pos = 0;
1830 
1831     // skip possible BOM
1832     if (buffer->size() >= pos + 3 &&
1833             !memcmp("\xef\xbb\xbf", buffer->data() + pos, 3)) {
1834         pos += 3;
1835     }
1836 
1837     // accept WEBVTT followed by SPACE, TAB or (CR) LF
1838     if (buffer->size() < pos + 6 ||
1839             memcmp("WEBVTT", buffer->data() + pos, 6)) {
1840         return false;
1841     }
1842     pos += 6;
1843 
1844     if (buffer->size() == pos) {
1845         return true;
1846     }
1847 
1848     uint8_t sep = buffer->data()[pos];
1849     return sep == ' ' || sep == '\t' || sep == '\n' || sep == '\r';
1850 }
1851 
extractAndQueueAccessUnits(const sp<ABuffer> & buffer,const sp<AMessage> & itemMeta)1852 status_t PlaylistFetcher::extractAndQueueAccessUnits(
1853         const sp<ABuffer> &buffer, const sp<AMessage> &itemMeta) {
1854     if (bufferStartsWithWebVTTMagicSequence(buffer)) {
1855         if (mStreamTypeMask != LiveSession::STREAMTYPE_SUBTITLES) {
1856             ALOGE("This stream only contains subtitles.");
1857             return ERROR_MALFORMED;
1858         }
1859 
1860         const sp<AnotherPacketSource> packetSource =
1861             mPacketSources.valueFor(LiveSession::STREAMTYPE_SUBTITLES);
1862 
1863         int64_t durationUs;
1864         CHECK(itemMeta->findInt64("durationUs", &durationUs));
1865         buffer->meta()->setInt64("timeUs", getSegmentStartTimeUs(mSeqNumber));
1866         buffer->meta()->setInt64("durationUs", durationUs);
1867         buffer->meta()->setInt64("segmentStartTimeUs", getSegmentStartTimeUs(mSeqNumber));
1868         buffer->meta()->setInt32("discontinuitySeq", mDiscontinuitySeq);
1869         buffer->meta()->setInt32("subtitleGeneration", mSubtitleGeneration);
1870         packetSource->queueAccessUnit(buffer);
1871         return OK;
1872     }
1873 
1874     if (mNextPTSTimeUs >= 0ll) {
1875         mNextPTSTimeUs = -1ll;
1876     }
1877 
1878     // This better be an ISO 13818-7 (AAC) or ISO 13818-1 (MPEG) audio
1879     // stream prefixed by an ID3 tag.
1880 
1881     bool firstID3Tag = true;
1882     uint64_t PTS = 0;
1883 
1884     for (;;) {
1885         // Make sure to skip all ID3 tags preceding the audio data.
1886         // At least one must be present to provide the PTS timestamp.
1887 
1888         ID3 id3(buffer->data(), buffer->size(), true /* ignoreV1 */);
1889         if (!id3.isValid()) {
1890             if (firstID3Tag) {
1891                 ALOGE("Unable to parse ID3 tag.");
1892                 return ERROR_MALFORMED;
1893             } else {
1894                 break;
1895             }
1896         }
1897 
1898         if (firstID3Tag) {
1899             bool found = false;
1900 
1901             ID3::Iterator it(id3, "PRIV");
1902             while (!it.done()) {
1903                 size_t length;
1904                 const uint8_t *data = it.getData(&length);
1905 
1906                 static const char *kMatchName =
1907                     "com.apple.streaming.transportStreamTimestamp";
1908                 static const size_t kMatchNameLen = strlen(kMatchName);
1909 
1910                 if (length == kMatchNameLen + 1 + 8
1911                         && !strncmp((const char *)data, kMatchName, kMatchNameLen)) {
1912                     found = true;
1913                     PTS = U64_AT(&data[kMatchNameLen + 1]);
1914                 }
1915 
1916                 it.next();
1917             }
1918 
1919             if (!found) {
1920                 ALOGE("Unable to extract transportStreamTimestamp from ID3 tag.");
1921                 return ERROR_MALFORMED;
1922             }
1923         }
1924 
1925         // skip the ID3 tag
1926         buffer->setRange(
1927                 buffer->offset() + id3.rawSize(), buffer->size() - id3.rawSize());
1928 
1929         firstID3Tag = false;
1930     }
1931 
1932     if (mStreamTypeMask != LiveSession::STREAMTYPE_AUDIO) {
1933         ALOGW("This stream only contains audio data!");
1934 
1935         mStreamTypeMask &= LiveSession::STREAMTYPE_AUDIO;
1936 
1937         if (mStreamTypeMask == 0) {
1938             return OK;
1939         }
1940     }
1941 
1942     sp<AnotherPacketSource> packetSource =
1943         mPacketSources.valueFor(LiveSession::STREAMTYPE_AUDIO);
1944 
1945     if (packetSource->getFormat() == NULL && buffer->size() >= 7) {
1946         ABitReader bits(buffer->data(), buffer->size());
1947 
1948         // adts_fixed_header
1949 
1950         CHECK_EQ(bits.getBits(12), 0xfffu);
1951         bits.skipBits(3);  // ID, layer
1952         bool protection_absent __unused = bits.getBits(1) != 0;
1953 
1954         unsigned profile = bits.getBits(2);
1955         CHECK_NE(profile, 3u);
1956         unsigned sampling_freq_index = bits.getBits(4);
1957         bits.getBits(1);  // private_bit
1958         unsigned channel_configuration = bits.getBits(3);
1959         CHECK_NE(channel_configuration, 0u);
1960         bits.skipBits(2);  // original_copy, home
1961 
1962         sp<MetaData> meta = MakeAACCodecSpecificData(
1963                 profile, sampling_freq_index, channel_configuration);
1964 
1965         meta->setInt32(kKeyIsADTS, true);
1966 
1967         packetSource->setFormat(meta);
1968     }
1969 
1970     int64_t numSamples = 0ll;
1971     int32_t sampleRate;
1972     CHECK(packetSource->getFormat()->findInt32(kKeySampleRate, &sampleRate));
1973 
1974     int64_t timeUs = (PTS * 100ll) / 9ll;
1975     if (mStartup && !mFirstPTSValid) {
1976         mFirstPTSValid = true;
1977         mFirstTimeUs = timeUs;
1978     }
1979 
1980     if (mSegmentFirstPTS < 0ll) {
1981         mSegmentFirstPTS = timeUs;
1982         if (!mStartTimeUsRelative) {
1983             // Duplicated logic from how we handle .ts playlists.
1984             if (mStartup && mSegmentStartTimeUs >= 0
1985                     && adjustSeqNumberWithAnchorTime(timeUs)) {
1986                 mSegmentStartTimeUs = -1;
1987                 return -EAGAIN;
1988             }
1989         }
1990     }
1991 
1992     size_t offset = 0;
1993     while (offset < buffer->size()) {
1994         const uint8_t *adtsHeader = buffer->data() + offset;
1995         CHECK_LT(offset + 5, buffer->size());
1996 
1997         unsigned aac_frame_length =
1998             ((adtsHeader[3] & 3) << 11)
1999             | (adtsHeader[4] << 3)
2000             | (adtsHeader[5] >> 5);
2001 
2002         if (aac_frame_length == 0) {
2003             const uint8_t *id3Header = adtsHeader;
2004             if (!memcmp(id3Header, "ID3", 3)) {
2005                 ID3 id3(id3Header, buffer->size() - offset, true);
2006                 if (id3.isValid()) {
2007                     offset += id3.rawSize();
2008                     continue;
2009                 };
2010             }
2011             return ERROR_MALFORMED;
2012         }
2013 
2014         CHECK_LE(offset + aac_frame_length, buffer->size());
2015 
2016         int64_t unitTimeUs = timeUs + numSamples * 1000000ll / sampleRate;
2017         offset += aac_frame_length;
2018 
2019         // Each AAC frame encodes 1024 samples.
2020         numSamples += 1024;
2021 
2022         if (mStartup) {
2023             int64_t startTimeUs = unitTimeUs;
2024             if (mStartTimeUsRelative) {
2025                 startTimeUs -= mFirstTimeUs;
2026                 if (startTimeUs  < 0) {
2027                     startTimeUs = 0;
2028                 }
2029             }
2030             if (startTimeUs < mStartTimeUs) {
2031                 continue;
2032             }
2033 
2034             if (mStartTimeUsNotify != NULL) {
2035                 mStartTimeUsNotify->setInt32("streamMask", LiveSession::STREAMTYPE_AUDIO);
2036                 mStartup = false;
2037             }
2038         }
2039 
2040         if (mStopParams != NULL) {
2041             int32_t discontinuitySeq;
2042             int64_t stopTimeUs;
2043             if (!mStopParams->findInt32("discontinuitySeq", &discontinuitySeq)
2044                     || discontinuitySeq > mDiscontinuitySeq
2045                     || !mStopParams->findInt64("timeUsAudio", &stopTimeUs)
2046                     || (discontinuitySeq == mDiscontinuitySeq && unitTimeUs >= stopTimeUs)) {
2047                 mStreamTypeMask = 0;
2048                 mPacketSources.clear();
2049                 return ERROR_OUT_OF_RANGE;
2050             }
2051         }
2052 
2053         sp<ABuffer> unit = new ABuffer(aac_frame_length);
2054         memcpy(unit->data(), adtsHeader, aac_frame_length);
2055 
2056         unit->meta()->setInt64("timeUs", unitTimeUs);
2057         setAccessUnitProperties(unit, packetSource);
2058         packetSource->queueAccessUnit(unit);
2059     }
2060 
2061     return OK;
2062 }
2063 
updateDuration()2064 void PlaylistFetcher::updateDuration() {
2065     int64_t durationUs = 0ll;
2066     for (size_t index = 0; index < mPlaylist->size(); ++index) {
2067         sp<AMessage> itemMeta;
2068         CHECK(mPlaylist->itemAt(
2069                     index, NULL /* uri */, &itemMeta));
2070 
2071         int64_t itemDurationUs;
2072         CHECK(itemMeta->findInt64("durationUs", &itemDurationUs));
2073 
2074         durationUs += itemDurationUs;
2075     }
2076 
2077     sp<AMessage> msg = mNotify->dup();
2078     msg->setInt32("what", kWhatDurationUpdate);
2079     msg->setInt64("durationUs", durationUs);
2080     msg->post();
2081 }
2082 
updateTargetDuration()2083 void PlaylistFetcher::updateTargetDuration() {
2084     sp<AMessage> msg = mNotify->dup();
2085     msg->setInt32("what", kWhatTargetDurationUpdate);
2086     msg->setInt64("targetDurationUs", mPlaylist->getTargetDuration());
2087     msg->post();
2088 }
2089 
2090 }  // namespace android
2091