1 #include "consumer_channel.h"
2 
3 #include <log/log.h>
4 #include <utils/Trace.h>
5 
6 #include <thread>
7 
8 #include <private/dvr/bufferhub_rpc.h>
9 #include "producer_channel.h"
10 
11 using android::pdx::ErrorStatus;
12 using android::pdx::BorrowedHandle;
13 using android::pdx::Channel;
14 using android::pdx::Message;
15 using android::pdx::Status;
16 using android::pdx::rpc::DispatchRemoteMethod;
17 
18 namespace android {
19 namespace dvr {
20 
ConsumerChannel(BufferHubService * service,int buffer_id,int channel_id,const std::shared_ptr<Channel> producer)21 ConsumerChannel::ConsumerChannel(BufferHubService* service, int buffer_id,
22                                  int channel_id,
23                                  const std::shared_ptr<Channel> producer)
24     : BufferHubChannel(service, buffer_id, channel_id, kConsumerType),
25       handled_(true),
26       ignored_(false),
27       producer_(producer) {
28   GetProducer()->AddConsumer(this);
29 }
30 
~ConsumerChannel()31 ConsumerChannel::~ConsumerChannel() {
32   ALOGD_IF(TRACE,
33            "ConsumerChannel::~ConsumerChannel: channel_id=%d buffer_id=%d",
34            channel_id(), buffer_id());
35 
36   if (auto producer = GetProducer()) {
37     if (!handled_)  // Producer is waiting for our Release.
38       producer->OnConsumerIgnored();
39     producer->RemoveConsumer(this);
40   }
41 }
42 
GetBufferInfo() const43 BufferHubChannel::BufferInfo ConsumerChannel::GetBufferInfo() const {
44   BufferHubChannel::BufferInfo info;
45   if (auto producer = GetProducer()) {
46     // If producer has not hung up, copy most buffer info from the producer.
47     info = producer->GetBufferInfo();
48   }
49   info.id = buffer_id();
50   return info;
51 }
52 
GetProducer() const53 std::shared_ptr<ProducerChannel> ConsumerChannel::GetProducer() const {
54   return std::static_pointer_cast<ProducerChannel>(producer_.lock());
55 }
56 
HandleImpulse(Message & message)57 void ConsumerChannel::HandleImpulse(Message& message) {
58   ATRACE_NAME("ConsumerChannel::HandleImpulse");
59   switch (message.GetOp()) {
60     case BufferHubRPC::ConsumerRelease::Opcode:
61       OnConsumerRelease(message, {});
62       break;
63   }
64 }
65 
HandleMessage(Message & message)66 bool ConsumerChannel::HandleMessage(Message& message) {
67   ATRACE_NAME("ConsumerChannel::HandleMessage");
68   auto producer = GetProducer();
69   if (!producer)
70     REPLY_ERROR_RETURN(message, EPIPE, true);
71 
72   switch (message.GetOp()) {
73     case BufferHubRPC::GetBuffer::Opcode:
74       DispatchRemoteMethod<BufferHubRPC::GetBuffer>(
75           *producer, &ProducerChannel::OnGetBuffer, message);
76       return true;
77 
78     case BufferHubRPC::NewConsumer::Opcode:
79       DispatchRemoteMethod<BufferHubRPC::NewConsumer>(
80           *producer, &ProducerChannel::OnNewConsumer, message);
81       return true;
82 
83     case BufferHubRPC::ConsumerAcquire::Opcode:
84       DispatchRemoteMethod<BufferHubRPC::ConsumerAcquire>(
85           *this, &ConsumerChannel::OnConsumerAcquire, message);
86       return true;
87 
88     case BufferHubRPC::ConsumerRelease::Opcode:
89       DispatchRemoteMethod<BufferHubRPC::ConsumerRelease>(
90           *this, &ConsumerChannel::OnConsumerRelease, message);
91       return true;
92 
93     case BufferHubRPC::ConsumerSetIgnore::Opcode:
94       DispatchRemoteMethod<BufferHubRPC::ConsumerSetIgnore>(
95           *this, &ConsumerChannel::OnConsumerSetIgnore, message);
96       return true;
97 
98     default:
99       return false;
100   }
101 }
102 
103 Status<std::pair<BorrowedFence, ConsumerChannel::MetaData>>
OnConsumerAcquire(Message & message,std::size_t metadata_size)104 ConsumerChannel::OnConsumerAcquire(Message& message,
105                                    std::size_t metadata_size) {
106   ATRACE_NAME("ConsumerChannel::OnConsumerAcquire");
107   auto producer = GetProducer();
108   if (!producer)
109     return ErrorStatus(EPIPE);
110 
111   if (ignored_ || handled_) {
112     ALOGE(
113         "ConsumerChannel::OnConsumerAcquire: Acquire when not posted: "
114         "ignored=%d handled=%d channel_id=%d buffer_id=%d",
115         ignored_, handled_, message.GetChannelId(), producer->buffer_id());
116     return ErrorStatus(EBUSY);
117   } else {
118     ClearAvailable();
119     return producer->OnConsumerAcquire(message, metadata_size);
120   }
121 }
122 
OnConsumerRelease(Message & message,LocalFence release_fence)123 Status<void> ConsumerChannel::OnConsumerRelease(Message& message,
124                                                 LocalFence release_fence) {
125   ATRACE_NAME("ConsumerChannel::OnConsumerRelease");
126   auto producer = GetProducer();
127   if (!producer)
128     return ErrorStatus(EPIPE);
129 
130   if (ignored_ || handled_) {
131     ALOGE(
132         "ConsumerChannel::OnConsumerRelease: Release when not acquired: "
133         "ignored=%d handled=%d channel_id=%d buffer_id=%d",
134         ignored_, handled_, message.GetChannelId(), producer->buffer_id());
135     return ErrorStatus(EBUSY);
136   } else {
137     ClearAvailable();
138     auto status =
139         producer->OnConsumerRelease(message, std::move(release_fence));
140     handled_ = !!status;
141     return status;
142   }
143 }
144 
OnConsumerSetIgnore(Message &,bool ignored)145 Status<void> ConsumerChannel::OnConsumerSetIgnore(Message&, bool ignored) {
146   ATRACE_NAME("ConsumerChannel::OnConsumerSetIgnore");
147   auto producer = GetProducer();
148   if (!producer)
149     return ErrorStatus(EPIPE);
150 
151   ignored_ = ignored;
152   if (ignored_ && !handled_) {
153     // Update the producer if ignore is set after the consumer acquires the
154     // buffer.
155     ClearAvailable();
156     producer->OnConsumerIgnored();
157     handled_ = false;
158   }
159 
160   return {};
161 }
162 
OnProducerPosted()163 bool ConsumerChannel::OnProducerPosted() {
164   if (ignored_) {
165     handled_ = true;
166     return false;
167   } else {
168     handled_ = false;
169     SignalAvailable();
170     return true;
171   }
172 }
173 
OnProducerClosed()174 void ConsumerChannel::OnProducerClosed() {
175   producer_.reset();
176   Hangup();
177 }
178 
179 }  // namespace dvr
180 }  // namespace android
181