1 /*
2 * Copyright (C) 2017 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 //#define LOG_NDEBUG 0
18 #define LOG_TAG "SimpleC2Component"
19 #include <log/log.h>
20
21 #include <cutils/properties.h>
22 #include <media/stagefright/foundation/AMessage.h>
23
24 #include <inttypes.h>
25
26 #include <C2Config.h>
27 #include <C2Debug.h>
28 #include <C2PlatformSupport.h>
29 #include <SimpleC2Component.h>
30
31 #define DEBUG 0
32 #if DEBUG
33 #define DDD(...) ALOGD(__VA_ARGS__)
34 #else
35 #define DDD(...) ((void)0)
36 #endif
37
38 namespace android {
39
pop_front()40 std::unique_ptr<C2Work> SimpleC2Component::WorkQueue::pop_front() {
41 std::unique_ptr<C2Work> work = std::move(mQueue.front().work);
42 mQueue.pop_front();
43 return work;
44 }
45
push_back(std::unique_ptr<C2Work> work)46 void SimpleC2Component::WorkQueue::push_back(std::unique_ptr<C2Work> work) {
47 mQueue.push_back({std::move(work), NO_DRAIN});
48 }
49
empty() const50 bool SimpleC2Component::WorkQueue::empty() const { return mQueue.empty(); }
51
clear()52 void SimpleC2Component::WorkQueue::clear() { mQueue.clear(); }
53
drainMode() const54 uint32_t SimpleC2Component::WorkQueue::drainMode() const {
55 return mQueue.front().drainMode;
56 }
57
markDrain(uint32_t drainMode)58 void SimpleC2Component::WorkQueue::markDrain(uint32_t drainMode) {
59 mQueue.push_back({nullptr, drainMode});
60 }
61
62 ////////////////////////////////////////////////////////////////////////////////
63
WorkHandler()64 SimpleC2Component::WorkHandler::WorkHandler() : mRunning(false) {}
65
setComponent(const std::shared_ptr<SimpleC2Component> & thiz)66 void SimpleC2Component::WorkHandler::setComponent(
67 const std::shared_ptr<SimpleC2Component> &thiz) {
68 mThiz = thiz;
69 }
70
Reply(const sp<AMessage> & msg,int32_t * err=nullptr)71 static void Reply(const sp<AMessage> &msg, int32_t *err = nullptr) {
72 sp<AReplyToken> replyId;
73 CHECK(msg->senderAwaitsResponse(&replyId));
74 sp<AMessage> reply = new AMessage;
75 if (err) {
76 reply->setInt32("err", *err);
77 }
78 reply->postReply(replyId);
79 }
80
onMessageReceived(const sp<AMessage> & msg)81 void SimpleC2Component::WorkHandler::onMessageReceived(
82 const sp<AMessage> &msg) {
83 std::shared_ptr<SimpleC2Component> thiz = mThiz.lock();
84 if (!thiz) {
85 ALOGD("component not yet set; msg = %s", msg->debugString().c_str());
86 sp<AReplyToken> replyId;
87 if (msg->senderAwaitsResponse(&replyId)) {
88 sp<AMessage> reply = new AMessage;
89 reply->setInt32("err", C2_CORRUPTED);
90 reply->postReply(replyId);
91 }
92 return;
93 }
94
95 switch (msg->what()) {
96 case kWhatProcess: {
97 if (mRunning) {
98 if (thiz->processQueue()) {
99 (new AMessage(kWhatProcess, this))->post();
100 }
101 } else {
102 DDD("Ignore process message as we're not running");
103 }
104 break;
105 }
106 case kWhatInit: {
107 int32_t err = thiz->onInit();
108 Reply(msg, &err);
109 [[fallthrough]];
110 }
111 case kWhatStart: {
112 mRunning = true;
113 break;
114 }
115 case kWhatStop: {
116 int32_t err = thiz->onStop();
117 thiz->mOutputBlockPool.reset();
118 Reply(msg, &err);
119 break;
120 }
121 case kWhatReset: {
122 thiz->onReset();
123 thiz->mOutputBlockPool.reset();
124 mRunning = false;
125 Reply(msg);
126 break;
127 }
128 case kWhatRelease: {
129 thiz->onRelease();
130 thiz->mOutputBlockPool.reset();
131 mRunning = false;
132 Reply(msg);
133 break;
134 }
135 default: {
136 ALOGD("Unrecognized msg: %d", msg->what());
137 break;
138 }
139 }
140 }
141
142 class SimpleC2Component::BlockingBlockPool : public C2BlockPool {
143 public:
BlockingBlockPool(const std::shared_ptr<C2BlockPool> & base)144 BlockingBlockPool(const std::shared_ptr<C2BlockPool> &base) : mBase{base} {}
145
getLocalId() const146 virtual local_id_t getLocalId() const override {
147 return mBase->getLocalId();
148 }
149
getAllocatorId() const150 virtual C2Allocator::id_t getAllocatorId() const override {
151 return mBase->getAllocatorId();
152 }
153
154 virtual c2_status_t
fetchLinearBlock(uint32_t capacity,C2MemoryUsage usage,std::shared_ptr<C2LinearBlock> * block)155 fetchLinearBlock(uint32_t capacity, C2MemoryUsage usage,
156 std::shared_ptr<C2LinearBlock> *block) {
157 c2_status_t status;
158 do {
159 status = mBase->fetchLinearBlock(capacity, usage, block);
160 } while (status == C2_BLOCKING);
161 return status;
162 }
163
164 virtual c2_status_t
fetchCircularBlock(uint32_t capacity,C2MemoryUsage usage,std::shared_ptr<C2CircularBlock> * block)165 fetchCircularBlock(uint32_t capacity, C2MemoryUsage usage,
166 std::shared_ptr<C2CircularBlock> *block) {
167 c2_status_t status;
168 do {
169 status = mBase->fetchCircularBlock(capacity, usage, block);
170 } while (status == C2_BLOCKING);
171 return status;
172 }
173
174 virtual c2_status_t
fetchGraphicBlock(uint32_t width,uint32_t height,uint32_t format,C2MemoryUsage usage,std::shared_ptr<C2GraphicBlock> * block)175 fetchGraphicBlock(uint32_t width, uint32_t height, uint32_t format,
176 C2MemoryUsage usage,
177 std::shared_ptr<C2GraphicBlock> *block) {
178 c2_status_t status;
179 do {
180 status =
181 mBase->fetchGraphicBlock(width, height, format, usage, block);
182 } while (status == C2_BLOCKING);
183 return status;
184 }
185
186 private:
187 std::shared_ptr<C2BlockPool> mBase;
188 };
189
190 ////////////////////////////////////////////////////////////////////////////////
191
192 namespace {
193
194 struct DummyReadView : public C2ReadView {
DummyReadViewandroid::__anond85038380111::DummyReadView195 DummyReadView() : C2ReadView(C2_NO_INIT) {}
196 };
197
198 } // namespace
199
SimpleC2Component(const std::shared_ptr<C2ComponentInterface> & intf)200 SimpleC2Component::SimpleC2Component(
201 const std::shared_ptr<C2ComponentInterface> &intf)
202 : mDummyReadView(DummyReadView()), mIntf(intf), mLooper(new ALooper),
203 mHandler(new WorkHandler) {
204 mLooper->setName(intf->getName().c_str());
205 (void)mLooper->registerHandler(mHandler);
206 mLooper->start(false, false, ANDROID_PRIORITY_VIDEO);
207 }
208
~SimpleC2Component()209 SimpleC2Component::~SimpleC2Component() {
210 mLooper->unregisterHandler(mHandler->id());
211 (void)mLooper->stop();
212 }
213
setListener_vb(const std::shared_ptr<C2Component::Listener> & listener,c2_blocking_t mayBlock)214 c2_status_t SimpleC2Component::setListener_vb(
215 const std::shared_ptr<C2Component::Listener> &listener,
216 c2_blocking_t mayBlock) {
217 mHandler->setComponent(shared_from_this());
218
219 Mutexed<ExecState>::Locked state(mExecState);
220 if (state->mState == RUNNING) {
221 if (listener) {
222 return C2_BAD_STATE;
223 } else if (!mayBlock) {
224 return C2_BLOCKING;
225 }
226 }
227 state->mListener = listener;
228 // TODO: wait for listener change to have taken place before returning
229 // (e.g. if there is an ongoing listener callback)
230 return C2_OK;
231 }
232
233 c2_status_t
queue_nb(std::list<std::unique_ptr<C2Work>> * const items)234 SimpleC2Component::queue_nb(std::list<std::unique_ptr<C2Work>> *const items) {
235 {
236 Mutexed<ExecState>::Locked state(mExecState);
237 if (state->mState != RUNNING) {
238 return C2_BAD_STATE;
239 }
240 }
241 bool queueWasEmpty = false;
242 {
243 Mutexed<WorkQueue>::Locked queue(mWorkQueue);
244 queueWasEmpty = queue->empty();
245 while (!items->empty()) {
246 queue->push_back(std::move(items->front()));
247 items->pop_front();
248 }
249 }
250 if (queueWasEmpty) {
251 (new AMessage(WorkHandler::kWhatProcess, mHandler))->post();
252 }
253 return C2_OK;
254 }
255
256 c2_status_t
announce_nb(const std::vector<C2WorkOutline> & items)257 SimpleC2Component::announce_nb(const std::vector<C2WorkOutline> &items) {
258 (void)items;
259 return C2_OMITTED;
260 }
261
flush_sm(flush_mode_t flushMode,std::list<std::unique_ptr<C2Work>> * const flushedWork)262 c2_status_t SimpleC2Component::flush_sm(
263 flush_mode_t flushMode,
264 std::list<std::unique_ptr<C2Work>> *const flushedWork) {
265 (void)flushMode;
266 {
267 Mutexed<ExecState>::Locked state(mExecState);
268 if (state->mState != RUNNING) {
269 return C2_BAD_STATE;
270 }
271 }
272 {
273 Mutexed<WorkQueue>::Locked queue(mWorkQueue);
274 queue->incGeneration();
275 // TODO: queue->splicedBy(flushedWork, flushedWork->end());
276 while (!queue->empty()) {
277 std::unique_ptr<C2Work> work = queue->pop_front();
278 if (work) {
279 flushedWork->push_back(std::move(work));
280 }
281 }
282 while (!queue->pending().empty()) {
283 flushedWork->push_back(std::move(queue->pending().begin()->second));
284 queue->pending().erase(queue->pending().begin());
285 }
286 }
287
288 return C2_OK;
289 }
290
drain_nb(drain_mode_t drainMode)291 c2_status_t SimpleC2Component::drain_nb(drain_mode_t drainMode) {
292 if (drainMode == DRAIN_CHAIN) {
293 return C2_OMITTED;
294 }
295 {
296 Mutexed<ExecState>::Locked state(mExecState);
297 if (state->mState != RUNNING) {
298 return C2_BAD_STATE;
299 }
300 }
301 bool queueWasEmpty = false;
302 {
303 Mutexed<WorkQueue>::Locked queue(mWorkQueue);
304 queueWasEmpty = queue->empty();
305 queue->markDrain(drainMode);
306 }
307 if (queueWasEmpty) {
308 (new AMessage(WorkHandler::kWhatProcess, mHandler))->post();
309 }
310
311 return C2_OK;
312 }
313
start()314 c2_status_t SimpleC2Component::start() {
315 Mutexed<ExecState>::Locked state(mExecState);
316 if (state->mState == RUNNING) {
317 return C2_BAD_STATE;
318 }
319 bool needsInit = (state->mState == UNINITIALIZED);
320 state.unlock();
321 if (needsInit) {
322 sp<AMessage> reply;
323 (new AMessage(WorkHandler::kWhatInit, mHandler))
324 ->postAndAwaitResponse(&reply);
325 int32_t err;
326 CHECK(reply->findInt32("err", &err));
327 if (err != C2_OK) {
328 return (c2_status_t)err;
329 }
330 } else {
331 (new AMessage(WorkHandler::kWhatStart, mHandler))->post();
332 }
333 state.lock();
334 state->mState = RUNNING;
335 return C2_OK;
336 }
337
stop()338 c2_status_t SimpleC2Component::stop() {
339 DDD("stop");
340 {
341 Mutexed<ExecState>::Locked state(mExecState);
342 if (state->mState != RUNNING) {
343 return C2_BAD_STATE;
344 }
345 state->mState = STOPPED;
346 }
347 {
348 Mutexed<WorkQueue>::Locked queue(mWorkQueue);
349 queue->clear();
350 queue->pending().clear();
351 }
352 sp<AMessage> reply;
353 (new AMessage(WorkHandler::kWhatStop, mHandler))
354 ->postAndAwaitResponse(&reply);
355 int32_t err;
356 CHECK(reply->findInt32("err", &err));
357 if (err != C2_OK) {
358 return (c2_status_t)err;
359 }
360 return C2_OK;
361 }
362
reset()363 c2_status_t SimpleC2Component::reset() {
364 DDD("reset");
365 {
366 Mutexed<ExecState>::Locked state(mExecState);
367 state->mState = UNINITIALIZED;
368 }
369 {
370 Mutexed<WorkQueue>::Locked queue(mWorkQueue);
371 queue->clear();
372 queue->pending().clear();
373 }
374 sp<AMessage> reply;
375 (new AMessage(WorkHandler::kWhatReset, mHandler))
376 ->postAndAwaitResponse(&reply);
377 return C2_OK;
378 }
379
release()380 c2_status_t SimpleC2Component::release() {
381 DDD("release");
382 sp<AMessage> reply;
383 (new AMessage(WorkHandler::kWhatRelease, mHandler))
384 ->postAndAwaitResponse(&reply);
385 return C2_OK;
386 }
387
intf()388 std::shared_ptr<C2ComponentInterface> SimpleC2Component::intf() {
389 return mIntf;
390 }
391
392 namespace {
393
vec(std::unique_ptr<C2Work> & work)394 std::list<std::unique_ptr<C2Work>> vec(std::unique_ptr<C2Work> &work) {
395 std::list<std::unique_ptr<C2Work>> ret;
396 ret.push_back(std::move(work));
397 return ret;
398 }
399
400 } // namespace
401
finish(uint64_t frameIndex,std::function<void (const std::unique_ptr<C2Work> &)> fillWork)402 void SimpleC2Component::finish(
403 uint64_t frameIndex,
404 std::function<void(const std::unique_ptr<C2Work> &)> fillWork) {
405 std::unique_ptr<C2Work> work;
406 {
407 Mutexed<WorkQueue>::Locked queue(mWorkQueue);
408 if (queue->pending().count(frameIndex) == 0) {
409 ALOGW("unknown frame index: %" PRIu64, frameIndex);
410 return;
411 }
412 work = std::move(queue->pending().at(frameIndex));
413 queue->pending().erase(frameIndex);
414 }
415 if (work) {
416 fillWork(work);
417 std::shared_ptr<C2Component::Listener> listener =
418 mExecState.lock()->mListener;
419 listener->onWorkDone_nb(shared_from_this(), vec(work));
420 DDD("returning pending work");
421 }
422 }
423
cloneAndSend(uint64_t frameIndex,const std::unique_ptr<C2Work> & currentWork,std::function<void (const std::unique_ptr<C2Work> &)> fillWork)424 void SimpleC2Component::cloneAndSend(
425 uint64_t frameIndex, const std::unique_ptr<C2Work> ¤tWork,
426 std::function<void(const std::unique_ptr<C2Work> &)> fillWork) {
427 std::unique_ptr<C2Work> work(new C2Work);
428 if (currentWork->input.ordinal.frameIndex == frameIndex) {
429 work->input.flags = currentWork->input.flags;
430 work->input.ordinal = currentWork->input.ordinal;
431 } else {
432 Mutexed<WorkQueue>::Locked queue(mWorkQueue);
433 if (queue->pending().count(frameIndex) == 0) {
434 ALOGW("unknown frame index: %" PRIu64, frameIndex);
435 return;
436 }
437 work->input.flags = queue->pending().at(frameIndex)->input.flags;
438 work->input.ordinal = queue->pending().at(frameIndex)->input.ordinal;
439 }
440 work->worklets.emplace_back(new C2Worklet);
441 if (work) {
442 fillWork(work);
443 std::shared_ptr<C2Component::Listener> listener =
444 mExecState.lock()->mListener;
445 listener->onWorkDone_nb(shared_from_this(), vec(work));
446 DDD("cloned and sending work");
447 }
448 }
449
processQueue()450 bool SimpleC2Component::processQueue() {
451 std::unique_ptr<C2Work> work;
452 uint64_t generation;
453 int32_t drainMode;
454 bool isFlushPending = false;
455 bool hasQueuedWork = false;
456 {
457 Mutexed<WorkQueue>::Locked queue(mWorkQueue);
458 if (queue->empty()) {
459 return false;
460 }
461
462 generation = queue->generation();
463 drainMode = queue->drainMode();
464 isFlushPending = queue->popPendingFlush();
465 work = queue->pop_front();
466 hasQueuedWork = !queue->empty();
467 }
468 if (isFlushPending) {
469 DDD("processing pending flush");
470 c2_status_t err = onFlush_sm();
471 if (err != C2_OK) {
472 ALOGD("flush err: %d", err);
473 // TODO: error
474 }
475 }
476
477 if (!mOutputBlockPool) {
478 c2_status_t err = [this] {
479 // TODO: don't use query_vb
480 C2StreamBufferTypeSetting::output outputFormat(0u);
481 std::vector<std::unique_ptr<C2Param>> params;
482 c2_status_t err = intf()->query_vb(
483 {&outputFormat}, {C2PortBlockPoolsTuning::output::PARAM_TYPE},
484 C2_DONT_BLOCK, ¶ms);
485 if (err != C2_OK && err != C2_BAD_INDEX) {
486 ALOGD("query err = %d", err);
487 return err;
488 }
489 C2BlockPool::local_id_t poolId =
490 outputFormat.value == C2BufferData::GRAPHIC
491 ? C2BlockPool::BASIC_GRAPHIC
492 : C2BlockPool::BASIC_LINEAR;
493 if (params.size()) {
494 C2PortBlockPoolsTuning::output *outputPools =
495 C2PortBlockPoolsTuning::output::From(params[0].get());
496 if (outputPools && outputPools->flexCount() >= 1) {
497 poolId = outputPools->m.values[0];
498 }
499 }
500
501 std::shared_ptr<C2BlockPool> blockPool;
502 err = GetCodec2BlockPool(poolId, shared_from_this(), &blockPool);
503 ALOGD("Using output block pool with poolID %llu => got %llu - %d",
504 (unsigned long long)poolId,
505 (unsigned long long)(blockPool ? blockPool->getLocalId()
506 : 111000111),
507 err);
508 if (err == C2_OK) {
509 mOutputBlockPool =
510 std::make_shared<BlockingBlockPool>(blockPool);
511 }
512 return err;
513 }();
514 if (err != C2_OK) {
515 Mutexed<ExecState>::Locked state(mExecState);
516 std::shared_ptr<C2Component::Listener> listener = state->mListener;
517 state.unlock();
518 listener->onError_nb(shared_from_this(), err);
519 return hasQueuedWork;
520 }
521 }
522
523 if (!work) {
524 c2_status_t err = drain(drainMode, mOutputBlockPool);
525 if (err != C2_OK) {
526 Mutexed<ExecState>::Locked state(mExecState);
527 std::shared_ptr<C2Component::Listener> listener = state->mListener;
528 state.unlock();
529 listener->onError_nb(shared_from_this(), err);
530 }
531 return hasQueuedWork;
532 }
533
534 {
535 std::vector<C2Param *> updates;
536 for (const std::unique_ptr<C2Param> ¶m : work->input.configUpdate) {
537 if (param) {
538 updates.emplace_back(param.get());
539 }
540 }
541 if (!updates.empty()) {
542 std::vector<std::unique_ptr<C2SettingResult>> failures;
543 c2_status_t err =
544 intf()->config_vb(updates, C2_MAY_BLOCK, &failures);
545 ALOGD("applied %zu configUpdates => %s (%d)", updates.size(),
546 asString(err), err);
547 }
548 }
549
550 DDD("start processing frame #%" PRIu64,
551 work->input.ordinal.frameIndex.peeku());
552 // If input buffer list is not empty, it means we have some input to process
553 // on. However, input could be a null buffer. In such case, clear the buffer
554 // list before making call to process().
555 if (!work->input.buffers.empty() && !work->input.buffers[0]) {
556 ALOGD("Encountered null input buffer. Clearing the input buffer");
557 work->input.buffers.clear();
558 }
559 process(work, mOutputBlockPool);
560 DDD("processed frame #%" PRIu64, work->input.ordinal.frameIndex.peeku());
561 Mutexed<WorkQueue>::Locked queue(mWorkQueue);
562 if (queue->generation() != generation) {
563 ALOGD("work form old generation: was %" PRIu64 " now %" PRIu64,
564 queue->generation(), generation);
565 work->result = C2_NOT_FOUND;
566 queue.unlock();
567
568 Mutexed<ExecState>::Locked state(mExecState);
569 std::shared_ptr<C2Component::Listener> listener = state->mListener;
570 state.unlock();
571 listener->onWorkDone_nb(shared_from_this(), vec(work));
572 return hasQueuedWork;
573 }
574 if (work->workletsProcessed != 0u) {
575 queue.unlock();
576 Mutexed<ExecState>::Locked state(mExecState);
577 DDD("returning this work");
578 std::shared_ptr<C2Component::Listener> listener = state->mListener;
579 state.unlock();
580 listener->onWorkDone_nb(shared_from_this(), vec(work));
581 } else {
582 work->input.buffers.clear();
583 std::unique_ptr<C2Work> unexpected;
584
585 uint64_t frameIndex = work->input.ordinal.frameIndex.peeku();
586 DDD("queue pending work %" PRIu64, frameIndex);
587 if (queue->pending().count(frameIndex) != 0) {
588 unexpected = std::move(queue->pending().at(frameIndex));
589 queue->pending().erase(frameIndex);
590 }
591 (void)queue->pending().insert({frameIndex, std::move(work)});
592
593 queue.unlock();
594 if (unexpected) {
595 ALOGD("unexpected pending work");
596 unexpected->result = C2_CORRUPTED;
597 Mutexed<ExecState>::Locked state(mExecState);
598 std::shared_ptr<C2Component::Listener> listener = state->mListener;
599 state.unlock();
600 listener->onWorkDone_nb(shared_from_this(), vec(unexpected));
601 }
602 }
603 return hasQueuedWork;
604 }
605
createLinearBuffer(const std::shared_ptr<C2LinearBlock> & block)606 std::shared_ptr<C2Buffer> SimpleC2Component::createLinearBuffer(
607 const std::shared_ptr<C2LinearBlock> &block) {
608 return createLinearBuffer(block, block->offset(), block->size());
609 }
610
createLinearBuffer(const std::shared_ptr<C2LinearBlock> & block,size_t offset,size_t size)611 std::shared_ptr<C2Buffer> SimpleC2Component::createLinearBuffer(
612 const std::shared_ptr<C2LinearBlock> &block, size_t offset, size_t size) {
613 return C2Buffer::CreateLinearBuffer(
614 block->share(offset, size, ::C2Fence()));
615 }
616
createGraphicBuffer(const std::shared_ptr<C2GraphicBlock> & block)617 std::shared_ptr<C2Buffer> SimpleC2Component::createGraphicBuffer(
618 const std::shared_ptr<C2GraphicBlock> &block) {
619 return createGraphicBuffer(block, C2Rect(block->width(), block->height()));
620 }
621
createGraphicBuffer(const std::shared_ptr<C2GraphicBlock> & block,const C2Rect & crop)622 std::shared_ptr<C2Buffer> SimpleC2Component::createGraphicBuffer(
623 const std::shared_ptr<C2GraphicBlock> &block, const C2Rect &crop) {
624 return C2Buffer::CreateGraphicBuffer(block->share(crop, ::C2Fence()));
625 }
626
627 } // namespace android
628