1 /*
2  * Copyright (C) 2010 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 //#define LOG_NDEBUG 0
18 #define LOG_TAG "LiveSession"
19 #include <utils/Log.h>
20 
21 #include "LiveSession.h"
22 #include "HTTPDownloader.h"
23 #include "M3UParser.h"
24 #include "PlaylistFetcher.h"
25 
26 #include "mpeg2ts/AnotherPacketSource.h"
27 
28 #include <cutils/properties.h>
29 #include <media/IMediaHTTPService.h>
30 #include <media/stagefright/foundation/ABuffer.h>
31 #include <media/stagefright/foundation/ADebug.h>
32 #include <media/stagefright/foundation/AMessage.h>
33 #include <media/stagefright/foundation/AUtils.h>
34 #include <media/stagefright/MediaDefs.h>
35 #include <media/stagefright/MetaData.h>
36 #include <media/stagefright/Utils.h>
37 
38 #include <utils/Mutex.h>
39 
40 #include <ctype.h>
41 #include <inttypes.h>
42 
43 namespace android {
44 
45 // static
46 // Bandwidth Switch Mark Defaults
47 const int64_t LiveSession::kUpSwitchMarkUs = 15000000ll;
48 const int64_t LiveSession::kDownSwitchMarkUs = 20000000ll;
49 const int64_t LiveSession::kUpSwitchMarginUs = 5000000ll;
50 const int64_t LiveSession::kResumeThresholdUs = 100000ll;
51 
52 struct LiveSession::BandwidthEstimator : public RefBase {
53     BandwidthEstimator();
54 
55     void addBandwidthMeasurement(size_t numBytes, int64_t delayUs);
56     bool estimateBandwidth(
57             int32_t *bandwidth,
58             bool *isStable = NULL,
59             int32_t *shortTermBps = NULL);
60 
61 private:
62     // Bandwidth estimation parameters
63     static const int32_t kShortTermBandwidthItems = 3;
64     static const int32_t kMinBandwidthHistoryItems = 20;
65     static const int64_t kMinBandwidthHistoryWindowUs = 5000000ll; // 5 sec
66     static const int64_t kMaxBandwidthHistoryWindowUs = 30000000ll; // 30 sec
67     static const int64_t kMaxBandwidthHistoryAgeUs = 60000000ll; // 60 sec
68 
69     struct BandwidthEntry {
70         int64_t mTimestampUs;
71         int64_t mDelayUs;
72         size_t mNumBytes;
73     };
74 
75     Mutex mLock;
76     List<BandwidthEntry> mBandwidthHistory;
77     List<int32_t> mPrevEstimates;
78     int32_t mShortTermEstimate;
79     bool mHasNewSample;
80     bool mIsStable;
81     int64_t mTotalTransferTimeUs;
82     size_t mTotalTransferBytes;
83 
84     DISALLOW_EVIL_CONSTRUCTORS(BandwidthEstimator);
85 };
86 
BandwidthEstimator()87 LiveSession::BandwidthEstimator::BandwidthEstimator() :
88     mShortTermEstimate(0),
89     mHasNewSample(false),
90     mIsStable(true),
91     mTotalTransferTimeUs(0),
92     mTotalTransferBytes(0) {
93 }
94 
addBandwidthMeasurement(size_t numBytes,int64_t delayUs)95 void LiveSession::BandwidthEstimator::addBandwidthMeasurement(
96         size_t numBytes, int64_t delayUs) {
97     AutoMutex autoLock(mLock);
98 
99     int64_t nowUs = ALooper::GetNowUs();
100     BandwidthEntry entry;
101     entry.mTimestampUs = nowUs;
102     entry.mDelayUs = delayUs;
103     entry.mNumBytes = numBytes;
104     mTotalTransferTimeUs += delayUs;
105     mTotalTransferBytes += numBytes;
106     mBandwidthHistory.push_back(entry);
107     mHasNewSample = true;
108 
109     // Remove no more than 10% of total transfer time at a time
110     // to avoid sudden jump on bandwidth estimation. There might
111     // be long blocking reads that takes up signification time,
112     // we have to keep a longer window in that case.
113     int64_t bandwidthHistoryWindowUs = mTotalTransferTimeUs * 9 / 10;
114     if (bandwidthHistoryWindowUs < kMinBandwidthHistoryWindowUs) {
115         bandwidthHistoryWindowUs = kMinBandwidthHistoryWindowUs;
116     } else if (bandwidthHistoryWindowUs > kMaxBandwidthHistoryWindowUs) {
117         bandwidthHistoryWindowUs = kMaxBandwidthHistoryWindowUs;
118     }
119     // trim old samples, keeping at least kMaxBandwidthHistoryItems samples,
120     // and total transfer time at least kMaxBandwidthHistoryWindowUs.
121     while (mBandwidthHistory.size() > kMinBandwidthHistoryItems) {
122         List<BandwidthEntry>::iterator it = mBandwidthHistory.begin();
123         // remove sample if either absolute age or total transfer time is
124         // over kMaxBandwidthHistoryWindowUs
125         if (nowUs - it->mTimestampUs < kMaxBandwidthHistoryAgeUs &&
126                 mTotalTransferTimeUs - it->mDelayUs < bandwidthHistoryWindowUs) {
127             break;
128         }
129         mTotalTransferTimeUs -= it->mDelayUs;
130         mTotalTransferBytes -= it->mNumBytes;
131         mBandwidthHistory.erase(mBandwidthHistory.begin());
132     }
133 }
134 
estimateBandwidth(int32_t * bandwidthBps,bool * isStable,int32_t * shortTermBps)135 bool LiveSession::BandwidthEstimator::estimateBandwidth(
136         int32_t *bandwidthBps, bool *isStable, int32_t *shortTermBps) {
137     AutoMutex autoLock(mLock);
138 
139     if (mBandwidthHistory.size() < 2) {
140         return false;
141     }
142 
143     if (!mHasNewSample) {
144         *bandwidthBps = *(--mPrevEstimates.end());
145         if (isStable) {
146             *isStable = mIsStable;
147         }
148         if (shortTermBps) {
149             *shortTermBps = mShortTermEstimate;
150         }
151         return true;
152     }
153 
154     *bandwidthBps = ((double)mTotalTransferBytes * 8E6 / mTotalTransferTimeUs);
155     mPrevEstimates.push_back(*bandwidthBps);
156     while (mPrevEstimates.size() > 3) {
157         mPrevEstimates.erase(mPrevEstimates.begin());
158     }
159     mHasNewSample = false;
160 
161     int64_t totalTimeUs = 0;
162     size_t totalBytes = 0;
163     if (mBandwidthHistory.size() >= kShortTermBandwidthItems) {
164         List<BandwidthEntry>::iterator it = --mBandwidthHistory.end();
165         for (size_t i = 0; i < kShortTermBandwidthItems; i++, it--) {
166             totalTimeUs += it->mDelayUs;
167             totalBytes += it->mNumBytes;
168         }
169     }
170     mShortTermEstimate = totalTimeUs > 0 ?
171             (totalBytes * 8E6 / totalTimeUs) : *bandwidthBps;
172     if (shortTermBps) {
173         *shortTermBps = mShortTermEstimate;
174     }
175 
176     int64_t minEstimate = -1, maxEstimate = -1;
177     List<int32_t>::iterator it;
178     for (it = mPrevEstimates.begin(); it != mPrevEstimates.end(); it++) {
179         int32_t estimate = *it;
180         if (minEstimate < 0 || minEstimate > estimate) {
181             minEstimate = estimate;
182         }
183         if (maxEstimate < 0 || maxEstimate < estimate) {
184             maxEstimate = estimate;
185         }
186     }
187     // consider it stable if long-term average is not jumping a lot
188     // and short-term average is not much lower than long-term average
189     mIsStable = (maxEstimate <= minEstimate * 4 / 3)
190             && mShortTermEstimate > minEstimate * 7 / 10;
191     if (isStable) {
192         *isStable = mIsStable;
193     }
194 
195 #if 0
196     {
197         char dumpStr[1024] = {0};
198         size_t itemIdx = 0;
199         size_t histSize = mBandwidthHistory.size();
200         sprintf(dumpStr, "estimate bps=%d stable=%d history (n=%d): {",
201             *bandwidthBps, mIsStable, histSize);
202         List<BandwidthEntry>::iterator it = mBandwidthHistory.begin();
203         for (; it != mBandwidthHistory.end(); ++it) {
204             if (itemIdx > 50) {
205                 sprintf(dumpStr + strlen(dumpStr),
206                         "...(%zd more items)... }", histSize - itemIdx);
207                 break;
208             }
209             sprintf(dumpStr + strlen(dumpStr), "%dk/%.3fs%s",
210                 it->mNumBytes / 1024,
211                 (double)it->mDelayUs * 1.0e-6,
212                 (it == (--mBandwidthHistory.end())) ? "}" : ", ");
213             itemIdx++;
214         }
215         ALOGE(dumpStr);
216     }
217 #endif
218     return true;
219 }
220 
221 //static
getKeyForStream(StreamType type)222 const char *LiveSession::getKeyForStream(StreamType type) {
223     switch (type) {
224         case STREAMTYPE_VIDEO:
225             return "timeUsVideo";
226         case STREAMTYPE_AUDIO:
227             return "timeUsAudio";
228         case STREAMTYPE_SUBTITLES:
229             return "timeUsSubtitle";
230         case STREAMTYPE_METADATA:
231             return "timeUsMetadata"; // unused
232         default:
233             TRESPASS();
234     }
235     return NULL;
236 }
237 
238 //static
getNameForStream(StreamType type)239 const char *LiveSession::getNameForStream(StreamType type) {
240     switch (type) {
241         case STREAMTYPE_VIDEO:
242             return "video";
243         case STREAMTYPE_AUDIO:
244             return "audio";
245         case STREAMTYPE_SUBTITLES:
246             return "subs";
247         case STREAMTYPE_METADATA:
248             return "metadata";
249         default:
250             break;
251     }
252     return "unknown";
253 }
254 
255 //static
getSourceTypeForStream(StreamType type)256 ATSParser::SourceType LiveSession::getSourceTypeForStream(StreamType type) {
257     switch (type) {
258         case STREAMTYPE_VIDEO:
259             return ATSParser::VIDEO;
260         case STREAMTYPE_AUDIO:
261             return ATSParser::AUDIO;
262         case STREAMTYPE_METADATA:
263             return ATSParser::META;
264         case STREAMTYPE_SUBTITLES:
265         default:
266             TRESPASS();
267     }
268     return ATSParser::NUM_SOURCE_TYPES; // should not reach here
269 }
270 
LiveSession(const sp<AMessage> & notify,uint32_t flags,const sp<IMediaHTTPService> & httpService)271 LiveSession::LiveSession(
272         const sp<AMessage> &notify, uint32_t flags,
273         const sp<IMediaHTTPService> &httpService)
274     : mNotify(notify),
275       mFlags(flags),
276       mHTTPService(httpService),
277       mBuffering(false),
278       mInPreparationPhase(true),
279       mPollBufferingGeneration(0),
280       mPrevBufferPercentage(-1),
281       mCurBandwidthIndex(-1),
282       mOrigBandwidthIndex(-1),
283       mLastBandwidthBps(-1ll),
284       mLastBandwidthStable(false),
285       mBandwidthEstimator(new BandwidthEstimator()),
286       mMaxWidth(720),
287       mMaxHeight(480),
288       mStreamMask(0),
289       mNewStreamMask(0),
290       mSwapMask(0),
291       mSwitchGeneration(0),
292       mSubtitleGeneration(0),
293       mLastDequeuedTimeUs(0ll),
294       mRealTimeBaseUs(0ll),
295       mReconfigurationInProgress(false),
296       mSwitchInProgress(false),
297       mUpSwitchMark(kUpSwitchMarkUs),
298       mDownSwitchMark(kDownSwitchMarkUs),
299       mUpSwitchMargin(kUpSwitchMarginUs),
300       mFirstTimeUsValid(false),
301       mFirstTimeUs(0),
302       mLastSeekTimeUs(0),
303       mHasMetadata(false) {
304     mStreams[kAudioIndex] = StreamItem("audio");
305     mStreams[kVideoIndex] = StreamItem("video");
306     mStreams[kSubtitleIndex] = StreamItem("subtitles");
307 
308     for (size_t i = 0; i < kNumSources; ++i) {
309         mPacketSources.add(indexToType(i), new AnotherPacketSource(NULL /* meta */));
310         mPacketSources2.add(indexToType(i), new AnotherPacketSource(NULL /* meta */));
311     }
312 }
313 
~LiveSession()314 LiveSession::~LiveSession() {
315     if (mFetcherLooper != NULL) {
316         mFetcherLooper->stop();
317     }
318 }
319 
calculateMediaTimeUs(int64_t firstTimeUs,int64_t timeUs,int32_t discontinuitySeq)320 int64_t LiveSession::calculateMediaTimeUs(
321         int64_t firstTimeUs, int64_t timeUs, int32_t discontinuitySeq) {
322     if (timeUs >= firstTimeUs) {
323         timeUs -= firstTimeUs;
324     } else {
325         timeUs = 0;
326     }
327     timeUs += mLastSeekTimeUs;
328     if (mDiscontinuityOffsetTimesUs.indexOfKey(discontinuitySeq) >= 0) {
329         timeUs += mDiscontinuityOffsetTimesUs.valueFor(discontinuitySeq);
330     }
331     return timeUs;
332 }
333 
dequeueAccessUnit(StreamType stream,sp<ABuffer> * accessUnit)334 status_t LiveSession::dequeueAccessUnit(
335         StreamType stream, sp<ABuffer> *accessUnit) {
336     status_t finalResult = OK;
337     sp<AnotherPacketSource> packetSource = mPacketSources.valueFor(stream);
338 
339     ssize_t streamIdx = typeToIndex(stream);
340     if (streamIdx < 0) {
341         return BAD_VALUE;
342     }
343     const char *streamStr = getNameForStream(stream);
344     // Do not let client pull data if we don't have data packets yet.
345     // We might only have a format discontinuity queued without data.
346     // When NuPlayerDecoder dequeues the format discontinuity, it will
347     // immediately try to getFormat. If we return NULL, NuPlayerDecoder
348     // thinks it can do seamless change, so will not shutdown decoder.
349     // When the actual format arrives, it can't handle it and get stuck.
350     if (!packetSource->hasDataBufferAvailable(&finalResult)) {
351         ALOGV("[%s] dequeueAccessUnit: no buffer available (finalResult=%d)",
352                 streamStr, finalResult);
353 
354         if (finalResult == OK) {
355             return -EAGAIN;
356         } else {
357             return finalResult;
358         }
359     }
360 
361     // Let the client dequeue as long as we have buffers available
362     // Do not make pause/resume decisions here.
363 
364     status_t err = packetSource->dequeueAccessUnit(accessUnit);
365 
366     if (err == INFO_DISCONTINUITY) {
367         // adaptive streaming, discontinuities in the playlist
368         int32_t type;
369         CHECK((*accessUnit)->meta()->findInt32("discontinuity", &type));
370 
371         sp<AMessage> extra;
372         if (!(*accessUnit)->meta()->findMessage("extra", &extra)) {
373             extra.clear();
374         }
375 
376         ALOGI("[%s] read discontinuity of type %d, extra = %s",
377               streamStr,
378               type,
379               extra == NULL ? "NULL" : extra->debugString().c_str());
380     } else if (err == OK) {
381 
382         if (stream == STREAMTYPE_AUDIO || stream == STREAMTYPE_VIDEO) {
383             int64_t timeUs, originalTimeUs;
384             int32_t discontinuitySeq = 0;
385             StreamItem& strm = mStreams[streamIdx];
386             CHECK((*accessUnit)->meta()->findInt64("timeUs",  &timeUs));
387             originalTimeUs = timeUs;
388             (*accessUnit)->meta()->findInt32("discontinuitySeq", &discontinuitySeq);
389             if (discontinuitySeq > (int32_t) strm.mCurDiscontinuitySeq) {
390                 int64_t offsetTimeUs;
391                 if (mDiscontinuityOffsetTimesUs.indexOfKey(strm.mCurDiscontinuitySeq) >= 0) {
392                     offsetTimeUs = mDiscontinuityOffsetTimesUs.valueFor(strm.mCurDiscontinuitySeq);
393                 } else {
394                     offsetTimeUs = 0;
395                 }
396 
397                 if (mDiscontinuityAbsStartTimesUs.indexOfKey(strm.mCurDiscontinuitySeq) >= 0
398                         && strm.mLastDequeuedTimeUs >= 0) {
399                     int64_t firstTimeUs;
400                     firstTimeUs = mDiscontinuityAbsStartTimesUs.valueFor(strm.mCurDiscontinuitySeq);
401                     offsetTimeUs += strm.mLastDequeuedTimeUs - firstTimeUs;
402                     offsetTimeUs += strm.mLastSampleDurationUs;
403                 } else {
404                     offsetTimeUs += strm.mLastSampleDurationUs;
405                 }
406 
407                 mDiscontinuityOffsetTimesUs.add(discontinuitySeq, offsetTimeUs);
408                 strm.mCurDiscontinuitySeq = discontinuitySeq;
409             }
410 
411             int32_t discard = 0;
412             int64_t firstTimeUs;
413             if (mDiscontinuityAbsStartTimesUs.indexOfKey(strm.mCurDiscontinuitySeq) >= 0) {
414                 int64_t durUs; // approximate sample duration
415                 if (timeUs > strm.mLastDequeuedTimeUs) {
416                     durUs = timeUs - strm.mLastDequeuedTimeUs;
417                 } else {
418                     durUs = strm.mLastDequeuedTimeUs - timeUs;
419                 }
420                 strm.mLastSampleDurationUs = durUs;
421                 firstTimeUs = mDiscontinuityAbsStartTimesUs.valueFor(strm.mCurDiscontinuitySeq);
422             } else if ((*accessUnit)->meta()->findInt32("discard", &discard) && discard) {
423                 firstTimeUs = timeUs;
424             } else {
425                 mDiscontinuityAbsStartTimesUs.add(strm.mCurDiscontinuitySeq, timeUs);
426                 firstTimeUs = timeUs;
427             }
428 
429             strm.mLastDequeuedTimeUs = timeUs;
430             timeUs = calculateMediaTimeUs(firstTimeUs, timeUs, discontinuitySeq);
431 
432             ALOGV("[%s] dequeueAccessUnit: time %lld us, original %lld us",
433                     streamStr, (long long)timeUs, (long long)originalTimeUs);
434             (*accessUnit)->meta()->setInt64("timeUs",  timeUs);
435             mLastDequeuedTimeUs = timeUs;
436             mRealTimeBaseUs = ALooper::GetNowUs() - timeUs;
437         } else if (stream == STREAMTYPE_SUBTITLES) {
438             int32_t subtitleGeneration;
439             if ((*accessUnit)->meta()->findInt32("subtitleGeneration", &subtitleGeneration)
440                     && subtitleGeneration != mSubtitleGeneration) {
441                return -EAGAIN;
442             };
443             (*accessUnit)->meta()->setInt32(
444                     "trackIndex", mPlaylist->getSelectedIndex());
445             (*accessUnit)->meta()->setInt64("baseUs", mRealTimeBaseUs);
446         } else if (stream == STREAMTYPE_METADATA) {
447             HLSTime mdTime((*accessUnit)->meta());
448             if (mDiscontinuityAbsStartTimesUs.indexOfKey(mdTime.mSeq) < 0) {
449                 packetSource->requeueAccessUnit((*accessUnit));
450                 return -EAGAIN;
451             } else {
452                 int64_t firstTimeUs = mDiscontinuityAbsStartTimesUs.valueFor(mdTime.mSeq);
453                 int64_t timeUs = calculateMediaTimeUs(firstTimeUs, mdTime.mTimeUs, mdTime.mSeq);
454                 (*accessUnit)->meta()->setInt64("timeUs",  timeUs);
455                 (*accessUnit)->meta()->setInt64("baseUs", mRealTimeBaseUs);
456             }
457         }
458     } else {
459         ALOGI("[%s] encountered error %d", streamStr, err);
460     }
461 
462     return err;
463 }
464 
getStreamFormatMeta(StreamType stream,sp<MetaData> * meta)465 status_t LiveSession::getStreamFormatMeta(StreamType stream, sp<MetaData> *meta) {
466     if (!(mStreamMask & stream)) {
467         return UNKNOWN_ERROR;
468     }
469 
470     sp<AnotherPacketSource> packetSource = mPacketSources.valueFor(stream);
471 
472     *meta = packetSource->getFormat();
473 
474     if (*meta == NULL) {
475         return -EWOULDBLOCK;
476     }
477 
478     if (stream == STREAMTYPE_AUDIO) {
479         // set AAC input buffer size to 32K bytes (256kbps x 1sec)
480         (*meta)->setInt32(kKeyMaxInputSize, 32 * 1024);
481     } else if (stream == STREAMTYPE_VIDEO) {
482         (*meta)->setInt32(kKeyMaxWidth, mMaxWidth);
483         (*meta)->setInt32(kKeyMaxHeight, mMaxHeight);
484     }
485 
486     return OK;
487 }
488 
getHTTPDownloader()489 sp<HTTPDownloader> LiveSession::getHTTPDownloader() {
490     return new HTTPDownloader(mHTTPService, mExtraHeaders);
491 }
492 
setBufferingSettings(const BufferingSettings & buffering)493 void LiveSession::setBufferingSettings(
494         const BufferingSettings &buffering) {
495     sp<AMessage> msg = new AMessage(kWhatSetBufferingSettings, this);
496     writeToAMessage(msg, buffering);
497     msg->post();
498 }
499 
connectAsync(const char * url,const KeyedVector<String8,String8> * headers)500 void LiveSession::connectAsync(
501         const char *url, const KeyedVector<String8, String8> *headers) {
502     sp<AMessage> msg = new AMessage(kWhatConnect, this);
503     msg->setString("url", url);
504 
505     if (headers != NULL) {
506         msg->setPointer(
507                 "headers",
508                 new KeyedVector<String8, String8>(*headers));
509     }
510 
511     msg->post();
512 }
513 
disconnect()514 status_t LiveSession::disconnect() {
515     sp<AMessage> msg = new AMessage(kWhatDisconnect, this);
516 
517     sp<AMessage> response;
518     status_t err = msg->postAndAwaitResponse(&response);
519 
520     return err;
521 }
522 
seekTo(int64_t timeUs,MediaPlayerSeekMode mode)523 status_t LiveSession::seekTo(int64_t timeUs, MediaPlayerSeekMode mode) {
524     sp<AMessage> msg = new AMessage(kWhatSeek, this);
525     msg->setInt64("timeUs", timeUs);
526     msg->setInt32("mode", mode);
527 
528     sp<AMessage> response;
529     status_t err = msg->postAndAwaitResponse(&response);
530 
531     return err;
532 }
533 
checkSwitchProgress(sp<AMessage> & stopParams,int64_t delayUs,bool * needResumeUntil)534 bool LiveSession::checkSwitchProgress(
535         sp<AMessage> &stopParams, int64_t delayUs, bool *needResumeUntil) {
536     AString newUri;
537     CHECK(stopParams->findString("uri", &newUri));
538 
539     *needResumeUntil = false;
540     sp<AMessage> firstNewMeta[kMaxStreams];
541     for (size_t i = 0; i < kMaxStreams; ++i) {
542         StreamType stream = indexToType(i);
543         if (!(mSwapMask & mNewStreamMask & stream)
544             || (mStreams[i].mNewUri != newUri)) {
545             continue;
546         }
547         if (stream == STREAMTYPE_SUBTITLES) {
548             continue;
549         }
550         sp<AnotherPacketSource> &source = mPacketSources.editValueAt(i);
551 
552         // First, get latest dequeued meta, which is where the decoder is at.
553         // (when upswitching, we take the meta after a certain delay, so that
554         // the decoder is left with some cushion)
555         sp<AMessage> lastDequeueMeta, lastEnqueueMeta;
556         if (delayUs > 0) {
557             lastDequeueMeta = source->getMetaAfterLastDequeued(delayUs);
558             if (lastDequeueMeta == NULL) {
559                 // this means we don't have enough cushion, try again later
560                 ALOGV("[%s] up switching failed due to insufficient buffer",
561                         getNameForStream(stream));
562                 return false;
563             }
564         } else {
565             // It's okay for lastDequeueMeta to be NULL here, it means the
566             // decoder hasn't even started dequeueing
567             lastDequeueMeta = source->getLatestDequeuedMeta();
568         }
569         // Then, trim off packets at beginning of mPacketSources2 that's before
570         // the latest dequeued time. These samples are definitely too late.
571         firstNewMeta[i] = mPacketSources2.editValueAt(i)
572                             ->trimBuffersBeforeMeta(lastDequeueMeta);
573 
574         // Now firstNewMeta[i] is the first sample after the trim.
575         // If it's NULL, we failed because dequeue already past all samples
576         // in mPacketSource2, we have to try again.
577         if (firstNewMeta[i] == NULL) {
578             HLSTime dequeueTime(lastDequeueMeta);
579             ALOGV("[%s] dequeue time (%d, %lld) past start time",
580                     getNameForStream(stream),
581                     dequeueTime.mSeq, (long long) dequeueTime.mTimeUs);
582             return false;
583         }
584 
585         // Otherwise, we check if mPacketSources2 overlaps with what old fetcher
586         // already fetched, and see if we need to resumeUntil
587         lastEnqueueMeta = source->getLatestEnqueuedMeta();
588         // lastEnqueueMeta == NULL means old fetcher stopped at a discontinuity
589         // boundary, no need to resume as the content will look different anyways
590         if (lastEnqueueMeta != NULL) {
591             HLSTime lastTime(lastEnqueueMeta), startTime(firstNewMeta[i]);
592 
593             // no need to resume old fetcher if new fetcher started in different
594             // discontinuity sequence, as the content will look different.
595             *needResumeUntil |= (startTime.mSeq == lastTime.mSeq
596                     && startTime.mTimeUs - lastTime.mTimeUs > kResumeThresholdUs);
597 
598             // update the stopTime for resumeUntil
599             stopParams->setInt32("discontinuitySeq", startTime.mSeq);
600             stopParams->setInt64(getKeyForStream(stream), startTime.mTimeUs);
601         }
602     }
603 
604     // if we're here, it means dequeue progress hasn't passed some samples in
605     // mPacketSource2, we can trim off the excess in mPacketSource.
606     // (old fetcher might still need to resumeUntil the start time of new fetcher)
607     for (size_t i = 0; i < kMaxStreams; ++i) {
608         StreamType stream = indexToType(i);
609         if (!(mSwapMask & mNewStreamMask & stream)
610             || (newUri != mStreams[i].mNewUri)
611             || stream == STREAMTYPE_SUBTITLES) {
612             continue;
613         }
614         mPacketSources.valueFor(stream)->trimBuffersAfterMeta(firstNewMeta[i]);
615     }
616 
617     // no resumeUntil if already underflow
618     *needResumeUntil &= !mBuffering;
619 
620     return true;
621 }
622 
onMessageReceived(const sp<AMessage> & msg)623 void LiveSession::onMessageReceived(const sp<AMessage> &msg) {
624     switch (msg->what()) {
625         case kWhatSetBufferingSettings:
626         {
627             readFromAMessage(msg, &mBufferingSettings);
628             break;
629         }
630 
631         case kWhatConnect:
632         {
633             onConnect(msg);
634             break;
635         }
636 
637         case kWhatDisconnect:
638         {
639             CHECK(msg->senderAwaitsResponse(&mDisconnectReplyID));
640 
641             if (mReconfigurationInProgress) {
642                 break;
643             }
644 
645             finishDisconnect();
646             break;
647         }
648 
649         case kWhatSeek:
650         {
651             if (mReconfigurationInProgress) {
652                 msg->post(50000);
653                 break;
654             }
655 
656             CHECK(msg->senderAwaitsResponse(&mSeekReplyID));
657             mSeekReply = new AMessage;
658 
659             onSeek(msg);
660             break;
661         }
662 
663         case kWhatFetcherNotify:
664         {
665             int32_t what;
666             CHECK(msg->findInt32("what", &what));
667 
668             switch (what) {
669                 case PlaylistFetcher::kWhatStarted:
670                     break;
671                 case PlaylistFetcher::kWhatPaused:
672                 case PlaylistFetcher::kWhatStopped:
673                 {
674                     AString uri;
675                     CHECK(msg->findString("uri", &uri));
676                     ssize_t index = mFetcherInfos.indexOfKey(uri);
677                     if (index < 0) {
678                         // ignore msgs from fetchers that's already gone
679                         break;
680                     }
681 
682                     ALOGV("fetcher-%d %s",
683                             mFetcherInfos[index].mFetcher->getFetcherID(),
684                             what == PlaylistFetcher::kWhatPaused ?
685                                     "paused" : "stopped");
686 
687                     if (what == PlaylistFetcher::kWhatStopped) {
688                         mFetcherLooper->unregisterHandler(
689                                 mFetcherInfos[index].mFetcher->id());
690                         mFetcherInfos.removeItemsAt(index);
691                     } else if (what == PlaylistFetcher::kWhatPaused) {
692                         int32_t seekMode;
693                         CHECK(msg->findInt32("seekMode", &seekMode));
694                         for (size_t i = 0; i < kMaxStreams; ++i) {
695                             if (mStreams[i].mUri == uri) {
696                                 mStreams[i].mSeekMode = (SeekMode) seekMode;
697                             }
698                         }
699                     }
700 
701                     if (mContinuation != NULL) {
702                         CHECK_GT(mContinuationCounter, 0);
703                         if (--mContinuationCounter == 0) {
704                             mContinuation->post();
705                         }
706                         ALOGV("%zu fetcher(s) left", mContinuationCounter);
707                     }
708                     break;
709                 }
710 
711                 case PlaylistFetcher::kWhatDurationUpdate:
712                 {
713                     AString uri;
714                     CHECK(msg->findString("uri", &uri));
715 
716                     int64_t durationUs;
717                     CHECK(msg->findInt64("durationUs", &durationUs));
718 
719                     ssize_t index = mFetcherInfos.indexOfKey(uri);
720                     if (index >= 0) {
721                         FetcherInfo *info = &mFetcherInfos.editValueFor(uri);
722                         info->mDurationUs = durationUs;
723                     }
724                     break;
725                 }
726 
727                 case PlaylistFetcher::kWhatTargetDurationUpdate:
728                 {
729                     int64_t targetDurationUs;
730                     CHECK(msg->findInt64("targetDurationUs", &targetDurationUs));
731                     mUpSwitchMark = min(kUpSwitchMarkUs, targetDurationUs * 7 / 4);
732                     mDownSwitchMark = min(kDownSwitchMarkUs, targetDurationUs * 9 / 4);
733                     mUpSwitchMargin = min(kUpSwitchMarginUs, targetDurationUs);
734                     break;
735                 }
736 
737                 case PlaylistFetcher::kWhatError:
738                 {
739                     status_t err;
740                     CHECK(msg->findInt32("err", &err));
741 
742                     ALOGE("XXX Received error %d from PlaylistFetcher.", err);
743 
744                     // handle EOS on subtitle tracks independently
745                     AString uri;
746                     if (err == ERROR_END_OF_STREAM && msg->findString("uri", &uri)) {
747                         ssize_t i = mFetcherInfos.indexOfKey(uri);
748                         if (i >= 0) {
749                             const sp<PlaylistFetcher> &fetcher = mFetcherInfos.valueAt(i).mFetcher;
750                             if (fetcher != NULL) {
751                                 uint32_t type = fetcher->getStreamTypeMask();
752                                 if (type == STREAMTYPE_SUBTITLES) {
753                                     mPacketSources.valueFor(
754                                             STREAMTYPE_SUBTITLES)->signalEOS(err);;
755                                     break;
756                                 }
757                             }
758                         }
759                     }
760 
761                     // remember the failure index (as mCurBandwidthIndex will be restored
762                     // after cancelBandwidthSwitch()), and record last fail time
763                     size_t failureIndex = mCurBandwidthIndex;
764                     mBandwidthItems.editItemAt(
765                             failureIndex).mLastFailureUs = ALooper::GetNowUs();
766 
767                     if (mSwitchInProgress) {
768                         // if error happened when we switch to a variant, try fallback
769                         // to other variant to save the session
770                         if (tryBandwidthFallback()) {
771                             break;
772                         }
773                     }
774 
775                     if (mInPreparationPhase) {
776                         postPrepared(err);
777                     }
778 
779                     cancelBandwidthSwitch();
780 
781                     mPacketSources.valueFor(STREAMTYPE_AUDIO)->signalEOS(err);
782 
783                     mPacketSources.valueFor(STREAMTYPE_VIDEO)->signalEOS(err);
784 
785                     mPacketSources.valueFor(
786                             STREAMTYPE_SUBTITLES)->signalEOS(err);
787 
788                     postError(err);
789                     break;
790                 }
791 
792                 case PlaylistFetcher::kWhatStopReached:
793                 {
794                     ALOGV("kWhatStopReached");
795 
796                     AString oldUri;
797                     CHECK(msg->findString("uri", &oldUri));
798 
799                     ssize_t index = mFetcherInfos.indexOfKey(oldUri);
800                     if (index < 0) {
801                         break;
802                     }
803 
804                     tryToFinishBandwidthSwitch(oldUri);
805                     break;
806                 }
807 
808                 case PlaylistFetcher::kWhatStartedAt:
809                 {
810                     int32_t switchGeneration;
811                     CHECK(msg->findInt32("switchGeneration", &switchGeneration));
812 
813                     ALOGV("kWhatStartedAt: switchGen=%d, mSwitchGen=%d",
814                             switchGeneration, mSwitchGeneration);
815 
816                     if (switchGeneration != mSwitchGeneration) {
817                         break;
818                     }
819 
820                     AString uri;
821                     CHECK(msg->findString("uri", &uri));
822 
823                     // mark new fetcher mToBeResumed
824                     ssize_t index = mFetcherInfos.indexOfKey(uri);
825                     if (index >= 0) {
826                         mFetcherInfos.editValueAt(index).mToBeResumed = true;
827                     }
828 
829                     // temporarily disable packet sources to be swapped to prevent
830                     // NuPlayerDecoder from dequeuing while we check progress
831                     for (size_t i = 0; i < mPacketSources.size(); ++i) {
832                         if ((mSwapMask & mPacketSources.keyAt(i))
833                                 && uri == mStreams[i].mNewUri) {
834                             mPacketSources.editValueAt(i)->enable(false);
835                         }
836                     }
837                     bool switchUp = (mCurBandwidthIndex > mOrigBandwidthIndex);
838                     // If switching up, require a cushion bigger than kUnderflowMark
839                     // to avoid buffering immediately after the switch.
840                     // (If we don't have that cushion we'd rather cancel and try again.)
841                     int64_t delayUs =
842                         switchUp ?
843                             (mBufferingSettings.mRebufferingWatermarkLowMs * 1000ll + 1000000ll)
844                             : 0;
845                     bool needResumeUntil = false;
846                     sp<AMessage> stopParams = msg;
847                     if (checkSwitchProgress(stopParams, delayUs, &needResumeUntil)) {
848                         // playback time hasn't passed startAt time
849                         if (!needResumeUntil) {
850                             ALOGV("finish switch");
851                             for (size_t i = 0; i < kMaxStreams; ++i) {
852                                 if ((mSwapMask & indexToType(i))
853                                         && uri == mStreams[i].mNewUri) {
854                                     // have to make a copy of mStreams[i].mUri because
855                                     // tryToFinishBandwidthSwitch is modifying mStreams[]
856                                     AString oldURI = mStreams[i].mUri;
857                                     tryToFinishBandwidthSwitch(oldURI);
858                                     break;
859                                 }
860                             }
861                         } else {
862                             // startAt time is after last enqueue time
863                             // Resume fetcher for the original variant; the resumed fetcher should
864                             // continue until the timestamps found in msg, which is stored by the
865                             // new fetcher to indicate where the new variant has started buffering.
866                             ALOGV("finish switch with resumeUntilAsync");
867                             for (size_t i = 0; i < mFetcherInfos.size(); i++) {
868                                 const FetcherInfo &info = mFetcherInfos.valueAt(i);
869                                 if (info.mToBeRemoved) {
870                                     info.mFetcher->resumeUntilAsync(stopParams);
871                                 }
872                             }
873                         }
874                     } else {
875                         // playback time passed startAt time
876                         if (switchUp) {
877                             // if switching up, cancel and retry if condition satisfies again
878                             ALOGV("cancel up switch because we're too late");
879                             cancelBandwidthSwitch(true /* resume */);
880                         } else {
881                             ALOGV("retry down switch at next sample");
882                             resumeFetcher(uri, mSwapMask, -1, true /* newUri */);
883                         }
884                     }
885                     // re-enable all packet sources
886                     for (size_t i = 0; i < mPacketSources.size(); ++i) {
887                         mPacketSources.editValueAt(i)->enable(true);
888                     }
889 
890                     break;
891                 }
892 
893                 case PlaylistFetcher::kWhatPlaylistFetched:
894                 {
895                     onMasterPlaylistFetched(msg);
896                     break;
897                 }
898 
899                 case PlaylistFetcher::kWhatMetadataDetected:
900                 {
901                     if (!mHasMetadata) {
902                         mHasMetadata = true;
903                         sp<AMessage> notify = mNotify->dup();
904                         notify->setInt32("what", kWhatMetadataDetected);
905                         notify->post();
906                     }
907                     break;
908                 }
909 
910                 default:
911                     TRESPASS();
912             }
913 
914             break;
915         }
916 
917         case kWhatChangeConfiguration:
918         {
919             onChangeConfiguration(msg);
920             break;
921         }
922 
923         case kWhatChangeConfiguration2:
924         {
925             onChangeConfiguration2(msg);
926             break;
927         }
928 
929         case kWhatChangeConfiguration3:
930         {
931             onChangeConfiguration3(msg);
932             break;
933         }
934 
935         case kWhatPollBuffering:
936         {
937             int32_t generation;
938             CHECK(msg->findInt32("generation", &generation));
939             if (generation == mPollBufferingGeneration) {
940                 onPollBuffering();
941             }
942             break;
943         }
944 
945         default:
946             TRESPASS();
947             break;
948     }
949 }
950 
951 // static
isBandwidthValid(const BandwidthItem & item)952 bool LiveSession::isBandwidthValid(const BandwidthItem &item) {
953     static const int64_t kBlacklistWindowUs = 300 * 1000000ll;
954     return item.mLastFailureUs < 0
955             || ALooper::GetNowUs() - item.mLastFailureUs > kBlacklistWindowUs;
956 }
957 
958 // static
SortByBandwidth(const BandwidthItem * a,const BandwidthItem * b)959 int LiveSession::SortByBandwidth(const BandwidthItem *a, const BandwidthItem *b) {
960     if (a->mBandwidth < b->mBandwidth) {
961         return -1;
962     } else if (a->mBandwidth == b->mBandwidth) {
963         return 0;
964     }
965 
966     return 1;
967 }
968 
969 // static
indexToType(int idx)970 LiveSession::StreamType LiveSession::indexToType(int idx) {
971     CHECK(idx >= 0 && idx < kNumSources);
972     return (StreamType)(1 << idx);
973 }
974 
975 // static
typeToIndex(int32_t type)976 ssize_t LiveSession::typeToIndex(int32_t type) {
977     switch (type) {
978         case STREAMTYPE_AUDIO:
979             return 0;
980         case STREAMTYPE_VIDEO:
981             return 1;
982         case STREAMTYPE_SUBTITLES:
983             return 2;
984         case STREAMTYPE_METADATA:
985             return 3;
986         default:
987             return -1;
988     };
989     return -1;
990 }
991 
onConnect(const sp<AMessage> & msg)992 void LiveSession::onConnect(const sp<AMessage> &msg) {
993     CHECK(msg->findString("url", &mMasterURL));
994 
995     // TODO currently we don't know if we are coming here from incognito mode
996     ALOGI("onConnect %s", uriDebugString(mMasterURL).c_str());
997 
998     KeyedVector<String8, String8> *headers = NULL;
999     if (!msg->findPointer("headers", (void **)&headers)) {
1000         mExtraHeaders.clear();
1001     } else {
1002         mExtraHeaders = *headers;
1003 
1004         delete headers;
1005         headers = NULL;
1006     }
1007 
1008     // create looper for fetchers
1009     if (mFetcherLooper == NULL) {
1010         mFetcherLooper = new ALooper();
1011 
1012         mFetcherLooper->setName("Fetcher");
1013         mFetcherLooper->start(false, false);
1014     }
1015 
1016     // create fetcher to fetch the master playlist
1017     addFetcher(mMasterURL.c_str())->fetchPlaylistAsync();
1018 }
1019 
onMasterPlaylistFetched(const sp<AMessage> & msg)1020 void LiveSession::onMasterPlaylistFetched(const sp<AMessage> &msg) {
1021     AString uri;
1022     CHECK(msg->findString("uri", &uri));
1023     ssize_t index = mFetcherInfos.indexOfKey(uri);
1024     if (index < 0) {
1025         ALOGW("fetcher for master playlist is gone.");
1026         return;
1027     }
1028 
1029     // no longer useful, remove
1030     mFetcherLooper->unregisterHandler(mFetcherInfos[index].mFetcher->id());
1031     mFetcherInfos.removeItemsAt(index);
1032 
1033     CHECK(msg->findObject("playlist", (sp<RefBase> *)&mPlaylist));
1034     if (mPlaylist == NULL) {
1035         ALOGE("unable to fetch master playlist %s.",
1036                 uriDebugString(mMasterURL).c_str());
1037 
1038         postPrepared(ERROR_IO);
1039         return;
1040     }
1041     // We trust the content provider to make a reasonable choice of preferred
1042     // initial bandwidth by listing it first in the variant playlist.
1043     // At startup we really don't have a good estimate on the available
1044     // network bandwidth since we haven't tranferred any data yet. Once
1045     // we have we can make a better informed choice.
1046     size_t initialBandwidth = 0;
1047     size_t initialBandwidthIndex = 0;
1048 
1049     int32_t maxWidth = 0;
1050     int32_t maxHeight = 0;
1051 
1052     if (mPlaylist->isVariantPlaylist()) {
1053         Vector<BandwidthItem> itemsWithVideo;
1054         for (size_t i = 0; i < mPlaylist->size(); ++i) {
1055             BandwidthItem item;
1056 
1057             item.mPlaylistIndex = i;
1058             item.mLastFailureUs = -1ll;
1059 
1060             sp<AMessage> meta;
1061             AString uri;
1062             mPlaylist->itemAt(i, &uri, &meta);
1063 
1064             CHECK(meta->findInt32("bandwidth", (int32_t *)&item.mBandwidth));
1065 
1066             int32_t width, height;
1067             if (meta->findInt32("width", &width)) {
1068                 maxWidth = max(maxWidth, width);
1069             }
1070             if (meta->findInt32("height", &height)) {
1071                 maxHeight = max(maxHeight, height);
1072             }
1073 
1074             mBandwidthItems.push(item);
1075             if (mPlaylist->hasType(i, "video")) {
1076                 itemsWithVideo.push(item);
1077             }
1078         }
1079         // remove the audio-only variants if we have at least one with video
1080         if (!itemsWithVideo.empty()
1081                 && itemsWithVideo.size() < mBandwidthItems.size()) {
1082             mBandwidthItems.clear();
1083             for (size_t i = 0; i < itemsWithVideo.size(); ++i) {
1084                 mBandwidthItems.push(itemsWithVideo[i]);
1085             }
1086         }
1087 
1088         CHECK_GT(mBandwidthItems.size(), 0u);
1089         initialBandwidth = mBandwidthItems[0].mBandwidth;
1090 
1091         mBandwidthItems.sort(SortByBandwidth);
1092 
1093         for (size_t i = 0; i < mBandwidthItems.size(); ++i) {
1094             if (mBandwidthItems.itemAt(i).mBandwidth == initialBandwidth) {
1095                 initialBandwidthIndex = i;
1096                 break;
1097             }
1098         }
1099     } else {
1100         // dummy item.
1101         BandwidthItem item;
1102         item.mPlaylistIndex = 0;
1103         item.mBandwidth = 0;
1104         mBandwidthItems.push(item);
1105     }
1106 
1107     mMaxWidth = maxWidth > 0 ? maxWidth : mMaxWidth;
1108     mMaxHeight = maxHeight > 0 ? maxHeight : mMaxHeight;
1109 
1110     mPlaylist->pickRandomMediaItems();
1111     changeConfiguration(
1112             0ll /* timeUs */, initialBandwidthIndex, false /* pickTrack */);
1113 }
1114 
finishDisconnect()1115 void LiveSession::finishDisconnect() {
1116     ALOGV("finishDisconnect");
1117 
1118     // No reconfiguration is currently pending, make sure none will trigger
1119     // during disconnection either.
1120     cancelBandwidthSwitch();
1121 
1122     // cancel buffer polling
1123     cancelPollBuffering();
1124 
1125     // TRICKY: don't wait for all fetcher to be stopped when disconnecting
1126     //
1127     // Some fetchers might be stuck in connect/getSize at this point. These
1128     // operations will eventually timeout (as we have a timeout set in
1129     // MediaHTTPConnection), but we don't want to block the main UI thread
1130     // until then. Here we just need to make sure we clear all references
1131     // to the fetchers, so that when they finally exit from the blocking
1132     // operation, they can be destructed.
1133     //
1134     // There is one very tricky point though. For this scheme to work, the
1135     // fecther must hold a reference to LiveSession, so that LiveSession is
1136     // destroyed after fetcher. Otherwise LiveSession would get stuck in its
1137     // own destructor when it waits for mFetcherLooper to stop, which still
1138     // blocks main UI thread.
1139     for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
1140         mFetcherInfos.valueAt(i).mFetcher->stopAsync();
1141         mFetcherLooper->unregisterHandler(
1142                 mFetcherInfos.valueAt(i).mFetcher->id());
1143     }
1144     mFetcherInfos.clear();
1145 
1146     mPacketSources.valueFor(STREAMTYPE_AUDIO)->signalEOS(ERROR_END_OF_STREAM);
1147     mPacketSources.valueFor(STREAMTYPE_VIDEO)->signalEOS(ERROR_END_OF_STREAM);
1148 
1149     mPacketSources.valueFor(
1150             STREAMTYPE_SUBTITLES)->signalEOS(ERROR_END_OF_STREAM);
1151 
1152     sp<AMessage> response = new AMessage;
1153     response->setInt32("err", OK);
1154 
1155     response->postReply(mDisconnectReplyID);
1156     mDisconnectReplyID.clear();
1157 }
1158 
addFetcher(const char * uri)1159 sp<PlaylistFetcher> LiveSession::addFetcher(const char *uri) {
1160     ssize_t index = mFetcherInfos.indexOfKey(uri);
1161 
1162     if (index >= 0) {
1163         return NULL;
1164     }
1165 
1166     sp<AMessage> notify = new AMessage(kWhatFetcherNotify, this);
1167     notify->setString("uri", uri);
1168     notify->setInt32("switchGeneration", mSwitchGeneration);
1169 
1170     FetcherInfo info;
1171     info.mFetcher = new PlaylistFetcher(
1172             notify, this, uri, mCurBandwidthIndex, mSubtitleGeneration);
1173     info.mDurationUs = -1ll;
1174     info.mToBeRemoved = false;
1175     info.mToBeResumed = false;
1176     mFetcherLooper->registerHandler(info.mFetcher);
1177 
1178     mFetcherInfos.add(uri, info);
1179 
1180     return info.mFetcher;
1181 }
1182 
// Compiled out. Returns (double)rand() / RAND_MAX; its only callers are the
// disabled (#elif 0) experimental branches of getBandwidthIndex() below.
#if 0
static double uniformRand() {
    return (double)rand() / RAND_MAX;
}
#endif
1188 
UriIsSameAsIndex(const AString & uri,int32_t i,bool newUri)1189 bool LiveSession::UriIsSameAsIndex(const AString &uri, int32_t i, bool newUri) {
1190     ALOGV("[timed_id3] i %d UriIsSameAsIndex newUri %s, %s", i,
1191             newUri ? "true" : "false",
1192             newUri ? mStreams[i].mNewUri.c_str() : mStreams[i].mUri.c_str());
1193     return i >= 0
1194             && ((!newUri && uri == mStreams[i].mUri)
1195             || (newUri && uri == mStreams[i].mNewUri));
1196 }
1197 
getPacketSourceForStreamIndex(size_t trackIndex,bool newUri)1198 sp<AnotherPacketSource> LiveSession::getPacketSourceForStreamIndex(
1199         size_t trackIndex, bool newUri) {
1200     StreamType type = indexToType(trackIndex);
1201     sp<AnotherPacketSource> source = NULL;
1202     if (newUri) {
1203         source = mPacketSources2.valueFor(type);
1204         source->clear();
1205     } else {
1206         source = mPacketSources.valueFor(type);
1207     };
1208     return source;
1209 }
1210 
getMetadataSource(sp<AnotherPacketSource> sources[kNumSources],uint32_t streamMask,bool newUri)1211 sp<AnotherPacketSource> LiveSession::getMetadataSource(
1212         sp<AnotherPacketSource> sources[kNumSources], uint32_t streamMask, bool newUri) {
1213     // todo: One case where the following strategy can fail is when audio and video
1214     // are in separate playlists, both are transport streams, and the metadata
1215     // is actually contained in the audio stream.
1216     ALOGV("[timed_id3] getMetadataSourceForUri streamMask %x newUri %s",
1217             streamMask, newUri ? "true" : "false");
1218 
1219     if ((sources[kVideoIndex] != NULL) // video fetcher; or ...
1220             || (!(streamMask & STREAMTYPE_VIDEO) && sources[kAudioIndex] != NULL)) {
1221             // ... audio fetcher for audio only variant
1222         return getPacketSourceForStreamIndex(kMetaDataIndex, newUri);
1223     }
1224 
1225     return NULL;
1226 }
1227 
resumeFetcher(const AString & uri,uint32_t streamMask,int64_t timeUs,bool newUri)1228 bool LiveSession::resumeFetcher(
1229         const AString &uri, uint32_t streamMask, int64_t timeUs, bool newUri) {
1230     ssize_t index = mFetcherInfos.indexOfKey(uri);
1231     if (index < 0) {
1232         ALOGE("did not find fetcher for uri: %s", uri.c_str());
1233         return false;
1234     }
1235 
1236     bool resume = false;
1237     sp<AnotherPacketSource> sources[kNumSources];
1238     for (size_t i = 0; i < kMaxStreams; ++i) {
1239         if ((streamMask & indexToType(i)) && UriIsSameAsIndex(uri, i, newUri)) {
1240             resume = true;
1241             sources[i] = getPacketSourceForStreamIndex(i, newUri);
1242         }
1243     }
1244 
1245     if (resume) {
1246         sp<PlaylistFetcher> &fetcher = mFetcherInfos.editValueAt(index).mFetcher;
1247         SeekMode seekMode = newUri ? kSeekModeNextSample : kSeekModeExactPosition;
1248 
1249         ALOGV("resuming fetcher-%d, timeUs=%lld, seekMode=%d",
1250                 fetcher->getFetcherID(), (long long)timeUs, seekMode);
1251 
1252         fetcher->startAsync(
1253                 sources[kAudioIndex],
1254                 sources[kVideoIndex],
1255                 sources[kSubtitleIndex],
1256                 getMetadataSource(sources, streamMask, newUri),
1257                 timeUs, -1, -1, seekMode);
1258     }
1259 
1260     return resume;
1261 }
1262 
getAbortThreshold(ssize_t currentBWIndex,ssize_t targetBWIndex) const1263 float LiveSession::getAbortThreshold(
1264         ssize_t currentBWIndex, ssize_t targetBWIndex) const {
1265     float abortThreshold = -1.0f;
1266     if (currentBWIndex > 0 && targetBWIndex < currentBWIndex) {
1267         /*
1268            If we're switching down, we need to decide whether to
1269 
1270            1) finish last segment of high-bandwidth variant, or
1271            2) abort last segment of high-bandwidth variant, and fetch an
1272               overlapping portion from low-bandwidth variant.
1273 
1274            Here we try to maximize the amount of buffer left when the
1275            switch point is met. Given the following parameters:
1276 
1277            B: our current buffering level in seconds
1278            T: target duration in seconds
1279            X: sample duration in seconds remain to fetch in last segment
1280            bw0: bandwidth of old variant (as specified in playlist)
1281            bw1: bandwidth of new variant (as specified in playlist)
1282            bw: measured bandwidth available
1283 
1284            If we choose 1), when switch happens at the end of current
1285            segment, our buffering will be
1286                   B + X - X * bw0 / bw
1287 
1288            If we choose 2), when switch happens where we aborted current
1289            segment, our buffering will be
1290                   B - (T - X) * bw1 / bw
1291 
1292            We should only choose 1) if
1293                   X/T < bw1 / (bw1 + bw0 - bw)
1294         */
1295 
1296         // abort old bandwidth immediately if bandwidth is fluctuating a lot.
1297         // our estimate could be far off, and fetching old bandwidth could
1298         // take too long.
1299         if (!mLastBandwidthStable) {
1300             return 0.0f;
1301         }
1302 
1303         // Taking the measured current bandwidth at 50% face value only,
1304         // as our bandwidth estimation is a lagging indicator. Being
1305         // conservative on this, we prefer switching to lower bandwidth
1306         // unless we're really confident finishing up the last segment
1307         // of higher bandwidth will be fast.
1308         CHECK(mLastBandwidthBps >= 0);
1309         abortThreshold =
1310                 (float)mBandwidthItems.itemAt(targetBWIndex).mBandwidth
1311              / ((float)mBandwidthItems.itemAt(targetBWIndex).mBandwidth
1312               + (float)mBandwidthItems.itemAt(currentBWIndex).mBandwidth
1313               - (float)mLastBandwidthBps * 0.5f);
1314         if (abortThreshold < 0.0f) {
1315             abortThreshold = -1.0f; // do not abort
1316         }
1317         ALOGV("Switching Down: bps %ld => %ld, measured %d, abort ratio %.2f",
1318                 mBandwidthItems.itemAt(currentBWIndex).mBandwidth,
1319                 mBandwidthItems.itemAt(targetBWIndex).mBandwidth,
1320                 mLastBandwidthBps,
1321                 abortThreshold);
1322     }
1323     return abortThreshold;
1324 }
1325 
// Forwards one transfer sample (|numBytes| transferred, with an associated
// delay of |delayUs|) to mBandwidthEstimator.
void LiveSession::addBandwidthMeasurement(size_t numBytes, int64_t delayUs) {
    mBandwidthEstimator->addBandwidthMeasurement(numBytes, delayUs);
}
1329 
getLowestValidBandwidthIndex() const1330 ssize_t LiveSession::getLowestValidBandwidthIndex() const {
1331     for (size_t index = 0; index < mBandwidthItems.size(); index++) {
1332         if (isBandwidthValid(mBandwidthItems[index])) {
1333             return index;
1334         }
1335     }
1336     // if playlists are all blacklisted, return 0 and hope it's alive
1337     return 0;
1338 }
1339 
getBandwidthIndex(int32_t bandwidthBps)1340 size_t LiveSession::getBandwidthIndex(int32_t bandwidthBps) {
1341     if (mBandwidthItems.size() < 2) {
1342         // shouldn't be here if we only have 1 bandwidth, check
1343         // logic to get rid of redundant bandwidth polling
1344         ALOGW("getBandwidthIndex() called for single bandwidth playlist!");
1345         return 0;
1346     }
1347 
1348 #if 1
1349     char value[PROPERTY_VALUE_MAX];
1350     ssize_t index = -1;
1351     if (property_get("media.httplive.bw-index", value, NULL)) {
1352         char *end;
1353         index = strtol(value, &end, 10);
1354         CHECK(end > value && *end == '\0');
1355 
1356         if (index >= 0 && (size_t)index >= mBandwidthItems.size()) {
1357             index = mBandwidthItems.size() - 1;
1358         }
1359     }
1360 
1361     if (index < 0) {
1362         char value[PROPERTY_VALUE_MAX];
1363         if (property_get("media.httplive.max-bw", value, NULL)) {
1364             char *end;
1365             long maxBw = strtoul(value, &end, 10);
1366             if (end > value && *end == '\0') {
1367                 if (maxBw > 0 && bandwidthBps > maxBw) {
1368                     ALOGV("bandwidth capped to %ld bps", maxBw);
1369                     bandwidthBps = maxBw;
1370                 }
1371             }
1372         }
1373 
1374         // Pick the highest bandwidth stream that's not currently blacklisted
1375         // below or equal to estimated bandwidth.
1376 
1377         index = mBandwidthItems.size() - 1;
1378         ssize_t lowestBandwidth = getLowestValidBandwidthIndex();
1379         while (index > lowestBandwidth) {
1380             // be conservative (70%) to avoid overestimating and immediately
1381             // switching down again.
1382             size_t adjustedBandwidthBps = bandwidthBps * 7 / 10;
1383             const BandwidthItem &item = mBandwidthItems[index];
1384             if (item.mBandwidth <= adjustedBandwidthBps
1385                     && isBandwidthValid(item)) {
1386                 break;
1387             }
1388             --index;
1389         }
1390     }
1391 #elif 0
1392     // Change bandwidth at random()
1393     size_t index = uniformRand() * mBandwidthItems.size();
1394 #elif 0
1395     // There's a 50% chance to stay on the current bandwidth and
1396     // a 50% chance to switch to the next higher bandwidth (wrapping around
1397     // to lowest)
1398     const size_t kMinIndex = 0;
1399 
1400     static ssize_t mCurBandwidthIndex = -1;
1401 
1402     size_t index;
1403     if (mCurBandwidthIndex < 0) {
1404         index = kMinIndex;
1405     } else if (uniformRand() < 0.5) {
1406         index = (size_t)mCurBandwidthIndex;
1407     } else {
1408         index = mCurBandwidthIndex + 1;
1409         if (index == mBandwidthItems.size()) {
1410             index = kMinIndex;
1411         }
1412     }
1413     mCurBandwidthIndex = index;
1414 #elif 0
1415     // Pick the highest bandwidth stream below or equal to 1.2 Mbit/sec
1416 
1417     size_t index = mBandwidthItems.size() - 1;
1418     while (index > 0 && mBandwidthItems.itemAt(index).mBandwidth > 1200000) {
1419         --index;
1420     }
1421 #elif 1
1422     char value[PROPERTY_VALUE_MAX];
1423     size_t index;
1424     if (property_get("media.httplive.bw-index", value, NULL)) {
1425         char *end;
1426         index = strtoul(value, &end, 10);
1427         CHECK(end > value && *end == '\0');
1428 
1429         if (index >= mBandwidthItems.size()) {
1430             index = mBandwidthItems.size() - 1;
1431         }
1432     } else {
1433         index = 0;
1434     }
1435 #else
1436     size_t index = mBandwidthItems.size() - 1;  // Highest bandwidth stream
1437 #endif
1438 
1439     CHECK_GE(index, 0);
1440 
1441     return index;
1442 }
1443 
latestMediaSegmentStartTime() const1444 HLSTime LiveSession::latestMediaSegmentStartTime() const {
1445     HLSTime audioTime(mPacketSources.valueFor(
1446                     STREAMTYPE_AUDIO)->getLatestDequeuedMeta());
1447 
1448     HLSTime videoTime(mPacketSources.valueFor(
1449                     STREAMTYPE_VIDEO)->getLatestDequeuedMeta());
1450 
1451     return audioTime < videoTime ? videoTime : audioTime;
1452 }
1453 
onSeek(const sp<AMessage> & msg)1454 void LiveSession::onSeek(const sp<AMessage> &msg) {
1455     int64_t timeUs;
1456     int32_t mode;
1457     CHECK(msg->findInt64("timeUs", &timeUs));
1458     CHECK(msg->findInt32("mode", &mode));
1459     // TODO: add "mode" to changeConfiguration.
1460     changeConfiguration(timeUs/* , (MediaPlayerSeekMode)mode */);
1461 }
1462 
getDuration(int64_t * durationUs) const1463 status_t LiveSession::getDuration(int64_t *durationUs) const {
1464     int64_t maxDurationUs = -1ll;
1465     for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
1466         int64_t fetcherDurationUs = mFetcherInfos.valueAt(i).mDurationUs;
1467 
1468         if (fetcherDurationUs > maxDurationUs) {
1469             maxDurationUs = fetcherDurationUs;
1470         }
1471     }
1472 
1473     *durationUs = maxDurationUs;
1474 
1475     return OK;
1476 }
1477 
isSeekable() const1478 bool LiveSession::isSeekable() const {
1479     int64_t durationUs;
1480     return getDuration(&durationUs) == OK && durationUs >= 0;
1481 }
1482 
// Always reports false.
bool LiveSession::hasDynamicDuration() const {
    return false;
}
1486 
getTrackCount() const1487 size_t LiveSession::getTrackCount() const {
1488     if (mPlaylist == NULL) {
1489         return 0;
1490     } else {
1491         return mPlaylist->getTrackCount() + (mHasMetadata ? 1 : 0);
1492     }
1493 }
1494 
getTrackInfo(size_t trackIndex) const1495 sp<AMessage> LiveSession::getTrackInfo(size_t trackIndex) const {
1496     if (mPlaylist == NULL) {
1497         return NULL;
1498     } else {
1499         if (trackIndex == mPlaylist->getTrackCount() && mHasMetadata) {
1500             sp<AMessage> format = new AMessage();
1501             format->setInt32("type", MEDIA_TRACK_TYPE_METADATA);
1502             format->setString("language", "und");
1503             format->setString("mime", MEDIA_MIMETYPE_DATA_TIMED_ID3);
1504             return format;
1505         }
1506         return mPlaylist->getTrackInfo(trackIndex);
1507     }
1508 }
1509 
selectTrack(size_t index,bool select)1510 status_t LiveSession::selectTrack(size_t index, bool select) {
1511     if (mPlaylist == NULL) {
1512         return INVALID_OPERATION;
1513     }
1514 
1515     ALOGV("selectTrack: index=%zu, select=%d, mSubtitleGen=%d++",
1516             index, select, mSubtitleGeneration);
1517 
1518     ++mSubtitleGeneration;
1519     status_t err = mPlaylist->selectTrack(index, select);
1520     if (err == OK) {
1521         sp<AMessage> msg = new AMessage(kWhatChangeConfiguration, this);
1522         msg->setInt32("pickTrack", select);
1523         msg->post();
1524     }
1525     return err;
1526 }
1527 
getSelectedTrack(media_track_type type) const1528 ssize_t LiveSession::getSelectedTrack(media_track_type type) const {
1529     if (mPlaylist == NULL) {
1530         return -1;
1531     } else {
1532         return mPlaylist->getSelectedTrack(type);
1533     }
1534 }
1535 
/*
 * Begins a reconfiguration of the session: cancels any bandwidth switch in
 * progress, optionally moves to a new bandwidth index, stops or pauses the
 * existing fetchers, and stores a continuation message that is posted once
 * the outstanding fetcher count reaches zero.
 *
 * timeUs          a value >= 0 is treated as a seek: reused fetchers are
 *                 paused with disconnect, and the continuation is
 *                 kWhatChangeConfiguration2. A negative value means "not
 *                 seeking"; the continuation is kWhatChangeConfiguration3.
 * bandwidthIndex  when >= 0 it becomes mCurBandwidthIndex (the previous
 *                 value is saved in mOrigBandwidthIndex); when negative the
 *                 current index is kept.
 * pickTrack       forwarded in the continuation message; when set, a fetcher
 *                 whose URI matches none of the new URIs is discarded
 *                 immediately instead of being kept for delayed removal.
 *
 * Aborts via CHECK if a reconfiguration is already in progress or if the
 * resulting bandwidth index is out of range.
 */
