1 #include "consumer_queue_channel.h"
2
3 #include <pdx/channel_handle.h>
4
5 #include "producer_channel.h"
6
7 using android::pdx::ErrorStatus;
8 using android::pdx::RemoteChannelHandle;
9 using android::pdx::Status;
10 using android::pdx::rpc::DispatchRemoteMethod;
11 using android::pdx::rpc::RemoteMethodError;
12
13 namespace android {
14 namespace dvr {
15
ConsumerQueueChannel(BufferHubService * service,int buffer_id,int channel_id,const std::shared_ptr<Channel> & producer,bool silent)16 ConsumerQueueChannel::ConsumerQueueChannel(
17 BufferHubService* service, int buffer_id, int channel_id,
18 const std::shared_ptr<Channel>& producer, bool silent)
19 : BufferHubChannel(service, buffer_id, channel_id, kConsumerQueueType),
20 producer_(producer),
21 capacity_(0),
22 silent_(silent) {
23 GetProducer()->AddConsumer(this);
24 }
25
~ConsumerQueueChannel()26 ConsumerQueueChannel::~ConsumerQueueChannel() {
27 ALOGD_IF(TRACE, "ConsumerQueueChannel::~ConsumerQueueChannel: channel_id=%d",
28 channel_id());
29
30 if (auto producer = GetProducer()) {
31 producer->RemoveConsumer(this);
32 }
33 }
34
HandleMessage(Message & message)35 bool ConsumerQueueChannel::HandleMessage(Message& message) {
36 ATRACE_NAME("ConsumerQueueChannel::HandleMessage");
37 auto producer = GetProducer();
38 if (!producer) {
39 RemoteMethodError(message, EPIPE);
40 return true;
41 }
42
43 switch (message.GetOp()) {
44 case BufferHubRPC::CreateConsumerQueue::Opcode:
45 DispatchRemoteMethod<BufferHubRPC::CreateConsumerQueue>(
46 *producer, &ProducerQueueChannel::OnCreateConsumerQueue, message);
47 return true;
48
49 case BufferHubRPC::GetQueueInfo::Opcode:
50 DispatchRemoteMethod<BufferHubRPC::GetQueueInfo>(
51 *producer, &ProducerQueueChannel::OnGetQueueInfo, message);
52 return true;
53
54 case BufferHubRPC::ConsumerQueueImportBuffers::Opcode:
55 DispatchRemoteMethod<BufferHubRPC::ConsumerQueueImportBuffers>(
56 *this, &ConsumerQueueChannel::OnConsumerQueueImportBuffers, message);
57 return true;
58
59 default:
60 return false;
61 }
62 }
63
GetProducer() const64 std::shared_ptr<ProducerQueueChannel> ConsumerQueueChannel::GetProducer()
65 const {
66 return std::static_pointer_cast<ProducerQueueChannel>(producer_.lock());
67 }
68
HandleImpulse(Message &)69 void ConsumerQueueChannel::HandleImpulse(Message& /* message */) {
70 ATRACE_NAME("ConsumerQueueChannel::HandleImpulse");
71 }
72
GetBufferInfo() const73 BufferHubChannel::BufferInfo ConsumerQueueChannel::GetBufferInfo() const {
74 BufferHubChannel::BufferInfo info;
75 if (auto producer = GetProducer()) {
76 // If producer has not hung up, copy most buffer info from the producer.
77 info = producer->GetBufferInfo();
78 }
79 info.id = buffer_id();
80 info.capacity = capacity_;
81 return info;
82 }
83
RegisterNewBuffer(const std::shared_ptr<ProducerChannel> & producer_channel,size_t slot)84 void ConsumerQueueChannel::RegisterNewBuffer(
85 const std::shared_ptr<ProducerChannel>& producer_channel, size_t slot) {
86 ALOGD_IF(TRACE,
87 "ConsumerQueueChannel::RegisterNewBuffer: queue_id=%d buffer_id=%d "
88 "slot=%zu silent=%d",
89 buffer_id(), producer_channel->buffer_id(), slot, silent_);
90 // Only register buffers if the queue is not silent.
91 if (!silent_) {
92 pending_buffer_slots_.emplace(producer_channel, slot);
93
94 // Signal the client that there is new buffer available.
95 SignalAvailable();
96 }
97 }
98
99 Status<std::vector<std::pair<RemoteChannelHandle, size_t>>>
OnConsumerQueueImportBuffers(Message & message)100 ConsumerQueueChannel::OnConsumerQueueImportBuffers(Message& message) {
101 std::vector<std::pair<RemoteChannelHandle, size_t>> buffer_handles;
102 ATRACE_NAME("ConsumerQueueChannel::OnConsumerQueueImportBuffers");
103 ALOGD_IF(TRACE,
104 "ConsumerQueueChannel::OnConsumerQueueImportBuffers: "
105 "pending_buffer_slots=%zu",
106 pending_buffer_slots_.size());
107
108 // Indicate this is a silent queue that will not import buffers.
109 if (silent_)
110 return ErrorStatus(EBADR);
111
112 while (!pending_buffer_slots_.empty()) {
113 auto producer_channel = pending_buffer_slots_.front().first.lock();
114 size_t producer_slot = pending_buffer_slots_.front().second;
115 pending_buffer_slots_.pop();
116
117 // It's possible that the producer channel has expired. When this occurs,
118 // ignore the producer channel.
119 if (producer_channel == nullptr) {
120 ALOGW(
121 "ConsumerQueueChannel::OnConsumerQueueImportBuffers: producer "
122 "channel has already been expired.");
123 continue;
124 }
125
126 auto status = producer_channel->CreateConsumer(message);
127
128 // If no buffers are imported successfully, clear available and return an
129 // error. Otherwise, return all consumer handles already imported
130 // successfully, but keep available bits on, so that the client can retry
131 // importing remaining consumer buffers.
132 if (!status) {
133 ALOGE(
134 "ConsumerQueueChannel::OnConsumerQueueImportBuffers: Failed create "
135 "consumer: %s",
136 status.GetErrorMessage().c_str());
137 if (buffer_handles.empty()) {
138 ClearAvailable();
139 return status.error_status();
140 } else {
141 return {std::move(buffer_handles)};
142 }
143 }
144
145 buffer_handles.emplace_back(status.take(), producer_slot);
146 }
147
148 ClearAvailable();
149 return {std::move(buffer_handles)};
150 }
151
OnProducerClosed()152 void ConsumerQueueChannel::OnProducerClosed() {
153 ALOGD_IF(TRACE, "ConsumerQueueChannel::OnProducerClosed: queue_id=%d",
154 buffer_id());
155 producer_.reset();
156 Hangup();
157 }
158
159 } // namespace dvr
160 } // namespace android
161