1 /*
2 * Copyright (C) 2012 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 //#define LOG_NDEBUG 0
18 #define LOG_TAG "PlaylistFetcher"
19 #include <utils/Log.h>
20 #include <utils/misc.h>
21
22 #include "PlaylistFetcher.h"
23 #include "HTTPDownloader.h"
24 #include "LiveSession.h"
25 #include "M3UParser.h"
26 #include "include/avc_utils.h"
27 #include "include/ID3.h"
28 #include "mpeg2ts/AnotherPacketSource.h"
29
30 #include <media/stagefright/foundation/ABitReader.h>
31 #include <media/stagefright/foundation/ABuffer.h>
32 #include <media/stagefright/foundation/ADebug.h>
33 #include <media/stagefright/MediaDefs.h>
34 #include <media/stagefright/MetaData.h>
35 #include <media/stagefright/Utils.h>
36
37 #include <ctype.h>
38 #include <inttypes.h>
39 #include <openssl/aes.h>
40
// Verbose-log helper that prefixes every message with "[fetcher-<id>] ".
// The expansion refers to mFetcherID, so it can only be used where that
// name is in scope.
#define FLOGV(fmt, ...) ALOGV("[fetcher-%d] " fmt, mFetcherID, ##__VA_ARGS__)
// Same as FLOGV, but additionally tags the message with the name that
// LiveSession::getNameForStream() returns for `stream`.
#define FSLOGV(stream, fmt, ...) ALOGV("[fetcher-%d] [%s] " fmt, mFetcherID, \
        LiveSession::getNameForStream(stream), ##__VA_ARGS__)
44
45 namespace android {
46
// static
// Buffered-duration level (30,000,000 us) below which onMonitorQueue()
// schedules another download.
const int64_t PlaylistFetcher::kMinBufferedDurationUs = 30000000ll;
// Upper bound (3,000,000 us) on the retry delay computed in
// initDownloadState().
const int64_t PlaylistFetcher::kMaxMonitorDelayUs = 3000000ll;
// LCM of 188 (size of a TS packet) & 1k works well
// (188 = 4 * 47 and 1024 = 2^10, hence 47 * 1024 = 48128 bytes).
const int32_t PlaylistFetcher::kDownloadBlockSize = 47 * 1024;
52
// Holds the variables of one segment download so they can be stored with
// saveState() and handed back later with restoreState(). At most one snapshot
// is kept at a time; onMonitorQueue() treats hasSavedState() as "in the middle
// of an unfinished download".
struct PlaylistFetcher::DownloadState : public RefBase {
    // Starts out empty (no saved state).
    DownloadState();
    // Drops any snapshot and returns every field to its empty value.
    void resetState();
    // True after saveState() until the next restoreState()/resetState().
    bool hasSavedState() const;
    // If a snapshot exists, copies it into the out-parameters and then resets
    // this object; otherwise the out-parameters are left untouched.
    void restoreState(
            AString &uri,
            sp<AMessage> &itemMeta,
            sp<ABuffer> &buffer,
            sp<ABuffer> &tsBuffer,
            int32_t &firstSeqNumberInPlaylist,
            int32_t &lastSeqNumberInPlaylist);
    // Stores copies of all arguments and marks the state as saved. The
    // arguments are taken by non-const reference but are only read.
    void saveState(
            AString &uri,
            sp<AMessage> &itemMeta,
            sp<ABuffer> &buffer,
            sp<ABuffer> &tsBuffer,
            int32_t &firstSeqNumberInPlaylist,
            int32_t &lastSeqNumberInPlaylist);

private:
    // Whether the fields below currently hold a snapshot.
    bool mHasSavedState;
    // Saved copies of the corresponding saveState() arguments.
    AString mUri;
    sp<AMessage> mItemMeta;
    sp<ABuffer> mBuffer;
    sp<ABuffer> mTsBuffer;
    int32_t mFirstSeqNumberInPlaylist;
    int32_t mLastSeqNumberInPlaylist;
};
81
DownloadState()82 PlaylistFetcher::DownloadState::DownloadState() {
83 resetState();
84 }
85
hasSavedState() const86 bool PlaylistFetcher::DownloadState::hasSavedState() const {
87 return mHasSavedState;
88 }
89
resetState()90 void PlaylistFetcher::DownloadState::resetState() {
91 mHasSavedState = false;
92
93 mUri.clear();
94 mItemMeta = NULL;
95 mBuffer = NULL;
96 mTsBuffer = NULL;
97 mFirstSeqNumberInPlaylist = 0;
98 mLastSeqNumberInPlaylist = 0;
99 }
100
restoreState(AString & uri,sp<AMessage> & itemMeta,sp<ABuffer> & buffer,sp<ABuffer> & tsBuffer,int32_t & firstSeqNumberInPlaylist,int32_t & lastSeqNumberInPlaylist)101 void PlaylistFetcher::DownloadState::restoreState(
102 AString &uri,
103 sp<AMessage> &itemMeta,
104 sp<ABuffer> &buffer,
105 sp<ABuffer> &tsBuffer,
106 int32_t &firstSeqNumberInPlaylist,
107 int32_t &lastSeqNumberInPlaylist) {
108 if (!mHasSavedState) {
109 return;
110 }
111
112 uri = mUri;
113 itemMeta = mItemMeta;
114 buffer = mBuffer;
115 tsBuffer = mTsBuffer;
116 firstSeqNumberInPlaylist = mFirstSeqNumberInPlaylist;
117 lastSeqNumberInPlaylist = mLastSeqNumberInPlaylist;
118
119 resetState();
120 }
121
saveState(AString & uri,sp<AMessage> & itemMeta,sp<ABuffer> & buffer,sp<ABuffer> & tsBuffer,int32_t & firstSeqNumberInPlaylist,int32_t & lastSeqNumberInPlaylist)122 void PlaylistFetcher::DownloadState::saveState(
123 AString &uri,
124 sp<AMessage> &itemMeta,
125 sp<ABuffer> &buffer,
126 sp<ABuffer> &tsBuffer,
127 int32_t &firstSeqNumberInPlaylist,
128 int32_t &lastSeqNumberInPlaylist) {
129 mHasSavedState = true;
130
131 mUri = uri;
132 mItemMeta = itemMeta;
133 mBuffer = buffer;
134 mTsBuffer = tsBuffer;
135 mFirstSeqNumberInPlaylist = firstSeqNumberInPlaylist;
136 mLastSeqNumberInPlaylist = lastSeqNumberInPlaylist;
137 }
138
PlaylistFetcher(const sp<AMessage> & notify,const sp<LiveSession> & session,const char * uri,int32_t id,int32_t subtitleGeneration)139 PlaylistFetcher::PlaylistFetcher(
140 const sp<AMessage> ¬ify,
141 const sp<LiveSession> &session,
142 const char *uri,
143 int32_t id,
144 int32_t subtitleGeneration)
145 : mNotify(notify),
146 mSession(session),
147 mURI(uri),
148 mFetcherID(id),
149 mStreamTypeMask(0),
150 mStartTimeUs(-1ll),
151 mSegmentStartTimeUs(-1ll),
152 mDiscontinuitySeq(-1ll),
153 mStartTimeUsRelative(false),
154 mLastPlaylistFetchTimeUs(-1ll),
155 mPlaylistTimeUs(-1ll),
156 mSeqNumber(-1),
157 mNumRetries(0),
158 mStartup(true),
159 mIDRFound(false),
160 mSeekMode(LiveSession::kSeekModeExactPosition),
161 mTimeChangeSignaled(false),
162 mNextPTSTimeUs(-1ll),
163 mMonitorQueueGeneration(0),
164 mSubtitleGeneration(subtitleGeneration),
165 mLastDiscontinuitySeq(-1ll),
166 mRefreshState(INITIAL_MINIMUM_RELOAD_DELAY),
167 mFirstPTSValid(false),
168 mFirstTimeUs(-1ll),
169 mVideoBuffer(new AnotherPacketSource(NULL)),
170 mThresholdRatio(-1.0f),
171 mDownloadState(new DownloadState()),
172 mHasMetadata(false) {
173 memset(mPlaylistHash, 0, sizeof(mPlaylistHash));
174 mHTTPDownloader = mSession->getHTTPDownloader();
175 }
176
~PlaylistFetcher()177 PlaylistFetcher::~PlaylistFetcher() {
178 }
179
getFetcherID() const180 int32_t PlaylistFetcher::getFetcherID() const {
181 return mFetcherID;
182 }
183
getSegmentStartTimeUs(int32_t seqNumber) const184 int64_t PlaylistFetcher::getSegmentStartTimeUs(int32_t seqNumber) const {
185 CHECK(mPlaylist != NULL);
186
187 int32_t firstSeqNumberInPlaylist, lastSeqNumberInPlaylist;
188 mPlaylist->getSeqNumberRange(
189 &firstSeqNumberInPlaylist, &lastSeqNumberInPlaylist);
190
191 CHECK_GE(seqNumber, firstSeqNumberInPlaylist);
192 CHECK_LE(seqNumber, lastSeqNumberInPlaylist);
193
194 int64_t segmentStartUs = 0ll;
195 for (int32_t index = 0;
196 index < seqNumber - firstSeqNumberInPlaylist; ++index) {
197 sp<AMessage> itemMeta;
198 CHECK(mPlaylist->itemAt(
199 index, NULL /* uri */, &itemMeta));
200
201 int64_t itemDurationUs;
202 CHECK(itemMeta->findInt64("durationUs", &itemDurationUs));
203
204 segmentStartUs += itemDurationUs;
205 }
206
207 return segmentStartUs;
208 }
209
getSegmentDurationUs(int32_t seqNumber) const210 int64_t PlaylistFetcher::getSegmentDurationUs(int32_t seqNumber) const {
211 CHECK(mPlaylist != NULL);
212
213 int32_t firstSeqNumberInPlaylist, lastSeqNumberInPlaylist;
214 mPlaylist->getSeqNumberRange(
215 &firstSeqNumberInPlaylist, &lastSeqNumberInPlaylist);
216
217 CHECK_GE(seqNumber, firstSeqNumberInPlaylist);
218 CHECK_LE(seqNumber, lastSeqNumberInPlaylist);
219
220 int32_t index = seqNumber - firstSeqNumberInPlaylist;
221 sp<AMessage> itemMeta;
222 CHECK(mPlaylist->itemAt(
223 index, NULL /* uri */, &itemMeta));
224
225 int64_t itemDurationUs;
226 CHECK(itemMeta->findInt64("durationUs", &itemDurationUs));
227
228 return itemDurationUs;
229 }
230
delayUsToRefreshPlaylist() const231 int64_t PlaylistFetcher::delayUsToRefreshPlaylist() const {
232 int64_t nowUs = ALooper::GetNowUs();
233
234 if (mPlaylist == NULL || mLastPlaylistFetchTimeUs < 0ll) {
235 CHECK_EQ((int)mRefreshState, (int)INITIAL_MINIMUM_RELOAD_DELAY);
236 return 0ll;
237 }
238
239 if (mPlaylist->isComplete()) {
240 return (~0llu >> 1);
241 }
242
243 int64_t targetDurationUs = mPlaylist->getTargetDuration();
244
245 int64_t minPlaylistAgeUs;
246
247 switch (mRefreshState) {
248 case INITIAL_MINIMUM_RELOAD_DELAY:
249 {
250 size_t n = mPlaylist->size();
251 if (n > 0) {
252 sp<AMessage> itemMeta;
253 CHECK(mPlaylist->itemAt(n - 1, NULL /* uri */, &itemMeta));
254
255 int64_t itemDurationUs;
256 CHECK(itemMeta->findInt64("durationUs", &itemDurationUs));
257
258 minPlaylistAgeUs = itemDurationUs;
259 break;
260 }
261
262 // fall through
263 }
264
265 case FIRST_UNCHANGED_RELOAD_ATTEMPT:
266 {
267 minPlaylistAgeUs = targetDurationUs / 2;
268 break;
269 }
270
271 case SECOND_UNCHANGED_RELOAD_ATTEMPT:
272 {
273 minPlaylistAgeUs = (targetDurationUs * 3) / 2;
274 break;
275 }
276
277 case THIRD_UNCHANGED_RELOAD_ATTEMPT:
278 {
279 minPlaylistAgeUs = targetDurationUs * 3;
280 break;
281 }
282
283 default:
284 TRESPASS();
285 break;
286 }
287
288 int64_t delayUs = mLastPlaylistFetchTimeUs + minPlaylistAgeUs - nowUs;
289 return delayUs > 0ll ? delayUs : 0ll;
290 }
291
decryptBuffer(size_t playlistIndex,const sp<ABuffer> & buffer,bool first)292 status_t PlaylistFetcher::decryptBuffer(
293 size_t playlistIndex, const sp<ABuffer> &buffer,
294 bool first) {
295 sp<AMessage> itemMeta;
296 bool found = false;
297 AString method;
298
299 for (ssize_t i = playlistIndex; i >= 0; --i) {
300 AString uri;
301 CHECK(mPlaylist->itemAt(i, &uri, &itemMeta));
302
303 if (itemMeta->findString("cipher-method", &method)) {
304 found = true;
305 break;
306 }
307 }
308
309 if (!found) {
310 method = "NONE";
311 }
312 buffer->meta()->setString("cipher-method", method.c_str());
313
314 if (method == "NONE") {
315 return OK;
316 } else if (!(method == "AES-128")) {
317 ALOGE("Unsupported cipher method '%s'", method.c_str());
318 return ERROR_UNSUPPORTED;
319 }
320
321 AString keyURI;
322 if (!itemMeta->findString("cipher-uri", &keyURI)) {
323 ALOGE("Missing key uri");
324 return ERROR_MALFORMED;
325 }
326
327 ssize_t index = mAESKeyForURI.indexOfKey(keyURI);
328
329 sp<ABuffer> key;
330 if (index >= 0) {
331 key = mAESKeyForURI.valueAt(index);
332 } else {
333 ssize_t err = mHTTPDownloader->fetchFile(keyURI.c_str(), &key);
334
335 if (err == ERROR_NOT_CONNECTED) {
336 return ERROR_NOT_CONNECTED;
337 } else if (err < 0) {
338 ALOGE("failed to fetch cipher key from '%s'.", keyURI.c_str());
339 return ERROR_IO;
340 } else if (key->size() != 16) {
341 ALOGE("key file '%s' wasn't 16 bytes in size.", keyURI.c_str());
342 return ERROR_MALFORMED;
343 }
344
345 mAESKeyForURI.add(keyURI, key);
346 }
347
348 AES_KEY aes_key;
349 if (AES_set_decrypt_key(key->data(), 128, &aes_key) != 0) {
350 ALOGE("failed to set AES decryption key.");
351 return UNKNOWN_ERROR;
352 }
353
354 size_t n = buffer->size();
355 if (!n) {
356 return OK;
357 }
358 CHECK(n % 16 == 0);
359
360 if (first) {
361 // If decrypting the first block in a file, read the iv from the manifest
362 // or derive the iv from the file's sequence number.
363
364 AString iv;
365 if (itemMeta->findString("cipher-iv", &iv)) {
366 if ((!iv.startsWith("0x") && !iv.startsWith("0X"))
367 || iv.size() != 16 * 2 + 2) {
368 ALOGE("malformed cipher IV '%s'.", iv.c_str());
369 return ERROR_MALFORMED;
370 }
371
372 memset(mAESInitVec, 0, sizeof(mAESInitVec));
373 for (size_t i = 0; i < 16; ++i) {
374 char c1 = tolower(iv.c_str()[2 + 2 * i]);
375 char c2 = tolower(iv.c_str()[3 + 2 * i]);
376 if (!isxdigit(c1) || !isxdigit(c2)) {
377 ALOGE("malformed cipher IV '%s'.", iv.c_str());
378 return ERROR_MALFORMED;
379 }
380 uint8_t nibble1 = isdigit(c1) ? c1 - '0' : c1 - 'a' + 10;
381 uint8_t nibble2 = isdigit(c2) ? c2 - '0' : c2 - 'a' + 10;
382
383 mAESInitVec[i] = nibble1 << 4 | nibble2;
384 }
385 } else {
386 memset(mAESInitVec, 0, sizeof(mAESInitVec));
387 mAESInitVec[15] = mSeqNumber & 0xff;
388 mAESInitVec[14] = (mSeqNumber >> 8) & 0xff;
389 mAESInitVec[13] = (mSeqNumber >> 16) & 0xff;
390 mAESInitVec[12] = (mSeqNumber >> 24) & 0xff;
391 }
392 }
393
394 AES_cbc_encrypt(
395 buffer->data(), buffer->data(), buffer->size(),
396 &aes_key, mAESInitVec, AES_DECRYPT);
397
398 return OK;
399 }
400
checkDecryptPadding(const sp<ABuffer> & buffer)401 status_t PlaylistFetcher::checkDecryptPadding(const sp<ABuffer> &buffer) {
402 AString method;
403 CHECK(buffer->meta()->findString("cipher-method", &method));
404 if (method == "NONE") {
405 return OK;
406 }
407
408 uint8_t padding = 0;
409 if (buffer->size() > 0) {
410 padding = buffer->data()[buffer->size() - 1];
411 }
412
413 if (padding > 16) {
414 return ERROR_MALFORMED;
415 }
416
417 for (size_t i = buffer->size() - padding; i < padding; i++) {
418 if (buffer->data()[i] != padding) {
419 return ERROR_MALFORMED;
420 }
421 }
422
423 buffer->setRange(buffer->offset(), buffer->size() - padding);
424 return OK;
425 }
426
postMonitorQueue(int64_t delayUs,int64_t minDelayUs)427 void PlaylistFetcher::postMonitorQueue(int64_t delayUs, int64_t minDelayUs) {
428 int64_t maxDelayUs = delayUsToRefreshPlaylist();
429 if (maxDelayUs < minDelayUs) {
430 maxDelayUs = minDelayUs;
431 }
432 if (delayUs > maxDelayUs) {
433 FLOGV("Need to refresh playlist in %lld", (long long)maxDelayUs);
434 delayUs = maxDelayUs;
435 }
436 sp<AMessage> msg = new AMessage(kWhatMonitorQueue, this);
437 msg->setInt32("generation", mMonitorQueueGeneration);
438 msg->post(delayUs);
439 }
440
cancelMonitorQueue()441 void PlaylistFetcher::cancelMonitorQueue() {
442 ++mMonitorQueueGeneration;
443 }
444
setStoppingThreshold(float thresholdRatio,bool disconnect)445 void PlaylistFetcher::setStoppingThreshold(float thresholdRatio, bool disconnect) {
446 {
447 AutoMutex _l(mThresholdLock);
448 mThresholdRatio = thresholdRatio;
449 }
450 if (disconnect) {
451 mHTTPDownloader->disconnect();
452 }
453 }
454
resetStoppingThreshold(bool disconnect)455 void PlaylistFetcher::resetStoppingThreshold(bool disconnect) {
456 {
457 AutoMutex _l(mThresholdLock);
458 mThresholdRatio = -1.0f;
459 }
460 if (disconnect) {
461 mHTTPDownloader->disconnect();
462 } else {
463 // allow reconnect
464 mHTTPDownloader->reconnect();
465 }
466 }
467
getStoppingThreshold()468 float PlaylistFetcher::getStoppingThreshold() {
469 AutoMutex _l(mThresholdLock);
470 return mThresholdRatio;
471 }
472
startAsync(const sp<AnotherPacketSource> & audioSource,const sp<AnotherPacketSource> & videoSource,const sp<AnotherPacketSource> & subtitleSource,const sp<AnotherPacketSource> & metadataSource,int64_t startTimeUs,int64_t segmentStartTimeUs,int32_t startDiscontinuitySeq,LiveSession::SeekMode seekMode)473 void PlaylistFetcher::startAsync(
474 const sp<AnotherPacketSource> &audioSource,
475 const sp<AnotherPacketSource> &videoSource,
476 const sp<AnotherPacketSource> &subtitleSource,
477 const sp<AnotherPacketSource> &metadataSource,
478 int64_t startTimeUs,
479 int64_t segmentStartTimeUs,
480 int32_t startDiscontinuitySeq,
481 LiveSession::SeekMode seekMode) {
482 sp<AMessage> msg = new AMessage(kWhatStart, this);
483
484 uint32_t streamTypeMask = 0ul;
485
486 if (audioSource != NULL) {
487 msg->setPointer("audioSource", audioSource.get());
488 streamTypeMask |= LiveSession::STREAMTYPE_AUDIO;
489 }
490
491 if (videoSource != NULL) {
492 msg->setPointer("videoSource", videoSource.get());
493 streamTypeMask |= LiveSession::STREAMTYPE_VIDEO;
494 }
495
496 if (subtitleSource != NULL) {
497 msg->setPointer("subtitleSource", subtitleSource.get());
498 streamTypeMask |= LiveSession::STREAMTYPE_SUBTITLES;
499 }
500
501 if (metadataSource != NULL) {
502 msg->setPointer("metadataSource", metadataSource.get());
503 // metadataSource does not affect streamTypeMask.
504 }
505
506 msg->setInt32("streamTypeMask", streamTypeMask);
507 msg->setInt64("startTimeUs", startTimeUs);
508 msg->setInt64("segmentStartTimeUs", segmentStartTimeUs);
509 msg->setInt32("startDiscontinuitySeq", startDiscontinuitySeq);
510 msg->setInt32("seekMode", seekMode);
511 msg->post();
512 }
513
514 /*
515 * pauseAsync
516 *
517 * threshold: 0.0f - pause after current fetch block (default 47Kbytes)
518 * -1.0f - pause after finishing current segment
519 * 0.0~1.0f - pause if remaining of current segment exceeds threshold
520 */
pauseAsync(float thresholdRatio,bool disconnect)521 void PlaylistFetcher::pauseAsync(
522 float thresholdRatio, bool disconnect) {
523 setStoppingThreshold(thresholdRatio, disconnect);
524
525 (new AMessage(kWhatPause, this))->post();
526 }
527
stopAsync(bool clear)528 void PlaylistFetcher::stopAsync(bool clear) {
529 setStoppingThreshold(0.0f, true /* disconncect */);
530
531 sp<AMessage> msg = new AMessage(kWhatStop, this);
532 msg->setInt32("clear", clear);
533 msg->post();
534 }
535
resumeUntilAsync(const sp<AMessage> & params)536 void PlaylistFetcher::resumeUntilAsync(const sp<AMessage> ¶ms) {
537 FLOGV("resumeUntilAsync: params=%s", params->debugString().c_str());
538
539 AMessage* msg = new AMessage(kWhatResumeUntil, this);
540 msg->setMessage("params", params);
541 msg->post();
542 }
543
fetchPlaylistAsync()544 void PlaylistFetcher::fetchPlaylistAsync() {
545 (new AMessage(kWhatFetchPlaylist, this))->post();
546 }
547
onMessageReceived(const sp<AMessage> & msg)548 void PlaylistFetcher::onMessageReceived(const sp<AMessage> &msg) {
549 switch (msg->what()) {
550 case kWhatStart:
551 {
552 status_t err = onStart(msg);
553
554 sp<AMessage> notify = mNotify->dup();
555 notify->setInt32("what", kWhatStarted);
556 notify->setInt32("err", err);
557 notify->post();
558 break;
559 }
560
561 case kWhatPause:
562 {
563 onPause();
564
565 sp<AMessage> notify = mNotify->dup();
566 notify->setInt32("what", kWhatPaused);
567 notify->setInt32("seekMode",
568 mDownloadState->hasSavedState()
569 ? LiveSession::kSeekModeNextSample
570 : LiveSession::kSeekModeNextSegment);
571 notify->post();
572 break;
573 }
574
575 case kWhatStop:
576 {
577 onStop(msg);
578
579 sp<AMessage> notify = mNotify->dup();
580 notify->setInt32("what", kWhatStopped);
581 notify->post();
582 break;
583 }
584
585 case kWhatFetchPlaylist:
586 {
587 bool unchanged;
588 sp<M3UParser> playlist = mHTTPDownloader->fetchPlaylist(
589 mURI.c_str(), NULL /* curPlaylistHash */, &unchanged);
590
591 sp<AMessage> notify = mNotify->dup();
592 notify->setInt32("what", kWhatPlaylistFetched);
593 notify->setObject("playlist", playlist);
594 notify->post();
595 break;
596 }
597
598 case kWhatMonitorQueue:
599 case kWhatDownloadNext:
600 {
601 int32_t generation;
602 CHECK(msg->findInt32("generation", &generation));
603
604 if (generation != mMonitorQueueGeneration) {
605 // Stale event
606 break;
607 }
608
609 if (msg->what() == kWhatMonitorQueue) {
610 onMonitorQueue();
611 } else {
612 onDownloadNext();
613 }
614 break;
615 }
616
617 case kWhatResumeUntil:
618 {
619 onResumeUntil(msg);
620 break;
621 }
622
623 default:
624 TRESPASS();
625 }
626 }
627
onStart(const sp<AMessage> & msg)628 status_t PlaylistFetcher::onStart(const sp<AMessage> &msg) {
629 mPacketSources.clear();
630 mStopParams.clear();
631 mStartTimeUsNotify = mNotify->dup();
632 mStartTimeUsNotify->setInt32("what", kWhatStartedAt);
633 mStartTimeUsNotify->setString("uri", mURI);
634
635 uint32_t streamTypeMask;
636 CHECK(msg->findInt32("streamTypeMask", (int32_t *)&streamTypeMask));
637
638 int64_t startTimeUs;
639 int64_t segmentStartTimeUs;
640 int32_t startDiscontinuitySeq;
641 int32_t seekMode;
642 CHECK(msg->findInt64("startTimeUs", &startTimeUs));
643 CHECK(msg->findInt64("segmentStartTimeUs", &segmentStartTimeUs));
644 CHECK(msg->findInt32("startDiscontinuitySeq", &startDiscontinuitySeq));
645 CHECK(msg->findInt32("seekMode", &seekMode));
646
647 if (streamTypeMask & LiveSession::STREAMTYPE_AUDIO) {
648 void *ptr;
649 CHECK(msg->findPointer("audioSource", &ptr));
650
651 mPacketSources.add(
652 LiveSession::STREAMTYPE_AUDIO,
653 static_cast<AnotherPacketSource *>(ptr));
654 }
655
656 if (streamTypeMask & LiveSession::STREAMTYPE_VIDEO) {
657 void *ptr;
658 CHECK(msg->findPointer("videoSource", &ptr));
659
660 mPacketSources.add(
661 LiveSession::STREAMTYPE_VIDEO,
662 static_cast<AnotherPacketSource *>(ptr));
663 }
664
665 if (streamTypeMask & LiveSession::STREAMTYPE_SUBTITLES) {
666 void *ptr;
667 CHECK(msg->findPointer("subtitleSource", &ptr));
668
669 mPacketSources.add(
670 LiveSession::STREAMTYPE_SUBTITLES,
671 static_cast<AnotherPacketSource *>(ptr));
672 }
673
674 void *ptr;
675 // metadataSource is not part of streamTypeMask
676 if ((streamTypeMask & (LiveSession::STREAMTYPE_AUDIO | LiveSession::STREAMTYPE_VIDEO))
677 && msg->findPointer("metadataSource", &ptr)) {
678 mPacketSources.add(
679 LiveSession::STREAMTYPE_METADATA,
680 static_cast<AnotherPacketSource *>(ptr));
681 }
682
683 mStreamTypeMask = streamTypeMask;
684
685 mSegmentStartTimeUs = segmentStartTimeUs;
686
687 if (startDiscontinuitySeq >= 0) {
688 mDiscontinuitySeq = startDiscontinuitySeq;
689 }
690
691 mRefreshState = INITIAL_MINIMUM_RELOAD_DELAY;
692 mSeekMode = (LiveSession::SeekMode) seekMode;
693
694 if (startTimeUs >= 0 || mSeekMode == LiveSession::kSeekModeNextSample) {
695 mStartup = true;
696 mIDRFound = false;
697 mVideoBuffer->clear();
698 }
699
700 if (startTimeUs >= 0) {
701 mStartTimeUs = startTimeUs;
702 mFirstPTSValid = false;
703 mSeqNumber = -1;
704 mTimeChangeSignaled = false;
705 mDownloadState->resetState();
706 }
707
708 postMonitorQueue();
709
710 return OK;
711 }
712
onPause()713 void PlaylistFetcher::onPause() {
714 cancelMonitorQueue();
715 mLastDiscontinuitySeq = mDiscontinuitySeq;
716
717 resetStoppingThreshold(false /* disconnect */);
718 }
719
onStop(const sp<AMessage> & msg)720 void PlaylistFetcher::onStop(const sp<AMessage> &msg) {
721 cancelMonitorQueue();
722
723 int32_t clear;
724 CHECK(msg->findInt32("clear", &clear));
725 if (clear) {
726 for (size_t i = 0; i < mPacketSources.size(); i++) {
727 sp<AnotherPacketSource> packetSource = mPacketSources.valueAt(i);
728 packetSource->clear();
729 }
730 }
731
732 mDownloadState->resetState();
733 mPacketSources.clear();
734 mStreamTypeMask = 0;
735
736 resetStoppingThreshold(true /* disconnect */);
737 }
738
739 // Resume until we have reached the boundary timestamps listed in `msg`; when
740 // the remaining time is too short (within a resume threshold) stop immediately
741 // instead.
onResumeUntil(const sp<AMessage> & msg)742 status_t PlaylistFetcher::onResumeUntil(const sp<AMessage> &msg) {
743 sp<AMessage> params;
744 CHECK(msg->findMessage("params", ¶ms));
745
746 mStopParams = params;
747 onDownloadNext();
748
749 return OK;
750 }
751
notifyStopReached()752 void PlaylistFetcher::notifyStopReached() {
753 sp<AMessage> notify = mNotify->dup();
754 notify->setInt32("what", kWhatStopReached);
755 notify->post();
756 }
757
notifyError(status_t err)758 void PlaylistFetcher::notifyError(status_t err) {
759 sp<AMessage> notify = mNotify->dup();
760 notify->setInt32("what", kWhatError);
761 notify->setInt32("err", err);
762 notify->post();
763 }
764
queueDiscontinuity(ATSParser::DiscontinuityType type,const sp<AMessage> & extra)765 void PlaylistFetcher::queueDiscontinuity(
766 ATSParser::DiscontinuityType type, const sp<AMessage> &extra) {
767 for (size_t i = 0; i < mPacketSources.size(); ++i) {
768 // do not discard buffer upon #EXT-X-DISCONTINUITY tag
769 // (seek will discard buffer by abandoning old fetchers)
770 mPacketSources.valueAt(i)->queueDiscontinuity(
771 type, extra, false /* discard */);
772 }
773 }
774
onMonitorQueue()775 void PlaylistFetcher::onMonitorQueue() {
776 // in the middle of an unfinished download, delay
777 // playlist refresh as it'll change seq numbers
778 if (!mDownloadState->hasSavedState()) {
779 refreshPlaylist();
780 }
781
782 int64_t targetDurationUs = kMinBufferedDurationUs;
783 if (mPlaylist != NULL) {
784 targetDurationUs = mPlaylist->getTargetDuration();
785 }
786
787 int64_t bufferedDurationUs = 0ll;
788 status_t finalResult = OK;
789 if (mStreamTypeMask == LiveSession::STREAMTYPE_SUBTITLES) {
790 sp<AnotherPacketSource> packetSource =
791 mPacketSources.valueFor(LiveSession::STREAMTYPE_SUBTITLES);
792
793 bufferedDurationUs =
794 packetSource->getBufferedDurationUs(&finalResult);
795 } else {
796 // Use min stream duration, but ignore streams that never have any packet
797 // enqueued to prevent us from waiting on a non-existent stream;
798 // when we cannot make out from the manifest what streams are included in
799 // a playlist we might assume extra streams.
800 bufferedDurationUs = -1ll;
801 for (size_t i = 0; i < mPacketSources.size(); ++i) {
802 if ((mStreamTypeMask & mPacketSources.keyAt(i)) == 0
803 || mPacketSources[i]->getLatestEnqueuedMeta() == NULL) {
804 continue;
805 }
806
807 int64_t bufferedStreamDurationUs =
808 mPacketSources.valueAt(i)->getBufferedDurationUs(&finalResult);
809
810 FSLOGV(mPacketSources.keyAt(i), "buffered %lld", (long long)bufferedStreamDurationUs);
811
812 if (bufferedDurationUs == -1ll
813 || bufferedStreamDurationUs < bufferedDurationUs) {
814 bufferedDurationUs = bufferedStreamDurationUs;
815 }
816 }
817 if (bufferedDurationUs == -1ll) {
818 bufferedDurationUs = 0ll;
819 }
820 }
821
822 if (finalResult == OK && bufferedDurationUs < kMinBufferedDurationUs) {
823 FLOGV("monitoring, buffered=%lld < %lld",
824 (long long)bufferedDurationUs, (long long)kMinBufferedDurationUs);
825
826 // delay the next download slightly; hopefully this gives other concurrent fetchers
827 // a better chance to run.
828 // onDownloadNext();
829 sp<AMessage> msg = new AMessage(kWhatDownloadNext, this);
830 msg->setInt32("generation", mMonitorQueueGeneration);
831 msg->post(1000l);
832 } else {
833 // We'd like to maintain buffering above durationToBufferUs, so try
834 // again when buffer just about to go below durationToBufferUs
835 // (or after targetDurationUs / 2, whichever is smaller).
836 int64_t delayUs = bufferedDurationUs - kMinBufferedDurationUs + 1000000ll;
837 if (delayUs > targetDurationUs / 2) {
838 delayUs = targetDurationUs / 2;
839 }
840
841 FLOGV("pausing for %lld, buffered=%lld > %lld",
842 (long long)delayUs,
843 (long long)bufferedDurationUs,
844 (long long)kMinBufferedDurationUs);
845
846 postMonitorQueue(delayUs);
847 }
848 }
849
refreshPlaylist()850 status_t PlaylistFetcher::refreshPlaylist() {
851 if (delayUsToRefreshPlaylist() <= 0) {
852 bool unchanged;
853 sp<M3UParser> playlist = mHTTPDownloader->fetchPlaylist(
854 mURI.c_str(), mPlaylistHash, &unchanged);
855
856 if (playlist == NULL) {
857 if (unchanged) {
858 // We succeeded in fetching the playlist, but it was
859 // unchanged from the last time we tried.
860
861 if (mRefreshState != THIRD_UNCHANGED_RELOAD_ATTEMPT) {
862 mRefreshState = (RefreshState)(mRefreshState + 1);
863 }
864 } else {
865 ALOGE("failed to load playlist at url '%s'", uriDebugString(mURI).c_str());
866 return ERROR_IO;
867 }
868 } else {
869 mRefreshState = INITIAL_MINIMUM_RELOAD_DELAY;
870 mPlaylist = playlist;
871
872 if (mPlaylist->isComplete() || mPlaylist->isEvent()) {
873 updateDuration();
874 }
875 // Notify LiveSession to use target-duration based buffering level
876 // for up/down switch. Default LiveSession::kUpSwitchMark may not
877 // be reachable for live streams, as our max buffering amount is
878 // limited to 3 segments.
879 if (!mPlaylist->isComplete()) {
880 updateTargetDuration();
881 }
882 mPlaylistTimeUs = ALooper::GetNowUs();
883 }
884
885 mLastPlaylistFetchTimeUs = ALooper::GetNowUs();
886 }
887 return OK;
888 }
889
890 // static
bufferStartsWithTsSyncByte(const sp<ABuffer> & buffer)891 bool PlaylistFetcher::bufferStartsWithTsSyncByte(const sp<ABuffer>& buffer) {
892 return buffer->size() > 0 && buffer->data()[0] == 0x47;
893 }
894
shouldPauseDownload()895 bool PlaylistFetcher::shouldPauseDownload() {
896 if (mStreamTypeMask == LiveSession::STREAMTYPE_SUBTITLES) {
897 // doesn't apply to subtitles
898 return false;
899 }
900
901 // Calculate threshold to abort current download
902 float thresholdRatio = getStoppingThreshold();
903
904 if (thresholdRatio < 0.0f) {
905 // never abort
906 return false;
907 } else if (thresholdRatio == 0.0f) {
908 // immediately abort
909 return true;
910 }
911
912 // now we have a positive thresholdUs, abort if remaining
913 // portion to download is over that threshold.
914 if (mSegmentFirstPTS < 0) {
915 // this means we haven't even find the first access unit,
916 // abort now as we must be very far away from the end.
917 return true;
918 }
919 int64_t lastEnqueueUs = mSegmentFirstPTS;
920 for (size_t i = 0; i < mPacketSources.size(); ++i) {
921 if ((mStreamTypeMask & mPacketSources.keyAt(i)) == 0) {
922 continue;
923 }
924 sp<AMessage> meta = mPacketSources[i]->getLatestEnqueuedMeta();
925 int32_t type;
926 if (meta == NULL || meta->findInt32("discontinuity", &type)) {
927 continue;
928 }
929 int64_t tmpUs;
930 CHECK(meta->findInt64("timeUs", &tmpUs));
931 if (tmpUs > lastEnqueueUs) {
932 lastEnqueueUs = tmpUs;
933 }
934 }
935 lastEnqueueUs -= mSegmentFirstPTS;
936
937 int64_t targetDurationUs = mPlaylist->getTargetDuration();
938 int64_t thresholdUs = thresholdRatio * targetDurationUs;
939
940 FLOGV("%spausing now, thresholdUs %lld, remaining %lld",
941 targetDurationUs - lastEnqueueUs > thresholdUs ? "" : "not ",
942 (long long)thresholdUs,
943 (long long)(targetDurationUs - lastEnqueueUs));
944
945 if (targetDurationUs - lastEnqueueUs > thresholdUs) {
946 return true;
947 }
948 return false;
949 }
950
initDownloadState(AString & uri,sp<AMessage> & itemMeta,int32_t & firstSeqNumberInPlaylist,int32_t & lastSeqNumberInPlaylist)951 bool PlaylistFetcher::initDownloadState(
952 AString &uri,
953 sp<AMessage> &itemMeta,
954 int32_t &firstSeqNumberInPlaylist,
955 int32_t &lastSeqNumberInPlaylist) {
956 status_t err = refreshPlaylist();
957 firstSeqNumberInPlaylist = 0;
958 lastSeqNumberInPlaylist = 0;
959 bool discontinuity = false;
960
961 if (mPlaylist != NULL) {
962 mPlaylist->getSeqNumberRange(
963 &firstSeqNumberInPlaylist, &lastSeqNumberInPlaylist);
964
965 if (mDiscontinuitySeq < 0) {
966 mDiscontinuitySeq = mPlaylist->getDiscontinuitySeq();
967 }
968 }
969
970 mSegmentFirstPTS = -1ll;
971
972 if (mPlaylist != NULL && mSeqNumber < 0) {
973 CHECK_GE(mStartTimeUs, 0ll);
974
975 if (mSegmentStartTimeUs < 0) {
976 if (!mPlaylist->isComplete() && !mPlaylist->isEvent()) {
977 // If this is a live session, start 3 segments from the end on connect
978 mSeqNumber = lastSeqNumberInPlaylist - 3;
979 if (mSeqNumber < firstSeqNumberInPlaylist) {
980 mSeqNumber = firstSeqNumberInPlaylist;
981 }
982 } else {
983 // When seeking mSegmentStartTimeUs is unavailable (< 0), we
984 // use mStartTimeUs (client supplied timestamp) to determine both start segment
985 // and relative position inside a segment
986 mSeqNumber = getSeqNumberForTime(mStartTimeUs);
987 mStartTimeUs -= getSegmentStartTimeUs(mSeqNumber);
988 }
989 mStartTimeUsRelative = true;
990 FLOGV("Initial sequence number for time %lld is %d from (%d .. %d)",
991 (long long)mStartTimeUs, mSeqNumber, firstSeqNumberInPlaylist,
992 lastSeqNumberInPlaylist);
993 } else {
994 // When adapting or track switching, mSegmentStartTimeUs (relative
995 // to media time 0) is used to determine the start segment; mStartTimeUs (absolute
996 // timestamps coming from the media container) is used to determine the position
997 // inside a segments.
998 if (mStreamTypeMask != LiveSession::STREAMTYPE_SUBTITLES
999 && mSeekMode != LiveSession::kSeekModeNextSample) {
1000 // avoid double fetch/decode
1001 // Use (mSegmentStartTimeUs + 1/2 * targetDurationUs) to search
1002 // for the starting segment in new variant.
1003 // If the two variants' segments are aligned, this gives the
1004 // next segment. If they're not aligned, this gives the segment
1005 // that overlaps no more than 1/2 * targetDurationUs.
1006 mSeqNumber = getSeqNumberForTime(mSegmentStartTimeUs
1007 + mPlaylist->getTargetDuration() / 2);
1008 } else {
1009 mSeqNumber = getSeqNumberForTime(mSegmentStartTimeUs);
1010 }
1011 ssize_t minSeq = getSeqNumberForDiscontinuity(mDiscontinuitySeq);
1012 if (mSeqNumber < minSeq) {
1013 mSeqNumber = minSeq;
1014 }
1015
1016 if (mSeqNumber < firstSeqNumberInPlaylist) {
1017 mSeqNumber = firstSeqNumberInPlaylist;
1018 }
1019
1020 if (mSeqNumber > lastSeqNumberInPlaylist) {
1021 mSeqNumber = lastSeqNumberInPlaylist;
1022 }
1023 FLOGV("Initial sequence number is %d from (%d .. %d)",
1024 mSeqNumber, firstSeqNumberInPlaylist,
1025 lastSeqNumberInPlaylist);
1026 }
1027 }
1028
1029 // if mPlaylist is NULL then err must be non-OK; but the other way around might not be true
1030 if (mSeqNumber < firstSeqNumberInPlaylist
1031 || mSeqNumber > lastSeqNumberInPlaylist
1032 || err != OK) {
1033 if ((err != OK || !mPlaylist->isComplete()) && mNumRetries < kMaxNumRetries) {
1034 ++mNumRetries;
1035
1036 if (mSeqNumber > lastSeqNumberInPlaylist || err != OK) {
1037 // make sure we reach this retry logic on refresh failures
1038 // by adding an err != OK clause to all enclosing if's.
1039
1040 // refresh in increasing fraction (1/2, 1/3, ...) of the
1041 // playlist's target duration or 3 seconds, whichever is less
1042 int64_t delayUs = kMaxMonitorDelayUs;
1043 if (mPlaylist != NULL) {
1044 delayUs = mPlaylist->size() * mPlaylist->getTargetDuration()
1045 / (1 + mNumRetries);
1046 }
1047 if (delayUs > kMaxMonitorDelayUs) {
1048 delayUs = kMaxMonitorDelayUs;
1049 }
1050 FLOGV("sequence number high: %d from (%d .. %d), "
1051 "monitor in %lld (retry=%d)",
1052 mSeqNumber, firstSeqNumberInPlaylist,
1053 lastSeqNumberInPlaylist, (long long)delayUs, mNumRetries);
1054 postMonitorQueue(delayUs);
1055 return false;
1056 }
1057
1058 if (err != OK) {
1059 notifyError(err);
1060 return false;
1061 }
1062
1063 // we've missed the boat, let's start 3 segments prior to the latest sequence
1064 // number available and signal a discontinuity.
1065
1066 ALOGI("We've missed the boat, restarting playback."
1067 " mStartup=%d, was looking for %d in %d-%d",
1068 mStartup, mSeqNumber, firstSeqNumberInPlaylist,
1069 lastSeqNumberInPlaylist);
1070 if (mStopParams != NULL) {
1071 // we should have kept on fetching until we hit the boundaries in mStopParams,
1072 // but since the segments we are supposed to fetch have already rolled off
1073 // the playlist, i.e. we have already missed the boat, we inevitably have to
1074 // skip.
1075 notifyStopReached();
1076 return false;
1077 }
1078 mSeqNumber = lastSeqNumberInPlaylist - 3;
1079 if (mSeqNumber < firstSeqNumberInPlaylist) {
1080 mSeqNumber = firstSeqNumberInPlaylist;
1081 }
1082 discontinuity = true;
1083
1084 // fall through
1085 } else {
1086 if (mPlaylist != NULL) {
1087 ALOGE("Cannot find sequence number %d in playlist "
1088 "(contains %d - %d)",
1089 mSeqNumber, firstSeqNumberInPlaylist,
1090 firstSeqNumberInPlaylist + (int32_t)mPlaylist->size() - 1);
1091
1092 if (mTSParser != NULL) {
1093 mTSParser->signalEOS(ERROR_END_OF_STREAM);
1094 // Use an empty buffer; we don't have any new data, just want to extract
1095 // potential new access units after flush. Reset mSeqNumber to
1096 // lastSeqNumberInPlaylist such that we set the correct access unit
1097 // properties in extractAndQueueAccessUnitsFromTs.
1098 sp<ABuffer> buffer = new ABuffer(0);
1099 mSeqNumber = lastSeqNumberInPlaylist;
1100 extractAndQueueAccessUnitsFromTs(buffer);
1101 }
1102 notifyError(ERROR_END_OF_STREAM);
1103 } else {
1104 // It's possible that we were never able to download the playlist.
1105 // In this case we should notify error, instead of EOS, as EOS during
1106 // prepare means we succeeded in downloading everything.
1107 ALOGE("Failed to download playlist!");
1108 notifyError(ERROR_IO);
1109 }
1110
1111 return false;
1112 }
1113 }
1114
1115 mNumRetries = 0;
1116
1117 CHECK(mPlaylist->itemAt(
1118 mSeqNumber - firstSeqNumberInPlaylist,
1119 &uri,
1120 &itemMeta));
1121
1122 CHECK(itemMeta->findInt32("discontinuity-sequence", &mDiscontinuitySeq));
1123
1124 int32_t val;
1125 if (itemMeta->findInt32("discontinuity", &val) && val != 0) {
1126 discontinuity = true;
1127 } else if (mLastDiscontinuitySeq >= 0
1128 && mDiscontinuitySeq != mLastDiscontinuitySeq) {
1129 // Seek jumped to a new discontinuity sequence. We need to signal
1130 // a format change to decoder. Decoder needs to shutdown and be
1131 // created again if seamless format change is unsupported.
1132 FLOGV("saw discontinuity: mStartup %d, mLastDiscontinuitySeq %d, "
1133 "mDiscontinuitySeq %d, mStartTimeUs %lld",
1134 mStartup, mLastDiscontinuitySeq, mDiscontinuitySeq, (long long)mStartTimeUs);
1135 discontinuity = true;
1136 }
1137 mLastDiscontinuitySeq = -1;
1138
1139 // decrypt a junk buffer to prefetch key; since a session uses only one http connection,
1140 // this avoids interleaved connections to the key and segment file.
1141 {
1142 sp<ABuffer> junk = new ABuffer(16);
1143 junk->setRange(0, 16);
1144 status_t err = decryptBuffer(mSeqNumber - firstSeqNumberInPlaylist, junk,
1145 true /* first */);
1146 if (err == ERROR_NOT_CONNECTED) {
1147 return false;
1148 } else if (err != OK) {
1149 notifyError(err);
1150 return false;
1151 }
1152 }
1153
1154 if ((mStartup && !mTimeChangeSignaled) || discontinuity) {
1155 // We need to signal a time discontinuity to ATSParser on the
1156 // first segment after start, or on a discontinuity segment.
1157 // Setting mNextPTSTimeUs informs extractAndQueueAccessUnitsXX()
1158 // to send the time discontinuity.
1159 if (mPlaylist->isComplete() || mPlaylist->isEvent()) {
1160 // If this was a live event this made no sense since
1161 // we don't have access to all the segment before the current
1162 // one.
1163 mNextPTSTimeUs = getSegmentStartTimeUs(mSeqNumber);
1164 }
1165
1166 // Setting mTimeChangeSignaled to true, so that if start time
1167 // searching goes into 2nd segment (without a discontinuity),
1168 // we don't reset time again. It causes corruption when pending
1169 // data in ATSParser is cleared.
1170 mTimeChangeSignaled = true;
1171 }
1172
1173 if (discontinuity) {
1174 ALOGI("queueing discontinuity (explicit=%d)", discontinuity);
1175
1176 // Signal a format discontinuity to ATSParser to clear partial data
1177 // from previous streams. Not doing this causes bitstream corruption.
1178 if (mTSParser != NULL) {
1179 mTSParser->signalDiscontinuity(
1180 ATSParser::DISCONTINUITY_FORMATCHANGE, NULL /* extra */);
1181 }
1182
1183 queueDiscontinuity(
1184 ATSParser::DISCONTINUITY_FORMAT_ONLY,
1185 NULL /* extra */);
1186
1187 if (mStartup && mStartTimeUsRelative && mFirstPTSValid) {
1188 // This means we guessed mStartTimeUs to be in the previous
1189 // segment (likely very close to the end), but either video or
1190 // audio has not found start by the end of that segment.
1191 //
1192 // If this new segment is not a discontinuity, keep searching.
1193 //
1194 // If this new segment even got a discontinuity marker, just
1195 // set mStartTimeUs=0, and take all samples from now on.
1196 mStartTimeUs = 0;
1197 mFirstPTSValid = false;
1198 mIDRFound = false;
1199 mVideoBuffer->clear();
1200 }
1201 }
1202
1203 FLOGV("fetching segment %d from (%d .. %d)",
1204 mSeqNumber, firstSeqNumberInPlaylist, lastSeqNumberInPlaylist);
1205 return true;
1206 }
1207
onDownloadNext()1208 void PlaylistFetcher::onDownloadNext() {
1209 AString uri;
1210 sp<AMessage> itemMeta;
1211 sp<ABuffer> buffer;
1212 sp<ABuffer> tsBuffer;
1213 int32_t firstSeqNumberInPlaylist = 0;
1214 int32_t lastSeqNumberInPlaylist = 0;
1215 bool connectHTTP = true;
1216
1217 if (mDownloadState->hasSavedState()) {
1218 mDownloadState->restoreState(
1219 uri,
1220 itemMeta,
1221 buffer,
1222 tsBuffer,
1223 firstSeqNumberInPlaylist,
1224 lastSeqNumberInPlaylist);
1225 connectHTTP = false;
1226 FLOGV("resuming: '%s'", uri.c_str());
1227 } else {
1228 if (!initDownloadState(
1229 uri,
1230 itemMeta,
1231 firstSeqNumberInPlaylist,
1232 lastSeqNumberInPlaylist)) {
1233 return;
1234 }
1235 FLOGV("fetching: '%s'", uri.c_str());
1236 }
1237
1238 int64_t range_offset, range_length;
1239 if (!itemMeta->findInt64("range-offset", &range_offset)
1240 || !itemMeta->findInt64("range-length", &range_length)) {
1241 range_offset = 0;
1242 range_length = -1;
1243 }
1244
1245 // block-wise download
1246 bool shouldPause = false;
1247 ssize_t bytesRead;
1248 do {
1249 int64_t startUs = ALooper::GetNowUs();
1250 bytesRead = mHTTPDownloader->fetchBlock(
1251 uri.c_str(), &buffer, range_offset, range_length, kDownloadBlockSize,
1252 NULL /* actualURL */, connectHTTP);
1253 int64_t delayUs = ALooper::GetNowUs() - startUs;
1254
1255 if (bytesRead == ERROR_NOT_CONNECTED) {
1256 return;
1257 }
1258 if (bytesRead < 0) {
1259 status_t err = bytesRead;
1260 ALOGE("failed to fetch .ts segment at url '%s'", uri.c_str());
1261 notifyError(err);
1262 return;
1263 }
1264
1265 // add sample for bandwidth estimation, excluding samples from subtitles (as
1266 // its too small), or during startup/resumeUntil (when we could have more than
1267 // one connection open which affects bandwidth)
1268 if (!mStartup && mStopParams == NULL && bytesRead > 0
1269 && (mStreamTypeMask
1270 & (LiveSession::STREAMTYPE_AUDIO
1271 | LiveSession::STREAMTYPE_VIDEO))) {
1272 mSession->addBandwidthMeasurement(bytesRead, delayUs);
1273 if (delayUs > 2000000ll) {
1274 FLOGV("bytesRead %zd took %.2f seconds - abnormal bandwidth dip",
1275 bytesRead, (double)delayUs / 1.0e6);
1276 }
1277 }
1278
1279 connectHTTP = false;
1280
1281 CHECK(buffer != NULL);
1282
1283 size_t size = buffer->size();
1284 // Set decryption range.
1285 buffer->setRange(size - bytesRead, bytesRead);
1286 status_t err = decryptBuffer(mSeqNumber - firstSeqNumberInPlaylist, buffer,
1287 buffer->offset() == 0 /* first */);
1288 // Unset decryption range.
1289 buffer->setRange(0, size);
1290
1291 if (err != OK) {
1292 ALOGE("decryptBuffer failed w/ error %d", err);
1293
1294 notifyError(err);
1295 return;
1296 }
1297
1298 bool startUp = mStartup; // save current start up state
1299
1300 err = OK;
1301 if (bufferStartsWithTsSyncByte(buffer)) {
1302 // Incremental extraction is only supported for MPEG2 transport streams.
1303 if (tsBuffer == NULL) {
1304 tsBuffer = new ABuffer(buffer->data(), buffer->capacity());
1305 tsBuffer->setRange(0, 0);
1306 } else if (tsBuffer->capacity() != buffer->capacity()) {
1307 size_t tsOff = tsBuffer->offset(), tsSize = tsBuffer->size();
1308 tsBuffer = new ABuffer(buffer->data(), buffer->capacity());
1309 tsBuffer->setRange(tsOff, tsSize);
1310 }
1311 tsBuffer->setRange(tsBuffer->offset(), tsBuffer->size() + bytesRead);
1312 err = extractAndQueueAccessUnitsFromTs(tsBuffer);
1313 }
1314
1315 if (err == -EAGAIN) {
1316 // starting sequence number too low/high
1317 mTSParser.clear();
1318 for (size_t i = 0; i < mPacketSources.size(); i++) {
1319 sp<AnotherPacketSource> packetSource = mPacketSources.valueAt(i);
1320 packetSource->clear();
1321 }
1322 postMonitorQueue();
1323 return;
1324 } else if (err == ERROR_OUT_OF_RANGE) {
1325 // reached stopping point
1326 notifyStopReached();
1327 return;
1328 } else if (err != OK) {
1329 notifyError(err);
1330 return;
1331 }
1332 // If we're switching, post start notification
1333 // this should only be posted when the last chunk is full processed by TSParser
1334 if (mSeekMode != LiveSession::kSeekModeExactPosition && startUp != mStartup) {
1335 CHECK(mStartTimeUsNotify != NULL);
1336 mStartTimeUsNotify->post();
1337 mStartTimeUsNotify.clear();
1338 shouldPause = true;
1339 }
1340 if (shouldPause || shouldPauseDownload()) {
1341 // save state and return if this is not the last chunk,
1342 // leaving the fetcher in paused state.
1343 if (bytesRead != 0) {
1344 mDownloadState->saveState(
1345 uri,
1346 itemMeta,
1347 buffer,
1348 tsBuffer,
1349 firstSeqNumberInPlaylist,
1350 lastSeqNumberInPlaylist);
1351 return;
1352 }
1353 shouldPause = true;
1354 }
1355 } while (bytesRead != 0);
1356
1357 if (bufferStartsWithTsSyncByte(buffer)) {
1358 // If we don't see a stream in the program table after fetching a full ts segment
1359 // mark it as nonexistent.
1360 ATSParser::SourceType srcTypes[] =
1361 { ATSParser::VIDEO, ATSParser::AUDIO };
1362 LiveSession::StreamType streamTypes[] =
1363 { LiveSession::STREAMTYPE_VIDEO, LiveSession::STREAMTYPE_AUDIO };
1364 const size_t kNumTypes = NELEM(srcTypes);
1365
1366 for (size_t i = 0; i < kNumTypes; i++) {
1367 ATSParser::SourceType srcType = srcTypes[i];
1368 LiveSession::StreamType streamType = streamTypes[i];
1369
1370 sp<AnotherPacketSource> source =
1371 static_cast<AnotherPacketSource *>(
1372 mTSParser->getSource(srcType).get());
1373
1374 if (!mTSParser->hasSource(srcType)) {
1375 ALOGW("MPEG2 Transport stream does not contain %s data.",
1376 srcType == ATSParser::VIDEO ? "video" : "audio");
1377
1378 mStreamTypeMask &= ~streamType;
1379 mPacketSources.removeItem(streamType);
1380 }
1381 }
1382
1383 }
1384
1385 if (checkDecryptPadding(buffer) != OK) {
1386 ALOGE("Incorrect padding bytes after decryption.");
1387 notifyError(ERROR_MALFORMED);
1388 return;
1389 }
1390
1391 if (tsBuffer != NULL) {
1392 AString method;
1393 CHECK(buffer->meta()->findString("cipher-method", &method));
1394 if ((tsBuffer->size() > 0 && method == "NONE")
1395 || tsBuffer->size() > 16) {
1396 ALOGE("MPEG2 transport stream is not an even multiple of 188 "
1397 "bytes in length.");
1398 notifyError(ERROR_MALFORMED);
1399 return;
1400 }
1401 }
1402
1403 // bulk extract non-ts files
1404 bool startUp = mStartup;
1405 if (tsBuffer == NULL) {
1406 status_t err = extractAndQueueAccessUnits(buffer, itemMeta);
1407 if (err == -EAGAIN) {
1408 // starting sequence number too low/high
1409 postMonitorQueue();
1410 return;
1411 } else if (err == ERROR_OUT_OF_RANGE) {
1412 // reached stopping point
1413 notifyStopReached();
1414 return;
1415 } else if (err != OK) {
1416 notifyError(err);
1417 return;
1418 }
1419 }
1420
1421 ++mSeqNumber;
1422
1423 // if adapting, pause after found the next starting point
1424 if (mSeekMode != LiveSession::kSeekModeExactPosition && startUp != mStartup) {
1425 CHECK(mStartTimeUsNotify != NULL);
1426 mStartTimeUsNotify->post();
1427 mStartTimeUsNotify.clear();
1428 shouldPause = true;
1429 }
1430
1431 if (!shouldPause) {
1432 postMonitorQueue();
1433 }
1434 }
1435
1436 /*
1437 * returns true if we need to adjust mSeqNumber
1438 */
adjustSeqNumberWithAnchorTime(int64_t anchorTimeUs)1439 bool PlaylistFetcher::adjustSeqNumberWithAnchorTime(int64_t anchorTimeUs) {
1440 int32_t firstSeqNumberInPlaylist = mPlaylist->getFirstSeqNumber();
1441
1442 int64_t minDiffUs, maxDiffUs;
1443 if (mSeekMode == LiveSession::kSeekModeNextSample) {
1444 // if the previous fetcher paused in the middle of a segment, we
1445 // want to start at a segment that overlaps the last sample
1446 minDiffUs = -mPlaylist->getTargetDuration();
1447 maxDiffUs = 0ll;
1448 } else {
1449 // if the previous fetcher paused at the end of a segment, ideally
1450 // we want to start at the segment that's roughly aligned with its
1451 // next segment, but if the two variants are not well aligned we
1452 // adjust the diff to within (-T/2, T/2)
1453 minDiffUs = -mPlaylist->getTargetDuration() / 2;
1454 maxDiffUs = mPlaylist->getTargetDuration() / 2;
1455 }
1456
1457 int32_t oldSeqNumber = mSeqNumber;
1458 ssize_t index = mSeqNumber - firstSeqNumberInPlaylist;
1459
1460 // adjust anchorTimeUs to within (minDiffUs, maxDiffUs) from mStartTimeUs
1461 int64_t diffUs = anchorTimeUs - mStartTimeUs;
1462 if (diffUs > maxDiffUs) {
1463 while (index > 0 && diffUs > maxDiffUs) {
1464 --index;
1465
1466 sp<AMessage> itemMeta;
1467 CHECK(mPlaylist->itemAt(index, NULL /* uri */, &itemMeta));
1468
1469 int64_t itemDurationUs;
1470 CHECK(itemMeta->findInt64("durationUs", &itemDurationUs));
1471
1472 diffUs -= itemDurationUs;
1473 }
1474 } else if (diffUs < minDiffUs) {
1475 while (index + 1 < (ssize_t) mPlaylist->size()
1476 && diffUs < minDiffUs) {
1477 ++index;
1478
1479 sp<AMessage> itemMeta;
1480 CHECK(mPlaylist->itemAt(index, NULL /* uri */, &itemMeta));
1481
1482 int64_t itemDurationUs;
1483 CHECK(itemMeta->findInt64("durationUs", &itemDurationUs));
1484
1485 diffUs += itemDurationUs;
1486 }
1487 }
1488
1489 mSeqNumber = firstSeqNumberInPlaylist + index;
1490
1491 if (mSeqNumber != oldSeqNumber) {
1492 FLOGV("guessed wrong seg number: diff %lld out of [%lld, %lld]",
1493 (long long) anchorTimeUs - mStartTimeUs,
1494 (long long) minDiffUs,
1495 (long long) maxDiffUs);
1496 return true;
1497 }
1498 return false;
1499 }
1500
getSeqNumberForDiscontinuity(size_t discontinuitySeq) const1501 int32_t PlaylistFetcher::getSeqNumberForDiscontinuity(size_t discontinuitySeq) const {
1502 int32_t firstSeqNumberInPlaylist = mPlaylist->getFirstSeqNumber();
1503
1504 size_t index = 0;
1505 while (index < mPlaylist->size()) {
1506 sp<AMessage> itemMeta;
1507 CHECK(mPlaylist->itemAt( index, NULL /* uri */, &itemMeta));
1508 size_t curDiscontinuitySeq;
1509 CHECK(itemMeta->findInt32("discontinuity-sequence", (int32_t *)&curDiscontinuitySeq));
1510 int32_t seqNumber = firstSeqNumberInPlaylist + index;
1511 if (curDiscontinuitySeq == discontinuitySeq) {
1512 return seqNumber;
1513 } else if (curDiscontinuitySeq > discontinuitySeq) {
1514 return seqNumber <= 0 ? 0 : seqNumber - 1;
1515 }
1516
1517 ++index;
1518 }
1519
1520 return firstSeqNumberInPlaylist + mPlaylist->size();
1521 }
1522
getSeqNumberForTime(int64_t timeUs) const1523 int32_t PlaylistFetcher::getSeqNumberForTime(int64_t timeUs) const {
1524 size_t index = 0;
1525 int64_t segmentStartUs = 0;
1526 while (index < mPlaylist->size()) {
1527 sp<AMessage> itemMeta;
1528 CHECK(mPlaylist->itemAt(
1529 index, NULL /* uri */, &itemMeta));
1530
1531 int64_t itemDurationUs;
1532 CHECK(itemMeta->findInt64("durationUs", &itemDurationUs));
1533
1534 if (timeUs < segmentStartUs + itemDurationUs) {
1535 break;
1536 }
1537
1538 segmentStartUs += itemDurationUs;
1539 ++index;
1540 }
1541
1542 if (index >= mPlaylist->size()) {
1543 index = mPlaylist->size() - 1;
1544 }
1545
1546 return mPlaylist->getFirstSeqNumber() + index;
1547 }
1548
// Stamps an access unit's meta with the information attached to every queued
// unit: the source's format (if it has one), an optional "discard" flag, the
// current discontinuity sequence, and the start time, first PTS and duration
// of the segment identified by mSeqNumber. When the playlist is neither
// complete nor an event playlist, "playlistTimeUs" is recorded as well.
//
// Returns the accessUnit reference that was passed in.
const sp<ABuffer> &PlaylistFetcher::setAccessUnitProperties(
        const sp<ABuffer> &accessUnit, const sp<AnotherPacketSource> &source, bool discard) {
    const sp<AMessage> unitMeta = accessUnit->meta();

    const sp<MetaData> sourceFormat = source->getFormat();
    if (sourceFormat != NULL) {
        // for simplicity, store a reference to the format in each unit
        unitMeta->setObject("format", sourceFormat);
    }

    if (discard) {
        unitMeta->setInt32("discard", discard);
    }

    unitMeta->setInt32("discontinuitySeq", mDiscontinuitySeq);
    unitMeta->setInt64("segmentStartTimeUs", getSegmentStartTimeUs(mSeqNumber));
    unitMeta->setInt64("segmentFirstTimeUs", mSegmentFirstPTS);
    unitMeta->setInt64("segmentDurationUs", getSegmentDurationUs(mSeqNumber));

    const bool completeOrEvent = mPlaylist->isComplete() || mPlaylist->isEvent();
    if (!completeOrEvent) {
        unitMeta->setInt64("playlistTimeUs", mPlaylistTimeUs);
    }

    return accessUnit;
}
1570
isStartTimeReached(int64_t timeUs)1571 bool PlaylistFetcher::isStartTimeReached(int64_t timeUs) {
1572 if (!mFirstPTSValid) {
1573 mFirstTimeUs = timeUs;
1574 mFirstPTSValid = true;
1575 }
1576 bool startTimeReached = true;
1577 if (mStartTimeUsRelative) {
1578 FLOGV("startTimeUsRelative, timeUs (%lld) - %lld = %lld",
1579 (long long)timeUs,
1580 (long long)mFirstTimeUs,
1581 (long long)(timeUs - mFirstTimeUs));
1582 timeUs -= mFirstTimeUs;
1583 if (timeUs < 0) {
1584 FLOGV("clamp negative timeUs to 0");
1585 timeUs = 0;
1586 }
1587 startTimeReached = (timeUs >= mStartTimeUs);
1588 }
1589 return startTimeReached;
1590 }
1591
extractAndQueueAccessUnitsFromTs(const sp<ABuffer> & buffer)1592 status_t PlaylistFetcher::extractAndQueueAccessUnitsFromTs(const sp<ABuffer> &buffer) {
1593 if (mTSParser == NULL) {
1594 // Use TS_TIMESTAMPS_ARE_ABSOLUTE so pts carry over between fetchers.
1595 mTSParser = new ATSParser(ATSParser::TS_TIMESTAMPS_ARE_ABSOLUTE);
1596 }
1597
1598 if (mNextPTSTimeUs >= 0ll) {
1599 sp<AMessage> extra = new AMessage;
1600 // Since we are using absolute timestamps, signal an offset of 0 to prevent
1601 // ATSParser from skewing the timestamps of access units.
1602 extra->setInt64(IStreamListener::kKeyMediaTimeUs, 0);
1603
1604 // When adapting, signal a recent media time to the parser,
1605 // so that PTS wrap around is handled for the new variant.
1606 if (mStartTimeUs >= 0 && !mStartTimeUsRelative) {
1607 extra->setInt64(IStreamListener::kKeyRecentMediaTimeUs, mStartTimeUs);
1608 }
1609
1610 mTSParser->signalDiscontinuity(
1611 ATSParser::DISCONTINUITY_TIME, extra);
1612
1613 mNextPTSTimeUs = -1ll;
1614 }
1615
1616 size_t offset = 0;
1617 while (offset + 188 <= buffer->size()) {
1618 status_t err = mTSParser->feedTSPacket(buffer->data() + offset, 188);
1619
1620 if (err != OK) {
1621 return err;
1622 }
1623
1624 offset += 188;
1625 }
1626 // setRange to indicate consumed bytes.
1627 buffer->setRange(buffer->offset() + offset, buffer->size() - offset);
1628
1629 if (mSegmentFirstPTS < 0ll) {
1630 // get the smallest first PTS from all streams present in this parser
1631 for (size_t i = mPacketSources.size(); i-- > 0;) {
1632 const LiveSession::StreamType stream = mPacketSources.keyAt(i);
1633 if (stream == LiveSession::STREAMTYPE_SUBTITLES) {
1634 ALOGE("MPEG2 Transport streams do not contain subtitles.");
1635 return ERROR_MALFORMED;
1636 }
1637 if (stream == LiveSession::STREAMTYPE_METADATA) {
1638 continue;
1639 }
1640 ATSParser::SourceType type =LiveSession::getSourceTypeForStream(stream);
1641 sp<AnotherPacketSource> source =
1642 static_cast<AnotherPacketSource *>(
1643 mTSParser->getSource(type).get());
1644
1645 if (source == NULL) {
1646 continue;
1647 }
1648 sp<AMessage> meta = source->getMetaAfterLastDequeued(0);
1649 if (meta != NULL) {
1650 int64_t timeUs;
1651 CHECK(meta->findInt64("timeUs", &timeUs));
1652 if (mSegmentFirstPTS < 0ll || timeUs < mSegmentFirstPTS) {
1653 mSegmentFirstPTS = timeUs;
1654 }
1655 }
1656 }
1657 if (mSegmentFirstPTS < 0ll) {
1658 // didn't find any TS packet, can return early
1659 return OK;
1660 }
1661 if (!mStartTimeUsRelative) {
1662 // mStartup
1663 // mStartup is true until we have queued a packet for all the streams
1664 // we are fetching. We queue packets whose timestamps are greater than
1665 // mStartTimeUs.
1666 // mSegmentStartTimeUs >= 0
1667 // mSegmentStartTimeUs is non-negative when adapting or switching tracks
1668 // adjustSeqNumberWithAnchorTime(timeUs) == true
1669 // we guessed a seq number that's either too large or too small.
1670 // If this happens, we'll adjust mSeqNumber and restart fetching from new
1671 // location. Note that we only want to adjust once, so set mSegmentStartTimeUs
1672 // to -1 so that we don't enter this chunk next time.
1673 if (mStartup && mSegmentStartTimeUs >= 0
1674 && adjustSeqNumberWithAnchorTime(mSegmentFirstPTS)) {
1675 mStartTimeUsNotify = mNotify->dup();
1676 mStartTimeUsNotify->setInt32("what", kWhatStartedAt);
1677 mStartTimeUsNotify->setString("uri", mURI);
1678 mIDRFound = false;
1679 mSegmentStartTimeUs = -1;
1680 return -EAGAIN;
1681 }
1682 }
1683 }
1684
1685 status_t err = OK;
1686 for (size_t i = mPacketSources.size(); i-- > 0;) {
1687 sp<AnotherPacketSource> packetSource = mPacketSources.valueAt(i);
1688
1689 const LiveSession::StreamType stream = mPacketSources.keyAt(i);
1690 if (stream == LiveSession::STREAMTYPE_SUBTITLES) {
1691 ALOGE("MPEG2 Transport streams do not contain subtitles.");
1692 return ERROR_MALFORMED;
1693 }
1694
1695 const char *key = LiveSession::getKeyForStream(stream);
1696 ATSParser::SourceType type =LiveSession::getSourceTypeForStream(stream);
1697
1698 sp<AnotherPacketSource> source =
1699 static_cast<AnotherPacketSource *>(
1700 mTSParser->getSource(type).get());
1701
1702 if (source == NULL) {
1703 continue;
1704 }
1705
1706 const char *mime;
1707 sp<MetaData> format = source->getFormat();
1708 bool isAvc = format != NULL && format->findCString(kKeyMIMEType, &mime)
1709 && !strcasecmp(mime, MEDIA_MIMETYPE_VIDEO_AVC);
1710
1711 sp<ABuffer> accessUnit;
1712 status_t finalResult;
1713 while (source->hasBufferAvailable(&finalResult)
1714 && source->dequeueAccessUnit(&accessUnit) == OK) {
1715
1716 int64_t timeUs;
1717 CHECK(accessUnit->meta()->findInt64("timeUs", &timeUs));
1718
1719 if (mStartup) {
1720 bool startTimeReached = isStartTimeReached(timeUs);
1721
1722 if (!startTimeReached || (isAvc && !mIDRFound)) {
1723 // buffer up to the closest preceding IDR frame in the next segement,
1724 // or the closest succeeding IDR frame after the exact position
1725 FSLOGV(stream, "timeUs(%lld)-mStartTimeUs(%lld)=%lld, mIDRFound=%d",
1726 (long long)timeUs,
1727 (long long)mStartTimeUs,
1728 (long long)timeUs - mStartTimeUs,
1729 mIDRFound);
1730 if (isAvc) {
1731 if (IsIDR(accessUnit)) {
1732 mVideoBuffer->clear();
1733 FSLOGV(stream, "found IDR, clear mVideoBuffer");
1734 mIDRFound = true;
1735 }
1736 if (mIDRFound && mStartTimeUsRelative && !startTimeReached) {
1737 mVideoBuffer->queueAccessUnit(accessUnit);
1738 FSLOGV(stream, "saving AVC video AccessUnit");
1739 }
1740 }
1741 if (!startTimeReached || (isAvc && !mIDRFound)) {
1742 continue;
1743 }
1744 }
1745 }
1746
1747 if (mStartTimeUsNotify != NULL) {
1748 uint32_t streamMask = 0;
1749 mStartTimeUsNotify->findInt32("streamMask", (int32_t *) &streamMask);
1750 if ((mStreamTypeMask & mPacketSources.keyAt(i))
1751 && !(streamMask & mPacketSources.keyAt(i))) {
1752 streamMask |= mPacketSources.keyAt(i);
1753 mStartTimeUsNotify->setInt32("streamMask", streamMask);
1754 FSLOGV(stream, "found start point, timeUs=%lld, streamMask becomes %x",
1755 (long long)timeUs, streamMask);
1756
1757 if (streamMask == mStreamTypeMask) {
1758 FLOGV("found start point for all streams");
1759 mStartup = false;
1760 }
1761 }
1762 }
1763
1764 if (mStopParams != NULL) {
1765 int32_t discontinuitySeq;
1766 int64_t stopTimeUs;
1767 if (!mStopParams->findInt32("discontinuitySeq", &discontinuitySeq)
1768 || discontinuitySeq > mDiscontinuitySeq
1769 || !mStopParams->findInt64(key, &stopTimeUs)
1770 || (discontinuitySeq == mDiscontinuitySeq
1771 && timeUs >= stopTimeUs)) {
1772 FSLOGV(stream, "reached stop point, timeUs=%lld", (long long)timeUs);
1773 mStreamTypeMask &= ~stream;
1774 mPacketSources.removeItemsAt(i);
1775 break;
1776 }
1777 }
1778
1779 if (stream == LiveSession::STREAMTYPE_VIDEO) {
1780 const bool discard = true;
1781 status_t status;
1782 while (mVideoBuffer->hasBufferAvailable(&status)) {
1783 sp<ABuffer> videoBuffer;
1784 mVideoBuffer->dequeueAccessUnit(&videoBuffer);
1785 setAccessUnitProperties(videoBuffer, source, discard);
1786 packetSource->queueAccessUnit(videoBuffer);
1787 int64_t bufferTimeUs;
1788 CHECK(videoBuffer->meta()->findInt64("timeUs", &bufferTimeUs));
1789 FSLOGV(stream, "queueAccessUnit (saved), timeUs=%lld",
1790 (long long)bufferTimeUs);
1791 }
1792 } else if (stream == LiveSession::STREAMTYPE_METADATA && !mHasMetadata) {
1793 mHasMetadata = true;
1794 sp<AMessage> notify = mNotify->dup();
1795 notify->setInt32("what", kWhatMetadataDetected);
1796 notify->post();
1797 }
1798
1799 setAccessUnitProperties(accessUnit, source);
1800 packetSource->queueAccessUnit(accessUnit);
1801 FSLOGV(stream, "queueAccessUnit, timeUs=%lld", (long long)timeUs);
1802 }
1803
1804 if (err != OK) {
1805 break;
1806 }
1807 }
1808
1809 if (err != OK) {
1810 for (size_t i = mPacketSources.size(); i-- > 0;) {
1811 sp<AnotherPacketSource> packetSource = mPacketSources.valueAt(i);
1812 packetSource->clear();
1813 }
1814 return err;
1815 }
1816
1817 if (!mStreamTypeMask) {
1818 // Signal gap is filled between original and new stream.
1819 FLOGV("reached stop point for all streams");
1820 return ERROR_OUT_OF_RANGE;
1821 }
1822
1823 return OK;
1824 }
1825
1826 /* static */
bufferStartsWithWebVTTMagicSequence(const sp<ABuffer> & buffer)1827 bool PlaylistFetcher::bufferStartsWithWebVTTMagicSequence(
1828 const sp<ABuffer> &buffer) {
1829 size_t pos = 0;
1830
1831 // skip possible BOM
1832 if (buffer->size() >= pos + 3 &&
1833 !memcmp("\xef\xbb\xbf", buffer->data() + pos, 3)) {
1834 pos += 3;
1835 }
1836
1837 // accept WEBVTT followed by SPACE, TAB or (CR) LF
1838 if (buffer->size() < pos + 6 ||
1839 memcmp("WEBVTT", buffer->data() + pos, 6)) {
1840 return false;
1841 }
1842 pos += 6;
1843
1844 if (buffer->size() == pos) {
1845 return true;
1846 }
1847
1848 uint8_t sep = buffer->data()[pos];
1849 return sep == ' ' || sep == '\t' || sep == '\n' || sep == '\r';
1850 }
1851
extractAndQueueAccessUnits(const sp<ABuffer> & buffer,const sp<AMessage> & itemMeta)1852 status_t PlaylistFetcher::extractAndQueueAccessUnits(
1853 const sp<ABuffer> &buffer, const sp<AMessage> &itemMeta) {
1854 if (bufferStartsWithWebVTTMagicSequence(buffer)) {
1855 if (mStreamTypeMask != LiveSession::STREAMTYPE_SUBTITLES) {
1856 ALOGE("This stream only contains subtitles.");
1857 return ERROR_MALFORMED;
1858 }
1859
1860 const sp<AnotherPacketSource> packetSource =
1861 mPacketSources.valueFor(LiveSession::STREAMTYPE_SUBTITLES);
1862
1863 int64_t durationUs;
1864 CHECK(itemMeta->findInt64("durationUs", &durationUs));
1865 buffer->meta()->setInt64("timeUs", getSegmentStartTimeUs(mSeqNumber));
1866 buffer->meta()->setInt64("durationUs", durationUs);
1867 buffer->meta()->setInt64("segmentStartTimeUs", getSegmentStartTimeUs(mSeqNumber));
1868 buffer->meta()->setInt32("discontinuitySeq", mDiscontinuitySeq);
1869 buffer->meta()->setInt32("subtitleGeneration", mSubtitleGeneration);
1870 packetSource->queueAccessUnit(buffer);
1871 return OK;
1872 }
1873
1874 if (mNextPTSTimeUs >= 0ll) {
1875 mNextPTSTimeUs = -1ll;
1876 }
1877
1878 // This better be an ISO 13818-7 (AAC) or ISO 13818-1 (MPEG) audio
1879 // stream prefixed by an ID3 tag.
1880
1881 bool firstID3Tag = true;
1882 uint64_t PTS = 0;
1883
1884 for (;;) {
1885 // Make sure to skip all ID3 tags preceding the audio data.
1886 // At least one must be present to provide the PTS timestamp.
1887
1888 ID3 id3(buffer->data(), buffer->size(), true /* ignoreV1 */);
1889 if (!id3.isValid()) {
1890 if (firstID3Tag) {
1891 ALOGE("Unable to parse ID3 tag.");
1892 return ERROR_MALFORMED;
1893 } else {
1894 break;
1895 }
1896 }
1897
1898 if (firstID3Tag) {
1899 bool found = false;
1900
1901 ID3::Iterator it(id3, "PRIV");
1902 while (!it.done()) {
1903 size_t length;
1904 const uint8_t *data = it.getData(&length);
1905
1906 static const char *kMatchName =
1907 "com.apple.streaming.transportStreamTimestamp";
1908 static const size_t kMatchNameLen = strlen(kMatchName);
1909
1910 if (length == kMatchNameLen + 1 + 8
1911 && !strncmp((const char *)data, kMatchName, kMatchNameLen)) {
1912 found = true;
1913 PTS = U64_AT(&data[kMatchNameLen + 1]);
1914 }
1915
1916 it.next();
1917 }
1918
1919 if (!found) {
1920 ALOGE("Unable to extract transportStreamTimestamp from ID3 tag.");
1921 return ERROR_MALFORMED;
1922 }
1923 }
1924
1925 // skip the ID3 tag
1926 buffer->setRange(
1927 buffer->offset() + id3.rawSize(), buffer->size() - id3.rawSize());
1928
1929 firstID3Tag = false;
1930 }
1931
1932 if (mStreamTypeMask != LiveSession::STREAMTYPE_AUDIO) {
1933 ALOGW("This stream only contains audio data!");
1934
1935 mStreamTypeMask &= LiveSession::STREAMTYPE_AUDIO;
1936
1937 if (mStreamTypeMask == 0) {
1938 return OK;
1939 }
1940 }
1941
1942 sp<AnotherPacketSource> packetSource =
1943 mPacketSources.valueFor(LiveSession::STREAMTYPE_AUDIO);
1944
1945 if (packetSource->getFormat() == NULL && buffer->size() >= 7) {
1946 ABitReader bits(buffer->data(), buffer->size());
1947
1948 // adts_fixed_header
1949
1950 CHECK_EQ(bits.getBits(12), 0xfffu);
1951 bits.skipBits(3); // ID, layer
1952 bool protection_absent __unused = bits.getBits(1) != 0;
1953
1954 unsigned profile = bits.getBits(2);
1955 CHECK_NE(profile, 3u);
1956 unsigned sampling_freq_index = bits.getBits(4);
1957 bits.getBits(1); // private_bit
1958 unsigned channel_configuration = bits.getBits(3);
1959 CHECK_NE(channel_configuration, 0u);
1960 bits.skipBits(2); // original_copy, home
1961
1962 sp<MetaData> meta = MakeAACCodecSpecificData(
1963 profile, sampling_freq_index, channel_configuration);
1964
1965 meta->setInt32(kKeyIsADTS, true);
1966
1967 packetSource->setFormat(meta);
1968 }
1969
1970 int64_t numSamples = 0ll;
1971 int32_t sampleRate;
1972 CHECK(packetSource->getFormat()->findInt32(kKeySampleRate, &sampleRate));
1973
1974 int64_t timeUs = (PTS * 100ll) / 9ll;
1975 if (mStartup && !mFirstPTSValid) {
1976 mFirstPTSValid = true;
1977 mFirstTimeUs = timeUs;
1978 }
1979
1980 if (mSegmentFirstPTS < 0ll) {
1981 mSegmentFirstPTS = timeUs;
1982 if (!mStartTimeUsRelative) {
1983 // Duplicated logic from how we handle .ts playlists.
1984 if (mStartup && mSegmentStartTimeUs >= 0
1985 && adjustSeqNumberWithAnchorTime(timeUs)) {
1986 mSegmentStartTimeUs = -1;
1987 return -EAGAIN;
1988 }
1989 }
1990 }
1991
1992 size_t offset = 0;
1993 while (offset < buffer->size()) {
1994 const uint8_t *adtsHeader = buffer->data() + offset;
1995 CHECK_LT(offset + 5, buffer->size());
1996
1997 unsigned aac_frame_length =
1998 ((adtsHeader[3] & 3) << 11)
1999 | (adtsHeader[4] << 3)
2000 | (adtsHeader[5] >> 5);
2001
2002 if (aac_frame_length == 0) {
2003 const uint8_t *id3Header = adtsHeader;
2004 if (!memcmp(id3Header, "ID3", 3)) {
2005 ID3 id3(id3Header, buffer->size() - offset, true);
2006 if (id3.isValid()) {
2007 offset += id3.rawSize();
2008 continue;
2009 };
2010 }
2011 return ERROR_MALFORMED;
2012 }
2013
2014 CHECK_LE(offset + aac_frame_length, buffer->size());
2015
2016 int64_t unitTimeUs = timeUs + numSamples * 1000000ll / sampleRate;
2017 offset += aac_frame_length;
2018
2019 // Each AAC frame encodes 1024 samples.
2020 numSamples += 1024;
2021
2022 if (mStartup) {
2023 int64_t startTimeUs = unitTimeUs;
2024 if (mStartTimeUsRelative) {
2025 startTimeUs -= mFirstTimeUs;
2026 if (startTimeUs < 0) {
2027 startTimeUs = 0;
2028 }
2029 }
2030 if (startTimeUs < mStartTimeUs) {
2031 continue;
2032 }
2033
2034 if (mStartTimeUsNotify != NULL) {
2035 mStartTimeUsNotify->setInt32("streamMask", LiveSession::STREAMTYPE_AUDIO);
2036 mStartup = false;
2037 }
2038 }
2039
2040 if (mStopParams != NULL) {
2041 int32_t discontinuitySeq;
2042 int64_t stopTimeUs;
2043 if (!mStopParams->findInt32("discontinuitySeq", &discontinuitySeq)
2044 || discontinuitySeq > mDiscontinuitySeq
2045 || !mStopParams->findInt64("timeUsAudio", &stopTimeUs)
2046 || (discontinuitySeq == mDiscontinuitySeq && unitTimeUs >= stopTimeUs)) {
2047 mStreamTypeMask = 0;
2048 mPacketSources.clear();
2049 return ERROR_OUT_OF_RANGE;
2050 }
2051 }
2052
2053 sp<ABuffer> unit = new ABuffer(aac_frame_length);
2054 memcpy(unit->data(), adtsHeader, aac_frame_length);
2055
2056 unit->meta()->setInt64("timeUs", unitTimeUs);
2057 setAccessUnitProperties(unit, packetSource);
2058 packetSource->queueAccessUnit(unit);
2059 }
2060
2061 return OK;
2062 }
2063
updateDuration()2064 void PlaylistFetcher::updateDuration() {
2065 int64_t durationUs = 0ll;
2066 for (size_t index = 0; index < mPlaylist->size(); ++index) {
2067 sp<AMessage> itemMeta;
2068 CHECK(mPlaylist->itemAt(
2069 index, NULL /* uri */, &itemMeta));
2070
2071 int64_t itemDurationUs;
2072 CHECK(itemMeta->findInt64("durationUs", &itemDurationUs));
2073
2074 durationUs += itemDurationUs;
2075 }
2076
2077 sp<AMessage> msg = mNotify->dup();
2078 msg->setInt32("what", kWhatDurationUpdate);
2079 msg->setInt64("durationUs", durationUs);
2080 msg->post();
2081 }
2082
updateTargetDuration()2083 void PlaylistFetcher::updateTargetDuration() {
2084 sp<AMessage> msg = mNotify->dup();
2085 msg->setInt32("what", kWhatTargetDurationUpdate);
2086 msg->setInt64("targetDurationUs", mPlaylist->getTargetDuration());
2087 msg->post();
2088 }
2089
2090 } // namespace android
2091