1 /*
2 * Copyright 2018 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 //#define LOG_NDEBUG 0
18 #define LOG_TAG "Codec2-InputBufferManager"
19 #include <android-base/logging.h>
20
21 #include <codec2/hidl/1.0/InputBufferManager.h>
22 #include <codec2/hidl/1.0/types.h>
23
24 #include <android/hardware/media/c2/1.0/IComponentListener.h>
25 #include <android-base/logging.h>
26
27 #include <C2Buffer.h>
28 #include <C2Work.h>
29
30 #include <chrono>
31
32 namespace android {
33 namespace hardware {
34 namespace media {
35 namespace c2 {
36 namespace V1_0 {
37 namespace utils {
38
39 using namespace ::android;
40
registerFrameData(const sp<IComponentListener> & listener,const C2FrameData & input)41 void InputBufferManager::registerFrameData(
42 const sp<IComponentListener>& listener,
43 const C2FrameData& input) {
44 getInstance()._registerFrameData(listener, input);
45 }
46
unregisterFrameData(const wp<IComponentListener> & listener,const C2FrameData & input)47 void InputBufferManager::unregisterFrameData(
48 const wp<IComponentListener>& listener,
49 const C2FrameData& input) {
50 getInstance()._unregisterFrameData(listener, input);
51 }
52
unregisterFrameData(const wp<IComponentListener> & listener)53 void InputBufferManager::unregisterFrameData(
54 const wp<IComponentListener>& listener) {
55 getInstance()._unregisterFrameData(listener);
56 }
57
setNotificationInterval(nsecs_t notificationIntervalNs)58 void InputBufferManager::setNotificationInterval(
59 nsecs_t notificationIntervalNs) {
60 getInstance()._setNotificationInterval(notificationIntervalNs);
61 }
62
_registerFrameData(const sp<IComponentListener> & listener,const C2FrameData & input)63 void InputBufferManager::_registerFrameData(
64 const sp<IComponentListener>& listener,
65 const C2FrameData& input) {
66 uint64_t frameIndex = input.ordinal.frameIndex.peeku();
67 LOG(VERBOSE) << "InputBufferManager::_registerFrameData -- called with "
68 << "listener @ 0x" << std::hex << listener.get()
69 << ", frameIndex = " << std::dec << frameIndex
70 << ".";
71 std::lock_guard<std::mutex> lock(mMutex);
72
73 std::set<TrackedBuffer*> &bufferIds =
74 mTrackedBuffersMap[listener][frameIndex];
75
76 for (size_t i = 0; i < input.buffers.size(); ++i) {
77 if (!input.buffers[i]) {
78 LOG(VERBOSE) << "InputBufferManager::_registerFrameData -- "
79 << "Input buffer at index " << i << " is null.";
80 continue;
81 }
82 TrackedBuffer *bufferId =
83 new TrackedBuffer(listener, frameIndex, i, input.buffers[i]);
84 mTrackedBufferCache.emplace(bufferId);
85 bufferIds.emplace(bufferId);
86
87 c2_status_t status = input.buffers[i]->registerOnDestroyNotify(
88 onBufferDestroyed,
89 reinterpret_cast<void*>(bufferId));
90 if (status != C2_OK) {
91 LOG(DEBUG) << "InputBufferManager::_registerFrameData -- "
92 << "registerOnDestroyNotify() failed "
93 << "(listener @ 0x" << std::hex << listener.get()
94 << ", frameIndex = " << std::dec << frameIndex
95 << ", bufferIndex = " << i
96 << ") => status = " << status
97 << ".";
98 }
99 }
100
101 mDeathNotifications.emplace(
102 listener,
103 DeathNotifications(
104 mNotificationIntervalNs.load(std::memory_order_relaxed)));
105 }
106
107 // Remove a pair (listener, frameIndex) from mTrackedBuffersMap and
108 // mDeathNotifications. This implies all bufferIndices are removed.
109 //
110 // This is called from onWorkDone() and flush().
_unregisterFrameData(const wp<IComponentListener> & listener,const C2FrameData & input)111 void InputBufferManager::_unregisterFrameData(
112 const wp<IComponentListener>& listener,
113 const C2FrameData& input) {
114 uint64_t frameIndex = input.ordinal.frameIndex.peeku();
115 LOG(VERBOSE) << "InputBufferManager::_unregisterFrameData -- called with "
116 << "listener @ 0x" << std::hex << listener.unsafe_get()
117 << ", frameIndex = " << std::dec << frameIndex
118 << ".";
119 std::lock_guard<std::mutex> lock(mMutex);
120
121 auto findListener = mTrackedBuffersMap.find(listener);
122 if (findListener != mTrackedBuffersMap.end()) {
123 std::map<uint64_t, std::set<TrackedBuffer*>> &frameIndex2BufferIds
124 = findListener->second;
125 auto findFrameIndex = frameIndex2BufferIds.find(frameIndex);
126 if (findFrameIndex != frameIndex2BufferIds.end()) {
127 std::set<TrackedBuffer*> &bufferIds = findFrameIndex->second;
128 for (TrackedBuffer* bufferId : bufferIds) {
129 std::shared_ptr<C2Buffer> buffer = bufferId->buffer.lock();
130 if (buffer) {
131 c2_status_t status = buffer->unregisterOnDestroyNotify(
132 onBufferDestroyed,
133 reinterpret_cast<void*>(bufferId));
134 if (status != C2_OK) {
135 LOG(DEBUG) << "InputBufferManager::_unregisterFrameData "
136 << "-- unregisterOnDestroyNotify() failed "
137 << "(listener @ 0x"
138 << std::hex
139 << bufferId->listener.unsafe_get()
140 << ", frameIndex = "
141 << std::dec << bufferId->frameIndex
142 << ", bufferIndex = " << bufferId->bufferIndex
143 << ") => status = " << status
144 << ".";
145 }
146 }
147 mTrackedBufferCache.erase(bufferId);
148 delete bufferId;
149 }
150
151 frameIndex2BufferIds.erase(findFrameIndex);
152 if (frameIndex2BufferIds.empty()) {
153 mTrackedBuffersMap.erase(findListener);
154 }
155 }
156 }
157
158 auto findListenerD = mDeathNotifications.find(listener);
159 if (findListenerD != mDeathNotifications.end()) {
160 DeathNotifications &deathNotifications = findListenerD->second;
161 auto findFrameIndex = deathNotifications.indices.find(frameIndex);
162 if (findFrameIndex != deathNotifications.indices.end()) {
163 std::vector<size_t> &bufferIndices = findFrameIndex->second;
164 deathNotifications.count -= bufferIndices.size();
165 deathNotifications.indices.erase(findFrameIndex);
166 }
167 }
168 }
169
170 // Remove listener from mTrackedBuffersMap and mDeathNotifications. This implies
171 // all frameIndices and bufferIndices are removed.
172 //
173 // This is called when the component cleans up all input buffers, i.e., when
174 // reset(), release(), stop() or ~Component() is called.
_unregisterFrameData(const wp<IComponentListener> & listener)175 void InputBufferManager::_unregisterFrameData(
176 const wp<IComponentListener>& listener) {
177 LOG(VERBOSE) << "InputBufferManager::_unregisterFrameData -- called with "
178 << "listener @ 0x" << std::hex << listener.unsafe_get()
179 << std::dec << ".";
180 std::lock_guard<std::mutex> lock(mMutex);
181
182 auto findListener = mTrackedBuffersMap.find(listener);
183 if (findListener != mTrackedBuffersMap.end()) {
184 std::map<uint64_t, std::set<TrackedBuffer*>> &frameIndex2BufferIds =
185 findListener->second;
186 for (auto findFrameIndex = frameIndex2BufferIds.begin();
187 findFrameIndex != frameIndex2BufferIds.end();
188 ++findFrameIndex) {
189 std::set<TrackedBuffer*> &bufferIds = findFrameIndex->second;
190 for (TrackedBuffer* bufferId : bufferIds) {
191 std::shared_ptr<C2Buffer> buffer = bufferId->buffer.lock();
192 if (buffer) {
193 c2_status_t status = buffer->unregisterOnDestroyNotify(
194 onBufferDestroyed,
195 reinterpret_cast<void*>(bufferId));
196 if (status != C2_OK) {
197 LOG(DEBUG) << "InputBufferManager::_unregisterFrameData "
198 << "-- unregisterOnDestroyNotify() failed "
199 << "(listener @ 0x"
200 << std::hex
201 << bufferId->listener.unsafe_get()
202 << ", frameIndex = "
203 << std::dec << bufferId->frameIndex
204 << ", bufferIndex = " << bufferId->bufferIndex
205 << ") => status = " << status
206 << ".";
207 }
208 mTrackedBufferCache.erase(bufferId);
209 delete bufferId;
210 }
211 }
212 }
213 mTrackedBuffersMap.erase(findListener);
214 }
215
216 mDeathNotifications.erase(listener);
217 }
218
219 // Set mNotificationIntervalNs.
_setNotificationInterval(nsecs_t notificationIntervalNs)220 void InputBufferManager::_setNotificationInterval(
221 nsecs_t notificationIntervalNs) {
222 mNotificationIntervalNs.store(
223 notificationIntervalNs,
224 std::memory_order_relaxed);
225 }
226
227 // Move a buffer from mTrackedBuffersMap to mDeathNotifications.
228 // This is called when a registered C2Buffer object is destroyed.
onBufferDestroyed(const C2Buffer * buf,void * arg)229 void InputBufferManager::onBufferDestroyed(const C2Buffer* buf, void* arg) {
230 getInstance()._onBufferDestroyed(buf, arg);
231 }
232
_onBufferDestroyed(const C2Buffer * buf,void * arg)233 void InputBufferManager::_onBufferDestroyed(const C2Buffer* buf, void* arg) {
234 if (!buf || !arg) {
235 LOG(WARNING) << "InputBufferManager::_onBufferDestroyed -- called with "
236 << "null argument (s): "
237 << "buf @ 0x" << std::hex << buf
238 << ", arg @ 0x" << std::hex << arg
239 << std::dec << ".";
240 return;
241 }
242
243 std::lock_guard<std::mutex> lock(mMutex);
244 TrackedBuffer *bufferId = reinterpret_cast<TrackedBuffer*>(arg);
245
246 if (mTrackedBufferCache.find(bufferId) == mTrackedBufferCache.end()) {
247 LOG(VERBOSE) << "InputBufferManager::_onBufferDestroyed -- called with "
248 << "unregistered buffer: "
249 << "buf @ 0x" << std::hex << buf
250 << ", arg @ 0x" << std::hex << arg
251 << std::dec << ".";
252 return;
253 }
254
255 LOG(VERBOSE) << "InputBufferManager::_onBufferDestroyed -- called with "
256 << "buf @ 0x" << std::hex << buf
257 << ", arg @ 0x" << std::hex << arg
258 << std::dec << " -- "
259 << "listener @ 0x" << std::hex << bufferId->listener.unsafe_get()
260 << ", frameIndex = " << std::dec << bufferId->frameIndex
261 << ", bufferIndex = " << bufferId->bufferIndex
262 << ".";
263 auto findListener = mTrackedBuffersMap.find(bufferId->listener);
264 if (findListener == mTrackedBuffersMap.end()) {
265 LOG(VERBOSE) << "InputBufferManager::_onBufferDestroyed -- "
266 << "received invalid listener: "
267 << "listener @ 0x" << std::hex << bufferId->listener.unsafe_get()
268 << " (frameIndex = " << std::dec << bufferId->frameIndex
269 << ", bufferIndex = " << bufferId->bufferIndex
270 << ").";
271 return;
272 }
273
274 std::map<uint64_t, std::set<TrackedBuffer*>> &frameIndex2BufferIds
275 = findListener->second;
276 auto findFrameIndex = frameIndex2BufferIds.find(bufferId->frameIndex);
277 if (findFrameIndex == frameIndex2BufferIds.end()) {
278 LOG(DEBUG) << "InputBufferManager::_onBufferDestroyed -- "
279 << "received invalid frame index: "
280 << "frameIndex = " << bufferId->frameIndex
281 << " (listener @ 0x" << std::hex << bufferId->listener.unsafe_get()
282 << ", bufferIndex = " << std::dec << bufferId->bufferIndex
283 << ").";
284 return;
285 }
286
287 std::set<TrackedBuffer*> &bufferIds = findFrameIndex->second;
288 auto findBufferId = bufferIds.find(bufferId);
289 if (findBufferId == bufferIds.end()) {
290 LOG(DEBUG) << "InputBufferManager::_onBufferDestroyed -- "
291 << "received invalid buffer index: "
292 << "bufferIndex = " << bufferId->bufferIndex
293 << " (frameIndex = " << bufferId->frameIndex
294 << ", listener @ 0x" << std::hex << bufferId->listener.unsafe_get()
295 << std::dec << ").";
296 return;
297 }
298
299 bufferIds.erase(findBufferId);
300 if (bufferIds.empty()) {
301 frameIndex2BufferIds.erase(findFrameIndex);
302 if (frameIndex2BufferIds.empty()) {
303 mTrackedBuffersMap.erase(findListener);
304 }
305 }
306
307 DeathNotifications &deathNotifications = mDeathNotifications[bufferId->listener];
308 deathNotifications.indices[bufferId->frameIndex].emplace_back(bufferId->bufferIndex);
309 ++deathNotifications.count;
310 mOnBufferDestroyed.notify_one();
311
312 mTrackedBufferCache.erase(bufferId);
313 delete bufferId;
314 }
315
316 // Notify the clients about buffer destructions.
317 // Return false if all destructions have been notified.
318 // Return true and set timeToRetry to the time point to wait for before
319 // retrying if some destructions have not been notified.
processNotifications(nsecs_t * timeToRetryNs)320 bool InputBufferManager::processNotifications(nsecs_t* timeToRetryNs) {
321
322 struct Notification {
323 sp<IComponentListener> listener;
324 hidl_vec<IComponentListener::InputBuffer> inputBuffers;
325 Notification(const sp<IComponentListener>& l, size_t s)
326 : listener(l), inputBuffers(s) {}
327 };
328 std::list<Notification> notifications;
329 nsecs_t notificationIntervalNs =
330 mNotificationIntervalNs.load(std::memory_order_relaxed);
331
332 bool retry = false;
333 {
334 std::lock_guard<std::mutex> lock(mMutex);
335 *timeToRetryNs = notificationIntervalNs;
336 nsecs_t timeNowNs = systemTime();
337 for (auto it = mDeathNotifications.begin();
338 it != mDeathNotifications.end(); ) {
339 sp<IComponentListener> listener = it->first.promote();
340 if (!listener) {
341 ++it;
342 continue;
343 }
344 DeathNotifications &deathNotifications = it->second;
345
346 nsecs_t timeSinceLastNotifiedNs =
347 timeNowNs - deathNotifications.lastSentNs;
348 // If not enough time has passed since the last callback, leave the
349 // notifications for this listener untouched for now and retry
350 // later.
351 if (timeSinceLastNotifiedNs < notificationIntervalNs) {
352 retry = true;
353 *timeToRetryNs = std::min(*timeToRetryNs,
354 notificationIntervalNs - timeSinceLastNotifiedNs);
355 LOG(VERBOSE) << "InputBufferManager::processNotifications -- "
356 << "Notifications for listener @ "
357 << std::hex << listener.get()
358 << " will be postponed.";
359 ++it;
360 continue;
361 }
362
363 // If enough time has passed since the last notification to this
364 // listener but there are currently no pending notifications, the
365 // listener can be removed from mDeathNotifications---there is no
366 // need to keep track of the last notification time anymore.
367 if (deathNotifications.count == 0) {
368 it = mDeathNotifications.erase(it);
369 continue;
370 }
371
372 // Create the argument for the callback.
373 notifications.emplace_back(listener, deathNotifications.count);
374 hidl_vec<IComponentListener::InputBuffer> &inputBuffers =
375 notifications.back().inputBuffers;
376 size_t i = 0;
377 for (std::pair<const uint64_t, std::vector<size_t>>& p :
378 deathNotifications.indices) {
379 uint64_t frameIndex = p.first;
380 const std::vector<size_t> &bufferIndices = p.second;
381 for (const size_t& bufferIndex : bufferIndices) {
382 IComponentListener::InputBuffer &inputBuffer
383 = inputBuffers[i++];
384 inputBuffer.arrayIndex = bufferIndex;
385 inputBuffer.frameIndex = frameIndex;
386 }
387 }
388
389 // Clear deathNotifications for this listener and set retry to true
390 // so processNotifications will be called again. This will
391 // guarantee that a listener with no pending notifications will
392 // eventually be removed from mDeathNotifications after
393 // mNotificationIntervalNs nanoseconds has passed.
394 retry = true;
395 deathNotifications.indices.clear();
396 deathNotifications.count = 0;
397 deathNotifications.lastSentNs = timeNowNs;
398 ++it;
399 }
400 }
401
402 // Call onInputBuffersReleased() outside the lock to avoid deadlock.
403 for (const Notification& notification : notifications) {
404 if (!notification.listener->onInputBuffersReleased(
405 notification.inputBuffers).isOk()) {
406 // This may trigger if the client has died.
407 LOG(DEBUG) << "InputBufferManager::processNotifications -- "
408 << "failed to send death notifications to "
409 << "listener @ 0x" << std::hex
410 << notification.listener.get()
411 << std::dec << ".";
412 } else {
413 #if LOG_NDEBUG == 0
414 std::stringstream inputBufferLog;
415 for (const IComponentListener::InputBuffer& inputBuffer :
416 notification.inputBuffers) {
417 inputBufferLog << " (" << inputBuffer.frameIndex
418 << ", " << inputBuffer.arrayIndex
419 << ")";
420 }
421 LOG(VERBOSE) << "InputBufferManager::processNotifications -- "
422 << "death notifications sent to "
423 << "listener @ 0x" << std::hex
424 << notification.listener.get()
425 << std::dec
426 << " with these (frameIndex, bufferIndex) pairs:"
427 << inputBufferLog.str();
428 #endif
429 }
430 }
431 #if LOG_NDEBUG == 0
432 if (retry) {
433 LOG(VERBOSE) << "InputBufferManager::processNotifications -- "
434 << "will retry again in " << *timeToRetryNs << "ns.";
435 } else {
436 LOG(VERBOSE) << "InputBufferManager::processNotifications -- "
437 << "no pending death notifications.";
438 }
439 #endif
440 return retry;
441 }
442
main()443 void InputBufferManager::main() {
444 LOG(VERBOSE) << "InputBufferManager main -- started.";
445 nsecs_t timeToRetryNs;
446 while (true) {
447 std::unique_lock<std::mutex> lock(mMutex);
448 while (mDeathNotifications.empty()) {
449 mOnBufferDestroyed.wait(lock);
450 }
451 lock.unlock();
452 while (processNotifications(&timeToRetryNs)) {
453 std::this_thread::sleep_for(
454 std::chrono::nanoseconds(timeToRetryNs));
455 }
456 }
457 }
458
InputBufferManager()459 InputBufferManager::InputBufferManager()
460 : mMainThread{&InputBufferManager::main, this} {
461 }
462
getInstance()463 InputBufferManager& InputBufferManager::getInstance() {
464 static InputBufferManager instance{};
465 return instance;
466 }
467
468 } // namespace utils
469 } // namespace V1_0
470 } // namespace c2
471 } // namespace media
472 } // namespace hardware
473 } // namespace android
474
475
476
477