1 #include "consumer_channel.h"
2 
3 #include <log/log.h>
4 #include <utils/Trace.h>
5 
6 #include <thread>
7 
8 #include <private/dvr/bufferhub_rpc.h>
9 #include "producer_channel.h"
10 
11 using android::pdx::BorrowedHandle;
12 using android::pdx::Channel;
13 using android::pdx::ErrorStatus;
14 using android::pdx::Message;
15 using android::pdx::Status;
16 using android::pdx::rpc::DispatchRemoteMethod;
17 
18 namespace android {
19 namespace dvr {
20 
ConsumerChannel(BufferHubService * service,int buffer_id,int channel_id,uint64_t consumer_state_bit,const std::shared_ptr<Channel> producer)21 ConsumerChannel::ConsumerChannel(BufferHubService* service, int buffer_id,
22                                  int channel_id, uint64_t consumer_state_bit,
23                                  const std::shared_ptr<Channel> producer)
24     : BufferHubChannel(service, buffer_id, channel_id, kConsumerType),
25       consumer_state_bit_(consumer_state_bit),
26       producer_(producer) {
27   GetProducer()->AddConsumer(this);
28 }
29 
~ConsumerChannel()30 ConsumerChannel::~ConsumerChannel() {
31   ALOGD_IF(TRACE,
32            "ConsumerChannel::~ConsumerChannel: channel_id=%d buffer_id=%d",
33            channel_id(), buffer_id());
34 
35   if (auto producer = GetProducer()) {
36     producer->RemoveConsumer(this);
37   }
38 }
39 
GetBufferInfo() const40 BufferHubChannel::BufferInfo ConsumerChannel::GetBufferInfo() const {
41   BufferHubChannel::BufferInfo info;
42   if (auto producer = GetProducer()) {
43     // If producer has not hung up, copy most buffer info from the producer.
44     info = producer->GetBufferInfo();
45   } else {
46     info.signaled_mask = consumer_state_bit();
47   }
48   info.id = buffer_id();
49   return info;
50 }
51 
GetProducer() const52 std::shared_ptr<ProducerChannel> ConsumerChannel::GetProducer() const {
53   return std::static_pointer_cast<ProducerChannel>(producer_.lock());
54 }
55 
HandleImpulse(Message & message)56 void ConsumerChannel::HandleImpulse(Message& message) {
57   ATRACE_NAME("ConsumerChannel::HandleImpulse");
58   switch (message.GetOp()) {
59     case BufferHubRPC::ConsumerAcquire::Opcode:
60       OnConsumerAcquire(message);
61       break;
62     case BufferHubRPC::ConsumerRelease::Opcode:
63       OnConsumerRelease(message, {});
64       break;
65   }
66 }
67 
HandleMessage(Message & message)68 bool ConsumerChannel::HandleMessage(Message& message) {
69   ATRACE_NAME("ConsumerChannel::HandleMessage");
70   auto producer = GetProducer();
71   if (!producer)
72     REPLY_ERROR_RETURN(message, EPIPE, true);
73 
74   switch (message.GetOp()) {
75     case BufferHubRPC::GetBuffer::Opcode:
76       DispatchRemoteMethod<BufferHubRPC::GetBuffer>(
77           *this, &ConsumerChannel::OnGetBuffer, message);
78       return true;
79 
80     case BufferHubRPC::NewConsumer::Opcode:
81       DispatchRemoteMethod<BufferHubRPC::NewConsumer>(
82           *producer, &ProducerChannel::OnNewConsumer, message);
83       return true;
84 
85     case BufferHubRPC::ConsumerAcquire::Opcode:
86       DispatchRemoteMethod<BufferHubRPC::ConsumerAcquire>(
87           *this, &ConsumerChannel::OnConsumerAcquire, message);
88       return true;
89 
90     case BufferHubRPC::ConsumerRelease::Opcode:
91       DispatchRemoteMethod<BufferHubRPC::ConsumerRelease>(
92           *this, &ConsumerChannel::OnConsumerRelease, message);
93       return true;
94 
95     case BufferHubRPC::ConsumerSetIgnore::Opcode:
96       DispatchRemoteMethod<BufferHubRPC::ConsumerSetIgnore>(
97           *this, &ConsumerChannel::OnConsumerSetIgnore, message);
98       return true;
99 
100     default:
101       return false;
102   }
103 }
104 
OnGetBuffer(Message &)105 Status<BufferDescription<BorrowedHandle>> ConsumerChannel::OnGetBuffer(
106     Message& /*message*/) {
107   ATRACE_NAME("ConsumerChannel::OnGetBuffer");
108   ALOGD_IF(TRACE, "ConsumerChannel::OnGetBuffer: buffer=%d", buffer_id());
109   if (auto producer = GetProducer()) {
110     return {producer->GetBuffer(consumer_state_bit_)};
111   } else {
112     return ErrorStatus(EPIPE);
113   }
114 }
115 
OnConsumerAcquire(Message & message)116 Status<LocalFence> ConsumerChannel::OnConsumerAcquire(Message& message) {
117   ATRACE_NAME("ConsumerChannel::OnConsumerAcquire");
118   auto producer = GetProducer();
119   if (!producer)
120     return ErrorStatus(EPIPE);
121 
122   if (acquired_ || released_) {
123     ALOGE(
124         "ConsumerChannel::OnConsumerAcquire: Acquire when not posted: "
125         "ignored=%d acquired=%d released=%d channel_id=%d buffer_id=%d",
126         ignored_, acquired_, released_, message.GetChannelId(),
127         producer->buffer_id());
128     return ErrorStatus(EBUSY);
129   } else {
130     auto status = producer->OnConsumerAcquire(message);
131     if (status) {
132       ClearAvailable();
133       acquired_ = true;
134     }
135     return status;
136   }
137 }
138 
OnConsumerRelease(Message & message,LocalFence release_fence)139 Status<void> ConsumerChannel::OnConsumerRelease(Message& message,
140                                                 LocalFence release_fence) {
141   ATRACE_NAME("ConsumerChannel::OnConsumerRelease");
142   auto producer = GetProducer();
143   if (!producer)
144     return ErrorStatus(EPIPE);
145 
146   if (!acquired_ || released_) {
147     ALOGE(
148         "ConsumerChannel::OnConsumerRelease: Release when not acquired: "
149         "ignored=%d acquired=%d released=%d channel_id=%d buffer_id=%d",
150         ignored_, acquired_, released_, message.GetChannelId(),
151         producer->buffer_id());
152     return ErrorStatus(EBUSY);
153   } else {
154     auto status =
155         producer->OnConsumerRelease(message, std::move(release_fence));
156     if (status) {
157       ClearAvailable();
158       acquired_ = false;
159       released_ = true;
160     }
161     return status;
162   }
163 }
164 
OnConsumerSetIgnore(Message &,bool ignored)165 Status<void> ConsumerChannel::OnConsumerSetIgnore(Message&, bool ignored) {
166   ATRACE_NAME("ConsumerChannel::OnConsumerSetIgnore");
167   auto producer = GetProducer();
168   if (!producer)
169     return ErrorStatus(EPIPE);
170 
171   ignored_ = ignored;
172   if (ignored_ && acquired_) {
173     // Update the producer if ignore is set after the consumer acquires the
174     // buffer.
175     ClearAvailable();
176     producer->OnConsumerIgnored();
177     acquired_ = false;
178     released_ = true;
179   }
180 
181   return {};
182 }
183 
OnProducerPosted()184 bool ConsumerChannel::OnProducerPosted() {
185   if (ignored_) {
186     acquired_ = false;
187     released_ = true;
188     return false;
189   } else {
190     acquired_ = false;
191     released_ = false;
192     SignalAvailable();
193     return true;
194   }
195 }
196 
OnProducerClosed()197 void ConsumerChannel::OnProducerClosed() {
198   producer_.reset();
199   Hangup();
200 }
201 
202 }  // namespace dvr
203 }  // namespace android
204