/*
 * Copyright (C) 2018 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#define LOG_TAG "BufferPoolAccessor2.0"
//#define LOG_NDEBUG 0

#include <sys/types.h>
#include <stdint.h>
#include <time.h>
#include <unistd.h>
#include <utils/Log.h>
#include <thread>
#include "AccessorImpl.h"
#include "Connection.h"

namespace android {
namespace hardware {
namespace media {
namespace bufferpool {
namespace V2_0 {
namespace implementation {

namespace {
    static constexpr int64_t kCleanUpDurationUs = 500000; // TODO tune 0.5 sec
    static constexpr int64_t kLogDurationUs = 5000000; // 5 secs

    static constexpr size_t kMinAllocBytesForEviction = 1024*1024*15;
    static constexpr size_t kMinBufferCountForEviction = 25;

    static constexpr nsecs_t kEvictGranularityNs = 1000000000; // 1 sec
    static constexpr nsecs_t kEvictDurationNs = 5000000000; // 5 secs
}

// Buffer structure in bufferpool process
struct InternalBuffer {
    BufferId mId;
    size_t mOwnerCount;
    size_t mTransactionCount;
    const std::shared_ptr<BufferPoolAllocation> mAllocation;
    const size_t mAllocSize;
    const std::vector<uint8_t> mConfig;
    bool mInvalidated;

    InternalBuffer(
            BufferId id,
            const std::shared_ptr<BufferPoolAllocation> &alloc,
            const size_t allocSize,
            const std::vector<uint8_t> &allocConfig)
            : mId(id), mOwnerCount(0), mTransactionCount(0),
            mAllocation(alloc), mAllocSize(allocSize), mConfig(allocConfig),
            mInvalidated(false) {}

    const native_handle_t *handle() {
        return mAllocation->handle();
    }

    void invalidate() {
        mInvalidated = true;
    }
};

struct TransactionStatus {
    TransactionId mId;
    BufferId mBufferId;
    ConnectionId mSender;
    ConnectionId mReceiver;
    BufferStatus mStatus;
    int64_t mTimestampUs;
    bool mSenderValidated;

    TransactionStatus(const BufferStatusMessage &message, int64_t timestampUs) {
        mId = message.transactionId;
        mBufferId = message.bufferId;
        mStatus = message.newStatus;
        mTimestampUs = timestampUs;
        if (mStatus == BufferStatus::TRANSFER_TO) {
            mSender = message.connectionId;
            mReceiver = message.targetConnectionId;
            mSenderValidated = true;
        } else {
            mSender = -1LL;
            mReceiver = message.connectionId;
            mSenderValidated = false;
        }
    }
};

// Helper template methods for handling a map of sets.
template<class T, class U>
bool insert(std::map<T, std::set<U>> *mapOfSet, T key, U value) {
    auto iter = mapOfSet->find(key);
    if (iter == mapOfSet->end()) {
        std::set<U> valueSet{value};
        mapOfSet->insert(std::make_pair(key, valueSet));
        return true;
    } else if (iter->second.find(value) == iter->second.end()) {
        iter->second.insert(value);
        return true;
    }
    return false;
}

template<class T, class U>
bool erase(std::map<T, std::set<U>> *mapOfSet, T key, U value) {
    bool ret = false;
    auto iter = mapOfSet->find(key);
    if (iter != mapOfSet->end()) {
        if (iter->second.erase(value) > 0) {
            ret = true;
        }
        if (iter->second.size() == 0) {
            mapOfSet->erase(iter);
        }
    }
    return ret;
}

template<class T, class U>
bool contains(std::map<T, std::set<U>> *mapOfSet, T key, U value) {
    auto iter = mapOfSet->find(key);
    if (iter != mapOfSet->end()) {
        auto setIter = iter->second.find(value);
        return setIter != iter->second.end();
    }
    return false;
}
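// Illustrative usage of the map-of-sets helpers above (a hypothetical sketch;
// these exact values appear nowhere in this file):
//
//   std::map<ConnectionId, std::set<BufferId>> usingBuffers;
//   ConnectionId conn = 7;
//   BufferId buf = 42;
//   insert(&usingBuffers, conn, buf);    // true  - new key, new value
//   insert(&usingBuffers, conn, buf);    // false - value already present
//   contains(&usingBuffers, conn, buf);  // true
//   erase(&usingBuffers, conn, buf);     // true  - key dropped once its set is empty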

#ifdef __ANDROID_VNDK__
static constexpr uint32_t kSeqIdVndkBit = 1U << 31;
#else
static constexpr uint32_t kSeqIdVndkBit = 0;
#endif

static constexpr uint32_t kSeqIdMax = 0x7fffffff;
uint32_t Accessor::Impl::sSeqId = time(nullptr) & kSeqIdMax;
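// For illustration: a ConnectionId packs the accessor's pid into the upper 32 bits
// and the per-process sequence counter into the lower 32 bits; bit 31 of the low
// word (kSeqIdVndkBit) is set when this library is built against the VNDK
// (__ANDROID_VNDK__). With a hypothetical pid of 0x1234 and sSeqId of 5 in a
// vendor-side process:
//
//   ConnectionId id = (int64_t)0x1234 << 32 | 5 | kSeqIdVndkBit;
//   // id == 0x0000123480000005
//
// sSeqId itself is seeded from time(nullptr) and wraps back to 0 at kSeqIdMax.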

Accessor::Impl::Impl(
        const std::shared_ptr<BufferPoolAllocator> &allocator)
        : mAllocator(allocator), mScheduleEvictTs(0) {}

Accessor::Impl::~Impl() {
}

ResultStatus Accessor::Impl::connect(
        const sp<Accessor> &accessor, const sp<IObserver> &observer,
        sp<Connection> *connection,
        ConnectionId *pConnectionId,
        uint32_t *pMsgId,
        const StatusDescriptor** statusDescPtr,
        const InvalidationDescriptor** invDescPtr) {
    sp<Connection> newConnection = new Connection();
    ResultStatus status = ResultStatus::CRITICAL_ERROR;
    {
        std::lock_guard<std::mutex> lock(mBufferPool.mMutex);
        if (newConnection) {
            int32_t pid = getpid();
            ConnectionId id = (int64_t)pid << 32 | sSeqId | kSeqIdVndkBit;
            status = mBufferPool.mObserver.open(id, statusDescPtr);
            if (status == ResultStatus::OK) {
                newConnection->initialize(accessor, id);
                *connection = newConnection;
                *pConnectionId = id;
                *pMsgId = mBufferPool.mInvalidation.mInvalidationId;
                mBufferPool.mConnectionIds.insert(id);
                mBufferPool.mInvalidationChannel.getDesc(invDescPtr);
                mBufferPool.mInvalidation.onConnect(id, observer);
                if (sSeqId == kSeqIdMax) {
                    sSeqId = 0;
                } else {
                    ++sSeqId;
                }
            }

        }
        mBufferPool.processStatusMessages();
        mBufferPool.cleanUp();
        scheduleEvictIfNeeded();
    }
    return status;
}

ResultStatus Accessor::Impl::close(ConnectionId connectionId) {
    std::lock_guard<std::mutex> lock(mBufferPool.mMutex);
    ALOGV("connection close %lld: %u", (long long)connectionId, mBufferPool.mInvalidation.mId);
    mBufferPool.processStatusMessages();
    mBufferPool.handleClose(connectionId);
    mBufferPool.mObserver.close(connectionId);
    mBufferPool.mInvalidation.onClose(connectionId);
    // Since close() is called after all work on the connection is finished,
    // it is OK to evict unused buffers.
    mBufferPool.cleanUp(true);
    scheduleEvictIfNeeded();
    return ResultStatus::OK;
}

ResultStatus Accessor::Impl::allocate(
        ConnectionId connectionId, const std::vector<uint8_t>& params,
        BufferId *bufferId, const native_handle_t** handle) {
    std::unique_lock<std::mutex> lock(mBufferPool.mMutex);
    mBufferPool.processStatusMessages();
    ResultStatus status = ResultStatus::OK;
    if (!mBufferPool.getFreeBuffer(mAllocator, params, bufferId, handle)) {
        lock.unlock();
        std::shared_ptr<BufferPoolAllocation> alloc;
        size_t allocSize;
        status = mAllocator->allocate(params, &alloc, &allocSize);
        lock.lock();
        if (status == ResultStatus::OK) {
            status = mBufferPool.addNewBuffer(alloc, allocSize, params, bufferId, handle);
        }
        ALOGV("create a buffer %d : %u %p",
                status == ResultStatus::OK, *bufferId, *handle);
    }
    if (status == ResultStatus::OK) {
        // TODO: handle ownBuffer failure
        mBufferPool.handleOwnBuffer(connectionId, *bufferId);
    }
    mBufferPool.cleanUp();
    scheduleEvictIfNeeded();
    return status;
}

ResultStatus Accessor::Impl::fetch(
        ConnectionId connectionId, TransactionId transactionId,
        BufferId bufferId, const native_handle_t** handle) {
    std::lock_guard<std::mutex> lock(mBufferPool.mMutex);
    mBufferPool.processStatusMessages();
    auto found = mBufferPool.mTransactions.find(transactionId);
    if (found != mBufferPool.mTransactions.end() &&
            contains(&mBufferPool.mPendingTransactions,
                     connectionId, transactionId)) {
        if (found->second->mSenderValidated &&
                found->second->mStatus == BufferStatus::TRANSFER_FROM &&
                found->second->mBufferId == bufferId) {
            found->second->mStatus = BufferStatus::TRANSFER_FETCH;
            auto bufferIt = mBufferPool.mBuffers.find(bufferId);
            if (bufferIt != mBufferPool.mBuffers.end()) {
                mBufferPool.mStats.onBufferFetched();
                *handle = bufferIt->second->handle();
                return ResultStatus::OK;
            }
        }
    }
    mBufferPool.cleanUp();
    scheduleEvictIfNeeded();
    return ResultStatus::CRITICAL_ERROR;
}

void Accessor::Impl::cleanUp(bool clearCache) {
    // transaction timeout, buffer caching TTL handling
    std::lock_guard<std::mutex> lock(mBufferPool.mMutex);
    mBufferPool.processStatusMessages();
    mBufferPool.cleanUp(clearCache);
}

void Accessor::Impl::flush() {
    std::lock_guard<std::mutex> lock(mBufferPool.mMutex);
    mBufferPool.processStatusMessages();
    mBufferPool.flush(shared_from_this());
}

void Accessor::Impl::handleInvalidateAck() {
    std::map<ConnectionId, const sp<IObserver>> observers;
    uint32_t invalidationId;
    {
        std::lock_guard<std::mutex> lock(mBufferPool.mMutex);
        mBufferPool.processStatusMessages();
        mBufferPool.mInvalidation.onHandleAck(&observers, &invalidationId);
    }
    // Do not hold the lock while sending invalidations.
    size_t deadClients = 0;
    for (auto it = observers.begin(); it != observers.end(); ++it) {
        const sp<IObserver> observer = it->second;
        if (observer) {
            Return<void> transResult = observer->onMessage(it->first, invalidationId);
            if (!transResult.isOk()) {
                ++deadClients;
            }
        }
    }
    if (deadClients > 0) {
        ALOGD("During invalidation found %zu dead clients", deadClients);
    }
}

bool Accessor::Impl::isValid() {
    return mBufferPool.isValid();
}

Accessor::Impl::Impl::BufferPool::BufferPool()
    : mTimestampUs(getTimestampNow()),
      mLastCleanUpUs(mTimestampUs),
      mLastLogUs(mTimestampUs),
      mSeq(0),
      mStartSeq(0) {
    mValid = mInvalidationChannel.isValid();
}


// Statistics helper
template<typename T, typename S>
int percentage(T base, S total) {
    return int(total ? 0.5 + 100. * static_cast<S>(base) / total : 0);
}
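// For illustration, with hypothetical counts: percentage(25, 100) == 25 and
// percentage(2, 3) == 67 (the +0.5 rounds to nearest); a zero total yields 0
// instead of dividing by zero.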

std::atomic<std::uint32_t> Accessor::Impl::BufferPool::Invalidation::sInvSeqId(0);

Accessor::Impl::Impl::BufferPool::~BufferPool() {
    std::lock_guard<std::mutex> lock(mMutex);
    ALOGD("Destruction - bufferpool2 %p "
          "cached: %zu/%zuM, %zu/%d%% in use; "
          "allocs: %zu, %d%% recycled; "
          "transfers: %zu, %d%% unfetched",
          this, mStats.mBuffersCached, mStats.mSizeCached >> 20,
          mStats.mBuffersInUse, percentage(mStats.mBuffersInUse, mStats.mBuffersCached),
          mStats.mTotalAllocations, percentage(mStats.mTotalRecycles, mStats.mTotalAllocations),
          mStats.mTotalTransfers,
          percentage(mStats.mTotalTransfers - mStats.mTotalFetches, mStats.mTotalTransfers));
}

void Accessor::Impl::BufferPool::Invalidation::onConnect(
        ConnectionId conId, const sp<IObserver>& observer) {
    mAcks[conId] = mInvalidationId; // starts from the current invalidationId
    mObservers.insert(std::make_pair(conId, observer));
}

void Accessor::Impl::BufferPool::Invalidation::onClose(ConnectionId conId) {
    mAcks.erase(conId);
    mObservers.erase(conId);
}

void Accessor::Impl::BufferPool::Invalidation::onAck(
        ConnectionId conId,
        uint32_t msgId) {
    auto it = mAcks.find(conId);
    if (it == mAcks.end()) {
        ALOGW("ACK from inconsistent connection! %lld", (long long)conId);
        return;
    }
    if (isMessageLater(msgId, it->second)) {
        mAcks[conId] = msgId;
    }
}

void Accessor::Impl::BufferPool::Invalidation::onBufferInvalidated(
        BufferId bufferId,
        BufferInvalidationChannel &channel) {
    for (auto it = mPendings.begin(); it != mPendings.end();) {
        if (it->isInvalidated(bufferId)) {
            uint32_t msgId = 0;
            if (it->mNeedsAck) {
                msgId = ++mInvalidationId;
                if (msgId == 0) {
                    // wrap-around happened
                    msgId = ++mInvalidationId;
                }
            }
            channel.postInvalidation(msgId, it->mFrom, it->mTo);
            it = mPendings.erase(it);
            continue;
        }
        ++it;
    }
}

void Accessor::Impl::BufferPool::Invalidation::onInvalidationRequest(
        bool needsAck,
        uint32_t from,
        uint32_t to,
        size_t left,
        BufferInvalidationChannel &channel,
        const std::shared_ptr<Accessor::Impl> &impl) {
    uint32_t msgId = 0;
    if (needsAck) {
        msgId = ++mInvalidationId;
        if (msgId == 0) {
            // wrap-around happened
            msgId = ++mInvalidationId;
        }
    }
    ALOGV("bufferpool2 invalidation requested and queued");
    if (left == 0) {
        channel.postInvalidation(msgId, from, to);
    } else {
        // TODO: sending hint message?
        ALOGV("bufferpool2 invalidation requested and pending");
        Pending pending(needsAck, from, to, left, impl);
        mPendings.push_back(pending);
    }
    sInvalidator->addAccessor(mId, impl);
}

void Accessor::Impl::BufferPool::Invalidation::onHandleAck(
        std::map<ConnectionId, const sp<IObserver>> *observers,
        uint32_t *invalidationId) {
    if (mInvalidationId != 0) {
        *invalidationId = mInvalidationId;
        std::set<ConnectionId> deads;
        for (auto it = mAcks.begin(); it != mAcks.end(); ++it) {
            if (it->second != mInvalidationId) {
                const sp<IObserver> observer = mObservers[it->first];
                if (observer) {
                    observers->emplace(it->first, observer);
                    ALOGV("connection %lld will call observer (%u: %u)",
                          (long long)it->first, it->second, mInvalidationId);
                    // N.B.: onMessage will be called later. Ignore the possibility
                    // of the onMessage() oneway call being lost.
                    it->second = mInvalidationId;
                } else {
                    ALOGV("bufferpool2 observer died %lld", (long long)it->first);
                    deads.insert(it->first);
                }
            }
        }
        if (deads.size() > 0) {
            for (auto it = deads.begin(); it != deads.end(); ++it) {
                onClose(*it);
            }
        }
    }
    if (mPendings.size() == 0) {
        // All invalidation ids are synced, and there are no more pending invalidations.
        sInvalidator->delAccessor(mId);
    }
}

bool Accessor::Impl::BufferPool::handleOwnBuffer(
        ConnectionId connectionId, BufferId bufferId) {

    bool added = insert(&mUsingBuffers, connectionId, bufferId);
    if (added) {
        auto iter = mBuffers.find(bufferId);
        iter->second->mOwnerCount++;
    }
    insert(&mUsingConnections, bufferId, connectionId);
    return added;
}

bool Accessor::Impl::BufferPool::handleReleaseBuffer(
        ConnectionId connectionId, BufferId bufferId) {
    bool deleted = erase(&mUsingBuffers, connectionId, bufferId);
    if (deleted) {
        auto iter = mBuffers.find(bufferId);
        iter->second->mOwnerCount--;
        if (iter->second->mOwnerCount == 0 &&
                iter->second->mTransactionCount == 0) {
            if (!iter->second->mInvalidated) {
                mStats.onBufferUnused(iter->second->mAllocSize);
                mFreeBuffers.insert(bufferId);
            } else {
                mStats.onBufferUnused(iter->second->mAllocSize);
                mStats.onBufferEvicted(iter->second->mAllocSize);
                mBuffers.erase(iter);
                mInvalidation.onBufferInvalidated(bufferId, mInvalidationChannel);
            }
        }
    }
    erase(&mUsingConnections, bufferId, connectionId);
    ALOGV("release buffer %u : %d", bufferId, deleted);
    return deleted;
}

bool Accessor::Impl::BufferPool::handleTransferTo(const BufferStatusMessage &message) {
    auto completed = mCompletedTransactions.find(
            message.transactionId);
    if (completed != mCompletedTransactions.end()) {
        // already completed
        mCompletedTransactions.erase(completed);
        return true;
    }
    // the buffer should exist and be owned.
    auto bufferIter = mBuffers.find(message.bufferId);
    if (bufferIter == mBuffers.end() ||
            !contains(&mUsingBuffers, message.connectionId, message.bufferId)) {
        return false;
    }
    auto found = mTransactions.find(message.transactionId);
    if (found != mTransactions.end()) {
        // transfer_from was received earlier.
        found->second->mSender = message.connectionId;
        found->second->mSenderValidated = true;
        return true;
    }
    if (mConnectionIds.find(message.targetConnectionId) == mConnectionIds.end()) {
        // N.B.: the message could be fake, or the receiving connection may already be closed.
        ALOGD("bufferpool2 %p receiver connection %lld is no longer valid",
              this, (long long)message.targetConnectionId);
        return false;
    }
    mStats.onBufferSent();
    mTransactions.insert(std::make_pair(
            message.transactionId,
            std::make_unique<TransactionStatus>(message, mTimestampUs)));
    insert(&mPendingTransactions, message.targetConnectionId,
           message.transactionId);
    bufferIter->second->mTransactionCount++;
    return true;
}
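// Note on message ordering (an illustrative sketch with hypothetical ids, based only
// on the handling in this file): a transfer involves a TRANSFER_TO message from the
// sending connection and a TRANSFER_FROM message from the receiving connection, and
// either may be processed first. Whichever handler runs first creates the
// TransactionStatus; handleTransferTo() marks the sender validated, while
// handleTransferFrom() only flips the status to TRANSFER_FROM when the message comes
// from the recorded receiver. fetch() hands out the buffer handle once the entry is
// sender-validated and in the TRANSFER_FROM state, e.g.:
//
//   TRANSFER_TO(tx 9, buf 3, conn 1 -> conn 2)   // entry created, sender validated
//   TRANSFER_FROM(tx 9, buf 3, conn 2)           // status -> TRANSFER_FROM
//   fetch(conn 2, tx 9, buf 3)                   // returns the native handle
//   TRANSFER_OK(tx 9, buf 3, conn 2)             // ownership moves to conn 2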

bool Accessor::Impl::BufferPool::handleTransferFrom(const BufferStatusMessage &message) {
    auto found = mTransactions.find(message.transactionId);
    if (found == mTransactions.end()) {
        // TODO: is it feasible to check ownership here?
        mStats.onBufferSent();
        mTransactions.insert(std::make_pair(
                message.transactionId,
                std::make_unique<TransactionStatus>(message, mTimestampUs)));
        insert(&mPendingTransactions, message.connectionId,
               message.transactionId);
        auto bufferIter = mBuffers.find(message.bufferId);
        bufferIter->second->mTransactionCount++;
    } else {
        if (message.connectionId == found->second->mReceiver) {
            found->second->mStatus = BufferStatus::TRANSFER_FROM;
        }
    }
    return true;
}

bool Accessor::Impl::BufferPool::handleTransferResult(const BufferStatusMessage &message) {
    auto found = mTransactions.find(message.transactionId);
    if (found != mTransactions.end()) {
        bool deleted = erase(&mPendingTransactions, message.connectionId,
                             message.transactionId);
        if (deleted) {
            if (!found->second->mSenderValidated) {
                mCompletedTransactions.insert(message.transactionId);
            }
            auto bufferIter = mBuffers.find(message.bufferId);
            if (message.newStatus == BufferStatus::TRANSFER_OK) {
                handleOwnBuffer(message.connectionId, message.bufferId);
            }
            bufferIter->second->mTransactionCount--;
            if (bufferIter->second->mOwnerCount == 0
                    && bufferIter->second->mTransactionCount == 0) {
                if (!bufferIter->second->mInvalidated) {
                    mStats.onBufferUnused(bufferIter->second->mAllocSize);
                    mFreeBuffers.insert(message.bufferId);
                } else {
                    mStats.onBufferUnused(bufferIter->second->mAllocSize);
                    mStats.onBufferEvicted(bufferIter->second->mAllocSize);
                    mBuffers.erase(bufferIter);
                    mInvalidation.onBufferInvalidated(message.bufferId, mInvalidationChannel);
                }
            }
            mTransactions.erase(found);
        }
        ALOGV("transfer finished %llu %u - %d", (unsigned long long)message.transactionId,
              message.bufferId, deleted);
        return deleted;
    }
    ALOGV("transfer not found %llu %u", (unsigned long long)message.transactionId,
          message.bufferId);
    return false;
}

void Accessor::Impl::BufferPool::processStatusMessages() {
    std::vector<BufferStatusMessage> messages;
    mObserver.getBufferStatusChanges(messages);
    mTimestampUs = getTimestampNow();
    for (BufferStatusMessage& message: messages) {
        bool ret = false;
        switch (message.newStatus) {
            case BufferStatus::NOT_USED:
                ret = handleReleaseBuffer(
                        message.connectionId, message.bufferId);
                break;
            case BufferStatus::USED:
                // not happening
                break;
            case BufferStatus::TRANSFER_TO:
                ret = handleTransferTo(message);
                break;
            case BufferStatus::TRANSFER_FROM:
                ret = handleTransferFrom(message);
                break;
            case BufferStatus::TRANSFER_TIMEOUT:
                // TODO
                break;
            case BufferStatus::TRANSFER_LOST:
                // TODO
                break;
            case BufferStatus::TRANSFER_FETCH:
                // not happening
                break;
            case BufferStatus::TRANSFER_OK:
            case BufferStatus::TRANSFER_ERROR:
                ret = handleTransferResult(message);
                break;
            case BufferStatus::INVALIDATION_ACK:
                mInvalidation.onAck(message.connectionId, message.bufferId);
                ret = true;
                break;
        }
        if (ret == false) {
            ALOGW("buffer status message processing failure - message : %d connection : %lld",
                  message.newStatus, (long long)message.connectionId);
        }
    }
    messages.clear();
}

bool Accessor::Impl::BufferPool::handleClose(ConnectionId connectionId) {
    // Cleaning buffers
    auto buffers = mUsingBuffers.find(connectionId);
    if (buffers != mUsingBuffers.end()) {
        for (const BufferId& bufferId : buffers->second) {
            bool deleted = erase(&mUsingConnections, bufferId, connectionId);
            if (deleted) {
                auto bufferIter = mBuffers.find(bufferId);
                bufferIter->second->mOwnerCount--;
                if (bufferIter->second->mOwnerCount == 0 &&
                        bufferIter->second->mTransactionCount == 0) {
                    // TODO: handle freebuffer insert fail
                    if (!bufferIter->second->mInvalidated) {
                        mStats.onBufferUnused(bufferIter->second->mAllocSize);
                        mFreeBuffers.insert(bufferId);
                    } else {
                        mStats.onBufferUnused(bufferIter->second->mAllocSize);
                        mStats.onBufferEvicted(bufferIter->second->mAllocSize);
                        mBuffers.erase(bufferIter);
                        mInvalidation.onBufferInvalidated(bufferId, mInvalidationChannel);
                    }
                }
            }
        }
        mUsingBuffers.erase(buffers);
    }

    // Cleaning transactions
    auto pending = mPendingTransactions.find(connectionId);
    if (pending != mPendingTransactions.end()) {
        for (const TransactionId& transactionId : pending->second) {
            auto iter = mTransactions.find(transactionId);
            if (iter != mTransactions.end()) {
                if (!iter->second->mSenderValidated) {
                    mCompletedTransactions.insert(transactionId);
                }
                BufferId bufferId = iter->second->mBufferId;
                auto bufferIter = mBuffers.find(bufferId);
                bufferIter->second->mTransactionCount--;
                if (bufferIter->second->mOwnerCount == 0 &&
                        bufferIter->second->mTransactionCount == 0) {
                    // TODO: handle freebuffer insert fail
                    if (!bufferIter->second->mInvalidated) {
                        mStats.onBufferUnused(bufferIter->second->mAllocSize);
                        mFreeBuffers.insert(bufferId);
                    } else {
                        mStats.onBufferUnused(bufferIter->second->mAllocSize);
                        mStats.onBufferEvicted(bufferIter->second->mAllocSize);
                        mBuffers.erase(bufferIter);
                        mInvalidation.onBufferInvalidated(bufferId, mInvalidationChannel);
                    }
                }
                mTransactions.erase(iter);
            }
        }
    }
    mConnectionIds.erase(connectionId);
    return true;
}

bool Accessor::Impl::BufferPool::getFreeBuffer(
        const std::shared_ptr<BufferPoolAllocator> &allocator,
        const std::vector<uint8_t> &params, BufferId *pId,
        const native_handle_t** handle) {
    auto bufferIt = mFreeBuffers.begin();
    for (;bufferIt != mFreeBuffers.end(); ++bufferIt) {
        BufferId bufferId = *bufferIt;
        if (allocator->compatible(params, mBuffers[bufferId]->mConfig)) {
            break;
        }
    }
    if (bufferIt != mFreeBuffers.end()) {
        BufferId id = *bufferIt;
        mFreeBuffers.erase(bufferIt);
        mStats.onBufferRecycled(mBuffers[id]->mAllocSize);
        *handle = mBuffers[id]->handle();
        *pId = id;
        ALOGV("recycle a buffer %u %p", id, *handle);
        return true;
    }
    return false;
}

ResultStatus Accessor::Impl::BufferPool::addNewBuffer(
        const std::shared_ptr<BufferPoolAllocation> &alloc,
        const size_t allocSize,
        const std::vector<uint8_t> &params,
        BufferId *pId,
        const native_handle_t** handle) {

    BufferId bufferId = mSeq++;
    if (mSeq == Connection::SYNC_BUFFERID) {
        mSeq = 0;
    }
    std::unique_ptr<InternalBuffer> buffer =
            std::make_unique<InternalBuffer>(
                    bufferId, alloc, allocSize, params);
    if (buffer) {
        auto res = mBuffers.insert(std::make_pair(
                bufferId, std::move(buffer)));
        if (res.second) {
            mStats.onBufferAllocated(allocSize);
            *handle = alloc->handle();
            *pId = bufferId;
            return ResultStatus::OK;
        }
    }
    return ResultStatus::NO_MEMORY;
}

void Accessor::Impl::BufferPool::cleanUp(bool clearCache) {
    if (clearCache || mTimestampUs > mLastCleanUpUs + kCleanUpDurationUs) {
        mLastCleanUpUs = mTimestampUs;
        if (mTimestampUs > mLastLogUs + kLogDurationUs) {
            mLastLogUs = mTimestampUs;
            ALOGD("bufferpool2 %p : %zu(%zu size) total buffers - "
                  "%zu(%zu size) used buffers - %zu/%zu (recycle/alloc) - "
                  "%zu/%zu (fetch/transfer)",
                  this, mStats.mBuffersCached, mStats.mSizeCached,
                  mStats.mBuffersInUse, mStats.mSizeInUse,
                  mStats.mTotalRecycles, mStats.mTotalAllocations,
                  mStats.mTotalFetches, mStats.mTotalTransfers);
        }
        for (auto freeIt = mFreeBuffers.begin(); freeIt != mFreeBuffers.end();) {
            if (!clearCache && (mStats.mSizeCached < kMinAllocBytesForEviction
                    || mBuffers.size() < kMinBufferCountForEviction)) {
                break;
            }
            auto it = mBuffers.find(*freeIt);
            if (it != mBuffers.end() &&
                    it->second->mOwnerCount == 0 && it->second->mTransactionCount == 0) {
                mStats.onBufferEvicted(it->second->mAllocSize);
                mBuffers.erase(it);
                freeIt = mFreeBuffers.erase(freeIt);
            } else {
                ++freeIt;
                ALOGW("bufferpool2 inconsistent!");
            }
        }
    }
}
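// For illustration: with kMinAllocBytesForEviction (15 MiB) and
// kMinBufferCountForEviction (25), a periodic cleanUp(false) keeps evicting free
// buffers only while at least 15 MiB is cached AND at least 25 buffers are tracked;
// e.g., a hypothetical pool holding 30 buffers totaling 20 MiB is trimmed until one
// threshold is crossed, whereas cleanUp(true) drains the whole free list.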

void Accessor::Impl::BufferPool::invalidate(
        bool needsAck, BufferId from, BufferId to,
        const std::shared_ptr<Accessor::Impl> &impl) {
    for (auto freeIt = mFreeBuffers.begin(); freeIt != mFreeBuffers.end();) {
        if (isBufferInRange(from, to, *freeIt)) {
            auto it = mBuffers.find(*freeIt);
            if (it != mBuffers.end() &&
                    it->second->mOwnerCount == 0 && it->second->mTransactionCount == 0) {
                mStats.onBufferEvicted(it->second->mAllocSize);
                mBuffers.erase(it);
                freeIt = mFreeBuffers.erase(freeIt);
                continue;
            } else {
                ALOGW("bufferpool2 inconsistent!");
            }
        }
        ++freeIt;
    }

    size_t left = 0;
    for (auto it = mBuffers.begin(); it != mBuffers.end(); ++it) {
        if (isBufferInRange(from, to, it->first)) {
            it->second->invalidate();
            ++left;
        }
    }
    mInvalidation.onInvalidationRequest(needsAck, from, to, left, mInvalidationChannel, impl);
}

void Accessor::Impl::BufferPool::flush(const std::shared_ptr<Accessor::Impl> &impl) {
    BufferId from = mStartSeq;
    BufferId to = mSeq;
    mStartSeq = mSeq;
    // TODO: needsAck params
    ALOGV("buffer invalidation request bp:%u %u %u", mInvalidation.mId, from, to);
    if (from != to) {
        invalidate(true, from, to, impl);
    }
}

void Accessor::Impl::invalidatorThread(
        std::map<uint32_t, const std::weak_ptr<Accessor::Impl>> &accessors,
        std::mutex &mutex,
        std::condition_variable &cv,
        bool &ready) {
    constexpr uint32_t NUM_SPIN_TO_INCREASE_SLEEP = 1024;
    constexpr uint32_t NUM_SPIN_TO_LOG = 1024*8;
    constexpr useconds_t MAX_SLEEP_US = 10000;
    uint32_t numSpin = 0;
    useconds_t sleepUs = 1;

    while (true) {
        std::map<uint32_t, const std::weak_ptr<Accessor::Impl>> copied;
        {
            std::unique_lock<std::mutex> lock(mutex);
            if (!ready) {
                numSpin = 0;
                sleepUs = 1;
                cv.wait(lock);
            }
            copied.insert(accessors.begin(), accessors.end());
        }
        std::list<ConnectionId> erased;
        for (auto it = copied.begin(); it != copied.end(); ++it) {
            const std::shared_ptr<Accessor::Impl> impl = it->second.lock();
            if (!impl) {
                erased.push_back(it->first);
            } else {
                impl->handleInvalidateAck();
            }
        }
        {
            std::unique_lock<std::mutex> lock(mutex);
            for (auto it = erased.begin(); it != erased.end(); ++it) {
                accessors.erase(*it);
            }
            if (accessors.size() == 0) {
                ready = false;
            } else {
                // TODO: Use an efficient way to wait over the FMQ.
                // N.B.: Since there is no efficient way to wait on the FMQ,
                // polling it with an increasing back-off is the current way to
                // avoid draining the CPU.
                lock.unlock();
                ++numSpin;
                if (numSpin % NUM_SPIN_TO_INCREASE_SLEEP == 0 &&
                        sleepUs < MAX_SLEEP_US) {
                    sleepUs *= 10;
                }
                if (numSpin % NUM_SPIN_TO_LOG == 0) {
                    ALOGW("invalidator thread spinning");
                }
                ::usleep(sleepUs);
            }
        }
    }
}
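// For illustration, tracing the back-off above with hypothetical spin counts: sleepUs
// starts at 1 us, is multiplied by 10 after every 1024 consecutive polls
// (1 -> 10 -> 100 -> 1000 -> 10000 us) and is then capped by MAX_SLEEP_US (10 ms); a
// warning is logged every 8192 polls, and both counters reset once the thread parks
// on the condition variable again.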

Accessor::Impl::AccessorInvalidator::AccessorInvalidator() : mReady(false) {
    std::thread invalidator(
            invalidatorThread,
            std::ref(mAccessors),
            std::ref(mMutex),
            std::ref(mCv),
            std::ref(mReady));
    invalidator.detach();
}

void Accessor::Impl::AccessorInvalidator::addAccessor(
        uint32_t accessorId, const std::weak_ptr<Accessor::Impl> &impl) {
    bool notify = false;
    std::unique_lock<std::mutex> lock(mMutex);
    if (mAccessors.find(accessorId) == mAccessors.end()) {
        if (!mReady) {
            mReady = true;
            notify = true;
        }
        mAccessors.insert(std::make_pair(accessorId, impl));
        ALOGV("buffer invalidation added bp:%u %d", accessorId, notify);
    }
    lock.unlock();
    if (notify) {
        mCv.notify_one();
    }
}

void Accessor::Impl::AccessorInvalidator::delAccessor(uint32_t accessorId) {
    std::lock_guard<std::mutex> lock(mMutex);
    mAccessors.erase(accessorId);
    ALOGV("buffer invalidation deleted bp:%u", accessorId);
    if (mAccessors.size() == 0) {
        mReady = false;
    }
}

std::unique_ptr<Accessor::Impl::AccessorInvalidator> Accessor::Impl::sInvalidator;

void Accessor::Impl::createInvalidator() {
    if (!sInvalidator) {
        sInvalidator = std::make_unique<Accessor::Impl::AccessorInvalidator>();
    }
}

void Accessor::Impl::evictorThread(
        std::map<const std::weak_ptr<Accessor::Impl>, nsecs_t, std::owner_less<>> &accessors,
        std::mutex &mutex,
        std::condition_variable &cv) {
    std::list<const std::weak_ptr<Accessor::Impl>> evictList;
    while (true) {
        int expired = 0;
        int evicted = 0;
        {
            nsecs_t now = systemTime();
            std::unique_lock<std::mutex> lock(mutex);
            if (accessors.size() == 0) {
                cv.wait(lock);
            }
            auto it = accessors.begin();
            while (it != accessors.end()) {
                if (now > (it->second + kEvictDurationNs)) {
                    ++expired;
                    evictList.push_back(it->first);
                    it = accessors.erase(it);
                } else {
                    ++it;
                }
            }
        }
        // evict idle accessors
        for (auto it = evictList.begin(); it != evictList.end(); ++it) {
            const std::shared_ptr<Accessor::Impl> accessor = it->lock();
            if (accessor) {
                accessor->cleanUp(true);
                ++evicted;
            }
        }
        if (expired > 0) {
            ALOGD("evictor expired: %d, evicted: %d", expired, evicted);
        }
        evictList.clear();
        ::usleep(kEvictGranularityNs / 1000);
    }
}

Accessor::Impl::AccessorEvictor::AccessorEvictor() {
    std::thread evictor(
            evictorThread,
            std::ref(mAccessors),
            std::ref(mMutex),
            std::ref(mCv));
    evictor.detach();
}

void Accessor::Impl::AccessorEvictor::addAccessor(
        const std::weak_ptr<Accessor::Impl> &impl, nsecs_t ts) {
    std::lock_guard<std::mutex> lock(mMutex);
    bool notify = mAccessors.empty();
    auto it = mAccessors.find(impl);
    if (it == mAccessors.end()) {
        mAccessors.emplace(impl, ts);
    } else {
        it->second = ts;
    }
    if (notify) {
        mCv.notify_one();
    }
}

std::unique_ptr<Accessor::Impl::AccessorEvictor> Accessor::Impl::sEvictor;

void Accessor::Impl::createEvictor() {
    if (!sEvictor) {
        sEvictor = std::make_unique<Accessor::Impl::AccessorEvictor>();
    }
}

void Accessor::Impl::scheduleEvictIfNeeded() {
    nsecs_t now = systemTime();

    if (now > (mScheduleEvictTs + kEvictGranularityNs)) {
        mScheduleEvictTs = now;
        sEvictor->addAccessor(shared_from_this(), now);
    }
}

} // namespace implementation
} // namespace V2_0
} // namespace bufferpool
} // namespace media
} // namespace hardware
} // namespace android