1 /*
2  * Copyright (C) 2019 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 //#define LOG_NDEBUG 0
18 #define LOG_TAG "StreamBufferCacheManager"
19 #define ATRACE_TAG ATRACE_TAG_CAMERA
20 
21 #include <cutils/native_handle.h>
22 #include <cutils/properties.h>
23 #include <log/log.h>
24 #include <sync/sync.h>
25 #include <sys/resource.h>
26 #include <utils/Trace.h>
27 
28 #include <chrono>
29 
30 #include "stream_buffer_cache_manager.h"
31 #include "utils.h"
32 
33 using namespace std::chrono_literals;
34 
35 namespace android {
36 namespace google_camera_hal {
37 
38 // setprop key for raising buffer allocation priority
39 inline constexpr char kRaiseBufAllocationPriority[] =
40     "persist.vendor.camera.raise_buf_allocation_priority";
41 
42 // For CTS testCameraDeviceCaptureFailure, it holds image buffers and hal hits
43 // refill buffer timeout. Large timeout time also results in close session time
44 // is larger than 5 second in this test case. Typical buffer request from
45 // provider(e.g. framework) usually takes 1~2 ms. Small timeout time here may
46 // cause more framedrop in certain cases. But large timeout time can lead to
47 // extra long delay of traffic(in both ways) between the framework and the layer
48 // below HWL.
49 static constexpr auto kBufferWaitingTimeOutSec = 400ms;
50 
StreamBufferCacheManager()51 StreamBufferCacheManager::StreamBufferCacheManager() {
52   workload_thread_ = std::thread([this] { this->WorkloadThreadLoop(); });
53   if (utils::SupportRealtimeThread()) {
54     status_t res = utils::SetRealtimeThread(workload_thread_.native_handle());
55     if (res != OK) {
56       ALOGE("%s: SetRealtimeThread fail", __FUNCTION__);
57     } else {
58       ALOGI("%s: SetRealtimeThread OK", __FUNCTION__);
59     }
60   }
61 }
62 
~StreamBufferCacheManager()63 StreamBufferCacheManager::~StreamBufferCacheManager() {
64   ALOGI("%s: Destroying stream buffer cache manager.", __FUNCTION__);
65   {
66     std::lock_guard<std::mutex> lock(workload_mutex_);
67     workload_thread_exiting_ = true;
68   }
69   workload_cv_.notify_one();
70   workload_thread_.join();
71 }
72 
Create()73 std::unique_ptr<StreamBufferCacheManager> StreamBufferCacheManager::Create() {
74   ATRACE_CALL();
75 
76   auto manager =
77       std::unique_ptr<StreamBufferCacheManager>(new StreamBufferCacheManager());
78   if (manager == nullptr) {
79     ALOGE("%s: Failed to create stream buffer cache manager.", __FUNCTION__);
80     return nullptr;
81   }
82 
83   manager->dummy_buffer_allocator_ = GrallocBufferAllocator::Create();
84   if (manager->dummy_buffer_allocator_ == nullptr) {
85     ALOGE("%s: Failed to create gralloc buffer allocator", __FUNCTION__);
86     return nullptr;
87   }
88 
89   ALOGI("%s: Created StreamBufferCacheManager.", __FUNCTION__);
90 
91   return manager;
92 }
93 
RegisterStream(const StreamBufferCacheRegInfo & reg_info)94 status_t StreamBufferCacheManager::RegisterStream(
95     const StreamBufferCacheRegInfo& reg_info) {
96   ATRACE_CALL();
97   if (reg_info.request_func == nullptr || reg_info.return_func == nullptr) {
98     ALOGE("%s: Can't register stream, request or return function is nullptr.",
99           __FUNCTION__);
100     return BAD_VALUE;
101   }
102 
103   if (reg_info.num_buffers_to_cache != 1) {
104     ALOGE("%s: Only support caching one buffer.", __FUNCTION__);
105     return BAD_VALUE;
106   }
107 
108   std::lock_guard<std::mutex> lock(caches_map_mutex_);
109   if (stream_buffer_caches_.find(reg_info.stream_id) !=
110       stream_buffer_caches_.end()) {
111     ALOGE("%s: Stream %d has been registered.", __FUNCTION__,
112           reg_info.stream_id);
113     return INVALID_OPERATION;
114   }
115 
116   status_t res = AddStreamBufferCacheLocked(reg_info);
117   if (res != OK) {
118     ALOGE("%s: Failed to add stream buffer cache.", __FUNCTION__);
119     return UNKNOWN_ERROR;
120   }
121   return OK;
122 }
123 
GetStreamBuffer(int32_t stream_id,StreamBufferRequestResult * res)124 status_t StreamBufferCacheManager::GetStreamBuffer(
125     int32_t stream_id, StreamBufferRequestResult* res) {
126   ATRACE_CALL();
127 
128   StreamBufferCache* stream_buffer_cache = nullptr;
129   status_t result = GetStreamBufferCache(stream_id, &stream_buffer_cache);
130   if (result != OK) {
131     ALOGE("%s: Querying stream buffer cache failed.", __FUNCTION__);
132     return result;
133   }
134 
135   result = stream_buffer_cache->GetBuffer(res);
136   if (result != OK) {
137     ALOGE("%s: Get buffer for stream %d failed.", __FUNCTION__, stream_id);
138     return UNKNOWN_ERROR;
139   }
140 
141   {
142     int fence_status = 0;
143     if (res->buffer.acquire_fence != nullptr) {
144       native_handle_t* fence_handle =
145           const_cast<native_handle_t*>(res->buffer.acquire_fence);
146       if (fence_handle->numFds == 1) {
147         fence_status = sync_wait(fence_handle->data[0], kSyncWaitTimeMs);
148       }
149       if (0 != fence_status) {
150         ALOGE("%s: Fence check failed.", __FUNCTION__);
151       }
152       native_handle_close(fence_handle);
153       native_handle_delete(fence_handle);
154       res->buffer.acquire_fence = nullptr;
155     }
156   }
157 
158   NotifyThreadWorkload();
159   return OK;
160 }
161 
NotifyProviderReadiness(int32_t stream_id)162 status_t StreamBufferCacheManager::NotifyProviderReadiness(int32_t stream_id) {
163   StreamBufferCache* stream_buffer_cache = nullptr;
164   status_t res = GetStreamBufferCache(stream_id, &stream_buffer_cache);
165   if (res != OK) {
166     ALOGE("%s: Querying stream buffer cache failed.", __FUNCTION__);
167     return res;
168   }
169 
170   stream_buffer_cache->SetManagerState(/*active=*/true);
171 
172   NotifyThreadWorkload();
173   return OK;
174 }
175 
NotifyFlushingAll()176 status_t StreamBufferCacheManager::NotifyFlushingAll() {
177   // Mark all StreamBufferCache as need to be flushed
178   std::vector<StreamBufferCache*> stream_buffer_caches;
179   {
180     std::lock_guard<std::mutex> map_lock(caches_map_mutex_);
181     for (auto& [stream_id, stream_buffer_cache] : stream_buffer_caches_) {
182       stream_buffer_caches.push_back(stream_buffer_cache.get());
183     }
184   }
185 
186   {
187     std::unique_lock<std::mutex> flush_lock(flush_mutex_);
188     for (auto& stream_buffer_cache : stream_buffer_caches) {
189       stream_buffer_cache->SetManagerState(/*active=*/false);
190     }
191   }
192 
193   NotifyThreadWorkload();
194   return OK;
195 }
196 
IsStreamActive(int32_t stream_id,bool * is_active)197 status_t StreamBufferCacheManager::IsStreamActive(int32_t stream_id,
198                                                   bool* is_active) {
199   StreamBufferCache* stream_buffer_cache = nullptr;
200   status_t res = GetStreamBufferCache(stream_id, &stream_buffer_cache);
201   if (res != OK) {
202     ALOGE("%s: Querying stream buffer cache failed.", __FUNCTION__);
203     return res;
204   }
205 
206   *is_active = !stream_buffer_cache->IsStreamDeactivated();
207   return OK;
208 }
209 
AddStreamBufferCacheLocked(const StreamBufferCacheRegInfo & reg_info)210 status_t StreamBufferCacheManager::AddStreamBufferCacheLocked(
211     const StreamBufferCacheRegInfo& reg_info) {
212   auto stream_buffer_cache = StreamBufferCacheManager::StreamBufferCache::Create(
213       reg_info, [this] { this->NotifyThreadWorkload(); },
214       dummy_buffer_allocator_.get());
215   if (stream_buffer_cache == nullptr) {
216     ALOGE("%s: Failed to create StreamBufferCache for stream %d", __FUNCTION__,
217           reg_info.stream_id);
218     return UNKNOWN_ERROR;
219   }
220 
221   stream_buffer_caches_[reg_info.stream_id] = std::move(stream_buffer_cache);
222   return OK;
223 }
224 
WorkloadThreadLoop()225 void StreamBufferCacheManager::WorkloadThreadLoop() {
226   if (property_get_bool(kRaiseBufAllocationPriority, false)) {
227     pid_t tid = gettid();
228     setpriority(PRIO_PROCESS, tid, -20);
229   }
230   // max thread name len = 16
231   pthread_setname_np(pthread_self(), "StreamBufMgr");
232   while (1) {
233     bool exiting = false;
234     {
235       std::unique_lock<std::mutex> thread_lock(workload_mutex_);
236       workload_cv_.wait(thread_lock, [this] {
237         return has_new_workload_ || workload_thread_exiting_;
238       });
239       has_new_workload_ = false;
240       exiting = workload_thread_exiting_;
241     }
242 
243     std::vector<StreamBufferCacheManager::StreamBufferCache*> stream_buffer_caches;
244     {
245       std::unique_lock<std::mutex> map_lock(caches_map_mutex_);
246       for (auto& [stream_id, cache] : stream_buffer_caches_) {
247         stream_buffer_caches.push_back(cache.get());
248       }
249     }
250 
251     {
252       std::unique_lock<std::mutex> flush_lock(flush_mutex_);
253       for (auto& stream_buffer_cache : stream_buffer_caches) {
254         status_t res = stream_buffer_cache->UpdateCache(exiting);
255         if (res != OK) {
256           ALOGE("%s: Updating(flush/refill) cache failed.", __FUNCTION__);
257         }
258       }
259     }
260 
261     if (exiting) {
262       ALOGI("%s: Exiting stream buffer cache manager workload thread.",
263             __FUNCTION__);
264       return;
265     }
266   }
267 }
268 
NotifyThreadWorkload()269 void StreamBufferCacheManager::NotifyThreadWorkload() {
270   {
271     std::lock_guard<std::mutex> lock(workload_mutex_);
272     has_new_workload_ = true;
273   }
274   workload_cv_.notify_one();
275 }
276 
277 std::unique_ptr<StreamBufferCacheManager::StreamBufferCache>
Create(const StreamBufferCacheRegInfo & reg_info,NotifyManagerThreadWorkloadFunc notify,IHalBufferAllocator * dummy_buffer_allocator)278 StreamBufferCacheManager::StreamBufferCache::Create(
279     const StreamBufferCacheRegInfo& reg_info,
280     NotifyManagerThreadWorkloadFunc notify,
281     IHalBufferAllocator* dummy_buffer_allocator) {
282   if (notify == nullptr || dummy_buffer_allocator == nullptr) {
283     ALOGE("%s: notify is nullptr or dummy_buffer_allocator is nullptr.",
284           __FUNCTION__);
285     return nullptr;
286   }
287 
288   auto cache = std::unique_ptr<StreamBufferCacheManager::StreamBufferCache>(
289       new StreamBufferCacheManager::StreamBufferCache(reg_info, notify,
290                                                       dummy_buffer_allocator));
291   if (cache == nullptr) {
292     ALOGE("%s: Failed to create stream buffer cache.", __FUNCTION__);
293     return nullptr;
294   }
295 
296   return cache;
297 }
298 
StreamBufferCache(const StreamBufferCacheRegInfo & reg_info,NotifyManagerThreadWorkloadFunc notify,IHalBufferAllocator * dummy_buffer_allocator)299 StreamBufferCacheManager::StreamBufferCache::StreamBufferCache(
300     const StreamBufferCacheRegInfo& reg_info,
301     NotifyManagerThreadWorkloadFunc notify,
302     IHalBufferAllocator* dummy_buffer_allocator)
303     : cache_info_(reg_info) {
304   std::lock_guard<std::mutex> lock(cache_access_mutex_);
305   notify_for_workload_ = notify;
306   dummy_buffer_allocator_ = dummy_buffer_allocator;
307 }
308 
UpdateCache(bool forced_flushing)309 status_t StreamBufferCacheManager::StreamBufferCache::UpdateCache(
310     bool forced_flushing) {
311   status_t res = OK;
312   std::unique_lock<std::mutex> cache_lock(cache_access_mutex_);
313   if (forced_flushing || !is_active_) {
314     res = FlushLocked(forced_flushing);
315     if (res != OK) {
316       ALOGE("%s: Failed to flush stream buffer cache for stream %d",
317             __FUNCTION__, cache_info_.stream_id);
318       return res;
319     }
320   } else if (RefillableLocked()) {
321     cache_lock.unlock();
322     res = Refill();
323     if (res != OK) {
324       ALOGE("%s: Failed to refill stream buffer cache for stream %d",
325             __FUNCTION__, cache_info_.stream_id);
326       return res;
327     }
328   }
329   return OK;
330 }
331 
GetBuffer(StreamBufferRequestResult * res)332 status_t StreamBufferCacheManager::StreamBufferCache::GetBuffer(
333     StreamBufferRequestResult* res) {
334   std::unique_lock<std::mutex> cache_lock(cache_access_mutex_);
335 
336   // 0. the buffer cache must be active
337   if (!is_active_) {
338     ALOGW("%s: The buffer cache for stream %d is not active.", __FUNCTION__,
339           cache_info_.stream_id);
340     return INVALID_OPERATION;
341   }
342 
343   // 1. check if the cache is deactived
344   if (stream_deactived_) {
345     res->is_dummy_buffer = true;
346     res->buffer = dummy_buffer_;
347     return OK;
348   }
349 
350   // 2. check if there is any buffer available in the cache. If not, try
351   // to wait for a short period and check again. In case of timeout, use the
352   // dummy buffer instead.
353   if (cached_buffers_.empty()) {
354     // In case the GetStreamBufer is called after NotifyFlushingAll, this will
355     // be the first event that should trigger the dedicated thread to restart
356     // and refill the caches. An extra notification of thread workload is
357     // harmless and will be bypassed very quickly.
358     cache_lock.unlock();
359     notify_for_workload_();
360     cache_lock.lock();
361     // Need to check this again since the state may change after the lock is
362     // acquired for the second time.
363     if (cached_buffers_.empty()) {
364       // Wait for a certain amount of time for the cache to be refilled
365       if (cache_access_cv_.wait_for(cache_lock, kBufferWaitingTimeOutSec) ==
366           std::cv_status::timeout) {
367         ALOGW("%s: StreamBufferCache for stream %d waiting for refill timeout.",
368               __FUNCTION__, cache_info_.stream_id);
369       }
370     }
371   }
372 
373   // 3. use dummy buffer if the cache is still empty
374   if (cached_buffers_.empty()) {
375     // Only allocate dummy buffer for the first time
376     if (dummy_buffer_.buffer == nullptr) {
377       status_t result = AllocateDummyBufferLocked();
378       if (result != OK) {
379         ALOGE("%s: Allocate dummy buffer failed.", __FUNCTION__);
380         return UNKNOWN_ERROR;
381       }
382     }
383     res->is_dummy_buffer = true;
384     res->buffer = dummy_buffer_;
385     return OK;
386   } else {
387     res->is_dummy_buffer = false;
388     res->buffer = cached_buffers_.back();
389     cached_buffers_.pop_back();
390   }
391 
392   return OK;
393 }
394 
IsStreamDeactivated()395 bool StreamBufferCacheManager::StreamBufferCache::IsStreamDeactivated() {
396   std::unique_lock<std::mutex> lock(cache_access_mutex_);
397   return stream_deactived_;
398 }
399 
SetManagerState(bool active)400 void StreamBufferCacheManager::StreamBufferCache::SetManagerState(bool active) {
401   std::unique_lock<std::mutex> lock(cache_access_mutex_);
402   is_active_ = active;
403 }
404 
FlushLocked(bool forced_flushing)405 status_t StreamBufferCacheManager::StreamBufferCache::FlushLocked(
406     bool forced_flushing) {
407   if (is_active_ && !forced_flushing) {
408     ALOGI("%s: Active stream buffer cache is not notified for forced flushing.",
409           __FUNCTION__);
410     return INVALID_OPERATION;
411   }
412 
413   if (cache_info_.return_func == nullptr) {
414     ALOGE("%s: return_func is nullptr.", __FUNCTION__);
415     return UNKNOWN_ERROR;
416   }
417 
418   if (cached_buffers_.empty()) {
419     ALOGV("%s: Stream buffer cache is already empty.", __FUNCTION__);
420     ReleaseDummyBufferLocked();
421     return OK;
422   }
423 
424   status_t res = cache_info_.return_func(cached_buffers_);
425   if (res != OK) {
426     ALOGE("%s: Failed to return buffers.", __FUNCTION__);
427     return res;
428   }
429 
430   cached_buffers_.clear();
431   ReleaseDummyBufferLocked();
432 
433   return OK;
434 }
435 
Refill()436 status_t StreamBufferCacheManager::StreamBufferCache::Refill() {
437   int32_t num_buffers_to_acquire = 0;
438   {
439     std::unique_lock<std::mutex> cache_lock(cache_access_mutex_);
440     if (cache_info_.request_func == nullptr) {
441       ALOGE("%s: request_func is nullptr.", __FUNCTION__);
442       return UNKNOWN_ERROR;
443     }
444 
445     if (!is_active_) {
446       ALOGI("%s: Buffer cache is not active.", __FUNCTION__);
447       return UNKNOWN_ERROR;
448     }
449 
450     if (stream_deactived_) {
451       ALOGI("%s: Stream already deactived.", __FUNCTION__);
452       return OK;
453     }
454 
455     if (cached_buffers_.size() >= cache_info_.num_buffers_to_cache) {
456       ALOGV("%s: Stream buffer cache is already full.", __FUNCTION__);
457       return INVALID_OPERATION;
458     }
459 
460     num_buffers_to_acquire =
461         cache_info_.num_buffers_to_cache - cached_buffers_.size();
462   }
463 
464   // Requesting buffer from the provider can take long(e.g. even > 1sec),
465   // consumer should not be blocked by this procedure and can get dummy buffer
466   // to unblock other pipelines. Thus, cache_access_mutex_ doesn't need to be
467   // locked here.
468   std::vector<StreamBuffer> buffers;
469   StreamBufferRequestError req_status = StreamBufferRequestError::kOk;
470   status_t res =
471       cache_info_.request_func(num_buffers_to_acquire, &buffers, &req_status);
472 
473   std::unique_lock<std::mutex> cache_lock(cache_access_mutex_);
474   if (res != OK) {
475     status_t result = AllocateDummyBufferLocked();
476     if (result != OK) {
477       ALOGE("%s: Allocate dummy buffer failed.", __FUNCTION__);
478       return UNKNOWN_ERROR;
479     }
480   }
481 
482   if (buffers.empty() || res != OK) {
483     ALOGW("%s: Failed to acquire buffer for stream %d, error %d", __FUNCTION__,
484           cache_info_.stream_id, req_status);
485     switch (req_status) {
486       case StreamBufferRequestError::kNoBufferAvailable:
487       case StreamBufferRequestError::kMaxBufferExceeded:
488         ALOGI(
489             "%s: No buffer available or max buffer exceeded for stream %d. "
490             "Will retry for next request or when refilling other streams.",
491             __FUNCTION__, cache_info_.stream_id);
492         break;
493       case StreamBufferRequestError::kStreamDisconnected:
494       case StreamBufferRequestError::kUnknownError:
495         ALOGW(
496             "%s: Stream %d is disconnected or unknown error observed."
497             "This stream is marked as inactive.",
498             __FUNCTION__, cache_info_.stream_id);
499         ALOGI("%s: Stream %d begin to use dummy buffer.", __FUNCTION__,
500               cache_info_.stream_id);
501         stream_deactived_ = true;
502         break;
503       default:
504         ALOGE("%s: Unknown error code: %d", __FUNCTION__, req_status);
505         break;
506     }
507   } else {
508     for (auto& buffer : buffers) {
509       cached_buffers_.push_back(buffer);
510     }
511   }
512 
513   cache_access_cv_.notify_one();
514 
515   return OK;
516 }
517 
RefillableLocked() const518 bool StreamBufferCacheManager::StreamBufferCache::RefillableLocked() const {
519   // No need to refill if the buffer cache is not active
520   if (!is_active_) {
521     return false;
522   }
523 
524   // Need to refill if the cache is not full
525   return cached_buffers_.size() < cache_info_.num_buffers_to_cache;
526 }
527 
AllocateDummyBufferLocked()528 status_t StreamBufferCacheManager::StreamBufferCache::AllocateDummyBufferLocked() {
529   if (dummy_buffer_.buffer != nullptr) {
530     ALOGW("%s: Dummy buffer has already been allocated.", __FUNCTION__);
531     return OK;
532   }
533 
534   HalBufferDescriptor hal_buffer_descriptor{
535       .stream_id = cache_info_.stream_id,
536       .width = cache_info_.width,
537       .height = cache_info_.height,
538       .format = cache_info_.format,
539       .producer_flags = cache_info_.producer_flags,
540       .consumer_flags = cache_info_.consumer_flags,
541       .immediate_num_buffers = 1,
542       .max_num_buffers = 1,
543   };
544   std::vector<buffer_handle_t> buffers;
545 
546   status_t res =
547       dummy_buffer_allocator_->AllocateBuffers(hal_buffer_descriptor, &buffers);
548   if (res != OK) {
549     ALOGE("%s: Dummy buffer allocator AllocateBuffers failed.", __FUNCTION__);
550     return res;
551   }
552 
553   if (buffers.size() != hal_buffer_descriptor.immediate_num_buffers) {
554     ALOGE("%s: Not enough buffers allocated.", __FUNCTION__);
555     return NO_MEMORY;
556   }
557   dummy_buffer_.stream_id = cache_info_.stream_id;
558   dummy_buffer_.buffer = buffers[0];
559   ALOGI("%s: [sbc] Dummy buffer allocated: strm %d buffer %p", __FUNCTION__,
560         dummy_buffer_.stream_id, dummy_buffer_.buffer);
561 
562   return OK;
563 }
564 
ReleaseDummyBufferLocked()565 void StreamBufferCacheManager::StreamBufferCache::ReleaseDummyBufferLocked() {
566   // Release dummy buffer if ever acquired from the dummy_buffer_allocator_.
567   if (dummy_buffer_.buffer != nullptr) {
568     std::vector<buffer_handle_t> buffers(1, dummy_buffer_.buffer);
569     dummy_buffer_allocator_->FreeBuffers(&buffers);
570     dummy_buffer_.buffer = nullptr;
571   }
572 }
573 
GetStreamBufferCache(int32_t stream_id,StreamBufferCache ** stream_buffer_cache)574 status_t StreamBufferCacheManager::GetStreamBufferCache(
575     int32_t stream_id, StreamBufferCache** stream_buffer_cache) {
576   std::unique_lock<std::mutex> map_lock(caches_map_mutex_);
577   if (stream_buffer_caches_.find(stream_id) == stream_buffer_caches_.end()) {
578     ALOGE("%s: Sream %d can not be found.", __FUNCTION__, stream_id);
579     return BAD_VALUE;
580   }
581 
582   *stream_buffer_cache = stream_buffer_caches_[stream_id].get();
583   if (*stream_buffer_cache == nullptr) {
584     ALOGE("%s: Get null cache pointer.", __FUNCTION__);
585     return UNKNOWN_ERROR;
586   }
587   return OK;
588 }
589 
590 }  // namespace google_camera_hal
591 }  // namespace android
592