1 #include <inttypes.h>
2 
3 #include <private/dvr/consumer_queue_channel.h>
4 #include <private/dvr/producer_channel.h>
5 #include <private/dvr/producer_queue_channel.h>
6 
7 using android::pdx::ErrorStatus;
8 using android::pdx::Message;
9 using android::pdx::RemoteChannelHandle;
10 using android::pdx::Status;
11 using android::pdx::rpc::DispatchRemoteMethod;
12 
13 namespace android {
14 namespace dvr {
15 
ProducerQueueChannel(BufferHubService * service,int channel_id,const ProducerQueueConfig & config,const UsagePolicy & usage_policy,int * error)16 ProducerQueueChannel::ProducerQueueChannel(BufferHubService* service,
17                                            int channel_id,
18                                            const ProducerQueueConfig& config,
19                                            const UsagePolicy& usage_policy,
20                                            int* error)
21     : BufferHubChannel(service, channel_id, channel_id, kProducerQueueType),
22       config_(config),
23       usage_policy_(usage_policy),
24       capacity_(0) {
25   *error = 0;
26 }
27 
~ProducerQueueChannel()28 ProducerQueueChannel::~ProducerQueueChannel() {
29   ALOGD_IF(TRACE, "ProducerQueueChannel::~ProducerQueueChannel: queue_id=%d",
30            buffer_id());
31   for (auto* consumer : consumer_channels_)
32     consumer->OnProducerClosed();
33 }
34 
35 /* static */
Create(BufferHubService * service,int channel_id,const ProducerQueueConfig & config,const UsagePolicy & usage_policy)36 Status<std::shared_ptr<ProducerQueueChannel>> ProducerQueueChannel::Create(
37     BufferHubService* service, int channel_id,
38     const ProducerQueueConfig& config, const UsagePolicy& usage_policy) {
39   // Configuration between |usage_deny_set_mask| and |usage_deny_clear_mask|
40   // should be mutually exclusive.
41   if ((usage_policy.usage_deny_set_mask & usage_policy.usage_deny_clear_mask)) {
42     ALOGE(
43         "BufferHubService::OnCreateProducerQueue: illegal usage mask "
44         "configuration: usage_deny_set_mask=%" PRIx64
45         " usage_deny_clear_mask=%" PRIx64,
46         usage_policy.usage_deny_set_mask, usage_policy.usage_deny_clear_mask);
47     return ErrorStatus(EINVAL);
48   }
49 
50   int error = 0;
51   std::shared_ptr<ProducerQueueChannel> producer(new ProducerQueueChannel(
52       service, channel_id, config, usage_policy, &error));
53   if (error < 0)
54     return ErrorStatus(-error);
55   else
56     return {std::move(producer)};
57 }
58 
HandleMessage(Message & message)59 bool ProducerQueueChannel::HandleMessage(Message& message) {
60   ATRACE_NAME("ProducerQueueChannel::HandleMessage");
61   switch (message.GetOp()) {
62     case BufferHubRPC::CreateConsumerQueue::Opcode:
63       DispatchRemoteMethod<BufferHubRPC::CreateConsumerQueue>(
64           *this, &ProducerQueueChannel::OnCreateConsumerQueue, message);
65       return true;
66 
67     case BufferHubRPC::GetQueueInfo::Opcode:
68       DispatchRemoteMethod<BufferHubRPC::GetQueueInfo>(
69           *this, &ProducerQueueChannel::OnGetQueueInfo, message);
70       return true;
71 
72     case BufferHubRPC::ProducerQueueAllocateBuffers::Opcode:
73       DispatchRemoteMethod<BufferHubRPC::ProducerQueueAllocateBuffers>(
74           *this, &ProducerQueueChannel::OnProducerQueueAllocateBuffers,
75           message);
76       return true;
77 
78     case BufferHubRPC::ProducerQueueInsertBuffer::Opcode:
79       DispatchRemoteMethod<BufferHubRPC::ProducerQueueInsertBuffer>(
80           *this, &ProducerQueueChannel::OnProducerQueueInsertBuffer, message);
81       return true;
82 
83     case BufferHubRPC::ProducerQueueRemoveBuffer::Opcode:
84       DispatchRemoteMethod<BufferHubRPC::ProducerQueueRemoveBuffer>(
85           *this, &ProducerQueueChannel::OnProducerQueueRemoveBuffer, message);
86       return true;
87 
88     default:
89       return false;
90   }
91 }
92 
HandleImpulse(Message &)93 void ProducerQueueChannel::HandleImpulse(Message& /* message */) {
94   ATRACE_NAME("ProducerQueueChannel::HandleImpulse");
95 }
96 
GetBufferInfo() const97 BufferHubChannel::BufferInfo ProducerQueueChannel::GetBufferInfo() const {
98   return BufferInfo(channel_id(), consumer_channels_.size(), capacity_,
99                     usage_policy_);
100 }
101 
OnCreateConsumerQueue(Message & message,bool silent)102 Status<RemoteChannelHandle> ProducerQueueChannel::OnCreateConsumerQueue(
103     Message& message, bool silent) {
104   ATRACE_NAME("ProducerQueueChannel::OnCreateConsumerQueue");
105   ALOGD_IF(
106       TRACE,
107       "ProducerQueueChannel::OnCreateConsumerQueue: channel_id=%d slient=%d",
108       channel_id(), silent);
109 
110   int channel_id;
111   auto status = message.PushChannel(0, nullptr, &channel_id);
112   if (!status) {
113     ALOGE(
114         "ProducerQueueChannel::OnCreateConsumerQueue: failed to push consumer "
115         "channel: %s",
116         status.GetErrorMessage().c_str());
117     return ErrorStatus(ENOMEM);
118   }
119 
120   auto consumer_queue_channel = std::make_shared<ConsumerQueueChannel>(
121       service(), buffer_id(), channel_id, shared_from_this(), silent);
122 
123   // Register the existing buffers with the new consumer queue.
124   for (size_t slot = 0; slot < BufferHubRPC::kMaxQueueCapacity; slot++) {
125     if (auto buffer = buffers_[slot].lock())
126       consumer_queue_channel->RegisterNewBuffer(buffer, slot);
127   }
128 
129   const auto channel_status =
130       service()->SetChannel(channel_id, consumer_queue_channel);
131   if (!channel_status) {
132     ALOGE(
133         "ProducerQueueChannel::OnCreateConsumerQueue: Failed to set channel: "
134         "%s",
135         channel_status.GetErrorMessage().c_str());
136     return ErrorStatus(ENOMEM);
137   }
138 
139   return {status.take()};
140 }
141 
OnGetQueueInfo(Message &)142 Status<QueueInfo> ProducerQueueChannel::OnGetQueueInfo(Message&) {
143   return {{config_, buffer_id()}};
144 }
145 
146 Status<std::vector<std::pair<RemoteChannelHandle, size_t>>>
OnProducerQueueAllocateBuffers(Message & message,uint32_t width,uint32_t height,uint32_t layer_count,uint32_t format,uint64_t usage,size_t buffer_count)147 ProducerQueueChannel::OnProducerQueueAllocateBuffers(
148     Message& message, uint32_t width, uint32_t height, uint32_t layer_count,
149     uint32_t format, uint64_t usage, size_t buffer_count) {
150   ATRACE_NAME("ProducerQueueChannel::OnProducerQueueAllocateBuffers");
151   ALOGD_IF(TRACE,
152            "ProducerQueueChannel::OnProducerQueueAllocateBuffers: "
153            "producer_channel_id=%d",
154            channel_id());
155 
156   std::vector<std::pair<RemoteChannelHandle, size_t>> buffer_handles;
157 
158   // Deny buffer allocation violating preset rules.
159   if (usage & usage_policy_.usage_deny_set_mask) {
160     ALOGE(
161         "ProducerQueueChannel::OnProducerQueueAllocateBuffers: usage: %" PRIx64
162         " is not permitted. Violating usage_deny_set_mask, the following  bits "
163         "shall not be set: %" PRIx64 ".",
164         usage, usage_policy_.usage_deny_set_mask);
165     return ErrorStatus(EINVAL);
166   }
167 
168   if (~usage & usage_policy_.usage_deny_clear_mask) {
169     ALOGE(
170         "ProducerQueueChannel::OnProducerQueueAllocateBuffers: usage: %" PRIx64
171         " is not permitted. Violating usage_deny_clear_mask, the following "
172         "bits must be set: %" PRIx64 ".",
173         usage, usage_policy_.usage_deny_clear_mask);
174     return ErrorStatus(EINVAL);
175   }
176 
177   // Force set mask and clear mask. Note that |usage_policy_.usage_set_mask_|
178   // takes precedence and will overwrite |usage_policy_.usage_clear_mask|.
179   uint64_t effective_usage =
180       (usage & ~usage_policy_.usage_clear_mask) | usage_policy_.usage_set_mask;
181 
182   for (size_t i = 0; i < buffer_count; i++) {
183     auto status = AllocateBuffer(message, width, height, layer_count, format,
184                                  effective_usage);
185     if (!status) {
186       ALOGE(
187           "ProducerQueueChannel::OnProducerQueueAllocateBuffers: Failed to "
188           "allocate new buffer.");
189       return ErrorStatus(status.error());
190     }
191     buffer_handles.push_back(status.take());
192   }
193 
194   return {std::move(buffer_handles)};
195 }
196 
197 Status<std::pair<RemoteChannelHandle, size_t>>
AllocateBuffer(Message & message,uint32_t width,uint32_t height,uint32_t layer_count,uint32_t format,uint64_t usage)198 ProducerQueueChannel::AllocateBuffer(Message& message, uint32_t width,
199                                      uint32_t height, uint32_t layer_count,
200                                      uint32_t format, uint64_t usage) {
201   ATRACE_NAME("ProducerQueueChannel::AllocateBuffer");
202   ALOGD_IF(TRACE,
203            "ProducerQueueChannel::AllocateBuffer: producer_channel_id=%d",
204            channel_id());
205 
206   if (capacity_ >= BufferHubRPC::kMaxQueueCapacity) {
207     ALOGE("ProducerQueueChannel::AllocateBuffer: reaches kMaxQueueCapacity.");
208     return ErrorStatus(E2BIG);
209   }
210 
211   // Here we are creating a new BufferHubBuffer, initialize the producer
212   // channel, and returning its file handle back to the client.
213   // buffer_id is the id of the producer channel of BufferHubBuffer.
214   int buffer_id;
215   auto status = message.PushChannel(0, nullptr, &buffer_id);
216 
217   if (!status) {
218     ALOGE("ProducerQueueChannel::AllocateBuffer: failed to push channel: %s",
219           status.GetErrorMessage().c_str());
220     return ErrorStatus(status.error());
221   }
222 
223   ALOGD_IF(TRACE,
224            "ProducerQueueChannel::AllocateBuffer: buffer_id=%d width=%u "
225            "height=%u layer_count=%u format=%u usage=%" PRIx64,
226            buffer_id, width, height, layer_count, format, usage);
227   auto buffer_handle = status.take();
228 
229   auto producer_channel_status =
230       ProducerChannel::Create(service(), buffer_id, width, height, layer_count,
231                               format, usage, config_.user_metadata_size);
232   if (!producer_channel_status) {
233     ALOGE(
234         "ProducerQueueChannel::AllocateBuffer: Failed to create producer "
235         "buffer: %s",
236         producer_channel_status.GetErrorMessage().c_str());
237     return ErrorStatus(ENOMEM);
238   }
239   auto producer_channel = producer_channel_status.take();
240 
241   ALOGD_IF(
242       TRACE,
243       "ProducerQueueChannel::AllocateBuffer: buffer_id=%d, buffer_handle=%d",
244       buffer_id, buffer_handle.value());
245 
246   const auto channel_status =
247       service()->SetChannel(buffer_id, producer_channel);
248   if (!channel_status) {
249     ALOGE(
250         "ProducerQueueChannel::AllocateBuffer: failed to set producer channel "
251         "for new BufferHubBuffer: %s",
252         channel_status.GetErrorMessage().c_str());
253     return ErrorStatus(ENOMEM);
254   }
255 
256   // Register the newly allocated buffer's channel_id into the first empty
257   // buffer slot.
258   size_t slot = 0;
259   for (; slot < BufferHubRPC::kMaxQueueCapacity; slot++) {
260     if (buffers_[slot].expired())
261       break;
262   }
263   if (slot == BufferHubRPC::kMaxQueueCapacity) {
264     ALOGE(
265         "ProducerQueueChannel::AllocateBuffer: Cannot find empty slot for new "
266         "buffer allocation.");
267     return ErrorStatus(E2BIG);
268   }
269 
270   buffers_[slot] = producer_channel;
271   capacity_++;
272 
273   // Notify each consumer channel about the new buffer.
274   for (auto* consumer_channel : consumer_channels_) {
275     ALOGD(
276         "ProducerQueueChannel::AllocateBuffer: Notified consumer with new "
277         "buffer, buffer_id=%d",
278         buffer_id);
279     consumer_channel->RegisterNewBuffer(producer_channel, slot);
280   }
281 
282   return {{std::move(buffer_handle), slot}};
283 }
284 
OnProducerQueueInsertBuffer(pdx::Message & message,int buffer_cid)285 Status<size_t> ProducerQueueChannel::OnProducerQueueInsertBuffer(
286     pdx::Message& message, int buffer_cid) {
287   ATRACE_NAME("ProducerQueueChannel::InsertBuffer");
288   ALOGD_IF(TRACE,
289            "ProducerQueueChannel::InsertBuffer: channel_id=%d, buffer_cid=%d",
290            channel_id(), buffer_cid);
291 
292   if (capacity_ >= BufferHubRPC::kMaxQueueCapacity) {
293     ALOGE("ProducerQueueChannel::InsertBuffer: reaches kMaxQueueCapacity.");
294     return ErrorStatus(E2BIG);
295   }
296   auto producer_channel = std::static_pointer_cast<ProducerChannel>(
297       service()->GetChannel(buffer_cid));
298   if (producer_channel == nullptr ||
299       producer_channel->channel_type() != BufferHubChannel::kProducerType) {
300     // Rejects the request if the requested buffer channel is invalid and/or
301     // it's not a ProducerChannel.
302     ALOGE(
303         "ProducerQueueChannel::InsertBuffer: Invalid buffer_cid=%d, "
304         "producer_buffer=0x%p, channel_type=%d.",
305         buffer_cid, producer_channel.get(),
306         producer_channel == nullptr ? -1 : producer_channel->channel_type());
307     return ErrorStatus(EINVAL);
308   }
309   if (producer_channel->GetActiveProcessId() != message.GetProcessId()) {
310     // Rejects the request if the requested buffer channel is not currently
311     // connected to the caller this is IPC request. This effectively prevents
312     // fake buffer_cid from being injected.
313     ALOGE(
314         "ProducerQueueChannel::InsertBuffer: Requested buffer channel "
315         "(buffer_cid=%d) is not connected to the calling process (pid=%d). "
316         "It's connected to a different process (pid=%d).",
317         buffer_cid, message.GetProcessId(),
318         producer_channel->GetActiveProcessId());
319     return ErrorStatus(EINVAL);
320   }
321   uint64_t buffer_state = producer_channel->buffer_state();
322   // TODO(b/112007999) add an atomic variable in metadata header in shared
323   // memory to indicate which client is the last producer of the buffer.
324   // Currently, the first client is the only producer to the buffer.
325   // Thus, it checks whether the first client gains the buffer below.
326   if (!BufferHubDefs::isClientGained(buffer_state,
327                                      BufferHubDefs::kFirstClientBitMask)) {
328     // Rejects the request if the requested buffer is not in Gained state.
329     ALOGE(
330         "ProducerQueueChannel::InsertBuffer: The buffer (cid=%d, "
331         "state=0x%" PRIx64 ") is not in gained state.",
332         buffer_cid, buffer_state);
333     return ErrorStatus(EINVAL);
334   }
335 
336   // Register the to-be-inserted buffer's channel_id into the first empty
337   // buffer slot.
338   size_t slot = 0;
339   for (; slot < BufferHubRPC::kMaxQueueCapacity; slot++) {
340     if (buffers_[slot].expired())
341       break;
342   }
343   if (slot == BufferHubRPC::kMaxQueueCapacity) {
344     ALOGE(
345         "ProducerQueueChannel::AllocateBuffer: Cannot find empty slot for new "
346         "buffer allocation.");
347     return ErrorStatus(E2BIG);
348   }
349 
350   buffers_[slot] = producer_channel;
351   capacity_++;
352 
353   // Notify each consumer channel about the new buffer.
354   for (auto* consumer_channel : consumer_channels_) {
355     ALOGD(
356         "ProducerQueueChannel::AllocateBuffer: Notified consumer with new "
357         "buffer, buffer_cid=%d",
358         buffer_cid);
359     consumer_channel->RegisterNewBuffer(producer_channel, slot);
360   }
361 
362   return {slot};
363 }
364 
OnProducerQueueRemoveBuffer(Message &,size_t slot)365 Status<void> ProducerQueueChannel::OnProducerQueueRemoveBuffer(
366     Message& /*message*/, size_t slot) {
367   if (buffers_[slot].expired()) {
368     ALOGE(
369         "ProducerQueueChannel::OnProducerQueueRemoveBuffer: trying to remove "
370         "an invalid buffer producer at slot %zu",
371         slot);
372     return ErrorStatus(EINVAL);
373   }
374 
375   if (capacity_ == 0) {
376     ALOGE(
377         "ProducerQueueChannel::OnProducerQueueRemoveBuffer: trying to remove a "
378         "buffer producer while the queue's capacity is already zero.");
379     return ErrorStatus(EINVAL);
380   }
381 
382   buffers_[slot].reset();
383   capacity_--;
384   return {};
385 }
386 
AddConsumer(ConsumerQueueChannel * channel)387 void ProducerQueueChannel::AddConsumer(ConsumerQueueChannel* channel) {
388   consumer_channels_.push_back(channel);
389 }
390 
RemoveConsumer(ConsumerQueueChannel * channel)391 void ProducerQueueChannel::RemoveConsumer(ConsumerQueueChannel* channel) {
392   consumer_channels_.erase(
393       std::find(consumer_channels_.begin(), consumer_channels_.end(), channel));
394 }
395 
396 }  // namespace dvr
397 }  // namespace android
398