1 /*
2 * Copyright (C) 2012 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 //#define LOG_NDEBUG 0
18 #define LOG_TAG "PlaylistFetcher"
19 #include <utils/Log.h>
20 #include <utils/misc.h>
21
22 #include "PlaylistFetcher.h"
23 #include "HTTPDownloader.h"
24 #include "LiveSession.h"
25 #include "M3UParser.h"
26 #include "include/ID3.h"
27 #include "mpeg2ts/AnotherPacketSource.h"
28 #include "mpeg2ts/HlsSampleDecryptor.h"
29
30 #include <media/stagefright/foundation/ABitReader.h>
31 #include <media/stagefright/foundation/ABuffer.h>
32 #include <media/stagefright/foundation/ADebug.h>
33 #include <media/stagefright/foundation/ByteUtils.h>
34 #include <media/stagefright/foundation/MediaKeys.h>
35 #include <media/stagefright/foundation/avc_utils.h>
36 #include <media/stagefright/DataURISource.h>
37 #include <media/stagefright/MediaDefs.h>
38 #include <media/stagefright/MetaData.h>
39 #include <media/stagefright/MetaDataUtils.h>
40 #include <media/stagefright/Utils.h>
41
42 #include <ctype.h>
43 #include <inttypes.h>
44
45 #define FLOGV(fmt, ...) ALOGV("[fetcher-%d] " fmt, mFetcherID, ##__VA_ARGS__)
46 #define FSLOGV(stream, fmt, ...) ALOGV("[fetcher-%d] [%s] " fmt, mFetcherID, \
47 LiveSession::getNameForStream(stream), ##__VA_ARGS__)
48
49 namespace android {
50
51 // static
52 const int64_t PlaylistFetcher::kMinBufferedDurationUs = 30000000ll;
53 const int64_t PlaylistFetcher::kMaxMonitorDelayUs = 3000000ll;
54 // LCM of 188 (size of a TS packet) & 1k works well
55 const int32_t PlaylistFetcher::kDownloadBlockSize = 47 * 1024;
56
57 struct PlaylistFetcher::DownloadState : public RefBase {
58 DownloadState();
59 void resetState();
60 bool hasSavedState() const;
61 void restoreState(
62 AString &uri,
63 sp<AMessage> &itemMeta,
64 sp<ABuffer> &buffer,
65 sp<ABuffer> &tsBuffer,
66 int32_t &firstSeqNumberInPlaylist,
67 int32_t &lastSeqNumberInPlaylist);
68 void saveState(
69 AString &uri,
70 sp<AMessage> &itemMeta,
71 sp<ABuffer> &buffer,
72 sp<ABuffer> &tsBuffer,
73 int32_t &firstSeqNumberInPlaylist,
74 int32_t &lastSeqNumberInPlaylist);
75
76 private:
77 bool mHasSavedState;
78 AString mUri;
79 sp<AMessage> mItemMeta;
80 sp<ABuffer> mBuffer;
81 sp<ABuffer> mTsBuffer;
82 int32_t mFirstSeqNumberInPlaylist;
83 int32_t mLastSeqNumberInPlaylist;
84 };
85
DownloadState()86 PlaylistFetcher::DownloadState::DownloadState() {
87 resetState();
88 }
89
hasSavedState() const90 bool PlaylistFetcher::DownloadState::hasSavedState() const {
91 return mHasSavedState;
92 }
93
resetState()94 void PlaylistFetcher::DownloadState::resetState() {
95 mHasSavedState = false;
96
97 mUri.clear();
98 mItemMeta = NULL;
99 mBuffer = NULL;
100 mTsBuffer = NULL;
101 mFirstSeqNumberInPlaylist = 0;
102 mLastSeqNumberInPlaylist = 0;
103 }
104
restoreState(AString & uri,sp<AMessage> & itemMeta,sp<ABuffer> & buffer,sp<ABuffer> & tsBuffer,int32_t & firstSeqNumberInPlaylist,int32_t & lastSeqNumberInPlaylist)105 void PlaylistFetcher::DownloadState::restoreState(
106 AString &uri,
107 sp<AMessage> &itemMeta,
108 sp<ABuffer> &buffer,
109 sp<ABuffer> &tsBuffer,
110 int32_t &firstSeqNumberInPlaylist,
111 int32_t &lastSeqNumberInPlaylist) {
112 if (!mHasSavedState) {
113 return;
114 }
115
116 uri = mUri;
117 itemMeta = mItemMeta;
118 buffer = mBuffer;
119 tsBuffer = mTsBuffer;
120 firstSeqNumberInPlaylist = mFirstSeqNumberInPlaylist;
121 lastSeqNumberInPlaylist = mLastSeqNumberInPlaylist;
122
123 resetState();
124 }
125
saveState(AString & uri,sp<AMessage> & itemMeta,sp<ABuffer> & buffer,sp<ABuffer> & tsBuffer,int32_t & firstSeqNumberInPlaylist,int32_t & lastSeqNumberInPlaylist)126 void PlaylistFetcher::DownloadState::saveState(
127 AString &uri,
128 sp<AMessage> &itemMeta,
129 sp<ABuffer> &buffer,
130 sp<ABuffer> &tsBuffer,
131 int32_t &firstSeqNumberInPlaylist,
132 int32_t &lastSeqNumberInPlaylist) {
133 mHasSavedState = true;
134
135 mUri = uri;
136 mItemMeta = itemMeta;
137 mBuffer = buffer;
138 mTsBuffer = tsBuffer;
139 mFirstSeqNumberInPlaylist = firstSeqNumberInPlaylist;
140 mLastSeqNumberInPlaylist = lastSeqNumberInPlaylist;
141 }
142
PlaylistFetcher(const sp<AMessage> & notify,const sp<LiveSession> & session,const char * uri,int32_t id,int32_t subtitleGeneration)143 PlaylistFetcher::PlaylistFetcher(
144 const sp<AMessage> ¬ify,
145 const sp<LiveSession> &session,
146 const char *uri,
147 int32_t id,
148 int32_t subtitleGeneration)
149 : mNotify(notify),
150 mSession(session),
151 mURI(uri),
152 mFetcherID(id),
153 mStreamTypeMask(0),
154 mStartTimeUs(-1ll),
155 mSegmentStartTimeUs(-1ll),
156 mDiscontinuitySeq(-1ll),
157 mStartTimeUsRelative(false),
158 mLastPlaylistFetchTimeUs(-1ll),
159 mPlaylistTimeUs(-1ll),
160 mSeqNumber(-1),
161 mNumRetries(0),
162 mStartup(true),
163 mIDRFound(false),
164 mSeekMode(LiveSession::kSeekModeExactPosition),
165 mTimeChangeSignaled(false),
166 mNextPTSTimeUs(-1ll),
167 mMonitorQueueGeneration(0),
168 mSubtitleGeneration(subtitleGeneration),
169 mLastDiscontinuitySeq(-1ll),
170 mRefreshState(INITIAL_MINIMUM_RELOAD_DELAY),
171 mFirstPTSValid(false),
172 mFirstTimeUs(-1ll),
173 mVideoBuffer(new AnotherPacketSource(NULL)),
174 mSampleAesKeyItemChanged(false),
175 mThresholdRatio(-1.0f),
176 mDownloadState(new DownloadState()),
177 mHasMetadata(false) {
178 memset(mPlaylistHash, 0, sizeof(mPlaylistHash));
179 mHTTPDownloader = mSession->getHTTPDownloader();
180
181 memset(mKeyData, 0, sizeof(mKeyData));
182 memset(mAESInitVec, 0, sizeof(mAESInitVec));
183 }
184
~PlaylistFetcher()185 PlaylistFetcher::~PlaylistFetcher() {
186 }
187
getFetcherID() const188 int32_t PlaylistFetcher::getFetcherID() const {
189 return mFetcherID;
190 }
191
getSegmentStartTimeUs(int32_t seqNumber) const192 int64_t PlaylistFetcher::getSegmentStartTimeUs(int32_t seqNumber) const {
193 CHECK(mPlaylist != NULL);
194
195 int32_t firstSeqNumberInPlaylist, lastSeqNumberInPlaylist;
196 mPlaylist->getSeqNumberRange(
197 &firstSeqNumberInPlaylist, &lastSeqNumberInPlaylist);
198
199 CHECK_GE(seqNumber, firstSeqNumberInPlaylist);
200 CHECK_LE(seqNumber, lastSeqNumberInPlaylist);
201
202 int64_t segmentStartUs = 0ll;
203 for (int32_t index = 0;
204 index < seqNumber - firstSeqNumberInPlaylist; ++index) {
205 sp<AMessage> itemMeta;
206 CHECK(mPlaylist->itemAt(
207 index, NULL /* uri */, &itemMeta));
208
209 int64_t itemDurationUs;
210 CHECK(itemMeta->findInt64("durationUs", &itemDurationUs));
211
212 segmentStartUs += itemDurationUs;
213 }
214
215 return segmentStartUs;
216 }
217
getSegmentDurationUs(int32_t seqNumber) const218 int64_t PlaylistFetcher::getSegmentDurationUs(int32_t seqNumber) const {
219 CHECK(mPlaylist != NULL);
220
221 int32_t firstSeqNumberInPlaylist, lastSeqNumberInPlaylist;
222 mPlaylist->getSeqNumberRange(
223 &firstSeqNumberInPlaylist, &lastSeqNumberInPlaylist);
224
225 CHECK_GE(seqNumber, firstSeqNumberInPlaylist);
226 CHECK_LE(seqNumber, lastSeqNumberInPlaylist);
227
228 int32_t index = seqNumber - firstSeqNumberInPlaylist;
229 sp<AMessage> itemMeta;
230 CHECK(mPlaylist->itemAt(
231 index, NULL /* uri */, &itemMeta));
232
233 int64_t itemDurationUs;
234 CHECK(itemMeta->findInt64("durationUs", &itemDurationUs));
235
236 return itemDurationUs;
237 }
238
delayUsToRefreshPlaylist() const239 int64_t PlaylistFetcher::delayUsToRefreshPlaylist() const {
240 int64_t nowUs = ALooper::GetNowUs();
241
242 if (mPlaylist == NULL || mLastPlaylistFetchTimeUs < 0ll) {
243 CHECK_EQ((int)mRefreshState, (int)INITIAL_MINIMUM_RELOAD_DELAY);
244 return 0ll;
245 }
246
247 if (mPlaylist->isComplete()) {
248 return (~0llu >> 1);
249 }
250
251 int64_t targetDurationUs = mPlaylist->getTargetDuration();
252
253 int64_t minPlaylistAgeUs;
254
255 switch (mRefreshState) {
256 case INITIAL_MINIMUM_RELOAD_DELAY:
257 {
258 size_t n = mPlaylist->size();
259 if (n > 0) {
260 sp<AMessage> itemMeta;
261 CHECK(mPlaylist->itemAt(n - 1, NULL /* uri */, &itemMeta));
262
263 int64_t itemDurationUs;
264 CHECK(itemMeta->findInt64("durationUs", &itemDurationUs));
265
266 minPlaylistAgeUs = itemDurationUs;
267 break;
268 }
269
270 // fall through
271 }
272
273 case FIRST_UNCHANGED_RELOAD_ATTEMPT:
274 {
275 minPlaylistAgeUs = targetDurationUs / 2;
276 break;
277 }
278
279 case SECOND_UNCHANGED_RELOAD_ATTEMPT:
280 {
281 minPlaylistAgeUs = (targetDurationUs * 3) / 2;
282 break;
283 }
284
285 case THIRD_UNCHANGED_RELOAD_ATTEMPT:
286 {
287 minPlaylistAgeUs = targetDurationUs * 3;
288 break;
289 }
290
291 default:
292 TRESPASS();
293 break;
294 }
295
296 int64_t delayUs = mLastPlaylistFetchTimeUs + minPlaylistAgeUs - nowUs;
297 return delayUs > 0ll ? delayUs : 0ll;
298 }
299
decryptBuffer(size_t playlistIndex,const sp<ABuffer> & buffer,bool first)300 status_t PlaylistFetcher::decryptBuffer(
301 size_t playlistIndex, const sp<ABuffer> &buffer,
302 bool first) {
303 sp<AMessage> itemMeta;
304 bool found = false;
305 AString method;
306
307 for (ssize_t i = playlistIndex; i >= 0; --i) {
308 AString uri;
309 CHECK(mPlaylist->itemAt(i, &uri, &itemMeta));
310
311 if (itemMeta->findString("cipher-method", &method)) {
312 found = true;
313 break;
314 }
315 }
316
317 // TODO: Revise this when we add support for KEYFORMAT
318 // If method has changed (e.g., -> NONE); sufficient to check at the segment boundary
319 if (mSampleAesKeyItem != NULL && first && found && method != "SAMPLE-AES") {
320 ALOGI("decryptBuffer: resetting mSampleAesKeyItem(%p) with method %s",
321 mSampleAesKeyItem.get(), method.c_str());
322 mSampleAesKeyItem = NULL;
323 mSampleAesKeyItemChanged = true;
324 }
325
326 if (!found) {
327 method = "NONE";
328 }
329 buffer->meta()->setString("cipher-method", method.c_str());
330
331 if (method == "NONE") {
332 return OK;
333 } else if (method == "SAMPLE-AES") {
334 ALOGV("decryptBuffer: Non-Widevine SAMPLE-AES is supported now.");
335 } else if (!(method == "AES-128")) {
336 ALOGE("Unsupported cipher method '%s'", method.c_str());
337 return ERROR_UNSUPPORTED;
338 }
339
340 AString keyURI;
341 if (!itemMeta->findString("cipher-uri", &keyURI)) {
342 ALOGE("Missing key uri");
343 return ERROR_MALFORMED;
344 }
345
346 ssize_t index = mAESKeyForURI.indexOfKey(keyURI);
347
348 sp<ABuffer> key;
349 if (index >= 0) {
350 key = mAESKeyForURI.valueAt(index);
351 } else if (keyURI.startsWith("data:")) {
352 sp<DataSource> keySrc = DataURISource::Create(keyURI.c_str());
353 off64_t keyLen;
354 if (keySrc == NULL || keySrc->getSize(&keyLen) != OK || keyLen < 0) {
355 ALOGE("Malformed cipher key data uri.");
356 return ERROR_MALFORMED;
357 }
358 key = new ABuffer(keyLen);
359 keySrc->readAt(0, key->data(), keyLen);
360 key->setRange(0, keyLen);
361 } else {
362 ssize_t err = mHTTPDownloader->fetchFile(keyURI.c_str(), &key);
363
364 if (err == ERROR_NOT_CONNECTED) {
365 return ERROR_NOT_CONNECTED;
366 } else if (err < 0) {
367 ALOGE("failed to fetch cipher key from '%s'.", keyURI.c_str());
368 return ERROR_IO;
369 } else if (key->size() != 16) {
370 ALOGE("key file '%s' wasn't 16 bytes in size.", keyURI.c_str());
371 return ERROR_MALFORMED;
372 }
373
374 mAESKeyForURI.add(keyURI, key);
375 }
376
377 if (first) {
378 // If decrypting the first block in a file, read the iv from the manifest
379 // or derive the iv from the file's sequence number.
380
381 unsigned char AESInitVec[AES_BLOCK_SIZE];
382 AString iv;
383 if (itemMeta->findString("cipher-iv", &iv)) {
384 if ((!iv.startsWith("0x") && !iv.startsWith("0X"))
385 || iv.size() > 16 * 2 + 2) {
386 ALOGE("malformed cipher IV '%s'.", iv.c_str());
387 return ERROR_MALFORMED;
388 }
389
390 while (iv.size() < 16 * 2 + 2) {
391 iv.insert("0", 1, 2);
392 }
393
394 memset(AESInitVec, 0, sizeof(AESInitVec));
395 for (size_t i = 0; i < 16; ++i) {
396 char c1 = tolower(iv.c_str()[2 + 2 * i]);
397 char c2 = tolower(iv.c_str()[3 + 2 * i]);
398 if (!isxdigit(c1) || !isxdigit(c2)) {
399 ALOGE("malformed cipher IV '%s'.", iv.c_str());
400 return ERROR_MALFORMED;
401 }
402 uint8_t nibble1 = isdigit(c1) ? c1 - '0' : c1 - 'a' + 10;
403 uint8_t nibble2 = isdigit(c2) ? c2 - '0' : c2 - 'a' + 10;
404
405 AESInitVec[i] = nibble1 << 4 | nibble2;
406 }
407 } else {
408 memset(AESInitVec, 0, sizeof(AESInitVec));
409 AESInitVec[15] = mSeqNumber & 0xff;
410 AESInitVec[14] = (mSeqNumber >> 8) & 0xff;
411 AESInitVec[13] = (mSeqNumber >> 16) & 0xff;
412 AESInitVec[12] = (mSeqNumber >> 24) & 0xff;
413 }
414
415 bool newKey = memcmp(mKeyData, key->data(), AES_BLOCK_SIZE) != 0;
416 bool newInitVec = memcmp(mAESInitVec, AESInitVec, AES_BLOCK_SIZE) != 0;
417 bool newSampleAesKeyItem = newKey || newInitVec;
418 ALOGV("decryptBuffer: SAMPLE-AES newKeyItem %d/%d (Key %d initVec %d)",
419 mSampleAesKeyItemChanged, newSampleAesKeyItem, newKey, newInitVec);
420
421 if (newSampleAesKeyItem) {
422 memcpy(mKeyData, key->data(), AES_BLOCK_SIZE);
423 memcpy(mAESInitVec, AESInitVec, AES_BLOCK_SIZE);
424
425 if (method == "SAMPLE-AES") {
426 mSampleAesKeyItemChanged = true;
427
428 sp<ABuffer> keyDataBuffer = ABuffer::CreateAsCopy(mKeyData, sizeof(mKeyData));
429 sp<ABuffer> initVecBuffer = ABuffer::CreateAsCopy(mAESInitVec, sizeof(mAESInitVec));
430
431 // always allocating a new one rather than updating the old message
432 // lower layer might still have a reference to the old message
433 mSampleAesKeyItem = new AMessage();
434 mSampleAesKeyItem->setBuffer("keyData", keyDataBuffer);
435 mSampleAesKeyItem->setBuffer("initVec", initVecBuffer);
436
437 ALOGV("decryptBuffer: New SampleAesKeyItem: Key: %s IV: %s",
438 HlsSampleDecryptor::aesBlockToStr(mKeyData).c_str(),
439 HlsSampleDecryptor::aesBlockToStr(mAESInitVec).c_str());
440 } // SAMPLE-AES
441 } // newSampleAesKeyItem
442 } // first
443
444 if (method == "SAMPLE-AES") {
445 ALOGV("decryptBuffer: skipping full-seg decrypt for SAMPLE-AES");
446 return OK;
447 }
448
449
450 AES_KEY aes_key;
451 if (AES_set_decrypt_key(key->data(), 128, &aes_key) != 0) {
452 ALOGE("failed to set AES decryption key.");
453 return UNKNOWN_ERROR;
454 }
455
456 size_t n = buffer->size();
457 if (!n) {
458 return OK;
459 }
460
461 if (n < 16 || n % 16) {
462 ALOGE("not enough or trailing bytes (%zu) in encrypted buffer", n);
463 return ERROR_MALFORMED;
464 }
465
466 AES_cbc_encrypt(
467 buffer->data(), buffer->data(), buffer->size(),
468 &aes_key, mAESInitVec, AES_DECRYPT);
469
470 return OK;
471 }
472
checkDecryptPadding(const sp<ABuffer> & buffer)473 status_t PlaylistFetcher::checkDecryptPadding(const sp<ABuffer> &buffer) {
474 AString method;
475 CHECK(buffer->meta()->findString("cipher-method", &method));
476 if (method == "NONE" || method == "SAMPLE-AES") {
477 return OK;
478 }
479
480 uint8_t padding = 0;
481 if (buffer->size() > 0) {
482 padding = buffer->data()[buffer->size() - 1];
483 }
484
485 if (padding > 16) {
486 return ERROR_MALFORMED;
487 }
488
489 for (size_t i = buffer->size() - padding; i < padding; i++) {
490 if (buffer->data()[i] != padding) {
491 return ERROR_MALFORMED;
492 }
493 }
494
495 buffer->setRange(buffer->offset(), buffer->size() - padding);
496 return OK;
497 }
498
postMonitorQueue(int64_t delayUs,int64_t minDelayUs)499 void PlaylistFetcher::postMonitorQueue(int64_t delayUs, int64_t minDelayUs) {
500 int64_t maxDelayUs = delayUsToRefreshPlaylist();
501 if (maxDelayUs < minDelayUs) {
502 maxDelayUs = minDelayUs;
503 }
504 if (delayUs > maxDelayUs) {
505 FLOGV("Need to refresh playlist in %lld", (long long)maxDelayUs);
506 delayUs = maxDelayUs;
507 }
508 sp<AMessage> msg = new AMessage(kWhatMonitorQueue, this);
509 msg->setInt32("generation", mMonitorQueueGeneration);
510 msg->post(delayUs);
511 }
512
cancelMonitorQueue()513 void PlaylistFetcher::cancelMonitorQueue() {
514 ++mMonitorQueueGeneration;
515 }
516
setStoppingThreshold(float thresholdRatio,bool disconnect)517 void PlaylistFetcher::setStoppingThreshold(float thresholdRatio, bool disconnect) {
518 {
519 AutoMutex _l(mThresholdLock);
520 mThresholdRatio = thresholdRatio;
521 }
522 if (disconnect) {
523 mHTTPDownloader->disconnect();
524 }
525 }
526
resetStoppingThreshold(bool disconnect)527 void PlaylistFetcher::resetStoppingThreshold(bool disconnect) {
528 {
529 AutoMutex _l(mThresholdLock);
530 mThresholdRatio = -1.0f;
531 }
532 if (disconnect) {
533 mHTTPDownloader->disconnect();
534 } else {
535 // allow reconnect
536 mHTTPDownloader->reconnect();
537 }
538 }
539
getStoppingThreshold()540 float PlaylistFetcher::getStoppingThreshold() {
541 AutoMutex _l(mThresholdLock);
542 return mThresholdRatio;
543 }
544
startAsync(const sp<AnotherPacketSource> & audioSource,const sp<AnotherPacketSource> & videoSource,const sp<AnotherPacketSource> & subtitleSource,const sp<AnotherPacketSource> & metadataSource,int64_t startTimeUs,int64_t segmentStartTimeUs,int32_t startDiscontinuitySeq,LiveSession::SeekMode seekMode)545 void PlaylistFetcher::startAsync(
546 const sp<AnotherPacketSource> &audioSource,
547 const sp<AnotherPacketSource> &videoSource,
548 const sp<AnotherPacketSource> &subtitleSource,
549 const sp<AnotherPacketSource> &metadataSource,
550 int64_t startTimeUs,
551 int64_t segmentStartTimeUs,
552 int32_t startDiscontinuitySeq,
553 LiveSession::SeekMode seekMode) {
554 sp<AMessage> msg = new AMessage(kWhatStart, this);
555
556 uint32_t streamTypeMask = 0ul;
557
558 if (audioSource != NULL) {
559 msg->setPointer("audioSource", audioSource.get());
560 streamTypeMask |= LiveSession::STREAMTYPE_AUDIO;
561 }
562
563 if (videoSource != NULL) {
564 msg->setPointer("videoSource", videoSource.get());
565 streamTypeMask |= LiveSession::STREAMTYPE_VIDEO;
566 }
567
568 if (subtitleSource != NULL) {
569 msg->setPointer("subtitleSource", subtitleSource.get());
570 streamTypeMask |= LiveSession::STREAMTYPE_SUBTITLES;
571 }
572
573 if (metadataSource != NULL) {
574 msg->setPointer("metadataSource", metadataSource.get());
575 // metadataSource does not affect streamTypeMask.
576 }
577
578 msg->setInt32("streamTypeMask", streamTypeMask);
579 msg->setInt64("startTimeUs", startTimeUs);
580 msg->setInt64("segmentStartTimeUs", segmentStartTimeUs);
581 msg->setInt32("startDiscontinuitySeq", startDiscontinuitySeq);
582 msg->setInt32("seekMode", seekMode);
583 msg->post();
584 }
585
586 /*
587 * pauseAsync
588 *
589 * threshold: 0.0f - pause after current fetch block (default 47Kbytes)
590 * -1.0f - pause after finishing current segment
591 * 0.0~1.0f - pause if remaining of current segment exceeds threshold
592 */
pauseAsync(float thresholdRatio,bool disconnect)593 void PlaylistFetcher::pauseAsync(
594 float thresholdRatio, bool disconnect) {
595 setStoppingThreshold(thresholdRatio, disconnect);
596
597 (new AMessage(kWhatPause, this))->post();
598 }
599
stopAsync(bool clear)600 void PlaylistFetcher::stopAsync(bool clear) {
601 setStoppingThreshold(0.0f, true /* disconncect */);
602
603 sp<AMessage> msg = new AMessage(kWhatStop, this);
604 msg->setInt32("clear", clear);
605 msg->post();
606 }
607
resumeUntilAsync(const sp<AMessage> & params)608 void PlaylistFetcher::resumeUntilAsync(const sp<AMessage> ¶ms) {
609 FLOGV("resumeUntilAsync: params=%s", params->debugString().c_str());
610
611 AMessage* msg = new AMessage(kWhatResumeUntil, this);
612 msg->setMessage("params", params);
613 msg->post();
614 }
615
fetchPlaylistAsync()616 void PlaylistFetcher::fetchPlaylistAsync() {
617 (new AMessage(kWhatFetchPlaylist, this))->post();
618 }
619
onMessageReceived(const sp<AMessage> & msg)620 void PlaylistFetcher::onMessageReceived(const sp<AMessage> &msg) {
621 switch (msg->what()) {
622 case kWhatStart:
623 {
624 status_t err = onStart(msg);
625
626 sp<AMessage> notify = mNotify->dup();
627 notify->setInt32("what", kWhatStarted);
628 notify->setInt32("err", err);
629 notify->post();
630 break;
631 }
632
633 case kWhatPause:
634 {
635 onPause();
636
637 sp<AMessage> notify = mNotify->dup();
638 notify->setInt32("what", kWhatPaused);
639 notify->setInt32("seekMode",
640 mDownloadState->hasSavedState()
641 ? LiveSession::kSeekModeNextSample
642 : LiveSession::kSeekModeNextSegment);
643 notify->post();
644 break;
645 }
646
647 case kWhatStop:
648 {
649 onStop(msg);
650
651 sp<AMessage> notify = mNotify->dup();
652 notify->setInt32("what", kWhatStopped);
653 notify->post();
654 break;
655 }
656
657 case kWhatFetchPlaylist:
658 {
659 bool unchanged;
660 sp<M3UParser> playlist = mHTTPDownloader->fetchPlaylist(
661 mURI.c_str(), NULL /* curPlaylistHash */, &unchanged);
662
663 sp<AMessage> notify = mNotify->dup();
664 notify->setInt32("what", kWhatPlaylistFetched);
665 notify->setObject("playlist", playlist);
666 notify->post();
667 break;
668 }
669
670 case kWhatMonitorQueue:
671 case kWhatDownloadNext:
672 {
673 int32_t generation;
674 CHECK(msg->findInt32("generation", &generation));
675
676 if (generation != mMonitorQueueGeneration) {
677 // Stale event
678 break;
679 }
680
681 if (msg->what() == kWhatMonitorQueue) {
682 onMonitorQueue();
683 } else {
684 onDownloadNext();
685 }
686 break;
687 }
688
689 case kWhatResumeUntil:
690 {
691 onResumeUntil(msg);
692 break;
693 }
694
695 default:
696 TRESPASS();
697 }
698 }
699
onStart(const sp<AMessage> & msg)700 status_t PlaylistFetcher::onStart(const sp<AMessage> &msg) {
701 mPacketSources.clear();
702 mStopParams.clear();
703 mStartTimeUsNotify = mNotify->dup();
704 mStartTimeUsNotify->setInt32("what", kWhatStartedAt);
705 mStartTimeUsNotify->setString("uri", mURI);
706
707 uint32_t streamTypeMask;
708 CHECK(msg->findInt32("streamTypeMask", (int32_t *)&streamTypeMask));
709
710 int64_t startTimeUs;
711 int64_t segmentStartTimeUs;
712 int32_t startDiscontinuitySeq;
713 int32_t seekMode;
714 CHECK(msg->findInt64("startTimeUs", &startTimeUs));
715 CHECK(msg->findInt64("segmentStartTimeUs", &segmentStartTimeUs));
716 CHECK(msg->findInt32("startDiscontinuitySeq", &startDiscontinuitySeq));
717 CHECK(msg->findInt32("seekMode", &seekMode));
718
719 if (streamTypeMask & LiveSession::STREAMTYPE_AUDIO) {
720 void *ptr;
721 CHECK(msg->findPointer("audioSource", &ptr));
722
723 mPacketSources.add(
724 LiveSession::STREAMTYPE_AUDIO,
725 static_cast<AnotherPacketSource *>(ptr));
726 }
727
728 if (streamTypeMask & LiveSession::STREAMTYPE_VIDEO) {
729 void *ptr;
730 CHECK(msg->findPointer("videoSource", &ptr));
731
732 mPacketSources.add(
733 LiveSession::STREAMTYPE_VIDEO,
734 static_cast<AnotherPacketSource *>(ptr));
735 }
736
737 if (streamTypeMask & LiveSession::STREAMTYPE_SUBTITLES) {
738 void *ptr;
739 CHECK(msg->findPointer("subtitleSource", &ptr));
740
741 mPacketSources.add(
742 LiveSession::STREAMTYPE_SUBTITLES,
743 static_cast<AnotherPacketSource *>(ptr));
744 }
745
746 void *ptr;
747 // metadataSource is not part of streamTypeMask
748 if ((streamTypeMask & (LiveSession::STREAMTYPE_AUDIO | LiveSession::STREAMTYPE_VIDEO))
749 && msg->findPointer("metadataSource", &ptr)) {
750 mPacketSources.add(
751 LiveSession::STREAMTYPE_METADATA,
752 static_cast<AnotherPacketSource *>(ptr));
753 }
754
755 mStreamTypeMask = streamTypeMask;
756
757 mSegmentStartTimeUs = segmentStartTimeUs;
758
759 if (startDiscontinuitySeq >= 0) {
760 mDiscontinuitySeq = startDiscontinuitySeq;
761 }
762
763 mRefreshState = INITIAL_MINIMUM_RELOAD_DELAY;
764 mSeekMode = (LiveSession::SeekMode) seekMode;
765
766 if (startTimeUs >= 0 || mSeekMode == LiveSession::kSeekModeNextSample) {
767 mStartup = true;
768 mIDRFound = false;
769 mVideoBuffer->clear();
770 }
771
772 if (startTimeUs >= 0) {
773 mStartTimeUs = startTimeUs;
774 mFirstPTSValid = false;
775 mSeqNumber = -1;
776 mTimeChangeSignaled = false;
777 mDownloadState->resetState();
778 }
779
780 postMonitorQueue();
781
782 return OK;
783 }
784
onPause()785 void PlaylistFetcher::onPause() {
786 cancelMonitorQueue();
787 mLastDiscontinuitySeq = mDiscontinuitySeq;
788
789 resetStoppingThreshold(false /* disconnect */);
790 }
791
onStop(const sp<AMessage> & msg)792 void PlaylistFetcher::onStop(const sp<AMessage> &msg) {
793 cancelMonitorQueue();
794
795 int32_t clear;
796 CHECK(msg->findInt32("clear", &clear));
797 if (clear) {
798 for (size_t i = 0; i < mPacketSources.size(); i++) {
799 sp<AnotherPacketSource> packetSource = mPacketSources.valueAt(i);
800 packetSource->clear();
801 }
802 }
803
804 mDownloadState->resetState();
805 mPacketSources.clear();
806 mStreamTypeMask = 0;
807
808 resetStoppingThreshold(true /* disconnect */);
809 }
810
811 // Resume until we have reached the boundary timestamps listed in `msg`; when
812 // the remaining time is too short (within a resume threshold) stop immediately
813 // instead.
onResumeUntil(const sp<AMessage> & msg)814 status_t PlaylistFetcher::onResumeUntil(const sp<AMessage> &msg) {
815 sp<AMessage> params;
816 CHECK(msg->findMessage("params", ¶ms));
817
818 mStopParams = params;
819 onDownloadNext();
820
821 return OK;
822 }
823
notifyStopReached()824 void PlaylistFetcher::notifyStopReached() {
825 sp<AMessage> notify = mNotify->dup();
826 notify->setInt32("what", kWhatStopReached);
827 notify->post();
828 }
829
notifyError(status_t err)830 void PlaylistFetcher::notifyError(status_t err) {
831 sp<AMessage> notify = mNotify->dup();
832 notify->setInt32("what", kWhatError);
833 notify->setInt32("err", err);
834 notify->post();
835 }
836
queueDiscontinuity(ATSParser::DiscontinuityType type,const sp<AMessage> & extra)837 void PlaylistFetcher::queueDiscontinuity(
838 ATSParser::DiscontinuityType type, const sp<AMessage> &extra) {
839 for (size_t i = 0; i < mPacketSources.size(); ++i) {
840 // do not discard buffer upon #EXT-X-DISCONTINUITY tag
841 // (seek will discard buffer by abandoning old fetchers)
842 mPacketSources.valueAt(i)->queueDiscontinuity(
843 type, extra, false /* discard */);
844 }
845 }
846
onMonitorQueue()847 void PlaylistFetcher::onMonitorQueue() {
848 // in the middle of an unfinished download, delay
849 // playlist refresh as it'll change seq numbers
850 if (!mDownloadState->hasSavedState()) {
851 refreshPlaylist();
852 }
853
854 int64_t targetDurationUs = kMinBufferedDurationUs;
855 if (mPlaylist != NULL) {
856 targetDurationUs = mPlaylist->getTargetDuration();
857 }
858
859 int64_t bufferedDurationUs = 0ll;
860 status_t finalResult = OK;
861 if (mStreamTypeMask == LiveSession::STREAMTYPE_SUBTITLES) {
862 sp<AnotherPacketSource> packetSource =
863 mPacketSources.valueFor(LiveSession::STREAMTYPE_SUBTITLES);
864
865 bufferedDurationUs =
866 packetSource->getBufferedDurationUs(&finalResult);
867 } else {
868 // Use min stream duration, but ignore streams that never have any packet
869 // enqueued to prevent us from waiting on a non-existent stream;
870 // when we cannot make out from the manifest what streams are included in
871 // a playlist we might assume extra streams.
872 bufferedDurationUs = -1ll;
873 for (size_t i = 0; i < mPacketSources.size(); ++i) {
874 if ((mStreamTypeMask & mPacketSources.keyAt(i)) == 0
875 || mPacketSources[i]->getLatestEnqueuedMeta() == NULL) {
876 continue;
877 }
878
879 int64_t bufferedStreamDurationUs =
880 mPacketSources.valueAt(i)->getBufferedDurationUs(&finalResult);
881
882 FSLOGV(mPacketSources.keyAt(i), "buffered %lld", (long long)bufferedStreamDurationUs);
883
884 if (bufferedDurationUs == -1ll
885 || bufferedStreamDurationUs < bufferedDurationUs) {
886 bufferedDurationUs = bufferedStreamDurationUs;
887 }
888 }
889 if (bufferedDurationUs == -1ll) {
890 bufferedDurationUs = 0ll;
891 }
892 }
893
894 if (finalResult == OK && bufferedDurationUs < kMinBufferedDurationUs) {
895 FLOGV("monitoring, buffered=%lld < %lld",
896 (long long)bufferedDurationUs, (long long)kMinBufferedDurationUs);
897
898 // delay the next download slightly; hopefully this gives other concurrent fetchers
899 // a better chance to run.
900 // onDownloadNext();
901 sp<AMessage> msg = new AMessage(kWhatDownloadNext, this);
902 msg->setInt32("generation", mMonitorQueueGeneration);
903 msg->post(1000l);
904 } else {
905 // We'd like to maintain buffering above durationToBufferUs, so try
906 // again when buffer just about to go below durationToBufferUs
907 // (or after targetDurationUs / 2, whichever is smaller).
908 int64_t delayUs = bufferedDurationUs - kMinBufferedDurationUs + 1000000ll;
909 if (delayUs > targetDurationUs / 2) {
910 delayUs = targetDurationUs / 2;
911 }
912
913 FLOGV("pausing for %lld, buffered=%lld > %lld",
914 (long long)delayUs,
915 (long long)bufferedDurationUs,
916 (long long)kMinBufferedDurationUs);
917
918 postMonitorQueue(delayUs);
919 }
920 }
921
refreshPlaylist()922 status_t PlaylistFetcher::refreshPlaylist() {
923 if (delayUsToRefreshPlaylist() <= 0) {
924 bool unchanged;
925 sp<M3UParser> playlist = mHTTPDownloader->fetchPlaylist(
926 mURI.c_str(), mPlaylistHash, &unchanged);
927
928 if (playlist == NULL) {
929 if (unchanged) {
930 // We succeeded in fetching the playlist, but it was
931 // unchanged from the last time we tried.
932
933 if (mRefreshState != THIRD_UNCHANGED_RELOAD_ATTEMPT) {
934 mRefreshState = (RefreshState)(mRefreshState + 1);
935 }
936 } else {
937 ALOGE("failed to load playlist at url '%s'", uriDebugString(mURI).c_str());
938 return ERROR_IO;
939 }
940 } else {
941 mRefreshState = INITIAL_MINIMUM_RELOAD_DELAY;
942 mPlaylist = playlist;
943
944 if (mPlaylist->isComplete() || mPlaylist->isEvent()) {
945 updateDuration();
946 }
947 // Notify LiveSession to use target-duration based buffering level
948 // for up/down switch. Default LiveSession::kUpSwitchMark may not
949 // be reachable for live streams, as our max buffering amount is
950 // limited to 3 segments.
951 if (!mPlaylist->isComplete()) {
952 updateTargetDuration();
953 }
954 mPlaylistTimeUs = ALooper::GetNowUs();
955 }
956
957 mLastPlaylistFetchTimeUs = ALooper::GetNowUs();
958 }
959 return OK;
960 }
961
962 // static
bufferStartsWithTsSyncByte(const sp<ABuffer> & buffer)963 bool PlaylistFetcher::bufferStartsWithTsSyncByte(const sp<ABuffer>& buffer) {
964 return buffer->size() > 0 && buffer->data()[0] == 0x47;
965 }
966
shouldPauseDownload()967 bool PlaylistFetcher::shouldPauseDownload() {
968 if (mStreamTypeMask == LiveSession::STREAMTYPE_SUBTITLES) {
969 // doesn't apply to subtitles
970 return false;
971 }
972
973 // Calculate threshold to abort current download
974 float thresholdRatio = getStoppingThreshold();
975
976 if (thresholdRatio < 0.0f) {
977 // never abort
978 return false;
979 } else if (thresholdRatio == 0.0f) {
980 // immediately abort
981 return true;
982 }
983
984 // now we have a positive thresholdUs, abort if remaining
985 // portion to download is over that threshold.
986 if (mSegmentFirstPTS < 0) {
987 // this means we haven't even find the first access unit,
988 // abort now as we must be very far away from the end.
989 return true;
990 }
991 int64_t lastEnqueueUs = mSegmentFirstPTS;
992 for (size_t i = 0; i < mPacketSources.size(); ++i) {
993 if ((mStreamTypeMask & mPacketSources.keyAt(i)) == 0) {
994 continue;
995 }
996 sp<AMessage> meta = mPacketSources[i]->getLatestEnqueuedMeta();
997 int32_t type;
998 if (meta == NULL || meta->findInt32("discontinuity", &type)) {
999 continue;
1000 }
1001 int64_t tmpUs;
1002 CHECK(meta->findInt64("timeUs", &tmpUs));
1003 if (tmpUs > lastEnqueueUs) {
1004 lastEnqueueUs = tmpUs;
1005 }
1006 }
1007 lastEnqueueUs -= mSegmentFirstPTS;
1008
1009 int64_t targetDurationUs = mPlaylist->getTargetDuration();
1010 int64_t thresholdUs = thresholdRatio * targetDurationUs;
1011
1012 FLOGV("%spausing now, thresholdUs %lld, remaining %lld",
1013 targetDurationUs - lastEnqueueUs > thresholdUs ? "" : "not ",
1014 (long long)thresholdUs,
1015 (long long)(targetDurationUs - lastEnqueueUs));
1016
1017 if (targetDurationUs - lastEnqueueUs > thresholdUs) {
1018 return true;
1019 }
1020 return false;
1021 }
1022
initSeqNumberForLiveStream(int32_t & firstSeqNumberInPlaylist,int32_t & lastSeqNumberInPlaylist)1023 void PlaylistFetcher::initSeqNumberForLiveStream(
1024 int32_t &firstSeqNumberInPlaylist,
1025 int32_t &lastSeqNumberInPlaylist) {
1026 // start at least 3 target durations from the end.
1027 int64_t timeFromEnd = 0;
1028 size_t index = mPlaylist->size();
1029 sp<AMessage> itemMeta;
1030 int64_t itemDurationUs;
1031 int32_t targetDuration;
1032 if (mPlaylist->meta() != NULL
1033 && mPlaylist->meta()->findInt32("target-duration", &targetDuration)) {
1034 do {
1035 --index;
1036 if (!mPlaylist->itemAt(index, NULL /* uri */, &itemMeta)
1037 || !itemMeta->findInt64("durationUs", &itemDurationUs)) {
1038 ALOGW("item or itemDurationUs missing");
1039 mSeqNumber = lastSeqNumberInPlaylist - 3;
1040 break;
1041 }
1042
1043 timeFromEnd += itemDurationUs;
1044 mSeqNumber = firstSeqNumberInPlaylist + index;
1045 } while (timeFromEnd < targetDuration * 3E6 && index > 0);
1046 } else {
1047 ALOGW("target-duration missing");
1048 mSeqNumber = lastSeqNumberInPlaylist - 3;
1049 }
1050
1051 if (mSeqNumber < firstSeqNumberInPlaylist) {
1052 mSeqNumber = firstSeqNumberInPlaylist;
1053 }
1054 }
1055
initDownloadState(AString & uri,sp<AMessage> & itemMeta,int32_t & firstSeqNumberInPlaylist,int32_t & lastSeqNumberInPlaylist)1056 bool PlaylistFetcher::initDownloadState(
1057 AString &uri,
1058 sp<AMessage> &itemMeta,
1059 int32_t &firstSeqNumberInPlaylist,
1060 int32_t &lastSeqNumberInPlaylist) {
1061 status_t err = refreshPlaylist();
1062 firstSeqNumberInPlaylist = 0;
1063 lastSeqNumberInPlaylist = 0;
1064 bool discontinuity = false;
1065
1066 if (mPlaylist != NULL) {
1067 mPlaylist->getSeqNumberRange(
1068 &firstSeqNumberInPlaylist, &lastSeqNumberInPlaylist);
1069
1070 if (mDiscontinuitySeq < 0) {
1071 mDiscontinuitySeq = mPlaylist->getDiscontinuitySeq();
1072 }
1073 }
1074
1075 mSegmentFirstPTS = -1ll;
1076
1077 if (mPlaylist != NULL && mSeqNumber < 0) {
1078 CHECK_GE(mStartTimeUs, 0ll);
1079
1080 if (mSegmentStartTimeUs < 0) {
1081 if (!mPlaylist->isComplete() && !mPlaylist->isEvent()) {
1082 // this is a live session
1083 initSeqNumberForLiveStream(firstSeqNumberInPlaylist, lastSeqNumberInPlaylist);
1084 } else {
1085 // When seeking mSegmentStartTimeUs is unavailable (< 0), we
1086 // use mStartTimeUs (client supplied timestamp) to determine both start segment
1087 // and relative position inside a segment
1088 mSeqNumber = getSeqNumberForTime(mStartTimeUs);
1089 mStartTimeUs -= getSegmentStartTimeUs(mSeqNumber);
1090 }
1091 mStartTimeUsRelative = true;
1092 FLOGV("Initial sequence number for time %lld is %d from (%d .. %d)",
1093 (long long)mStartTimeUs, mSeqNumber, firstSeqNumberInPlaylist,
1094 lastSeqNumberInPlaylist);
1095 } else {
1096 // When adapting or track switching, mSegmentStartTimeUs (relative
1097 // to media time 0) is used to determine the start segment; mStartTimeUs (absolute
1098 // timestamps coming from the media container) is used to determine the position
1099 // inside a segments.
1100 if (mStreamTypeMask != LiveSession::STREAMTYPE_SUBTITLES
1101 && mSeekMode != LiveSession::kSeekModeNextSample) {
1102 // avoid double fetch/decode
1103 // Use (mSegmentStartTimeUs + 1/2 * targetDurationUs) to search
1104 // for the starting segment in new variant.
1105 // If the two variants' segments are aligned, this gives the
1106 // next segment. If they're not aligned, this gives the segment
1107 // that overlaps no more than 1/2 * targetDurationUs.
1108 mSeqNumber = getSeqNumberForTime(mSegmentStartTimeUs
1109 + mPlaylist->getTargetDuration() / 2);
1110 } else {
1111 mSeqNumber = getSeqNumberForTime(mSegmentStartTimeUs);
1112 }
1113 ssize_t minSeq = getSeqNumberForDiscontinuity(mDiscontinuitySeq);
1114 if (mSeqNumber < minSeq) {
1115 mSeqNumber = minSeq;
1116 }
1117
1118 if (mSeqNumber < firstSeqNumberInPlaylist) {
1119 mSeqNumber = firstSeqNumberInPlaylist;
1120 }
1121
1122 if (mSeqNumber > lastSeqNumberInPlaylist) {
1123 mSeqNumber = lastSeqNumberInPlaylist;
1124 }
1125 FLOGV("Initial sequence number is %d from (%d .. %d)",
1126 mSeqNumber, firstSeqNumberInPlaylist,
1127 lastSeqNumberInPlaylist);
1128 }
1129 }
1130
1131 // if mPlaylist is NULL then err must be non-OK; but the other way around might not be true
1132 if (mSeqNumber < firstSeqNumberInPlaylist
1133 || mSeqNumber > lastSeqNumberInPlaylist
1134 || err != OK) {
1135 if ((err != OK || !mPlaylist->isComplete()) && mNumRetries < kMaxNumRetries) {
1136 ++mNumRetries;
1137
1138 if (mSeqNumber > lastSeqNumberInPlaylist || err != OK) {
1139 // make sure we reach this retry logic on refresh failures
1140 // by adding an err != OK clause to all enclosing if's.
1141
1142 // refresh in increasing fraction (1/2, 1/3, ...) of the
1143 // playlist's target duration or 3 seconds, whichever is less
1144 int64_t delayUs = kMaxMonitorDelayUs;
1145 if (mPlaylist != NULL) {
1146 delayUs = mPlaylist->size() * mPlaylist->getTargetDuration()
1147 / (1 + mNumRetries);
1148 }
1149 if (delayUs > kMaxMonitorDelayUs) {
1150 delayUs = kMaxMonitorDelayUs;
1151 }
1152 FLOGV("sequence number high: %d from (%d .. %d), "
1153 "monitor in %lld (retry=%d)",
1154 mSeqNumber, firstSeqNumberInPlaylist,
1155 lastSeqNumberInPlaylist, (long long)delayUs, mNumRetries);
1156 postMonitorQueue(delayUs);
1157 return false;
1158 }
1159
1160 if (err != OK) {
1161 notifyError(err);
1162 return false;
1163 }
1164
1165 // we've missed the boat, let's start 3 segments prior to the latest sequence
1166 // number available and signal a discontinuity.
1167
1168 ALOGI("We've missed the boat, restarting playback."
1169 " mStartup=%d, was looking for %d in %d-%d",
1170 mStartup, mSeqNumber, firstSeqNumberInPlaylist,
1171 lastSeqNumberInPlaylist);
1172 if (mStopParams != NULL) {
1173 // we should have kept on fetching until we hit the boundaries in mStopParams,
1174 // but since the segments we are supposed to fetch have already rolled off
1175 // the playlist, i.e. we have already missed the boat, we inevitably have to
1176 // skip.
1177 notifyStopReached();
1178 return false;
1179 }
1180 mSeqNumber = lastSeqNumberInPlaylist - 3;
1181 if (mSeqNumber < firstSeqNumberInPlaylist) {
1182 mSeqNumber = firstSeqNumberInPlaylist;
1183 }
1184 discontinuity = true;
1185
1186 // fall through
1187 } else {
1188 if (mPlaylist != NULL) {
1189 if (mSeqNumber >= firstSeqNumberInPlaylist + (int32_t)mPlaylist->size()
1190 && !mPlaylist->isComplete()) {
1191 // Live playlists
1192 ALOGW("sequence number %d not yet available", mSeqNumber);
1193 postMonitorQueue(delayUsToRefreshPlaylist());
1194 return false;
1195 }
1196 ALOGE("Cannot find sequence number %d in playlist "
1197 "(contains %d - %d)",
1198 mSeqNumber, firstSeqNumberInPlaylist,
1199 firstSeqNumberInPlaylist + (int32_t)mPlaylist->size() - 1);
1200
1201 if (mTSParser != NULL) {
1202 mTSParser->signalEOS(ERROR_END_OF_STREAM);
1203 // Use an empty buffer; we don't have any new data, just want to extract
1204 // potential new access units after flush. Reset mSeqNumber to
1205 // lastSeqNumberInPlaylist such that we set the correct access unit
1206 // properties in extractAndQueueAccessUnitsFromTs.
1207 sp<ABuffer> buffer = new ABuffer(0);
1208 mSeqNumber = lastSeqNumberInPlaylist;
1209 extractAndQueueAccessUnitsFromTs(buffer);
1210 }
1211 notifyError(ERROR_END_OF_STREAM);
1212 } else {
1213 // It's possible that we were never able to download the playlist.
1214 // In this case we should notify error, instead of EOS, as EOS during
1215 // prepare means we succeeded in downloading everything.
1216 ALOGE("Failed to download playlist!");
1217 notifyError(ERROR_IO);
1218 }
1219
1220 return false;
1221 }
1222 }
1223
1224 mNumRetries = 0;
1225
1226 CHECK(mPlaylist->itemAt(
1227 mSeqNumber - firstSeqNumberInPlaylist,
1228 &uri,
1229 &itemMeta));
1230
1231 CHECK(itemMeta->findInt32("discontinuity-sequence", &mDiscontinuitySeq));
1232
1233 int32_t val;
1234 if (itemMeta->findInt32("discontinuity", &val) && val != 0) {
1235 discontinuity = true;
1236 } else if (mLastDiscontinuitySeq >= 0
1237 && mDiscontinuitySeq != mLastDiscontinuitySeq) {
1238 // Seek jumped to a new discontinuity sequence. We need to signal
1239 // a format change to decoder. Decoder needs to shutdown and be
1240 // created again if seamless format change is unsupported.
1241 FLOGV("saw discontinuity: mStartup %d, mLastDiscontinuitySeq %d, "
1242 "mDiscontinuitySeq %d, mStartTimeUs %lld",
1243 mStartup, mLastDiscontinuitySeq, mDiscontinuitySeq, (long long)mStartTimeUs);
1244 discontinuity = true;
1245 }
1246 mLastDiscontinuitySeq = -1;
1247
1248 // decrypt a junk buffer to prefetch key; since a session uses only one http connection,
1249 // this avoids interleaved connections to the key and segment file.
1250 {
1251 sp<ABuffer> junk = new ABuffer(16);
1252 junk->setRange(0, 16);
1253 status_t err = decryptBuffer(mSeqNumber - firstSeqNumberInPlaylist, junk,
1254 true /* first */);
1255 if (err == ERROR_NOT_CONNECTED) {
1256 return false;
1257 } else if (err != OK) {
1258 notifyError(err);
1259 return false;
1260 }
1261 }
1262
1263 if ((mStartup && !mTimeChangeSignaled) || discontinuity) {
1264 // We need to signal a time discontinuity to ATSParser on the
1265 // first segment after start, or on a discontinuity segment.
1266 // Setting mNextPTSTimeUs informs extractAndQueueAccessUnitsXX()
1267 // to send the time discontinuity.
1268 if (mPlaylist->isComplete() || mPlaylist->isEvent()) {
1269 // If this was a live event this made no sense since
1270 // we don't have access to all the segment before the current
1271 // one.
1272 mNextPTSTimeUs = getSegmentStartTimeUs(mSeqNumber);
1273 }
1274
1275 // Setting mTimeChangeSignaled to true, so that if start time
1276 // searching goes into 2nd segment (without a discontinuity),
1277 // we don't reset time again. It causes corruption when pending
1278 // data in ATSParser is cleared.
1279 mTimeChangeSignaled = true;
1280 }
1281
1282 if (discontinuity) {
1283 ALOGI("queueing discontinuity (explicit=%d)", discontinuity);
1284
1285 // Signal a format discontinuity to ATSParser to clear partial data
1286 // from previous streams. Not doing this causes bitstream corruption.
1287 if (mTSParser != NULL) {
1288 mTSParser.clear();
1289 }
1290
1291 queueDiscontinuity(
1292 ATSParser::DISCONTINUITY_FORMAT_ONLY,
1293 NULL /* extra */);
1294
1295 if (mStartup && mStartTimeUsRelative && mFirstPTSValid) {
1296 // This means we guessed mStartTimeUs to be in the previous
1297 // segment (likely very close to the end), but either video or
1298 // audio has not found start by the end of that segment.
1299 //
1300 // If this new segment is not a discontinuity, keep searching.
1301 //
1302 // If this new segment even got a discontinuity marker, just
1303 // set mStartTimeUs=0, and take all samples from now on.
1304 mStartTimeUs = 0;
1305 mFirstPTSValid = false;
1306 mIDRFound = false;
1307 mVideoBuffer->clear();
1308 }
1309 }
1310
1311 FLOGV("fetching segment %d from (%d .. %d)",
1312 mSeqNumber, firstSeqNumberInPlaylist, lastSeqNumberInPlaylist);
1313 return true;
1314 }
1315
onDownloadNext()1316 void PlaylistFetcher::onDownloadNext() {
1317 AString uri;
1318 sp<AMessage> itemMeta;
1319 sp<ABuffer> buffer;
1320 sp<ABuffer> tsBuffer;
1321 int32_t firstSeqNumberInPlaylist = 0;
1322 int32_t lastSeqNumberInPlaylist = 0;
1323 bool connectHTTP = true;
1324
1325 if (mDownloadState->hasSavedState()) {
1326 mDownloadState->restoreState(
1327 uri,
1328 itemMeta,
1329 buffer,
1330 tsBuffer,
1331 firstSeqNumberInPlaylist,
1332 lastSeqNumberInPlaylist);
1333 connectHTTP = false;
1334 FLOGV("resuming: '%s'", uri.c_str());
1335 } else {
1336 if (!initDownloadState(
1337 uri,
1338 itemMeta,
1339 firstSeqNumberInPlaylist,
1340 lastSeqNumberInPlaylist)) {
1341 return;
1342 }
1343 FLOGV("fetching: '%s'", uri.c_str());
1344 }
1345
1346 int64_t range_offset, range_length;
1347 if (!itemMeta->findInt64("range-offset", &range_offset)
1348 || !itemMeta->findInt64("range-length", &range_length)) {
1349 range_offset = 0;
1350 range_length = -1;
1351 }
1352
1353 // block-wise download
1354 bool shouldPause = false;
1355 ssize_t bytesRead;
1356 do {
1357 int64_t startUs = ALooper::GetNowUs();
1358 bytesRead = mHTTPDownloader->fetchBlock(
1359 uri.c_str(), &buffer, range_offset, range_length, kDownloadBlockSize,
1360 NULL /* actualURL */, connectHTTP);
1361 int64_t delayUs = ALooper::GetNowUs() - startUs;
1362
1363 if (bytesRead == ERROR_NOT_CONNECTED) {
1364 return;
1365 }
1366 if (bytesRead < 0) {
1367 status_t err = bytesRead;
1368 ALOGE("failed to fetch .ts segment at url '%s'", uri.c_str());
1369 notifyError(err);
1370 return;
1371 }
1372
1373 // add sample for bandwidth estimation, excluding samples from subtitles (as
1374 // its too small), or during startup/resumeUntil (when we could have more than
1375 // one connection open which affects bandwidth)
1376 if (!mStartup && mStopParams == NULL && bytesRead > 0
1377 && (mStreamTypeMask
1378 & (LiveSession::STREAMTYPE_AUDIO
1379 | LiveSession::STREAMTYPE_VIDEO))) {
1380 mSession->addBandwidthMeasurement(bytesRead, delayUs);
1381 if (delayUs > 2000000ll) {
1382 FLOGV("bytesRead %zd took %.2f seconds - abnormal bandwidth dip",
1383 bytesRead, (double)delayUs / 1.0e6);
1384 }
1385 }
1386
1387 connectHTTP = false;
1388
1389 CHECK(buffer != NULL);
1390
1391 size_t size = buffer->size();
1392 // Set decryption range.
1393 buffer->setRange(size - bytesRead, bytesRead);
1394 status_t err = decryptBuffer(mSeqNumber - firstSeqNumberInPlaylist, buffer,
1395 buffer->offset() == 0 /* first */);
1396 // Unset decryption range.
1397 buffer->setRange(0, size);
1398
1399 if (err != OK) {
1400 ALOGE("decryptBuffer failed w/ error %d", err);
1401
1402 notifyError(err);
1403 return;
1404 }
1405
1406 bool startUp = mStartup; // save current start up state
1407
1408 err = OK;
1409 if (bufferStartsWithTsSyncByte(buffer)) {
1410 // Incremental extraction is only supported for MPEG2 transport streams.
1411 if (tsBuffer == NULL) {
1412 tsBuffer = new ABuffer(buffer->data(), buffer->capacity());
1413 tsBuffer->setRange(0, 0);
1414 } else if (tsBuffer->capacity() != buffer->capacity()) {
1415 size_t tsOff = tsBuffer->offset(), tsSize = tsBuffer->size();
1416 tsBuffer = new ABuffer(buffer->data(), buffer->capacity());
1417 tsBuffer->setRange(tsOff, tsSize);
1418 }
1419 tsBuffer->setRange(tsBuffer->offset(), tsBuffer->size() + bytesRead);
1420 err = extractAndQueueAccessUnitsFromTs(tsBuffer);
1421 }
1422
1423 if (err == -EAGAIN) {
1424 // starting sequence number too low/high
1425 mTSParser.clear();
1426 for (size_t i = 0; i < mPacketSources.size(); i++) {
1427 sp<AnotherPacketSource> packetSource = mPacketSources.valueAt(i);
1428 packetSource->clear();
1429 }
1430 postMonitorQueue();
1431 return;
1432 } else if (err == ERROR_OUT_OF_RANGE) {
1433 // reached stopping point
1434 notifyStopReached();
1435 return;
1436 } else if (err != OK) {
1437 notifyError(err);
1438 return;
1439 }
1440 // If we're switching, post start notification
1441 // this should only be posted when the last chunk is full processed by TSParser
1442 if (mSeekMode != LiveSession::kSeekModeExactPosition && startUp != mStartup) {
1443 CHECK(mStartTimeUsNotify != NULL);
1444 mStartTimeUsNotify->post();
1445 mStartTimeUsNotify.clear();
1446 shouldPause = true;
1447 }
1448 if (shouldPause || shouldPauseDownload()) {
1449 // save state and return if this is not the last chunk,
1450 // leaving the fetcher in paused state.
1451 if (bytesRead != 0) {
1452 mDownloadState->saveState(
1453 uri,
1454 itemMeta,
1455 buffer,
1456 tsBuffer,
1457 firstSeqNumberInPlaylist,
1458 lastSeqNumberInPlaylist);
1459 return;
1460 }
1461 shouldPause = true;
1462 }
1463 } while (bytesRead != 0);
1464
1465 if (bufferStartsWithTsSyncByte(buffer)) {
1466 // If we don't see a stream in the program table after fetching a full ts segment
1467 // mark it as nonexistent.
1468 ATSParser::SourceType srcTypes[] =
1469 { ATSParser::VIDEO, ATSParser::AUDIO };
1470 LiveSession::StreamType streamTypes[] =
1471 { LiveSession::STREAMTYPE_VIDEO, LiveSession::STREAMTYPE_AUDIO };
1472 const size_t kNumTypes = NELEM(srcTypes);
1473
1474 for (size_t i = 0; i < kNumTypes; i++) {
1475 ATSParser::SourceType srcType = srcTypes[i];
1476 LiveSession::StreamType streamType = streamTypes[i];
1477
1478 sp<AnotherPacketSource> source =
1479 static_cast<AnotherPacketSource *>(
1480 mTSParser->getSource(srcType).get());
1481
1482 if (!mTSParser->hasSource(srcType)) {
1483 ALOGW("MPEG2 Transport stream does not contain %s data.",
1484 srcType == ATSParser::VIDEO ? "video" : "audio");
1485
1486 mStreamTypeMask &= ~streamType;
1487 mPacketSources.removeItem(streamType);
1488 }
1489 }
1490
1491 }
1492
1493 if (checkDecryptPadding(buffer) != OK) {
1494 ALOGE("Incorrect padding bytes after decryption.");
1495 notifyError(ERROR_MALFORMED);
1496 return;
1497 }
1498
1499 if (tsBuffer != NULL) {
1500 AString method;
1501 CHECK(buffer->meta()->findString("cipher-method", &method));
1502 if ((tsBuffer->size() > 0 && method == "NONE")
1503 || tsBuffer->size() > 16) {
1504 ALOGE("MPEG2 transport stream is not an even multiple of 188 "
1505 "bytes in length.");
1506 notifyError(ERROR_MALFORMED);
1507 return;
1508 }
1509 }
1510
1511 // bulk extract non-ts files
1512 bool startUp = mStartup;
1513 if (tsBuffer == NULL) {
1514 status_t err = extractAndQueueAccessUnits(buffer, itemMeta);
1515 if (err == -EAGAIN) {
1516 // starting sequence number too low/high
1517 postMonitorQueue();
1518 return;
1519 } else if (err == ERROR_OUT_OF_RANGE) {
1520 // reached stopping point
1521 notifyStopReached();
1522 return;
1523 } else if (err != OK) {
1524 notifyError(err);
1525 return;
1526 }
1527 }
1528
1529 ++mSeqNumber;
1530
1531 // if adapting, pause after found the next starting point
1532 if (mSeekMode != LiveSession::kSeekModeExactPosition && startUp != mStartup) {
1533 CHECK(mStartTimeUsNotify != NULL);
1534 mStartTimeUsNotify->post();
1535 mStartTimeUsNotify.clear();
1536 shouldPause = true;
1537 }
1538
1539 if (!shouldPause) {
1540 postMonitorQueue();
1541 }
1542 }
1543
1544 /*
1545 * returns true if we need to adjust mSeqNumber
1546 */
adjustSeqNumberWithAnchorTime(int64_t anchorTimeUs)1547 bool PlaylistFetcher::adjustSeqNumberWithAnchorTime(int64_t anchorTimeUs) {
1548 int32_t firstSeqNumberInPlaylist = mPlaylist->getFirstSeqNumber();
1549
1550 int64_t minDiffUs, maxDiffUs;
1551 if (mSeekMode == LiveSession::kSeekModeNextSample) {
1552 // if the previous fetcher paused in the middle of a segment, we
1553 // want to start at a segment that overlaps the last sample
1554 minDiffUs = -mPlaylist->getTargetDuration();
1555 maxDiffUs = 0ll;
1556 } else {
1557 // if the previous fetcher paused at the end of a segment, ideally
1558 // we want to start at the segment that's roughly aligned with its
1559 // next segment, but if the two variants are not well aligned we
1560 // adjust the diff to within (-T/2, T/2)
1561 minDiffUs = -mPlaylist->getTargetDuration() / 2;
1562 maxDiffUs = mPlaylist->getTargetDuration() / 2;
1563 }
1564
1565 int32_t oldSeqNumber = mSeqNumber;
1566 ssize_t index = mSeqNumber - firstSeqNumberInPlaylist;
1567
1568 // adjust anchorTimeUs to within (minDiffUs, maxDiffUs) from mStartTimeUs
1569 int64_t diffUs = anchorTimeUs - mStartTimeUs;
1570 if (diffUs > maxDiffUs) {
1571 while (index > 0 && diffUs > maxDiffUs) {
1572 --index;
1573
1574 sp<AMessage> itemMeta;
1575 CHECK(mPlaylist->itemAt(index, NULL /* uri */, &itemMeta));
1576
1577 int64_t itemDurationUs;
1578 CHECK(itemMeta->findInt64("durationUs", &itemDurationUs));
1579
1580 diffUs -= itemDurationUs;
1581 }
1582 } else if (diffUs < minDiffUs) {
1583 while (index + 1 < (ssize_t) mPlaylist->size()
1584 && diffUs < minDiffUs) {
1585 ++index;
1586
1587 sp<AMessage> itemMeta;
1588 CHECK(mPlaylist->itemAt(index, NULL /* uri */, &itemMeta));
1589
1590 int64_t itemDurationUs;
1591 CHECK(itemMeta->findInt64("durationUs", &itemDurationUs));
1592
1593 diffUs += itemDurationUs;
1594 }
1595 }
1596
1597 mSeqNumber = firstSeqNumberInPlaylist + index;
1598
1599 if (mSeqNumber != oldSeqNumber) {
1600 FLOGV("guessed wrong seg number: diff %lld out of [%lld, %lld]",
1601 (long long) anchorTimeUs - mStartTimeUs,
1602 (long long) minDiffUs,
1603 (long long) maxDiffUs);
1604 return true;
1605 }
1606 return false;
1607 }
1608
getSeqNumberForDiscontinuity(size_t discontinuitySeq) const1609 int32_t PlaylistFetcher::getSeqNumberForDiscontinuity(size_t discontinuitySeq) const {
1610 int32_t firstSeqNumberInPlaylist = mPlaylist->getFirstSeqNumber();
1611
1612 size_t index = 0;
1613 while (index < mPlaylist->size()) {
1614 sp<AMessage> itemMeta;
1615 CHECK(mPlaylist->itemAt( index, NULL /* uri */, &itemMeta));
1616 size_t curDiscontinuitySeq;
1617 CHECK(itemMeta->findInt32("discontinuity-sequence", (int32_t *)&curDiscontinuitySeq));
1618 int32_t seqNumber = firstSeqNumberInPlaylist + index;
1619 if (curDiscontinuitySeq == discontinuitySeq) {
1620 return seqNumber;
1621 } else if (curDiscontinuitySeq > discontinuitySeq) {
1622 return seqNumber <= 0 ? 0 : seqNumber - 1;
1623 }
1624
1625 ++index;
1626 }
1627
1628 return firstSeqNumberInPlaylist + mPlaylist->size();
1629 }
1630
getSeqNumberForTime(int64_t timeUs) const1631 int32_t PlaylistFetcher::getSeqNumberForTime(int64_t timeUs) const {
1632 size_t index = 0;
1633 int64_t segmentStartUs = 0;
1634 while (index < mPlaylist->size()) {
1635 sp<AMessage> itemMeta;
1636 CHECK(mPlaylist->itemAt(
1637 index, NULL /* uri */, &itemMeta));
1638
1639 int64_t itemDurationUs;
1640 CHECK(itemMeta->findInt64("durationUs", &itemDurationUs));
1641
1642 if (timeUs < segmentStartUs + itemDurationUs) {
1643 break;
1644 }
1645
1646 segmentStartUs += itemDurationUs;
1647 ++index;
1648 }
1649
1650 if (index >= mPlaylist->size()) {
1651 index = mPlaylist->size() - 1;
1652 }
1653
1654 return mPlaylist->getFirstSeqNumber() + index;
1655 }
1656
setAccessUnitProperties(const sp<ABuffer> & accessUnit,const sp<AnotherPacketSource> & source,bool discard)1657 const sp<ABuffer> &PlaylistFetcher::setAccessUnitProperties(
1658 const sp<ABuffer> &accessUnit, const sp<AnotherPacketSource> &source, bool discard) {
1659 sp<MetaData> format = source->getFormat();
1660 if (format != NULL) {
1661 // for simplicity, store a reference to the format in each unit
1662 accessUnit->meta()->setObject("format", format);
1663 }
1664
1665 if (discard) {
1666 accessUnit->meta()->setInt32("discard", discard);
1667 }
1668
1669 accessUnit->meta()->setInt32("discontinuitySeq", mDiscontinuitySeq);
1670 accessUnit->meta()->setInt64("segmentStartTimeUs", getSegmentStartTimeUs(mSeqNumber));
1671 accessUnit->meta()->setInt64("segmentFirstTimeUs", mSegmentFirstPTS);
1672 accessUnit->meta()->setInt64("segmentDurationUs", getSegmentDurationUs(mSeqNumber));
1673 if (!mPlaylist->isComplete() && !mPlaylist->isEvent()) {
1674 accessUnit->meta()->setInt64("playlistTimeUs", mPlaylistTimeUs);
1675 }
1676 return accessUnit;
1677 }
1678
isStartTimeReached(int64_t timeUs)1679 bool PlaylistFetcher::isStartTimeReached(int64_t timeUs) {
1680 if (!mFirstPTSValid) {
1681 mFirstTimeUs = timeUs;
1682 mFirstPTSValid = true;
1683 }
1684 bool startTimeReached = true;
1685 if (mStartTimeUsRelative) {
1686 FLOGV("startTimeUsRelative, timeUs (%lld) - %lld = %lld",
1687 (long long)timeUs,
1688 (long long)mFirstTimeUs,
1689 (long long)(timeUs - mFirstTimeUs));
1690 timeUs -= mFirstTimeUs;
1691 if (timeUs < 0) {
1692 FLOGV("clamp negative timeUs to 0");
1693 timeUs = 0;
1694 }
1695 startTimeReached = (timeUs >= mStartTimeUs);
1696 }
1697 return startTimeReached;
1698 }
1699
extractAndQueueAccessUnitsFromTs(const sp<ABuffer> & buffer)1700 status_t PlaylistFetcher::extractAndQueueAccessUnitsFromTs(const sp<ABuffer> &buffer) {
1701 if (mTSParser == NULL) {
1702 // Use TS_TIMESTAMPS_ARE_ABSOLUTE so pts carry over between fetchers.
1703 mTSParser = new ATSParser(ATSParser::TS_TIMESTAMPS_ARE_ABSOLUTE);
1704 }
1705
1706 if (mNextPTSTimeUs >= 0ll) {
1707 sp<AMessage> extra = new AMessage;
1708 // Since we are using absolute timestamps, signal an offset of 0 to prevent
1709 // ATSParser from skewing the timestamps of access units.
1710 extra->setInt64(kATSParserKeyMediaTimeUs, 0);
1711
1712 // When adapting, signal a recent media time to the parser,
1713 // so that PTS wrap around is handled for the new variant.
1714 if (mStartTimeUs >= 0 && !mStartTimeUsRelative) {
1715 extra->setInt64(kATSParserKeyRecentMediaTimeUs, mStartTimeUs);
1716 }
1717
1718 mTSParser->signalDiscontinuity(
1719 ATSParser::DISCONTINUITY_TIME, extra);
1720
1721 mNextPTSTimeUs = -1ll;
1722 }
1723
1724 if (mSampleAesKeyItemChanged) {
1725 mTSParser->signalNewSampleAesKey(mSampleAesKeyItem);
1726 mSampleAesKeyItemChanged = false;
1727 }
1728
1729 size_t offset = 0;
1730 while (offset + 188 <= buffer->size()) {
1731 status_t err = mTSParser->feedTSPacket(buffer->data() + offset, 188);
1732
1733 if (err != OK) {
1734 return err;
1735 }
1736
1737 offset += 188;
1738 }
1739 // setRange to indicate consumed bytes.
1740 buffer->setRange(buffer->offset() + offset, buffer->size() - offset);
1741
1742 if (mSegmentFirstPTS < 0ll) {
1743 // get the smallest first PTS from all streams present in this parser
1744 for (size_t i = mPacketSources.size(); i > 0;) {
1745 i--;
1746 const LiveSession::StreamType stream = mPacketSources.keyAt(i);
1747 if (stream == LiveSession::STREAMTYPE_SUBTITLES) {
1748 ALOGE("MPEG2 Transport streams do not contain subtitles.");
1749 return ERROR_MALFORMED;
1750 }
1751 if (stream == LiveSession::STREAMTYPE_METADATA) {
1752 continue;
1753 }
1754 ATSParser::SourceType type =LiveSession::getSourceTypeForStream(stream);
1755 sp<AnotherPacketSource> source =
1756 static_cast<AnotherPacketSource *>(
1757 mTSParser->getSource(type).get());
1758
1759 if (source == NULL) {
1760 continue;
1761 }
1762 sp<AMessage> meta = source->getMetaAfterLastDequeued(0);
1763 if (meta != NULL) {
1764 int64_t timeUs;
1765 CHECK(meta->findInt64("timeUs", &timeUs));
1766 if (mSegmentFirstPTS < 0ll || timeUs < mSegmentFirstPTS) {
1767 mSegmentFirstPTS = timeUs;
1768 }
1769 }
1770 }
1771 if (mSegmentFirstPTS < 0ll) {
1772 // didn't find any TS packet, can return early
1773 return OK;
1774 }
1775 if (!mStartTimeUsRelative) {
1776 // mStartup
1777 // mStartup is true until we have queued a packet for all the streams
1778 // we are fetching. We queue packets whose timestamps are greater than
1779 // mStartTimeUs.
1780 // mSegmentStartTimeUs >= 0
1781 // mSegmentStartTimeUs is non-negative when adapting or switching tracks
1782 // adjustSeqNumberWithAnchorTime(timeUs) == true
1783 // we guessed a seq number that's either too large or too small.
1784 // If this happens, we'll adjust mSeqNumber and restart fetching from new
1785 // location. Note that we only want to adjust once, so set mSegmentStartTimeUs
1786 // to -1 so that we don't enter this chunk next time.
1787 if (mStartup && mSegmentStartTimeUs >= 0
1788 && adjustSeqNumberWithAnchorTime(mSegmentFirstPTS)) {
1789 mStartTimeUsNotify = mNotify->dup();
1790 mStartTimeUsNotify->setInt32("what", kWhatStartedAt);
1791 mStartTimeUsNotify->setString("uri", mURI);
1792 mIDRFound = false;
1793 mSegmentStartTimeUs = -1;
1794 return -EAGAIN;
1795 }
1796 }
1797 }
1798
1799 status_t err = OK;
1800 for (size_t i = mPacketSources.size(); i > 0;) {
1801 i--;
1802 sp<AnotherPacketSource> packetSource = mPacketSources.valueAt(i);
1803
1804 const LiveSession::StreamType stream = mPacketSources.keyAt(i);
1805 if (stream == LiveSession::STREAMTYPE_SUBTITLES) {
1806 ALOGE("MPEG2 Transport streams do not contain subtitles.");
1807 return ERROR_MALFORMED;
1808 }
1809
1810 const char *key = LiveSession::getKeyForStream(stream);
1811 ATSParser::SourceType type =LiveSession::getSourceTypeForStream(stream);
1812
1813 sp<AnotherPacketSource> source =
1814 static_cast<AnotherPacketSource *>(
1815 mTSParser->getSource(type).get());
1816
1817 if (source == NULL) {
1818 continue;
1819 }
1820
1821 const char *mime;
1822 sp<MetaData> format = source->getFormat();
1823 bool isAvc = format != NULL && format->findCString(kKeyMIMEType, &mime)
1824 && !strcasecmp(mime, MEDIA_MIMETYPE_VIDEO_AVC);
1825
1826 sp<ABuffer> accessUnit;
1827 status_t finalResult;
1828 while (source->hasBufferAvailable(&finalResult)
1829 && source->dequeueAccessUnit(&accessUnit) == OK) {
1830
1831 int64_t timeUs;
1832 CHECK(accessUnit->meta()->findInt64("timeUs", &timeUs));
1833
1834 if (mStartup) {
1835 bool startTimeReached = isStartTimeReached(timeUs);
1836
1837 if (!startTimeReached || (isAvc && !mIDRFound)) {
1838 // buffer up to the closest preceding IDR frame in the next segement,
1839 // or the closest succeeding IDR frame after the exact position
1840 FSLOGV(stream, "timeUs(%lld)-mStartTimeUs(%lld)=%lld, mIDRFound=%d",
1841 (long long)timeUs,
1842 (long long)mStartTimeUs,
1843 (long long)timeUs - mStartTimeUs,
1844 mIDRFound);
1845 if (isAvc) {
1846 if (IsIDR(accessUnit->data(), accessUnit->size())) {
1847 mVideoBuffer->clear();
1848 FSLOGV(stream, "found IDR, clear mVideoBuffer");
1849 mIDRFound = true;
1850 }
1851 if (mIDRFound && mStartTimeUsRelative && !startTimeReached) {
1852 mVideoBuffer->queueAccessUnit(accessUnit);
1853 FSLOGV(stream, "saving AVC video AccessUnit");
1854 }
1855 }
1856 if (!startTimeReached || (isAvc && !mIDRFound)) {
1857 continue;
1858 }
1859 }
1860 }
1861
1862 if (mStartTimeUsNotify != NULL) {
1863 uint32_t streamMask = 0;
1864 mStartTimeUsNotify->findInt32("streamMask", (int32_t *) &streamMask);
1865 if ((mStreamTypeMask & mPacketSources.keyAt(i))
1866 && !(streamMask & mPacketSources.keyAt(i))) {
1867 streamMask |= mPacketSources.keyAt(i);
1868 mStartTimeUsNotify->setInt32("streamMask", streamMask);
1869 FSLOGV(stream, "found start point, timeUs=%lld, streamMask becomes %x",
1870 (long long)timeUs, streamMask);
1871
1872 if (streamMask == mStreamTypeMask) {
1873 FLOGV("found start point for all streams");
1874 mStartup = false;
1875 }
1876 }
1877 }
1878
1879 if (mStopParams != NULL) {
1880 int32_t discontinuitySeq;
1881 int64_t stopTimeUs;
1882 if (!mStopParams->findInt32("discontinuitySeq", &discontinuitySeq)
1883 || discontinuitySeq > mDiscontinuitySeq
1884 || !mStopParams->findInt64(key, &stopTimeUs)
1885 || (discontinuitySeq == mDiscontinuitySeq
1886 && timeUs >= stopTimeUs)) {
1887 FSLOGV(stream, "reached stop point, timeUs=%lld", (long long)timeUs);
1888 mStreamTypeMask &= ~stream;
1889 mPacketSources.removeItemsAt(i);
1890 break;
1891 }
1892 }
1893
1894 if (stream == LiveSession::STREAMTYPE_VIDEO) {
1895 const bool discard = true;
1896 status_t status;
1897 while (mVideoBuffer->hasBufferAvailable(&status)) {
1898 sp<ABuffer> videoBuffer;
1899 mVideoBuffer->dequeueAccessUnit(&videoBuffer);
1900 setAccessUnitProperties(videoBuffer, source, discard);
1901 packetSource->queueAccessUnit(videoBuffer);
1902 int64_t bufferTimeUs;
1903 CHECK(videoBuffer->meta()->findInt64("timeUs", &bufferTimeUs));
1904 FSLOGV(stream, "queueAccessUnit (saved), timeUs=%lld",
1905 (long long)bufferTimeUs);
1906 }
1907 } else if (stream == LiveSession::STREAMTYPE_METADATA && !mHasMetadata) {
1908 mHasMetadata = true;
1909 sp<AMessage> notify = mNotify->dup();
1910 notify->setInt32("what", kWhatMetadataDetected);
1911 notify->post();
1912 }
1913
1914 setAccessUnitProperties(accessUnit, source);
1915 packetSource->queueAccessUnit(accessUnit);
1916 FSLOGV(stream, "queueAccessUnit, timeUs=%lld", (long long)timeUs);
1917 }
1918
1919 if (err != OK) {
1920 break;
1921 }
1922 }
1923
1924 if (err != OK) {
1925 for (size_t i = mPacketSources.size(); i > 0;) {
1926 i--;
1927 sp<AnotherPacketSource> packetSource = mPacketSources.valueAt(i);
1928 packetSource->clear();
1929 }
1930 return err;
1931 }
1932
1933 if (!mStreamTypeMask) {
1934 // Signal gap is filled between original and new stream.
1935 FLOGV("reached stop point for all streams");
1936 return ERROR_OUT_OF_RANGE;
1937 }
1938
1939 return OK;
1940 }
1941
1942 /* static */
bufferStartsWithWebVTTMagicSequence(const sp<ABuffer> & buffer)1943 bool PlaylistFetcher::bufferStartsWithWebVTTMagicSequence(
1944 const sp<ABuffer> &buffer) {
1945 size_t pos = 0;
1946
1947 // skip possible BOM
1948 if (buffer->size() >= pos + 3 &&
1949 !memcmp("\xef\xbb\xbf", buffer->data() + pos, 3)) {
1950 pos += 3;
1951 }
1952
1953 // accept WEBVTT followed by SPACE, TAB or (CR) LF
1954 if (buffer->size() < pos + 6 ||
1955 memcmp("WEBVTT", buffer->data() + pos, 6)) {
1956 return false;
1957 }
1958 pos += 6;
1959
1960 if (buffer->size() == pos) {
1961 return true;
1962 }
1963
1964 uint8_t sep = buffer->data()[pos];
1965 return sep == ' ' || sep == '\t' || sep == '\n' || sep == '\r';
1966 }
1967
extractAndQueueAccessUnits(const sp<ABuffer> & buffer,const sp<AMessage> & itemMeta)1968 status_t PlaylistFetcher::extractAndQueueAccessUnits(
1969 const sp<ABuffer> &buffer, const sp<AMessage> &itemMeta) {
1970 if (bufferStartsWithWebVTTMagicSequence(buffer)) {
1971 if (mStreamTypeMask != LiveSession::STREAMTYPE_SUBTITLES) {
1972 ALOGE("This stream only contains subtitles.");
1973 return ERROR_MALFORMED;
1974 }
1975
1976 const sp<AnotherPacketSource> packetSource =
1977 mPacketSources.valueFor(LiveSession::STREAMTYPE_SUBTITLES);
1978
1979 int64_t durationUs;
1980 CHECK(itemMeta->findInt64("durationUs", &durationUs));
1981 buffer->meta()->setInt64("timeUs", getSegmentStartTimeUs(mSeqNumber));
1982 buffer->meta()->setInt64("durationUs", durationUs);
1983 buffer->meta()->setInt64("segmentStartTimeUs", getSegmentStartTimeUs(mSeqNumber));
1984 buffer->meta()->setInt32("discontinuitySeq", mDiscontinuitySeq);
1985 buffer->meta()->setInt32("subtitleGeneration", mSubtitleGeneration);
1986 packetSource->queueAccessUnit(buffer);
1987 return OK;
1988 }
1989
1990 if (mNextPTSTimeUs >= 0ll) {
1991 mNextPTSTimeUs = -1ll;
1992 }
1993
1994 // This better be an ISO 13818-7 (AAC) or ISO 13818-1 (MPEG) audio
1995 // stream prefixed by an ID3 tag.
1996
1997 bool firstID3Tag = true;
1998 uint64_t PTS = 0;
1999
2000 for (;;) {
2001 // Make sure to skip all ID3 tags preceding the audio data.
2002 // At least one must be present to provide the PTS timestamp.
2003
2004 ID3 id3(buffer->data(), buffer->size(), true /* ignoreV1 */);
2005 if (!id3.isValid()) {
2006 if (firstID3Tag) {
2007 ALOGE("Unable to parse ID3 tag.");
2008 return ERROR_MALFORMED;
2009 } else {
2010 break;
2011 }
2012 }
2013
2014 if (firstID3Tag) {
2015 bool found = false;
2016
2017 ID3::Iterator it(id3, "PRIV");
2018 while (!it.done()) {
2019 size_t length;
2020 const uint8_t *data = it.getData(&length);
2021 if (!data) {
2022 return ERROR_MALFORMED;
2023 }
2024
2025 static const char *kMatchName =
2026 "com.apple.streaming.transportStreamTimestamp";
2027 static const size_t kMatchNameLen = strlen(kMatchName);
2028
2029 if (length == kMatchNameLen + 1 + 8
2030 && !strncmp((const char *)data, kMatchName, kMatchNameLen)) {
2031 found = true;
2032 PTS = U64_AT(&data[kMatchNameLen + 1]);
2033 }
2034
2035 it.next();
2036 }
2037
2038 if (!found) {
2039 ALOGE("Unable to extract transportStreamTimestamp from ID3 tag.");
2040 return ERROR_MALFORMED;
2041 }
2042 }
2043
2044 // skip the ID3 tag
2045 buffer->setRange(
2046 buffer->offset() + id3.rawSize(), buffer->size() - id3.rawSize());
2047
2048 firstID3Tag = false;
2049 }
2050
2051 if (mStreamTypeMask != LiveSession::STREAMTYPE_AUDIO) {
2052 ALOGW("This stream only contains audio data!");
2053
2054 mStreamTypeMask &= LiveSession::STREAMTYPE_AUDIO;
2055
2056 if (mStreamTypeMask == 0) {
2057 return OK;
2058 }
2059 }
2060
2061 sp<AnotherPacketSource> packetSource =
2062 mPacketSources.valueFor(LiveSession::STREAMTYPE_AUDIO);
2063
2064 if (packetSource->getFormat() == NULL && buffer->size() >= 7) {
2065 ABitReader bits(buffer->data(), buffer->size());
2066
2067 // adts_fixed_header
2068
2069 CHECK_EQ(bits.getBits(12), 0xfffu);
2070 bits.skipBits(3); // ID, layer
2071 bool protection_absent __unused = bits.getBits(1) != 0;
2072
2073 unsigned profile = bits.getBits(2);
2074 CHECK_NE(profile, 3u);
2075 unsigned sampling_freq_index = bits.getBits(4);
2076 bits.getBits(1); // private_bit
2077 unsigned channel_configuration = bits.getBits(3);
2078 CHECK_NE(channel_configuration, 0u);
2079 bits.skipBits(2); // original_copy, home
2080
2081 sp<MetaData> meta = new MetaData();
2082 MakeAACCodecSpecificData(*meta,
2083 profile, sampling_freq_index, channel_configuration);
2084
2085 meta->setInt32(kKeyIsADTS, true);
2086
2087 packetSource->setFormat(meta);
2088 }
2089
2090 int64_t numSamples = 0ll;
2091 int32_t sampleRate;
2092 CHECK(packetSource->getFormat()->findInt32(kKeySampleRate, &sampleRate));
2093
2094 int64_t timeUs = (PTS * 100ll) / 9ll;
2095 if (mStartup && !mFirstPTSValid) {
2096 mFirstPTSValid = true;
2097 mFirstTimeUs = timeUs;
2098 }
2099
2100 if (mSegmentFirstPTS < 0ll) {
2101 mSegmentFirstPTS = timeUs;
2102 if (!mStartTimeUsRelative) {
2103 // Duplicated logic from how we handle .ts playlists.
2104 if (mStartup && mSegmentStartTimeUs >= 0
2105 && adjustSeqNumberWithAnchorTime(timeUs)) {
2106 mSegmentStartTimeUs = -1;
2107 return -EAGAIN;
2108 }
2109 }
2110 }
2111
2112 sp<HlsSampleDecryptor> sampleDecryptor = NULL;
2113 if (mSampleAesKeyItem != NULL) {
2114 ALOGV("extractAndQueueAccessUnits[%d] SampleAesKeyItem: Key: %s IV: %s",
2115 mSeqNumber,
2116 HlsSampleDecryptor::aesBlockToStr(mKeyData).c_str(),
2117 HlsSampleDecryptor::aesBlockToStr(mAESInitVec).c_str());
2118
2119 sampleDecryptor = new HlsSampleDecryptor(mSampleAesKeyItem);
2120 }
2121
2122 int frameId = 0;
2123
2124 size_t offset = 0;
2125 while (offset < buffer->size()) {
2126 const uint8_t *adtsHeader = buffer->data() + offset;
2127 CHECK_LT(offset + 5, buffer->size());
2128 // non-const pointer for decryption if needed
2129 uint8_t *adtsFrame = buffer->data() + offset;
2130
2131 unsigned aac_frame_length =
2132 ((adtsHeader[3] & 3) << 11)
2133 | (adtsHeader[4] << 3)
2134 | (adtsHeader[5] >> 5);
2135
2136 if (aac_frame_length == 0) {
2137 const uint8_t *id3Header = adtsHeader;
2138 if (!memcmp(id3Header, "ID3", 3)) {
2139 ID3 id3(id3Header, buffer->size() - offset, true);
2140 if (id3.isValid()) {
2141 offset += id3.rawSize();
2142 continue;
2143 };
2144 }
2145 return ERROR_MALFORMED;
2146 }
2147
2148 CHECK_LE(offset + aac_frame_length, buffer->size());
2149
2150 int64_t unitTimeUs = timeUs + numSamples * 1000000ll / sampleRate;
2151 offset += aac_frame_length;
2152
2153 // Each AAC frame encodes 1024 samples.
2154 numSamples += 1024;
2155
2156 if (mStartup) {
2157 int64_t startTimeUs = unitTimeUs;
2158 if (mStartTimeUsRelative) {
2159 startTimeUs -= mFirstTimeUs;
2160 if (startTimeUs < 0) {
2161 startTimeUs = 0;
2162 }
2163 }
2164 if (startTimeUs < mStartTimeUs) {
2165 continue;
2166 }
2167
2168 if (mStartTimeUsNotify != NULL) {
2169 mStartTimeUsNotify->setInt32("streamMask", LiveSession::STREAMTYPE_AUDIO);
2170 mStartup = false;
2171 }
2172 }
2173
2174 if (mStopParams != NULL) {
2175 int32_t discontinuitySeq;
2176 int64_t stopTimeUs;
2177 if (!mStopParams->findInt32("discontinuitySeq", &discontinuitySeq)
2178 || discontinuitySeq > mDiscontinuitySeq
2179 || !mStopParams->findInt64("timeUsAudio", &stopTimeUs)
2180 || (discontinuitySeq == mDiscontinuitySeq && unitTimeUs >= stopTimeUs)) {
2181 mStreamTypeMask = 0;
2182 mPacketSources.clear();
2183 return ERROR_OUT_OF_RANGE;
2184 }
2185 }
2186
2187 if (sampleDecryptor != NULL) {
2188 bool protection_absent = (adtsHeader[1] & 0x1);
2189 size_t headerSize = protection_absent ? 7 : 9;
2190 if (frameId == 0) {
2191 ALOGV("extractAndQueueAAC[%d] protection_absent %d (%02x) headerSize %zu",
2192 mSeqNumber, protection_absent, adtsHeader[1], headerSize);
2193 }
2194
2195 sampleDecryptor->processAAC(headerSize, adtsFrame, aac_frame_length);
2196 }
2197 frameId++;
2198
2199 sp<ABuffer> unit = new ABuffer(aac_frame_length);
2200 memcpy(unit->data(), adtsHeader, aac_frame_length);
2201
2202 unit->meta()->setInt64("timeUs", unitTimeUs);
2203 setAccessUnitProperties(unit, packetSource);
2204 packetSource->queueAccessUnit(unit);
2205 }
2206
2207 return OK;
2208 }
2209
updateDuration()2210 void PlaylistFetcher::updateDuration() {
2211 int64_t durationUs = 0ll;
2212 for (size_t index = 0; index < mPlaylist->size(); ++index) {
2213 sp<AMessage> itemMeta;
2214 CHECK(mPlaylist->itemAt(
2215 index, NULL /* uri */, &itemMeta));
2216
2217 int64_t itemDurationUs;
2218 CHECK(itemMeta->findInt64("durationUs", &itemDurationUs));
2219
2220 durationUs += itemDurationUs;
2221 }
2222
2223 sp<AMessage> msg = mNotify->dup();
2224 msg->setInt32("what", kWhatDurationUpdate);
2225 msg->setInt64("durationUs", durationUs);
2226 msg->post();
2227 }
2228
updateTargetDuration()2229 void PlaylistFetcher::updateTargetDuration() {
2230 sp<AMessage> msg = mNotify->dup();
2231 msg->setInt32("what", kWhatTargetDurationUpdate);
2232 msg->setInt64("targetDurationUs", mPlaylist->getTargetDuration());
2233 msg->post();
2234 }
2235
2236 } // namespace android
2237