1 #include <pdx/channel_handle.h>
2 #include <private/dvr/consumer_queue_channel.h>
3 #include <private/dvr/producer_channel.h>
4 
5 using android::pdx::ErrorStatus;
6 using android::pdx::RemoteChannelHandle;
7 using android::pdx::Status;
8 using android::pdx::rpc::DispatchRemoteMethod;
9 using android::pdx::rpc::RemoteMethodError;
10 
11 namespace android {
12 namespace dvr {
13 
ConsumerQueueChannel(BufferHubService * service,int buffer_id,int channel_id,const std::shared_ptr<Channel> & producer,bool silent)14 ConsumerQueueChannel::ConsumerQueueChannel(
15     BufferHubService* service, int buffer_id, int channel_id,
16     const std::shared_ptr<Channel>& producer, bool silent)
17     : BufferHubChannel(service, buffer_id, channel_id, kConsumerQueueType),
18       producer_(producer),
19       capacity_(0),
20       silent_(silent) {
21   GetProducer()->AddConsumer(this);
22 }
23 
~ConsumerQueueChannel()24 ConsumerQueueChannel::~ConsumerQueueChannel() {
25   ALOGD_IF(TRACE, "ConsumerQueueChannel::~ConsumerQueueChannel: channel_id=%d",
26            channel_id());
27 
28   if (auto producer = GetProducer()) {
29     producer->RemoveConsumer(this);
30   }
31 }
32 
HandleMessage(Message & message)33 bool ConsumerQueueChannel::HandleMessage(Message& message) {
34   ATRACE_NAME("ConsumerQueueChannel::HandleMessage");
35   auto producer = GetProducer();
36   if (!producer) {
37     RemoteMethodError(message, EPIPE);
38     return true;
39   }
40 
41   switch (message.GetOp()) {
42     case BufferHubRPC::CreateConsumerQueue::Opcode:
43       DispatchRemoteMethod<BufferHubRPC::CreateConsumerQueue>(
44           *producer, &ProducerQueueChannel::OnCreateConsumerQueue, message);
45       return true;
46 
47     case BufferHubRPC::GetQueueInfo::Opcode:
48       DispatchRemoteMethod<BufferHubRPC::GetQueueInfo>(
49           *producer, &ProducerQueueChannel::OnGetQueueInfo, message);
50       return true;
51 
52     case BufferHubRPC::ConsumerQueueImportBuffers::Opcode:
53       DispatchRemoteMethod<BufferHubRPC::ConsumerQueueImportBuffers>(
54           *this, &ConsumerQueueChannel::OnConsumerQueueImportBuffers, message);
55       return true;
56 
57     default:
58       return false;
59   }
60 }
61 
GetProducer() const62 std::shared_ptr<ProducerQueueChannel> ConsumerQueueChannel::GetProducer()
63     const {
64   return std::static_pointer_cast<ProducerQueueChannel>(producer_.lock());
65 }
66 
HandleImpulse(Message &)67 void ConsumerQueueChannel::HandleImpulse(Message& /* message */) {
68   ATRACE_NAME("ConsumerQueueChannel::HandleImpulse");
69 }
70 
GetBufferInfo() const71 BufferHubChannel::BufferInfo ConsumerQueueChannel::GetBufferInfo() const {
72   BufferHubChannel::BufferInfo info;
73   if (auto producer = GetProducer()) {
74     // If producer has not hung up, copy most buffer info from the producer.
75     info = producer->GetBufferInfo();
76   }
77   info.id = buffer_id();
78   info.capacity = capacity_;
79   return info;
80 }
81 
RegisterNewBuffer(const std::shared_ptr<ProducerChannel> & producer_channel,size_t producer_slot)82 void ConsumerQueueChannel::RegisterNewBuffer(
83     const std::shared_ptr<ProducerChannel>& producer_channel,
84     size_t producer_slot) {
85   ALOGD_IF(TRACE, "%s: queue_id=%d buffer_id=%d slot=%zu silent=%d",
86            __FUNCTION__, buffer_id(), producer_channel->buffer_id(),
87            producer_slot, silent_);
88   // Only register buffers if the queue is not silent.
89   if (silent_) {
90     return;
91   }
92 
93   auto status = producer_channel->CreateConsumerStateMask();
94   if (!status.ok()) {
95     ALOGE("%s: Failed to create consumer state mask: %s", __FUNCTION__,
96           status.GetErrorMessage().c_str());
97     return;
98   }
99   uint64_t consumer_state_mask = status.get();
100 
101   pending_buffer_slots_.emplace(producer_channel, producer_slot,
102                                 consumer_state_mask);
103   // Signal the client that there is new buffer available.
104   SignalAvailable();
105 }
106 
107 Status<std::vector<std::pair<RemoteChannelHandle, size_t>>>
OnConsumerQueueImportBuffers(Message & message)108 ConsumerQueueChannel::OnConsumerQueueImportBuffers(Message& message) {
109   std::vector<std::pair<RemoteChannelHandle, size_t>> buffer_handles;
110   ATRACE_NAME(__FUNCTION__);
111   ALOGD_IF(TRACE, "%s: pending_buffer_slots=%zu", __FUNCTION__,
112            pending_buffer_slots_.size());
113 
114   // Indicate this is a silent queue that will not import buffers.
115   if (silent_)
116     return ErrorStatus(EBADR);
117 
118   while (!pending_buffer_slots_.empty()) {
119     auto producer_channel =
120         pending_buffer_slots_.front().producer_channel.lock();
121     size_t producer_slot = pending_buffer_slots_.front().producer_slot;
122     uint64_t consumer_state_mask =
123         pending_buffer_slots_.front().consumer_state_mask;
124     pending_buffer_slots_.pop();
125 
126     // It's possible that the producer channel has expired. When this occurs,
127     // ignore the producer channel.
128     if (producer_channel == nullptr) {
129       ALOGW("%s: producer channel has already been expired.", __FUNCTION__);
130       continue;
131     }
132 
133     auto status =
134         producer_channel->CreateConsumer(message, consumer_state_mask);
135 
136     // If no buffers are imported successfully, clear available and return an
137     // error. Otherwise, return all consumer handles already imported
138     // successfully, but keep available bits on, so that the client can retry
139     // importing remaining consumer buffers.
140     if (!status) {
141       ALOGE("%s: Failed create consumer: %s", __FUNCTION__,
142             status.GetErrorMessage().c_str());
143       if (buffer_handles.empty()) {
144         ClearAvailable();
145         return status.error_status();
146       } else {
147         return {std::move(buffer_handles)};
148       }
149     }
150 
151     buffer_handles.emplace_back(status.take(), producer_slot);
152   }
153 
154   ClearAvailable();
155   return {std::move(buffer_handles)};
156 }
157 
OnProducerClosed()158 void ConsumerQueueChannel::OnProducerClosed() {
159   ALOGD_IF(TRACE, "ConsumerQueueChannel::OnProducerClosed: queue_id=%d",
160            buffer_id());
161   producer_.reset();
162   Hangup();
163 }
164 
165 }  // namespace dvr
166 }  // namespace android
167