1 /*
2  * Copyright (C) 2018 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #define LOG_TAG "BufferPoolClient"
18 //#define LOG_NDEBUG 0
19 
20 #include <thread>
21 #include <utils/Log.h>
22 #include "BufferPoolClient.h"
23 #include "Connection.h"
24 
25 namespace android {
26 namespace hardware {
27 namespace media {
28 namespace bufferpool {
29 namespace V1_0 {
30 namespace implementation {
31 
// Grace period, in microseconds, that Impl::receive() adds to a non-zero
// sender timestamp to form the deadline checked by postReceive().
// NOTE(review): the value is 1,000,000 us (1 s); the previous comment said
// "100ms", which does not match the value - confirm the intended timeout.
static constexpr int64_t kReceiveTimeoutUs = 1000000;
// Maximum number of attempts postReceive() makes to post a TRANSFER_FROM message.
static constexpr int kPostMaxRetry = 3;
// Time-to-live, in microseconds, of an unreferenced cached buffer; also the
// minimum interval between two eviction passes in evictCaches().
static constexpr int kCacheTtlUs = 1000000; // TODO: tune
35 
/**
 * Private implementation of BufferPoolClient.
 *
 * Holds the connection to a buffer pool accessor (local or remote), a cache
 * of client-side buffer handles keyed by BufferId, and the status channel
 * used to post buffer release / transfer messages.
 *
 * Instances are created through std::make_shared so that shared_from_this()
 * can be handed to the deleters of the BufferPoolData objects it creates.
 */
