1 #include "include/private/dvr/buffer_hub_queue_client.h"
2
3 #include <inttypes.h>
4 #include <log/log.h>
5 #include <poll.h>
6 #include <sys/epoll.h>
7
8 #include <array>
9
10 #include <pdx/default_transport/client_channel.h>
11 #include <pdx/default_transport/client_channel_factory.h>
12 #include <pdx/file_handle.h>
13 #include <pdx/trace.h>
14
15 #define RETRY_EINTR(fnc_call) \
16 ([&]() -> decltype(fnc_call) { \
17 decltype(fnc_call) result; \
18 do { \
19 result = (fnc_call); \
20 } while (result == -1 && errno == EINTR); \
21 return result; \
22 })()
23
24 using android::pdx::ErrorStatus;
25 using android::pdx::LocalChannelHandle;
26 using android::pdx::LocalHandle;
27 using android::pdx::Status;
28
29 namespace android {
30 namespace dvr {
31
32 namespace {
33
Unstuff(uint64_t value)34 std::pair<int32_t, int32_t> Unstuff(uint64_t value) {
35 return {static_cast<int32_t>(value >> 32),
36 static_cast<int32_t>(value & ((1ull << 32) - 1))};
37 }
38
Stuff(int32_t a,int32_t b)39 uint64_t Stuff(int32_t a, int32_t b) {
40 const uint32_t ua = static_cast<uint32_t>(a);
41 const uint32_t ub = static_cast<uint32_t>(b);
42 return (static_cast<uint64_t>(ua) << 32) | static_cast<uint64_t>(ub);
43 }
44
45 } // anonymous namespace
46
BufferHubQueue(LocalChannelHandle channel_handle)47 BufferHubQueue::BufferHubQueue(LocalChannelHandle channel_handle)
48 : Client{pdx::default_transport::ClientChannel::Create(
49 std::move(channel_handle))} {
50 Initialize();
51 }
52
BufferHubQueue(const std::string & endpoint_path)53 BufferHubQueue::BufferHubQueue(const std::string& endpoint_path)
54 : Client{
55 pdx::default_transport::ClientChannelFactory::Create(endpoint_path)} {
56 Initialize();
57 }
58
Initialize()59 void BufferHubQueue::Initialize() {
60 int ret = epoll_fd_.Create();
61 if (ret < 0) {
62 ALOGE("BufferHubQueue::BufferHubQueue: Failed to create epoll fd: %s",
63 strerror(-ret));
64 return;
65 }
66
67 epoll_event event = {
68 .events = EPOLLIN | EPOLLET,
69 .data = {.u64 = Stuff(-1, BufferHubQueue::kEpollQueueEventIndex)}};
70 ret = epoll_fd_.Control(EPOLL_CTL_ADD, event_fd(), &event);
71 if (ret < 0) {
72 ALOGE("BufferHubQueue::Initialize: Failed to add event fd to epoll set: %s",
73 strerror(-ret));
74 }
75 }
76
ImportQueue()77 Status<void> BufferHubQueue::ImportQueue() {
78 auto status = InvokeRemoteMethod<BufferHubRPC::GetQueueInfo>();
79 if (!status) {
80 ALOGE("BufferHubQueue::ImportQueue: Failed to import queue: %s",
81 status.GetErrorMessage().c_str());
82 return ErrorStatus(status.error());
83 } else {
84 SetupQueue(status.get());
85 return {};
86 }
87 }
88
SetupQueue(const QueueInfo & queue_info)89 void BufferHubQueue::SetupQueue(const QueueInfo& queue_info) {
90 is_async_ = queue_info.producer_config.is_async;
91 default_width_ = queue_info.producer_config.default_width;
92 default_height_ = queue_info.producer_config.default_height;
93 default_format_ = queue_info.producer_config.default_format;
94 user_metadata_size_ = queue_info.producer_config.user_metadata_size;
95 id_ = queue_info.id;
96 }
97
CreateConsumerQueue()98 std::unique_ptr<ConsumerQueue> BufferHubQueue::CreateConsumerQueue() {
99 if (auto status = CreateConsumerQueueHandle(/*silent*/ false))
100 return std::unique_ptr<ConsumerQueue>(new ConsumerQueue(status.take()));
101 else
102 return nullptr;
103 }
104
CreateSilentConsumerQueue()105 std::unique_ptr<ConsumerQueue> BufferHubQueue::CreateSilentConsumerQueue() {
106 if (auto status = CreateConsumerQueueHandle(/*silent*/ true))
107 return std::unique_ptr<ConsumerQueue>(new ConsumerQueue(status.take()));
108 else
109 return nullptr;
110 }
111
CreateConsumerQueueHandle(bool silent)112 Status<LocalChannelHandle> BufferHubQueue::CreateConsumerQueueHandle(
113 bool silent) {
114 auto status = InvokeRemoteMethod<BufferHubRPC::CreateConsumerQueue>(silent);
115 if (!status) {
116 ALOGE(
117 "BufferHubQueue::CreateConsumerQueue: Failed to create consumer queue: "
118 "%s",
119 status.GetErrorMessage().c_str());
120 return ErrorStatus(status.error());
121 }
122
123 return status;
124 }
125
126 pdx::Status<ConsumerQueueParcelable>
CreateConsumerQueueParcelable(bool silent)127 BufferHubQueue::CreateConsumerQueueParcelable(bool silent) {
128 auto status = CreateConsumerQueueHandle(silent);
129 if (!status)
130 return status.error_status();
131
132 // A temporary consumer queue client to pull its channel parcelable.
133 auto consumer_queue =
134 std::unique_ptr<ConsumerQueue>(new ConsumerQueue(status.take()));
135 ConsumerQueueParcelable queue_parcelable(
136 consumer_queue->GetChannel()->TakeChannelParcelable());
137
138 if (!queue_parcelable.IsValid()) {
139 ALOGE(
140 "BufferHubQueue::CreateConsumerQueueParcelable: Failed to create "
141 "consumer queue parcelable.");
142 return ErrorStatus(EINVAL);
143 }
144
145 return {std::move(queue_parcelable)};
146 }
147
WaitForBuffers(int timeout)148 bool BufferHubQueue::WaitForBuffers(int timeout) {
149 ATRACE_NAME("BufferHubQueue::WaitForBuffers");
150 std::array<epoll_event, kMaxEvents> events;
151
152 // Loop at least once to check for hangups.
153 do {
154 ALOGD_IF(
155 TRACE,
156 "BufferHubQueue::WaitForBuffers: queue_id=%d count=%zu capacity=%zu",
157 id(), count(), capacity());
158
159 // If there is already a buffer then just check for hangup without waiting.
160 const int ret = epoll_fd_.Wait(events.data(), events.size(),
161 count() == 0 ? timeout : 0);
162
163 if (ret == 0) {
164 ALOGI_IF(TRACE,
165 "BufferHubQueue::WaitForBuffers: No events before timeout: "
166 "queue_id=%d",
167 id());
168 return count() != 0;
169 }
170
171 if (ret < 0 && ret != -EINTR) {
172 ALOGE("BufferHubQueue::WaitForBuffers: Failed to wait for buffers: %s",
173 strerror(-ret));
174 return false;
175 }
176
177 const int num_events = ret;
178
179 // A BufferQueue's epoll fd tracks N+1 events, where there are N events,
180 // one for each buffer in the queue, and one extra event for the queue
181 // client itself.
182 for (int i = 0; i < num_events; i++) {
183 int32_t event_fd;
184 int32_t index;
185 std::tie(event_fd, index) = Unstuff(events[i].data.u64);
186
187 PDX_TRACE_FORMAT(
188 "epoll_event|queue_id=%d;num_events=%d;event_index=%d;event_fd=%d;"
189 "slot=%d|",
190 id(), num_events, i, event_fd, index);
191
192 ALOGD_IF(TRACE,
193 "BufferHubQueue::WaitForBuffers: event %d: event_fd=%d index=%d",
194 i, event_fd, index);
195
196 if (is_buffer_event_index(index)) {
197 HandleBufferEvent(static_cast<size_t>(index), event_fd,
198 events[i].events);
199 } else if (is_queue_event_index(index)) {
200 HandleQueueEvent(events[i].events);
201 } else {
202 ALOGW(
203 "BufferHubQueue::WaitForBuffers: Unknown event type event_fd=%d "
204 "index=%d",
205 event_fd, index);
206 }
207 }
208 } while (count() == 0 && capacity() > 0 && !hung_up());
209
210 return count() != 0;
211 }
212
HandleBufferEvent(size_t slot,int event_fd,int poll_events)213 Status<void> BufferHubQueue::HandleBufferEvent(size_t slot, int event_fd,
214 int poll_events) {
215 ATRACE_NAME("BufferHubQueue::HandleBufferEvent");
216 if (!buffers_[slot]) {
217 ALOGW("BufferHubQueue::HandleBufferEvent: Invalid buffer slot: %zu", slot);
218 return ErrorStatus(ENOENT);
219 }
220
221 auto status = buffers_[slot]->GetEventMask(poll_events);
222 if (!status) {
223 ALOGW("BufferHubQueue::HandleBufferEvent: Failed to get event mask: %s",
224 status.GetErrorMessage().c_str());
225 return status.error_status();
226 }
227
228 const int events = status.get();
229 PDX_TRACE_FORMAT(
230 "buffer|queue_id=%d;buffer_id=%d;slot=%zu;event_fd=%d;poll_events=%x;"
231 "events=%d|",
232 id(), buffers_[slot]->id(), slot, event_fd, poll_events, events);
233
234 if (events & EPOLLIN) {
235 return Enqueue({buffers_[slot], slot, buffers_[slot]->GetQueueIndex()});
236 } else if (events & EPOLLHUP) {
237 ALOGW(
238 "BufferHubQueue::HandleBufferEvent: Received EPOLLHUP event: slot=%zu "
239 "event_fd=%d buffer_id=%d",
240 slot, buffers_[slot]->event_fd(), buffers_[slot]->id());
241 return RemoveBuffer(slot);
242 } else {
243 ALOGW(
244 "BufferHubQueue::HandleBufferEvent: Unknown event, slot=%zu, epoll "
245 "events=%d",
246 slot, events);
247 }
248
249 return {};
250 }
251
HandleQueueEvent(int poll_event)252 Status<void> BufferHubQueue::HandleQueueEvent(int poll_event) {
253 ATRACE_NAME("BufferHubQueue::HandleQueueEvent");
254 auto status = GetEventMask(poll_event);
255 if (!status) {
256 ALOGW("BufferHubQueue::HandleQueueEvent: Failed to get event mask: %s",
257 status.GetErrorMessage().c_str());
258 return status.error_status();
259 }
260
261 const int events = status.get();
262 if (events & EPOLLIN) {
263 // Note that after buffer imports, if |count()| still returns 0, epoll
264 // wait will be tried again to acquire the newly imported buffer.
265 auto buffer_status = OnBufferAllocated();
266 if (!buffer_status) {
267 ALOGE("BufferHubQueue::HandleQueueEvent: Failed to import buffer: %s",
268 buffer_status.GetErrorMessage().c_str());
269 }
270 } else if (events & EPOLLHUP) {
271 ALOGD_IF(TRACE, "BufferHubQueue::HandleQueueEvent: hang up event!");
272 hung_up_ = true;
273 } else {
274 ALOGW("BufferHubQueue::HandleQueueEvent: Unknown epoll events=%x", events);
275 }
276
277 return {};
278 }
279
AddBuffer(const std::shared_ptr<BufferHubBuffer> & buffer,size_t slot)280 Status<void> BufferHubQueue::AddBuffer(
281 const std::shared_ptr<BufferHubBuffer>& buffer, size_t slot) {
282 ALOGD_IF(TRACE, "BufferHubQueue::AddBuffer: buffer_id=%d slot=%zu",
283 buffer->id(), slot);
284
285 if (is_full()) {
286 ALOGE("BufferHubQueue::AddBuffer queue is at maximum capacity: %zu",
287 capacity_);
288 return ErrorStatus(E2BIG);
289 }
290
291 if (buffers_[slot]) {
292 // Replace the buffer if the slot is occupied. This could happen when the
293 // producer side replaced the slot with a newly allocated buffer. Remove the
294 // buffer before setting up with the new one.
295 auto remove_status = RemoveBuffer(slot);
296 if (!remove_status)
297 return remove_status.error_status();
298 }
299
300 for (const auto& event_source : buffer->GetEventSources()) {
301 epoll_event event = {.events = event_source.event_mask | EPOLLET,
302 .data = {.u64 = Stuff(buffer->event_fd(), slot)}};
303 const int ret =
304 epoll_fd_.Control(EPOLL_CTL_ADD, event_source.event_fd, &event);
305 if (ret < 0) {
306 ALOGE("BufferHubQueue::AddBuffer: Failed to add buffer to epoll set: %s",
307 strerror(-ret));
308 return ErrorStatus(-ret);
309 }
310 }
311
312 buffers_[slot] = buffer;
313 capacity_++;
314 return {};
315 }
316
RemoveBuffer(size_t slot)317 Status<void> BufferHubQueue::RemoveBuffer(size_t slot) {
318 ALOGD_IF(TRACE, "BufferHubQueue::RemoveBuffer: slot=%zu", slot);
319
320 if (buffers_[slot]) {
321 for (const auto& event_source : buffers_[slot]->GetEventSources()) {
322 const int ret =
323 epoll_fd_.Control(EPOLL_CTL_DEL, event_source.event_fd, nullptr);
324 if (ret < 0) {
325 ALOGE(
326 "BufferHubQueue::RemoveBuffer: Failed to remove buffer from epoll "
327 "set: %s",
328 strerror(-ret));
329 return ErrorStatus(-ret);
330 }
331 }
332
333 // Trigger OnBufferRemoved callback if registered.
334 if (on_buffer_removed_)
335 on_buffer_removed_(buffers_[slot]);
336
337 buffers_[slot] = nullptr;
338 capacity_--;
339 }
340
341 return {};
342 }
343
Enqueue(Entry entry)344 Status<void> BufferHubQueue::Enqueue(Entry entry) {
345 if (!is_full()) {
346 available_buffers_.push(std::move(entry));
347
348 // Trigger OnBufferAvailable callback if registered.
349 if (on_buffer_available_)
350 on_buffer_available_();
351
352 return {};
353 } else {
354 ALOGE("BufferHubQueue::Enqueue: Buffer queue is full!");
355 return ErrorStatus(E2BIG);
356 }
357 }
358
Dequeue(int timeout,size_t * slot)359 Status<std::shared_ptr<BufferHubBuffer>> BufferHubQueue::Dequeue(int timeout,
360 size_t* slot) {
361 ALOGD_IF(TRACE, "BufferHubQueue::Dequeue: count=%zu, timeout=%d", count(),
362 timeout);
363
364 PDX_TRACE_FORMAT("BufferHubQueue::Dequeue|count=%zu|", count());
365
366 if (count() == 0) {
367 if (!WaitForBuffers(timeout))
368 return ErrorStatus(ETIMEDOUT);
369 }
370
371 auto& entry = available_buffers_.top();
372 PDX_TRACE_FORMAT("buffer|buffer_id=%d;slot=%zu|", entry.buffer->id(),
373 entry.slot);
374
375 std::shared_ptr<BufferHubBuffer> buffer = std::move(entry.buffer);
376 *slot = entry.slot;
377
378 available_buffers_.pop();
379
380 return {std::move(buffer)};
381 }
382
SetBufferAvailableCallback(BufferAvailableCallback callback)383 void BufferHubQueue::SetBufferAvailableCallback(
384 BufferAvailableCallback callback) {
385 on_buffer_available_ = callback;
386 }
387
SetBufferRemovedCallback(BufferRemovedCallback callback)388 void BufferHubQueue::SetBufferRemovedCallback(BufferRemovedCallback callback) {
389 on_buffer_removed_ = callback;
390 }
391
FreeAllBuffers()392 pdx::Status<void> BufferHubQueue::FreeAllBuffers() {
393 // Clear all available buffers.
394 while (!available_buffers_.empty())
395 available_buffers_.pop();
396
397 pdx::Status<void> last_error; // No error.
398 // Clear all buffers this producer queue is tracking.
399 for (size_t slot = 0; slot < BufferHubQueue::kMaxQueueCapacity; slot++) {
400 if (buffers_[slot] != nullptr) {
401 auto status = RemoveBuffer(slot);
402 if (!status) {
403 ALOGE(
404 "ProducerQueue::FreeAllBuffers: Failed to remove buffer at "
405 "slot=%zu.",
406 slot);
407 last_error = status.error_status();
408 }
409 }
410 }
411
412 return last_error;
413 }
414
ProducerQueue(LocalChannelHandle handle)415 ProducerQueue::ProducerQueue(LocalChannelHandle handle)
416 : BASE(std::move(handle)) {
417 auto status = ImportQueue();
418 if (!status) {
419 ALOGE("ProducerQueue::ProducerQueue: Failed to import queue: %s",
420 status.GetErrorMessage().c_str());
421 Close(-status.error());
422 }
423 }
424
ProducerQueue(const ProducerQueueConfig & config,const UsagePolicy & usage)425 ProducerQueue::ProducerQueue(const ProducerQueueConfig& config,
426 const UsagePolicy& usage)
427 : BASE(BufferHubRPC::kClientPath) {
428 auto status =
429 InvokeRemoteMethod<BufferHubRPC::CreateProducerQueue>(config, usage);
430 if (!status) {
431 ALOGE("ProducerQueue::ProducerQueue: Failed to create producer queue: %s",
432 status.GetErrorMessage().c_str());
433 Close(-status.error());
434 return;
435 }
436
437 SetupQueue(status.get());
438 }
439
AllocateBuffers(uint32_t width,uint32_t height,uint32_t layer_count,uint32_t format,uint64_t usage,size_t buffer_count)440 Status<std::vector<size_t>> ProducerQueue::AllocateBuffers(
441 uint32_t width, uint32_t height, uint32_t layer_count, uint32_t format,
442 uint64_t usage, size_t buffer_count) {
443 if (capacity() + buffer_count > kMaxQueueCapacity) {
444 ALOGE(
445 "ProducerQueue::AllocateBuffers: queue is at capacity: %zu, cannot "
446 "allocate %zu more buffer(s).",
447 capacity(), buffer_count);
448 return ErrorStatus(E2BIG);
449 }
450
451 Status<std::vector<std::pair<LocalChannelHandle, size_t>>> status =
452 InvokeRemoteMethod<BufferHubRPC::ProducerQueueAllocateBuffers>(
453 width, height, layer_count, format, usage, buffer_count);
454 if (!status) {
455 ALOGE("ProducerQueue::AllocateBuffers: failed to allocate buffers: %s",
456 status.GetErrorMessage().c_str());
457 return status.error_status();
458 }
459
460 auto buffer_handle_slots = status.take();
461 LOG_ALWAYS_FATAL_IF(buffer_handle_slots.size() != buffer_count,
462 "BufferHubRPC::ProducerQueueAllocateBuffers should "
463 "return %zu buffer handle(s), but returned %zu instead.",
464 buffer_count, buffer_handle_slots.size());
465
466 std::vector<size_t> buffer_slots;
467 buffer_slots.reserve(buffer_count);
468
469 // Bookkeeping for each buffer.
470 for (auto& hs : buffer_handle_slots) {
471 auto& buffer_handle = hs.first;
472 size_t buffer_slot = hs.second;
473
474 // Note that import might (though very unlikely) fail. If so, buffer_handle
475 // will be closed and included in returned buffer_slots.
476 if (AddBuffer(BufferProducer::Import(std::move(buffer_handle)),
477 buffer_slot)) {
478 ALOGD_IF(TRACE, "ProducerQueue::AllocateBuffers: new buffer at slot: %zu",
479 buffer_slot);
480 buffer_slots.push_back(buffer_slot);
481 }
482 }
483
484 if (buffer_slots.size() == 0) {
485 // Error out if no buffer is allocated and improted.
486 ALOGE_IF(TRACE, "ProducerQueue::AllocateBuffers: no buffer allocated.");
487 ErrorStatus(ENOMEM);
488 }
489
490 return {std::move(buffer_slots)};
491 }
492
AllocateBuffer(uint32_t width,uint32_t height,uint32_t layer_count,uint32_t format,uint64_t usage)493 Status<size_t> ProducerQueue::AllocateBuffer(uint32_t width, uint32_t height,
494 uint32_t layer_count,
495 uint32_t format, uint64_t usage) {
496 // We only allocate one buffer at a time.
497 constexpr size_t buffer_count = 1;
498 auto status =
499 AllocateBuffers(width, height, layer_count, format, usage, buffer_count);
500 if (!status) {
501 ALOGE("ProducerQueue::AllocateBuffer: Failed to allocate buffer: %s",
502 status.GetErrorMessage().c_str());
503 return status.error_status();
504 }
505
506 if (status.get().size() == 0) {
507 ALOGE_IF(TRACE, "ProducerQueue::AllocateBuffer: no buffer allocated.");
508 ErrorStatus(ENOMEM);
509 }
510
511 return {status.get()[0]};
512 }
513
AddBuffer(const std::shared_ptr<BufferProducer> & buffer,size_t slot)514 Status<void> ProducerQueue::AddBuffer(
515 const std::shared_ptr<BufferProducer>& buffer, size_t slot) {
516 ALOGD_IF(TRACE, "ProducerQueue::AddBuffer: queue_id=%d buffer_id=%d slot=%zu",
517 id(), buffer->id(), slot);
518 // For producer buffer, we need to enqueue the newly added buffer
519 // immediately. Producer queue starts with all buffers in available state.
520 auto status = BufferHubQueue::AddBuffer(buffer, slot);
521 if (!status)
522 return status;
523
524 return BufferHubQueue::Enqueue({buffer, slot, 0ULL});
525 }
526
RemoveBuffer(size_t slot)527 Status<void> ProducerQueue::RemoveBuffer(size_t slot) {
528 auto status =
529 InvokeRemoteMethod<BufferHubRPC::ProducerQueueRemoveBuffer>(slot);
530 if (!status) {
531 ALOGE("ProducerQueue::RemoveBuffer: Failed to remove producer buffer: %s",
532 status.GetErrorMessage().c_str());
533 return status.error_status();
534 }
535
536 return BufferHubQueue::RemoveBuffer(slot);
537 }
538
Dequeue(int timeout,size_t * slot,LocalHandle * release_fence)539 Status<std::shared_ptr<BufferProducer>> ProducerQueue::Dequeue(
540 int timeout, size_t* slot, LocalHandle* release_fence) {
541 DvrNativeBufferMetadata canonical_meta;
542 return Dequeue(timeout, slot, &canonical_meta, release_fence);
543 }
544
Dequeue(int timeout,size_t * slot,DvrNativeBufferMetadata * out_meta,pdx::LocalHandle * release_fence)545 pdx::Status<std::shared_ptr<BufferProducer>> ProducerQueue::Dequeue(
546 int timeout, size_t* slot, DvrNativeBufferMetadata* out_meta,
547 pdx::LocalHandle* release_fence) {
548 ATRACE_NAME("ProducerQueue::Dequeue");
549 if (slot == nullptr || out_meta == nullptr || release_fence == nullptr) {
550 ALOGE("ProducerQueue::Dequeue: Invalid parameter.");
551 return ErrorStatus(EINVAL);
552 }
553
554 auto status = BufferHubQueue::Dequeue(timeout, slot);
555 if (!status)
556 return status.error_status();
557
558 auto buffer = std::static_pointer_cast<BufferProducer>(status.take());
559 const int ret = buffer->GainAsync(out_meta, release_fence);
560 if (ret < 0 && ret != -EALREADY)
561 return ErrorStatus(-ret);
562
563 return {std::move(buffer)};
564 }
565
TakeAsParcelable()566 pdx::Status<ProducerQueueParcelable> ProducerQueue::TakeAsParcelable() {
567 if (capacity() != 0) {
568 ALOGE(
569 "ProducerQueue::TakeAsParcelable: producer queue can only be taken out"
570 " as a parcelable when empty. Current queue capacity: %zu",
571 capacity());
572 return ErrorStatus(EINVAL);
573 }
574
575 std::unique_ptr<pdx::ClientChannel> channel = TakeChannel();
576 ProducerQueueParcelable queue_parcelable(channel->TakeChannelParcelable());
577
578 // Here the queue parcelable is returned and holds the underlying system
579 // resources backing the queue; while the original client channel of this
580 // producer queue is destroyed in place so that this client can no longer
581 // provide producer operations.
582 return {std::move(queue_parcelable)};
583 }
584
585 /*static */
Import(LocalChannelHandle handle)586 std::unique_ptr<ConsumerQueue> ConsumerQueue::Import(
587 LocalChannelHandle handle) {
588 return std::unique_ptr<ConsumerQueue>(new ConsumerQueue(std::move(handle)));
589 }
590
ConsumerQueue(LocalChannelHandle handle)591 ConsumerQueue::ConsumerQueue(LocalChannelHandle handle)
592 : BufferHubQueue(std::move(handle)) {
593 auto status = ImportQueue();
594 if (!status) {
595 ALOGE("ConsumerQueue::ConsumerQueue: Failed to import queue: %s",
596 status.GetErrorMessage().c_str());
597 Close(-status.error());
598 }
599
600 auto import_status = ImportBuffers();
601 if (import_status) {
602 ALOGI("ConsumerQueue::ConsumerQueue: Imported %zu buffers.",
603 import_status.get());
604 } else {
605 ALOGE("ConsumerQueue::ConsumerQueue: Failed to import buffers: %s",
606 import_status.GetErrorMessage().c_str());
607 }
608 }
609
ImportBuffers()610 Status<size_t> ConsumerQueue::ImportBuffers() {
611 auto status = InvokeRemoteMethod<BufferHubRPC::ConsumerQueueImportBuffers>();
612 if (!status) {
613 if (status.error() == EBADR) {
614 ALOGI(
615 "ConsumerQueue::ImportBuffers: Queue is silent, no buffers "
616 "imported.");
617 return {0};
618 } else {
619 ALOGE(
620 "ConsumerQueue::ImportBuffers: Failed to import consumer buffer: %s",
621 status.GetErrorMessage().c_str());
622 return status.error_status();
623 }
624 }
625
626 int ret;
627 Status<void> last_error;
628 size_t imported_buffers_count = 0;
629
630 auto buffer_handle_slots = status.take();
631 for (auto& buffer_handle_slot : buffer_handle_slots) {
632 ALOGD_IF(TRACE, "ConsumerQueue::ImportBuffers: buffer_handle=%d",
633 buffer_handle_slot.first.value());
634
635 std::unique_ptr<BufferConsumer> buffer_consumer =
636 BufferConsumer::Import(std::move(buffer_handle_slot.first));
637 if (!buffer_consumer) {
638 ALOGE("ConsumerQueue::ImportBuffers: Failed to import buffer: slot=%zu",
639 buffer_handle_slot.second);
640 last_error = ErrorStatus(EPIPE);
641 continue;
642 }
643
644 auto add_status =
645 AddBuffer(std::move(buffer_consumer), buffer_handle_slot.second);
646 if (!add_status) {
647 ALOGE("ConsumerQueue::ImportBuffers: Failed to add buffer: %s",
648 add_status.GetErrorMessage().c_str());
649 last_error = add_status;
650 } else {
651 imported_buffers_count++;
652 }
653 }
654
655 if (imported_buffers_count > 0)
656 return {imported_buffers_count};
657 else
658 return last_error.error_status();
659 }
660
AddBuffer(const std::shared_ptr<BufferConsumer> & buffer,size_t slot)661 Status<void> ConsumerQueue::AddBuffer(
662 const std::shared_ptr<BufferConsumer>& buffer, size_t slot) {
663 ALOGD_IF(TRACE, "ConsumerQueue::AddBuffer: queue_id=%d buffer_id=%d slot=%zu",
664 id(), buffer->id(), slot);
665 return BufferHubQueue::AddBuffer(buffer, slot);
666 }
667
Dequeue(int timeout,size_t * slot,void * meta,size_t user_metadata_size,LocalHandle * acquire_fence)668 Status<std::shared_ptr<BufferConsumer>> ConsumerQueue::Dequeue(
669 int timeout, size_t* slot, void* meta, size_t user_metadata_size,
670 LocalHandle* acquire_fence) {
671 if (user_metadata_size != user_metadata_size_) {
672 ALOGE(
673 "ConsumerQueue::Dequeue: Metadata size (%zu) for the dequeuing buffer "
674 "does not match metadata size (%zu) for the queue.",
675 user_metadata_size, user_metadata_size_);
676 return ErrorStatus(EINVAL);
677 }
678
679 DvrNativeBufferMetadata canonical_meta;
680 auto status = Dequeue(timeout, slot, &canonical_meta, acquire_fence);
681 if (!status)
682 return status.error_status();
683
684 if (meta && user_metadata_size) {
685 void* metadata_src =
686 reinterpret_cast<void*>(canonical_meta.user_metadata_ptr);
687 if (metadata_src) {
688 memcpy(meta, metadata_src, user_metadata_size);
689 } else {
690 ALOGW("ConsumerQueue::Dequeue: no user-defined metadata.");
691 }
692 }
693
694 return status;
695 }
696
Dequeue(int timeout,size_t * slot,DvrNativeBufferMetadata * out_meta,pdx::LocalHandle * acquire_fence)697 Status<std::shared_ptr<BufferConsumer>> ConsumerQueue::Dequeue(
698 int timeout, size_t* slot, DvrNativeBufferMetadata* out_meta,
699 pdx::LocalHandle* acquire_fence) {
700 ATRACE_NAME("ConsumerQueue::Dequeue");
701 if (slot == nullptr || out_meta == nullptr || acquire_fence == nullptr) {
702 ALOGE("ConsumerQueue::Dequeue: Invalid parameter.");
703 return ErrorStatus(EINVAL);
704 }
705
706 auto status = BufferHubQueue::Dequeue(timeout, slot);
707 if (!status)
708 return status.error_status();
709
710 auto buffer = std::static_pointer_cast<BufferConsumer>(status.take());
711 const int ret = buffer->AcquireAsync(out_meta, acquire_fence);
712 if (ret < 0)
713 return ErrorStatus(-ret);
714
715 return {std::move(buffer)};
716 }
717
OnBufferAllocated()718 Status<void> ConsumerQueue::OnBufferAllocated() {
719 ALOGD_IF(TRACE, "ConsumerQueue::OnBufferAllocated: queue_id=%d", id());
720
721 auto status = ImportBuffers();
722 if (!status) {
723 ALOGE("ConsumerQueue::OnBufferAllocated: Failed to import buffers: %s",
724 status.GetErrorMessage().c_str());
725 return ErrorStatus(status.error());
726 } else if (status.get() == 0) {
727 ALOGW("ConsumerQueue::OnBufferAllocated: No new buffers allocated!");
728 return ErrorStatus(ENOBUFS);
729 } else {
730 ALOGD_IF(TRACE,
731 "ConsumerQueue::OnBufferAllocated: Imported %zu consumer buffers.",
732 status.get());
733 return {};
734 }
735 }
736
737 } // namespace dvr
738 } // namespace android
739