1 #include "consumer_queue_channel.h"
2 
3 #include <pdx/channel_handle.h>
4 
5 #include "producer_channel.h"
6 
7 using android::pdx::ErrorStatus;
8 using android::pdx::RemoteChannelHandle;
9 using android::pdx::Status;
10 using android::pdx::rpc::DispatchRemoteMethod;
11 using android::pdx::rpc::RemoteMethodError;
12 
13 namespace android {
14 namespace dvr {
15 
ConsumerQueueChannel(BufferHubService * service,int buffer_id,int channel_id,const std::shared_ptr<Channel> & producer,bool silent)16 ConsumerQueueChannel::ConsumerQueueChannel(
17     BufferHubService* service, int buffer_id, int channel_id,
18     const std::shared_ptr<Channel>& producer, bool silent)
19     : BufferHubChannel(service, buffer_id, channel_id, kConsumerQueueType),
20       producer_(producer),
21       capacity_(0),
22       silent_(silent) {
23   GetProducer()->AddConsumer(this);
24 }
25 
~ConsumerQueueChannel()26 ConsumerQueueChannel::~ConsumerQueueChannel() {
27   ALOGD_IF(TRACE, "ConsumerQueueChannel::~ConsumerQueueChannel: channel_id=%d",
28            channel_id());
29 
30   if (auto producer = GetProducer()) {
31     producer->RemoveConsumer(this);
32   }
33 }
34 
HandleMessage(Message & message)35 bool ConsumerQueueChannel::HandleMessage(Message& message) {
36   ATRACE_NAME("ConsumerQueueChannel::HandleMessage");
37   auto producer = GetProducer();
38   if (!producer) {
39     RemoteMethodError(message, EPIPE);
40     return true;
41   }
42 
43   switch (message.GetOp()) {
44     case BufferHubRPC::CreateConsumerQueue::Opcode:
45       DispatchRemoteMethod<BufferHubRPC::CreateConsumerQueue>(
46           *producer, &ProducerQueueChannel::OnCreateConsumerQueue, message);
47       return true;
48 
49     case BufferHubRPC::GetQueueInfo::Opcode:
50       DispatchRemoteMethod<BufferHubRPC::GetQueueInfo>(
51           *producer, &ProducerQueueChannel::OnGetQueueInfo, message);
52       return true;
53 
54     case BufferHubRPC::ConsumerQueueImportBuffers::Opcode:
55       DispatchRemoteMethod<BufferHubRPC::ConsumerQueueImportBuffers>(
56           *this, &ConsumerQueueChannel::OnConsumerQueueImportBuffers, message);
57       return true;
58 
59     default:
60       return false;
61   }
62 }
63 
GetProducer() const64 std::shared_ptr<ProducerQueueChannel> ConsumerQueueChannel::GetProducer()
65     const {
66   return std::static_pointer_cast<ProducerQueueChannel>(producer_.lock());
67 }
68 
HandleImpulse(Message &)69 void ConsumerQueueChannel::HandleImpulse(Message& /* message */) {
70   ATRACE_NAME("ConsumerQueueChannel::HandleImpulse");
71 }
72 
GetBufferInfo() const73 BufferHubChannel::BufferInfo ConsumerQueueChannel::GetBufferInfo() const {
74   BufferHubChannel::BufferInfo info;
75   if (auto producer = GetProducer()) {
76     // If producer has not hung up, copy most buffer info from the producer.
77     info = producer->GetBufferInfo();
78   }
79   info.id = buffer_id();
80   info.capacity = capacity_;
81   return info;
82 }
83 
RegisterNewBuffer(const std::shared_ptr<ProducerChannel> & producer_channel,size_t slot)84 void ConsumerQueueChannel::RegisterNewBuffer(
85     const std::shared_ptr<ProducerChannel>& producer_channel, size_t slot) {
86   ALOGD_IF(TRACE,
87            "ConsumerQueueChannel::RegisterNewBuffer: queue_id=%d buffer_id=%d "
88            "slot=%zu silent=%d",
89            buffer_id(), producer_channel->buffer_id(), slot, silent_);
90   // Only register buffers if the queue is not silent.
91   if (!silent_) {
92     pending_buffer_slots_.emplace(producer_channel, slot);
93 
94     // Signal the client that there is new buffer available.
95     SignalAvailable();
96   }
97 }
98 
99 Status<std::vector<std::pair<RemoteChannelHandle, size_t>>>
OnConsumerQueueImportBuffers(Message & message)100 ConsumerQueueChannel::OnConsumerQueueImportBuffers(Message& message) {
101   std::vector<std::pair<RemoteChannelHandle, size_t>> buffer_handles;
102   ATRACE_NAME("ConsumerQueueChannel::OnConsumerQueueImportBuffers");
103   ALOGD_IF(TRACE,
104            "ConsumerQueueChannel::OnConsumerQueueImportBuffers: "
105            "pending_buffer_slots=%zu",
106            pending_buffer_slots_.size());
107 
108   // Indicate this is a silent queue that will not import buffers.
109   if (silent_)
110     return ErrorStatus(EBADR);
111 
112   while (!pending_buffer_slots_.empty()) {
113     auto producer_channel = pending_buffer_slots_.front().first.lock();
114     size_t producer_slot = pending_buffer_slots_.front().second;
115     pending_buffer_slots_.pop();
116 
117     // It's possible that the producer channel has expired. When this occurs,
118     // ignore the producer channel.
119     if (producer_channel == nullptr) {
120       ALOGW(
121           "ConsumerQueueChannel::OnConsumerQueueImportBuffers: producer "
122           "channel has already been expired.");
123       continue;
124     }
125 
126     auto status = producer_channel->CreateConsumer(message);
127 
128     // If no buffers are imported successfully, clear available and return an
129     // error. Otherwise, return all consumer handles already imported
130     // successfully, but keep available bits on, so that the client can retry
131     // importing remaining consumer buffers.
132     if (!status) {
133       ALOGE(
134           "ConsumerQueueChannel::OnConsumerQueueImportBuffers: Failed create "
135           "consumer: %s",
136           status.GetErrorMessage().c_str());
137       if (buffer_handles.empty()) {
138         ClearAvailable();
139         return status.error_status();
140       } else {
141         return {std::move(buffer_handles)};
142       }
143     }
144 
145     buffer_handles.emplace_back(status.take(), producer_slot);
146   }
147 
148   ClearAvailable();
149   return {std::move(buffer_handles)};
150 }
151 
OnProducerClosed()152 void ConsumerQueueChannel::OnProducerClosed() {
153   ALOGD_IF(TRACE, "ConsumerQueueChannel::OnProducerClosed: queue_id=%d",
154            buffer_id());
155   producer_.reset();
156   Hangup();
157 }
158 
159 }  // namespace dvr
160 }  // namespace android
161