1 #include "producer_queue_channel.h"
2 
3 #include <inttypes.h>
4 
5 #include "consumer_queue_channel.h"
6 #include "producer_channel.h"
7 
8 using android::pdx::ErrorStatus;
9 using android::pdx::Message;
10 using android::pdx::RemoteChannelHandle;
11 using android::pdx::Status;
12 using android::pdx::rpc::DispatchRemoteMethod;
13 
14 namespace android {
15 namespace dvr {
16 
ProducerQueueChannel(BufferHubService * service,int channel_id,const ProducerQueueConfig & config,const UsagePolicy & usage_policy,int * error)17 ProducerQueueChannel::ProducerQueueChannel(BufferHubService* service,
18                                            int channel_id,
19                                            const ProducerQueueConfig& config,
20                                            const UsagePolicy& usage_policy,
21                                            int* error)
22     : BufferHubChannel(service, channel_id, channel_id, kProducerQueueType),
23       config_(config),
24       usage_policy_(usage_policy),
25       capacity_(0) {
26   *error = 0;
27 }
28 
~ProducerQueueChannel()29 ProducerQueueChannel::~ProducerQueueChannel() {
30   ALOGD_IF(TRACE, "ProducerQueueChannel::~ProducerQueueChannel: queue_id=%d",
31            buffer_id());
32   for (auto* consumer : consumer_channels_)
33     consumer->OnProducerClosed();
34 }
35 
36 /* static */
Create(BufferHubService * service,int channel_id,const ProducerQueueConfig & config,const UsagePolicy & usage_policy)37 Status<std::shared_ptr<ProducerQueueChannel>> ProducerQueueChannel::Create(
38     BufferHubService* service, int channel_id,
39     const ProducerQueueConfig& config, const UsagePolicy& usage_policy) {
40   // Configuration between |usage_deny_set_mask| and |usage_deny_clear_mask|
41   // should be mutually exclusive.
42   if ((usage_policy.usage_deny_set_mask & usage_policy.usage_deny_clear_mask)) {
43     ALOGE(
44         "BufferHubService::OnCreateProducerQueue: illegal usage mask "
45         "configuration: usage_deny_set_mask=%" PRIx64
46         " usage_deny_clear_mask=%" PRIx64,
47         usage_policy.usage_deny_set_mask, usage_policy.usage_deny_clear_mask);
48     return ErrorStatus(EINVAL);
49   }
50 
51   int error = 0;
52   std::shared_ptr<ProducerQueueChannel> producer(new ProducerQueueChannel(
53       service, channel_id, config, usage_policy, &error));
54   if (error < 0)
55     return ErrorStatus(-error);
56   else
57     return {std::move(producer)};
58 }
59 
HandleMessage(Message & message)60 bool ProducerQueueChannel::HandleMessage(Message& message) {
61   ATRACE_NAME("ProducerQueueChannel::HandleMessage");
62   switch (message.GetOp()) {
63     case BufferHubRPC::CreateConsumerQueue::Opcode:
64       DispatchRemoteMethod<BufferHubRPC::CreateConsumerQueue>(
65           *this, &ProducerQueueChannel::OnCreateConsumerQueue, message);
66       return true;
67 
68     case BufferHubRPC::GetQueueInfo::Opcode:
69       DispatchRemoteMethod<BufferHubRPC::GetQueueInfo>(
70           *this, &ProducerQueueChannel::OnGetQueueInfo, message);
71       return true;
72 
73     case BufferHubRPC::ProducerQueueAllocateBuffers::Opcode:
74       DispatchRemoteMethod<BufferHubRPC::ProducerQueueAllocateBuffers>(
75           *this, &ProducerQueueChannel::OnProducerQueueAllocateBuffers,
76           message);
77       return true;
78 
79     case BufferHubRPC::ProducerQueueRemoveBuffer::Opcode:
80       DispatchRemoteMethod<BufferHubRPC::ProducerQueueRemoveBuffer>(
81           *this, &ProducerQueueChannel::OnProducerQueueRemoveBuffer, message);
82       return true;
83 
84     default:
85       return false;
86   }
87 }
88 
HandleImpulse(Message &)89 void ProducerQueueChannel::HandleImpulse(Message& /* message */) {
90   ATRACE_NAME("ProducerQueueChannel::HandleImpulse");
91 }
92 
GetBufferInfo() const93 BufferHubChannel::BufferInfo ProducerQueueChannel::GetBufferInfo() const {
94   return BufferInfo(channel_id(), consumer_channels_.size(), capacity_,
95                     usage_policy_);
96 }
97 
OnCreateConsumerQueue(Message & message,bool silent)98 Status<RemoteChannelHandle> ProducerQueueChannel::OnCreateConsumerQueue(
99     Message& message, bool silent) {
100   ATRACE_NAME("ProducerQueueChannel::OnCreateConsumerQueue");
101   ALOGD_IF(
102       TRACE,
103       "ProducerQueueChannel::OnCreateConsumerQueue: channel_id=%d slient=%d",
104       channel_id(), silent);
105 
106   int channel_id;
107   auto status = message.PushChannel(0, nullptr, &channel_id);
108   if (!status) {
109     ALOGE(
110         "ProducerQueueChannel::OnCreateConsumerQueue: failed to push consumer "
111         "channel: %s",
112         status.GetErrorMessage().c_str());
113     return ErrorStatus(ENOMEM);
114   }
115 
116   auto consumer_queue_channel = std::make_shared<ConsumerQueueChannel>(
117       service(), buffer_id(), channel_id, shared_from_this(), silent);
118 
119   // Register the existing buffers with the new consumer queue.
120   for (size_t slot = 0; slot < BufferHubRPC::kMaxQueueCapacity; slot++) {
121     if (auto buffer = buffers_[slot].lock())
122       consumer_queue_channel->RegisterNewBuffer(buffer, slot);
123   }
124 
125   const auto channel_status =
126       service()->SetChannel(channel_id, consumer_queue_channel);
127   if (!channel_status) {
128     ALOGE(
129         "ProducerQueueChannel::OnCreateConsumerQueue: Failed to set channel: "
130         "%s",
131         channel_status.GetErrorMessage().c_str());
132     return ErrorStatus(ENOMEM);
133   }
134 
135   return {status.take()};
136 }
137 
OnGetQueueInfo(Message &)138 Status<QueueInfo> ProducerQueueChannel::OnGetQueueInfo(Message&) {
139   return {{config_, buffer_id()}};
140 }
141 
142 Status<std::vector<std::pair<RemoteChannelHandle, size_t>>>
OnProducerQueueAllocateBuffers(Message & message,uint32_t width,uint32_t height,uint32_t layer_count,uint32_t format,uint64_t usage,size_t buffer_count)143 ProducerQueueChannel::OnProducerQueueAllocateBuffers(
144     Message& message, uint32_t width, uint32_t height, uint32_t layer_count,
145     uint32_t format, uint64_t usage, size_t buffer_count) {
146   ATRACE_NAME("ProducerQueueChannel::OnProducerQueueAllocateBuffers");
147   ALOGD_IF(TRACE,
148            "ProducerQueueChannel::OnProducerQueueAllocateBuffers: "
149            "producer_channel_id=%d",
150            channel_id());
151 
152   std::vector<std::pair<RemoteChannelHandle, size_t>> buffer_handles;
153 
154   // Deny buffer allocation violating preset rules.
155   if (usage & usage_policy_.usage_deny_set_mask) {
156     ALOGE(
157         "ProducerQueueChannel::OnProducerQueueAllocateBuffers: usage: %" PRIx64
158         " is not permitted. Violating usage_deny_set_mask, the following  bits "
159         "shall not be set: %" PRIx64 ".",
160         usage, usage_policy_.usage_deny_set_mask);
161     return ErrorStatus(EINVAL);
162   }
163 
164   if (~usage & usage_policy_.usage_deny_clear_mask) {
165     ALOGE(
166         "ProducerQueueChannel::OnProducerQueueAllocateBuffers: usage: %" PRIx64
167         " is not permitted. Violating usage_deny_clear_mask, the following "
168         "bits must be set: %" PRIx64 ".",
169         usage, usage_policy_.usage_deny_clear_mask);
170     return ErrorStatus(EINVAL);
171   }
172 
173   // Force set mask and clear mask. Note that |usage_policy_.usage_set_mask_|
174   // takes precedence and will overwrite |usage_policy_.usage_clear_mask|.
175   uint64_t effective_usage =
176       (usage & ~usage_policy_.usage_clear_mask) | usage_policy_.usage_set_mask;
177 
178   for (size_t i = 0; i < buffer_count; i++) {
179     auto status = AllocateBuffer(message, width, height, layer_count, format,
180                                  effective_usage);
181     if (!status) {
182       ALOGE(
183           "ProducerQueueChannel::OnProducerQueueAllocateBuffers: Failed to "
184           "allocate new buffer.");
185       return ErrorStatus(status.error());
186     }
187     buffer_handles.push_back(status.take());
188   }
189 
190   return {std::move(buffer_handles)};
191 }
192 
193 Status<std::pair<RemoteChannelHandle, size_t>>
AllocateBuffer(Message & message,uint32_t width,uint32_t height,uint32_t layer_count,uint32_t format,uint64_t usage)194 ProducerQueueChannel::AllocateBuffer(Message& message, uint32_t width,
195                                      uint32_t height, uint32_t layer_count,
196                                      uint32_t format, uint64_t usage) {
197   ATRACE_NAME("ProducerQueueChannel::AllocateBuffer");
198   ALOGD_IF(TRACE,
199            "ProducerQueueChannel::AllocateBuffer: producer_channel_id=%d",
200            channel_id());
201 
202   if (capacity_ >= BufferHubRPC::kMaxQueueCapacity) {
203     ALOGE("ProducerQueueChannel::AllocateBuffer: reaches kMaxQueueCapacity.");
204     return ErrorStatus(E2BIG);
205   }
206 
207   // Here we are creating a new BufferHubBuffer, initialize the producer
208   // channel, and returning its file handle back to the client.
209   // buffer_id is the id of the producer channel of BufferHubBuffer.
210   int buffer_id;
211   auto status = message.PushChannel(0, nullptr, &buffer_id);
212 
213   if (!status) {
214     ALOGE("ProducerQueueChannel::AllocateBuffer: failed to push channel: %s",
215           status.GetErrorMessage().c_str());
216     return ErrorStatus(status.error());
217   }
218 
219   ALOGD_IF(TRACE,
220            "ProducerQueueChannel::AllocateBuffer: buffer_id=%d width=%u "
221            "height=%u layer_count=%u format=%u usage=%" PRIx64,
222            buffer_id, width, height, layer_count, format, usage);
223   auto buffer_handle = status.take();
224 
225   auto producer_channel_status =
226       ProducerChannel::Create(service(), buffer_id, width, height, layer_count,
227                               format, usage, config_.user_metadata_size);
228   if (!producer_channel_status) {
229     ALOGE(
230         "ProducerQueueChannel::AllocateBuffer: Failed to create producer "
231         "buffer: %s",
232         producer_channel_status.GetErrorMessage().c_str());
233     return ErrorStatus(ENOMEM);
234   }
235   auto producer_channel = producer_channel_status.take();
236 
237   ALOGD_IF(
238       TRACE,
239       "ProducerQueueChannel::AllocateBuffer: buffer_id=%d, buffer_handle=%d",
240       buffer_id, buffer_handle.value());
241 
242   const auto channel_status =
243       service()->SetChannel(buffer_id, producer_channel);
244   if (!channel_status) {
245     ALOGE(
246         "ProducerQueueChannel::AllocateBuffer: failed to set producer channel "
247         "for new BufferHubBuffer: %s",
248         channel_status.GetErrorMessage().c_str());
249     return ErrorStatus(ENOMEM);
250   }
251 
252   // Register the newly allocated buffer's channel_id into the first empty
253   // buffer slot.
254   size_t slot = 0;
255   for (; slot < BufferHubRPC::kMaxQueueCapacity; slot++) {
256     if (buffers_[slot].expired())
257       break;
258   }
259   if (slot == BufferHubRPC::kMaxQueueCapacity) {
260     ALOGE(
261         "ProducerQueueChannel::AllocateBuffer: Cannot find empty slot for new "
262         "buffer allocation.");
263     return ErrorStatus(E2BIG);
264   }
265 
266   buffers_[slot] = producer_channel;
267   capacity_++;
268 
269   // Notify each consumer channel about the new buffer.
270   for (auto* consumer_channel : consumer_channels_) {
271     ALOGD(
272         "ProducerQueueChannel::AllocateBuffer: Notified consumer with new "
273         "buffer, buffer_id=%d",
274         buffer_id);
275     consumer_channel->RegisterNewBuffer(producer_channel, slot);
276   }
277 
278   return {{std::move(buffer_handle), slot}};
279 }
280 
OnProducerQueueRemoveBuffer(Message &,size_t slot)281 Status<void> ProducerQueueChannel::OnProducerQueueRemoveBuffer(
282     Message& /*message*/, size_t slot) {
283   if (buffers_[slot].expired()) {
284     ALOGE(
285         "ProducerQueueChannel::OnProducerQueueRemoveBuffer: trying to remove "
286         "an invalid buffer producer at slot %zu",
287         slot);
288     return ErrorStatus(EINVAL);
289   }
290 
291   if (capacity_ == 0) {
292     ALOGE(
293         "ProducerQueueChannel::OnProducerQueueRemoveBuffer: trying to remove a "
294         "buffer producer while the queue's capacity is already zero.");
295     return ErrorStatus(EINVAL);
296   }
297 
298   buffers_[slot].reset();
299   capacity_--;
300   return {};
301 }
302 
AddConsumer(ConsumerQueueChannel * channel)303 void ProducerQueueChannel::AddConsumer(ConsumerQueueChannel* channel) {
304   consumer_channels_.push_back(channel);
305 }
306 
RemoveConsumer(ConsumerQueueChannel * channel)307 void ProducerQueueChannel::RemoveConsumer(ConsumerQueueChannel* channel) {
308   consumer_channels_.erase(
309       std::find(consumer_channels_.begin(), consumer_channels_.end(), channel));
310 }
311 
312 }  // namespace dvr
313 }  // namespace android
314