1 #include "consumer_channel.h"
2
3 #include <log/log.h>
4 #include <utils/Trace.h>
5
6 #include <thread>
7
8 #include <private/dvr/bufferhub_rpc.h>
9 #include "producer_channel.h"
10
11 using android::pdx::ErrorStatus;
12 using android::pdx::BorrowedHandle;
13 using android::pdx::Channel;
14 using android::pdx::Message;
15 using android::pdx::Status;
16 using android::pdx::rpc::DispatchRemoteMethod;
17
18 namespace android {
19 namespace dvr {
20
ConsumerChannel(BufferHubService * service,int buffer_id,int channel_id,const std::shared_ptr<Channel> producer)21 ConsumerChannel::ConsumerChannel(BufferHubService* service, int buffer_id,
22 int channel_id,
23 const std::shared_ptr<Channel> producer)
24 : BufferHubChannel(service, buffer_id, channel_id, kConsumerType),
25 handled_(true),
26 ignored_(false),
27 producer_(producer) {
28 GetProducer()->AddConsumer(this);
29 }
30
~ConsumerChannel()31 ConsumerChannel::~ConsumerChannel() {
32 ALOGD_IF(TRACE,
33 "ConsumerChannel::~ConsumerChannel: channel_id=%d buffer_id=%d",
34 channel_id(), buffer_id());
35
36 if (auto producer = GetProducer()) {
37 if (!handled_) // Producer is waiting for our Release.
38 producer->OnConsumerIgnored();
39 producer->RemoveConsumer(this);
40 }
41 }
42
GetBufferInfo() const43 BufferHubChannel::BufferInfo ConsumerChannel::GetBufferInfo() const {
44 BufferHubChannel::BufferInfo info;
45 if (auto producer = GetProducer()) {
46 // If producer has not hung up, copy most buffer info from the producer.
47 info = producer->GetBufferInfo();
48 }
49 info.id = buffer_id();
50 return info;
51 }
52
GetProducer() const53 std::shared_ptr<ProducerChannel> ConsumerChannel::GetProducer() const {
54 return std::static_pointer_cast<ProducerChannel>(producer_.lock());
55 }
56
HandleImpulse(Message & message)57 void ConsumerChannel::HandleImpulse(Message& message) {
58 ATRACE_NAME("ConsumerChannel::HandleImpulse");
59 switch (message.GetOp()) {
60 case BufferHubRPC::ConsumerRelease::Opcode:
61 OnConsumerRelease(message, {});
62 break;
63 }
64 }
65
HandleMessage(Message & message)66 bool ConsumerChannel::HandleMessage(Message& message) {
67 ATRACE_NAME("ConsumerChannel::HandleMessage");
68 auto producer = GetProducer();
69 if (!producer)
70 REPLY_ERROR_RETURN(message, EPIPE, true);
71
72 switch (message.GetOp()) {
73 case BufferHubRPC::GetBuffer::Opcode:
74 DispatchRemoteMethod<BufferHubRPC::GetBuffer>(
75 *producer, &ProducerChannel::OnGetBuffer, message);
76 return true;
77
78 case BufferHubRPC::NewConsumer::Opcode:
79 DispatchRemoteMethod<BufferHubRPC::NewConsumer>(
80 *producer, &ProducerChannel::OnNewConsumer, message);
81 return true;
82
83 case BufferHubRPC::ConsumerAcquire::Opcode:
84 DispatchRemoteMethod<BufferHubRPC::ConsumerAcquire>(
85 *this, &ConsumerChannel::OnConsumerAcquire, message);
86 return true;
87
88 case BufferHubRPC::ConsumerRelease::Opcode:
89 DispatchRemoteMethod<BufferHubRPC::ConsumerRelease>(
90 *this, &ConsumerChannel::OnConsumerRelease, message);
91 return true;
92
93 case BufferHubRPC::ConsumerSetIgnore::Opcode:
94 DispatchRemoteMethod<BufferHubRPC::ConsumerSetIgnore>(
95 *this, &ConsumerChannel::OnConsumerSetIgnore, message);
96 return true;
97
98 default:
99 return false;
100 }
101 }
102
103 Status<std::pair<BorrowedFence, ConsumerChannel::MetaData>>
OnConsumerAcquire(Message & message,std::size_t metadata_size)104 ConsumerChannel::OnConsumerAcquire(Message& message,
105 std::size_t metadata_size) {
106 ATRACE_NAME("ConsumerChannel::OnConsumerAcquire");
107 auto producer = GetProducer();
108 if (!producer)
109 return ErrorStatus(EPIPE);
110
111 if (ignored_ || handled_) {
112 ALOGE(
113 "ConsumerChannel::OnConsumerAcquire: Acquire when not posted: "
114 "ignored=%d handled=%d channel_id=%d buffer_id=%d",
115 ignored_, handled_, message.GetChannelId(), producer->buffer_id());
116 return ErrorStatus(EBUSY);
117 } else {
118 ClearAvailable();
119 return producer->OnConsumerAcquire(message, metadata_size);
120 }
121 }
122
OnConsumerRelease(Message & message,LocalFence release_fence)123 Status<void> ConsumerChannel::OnConsumerRelease(Message& message,
124 LocalFence release_fence) {
125 ATRACE_NAME("ConsumerChannel::OnConsumerRelease");
126 auto producer = GetProducer();
127 if (!producer)
128 return ErrorStatus(EPIPE);
129
130 if (ignored_ || handled_) {
131 ALOGE(
132 "ConsumerChannel::OnConsumerRelease: Release when not acquired: "
133 "ignored=%d handled=%d channel_id=%d buffer_id=%d",
134 ignored_, handled_, message.GetChannelId(), producer->buffer_id());
135 return ErrorStatus(EBUSY);
136 } else {
137 ClearAvailable();
138 auto status =
139 producer->OnConsumerRelease(message, std::move(release_fence));
140 handled_ = !!status;
141 return status;
142 }
143 }
144
OnConsumerSetIgnore(Message &,bool ignored)145 Status<void> ConsumerChannel::OnConsumerSetIgnore(Message&, bool ignored) {
146 ATRACE_NAME("ConsumerChannel::OnConsumerSetIgnore");
147 auto producer = GetProducer();
148 if (!producer)
149 return ErrorStatus(EPIPE);
150
151 ignored_ = ignored;
152 if (ignored_ && !handled_) {
153 // Update the producer if ignore is set after the consumer acquires the
154 // buffer.
155 ClearAvailable();
156 producer->OnConsumerIgnored();
157 handled_ = false;
158 }
159
160 return {};
161 }
162
OnProducerPosted()163 bool ConsumerChannel::OnProducerPosted() {
164 if (ignored_) {
165 handled_ = true;
166 return false;
167 } else {
168 handled_ = false;
169 SignalAvailable();
170 return true;
171 }
172 }
173
OnProducerClosed()174 void ConsumerChannel::OnProducerClosed() {
175 producer_.reset();
176 Hangup();
177 }
178
179 } // namespace dvr
180 } // namespace android
181