void LiveSession::changeConfiguration(
        int64_t timeUs, ssize_t bandwidthIndex, bool pickTrack) {
    ALOGV("changeConfiguration: timeUs=%lld us, bwIndex=%zd, pickTrack=%d",
          (long long)timeUs, bandwidthIndex, pickTrack);

    cancelBandwidthSwitch();

    CHECK(!mReconfigurationInProgress);
    mReconfigurationInProgress = true;
    if (bandwidthIndex >= 0) {
        mOrigBandwidthIndex = mCurBandwidthIndex;
        mCurBandwidthIndex = bandwidthIndex;
        if (mOrigBandwidthIndex != mCurBandwidthIndex) {
            ALOGI("#### Starting Bandwidth Switch: %zd => %zd",
                    mOrigBandwidthIndex, mCurBandwidthIndex);
        }
    }
    CHECK_LT(mCurBandwidthIndex, mBandwidthItems.size());
    const BandwidthItem &item = mBandwidthItems.itemAt(mCurBandwidthIndex);

    uint32_t streamMask = 0; // streams that should be fetched by the new fetcher
    uint32_t resumeMask = 0; // streams that should be fetched by the original fetcher

    // Collect the URI of every stream type the selected variant provides.
    // At this point streamMask holds all such types; the loop below moves
    // the ones that an existing fetcher can serve over to resumeMask.
    AString URIs[kMaxStreams];
    for (size_t i = 0; i < kMaxStreams; ++i) {
        if (mPlaylist->getTypeURI(item.mPlaylistIndex, mStreams[i].mType, &URIs[i])) {
            streamMask |= indexToType(i);
        }
    }

    // Step 1, stop and discard fetchers that are no longer needed.
    // Pause those that we'll reuse.
    for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
        // skip fetchers that are marked mToBeRemoved,
        // these are done and can't be reused
        if (mFetcherInfos[i].mToBeRemoved) {
            continue;
        }

        const AString &uri = mFetcherInfos.keyAt(i);
        sp<PlaylistFetcher> &fetcher = mFetcherInfos.editValueAt(i).mFetcher;

        // A fetcher is kept (and its stream types moved from streamMask to
        // resumeMask) when its URI equals the new URI of a wanted stream.
        bool discardFetcher = true, delayRemoval = false;
        for (size_t j = 0; j < kMaxStreams; ++j) {
            StreamType type = indexToType(j);
            if ((streamMask & type) && uri == URIs[j]) {
                resumeMask |= type;
                streamMask &= ~type;
                discardFetcher = false;
            }
        }
        // Delay fetcher removal if not picking tracks, AND old fetcher
        // has stream mask that overlaps new variant. (Okay to discard
        // old fetcher now, if completely no overlap.)
        if (discardFetcher && timeUs < 0ll && !pickTrack
                && (fetcher->getStreamTypeMask() & streamMask)) {
            discardFetcher = false;
            delayRemoval = true;
        }

        if (discardFetcher) {
            ALOGV("discarding fetcher-%d", fetcher->getFetcherID());
            fetcher->stopAsync();
        } else {
            float threshold = 0.0f; // default to pause after current block (47Kbytes)
            bool disconnect = false;
            if (timeUs >= 0ll) {
                // seeking, no need to finish fetching
                disconnect = true;
            } else if (delayRemoval) {
                // adapting, abort if remaining of current segment is over threshold
                threshold = getAbortThreshold(
                        mOrigBandwidthIndex, mCurBandwidthIndex);
            }

            ALOGV("pausing fetcher-%d, threshold=%.2f",
                    fetcher->getFetcherID(), threshold);
            fetcher->pauseAsync(threshold, disconnect);
        }
    }

    // Build the continuation message carrying both masks, the flags and the
    // URI of every stream type present in either mask.
    sp<AMessage> msg;
    if (timeUs < 0ll) {
        // skip onChangeConfiguration2 (decoder destruction) if not seeking.
        msg = new AMessage(kWhatChangeConfiguration3, this);
    } else {
        msg = new AMessage(kWhatChangeConfiguration2, this);
    }
    msg->setInt32("streamMask", streamMask);
    msg->setInt32("resumeMask", resumeMask);
    msg->setInt32("pickTrack", pickTrack);
    msg->setInt64("timeUs", timeUs);
    for (size_t i = 0; i < kMaxStreams; ++i) {
        if ((streamMask | resumeMask) & indexToType(i)) {
            msg->setString(mStreams[i].uriKey().c_str(), URIs[i].c_str());
        }
    }

    // Every time a fetcher acknowledges the stopAsync or pauseAsync request
    // we'll decrement mContinuationCounter, once it reaches zero, i.e. all
    // fetchers have completed their asynchronous operation, we'll post
    // mContinuation, which then is handled below in onChangeConfiguration2.
    // NOTE(review): the counter is the total fetcher count, including
    // fetchers skipped above because mToBeRemoved was set; presumably those
    // still deliver an acknowledgement for an earlier stop request - confirm.
    mContinuationCounter = mFetcherInfos.size();
    mContinuation = msg;

    // With no fetchers there is nothing to wait for; continue right away.
    if (mContinuationCounter == 0) {
        msg->post();
    }
}
1645 
onChangeConfiguration(const sp<AMessage> & msg)1646 void LiveSession::onChangeConfiguration(const sp<AMessage> &msg) {
1647     ALOGV("onChangeConfiguration");
1648 
1649     if (!mReconfigurationInProgress) {
1650         int32_t pickTrack = 0;
1651         msg->findInt32("pickTrack", &pickTrack);
1652         changeConfiguration(-1ll /* timeUs */, -1, pickTrack);
1653     } else {
1654         msg->post(1000000ll); // retry in 1 sec
1655     }
1656 }
1657 
/*
 * Second stage of a reconfiguration; |msg| is expected to carry the
 * "timeUs", "streamMask", "resumeMask" and per-stream URI entries written by
 * changeConfiguration().
 *
 * When "timeUs" is >= 0 (seek), all packet sources and stream state are
 * reset and a pending seek reply is sent. It then works out which previously
 * active audio/video stream types are absent from the new configuration:
 *   - if none, onChangeConfiguration3() is invoked directly;
 *   - otherwise a kWhatStreamsChanged notification is posted with |msg|
 *     (retargeted to kWhatChangeConfiguration3) attached as "reply".
 */