class BufferPoolClient::Impl
        : public std::enable_shared_from_this<BufferPoolClient::Impl> {
public:
    // Connects through a local Accessor (sets mLocal to true).
    explicit Impl(const sp<Accessor> &accessor);

    // Connects through an IAccessor interface (sets mLocal to false).
    explicit Impl(const sp<IAccessor> &accessor);

    // True when the connection and its status channel were set up successfully.
    bool isValid() {
        return mValid;
    }

    // True when the client is valid and was created from a local Accessor.
    bool isLocal() {
        return mValid && mLocal;
    }

    // Connection id obtained from the accessor's connect() call.
    ConnectionId getConnectionId() {
        return mConnectionId;
    }

    // Accessor this client was constructed with.
    sp<IAccessor> &getAccessor() {
        return mAccessor;
    }

    // Reports whether the client is active and stores the time of the last
    // cache change in *lastTransactionUs; optionally clears idle caches.
    bool isActive(int64_t *lastTransactionUs, bool clearCache);

    // Allocates a buffer through the local connection and caches its handle.
    ResultStatus allocate(const std::vector<uint8_t> &params,
                          native_handle_t **handle,
                          std::shared_ptr<BufferPoolData> *buffer);

    // Receives a buffer that was announced by a sender with postSend().
    ResultStatus receive(
            TransactionId transactionId, BufferId bufferId,
            int64_t timestampUs,
            native_handle_t **handle, std::shared_ptr<BufferPoolData> *buffer);

    // Queues bufferId for release and posts pending releases on the status channel.
    void postBufferRelease(BufferId bufferId);

    // Posts a TRANSFER_TO message for bufferId; outputs the generated
    // transaction id and the posting timestamp.
    bool postSend(
            BufferId bufferId, ConnectionId receiver,
            TransactionId *transactionId, int64_t *timestampUs);
private:

    // Posts TRANSFER_FROM (or TRANSFER_TIMEOUT once timestampUs has passed).
    bool postReceive(
            BufferId bufferId, TransactionId transactionId,
            int64_t timestampUs);

    // Posts TRANSFER_OK / TRANSFER_ERROR and reports whether a remote sync is needed.
    bool postReceiveResult(
            BufferId bufferId, TransactionId transactionId, bool result, bool *needsSync);

    // Best-effort sync request to the remote connection (skipped if another
    // thread already holds mRemoteSyncLock).
    void trySyncFromRemote();

    // Applies released buffer ids to the cache; caller must hold mCache.mLock.
    bool syncReleased();

    // Drops expired, unreferenced cache entries; caller must hold mCache.mLock.
    void evictCaches(bool clearCache = false);

    // Allocates via the local connection and clones the returned handle.
    ResultStatus allocateBufferHandle(
            const std::vector<uint8_t>& params, BufferId *bufferId,
            native_handle_t **handle);

    // Fetches a buffer from the connection and clones its native handle.
    ResultStatus fetchBufferHandle(
            TransactionId transactionId, BufferId bufferId,
            native_handle_t **handle);

    struct BlockPoolDataDtor;
    struct ClientBuffer;

    bool mLocal;                        // true when constructed from a local Accessor
    bool mValid;                        // true once connect() and the status channel succeeded
    sp<IAccessor> mAccessor;
    sp<Connection> mLocalConnection;    // set by the local constructor
    sp<IConnection> mRemoteConnection;  // set by the remote constructor
    uint32_t mSeqId;                    // low bits of transaction ids generated in postSend()
    ConnectionId mConnectionId;
    int64_t mLastEvictCacheUs;          // time of the last eviction pass

    // CachedBuffers
    struct BufferCache {
        std::mutex mLock;
        // True while one receive() call is fetching a handle with mLock
        // released; other receivers wait on mCreateCv until it is cleared.
        bool mCreating;
        std::condition_variable mCreateCv;
        std::map<BufferId, std::unique_ptr<ClientBuffer>> mBuffers;
        // Number of cache entries that currently have a live BufferPoolData.
        int mActive;
        // Time of the last change to mActive.
        int64_t mLastChangeUs;

        BufferCache() : mCreating(false), mActive(0), mLastChangeUs(getTimestampNow()) {}

        // Caller must hold mLock.
        void incActive_l() {
            ++mActive;
            mLastChangeUs = getTimestampNow();
        }

        // Caller must hold mLock.
        void decActive_l() {
            --mActive;
            mLastChangeUs = getTimestampNow();
        }
    } mCache;

    // FMQ - release notifier
    struct {
        std::mutex mLock;
        // TODO: use only one list? (using one list may delay sending messages?)
        // Ids queued by postBufferRelease() and handed to the status channel.
        std::list<BufferId> mReleasingIds;
        // Ids consumed by syncReleased() to update the cache.
        // NOTE(review): presumably filled by BufferStatusChannel once a
        // release has been posted - confirm against BufferStatusChannel.
        std::list<BufferId> mReleasedIds;
        std::unique_ptr<BufferStatusChannel> mStatusChannel;
    } mReleasing;

    // This lock is held during synchronization from remote side.
    // In order to minimize remote calls and locking duration, this lock is held
    // by best effort approach using try_lock().
    std::mutex mRemoteSyncLock;
};
146 
147 struct BufferPoolClient::Impl::BlockPoolDataDtor {
BlockPoolDataDtorandroid::hardware::media::bufferpool::V1_0::implementation::BufferPoolClient::Impl::BlockPoolDataDtor148     BlockPoolDataDtor(const std::shared_ptr<BufferPoolClient::Impl> &impl)
149             : mImpl(impl) {}
150 
operator ()android::hardware::media::bufferpool::V1_0::implementation::BufferPoolClient::Impl::BlockPoolDataDtor151     void operator()(BufferPoolData *buffer) {
152         BufferId id = buffer->mId;
153         delete buffer;
154 
155         auto impl = mImpl.lock();
156         if (impl && impl->isValid()) {
157             impl->postBufferRelease(id);
158         }
159     }
160     const std::weak_ptr<BufferPoolClient::Impl> mImpl;
161 };
162 
163 struct BufferPoolClient::Impl::ClientBuffer {
164 private:
165     bool mInvalidated; // TODO: implement
166     int64_t mExpireUs;
167     bool mHasCache;
168     ConnectionId mConnectionId;
169     BufferId mId;
170     native_handle_t *mHandle;
171     std::weak_ptr<BufferPoolData> mCache;
172 
updateExpireandroid::hardware::media::bufferpool::V1_0::implementation::BufferPoolClient::Impl::ClientBuffer173     void updateExpire() {
174         mExpireUs = getTimestampNow() + kCacheTtlUs;
175     }
176 
177 public:
ClientBufferandroid::hardware::media::bufferpool::V1_0::implementation::BufferPoolClient::Impl::ClientBuffer178     ClientBuffer(
179             ConnectionId connectionId, BufferId id, native_handle_t *handle)
180             : mInvalidated(false), mHasCache(false),
181               mConnectionId(connectionId), mId(id), mHandle(handle) {
182         (void)mInvalidated;
183         mExpireUs = getTimestampNow() + kCacheTtlUs;
184     }
185 
~ClientBufferandroid::hardware::media::bufferpool::V1_0::implementation::BufferPoolClient::Impl::ClientBuffer186     ~ClientBuffer() {
187         if (mHandle) {
188             native_handle_close(mHandle);
189             native_handle_delete(mHandle);
190         }
191     }
192 
expireandroid::hardware::media::bufferpool::V1_0::implementation::BufferPoolClient::Impl::ClientBuffer193     bool expire() const {
194         int64_t now = getTimestampNow();
195         return now >= mExpireUs;
196     }
197 
hasCacheandroid::hardware::media::bufferpool::V1_0::implementation::BufferPoolClient::Impl::ClientBuffer198     bool hasCache() const {
199         return mHasCache;
200     }
201 
fetchCacheandroid::hardware::media::bufferpool::V1_0::implementation::BufferPoolClient::Impl::ClientBuffer202     std::shared_ptr<BufferPoolData> fetchCache(native_handle_t **pHandle) {
203         if (mHasCache) {
204             std::shared_ptr<BufferPoolData> cache = mCache.lock();
205             if (cache) {
206                 *pHandle = mHandle;
207             }
208             return cache;
209         }
210         return nullptr;
211     }
212 
createCacheandroid::hardware::media::bufferpool::V1_0::implementation::BufferPoolClient::Impl::ClientBuffer213     std::shared_ptr<BufferPoolData> createCache(
214             const std::shared_ptr<BufferPoolClient::Impl> &impl,
215             native_handle_t **pHandle) {
216         if (!mHasCache) {
217             // Allocates a raw ptr in order to avoid sending #postBufferRelease
218             // from deleter, in case of native_handle_clone failure.
219             BufferPoolData *ptr = new BufferPoolData(mConnectionId, mId);
220             if (ptr) {
221                 std::shared_ptr<BufferPoolData> cache(ptr, BlockPoolDataDtor(impl));
222                 if (cache) {
223                     mCache = cache;
224                     mHasCache = true;
225                     *pHandle = mHandle;
226                     return cache;
227                 }
228             }
229             if (ptr) {
230                 delete ptr;
231             }
232         }
233         return nullptr;
234     }
235 
onCacheReleaseandroid::hardware::media::bufferpool::V1_0::implementation::BufferPoolClient::Impl::ClientBuffer236     bool onCacheRelease() {
237         if (mHasCache) {
238             // TODO: verify mCache is not valid;
239             updateExpire();
240             mHasCache = false;
241             return true;
242         }
243         return false;
244     }
245 };
246 
Impl(const sp<Accessor> & accessor)247 BufferPoolClient::Impl::Impl(const sp<Accessor> &accessor)
248     : mLocal(true), mValid(false), mAccessor(accessor), mSeqId(0),
249       mLastEvictCacheUs(getTimestampNow()) {
250     const QueueDescriptor *fmqDesc;
251     ResultStatus status = accessor->connect(
252             &mLocalConnection, &mConnectionId, &fmqDesc, true);
253     if (status == ResultStatus::OK) {
254         mReleasing.mStatusChannel =
255                 std::make_unique<BufferStatusChannel>(*fmqDesc);
256         mValid = mReleasing.mStatusChannel &&
257                 mReleasing.mStatusChannel->isValid();
258     }
259 }
260 
Impl(const sp<IAccessor> & accessor)261 BufferPoolClient::Impl::Impl(const sp<IAccessor> &accessor)
262     : mLocal(false), mValid(false), mAccessor(accessor), mSeqId(0),
263       mLastEvictCacheUs(getTimestampNow()) {
264     bool valid = false;
265     sp<IConnection>& outConnection = mRemoteConnection;
266     ConnectionId& id = mConnectionId;
267     std::unique_ptr<BufferStatusChannel>& outChannel =
268             mReleasing.mStatusChannel;
269     Return<void> transResult = accessor->connect(
270             [&valid, &outConnection, &id, &outChannel]
271             (ResultStatus status, sp<IConnection> connection,
272              ConnectionId connectionId, const QueueDescriptor& desc) {
273                 if (status == ResultStatus::OK) {
274                     outConnection = connection;
275                     id = connectionId;
276                     outChannel = std::make_unique<BufferStatusChannel>(desc);
277                     if (outChannel && outChannel->isValid()) {
278                         valid = true;
279                     }
280                 }
281             });
282     mValid = transResult.isOk() && valid;
283 }
284 
isActive(int64_t * lastTransactionUs,bool clearCache)285 bool BufferPoolClient::Impl::isActive(int64_t *lastTransactionUs, bool clearCache) {
286     bool active = false;
287     {
288         std::lock_guard<std::mutex> lock(mCache.mLock);
289         syncReleased();
290         evictCaches(clearCache);
291         *lastTransactionUs = mCache.mLastChangeUs;
292         active = mCache.mActive > 0;
293     }
294     if (mValid && mLocal && mLocalConnection) {
295         mLocalConnection->cleanUp(clearCache);
296         return true;
297     }
298     return active;
299 }
300 
allocate(const std::vector<uint8_t> & params,native_handle_t ** pHandle,std::shared_ptr<BufferPoolData> * buffer)301 ResultStatus BufferPoolClient::Impl::allocate(
302         const std::vector<uint8_t> &params,
303         native_handle_t **pHandle,
304         std::shared_ptr<BufferPoolData> *buffer) {
305     if (!mLocal || !mLocalConnection || !mValid) {
306         return ResultStatus::CRITICAL_ERROR;
307     }
308     BufferId bufferId;
309     native_handle_t *handle = nullptr;
310     buffer->reset();
311     ResultStatus status = allocateBufferHandle(params, &bufferId, &handle);
312     if (status == ResultStatus::OK) {
313         if (handle) {
314             std::unique_lock<std::mutex> lock(mCache.mLock);
315             syncReleased();
316             evictCaches();
317             auto cacheIt = mCache.mBuffers.find(bufferId);
318             if (cacheIt != mCache.mBuffers.end()) {
319                 // TODO: verify it is recycled. (not having active ref)
320                 mCache.mBuffers.erase(cacheIt);
321             }
322             auto clientBuffer = std::make_unique<ClientBuffer>(
323                     mConnectionId, bufferId, handle);
324             if (clientBuffer) {
325                 auto result = mCache.mBuffers.insert(std::make_pair(
326                         bufferId, std::move(clientBuffer)));
327                 if (result.second) {
328                     *buffer = result.first->second->createCache(
329                             shared_from_this(), pHandle);
330                     if (*buffer) {
331                         mCache.incActive_l();
332                     }
333                 }
334             }
335         }
336         if (!*buffer) {
337             ALOGV("client cache creation failure %d: %lld",
338                   handle != nullptr, (long long)mConnectionId);
339             status = ResultStatus::NO_MEMORY;
340             postBufferRelease(bufferId);
341         }
342     }
343     return status;
344 }
345 
// Receives the buffer |bufferId| announced by transaction |transactionId|.
//
// On success *buffer refers to the buffer and *pHandle to its cached native
// handle, and OK is returned. Returns CRITICAL_ERROR when the client is
// invalid, when the TRANSFER_FROM message could not be posted (including
// deadline expiry), or when the buffer was obtained but the result message
// could not be posted. Otherwise returns the fetch status, or NO_MEMORY when
// the fetch succeeded but no cache entry could be created.
//
// |timestampUs| of 0 means "no deadline"; any other value is extended by
// kReceiveTimeoutUs and used as the deadline in postReceive().
ResultStatus BufferPoolClient::Impl::receive(
        TransactionId transactionId, BufferId bufferId, int64_t timestampUs,
        native_handle_t **pHandle,
        std::shared_ptr<BufferPoolData> *buffer) {
    if (!mValid) {
        return ResultStatus::CRITICAL_ERROR;
    }
    if (timestampUs != 0) {
        timestampUs += kReceiveTimeoutUs;
    }
    if (!postReceive(bufferId, transactionId, timestampUs)) {
        return ResultStatus::CRITICAL_ERROR;
    }
    ResultStatus status = ResultStatus::CRITICAL_ERROR;
    buffer->reset();
    // Each iteration takes mCache.mLock afresh; the loop ends with a break
    // once a buffer was obtained or the attempt definitively failed.
    while(1) {
        std::unique_lock<std::mutex> lock(mCache.mLock);
        syncReleased();
        evictCaches();
        auto cacheIt = mCache.mBuffers.find(bufferId);
        if (cacheIt != mCache.mBuffers.end()) {
            if (cacheIt->second->hasCache()) {
                // The entry is flagged as handed out: share the existing
                // BufferPoolData.
                *buffer = cacheIt->second->fetchCache(pHandle);
                if (!*buffer) {
                    // The weak reference already expired but the entry is
                    // still flagged as handed out; drop the lock and retry so
                    // that a later syncReleased() can clear the flag.
                    // NOTE(review): the original comment here read "check
                    // transfer time_out", but no timeout is checked - this
                    // retries without bound.
                    lock.unlock();
                    std::this_thread::yield();
                    continue;
                }
                ALOGV("client receive from reference %lld", (long long)mConnectionId);
                break;
            } else {
                // Handle is cached but nothing is handed out: wrap it again.
                *buffer = cacheIt->second->createCache(shared_from_this(), pHandle);
                if (*buffer) {
                    mCache.incActive_l();
                }
                ALOGV("client receive from cache %lld", (long long)mConnectionId);
                break;
            }
        } else {
            if (!mCache.mCreating) {
                // Cache miss and nobody else is fetching: fetch the handle
                // with the cache lock released, then publish it.
                mCache.mCreating = true;
                lock.unlock();
                native_handle_t* handle = nullptr;
                status = fetchBufferHandle(transactionId, bufferId, &handle);
                lock.lock();
                if (status == ResultStatus::OK) {
                    if (handle) {
                        auto clientBuffer = std::make_unique<ClientBuffer>(
                                mConnectionId, bufferId, handle);
                        if (clientBuffer) {
                            auto result = mCache.mBuffers.insert(
                                    std::make_pair(bufferId, std::move(
                                            clientBuffer)));
                            if (result.second) {
                                *buffer = result.first->second->createCache(
                                        shared_from_this(), pHandle);
                                if (*buffer) {
                                    mCache.incActive_l();
                                }
                            }
                        }
                    }
                    if (!*buffer) {
                        status = ResultStatus::NO_MEMORY;
                    }
                }
                mCache.mCreating = false;
                // Unlock before notifying so woken waiters can take the lock.
                lock.unlock();
                mCache.mCreateCv.notify_all();
                break;
            }
            // Another thread is fetching; wait for it to finish, then look
            // the buffer up again on the next iteration.
            mCache.mCreateCv.wait(lock);
        }
    }
    bool needsSync = false;
    // Report the outcome (TRANSFER_OK / TRANSFER_ERROR) on the status channel.
    bool posted = postReceiveResult(bufferId, transactionId,
                                      *buffer ? true : false, &needsSync);
    ALOGV("client receive %lld - %u : %s (%d)", (long long)mConnectionId, bufferId,
          *buffer ? "ok" : "fail", posted);
    if (mValid && mLocal && mLocalConnection) {
        mLocalConnection->cleanUp(false);
    }
    if (needsSync && mRemoteConnection) {
        trySyncFromRemote();
    }
    if (*buffer) {
        if (!posted) {
            // The buffer was obtained but the result could not be reported:
            // give the buffer back and fail.
            buffer->reset();
            return ResultStatus::CRITICAL_ERROR;
        }
        return ResultStatus::OK;
    }
    return status;
}
441 
442 
postBufferRelease(BufferId bufferId)443 void BufferPoolClient::Impl::postBufferRelease(BufferId bufferId) {
444     std::lock_guard<std::mutex> lock(mReleasing.mLock);
445     mReleasing.mReleasingIds.push_back(bufferId);
446     mReleasing.mStatusChannel->postBufferRelease(
447             mConnectionId, mReleasing.mReleasingIds, mReleasing.mReleasedIds);
448 }
449 
450 // TODO: revise ad-hoc posting data structure
postSend(BufferId bufferId,ConnectionId receiver,TransactionId * transactionId,int64_t * timestampUs)451 bool BufferPoolClient::Impl::postSend(
452         BufferId bufferId, ConnectionId receiver,
453         TransactionId *transactionId, int64_t *timestampUs) {
454     bool ret = false;
455     bool needsSync = false;
456     {
457         std::lock_guard<std::mutex> lock(mReleasing.mLock);
458         *timestampUs = getTimestampNow();
459         *transactionId = (mConnectionId << 32) | mSeqId++;
460         // TODO: retry, add timeout, target?
461         ret =  mReleasing.mStatusChannel->postBufferStatusMessage(
462                 *transactionId, bufferId, BufferStatus::TRANSFER_TO, mConnectionId,
463                 receiver, mReleasing.mReleasingIds, mReleasing.mReleasedIds);
464         needsSync = !mLocal && mReleasing.mStatusChannel->needsSync();
465     }
466     if (mValid && mLocal && mLocalConnection) {
467         mLocalConnection->cleanUp(false);
468     }
469     if (needsSync && mRemoteConnection) {
470         trySyncFromRemote();
471     }
472     return ret;
473 }
474 
postReceive(BufferId bufferId,TransactionId transactionId,int64_t timestampUs)475 bool BufferPoolClient::Impl::postReceive(
476         BufferId bufferId, TransactionId transactionId, int64_t timestampUs) {
477     for (int i = 0; i < kPostMaxRetry; ++i) {
478         std::unique_lock<std::mutex> lock(mReleasing.mLock);
479         int64_t now = getTimestampNow();
480         if (timestampUs == 0 || now < timestampUs) {
481             bool result = mReleasing.mStatusChannel->postBufferStatusMessage(
482                     transactionId, bufferId, BufferStatus::TRANSFER_FROM,
483                     mConnectionId, -1, mReleasing.mReleasingIds,
484                     mReleasing.mReleasedIds);
485             if (result) {
486                 return true;
487             }
488             lock.unlock();
489             std::this_thread::yield();
490         } else {
491             mReleasing.mStatusChannel->postBufferStatusMessage(
492                     transactionId, bufferId, BufferStatus::TRANSFER_TIMEOUT,
493                     mConnectionId, -1, mReleasing.mReleasingIds,
494                     mReleasing.mReleasedIds);
495             return false;
496         }
497     }
498     return false;
499 }
500 
postReceiveResult(BufferId bufferId,TransactionId transactionId,bool result,bool * needsSync)501 bool BufferPoolClient::Impl::postReceiveResult(
502         BufferId bufferId, TransactionId transactionId, bool result, bool *needsSync) {
503     std::lock_guard<std::mutex> lock(mReleasing.mLock);
504     // TODO: retry, add timeout
505     bool ret = mReleasing.mStatusChannel->postBufferStatusMessage(
506             transactionId, bufferId,
507             result ? BufferStatus::TRANSFER_OK : BufferStatus::TRANSFER_ERROR,
508             mConnectionId, -1, mReleasing.mReleasingIds,
509             mReleasing.mReleasedIds);
510     *needsSync = !mLocal && mReleasing.mStatusChannel->needsSync();
511     return ret;
512 }
513 
trySyncFromRemote()514 void BufferPoolClient::Impl::trySyncFromRemote() {
515     if (mRemoteSyncLock.try_lock()) {
516         bool needsSync = false;
517         {
518             std::lock_guard<std::mutex> lock(mReleasing.mLock);
519             needsSync = mReleasing.mStatusChannel->needsSync();
520         }
521         if (needsSync) {
522             TransactionId transactionId = (mConnectionId << 32);
523             BufferId bufferId = Connection::SYNC_BUFFERID;
524             Return<void> transResult = mRemoteConnection->fetch(
525                     transactionId, bufferId,
526                     []
527                     (ResultStatus outStatus, Buffer outBuffer) {
528                         (void) outStatus;
529                         (void) outBuffer;
530                     });
531             if(!transResult.isOk()) {
532                 ALOGD("sync from client %lld failed: bufferpool process died.",
533                       (long long)mConnectionId);
534             }
535         }
536         mRemoteSyncLock.unlock();
537     }
538 }
539 
540 // should have mCache.mLock
syncReleased()541 bool BufferPoolClient::Impl::syncReleased() {
542     std::lock_guard<std::mutex> lock(mReleasing.mLock);
543     if (mReleasing.mReleasingIds.size() > 0) {
544         mReleasing.mStatusChannel->postBufferRelease(
545                 mConnectionId, mReleasing.mReleasingIds,
546                 mReleasing.mReleasedIds);
547     }
548     if (mReleasing.mReleasedIds.size() > 0) {
549         for (BufferId& id: mReleasing.mReleasedIds) {
550             ALOGV("client release buffer %lld - %u", (long long)mConnectionId, id);
551             auto found = mCache.mBuffers.find(id);
552             if (found != mCache.mBuffers.end()) {
553                 if (found->second->onCacheRelease()) {
554                     mCache.decActive_l();
555                 } else {
556                     // should not happen!
557                     ALOGW("client %lld cache release status inconsitent!",
558                           (long long)mConnectionId);
559                 }
560             } else {
561                 // should not happen!
562                 ALOGW("client %lld cache status inconsitent!", (long long)mConnectionId);
563             }
564         }
565         mReleasing.mReleasedIds.clear();
566         return true;
567     }
568     return false;
569 }
570 
571 // should have mCache.mLock
evictCaches(bool clearCache)572 void BufferPoolClient::Impl::evictCaches(bool clearCache) {
573     int64_t now = getTimestampNow();
574     if (now >= mLastEvictCacheUs + kCacheTtlUs || clearCache) {
575         size_t evicted = 0;
576         for (auto it = mCache.mBuffers.begin(); it != mCache.mBuffers.end();) {
577             if (!it->second->hasCache() && (it->second->expire() || clearCache)) {
578                 it = mCache.mBuffers.erase(it);
579                 ++evicted;
580             } else {
581                 ++it;
582             }
583         }
584         ALOGV("cache count %lld : total %zu, active %d, evicted %zu",
585               (long long)mConnectionId, mCache.mBuffers.size(), mCache.mActive, evicted);
586         mLastEvictCacheUs = now;
587     }
588 }
589 
allocateBufferHandle(const std::vector<uint8_t> & params,BufferId * bufferId,native_handle_t ** handle)590 ResultStatus BufferPoolClient::Impl::allocateBufferHandle(
591         const std::vector<uint8_t>& params, BufferId *bufferId,
592         native_handle_t** handle) {
593     if (mLocalConnection) {
594         const native_handle_t* allocHandle = nullptr;
595         ResultStatus status = mLocalConnection->allocate(
596                 params, bufferId, &allocHandle);
597         if (status == ResultStatus::OK) {
598             *handle = native_handle_clone(allocHandle);
599         }
600         ALOGV("client allocate result %lld %d : %u clone %p",
601               (long long)mConnectionId, status == ResultStatus::OK,
602               *handle ? *bufferId : 0 , *handle);
603         return status;
604     }
605     return ResultStatus::CRITICAL_ERROR;
606 }
607 
fetchBufferHandle(TransactionId transactionId,BufferId bufferId,native_handle_t ** handle)608 ResultStatus BufferPoolClient::Impl::fetchBufferHandle(
609         TransactionId transactionId, BufferId bufferId,
610         native_handle_t **handle) {
611     sp<IConnection> connection;
612     if (mLocal) {
613         connection = mLocalConnection;
614     } else {
615         connection = mRemoteConnection;
616     }
617     ResultStatus status;
618     Return<void> transResult = connection->fetch(
619             transactionId, bufferId,
620             [&status, &handle]
621             (ResultStatus outStatus, Buffer outBuffer) {
622                 status = outStatus;
623                 if (status == ResultStatus::OK) {
624                     *handle = native_handle_clone(
625                             outBuffer.buffer.getNativeHandle());
626                 }
627             });
628     return transResult.isOk() ? status : ResultStatus::CRITICAL_ERROR;
629 }
630 
631 
BufferPoolClient(const sp<Accessor> & accessor)632 BufferPoolClient::BufferPoolClient(const sp<Accessor> &accessor) {
633     mImpl = std::make_shared<Impl>(accessor);
634 }
635 
BufferPoolClient(const sp<IAccessor> & accessor)636 BufferPoolClient::BufferPoolClient(const sp<IAccessor> &accessor) {
637     mImpl = std::make_shared<Impl>(accessor);
638 }
639 
// Nothing is released explicitly here. Buffers handed out by this client
// only hold a weak reference to the implementation (see BlockPoolDataDtor),
// so a buffer destroyed after the implementation is gone posts no release.
BufferPoolClient::~BufferPoolClient() {
    // TODO: how to handle orphaned buffers?
}
643 
isValid()644 bool BufferPoolClient::isValid() {
645     return mImpl && mImpl->isValid();
646 }
647 
isLocal()648 bool BufferPoolClient::isLocal() {
649     return mImpl && mImpl->isLocal();
650 }
651 
isActive(int64_t * lastTransactionUs,bool clearCache)652 bool BufferPoolClient::isActive(int64_t *lastTransactionUs, bool clearCache) {
653     if (!isValid()) {
654         *lastTransactionUs = 0;
655         return false;
656     }
657     return mImpl->isActive(lastTransactionUs, clearCache);
658 }
659 
getConnectionId()660 ConnectionId BufferPoolClient::getConnectionId() {
661     if (isValid()) {
662         return mImpl->getConnectionId();
663     }
664     return -1;
665 }
666 
getAccessor(sp<IAccessor> * accessor)667 ResultStatus BufferPoolClient::getAccessor(sp<IAccessor> *accessor) {
668     if (isValid()) {
669         *accessor = mImpl->getAccessor();
670         return ResultStatus::OK;
671     }
672     return ResultStatus::CRITICAL_ERROR;
673 }
674 
allocate(const std::vector<uint8_t> & params,native_handle_t ** handle,std::shared_ptr<BufferPoolData> * buffer)675 ResultStatus BufferPoolClient::allocate(
676         const std::vector<uint8_t> &params,
677         native_handle_t **handle,
678         std::shared_ptr<BufferPoolData> *buffer) {
679     if (isValid()) {
680         return mImpl->allocate(params, handle, buffer);
681     }
682     return ResultStatus::CRITICAL_ERROR;
683 }
684 
receive(TransactionId transactionId,BufferId bufferId,int64_t timestampUs,native_handle_t ** handle,std::shared_ptr<BufferPoolData> * buffer)685 ResultStatus BufferPoolClient::receive(
686         TransactionId transactionId, BufferId bufferId, int64_t timestampUs,
687         native_handle_t **handle, std::shared_ptr<BufferPoolData> *buffer) {
688     if (isValid()) {
689         return mImpl->receive(transactionId, bufferId, timestampUs, handle, buffer);
690     }
691     return ResultStatus::CRITICAL_ERROR;
692 }
693 
postSend(ConnectionId receiverId,const std::shared_ptr<BufferPoolData> & buffer,TransactionId * transactionId,int64_t * timestampUs)694 ResultStatus BufferPoolClient::postSend(
695         ConnectionId receiverId,
696         const std::shared_ptr<BufferPoolData> &buffer,
697         TransactionId *transactionId,
698         int64_t *timestampUs) {
699     if (isValid()) {
700         bool result = mImpl->postSend(
701                 buffer->mId, receiverId, transactionId, timestampUs);
702         return result ? ResultStatus::OK : ResultStatus::CRITICAL_ERROR;
703     }
704     return ResultStatus::CRITICAL_ERROR;
705 }
706 
707 }  // namespace implementation
708 }  // namespace V1_0
709 }  // namespace bufferpool
710 }  // namespace media
711 }  // namespace hardware
712 }  // namespace android
713