1 #include "include/private/dvr/buffer_hub_queue_client.h"
2 
3 #include <inttypes.h>
4 #include <log/log.h>
5 #include <poll.h>
6 #include <sys/epoll.h>
7 
8 #include <array>
9 
10 #include <pdx/default_transport/client_channel.h>
11 #include <pdx/default_transport/client_channel_factory.h>
12 #include <pdx/file_handle.h>
13 #include <pdx/trace.h>
14 
15 #define RETRY_EINTR(fnc_call)                 \
16   ([&]() -> decltype(fnc_call) {              \
17     decltype(fnc_call) result;                \
18     do {                                      \
19       result = (fnc_call);                    \
20     } while (result == -1 && errno == EINTR); \
21     return result;                            \
22   })()
23 
24 using android::pdx::ErrorStatus;
25 using android::pdx::LocalChannelHandle;
26 using android::pdx::LocalHandle;
27 using android::pdx::Status;
28 
29 namespace android {
30 namespace dvr {
31 
32 namespace {
33 
Unstuff(uint64_t value)34 std::pair<int32_t, int32_t> Unstuff(uint64_t value) {
35   return {static_cast<int32_t>(value >> 32),
36           static_cast<int32_t>(value & ((1ull << 32) - 1))};
37 }
38 
Stuff(int32_t a,int32_t b)39 uint64_t Stuff(int32_t a, int32_t b) {
40   const uint32_t ua = static_cast<uint32_t>(a);
41   const uint32_t ub = static_cast<uint32_t>(b);
42   return (static_cast<uint64_t>(ua) << 32) | static_cast<uint64_t>(ub);
43 }
44 
45 }  // anonymous namespace
46 
BufferHubQueue(LocalChannelHandle channel_handle)47 BufferHubQueue::BufferHubQueue(LocalChannelHandle channel_handle)
48     : Client{pdx::default_transport::ClientChannel::Create(
49           std::move(channel_handle))} {
50   Initialize();
51 }
52 
BufferHubQueue(const std::string & endpoint_path)53 BufferHubQueue::BufferHubQueue(const std::string& endpoint_path)
54     : Client{
55           pdx::default_transport::ClientChannelFactory::Create(endpoint_path)} {
56   Initialize();
57 }
58 
Initialize()59 void BufferHubQueue::Initialize() {
60   int ret = epoll_fd_.Create();
61   if (ret < 0) {
62     ALOGE("BufferHubQueue::BufferHubQueue: Failed to create epoll fd: %s",
63           strerror(-ret));
64     return;
65   }
66 
67   epoll_event event = {
68       .events = EPOLLIN | EPOLLET,
69       .data = {.u64 = Stuff(-1, BufferHubQueue::kEpollQueueEventIndex)}};
70   ret = epoll_fd_.Control(EPOLL_CTL_ADD, event_fd(), &event);
71   if (ret < 0) {
72     ALOGE("BufferHubQueue::Initialize: Failed to add event fd to epoll set: %s",
73           strerror(-ret));
74   }
75 }
76 
ImportQueue()77 Status<void> BufferHubQueue::ImportQueue() {
78   auto status = InvokeRemoteMethod<BufferHubRPC::GetQueueInfo>();
79   if (!status) {
80     ALOGE("BufferHubQueue::ImportQueue: Failed to import queue: %s",
81           status.GetErrorMessage().c_str());
82     return ErrorStatus(status.error());
83   } else {
84     SetupQueue(status.get());
85     return {};
86   }
87 }
88 
SetupQueue(const QueueInfo & queue_info)89 void BufferHubQueue::SetupQueue(const QueueInfo& queue_info) {
90   is_async_ = queue_info.producer_config.is_async;
91   default_width_ = queue_info.producer_config.default_width;
92   default_height_ = queue_info.producer_config.default_height;
93   default_format_ = queue_info.producer_config.default_format;
94   user_metadata_size_ = queue_info.producer_config.user_metadata_size;
95   id_ = queue_info.id;
96 }
97 
CreateConsumerQueue()98 std::unique_ptr<ConsumerQueue> BufferHubQueue::CreateConsumerQueue() {
99   if (auto status = CreateConsumerQueueHandle(/*silent*/ false))
100     return std::unique_ptr<ConsumerQueue>(new ConsumerQueue(status.take()));
101   else
102     return nullptr;
103 }
104 
CreateSilentConsumerQueue()105 std::unique_ptr<ConsumerQueue> BufferHubQueue::CreateSilentConsumerQueue() {
106   if (auto status = CreateConsumerQueueHandle(/*silent*/ true))
107     return std::unique_ptr<ConsumerQueue>(new ConsumerQueue(status.take()));
108   else
109     return nullptr;
110 }
111 
CreateConsumerQueueHandle(bool silent)112 Status<LocalChannelHandle> BufferHubQueue::CreateConsumerQueueHandle(
113     bool silent) {
114   auto status = InvokeRemoteMethod<BufferHubRPC::CreateConsumerQueue>(silent);
115   if (!status) {
116     ALOGE(
117         "BufferHubQueue::CreateConsumerQueue: Failed to create consumer queue: "
118         "%s",
119         status.GetErrorMessage().c_str());
120     return ErrorStatus(status.error());
121   }
122 
123   return status;
124 }
125 
126 pdx::Status<ConsumerQueueParcelable>
CreateConsumerQueueParcelable(bool silent)127 BufferHubQueue::CreateConsumerQueueParcelable(bool silent) {
128   auto status = CreateConsumerQueueHandle(silent);
129   if (!status)
130     return status.error_status();
131 
132   // A temporary consumer queue client to pull its channel parcelable.
133   auto consumer_queue =
134       std::unique_ptr<ConsumerQueue>(new ConsumerQueue(status.take()));
135   ConsumerQueueParcelable queue_parcelable(
136       consumer_queue->GetChannel()->TakeChannelParcelable());
137 
138   if (!queue_parcelable.IsValid()) {
139     ALOGE(
140         "BufferHubQueue::CreateConsumerQueueParcelable: Failed to create "
141         "consumer queue parcelable.");
142     return ErrorStatus(EINVAL);
143   }
144 
145   return {std::move(queue_parcelable)};
146 }
147 
WaitForBuffers(int timeout)148 bool BufferHubQueue::WaitForBuffers(int timeout) {
149   ATRACE_NAME("BufferHubQueue::WaitForBuffers");
150   std::array<epoll_event, kMaxEvents> events;
151 
152   // Loop at least once to check for hangups.
153   do {
154     ALOGD_IF(
155         TRACE,
156         "BufferHubQueue::WaitForBuffers: queue_id=%d count=%zu capacity=%zu",
157         id(), count(), capacity());
158 
159     // If there is already a buffer then just check for hangup without waiting.
160     const int ret = epoll_fd_.Wait(events.data(), events.size(),
161                                    count() == 0 ? timeout : 0);
162 
163     if (ret == 0) {
164       ALOGI_IF(TRACE,
165                "BufferHubQueue::WaitForBuffers: No events before timeout: "
166                "queue_id=%d",
167                id());
168       return count() != 0;
169     }
170 
171     if (ret < 0 && ret != -EINTR) {
172       ALOGE("BufferHubQueue::WaitForBuffers: Failed to wait for buffers: %s",
173             strerror(-ret));
174       return false;
175     }
176 
177     const int num_events = ret;
178 
179     // A BufferQueue's epoll fd tracks N+1 events, where there are N events,
180     // one for each buffer in the queue, and one extra event for the queue
181     // client itself.
182     for (int i = 0; i < num_events; i++) {
183       int32_t event_fd;
184       int32_t index;
185       std::tie(event_fd, index) = Unstuff(events[i].data.u64);
186 
187       PDX_TRACE_FORMAT(
188           "epoll_event|queue_id=%d;num_events=%d;event_index=%d;event_fd=%d;"
189           "slot=%d|",
190           id(), num_events, i, event_fd, index);
191 
192       ALOGD_IF(TRACE,
193                "BufferHubQueue::WaitForBuffers: event %d: event_fd=%d index=%d",
194                i, event_fd, index);
195 
196       if (is_buffer_event_index(index)) {
197         HandleBufferEvent(static_cast<size_t>(index), event_fd,
198                           events[i].events);
199       } else if (is_queue_event_index(index)) {
200         HandleQueueEvent(events[i].events);
201       } else {
202         ALOGW(
203             "BufferHubQueue::WaitForBuffers: Unknown event type event_fd=%d "
204             "index=%d",
205             event_fd, index);
206       }
207     }
208   } while (count() == 0 && capacity() > 0 && !hung_up());
209 
210   return count() != 0;
211 }
212 
HandleBufferEvent(size_t slot,int event_fd,int poll_events)213 Status<void> BufferHubQueue::HandleBufferEvent(size_t slot, int event_fd,
214                                                int poll_events) {
215   ATRACE_NAME("BufferHubQueue::HandleBufferEvent");
216   if (!buffers_[slot]) {
217     ALOGW("BufferHubQueue::HandleBufferEvent: Invalid buffer slot: %zu", slot);
218     return ErrorStatus(ENOENT);
219   }
220 
221   auto status = buffers_[slot]->GetEventMask(poll_events);
222   if (!status) {
223     ALOGW("BufferHubQueue::HandleBufferEvent: Failed to get event mask: %s",
224           status.GetErrorMessage().c_str());
225     return status.error_status();
226   }
227 
228   const int events = status.get();
229   PDX_TRACE_FORMAT(
230       "buffer|queue_id=%d;buffer_id=%d;slot=%zu;event_fd=%d;poll_events=%x;"
231       "events=%d|",
232       id(), buffers_[slot]->id(), slot, event_fd, poll_events, events);
233 
234   if (events & EPOLLIN) {
235     return Enqueue({buffers_[slot], slot, buffers_[slot]->GetQueueIndex()});
236   } else if (events & EPOLLHUP) {
237     ALOGW(
238         "BufferHubQueue::HandleBufferEvent: Received EPOLLHUP event: slot=%zu "
239         "event_fd=%d buffer_id=%d",
240         slot, buffers_[slot]->event_fd(), buffers_[slot]->id());
241     return RemoveBuffer(slot);
242   } else {
243     ALOGW(
244         "BufferHubQueue::HandleBufferEvent: Unknown event, slot=%zu, epoll "
245         "events=%d",
246         slot, events);
247   }
248 
249   return {};
250 }
251 
HandleQueueEvent(int poll_event)252 Status<void> BufferHubQueue::HandleQueueEvent(int poll_event) {
253   ATRACE_NAME("BufferHubQueue::HandleQueueEvent");
254   auto status = GetEventMask(poll_event);
255   if (!status) {
256     ALOGW("BufferHubQueue::HandleQueueEvent: Failed to get event mask: %s",
257           status.GetErrorMessage().c_str());
258     return status.error_status();
259   }
260 
261   const int events = status.get();
262   if (events & EPOLLIN) {
263     // Note that after buffer imports, if |count()| still returns 0, epoll
264     // wait will be tried again to acquire the newly imported buffer.
265     auto buffer_status = OnBufferAllocated();
266     if (!buffer_status) {
267       ALOGE("BufferHubQueue::HandleQueueEvent: Failed to import buffer: %s",
268             buffer_status.GetErrorMessage().c_str());
269     }
270   } else if (events & EPOLLHUP) {
271     ALOGD_IF(TRACE, "BufferHubQueue::HandleQueueEvent: hang up event!");
272     hung_up_ = true;
273   } else {
274     ALOGW("BufferHubQueue::HandleQueueEvent: Unknown epoll events=%x", events);
275   }
276 
277   return {};
278 }
279 
AddBuffer(const std::shared_ptr<BufferHubBuffer> & buffer,size_t slot)280 Status<void> BufferHubQueue::AddBuffer(
281     const std::shared_ptr<BufferHubBuffer>& buffer, size_t slot) {
282   ALOGD_IF(TRACE, "BufferHubQueue::AddBuffer: buffer_id=%d slot=%zu",
283            buffer->id(), slot);
284 
285   if (is_full()) {
286     ALOGE("BufferHubQueue::AddBuffer queue is at maximum capacity: %zu",
287           capacity_);
288     return ErrorStatus(E2BIG);
289   }
290 
291   if (buffers_[slot]) {
292     // Replace the buffer if the slot is occupied. This could happen when the
293     // producer side replaced the slot with a newly allocated buffer. Remove the
294     // buffer before setting up with the new one.
295     auto remove_status = RemoveBuffer(slot);
296     if (!remove_status)
297       return remove_status.error_status();
298   }
299 
300   for (const auto& event_source : buffer->GetEventSources()) {
301     epoll_event event = {.events = event_source.event_mask | EPOLLET,
302                          .data = {.u64 = Stuff(buffer->event_fd(), slot)}};
303     const int ret =
304         epoll_fd_.Control(EPOLL_CTL_ADD, event_source.event_fd, &event);
305     if (ret < 0) {
306       ALOGE("BufferHubQueue::AddBuffer: Failed to add buffer to epoll set: %s",
307             strerror(-ret));
308       return ErrorStatus(-ret);
309     }
310   }
311 
312   buffers_[slot] = buffer;
313   capacity_++;
314   return {};
315 }
316 
RemoveBuffer(size_t slot)317 Status<void> BufferHubQueue::RemoveBuffer(size_t slot) {
318   ALOGD_IF(TRACE, "BufferHubQueue::RemoveBuffer: slot=%zu", slot);
319 
320   if (buffers_[slot]) {
321     for (const auto& event_source : buffers_[slot]->GetEventSources()) {
322       const int ret =
323           epoll_fd_.Control(EPOLL_CTL_DEL, event_source.event_fd, nullptr);
324       if (ret < 0) {
325         ALOGE(
326             "BufferHubQueue::RemoveBuffer: Failed to remove buffer from epoll "
327             "set: %s",
328             strerror(-ret));
329         return ErrorStatus(-ret);
330       }
331     }
332 
333     // Trigger OnBufferRemoved callback if registered.
334     if (on_buffer_removed_)
335       on_buffer_removed_(buffers_[slot]);
336 
337     buffers_[slot] = nullptr;
338     capacity_--;
339   }
340 
341   return {};
342 }
343 
Enqueue(Entry entry)344 Status<void> BufferHubQueue::Enqueue(Entry entry) {
345   if (!is_full()) {
346     available_buffers_.push(std::move(entry));
347 
348     // Trigger OnBufferAvailable callback if registered.
349     if (on_buffer_available_)
350       on_buffer_available_();
351 
352     return {};
353   } else {
354     ALOGE("BufferHubQueue::Enqueue: Buffer queue is full!");
355     return ErrorStatus(E2BIG);
356   }
357 }
358 
Dequeue(int timeout,size_t * slot)359 Status<std::shared_ptr<BufferHubBuffer>> BufferHubQueue::Dequeue(int timeout,
360                                                                  size_t* slot) {
361   ALOGD_IF(TRACE, "BufferHubQueue::Dequeue: count=%zu, timeout=%d", count(),
362            timeout);
363 
364   PDX_TRACE_FORMAT("BufferHubQueue::Dequeue|count=%zu|", count());
365 
366   if (count() == 0) {
367     if (!WaitForBuffers(timeout))
368       return ErrorStatus(ETIMEDOUT);
369   }
370 
371   auto& entry = available_buffers_.top();
372   PDX_TRACE_FORMAT("buffer|buffer_id=%d;slot=%zu|", entry.buffer->id(),
373                    entry.slot);
374 
375   std::shared_ptr<BufferHubBuffer> buffer = std::move(entry.buffer);
376   *slot = entry.slot;
377 
378   available_buffers_.pop();
379 
380   return {std::move(buffer)};
381 }
382 
SetBufferAvailableCallback(BufferAvailableCallback callback)383 void BufferHubQueue::SetBufferAvailableCallback(
384     BufferAvailableCallback callback) {
385   on_buffer_available_ = callback;
386 }
387 
SetBufferRemovedCallback(BufferRemovedCallback callback)388 void BufferHubQueue::SetBufferRemovedCallback(BufferRemovedCallback callback) {
389   on_buffer_removed_ = callback;
390 }
391 
FreeAllBuffers()392 pdx::Status<void> BufferHubQueue::FreeAllBuffers() {
393   // Clear all available buffers.
394   while (!available_buffers_.empty())
395     available_buffers_.pop();
396 
397   pdx::Status<void> last_error;  // No error.
398   // Clear all buffers this producer queue is tracking.
399   for (size_t slot = 0; slot < BufferHubQueue::kMaxQueueCapacity; slot++) {
400     if (buffers_[slot] != nullptr) {
401       auto status = RemoveBuffer(slot);
402       if (!status) {
403         ALOGE(
404             "ProducerQueue::FreeAllBuffers: Failed to remove buffer at "
405             "slot=%zu.",
406             slot);
407         last_error = status.error_status();
408       }
409     }
410   }
411 
412   return last_error;
413 }
414 
ProducerQueue(LocalChannelHandle handle)415 ProducerQueue::ProducerQueue(LocalChannelHandle handle)
416     : BASE(std::move(handle)) {
417   auto status = ImportQueue();
418   if (!status) {
419     ALOGE("ProducerQueue::ProducerQueue: Failed to import queue: %s",
420           status.GetErrorMessage().c_str());
421     Close(-status.error());
422   }
423 }
424 
ProducerQueue(const ProducerQueueConfig & config,const UsagePolicy & usage)425 ProducerQueue::ProducerQueue(const ProducerQueueConfig& config,
426                              const UsagePolicy& usage)
427     : BASE(BufferHubRPC::kClientPath) {
428   auto status =
429       InvokeRemoteMethod<BufferHubRPC::CreateProducerQueue>(config, usage);
430   if (!status) {
431     ALOGE("ProducerQueue::ProducerQueue: Failed to create producer queue: %s",
432           status.GetErrorMessage().c_str());
433     Close(-status.error());
434     return;
435   }
436 
437   SetupQueue(status.get());
438 }
439 
AllocateBuffers(uint32_t width,uint32_t height,uint32_t layer_count,uint32_t format,uint64_t usage,size_t buffer_count)440 Status<std::vector<size_t>> ProducerQueue::AllocateBuffers(
441     uint32_t width, uint32_t height, uint32_t layer_count, uint32_t format,
442     uint64_t usage, size_t buffer_count) {
443   if (capacity() + buffer_count > kMaxQueueCapacity) {
444     ALOGE(
445         "ProducerQueue::AllocateBuffers: queue is at capacity: %zu, cannot "
446         "allocate %zu more buffer(s).",
447         capacity(), buffer_count);
448     return ErrorStatus(E2BIG);
449   }
450 
451   Status<std::vector<std::pair<LocalChannelHandle, size_t>>> status =
452       InvokeRemoteMethod<BufferHubRPC::ProducerQueueAllocateBuffers>(
453           width, height, layer_count, format, usage, buffer_count);
454   if (!status) {
455     ALOGE("ProducerQueue::AllocateBuffers: failed to allocate buffers: %s",
456           status.GetErrorMessage().c_str());
457     return status.error_status();
458   }
459 
460   auto buffer_handle_slots = status.take();
461   LOG_ALWAYS_FATAL_IF(buffer_handle_slots.size() != buffer_count,
462                       "BufferHubRPC::ProducerQueueAllocateBuffers should "
463                       "return %zu buffer handle(s), but returned %zu instead.",
464                       buffer_count, buffer_handle_slots.size());
465 
466   std::vector<size_t> buffer_slots;
467   buffer_slots.reserve(buffer_count);
468 
469   // Bookkeeping for each buffer.
470   for (auto& hs : buffer_handle_slots) {
471     auto& buffer_handle = hs.first;
472     size_t buffer_slot = hs.second;
473 
474     // Note that import might (though very unlikely) fail. If so, buffer_handle
475     // will be closed and included in returned buffer_slots.
476     if (AddBuffer(BufferProducer::Import(std::move(buffer_handle)),
477                   buffer_slot)) {
478       ALOGD_IF(TRACE, "ProducerQueue::AllocateBuffers: new buffer at slot: %zu",
479                buffer_slot);
480       buffer_slots.push_back(buffer_slot);
481     }
482   }
483 
484   if (buffer_slots.size() == 0) {
485     // Error out if no buffer is allocated and improted.
486     ALOGE_IF(TRACE, "ProducerQueue::AllocateBuffers: no buffer allocated.");
487     ErrorStatus(ENOMEM);
488   }
489 
490   return {std::move(buffer_slots)};
491 }
492 
AllocateBuffer(uint32_t width,uint32_t height,uint32_t layer_count,uint32_t format,uint64_t usage)493 Status<size_t> ProducerQueue::AllocateBuffer(uint32_t width, uint32_t height,
494                                              uint32_t layer_count,
495                                              uint32_t format, uint64_t usage) {
496   // We only allocate one buffer at a time.
497   constexpr size_t buffer_count = 1;
498   auto status =
499       AllocateBuffers(width, height, layer_count, format, usage, buffer_count);
500   if (!status) {
501     ALOGE("ProducerQueue::AllocateBuffer: Failed to allocate buffer: %s",
502           status.GetErrorMessage().c_str());
503     return status.error_status();
504   }
505 
506   if (status.get().size() == 0) {
507     ALOGE_IF(TRACE, "ProducerQueue::AllocateBuffer: no buffer allocated.");
508     ErrorStatus(ENOMEM);
509   }
510 
511   return {status.get()[0]};
512 }
513 
AddBuffer(const std::shared_ptr<BufferProducer> & buffer,size_t slot)514 Status<void> ProducerQueue::AddBuffer(
515     const std::shared_ptr<BufferProducer>& buffer, size_t slot) {
516   ALOGD_IF(TRACE, "ProducerQueue::AddBuffer: queue_id=%d buffer_id=%d slot=%zu",
517            id(), buffer->id(), slot);
518   // For producer buffer, we need to enqueue the newly added buffer
519   // immediately. Producer queue starts with all buffers in available state.
520   auto status = BufferHubQueue::AddBuffer(buffer, slot);
521   if (!status)
522     return status;
523 
524   return BufferHubQueue::Enqueue({buffer, slot, 0ULL});
525 }
526 
RemoveBuffer(size_t slot)527 Status<void> ProducerQueue::RemoveBuffer(size_t slot) {
528   auto status =
529       InvokeRemoteMethod<BufferHubRPC::ProducerQueueRemoveBuffer>(slot);
530   if (!status) {
531     ALOGE("ProducerQueue::RemoveBuffer: Failed to remove producer buffer: %s",
532           status.GetErrorMessage().c_str());
533     return status.error_status();
534   }
535 
536   return BufferHubQueue::RemoveBuffer(slot);
537 }
538 
Dequeue(int timeout,size_t * slot,LocalHandle * release_fence)539 Status<std::shared_ptr<BufferProducer>> ProducerQueue::Dequeue(
540     int timeout, size_t* slot, LocalHandle* release_fence) {
541   DvrNativeBufferMetadata canonical_meta;
542   return Dequeue(timeout, slot, &canonical_meta, release_fence);
543 }
544 
Dequeue(int timeout,size_t * slot,DvrNativeBufferMetadata * out_meta,pdx::LocalHandle * release_fence)545 pdx::Status<std::shared_ptr<BufferProducer>> ProducerQueue::Dequeue(
546     int timeout, size_t* slot, DvrNativeBufferMetadata* out_meta,
547     pdx::LocalHandle* release_fence) {
548   ATRACE_NAME("ProducerQueue::Dequeue");
549   if (slot == nullptr || out_meta == nullptr || release_fence == nullptr) {
550     ALOGE("ProducerQueue::Dequeue: Invalid parameter.");
551     return ErrorStatus(EINVAL);
552   }
553 
554   auto status = BufferHubQueue::Dequeue(timeout, slot);
555   if (!status)
556     return status.error_status();
557 
558   auto buffer = std::static_pointer_cast<BufferProducer>(status.take());
559   const int ret = buffer->GainAsync(out_meta, release_fence);
560   if (ret < 0 && ret != -EALREADY)
561     return ErrorStatus(-ret);
562 
563   return {std::move(buffer)};
564 }
565 
TakeAsParcelable()566 pdx::Status<ProducerQueueParcelable> ProducerQueue::TakeAsParcelable() {
567   if (capacity() != 0) {
568     ALOGE(
569         "ProducerQueue::TakeAsParcelable: producer queue can only be taken out"
570         " as a parcelable when empty. Current queue capacity: %zu",
571         capacity());
572     return ErrorStatus(EINVAL);
573   }
574 
575   std::unique_ptr<pdx::ClientChannel> channel = TakeChannel();
576   ProducerQueueParcelable queue_parcelable(channel->TakeChannelParcelable());
577 
578   // Here the queue parcelable is returned and holds the underlying system
579   // resources backing the queue; while the original client channel of this
580   // producer queue is destroyed in place so that this client can no longer
581   // provide producer operations.
582   return {std::move(queue_parcelable)};
583 }
584 
585 /*static */
Import(LocalChannelHandle handle)586 std::unique_ptr<ConsumerQueue> ConsumerQueue::Import(
587     LocalChannelHandle handle) {
588   return std::unique_ptr<ConsumerQueue>(new ConsumerQueue(std::move(handle)));
589 }
590 
ConsumerQueue(LocalChannelHandle handle)591 ConsumerQueue::ConsumerQueue(LocalChannelHandle handle)
592     : BufferHubQueue(std::move(handle)) {
593   auto status = ImportQueue();
594   if (!status) {
595     ALOGE("ConsumerQueue::ConsumerQueue: Failed to import queue: %s",
596           status.GetErrorMessage().c_str());
597     Close(-status.error());
598   }
599 
600   auto import_status = ImportBuffers();
601   if (import_status) {
602     ALOGI("ConsumerQueue::ConsumerQueue: Imported %zu buffers.",
603           import_status.get());
604   } else {
605     ALOGE("ConsumerQueue::ConsumerQueue: Failed to import buffers: %s",
606           import_status.GetErrorMessage().c_str());
607   }
608 }
609 
ImportBuffers()610 Status<size_t> ConsumerQueue::ImportBuffers() {
611   auto status = InvokeRemoteMethod<BufferHubRPC::ConsumerQueueImportBuffers>();
612   if (!status) {
613     if (status.error() == EBADR) {
614       ALOGI(
615           "ConsumerQueue::ImportBuffers: Queue is silent, no buffers "
616           "imported.");
617       return {0};
618     } else {
619       ALOGE(
620           "ConsumerQueue::ImportBuffers: Failed to import consumer buffer: %s",
621           status.GetErrorMessage().c_str());
622       return status.error_status();
623     }
624   }
625 
626   int ret;
627   Status<void> last_error;
628   size_t imported_buffers_count = 0;
629 
630   auto buffer_handle_slots = status.take();
631   for (auto& buffer_handle_slot : buffer_handle_slots) {
632     ALOGD_IF(TRACE, "ConsumerQueue::ImportBuffers: buffer_handle=%d",
633              buffer_handle_slot.first.value());
634 
635     std::unique_ptr<BufferConsumer> buffer_consumer =
636         BufferConsumer::Import(std::move(buffer_handle_slot.first));
637     if (!buffer_consumer) {
638       ALOGE("ConsumerQueue::ImportBuffers: Failed to import buffer: slot=%zu",
639             buffer_handle_slot.second);
640       last_error = ErrorStatus(EPIPE);
641       continue;
642     }
643 
644     auto add_status =
645         AddBuffer(std::move(buffer_consumer), buffer_handle_slot.second);
646     if (!add_status) {
647       ALOGE("ConsumerQueue::ImportBuffers: Failed to add buffer: %s",
648             add_status.GetErrorMessage().c_str());
649       last_error = add_status;
650     } else {
651       imported_buffers_count++;
652     }
653   }
654 
655   if (imported_buffers_count > 0)
656     return {imported_buffers_count};
657   else
658     return last_error.error_status();
659 }
660 
AddBuffer(const std::shared_ptr<BufferConsumer> & buffer,size_t slot)661 Status<void> ConsumerQueue::AddBuffer(
662     const std::shared_ptr<BufferConsumer>& buffer, size_t slot) {
663   ALOGD_IF(TRACE, "ConsumerQueue::AddBuffer: queue_id=%d buffer_id=%d slot=%zu",
664            id(), buffer->id(), slot);
665   return BufferHubQueue::AddBuffer(buffer, slot);
666 }
667 
Dequeue(int timeout,size_t * slot,void * meta,size_t user_metadata_size,LocalHandle * acquire_fence)668 Status<std::shared_ptr<BufferConsumer>> ConsumerQueue::Dequeue(
669     int timeout, size_t* slot, void* meta, size_t user_metadata_size,
670     LocalHandle* acquire_fence) {
671   if (user_metadata_size != user_metadata_size_) {
672     ALOGE(
673         "ConsumerQueue::Dequeue: Metadata size (%zu) for the dequeuing buffer "
674         "does not match metadata size (%zu) for the queue.",
675         user_metadata_size, user_metadata_size_);
676     return ErrorStatus(EINVAL);
677   }
678 
679   DvrNativeBufferMetadata canonical_meta;
680   auto status = Dequeue(timeout, slot, &canonical_meta, acquire_fence);
681   if (!status)
682     return status.error_status();
683 
684   if (meta && user_metadata_size) {
685     void* metadata_src =
686         reinterpret_cast<void*>(canonical_meta.user_metadata_ptr);
687     if (metadata_src) {
688       memcpy(meta, metadata_src, user_metadata_size);
689     } else {
690       ALOGW("ConsumerQueue::Dequeue: no user-defined metadata.");
691     }
692   }
693 
694   return status;
695 }
696 
Dequeue(int timeout,size_t * slot,DvrNativeBufferMetadata * out_meta,pdx::LocalHandle * acquire_fence)697 Status<std::shared_ptr<BufferConsumer>> ConsumerQueue::Dequeue(
698     int timeout, size_t* slot, DvrNativeBufferMetadata* out_meta,
699     pdx::LocalHandle* acquire_fence) {
700   ATRACE_NAME("ConsumerQueue::Dequeue");
701   if (slot == nullptr || out_meta == nullptr || acquire_fence == nullptr) {
702     ALOGE("ConsumerQueue::Dequeue: Invalid parameter.");
703     return ErrorStatus(EINVAL);
704   }
705 
706   auto status = BufferHubQueue::Dequeue(timeout, slot);
707   if (!status)
708     return status.error_status();
709 
710   auto buffer = std::static_pointer_cast<BufferConsumer>(status.take());
711   const int ret = buffer->AcquireAsync(out_meta, acquire_fence);
712   if (ret < 0)
713     return ErrorStatus(-ret);
714 
715   return {std::move(buffer)};
716 }
717 
OnBufferAllocated()718 Status<void> ConsumerQueue::OnBufferAllocated() {
719   ALOGD_IF(TRACE, "ConsumerQueue::OnBufferAllocated: queue_id=%d", id());
720 
721   auto status = ImportBuffers();
722   if (!status) {
723     ALOGE("ConsumerQueue::OnBufferAllocated: Failed to import buffers: %s",
724           status.GetErrorMessage().c_str());
725     return ErrorStatus(status.error());
726   } else if (status.get() == 0) {
727     ALOGW("ConsumerQueue::OnBufferAllocated: No new buffers allocated!");
728     return ErrorStatus(ENOBUFS);
729   } else {
730     ALOGD_IF(TRACE,
731              "ConsumerQueue::OnBufferAllocated: Imported %zu consumer buffers.",
732              status.get());
733     return {};
734   }
735 }
736 
737 }  // namespace dvr
738 }  // namespace android
739