1 /*
2  * Copyright (C) 2010 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 //#define LOG_NDEBUG 0
18 #define LOG_TAG "LiveSession"
19 #include <utils/Log.h>
20 
21 #include "LiveSession.h"
22 #include "HTTPDownloader.h"
23 #include "M3UParser.h"
24 #include "PlaylistFetcher.h"
25 
26 #include "mpeg2ts/AnotherPacketSource.h"
27 
28 #include <cutils/properties.h>
29 #include <media/IMediaHTTPService.h>
30 #include <media/stagefright/foundation/ABuffer.h>
31 #include <media/stagefright/foundation/ADebug.h>
32 #include <media/stagefright/foundation/AMessage.h>
33 #include <media/stagefright/foundation/AUtils.h>
34 #include <media/stagefright/MediaDefs.h>
35 #include <media/stagefright/MetaData.h>
36 #include <media/stagefright/Utils.h>
37 
38 #include <utils/Mutex.h>
39 
40 #include <ctype.h>
41 #include <inttypes.h>
42 
43 namespace android {
44 
45 // static
46 // Bandwidth Switch Mark Defaults
47 const int64_t LiveSession::kUpSwitchMarkUs = 15000000ll;
48 const int64_t LiveSession::kDownSwitchMarkUs = 20000000ll;
49 const int64_t LiveSession::kUpSwitchMarginUs = 5000000ll;
50 const int64_t LiveSession::kResumeThresholdUs = 100000ll;
51 
52 // Buffer Prepare/Ready/Underflow Marks
53 const int64_t LiveSession::kReadyMarkUs = 5000000ll;
54 const int64_t LiveSession::kPrepareMarkUs = 1500000ll;
55 const int64_t LiveSession::kUnderflowMarkUs = 1000000ll;
56 
57 struct LiveSession::BandwidthEstimator : public RefBase {
58     BandwidthEstimator();
59 
60     void addBandwidthMeasurement(size_t numBytes, int64_t delayUs);
61     bool estimateBandwidth(
62             int32_t *bandwidth,
63             bool *isStable = NULL,
64             int32_t *shortTermBps = NULL);
65 
66 private:
67     // Bandwidth estimation parameters
68     static const int32_t kShortTermBandwidthItems = 3;
69     static const int32_t kMinBandwidthHistoryItems = 20;
70     static const int64_t kMinBandwidthHistoryWindowUs = 5000000ll; // 5 sec
71     static const int64_t kMaxBandwidthHistoryWindowUs = 30000000ll; // 30 sec
72     static const int64_t kMaxBandwidthHistoryAgeUs = 60000000ll; // 60 sec
73 
74     struct BandwidthEntry {
75         int64_t mTimestampUs;
76         int64_t mDelayUs;
77         size_t mNumBytes;
78     };
79 
80     Mutex mLock;
81     List<BandwidthEntry> mBandwidthHistory;
82     List<int32_t> mPrevEstimates;
83     int32_t mShortTermEstimate;
84     bool mHasNewSample;
85     bool mIsStable;
86     int64_t mTotalTransferTimeUs;
87     size_t mTotalTransferBytes;
88 
89     DISALLOW_EVIL_CONSTRUCTORS(BandwidthEstimator);
90 };
91 
BandwidthEstimator()92 LiveSession::BandwidthEstimator::BandwidthEstimator() :
93     mShortTermEstimate(0),
94     mHasNewSample(false),
95     mIsStable(true),
96     mTotalTransferTimeUs(0),
97     mTotalTransferBytes(0) {
98 }
99 
addBandwidthMeasurement(size_t numBytes,int64_t delayUs)100 void LiveSession::BandwidthEstimator::addBandwidthMeasurement(
101         size_t numBytes, int64_t delayUs) {
102     AutoMutex autoLock(mLock);
103 
104     int64_t nowUs = ALooper::GetNowUs();
105     BandwidthEntry entry;
106     entry.mTimestampUs = nowUs;
107     entry.mDelayUs = delayUs;
108     entry.mNumBytes = numBytes;
109     mTotalTransferTimeUs += delayUs;
110     mTotalTransferBytes += numBytes;
111     mBandwidthHistory.push_back(entry);
112     mHasNewSample = true;
113 
114     // Remove no more than 10% of total transfer time at a time
115     // to avoid sudden jump on bandwidth estimation. There might
116     // be long blocking reads that takes up signification time,
117     // we have to keep a longer window in that case.
118     int64_t bandwidthHistoryWindowUs = mTotalTransferTimeUs * 9 / 10;
119     if (bandwidthHistoryWindowUs < kMinBandwidthHistoryWindowUs) {
120         bandwidthHistoryWindowUs = kMinBandwidthHistoryWindowUs;
121     } else if (bandwidthHistoryWindowUs > kMaxBandwidthHistoryWindowUs) {
122         bandwidthHistoryWindowUs = kMaxBandwidthHistoryWindowUs;
123     }
124     // trim old samples, keeping at least kMaxBandwidthHistoryItems samples,
125     // and total transfer time at least kMaxBandwidthHistoryWindowUs.
126     while (mBandwidthHistory.size() > kMinBandwidthHistoryItems) {
127         List<BandwidthEntry>::iterator it = mBandwidthHistory.begin();
128         // remove sample if either absolute age or total transfer time is
129         // over kMaxBandwidthHistoryWindowUs
130         if (nowUs - it->mTimestampUs < kMaxBandwidthHistoryAgeUs &&
131                 mTotalTransferTimeUs - it->mDelayUs < bandwidthHistoryWindowUs) {
132             break;
133         }
134         mTotalTransferTimeUs -= it->mDelayUs;
135         mTotalTransferBytes -= it->mNumBytes;
136         mBandwidthHistory.erase(mBandwidthHistory.begin());
137     }
138 }
139 
estimateBandwidth(int32_t * bandwidthBps,bool * isStable,int32_t * shortTermBps)140 bool LiveSession::BandwidthEstimator::estimateBandwidth(
141         int32_t *bandwidthBps, bool *isStable, int32_t *shortTermBps) {
142     AutoMutex autoLock(mLock);
143 
144     if (mBandwidthHistory.size() < 2) {
145         return false;
146     }
147 
148     if (!mHasNewSample) {
149         *bandwidthBps = *(--mPrevEstimates.end());
150         if (isStable) {
151             *isStable = mIsStable;
152         }
153         if (shortTermBps) {
154             *shortTermBps = mShortTermEstimate;
155         }
156         return true;
157     }
158 
159     *bandwidthBps = ((double)mTotalTransferBytes * 8E6 / mTotalTransferTimeUs);
160     mPrevEstimates.push_back(*bandwidthBps);
161     while (mPrevEstimates.size() > 3) {
162         mPrevEstimates.erase(mPrevEstimates.begin());
163     }
164     mHasNewSample = false;
165 
166     int64_t totalTimeUs = 0;
167     size_t totalBytes = 0;
168     if (mBandwidthHistory.size() >= kShortTermBandwidthItems) {
169         List<BandwidthEntry>::iterator it = --mBandwidthHistory.end();
170         for (size_t i = 0; i < kShortTermBandwidthItems; i++, it--) {
171             totalTimeUs += it->mDelayUs;
172             totalBytes += it->mNumBytes;
173         }
174     }
175     mShortTermEstimate = totalTimeUs > 0 ?
176             (totalBytes * 8E6 / totalTimeUs) : *bandwidthBps;
177     if (shortTermBps) {
178         *shortTermBps = mShortTermEstimate;
179     }
180 
181     int32_t minEstimate = -1, maxEstimate = -1;
182     List<int32_t>::iterator it;
183     for (it = mPrevEstimates.begin(); it != mPrevEstimates.end(); it++) {
184         int32_t estimate = *it;
185         if (minEstimate < 0 || minEstimate > estimate) {
186             minEstimate = estimate;
187         }
188         if (maxEstimate < 0 || maxEstimate < estimate) {
189             maxEstimate = estimate;
190         }
191     }
192     // consider it stable if long-term average is not jumping a lot
193     // and short-term average is not much lower than long-term average
194     mIsStable = (maxEstimate <= minEstimate * 4 / 3)
195             && mShortTermEstimate > minEstimate * 7 / 10;
196     if (isStable) {
197         *isStable = mIsStable;
198     }
199 
200 #if 0
201     {
202         char dumpStr[1024] = {0};
203         size_t itemIdx = 0;
204         size_t histSize = mBandwidthHistory.size();
205         sprintf(dumpStr, "estimate bps=%d stable=%d history (n=%d): {",
206             *bandwidthBps, mIsStable, histSize);
207         List<BandwidthEntry>::iterator it = mBandwidthHistory.begin();
208         for (; it != mBandwidthHistory.end(); ++it) {
209             if (itemIdx > 50) {
210                 sprintf(dumpStr + strlen(dumpStr),
211                         "...(%zd more items)... }", histSize - itemIdx);
212                 break;
213             }
214             sprintf(dumpStr + strlen(dumpStr), "%dk/%.3fs%s",
215                 it->mNumBytes / 1024,
216                 (double)it->mDelayUs * 1.0e-6,
217                 (it == (--mBandwidthHistory.end())) ? "}" : ", ");
218             itemIdx++;
219         }
220         ALOGE(dumpStr);
221     }
222 #endif
223     return true;
224 }
225 
226 //static
getKeyForStream(StreamType type)227 const char *LiveSession::getKeyForStream(StreamType type) {
228     switch (type) {
229         case STREAMTYPE_VIDEO:
230             return "timeUsVideo";
231         case STREAMTYPE_AUDIO:
232             return "timeUsAudio";
233         case STREAMTYPE_SUBTITLES:
234             return "timeUsSubtitle";
235         case STREAMTYPE_METADATA:
236             return "timeUsMetadata"; // unused
237         default:
238             TRESPASS();
239     }
240     return NULL;
241 }
242 
243 //static
getNameForStream(StreamType type)244 const char *LiveSession::getNameForStream(StreamType type) {
245     switch (type) {
246         case STREAMTYPE_VIDEO:
247             return "video";
248         case STREAMTYPE_AUDIO:
249             return "audio";
250         case STREAMTYPE_SUBTITLES:
251             return "subs";
252         case STREAMTYPE_METADATA:
253             return "metadata";
254         default:
255             break;
256     }
257     return "unknown";
258 }
259 
260 //static
getSourceTypeForStream(StreamType type)261 ATSParser::SourceType LiveSession::getSourceTypeForStream(StreamType type) {
262     switch (type) {
263         case STREAMTYPE_VIDEO:
264             return ATSParser::VIDEO;
265         case STREAMTYPE_AUDIO:
266             return ATSParser::AUDIO;
267         case STREAMTYPE_METADATA:
268             return ATSParser::META;
269         case STREAMTYPE_SUBTITLES:
270         default:
271             TRESPASS();
272     }
273     return ATSParser::NUM_SOURCE_TYPES; // should not reach here
274 }
275 
LiveSession(const sp<AMessage> & notify,uint32_t flags,const sp<IMediaHTTPService> & httpService)276 LiveSession::LiveSession(
277         const sp<AMessage> &notify, uint32_t flags,
278         const sp<IMediaHTTPService> &httpService)
279     : mNotify(notify),
280       mFlags(flags),
281       mHTTPService(httpService),
282       mBuffering(false),
283       mInPreparationPhase(true),
284       mPollBufferingGeneration(0),
285       mPrevBufferPercentage(-1),
286       mCurBandwidthIndex(-1),
287       mOrigBandwidthIndex(-1),
288       mLastBandwidthBps(-1ll),
289       mLastBandwidthStable(false),
290       mBandwidthEstimator(new BandwidthEstimator()),
291       mMaxWidth(720),
292       mMaxHeight(480),
293       mStreamMask(0),
294       mNewStreamMask(0),
295       mSwapMask(0),
296       mSwitchGeneration(0),
297       mSubtitleGeneration(0),
298       mLastDequeuedTimeUs(0ll),
299       mRealTimeBaseUs(0ll),
300       mReconfigurationInProgress(false),
301       mSwitchInProgress(false),
302       mUpSwitchMark(kUpSwitchMarkUs),
303       mDownSwitchMark(kDownSwitchMarkUs),
304       mUpSwitchMargin(kUpSwitchMarginUs),
305       mFirstTimeUsValid(false),
306       mFirstTimeUs(0),
307       mLastSeekTimeUs(0),
308       mHasMetadata(false) {
309     mStreams[kAudioIndex] = StreamItem("audio");
310     mStreams[kVideoIndex] = StreamItem("video");
311     mStreams[kSubtitleIndex] = StreamItem("subtitles");
312 
313     for (size_t i = 0; i < kNumSources; ++i) {
314         mPacketSources.add(indexToType(i), new AnotherPacketSource(NULL /* meta */));
315         mPacketSources2.add(indexToType(i), new AnotherPacketSource(NULL /* meta */));
316     }
317 }
318 
~LiveSession()319 LiveSession::~LiveSession() {
320     if (mFetcherLooper != NULL) {
321         mFetcherLooper->stop();
322     }
323 }
324 
calculateMediaTimeUs(int64_t firstTimeUs,int64_t timeUs,int32_t discontinuitySeq)325 int64_t LiveSession::calculateMediaTimeUs(
326         int64_t firstTimeUs, int64_t timeUs, int32_t discontinuitySeq) {
327     if (timeUs >= firstTimeUs) {
328         timeUs -= firstTimeUs;
329     } else {
330         timeUs = 0;
331     }
332     timeUs += mLastSeekTimeUs;
333     if (mDiscontinuityOffsetTimesUs.indexOfKey(discontinuitySeq) >= 0) {
334         timeUs += mDiscontinuityOffsetTimesUs.valueFor(discontinuitySeq);
335     }
336     return timeUs;
337 }
338 
dequeueAccessUnit(StreamType stream,sp<ABuffer> * accessUnit)339 status_t LiveSession::dequeueAccessUnit(
340         StreamType stream, sp<ABuffer> *accessUnit) {
341     status_t finalResult = OK;
342     sp<AnotherPacketSource> packetSource = mPacketSources.valueFor(stream);
343 
344     ssize_t streamIdx = typeToIndex(stream);
345     if (streamIdx < 0) {
346         return BAD_VALUE;
347     }
348     const char *streamStr = getNameForStream(stream);
349     // Do not let client pull data if we don't have data packets yet.
350     // We might only have a format discontinuity queued without data.
351     // When NuPlayerDecoder dequeues the format discontinuity, it will
352     // immediately try to getFormat. If we return NULL, NuPlayerDecoder
353     // thinks it can do seamless change, so will not shutdown decoder.
354     // When the actual format arrives, it can't handle it and get stuck.
355     if (!packetSource->hasDataBufferAvailable(&finalResult)) {
356         ALOGV("[%s] dequeueAccessUnit: no buffer available (finalResult=%d)",
357                 streamStr, finalResult);
358 
359         if (finalResult == OK) {
360             return -EAGAIN;
361         } else {
362             return finalResult;
363         }
364     }
365 
366     // Let the client dequeue as long as we have buffers available
367     // Do not make pause/resume decisions here.
368 
369     status_t err = packetSource->dequeueAccessUnit(accessUnit);
370 
371     if (err == INFO_DISCONTINUITY) {
372         // adaptive streaming, discontinuities in the playlist
373         int32_t type;
374         CHECK((*accessUnit)->meta()->findInt32("discontinuity", &type));
375 
376         sp<AMessage> extra;
377         if (!(*accessUnit)->meta()->findMessage("extra", &extra)) {
378             extra.clear();
379         }
380 
381         ALOGI("[%s] read discontinuity of type %d, extra = %s",
382               streamStr,
383               type,
384               extra == NULL ? "NULL" : extra->debugString().c_str());
385     } else if (err == OK) {
386 
387         if (stream == STREAMTYPE_AUDIO || stream == STREAMTYPE_VIDEO) {
388             int64_t timeUs, originalTimeUs;
389             int32_t discontinuitySeq = 0;
390             StreamItem& strm = mStreams[streamIdx];
391             CHECK((*accessUnit)->meta()->findInt64("timeUs",  &timeUs));
392             originalTimeUs = timeUs;
393             (*accessUnit)->meta()->findInt32("discontinuitySeq", &discontinuitySeq);
394             if (discontinuitySeq > (int32_t) strm.mCurDiscontinuitySeq) {
395                 int64_t offsetTimeUs;
396                 if (mDiscontinuityOffsetTimesUs.indexOfKey(strm.mCurDiscontinuitySeq) >= 0) {
397                     offsetTimeUs = mDiscontinuityOffsetTimesUs.valueFor(strm.mCurDiscontinuitySeq);
398                 } else {
399                     offsetTimeUs = 0;
400                 }
401 
402                 if (mDiscontinuityAbsStartTimesUs.indexOfKey(strm.mCurDiscontinuitySeq) >= 0
403                         && strm.mLastDequeuedTimeUs >= 0) {
404                     int64_t firstTimeUs;
405                     firstTimeUs = mDiscontinuityAbsStartTimesUs.valueFor(strm.mCurDiscontinuitySeq);
406                     offsetTimeUs += strm.mLastDequeuedTimeUs - firstTimeUs;
407                     offsetTimeUs += strm.mLastSampleDurationUs;
408                 } else {
409                     offsetTimeUs += strm.mLastSampleDurationUs;
410                 }
411 
412                 mDiscontinuityOffsetTimesUs.add(discontinuitySeq, offsetTimeUs);
413                 strm.mCurDiscontinuitySeq = discontinuitySeq;
414             }
415 
416             int32_t discard = 0;
417             int64_t firstTimeUs;
418             if (mDiscontinuityAbsStartTimesUs.indexOfKey(strm.mCurDiscontinuitySeq) >= 0) {
419                 int64_t durUs; // approximate sample duration
420                 if (timeUs > strm.mLastDequeuedTimeUs) {
421                     durUs = timeUs - strm.mLastDequeuedTimeUs;
422                 } else {
423                     durUs = strm.mLastDequeuedTimeUs - timeUs;
424                 }
425                 strm.mLastSampleDurationUs = durUs;
426                 firstTimeUs = mDiscontinuityAbsStartTimesUs.valueFor(strm.mCurDiscontinuitySeq);
427             } else if ((*accessUnit)->meta()->findInt32("discard", &discard) && discard) {
428                 firstTimeUs = timeUs;
429             } else {
430                 mDiscontinuityAbsStartTimesUs.add(strm.mCurDiscontinuitySeq, timeUs);
431                 firstTimeUs = timeUs;
432             }
433 
434             strm.mLastDequeuedTimeUs = timeUs;
435             timeUs = calculateMediaTimeUs(firstTimeUs, timeUs, discontinuitySeq);
436 
437             ALOGV("[%s] dequeueAccessUnit: time %lld us, original %lld us",
438                     streamStr, (long long)timeUs, (long long)originalTimeUs);
439             (*accessUnit)->meta()->setInt64("timeUs",  timeUs);
440             mLastDequeuedTimeUs = timeUs;
441             mRealTimeBaseUs = ALooper::GetNowUs() - timeUs;
442         } else if (stream == STREAMTYPE_SUBTITLES) {
443             int32_t subtitleGeneration;
444             if ((*accessUnit)->meta()->findInt32("subtitleGeneration", &subtitleGeneration)
445                     && subtitleGeneration != mSubtitleGeneration) {
446                return -EAGAIN;
447             };
448             (*accessUnit)->meta()->setInt32(
449                     "trackIndex", mPlaylist->getSelectedIndex());
450             (*accessUnit)->meta()->setInt64("baseUs", mRealTimeBaseUs);
451         } else if (stream == STREAMTYPE_METADATA) {
452             HLSTime mdTime((*accessUnit)->meta());
453             if (mDiscontinuityAbsStartTimesUs.indexOfKey(mdTime.mSeq) < 0) {
454                 packetSource->requeueAccessUnit((*accessUnit));
455                 return -EAGAIN;
456             } else {
457                 int64_t firstTimeUs = mDiscontinuityAbsStartTimesUs.valueFor(mdTime.mSeq);
458                 int64_t timeUs = calculateMediaTimeUs(firstTimeUs, mdTime.mTimeUs, mdTime.mSeq);
459                 (*accessUnit)->meta()->setInt64("timeUs",  timeUs);
460                 (*accessUnit)->meta()->setInt64("baseUs", mRealTimeBaseUs);
461             }
462         }
463     } else {
464         ALOGI("[%s] encountered error %d", streamStr, err);
465     }
466 
467     return err;
468 }
469 
getStreamFormat(StreamType stream,sp<AMessage> * format)470 status_t LiveSession::getStreamFormat(StreamType stream, sp<AMessage> *format) {
471     if (!(mStreamMask & stream)) {
472         return UNKNOWN_ERROR;
473     }
474 
475     sp<AnotherPacketSource> packetSource = mPacketSources.valueFor(stream);
476 
477     sp<MetaData> meta = packetSource->getFormat();
478 
479     if (meta == NULL) {
480         return -EAGAIN;
481     }
482 
483     if (stream == STREAMTYPE_AUDIO) {
484         // set AAC input buffer size to 32K bytes (256kbps x 1sec)
485         meta->setInt32(kKeyMaxInputSize, 32 * 1024);
486     } else if (stream == STREAMTYPE_VIDEO) {
487         meta->setInt32(kKeyMaxWidth, mMaxWidth);
488         meta->setInt32(kKeyMaxHeight, mMaxHeight);
489     }
490 
491     return convertMetaDataToMessage(meta, format);
492 }
493 
getHTTPDownloader()494 sp<HTTPDownloader> LiveSession::getHTTPDownloader() {
495     return new HTTPDownloader(mHTTPService, mExtraHeaders);
496 }
497 
connectAsync(const char * url,const KeyedVector<String8,String8> * headers)498 void LiveSession::connectAsync(
499         const char *url, const KeyedVector<String8, String8> *headers) {
500     sp<AMessage> msg = new AMessage(kWhatConnect, this);
501     msg->setString("url", url);
502 
503     if (headers != NULL) {
504         msg->setPointer(
505                 "headers",
506                 new KeyedVector<String8, String8>(*headers));
507     }
508 
509     msg->post();
510 }
511 
disconnect()512 status_t LiveSession::disconnect() {
513     sp<AMessage> msg = new AMessage(kWhatDisconnect, this);
514 
515     sp<AMessage> response;
516     status_t err = msg->postAndAwaitResponse(&response);
517 
518     return err;
519 }
520 
seekTo(int64_t timeUs)521 status_t LiveSession::seekTo(int64_t timeUs) {
522     sp<AMessage> msg = new AMessage(kWhatSeek, this);
523     msg->setInt64("timeUs", timeUs);
524 
525     sp<AMessage> response;
526     status_t err = msg->postAndAwaitResponse(&response);
527 
528     return err;
529 }
530 
checkSwitchProgress(sp<AMessage> & stopParams,int64_t delayUs,bool * needResumeUntil)531 bool LiveSession::checkSwitchProgress(
532         sp<AMessage> &stopParams, int64_t delayUs, bool *needResumeUntil) {
533     AString newUri;
534     CHECK(stopParams->findString("uri", &newUri));
535 
536     *needResumeUntil = false;
537     sp<AMessage> firstNewMeta[kMaxStreams];
538     for (size_t i = 0; i < kMaxStreams; ++i) {
539         StreamType stream = indexToType(i);
540         if (!(mSwapMask & mNewStreamMask & stream)
541             || (mStreams[i].mNewUri != newUri)) {
542             continue;
543         }
544         if (stream == STREAMTYPE_SUBTITLES) {
545             continue;
546         }
547         sp<AnotherPacketSource> &source = mPacketSources.editValueAt(i);
548 
549         // First, get latest dequeued meta, which is where the decoder is at.
550         // (when upswitching, we take the meta after a certain delay, so that
551         // the decoder is left with some cushion)
552         sp<AMessage> lastDequeueMeta, lastEnqueueMeta;
553         if (delayUs > 0) {
554             lastDequeueMeta = source->getMetaAfterLastDequeued(delayUs);
555             if (lastDequeueMeta == NULL) {
556                 // this means we don't have enough cushion, try again later
557                 ALOGV("[%s] up switching failed due to insufficient buffer",
558                         getNameForStream(stream));
559                 return false;
560             }
561         } else {
562             // It's okay for lastDequeueMeta to be NULL here, it means the
563             // decoder hasn't even started dequeueing
564             lastDequeueMeta = source->getLatestDequeuedMeta();
565         }
566         // Then, trim off packets at beginning of mPacketSources2 that's before
567         // the latest dequeued time. These samples are definitely too late.
568         firstNewMeta[i] = mPacketSources2.editValueAt(i)
569                             ->trimBuffersBeforeMeta(lastDequeueMeta);
570 
571         // Now firstNewMeta[i] is the first sample after the trim.
572         // If it's NULL, we failed because dequeue already past all samples
573         // in mPacketSource2, we have to try again.
574         if (firstNewMeta[i] == NULL) {
575             HLSTime dequeueTime(lastDequeueMeta);
576             ALOGV("[%s] dequeue time (%d, %lld) past start time",
577                     getNameForStream(stream),
578                     dequeueTime.mSeq, (long long) dequeueTime.mTimeUs);
579             return false;
580         }
581 
582         // Otherwise, we check if mPacketSources2 overlaps with what old fetcher
583         // already fetched, and see if we need to resumeUntil
584         lastEnqueueMeta = source->getLatestEnqueuedMeta();
585         // lastEnqueueMeta == NULL means old fetcher stopped at a discontinuity
586         // boundary, no need to resume as the content will look different anyways
587         if (lastEnqueueMeta != NULL) {
588             HLSTime lastTime(lastEnqueueMeta), startTime(firstNewMeta[i]);
589 
590             // no need to resume old fetcher if new fetcher started in different
591             // discontinuity sequence, as the content will look different.
592             *needResumeUntil |= (startTime.mSeq == lastTime.mSeq
593                     && startTime.mTimeUs - lastTime.mTimeUs > kResumeThresholdUs);
594 
595             // update the stopTime for resumeUntil
596             stopParams->setInt32("discontinuitySeq", startTime.mSeq);
597             stopParams->setInt64(getKeyForStream(stream), startTime.mTimeUs);
598         }
599     }
600 
601     // if we're here, it means dequeue progress hasn't passed some samples in
602     // mPacketSource2, we can trim off the excess in mPacketSource.
603     // (old fetcher might still need to resumeUntil the start time of new fetcher)
604     for (size_t i = 0; i < kMaxStreams; ++i) {
605         StreamType stream = indexToType(i);
606         if (!(mSwapMask & mNewStreamMask & stream)
607             || (newUri != mStreams[i].mNewUri)
608             || stream == STREAMTYPE_SUBTITLES) {
609             continue;
610         }
611         mPacketSources.valueFor(stream)->trimBuffersAfterMeta(firstNewMeta[i]);
612     }
613 
614     // no resumeUntil if already underflow
615     *needResumeUntil &= !mBuffering;
616 
617     return true;
618 }
619 
onMessageReceived(const sp<AMessage> & msg)620 void LiveSession::onMessageReceived(const sp<AMessage> &msg) {
621     switch (msg->what()) {
622         case kWhatConnect:
623         {
624             onConnect(msg);
625             break;
626         }
627 
628         case kWhatDisconnect:
629         {
630             CHECK(msg->senderAwaitsResponse(&mDisconnectReplyID));
631 
632             if (mReconfigurationInProgress) {
633                 break;
634             }
635 
636             finishDisconnect();
637             break;
638         }
639 
640         case kWhatSeek:
641         {
642             if (mReconfigurationInProgress) {
643                 msg->post(50000);
644                 break;
645             }
646 
647             CHECK(msg->senderAwaitsResponse(&mSeekReplyID));
648             mSeekReply = new AMessage;
649 
650             onSeek(msg);
651             break;
652         }
653 
654         case kWhatFetcherNotify:
655         {
656             int32_t what;
657             CHECK(msg->findInt32("what", &what));
658 
659             switch (what) {
660                 case PlaylistFetcher::kWhatStarted:
661                     break;
662                 case PlaylistFetcher::kWhatPaused:
663                 case PlaylistFetcher::kWhatStopped:
664                 {
665                     AString uri;
666                     CHECK(msg->findString("uri", &uri));
667                     ssize_t index = mFetcherInfos.indexOfKey(uri);
668                     if (index < 0) {
669                         // ignore msgs from fetchers that's already gone
670                         break;
671                     }
672 
673                     ALOGV("fetcher-%d %s",
674                             mFetcherInfos[index].mFetcher->getFetcherID(),
675                             what == PlaylistFetcher::kWhatPaused ?
676                                     "paused" : "stopped");
677 
678                     if (what == PlaylistFetcher::kWhatStopped) {
679                         mFetcherLooper->unregisterHandler(
680                                 mFetcherInfos[index].mFetcher->id());
681                         mFetcherInfos.removeItemsAt(index);
682                     } else if (what == PlaylistFetcher::kWhatPaused) {
683                         int32_t seekMode;
684                         CHECK(msg->findInt32("seekMode", &seekMode));
685                         for (size_t i = 0; i < kMaxStreams; ++i) {
686                             if (mStreams[i].mUri == uri) {
687                                 mStreams[i].mSeekMode = (SeekMode) seekMode;
688                             }
689                         }
690                     }
691 
692                     if (mContinuation != NULL) {
693                         CHECK_GT(mContinuationCounter, 0);
694                         if (--mContinuationCounter == 0) {
695                             mContinuation->post();
696                         }
697                         ALOGV("%zu fetcher(s) left", mContinuationCounter);
698                     }
699                     break;
700                 }
701 
702                 case PlaylistFetcher::kWhatDurationUpdate:
703                 {
704                     AString uri;
705                     CHECK(msg->findString("uri", &uri));
706 
707                     int64_t durationUs;
708                     CHECK(msg->findInt64("durationUs", &durationUs));
709 
710                     ssize_t index = mFetcherInfos.indexOfKey(uri);
711                     if (index >= 0) {
712                         FetcherInfo *info = &mFetcherInfos.editValueFor(uri);
713                         info->mDurationUs = durationUs;
714                     }
715                     break;
716                 }
717 
718                 case PlaylistFetcher::kWhatTargetDurationUpdate:
719                 {
720                     int64_t targetDurationUs;
721                     CHECK(msg->findInt64("targetDurationUs", &targetDurationUs));
722                     mUpSwitchMark = min(kUpSwitchMarkUs, targetDurationUs * 7 / 4);
723                     mDownSwitchMark = min(kDownSwitchMarkUs, targetDurationUs * 9 / 4);
724                     mUpSwitchMargin = min(kUpSwitchMarginUs, targetDurationUs);
725                     break;
726                 }
727 
728                 case PlaylistFetcher::kWhatError:
729                 {
730                     status_t err;
731                     CHECK(msg->findInt32("err", &err));
732 
733                     ALOGE("XXX Received error %d from PlaylistFetcher.", err);
734 
735                     // handle EOS on subtitle tracks independently
736                     AString uri;
737                     if (err == ERROR_END_OF_STREAM && msg->findString("uri", &uri)) {
738                         ssize_t i = mFetcherInfos.indexOfKey(uri);
739                         if (i >= 0) {
740                             const sp<PlaylistFetcher> &fetcher = mFetcherInfos.valueAt(i).mFetcher;
741                             if (fetcher != NULL) {
742                                 uint32_t type = fetcher->getStreamTypeMask();
743                                 if (type == STREAMTYPE_SUBTITLES) {
744                                     mPacketSources.valueFor(
745                                             STREAMTYPE_SUBTITLES)->signalEOS(err);;
746                                     break;
747                                 }
748                             }
749                         }
750                     }
751 
752                     // remember the failure index (as mCurBandwidthIndex will be restored
753                     // after cancelBandwidthSwitch()), and record last fail time
754                     size_t failureIndex = mCurBandwidthIndex;
755                     mBandwidthItems.editItemAt(
756                             failureIndex).mLastFailureUs = ALooper::GetNowUs();
757 
758                     if (mSwitchInProgress) {
759                         // if error happened when we switch to a variant, try fallback
760                         // to other variant to save the session
761                         if (tryBandwidthFallback()) {
762                             break;
763                         }
764                     }
765 
766                     if (mInPreparationPhase) {
767                         postPrepared(err);
768                     }
769 
770                     cancelBandwidthSwitch();
771 
772                     mPacketSources.valueFor(STREAMTYPE_AUDIO)->signalEOS(err);
773 
774                     mPacketSources.valueFor(STREAMTYPE_VIDEO)->signalEOS(err);
775 
776                     mPacketSources.valueFor(
777                             STREAMTYPE_SUBTITLES)->signalEOS(err);
778 
779                     postError(err);
780                     break;
781                 }
782 
783                 case PlaylistFetcher::kWhatStopReached:
784                 {
785                     ALOGV("kWhatStopReached");
786 
787                     AString oldUri;
788                     CHECK(msg->findString("uri", &oldUri));
789 
790                     ssize_t index = mFetcherInfos.indexOfKey(oldUri);
791                     if (index < 0) {
792                         break;
793                     }
794 
795                     tryToFinishBandwidthSwitch(oldUri);
796                     break;
797                 }
798 
799                 case PlaylistFetcher::kWhatStartedAt:
800                 {
801                     int32_t switchGeneration;
802                     CHECK(msg->findInt32("switchGeneration", &switchGeneration));
803 
804                     ALOGV("kWhatStartedAt: switchGen=%d, mSwitchGen=%d",
805                             switchGeneration, mSwitchGeneration);
806 
807                     if (switchGeneration != mSwitchGeneration) {
808                         break;
809                     }
810 
811                     AString uri;
812                     CHECK(msg->findString("uri", &uri));
813 
814                     // mark new fetcher mToBeResumed
815                     ssize_t index = mFetcherInfos.indexOfKey(uri);
816                     if (index >= 0) {
817                         mFetcherInfos.editValueAt(index).mToBeResumed = true;
818                     }
819 
820                     // temporarily disable packet sources to be swapped to prevent
821                     // NuPlayerDecoder from dequeuing while we check progress
822                     for (size_t i = 0; i < mPacketSources.size(); ++i) {
823                         if ((mSwapMask & mPacketSources.keyAt(i))
824                                 && uri == mStreams[i].mNewUri) {
825                             mPacketSources.editValueAt(i)->enable(false);
826                         }
827                     }
828                     bool switchUp = (mCurBandwidthIndex > mOrigBandwidthIndex);
829                     // If switching up, require a cushion bigger than kUnderflowMark
830                     // to avoid buffering immediately after the switch.
831                     // (If we don't have that cushion we'd rather cancel and try again.)
832                     int64_t delayUs = switchUp ? (kUnderflowMarkUs + 1000000ll) : 0;
833                     bool needResumeUntil = false;
834                     sp<AMessage> stopParams = msg;
835                     if (checkSwitchProgress(stopParams, delayUs, &needResumeUntil)) {
836                         // playback time hasn't passed startAt time
837                         if (!needResumeUntil) {
838                             ALOGV("finish switch");
839                             for (size_t i = 0; i < kMaxStreams; ++i) {
840                                 if ((mSwapMask & indexToType(i))
841                                         && uri == mStreams[i].mNewUri) {
842                                     // have to make a copy of mStreams[i].mUri because
843                                     // tryToFinishBandwidthSwitch is modifying mStreams[]
844                                     AString oldURI = mStreams[i].mUri;
845                                     tryToFinishBandwidthSwitch(oldURI);
846                                     break;
847                                 }
848                             }
849                         } else {
850                             // startAt time is after last enqueue time
851                             // Resume fetcher for the original variant; the resumed fetcher should
852                             // continue until the timestamps found in msg, which is stored by the
853                             // new fetcher to indicate where the new variant has started buffering.
854                             ALOGV("finish switch with resumeUntilAsync");
855                             for (size_t i = 0; i < mFetcherInfos.size(); i++) {
856                                 const FetcherInfo &info = mFetcherInfos.valueAt(i);
857                                 if (info.mToBeRemoved) {
858                                     info.mFetcher->resumeUntilAsync(stopParams);
859                                 }
860                             }
861                         }
862                     } else {
863                         // playback time passed startAt time
864                         if (switchUp) {
865                             // if switching up, cancel and retry if condition satisfies again
866                             ALOGV("cancel up switch because we're too late");
867                             cancelBandwidthSwitch(true /* resume */);
868                         } else {
869                             ALOGV("retry down switch at next sample");
870                             resumeFetcher(uri, mSwapMask, -1, true /* newUri */);
871                         }
872                     }
873                     // re-enable all packet sources
874                     for (size_t i = 0; i < mPacketSources.size(); ++i) {
875                         mPacketSources.editValueAt(i)->enable(true);
876                     }
877 
878                     break;
879                 }
880 
881                 case PlaylistFetcher::kWhatPlaylistFetched:
882                 {
883                     onMasterPlaylistFetched(msg);
884                     break;
885                 }
886 
887                 case PlaylistFetcher::kWhatMetadataDetected:
888                 {
889                     if (!mHasMetadata) {
890                         mHasMetadata = true;
891                         sp<AMessage> notify = mNotify->dup();
892                         notify->setInt32("what", kWhatMetadataDetected);
893                         notify->post();
894                     }
895                     break;
896                 }
897 
898                 default:
899                     TRESPASS();
900             }
901 
902             break;
903         }
904 
905         case kWhatChangeConfiguration:
906         {
907             onChangeConfiguration(msg);
908             break;
909         }
910 
911         case kWhatChangeConfiguration2:
912         {
913             onChangeConfiguration2(msg);
914             break;
915         }
916 
917         case kWhatChangeConfiguration3:
918         {
919             onChangeConfiguration3(msg);
920             break;
921         }
922 
923         case kWhatPollBuffering:
924         {
925             int32_t generation;
926             CHECK(msg->findInt32("generation", &generation));
927             if (generation == mPollBufferingGeneration) {
928                 onPollBuffering();
929             }
930             break;
931         }
932 
933         default:
934             TRESPASS();
935             break;
936     }
937 }
938 
939 // static
isBandwidthValid(const BandwidthItem & item)940 bool LiveSession::isBandwidthValid(const BandwidthItem &item) {
941     static const int64_t kBlacklistWindowUs = 300 * 1000000ll;
942     return item.mLastFailureUs < 0
943             || ALooper::GetNowUs() - item.mLastFailureUs > kBlacklistWindowUs;
944 }
945 
946 // static
SortByBandwidth(const BandwidthItem * a,const BandwidthItem * b)947 int LiveSession::SortByBandwidth(const BandwidthItem *a, const BandwidthItem *b) {
948     if (a->mBandwidth < b->mBandwidth) {
949         return -1;
950     } else if (a->mBandwidth == b->mBandwidth) {
951         return 0;
952     }
953 
954     return 1;
955 }
956 
957 // static
indexToType(int idx)958 LiveSession::StreamType LiveSession::indexToType(int idx) {
959     CHECK(idx >= 0 && idx < kNumSources);
960     return (StreamType)(1 << idx);
961 }
962 
963 // static
typeToIndex(int32_t type)964 ssize_t LiveSession::typeToIndex(int32_t type) {
965     switch (type) {
966         case STREAMTYPE_AUDIO:
967             return 0;
968         case STREAMTYPE_VIDEO:
969             return 1;
970         case STREAMTYPE_SUBTITLES:
971             return 2;
972         case STREAMTYPE_METADATA:
973             return 3;
974         default:
975             return -1;
976     };
977     return -1;
978 }
979 
onConnect(const sp<AMessage> & msg)980 void LiveSession::onConnect(const sp<AMessage> &msg) {
981     CHECK(msg->findString("url", &mMasterURL));
982 
983     // TODO currently we don't know if we are coming here from incognito mode
984     ALOGI("onConnect %s", uriDebugString(mMasterURL).c_str());
985 
986     KeyedVector<String8, String8> *headers = NULL;
987     if (!msg->findPointer("headers", (void **)&headers)) {
988         mExtraHeaders.clear();
989     } else {
990         mExtraHeaders = *headers;
991 
992         delete headers;
993         headers = NULL;
994     }
995 
996     // create looper for fetchers
997     if (mFetcherLooper == NULL) {
998         mFetcherLooper = new ALooper();
999 
1000         mFetcherLooper->setName("Fetcher");
1001         mFetcherLooper->start(false, false);
1002     }
1003 
1004     // create fetcher to fetch the master playlist
1005     addFetcher(mMasterURL.c_str())->fetchPlaylistAsync();
1006 }
1007 
onMasterPlaylistFetched(const sp<AMessage> & msg)1008 void LiveSession::onMasterPlaylistFetched(const sp<AMessage> &msg) {
1009     AString uri;
1010     CHECK(msg->findString("uri", &uri));
1011     ssize_t index = mFetcherInfos.indexOfKey(uri);
1012     if (index < 0) {
1013         ALOGW("fetcher for master playlist is gone.");
1014         return;
1015     }
1016 
1017     // no longer useful, remove
1018     mFetcherLooper->unregisterHandler(mFetcherInfos[index].mFetcher->id());
1019     mFetcherInfos.removeItemsAt(index);
1020 
1021     CHECK(msg->findObject("playlist", (sp<RefBase> *)&mPlaylist));
1022     if (mPlaylist == NULL) {
1023         ALOGE("unable to fetch master playlist %s.",
1024                 uriDebugString(mMasterURL).c_str());
1025 
1026         postPrepared(ERROR_IO);
1027         return;
1028     }
1029     // We trust the content provider to make a reasonable choice of preferred
1030     // initial bandwidth by listing it first in the variant playlist.
1031     // At startup we really don't have a good estimate on the available
1032     // network bandwidth since we haven't tranferred any data yet. Once
1033     // we have we can make a better informed choice.
1034     size_t initialBandwidth = 0;
1035     size_t initialBandwidthIndex = 0;
1036 
1037     int32_t maxWidth = 0;
1038     int32_t maxHeight = 0;
1039 
1040     if (mPlaylist->isVariantPlaylist()) {
1041         Vector<BandwidthItem> itemsWithVideo;
1042         for (size_t i = 0; i < mPlaylist->size(); ++i) {
1043             BandwidthItem item;
1044 
1045             item.mPlaylistIndex = i;
1046             item.mLastFailureUs = -1ll;
1047 
1048             sp<AMessage> meta;
1049             AString uri;
1050             mPlaylist->itemAt(i, &uri, &meta);
1051 
1052             CHECK(meta->findInt32("bandwidth", (int32_t *)&item.mBandwidth));
1053 
1054             int32_t width, height;
1055             if (meta->findInt32("width", &width)) {
1056                 maxWidth = max(maxWidth, width);
1057             }
1058             if (meta->findInt32("height", &height)) {
1059                 maxHeight = max(maxHeight, height);
1060             }
1061 
1062             mBandwidthItems.push(item);
1063             if (mPlaylist->hasType(i, "video")) {
1064                 itemsWithVideo.push(item);
1065             }
1066         }
1067         // remove the audio-only variants if we have at least one with video
1068         if (!itemsWithVideo.empty()
1069                 && itemsWithVideo.size() < mBandwidthItems.size()) {
1070             mBandwidthItems.clear();
1071             for (size_t i = 0; i < itemsWithVideo.size(); ++i) {
1072                 mBandwidthItems.push(itemsWithVideo[i]);
1073             }
1074         }
1075 
1076         CHECK_GT(mBandwidthItems.size(), 0u);
1077         initialBandwidth = mBandwidthItems[0].mBandwidth;
1078 
1079         mBandwidthItems.sort(SortByBandwidth);
1080 
1081         for (size_t i = 0; i < mBandwidthItems.size(); ++i) {
1082             if (mBandwidthItems.itemAt(i).mBandwidth == initialBandwidth) {
1083                 initialBandwidthIndex = i;
1084                 break;
1085             }
1086         }
1087     } else {
1088         // dummy item.
1089         BandwidthItem item;
1090         item.mPlaylistIndex = 0;
1091         item.mBandwidth = 0;
1092         mBandwidthItems.push(item);
1093     }
1094 
1095     mMaxWidth = maxWidth > 0 ? maxWidth : mMaxWidth;
1096     mMaxHeight = maxHeight > 0 ? maxHeight : mMaxHeight;
1097 
1098     mPlaylist->pickRandomMediaItems();
1099     changeConfiguration(
1100             0ll /* timeUs */, initialBandwidthIndex, false /* pickTrack */);
1101 }
1102 
finishDisconnect()1103 void LiveSession::finishDisconnect() {
1104     ALOGV("finishDisconnect");
1105 
1106     // No reconfiguration is currently pending, make sure none will trigger
1107     // during disconnection either.
1108     cancelBandwidthSwitch();
1109 
1110     // cancel buffer polling
1111     cancelPollBuffering();
1112 
1113     // TRICKY: don't wait for all fetcher to be stopped when disconnecting
1114     //
1115     // Some fetchers might be stuck in connect/getSize at this point. These
1116     // operations will eventually timeout (as we have a timeout set in
1117     // MediaHTTPConnection), but we don't want to block the main UI thread
1118     // until then. Here we just need to make sure we clear all references
1119     // to the fetchers, so that when they finally exit from the blocking
1120     // operation, they can be destructed.
1121     //
1122     // There is one very tricky point though. For this scheme to work, the
1123     // fecther must hold a reference to LiveSession, so that LiveSession is
1124     // destroyed after fetcher. Otherwise LiveSession would get stuck in its
1125     // own destructor when it waits for mFetcherLooper to stop, which still
1126     // blocks main UI thread.
1127     for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
1128         mFetcherInfos.valueAt(i).mFetcher->stopAsync();
1129         mFetcherLooper->unregisterHandler(
1130                 mFetcherInfos.valueAt(i).mFetcher->id());
1131     }
1132     mFetcherInfos.clear();
1133 
1134     mPacketSources.valueFor(STREAMTYPE_AUDIO)->signalEOS(ERROR_END_OF_STREAM);
1135     mPacketSources.valueFor(STREAMTYPE_VIDEO)->signalEOS(ERROR_END_OF_STREAM);
1136 
1137     mPacketSources.valueFor(
1138             STREAMTYPE_SUBTITLES)->signalEOS(ERROR_END_OF_STREAM);
1139 
1140     sp<AMessage> response = new AMessage;
1141     response->setInt32("err", OK);
1142 
1143     response->postReply(mDisconnectReplyID);
1144     mDisconnectReplyID.clear();
1145 }
1146 
addFetcher(const char * uri)1147 sp<PlaylistFetcher> LiveSession::addFetcher(const char *uri) {
1148     ssize_t index = mFetcherInfos.indexOfKey(uri);
1149 
1150     if (index >= 0) {
1151         return NULL;
1152     }
1153 
1154     sp<AMessage> notify = new AMessage(kWhatFetcherNotify, this);
1155     notify->setString("uri", uri);
1156     notify->setInt32("switchGeneration", mSwitchGeneration);
1157 
1158     FetcherInfo info;
1159     info.mFetcher = new PlaylistFetcher(
1160             notify, this, uri, mCurBandwidthIndex, mSubtitleGeneration);
1161     info.mDurationUs = -1ll;
1162     info.mToBeRemoved = false;
1163     info.mToBeResumed = false;
1164     mFetcherLooper->registerHandler(info.mFetcher);
1165 
1166     mFetcherInfos.add(uri, info);
1167 
1168     return info.mFetcher;
1169 }
1170 
1171 #if 0
1172 static double uniformRand() {
1173     return (double)rand() / RAND_MAX;
1174 }
1175 #endif
1176 
UriIsSameAsIndex(const AString & uri,int32_t i,bool newUri)1177 bool LiveSession::UriIsSameAsIndex(const AString &uri, int32_t i, bool newUri) {
1178     ALOGI("[timed_id3] i %d UriIsSameAsIndex newUri %s, %s", i,
1179             newUri ? "true" : "false",
1180             newUri ? mStreams[i].mNewUri.c_str() : mStreams[i].mUri.c_str());
1181     return i >= 0
1182             && ((!newUri && uri == mStreams[i].mUri)
1183             || (newUri && uri == mStreams[i].mNewUri));
1184 }
1185 
getPacketSourceForStreamIndex(size_t trackIndex,bool newUri)1186 sp<AnotherPacketSource> LiveSession::getPacketSourceForStreamIndex(
1187         size_t trackIndex, bool newUri) {
1188     StreamType type = indexToType(trackIndex);
1189     sp<AnotherPacketSource> source = NULL;
1190     if (newUri) {
1191         source = mPacketSources2.valueFor(type);
1192         source->clear();
1193     } else {
1194         source = mPacketSources.valueFor(type);
1195     };
1196     return source;
1197 }
1198 
getMetadataSource(sp<AnotherPacketSource> sources[kNumSources],uint32_t streamMask,bool newUri)1199 sp<AnotherPacketSource> LiveSession::getMetadataSource(
1200         sp<AnotherPacketSource> sources[kNumSources], uint32_t streamMask, bool newUri) {
1201     // todo: One case where the following strategy can fail is when audio and video
1202     // are in separate playlists, both are transport streams, and the metadata
1203     // is actually contained in the audio stream.
1204     ALOGV("[timed_id3] getMetadataSourceForUri streamMask %x newUri %s",
1205             streamMask, newUri ? "true" : "false");
1206 
1207     if ((sources[kVideoIndex] != NULL) // video fetcher; or ...
1208             || (!(streamMask & STREAMTYPE_VIDEO) && sources[kAudioIndex] != NULL)) {
1209             // ... audio fetcher for audio only variant
1210         return getPacketSourceForStreamIndex(kMetaDataIndex, newUri);
1211     }
1212 
1213     return NULL;
1214 }
1215 
resumeFetcher(const AString & uri,uint32_t streamMask,int64_t timeUs,bool newUri)1216 bool LiveSession::resumeFetcher(
1217         const AString &uri, uint32_t streamMask, int64_t timeUs, bool newUri) {
1218     ssize_t index = mFetcherInfos.indexOfKey(uri);
1219     if (index < 0) {
1220         ALOGE("did not find fetcher for uri: %s", uri.c_str());
1221         return false;
1222     }
1223 
1224     bool resume = false;
1225     sp<AnotherPacketSource> sources[kNumSources];
1226     for (size_t i = 0; i < kMaxStreams; ++i) {
1227         if ((streamMask & indexToType(i)) && UriIsSameAsIndex(uri, i, newUri)) {
1228             resume = true;
1229             sources[i] = getPacketSourceForStreamIndex(i, newUri);
1230         }
1231     }
1232 
1233     if (resume) {
1234         sp<PlaylistFetcher> &fetcher = mFetcherInfos.editValueAt(index).mFetcher;
1235         SeekMode seekMode = newUri ? kSeekModeNextSample : kSeekModeExactPosition;
1236 
1237         ALOGV("resuming fetcher-%d, timeUs=%lld, seekMode=%d",
1238                 fetcher->getFetcherID(), (long long)timeUs, seekMode);
1239 
1240         fetcher->startAsync(
1241                 sources[kAudioIndex],
1242                 sources[kVideoIndex],
1243                 sources[kSubtitleIndex],
1244                 getMetadataSource(sources, streamMask, newUri),
1245                 timeUs, -1, -1, seekMode);
1246     }
1247 
1248     return resume;
1249 }
1250 
getAbortThreshold(ssize_t currentBWIndex,ssize_t targetBWIndex) const1251 float LiveSession::getAbortThreshold(
1252         ssize_t currentBWIndex, ssize_t targetBWIndex) const {
1253     float abortThreshold = -1.0f;
1254     if (currentBWIndex > 0 && targetBWIndex < currentBWIndex) {
1255         /*
1256            If we're switching down, we need to decide whether to
1257 
1258            1) finish last segment of high-bandwidth variant, or
1259            2) abort last segment of high-bandwidth variant, and fetch an
1260               overlapping portion from low-bandwidth variant.
1261 
1262            Here we try to maximize the amount of buffer left when the
1263            switch point is met. Given the following parameters:
1264 
1265            B: our current buffering level in seconds
1266            T: target duration in seconds
1267            X: sample duration in seconds remain to fetch in last segment
1268            bw0: bandwidth of old variant (as specified in playlist)
1269            bw1: bandwidth of new variant (as specified in playlist)
1270            bw: measured bandwidth available
1271 
1272            If we choose 1), when switch happens at the end of current
1273            segment, our buffering will be
1274                   B + X - X * bw0 / bw
1275 
1276            If we choose 2), when switch happens where we aborted current
1277            segment, our buffering will be
1278                   B - (T - X) * bw1 / bw
1279 
1280            We should only choose 1) if
1281                   X/T < bw1 / (bw1 + bw0 - bw)
1282         */
1283 
1284         // abort old bandwidth immediately if bandwidth is fluctuating a lot.
1285         // our estimate could be far off, and fetching old bandwidth could
1286         // take too long.
1287         if (!mLastBandwidthStable) {
1288             return 0.0f;
1289         }
1290 
1291         // Taking the measured current bandwidth at 50% face value only,
1292         // as our bandwidth estimation is a lagging indicator. Being
1293         // conservative on this, we prefer switching to lower bandwidth
1294         // unless we're really confident finishing up the last segment
1295         // of higher bandwidth will be fast.
1296         CHECK(mLastBandwidthBps >= 0);
1297         abortThreshold =
1298                 (float)mBandwidthItems.itemAt(targetBWIndex).mBandwidth
1299              / ((float)mBandwidthItems.itemAt(targetBWIndex).mBandwidth
1300               + (float)mBandwidthItems.itemAt(currentBWIndex).mBandwidth
1301               - (float)mLastBandwidthBps * 0.5f);
1302         if (abortThreshold < 0.0f) {
1303             abortThreshold = -1.0f; // do not abort
1304         }
1305         ALOGV("Switching Down: bps %ld => %ld, measured %d, abort ratio %.2f",
1306                 mBandwidthItems.itemAt(currentBWIndex).mBandwidth,
1307                 mBandwidthItems.itemAt(targetBWIndex).mBandwidth,
1308                 mLastBandwidthBps,
1309                 abortThreshold);
1310     }
1311     return abortThreshold;
1312 }
1313 
addBandwidthMeasurement(size_t numBytes,int64_t delayUs)1314 void LiveSession::addBandwidthMeasurement(size_t numBytes, int64_t delayUs) {
1315     mBandwidthEstimator->addBandwidthMeasurement(numBytes, delayUs);
1316 }
1317 
getLowestValidBandwidthIndex() const1318 ssize_t LiveSession::getLowestValidBandwidthIndex() const {
1319     for (size_t index = 0; index < mBandwidthItems.size(); index++) {
1320         if (isBandwidthValid(mBandwidthItems[index])) {
1321             return index;
1322         }
1323     }
1324     // if playlists are all blacklisted, return 0 and hope it's alive
1325     return 0;
1326 }
1327 
getBandwidthIndex(int32_t bandwidthBps)1328 size_t LiveSession::getBandwidthIndex(int32_t bandwidthBps) {
1329     if (mBandwidthItems.size() < 2) {
1330         // shouldn't be here if we only have 1 bandwidth, check
1331         // logic to get rid of redundant bandwidth polling
1332         ALOGW("getBandwidthIndex() called for single bandwidth playlist!");
1333         return 0;
1334     }
1335 
1336 #if 1
1337     char value[PROPERTY_VALUE_MAX];
1338     ssize_t index = -1;
1339     if (property_get("media.httplive.bw-index", value, NULL)) {
1340         char *end;
1341         index = strtol(value, &end, 10);
1342         CHECK(end > value && *end == '\0');
1343 
1344         if (index >= 0 && (size_t)index >= mBandwidthItems.size()) {
1345             index = mBandwidthItems.size() - 1;
1346         }
1347     }
1348 
1349     if (index < 0) {
1350         char value[PROPERTY_VALUE_MAX];
1351         if (property_get("media.httplive.max-bw", value, NULL)) {
1352             char *end;
1353             long maxBw = strtoul(value, &end, 10);
1354             if (end > value && *end == '\0') {
1355                 if (maxBw > 0 && bandwidthBps > maxBw) {
1356                     ALOGV("bandwidth capped to %ld bps", maxBw);
1357                     bandwidthBps = maxBw;
1358                 }
1359             }
1360         }
1361 
1362         // Pick the highest bandwidth stream that's not currently blacklisted
1363         // below or equal to estimated bandwidth.
1364 
1365         index = mBandwidthItems.size() - 1;
1366         ssize_t lowestBandwidth = getLowestValidBandwidthIndex();
1367         while (index > lowestBandwidth) {
1368             // be conservative (70%) to avoid overestimating and immediately
1369             // switching down again.
1370             size_t adjustedBandwidthBps = bandwidthBps * 7 / 10;
1371             const BandwidthItem &item = mBandwidthItems[index];
1372             if (item.mBandwidth <= adjustedBandwidthBps
1373                     && isBandwidthValid(item)) {
1374                 break;
1375             }
1376             --index;
1377         }
1378     }
1379 #elif 0
1380     // Change bandwidth at random()
1381     size_t index = uniformRand() * mBandwidthItems.size();
1382 #elif 0
1383     // There's a 50% chance to stay on the current bandwidth and
1384     // a 50% chance to switch to the next higher bandwidth (wrapping around
1385     // to lowest)
1386     const size_t kMinIndex = 0;
1387 
1388     static ssize_t mCurBandwidthIndex = -1;
1389 
1390     size_t index;
1391     if (mCurBandwidthIndex < 0) {
1392         index = kMinIndex;
1393     } else if (uniformRand() < 0.5) {
1394         index = (size_t)mCurBandwidthIndex;
1395     } else {
1396         index = mCurBandwidthIndex + 1;
1397         if (index == mBandwidthItems.size()) {
1398             index = kMinIndex;
1399         }
1400     }
1401     mCurBandwidthIndex = index;
1402 #elif 0
1403     // Pick the highest bandwidth stream below or equal to 1.2 Mbit/sec
1404 
1405     size_t index = mBandwidthItems.size() - 1;
1406     while (index > 0 && mBandwidthItems.itemAt(index).mBandwidth > 1200000) {
1407         --index;
1408     }
1409 #elif 1
1410     char value[PROPERTY_VALUE_MAX];
1411     size_t index;
1412     if (property_get("media.httplive.bw-index", value, NULL)) {
1413         char *end;
1414         index = strtoul(value, &end, 10);
1415         CHECK(end > value && *end == '\0');
1416 
1417         if (index >= mBandwidthItems.size()) {
1418             index = mBandwidthItems.size() - 1;
1419         }
1420     } else {
1421         index = 0;
1422     }
1423 #else
1424     size_t index = mBandwidthItems.size() - 1;  // Highest bandwidth stream
1425 #endif
1426 
1427     CHECK_GE(index, 0);
1428 
1429     return index;
1430 }
1431 
latestMediaSegmentStartTime() const1432 HLSTime LiveSession::latestMediaSegmentStartTime() const {
1433     HLSTime audioTime(mPacketSources.valueFor(
1434                     STREAMTYPE_AUDIO)->getLatestDequeuedMeta());
1435 
1436     HLSTime videoTime(mPacketSources.valueFor(
1437                     STREAMTYPE_VIDEO)->getLatestDequeuedMeta());
1438 
1439     return audioTime < videoTime ? videoTime : audioTime;
1440 }
1441 
onSeek(const sp<AMessage> & msg)1442 void LiveSession::onSeek(const sp<AMessage> &msg) {
1443     int64_t timeUs;
1444     CHECK(msg->findInt64("timeUs", &timeUs));
1445     changeConfiguration(timeUs);
1446 }
1447 
getDuration(int64_t * durationUs) const1448 status_t LiveSession::getDuration(int64_t *durationUs) const {
1449     int64_t maxDurationUs = -1ll;
1450     for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
1451         int64_t fetcherDurationUs = mFetcherInfos.valueAt(i).mDurationUs;
1452 
1453         if (fetcherDurationUs > maxDurationUs) {
1454             maxDurationUs = fetcherDurationUs;
1455         }
1456     }
1457 
1458     *durationUs = maxDurationUs;
1459 
1460     return OK;
1461 }
1462 
isSeekable() const1463 bool LiveSession::isSeekable() const {
1464     int64_t durationUs;
1465     return getDuration(&durationUs) == OK && durationUs >= 0;
1466 }
1467 
hasDynamicDuration() const1468 bool LiveSession::hasDynamicDuration() const {
1469     return false;
1470 }
1471 
getTrackCount() const1472 size_t LiveSession::getTrackCount() const {
1473     if (mPlaylist == NULL) {
1474         return 0;
1475     } else {
1476         return mPlaylist->getTrackCount() + (mHasMetadata ? 1 : 0);
1477     }
1478 }
1479 
getTrackInfo(size_t trackIndex) const1480 sp<AMessage> LiveSession::getTrackInfo(size_t trackIndex) const {
1481     if (mPlaylist == NULL) {
1482         return NULL;
1483     } else {
1484         if (trackIndex == mPlaylist->getTrackCount() && mHasMetadata) {
1485             sp<AMessage> format = new AMessage();
1486             format->setInt32("type", MEDIA_TRACK_TYPE_METADATA);
1487             format->setString("language", "und");
1488             format->setString("mime", MEDIA_MIMETYPE_DATA_TIMED_ID3);
1489             return format;
1490         }
1491         return mPlaylist->getTrackInfo(trackIndex);
1492     }
1493 }
1494 
selectTrack(size_t index,bool select)1495 status_t LiveSession::selectTrack(size_t index, bool select) {
1496     if (mPlaylist == NULL) {
1497         return INVALID_OPERATION;
1498     }
1499 
1500     ALOGV("selectTrack: index=%zu, select=%d, mSubtitleGen=%d++",
1501             index, select, mSubtitleGeneration);
1502 
1503     ++mSubtitleGeneration;
1504     status_t err = mPlaylist->selectTrack(index, select);
1505     if (err == OK) {
1506         sp<AMessage> msg = new AMessage(kWhatChangeConfiguration, this);
1507         msg->setInt32("pickTrack", select);
1508         msg->post();
1509     }
1510     return err;
1511 }
1512 
getSelectedTrack(media_track_type type) const1513 ssize_t LiveSession::getSelectedTrack(media_track_type type) const {
1514     if (mPlaylist == NULL) {
1515         return -1;
1516     } else {
1517         return mPlaylist->getSelectedTrack(type);
1518     }
1519 }
1520 
changeConfiguration(int64_t timeUs,ssize_t bandwidthIndex,bool pickTrack)1521 void LiveSession::changeConfiguration(
1522         int64_t timeUs, ssize_t bandwidthIndex, bool pickTrack) {
1523     ALOGV("changeConfiguration: timeUs=%lld us, bwIndex=%zd, pickTrack=%d",
1524           (long long)timeUs, bandwidthIndex, pickTrack);
1525 
1526     cancelBandwidthSwitch();
1527 
1528     CHECK(!mReconfigurationInProgress);
1529     mReconfigurationInProgress = true;
1530     if (bandwidthIndex >= 0) {
1531         mOrigBandwidthIndex = mCurBandwidthIndex;
1532         mCurBandwidthIndex = bandwidthIndex;
1533         if (mOrigBandwidthIndex != mCurBandwidthIndex) {
1534             ALOGI("#### Starting Bandwidth Switch: %zd => %zd",
1535                     mOrigBandwidthIndex, mCurBandwidthIndex);
1536         }
1537     }
1538     CHECK_LT(mCurBandwidthIndex, mBandwidthItems.size());
1539     const BandwidthItem &item = mBandwidthItems.itemAt(mCurBandwidthIndex);
1540 
1541     uint32_t streamMask = 0; // streams that should be fetched by the new fetcher
1542     uint32_t resumeMask = 0; // streams that should be fetched by the original fetcher
1543 
1544     AString URIs[kMaxStreams];
1545     for (size_t i = 0; i < kMaxStreams; ++i) {
1546         if (mPlaylist->getTypeURI(item.mPlaylistIndex, mStreams[i].mType, &URIs[i])) {
1547             streamMask |= indexToType(i);
1548         }
1549     }
1550 
1551     // Step 1, stop and discard fetchers that are no longer needed.
1552     // Pause those that we'll reuse.
1553     for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
1554         // skip fetchers that are marked mToBeRemoved,
1555         // these are done and can't be reused
1556         if (mFetcherInfos[i].mToBeRemoved) {
1557             continue;
1558         }
1559 
1560         const AString &uri = mFetcherInfos.keyAt(i);
1561         sp<PlaylistFetcher> &fetcher = mFetcherInfos.editValueAt(i).mFetcher;
1562 
1563         bool discardFetcher = true, delayRemoval = false;
1564         for (size_t j = 0; j < kMaxStreams; ++j) {
1565             StreamType type = indexToType(j);
1566             if ((streamMask & type) && uri == URIs[j]) {
1567                 resumeMask |= type;
1568                 streamMask &= ~type;
1569                 discardFetcher = false;
1570             }
1571         }
1572         // Delay fetcher removal if not picking tracks, AND old fetcher
1573         // has stream mask that overlaps new variant. (Okay to discard
1574         // old fetcher now, if completely no overlap.)
1575         if (discardFetcher && timeUs < 0ll && !pickTrack
1576                 && (fetcher->getStreamTypeMask() & streamMask)) {
1577             discardFetcher = false;
1578             delayRemoval = true;
1579         }
1580 
1581         if (discardFetcher) {
1582             ALOGV("discarding fetcher-%d", fetcher->getFetcherID());
1583             fetcher->stopAsync();
1584         } else {
1585             float threshold = 0.0f; // default to pause after current block (47Kbytes)
1586             bool disconnect = false;
1587             if (timeUs >= 0ll) {
1588                 // seeking, no need to finish fetching
1589                 disconnect = true;
1590             } else if (delayRemoval) {
1591                 // adapting, abort if remaining of current segment is over threshold
1592                 threshold = getAbortThreshold(
1593                         mOrigBandwidthIndex, mCurBandwidthIndex);
1594             }
1595 
1596             ALOGV("pausing fetcher-%d, threshold=%.2f",
1597                     fetcher->getFetcherID(), threshold);
1598             fetcher->pauseAsync(threshold, disconnect);
1599         }
1600     }
1601 
1602     sp<AMessage> msg;
1603     if (timeUs < 0ll) {
1604         // skip onChangeConfiguration2 (decoder destruction) if not seeking.
1605         msg = new AMessage(kWhatChangeConfiguration3, this);
1606     } else {
1607         msg = new AMessage(kWhatChangeConfiguration2, this);
1608     }
1609     msg->setInt32("streamMask", streamMask);
1610     msg->setInt32("resumeMask", resumeMask);
1611     msg->setInt32("pickTrack", pickTrack);
1612     msg->setInt64("timeUs", timeUs);
1613     for (size_t i = 0; i < kMaxStreams; ++i) {
1614         if ((streamMask | resumeMask) & indexToType(i)) {
1615             msg->setString(mStreams[i].uriKey().c_str(), URIs[i].c_str());
1616         }
1617     }
1618 
1619     // Every time a fetcher acknowledges the stopAsync or pauseAsync request
1620     // we'll decrement mContinuationCounter, once it reaches zero, i.e. all
1621     // fetchers have completed their asynchronous operation, we'll post
1622     // mContinuation, which then is handled below in onChangeConfiguration2.
1623     mContinuationCounter = mFetcherInfos.size();
1624     mContinuation = msg;
1625 
1626     if (mContinuationCounter == 0) {
1627         msg->post();
1628     }
1629 }
1630 
onChangeConfiguration(const sp<AMessage> & msg)1631 void LiveSession::onChangeConfiguration(const sp<AMessage> &msg) {
1632     ALOGV("onChangeConfiguration");
1633 
1634     if (!mReconfigurationInProgress) {
1635         int32_t pickTrack = 0;
1636         msg->findInt32("pickTrack", &pickTrack);
1637         changeConfiguration(-1ll /* timeUs */, -1, pickTrack);
1638     } else {
1639         msg->post(1000000ll); // retry in 1 sec
1640     }
1641 }
1642 
onChangeConfiguration2(const sp<AMessage> & msg)1643 void LiveSession::onChangeConfiguration2(const sp<AMessage> &msg) {
1644     ALOGV("onChangeConfiguration2");
1645 
1646     mContinuation.clear();
1647 
1648     // All fetchers are either suspended or have been removed now.
1649 
1650     // If we're seeking, clear all packet sources before we report
1651     // seek complete, to prevent decoder from pulling stale data.
1652     int64_t timeUs;
1653     CHECK(msg->findInt64("timeUs", &timeUs));
1654 
1655     if (timeUs >= 0) {
1656         mLastSeekTimeUs = timeUs;
1657         mLastDequeuedTimeUs = timeUs;
1658 
1659         for (size_t i = 0; i < mPacketSources.size(); i++) {
1660             sp<AnotherPacketSource> packetSource = mPacketSources.editValueAt(i);
1661             sp<MetaData> format = packetSource->getFormat();
1662             packetSource->clear();
1663             // Set a tentative format here such that HTTPLiveSource will always have
1664             // a format available when NuPlayer queries. Without an available video
1665             // format when setting a surface NuPlayer might disable video decoding
1666             // altogether. The tentative format will be overwritten by the
1667             // authoritative (and possibly same) format once content from the new
1668             // position is dequeued.
1669             packetSource->setFormat(format);
1670         }
1671 
1672         for (size_t i = 0; i < kMaxStreams; ++i) {
1673             mStreams[i].reset();
1674         }
1675 
1676         mDiscontinuityOffsetTimesUs.clear();
1677         mDiscontinuityAbsStartTimesUs.clear();
1678 
1679         if (mSeekReplyID != NULL) {
1680             CHECK(mSeekReply != NULL);
1681             mSeekReply->setInt32("err", OK);
1682             mSeekReply->postReply(mSeekReplyID);
1683             mSeekReplyID.clear();
1684             mSeekReply.clear();
1685         }
1686 
1687         // restart buffer polling after seek becauese previous
1688         // buffering position is no longer valid.
1689         restartPollBuffering();
1690     }
1691 
1692     uint32_t streamMask, resumeMask;
1693     CHECK(msg->findInt32("streamMask", (int32_t *)&streamMask));
1694     CHECK(msg->findInt32("resumeMask", (int32_t *)&resumeMask));
1695 
1696     streamMask |= resumeMask;
1697 
1698     AString URIs[kMaxStreams];
1699     for (size_t i = 0; i < kMaxStreams; ++i) {
1700         if (streamMask & indexToType(i)) {
1701             const AString &uriKey = mStreams[i].uriKey();
1702             CHECK(msg->findString(uriKey.c_str(), &URIs[i]));
1703             ALOGV("%s = '%s'", uriKey.c_str(), URIs[i].c_str());
1704         }
1705     }
1706 
1707     uint32_t changedMask = 0;
1708     for (size_t i = 0; i < kMaxStreams && i != kSubtitleIndex; ++i) {
1709         // stream URI could change even if onChangeConfiguration2 is only
1710         // used for seek. Seek could happen during a bw switch, in this
1711         // case bw switch will be cancelled, but the seekTo position will
1712         // fetch from the new URI.
1713         if ((mStreamMask & streamMask & indexToType(i))
1714                 && !mStreams[i].mUri.empty()
1715                 && !(URIs[i] == mStreams[i].mUri)) {
1716             ALOGV("stream %zu changed: oldURI %s, newURI %s", i,
1717                     mStreams[i].mUri.c_str(), URIs[i].c_str());
1718             sp<AnotherPacketSource> source = mPacketSources.valueFor(indexToType(i));
1719             if (source->getLatestDequeuedMeta() != NULL) {
1720                 source->queueDiscontinuity(
1721                         ATSParser::DISCONTINUITY_FORMATCHANGE, NULL, true);
1722             }
1723         }
1724         // Determine which decoders to shutdown on the player side,
1725         // a decoder has to be shutdown if its streamtype was active
1726         // before but now longer isn't.
1727         if ((mStreamMask & ~streamMask & indexToType(i))) {
1728             changedMask |= indexToType(i);
1729         }
1730     }
1731 
1732     if (changedMask == 0) {
1733         // If nothing changed as far as the audio/video decoders
1734         // are concerned we can proceed.
1735         onChangeConfiguration3(msg);
1736         return;
1737     }
1738 
1739     // Something changed, inform the player which will shutdown the
1740     // corresponding decoders and will post the reply once that's done.
1741     // Handling the reply will continue executing below in
1742     // onChangeConfiguration3.
1743     sp<AMessage> notify = mNotify->dup();
1744     notify->setInt32("what", kWhatStreamsChanged);
1745     notify->setInt32("changedMask", changedMask);
1746 
1747     msg->setWhat(kWhatChangeConfiguration3);
1748     msg->setTarget(this);
1749 
1750     notify->setMessage("reply", msg);
1751     notify->post();
1752 }
1753 
onChangeConfiguration3(const sp<AMessage> & msg)1754 void LiveSession::onChangeConfiguration3(const sp<AMessage> &msg) {
1755     mContinuation.clear();
1756     // All remaining fetchers are still suspended, the player has shutdown
1757     // any decoders that needed it.
1758 
1759     uint32_t streamMask, resumeMask;
1760     CHECK(msg->findInt32("streamMask", (int32_t *)&streamMask));
1761     CHECK(msg->findInt32("resumeMask", (int32_t *)&resumeMask));
1762 
1763     mNewStreamMask = streamMask | resumeMask;
1764 
1765     int64_t timeUs;
1766     int32_t pickTrack;
1767     bool switching = false;
1768     CHECK(msg->findInt64("timeUs", &timeUs));
1769     CHECK(msg->findInt32("pickTrack", &pickTrack));
1770 
1771     if (timeUs < 0ll) {
1772         if (!pickTrack) {
1773             // mSwapMask contains streams that are in both old and new variant,
1774             // (in mNewStreamMask & mStreamMask) but with different URIs
1775             // (not in resumeMask).
1776             // For example, old variant has video and audio in two separate
1777             // URIs, and new variant has only audio with unchanged URI. mSwapMask
1778             // should be 0 as there is nothing to swap. We only need to stop video,
1779             // and resume audio.
1780             mSwapMask =  mNewStreamMask & mStreamMask & ~resumeMask;
1781             switching = (mSwapMask != 0);
1782         }
1783         mRealTimeBaseUs = ALooper::GetNowUs() - mLastDequeuedTimeUs;
1784     } else {
1785         mRealTimeBaseUs = ALooper::GetNowUs() - timeUs;
1786     }
1787 
1788     ALOGV("onChangeConfiguration3: timeUs=%lld, switching=%d, pickTrack=%d, "
1789             "mStreamMask=0x%x, mNewStreamMask=0x%x, mSwapMask=0x%x",
1790             (long long)timeUs, switching, pickTrack,
1791             mStreamMask, mNewStreamMask, mSwapMask);
1792 
1793     for (size_t i = 0; i < kMaxStreams; ++i) {
1794         if (streamMask & indexToType(i)) {
1795             if (switching) {
1796                 CHECK(msg->findString(mStreams[i].uriKey().c_str(), &mStreams[i].mNewUri));
1797             } else {
1798                 CHECK(msg->findString(mStreams[i].uriKey().c_str(), &mStreams[i].mUri));
1799             }
1800         }
1801     }
1802 
1803     // Of all existing fetchers:
1804     // * Resume fetchers that are still needed and assign them original packet sources.
1805     // * Mark otherwise unneeded fetchers for removal.
1806     ALOGV("resuming fetchers for mask 0x%08x", resumeMask);
1807     for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
1808         const AString &uri = mFetcherInfos.keyAt(i);
1809         if (!resumeFetcher(uri, resumeMask, timeUs)) {
1810             ALOGV("marking fetcher-%d to be removed",
1811                     mFetcherInfos[i].mFetcher->getFetcherID());
1812 
1813             mFetcherInfos.editValueAt(i).mToBeRemoved = true;
1814         }
1815     }
1816 
1817     // streamMask now only contains the types that need a new fetcher created.
1818     if (streamMask != 0) {
1819         ALOGV("creating new fetchers for mask 0x%08x", streamMask);
1820     }
1821 
1822     // Find out when the original fetchers have buffered up to and start the new fetchers
1823     // at a later timestamp.
1824     for (size_t i = 0; i < kMaxStreams; i++) {
1825         if (!(indexToType(i) & streamMask)) {
1826             continue;
1827         }
1828 
1829         AString uri;
1830         uri = switching ? mStreams[i].mNewUri : mStreams[i].mUri;
1831 
1832         sp<PlaylistFetcher> fetcher = addFetcher(uri.c_str());
1833         CHECK(fetcher != NULL);
1834 
1835         HLSTime startTime;
1836         SeekMode seekMode = kSeekModeExactPosition;
1837         sp<AnotherPacketSource> sources[kNumSources];
1838 
1839         if (i == kSubtitleIndex || (!pickTrack && !switching)) {
1840             startTime = latestMediaSegmentStartTime();
1841         }
1842 
1843         // TRICKY: looping from i as earlier streams are already removed from streamMask
1844         for (size_t j = i; j < kMaxStreams; ++j) {
1845             const AString &streamUri = switching ? mStreams[j].mNewUri : mStreams[j].mUri;
1846             if ((streamMask & indexToType(j)) && uri == streamUri) {
1847                 sources[j] = mPacketSources.valueFor(indexToType(j));
1848 
1849                 if (timeUs >= 0) {
1850                     startTime.mTimeUs = timeUs;
1851                 } else {
1852                     int32_t type;
1853                     sp<AMessage> meta;
1854                     if (!switching) {
1855                         // selecting, or adapting but no swap required
1856                         meta = sources[j]->getLatestDequeuedMeta();
1857                     } else {
1858                         // adapting and swap required
1859                         meta = sources[j]->getLatestEnqueuedMeta();
1860                         if (meta != NULL && mCurBandwidthIndex > mOrigBandwidthIndex) {
1861                             // switching up
1862                             meta = sources[j]->getMetaAfterLastDequeued(mUpSwitchMargin);
1863                         }
1864                     }
1865 
1866                     if ((j == kAudioIndex || j == kVideoIndex)
1867                             && meta != NULL && !meta->findInt32("discontinuity", &type)) {
1868                         HLSTime tmpTime(meta);
1869                         if (startTime < tmpTime) {
1870                             startTime = tmpTime;
1871                         }
1872                     }
1873 
1874                     if (!switching) {
1875                         // selecting, or adapting but no swap required
1876                         sources[j]->clear();
1877                         if (j == kSubtitleIndex) {
1878                             break;
1879                         }
1880 
1881                         ALOGV("stream[%zu]: queue format change", j);
1882                         sources[j]->queueDiscontinuity(
1883                                 ATSParser::DISCONTINUITY_FORMAT_ONLY, NULL, true);
1884                     } else {
1885                         // switching, queue discontinuities after resume
1886                         sources[j] = mPacketSources2.valueFor(indexToType(j));
1887                         sources[j]->clear();
1888                         // the new fetcher might be providing streams that used to be
1889                         // provided by two different fetchers,  if one of the fetcher
1890                         // paused in the middle while the other somehow paused in next
1891                         // seg, we have to start from next seg.
1892                         if (seekMode < mStreams[j].mSeekMode) {
1893                             seekMode = mStreams[j].mSeekMode;
1894                         }
1895                     }
1896                 }
1897 
1898                 streamMask &= ~indexToType(j);
1899             }
1900         }
1901 
1902         ALOGV("[fetcher-%d] startAsync: startTimeUs %lld mLastSeekTimeUs %lld "
1903                 "segmentStartTimeUs %lld seekMode %d",
1904                 fetcher->getFetcherID(),
1905                 (long long)startTime.mTimeUs,
1906                 (long long)mLastSeekTimeUs,
1907                 (long long)startTime.getSegmentTimeUs(),
1908                 seekMode);
1909 
1910         // Set the target segment start time to the middle point of the
1911         // segment where the last sample was.
1912         // This gives a better guess if segments of the two variants are not
1913         // perfectly aligned. (If the corresponding segment in new variant
1914         // starts slightly later than that in the old variant, we still want
1915         // to pick that segment, not the one before)
1916         fetcher->startAsync(
1917                 sources[kAudioIndex],
1918                 sources[kVideoIndex],
1919                 sources[kSubtitleIndex],
1920                 getMetadataSource(sources, mNewStreamMask, switching),
1921                 startTime.mTimeUs < 0 ? mLastSeekTimeUs : startTime.mTimeUs,
1922                 startTime.getSegmentTimeUs(),
1923                 startTime.mSeq,
1924                 seekMode);
1925     }
1926 
1927     // All fetchers have now been started, the configuration change
1928     // has completed.
1929 
1930     mReconfigurationInProgress = false;
1931     if (switching) {
1932         mSwitchInProgress = true;
1933     } else {
1934         mStreamMask = mNewStreamMask;
1935         if (mOrigBandwidthIndex != mCurBandwidthIndex) {
1936             ALOGV("#### Finished Bandwidth Switch Early: %zd => %zd",
1937                     mOrigBandwidthIndex, mCurBandwidthIndex);
1938             mOrigBandwidthIndex = mCurBandwidthIndex;
1939         }
1940     }
1941 
1942     ALOGV("onChangeConfiguration3: mSwitchInProgress %d, mStreamMask 0x%x",
1943             mSwitchInProgress, mStreamMask);
1944 
1945     if (mDisconnectReplyID != NULL) {
1946         finishDisconnect();
1947     }
1948 }
1949 
swapPacketSource(StreamType stream)1950 void LiveSession::swapPacketSource(StreamType stream) {
1951     ALOGV("[%s] swapPacketSource", getNameForStream(stream));
1952 
1953     // transfer packets from source2 to source
1954     sp<AnotherPacketSource> &aps = mPacketSources.editValueFor(stream);
1955     sp<AnotherPacketSource> &aps2 = mPacketSources2.editValueFor(stream);
1956 
1957     // queue discontinuity in mPacketSource
1958     aps->queueDiscontinuity(ATSParser::DISCONTINUITY_FORMAT_ONLY, NULL, false);
1959 
1960     // queue packets in mPacketSource2 to mPacketSource
1961     status_t finalResult = OK;
1962     sp<ABuffer> accessUnit;
1963     while (aps2->hasBufferAvailable(&finalResult) && finalResult == OK &&
1964           OK == aps2->dequeueAccessUnit(&accessUnit)) {
1965         aps->queueAccessUnit(accessUnit);
1966     }
1967     aps2->clear();
1968 }
1969 
tryToFinishBandwidthSwitch(const AString & oldUri)1970 void LiveSession::tryToFinishBandwidthSwitch(const AString &oldUri) {
1971     if (!mSwitchInProgress) {
1972         return;
1973     }
1974 
1975     ssize_t index = mFetcherInfos.indexOfKey(oldUri);
1976     if (index < 0 || !mFetcherInfos[index].mToBeRemoved) {
1977         return;
1978     }
1979 
1980     // Swap packet source of streams provided by old variant
1981     for (size_t idx = 0; idx < kMaxStreams; idx++) {
1982         StreamType stream = indexToType(idx);
1983         if ((mSwapMask & stream) && (oldUri == mStreams[idx].mUri)) {
1984             swapPacketSource(stream);
1985 
1986             if ((mNewStreamMask & stream) && mStreams[idx].mNewUri.empty()) {
1987                 ALOGW("swapping stream type %d %s to empty stream",
1988                         stream, mStreams[idx].mUri.c_str());
1989             }
1990             mStreams[idx].mUri = mStreams[idx].mNewUri;
1991             mStreams[idx].mNewUri.clear();
1992 
1993             mSwapMask &= ~stream;
1994         }
1995     }
1996 
1997     mFetcherInfos.editValueAt(index).mFetcher->stopAsync(false /* clear */);
1998 
1999     ALOGV("tryToFinishBandwidthSwitch: mSwapMask=0x%x", mSwapMask);
2000     if (mSwapMask != 0) {
2001         return;
2002     }
2003 
2004     // Check if new variant contains extra streams.
2005     uint32_t extraStreams = mNewStreamMask & (~mStreamMask);
2006     while (extraStreams) {
2007         StreamType stream = (StreamType) (extraStreams & ~(extraStreams - 1));
2008         extraStreams &= ~stream;
2009 
2010         swapPacketSource(stream);
2011 
2012         ssize_t idx = typeToIndex(stream);
2013         CHECK(idx >= 0);
2014         if (mStreams[idx].mNewUri.empty()) {
2015             ALOGW("swapping extra stream type %d %s to empty stream",
2016                     stream, mStreams[idx].mUri.c_str());
2017         }
2018         mStreams[idx].mUri = mStreams[idx].mNewUri;
2019         mStreams[idx].mNewUri.clear();
2020     }
2021 
2022     // Restart new fetcher (it was paused after the first 47k block)
2023     // and let it fetch into mPacketSources (not mPacketSources2)
2024     for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
2025         FetcherInfo &info = mFetcherInfos.editValueAt(i);
2026         if (info.mToBeResumed) {
2027             resumeFetcher(mFetcherInfos.keyAt(i), mNewStreamMask);
2028             info.mToBeResumed = false;
2029         }
2030     }
2031 
2032     ALOGI("#### Finished Bandwidth Switch: %zd => %zd",
2033             mOrigBandwidthIndex, mCurBandwidthIndex);
2034 
2035     mStreamMask = mNewStreamMask;
2036     mSwitchInProgress = false;
2037     mOrigBandwidthIndex = mCurBandwidthIndex;
2038 
2039     restartPollBuffering();
2040 }
2041 
schedulePollBuffering()2042 void LiveSession::schedulePollBuffering() {
2043     sp<AMessage> msg = new AMessage(kWhatPollBuffering, this);
2044     msg->setInt32("generation", mPollBufferingGeneration);
2045     msg->post(1000000ll);
2046 }
2047 
cancelPollBuffering()2048 void LiveSession::cancelPollBuffering() {
2049     ++mPollBufferingGeneration;
2050     mPrevBufferPercentage = -1;
2051 }
2052 
restartPollBuffering()2053 void LiveSession::restartPollBuffering() {
2054     cancelPollBuffering();
2055     onPollBuffering();
2056 }
2057 
onPollBuffering()2058 void LiveSession::onPollBuffering() {
2059     ALOGV("onPollBuffering: mSwitchInProgress %d, mReconfigurationInProgress %d, "
2060             "mInPreparationPhase %d, mCurBandwidthIndex %zd, mStreamMask 0x%x",
2061         mSwitchInProgress, mReconfigurationInProgress,
2062         mInPreparationPhase, mCurBandwidthIndex, mStreamMask);
2063 
2064     bool underflow, ready, down, up;
2065     if (checkBuffering(underflow, ready, down, up)) {
2066         if (mInPreparationPhase) {
2067             // Allow down switch even if we're still preparing.
2068             //
2069             // Some streams have a high bandwidth index as default,
2070             // when bandwidth is low, it takes a long time to buffer
2071             // to ready mark, then it immediately pauses after start
2072             // as we have to do a down switch. It's better experience
2073             // to restart from a lower index, if we detect low bw.
2074             if (!switchBandwidthIfNeeded(false /* up */, down) && ready) {
2075                 postPrepared(OK);
2076             }
2077         }
2078 
2079         if (!mInPreparationPhase) {
2080             if (ready) {
2081                 stopBufferingIfNecessary();
2082             } else if (underflow) {
2083                 startBufferingIfNecessary();
2084             }
2085             switchBandwidthIfNeeded(up, down);
2086         }
2087     }
2088 
2089     schedulePollBuffering();
2090 }
2091 
cancelBandwidthSwitch(bool resume)2092 void LiveSession::cancelBandwidthSwitch(bool resume) {
2093     ALOGV("cancelBandwidthSwitch: mSwitchGen(%d)++, orig %zd, cur %zd",
2094             mSwitchGeneration, mOrigBandwidthIndex, mCurBandwidthIndex);
2095     if (!mSwitchInProgress) {
2096         return;
2097     }
2098 
2099     for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
2100         FetcherInfo& info = mFetcherInfos.editValueAt(i);
2101         if (info.mToBeRemoved) {
2102             info.mToBeRemoved = false;
2103             if (resume) {
2104                 resumeFetcher(mFetcherInfos.keyAt(i), mSwapMask);
2105             }
2106         }
2107     }
2108 
2109     for (size_t i = 0; i < kMaxStreams; ++i) {
2110         AString newUri = mStreams[i].mNewUri;
2111         if (!newUri.empty()) {
2112             // clear all mNewUri matching this newUri
2113             for (size_t j = i; j < kMaxStreams; ++j) {
2114                 if (mStreams[j].mNewUri == newUri) {
2115                     mStreams[j].mNewUri.clear();
2116                 }
2117             }
2118             ALOGV("stopping newUri = %s", newUri.c_str());
2119             ssize_t index = mFetcherInfos.indexOfKey(newUri);
2120             if (index < 0) {
2121                 ALOGE("did not find fetcher for newUri: %s", newUri.c_str());
2122                 continue;
2123             }
2124             FetcherInfo &info = mFetcherInfos.editValueAt(index);
2125             info.mToBeRemoved = true;
2126             info.mFetcher->stopAsync();
2127         }
2128     }
2129 
2130     ALOGI("#### Canceled Bandwidth Switch: %zd => %zd",
2131             mOrigBandwidthIndex, mCurBandwidthIndex);
2132 
2133     mSwitchGeneration++;
2134     mSwitchInProgress = false;
2135     mCurBandwidthIndex = mOrigBandwidthIndex;
2136     mSwapMask = 0;
2137 }
2138 
checkBuffering(bool & underflow,bool & ready,bool & down,bool & up)2139 bool LiveSession::checkBuffering(
2140         bool &underflow, bool &ready, bool &down, bool &up) {
2141     underflow = ready = down = up = false;
2142 
2143     if (mReconfigurationInProgress) {
2144         ALOGV("Switch/Reconfig in progress, defer buffer polling");
2145         return false;
2146     }
2147 
2148     size_t activeCount, underflowCount, readyCount, downCount, upCount;
2149     activeCount = underflowCount = readyCount = downCount = upCount =0;
2150     int32_t minBufferPercent = -1;
2151     int64_t durationUs;
2152     if (getDuration(&durationUs) != OK) {
2153         durationUs = -1;
2154     }
2155     for (size_t i = 0; i < mPacketSources.size(); ++i) {
2156         // we don't check subtitles for buffering level
2157         if (!(mStreamMask & mPacketSources.keyAt(i)
2158                 & (STREAMTYPE_AUDIO | STREAMTYPE_VIDEO))) {
2159             continue;
2160         }
2161         // ignore streams that never had any packet queued.
2162         // (it's possible that the variant only has audio or video)
2163         sp<AMessage> meta = mPacketSources[i]->getLatestEnqueuedMeta();
2164         if (meta == NULL) {
2165             continue;
2166         }
2167 
2168         status_t finalResult;
2169         int64_t bufferedDurationUs =
2170                 mPacketSources[i]->getBufferedDurationUs(&finalResult);
2171         ALOGV("[%s] buffered %lld us",
2172                 getNameForStream(mPacketSources.keyAt(i)),
2173                 (long long)bufferedDurationUs);
2174         if (durationUs >= 0) {
2175             int32_t percent;
2176             if (mPacketSources[i]->isFinished(0 /* duration */)) {
2177                 percent = 100;
2178             } else {
2179                 percent = (int32_t)(100.0 *
2180                         (mLastDequeuedTimeUs + bufferedDurationUs) / durationUs);
2181             }
2182             if (minBufferPercent < 0 || percent < minBufferPercent) {
2183                 minBufferPercent = percent;
2184             }
2185         }
2186 
2187         ++activeCount;
2188         int64_t readyMark = mInPreparationPhase ? kPrepareMarkUs : kReadyMarkUs;
2189         if (bufferedDurationUs > readyMark
2190                 || mPacketSources[i]->isFinished(0)) {
2191             ++readyCount;
2192         }
2193         if (!mPacketSources[i]->isFinished(0)) {
2194             if (bufferedDurationUs < kUnderflowMarkUs) {
2195                 ++underflowCount;
2196             }
2197             if (bufferedDurationUs > mUpSwitchMark) {
2198                 ++upCount;
2199             }
2200             if (bufferedDurationUs < mDownSwitchMark) {
2201                 ++downCount;
2202             }
2203         }
2204     }
2205 
2206     if (minBufferPercent >= 0) {
2207         notifyBufferingUpdate(minBufferPercent);
2208     }
2209 
2210     if (activeCount > 0) {
2211         up        = (upCount == activeCount);
2212         down      = (downCount > 0);
2213         ready     = (readyCount == activeCount);
2214         underflow = (underflowCount > 0);
2215         return true;
2216     }
2217 
2218     return false;
2219 }
2220 
startBufferingIfNecessary()2221 void LiveSession::startBufferingIfNecessary() {
2222     ALOGV("startBufferingIfNecessary: mInPreparationPhase=%d, mBuffering=%d",
2223             mInPreparationPhase, mBuffering);
2224     if (!mBuffering) {
2225         mBuffering = true;
2226 
2227         sp<AMessage> notify = mNotify->dup();
2228         notify->setInt32("what", kWhatBufferingStart);
2229         notify->post();
2230     }
2231 }
2232 
stopBufferingIfNecessary()2233 void LiveSession::stopBufferingIfNecessary() {
2234     ALOGV("stopBufferingIfNecessary: mInPreparationPhase=%d, mBuffering=%d",
2235             mInPreparationPhase, mBuffering);
2236 
2237     if (mBuffering) {
2238         mBuffering = false;
2239 
2240         sp<AMessage> notify = mNotify->dup();
2241         notify->setInt32("what", kWhatBufferingEnd);
2242         notify->post();
2243     }
2244 }
2245 
notifyBufferingUpdate(int32_t percentage)2246 void LiveSession::notifyBufferingUpdate(int32_t percentage) {
2247     if (percentage < mPrevBufferPercentage) {
2248         percentage = mPrevBufferPercentage;
2249     } else if (percentage > 100) {
2250         percentage = 100;
2251     }
2252 
2253     mPrevBufferPercentage = percentage;
2254 
2255     ALOGV("notifyBufferingUpdate: percentage=%d%%", percentage);
2256 
2257     sp<AMessage> notify = mNotify->dup();
2258     notify->setInt32("what", kWhatBufferingUpdate);
2259     notify->setInt32("percentage", percentage);
2260     notify->post();
2261 }
2262 
tryBandwidthFallback()2263 bool LiveSession::tryBandwidthFallback() {
2264     if (mInPreparationPhase || mReconfigurationInProgress) {
2265         // Don't try fallback during prepare or reconfig.
2266         // If error happens there, it's likely unrecoverable.
2267         return false;
2268     }
2269     if (mCurBandwidthIndex > mOrigBandwidthIndex) {
2270         // if we're switching up, simply cancel and resume old variant
2271         cancelBandwidthSwitch(true /* resume */);
2272         return true;
2273     } else {
2274         // if we're switching down, we're likely about to underflow (if
2275         // not already underflowing). try the lowest viable bandwidth if
2276         // not on that variant already.
2277         ssize_t lowestValid = getLowestValidBandwidthIndex();
2278         if (mCurBandwidthIndex > lowestValid) {
2279             cancelBandwidthSwitch();
2280             changeConfiguration(-1ll, lowestValid);
2281             return true;
2282         }
2283     }
2284     // return false if we couldn't find any fallback
2285     return false;
2286 }
2287 
2288 /*
2289  * returns true if a bandwidth switch is actually needed (and started),
2290  * returns false otherwise
2291  */
switchBandwidthIfNeeded(bool bufferHigh,bool bufferLow)2292 bool LiveSession::switchBandwidthIfNeeded(bool bufferHigh, bool bufferLow) {
2293     // no need to check bandwidth if we only have 1 bandwidth settings
2294     if (mBandwidthItems.size() < 2) {
2295         return false;
2296     }
2297 
2298     if (mSwitchInProgress) {
2299         if (mBuffering) {
2300             tryBandwidthFallback();
2301         }
2302         return false;
2303     }
2304 
2305     int32_t bandwidthBps, shortTermBps;
2306     bool isStable;
2307     if (mBandwidthEstimator->estimateBandwidth(
2308             &bandwidthBps, &isStable, &shortTermBps)) {
2309         ALOGV("bandwidth estimated at %.2f kbps, "
2310                 "stable %d, shortTermBps %.2f kbps",
2311                 bandwidthBps / 1024.0f, isStable, shortTermBps / 1024.0f);
2312         mLastBandwidthBps = bandwidthBps;
2313         mLastBandwidthStable = isStable;
2314     } else {
2315         ALOGV("no bandwidth estimate.");
2316         return false;
2317     }
2318 
2319     int32_t curBandwidth = mBandwidthItems.itemAt(mCurBandwidthIndex).mBandwidth;
2320     // canSwithDown and canSwitchUp can't both be true.
2321     // we only want to switch up when measured bw is 120% higher than current variant,
2322     // and we only want to switch down when measured bw is below current variant.
2323     bool canSwitchDown = bufferLow
2324             && (bandwidthBps < (int32_t)curBandwidth);
2325     bool canSwitchUp = bufferHigh
2326             && (bandwidthBps > (int32_t)curBandwidth * 12 / 10);
2327 
2328     if (canSwitchDown || canSwitchUp) {
2329         // bandwidth estimating has some delay, if we have to downswitch when
2330         // it hasn't stabilized, use the short term to guess real bandwidth,
2331         // since it may be dropping too fast.
2332         // (note this doesn't apply to upswitch, always use longer average there)
2333         if (!isStable && canSwitchDown) {
2334             if (shortTermBps < bandwidthBps) {
2335                 bandwidthBps = shortTermBps;
2336             }
2337         }
2338 
2339         ssize_t bandwidthIndex = getBandwidthIndex(bandwidthBps);
2340 
2341         // it's possible that we're checking for canSwitchUp case, but the returned
2342         // bandwidthIndex is < mCurBandwidthIndex, as getBandwidthIndex() only uses 70%
2343         // of measured bw. In that case we don't want to do anything, since we have
2344         // both enough buffer and enough bw.
2345         if ((canSwitchUp && bandwidthIndex > mCurBandwidthIndex)
2346          || (canSwitchDown && bandwidthIndex < mCurBandwidthIndex)) {
2347             // if not yet prepared, just restart again with new bw index.
2348             // this is faster and playback experience is cleaner.
2349             changeConfiguration(
2350                     mInPreparationPhase ? 0 : -1ll, bandwidthIndex);
2351             return true;
2352         }
2353     }
2354     return false;
2355 }
2356 
postError(status_t err)2357 void LiveSession::postError(status_t err) {
2358     // if we reached EOS, notify buffering of 100%
2359     if (err == ERROR_END_OF_STREAM) {
2360         notifyBufferingUpdate(100);
2361     }
2362     // we'll stop buffer polling now, before that notify
2363     // stop buffering to stop the spinning icon
2364     stopBufferingIfNecessary();
2365     cancelPollBuffering();
2366 
2367     sp<AMessage> notify = mNotify->dup();
2368     notify->setInt32("what", kWhatError);
2369     notify->setInt32("err", err);
2370     notify->post();
2371 }
2372 
postPrepared(status_t err)2373 void LiveSession::postPrepared(status_t err) {
2374     CHECK(mInPreparationPhase);
2375 
2376     sp<AMessage> notify = mNotify->dup();
2377     if (err == OK || err == ERROR_END_OF_STREAM) {
2378         notify->setInt32("what", kWhatPrepared);
2379     } else {
2380         cancelPollBuffering();
2381 
2382         notify->setInt32("what", kWhatPreparationFailed);
2383         notify->setInt32("err", err);
2384     }
2385 
2386     notify->post();
2387 
2388     mInPreparationPhase = false;
2389 }
2390 
2391 
2392 }  // namespace android
2393 
2394