void LiveSession::onChangeConfiguration2(const sp<AMessage> &msg) {
    ALOGV("onChangeConfiguration2");

    mContinuation.clear();

    // All fetchers are either suspended or have been removed now.

    // If we're seeking, clear all packet sources before we report
    // seek complete, to prevent decoder from pulling stale data.
    int64_t timeUs;
    CHECK(msg->findInt64("timeUs", &timeUs));

    if (timeUs >= 0) {
        mLastSeekTimeUs = timeUs;
        mLastDequeuedTimeUs = timeUs;

        for (size_t i = 0; i < mPacketSources.size(); i++) {
            sp<AnotherPacketSource> packetSource = mPacketSources.editValueAt(i);
            // Remember the format before clear() so it can be restored below.
            sp<MetaData> format = packetSource->getFormat();
            packetSource->clear();
            // Set a tentative format here such that HTTPLiveSource will always have
            // a format available when NuPlayer queries. Without an available video
            // format when setting a surface NuPlayer might disable video decoding
            // altogether. The tentative format will be overwritten by the
            // authoritative (and possibly same) format once content from the new
            // position is dequeued.
            packetSource->setFormat(format);
        }

        for (size_t i = 0; i < kMaxStreams; ++i) {
            mStreams[i].reset();
        }

        mDiscontinuityOffsetTimesUs.clear();
        mDiscontinuityAbsStartTimesUs.clear();

        // Report the seek as complete to whoever is waiting on it.
        if (mSeekReplyID != NULL) {
            CHECK(mSeekReply != NULL);
            mSeekReply->setInt32("err", OK);
            mSeekReply->postReply(mSeekReplyID);
            mSeekReplyID.clear();
            mSeekReply.clear();
        }

        // restart buffer polling after seek because previous
        // buffering position is no longer valid.
        restartPollBuffering();
    }

    uint32_t streamMask, resumeMask;
    CHECK(msg->findInt32("streamMask", (int32_t *)&streamMask));
    CHECK(msg->findInt32("resumeMask", (int32_t *)&resumeMask));

    // From here on streamMask means "every stream type in the new
    // configuration", whether served by a new or a resumed fetcher.
    streamMask |= resumeMask;

    AString URIs[kMaxStreams];
    for (size_t i = 0; i < kMaxStreams; ++i) {
        if (streamMask & indexToType(i)) {
            const AString &uriKey = mStreams[i].uriKey();
            CHECK(msg->findString(uriKey.c_str(), &URIs[i]));
            ALOGV("%s = '%s'", uriKey.c_str(), URIs[i].c_str());
        }
    }

    uint32_t changedMask = 0;
    // The loop ends as soon as i reaches kSubtitleIndex, so only the stream
    // indices below it are examined.
    for (size_t i = 0; i < kMaxStreams && i != kSubtitleIndex; ++i) {
        // stream URI could change even if onChangeConfiguration2 is only
        // used for seek. Seek could happen during a bw switch, in this
        // case bw switch will be cancelled, but the seekTo position will
        // fetch from the new URI.
        if ((mStreamMask & streamMask & indexToType(i))
                && !mStreams[i].mUri.empty()
                && !(URIs[i] == mStreams[i].mUri)) {
            ALOGV("stream %zu changed: oldURI %s, newURI %s", i,
                    mStreams[i].mUri.c_str(), URIs[i].c_str());
            sp<AnotherPacketSource> source = mPacketSources.valueFor(indexToType(i));
            // Only queue a format change if something was dequeued from
            // this source before.
            if (source->getLatestDequeuedMeta() != NULL) {
                source->queueDiscontinuity(
                        ATSParser::DISCONTINUITY_FORMATCHANGE, NULL, true);
            }
        }
        // Determine which decoders to shutdown on the player side,
        // a decoder has to be shutdown if its streamtype was active
        // before but no longer is.
        if ((mStreamMask & ~streamMask & indexToType(i))) {
            changedMask |= indexToType(i);
        }
    }

    if (changedMask == 0) {
        // If nothing changed as far as the audio/video decoders
        // are concerned we can proceed.
        onChangeConfiguration3(msg);
        return;
    }

    // Something changed, inform the player which will shutdown the
    // corresponding decoders and will post the reply once that's done.
    // Handling the reply will continue executing below in
    // onChangeConfiguration3.
    sp<AMessage> notify = mNotify->dup();
    notify->setInt32("what", kWhatStreamsChanged);
    notify->setInt32("changedMask", changedMask);

    // Reuse the incoming message as the reply so that all of its entries
    // (masks, URIs, timeUs, pickTrack) reach onChangeConfiguration3.
    msg->setWhat(kWhatChangeConfiguration3);
    msg->setTarget(this);

    notify->setMessage("reply", msg);
    notify->post();
}
1768 
/*
 * Final stage of a reconfiguration. Reads "streamMask" (stream types that
 * need a new fetcher), "resumeMask" (types served by an existing fetcher),
 * "timeUs" and "pickTrack" from |msg|, then:
 *   - decides whether this is a variant swap ("switching"): not seeking, not
 *     picking a track, and at least one stream type present in both the old
 *     and new masks is not in resumeMask. When switching, new URIs are
 *     staged in mNewUri instead of replacing mUri;
 *   - calls resumeFetcher() for every existing fetcher and marks those it
 *     rejects as mToBeRemoved;
 *   - creates and starts one new fetcher per distinct URI among the stream
 *     types left in streamMask;
 *   - clears mReconfigurationInProgress and either flags the switch as in
 *     progress or commits mNewStreamMask to mStreamMask.
 * A pending disconnect (mDisconnectReplyID set) is finished at the end.
 */
