1 /*
2  * Copyright (C) 2018 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #define LOG_TAG "BufferPoolAccessor"
18 //#define LOG_NDEBUG 0
19 
20 #include <sys/types.h>
21 #include <time.h>
22 #include <unistd.h>
23 #include <utils/Log.h>
24 #include <thread>
25 #include "AccessorImpl.h"
26 #include "Connection.h"
27 
28 namespace android {
29 namespace hardware {
30 namespace media {
31 namespace bufferpool {
32 namespace V2_0 {
33 namespace implementation {
34 
namespace {
    // Minimum time between two clean-up passes of BufferPool::cleanUp()
    // (0.5 sec). TODO: tune this value.
    constexpr int64_t kCleanUpDurationUs = 500 * 1000;
    // Minimum time between two statistics log lines (5 secs).
    constexpr int64_t kLogDurationUs = 5 * 1000 * 1000;

    // A regular (non-forced) clean-up stops evicting while the cached size
    // and the buffer count are both below these thresholds.
    constexpr size_t kMinAllocBytesForEviction = 15 * 1024 * 1024;
    constexpr size_t kMinBufferCountForEviction = 40;
}
42 
43 // Buffer structure in bufferpool process
44 struct InternalBuffer {
45     BufferId mId;
46     size_t mOwnerCount;
47     size_t mTransactionCount;
48     const std::shared_ptr<BufferPoolAllocation> mAllocation;
49     const size_t mAllocSize;
50     const std::vector<uint8_t> mConfig;
51     bool mInvalidated;
52 
InternalBufferandroid::hardware::media::bufferpool::V2_0::implementation::InternalBuffer53     InternalBuffer(
54             BufferId id,
55             const std::shared_ptr<BufferPoolAllocation> &alloc,
56             const size_t allocSize,
57             const std::vector<uint8_t> &allocConfig)
58             : mId(id), mOwnerCount(0), mTransactionCount(0),
59             mAllocation(alloc), mAllocSize(allocSize), mConfig(allocConfig),
60             mInvalidated(false) {}
61 
handleandroid::hardware::media::bufferpool::V2_0::implementation::InternalBuffer62     const native_handle_t *handle() {
63         return mAllocation->handle();
64     }
65 
invalidateandroid::hardware::media::bufferpool::V2_0::implementation::InternalBuffer66     void invalidate() {
67         mInvalidated = true;
68     }
69 };
70 
71 struct TransactionStatus {
72     TransactionId mId;
73     BufferId mBufferId;
74     ConnectionId mSender;
75     ConnectionId mReceiver;
76     BufferStatus mStatus;
77     int64_t mTimestampUs;
78     bool mSenderValidated;
79 
TransactionStatusandroid::hardware::media::bufferpool::V2_0::implementation::TransactionStatus80     TransactionStatus(const BufferStatusMessage &message, int64_t timestampUs) {
81         mId = message.transactionId;
82         mBufferId = message.bufferId;
83         mStatus = message.newStatus;
84         mTimestampUs = timestampUs;
85         if (mStatus == BufferStatus::TRANSFER_TO) {
86             mSender = message.connectionId;
87             mReceiver = message.targetConnectionId;
88             mSenderValidated = true;
89         } else {
90             mSender = -1LL;
91             mReceiver = message.connectionId;
92             mSenderValidated = false;
93         }
94     }
95 };
96 
// Helper function templates for manipulating a map of sets.
/**
 * Adds |value| to the set stored under |key|, creating the set if the key is
 * not present yet.
 *
 * A single operator[] lookup followed by set::insert replaces the former
 * find + find + insert sequence and the copy of a temporary set.
 *
 * @return true if |value| was newly inserted, false if it was already there.
 */
template<class T, class U>
bool insert(std::map<T, std::set<U>> *mapOfSet, T key, U value) {
    return (*mapOfSet)[key].insert(value).second;
}
111 
/**
 * Removes |value| from the set stored under |key|. When the set is empty
 * afterwards, the key itself is removed from the map.
 *
 * @return true if |value| was present and has been removed.
 */
template<class T, class U>
bool erase(std::map<T, std::set<U>> *mapOfSet, T key, U value) {
    const auto entry = mapOfSet->find(key);
    if (entry == mapOfSet->end()) {
        return false;
    }
    std::set<U> &values = entry->second;
    const bool removed = values.erase(value) != 0;
    if (values.empty()) {
        mapOfSet->erase(entry);
    }
    return removed;
}
126 
/**
 * Tells whether |value| is a member of the set stored under |key|.
 * The map is not modified.
 */
