1 /*
2 * Copyright (C) 2010 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 //#define LOG_NDEBUG 0
18 #define LOG_TAG "StreamingSource"
19 #include <utils/Log.h>
20
21 #include "StreamingSource.h"
22
23 #include "ATSParser.h"
24 #include "AnotherPacketSource.h"
25 #include "NuPlayerStreamListener.h"
26
27 #include <media/stagefright/foundation/ABuffer.h>
28 #include <media/stagefright/foundation/ADebug.h>
29 #include <media/stagefright/foundation/AMessage.h>
30 #include <media/stagefright/MediaSource.h>
31 #include <media/stagefright/MetaData.h>
32 #include <media/stagefright/Utils.h>
33
34 namespace android {
35
// Upper bound on the number of listener reads performed by a single
// onReadBuffer() pass.
static const int32_t kNumListenerQueuePackets = 80;
37
StreamingSource(const sp<AMessage> & notify,const sp<IStreamSource> & source)38 NuPlayer::StreamingSource::StreamingSource(
39 const sp<AMessage> ¬ify,
40 const sp<IStreamSource> &source)
41 : Source(notify),
42 mSource(source),
43 mFinalResult(OK),
44 mBuffering(false) {
45 }
46
~StreamingSource()47 NuPlayer::StreamingSource::~StreamingSource() {
48 if (mLooper != NULL) {
49 mLooper->unregisterHandler(id());
50 mLooper->stop();
51 }
52 }
53
prepareAsync()54 void NuPlayer::StreamingSource::prepareAsync() {
55 if (mLooper == NULL) {
56 mLooper = new ALooper;
57 mLooper->setName("streaming");
58 mLooper->start();
59
60 mLooper->registerHandler(this);
61 }
62
63 notifyVideoSizeChanged();
64 notifyFlagsChanged(0);
65 notifyPrepared();
66 }
67
start()68 void NuPlayer::StreamingSource::start() {
69 mStreamListener = new NuPlayerStreamListener(mSource, NULL);
70
71 uint32_t sourceFlags = mSource->flags();
72
73 uint32_t parserFlags = ATSParser::TS_TIMESTAMPS_ARE_ABSOLUTE;
74 if (sourceFlags & IStreamSource::kFlagAlignedVideoData) {
75 parserFlags |= ATSParser::ALIGNED_VIDEO_DATA;
76 }
77
78 mTSParser = new ATSParser(parserFlags);
79
80 mStreamListener->start();
81
82 postReadBuffer();
83 }
84
feedMoreTSData()85 status_t NuPlayer::StreamingSource::feedMoreTSData() {
86 return postReadBuffer();
87 }
88
onReadBuffer()89 void NuPlayer::StreamingSource::onReadBuffer() {
90 for (int32_t i = 0; i < kNumListenerQueuePackets; ++i) {
91 char buffer[188];
92 sp<AMessage> extra;
93 ssize_t n = mStreamListener->read(buffer, sizeof(buffer), &extra);
94
95 if (n == 0) {
96 ALOGI("input data EOS reached.");
97 mTSParser->signalEOS(ERROR_END_OF_STREAM);
98 setError(ERROR_END_OF_STREAM);
99 break;
100 } else if (n == INFO_DISCONTINUITY) {
101 int32_t type = ATSParser::DISCONTINUITY_TIME;
102
103 int32_t mask;
104 if (extra != NULL
105 && extra->findInt32(
106 IStreamListener::kKeyDiscontinuityMask, &mask)) {
107 if (mask == 0) {
108 ALOGE("Client specified an illegal discontinuity type.");
109 setError(ERROR_UNSUPPORTED);
110 break;
111 }
112
113 type = mask;
114 }
115
116 mTSParser->signalDiscontinuity(
117 (ATSParser::DiscontinuityType)type, extra);
118 } else if (n < 0) {
119 break;
120 } else {
121 if (buffer[0] == 0x00) {
122 // XXX legacy
123
124 if (extra == NULL) {
125 extra = new AMessage;
126 }
127
128 uint8_t type = buffer[1];
129
130 if (type & 2) {
131 int64_t mediaTimeUs;
132 memcpy(&mediaTimeUs, &buffer[2], sizeof(mediaTimeUs));
133
134 extra->setInt64(IStreamListener::kKeyMediaTimeUs, mediaTimeUs);
135 }
136
137 mTSParser->signalDiscontinuity(
138 ((type & 1) == 0)
139 ? ATSParser::DISCONTINUITY_TIME
140 : ATSParser::DISCONTINUITY_FORMATCHANGE,
141 extra);
142 } else {
143 status_t err = mTSParser->feedTSPacket(buffer, sizeof(buffer));
144
145 if (err != OK) {
146 ALOGE("TS Parser returned error %d", err);
147
148 mTSParser->signalEOS(err);
149 setError(err);
150 break;
151 }
152 }
153 }
154 }
155 }
156
postReadBuffer()157 status_t NuPlayer::StreamingSource::postReadBuffer() {
158 {
159 Mutex::Autolock _l(mBufferingLock);
160 if (mFinalResult != OK) {
161 return mFinalResult;
162 }
163 if (mBuffering) {
164 return OK;
165 }
166 mBuffering = true;
167 }
168
169 (new AMessage(kWhatReadBuffer, this))->post();
170 return OK;
171 }
172
haveSufficientDataOnAllTracks()173 bool NuPlayer::StreamingSource::haveSufficientDataOnAllTracks() {
174 // We're going to buffer at least 2 secs worth data on all tracks before
175 // starting playback (both at startup and after a seek).
176
177 static const int64_t kMinDurationUs = 2000000ll;
178
179 sp<AnotherPacketSource> audioTrack = getSource(true /*audio*/);
180 sp<AnotherPacketSource> videoTrack = getSource(false /*audio*/);
181
182 status_t err;
183 int64_t durationUs;
184 if (audioTrack != NULL
185 && (durationUs = audioTrack->getBufferedDurationUs(&err))
186 < kMinDurationUs
187 && err == OK) {
188 ALOGV("audio track doesn't have enough data yet. (%.2f secs buffered)",
189 durationUs / 1E6);
190 return false;
191 }
192
193 if (videoTrack != NULL
194 && (durationUs = videoTrack->getBufferedDurationUs(&err))
195 < kMinDurationUs
196 && err == OK) {
197 ALOGV("video track doesn't have enough data yet. (%.2f secs buffered)",
198 durationUs / 1E6);
199 return false;
200 }
201
202 return true;
203 }
204
setError(status_t err)205 void NuPlayer::StreamingSource::setError(status_t err) {
206 Mutex::Autolock _l(mBufferingLock);
207 mFinalResult = err;
208 }
209
getSource(bool audio)210 sp<AnotherPacketSource> NuPlayer::StreamingSource::getSource(bool audio) {
211 if (mTSParser == NULL) {
212 return NULL;
213 }
214
215 sp<MediaSource> source = mTSParser->getSource(
216 audio ? ATSParser::AUDIO : ATSParser::VIDEO);
217
218 return static_cast<AnotherPacketSource *>(source.get());
219 }
220
getFormat(bool audio)221 sp<AMessage> NuPlayer::StreamingSource::getFormat(bool audio) {
222 sp<AnotherPacketSource> source = getSource(audio);
223
224 sp<AMessage> format = new AMessage;
225 if (source == NULL) {
226 format->setInt32("err", -EWOULDBLOCK);
227 return format;
228 }
229
230 sp<MetaData> meta = source->getFormat();
231 status_t err = convertMetaDataToMessage(meta, &format);
232 if (err != OK) {
233 format->setInt32("err", err);
234 }
235 return format;
236 }
237
dequeueAccessUnit(bool audio,sp<ABuffer> * accessUnit)238 status_t NuPlayer::StreamingSource::dequeueAccessUnit(
239 bool audio, sp<ABuffer> *accessUnit) {
240 sp<AnotherPacketSource> source = getSource(audio);
241
242 if (source == NULL) {
243 return -EWOULDBLOCK;
244 }
245
246 if (!haveSufficientDataOnAllTracks()) {
247 postReadBuffer();
248 }
249
250 status_t finalResult;
251 if (!source->hasBufferAvailable(&finalResult)) {
252 return finalResult == OK ? -EWOULDBLOCK : finalResult;
253 }
254
255 status_t err = source->dequeueAccessUnit(accessUnit);
256
257 #if !defined(LOG_NDEBUG) || LOG_NDEBUG == 0
258 if (err == OK) {
259 int64_t timeUs;
260 CHECK((*accessUnit)->meta()->findInt64("timeUs", &timeUs));
261 ALOGV("dequeueAccessUnit timeUs=%lld us", timeUs);
262 }
263 #endif
264
265 return err;
266 }
267
isRealTime() const268 bool NuPlayer::StreamingSource::isRealTime() const {
269 return mSource->flags() & IStreamSource::kFlagIsRealTimeData;
270 }
271
onMessageReceived(const sp<AMessage> & msg)272 void NuPlayer::StreamingSource::onMessageReceived(
273 const sp<AMessage> &msg) {
274 switch (msg->what()) {
275 case kWhatReadBuffer:
276 {
277 onReadBuffer();
278
279 {
280 Mutex::Autolock _l(mBufferingLock);
281 mBuffering = false;
282 }
283 break;
284 }
285 default:
286 {
287 TRESPASS();
288 }
289 }
290 }
291
292
293 } // namespace android
294
295