1 /*
2 * Copyright (C) 2012 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 //#define LOG_NDEBUG 0
18 #define LOG_TAG "PlaylistFetcher"
19 #include <utils/Log.h>
20 #include <utils/misc.h>
21
22 #include "PlaylistFetcher.h"
23 #include "HTTPDownloader.h"
24 #include "LiveSession.h"
25 #include "M3UParser.h"
26 #include "include/avc_utils.h"
27 #include "include/ID3.h"
28 #include "mpeg2ts/AnotherPacketSource.h"
29 #include "mpeg2ts/HlsSampleDecryptor.h"
30
31 #include <media/stagefright/foundation/ABitReader.h>
32 #include <media/stagefright/foundation/ABuffer.h>
33 #include <media/stagefright/foundation/ADebug.h>
34 #include <media/stagefright/MediaDefs.h>
35 #include <media/stagefright/MetaData.h>
36 #include <media/stagefright/Utils.h>
37
38 #include <ctype.h>
39 #include <inttypes.h>
40
// Verbose-log helpers that prefix every message with this fetcher's id
// (FLOGV) and, additionally, with the name of the stream concerned (FSLOGV).
// Both expand to ALOGV and reference mFetcherID, so they can only be used
// inside PlaylistFetcher member functions.
#define FLOGV(fmt, ...) \
    ALOGV("[fetcher-%d] " fmt, mFetcherID, ##__VA_ARGS__)
#define FSLOGV(stream, fmt, ...) \
    ALOGV("[fetcher-%d] [%s] " fmt, mFetcherID, \
            LiveSession::getNameForStream(stream), ##__VA_ARGS__)
44
45 namespace android {
46
47 // static
48 const int64_t PlaylistFetcher::kMinBufferedDurationUs = 30000000ll;
49 const int64_t PlaylistFetcher::kMaxMonitorDelayUs = 3000000ll;
50 // LCM of 188 (size of a TS packet) & 1k works well
51 const int32_t PlaylistFetcher::kDownloadBlockSize = 47 * 1024;
52
53 struct PlaylistFetcher::DownloadState : public RefBase {
54 DownloadState();
55 void resetState();
56 bool hasSavedState() const;
57 void restoreState(
58 AString &uri,
59 sp<AMessage> &itemMeta,
60 sp<ABuffer> &buffer,
61 sp<ABuffer> &tsBuffer,
62 int32_t &firstSeqNumberInPlaylist,
63 int32_t &lastSeqNumberInPlaylist);
64 void saveState(
65 AString &uri,
66 sp<AMessage> &itemMeta,
67 sp<ABuffer> &buffer,
68 sp<ABuffer> &tsBuffer,
69 int32_t &firstSeqNumberInPlaylist,
70 int32_t &lastSeqNumberInPlaylist);
71
72 private:
73 bool mHasSavedState;
74 AString mUri;
75 sp<AMessage> mItemMeta;
76 sp<ABuffer> mBuffer;
77 sp<ABuffer> mTsBuffer;
78 int32_t mFirstSeqNumberInPlaylist;
79 int32_t mLastSeqNumberInPlaylist;
80 };
81
DownloadState()82 PlaylistFetcher::DownloadState::DownloadState() {
83 resetState();
84 }
85
hasSavedState() const86 bool PlaylistFetcher::DownloadState::hasSavedState() const {
87 return mHasSavedState;
88 }
89
resetState()90 void PlaylistFetcher::DownloadState::resetState() {
91 mHasSavedState = false;
92
93 mUri.clear();
94 mItemMeta = NULL;
95 mBuffer = NULL;
96 mTsBuffer = NULL;
97 mFirstSeqNumberInPlaylist = 0;
98 mLastSeqNumberInPlaylist = 0;
99 }
100
restoreState(AString & uri,sp<AMessage> & itemMeta,sp<ABuffer> & buffer,sp<ABuffer> & tsBuffer,int32_t & firstSeqNumberInPlaylist,int32_t & lastSeqNumberInPlaylist)101 void PlaylistFetcher::DownloadState::restoreState(
102 AString &uri,
103 sp<AMessage> &itemMeta,
104 sp<ABuffer> &buffer,
105 sp<ABuffer> &tsBuffer,
106 int32_t &firstSeqNumberInPlaylist,
107 int32_t &lastSeqNumberInPlaylist) {
108 if (!mHasSavedState) {
109 return;
110 }
111
112 uri = mUri;
113 itemMeta = mItemMeta;
114 buffer = mBuffer;
115 tsBuffer = mTsBuffer;
116 firstSeqNumberInPlaylist = mFirstSeqNumberInPlaylist;
117 lastSeqNumberInPlaylist = mLastSeqNumberInPlaylist;
118
119 resetState();
120 }
121
saveState(AString & uri,sp<AMessage> & itemMeta,sp<ABuffer> & buffer,sp<ABuffer> & tsBuffer,int32_t & firstSeqNumberInPlaylist,int32_t & lastSeqNumberInPlaylist)122 void PlaylistFetcher::DownloadState::saveState(
123 AString &uri,
124 sp<AMessage> &itemMeta,
125 sp<ABuffer> &buffer,
126 sp<ABuffer> &tsBuffer,
127 int32_t &firstSeqNumberInPlaylist,
128 int32_t &lastSeqNumberInPlaylist) {
129 mHasSavedState = true;
130
131 mUri = uri;
132 mItemMeta = itemMeta;
133 mBuffer = buffer;
134 mTsBuffer = tsBuffer;
135 mFirstSeqNumberInPlaylist = firstSeqNumberInPlaylist;
136 mLastSeqNumberInPlaylist = lastSeqNumberInPlaylist;
137 }
138
PlaylistFetcher(const sp<AMessage> & notify,const sp<LiveSession> & session,const char * uri,int32_t id,int32_t subtitleGeneration)139 PlaylistFetcher::PlaylistFetcher(
140 const sp<AMessage> ¬ify,
141 const sp<LiveSession> &session,
142 const char *uri,
143 int32_t id,
144 int32_t subtitleGeneration)
145 : mNotify(notify),
146 mSession(session),
147 mURI(uri),
148 mFetcherID(id),
149 mStreamTypeMask(0),
150 mStartTimeUs(-1ll),
151 mSegmentStartTimeUs(-1ll),
152 mDiscontinuitySeq(-1ll),
153 mStartTimeUsRelative(false),
154 mLastPlaylistFetchTimeUs(-1ll),
155 mPlaylistTimeUs(-1ll),
156 mSeqNumber(-1),
157 mNumRetries(0),
158 mStartup(true),
159 mIDRFound(false),
160 mSeekMode(LiveSession::kSeekModeExactPosition),
161 mTimeChangeSignaled(false),
162 mNextPTSTimeUs(-1ll),
163 mMonitorQueueGeneration(0),
164 mSubtitleGeneration(subtitleGeneration),
165 mLastDiscontinuitySeq(-1ll),
166 mRefreshState(INITIAL_MINIMUM_RELOAD_DELAY),
167 mFirstPTSValid(false),
168 mFirstTimeUs(-1ll),
169 mVideoBuffer(new AnotherPacketSource(NULL)),
170 mSampleAesKeyItemChanged(false),
171 mThresholdRatio(-1.0f),
172 mDownloadState(new DownloadState()),
173 mHasMetadata(false) {
174 memset(mPlaylistHash, 0, sizeof(mPlaylistHash));
175 mHTTPDownloader = mSession->getHTTPDownloader();
176
177 memset(mKeyData, 0, sizeof(mKeyData));
178 memset(mAESInitVec, 0, sizeof(mAESInitVec));
179 }
180
~PlaylistFetcher()181 PlaylistFetcher::~PlaylistFetcher() {
182 }
183
getFetcherID() const184 int32_t PlaylistFetcher::getFetcherID() const {
185 return mFetcherID;
186 }
187
getSegmentStartTimeUs(int32_t seqNumber) const188 int64_t PlaylistFetcher::getSegmentStartTimeUs(int32_t seqNumber) const {
189 CHECK(mPlaylist != NULL);
190
191 int32_t firstSeqNumberInPlaylist, lastSeqNumberInPlaylist;
192 mPlaylist->getSeqNumberRange(
193 &firstSeqNumberInPlaylist, &lastSeqNumberInPlaylist);
194
195 CHECK_GE(seqNumber, firstSeqNumberInPlaylist);
196 CHECK_LE(seqNumber, lastSeqNumberInPlaylist);
197
198 int64_t segmentStartUs = 0ll;
199 for (int32_t index = 0;
200 index < seqNumber - firstSeqNumberInPlaylist; ++index) {
201 sp<AMessage> itemMeta;
202 CHECK(mPlaylist->itemAt(
203 index, NULL /* uri */, &itemMeta));
204
205 int64_t itemDurationUs;
206 CHECK(itemMeta->findInt64("durationUs", &itemDurationUs));
207
208 segmentStartUs += itemDurationUs;
209 }
210
211 return segmentStartUs;
212 }
213
getSegmentDurationUs(int32_t seqNumber) const214 int64_t PlaylistFetcher::getSegmentDurationUs(int32_t seqNumber) const {
215 CHECK(mPlaylist != NULL);
216
217 int32_t firstSeqNumberInPlaylist, lastSeqNumberInPlaylist;
218 mPlaylist->getSeqNumberRange(
219 &firstSeqNumberInPlaylist, &lastSeqNumberInPlaylist);
220
221 CHECK_GE(seqNumber, firstSeqNumberInPlaylist);
222 CHECK_LE(seqNumber, lastSeqNumberInPlaylist);
223
224 int32_t index = seqNumber - firstSeqNumberInPlaylist;
225 sp<AMessage> itemMeta;
226 CHECK(mPlaylist->itemAt(
227 index, NULL /* uri */, &itemMeta));
228
229 int64_t itemDurationUs;
230 CHECK(itemMeta->findInt64("durationUs", &itemDurationUs));
231
232 return itemDurationUs;
233 }
234
delayUsToRefreshPlaylist() const235 int64_t PlaylistFetcher::delayUsToRefreshPlaylist() const {
236 int64_t nowUs = ALooper::GetNowUs();
237
238 if (mPlaylist == NULL || mLastPlaylistFetchTimeUs < 0ll) {
239 CHECK_EQ((int)mRefreshState, (int)INITIAL_MINIMUM_RELOAD_DELAY);
240 return 0ll;
241 }
242
243 if (mPlaylist->isComplete()) {
244 return (~0llu >> 1);
245 }
246
247 int64_t targetDurationUs = mPlaylist->getTargetDuration();
248
249 int64_t minPlaylistAgeUs;
250
251 switch (mRefreshState) {
252 case INITIAL_MINIMUM_RELOAD_DELAY:
253 {
254 size_t n = mPlaylist->size();
255 if (n > 0) {
256 sp<AMessage> itemMeta;
257 CHECK(mPlaylist->itemAt(n - 1, NULL /* uri */, &itemMeta));
258
259 int64_t itemDurationUs;
260 CHECK(itemMeta->findInt64("durationUs", &itemDurationUs));
261
262 minPlaylistAgeUs = itemDurationUs;
263 break;
264 }
265
266 // fall through
267 }
268
269 case FIRST_UNCHANGED_RELOAD_ATTEMPT:
270 {
271 minPlaylistAgeUs = targetDurationUs / 2;
272 break;
273 }
274
275 case SECOND_UNCHANGED_RELOAD_ATTEMPT:
276 {
277 minPlaylistAgeUs = (targetDurationUs * 3) / 2;
278 break;
279 }
280
281 case THIRD_UNCHANGED_RELOAD_ATTEMPT:
282 {
283 minPlaylistAgeUs = targetDurationUs * 3;
284 break;
285 }
286
287 default:
288 TRESPASS();
289 break;
290 }
291
292 int64_t delayUs = mLastPlaylistFetchTimeUs + minPlaylistAgeUs - nowUs;
293 return delayUs > 0ll ? delayUs : 0ll;
294 }
295
decryptBuffer(size_t playlistIndex,const sp<ABuffer> & buffer,bool first)296 status_t PlaylistFetcher::decryptBuffer(
297 size_t playlistIndex, const sp<ABuffer> &buffer,
298 bool first) {
299 sp<AMessage> itemMeta;
300 bool found = false;
301 AString method;
302
303 for (ssize_t i = playlistIndex; i >= 0; --i) {
304 AString uri;
305 CHECK(mPlaylist->itemAt(i, &uri, &itemMeta));
306
307 if (itemMeta->findString("cipher-method", &method)) {
308 found = true;
309 break;
310 }
311 }
312
313 // TODO: Revise this when we add support for KEYFORMAT
314 // If method has changed (e.g., -> NONE); sufficient to check at the segment boundary
315 if (mSampleAesKeyItem != NULL && first && found && method != "SAMPLE-AES") {
316 ALOGI("decryptBuffer: resetting mSampleAesKeyItem(%p) with method %s",
317 mSampleAesKeyItem.get(), method.c_str());
318 mSampleAesKeyItem = NULL;
319 mSampleAesKeyItemChanged = true;
320 }
321
322 if (!found) {
323 method = "NONE";
324 }
325 buffer->meta()->setString("cipher-method", method.c_str());
326
327 if (method == "NONE") {
328 return OK;
329 } else if (method == "SAMPLE-AES") {
330 ALOGV("decryptBuffer: Non-Widevine SAMPLE-AES is supported now.");
331 } else if (!(method == "AES-128")) {
332 ALOGE("Unsupported cipher method '%s'", method.c_str());
333 return ERROR_UNSUPPORTED;
334 }
335
336 AString keyURI;
337 if (!itemMeta->findString("cipher-uri", &keyURI)) {
338 ALOGE("Missing key uri");
339 return ERROR_MALFORMED;
340 }
341
342 ssize_t index = mAESKeyForURI.indexOfKey(keyURI);
343
344 sp<ABuffer> key;
345 if (index >= 0) {
346 key = mAESKeyForURI.valueAt(index);
347 } else {
348 ssize_t err = mHTTPDownloader->fetchFile(keyURI.c_str(), &key);
349
350 if (err == ERROR_NOT_CONNECTED) {
351 return ERROR_NOT_CONNECTED;
352 } else if (err < 0) {
353 ALOGE("failed to fetch cipher key from '%s'.", keyURI.c_str());
354 return ERROR_IO;
355 } else if (key->size() != 16) {
356 ALOGE("key file '%s' wasn't 16 bytes in size.", keyURI.c_str());
357 return ERROR_MALFORMED;
358 }
359
360 mAESKeyForURI.add(keyURI, key);
361 }
362
363 if (first) {
364 // If decrypting the first block in a file, read the iv from the manifest
365 // or derive the iv from the file's sequence number.
366
367 unsigned char AESInitVec[AES_BLOCK_SIZE];
368 AString iv;
369 if (itemMeta->findString("cipher-iv", &iv)) {
370 if ((!iv.startsWith("0x") && !iv.startsWith("0X"))
371 || iv.size() > 16 * 2 + 2) {
372 ALOGE("malformed cipher IV '%s'.", iv.c_str());
373 return ERROR_MALFORMED;
374 }
375
376 while (iv.size() < 16 * 2 + 2) {
377 iv.insert("0", 1, 2);
378 }
379
380 memset(AESInitVec, 0, sizeof(AESInitVec));
381 for (size_t i = 0; i < 16; ++i) {
382 char c1 = tolower(iv.c_str()[2 + 2 * i]);
383 char c2 = tolower(iv.c_str()[3 + 2 * i]);
384 if (!isxdigit(c1) || !isxdigit(c2)) {
385 ALOGE("malformed cipher IV '%s'.", iv.c_str());
386 return ERROR_MALFORMED;
387 }
388 uint8_t nibble1 = isdigit(c1) ? c1 - '0' : c1 - 'a' + 10;
389 uint8_t nibble2 = isdigit(c2) ? c2 - '0' : c2 - 'a' + 10;
390
391 AESInitVec[i] = nibble1 << 4 | nibble2;
392 }
393 } else {
394 memset(AESInitVec, 0, sizeof(AESInitVec));
395 AESInitVec[15] = mSeqNumber & 0xff;
396 AESInitVec[14] = (mSeqNumber >> 8) & 0xff;
397 AESInitVec[13] = (mSeqNumber >> 16) & 0xff;
398 AESInitVec[12] = (mSeqNumber >> 24) & 0xff;
399 }
400
401 bool newKey = memcmp(mKeyData, key->data(), AES_BLOCK_SIZE) != 0;
402 bool newInitVec = memcmp(mAESInitVec, AESInitVec, AES_BLOCK_SIZE) != 0;
403 bool newSampleAesKeyItem = newKey || newInitVec;
404 ALOGV("decryptBuffer: SAMPLE-AES newKeyItem %d/%d (Key %d initVec %d)",
405 mSampleAesKeyItemChanged, newSampleAesKeyItem, newKey, newInitVec);
406
407 if (newSampleAesKeyItem) {
408 memcpy(mKeyData, key->data(), AES_BLOCK_SIZE);
409 memcpy(mAESInitVec, AESInitVec, AES_BLOCK_SIZE);
410
411 if (method == "SAMPLE-AES") {
412 mSampleAesKeyItemChanged = true;
413
414 sp<ABuffer> keyDataBuffer = ABuffer::CreateAsCopy(mKeyData, sizeof(mKeyData));
415 sp<ABuffer> initVecBuffer = ABuffer::CreateAsCopy(mAESInitVec, sizeof(mAESInitVec));
416
417 // always allocating a new one rather than updating the old message
418 // lower layer might still have a reference to the old message
419 mSampleAesKeyItem = new AMessage();
420 mSampleAesKeyItem->setBuffer("keyData", keyDataBuffer);
421 mSampleAesKeyItem->setBuffer("initVec", initVecBuffer);
422
423 ALOGV("decryptBuffer: New SampleAesKeyItem: Key: %s IV: %s",
424 HlsSampleDecryptor::aesBlockToStr(mKeyData).c_str(),
425 HlsSampleDecryptor::aesBlockToStr(mAESInitVec).c_str());
426 } // SAMPLE-AES
427 } // newSampleAesKeyItem
428 } // first
429
430 if (method == "SAMPLE-AES") {
431 ALOGV("decryptBuffer: skipping full-seg decrypt for SAMPLE-AES");
432 return OK;
433 }
434
435
436 AES_KEY aes_key;
437 if (AES_set_decrypt_key(key->data(), 128, &aes_key) != 0) {
438 ALOGE("failed to set AES decryption key.");
439 return UNKNOWN_ERROR;
440 }
441
442 size_t n = buffer->size();
443 if (!n) {
444 return OK;
445 }
446
447 if (n < 16 || n % 16) {
448 ALOGE("not enough or trailing bytes (%zu) in encrypted buffer", n);
449 return ERROR_MALFORMED;
450 }
451
452 AES_cbc_encrypt(
453 buffer->data(), buffer->data(), buffer->size(),
454 &aes_key, mAESInitVec, AES_DECRYPT);
455
456 return OK;
457 }
458
checkDecryptPadding(const sp<ABuffer> & buffer)459 status_t PlaylistFetcher::checkDecryptPadding(const sp<ABuffer> &buffer) {
460 AString method;
461 CHECK(buffer->meta()->findString("cipher-method", &method));
462 if (method == "NONE" || method == "SAMPLE-AES") {
463 return OK;
464 }
465
466 uint8_t padding = 0;
467 if (buffer->size() > 0) {
468 padding = buffer->data()[buffer->size() - 1];
469 }
470
471 if (padding > 16) {
472 return ERROR_MALFORMED;
473 }
474
475 for (size_t i = buffer->size() - padding; i < padding; i++) {
476 if (buffer->data()[i] != padding) {
477 return ERROR_MALFORMED;
478 }
479 }
480
481 buffer->setRange(buffer->offset(), buffer->size() - padding);
482 return OK;
483 }
484
postMonitorQueue(int64_t delayUs,int64_t minDelayUs)485 void PlaylistFetcher::postMonitorQueue(int64_t delayUs, int64_t minDelayUs) {
486 int64_t maxDelayUs = delayUsToRefreshPlaylist();
487 if (maxDelayUs < minDelayUs) {
488 maxDelayUs = minDelayUs;
489 }
490 if (delayUs > maxDelayUs) {
491 FLOGV("Need to refresh playlist in %lld", (long long)maxDelayUs);
492 delayUs = maxDelayUs;
493 }
494 sp<AMessage> msg = new AMessage(kWhatMonitorQueue, this);
495 msg->setInt32("generation", mMonitorQueueGeneration);
496 msg->post(delayUs);
497 }
498
cancelMonitorQueue()499 void PlaylistFetcher::cancelMonitorQueue() {
500 ++mMonitorQueueGeneration;
501 }
502
setStoppingThreshold(float thresholdRatio,bool disconnect)503 void PlaylistFetcher::setStoppingThreshold(float thresholdRatio, bool disconnect) {
504 {
505 AutoMutex _l(mThresholdLock);
506 mThresholdRatio = thresholdRatio;
507 }
508 if (disconnect) {
509 mHTTPDownloader->disconnect();
510 }
511 }
512
resetStoppingThreshold(bool disconnect)513 void PlaylistFetcher::resetStoppingThreshold(bool disconnect) {
514 {
515 AutoMutex _l(mThresholdLock);
516 mThresholdRatio = -1.0f;
517 }
518 if (disconnect) {
519 mHTTPDownloader->disconnect();
520 } else {
521 // allow reconnect
522 mHTTPDownloader->reconnect();
523 }
524 }
525
getStoppingThreshold()526 float PlaylistFetcher::getStoppingThreshold() {
527 AutoMutex _l(mThresholdLock);
528 return mThresholdRatio;
529 }
530
startAsync(const sp<AnotherPacketSource> & audioSource,const sp<AnotherPacketSource> & videoSource,const sp<AnotherPacketSource> & subtitleSource,const sp<AnotherPacketSource> & metadataSource,int64_t startTimeUs,int64_t segmentStartTimeUs,int32_t startDiscontinuitySeq,LiveSession::SeekMode seekMode)531 void PlaylistFetcher::startAsync(
532 const sp<AnotherPacketSource> &audioSource,
533 const sp<AnotherPacketSource> &videoSource,
534 const sp<AnotherPacketSource> &subtitleSource,
535 const sp<AnotherPacketSource> &metadataSource,
536 int64_t startTimeUs,
537 int64_t segmentStartTimeUs,
538 int32_t startDiscontinuitySeq,
539 LiveSession::SeekMode seekMode) {
540 sp<AMessage> msg = new AMessage(kWhatStart, this);
541
542 uint32_t streamTypeMask = 0ul;
543
544 if (audioSource != NULL) {
545 msg->setPointer("audioSource", audioSource.get());
546 streamTypeMask |= LiveSession::STREAMTYPE_AUDIO;
547 }
548
549 if (videoSource != NULL) {
550 msg->setPointer("videoSource", videoSource.get());
551 streamTypeMask |= LiveSession::STREAMTYPE_VIDEO;
552 }
553
554 if (subtitleSource != NULL) {
555 msg->setPointer("subtitleSource", subtitleSource.get());
556 streamTypeMask |= LiveSession::STREAMTYPE_SUBTITLES;
557 }
558
559 if (metadataSource != NULL) {
560 msg->setPointer("metadataSource", metadataSource.get());
561 // metadataSource does not affect streamTypeMask.
562 }
563
564 msg->setInt32("streamTypeMask", streamTypeMask);
565 msg->setInt64("startTimeUs", startTimeUs);
566 msg->setInt64("segmentStartTimeUs", segmentStartTimeUs);
567 msg->setInt32("startDiscontinuitySeq", startDiscontinuitySeq);
568 msg->setInt32("seekMode", seekMode);
569 msg->post();
570 }
571
572 /*
573 * pauseAsync
574 *
575 * threshold: 0.0f - pause after current fetch block (default 47Kbytes)
576 * -1.0f - pause after finishing current segment
577 * 0.0~1.0f - pause if remaining of current segment exceeds threshold
578 */
pauseAsync(float thresholdRatio,bool disconnect)579 void PlaylistFetcher::pauseAsync(
580 float thresholdRatio, bool disconnect) {
581 setStoppingThreshold(thresholdRatio, disconnect);
582
583 (new AMessage(kWhatPause, this))->post();
584 }
585
stopAsync(bool clear)586 void PlaylistFetcher::stopAsync(bool clear) {
587 setStoppingThreshold(0.0f, true /* disconncect */);
588
589 sp<AMessage> msg = new AMessage(kWhatStop, this);
590 msg->setInt32("clear", clear);
591 msg->post();
592 }
593
resumeUntilAsync(const sp<AMessage> & params)594 void PlaylistFetcher::resumeUntilAsync(const sp<AMessage> ¶ms) {
595 FLOGV("resumeUntilAsync: params=%s", params->debugString().c_str());
596
597 AMessage* msg = new AMessage(kWhatResumeUntil, this);
598 msg->setMessage("params", params);
599 msg->post();
600 }
601
fetchPlaylistAsync()602 void PlaylistFetcher::fetchPlaylistAsync() {
603 (new AMessage(kWhatFetchPlaylist, this))->post();
604 }
605
onMessageReceived(const sp<AMessage> & msg)606 void PlaylistFetcher::onMessageReceived(const sp<AMessage> &msg) {
607 switch (msg->what()) {
608 case kWhatStart:
609 {
610 status_t err = onStart(msg);
611
612 sp<AMessage> notify = mNotify->dup();
613 notify->setInt32("what", kWhatStarted);
614 notify->setInt32("err", err);
615 notify->post();
616 break;
617 }
618
619 case kWhatPause:
620 {
621 onPause();
622
623 sp<AMessage> notify = mNotify->dup();
624 notify->setInt32("what", kWhatPaused);
625 notify->setInt32("seekMode",
626 mDownloadState->hasSavedState()
627 ? LiveSession::kSeekModeNextSample
628 : LiveSession::kSeekModeNextSegment);
629 notify->post();
630 break;
631 }
632
633 case kWhatStop:
634 {
635 onStop(msg);
636
637 sp<AMessage> notify = mNotify->dup();
638 notify->setInt32("what", kWhatStopped);
639 notify->post();
640 break;
641 }
642
643 case kWhatFetchPlaylist:
644 {
645 bool unchanged;
646 sp<M3UParser> playlist = mHTTPDownloader->fetchPlaylist(
647 mURI.c_str(), NULL /* curPlaylistHash */, &unchanged);
648
649 sp<AMessage> notify = mNotify->dup();
650 notify->setInt32("what", kWhatPlaylistFetched);
651 notify->setObject("playlist", playlist);
652 notify->post();
653 break;
654 }
655
656 case kWhatMonitorQueue:
657 case kWhatDownloadNext:
658 {
659 int32_t generation;
660 CHECK(msg->findInt32("generation", &generation));
661
662 if (generation != mMonitorQueueGeneration) {
663 // Stale event
664 break;
665 }
666
667 if (msg->what() == kWhatMonitorQueue) {
668 onMonitorQueue();
669 } else {
670 onDownloadNext();
671 }
672 break;
673 }
674
675 case kWhatResumeUntil:
676 {
677 onResumeUntil(msg);
678 break;
679 }
680
681 default:
682 TRESPASS();
683 }
684 }
685
onStart(const sp<AMessage> & msg)686 status_t PlaylistFetcher::onStart(const sp<AMessage> &msg) {
687 mPacketSources.clear();
688 mStopParams.clear();
689 mStartTimeUsNotify = mNotify->dup();
690 mStartTimeUsNotify->setInt32("what", kWhatStartedAt);
691 mStartTimeUsNotify->setString("uri", mURI);
692
693 uint32_t streamTypeMask;
694 CHECK(msg->findInt32("streamTypeMask", (int32_t *)&streamTypeMask));
695
696 int64_t startTimeUs;
697 int64_t segmentStartTimeUs;
698 int32_t startDiscontinuitySeq;
699 int32_t seekMode;
700 CHECK(msg->findInt64("startTimeUs", &startTimeUs));
701 CHECK(msg->findInt64("segmentStartTimeUs", &segmentStartTimeUs));
702 CHECK(msg->findInt32("startDiscontinuitySeq", &startDiscontinuitySeq));
703 CHECK(msg->findInt32("seekMode", &seekMode));
704
705 if (streamTypeMask & LiveSession::STREAMTYPE_AUDIO) {
706 void *ptr;
707 CHECK(msg->findPointer("audioSource", &ptr));
708
709 mPacketSources.add(
710 LiveSession::STREAMTYPE_AUDIO,
711 static_cast<AnotherPacketSource *>(ptr));
712 }
713
714 if (streamTypeMask & LiveSession::STREAMTYPE_VIDEO) {
715 void *ptr;
716 CHECK(msg->findPointer("videoSource", &ptr));
717
718 mPacketSources.add(
719 LiveSession::STREAMTYPE_VIDEO,
720 static_cast<AnotherPacketSource *>(ptr));
721 }
722
723 if (streamTypeMask & LiveSession::STREAMTYPE_SUBTITLES) {
724 void *ptr;
725 CHECK(msg->findPointer("subtitleSource", &ptr));
726
727 mPacketSources.add(
728 LiveSession::STREAMTYPE_SUBTITLES,
729 static_cast<AnotherPacketSource *>(ptr));
730 }
731
732 void *ptr;
733 // metadataSource is not part of streamTypeMask
734 if ((streamTypeMask & (LiveSession::STREAMTYPE_AUDIO | LiveSession::STREAMTYPE_VIDEO))
735 && msg->findPointer("metadataSource", &ptr)) {
736 mPacketSources.add(
737 LiveSession::STREAMTYPE_METADATA,
738 static_cast<AnotherPacketSource *>(ptr));
739 }
740
741 mStreamTypeMask = streamTypeMask;
742
743 mSegmentStartTimeUs = segmentStartTimeUs;
744
745 if (startDiscontinuitySeq >= 0) {
746 mDiscontinuitySeq = startDiscontinuitySeq;
747 }
748
749 mRefreshState = INITIAL_MINIMUM_RELOAD_DELAY;
750 mSeekMode = (LiveSession::SeekMode) seekMode;
751
752 if (startTimeUs >= 0 || mSeekMode == LiveSession::kSeekModeNextSample) {
753 mStartup = true;
754 mIDRFound = false;
755 mVideoBuffer->clear();
756 }
757
758 if (startTimeUs >= 0) {
759 mStartTimeUs = startTimeUs;
760 mFirstPTSValid = false;
761 mSeqNumber = -1;
762 mTimeChangeSignaled = false;
763 mDownloadState->resetState();
764 }
765
766 postMonitorQueue();
767
768 return OK;
769 }
770
onPause()771 void PlaylistFetcher::onPause() {
772 cancelMonitorQueue();
773 mLastDiscontinuitySeq = mDiscontinuitySeq;
774
775 resetStoppingThreshold(false /* disconnect */);
776 }
777
onStop(const sp<AMessage> & msg)778 void PlaylistFetcher::onStop(const sp<AMessage> &msg) {
779 cancelMonitorQueue();
780
781 int32_t clear;
782 CHECK(msg->findInt32("clear", &clear));
783 if (clear) {
784 for (size_t i = 0; i < mPacketSources.size(); i++) {
785 sp<AnotherPacketSource> packetSource = mPacketSources.valueAt(i);
786 packetSource->clear();
787 }
788 }
789
790 mDownloadState->resetState();
791 mPacketSources.clear();
792 mStreamTypeMask = 0;
793
794 resetStoppingThreshold(true /* disconnect */);
795 }
796
797 // Resume until we have reached the boundary timestamps listed in `msg`; when
798 // the remaining time is too short (within a resume threshold) stop immediately
799 // instead.
onResumeUntil(const sp<AMessage> & msg)800 status_t PlaylistFetcher::onResumeUntil(const sp<AMessage> &msg) {
801 sp<AMessage> params;
802 CHECK(msg->findMessage("params", ¶ms));
803
804 mStopParams = params;
805 onDownloadNext();
806
807 return OK;
808 }
809
notifyStopReached()810 void PlaylistFetcher::notifyStopReached() {
811 sp<AMessage> notify = mNotify->dup();
812 notify->setInt32("what", kWhatStopReached);
813 notify->post();
814 }
815
notifyError(status_t err)816 void PlaylistFetcher::notifyError(status_t err) {
817 sp<AMessage> notify = mNotify->dup();
818 notify->setInt32("what", kWhatError);
819 notify->setInt32("err", err);
820 notify->post();
821 }
822
queueDiscontinuity(ATSParser::DiscontinuityType type,const sp<AMessage> & extra)823 void PlaylistFetcher::queueDiscontinuity(
824 ATSParser::DiscontinuityType type, const sp<AMessage> &extra) {
825 for (size_t i = 0; i < mPacketSources.size(); ++i) {
826 // do not discard buffer upon #EXT-X-DISCONTINUITY tag
827 // (seek will discard buffer by abandoning old fetchers)
828 mPacketSources.valueAt(i)->queueDiscontinuity(
829 type, extra, false /* discard */);
830 }
831 }
832
onMonitorQueue()833 void PlaylistFetcher::onMonitorQueue() {
834 // in the middle of an unfinished download, delay
835 // playlist refresh as it'll change seq numbers
836 if (!mDownloadState->hasSavedState()) {
837 refreshPlaylist();
838 }
839
840 int64_t targetDurationUs = kMinBufferedDurationUs;
841 if (mPlaylist != NULL) {
842 targetDurationUs = mPlaylist->getTargetDuration();
843 }
844
845 int64_t bufferedDurationUs = 0ll;
846 status_t finalResult = OK;
847 if (mStreamTypeMask == LiveSession::STREAMTYPE_SUBTITLES) {
848 sp<AnotherPacketSource> packetSource =
849 mPacketSources.valueFor(LiveSession::STREAMTYPE_SUBTITLES);
850
851 bufferedDurationUs =
852 packetSource->getBufferedDurationUs(&finalResult);
853 } else {
854 // Use min stream duration, but ignore streams that never have any packet
855 // enqueued to prevent us from waiting on a non-existent stream;
856 // when we cannot make out from the manifest what streams are included in
857 // a playlist we might assume extra streams.
858 bufferedDurationUs = -1ll;
859 for (size_t i = 0; i < mPacketSources.size(); ++i) {
860 if ((mStreamTypeMask & mPacketSources.keyAt(i)) == 0
861 || mPacketSources[i]->getLatestEnqueuedMeta() == NULL) {
862 continue;
863 }
864
865 int64_t bufferedStreamDurationUs =
866 mPacketSources.valueAt(i)->getBufferedDurationUs(&finalResult);
867
868 FSLOGV(mPacketSources.keyAt(i), "buffered %lld", (long long)bufferedStreamDurationUs);
869
870 if (bufferedDurationUs == -1ll
871 || bufferedStreamDurationUs < bufferedDurationUs) {
872 bufferedDurationUs = bufferedStreamDurationUs;
873 }
874 }
875 if (bufferedDurationUs == -1ll) {
876 bufferedDurationUs = 0ll;
877 }
878 }
879
880 if (finalResult == OK && bufferedDurationUs < kMinBufferedDurationUs) {
881 FLOGV("monitoring, buffered=%lld < %lld",
882 (long long)bufferedDurationUs, (long long)kMinBufferedDurationUs);
883
884 // delay the next download slightly; hopefully this gives other concurrent fetchers
885 // a better chance to run.
886 // onDownloadNext();
887 sp<AMessage> msg = new AMessage(kWhatDownloadNext, this);
888 msg->setInt32("generation", mMonitorQueueGeneration);
889 msg->post(1000l);
890 } else {
891 // We'd like to maintain buffering above durationToBufferUs, so try
892 // again when buffer just about to go below durationToBufferUs
893 // (or after targetDurationUs / 2, whichever is smaller).
894 int64_t delayUs = bufferedDurationUs - kMinBufferedDurationUs + 1000000ll;
895 if (delayUs > targetDurationUs / 2) {
896 delayUs = targetDurationUs / 2;
897 }
898
899 FLOGV("pausing for %lld, buffered=%lld > %lld",
900 (long long)delayUs,
901 (long long)bufferedDurationUs,
902 (long long)kMinBufferedDurationUs);
903
904 postMonitorQueue(delayUs);
905 }
906 }
907
refreshPlaylist()908 status_t PlaylistFetcher::refreshPlaylist() {
909 if (delayUsToRefreshPlaylist() <= 0) {
910 bool unchanged;
911 sp<M3UParser> playlist = mHTTPDownloader->fetchPlaylist(
912 mURI.c_str(), mPlaylistHash, &unchanged);
913
914 if (playlist == NULL) {
915 if (unchanged) {
916 // We succeeded in fetching the playlist, but it was
917 // unchanged from the last time we tried.
918
919 if (mRefreshState != THIRD_UNCHANGED_RELOAD_ATTEMPT) {
920 mRefreshState = (RefreshState)(mRefreshState + 1);
921 }
922 } else {
923 ALOGE("failed to load playlist at url '%s'", uriDebugString(mURI).c_str());
924 return ERROR_IO;
925 }
926 } else {
927 mRefreshState = INITIAL_MINIMUM_RELOAD_DELAY;
928 mPlaylist = playlist;
929
930 if (mPlaylist->isComplete() || mPlaylist->isEvent()) {
931 updateDuration();
932 }
933 // Notify LiveSession to use target-duration based buffering level
934 // for up/down switch. Default LiveSession::kUpSwitchMark may not
935 // be reachable for live streams, as our max buffering amount is
936 // limited to 3 segments.
937 if (!mPlaylist->isComplete()) {
938 updateTargetDuration();
939 }
940 mPlaylistTimeUs = ALooper::GetNowUs();
941 }
942
943 mLastPlaylistFetchTimeUs = ALooper::GetNowUs();
944 }
945 return OK;
946 }
947
948 // static
bufferStartsWithTsSyncByte(const sp<ABuffer> & buffer)949 bool PlaylistFetcher::bufferStartsWithTsSyncByte(const sp<ABuffer>& buffer) {
950 return buffer->size() > 0 && buffer->data()[0] == 0x47;
951 }
952
shouldPauseDownload()953 bool PlaylistFetcher::shouldPauseDownload() {
954 if (mStreamTypeMask == LiveSession::STREAMTYPE_SUBTITLES) {
955 // doesn't apply to subtitles
956 return false;
957 }
958
959 // Calculate threshold to abort current download
960 float thresholdRatio = getStoppingThreshold();
961
962 if (thresholdRatio < 0.0f) {
963 // never abort
964 return false;
965 } else if (thresholdRatio == 0.0f) {
966 // immediately abort
967 return true;
968 }
969
970 // now we have a positive thresholdUs, abort if remaining
971 // portion to download is over that threshold.
972 if (mSegmentFirstPTS < 0) {
973 // this means we haven't even find the first access unit,
974 // abort now as we must be very far away from the end.
975 return true;
976 }
977 int64_t lastEnqueueUs = mSegmentFirstPTS;
978 for (size_t i = 0; i < mPacketSources.size(); ++i) {
979 if ((mStreamTypeMask & mPacketSources.keyAt(i)) == 0) {
980 continue;
981 }
982 sp<AMessage> meta = mPacketSources[i]->getLatestEnqueuedMeta();
983 int32_t type;
984 if (meta == NULL || meta->findInt32("discontinuity", &type)) {
985 continue;
986 }
987 int64_t tmpUs;
988 CHECK(meta->findInt64("timeUs", &tmpUs));
989 if (tmpUs > lastEnqueueUs) {
990 lastEnqueueUs = tmpUs;
991 }
992 }
993 lastEnqueueUs -= mSegmentFirstPTS;
994
995 int64_t targetDurationUs = mPlaylist->getTargetDuration();
996 int64_t thresholdUs = thresholdRatio * targetDurationUs;
997
998 FLOGV("%spausing now, thresholdUs %lld, remaining %lld",
999 targetDurationUs - lastEnqueueUs > thresholdUs ? "" : "not ",
1000 (long long)thresholdUs,
1001 (long long)(targetDurationUs - lastEnqueueUs));
1002
1003 if (targetDurationUs - lastEnqueueUs > thresholdUs) {
1004 return true;
1005 }
1006 return false;
1007 }
1008
initSeqNumberForLiveStream(int32_t & firstSeqNumberInPlaylist,int32_t & lastSeqNumberInPlaylist)1009 void PlaylistFetcher::initSeqNumberForLiveStream(
1010 int32_t &firstSeqNumberInPlaylist,
1011 int32_t &lastSeqNumberInPlaylist) {
1012 // start at least 3 target durations from the end.
1013 int64_t timeFromEnd = 0;
1014 size_t index = mPlaylist->size();
1015 sp<AMessage> itemMeta;
1016 int64_t itemDurationUs;
1017 int32_t targetDuration;
1018 if (mPlaylist->meta()->findInt32("target-duration", &targetDuration)) {
1019 do {
1020 --index;
1021 if (!mPlaylist->itemAt(index, NULL /* uri */, &itemMeta)
1022 || !itemMeta->findInt64("durationUs", &itemDurationUs)) {
1023 ALOGW("item or itemDurationUs missing");
1024 mSeqNumber = lastSeqNumberInPlaylist - 3;
1025 break;
1026 }
1027
1028 timeFromEnd += itemDurationUs;
1029 mSeqNumber = firstSeqNumberInPlaylist + index;
1030 } while (timeFromEnd < targetDuration * 3E6 && index > 0);
1031 } else {
1032 ALOGW("target-duration missing");
1033 mSeqNumber = lastSeqNumberInPlaylist - 3;
1034 }
1035
1036 if (mSeqNumber < firstSeqNumberInPlaylist) {
1037 mSeqNumber = firstSeqNumberInPlaylist;
1038 }
1039 }
1040
initDownloadState(AString & uri,sp<AMessage> & itemMeta,int32_t & firstSeqNumberInPlaylist,int32_t & lastSeqNumberInPlaylist)1041 bool PlaylistFetcher::initDownloadState(
1042 AString &uri,
1043 sp<AMessage> &itemMeta,
1044 int32_t &firstSeqNumberInPlaylist,
1045 int32_t &lastSeqNumberInPlaylist) {
1046 status_t err = refreshPlaylist();
1047 firstSeqNumberInPlaylist = 0;
1048 lastSeqNumberInPlaylist = 0;
1049 bool discontinuity = false;
1050
1051 if (mPlaylist != NULL) {
1052 mPlaylist->getSeqNumberRange(
1053 &firstSeqNumberInPlaylist, &lastSeqNumberInPlaylist);
1054
1055 if (mDiscontinuitySeq < 0) {
1056 mDiscontinuitySeq = mPlaylist->getDiscontinuitySeq();
1057 }
1058 }
1059
1060 mSegmentFirstPTS = -1ll;
1061
1062 if (mPlaylist != NULL && mSeqNumber < 0) {
1063 CHECK_GE(mStartTimeUs, 0ll);
1064
1065 if (mSegmentStartTimeUs < 0) {
1066 if (!mPlaylist->isComplete() && !mPlaylist->isEvent()) {
1067 // this is a live session
1068 initSeqNumberForLiveStream(firstSeqNumberInPlaylist, lastSeqNumberInPlaylist);
1069 } else {
1070 // When seeking mSegmentStartTimeUs is unavailable (< 0), we
1071 // use mStartTimeUs (client supplied timestamp) to determine both start segment
1072 // and relative position inside a segment
1073 mSeqNumber = getSeqNumberForTime(mStartTimeUs);
1074 mStartTimeUs -= getSegmentStartTimeUs(mSeqNumber);
1075 }
1076 mStartTimeUsRelative = true;
1077 FLOGV("Initial sequence number for time %lld is %d from (%d .. %d)",
1078 (long long)mStartTimeUs, mSeqNumber, firstSeqNumberInPlaylist,
1079 lastSeqNumberInPlaylist);
1080 } else {
1081 // When adapting or track switching, mSegmentStartTimeUs (relative
1082 // to media time 0) is used to determine the start segment; mStartTimeUs (absolute
1083 // timestamps coming from the media container) is used to determine the position
1084 // inside a segments.
1085 if (mStreamTypeMask != LiveSession::STREAMTYPE_SUBTITLES
1086 && mSeekMode != LiveSession::kSeekModeNextSample) {
1087 // avoid double fetch/decode
1088 // Use (mSegmentStartTimeUs + 1/2 * targetDurationUs) to search
1089 // for the starting segment in new variant.
1090 // If the two variants' segments are aligned, this gives the
1091 // next segment. If they're not aligned, this gives the segment
1092 // that overlaps no more than 1/2 * targetDurationUs.
1093 mSeqNumber = getSeqNumberForTime(mSegmentStartTimeUs
1094 + mPlaylist->getTargetDuration() / 2);
1095 } else {
1096 mSeqNumber = getSeqNumberForTime(mSegmentStartTimeUs);
1097 }
1098 ssize_t minSeq = getSeqNumberForDiscontinuity(mDiscontinuitySeq);
1099 if (mSeqNumber < minSeq) {
1100 mSeqNumber = minSeq;
1101 }
1102
1103 if (mSeqNumber < firstSeqNumberInPlaylist) {
1104 mSeqNumber = firstSeqNumberInPlaylist;
1105 }
1106
1107 if (mSeqNumber > lastSeqNumberInPlaylist) {
1108 mSeqNumber = lastSeqNumberInPlaylist;
1109 }
1110 FLOGV("Initial sequence number is %d from (%d .. %d)",
1111 mSeqNumber, firstSeqNumberInPlaylist,
1112 lastSeqNumberInPlaylist);
1113 }
1114 }
1115
1116 // if mPlaylist is NULL then err must be non-OK; but the other way around might not be true
1117 if (mSeqNumber < firstSeqNumberInPlaylist
1118 || mSeqNumber > lastSeqNumberInPlaylist
1119 || err != OK) {
1120 if ((err != OK || !mPlaylist->isComplete()) && mNumRetries < kMaxNumRetries) {
1121 ++mNumRetries;
1122
1123 if (mSeqNumber > lastSeqNumberInPlaylist || err != OK) {
1124 // make sure we reach this retry logic on refresh failures
1125 // by adding an err != OK clause to all enclosing if's.
1126
1127 // refresh in increasing fraction (1/2, 1/3, ...) of the
1128 // playlist's target duration or 3 seconds, whichever is less
1129 int64_t delayUs = kMaxMonitorDelayUs;
1130 if (mPlaylist != NULL) {
1131 delayUs = mPlaylist->size() * mPlaylist->getTargetDuration()
1132 / (1 + mNumRetries);
1133 }
1134 if (delayUs > kMaxMonitorDelayUs) {
1135 delayUs = kMaxMonitorDelayUs;
1136 }
1137 FLOGV("sequence number high: %d from (%d .. %d), "
1138 "monitor in %lld (retry=%d)",
1139 mSeqNumber, firstSeqNumberInPlaylist,
1140 lastSeqNumberInPlaylist, (long long)delayUs, mNumRetries);
1141 postMonitorQueue(delayUs);
1142 return false;
1143 }
1144
1145 if (err != OK) {
1146 notifyError(err);
1147 return false;
1148 }
1149
1150 // we've missed the boat, let's start 3 segments prior to the latest sequence
1151 // number available and signal a discontinuity.
1152
1153 ALOGI("We've missed the boat, restarting playback."
1154 " mStartup=%d, was looking for %d in %d-%d",
1155 mStartup, mSeqNumber, firstSeqNumberInPlaylist,
1156 lastSeqNumberInPlaylist);
1157 if (mStopParams != NULL) {
1158 // we should have kept on fetching until we hit the boundaries in mStopParams,
1159 // but since the segments we are supposed to fetch have already rolled off
1160 // the playlist, i.e. we have already missed the boat, we inevitably have to
1161 // skip.
1162 notifyStopReached();
1163 return false;
1164 }
1165 mSeqNumber = lastSeqNumberInPlaylist - 3;
1166 if (mSeqNumber < firstSeqNumberInPlaylist) {
1167 mSeqNumber = firstSeqNumberInPlaylist;
1168 }
1169 discontinuity = true;
1170
1171 // fall through
1172 } else {
1173 if (mPlaylist != NULL) {
1174 if (mSeqNumber >= firstSeqNumberInPlaylist + (int32_t)mPlaylist->size()
1175 && !mPlaylist->isComplete()) {
1176 // Live playlists
1177 ALOGW("sequence number %d not yet available", mSeqNumber);
1178 postMonitorQueue(delayUsToRefreshPlaylist());
1179 return false;
1180 }
1181 ALOGE("Cannot find sequence number %d in playlist "
1182 "(contains %d - %d)",
1183 mSeqNumber, firstSeqNumberInPlaylist,
1184 firstSeqNumberInPlaylist + (int32_t)mPlaylist->size() - 1);
1185
1186 if (mTSParser != NULL) {
1187 mTSParser->signalEOS(ERROR_END_OF_STREAM);
1188 // Use an empty buffer; we don't have any new data, just want to extract
1189 // potential new access units after flush. Reset mSeqNumber to
1190 // lastSeqNumberInPlaylist such that we set the correct access unit
1191 // properties in extractAndQueueAccessUnitsFromTs.
1192 sp<ABuffer> buffer = new ABuffer(0);
1193 mSeqNumber = lastSeqNumberInPlaylist;
1194 extractAndQueueAccessUnitsFromTs(buffer);
1195 }
1196 notifyError(ERROR_END_OF_STREAM);
1197 } else {
1198 // It's possible that we were never able to download the playlist.
1199 // In this case we should notify error, instead of EOS, as EOS during
1200 // prepare means we succeeded in downloading everything.
1201 ALOGE("Failed to download playlist!");
1202 notifyError(ERROR_IO);
1203 }
1204
1205 return false;
1206 }
1207 }
1208
1209 mNumRetries = 0;
1210
1211 CHECK(mPlaylist->itemAt(
1212 mSeqNumber - firstSeqNumberInPlaylist,
1213 &uri,
1214 &itemMeta));
1215
1216 CHECK(itemMeta->findInt32("discontinuity-sequence", &mDiscontinuitySeq));
1217
1218 int32_t val;
1219 if (itemMeta->findInt32("discontinuity", &val) && val != 0) {
1220 discontinuity = true;
1221 } else if (mLastDiscontinuitySeq >= 0
1222 && mDiscontinuitySeq != mLastDiscontinuitySeq) {
1223 // Seek jumped to a new discontinuity sequence. We need to signal
1224 // a format change to decoder. Decoder needs to shutdown and be
1225 // created again if seamless format change is unsupported.
1226 FLOGV("saw discontinuity: mStartup %d, mLastDiscontinuitySeq %d, "
1227 "mDiscontinuitySeq %d, mStartTimeUs %lld",
1228 mStartup, mLastDiscontinuitySeq, mDiscontinuitySeq, (long long)mStartTimeUs);
1229 discontinuity = true;
1230 }
1231 mLastDiscontinuitySeq = -1;
1232
1233 // decrypt a junk buffer to prefetch key; since a session uses only one http connection,
1234 // this avoids interleaved connections to the key and segment file.
1235 {
1236 sp<ABuffer> junk = new ABuffer(16);
1237 junk->setRange(0, 16);
1238 status_t err = decryptBuffer(mSeqNumber - firstSeqNumberInPlaylist, junk,
1239 true /* first */);
1240 if (err == ERROR_NOT_CONNECTED) {
1241 return false;
1242 } else if (err != OK) {
1243 notifyError(err);
1244 return false;
1245 }
1246 }
1247
1248 if ((mStartup && !mTimeChangeSignaled) || discontinuity) {
1249 // We need to signal a time discontinuity to ATSParser on the
1250 // first segment after start, or on a discontinuity segment.
1251 // Setting mNextPTSTimeUs informs extractAndQueueAccessUnitsXX()
1252 // to send the time discontinuity.
1253 if (mPlaylist->isComplete() || mPlaylist->isEvent()) {
1254 // If this was a live event this made no sense since
1255 // we don't have access to all the segment before the current
1256 // one.
1257 mNextPTSTimeUs = getSegmentStartTimeUs(mSeqNumber);
1258 }
1259
1260 // Setting mTimeChangeSignaled to true, so that if start time
1261 // searching goes into 2nd segment (without a discontinuity),
1262 // we don't reset time again. It causes corruption when pending
1263 // data in ATSParser is cleared.
1264 mTimeChangeSignaled = true;
1265 }
1266
1267 if (discontinuity) {
1268 ALOGI("queueing discontinuity (explicit=%d)", discontinuity);
1269
1270 // Signal a format discontinuity to ATSParser to clear partial data
1271 // from previous streams. Not doing this causes bitstream corruption.
1272 if (mTSParser != NULL) {
1273 mTSParser.clear();
1274 }
1275
1276 queueDiscontinuity(
1277 ATSParser::DISCONTINUITY_FORMAT_ONLY,
1278 NULL /* extra */);
1279
1280 if (mStartup && mStartTimeUsRelative && mFirstPTSValid) {
1281 // This means we guessed mStartTimeUs to be in the previous
1282 // segment (likely very close to the end), but either video or
1283 // audio has not found start by the end of that segment.
1284 //
1285 // If this new segment is not a discontinuity, keep searching.
1286 //
1287 // If this new segment even got a discontinuity marker, just
1288 // set mStartTimeUs=0, and take all samples from now on.
1289 mStartTimeUs = 0;
1290 mFirstPTSValid = false;
1291 mIDRFound = false;
1292 mVideoBuffer->clear();
1293 }
1294 }
1295
1296 FLOGV("fetching segment %d from (%d .. %d)",
1297 mSeqNumber, firstSeqNumberInPlaylist, lastSeqNumberInPlaylist);
1298 return true;
1299 }
1300
onDownloadNext()1301 void PlaylistFetcher::onDownloadNext() {
1302 AString uri;
1303 sp<AMessage> itemMeta;
1304 sp<ABuffer> buffer;
1305 sp<ABuffer> tsBuffer;
1306 int32_t firstSeqNumberInPlaylist = 0;
1307 int32_t lastSeqNumberInPlaylist = 0;
1308 bool connectHTTP = true;
1309
1310 if (mDownloadState->hasSavedState()) {
1311 mDownloadState->restoreState(
1312 uri,
1313 itemMeta,
1314 buffer,
1315 tsBuffer,
1316 firstSeqNumberInPlaylist,
1317 lastSeqNumberInPlaylist);
1318 connectHTTP = false;
1319 FLOGV("resuming: '%s'", uri.c_str());
1320 } else {
1321 if (!initDownloadState(
1322 uri,
1323 itemMeta,
1324 firstSeqNumberInPlaylist,
1325 lastSeqNumberInPlaylist)) {
1326 return;
1327 }
1328 FLOGV("fetching: '%s'", uri.c_str());
1329 }
1330
1331 int64_t range_offset, range_length;
1332 if (!itemMeta->findInt64("range-offset", &range_offset)
1333 || !itemMeta->findInt64("range-length", &range_length)) {
1334 range_offset = 0;
1335 range_length = -1;
1336 }
1337
1338 // block-wise download
1339 bool shouldPause = false;
1340 ssize_t bytesRead;
1341 do {
1342 int64_t startUs = ALooper::GetNowUs();
1343 bytesRead = mHTTPDownloader->fetchBlock(
1344 uri.c_str(), &buffer, range_offset, range_length, kDownloadBlockSize,
1345 NULL /* actualURL */, connectHTTP);
1346 int64_t delayUs = ALooper::GetNowUs() - startUs;
1347
1348 if (bytesRead == ERROR_NOT_CONNECTED) {
1349 return;
1350 }
1351 if (bytesRead < 0) {
1352 status_t err = bytesRead;
1353 ALOGE("failed to fetch .ts segment at url '%s'", uri.c_str());
1354 notifyError(err);
1355 return;
1356 }
1357
1358 // add sample for bandwidth estimation, excluding samples from subtitles (as
1359 // its too small), or during startup/resumeUntil (when we could have more than
1360 // one connection open which affects bandwidth)
1361 if (!mStartup && mStopParams == NULL && bytesRead > 0
1362 && (mStreamTypeMask
1363 & (LiveSession::STREAMTYPE_AUDIO
1364 | LiveSession::STREAMTYPE_VIDEO))) {
1365 mSession->addBandwidthMeasurement(bytesRead, delayUs);
1366 if (delayUs > 2000000ll) {
1367 FLOGV("bytesRead %zd took %.2f seconds - abnormal bandwidth dip",
1368 bytesRead, (double)delayUs / 1.0e6);
1369 }
1370 }
1371
1372 connectHTTP = false;
1373
1374 CHECK(buffer != NULL);
1375
1376 size_t size = buffer->size();
1377 // Set decryption range.
1378 buffer->setRange(size - bytesRead, bytesRead);
1379 status_t err = decryptBuffer(mSeqNumber - firstSeqNumberInPlaylist, buffer,
1380 buffer->offset() == 0 /* first */);
1381 // Unset decryption range.
1382 buffer->setRange(0, size);
1383
1384 if (err != OK) {
1385 ALOGE("decryptBuffer failed w/ error %d", err);
1386
1387 notifyError(err);
1388 return;
1389 }
1390
1391 bool startUp = mStartup; // save current start up state
1392
1393 err = OK;
1394 if (bufferStartsWithTsSyncByte(buffer)) {
1395 // Incremental extraction is only supported for MPEG2 transport streams.
1396 if (tsBuffer == NULL) {
1397 tsBuffer = new ABuffer(buffer->data(), buffer->capacity());
1398 tsBuffer->setRange(0, 0);
1399 } else if (tsBuffer->capacity() != buffer->capacity()) {
1400 size_t tsOff = tsBuffer->offset(), tsSize = tsBuffer->size();
1401 tsBuffer = new ABuffer(buffer->data(), buffer->capacity());
1402 tsBuffer->setRange(tsOff, tsSize);
1403 }
1404 tsBuffer->setRange(tsBuffer->offset(), tsBuffer->size() + bytesRead);
1405 err = extractAndQueueAccessUnitsFromTs(tsBuffer);
1406 }
1407
1408 if (err == -EAGAIN) {
1409 // starting sequence number too low/high
1410 mTSParser.clear();
1411 for (size_t i = 0; i < mPacketSources.size(); i++) {
1412 sp<AnotherPacketSource> packetSource = mPacketSources.valueAt(i);
1413 packetSource->clear();
1414 }
1415 postMonitorQueue();
1416 return;
1417 } else if (err == ERROR_OUT_OF_RANGE) {
1418 // reached stopping point
1419 notifyStopReached();
1420 return;
1421 } else if (err != OK) {
1422 notifyError(err);
1423 return;
1424 }
1425 // If we're switching, post start notification
1426 // this should only be posted when the last chunk is full processed by TSParser
1427 if (mSeekMode != LiveSession::kSeekModeExactPosition && startUp != mStartup) {
1428 CHECK(mStartTimeUsNotify != NULL);
1429 mStartTimeUsNotify->post();
1430 mStartTimeUsNotify.clear();
1431 shouldPause = true;
1432 }
1433 if (shouldPause || shouldPauseDownload()) {
1434 // save state and return if this is not the last chunk,
1435 // leaving the fetcher in paused state.
1436 if (bytesRead != 0) {
1437 mDownloadState->saveState(
1438 uri,
1439 itemMeta,
1440 buffer,
1441 tsBuffer,
1442 firstSeqNumberInPlaylist,
1443 lastSeqNumberInPlaylist);
1444 return;
1445 }
1446 shouldPause = true;
1447 }
1448 } while (bytesRead != 0);
1449
1450 if (bufferStartsWithTsSyncByte(buffer)) {
1451 // If we don't see a stream in the program table after fetching a full ts segment
1452 // mark it as nonexistent.
1453 ATSParser::SourceType srcTypes[] =
1454 { ATSParser::VIDEO, ATSParser::AUDIO };
1455 LiveSession::StreamType streamTypes[] =
1456 { LiveSession::STREAMTYPE_VIDEO, LiveSession::STREAMTYPE_AUDIO };
1457 const size_t kNumTypes = NELEM(srcTypes);
1458
1459 for (size_t i = 0; i < kNumTypes; i++) {
1460 ATSParser::SourceType srcType = srcTypes[i];
1461 LiveSession::StreamType streamType = streamTypes[i];
1462
1463 sp<AnotherPacketSource> source =
1464 static_cast<AnotherPacketSource *>(
1465 mTSParser->getSource(srcType).get());
1466
1467 if (!mTSParser->hasSource(srcType)) {
1468 ALOGW("MPEG2 Transport stream does not contain %s data.",
1469 srcType == ATSParser::VIDEO ? "video" : "audio");
1470
1471 mStreamTypeMask &= ~streamType;
1472 mPacketSources.removeItem(streamType);
1473 }
1474 }
1475
1476 }
1477
1478 if (checkDecryptPadding(buffer) != OK) {
1479 ALOGE("Incorrect padding bytes after decryption.");
1480 notifyError(ERROR_MALFORMED);
1481 return;
1482 }
1483
1484 if (tsBuffer != NULL) {
1485 AString method;
1486 CHECK(buffer->meta()->findString("cipher-method", &method));
1487 if ((tsBuffer->size() > 0 && method == "NONE")
1488 || tsBuffer->size() > 16) {
1489 ALOGE("MPEG2 transport stream is not an even multiple of 188 "
1490 "bytes in length.");
1491 notifyError(ERROR_MALFORMED);
1492 return;
1493 }
1494 }
1495
1496 // bulk extract non-ts files
1497 bool startUp = mStartup;
1498 if (tsBuffer == NULL) {
1499 status_t err = extractAndQueueAccessUnits(buffer, itemMeta);
1500 if (err == -EAGAIN) {
1501 // starting sequence number too low/high
1502 postMonitorQueue();
1503 return;
1504 } else if (err == ERROR_OUT_OF_RANGE) {
1505 // reached stopping point
1506 notifyStopReached();
1507 return;
1508 } else if (err != OK) {
1509 notifyError(err);
1510 return;
1511 }
1512 }
1513
1514 ++mSeqNumber;
1515
1516 // if adapting, pause after found the next starting point
1517 if (mSeekMode != LiveSession::kSeekModeExactPosition && startUp != mStartup) {
1518 CHECK(mStartTimeUsNotify != NULL);
1519 mStartTimeUsNotify->post();
1520 mStartTimeUsNotify.clear();
1521 shouldPause = true;
1522 }
1523
1524 if (!shouldPause) {
1525 postMonitorQueue();
1526 }
1527 }
1528
1529 /*
1530 * returns true if we need to adjust mSeqNumber
1531 */
adjustSeqNumberWithAnchorTime(int64_t anchorTimeUs)1532 bool PlaylistFetcher::adjustSeqNumberWithAnchorTime(int64_t anchorTimeUs) {
1533 int32_t firstSeqNumberInPlaylist = mPlaylist->getFirstSeqNumber();
1534
1535 int64_t minDiffUs, maxDiffUs;
1536 if (mSeekMode == LiveSession::kSeekModeNextSample) {
1537 // if the previous fetcher paused in the middle of a segment, we
1538 // want to start at a segment that overlaps the last sample
1539 minDiffUs = -mPlaylist->getTargetDuration();
1540 maxDiffUs = 0ll;
1541 } else {
1542 // if the previous fetcher paused at the end of a segment, ideally
1543 // we want to start at the segment that's roughly aligned with its
1544 // next segment, but if the two variants are not well aligned we
1545 // adjust the diff to within (-T/2, T/2)
1546 minDiffUs = -mPlaylist->getTargetDuration() / 2;
1547 maxDiffUs = mPlaylist->getTargetDuration() / 2;
1548 }
1549
1550 int32_t oldSeqNumber = mSeqNumber;
1551 ssize_t index = mSeqNumber - firstSeqNumberInPlaylist;
1552
1553 // adjust anchorTimeUs to within (minDiffUs, maxDiffUs) from mStartTimeUs
1554 int64_t diffUs = anchorTimeUs - mStartTimeUs;
1555 if (diffUs > maxDiffUs) {
1556 while (index > 0 && diffUs > maxDiffUs) {
1557 --index;
1558
1559 sp<AMessage> itemMeta;
1560 CHECK(mPlaylist->itemAt(index, NULL /* uri */, &itemMeta));
1561
1562 int64_t itemDurationUs;
1563 CHECK(itemMeta->findInt64("durationUs", &itemDurationUs));
1564
1565 diffUs -= itemDurationUs;
1566 }
1567 } else if (diffUs < minDiffUs) {
1568 while (index + 1 < (ssize_t) mPlaylist->size()
1569 && diffUs < minDiffUs) {
1570 ++index;
1571
1572 sp<AMessage> itemMeta;
1573 CHECK(mPlaylist->itemAt(index, NULL /* uri */, &itemMeta));
1574
1575 int64_t itemDurationUs;
1576 CHECK(itemMeta->findInt64("durationUs", &itemDurationUs));
1577
1578 diffUs += itemDurationUs;
1579 }
1580 }
1581
1582 mSeqNumber = firstSeqNumberInPlaylist + index;
1583
1584 if (mSeqNumber != oldSeqNumber) {
1585 FLOGV("guessed wrong seg number: diff %lld out of [%lld, %lld]",
1586 (long long) anchorTimeUs - mStartTimeUs,
1587 (long long) minDiffUs,
1588 (long long) maxDiffUs);
1589 return true;
1590 }
1591 return false;
1592 }
1593
getSeqNumberForDiscontinuity(size_t discontinuitySeq) const1594 int32_t PlaylistFetcher::getSeqNumberForDiscontinuity(size_t discontinuitySeq) const {
1595 int32_t firstSeqNumberInPlaylist = mPlaylist->getFirstSeqNumber();
1596
1597 size_t index = 0;
1598 while (index < mPlaylist->size()) {
1599 sp<AMessage> itemMeta;
1600 CHECK(mPlaylist->itemAt( index, NULL /* uri */, &itemMeta));
1601 size_t curDiscontinuitySeq;
1602 CHECK(itemMeta->findInt32("discontinuity-sequence", (int32_t *)&curDiscontinuitySeq));
1603 int32_t seqNumber = firstSeqNumberInPlaylist + index;
1604 if (curDiscontinuitySeq == discontinuitySeq) {
1605 return seqNumber;
1606 } else if (curDiscontinuitySeq > discontinuitySeq) {
1607 return seqNumber <= 0 ? 0 : seqNumber - 1;
1608 }
1609
1610 ++index;
1611 }
1612
1613 return firstSeqNumberInPlaylist + mPlaylist->size();
1614 }
1615
getSeqNumberForTime(int64_t timeUs) const1616 int32_t PlaylistFetcher::getSeqNumberForTime(int64_t timeUs) const {
1617 size_t index = 0;
1618 int64_t segmentStartUs = 0;
1619 while (index < mPlaylist->size()) {
1620 sp<AMessage> itemMeta;
1621 CHECK(mPlaylist->itemAt(
1622 index, NULL /* uri */, &itemMeta));
1623
1624 int64_t itemDurationUs;
1625 CHECK(itemMeta->findInt64("durationUs", &itemDurationUs));
1626
1627 if (timeUs < segmentStartUs + itemDurationUs) {
1628 break;
1629 }
1630
1631 segmentStartUs += itemDurationUs;
1632 ++index;
1633 }
1634
1635 if (index >= mPlaylist->size()) {
1636 index = mPlaylist->size() - 1;
1637 }
1638
1639 return mPlaylist->getFirstSeqNumber() + index;
1640 }
1641
// Stamps an access unit's meta with fetcher state: the source's format (if
// it has one), the "discard" flag (only when set), the current discontinuity
// sequence, the current segment's start time, first PTS and duration, and,
// for playlists that are neither complete nor event playlists, the playlist
// time. Returns accessUnit.
const sp<ABuffer> &PlaylistFetcher::setAccessUnitProperties(
        const sp<ABuffer> &accessUnit, const sp<AnotherPacketSource> &source, bool discard) {
    const sp<MetaData> sourceFormat = source->getFormat();
    const sp<AMessage> unitMeta = accessUnit->meta();

    if (sourceFormat != NULL) {
        // for simplicity, every unit carries a reference to the format
        unitMeta->setObject("format", sourceFormat);
    }

    if (discard) {
        unitMeta->setInt32("discard", discard);
    }

    unitMeta->setInt32("discontinuitySeq", mDiscontinuitySeq);
    unitMeta->setInt64("segmentStartTimeUs", getSegmentStartTimeUs(mSeqNumber));
    unitMeta->setInt64("segmentFirstTimeUs", mSegmentFirstPTS);
    unitMeta->setInt64("segmentDurationUs", getSegmentDurationUs(mSeqNumber));

    const bool completeOrEvent = mPlaylist->isComplete() || mPlaylist->isEvent();
    if (!completeOrEvent) {
        unitMeta->setInt64("playlistTimeUs", mPlaylistTimeUs);
    }

    return accessUnit;
}
1663
isStartTimeReached(int64_t timeUs)1664 bool PlaylistFetcher::isStartTimeReached(int64_t timeUs) {
1665 if (!mFirstPTSValid) {
1666 mFirstTimeUs = timeUs;
1667 mFirstPTSValid = true;
1668 }
1669 bool startTimeReached = true;
1670 if (mStartTimeUsRelative) {
1671 FLOGV("startTimeUsRelative, timeUs (%lld) - %lld = %lld",
1672 (long long)timeUs,
1673 (long long)mFirstTimeUs,
1674 (long long)(timeUs - mFirstTimeUs));
1675 timeUs -= mFirstTimeUs;
1676 if (timeUs < 0) {
1677 FLOGV("clamp negative timeUs to 0");
1678 timeUs = 0;
1679 }
1680 startTimeReached = (timeUs >= mStartTimeUs);
1681 }
1682 return startTimeReached;
1683 }
1684
extractAndQueueAccessUnitsFromTs(const sp<ABuffer> & buffer)1685 status_t PlaylistFetcher::extractAndQueueAccessUnitsFromTs(const sp<ABuffer> &buffer) {
1686 if (mTSParser == NULL) {
1687 // Use TS_TIMESTAMPS_ARE_ABSOLUTE so pts carry over between fetchers.
1688 mTSParser = new ATSParser(ATSParser::TS_TIMESTAMPS_ARE_ABSOLUTE);
1689 }
1690
1691 if (mNextPTSTimeUs >= 0ll) {
1692 sp<AMessage> extra = new AMessage;
1693 // Since we are using absolute timestamps, signal an offset of 0 to prevent
1694 // ATSParser from skewing the timestamps of access units.
1695 extra->setInt64(IStreamListener::kKeyMediaTimeUs, 0);
1696
1697 // When adapting, signal a recent media time to the parser,
1698 // so that PTS wrap around is handled for the new variant.
1699 if (mStartTimeUs >= 0 && !mStartTimeUsRelative) {
1700 extra->setInt64(IStreamListener::kKeyRecentMediaTimeUs, mStartTimeUs);
1701 }
1702
1703 mTSParser->signalDiscontinuity(
1704 ATSParser::DISCONTINUITY_TIME, extra);
1705
1706 mNextPTSTimeUs = -1ll;
1707 }
1708
1709 if (mSampleAesKeyItemChanged) {
1710 mTSParser->signalNewSampleAesKey(mSampleAesKeyItem);
1711 mSampleAesKeyItemChanged = false;
1712 }
1713
1714 size_t offset = 0;
1715 while (offset + 188 <= buffer->size()) {
1716 status_t err = mTSParser->feedTSPacket(buffer->data() + offset, 188);
1717
1718 if (err != OK) {
1719 return err;
1720 }
1721
1722 offset += 188;
1723 }
1724 // setRange to indicate consumed bytes.
1725 buffer->setRange(buffer->offset() + offset, buffer->size() - offset);
1726
1727 if (mSegmentFirstPTS < 0ll) {
1728 // get the smallest first PTS from all streams present in this parser
1729 for (size_t i = mPacketSources.size(); i > 0;) {
1730 i--;
1731 const LiveSession::StreamType stream = mPacketSources.keyAt(i);
1732 if (stream == LiveSession::STREAMTYPE_SUBTITLES) {
1733 ALOGE("MPEG2 Transport streams do not contain subtitles.");
1734 return ERROR_MALFORMED;
1735 }
1736 if (stream == LiveSession::STREAMTYPE_METADATA) {
1737 continue;
1738 }
1739 ATSParser::SourceType type =LiveSession::getSourceTypeForStream(stream);
1740 sp<AnotherPacketSource> source =
1741 static_cast<AnotherPacketSource *>(
1742 mTSParser->getSource(type).get());
1743
1744 if (source == NULL) {
1745 continue;
1746 }
1747 sp<AMessage> meta = source->getMetaAfterLastDequeued(0);
1748 if (meta != NULL) {
1749 int64_t timeUs;
1750 CHECK(meta->findInt64("timeUs", &timeUs));
1751 if (mSegmentFirstPTS < 0ll || timeUs < mSegmentFirstPTS) {
1752 mSegmentFirstPTS = timeUs;
1753 }
1754 }
1755 }
1756 if (mSegmentFirstPTS < 0ll) {
1757 // didn't find any TS packet, can return early
1758 return OK;
1759 }
1760 if (!mStartTimeUsRelative) {
1761 // mStartup
1762 // mStartup is true until we have queued a packet for all the streams
1763 // we are fetching. We queue packets whose timestamps are greater than
1764 // mStartTimeUs.
1765 // mSegmentStartTimeUs >= 0
1766 // mSegmentStartTimeUs is non-negative when adapting or switching tracks
1767 // adjustSeqNumberWithAnchorTime(timeUs) == true
1768 // we guessed a seq number that's either too large or too small.
1769 // If this happens, we'll adjust mSeqNumber and restart fetching from new
1770 // location. Note that we only want to adjust once, so set mSegmentStartTimeUs
1771 // to -1 so that we don't enter this chunk next time.
1772 if (mStartup && mSegmentStartTimeUs >= 0
1773 && adjustSeqNumberWithAnchorTime(mSegmentFirstPTS)) {
1774 mStartTimeUsNotify = mNotify->dup();
1775 mStartTimeUsNotify->setInt32("what", kWhatStartedAt);
1776 mStartTimeUsNotify->setString("uri", mURI);
1777 mIDRFound = false;
1778 mSegmentStartTimeUs = -1;
1779 return -EAGAIN;
1780 }
1781 }
1782 }
1783
1784 status_t err = OK;
1785 for (size_t i = mPacketSources.size(); i > 0;) {
1786 i--;
1787 sp<AnotherPacketSource> packetSource = mPacketSources.valueAt(i);
1788
1789 const LiveSession::StreamType stream = mPacketSources.keyAt(i);
1790 if (stream == LiveSession::STREAMTYPE_SUBTITLES) {
1791 ALOGE("MPEG2 Transport streams do not contain subtitles.");
1792 return ERROR_MALFORMED;
1793 }
1794
1795 const char *key = LiveSession::getKeyForStream(stream);
1796 ATSParser::SourceType type =LiveSession::getSourceTypeForStream(stream);
1797
1798 sp<AnotherPacketSource> source =
1799 static_cast<AnotherPacketSource *>(
1800 mTSParser->getSource(type).get());
1801
1802 if (source == NULL) {
1803 continue;
1804 }
1805
1806 const char *mime;
1807 sp<MetaData> format = source->getFormat();
1808 bool isAvc = format != NULL && format->findCString(kKeyMIMEType, &mime)
1809 && !strcasecmp(mime, MEDIA_MIMETYPE_VIDEO_AVC);
1810
1811 sp<ABuffer> accessUnit;
1812 status_t finalResult;
1813 while (source->hasBufferAvailable(&finalResult)
1814 && source->dequeueAccessUnit(&accessUnit) == OK) {
1815
1816 int64_t timeUs;
1817 CHECK(accessUnit->meta()->findInt64("timeUs", &timeUs));
1818
1819 if (mStartup) {
1820 bool startTimeReached = isStartTimeReached(timeUs);
1821
1822 if (!startTimeReached || (isAvc && !mIDRFound)) {
1823 // buffer up to the closest preceding IDR frame in the next segement,
1824 // or the closest succeeding IDR frame after the exact position
1825 FSLOGV(stream, "timeUs(%lld)-mStartTimeUs(%lld)=%lld, mIDRFound=%d",
1826 (long long)timeUs,
1827 (long long)mStartTimeUs,
1828 (long long)timeUs - mStartTimeUs,
1829 mIDRFound);
1830 if (isAvc) {
1831 if (IsIDR(accessUnit)) {
1832 mVideoBuffer->clear();
1833 FSLOGV(stream, "found IDR, clear mVideoBuffer");
1834 mIDRFound = true;
1835 }
1836 if (mIDRFound && mStartTimeUsRelative && !startTimeReached) {
1837 mVideoBuffer->queueAccessUnit(accessUnit);
1838 FSLOGV(stream, "saving AVC video AccessUnit");
1839 }
1840 }
1841 if (!startTimeReached || (isAvc && !mIDRFound)) {
1842 continue;
1843 }
1844 }
1845 }
1846
1847 if (mStartTimeUsNotify != NULL) {
1848 uint32_t streamMask = 0;
1849 mStartTimeUsNotify->findInt32("streamMask", (int32_t *) &streamMask);
1850 if ((mStreamTypeMask & mPacketSources.keyAt(i))
1851 && !(streamMask & mPacketSources.keyAt(i))) {
1852 streamMask |= mPacketSources.keyAt(i);
1853 mStartTimeUsNotify->setInt32("streamMask", streamMask);
1854 FSLOGV(stream, "found start point, timeUs=%lld, streamMask becomes %x",
1855 (long long)timeUs, streamMask);
1856
1857 if (streamMask == mStreamTypeMask) {
1858 FLOGV("found start point for all streams");
1859 mStartup = false;
1860 }
1861 }
1862 }
1863
1864 if (mStopParams != NULL) {
1865 int32_t discontinuitySeq;
1866 int64_t stopTimeUs;
1867 if (!mStopParams->findInt32("discontinuitySeq", &discontinuitySeq)
1868 || discontinuitySeq > mDiscontinuitySeq
1869 || !mStopParams->findInt64(key, &stopTimeUs)
1870 || (discontinuitySeq == mDiscontinuitySeq
1871 && timeUs >= stopTimeUs)) {
1872 FSLOGV(stream, "reached stop point, timeUs=%lld", (long long)timeUs);
1873 mStreamTypeMask &= ~stream;
1874 mPacketSources.removeItemsAt(i);
1875 break;
1876 }
1877 }
1878
1879 if (stream == LiveSession::STREAMTYPE_VIDEO) {
1880 const bool discard = true;
1881 status_t status;
1882 while (mVideoBuffer->hasBufferAvailable(&status)) {
1883 sp<ABuffer> videoBuffer;
1884 mVideoBuffer->dequeueAccessUnit(&videoBuffer);
1885 setAccessUnitProperties(videoBuffer, source, discard);
1886 packetSource->queueAccessUnit(videoBuffer);
1887 int64_t bufferTimeUs;
1888 CHECK(videoBuffer->meta()->findInt64("timeUs", &bufferTimeUs));
1889 FSLOGV(stream, "queueAccessUnit (saved), timeUs=%lld",
1890 (long long)bufferTimeUs);
1891 }
1892 } else if (stream == LiveSession::STREAMTYPE_METADATA && !mHasMetadata) {
1893 mHasMetadata = true;
1894 sp<AMessage> notify = mNotify->dup();
1895 notify->setInt32("what", kWhatMetadataDetected);
1896 notify->post();
1897 }
1898
1899 setAccessUnitProperties(accessUnit, source);
1900 packetSource->queueAccessUnit(accessUnit);
1901 FSLOGV(stream, "queueAccessUnit, timeUs=%lld", (long long)timeUs);
1902 }
1903
1904 if (err != OK) {
1905 break;
1906 }
1907 }
1908
1909 if (err != OK) {
1910 for (size_t i = mPacketSources.size(); i > 0;) {
1911 i--;
1912 sp<AnotherPacketSource> packetSource = mPacketSources.valueAt(i);
1913 packetSource->clear();
1914 }
1915 return err;
1916 }
1917
1918 if (!mStreamTypeMask) {
1919 // Signal gap is filled between original and new stream.
1920 FLOGV("reached stop point for all streams");
1921 return ERROR_OUT_OF_RANGE;
1922 }
1923
1924 return OK;
1925 }
1926
1927 /* static */
bufferStartsWithWebVTTMagicSequence(const sp<ABuffer> & buffer)1928 bool PlaylistFetcher::bufferStartsWithWebVTTMagicSequence(
1929 const sp<ABuffer> &buffer) {
1930 size_t pos = 0;
1931
1932 // skip possible BOM
1933 if (buffer->size() >= pos + 3 &&
1934 !memcmp("\xef\xbb\xbf", buffer->data() + pos, 3)) {
1935 pos += 3;
1936 }
1937
1938 // accept WEBVTT followed by SPACE, TAB or (CR) LF
1939 if (buffer->size() < pos + 6 ||
1940 memcmp("WEBVTT", buffer->data() + pos, 6)) {
1941 return false;
1942 }
1943 pos += 6;
1944
1945 if (buffer->size() == pos) {
1946 return true;
1947 }
1948
1949 uint8_t sep = buffer->data()[pos];
1950 return sep == ' ' || sep == '\t' || sep == '\n' || sep == '\r';
1951 }
1952
extractAndQueueAccessUnits(const sp<ABuffer> & buffer,const sp<AMessage> & itemMeta)1953 status_t PlaylistFetcher::extractAndQueueAccessUnits(
1954 const sp<ABuffer> &buffer, const sp<AMessage> &itemMeta) {
1955 if (bufferStartsWithWebVTTMagicSequence(buffer)) {
1956 if (mStreamTypeMask != LiveSession::STREAMTYPE_SUBTITLES) {
1957 ALOGE("This stream only contains subtitles.");
1958 return ERROR_MALFORMED;
1959 }
1960
1961 const sp<AnotherPacketSource> packetSource =
1962 mPacketSources.valueFor(LiveSession::STREAMTYPE_SUBTITLES);
1963
1964 int64_t durationUs;
1965 CHECK(itemMeta->findInt64("durationUs", &durationUs));
1966 buffer->meta()->setInt64("timeUs", getSegmentStartTimeUs(mSeqNumber));
1967 buffer->meta()->setInt64("durationUs", durationUs);
1968 buffer->meta()->setInt64("segmentStartTimeUs", getSegmentStartTimeUs(mSeqNumber));
1969 buffer->meta()->setInt32("discontinuitySeq", mDiscontinuitySeq);
1970 buffer->meta()->setInt32("subtitleGeneration", mSubtitleGeneration);
1971 packetSource->queueAccessUnit(buffer);
1972 return OK;
1973 }
1974
1975 if (mNextPTSTimeUs >= 0ll) {
1976 mNextPTSTimeUs = -1ll;
1977 }
1978
1979 // This better be an ISO 13818-7 (AAC) or ISO 13818-1 (MPEG) audio
1980 // stream prefixed by an ID3 tag.
1981
1982 bool firstID3Tag = true;
1983 uint64_t PTS = 0;
1984
1985 for (;;) {
1986 // Make sure to skip all ID3 tags preceding the audio data.
1987 // At least one must be present to provide the PTS timestamp.
1988
1989 ID3 id3(buffer->data(), buffer->size(), true /* ignoreV1 */);
1990 if (!id3.isValid()) {
1991 if (firstID3Tag) {
1992 ALOGE("Unable to parse ID3 tag.");
1993 return ERROR_MALFORMED;
1994 } else {
1995 break;
1996 }
1997 }
1998
1999 if (firstID3Tag) {
2000 bool found = false;
2001
2002 ID3::Iterator it(id3, "PRIV");
2003 while (!it.done()) {
2004 size_t length;
2005 const uint8_t *data = it.getData(&length);
2006 if (!data) {
2007 return ERROR_MALFORMED;
2008 }
2009
2010 static const char *kMatchName =
2011 "com.apple.streaming.transportStreamTimestamp";
2012 static const size_t kMatchNameLen = strlen(kMatchName);
2013
2014 if (length == kMatchNameLen + 1 + 8
2015 && !strncmp((const char *)data, kMatchName, kMatchNameLen)) {
2016 found = true;
2017 PTS = U64_AT(&data[kMatchNameLen + 1]);
2018 }
2019
2020 it.next();
2021 }
2022
2023 if (!found) {
2024 ALOGE("Unable to extract transportStreamTimestamp from ID3 tag.");
2025 return ERROR_MALFORMED;
2026 }
2027 }
2028
2029 // skip the ID3 tag
2030 buffer->setRange(
2031 buffer->offset() + id3.rawSize(), buffer->size() - id3.rawSize());
2032
2033 firstID3Tag = false;
2034 }
2035
2036 if (mStreamTypeMask != LiveSession::STREAMTYPE_AUDIO) {
2037 ALOGW("This stream only contains audio data!");
2038
2039 mStreamTypeMask &= LiveSession::STREAMTYPE_AUDIO;
2040
2041 if (mStreamTypeMask == 0) {
2042 return OK;
2043 }
2044 }
2045
2046 sp<AnotherPacketSource> packetSource =
2047 mPacketSources.valueFor(LiveSession::STREAMTYPE_AUDIO);
2048
2049 if (packetSource->getFormat() == NULL && buffer->size() >= 7) {
2050 ABitReader bits(buffer->data(), buffer->size());
2051
2052 // adts_fixed_header
2053
2054 CHECK_EQ(bits.getBits(12), 0xfffu);
2055 bits.skipBits(3); // ID, layer
2056 bool protection_absent __unused = bits.getBits(1) != 0;
2057
2058 unsigned profile = bits.getBits(2);
2059 CHECK_NE(profile, 3u);
2060 unsigned sampling_freq_index = bits.getBits(4);
2061 bits.getBits(1); // private_bit
2062 unsigned channel_configuration = bits.getBits(3);
2063 CHECK_NE(channel_configuration, 0u);
2064 bits.skipBits(2); // original_copy, home
2065
2066 sp<MetaData> meta = MakeAACCodecSpecificData(
2067 profile, sampling_freq_index, channel_configuration);
2068
2069 meta->setInt32(kKeyIsADTS, true);
2070
2071 packetSource->setFormat(meta);
2072 }
2073
2074 int64_t numSamples = 0ll;
2075 int32_t sampleRate;
2076 CHECK(packetSource->getFormat()->findInt32(kKeySampleRate, &sampleRate));
2077
2078 int64_t timeUs = (PTS * 100ll) / 9ll;
2079 if (mStartup && !mFirstPTSValid) {
2080 mFirstPTSValid = true;
2081 mFirstTimeUs = timeUs;
2082 }
2083
2084 if (mSegmentFirstPTS < 0ll) {
2085 mSegmentFirstPTS = timeUs;
2086 if (!mStartTimeUsRelative) {
2087 // Duplicated logic from how we handle .ts playlists.
2088 if (mStartup && mSegmentStartTimeUs >= 0
2089 && adjustSeqNumberWithAnchorTime(timeUs)) {
2090 mSegmentStartTimeUs = -1;
2091 return -EAGAIN;
2092 }
2093 }
2094 }
2095
2096 sp<HlsSampleDecryptor> sampleDecryptor = NULL;
2097 if (mSampleAesKeyItem != NULL) {
2098 ALOGV("extractAndQueueAccessUnits[%d] SampleAesKeyItem: Key: %s IV: %s",
2099 mSeqNumber,
2100 HlsSampleDecryptor::aesBlockToStr(mKeyData).c_str(),
2101 HlsSampleDecryptor::aesBlockToStr(mAESInitVec).c_str());
2102
2103 sampleDecryptor = new HlsSampleDecryptor(mSampleAesKeyItem);
2104 }
2105
2106 int frameId = 0;
2107
2108 size_t offset = 0;
2109 while (offset < buffer->size()) {
2110 const uint8_t *adtsHeader = buffer->data() + offset;
2111 CHECK_LT(offset + 5, buffer->size());
2112 // non-const pointer for decryption if needed
2113 uint8_t *adtsFrame = buffer->data() + offset;
2114
2115 unsigned aac_frame_length =
2116 ((adtsHeader[3] & 3) << 11)
2117 | (adtsHeader[4] << 3)
2118 | (adtsHeader[5] >> 5);
2119
2120 if (aac_frame_length == 0) {
2121 const uint8_t *id3Header = adtsHeader;
2122 if (!memcmp(id3Header, "ID3", 3)) {
2123 ID3 id3(id3Header, buffer->size() - offset, true);
2124 if (id3.isValid()) {
2125 offset += id3.rawSize();
2126 continue;
2127 };
2128 }
2129 return ERROR_MALFORMED;
2130 }
2131
2132 CHECK_LE(offset + aac_frame_length, buffer->size());
2133
2134 int64_t unitTimeUs = timeUs + numSamples * 1000000ll / sampleRate;
2135 offset += aac_frame_length;
2136
2137 // Each AAC frame encodes 1024 samples.
2138 numSamples += 1024;
2139
2140 if (mStartup) {
2141 int64_t startTimeUs = unitTimeUs;
2142 if (mStartTimeUsRelative) {
2143 startTimeUs -= mFirstTimeUs;
2144 if (startTimeUs < 0) {
2145 startTimeUs = 0;
2146 }
2147 }
2148 if (startTimeUs < mStartTimeUs) {
2149 continue;
2150 }
2151
2152 if (mStartTimeUsNotify != NULL) {
2153 mStartTimeUsNotify->setInt32("streamMask", LiveSession::STREAMTYPE_AUDIO);
2154 mStartup = false;
2155 }
2156 }
2157
2158 if (mStopParams != NULL) {
2159 int32_t discontinuitySeq;
2160 int64_t stopTimeUs;
2161 if (!mStopParams->findInt32("discontinuitySeq", &discontinuitySeq)
2162 || discontinuitySeq > mDiscontinuitySeq
2163 || !mStopParams->findInt64("timeUsAudio", &stopTimeUs)
2164 || (discontinuitySeq == mDiscontinuitySeq && unitTimeUs >= stopTimeUs)) {
2165 mStreamTypeMask = 0;
2166 mPacketSources.clear();
2167 return ERROR_OUT_OF_RANGE;
2168 }
2169 }
2170
2171 if (sampleDecryptor != NULL) {
2172 bool protection_absent = (adtsHeader[1] & 0x1);
2173 size_t headerSize = protection_absent ? 7 : 9;
2174 if (frameId == 0) {
2175 ALOGV("extractAndQueueAAC[%d] protection_absent %d (%02x) headerSize %zu",
2176 mSeqNumber, protection_absent, adtsHeader[1], headerSize);
2177 }
2178
2179 sampleDecryptor->processAAC(headerSize, adtsFrame, aac_frame_length);
2180 }
2181 frameId++;
2182
2183 sp<ABuffer> unit = new ABuffer(aac_frame_length);
2184 memcpy(unit->data(), adtsHeader, aac_frame_length);
2185
2186 unit->meta()->setInt64("timeUs", unitTimeUs);
2187 setAccessUnitProperties(unit, packetSource);
2188 packetSource->queueAccessUnit(unit);
2189 }
2190
2191 return OK;
2192 }
2193
updateDuration()2194 void PlaylistFetcher::updateDuration() {
2195 int64_t durationUs = 0ll;
2196 for (size_t index = 0; index < mPlaylist->size(); ++index) {
2197 sp<AMessage> itemMeta;
2198 CHECK(mPlaylist->itemAt(
2199 index, NULL /* uri */, &itemMeta));
2200
2201 int64_t itemDurationUs;
2202 CHECK(itemMeta->findInt64("durationUs", &itemDurationUs));
2203
2204 durationUs += itemDurationUs;
2205 }
2206
2207 sp<AMessage> msg = mNotify->dup();
2208 msg->setInt32("what", kWhatDurationUpdate);
2209 msg->setInt64("durationUs", durationUs);
2210 msg->post();
2211 }
2212
updateTargetDuration()2213 void PlaylistFetcher::updateTargetDuration() {
2214 sp<AMessage> msg = mNotify->dup();
2215 msg->setInt32("what", kWhatTargetDurationUpdate);
2216 msg->setInt64("targetDurationUs", mPlaylist->getTargetDuration());
2217 msg->post();
2218 }
2219
2220 } // namespace android
2221