1 /*
2  * Copyright (C) 2010 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 //#define LOG_NDEBUG 0
18 #define LOG_TAG "StreamingSource"
19 #include <utils/Log.h>
20 
21 #include "StreamingSource.h"
22 
23 #include "ATSParser.h"
24 #include "AnotherPacketSource.h"
25 #include "NuPlayerStreamListener.h"
26 
27 #include <media/stagefright/foundation/ABuffer.h>
28 #include <media/stagefright/foundation/ADebug.h>
29 #include <media/stagefright/foundation/AMessage.h>
30 #include <media/stagefright/MediaSource.h>
31 #include <media/stagefright/MetaData.h>
32 #include <media/stagefright/Utils.h>
33 
34 namespace android {
35 
// Upper bound on how many listener reads a single onReadBuffer() pass
// performs before it returns control to the looper.
static const int32_t kNumListenerQueuePackets = 80;
37 
StreamingSource(const sp<AMessage> & notify,const sp<IStreamSource> & source)38 NuPlayer::StreamingSource::StreamingSource(
39         const sp<AMessage> &notify,
40         const sp<IStreamSource> &source)
41     : Source(notify),
42       mSource(source),
43       mFinalResult(OK),
44       mBuffering(false) {
45 }
46 
~StreamingSource()47 NuPlayer::StreamingSource::~StreamingSource() {
48     if (mLooper != NULL) {
49         mLooper->unregisterHandler(id());
50         mLooper->stop();
51     }
52 }
53 
prepareAsync()54 void NuPlayer::StreamingSource::prepareAsync() {
55     if (mLooper == NULL) {
56         mLooper = new ALooper;
57         mLooper->setName("streaming");
58         mLooper->start();
59 
60         mLooper->registerHandler(this);
61     }
62 
63     notifyVideoSizeChanged();
64     notifyFlagsChanged(0);
65     notifyPrepared();
66 }
67 
start()68 void NuPlayer::StreamingSource::start() {
69     mStreamListener = new NuPlayerStreamListener(mSource, NULL);
70 
71     uint32_t sourceFlags = mSource->flags();
72 
73     uint32_t parserFlags = ATSParser::TS_TIMESTAMPS_ARE_ABSOLUTE;
74     if (sourceFlags & IStreamSource::kFlagAlignedVideoData) {
75         parserFlags |= ATSParser::ALIGNED_VIDEO_DATA;
76     }
77 
78     mTSParser = new ATSParser(parserFlags);
79 
80     mStreamListener->start();
81 
82     postReadBuffer();
83 }
84 
feedMoreTSData()85 status_t NuPlayer::StreamingSource::feedMoreTSData() {
86     return postReadBuffer();
87 }
88 
onReadBuffer()89 void NuPlayer::StreamingSource::onReadBuffer() {
90     for (int32_t i = 0; i < kNumListenerQueuePackets; ++i) {
91         char buffer[188];
92         sp<AMessage> extra;
93         ssize_t n = mStreamListener->read(buffer, sizeof(buffer), &extra);
94 
95         if (n == 0) {
96             ALOGI("input data EOS reached.");
97             mTSParser->signalEOS(ERROR_END_OF_STREAM);
98             setError(ERROR_END_OF_STREAM);
99             break;
100         } else if (n == INFO_DISCONTINUITY) {
101             int32_t type = ATSParser::DISCONTINUITY_TIME;
102 
103             int32_t mask;
104             if (extra != NULL
105                     && extra->findInt32(
106                         IStreamListener::kKeyDiscontinuityMask, &mask)) {
107                 if (mask == 0) {
108                     ALOGE("Client specified an illegal discontinuity type.");
109                     setError(ERROR_UNSUPPORTED);
110                     break;
111                 }
112 
113                 type = mask;
114             }
115 
116             mTSParser->signalDiscontinuity(
117                     (ATSParser::DiscontinuityType)type, extra);
118         } else if (n < 0) {
119             break;
120         } else {
121             if (buffer[0] == 0x00) {
122                 // XXX legacy
123 
124                 if (extra == NULL) {
125                     extra = new AMessage;
126                 }
127 
128                 uint8_t type = buffer[1];
129 
130                 if (type & 2) {
131                     int64_t mediaTimeUs;
132                     memcpy(&mediaTimeUs, &buffer[2], sizeof(mediaTimeUs));
133 
134                     extra->setInt64(IStreamListener::kKeyMediaTimeUs, mediaTimeUs);
135                 }
136 
137                 mTSParser->signalDiscontinuity(
138                         ((type & 1) == 0)
139                             ? ATSParser::DISCONTINUITY_TIME
140                             : ATSParser::DISCONTINUITY_FORMATCHANGE,
141                         extra);
142             } else {
143                 status_t err = mTSParser->feedTSPacket(buffer, sizeof(buffer));
144 
145                 if (err != OK) {
146                     ALOGE("TS Parser returned error %d", err);
147 
148                     mTSParser->signalEOS(err);
149                     setError(err);
150                     break;
151                 }
152             }
153         }
154     }
155 }
156 
postReadBuffer()157 status_t NuPlayer::StreamingSource::postReadBuffer() {
158     {
159         Mutex::Autolock _l(mBufferingLock);
160         if (mFinalResult != OK) {
161             return mFinalResult;
162         }
163         if (mBuffering) {
164             return OK;
165         }
166         mBuffering = true;
167     }
168 
169     (new AMessage(kWhatReadBuffer, this))->post();
170     return OK;
171 }
172 
haveSufficientDataOnAllTracks()173 bool NuPlayer::StreamingSource::haveSufficientDataOnAllTracks() {
174     // We're going to buffer at least 2 secs worth data on all tracks before
175     // starting playback (both at startup and after a seek).
176 
177     static const int64_t kMinDurationUs = 2000000ll;
178 
179     sp<AnotherPacketSource> audioTrack = getSource(true /*audio*/);
180     sp<AnotherPacketSource> videoTrack = getSource(false /*audio*/);
181 
182     status_t err;
183     int64_t durationUs;
184     if (audioTrack != NULL
185             && (durationUs = audioTrack->getBufferedDurationUs(&err))
186                     < kMinDurationUs
187             && err == OK) {
188         ALOGV("audio track doesn't have enough data yet. (%.2f secs buffered)",
189               durationUs / 1E6);
190         return false;
191     }
192 
193     if (videoTrack != NULL
194             && (durationUs = videoTrack->getBufferedDurationUs(&err))
195                     < kMinDurationUs
196             && err == OK) {
197         ALOGV("video track doesn't have enough data yet. (%.2f secs buffered)",
198               durationUs / 1E6);
199         return false;
200     }
201 
202     return true;
203 }
204 
setError(status_t err)205 void NuPlayer::StreamingSource::setError(status_t err) {
206     Mutex::Autolock _l(mBufferingLock);
207     mFinalResult = err;
208 }
209 
getSource(bool audio)210 sp<AnotherPacketSource> NuPlayer::StreamingSource::getSource(bool audio) {
211     if (mTSParser == NULL) {
212         return NULL;
213     }
214 
215     sp<MediaSource> source = mTSParser->getSource(
216             audio ? ATSParser::AUDIO : ATSParser::VIDEO);
217 
218     return static_cast<AnotherPacketSource *>(source.get());
219 }
220 
getFormat(bool audio)221 sp<AMessage> NuPlayer::StreamingSource::getFormat(bool audio) {
222     sp<AnotherPacketSource> source = getSource(audio);
223 
224     sp<AMessage> format = new AMessage;
225     if (source == NULL) {
226         format->setInt32("err", -EWOULDBLOCK);
227         return format;
228     }
229 
230     sp<MetaData> meta = source->getFormat();
231     status_t err = convertMetaDataToMessage(meta, &format);
232     if (err != OK) {
233         format->setInt32("err", err);
234     }
235     return format;
236 }
237 
dequeueAccessUnit(bool audio,sp<ABuffer> * accessUnit)238 status_t NuPlayer::StreamingSource::dequeueAccessUnit(
239         bool audio, sp<ABuffer> *accessUnit) {
240     sp<AnotherPacketSource> source = getSource(audio);
241 
242     if (source == NULL) {
243         return -EWOULDBLOCK;
244     }
245 
246     if (!haveSufficientDataOnAllTracks()) {
247         postReadBuffer();
248     }
249 
250     status_t finalResult;
251     if (!source->hasBufferAvailable(&finalResult)) {
252         return finalResult == OK ? -EWOULDBLOCK : finalResult;
253     }
254 
255     status_t err = source->dequeueAccessUnit(accessUnit);
256 
257 #if !defined(LOG_NDEBUG) || LOG_NDEBUG == 0
258     if (err == OK) {
259         int64_t timeUs;
260         CHECK((*accessUnit)->meta()->findInt64("timeUs", &timeUs));
261         ALOGV("dequeueAccessUnit timeUs=%lld us", timeUs);
262     }
263 #endif
264 
265     return err;
266 }
267 
isRealTime() const268 bool NuPlayer::StreamingSource::isRealTime() const {
269     return mSource->flags() & IStreamSource::kFlagIsRealTimeData;
270 }
271 
onMessageReceived(const sp<AMessage> & msg)272 void NuPlayer::StreamingSource::onMessageReceived(
273         const sp<AMessage> &msg) {
274     switch (msg->what()) {
275         case kWhatReadBuffer:
276         {
277             onReadBuffer();
278 
279             {
280                 Mutex::Autolock _l(mBufferingLock);
281                 mBuffering = false;
282             }
283             break;
284         }
285         default:
286         {
287             TRESPASS();
288         }
289     }
290 }
291 
292 
293 }  // namespace android
294 
295