void LiveSession::onChangeConfiguration3(const sp<AMessage> &msg) {
    mContinuation.clear();
    // All remaining fetchers are still suspended, the player has shutdown
    // any decoders that needed it.

    uint32_t streamMask, resumeMask;
    CHECK(msg->findInt32("streamMask", (int32_t *)&streamMask));
    CHECK(msg->findInt32("resumeMask", (int32_t *)&resumeMask));

    mNewStreamMask = streamMask | resumeMask;

    int64_t timeUs;
    int32_t pickTrack;
    bool switching = false;
    CHECK(msg->findInt64("timeUs", &timeUs));
    CHECK(msg->findInt32("pickTrack", &pickTrack));

    if (timeUs < 0ll) {
        if (!pickTrack) {
            // mSwapMask contains streams that are in both old and new variant,
            // (in mNewStreamMask & mStreamMask) but with different URIs
            // (not in resumeMask).
            // For example, old variant has video and audio in two separate
            // URIs, and new variant has only audio with unchanged URI. mSwapMask
            // should be 0 as there is nothing to swap. We only need to stop video,
            // and resume audio.
            mSwapMask =  mNewStreamMask & mStreamMask & ~resumeMask;
            switching = (mSwapMask != 0);
        }
        // Not seeking: anchor the real-time base on the last dequeued time.
        mRealTimeBaseUs = ALooper::GetNowUs() - mLastDequeuedTimeUs;
    } else {
        // Seeking: anchor the real-time base on the seek target.
        mRealTimeBaseUs = ALooper::GetNowUs() - timeUs;
    }

    ALOGV("onChangeConfiguration3: timeUs=%lld, switching=%d, pickTrack=%d, "
            "mStreamMask=0x%x, mNewStreamMask=0x%x, mSwapMask=0x%x",
            (long long)timeUs, switching, pickTrack,
            mStreamMask, mNewStreamMask, mSwapMask);

    // Record the URI of each stream type that needs a new fetcher. While
    // switching the current mUri is left untouched and the new URI is
    // staged in mNewUri.
    for (size_t i = 0; i < kMaxStreams; ++i) {
        if (streamMask & indexToType(i)) {
            if (switching) {
                CHECK(msg->findString(mStreams[i].uriKey().c_str(), &mStreams[i].mNewUri));
            } else {
                CHECK(msg->findString(mStreams[i].uriKey().c_str(), &mStreams[i].mUri));
            }
        }
    }

    // Of all existing fetchers:
    // * Resume fetchers that are still needed and assign them original packet sources.
    // * Mark otherwise unneeded fetchers for removal.
    ALOGV("resuming fetchers for mask 0x%08x", resumeMask);
    for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
        const AString &uri = mFetcherInfos.keyAt(i);
        if (!resumeFetcher(uri, resumeMask, timeUs)) {
            ALOGV("marking fetcher-%d to be removed",
                    mFetcherInfos[i].mFetcher->getFetcherID());

            mFetcherInfos.editValueAt(i).mToBeRemoved = true;
        }
    }

    // streamMask now only contains the types that need a new fetcher created.
    if (streamMask != 0) {
        ALOGV("creating new fetchers for mask 0x%08x", streamMask);
    }

    // Find out when the original fetchers have buffered up to and start the new fetchers
    // at a later timestamp.
    for (size_t i = 0; i < kMaxStreams; i++) {
        if (!(indexToType(i) & streamMask)) {
            continue;
        }

        AString uri;
        uri = switching ? mStreams[i].mNewUri : mStreams[i].mUri;

        sp<PlaylistFetcher> fetcher = addFetcher(uri.c_str());
        CHECK(fetcher != NULL);

        HLSTime startTime;
        SeekMode seekMode = kSeekModeExactPosition;
        sp<AnotherPacketSource> sources[kNumSources];

        if (i == kSubtitleIndex || (!pickTrack && !switching)) {
            startTime = latestMediaSegmentStartTime();
        }

        // TRICKY: looping from i as earlier streams are already removed from streamMask
        // Every remaining stream type that shares this URI is attached to
        // the same new fetcher and then removed from streamMask.
        for (size_t j = i; j < kMaxStreams; ++j) {
            const AString &streamUri = switching ? mStreams[j].mNewUri : mStreams[j].mUri;
            if ((streamMask & indexToType(j)) && uri == streamUri) {
                sources[j] = mPacketSources.valueFor(indexToType(j));

                if (timeUs >= 0) {
                    startTime.mTimeUs = timeUs;
                } else {
                    int32_t type;
                    sp<AMessage> meta;
                    if (!switching) {
                        // selecting, or adapting but no swap required
                        meta = sources[j]->getLatestDequeuedMeta();
                    } else {
                        // adapting and swap required
                        meta = sources[j]->getLatestEnqueuedMeta();
                        if (meta != NULL && mCurBandwidthIndex > mOrigBandwidthIndex) {
                            // switching up
                            meta = sources[j]->getMetaAfterLastDequeued(mUpSwitchMargin);
                        }
                    }

                    // For audio/video, move the start time forward to the
                    // time described by meta, unless meta carries a
                    // "discontinuity" entry.
                    if ((j == kAudioIndex || j == kVideoIndex)
                            && meta != NULL && !meta->findInt32("discontinuity", &type)) {
                        HLSTime tmpTime(meta);
                        if (startTime < tmpTime) {
                            startTime = tmpTime;
                        }
                    }

                    if (!switching) {
                        // selecting, or adapting but no swap required
                        sources[j]->clear();
                        // NOTE: this break leaves the inner loop before the
                        // subtitle bit is cleared from streamMask below.
                        if (j == kSubtitleIndex) {
                            break;
                        }

                        ALOGV("stream[%zu]: queue format change", j);
                        sources[j]->queueDiscontinuity(
                                ATSParser::DISCONTINUITY_FORMAT_ONLY, NULL, true);
                    } else {
                        // switching, queue discontinuities after resume
                        sources[j] = mPacketSources2.valueFor(indexToType(j));
                        sources[j]->clear();
                        // the new fetcher might be providing streams that used to be
                        // provided by two different fetchers,  if one of the fetchers
                        // paused in the middle while the other somehow paused in next
                        // seg, we have to start from next seg.
                        if (seekMode < mStreams[j].mSeekMode) {
                            seekMode = mStreams[j].mSeekMode;
                        }
                    }
                }

                streamMask &= ~indexToType(j);
            }
        }

        ALOGV("[fetcher-%d] startAsync: startTimeUs %lld mLastSeekTimeUs %lld "
                "segmentStartTimeUs %lld seekMode %d",
                fetcher->getFetcherID(),
                (long long)startTime.mTimeUs,
                (long long)mLastSeekTimeUs,
                (long long)startTime.getSegmentTimeUs(),
                seekMode);

        // Set the target segment start time to the middle point of the
        // segment where the last sample was.
        // This gives a better guess if segments of the two variants are not
        // perfectly aligned. (If the corresponding segment in new variant
        // starts slightly later than that in the old variant, we still want
        // to pick that segment, not the one before)
        // A negative startTime.mTimeUs falls back to mLastSeekTimeUs.
        fetcher->startAsync(
                sources[kAudioIndex],
                sources[kVideoIndex],
                sources[kSubtitleIndex],
                getMetadataSource(sources, mNewStreamMask, switching),
                startTime.mTimeUs < 0 ? mLastSeekTimeUs : startTime.mTimeUs,
                startTime.getSegmentTimeUs(),
                startTime.mSeq,
                seekMode);
    }

    // All fetchers have now been started, the configuration change
    // has completed.

    mReconfigurationInProgress = false;
    if (switching) {
        // mStreamMask is left as-is here while the swap is pending.
        mSwitchInProgress = true;
    } else {
        mStreamMask = mNewStreamMask;
        if (mOrigBandwidthIndex != mCurBandwidthIndex) {
            ALOGV("#### Finished Bandwidth Switch Early: %zd => %zd",
                    mOrigBandwidthIndex, mCurBandwidthIndex);
            mOrigBandwidthIndex = mCurBandwidthIndex;
        }
    }

    ALOGV("onChangeConfiguration3: mSwitchInProgress %d, mStreamMask 0x%x",
            mSwitchInProgress, mStreamMask);

    if (mDisconnectReplyID != NULL) {
        finishDisconnect();
    }
}
1964 
swapPacketSource(StreamType stream)1965 void LiveSession::swapPacketSource(StreamType stream) {
1966     ALOGV("[%s] swapPacketSource", getNameForStream(stream));
1967 
1968     // transfer packets from source2 to source
1969     sp<AnotherPacketSource> &aps = mPacketSources.editValueFor(stream);
1970     sp<AnotherPacketSource> &aps2 = mPacketSources2.editValueFor(stream);
1971 
1972     // queue discontinuity in mPacketSource
1973     aps->queueDiscontinuity(ATSParser::DISCONTINUITY_FORMAT_ONLY, NULL, false);
1974 
1975     // queue packets in mPacketSource2 to mPacketSource
1976     status_t finalResult = OK;
1977     sp<ABuffer> accessUnit;
1978     while (aps2->hasBufferAvailable(&finalResult) && finalResult == OK &&
1979           OK == aps2->dequeueAccessUnit(&accessUnit)) {
1980         aps->queueAccessUnit(accessUnit);
1981     }
1982     aps2->clear();
1983 }
1984 
tryToFinishBandwidthSwitch(const AString & oldUri)1985 void LiveSession::tryToFinishBandwidthSwitch(const AString &oldUri) {
1986     if (!mSwitchInProgress) {
1987         return;
1988     }
1989 
1990     ssize_t index = mFetcherInfos.indexOfKey(oldUri);
1991     if (index < 0 || !mFetcherInfos[index].mToBeRemoved) {
1992         return;
1993     }
1994 
1995     // Swap packet source of streams provided by old variant
1996     for (size_t idx = 0; idx < kMaxStreams; idx++) {
1997         StreamType stream = indexToType(idx);
1998         if ((mSwapMask & stream) && (oldUri == mStreams[idx].mUri)) {
1999             swapPacketSource(stream);
2000 
2001             if ((mNewStreamMask & stream) && mStreams[idx].mNewUri.empty()) {
2002                 ALOGW("swapping stream type %d %s to empty stream",
2003                         stream, mStreams[idx].mUri.c_str());
2004             }
2005             mStreams[idx].mUri = mStreams[idx].mNewUri;
2006             mStreams[idx].mNewUri.clear();
2007 
2008             mSwapMask &= ~stream;
2009         }
2010     }
2011 
2012     mFetcherInfos.editValueAt(index).mFetcher->stopAsync(false /* clear */);
2013 
2014     ALOGV("tryToFinishBandwidthSwitch: mSwapMask=0x%x", mSwapMask);
2015     if (mSwapMask != 0) {
2016         return;
2017     }
2018 
2019     // Check if new variant contains extra streams.
2020     uint32_t extraStreams = mNewStreamMask & (~mStreamMask);
2021     while (extraStreams) {
2022         StreamType stream = (StreamType) (extraStreams & ~(extraStreams - 1));
2023         extraStreams &= ~stream;
2024 
2025         swapPacketSource(stream);
2026 
2027         ssize_t idx = typeToIndex(stream);
2028         CHECK(idx >= 0);
2029         if (mStreams[idx].mNewUri.empty()) {
2030             ALOGW("swapping extra stream type %d %s to empty stream",
2031                     stream, mStreams[idx].mUri.c_str());
2032         }
2033         mStreams[idx].mUri = mStreams[idx].mNewUri;
2034         mStreams[idx].mNewUri.clear();
2035     }
2036 
2037     // Restart new fetcher (it was paused after the first 47k block)
2038     // and let it fetch into mPacketSources (not mPacketSources2)
2039     for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
2040         FetcherInfo &info = mFetcherInfos.editValueAt(i);
2041         if (info.mToBeResumed) {
2042             resumeFetcher(mFetcherInfos.keyAt(i), mNewStreamMask);
2043             info.mToBeResumed = false;
2044         }
2045     }
2046 
2047     ALOGI("#### Finished Bandwidth Switch: %zd => %zd",
2048             mOrigBandwidthIndex, mCurBandwidthIndex);
2049 
2050     mStreamMask = mNewStreamMask;
2051     mSwitchInProgress = false;
2052     mOrigBandwidthIndex = mCurBandwidthIndex;
2053 
2054     restartPollBuffering();
2055 }
2056 
schedulePollBuffering()2057 void LiveSession::schedulePollBuffering() {
2058     sp<AMessage> msg = new AMessage(kWhatPollBuffering, this);
2059     msg->setInt32("generation", mPollBufferingGeneration);
2060     msg->post(1000000ll);
2061 }
2062 
cancelPollBuffering()2063 void LiveSession::cancelPollBuffering() {
2064     ++mPollBufferingGeneration;
2065     mPrevBufferPercentage = -1;
2066 }
2067 
restartPollBuffering()2068 void LiveSession::restartPollBuffering() {
2069     cancelPollBuffering();
2070     onPollBuffering();
2071 }
2072 
onPollBuffering()2073 void LiveSession::onPollBuffering() {
2074     ALOGV("onPollBuffering: mSwitchInProgress %d, mReconfigurationInProgress %d, "
2075             "mInPreparationPhase %d, mCurBandwidthIndex %zd, mStreamMask 0x%x",
2076         mSwitchInProgress, mReconfigurationInProgress,
2077         mInPreparationPhase, mCurBandwidthIndex, mStreamMask);
2078 
2079     bool underflow, ready, down, up;
2080     if (checkBuffering(underflow, ready, down, up)) {
2081         if (mInPreparationPhase) {
2082             // Allow down switch even if we're still preparing.
2083             //
2084             // Some streams have a high bandwidth index as default,
2085             // when bandwidth is low, it takes a long time to buffer
2086             // to ready mark, then it immediately pauses after start
2087             // as we have to do a down switch. It's better experience
2088             // to restart from a lower index, if we detect low bw.
2089             if (!switchBandwidthIfNeeded(false /* up */, down) && ready) {
2090                 postPrepared(OK);
2091             }
2092         }
2093 
2094         if (!mInPreparationPhase) {
2095             if (ready) {
2096                 stopBufferingIfNecessary();
2097             } else if (underflow) {
2098                 startBufferingIfNecessary();
2099             }
2100             switchBandwidthIfNeeded(up, down);
2101         }
2102     }
2103 
2104     schedulePollBuffering();
2105 }
2106 
cancelBandwidthSwitch(bool resume)2107 void LiveSession::cancelBandwidthSwitch(bool resume) {
2108     ALOGV("cancelBandwidthSwitch: mSwitchGen(%d)++, orig %zd, cur %zd",
2109             mSwitchGeneration, mOrigBandwidthIndex, mCurBandwidthIndex);
2110     if (!mSwitchInProgress) {
2111         return;
2112     }
2113 
2114     for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
2115         FetcherInfo& info = mFetcherInfos.editValueAt(i);
2116         if (info.mToBeRemoved) {
2117             info.mToBeRemoved = false;
2118             if (resume) {
2119                 resumeFetcher(mFetcherInfos.keyAt(i), mSwapMask);
2120             }
2121         }
2122     }
2123 
2124     for (size_t i = 0; i < kMaxStreams; ++i) {
2125         AString newUri = mStreams[i].mNewUri;
2126         if (!newUri.empty()) {
2127             // clear all mNewUri matching this newUri
2128             for (size_t j = i; j < kMaxStreams; ++j) {
2129                 if (mStreams[j].mNewUri == newUri) {
2130                     mStreams[j].mNewUri.clear();
2131                 }
2132             }
2133             ALOGV("stopping newUri = %s", newUri.c_str());
2134             ssize_t index = mFetcherInfos.indexOfKey(newUri);
2135             if (index < 0) {
2136                 ALOGE("did not find fetcher for newUri: %s", newUri.c_str());
2137                 continue;
2138             }
2139             FetcherInfo &info = mFetcherInfos.editValueAt(index);
2140             info.mToBeRemoved = true;
2141             info.mFetcher->stopAsync();
2142         }
2143     }
2144 
2145     ALOGI("#### Canceled Bandwidth Switch: %zd => %zd",
2146             mOrigBandwidthIndex, mCurBandwidthIndex);
2147 
2148     mSwitchGeneration++;
2149     mSwitchInProgress = false;
2150     mCurBandwidthIndex = mOrigBandwidthIndex;
2151     mSwapMask = 0;
2152 }
2153 
checkBuffering(bool & underflow,bool & ready,bool & down,bool & up)2154 bool LiveSession::checkBuffering(
2155         bool &underflow, bool &ready, bool &down, bool &up) {
2156     underflow = ready = down = up = false;
2157 
2158     if (mReconfigurationInProgress) {
2159         ALOGV("Switch/Reconfig in progress, defer buffer polling");
2160         return false;
2161     }
2162 
2163     size_t activeCount, underflowCount, readyCount, downCount, upCount;
2164     activeCount = underflowCount = readyCount = downCount = upCount =0;
2165     int32_t minBufferPercent = -1;
2166     int64_t durationUs;
2167     if (getDuration(&durationUs) != OK) {
2168         durationUs = -1;
2169     }
2170     for (size_t i = 0; i < mPacketSources.size(); ++i) {
2171         // we don't check subtitles for buffering level
2172         if (!(mStreamMask & mPacketSources.keyAt(i)
2173                 & (STREAMTYPE_AUDIO | STREAMTYPE_VIDEO))) {
2174             continue;
2175         }
2176         // ignore streams that never had any packet queued.
2177         // (it's possible that the variant only has audio or video)
2178         sp<AMessage> meta = mPacketSources[i]->getLatestEnqueuedMeta();
2179         if (meta == NULL) {
2180             continue;
2181         }
2182 
2183         status_t finalResult;
2184         int64_t bufferedDurationUs =
2185                 mPacketSources[i]->getBufferedDurationUs(&finalResult);
2186         ALOGV("[%s] buffered %lld us",
2187                 getNameForStream(mPacketSources.keyAt(i)),
2188                 (long long)bufferedDurationUs);
2189         if (durationUs >= 0) {
2190             int32_t percent;
2191             if (mPacketSources[i]->isFinished(0 /* duration */)) {
2192                 percent = 100;
2193             } else {
2194                 percent = (int32_t)(100.0 *
2195                         (mLastDequeuedTimeUs + bufferedDurationUs) / durationUs);
2196             }
2197             if (minBufferPercent < 0 || percent < minBufferPercent) {
2198                 minBufferPercent = percent;
2199             }
2200         }
2201 
2202         ++activeCount;
2203         int64_t readyMarkUs =
2204             (mInPreparationPhase ?
2205                 mBufferingSettings.mInitialWatermarkMs :
2206                 mBufferingSettings.mRebufferingWatermarkHighMs) * 1000ll;
2207         if (bufferedDurationUs > readyMarkUs
2208                 || mPacketSources[i]->isFinished(0)) {
2209             ++readyCount;
2210         }
2211         if (!mPacketSources[i]->isFinished(0)) {
2212             if (bufferedDurationUs < mBufferingSettings.mRebufferingWatermarkLowMs * 1000ll) {
2213                 ++underflowCount;
2214             }
2215             if (bufferedDurationUs > mUpSwitchMark) {
2216                 ++upCount;
2217             }
2218             if (bufferedDurationUs < mDownSwitchMark) {
2219                 ++downCount;
2220             }
2221         }
2222     }
2223 
2224     if (minBufferPercent >= 0) {
2225         notifyBufferingUpdate(minBufferPercent);
2226     }
2227 
2228     if (activeCount > 0) {
2229         up        = (upCount == activeCount);
2230         down      = (downCount > 0);
2231         ready     = (readyCount == activeCount);
2232         underflow = (underflowCount > 0);
2233         return true;
2234     }
2235 
2236     return false;
2237 }
2238 
startBufferingIfNecessary()2239 void LiveSession::startBufferingIfNecessary() {
2240     ALOGV("startBufferingIfNecessary: mInPreparationPhase=%d, mBuffering=%d",
2241             mInPreparationPhase, mBuffering);
2242     if (!mBuffering) {
2243         mBuffering = true;
2244 
2245         sp<AMessage> notify = mNotify->dup();
2246         notify->setInt32("what", kWhatBufferingStart);
2247         notify->post();
2248     }
2249 }
2250 
stopBufferingIfNecessary()2251 void LiveSession::stopBufferingIfNecessary() {
2252     ALOGV("stopBufferingIfNecessary: mInPreparationPhase=%d, mBuffering=%d",
2253             mInPreparationPhase, mBuffering);
2254 
2255     if (mBuffering) {
2256         mBuffering = false;
2257 
2258         sp<AMessage> notify = mNotify->dup();
2259         notify->setInt32("what", kWhatBufferingEnd);
2260         notify->post();
2261     }
2262 }
2263 
notifyBufferingUpdate(int32_t percentage)2264 void LiveSession::notifyBufferingUpdate(int32_t percentage) {
2265     if (percentage < mPrevBufferPercentage) {
2266         percentage = mPrevBufferPercentage;
2267     } else if (percentage > 100) {
2268         percentage = 100;
2269     }
2270 
2271     mPrevBufferPercentage = percentage;
2272 
2273     ALOGV("notifyBufferingUpdate: percentage=%d%%", percentage);
2274 
2275     sp<AMessage> notify = mNotify->dup();
2276     notify->setInt32("what", kWhatBufferingUpdate);
2277     notify->setInt32("percentage", percentage);
2278     notify->post();
2279 }
2280 
tryBandwidthFallback()2281 bool LiveSession::tryBandwidthFallback() {
2282     if (mInPreparationPhase || mReconfigurationInProgress) {
2283         // Don't try fallback during prepare or reconfig.
2284         // If error happens there, it's likely unrecoverable.
2285         return false;
2286     }
2287     if (mCurBandwidthIndex > mOrigBandwidthIndex) {
2288         // if we're switching up, simply cancel and resume old variant
2289         cancelBandwidthSwitch(true /* resume */);
2290         return true;
2291     } else {
2292         // if we're switching down, we're likely about to underflow (if
2293         // not already underflowing). try the lowest viable bandwidth if
2294         // not on that variant already.
2295         ssize_t lowestValid = getLowestValidBandwidthIndex();
2296         if (mCurBandwidthIndex > lowestValid) {
2297             cancelBandwidthSwitch();
2298             changeConfiguration(-1ll, lowestValid);
2299             return true;
2300         }
2301     }
2302     // return false if we couldn't find any fallback
2303     return false;
2304 }
2305 
2306 /*
2307  * returns true if a bandwidth switch is actually needed (and started),
2308  * returns false otherwise
2309  */
switchBandwidthIfNeeded(bool bufferHigh,bool bufferLow)2310 bool LiveSession::switchBandwidthIfNeeded(bool bufferHigh, bool bufferLow) {
2311     // no need to check bandwidth if we only have 1 bandwidth settings
2312     if (mBandwidthItems.size() < 2) {
2313         return false;
2314     }
2315 
2316     if (mSwitchInProgress) {
2317         if (mBuffering) {
2318             tryBandwidthFallback();
2319         }
2320         return false;
2321     }
2322 
2323     int32_t bandwidthBps, shortTermBps;
2324     bool isStable;
2325     if (mBandwidthEstimator->estimateBandwidth(
2326             &bandwidthBps, &isStable, &shortTermBps)) {
2327         ALOGV("bandwidth estimated at %.2f kbps, "
2328                 "stable %d, shortTermBps %.2f kbps",
2329                 bandwidthBps / 1024.0f, isStable, shortTermBps / 1024.0f);
2330         mLastBandwidthBps = bandwidthBps;
2331         mLastBandwidthStable = isStable;
2332     } else {
2333         ALOGV("no bandwidth estimate.");
2334         return false;
2335     }
2336 
2337     int32_t curBandwidth = mBandwidthItems.itemAt(mCurBandwidthIndex).mBandwidth;
2338     // canSwithDown and canSwitchUp can't both be true.
2339     // we only want to switch up when measured bw is 120% higher than current variant,
2340     // and we only want to switch down when measured bw is below current variant.
2341     bool canSwitchDown = bufferLow
2342             && (bandwidthBps < (int32_t)curBandwidth);
2343     bool canSwitchUp = bufferHigh
2344             && (bandwidthBps > (int32_t)curBandwidth * 12 / 10);
2345 
2346     if (canSwitchDown || canSwitchUp) {
2347         // bandwidth estimating has some delay, if we have to downswitch when
2348         // it hasn't stabilized, use the short term to guess real bandwidth,
2349         // since it may be dropping too fast.
2350         // (note this doesn't apply to upswitch, always use longer average there)
2351         if (!isStable && canSwitchDown) {
2352             if (shortTermBps < bandwidthBps) {
2353                 bandwidthBps = shortTermBps;
2354             }
2355         }
2356 
2357         ssize_t bandwidthIndex = getBandwidthIndex(bandwidthBps);
2358 
2359         // it's possible that we're checking for canSwitchUp case, but the returned
2360         // bandwidthIndex is < mCurBandwidthIndex, as getBandwidthIndex() only uses 70%
2361         // of measured bw. In that case we don't want to do anything, since we have
2362         // both enough buffer and enough bw.
2363         if ((canSwitchUp && bandwidthIndex > mCurBandwidthIndex)
2364          || (canSwitchDown && bandwidthIndex < mCurBandwidthIndex)) {
2365             // if not yet prepared, just restart again with new bw index.
2366             // this is faster and playback experience is cleaner.
2367             changeConfiguration(
2368                     mInPreparationPhase ? 0 : -1ll, bandwidthIndex);
2369             return true;
2370         }
2371     }
2372     return false;
2373 }
2374 
postError(status_t err)2375 void LiveSession::postError(status_t err) {
2376     // if we reached EOS, notify buffering of 100%
2377     if (err == ERROR_END_OF_STREAM) {
2378         notifyBufferingUpdate(100);
2379     }
2380     // we'll stop buffer polling now, before that notify
2381     // stop buffering to stop the spinning icon
2382     stopBufferingIfNecessary();
2383     cancelPollBuffering();
2384 
2385     sp<AMessage> notify = mNotify->dup();
2386     notify->setInt32("what", kWhatError);
2387     notify->setInt32("err", err);
2388     notify->post();
2389 }
2390 
postPrepared(status_t err)2391 void LiveSession::postPrepared(status_t err) {
2392     CHECK(mInPreparationPhase);
2393 
2394     sp<AMessage> notify = mNotify->dup();
2395     if (err == OK || err == ERROR_END_OF_STREAM) {
2396         notify->setInt32("what", kWhatPrepared);
2397     } else {
2398         cancelPollBuffering();
2399 
2400         notify->setInt32("what", kWhatPreparationFailed);
2401         notify->setInt32("err", err);
2402     }
2403 
2404     notify->post();
2405 
2406     mInPreparationPhase = false;
2407 }
2408 
2409 
2410 }  // namespace android
2411 
2412