1 /*
2  * Copyright (C) 2017 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 //#define LOG_NDEBUG 0
18 #define LOG_TAG "SimpleC2Component"
19 #include <log/log.h>
20 
21 #include <cutils/properties.h>
22 
23 #include <inttypes.h>
24 
25 #include <C2Config.h>
26 #include <C2Debug.h>
27 #include <C2PlatformSupport.h>
28 #include <SimpleC2Component.h>
29 
30 namespace android {
31 
pop_front()32 std::unique_ptr<C2Work> SimpleC2Component::WorkQueue::pop_front() {
33     std::unique_ptr<C2Work> work = std::move(mQueue.front().work);
34     mQueue.pop_front();
35     return work;
36 }
37 
push_back(std::unique_ptr<C2Work> work)38 void SimpleC2Component::WorkQueue::push_back(std::unique_ptr<C2Work> work) {
39     mQueue.push_back({ std::move(work), NO_DRAIN });
40 }
41 
empty() const42 bool SimpleC2Component::WorkQueue::empty() const {
43     return mQueue.empty();
44 }
45 
clear()46 void SimpleC2Component::WorkQueue::clear() {
47     mQueue.clear();
48 }
49 
drainMode() const50 uint32_t SimpleC2Component::WorkQueue::drainMode() const {
51     return mQueue.front().drainMode;
52 }
53 
markDrain(uint32_t drainMode)54 void SimpleC2Component::WorkQueue::markDrain(uint32_t drainMode) {
55     mQueue.push_back({ nullptr, drainMode });
56 }
57 
58 ////////////////////////////////////////////////////////////////////////////////
59 
60 namespace {
61 
62 struct DummyReadView : public C2ReadView {
DummyReadViewandroid::__anon98cc350a0111::DummyReadView63     DummyReadView() : C2ReadView(C2_NO_INIT) {}
64 };
65 
66 }  // namespace
67 
SimpleC2Component(const std::shared_ptr<C2ComponentInterface> & intf)68 SimpleC2Component::SimpleC2Component(
69         const std::shared_ptr<C2ComponentInterface> &intf)
70     : mDummyReadView(DummyReadView()),
71       mIntf(intf) {
72 }
73 
setListener_vb(const std::shared_ptr<C2Component::Listener> & listener,c2_blocking_t mayBlock)74 c2_status_t SimpleC2Component::setListener_vb(
75         const std::shared_ptr<C2Component::Listener> &listener, c2_blocking_t mayBlock) {
76     Mutexed<ExecState>::Locked state(mExecState);
77     if (state->mState == RUNNING) {
78         if (listener) {
79             return C2_BAD_STATE;
80         } else if (!mayBlock) {
81             return C2_BLOCKING;
82         }
83     }
84     state->mListener = listener;
85     // TODO: wait for listener change to have taken place before returning
86     // (e.g. if there is an ongoing listener callback)
87     return C2_OK;
88 }
89 
queue_nb(std::list<std::unique_ptr<C2Work>> * const items)90 c2_status_t SimpleC2Component::queue_nb(std::list<std::unique_ptr<C2Work>> * const items) {
91     {
92         Mutexed<ExecState>::Locked state(mExecState);
93         if (state->mState != RUNNING) {
94             return C2_BAD_STATE;
95         }
96     }
97     {
98         Mutexed<WorkQueue>::Locked queue(mWorkQueue);
99         while (!items->empty()) {
100             queue->push_back(std::move(items->front()));
101             items->pop_front();
102         }
103         queue->mCondition.broadcast();
104     }
105     return C2_OK;
106 }
107 
announce_nb(const std::vector<C2WorkOutline> & items)108 c2_status_t SimpleC2Component::announce_nb(const std::vector<C2WorkOutline> &items) {
109     (void)items;
110     return C2_OMITTED;
111 }
112 
flush_sm(flush_mode_t flushMode,std::list<std::unique_ptr<C2Work>> * const flushedWork)113 c2_status_t SimpleC2Component::flush_sm(
114         flush_mode_t flushMode, std::list<std::unique_ptr<C2Work>>* const flushedWork) {
115     (void)flushMode;
116     {
117         Mutexed<ExecState>::Locked state(mExecState);
118         if (state->mState != RUNNING) {
119             return C2_BAD_STATE;
120         }
121     }
122     {
123         Mutexed<WorkQueue>::Locked queue(mWorkQueue);
124         queue->incGeneration();
125         // TODO: queue->splicedBy(flushedWork, flushedWork->end());
126         while (!queue->empty()) {
127             std::unique_ptr<C2Work> work = queue->pop_front();
128             if (work) {
129                 flushedWork->push_back(std::move(work));
130             }
131         }
132     }
133     {
134         Mutexed<PendingWork>::Locked pending(mPendingWork);
135         while (!pending->empty()) {
136             flushedWork->push_back(std::move(pending->begin()->second));
137             pending->erase(pending->begin());
138         }
139     }
140 
141     return C2_OK;
142 }
143 
drain_nb(drain_mode_t drainMode)144 c2_status_t SimpleC2Component::drain_nb(drain_mode_t drainMode) {
145     if (drainMode == DRAIN_CHAIN) {
146         return C2_OMITTED;
147     }
148     {
149         Mutexed<ExecState>::Locked state(mExecState);
150         if (state->mState != RUNNING) {
151             return C2_BAD_STATE;
152         }
153     }
154     {
155         Mutexed<WorkQueue>::Locked queue(mWorkQueue);
156         queue->markDrain(drainMode);
157         queue->mCondition.broadcast();
158     }
159 
160     return C2_OK;
161 }
162 
start()163 c2_status_t SimpleC2Component::start() {
164     Mutexed<ExecState>::Locked state(mExecState);
165     if (state->mState == RUNNING) {
166         return C2_BAD_STATE;
167     }
168     bool needsInit = (state->mState == UNINITIALIZED);
169     if (needsInit) {
170         state.unlock();
171         c2_status_t err = onInit();
172         if (err != C2_OK) {
173             return err;
174         }
175         state.lock();
176     }
177     if (!state->mThread.joinable()) {
178         mExitRequested = false;
179         {
180             Mutexed<ExitMonitor>::Locked monitor(mExitMonitor);
181             monitor->mExited = false;
182         }
183         state->mThread = std::thread(
184                 [](std::weak_ptr<SimpleC2Component> wp) {
185                     while (true) {
186                         std::shared_ptr<SimpleC2Component> thiz = wp.lock();
187                         if (!thiz) {
188                             return;
189                         }
190                         if (thiz->exitRequested()) {
191                             ALOGV("stop processing");
192                             thiz->signalExit();
193                             return;
194                         }
195                         thiz->processQueue();
196                     }
197                 },
198                 shared_from_this());
199     }
200     state->mState = RUNNING;
201     return C2_OK;
202 }
203 
signalExit()204 void SimpleC2Component::signalExit() {
205     Mutexed<ExitMonitor>::Locked monitor(mExitMonitor);
206     monitor->mExited = true;
207     monitor->mCondition.broadcast();
208 }
209 
requestExitAndWait(std::function<void ()> job)210 void SimpleC2Component::requestExitAndWait(std::function<void()> job) {
211     {
212         Mutexed<ExecState>::Locked state(mExecState);
213         if (!state->mThread.joinable()) {
214             return;
215         }
216     }
217     mExitRequested = true;
218     {
219         Mutexed<WorkQueue>::Locked queue(mWorkQueue);
220         queue->mCondition.broadcast();
221     }
222     // TODO: timeout?
223     {
224         Mutexed<ExitMonitor>::Locked monitor(mExitMonitor);
225         while (!monitor->mExited) {
226             monitor.waitForCondition(monitor->mCondition);
227         }
228         job();
229     }
230     Mutexed<ExecState>::Locked state(mExecState);
231     if (state->mThread.joinable()) {
232         ALOGV("joining the processing thread");
233         state->mThread.join();
234         ALOGV("joined the processing thread");
235     }
236 }
237 
stop()238 c2_status_t SimpleC2Component::stop() {
239     ALOGV("stop");
240     {
241         Mutexed<ExecState>::Locked state(mExecState);
242         if (state->mState != RUNNING) {
243             return C2_BAD_STATE;
244         }
245         state->mState = STOPPED;
246     }
247     {
248         Mutexed<WorkQueue>::Locked queue(mWorkQueue);
249         queue->clear();
250     }
251     {
252         Mutexed<PendingWork>::Locked pending(mPendingWork);
253         pending->clear();
254     }
255     c2_status_t err;
256     requestExitAndWait([this, &err]{ err = onStop(); });
257     if (err != C2_OK) {
258         return err;
259     }
260     return C2_OK;
261 }
262 
reset()263 c2_status_t SimpleC2Component::reset() {
264     ALOGV("reset");
265     {
266         Mutexed<ExecState>::Locked state(mExecState);
267         state->mState = UNINITIALIZED;
268     }
269     {
270         Mutexed<WorkQueue>::Locked queue(mWorkQueue);
271         queue->clear();
272     }
273     {
274         Mutexed<PendingWork>::Locked pending(mPendingWork);
275         pending->clear();
276     }
277     requestExitAndWait([this]{ onReset(); });
278     return C2_OK;
279 }
280 
release()281 c2_status_t SimpleC2Component::release() {
282     ALOGV("release");
283     requestExitAndWait([this]{ onRelease(); });
284     return C2_OK;
285 }
286 
intf()287 std::shared_ptr<C2ComponentInterface> SimpleC2Component::intf() {
288     return mIntf;
289 }
290 
291 namespace {
292 
vec(std::unique_ptr<C2Work> & work)293 std::list<std::unique_ptr<C2Work>> vec(std::unique_ptr<C2Work> &work) {
294     std::list<std::unique_ptr<C2Work>> ret;
295     ret.push_back(std::move(work));
296     return ret;
297 }
298 
299 }  // namespace
300 
finish(uint64_t frameIndex,std::function<void (const std::unique_ptr<C2Work> &)> fillWork)301 void SimpleC2Component::finish(
302         uint64_t frameIndex, std::function<void(const std::unique_ptr<C2Work> &)> fillWork) {
303     std::unique_ptr<C2Work> work;
304     {
305         Mutexed<PendingWork>::Locked pending(mPendingWork);
306         if (pending->count(frameIndex) == 0) {
307             ALOGW("unknown frame index: %" PRIu64, frameIndex);
308             return;
309         }
310         work = std::move(pending->at(frameIndex));
311         pending->erase(frameIndex);
312     }
313     if (work) {
314         fillWork(work);
315         Mutexed<ExecState>::Locked state(mExecState);
316         std::shared_ptr<C2Component::Listener> listener = state->mListener;
317         state.unlock();
318         listener->onWorkDone_nb(shared_from_this(), vec(work));
319         ALOGV("returning pending work");
320     }
321 }
322 
processQueue()323 void SimpleC2Component::processQueue() {
324     std::unique_ptr<C2Work> work;
325     uint64_t generation;
326     int32_t drainMode;
327     bool isFlushPending = false;
328     {
329         Mutexed<WorkQueue>::Locked queue(mWorkQueue);
330         nsecs_t deadline = systemTime() + ms2ns(250);
331         while (queue->empty()) {
332             if (exitRequested()) {
333                 return;
334             }
335             nsecs_t now = systemTime();
336             if (now >= deadline) {
337                 return;
338             }
339             status_t err = queue.waitForConditionRelative(queue->mCondition, deadline - now);
340             if (err == TIMED_OUT) {
341                 return;
342             }
343         }
344 
345         generation = queue->generation();
346         drainMode = queue->drainMode();
347         isFlushPending = queue->popPendingFlush();
348         work = queue->pop_front();
349     }
350     if (isFlushPending) {
351         ALOGV("processing pending flush");
352         c2_status_t err = onFlush_sm();
353         if (err != C2_OK) {
354             ALOGD("flush err: %d", err);
355             // TODO: error
356         }
357     }
358 
359     if (!mOutputBlockPool) {
360         c2_status_t err = [this] {
361             // TODO: don't use query_vb
362             C2StreamFormatConfig::output outputFormat(0u);
363             std::vector<std::unique_ptr<C2Param>> params;
364             c2_status_t err = intf()->query_vb(
365                     { &outputFormat },
366                     { C2PortBlockPoolsTuning::output::PARAM_TYPE },
367                     C2_DONT_BLOCK,
368                     &params);
369             if (err != C2_OK && err != C2_BAD_INDEX) {
370                 ALOGD("query err = %d", err);
371                 return err;
372             }
373             C2BlockPool::local_id_t poolId =
374                 outputFormat.value == C2FormatVideo
375                         ? C2BlockPool::BASIC_GRAPHIC
376                         : C2BlockPool::BASIC_LINEAR;
377             if (params.size()) {
378                 C2PortBlockPoolsTuning::output *outputPools =
379                     C2PortBlockPoolsTuning::output::From(params[0].get());
380                 if (outputPools && outputPools->flexCount() >= 1) {
381                     poolId = outputPools->m.values[0];
382                 }
383             }
384 
385             err = GetCodec2BlockPool(poolId, shared_from_this(), &mOutputBlockPool);
386             ALOGD("Using output block pool with poolID %llu => got %llu - %d",
387                     (unsigned long long)poolId,
388                     (unsigned long long)(
389                             mOutputBlockPool ? mOutputBlockPool->getLocalId() : 111000111),
390                     err);
391             return err;
392         }();
393         if (err != C2_OK) {
394             Mutexed<ExecState>::Locked state(mExecState);
395             std::shared_ptr<C2Component::Listener> listener = state->mListener;
396             state.unlock();
397             listener->onError_nb(shared_from_this(), err);
398             return;
399         }
400     }
401 
402     if (!work) {
403         c2_status_t err = drain(drainMode, mOutputBlockPool);
404         if (err != C2_OK) {
405             Mutexed<ExecState>::Locked state(mExecState);
406             std::shared_ptr<C2Component::Listener> listener = state->mListener;
407             state.unlock();
408             listener->onError_nb(shared_from_this(), err);
409         }
410         return;
411     }
412 
413     process(work, mOutputBlockPool);
414     ALOGV("processed frame #%" PRIu64, work->input.ordinal.frameIndex.peeku());
415     {
416         Mutexed<WorkQueue>::Locked queue(mWorkQueue);
417         if (queue->generation() != generation) {
418             ALOGD("work form old generation: was %" PRIu64 " now %" PRIu64,
419                     queue->generation(), generation);
420             work->result = C2_NOT_FOUND;
421             queue.unlock();
422             {
423                 Mutexed<ExecState>::Locked state(mExecState);
424                 std::shared_ptr<C2Component::Listener> listener = state->mListener;
425                 state.unlock();
426                 listener->onWorkDone_nb(shared_from_this(), vec(work));
427             }
428             queue.lock();
429             return;
430         }
431     }
432     if (work->workletsProcessed != 0u) {
433         Mutexed<ExecState>::Locked state(mExecState);
434         ALOGV("returning this work");
435         std::shared_ptr<C2Component::Listener> listener = state->mListener;
436         state.unlock();
437         listener->onWorkDone_nb(shared_from_this(), vec(work));
438     } else {
439         ALOGV("queue pending work");
440         std::unique_ptr<C2Work> unexpected;
441         {
442             Mutexed<PendingWork>::Locked pending(mPendingWork);
443             uint64_t frameIndex = work->input.ordinal.frameIndex.peeku();
444             if (pending->count(frameIndex) != 0) {
445                 unexpected = std::move(pending->at(frameIndex));
446                 pending->erase(frameIndex);
447             }
448             (void)pending->insert({ frameIndex, std::move(work) });
449         }
450         if (unexpected) {
451             ALOGD("unexpected pending work");
452             unexpected->result = C2_CORRUPTED;
453             Mutexed<ExecState>::Locked state(mExecState);
454             std::shared_ptr<C2Component::Listener> listener = state->mListener;
455             state.unlock();
456             listener->onWorkDone_nb(shared_from_this(), vec(unexpected));
457         }
458     }
459 }
460 
createLinearBuffer(const std::shared_ptr<C2LinearBlock> & block)461 std::shared_ptr<C2Buffer> SimpleC2Component::createLinearBuffer(
462         const std::shared_ptr<C2LinearBlock> &block) {
463     return createLinearBuffer(block, block->offset(), block->size());
464 }
465 
createLinearBuffer(const std::shared_ptr<C2LinearBlock> & block,size_t offset,size_t size)466 std::shared_ptr<C2Buffer> SimpleC2Component::createLinearBuffer(
467         const std::shared_ptr<C2LinearBlock> &block, size_t offset, size_t size) {
468     return C2Buffer::CreateLinearBuffer(block->share(offset, size, ::C2Fence()));
469 }
470 
createGraphicBuffer(const std::shared_ptr<C2GraphicBlock> & block)471 std::shared_ptr<C2Buffer> SimpleC2Component::createGraphicBuffer(
472         const std::shared_ptr<C2GraphicBlock> &block) {
473     return createGraphicBuffer(block, C2Rect(block->width(), block->height()));
474 }
475 
createGraphicBuffer(const std::shared_ptr<C2GraphicBlock> & block,const C2Rect & crop)476 std::shared_ptr<C2Buffer> SimpleC2Component::createGraphicBuffer(
477         const std::shared_ptr<C2GraphicBlock> &block, const C2Rect &crop) {
478     return C2Buffer::CreateGraphicBuffer(block->share(crop, ::C2Fence()));
479 }
480 
481 } // namespace android
482