1 /*
2  * Copyright (C) 2010 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 //#define LOG_NDEBUG 0
18 #define LOG_TAG "LiveSession"
19 #include <utils/Log.h>
20 
21 #include "LiveSession.h"
22 #include "HTTPDownloader.h"
23 #include "M3UParser.h"
24 #include "PlaylistFetcher.h"
25 
26 #include "mpeg2ts/AnotherPacketSource.h"
27 
28 #include <cutils/properties.h>
29 #include <media/MediaHTTPService.h>
30 #include <media/stagefright/foundation/ABuffer.h>
31 #include <media/stagefright/foundation/ADebug.h>
32 #include <media/stagefright/foundation/AMessage.h>
33 #include <media/stagefright/foundation/AUtils.h>
34 #include <media/stagefright/MediaDefs.h>
35 #include <media/stagefright/MetaData.h>
36 #include <media/stagefright/Utils.h>
37 
38 #include <utils/Mutex.h>
39 
40 #include <ctype.h>
41 #include <inttypes.h>
42 
43 namespace android {
44 
// static
// Bandwidth Switch Mark Defaults (all values are in microseconds).
// The first three seed mUpSwitchMark / mDownSwitchMark / mUpSwitchMargin in
// the constructor, and act as upper bounds when those members are recomputed
// from the target duration reported by a PlaylistFetcher.
const int64_t LiveSession::kUpSwitchMarkUs = 15000000ll;
const int64_t LiveSession::kDownSwitchMarkUs = 20000000ll;
const int64_t LiveSession::kUpSwitchMarginUs = 5000000ll;
// Minimum gap between the old fetcher's last enqueued sample and the new
// fetcher's first sample for checkSwitchProgress() to request a resumeUntil.
const int64_t LiveSession::kResumeThresholdUs = 100000ll;

//TODO: redefine this mark to a fair value
// default buffer underflow mark
static const int kUnderflowMarkMs = 1000;  // 1 second
55 
// Estimates download bandwidth from a sliding history of
// (bytes transferred, transfer time) samples.
// Both public methods serialize on mLock.
struct LiveSession::BandwidthEstimator : public RefBase {
    BandwidthEstimator();

    // Records one transfer of |numBytes| bytes that took |delayUs|
    // microseconds, then trims samples that fell out of the history window.
    void addBandwidthMeasurement(size_t numBytes, int64_t delayUs);

    // Writes the long-term estimate, in bits per second, to |bandwidth|.
    // |isStable| and |shortTermBps| are optional outputs and may be NULL.
    // Returns false (leaving the outputs untouched) while fewer than two
    // samples have been recorded.
    bool estimateBandwidth(
            int32_t *bandwidth,
            bool *isStable = NULL,
            int32_t *shortTermBps = NULL);

private:
    // Bandwidth estimation parameters
    // number of most recent samples used for the short-term estimate
    static const int32_t kShortTermBandwidthItems = 3;
    // trimming never reduces the history to fewer samples than this
    static const int32_t kMinBandwidthHistoryItems = 20;
    static const int64_t kMinBandwidthHistoryWindowUs = 5000000ll; // 5 sec
    static const int64_t kMaxBandwidthHistoryWindowUs = 30000000ll; // 30 sec
    static const int64_t kMaxBandwidthHistoryAgeUs = 60000000ll; // 60 sec

    // One recorded transfer.
    struct BandwidthEntry {
        int64_t mTimestampUs;   // ALooper time at which the sample was added
        int64_t mDelayUs;       // duration of the transfer
        size_t mNumBytes;       // size of the transfer
    };

    Mutex mLock;
    List<BandwidthEntry> mBandwidthHistory;  // oldest sample first
    List<int32_t> mPrevEstimates;            // last (up to 3) long-term estimates
    int32_t mShortTermEstimate;              // cached short-term estimate, in bps
    bool mHasNewSample;                      // true if a sample arrived since the last estimate
    bool mIsStable;                          // cached stability verdict
    int64_t mTotalTransferTimeUs;            // sum of mDelayUs over mBandwidthHistory
    size_t mTotalTransferBytes;              // sum of mNumBytes over mBandwidthHistory

    DISALLOW_EVIL_CONSTRUCTORS(BandwidthEstimator);
};
90 
// Starts with an empty history: no samples, zeroed running totals, and the
// cached stability flag set to true.
LiveSession::BandwidthEstimator::BandwidthEstimator() :
    mShortTermEstimate(0),
    mHasNewSample(false),
    mIsStable(true),
    mTotalTransferTimeUs(0),
    mTotalTransferBytes(0) {
}
98 
addBandwidthMeasurement(size_t numBytes,int64_t delayUs)99 void LiveSession::BandwidthEstimator::addBandwidthMeasurement(
100         size_t numBytes, int64_t delayUs) {
101     AutoMutex autoLock(mLock);
102 
103     int64_t nowUs = ALooper::GetNowUs();
104     BandwidthEntry entry;
105     entry.mTimestampUs = nowUs;
106     entry.mDelayUs = delayUs;
107     entry.mNumBytes = numBytes;
108     mTotalTransferTimeUs += delayUs;
109     mTotalTransferBytes += numBytes;
110     mBandwidthHistory.push_back(entry);
111     mHasNewSample = true;
112 
113     // Remove no more than 10% of total transfer time at a time
114     // to avoid sudden jump on bandwidth estimation. There might
115     // be long blocking reads that takes up signification time,
116     // we have to keep a longer window in that case.
117     int64_t bandwidthHistoryWindowUs = mTotalTransferTimeUs * 9 / 10;
118     if (bandwidthHistoryWindowUs < kMinBandwidthHistoryWindowUs) {
119         bandwidthHistoryWindowUs = kMinBandwidthHistoryWindowUs;
120     } else if (bandwidthHistoryWindowUs > kMaxBandwidthHistoryWindowUs) {
121         bandwidthHistoryWindowUs = kMaxBandwidthHistoryWindowUs;
122     }
123     // trim old samples, keeping at least kMaxBandwidthHistoryItems samples,
124     // and total transfer time at least kMaxBandwidthHistoryWindowUs.
125     while (mBandwidthHistory.size() > kMinBandwidthHistoryItems) {
126         List<BandwidthEntry>::iterator it = mBandwidthHistory.begin();
127         // remove sample if either absolute age or total transfer time is
128         // over kMaxBandwidthHistoryWindowUs
129         if (nowUs - it->mTimestampUs < kMaxBandwidthHistoryAgeUs &&
130                 mTotalTransferTimeUs - it->mDelayUs < bandwidthHistoryWindowUs) {
131             break;
132         }
133         mTotalTransferTimeUs -= it->mDelayUs;
134         mTotalTransferBytes -= it->mNumBytes;
135         mBandwidthHistory.erase(mBandwidthHistory.begin());
136     }
137 }
138 
estimateBandwidth(int32_t * bandwidthBps,bool * isStable,int32_t * shortTermBps)139 bool LiveSession::BandwidthEstimator::estimateBandwidth(
140         int32_t *bandwidthBps, bool *isStable, int32_t *shortTermBps) {
141     AutoMutex autoLock(mLock);
142 
143     if (mBandwidthHistory.size() < 2) {
144         return false;
145     }
146 
147     if (!mHasNewSample) {
148         *bandwidthBps = *(--mPrevEstimates.end());
149         if (isStable) {
150             *isStable = mIsStable;
151         }
152         if (shortTermBps) {
153             *shortTermBps = mShortTermEstimate;
154         }
155         return true;
156     }
157 
158     *bandwidthBps = ((double)mTotalTransferBytes * 8E6 / mTotalTransferTimeUs);
159     mPrevEstimates.push_back(*bandwidthBps);
160     while (mPrevEstimates.size() > 3) {
161         mPrevEstimates.erase(mPrevEstimates.begin());
162     }
163     mHasNewSample = false;
164 
165     int64_t totalTimeUs = 0;
166     size_t totalBytes = 0;
167     if (mBandwidthHistory.size() >= kShortTermBandwidthItems) {
168         List<BandwidthEntry>::iterator it = --mBandwidthHistory.end();
169         for (size_t i = 0; i < kShortTermBandwidthItems; i++, it--) {
170             totalTimeUs += it->mDelayUs;
171             totalBytes += it->mNumBytes;
172         }
173     }
174     mShortTermEstimate = totalTimeUs > 0 ?
175             (totalBytes * 8E6 / totalTimeUs) : *bandwidthBps;
176     if (shortTermBps) {
177         *shortTermBps = mShortTermEstimate;
178     }
179 
180     int64_t minEstimate = -1, maxEstimate = -1;
181     List<int32_t>::iterator it;
182     for (it = mPrevEstimates.begin(); it != mPrevEstimates.end(); it++) {
183         int32_t estimate = *it;
184         if (minEstimate < 0 || minEstimate > estimate) {
185             minEstimate = estimate;
186         }
187         if (maxEstimate < 0 || maxEstimate < estimate) {
188             maxEstimate = estimate;
189         }
190     }
191     // consider it stable if long-term average is not jumping a lot
192     // and short-term average is not much lower than long-term average
193     mIsStable = (maxEstimate <= minEstimate * 4 / 3)
194             && mShortTermEstimate > minEstimate * 7 / 10;
195     if (isStable) {
196         *isStable = mIsStable;
197     }
198 
199 #if 0
200     {
201         char dumpStr[1024] = {0};
202         size_t itemIdx = 0;
203         size_t histSize = mBandwidthHistory.size();
204         sprintf(dumpStr, "estimate bps=%d stable=%d history (n=%d): {",
205             *bandwidthBps, mIsStable, histSize);
206         List<BandwidthEntry>::iterator it = mBandwidthHistory.begin();
207         for (; it != mBandwidthHistory.end(); ++it) {
208             if (itemIdx > 50) {
209                 sprintf(dumpStr + strlen(dumpStr),
210                         "...(%zd more items)... }", histSize - itemIdx);
211                 break;
212             }
213             sprintf(dumpStr + strlen(dumpStr), "%dk/%.3fs%s",
214                 it->mNumBytes / 1024,
215                 (double)it->mDelayUs * 1.0e-6,
216                 (it == (--mBandwidthHistory.end())) ? "}" : ", ");
217             itemIdx++;
218         }
219         ALOGE(dumpStr);
220     }
221 #endif
222     return true;
223 }
224 
225 //static
getKeyForStream(StreamType type)226 const char *LiveSession::getKeyForStream(StreamType type) {
227     switch (type) {
228         case STREAMTYPE_VIDEO:
229             return "timeUsVideo";
230         case STREAMTYPE_AUDIO:
231             return "timeUsAudio";
232         case STREAMTYPE_SUBTITLES:
233             return "timeUsSubtitle";
234         case STREAMTYPE_METADATA:
235             return "timeUsMetadata"; // unused
236         default:
237             TRESPASS();
238     }
239     return NULL;
240 }
241 
242 //static
getNameForStream(StreamType type)243 const char *LiveSession::getNameForStream(StreamType type) {
244     switch (type) {
245         case STREAMTYPE_VIDEO:
246             return "video";
247         case STREAMTYPE_AUDIO:
248             return "audio";
249         case STREAMTYPE_SUBTITLES:
250             return "subs";
251         case STREAMTYPE_METADATA:
252             return "metadata";
253         default:
254             break;
255     }
256     return "unknown";
257 }
258 
259 //static
getSourceTypeForStream(StreamType type)260 ATSParser::SourceType LiveSession::getSourceTypeForStream(StreamType type) {
261     switch (type) {
262         case STREAMTYPE_VIDEO:
263             return ATSParser::VIDEO;
264         case STREAMTYPE_AUDIO:
265             return ATSParser::AUDIO;
266         case STREAMTYPE_METADATA:
267             return ATSParser::META;
268         case STREAMTYPE_SUBTITLES:
269         default:
270             TRESPASS();
271     }
272     return ATSParser::NUM_SOURCE_TYPES; // should not reach here
273 }
274 
// Constructs an idle session.
//   notify       message stored in mNotify.
//   flags        stored in mFlags.
//   httpService  HTTP service later handed to HTTPDownloader instances.
// The session starts in the preparation phase with no bandwidth selected
// (indices are -1), the default switch marks, and one empty packet source
// per source type in both mPacketSources and mPacketSources2.
LiveSession::LiveSession(
        const sp<AMessage> &notify, uint32_t flags,
        const sp<MediaHTTPService> &httpService)
    : mNotify(notify),
      mFlags(flags),
      mHTTPService(httpService),
      mBuffering(false),
      mInPreparationPhase(true),
      mPollBufferingGeneration(0),
      mPrevBufferPercentage(-1),
      mCurBandwidthIndex(-1),
      mOrigBandwidthIndex(-1),
      mLastBandwidthBps(-1ll),
      mLastBandwidthStable(false),
      mBandwidthEstimator(new BandwidthEstimator()),
      // reported as max width/height on the video format meta
      mMaxWidth(720),
      mMaxHeight(480),
      mStreamMask(0),
      mNewStreamMask(0),
      mSwapMask(0),
      mSwitchGeneration(0),
      mSubtitleGeneration(0),
      mLastDequeuedTimeUs(0ll),
      mRealTimeBaseUs(0ll),
      mReconfigurationInProgress(false),
      mSwitchInProgress(false),
      mUpSwitchMark(kUpSwitchMarkUs),
      mDownSwitchMark(kDownSwitchMarkUs),
      mUpSwitchMargin(kUpSwitchMarginUs),
      mFirstTimeUsValid(false),
      mFirstTimeUs(0),
      mLastSeekTimeUs(0),
      mHasMetadata(false) {
    mStreams[kAudioIndex] = StreamItem("audio");
    mStreams[kVideoIndex] = StreamItem("video");
    mStreams[kSubtitleIndex] = StreamItem("subtitles");

    // Create a primary and a secondary packet source for every source type;
    // the format meta is not known yet, hence NULL.
    for (size_t i = 0; i < kNumSources; ++i) {
        mPacketSources.add(indexToType(i), new AnotherPacketSource(NULL /* meta */));
        mPacketSources2.add(indexToType(i), new AnotherPacketSource(NULL /* meta */));
    }
}
317 
~LiveSession()318 LiveSession::~LiveSession() {
319     if (mFetcherLooper != NULL) {
320         mFetcherLooper->stop();
321     }
322 }
323 
calculateMediaTimeUs(int64_t firstTimeUs,int64_t timeUs,int32_t discontinuitySeq)324 int64_t LiveSession::calculateMediaTimeUs(
325         int64_t firstTimeUs, int64_t timeUs, int32_t discontinuitySeq) {
326     if (timeUs >= firstTimeUs) {
327         timeUs -= firstTimeUs;
328     } else {
329         timeUs = 0;
330     }
331     timeUs += mLastSeekTimeUs;
332     if (mDiscontinuityOffsetTimesUs.indexOfKey(discontinuitySeq) >= 0) {
333         timeUs += mDiscontinuityOffsetTimesUs.valueFor(discontinuitySeq);
334     }
335     return timeUs;
336 }
337 
dequeueAccessUnit(StreamType stream,sp<ABuffer> * accessUnit)338 status_t LiveSession::dequeueAccessUnit(
339         StreamType stream, sp<ABuffer> *accessUnit) {
340     status_t finalResult = OK;
341     sp<AnotherPacketSource> packetSource = mPacketSources.valueFor(stream);
342 
343     ssize_t streamIdx = typeToIndex(stream);
344     if (streamIdx < 0) {
345         return BAD_VALUE;
346     }
347     const char *streamStr = getNameForStream(stream);
348     // Do not let client pull data if we don't have data packets yet.
349     // We might only have a format discontinuity queued without data.
350     // When NuPlayerDecoder dequeues the format discontinuity, it will
351     // immediately try to getFormat. If we return NULL, NuPlayerDecoder
352     // thinks it can do seamless change, so will not shutdown decoder.
353     // When the actual format arrives, it can't handle it and get stuck.
354     if (!packetSource->hasDataBufferAvailable(&finalResult)) {
355         ALOGV("[%s] dequeueAccessUnit: no buffer available (finalResult=%d)",
356                 streamStr, finalResult);
357 
358         if (finalResult == OK) {
359             return -EAGAIN;
360         } else {
361             return finalResult;
362         }
363     }
364 
365     // Let the client dequeue as long as we have buffers available
366     // Do not make pause/resume decisions here.
367 
368     status_t err = packetSource->dequeueAccessUnit(accessUnit);
369 
370     if (err == INFO_DISCONTINUITY) {
371         // adaptive streaming, discontinuities in the playlist
372         int32_t type;
373         CHECK((*accessUnit)->meta()->findInt32("discontinuity", &type));
374 
375         sp<AMessage> extra;
376         if (!(*accessUnit)->meta()->findMessage("extra", &extra)) {
377             extra.clear();
378         }
379 
380         ALOGI("[%s] read discontinuity of type %d, extra = %s",
381               streamStr,
382               type,
383               extra == NULL ? "NULL" : extra->debugString().c_str());
384     } else if (err == OK) {
385 
386         if (stream == STREAMTYPE_AUDIO || stream == STREAMTYPE_VIDEO) {
387             int64_t timeUs, originalTimeUs;
388             int32_t discontinuitySeq = 0;
389             StreamItem& strm = mStreams[streamIdx];
390             CHECK((*accessUnit)->meta()->findInt64("timeUs",  &timeUs));
391             originalTimeUs = timeUs;
392             (*accessUnit)->meta()->findInt32("discontinuitySeq", &discontinuitySeq);
393             if (discontinuitySeq > (int32_t) strm.mCurDiscontinuitySeq) {
394                 int64_t offsetTimeUs;
395                 if (mDiscontinuityOffsetTimesUs.indexOfKey(strm.mCurDiscontinuitySeq) >= 0) {
396                     offsetTimeUs = mDiscontinuityOffsetTimesUs.valueFor(strm.mCurDiscontinuitySeq);
397                 } else {
398                     offsetTimeUs = 0;
399                 }
400 
401                 if (mDiscontinuityAbsStartTimesUs.indexOfKey(strm.mCurDiscontinuitySeq) >= 0
402                         && strm.mLastDequeuedTimeUs >= 0) {
403                     int64_t firstTimeUs;
404                     firstTimeUs = mDiscontinuityAbsStartTimesUs.valueFor(strm.mCurDiscontinuitySeq);
405                     offsetTimeUs += strm.mLastDequeuedTimeUs - firstTimeUs;
406                     offsetTimeUs += strm.mLastSampleDurationUs;
407                 } else {
408                     offsetTimeUs += strm.mLastSampleDurationUs;
409                 }
410 
411                 mDiscontinuityOffsetTimesUs.add(discontinuitySeq, offsetTimeUs);
412                 strm.mCurDiscontinuitySeq = discontinuitySeq;
413             }
414 
415             int32_t discard = 0;
416             int64_t firstTimeUs;
417             if (mDiscontinuityAbsStartTimesUs.indexOfKey(strm.mCurDiscontinuitySeq) >= 0) {
418                 int64_t durUs; // approximate sample duration
419                 if (timeUs > strm.mLastDequeuedTimeUs) {
420                     durUs = timeUs - strm.mLastDequeuedTimeUs;
421                 } else {
422                     durUs = strm.mLastDequeuedTimeUs - timeUs;
423                 }
424                 strm.mLastSampleDurationUs = durUs;
425                 firstTimeUs = mDiscontinuityAbsStartTimesUs.valueFor(strm.mCurDiscontinuitySeq);
426             } else if ((*accessUnit)->meta()->findInt32("discard", &discard) && discard) {
427                 firstTimeUs = timeUs;
428             } else {
429                 mDiscontinuityAbsStartTimesUs.add(strm.mCurDiscontinuitySeq, timeUs);
430                 firstTimeUs = timeUs;
431             }
432 
433             strm.mLastDequeuedTimeUs = timeUs;
434             timeUs = calculateMediaTimeUs(firstTimeUs, timeUs, discontinuitySeq);
435 
436             ALOGV("[%s] dequeueAccessUnit: time %lld us, original %lld us",
437                     streamStr, (long long)timeUs, (long long)originalTimeUs);
438             (*accessUnit)->meta()->setInt64("timeUs",  timeUs);
439             mLastDequeuedTimeUs = timeUs;
440             mRealTimeBaseUs = ALooper::GetNowUs() - timeUs;
441         } else if (stream == STREAMTYPE_SUBTITLES) {
442             int32_t subtitleGeneration;
443             if ((*accessUnit)->meta()->findInt32("subtitleGeneration", &subtitleGeneration)
444                     && subtitleGeneration != mSubtitleGeneration) {
445                return -EAGAIN;
446             };
447             (*accessUnit)->meta()->setInt32(
448                     "trackIndex", mPlaylist->getSelectedIndex());
449             (*accessUnit)->meta()->setInt64("baseUs", mRealTimeBaseUs);
450         } else if (stream == STREAMTYPE_METADATA) {
451             HLSTime mdTime((*accessUnit)->meta());
452             if (mDiscontinuityAbsStartTimesUs.indexOfKey(mdTime.mSeq) < 0) {
453                 packetSource->requeueAccessUnit((*accessUnit));
454                 return -EAGAIN;
455             } else {
456                 int64_t firstTimeUs = mDiscontinuityAbsStartTimesUs.valueFor(mdTime.mSeq);
457                 int64_t timeUs = calculateMediaTimeUs(firstTimeUs, mdTime.mTimeUs, mdTime.mSeq);
458                 (*accessUnit)->meta()->setInt64("timeUs",  timeUs);
459                 (*accessUnit)->meta()->setInt64("baseUs", mRealTimeBaseUs);
460             }
461         }
462     } else {
463         ALOGI("[%s] encountered error %d", streamStr, err);
464     }
465 
466     return err;
467 }
468 
getStreamFormatMeta(StreamType stream,sp<MetaData> * meta)469 status_t LiveSession::getStreamFormatMeta(StreamType stream, sp<MetaData> *meta) {
470     if (!(mStreamMask & stream)) {
471         return UNKNOWN_ERROR;
472     }
473 
474     sp<AnotherPacketSource> packetSource = mPacketSources.valueFor(stream);
475 
476     *meta = packetSource->getFormat();
477 
478     if (*meta == NULL) {
479         return -EWOULDBLOCK;
480     }
481 
482     if (stream == STREAMTYPE_AUDIO) {
483         // set AAC input buffer size to 32K bytes (256kbps x 1sec)
484         (*meta)->setInt32(kKeyMaxInputSize, 32 * 1024);
485     } else if (stream == STREAMTYPE_VIDEO) {
486         (*meta)->setInt32(kKeyMaxWidth, mMaxWidth);
487         (*meta)->setInt32(kKeyMaxHeight, mMaxHeight);
488     }
489 
490     return OK;
491 }
492 
getHTTPDownloader()493 sp<HTTPDownloader> LiveSession::getHTTPDownloader() {
494     return new HTTPDownloader(mHTTPService, mExtraHeaders);
495 }
496 
setBufferingSettings(const BufferingSettings & buffering)497 void LiveSession::setBufferingSettings(
498         const BufferingSettings &buffering) {
499     sp<AMessage> msg = new AMessage(kWhatSetBufferingSettings, this);
500     writeToAMessage(msg, buffering);
501     msg->post();
502 }
503 
connectAsync(const char * url,const KeyedVector<String8,String8> * headers)504 void LiveSession::connectAsync(
505         const char *url, const KeyedVector<String8, String8> *headers) {
506     sp<AMessage> msg = new AMessage(kWhatConnect, this);
507     msg->setString("url", url);
508 
509     if (headers != NULL) {
510         msg->setPointer(
511                 "headers",
512                 new KeyedVector<String8, String8>(*headers));
513     }
514 
515     msg->post();
516 }
517 
disconnect()518 status_t LiveSession::disconnect() {
519     sp<AMessage> msg = new AMessage(kWhatDisconnect, this);
520 
521     sp<AMessage> response;
522     status_t err = msg->postAndAwaitResponse(&response);
523 
524     return err;
525 }
526 
seekTo(int64_t timeUs,MediaPlayerSeekMode mode)527 status_t LiveSession::seekTo(int64_t timeUs, MediaPlayerSeekMode mode) {
528     sp<AMessage> msg = new AMessage(kWhatSeek, this);
529     msg->setInt64("timeUs", timeUs);
530     msg->setInt32("mode", mode);
531 
532     sp<AMessage> response;
533     status_t err = msg->postAndAwaitResponse(&response);
534 
535     return err;
536 }
537 
// Checks whether the streams being swapped to the fetcher identified by the
// "uri" entry of |stopParams| have progressed far enough to complete the
// switch.
//
//   stopParams       [in/out] must contain "uri"; on overlap with the old
//                    fetcher's data it is updated with "discontinuitySeq"
//                    and the per-stream stop time for resumeUntil.
//   delayUs          if > 0, the reference point is the meta |delayUs| after
//                    the last dequeued sample instead of the last dequeued
//                    sample itself.
//   needResumeUntil  [out] set to true if the old fetcher should resume
//                    until the new fetcher's start time; always false while
//                    buffering.
//
// Returns false if the switch cannot proceed yet (not enough buffered data,
// or the dequeue position already passed everything in mPacketSources2).
// Note that packets may already have been trimmed from mPacketSources2 for
// earlier streams when false is returned.
bool LiveSession::checkSwitchProgress(
        sp<AMessage> &stopParams, int64_t delayUs, bool *needResumeUntil) {
    AString newUri;
    CHECK(stopParams->findString("uri", &newUri));

    *needResumeUntil = false;
    sp<AMessage> firstNewMeta[kMaxStreams];
    for (size_t i = 0; i < kMaxStreams; ++i) {
        StreamType stream = indexToType(i);
        // only consider streams that are being swapped to |newUri|
        if (!(mSwapMask & mNewStreamMask & stream)
            || (mStreams[i].mNewUri != newUri)) {
            continue;
        }
        if (stream == STREAMTYPE_SUBTITLES) {
            continue;
        }
        sp<AnotherPacketSource> &source = mPacketSources.editValueAt(i);

        // First, get latest dequeued meta, which is where the decoder is at.
        // (when upswitching, we take the meta after a certain delay, so that
        // the decoder is left with some cushion)
        sp<AMessage> lastDequeueMeta, lastEnqueueMeta;
        if (delayUs > 0) {
            lastDequeueMeta = source->getMetaAfterLastDequeued(delayUs);
            if (lastDequeueMeta == NULL) {
                // this means we don't have enough cushion, try again later
                ALOGV("[%s] up switching failed due to insufficient buffer",
                        getNameForStream(stream));
                return false;
            }
        } else {
            // It's okay for lastDequeueMeta to be NULL here, it means the
            // decoder hasn't even started dequeueing
            lastDequeueMeta = source->getLatestDequeuedMeta();
        }
        // Then, trim off packets at the beginning of mPacketSources2 that
        // are before the latest dequeued time. These samples are definitely
        // too late.
        firstNewMeta[i] = mPacketSources2.editValueAt(i)
                            ->trimBuffersBeforeMeta(lastDequeueMeta);

        // Now firstNewMeta[i] is the first sample after the trim.
        // If it's NULL, we failed because dequeue already passed all samples
        // in mPacketSources2, we have to try again.
        if (firstNewMeta[i] == NULL) {
            HLSTime dequeueTime(lastDequeueMeta);
            ALOGV("[%s] dequeue time (%d, %lld) past start time",
                    getNameForStream(stream),
                    dequeueTime.mSeq, (long long) dequeueTime.mTimeUs);
            return false;
        }

        // Otherwise, we check if mPacketSources2 overlaps with what old fetcher
        // already fetched, and see if we need to resumeUntil
        lastEnqueueMeta = source->getLatestEnqueuedMeta();
        // lastEnqueueMeta == NULL means old fetcher stopped at a discontinuity
        // boundary, no need to resume as the content will look different anyways
        if (lastEnqueueMeta != NULL) {
            HLSTime lastTime(lastEnqueueMeta), startTime(firstNewMeta[i]);

            // no need to resume old fetcher if new fetcher started in different
            // discontinuity sequence, as the content will look different.
            *needResumeUntil |= (startTime.mSeq == lastTime.mSeq
                    && startTime.mTimeUs - lastTime.mTimeUs > kResumeThresholdUs);

            // update the stopTime for resumeUntil
            stopParams->setInt32("discontinuitySeq", startTime.mSeq);
            stopParams->setInt64(getKeyForStream(stream), startTime.mTimeUs);
        }
    }

    // if we're here, it means dequeue progress hasn't passed some samples in
    // mPacketSources2, we can trim off the excess in mPacketSources.
    // (old fetcher might still need to resumeUntil the start time of new fetcher)
    for (size_t i = 0; i < kMaxStreams; ++i) {
        StreamType stream = indexToType(i);
        if (!(mSwapMask & mNewStreamMask & stream)
            || (newUri != mStreams[i].mNewUri)
            || stream == STREAMTYPE_SUBTITLES) {
            continue;
        }
        mPacketSources.valueFor(stream)->trimBuffersAfterMeta(firstNewMeta[i]);
    }

    // no resumeUntil if already underflow
    *needResumeUntil &= !mBuffering;

    return true;
}
626 
onMessageReceived(const sp<AMessage> & msg)627 void LiveSession::onMessageReceived(const sp<AMessage> &msg) {
628     switch (msg->what()) {
629         case kWhatSetBufferingSettings:
630         {
631             readFromAMessage(msg, &mBufferingSettings);
632             break;
633         }
634 
635         case kWhatConnect:
636         {
637             onConnect(msg);
638             break;
639         }
640 
641         case kWhatDisconnect:
642         {
643             CHECK(msg->senderAwaitsResponse(&mDisconnectReplyID));
644 
645             if (mReconfigurationInProgress) {
646                 break;
647             }
648 
649             finishDisconnect();
650             break;
651         }
652 
653         case kWhatSeek:
654         {
655             if (mReconfigurationInProgress) {
656                 msg->post(50000);
657                 break;
658             }
659 
660             CHECK(msg->senderAwaitsResponse(&mSeekReplyID));
661             mSeekReply = new AMessage;
662 
663             onSeek(msg);
664             break;
665         }
666 
667         case kWhatFetcherNotify:
668         {
669             int32_t what;
670             CHECK(msg->findInt32("what", &what));
671 
672             switch (what) {
673                 case PlaylistFetcher::kWhatStarted:
674                     break;
675                 case PlaylistFetcher::kWhatPaused:
676                 case PlaylistFetcher::kWhatStopped:
677                 {
678                     AString uri;
679                     CHECK(msg->findString("uri", &uri));
680                     ssize_t index = mFetcherInfos.indexOfKey(uri);
681                     if (index < 0) {
682                         // ignore msgs from fetchers that's already gone
683                         break;
684                     }
685 
686                     ALOGV("fetcher-%d %s",
687                             mFetcherInfos[index].mFetcher->getFetcherID(),
688                             what == PlaylistFetcher::kWhatPaused ?
689                                     "paused" : "stopped");
690 
691                     if (what == PlaylistFetcher::kWhatStopped) {
692                         mFetcherLooper->unregisterHandler(
693                                 mFetcherInfos[index].mFetcher->id());
694                         mFetcherInfos.removeItemsAt(index);
695                     } else if (what == PlaylistFetcher::kWhatPaused) {
696                         int32_t seekMode;
697                         CHECK(msg->findInt32("seekMode", &seekMode));
698                         for (size_t i = 0; i < kMaxStreams; ++i) {
699                             if (mStreams[i].mUri == uri) {
700                                 mStreams[i].mSeekMode = (SeekMode) seekMode;
701                             }
702                         }
703                     }
704 
705                     if (mContinuation != NULL) {
706                         CHECK_GT(mContinuationCounter, 0u);
707                         if (--mContinuationCounter == 0) {
708                             mContinuation->post();
709                         }
710                         ALOGV("%zu fetcher(s) left", mContinuationCounter);
711                     }
712                     break;
713                 }
714 
715                 case PlaylistFetcher::kWhatDurationUpdate:
716                 {
717                     AString uri;
718                     CHECK(msg->findString("uri", &uri));
719 
720                     int64_t durationUs;
721                     CHECK(msg->findInt64("durationUs", &durationUs));
722 
723                     ssize_t index = mFetcherInfos.indexOfKey(uri);
724                     if (index >= 0) {
725                         FetcherInfo *info = &mFetcherInfos.editValueFor(uri);
726                         info->mDurationUs = durationUs;
727                     }
728                     break;
729                 }
730 
731                 case PlaylistFetcher::kWhatTargetDurationUpdate:
732                 {
733                     int64_t targetDurationUs;
734                     CHECK(msg->findInt64("targetDurationUs", &targetDurationUs));
735                     mUpSwitchMark = min(kUpSwitchMarkUs, targetDurationUs * 7 / 4);
736                     mDownSwitchMark = min(kDownSwitchMarkUs, targetDurationUs * 9 / 4);
737                     mUpSwitchMargin = min(kUpSwitchMarginUs, targetDurationUs);
738                     break;
739                 }
740 
741                 case PlaylistFetcher::kWhatError:
742                 {
743                     status_t err;
744                     CHECK(msg->findInt32("err", &err));
745 
746                     ALOGE("XXX Received error %d from PlaylistFetcher.", err);
747 
748                     // handle EOS on subtitle tracks independently
749                     AString uri;
750                     if (err == ERROR_END_OF_STREAM && msg->findString("uri", &uri)) {
751                         ssize_t i = mFetcherInfos.indexOfKey(uri);
752                         if (i >= 0) {
753                             const sp<PlaylistFetcher> &fetcher = mFetcherInfos.valueAt(i).mFetcher;
754                             if (fetcher != NULL) {
755                                 uint32_t type = fetcher->getStreamTypeMask();
756                                 if (type == STREAMTYPE_SUBTITLES) {
757                                     mPacketSources.valueFor(
758                                             STREAMTYPE_SUBTITLES)->signalEOS(err);;
759                                     break;
760                                 }
761                             }
762                         }
763                     }
764 
765                     // remember the failure index (as mCurBandwidthIndex will be restored
766                     // after cancelBandwidthSwitch()), and record last fail time
767                     size_t failureIndex = mCurBandwidthIndex;
768                     mBandwidthItems.editItemAt(
769                             failureIndex).mLastFailureUs = ALooper::GetNowUs();
770 
771                     if (mSwitchInProgress) {
772                         // if error happened when we switch to a variant, try fallback
773                         // to other variant to save the session
774                         if (tryBandwidthFallback()) {
775                             break;
776                         }
777                     }
778 
779                     if (mInPreparationPhase) {
780                         postPrepared(err);
781                     }
782 
783                     cancelBandwidthSwitch();
784 
785                     mPacketSources.valueFor(STREAMTYPE_AUDIO)->signalEOS(err);
786 
787                     mPacketSources.valueFor(STREAMTYPE_VIDEO)->signalEOS(err);
788 
789                     mPacketSources.valueFor(
790                             STREAMTYPE_SUBTITLES)->signalEOS(err);
791 
792                     postError(err);
793                     break;
794                 }
795 
796                 case PlaylistFetcher::kWhatStopReached:
797                 {
798                     ALOGV("kWhatStopReached");
799 
800                     AString oldUri;
801                     CHECK(msg->findString("uri", &oldUri));
802 
803                     ssize_t index = mFetcherInfos.indexOfKey(oldUri);
804                     if (index < 0) {
805                         break;
806                     }
807 
808                     tryToFinishBandwidthSwitch(oldUri);
809                     break;
810                 }
811 
812                 case PlaylistFetcher::kWhatStartedAt:
813                 {
814                     int32_t switchGeneration;
815                     CHECK(msg->findInt32("switchGeneration", &switchGeneration));
816 
817                     ALOGV("kWhatStartedAt: switchGen=%d, mSwitchGen=%d",
818                             switchGeneration, mSwitchGeneration);
819 
820                     if (switchGeneration != mSwitchGeneration) {
821                         break;
822                     }
823 
824                     AString uri;
825                     CHECK(msg->findString("uri", &uri));
826 
827                     // mark new fetcher mToBeResumed
828                     ssize_t index = mFetcherInfos.indexOfKey(uri);
829                     if (index >= 0) {
830                         mFetcherInfos.editValueAt(index).mToBeResumed = true;
831                     }
832 
833                     // temporarily disable packet sources to be swapped to prevent
834                     // NuPlayerDecoder from dequeuing while we check progress
835                     for (size_t i = 0; i < mPacketSources.size(); ++i) {
836                         if ((mSwapMask & mPacketSources.keyAt(i))
837                                 && uri == mStreams[i].mNewUri) {
838                             mPacketSources.editValueAt(i)->enable(false);
839                         }
840                     }
841                     bool switchUp = (mCurBandwidthIndex > mOrigBandwidthIndex);
842                     // If switching up, require a cushion bigger than kUnderflowMark
843                     // to avoid buffering immediately after the switch.
844                     // (If we don't have that cushion we'd rather cancel and try again.)
845                     int64_t delayUs =
846                         switchUp ?
847                             (kUnderflowMarkMs * 1000ll + 1000000ll)
848                             : 0;
849                     bool needResumeUntil = false;
850                     sp<AMessage> stopParams = msg;
851                     if (checkSwitchProgress(stopParams, delayUs, &needResumeUntil)) {
852                         // playback time hasn't passed startAt time
853                         if (!needResumeUntil) {
854                             ALOGV("finish switch");
855                             for (size_t i = 0; i < kMaxStreams; ++i) {
856                                 if ((mSwapMask & indexToType(i))
857                                         && uri == mStreams[i].mNewUri) {
858                                     // have to make a copy of mStreams[i].mUri because
859                                     // tryToFinishBandwidthSwitch is modifying mStreams[]
860                                     AString oldURI = mStreams[i].mUri;
861                                     tryToFinishBandwidthSwitch(oldURI);
862                                     break;
863                                 }
864                             }
865                         } else {
866                             // startAt time is after last enqueue time
867                             // Resume fetcher for the original variant; the resumed fetcher should
868                             // continue until the timestamps found in msg, which is stored by the
869                             // new fetcher to indicate where the new variant has started buffering.
870                             ALOGV("finish switch with resumeUntilAsync");
871                             for (size_t i = 0; i < mFetcherInfos.size(); i++) {
872                                 const FetcherInfo &info = mFetcherInfos.valueAt(i);
873                                 if (info.mToBeRemoved) {
874                                     info.mFetcher->resumeUntilAsync(stopParams);
875                                 }
876                             }
877                         }
878                     } else {
879                         // playback time passed startAt time
880                         if (switchUp) {
881                             // if switching up, cancel and retry if condition satisfies again
882                             ALOGV("cancel up switch because we're too late");
883                             cancelBandwidthSwitch(true /* resume */);
884                         } else {
885                             ALOGV("retry down switch at next sample");
886                             resumeFetcher(uri, mSwapMask, -1, true /* newUri */);
887                         }
888                     }
889                     // re-enable all packet sources
890                     for (size_t i = 0; i < mPacketSources.size(); ++i) {
891                         mPacketSources.editValueAt(i)->enable(true);
892                     }
893 
894                     break;
895                 }
896 
897                 case PlaylistFetcher::kWhatPlaylistFetched:
898                 {
899                     onMasterPlaylistFetched(msg);
900                     break;
901                 }
902 
903                 case PlaylistFetcher::kWhatMetadataDetected:
904                 {
905                     if (!mHasMetadata) {
906                         mHasMetadata = true;
907                         sp<AMessage> notify = mNotify->dup();
908                         notify->setInt32("what", kWhatMetadataDetected);
909                         notify->post();
910                     }
911                     break;
912                 }
913 
914                 default:
915                     TRESPASS();
916             }
917 
918             break;
919         }
920 
921         case kWhatChangeConfiguration:
922         {
923             onChangeConfiguration(msg);
924             break;
925         }
926 
927         case kWhatChangeConfiguration2:
928         {
929             onChangeConfiguration2(msg);
930             break;
931         }
932 
933         case kWhatChangeConfiguration3:
934         {
935             onChangeConfiguration3(msg);
936             break;
937         }
938 
939         case kWhatPollBuffering:
940         {
941             int32_t generation;
942             CHECK(msg->findInt32("generation", &generation));
943             if (generation == mPollBufferingGeneration) {
944                 onPollBuffering();
945             }
946             break;
947         }
948 
949         default:
950             TRESPASS();
951             break;
952     }
953 }
954 
955 // static
isBandwidthValid(const BandwidthItem & item)956 bool LiveSession::isBandwidthValid(const BandwidthItem &item) {
957     static const int64_t kBlacklistWindowUs = 300 * 1000000ll;
958     return item.mLastFailureUs < 0
959             || ALooper::GetNowUs() - item.mLastFailureUs > kBlacklistWindowUs;
960 }
961 
962 // static
SortByBandwidth(const BandwidthItem * a,const BandwidthItem * b)963 int LiveSession::SortByBandwidth(const BandwidthItem *a, const BandwidthItem *b) {
964     if (a->mBandwidth < b->mBandwidth) {
965         return -1;
966     } else if (a->mBandwidth == b->mBandwidth) {
967         return 0;
968     }
969 
970     return 1;
971 }
972 
973 // static
indexToType(int idx)974 LiveSession::StreamType LiveSession::indexToType(int idx) {
975     CHECK(idx >= 0 && idx < kNumSources);
976     return (StreamType)(1 << idx);
977 }
978 
979 // static
typeToIndex(int32_t type)980 ssize_t LiveSession::typeToIndex(int32_t type) {
981     switch (type) {
982         case STREAMTYPE_AUDIO:
983             return 0;
984         case STREAMTYPE_VIDEO:
985             return 1;
986         case STREAMTYPE_SUBTITLES:
987             return 2;
988         case STREAMTYPE_METADATA:
989             return 3;
990         default:
991             return -1;
992     };
993     return -1;
994 }
995 
onConnect(const sp<AMessage> & msg)996 void LiveSession::onConnect(const sp<AMessage> &msg) {
997     CHECK(msg->findString("url", &mMasterURL));
998 
999     // TODO currently we don't know if we are coming here from incognito mode
1000     ALOGI("onConnect %s", uriDebugString(mMasterURL).c_str());
1001 
1002     KeyedVector<String8, String8> *headers = NULL;
1003     if (!msg->findPointer("headers", (void **)&headers)) {
1004         mExtraHeaders.clear();
1005     } else {
1006         mExtraHeaders = *headers;
1007 
1008         delete headers;
1009         headers = NULL;
1010     }
1011 
1012     // create looper for fetchers
1013     if (mFetcherLooper == NULL) {
1014         mFetcherLooper = new ALooper();
1015 
1016         mFetcherLooper->setName("Fetcher");
1017         mFetcherLooper->start(false, /* runOnCallingThread */
1018                               true  /* canCallJava */);
1019     }
1020 
1021     // create fetcher to fetch the master playlist
1022     addFetcher(mMasterURL.c_str())->fetchPlaylistAsync();
1023 }
1024 
onMasterPlaylistFetched(const sp<AMessage> & msg)1025 void LiveSession::onMasterPlaylistFetched(const sp<AMessage> &msg) {
1026     AString uri;
1027     CHECK(msg->findString("uri", &uri));
1028     ssize_t index = mFetcherInfos.indexOfKey(uri);
1029     if (index < 0) {
1030         ALOGW("fetcher for master playlist is gone.");
1031         return;
1032     }
1033 
1034     // no longer useful, remove
1035     mFetcherLooper->unregisterHandler(mFetcherInfos[index].mFetcher->id());
1036     mFetcherInfos.removeItemsAt(index);
1037 
1038     CHECK(msg->findObject("playlist", (sp<RefBase> *)&mPlaylist));
1039     if (mPlaylist == NULL) {
1040         ALOGE("unable to fetch master playlist %s.",
1041                 uriDebugString(mMasterURL).c_str());
1042 
1043         postPrepared(ERROR_IO);
1044         return;
1045     }
1046     // We trust the content provider to make a reasonable choice of preferred
1047     // initial bandwidth by listing it first in the variant playlist.
1048     // At startup we really don't have a good estimate on the available
1049     // network bandwidth since we haven't tranferred any data yet. Once
1050     // we have we can make a better informed choice.
1051     size_t initialBandwidth = 0;
1052     size_t initialBandwidthIndex = 0;
1053 
1054     int32_t maxWidth = 0;
1055     int32_t maxHeight = 0;
1056 
1057     if (mPlaylist->isVariantPlaylist()) {
1058         Vector<BandwidthItem> itemsWithVideo;
1059         for (size_t i = 0; i < mPlaylist->size(); ++i) {
1060             BandwidthItem item;
1061 
1062             item.mPlaylistIndex = i;
1063             item.mLastFailureUs = -1ll;
1064 
1065             sp<AMessage> meta;
1066             AString uri;
1067             mPlaylist->itemAt(i, &uri, &meta);
1068 
1069             CHECK(meta->findInt32("bandwidth", (int32_t *)&item.mBandwidth));
1070 
1071             int32_t width, height;
1072             if (meta->findInt32("width", &width)) {
1073                 maxWidth = max(maxWidth, width);
1074             }
1075             if (meta->findInt32("height", &height)) {
1076                 maxHeight = max(maxHeight, height);
1077             }
1078 
1079             mBandwidthItems.push(item);
1080             if (mPlaylist->hasType(i, "video")) {
1081                 itemsWithVideo.push(item);
1082             }
1083         }
1084         // remove the audio-only variants if we have at least one with video
1085         if (!itemsWithVideo.empty()
1086                 && itemsWithVideo.size() < mBandwidthItems.size()) {
1087             mBandwidthItems.clear();
1088             for (size_t i = 0; i < itemsWithVideo.size(); ++i) {
1089                 mBandwidthItems.push(itemsWithVideo[i]);
1090             }
1091         }
1092 
1093         CHECK_GT(mBandwidthItems.size(), 0u);
1094         initialBandwidth = mBandwidthItems[0].mBandwidth;
1095 
1096         mBandwidthItems.sort(SortByBandwidth);
1097 
1098         for (size_t i = 0; i < mBandwidthItems.size(); ++i) {
1099             if (mBandwidthItems.itemAt(i).mBandwidth == initialBandwidth) {
1100                 initialBandwidthIndex = i;
1101                 break;
1102             }
1103         }
1104     } else {
1105         // dummy item.
1106         BandwidthItem item;
1107         item.mPlaylistIndex = 0;
1108         item.mBandwidth = 0;
1109         mBandwidthItems.push(item);
1110     }
1111 
1112     mMaxWidth = maxWidth > 0 ? maxWidth : mMaxWidth;
1113     mMaxHeight = maxHeight > 0 ? maxHeight : mMaxHeight;
1114 
1115     mPlaylist->pickRandomMediaItems();
1116     changeConfiguration(
1117             0ll /* timeUs */, initialBandwidthIndex, false /* pickTrack */);
1118 }
1119 
finishDisconnect()1120 void LiveSession::finishDisconnect() {
1121     ALOGV("finishDisconnect");
1122 
1123     // No reconfiguration is currently pending, make sure none will trigger
1124     // during disconnection either.
1125     cancelBandwidthSwitch();
1126 
1127     // cancel buffer polling
1128     cancelPollBuffering();
1129 
1130     // TRICKY: don't wait for all fetcher to be stopped when disconnecting
1131     //
1132     // Some fetchers might be stuck in connect/getSize at this point. These
1133     // operations will eventually timeout (as we have a timeout set in
1134     // MediaHTTPConnection), but we don't want to block the main UI thread
1135     // until then. Here we just need to make sure we clear all references
1136     // to the fetchers, so that when they finally exit from the blocking
1137     // operation, they can be destructed.
1138     //
1139     // There is one very tricky point though. For this scheme to work, the
1140     // fecther must hold a reference to LiveSession, so that LiveSession is
1141     // destroyed after fetcher. Otherwise LiveSession would get stuck in its
1142     // own destructor when it waits for mFetcherLooper to stop, which still
1143     // blocks main UI thread.
1144     for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
1145         mFetcherInfos.valueAt(i).mFetcher->stopAsync();
1146         mFetcherLooper->unregisterHandler(
1147                 mFetcherInfos.valueAt(i).mFetcher->id());
1148     }
1149     mFetcherInfos.clear();
1150 
1151     mPacketSources.valueFor(STREAMTYPE_AUDIO)->signalEOS(ERROR_END_OF_STREAM);
1152     mPacketSources.valueFor(STREAMTYPE_VIDEO)->signalEOS(ERROR_END_OF_STREAM);
1153 
1154     mPacketSources.valueFor(
1155             STREAMTYPE_SUBTITLES)->signalEOS(ERROR_END_OF_STREAM);
1156 
1157     sp<AMessage> response = new AMessage;
1158     response->setInt32("err", OK);
1159 
1160     response->postReply(mDisconnectReplyID);
1161     mDisconnectReplyID.clear();
1162 }
1163 
addFetcher(const char * uri)1164 sp<PlaylistFetcher> LiveSession::addFetcher(const char *uri) {
1165     ssize_t index = mFetcherInfos.indexOfKey(uri);
1166 
1167     if (index >= 0) {
1168         return NULL;
1169     }
1170 
1171     sp<AMessage> notify = new AMessage(kWhatFetcherNotify, this);
1172     notify->setString("uri", uri);
1173     notify->setInt32("switchGeneration", mSwitchGeneration);
1174 
1175     FetcherInfo info;
1176     info.mFetcher = new PlaylistFetcher(
1177             notify, this, uri, mCurBandwidthIndex, mSubtitleGeneration);
1178     info.mDurationUs = -1ll;
1179     info.mToBeRemoved = false;
1180     info.mToBeResumed = false;
1181     mFetcherLooper->registerHandler(info.mFetcher);
1182 
1183     mFetcherInfos.add(uri, info);
1184 
1185     return info.mFetcher;
1186 }
1187 
#if 0
// Compiled out. Scales rand() by RAND_MAX, giving a value in [0, 1].
static double uniformRand() {
    const double sample = (double)rand();
    return sample / RAND_MAX;
}
#endif
1193 
UriIsSameAsIndex(const AString & uri,int32_t i,bool newUri)1194 bool LiveSession::UriIsSameAsIndex(const AString &uri, int32_t i, bool newUri) {
1195     ALOGV("[timed_id3] i %d UriIsSameAsIndex newUri %s, %s", i,
1196             newUri ? "true" : "false",
1197             newUri ? mStreams[i].mNewUri.c_str() : mStreams[i].mUri.c_str());
1198     return i >= 0
1199             && ((!newUri && uri == mStreams[i].mUri)
1200             || (newUri && uri == mStreams[i].mNewUri));
1201 }
1202 
getPacketSourceForStreamIndex(size_t trackIndex,bool newUri)1203 sp<AnotherPacketSource> LiveSession::getPacketSourceForStreamIndex(
1204         size_t trackIndex, bool newUri) {
1205     StreamType type = indexToType(trackIndex);
1206     sp<AnotherPacketSource> source = NULL;
1207     if (newUri) {
1208         source = mPacketSources2.valueFor(type);
1209         source->clear();
1210     } else {
1211         source = mPacketSources.valueFor(type);
1212     };
1213     return source;
1214 }
1215 
getMetadataSource(sp<AnotherPacketSource> sources[kNumSources],uint32_t streamMask,bool newUri)1216 sp<AnotherPacketSource> LiveSession::getMetadataSource(
1217         sp<AnotherPacketSource> sources[kNumSources], uint32_t streamMask, bool newUri) {
1218     // todo: One case where the following strategy can fail is when audio and video
1219     // are in separate playlists, both are transport streams, and the metadata
1220     // is actually contained in the audio stream.
1221     ALOGV("[timed_id3] getMetadataSourceForUri streamMask %x newUri %s",
1222             streamMask, newUri ? "true" : "false");
1223 
1224     if ((sources[kVideoIndex] != NULL) // video fetcher; or ...
1225             || (!(streamMask & STREAMTYPE_VIDEO) && sources[kAudioIndex] != NULL)) {
1226             // ... audio fetcher for audio only variant
1227         return getPacketSourceForStreamIndex(kMetaDataIndex, newUri);
1228     }
1229 
1230     return NULL;
1231 }
1232 
resumeFetcher(const AString & uri,uint32_t streamMask,int64_t timeUs,bool newUri)1233 bool LiveSession::resumeFetcher(
1234         const AString &uri, uint32_t streamMask, int64_t timeUs, bool newUri) {
1235     ssize_t index = mFetcherInfos.indexOfKey(uri);
1236     if (index < 0) {
1237         ALOGE("did not find fetcher for uri: %s", uri.c_str());
1238         return false;
1239     }
1240 
1241     bool resume = false;
1242     sp<AnotherPacketSource> sources[kNumSources];
1243     for (size_t i = 0; i < kMaxStreams; ++i) {
1244         if ((streamMask & indexToType(i)) && UriIsSameAsIndex(uri, i, newUri)) {
1245             resume = true;
1246             sources[i] = getPacketSourceForStreamIndex(i, newUri);
1247         }
1248     }
1249 
1250     if (resume) {
1251         sp<PlaylistFetcher> &fetcher = mFetcherInfos.editValueAt(index).mFetcher;
1252         SeekMode seekMode = newUri ? kSeekModeNextSample : kSeekModeExactPosition;
1253 
1254         ALOGV("resuming fetcher-%d, timeUs=%lld, seekMode=%d",
1255                 fetcher->getFetcherID(), (long long)timeUs, seekMode);
1256 
1257         fetcher->startAsync(
1258                 sources[kAudioIndex],
1259                 sources[kVideoIndex],
1260                 sources[kSubtitleIndex],
1261                 getMetadataSource(sources, streamMask, newUri),
1262                 timeUs, -1, -1, seekMode);
1263     }
1264 
1265     return resume;
1266 }
1267 
getAbortThreshold(ssize_t currentBWIndex,ssize_t targetBWIndex) const1268 float LiveSession::getAbortThreshold(
1269         ssize_t currentBWIndex, ssize_t targetBWIndex) const {
1270     float abortThreshold = -1.0f;
1271     if (currentBWIndex > 0 && targetBWIndex < currentBWIndex) {
1272         /*
1273            If we're switching down, we need to decide whether to
1274 
1275            1) finish last segment of high-bandwidth variant, or
1276            2) abort last segment of high-bandwidth variant, and fetch an
1277               overlapping portion from low-bandwidth variant.
1278 
1279            Here we try to maximize the amount of buffer left when the
1280            switch point is met. Given the following parameters:
1281 
1282            B: our current buffering level in seconds
1283            T: target duration in seconds
1284            X: sample duration in seconds remain to fetch in last segment
1285            bw0: bandwidth of old variant (as specified in playlist)
1286            bw1: bandwidth of new variant (as specified in playlist)
1287            bw: measured bandwidth available
1288 
1289            If we choose 1), when switch happens at the end of current
1290            segment, our buffering will be
1291                   B + X - X * bw0 / bw
1292 
1293            If we choose 2), when switch happens where we aborted current
1294            segment, our buffering will be
1295                   B - (T - X) * bw1 / bw
1296 
1297            We should only choose 1) if
1298                   X/T < bw1 / (bw1 + bw0 - bw)
1299         */
1300 
1301         // abort old bandwidth immediately if bandwidth is fluctuating a lot.
1302         // our estimate could be far off, and fetching old bandwidth could
1303         // take too long.
1304         if (!mLastBandwidthStable) {
1305             return 0.0f;
1306         }
1307 
1308         // Taking the measured current bandwidth at 50% face value only,
1309         // as our bandwidth estimation is a lagging indicator. Being
1310         // conservative on this, we prefer switching to lower bandwidth
1311         // unless we're really confident finishing up the last segment
1312         // of higher bandwidth will be fast.
1313         CHECK(mLastBandwidthBps >= 0);
1314         abortThreshold =
1315                 (float)mBandwidthItems.itemAt(targetBWIndex).mBandwidth
1316              / ((float)mBandwidthItems.itemAt(targetBWIndex).mBandwidth
1317               + (float)mBandwidthItems.itemAt(currentBWIndex).mBandwidth
1318               - (float)mLastBandwidthBps * 0.5f);
1319         if (abortThreshold < 0.0f) {
1320             abortThreshold = -1.0f; // do not abort
1321         }
1322         ALOGV("Switching Down: bps %ld => %ld, measured %d, abort ratio %.2f",
1323                 mBandwidthItems.itemAt(currentBWIndex).mBandwidth,
1324                 mBandwidthItems.itemAt(targetBWIndex).mBandwidth,
1325                 mLastBandwidthBps,
1326                 abortThreshold);
1327     }
1328     return abortThreshold;
1329 }
1330 
addBandwidthMeasurement(size_t numBytes,int64_t delayUs)1331 void LiveSession::addBandwidthMeasurement(size_t numBytes, int64_t delayUs) {
1332     mBandwidthEstimator->addBandwidthMeasurement(numBytes, delayUs);
1333 }
1334 
getLowestValidBandwidthIndex() const1335 ssize_t LiveSession::getLowestValidBandwidthIndex() const {
1336     for (size_t index = 0; index < mBandwidthItems.size(); index++) {
1337         if (isBandwidthValid(mBandwidthItems[index])) {
1338             return index;
1339         }
1340     }
1341     // if playlists are all blacklisted, return 0 and hope it's alive
1342     return 0;
1343 }
1344 
getBandwidthIndex(int32_t bandwidthBps)1345 size_t LiveSession::getBandwidthIndex(int32_t bandwidthBps) {
1346     if (mBandwidthItems.size() < 2) {
1347         // shouldn't be here if we only have 1 bandwidth, check
1348         // logic to get rid of redundant bandwidth polling
1349         ALOGW("getBandwidthIndex() called for single bandwidth playlist!");
1350         return 0;
1351     }
1352 
1353 #if 1
1354     char value[PROPERTY_VALUE_MAX];
1355     ssize_t index = -1;
1356     if (property_get("media.httplive.bw-index", value, NULL)) {
1357         char *end;
1358         index = strtol(value, &end, 10);
1359         CHECK(end > value && *end == '\0');
1360 
1361         if (index >= 0 && (size_t)index >= mBandwidthItems.size()) {
1362             index = mBandwidthItems.size() - 1;
1363         }
1364     }
1365 
1366     if (index < 0) {
1367         char value[PROPERTY_VALUE_MAX];
1368         if (property_get("media.httplive.max-bw", value, NULL)) {
1369             char *end;
1370             long maxBw = strtoul(value, &end, 10);
1371             if (end > value && *end == '\0') {
1372                 if (maxBw > 0 && bandwidthBps > maxBw) {
1373                     ALOGV("bandwidth capped to %ld bps", maxBw);
1374                     bandwidthBps = maxBw;
1375                 }
1376             }
1377         }
1378 
1379         // Pick the highest bandwidth stream that's not currently blacklisted
1380         // below or equal to estimated bandwidth.
1381 
1382         index = mBandwidthItems.size() - 1;
1383         ssize_t lowestBandwidth = getLowestValidBandwidthIndex();
1384         while (index > lowestBandwidth) {
1385             // be conservative (70%) to avoid overestimating and immediately
1386             // switching down again.
1387             size_t adjustedBandwidthBps = bandwidthBps * 7 / 10;
1388             const BandwidthItem &item = mBandwidthItems[index];
1389             if (item.mBandwidth <= adjustedBandwidthBps
1390                     && isBandwidthValid(item)) {
1391                 break;
1392             }
1393             --index;
1394         }
1395     }
1396 #elif 0
1397     // Change bandwidth at random()
1398     size_t index = uniformRand() * mBandwidthItems.size();
1399 #elif 0
1400     // There's a 50% chance to stay on the current bandwidth and
1401     // a 50% chance to switch to the next higher bandwidth (wrapping around
1402     // to lowest)
1403     const size_t kMinIndex = 0;
1404 
1405     static ssize_t mCurBandwidthIndex = -1;
1406 
1407     size_t index;
1408     if (mCurBandwidthIndex < 0) {
1409         index = kMinIndex;
1410     } else if (uniformRand() < 0.5) {
1411         index = (size_t)mCurBandwidthIndex;
1412     } else {
1413         index = mCurBandwidthIndex + 1;
1414         if (index == mBandwidthItems.size()) {
1415             index = kMinIndex;
1416         }
1417     }
1418     mCurBandwidthIndex = index;
1419 #elif 0
1420     // Pick the highest bandwidth stream below or equal to 1.2 Mbit/sec
1421 
1422     size_t index = mBandwidthItems.size() - 1;
1423     while (index > 0 && mBandwidthItems.itemAt(index).mBandwidth > 1200000) {
1424         --index;
1425     }
1426 #elif 1
1427     char value[PROPERTY_VALUE_MAX];
1428     size_t index;
1429     if (property_get("media.httplive.bw-index", value, NULL)) {
1430         char *end;
1431         index = strtoul(value, &end, 10);
1432         CHECK(end > value && *end == '\0');
1433 
1434         if (index >= mBandwidthItems.size()) {
1435             index = mBandwidthItems.size() - 1;
1436         }
1437     } else {
1438         index = 0;
1439     }
1440 #else
1441     size_t index = mBandwidthItems.size() - 1;  // Highest bandwidth stream
1442 #endif
1443 
1444     CHECK_GE(index, 0);
1445 
1446     return index;
1447 }
1448 
latestMediaSegmentStartTime() const1449 HLSTime LiveSession::latestMediaSegmentStartTime() const {
1450     HLSTime audioTime(mPacketSources.valueFor(
1451                     STREAMTYPE_AUDIO)->getLatestDequeuedMeta());
1452 
1453     HLSTime videoTime(mPacketSources.valueFor(
1454                     STREAMTYPE_VIDEO)->getLatestDequeuedMeta());
1455 
1456     return audioTime < videoTime ? videoTime : audioTime;
1457 }
1458 
onSeek(const sp<AMessage> & msg)1459 void LiveSession::onSeek(const sp<AMessage> &msg) {
1460     int64_t timeUs;
1461     int32_t mode;
1462     CHECK(msg->findInt64("timeUs", &timeUs));
1463     CHECK(msg->findInt32("mode", &mode));
1464     // TODO: add "mode" to changeConfiguration.
1465     changeConfiguration(timeUs/* , (MediaPlayerSeekMode)mode */);
1466 }
1467 
getDuration(int64_t * durationUs) const1468 status_t LiveSession::getDuration(int64_t *durationUs) const {
1469     int64_t maxDurationUs = -1ll;
1470     for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
1471         int64_t fetcherDurationUs = mFetcherInfos.valueAt(i).mDurationUs;
1472 
1473         if (fetcherDurationUs > maxDurationUs) {
1474             maxDurationUs = fetcherDurationUs;
1475         }
1476     }
1477 
1478     *durationUs = maxDurationUs;
1479 
1480     return OK;
1481 }
1482 
isSeekable() const1483 bool LiveSession::isSeekable() const {
1484     int64_t durationUs;
1485     return getDuration(&durationUs) == OK && durationUs >= 0;
1486 }
1487 
hasDynamicDuration() const1488 bool LiveSession::hasDynamicDuration() const {
1489     return false;
1490 }
1491 
getTrackCount() const1492 size_t LiveSession::getTrackCount() const {
1493     if (mPlaylist == NULL) {
1494         return 0;
1495     } else {
1496         return mPlaylist->getTrackCount() + (mHasMetadata ? 1 : 0);
1497     }
1498 }
1499 
getTrackInfo(size_t trackIndex) const1500 sp<AMessage> LiveSession::getTrackInfo(size_t trackIndex) const {
1501     if (mPlaylist == NULL) {
1502         return NULL;
1503     } else {
1504         if (trackIndex == mPlaylist->getTrackCount() && mHasMetadata) {
1505             sp<AMessage> format = new AMessage();
1506             format->setInt32("type", MEDIA_TRACK_TYPE_METADATA);
1507             format->setString("language", "und");
1508             format->setString("mime", MEDIA_MIMETYPE_DATA_TIMED_ID3);
1509             return format;
1510         }
1511         return mPlaylist->getTrackInfo(trackIndex);
1512     }
1513 }
1514 
selectTrack(size_t index,bool select)1515 status_t LiveSession::selectTrack(size_t index, bool select) {
1516     if (mPlaylist == NULL) {
1517         return INVALID_OPERATION;
1518     }
1519 
1520     ALOGV("selectTrack: index=%zu, select=%d, mSubtitleGen=%d++",
1521             index, select, mSubtitleGeneration);
1522 
1523     ++mSubtitleGeneration;
1524     status_t err = mPlaylist->selectTrack(index, select);
1525     if (err == OK) {
1526         sp<AMessage> msg = new AMessage(kWhatChangeConfiguration, this);
1527         msg->setInt32("pickTrack", select);
1528         msg->post();
1529     }
1530     return err;
1531 }
1532 
getSelectedTrack(media_track_type type) const1533 ssize_t LiveSession::getSelectedTrack(media_track_type type) const {
1534     if (mPlaylist == NULL) {
1535         return -1;
1536     } else {
1537         return mPlaylist->getSelectedTrack(type);
1538     }
1539 }
1540 
changeConfiguration(int64_t timeUs,ssize_t bandwidthIndex,bool pickTrack)1541 void LiveSession::changeConfiguration(
1542         int64_t timeUs, ssize_t bandwidthIndex, bool pickTrack) {
1543     ALOGV("changeConfiguration: timeUs=%lld us, bwIndex=%zd, pickTrack=%d",
1544           (long long)timeUs, bandwidthIndex, pickTrack);
1545 
1546     cancelBandwidthSwitch();
1547 
1548     CHECK(!mReconfigurationInProgress);
1549     mReconfigurationInProgress = true;
1550     if (bandwidthIndex >= 0) {
1551         mOrigBandwidthIndex = mCurBandwidthIndex;
1552         mCurBandwidthIndex = bandwidthIndex;
1553         if (mOrigBandwidthIndex != mCurBandwidthIndex) {
1554             ALOGI("#### Starting Bandwidth Switch: %zd => %zd",
1555                     mOrigBandwidthIndex, mCurBandwidthIndex);
1556         }
1557     }
1558     CHECK_LT((size_t)mCurBandwidthIndex, mBandwidthItems.size());
1559     const BandwidthItem &item = mBandwidthItems.itemAt(mCurBandwidthIndex);
1560 
1561     uint32_t streamMask = 0; // streams that should be fetched by the new fetcher
1562     uint32_t resumeMask = 0; // streams that should be fetched by the original fetcher
1563 
1564     AString URIs[kMaxStreams];
1565     for (size_t i = 0; i < kMaxStreams; ++i) {
1566         if (mPlaylist->getTypeURI(item.mPlaylistIndex, mStreams[i].mType, &URIs[i])) {
1567             streamMask |= indexToType(i);
1568         }
1569     }
1570 
1571     // Step 1, stop and discard fetchers that are no longer needed.
1572     // Pause those that we'll reuse.
1573     for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
1574         // skip fetchers that are marked mToBeRemoved,
1575         // these are done and can't be reused
1576         if (mFetcherInfos[i].mToBeRemoved) {
1577             continue;
1578         }
1579 
1580         const AString &uri = mFetcherInfos.keyAt(i);
1581         sp<PlaylistFetcher> &fetcher = mFetcherInfos.editValueAt(i).mFetcher;
1582 
1583         bool discardFetcher = true, delayRemoval = false;
1584         for (size_t j = 0; j < kMaxStreams; ++j) {
1585             StreamType type = indexToType(j);
1586             if ((streamMask & type) && uri == URIs[j]) {
1587                 resumeMask |= type;
1588                 streamMask &= ~type;
1589                 discardFetcher = false;
1590             }
1591         }
1592         // Delay fetcher removal if not picking tracks, AND old fetcher
1593         // has stream mask that overlaps new variant. (Okay to discard
1594         // old fetcher now, if completely no overlap.)
1595         if (discardFetcher && timeUs < 0ll && !pickTrack
1596                 && (fetcher->getStreamTypeMask() & streamMask)) {
1597             discardFetcher = false;
1598             delayRemoval = true;
1599         }
1600 
1601         if (discardFetcher) {
1602             ALOGV("discarding fetcher-%d", fetcher->getFetcherID());
1603             fetcher->stopAsync();
1604         } else {
1605             float threshold = 0.0f; // default to pause after current block (47Kbytes)
1606             bool disconnect = false;
1607             if (timeUs >= 0ll) {
1608                 // seeking, no need to finish fetching
1609                 disconnect = true;
1610             } else if (delayRemoval) {
1611                 // adapting, abort if remaining of current segment is over threshold
1612                 threshold = getAbortThreshold(
1613                         mOrigBandwidthIndex, mCurBandwidthIndex);
1614             }
1615 
1616             ALOGV("pausing fetcher-%d, threshold=%.2f",
1617                     fetcher->getFetcherID(), threshold);
1618             fetcher->pauseAsync(threshold, disconnect);
1619         }
1620     }
1621 
1622     sp<AMessage> msg;
1623     if (timeUs < 0ll) {
1624         // skip onChangeConfiguration2 (decoder destruction) if not seeking.
1625         msg = new AMessage(kWhatChangeConfiguration3, this);
1626     } else {
1627         msg = new AMessage(kWhatChangeConfiguration2, this);
1628     }
1629     msg->setInt32("streamMask", streamMask);
1630     msg->setInt32("resumeMask", resumeMask);
1631     msg->setInt32("pickTrack", pickTrack);
1632     msg->setInt64("timeUs", timeUs);
1633     for (size_t i = 0; i < kMaxStreams; ++i) {
1634         if ((streamMask | resumeMask) & indexToType(i)) {
1635             msg->setString(mStreams[i].uriKey().c_str(), URIs[i].c_str());
1636         }
1637     }
1638 
1639     // Every time a fetcher acknowledges the stopAsync or pauseAsync request
1640     // we'll decrement mContinuationCounter, once it reaches zero, i.e. all
1641     // fetchers have completed their asynchronous operation, we'll post
1642     // mContinuation, which then is handled below in onChangeConfiguration2.
1643     mContinuationCounter = mFetcherInfos.size();
1644     mContinuation = msg;
1645 
1646     if (mContinuationCounter == 0) {
1647         msg->post();
1648     }
1649 }
1650 
onChangeConfiguration(const sp<AMessage> & msg)1651 void LiveSession::onChangeConfiguration(const sp<AMessage> &msg) {
1652     ALOGV("onChangeConfiguration");
1653 
1654     if (!mReconfigurationInProgress) {
1655         int32_t pickTrack = 0;
1656         msg->findInt32("pickTrack", &pickTrack);
1657         changeConfiguration(-1ll /* timeUs */, -1, pickTrack);
1658     } else {
1659         msg->post(1000000ll); // retry in 1 sec
1660     }
1661 }
1662 
onChangeConfiguration2(const sp<AMessage> & msg)1663 void LiveSession::onChangeConfiguration2(const sp<AMessage> &msg) {
1664     ALOGV("onChangeConfiguration2");
1665 
1666     mContinuation.clear();
1667 
1668     // All fetchers are either suspended or have been removed now.
1669 
1670     // If we're seeking, clear all packet sources before we report
1671     // seek complete, to prevent decoder from pulling stale data.
1672     int64_t timeUs;
1673     CHECK(msg->findInt64("timeUs", &timeUs));
1674 
1675     if (timeUs >= 0) {
1676         mLastSeekTimeUs = timeUs;
1677         mLastDequeuedTimeUs = timeUs;
1678 
1679         for (size_t i = 0; i < mPacketSources.size(); i++) {
1680             sp<AnotherPacketSource> packetSource = mPacketSources.editValueAt(i);
1681             sp<MetaData> format = packetSource->getFormat();
1682             packetSource->clear();
1683             // Set a tentative format here such that HTTPLiveSource will always have
1684             // a format available when NuPlayer queries. Without an available video
1685             // format when setting a surface NuPlayer might disable video decoding
1686             // altogether. The tentative format will be overwritten by the
1687             // authoritative (and possibly same) format once content from the new
1688             // position is dequeued.
1689             packetSource->setFormat(format);
1690         }
1691 
1692         for (size_t i = 0; i < kMaxStreams; ++i) {
1693             mStreams[i].reset();
1694         }
1695 
1696         mDiscontinuityOffsetTimesUs.clear();
1697         mDiscontinuityAbsStartTimesUs.clear();
1698 
1699         if (mSeekReplyID != NULL) {
1700             CHECK(mSeekReply != NULL);
1701             mSeekReply->setInt32("err", OK);
1702             mSeekReply->postReply(mSeekReplyID);
1703             mSeekReplyID.clear();
1704             mSeekReply.clear();
1705         }
1706 
1707         // restart buffer polling after seek becauese previous
1708         // buffering position is no longer valid.
1709         restartPollBuffering();
1710     }
1711 
1712     uint32_t streamMask, resumeMask;
1713     CHECK(msg->findInt32("streamMask", (int32_t *)&streamMask));
1714     CHECK(msg->findInt32("resumeMask", (int32_t *)&resumeMask));
1715 
1716     streamMask |= resumeMask;
1717 
1718     AString URIs[kMaxStreams];
1719     for (size_t i = 0; i < kMaxStreams; ++i) {
1720         if (streamMask & indexToType(i)) {
1721             const AString &uriKey = mStreams[i].uriKey();
1722             CHECK(msg->findString(uriKey.c_str(), &URIs[i]));
1723             ALOGV("%s = '%s'", uriKey.c_str(), URIs[i].c_str());
1724         }
1725     }
1726 
1727     uint32_t changedMask = 0;
1728     for (size_t i = 0; i < kMaxStreams && i != kSubtitleIndex; ++i) {
1729         // stream URI could change even if onChangeConfiguration2 is only
1730         // used for seek. Seek could happen during a bw switch, in this
1731         // case bw switch will be cancelled, but the seekTo position will
1732         // fetch from the new URI.
1733         if ((mStreamMask & streamMask & indexToType(i))
1734                 && !mStreams[i].mUri.empty()
1735                 && !(URIs[i] == mStreams[i].mUri)) {
1736             ALOGV("stream %zu changed: oldURI %s, newURI %s", i,
1737                     mStreams[i].mUri.c_str(), URIs[i].c_str());
1738             sp<AnotherPacketSource> source = mPacketSources.valueFor(indexToType(i));
1739             if (source->getLatestDequeuedMeta() != NULL) {
1740                 source->queueDiscontinuity(
1741                         ATSParser::DISCONTINUITY_FORMATCHANGE, NULL, true);
1742             }
1743         }
1744         // Determine which decoders to shutdown on the player side,
1745         // a decoder has to be shutdown if its streamtype was active
1746         // before but now longer isn't.
1747         if ((mStreamMask & ~streamMask & indexToType(i))) {
1748             changedMask |= indexToType(i);
1749         }
1750     }
1751 
1752     if (changedMask == 0) {
1753         // If nothing changed as far as the audio/video decoders
1754         // are concerned we can proceed.
1755         onChangeConfiguration3(msg);
1756         return;
1757     }
1758 
1759     // Something changed, inform the player which will shutdown the
1760     // corresponding decoders and will post the reply once that's done.
1761     // Handling the reply will continue executing below in
1762     // onChangeConfiguration3.
1763     sp<AMessage> notify = mNotify->dup();
1764     notify->setInt32("what", kWhatStreamsChanged);
1765     notify->setInt32("changedMask", changedMask);
1766 
1767     msg->setWhat(kWhatChangeConfiguration3);
1768     msg->setTarget(this);
1769 
1770     notify->setMessage("reply", msg);
1771     notify->post();
1772 }
1773 
onChangeConfiguration3(const sp<AMessage> & msg)1774 void LiveSession::onChangeConfiguration3(const sp<AMessage> &msg) {
1775     mContinuation.clear();
1776     // All remaining fetchers are still suspended, the player has shutdown
1777     // any decoders that needed it.
1778 
1779     uint32_t streamMask, resumeMask;
1780     CHECK(msg->findInt32("streamMask", (int32_t *)&streamMask));
1781     CHECK(msg->findInt32("resumeMask", (int32_t *)&resumeMask));
1782 
1783     mNewStreamMask = streamMask | resumeMask;
1784 
1785     int64_t timeUs;
1786     int32_t pickTrack;
1787     bool switching = false;
1788     CHECK(msg->findInt64("timeUs", &timeUs));
1789     CHECK(msg->findInt32("pickTrack", &pickTrack));
1790 
1791     if (timeUs < 0ll) {
1792         if (!pickTrack) {
1793             // mSwapMask contains streams that are in both old and new variant,
1794             // (in mNewStreamMask & mStreamMask) but with different URIs
1795             // (not in resumeMask).
1796             // For example, old variant has video and audio in two separate
1797             // URIs, and new variant has only audio with unchanged URI. mSwapMask
1798             // should be 0 as there is nothing to swap. We only need to stop video,
1799             // and resume audio.
1800             mSwapMask =  mNewStreamMask & mStreamMask & ~resumeMask;
1801             switching = (mSwapMask != 0);
1802         }
1803         mRealTimeBaseUs = ALooper::GetNowUs() - mLastDequeuedTimeUs;
1804     } else {
1805         mRealTimeBaseUs = ALooper::GetNowUs() - timeUs;
1806     }
1807 
1808     ALOGV("onChangeConfiguration3: timeUs=%lld, switching=%d, pickTrack=%d, "
1809             "mStreamMask=0x%x, mNewStreamMask=0x%x, mSwapMask=0x%x",
1810             (long long)timeUs, switching, pickTrack,
1811             mStreamMask, mNewStreamMask, mSwapMask);
1812 
1813     for (size_t i = 0; i < kMaxStreams; ++i) {
1814         if (streamMask & indexToType(i)) {
1815             if (switching) {
1816                 CHECK(msg->findString(mStreams[i].uriKey().c_str(), &mStreams[i].mNewUri));
1817             } else {
1818                 CHECK(msg->findString(mStreams[i].uriKey().c_str(), &mStreams[i].mUri));
1819             }
1820         }
1821     }
1822 
1823     // Of all existing fetchers:
1824     // * Resume fetchers that are still needed and assign them original packet sources.
1825     // * Mark otherwise unneeded fetchers for removal.
1826     ALOGV("resuming fetchers for mask 0x%08x", resumeMask);
1827     for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
1828         const AString &uri = mFetcherInfos.keyAt(i);
1829         if (!resumeFetcher(uri, resumeMask, timeUs)) {
1830             ALOGV("marking fetcher-%d to be removed",
1831                     mFetcherInfos[i].mFetcher->getFetcherID());
1832 
1833             mFetcherInfos.editValueAt(i).mToBeRemoved = true;
1834         }
1835     }
1836 
1837     // streamMask now only contains the types that need a new fetcher created.
1838     if (streamMask != 0) {
1839         ALOGV("creating new fetchers for mask 0x%08x", streamMask);
1840     }
1841 
1842     // Find out when the original fetchers have buffered up to and start the new fetchers
1843     // at a later timestamp.
1844     for (size_t i = 0; i < kMaxStreams; i++) {
1845         if (!(indexToType(i) & streamMask)) {
1846             continue;
1847         }
1848 
1849         AString uri;
1850         uri = switching ? mStreams[i].mNewUri : mStreams[i].mUri;
1851 
1852         sp<PlaylistFetcher> fetcher = addFetcher(uri.c_str());
1853         CHECK(fetcher != NULL);
1854 
1855         HLSTime startTime;
1856         SeekMode seekMode = kSeekModeExactPosition;
1857         sp<AnotherPacketSource> sources[kNumSources];
1858 
1859         if (i == kSubtitleIndex || (!pickTrack && !switching)) {
1860             startTime = latestMediaSegmentStartTime();
1861         }
1862 
1863         // TRICKY: looping from i as earlier streams are already removed from streamMask
1864         for (size_t j = i; j < kMaxStreams; ++j) {
1865             const AString &streamUri = switching ? mStreams[j].mNewUri : mStreams[j].mUri;
1866             if ((streamMask & indexToType(j)) && uri == streamUri) {
1867                 sources[j] = mPacketSources.valueFor(indexToType(j));
1868 
1869                 if (timeUs >= 0) {
1870                     startTime.mTimeUs = timeUs;
1871                 } else {
1872                     int32_t type;
1873                     sp<AMessage> meta;
1874                     if (!switching) {
1875                         // selecting, or adapting but no swap required
1876                         meta = sources[j]->getLatestDequeuedMeta();
1877                     } else {
1878                         // adapting and swap required
1879                         meta = sources[j]->getLatestEnqueuedMeta();
1880                         if (meta != NULL && mCurBandwidthIndex > mOrigBandwidthIndex) {
1881                             // switching up
1882                             meta = sources[j]->getMetaAfterLastDequeued(mUpSwitchMargin);
1883                         }
1884                     }
1885 
1886                     if ((j == kAudioIndex || j == kVideoIndex)
1887                             && meta != NULL && !meta->findInt32("discontinuity", &type)) {
1888                         HLSTime tmpTime(meta);
1889                         if (startTime < tmpTime) {
1890                             startTime = tmpTime;
1891                         }
1892                     }
1893 
1894                     if (!switching) {
1895                         // selecting, or adapting but no swap required
1896                         sources[j]->clear();
1897                         if (j == kSubtitleIndex) {
1898                             break;
1899                         }
1900 
1901                         ALOGV("stream[%zu]: queue format change", j);
1902                         sources[j]->queueDiscontinuity(
1903                                 ATSParser::DISCONTINUITY_FORMAT_ONLY, NULL, true);
1904                     } else {
1905                         // switching, queue discontinuities after resume
1906                         sources[j] = mPacketSources2.valueFor(indexToType(j));
1907                         sources[j]->clear();
1908                         // the new fetcher might be providing streams that used to be
1909                         // provided by two different fetchers,  if one of the fetcher
1910                         // paused in the middle while the other somehow paused in next
1911                         // seg, we have to start from next seg.
1912                         if (seekMode < mStreams[j].mSeekMode) {
1913                             seekMode = mStreams[j].mSeekMode;
1914                         }
1915                     }
1916                 }
1917 
1918                 streamMask &= ~indexToType(j);
1919             }
1920         }
1921 
1922         ALOGV("[fetcher-%d] startAsync: startTimeUs %lld mLastSeekTimeUs %lld "
1923                 "segmentStartTimeUs %lld seekMode %d",
1924                 fetcher->getFetcherID(),
1925                 (long long)startTime.mTimeUs,
1926                 (long long)mLastSeekTimeUs,
1927                 (long long)startTime.getSegmentTimeUs(),
1928                 seekMode);
1929 
1930         // Set the target segment start time to the middle point of the
1931         // segment where the last sample was.
1932         // This gives a better guess if segments of the two variants are not
1933         // perfectly aligned. (If the corresponding segment in new variant
1934         // starts slightly later than that in the old variant, we still want
1935         // to pick that segment, not the one before)
1936         fetcher->startAsync(
1937                 sources[kAudioIndex],
1938                 sources[kVideoIndex],
1939                 sources[kSubtitleIndex],
1940                 getMetadataSource(sources, mNewStreamMask, switching),
1941                 startTime.mTimeUs < 0 ? mLastSeekTimeUs : startTime.mTimeUs,
1942                 startTime.getSegmentTimeUs(),
1943                 startTime.mSeq,
1944                 seekMode);
1945     }
1946 
1947     // All fetchers have now been started, the configuration change
1948     // has completed.
1949 
1950     mReconfigurationInProgress = false;
1951     if (switching) {
1952         mSwitchInProgress = true;
1953     } else {
1954         mStreamMask = mNewStreamMask;
1955         if (mOrigBandwidthIndex != mCurBandwidthIndex) {
1956             ALOGV("#### Finished Bandwidth Switch Early: %zd => %zd",
1957                     mOrigBandwidthIndex, mCurBandwidthIndex);
1958             mOrigBandwidthIndex = mCurBandwidthIndex;
1959         }
1960     }
1961 
1962     ALOGV("onChangeConfiguration3: mSwitchInProgress %d, mStreamMask 0x%x",
1963             mSwitchInProgress, mStreamMask);
1964 
1965     if (mDisconnectReplyID != NULL) {
1966         finishDisconnect();
1967     }
1968 }
1969 
swapPacketSource(StreamType stream)1970 void LiveSession::swapPacketSource(StreamType stream) {
1971     ALOGV("[%s] swapPacketSource", getNameForStream(stream));
1972 
1973     // transfer packets from source2 to source
1974     sp<AnotherPacketSource> &aps = mPacketSources.editValueFor(stream);
1975     sp<AnotherPacketSource> &aps2 = mPacketSources2.editValueFor(stream);
1976 
1977     // queue discontinuity in mPacketSource
1978     aps->queueDiscontinuity(ATSParser::DISCONTINUITY_FORMAT_ONLY, NULL, false);
1979 
1980     // queue packets in mPacketSource2 to mPacketSource
1981     status_t finalResult = OK;
1982     sp<ABuffer> accessUnit;
1983     while (aps2->hasBufferAvailable(&finalResult) && finalResult == OK &&
1984           OK == aps2->dequeueAccessUnit(&accessUnit)) {
1985         aps->queueAccessUnit(accessUnit);
1986     }
1987     aps2->clear();
1988 }
1989 
tryToFinishBandwidthSwitch(const AString & oldUri)1990 void LiveSession::tryToFinishBandwidthSwitch(const AString &oldUri) {
1991     if (!mSwitchInProgress) {
1992         return;
1993     }
1994 
1995     ssize_t index = mFetcherInfos.indexOfKey(oldUri);
1996     if (index < 0 || !mFetcherInfos[index].mToBeRemoved) {
1997         return;
1998     }
1999 
2000     // Swap packet source of streams provided by old variant
2001     for (size_t idx = 0; idx < kMaxStreams; idx++) {
2002         StreamType stream = indexToType(idx);
2003         if ((mSwapMask & stream) && (oldUri == mStreams[idx].mUri)) {
2004             swapPacketSource(stream);
2005 
2006             if ((mNewStreamMask & stream) && mStreams[idx].mNewUri.empty()) {
2007                 ALOGW("swapping stream type %d %s to empty stream",
2008                         stream, mStreams[idx].mUri.c_str());
2009             }
2010             mStreams[idx].mUri = mStreams[idx].mNewUri;
2011             mStreams[idx].mNewUri.clear();
2012 
2013             mSwapMask &= ~stream;
2014         }
2015     }
2016 
2017     mFetcherInfos.editValueAt(index).mFetcher->stopAsync(false /* clear */);
2018 
2019     ALOGV("tryToFinishBandwidthSwitch: mSwapMask=0x%x", mSwapMask);
2020     if (mSwapMask != 0) {
2021         return;
2022     }
2023 
2024     // Check if new variant contains extra streams.
2025     uint32_t extraStreams = mNewStreamMask & (~mStreamMask);
2026     while (extraStreams) {
2027         StreamType stream = (StreamType) (extraStreams & ~(extraStreams - 1));
2028         extraStreams &= ~stream;
2029 
2030         swapPacketSource(stream);
2031 
2032         ssize_t idx = typeToIndex(stream);
2033         CHECK(idx >= 0);
2034         if (mStreams[idx].mNewUri.empty()) {
2035             ALOGW("swapping extra stream type %d %s to empty stream",
2036                     stream, mStreams[idx].mUri.c_str());
2037         }
2038         mStreams[idx].mUri = mStreams[idx].mNewUri;
2039         mStreams[idx].mNewUri.clear();
2040     }
2041 
2042     // Restart new fetcher (it was paused after the first 47k block)
2043     // and let it fetch into mPacketSources (not mPacketSources2)
2044     for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
2045         FetcherInfo &info = mFetcherInfos.editValueAt(i);
2046         if (info.mToBeResumed) {
2047             resumeFetcher(mFetcherInfos.keyAt(i), mNewStreamMask);
2048             info.mToBeResumed = false;
2049         }
2050     }
2051 
2052     ALOGI("#### Finished Bandwidth Switch: %zd => %zd",
2053             mOrigBandwidthIndex, mCurBandwidthIndex);
2054 
2055     mStreamMask = mNewStreamMask;
2056     mSwitchInProgress = false;
2057     mOrigBandwidthIndex = mCurBandwidthIndex;
2058 
2059     restartPollBuffering();
2060 }
2061 
schedulePollBuffering()2062 void LiveSession::schedulePollBuffering() {
2063     sp<AMessage> msg = new AMessage(kWhatPollBuffering, this);
2064     msg->setInt32("generation", mPollBufferingGeneration);
2065     msg->post(1000000ll);
2066 }
2067 
cancelPollBuffering()2068 void LiveSession::cancelPollBuffering() {
2069     ++mPollBufferingGeneration;
2070     mPrevBufferPercentage = -1;
2071 }
2072 
restartPollBuffering()2073 void LiveSession::restartPollBuffering() {
2074     cancelPollBuffering();
2075     onPollBuffering();
2076 }
2077 
onPollBuffering()2078 void LiveSession::onPollBuffering() {
2079     ALOGV("onPollBuffering: mSwitchInProgress %d, mReconfigurationInProgress %d, "
2080             "mInPreparationPhase %d, mCurBandwidthIndex %zd, mStreamMask 0x%x",
2081         mSwitchInProgress, mReconfigurationInProgress,
2082         mInPreparationPhase, mCurBandwidthIndex, mStreamMask);
2083 
2084     bool underflow, ready, down, up;
2085     if (checkBuffering(underflow, ready, down, up)) {
2086         if (mInPreparationPhase) {
2087             // Allow down switch even if we're still preparing.
2088             //
2089             // Some streams have a high bandwidth index as default,
2090             // when bandwidth is low, it takes a long time to buffer
2091             // to ready mark, then it immediately pauses after start
2092             // as we have to do a down switch. It's better experience
2093             // to restart from a lower index, if we detect low bw.
2094             if (!switchBandwidthIfNeeded(false /* up */, down) && ready) {
2095                 postPrepared(OK);
2096             }
2097         }
2098 
2099         if (!mInPreparationPhase) {
2100             if (ready) {
2101                 stopBufferingIfNecessary();
2102             } else if (underflow) {
2103                 startBufferingIfNecessary();
2104             }
2105             switchBandwidthIfNeeded(up, down);
2106         }
2107     }
2108 
2109     schedulePollBuffering();
2110 }
2111 
cancelBandwidthSwitch(bool resume)2112 void LiveSession::cancelBandwidthSwitch(bool resume) {
2113     ALOGV("cancelBandwidthSwitch: mSwitchGen(%d)++, orig %zd, cur %zd",
2114             mSwitchGeneration, mOrigBandwidthIndex, mCurBandwidthIndex);
2115     if (!mSwitchInProgress) {
2116         return;
2117     }
2118 
2119     for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
2120         FetcherInfo& info = mFetcherInfos.editValueAt(i);
2121         if (info.mToBeRemoved) {
2122             info.mToBeRemoved = false;
2123             if (resume) {
2124                 resumeFetcher(mFetcherInfos.keyAt(i), mSwapMask);
2125             }
2126         }
2127     }
2128 
2129     for (size_t i = 0; i < kMaxStreams; ++i) {
2130         AString newUri = mStreams[i].mNewUri;
2131         if (!newUri.empty()) {
2132             // clear all mNewUri matching this newUri
2133             for (size_t j = i; j < kMaxStreams; ++j) {
2134                 if (mStreams[j].mNewUri == newUri) {
2135                     mStreams[j].mNewUri.clear();
2136                 }
2137             }
2138             ALOGV("stopping newUri = %s", newUri.c_str());
2139             ssize_t index = mFetcherInfos.indexOfKey(newUri);
2140             if (index < 0) {
2141                 ALOGE("did not find fetcher for newUri: %s", newUri.c_str());
2142                 continue;
2143             }
2144             FetcherInfo &info = mFetcherInfos.editValueAt(index);
2145             info.mToBeRemoved = true;
2146             info.mFetcher->stopAsync();
2147         }
2148     }
2149 
2150     ALOGI("#### Canceled Bandwidth Switch: %zd => %zd",
2151             mOrigBandwidthIndex, mCurBandwidthIndex);
2152 
2153     mSwitchGeneration++;
2154     mSwitchInProgress = false;
2155     mCurBandwidthIndex = mOrigBandwidthIndex;
2156     mSwapMask = 0;
2157 }
2158 
checkBuffering(bool & underflow,bool & ready,bool & down,bool & up)2159 bool LiveSession::checkBuffering(
2160         bool &underflow, bool &ready, bool &down, bool &up) {
2161     underflow = ready = down = up = false;
2162 
2163     if (mReconfigurationInProgress) {
2164         ALOGV("Switch/Reconfig in progress, defer buffer polling");
2165         return false;
2166     }
2167 
2168     size_t activeCount, underflowCount, readyCount, downCount, upCount;
2169     activeCount = underflowCount = readyCount = downCount = upCount =0;
2170     int32_t minBufferPercent = -1;
2171     int64_t durationUs;
2172     if (getDuration(&durationUs) != OK) {
2173         durationUs = -1;
2174     }
2175     for (size_t i = 0; i < mPacketSources.size(); ++i) {
2176         // we don't check subtitles for buffering level
2177         if (!(mStreamMask & mPacketSources.keyAt(i)
2178                 & (STREAMTYPE_AUDIO | STREAMTYPE_VIDEO))) {
2179             continue;
2180         }
2181         // ignore streams that never had any packet queued.
2182         // (it's possible that the variant only has audio or video)
2183         sp<AMessage> meta = mPacketSources[i]->getLatestEnqueuedMeta();
2184         if (meta == NULL) {
2185             continue;
2186         }
2187 
2188         status_t finalResult;
2189         int64_t bufferedDurationUs =
2190                 mPacketSources[i]->getBufferedDurationUs(&finalResult);
2191         ALOGV("[%s] buffered %lld us",
2192                 getNameForStream(mPacketSources.keyAt(i)),
2193                 (long long)bufferedDurationUs);
2194         if (durationUs >= 0) {
2195             int32_t percent;
2196             if (mPacketSources[i]->isFinished(0 /* duration */)) {
2197                 percent = 100;
2198             } else {
2199                 percent = (int32_t)(100.0 *
2200                         (mLastDequeuedTimeUs + bufferedDurationUs) / durationUs);
2201             }
2202             if (minBufferPercent < 0 || percent < minBufferPercent) {
2203                 minBufferPercent = percent;
2204             }
2205         }
2206 
2207         ++activeCount;
2208         int64_t readyMarkUs =
2209             (mInPreparationPhase ?
2210                 mBufferingSettings.mInitialMarkMs :
2211                 mBufferingSettings.mResumePlaybackMarkMs) * 1000ll;
2212         if (bufferedDurationUs > readyMarkUs
2213                 || mPacketSources[i]->isFinished(0)) {
2214             ++readyCount;
2215         }
2216         if (!mPacketSources[i]->isFinished(0)) {
2217             if (bufferedDurationUs < kUnderflowMarkMs * 1000ll) {
2218                 ++underflowCount;
2219             }
2220             if (bufferedDurationUs > mUpSwitchMark) {
2221                 ++upCount;
2222             }
2223             if (bufferedDurationUs < mDownSwitchMark) {
2224                 ++downCount;
2225             }
2226         }
2227     }
2228 
2229     if (minBufferPercent >= 0) {
2230         notifyBufferingUpdate(minBufferPercent);
2231     }
2232 
2233     if (activeCount > 0) {
2234         up        = (upCount == activeCount);
2235         down      = (downCount > 0);
2236         ready     = (readyCount == activeCount);
2237         underflow = (underflowCount > 0);
2238         return true;
2239     }
2240 
2241     return false;
2242 }
2243 
startBufferingIfNecessary()2244 void LiveSession::startBufferingIfNecessary() {
2245     ALOGV("startBufferingIfNecessary: mInPreparationPhase=%d, mBuffering=%d",
2246             mInPreparationPhase, mBuffering);
2247     if (!mBuffering) {
2248         mBuffering = true;
2249 
2250         sp<AMessage> notify = mNotify->dup();
2251         notify->setInt32("what", kWhatBufferingStart);
2252         notify->post();
2253     }
2254 }
2255 
stopBufferingIfNecessary()2256 void LiveSession::stopBufferingIfNecessary() {
2257     ALOGV("stopBufferingIfNecessary: mInPreparationPhase=%d, mBuffering=%d",
2258             mInPreparationPhase, mBuffering);
2259 
2260     if (mBuffering) {
2261         mBuffering = false;
2262 
2263         sp<AMessage> notify = mNotify->dup();
2264         notify->setInt32("what", kWhatBufferingEnd);
2265         notify->post();
2266     }
2267 }
2268 
notifyBufferingUpdate(int32_t percentage)2269 void LiveSession::notifyBufferingUpdate(int32_t percentage) {
2270     if (percentage < mPrevBufferPercentage) {
2271         percentage = mPrevBufferPercentage;
2272     } else if (percentage > 100) {
2273         percentage = 100;
2274     }
2275 
2276     mPrevBufferPercentage = percentage;
2277 
2278     ALOGV("notifyBufferingUpdate: percentage=%d%%", percentage);
2279 
2280     sp<AMessage> notify = mNotify->dup();
2281     notify->setInt32("what", kWhatBufferingUpdate);
2282     notify->setInt32("percentage", percentage);
2283     notify->post();
2284 }
2285 
tryBandwidthFallback()2286 bool LiveSession::tryBandwidthFallback() {
2287     if (mInPreparationPhase || mReconfigurationInProgress) {
2288         // Don't try fallback during prepare or reconfig.
2289         // If error happens there, it's likely unrecoverable.
2290         return false;
2291     }
2292     if (mCurBandwidthIndex > mOrigBandwidthIndex) {
2293         // if we're switching up, simply cancel and resume old variant
2294         cancelBandwidthSwitch(true /* resume */);
2295         return true;
2296     } else {
2297         // if we're switching down, we're likely about to underflow (if
2298         // not already underflowing). try the lowest viable bandwidth if
2299         // not on that variant already.
2300         ssize_t lowestValid = getLowestValidBandwidthIndex();
2301         if (mCurBandwidthIndex > lowestValid) {
2302             cancelBandwidthSwitch();
2303             changeConfiguration(-1ll, lowestValid);
2304             return true;
2305         }
2306     }
2307     // return false if we couldn't find any fallback
2308     return false;
2309 }
2310 
2311 /*
2312  * returns true if a bandwidth switch is actually needed (and started),
2313  * returns false otherwise
2314  */
switchBandwidthIfNeeded(bool bufferHigh,bool bufferLow)2315 bool LiveSession::switchBandwidthIfNeeded(bool bufferHigh, bool bufferLow) {
2316     // no need to check bandwidth if we only have 1 bandwidth settings
2317     if (mBandwidthItems.size() < 2) {
2318         return false;
2319     }
2320 
2321     if (mSwitchInProgress) {
2322         if (mBuffering) {
2323             tryBandwidthFallback();
2324         }
2325         return false;
2326     }
2327 
2328     int32_t bandwidthBps, shortTermBps;
2329     bool isStable;
2330     if (mBandwidthEstimator->estimateBandwidth(
2331             &bandwidthBps, &isStable, &shortTermBps)) {
2332         ALOGV("bandwidth estimated at %.2f kbps, "
2333                 "stable %d, shortTermBps %.2f kbps",
2334                 bandwidthBps / 1024.0f, isStable, shortTermBps / 1024.0f);
2335         mLastBandwidthBps = bandwidthBps;
2336         mLastBandwidthStable = isStable;
2337     } else {
2338         ALOGV("no bandwidth estimate.");
2339         return false;
2340     }
2341 
2342     int32_t curBandwidth = mBandwidthItems.itemAt(mCurBandwidthIndex).mBandwidth;
2343     // canSwithDown and canSwitchUp can't both be true.
2344     // we only want to switch up when measured bw is 120% higher than current variant,
2345     // and we only want to switch down when measured bw is below current variant.
2346     bool canSwitchDown = bufferLow
2347             && (bandwidthBps < (int32_t)curBandwidth);
2348     bool canSwitchUp = bufferHigh
2349             && (bandwidthBps > (int32_t)curBandwidth * 12 / 10);
2350 
2351     if (canSwitchDown || canSwitchUp) {
2352         // bandwidth estimating has some delay, if we have to downswitch when
2353         // it hasn't stabilized, use the short term to guess real bandwidth,
2354         // since it may be dropping too fast.
2355         // (note this doesn't apply to upswitch, always use longer average there)
2356         if (!isStable && canSwitchDown) {
2357             if (shortTermBps < bandwidthBps) {
2358                 bandwidthBps = shortTermBps;
2359             }
2360         }
2361 
2362         ssize_t bandwidthIndex = getBandwidthIndex(bandwidthBps);
2363 
2364         // it's possible that we're checking for canSwitchUp case, but the returned
2365         // bandwidthIndex is < mCurBandwidthIndex, as getBandwidthIndex() only uses 70%
2366         // of measured bw. In that case we don't want to do anything, since we have
2367         // both enough buffer and enough bw.
2368         if ((canSwitchUp && bandwidthIndex > mCurBandwidthIndex)
2369          || (canSwitchDown && bandwidthIndex < mCurBandwidthIndex)) {
2370             // if not yet prepared, just restart again with new bw index.
2371             // this is faster and playback experience is cleaner.
2372             changeConfiguration(
2373                     mInPreparationPhase ? 0 : -1ll, bandwidthIndex);
2374             return true;
2375         }
2376     }
2377     return false;
2378 }
2379 
postError(status_t err)2380 void LiveSession::postError(status_t err) {
2381     // if we reached EOS, notify buffering of 100%
2382     if (err == ERROR_END_OF_STREAM) {
2383         notifyBufferingUpdate(100);
2384     }
2385     // we'll stop buffer polling now, before that notify
2386     // stop buffering to stop the spinning icon
2387     stopBufferingIfNecessary();
2388     cancelPollBuffering();
2389 
2390     sp<AMessage> notify = mNotify->dup();
2391     notify->setInt32("what", kWhatError);
2392     notify->setInt32("err", err);
2393     notify->post();
2394 }
2395 
postPrepared(status_t err)2396 void LiveSession::postPrepared(status_t err) {
2397     CHECK(mInPreparationPhase);
2398 
2399     sp<AMessage> notify = mNotify->dup();
2400     if (err == OK || err == ERROR_END_OF_STREAM) {
2401         notify->setInt32("what", kWhatPrepared);
2402     } else {
2403         cancelPollBuffering();
2404 
2405         notify->setInt32("what", kWhatPreparationFailed);
2406         notify->setInt32("err", err);
2407     }
2408 
2409     notify->post();
2410 
2411     mInPreparationPhase = false;
2412 }
2413 
2414 
2415 }  // namespace android
2416 
2417