1 /*
2 * Copyright (C) 2010 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 //#define LOG_NDEBUG 0
18 #define LOG_TAG "LiveSession"
19 #include <utils/Log.h>
20
21 #include "LiveSession.h"
22 #include "HTTPDownloader.h"
23 #include "M3UParser.h"
24 #include "PlaylistFetcher.h"
25
26 #include "mpeg2ts/AnotherPacketSource.h"
27
28 #include <cutils/properties.h>
29 #include <media/MediaHTTPService.h>
30 #include <media/stagefright/foundation/ABuffer.h>
31 #include <media/stagefright/foundation/ADebug.h>
32 #include <media/stagefright/foundation/AMessage.h>
33 #include <media/stagefright/foundation/AUtils.h>
34 #include <media/stagefright/MediaDefs.h>
35 #include <media/stagefright/MetaData.h>
36 #include <media/stagefright/Utils.h>
37
38 #include <utils/Mutex.h>
39
40 #include <ctype.h>
41 #include <inttypes.h>
42
43 namespace android {
44
45 // static
46 // Bandwidth Switch Mark Defaults
47 const int64_t LiveSession::kUpSwitchMarkUs = 15000000ll;
48 const int64_t LiveSession::kDownSwitchMarkUs = 20000000ll;
49 const int64_t LiveSession::kUpSwitchMarginUs = 5000000ll;
50 const int64_t LiveSession::kResumeThresholdUs = 100000ll;
51
52 //TODO: redefine this mark to a fair value
53 // default buffer underflow mark
54 static const int kUnderflowMarkMs = 1000; // 1 second
55
56 struct LiveSession::BandwidthEstimator : public RefBase {
57 BandwidthEstimator();
58
59 void addBandwidthMeasurement(size_t numBytes, int64_t delayUs);
60 bool estimateBandwidth(
61 int32_t *bandwidth,
62 bool *isStable = NULL,
63 int32_t *shortTermBps = NULL);
64
65 private:
66 // Bandwidth estimation parameters
67 static const int32_t kShortTermBandwidthItems = 3;
68 static const int32_t kMinBandwidthHistoryItems = 20;
69 static const int64_t kMinBandwidthHistoryWindowUs = 5000000ll; // 5 sec
70 static const int64_t kMaxBandwidthHistoryWindowUs = 30000000ll; // 30 sec
71 static const int64_t kMaxBandwidthHistoryAgeUs = 60000000ll; // 60 sec
72
73 struct BandwidthEntry {
74 int64_t mTimestampUs;
75 int64_t mDelayUs;
76 size_t mNumBytes;
77 };
78
79 Mutex mLock;
80 List<BandwidthEntry> mBandwidthHistory;
81 List<int32_t> mPrevEstimates;
82 int32_t mShortTermEstimate;
83 bool mHasNewSample;
84 bool mIsStable;
85 int64_t mTotalTransferTimeUs;
86 size_t mTotalTransferBytes;
87
88 DISALLOW_EVIL_CONSTRUCTORS(BandwidthEstimator);
89 };
90
BandwidthEstimator()91 LiveSession::BandwidthEstimator::BandwidthEstimator() :
92 mShortTermEstimate(0),
93 mHasNewSample(false),
94 mIsStable(true),
95 mTotalTransferTimeUs(0),
96 mTotalTransferBytes(0) {
97 }
98
addBandwidthMeasurement(size_t numBytes,int64_t delayUs)99 void LiveSession::BandwidthEstimator::addBandwidthMeasurement(
100 size_t numBytes, int64_t delayUs) {
101 AutoMutex autoLock(mLock);
102
103 int64_t nowUs = ALooper::GetNowUs();
104 BandwidthEntry entry;
105 entry.mTimestampUs = nowUs;
106 entry.mDelayUs = delayUs;
107 entry.mNumBytes = numBytes;
108 mTotalTransferTimeUs += delayUs;
109 mTotalTransferBytes += numBytes;
110 mBandwidthHistory.push_back(entry);
111 mHasNewSample = true;
112
113 // Remove no more than 10% of total transfer time at a time
114 // to avoid sudden jump on bandwidth estimation. There might
115 // be long blocking reads that takes up signification time,
116 // we have to keep a longer window in that case.
117 int64_t bandwidthHistoryWindowUs = mTotalTransferTimeUs * 9 / 10;
118 if (bandwidthHistoryWindowUs < kMinBandwidthHistoryWindowUs) {
119 bandwidthHistoryWindowUs = kMinBandwidthHistoryWindowUs;
120 } else if (bandwidthHistoryWindowUs > kMaxBandwidthHistoryWindowUs) {
121 bandwidthHistoryWindowUs = kMaxBandwidthHistoryWindowUs;
122 }
123 // trim old samples, keeping at least kMaxBandwidthHistoryItems samples,
124 // and total transfer time at least kMaxBandwidthHistoryWindowUs.
125 while (mBandwidthHistory.size() > kMinBandwidthHistoryItems) {
126 List<BandwidthEntry>::iterator it = mBandwidthHistory.begin();
127 // remove sample if either absolute age or total transfer time is
128 // over kMaxBandwidthHistoryWindowUs
129 if (nowUs - it->mTimestampUs < kMaxBandwidthHistoryAgeUs &&
130 mTotalTransferTimeUs - it->mDelayUs < bandwidthHistoryWindowUs) {
131 break;
132 }
133 mTotalTransferTimeUs -= it->mDelayUs;
134 mTotalTransferBytes -= it->mNumBytes;
135 mBandwidthHistory.erase(mBandwidthHistory.begin());
136 }
137 }
138
estimateBandwidth(int32_t * bandwidthBps,bool * isStable,int32_t * shortTermBps)139 bool LiveSession::BandwidthEstimator::estimateBandwidth(
140 int32_t *bandwidthBps, bool *isStable, int32_t *shortTermBps) {
141 AutoMutex autoLock(mLock);
142
143 if (mBandwidthHistory.size() < 2) {
144 return false;
145 }
146
147 if (!mHasNewSample) {
148 *bandwidthBps = *(--mPrevEstimates.end());
149 if (isStable) {
150 *isStable = mIsStable;
151 }
152 if (shortTermBps) {
153 *shortTermBps = mShortTermEstimate;
154 }
155 return true;
156 }
157
158 *bandwidthBps = ((double)mTotalTransferBytes * 8E6 / mTotalTransferTimeUs);
159 mPrevEstimates.push_back(*bandwidthBps);
160 while (mPrevEstimates.size() > 3) {
161 mPrevEstimates.erase(mPrevEstimates.begin());
162 }
163 mHasNewSample = false;
164
165 int64_t totalTimeUs = 0;
166 size_t totalBytes = 0;
167 if (mBandwidthHistory.size() >= kShortTermBandwidthItems) {
168 List<BandwidthEntry>::iterator it = --mBandwidthHistory.end();
169 for (size_t i = 0; i < kShortTermBandwidthItems; i++, it--) {
170 totalTimeUs += it->mDelayUs;
171 totalBytes += it->mNumBytes;
172 }
173 }
174 mShortTermEstimate = totalTimeUs > 0 ?
175 (totalBytes * 8E6 / totalTimeUs) : *bandwidthBps;
176 if (shortTermBps) {
177 *shortTermBps = mShortTermEstimate;
178 }
179
180 int64_t minEstimate = -1, maxEstimate = -1;
181 List<int32_t>::iterator it;
182 for (it = mPrevEstimates.begin(); it != mPrevEstimates.end(); it++) {
183 int32_t estimate = *it;
184 if (minEstimate < 0 || minEstimate > estimate) {
185 minEstimate = estimate;
186 }
187 if (maxEstimate < 0 || maxEstimate < estimate) {
188 maxEstimate = estimate;
189 }
190 }
191 // consider it stable if long-term average is not jumping a lot
192 // and short-term average is not much lower than long-term average
193 mIsStable = (maxEstimate <= minEstimate * 4 / 3)
194 && mShortTermEstimate > minEstimate * 7 / 10;
195 if (isStable) {
196 *isStable = mIsStable;
197 }
198
199 #if 0
200 {
201 char dumpStr[1024] = {0};
202 size_t itemIdx = 0;
203 size_t histSize = mBandwidthHistory.size();
204 sprintf(dumpStr, "estimate bps=%d stable=%d history (n=%d): {",
205 *bandwidthBps, mIsStable, histSize);
206 List<BandwidthEntry>::iterator it = mBandwidthHistory.begin();
207 for (; it != mBandwidthHistory.end(); ++it) {
208 if (itemIdx > 50) {
209 sprintf(dumpStr + strlen(dumpStr),
210 "...(%zd more items)... }", histSize - itemIdx);
211 break;
212 }
213 sprintf(dumpStr + strlen(dumpStr), "%dk/%.3fs%s",
214 it->mNumBytes / 1024,
215 (double)it->mDelayUs * 1.0e-6,
216 (it == (--mBandwidthHistory.end())) ? "}" : ", ");
217 itemIdx++;
218 }
219 ALOGE(dumpStr);
220 }
221 #endif
222 return true;
223 }
224
225 //static
getKeyForStream(StreamType type)226 const char *LiveSession::getKeyForStream(StreamType type) {
227 switch (type) {
228 case STREAMTYPE_VIDEO:
229 return "timeUsVideo";
230 case STREAMTYPE_AUDIO:
231 return "timeUsAudio";
232 case STREAMTYPE_SUBTITLES:
233 return "timeUsSubtitle";
234 case STREAMTYPE_METADATA:
235 return "timeUsMetadata"; // unused
236 default:
237 TRESPASS();
238 }
239 return NULL;
240 }
241
242 //static
getNameForStream(StreamType type)243 const char *LiveSession::getNameForStream(StreamType type) {
244 switch (type) {
245 case STREAMTYPE_VIDEO:
246 return "video";
247 case STREAMTYPE_AUDIO:
248 return "audio";
249 case STREAMTYPE_SUBTITLES:
250 return "subs";
251 case STREAMTYPE_METADATA:
252 return "metadata";
253 default:
254 break;
255 }
256 return "unknown";
257 }
258
259 //static
getSourceTypeForStream(StreamType type)260 ATSParser::SourceType LiveSession::getSourceTypeForStream(StreamType type) {
261 switch (type) {
262 case STREAMTYPE_VIDEO:
263 return ATSParser::VIDEO;
264 case STREAMTYPE_AUDIO:
265 return ATSParser::AUDIO;
266 case STREAMTYPE_METADATA:
267 return ATSParser::META;
268 case STREAMTYPE_SUBTITLES:
269 default:
270 TRESPASS();
271 }
272 return ATSParser::NUM_SOURCE_TYPES; // should not reach here
273 }
274
LiveSession(const sp<AMessage> & notify,uint32_t flags,const sp<MediaHTTPService> & httpService)275 LiveSession::LiveSession(
276 const sp<AMessage> ¬ify, uint32_t flags,
277 const sp<MediaHTTPService> &httpService)
278 : mNotify(notify),
279 mFlags(flags),
280 mHTTPService(httpService),
281 mBuffering(false),
282 mInPreparationPhase(true),
283 mPollBufferingGeneration(0),
284 mPrevBufferPercentage(-1),
285 mCurBandwidthIndex(-1),
286 mOrigBandwidthIndex(-1),
287 mLastBandwidthBps(-1ll),
288 mLastBandwidthStable(false),
289 mBandwidthEstimator(new BandwidthEstimator()),
290 mMaxWidth(720),
291 mMaxHeight(480),
292 mStreamMask(0),
293 mNewStreamMask(0),
294 mSwapMask(0),
295 mSwitchGeneration(0),
296 mSubtitleGeneration(0),
297 mLastDequeuedTimeUs(0ll),
298 mRealTimeBaseUs(0ll),
299 mReconfigurationInProgress(false),
300 mSwitchInProgress(false),
301 mUpSwitchMark(kUpSwitchMarkUs),
302 mDownSwitchMark(kDownSwitchMarkUs),
303 mUpSwitchMargin(kUpSwitchMarginUs),
304 mFirstTimeUsValid(false),
305 mFirstTimeUs(0),
306 mLastSeekTimeUs(0),
307 mHasMetadata(false) {
308 mStreams[kAudioIndex] = StreamItem("audio");
309 mStreams[kVideoIndex] = StreamItem("video");
310 mStreams[kSubtitleIndex] = StreamItem("subtitles");
311
312 for (size_t i = 0; i < kNumSources; ++i) {
313 mPacketSources.add(indexToType(i), new AnotherPacketSource(NULL /* meta */));
314 mPacketSources2.add(indexToType(i), new AnotherPacketSource(NULL /* meta */));
315 }
316 }
317
~LiveSession()318 LiveSession::~LiveSession() {
319 if (mFetcherLooper != NULL) {
320 mFetcherLooper->stop();
321 }
322 }
323
calculateMediaTimeUs(int64_t firstTimeUs,int64_t timeUs,int32_t discontinuitySeq)324 int64_t LiveSession::calculateMediaTimeUs(
325 int64_t firstTimeUs, int64_t timeUs, int32_t discontinuitySeq) {
326 if (timeUs >= firstTimeUs) {
327 timeUs -= firstTimeUs;
328 } else {
329 timeUs = 0;
330 }
331 timeUs += mLastSeekTimeUs;
332 if (mDiscontinuityOffsetTimesUs.indexOfKey(discontinuitySeq) >= 0) {
333 timeUs += mDiscontinuityOffsetTimesUs.valueFor(discontinuitySeq);
334 }
335 return timeUs;
336 }
337
dequeueAccessUnit(StreamType stream,sp<ABuffer> * accessUnit)338 status_t LiveSession::dequeueAccessUnit(
339 StreamType stream, sp<ABuffer> *accessUnit) {
340 status_t finalResult = OK;
341 sp<AnotherPacketSource> packetSource = mPacketSources.valueFor(stream);
342
343 ssize_t streamIdx = typeToIndex(stream);
344 if (streamIdx < 0) {
345 return BAD_VALUE;
346 }
347 const char *streamStr = getNameForStream(stream);
348 // Do not let client pull data if we don't have data packets yet.
349 // We might only have a format discontinuity queued without data.
350 // When NuPlayerDecoder dequeues the format discontinuity, it will
351 // immediately try to getFormat. If we return NULL, NuPlayerDecoder
352 // thinks it can do seamless change, so will not shutdown decoder.
353 // When the actual format arrives, it can't handle it and get stuck.
354 if (!packetSource->hasDataBufferAvailable(&finalResult)) {
355 ALOGV("[%s] dequeueAccessUnit: no buffer available (finalResult=%d)",
356 streamStr, finalResult);
357
358 if (finalResult == OK) {
359 return -EAGAIN;
360 } else {
361 return finalResult;
362 }
363 }
364
365 // Let the client dequeue as long as we have buffers available
366 // Do not make pause/resume decisions here.
367
368 status_t err = packetSource->dequeueAccessUnit(accessUnit);
369
370 if (err == INFO_DISCONTINUITY) {
371 // adaptive streaming, discontinuities in the playlist
372 int32_t type;
373 CHECK((*accessUnit)->meta()->findInt32("discontinuity", &type));
374
375 sp<AMessage> extra;
376 if (!(*accessUnit)->meta()->findMessage("extra", &extra)) {
377 extra.clear();
378 }
379
380 ALOGI("[%s] read discontinuity of type %d, extra = %s",
381 streamStr,
382 type,
383 extra == NULL ? "NULL" : extra->debugString().c_str());
384 } else if (err == OK) {
385
386 if (stream == STREAMTYPE_AUDIO || stream == STREAMTYPE_VIDEO) {
387 int64_t timeUs, originalTimeUs;
388 int32_t discontinuitySeq = 0;
389 StreamItem& strm = mStreams[streamIdx];
390 CHECK((*accessUnit)->meta()->findInt64("timeUs", &timeUs));
391 originalTimeUs = timeUs;
392 (*accessUnit)->meta()->findInt32("discontinuitySeq", &discontinuitySeq);
393 if (discontinuitySeq > (int32_t) strm.mCurDiscontinuitySeq) {
394 int64_t offsetTimeUs;
395 if (mDiscontinuityOffsetTimesUs.indexOfKey(strm.mCurDiscontinuitySeq) >= 0) {
396 offsetTimeUs = mDiscontinuityOffsetTimesUs.valueFor(strm.mCurDiscontinuitySeq);
397 } else {
398 offsetTimeUs = 0;
399 }
400
401 if (mDiscontinuityAbsStartTimesUs.indexOfKey(strm.mCurDiscontinuitySeq) >= 0
402 && strm.mLastDequeuedTimeUs >= 0) {
403 int64_t firstTimeUs;
404 firstTimeUs = mDiscontinuityAbsStartTimesUs.valueFor(strm.mCurDiscontinuitySeq);
405 offsetTimeUs += strm.mLastDequeuedTimeUs - firstTimeUs;
406 offsetTimeUs += strm.mLastSampleDurationUs;
407 } else {
408 offsetTimeUs += strm.mLastSampleDurationUs;
409 }
410
411 mDiscontinuityOffsetTimesUs.add(discontinuitySeq, offsetTimeUs);
412 strm.mCurDiscontinuitySeq = discontinuitySeq;
413 }
414
415 int32_t discard = 0;
416 int64_t firstTimeUs;
417 if (mDiscontinuityAbsStartTimesUs.indexOfKey(strm.mCurDiscontinuitySeq) >= 0) {
418 int64_t durUs; // approximate sample duration
419 if (timeUs > strm.mLastDequeuedTimeUs) {
420 durUs = timeUs - strm.mLastDequeuedTimeUs;
421 } else {
422 durUs = strm.mLastDequeuedTimeUs - timeUs;
423 }
424 strm.mLastSampleDurationUs = durUs;
425 firstTimeUs = mDiscontinuityAbsStartTimesUs.valueFor(strm.mCurDiscontinuitySeq);
426 } else if ((*accessUnit)->meta()->findInt32("discard", &discard) && discard) {
427 firstTimeUs = timeUs;
428 } else {
429 mDiscontinuityAbsStartTimesUs.add(strm.mCurDiscontinuitySeq, timeUs);
430 firstTimeUs = timeUs;
431 }
432
433 strm.mLastDequeuedTimeUs = timeUs;
434 timeUs = calculateMediaTimeUs(firstTimeUs, timeUs, discontinuitySeq);
435
436 ALOGV("[%s] dequeueAccessUnit: time %lld us, original %lld us",
437 streamStr, (long long)timeUs, (long long)originalTimeUs);
438 (*accessUnit)->meta()->setInt64("timeUs", timeUs);
439 mLastDequeuedTimeUs = timeUs;
440 mRealTimeBaseUs = ALooper::GetNowUs() - timeUs;
441 } else if (stream == STREAMTYPE_SUBTITLES) {
442 int32_t subtitleGeneration;
443 if ((*accessUnit)->meta()->findInt32("subtitleGeneration", &subtitleGeneration)
444 && subtitleGeneration != mSubtitleGeneration) {
445 return -EAGAIN;
446 };
447 (*accessUnit)->meta()->setInt32(
448 "trackIndex", mPlaylist->getSelectedIndex());
449 (*accessUnit)->meta()->setInt64("baseUs", mRealTimeBaseUs);
450 } else if (stream == STREAMTYPE_METADATA) {
451 HLSTime mdTime((*accessUnit)->meta());
452 if (mDiscontinuityAbsStartTimesUs.indexOfKey(mdTime.mSeq) < 0) {
453 packetSource->requeueAccessUnit((*accessUnit));
454 return -EAGAIN;
455 } else {
456 int64_t firstTimeUs = mDiscontinuityAbsStartTimesUs.valueFor(mdTime.mSeq);
457 int64_t timeUs = calculateMediaTimeUs(firstTimeUs, mdTime.mTimeUs, mdTime.mSeq);
458 (*accessUnit)->meta()->setInt64("timeUs", timeUs);
459 (*accessUnit)->meta()->setInt64("baseUs", mRealTimeBaseUs);
460 }
461 }
462 } else {
463 ALOGI("[%s] encountered error %d", streamStr, err);
464 }
465
466 return err;
467 }
468
getStreamFormatMeta(StreamType stream,sp<MetaData> * meta)469 status_t LiveSession::getStreamFormatMeta(StreamType stream, sp<MetaData> *meta) {
470 if (!(mStreamMask & stream)) {
471 return UNKNOWN_ERROR;
472 }
473
474 sp<AnotherPacketSource> packetSource = mPacketSources.valueFor(stream);
475
476 *meta = packetSource->getFormat();
477
478 if (*meta == NULL) {
479 return -EWOULDBLOCK;
480 }
481
482 if (stream == STREAMTYPE_AUDIO) {
483 // set AAC input buffer size to 32K bytes (256kbps x 1sec)
484 (*meta)->setInt32(kKeyMaxInputSize, 32 * 1024);
485 } else if (stream == STREAMTYPE_VIDEO) {
486 (*meta)->setInt32(kKeyMaxWidth, mMaxWidth);
487 (*meta)->setInt32(kKeyMaxHeight, mMaxHeight);
488 }
489
490 return OK;
491 }
492
getHTTPDownloader()493 sp<HTTPDownloader> LiveSession::getHTTPDownloader() {
494 return new HTTPDownloader(mHTTPService, mExtraHeaders);
495 }
496
setBufferingSettings(const BufferingSettings & buffering)497 void LiveSession::setBufferingSettings(
498 const BufferingSettings &buffering) {
499 sp<AMessage> msg = new AMessage(kWhatSetBufferingSettings, this);
500 writeToAMessage(msg, buffering);
501 msg->post();
502 }
503
connectAsync(const char * url,const KeyedVector<String8,String8> * headers)504 void LiveSession::connectAsync(
505 const char *url, const KeyedVector<String8, String8> *headers) {
506 sp<AMessage> msg = new AMessage(kWhatConnect, this);
507 msg->setString("url", url);
508
509 if (headers != NULL) {
510 msg->setPointer(
511 "headers",
512 new KeyedVector<String8, String8>(*headers));
513 }
514
515 msg->post();
516 }
517
disconnect()518 status_t LiveSession::disconnect() {
519 sp<AMessage> msg = new AMessage(kWhatDisconnect, this);
520
521 sp<AMessage> response;
522 status_t err = msg->postAndAwaitResponse(&response);
523
524 return err;
525 }
526
seekTo(int64_t timeUs,MediaPlayerSeekMode mode)527 status_t LiveSession::seekTo(int64_t timeUs, MediaPlayerSeekMode mode) {
528 sp<AMessage> msg = new AMessage(kWhatSeek, this);
529 msg->setInt64("timeUs", timeUs);
530 msg->setInt32("mode", mode);
531
532 sp<AMessage> response;
533 status_t err = msg->postAndAwaitResponse(&response);
534
535 return err;
536 }
537
checkSwitchProgress(sp<AMessage> & stopParams,int64_t delayUs,bool * needResumeUntil)538 bool LiveSession::checkSwitchProgress(
539 sp<AMessage> &stopParams, int64_t delayUs, bool *needResumeUntil) {
540 AString newUri;
541 CHECK(stopParams->findString("uri", &newUri));
542
543 *needResumeUntil = false;
544 sp<AMessage> firstNewMeta[kMaxStreams];
545 for (size_t i = 0; i < kMaxStreams; ++i) {
546 StreamType stream = indexToType(i);
547 if (!(mSwapMask & mNewStreamMask & stream)
548 || (mStreams[i].mNewUri != newUri)) {
549 continue;
550 }
551 if (stream == STREAMTYPE_SUBTITLES) {
552 continue;
553 }
554 sp<AnotherPacketSource> &source = mPacketSources.editValueAt(i);
555
556 // First, get latest dequeued meta, which is where the decoder is at.
557 // (when upswitching, we take the meta after a certain delay, so that
558 // the decoder is left with some cushion)
559 sp<AMessage> lastDequeueMeta, lastEnqueueMeta;
560 if (delayUs > 0) {
561 lastDequeueMeta = source->getMetaAfterLastDequeued(delayUs);
562 if (lastDequeueMeta == NULL) {
563 // this means we don't have enough cushion, try again later
564 ALOGV("[%s] up switching failed due to insufficient buffer",
565 getNameForStream(stream));
566 return false;
567 }
568 } else {
569 // It's okay for lastDequeueMeta to be NULL here, it means the
570 // decoder hasn't even started dequeueing
571 lastDequeueMeta = source->getLatestDequeuedMeta();
572 }
573 // Then, trim off packets at beginning of mPacketSources2 that's before
574 // the latest dequeued time. These samples are definitely too late.
575 firstNewMeta[i] = mPacketSources2.editValueAt(i)
576 ->trimBuffersBeforeMeta(lastDequeueMeta);
577
578 // Now firstNewMeta[i] is the first sample after the trim.
579 // If it's NULL, we failed because dequeue already past all samples
580 // in mPacketSource2, we have to try again.
581 if (firstNewMeta[i] == NULL) {
582 HLSTime dequeueTime(lastDequeueMeta);
583 ALOGV("[%s] dequeue time (%d, %lld) past start time",
584 getNameForStream(stream),
585 dequeueTime.mSeq, (long long) dequeueTime.mTimeUs);
586 return false;
587 }
588
589 // Otherwise, we check if mPacketSources2 overlaps with what old fetcher
590 // already fetched, and see if we need to resumeUntil
591 lastEnqueueMeta = source->getLatestEnqueuedMeta();
592 // lastEnqueueMeta == NULL means old fetcher stopped at a discontinuity
593 // boundary, no need to resume as the content will look different anyways
594 if (lastEnqueueMeta != NULL) {
595 HLSTime lastTime(lastEnqueueMeta), startTime(firstNewMeta[i]);
596
597 // no need to resume old fetcher if new fetcher started in different
598 // discontinuity sequence, as the content will look different.
599 *needResumeUntil |= (startTime.mSeq == lastTime.mSeq
600 && startTime.mTimeUs - lastTime.mTimeUs > kResumeThresholdUs);
601
602 // update the stopTime for resumeUntil
603 stopParams->setInt32("discontinuitySeq", startTime.mSeq);
604 stopParams->setInt64(getKeyForStream(stream), startTime.mTimeUs);
605 }
606 }
607
608 // if we're here, it means dequeue progress hasn't passed some samples in
609 // mPacketSource2, we can trim off the excess in mPacketSource.
610 // (old fetcher might still need to resumeUntil the start time of new fetcher)
611 for (size_t i = 0; i < kMaxStreams; ++i) {
612 StreamType stream = indexToType(i);
613 if (!(mSwapMask & mNewStreamMask & stream)
614 || (newUri != mStreams[i].mNewUri)
615 || stream == STREAMTYPE_SUBTITLES) {
616 continue;
617 }
618 mPacketSources.valueFor(stream)->trimBuffersAfterMeta(firstNewMeta[i]);
619 }
620
621 // no resumeUntil if already underflow
622 *needResumeUntil &= !mBuffering;
623
624 return true;
625 }
626
onMessageReceived(const sp<AMessage> & msg)627 void LiveSession::onMessageReceived(const sp<AMessage> &msg) {
628 switch (msg->what()) {
629 case kWhatSetBufferingSettings:
630 {
631 readFromAMessage(msg, &mBufferingSettings);
632 break;
633 }
634
635 case kWhatConnect:
636 {
637 onConnect(msg);
638 break;
639 }
640
641 case kWhatDisconnect:
642 {
643 CHECK(msg->senderAwaitsResponse(&mDisconnectReplyID));
644
645 if (mReconfigurationInProgress) {
646 break;
647 }
648
649 finishDisconnect();
650 break;
651 }
652
653 case kWhatSeek:
654 {
655 if (mReconfigurationInProgress) {
656 msg->post(50000);
657 break;
658 }
659
660 CHECK(msg->senderAwaitsResponse(&mSeekReplyID));
661 mSeekReply = new AMessage;
662
663 onSeek(msg);
664 break;
665 }
666
667 case kWhatFetcherNotify:
668 {
669 int32_t what;
670 CHECK(msg->findInt32("what", &what));
671
672 switch (what) {
673 case PlaylistFetcher::kWhatStarted:
674 break;
675 case PlaylistFetcher::kWhatPaused:
676 case PlaylistFetcher::kWhatStopped:
677 {
678 AString uri;
679 CHECK(msg->findString("uri", &uri));
680 ssize_t index = mFetcherInfos.indexOfKey(uri);
681 if (index < 0) {
682 // ignore msgs from fetchers that's already gone
683 break;
684 }
685
686 ALOGV("fetcher-%d %s",
687 mFetcherInfos[index].mFetcher->getFetcherID(),
688 what == PlaylistFetcher::kWhatPaused ?
689 "paused" : "stopped");
690
691 if (what == PlaylistFetcher::kWhatStopped) {
692 mFetcherLooper->unregisterHandler(
693 mFetcherInfos[index].mFetcher->id());
694 mFetcherInfos.removeItemsAt(index);
695 } else if (what == PlaylistFetcher::kWhatPaused) {
696 int32_t seekMode;
697 CHECK(msg->findInt32("seekMode", &seekMode));
698 for (size_t i = 0; i < kMaxStreams; ++i) {
699 if (mStreams[i].mUri == uri) {
700 mStreams[i].mSeekMode = (SeekMode) seekMode;
701 }
702 }
703 }
704
705 if (mContinuation != NULL) {
706 CHECK_GT(mContinuationCounter, 0u);
707 if (--mContinuationCounter == 0) {
708 mContinuation->post();
709 }
710 ALOGV("%zu fetcher(s) left", mContinuationCounter);
711 }
712 break;
713 }
714
715 case PlaylistFetcher::kWhatDurationUpdate:
716 {
717 AString uri;
718 CHECK(msg->findString("uri", &uri));
719
720 int64_t durationUs;
721 CHECK(msg->findInt64("durationUs", &durationUs));
722
723 ssize_t index = mFetcherInfos.indexOfKey(uri);
724 if (index >= 0) {
725 FetcherInfo *info = &mFetcherInfos.editValueFor(uri);
726 info->mDurationUs = durationUs;
727 }
728 break;
729 }
730
731 case PlaylistFetcher::kWhatTargetDurationUpdate:
732 {
733 int64_t targetDurationUs;
734 CHECK(msg->findInt64("targetDurationUs", &targetDurationUs));
735 mUpSwitchMark = min(kUpSwitchMarkUs, targetDurationUs * 7 / 4);
736 mDownSwitchMark = min(kDownSwitchMarkUs, targetDurationUs * 9 / 4);
737 mUpSwitchMargin = min(kUpSwitchMarginUs, targetDurationUs);
738 break;
739 }
740
741 case PlaylistFetcher::kWhatError:
742 {
743 status_t err;
744 CHECK(msg->findInt32("err", &err));
745
746 ALOGE("XXX Received error %d from PlaylistFetcher.", err);
747
748 // handle EOS on subtitle tracks independently
749 AString uri;
750 if (err == ERROR_END_OF_STREAM && msg->findString("uri", &uri)) {
751 ssize_t i = mFetcherInfos.indexOfKey(uri);
752 if (i >= 0) {
753 const sp<PlaylistFetcher> &fetcher = mFetcherInfos.valueAt(i).mFetcher;
754 if (fetcher != NULL) {
755 uint32_t type = fetcher->getStreamTypeMask();
756 if (type == STREAMTYPE_SUBTITLES) {
757 mPacketSources.valueFor(
758 STREAMTYPE_SUBTITLES)->signalEOS(err);;
759 break;
760 }
761 }
762 }
763 }
764
765 // remember the failure index (as mCurBandwidthIndex will be restored
766 // after cancelBandwidthSwitch()), and record last fail time
767 size_t failureIndex = mCurBandwidthIndex;
768 mBandwidthItems.editItemAt(
769 failureIndex).mLastFailureUs = ALooper::GetNowUs();
770
771 if (mSwitchInProgress) {
772 // if error happened when we switch to a variant, try fallback
773 // to other variant to save the session
774 if (tryBandwidthFallback()) {
775 break;
776 }
777 }
778
779 if (mInPreparationPhase) {
780 postPrepared(err);
781 }
782
783 cancelBandwidthSwitch();
784
785 mPacketSources.valueFor(STREAMTYPE_AUDIO)->signalEOS(err);
786
787 mPacketSources.valueFor(STREAMTYPE_VIDEO)->signalEOS(err);
788
789 mPacketSources.valueFor(
790 STREAMTYPE_SUBTITLES)->signalEOS(err);
791
792 postError(err);
793 break;
794 }
795
796 case PlaylistFetcher::kWhatStopReached:
797 {
798 ALOGV("kWhatStopReached");
799
800 AString oldUri;
801 CHECK(msg->findString("uri", &oldUri));
802
803 ssize_t index = mFetcherInfos.indexOfKey(oldUri);
804 if (index < 0) {
805 break;
806 }
807
808 tryToFinishBandwidthSwitch(oldUri);
809 break;
810 }
811
812 case PlaylistFetcher::kWhatStartedAt:
813 {
814 int32_t switchGeneration;
815 CHECK(msg->findInt32("switchGeneration", &switchGeneration));
816
817 ALOGV("kWhatStartedAt: switchGen=%d, mSwitchGen=%d",
818 switchGeneration, mSwitchGeneration);
819
820 if (switchGeneration != mSwitchGeneration) {
821 break;
822 }
823
824 AString uri;
825 CHECK(msg->findString("uri", &uri));
826
827 // mark new fetcher mToBeResumed
828 ssize_t index = mFetcherInfos.indexOfKey(uri);
829 if (index >= 0) {
830 mFetcherInfos.editValueAt(index).mToBeResumed = true;
831 }
832
833 // temporarily disable packet sources to be swapped to prevent
834 // NuPlayerDecoder from dequeuing while we check progress
835 for (size_t i = 0; i < mPacketSources.size(); ++i) {
836 if ((mSwapMask & mPacketSources.keyAt(i))
837 && uri == mStreams[i].mNewUri) {
838 mPacketSources.editValueAt(i)->enable(false);
839 }
840 }
841 bool switchUp = (mCurBandwidthIndex > mOrigBandwidthIndex);
842 // If switching up, require a cushion bigger than kUnderflowMark
843 // to avoid buffering immediately after the switch.
844 // (If we don't have that cushion we'd rather cancel and try again.)
845 int64_t delayUs =
846 switchUp ?
847 (kUnderflowMarkMs * 1000ll + 1000000ll)
848 : 0;
849 bool needResumeUntil = false;
850 sp<AMessage> stopParams = msg;
851 if (checkSwitchProgress(stopParams, delayUs, &needResumeUntil)) {
852 // playback time hasn't passed startAt time
853 if (!needResumeUntil) {
854 ALOGV("finish switch");
855 for (size_t i = 0; i < kMaxStreams; ++i) {
856 if ((mSwapMask & indexToType(i))
857 && uri == mStreams[i].mNewUri) {
858 // have to make a copy of mStreams[i].mUri because
859 // tryToFinishBandwidthSwitch is modifying mStreams[]
860 AString oldURI = mStreams[i].mUri;
861 tryToFinishBandwidthSwitch(oldURI);
862 break;
863 }
864 }
865 } else {
866 // startAt time is after last enqueue time
867 // Resume fetcher for the original variant; the resumed fetcher should
868 // continue until the timestamps found in msg, which is stored by the
869 // new fetcher to indicate where the new variant has started buffering.
870 ALOGV("finish switch with resumeUntilAsync");
871 for (size_t i = 0; i < mFetcherInfos.size(); i++) {
872 const FetcherInfo &info = mFetcherInfos.valueAt(i);
873 if (info.mToBeRemoved) {
874 info.mFetcher->resumeUntilAsync(stopParams);
875 }
876 }
877 }
878 } else {
879 // playback time passed startAt time
880 if (switchUp) {
881 // if switching up, cancel and retry if condition satisfies again
882 ALOGV("cancel up switch because we're too late");
883 cancelBandwidthSwitch(true /* resume */);
884 } else {
885 ALOGV("retry down switch at next sample");
886 resumeFetcher(uri, mSwapMask, -1, true /* newUri */);
887 }
888 }
889 // re-enable all packet sources
890 for (size_t i = 0; i < mPacketSources.size(); ++i) {
891 mPacketSources.editValueAt(i)->enable(true);
892 }
893
894 break;
895 }
896
897 case PlaylistFetcher::kWhatPlaylistFetched:
898 {
899 onMasterPlaylistFetched(msg);
900 break;
901 }
902
903 case PlaylistFetcher::kWhatMetadataDetected:
904 {
905 if (!mHasMetadata) {
906 mHasMetadata = true;
907 sp<AMessage> notify = mNotify->dup();
908 notify->setInt32("what", kWhatMetadataDetected);
909 notify->post();
910 }
911 break;
912 }
913
914 default:
915 TRESPASS();
916 }
917
918 break;
919 }
920
921 case kWhatChangeConfiguration:
922 {
923 onChangeConfiguration(msg);
924 break;
925 }
926
927 case kWhatChangeConfiguration2:
928 {
929 onChangeConfiguration2(msg);
930 break;
931 }
932
933 case kWhatChangeConfiguration3:
934 {
935 onChangeConfiguration3(msg);
936 break;
937 }
938
939 case kWhatPollBuffering:
940 {
941 int32_t generation;
942 CHECK(msg->findInt32("generation", &generation));
943 if (generation == mPollBufferingGeneration) {
944 onPollBuffering();
945 }
946 break;
947 }
948
949 default:
950 TRESPASS();
951 break;
952 }
953 }
954
955 // static
isBandwidthValid(const BandwidthItem & item)956 bool LiveSession::isBandwidthValid(const BandwidthItem &item) {
957 static const int64_t kBlacklistWindowUs = 300 * 1000000ll;
958 return item.mLastFailureUs < 0
959 || ALooper::GetNowUs() - item.mLastFailureUs > kBlacklistWindowUs;
960 }
961
962 // static
SortByBandwidth(const BandwidthItem * a,const BandwidthItem * b)963 int LiveSession::SortByBandwidth(const BandwidthItem *a, const BandwidthItem *b) {
964 if (a->mBandwidth < b->mBandwidth) {
965 return -1;
966 } else if (a->mBandwidth == b->mBandwidth) {
967 return 0;
968 }
969
970 return 1;
971 }
972
973 // static
indexToType(int idx)974 LiveSession::StreamType LiveSession::indexToType(int idx) {
975 CHECK(idx >= 0 && idx < kNumSources);
976 return (StreamType)(1 << idx);
977 }
978
979 // static
typeToIndex(int32_t type)980 ssize_t LiveSession::typeToIndex(int32_t type) {
981 switch (type) {
982 case STREAMTYPE_AUDIO:
983 return 0;
984 case STREAMTYPE_VIDEO:
985 return 1;
986 case STREAMTYPE_SUBTITLES:
987 return 2;
988 case STREAMTYPE_METADATA:
989 return 3;
990 default:
991 return -1;
992 };
993 return -1;
994 }
995
onConnect(const sp<AMessage> & msg)996 void LiveSession::onConnect(const sp<AMessage> &msg) {
997 CHECK(msg->findString("url", &mMasterURL));
998
999 // TODO currently we don't know if we are coming here from incognito mode
1000 ALOGI("onConnect %s", uriDebugString(mMasterURL).c_str());
1001
1002 KeyedVector<String8, String8> *headers = NULL;
1003 if (!msg->findPointer("headers", (void **)&headers)) {
1004 mExtraHeaders.clear();
1005 } else {
1006 mExtraHeaders = *headers;
1007
1008 delete headers;
1009 headers = NULL;
1010 }
1011
1012 // create looper for fetchers
1013 if (mFetcherLooper == NULL) {
1014 mFetcherLooper = new ALooper();
1015
1016 mFetcherLooper->setName("Fetcher");
1017 mFetcherLooper->start(false, /* runOnCallingThread */
1018 true /* canCallJava */);
1019 }
1020
1021 // create fetcher to fetch the master playlist
1022 addFetcher(mMasterURL.c_str())->fetchPlaylistAsync();
1023 }
1024
onMasterPlaylistFetched(const sp<AMessage> & msg)1025 void LiveSession::onMasterPlaylistFetched(const sp<AMessage> &msg) {
1026 AString uri;
1027 CHECK(msg->findString("uri", &uri));
1028 ssize_t index = mFetcherInfos.indexOfKey(uri);
1029 if (index < 0) {
1030 ALOGW("fetcher for master playlist is gone.");
1031 return;
1032 }
1033
1034 // no longer useful, remove
1035 mFetcherLooper->unregisterHandler(mFetcherInfos[index].mFetcher->id());
1036 mFetcherInfos.removeItemsAt(index);
1037
1038 CHECK(msg->findObject("playlist", (sp<RefBase> *)&mPlaylist));
1039 if (mPlaylist == NULL) {
1040 ALOGE("unable to fetch master playlist %s.",
1041 uriDebugString(mMasterURL).c_str());
1042
1043 postPrepared(ERROR_IO);
1044 return;
1045 }
1046 // We trust the content provider to make a reasonable choice of preferred
1047 // initial bandwidth by listing it first in the variant playlist.
1048 // At startup we really don't have a good estimate on the available
1049 // network bandwidth since we haven't tranferred any data yet. Once
1050 // we have we can make a better informed choice.
1051 size_t initialBandwidth = 0;
1052 size_t initialBandwidthIndex = 0;
1053
1054 int32_t maxWidth = 0;
1055 int32_t maxHeight = 0;
1056
1057 if (mPlaylist->isVariantPlaylist()) {
1058 Vector<BandwidthItem> itemsWithVideo;
1059 for (size_t i = 0; i < mPlaylist->size(); ++i) {
1060 BandwidthItem item;
1061
1062 item.mPlaylistIndex = i;
1063 item.mLastFailureUs = -1ll;
1064
1065 sp<AMessage> meta;
1066 AString uri;
1067 mPlaylist->itemAt(i, &uri, &meta);
1068
1069 CHECK(meta->findInt32("bandwidth", (int32_t *)&item.mBandwidth));
1070
1071 int32_t width, height;
1072 if (meta->findInt32("width", &width)) {
1073 maxWidth = max(maxWidth, width);
1074 }
1075 if (meta->findInt32("height", &height)) {
1076 maxHeight = max(maxHeight, height);
1077 }
1078
1079 mBandwidthItems.push(item);
1080 if (mPlaylist->hasType(i, "video")) {
1081 itemsWithVideo.push(item);
1082 }
1083 }
1084 // remove the audio-only variants if we have at least one with video
1085 if (!itemsWithVideo.empty()
1086 && itemsWithVideo.size() < mBandwidthItems.size()) {
1087 mBandwidthItems.clear();
1088 for (size_t i = 0; i < itemsWithVideo.size(); ++i) {
1089 mBandwidthItems.push(itemsWithVideo[i]);
1090 }
1091 }
1092
1093 CHECK_GT(mBandwidthItems.size(), 0u);
1094 initialBandwidth = mBandwidthItems[0].mBandwidth;
1095
1096 mBandwidthItems.sort(SortByBandwidth);
1097
1098 for (size_t i = 0; i < mBandwidthItems.size(); ++i) {
1099 if (mBandwidthItems.itemAt(i).mBandwidth == initialBandwidth) {
1100 initialBandwidthIndex = i;
1101 break;
1102 }
1103 }
1104 } else {
1105 // dummy item.
1106 BandwidthItem item;
1107 item.mPlaylistIndex = 0;
1108 item.mBandwidth = 0;
1109 mBandwidthItems.push(item);
1110 }
1111
1112 mMaxWidth = maxWidth > 0 ? maxWidth : mMaxWidth;
1113 mMaxHeight = maxHeight > 0 ? maxHeight : mMaxHeight;
1114
1115 mPlaylist->pickRandomMediaItems();
1116 changeConfiguration(
1117 0ll /* timeUs */, initialBandwidthIndex, false /* pickTrack */);
1118 }
1119
finishDisconnect()1120 void LiveSession::finishDisconnect() {
1121 ALOGV("finishDisconnect");
1122
1123 // No reconfiguration is currently pending, make sure none will trigger
1124 // during disconnection either.
1125 cancelBandwidthSwitch();
1126
1127 // cancel buffer polling
1128 cancelPollBuffering();
1129
1130 // TRICKY: don't wait for all fetcher to be stopped when disconnecting
1131 //
1132 // Some fetchers might be stuck in connect/getSize at this point. These
1133 // operations will eventually timeout (as we have a timeout set in
1134 // MediaHTTPConnection), but we don't want to block the main UI thread
1135 // until then. Here we just need to make sure we clear all references
1136 // to the fetchers, so that when they finally exit from the blocking
1137 // operation, they can be destructed.
1138 //
1139 // There is one very tricky point though. For this scheme to work, the
1140 // fecther must hold a reference to LiveSession, so that LiveSession is
1141 // destroyed after fetcher. Otherwise LiveSession would get stuck in its
1142 // own destructor when it waits for mFetcherLooper to stop, which still
1143 // blocks main UI thread.
1144 for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
1145 mFetcherInfos.valueAt(i).mFetcher->stopAsync();
1146 mFetcherLooper->unregisterHandler(
1147 mFetcherInfos.valueAt(i).mFetcher->id());
1148 }
1149 mFetcherInfos.clear();
1150
1151 mPacketSources.valueFor(STREAMTYPE_AUDIO)->signalEOS(ERROR_END_OF_STREAM);
1152 mPacketSources.valueFor(STREAMTYPE_VIDEO)->signalEOS(ERROR_END_OF_STREAM);
1153
1154 mPacketSources.valueFor(
1155 STREAMTYPE_SUBTITLES)->signalEOS(ERROR_END_OF_STREAM);
1156
1157 sp<AMessage> response = new AMessage;
1158 response->setInt32("err", OK);
1159
1160 response->postReply(mDisconnectReplyID);
1161 mDisconnectReplyID.clear();
1162 }
1163
addFetcher(const char * uri)1164 sp<PlaylistFetcher> LiveSession::addFetcher(const char *uri) {
1165 ssize_t index = mFetcherInfos.indexOfKey(uri);
1166
1167 if (index >= 0) {
1168 return NULL;
1169 }
1170
1171 sp<AMessage> notify = new AMessage(kWhatFetcherNotify, this);
1172 notify->setString("uri", uri);
1173 notify->setInt32("switchGeneration", mSwitchGeneration);
1174
1175 FetcherInfo info;
1176 info.mFetcher = new PlaylistFetcher(
1177 notify, this, uri, mCurBandwidthIndex, mSubtitleGeneration);
1178 info.mDurationUs = -1ll;
1179 info.mToBeRemoved = false;
1180 info.mToBeResumed = false;
1181 mFetcherLooper->registerHandler(info.mFetcher);
1182
1183 mFetcherInfos.add(uri, info);
1184
1185 return info.mFetcher;
1186 }
1187
1188 #if 0
1189 static double uniformRand() {
1190 return (double)rand() / RAND_MAX;
1191 }
1192 #endif
1193
UriIsSameAsIndex(const AString & uri,int32_t i,bool newUri)1194 bool LiveSession::UriIsSameAsIndex(const AString &uri, int32_t i, bool newUri) {
1195 ALOGV("[timed_id3] i %d UriIsSameAsIndex newUri %s, %s", i,
1196 newUri ? "true" : "false",
1197 newUri ? mStreams[i].mNewUri.c_str() : mStreams[i].mUri.c_str());
1198 return i >= 0
1199 && ((!newUri && uri == mStreams[i].mUri)
1200 || (newUri && uri == mStreams[i].mNewUri));
1201 }
1202
getPacketSourceForStreamIndex(size_t trackIndex,bool newUri)1203 sp<AnotherPacketSource> LiveSession::getPacketSourceForStreamIndex(
1204 size_t trackIndex, bool newUri) {
1205 StreamType type = indexToType(trackIndex);
1206 sp<AnotherPacketSource> source = NULL;
1207 if (newUri) {
1208 source = mPacketSources2.valueFor(type);
1209 source->clear();
1210 } else {
1211 source = mPacketSources.valueFor(type);
1212 };
1213 return source;
1214 }
1215
getMetadataSource(sp<AnotherPacketSource> sources[kNumSources],uint32_t streamMask,bool newUri)1216 sp<AnotherPacketSource> LiveSession::getMetadataSource(
1217 sp<AnotherPacketSource> sources[kNumSources], uint32_t streamMask, bool newUri) {
1218 // todo: One case where the following strategy can fail is when audio and video
1219 // are in separate playlists, both are transport streams, and the metadata
1220 // is actually contained in the audio stream.
1221 ALOGV("[timed_id3] getMetadataSourceForUri streamMask %x newUri %s",
1222 streamMask, newUri ? "true" : "false");
1223
1224 if ((sources[kVideoIndex] != NULL) // video fetcher; or ...
1225 || (!(streamMask & STREAMTYPE_VIDEO) && sources[kAudioIndex] != NULL)) {
1226 // ... audio fetcher for audio only variant
1227 return getPacketSourceForStreamIndex(kMetaDataIndex, newUri);
1228 }
1229
1230 return NULL;
1231 }
1232
resumeFetcher(const AString & uri,uint32_t streamMask,int64_t timeUs,bool newUri)1233 bool LiveSession::resumeFetcher(
1234 const AString &uri, uint32_t streamMask, int64_t timeUs, bool newUri) {
1235 ssize_t index = mFetcherInfos.indexOfKey(uri);
1236 if (index < 0) {
1237 ALOGE("did not find fetcher for uri: %s", uri.c_str());
1238 return false;
1239 }
1240
1241 bool resume = false;
1242 sp<AnotherPacketSource> sources[kNumSources];
1243 for (size_t i = 0; i < kMaxStreams; ++i) {
1244 if ((streamMask & indexToType(i)) && UriIsSameAsIndex(uri, i, newUri)) {
1245 resume = true;
1246 sources[i] = getPacketSourceForStreamIndex(i, newUri);
1247 }
1248 }
1249
1250 if (resume) {
1251 sp<PlaylistFetcher> &fetcher = mFetcherInfos.editValueAt(index).mFetcher;
1252 SeekMode seekMode = newUri ? kSeekModeNextSample : kSeekModeExactPosition;
1253
1254 ALOGV("resuming fetcher-%d, timeUs=%lld, seekMode=%d",
1255 fetcher->getFetcherID(), (long long)timeUs, seekMode);
1256
1257 fetcher->startAsync(
1258 sources[kAudioIndex],
1259 sources[kVideoIndex],
1260 sources[kSubtitleIndex],
1261 getMetadataSource(sources, streamMask, newUri),
1262 timeUs, -1, -1, seekMode);
1263 }
1264
1265 return resume;
1266 }
1267
getAbortThreshold(ssize_t currentBWIndex,ssize_t targetBWIndex) const1268 float LiveSession::getAbortThreshold(
1269 ssize_t currentBWIndex, ssize_t targetBWIndex) const {
1270 float abortThreshold = -1.0f;
1271 if (currentBWIndex > 0 && targetBWIndex < currentBWIndex) {
1272 /*
1273 If we're switching down, we need to decide whether to
1274
1275 1) finish last segment of high-bandwidth variant, or
1276 2) abort last segment of high-bandwidth variant, and fetch an
1277 overlapping portion from low-bandwidth variant.
1278
1279 Here we try to maximize the amount of buffer left when the
1280 switch point is met. Given the following parameters:
1281
1282 B: our current buffering level in seconds
1283 T: target duration in seconds
1284 X: sample duration in seconds remain to fetch in last segment
1285 bw0: bandwidth of old variant (as specified in playlist)
1286 bw1: bandwidth of new variant (as specified in playlist)
1287 bw: measured bandwidth available
1288
1289 If we choose 1), when switch happens at the end of current
1290 segment, our buffering will be
1291 B + X - X * bw0 / bw
1292
1293 If we choose 2), when switch happens where we aborted current
1294 segment, our buffering will be
1295 B - (T - X) * bw1 / bw
1296
1297 We should only choose 1) if
1298 X/T < bw1 / (bw1 + bw0 - bw)
1299 */
1300
1301 // abort old bandwidth immediately if bandwidth is fluctuating a lot.
1302 // our estimate could be far off, and fetching old bandwidth could
1303 // take too long.
1304 if (!mLastBandwidthStable) {
1305 return 0.0f;
1306 }
1307
1308 // Taking the measured current bandwidth at 50% face value only,
1309 // as our bandwidth estimation is a lagging indicator. Being
1310 // conservative on this, we prefer switching to lower bandwidth
1311 // unless we're really confident finishing up the last segment
1312 // of higher bandwidth will be fast.
1313 CHECK(mLastBandwidthBps >= 0);
1314 abortThreshold =
1315 (float)mBandwidthItems.itemAt(targetBWIndex).mBandwidth
1316 / ((float)mBandwidthItems.itemAt(targetBWIndex).mBandwidth
1317 + (float)mBandwidthItems.itemAt(currentBWIndex).mBandwidth
1318 - (float)mLastBandwidthBps * 0.5f);
1319 if (abortThreshold < 0.0f) {
1320 abortThreshold = -1.0f; // do not abort
1321 }
1322 ALOGV("Switching Down: bps %ld => %ld, measured %d, abort ratio %.2f",
1323 mBandwidthItems.itemAt(currentBWIndex).mBandwidth,
1324 mBandwidthItems.itemAt(targetBWIndex).mBandwidth,
1325 mLastBandwidthBps,
1326 abortThreshold);
1327 }
1328 return abortThreshold;
1329 }
1330
addBandwidthMeasurement(size_t numBytes,int64_t delayUs)1331 void LiveSession::addBandwidthMeasurement(size_t numBytes, int64_t delayUs) {
1332 mBandwidthEstimator->addBandwidthMeasurement(numBytes, delayUs);
1333 }
1334
getLowestValidBandwidthIndex() const1335 ssize_t LiveSession::getLowestValidBandwidthIndex() const {
1336 for (size_t index = 0; index < mBandwidthItems.size(); index++) {
1337 if (isBandwidthValid(mBandwidthItems[index])) {
1338 return index;
1339 }
1340 }
1341 // if playlists are all blacklisted, return 0 and hope it's alive
1342 return 0;
1343 }
1344
getBandwidthIndex(int32_t bandwidthBps)1345 size_t LiveSession::getBandwidthIndex(int32_t bandwidthBps) {
1346 if (mBandwidthItems.size() < 2) {
1347 // shouldn't be here if we only have 1 bandwidth, check
1348 // logic to get rid of redundant bandwidth polling
1349 ALOGW("getBandwidthIndex() called for single bandwidth playlist!");
1350 return 0;
1351 }
1352
1353 #if 1
1354 char value[PROPERTY_VALUE_MAX];
1355 ssize_t index = -1;
1356 if (property_get("media.httplive.bw-index", value, NULL)) {
1357 char *end;
1358 index = strtol(value, &end, 10);
1359 CHECK(end > value && *end == '\0');
1360
1361 if (index >= 0 && (size_t)index >= mBandwidthItems.size()) {
1362 index = mBandwidthItems.size() - 1;
1363 }
1364 }
1365
1366 if (index < 0) {
1367 char value[PROPERTY_VALUE_MAX];
1368 if (property_get("media.httplive.max-bw", value, NULL)) {
1369 char *end;
1370 long maxBw = strtoul(value, &end, 10);
1371 if (end > value && *end == '\0') {
1372 if (maxBw > 0 && bandwidthBps > maxBw) {
1373 ALOGV("bandwidth capped to %ld bps", maxBw);
1374 bandwidthBps = maxBw;
1375 }
1376 }
1377 }
1378
1379 // Pick the highest bandwidth stream that's not currently blacklisted
1380 // below or equal to estimated bandwidth.
1381
1382 index = mBandwidthItems.size() - 1;
1383 ssize_t lowestBandwidth = getLowestValidBandwidthIndex();
1384 while (index > lowestBandwidth) {
1385 // be conservative (70%) to avoid overestimating and immediately
1386 // switching down again.
1387 size_t adjustedBandwidthBps = bandwidthBps * 7 / 10;
1388 const BandwidthItem &item = mBandwidthItems[index];
1389 if (item.mBandwidth <= adjustedBandwidthBps
1390 && isBandwidthValid(item)) {
1391 break;
1392 }
1393 --index;
1394 }
1395 }
1396 #elif 0
1397 // Change bandwidth at random()
1398 size_t index = uniformRand() * mBandwidthItems.size();
1399 #elif 0
1400 // There's a 50% chance to stay on the current bandwidth and
1401 // a 50% chance to switch to the next higher bandwidth (wrapping around
1402 // to lowest)
1403 const size_t kMinIndex = 0;
1404
1405 static ssize_t mCurBandwidthIndex = -1;
1406
1407 size_t index;
1408 if (mCurBandwidthIndex < 0) {
1409 index = kMinIndex;
1410 } else if (uniformRand() < 0.5) {
1411 index = (size_t)mCurBandwidthIndex;
1412 } else {
1413 index = mCurBandwidthIndex + 1;
1414 if (index == mBandwidthItems.size()) {
1415 index = kMinIndex;
1416 }
1417 }
1418 mCurBandwidthIndex = index;
1419 #elif 0
1420 // Pick the highest bandwidth stream below or equal to 1.2 Mbit/sec
1421
1422 size_t index = mBandwidthItems.size() - 1;
1423 while (index > 0 && mBandwidthItems.itemAt(index).mBandwidth > 1200000) {
1424 --index;
1425 }
1426 #elif 1
1427 char value[PROPERTY_VALUE_MAX];
1428 size_t index;
1429 if (property_get("media.httplive.bw-index", value, NULL)) {
1430 char *end;
1431 index = strtoul(value, &end, 10);
1432 CHECK(end > value && *end == '\0');
1433
1434 if (index >= mBandwidthItems.size()) {
1435 index = mBandwidthItems.size() - 1;
1436 }
1437 } else {
1438 index = 0;
1439 }
1440 #else
1441 size_t index = mBandwidthItems.size() - 1; // Highest bandwidth stream
1442 #endif
1443
1444 CHECK_GE(index, 0);
1445
1446 return index;
1447 }
1448
latestMediaSegmentStartTime() const1449 HLSTime LiveSession::latestMediaSegmentStartTime() const {
1450 HLSTime audioTime(mPacketSources.valueFor(
1451 STREAMTYPE_AUDIO)->getLatestDequeuedMeta());
1452
1453 HLSTime videoTime(mPacketSources.valueFor(
1454 STREAMTYPE_VIDEO)->getLatestDequeuedMeta());
1455
1456 return audioTime < videoTime ? videoTime : audioTime;
1457 }
1458
onSeek(const sp<AMessage> & msg)1459 void LiveSession::onSeek(const sp<AMessage> &msg) {
1460 int64_t timeUs;
1461 int32_t mode;
1462 CHECK(msg->findInt64("timeUs", &timeUs));
1463 CHECK(msg->findInt32("mode", &mode));
1464 // TODO: add "mode" to changeConfiguration.
1465 changeConfiguration(timeUs/* , (MediaPlayerSeekMode)mode */);
1466 }
1467
getDuration(int64_t * durationUs) const1468 status_t LiveSession::getDuration(int64_t *durationUs) const {
1469 int64_t maxDurationUs = -1ll;
1470 for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
1471 int64_t fetcherDurationUs = mFetcherInfos.valueAt(i).mDurationUs;
1472
1473 if (fetcherDurationUs > maxDurationUs) {
1474 maxDurationUs = fetcherDurationUs;
1475 }
1476 }
1477
1478 *durationUs = maxDurationUs;
1479
1480 return OK;
1481 }
1482
isSeekable() const1483 bool LiveSession::isSeekable() const {
1484 int64_t durationUs;
1485 return getDuration(&durationUs) == OK && durationUs >= 0;
1486 }
1487
hasDynamicDuration() const1488 bool LiveSession::hasDynamicDuration() const {
1489 return false;
1490 }
1491
getTrackCount() const1492 size_t LiveSession::getTrackCount() const {
1493 if (mPlaylist == NULL) {
1494 return 0;
1495 } else {
1496 return mPlaylist->getTrackCount() + (mHasMetadata ? 1 : 0);
1497 }
1498 }
1499
getTrackInfo(size_t trackIndex) const1500 sp<AMessage> LiveSession::getTrackInfo(size_t trackIndex) const {
1501 if (mPlaylist == NULL) {
1502 return NULL;
1503 } else {
1504 if (trackIndex == mPlaylist->getTrackCount() && mHasMetadata) {
1505 sp<AMessage> format = new AMessage();
1506 format->setInt32("type", MEDIA_TRACK_TYPE_METADATA);
1507 format->setString("language", "und");
1508 format->setString("mime", MEDIA_MIMETYPE_DATA_TIMED_ID3);
1509 return format;
1510 }
1511 return mPlaylist->getTrackInfo(trackIndex);
1512 }
1513 }
1514
selectTrack(size_t index,bool select)1515 status_t LiveSession::selectTrack(size_t index, bool select) {
1516 if (mPlaylist == NULL) {
1517 return INVALID_OPERATION;
1518 }
1519
1520 ALOGV("selectTrack: index=%zu, select=%d, mSubtitleGen=%d++",
1521 index, select, mSubtitleGeneration);
1522
1523 ++mSubtitleGeneration;
1524 status_t err = mPlaylist->selectTrack(index, select);
1525 if (err == OK) {
1526 sp<AMessage> msg = new AMessage(kWhatChangeConfiguration, this);
1527 msg->setInt32("pickTrack", select);
1528 msg->post();
1529 }
1530 return err;
1531 }
1532
getSelectedTrack(media_track_type type) const1533 ssize_t LiveSession::getSelectedTrack(media_track_type type) const {
1534 if (mPlaylist == NULL) {
1535 return -1;
1536 } else {
1537 return mPlaylist->getSelectedTrack(type);
1538 }
1539 }
1540
changeConfiguration(int64_t timeUs,ssize_t bandwidthIndex,bool pickTrack)1541 void LiveSession::changeConfiguration(
1542 int64_t timeUs, ssize_t bandwidthIndex, bool pickTrack) {
1543 ALOGV("changeConfiguration: timeUs=%lld us, bwIndex=%zd, pickTrack=%d",
1544 (long long)timeUs, bandwidthIndex, pickTrack);
1545
1546 cancelBandwidthSwitch();
1547
1548 CHECK(!mReconfigurationInProgress);
1549 mReconfigurationInProgress = true;
1550 if (bandwidthIndex >= 0) {
1551 mOrigBandwidthIndex = mCurBandwidthIndex;
1552 mCurBandwidthIndex = bandwidthIndex;
1553 if (mOrigBandwidthIndex != mCurBandwidthIndex) {
1554 ALOGI("#### Starting Bandwidth Switch: %zd => %zd",
1555 mOrigBandwidthIndex, mCurBandwidthIndex);
1556 }
1557 }
1558 CHECK_LT((size_t)mCurBandwidthIndex, mBandwidthItems.size());
1559 const BandwidthItem &item = mBandwidthItems.itemAt(mCurBandwidthIndex);
1560
1561 uint32_t streamMask = 0; // streams that should be fetched by the new fetcher
1562 uint32_t resumeMask = 0; // streams that should be fetched by the original fetcher
1563
1564 AString URIs[kMaxStreams];
1565 for (size_t i = 0; i < kMaxStreams; ++i) {
1566 if (mPlaylist->getTypeURI(item.mPlaylistIndex, mStreams[i].mType, &URIs[i])) {
1567 streamMask |= indexToType(i);
1568 }
1569 }
1570
1571 // Step 1, stop and discard fetchers that are no longer needed.
1572 // Pause those that we'll reuse.
1573 for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
1574 // skip fetchers that are marked mToBeRemoved,
1575 // these are done and can't be reused
1576 if (mFetcherInfos[i].mToBeRemoved) {
1577 continue;
1578 }
1579
1580 const AString &uri = mFetcherInfos.keyAt(i);
1581 sp<PlaylistFetcher> &fetcher = mFetcherInfos.editValueAt(i).mFetcher;
1582
1583 bool discardFetcher = true, delayRemoval = false;
1584 for (size_t j = 0; j < kMaxStreams; ++j) {
1585 StreamType type = indexToType(j);
1586 if ((streamMask & type) && uri == URIs[j]) {
1587 resumeMask |= type;
1588 streamMask &= ~type;
1589 discardFetcher = false;
1590 }
1591 }
1592 // Delay fetcher removal if not picking tracks, AND old fetcher
1593 // has stream mask that overlaps new variant. (Okay to discard
1594 // old fetcher now, if completely no overlap.)
1595 if (discardFetcher && timeUs < 0ll && !pickTrack
1596 && (fetcher->getStreamTypeMask() & streamMask)) {
1597 discardFetcher = false;
1598 delayRemoval = true;
1599 }
1600
1601 if (discardFetcher) {
1602 ALOGV("discarding fetcher-%d", fetcher->getFetcherID());
1603 fetcher->stopAsync();
1604 } else {
1605 float threshold = 0.0f; // default to pause after current block (47Kbytes)
1606 bool disconnect = false;
1607 if (timeUs >= 0ll) {
1608 // seeking, no need to finish fetching
1609 disconnect = true;
1610 } else if (delayRemoval) {
1611 // adapting, abort if remaining of current segment is over threshold
1612 threshold = getAbortThreshold(
1613 mOrigBandwidthIndex, mCurBandwidthIndex);
1614 }
1615
1616 ALOGV("pausing fetcher-%d, threshold=%.2f",
1617 fetcher->getFetcherID(), threshold);
1618 fetcher->pauseAsync(threshold, disconnect);
1619 }
1620 }
1621
1622 sp<AMessage> msg;
1623 if (timeUs < 0ll) {
1624 // skip onChangeConfiguration2 (decoder destruction) if not seeking.
1625 msg = new AMessage(kWhatChangeConfiguration3, this);
1626 } else {
1627 msg = new AMessage(kWhatChangeConfiguration2, this);
1628 }
1629 msg->setInt32("streamMask", streamMask);
1630 msg->setInt32("resumeMask", resumeMask);
1631 msg->setInt32("pickTrack", pickTrack);
1632 msg->setInt64("timeUs", timeUs);
1633 for (size_t i = 0; i < kMaxStreams; ++i) {
1634 if ((streamMask | resumeMask) & indexToType(i)) {
1635 msg->setString(mStreams[i].uriKey().c_str(), URIs[i].c_str());
1636 }
1637 }
1638
1639 // Every time a fetcher acknowledges the stopAsync or pauseAsync request
1640 // we'll decrement mContinuationCounter, once it reaches zero, i.e. all
1641 // fetchers have completed their asynchronous operation, we'll post
1642 // mContinuation, which then is handled below in onChangeConfiguration2.
1643 mContinuationCounter = mFetcherInfos.size();
1644 mContinuation = msg;
1645
1646 if (mContinuationCounter == 0) {
1647 msg->post();
1648 }
1649 }
1650
onChangeConfiguration(const sp<AMessage> & msg)1651 void LiveSession::onChangeConfiguration(const sp<AMessage> &msg) {
1652 ALOGV("onChangeConfiguration");
1653
1654 if (!mReconfigurationInProgress) {
1655 int32_t pickTrack = 0;
1656 msg->findInt32("pickTrack", &pickTrack);
1657 changeConfiguration(-1ll /* timeUs */, -1, pickTrack);
1658 } else {
1659 msg->post(1000000ll); // retry in 1 sec
1660 }
1661 }
1662
onChangeConfiguration2(const sp<AMessage> & msg)1663 void LiveSession::onChangeConfiguration2(const sp<AMessage> &msg) {
1664 ALOGV("onChangeConfiguration2");
1665
1666 mContinuation.clear();
1667
1668 // All fetchers are either suspended or have been removed now.
1669
1670 // If we're seeking, clear all packet sources before we report
1671 // seek complete, to prevent decoder from pulling stale data.
1672 int64_t timeUs;
1673 CHECK(msg->findInt64("timeUs", &timeUs));
1674
1675 if (timeUs >= 0) {
1676 mLastSeekTimeUs = timeUs;
1677 mLastDequeuedTimeUs = timeUs;
1678
1679 for (size_t i = 0; i < mPacketSources.size(); i++) {
1680 sp<AnotherPacketSource> packetSource = mPacketSources.editValueAt(i);
1681 sp<MetaData> format = packetSource->getFormat();
1682 packetSource->clear();
1683 // Set a tentative format here such that HTTPLiveSource will always have
1684 // a format available when NuPlayer queries. Without an available video
1685 // format when setting a surface NuPlayer might disable video decoding
1686 // altogether. The tentative format will be overwritten by the
1687 // authoritative (and possibly same) format once content from the new
1688 // position is dequeued.
1689 packetSource->setFormat(format);
1690 }
1691
1692 for (size_t i = 0; i < kMaxStreams; ++i) {
1693 mStreams[i].reset();
1694 }
1695
1696 mDiscontinuityOffsetTimesUs.clear();
1697 mDiscontinuityAbsStartTimesUs.clear();
1698
1699 if (mSeekReplyID != NULL) {
1700 CHECK(mSeekReply != NULL);
1701 mSeekReply->setInt32("err", OK);
1702 mSeekReply->postReply(mSeekReplyID);
1703 mSeekReplyID.clear();
1704 mSeekReply.clear();
1705 }
1706
1707 // restart buffer polling after seek becauese previous
1708 // buffering position is no longer valid.
1709 restartPollBuffering();
1710 }
1711
1712 uint32_t streamMask, resumeMask;
1713 CHECK(msg->findInt32("streamMask", (int32_t *)&streamMask));
1714 CHECK(msg->findInt32("resumeMask", (int32_t *)&resumeMask));
1715
1716 streamMask |= resumeMask;
1717
1718 AString URIs[kMaxStreams];
1719 for (size_t i = 0; i < kMaxStreams; ++i) {
1720 if (streamMask & indexToType(i)) {
1721 const AString &uriKey = mStreams[i].uriKey();
1722 CHECK(msg->findString(uriKey.c_str(), &URIs[i]));
1723 ALOGV("%s = '%s'", uriKey.c_str(), URIs[i].c_str());
1724 }
1725 }
1726
1727 uint32_t changedMask = 0;
1728 for (size_t i = 0; i < kMaxStreams && i != kSubtitleIndex; ++i) {
1729 // stream URI could change even if onChangeConfiguration2 is only
1730 // used for seek. Seek could happen during a bw switch, in this
1731 // case bw switch will be cancelled, but the seekTo position will
1732 // fetch from the new URI.
1733 if ((mStreamMask & streamMask & indexToType(i))
1734 && !mStreams[i].mUri.empty()
1735 && !(URIs[i] == mStreams[i].mUri)) {
1736 ALOGV("stream %zu changed: oldURI %s, newURI %s", i,
1737 mStreams[i].mUri.c_str(), URIs[i].c_str());
1738 sp<AnotherPacketSource> source = mPacketSources.valueFor(indexToType(i));
1739 if (source->getLatestDequeuedMeta() != NULL) {
1740 source->queueDiscontinuity(
1741 ATSParser::DISCONTINUITY_FORMATCHANGE, NULL, true);
1742 }
1743 }
1744 // Determine which decoders to shutdown on the player side,
1745 // a decoder has to be shutdown if its streamtype was active
1746 // before but now longer isn't.
1747 if ((mStreamMask & ~streamMask & indexToType(i))) {
1748 changedMask |= indexToType(i);
1749 }
1750 }
1751
1752 if (changedMask == 0) {
1753 // If nothing changed as far as the audio/video decoders
1754 // are concerned we can proceed.
1755 onChangeConfiguration3(msg);
1756 return;
1757 }
1758
1759 // Something changed, inform the player which will shutdown the
1760 // corresponding decoders and will post the reply once that's done.
1761 // Handling the reply will continue executing below in
1762 // onChangeConfiguration3.
1763 sp<AMessage> notify = mNotify->dup();
1764 notify->setInt32("what", kWhatStreamsChanged);
1765 notify->setInt32("changedMask", changedMask);
1766
1767 msg->setWhat(kWhatChangeConfiguration3);
1768 msg->setTarget(this);
1769
1770 notify->setMessage("reply", msg);
1771 notify->post();
1772 }
1773
onChangeConfiguration3(const sp<AMessage> & msg)1774 void LiveSession::onChangeConfiguration3(const sp<AMessage> &msg) {
1775 mContinuation.clear();
1776 // All remaining fetchers are still suspended, the player has shutdown
1777 // any decoders that needed it.
1778
1779 uint32_t streamMask, resumeMask;
1780 CHECK(msg->findInt32("streamMask", (int32_t *)&streamMask));
1781 CHECK(msg->findInt32("resumeMask", (int32_t *)&resumeMask));
1782
1783 mNewStreamMask = streamMask | resumeMask;
1784
1785 int64_t timeUs;
1786 int32_t pickTrack;
1787 bool switching = false;
1788 CHECK(msg->findInt64("timeUs", &timeUs));
1789 CHECK(msg->findInt32("pickTrack", &pickTrack));
1790
1791 if (timeUs < 0ll) {
1792 if (!pickTrack) {
1793 // mSwapMask contains streams that are in both old and new variant,
1794 // (in mNewStreamMask & mStreamMask) but with different URIs
1795 // (not in resumeMask).
1796 // For example, old variant has video and audio in two separate
1797 // URIs, and new variant has only audio with unchanged URI. mSwapMask
1798 // should be 0 as there is nothing to swap. We only need to stop video,
1799 // and resume audio.
1800 mSwapMask = mNewStreamMask & mStreamMask & ~resumeMask;
1801 switching = (mSwapMask != 0);
1802 }
1803 mRealTimeBaseUs = ALooper::GetNowUs() - mLastDequeuedTimeUs;
1804 } else {
1805 mRealTimeBaseUs = ALooper::GetNowUs() - timeUs;
1806 }
1807
1808 ALOGV("onChangeConfiguration3: timeUs=%lld, switching=%d, pickTrack=%d, "
1809 "mStreamMask=0x%x, mNewStreamMask=0x%x, mSwapMask=0x%x",
1810 (long long)timeUs, switching, pickTrack,
1811 mStreamMask, mNewStreamMask, mSwapMask);
1812
1813 for (size_t i = 0; i < kMaxStreams; ++i) {
1814 if (streamMask & indexToType(i)) {
1815 if (switching) {
1816 CHECK(msg->findString(mStreams[i].uriKey().c_str(), &mStreams[i].mNewUri));
1817 } else {
1818 CHECK(msg->findString(mStreams[i].uriKey().c_str(), &mStreams[i].mUri));
1819 }
1820 }
1821 }
1822
1823 // Of all existing fetchers:
1824 // * Resume fetchers that are still needed and assign them original packet sources.
1825 // * Mark otherwise unneeded fetchers for removal.
1826 ALOGV("resuming fetchers for mask 0x%08x", resumeMask);
1827 for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
1828 const AString &uri = mFetcherInfos.keyAt(i);
1829 if (!resumeFetcher(uri, resumeMask, timeUs)) {
1830 ALOGV("marking fetcher-%d to be removed",
1831 mFetcherInfos[i].mFetcher->getFetcherID());
1832
1833 mFetcherInfos.editValueAt(i).mToBeRemoved = true;
1834 }
1835 }
1836
1837 // streamMask now only contains the types that need a new fetcher created.
1838 if (streamMask != 0) {
1839 ALOGV("creating new fetchers for mask 0x%08x", streamMask);
1840 }
1841
1842 // Find out when the original fetchers have buffered up to and start the new fetchers
1843 // at a later timestamp.
1844 for (size_t i = 0; i < kMaxStreams; i++) {
1845 if (!(indexToType(i) & streamMask)) {
1846 continue;
1847 }
1848
1849 AString uri;
1850 uri = switching ? mStreams[i].mNewUri : mStreams[i].mUri;
1851
1852 sp<PlaylistFetcher> fetcher = addFetcher(uri.c_str());
1853 CHECK(fetcher != NULL);
1854
1855 HLSTime startTime;
1856 SeekMode seekMode = kSeekModeExactPosition;
1857 sp<AnotherPacketSource> sources[kNumSources];
1858
1859 if (i == kSubtitleIndex || (!pickTrack && !switching)) {
1860 startTime = latestMediaSegmentStartTime();
1861 }
1862
1863 // TRICKY: looping from i as earlier streams are already removed from streamMask
1864 for (size_t j = i; j < kMaxStreams; ++j) {
1865 const AString &streamUri = switching ? mStreams[j].mNewUri : mStreams[j].mUri;
1866 if ((streamMask & indexToType(j)) && uri == streamUri) {
1867 sources[j] = mPacketSources.valueFor(indexToType(j));
1868
1869 if (timeUs >= 0) {
1870 startTime.mTimeUs = timeUs;
1871 } else {
1872 int32_t type;
1873 sp<AMessage> meta;
1874 if (!switching) {
1875 // selecting, or adapting but no swap required
1876 meta = sources[j]->getLatestDequeuedMeta();
1877 } else {
1878 // adapting and swap required
1879 meta = sources[j]->getLatestEnqueuedMeta();
1880 if (meta != NULL && mCurBandwidthIndex > mOrigBandwidthIndex) {
1881 // switching up
1882 meta = sources[j]->getMetaAfterLastDequeued(mUpSwitchMargin);
1883 }
1884 }
1885
1886 if ((j == kAudioIndex || j == kVideoIndex)
1887 && meta != NULL && !meta->findInt32("discontinuity", &type)) {
1888 HLSTime tmpTime(meta);
1889 if (startTime < tmpTime) {
1890 startTime = tmpTime;
1891 }
1892 }
1893
1894 if (!switching) {
1895 // selecting, or adapting but no swap required
1896 sources[j]->clear();
1897 if (j == kSubtitleIndex) {
1898 break;
1899 }
1900
1901 ALOGV("stream[%zu]: queue format change", j);
1902 sources[j]->queueDiscontinuity(
1903 ATSParser::DISCONTINUITY_FORMAT_ONLY, NULL, true);
1904 } else {
1905 // switching, queue discontinuities after resume
1906 sources[j] = mPacketSources2.valueFor(indexToType(j));
1907 sources[j]->clear();
1908 // the new fetcher might be providing streams that used to be
1909 // provided by two different fetchers, if one of the fetcher
1910 // paused in the middle while the other somehow paused in next
1911 // seg, we have to start from next seg.
1912 if (seekMode < mStreams[j].mSeekMode) {
1913 seekMode = mStreams[j].mSeekMode;
1914 }
1915 }
1916 }
1917
1918 streamMask &= ~indexToType(j);
1919 }
1920 }
1921
1922 ALOGV("[fetcher-%d] startAsync: startTimeUs %lld mLastSeekTimeUs %lld "
1923 "segmentStartTimeUs %lld seekMode %d",
1924 fetcher->getFetcherID(),
1925 (long long)startTime.mTimeUs,
1926 (long long)mLastSeekTimeUs,
1927 (long long)startTime.getSegmentTimeUs(),
1928 seekMode);
1929
1930 // Set the target segment start time to the middle point of the
1931 // segment where the last sample was.
1932 // This gives a better guess if segments of the two variants are not
1933 // perfectly aligned. (If the corresponding segment in new variant
1934 // starts slightly later than that in the old variant, we still want
1935 // to pick that segment, not the one before)
1936 fetcher->startAsync(
1937 sources[kAudioIndex],
1938 sources[kVideoIndex],
1939 sources[kSubtitleIndex],
1940 getMetadataSource(sources, mNewStreamMask, switching),
1941 startTime.mTimeUs < 0 ? mLastSeekTimeUs : startTime.mTimeUs,
1942 startTime.getSegmentTimeUs(),
1943 startTime.mSeq,
1944 seekMode);
1945 }
1946
1947 // All fetchers have now been started, the configuration change
1948 // has completed.
1949
1950 mReconfigurationInProgress = false;
1951 if (switching) {
1952 mSwitchInProgress = true;
1953 } else {
1954 mStreamMask = mNewStreamMask;
1955 if (mOrigBandwidthIndex != mCurBandwidthIndex) {
1956 ALOGV("#### Finished Bandwidth Switch Early: %zd => %zd",
1957 mOrigBandwidthIndex, mCurBandwidthIndex);
1958 mOrigBandwidthIndex = mCurBandwidthIndex;
1959 }
1960 }
1961
1962 ALOGV("onChangeConfiguration3: mSwitchInProgress %d, mStreamMask 0x%x",
1963 mSwitchInProgress, mStreamMask);
1964
1965 if (mDisconnectReplyID != NULL) {
1966 finishDisconnect();
1967 }
1968 }
1969
swapPacketSource(StreamType stream)1970 void LiveSession::swapPacketSource(StreamType stream) {
1971 ALOGV("[%s] swapPacketSource", getNameForStream(stream));
1972
1973 // transfer packets from source2 to source
1974 sp<AnotherPacketSource> &aps = mPacketSources.editValueFor(stream);
1975 sp<AnotherPacketSource> &aps2 = mPacketSources2.editValueFor(stream);
1976
1977 // queue discontinuity in mPacketSource
1978 aps->queueDiscontinuity(ATSParser::DISCONTINUITY_FORMAT_ONLY, NULL, false);
1979
1980 // queue packets in mPacketSource2 to mPacketSource
1981 status_t finalResult = OK;
1982 sp<ABuffer> accessUnit;
1983 while (aps2->hasBufferAvailable(&finalResult) && finalResult == OK &&
1984 OK == aps2->dequeueAccessUnit(&accessUnit)) {
1985 aps->queueAccessUnit(accessUnit);
1986 }
1987 aps2->clear();
1988 }
1989
tryToFinishBandwidthSwitch(const AString & oldUri)1990 void LiveSession::tryToFinishBandwidthSwitch(const AString &oldUri) {
1991 if (!mSwitchInProgress) {
1992 return;
1993 }
1994
1995 ssize_t index = mFetcherInfos.indexOfKey(oldUri);
1996 if (index < 0 || !mFetcherInfos[index].mToBeRemoved) {
1997 return;
1998 }
1999
2000 // Swap packet source of streams provided by old variant
2001 for (size_t idx = 0; idx < kMaxStreams; idx++) {
2002 StreamType stream = indexToType(idx);
2003 if ((mSwapMask & stream) && (oldUri == mStreams[idx].mUri)) {
2004 swapPacketSource(stream);
2005
2006 if ((mNewStreamMask & stream) && mStreams[idx].mNewUri.empty()) {
2007 ALOGW("swapping stream type %d %s to empty stream",
2008 stream, mStreams[idx].mUri.c_str());
2009 }
2010 mStreams[idx].mUri = mStreams[idx].mNewUri;
2011 mStreams[idx].mNewUri.clear();
2012
2013 mSwapMask &= ~stream;
2014 }
2015 }
2016
2017 mFetcherInfos.editValueAt(index).mFetcher->stopAsync(false /* clear */);
2018
2019 ALOGV("tryToFinishBandwidthSwitch: mSwapMask=0x%x", mSwapMask);
2020 if (mSwapMask != 0) {
2021 return;
2022 }
2023
2024 // Check if new variant contains extra streams.
2025 uint32_t extraStreams = mNewStreamMask & (~mStreamMask);
2026 while (extraStreams) {
2027 StreamType stream = (StreamType) (extraStreams & ~(extraStreams - 1));
2028 extraStreams &= ~stream;
2029
2030 swapPacketSource(stream);
2031
2032 ssize_t idx = typeToIndex(stream);
2033 CHECK(idx >= 0);
2034 if (mStreams[idx].mNewUri.empty()) {
2035 ALOGW("swapping extra stream type %d %s to empty stream",
2036 stream, mStreams[idx].mUri.c_str());
2037 }
2038 mStreams[idx].mUri = mStreams[idx].mNewUri;
2039 mStreams[idx].mNewUri.clear();
2040 }
2041
2042 // Restart new fetcher (it was paused after the first 47k block)
2043 // and let it fetch into mPacketSources (not mPacketSources2)
2044 for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
2045 FetcherInfo &info = mFetcherInfos.editValueAt(i);
2046 if (info.mToBeResumed) {
2047 resumeFetcher(mFetcherInfos.keyAt(i), mNewStreamMask);
2048 info.mToBeResumed = false;
2049 }
2050 }
2051
2052 ALOGI("#### Finished Bandwidth Switch: %zd => %zd",
2053 mOrigBandwidthIndex, mCurBandwidthIndex);
2054
2055 mStreamMask = mNewStreamMask;
2056 mSwitchInProgress = false;
2057 mOrigBandwidthIndex = mCurBandwidthIndex;
2058
2059 restartPollBuffering();
2060 }
2061
schedulePollBuffering()2062 void LiveSession::schedulePollBuffering() {
2063 sp<AMessage> msg = new AMessage(kWhatPollBuffering, this);
2064 msg->setInt32("generation", mPollBufferingGeneration);
2065 msg->post(1000000ll);
2066 }
2067
cancelPollBuffering()2068 void LiveSession::cancelPollBuffering() {
2069 ++mPollBufferingGeneration;
2070 mPrevBufferPercentage = -1;
2071 }
2072
restartPollBuffering()2073 void LiveSession::restartPollBuffering() {
2074 cancelPollBuffering();
2075 onPollBuffering();
2076 }
2077
onPollBuffering()2078 void LiveSession::onPollBuffering() {
2079 ALOGV("onPollBuffering: mSwitchInProgress %d, mReconfigurationInProgress %d, "
2080 "mInPreparationPhase %d, mCurBandwidthIndex %zd, mStreamMask 0x%x",
2081 mSwitchInProgress, mReconfigurationInProgress,
2082 mInPreparationPhase, mCurBandwidthIndex, mStreamMask);
2083
2084 bool underflow, ready, down, up;
2085 if (checkBuffering(underflow, ready, down, up)) {
2086 if (mInPreparationPhase) {
2087 // Allow down switch even if we're still preparing.
2088 //
2089 // Some streams have a high bandwidth index as default,
2090 // when bandwidth is low, it takes a long time to buffer
2091 // to ready mark, then it immediately pauses after start
2092 // as we have to do a down switch. It's better experience
2093 // to restart from a lower index, if we detect low bw.
2094 if (!switchBandwidthIfNeeded(false /* up */, down) && ready) {
2095 postPrepared(OK);
2096 }
2097 }
2098
2099 if (!mInPreparationPhase) {
2100 if (ready) {
2101 stopBufferingIfNecessary();
2102 } else if (underflow) {
2103 startBufferingIfNecessary();
2104 }
2105 switchBandwidthIfNeeded(up, down);
2106 }
2107 }
2108
2109 schedulePollBuffering();
2110 }
2111
cancelBandwidthSwitch(bool resume)2112 void LiveSession::cancelBandwidthSwitch(bool resume) {
2113 ALOGV("cancelBandwidthSwitch: mSwitchGen(%d)++, orig %zd, cur %zd",
2114 mSwitchGeneration, mOrigBandwidthIndex, mCurBandwidthIndex);
2115 if (!mSwitchInProgress) {
2116 return;
2117 }
2118
2119 for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
2120 FetcherInfo& info = mFetcherInfos.editValueAt(i);
2121 if (info.mToBeRemoved) {
2122 info.mToBeRemoved = false;
2123 if (resume) {
2124 resumeFetcher(mFetcherInfos.keyAt(i), mSwapMask);
2125 }
2126 }
2127 }
2128
2129 for (size_t i = 0; i < kMaxStreams; ++i) {
2130 AString newUri = mStreams[i].mNewUri;
2131 if (!newUri.empty()) {
2132 // clear all mNewUri matching this newUri
2133 for (size_t j = i; j < kMaxStreams; ++j) {
2134 if (mStreams[j].mNewUri == newUri) {
2135 mStreams[j].mNewUri.clear();
2136 }
2137 }
2138 ALOGV("stopping newUri = %s", newUri.c_str());
2139 ssize_t index = mFetcherInfos.indexOfKey(newUri);
2140 if (index < 0) {
2141 ALOGE("did not find fetcher for newUri: %s", newUri.c_str());
2142 continue;
2143 }
2144 FetcherInfo &info = mFetcherInfos.editValueAt(index);
2145 info.mToBeRemoved = true;
2146 info.mFetcher->stopAsync();
2147 }
2148 }
2149
2150 ALOGI("#### Canceled Bandwidth Switch: %zd => %zd",
2151 mOrigBandwidthIndex, mCurBandwidthIndex);
2152
2153 mSwitchGeneration++;
2154 mSwitchInProgress = false;
2155 mCurBandwidthIndex = mOrigBandwidthIndex;
2156 mSwapMask = 0;
2157 }
2158
checkBuffering(bool & underflow,bool & ready,bool & down,bool & up)2159 bool LiveSession::checkBuffering(
2160 bool &underflow, bool &ready, bool &down, bool &up) {
2161 underflow = ready = down = up = false;
2162
2163 if (mReconfigurationInProgress) {
2164 ALOGV("Switch/Reconfig in progress, defer buffer polling");
2165 return false;
2166 }
2167
2168 size_t activeCount, underflowCount, readyCount, downCount, upCount;
2169 activeCount = underflowCount = readyCount = downCount = upCount =0;
2170 int32_t minBufferPercent = -1;
2171 int64_t durationUs;
2172 if (getDuration(&durationUs) != OK) {
2173 durationUs = -1;
2174 }
2175 for (size_t i = 0; i < mPacketSources.size(); ++i) {
2176 // we don't check subtitles for buffering level
2177 if (!(mStreamMask & mPacketSources.keyAt(i)
2178 & (STREAMTYPE_AUDIO | STREAMTYPE_VIDEO))) {
2179 continue;
2180 }
2181 // ignore streams that never had any packet queued.
2182 // (it's possible that the variant only has audio or video)
2183 sp<AMessage> meta = mPacketSources[i]->getLatestEnqueuedMeta();
2184 if (meta == NULL) {
2185 continue;
2186 }
2187
2188 status_t finalResult;
2189 int64_t bufferedDurationUs =
2190 mPacketSources[i]->getBufferedDurationUs(&finalResult);
2191 ALOGV("[%s] buffered %lld us",
2192 getNameForStream(mPacketSources.keyAt(i)),
2193 (long long)bufferedDurationUs);
2194 if (durationUs >= 0) {
2195 int32_t percent;
2196 if (mPacketSources[i]->isFinished(0 /* duration */)) {
2197 percent = 100;
2198 } else {
2199 percent = (int32_t)(100.0 *
2200 (mLastDequeuedTimeUs + bufferedDurationUs) / durationUs);
2201 }
2202 if (minBufferPercent < 0 || percent < minBufferPercent) {
2203 minBufferPercent = percent;
2204 }
2205 }
2206
2207 ++activeCount;
2208 int64_t readyMarkUs =
2209 (mInPreparationPhase ?
2210 mBufferingSettings.mInitialMarkMs :
2211 mBufferingSettings.mResumePlaybackMarkMs) * 1000ll;
2212 if (bufferedDurationUs > readyMarkUs
2213 || mPacketSources[i]->isFinished(0)) {
2214 ++readyCount;
2215 }
2216 if (!mPacketSources[i]->isFinished(0)) {
2217 if (bufferedDurationUs < kUnderflowMarkMs * 1000ll) {
2218 ++underflowCount;
2219 }
2220 if (bufferedDurationUs > mUpSwitchMark) {
2221 ++upCount;
2222 }
2223 if (bufferedDurationUs < mDownSwitchMark) {
2224 ++downCount;
2225 }
2226 }
2227 }
2228
2229 if (minBufferPercent >= 0) {
2230 notifyBufferingUpdate(minBufferPercent);
2231 }
2232
2233 if (activeCount > 0) {
2234 up = (upCount == activeCount);
2235 down = (downCount > 0);
2236 ready = (readyCount == activeCount);
2237 underflow = (underflowCount > 0);
2238 return true;
2239 }
2240
2241 return false;
2242 }
2243
startBufferingIfNecessary()2244 void LiveSession::startBufferingIfNecessary() {
2245 ALOGV("startBufferingIfNecessary: mInPreparationPhase=%d, mBuffering=%d",
2246 mInPreparationPhase, mBuffering);
2247 if (!mBuffering) {
2248 mBuffering = true;
2249
2250 sp<AMessage> notify = mNotify->dup();
2251 notify->setInt32("what", kWhatBufferingStart);
2252 notify->post();
2253 }
2254 }
2255
stopBufferingIfNecessary()2256 void LiveSession::stopBufferingIfNecessary() {
2257 ALOGV("stopBufferingIfNecessary: mInPreparationPhase=%d, mBuffering=%d",
2258 mInPreparationPhase, mBuffering);
2259
2260 if (mBuffering) {
2261 mBuffering = false;
2262
2263 sp<AMessage> notify = mNotify->dup();
2264 notify->setInt32("what", kWhatBufferingEnd);
2265 notify->post();
2266 }
2267 }
2268
notifyBufferingUpdate(int32_t percentage)2269 void LiveSession::notifyBufferingUpdate(int32_t percentage) {
2270 if (percentage < mPrevBufferPercentage) {
2271 percentage = mPrevBufferPercentage;
2272 } else if (percentage > 100) {
2273 percentage = 100;
2274 }
2275
2276 mPrevBufferPercentage = percentage;
2277
2278 ALOGV("notifyBufferingUpdate: percentage=%d%%", percentage);
2279
2280 sp<AMessage> notify = mNotify->dup();
2281 notify->setInt32("what", kWhatBufferingUpdate);
2282 notify->setInt32("percentage", percentage);
2283 notify->post();
2284 }
2285
tryBandwidthFallback()2286 bool LiveSession::tryBandwidthFallback() {
2287 if (mInPreparationPhase || mReconfigurationInProgress) {
2288 // Don't try fallback during prepare or reconfig.
2289 // If error happens there, it's likely unrecoverable.
2290 return false;
2291 }
2292 if (mCurBandwidthIndex > mOrigBandwidthIndex) {
2293 // if we're switching up, simply cancel and resume old variant
2294 cancelBandwidthSwitch(true /* resume */);
2295 return true;
2296 } else {
2297 // if we're switching down, we're likely about to underflow (if
2298 // not already underflowing). try the lowest viable bandwidth if
2299 // not on that variant already.
2300 ssize_t lowestValid = getLowestValidBandwidthIndex();
2301 if (mCurBandwidthIndex > lowestValid) {
2302 cancelBandwidthSwitch();
2303 changeConfiguration(-1ll, lowestValid);
2304 return true;
2305 }
2306 }
2307 // return false if we couldn't find any fallback
2308 return false;
2309 }
2310
2311 /*
2312 * returns true if a bandwidth switch is actually needed (and started),
2313 * returns false otherwise
2314 */
switchBandwidthIfNeeded(bool bufferHigh,bool bufferLow)2315 bool LiveSession::switchBandwidthIfNeeded(bool bufferHigh, bool bufferLow) {
2316 // no need to check bandwidth if we only have 1 bandwidth settings
2317 if (mBandwidthItems.size() < 2) {
2318 return false;
2319 }
2320
2321 if (mSwitchInProgress) {
2322 if (mBuffering) {
2323 tryBandwidthFallback();
2324 }
2325 return false;
2326 }
2327
2328 int32_t bandwidthBps, shortTermBps;
2329 bool isStable;
2330 if (mBandwidthEstimator->estimateBandwidth(
2331 &bandwidthBps, &isStable, &shortTermBps)) {
2332 ALOGV("bandwidth estimated at %.2f kbps, "
2333 "stable %d, shortTermBps %.2f kbps",
2334 bandwidthBps / 1024.0f, isStable, shortTermBps / 1024.0f);
2335 mLastBandwidthBps = bandwidthBps;
2336 mLastBandwidthStable = isStable;
2337 } else {
2338 ALOGV("no bandwidth estimate.");
2339 return false;
2340 }
2341
2342 int32_t curBandwidth = mBandwidthItems.itemAt(mCurBandwidthIndex).mBandwidth;
2343 // canSwithDown and canSwitchUp can't both be true.
2344 // we only want to switch up when measured bw is 120% higher than current variant,
2345 // and we only want to switch down when measured bw is below current variant.
2346 bool canSwitchDown = bufferLow
2347 && (bandwidthBps < (int32_t)curBandwidth);
2348 bool canSwitchUp = bufferHigh
2349 && (bandwidthBps > (int32_t)curBandwidth * 12 / 10);
2350
2351 if (canSwitchDown || canSwitchUp) {
2352 // bandwidth estimating has some delay, if we have to downswitch when
2353 // it hasn't stabilized, use the short term to guess real bandwidth,
2354 // since it may be dropping too fast.
2355 // (note this doesn't apply to upswitch, always use longer average there)
2356 if (!isStable && canSwitchDown) {
2357 if (shortTermBps < bandwidthBps) {
2358 bandwidthBps = shortTermBps;
2359 }
2360 }
2361
2362 ssize_t bandwidthIndex = getBandwidthIndex(bandwidthBps);
2363
2364 // it's possible that we're checking for canSwitchUp case, but the returned
2365 // bandwidthIndex is < mCurBandwidthIndex, as getBandwidthIndex() only uses 70%
2366 // of measured bw. In that case we don't want to do anything, since we have
2367 // both enough buffer and enough bw.
2368 if ((canSwitchUp && bandwidthIndex > mCurBandwidthIndex)
2369 || (canSwitchDown && bandwidthIndex < mCurBandwidthIndex)) {
2370 // if not yet prepared, just restart again with new bw index.
2371 // this is faster and playback experience is cleaner.
2372 changeConfiguration(
2373 mInPreparationPhase ? 0 : -1ll, bandwidthIndex);
2374 return true;
2375 }
2376 }
2377 return false;
2378 }
2379
postError(status_t err)2380 void LiveSession::postError(status_t err) {
2381 // if we reached EOS, notify buffering of 100%
2382 if (err == ERROR_END_OF_STREAM) {
2383 notifyBufferingUpdate(100);
2384 }
2385 // we'll stop buffer polling now, before that notify
2386 // stop buffering to stop the spinning icon
2387 stopBufferingIfNecessary();
2388 cancelPollBuffering();
2389
2390 sp<AMessage> notify = mNotify->dup();
2391 notify->setInt32("what", kWhatError);
2392 notify->setInt32("err", err);
2393 notify->post();
2394 }
2395
postPrepared(status_t err)2396 void LiveSession::postPrepared(status_t err) {
2397 CHECK(mInPreparationPhase);
2398
2399 sp<AMessage> notify = mNotify->dup();
2400 if (err == OK || err == ERROR_END_OF_STREAM) {
2401 notify->setInt32("what", kWhatPrepared);
2402 } else {
2403 cancelPollBuffering();
2404
2405 notify->setInt32("what", kWhatPreparationFailed);
2406 notify->setInt32("err", err);
2407 }
2408
2409 notify->post();
2410
2411 mInPreparationPhase = false;
2412 }
2413
2414
2415 } // namespace android
2416
2417