1 /*
2  * Copyright 2014, The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 //#define LOG_NDEBUG 0
18 #define LOG_TAG "MediaCodecSource"
19 #define DEBUG_DRIFT_TIME 0
20 
21 #include <inttypes.h>
22 
23 #include <gui/IGraphicBufferProducer.h>
24 #include <gui/Surface.h>
25 #include <mediadrm/ICrypto.h>
26 #include <media/MediaBufferHolder.h>
27 #include <media/MediaCodecBuffer.h>
28 #include <media/stagefright/MediaSource.h>
29 #include <media/stagefright/foundation/ABuffer.h>
30 #include <media/stagefright/foundation/ADebug.h>
31 #include <media/stagefright/foundation/ALooper.h>
32 #include <media/stagefright/foundation/AMessage.h>
33 #include <media/stagefright/MediaBuffer.h>
34 #include <media/stagefright/MediaCodec.h>
35 #include <media/stagefright/MediaCodecConstants.h>
36 #include <media/stagefright/MediaCodecList.h>
37 #include <media/stagefright/MediaCodecSource.h>
38 #include <media/stagefright/MediaErrors.h>
39 #include <media/stagefright/MetaData.h>
40 #include <media/stagefright/Utils.h>
41 
42 namespace android {
43 
// Pixel format requested from the source when the encoder is software
// (CPU-readable buffers) vs. hardware (implementation-defined layout).
const int32_t kDefaultSwVideoEncoderFormat = HAL_PIXEL_FORMAT_YCbCr_420_888;
const int32_t kDefaultHwVideoEncoderFormat = HAL_PIXEL_FORMAT_IMPLEMENTATION_DEFINED;
const int32_t kDefaultVideoEncoderDataSpace = HAL_DATASPACE_V0_BT709;

const int kStopTimeoutUs = 300000; // allow 300 ms for shutting down encoder
// allow maximum 1 sec for stop time offset. This limits the delay in the
// input source.
const int kMaxStopTimeOffsetUs = 1000000;
52 
// Puller reads buffers from a MediaSource on its own looper thread and
// queues them until the encoder consumes them via readBuffer(). Public
// methods are called from outside; pulling runs on mLooper.
struct MediaCodecSource::Puller : public AHandler {
    explicit Puller(const sp<MediaSource> &source);

    // Stops the source directly (off-looper); used when a blocking read
    // appears stuck. See stop().
    void interruptSource();
    status_t start(const sp<MetaData> &meta, const sp<AMessage> &notify);
    void stop();
    void stopSource();
    void pause();
    void resume();
    status_t setStopTimeUs(int64_t stopTimeUs);
    bool readBuffer(MediaBufferBase **buffer);

protected:
    virtual void onMessageReceived(const sp<AMessage> &msg);
    virtual ~Puller();

private:
    enum {
        kWhatStart = 'msta',
        kWhatStop,
        kWhatPull,
        kWhatSetStopTimeUs,
    };

    sp<MediaSource> mSource;
    sp<AMessage> mNotify;   // posted to MediaCodecSource when data/EOS arrives
    sp<ALooper> mLooper;
    bool mIsAudio;          // derived from the source MIME ("audio/...")

    // Pulling state plus buffers already read from the source but not yet
    // consumed; always accessed through the Mutexed lock below.
    struct Queue {
        Queue()
            : mReadPendingSince(0),
              mPaused(false),
              mPulling(false) { }
        int64_t mReadPendingSince;  // when a blocking read started; 0 if none pending
        bool mPaused;
        bool mPulling;
        Vector<MediaBufferBase *> mReadBuffers;

        void flush();
        // if queue is empty, return false and set *|buffer| to NULL . Otherwise, pop
        // buffer from front of the queue, place it into *|buffer| and return true.
        bool readBuffer(MediaBufferBase **buffer);
        // add a buffer to the back of the queue
        void pushBuffer(MediaBufferBase *mbuf);
    };
    Mutexed<Queue> mQueue;

    status_t postSynchronouslyAndReturnError(const sp<AMessage> &msg);
    void schedulePull();
    void handleEOS();

    DISALLOW_EVIL_CONSTRUCTORS(Puller);
};
107 
Puller(const sp<MediaSource> & source)108 MediaCodecSource::Puller::Puller(const sp<MediaSource> &source)
109     : mSource(source),
110       mLooper(new ALooper()),
111       mIsAudio(false)
112 {
113     sp<MetaData> meta = source->getFormat();
114     const char *mime;
115     CHECK(meta->findCString(kKeyMIMEType, &mime));
116 
117     mIsAudio = !strncasecmp(mime, "audio/", 6);
118 
119     mLooper->setName("pull_looper");
120 }
121 
MediaCodecSource::Puller::~Puller() {
    // Detach from the looper first so no message can reach this handler
    // while it is being destroyed, then stop the looper thread.
    mLooper->unregisterHandler(id());
    mLooper->stop();
}
126 
// Append |mbuf| to the back of the queue. Callers access Queue through
// Mutexed<Queue>::Locked, so the lock is already held here.
void MediaCodecSource::Puller::Queue::pushBuffer(MediaBufferBase *mbuf) {
    mReadBuffers.push_back(mbuf);
}
130 
readBuffer(MediaBufferBase ** mbuf)131 bool MediaCodecSource::Puller::Queue::readBuffer(MediaBufferBase **mbuf) {
132     if (mReadBuffers.empty()) {
133         *mbuf = NULL;
134         return false;
135     }
136     *mbuf = *mReadBuffers.begin();
137     mReadBuffers.erase(mReadBuffers.begin());
138     return true;
139 }
140 
flush()141 void MediaCodecSource::Puller::Queue::flush() {
142     MediaBufferBase *mbuf;
143     while (readBuffer(&mbuf)) {
144         // there are no null buffers in the queue
145         mbuf->release();
146     }
147 }
148 
// Thread-safe pop of one pulled buffer (if any) under the queue lock.
bool MediaCodecSource::Puller::readBuffer(MediaBufferBase **mbuf) {
    Mutexed<Queue>::Locked queue(mQueue);
    return queue->readBuffer(mbuf);
}
153 
postSynchronouslyAndReturnError(const sp<AMessage> & msg)154 status_t MediaCodecSource::Puller::postSynchronouslyAndReturnError(
155         const sp<AMessage> &msg) {
156     sp<AMessage> response;
157     status_t err = msg->postAndAwaitResponse(&response);
158 
159     if (err != OK) {
160         return err;
161     }
162 
163     if (!response->findInt32("err", &err)) {
164         err = OK;
165     }
166 
167     return err;
168 }
169 
setStopTimeUs(int64_t stopTimeUs)170 status_t MediaCodecSource::Puller::setStopTimeUs(int64_t stopTimeUs) {
171     sp<AMessage> msg = new AMessage(kWhatSetStopTimeUs, this);
172     msg->setInt64("stop-time-us", stopTimeUs);
173     return postSynchronouslyAndReturnError(msg);
174 }
175 
start(const sp<MetaData> & meta,const sp<AMessage> & notify)176 status_t MediaCodecSource::Puller::start(const sp<MetaData> &meta, const sp<AMessage> &notify) {
177     ALOGV("puller (%s) start", mIsAudio ? "audio" : "video");
178     mLooper->start(
179             false /* runOnCallingThread */,
180             false /* canCallJava */,
181             PRIORITY_AUDIO);
182     mLooper->registerHandler(this);
183     mNotify = notify;
184 
185     sp<AMessage> msg = new AMessage(kWhatStart, this);
186     msg->setObject("meta", meta);
187     return postSynchronouslyAndReturnError(msg);
188 }
189 
stop()190 void MediaCodecSource::Puller::stop() {
191     bool interrupt = false;
192     {
193         // mark stopping before actually reaching kWhatStop on the looper, so the pulling will
194         // stop.
195         Mutexed<Queue>::Locked queue(mQueue);
196         queue->mPulling = false;
197         interrupt = queue->mReadPendingSince && (queue->mReadPendingSince < ALooper::GetNowUs() - 1000000);
198         queue->flush(); // flush any unprocessed pulled buffers
199     }
200 
201     if (interrupt) {
202         interruptSource();
203     }
204 }
205 
// Force the source out of a blocking read by stopping it directly.
void MediaCodecSource::Puller::interruptSource() {
    // call source->stop if read has been pending for over a second
    // We have to call this outside the looper as looper is pending on the read.
    mSource->stop();
}
211 
stopSource()212 void MediaCodecSource::Puller::stopSource() {
213     sp<AMessage> msg = new AMessage(kWhatStop, this);
214     (void)postSynchronouslyAndReturnError(msg);
215 }
216 
// Pause delivery: while paused, kWhatPull keeps reading but discards the
// buffers (see onMessageReceived).
void MediaCodecSource::Puller::pause() {
    Mutexed<Queue>::Locked queue(mQueue);
    queue->mPaused = true;
}
221 
// Resume delivery; anything queued while paused is dropped first.
void MediaCodecSource::Puller::resume() {
    Mutexed<Queue>::Locked queue(mQueue);
    queue->flush();
    queue->mPaused = false;
}
227 
// Queue a kWhatPull on our looper to read the next buffer from the source.
void MediaCodecSource::Puller::schedulePull() {
    (new AMessage(kWhatPull, this))->post();
}
231 
handleEOS()232 void MediaCodecSource::Puller::handleEOS() {
233     ALOGV("puller (%s) posting EOS", mIsAudio ? "audio" : "video");
234     sp<AMessage> msg = mNotify->dup();
235     msg->setInt32("eos", 1);
236     msg->post();
237 }
238 
// Looper-thread dispatch: synchronous start/stop/stop-time handling plus
// the self-perpetuating kWhatPull read loop.
void MediaCodecSource::Puller::onMessageReceived(const sp<AMessage> &msg) {
    switch (msg->what()) {
        case kWhatStart:
        {
            sp<RefBase> obj;
            CHECK(msg->findObject("meta", &obj));

            {
                Mutexed<Queue>::Locked queue(mQueue);
                queue->mPulling = true;
            }

            status_t err = mSource->start(static_cast<MetaData *>(obj.get()));

            if (err == OK) {
                schedulePull();
            }

            sp<AMessage> response = new AMessage;
            response->setInt32("err", err);

            sp<AReplyToken> replyID;
            CHECK(msg->senderAwaitsResponse(&replyID));
            response->postReply(replyID);
            break;
        }

        case kWhatSetStopTimeUs:
        {
            sp<AReplyToken> replyID;
            CHECK(msg->senderAwaitsResponse(&replyID));
            int64_t stopTimeUs;
            CHECK(msg->findInt64("stop-time-us", &stopTimeUs));
            status_t err = mSource->setStopTimeUs(stopTimeUs);

            sp<AMessage> response = new AMessage;
            response->setInt32("err", err);
            response->postReply(replyID);
            break;
        }

        case kWhatStop:
        {
            mSource->stop();

            sp<AMessage> response = new AMessage;
            response->setInt32("err", OK);

            sp<AReplyToken> replyID;
            CHECK(msg->senderAwaitsResponse(&replyID));
            response->postReply(replyID);
            break;
        }

        case kWhatPull:
        {
            Mutexed<Queue>::Locked queue(mQueue);
            // Record when the (potentially blocking) read begins so stop()
            // can detect a stuck source.
            queue->mReadPendingSince = ALooper::GetNowUs();
            if (!queue->mPulling) {
                handleEOS();
                break;
            }

            // Drop the lock across the blocking read so stop()/pause() are
            // not serialized behind it.
            queue.unlock();
            MediaBufferBase *mbuf = NULL;
            status_t err = mSource->read(&mbuf);
            queue.lock();

            queue->mReadPendingSince = 0;
            // if we need to discard buffer
            if (!queue->mPulling || queue->mPaused || err != OK) {
                if (mbuf != NULL) {
                    mbuf->release();
                    mbuf = NULL;
                }
                if (queue->mPulling && err == OK) {
                    msg->post(); // if simply paused, keep pulling source
                    break;
                } else if (err == ERROR_END_OF_STREAM) {
                    ALOGV("stream ended, mbuf %p", mbuf);
                } else if (err != OK) {
                    ALOGE("error %d reading stream.", err);
                }
            }

            if (mbuf != NULL) {
                queue->pushBuffer(mbuf);
            }

            queue.unlock();

            if (mbuf != NULL) {
                // Tell the consumer a buffer is ready and schedule the
                // next pull; otherwise this stream is done.
                mNotify->post();
                msg->post();
            } else {
                handleEOS();
            }
            break;
        }

        default:
            TRESPASS();
    }
}
343 
// Output state starts with no EOS observed and no error recorded.
MediaCodecSource::Output::Output()
    : mEncoderReachedEOS(false),
      mErrorCode(OK) {
}
348 
349 // static
// static
// Factory: construct and initialize a MediaCodecSource; returns NULL when
// encoder initialization fails.
sp<MediaCodecSource> MediaCodecSource::Create(
        const sp<ALooper> &looper,
        const sp<AMessage> &format,
        const sp<MediaSource> &source,
        const sp<PersistentSurface> &persistentSurface,
        uint32_t flags) {
    sp<MediaCodecSource> mediaSource = new MediaCodecSource(
            looper, format, source, persistentSurface, flags);

    if (mediaSource->init() != OK) {
        return NULL;
    }
    return mediaSource;
}
364 
setInputBufferTimeOffset(int64_t timeOffsetUs)365 status_t MediaCodecSource::setInputBufferTimeOffset(int64_t timeOffsetUs) {
366     sp<AMessage> msg = new AMessage(kWhatSetInputBufferTimeOffset, mReflector);
367     msg->setInt64(PARAMETER_KEY_OFFSET_TIME, timeOffsetUs);
368     return postSynchronouslyAndReturnError(msg);
369 }
370 
getFirstSampleSystemTimeUs()371 int64_t MediaCodecSource::getFirstSampleSystemTimeUs() {
372     sp<AMessage> msg = new AMessage(kWhatGetFirstSampleSystemTimeUs, mReflector);
373     sp<AMessage> response;
374     msg->postAndAwaitResponse(&response);
375     int64_t timeUs;
376     if (!response->findInt64("time-us", &timeUs)) {
377         timeUs = -1LL;
378     }
379     return timeUs;
380 }
381 
start(MetaData * params)382 status_t MediaCodecSource::start(MetaData* params) {
383     sp<AMessage> msg = new AMessage(kWhatStart, mReflector);
384     msg->setObject("meta", params);
385     return postSynchronouslyAndReturnError(msg);
386 }
387 
// Synchronously stop encoding; blocks until the handler replies.
status_t MediaCodecSource::stop() {
    sp<AMessage> msg = new AMessage(kWhatStop, mReflector);
    return postSynchronouslyAndReturnError(msg);
}
392 
393 
setStopTimeUs(int64_t stopTimeUs)394 status_t MediaCodecSource::setStopTimeUs(int64_t stopTimeUs) {
395     sp<AMessage> msg = new AMessage(kWhatSetStopTimeUs, mReflector);
396     msg->setInt64("stop-time-us", stopTimeUs);
397     return postSynchronouslyAndReturnError(msg);
398 }
399 
pause(MetaData * params)400 status_t MediaCodecSource::pause(MetaData* params) {
401     sp<AMessage> msg = new AMessage(kWhatPause, mReflector);
402     msg->setObject("meta", params);
403     msg->post();
404     return OK;
405 }
406 
// Return the current output format as MetaData, read under its lock
// (updated when the codec reports a format change).
sp<MetaData> MediaCodecSource::getFormat() {
    Mutexed<sp<MetaData>>::Locked meta(mMeta);
    return *meta;
}
411 
// Surface-input mode only: the producer end of the encoder input surface.
sp<IGraphicBufferProducer> MediaCodecSource::getGraphicBufferProducer() {
    CHECK(mFlags & FLAG_USE_SURFACE_INPUT);
    return mGraphicBufferProducer;
}
416 
// MediaSource interface: block until an encoded buffer is available or the
// encoder reaches EOS, then transfer ownership of the front buffer to the
// caller. On EOS, returns the status recorded by signalEOS().
status_t MediaCodecSource::read(
        MediaBufferBase** buffer, const ReadOptions* /* options */) {
    Mutexed<Output>::Locked output(mOutput);

    *buffer = NULL;
    // mCond is signaled both when a buffer is queued and when EOS is set.
    while (output->mBufferQueue.size() == 0 && !output->mEncoderReachedEOS) {
        output.waitForCondition(output->mCond);
    }
    if (!output->mEncoderReachedEOS) {
        *buffer = *output->mBufferQueue.begin();
        output->mBufferQueue.erase(output->mBufferQueue.begin());
        return OK;
    }
    return output->mErrorCode;
}
432 
signalBufferReturned(MediaBufferBase * buffer)433 void MediaCodecSource::signalBufferReturned(MediaBufferBase *buffer) {
434     buffer->setObserver(0);
435     buffer->release();
436 }
437 
// Constructed via Create(); sets default state only — the encoder itself
// is built in init()/initEncoder().
MediaCodecSource::MediaCodecSource(
        const sp<ALooper> &looper,
        const sp<AMessage> &outputFormat,
        const sp<MediaSource> &source,
        const sp<PersistentSurface> &persistentSurface,
        uint32_t flags)
    : mLooper(looper),
      mOutputFormat(outputFormat),
      mMeta(new MetaData),
      mFlags(flags),
      mIsVideo(false),
      mStarted(false),
      mStopping(false),
      mDoMoreWorkPending(false),
      mSetEncoderFormat(false),
      mEncoderFormat(0),
      mEncoderDataSpace(0),
      mPersistentSurface(persistentSurface),
      mInputBufferTimeOffsetUs(0),
      mFirstSampleSystemTimeUs(-1LL),
      mPausePending(false),
      mFirstSampleTimeUs(-1LL),
      mGeneration(0) {
    CHECK(mLooper != NULL);

    // With surface input the codec consumes frames itself; otherwise we
    // pull buffers from |source| through a Puller.
    if (!(mFlags & FLAG_USE_SURFACE_INPUT)) {
        mPuller = new Puller(source);
    }
}
467 
~MediaCodecSource()468 MediaCodecSource::~MediaCodecSource() {
469     releaseEncoder();
470 
471     mCodecLooper->stop();
472     mLooper->unregisterHandler(mReflector->id());
473 }
474 
init()475 status_t MediaCodecSource::init() {
476     status_t err = initEncoder();
477 
478     if (err != OK) {
479         releaseEncoder();
480     }
481 
482     return err;
483 }
484 
// Create, configure, and start the MediaCodec encoder described by
// mOutputFormat; also sets up the reflector handler and codec looper, and
// (for surface input) the input surface. On error the caller (init())
// releases the encoder.
status_t MediaCodecSource::initEncoder() {

    mReflector = new AHandlerReflector<MediaCodecSource>(this);
    mLooper->registerHandler(mReflector);

    mCodecLooper = new ALooper;
    mCodecLooper->setName("codec_looper");
    mCodecLooper->start();

    if (mFlags & FLAG_USE_SURFACE_INPUT) {
        // Surface input starts suspended; onStart() later un-suspends it.
        mOutputFormat->setInt32(KEY_CREATE_INPUT_SURFACE_SUSPENDED, 1);
    }

    AString outputMIME;
    CHECK(mOutputFormat->findString("mime", &outputMIME));
    mIsVideo = outputMIME.startsWithIgnoreCase("video/");

    AString name;
    status_t err = NO_INIT;
    if (mOutputFormat->findString("testing-name", &name)) {
        // Test hook: force a specific component by name.
        mEncoder = MediaCodec::CreateByComponentName(mCodecLooper, name);

        mEncoderActivityNotify = new AMessage(kWhatEncoderActivity, mReflector);
        mEncoder->setCallback(mEncoderActivityNotify);

        err = mEncoder->configure(
                    mOutputFormat,
                    NULL /* nativeWindow */,
                    NULL /* crypto */,
                    MediaCodec::CONFIGURE_FLAG_ENCODE);
    } else {
        Vector<AString> matchingCodecs;
        MediaCodecList::findMatchingCodecs(
                outputMIME.c_str(), true /* encoder */,
                ((mFlags & FLAG_PREFER_SOFTWARE_CODEC) ? MediaCodecList::kPreferSoftwareCodecs : 0),
                &matchingCodecs);

        // Try each matching codec in rank order until one configures.
        for (size_t ix = 0; ix < matchingCodecs.size(); ++ix) {
            mEncoder = MediaCodec::CreateByComponentName(
                    mCodecLooper, matchingCodecs[ix]);

            if (mEncoder == NULL) {
                continue;
            }

            ALOGV("output format is '%s'", mOutputFormat->debugString(0).c_str());

            mEncoderActivityNotify = new AMessage(kWhatEncoderActivity, mReflector);
            mEncoder->setCallback(mEncoderActivityNotify);

            err = mEncoder->configure(
                        mOutputFormat,
                        NULL /* nativeWindow */,
                        NULL /* crypto */,
                        MediaCodec::CONFIGURE_FLAG_ENCODE);

            if (err == OK) {
                break;
            }
            mEncoder->release();
            mEncoder = NULL;
        }
    }

    if (err != OK) {
        return err;
    }

    // Publish the codec's (possibly amended) output format as MetaData.
    mEncoder->getOutputFormat(&mOutputFormat);
    sp<MetaData> meta = new MetaData;
    convertMessageToMetaData(mOutputFormat, meta);
    mMeta.lock().set(meta);

    if (mFlags & FLAG_USE_SURFACE_INPUT) {
        CHECK(mIsVideo);

        if (mPersistentSurface != NULL) {
            // When using persistent surface, we are only interested in the
            // consumer, but have to use PersistentSurface as a wrapper to
            // pass consumer over messages (similar to BufferProducerWrapper)
            err = mEncoder->setInputSurface(mPersistentSurface);
        } else {
            err = mEncoder->createInputSurface(&mGraphicBufferProducer);
        }

        if (err != OK) {
            return err;
        }
    }

    // Decide which pixel format / dataspace the source should produce,
    // based on whether the selected encoder reads buffers in software.
    sp<AMessage> inputFormat;
    int32_t usingSwReadOften;
    mSetEncoderFormat = false;
    if (mEncoder->getInputFormat(&inputFormat) == OK) {
        mSetEncoderFormat = true;
        if (inputFormat->findInt32("using-sw-read-often", &usingSwReadOften)
                && usingSwReadOften) {
            // this is a SW encoder; signal source to allocate SW readable buffers
            mEncoderFormat = kDefaultSwVideoEncoderFormat;
        } else {
            mEncoderFormat = kDefaultHwVideoEncoderFormat;
        }
        if (!inputFormat->findInt32("android._dataspace", &mEncoderDataSpace)) {
            mEncoderDataSpace = kDefaultVideoEncoderDataSpace;
        }
        ALOGV("setting dataspace %#x, format %#x", mEncoderDataSpace, mEncoderFormat);
    }

    err = mEncoder->start();

    if (err != OK) {
        return err;
    }

    {
        Mutexed<Output>::Locked output(mOutput);
        output->mEncoderReachedEOS = false;
        output->mErrorCode = OK;
    }

    return OK;
}
607 
releaseEncoder()608 void MediaCodecSource::releaseEncoder() {
609     if (mEncoder == NULL) {
610         return;
611     }
612 
613     mEncoder->release();
614     mEncoder.clear();
615 }
616 
postSynchronouslyAndReturnError(const sp<AMessage> & msg)617 status_t MediaCodecSource::postSynchronouslyAndReturnError(
618         const sp<AMessage> &msg) {
619     sp<AMessage> response;
620     status_t err = msg->postAndAwaitResponse(&response);
621 
622     if (err != OK) {
623         return err;
624     }
625 
626     if (!response->findInt32("err", &err)) {
627         err = OK;
628     }
629 
630     return err;
631 }
632 
// Mark encoder EOS with terminal status |err|: free unread output buffers,
// wake any blocked read(), release the encoder, and — when a stop is in
// flight — stop the source and reply to every queued stop request.
void MediaCodecSource::signalEOS(status_t err) {
    bool reachedEOS = false;
    {
        Mutexed<Output>::Locked output(mOutput);
        reachedEOS = output->mEncoderReachedEOS;
        if (!reachedEOS) {
            ALOGV("encoder (%s) reached EOS", mIsVideo ? "video" : "audio");
            // release all unread media buffers
            for (List<MediaBufferBase*>::iterator it = output->mBufferQueue.begin();
                    it != output->mBufferQueue.end(); it++) {
                (*it)->release();
            }
            output->mBufferQueue.clear();
            output->mEncoderReachedEOS = true;
            output->mErrorCode = err;
            if (!(mFlags & FLAG_USE_SURFACE_INPUT)) {
                mStopping = true;
                mPuller->stop();
            }
            // Wake a reader blocked in read().
            output->mCond.signal();

            reachedEOS = true;
            // Drop the output lock before tearing down the encoder so we
            // don't hold it across MediaCodec::release().
            output.unlock();
            releaseEncoder();
        }
    }

    if (mStopping && reachedEOS) {
        ALOGI("encoder (%s) stopped", mIsVideo ? "video" : "audio");
        if (mPuller != NULL) {
            mPuller->stopSource();
        }
        ALOGV("source (%s) stopped", mIsVideo ? "video" : "audio");
        // posting reply to everyone that's waiting
        List<sp<AReplyToken>>::iterator it;
        for (it = mStopReplyIDQueue.begin();
                it != mStopReplyIDQueue.end(); it++) {
            (new AMessage)->postReply(*it);
        }
        mStopReplyIDQueue.clear();
        mStopping = false;
        ++mGeneration;
    }
}
677 
// Un-suspend a surface-input encoder; when |resumeStartTimeUs| > 0 it is
// passed along as the suspend-time parameter.
void MediaCodecSource::resume(int64_t resumeStartTimeUs) {
    CHECK(mFlags & FLAG_USE_SURFACE_INPUT);
    if (mEncoder != NULL) {
        sp<AMessage> params = new AMessage;
        params->setInt32(PARAMETER_KEY_SUSPEND, false);
        if (resumeStartTimeUs > 0) {
            params->setInt64(PARAMETER_KEY_SUSPEND_TIME, resumeStartTimeUs);
        }
        mEncoder->setParameters(params);
    }
}
689 
// Move buffers pulled from the source into free encoder input slots:
// copies the payload, applies the input time offset, tracks decoding/drift
// times, and queues an EOS input buffer when the puller hands back NULL.
status_t MediaCodecSource::feedEncoderInputBuffers() {
    MediaBufferBase* mbuf = NULL;
    while (!mAvailEncoderInputIndices.empty() && mPuller->readBuffer(&mbuf)) {
        size_t bufferIndex = *mAvailEncoderInputIndices.begin();
        mAvailEncoderInputIndices.erase(mAvailEncoderInputIndices.begin());

        int64_t timeUs = 0LL;
        uint32_t flags = 0;
        size_t size = 0;

        if (mbuf != NULL) {
            CHECK(mbuf->meta_data().findInt64(kKeyTime, &timeUs));
            if (mFirstSampleSystemTimeUs < 0LL) {
                mFirstSampleSystemTimeUs = systemTime() / 1000;
                if (mPausePending) {
                    // A pause arrived before the first sample: honor it now,
                    // drop the buffer, and give back the input slot.
                    mPausePending = false;
                    onPause(mFirstSampleSystemTimeUs);
                    mbuf->release();
                    mAvailEncoderInputIndices.push_back(bufferIndex);
                    return OK;
                }
            }

            timeUs += mInputBufferTimeOffsetUs;

            // push decoding time for video, or drift time for audio
            if (mIsVideo) {
                mDecodingTimeQueue.push_back(timeUs);
            } else {
#if DEBUG_DRIFT_TIME
                if (mFirstSampleTimeUs < 0ll) {
                    mFirstSampleTimeUs = timeUs;
                }
                int64_t driftTimeUs = 0;
                if (mbuf->meta_data().findInt64(kKeyDriftTime, &driftTimeUs)
                        && driftTimeUs) {
                    driftTimeUs = timeUs - mFirstSampleTimeUs - driftTimeUs;
                }
                mDriftTimeQueue.push_back(driftTimeUs);
#endif // DEBUG_DRIFT_TIME
            }

            sp<MediaCodecBuffer> inbuf;
            status_t err = mEncoder->getInputBuffer(bufferIndex, &inbuf);

            // Treat an unusable input slot or empty source buffer as fatal
            // for the stream.
            if (err != OK || inbuf == NULL || inbuf->data() == NULL
                    || mbuf->data() == NULL || mbuf->size() == 0) {
                mbuf->release();
                signalEOS();
                break;
            }

            size = mbuf->size();

            memcpy(inbuf->data(), mbuf->data(), size);

            if (mIsVideo) {
                // video encoder will release MediaBuffer when done
                // with underlying data.
                inbuf->meta()->setObject("mediaBufferHolder", new MediaBufferHolder(mbuf));
                mbuf->release();
            } else {
                mbuf->release();
            }
        } else {
            // A NULL buffer from the puller signals end of stream.
            flags = MediaCodec::BUFFER_FLAG_EOS;
        }

        status_t err = mEncoder->queueInputBuffer(
                bufferIndex, 0, size, timeUs, flags);

        if (err != OK) {
            return err;
        }
    }

    return OK;
}
768 
// Handles kWhatStart: resumes an already-started source, or performs the
// first start (un-suspend surface input, or start the puller).
status_t MediaCodecSource::onStart(MetaData *params) {
    if (mStopping || mOutput.lock()->mEncoderReachedEOS) {
        ALOGE("Failed to start while we're stopping or encoder already stopped due to EOS error");
        return INVALID_OPERATION;
    }
    int64_t startTimeUs;
    if (params == NULL || !params->findInt64(kKeyTime, &startTimeUs)) {
        startTimeUs = -1LL;
    }

    if (mStarted) {
        ALOGI("MediaCodecSource (%s) resuming", mIsVideo ? "video" : "audio");
        if (mPausePending) {
            // The pause never took effect; cancel it instead of resuming.
            mPausePending = false;
            return OK;
        }
        if (mIsVideo) {
            // Force a sync frame so the resumed stream is decodable.
            mEncoder->requestIDRFrame();
        }
        if (mFlags & FLAG_USE_SURFACE_INPUT) {
            resume(startTimeUs);
        } else {
            CHECK(mPuller != NULL);
            mPuller->resume();
        }
        return OK;
    }

    ALOGI("MediaCodecSource (%s) starting", mIsVideo ? "video" : "audio");

    status_t err = OK;

    if (mFlags & FLAG_USE_SURFACE_INPUT) {
        if (mEncoder != NULL) {
            sp<AMessage> params = new AMessage;
            params->setInt32(PARAMETER_KEY_SUSPEND, false);
            if (startTimeUs >= 0) {
                params->setInt64("skip-frames-before", startTimeUs);
            }
            mEncoder->setParameters(params);
        }
    } else {
        CHECK(mPuller != NULL);
        sp<MetaData> meta = params;
        if (mSetEncoderFormat) {
            if (meta == NULL) {
                meta = new MetaData;
            }
            // Tell the source which pixel format / dataspace the encoder
            // expects (decided in initEncoder()).
            meta->setInt32(kKeyPixelFormat, mEncoderFormat);
            meta->setInt32(kKeyColorSpace, mEncoderDataSpace);
        }

        sp<AMessage> notify = new AMessage(kWhatPullerNotify, mReflector);
        err = mPuller->start(meta.get(), notify);
        if (err != OK) {
            return err;
        }
    }

    ALOGI("MediaCodecSource (%s) started", mIsVideo ? "video" : "audio");

    mStarted = true;
    return OK;
}
833 
// Handles kWhatPause: suspend the surface input or pause the puller;
// no-op while stopping or after encoder EOS.
void MediaCodecSource::onPause(int64_t pauseStartTimeUs) {
    if (mStopping || mOutput.lock()->mEncoderReachedEOS) {
        // Nothing to do
    } else if ((mFlags & FLAG_USE_SURFACE_INPUT) && (mEncoder != NULL)) {
        sp<AMessage> params = new AMessage;
        params->setInt32(PARAMETER_KEY_SUSPEND, true);
        params->setInt64(PARAMETER_KEY_SUSPEND_TIME, pauseStartTimeUs);
        mEncoder->setParameters(params);
    } else {
        CHECK(mPuller != NULL);
        mPuller->pause();
    }
}
847 
onMessageReceived(const sp<AMessage> & msg)848 void MediaCodecSource::onMessageReceived(const sp<AMessage> &msg) {
849     switch (msg->what()) {
850     case kWhatPullerNotify:
851     {
852         int32_t eos = 0;
853         if (msg->findInt32("eos", &eos) && eos) {
854             ALOGV("puller (%s) reached EOS", mIsVideo ? "video" : "audio");
855             signalEOS();
856             break;
857         }
858 
859         if (mEncoder == NULL) {
860             ALOGV("got msg '%s' after encoder shutdown.", msg->debugString().c_str());
861             break;
862         }
863 
864         feedEncoderInputBuffers();
865         break;
866     }
867     case kWhatEncoderActivity:
868     {
869         if (mEncoder == NULL) {
870             break;
871         }
872 
873         int32_t cbID;
874         CHECK(msg->findInt32("callbackID", &cbID));
875         if (cbID == MediaCodec::CB_INPUT_AVAILABLE) {
876             int32_t index;
877             CHECK(msg->findInt32("index", &index));
878 
879             mAvailEncoderInputIndices.push_back(index);
880             feedEncoderInputBuffers();
881         } else if (cbID == MediaCodec::CB_OUTPUT_FORMAT_CHANGED) {
882             status_t err = mEncoder->getOutputFormat(&mOutputFormat);
883             if (err != OK) {
884                 signalEOS(err);
885                 break;
886             }
887             sp<MetaData> meta = new MetaData;
888             convertMessageToMetaData(mOutputFormat, meta);
889             mMeta.lock().set(meta);
890         } else if (cbID == MediaCodec::CB_OUTPUT_AVAILABLE) {
891             int32_t index;
892             size_t offset;
893             size_t size;
894             int64_t timeUs;
895             int32_t flags;
896 
897             CHECK(msg->findInt32("index", &index));
898             CHECK(msg->findSize("offset", &offset));
899             CHECK(msg->findSize("size", &size));
900             CHECK(msg->findInt64("timeUs", &timeUs));
901             CHECK(msg->findInt32("flags", &flags));
902 
903             if (flags & MediaCodec::BUFFER_FLAG_EOS) {
904                 mEncoder->releaseOutputBuffer(index);
905                 signalEOS();
906                 break;
907             }
908 
909             sp<MediaCodecBuffer> outbuf;
910             status_t err = mEncoder->getOutputBuffer(index, &outbuf);
911             if (err != OK || outbuf == NULL || outbuf->data() == NULL
912                 || outbuf->size() == 0) {
913                 signalEOS();
914                 break;
915             }
916 
917             MediaBufferBase *mbuf = new MediaBuffer(outbuf->size());
918             mbuf->setObserver(this);
919             mbuf->add_ref();
920 
921             if (!(flags & MediaCodec::BUFFER_FLAG_CODECCONFIG)) {
922                 if (mIsVideo) {
923                     int64_t decodingTimeUs;
924                     if (mFlags & FLAG_USE_SURFACE_INPUT) {
925                         if (mFirstSampleSystemTimeUs < 0LL) {
926                             mFirstSampleSystemTimeUs = systemTime() / 1000;
927                             if (mPausePending) {
928                                 mPausePending = false;
929                                 onPause(mFirstSampleSystemTimeUs);
930                                 mbuf->release();
931                                 break;
932                             }
933                         }
934                         // Timestamp offset is already adjusted in GraphicBufferSource.
935                         // GraphicBufferSource is supposed to discard samples
936                         // queued before start, and offset timeUs by start time
937                         CHECK_GE(timeUs, 0LL);
938                         // TODO:
939                         // Decoding time for surface source is unavailable,
940                         // use presentation time for now. May need to move
941                         // this logic into MediaCodec.
942                         decodingTimeUs = timeUs;
943                     } else {
944                         CHECK(!mDecodingTimeQueue.empty());
945                         decodingTimeUs = *(mDecodingTimeQueue.begin());
946                         mDecodingTimeQueue.erase(mDecodingTimeQueue.begin());
947                     }
948                     mbuf->meta_data().setInt64(kKeyDecodingTime, decodingTimeUs);
949 
950                     ALOGV("[video] time %" PRId64 " us (%.2f secs), dts/pts diff %" PRId64,
951                             timeUs, timeUs / 1E6, decodingTimeUs - timeUs);
952                 } else {
953                     int64_t driftTimeUs = 0;
954 #if DEBUG_DRIFT_TIME
955                     CHECK(!mDriftTimeQueue.empty());
956                     driftTimeUs = *(mDriftTimeQueue.begin());
957                     mDriftTimeQueue.erase(mDriftTimeQueue.begin());
958                     mbuf->meta_data().setInt64(kKeyDriftTime, driftTimeUs);
959 #endif // DEBUG_DRIFT_TIME
960                     ALOGV("[audio] time %" PRId64 " us (%.2f secs), drift %" PRId64,
961                             timeUs, timeUs / 1E6, driftTimeUs);
962                 }
963                 mbuf->meta_data().setInt64(kKeyTime, timeUs);
964             } else {
965                 mbuf->meta_data().setInt64(kKeyTime, 0LL);
966                 mbuf->meta_data().setInt32(kKeyIsCodecConfig, true);
967             }
968             if (flags & MediaCodec::BUFFER_FLAG_SYNCFRAME) {
969                 mbuf->meta_data().setInt32(kKeyIsSyncFrame, true);
970             }
971             memcpy(mbuf->data(), outbuf->data(), outbuf->size());
972 
973             {
974                 Mutexed<Output>::Locked output(mOutput);
975                 output->mBufferQueue.push_back(mbuf);
976                 output->mCond.signal();
977             }
978 
979             mEncoder->releaseOutputBuffer(index);
980        } else if (cbID == MediaCodec::CB_ERROR) {
981             status_t err;
982             CHECK(msg->findInt32("err", &err));
983             ALOGE("Encoder (%s) reported error : 0x%x",
984                     mIsVideo ? "video" : "audio", err);
985             if (!(mFlags & FLAG_USE_SURFACE_INPUT)) {
986                 mStopping = true;
987                 mPuller->stop();
988             }
989             signalEOS();
990        }
991        break;
992     }
993     case kWhatStart:
994     {
995         sp<AReplyToken> replyID;
996         CHECK(msg->senderAwaitsResponse(&replyID));
997 
998         sp<RefBase> obj;
999         CHECK(msg->findObject("meta", &obj));
1000         MetaData *params = static_cast<MetaData *>(obj.get());
1001 
1002         sp<AMessage> response = new AMessage;
1003         response->setInt32("err", onStart(params));
1004         response->postReply(replyID);
1005         break;
1006     }
1007     case kWhatStop:
1008     {
1009         ALOGI("encoder (%s) stopping", mIsVideo ? "video" : "audio");
1010 
1011         sp<AReplyToken> replyID;
1012         CHECK(msg->senderAwaitsResponse(&replyID));
1013 
1014         if (mOutput.lock()->mEncoderReachedEOS) {
1015             // if we already reached EOS, reply and return now
1016             ALOGI("encoder (%s) already stopped",
1017                     mIsVideo ? "video" : "audio");
1018             (new AMessage)->postReply(replyID);
1019             break;
1020         }
1021 
1022         mStopReplyIDQueue.push_back(replyID);
1023         if (mStopping) {
1024             // nothing to do if we're already stopping, reply will be posted
1025             // to all when we're stopped.
1026             break;
1027         }
1028 
1029         mStopping = true;
1030 
1031         int64_t timeoutUs = kStopTimeoutUs;
1032         // if using surface, signal source EOS and wait for EOS to come back.
1033         // otherwise, stop puller (which also clears the input buffer queue)
1034         // and wait for the EOS message. We cannot call source->stop() because
1035         // the encoder may still be processing input buffers.
1036         if (mFlags & FLAG_USE_SURFACE_INPUT) {
1037             mEncoder->signalEndOfInputStream();
1038             // Increase the timeout if there is delay in the GraphicBufferSource
1039             sp<AMessage> inputFormat;
1040             int64_t stopTimeOffsetUs;
1041             if (mEncoder->getInputFormat(&inputFormat) == OK &&
1042                     inputFormat->findInt64("android._stop-time-offset-us", &stopTimeOffsetUs) &&
1043                     stopTimeOffsetUs > 0) {
1044                 if (stopTimeOffsetUs > kMaxStopTimeOffsetUs) {
1045                     ALOGW("Source stopTimeOffsetUs %lld too large, limit at %lld us",
1046                         (long long)stopTimeOffsetUs, (long long)kMaxStopTimeOffsetUs);
1047                     stopTimeOffsetUs = kMaxStopTimeOffsetUs;
1048                 }
1049                 timeoutUs += stopTimeOffsetUs;
1050             } else {
1051                 // Use kMaxStopTimeOffsetUs if stop time offset is not provided by input source
1052                 timeoutUs = kMaxStopTimeOffsetUs;
1053             }
1054         } else {
1055             mPuller->stop();
1056         }
1057 
1058         // complete stop even if encoder/puller stalled
1059         sp<AMessage> timeoutMsg = new AMessage(kWhatStopStalled, mReflector);
1060         timeoutMsg->setInt32("generation", mGeneration);
1061         timeoutMsg->post(timeoutUs);
1062         break;
1063     }
1064 
1065     case kWhatStopStalled:
1066     {
1067         int32_t generation;
1068         CHECK(msg->findInt32("generation", &generation));
1069         if (generation != mGeneration) {
1070              break;
1071         }
1072 
1073         if (!(mFlags & FLAG_USE_SURFACE_INPUT)) {
1074             ALOGV("source (%s) stopping", mIsVideo ? "video" : "audio");
1075             mPuller->interruptSource();
1076             ALOGV("source (%s) stopped", mIsVideo ? "video" : "audio");
1077         }
1078         signalEOS();
1079         break;
1080     }
1081 
1082     case kWhatPause:
1083     {
1084         if (mFirstSampleSystemTimeUs < 0) {
1085             mPausePending = true;
1086         } else {
1087             sp<RefBase> obj;
1088             CHECK(msg->findObject("meta", &obj));
1089             MetaData *params = static_cast<MetaData *>(obj.get());
1090             int64_t pauseStartTimeUs = -1;
1091             if (params == NULL || !params->findInt64(kKeyTime, &pauseStartTimeUs)) {
1092                 pauseStartTimeUs = -1LL;
1093             }
1094             onPause(pauseStartTimeUs);
1095         }
1096         break;
1097     }
1098     case kWhatSetInputBufferTimeOffset:
1099     {
1100         sp<AReplyToken> replyID;
1101         CHECK(msg->senderAwaitsResponse(&replyID));
1102         status_t err = OK;
1103         CHECK(msg->findInt64(PARAMETER_KEY_OFFSET_TIME, &mInputBufferTimeOffsetUs));
1104 
1105         // Propagate the timestamp offset to GraphicBufferSource.
1106         if (mFlags & FLAG_USE_SURFACE_INPUT) {
1107             sp<AMessage> params = new AMessage;
1108             params->setInt64(PARAMETER_KEY_OFFSET_TIME, mInputBufferTimeOffsetUs);
1109             err = mEncoder->setParameters(params);
1110         }
1111 
1112         sp<AMessage> response = new AMessage;
1113         response->setInt32("err", err);
1114         response->postReply(replyID);
1115         break;
1116     }
1117     case kWhatSetStopTimeUs:
1118     {
1119         sp<AReplyToken> replyID;
1120         CHECK(msg->senderAwaitsResponse(&replyID));
1121         status_t err = OK;
1122         int64_t stopTimeUs;
1123         CHECK(msg->findInt64("stop-time-us", &stopTimeUs));
1124 
1125         // Propagate the stop time to GraphicBufferSource.
1126         if (mFlags & FLAG_USE_SURFACE_INPUT) {
1127             sp<AMessage> params = new AMessage;
1128             params->setInt64("stop-time-us", stopTimeUs);
1129             err = mEncoder->setParameters(params);
1130         } else {
1131             err = mPuller->setStopTimeUs(stopTimeUs);
1132         }
1133 
1134         sp<AMessage> response = new AMessage;
1135         response->setInt32("err", err);
1136         response->postReply(replyID);
1137         break;
1138     }
1139     case kWhatGetFirstSampleSystemTimeUs:
1140     {
1141         sp<AReplyToken> replyID;
1142         CHECK(msg->senderAwaitsResponse(&replyID));
1143 
1144         sp<AMessage> response = new AMessage;
1145         response->setInt64("time-us", mFirstSampleSystemTimeUs);
1146         response->postReply(replyID);
1147         break;
1148     }
1149     default:
1150         TRESPASS();
1151     }
1152 }
1153 
1154 } // namespace android
1155