1 /*
2  * Copyright (C) 2022 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #define LOG_TAG "AidlBufferPoolAcc"
17 //#define LOG_NDEBUG 0
18 
19 #include <android-base/no_destructor.h>
20 
21 #include <sys/types.h>
22 #include <stdint.h>
23 #include <time.h>
24 #include <unistd.h>
25 #include <utils/Log.h>
26 #include <thread>
27 
28 #include "Accessor.h"
29 #include "Connection.h"
30 #include "DataHelper.h"
31 
32 namespace aidl::android::hardware::media::bufferpool2::implementation {
33 
34 namespace {
35     static constexpr nsecs_t kEvictGranularityNs = 1000000000; // 1 sec
36     static constexpr nsecs_t kEvictDurationNs = 5000000000; // 5 secs
37 }
38 
39 #ifdef __ANDROID_VNDK__
40 static constexpr uint32_t kSeqIdVndkBit = 1U << 31;
41 #else
42 static constexpr uint32_t kSeqIdVndkBit = 0;
43 #endif
44 
45 static constexpr uint32_t kSeqIdMax = 0x7fffffff;
46 
ConnectionIdGenerator()47 Accessor::ConnectionIdGenerator::ConnectionIdGenerator() {
48     mSeqId = static_cast<uint32_t>(time(nullptr) & kSeqIdMax);
49     mPid = static_cast<int32_t>(getpid());
50 }
51 
getConnectionId()52 ConnectionId Accessor::ConnectionIdGenerator::getConnectionId() {
53     uint32_t seq;
54     {
55         std::lock_guard<std::mutex> l(mLock);
56         seq = mSeqId;
57         if (mSeqId == kSeqIdMax) {
58             mSeqId = 0;
59         } else {
60             ++mSeqId;
61         }
62     }
63     return (int64_t)mPid << 32 | seq | kSeqIdVndkBit;
64 }
65 
66 namespace {
67 // anonymous namespace
68 static std::shared_ptr<ConnectionDeathRecipient> sConnectionDeathRecipient =
69     std::make_shared<ConnectionDeathRecipient>();
70 
serviceDied(void * cookie)71 void serviceDied(void *cookie) {
72     if (sConnectionDeathRecipient) {
73         sConnectionDeathRecipient->onDead(cookie);
74     }
75 }
76 }
77 
getConnectionDeathRecipient()78 std::shared_ptr<ConnectionDeathRecipient> Accessor::getConnectionDeathRecipient() {
79     return sConnectionDeathRecipient;
80 }
81 
ConnectionDeathRecipient()82 ConnectionDeathRecipient::ConnectionDeathRecipient() {
83     mDeathRecipient = ndk::ScopedAIBinder_DeathRecipient(
84             AIBinder_DeathRecipient_new(serviceDied));
85 }
86 
add(int64_t connectionId,const std::shared_ptr<Accessor> & accessor)87 void ConnectionDeathRecipient::add(
88         int64_t connectionId,
89         const std::shared_ptr<Accessor> &accessor) {
90     std::lock_guard<std::mutex> lock(mLock);
91     if (mAccessors.find(connectionId) == mAccessors.end()) {
92         mAccessors.insert(std::make_pair(connectionId, accessor));
93     }
94 }
95 
remove(int64_t connectionId)96 void ConnectionDeathRecipient::remove(int64_t connectionId) {
97     std::lock_guard<std::mutex> lock(mLock);
98     mAccessors.erase(connectionId);
99     auto it = mConnectionToCookie.find(connectionId);
100     if (it != mConnectionToCookie.end()) {
101         void * cookie = it->second;
102         mConnectionToCookie.erase(it);
103         auto cit = mCookieToConnections.find(cookie);
104         if (cit != mCookieToConnections.end()) {
105             cit->second.erase(connectionId);
106             if (cit->second.size() == 0) {
107                 mCookieToConnections.erase(cit);
108             }
109         }
110     }
111 }
112 
addCookieToConnection(void * cookie,int64_t connectionId)113 void ConnectionDeathRecipient::addCookieToConnection(
114         void *cookie,
115         int64_t connectionId) {
116     std::lock_guard<std::mutex> lock(mLock);
117     if (mAccessors.find(connectionId) == mAccessors.end()) {
118         return;
119     }
120     mConnectionToCookie.insert(std::make_pair(connectionId, cookie));
121     auto it = mCookieToConnections.find(cookie);
122     if (it != mCookieToConnections.end()) {
123         it->second.insert(connectionId);
124     } else {
125         mCookieToConnections.insert(std::make_pair(
126                 cookie, std::set<int64_t>{connectionId}));
127     }
128 }
129 
onDead(void * cookie)130 void ConnectionDeathRecipient::onDead(void *cookie) {
131     std::map<int64_t, const std::weak_ptr<Accessor>> connectionsToClose;
132     {
133         std::lock_guard<std::mutex> lock(mLock);
134 
135         auto it = mCookieToConnections.find(cookie);
136         if (it != mCookieToConnections.end()) {
137             for (auto conIt = it->second.begin(); conIt != it->second.end(); ++conIt) {
138                 auto accessorIt = mAccessors.find(*conIt);
139                 if (accessorIt != mAccessors.end()) {
140                     connectionsToClose.insert(std::make_pair(*conIt, accessorIt->second));
141                     mAccessors.erase(accessorIt);
142                 }
143                 mConnectionToCookie.erase(*conIt);
144             }
145             mCookieToConnections.erase(it);
146         }
147     }
148 
149     if (connectionsToClose.size() > 0) {
150         std::shared_ptr<Accessor> accessor;
151         for (auto it = connectionsToClose.begin(); it != connectionsToClose.end(); ++it) {
152             accessor = it->second.lock();
153 
154             if (accessor) {
155                 accessor->close(it->first);
156                 ALOGD("connection %lld closed on death", (long long)it->first);
157             }
158         }
159     }
160 }
161 
getRecipient()162 AIBinder_DeathRecipient *ConnectionDeathRecipient::getRecipient() {
163     return mDeathRecipient.get();
164 }
165 
connect(const std::shared_ptr<::aidl::android::hardware::media::bufferpool2::IObserver> & in_observer,::aidl::android::hardware::media::bufferpool2::IAccessor::ConnectionInfo * _aidl_return)166 ::ndk::ScopedAStatus Accessor::connect(const std::shared_ptr<::aidl::android::hardware::media::bufferpool2::IObserver>& in_observer, ::aidl::android::hardware::media::bufferpool2::IAccessor::ConnectionInfo* _aidl_return) {
167     std::shared_ptr<Connection> connection;
168     ConnectionId connectionId;
169     uint32_t msgId;
170     StatusDescriptor statusDesc;
171     InvalidationDescriptor invDesc;
172     BufferPoolStatus status = connect(
173             in_observer, false, &connection, &connectionId, &msgId, &statusDesc, &invDesc);
174     if (status == ResultStatus::OK) {
175         _aidl_return->connection = connection;
176         _aidl_return->connectionId = connectionId;
177         _aidl_return->msgId = msgId;
178         _aidl_return->toFmqDesc = std::move(statusDesc);
179         _aidl_return->fromFmqDesc = std::move(invDesc);
180         return ::ndk::ScopedAStatus::ok();
181     }
182     return ::ndk::ScopedAStatus::fromServiceSpecificError(status);
183 }
184 
Accessor(const std::shared_ptr<BufferPoolAllocator> & allocator)185 Accessor::Accessor(const std::shared_ptr<BufferPoolAllocator> &allocator)
186     : mAllocator(allocator), mScheduleEvictTs(0) {}
187 
~Accessor()188 Accessor::~Accessor() {
189 }
190 
isValid()191 bool Accessor::isValid() {
192     return mBufferPool.isValid();
193 }
194 
flush()195 BufferPoolStatus Accessor::flush() {
196     std::lock_guard<std::mutex> lock(mBufferPool.mMutex);
197     mBufferPool.processStatusMessages();
198     mBufferPool.flush(ref<Accessor>());
199     return ResultStatus::OK;
200 }
201 
allocate(ConnectionId connectionId,const std::vector<uint8_t> & params,BufferId * bufferId,const native_handle_t ** handle)202 BufferPoolStatus Accessor::allocate(
203         ConnectionId connectionId,
204         const std::vector<uint8_t> &params,
205         BufferId *bufferId, const native_handle_t** handle) {
206     std::unique_lock<std::mutex> lock(mBufferPool.mMutex);
207     mBufferPool.processStatusMessages();
208     BufferPoolStatus status = ResultStatus::OK;
209     if (!mBufferPool.getFreeBuffer(mAllocator, params, bufferId, handle)) {
210         lock.unlock();
211         std::shared_ptr<BufferPoolAllocation> alloc;
212         size_t allocSize;
213         status = mAllocator->allocate(params, &alloc, &allocSize);
214         lock.lock();
215         if (status == ResultStatus::OK) {
216             status = mBufferPool.addNewBuffer(alloc, allocSize, params, bufferId, handle);
217         }
218         ALOGV("create a buffer %d : %u %p",
219               status == ResultStatus::OK, *bufferId, *handle);
220     }
221     if (status == ResultStatus::OK) {
222         // TODO: handle ownBuffer failure
223         mBufferPool.handleOwnBuffer(connectionId, *bufferId);
224     }
225     mBufferPool.cleanUp();
226     scheduleEvictIfNeeded();
227     return status;
228 }
229 
fetch(ConnectionId connectionId,TransactionId transactionId,BufferId bufferId,const native_handle_t ** handle)230 BufferPoolStatus Accessor::fetch(
231         ConnectionId connectionId, TransactionId transactionId,
232         BufferId bufferId, const native_handle_t** handle) {
233     std::lock_guard<std::mutex> lock(mBufferPool.mMutex);
234     mBufferPool.processStatusMessages();
235     auto found = mBufferPool.mTransactions.find(transactionId);
236     if (found != mBufferPool.mTransactions.end() &&
237             contains(&mBufferPool.mPendingTransactions,
238                      connectionId, transactionId)) {
239         if (found->second->mSenderValidated &&
240                 found->second->mStatus == BufferStatus::TRANSFER_FROM &&
241                 found->second->mBufferId == bufferId) {
242             found->second->mStatus = BufferStatus::TRANSFER_FETCH;
243             auto bufferIt = mBufferPool.mBuffers.find(bufferId);
244             if (bufferIt != mBufferPool.mBuffers.end()) {
245                 mBufferPool.mStats.onBufferFetched();
246                 *handle = bufferIt->second->handle();
247                 return ResultStatus::OK;
248             }
249         }
250     }
251     mBufferPool.cleanUp();
252     scheduleEvictIfNeeded();
253     return ResultStatus::CRITICAL_ERROR;
254 }
255 
connect(const std::shared_ptr<IObserver> & observer,bool local,std::shared_ptr<Connection> * connection,ConnectionId * pConnectionId,uint32_t * pMsgId,StatusDescriptor * statusDescPtr,InvalidationDescriptor * invDescPtr)256 BufferPoolStatus Accessor::connect(
257         const std::shared_ptr<IObserver> &observer, bool local,
258         std::shared_ptr<Connection> *connection, ConnectionId *pConnectionId,
259         uint32_t *pMsgId,
260         StatusDescriptor* statusDescPtr,
261         InvalidationDescriptor* invDescPtr) {
262     static ::android::base::NoDestructor<ConnectionIdGenerator> sConIdGenerator;
263     std::shared_ptr<Connection> newConnection = ::ndk::SharedRefBase::make<Connection>();
264     BufferPoolStatus status = ResultStatus::CRITICAL_ERROR;
265     {
266         std::lock_guard<std::mutex> lock(mBufferPool.mMutex);
267         if (newConnection) {
268             int32_t pid = getpid();
269             ConnectionId id = sConIdGenerator->getConnectionId();
270             status = mBufferPool.mObserver.open(id, statusDescPtr);
271             if (status == ResultStatus::OK) {
272                 newConnection->initialize(ref<Accessor>(), id);
273                 *connection = newConnection;
274                 *pConnectionId = id;
275                 *pMsgId = mBufferPool.mInvalidation.mInvalidationId;
276                 mBufferPool.mConnectionIds.insert(id);
277                 mBufferPool.mInvalidationChannel.getDesc(invDescPtr);
278                 mBufferPool.mInvalidation.onConnect(id, observer);
279             }
280 
281         }
282         mBufferPool.processStatusMessages();
283         mBufferPool.cleanUp();
284         scheduleEvictIfNeeded();
285     }
286     if (!local && status == ResultStatus::OK) {
287         std::shared_ptr<Accessor> accessor(ref<Accessor>());
288         sConnectionDeathRecipient->add(*pConnectionId, accessor);
289     }
290     return status;
291 }
292 
close(ConnectionId connectionId)293 BufferPoolStatus Accessor::close(ConnectionId connectionId) {
294     {
295         std::lock_guard<std::mutex> lock(mBufferPool.mMutex);
296         ALOGV("connection close %lld: %u", (long long)connectionId, mBufferPool.mInvalidation.mId);
297         mBufferPool.processStatusMessages();
298         mBufferPool.handleClose(connectionId);
299         mBufferPool.mObserver.close(connectionId);
300         mBufferPool.mInvalidation.onClose(connectionId);
301         // Since close# will be called after all works are finished, it is OK to
302         // evict unused buffers.
303         mBufferPool.cleanUp(true);
304         scheduleEvictIfNeeded();
305     }
306     sConnectionDeathRecipient->remove(connectionId);
307     return ResultStatus::OK;
308 }
309 
cleanUp(bool clearCache)310 void Accessor::cleanUp(bool clearCache) {
311     // transaction timeout, buffer caching TTL handling
312     std::lock_guard<std::mutex> lock(mBufferPool.mMutex);
313     mBufferPool.processStatusMessages();
314     mBufferPool.cleanUp(clearCache);
315 }
316 
handleInvalidateAck()317 void Accessor::handleInvalidateAck() {
318     std::map<ConnectionId, const std::shared_ptr<IObserver>> observers;
319     uint32_t invalidationId;
320     {
321         std::lock_guard<std::mutex> lock(mBufferPool.mMutex);
322         mBufferPool.processStatusMessages();
323         mBufferPool.mInvalidation.onHandleAck(&observers, &invalidationId);
324     }
325     // Do not hold lock for send invalidations
326     size_t deadClients = 0;
327     for (auto it = observers.begin(); it != observers.end(); ++it) {
328         const std::shared_ptr<IObserver> observer = it->second;
329         if (observer) {
330             ::ndk::ScopedAStatus status = observer->onMessage(it->first, invalidationId);
331             if (!status.isOk()) {
332                 ++deadClients;
333             }
334         }
335     }
336     if (deadClients > 0) {
337         ALOGD("During invalidation found %zu dead clients", deadClients);
338     }
339 }
340 
invalidatorThread(std::map<uint32_t,const std::weak_ptr<Accessor>> & accessors,std::mutex & mutex,std::condition_variable & cv,bool & ready)341 void Accessor::invalidatorThread(
342             std::map<uint32_t, const std::weak_ptr<Accessor>> &accessors,
343             std::mutex &mutex,
344             std::condition_variable &cv,
345             bool &ready) {
346     constexpr uint32_t NUM_SPIN_TO_INCREASE_SLEEP = 1024;
347     constexpr uint32_t NUM_SPIN_TO_LOG = 1024*8;
348     constexpr useconds_t MAX_SLEEP_US = 10000;
349     uint32_t numSpin = 0;
350     useconds_t sleepUs = 1;
351 
352     while(true) {
353         std::map<uint32_t, const std::weak_ptr<Accessor>> copied;
354         {
355             std::unique_lock<std::mutex> lock(mutex);
356             while (!ready) {
357                 numSpin = 0;
358                 sleepUs = 1;
359                 cv.wait(lock);
360             }
361             copied.insert(accessors.begin(), accessors.end());
362         }
363         std::list<ConnectionId> erased;
364         for (auto it = copied.begin(); it != copied.end(); ++it) {
365             const std::shared_ptr<Accessor> acc = it->second.lock();
366             if (!acc) {
367                 erased.push_back(it->first);
368             } else {
369                 acc->handleInvalidateAck();
370             }
371         }
372         {
373             std::unique_lock<std::mutex> lock(mutex);
374             for (auto it = erased.begin(); it != erased.end(); ++it) {
375                 accessors.erase(*it);
376             }
377             if (accessors.size() == 0) {
378                 ready = false;
379             } else {
380                 // N.B. Since there is not a efficient way to wait over FMQ,
381                 // polling over the FMQ is the current way to prevent draining
382                 // CPU.
383                 lock.unlock();
384                 ++numSpin;
385                 if (numSpin % NUM_SPIN_TO_INCREASE_SLEEP == 0 &&
386                     sleepUs < MAX_SLEEP_US) {
387                     sleepUs *= 10;
388                 }
389                 if (numSpin % NUM_SPIN_TO_LOG == 0) {
390                     ALOGW("invalidator thread spinning");
391                 }
392                 ::usleep(sleepUs);
393             }
394         }
395     }
396 }
397 
AccessorInvalidator()398 Accessor::AccessorInvalidator::AccessorInvalidator() : mReady(false) {
399     std::thread invalidator(
400             invalidatorThread,
401             std::ref(mAccessors),
402             std::ref(mMutex),
403             std::ref(mCv),
404             std::ref(mReady));
405     invalidator.detach();
406 }
407 
addAccessor(uint32_t accessorId,const std::weak_ptr<Accessor> & accessor)408 void Accessor::AccessorInvalidator::addAccessor(
409         uint32_t accessorId, const std::weak_ptr<Accessor> &accessor) {
410     bool notify = false;
411     std::unique_lock<std::mutex> lock(mMutex);
412     if (mAccessors.find(accessorId) == mAccessors.end()) {
413         if (!mReady) {
414             mReady = true;
415             notify = true;
416         }
417         mAccessors.emplace(accessorId, accessor);
418         ALOGV("buffer invalidation added bp:%u %d", accessorId, notify);
419     }
420     lock.unlock();
421     if (notify) {
422         mCv.notify_one();
423     }
424 }
425 
delAccessor(uint32_t accessorId)426 void Accessor::AccessorInvalidator::delAccessor(uint32_t accessorId) {
427     std::lock_guard<std::mutex> lock(mMutex);
428     mAccessors.erase(accessorId);
429     ALOGV("buffer invalidation deleted bp:%u", accessorId);
430     if (mAccessors.size() == 0) {
431         mReady = false;
432     }
433 }
434 
435 std::unique_ptr<Accessor::AccessorInvalidator> Accessor::sInvalidator;
436 
createInvalidator()437 void Accessor::createInvalidator() {
438     if (!sInvalidator) {
439         sInvalidator = std::make_unique<Accessor::AccessorInvalidator>();
440     }
441 }
442 
evictorThread(std::map<const std::weak_ptr<Accessor>,nsecs_t,std::owner_less<>> & accessors,std::mutex & mutex,std::condition_variable & cv)443 void Accessor::evictorThread(
444         std::map<const std::weak_ptr<Accessor>, nsecs_t, std::owner_less<>> &accessors,
445         std::mutex &mutex,
446         std::condition_variable &cv) {
447     std::list<const std::weak_ptr<Accessor>> evictList;
448     while (true) {
449         int expired = 0;
450         int evicted = 0;
451         {
452             nsecs_t now = systemTime();
453             std::unique_lock<std::mutex> lock(mutex);
454             while (accessors.size() == 0) {
455                 cv.wait(lock);
456             }
457             auto it = accessors.begin();
458             while (it != accessors.end()) {
459                 if (now > (it->second + kEvictDurationNs)) {
460                     ++expired;
461                     evictList.push_back(it->first);
462                     it = accessors.erase(it);
463                 } else {
464                     ++it;
465                 }
466             }
467         }
468         // evict idle accessors;
469         for (auto it = evictList.begin(); it != evictList.end(); ++it) {
470             const std::shared_ptr<Accessor> accessor = it->lock();
471             if (accessor) {
472                 accessor->cleanUp(true);
473                 ++evicted;
474             }
475         }
476         if (expired > 0) {
477             ALOGD("evictor expired: %d, evicted: %d", expired, evicted);
478         }
479         evictList.clear();
480         ::usleep(kEvictGranularityNs / 1000);
481     }
482 }
483 
AccessorEvictor()484 Accessor::AccessorEvictor::AccessorEvictor() {
485     std::thread evictor(
486             evictorThread,
487             std::ref(mAccessors),
488             std::ref(mMutex),
489             std::ref(mCv));
490     evictor.detach();
491 }
492 
addAccessor(const std::weak_ptr<Accessor> & accessor,nsecs_t ts)493 void Accessor::AccessorEvictor::addAccessor(
494         const std::weak_ptr<Accessor> &accessor, nsecs_t ts) {
495     std::lock_guard<std::mutex> lock(mMutex);
496     bool notify = mAccessors.empty();
497     auto it = mAccessors.find(accessor);
498     if (it == mAccessors.end()) {
499         mAccessors.emplace(accessor, ts);
500     } else {
501         it->second = ts;
502     }
503     if (notify) {
504         mCv.notify_one();
505     }
506 }
507 
508 std::unique_ptr<Accessor::AccessorEvictor> Accessor::sEvictor;
509 
createEvictor()510 void Accessor::createEvictor() {
511     if (!sEvictor) {
512         sEvictor = std::make_unique<Accessor::AccessorEvictor>();
513     }
514 }
515 
scheduleEvictIfNeeded()516 void Accessor::scheduleEvictIfNeeded() {
517     nsecs_t now = systemTime();
518 
519     if (now > (mScheduleEvictTs + kEvictGranularityNs)) {
520         mScheduleEvictTs = now;
521         sEvictor->addAccessor(ref<Accessor>(), now);
522     }
523 }
524 
525 }  // namespace aidl::android::hardware::media::bufferpool2::implemntation {
526