1 /*
2 * Copyright (C) 2018 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #define LOG_TAG "BufferPoolAccessor"
18 //#define LOG_NDEBUG 0
19
20 #include <sys/types.h>
21 #include <time.h>
22 #include <unistd.h>
23 #include <utils/Log.h>
24 #include "AccessorImpl.h"
25 #include "Connection.h"
26
27 namespace android {
28 namespace hardware {
29 namespace media {
30 namespace bufferpool {
31 namespace V1_0 {
32 namespace implementation {
33
namespace {
// Minimum time between two periodic clean-up passes of the buffer pool.
constexpr int64_t kCleanUpDurationUs = 500000; // TODO tune 0.5 sec
// Minimum time between two statistics log lines emitted during clean-up.
constexpr int64_t kLogDurationUs = 5000000; // 5 secs

// Periodic clean-up evicts free buffers only once the cached size or the
// buffer count reaches one of these thresholds.
constexpr size_t kMinAllocBytesForEviction = 15 * 1024 * 1024;
constexpr size_t kMinBufferCountForEviction = 40;
}  // namespace
41
42 // Buffer structure in bufferpool process
43 struct InternalBuffer {
44 BufferId mId;
45 size_t mOwnerCount;
46 size_t mTransactionCount;
47 const std::shared_ptr<BufferPoolAllocation> mAllocation;
48 const size_t mAllocSize;
49 const std::vector<uint8_t> mConfig;
50
InternalBufferandroid::hardware::media::bufferpool::V1_0::implementation::InternalBuffer51 InternalBuffer(
52 BufferId id,
53 const std::shared_ptr<BufferPoolAllocation> &alloc,
54 const size_t allocSize,
55 const std::vector<uint8_t> &allocConfig)
56 : mId(id), mOwnerCount(0), mTransactionCount(0),
57 mAllocation(alloc), mAllocSize(allocSize), mConfig(allocConfig) {}
58
handleandroid::hardware::media::bufferpool::V1_0::implementation::InternalBuffer59 const native_handle_t *handle() {
60 return mAllocation->handle();
61 }
62 };
63
64 struct TransactionStatus {
65 TransactionId mId;
66 BufferId mBufferId;
67 ConnectionId mSender;
68 ConnectionId mReceiver;
69 BufferStatus mStatus;
70 int64_t mTimestampUs;
71 bool mSenderValidated;
72
TransactionStatusandroid::hardware::media::bufferpool::V1_0::implementation::TransactionStatus73 TransactionStatus(const BufferStatusMessage &message, int64_t timestampUs) {
74 mId = message.transactionId;
75 mBufferId = message.bufferId;
76 mStatus = message.newStatus;
77 mTimestampUs = timestampUs;
78 if (mStatus == BufferStatus::TRANSFER_TO) {
79 mSender = message.connectionId;
80 mReceiver = message.targetConnectionId;
81 mSenderValidated = true;
82 } else {
83 mSender = -1LL;
84 mReceiver = message.connectionId;
85 mSenderValidated = false;
86 }
87 }
88 };
89
90 // Helper template methods for handling map of set.
/**
 * Adds |value| to the set stored under |key|, creating the set if needed.
 *
 * @param mapOfSet map to update; must not be null.
 * @param key      key whose set receives the value.
 * @param value    value to add.
 * @return true if the value was newly added, false if it was already present.
 */
template<class T, class U>
bool insert(std::map<T, std::set<U>> *mapOfSet, T key, U value) {
    // operator[] creates an empty set for a new key; set::insert reports
    // whether the value was actually added. One lookup, no temporary set copy.
    return (*mapOfSet)[key].insert(value).second;
}
104
/**
 * Removes |value| from the set stored under |key|. A set left empty is
 * removed from the map together with its key.
 *
 * @return true if the value was present and has been removed.
 */
