1 /*
2  * Copyright (C) 2010 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 //#define USE_LOG SLAndroidLogLevel_Verbose
18 
19 #include "sles_allinclusive.h"
20 #include "android_StreamPlayer.h"
21 
22 #include <media/IStreamSource.h>
23 #include <media/IMediaPlayerService.h>
24 #include <media/stagefright/foundation/ADebug.h>
25 #include <media/stagefright/foundation/MediaKeys.h>
26 #include <binder/IPCThreadState.h>
27 
28 #include <ATSParser.h>
29 
30 //--------------------------------------------------------------------------------------------------
31 namespace android {
32 
StreamSourceAppProxy(IAndroidBufferQueue * androidBufferQueue,const sp<CallbackProtector> & callbackProtector,StreamPlayer * player)33 StreamSourceAppProxy::StreamSourceAppProxy(
34         IAndroidBufferQueue *androidBufferQueue,
35         const sp<CallbackProtector> &callbackProtector,
36         // sp<StreamPlayer> would cause StreamPlayer's destructor to run during it's own
37         // construction.   If you pass in a sp<> to 'this' inside a constructor, then first the
38         // refcount is increased from 0 to 1, then decreased from 1 to 0, which causes the object's
39         // destructor to run from inside it's own constructor.
40         StreamPlayer * /* const sp<StreamPlayer> & */ player) :
41     mBuffersHasBeenSet(false),
42     mAndroidBufferQueue(androidBufferQueue),
43     mCallbackProtector(callbackProtector),
44     mPlayer(player)
45 {
46     SL_LOGV("StreamSourceAppProxy::StreamSourceAppProxy()");
47 }
48 
~StreamSourceAppProxy()49 StreamSourceAppProxy::~StreamSourceAppProxy() {
50     SL_LOGV("StreamSourceAppProxy::~StreamSourceAppProxy()");
51     disconnect();
52 }
53 
54 const SLuint32 StreamSourceAppProxy::kItemProcessed[NB_BUFFEREVENT_ITEM_FIELDS] = {
55         SL_ANDROID_ITEMKEY_BUFFERQUEUEEVENT, // item key
56         sizeof(SLuint32),                    // item size
57         SL_ANDROIDBUFFERQUEUEEVENT_PROCESSED // item data
58 };
59 
60 //--------------------------------------------------
61 // IStreamSource implementation
setListener(const sp<IStreamListener> & listener)62 void StreamSourceAppProxy::setListener(const sp<IStreamListener> &listener) {
63     assert(listener != NULL);
64     Mutex::Autolock _l(mLock);
65     assert(mListener == NULL);
66     mListener = listener;
67 }
68 
setBuffers(const Vector<sp<IMemory>> & buffers)69 void StreamSourceAppProxy::setBuffers(const Vector<sp<IMemory> > &buffers) {
70     Mutex::Autolock _l(mLock);
71     assert(!mBuffersHasBeenSet);
72     mBuffers = buffers;
73     mBuffersHasBeenSet = true;
74 }
75 
onBufferAvailable(size_t index)76 void StreamSourceAppProxy::onBufferAvailable(size_t index) {
77     //SL_LOGD("StreamSourceAppProxy::onBufferAvailable(%d)", index);
78 
79     {
80         Mutex::Autolock _l(mLock);
81         if (!mBuffersHasBeenSet) {
82             // no buffers available to push data to from the buffer queue, bail
83             return;
84         }
85         CHECK_LT(index, mBuffers.size());
86 #if 0   // enable if needed for debugging
87         sp<IMemory> mem = mBuffers.itemAt(index);
88         SLAint64 length = (SLAint64) mem->size();
89 #endif
90         mAvailableBuffers.push_back(index);
91         //SL_LOGD("onBufferAvailable() now %d buffers available in queue",
92         //         mAvailableBuffers.size());
93     }
94 
95     // a new shared mem buffer is available: let's try to fill immediately
96     pullFromBuffQueue();
97 }
98 
receivedCmd_l(IStreamListener::Command cmd,const sp<AMessage> & msg)99 void StreamSourceAppProxy::receivedCmd_l(IStreamListener::Command cmd, const sp<AMessage> &msg) {
100     if (mListener != 0) {
101         mListener->issueCommand(cmd, false /* synchronous */, msg);
102     }
103 }
104 
receivedBuffer_l(size_t buffIndex,size_t buffLength)105 void StreamSourceAppProxy::receivedBuffer_l(size_t buffIndex, size_t buffLength) {
106     if (mListener != 0) {
107         mListener->queueBuffer(buffIndex, buffLength);
108     }
109 }
110 
disconnect()111 void StreamSourceAppProxy::disconnect() {
112     Mutex::Autolock _l(mLock);
113     mListener.clear();
114     // Force binder to push the decremented reference count for sp<IStreamListener>.
115     // mediaserver and client both have sp<> to the other. When you decrement an sp<>
116     // reference count, binder doesn't push that to the other process immediately.
117     IPCThreadState::self()->flushCommands();
118     mBuffers.clear();
119     mBuffersHasBeenSet = false;
120     mAvailableBuffers.clear();
121 }
122 
123 //--------------------------------------------------
124 // consumption from ABQ: pull from the ABQ, and push to shared memory (media server)
pullFromBuffQueue()125 void StreamSourceAppProxy::pullFromBuffQueue() {
126 
127   if (android::CallbackProtector::enterCbIfOk(mCallbackProtector)) {
128 
129     size_t bufferId;
130     void* bufferLoc;
131     size_t buffSize;
132 
133     slAndroidBufferQueueCallback callback = NULL;
134     void* pBufferContext, *pBufferData, *callbackPContext = NULL;
135     AdvancedBufferHeader *oldFront = NULL;
136     uint32_t dataSize /* , dataUsed */;
137 
138     // retrieve data from the buffer queue
139     interface_lock_exclusive(mAndroidBufferQueue);
140 
141     // can this read operation cause us to call the buffer queue callback
142     // (either because there was a command with no data, or all the data has been consumed)
143     bool queueCallbackCandidate = false;
144 
145     if (mAndroidBufferQueue->mState.count != 0) {
146         // SL_LOGD("nbBuffers in ABQ = %u, buffSize=%u",abq->mState.count, buffSize);
147         assert(mAndroidBufferQueue->mFront != mAndroidBufferQueue->mRear);
148 
149         oldFront = mAndroidBufferQueue->mFront;
150         AdvancedBufferHeader *newFront = &oldFront[1];
151 
152         // consume events when starting to read data from a buffer for the first time
153         if (oldFront->mDataSizeConsumed == 0) {
154             // note this code assumes at most one event per buffer; see IAndroidBufferQueue_Enqueue
155             if (oldFront->mItems.mTsCmdData.mTsCmdCode & ANDROID_MP2TSEVENT_EOS) {
156                 receivedCmd_l(IStreamListener::EOS);
157                 // EOS has no associated data
158                 queueCallbackCandidate = true;
159             } else if (oldFront->mItems.mTsCmdData.mTsCmdCode & ANDROID_MP2TSEVENT_DISCONTINUITY) {
160                 receivedCmd_l(IStreamListener::DISCONTINUITY);
161             } else if (oldFront->mItems.mTsCmdData.mTsCmdCode & ANDROID_MP2TSEVENT_DISCON_NEWPTS) {
162                 sp<AMessage> msg = new AMessage();
163                 msg->setInt64(kATSParserKeyResumeAtPTS,
164                         (int64_t)oldFront->mItems.mTsCmdData.mPts);
165                 receivedCmd_l(IStreamListener::DISCONTINUITY, msg /*msg*/);
166             } else if (oldFront->mItems.mTsCmdData.mTsCmdCode
167                     & ANDROID_MP2TSEVENT_FORMAT_CHANGE_FULL) {
168                 sp<AMessage> msg = new AMessage();
169                 msg->setInt32(
170                         kIStreamListenerKeyDiscontinuityMask,
171                         ATSParser::DISCONTINUITY_FORMATCHANGE);
172                 receivedCmd_l(IStreamListener::DISCONTINUITY, msg /*msg*/);
173             } else if (oldFront->mItems.mTsCmdData.mTsCmdCode
174                     & ANDROID_MP2TSEVENT_FORMAT_CHANGE_VIDEO) {
175                 sp<AMessage> msg = new AMessage();
176                 msg->setInt32(
177                         kIStreamListenerKeyDiscontinuityMask,
178                         ATSParser::DISCONTINUITY_VIDEO_FORMAT);
179                 receivedCmd_l(IStreamListener::DISCONTINUITY, msg /*msg*/);
180             }
181             // note that here we are intentionally only supporting
182             //   ANDROID_MP2TSEVENT_FORMAT_CHANGE_VIDEO, see IAndroidBufferQueue.c
183 
184             // some commands may introduce a time discontinuity, reevaluate position if needed
185             if (oldFront->mItems.mTsCmdData.mTsCmdCode & (ANDROID_MP2TSEVENT_DISCONTINUITY |
186                     ANDROID_MP2TSEVENT_DISCON_NEWPTS | ANDROID_MP2TSEVENT_FORMAT_CHANGE_FULL)) {
187                 const sp<StreamPlayer> player(mPlayer.promote());
188                 if (player != NULL) {
189                     // FIXME see note at onSeek
190                     player->seek(ANDROID_UNKNOWN_TIME);
191                 }
192             }
193             oldFront->mItems.mTsCmdData.mTsCmdCode = ANDROID_MP2TSEVENT_NONE;
194         }
195 
196         {
197             // we're going to change the shared mem buffer queue, so lock it
198             Mutex::Autolock _l(mLock);
199             if (!mAvailableBuffers.empty()) {
200                 bufferId = *mAvailableBuffers.begin();
201                 CHECK_LT(bufferId, mBuffers.size());
202                 sp<IMemory> mem = mBuffers.itemAt(bufferId);
203                 bufferLoc = mem->unsecurePointer();
204                 buffSize = mem->size();
205 
206                 char *pSrc = ((char*)oldFront->mDataBuffer) + oldFront->mDataSizeConsumed;
207                 if (oldFront->mDataSizeConsumed + buffSize < oldFront->mDataSize) {
208                     // more available than requested, copy as much as requested
209                     // consume data: 1/ copy to given destination
210                     memcpy(bufferLoc, pSrc, buffSize);
211                     //               2/ keep track of how much has been consumed
212                     oldFront->mDataSizeConsumed += buffSize;
213                     //               3/ notify shared mem listener that new data is available
214                     receivedBuffer_l(bufferId, buffSize);
215                     mAvailableBuffers.erase(mAvailableBuffers.begin());
216                 } else {
217                     // requested as much available or more: consume the whole of the current
218                     //   buffer and move to the next
219                     size_t consumed = oldFront->mDataSize - oldFront->mDataSizeConsumed;
220                     //SL_LOGD("consuming rest of buffer: enqueueing=%u", consumed);
221                     oldFront->mDataSizeConsumed = oldFront->mDataSize;
222 
223                     // move queue to next
224                     if (newFront == &mAndroidBufferQueue->
225                             mBufferArray[mAndroidBufferQueue->mNumBuffers + 1]) {
226                         // reached the end, circle back
227                         newFront = mAndroidBufferQueue->mBufferArray;
228                     }
229                     mAndroidBufferQueue->mFront = newFront;
230                     mAndroidBufferQueue->mState.count--;
231                     mAndroidBufferQueue->mState.index++;
232 
233                     if (consumed > 0) {
234                         // consume data: 1/ copy to given destination
235                         memcpy(bufferLoc, pSrc, consumed);
236                         //               2/ keep track of how much has been consumed
237                         // here nothing to do because we are done with this buffer
238                         //               3/ notify StreamPlayer that new data is available
239                         receivedBuffer_l(bufferId, consumed);
240                         mAvailableBuffers.erase(mAvailableBuffers.begin());
241                     }
242 
243                     // data has been consumed, and the buffer queue state has been updated
244                     // we will notify the client if applicable
245                     queueCallbackCandidate = true;
246                 }
247             }
248 
249             if (queueCallbackCandidate) {
250                 if (mAndroidBufferQueue->mCallbackEventsMask &
251                         SL_ANDROIDBUFFERQUEUEEVENT_PROCESSED) {
252                     callback = mAndroidBufferQueue->mCallback;
253                     // save callback data while under lock
254                     callbackPContext = mAndroidBufferQueue->mContext;
255                     pBufferContext = (void *)oldFront->mBufferContext;
256                     pBufferData    = (void *)oldFront->mDataBuffer;
257                     dataSize       = oldFront->mDataSize;
258                     // here a buffer is only dequeued when fully consumed
259                     //dataUsed     = oldFront->mDataSizeConsumed;
260                 }
261             }
262             //SL_LOGD("%d buffers available after reading from queue", mAvailableBuffers.size());
263             if (!mAvailableBuffers.empty()) {
264                 // there is still room in the shared memory, recheck later if we can pull
265                 // data from the buffer queue and write it to shared memory
266                 const sp<StreamPlayer> player(mPlayer.promote());
267                 if (player != NULL) {
268                     player->queueRefilled();
269                 }
270             }
271         }
272 
273     } else { // empty queue
274         SL_LOGD("ABQ empty, starving!");
275     }
276 
277     interface_unlock_exclusive(mAndroidBufferQueue);
278 
279     // notify client of buffer processed
280     if (NULL != callback) {
281         SLresult result = (*callback)(&mAndroidBufferQueue->mItf, callbackPContext,
282                 pBufferContext, pBufferData, dataSize,
283                 dataSize, /* dataUsed  */
284                 // no messages during playback other than marking the buffer as processed
285                 (const SLAndroidBufferItem*)(&kItemProcessed) /* pItems */,
286                 NB_BUFFEREVENT_ITEM_FIELDS *sizeof(SLuint32) /* itemsLength */ );
287         if (SL_RESULT_SUCCESS != result) {
288             // Reserved for future use
289             SL_LOGW("Unsuccessful result %d returned from AndroidBufferQueueCallback", result);
290         }
291     }
292 
293     mCallbackProtector->exitCb();
294   } // enterCbIfOk
295 }
296 
297 
298 //--------------------------------------------------------------------------------------------------
StreamPlayer(const AudioPlayback_Parameters * params,bool hasVideo,IAndroidBufferQueue * androidBufferQueue,const sp<CallbackProtector> & callbackProtector)299 StreamPlayer::StreamPlayer(const AudioPlayback_Parameters* params, bool hasVideo,
300         IAndroidBufferQueue *androidBufferQueue, const sp<CallbackProtector> &callbackProtector) :
301         GenericMediaPlayer(params, hasVideo),
302         mAppProxy(new StreamSourceAppProxy(androidBufferQueue, callbackProtector, this)),
303         mStopForDestroyCompleted(false)
304 {
305     SL_LOGD("StreamPlayer::StreamPlayer()");
306 }
307 
~StreamPlayer()308 StreamPlayer::~StreamPlayer() {
309     SL_LOGD("StreamPlayer::~StreamPlayer()");
310     mAppProxy->disconnect();
311 }
312 
313 
onMessageReceived(const sp<AMessage> & msg)314 void StreamPlayer::onMessageReceived(const sp<AMessage> &msg) {
315     switch (msg->what()) {
316         case kWhatPullFromAbq:
317             onPullFromAndroidBufferQueue();
318             break;
319 
320         case kWhatStopForDestroy:
321             onStopForDestroy();
322             break;
323 
324         default:
325             GenericMediaPlayer::onMessageReceived(msg);
326             break;
327     }
328 }
329 
330 
preDestroy()331 void StreamPlayer::preDestroy() {
332     // FIXME NuPlayerDriver is currently not thread-safe, so stop() must be called by looper
333     (new AMessage(kWhatStopForDestroy, this))->post();
334     {
335         Mutex::Autolock _l(mStopForDestroyLock);
336         while (!mStopForDestroyCompleted) {
337             mStopForDestroyCondition.wait(mStopForDestroyLock);
338         }
339     }
340     // GenericMediaPlayer::preDestroy will repeat some of what we've done, but that's benign
341     GenericMediaPlayer::preDestroy();
342 }
343 
344 
onStopForDestroy()345 void StreamPlayer::onStopForDestroy() {
346     if (mPlayer != 0) {
347         mPlayer->stop();
348         // causes CHECK failure in Nuplayer
349         //mPlayer->setDataSource(NULL);
350         mPlayer->setVideoSurfaceTexture(NULL);
351         mPlayer->disconnect();
352         mPlayer.clear();
353         {
354             // FIXME ugh make this a method
355             Mutex::Autolock _l(mPreparedPlayerLock);
356             mPreparedPlayer.clear();
357         }
358     }
359     {
360         Mutex::Autolock _l(mStopForDestroyLock);
361         mStopForDestroyCompleted = true;
362     }
363     mStopForDestroyCondition.signal();
364 }
365 
366 
367 /**
368  * Asynchronously notify the player that the queue is ready to be pulled from.
369  */
queueRefilled()370 void StreamPlayer::queueRefilled() {
371     // async notification that the ABQ was refilled: the player should pull from the ABQ, and
372     //    and push to shared memory (to the media server)
373     (new AMessage(kWhatPullFromAbq, this))->post();
374 }
375 
376 
appClear_l()377 void StreamPlayer::appClear_l() {
378     // the user of StreamPlayer has cleared its AndroidBufferQueue:
379     // there's no clear() for the shared memory queue, so this is a no-op
380 }
381 
382 
383 //--------------------------------------------------
384 // Event handlers
onPrepare()385 void StreamPlayer::onPrepare() {
386     SL_LOGD("StreamPlayer::onPrepare()");
387         sp<IMediaPlayerService> mediaPlayerService(getMediaPlayerService());
388         if (mediaPlayerService != NULL) {
389             mPlayer = mediaPlayerService->create(mPlayerClient /*IMediaPlayerClient*/,
390                     mPlaybackParams.sessionId);
391             if (mPlayer == NULL) {
392                 SL_LOGE("media player service failed to create player by app proxy");
393             } else if (mPlayer->setDataSource(static_cast<sp<IStreamSource>>(mAppProxy)) !=
394                     NO_ERROR) {
395                 SL_LOGE("setDataSource failed");
396                 mPlayer.clear();
397             }
398         }
399     if (mPlayer == NULL) {
400         mStateFlags |= kFlagPreparedUnsuccessfully;
401     }
402     GenericMediaPlayer::onPrepare();
403     SL_LOGD("StreamPlayer::onPrepare() done");
404 }
405 
406 
onPlay()407 void StreamPlayer::onPlay() {
408     SL_LOGD("StreamPlayer::onPlay()");
409     // enqueue a message that will cause StreamAppProxy to consume from the queue (again if the
410     // player had starved the shared memory)
411     queueRefilled();
412 
413     GenericMediaPlayer::onPlay();
414 }
415 
416 
onPullFromAndroidBufferQueue()417 void StreamPlayer::onPullFromAndroidBufferQueue() {
418     SL_LOGD("StreamPlayer::onPullFromAndroidBufferQueue()");
419     mAppProxy->pullFromBuffQueue();
420 }
421 
422 } // namespace android
423