1 /*
2  * Copyright (C) 2018 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #define LOG_TAG "BufferPoolAccessor2.0"
18 //#define LOG_NDEBUG 0
19 
20 #include <sys/types.h>
21 #include <stdint.h>
22 #include <time.h>
23 #include <unistd.h>
24 #include <utils/Log.h>
25 #include <thread>
26 #include "AccessorImpl.h"
27 #include "Connection.h"
28 
29 namespace android {
30 namespace hardware {
31 namespace media {
32 namespace bufferpool {
33 namespace V2_0 {
34 namespace implementation {
35 
36 namespace {
37     static constexpr int64_t kCleanUpDurationUs = 500000; // TODO tune 0.5 sec
38     static constexpr int64_t kLogDurationUs = 5000000; // 5 secs
39 
40     static constexpr size_t kMinAllocBytesForEviction = 1024*1024*15;
41     static constexpr size_t kMinBufferCountForEviction = 25;
42 
43     static constexpr nsecs_t kEvictGranularityNs = 1000000000; // 1 sec
44     static constexpr nsecs_t kEvictDurationNs = 5000000000; // 5 secs
45 }
46 
47 // Buffer structure in bufferpool process
48 struct InternalBuffer {
49     BufferId mId;
50     size_t mOwnerCount;
51     size_t mTransactionCount;
52     const std::shared_ptr<BufferPoolAllocation> mAllocation;
53     const size_t mAllocSize;
54     const std::vector<uint8_t> mConfig;
55     bool mInvalidated;
56 
InternalBufferandroid::hardware::media::bufferpool::V2_0::implementation::InternalBuffer57     InternalBuffer(
58             BufferId id,
59             const std::shared_ptr<BufferPoolAllocation> &alloc,
60             const size_t allocSize,
61             const std::vector<uint8_t> &allocConfig)
62             : mId(id), mOwnerCount(0), mTransactionCount(0),
63             mAllocation(alloc), mAllocSize(allocSize), mConfig(allocConfig),
64             mInvalidated(false) {}
65 
handleandroid::hardware::media::bufferpool::V2_0::implementation::InternalBuffer66     const native_handle_t *handle() {
67         return mAllocation->handle();
68     }
69 
invalidateandroid::hardware::media::bufferpool::V2_0::implementation::InternalBuffer70     void invalidate() {
71         mInvalidated = true;
72     }
73 };
74 
75 struct TransactionStatus {
76     TransactionId mId;
77     BufferId mBufferId;
78     ConnectionId mSender;
79     ConnectionId mReceiver;
80     BufferStatus mStatus;
81     int64_t mTimestampUs;
82     bool mSenderValidated;
83 
TransactionStatusandroid::hardware::media::bufferpool::V2_0::implementation::TransactionStatus84     TransactionStatus(const BufferStatusMessage &message, int64_t timestampUs) {
85         mId = message.transactionId;
86         mBufferId = message.bufferId;
87         mStatus = message.newStatus;
88         mTimestampUs = timestampUs;
89         if (mStatus == BufferStatus::TRANSFER_TO) {
90             mSender = message.connectionId;
91             mReceiver = message.targetConnectionId;
92             mSenderValidated = true;
93         } else {
94             mSender = -1LL;
95             mReceiver = message.connectionId;
96             mSenderValidated = false;
97         }
98     }
99 };
100 
101 // Helper template methods for handling map of set.
// Adds |value| to the set stored under |key|, creating the set when the key
// is not present yet.
//
// Returns true when |value| was newly added, false when the set already
// contained it.
template<class T, class U>
bool insert(std::map<T, std::set<U>> *mapOfSet, T key, U value) {
    // operator[] creates an empty set for a new key; set::insert reports
    // whether the value was actually added. One map lookup, no set copy.
    return (*mapOfSet)[key].insert(value).second;
}
115 
// Removes |value| from the set stored under |key|.
//
// The map entry itself is dropped once its set is empty. Returns true when
// |value| was present and removed.
template<class T, class U>
bool erase(std::map<T, std::set<U>> *mapOfSet, T key, U value) {
    const auto entry = mapOfSet->find(key);
    if (entry == mapOfSet->end()) {
        return false;
    }
    const bool removed = entry->second.erase(value) > 0;
    if (entry->second.empty()) {
        mapOfSet->erase(entry);
    }
    return removed;
}
130 
// Returns true when the set stored under |key| exists and holds |value|.
template<class T, class U>
bool contains(std::map<T, std::set<U>> *mapOfSet, T key, U value) {
    const auto entry = mapOfSet->find(key);
    if (entry == mapOfSet->end()) {
        return false;
    }
    return entry->second.find(value) != entry->second.end();
}
140 
141 #ifdef __ANDROID_VNDK__
142 static constexpr uint32_t kSeqIdVndkBit = 1U << 31;
143 #else
144 static constexpr uint32_t kSeqIdVndkBit = 0;
145 #endif
146 
147 static constexpr uint32_t kSeqIdMax = 0x7fffffff;
148 uint32_t Accessor::Impl::sSeqId = time(nullptr) & kSeqIdMax;
149 
Impl(const std::shared_ptr<BufferPoolAllocator> & allocator)150 Accessor::Impl::Impl(
151         const std::shared_ptr<BufferPoolAllocator> &allocator)
152         : mAllocator(allocator), mScheduleEvictTs(0) {}
153 
~Impl()154 Accessor::Impl::~Impl() {
155 }
156 
connect(const sp<Accessor> & accessor,const sp<IObserver> & observer,sp<Connection> * connection,ConnectionId * pConnectionId,uint32_t * pMsgId,const StatusDescriptor ** statusDescPtr,const InvalidationDescriptor ** invDescPtr)157 ResultStatus Accessor::Impl::connect(
158         const sp<Accessor> &accessor, const sp<IObserver> &observer,
159         sp<Connection> *connection,
160         ConnectionId *pConnectionId,
161         uint32_t *pMsgId,
162         const StatusDescriptor** statusDescPtr,
163         const InvalidationDescriptor** invDescPtr) {
164     sp<Connection> newConnection = new Connection();
165     ResultStatus status = ResultStatus::CRITICAL_ERROR;
166     {
167         std::lock_guard<std::mutex> lock(mBufferPool.mMutex);
168         if (newConnection) {
169             int32_t pid = getpid();
170             ConnectionId id = (int64_t)pid << 32 | sSeqId | kSeqIdVndkBit;
171             status = mBufferPool.mObserver.open(id, statusDescPtr);
172             if (status == ResultStatus::OK) {
173                 newConnection->initialize(accessor, id);
174                 *connection = newConnection;
175                 *pConnectionId = id;
176                 *pMsgId = mBufferPool.mInvalidation.mInvalidationId;
177                 mBufferPool.mConnectionIds.insert(id);
178                 mBufferPool.mInvalidationChannel.getDesc(invDescPtr);
179                 mBufferPool.mInvalidation.onConnect(id, observer);
180                 if (sSeqId == kSeqIdMax) {
181                    sSeqId = 0;
182                 } else {
183                     ++sSeqId;
184                 }
185             }
186 
187         }
188         mBufferPool.processStatusMessages();
189         mBufferPool.cleanUp();
190         scheduleEvictIfNeeded();
191     }
192     return status;
193 }
194 
close(ConnectionId connectionId)195 ResultStatus Accessor::Impl::close(ConnectionId connectionId) {
196     std::lock_guard<std::mutex> lock(mBufferPool.mMutex);
197     ALOGV("connection close %lld: %u", (long long)connectionId, mBufferPool.mInvalidation.mId);
198     mBufferPool.processStatusMessages();
199     mBufferPool.handleClose(connectionId);
200     mBufferPool.mObserver.close(connectionId);
201     mBufferPool.mInvalidation.onClose(connectionId);
202     // Since close# will be called after all works are finished, it is OK to
203     // evict unused buffers.
204     mBufferPool.cleanUp(true);
205     scheduleEvictIfNeeded();
206     return ResultStatus::OK;
207 }
208 
allocate(ConnectionId connectionId,const std::vector<uint8_t> & params,BufferId * bufferId,const native_handle_t ** handle)209 ResultStatus Accessor::Impl::allocate(
210         ConnectionId connectionId, const std::vector<uint8_t>& params,
211         BufferId *bufferId, const native_handle_t** handle) {
212     std::unique_lock<std::mutex> lock(mBufferPool.mMutex);
213     mBufferPool.processStatusMessages();
214     ResultStatus status = ResultStatus::OK;
215     if (!mBufferPool.getFreeBuffer(mAllocator, params, bufferId, handle)) {
216         lock.unlock();
217         std::shared_ptr<BufferPoolAllocation> alloc;
218         size_t allocSize;
219         status = mAllocator->allocate(params, &alloc, &allocSize);
220         lock.lock();
221         if (status == ResultStatus::OK) {
222             status = mBufferPool.addNewBuffer(alloc, allocSize, params, bufferId, handle);
223         }
224         ALOGV("create a buffer %d : %u %p",
225               status == ResultStatus::OK, *bufferId, *handle);
226     }
227     if (status == ResultStatus::OK) {
228         // TODO: handle ownBuffer failure
229         mBufferPool.handleOwnBuffer(connectionId, *bufferId);
230     }
231     mBufferPool.cleanUp();
232     scheduleEvictIfNeeded();
233     return status;
234 }
235 
fetch(ConnectionId connectionId,TransactionId transactionId,BufferId bufferId,const native_handle_t ** handle)236 ResultStatus Accessor::Impl::fetch(
237         ConnectionId connectionId, TransactionId transactionId,
238         BufferId bufferId, const native_handle_t** handle) {
239     std::lock_guard<std::mutex> lock(mBufferPool.mMutex);
240     mBufferPool.processStatusMessages();
241     auto found = mBufferPool.mTransactions.find(transactionId);
242     if (found != mBufferPool.mTransactions.end() &&
243             contains(&mBufferPool.mPendingTransactions,
244                      connectionId, transactionId)) {
245         if (found->second->mSenderValidated &&
246                 found->second->mStatus == BufferStatus::TRANSFER_FROM &&
247                 found->second->mBufferId == bufferId) {
248             found->second->mStatus = BufferStatus::TRANSFER_FETCH;
249             auto bufferIt = mBufferPool.mBuffers.find(bufferId);
250             if (bufferIt != mBufferPool.mBuffers.end()) {
251                 mBufferPool.mStats.onBufferFetched();
252                 *handle = bufferIt->second->handle();
253                 return ResultStatus::OK;
254             }
255         }
256     }
257     mBufferPool.cleanUp();
258     scheduleEvictIfNeeded();
259     return ResultStatus::CRITICAL_ERROR;
260 }
261 
cleanUp(bool clearCache)262 void Accessor::Impl::cleanUp(bool clearCache) {
263     // transaction timeout, buffer cacheing TTL handling
264     std::lock_guard<std::mutex> lock(mBufferPool.mMutex);
265     mBufferPool.processStatusMessages();
266     mBufferPool.cleanUp(clearCache);
267 }
268 
flush()269 void Accessor::Impl::flush() {
270     std::lock_guard<std::mutex> lock(mBufferPool.mMutex);
271     mBufferPool.processStatusMessages();
272     mBufferPool.flush(shared_from_this());
273 }
274 
handleInvalidateAck()275 void Accessor::Impl::handleInvalidateAck() {
276     std::map<ConnectionId, const sp<IObserver>> observers;
277     uint32_t invalidationId;
278     {
279         std::lock_guard<std::mutex> lock(mBufferPool.mMutex);
280         mBufferPool.processStatusMessages();
281         mBufferPool.mInvalidation.onHandleAck(&observers, &invalidationId);
282     }
283     // Do not hold lock for send invalidations
284     size_t deadClients = 0;
285     for (auto it = observers.begin(); it != observers.end(); ++it) {
286         const sp<IObserver> observer = it->second;
287         if (observer) {
288             Return<void> transResult = observer->onMessage(it->first, invalidationId);
289             if (!transResult.isOk()) {
290                 ++deadClients;
291             }
292         }
293     }
294     if (deadClients > 0) {
295         ALOGD("During invalidation found %zu dead clients", deadClients);
296     }
297 }
298 
isValid()299 bool Accessor::Impl::isValid() {
300     return mBufferPool.isValid();
301 }
302 
BufferPool()303 Accessor::Impl::Impl::BufferPool::BufferPool()
304     : mTimestampUs(getTimestampNow()),
305       mLastCleanUpUs(mTimestampUs),
306       mLastLogUs(mTimestampUs),
307       mSeq(0),
308       mStartSeq(0) {
309     mValid = mInvalidationChannel.isValid();
310 }
311 
312 
313 // Statistics helper
// Returns |base| as a percentage of |total|: 0.5 is added before truncating
// to int, and the result is 0 when |total| is zero. |base| is converted to
// the type of |total| first.
template<typename T, typename S>
int percentage(T base, S total) {
    if (!total) {
        return 0;
    }
    const double scaled = 100. * static_cast<S>(base) / total;
    return int(0.5 + scaled);
}
318 
319 std::atomic<std::uint32_t> Accessor::Impl::BufferPool::Invalidation::sInvSeqId(0);
320 
~BufferPool()321 Accessor::Impl::Impl::BufferPool::~BufferPool() {
322     std::lock_guard<std::mutex> lock(mMutex);
323     ALOGD("Destruction - bufferpool2 %p "
324           "cached: %zu/%zuM, %zu/%d%% in use; "
325           "allocs: %zu, %d%% recycled; "
326           "transfers: %zu, %d%% unfetched",
327           this, mStats.mBuffersCached, mStats.mSizeCached >> 20,
328           mStats.mBuffersInUse, percentage(mStats.mBuffersInUse, mStats.mBuffersCached),
329           mStats.mTotalAllocations, percentage(mStats.mTotalRecycles, mStats.mTotalAllocations),
330           mStats.mTotalTransfers,
331           percentage(mStats.mTotalTransfers - mStats.mTotalFetches, mStats.mTotalTransfers));
332 }
333 
onConnect(ConnectionId conId,const sp<IObserver> & observer)334 void Accessor::Impl::BufferPool::Invalidation::onConnect(
335         ConnectionId conId, const sp<IObserver>& observer) {
336     mAcks[conId] = mInvalidationId; // starts from current invalidationId
337     mObservers.insert(std::make_pair(conId, observer));
338 }
339 
onClose(ConnectionId conId)340 void Accessor::Impl::BufferPool::Invalidation::onClose(ConnectionId conId) {
341     mAcks.erase(conId);
342     mObservers.erase(conId);
343 }
344 
onAck(ConnectionId conId,uint32_t msgId)345 void Accessor::Impl::BufferPool::Invalidation::onAck(
346         ConnectionId conId,
347         uint32_t msgId) {
348     auto it = mAcks.find(conId);
349     if (it == mAcks.end()) {
350         ALOGW("ACK from inconsistent connection! %lld", (long long)conId);
351         return;
352     }
353     if (isMessageLater(msgId, it->second)) {
354         mAcks[conId] = msgId;
355     }
356 }
357 
onBufferInvalidated(BufferId bufferId,BufferInvalidationChannel & channel)358 void Accessor::Impl::BufferPool::Invalidation::onBufferInvalidated(
359         BufferId bufferId,
360         BufferInvalidationChannel &channel) {
361     for (auto it = mPendings.begin(); it != mPendings.end();) {
362         if (it->isInvalidated(bufferId)) {
363             uint32_t msgId = 0;
364             if (it->mNeedsAck) {
365                 msgId = ++mInvalidationId;
366                 if (msgId == 0) {
367                     // wrap happens
368                     msgId = ++mInvalidationId;
369                 }
370             }
371             channel.postInvalidation(msgId, it->mFrom, it->mTo);
372             it = mPendings.erase(it);
373             continue;
374         }
375         ++it;
376     }
377 }
378 
onInvalidationRequest(bool needsAck,uint32_t from,uint32_t to,size_t left,BufferInvalidationChannel & channel,const std::shared_ptr<Accessor::Impl> & impl)379 void Accessor::Impl::BufferPool::Invalidation::onInvalidationRequest(
380         bool needsAck,
381         uint32_t from,
382         uint32_t to,
383         size_t left,
384         BufferInvalidationChannel &channel,
385         const std::shared_ptr<Accessor::Impl> &impl) {
386         uint32_t msgId = 0;
387     if (needsAck) {
388         msgId = ++mInvalidationId;
389         if (msgId == 0) {
390             // wrap happens
391             msgId = ++mInvalidationId;
392         }
393     }
394     ALOGV("bufferpool2 invalidation requested and queued");
395     if (left == 0) {
396         channel.postInvalidation(msgId, from, to);
397     } else {
398         // TODO: sending hint message?
399         ALOGV("bufferpoo2 invalidation requested and pending");
400         Pending pending(needsAck, from, to, left, impl);
401         mPendings.push_back(pending);
402     }
403     sInvalidator->addAccessor(mId, impl);
404 }
405 
onHandleAck(std::map<ConnectionId,const sp<IObserver>> * observers,uint32_t * invalidationId)406 void Accessor::Impl::BufferPool::Invalidation::onHandleAck(
407         std::map<ConnectionId, const sp<IObserver>> *observers,
408         uint32_t *invalidationId) {
409     if (mInvalidationId != 0) {
410         *invalidationId = mInvalidationId;
411         std::set<int> deads;
412         for (auto it = mAcks.begin(); it != mAcks.end(); ++it) {
413             if (it->second != mInvalidationId) {
414                 const sp<IObserver> observer = mObservers[it->first];
415                 if (observer) {
416                     observers->emplace(it->first, observer);
417                     ALOGV("connection %lld will call observer (%u: %u)",
418                           (long long)it->first, it->second, mInvalidationId);
419                     // N.B: onMessage will be called later. ignore possibility of
420                     // onMessage# oneway call being lost.
421                     it->second = mInvalidationId;
422                 } else {
423                     ALOGV("bufferpool2 observer died %lld", (long long)it->first);
424                     deads.insert(it->first);
425                 }
426             }
427         }
428         if (deads.size() > 0) {
429             for (auto it = deads.begin(); it != deads.end(); ++it) {
430                 onClose(*it);
431             }
432         }
433     }
434     if (mPendings.size() == 0) {
435         // All invalidation Ids are synced and no more pending invalidations.
436         sInvalidator->delAccessor(mId);
437     }
438 }
439 
handleOwnBuffer(ConnectionId connectionId,BufferId bufferId)440 bool Accessor::Impl::BufferPool::handleOwnBuffer(
441         ConnectionId connectionId, BufferId bufferId) {
442 
443     bool added = insert(&mUsingBuffers, connectionId, bufferId);
444     if (added) {
445         auto iter = mBuffers.find(bufferId);
446         iter->second->mOwnerCount++;
447     }
448     insert(&mUsingConnections, bufferId, connectionId);
449     return added;
450 }
451 
handleReleaseBuffer(ConnectionId connectionId,BufferId bufferId)452 bool Accessor::Impl::BufferPool::handleReleaseBuffer(
453         ConnectionId connectionId, BufferId bufferId) {
454     bool deleted = erase(&mUsingBuffers, connectionId, bufferId);
455     if (deleted) {
456         auto iter = mBuffers.find(bufferId);
457         iter->second->mOwnerCount--;
458         if (iter->second->mOwnerCount == 0 &&
459                 iter->second->mTransactionCount == 0) {
460             if (!iter->second->mInvalidated) {
461                 mStats.onBufferUnused(iter->second->mAllocSize);
462                 mFreeBuffers.insert(bufferId);
463             } else {
464                 mStats.onBufferUnused(iter->second->mAllocSize);
465                 mStats.onBufferEvicted(iter->second->mAllocSize);
466                 mBuffers.erase(iter);
467                 mInvalidation.onBufferInvalidated(bufferId, mInvalidationChannel);
468             }
469         }
470     }
471     erase(&mUsingConnections, bufferId, connectionId);
472     ALOGV("release buffer %u : %d", bufferId, deleted);
473     return deleted;
474 }
475 
handleTransferTo(const BufferStatusMessage & message)476 bool Accessor::Impl::BufferPool::handleTransferTo(const BufferStatusMessage &message) {
477     auto completed = mCompletedTransactions.find(
478             message.transactionId);
479     if (completed != mCompletedTransactions.end()) {
480         // already completed
481         mCompletedTransactions.erase(completed);
482         return true;
483     }
484     // the buffer should exist and be owned.
485     auto bufferIter = mBuffers.find(message.bufferId);
486     if (bufferIter == mBuffers.end() ||
487             !contains(&mUsingBuffers, message.connectionId, message.bufferId)) {
488         return false;
489     }
490     auto found = mTransactions.find(message.transactionId);
491     if (found != mTransactions.end()) {
492         // transfer_from was received earlier.
493         found->second->mSender = message.connectionId;
494         found->second->mSenderValidated = true;
495         return true;
496     }
497     if (mConnectionIds.find(message.targetConnectionId) == mConnectionIds.end()) {
498         // N.B: it could be fake or receive connection already closed.
499         ALOGD("bufferpool2 %p receiver connection %lld is no longer valid",
500               this, (long long)message.targetConnectionId);
501         return false;
502     }
503     mStats.onBufferSent();
504     mTransactions.insert(std::make_pair(
505             message.transactionId,
506             std::make_unique<TransactionStatus>(message, mTimestampUs)));
507     insert(&mPendingTransactions, message.targetConnectionId,
508            message.transactionId);
509     bufferIter->second->mTransactionCount++;
510     return true;
511 }
512 
handleTransferFrom(const BufferStatusMessage & message)513 bool Accessor::Impl::BufferPool::handleTransferFrom(const BufferStatusMessage &message) {
514     auto found = mTransactions.find(message.transactionId);
515     if (found == mTransactions.end()) {
516         // TODO: is it feasible to check ownership here?
517         mStats.onBufferSent();
518         mTransactions.insert(std::make_pair(
519                 message.transactionId,
520                 std::make_unique<TransactionStatus>(message, mTimestampUs)));
521         insert(&mPendingTransactions, message.connectionId,
522                message.transactionId);
523         auto bufferIter = mBuffers.find(message.bufferId);
524         bufferIter->second->mTransactionCount++;
525     } else {
526         if (message.connectionId == found->second->mReceiver) {
527             found->second->mStatus = BufferStatus::TRANSFER_FROM;
528         }
529     }
530     return true;
531 }
532 
handleTransferResult(const BufferStatusMessage & message)533 bool Accessor::Impl::BufferPool::handleTransferResult(const BufferStatusMessage &message) {
534     auto found = mTransactions.find(message.transactionId);
535     if (found != mTransactions.end()) {
536         bool deleted = erase(&mPendingTransactions, message.connectionId,
537                              message.transactionId);
538         if (deleted) {
539             if (!found->second->mSenderValidated) {
540                 mCompletedTransactions.insert(message.transactionId);
541             }
542             auto bufferIter = mBuffers.find(message.bufferId);
543             if (message.newStatus == BufferStatus::TRANSFER_OK) {
544                 handleOwnBuffer(message.connectionId, message.bufferId);
545             }
546             bufferIter->second->mTransactionCount--;
547             if (bufferIter->second->mOwnerCount == 0
548                 && bufferIter->second->mTransactionCount == 0) {
549                 if (!bufferIter->second->mInvalidated) {
550                     mStats.onBufferUnused(bufferIter->second->mAllocSize);
551                     mFreeBuffers.insert(message.bufferId);
552                 } else {
553                     mStats.onBufferUnused(bufferIter->second->mAllocSize);
554                     mStats.onBufferEvicted(bufferIter->second->mAllocSize);
555                     mBuffers.erase(bufferIter);
556                     mInvalidation.onBufferInvalidated(message.bufferId, mInvalidationChannel);
557                 }
558             }
559             mTransactions.erase(found);
560         }
561         ALOGV("transfer finished %llu %u - %d", (unsigned long long)message.transactionId,
562               message.bufferId, deleted);
563         return deleted;
564     }
565     ALOGV("transfer not found %llu %u", (unsigned long long)message.transactionId,
566           message.bufferId);
567     return false;
568 }
569 
processStatusMessages()570 void Accessor::Impl::BufferPool::processStatusMessages() {
571     std::vector<BufferStatusMessage> messages;
572     mObserver.getBufferStatusChanges(messages);
573     mTimestampUs = getTimestampNow();
574     for (BufferStatusMessage& message: messages) {
575         bool ret = false;
576         switch (message.newStatus) {
577             case BufferStatus::NOT_USED:
578                 ret = handleReleaseBuffer(
579                         message.connectionId, message.bufferId);
580                 break;
581             case BufferStatus::USED:
582                 // not happening
583                 break;
584             case BufferStatus::TRANSFER_TO:
585                 ret = handleTransferTo(message);
586                 break;
587             case BufferStatus::TRANSFER_FROM:
588                 ret = handleTransferFrom(message);
589                 break;
590             case BufferStatus::TRANSFER_TIMEOUT:
591                 // TODO
592                 break;
593             case BufferStatus::TRANSFER_LOST:
594                 // TODO
595                 break;
596             case BufferStatus::TRANSFER_FETCH:
597                 // not happening
598                 break;
599             case BufferStatus::TRANSFER_OK:
600             case BufferStatus::TRANSFER_ERROR:
601                 ret = handleTransferResult(message);
602                 break;
603             case BufferStatus::INVALIDATION_ACK:
604                 mInvalidation.onAck(message.connectionId, message.bufferId);
605                 ret = true;
606                 break;
607         }
608         if (ret == false) {
609             ALOGW("buffer status message processing failure - message : %d connection : %lld",
610                   message.newStatus, (long long)message.connectionId);
611         }
612     }
613     messages.clear();
614 }
615 
handleClose(ConnectionId connectionId)616 bool Accessor::Impl::BufferPool::handleClose(ConnectionId connectionId) {
617     // Cleaning buffers
618     auto buffers = mUsingBuffers.find(connectionId);
619     if (buffers != mUsingBuffers.end()) {
620         for (const BufferId& bufferId : buffers->second) {
621             bool deleted = erase(&mUsingConnections, bufferId, connectionId);
622             if (deleted) {
623                 auto bufferIter = mBuffers.find(bufferId);
624                 bufferIter->second->mOwnerCount--;
625                 if (bufferIter->second->mOwnerCount == 0 &&
626                         bufferIter->second->mTransactionCount == 0) {
627                     // TODO: handle freebuffer insert fail
628                     if (!bufferIter->second->mInvalidated) {
629                         mStats.onBufferUnused(bufferIter->second->mAllocSize);
630                         mFreeBuffers.insert(bufferId);
631                     } else {
632                         mStats.onBufferUnused(bufferIter->second->mAllocSize);
633                         mStats.onBufferEvicted(bufferIter->second->mAllocSize);
634                         mBuffers.erase(bufferIter);
635                         mInvalidation.onBufferInvalidated(bufferId, mInvalidationChannel);
636                     }
637                 }
638             }
639         }
640         mUsingBuffers.erase(buffers);
641     }
642 
643     // Cleaning transactions
644     auto pending = mPendingTransactions.find(connectionId);
645     if (pending != mPendingTransactions.end()) {
646         for (const TransactionId& transactionId : pending->second) {
647             auto iter = mTransactions.find(transactionId);
648             if (iter != mTransactions.end()) {
649                 if (!iter->second->mSenderValidated) {
650                     mCompletedTransactions.insert(transactionId);
651                 }
652                 BufferId bufferId = iter->second->mBufferId;
653                 auto bufferIter = mBuffers.find(bufferId);
654                 bufferIter->second->mTransactionCount--;
655                 if (bufferIter->second->mOwnerCount == 0 &&
656                     bufferIter->second->mTransactionCount == 0) {
657                     // TODO: handle freebuffer insert fail
658                     if (!bufferIter->second->mInvalidated) {
659                         mStats.onBufferUnused(bufferIter->second->mAllocSize);
660                         mFreeBuffers.insert(bufferId);
661                     } else {
662                         mStats.onBufferUnused(bufferIter->second->mAllocSize);
663                         mStats.onBufferEvicted(bufferIter->second->mAllocSize);
664                         mBuffers.erase(bufferIter);
665                         mInvalidation.onBufferInvalidated(bufferId, mInvalidationChannel);
666                     }
667                 }
668                 mTransactions.erase(iter);
669             }
670         }
671     }
672     mConnectionIds.erase(connectionId);
673     return true;
674 }
675 
getFreeBuffer(const std::shared_ptr<BufferPoolAllocator> & allocator,const std::vector<uint8_t> & params,BufferId * pId,const native_handle_t ** handle)676 bool Accessor::Impl::BufferPool::getFreeBuffer(
677         const std::shared_ptr<BufferPoolAllocator> &allocator,
678         const std::vector<uint8_t> &params, BufferId *pId,
679         const native_handle_t** handle) {
680     auto bufferIt = mFreeBuffers.begin();
681     for (;bufferIt != mFreeBuffers.end(); ++bufferIt) {
682         BufferId bufferId = *bufferIt;
683         if (allocator->compatible(params, mBuffers[bufferId]->mConfig)) {
684             break;
685         }
686     }
687     if (bufferIt != mFreeBuffers.end()) {
688         BufferId id = *bufferIt;
689         mFreeBuffers.erase(bufferIt);
690         mStats.onBufferRecycled(mBuffers[id]->mAllocSize);
691         *handle = mBuffers[id]->handle();
692         *pId = id;
693         ALOGV("recycle a buffer %u %p", id, *handle);
694         return true;
695     }
696     return false;
697 }
698 
addNewBuffer(const std::shared_ptr<BufferPoolAllocation> & alloc,const size_t allocSize,const std::vector<uint8_t> & params,BufferId * pId,const native_handle_t ** handle)699 ResultStatus Accessor::Impl::BufferPool::addNewBuffer(
700         const std::shared_ptr<BufferPoolAllocation> &alloc,
701         const size_t allocSize,
702         const std::vector<uint8_t> &params,
703         BufferId *pId,
704         const native_handle_t** handle) {
705 
706     BufferId bufferId = mSeq++;
707     if (mSeq == Connection::SYNC_BUFFERID) {
708         mSeq = 0;
709     }
710     std::unique_ptr<InternalBuffer> buffer =
711             std::make_unique<InternalBuffer>(
712                     bufferId, alloc, allocSize, params);
713     if (buffer) {
714         auto res = mBuffers.insert(std::make_pair(
715                 bufferId, std::move(buffer)));
716         if (res.second) {
717             mStats.onBufferAllocated(allocSize);
718             *handle = alloc->handle();
719             *pId = bufferId;
720             return ResultStatus::OK;
721         }
722     }
723     return ResultStatus::NO_MEMORY;
724 }
725 
cleanUp(bool clearCache)726 void Accessor::Impl::BufferPool::cleanUp(bool clearCache) {
727     if (clearCache || mTimestampUs > mLastCleanUpUs + kCleanUpDurationUs) {
728         mLastCleanUpUs = mTimestampUs;
729         if (mTimestampUs > mLastLogUs + kLogDurationUs) {
730             mLastLogUs = mTimestampUs;
731             ALOGD("bufferpool2 %p : %zu(%zu size) total buffers - "
732                   "%zu(%zu size) used buffers - %zu/%zu (recycle/alloc) - "
733                   "%zu/%zu (fetch/transfer)",
734                   this, mStats.mBuffersCached, mStats.mSizeCached,
735                   mStats.mBuffersInUse, mStats.mSizeInUse,
736                   mStats.mTotalRecycles, mStats.mTotalAllocations,
737                   mStats.mTotalFetches, mStats.mTotalTransfers);
738         }
739         for (auto freeIt = mFreeBuffers.begin(); freeIt != mFreeBuffers.end();) {
740             if (!clearCache && (mStats.mSizeCached < kMinAllocBytesForEviction
741                     || mBuffers.size() < kMinBufferCountForEviction)) {
742                 break;
743             }
744             auto it = mBuffers.find(*freeIt);
745             if (it != mBuffers.end() &&
746                     it->second->mOwnerCount == 0 && it->second->mTransactionCount == 0) {
747                 mStats.onBufferEvicted(it->second->mAllocSize);
748                 mBuffers.erase(it);
749                 freeIt = mFreeBuffers.erase(freeIt);
750             } else {
751                 ++freeIt;
752                 ALOGW("bufferpool2 inconsistent!");
753             }
754         }
755     }
756 }
757 
invalidate(bool needsAck,BufferId from,BufferId to,const std::shared_ptr<Accessor::Impl> & impl)758 void Accessor::Impl::BufferPool::invalidate(
759         bool needsAck, BufferId from, BufferId to,
760         const std::shared_ptr<Accessor::Impl> &impl) {
761     for (auto freeIt = mFreeBuffers.begin(); freeIt != mFreeBuffers.end();) {
762         if (isBufferInRange(from, to, *freeIt)) {
763             auto it = mBuffers.find(*freeIt);
764             if (it != mBuffers.end() &&
765                 it->second->mOwnerCount == 0 && it->second->mTransactionCount == 0) {
766                 mStats.onBufferEvicted(it->second->mAllocSize);
767                 mBuffers.erase(it);
768                 freeIt = mFreeBuffers.erase(freeIt);
769                 continue;
770             } else {
771                 ALOGW("bufferpool2 inconsistent!");
772             }
773         }
774         ++freeIt;
775     }
776 
777     size_t left = 0;
778     for (auto it = mBuffers.begin(); it != mBuffers.end(); ++it) {
779         if (isBufferInRange(from, to, it->first)) {
780             it->second->invalidate();
781             ++left;
782         }
783     }
784     mInvalidation.onInvalidationRequest(needsAck, from, to, left, mInvalidationChannel, impl);
785 }
786 
flush(const std::shared_ptr<Accessor::Impl> & impl)787 void Accessor::Impl::BufferPool::flush(const std::shared_ptr<Accessor::Impl> &impl) {
788     BufferId from = mStartSeq;
789     BufferId to = mSeq;
790     mStartSeq = mSeq;
791     // TODO: needsAck params
792     ALOGV("buffer invalidation request bp:%u %u %u", mInvalidation.mId, from, to);
793     if (from != to) {
794         invalidate(true, from, to, impl);
795     }
796 }
797 
invalidatorThread(std::map<uint32_t,const std::weak_ptr<Accessor::Impl>> & accessors,std::mutex & mutex,std::condition_variable & cv,bool & ready)798 void Accessor::Impl::invalidatorThread(
799             std::map<uint32_t, const std::weak_ptr<Accessor::Impl>> &accessors,
800             std::mutex &mutex,
801             std::condition_variable &cv,
802             bool &ready) {
803     constexpr uint32_t NUM_SPIN_TO_INCREASE_SLEEP = 1024;
804     constexpr uint32_t NUM_SPIN_TO_LOG = 1024*8;
805     constexpr useconds_t MAX_SLEEP_US = 10000;
806     uint32_t numSpin = 0;
807     useconds_t sleepUs = 1;
808 
809     while(true) {
810         std::map<uint32_t, const std::weak_ptr<Accessor::Impl>> copied;
811         {
812             std::unique_lock<std::mutex> lock(mutex);
813             if (!ready) {
814                 numSpin = 0;
815                 sleepUs = 1;
816                 cv.wait(lock);
817             }
818             copied.insert(accessors.begin(), accessors.end());
819         }
820         std::list<ConnectionId> erased;
821         for (auto it = copied.begin(); it != copied.end(); ++it) {
822             const std::shared_ptr<Accessor::Impl> impl = it->second.lock();
823             if (!impl) {
824                 erased.push_back(it->first);
825             } else {
826                 impl->handleInvalidateAck();
827             }
828         }
829         {
830             std::unique_lock<std::mutex> lock(mutex);
831             for (auto it = erased.begin(); it != erased.end(); ++it) {
832                 accessors.erase(*it);
833             }
834             if (accessors.size() == 0) {
835                 ready = false;
836             } else {
837                 // TODO Use an efficient way to wait over FMQ.
838                 // N.B. Since there is not a efficient way to wait over FMQ,
839                 // polling over the FMQ is the current way to prevent draining
840                 // CPU.
841                 lock.unlock();
842                 ++numSpin;
843                 if (numSpin % NUM_SPIN_TO_INCREASE_SLEEP == 0 &&
844                     sleepUs < MAX_SLEEP_US) {
845                     sleepUs *= 10;
846                 }
847                 if (numSpin % NUM_SPIN_TO_LOG == 0) {
848                     ALOGW("invalidator thread spinning");
849                 }
850                 ::usleep(sleepUs);
851             }
852         }
853     }
854 }
855 
AccessorInvalidator()856 Accessor::Impl::AccessorInvalidator::AccessorInvalidator() : mReady(false) {
857     std::thread invalidator(
858             invalidatorThread,
859             std::ref(mAccessors),
860             std::ref(mMutex),
861             std::ref(mCv),
862             std::ref(mReady));
863     invalidator.detach();
864 }
865 
addAccessor(uint32_t accessorId,const std::weak_ptr<Accessor::Impl> & impl)866 void Accessor::Impl::AccessorInvalidator::addAccessor(
867         uint32_t accessorId, const std::weak_ptr<Accessor::Impl> &impl) {
868     bool notify = false;
869     std::unique_lock<std::mutex> lock(mMutex);
870     if (mAccessors.find(accessorId) == mAccessors.end()) {
871         if (!mReady) {
872             mReady = true;
873             notify = true;
874         }
875         mAccessors.insert(std::make_pair(accessorId, impl));
876         ALOGV("buffer invalidation added bp:%u %d", accessorId, notify);
877     }
878     lock.unlock();
879     if (notify) {
880         mCv.notify_one();
881     }
882 }
883 
delAccessor(uint32_t accessorId)884 void Accessor::Impl::AccessorInvalidator::delAccessor(uint32_t accessorId) {
885     std::lock_guard<std::mutex> lock(mMutex);
886     mAccessors.erase(accessorId);
887     ALOGV("buffer invalidation deleted bp:%u", accessorId);
888     if (mAccessors.size() == 0) {
889         mReady = false;
890     }
891 }
892 
893 std::unique_ptr<Accessor::Impl::AccessorInvalidator> Accessor::Impl::sInvalidator;
894 
createInvalidator()895 void Accessor::Impl::createInvalidator() {
896     if (!sInvalidator) {
897         sInvalidator = std::make_unique<Accessor::Impl::AccessorInvalidator>();
898     }
899 }
900 
evictorThread(std::map<const std::weak_ptr<Accessor::Impl>,nsecs_t,std::owner_less<>> & accessors,std::mutex & mutex,std::condition_variable & cv)901 void Accessor::Impl::evictorThread(
902         std::map<const std::weak_ptr<Accessor::Impl>, nsecs_t, std::owner_less<>> &accessors,
903         std::mutex &mutex,
904         std::condition_variable &cv) {
905     std::list<const std::weak_ptr<Accessor::Impl>> evictList;
906     while (true) {
907         int expired = 0;
908         int evicted = 0;
909         {
910             nsecs_t now = systemTime();
911             std::unique_lock<std::mutex> lock(mutex);
912             if (accessors.size() == 0) {
913                 cv.wait(lock);
914             }
915             auto it = accessors.begin();
916             while (it != accessors.end()) {
917                 if (now > (it->second + kEvictDurationNs)) {
918                     ++expired;
919                     evictList.push_back(it->first);
920                     it = accessors.erase(it);
921                 } else {
922                     ++it;
923                 }
924             }
925         }
926         // evict idle accessors;
927         for (auto it = evictList.begin(); it != evictList.end(); ++it) {
928             const std::shared_ptr<Accessor::Impl> accessor = it->lock();
929             if (accessor) {
930                 accessor->cleanUp(true);
931                 ++evicted;
932             }
933         }
934         if (expired > 0) {
935             ALOGD("evictor expired: %d, evicted: %d", expired, evicted);
936         }
937         evictList.clear();
938         ::usleep(kEvictGranularityNs / 1000);
939     }
940 }
941 
AccessorEvictor()942 Accessor::Impl::AccessorEvictor::AccessorEvictor() {
943     std::thread evictor(
944             evictorThread,
945             std::ref(mAccessors),
946             std::ref(mMutex),
947             std::ref(mCv));
948     evictor.detach();
949 }
950 
addAccessor(const std::weak_ptr<Accessor::Impl> & impl,nsecs_t ts)951 void Accessor::Impl::AccessorEvictor::addAccessor(
952         const std::weak_ptr<Accessor::Impl> &impl, nsecs_t ts) {
953     std::lock_guard<std::mutex> lock(mMutex);
954     bool notify = mAccessors.empty();
955     auto it = mAccessors.find(impl);
956     if (it == mAccessors.end()) {
957         mAccessors.emplace(impl, ts);
958     } else {
959         it->second = ts;
960     }
961     if (notify) {
962         mCv.notify_one();
963     }
964 }
965 
966 std::unique_ptr<Accessor::Impl::AccessorEvictor> Accessor::Impl::sEvictor;
967 
createEvictor()968 void Accessor::Impl::createEvictor() {
969     if (!sEvictor) {
970         sEvictor = std::make_unique<Accessor::Impl::AccessorEvictor>();
971     }
972 }
973 
scheduleEvictIfNeeded()974 void Accessor::Impl::scheduleEvictIfNeeded() {
975     nsecs_t now = systemTime();
976 
977     if (now > (mScheduleEvictTs + kEvictGranularityNs)) {
978         mScheduleEvictTs = now;
979         sEvictor->addAccessor(shared_from_this(), now);
980     }
981 }
982 
983 }  // namespace implementation
984 }  // namespace V2_0
985 }  // namespace bufferpool
986 }  // namespace media
987 }  // namespace hardware
988 }  // namespace android
989