1 #include "producer_queue_channel.h"
2
3 #include <inttypes.h>
4
5 #include "consumer_queue_channel.h"
6 #include "producer_channel.h"
7
8 using android::pdx::ErrorStatus;
9 using android::pdx::Message;
10 using android::pdx::Status;
11 using android::pdx::RemoteChannelHandle;
12 using android::pdx::rpc::DispatchRemoteMethod;
13
14 namespace android {
15 namespace dvr {
16
ProducerQueueChannel(BufferHubService * service,int channel_id,size_t meta_size_bytes,const UsagePolicy & usage_policy,int * error)17 ProducerQueueChannel::ProducerQueueChannel(BufferHubService* service,
18 int channel_id,
19 size_t meta_size_bytes,
20 const UsagePolicy& usage_policy,
21 int* error)
22 : BufferHubChannel(service, channel_id, channel_id, kProducerQueueType),
23 meta_size_bytes_(meta_size_bytes),
24 usage_policy_(usage_policy),
25 capacity_(0) {
26 *error = 0;
27 }
28
~ProducerQueueChannel()29 ProducerQueueChannel::~ProducerQueueChannel() {
30 ALOGD_IF(TRACE, "ProducerQueueChannel::~ProducerQueueChannel: queue_id=%d",
31 buffer_id());
32 for (auto* consumer : consumer_channels_)
33 consumer->OnProducerClosed();
34 }
35
36 /* static */
Create(BufferHubService * service,int channel_id,size_t meta_size_bytes,const UsagePolicy & usage_policy)37 Status<std::shared_ptr<ProducerQueueChannel>> ProducerQueueChannel::Create(
38 BufferHubService* service, int channel_id, size_t meta_size_bytes,
39 const UsagePolicy& usage_policy) {
40 // Configuration between |usage_deny_set_mask| and |usage_deny_clear_mask|
41 // should be mutually exclusive.
42 if ((usage_policy.usage_deny_set_mask & usage_policy.usage_deny_clear_mask)) {
43 ALOGE(
44 "BufferHubService::OnCreateProducerQueue: illegal usage mask "
45 "configuration: usage_deny_set_mask=%" PRIx64
46 " usage_deny_clear_mask=%" PRIx64,
47 usage_policy.usage_deny_set_mask, usage_policy.usage_deny_clear_mask);
48 return ErrorStatus(EINVAL);
49 }
50
51 int error = 0;
52 std::shared_ptr<ProducerQueueChannel> producer(new ProducerQueueChannel(
53 service, channel_id, meta_size_bytes, usage_policy, &error));
54 if (error < 0)
55 return ErrorStatus(-error);
56 else
57 return {std::move(producer)};
58 }
59
HandleMessage(Message & message)60 bool ProducerQueueChannel::HandleMessage(Message& message) {
61 ATRACE_NAME("ProducerQueueChannel::HandleMessage");
62 switch (message.GetOp()) {
63 case BufferHubRPC::CreateConsumerQueue::Opcode:
64 DispatchRemoteMethod<BufferHubRPC::CreateConsumerQueue>(
65 *this, &ProducerQueueChannel::OnCreateConsumerQueue, message);
66 return true;
67
68 case BufferHubRPC::GetQueueInfo::Opcode:
69 DispatchRemoteMethod<BufferHubRPC::GetQueueInfo>(
70 *this, &ProducerQueueChannel::OnGetQueueInfo, message);
71 return true;
72
73 case BufferHubRPC::ProducerQueueAllocateBuffers::Opcode:
74 DispatchRemoteMethod<BufferHubRPC::ProducerQueueAllocateBuffers>(
75 *this, &ProducerQueueChannel::OnProducerQueueAllocateBuffers,
76 message);
77 return true;
78
79 case BufferHubRPC::ProducerQueueDetachBuffer::Opcode:
80 DispatchRemoteMethod<BufferHubRPC::ProducerQueueDetachBuffer>(
81 *this, &ProducerQueueChannel::OnProducerQueueDetachBuffer, message);
82 return true;
83
84 default:
85 return false;
86 }
87 }
88
HandleImpulse(Message &)89 void ProducerQueueChannel::HandleImpulse(Message& /* message */) {
90 ATRACE_NAME("ProducerQueueChannel::HandleImpulse");
91 }
92
GetBufferInfo() const93 BufferHubChannel::BufferInfo ProducerQueueChannel::GetBufferInfo() const {
94 return BufferInfo(channel_id(), consumer_channels_.size(), capacity_,
95 usage_policy_);
96 }
97
OnCreateConsumerQueue(Message & message)98 Status<RemoteChannelHandle> ProducerQueueChannel::OnCreateConsumerQueue(
99 Message& message) {
100 ATRACE_NAME("ProducerQueueChannel::OnCreateConsumerQueue");
101 ALOGD_IF(TRACE, "ProducerQueueChannel::OnCreateConsumerQueue: channel_id=%d",
102 channel_id());
103
104 int channel_id;
105 auto status = message.PushChannel(0, nullptr, &channel_id);
106 if (!status) {
107 ALOGE(
108 "ProducerQueueChannel::OnCreateConsumerQueue: failed to push consumer "
109 "channel: %s",
110 status.GetErrorMessage().c_str());
111 return ErrorStatus(ENOMEM);
112 }
113
114 auto consumer_queue_channel = std::make_shared<ConsumerQueueChannel>(
115 service(), buffer_id(), channel_id, shared_from_this());
116
117 // Register the existing buffers with the new consumer queue.
118 for (size_t slot = 0; slot < BufferHubRPC::kMaxQueueCapacity; slot++) {
119 if (auto buffer = buffers_[slot].lock())
120 consumer_queue_channel->RegisterNewBuffer(buffer, slot);
121 }
122
123 const auto channel_status =
124 service()->SetChannel(channel_id, consumer_queue_channel);
125 if (!channel_status) {
126 ALOGE(
127 "ProducerQueueChannel::OnCreateConsumerQueue: Failed to set channel: "
128 "%s",
129 channel_status.GetErrorMessage().c_str());
130 return ErrorStatus(ENOMEM);
131 }
132
133 return {status.take()};
134 }
135
OnGetQueueInfo(Message &)136 Status<QueueInfo> ProducerQueueChannel::OnGetQueueInfo(Message&) {
137 return {{meta_size_bytes_, buffer_id()}};
138 }
139
140 Status<std::vector<std::pair<RemoteChannelHandle, size_t>>>
OnProducerQueueAllocateBuffers(Message & message,uint32_t width,uint32_t height,uint32_t layer_count,uint32_t format,uint64_t usage,size_t buffer_count)141 ProducerQueueChannel::OnProducerQueueAllocateBuffers(
142 Message& message, uint32_t width, uint32_t height, uint32_t layer_count,
143 uint32_t format, uint64_t usage, size_t buffer_count) {
144 ATRACE_NAME("ProducerQueueChannel::OnProducerQueueAllocateBuffers");
145 ALOGD_IF(TRACE,
146 "ProducerQueueChannel::OnProducerQueueAllocateBuffers: "
147 "producer_channel_id=%d",
148 channel_id());
149
150 std::vector<std::pair<RemoteChannelHandle, size_t>> buffer_handles;
151
152 // Deny buffer allocation violating preset rules.
153 if (usage & usage_policy_.usage_deny_set_mask) {
154 ALOGE(
155 "ProducerQueueChannel::OnProducerQueueAllocateBuffers: usage: %" PRIx64
156 " is not permitted. Violating usage_deny_set_mask, the following bits "
157 "shall not be set: %" PRIx64 ".",
158 usage, usage_policy_.usage_deny_set_mask);
159 return ErrorStatus(EINVAL);
160 }
161
162 if (~usage & usage_policy_.usage_deny_clear_mask) {
163 ALOGE(
164 "ProducerQueueChannel::OnProducerQueueAllocateBuffers: usage: %" PRIx64
165 " is not permitted. Violating usage_deny_clear_mask, the following "
166 "bits must be set: %" PRIx64 ".",
167 usage, usage_policy_.usage_deny_clear_mask);
168 return ErrorStatus(EINVAL);
169 }
170
171 // Force set mask and clear mask. Note that |usage_policy_.usage_set_mask_|
172 // takes precedence and will overwrite |usage_policy_.usage_clear_mask|.
173 uint64_t effective_usage =
174 (usage & ~usage_policy_.usage_clear_mask) | usage_policy_.usage_set_mask;
175
176 for (size_t i = 0; i < buffer_count; i++) {
177 auto status = AllocateBuffer(message, width, height, layer_count, format,
178 effective_usage);
179 if (!status) {
180 ALOGE(
181 "ProducerQueueChannel::OnProducerQueueAllocateBuffers: Failed to "
182 "allocate new buffer.");
183 return ErrorStatus(status.error());
184 }
185 buffer_handles.push_back(status.take());
186 }
187
188 return {std::move(buffer_handles)};
189 }
190
191 Status<std::pair<RemoteChannelHandle, size_t>>
AllocateBuffer(Message & message,uint32_t width,uint32_t height,uint32_t layer_count,uint32_t format,uint64_t usage)192 ProducerQueueChannel::AllocateBuffer(Message& message, uint32_t width,
193 uint32_t height, uint32_t layer_count,
194 uint32_t format, uint64_t usage) {
195 ATRACE_NAME("ProducerQueueChannel::AllocateBuffer");
196 ALOGD_IF(TRACE,
197 "ProducerQueueChannel::AllocateBuffer: producer_channel_id=%d",
198 channel_id());
199
200 if (capacity_ >= BufferHubRPC::kMaxQueueCapacity) {
201 ALOGE("ProducerQueueChannel::AllocateBuffer: reaches kMaxQueueCapacity.");
202 return ErrorStatus(E2BIG);
203 }
204
205 // Here we are creating a new BufferHubBuffer, initialize the producer
206 // channel, and returning its file handle back to the client.
207 // buffer_id is the id of the producer channel of BufferHubBuffer.
208 int buffer_id;
209 auto status = message.PushChannel(0, nullptr, &buffer_id);
210
211 if (!status) {
212 ALOGE("ProducerQueueChannel::AllocateBuffer: failed to push channel: %s",
213 status.GetErrorMessage().c_str());
214 return ErrorStatus(status.error());
215 }
216
217 ALOGD_IF(TRACE,
218 "ProducerQueueChannel::AllocateBuffer: buffer_id=%d width=%u "
219 "height=%u layer_count=%u format=%u usage=%" PRIx64,
220 buffer_id, width, height, layer_count, format, usage);
221 auto buffer_handle = status.take();
222
223 auto producer_channel_status =
224 ProducerChannel::Create(service(), buffer_id, width, height, layer_count,
225 format, usage, meta_size_bytes_);
226 if (!producer_channel_status) {
227 ALOGE(
228 "ProducerQueueChannel::AllocateBuffer: Failed to create producer "
229 "buffer: %s",
230 producer_channel_status.GetErrorMessage().c_str());
231 return ErrorStatus(ENOMEM);
232 }
233 auto producer_channel = producer_channel_status.take();
234
235 ALOGD_IF(
236 TRACE,
237 "ProducerQueueChannel::AllocateBuffer: buffer_id=%d, buffer_handle=%d",
238 buffer_id, buffer_handle.value());
239
240 const auto channel_status =
241 service()->SetChannel(buffer_id, producer_channel);
242 if (!channel_status) {
243 ALOGE(
244 "ProducerQueueChannel::AllocateBuffer: failed to set producer channel "
245 "for new BufferHubBuffer: %s",
246 channel_status.GetErrorMessage().c_str());
247 return ErrorStatus(ENOMEM);
248 }
249
250 // Register the newly allocated buffer's channel_id into the first empty
251 // buffer slot.
252 size_t slot = 0;
253 for (; slot < BufferHubRPC::kMaxQueueCapacity; slot++) {
254 if (buffers_[slot].expired())
255 break;
256 }
257 if (slot == BufferHubRPC::kMaxQueueCapacity) {
258 ALOGE(
259 "ProducerQueueChannel::AllocateBuffer: Cannot find empty slot for new "
260 "buffer allocation.");
261 return ErrorStatus(E2BIG);
262 }
263
264 buffers_[slot] = producer_channel;
265 capacity_++;
266
267 // Notify each consumer channel about the new buffer.
268 for (auto* consumer_channel : consumer_channels_) {
269 ALOGD(
270 "ProducerQueueChannel::AllocateBuffer: Notified consumer with new "
271 "buffer, buffer_id=%d",
272 buffer_id);
273 consumer_channel->RegisterNewBuffer(producer_channel, slot);
274 }
275
276 return {{std::move(buffer_handle), slot}};
277 }
278
OnProducerQueueDetachBuffer(Message &,size_t slot)279 Status<void> ProducerQueueChannel::OnProducerQueueDetachBuffer(
280 Message& /*message*/, size_t slot) {
281 if (buffers_[slot].expired()) {
282 ALOGE(
283 "ProducerQueueChannel::OnProducerQueueDetachBuffer: trying to detach "
284 "an invalid buffer producer at slot %zu",
285 slot);
286 return ErrorStatus(EINVAL);
287 }
288
289 if (capacity_ == 0) {
290 ALOGE(
291 "ProducerQueueChannel::OnProducerQueueDetachBuffer: trying to detach a "
292 "buffer producer while the queue's capacity is already zero.");
293 return ErrorStatus(EINVAL);
294 }
295
296 buffers_[slot].reset();
297 capacity_--;
298 return {};
299 }
300
AddConsumer(ConsumerQueueChannel * channel)301 void ProducerQueueChannel::AddConsumer(ConsumerQueueChannel* channel) {
302 consumer_channels_.push_back(channel);
303 }
304
RemoveConsumer(ConsumerQueueChannel * channel)305 void ProducerQueueChannel::RemoveConsumer(ConsumerQueueChannel* channel) {
306 consumer_channels_.erase(
307 std::find(consumer_channels_.begin(), consumer_channels_.end(), channel));
308 }
309
310 } // namespace dvr
311 } // namespace android
312