template<class T, class U>
bool contains(std::map<T, std::set<U>> *mapOfSet, T key, U value) {
    const auto entry = mapOfSet->find(key);
    return entry != mapOfSet->end() && entry->second.count(value) != 0;
}
136 
// Components of a connection id: connect() builds the id as
// (sPid << 32) | sSeqId and increments sSeqId after each successful connect.
// sPid is initialized from getpid() and sSeqId from the current time.
int32_t Accessor::Impl::sPid = getpid();
uint32_t Accessor::Impl::sSeqId = time(nullptr);
139 
Impl(const std::shared_ptr<BufferPoolAllocator> & allocator)140 Accessor::Impl::Impl(
141         const std::shared_ptr<BufferPoolAllocator> &allocator)
142         : mAllocator(allocator) {}
143 
~Impl()144 Accessor::Impl::~Impl() {
145 }
146 
connect(const sp<Accessor> & accessor,const sp<IObserver> & observer,sp<Connection> * connection,ConnectionId * pConnectionId,uint32_t * pMsgId,const StatusDescriptor ** statusDescPtr,const InvalidationDescriptor ** invDescPtr)147 ResultStatus Accessor::Impl::connect(
148         const sp<Accessor> &accessor, const sp<IObserver> &observer,
149         sp<Connection> *connection,
150         ConnectionId *pConnectionId,
151         uint32_t *pMsgId,
152         const StatusDescriptor** statusDescPtr,
153         const InvalidationDescriptor** invDescPtr) {
154     sp<Connection> newConnection = new Connection();
155     ResultStatus status = ResultStatus::CRITICAL_ERROR;
156     {
157         std::lock_guard<std::mutex> lock(mBufferPool.mMutex);
158         if (newConnection) {
159             ConnectionId id = (int64_t)sPid << 32 | sSeqId;
160             status = mBufferPool.mObserver.open(id, statusDescPtr);
161             if (status == ResultStatus::OK) {
162                 newConnection->initialize(accessor, id);
163                 *connection = newConnection;
164                 *pConnectionId = id;
165                 *pMsgId = mBufferPool.mInvalidation.mInvalidationId;
166                 mBufferPool.mInvalidationChannel.getDesc(invDescPtr);
167                 mBufferPool.mInvalidation.onConnect(id, observer);
168                 ++sSeqId;
169             }
170 
171         }
172         mBufferPool.processStatusMessages();
173         mBufferPool.cleanUp();
174     }
175     return status;
176 }
177 
// Closes |connectionId|: processes pending status messages, drops the
// buffers and transactions still held by the connection, closes its status
// channel and invalidation registration, then forces a cache clean-up.
// Always returns ResultStatus::OK.
ResultStatus Accessor::Impl::close(ConnectionId connectionId) {
    std::lock_guard<std::mutex> lock(mBufferPool.mMutex);
    ALOGV("connection close %lld: %u", (long long)connectionId, mBufferPool.mInvalidation.mId);
    mBufferPool.processStatusMessages();
    mBufferPool.handleClose(connectionId);
    mBufferPool.mObserver.close(connectionId);
    mBufferPool.mInvalidation.onClose(connectionId);
    // Since close# will be called after all works are finished, it is OK to
    // evict unused buffers.
    mBufferPool.cleanUp(true);
    return ResultStatus::OK;
}
190 
// Hands a buffer compatible with |params| to |connectionId|.
// A free cached buffer is recycled when one is compatible; otherwise a new
// buffer is allocated through mAllocator and added to the pool. On success
// |bufferId| and |handle| describe the buffer and the connection is recorded
// as its owner. Returns the status of the allocation.
ResultStatus Accessor::Impl::allocate(
        ConnectionId connectionId, const std::vector<uint8_t>& params,
        BufferId *bufferId, const native_handle_t** handle) {
    std::unique_lock<std::mutex> lock(mBufferPool.mMutex);
    mBufferPool.processStatusMessages();
    ResultStatus status = ResultStatus::OK;
    if (!mBufferPool.getFreeBuffer(mAllocator, params, bufferId, handle)) {
        // The pool mutex is released while the allocator runs and re-acquired
        // before the pool state is touched again.
        lock.unlock();
        std::shared_ptr<BufferPoolAllocation> alloc;
        size_t allocSize;
        status = mAllocator->allocate(params, &alloc, &allocSize);
        lock.lock();
        if (status == ResultStatus::OK) {
            status = mBufferPool.addNewBuffer(alloc, allocSize, params, bufferId, handle);
        }
        ALOGV("create a buffer %d : %u %p",
              status == ResultStatus::OK, *bufferId, *handle);
    }
    if (status == ResultStatus::OK) {
        // TODO: handle ownBuffer failure
        mBufferPool.handleOwnBuffer(connectionId, *bufferId);
    }
    mBufferPool.cleanUp();
    return status;
}
216 
fetch(ConnectionId connectionId,TransactionId transactionId,BufferId bufferId,const native_handle_t ** handle)217 ResultStatus Accessor::Impl::fetch(
218         ConnectionId connectionId, TransactionId transactionId,
219         BufferId bufferId, const native_handle_t** handle) {
220     std::lock_guard<std::mutex> lock(mBufferPool.mMutex);
221     mBufferPool.processStatusMessages();
222     auto found = mBufferPool.mTransactions.find(transactionId);
223     if (found != mBufferPool.mTransactions.end() &&
224             contains(&mBufferPool.mPendingTransactions,
225                      connectionId, transactionId)) {
226         if (found->second->mSenderValidated &&
227                 found->second->mStatus == BufferStatus::TRANSFER_FROM &&
228                 found->second->mBufferId == bufferId) {
229             found->second->mStatus = BufferStatus::TRANSFER_FETCH;
230             auto bufferIt = mBufferPool.mBuffers.find(bufferId);
231             if (bufferIt != mBufferPool.mBuffers.end()) {
232                 mBufferPool.mStats.onBufferFetched();
233                 *handle = bufferIt->second->handle();
234                 return ResultStatus::OK;
235             }
236         }
237     }
238     mBufferPool.cleanUp();
239     return ResultStatus::CRITICAL_ERROR;
240 }
241 
// Processes pending status messages and runs a pool clean-up pass under the
// pool mutex. |clearCache| is forwarded to BufferPool::cleanUp().
void Accessor::Impl::cleanUp(bool clearCache) {
    // transaction timeout, buffer caching TTL handling
    std::lock_guard<std::mutex> lock(mBufferPool.mMutex);
    mBufferPool.processStatusMessages();
    mBufferPool.cleanUp(clearCache);
}
248 
// Processes pending status messages, then asks the pool to flush, passing
// a shared pointer to this object. Runs under the pool mutex.
void Accessor::Impl::flush() {
    std::lock_guard<std::mutex> lock(mBufferPool.mMutex);
    mBufferPool.processStatusMessages();
    mBufferPool.flush(shared_from_this());
}
254 
handleInvalidateAck()255 void Accessor::Impl::handleInvalidateAck() {
256     std::map<ConnectionId, const sp<IObserver>> observers;
257     uint32_t invalidationId;
258     {
259         std::lock_guard<std::mutex> lock(mBufferPool.mMutex);
260         mBufferPool.processStatusMessages();
261         mBufferPool.mInvalidation.onHandleAck(&observers, &invalidationId);
262     }
263     // Do not hold lock for send invalidations
264     size_t deadClients = 0;
265     for (auto it = observers.begin(); it != observers.end(); ++it) {
266         const sp<IObserver> observer = it->second;
267         if (observer) {
268             Return<void> transResult = observer->onMessage(it->first, invalidationId);
269             if (!transResult.isOk()) {
270                 ++deadClients;
271             }
272         }
273     }
274     if (deadClients > 0) {
275         ALOGD("During invalidation found %zu dead clients", deadClients);
276     }
277 }
278 
// Reports whether the underlying buffer pool is valid.
bool Accessor::Impl::isValid() {
    return mBufferPool.isValid();
}
282 
BufferPool()283 Accessor::Impl::Impl::BufferPool::BufferPool()
284     : mTimestampUs(getTimestampNow()),
285       mLastCleanUpUs(mTimestampUs),
286       mLastLogUs(mTimestampUs),
287       mSeq(0),
288       mStartSeq(0) {
289     mValid = mInvalidationChannel.isValid();
290 }
291 
292 
// Statistics helper: |base| as a percentage of |total|, rounded to the
// nearest integer. Yields 0 when |total| is zero.
template<typename T, typename S>
int percentage(T base, S total) {
    if (!total) {
        return 0;
    }
    return int(0.5 + 100. * static_cast<S>(base) / total);
}
298 
// Process-wide atomic counter, starting at zero.
// NOTE(review): presumably used to hand out Invalidation::mId values; the
// use site is not visible in this part of the file - confirm.
std::atomic<std::uint32_t> Accessor::Impl::BufferPool::Invalidation::sInvSeqId(0);
300 
~BufferPool()301 Accessor::Impl::Impl::BufferPool::~BufferPool() {
302     std::lock_guard<std::mutex> lock(mMutex);
303     ALOGD("Destruction - bufferpool2 %p "
304           "cached: %zu/%zuM, %zu/%d%% in use; "
305           "allocs: %zu, %d%% recycled; "
306           "transfers: %zu, %d%% unfetced",
307           this, mStats.mBuffersCached, mStats.mSizeCached >> 20,
308           mStats.mBuffersInUse, percentage(mStats.mBuffersInUse, mStats.mBuffersCached),
309           mStats.mTotalAllocations, percentage(mStats.mTotalRecycles, mStats.mTotalAllocations),
310           mStats.mTotalTransfers,
311           percentage(mStats.mTotalTransfers - mStats.mTotalFetches, mStats.mTotalTransfers));
312 }
313 
onConnect(ConnectionId conId,const sp<IObserver> & observer)314 void Accessor::Impl::BufferPool::Invalidation::onConnect(
315         ConnectionId conId, const sp<IObserver>& observer) {
316     mAcks[conId] = mInvalidationId; // starts from current invalidationId
317     mObservers.insert(std::make_pair(conId, observer));
318 }
319 
// Forgets the acknowledgement state and the observer of |conId|.
void Accessor::Impl::BufferPool::Invalidation::onClose(ConnectionId conId) {
    mAcks.erase(conId);
    mObservers.erase(conId);
}
324 
onAck(ConnectionId conId,uint32_t msgId)325 void Accessor::Impl::BufferPool::Invalidation::onAck(
326         ConnectionId conId,
327         uint32_t msgId) {
328     auto it = mAcks.find(conId);
329     if (it == mAcks.end()) {
330         ALOGW("ACK from inconsistent connection! %lld", (long long)conId);
331         return;
332     }
333     if (isMessageLater(msgId, it->second)) {
334         mAcks[conId] = msgId;
335     }
336 }
337 
onBufferInvalidated(BufferId bufferId,BufferInvalidationChannel & channel)338 void Accessor::Impl::BufferPool::Invalidation::onBufferInvalidated(
339         BufferId bufferId,
340         BufferInvalidationChannel &channel) {
341     for (auto it = mPendings.begin(); it != mPendings.end();) {
342         if (it->isInvalidated(bufferId)) {
343             uint32_t msgId = 0;
344             if (it->mNeedsAck) {
345                 msgId = ++mInvalidationId;
346                 if (msgId == 0) {
347                     // wrap happens
348                     msgId = ++mInvalidationId;
349                 }
350             }
351             channel.postInvalidation(msgId, it->mFrom, it->mTo);
352             it = mPendings.erase(it);
353             continue;
354         }
355         ++it;
356     }
357 }
358 
onInvalidationRequest(bool needsAck,uint32_t from,uint32_t to,size_t left,BufferInvalidationChannel & channel,const std::shared_ptr<Accessor::Impl> & impl)359 void Accessor::Impl::BufferPool::Invalidation::onInvalidationRequest(
360         bool needsAck,
361         uint32_t from,
362         uint32_t to,
363         size_t left,
364         BufferInvalidationChannel &channel,
365         const std::shared_ptr<Accessor::Impl> &impl) {
366         uint32_t msgId = 0;
367     if (needsAck) {
368         msgId = ++mInvalidationId;
369         if (msgId == 0) {
370             // wrap happens
371             msgId = ++mInvalidationId;
372         }
373     }
374     ALOGV("bufferpool2 invalidation requested and queued");
375     if (left == 0) {
376         channel.postInvalidation(msgId, from, to);
377     } else {
378         // TODO: sending hint message?
379         ALOGV("bufferpoo2 invalidation requested and pending");
380         Pending pending(needsAck, from, to, left, impl);
381         mPendings.push_back(pending);
382     }
383     sInvalidator->addAccessor(mId, impl);
384 }
385 
onHandleAck(std::map<ConnectionId,const sp<IObserver>> * observers,uint32_t * invalidationId)386 void Accessor::Impl::BufferPool::Invalidation::onHandleAck(
387         std::map<ConnectionId, const sp<IObserver>> *observers,
388         uint32_t *invalidationId) {
389     if (mInvalidationId != 0) {
390         *invalidationId = mInvalidationId;
391         std::set<int> deads;
392         for (auto it = mAcks.begin(); it != mAcks.end(); ++it) {
393             if (it->second != mInvalidationId) {
394                 const sp<IObserver> observer = mObservers[it->first];
395                 if (observer) {
396                     observers->emplace(it->first, observer);
397                     ALOGV("connection %lld will call observer (%u: %u)",
398                           (long long)it->first, it->second, mInvalidationId);
399                     // N.B: onMessage will be called later. ignore possibility of
400                     // onMessage# oneway call being lost.
401                     it->second = mInvalidationId;
402                 } else {
403                     ALOGV("bufferpool2 observer died %lld", (long long)it->first);
404                     deads.insert(it->first);
405                 }
406             }
407         }
408         if (deads.size() > 0) {
409             for (auto it = deads.begin(); it != deads.end(); ++it) {
410                 onClose(*it);
411             }
412         }
413     }
414     if (mPendings.size() == 0) {
415         // All invalidation Ids are synced and no more pending invalidations.
416         sInvalidator->delAccessor(mId);
417     }
418 }
419 
handleOwnBuffer(ConnectionId connectionId,BufferId bufferId)420 bool Accessor::Impl::BufferPool::handleOwnBuffer(
421         ConnectionId connectionId, BufferId bufferId) {
422 
423     bool added = insert(&mUsingBuffers, connectionId, bufferId);
424     if (added) {
425         auto iter = mBuffers.find(bufferId);
426         iter->second->mOwnerCount++;
427     }
428     insert(&mUsingConnections, bufferId, connectionId);
429     return added;
430 }
431 
handleReleaseBuffer(ConnectionId connectionId,BufferId bufferId)432 bool Accessor::Impl::BufferPool::handleReleaseBuffer(
433         ConnectionId connectionId, BufferId bufferId) {
434     bool deleted = erase(&mUsingBuffers, connectionId, bufferId);
435     if (deleted) {
436         auto iter = mBuffers.find(bufferId);
437         iter->second->mOwnerCount--;
438         if (iter->second->mOwnerCount == 0 &&
439                 iter->second->mTransactionCount == 0) {
440             if (!iter->second->mInvalidated) {
441                 mStats.onBufferUnused(iter->second->mAllocSize);
442                 mFreeBuffers.insert(bufferId);
443             } else {
444                 mStats.onBufferUnused(iter->second->mAllocSize);
445                 mStats.onBufferEvicted(iter->second->mAllocSize);
446                 mBuffers.erase(iter);
447                 mInvalidation.onBufferInvalidated(bufferId, mInvalidationChannel);
448             }
449         }
450     }
451     erase(&mUsingConnections, bufferId, connectionId);
452     ALOGV("release buffer %u : %d", bufferId, deleted);
453     return deleted;
454 }
455 
handleTransferTo(const BufferStatusMessage & message)456 bool Accessor::Impl::BufferPool::handleTransferTo(const BufferStatusMessage &message) {
457     auto completed = mCompletedTransactions.find(
458             message.transactionId);
459     if (completed != mCompletedTransactions.end()) {
460         // already completed
461         mCompletedTransactions.erase(completed);
462         return true;
463     }
464     // the buffer should exist and be owned.
465     auto bufferIter = mBuffers.find(message.bufferId);
466     if (bufferIter == mBuffers.end() ||
467             !contains(&mUsingBuffers, message.connectionId, message.bufferId)) {
468         return false;
469     }
470     auto found = mTransactions.find(message.transactionId);
471     if (found != mTransactions.end()) {
472         // transfer_from was received earlier.
473         found->second->mSender = message.connectionId;
474         found->second->mSenderValidated = true;
475         return true;
476     }
477     // TODO: verify there is target connection Id
478     mStats.onBufferSent();
479     mTransactions.insert(std::make_pair(
480             message.transactionId,
481             std::make_unique<TransactionStatus>(message, mTimestampUs)));
482     insert(&mPendingTransactions, message.targetConnectionId,
483            message.transactionId);
484     bufferIter->second->mTransactionCount++;
485     return true;
486 }
487 
handleTransferFrom(const BufferStatusMessage & message)488 bool Accessor::Impl::BufferPool::handleTransferFrom(const BufferStatusMessage &message) {
489     auto found = mTransactions.find(message.transactionId);
490     if (found == mTransactions.end()) {
491         // TODO: is it feasible to check ownership here?
492         mStats.onBufferSent();
493         mTransactions.insert(std::make_pair(
494                 message.transactionId,
495                 std::make_unique<TransactionStatus>(message, mTimestampUs)));
496         insert(&mPendingTransactions, message.connectionId,
497                message.transactionId);
498         auto bufferIter = mBuffers.find(message.bufferId);
499         bufferIter->second->mTransactionCount++;
500     } else {
501         if (message.connectionId == found->second->mReceiver) {
502             found->second->mStatus = BufferStatus::TRANSFER_FROM;
503         }
504     }
505     return true;
506 }
507 
handleTransferResult(const BufferStatusMessage & message)508 bool Accessor::Impl::BufferPool::handleTransferResult(const BufferStatusMessage &message) {
509     auto found = mTransactions.find(message.transactionId);
510     if (found != mTransactions.end()) {
511         bool deleted = erase(&mPendingTransactions, message.connectionId,
512                              message.transactionId);
513         if (deleted) {
514             if (!found->second->mSenderValidated) {
515                 mCompletedTransactions.insert(message.transactionId);
516             }
517             auto bufferIter = mBuffers.find(message.bufferId);
518             if (message.newStatus == BufferStatus::TRANSFER_OK) {
519                 handleOwnBuffer(message.connectionId, message.bufferId);
520             }
521             bufferIter->second->mTransactionCount--;
522             if (bufferIter->second->mOwnerCount == 0
523                 && bufferIter->second->mTransactionCount == 0) {
524                 if (!bufferIter->second->mInvalidated) {
525                     mStats.onBufferUnused(bufferIter->second->mAllocSize);
526                     mFreeBuffers.insert(message.bufferId);
527                 } else {
528                     mStats.onBufferUnused(bufferIter->second->mAllocSize);
529                     mStats.onBufferEvicted(bufferIter->second->mAllocSize);
530                     mBuffers.erase(bufferIter);
531                     mInvalidation.onBufferInvalidated(message.bufferId, mInvalidationChannel);
532                 }
533             }
534             mTransactions.erase(found);
535         }
536         ALOGV("transfer finished %llu %u - %d", (unsigned long long)message.transactionId,
537               message.bufferId, deleted);
538         return deleted;
539     }
540     ALOGV("transfer not found %llu %u", (unsigned long long)message.transactionId,
541           message.bufferId);
542     return false;
543 }
544 
processStatusMessages()545 void Accessor::Impl::BufferPool::processStatusMessages() {
546     std::vector<BufferStatusMessage> messages;
547     mObserver.getBufferStatusChanges(messages);
548     mTimestampUs = getTimestampNow();
549     for (BufferStatusMessage& message: messages) {
550         bool ret = false;
551         switch (message.newStatus) {
552             case BufferStatus::NOT_USED:
553                 ret = handleReleaseBuffer(
554                         message.connectionId, message.bufferId);
555                 break;
556             case BufferStatus::USED:
557                 // not happening
558                 break;
559             case BufferStatus::TRANSFER_TO:
560                 ret = handleTransferTo(message);
561                 break;
562             case BufferStatus::TRANSFER_FROM:
563                 ret = handleTransferFrom(message);
564                 break;
565             case BufferStatus::TRANSFER_TIMEOUT:
566                 // TODO
567                 break;
568             case BufferStatus::TRANSFER_LOST:
569                 // TODO
570                 break;
571             case BufferStatus::TRANSFER_FETCH:
572                 // not happening
573                 break;
574             case BufferStatus::TRANSFER_OK:
575             case BufferStatus::TRANSFER_ERROR:
576                 ret = handleTransferResult(message);
577                 break;
578             case BufferStatus::INVALIDATION_ACK:
579                 mInvalidation.onAck(message.connectionId, message.bufferId);
580                 ret = true;
581                 break;
582         }
583         if (ret == false) {
584             ALOGW("buffer status message processing failure - message : %d connection : %lld",
585                   message.newStatus, (long long)message.connectionId);
586         }
587     }
588     messages.clear();
589 }
590 
handleClose(ConnectionId connectionId)591 bool Accessor::Impl::BufferPool::handleClose(ConnectionId connectionId) {
592     // Cleaning buffers
593     auto buffers = mUsingBuffers.find(connectionId);
594     if (buffers != mUsingBuffers.end()) {
595         for (const BufferId& bufferId : buffers->second) {
596             bool deleted = erase(&mUsingConnections, bufferId, connectionId);
597             if (deleted) {
598                 auto bufferIter = mBuffers.find(bufferId);
599                 bufferIter->second->mOwnerCount--;
600                 if (bufferIter->second->mOwnerCount == 0 &&
601                         bufferIter->second->mTransactionCount == 0) {
602                     // TODO: handle freebuffer insert fail
603                     if (!bufferIter->second->mInvalidated) {
604                         mStats.onBufferUnused(bufferIter->second->mAllocSize);
605                         mFreeBuffers.insert(bufferId);
606                     } else {
607                         mStats.onBufferUnused(bufferIter->second->mAllocSize);
608                         mStats.onBufferEvicted(bufferIter->second->mAllocSize);
609                         mBuffers.erase(bufferIter);
610                         mInvalidation.onBufferInvalidated(bufferId, mInvalidationChannel);
611                     }
612                 }
613             }
614         }
615         mUsingBuffers.erase(buffers);
616     }
617 
618     // Cleaning transactions
619     auto pending = mPendingTransactions.find(connectionId);
620     if (pending != mPendingTransactions.end()) {
621         for (const TransactionId& transactionId : pending->second) {
622             auto iter = mTransactions.find(transactionId);
623             if (iter != mTransactions.end()) {
624                 if (!iter->second->mSenderValidated) {
625                     mCompletedTransactions.insert(transactionId);
626                 }
627                 BufferId bufferId = iter->second->mBufferId;
628                 auto bufferIter = mBuffers.find(bufferId);
629                 bufferIter->second->mTransactionCount--;
630                 if (bufferIter->second->mOwnerCount == 0 &&
631                     bufferIter->second->mTransactionCount == 0) {
632                     // TODO: handle freebuffer insert fail
633                     if (!bufferIter->second->mInvalidated) {
634                         mStats.onBufferUnused(bufferIter->second->mAllocSize);
635                         mFreeBuffers.insert(bufferId);
636                     } else {
637                         mStats.onBufferUnused(bufferIter->second->mAllocSize);
638                         mStats.onBufferEvicted(bufferIter->second->mAllocSize);
639                         mBuffers.erase(bufferIter);
640                         mInvalidation.onBufferInvalidated(bufferId, mInvalidationChannel);
641                     }
642                 }
643                 mTransactions.erase(iter);
644             }
645         }
646     }
647     return true;
648 }
649 
getFreeBuffer(const std::shared_ptr<BufferPoolAllocator> & allocator,const std::vector<uint8_t> & params,BufferId * pId,const native_handle_t ** handle)650 bool Accessor::Impl::BufferPool::getFreeBuffer(
651         const std::shared_ptr<BufferPoolAllocator> &allocator,
652         const std::vector<uint8_t> &params, BufferId *pId,
653         const native_handle_t** handle) {
654     auto bufferIt = mFreeBuffers.begin();
655     for (;bufferIt != mFreeBuffers.end(); ++bufferIt) {
656         BufferId bufferId = *bufferIt;
657         if (allocator->compatible(params, mBuffers[bufferId]->mConfig)) {
658             break;
659         }
660     }
661     if (bufferIt != mFreeBuffers.end()) {
662         BufferId id = *bufferIt;
663         mFreeBuffers.erase(bufferIt);
664         mStats.onBufferRecycled(mBuffers[id]->mAllocSize);
665         *handle = mBuffers[id]->handle();
666         *pId = id;
667         ALOGV("recycle a buffer %u %p", id, *handle);
668         return true;
669     }
670     return false;
671 }
672 
addNewBuffer(const std::shared_ptr<BufferPoolAllocation> & alloc,const size_t allocSize,const std::vector<uint8_t> & params,BufferId * pId,const native_handle_t ** handle)673 ResultStatus Accessor::Impl::BufferPool::addNewBuffer(
674         const std::shared_ptr<BufferPoolAllocation> &alloc,
675         const size_t allocSize,
676         const std::vector<uint8_t> &params,
677         BufferId *pId,
678         const native_handle_t** handle) {
679 
680     BufferId bufferId = mSeq++;
681     if (mSeq == Connection::SYNC_BUFFERID) {
682         mSeq = 0;
683     }
684     std::unique_ptr<InternalBuffer> buffer =
685             std::make_unique<InternalBuffer>(
686                     bufferId, alloc, allocSize, params);
687     if (buffer) {
688         auto res = mBuffers.insert(std::make_pair(
689                 bufferId, std::move(buffer)));
690         if (res.second) {
691             mStats.onBufferAllocated(allocSize);
692             *handle = alloc->handle();
693             *pId = bufferId;
694             return ResultStatus::OK;
695         }
696     }
697     return ResultStatus::NO_MEMORY;
698 }
699 
cleanUp(bool clearCache)700 void Accessor::Impl::BufferPool::cleanUp(bool clearCache) {
701     if (clearCache || mTimestampUs > mLastCleanUpUs + kCleanUpDurationUs) {
702         mLastCleanUpUs = mTimestampUs;
703         if (mTimestampUs > mLastLogUs + kLogDurationUs) {
704             mLastLogUs = mTimestampUs;
705             ALOGD("bufferpool2 %p : %zu(%zu size) total buffers - "
706                   "%zu(%zu size) used buffers - %zu/%zu (recycle/alloc) - "
707                   "%zu/%zu (fetch/transfer)",
708                   this, mStats.mBuffersCached, mStats.mSizeCached,
709                   mStats.mBuffersInUse, mStats.mSizeInUse,
710                   mStats.mTotalRecycles, mStats.mTotalAllocations,
711                   mStats.mTotalFetches, mStats.mTotalTransfers);
712         }
713         for (auto freeIt = mFreeBuffers.begin(); freeIt != mFreeBuffers.end();) {
714             if (!clearCache && mStats.mSizeCached < kMinAllocBytesForEviction
715                     && mBuffers.size() < kMinBufferCountForEviction) {
716                 break;
717             }
718             auto it = mBuffers.find(*freeIt);
719             if (it != mBuffers.end() &&
720                     it->second->mOwnerCount == 0 && it->second->mTransactionCount == 0) {
721                 mStats.onBufferEvicted(it->second->mAllocSize);
722                 mBuffers.erase(it);
723                 freeIt = mFreeBuffers.erase(freeIt);
724             } else {
725                 ++freeIt;
726                 ALOGW("bufferpool2 inconsistent!");
727             }
728         }
729     }
730 }
731 
invalidate(bool needsAck,BufferId from,BufferId to,const std::shared_ptr<Accessor::Impl> & impl)732 void Accessor::Impl::BufferPool::invalidate(
733         bool needsAck, BufferId from, BufferId to,
734         const std::shared_ptr<Accessor::Impl> &impl) {
735     for (auto freeIt = mFreeBuffers.begin(); freeIt != mFreeBuffers.end();) {
736         if (isBufferInRange(from, to, *freeIt)) {
737             auto it = mBuffers.find(*freeIt);
738             if (it != mBuffers.end() &&
739                 it->second->mOwnerCount == 0 && it->second->mTransactionCount == 0) {
740                 mStats.onBufferEvicted(it->second->mAllocSize);
741                 mBuffers.erase(it);
742                 freeIt = mFreeBuffers.erase(freeIt);
743                 continue;
744             } else {
745                 ALOGW("bufferpool2 inconsistent!");
746             }
747         }
748         ++freeIt;
749     }
750 
751     size_t left = 0;
752     for (auto it = mBuffers.begin(); it != mBuffers.end(); ++it) {
753         if (isBufferInRange(from, to, it->first)) {
754             it->second->invalidate();
755             ++left;
756         }
757     }
758     mInvalidation.onInvalidationRequest(needsAck, from, to, left, mInvalidationChannel, impl);
759 }
760 
flush(const std::shared_ptr<Accessor::Impl> & impl)761 void Accessor::Impl::BufferPool::flush(const std::shared_ptr<Accessor::Impl> &impl) {
762     BufferId from = mStartSeq;
763     BufferId to = mSeq;
764     mStartSeq = mSeq;
765     // TODO: needsAck params
766     ALOGV("buffer invalidation request bp:%u %u %u", mInvalidation.mId, from, to);
767     if (from != to) {
768         invalidate(true, from, to, impl);
769     }
770 }
771 
invalidatorThread(std::map<uint32_t,const std::weak_ptr<Accessor::Impl>> & accessors,std::mutex & mutex,std::condition_variable & cv,bool & ready)772 void Accessor::Impl::invalidatorThread(
773             std::map<uint32_t, const std::weak_ptr<Accessor::Impl>> &accessors,
774             std::mutex &mutex,
775             std::condition_variable &cv,
776             bool &ready) {
777     while(true) {
778         std::map<uint32_t, const std::weak_ptr<Accessor::Impl>> copied;
779         {
780             std::unique_lock<std::mutex> lock(mutex);
781             if (!ready) {
782                 cv.wait(lock);
783             }
784             copied.insert(accessors.begin(), accessors.end());
785         }
786         std::list<ConnectionId> erased;
787         for (auto it = copied.begin(); it != copied.end(); ++it) {
788             const std::shared_ptr<Accessor::Impl> impl = it->second.lock();
789             if (!impl) {
790                 erased.push_back(it->first);
791             } else {
792                 impl->handleInvalidateAck();
793             }
794         }
795         {
796             std::unique_lock<std::mutex> lock(mutex);
797             for (auto it = erased.begin(); it != erased.end(); ++it) {
798                 accessors.erase(*it);
799             }
800             if (accessors.size() == 0) {
801                 ready = false;
802             } else {
803                 // prevent draining cpu.
804                 lock.unlock();
805                 std::this_thread::yield();
806             }
807         }
808     }
809 }
810 
AccessorInvalidator()811 Accessor::Impl::AccessorInvalidator::AccessorInvalidator() : mReady(false) {
812     std::thread invalidator(
813             invalidatorThread,
814             std::ref(mAccessors),
815             std::ref(mMutex),
816             std::ref(mCv),
817             std::ref(mReady));
818     invalidator.detach();
819 }
820 
addAccessor(uint32_t accessorId,const std::weak_ptr<Accessor::Impl> & impl)821 void Accessor::Impl::AccessorInvalidator::addAccessor(
822         uint32_t accessorId, const std::weak_ptr<Accessor::Impl> &impl) {
823     bool notify = false;
824     std::unique_lock<std::mutex> lock(mMutex);
825     if (mAccessors.find(accessorId) == mAccessors.end()) {
826         if (!mReady) {
827             mReady = true;
828             notify = true;
829         }
830         mAccessors.insert(std::make_pair(accessorId, impl));
831         ALOGV("buffer invalidation added bp:%u %d", accessorId, notify);
832     }
833     lock.unlock();
834     if (notify) {
835         mCv.notify_one();
836     }
837 }
838 
delAccessor(uint32_t accessorId)839 void Accessor::Impl::AccessorInvalidator::delAccessor(uint32_t accessorId) {
840     std::lock_guard<std::mutex> lock(mMutex);
841     mAccessors.erase(accessorId);
842     ALOGV("buffer invalidation deleted bp:%u", accessorId);
843     if (mAccessors.size() == 0) {
844         mReady = false;
845     }
846 }
847 
// Definition of the static invalidator instance. It starts out empty and is
// populated by createInvalidator().
std::unique_ptr<Accessor::Impl::AccessorInvalidator> Accessor::Impl::sInvalidator;
849 
createInvalidator()850 void Accessor::Impl::createInvalidator() {
851     if (!sInvalidator) {
852         sInvalidator = std::make_unique<Accessor::Impl::AccessorInvalidator>();
853     }
854 }
855 
856 }  // namespace implementation
857 }  // namespace V2_0
858 }  // namespace bufferpool
859 }  // namespace media
860 }  // namespace hardware
861 }  // namespace android
862