1 /*
2 * Copyright (C) 2012 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 //#define LOG_NDEBUG 0
18 #define LOG_TAG "PlaylistFetcher"
19 #include <android-base/macros.h>
20 #include <utils/Log.h>
21 #include <utils/misc.h>
22
23 #include "PlaylistFetcher.h"
24 #include "HTTPDownloader.h"
25 #include "LiveSession.h"
26 #include "M3UParser.h"
27 #include "include/ID3.h"
28 #include "mpeg2ts/AnotherPacketSource.h"
29 #include "mpeg2ts/HlsSampleDecryptor.h"
30
31 #include <media/stagefright/foundation/ABitReader.h>
32 #include <media/stagefright/foundation/ABuffer.h>
33 #include <media/stagefright/foundation/ADebug.h>
34 #include <media/stagefright/foundation/ByteUtils.h>
35 #include <media/stagefright/foundation/MediaKeys.h>
36 #include <media/stagefright/foundation/avc_utils.h>
37 #include <media/stagefright/DataURISource.h>
38 #include <media/stagefright/MediaDefs.h>
39 #include <media/stagefright/MetaData.h>
40 #include <media/stagefright/MetaDataUtils.h>
41 #include <media/stagefright/Utils.h>
42
43 #include <ctype.h>
44 #include <inttypes.h>
45
// Verbose-logging helpers. Each one prepends "[fetcher-<id>] " (and, for
// FSLOGV, "[<stream name>] ") to the message before handing it to ALOGV.
// They read mFetcherID unqualified, so they can only be used where that
// member is in scope.
#define FLOGV(fmt, ...) \
    ALOGV("[fetcher-%d] " fmt, mFetcherID, ##__VA_ARGS__)
#define FSLOGV(stream, fmt, ...) \
    ALOGV("[fetcher-%d] [%s] " fmt, \
          mFetcherID, LiveSession::getNameForStream(stream), ##__VA_ARGS__)
49
50 namespace android {
51
52 // static
53 const int64_t PlaylistFetcher::kMinBufferedDurationUs = 30000000LL;
54 const int64_t PlaylistFetcher::kMaxMonitorDelayUs = 3000000LL;
55 // LCM of 188 (size of a TS packet) & 1k works well
56 const int32_t PlaylistFetcher::kDownloadBlockSize = 47 * 1024;
57
58 struct PlaylistFetcher::DownloadState : public RefBase {
59 DownloadState();
60 void resetState();
61 bool hasSavedState() const;
62 void restoreState(
63 AString &uri,
64 sp<AMessage> &itemMeta,
65 sp<ABuffer> &buffer,
66 sp<ABuffer> &tsBuffer,
67 int32_t &firstSeqNumberInPlaylist,
68 int32_t &lastSeqNumberInPlaylist);
69 void saveState(
70 AString &uri,
71 sp<AMessage> &itemMeta,
72 sp<ABuffer> &buffer,
73 sp<ABuffer> &tsBuffer,
74 int32_t &firstSeqNumberInPlaylist,
75 int32_t &lastSeqNumberInPlaylist);
76
77 private:
78 bool mHasSavedState;
79 AString mUri;
80 sp<AMessage> mItemMeta;
81 sp<ABuffer> mBuffer;
82 sp<ABuffer> mTsBuffer;
83 int32_t mFirstSeqNumberInPlaylist;
84 int32_t mLastSeqNumberInPlaylist;
85 };
86
DownloadState()87 PlaylistFetcher::DownloadState::DownloadState() {
88 resetState();
89 }
90
hasSavedState() const91 bool PlaylistFetcher::DownloadState::hasSavedState() const {
92 return mHasSavedState;
93 }
94
resetState()95 void PlaylistFetcher::DownloadState::resetState() {
96 mHasSavedState = false;
97
98 mUri.clear();
99 mItemMeta = NULL;
100 mBuffer = NULL;
101 mTsBuffer = NULL;
102 mFirstSeqNumberInPlaylist = 0;
103 mLastSeqNumberInPlaylist = 0;
104 }
105
restoreState(AString & uri,sp<AMessage> & itemMeta,sp<ABuffer> & buffer,sp<ABuffer> & tsBuffer,int32_t & firstSeqNumberInPlaylist,int32_t & lastSeqNumberInPlaylist)106 void PlaylistFetcher::DownloadState::restoreState(
107 AString &uri,
108 sp<AMessage> &itemMeta,
109 sp<ABuffer> &buffer,
110 sp<ABuffer> &tsBuffer,
111 int32_t &firstSeqNumberInPlaylist,
112 int32_t &lastSeqNumberInPlaylist) {
113 if (!mHasSavedState) {
114 return;
115 }
116
117 uri = mUri;
118 itemMeta = mItemMeta;
119 buffer = mBuffer;
120 tsBuffer = mTsBuffer;
121 firstSeqNumberInPlaylist = mFirstSeqNumberInPlaylist;
122 lastSeqNumberInPlaylist = mLastSeqNumberInPlaylist;
123
124 resetState();
125 }
126
saveState(AString & uri,sp<AMessage> & itemMeta,sp<ABuffer> & buffer,sp<ABuffer> & tsBuffer,int32_t & firstSeqNumberInPlaylist,int32_t & lastSeqNumberInPlaylist)127 void PlaylistFetcher::DownloadState::saveState(
128 AString &uri,
129 sp<AMessage> &itemMeta,
130 sp<ABuffer> &buffer,
131 sp<ABuffer> &tsBuffer,
132 int32_t &firstSeqNumberInPlaylist,
133 int32_t &lastSeqNumberInPlaylist) {
134 mHasSavedState = true;
135
136 mUri = uri;
137 mItemMeta = itemMeta;
138 mBuffer = buffer;
139 mTsBuffer = tsBuffer;
140 mFirstSeqNumberInPlaylist = firstSeqNumberInPlaylist;
141 mLastSeqNumberInPlaylist = lastSeqNumberInPlaylist;
142 }
143
PlaylistFetcher(const sp<AMessage> & notify,const sp<LiveSession> & session,const char * uri,int32_t id,int32_t subtitleGeneration)144 PlaylistFetcher::PlaylistFetcher(
145 const sp<AMessage> ¬ify,
146 const sp<LiveSession> &session,
147 const char *uri,
148 int32_t id,
149 int32_t subtitleGeneration)
150 : mNotify(notify),
151 mSession(session),
152 mURI(uri),
153 mFetcherID(id),
154 mStreamTypeMask(0),
155 mStartTimeUs(-1LL),
156 mSegmentStartTimeUs(-1LL),
157 mDiscontinuitySeq(-1LL),
158 mStartTimeUsRelative(false),
159 mLastPlaylistFetchTimeUs(-1LL),
160 mPlaylistTimeUs(-1LL),
161 mSeqNumber(-1),
162 mNumRetries(0),
163 mNumRetriesForMonitorQueue(0),
164 mStartup(true),
165 mIDRFound(false),
166 mSeekMode(LiveSession::kSeekModeExactPosition),
167 mTimeChangeSignaled(false),
168 mNextPTSTimeUs(-1LL),
169 mMonitorQueueGeneration(0),
170 mSubtitleGeneration(subtitleGeneration),
171 mLastDiscontinuitySeq(-1LL),
172 mRefreshState(INITIAL_MINIMUM_RELOAD_DELAY),
173 mFirstPTSValid(false),
174 mFirstTimeUs(-1LL),
175 mVideoBuffer(new AnotherPacketSource(NULL)),
176 mSampleAesKeyItemChanged(false),
177 mThresholdRatio(-1.0f),
178 mDownloadState(new DownloadState()),
179 mHasMetadata(false) {
180 memset(mPlaylistHash, 0, sizeof(mPlaylistHash));
181 mHTTPDownloader = mSession->getHTTPDownloader();
182
183 memset(mKeyData, 0, sizeof(mKeyData));
184 memset(mAESInitVec, 0, sizeof(mAESInitVec));
185 }
186
~PlaylistFetcher()187 PlaylistFetcher::~PlaylistFetcher() {
188 }
189
getFetcherID() const190 int32_t PlaylistFetcher::getFetcherID() const {
191 return mFetcherID;
192 }
193
getSegmentStartTimeUs(int32_t seqNumber) const194 int64_t PlaylistFetcher::getSegmentStartTimeUs(int32_t seqNumber) const {
195 CHECK(mPlaylist != NULL);
196
197 int32_t firstSeqNumberInPlaylist, lastSeqNumberInPlaylist;
198 mPlaylist->getSeqNumberRange(
199 &firstSeqNumberInPlaylist, &lastSeqNumberInPlaylist);
200
201 CHECK_GE(seqNumber, firstSeqNumberInPlaylist);
202 CHECK_LE(seqNumber, lastSeqNumberInPlaylist);
203
204 int64_t segmentStartUs = 0LL;
205 for (int32_t index = 0;
206 index < seqNumber - firstSeqNumberInPlaylist; ++index) {
207 sp<AMessage> itemMeta;
208 CHECK(mPlaylist->itemAt(
209 index, NULL /* uri */, &itemMeta));
210
211 int64_t itemDurationUs;
212 CHECK(itemMeta->findInt64("durationUs", &itemDurationUs));
213
214 segmentStartUs += itemDurationUs;
215 }
216
217 return segmentStartUs;
218 }
219
getSegmentDurationUs(int32_t seqNumber) const220 int64_t PlaylistFetcher::getSegmentDurationUs(int32_t seqNumber) const {
221 CHECK(mPlaylist != NULL);
222
223 int32_t firstSeqNumberInPlaylist, lastSeqNumberInPlaylist;
224 mPlaylist->getSeqNumberRange(
225 &firstSeqNumberInPlaylist, &lastSeqNumberInPlaylist);
226
227 CHECK_GE(seqNumber, firstSeqNumberInPlaylist);
228 CHECK_LE(seqNumber, lastSeqNumberInPlaylist);
229
230 int32_t index = seqNumber - firstSeqNumberInPlaylist;
231 sp<AMessage> itemMeta;
232 CHECK(mPlaylist->itemAt(
233 index, NULL /* uri */, &itemMeta));
234
235 int64_t itemDurationUs;
236 CHECK(itemMeta->findInt64("durationUs", &itemDurationUs));
237
238 return itemDurationUs;
239 }
240
delayUsToRefreshPlaylist() const241 int64_t PlaylistFetcher::delayUsToRefreshPlaylist() const {
242 int64_t nowUs = ALooper::GetNowUs();
243
244 if (mPlaylist == NULL || mLastPlaylistFetchTimeUs < 0LL) {
245 CHECK_EQ((int)mRefreshState, (int)INITIAL_MINIMUM_RELOAD_DELAY);
246 return 0LL;
247 }
248
249 if (mPlaylist->isComplete()) {
250 return (~0LLU >> 1);
251 }
252
253 int64_t targetDurationUs = mPlaylist->getTargetDuration();
254
255 int64_t minPlaylistAgeUs;
256
257 switch (mRefreshState) {
258 case INITIAL_MINIMUM_RELOAD_DELAY:
259 {
260 size_t n = mPlaylist->size();
261 if (n > 0) {
262 sp<AMessage> itemMeta;
263 CHECK(mPlaylist->itemAt(n - 1, NULL /* uri */, &itemMeta));
264
265 int64_t itemDurationUs;
266 CHECK(itemMeta->findInt64("durationUs", &itemDurationUs));
267
268 minPlaylistAgeUs = itemDurationUs;
269 break;
270 }
271
272 FALLTHROUGH_INTENDED;
273 }
274
275 case FIRST_UNCHANGED_RELOAD_ATTEMPT:
276 {
277 minPlaylistAgeUs = targetDurationUs / 2;
278 break;
279 }
280
281 case SECOND_UNCHANGED_RELOAD_ATTEMPT:
282 {
283 minPlaylistAgeUs = (targetDurationUs * 3) / 2;
284 break;
285 }
286
287 case THIRD_UNCHANGED_RELOAD_ATTEMPT:
288 {
289 minPlaylistAgeUs = targetDurationUs * 3;
290 break;
291 }
292
293 default:
294 TRESPASS();
295 break;
296 }
297
298 int64_t delayUs = mLastPlaylistFetchTimeUs + minPlaylistAgeUs - nowUs;
299 return delayUs > 0LL ? delayUs : 0LL;
300 }
301
decryptBuffer(size_t playlistIndex,const sp<ABuffer> & buffer,bool first)302 status_t PlaylistFetcher::decryptBuffer(
303 size_t playlistIndex, const sp<ABuffer> &buffer,
304 bool first) {
305 sp<AMessage> itemMeta;
306 bool found = false;
307 AString method;
308
309 for (ssize_t i = playlistIndex; i >= 0; --i) {
310 AString uri;
311 CHECK(mPlaylist->itemAt(i, &uri, &itemMeta));
312
313 if (itemMeta->findString("cipher-method", &method)) {
314 found = true;
315 break;
316 }
317 }
318
319 // TODO: Revise this when we add support for KEYFORMAT
320 // If method has changed (e.g., -> NONE); sufficient to check at the segment boundary
321 if (mSampleAesKeyItem != NULL && first && found && method != "SAMPLE-AES") {
322 ALOGI("decryptBuffer: resetting mSampleAesKeyItem(%p) with method %s",
323 mSampleAesKeyItem.get(), method.c_str());
324 mSampleAesKeyItem = NULL;
325 mSampleAesKeyItemChanged = true;
326 }
327
328 if (!found) {
329 method = "NONE";
330 }
331 buffer->meta()->setString("cipher-method", method.c_str());
332
333 if (method == "NONE") {
334 return OK;
335 } else if (method == "SAMPLE-AES") {
336 ALOGV("decryptBuffer: Non-Widevine SAMPLE-AES is supported now.");
337 } else if (!(method == "AES-128")) {
338 ALOGE("Unsupported cipher method '%s'", method.c_str());
339 return ERROR_UNSUPPORTED;
340 }
341
342 AString keyURI;
343 if (!itemMeta->findString("cipher-uri", &keyURI)) {
344 ALOGE("Missing key uri");
345 return ERROR_MALFORMED;
346 }
347
348 ssize_t index = mAESKeyForURI.indexOfKey(keyURI);
349
350 sp<ABuffer> key;
351 if (index >= 0) {
352 key = mAESKeyForURI.valueAt(index);
353 } else if (keyURI.startsWith("data:")) {
354 sp<DataSource> keySrc = DataURISource::Create(keyURI.c_str());
355 off64_t keyLen;
356 if (keySrc == NULL || keySrc->getSize(&keyLen) != OK || keyLen < 0) {
357 ALOGE("Malformed cipher key data uri.");
358 return ERROR_MALFORMED;
359 }
360 key = new ABuffer(keyLen);
361 keySrc->readAt(0, key->data(), keyLen);
362 key->setRange(0, keyLen);
363 } else {
364 ssize_t err = mHTTPDownloader->fetchFile(keyURI.c_str(), &key);
365
366 if (err == ERROR_NOT_CONNECTED) {
367 return ERROR_NOT_CONNECTED;
368 } else if (err < 0) {
369 ALOGE("failed to fetch cipher key from '%s'.", uriDebugString(keyURI).c_str());
370 return ERROR_IO;
371 } else if (key->size() != 16) {
372 ALOGE("key file '%s' wasn't 16 bytes in size.", uriDebugString(keyURI).c_str());
373 return ERROR_MALFORMED;
374 }
375
376 mAESKeyForURI.add(keyURI, key);
377 }
378
379 if (first) {
380 // If decrypting the first block in a file, read the iv from the manifest
381 // or derive the iv from the file's sequence number.
382
383 unsigned char AESInitVec[AES_BLOCK_SIZE];
384 AString iv;
385 if (itemMeta->findString("cipher-iv", &iv)) {
386 if ((!iv.startsWith("0x") && !iv.startsWith("0X"))
387 || iv.size() > 16 * 2 + 2) {
388 ALOGE("malformed cipher IV '%s'.", iv.c_str());
389 return ERROR_MALFORMED;
390 }
391
392 while (iv.size() < 16 * 2 + 2) {
393 iv.insert("0", 1, 2);
394 }
395
396 memset(AESInitVec, 0, sizeof(AESInitVec));
397 for (size_t i = 0; i < 16; ++i) {
398 char c1 = tolower(iv.c_str()[2 + 2 * i]);
399 char c2 = tolower(iv.c_str()[3 + 2 * i]);
400 if (!isxdigit(c1) || !isxdigit(c2)) {
401 ALOGE("malformed cipher IV '%s'.", iv.c_str());
402 return ERROR_MALFORMED;
403 }
404 uint8_t nibble1 = isdigit(c1) ? c1 - '0' : c1 - 'a' + 10;
405 uint8_t nibble2 = isdigit(c2) ? c2 - '0' : c2 - 'a' + 10;
406
407 AESInitVec[i] = nibble1 << 4 | nibble2;
408 }
409 } else {
410 memset(AESInitVec, 0, sizeof(AESInitVec));
411 AESInitVec[15] = mSeqNumber & 0xff;
412 AESInitVec[14] = (mSeqNumber >> 8) & 0xff;
413 AESInitVec[13] = (mSeqNumber >> 16) & 0xff;
414 AESInitVec[12] = (mSeqNumber >> 24) & 0xff;
415 }
416
417 bool newKey = memcmp(mKeyData, key->data(), AES_BLOCK_SIZE) != 0;
418 bool newInitVec = memcmp(mAESInitVec, AESInitVec, AES_BLOCK_SIZE) != 0;
419 bool newSampleAesKeyItem = newKey || newInitVec;
420 ALOGV("decryptBuffer: SAMPLE-AES newKeyItem %d/%d (Key %d initVec %d)",
421 mSampleAesKeyItemChanged, newSampleAesKeyItem, newKey, newInitVec);
422
423 if (newSampleAesKeyItem) {
424 memcpy(mKeyData, key->data(), AES_BLOCK_SIZE);
425 memcpy(mAESInitVec, AESInitVec, AES_BLOCK_SIZE);
426
427 if (method == "SAMPLE-AES") {
428 mSampleAesKeyItemChanged = true;
429
430 sp<ABuffer> keyDataBuffer = ABuffer::CreateAsCopy(mKeyData, sizeof(mKeyData));
431 sp<ABuffer> initVecBuffer = ABuffer::CreateAsCopy(mAESInitVec, sizeof(mAESInitVec));
432
433 // always allocating a new one rather than updating the old message
434 // lower layer might still have a reference to the old message
435 mSampleAesKeyItem = new AMessage();
436 mSampleAesKeyItem->setBuffer("keyData", keyDataBuffer);
437 mSampleAesKeyItem->setBuffer("initVec", initVecBuffer);
438
439 ALOGV("decryptBuffer: New SampleAesKeyItem: Key: %s IV: %s",
440 HlsSampleDecryptor::aesBlockToStr(mKeyData).c_str(),
441 HlsSampleDecryptor::aesBlockToStr(mAESInitVec).c_str());
442 } // SAMPLE-AES
443 } // newSampleAesKeyItem
444 } // first
445
446 if (method == "SAMPLE-AES") {
447 ALOGV("decryptBuffer: skipping full-seg decrypt for SAMPLE-AES");
448 return OK;
449 }
450
451
452 AES_KEY aes_key;
453 if (AES_set_decrypt_key(key->data(), 128, &aes_key) != 0) {
454 ALOGE("failed to set AES decryption key.");
455 return UNKNOWN_ERROR;
456 }
457
458 size_t n = buffer->size();
459 if (!n) {
460 return OK;
461 }
462
463 if (n < 16 || n % 16) {
464 ALOGE("not enough or trailing bytes (%zu) in encrypted buffer", n);
465 return ERROR_MALFORMED;
466 }
467
468 AES_cbc_encrypt(
469 buffer->data(), buffer->data(), buffer->size(),
470 &aes_key, mAESInitVec, AES_DECRYPT);
471
472 return OK;
473 }
474
checkDecryptPadding(const sp<ABuffer> & buffer)475 status_t PlaylistFetcher::checkDecryptPadding(const sp<ABuffer> &buffer) {
476 AString method;
477 CHECK(buffer->meta()->findString("cipher-method", &method));
478 if (method == "NONE" || method == "SAMPLE-AES") {
479 return OK;
480 }
481
482 uint8_t padding = 0;
483 if (buffer->size() > 0) {
484 padding = buffer->data()[buffer->size() - 1];
485 }
486
487 if (padding > 16) {
488 return ERROR_MALFORMED;
489 }
490
491 for (size_t i = buffer->size() - padding; i < padding; i++) {
492 if (buffer->data()[i] != padding) {
493 return ERROR_MALFORMED;
494 }
495 }
496
497 buffer->setRange(buffer->offset(), buffer->size() - padding);
498 return OK;
499 }
500
postMonitorQueue(int64_t delayUs,int64_t minDelayUs)501 void PlaylistFetcher::postMonitorQueue(int64_t delayUs, int64_t minDelayUs) {
502 int64_t maxDelayUs = delayUsToRefreshPlaylist();
503 if (maxDelayUs < minDelayUs) {
504 maxDelayUs = minDelayUs;
505 }
506 if (delayUs > maxDelayUs) {
507 FLOGV("Need to refresh playlist in %lld", (long long)maxDelayUs);
508 delayUs = maxDelayUs;
509 }
510 sp<AMessage> msg = new AMessage(kWhatMonitorQueue, this);
511 msg->setInt32("generation", mMonitorQueueGeneration);
512 msg->post(delayUs);
513 }
514
cancelMonitorQueue()515 void PlaylistFetcher::cancelMonitorQueue() {
516 ++mMonitorQueueGeneration;
517 }
518
setStoppingThreshold(float thresholdRatio,bool disconnect)519 void PlaylistFetcher::setStoppingThreshold(float thresholdRatio, bool disconnect) {
520 {
521 AutoMutex _l(mThresholdLock);
522 mThresholdRatio = thresholdRatio;
523 }
524 if (disconnect) {
525 mHTTPDownloader->disconnect();
526 }
527 }
528
resetStoppingThreshold(bool disconnect)529 void PlaylistFetcher::resetStoppingThreshold(bool disconnect) {
530 {
531 AutoMutex _l(mThresholdLock);
532 mThresholdRatio = -1.0f;
533 }
534 if (disconnect) {
535 mHTTPDownloader->disconnect();
536 } else {
537 // allow reconnect
538 mHTTPDownloader->reconnect();
539 }
540 }
541
getStoppingThreshold()542 float PlaylistFetcher::getStoppingThreshold() {
543 AutoMutex _l(mThresholdLock);
544 return mThresholdRatio;
545 }
546
startAsync(const sp<AnotherPacketSource> & audioSource,const sp<AnotherPacketSource> & videoSource,const sp<AnotherPacketSource> & subtitleSource,const sp<AnotherPacketSource> & metadataSource,int64_t startTimeUs,int64_t segmentStartTimeUs,int32_t startDiscontinuitySeq,LiveSession::SeekMode seekMode)547 void PlaylistFetcher::startAsync(
548 const sp<AnotherPacketSource> &audioSource,
549 const sp<AnotherPacketSource> &videoSource,
550 const sp<AnotherPacketSource> &subtitleSource,
551 const sp<AnotherPacketSource> &metadataSource,
552 int64_t startTimeUs,
553 int64_t segmentStartTimeUs,
554 int32_t startDiscontinuitySeq,
555 LiveSession::SeekMode seekMode) {
556 sp<AMessage> msg = new AMessage(kWhatStart, this);
557
558 uint32_t streamTypeMask = 0ul;
559
560 if (audioSource != NULL) {
561 msg->setPointer("audioSource", audioSource.get());
562 streamTypeMask |= LiveSession::STREAMTYPE_AUDIO;
563 }
564
565 if (videoSource != NULL) {
566 msg->setPointer("videoSource", videoSource.get());
567 streamTypeMask |= LiveSession::STREAMTYPE_VIDEO;
568 }
569
570 if (subtitleSource != NULL) {
571 msg->setPointer("subtitleSource", subtitleSource.get());
572 streamTypeMask |= LiveSession::STREAMTYPE_SUBTITLES;
573 }
574
575 if (metadataSource != NULL) {
576 msg->setPointer("metadataSource", metadataSource.get());
577 // metadataSource does not affect streamTypeMask.
578 }
579
580 msg->setInt32("streamTypeMask", streamTypeMask);
581 msg->setInt64("startTimeUs", startTimeUs);
582 msg->setInt64("segmentStartTimeUs", segmentStartTimeUs);
583 msg->setInt32("startDiscontinuitySeq", startDiscontinuitySeq);
584 msg->setInt32("seekMode", seekMode);
585 msg->post();
586 }
587
588 /*
589 * pauseAsync
590 *
591 * threshold: 0.0f - pause after current fetch block (default 47Kbytes)
592 * -1.0f - pause after finishing current segment
593 * 0.0~1.0f - pause if remaining of current segment exceeds threshold
594 */
pauseAsync(float thresholdRatio,bool disconnect)595 void PlaylistFetcher::pauseAsync(
596 float thresholdRatio, bool disconnect) {
597 setStoppingThreshold(thresholdRatio, disconnect);
598
599 (new AMessage(kWhatPause, this))->post();
600 }
601
stopAsync(bool clear)602 void PlaylistFetcher::stopAsync(bool clear) {
603 setStoppingThreshold(0.0f, true /* disconncect */);
604
605 sp<AMessage> msg = new AMessage(kWhatStop, this);
606 msg->setInt32("clear", clear);
607 msg->post();
608 }
609
resumeUntilAsync(const sp<AMessage> & params)610 void PlaylistFetcher::resumeUntilAsync(const sp<AMessage> ¶ms) {
611 FLOGV("resumeUntilAsync: params=%s", params->debugString().c_str());
612
613 AMessage* msg = new AMessage(kWhatResumeUntil, this);
614 msg->setMessage("params", params);
615 msg->post();
616 }
617
fetchPlaylistAsync()618 void PlaylistFetcher::fetchPlaylistAsync() {
619 (new AMessage(kWhatFetchPlaylist, this))->post();
620 }
621
onMessageReceived(const sp<AMessage> & msg)622 void PlaylistFetcher::onMessageReceived(const sp<AMessage> &msg) {
623 switch (msg->what()) {
624 case kWhatStart:
625 {
626 status_t err = onStart(msg);
627
628 sp<AMessage> notify = mNotify->dup();
629 notify->setInt32("what", kWhatStarted);
630 notify->setInt32("err", err);
631 notify->post();
632 break;
633 }
634
635 case kWhatPause:
636 {
637 onPause();
638
639 sp<AMessage> notify = mNotify->dup();
640 notify->setInt32("what", kWhatPaused);
641 notify->setInt32("seekMode",
642 mDownloadState->hasSavedState()
643 ? LiveSession::kSeekModeNextSample
644 : LiveSession::kSeekModeNextSegment);
645 notify->post();
646 break;
647 }
648
649 case kWhatStop:
650 {
651 onStop(msg);
652
653 sp<AMessage> notify = mNotify->dup();
654 notify->setInt32("what", kWhatStopped);
655 notify->post();
656 break;
657 }
658
659 case kWhatFetchPlaylist:
660 {
661 bool unchanged;
662 sp<M3UParser> playlist = mHTTPDownloader->fetchPlaylist(
663 mURI.c_str(), NULL /* curPlaylistHash */, &unchanged);
664
665 sp<AMessage> notify = mNotify->dup();
666 notify->setInt32("what", kWhatPlaylistFetched);
667 notify->setObject("playlist", playlist);
668 notify->post();
669 break;
670 }
671
672 case kWhatMonitorQueue:
673 case kWhatDownloadNext:
674 {
675 int32_t generation;
676 CHECK(msg->findInt32("generation", &generation));
677
678 if (generation != mMonitorQueueGeneration) {
679 // Stale event
680 break;
681 }
682
683 if (msg->what() == kWhatMonitorQueue) {
684 onMonitorQueue();
685 } else {
686 onDownloadNext();
687 }
688 break;
689 }
690
691 case kWhatResumeUntil:
692 {
693 onResumeUntil(msg);
694 break;
695 }
696
697 default:
698 TRESPASS();
699 }
700 }
701
onStart(const sp<AMessage> & msg)702 status_t PlaylistFetcher::onStart(const sp<AMessage> &msg) {
703 mPacketSources.clear();
704 mStopParams.clear();
705 mStartTimeUsNotify = mNotify->dup();
706 mStartTimeUsNotify->setInt32("what", kWhatStartedAt);
707 mStartTimeUsNotify->setString("uri", mURI);
708
709 uint32_t streamTypeMask;
710 CHECK(msg->findInt32("streamTypeMask", (int32_t *)&streamTypeMask));
711
712 int64_t startTimeUs;
713 int64_t segmentStartTimeUs;
714 int32_t startDiscontinuitySeq;
715 int32_t seekMode;
716 CHECK(msg->findInt64("startTimeUs", &startTimeUs));
717 CHECK(msg->findInt64("segmentStartTimeUs", &segmentStartTimeUs));
718 CHECK(msg->findInt32("startDiscontinuitySeq", &startDiscontinuitySeq));
719 CHECK(msg->findInt32("seekMode", &seekMode));
720
721 if (streamTypeMask & LiveSession::STREAMTYPE_AUDIO) {
722 void *ptr;
723 CHECK(msg->findPointer("audioSource", &ptr));
724
725 mPacketSources.add(
726 LiveSession::STREAMTYPE_AUDIO,
727 static_cast<AnotherPacketSource *>(ptr));
728 }
729
730 if (streamTypeMask & LiveSession::STREAMTYPE_VIDEO) {
731 void *ptr;
732 CHECK(msg->findPointer("videoSource", &ptr));
733
734 mPacketSources.add(
735 LiveSession::STREAMTYPE_VIDEO,
736 static_cast<AnotherPacketSource *>(ptr));
737 }
738
739 if (streamTypeMask & LiveSession::STREAMTYPE_SUBTITLES) {
740 void *ptr;
741 CHECK(msg->findPointer("subtitleSource", &ptr));
742
743 mPacketSources.add(
744 LiveSession::STREAMTYPE_SUBTITLES,
745 static_cast<AnotherPacketSource *>(ptr));
746 }
747
748 void *ptr;
749 // metadataSource is not part of streamTypeMask
750 if ((streamTypeMask & (LiveSession::STREAMTYPE_AUDIO | LiveSession::STREAMTYPE_VIDEO))
751 && msg->findPointer("metadataSource", &ptr)) {
752 mPacketSources.add(
753 LiveSession::STREAMTYPE_METADATA,
754 static_cast<AnotherPacketSource *>(ptr));
755 }
756
757 mStreamTypeMask = streamTypeMask;
758
759 mSegmentStartTimeUs = segmentStartTimeUs;
760
761 if (startDiscontinuitySeq >= 0) {
762 mDiscontinuitySeq = startDiscontinuitySeq;
763 }
764
765 mRefreshState = INITIAL_MINIMUM_RELOAD_DELAY;
766 mSeekMode = (LiveSession::SeekMode) seekMode;
767
768 if (startTimeUs >= 0 || mSeekMode == LiveSession::kSeekModeNextSample) {
769 mStartup = true;
770 mIDRFound = false;
771 mVideoBuffer->clear();
772 }
773
774 if (startTimeUs >= 0) {
775 mStartTimeUs = startTimeUs;
776 mFirstPTSValid = false;
777 mSeqNumber = -1;
778 mTimeChangeSignaled = false;
779 mDownloadState->resetState();
780 }
781
782 postMonitorQueue();
783
784 return OK;
785 }
786
onPause()787 void PlaylistFetcher::onPause() {
788 cancelMonitorQueue();
789 mLastDiscontinuitySeq = mDiscontinuitySeq;
790
791 resetStoppingThreshold(false /* disconnect */);
792 }
793
onStop(const sp<AMessage> & msg)794 void PlaylistFetcher::onStop(const sp<AMessage> &msg) {
795 cancelMonitorQueue();
796
797 int32_t clear;
798 CHECK(msg->findInt32("clear", &clear));
799 if (clear) {
800 for (size_t i = 0; i < mPacketSources.size(); i++) {
801 sp<AnotherPacketSource> packetSource = mPacketSources.valueAt(i);
802 packetSource->clear();
803 }
804 }
805
806 mDownloadState->resetState();
807 mPacketSources.clear();
808 mStreamTypeMask = 0;
809
810 resetStoppingThreshold(true /* disconnect */);
811 }
812
813 // Resume until we have reached the boundary timestamps listed in `msg`; when
814 // the remaining time is too short (within a resume threshold) stop immediately
815 // instead.
onResumeUntil(const sp<AMessage> & msg)816 status_t PlaylistFetcher::onResumeUntil(const sp<AMessage> &msg) {
817 sp<AMessage> params;
818 CHECK(msg->findMessage("params", ¶ms));
819
820 mStopParams = params;
821 onDownloadNext();
822
823 return OK;
824 }
825
notifyStopReached()826 void PlaylistFetcher::notifyStopReached() {
827 sp<AMessage> notify = mNotify->dup();
828 notify->setInt32("what", kWhatStopReached);
829 notify->post();
830 }
831
notifyError(status_t err)832 void PlaylistFetcher::notifyError(status_t err) {
833 sp<AMessage> notify = mNotify->dup();
834 notify->setInt32("what", kWhatError);
835 notify->setInt32("err", err);
836 notify->post();
837 }
838
queueDiscontinuity(ATSParser::DiscontinuityType type,const sp<AMessage> & extra)839 void PlaylistFetcher::queueDiscontinuity(
840 ATSParser::DiscontinuityType type, const sp<AMessage> &extra) {
841 for (size_t i = 0; i < mPacketSources.size(); ++i) {
842 // do not discard buffer upon #EXT-X-DISCONTINUITY tag
843 // (seek will discard buffer by abandoning old fetchers)
844 mPacketSources.valueAt(i)->queueDiscontinuity(
845 type, extra, false /* discard */);
846 }
847 }
848
onMonitorQueue()849 void PlaylistFetcher::onMonitorQueue() {
850 // in the middle of an unfinished download, delay
851 // playlist refresh as it'll change seq numbers
852 if (!mDownloadState->hasSavedState()) {
853 status_t err = refreshPlaylist();
854 if (err != OK) {
855 if (mNumRetriesForMonitorQueue < kMaxNumRetries) {
856 ++mNumRetriesForMonitorQueue;
857 } else {
858 notifyError(err);
859 }
860 return;
861 } else {
862 mNumRetriesForMonitorQueue = 0;
863 }
864 }
865
866 int64_t targetDurationUs = kMinBufferedDurationUs;
867 if (mPlaylist != NULL) {
868 targetDurationUs = mPlaylist->getTargetDuration();
869 }
870
871 int64_t bufferedDurationUs = 0LL;
872 status_t finalResult = OK;
873 if (mStreamTypeMask == LiveSession::STREAMTYPE_SUBTITLES) {
874 sp<AnotherPacketSource> packetSource =
875 mPacketSources.valueFor(LiveSession::STREAMTYPE_SUBTITLES);
876
877 bufferedDurationUs =
878 packetSource->getBufferedDurationUs(&finalResult);
879 } else {
880 // Use min stream duration, but ignore streams that never have any packet
881 // enqueued to prevent us from waiting on a non-existent stream;
882 // when we cannot make out from the manifest what streams are included in
883 // a playlist we might assume extra streams.
884 bufferedDurationUs = -1LL;
885 for (size_t i = 0; i < mPacketSources.size(); ++i) {
886 if ((mStreamTypeMask & mPacketSources.keyAt(i)) == 0
887 || mPacketSources[i]->getLatestEnqueuedMeta() == NULL) {
888 continue;
889 }
890
891 int64_t bufferedStreamDurationUs =
892 mPacketSources.valueAt(i)->getBufferedDurationUs(&finalResult);
893
894 FSLOGV(mPacketSources.keyAt(i), "buffered %lld", (long long)bufferedStreamDurationUs);
895
896 if (bufferedDurationUs == -1LL
897 || bufferedStreamDurationUs < bufferedDurationUs) {
898 bufferedDurationUs = bufferedStreamDurationUs;
899 }
900 }
901 if (bufferedDurationUs == -1LL) {
902 bufferedDurationUs = 0LL;
903 }
904 }
905
906 if (finalResult == OK && bufferedDurationUs < kMinBufferedDurationUs) {
907 FLOGV("monitoring, buffered=%lld < %lld",
908 (long long)bufferedDurationUs, (long long)kMinBufferedDurationUs);
909
910 // delay the next download slightly; hopefully this gives other concurrent fetchers
911 // a better chance to run.
912 // onDownloadNext();
913 sp<AMessage> msg = new AMessage(kWhatDownloadNext, this);
914 msg->setInt32("generation", mMonitorQueueGeneration);
915 msg->post(1000L);
916 } else {
917 // We'd like to maintain buffering above durationToBufferUs, so try
918 // again when buffer just about to go below durationToBufferUs
919 // (or after targetDurationUs / 2, whichever is smaller).
920 int64_t delayUs = bufferedDurationUs - kMinBufferedDurationUs + 1000000LL;
921 if (delayUs > targetDurationUs / 2) {
922 delayUs = targetDurationUs / 2;
923 }
924
925 FLOGV("pausing for %lld, buffered=%lld > %lld",
926 (long long)delayUs,
927 (long long)bufferedDurationUs,
928 (long long)kMinBufferedDurationUs);
929
930 postMonitorQueue(delayUs);
931 }
932 }
933
refreshPlaylist()934 status_t PlaylistFetcher::refreshPlaylist() {
935 if (delayUsToRefreshPlaylist() <= 0) {
936 bool unchanged;
937 sp<M3UParser> playlist = mHTTPDownloader->fetchPlaylist(
938 mURI.c_str(), mPlaylistHash, &unchanged);
939
940 if (playlist == NULL) {
941 if (unchanged) {
942 // We succeeded in fetching the playlist, but it was
943 // unchanged from the last time we tried.
944
945 if (mRefreshState != THIRD_UNCHANGED_RELOAD_ATTEMPT) {
946 mRefreshState = (RefreshState)(mRefreshState + 1);
947 }
948 } else {
949 ALOGE("failed to load playlist at url '%s'", uriDebugString(mURI).c_str());
950 return ERROR_IO;
951 }
952 } else {
953 mRefreshState = INITIAL_MINIMUM_RELOAD_DELAY;
954 mPlaylist = playlist;
955
956 if (mPlaylist->isComplete() || mPlaylist->isEvent()) {
957 updateDuration();
958 }
959 // Notify LiveSession to use target-duration based buffering level
960 // for up/down switch. Default LiveSession::kUpSwitchMark may not
961 // be reachable for live streams, as our max buffering amount is
962 // limited to 3 segments.
963 if (!mPlaylist->isComplete()) {
964 updateTargetDuration();
965 }
966 mPlaylistTimeUs = ALooper::GetNowUs();
967 }
968
969 mLastPlaylistFetchTimeUs = ALooper::GetNowUs();
970 }
971 return OK;
972 }
973
974 // static
bufferStartsWithTsSyncByte(const sp<ABuffer> & buffer)975 bool PlaylistFetcher::bufferStartsWithTsSyncByte(const sp<ABuffer>& buffer) {
976 return buffer->size() > 0 && buffer->data()[0] == 0x47;
977 }
978
shouldPauseDownload()979 bool PlaylistFetcher::shouldPauseDownload() {
980 if (mStreamTypeMask == LiveSession::STREAMTYPE_SUBTITLES) {
981 // doesn't apply to subtitles
982 return false;
983 }
984
985 // Calculate threshold to abort current download
986 float thresholdRatio = getStoppingThreshold();
987
988 if (thresholdRatio < 0.0f) {
989 // never abort
990 return false;
991 } else if (thresholdRatio == 0.0f) {
992 // immediately abort
993 return true;
994 }
995
996 // now we have a positive thresholdUs, abort if remaining
997 // portion to download is over that threshold.
998 if (mSegmentFirstPTS < 0) {
999 // this means we haven't even find the first access unit,
1000 // abort now as we must be very far away from the end.
1001 return true;
1002 }
1003 int64_t lastEnqueueUs = mSegmentFirstPTS;
1004 for (size_t i = 0; i < mPacketSources.size(); ++i) {
1005 if ((mStreamTypeMask & mPacketSources.keyAt(i)) == 0) {
1006 continue;
1007 }
1008 sp<AMessage> meta = mPacketSources[i]->getLatestEnqueuedMeta();
1009 int32_t type;
1010 if (meta == NULL || meta->findInt32("discontinuity", &type)) {
1011 continue;
1012 }
1013 int64_t tmpUs;
1014 CHECK(meta->findInt64("timeUs", &tmpUs));
1015 if (tmpUs > lastEnqueueUs) {
1016 lastEnqueueUs = tmpUs;
1017 }
1018 }
1019 lastEnqueueUs -= mSegmentFirstPTS;
1020
1021 int64_t targetDurationUs = mPlaylist->getTargetDuration();
1022 int64_t thresholdUs = thresholdRatio * targetDurationUs;
1023
1024 FLOGV("%spausing now, thresholdUs %lld, remaining %lld",
1025 targetDurationUs - lastEnqueueUs > thresholdUs ? "" : "not ",
1026 (long long)thresholdUs,
1027 (long long)(targetDurationUs - lastEnqueueUs));
1028
1029 if (targetDurationUs - lastEnqueueUs > thresholdUs) {
1030 return true;
1031 }
1032 return false;
1033 }
1034
initSeqNumberForLiveStream(int32_t & firstSeqNumberInPlaylist,int32_t & lastSeqNumberInPlaylist)1035 void PlaylistFetcher::initSeqNumberForLiveStream(
1036 int32_t &firstSeqNumberInPlaylist,
1037 int32_t &lastSeqNumberInPlaylist) {
1038 // start at least 3 target durations from the end.
1039 int64_t timeFromEnd = 0;
1040 size_t index = mPlaylist->size();
1041 sp<AMessage> itemMeta;
1042 int64_t itemDurationUs;
1043 int32_t targetDuration;
1044 if (mPlaylist->meta() != NULL
1045 && mPlaylist->meta()->findInt32("target-duration", &targetDuration)) {
1046 do {
1047 --index;
1048 if (!mPlaylist->itemAt(index, NULL /* uri */, &itemMeta)
1049 || !itemMeta->findInt64("durationUs", &itemDurationUs)) {
1050 ALOGW("item or itemDurationUs missing");
1051 mSeqNumber = lastSeqNumberInPlaylist - 3;
1052 break;
1053 }
1054
1055 timeFromEnd += itemDurationUs;
1056 mSeqNumber = firstSeqNumberInPlaylist + index;
1057 } while (timeFromEnd < targetDuration * 3E6 && index > 0);
1058 } else {
1059 ALOGW("target-duration missing");
1060 mSeqNumber = lastSeqNumberInPlaylist - 3;
1061 }
1062
1063 if (mSeqNumber < firstSeqNumberInPlaylist) {
1064 mSeqNumber = firstSeqNumberInPlaylist;
1065 }
1066 }
1067
initDownloadState(AString & uri,sp<AMessage> & itemMeta,int32_t & firstSeqNumberInPlaylist,int32_t & lastSeqNumberInPlaylist)1068 bool PlaylistFetcher::initDownloadState(
1069 AString &uri,
1070 sp<AMessage> &itemMeta,
1071 int32_t &firstSeqNumberInPlaylist,
1072 int32_t &lastSeqNumberInPlaylist) {
1073 status_t err = refreshPlaylist();
1074 firstSeqNumberInPlaylist = 0;
1075 lastSeqNumberInPlaylist = 0;
1076 bool discontinuity = false;
1077
1078 if (mPlaylist != NULL) {
1079 mPlaylist->getSeqNumberRange(
1080 &firstSeqNumberInPlaylist, &lastSeqNumberInPlaylist);
1081
1082 if (mDiscontinuitySeq < 0) {
1083 mDiscontinuitySeq = mPlaylist->getDiscontinuitySeq();
1084 }
1085 }
1086
1087 mSegmentFirstPTS = -1LL;
1088
1089 if (mPlaylist != NULL && mSeqNumber < 0) {
1090 CHECK_GE(mStartTimeUs, 0LL);
1091
1092 if (mSegmentStartTimeUs < 0) {
1093 if (!mPlaylist->isComplete() && !mPlaylist->isEvent()) {
1094 // this is a live session
1095 initSeqNumberForLiveStream(firstSeqNumberInPlaylist, lastSeqNumberInPlaylist);
1096 } else {
1097 // When seeking mSegmentStartTimeUs is unavailable (< 0), we
1098 // use mStartTimeUs (client supplied timestamp) to determine both start segment
1099 // and relative position inside a segment
1100 mSeqNumber = getSeqNumberForTime(mStartTimeUs);
1101 mStartTimeUs -= getSegmentStartTimeUs(mSeqNumber);
1102 }
1103 mStartTimeUsRelative = true;
1104 FLOGV("Initial sequence number for time %lld is %d from (%d .. %d)",
1105 (long long)mStartTimeUs, mSeqNumber, firstSeqNumberInPlaylist,
1106 lastSeqNumberInPlaylist);
1107 } else {
1108 // When adapting or track switching, mSegmentStartTimeUs (relative
1109 // to media time 0) is used to determine the start segment; mStartTimeUs (absolute
1110 // timestamps coming from the media container) is used to determine the position
1111 // inside a segments.
1112 if (mStreamTypeMask != LiveSession::STREAMTYPE_SUBTITLES
1113 && mSeekMode != LiveSession::kSeekModeNextSample) {
1114 // avoid double fetch/decode
1115 // Use (mSegmentStartTimeUs + 1/2 * targetDurationUs) to search
1116 // for the starting segment in new variant.
1117 // If the two variants' segments are aligned, this gives the
1118 // next segment. If they're not aligned, this gives the segment
1119 // that overlaps no more than 1/2 * targetDurationUs.
1120 mSeqNumber = getSeqNumberForTime(mSegmentStartTimeUs
1121 + mPlaylist->getTargetDuration() / 2);
1122 } else {
1123 mSeqNumber = getSeqNumberForTime(mSegmentStartTimeUs);
1124 }
1125 ssize_t minSeq = getSeqNumberForDiscontinuity(mDiscontinuitySeq);
1126 if (mSeqNumber < minSeq) {
1127 mSeqNumber = minSeq;
1128 }
1129
1130 if (mSeqNumber < firstSeqNumberInPlaylist) {
1131 mSeqNumber = firstSeqNumberInPlaylist;
1132 }
1133
1134 if (mSeqNumber > lastSeqNumberInPlaylist) {
1135 mSeqNumber = lastSeqNumberInPlaylist;
1136 }
1137 FLOGV("Initial sequence number is %d from (%d .. %d)",
1138 mSeqNumber, firstSeqNumberInPlaylist,
1139 lastSeqNumberInPlaylist);
1140 }
1141 }
1142
1143 // if mPlaylist is NULL then err must be non-OK; but the other way around might not be true
1144 if (mSeqNumber < firstSeqNumberInPlaylist
1145 || mSeqNumber > lastSeqNumberInPlaylist
1146 || err != OK) {
1147 if ((err != OK || !mPlaylist->isComplete()) && mNumRetries < kMaxNumRetries) {
1148 ++mNumRetries;
1149
1150 if (mSeqNumber > lastSeqNumberInPlaylist || err != OK) {
1151 // make sure we reach this retry logic on refresh failures
1152 // by adding an err != OK clause to all enclosing if's.
1153
1154 // refresh in increasing fraction (1/2, 1/3, ...) of the
1155 // playlist's target duration or 3 seconds, whichever is less
1156 int64_t delayUs = kMaxMonitorDelayUs;
1157 if (mPlaylist != NULL) {
1158 delayUs = mPlaylist->size() * mPlaylist->getTargetDuration()
1159 / (1 + mNumRetries);
1160 }
1161 if (delayUs > kMaxMonitorDelayUs) {
1162 delayUs = kMaxMonitorDelayUs;
1163 }
1164 FLOGV("sequence number high: %d from (%d .. %d), "
1165 "monitor in %lld (retry=%d)",
1166 mSeqNumber, firstSeqNumberInPlaylist,
1167 lastSeqNumberInPlaylist, (long long)delayUs, mNumRetries);
1168 postMonitorQueue(delayUs);
1169 return false;
1170 }
1171
1172 if (err != OK) {
1173 notifyError(err);
1174 return false;
1175 }
1176
1177 // we've missed the boat, let's start 3 segments prior to the latest sequence
1178 // number available and signal a discontinuity.
1179
1180 ALOGI("We've missed the boat, restarting playback."
1181 " mStartup=%d, was looking for %d in %d-%d",
1182 mStartup, mSeqNumber, firstSeqNumberInPlaylist,
1183 lastSeqNumberInPlaylist);
1184 if (mStopParams != NULL) {
1185 // we should have kept on fetching until we hit the boundaries in mStopParams,
1186 // but since the segments we are supposed to fetch have already rolled off
1187 // the playlist, i.e. we have already missed the boat, we inevitably have to
1188 // skip.
1189 notifyStopReached();
1190 return false;
1191 }
1192 mSeqNumber = lastSeqNumberInPlaylist - 3;
1193 if (mSeqNumber < firstSeqNumberInPlaylist) {
1194 mSeqNumber = firstSeqNumberInPlaylist;
1195 }
1196 discontinuity = true;
1197
1198 // fall through
1199 } else {
1200 if (mPlaylist != NULL) {
1201 if (mSeqNumber >= firstSeqNumberInPlaylist + (int32_t)mPlaylist->size()
1202 && !mPlaylist->isComplete()) {
1203 // Live playlists
1204 ALOGW("sequence number %d not yet available", mSeqNumber);
1205 postMonitorQueue(delayUsToRefreshPlaylist());
1206 return false;
1207 }
1208 ALOGE("Cannot find sequence number %d in playlist "
1209 "(contains %d - %d)",
1210 mSeqNumber, firstSeqNumberInPlaylist,
1211 firstSeqNumberInPlaylist + (int32_t)mPlaylist->size() - 1);
1212
1213 if (mTSParser != NULL) {
1214 mTSParser->signalEOS(ERROR_END_OF_STREAM);
1215 // Use an empty buffer; we don't have any new data, just want to extract
1216 // potential new access units after flush. Reset mSeqNumber to
1217 // lastSeqNumberInPlaylist such that we set the correct access unit
1218 // properties in extractAndQueueAccessUnitsFromTs.
1219 sp<ABuffer> buffer = new ABuffer(0);
1220 mSeqNumber = lastSeqNumberInPlaylist;
1221 extractAndQueueAccessUnitsFromTs(buffer);
1222 }
1223 notifyError(ERROR_END_OF_STREAM);
1224 } else {
1225 // It's possible that we were never able to download the playlist.
1226 // In this case we should notify error, instead of EOS, as EOS during
1227 // prepare means we succeeded in downloading everything.
1228 ALOGE("Failed to download playlist!");
1229 notifyError(ERROR_IO);
1230 }
1231
1232 return false;
1233 }
1234 }
1235
1236 mNumRetries = 0;
1237
1238 CHECK(mPlaylist->itemAt(
1239 mSeqNumber - firstSeqNumberInPlaylist,
1240 &uri,
1241 &itemMeta));
1242
1243 CHECK(itemMeta->findInt32("discontinuity-sequence", &mDiscontinuitySeq));
1244
1245 int32_t val;
1246 if (itemMeta->findInt32("discontinuity", &val) && val != 0) {
1247 discontinuity = true;
1248 } else if (mLastDiscontinuitySeq >= 0
1249 && mDiscontinuitySeq != mLastDiscontinuitySeq) {
1250 // Seek jumped to a new discontinuity sequence. We need to signal
1251 // a format change to decoder. Decoder needs to shutdown and be
1252 // created again if seamless format change is unsupported.
1253 FLOGV("saw discontinuity: mStartup %d, mLastDiscontinuitySeq %d, "
1254 "mDiscontinuitySeq %d, mStartTimeUs %lld",
1255 mStartup, mLastDiscontinuitySeq, mDiscontinuitySeq, (long long)mStartTimeUs);
1256 discontinuity = true;
1257 }
1258 mLastDiscontinuitySeq = -1;
1259
1260 // decrypt a junk buffer to prefetch key; since a session uses only one http connection,
1261 // this avoids interleaved connections to the key and segment file.
1262 {
1263 sp<ABuffer> junk = new ABuffer(16);
1264 junk->setRange(0, 16);
1265 status_t err = decryptBuffer(mSeqNumber - firstSeqNumberInPlaylist, junk,
1266 true /* first */);
1267 if (err == ERROR_NOT_CONNECTED) {
1268 return false;
1269 } else if (err != OK) {
1270 notifyError(err);
1271 return false;
1272 }
1273 }
1274
1275 if ((mStartup && !mTimeChangeSignaled) || discontinuity) {
1276 // We need to signal a time discontinuity to ATSParser on the
1277 // first segment after start, or on a discontinuity segment.
1278 // Setting mNextPTSTimeUs informs extractAndQueueAccessUnitsXX()
1279 // to send the time discontinuity.
1280 if (mPlaylist->isComplete() || mPlaylist->isEvent()) {
1281 // If this was a live event this made no sense since
1282 // we don't have access to all the segment before the current
1283 // one.
1284 mNextPTSTimeUs = getSegmentStartTimeUs(mSeqNumber);
1285 }
1286
1287 // Setting mTimeChangeSignaled to true, so that if start time
1288 // searching goes into 2nd segment (without a discontinuity),
1289 // we don't reset time again. It causes corruption when pending
1290 // data in ATSParser is cleared.
1291 mTimeChangeSignaled = true;
1292 }
1293
1294 if (discontinuity) {
1295 ALOGI("queueing discontinuity (explicit=%d)", discontinuity);
1296
1297 // Signal a format discontinuity to ATSParser to clear partial data
1298 // from previous streams. Not doing this causes bitstream corruption.
1299 if (mTSParser != NULL) {
1300 mTSParser.clear();
1301 }
1302
1303 queueDiscontinuity(
1304 ATSParser::DISCONTINUITY_FORMAT_ONLY,
1305 NULL /* extra */);
1306
1307 if (mStartup && mStartTimeUsRelative && mFirstPTSValid) {
1308 // This means we guessed mStartTimeUs to be in the previous
1309 // segment (likely very close to the end), but either video or
1310 // audio has not found start by the end of that segment.
1311 //
1312 // If this new segment is not a discontinuity, keep searching.
1313 //
1314 // If this new segment even got a discontinuity marker, just
1315 // set mStartTimeUs=0, and take all samples from now on.
1316 mStartTimeUs = 0;
1317 mFirstPTSValid = false;
1318 mIDRFound = false;
1319 mVideoBuffer->clear();
1320 }
1321 }
1322
1323 FLOGV("fetching segment %d from (%d .. %d)",
1324 mSeqNumber, firstSeqNumberInPlaylist, lastSeqNumberInPlaylist);
1325 return true;
1326 }
1327
onDownloadNext()1328 void PlaylistFetcher::onDownloadNext() {
1329 AString uri;
1330 sp<AMessage> itemMeta;
1331 sp<ABuffer> buffer;
1332 sp<ABuffer> tsBuffer;
1333 int32_t firstSeqNumberInPlaylist = 0;
1334 int32_t lastSeqNumberInPlaylist = 0;
1335 bool connectHTTP = true;
1336
1337 if (mDownloadState->hasSavedState()) {
1338 mDownloadState->restoreState(
1339 uri,
1340 itemMeta,
1341 buffer,
1342 tsBuffer,
1343 firstSeqNumberInPlaylist,
1344 lastSeqNumberInPlaylist);
1345 connectHTTP = false;
1346 FLOGV("resuming: '%s'", uri.c_str());
1347 } else {
1348 if (!initDownloadState(
1349 uri,
1350 itemMeta,
1351 firstSeqNumberInPlaylist,
1352 lastSeqNumberInPlaylist)) {
1353 return;
1354 }
1355 FLOGV("fetching: '%s'", uri.c_str());
1356 }
1357
1358 int64_t range_offset, range_length;
1359 if (!itemMeta->findInt64("range-offset", &range_offset)
1360 || !itemMeta->findInt64("range-length", &range_length)) {
1361 range_offset = 0;
1362 range_length = -1;
1363 }
1364
1365 // block-wise download
1366 bool shouldPause = false;
1367 ssize_t bytesRead;
1368 do {
1369 int64_t startUs = ALooper::GetNowUs();
1370 bytesRead = mHTTPDownloader->fetchBlock(
1371 uri.c_str(), &buffer, range_offset, range_length, kDownloadBlockSize,
1372 NULL /* actualURL */, connectHTTP);
1373 int64_t delayUs = ALooper::GetNowUs() - startUs;
1374
1375 if (bytesRead == ERROR_NOT_CONNECTED) {
1376 return;
1377 }
1378 if (bytesRead < 0) {
1379 status_t err = bytesRead;
1380 ALOGE("failed to fetch .ts segment at url '%s'", uriDebugString(uri).c_str());
1381 notifyError(err);
1382 return;
1383 }
1384
1385 // add sample for bandwidth estimation, excluding samples from subtitles (as
1386 // its too small), or during startup/resumeUntil (when we could have more than
1387 // one connection open which affects bandwidth)
1388 if (!mStartup && mStopParams == NULL && bytesRead > 0
1389 && (mStreamTypeMask
1390 & (LiveSession::STREAMTYPE_AUDIO
1391 | LiveSession::STREAMTYPE_VIDEO))) {
1392 mSession->addBandwidthMeasurement(bytesRead, delayUs);
1393 if (delayUs > 2000000LL) {
1394 FLOGV("bytesRead %zd took %.2f seconds - abnormal bandwidth dip",
1395 bytesRead, (double)delayUs / 1.0e6);
1396 }
1397 }
1398
1399 connectHTTP = false;
1400
1401 CHECK(buffer != NULL);
1402
1403 size_t size = buffer->size();
1404 // Set decryption range.
1405 buffer->setRange(size - bytesRead, bytesRead);
1406 status_t err = decryptBuffer(mSeqNumber - firstSeqNumberInPlaylist, buffer,
1407 buffer->offset() == 0 /* first */);
1408 // Unset decryption range.
1409 buffer->setRange(0, size);
1410
1411 if (err != OK) {
1412 ALOGE("decryptBuffer failed w/ error %d", err);
1413
1414 notifyError(err);
1415 return;
1416 }
1417
1418 bool startUp = mStartup; // save current start up state
1419
1420 err = OK;
1421 if (bufferStartsWithTsSyncByte(buffer)) {
1422 // Incremental extraction is only supported for MPEG2 transport streams.
1423 if (tsBuffer == NULL) {
1424 tsBuffer = new ABuffer(buffer->data(), buffer->capacity());
1425 tsBuffer->setRange(0, 0);
1426 } else if (tsBuffer->capacity() != buffer->capacity()) {
1427 size_t tsOff = tsBuffer->offset(), tsSize = tsBuffer->size();
1428 tsBuffer = new ABuffer(buffer->data(), buffer->capacity());
1429 tsBuffer->setRange(tsOff, tsSize);
1430 }
1431 tsBuffer->setRange(tsBuffer->offset(), tsBuffer->size() + bytesRead);
1432 err = extractAndQueueAccessUnitsFromTs(tsBuffer);
1433 }
1434
1435 if (err == -EAGAIN) {
1436 // starting sequence number too low/high
1437 mTSParser.clear();
1438 for (size_t i = 0; i < mPacketSources.size(); i++) {
1439 sp<AnotherPacketSource> packetSource = mPacketSources.valueAt(i);
1440 packetSource->clear();
1441 }
1442 postMonitorQueue();
1443 return;
1444 } else if (err == ERROR_OUT_OF_RANGE) {
1445 // reached stopping point
1446 notifyStopReached();
1447 return;
1448 } else if (err != OK) {
1449 notifyError(err);
1450 return;
1451 }
1452 // If we're switching, post start notification
1453 // this should only be posted when the last chunk is full processed by TSParser
1454 if (mSeekMode != LiveSession::kSeekModeExactPosition && startUp != mStartup) {
1455 CHECK(mStartTimeUsNotify != NULL);
1456 mStartTimeUsNotify->post();
1457 mStartTimeUsNotify.clear();
1458 shouldPause = true;
1459 }
1460 if (shouldPause || shouldPauseDownload()) {
1461 // save state and return if this is not the last chunk,
1462 // leaving the fetcher in paused state.
1463 if (bytesRead != 0) {
1464 mDownloadState->saveState(
1465 uri,
1466 itemMeta,
1467 buffer,
1468 tsBuffer,
1469 firstSeqNumberInPlaylist,
1470 lastSeqNumberInPlaylist);
1471 return;
1472 }
1473 shouldPause = true;
1474 }
1475 } while (bytesRead != 0);
1476
1477 if (bufferStartsWithTsSyncByte(buffer)) {
1478 // If we don't see a stream in the program table after fetching a full ts segment
1479 // mark it as nonexistent.
1480 ATSParser::SourceType srcTypes[] =
1481 { ATSParser::VIDEO, ATSParser::AUDIO };
1482 LiveSession::StreamType streamTypes[] =
1483 { LiveSession::STREAMTYPE_VIDEO, LiveSession::STREAMTYPE_AUDIO };
1484 const size_t kNumTypes = NELEM(srcTypes);
1485
1486 for (size_t i = 0; i < kNumTypes; i++) {
1487 ATSParser::SourceType srcType = srcTypes[i];
1488 LiveSession::StreamType streamType = streamTypes[i];
1489
1490 sp<AnotherPacketSource> source =
1491 static_cast<AnotherPacketSource *>(
1492 mTSParser->getSource(srcType).get());
1493
1494 if (!mTSParser->hasSource(srcType)) {
1495 ALOGW("MPEG2 Transport stream does not contain %s data.",
1496 srcType == ATSParser::VIDEO ? "video" : "audio");
1497
1498 mStreamTypeMask &= ~streamType;
1499 mPacketSources.removeItem(streamType);
1500 }
1501 }
1502
1503 }
1504
1505 if (checkDecryptPadding(buffer) != OK) {
1506 ALOGE("Incorrect padding bytes after decryption.");
1507 notifyError(ERROR_MALFORMED);
1508 return;
1509 }
1510
1511 if (tsBuffer != NULL) {
1512 AString method;
1513 CHECK(buffer->meta()->findString("cipher-method", &method));
1514 if ((tsBuffer->size() > 0 && method == "NONE")
1515 || tsBuffer->size() > 16) {
1516 ALOGE("MPEG2 transport stream is not an even multiple of 188 "
1517 "bytes in length.");
1518 notifyError(ERROR_MALFORMED);
1519 return;
1520 }
1521 }
1522
1523 // bulk extract non-ts files
1524 bool startUp = mStartup;
1525 if (tsBuffer == NULL) {
1526 status_t err = extractAndQueueAccessUnits(buffer, itemMeta);
1527 if (err == -EAGAIN) {
1528 // starting sequence number too low/high
1529 postMonitorQueue();
1530 return;
1531 } else if (err == ERROR_OUT_OF_RANGE) {
1532 // reached stopping point
1533 notifyStopReached();
1534 return;
1535 } else if (err != OK) {
1536 notifyError(err);
1537 return;
1538 }
1539 }
1540
1541 ++mSeqNumber;
1542
1543 // if adapting, pause after found the next starting point
1544 if (mSeekMode != LiveSession::kSeekModeExactPosition && startUp != mStartup) {
1545 CHECK(mStartTimeUsNotify != NULL);
1546 mStartTimeUsNotify->post();
1547 mStartTimeUsNotify.clear();
1548 shouldPause = true;
1549 }
1550
1551 if (!shouldPause) {
1552 postMonitorQueue();
1553 }
1554 }
1555
1556 /*
1557 * returns true if we need to adjust mSeqNumber
1558 */
adjustSeqNumberWithAnchorTime(int64_t anchorTimeUs)1559 bool PlaylistFetcher::adjustSeqNumberWithAnchorTime(int64_t anchorTimeUs) {
1560 int32_t firstSeqNumberInPlaylist = mPlaylist->getFirstSeqNumber();
1561
1562 int64_t minDiffUs, maxDiffUs;
1563 if (mSeekMode == LiveSession::kSeekModeNextSample) {
1564 // if the previous fetcher paused in the middle of a segment, we
1565 // want to start at a segment that overlaps the last sample
1566 minDiffUs = -mPlaylist->getTargetDuration();
1567 maxDiffUs = 0LL;
1568 } else {
1569 // if the previous fetcher paused at the end of a segment, ideally
1570 // we want to start at the segment that's roughly aligned with its
1571 // next segment, but if the two variants are not well aligned we
1572 // adjust the diff to within (-T/2, T/2)
1573 minDiffUs = -mPlaylist->getTargetDuration() / 2;
1574 maxDiffUs = mPlaylist->getTargetDuration() / 2;
1575 }
1576
1577 int32_t oldSeqNumber = mSeqNumber;
1578 ssize_t index = mSeqNumber - firstSeqNumberInPlaylist;
1579
1580 // adjust anchorTimeUs to within (minDiffUs, maxDiffUs) from mStartTimeUs
1581 int64_t diffUs = anchorTimeUs - mStartTimeUs;
1582 if (diffUs > maxDiffUs) {
1583 while (index > 0 && diffUs > maxDiffUs) {
1584 --index;
1585
1586 sp<AMessage> itemMeta;
1587 CHECK(mPlaylist->itemAt(index, NULL /* uri */, &itemMeta));
1588
1589 int64_t itemDurationUs;
1590 CHECK(itemMeta->findInt64("durationUs", &itemDurationUs));
1591
1592 diffUs -= itemDurationUs;
1593 }
1594 } else if (diffUs < minDiffUs) {
1595 while (index + 1 < (ssize_t) mPlaylist->size()
1596 && diffUs < minDiffUs) {
1597 ++index;
1598
1599 sp<AMessage> itemMeta;
1600 CHECK(mPlaylist->itemAt(index, NULL /* uri */, &itemMeta));
1601
1602 int64_t itemDurationUs;
1603 CHECK(itemMeta->findInt64("durationUs", &itemDurationUs));
1604
1605 diffUs += itemDurationUs;
1606 }
1607 }
1608
1609 mSeqNumber = firstSeqNumberInPlaylist + index;
1610
1611 if (mSeqNumber != oldSeqNumber) {
1612 FLOGV("guessed wrong seg number: diff %lld out of [%lld, %lld]",
1613 (long long) anchorTimeUs - mStartTimeUs,
1614 (long long) minDiffUs,
1615 (long long) maxDiffUs);
1616 return true;
1617 }
1618 return false;
1619 }
1620
getSeqNumberForDiscontinuity(size_t discontinuitySeq) const1621 int32_t PlaylistFetcher::getSeqNumberForDiscontinuity(size_t discontinuitySeq) const {
1622 int32_t firstSeqNumberInPlaylist = mPlaylist->getFirstSeqNumber();
1623
1624 size_t index = 0;
1625 while (index < mPlaylist->size()) {
1626 sp<AMessage> itemMeta;
1627 CHECK(mPlaylist->itemAt( index, NULL /* uri */, &itemMeta));
1628 size_t curDiscontinuitySeq;
1629 CHECK(itemMeta->findInt32("discontinuity-sequence", (int32_t *)&curDiscontinuitySeq));
1630 int32_t seqNumber = firstSeqNumberInPlaylist + index;
1631 if (curDiscontinuitySeq == discontinuitySeq) {
1632 return seqNumber;
1633 } else if (curDiscontinuitySeq > discontinuitySeq) {
1634 return seqNumber <= 0 ? 0 : seqNumber - 1;
1635 }
1636
1637 ++index;
1638 }
1639
1640 return firstSeqNumberInPlaylist + mPlaylist->size();
1641 }
1642
getSeqNumberForTime(int64_t timeUs) const1643 int32_t PlaylistFetcher::getSeqNumberForTime(int64_t timeUs) const {
1644 size_t index = 0;
1645 int64_t segmentStartUs = 0;
1646 while (index < mPlaylist->size()) {
1647 sp<AMessage> itemMeta;
1648 CHECK(mPlaylist->itemAt(
1649 index, NULL /* uri */, &itemMeta));
1650
1651 int64_t itemDurationUs;
1652 CHECK(itemMeta->findInt64("durationUs", &itemDurationUs));
1653
1654 if (timeUs < segmentStartUs + itemDurationUs) {
1655 break;
1656 }
1657
1658 segmentStartUs += itemDurationUs;
1659 ++index;
1660 }
1661
1662 if (index >= mPlaylist->size()) {
1663 index = mPlaylist->size() - 1;
1664 }
1665
1666 return mPlaylist->getFirstSeqNumber() + index;
1667 }
1668
// Stamps |accessUnit|'s meta with the properties of the segment being
// fetched: the source's format (when available), an optional "discard"
// flag, the discontinuity sequence, segment timing information and, for
// playlists that are neither complete nor event playlists, the playlist
// time. Returns |accessUnit| itself.
const sp<ABuffer> &PlaylistFetcher::setAccessUnitProperties(
        const sp<ABuffer> &accessUnit, const sp<AnotherPacketSource> &source, bool discard) {
    sp<MetaData> sourceFormat = source->getFormat();
    const sp<AMessage> unitMeta = accessUnit->meta();

    if (sourceFormat != NULL) {
        // for simplicity, store a reference to the format in each unit
        unitMeta->setObject("format", sourceFormat);
    }

    if (discard) {
        unitMeta->setInt32("discard", discard);
    }

    unitMeta->setInt32("discontinuitySeq", mDiscontinuitySeq);
    unitMeta->setInt64("segmentStartTimeUs", getSegmentStartTimeUs(mSeqNumber));
    unitMeta->setInt64("segmentFirstTimeUs", mSegmentFirstPTS);
    unitMeta->setInt64("segmentDurationUs", getSegmentDurationUs(mSeqNumber));

    const bool completeOrEvent = mPlaylist->isComplete() || mPlaylist->isEvent();
    if (!completeOrEvent) {
        unitMeta->setInt64("playlistTimeUs", mPlaylistTimeUs);
    }

    return accessUnit;
}
1690
isStartTimeReached(int64_t timeUs)1691 bool PlaylistFetcher::isStartTimeReached(int64_t timeUs) {
1692 if (!mFirstPTSValid) {
1693 mFirstTimeUs = timeUs;
1694 mFirstPTSValid = true;
1695 }
1696 bool startTimeReached = true;
1697 if (mStartTimeUsRelative) {
1698 FLOGV("startTimeUsRelative, timeUs (%lld) - %lld = %lld",
1699 (long long)timeUs,
1700 (long long)mFirstTimeUs,
1701 (long long)(timeUs - mFirstTimeUs));
1702 timeUs -= mFirstTimeUs;
1703 if (timeUs < 0) {
1704 FLOGV("clamp negative timeUs to 0");
1705 timeUs = 0;
1706 }
1707 startTimeReached = (timeUs >= mStartTimeUs);
1708 }
1709 return startTimeReached;
1710 }
1711
extractAndQueueAccessUnitsFromTs(const sp<ABuffer> & buffer)1712 status_t PlaylistFetcher::extractAndQueueAccessUnitsFromTs(const sp<ABuffer> &buffer) {
1713 if (mTSParser == NULL) {
1714 // Use TS_TIMESTAMPS_ARE_ABSOLUTE so pts carry over between fetchers.
1715 mTSParser = new ATSParser(ATSParser::TS_TIMESTAMPS_ARE_ABSOLUTE);
1716 }
1717
1718 if (mNextPTSTimeUs >= 0LL) {
1719 sp<AMessage> extra = new AMessage;
1720 // Since we are using absolute timestamps, signal an offset of 0 to prevent
1721 // ATSParser from skewing the timestamps of access units.
1722 extra->setInt64(kATSParserKeyMediaTimeUs, 0);
1723
1724 // When adapting, signal a recent media time to the parser,
1725 // so that PTS wrap around is handled for the new variant.
1726 if (mStartTimeUs >= 0 && !mStartTimeUsRelative) {
1727 extra->setInt64(kATSParserKeyRecentMediaTimeUs, mStartTimeUs);
1728 }
1729
1730 mTSParser->signalDiscontinuity(
1731 ATSParser::DISCONTINUITY_TIME, extra);
1732
1733 mNextPTSTimeUs = -1LL;
1734 }
1735
1736 if (mSampleAesKeyItemChanged) {
1737 mTSParser->signalNewSampleAesKey(mSampleAesKeyItem);
1738 mSampleAesKeyItemChanged = false;
1739 }
1740
1741 size_t offset = 0;
1742 while (offset + 188 <= buffer->size()) {
1743 status_t err = mTSParser->feedTSPacket(buffer->data() + offset, 188);
1744
1745 if (err != OK) {
1746 return err;
1747 }
1748
1749 offset += 188;
1750 }
1751 // setRange to indicate consumed bytes.
1752 buffer->setRange(buffer->offset() + offset, buffer->size() - offset);
1753
1754 if (mSegmentFirstPTS < 0LL) {
1755 // get the smallest first PTS from all streams present in this parser
1756 for (size_t i = mPacketSources.size(); i > 0;) {
1757 i--;
1758 const LiveSession::StreamType stream = mPacketSources.keyAt(i);
1759 if (stream == LiveSession::STREAMTYPE_SUBTITLES) {
1760 ALOGE("MPEG2 Transport streams do not contain subtitles.");
1761 return ERROR_MALFORMED;
1762 }
1763 if (stream == LiveSession::STREAMTYPE_METADATA) {
1764 continue;
1765 }
1766 ATSParser::SourceType type =LiveSession::getSourceTypeForStream(stream);
1767 sp<AnotherPacketSource> source =
1768 static_cast<AnotherPacketSource *>(
1769 mTSParser->getSource(type).get());
1770
1771 if (source == NULL) {
1772 continue;
1773 }
1774 sp<AMessage> meta = source->getMetaAfterLastDequeued(0);
1775 if (meta != NULL) {
1776 int64_t timeUs;
1777 CHECK(meta->findInt64("timeUs", &timeUs));
1778 if (mSegmentFirstPTS < 0LL || timeUs < mSegmentFirstPTS) {
1779 mSegmentFirstPTS = timeUs;
1780 }
1781 }
1782 }
1783 if (mSegmentFirstPTS < 0LL) {
1784 // didn't find any TS packet, can return early
1785 return OK;
1786 }
1787 if (!mStartTimeUsRelative) {
1788 // mStartup
1789 // mStartup is true until we have queued a packet for all the streams
1790 // we are fetching. We queue packets whose timestamps are greater than
1791 // mStartTimeUs.
1792 // mSegmentStartTimeUs >= 0
1793 // mSegmentStartTimeUs is non-negative when adapting or switching tracks
1794 // adjustSeqNumberWithAnchorTime(timeUs) == true
1795 // we guessed a seq number that's either too large or too small.
1796 // If this happens, we'll adjust mSeqNumber and restart fetching from new
1797 // location. Note that we only want to adjust once, so set mSegmentStartTimeUs
1798 // to -1 so that we don't enter this chunk next time.
1799 if (mStartup && mSegmentStartTimeUs >= 0
1800 && adjustSeqNumberWithAnchorTime(mSegmentFirstPTS)) {
1801 mStartTimeUsNotify = mNotify->dup();
1802 mStartTimeUsNotify->setInt32("what", kWhatStartedAt);
1803 mStartTimeUsNotify->setString("uri", mURI);
1804 mIDRFound = false;
1805 mSegmentStartTimeUs = -1;
1806 return -EAGAIN;
1807 }
1808 }
1809 }
1810
1811 status_t err = OK;
1812 for (size_t i = mPacketSources.size(); i > 0;) {
1813 i--;
1814 sp<AnotherPacketSource> packetSource = mPacketSources.valueAt(i);
1815
1816 const LiveSession::StreamType stream = mPacketSources.keyAt(i);
1817 if (stream == LiveSession::STREAMTYPE_SUBTITLES) {
1818 ALOGE("MPEG2 Transport streams do not contain subtitles.");
1819 return ERROR_MALFORMED;
1820 }
1821
1822 const char *key = LiveSession::getKeyForStream(stream);
1823 ATSParser::SourceType type =LiveSession::getSourceTypeForStream(stream);
1824
1825 sp<AnotherPacketSource> source =
1826 static_cast<AnotherPacketSource *>(
1827 mTSParser->getSource(type).get());
1828
1829 if (source == NULL) {
1830 continue;
1831 }
1832
1833 const char *mime;
1834 sp<MetaData> format = source->getFormat();
1835 bool isAvc = format != NULL && format->findCString(kKeyMIMEType, &mime)
1836 && !strcasecmp(mime, MEDIA_MIMETYPE_VIDEO_AVC);
1837
1838 sp<ABuffer> accessUnit;
1839 status_t finalResult;
1840 while (source->hasBufferAvailable(&finalResult)
1841 && source->dequeueAccessUnit(&accessUnit) == OK) {
1842
1843 int64_t timeUs;
1844 CHECK(accessUnit->meta()->findInt64("timeUs", &timeUs));
1845
1846 if (mStartup) {
1847 bool startTimeReached = isStartTimeReached(timeUs);
1848
1849 if (!startTimeReached || (isAvc && !mIDRFound)) {
1850 // buffer up to the closest preceding IDR frame in the next segement,
1851 // or the closest succeeding IDR frame after the exact position
1852 FSLOGV(stream, "timeUs(%lld)-mStartTimeUs(%lld)=%lld, mIDRFound=%d",
1853 (long long)timeUs,
1854 (long long)mStartTimeUs,
1855 (long long)timeUs - mStartTimeUs,
1856 mIDRFound);
1857 if (isAvc) {
1858 if (IsIDR(accessUnit->data(), accessUnit->size())) {
1859 mVideoBuffer->clear();
1860 FSLOGV(stream, "found IDR, clear mVideoBuffer");
1861 mIDRFound = true;
1862 }
1863 if (mIDRFound && mStartTimeUsRelative && !startTimeReached) {
1864 mVideoBuffer->queueAccessUnit(accessUnit);
1865 FSLOGV(stream, "saving AVC video AccessUnit");
1866 }
1867 }
1868 if (!startTimeReached || (isAvc && !mIDRFound)) {
1869 continue;
1870 }
1871 }
1872 }
1873
1874 if (mStartTimeUsNotify != NULL) {
1875 uint32_t streamMask = 0;
1876 mStartTimeUsNotify->findInt32("streamMask", (int32_t *) &streamMask);
1877 if ((mStreamTypeMask & mPacketSources.keyAt(i))
1878 && !(streamMask & mPacketSources.keyAt(i))) {
1879 streamMask |= mPacketSources.keyAt(i);
1880 mStartTimeUsNotify->setInt32("streamMask", streamMask);
1881 FSLOGV(stream, "found start point, timeUs=%lld, streamMask becomes %x",
1882 (long long)timeUs, streamMask);
1883
1884 if (streamMask == mStreamTypeMask) {
1885 FLOGV("found start point for all streams");
1886 mStartup = false;
1887 }
1888 }
1889 }
1890
1891 if (mStopParams != NULL) {
1892 int32_t discontinuitySeq;
1893 int64_t stopTimeUs;
1894 if (!mStopParams->findInt32("discontinuitySeq", &discontinuitySeq)
1895 || discontinuitySeq > mDiscontinuitySeq
1896 || !mStopParams->findInt64(key, &stopTimeUs)
1897 || (discontinuitySeq == mDiscontinuitySeq
1898 && timeUs >= stopTimeUs)) {
1899 FSLOGV(stream, "reached stop point, timeUs=%lld", (long long)timeUs);
1900 mStreamTypeMask &= ~stream;
1901 mPacketSources.removeItemsAt(i);
1902 break;
1903 }
1904 }
1905
1906 if (stream == LiveSession::STREAMTYPE_VIDEO) {
1907 const bool discard = true;
1908 status_t status;
1909 while (mVideoBuffer->hasBufferAvailable(&status)) {
1910 sp<ABuffer> videoBuffer;
1911 mVideoBuffer->dequeueAccessUnit(&videoBuffer);
1912 setAccessUnitProperties(videoBuffer, source, discard);
1913 packetSource->queueAccessUnit(videoBuffer);
1914 int64_t bufferTimeUs;
1915 CHECK(videoBuffer->meta()->findInt64("timeUs", &bufferTimeUs));
1916 FSLOGV(stream, "queueAccessUnit (saved), timeUs=%lld",
1917 (long long)bufferTimeUs);
1918 }
1919 } else if (stream == LiveSession::STREAMTYPE_METADATA && !mHasMetadata) {
1920 mHasMetadata = true;
1921 sp<AMessage> notify = mNotify->dup();
1922 notify->setInt32("what", kWhatMetadataDetected);
1923 notify->post();
1924 }
1925
1926 setAccessUnitProperties(accessUnit, source);
1927 packetSource->queueAccessUnit(accessUnit);
1928 FSLOGV(stream, "queueAccessUnit, timeUs=%lld", (long long)timeUs);
1929 }
1930
1931 if (err != OK) {
1932 break;
1933 }
1934 }
1935
1936 if (err != OK) {
1937 for (size_t i = mPacketSources.size(); i > 0;) {
1938 i--;
1939 sp<AnotherPacketSource> packetSource = mPacketSources.valueAt(i);
1940 packetSource->clear();
1941 }
1942 return err;
1943 }
1944
1945 if (!mStreamTypeMask) {
1946 // Signal gap is filled between original and new stream.
1947 FLOGV("reached stop point for all streams");
1948 return ERROR_OUT_OF_RANGE;
1949 }
1950
1951 return OK;
1952 }
1953
1954 /* static */
bufferStartsWithWebVTTMagicSequence(const sp<ABuffer> & buffer)1955 bool PlaylistFetcher::bufferStartsWithWebVTTMagicSequence(
1956 const sp<ABuffer> &buffer) {
1957 size_t pos = 0;
1958
1959 // skip possible BOM
1960 if (buffer->size() >= pos + 3 &&
1961 !memcmp("\xef\xbb\xbf", buffer->data() + pos, 3)) {
1962 pos += 3;
1963 }
1964
1965 // accept WEBVTT followed by SPACE, TAB or (CR) LF
1966 if (buffer->size() < pos + 6 ||
1967 memcmp("WEBVTT", buffer->data() + pos, 6)) {
1968 return false;
1969 }
1970 pos += 6;
1971
1972 if (buffer->size() == pos) {
1973 return true;
1974 }
1975
1976 uint8_t sep = buffer->data()[pos];
1977 return sep == ' ' || sep == '\t' || sep == '\n' || sep == '\r';
1978 }
1979
extractAndQueueAccessUnits(const sp<ABuffer> & buffer,const sp<AMessage> & itemMeta)1980 status_t PlaylistFetcher::extractAndQueueAccessUnits(
1981 const sp<ABuffer> &buffer, const sp<AMessage> &itemMeta) {
1982 if (bufferStartsWithWebVTTMagicSequence(buffer)) {
1983 if (mStreamTypeMask != LiveSession::STREAMTYPE_SUBTITLES) {
1984 ALOGE("This stream only contains subtitles.");
1985 return ERROR_MALFORMED;
1986 }
1987
1988 const sp<AnotherPacketSource> packetSource =
1989 mPacketSources.valueFor(LiveSession::STREAMTYPE_SUBTITLES);
1990
1991 int64_t durationUs;
1992 CHECK(itemMeta->findInt64("durationUs", &durationUs));
1993 buffer->meta()->setInt64("timeUs", getSegmentStartTimeUs(mSeqNumber));
1994 buffer->meta()->setInt64("durationUs", durationUs);
1995 buffer->meta()->setInt64("segmentStartTimeUs", getSegmentStartTimeUs(mSeqNumber));
1996 buffer->meta()->setInt32("discontinuitySeq", mDiscontinuitySeq);
1997 buffer->meta()->setInt32("subtitleGeneration", mSubtitleGeneration);
1998 packetSource->queueAccessUnit(buffer);
1999 return OK;
2000 }
2001
2002 if (mNextPTSTimeUs >= 0LL) {
2003 mNextPTSTimeUs = -1LL;
2004 }
2005
2006 // This better be an ISO 13818-7 (AAC) or ISO 13818-1 (MPEG) audio
2007 // stream prefixed by an ID3 tag.
2008
2009 bool firstID3Tag = true;
2010 uint64_t PTS = 0;
2011
2012 for (;;) {
2013 // Make sure to skip all ID3 tags preceding the audio data.
2014 // At least one must be present to provide the PTS timestamp.
2015
2016 ID3 id3(buffer->data(), buffer->size(), true /* ignoreV1 */);
2017 if (!id3.isValid()) {
2018 if (firstID3Tag) {
2019 ALOGE("Unable to parse ID3 tag.");
2020 return ERROR_MALFORMED;
2021 } else {
2022 break;
2023 }
2024 }
2025
2026 if (firstID3Tag) {
2027 bool found = false;
2028
2029 ID3::Iterator it(id3, "PRIV");
2030 while (!it.done()) {
2031 size_t length;
2032 const uint8_t *data = it.getData(&length);
2033 if (!data) {
2034 return ERROR_MALFORMED;
2035 }
2036
2037 static const char *kMatchName =
2038 "com.apple.streaming.transportStreamTimestamp";
2039 static const size_t kMatchNameLen = strlen(kMatchName);
2040
2041 if (length == kMatchNameLen + 1 + 8
2042 && !strncmp((const char *)data, kMatchName, kMatchNameLen)) {
2043 found = true;
2044 PTS = U64_AT(&data[kMatchNameLen + 1]);
2045 }
2046
2047 it.next();
2048 }
2049
2050 if (!found) {
2051 ALOGE("Unable to extract transportStreamTimestamp from ID3 tag.");
2052 return ERROR_MALFORMED;
2053 }
2054 }
2055
2056 // skip the ID3 tag
2057 buffer->setRange(
2058 buffer->offset() + id3.rawSize(), buffer->size() - id3.rawSize());
2059
2060 firstID3Tag = false;
2061 }
2062
2063 if (mStreamTypeMask != LiveSession::STREAMTYPE_AUDIO) {
2064 ALOGW("This stream only contains audio data!");
2065
2066 mStreamTypeMask &= LiveSession::STREAMTYPE_AUDIO;
2067
2068 if (mStreamTypeMask == 0) {
2069 return OK;
2070 }
2071 }
2072
2073 sp<AnotherPacketSource> packetSource =
2074 mPacketSources.valueFor(LiveSession::STREAMTYPE_AUDIO);
2075
2076 if (packetSource->getFormat() == NULL && buffer->size() >= 7) {
2077 ABitReader bits(buffer->data(), buffer->size());
2078
2079 // adts_fixed_header
2080
2081 CHECK_EQ(bits.getBits(12), 0xfffu);
2082 bits.skipBits(3); // ID, layer
2083 bool protection_absent __unused = bits.getBits(1) != 0;
2084
2085 unsigned profile = bits.getBits(2);
2086 CHECK_NE(profile, 3u);
2087 unsigned sampling_freq_index = bits.getBits(4);
2088 bits.getBits(1); // private_bit
2089 unsigned channel_configuration = bits.getBits(3);
2090 CHECK_NE(channel_configuration, 0u);
2091 bits.skipBits(2); // original_copy, home
2092
2093 sp<MetaData> meta = new MetaData();
2094 MakeAACCodecSpecificData(*meta,
2095 profile, sampling_freq_index, channel_configuration);
2096
2097 meta->setInt32(kKeyIsADTS, true);
2098
2099 packetSource->setFormat(meta);
2100 }
2101
2102 int64_t numSamples = 0LL;
2103 int32_t sampleRate;
2104 CHECK(packetSource->getFormat()->findInt32(kKeySampleRate, &sampleRate));
2105
2106 int64_t timeUs = (PTS * 100LL) / 9LL;
2107 if (mStartup && !mFirstPTSValid) {
2108 mFirstPTSValid = true;
2109 mFirstTimeUs = timeUs;
2110 }
2111
2112 if (mSegmentFirstPTS < 0LL) {
2113 mSegmentFirstPTS = timeUs;
2114 if (!mStartTimeUsRelative) {
2115 // Duplicated logic from how we handle .ts playlists.
2116 if (mStartup && mSegmentStartTimeUs >= 0
2117 && adjustSeqNumberWithAnchorTime(timeUs)) {
2118 mSegmentStartTimeUs = -1;
2119 return -EAGAIN;
2120 }
2121 }
2122 }
2123
2124 sp<HlsSampleDecryptor> sampleDecryptor = NULL;
2125 if (mSampleAesKeyItem != NULL) {
2126 ALOGV("extractAndQueueAccessUnits[%d] SampleAesKeyItem: Key: %s IV: %s",
2127 mSeqNumber,
2128 HlsSampleDecryptor::aesBlockToStr(mKeyData).c_str(),
2129 HlsSampleDecryptor::aesBlockToStr(mAESInitVec).c_str());
2130
2131 sampleDecryptor = new HlsSampleDecryptor(mSampleAesKeyItem);
2132 }
2133
2134 int frameId = 0;
2135
2136 size_t offset = 0;
2137 while (offset < buffer->size()) {
2138 const uint8_t *adtsHeader = buffer->data() + offset;
2139 if (buffer->size() <= offset+5) {
2140 ALOGV("buffer does not contain a complete header");
2141 return ERROR_MALFORMED;
2142 }
2143 // non-const pointer for decryption if needed
2144 uint8_t *adtsFrame = buffer->data() + offset;
2145
2146 unsigned aac_frame_length =
2147 ((adtsHeader[3] & 3) << 11)
2148 | (adtsHeader[4] << 3)
2149 | (adtsHeader[5] >> 5);
2150
2151 if (aac_frame_length == 0) {
2152 const uint8_t *id3Header = adtsHeader;
2153 if (!memcmp(id3Header, "ID3", 3)) {
2154 ID3 id3(id3Header, buffer->size() - offset, true);
2155 if (id3.isValid()) {
2156 offset += id3.rawSize();
2157 continue;
2158 };
2159 }
2160 return ERROR_MALFORMED;
2161 }
2162
2163 CHECK_LE(offset + aac_frame_length, buffer->size());
2164
2165 int64_t unitTimeUs = timeUs + numSamples * 1000000LL / sampleRate;
2166 offset += aac_frame_length;
2167
2168 // Each AAC frame encodes 1024 samples.
2169 numSamples += 1024;
2170
2171 if (mStartup) {
2172 int64_t startTimeUs = unitTimeUs;
2173 if (mStartTimeUsRelative) {
2174 startTimeUs -= mFirstTimeUs;
2175 if (startTimeUs < 0) {
2176 startTimeUs = 0;
2177 }
2178 }
2179 if (startTimeUs < mStartTimeUs) {
2180 continue;
2181 }
2182
2183 if (mStartTimeUsNotify != NULL) {
2184 mStartTimeUsNotify->setInt32("streamMask", LiveSession::STREAMTYPE_AUDIO);
2185 mStartup = false;
2186 }
2187 }
2188
2189 if (mStopParams != NULL) {
2190 int32_t discontinuitySeq;
2191 int64_t stopTimeUs;
2192 if (!mStopParams->findInt32("discontinuitySeq", &discontinuitySeq)
2193 || discontinuitySeq > mDiscontinuitySeq
2194 || !mStopParams->findInt64("timeUsAudio", &stopTimeUs)
2195 || (discontinuitySeq == mDiscontinuitySeq && unitTimeUs >= stopTimeUs)) {
2196 mStreamTypeMask = 0;
2197 mPacketSources.clear();
2198 return ERROR_OUT_OF_RANGE;
2199 }
2200 }
2201
2202 if (sampleDecryptor != NULL) {
2203 bool protection_absent = (adtsHeader[1] & 0x1);
2204 size_t headerSize = protection_absent ? 7 : 9;
2205 if (frameId == 0) {
2206 ALOGV("extractAndQueueAAC[%d] protection_absent %d (%02x) headerSize %zu",
2207 mSeqNumber, protection_absent, adtsHeader[1], headerSize);
2208 }
2209
2210 sampleDecryptor->processAAC(headerSize, adtsFrame, aac_frame_length);
2211 }
2212 frameId++;
2213
2214 sp<ABuffer> unit = new ABuffer(aac_frame_length);
2215 memcpy(unit->data(), adtsHeader, aac_frame_length);
2216
2217 unit->meta()->setInt64("timeUs", unitTimeUs);
2218 setAccessUnitProperties(unit, packetSource);
2219 packetSource->queueAccessUnit(unit);
2220 }
2221
2222 return OK;
2223 }
2224
updateDuration()2225 void PlaylistFetcher::updateDuration() {
2226 int64_t durationUs = 0LL;
2227 for (size_t index = 0; index < mPlaylist->size(); ++index) {
2228 sp<AMessage> itemMeta;
2229 CHECK(mPlaylist->itemAt(
2230 index, NULL /* uri */, &itemMeta));
2231
2232 int64_t itemDurationUs;
2233 CHECK(itemMeta->findInt64("durationUs", &itemDurationUs));
2234
2235 durationUs += itemDurationUs;
2236 }
2237
2238 sp<AMessage> msg = mNotify->dup();
2239 msg->setInt32("what", kWhatDurationUpdate);
2240 msg->setInt64("durationUs", durationUs);
2241 msg->post();
2242 }
2243
updateTargetDuration()2244 void PlaylistFetcher::updateTargetDuration() {
2245 sp<AMessage> msg = mNotify->dup();
2246 msg->setInt32("what", kWhatTargetDurationUpdate);
2247 msg->setInt64("targetDurationUs", mPlaylist->getTargetDuration());
2248 msg->post();
2249 }
2250
2251 } // namespace android
2252