1 /*
2 * Copyright (C) 2017 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 //#define LOG_NDEBUG 0
18 #define LOG_TAG "SimpleC2Component"
19 #include <log/log.h>
20
21 #include <cutils/properties.h>
22
23 #include <inttypes.h>
24
25 #include <C2Config.h>
26 #include <C2Debug.h>
27 #include <C2PlatformSupport.h>
28 #include <SimpleC2Component.h>
29
30 namespace android {
31
pop_front()32 std::unique_ptr<C2Work> SimpleC2Component::WorkQueue::pop_front() {
33 std::unique_ptr<C2Work> work = std::move(mQueue.front().work);
34 mQueue.pop_front();
35 return work;
36 }
37
push_back(std::unique_ptr<C2Work> work)38 void SimpleC2Component::WorkQueue::push_back(std::unique_ptr<C2Work> work) {
39 mQueue.push_back({ std::move(work), NO_DRAIN });
40 }
41
empty() const42 bool SimpleC2Component::WorkQueue::empty() const {
43 return mQueue.empty();
44 }
45
clear()46 void SimpleC2Component::WorkQueue::clear() {
47 mQueue.clear();
48 }
49
drainMode() const50 uint32_t SimpleC2Component::WorkQueue::drainMode() const {
51 return mQueue.front().drainMode;
52 }
53
markDrain(uint32_t drainMode)54 void SimpleC2Component::WorkQueue::markDrain(uint32_t drainMode) {
55 mQueue.push_back({ nullptr, drainMode });
56 }
57
58 ////////////////////////////////////////////////////////////////////////////////
59
60 namespace {
61
62 struct DummyReadView : public C2ReadView {
DummyReadViewandroid::__anon98cc350a0111::DummyReadView63 DummyReadView() : C2ReadView(C2_NO_INIT) {}
64 };
65
66 } // namespace
67
SimpleC2Component(const std::shared_ptr<C2ComponentInterface> & intf)68 SimpleC2Component::SimpleC2Component(
69 const std::shared_ptr<C2ComponentInterface> &intf)
70 : mDummyReadView(DummyReadView()),
71 mIntf(intf) {
72 }
73
setListener_vb(const std::shared_ptr<C2Component::Listener> & listener,c2_blocking_t mayBlock)74 c2_status_t SimpleC2Component::setListener_vb(
75 const std::shared_ptr<C2Component::Listener> &listener, c2_blocking_t mayBlock) {
76 Mutexed<ExecState>::Locked state(mExecState);
77 if (state->mState == RUNNING) {
78 if (listener) {
79 return C2_BAD_STATE;
80 } else if (!mayBlock) {
81 return C2_BLOCKING;
82 }
83 }
84 state->mListener = listener;
85 // TODO: wait for listener change to have taken place before returning
86 // (e.g. if there is an ongoing listener callback)
87 return C2_OK;
88 }
89
queue_nb(std::list<std::unique_ptr<C2Work>> * const items)90 c2_status_t SimpleC2Component::queue_nb(std::list<std::unique_ptr<C2Work>> * const items) {
91 {
92 Mutexed<ExecState>::Locked state(mExecState);
93 if (state->mState != RUNNING) {
94 return C2_BAD_STATE;
95 }
96 }
97 {
98 Mutexed<WorkQueue>::Locked queue(mWorkQueue);
99 while (!items->empty()) {
100 queue->push_back(std::move(items->front()));
101 items->pop_front();
102 }
103 queue->mCondition.broadcast();
104 }
105 return C2_OK;
106 }
107
announce_nb(const std::vector<C2WorkOutline> & items)108 c2_status_t SimpleC2Component::announce_nb(const std::vector<C2WorkOutline> &items) {
109 (void)items;
110 return C2_OMITTED;
111 }
112
flush_sm(flush_mode_t flushMode,std::list<std::unique_ptr<C2Work>> * const flushedWork)113 c2_status_t SimpleC2Component::flush_sm(
114 flush_mode_t flushMode, std::list<std::unique_ptr<C2Work>>* const flushedWork) {
115 (void)flushMode;
116 {
117 Mutexed<ExecState>::Locked state(mExecState);
118 if (state->mState != RUNNING) {
119 return C2_BAD_STATE;
120 }
121 }
122 {
123 Mutexed<WorkQueue>::Locked queue(mWorkQueue);
124 queue->incGeneration();
125 // TODO: queue->splicedBy(flushedWork, flushedWork->end());
126 while (!queue->empty()) {
127 std::unique_ptr<C2Work> work = queue->pop_front();
128 if (work) {
129 flushedWork->push_back(std::move(work));
130 }
131 }
132 }
133 {
134 Mutexed<PendingWork>::Locked pending(mPendingWork);
135 while (!pending->empty()) {
136 flushedWork->push_back(std::move(pending->begin()->second));
137 pending->erase(pending->begin());
138 }
139 }
140
141 return C2_OK;
142 }
143
drain_nb(drain_mode_t drainMode)144 c2_status_t SimpleC2Component::drain_nb(drain_mode_t drainMode) {
145 if (drainMode == DRAIN_CHAIN) {
146 return C2_OMITTED;
147 }
148 {
149 Mutexed<ExecState>::Locked state(mExecState);
150 if (state->mState != RUNNING) {
151 return C2_BAD_STATE;
152 }
153 }
154 {
155 Mutexed<WorkQueue>::Locked queue(mWorkQueue);
156 queue->markDrain(drainMode);
157 queue->mCondition.broadcast();
158 }
159
160 return C2_OK;
161 }
162
start()163 c2_status_t SimpleC2Component::start() {
164 Mutexed<ExecState>::Locked state(mExecState);
165 if (state->mState == RUNNING) {
166 return C2_BAD_STATE;
167 }
168 bool needsInit = (state->mState == UNINITIALIZED);
169 if (needsInit) {
170 state.unlock();
171 c2_status_t err = onInit();
172 if (err != C2_OK) {
173 return err;
174 }
175 state.lock();
176 }
177 if (!state->mThread.joinable()) {
178 mExitRequested = false;
179 {
180 Mutexed<ExitMonitor>::Locked monitor(mExitMonitor);
181 monitor->mExited = false;
182 }
183 state->mThread = std::thread(
184 [](std::weak_ptr<SimpleC2Component> wp) {
185 while (true) {
186 std::shared_ptr<SimpleC2Component> thiz = wp.lock();
187 if (!thiz) {
188 return;
189 }
190 if (thiz->exitRequested()) {
191 ALOGV("stop processing");
192 thiz->signalExit();
193 return;
194 }
195 thiz->processQueue();
196 }
197 },
198 shared_from_this());
199 }
200 state->mState = RUNNING;
201 return C2_OK;
202 }
203
signalExit()204 void SimpleC2Component::signalExit() {
205 Mutexed<ExitMonitor>::Locked monitor(mExitMonitor);
206 monitor->mExited = true;
207 monitor->mCondition.broadcast();
208 }
209
requestExitAndWait(std::function<void ()> job)210 void SimpleC2Component::requestExitAndWait(std::function<void()> job) {
211 {
212 Mutexed<ExecState>::Locked state(mExecState);
213 if (!state->mThread.joinable()) {
214 return;
215 }
216 }
217 mExitRequested = true;
218 {
219 Mutexed<WorkQueue>::Locked queue(mWorkQueue);
220 queue->mCondition.broadcast();
221 }
222 // TODO: timeout?
223 {
224 Mutexed<ExitMonitor>::Locked monitor(mExitMonitor);
225 while (!monitor->mExited) {
226 monitor.waitForCondition(monitor->mCondition);
227 }
228 job();
229 }
230 Mutexed<ExecState>::Locked state(mExecState);
231 if (state->mThread.joinable()) {
232 ALOGV("joining the processing thread");
233 state->mThread.join();
234 ALOGV("joined the processing thread");
235 }
236 }
237
stop()238 c2_status_t SimpleC2Component::stop() {
239 ALOGV("stop");
240 {
241 Mutexed<ExecState>::Locked state(mExecState);
242 if (state->mState != RUNNING) {
243 return C2_BAD_STATE;
244 }
245 state->mState = STOPPED;
246 }
247 {
248 Mutexed<WorkQueue>::Locked queue(mWorkQueue);
249 queue->clear();
250 }
251 {
252 Mutexed<PendingWork>::Locked pending(mPendingWork);
253 pending->clear();
254 }
255 c2_status_t err;
256 requestExitAndWait([this, &err]{ err = onStop(); });
257 if (err != C2_OK) {
258 return err;
259 }
260 return C2_OK;
261 }
262
reset()263 c2_status_t SimpleC2Component::reset() {
264 ALOGV("reset");
265 {
266 Mutexed<ExecState>::Locked state(mExecState);
267 state->mState = UNINITIALIZED;
268 }
269 {
270 Mutexed<WorkQueue>::Locked queue(mWorkQueue);
271 queue->clear();
272 }
273 {
274 Mutexed<PendingWork>::Locked pending(mPendingWork);
275 pending->clear();
276 }
277 requestExitAndWait([this]{ onReset(); });
278 return C2_OK;
279 }
280
release()281 c2_status_t SimpleC2Component::release() {
282 ALOGV("release");
283 requestExitAndWait([this]{ onRelease(); });
284 return C2_OK;
285 }
286
intf()287 std::shared_ptr<C2ComponentInterface> SimpleC2Component::intf() {
288 return mIntf;
289 }
290
291 namespace {
292
vec(std::unique_ptr<C2Work> & work)293 std::list<std::unique_ptr<C2Work>> vec(std::unique_ptr<C2Work> &work) {
294 std::list<std::unique_ptr<C2Work>> ret;
295 ret.push_back(std::move(work));
296 return ret;
297 }
298
299 } // namespace
300
finish(uint64_t frameIndex,std::function<void (const std::unique_ptr<C2Work> &)> fillWork)301 void SimpleC2Component::finish(
302 uint64_t frameIndex, std::function<void(const std::unique_ptr<C2Work> &)> fillWork) {
303 std::unique_ptr<C2Work> work;
304 {
305 Mutexed<PendingWork>::Locked pending(mPendingWork);
306 if (pending->count(frameIndex) == 0) {
307 ALOGW("unknown frame index: %" PRIu64, frameIndex);
308 return;
309 }
310 work = std::move(pending->at(frameIndex));
311 pending->erase(frameIndex);
312 }
313 if (work) {
314 fillWork(work);
315 Mutexed<ExecState>::Locked state(mExecState);
316 std::shared_ptr<C2Component::Listener> listener = state->mListener;
317 state.unlock();
318 listener->onWorkDone_nb(shared_from_this(), vec(work));
319 ALOGV("returning pending work");
320 }
321 }
322
processQueue()323 void SimpleC2Component::processQueue() {
324 std::unique_ptr<C2Work> work;
325 uint64_t generation;
326 int32_t drainMode;
327 bool isFlushPending = false;
328 {
329 Mutexed<WorkQueue>::Locked queue(mWorkQueue);
330 nsecs_t deadline = systemTime() + ms2ns(250);
331 while (queue->empty()) {
332 if (exitRequested()) {
333 return;
334 }
335 nsecs_t now = systemTime();
336 if (now >= deadline) {
337 return;
338 }
339 status_t err = queue.waitForConditionRelative(queue->mCondition, deadline - now);
340 if (err == TIMED_OUT) {
341 return;
342 }
343 }
344
345 generation = queue->generation();
346 drainMode = queue->drainMode();
347 isFlushPending = queue->popPendingFlush();
348 work = queue->pop_front();
349 }
350 if (isFlushPending) {
351 ALOGV("processing pending flush");
352 c2_status_t err = onFlush_sm();
353 if (err != C2_OK) {
354 ALOGD("flush err: %d", err);
355 // TODO: error
356 }
357 }
358
359 if (!mOutputBlockPool) {
360 c2_status_t err = [this] {
361 // TODO: don't use query_vb
362 C2StreamFormatConfig::output outputFormat(0u);
363 std::vector<std::unique_ptr<C2Param>> params;
364 c2_status_t err = intf()->query_vb(
365 { &outputFormat },
366 { C2PortBlockPoolsTuning::output::PARAM_TYPE },
367 C2_DONT_BLOCK,
368 ¶ms);
369 if (err != C2_OK && err != C2_BAD_INDEX) {
370 ALOGD("query err = %d", err);
371 return err;
372 }
373 C2BlockPool::local_id_t poolId =
374 outputFormat.value == C2FormatVideo
375 ? C2BlockPool::BASIC_GRAPHIC
376 : C2BlockPool::BASIC_LINEAR;
377 if (params.size()) {
378 C2PortBlockPoolsTuning::output *outputPools =
379 C2PortBlockPoolsTuning::output::From(params[0].get());
380 if (outputPools && outputPools->flexCount() >= 1) {
381 poolId = outputPools->m.values[0];
382 }
383 }
384
385 err = GetCodec2BlockPool(poolId, shared_from_this(), &mOutputBlockPool);
386 ALOGD("Using output block pool with poolID %llu => got %llu - %d",
387 (unsigned long long)poolId,
388 (unsigned long long)(
389 mOutputBlockPool ? mOutputBlockPool->getLocalId() : 111000111),
390 err);
391 return err;
392 }();
393 if (err != C2_OK) {
394 Mutexed<ExecState>::Locked state(mExecState);
395 std::shared_ptr<C2Component::Listener> listener = state->mListener;
396 state.unlock();
397 listener->onError_nb(shared_from_this(), err);
398 return;
399 }
400 }
401
402 if (!work) {
403 c2_status_t err = drain(drainMode, mOutputBlockPool);
404 if (err != C2_OK) {
405 Mutexed<ExecState>::Locked state(mExecState);
406 std::shared_ptr<C2Component::Listener> listener = state->mListener;
407 state.unlock();
408 listener->onError_nb(shared_from_this(), err);
409 }
410 return;
411 }
412
413 process(work, mOutputBlockPool);
414 ALOGV("processed frame #%" PRIu64, work->input.ordinal.frameIndex.peeku());
415 {
416 Mutexed<WorkQueue>::Locked queue(mWorkQueue);
417 if (queue->generation() != generation) {
418 ALOGD("work form old generation: was %" PRIu64 " now %" PRIu64,
419 queue->generation(), generation);
420 work->result = C2_NOT_FOUND;
421 queue.unlock();
422 {
423 Mutexed<ExecState>::Locked state(mExecState);
424 std::shared_ptr<C2Component::Listener> listener = state->mListener;
425 state.unlock();
426 listener->onWorkDone_nb(shared_from_this(), vec(work));
427 }
428 queue.lock();
429 return;
430 }
431 }
432 if (work->workletsProcessed != 0u) {
433 Mutexed<ExecState>::Locked state(mExecState);
434 ALOGV("returning this work");
435 std::shared_ptr<C2Component::Listener> listener = state->mListener;
436 state.unlock();
437 listener->onWorkDone_nb(shared_from_this(), vec(work));
438 } else {
439 ALOGV("queue pending work");
440 std::unique_ptr<C2Work> unexpected;
441 {
442 Mutexed<PendingWork>::Locked pending(mPendingWork);
443 uint64_t frameIndex = work->input.ordinal.frameIndex.peeku();
444 if (pending->count(frameIndex) != 0) {
445 unexpected = std::move(pending->at(frameIndex));
446 pending->erase(frameIndex);
447 }
448 (void)pending->insert({ frameIndex, std::move(work) });
449 }
450 if (unexpected) {
451 ALOGD("unexpected pending work");
452 unexpected->result = C2_CORRUPTED;
453 Mutexed<ExecState>::Locked state(mExecState);
454 std::shared_ptr<C2Component::Listener> listener = state->mListener;
455 state.unlock();
456 listener->onWorkDone_nb(shared_from_this(), vec(unexpected));
457 }
458 }
459 }
460
createLinearBuffer(const std::shared_ptr<C2LinearBlock> & block)461 std::shared_ptr<C2Buffer> SimpleC2Component::createLinearBuffer(
462 const std::shared_ptr<C2LinearBlock> &block) {
463 return createLinearBuffer(block, block->offset(), block->size());
464 }
465
createLinearBuffer(const std::shared_ptr<C2LinearBlock> & block,size_t offset,size_t size)466 std::shared_ptr<C2Buffer> SimpleC2Component::createLinearBuffer(
467 const std::shared_ptr<C2LinearBlock> &block, size_t offset, size_t size) {
468 return C2Buffer::CreateLinearBuffer(block->share(offset, size, ::C2Fence()));
469 }
470
createGraphicBuffer(const std::shared_ptr<C2GraphicBlock> & block)471 std::shared_ptr<C2Buffer> SimpleC2Component::createGraphicBuffer(
472 const std::shared_ptr<C2GraphicBlock> &block) {
473 return createGraphicBuffer(block, C2Rect(block->width(), block->height()));
474 }
475
createGraphicBuffer(const std::shared_ptr<C2GraphicBlock> & block,const C2Rect & crop)476 std::shared_ptr<C2Buffer> SimpleC2Component::createGraphicBuffer(
477 const std::shared_ptr<C2GraphicBlock> &block, const C2Rect &crop) {
478 return C2Buffer::CreateGraphicBuffer(block->share(crop, ::C2Fence()));
479 }
480
481 } // namespace android
482