template<class T, class U>
bool erase(std::map<T, std::set<U>> *mapOfSet, T key, U value) {
    const auto entry = mapOfSet->find(key);
    if (entry == mapOfSet->end()) {
        return false;
    }
    const bool removed = entry->second.erase(value) != 0;
    if (entry->second.empty()) {
        mapOfSet->erase(entry);
    }
    return removed;
}
119
/**
 * Checks whether |value| is a member of the set stored under |key|.
 *
 * @return true only if the key exists and its set holds the value.
 */
template<class T, class U>
bool contains(std::map<T, std::set<U>> *mapOfSet, T key, U value) {
    const auto entry = mapOfSet->find(key);
    return entry != mapOfSet->end() && entry->second.count(value) != 0;
}
129
130 int32_t Accessor::Impl::sPid = getpid();
131 uint32_t Accessor::Impl::sSeqId = time(nullptr);
132
Impl(const std::shared_ptr<BufferPoolAllocator> & allocator)133 Accessor::Impl::Impl(
134 const std::shared_ptr<BufferPoolAllocator> &allocator)
135 : mAllocator(allocator) {}
136
~Impl()137 Accessor::Impl::~Impl() {
138 }
139
connect(const sp<Accessor> & accessor,sp<Connection> * connection,ConnectionId * pConnectionId,const QueueDescriptor ** fmqDescPtr)140 ResultStatus Accessor::Impl::connect(
141 const sp<Accessor> &accessor, sp<Connection> *connection,
142 ConnectionId *pConnectionId, const QueueDescriptor** fmqDescPtr) {
143 sp<Connection> newConnection = new Connection();
144 ResultStatus status = ResultStatus::CRITICAL_ERROR;
145 {
146 std::lock_guard<std::mutex> lock(mBufferPool.mMutex);
147 if (newConnection) {
148 ConnectionId id = (int64_t)sPid << 32 | sSeqId;
149 status = mBufferPool.mObserver.open(id, fmqDescPtr);
150 if (status == ResultStatus::OK) {
151 newConnection->initialize(accessor, id);
152 *connection = newConnection;
153 *pConnectionId = id;
154 ++sSeqId;
155 }
156 }
157 mBufferPool.processStatusMessages();
158 mBufferPool.cleanUp();
159 }
160 return status;
161 }
162
close(ConnectionId connectionId)163 ResultStatus Accessor::Impl::close(ConnectionId connectionId) {
164 std::lock_guard<std::mutex> lock(mBufferPool.mMutex);
165 mBufferPool.processStatusMessages();
166 mBufferPool.handleClose(connectionId);
167 mBufferPool.mObserver.close(connectionId);
168 // Since close# will be called after all works are finished, it is OK to
169 // evict unused buffers.
170 mBufferPool.cleanUp(true);
171 return ResultStatus::OK;
172 }
173
allocate(ConnectionId connectionId,const std::vector<uint8_t> & params,BufferId * bufferId,const native_handle_t ** handle)174 ResultStatus Accessor::Impl::allocate(
175 ConnectionId connectionId, const std::vector<uint8_t>& params,
176 BufferId *bufferId, const native_handle_t** handle) {
177 std::unique_lock<std::mutex> lock(mBufferPool.mMutex);
178 mBufferPool.processStatusMessages();
179 ResultStatus status = ResultStatus::OK;
180 if (!mBufferPool.getFreeBuffer(mAllocator, params, bufferId, handle)) {
181 lock.unlock();
182 std::shared_ptr<BufferPoolAllocation> alloc;
183 size_t allocSize;
184 status = mAllocator->allocate(params, &alloc, &allocSize);
185 lock.lock();
186 if (status == ResultStatus::OK) {
187 status = mBufferPool.addNewBuffer(alloc, allocSize, params, bufferId, handle);
188 }
189 ALOGV("create a buffer %d : %u %p",
190 status == ResultStatus::OK, *bufferId, *handle);
191 }
192 if (status == ResultStatus::OK) {
193 // TODO: handle ownBuffer failure
194 mBufferPool.handleOwnBuffer(connectionId, *bufferId);
195 }
196 mBufferPool.cleanUp();
197 return status;
198 }
199
fetch(ConnectionId connectionId,TransactionId transactionId,BufferId bufferId,const native_handle_t ** handle)200 ResultStatus Accessor::Impl::fetch(
201 ConnectionId connectionId, TransactionId transactionId,
202 BufferId bufferId, const native_handle_t** handle) {
203 std::lock_guard<std::mutex> lock(mBufferPool.mMutex);
204 mBufferPool.processStatusMessages();
205 auto found = mBufferPool.mTransactions.find(transactionId);
206 if (found != mBufferPool.mTransactions.end() &&
207 contains(&mBufferPool.mPendingTransactions,
208 connectionId, transactionId)) {
209 if (found->second->mSenderValidated &&
210 found->second->mStatus == BufferStatus::TRANSFER_FROM &&
211 found->second->mBufferId == bufferId) {
212 found->second->mStatus = BufferStatus::TRANSFER_FETCH;
213 auto bufferIt = mBufferPool.mBuffers.find(bufferId);
214 if (bufferIt != mBufferPool.mBuffers.end()) {
215 mBufferPool.mStats.onBufferFetched();
216 *handle = bufferIt->second->handle();
217 return ResultStatus::OK;
218 }
219 }
220 }
221 mBufferPool.cleanUp();
222 return ResultStatus::CRITICAL_ERROR;
223 }
224
cleanUp(bool clearCache)225 void Accessor::Impl::cleanUp(bool clearCache) {
226 // transaction timeout, buffer cacheing TTL handling
227 std::lock_guard<std::mutex> lock(mBufferPool.mMutex);
228 mBufferPool.processStatusMessages();
229 mBufferPool.cleanUp(clearCache);
230 }
231
BufferPool()232 Accessor::Impl::Impl::BufferPool::BufferPool()
233 : mTimestampUs(getTimestampNow()),
234 mLastCleanUpUs(mTimestampUs),
235 mLastLogUs(mTimestampUs),
236 mSeq(0) {}
237
238
// Statistics helper.
// Returns |base| as a percentage of |total|, rounded to the nearest integer
// by adding 0.5 before truncation. A zero |total| yields 0.
template<typename T, typename S>
int percentage(T base, S total) {
    if (!total) {
        return 0;
    }
    const auto ratio = 100. * static_cast<S>(base) / total;
    return int(0.5 + ratio);
}
244
~BufferPool()245 Accessor::Impl::Impl::BufferPool::~BufferPool() {
246 std::lock_guard<std::mutex> lock(mMutex);
247 ALOGD("Destruction - bufferpool %p "
248 "cached: %zu/%zuM, %zu/%d%% in use; "
249 "allocs: %zu, %d%% recycled; "
250 "transfers: %zu, %d%% unfetced",
251 this, mStats.mBuffersCached, mStats.mSizeCached >> 20,
252 mStats.mBuffersInUse, percentage(mStats.mBuffersInUse, mStats.mBuffersCached),
253 mStats.mTotalAllocations, percentage(mStats.mTotalRecycles, mStats.mTotalAllocations),
254 mStats.mTotalTransfers,
255 percentage(mStats.mTotalTransfers - mStats.mTotalFetches, mStats.mTotalTransfers));
256 }
257
handleOwnBuffer(ConnectionId connectionId,BufferId bufferId)258 bool Accessor::Impl::BufferPool::handleOwnBuffer(
259 ConnectionId connectionId, BufferId bufferId) {
260
261 bool added = insert(&mUsingBuffers, connectionId, bufferId);
262 if (added) {
263 auto iter = mBuffers.find(bufferId);
264 iter->second->mOwnerCount++;
265 }
266 insert(&mUsingConnections, bufferId, connectionId);
267 return added;
268 }
269
handleReleaseBuffer(ConnectionId connectionId,BufferId bufferId)270 bool Accessor::Impl::BufferPool::handleReleaseBuffer(
271 ConnectionId connectionId, BufferId bufferId) {
272 bool deleted = erase(&mUsingBuffers, connectionId, bufferId);
273 if (deleted) {
274 auto iter = mBuffers.find(bufferId);
275 iter->second->mOwnerCount--;
276 if (iter->second->mOwnerCount == 0 &&
277 iter->second->mTransactionCount == 0) {
278 mStats.onBufferUnused(iter->second->mAllocSize);
279 mFreeBuffers.insert(bufferId);
280 }
281 }
282 erase(&mUsingConnections, bufferId, connectionId);
283 ALOGV("release buffer %u : %d", bufferId, deleted);
284 return deleted;
285 }
286
handleTransferTo(const BufferStatusMessage & message)287 bool Accessor::Impl::BufferPool::handleTransferTo(const BufferStatusMessage &message) {
288 auto completed = mCompletedTransactions.find(
289 message.transactionId);
290 if (completed != mCompletedTransactions.end()) {
291 // already completed
292 mCompletedTransactions.erase(completed);
293 return true;
294 }
295 // the buffer should exist and be owned.
296 auto bufferIter = mBuffers.find(message.bufferId);
297 if (bufferIter == mBuffers.end() ||
298 !contains(&mUsingBuffers, message.connectionId, message.bufferId)) {
299 return false;
300 }
301 auto found = mTransactions.find(message.transactionId);
302 if (found != mTransactions.end()) {
303 // transfer_from was received earlier.
304 found->second->mSender = message.connectionId;
305 found->second->mSenderValidated = true;
306 return true;
307 }
308 // TODO: verify there is target connection Id
309 mStats.onBufferSent();
310 mTransactions.insert(std::make_pair(
311 message.transactionId,
312 std::make_unique<TransactionStatus>(message, mTimestampUs)));
313 insert(&mPendingTransactions, message.targetConnectionId,
314 message.transactionId);
315 bufferIter->second->mTransactionCount++;
316 return true;
317 }
318
handleTransferFrom(const BufferStatusMessage & message)319 bool Accessor::Impl::BufferPool::handleTransferFrom(const BufferStatusMessage &message) {
320 auto found = mTransactions.find(message.transactionId);
321 if (found == mTransactions.end()) {
322 // TODO: is it feasible to check ownership here?
323 mStats.onBufferSent();
324 mTransactions.insert(std::make_pair(
325 message.transactionId,
326 std::make_unique<TransactionStatus>(message, mTimestampUs)));
327 insert(&mPendingTransactions, message.connectionId,
328 message.transactionId);
329 auto bufferIter = mBuffers.find(message.bufferId);
330 bufferIter->second->mTransactionCount++;
331 } else {
332 if (message.connectionId == found->second->mReceiver) {
333 found->second->mStatus = BufferStatus::TRANSFER_FROM;
334 }
335 }
336 return true;
337 }
338
handleTransferResult(const BufferStatusMessage & message)339 bool Accessor::Impl::BufferPool::handleTransferResult(const BufferStatusMessage &message) {
340 auto found = mTransactions.find(message.transactionId);
341 if (found != mTransactions.end()) {
342 bool deleted = erase(&mPendingTransactions, message.connectionId,
343 message.transactionId);
344 if (deleted) {
345 if (!found->second->mSenderValidated) {
346 mCompletedTransactions.insert(message.transactionId);
347 }
348 auto bufferIter = mBuffers.find(message.bufferId);
349 if (message.newStatus == BufferStatus::TRANSFER_OK) {
350 handleOwnBuffer(message.connectionId, message.bufferId);
351 }
352 bufferIter->second->mTransactionCount--;
353 if (bufferIter->second->mOwnerCount == 0
354 && bufferIter->second->mTransactionCount == 0) {
355 mStats.onBufferUnused(bufferIter->second->mAllocSize);
356 mFreeBuffers.insert(message.bufferId);
357 }
358 mTransactions.erase(found);
359 }
360 ALOGV("transfer finished %llu %u - %d", (unsigned long long)message.transactionId,
361 message.bufferId, deleted);
362 return deleted;
363 }
364 ALOGV("transfer not found %llu %u", (unsigned long long)message.transactionId,
365 message.bufferId);
366 return false;
367 }
368
processStatusMessages()369 void Accessor::Impl::BufferPool::processStatusMessages() {
370 std::vector<BufferStatusMessage> messages;
371 mObserver.getBufferStatusChanges(messages);
372 mTimestampUs = getTimestampNow();
373 for (BufferStatusMessage& message: messages) {
374 bool ret = false;
375 switch (message.newStatus) {
376 case BufferStatus::NOT_USED:
377 ret = handleReleaseBuffer(
378 message.connectionId, message.bufferId);
379 break;
380 case BufferStatus::USED:
381 // not happening
382 break;
383 case BufferStatus::TRANSFER_TO:
384 ret = handleTransferTo(message);
385 break;
386 case BufferStatus::TRANSFER_FROM:
387 ret = handleTransferFrom(message);
388 break;
389 case BufferStatus::TRANSFER_TIMEOUT:
390 // TODO
391 break;
392 case BufferStatus::TRANSFER_LOST:
393 // TODO
394 break;
395 case BufferStatus::TRANSFER_FETCH:
396 // not happening
397 break;
398 case BufferStatus::TRANSFER_OK:
399 case BufferStatus::TRANSFER_ERROR:
400 ret = handleTransferResult(message);
401 break;
402 }
403 if (ret == false) {
404 ALOGW("buffer status message processing failure - message : %d connection : %lld",
405 message.newStatus, (long long)message.connectionId);
406 }
407 }
408 messages.clear();
409 }
410
handleClose(ConnectionId connectionId)411 bool Accessor::Impl::BufferPool::handleClose(ConnectionId connectionId) {
412 // Cleaning buffers
413 auto buffers = mUsingBuffers.find(connectionId);
414 if (buffers != mUsingBuffers.end()) {
415 for (const BufferId& bufferId : buffers->second) {
416 bool deleted = erase(&mUsingConnections, bufferId, connectionId);
417 if (deleted) {
418 auto bufferIter = mBuffers.find(bufferId);
419 bufferIter->second->mOwnerCount--;
420 if (bufferIter->second->mOwnerCount == 0 &&
421 bufferIter->second->mTransactionCount == 0) {
422 // TODO: handle freebuffer insert fail
423 mStats.onBufferUnused(bufferIter->second->mAllocSize);
424 mFreeBuffers.insert(bufferId);
425 }
426 }
427 }
428 mUsingBuffers.erase(buffers);
429 }
430
431 // Cleaning transactions
432 auto pending = mPendingTransactions.find(connectionId);
433 if (pending != mPendingTransactions.end()) {
434 for (const TransactionId& transactionId : pending->second) {
435 auto iter = mTransactions.find(transactionId);
436 if (iter != mTransactions.end()) {
437 if (!iter->second->mSenderValidated) {
438 mCompletedTransactions.insert(transactionId);
439 }
440 BufferId bufferId = iter->second->mBufferId;
441 auto bufferIter = mBuffers.find(bufferId);
442 bufferIter->second->mTransactionCount--;
443 if (bufferIter->second->mOwnerCount == 0 &&
444 bufferIter->second->mTransactionCount == 0) {
445 // TODO: handle freebuffer insert fail
446 mStats.onBufferUnused(bufferIter->second->mAllocSize);
447 mFreeBuffers.insert(bufferId);
448 }
449 mTransactions.erase(iter);
450 }
451 }
452 }
453 return true;
454 }
455
getFreeBuffer(const std::shared_ptr<BufferPoolAllocator> & allocator,const std::vector<uint8_t> & params,BufferId * pId,const native_handle_t ** handle)456 bool Accessor::Impl::BufferPool::getFreeBuffer(
457 const std::shared_ptr<BufferPoolAllocator> &allocator,
458 const std::vector<uint8_t> ¶ms, BufferId *pId,
459 const native_handle_t** handle) {
460 auto bufferIt = mFreeBuffers.begin();
461 for (;bufferIt != mFreeBuffers.end(); ++bufferIt) {
462 BufferId bufferId = *bufferIt;
463 if (allocator->compatible(params, mBuffers[bufferId]->mConfig)) {
464 break;
465 }
466 }
467 if (bufferIt != mFreeBuffers.end()) {
468 BufferId id = *bufferIt;
469 mFreeBuffers.erase(bufferIt);
470 mStats.onBufferRecycled(mBuffers[id]->mAllocSize);
471 *handle = mBuffers[id]->handle();
472 *pId = id;
473 ALOGV("recycle a buffer %u %p", id, *handle);
474 return true;
475 }
476 return false;
477 }
478
addNewBuffer(const std::shared_ptr<BufferPoolAllocation> & alloc,const size_t allocSize,const std::vector<uint8_t> & params,BufferId * pId,const native_handle_t ** handle)479 ResultStatus Accessor::Impl::BufferPool::addNewBuffer(
480 const std::shared_ptr<BufferPoolAllocation> &alloc,
481 const size_t allocSize,
482 const std::vector<uint8_t> ¶ms,
483 BufferId *pId,
484 const native_handle_t** handle) {
485
486 BufferId bufferId = mSeq++;
487 if (mSeq == Connection::SYNC_BUFFERID) {
488 mSeq = 0;
489 }
490 std::unique_ptr<InternalBuffer> buffer =
491 std::make_unique<InternalBuffer>(
492 bufferId, alloc, allocSize, params);
493 if (buffer) {
494 auto res = mBuffers.insert(std::make_pair(
495 bufferId, std::move(buffer)));
496 if (res.second) {
497 mStats.onBufferAllocated(allocSize);
498 *handle = alloc->handle();
499 *pId = bufferId;
500 return ResultStatus::OK;
501 }
502 }
503 return ResultStatus::NO_MEMORY;
504 }
505
cleanUp(bool clearCache)506 void Accessor::Impl::BufferPool::cleanUp(bool clearCache) {
507 if (clearCache || mTimestampUs > mLastCleanUpUs + kCleanUpDurationUs) {
508 mLastCleanUpUs = mTimestampUs;
509 if (mTimestampUs > mLastLogUs + kLogDurationUs) {
510 mLastLogUs = mTimestampUs;
511 ALOGD("bufferpool %p : %zu(%zu size) total buffers - "
512 "%zu(%zu size) used buffers - %zu/%zu (recycle/alloc) - "
513 "%zu/%zu (fetch/transfer)",
514 this, mStats.mBuffersCached, mStats.mSizeCached,
515 mStats.mBuffersInUse, mStats.mSizeInUse,
516 mStats.mTotalRecycles, mStats.mTotalAllocations,
517 mStats.mTotalFetches, mStats.mTotalTransfers);
518 }
519 for (auto freeIt = mFreeBuffers.begin(); freeIt != mFreeBuffers.end();) {
520 if (!clearCache && mStats.mSizeCached < kMinAllocBytesForEviction
521 && mBuffers.size() < kMinBufferCountForEviction) {
522 break;
523 }
524 auto it = mBuffers.find(*freeIt);
525 if (it != mBuffers.end() &&
526 it->second->mOwnerCount == 0 && it->second->mTransactionCount == 0) {
527 mStats.onBufferEvicted(it->second->mAllocSize);
528 mBuffers.erase(it);
529 freeIt = mFreeBuffers.erase(freeIt);
530 } else {
531 ++freeIt;
532 ALOGW("bufferpool inconsistent!");
533 }
534 }
535 }
536 }
537
538 } // namespace implementation
539 } // namespace V1_0
540 } // namespace bufferpool
541 } // namespace media
542 } // namespace hardware
543 } // namespace android
544