1 /*
2  * Copyright 2018 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 //#define LOG_NDEBUG 0
18 #define LOG_TAG "Codec2-InputBufferManager"
19 #include <android-base/logging.h>
20 
21 #include <codec2/hidl/1.0/InputBufferManager.h>
22 #include <codec2/hidl/1.0/types.h>
23 
24 #include <android/hardware/media/c2/1.0/IComponentListener.h>
25 #include <android-base/logging.h>
26 
27 #include <C2Buffer.h>
28 #include <C2Work.h>
29 
30 #include <chrono>
31 
32 namespace android {
33 namespace hardware {
34 namespace media {
35 namespace c2 {
36 namespace V1_0 {
37 namespace utils {
38 
39 using namespace ::android;
40 
registerFrameData(const sp<IComponentListener> & listener,const C2FrameData & input)41 void InputBufferManager::registerFrameData(
42         const sp<IComponentListener>& listener,
43         const C2FrameData& input) {
44     getInstance()._registerFrameData(listener, input);
45 }
46 
unregisterFrameData(const wp<IComponentListener> & listener,const C2FrameData & input)47 void InputBufferManager::unregisterFrameData(
48         const wp<IComponentListener>& listener,
49         const C2FrameData& input) {
50     getInstance()._unregisterFrameData(listener, input);
51 }
52 
unregisterFrameData(const wp<IComponentListener> & listener)53 void InputBufferManager::unregisterFrameData(
54         const wp<IComponentListener>& listener) {
55     getInstance()._unregisterFrameData(listener);
56 }
57 
setNotificationInterval(nsecs_t notificationIntervalNs)58 void InputBufferManager::setNotificationInterval(
59         nsecs_t notificationIntervalNs) {
60     getInstance()._setNotificationInterval(notificationIntervalNs);
61 }
62 
_registerFrameData(const sp<IComponentListener> & listener,const C2FrameData & input)63 void InputBufferManager::_registerFrameData(
64         const sp<IComponentListener>& listener,
65         const C2FrameData& input) {
66     uint64_t frameIndex = input.ordinal.frameIndex.peeku();
67     LOG(VERBOSE) << "InputBufferManager::_registerFrameData -- called with "
68                  << "listener @ 0x" << std::hex << listener.get()
69                  << ", frameIndex = " << std::dec << frameIndex
70                  << ".";
71     std::lock_guard<std::mutex> lock(mMutex);
72 
73     std::set<TrackedBuffer*> &bufferIds =
74             mTrackedBuffersMap[listener][frameIndex];
75 
76     for (size_t i = 0; i < input.buffers.size(); ++i) {
77         if (!input.buffers[i]) {
78             LOG(VERBOSE) << "InputBufferManager::_registerFrameData -- "
79                          << "Input buffer at index " << i << " is null.";
80             continue;
81         }
82         TrackedBuffer *bufferId =
83             new TrackedBuffer(listener, frameIndex, i, input.buffers[i]);
84         mTrackedBufferCache.emplace(bufferId);
85         bufferIds.emplace(bufferId);
86 
87         c2_status_t status = input.buffers[i]->registerOnDestroyNotify(
88                 onBufferDestroyed,
89                 reinterpret_cast<void*>(bufferId));
90         if (status != C2_OK) {
91             LOG(DEBUG) << "InputBufferManager::_registerFrameData -- "
92                        << "registerOnDestroyNotify() failed "
93                        << "(listener @ 0x" << std::hex << listener.get()
94                        << ", frameIndex = " << std::dec << frameIndex
95                        << ", bufferIndex = " << i
96                        << ") => status = " << status
97                        << ".";
98         }
99     }
100 
101     mDeathNotifications.emplace(
102             listener,
103             DeathNotifications(
104                 mNotificationIntervalNs.load(std::memory_order_relaxed)));
105 }
106 
107 // Remove a pair (listener, frameIndex) from mTrackedBuffersMap and
108 // mDeathNotifications. This implies all bufferIndices are removed.
109 //
110 // This is called from onWorkDone() and flush().
_unregisterFrameData(const wp<IComponentListener> & listener,const C2FrameData & input)111 void InputBufferManager::_unregisterFrameData(
112         const wp<IComponentListener>& listener,
113         const C2FrameData& input) {
114     uint64_t frameIndex = input.ordinal.frameIndex.peeku();
115     LOG(VERBOSE) << "InputBufferManager::_unregisterFrameData -- called with "
116                  << "listener @ 0x" << std::hex << listener.unsafe_get()
117                  << ", frameIndex = " << std::dec << frameIndex
118                  << ".";
119     std::lock_guard<std::mutex> lock(mMutex);
120 
121     auto findListener = mTrackedBuffersMap.find(listener);
122     if (findListener != mTrackedBuffersMap.end()) {
123         std::map<uint64_t, std::set<TrackedBuffer*>> &frameIndex2BufferIds
124                 = findListener->second;
125         auto findFrameIndex = frameIndex2BufferIds.find(frameIndex);
126         if (findFrameIndex != frameIndex2BufferIds.end()) {
127             std::set<TrackedBuffer*> &bufferIds = findFrameIndex->second;
128             for (TrackedBuffer* bufferId : bufferIds) {
129                 std::shared_ptr<C2Buffer> buffer = bufferId->buffer.lock();
130                 if (buffer) {
131                     c2_status_t status = buffer->unregisterOnDestroyNotify(
132                             onBufferDestroyed,
133                             reinterpret_cast<void*>(bufferId));
134                     if (status != C2_OK) {
135                         LOG(DEBUG) << "InputBufferManager::_unregisterFrameData "
136                                    << "-- unregisterOnDestroyNotify() failed "
137                                    << "(listener @ 0x"
138                                         << std::hex
139                                         << bufferId->listener.unsafe_get()
140                                    << ", frameIndex = "
141                                         << std::dec << bufferId->frameIndex
142                                    << ", bufferIndex = " << bufferId->bufferIndex
143                                    << ") => status = " << status
144                                    << ".";
145                     }
146                 }
147                 mTrackedBufferCache.erase(bufferId);
148                 delete bufferId;
149             }
150 
151             frameIndex2BufferIds.erase(findFrameIndex);
152             if (frameIndex2BufferIds.empty()) {
153                 mTrackedBuffersMap.erase(findListener);
154             }
155         }
156     }
157 
158     auto findListenerD = mDeathNotifications.find(listener);
159     if (findListenerD != mDeathNotifications.end()) {
160         DeathNotifications &deathNotifications = findListenerD->second;
161         auto findFrameIndex = deathNotifications.indices.find(frameIndex);
162         if (findFrameIndex != deathNotifications.indices.end()) {
163             std::vector<size_t> &bufferIndices = findFrameIndex->second;
164             deathNotifications.count -= bufferIndices.size();
165             deathNotifications.indices.erase(findFrameIndex);
166         }
167     }
168 }
169 
170 // Remove listener from mTrackedBuffersMap and mDeathNotifications. This implies
171 // all frameIndices and bufferIndices are removed.
172 //
173 // This is called when the component cleans up all input buffers, i.e., when
174 // reset(), release(), stop() or ~Component() is called.
_unregisterFrameData(const wp<IComponentListener> & listener)175 void InputBufferManager::_unregisterFrameData(
176         const wp<IComponentListener>& listener) {
177     LOG(VERBOSE) << "InputBufferManager::_unregisterFrameData -- called with "
178                  << "listener @ 0x" << std::hex << listener.unsafe_get()
179                  << std::dec << ".";
180     std::lock_guard<std::mutex> lock(mMutex);
181 
182     auto findListener = mTrackedBuffersMap.find(listener);
183     if (findListener != mTrackedBuffersMap.end()) {
184         std::map<uint64_t, std::set<TrackedBuffer*>> &frameIndex2BufferIds =
185                 findListener->second;
186         for (auto findFrameIndex = frameIndex2BufferIds.begin();
187                 findFrameIndex != frameIndex2BufferIds.end();
188                 ++findFrameIndex) {
189             std::set<TrackedBuffer*> &bufferIds = findFrameIndex->second;
190             for (TrackedBuffer* bufferId : bufferIds) {
191                 std::shared_ptr<C2Buffer> buffer = bufferId->buffer.lock();
192                 if (buffer) {
193                     c2_status_t status = buffer->unregisterOnDestroyNotify(
194                             onBufferDestroyed,
195                             reinterpret_cast<void*>(bufferId));
196                     if (status != C2_OK) {
197                         LOG(DEBUG) << "InputBufferManager::_unregisterFrameData "
198                                    << "-- unregisterOnDestroyNotify() failed "
199                                    << "(listener @ 0x"
200                                         << std::hex
201                                         << bufferId->listener.unsafe_get()
202                                    << ", frameIndex = "
203                                         << std::dec << bufferId->frameIndex
204                                    << ", bufferIndex = " << bufferId->bufferIndex
205                                    << ") => status = " << status
206                                    << ".";
207                     }
208                     mTrackedBufferCache.erase(bufferId);
209                     delete bufferId;
210                 }
211             }
212         }
213         mTrackedBuffersMap.erase(findListener);
214     }
215 
216     mDeathNotifications.erase(listener);
217 }
218 
219 // Set mNotificationIntervalNs.
_setNotificationInterval(nsecs_t notificationIntervalNs)220 void InputBufferManager::_setNotificationInterval(
221         nsecs_t notificationIntervalNs) {
222     mNotificationIntervalNs.store(
223             notificationIntervalNs,
224             std::memory_order_relaxed);
225 }
226 
227 // Move a buffer from mTrackedBuffersMap to mDeathNotifications.
228 // This is called when a registered C2Buffer object is destroyed.
onBufferDestroyed(const C2Buffer * buf,void * arg)229 void InputBufferManager::onBufferDestroyed(const C2Buffer* buf, void* arg) {
230     getInstance()._onBufferDestroyed(buf, arg);
231 }
232 
_onBufferDestroyed(const C2Buffer * buf,void * arg)233 void InputBufferManager::_onBufferDestroyed(const C2Buffer* buf, void* arg) {
234     if (!buf || !arg) {
235         LOG(WARNING) << "InputBufferManager::_onBufferDestroyed -- called with "
236                      << "null argument (s): "
237                      << "buf @ 0x" << std::hex << buf
238                      << ", arg @ 0x" << std::hex << arg
239                      << std::dec << ".";
240         return;
241     }
242 
243     std::lock_guard<std::mutex> lock(mMutex);
244     TrackedBuffer *bufferId = reinterpret_cast<TrackedBuffer*>(arg);
245 
246     if (mTrackedBufferCache.find(bufferId) == mTrackedBufferCache.end()) {
247         LOG(VERBOSE) << "InputBufferManager::_onBufferDestroyed -- called with "
248                      << "unregistered buffer: "
249                      << "buf @ 0x" << std::hex << buf
250                      << ", arg @ 0x" << std::hex << arg
251                      << std::dec << ".";
252         return;
253     }
254 
255     LOG(VERBOSE) << "InputBufferManager::_onBufferDestroyed -- called with "
256                  << "buf @ 0x" << std::hex << buf
257                  << ", arg @ 0x" << std::hex << arg
258                  << std::dec << " -- "
259                  << "listener @ 0x" << std::hex << bufferId->listener.unsafe_get()
260                  << ", frameIndex = " << std::dec << bufferId->frameIndex
261                  << ", bufferIndex = " << bufferId->bufferIndex
262                  << ".";
263     auto findListener = mTrackedBuffersMap.find(bufferId->listener);
264     if (findListener == mTrackedBuffersMap.end()) {
265         LOG(VERBOSE) << "InputBufferManager::_onBufferDestroyed -- "
266                      << "received invalid listener: "
267                      << "listener @ 0x" << std::hex << bufferId->listener.unsafe_get()
268                      << " (frameIndex = " << std::dec << bufferId->frameIndex
269                      << ", bufferIndex = " << bufferId->bufferIndex
270                      << ").";
271         return;
272     }
273 
274     std::map<uint64_t, std::set<TrackedBuffer*>> &frameIndex2BufferIds
275             = findListener->second;
276     auto findFrameIndex = frameIndex2BufferIds.find(bufferId->frameIndex);
277     if (findFrameIndex == frameIndex2BufferIds.end()) {
278         LOG(DEBUG) << "InputBufferManager::_onBufferDestroyed -- "
279                    << "received invalid frame index: "
280                    << "frameIndex = " << bufferId->frameIndex
281                    << " (listener @ 0x" << std::hex << bufferId->listener.unsafe_get()
282                    << ", bufferIndex = " << std::dec << bufferId->bufferIndex
283                    << ").";
284         return;
285     }
286 
287     std::set<TrackedBuffer*> &bufferIds = findFrameIndex->second;
288     auto findBufferId = bufferIds.find(bufferId);
289     if (findBufferId == bufferIds.end()) {
290         LOG(DEBUG) << "InputBufferManager::_onBufferDestroyed -- "
291                    << "received invalid buffer index: "
292                    << "bufferIndex = " << bufferId->bufferIndex
293                    << " (frameIndex = " << bufferId->frameIndex
294                    << ", listener @ 0x" << std::hex << bufferId->listener.unsafe_get()
295                    << std::dec << ").";
296         return;
297     }
298 
299     bufferIds.erase(findBufferId);
300     if (bufferIds.empty()) {
301         frameIndex2BufferIds.erase(findFrameIndex);
302         if (frameIndex2BufferIds.empty()) {
303             mTrackedBuffersMap.erase(findListener);
304         }
305     }
306 
307     DeathNotifications &deathNotifications = mDeathNotifications[bufferId->listener];
308     deathNotifications.indices[bufferId->frameIndex].emplace_back(bufferId->bufferIndex);
309     ++deathNotifications.count;
310     mOnBufferDestroyed.notify_one();
311 
312     mTrackedBufferCache.erase(bufferId);
313     delete bufferId;
314 }
315 
316 // Notify the clients about buffer destructions.
317 // Return false if all destructions have been notified.
318 // Return true and set timeToRetry to the time point to wait for before
319 // retrying if some destructions have not been notified.
processNotifications(nsecs_t * timeToRetryNs)320 bool InputBufferManager::processNotifications(nsecs_t* timeToRetryNs) {
321 
322     struct Notification {
323         sp<IComponentListener> listener;
324         hidl_vec<IComponentListener::InputBuffer> inputBuffers;
325         Notification(const sp<IComponentListener>& l, size_t s)
326               : listener(l), inputBuffers(s) {}
327     };
328     std::list<Notification> notifications;
329     nsecs_t notificationIntervalNs =
330             mNotificationIntervalNs.load(std::memory_order_relaxed);
331 
332     bool retry = false;
333     {
334         std::lock_guard<std::mutex> lock(mMutex);
335         *timeToRetryNs = notificationIntervalNs;
336         nsecs_t timeNowNs = systemTime();
337         for (auto it = mDeathNotifications.begin();
338                 it != mDeathNotifications.end(); ) {
339             sp<IComponentListener> listener = it->first.promote();
340             if (!listener) {
341                 ++it;
342                 continue;
343             }
344             DeathNotifications &deathNotifications = it->second;
345 
346             nsecs_t timeSinceLastNotifiedNs =
347                     timeNowNs - deathNotifications.lastSentNs;
348             // If not enough time has passed since the last callback, leave the
349             // notifications for this listener untouched for now and retry
350             // later.
351             if (timeSinceLastNotifiedNs < notificationIntervalNs) {
352                 retry = true;
353                 *timeToRetryNs = std::min(*timeToRetryNs,
354                         notificationIntervalNs - timeSinceLastNotifiedNs);
355                 LOG(VERBOSE) << "InputBufferManager::processNotifications -- "
356                              << "Notifications for listener @ "
357                                  << std::hex << listener.get()
358                              << " will be postponed.";
359                 ++it;
360                 continue;
361             }
362 
363             // If enough time has passed since the last notification to this
364             // listener but there are currently no pending notifications, the
365             // listener can be removed from mDeathNotifications---there is no
366             // need to keep track of the last notification time anymore.
367             if (deathNotifications.count == 0) {
368                 it = mDeathNotifications.erase(it);
369                 continue;
370             }
371 
372             // Create the argument for the callback.
373             notifications.emplace_back(listener, deathNotifications.count);
374             hidl_vec<IComponentListener::InputBuffer> &inputBuffers =
375                     notifications.back().inputBuffers;
376             size_t i = 0;
377             for (std::pair<const uint64_t, std::vector<size_t>>& p :
378                     deathNotifications.indices) {
379                 uint64_t frameIndex = p.first;
380                 const std::vector<size_t> &bufferIndices = p.second;
381                 for (const size_t& bufferIndex : bufferIndices) {
382                     IComponentListener::InputBuffer &inputBuffer
383                             = inputBuffers[i++];
384                     inputBuffer.arrayIndex = bufferIndex;
385                     inputBuffer.frameIndex = frameIndex;
386                 }
387             }
388 
389             // Clear deathNotifications for this listener and set retry to true
390             // so processNotifications will be called again. This will
391             // guarantee that a listener with no pending notifications will
392             // eventually be removed from mDeathNotifications after
393             // mNotificationIntervalNs nanoseconds has passed.
394             retry = true;
395             deathNotifications.indices.clear();
396             deathNotifications.count = 0;
397             deathNotifications.lastSentNs = timeNowNs;
398             ++it;
399         }
400     }
401 
402     // Call onInputBuffersReleased() outside the lock to avoid deadlock.
403     for (const Notification& notification : notifications) {
404         if (!notification.listener->onInputBuffersReleased(
405                 notification.inputBuffers).isOk()) {
406             // This may trigger if the client has died.
407             LOG(DEBUG) << "InputBufferManager::processNotifications -- "
408                        << "failed to send death notifications to "
409                        << "listener @ 0x" << std::hex
410                                           << notification.listener.get()
411                        << std::dec << ".";
412         } else {
413 #if LOG_NDEBUG == 0
414             std::stringstream inputBufferLog;
415             for (const IComponentListener::InputBuffer& inputBuffer :
416                     notification.inputBuffers) {
417                 inputBufferLog << " (" << inputBuffer.frameIndex
418                                << ", " << inputBuffer.arrayIndex
419                                << ")";
420             }
421             LOG(VERBOSE) << "InputBufferManager::processNotifications -- "
422                          << "death notifications sent to "
423                          << "listener @ 0x" << std::hex
424                                             << notification.listener.get()
425                                             << std::dec
426                          << " with these (frameIndex, bufferIndex) pairs:"
427                          << inputBufferLog.str();
428 #endif
429         }
430     }
431 #if LOG_NDEBUG == 0
432     if (retry) {
433         LOG(VERBOSE) << "InputBufferManager::processNotifications -- "
434                      << "will retry again in " << *timeToRetryNs << "ns.";
435     } else {
436         LOG(VERBOSE) << "InputBufferManager::processNotifications -- "
437                      << "no pending death notifications.";
438     }
439 #endif
440     return retry;
441 }
442 
main()443 void InputBufferManager::main() {
444     LOG(VERBOSE) << "InputBufferManager main -- started.";
445     nsecs_t timeToRetryNs;
446     while (true) {
447         std::unique_lock<std::mutex> lock(mMutex);
448         while (mDeathNotifications.empty()) {
449             mOnBufferDestroyed.wait(lock);
450         }
451         lock.unlock();
452         while (processNotifications(&timeToRetryNs)) {
453             std::this_thread::sleep_for(
454                     std::chrono::nanoseconds(timeToRetryNs));
455         }
456     }
457 }
458 
InputBufferManager()459 InputBufferManager::InputBufferManager()
460       : mMainThread{&InputBufferManager::main, this} {
461 }
462 
getInstance()463 InputBufferManager& InputBufferManager::getInstance() {
464     static InputBufferManager instance{};
465     return instance;
466 }
467 
468 }  // namespace utils
469 }  // namespace V1_0
470 }  // namespace c2
471 }  // namespace media
472 }  // namespace hardware
473 }  // namespace android
474 
475 
476 
477