1 /*
2  * Copyright (C) 2017 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 //#define LOG_NDEBUG 0
18 #define LOG_TAG "SimpleC2Component"
19 #include <log/log.h>
20 
21 #include <cutils/properties.h>
22 #include <media/stagefright/foundation/AMessage.h>
23 
24 #include <inttypes.h>
25 
26 #include <C2Config.h>
27 #include <C2Debug.h>
28 #include <C2PlatformSupport.h>
29 #include <SimpleC2Component.h>
30 
31 #define DEBUG 0
32 #if DEBUG
33 #define DDD(...) ALOGD(__VA_ARGS__)
34 #else
35 #define DDD(...) ((void)0)
36 #endif
37 
38 namespace android {
39 
pop_front()40 std::unique_ptr<C2Work> SimpleC2Component::WorkQueue::pop_front() {
41     std::unique_ptr<C2Work> work = std::move(mQueue.front().work);
42     mQueue.pop_front();
43     return work;
44 }
45 
push_back(std::unique_ptr<C2Work> work)46 void SimpleC2Component::WorkQueue::push_back(std::unique_ptr<C2Work> work) {
47     mQueue.push_back({std::move(work), NO_DRAIN});
48 }
49 
empty() const50 bool SimpleC2Component::WorkQueue::empty() const { return mQueue.empty(); }
51 
clear()52 void SimpleC2Component::WorkQueue::clear() { mQueue.clear(); }
53 
drainMode() const54 uint32_t SimpleC2Component::WorkQueue::drainMode() const {
55     return mQueue.front().drainMode;
56 }
57 
markDrain(uint32_t drainMode)58 void SimpleC2Component::WorkQueue::markDrain(uint32_t drainMode) {
59     mQueue.push_back({nullptr, drainMode});
60 }
61 
62 ////////////////////////////////////////////////////////////////////////////////
63 
WorkHandler()64 SimpleC2Component::WorkHandler::WorkHandler() : mRunning(false) {}
65 
setComponent(const std::shared_ptr<SimpleC2Component> & thiz)66 void SimpleC2Component::WorkHandler::setComponent(
67     const std::shared_ptr<SimpleC2Component> &thiz) {
68     mThiz = thiz;
69 }
70 
Reply(const sp<AMessage> & msg,int32_t * err=nullptr)71 static void Reply(const sp<AMessage> &msg, int32_t *err = nullptr) {
72     sp<AReplyToken> replyId;
73     CHECK(msg->senderAwaitsResponse(&replyId));
74     sp<AMessage> reply = new AMessage;
75     if (err) {
76         reply->setInt32("err", *err);
77     }
78     reply->postReply(replyId);
79 }
80 
onMessageReceived(const sp<AMessage> & msg)81 void SimpleC2Component::WorkHandler::onMessageReceived(
82     const sp<AMessage> &msg) {
83     std::shared_ptr<SimpleC2Component> thiz = mThiz.lock();
84     if (!thiz) {
85         ALOGD("component not yet set; msg = %s", msg->debugString().c_str());
86         sp<AReplyToken> replyId;
87         if (msg->senderAwaitsResponse(&replyId)) {
88             sp<AMessage> reply = new AMessage;
89             reply->setInt32("err", C2_CORRUPTED);
90             reply->postReply(replyId);
91         }
92         return;
93     }
94 
95     switch (msg->what()) {
96     case kWhatProcess: {
97         if (mRunning) {
98             if (thiz->processQueue()) {
99                 (new AMessage(kWhatProcess, this))->post();
100             }
101         } else {
102             DDD("Ignore process message as we're not running");
103         }
104         break;
105     }
106     case kWhatInit: {
107         int32_t err = thiz->onInit();
108         Reply(msg, &err);
109         [[fallthrough]];
110     }
111     case kWhatStart: {
112         mRunning = true;
113         break;
114     }
115     case kWhatStop: {
116         int32_t err = thiz->onStop();
117         thiz->mOutputBlockPool.reset();
118         Reply(msg, &err);
119         break;
120     }
121     case kWhatReset: {
122         thiz->onReset();
123         thiz->mOutputBlockPool.reset();
124         mRunning = false;
125         Reply(msg);
126         break;
127     }
128     case kWhatRelease: {
129         thiz->onRelease();
130         thiz->mOutputBlockPool.reset();
131         mRunning = false;
132         Reply(msg);
133         break;
134     }
135     default: {
136         ALOGD("Unrecognized msg: %d", msg->what());
137         break;
138     }
139     }
140 }
141 
142 class SimpleC2Component::BlockingBlockPool : public C2BlockPool {
143   public:
BlockingBlockPool(const std::shared_ptr<C2BlockPool> & base)144     BlockingBlockPool(const std::shared_ptr<C2BlockPool> &base) : mBase{base} {}
145 
getLocalId() const146     virtual local_id_t getLocalId() const override {
147         return mBase->getLocalId();
148     }
149 
getAllocatorId() const150     virtual C2Allocator::id_t getAllocatorId() const override {
151         return mBase->getAllocatorId();
152     }
153 
154     virtual c2_status_t
fetchLinearBlock(uint32_t capacity,C2MemoryUsage usage,std::shared_ptr<C2LinearBlock> * block)155     fetchLinearBlock(uint32_t capacity, C2MemoryUsage usage,
156                      std::shared_ptr<C2LinearBlock> *block) {
157         c2_status_t status;
158         do {
159             status = mBase->fetchLinearBlock(capacity, usage, block);
160         } while (status == C2_BLOCKING);
161         return status;
162     }
163 
164     virtual c2_status_t
fetchCircularBlock(uint32_t capacity,C2MemoryUsage usage,std::shared_ptr<C2CircularBlock> * block)165     fetchCircularBlock(uint32_t capacity, C2MemoryUsage usage,
166                        std::shared_ptr<C2CircularBlock> *block) {
167         c2_status_t status;
168         do {
169             status = mBase->fetchCircularBlock(capacity, usage, block);
170         } while (status == C2_BLOCKING);
171         return status;
172     }
173 
174     virtual c2_status_t
fetchGraphicBlock(uint32_t width,uint32_t height,uint32_t format,C2MemoryUsage usage,std::shared_ptr<C2GraphicBlock> * block)175     fetchGraphicBlock(uint32_t width, uint32_t height, uint32_t format,
176                       C2MemoryUsage usage,
177                       std::shared_ptr<C2GraphicBlock> *block) {
178         c2_status_t status;
179         do {
180             status =
181                 mBase->fetchGraphicBlock(width, height, format, usage, block);
182         } while (status == C2_BLOCKING);
183         return status;
184     }
185 
186   private:
187     std::shared_ptr<C2BlockPool> mBase;
188 };
189 
190 ////////////////////////////////////////////////////////////////////////////////
191 
192 namespace {
193 
194 struct DummyReadView : public C2ReadView {
DummyReadViewandroid::__anond85038380111::DummyReadView195     DummyReadView() : C2ReadView(C2_NO_INIT) {}
196 };
197 
198 } // namespace
199 
SimpleC2Component(const std::shared_ptr<C2ComponentInterface> & intf)200 SimpleC2Component::SimpleC2Component(
201     const std::shared_ptr<C2ComponentInterface> &intf)
202     : mDummyReadView(DummyReadView()), mIntf(intf), mLooper(new ALooper),
203       mHandler(new WorkHandler) {
204     mLooper->setName(intf->getName().c_str());
205     (void)mLooper->registerHandler(mHandler);
206     mLooper->start(false, false, ANDROID_PRIORITY_VIDEO);
207 }
208 
~SimpleC2Component()209 SimpleC2Component::~SimpleC2Component() {
210     mLooper->unregisterHandler(mHandler->id());
211     (void)mLooper->stop();
212 }
213 
setListener_vb(const std::shared_ptr<C2Component::Listener> & listener,c2_blocking_t mayBlock)214 c2_status_t SimpleC2Component::setListener_vb(
215     const std::shared_ptr<C2Component::Listener> &listener,
216     c2_blocking_t mayBlock) {
217     mHandler->setComponent(shared_from_this());
218 
219     Mutexed<ExecState>::Locked state(mExecState);
220     if (state->mState == RUNNING) {
221         if (listener) {
222             return C2_BAD_STATE;
223         } else if (!mayBlock) {
224             return C2_BLOCKING;
225         }
226     }
227     state->mListener = listener;
228     // TODO: wait for listener change to have taken place before returning
229     // (e.g. if there is an ongoing listener callback)
230     return C2_OK;
231 }
232 
233 c2_status_t
queue_nb(std::list<std::unique_ptr<C2Work>> * const items)234 SimpleC2Component::queue_nb(std::list<std::unique_ptr<C2Work>> *const items) {
235     {
236         Mutexed<ExecState>::Locked state(mExecState);
237         if (state->mState != RUNNING) {
238             return C2_BAD_STATE;
239         }
240     }
241     bool queueWasEmpty = false;
242     {
243         Mutexed<WorkQueue>::Locked queue(mWorkQueue);
244         queueWasEmpty = queue->empty();
245         while (!items->empty()) {
246             queue->push_back(std::move(items->front()));
247             items->pop_front();
248         }
249     }
250     if (queueWasEmpty) {
251         (new AMessage(WorkHandler::kWhatProcess, mHandler))->post();
252     }
253     return C2_OK;
254 }
255 
256 c2_status_t
announce_nb(const std::vector<C2WorkOutline> & items)257 SimpleC2Component::announce_nb(const std::vector<C2WorkOutline> &items) {
258     (void)items;
259     return C2_OMITTED;
260 }
261 
flush_sm(flush_mode_t flushMode,std::list<std::unique_ptr<C2Work>> * const flushedWork)262 c2_status_t SimpleC2Component::flush_sm(
263     flush_mode_t flushMode,
264     std::list<std::unique_ptr<C2Work>> *const flushedWork) {
265     (void)flushMode;
266     {
267         Mutexed<ExecState>::Locked state(mExecState);
268         if (state->mState != RUNNING) {
269             return C2_BAD_STATE;
270         }
271     }
272     {
273         Mutexed<WorkQueue>::Locked queue(mWorkQueue);
274         queue->incGeneration();
275         // TODO: queue->splicedBy(flushedWork, flushedWork->end());
276         while (!queue->empty()) {
277             std::unique_ptr<C2Work> work = queue->pop_front();
278             if (work) {
279                 flushedWork->push_back(std::move(work));
280             }
281         }
282         while (!queue->pending().empty()) {
283             flushedWork->push_back(std::move(queue->pending().begin()->second));
284             queue->pending().erase(queue->pending().begin());
285         }
286     }
287 
288     return C2_OK;
289 }
290 
drain_nb(drain_mode_t drainMode)291 c2_status_t SimpleC2Component::drain_nb(drain_mode_t drainMode) {
292     if (drainMode == DRAIN_CHAIN) {
293         return C2_OMITTED;
294     }
295     {
296         Mutexed<ExecState>::Locked state(mExecState);
297         if (state->mState != RUNNING) {
298             return C2_BAD_STATE;
299         }
300     }
301     bool queueWasEmpty = false;
302     {
303         Mutexed<WorkQueue>::Locked queue(mWorkQueue);
304         queueWasEmpty = queue->empty();
305         queue->markDrain(drainMode);
306     }
307     if (queueWasEmpty) {
308         (new AMessage(WorkHandler::kWhatProcess, mHandler))->post();
309     }
310 
311     return C2_OK;
312 }
313 
start()314 c2_status_t SimpleC2Component::start() {
315     Mutexed<ExecState>::Locked state(mExecState);
316     if (state->mState == RUNNING) {
317         return C2_BAD_STATE;
318     }
319     bool needsInit = (state->mState == UNINITIALIZED);
320     state.unlock();
321     if (needsInit) {
322         sp<AMessage> reply;
323         (new AMessage(WorkHandler::kWhatInit, mHandler))
324             ->postAndAwaitResponse(&reply);
325         int32_t err;
326         CHECK(reply->findInt32("err", &err));
327         if (err != C2_OK) {
328             return (c2_status_t)err;
329         }
330     } else {
331         (new AMessage(WorkHandler::kWhatStart, mHandler))->post();
332     }
333     state.lock();
334     state->mState = RUNNING;
335     return C2_OK;
336 }
337 
stop()338 c2_status_t SimpleC2Component::stop() {
339     DDD("stop");
340     {
341         Mutexed<ExecState>::Locked state(mExecState);
342         if (state->mState != RUNNING) {
343             return C2_BAD_STATE;
344         }
345         state->mState = STOPPED;
346     }
347     {
348         Mutexed<WorkQueue>::Locked queue(mWorkQueue);
349         queue->clear();
350         queue->pending().clear();
351     }
352     sp<AMessage> reply;
353     (new AMessage(WorkHandler::kWhatStop, mHandler))
354         ->postAndAwaitResponse(&reply);
355     int32_t err;
356     CHECK(reply->findInt32("err", &err));
357     if (err != C2_OK) {
358         return (c2_status_t)err;
359     }
360     return C2_OK;
361 }
362 
reset()363 c2_status_t SimpleC2Component::reset() {
364     DDD("reset");
365     {
366         Mutexed<ExecState>::Locked state(mExecState);
367         state->mState = UNINITIALIZED;
368     }
369     {
370         Mutexed<WorkQueue>::Locked queue(mWorkQueue);
371         queue->clear();
372         queue->pending().clear();
373     }
374     sp<AMessage> reply;
375     (new AMessage(WorkHandler::kWhatReset, mHandler))
376         ->postAndAwaitResponse(&reply);
377     return C2_OK;
378 }
379 
release()380 c2_status_t SimpleC2Component::release() {
381     DDD("release");
382     sp<AMessage> reply;
383     (new AMessage(WorkHandler::kWhatRelease, mHandler))
384         ->postAndAwaitResponse(&reply);
385     return C2_OK;
386 }
387 
intf()388 std::shared_ptr<C2ComponentInterface> SimpleC2Component::intf() {
389     return mIntf;
390 }
391 
392 namespace {
393 
vec(std::unique_ptr<C2Work> & work)394 std::list<std::unique_ptr<C2Work>> vec(std::unique_ptr<C2Work> &work) {
395     std::list<std::unique_ptr<C2Work>> ret;
396     ret.push_back(std::move(work));
397     return ret;
398 }
399 
400 } // namespace
401 
finish(uint64_t frameIndex,std::function<void (const std::unique_ptr<C2Work> &)> fillWork)402 void SimpleC2Component::finish(
403     uint64_t frameIndex,
404     std::function<void(const std::unique_ptr<C2Work> &)> fillWork) {
405     std::unique_ptr<C2Work> work;
406     {
407         Mutexed<WorkQueue>::Locked queue(mWorkQueue);
408         if (queue->pending().count(frameIndex) == 0) {
409             ALOGW("unknown frame index: %" PRIu64, frameIndex);
410             return;
411         }
412         work = std::move(queue->pending().at(frameIndex));
413         queue->pending().erase(frameIndex);
414     }
415     if (work) {
416         fillWork(work);
417         std::shared_ptr<C2Component::Listener> listener =
418             mExecState.lock()->mListener;
419         listener->onWorkDone_nb(shared_from_this(), vec(work));
420         DDD("returning pending work");
421     }
422 }
423 
cloneAndSend(uint64_t frameIndex,const std::unique_ptr<C2Work> & currentWork,std::function<void (const std::unique_ptr<C2Work> &)> fillWork)424 void SimpleC2Component::cloneAndSend(
425     uint64_t frameIndex, const std::unique_ptr<C2Work> &currentWork,
426     std::function<void(const std::unique_ptr<C2Work> &)> fillWork) {
427     std::unique_ptr<C2Work> work(new C2Work);
428     if (currentWork->input.ordinal.frameIndex == frameIndex) {
429         work->input.flags = currentWork->input.flags;
430         work->input.ordinal = currentWork->input.ordinal;
431     } else {
432         Mutexed<WorkQueue>::Locked queue(mWorkQueue);
433         if (queue->pending().count(frameIndex) == 0) {
434             ALOGW("unknown frame index: %" PRIu64, frameIndex);
435             return;
436         }
437         work->input.flags = queue->pending().at(frameIndex)->input.flags;
438         work->input.ordinal = queue->pending().at(frameIndex)->input.ordinal;
439     }
440     work->worklets.emplace_back(new C2Worklet);
441     if (work) {
442         fillWork(work);
443         std::shared_ptr<C2Component::Listener> listener =
444             mExecState.lock()->mListener;
445         listener->onWorkDone_nb(shared_from_this(), vec(work));
446         DDD("cloned and sending work");
447     }
448 }
449 
processQueue()450 bool SimpleC2Component::processQueue() {
451     std::unique_ptr<C2Work> work;
452     uint64_t generation;
453     int32_t drainMode;
454     bool isFlushPending = false;
455     bool hasQueuedWork = false;
456     {
457         Mutexed<WorkQueue>::Locked queue(mWorkQueue);
458         if (queue->empty()) {
459             return false;
460         }
461 
462         generation = queue->generation();
463         drainMode = queue->drainMode();
464         isFlushPending = queue->popPendingFlush();
465         work = queue->pop_front();
466         hasQueuedWork = !queue->empty();
467     }
468     if (isFlushPending) {
469         DDD("processing pending flush");
470         c2_status_t err = onFlush_sm();
471         if (err != C2_OK) {
472             ALOGD("flush err: %d", err);
473             // TODO: error
474         }
475     }
476 
477     if (!mOutputBlockPool) {
478         c2_status_t err = [this] {
479             // TODO: don't use query_vb
480             C2StreamBufferTypeSetting::output outputFormat(0u);
481             std::vector<std::unique_ptr<C2Param>> params;
482             c2_status_t err = intf()->query_vb(
483                 {&outputFormat}, {C2PortBlockPoolsTuning::output::PARAM_TYPE},
484                 C2_DONT_BLOCK, &params);
485             if (err != C2_OK && err != C2_BAD_INDEX) {
486                 ALOGD("query err = %d", err);
487                 return err;
488             }
489             C2BlockPool::local_id_t poolId =
490                 outputFormat.value == C2BufferData::GRAPHIC
491                     ? C2BlockPool::BASIC_GRAPHIC
492                     : C2BlockPool::BASIC_LINEAR;
493             if (params.size()) {
494                 C2PortBlockPoolsTuning::output *outputPools =
495                     C2PortBlockPoolsTuning::output::From(params[0].get());
496                 if (outputPools && outputPools->flexCount() >= 1) {
497                     poolId = outputPools->m.values[0];
498                 }
499             }
500 
501             std::shared_ptr<C2BlockPool> blockPool;
502             err = GetCodec2BlockPool(poolId, shared_from_this(), &blockPool);
503             ALOGD("Using output block pool with poolID %llu => got %llu - %d",
504                   (unsigned long long)poolId,
505                   (unsigned long long)(blockPool ? blockPool->getLocalId()
506                                                  : 111000111),
507                   err);
508             if (err == C2_OK) {
509                 mOutputBlockPool =
510                     std::make_shared<BlockingBlockPool>(blockPool);
511             }
512             return err;
513         }();
514         if (err != C2_OK) {
515             Mutexed<ExecState>::Locked state(mExecState);
516             std::shared_ptr<C2Component::Listener> listener = state->mListener;
517             state.unlock();
518             listener->onError_nb(shared_from_this(), err);
519             return hasQueuedWork;
520         }
521     }
522 
523     if (!work) {
524         c2_status_t err = drain(drainMode, mOutputBlockPool);
525         if (err != C2_OK) {
526             Mutexed<ExecState>::Locked state(mExecState);
527             std::shared_ptr<C2Component::Listener> listener = state->mListener;
528             state.unlock();
529             listener->onError_nb(shared_from_this(), err);
530         }
531         return hasQueuedWork;
532     }
533 
534     {
535         std::vector<C2Param *> updates;
536         for (const std::unique_ptr<C2Param> &param : work->input.configUpdate) {
537             if (param) {
538                 updates.emplace_back(param.get());
539             }
540         }
541         if (!updates.empty()) {
542             std::vector<std::unique_ptr<C2SettingResult>> failures;
543             c2_status_t err =
544                 intf()->config_vb(updates, C2_MAY_BLOCK, &failures);
545             ALOGD("applied %zu configUpdates => %s (%d)", updates.size(),
546                   asString(err), err);
547         }
548     }
549 
550     DDD("start processing frame #%" PRIu64,
551         work->input.ordinal.frameIndex.peeku());
552     // If input buffer list is not empty, it means we have some input to process
553     // on. However, input could be a null buffer. In such case, clear the buffer
554     // list before making call to process().
555     if (!work->input.buffers.empty() && !work->input.buffers[0]) {
556         ALOGD("Encountered null input buffer. Clearing the input buffer");
557         work->input.buffers.clear();
558     }
559     process(work, mOutputBlockPool);
560     DDD("processed frame #%" PRIu64, work->input.ordinal.frameIndex.peeku());
561     Mutexed<WorkQueue>::Locked queue(mWorkQueue);
562     if (queue->generation() != generation) {
563         ALOGD("work form old generation: was %" PRIu64 " now %" PRIu64,
564               queue->generation(), generation);
565         work->result = C2_NOT_FOUND;
566         queue.unlock();
567 
568         Mutexed<ExecState>::Locked state(mExecState);
569         std::shared_ptr<C2Component::Listener> listener = state->mListener;
570         state.unlock();
571         listener->onWorkDone_nb(shared_from_this(), vec(work));
572         return hasQueuedWork;
573     }
574     if (work->workletsProcessed != 0u) {
575         queue.unlock();
576         Mutexed<ExecState>::Locked state(mExecState);
577         DDD("returning this work");
578         std::shared_ptr<C2Component::Listener> listener = state->mListener;
579         state.unlock();
580         listener->onWorkDone_nb(shared_from_this(), vec(work));
581     } else {
582         work->input.buffers.clear();
583         std::unique_ptr<C2Work> unexpected;
584 
585         uint64_t frameIndex = work->input.ordinal.frameIndex.peeku();
586         DDD("queue pending work %" PRIu64, frameIndex);
587         if (queue->pending().count(frameIndex) != 0) {
588             unexpected = std::move(queue->pending().at(frameIndex));
589             queue->pending().erase(frameIndex);
590         }
591         (void)queue->pending().insert({frameIndex, std::move(work)});
592 
593         queue.unlock();
594         if (unexpected) {
595             ALOGD("unexpected pending work");
596             unexpected->result = C2_CORRUPTED;
597             Mutexed<ExecState>::Locked state(mExecState);
598             std::shared_ptr<C2Component::Listener> listener = state->mListener;
599             state.unlock();
600             listener->onWorkDone_nb(shared_from_this(), vec(unexpected));
601         }
602     }
603     return hasQueuedWork;
604 }
605 
createLinearBuffer(const std::shared_ptr<C2LinearBlock> & block)606 std::shared_ptr<C2Buffer> SimpleC2Component::createLinearBuffer(
607     const std::shared_ptr<C2LinearBlock> &block) {
608     return createLinearBuffer(block, block->offset(), block->size());
609 }
610 
createLinearBuffer(const std::shared_ptr<C2LinearBlock> & block,size_t offset,size_t size)611 std::shared_ptr<C2Buffer> SimpleC2Component::createLinearBuffer(
612     const std::shared_ptr<C2LinearBlock> &block, size_t offset, size_t size) {
613     return C2Buffer::CreateLinearBuffer(
614         block->share(offset, size, ::C2Fence()));
615 }
616 
createGraphicBuffer(const std::shared_ptr<C2GraphicBlock> & block)617 std::shared_ptr<C2Buffer> SimpleC2Component::createGraphicBuffer(
618     const std::shared_ptr<C2GraphicBlock> &block) {
619     return createGraphicBuffer(block, C2Rect(block->width(), block->height()));
620 }
621 
createGraphicBuffer(const std::shared_ptr<C2GraphicBlock> & block,const C2Rect & crop)622 std::shared_ptr<C2Buffer> SimpleC2Component::createGraphicBuffer(
623     const std::shared_ptr<C2GraphicBlock> &block, const C2Rect &crop) {
624     return C2Buffer::CreateGraphicBuffer(block->share(crop, ::C2Fence()));
625 }
626 
627 } // namespace android
628