1 /*
2  * Copyright (C) 2010 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 //#define LOG_NDEBUG 0
18 #define LOG_TAG "LiveSession"
19 #include <utils/Log.h>
20 
21 #include "LiveSession.h"
22 #include "HTTPDownloader.h"
23 #include "M3UParser.h"
24 #include "PlaylistFetcher.h"
25 
26 #include <mpeg2ts/AnotherPacketSource.h>
27 
28 #include <cutils/properties.h>
29 #include <media/MediaHTTPService.h>
30 #include <media/stagefright/foundation/ABuffer.h>
31 #include <media/stagefright/foundation/ADebug.h>
32 #include <media/stagefright/foundation/AMessage.h>
33 #include <media/stagefright/foundation/AUtils.h>
34 #include <media/stagefright/MediaDefs.h>
35 #include <media/stagefright/MetaData.h>
36 #include <media/stagefright/Utils.h>
37 #include <media/stagefright/FoundationUtils.h>
38 
39 #include <utils/Mutex.h>
40 
41 #include <ctype.h>
42 #include <inttypes.h>
43 
44 namespace android {
45 
46 // static
47 // Bandwidth Switch Mark Defaults
48 const int64_t LiveSession::kUpSwitchMarkUs = 15000000LL;
49 const int64_t LiveSession::kDownSwitchMarkUs = 20000000LL;
50 const int64_t LiveSession::kUpSwitchMarginUs = 5000000LL;
51 const int64_t LiveSession::kResumeThresholdUs = 100000LL;
52 
53 //TODO: redefine this mark to a fair value
54 // default buffer underflow mark
55 static const int kUnderflowMarkMs = 1000;  // 1 second
56 
57 struct LiveSession::BandwidthEstimator : public RefBase {
58     BandwidthEstimator();
59 
60     void addBandwidthMeasurement(size_t numBytes, int64_t delayUs);
61     bool estimateBandwidth(
62             int32_t *bandwidth,
63             bool *isStable = NULL,
64             int32_t *shortTermBps = NULL);
65 
66 private:
67     // Bandwidth estimation parameters
68     static const int32_t kShortTermBandwidthItems = 3;
69     static const int32_t kMinBandwidthHistoryItems = 20;
70     static const int64_t kMinBandwidthHistoryWindowUs = 5000000LL; // 5 sec
71     static const int64_t kMaxBandwidthHistoryWindowUs = 30000000LL; // 30 sec
72     static const int64_t kMaxBandwidthHistoryAgeUs = 60000000LL; // 60 sec
73 
74     struct BandwidthEntry {
75         int64_t mTimestampUs;
76         int64_t mDelayUs;
77         size_t mNumBytes;
78     };
79 
80     Mutex mLock;
81     List<BandwidthEntry> mBandwidthHistory;
82     List<int32_t> mPrevEstimates;
83     int32_t mShortTermEstimate;
84     bool mHasNewSample;
85     bool mIsStable;
86     int64_t mTotalTransferTimeUs;
87     size_t mTotalTransferBytes;
88 
89     DISALLOW_EVIL_CONSTRUCTORS(BandwidthEstimator);
90 };
91 
BandwidthEstimator()92 LiveSession::BandwidthEstimator::BandwidthEstimator() :
93     mShortTermEstimate(0),
94     mHasNewSample(false),
95     mIsStable(true),
96     mTotalTransferTimeUs(0),
97     mTotalTransferBytes(0) {
98 }
99 
addBandwidthMeasurement(size_t numBytes,int64_t delayUs)100 void LiveSession::BandwidthEstimator::addBandwidthMeasurement(
101         size_t numBytes, int64_t delayUs) {
102     AutoMutex autoLock(mLock);
103 
104     int64_t nowUs = ALooper::GetNowUs();
105     BandwidthEntry entry;
106     entry.mTimestampUs = nowUs;
107     entry.mDelayUs = delayUs;
108     entry.mNumBytes = numBytes;
109     mTotalTransferTimeUs += delayUs;
110     mTotalTransferBytes += numBytes;
111     mBandwidthHistory.push_back(entry);
112     mHasNewSample = true;
113 
114     // Remove no more than 10% of total transfer time at a time
115     // to avoid sudden jump on bandwidth estimation. There might
116     // be long blocking reads that takes up signification time,
117     // we have to keep a longer window in that case.
118     int64_t bandwidthHistoryWindowUs = mTotalTransferTimeUs * 9 / 10;
119     if (bandwidthHistoryWindowUs < kMinBandwidthHistoryWindowUs) {
120         bandwidthHistoryWindowUs = kMinBandwidthHistoryWindowUs;
121     } else if (bandwidthHistoryWindowUs > kMaxBandwidthHistoryWindowUs) {
122         bandwidthHistoryWindowUs = kMaxBandwidthHistoryWindowUs;
123     }
124     // trim old samples, keeping at least kMaxBandwidthHistoryItems samples,
125     // and total transfer time at least kMaxBandwidthHistoryWindowUs.
126     while (mBandwidthHistory.size() > kMinBandwidthHistoryItems) {
127         List<BandwidthEntry>::iterator it = mBandwidthHistory.begin();
128         // remove sample if either absolute age or total transfer time is
129         // over kMaxBandwidthHistoryWindowUs
130         if (nowUs - it->mTimestampUs < kMaxBandwidthHistoryAgeUs &&
131                 mTotalTransferTimeUs - it->mDelayUs < bandwidthHistoryWindowUs) {
132             break;
133         }
134         mTotalTransferTimeUs -= it->mDelayUs;
135         mTotalTransferBytes -= it->mNumBytes;
136         mBandwidthHistory.erase(mBandwidthHistory.begin());
137     }
138 }
139 
estimateBandwidth(int32_t * bandwidthBps,bool * isStable,int32_t * shortTermBps)140 bool LiveSession::BandwidthEstimator::estimateBandwidth(
141         int32_t *bandwidthBps, bool *isStable, int32_t *shortTermBps) {
142     AutoMutex autoLock(mLock);
143 
144     if (mBandwidthHistory.size() < 2) {
145         return false;
146     }
147 
148     if (!mHasNewSample) {
149         *bandwidthBps = *(--mPrevEstimates.end());
150         if (isStable) {
151             *isStable = mIsStable;
152         }
153         if (shortTermBps) {
154             *shortTermBps = mShortTermEstimate;
155         }
156         return true;
157     }
158 
159     *bandwidthBps = ((double)mTotalTransferBytes * 8E6 / mTotalTransferTimeUs);
160     mPrevEstimates.push_back(*bandwidthBps);
161     while (mPrevEstimates.size() > 3) {
162         mPrevEstimates.erase(mPrevEstimates.begin());
163     }
164     mHasNewSample = false;
165 
166     int64_t totalTimeUs = 0;
167     size_t totalBytes = 0;
168     if (mBandwidthHistory.size() >= kShortTermBandwidthItems) {
169         List<BandwidthEntry>::iterator it = --mBandwidthHistory.end();
170         for (size_t i = 0; i < kShortTermBandwidthItems; i++, it--) {
171             totalTimeUs += it->mDelayUs;
172             totalBytes += it->mNumBytes;
173         }
174     }
175     mShortTermEstimate = totalTimeUs > 0 ?
176             (totalBytes * 8E6 / totalTimeUs) : *bandwidthBps;
177     if (shortTermBps) {
178         *shortTermBps = mShortTermEstimate;
179     }
180 
181     int64_t minEstimate = -1, maxEstimate = -1;
182     List<int32_t>::iterator it;
183     for (it = mPrevEstimates.begin(); it != mPrevEstimates.end(); it++) {
184         int32_t estimate = *it;
185         if (minEstimate < 0 || minEstimate > estimate) {
186             minEstimate = estimate;
187         }
188         if (maxEstimate < 0 || maxEstimate < estimate) {
189             maxEstimate = estimate;
190         }
191     }
192     // consider it stable if long-term average is not jumping a lot
193     // and short-term average is not much lower than long-term average
194     mIsStable = (maxEstimate <= minEstimate * 4 / 3)
195             && mShortTermEstimate > minEstimate * 7 / 10;
196     if (isStable) {
197         *isStable = mIsStable;
198     }
199 
200 #if 0
201     {
202         char dumpStr[1024] = {0};
203         size_t itemIdx = 0;
204         size_t histSize = mBandwidthHistory.size();
205         sprintf(dumpStr, "estimate bps=%d stable=%d history (n=%d): {",
206             *bandwidthBps, mIsStable, histSize);
207         List<BandwidthEntry>::iterator it = mBandwidthHistory.begin();
208         for (; it != mBandwidthHistory.end(); ++it) {
209             if (itemIdx > 50) {
210                 sprintf(dumpStr + strlen(dumpStr),
211                         "...(%zd more items)... }", histSize - itemIdx);
212                 break;
213             }
214             sprintf(dumpStr + strlen(dumpStr), "%dk/%.3fs%s",
215                 it->mNumBytes / 1024,
216                 (double)it->mDelayUs * 1.0e-6,
217                 (it == (--mBandwidthHistory.end())) ? "}" : ", ");
218             itemIdx++;
219         }
220         ALOGE(dumpStr);
221     }
222 #endif
223     return true;
224 }
225 
226 //static
getKeyForStream(StreamType type)227 const char *LiveSession::getKeyForStream(StreamType type) {
228     switch (type) {
229         case STREAMTYPE_VIDEO:
230             return "timeUsVideo";
231         case STREAMTYPE_AUDIO:
232             return "timeUsAudio";
233         case STREAMTYPE_SUBTITLES:
234             return "timeUsSubtitle";
235         case STREAMTYPE_METADATA:
236             return "timeUsMetadata"; // unused
237         default:
238             TRESPASS();
239     }
240     return NULL;
241 }
242 
243 //static
getNameForStream(StreamType type)244 const char *LiveSession::getNameForStream(StreamType type) {
245     switch (type) {
246         case STREAMTYPE_VIDEO:
247             return "video";
248         case STREAMTYPE_AUDIO:
249             return "audio";
250         case STREAMTYPE_SUBTITLES:
251             return "subs";
252         case STREAMTYPE_METADATA:
253             return "metadata";
254         default:
255             break;
256     }
257     return "unknown";
258 }
259 
260 //static
getSourceTypeForStream(StreamType type)261 ATSParser::SourceType LiveSession::getSourceTypeForStream(StreamType type) {
262     switch (type) {
263         case STREAMTYPE_VIDEO:
264             return ATSParser::VIDEO;
265         case STREAMTYPE_AUDIO:
266             return ATSParser::AUDIO;
267         case STREAMTYPE_METADATA:
268             return ATSParser::META;
269         case STREAMTYPE_SUBTITLES:
270         default:
271             TRESPASS();
272     }
273     return ATSParser::NUM_SOURCE_TYPES; // should not reach here
274 }
275 
LiveSession(const sp<AMessage> & notify,uint32_t flags,const sp<MediaHTTPService> & httpService)276 LiveSession::LiveSession(
277         const sp<AMessage> &notify, uint32_t flags,
278         const sp<MediaHTTPService> &httpService)
279     : mNotify(notify),
280       mFlags(flags),
281       mHTTPService(httpService),
282       mBuffering(false),
283       mInPreparationPhase(true),
284       mPollBufferingGeneration(0),
285       mPrevBufferPercentage(-1),
286       mCurBandwidthIndex(-1),
287       mOrigBandwidthIndex(-1),
288       mLastBandwidthBps(-1LL),
289       mLastBandwidthStable(false),
290       mBandwidthEstimator(new BandwidthEstimator()),
291       mMaxWidth(720),
292       mMaxHeight(480),
293       mStreamMask(0),
294       mNewStreamMask(0),
295       mSwapMask(0),
296       mSwitchGeneration(0),
297       mSubtitleGeneration(0),
298       mLastDequeuedTimeUs(0LL),
299       mRealTimeBaseUs(0LL),
300       mReconfigurationInProgress(false),
301       mSwitchInProgress(false),
302       mUpSwitchMark(kUpSwitchMarkUs),
303       mDownSwitchMark(kDownSwitchMarkUs),
304       mUpSwitchMargin(kUpSwitchMarginUs),
305       mFirstTimeUsValid(false),
306       mFirstTimeUs(0),
307       mLastSeekTimeUs(0),
308       mHasMetadata(false) {
309     mStreams[kAudioIndex] = StreamItem("audio");
310     mStreams[kVideoIndex] = StreamItem("video");
311     mStreams[kSubtitleIndex] = StreamItem("subtitles");
312 
313     for (size_t i = 0; i < kNumSources; ++i) {
314         mPacketSources.add(indexToType(i), new AnotherPacketSource(NULL /* meta */));
315         mPacketSources2.add(indexToType(i), new AnotherPacketSource(NULL /* meta */));
316     }
317 }
318 
~LiveSession()319 LiveSession::~LiveSession() {
320     if (mFetcherLooper != NULL) {
321         mFetcherLooper->stop();
322     }
323 }
324 
calculateMediaTimeUs(int64_t firstTimeUs,int64_t timeUs,int32_t discontinuitySeq)325 int64_t LiveSession::calculateMediaTimeUs(
326         int64_t firstTimeUs, int64_t timeUs, int32_t discontinuitySeq) {
327     if (timeUs >= firstTimeUs) {
328         timeUs -= firstTimeUs;
329     } else {
330         timeUs = 0;
331     }
332     timeUs += mLastSeekTimeUs;
333     if (mDiscontinuityOffsetTimesUs.indexOfKey(discontinuitySeq) >= 0) {
334         timeUs += mDiscontinuityOffsetTimesUs.valueFor(discontinuitySeq);
335     }
336     return timeUs;
337 }
338 
dequeueAccessUnit(StreamType stream,sp<ABuffer> * accessUnit)339 status_t LiveSession::dequeueAccessUnit(
340         StreamType stream, sp<ABuffer> *accessUnit) {
341     status_t finalResult = OK;
342     sp<AnotherPacketSource> packetSource = mPacketSources.valueFor(stream);
343 
344     ssize_t streamIdx = typeToIndex(stream);
345     if (streamIdx < 0) {
346         return BAD_VALUE;
347     }
348     const char *streamStr = getNameForStream(stream);
349     // Do not let client pull data if we don't have data packets yet.
350     // We might only have a format discontinuity queued without data.
351     // When NuPlayerDecoder dequeues the format discontinuity, it will
352     // immediately try to getFormat. If we return NULL, NuPlayerDecoder
353     // thinks it can do seamless change, so will not shutdown decoder.
354     // When the actual format arrives, it can't handle it and get stuck.
355     if (!packetSource->hasDataBufferAvailable(&finalResult)) {
356         ALOGV("[%s] dequeueAccessUnit: no buffer available (finalResult=%d)",
357                 streamStr, finalResult);
358 
359         if (finalResult == OK) {
360             return -EAGAIN;
361         } else {
362             return finalResult;
363         }
364     }
365 
366     // Let the client dequeue as long as we have buffers available
367     // Do not make pause/resume decisions here.
368 
369     status_t err = packetSource->dequeueAccessUnit(accessUnit);
370 
371     if (err == INFO_DISCONTINUITY) {
372         // adaptive streaming, discontinuities in the playlist
373         int32_t type;
374         CHECK((*accessUnit)->meta()->findInt32("discontinuity", &type));
375 
376         sp<AMessage> extra;
377         if (!(*accessUnit)->meta()->findMessage("extra", &extra)) {
378             extra.clear();
379         }
380 
381         ALOGI("[%s] read discontinuity of type %d, extra = %s",
382               streamStr,
383               type,
384               extra == NULL ? "NULL" : extra->debugString().c_str());
385     } else if (err == OK) {
386 
387         if (stream == STREAMTYPE_AUDIO || stream == STREAMTYPE_VIDEO) {
388             int64_t timeUs, originalTimeUs;
389             int32_t discontinuitySeq = 0;
390             StreamItem& strm = mStreams[streamIdx];
391             CHECK((*accessUnit)->meta()->findInt64("timeUs",  &timeUs));
392             originalTimeUs = timeUs;
393             (*accessUnit)->meta()->findInt32("discontinuitySeq", &discontinuitySeq);
394             if (discontinuitySeq > (int32_t) strm.mCurDiscontinuitySeq) {
395                 int64_t offsetTimeUs;
396                 if (mDiscontinuityOffsetTimesUs.indexOfKey(strm.mCurDiscontinuitySeq) >= 0) {
397                     offsetTimeUs = mDiscontinuityOffsetTimesUs.valueFor(strm.mCurDiscontinuitySeq);
398                 } else {
399                     offsetTimeUs = 0;
400                 }
401 
402                 if (mDiscontinuityAbsStartTimesUs.indexOfKey(strm.mCurDiscontinuitySeq) >= 0
403                         && strm.mLastDequeuedTimeUs >= 0) {
404                     int64_t firstTimeUs;
405                     firstTimeUs = mDiscontinuityAbsStartTimesUs.valueFor(strm.mCurDiscontinuitySeq);
406                     offsetTimeUs += strm.mLastDequeuedTimeUs - firstTimeUs;
407                     offsetTimeUs += strm.mLastSampleDurationUs;
408                 } else {
409                     offsetTimeUs += strm.mLastSampleDurationUs;
410                 }
411 
412                 mDiscontinuityOffsetTimesUs.add(discontinuitySeq, offsetTimeUs);
413                 strm.mCurDiscontinuitySeq = discontinuitySeq;
414             }
415 
416             int32_t discard = 0;
417             int64_t firstTimeUs;
418             if (mDiscontinuityAbsStartTimesUs.indexOfKey(strm.mCurDiscontinuitySeq) >= 0) {
419                 int64_t durUs; // approximate sample duration
420                 if (timeUs > strm.mLastDequeuedTimeUs) {
421                     durUs = timeUs - strm.mLastDequeuedTimeUs;
422                 } else {
423                     durUs = strm.mLastDequeuedTimeUs - timeUs;
424                 }
425                 strm.mLastSampleDurationUs = durUs;
426                 firstTimeUs = mDiscontinuityAbsStartTimesUs.valueFor(strm.mCurDiscontinuitySeq);
427             } else if ((*accessUnit)->meta()->findInt32("discard", &discard) && discard) {
428                 firstTimeUs = timeUs;
429             } else {
430                 mDiscontinuityAbsStartTimesUs.add(strm.mCurDiscontinuitySeq, timeUs);
431                 firstTimeUs = timeUs;
432             }
433 
434             strm.mLastDequeuedTimeUs = timeUs;
435             timeUs = calculateMediaTimeUs(firstTimeUs, timeUs, discontinuitySeq);
436 
437             ALOGV("[%s] dequeueAccessUnit: time %lld us, original %lld us",
438                     streamStr, (long long)timeUs, (long long)originalTimeUs);
439             (*accessUnit)->meta()->setInt64("timeUs",  timeUs);
440             mLastDequeuedTimeUs = timeUs;
441             mRealTimeBaseUs = ALooper::GetNowUs() - timeUs;
442         } else if (stream == STREAMTYPE_SUBTITLES) {
443             int32_t subtitleGeneration;
444             if ((*accessUnit)->meta()->findInt32("subtitleGeneration", &subtitleGeneration)
445                     && subtitleGeneration != mSubtitleGeneration) {
446                return -EAGAIN;
447             };
448             (*accessUnit)->meta()->setInt32(
449                     "track-index", mPlaylist->getSelectedIndex());
450             (*accessUnit)->meta()->setInt64("baseUs", mRealTimeBaseUs);
451         } else if (stream == STREAMTYPE_METADATA) {
452             HLSTime mdTime((*accessUnit)->meta());
453             if (mDiscontinuityAbsStartTimesUs.indexOfKey(mdTime.mSeq) < 0) {
454                 packetSource->requeueAccessUnit((*accessUnit));
455                 return -EAGAIN;
456             } else {
457                 int64_t firstTimeUs = mDiscontinuityAbsStartTimesUs.valueFor(mdTime.mSeq);
458                 int64_t timeUs = calculateMediaTimeUs(firstTimeUs, mdTime.mTimeUs, mdTime.mSeq);
459                 (*accessUnit)->meta()->setInt64("timeUs",  timeUs);
460                 (*accessUnit)->meta()->setInt64("baseUs", mRealTimeBaseUs);
461             }
462         }
463     } else {
464         ALOGI("[%s] encountered error %d", streamStr, err);
465     }
466 
467     return err;
468 }
469 
getStreamFormatMeta(StreamType stream,sp<MetaData> * meta)470 status_t LiveSession::getStreamFormatMeta(StreamType stream, sp<MetaData> *meta) {
471     if (!(mStreamMask & stream)) {
472         return UNKNOWN_ERROR;
473     }
474 
475     sp<AnotherPacketSource> packetSource = mPacketSources.valueFor(stream);
476 
477     *meta = packetSource->getFormat();
478 
479     if (*meta == NULL) {
480         return -EWOULDBLOCK;
481     }
482 
483     if (stream == STREAMTYPE_AUDIO) {
484         // set AAC input buffer size to 32K bytes (256kbps x 1sec)
485         (*meta)->setInt32(kKeyMaxInputSize, 32 * 1024);
486     } else if (stream == STREAMTYPE_VIDEO) {
487         (*meta)->setInt32(kKeyMaxWidth, mMaxWidth);
488         (*meta)->setInt32(kKeyMaxHeight, mMaxHeight);
489     }
490 
491     return OK;
492 }
493 
getHTTPDownloader()494 sp<HTTPDownloader> LiveSession::getHTTPDownloader() {
495     return new HTTPDownloader(mHTTPService, mExtraHeaders);
496 }
497 
setBufferingSettings(const BufferingSettings & buffering)498 void LiveSession::setBufferingSettings(
499         const BufferingSettings &buffering) {
500     sp<AMessage> msg = new AMessage(kWhatSetBufferingSettings, this);
501     writeToAMessage(msg, buffering);
502     msg->post();
503 }
504 
connectAsync(const char * url,const KeyedVector<String8,String8> * headers)505 void LiveSession::connectAsync(
506         const char *url, const KeyedVector<String8, String8> *headers) {
507     sp<AMessage> msg = new AMessage(kWhatConnect, this);
508     msg->setString("url", url);
509 
510     if (headers != NULL) {
511         msg->setPointer(
512                 "headers",
513                 new KeyedVector<String8, String8>(*headers));
514     }
515 
516     msg->post();
517 }
518 
disconnect()519 status_t LiveSession::disconnect() {
520     sp<AMessage> msg = new AMessage(kWhatDisconnect, this);
521 
522     sp<AMessage> response;
523     status_t err = msg->postAndAwaitResponse(&response);
524 
525     return err;
526 }
527 
seekTo(int64_t timeUs,MediaPlayerSeekMode mode)528 status_t LiveSession::seekTo(int64_t timeUs, MediaPlayerSeekMode mode) {
529     sp<AMessage> msg = new AMessage(kWhatSeek, this);
530     msg->setInt64("timeUs", timeUs);
531     msg->setInt32("mode", mode);
532 
533     sp<AMessage> response;
534     status_t err = msg->postAndAwaitResponse(&response);
535 
536     return err;
537 }
538 
checkSwitchProgress(sp<AMessage> & stopParams,int64_t delayUs,bool * needResumeUntil)539 bool LiveSession::checkSwitchProgress(
540         sp<AMessage> &stopParams, int64_t delayUs, bool *needResumeUntil) {
541     AString newUri;
542     CHECK(stopParams->findString("uri", &newUri));
543 
544     *needResumeUntil = false;
545     sp<AMessage> firstNewMeta[kMaxStreams];
546     for (size_t i = 0; i < kMaxStreams; ++i) {
547         StreamType stream = indexToType(i);
548         if (!(mSwapMask & mNewStreamMask & stream)
549             || (mStreams[i].mNewUri != newUri)) {
550             continue;
551         }
552         if (stream == STREAMTYPE_SUBTITLES) {
553             continue;
554         }
555         sp<AnotherPacketSource> &source = mPacketSources.editValueAt(i);
556 
557         // First, get latest dequeued meta, which is where the decoder is at.
558         // (when upswitching, we take the meta after a certain delay, so that
559         // the decoder is left with some cushion)
560         sp<AMessage> lastDequeueMeta, lastEnqueueMeta;
561         if (delayUs > 0) {
562             lastDequeueMeta = source->getMetaAfterLastDequeued(delayUs);
563             if (lastDequeueMeta == NULL) {
564                 // this means we don't have enough cushion, try again later
565                 ALOGV("[%s] up switching failed due to insufficient buffer",
566                         getNameForStream(stream));
567                 return false;
568             }
569         } else {
570             // It's okay for lastDequeueMeta to be NULL here, it means the
571             // decoder hasn't even started dequeueing
572             lastDequeueMeta = source->getLatestDequeuedMeta();
573         }
574         // Then, trim off packets at beginning of mPacketSources2 that's before
575         // the latest dequeued time. These samples are definitely too late.
576         firstNewMeta[i] = mPacketSources2.editValueAt(i)
577                             ->trimBuffersBeforeMeta(lastDequeueMeta);
578 
579         // Now firstNewMeta[i] is the first sample after the trim.
580         // If it's NULL, we failed because dequeue already past all samples
581         // in mPacketSource2, we have to try again.
582         if (firstNewMeta[i] == NULL) {
583             HLSTime dequeueTime(lastDequeueMeta);
584             ALOGV("[%s] dequeue time (%d, %lld) past start time",
585                     getNameForStream(stream),
586                     dequeueTime.mSeq, (long long) dequeueTime.mTimeUs);
587             return false;
588         }
589 
590         // Otherwise, we check if mPacketSources2 overlaps with what old fetcher
591         // already fetched, and see if we need to resumeUntil
592         lastEnqueueMeta = source->getLatestEnqueuedMeta();
593         // lastEnqueueMeta == NULL means old fetcher stopped at a discontinuity
594         // boundary, no need to resume as the content will look different anyways
595         if (lastEnqueueMeta != NULL) {
596             HLSTime lastTime(lastEnqueueMeta), startTime(firstNewMeta[i]);
597 
598             // no need to resume old fetcher if new fetcher started in different
599             // discontinuity sequence, as the content will look different.
600             *needResumeUntil |= (startTime.mSeq == lastTime.mSeq
601                     && startTime.mTimeUs - lastTime.mTimeUs > kResumeThresholdUs);
602 
603             // update the stopTime for resumeUntil
604             stopParams->setInt32("discontinuitySeq", startTime.mSeq);
605             stopParams->setInt64(getKeyForStream(stream), startTime.mTimeUs);
606         }
607     }
608 
609     // if we're here, it means dequeue progress hasn't passed some samples in
610     // mPacketSource2, we can trim off the excess in mPacketSource.
611     // (old fetcher might still need to resumeUntil the start time of new fetcher)
612     for (size_t i = 0; i < kMaxStreams; ++i) {
613         StreamType stream = indexToType(i);
614         if (!(mSwapMask & mNewStreamMask & stream)
615             || (newUri != mStreams[i].mNewUri)
616             || stream == STREAMTYPE_SUBTITLES) {
617             continue;
618         }
619         mPacketSources.valueFor(stream)->trimBuffersAfterMeta(firstNewMeta[i]);
620     }
621 
622     // no resumeUntil if already underflow
623     *needResumeUntil &= !mBuffering;
624 
625     return true;
626 }
627 
onMessageReceived(const sp<AMessage> & msg)628 void LiveSession::onMessageReceived(const sp<AMessage> &msg) {
629     switch (msg->what()) {
630         case kWhatSetBufferingSettings:
631         {
632             readFromAMessage(msg, &mBufferingSettings);
633             break;
634         }
635 
636         case kWhatConnect:
637         {
638             onConnect(msg);
639             break;
640         }
641 
642         case kWhatDisconnect:
643         {
644             CHECK(msg->senderAwaitsResponse(&mDisconnectReplyID));
645 
646             if (mReconfigurationInProgress) {
647                 break;
648             }
649 
650             finishDisconnect();
651             break;
652         }
653 
654         case kWhatSeek:
655         {
656             if (mReconfigurationInProgress) {
657                 msg->post(50000);
658                 break;
659             }
660 
661             CHECK(msg->senderAwaitsResponse(&mSeekReplyID));
662             mSeekReply = new AMessage;
663 
664             onSeek(msg);
665             break;
666         }
667 
668         case kWhatFetcherNotify:
669         {
670             int32_t what;
671             CHECK(msg->findInt32("what", &what));
672 
673             switch (what) {
674                 case PlaylistFetcher::kWhatStarted:
675                     break;
676                 case PlaylistFetcher::kWhatPaused:
677                 case PlaylistFetcher::kWhatStopped:
678                 {
679                     AString uri;
680                     CHECK(msg->findString("uri", &uri));
681                     ssize_t index = mFetcherInfos.indexOfKey(uri);
682                     if (index < 0) {
683                         // ignore msgs from fetchers that's already gone
684                         break;
685                     }
686 
687                     ALOGV("fetcher-%d %s",
688                             mFetcherInfos[index].mFetcher->getFetcherID(),
689                             what == PlaylistFetcher::kWhatPaused ?
690                                     "paused" : "stopped");
691 
692                     if (what == PlaylistFetcher::kWhatStopped) {
693                         mFetcherLooper->unregisterHandler(
694                                 mFetcherInfos[index].mFetcher->id());
695                         mFetcherInfos.removeItemsAt(index);
696                     } else if (what == PlaylistFetcher::kWhatPaused) {
697                         int32_t seekMode;
698                         CHECK(msg->findInt32("seekMode", &seekMode));
699                         for (size_t i = 0; i < kMaxStreams; ++i) {
700                             if (mStreams[i].mUri == uri) {
701                                 mStreams[i].mSeekMode = (SeekMode) seekMode;
702                             }
703                         }
704                     }
705 
706                     if (mContinuation != NULL) {
707                         CHECK_GT(mContinuationCounter, 0u);
708                         if (--mContinuationCounter == 0) {
709                             mContinuation->post();
710                         }
711                         ALOGV("%zu fetcher(s) left", mContinuationCounter);
712                     }
713                     break;
714                 }
715 
716                 case PlaylistFetcher::kWhatDurationUpdate:
717                 {
718                     AString uri;
719                     CHECK(msg->findString("uri", &uri));
720 
721                     int64_t durationUs;
722                     CHECK(msg->findInt64("durationUs", &durationUs));
723 
724                     ssize_t index = mFetcherInfos.indexOfKey(uri);
725                     if (index >= 0) {
726                         FetcherInfo *info = &mFetcherInfos.editValueFor(uri);
727                         info->mDurationUs = durationUs;
728                     }
729                     break;
730                 }
731 
732                 case PlaylistFetcher::kWhatTargetDurationUpdate:
733                 {
734                     int64_t targetDurationUs;
735                     CHECK(msg->findInt64("targetDurationUs", &targetDurationUs));
736                     mUpSwitchMark = min(kUpSwitchMarkUs, targetDurationUs * 7 / 4);
737                     mDownSwitchMark = min(kDownSwitchMarkUs, targetDurationUs * 9 / 4);
738                     mUpSwitchMargin = min(kUpSwitchMarginUs, targetDurationUs);
739                     break;
740                 }
741 
742                 case PlaylistFetcher::kWhatError:
743                 {
744                     status_t err;
745                     CHECK(msg->findInt32("err", &err));
746 
747                     ALOGE("XXX Received error %d from PlaylistFetcher.", err);
748 
749                     // handle EOS on subtitle tracks independently
750                     AString uri;
751                     if (err == ERROR_END_OF_STREAM && msg->findString("uri", &uri)) {
752                         ssize_t i = mFetcherInfos.indexOfKey(uri);
753                         if (i >= 0) {
754                             const sp<PlaylistFetcher> &fetcher = mFetcherInfos.valueAt(i).mFetcher;
755                             if (fetcher != NULL) {
756                                 uint32_t type = fetcher->getStreamTypeMask();
757                                 if (type == STREAMTYPE_SUBTITLES) {
758                                     mPacketSources.valueFor(
759                                             STREAMTYPE_SUBTITLES)->signalEOS(err);;
760                                     break;
761                                 }
762                             }
763                         }
764                     }
765 
766                     // remember the failure index (as mCurBandwidthIndex will be restored
767                     // after cancelBandwidthSwitch()), and record last fail time
768                     size_t failureIndex = mCurBandwidthIndex;
769                     mBandwidthItems.editItemAt(
770                             failureIndex).mLastFailureUs = ALooper::GetNowUs();
771 
772                     if (mSwitchInProgress) {
773                         // if error happened when we switch to a variant, try fallback
774                         // to other variant to save the session
775                         if (tryBandwidthFallback()) {
776                             break;
777                         }
778                     }
779 
780                     if (mInPreparationPhase) {
781                         postPrepared(err);
782                     }
783 
784                     cancelBandwidthSwitch();
785 
786                     mPacketSources.valueFor(STREAMTYPE_AUDIO)->signalEOS(err);
787 
788                     mPacketSources.valueFor(STREAMTYPE_VIDEO)->signalEOS(err);
789 
790                     mPacketSources.valueFor(
791                             STREAMTYPE_SUBTITLES)->signalEOS(err);
792 
793                     postError(err);
794                     break;
795                 }
796 
797                 case PlaylistFetcher::kWhatStopReached:
798                 {
799                     ALOGV("kWhatStopReached");
800 
801                     AString oldUri;
802                     CHECK(msg->findString("uri", &oldUri));
803 
804                     ssize_t index = mFetcherInfos.indexOfKey(oldUri);
805                     if (index < 0) {
806                         break;
807                     }
808 
809                     tryToFinishBandwidthSwitch(oldUri);
810                     break;
811                 }
812 
813                 case PlaylistFetcher::kWhatStartedAt:
814                 {
815                     int32_t switchGeneration;
816                     CHECK(msg->findInt32("switchGeneration", &switchGeneration));
817 
818                     ALOGV("kWhatStartedAt: switchGen=%d, mSwitchGen=%d",
819                             switchGeneration, mSwitchGeneration);
820 
821                     if (switchGeneration != mSwitchGeneration) {
822                         break;
823                     }
824 
825                     AString uri;
826                     CHECK(msg->findString("uri", &uri));
827 
828                     // mark new fetcher mToBeResumed
829                     ssize_t index = mFetcherInfos.indexOfKey(uri);
830                     if (index >= 0) {
831                         mFetcherInfos.editValueAt(index).mToBeResumed = true;
832                     }
833 
834                     // temporarily disable packet sources to be swapped to prevent
835                     // NuPlayerDecoder from dequeuing while we check progress
836                     for (size_t i = 0; i < mPacketSources.size(); ++i) {
837                         if ((mSwapMask & mPacketSources.keyAt(i))
838                                 && uri == mStreams[i].mNewUri) {
839                             mPacketSources.editValueAt(i)->enable(false);
840                         }
841                     }
842                     bool switchUp = (mCurBandwidthIndex > mOrigBandwidthIndex);
843                     // If switching up, require a cushion bigger than kUnderflowMark
844                     // to avoid buffering immediately after the switch.
845                     // (If we don't have that cushion we'd rather cancel and try again.)
846                     int64_t delayUs =
847                         switchUp ?
848                             (kUnderflowMarkMs * 1000LL + 1000000LL)
849                             : 0;
850                     bool needResumeUntil = false;
851                     sp<AMessage> stopParams = msg;
852                     if (checkSwitchProgress(stopParams, delayUs, &needResumeUntil)) {
853                         // playback time hasn't passed startAt time
854                         if (!needResumeUntil) {
855                             ALOGV("finish switch");
856                             for (size_t i = 0; i < kMaxStreams; ++i) {
857                                 if ((mSwapMask & indexToType(i))
858                                         && uri == mStreams[i].mNewUri) {
859                                     // have to make a copy of mStreams[i].mUri because
860                                     // tryToFinishBandwidthSwitch is modifying mStreams[]
861                                     AString oldURI = mStreams[i].mUri;
862                                     tryToFinishBandwidthSwitch(oldURI);
863                                     break;
864                                 }
865                             }
866                         } else {
867                             // startAt time is after last enqueue time
868                             // Resume fetcher for the original variant; the resumed fetcher should
869                             // continue until the timestamps found in msg, which is stored by the
870                             // new fetcher to indicate where the new variant has started buffering.
871                             ALOGV("finish switch with resumeUntilAsync");
872                             for (size_t i = 0; i < mFetcherInfos.size(); i++) {
873                                 const FetcherInfo &info = mFetcherInfos.valueAt(i);
874                                 if (info.mToBeRemoved) {
875                                     info.mFetcher->resumeUntilAsync(stopParams);
876                                 }
877                             }
878                         }
879                     } else {
880                         // playback time passed startAt time
881                         if (switchUp) {
882                             // if switching up, cancel and retry if condition satisfies again
883                             ALOGV("cancel up switch because we're too late");
884                             cancelBandwidthSwitch(true /* resume */);
885                         } else {
886                             ALOGV("retry down switch at next sample");
887                             resumeFetcher(uri, mSwapMask, -1, true /* newUri */);
888                         }
889                     }
890                     // re-enable all packet sources
891                     for (size_t i = 0; i < mPacketSources.size(); ++i) {
892                         mPacketSources.editValueAt(i)->enable(true);
893                     }
894 
895                     break;
896                 }
897 
898                 case PlaylistFetcher::kWhatPlaylistFetched:
899                 {
900                     onMasterPlaylistFetched(msg);
901                     break;
902                 }
903 
904                 case PlaylistFetcher::kWhatMetadataDetected:
905                 {
906                     if (!mHasMetadata) {
907                         mHasMetadata = true;
908                         sp<AMessage> notify = mNotify->dup();
909                         notify->setInt32("what", kWhatMetadataDetected);
910                         notify->post();
911                     }
912                     break;
913                 }
914 
915                 default:
916                     TRESPASS();
917             }
918 
919             break;
920         }
921 
922         case kWhatChangeConfiguration:
923         {
924             onChangeConfiguration(msg);
925             break;
926         }
927 
928         case kWhatChangeConfiguration2:
929         {
930             onChangeConfiguration2(msg);
931             break;
932         }
933 
934         case kWhatChangeConfiguration3:
935         {
936             onChangeConfiguration3(msg);
937             break;
938         }
939 
940         case kWhatPollBuffering:
941         {
942             int32_t generation;
943             CHECK(msg->findInt32("generation", &generation));
944             if (generation == mPollBufferingGeneration) {
945                 onPollBuffering();
946             }
947             break;
948         }
949 
950         default:
951             TRESPASS();
952             break;
953     }
954 }
955 
956 // static
isBandwidthValid(const BandwidthItem & item)957 bool LiveSession::isBandwidthValid(const BandwidthItem &item) {
958     static const int64_t kBlacklistWindowUs = 300 * 1000000LL;
959     return item.mLastFailureUs < 0
960             || ALooper::GetNowUs() - item.mLastFailureUs > kBlacklistWindowUs;
961 }
962 
963 // static
SortByBandwidth(const BandwidthItem * a,const BandwidthItem * b)964 int LiveSession::SortByBandwidth(const BandwidthItem *a, const BandwidthItem *b) {
965     if (a->mBandwidth < b->mBandwidth) {
966         return -1;
967     } else if (a->mBandwidth == b->mBandwidth) {
968         return 0;
969     }
970 
971     return 1;
972 }
973 
974 // static
indexToType(int idx)975 LiveSession::StreamType LiveSession::indexToType(int idx) {
976     CHECK(idx >= 0 && idx < kNumSources);
977     return (StreamType)(1 << idx);
978 }
979 
980 // static
typeToIndex(int32_t type)981 ssize_t LiveSession::typeToIndex(int32_t type) {
982     switch (type) {
983         case STREAMTYPE_AUDIO:
984             return 0;
985         case STREAMTYPE_VIDEO:
986             return 1;
987         case STREAMTYPE_SUBTITLES:
988             return 2;
989         case STREAMTYPE_METADATA:
990             return 3;
991         default:
992             return -1;
993     };
994     return -1;
995 }
996 
onConnect(const sp<AMessage> & msg)997 void LiveSession::onConnect(const sp<AMessage> &msg) {
998     CHECK(msg->findString("url", &mMasterURL));
999 
1000     // TODO currently we don't know if we are coming here from incognito mode
1001     ALOGI("onConnect %s", uriDebugString(mMasterURL).c_str());
1002 
1003     KeyedVector<String8, String8> *headers = NULL;
1004     if (!msg->findPointer("headers", (void **)&headers)) {
1005         mExtraHeaders.clear();
1006     } else {
1007         mExtraHeaders = *headers;
1008 
1009         delete headers;
1010         headers = NULL;
1011     }
1012 
1013     // create looper for fetchers
1014     if (mFetcherLooper == NULL) {
1015         mFetcherLooper = new ALooper();
1016 
1017         mFetcherLooper->setName("Fetcher");
1018         mFetcherLooper->start(false, /* runOnCallingThread */
1019                               true  /* canCallJava */);
1020     }
1021 
1022     // create fetcher to fetch the master playlist
1023     addFetcher(mMasterURL.c_str())->fetchPlaylistAsync();
1024 }
1025 
onMasterPlaylistFetched(const sp<AMessage> & msg)1026 void LiveSession::onMasterPlaylistFetched(const sp<AMessage> &msg) {
1027     AString uri;
1028     CHECK(msg->findString("uri", &uri));
1029     ssize_t index = mFetcherInfos.indexOfKey(uri);
1030     if (index < 0) {
1031         ALOGW("fetcher for master playlist is gone.");
1032         return;
1033     }
1034 
1035     // no longer useful, remove
1036     mFetcherLooper->unregisterHandler(mFetcherInfos[index].mFetcher->id());
1037     mFetcherInfos.removeItemsAt(index);
1038 
1039     CHECK(msg->findObject("playlist", (sp<RefBase> *)&mPlaylist));
1040     if (mPlaylist == NULL) {
1041         ALOGE("unable to fetch master playlist %s.",
1042                 uriDebugString(mMasterURL).c_str());
1043 
1044         postPrepared(ERROR_IO);
1045         return;
1046     }
1047     // We trust the content provider to make a reasonable choice of preferred
1048     // initial bandwidth by listing it first in the variant playlist.
1049     // At startup we really don't have a good estimate on the available
1050     // network bandwidth since we haven't tranferred any data yet. Once
1051     // we have we can make a better informed choice.
1052     size_t initialBandwidth = 0;
1053     size_t initialBandwidthIndex = 0;
1054 
1055     int32_t maxWidth = 0;
1056     int32_t maxHeight = 0;
1057 
1058     if (mPlaylist->isVariantPlaylist()) {
1059         Vector<BandwidthItem> itemsWithVideo;
1060         for (size_t i = 0; i < mPlaylist->size(); ++i) {
1061             BandwidthItem item;
1062 
1063             item.mPlaylistIndex = i;
1064             item.mLastFailureUs = -1LL;
1065 
1066             sp<AMessage> meta;
1067             AString uri;
1068             mPlaylist->itemAt(i, &uri, &meta);
1069 
1070             CHECK(meta->findInt32("bandwidth", (int32_t *)&item.mBandwidth));
1071 
1072             int32_t width, height;
1073             if (meta->findInt32("width", &width)) {
1074                 maxWidth = max(maxWidth, width);
1075             }
1076             if (meta->findInt32("height", &height)) {
1077                 maxHeight = max(maxHeight, height);
1078             }
1079 
1080             mBandwidthItems.push(item);
1081             if (mPlaylist->hasType(i, "video")) {
1082                 itemsWithVideo.push(item);
1083             }
1084         }
1085         // remove the audio-only variants if we have at least one with video
1086         if (!itemsWithVideo.empty()
1087                 && itemsWithVideo.size() < mBandwidthItems.size()) {
1088             mBandwidthItems.clear();
1089             for (size_t i = 0; i < itemsWithVideo.size(); ++i) {
1090                 mBandwidthItems.push(itemsWithVideo[i]);
1091             }
1092         }
1093 
1094         CHECK_GT(mBandwidthItems.size(), 0u);
1095         initialBandwidth = mBandwidthItems[0].mBandwidth;
1096 
1097         mBandwidthItems.sort(SortByBandwidth);
1098 
1099         for (size_t i = 0; i < mBandwidthItems.size(); ++i) {
1100             if (mBandwidthItems.itemAt(i).mBandwidth == initialBandwidth) {
1101                 initialBandwidthIndex = i;
1102                 break;
1103             }
1104         }
1105     } else {
1106         // dummy item.
1107         BandwidthItem item;
1108         item.mPlaylistIndex = 0;
1109         item.mBandwidth = 0;
1110         mBandwidthItems.push(item);
1111     }
1112 
1113     mMaxWidth = maxWidth > 0 ? maxWidth : mMaxWidth;
1114     mMaxHeight = maxHeight > 0 ? maxHeight : mMaxHeight;
1115 
1116     mPlaylist->pickRandomMediaItems();
1117     changeConfiguration(
1118             0LL /* timeUs */, initialBandwidthIndex, false /* pickTrack */);
1119 }
1120 
finishDisconnect()1121 void LiveSession::finishDisconnect() {
1122     ALOGV("finishDisconnect");
1123 
1124     // No reconfiguration is currently pending, make sure none will trigger
1125     // during disconnection either.
1126     cancelBandwidthSwitch();
1127 
1128     // cancel buffer polling
1129     cancelPollBuffering();
1130 
1131     // TRICKY: don't wait for all fetcher to be stopped when disconnecting
1132     //
1133     // Some fetchers might be stuck in connect/getSize at this point. These
1134     // operations will eventually timeout (as we have a timeout set in
1135     // MediaHTTPConnection), but we don't want to block the main UI thread
1136     // until then. Here we just need to make sure we clear all references
1137     // to the fetchers, so that when they finally exit from the blocking
1138     // operation, they can be destructed.
1139     //
1140     // There is one very tricky point though. For this scheme to work, the
1141     // fecther must hold a reference to LiveSession, so that LiveSession is
1142     // destroyed after fetcher. Otherwise LiveSession would get stuck in its
1143     // own destructor when it waits for mFetcherLooper to stop, which still
1144     // blocks main UI thread.
1145     for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
1146         mFetcherInfos.valueAt(i).mFetcher->stopAsync();
1147         mFetcherLooper->unregisterHandler(
1148                 mFetcherInfos.valueAt(i).mFetcher->id());
1149     }
1150     mFetcherInfos.clear();
1151 
1152     mPacketSources.valueFor(STREAMTYPE_AUDIO)->signalEOS(ERROR_END_OF_STREAM);
1153     mPacketSources.valueFor(STREAMTYPE_VIDEO)->signalEOS(ERROR_END_OF_STREAM);
1154 
1155     mPacketSources.valueFor(
1156             STREAMTYPE_SUBTITLES)->signalEOS(ERROR_END_OF_STREAM);
1157 
1158     sp<AMessage> response = new AMessage;
1159     response->setInt32("err", OK);
1160 
1161     response->postReply(mDisconnectReplyID);
1162     mDisconnectReplyID.clear();
1163 }
1164 
addFetcher(const char * uri)1165 sp<PlaylistFetcher> LiveSession::addFetcher(const char *uri) {
1166     ssize_t index = mFetcherInfos.indexOfKey(uri);
1167 
1168     if (index >= 0) {
1169         return NULL;
1170     }
1171 
1172     sp<AMessage> notify = new AMessage(kWhatFetcherNotify, this);
1173     notify->setString("uri", uri);
1174     notify->setInt32("switchGeneration", mSwitchGeneration);
1175 
1176     FetcherInfo info;
1177     info.mFetcher = new PlaylistFetcher(
1178             notify, this, uri, mCurBandwidthIndex, mSubtitleGeneration);
1179     info.mDurationUs = -1LL;
1180     info.mToBeRemoved = false;
1181     info.mToBeResumed = false;
1182     mFetcherLooper->registerHandler(info.mFetcher);
1183 
1184     mFetcherInfos.add(uri, info);
1185 
1186     return info.mFetcher;
1187 }
1188 
#if 0
// Disabled helper: returns rand() scaled by RAND_MAX, i.e. a value in
// [0, 1]. Its only users are the disabled (#elif 0) selection strategies in
// getBandwidthIndex().
static double uniformRand() {
    return (double)rand() / RAND_MAX;
}
#endif
1194 
UriIsSameAsIndex(const AString & uri,int32_t i,bool newUri)1195 bool LiveSession::UriIsSameAsIndex(const AString &uri, int32_t i, bool newUri) {
1196     ALOGV("[timed_id3] i %d UriIsSameAsIndex newUri %s, %s", i,
1197             newUri ? "true" : "false",
1198             newUri ? mStreams[i].mNewUri.c_str() : mStreams[i].mUri.c_str());
1199     return i >= 0
1200             && ((!newUri && uri == mStreams[i].mUri)
1201             || (newUri && uri == mStreams[i].mNewUri));
1202 }
1203 
getPacketSourceForStreamIndex(size_t trackIndex,bool newUri)1204 sp<AnotherPacketSource> LiveSession::getPacketSourceForStreamIndex(
1205         size_t trackIndex, bool newUri) {
1206     StreamType type = indexToType(trackIndex);
1207     sp<AnotherPacketSource> source = NULL;
1208     if (newUri) {
1209         source = mPacketSources2.valueFor(type);
1210         source->clear();
1211     } else {
1212         source = mPacketSources.valueFor(type);
1213     };
1214     return source;
1215 }
1216 
getMetadataSource(sp<AnotherPacketSource> sources[kNumSources],uint32_t streamMask,bool newUri)1217 sp<AnotherPacketSource> LiveSession::getMetadataSource(
1218         sp<AnotherPacketSource> sources[kNumSources], uint32_t streamMask, bool newUri) {
1219     // todo: One case where the following strategy can fail is when audio and video
1220     // are in separate playlists, both are transport streams, and the metadata
1221     // is actually contained in the audio stream.
1222     ALOGV("[timed_id3] getMetadataSourceForUri streamMask %x newUri %s",
1223             streamMask, newUri ? "true" : "false");
1224 
1225     if ((sources[kVideoIndex] != NULL) // video fetcher; or ...
1226             || (!(streamMask & STREAMTYPE_VIDEO) && sources[kAudioIndex] != NULL)) {
1227             // ... audio fetcher for audio only variant
1228         return getPacketSourceForStreamIndex(kMetaDataIndex, newUri);
1229     }
1230 
1231     return NULL;
1232 }
1233 
resumeFetcher(const AString & uri,uint32_t streamMask,int64_t timeUs,bool newUri)1234 bool LiveSession::resumeFetcher(
1235         const AString &uri, uint32_t streamMask, int64_t timeUs, bool newUri) {
1236     ssize_t index = mFetcherInfos.indexOfKey(uri);
1237     if (index < 0) {
1238         ALOGE("did not find fetcher for uri: %s", uriDebugString(uri).c_str());
1239         return false;
1240     }
1241 
1242     bool resume = false;
1243     sp<AnotherPacketSource> sources[kNumSources];
1244     for (size_t i = 0; i < kMaxStreams; ++i) {
1245         if ((streamMask & indexToType(i)) && UriIsSameAsIndex(uri, i, newUri)) {
1246             resume = true;
1247             sources[i] = getPacketSourceForStreamIndex(i, newUri);
1248         }
1249     }
1250 
1251     if (resume) {
1252         sp<PlaylistFetcher> &fetcher = mFetcherInfos.editValueAt(index).mFetcher;
1253         SeekMode seekMode = newUri ? kSeekModeNextSample : kSeekModeExactPosition;
1254 
1255         ALOGV("resuming fetcher-%d, timeUs=%lld, seekMode=%d",
1256                 fetcher->getFetcherID(), (long long)timeUs, seekMode);
1257 
1258         fetcher->startAsync(
1259                 sources[kAudioIndex],
1260                 sources[kVideoIndex],
1261                 sources[kSubtitleIndex],
1262                 getMetadataSource(sources, streamMask, newUri),
1263                 timeUs, -1, -1, seekMode);
1264     }
1265 
1266     return resume;
1267 }
1268 
getAbortThreshold(ssize_t currentBWIndex,ssize_t targetBWIndex) const1269 float LiveSession::getAbortThreshold(
1270         ssize_t currentBWIndex, ssize_t targetBWIndex) const {
1271     float abortThreshold = -1.0f;
1272     if (currentBWIndex > 0 && targetBWIndex < currentBWIndex) {
1273         /*
1274            If we're switching down, we need to decide whether to
1275 
1276            1) finish last segment of high-bandwidth variant, or
1277            2) abort last segment of high-bandwidth variant, and fetch an
1278               overlapping portion from low-bandwidth variant.
1279 
1280            Here we try to maximize the amount of buffer left when the
1281            switch point is met. Given the following parameters:
1282 
1283            B: our current buffering level in seconds
1284            T: target duration in seconds
1285            X: sample duration in seconds remain to fetch in last segment
1286            bw0: bandwidth of old variant (as specified in playlist)
1287            bw1: bandwidth of new variant (as specified in playlist)
1288            bw: measured bandwidth available
1289 
1290            If we choose 1), when switch happens at the end of current
1291            segment, our buffering will be
1292                   B + X - X * bw0 / bw
1293 
1294            If we choose 2), when switch happens where we aborted current
1295            segment, our buffering will be
1296                   B - (T - X) * bw1 / bw
1297 
1298            We should only choose 1) if
1299                   X/T < bw1 / (bw1 + bw0 - bw)
1300         */
1301 
1302         // abort old bandwidth immediately if bandwidth is fluctuating a lot.
1303         // our estimate could be far off, and fetching old bandwidth could
1304         // take too long.
1305         if (!mLastBandwidthStable) {
1306             return 0.0f;
1307         }
1308 
1309         // Taking the measured current bandwidth at 50% face value only,
1310         // as our bandwidth estimation is a lagging indicator. Being
1311         // conservative on this, we prefer switching to lower bandwidth
1312         // unless we're really confident finishing up the last segment
1313         // of higher bandwidth will be fast.
1314         CHECK(mLastBandwidthBps >= 0);
1315         abortThreshold =
1316                 (float)mBandwidthItems.itemAt(targetBWIndex).mBandwidth
1317              / ((float)mBandwidthItems.itemAt(targetBWIndex).mBandwidth
1318               + (float)mBandwidthItems.itemAt(currentBWIndex).mBandwidth
1319               - (float)mLastBandwidthBps * 0.5f);
1320         if (abortThreshold < 0.0f) {
1321             abortThreshold = -1.0f; // do not abort
1322         }
1323         ALOGV("Switching Down: bps %ld => %ld, measured %d, abort ratio %.2f",
1324                 mBandwidthItems.itemAt(currentBWIndex).mBandwidth,
1325                 mBandwidthItems.itemAt(targetBWIndex).mBandwidth,
1326                 mLastBandwidthBps,
1327                 abortThreshold);
1328     }
1329     return abortThreshold;
1330 }
1331 
// Forwards one bandwidth sample (|numBytes|, |delayUs|) unchanged to
// mBandwidthEstimator.
void LiveSession::addBandwidthMeasurement(size_t numBytes, int64_t delayUs) {
    mBandwidthEstimator->addBandwidthMeasurement(numBytes, delayUs);
}
1335 
getLowestValidBandwidthIndex() const1336 ssize_t LiveSession::getLowestValidBandwidthIndex() const {
1337     for (size_t index = 0; index < mBandwidthItems.size(); index++) {
1338         if (isBandwidthValid(mBandwidthItems[index])) {
1339             return index;
1340         }
1341     }
1342     // if playlists are all blacklisted, return 0 and hope it's alive
1343     return 0;
1344 }
1345 
getBandwidthIndex(int32_t bandwidthBps)1346 size_t LiveSession::getBandwidthIndex(int32_t bandwidthBps) {
1347     if (mBandwidthItems.size() < 2) {
1348         // shouldn't be here if we only have 1 bandwidth, check
1349         // logic to get rid of redundant bandwidth polling
1350         ALOGW("getBandwidthIndex() called for single bandwidth playlist!");
1351         return 0;
1352     }
1353 
1354 #if 1
1355     char value[PROPERTY_VALUE_MAX];
1356     ssize_t index = -1;
1357     if (property_get("media.httplive.bw-index", value, NULL)) {
1358         char *end;
1359         index = strtol(value, &end, 10);
1360         CHECK(end > value && *end == '\0');
1361 
1362         if (index >= 0 && (size_t)index >= mBandwidthItems.size()) {
1363             index = mBandwidthItems.size() - 1;
1364         }
1365     }
1366 
1367     if (index < 0) {
1368         char value[PROPERTY_VALUE_MAX];
1369         if (property_get("media.httplive.max-bw", value, NULL)) {
1370             char *end;
1371             long maxBw = strtoul(value, &end, 10);
1372             if (end > value && *end == '\0') {
1373                 if (maxBw > 0 && bandwidthBps > maxBw) {
1374                     ALOGV("bandwidth capped to %ld bps", maxBw);
1375                     bandwidthBps = maxBw;
1376                 }
1377             }
1378         }
1379 
1380         // Pick the highest bandwidth stream that's not currently blacklisted
1381         // below or equal to estimated bandwidth.
1382 
1383         index = mBandwidthItems.size() - 1;
1384         ssize_t lowestBandwidth = getLowestValidBandwidthIndex();
1385         while (index > lowestBandwidth) {
1386             // be conservative (70%) to avoid overestimating and immediately
1387             // switching down again.
1388             size_t adjustedBandwidthBps = bandwidthBps * .7f;
1389             const BandwidthItem &item = mBandwidthItems[index];
1390             if (item.mBandwidth <= adjustedBandwidthBps
1391                     && isBandwidthValid(item)) {
1392                 break;
1393             }
1394             --index;
1395         }
1396     }
1397 #elif 0
1398     // Change bandwidth at random()
1399     size_t index = uniformRand() * mBandwidthItems.size();
1400 #elif 0
1401     // There's a 50% chance to stay on the current bandwidth and
1402     // a 50% chance to switch to the next higher bandwidth (wrapping around
1403     // to lowest)
1404     const size_t kMinIndex = 0;
1405 
1406     static ssize_t mCurBandwidthIndex = -1;
1407 
1408     size_t index;
1409     if (mCurBandwidthIndex < 0) {
1410         index = kMinIndex;
1411     } else if (uniformRand() < 0.5) {
1412         index = (size_t)mCurBandwidthIndex;
1413     } else {
1414         index = mCurBandwidthIndex + 1;
1415         if (index == mBandwidthItems.size()) {
1416             index = kMinIndex;
1417         }
1418     }
1419     mCurBandwidthIndex = index;
1420 #elif 0
1421     // Pick the highest bandwidth stream below or equal to 1.2 Mbit/sec
1422 
1423     size_t index = mBandwidthItems.size() - 1;
1424     while (index > 0 && mBandwidthItems.itemAt(index).mBandwidth > 1200000) {
1425         --index;
1426     }
1427 #elif 1
1428     char value[PROPERTY_VALUE_MAX];
1429     size_t index;
1430     if (property_get("media.httplive.bw-index", value, NULL)) {
1431         char *end;
1432         index = strtoul(value, &end, 10);
1433         CHECK(end > value && *end == '\0');
1434 
1435         if (index >= mBandwidthItems.size()) {
1436             index = mBandwidthItems.size() - 1;
1437         }
1438     } else {
1439         index = 0;
1440     }
1441 #else
1442     size_t index = mBandwidthItems.size() - 1;  // Highest bandwidth stream
1443 #endif
1444 
1445     CHECK_GE(index, 0);
1446 
1447     return index;
1448 }
1449 
latestMediaSegmentStartTime() const1450 HLSTime LiveSession::latestMediaSegmentStartTime() const {
1451     HLSTime audioTime(mPacketSources.valueFor(
1452                     STREAMTYPE_AUDIO)->getLatestDequeuedMeta());
1453 
1454     HLSTime videoTime(mPacketSources.valueFor(
1455                     STREAMTYPE_VIDEO)->getLatestDequeuedMeta());
1456 
1457     return audioTime < videoTime ? videoTime : audioTime;
1458 }
1459 
onSeek(const sp<AMessage> & msg)1460 void LiveSession::onSeek(const sp<AMessage> &msg) {
1461     int64_t timeUs;
1462     int32_t mode;
1463     CHECK(msg->findInt64("timeUs", &timeUs));
1464     CHECK(msg->findInt32("mode", &mode));
1465     // TODO: add "mode" to changeConfiguration.
1466     changeConfiguration(timeUs/* , (MediaPlayerSeekMode)mode */);
1467 }
1468 
getDuration(int64_t * durationUs) const1469 status_t LiveSession::getDuration(int64_t *durationUs) const {
1470     int64_t maxDurationUs = -1LL;
1471     for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
1472         int64_t fetcherDurationUs = mFetcherInfos.valueAt(i).mDurationUs;
1473 
1474         if (fetcherDurationUs > maxDurationUs) {
1475             maxDurationUs = fetcherDurationUs;
1476         }
1477     }
1478 
1479     *durationUs = maxDurationUs;
1480 
1481     return OK;
1482 }
1483 
isSeekable() const1484 bool LiveSession::isSeekable() const {
1485     int64_t durationUs;
1486     return getDuration(&durationUs) == OK && durationUs >= 0;
1487 }
1488 
// Unconditionally returns false.
bool LiveSession::hasDynamicDuration() const {
    return false;
}
1492 
getTrackCount() const1493 size_t LiveSession::getTrackCount() const {
1494     if (mPlaylist == NULL) {
1495         return 0;
1496     } else {
1497         return mPlaylist->getTrackCount() + (mHasMetadata ? 1 : 0);
1498     }
1499 }
1500 
getTrackInfo(size_t trackIndex) const1501 sp<AMessage> LiveSession::getTrackInfo(size_t trackIndex) const {
1502     if (mPlaylist == NULL) {
1503         return NULL;
1504     } else {
1505         if (trackIndex == mPlaylist->getTrackCount() && mHasMetadata) {
1506             sp<AMessage> format = new AMessage();
1507             format->setInt32("type", MEDIA_TRACK_TYPE_METADATA);
1508             format->setString("language", "und");
1509             format->setString("mime", MEDIA_MIMETYPE_DATA_TIMED_ID3);
1510             return format;
1511         }
1512         return mPlaylist->getTrackInfo(trackIndex);
1513     }
1514 }
1515 
selectTrack(size_t index,bool select)1516 status_t LiveSession::selectTrack(size_t index, bool select) {
1517     if (mPlaylist == NULL) {
1518         return INVALID_OPERATION;
1519     }
1520 
1521     ALOGV("selectTrack: index=%zu, select=%d, mSubtitleGen=%d++",
1522             index, select, mSubtitleGeneration);
1523 
1524     ++mSubtitleGeneration;
1525     status_t err = mPlaylist->selectTrack(index, select);
1526     if (err == OK) {
1527         sp<AMessage> msg = new AMessage(kWhatChangeConfiguration, this);
1528         msg->setInt32("pickTrack", select);
1529         msg->post();
1530     }
1531     return err;
1532 }
1533 
getSelectedTrack(media_track_type type) const1534 ssize_t LiveSession::getSelectedTrack(media_track_type type) const {
1535     if (mPlaylist == NULL) {
1536         return -1;
1537     } else {
1538         return mPlaylist->getSelectedTrack(type);
1539     }
1540 }
1541 
changeConfiguration(int64_t timeUs,ssize_t bandwidthIndex,bool pickTrack)1542 void LiveSession::changeConfiguration(
1543         int64_t timeUs, ssize_t bandwidthIndex, bool pickTrack) {
1544     ALOGV("changeConfiguration: timeUs=%lld us, bwIndex=%zd, pickTrack=%d",
1545           (long long)timeUs, bandwidthIndex, pickTrack);
1546 
1547     cancelBandwidthSwitch();
1548 
1549     CHECK(!mReconfigurationInProgress);
1550     mReconfigurationInProgress = true;
1551     if (bandwidthIndex >= 0) {
1552         mOrigBandwidthIndex = mCurBandwidthIndex;
1553         mCurBandwidthIndex = bandwidthIndex;
1554         if (mOrigBandwidthIndex != mCurBandwidthIndex) {
1555             ALOGI("#### Starting Bandwidth Switch: %zd => %zd",
1556                     mOrigBandwidthIndex, mCurBandwidthIndex);
1557         }
1558     }
1559     CHECK_LT((size_t)mCurBandwidthIndex, mBandwidthItems.size());
1560     const BandwidthItem &item = mBandwidthItems.itemAt(mCurBandwidthIndex);
1561 
1562     uint32_t streamMask = 0; // streams that should be fetched by the new fetcher
1563     uint32_t resumeMask = 0; // streams that should be fetched by the original fetcher
1564 
1565     AString URIs[kMaxStreams];
1566     for (size_t i = 0; i < kMaxStreams; ++i) {
1567         if (mPlaylist->getTypeURI(item.mPlaylistIndex, mStreams[i].mType, &URIs[i])) {
1568             streamMask |= indexToType(i);
1569         }
1570     }
1571 
1572     // Step 1, stop and discard fetchers that are no longer needed.
1573     // Pause those that we'll reuse.
1574     for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
1575         // skip fetchers that are marked mToBeRemoved,
1576         // these are done and can't be reused
1577         if (mFetcherInfos[i].mToBeRemoved) {
1578             continue;
1579         }
1580 
1581         const AString &uri = mFetcherInfos.keyAt(i);
1582         sp<PlaylistFetcher> &fetcher = mFetcherInfos.editValueAt(i).mFetcher;
1583 
1584         bool discardFetcher = true, delayRemoval = false;
1585         for (size_t j = 0; j < kMaxStreams; ++j) {
1586             StreamType type = indexToType(j);
1587             if ((streamMask & type) && uri == URIs[j]) {
1588                 resumeMask |= type;
1589                 streamMask &= ~type;
1590                 discardFetcher = false;
1591             }
1592         }
1593         // Delay fetcher removal if not picking tracks, AND old fetcher
1594         // has stream mask that overlaps new variant. (Okay to discard
1595         // old fetcher now, if completely no overlap.)
1596         if (discardFetcher && timeUs < 0LL && !pickTrack
1597                 && (fetcher->getStreamTypeMask() & streamMask)) {
1598             discardFetcher = false;
1599             delayRemoval = true;
1600         }
1601 
1602         if (discardFetcher) {
1603             ALOGV("discarding fetcher-%d", fetcher->getFetcherID());
1604             fetcher->stopAsync();
1605         } else {
1606             float threshold = 0.0f; // default to pause after current block (47Kbytes)
1607             bool disconnect = false;
1608             if (timeUs >= 0LL) {
1609                 // seeking, no need to finish fetching
1610                 disconnect = true;
1611             } else if (delayRemoval) {
1612                 // adapting, abort if remaining of current segment is over threshold
1613                 threshold = getAbortThreshold(
1614                         mOrigBandwidthIndex, mCurBandwidthIndex);
1615             }
1616 
1617             ALOGV("pausing fetcher-%d, threshold=%.2f",
1618                     fetcher->getFetcherID(), threshold);
1619             fetcher->pauseAsync(threshold, disconnect);
1620         }
1621     }
1622 
1623     sp<AMessage> msg;
1624     if (timeUs < 0LL) {
1625         // skip onChangeConfiguration2 (decoder destruction) if not seeking.
1626         msg = new AMessage(kWhatChangeConfiguration3, this);
1627     } else {
1628         msg = new AMessage(kWhatChangeConfiguration2, this);
1629     }
1630     msg->setInt32("streamMask", streamMask);
1631     msg->setInt32("resumeMask", resumeMask);
1632     msg->setInt32("pickTrack", pickTrack);
1633     msg->setInt64("timeUs", timeUs);
1634     for (size_t i = 0; i < kMaxStreams; ++i) {
1635         if ((streamMask | resumeMask) & indexToType(i)) {
1636             msg->setString(mStreams[i].uriKey().c_str(), URIs[i].c_str());
1637         }
1638     }
1639 
1640     // Every time a fetcher acknowledges the stopAsync or pauseAsync request
1641     // we'll decrement mContinuationCounter, once it reaches zero, i.e. all
1642     // fetchers have completed their asynchronous operation, we'll post
1643     // mContinuation, which then is handled below in onChangeConfiguration2.
1644     mContinuationCounter = mFetcherInfos.size();
1645     mContinuation = msg;
1646 
1647     if (mContinuationCounter == 0) {
1648         msg->post();
1649     }
1650 }
1651 
onChangeConfiguration(const sp<AMessage> & msg)1652 void LiveSession::onChangeConfiguration(const sp<AMessage> &msg) {
1653     ALOGV("onChangeConfiguration");
1654 
1655     if (!mReconfigurationInProgress) {
1656         int32_t pickTrack = 0;
1657         msg->findInt32("pickTrack", &pickTrack);
1658         changeConfiguration(-1LL /* timeUs */, -1, pickTrack);
1659     } else {
1660         msg->post(1000000LL); // retry in 1 sec
1661     }
1662 }
1663 
onChangeConfiguration2(const sp<AMessage> & msg)1664 void LiveSession::onChangeConfiguration2(const sp<AMessage> &msg) {
1665     ALOGV("onChangeConfiguration2");
1666 
1667     mContinuation.clear();
1668 
1669     // All fetchers are either suspended or have been removed now.
1670 
1671     // If we're seeking, clear all packet sources before we report
1672     // seek complete, to prevent decoder from pulling stale data.
1673     int64_t timeUs;
1674     CHECK(msg->findInt64("timeUs", &timeUs));
1675 
1676     if (timeUs >= 0) {
1677         mLastSeekTimeUs = timeUs;
1678         mLastDequeuedTimeUs = timeUs;
1679 
1680         for (size_t i = 0; i < mPacketSources.size(); i++) {
1681             sp<AnotherPacketSource> packetSource = mPacketSources.editValueAt(i);
1682             sp<MetaData> format = packetSource->getFormat();
1683             packetSource->clear();
1684             // Set a tentative format here such that HTTPLiveSource will always have
1685             // a format available when NuPlayer queries. Without an available video
1686             // format when setting a surface NuPlayer might disable video decoding
1687             // altogether. The tentative format will be overwritten by the
1688             // authoritative (and possibly same) format once content from the new
1689             // position is dequeued.
1690             packetSource->setFormat(format);
1691         }
1692 
1693         for (size_t i = 0; i < kMaxStreams; ++i) {
1694             mStreams[i].reset();
1695         }
1696 
1697         mDiscontinuityOffsetTimesUs.clear();
1698         mDiscontinuityAbsStartTimesUs.clear();
1699 
1700         if (mSeekReplyID != NULL) {
1701             CHECK(mSeekReply != NULL);
1702             mSeekReply->setInt32("err", OK);
1703             mSeekReply->postReply(mSeekReplyID);
1704             mSeekReplyID.clear();
1705             mSeekReply.clear();
1706         }
1707 
1708         // restart buffer polling after seek becauese previous
1709         // buffering position is no longer valid.
1710         restartPollBuffering();
1711     }
1712 
1713     uint32_t streamMask, resumeMask;
1714     CHECK(msg->findInt32("streamMask", (int32_t *)&streamMask));
1715     CHECK(msg->findInt32("resumeMask", (int32_t *)&resumeMask));
1716 
1717     streamMask |= resumeMask;
1718 
1719     AString URIs[kMaxStreams];
1720     for (size_t i = 0; i < kMaxStreams; ++i) {
1721         if (streamMask & indexToType(i)) {
1722             const AString &uriKey = mStreams[i].uriKey();
1723             CHECK(msg->findString(uriKey.c_str(), &URIs[i]));
1724             ALOGV("%s = '%s'", uriKey.c_str(), URIs[i].c_str());
1725         }
1726     }
1727 
1728     uint32_t changedMask = 0;
1729     for (size_t i = 0; i < kMaxStreams && i != kSubtitleIndex; ++i) {
1730         // stream URI could change even if onChangeConfiguration2 is only
1731         // used for seek. Seek could happen during a bw switch, in this
1732         // case bw switch will be cancelled, but the seekTo position will
1733         // fetch from the new URI.
1734         if ((mStreamMask & streamMask & indexToType(i))
1735                 && !mStreams[i].mUri.empty()
1736                 && !(URIs[i] == mStreams[i].mUri)) {
1737             ALOGV("stream %zu changed: oldURI %s, newURI %s", i,
1738                     mStreams[i].mUri.c_str(), URIs[i].c_str());
1739             sp<AnotherPacketSource> source = mPacketSources.valueFor(indexToType(i));
1740             if (source->getLatestDequeuedMeta() != NULL) {
1741                 source->queueDiscontinuity(
1742                         ATSParser::DISCONTINUITY_FORMATCHANGE, NULL, true);
1743             }
1744         }
1745         // Determine which decoders to shutdown on the player side,
1746         // a decoder has to be shutdown if its streamtype was active
1747         // before but now longer isn't.
1748         if ((mStreamMask & ~streamMask & indexToType(i))) {
1749             changedMask |= indexToType(i);
1750         }
1751     }
1752 
1753     if (changedMask == 0) {
1754         // If nothing changed as far as the audio/video decoders
1755         // are concerned we can proceed.
1756         onChangeConfiguration3(msg);
1757         return;
1758     }
1759 
1760     // Something changed, inform the player which will shutdown the
1761     // corresponding decoders and will post the reply once that's done.
1762     // Handling the reply will continue executing below in
1763     // onChangeConfiguration3.
1764     sp<AMessage> notify = mNotify->dup();
1765     notify->setInt32("what", kWhatStreamsChanged);
1766     notify->setInt32("changedMask", changedMask);
1767 
1768     msg->setWhat(kWhatChangeConfiguration3);
1769     msg->setTarget(this);
1770 
1771     notify->setMessage("reply", msg);
1772     notify->post();
1773 }
1774 
// Final stage of a configuration change.
//
// Reads "streamMask", "resumeMask", "timeUs" and "pickTrack" from |msg|
// (each is required; a missing key trips a CHECK), then:
//   * resumes every existing fetcher that resumeFetcher() accepts and marks
//     the others mToBeRemoved,
//   * creates and starts one new fetcher per distinct URI still present in
//     streamMask,
//   * clears mReconfigurationInProgress and, if a disconnect reply is
//     pending, calls finishDisconnect().
//
// The change is treated as "switching" when it is not a seek (timeUs < 0),
// pickTrack is 0, and at least one stream type is in both the old and the new
// stream mask but not in resumeMask. In that case the new URIs are stored in
// mNewUri, the new fetchers are handed sources from mPacketSources2, and
// mSwitchInProgress is set instead of committing mStreamMask here.
void LiveSession::onChangeConfiguration3(const sp<AMessage> &msg) {
    mContinuation.clear();
    // All remaining fetchers are still suspended, the player has shutdown
    // any decoders that needed it.

    uint32_t streamMask, resumeMask;
    CHECK(msg->findInt32("streamMask", (int32_t *)&streamMask));
    CHECK(msg->findInt32("resumeMask", (int32_t *)&resumeMask));

    mNewStreamMask = streamMask | resumeMask;

    int64_t timeUs;
    int32_t pickTrack;
    bool switching = false;
    CHECK(msg->findInt64("timeUs", &timeUs));
    CHECK(msg->findInt32("pickTrack", &pickTrack));

    if (timeUs < 0LL) {
        if (!pickTrack) {
            // mSwapMask contains streams that are in both old and new variant,
            // (in mNewStreamMask & mStreamMask) but with different URIs
            // (not in resumeMask).
            // For example, old variant has video and audio in two separate
            // URIs, and new variant has only audio with unchanged URI. mSwapMask
            // should be 0 as there is nothing to swap. We only need to stop video,
            // and resume audio.
            mSwapMask =  mNewStreamMask & mStreamMask & ~resumeMask;
            switching = (mSwapMask != 0);
        }
        // Not seeking: base the real-time clock on the last dequeued position.
        mRealTimeBaseUs = ALooper::GetNowUs() - mLastDequeuedTimeUs;
    } else {
        // Seeking: base the real-time clock on the seek target.
        mRealTimeBaseUs = ALooper::GetNowUs() - timeUs;
    }

    ALOGV("onChangeConfiguration3: timeUs=%lld, switching=%d, pickTrack=%d, "
            "mStreamMask=0x%x, mNewStreamMask=0x%x, mSwapMask=0x%x",
            (long long)timeUs, switching, pickTrack,
            mStreamMask, mNewStreamMask, mSwapMask);

    // Record the URI of every stream that needs a new fetcher. While
    // switching the current mUri is left untouched and the new one is kept
    // in mNewUri; otherwise mUri is overwritten directly.
    for (size_t i = 0; i < kMaxStreams; ++i) {
        if (streamMask & indexToType(i)) {
            if (switching) {
                CHECK(msg->findString(mStreams[i].uriKey().c_str(), &mStreams[i].mNewUri));
            } else {
                CHECK(msg->findString(mStreams[i].uriKey().c_str(), &mStreams[i].mUri));
            }
        }
    }

    // Of all existing fetchers:
    // * Resume fetchers that are still needed and assign them original packet sources.
    // * Mark otherwise unneeded fetchers for removal.
    ALOGV("resuming fetchers for mask 0x%08x", resumeMask);
    for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
        const AString &uri = mFetcherInfos.keyAt(i);
        if (!resumeFetcher(uri, resumeMask, timeUs)) {
            ALOGV("marking fetcher-%d to be removed",
                    mFetcherInfos[i].mFetcher->getFetcherID());

            mFetcherInfos.editValueAt(i).mToBeRemoved = true;
        }
    }

    // streamMask now only contains the types that need a new fetcher created.
    if (streamMask != 0) {
        ALOGV("creating new fetchers for mask 0x%08x", streamMask);
    }

    // Find out when the original fetchers have buffered up to and start the new fetchers
    // at a later timestamp.
    for (size_t i = 0; i < kMaxStreams; i++) {
        if (!(indexToType(i) & streamMask)) {
            continue;
        }

        AString uri;
        uri = switching ? mStreams[i].mNewUri : mStreams[i].mUri;

        sp<PlaylistFetcher> fetcher = addFetcher(uri.c_str());
        CHECK(fetcher != NULL);

        HLSTime startTime;
        SeekMode seekMode = kSeekModeExactPosition;
        // Packet sources handed to the new fetcher, indexed by stream index;
        // entries for streams this fetcher does not serve stay NULL.
        sp<AnotherPacketSource> sources[kNumSources];

        if (i == kSubtitleIndex || (!pickTrack && !switching)) {
            startTime = latestMediaSegmentStartTime();
        }

        // TRICKY: looping from i as earlier streams are already removed from streamMask
        for (size_t j = i; j < kMaxStreams; ++j) {
            const AString &streamUri = switching ? mStreams[j].mNewUri : mStreams[j].mUri;
            if ((streamMask & indexToType(j)) && uri == streamUri) {
                sources[j] = mPacketSources.valueFor(indexToType(j));

                if (timeUs >= 0) {
                    // Seeking: start exactly at the seek target.
                    startTime.mTimeUs = timeUs;
                } else {
                    int32_t type;
                    sp<AMessage> meta;
                    if (!switching) {
                        // selecting, or adapting but no swap required
                        meta = sources[j]->getLatestDequeuedMeta();
                    } else {
                        // adapting and swap required
                        meta = sources[j]->getLatestEnqueuedMeta();
                        if (meta != NULL && mCurBandwidthIndex > mOrigBandwidthIndex) {
                            // switching up
                            meta = sources[j]->getMetaAfterLastDequeued(mUpSwitchMargin);
                        }
                    }

                    // Only audio/video metadata that is not a discontinuity
                    // marker may move the start time, and only forward.
                    if ((j == kAudioIndex || j == kVideoIndex)
                            && meta != NULL && !meta->findInt32("discontinuity", &type)) {
                        HLSTime tmpTime(meta);
                        if (startTime < tmpTime) {
                            startTime = tmpTime;
                        }
                    }

                    if (!switching) {
                        // selecting, or adapting but no swap required
                        sources[j]->clear();
                        if (j == kSubtitleIndex) {
                            // Leaves the inner loop without queueing a
                            // discontinuity and without clearing bit j from
                            // streamMask (the statement after this if/else
                            // is skipped).
                            break;
                        }

                        ALOGV("stream[%zu]: queue format change", j);
                        sources[j]->queueDiscontinuity(
                                ATSParser::DISCONTINUITY_FORMAT_ONLY, NULL, true);
                    } else {
                        // switching, queue discontinuities after resume
                        // The new fetcher writes into the secondary source
                        // from mPacketSources2 instead of the primary one.
                        sources[j] = mPacketSources2.valueFor(indexToType(j));
                        sources[j]->clear();
                        // the new fetcher might be providing streams that used to be
                        // provided by two different fetchers,  if one of the fetcher
                        // paused in the middle while the other somehow paused in next
                        // seg, we have to start from next seg.
                        if (seekMode < mStreams[j].mSeekMode) {
                            seekMode = mStreams[j].mSeekMode;
                        }
                    }
                }

                // Stream j is now served by this fetcher.
                streamMask &= ~indexToType(j);
            }
        }

        ALOGV("[fetcher-%d] startAsync: startTimeUs %lld mLastSeekTimeUs %lld "
                "segmentStartTimeUs %lld seekMode %d",
                fetcher->getFetcherID(),
                (long long)startTime.mTimeUs,
                (long long)mLastSeekTimeUs,
                (long long)startTime.getSegmentTimeUs(),
                seekMode);

        // Set the target segment start time to the middle point of the
        // segment where the last sample was.
        // This gives a better guess if segments of the two variants are not
        // perfectly aligned. (If the corresponding segment in new variant
        // starts slightly later than that in the old variant, we still want
        // to pick that segment, not the one before)
        // NOTE(review): no midpoint computation is visible in this function;
        // startTime.getSegmentTimeUs() is passed through as-is. Confirm
        // whether the adjustment lives in HLSTime or this comment is stale.
        // A negative startTime.mTimeUs falls back to mLastSeekTimeUs.
        fetcher->startAsync(
                sources[kAudioIndex],
                sources[kVideoIndex],
                sources[kSubtitleIndex],
                getMetadataSource(sources, mNewStreamMask, switching),
                startTime.mTimeUs < 0 ? mLastSeekTimeUs : startTime.mTimeUs,
                startTime.getSegmentTimeUs(),
                startTime.mSeq,
                seekMode);
    }

    // All fetchers have now been started, the configuration change
    // has completed.

    mReconfigurationInProgress = false;
    if (switching) {
        // mStreamMask is not updated here while a switch is in progress.
        mSwitchInProgress = true;
    } else {
        mStreamMask = mNewStreamMask;
        if (mOrigBandwidthIndex != mCurBandwidthIndex) {
            ALOGV("#### Finished Bandwidth Switch Early: %zd => %zd",
                    mOrigBandwidthIndex, mCurBandwidthIndex);
            mOrigBandwidthIndex = mCurBandwidthIndex;
        }
    }

    ALOGV("onChangeConfiguration3: mSwitchInProgress %d, mStreamMask 0x%x",
            mSwitchInProgress, mStreamMask);

    if (mDisconnectReplyID != NULL) {
        finishDisconnect();
    }
}
1970 
swapPacketSource(StreamType stream)1971 void LiveSession::swapPacketSource(StreamType stream) {
1972     ALOGV("[%s] swapPacketSource", getNameForStream(stream));
1973 
1974     // transfer packets from source2 to source
1975     sp<AnotherPacketSource> &aps = mPacketSources.editValueFor(stream);
1976     sp<AnotherPacketSource> &aps2 = mPacketSources2.editValueFor(stream);
1977 
1978     // queue discontinuity in mPacketSource
1979     aps->queueDiscontinuity(ATSParser::DISCONTINUITY_FORMAT_ONLY, NULL, false);
1980 
1981     // queue packets in mPacketSource2 to mPacketSource
1982     status_t finalResult = OK;
1983     sp<ABuffer> accessUnit;
1984     while (aps2->hasBufferAvailable(&finalResult) && finalResult == OK &&
1985           OK == aps2->dequeueAccessUnit(&accessUnit)) {
1986         aps->queueAccessUnit(accessUnit);
1987     }
1988     aps2->clear();
1989 }
1990 
tryToFinishBandwidthSwitch(const AString & oldUri)1991 void LiveSession::tryToFinishBandwidthSwitch(const AString &oldUri) {
1992     if (!mSwitchInProgress) {
1993         return;
1994     }
1995 
1996     ssize_t index = mFetcherInfos.indexOfKey(oldUri);
1997     if (index < 0 || !mFetcherInfos[index].mToBeRemoved) {
1998         return;
1999     }
2000 
2001     // Swap packet source of streams provided by old variant
2002     for (size_t idx = 0; idx < kMaxStreams; idx++) {
2003         StreamType stream = indexToType(idx);
2004         if ((mSwapMask & stream) && (oldUri == mStreams[idx].mUri)) {
2005             swapPacketSource(stream);
2006 
2007             if ((mNewStreamMask & stream) && mStreams[idx].mNewUri.empty()) {
2008                 ALOGW("swapping stream type %d %s to empty stream",
2009                         stream, uriDebugString(mStreams[idx].mUri).c_str());
2010             }
2011             mStreams[idx].mUri = mStreams[idx].mNewUri;
2012             mStreams[idx].mNewUri.clear();
2013 
2014             mSwapMask &= ~stream;
2015         }
2016     }
2017 
2018     mFetcherInfos.editValueAt(index).mFetcher->stopAsync(false /* clear */);
2019 
2020     ALOGV("tryToFinishBandwidthSwitch: mSwapMask=0x%x", mSwapMask);
2021     if (mSwapMask != 0) {
2022         return;
2023     }
2024 
2025     // Check if new variant contains extra streams.
2026     uint32_t extraStreams = mNewStreamMask & (~mStreamMask);
2027     while (extraStreams) {
2028         StreamType stream = (StreamType) (extraStreams & ~(extraStreams - 1));
2029         extraStreams &= ~stream;
2030 
2031         swapPacketSource(stream);
2032 
2033         ssize_t idx = typeToIndex(stream);
2034         CHECK(idx >= 0);
2035         if (mStreams[idx].mNewUri.empty()) {
2036             ALOGW("swapping extra stream type %d %s to empty stream",
2037                     stream, uriDebugString(mStreams[idx].mUri).c_str());
2038         }
2039         mStreams[idx].mUri = mStreams[idx].mNewUri;
2040         mStreams[idx].mNewUri.clear();
2041     }
2042 
2043     // Restart new fetcher (it was paused after the first 47k block)
2044     // and let it fetch into mPacketSources (not mPacketSources2)
2045     for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
2046         FetcherInfo &info = mFetcherInfos.editValueAt(i);
2047         if (info.mToBeResumed) {
2048             resumeFetcher(mFetcherInfos.keyAt(i), mNewStreamMask);
2049             info.mToBeResumed = false;
2050         }
2051     }
2052 
2053     ALOGI("#### Finished Bandwidth Switch: %zd => %zd",
2054             mOrigBandwidthIndex, mCurBandwidthIndex);
2055 
2056     mStreamMask = mNewStreamMask;
2057     mSwitchInProgress = false;
2058     mOrigBandwidthIndex = mCurBandwidthIndex;
2059 
2060     restartPollBuffering();
2061 }
2062 
schedulePollBuffering()2063 void LiveSession::schedulePollBuffering() {
2064     sp<AMessage> msg = new AMessage(kWhatPollBuffering, this);
2065     msg->setInt32("generation", mPollBufferingGeneration);
2066     msg->post(1000000LL);
2067 }
2068 
cancelPollBuffering()2069 void LiveSession::cancelPollBuffering() {
2070     ++mPollBufferingGeneration;
2071     mPrevBufferPercentage = -1;
2072 }
2073 
// Cancels the current buffer polling (new generation, reset percentage) and
// immediately runs a fresh poll, which also schedules the next one.
void LiveSession::restartPollBuffering() {
    cancelPollBuffering();
    onPollBuffering();
}
2078 
onPollBuffering()2079 void LiveSession::onPollBuffering() {
2080     ALOGV("onPollBuffering: mSwitchInProgress %d, mReconfigurationInProgress %d, "
2081             "mInPreparationPhase %d, mCurBandwidthIndex %zd, mStreamMask 0x%x",
2082         mSwitchInProgress, mReconfigurationInProgress,
2083         mInPreparationPhase, mCurBandwidthIndex, mStreamMask);
2084 
2085     bool underflow, ready, down, up;
2086     if (checkBuffering(underflow, ready, down, up)) {
2087         if (mInPreparationPhase) {
2088             // Allow down switch even if we're still preparing.
2089             //
2090             // Some streams have a high bandwidth index as default,
2091             // when bandwidth is low, it takes a long time to buffer
2092             // to ready mark, then it immediately pauses after start
2093             // as we have to do a down switch. It's better experience
2094             // to restart from a lower index, if we detect low bw.
2095             if (!switchBandwidthIfNeeded(false /* up */, down) && ready) {
2096                 postPrepared(OK);
2097             }
2098         }
2099 
2100         if (!mInPreparationPhase) {
2101             if (ready) {
2102                 stopBufferingIfNecessary();
2103             } else if (underflow) {
2104                 startBufferingIfNecessary();
2105             }
2106             switchBandwidthIfNeeded(up, down);
2107         }
2108     }
2109 
2110     schedulePollBuffering();
2111 }
2112 
cancelBandwidthSwitch(bool resume)2113 void LiveSession::cancelBandwidthSwitch(bool resume) {
2114     ALOGV("cancelBandwidthSwitch: mSwitchGen(%d)++, orig %zd, cur %zd",
2115             mSwitchGeneration, mOrigBandwidthIndex, mCurBandwidthIndex);
2116     if (!mSwitchInProgress) {
2117         return;
2118     }
2119 
2120     for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
2121         FetcherInfo& info = mFetcherInfos.editValueAt(i);
2122         if (info.mToBeRemoved) {
2123             info.mToBeRemoved = false;
2124             if (resume) {
2125                 resumeFetcher(mFetcherInfos.keyAt(i), mSwapMask);
2126             }
2127         }
2128     }
2129 
2130     for (size_t i = 0; i < kMaxStreams; ++i) {
2131         AString newUri = mStreams[i].mNewUri;
2132         if (!newUri.empty()) {
2133             // clear all mNewUri matching this newUri
2134             for (size_t j = i; j < kMaxStreams; ++j) {
2135                 if (mStreams[j].mNewUri == newUri) {
2136                     mStreams[j].mNewUri.clear();
2137                 }
2138             }
2139             ALOGV("stopping newUri = %s", newUri.c_str());
2140             ssize_t index = mFetcherInfos.indexOfKey(newUri);
2141             if (index < 0) {
2142                 ALOGE("did not find fetcher for newUri: %s", uriDebugString(newUri).c_str());
2143                 continue;
2144             }
2145             FetcherInfo &info = mFetcherInfos.editValueAt(index);
2146             info.mToBeRemoved = true;
2147             info.mFetcher->stopAsync();
2148         }
2149     }
2150 
2151     ALOGI("#### Canceled Bandwidth Switch: %zd => %zd",
2152             mOrigBandwidthIndex, mCurBandwidthIndex);
2153 
2154     mSwitchGeneration++;
2155     mSwitchInProgress = false;
2156     mCurBandwidthIndex = mOrigBandwidthIndex;
2157     mSwapMask = 0;
2158 }
2159 
checkBuffering(bool & underflow,bool & ready,bool & down,bool & up)2160 bool LiveSession::checkBuffering(
2161         bool &underflow, bool &ready, bool &down, bool &up) {
2162     underflow = ready = down = up = false;
2163 
2164     if (mReconfigurationInProgress) {
2165         ALOGV("Switch/Reconfig in progress, defer buffer polling");
2166         return false;
2167     }
2168 
2169     size_t activeCount, underflowCount, readyCount, downCount, upCount;
2170     activeCount = underflowCount = readyCount = downCount = upCount =0;
2171     int32_t minBufferPercent = -1;
2172     int64_t durationUs;
2173     if (getDuration(&durationUs) != OK) {
2174         durationUs = -1;
2175     }
2176     for (size_t i = 0; i < mPacketSources.size(); ++i) {
2177         // we don't check subtitles for buffering level
2178         if (!(mStreamMask & mPacketSources.keyAt(i)
2179                 & (STREAMTYPE_AUDIO | STREAMTYPE_VIDEO))) {
2180             continue;
2181         }
2182         // ignore streams that never had any packet queued.
2183         // (it's possible that the variant only has audio or video)
2184         sp<AMessage> meta = mPacketSources[i]->getLatestEnqueuedMeta();
2185         if (meta == NULL) {
2186             continue;
2187         }
2188 
2189         status_t finalResult;
2190         int64_t bufferedDurationUs =
2191                 mPacketSources[i]->getBufferedDurationUs(&finalResult);
2192         ALOGV("[%s] buffered %lld us",
2193                 getNameForStream(mPacketSources.keyAt(i)),
2194                 (long long)bufferedDurationUs);
2195         if (durationUs >= 0) {
2196             int32_t percent;
2197             if (mPacketSources[i]->isFinished(0 /* duration */)) {
2198                 percent = 100;
2199             } else {
2200                 percent = (int32_t)(100.0 *
2201                         (mLastDequeuedTimeUs + bufferedDurationUs) / durationUs);
2202             }
2203             if (minBufferPercent < 0 || percent < minBufferPercent) {
2204                 minBufferPercent = percent;
2205             }
2206         }
2207 
2208         ++activeCount;
2209         int64_t readyMarkUs =
2210             (mInPreparationPhase ?
2211                 mBufferingSettings.mInitialMarkMs :
2212                 mBufferingSettings.mResumePlaybackMarkMs) * 1000LL;
2213         if (bufferedDurationUs > readyMarkUs
2214                 || mPacketSources[i]->isFinished(0)) {
2215             ++readyCount;
2216         }
2217         if (!mPacketSources[i]->isFinished(0)) {
2218             if (bufferedDurationUs < kUnderflowMarkMs * 1000LL) {
2219                 ++underflowCount;
2220             }
2221             if (bufferedDurationUs > mUpSwitchMark) {
2222                 ++upCount;
2223             }
2224             if (bufferedDurationUs < mDownSwitchMark) {
2225                 ++downCount;
2226             }
2227         }
2228     }
2229 
2230     if (minBufferPercent >= 0) {
2231         notifyBufferingUpdate(minBufferPercent);
2232     }
2233 
2234     if (activeCount > 0) {
2235         up        = (upCount == activeCount);
2236         down      = (downCount > 0);
2237         ready     = (readyCount == activeCount);
2238         underflow = (underflowCount > 0);
2239         return true;
2240     }
2241 
2242     return false;
2243 }
2244 
startBufferingIfNecessary()2245 void LiveSession::startBufferingIfNecessary() {
2246     ALOGV("startBufferingIfNecessary: mInPreparationPhase=%d, mBuffering=%d",
2247             mInPreparationPhase, mBuffering);
2248     if (!mBuffering) {
2249         mBuffering = true;
2250 
2251         sp<AMessage> notify = mNotify->dup();
2252         notify->setInt32("what", kWhatBufferingStart);
2253         notify->post();
2254     }
2255 }
2256 
stopBufferingIfNecessary()2257 void LiveSession::stopBufferingIfNecessary() {
2258     ALOGV("stopBufferingIfNecessary: mInPreparationPhase=%d, mBuffering=%d",
2259             mInPreparationPhase, mBuffering);
2260 
2261     if (mBuffering) {
2262         mBuffering = false;
2263 
2264         sp<AMessage> notify = mNotify->dup();
2265         notify->setInt32("what", kWhatBufferingEnd);
2266         notify->post();
2267     }
2268 }
2269 
notifyBufferingUpdate(int32_t percentage)2270 void LiveSession::notifyBufferingUpdate(int32_t percentage) {
2271     if (percentage < mPrevBufferPercentage) {
2272         percentage = mPrevBufferPercentage;
2273     } else if (percentage > 100) {
2274         percentage = 100;
2275     }
2276 
2277     mPrevBufferPercentage = percentage;
2278 
2279     ALOGV("notifyBufferingUpdate: percentage=%d%%", percentage);
2280 
2281     sp<AMessage> notify = mNotify->dup();
2282     notify->setInt32("what", kWhatBufferingUpdate);
2283     notify->setInt32("percentage", percentage);
2284     notify->post();
2285 }
2286 
tryBandwidthFallback()2287 bool LiveSession::tryBandwidthFallback() {
2288     if (mInPreparationPhase || mReconfigurationInProgress) {
2289         // Don't try fallback during prepare or reconfig.
2290         // If error happens there, it's likely unrecoverable.
2291         return false;
2292     }
2293     if (mCurBandwidthIndex > mOrigBandwidthIndex) {
2294         // if we're switching up, simply cancel and resume old variant
2295         cancelBandwidthSwitch(true /* resume */);
2296         return true;
2297     } else {
2298         // if we're switching down, we're likely about to underflow (if
2299         // not already underflowing). try the lowest viable bandwidth if
2300         // not on that variant already.
2301         ssize_t lowestValid = getLowestValidBandwidthIndex();
2302         if (mCurBandwidthIndex > lowestValid) {
2303             cancelBandwidthSwitch();
2304             changeConfiguration(-1LL, lowestValid);
2305             return true;
2306         }
2307     }
2308     // return false if we couldn't find any fallback
2309     return false;
2310 }
2311 
2312 /*
2313  * returns true if a bandwidth switch is actually needed (and started),
2314  * returns false otherwise
2315  */
switchBandwidthIfNeeded(bool bufferHigh,bool bufferLow)2316 bool LiveSession::switchBandwidthIfNeeded(bool bufferHigh, bool bufferLow) {
2317     // no need to check bandwidth if we only have 1 bandwidth settings
2318     if (mBandwidthItems.size() < 2) {
2319         return false;
2320     }
2321 
2322     if (mSwitchInProgress) {
2323         if (mBuffering) {
2324             tryBandwidthFallback();
2325         }
2326         return false;
2327     }
2328 
2329     int32_t bandwidthBps, shortTermBps;
2330     bool isStable;
2331     if (mBandwidthEstimator->estimateBandwidth(
2332             &bandwidthBps, &isStable, &shortTermBps)) {
2333         ALOGV("bandwidth estimated at %.2f kbps, "
2334                 "stable %d, shortTermBps %.2f kbps",
2335                 bandwidthBps / 1024.0f, isStable, shortTermBps / 1024.0f);
2336         mLastBandwidthBps = bandwidthBps;
2337         mLastBandwidthStable = isStable;
2338     } else {
2339         ALOGV("no bandwidth estimate.");
2340         return false;
2341     }
2342 
2343     int32_t curBandwidth = mBandwidthItems.itemAt(mCurBandwidthIndex).mBandwidth;
2344     // canSwithDown and canSwitchUp can't both be true.
2345     // we only want to switch up when measured bw is 120% higher than current variant,
2346     // and we only want to switch down when measured bw is below current variant.
2347     bool canSwitchDown = bufferLow
2348             && (bandwidthBps < (int32_t)curBandwidth);
2349     bool canSwitchUp = bufferHigh
2350             && (bandwidthBps > (int32_t)curBandwidth * 12 / 10);
2351 
2352     if (canSwitchDown || canSwitchUp) {
2353         // bandwidth estimating has some delay, if we have to downswitch when
2354         // it hasn't stabilized, use the short term to guess real bandwidth,
2355         // since it may be dropping too fast.
2356         // (note this doesn't apply to upswitch, always use longer average there)
2357         if (!isStable && canSwitchDown) {
2358             if (shortTermBps < bandwidthBps) {
2359                 bandwidthBps = shortTermBps;
2360             }
2361         }
2362 
2363         ssize_t bandwidthIndex = getBandwidthIndex(bandwidthBps);
2364 
2365         // it's possible that we're checking for canSwitchUp case, but the returned
2366         // bandwidthIndex is < mCurBandwidthIndex, as getBandwidthIndex() only uses 70%
2367         // of measured bw. In that case we don't want to do anything, since we have
2368         // both enough buffer and enough bw.
2369         if ((canSwitchUp && bandwidthIndex > mCurBandwidthIndex)
2370          || (canSwitchDown && bandwidthIndex < mCurBandwidthIndex)) {
2371             // if not yet prepared, just restart again with new bw index.
2372             // this is faster and playback experience is cleaner.
2373             changeConfiguration(
2374                     mInPreparationPhase ? 0 : -1LL, bandwidthIndex);
2375             return true;
2376         }
2377     }
2378     return false;
2379 }
2380 
postError(status_t err)2381 void LiveSession::postError(status_t err) {
2382     // if we reached EOS, notify buffering of 100%
2383     if (err == ERROR_END_OF_STREAM) {
2384         notifyBufferingUpdate(100);
2385     }
2386     // we'll stop buffer polling now, before that notify
2387     // stop buffering to stop the spinning icon
2388     stopBufferingIfNecessary();
2389     cancelPollBuffering();
2390 
2391     sp<AMessage> notify = mNotify->dup();
2392     notify->setInt32("what", kWhatError);
2393     notify->setInt32("err", err);
2394     notify->post();
2395 }
2396 
postPrepared(status_t err)2397 void LiveSession::postPrepared(status_t err) {
2398     CHECK(mInPreparationPhase);
2399 
2400     sp<AMessage> notify = mNotify->dup();
2401     if (err == OK || err == ERROR_END_OF_STREAM) {
2402         notify->setInt32("what", kWhatPrepared);
2403     } else {
2404         cancelPollBuffering();
2405 
2406         notify->setInt32("what", kWhatPreparationFailed);
2407         notify->setInt32("err", err);
2408     }
2409 
2410     notify->post();
2411 
2412     mInPreparationPhase = false;
2413 }
2414 
2415 
2416 }  // namespace android
2417 
2418