1 /*
2 * Copyright (C) 2018 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #define LOG_TAG "BufferPoolAccessor"
18 //#define LOG_NDEBUG 0
19
20 #include <sys/types.h>
21 #include <time.h>
22 #include <unistd.h>
23 #include <utils/Log.h>
24 #include <thread>
25 #include "AccessorImpl.h"
26 #include "Connection.h"
27
28 namespace android {
29 namespace hardware {
30 namespace media {
31 namespace bufferpool {
32 namespace V2_0 {
33 namespace implementation {
34
35 namespace {
36 static constexpr int64_t kCleanUpDurationUs = 500000; // TODO tune 0.5 sec
37 static constexpr int64_t kLogDurationUs = 5000000; // 5 secs
38
39 static constexpr size_t kMinAllocBytesForEviction = 1024*1024*15;
40 static constexpr size_t kMinBufferCountForEviction = 40;
41 }
42
43 // Buffer structure in bufferpool process
44 struct InternalBuffer {
45 BufferId mId;
46 size_t mOwnerCount;
47 size_t mTransactionCount;
48 const std::shared_ptr<BufferPoolAllocation> mAllocation;
49 const size_t mAllocSize;
50 const std::vector<uint8_t> mConfig;
51 bool mInvalidated;
52
InternalBufferandroid::hardware::media::bufferpool::V2_0::implementation::InternalBuffer53 InternalBuffer(
54 BufferId id,
55 const std::shared_ptr<BufferPoolAllocation> &alloc,
56 const size_t allocSize,
57 const std::vector<uint8_t> &allocConfig)
58 : mId(id), mOwnerCount(0), mTransactionCount(0),
59 mAllocation(alloc), mAllocSize(allocSize), mConfig(allocConfig),
60 mInvalidated(false) {}
61
handleandroid::hardware::media::bufferpool::V2_0::implementation::InternalBuffer62 const native_handle_t *handle() {
63 return mAllocation->handle();
64 }
65
invalidateandroid::hardware::media::bufferpool::V2_0::implementation::InternalBuffer66 void invalidate() {
67 mInvalidated = true;
68 }
69 };
70
71 struct TransactionStatus {
72 TransactionId mId;
73 BufferId mBufferId;
74 ConnectionId mSender;
75 ConnectionId mReceiver;
76 BufferStatus mStatus;
77 int64_t mTimestampUs;
78 bool mSenderValidated;
79
TransactionStatusandroid::hardware::media::bufferpool::V2_0::implementation::TransactionStatus80 TransactionStatus(const BufferStatusMessage &message, int64_t timestampUs) {
81 mId = message.transactionId;
82 mBufferId = message.bufferId;
83 mStatus = message.newStatus;
84 mTimestampUs = timestampUs;
85 if (mStatus == BufferStatus::TRANSFER_TO) {
86 mSender = message.connectionId;
87 mReceiver = message.targetConnectionId;
88 mSenderValidated = true;
89 } else {
90 mSender = -1LL;
91 mReceiver = message.connectionId;
92 mSenderValidated = false;
93 }
94 }
95 };
96
97 // Helper template methods for handling map of set.
98 template<class T, class U>
insert(std::map<T,std::set<U>> * mapOfSet,T key,U value)99 bool insert(std::map<T, std::set<U>> *mapOfSet, T key, U value) {
100 auto iter = mapOfSet->find(key);
101 if (iter == mapOfSet->end()) {
102 std::set<U> valueSet{value};
103 mapOfSet->insert(std::make_pair(key, valueSet));
104 return true;
105 } else if (iter->second.find(value) == iter->second.end()) {
106 iter->second.insert(value);
107 return true;
108 }
109 return false;
110 }
111
112 template<class T, class U>
erase(std::map<T,std::set<U>> * mapOfSet,T key,U value)113 bool erase(std::map<T, std::set<U>> *mapOfSet, T key, U value) {
114 bool ret = false;
115 auto iter = mapOfSet->find(key);
116 if (iter != mapOfSet->end()) {
117 if (iter->second.erase(value) > 0) {
118 ret = true;
119 }
120 if (iter->second.size() == 0) {
121 mapOfSet->erase(iter);
122 }
123 }
124 return ret;
125 }
126
127 template<class T, class U>
contains(std::map<T,std::set<U>> * mapOfSet,T key,U value)128 bool contains(std::map<T, std::set<U>> *mapOfSet, T key, U value) {
129 auto iter = mapOfSet->find(key);
130 if (iter != mapOfSet->end()) {
131 auto setIter = iter->second.find(value);
132 return setIter != iter->second.end();
133 }
134 return false;
135 }
136
137 int32_t Accessor::Impl::sPid = getpid();
138 uint32_t Accessor::Impl::sSeqId = time(nullptr);
139
Impl(const std::shared_ptr<BufferPoolAllocator> & allocator)140 Accessor::Impl::Impl(
141 const std::shared_ptr<BufferPoolAllocator> &allocator)
142 : mAllocator(allocator) {}
143
~Impl()144 Accessor::Impl::~Impl() {
145 }
146
connect(const sp<Accessor> & accessor,const sp<IObserver> & observer,sp<Connection> * connection,ConnectionId * pConnectionId,uint32_t * pMsgId,const StatusDescriptor ** statusDescPtr,const InvalidationDescriptor ** invDescPtr)147 ResultStatus Accessor::Impl::connect(
148 const sp<Accessor> &accessor, const sp<IObserver> &observer,
149 sp<Connection> *connection,
150 ConnectionId *pConnectionId,
151 uint32_t *pMsgId,
152 const StatusDescriptor** statusDescPtr,
153 const InvalidationDescriptor** invDescPtr) {
154 sp<Connection> newConnection = new Connection();
155 ResultStatus status = ResultStatus::CRITICAL_ERROR;
156 {
157 std::lock_guard<std::mutex> lock(mBufferPool.mMutex);
158 if (newConnection) {
159 ConnectionId id = (int64_t)sPid << 32 | sSeqId;
160 status = mBufferPool.mObserver.open(id, statusDescPtr);
161 if (status == ResultStatus::OK) {
162 newConnection->initialize(accessor, id);
163 *connection = newConnection;
164 *pConnectionId = id;
165 *pMsgId = mBufferPool.mInvalidation.mInvalidationId;
166 mBufferPool.mInvalidationChannel.getDesc(invDescPtr);
167 mBufferPool.mInvalidation.onConnect(id, observer);
168 ++sSeqId;
169 }
170
171 }
172 mBufferPool.processStatusMessages();
173 mBufferPool.cleanUp();
174 }
175 return status;
176 }
177
close(ConnectionId connectionId)178 ResultStatus Accessor::Impl::close(ConnectionId connectionId) {
179 std::lock_guard<std::mutex> lock(mBufferPool.mMutex);
180 ALOGV("connection close %lld: %u", (long long)connectionId, mBufferPool.mInvalidation.mId);
181 mBufferPool.processStatusMessages();
182 mBufferPool.handleClose(connectionId);
183 mBufferPool.mObserver.close(connectionId);
184 mBufferPool.mInvalidation.onClose(connectionId);
185 // Since close# will be called after all works are finished, it is OK to
186 // evict unused buffers.
187 mBufferPool.cleanUp(true);
188 return ResultStatus::OK;
189 }
190
allocate(ConnectionId connectionId,const std::vector<uint8_t> & params,BufferId * bufferId,const native_handle_t ** handle)191 ResultStatus Accessor::Impl::allocate(
192 ConnectionId connectionId, const std::vector<uint8_t>& params,
193 BufferId *bufferId, const native_handle_t** handle) {
194 std::unique_lock<std::mutex> lock(mBufferPool.mMutex);
195 mBufferPool.processStatusMessages();
196 ResultStatus status = ResultStatus::OK;
197 if (!mBufferPool.getFreeBuffer(mAllocator, params, bufferId, handle)) {
198 lock.unlock();
199 std::shared_ptr<BufferPoolAllocation> alloc;
200 size_t allocSize;
201 status = mAllocator->allocate(params, &alloc, &allocSize);
202 lock.lock();
203 if (status == ResultStatus::OK) {
204 status = mBufferPool.addNewBuffer(alloc, allocSize, params, bufferId, handle);
205 }
206 ALOGV("create a buffer %d : %u %p",
207 status == ResultStatus::OK, *bufferId, *handle);
208 }
209 if (status == ResultStatus::OK) {
210 // TODO: handle ownBuffer failure
211 mBufferPool.handleOwnBuffer(connectionId, *bufferId);
212 }
213 mBufferPool.cleanUp();
214 return status;
215 }
216
fetch(ConnectionId connectionId,TransactionId transactionId,BufferId bufferId,const native_handle_t ** handle)217 ResultStatus Accessor::Impl::fetch(
218 ConnectionId connectionId, TransactionId transactionId,
219 BufferId bufferId, const native_handle_t** handle) {
220 std::lock_guard<std::mutex> lock(mBufferPool.mMutex);
221 mBufferPool.processStatusMessages();
222 auto found = mBufferPool.mTransactions.find(transactionId);
223 if (found != mBufferPool.mTransactions.end() &&
224 contains(&mBufferPool.mPendingTransactions,
225 connectionId, transactionId)) {
226 if (found->second->mSenderValidated &&
227 found->second->mStatus == BufferStatus::TRANSFER_FROM &&
228 found->second->mBufferId == bufferId) {
229 found->second->mStatus = BufferStatus::TRANSFER_FETCH;
230 auto bufferIt = mBufferPool.mBuffers.find(bufferId);
231 if (bufferIt != mBufferPool.mBuffers.end()) {
232 mBufferPool.mStats.onBufferFetched();
233 *handle = bufferIt->second->handle();
234 return ResultStatus::OK;
235 }
236 }
237 }
238 mBufferPool.cleanUp();
239 return ResultStatus::CRITICAL_ERROR;
240 }
241
cleanUp(bool clearCache)242 void Accessor::Impl::cleanUp(bool clearCache) {
243 // transaction timeout, buffer cacheing TTL handling
244 std::lock_guard<std::mutex> lock(mBufferPool.mMutex);
245 mBufferPool.processStatusMessages();
246 mBufferPool.cleanUp(clearCache);
247 }
248
flush()249 void Accessor::Impl::flush() {
250 std::lock_guard<std::mutex> lock(mBufferPool.mMutex);
251 mBufferPool.processStatusMessages();
252 mBufferPool.flush(shared_from_this());
253 }
254
handleInvalidateAck()255 void Accessor::Impl::handleInvalidateAck() {
256 std::map<ConnectionId, const sp<IObserver>> observers;
257 uint32_t invalidationId;
258 {
259 std::lock_guard<std::mutex> lock(mBufferPool.mMutex);
260 mBufferPool.processStatusMessages();
261 mBufferPool.mInvalidation.onHandleAck(&observers, &invalidationId);
262 }
263 // Do not hold lock for send invalidations
264 size_t deadClients = 0;
265 for (auto it = observers.begin(); it != observers.end(); ++it) {
266 const sp<IObserver> observer = it->second;
267 if (observer) {
268 Return<void> transResult = observer->onMessage(it->first, invalidationId);
269 if (!transResult.isOk()) {
270 ++deadClients;
271 }
272 }
273 }
274 if (deadClients > 0) {
275 ALOGD("During invalidation found %zu dead clients", deadClients);
276 }
277 }
278
isValid()279 bool Accessor::Impl::isValid() {
280 return mBufferPool.isValid();
281 }
282
BufferPool()283 Accessor::Impl::Impl::BufferPool::BufferPool()
284 : mTimestampUs(getTimestampNow()),
285 mLastCleanUpUs(mTimestampUs),
286 mLastLogUs(mTimestampUs),
287 mSeq(0),
288 mStartSeq(0) {
289 mValid = mInvalidationChannel.isValid();
290 }
291
292
293 // Statistics helper
294 template<typename T, typename S>
percentage(T base,S total)295 int percentage(T base, S total) {
296 return int(total ? 0.5 + 100. * static_cast<S>(base) / total : 0);
297 }
298
299 std::atomic<std::uint32_t> Accessor::Impl::BufferPool::Invalidation::sInvSeqId(0);
300
~BufferPool()301 Accessor::Impl::Impl::BufferPool::~BufferPool() {
302 std::lock_guard<std::mutex> lock(mMutex);
303 ALOGD("Destruction - bufferpool2 %p "
304 "cached: %zu/%zuM, %zu/%d%% in use; "
305 "allocs: %zu, %d%% recycled; "
306 "transfers: %zu, %d%% unfetced",
307 this, mStats.mBuffersCached, mStats.mSizeCached >> 20,
308 mStats.mBuffersInUse, percentage(mStats.mBuffersInUse, mStats.mBuffersCached),
309 mStats.mTotalAllocations, percentage(mStats.mTotalRecycles, mStats.mTotalAllocations),
310 mStats.mTotalTransfers,
311 percentage(mStats.mTotalTransfers - mStats.mTotalFetches, mStats.mTotalTransfers));
312 }
313
onConnect(ConnectionId conId,const sp<IObserver> & observer)314 void Accessor::Impl::BufferPool::Invalidation::onConnect(
315 ConnectionId conId, const sp<IObserver>& observer) {
316 mAcks[conId] = mInvalidationId; // starts from current invalidationId
317 mObservers.insert(std::make_pair(conId, observer));
318 }
319
onClose(ConnectionId conId)320 void Accessor::Impl::BufferPool::Invalidation::onClose(ConnectionId conId) {
321 mAcks.erase(conId);
322 mObservers.erase(conId);
323 }
324
onAck(ConnectionId conId,uint32_t msgId)325 void Accessor::Impl::BufferPool::Invalidation::onAck(
326 ConnectionId conId,
327 uint32_t msgId) {
328 auto it = mAcks.find(conId);
329 if (it == mAcks.end()) {
330 ALOGW("ACK from inconsistent connection! %lld", (long long)conId);
331 return;
332 }
333 if (isMessageLater(msgId, it->second)) {
334 mAcks[conId] = msgId;
335 }
336 }
337
onBufferInvalidated(BufferId bufferId,BufferInvalidationChannel & channel)338 void Accessor::Impl::BufferPool::Invalidation::onBufferInvalidated(
339 BufferId bufferId,
340 BufferInvalidationChannel &channel) {
341 for (auto it = mPendings.begin(); it != mPendings.end();) {
342 if (it->isInvalidated(bufferId)) {
343 uint32_t msgId = 0;
344 if (it->mNeedsAck) {
345 msgId = ++mInvalidationId;
346 if (msgId == 0) {
347 // wrap happens
348 msgId = ++mInvalidationId;
349 }
350 }
351 channel.postInvalidation(msgId, it->mFrom, it->mTo);
352 it = mPendings.erase(it);
353 continue;
354 }
355 ++it;
356 }
357 }
358
onInvalidationRequest(bool needsAck,uint32_t from,uint32_t to,size_t left,BufferInvalidationChannel & channel,const std::shared_ptr<Accessor::Impl> & impl)359 void Accessor::Impl::BufferPool::Invalidation::onInvalidationRequest(
360 bool needsAck,
361 uint32_t from,
362 uint32_t to,
363 size_t left,
364 BufferInvalidationChannel &channel,
365 const std::shared_ptr<Accessor::Impl> &impl) {
366 uint32_t msgId = 0;
367 if (needsAck) {
368 msgId = ++mInvalidationId;
369 if (msgId == 0) {
370 // wrap happens
371 msgId = ++mInvalidationId;
372 }
373 }
374 ALOGV("bufferpool2 invalidation requested and queued");
375 if (left == 0) {
376 channel.postInvalidation(msgId, from, to);
377 } else {
378 // TODO: sending hint message?
379 ALOGV("bufferpoo2 invalidation requested and pending");
380 Pending pending(needsAck, from, to, left, impl);
381 mPendings.push_back(pending);
382 }
383 sInvalidator->addAccessor(mId, impl);
384 }
385
onHandleAck(std::map<ConnectionId,const sp<IObserver>> * observers,uint32_t * invalidationId)386 void Accessor::Impl::BufferPool::Invalidation::onHandleAck(
387 std::map<ConnectionId, const sp<IObserver>> *observers,
388 uint32_t *invalidationId) {
389 if (mInvalidationId != 0) {
390 *invalidationId = mInvalidationId;
391 std::set<int> deads;
392 for (auto it = mAcks.begin(); it != mAcks.end(); ++it) {
393 if (it->second != mInvalidationId) {
394 const sp<IObserver> observer = mObservers[it->first];
395 if (observer) {
396 observers->emplace(it->first, observer);
397 ALOGV("connection %lld will call observer (%u: %u)",
398 (long long)it->first, it->second, mInvalidationId);
399 // N.B: onMessage will be called later. ignore possibility of
400 // onMessage# oneway call being lost.
401 it->second = mInvalidationId;
402 } else {
403 ALOGV("bufferpool2 observer died %lld", (long long)it->first);
404 deads.insert(it->first);
405 }
406 }
407 }
408 if (deads.size() > 0) {
409 for (auto it = deads.begin(); it != deads.end(); ++it) {
410 onClose(*it);
411 }
412 }
413 }
414 if (mPendings.size() == 0) {
415 // All invalidation Ids are synced and no more pending invalidations.
416 sInvalidator->delAccessor(mId);
417 }
418 }
419
handleOwnBuffer(ConnectionId connectionId,BufferId bufferId)420 bool Accessor::Impl::BufferPool::handleOwnBuffer(
421 ConnectionId connectionId, BufferId bufferId) {
422
423 bool added = insert(&mUsingBuffers, connectionId, bufferId);
424 if (added) {
425 auto iter = mBuffers.find(bufferId);
426 iter->second->mOwnerCount++;
427 }
428 insert(&mUsingConnections, bufferId, connectionId);
429 return added;
430 }
431
handleReleaseBuffer(ConnectionId connectionId,BufferId bufferId)432 bool Accessor::Impl::BufferPool::handleReleaseBuffer(
433 ConnectionId connectionId, BufferId bufferId) {
434 bool deleted = erase(&mUsingBuffers, connectionId, bufferId);
435 if (deleted) {
436 auto iter = mBuffers.find(bufferId);
437 iter->second->mOwnerCount--;
438 if (iter->second->mOwnerCount == 0 &&
439 iter->second->mTransactionCount == 0) {
440 if (!iter->second->mInvalidated) {
441 mStats.onBufferUnused(iter->second->mAllocSize);
442 mFreeBuffers.insert(bufferId);
443 } else {
444 mStats.onBufferUnused(iter->second->mAllocSize);
445 mStats.onBufferEvicted(iter->second->mAllocSize);
446 mBuffers.erase(iter);
447 mInvalidation.onBufferInvalidated(bufferId, mInvalidationChannel);
448 }
449 }
450 }
451 erase(&mUsingConnections, bufferId, connectionId);
452 ALOGV("release buffer %u : %d", bufferId, deleted);
453 return deleted;
454 }
455
handleTransferTo(const BufferStatusMessage & message)456 bool Accessor::Impl::BufferPool::handleTransferTo(const BufferStatusMessage &message) {
457 auto completed = mCompletedTransactions.find(
458 message.transactionId);
459 if (completed != mCompletedTransactions.end()) {
460 // already completed
461 mCompletedTransactions.erase(completed);
462 return true;
463 }
464 // the buffer should exist and be owned.
465 auto bufferIter = mBuffers.find(message.bufferId);
466 if (bufferIter == mBuffers.end() ||
467 !contains(&mUsingBuffers, message.connectionId, message.bufferId)) {
468 return false;
469 }
470 auto found = mTransactions.find(message.transactionId);
471 if (found != mTransactions.end()) {
472 // transfer_from was received earlier.
473 found->second->mSender = message.connectionId;
474 found->second->mSenderValidated = true;
475 return true;
476 }
477 // TODO: verify there is target connection Id
478 mStats.onBufferSent();
479 mTransactions.insert(std::make_pair(
480 message.transactionId,
481 std::make_unique<TransactionStatus>(message, mTimestampUs)));
482 insert(&mPendingTransactions, message.targetConnectionId,
483 message.transactionId);
484 bufferIter->second->mTransactionCount++;
485 return true;
486 }
487
handleTransferFrom(const BufferStatusMessage & message)488 bool Accessor::Impl::BufferPool::handleTransferFrom(const BufferStatusMessage &message) {
489 auto found = mTransactions.find(message.transactionId);
490 if (found == mTransactions.end()) {
491 // TODO: is it feasible to check ownership here?
492 mStats.onBufferSent();
493 mTransactions.insert(std::make_pair(
494 message.transactionId,
495 std::make_unique<TransactionStatus>(message, mTimestampUs)));
496 insert(&mPendingTransactions, message.connectionId,
497 message.transactionId);
498 auto bufferIter = mBuffers.find(message.bufferId);
499 bufferIter->second->mTransactionCount++;
500 } else {
501 if (message.connectionId == found->second->mReceiver) {
502 found->second->mStatus = BufferStatus::TRANSFER_FROM;
503 }
504 }
505 return true;
506 }
507
handleTransferResult(const BufferStatusMessage & message)508 bool Accessor::Impl::BufferPool::handleTransferResult(const BufferStatusMessage &message) {
509 auto found = mTransactions.find(message.transactionId);
510 if (found != mTransactions.end()) {
511 bool deleted = erase(&mPendingTransactions, message.connectionId,
512 message.transactionId);
513 if (deleted) {
514 if (!found->second->mSenderValidated) {
515 mCompletedTransactions.insert(message.transactionId);
516 }
517 auto bufferIter = mBuffers.find(message.bufferId);
518 if (message.newStatus == BufferStatus::TRANSFER_OK) {
519 handleOwnBuffer(message.connectionId, message.bufferId);
520 }
521 bufferIter->second->mTransactionCount--;
522 if (bufferIter->second->mOwnerCount == 0
523 && bufferIter->second->mTransactionCount == 0) {
524 if (!bufferIter->second->mInvalidated) {
525 mStats.onBufferUnused(bufferIter->second->mAllocSize);
526 mFreeBuffers.insert(message.bufferId);
527 } else {
528 mStats.onBufferUnused(bufferIter->second->mAllocSize);
529 mStats.onBufferEvicted(bufferIter->second->mAllocSize);
530 mBuffers.erase(bufferIter);
531 mInvalidation.onBufferInvalidated(message.bufferId, mInvalidationChannel);
532 }
533 }
534 mTransactions.erase(found);
535 }
536 ALOGV("transfer finished %llu %u - %d", (unsigned long long)message.transactionId,
537 message.bufferId, deleted);
538 return deleted;
539 }
540 ALOGV("transfer not found %llu %u", (unsigned long long)message.transactionId,
541 message.bufferId);
542 return false;
543 }
544
processStatusMessages()545 void Accessor::Impl::BufferPool::processStatusMessages() {
546 std::vector<BufferStatusMessage> messages;
547 mObserver.getBufferStatusChanges(messages);
548 mTimestampUs = getTimestampNow();
549 for (BufferStatusMessage& message: messages) {
550 bool ret = false;
551 switch (message.newStatus) {
552 case BufferStatus::NOT_USED:
553 ret = handleReleaseBuffer(
554 message.connectionId, message.bufferId);
555 break;
556 case BufferStatus::USED:
557 // not happening
558 break;
559 case BufferStatus::TRANSFER_TO:
560 ret = handleTransferTo(message);
561 break;
562 case BufferStatus::TRANSFER_FROM:
563 ret = handleTransferFrom(message);
564 break;
565 case BufferStatus::TRANSFER_TIMEOUT:
566 // TODO
567 break;
568 case BufferStatus::TRANSFER_LOST:
569 // TODO
570 break;
571 case BufferStatus::TRANSFER_FETCH:
572 // not happening
573 break;
574 case BufferStatus::TRANSFER_OK:
575 case BufferStatus::TRANSFER_ERROR:
576 ret = handleTransferResult(message);
577 break;
578 case BufferStatus::INVALIDATION_ACK:
579 mInvalidation.onAck(message.connectionId, message.bufferId);
580 ret = true;
581 break;
582 }
583 if (ret == false) {
584 ALOGW("buffer status message processing failure - message : %d connection : %lld",
585 message.newStatus, (long long)message.connectionId);
586 }
587 }
588 messages.clear();
589 }
590
handleClose(ConnectionId connectionId)591 bool Accessor::Impl::BufferPool::handleClose(ConnectionId connectionId) {
592 // Cleaning buffers
593 auto buffers = mUsingBuffers.find(connectionId);
594 if (buffers != mUsingBuffers.end()) {
595 for (const BufferId& bufferId : buffers->second) {
596 bool deleted = erase(&mUsingConnections, bufferId, connectionId);
597 if (deleted) {
598 auto bufferIter = mBuffers.find(bufferId);
599 bufferIter->second->mOwnerCount--;
600 if (bufferIter->second->mOwnerCount == 0 &&
601 bufferIter->second->mTransactionCount == 0) {
602 // TODO: handle freebuffer insert fail
603 if (!bufferIter->second->mInvalidated) {
604 mStats.onBufferUnused(bufferIter->second->mAllocSize);
605 mFreeBuffers.insert(bufferId);
606 } else {
607 mStats.onBufferUnused(bufferIter->second->mAllocSize);
608 mStats.onBufferEvicted(bufferIter->second->mAllocSize);
609 mBuffers.erase(bufferIter);
610 mInvalidation.onBufferInvalidated(bufferId, mInvalidationChannel);
611 }
612 }
613 }
614 }
615 mUsingBuffers.erase(buffers);
616 }
617
618 // Cleaning transactions
619 auto pending = mPendingTransactions.find(connectionId);
620 if (pending != mPendingTransactions.end()) {
621 for (const TransactionId& transactionId : pending->second) {
622 auto iter = mTransactions.find(transactionId);
623 if (iter != mTransactions.end()) {
624 if (!iter->second->mSenderValidated) {
625 mCompletedTransactions.insert(transactionId);
626 }
627 BufferId bufferId = iter->second->mBufferId;
628 auto bufferIter = mBuffers.find(bufferId);
629 bufferIter->second->mTransactionCount--;
630 if (bufferIter->second->mOwnerCount == 0 &&
631 bufferIter->second->mTransactionCount == 0) {
632 // TODO: handle freebuffer insert fail
633 if (!bufferIter->second->mInvalidated) {
634 mStats.onBufferUnused(bufferIter->second->mAllocSize);
635 mFreeBuffers.insert(bufferId);
636 } else {
637 mStats.onBufferUnused(bufferIter->second->mAllocSize);
638 mStats.onBufferEvicted(bufferIter->second->mAllocSize);
639 mBuffers.erase(bufferIter);
640 mInvalidation.onBufferInvalidated(bufferId, mInvalidationChannel);
641 }
642 }
643 mTransactions.erase(iter);
644 }
645 }
646 }
647 return true;
648 }
649
getFreeBuffer(const std::shared_ptr<BufferPoolAllocator> & allocator,const std::vector<uint8_t> & params,BufferId * pId,const native_handle_t ** handle)650 bool Accessor::Impl::BufferPool::getFreeBuffer(
651 const std::shared_ptr<BufferPoolAllocator> &allocator,
652 const std::vector<uint8_t> ¶ms, BufferId *pId,
653 const native_handle_t** handle) {
654 auto bufferIt = mFreeBuffers.begin();
655 for (;bufferIt != mFreeBuffers.end(); ++bufferIt) {
656 BufferId bufferId = *bufferIt;
657 if (allocator->compatible(params, mBuffers[bufferId]->mConfig)) {
658 break;
659 }
660 }
661 if (bufferIt != mFreeBuffers.end()) {
662 BufferId id = *bufferIt;
663 mFreeBuffers.erase(bufferIt);
664 mStats.onBufferRecycled(mBuffers[id]->mAllocSize);
665 *handle = mBuffers[id]->handle();
666 *pId = id;
667 ALOGV("recycle a buffer %u %p", id, *handle);
668 return true;
669 }
670 return false;
671 }
672
addNewBuffer(const std::shared_ptr<BufferPoolAllocation> & alloc,const size_t allocSize,const std::vector<uint8_t> & params,BufferId * pId,const native_handle_t ** handle)673 ResultStatus Accessor::Impl::BufferPool::addNewBuffer(
674 const std::shared_ptr<BufferPoolAllocation> &alloc,
675 const size_t allocSize,
676 const std::vector<uint8_t> ¶ms,
677 BufferId *pId,
678 const native_handle_t** handle) {
679
680 BufferId bufferId = mSeq++;
681 if (mSeq == Connection::SYNC_BUFFERID) {
682 mSeq = 0;
683 }
684 std::unique_ptr<InternalBuffer> buffer =
685 std::make_unique<InternalBuffer>(
686 bufferId, alloc, allocSize, params);
687 if (buffer) {
688 auto res = mBuffers.insert(std::make_pair(
689 bufferId, std::move(buffer)));
690 if (res.second) {
691 mStats.onBufferAllocated(allocSize);
692 *handle = alloc->handle();
693 *pId = bufferId;
694 return ResultStatus::OK;
695 }
696 }
697 return ResultStatus::NO_MEMORY;
698 }
699
cleanUp(bool clearCache)700 void Accessor::Impl::BufferPool::cleanUp(bool clearCache) {
701 if (clearCache || mTimestampUs > mLastCleanUpUs + kCleanUpDurationUs) {
702 mLastCleanUpUs = mTimestampUs;
703 if (mTimestampUs > mLastLogUs + kLogDurationUs) {
704 mLastLogUs = mTimestampUs;
705 ALOGD("bufferpool2 %p : %zu(%zu size) total buffers - "
706 "%zu(%zu size) used buffers - %zu/%zu (recycle/alloc) - "
707 "%zu/%zu (fetch/transfer)",
708 this, mStats.mBuffersCached, mStats.mSizeCached,
709 mStats.mBuffersInUse, mStats.mSizeInUse,
710 mStats.mTotalRecycles, mStats.mTotalAllocations,
711 mStats.mTotalFetches, mStats.mTotalTransfers);
712 }
713 for (auto freeIt = mFreeBuffers.begin(); freeIt != mFreeBuffers.end();) {
714 if (!clearCache && mStats.mSizeCached < kMinAllocBytesForEviction
715 && mBuffers.size() < kMinBufferCountForEviction) {
716 break;
717 }
718 auto it = mBuffers.find(*freeIt);
719 if (it != mBuffers.end() &&
720 it->second->mOwnerCount == 0 && it->second->mTransactionCount == 0) {
721 mStats.onBufferEvicted(it->second->mAllocSize);
722 mBuffers.erase(it);
723 freeIt = mFreeBuffers.erase(freeIt);
724 } else {
725 ++freeIt;
726 ALOGW("bufferpool2 inconsistent!");
727 }
728 }
729 }
730 }
731
invalidate(bool needsAck,BufferId from,BufferId to,const std::shared_ptr<Accessor::Impl> & impl)732 void Accessor::Impl::BufferPool::invalidate(
733 bool needsAck, BufferId from, BufferId to,
734 const std::shared_ptr<Accessor::Impl> &impl) {
735 for (auto freeIt = mFreeBuffers.begin(); freeIt != mFreeBuffers.end();) {
736 if (isBufferInRange(from, to, *freeIt)) {
737 auto it = mBuffers.find(*freeIt);
738 if (it != mBuffers.end() &&
739 it->second->mOwnerCount == 0 && it->second->mTransactionCount == 0) {
740 mStats.onBufferEvicted(it->second->mAllocSize);
741 mBuffers.erase(it);
742 freeIt = mFreeBuffers.erase(freeIt);
743 continue;
744 } else {
745 ALOGW("bufferpool2 inconsistent!");
746 }
747 }
748 ++freeIt;
749 }
750
751 size_t left = 0;
752 for (auto it = mBuffers.begin(); it != mBuffers.end(); ++it) {
753 if (isBufferInRange(from, to, it->first)) {
754 it->second->invalidate();
755 ++left;
756 }
757 }
758 mInvalidation.onInvalidationRequest(needsAck, from, to, left, mInvalidationChannel, impl);
759 }
760
flush(const std::shared_ptr<Accessor::Impl> & impl)761 void Accessor::Impl::BufferPool::flush(const std::shared_ptr<Accessor::Impl> &impl) {
762 BufferId from = mStartSeq;
763 BufferId to = mSeq;
764 mStartSeq = mSeq;
765 // TODO: needsAck params
766 ALOGV("buffer invalidation request bp:%u %u %u", mInvalidation.mId, from, to);
767 if (from != to) {
768 invalidate(true, from, to, impl);
769 }
770 }
771
invalidatorThread(std::map<uint32_t,const std::weak_ptr<Accessor::Impl>> & accessors,std::mutex & mutex,std::condition_variable & cv,bool & ready)772 void Accessor::Impl::invalidatorThread(
773 std::map<uint32_t, const std::weak_ptr<Accessor::Impl>> &accessors,
774 std::mutex &mutex,
775 std::condition_variable &cv,
776 bool &ready) {
777 while(true) {
778 std::map<uint32_t, const std::weak_ptr<Accessor::Impl>> copied;
779 {
780 std::unique_lock<std::mutex> lock(mutex);
781 if (!ready) {
782 cv.wait(lock);
783 }
784 copied.insert(accessors.begin(), accessors.end());
785 }
786 std::list<ConnectionId> erased;
787 for (auto it = copied.begin(); it != copied.end(); ++it) {
788 const std::shared_ptr<Accessor::Impl> impl = it->second.lock();
789 if (!impl) {
790 erased.push_back(it->first);
791 } else {
792 impl->handleInvalidateAck();
793 }
794 }
795 {
796 std::unique_lock<std::mutex> lock(mutex);
797 for (auto it = erased.begin(); it != erased.end(); ++it) {
798 accessors.erase(*it);
799 }
800 if (accessors.size() == 0) {
801 ready = false;
802 } else {
803 // prevent draining cpu.
804 lock.unlock();
805 std::this_thread::yield();
806 }
807 }
808 }
809 }
810
AccessorInvalidator()811 Accessor::Impl::AccessorInvalidator::AccessorInvalidator() : mReady(false) {
812 std::thread invalidator(
813 invalidatorThread,
814 std::ref(mAccessors),
815 std::ref(mMutex),
816 std::ref(mCv),
817 std::ref(mReady));
818 invalidator.detach();
819 }
820
addAccessor(uint32_t accessorId,const std::weak_ptr<Accessor::Impl> & impl)821 void Accessor::Impl::AccessorInvalidator::addAccessor(
822 uint32_t accessorId, const std::weak_ptr<Accessor::Impl> &impl) {
823 bool notify = false;
824 std::unique_lock<std::mutex> lock(mMutex);
825 if (mAccessors.find(accessorId) == mAccessors.end()) {
826 if (!mReady) {
827 mReady = true;
828 notify = true;
829 }
830 mAccessors.insert(std::make_pair(accessorId, impl));
831 ALOGV("buffer invalidation added bp:%u %d", accessorId, notify);
832 }
833 lock.unlock();
834 if (notify) {
835 mCv.notify_one();
836 }
837 }
838
delAccessor(uint32_t accessorId)839 void Accessor::Impl::AccessorInvalidator::delAccessor(uint32_t accessorId) {
840 std::lock_guard<std::mutex> lock(mMutex);
841 mAccessors.erase(accessorId);
842 ALOGV("buffer invalidation deleted bp:%u", accessorId);
843 if (mAccessors.size() == 0) {
844 mReady = false;
845 }
846 }
847
848 std::unique_ptr<Accessor::Impl::AccessorInvalidator> Accessor::Impl::sInvalidator;
849
createInvalidator()850 void Accessor::Impl::createInvalidator() {
851 if (!sInvalidator) {
852 sInvalidator = std::make_unique<Accessor::Impl::AccessorInvalidator>();
853 }
854 }
855
856 } // namespace implementation
857 } // namespace V2_0
858 } // namespace bufferpool
859 } // namespace media
860 } // namespace hardware
861 } // namespace android
862