1 /*
2 * Copyright (C) 2010 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 //#define USE_LOG SLAndroidLogLevel_Verbose
18
19 #include "sles_allinclusive.h"
20 #include "android_StreamPlayer.h"
21
22 #include <media/IStreamSource.h>
23 #include <media/IMediaPlayerService.h>
24 #include <media/stagefright/foundation/ADebug.h>
25 #include <media/stagefright/foundation/MediaKeys.h>
26 #include <mpeg2ts/ATSParser.h>
27 #include <binder/IPCThreadState.h>
28
29 //--------------------------------------------------------------------------------------------------
30 namespace android {
31
StreamSourceAppProxy(IAndroidBufferQueue * androidBufferQueue,const sp<CallbackProtector> & callbackProtector,StreamPlayer * player)32 StreamSourceAppProxy::StreamSourceAppProxy(
33 IAndroidBufferQueue *androidBufferQueue,
34 const sp<CallbackProtector> &callbackProtector,
35 // sp<StreamPlayer> would cause StreamPlayer's destructor to run during it's own
36 // construction. If you pass in a sp<> to 'this' inside a constructor, then first the
37 // refcount is increased from 0 to 1, then decreased from 1 to 0, which causes the object's
38 // destructor to run from inside it's own constructor.
39 StreamPlayer * /* const sp<StreamPlayer> & */ player) :
40 mBuffersHasBeenSet(false),
41 mAndroidBufferQueue(androidBufferQueue),
42 mCallbackProtector(callbackProtector),
43 mPlayer(player)
44 {
45 SL_LOGV("StreamSourceAppProxy::StreamSourceAppProxy()");
46 }
47
~StreamSourceAppProxy()48 StreamSourceAppProxy::~StreamSourceAppProxy() {
49 SL_LOGV("StreamSourceAppProxy::~StreamSourceAppProxy()");
50 disconnect();
51 }
52
53 const SLuint32 StreamSourceAppProxy::kItemProcessed[NB_BUFFEREVENT_ITEM_FIELDS] = {
54 SL_ANDROID_ITEMKEY_BUFFERQUEUEEVENT, // item key
55 sizeof(SLuint32), // item size
56 SL_ANDROIDBUFFERQUEUEEVENT_PROCESSED // item data
57 };
58
59 //--------------------------------------------------
60 // IStreamSource implementation
setListener(const sp<IStreamListener> & listener)61 void StreamSourceAppProxy::setListener(const sp<IStreamListener> &listener) {
62 assert(listener != NULL);
63 Mutex::Autolock _l(mLock);
64 assert(mListener == NULL);
65 mListener = listener;
66 }
67
setBuffers(const Vector<sp<IMemory>> & buffers)68 void StreamSourceAppProxy::setBuffers(const Vector<sp<IMemory> > &buffers) {
69 Mutex::Autolock _l(mLock);
70 assert(!mBuffersHasBeenSet);
71 mBuffers = buffers;
72 mBuffersHasBeenSet = true;
73 }
74
onBufferAvailable(size_t index)75 void StreamSourceAppProxy::onBufferAvailable(size_t index) {
76 //SL_LOGD("StreamSourceAppProxy::onBufferAvailable(%d)", index);
77
78 {
79 Mutex::Autolock _l(mLock);
80 if (!mBuffersHasBeenSet) {
81 // no buffers available to push data to from the buffer queue, bail
82 return;
83 }
84 CHECK_LT(index, mBuffers.size());
85 #if 0 // enable if needed for debugging
86 sp<IMemory> mem = mBuffers.itemAt(index);
87 SLAint64 length = (SLAint64) mem->size();
88 #endif
89 mAvailableBuffers.push_back(index);
90 //SL_LOGD("onBufferAvailable() now %d buffers available in queue",
91 // mAvailableBuffers.size());
92 }
93
94 // a new shared mem buffer is available: let's try to fill immediately
95 pullFromBuffQueue();
96 }
97
receivedCmd_l(IStreamListener::Command cmd,const sp<AMessage> & msg)98 void StreamSourceAppProxy::receivedCmd_l(IStreamListener::Command cmd, const sp<AMessage> &msg) {
99 if (mListener != 0) {
100 mListener->issueCommand(cmd, false /* synchronous */, msg);
101 }
102 }
103
receivedBuffer_l(size_t buffIndex,size_t buffLength)104 void StreamSourceAppProxy::receivedBuffer_l(size_t buffIndex, size_t buffLength) {
105 if (mListener != 0) {
106 mListener->queueBuffer(buffIndex, buffLength);
107 }
108 }
109
disconnect()110 void StreamSourceAppProxy::disconnect() {
111 Mutex::Autolock _l(mLock);
112 mListener.clear();
113 // Force binder to push the decremented reference count for sp<IStreamListener>.
114 // mediaserver and client both have sp<> to the other. When you decrement an sp<>
115 // reference count, binder doesn't push that to the other process immediately.
116 IPCThreadState::self()->flushCommands();
117 mBuffers.clear();
118 mBuffersHasBeenSet = false;
119 mAvailableBuffers.clear();
120 }
121
122 //--------------------------------------------------
123 // consumption from ABQ: pull from the ABQ, and push to shared memory (media server)
pullFromBuffQueue()124 void StreamSourceAppProxy::pullFromBuffQueue() {
125
126 if (android::CallbackProtector::enterCbIfOk(mCallbackProtector)) {
127
128 size_t bufferId;
129 void* bufferLoc;
130 size_t buffSize;
131
132 slAndroidBufferQueueCallback callback = NULL;
133 void* pBufferContext, *pBufferData, *callbackPContext = NULL;
134 AdvancedBufferHeader *oldFront = NULL;
135 uint32_t dataSize /* , dataUsed */;
136
137 // retrieve data from the buffer queue
138 interface_lock_exclusive(mAndroidBufferQueue);
139
140 // can this read operation cause us to call the buffer queue callback
141 // (either because there was a command with no data, or all the data has been consumed)
142 bool queueCallbackCandidate = false;
143
144 if (mAndroidBufferQueue->mState.count != 0) {
145 // SL_LOGD("nbBuffers in ABQ = %u, buffSize=%u",abq->mState.count, buffSize);
146 assert(mAndroidBufferQueue->mFront != mAndroidBufferQueue->mRear);
147
148 oldFront = mAndroidBufferQueue->mFront;
149 AdvancedBufferHeader *newFront = &oldFront[1];
150
151 // consume events when starting to read data from a buffer for the first time
152 if (oldFront->mDataSizeConsumed == 0) {
153 // note this code assumes at most one event per buffer; see IAndroidBufferQueue_Enqueue
154 if (oldFront->mItems.mTsCmdData.mTsCmdCode & ANDROID_MP2TSEVENT_EOS) {
155 receivedCmd_l(IStreamListener::EOS);
156 // EOS has no associated data
157 queueCallbackCandidate = true;
158 } else if (oldFront->mItems.mTsCmdData.mTsCmdCode & ANDROID_MP2TSEVENT_DISCONTINUITY) {
159 receivedCmd_l(IStreamListener::DISCONTINUITY);
160 } else if (oldFront->mItems.mTsCmdData.mTsCmdCode & ANDROID_MP2TSEVENT_DISCON_NEWPTS) {
161 sp<AMessage> msg = new AMessage();
162 msg->setInt64(kATSParserKeyResumeAtPTS,
163 (int64_t)oldFront->mItems.mTsCmdData.mPts);
164 receivedCmd_l(IStreamListener::DISCONTINUITY, msg /*msg*/);
165 } else if (oldFront->mItems.mTsCmdData.mTsCmdCode
166 & ANDROID_MP2TSEVENT_FORMAT_CHANGE_FULL) {
167 sp<AMessage> msg = new AMessage();
168 msg->setInt32(
169 kIStreamListenerKeyDiscontinuityMask,
170 ATSParser::DISCONTINUITY_FORMATCHANGE);
171 receivedCmd_l(IStreamListener::DISCONTINUITY, msg /*msg*/);
172 } else if (oldFront->mItems.mTsCmdData.mTsCmdCode
173 & ANDROID_MP2TSEVENT_FORMAT_CHANGE_VIDEO) {
174 sp<AMessage> msg = new AMessage();
175 msg->setInt32(
176 kIStreamListenerKeyDiscontinuityMask,
177 ATSParser::DISCONTINUITY_VIDEO_FORMAT);
178 receivedCmd_l(IStreamListener::DISCONTINUITY, msg /*msg*/);
179 }
180 // note that here we are intentionally only supporting
181 // ANDROID_MP2TSEVENT_FORMAT_CHANGE_VIDEO, see IAndroidBufferQueue.c
182
183 // some commands may introduce a time discontinuity, reevaluate position if needed
184 if (oldFront->mItems.mTsCmdData.mTsCmdCode & (ANDROID_MP2TSEVENT_DISCONTINUITY |
185 ANDROID_MP2TSEVENT_DISCON_NEWPTS | ANDROID_MP2TSEVENT_FORMAT_CHANGE_FULL)) {
186 const sp<StreamPlayer> player(mPlayer.promote());
187 if (player != NULL) {
188 // FIXME see note at onSeek
189 player->seek(ANDROID_UNKNOWN_TIME);
190 }
191 }
192 oldFront->mItems.mTsCmdData.mTsCmdCode = ANDROID_MP2TSEVENT_NONE;
193 }
194
195 {
196 // we're going to change the shared mem buffer queue, so lock it
197 Mutex::Autolock _l(mLock);
198 if (!mAvailableBuffers.empty()) {
199 bufferId = *mAvailableBuffers.begin();
200 CHECK_LT(bufferId, mBuffers.size());
201 sp<IMemory> mem = mBuffers.itemAt(bufferId);
202 bufferLoc = mem->unsecurePointer();
203 buffSize = mem->size();
204
205 char *pSrc = ((char*)oldFront->mDataBuffer) + oldFront->mDataSizeConsumed;
206 if (oldFront->mDataSizeConsumed + buffSize < oldFront->mDataSize) {
207 // more available than requested, copy as much as requested
208 // consume data: 1/ copy to given destination
209 memcpy(bufferLoc, pSrc, buffSize);
210 // 2/ keep track of how much has been consumed
211 oldFront->mDataSizeConsumed += buffSize;
212 // 3/ notify shared mem listener that new data is available
213 receivedBuffer_l(bufferId, buffSize);
214 mAvailableBuffers.erase(mAvailableBuffers.begin());
215 } else {
216 // requested as much available or more: consume the whole of the current
217 // buffer and move to the next
218 size_t consumed = oldFront->mDataSize - oldFront->mDataSizeConsumed;
219 //SL_LOGD("consuming rest of buffer: enqueueing=%u", consumed);
220 oldFront->mDataSizeConsumed = oldFront->mDataSize;
221
222 // move queue to next
223 if (newFront == &mAndroidBufferQueue->
224 mBufferArray[mAndroidBufferQueue->mNumBuffers + 1]) {
225 // reached the end, circle back
226 newFront = mAndroidBufferQueue->mBufferArray;
227 }
228 mAndroidBufferQueue->mFront = newFront;
229 mAndroidBufferQueue->mState.count--;
230 mAndroidBufferQueue->mState.index++;
231
232 if (consumed > 0) {
233 // consume data: 1/ copy to given destination
234 memcpy(bufferLoc, pSrc, consumed);
235 // 2/ keep track of how much has been consumed
236 // here nothing to do because we are done with this buffer
237 // 3/ notify StreamPlayer that new data is available
238 receivedBuffer_l(bufferId, consumed);
239 mAvailableBuffers.erase(mAvailableBuffers.begin());
240 }
241
242 // data has been consumed, and the buffer queue state has been updated
243 // we will notify the client if applicable
244 queueCallbackCandidate = true;
245 }
246 }
247
248 if (queueCallbackCandidate) {
249 if (mAndroidBufferQueue->mCallbackEventsMask &
250 SL_ANDROIDBUFFERQUEUEEVENT_PROCESSED) {
251 callback = mAndroidBufferQueue->mCallback;
252 // save callback data while under lock
253 callbackPContext = mAndroidBufferQueue->mContext;
254 pBufferContext = (void *)oldFront->mBufferContext;
255 pBufferData = (void *)oldFront->mDataBuffer;
256 dataSize = oldFront->mDataSize;
257 // here a buffer is only dequeued when fully consumed
258 //dataUsed = oldFront->mDataSizeConsumed;
259 }
260 }
261 //SL_LOGD("%d buffers available after reading from queue", mAvailableBuffers.size());
262 if (!mAvailableBuffers.empty()) {
263 // there is still room in the shared memory, recheck later if we can pull
264 // data from the buffer queue and write it to shared memory
265 const sp<StreamPlayer> player(mPlayer.promote());
266 if (player != NULL) {
267 player->queueRefilled();
268 }
269 }
270 }
271
272 } else { // empty queue
273 SL_LOGD("ABQ empty, starving!");
274 }
275
276 interface_unlock_exclusive(mAndroidBufferQueue);
277
278 // notify client of buffer processed
279 if (NULL != callback) {
280 SLresult result = (*callback)(&mAndroidBufferQueue->mItf, callbackPContext,
281 pBufferContext, pBufferData, dataSize,
282 dataSize, /* dataUsed */
283 // no messages during playback other than marking the buffer as processed
284 (const SLAndroidBufferItem*)(&kItemProcessed) /* pItems */,
285 NB_BUFFEREVENT_ITEM_FIELDS *sizeof(SLuint32) /* itemsLength */ );
286 if (SL_RESULT_SUCCESS != result) {
287 // Reserved for future use
288 SL_LOGW("Unsuccessful result %d returned from AndroidBufferQueueCallback", result);
289 }
290 }
291
292 mCallbackProtector->exitCb();
293 } // enterCbIfOk
294 }
295
296
297 //--------------------------------------------------------------------------------------------------
StreamPlayer(const AudioPlayback_Parameters * params,bool hasVideo,IAndroidBufferQueue * androidBufferQueue,const sp<CallbackProtector> & callbackProtector)298 StreamPlayer::StreamPlayer(const AudioPlayback_Parameters* params, bool hasVideo,
299 IAndroidBufferQueue *androidBufferQueue, const sp<CallbackProtector> &callbackProtector) :
300 GenericMediaPlayer(params, hasVideo),
301 mAppProxy(new StreamSourceAppProxy(androidBufferQueue, callbackProtector, this)),
302 mStopForDestroyCompleted(false)
303 {
304 SL_LOGD("StreamPlayer::StreamPlayer()");
305 }
306
~StreamPlayer()307 StreamPlayer::~StreamPlayer() {
308 SL_LOGD("StreamPlayer::~StreamPlayer()");
309 mAppProxy->disconnect();
310 }
311
312
onMessageReceived(const sp<AMessage> & msg)313 void StreamPlayer::onMessageReceived(const sp<AMessage> &msg) {
314 switch (msg->what()) {
315 case kWhatPullFromAbq:
316 onPullFromAndroidBufferQueue();
317 break;
318
319 case kWhatStopForDestroy:
320 onStopForDestroy();
321 break;
322
323 default:
324 GenericMediaPlayer::onMessageReceived(msg);
325 break;
326 }
327 }
328
329
preDestroy()330 void StreamPlayer::preDestroy() {
331 // FIXME NuPlayerDriver is currently not thread-safe, so stop() must be called by looper
332 (new AMessage(kWhatStopForDestroy, this))->post();
333 {
334 Mutex::Autolock _l(mStopForDestroyLock);
335 while (!mStopForDestroyCompleted) {
336 mStopForDestroyCondition.wait(mStopForDestroyLock);
337 }
338 }
339 // GenericMediaPlayer::preDestroy will repeat some of what we've done, but that's benign
340 GenericMediaPlayer::preDestroy();
341 }
342
343
onStopForDestroy()344 void StreamPlayer::onStopForDestroy() {
345 if (mPlayer != 0) {
346 mPlayer->stop();
347 // causes CHECK failure in Nuplayer
348 //mPlayer->setDataSource(NULL);
349 mPlayer->setVideoSurfaceTexture(NULL);
350 mPlayer->disconnect();
351 mPlayer.clear();
352 {
353 // FIXME ugh make this a method
354 Mutex::Autolock _l(mPreparedPlayerLock);
355 mPreparedPlayer.clear();
356 }
357 }
358 {
359 Mutex::Autolock _l(mStopForDestroyLock);
360 mStopForDestroyCompleted = true;
361 }
362 mStopForDestroyCondition.signal();
363 }
364
365
366 /**
367 * Asynchronously notify the player that the queue is ready to be pulled from.
368 */
queueRefilled()369 void StreamPlayer::queueRefilled() {
370 // async notification that the ABQ was refilled: the player should pull from the ABQ, and
371 // and push to shared memory (to the media server)
372 (new AMessage(kWhatPullFromAbq, this))->post();
373 }
374
375
appClear_l()376 void StreamPlayer::appClear_l() {
377 // the user of StreamPlayer has cleared its AndroidBufferQueue:
378 // there's no clear() for the shared memory queue, so this is a no-op
379 }
380
381
382 //--------------------------------------------------
383 // Event handlers
onPrepare()384 void StreamPlayer::onPrepare() {
385 SL_LOGD("StreamPlayer::onPrepare()");
386 sp<IMediaPlayerService> mediaPlayerService(getMediaPlayerService());
387 if (mediaPlayerService != NULL) {
388 mPlayer = mediaPlayerService->create(mPlayerClient /*IMediaPlayerClient*/,
389 mPlaybackParams.sessionId);
390 if (mPlayer == NULL) {
391 SL_LOGE("media player service failed to create player by app proxy");
392 } else if (mPlayer->setDataSource(static_cast<sp<IStreamSource>>(mAppProxy)) !=
393 NO_ERROR) {
394 SL_LOGE("setDataSource failed");
395 mPlayer.clear();
396 }
397 }
398 if (mPlayer == NULL) {
399 mStateFlags |= kFlagPreparedUnsuccessfully;
400 }
401 GenericMediaPlayer::onPrepare();
402 SL_LOGD("StreamPlayer::onPrepare() done");
403 }
404
405
onPlay()406 void StreamPlayer::onPlay() {
407 SL_LOGD("StreamPlayer::onPlay()");
408 // enqueue a message that will cause StreamAppProxy to consume from the queue (again if the
409 // player had starved the shared memory)
410 queueRefilled();
411
412 GenericMediaPlayer::onPlay();
413 }
414
415
onPullFromAndroidBufferQueue()416 void StreamPlayer::onPullFromAndroidBufferQueue() {
417 SL_LOGD("StreamPlayer::onPullFromAndroidBufferQueue()");
418 mAppProxy->pullFromBuffQueue();
419 }
420
421 } // namespace android
422