1 /*
2  * Copyright (C) 2018 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #define LOG_TAG "BufferPoolAccessor"
18 //#define LOG_NDEBUG 0
19 
20 #include <sys/types.h>
21 #include <time.h>
22 #include <unistd.h>
23 #include <utils/Log.h>
24 #include "AccessorImpl.h"
25 #include "Connection.h"
26 
27 namespace android {
28 namespace hardware {
29 namespace media {
30 namespace bufferpool {
31 namespace V1_0 {
32 namespace implementation {
33 
namespace {
    // Minimum time between two clean-up passes of the pool (0.5 s).
    // TODO: tune this value.
    constexpr int64_t kCleanUpDurationUs = 500 * 1000;
    // Minimum time between two periodic statistics log lines (5 s).
    constexpr int64_t kLogDurationUs = 5 * 1000 * 1000;

    // A regular (non-forced) clean-up stops evicting free buffers once the
    // cached size is below this many bytes AND the buffer count is below
    // kMinBufferCountForEviction.
    constexpr size_t kMinAllocBytesForEviction = 15 * 1024 * 1024;
    constexpr size_t kMinBufferCountForEviction = 40;
}
41 
42 // Buffer structure in bufferpool process
43 struct InternalBuffer {
44     BufferId mId;
45     size_t mOwnerCount;
46     size_t mTransactionCount;
47     const std::shared_ptr<BufferPoolAllocation> mAllocation;
48     const size_t mAllocSize;
49     const std::vector<uint8_t> mConfig;
50 
InternalBufferandroid::hardware::media::bufferpool::V1_0::implementation::InternalBuffer51     InternalBuffer(
52             BufferId id,
53             const std::shared_ptr<BufferPoolAllocation> &alloc,
54             const size_t allocSize,
55             const std::vector<uint8_t> &allocConfig)
56             : mId(id), mOwnerCount(0), mTransactionCount(0),
57             mAllocation(alloc), mAllocSize(allocSize), mConfig(allocConfig) {}
58 
handleandroid::hardware::media::bufferpool::V1_0::implementation::InternalBuffer59     const native_handle_t *handle() {
60         return mAllocation->handle();
61     }
62 };
63 
64 struct TransactionStatus {
65     TransactionId mId;
66     BufferId mBufferId;
67     ConnectionId mSender;
68     ConnectionId mReceiver;
69     BufferStatus mStatus;
70     int64_t mTimestampUs;
71     bool mSenderValidated;
72 
TransactionStatusandroid::hardware::media::bufferpool::V1_0::implementation::TransactionStatus73     TransactionStatus(const BufferStatusMessage &message, int64_t timestampUs) {
74         mId = message.transactionId;
75         mBufferId = message.bufferId;
76         mStatus = message.newStatus;
77         mTimestampUs = timestampUs;
78         if (mStatus == BufferStatus::TRANSFER_TO) {
79             mSender = message.connectionId;
80             mReceiver = message.targetConnectionId;
81             mSenderValidated = true;
82         } else {
83             mSender = -1LL;
84             mReceiver = message.connectionId;
85             mSenderValidated = false;
86         }
87     }
88 };
89 
// Helper templates for working with a map whose values are sets.

// Adds `value` to the set stored under `key`, creating the set if needed.
// Returns true when the value was newly added, false when it was already
// present.
template<class T, class U>
bool insert(std::map<T, std::set<U>> *mapOfSet, T key, U value) {
    // operator[] creates an empty set for an unseen key; set::insert reports
    // through `.second` whether the value was actually added.
    return (*mapOfSet)[key].insert(value).second;
}
104 
// Removes `value` from the set stored under `key`. The key itself is dropped
// when its set ends up empty. Returns true when a value was actually removed.
template<class T, class U>
bool erase(std::map<T, std::set<U>> *mapOfSet, T key, U value) {
    const auto entry = mapOfSet->find(key);
    if (entry == mapOfSet->end()) {
        return false;
    }
    std::set<U> &values = entry->second;
    const bool removed = values.erase(value) != 0;
    if (values.empty()) {
        mapOfSet->erase(entry);
    }
    return removed;
}
119 
// Returns true when `key` is present and its set holds `value`.
template<class T, class U>
bool contains(std::map<T, std::set<U>> *mapOfSet, T key, U value) {
    const auto entry = mapOfSet->find(key);
    return entry != mapOfSet->end() && entry->second.count(value) != 0;
}
129 
130 int32_t Accessor::Impl::sPid = getpid();
131 uint32_t Accessor::Impl::sSeqId = time(nullptr);
132 
Impl(const std::shared_ptr<BufferPoolAllocator> & allocator)133 Accessor::Impl::Impl(
134         const std::shared_ptr<BufferPoolAllocator> &allocator)
135         : mAllocator(allocator) {}
136 
~Impl()137 Accessor::Impl::~Impl() {
138 }
139 
connect(const sp<Accessor> & accessor,sp<Connection> * connection,ConnectionId * pConnectionId,const QueueDescriptor ** fmqDescPtr)140 ResultStatus Accessor::Impl::connect(
141         const sp<Accessor> &accessor, sp<Connection> *connection,
142         ConnectionId *pConnectionId, const QueueDescriptor** fmqDescPtr) {
143     sp<Connection> newConnection = new Connection();
144     ResultStatus status = ResultStatus::CRITICAL_ERROR;
145     {
146         std::lock_guard<std::mutex> lock(mBufferPool.mMutex);
147         if (newConnection) {
148             ConnectionId id = (int64_t)sPid << 32 | sSeqId;
149             status = mBufferPool.mObserver.open(id, fmqDescPtr);
150             if (status == ResultStatus::OK) {
151                 newConnection->initialize(accessor, id);
152                 *connection = newConnection;
153                 *pConnectionId = id;
154                 ++sSeqId;
155             }
156         }
157         mBufferPool.processStatusMessages();
158         mBufferPool.cleanUp();
159     }
160     return status;
161 }
162 
close(ConnectionId connectionId)163 ResultStatus Accessor::Impl::close(ConnectionId connectionId) {
164     std::lock_guard<std::mutex> lock(mBufferPool.mMutex);
165     mBufferPool.processStatusMessages();
166     mBufferPool.handleClose(connectionId);
167     mBufferPool.mObserver.close(connectionId);
168     // Since close# will be called after all works are finished, it is OK to
169     // evict unused buffers.
170     mBufferPool.cleanUp(true);
171     return ResultStatus::OK;
172 }
173 
allocate(ConnectionId connectionId,const std::vector<uint8_t> & params,BufferId * bufferId,const native_handle_t ** handle)174 ResultStatus Accessor::Impl::allocate(
175         ConnectionId connectionId, const std::vector<uint8_t>& params,
176         BufferId *bufferId, const native_handle_t** handle) {
177     std::unique_lock<std::mutex> lock(mBufferPool.mMutex);
178     mBufferPool.processStatusMessages();
179     ResultStatus status = ResultStatus::OK;
180     if (!mBufferPool.getFreeBuffer(mAllocator, params, bufferId, handle)) {
181         lock.unlock();
182         std::shared_ptr<BufferPoolAllocation> alloc;
183         size_t allocSize;
184         status = mAllocator->allocate(params, &alloc, &allocSize);
185         lock.lock();
186         if (status == ResultStatus::OK) {
187             status = mBufferPool.addNewBuffer(alloc, allocSize, params, bufferId, handle);
188         }
189         ALOGV("create a buffer %d : %u %p",
190               status == ResultStatus::OK, *bufferId, *handle);
191     }
192     if (status == ResultStatus::OK) {
193         // TODO: handle ownBuffer failure
194         mBufferPool.handleOwnBuffer(connectionId, *bufferId);
195     }
196     mBufferPool.cleanUp();
197     return status;
198 }
199 
fetch(ConnectionId connectionId,TransactionId transactionId,BufferId bufferId,const native_handle_t ** handle)200 ResultStatus Accessor::Impl::fetch(
201         ConnectionId connectionId, TransactionId transactionId,
202         BufferId bufferId, const native_handle_t** handle) {
203     std::lock_guard<std::mutex> lock(mBufferPool.mMutex);
204     mBufferPool.processStatusMessages();
205     auto found = mBufferPool.mTransactions.find(transactionId);
206     if (found != mBufferPool.mTransactions.end() &&
207             contains(&mBufferPool.mPendingTransactions,
208                      connectionId, transactionId)) {
209         if (found->second->mSenderValidated &&
210                 found->second->mStatus == BufferStatus::TRANSFER_FROM &&
211                 found->second->mBufferId == bufferId) {
212             found->second->mStatus = BufferStatus::TRANSFER_FETCH;
213             auto bufferIt = mBufferPool.mBuffers.find(bufferId);
214             if (bufferIt != mBufferPool.mBuffers.end()) {
215                 mBufferPool.mStats.onBufferFetched();
216                 *handle = bufferIt->second->handle();
217                 return ResultStatus::OK;
218             }
219         }
220     }
221     mBufferPool.cleanUp();
222     return ResultStatus::CRITICAL_ERROR;
223 }
224 
cleanUp(bool clearCache)225 void Accessor::Impl::cleanUp(bool clearCache) {
226     // transaction timeout, buffer cacheing TTL handling
227     std::lock_guard<std::mutex> lock(mBufferPool.mMutex);
228     mBufferPool.processStatusMessages();
229     mBufferPool.cleanUp(clearCache);
230 }
231 
BufferPool()232 Accessor::Impl::Impl::BufferPool::BufferPool()
233     : mTimestampUs(getTimestampNow()),
234       mLastCleanUpUs(mTimestampUs),
235       mLastLogUs(mTimestampUs),
236       mSeq(0) {}
237 
238 
// Statistics helper: expresses `base` as a percentage of `total`.
// 0.5 is added before truncating to int; a zero `total` yields 0.
template<typename T, typename S>
int percentage(T base, S total) {
    if (!total) {
        return 0;
    }
    return int(0.5 + 100. * static_cast<S>(base) / total);
}
244 
~BufferPool()245 Accessor::Impl::Impl::BufferPool::~BufferPool() {
246     std::lock_guard<std::mutex> lock(mMutex);
247     ALOGD("Destruction - bufferpool %p "
248           "cached: %zu/%zuM, %zu/%d%% in use; "
249           "allocs: %zu, %d%% recycled; "
250           "transfers: %zu, %d%% unfetced",
251           this, mStats.mBuffersCached, mStats.mSizeCached >> 20,
252           mStats.mBuffersInUse, percentage(mStats.mBuffersInUse, mStats.mBuffersCached),
253           mStats.mTotalAllocations, percentage(mStats.mTotalRecycles, mStats.mTotalAllocations),
254           mStats.mTotalTransfers,
255           percentage(mStats.mTotalTransfers - mStats.mTotalFetches, mStats.mTotalTransfers));
256 }
257 
handleOwnBuffer(ConnectionId connectionId,BufferId bufferId)258 bool Accessor::Impl::BufferPool::handleOwnBuffer(
259         ConnectionId connectionId, BufferId bufferId) {
260 
261     bool added = insert(&mUsingBuffers, connectionId, bufferId);
262     if (added) {
263         auto iter = mBuffers.find(bufferId);
264         iter->second->mOwnerCount++;
265     }
266     insert(&mUsingConnections, bufferId, connectionId);
267     return added;
268 }
269 
handleReleaseBuffer(ConnectionId connectionId,BufferId bufferId)270 bool Accessor::Impl::BufferPool::handleReleaseBuffer(
271         ConnectionId connectionId, BufferId bufferId) {
272     bool deleted = erase(&mUsingBuffers, connectionId, bufferId);
273     if (deleted) {
274         auto iter = mBuffers.find(bufferId);
275         iter->second->mOwnerCount--;
276         if (iter->second->mOwnerCount == 0 &&
277                 iter->second->mTransactionCount == 0) {
278             mStats.onBufferUnused(iter->second->mAllocSize);
279             mFreeBuffers.insert(bufferId);
280         }
281     }
282     erase(&mUsingConnections, bufferId, connectionId);
283     ALOGV("release buffer %u : %d", bufferId, deleted);
284     return deleted;
285 }
286 
handleTransferTo(const BufferStatusMessage & message)287 bool Accessor::Impl::BufferPool::handleTransferTo(const BufferStatusMessage &message) {
288     auto completed = mCompletedTransactions.find(
289             message.transactionId);
290     if (completed != mCompletedTransactions.end()) {
291         // already completed
292         mCompletedTransactions.erase(completed);
293         return true;
294     }
295     // the buffer should exist and be owned.
296     auto bufferIter = mBuffers.find(message.bufferId);
297     if (bufferIter == mBuffers.end() ||
298             !contains(&mUsingBuffers, message.connectionId, message.bufferId)) {
299         return false;
300     }
301     auto found = mTransactions.find(message.transactionId);
302     if (found != mTransactions.end()) {
303         // transfer_from was received earlier.
304         found->second->mSender = message.connectionId;
305         found->second->mSenderValidated = true;
306         return true;
307     }
308     // TODO: verify there is target connection Id
309     mStats.onBufferSent();
310     mTransactions.insert(std::make_pair(
311             message.transactionId,
312             std::make_unique<TransactionStatus>(message, mTimestampUs)));
313     insert(&mPendingTransactions, message.targetConnectionId,
314            message.transactionId);
315     bufferIter->second->mTransactionCount++;
316     return true;
317 }
318 
handleTransferFrom(const BufferStatusMessage & message)319 bool Accessor::Impl::BufferPool::handleTransferFrom(const BufferStatusMessage &message) {
320     auto found = mTransactions.find(message.transactionId);
321     if (found == mTransactions.end()) {
322         // TODO: is it feasible to check ownership here?
323         mStats.onBufferSent();
324         mTransactions.insert(std::make_pair(
325                 message.transactionId,
326                 std::make_unique<TransactionStatus>(message, mTimestampUs)));
327         insert(&mPendingTransactions, message.connectionId,
328                message.transactionId);
329         auto bufferIter = mBuffers.find(message.bufferId);
330         bufferIter->second->mTransactionCount++;
331     } else {
332         if (message.connectionId == found->second->mReceiver) {
333             found->second->mStatus = BufferStatus::TRANSFER_FROM;
334         }
335     }
336     return true;
337 }
338 
handleTransferResult(const BufferStatusMessage & message)339 bool Accessor::Impl::BufferPool::handleTransferResult(const BufferStatusMessage &message) {
340     auto found = mTransactions.find(message.transactionId);
341     if (found != mTransactions.end()) {
342         bool deleted = erase(&mPendingTransactions, message.connectionId,
343                              message.transactionId);
344         if (deleted) {
345             if (!found->second->mSenderValidated) {
346                 mCompletedTransactions.insert(message.transactionId);
347             }
348             auto bufferIter = mBuffers.find(message.bufferId);
349             if (message.newStatus == BufferStatus::TRANSFER_OK) {
350                 handleOwnBuffer(message.connectionId, message.bufferId);
351             }
352             bufferIter->second->mTransactionCount--;
353             if (bufferIter->second->mOwnerCount == 0
354                 && bufferIter->second->mTransactionCount == 0) {
355                 mStats.onBufferUnused(bufferIter->second->mAllocSize);
356                 mFreeBuffers.insert(message.bufferId);
357             }
358             mTransactions.erase(found);
359         }
360         ALOGV("transfer finished %llu %u - %d", (unsigned long long)message.transactionId,
361               message.bufferId, deleted);
362         return deleted;
363     }
364     ALOGV("transfer not found %llu %u", (unsigned long long)message.transactionId,
365           message.bufferId);
366     return false;
367 }
368 
processStatusMessages()369 void Accessor::Impl::BufferPool::processStatusMessages() {
370     std::vector<BufferStatusMessage> messages;
371     mObserver.getBufferStatusChanges(messages);
372     mTimestampUs = getTimestampNow();
373     for (BufferStatusMessage& message: messages) {
374         bool ret = false;
375         switch (message.newStatus) {
376             case BufferStatus::NOT_USED:
377                 ret = handleReleaseBuffer(
378                         message.connectionId, message.bufferId);
379                 break;
380             case BufferStatus::USED:
381                 // not happening
382                 break;
383             case BufferStatus::TRANSFER_TO:
384                 ret = handleTransferTo(message);
385                 break;
386             case BufferStatus::TRANSFER_FROM:
387                 ret = handleTransferFrom(message);
388                 break;
389             case BufferStatus::TRANSFER_TIMEOUT:
390                 // TODO
391                 break;
392             case BufferStatus::TRANSFER_LOST:
393                 // TODO
394                 break;
395             case BufferStatus::TRANSFER_FETCH:
396                 // not happening
397                 break;
398             case BufferStatus::TRANSFER_OK:
399             case BufferStatus::TRANSFER_ERROR:
400                 ret = handleTransferResult(message);
401                 break;
402         }
403         if (ret == false) {
404             ALOGW("buffer status message processing failure - message : %d connection : %lld",
405                   message.newStatus, (long long)message.connectionId);
406         }
407     }
408     messages.clear();
409 }
410 
handleClose(ConnectionId connectionId)411 bool Accessor::Impl::BufferPool::handleClose(ConnectionId connectionId) {
412     // Cleaning buffers
413     auto buffers = mUsingBuffers.find(connectionId);
414     if (buffers != mUsingBuffers.end()) {
415         for (const BufferId& bufferId : buffers->second) {
416             bool deleted = erase(&mUsingConnections, bufferId, connectionId);
417             if (deleted) {
418                 auto bufferIter = mBuffers.find(bufferId);
419                 bufferIter->second->mOwnerCount--;
420                 if (bufferIter->second->mOwnerCount == 0 &&
421                         bufferIter->second->mTransactionCount == 0) {
422                     // TODO: handle freebuffer insert fail
423                     mStats.onBufferUnused(bufferIter->second->mAllocSize);
424                     mFreeBuffers.insert(bufferId);
425                 }
426             }
427         }
428         mUsingBuffers.erase(buffers);
429     }
430 
431     // Cleaning transactions
432     auto pending = mPendingTransactions.find(connectionId);
433     if (pending != mPendingTransactions.end()) {
434         for (const TransactionId& transactionId : pending->second) {
435             auto iter = mTransactions.find(transactionId);
436             if (iter != mTransactions.end()) {
437                 if (!iter->second->mSenderValidated) {
438                     mCompletedTransactions.insert(transactionId);
439                 }
440                 BufferId bufferId = iter->second->mBufferId;
441                 auto bufferIter = mBuffers.find(bufferId);
442                 bufferIter->second->mTransactionCount--;
443                 if (bufferIter->second->mOwnerCount == 0 &&
444                     bufferIter->second->mTransactionCount == 0) {
445                     // TODO: handle freebuffer insert fail
446                     mStats.onBufferUnused(bufferIter->second->mAllocSize);
447                     mFreeBuffers.insert(bufferId);
448                 }
449                 mTransactions.erase(iter);
450             }
451         }
452     }
453     return true;
454 }
455 
getFreeBuffer(const std::shared_ptr<BufferPoolAllocator> & allocator,const std::vector<uint8_t> & params,BufferId * pId,const native_handle_t ** handle)456 bool Accessor::Impl::BufferPool::getFreeBuffer(
457         const std::shared_ptr<BufferPoolAllocator> &allocator,
458         const std::vector<uint8_t> &params, BufferId *pId,
459         const native_handle_t** handle) {
460     auto bufferIt = mFreeBuffers.begin();
461     for (;bufferIt != mFreeBuffers.end(); ++bufferIt) {
462         BufferId bufferId = *bufferIt;
463         if (allocator->compatible(params, mBuffers[bufferId]->mConfig)) {
464             break;
465         }
466     }
467     if (bufferIt != mFreeBuffers.end()) {
468         BufferId id = *bufferIt;
469         mFreeBuffers.erase(bufferIt);
470         mStats.onBufferRecycled(mBuffers[id]->mAllocSize);
471         *handle = mBuffers[id]->handle();
472         *pId = id;
473         ALOGV("recycle a buffer %u %p", id, *handle);
474         return true;
475     }
476     return false;
477 }
478 
addNewBuffer(const std::shared_ptr<BufferPoolAllocation> & alloc,const size_t allocSize,const std::vector<uint8_t> & params,BufferId * pId,const native_handle_t ** handle)479 ResultStatus Accessor::Impl::BufferPool::addNewBuffer(
480         const std::shared_ptr<BufferPoolAllocation> &alloc,
481         const size_t allocSize,
482         const std::vector<uint8_t> &params,
483         BufferId *pId,
484         const native_handle_t** handle) {
485 
486     BufferId bufferId = mSeq++;
487     if (mSeq == Connection::SYNC_BUFFERID) {
488         mSeq = 0;
489     }
490     std::unique_ptr<InternalBuffer> buffer =
491             std::make_unique<InternalBuffer>(
492                     bufferId, alloc, allocSize, params);
493     if (buffer) {
494         auto res = mBuffers.insert(std::make_pair(
495                 bufferId, std::move(buffer)));
496         if (res.second) {
497             mStats.onBufferAllocated(allocSize);
498             *handle = alloc->handle();
499             *pId = bufferId;
500             return ResultStatus::OK;
501         }
502     }
503     return ResultStatus::NO_MEMORY;
504 }
505 
cleanUp(bool clearCache)506 void Accessor::Impl::BufferPool::cleanUp(bool clearCache) {
507     if (clearCache || mTimestampUs > mLastCleanUpUs + kCleanUpDurationUs) {
508         mLastCleanUpUs = mTimestampUs;
509         if (mTimestampUs > mLastLogUs + kLogDurationUs) {
510             mLastLogUs = mTimestampUs;
511             ALOGD("bufferpool %p : %zu(%zu size) total buffers - "
512                   "%zu(%zu size) used buffers - %zu/%zu (recycle/alloc) - "
513                   "%zu/%zu (fetch/transfer)",
514                   this, mStats.mBuffersCached, mStats.mSizeCached,
515                   mStats.mBuffersInUse, mStats.mSizeInUse,
516                   mStats.mTotalRecycles, mStats.mTotalAllocations,
517                   mStats.mTotalFetches, mStats.mTotalTransfers);
518         }
519         for (auto freeIt = mFreeBuffers.begin(); freeIt != mFreeBuffers.end();) {
520             if (!clearCache && mStats.mSizeCached < kMinAllocBytesForEviction
521                     && mBuffers.size() < kMinBufferCountForEviction) {
522                 break;
523             }
524             auto it = mBuffers.find(*freeIt);
525             if (it != mBuffers.end() &&
526                     it->second->mOwnerCount == 0 && it->second->mTransactionCount == 0) {
527                 mStats.onBufferEvicted(it->second->mAllocSize);
528                 mBuffers.erase(it);
529                 freeIt = mFreeBuffers.erase(freeIt);
530             } else {
531                 ++freeIt;
532                 ALOGW("bufferpool inconsistent!");
533             }
534         }
535     }
536 }
537 
538 }  // namespace implementation
539 }  // namespace V1_0
540 }  // namespace bufferpool
541 }  // namespace media
542 }  // namespace hardware
543 }  // namespace android
544