1 // Copyright (C) 2021 The Android Open Source Project
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
15 #include "OutputStreamImpl.h"
16
17 #include <aidl/device/google/atv/audio_proxy/MessageQueueFlag.h>
18 #include <aidl/device/google/atv/audio_proxy/PresentationPosition.h>
19 #include <android-base/logging.h>
20 #include <time.h>
21
22 #include "AudioProxyClientError.h"
23 #include "AudioProxyStreamOut.h"
24
25 using aidl::device::google::atv::audio_proxy::MessageQueueFlag;
26 using android::status_t;
27
28 namespace audio_proxy {
29 namespace {
30 // 1GB
31 constexpr uint32_t kMaxBufferSize = 1 << 30;
32
deleteEventFlag(EventFlag * obj)33 void deleteEventFlag(EventFlag* obj) {
34 if (!obj) {
35 return;
36 }
37
38 status_t status = EventFlag::deleteEventFlag(&obj);
39 if (status) {
40 LOG(ERROR) << "write MQ event flag deletion error: " << strerror(-status);
41 }
42 }
43
44 class WriteThread : public Thread {
45 public:
46 // WriteThread's lifespan never exceeds StreamOut's lifespan.
47 WriteThread(std::atomic<bool>* stop, AudioProxyStreamOut* stream,
48 OutputStreamImpl::DataMQ* dataMQ,
49 OutputStreamImpl::StatusMQ* statusMQ, EventFlag* eventFlag);
50
51 ~WriteThread() override;
52
53 private:
54 bool threadLoop() override;
55
56 PresentationPosition doGetPresentationPosition();
57 int64_t doWrite();
58
59 std::atomic<bool>* const mStop;
60 AudioProxyStreamOut* mStream;
61 OutputStreamImpl::DataMQ* const mDataMQ;
62 OutputStreamImpl::StatusMQ* const mStatusMQ;
63 EventFlag* const mEventFlag;
64 const std::unique_ptr<int8_t[]> mBuffer;
65 };
66
WriteThread(std::atomic<bool> * stop,AudioProxyStreamOut * stream,OutputStreamImpl::DataMQ * dataMQ,OutputStreamImpl::StatusMQ * statusMQ,EventFlag * eventFlag)67 WriteThread::WriteThread(std::atomic<bool>* stop, AudioProxyStreamOut* stream,
68 OutputStreamImpl::DataMQ* dataMQ,
69 OutputStreamImpl::StatusMQ* statusMQ,
70 EventFlag* eventFlag)
71 : Thread(false /*canCallJava*/),
72 mStop(stop),
73 mStream(stream),
74 mDataMQ(dataMQ),
75 mStatusMQ(statusMQ),
76 mEventFlag(eventFlag),
77 mBuffer(new int8_t[mDataMQ->getQuantumCount()]) {}
78
79 WriteThread::~WriteThread() = default;
80
doWrite()81 int64_t WriteThread::doWrite() {
82 const size_t availToRead = mDataMQ->availableToRead();
83 if (availToRead == 0) {
84 return 0;
85 }
86
87 if (!mDataMQ->read(&mBuffer[0], availToRead)) {
88 return 0;
89 }
90
91 return mStream->write(&mBuffer[0], availToRead);
92 }
93
doGetPresentationPosition()94 PresentationPosition WriteThread::doGetPresentationPosition() {
95 PresentationPosition position;
96 mStream->getPresentationPosition(&position.frames, &position.timestamp);
97 return position;
98 }
99
threadLoop()100 bool WriteThread::threadLoop() {
101 // This implementation doesn't return control back to the Thread until the
102 // parent thread decides to stop, as the Thread uses mutexes, and this can
103 // lead to priority inversion.
104 while (!std::atomic_load_explicit(mStop, std::memory_order_acquire)) {
105 uint32_t efState = 0;
106 mEventFlag->wait(static_cast<uint32_t>(MessageQueueFlag::NOT_EMPTY),
107 &efState);
108 if (!(efState & static_cast<uint32_t>(MessageQueueFlag::NOT_EMPTY))) {
109 continue; // Nothing to do.
110 }
111
112 WriteStatus status;
113 status.written = doWrite();
114 status.position = doGetPresentationPosition();
115
116 if (!mStatusMQ->write(&status)) {
117 LOG(ERROR) << "status message queue write failed.";
118 }
119 mEventFlag->wake(static_cast<uint32_t>(MessageQueueFlag::NOT_FULL));
120 }
121
122 return false;
123 }
124
125 } // namespace
126
OutputStreamImpl(std::unique_ptr<AudioProxyStreamOut> stream)127 OutputStreamImpl::OutputStreamImpl(std::unique_ptr<AudioProxyStreamOut> stream)
128 : mStream(std::move(stream)), mEventFlag(nullptr, deleteEventFlag) {}
129
~OutputStreamImpl()130 OutputStreamImpl::~OutputStreamImpl() {
131 closeImpl();
132
133 if (mWriteThread) {
134 status_t status = mWriteThread->join();
135 if (status) {
136 LOG(ERROR) << "write thread exit error: " << strerror(-status);
137 }
138 }
139
140 mEventFlag.reset();
141 }
142
standby()143 ndk::ScopedAStatus OutputStreamImpl::standby() {
144 mStream->standby();
145 return ndk::ScopedAStatus::ok();
146 }
147
close()148 ndk::ScopedAStatus OutputStreamImpl::close() { return closeImpl(); }
149
closeImpl()150 ndk::ScopedAStatus OutputStreamImpl::closeImpl() {
151 if (mStopWriteThread.load(
152 std::memory_order_relaxed)) { // only this thread writes
153 return ndk::ScopedAStatus::ok();
154 }
155 mStopWriteThread.store(true, std::memory_order_release);
156 if (mEventFlag) {
157 mEventFlag->wake(static_cast<uint32_t>(MessageQueueFlag::NOT_EMPTY));
158 }
159
160 return ndk::ScopedAStatus::ok();
161 }
162
prepareForWriting(int32_t frameSize,int32_t framesCount,DataMQDesc * dataMQDesc,StatusMQDesc * statusMQDesc)163 ndk::ScopedAStatus OutputStreamImpl::prepareForWriting(
164 int32_t frameSize, int32_t framesCount, DataMQDesc* dataMQDesc,
165 StatusMQDesc* statusMQDesc) {
166 if (mDataMQ) {
167 LOG(ERROR) << "the client attempted to call prepareForWriting twice.";
168 return ndk::ScopedAStatus::fromServiceSpecificError(ERROR_INVALID_ARGS);
169 }
170
171 if (frameSize == 0 || framesCount == 0) {
172 LOG(ERROR) << "Invalid frameSize (" << frameSize << ") or framesCount ("
173 << framesCount << ")";
174 return ndk::ScopedAStatus::fromServiceSpecificError(ERROR_INVALID_ARGS);
175 }
176
177 if (frameSize > kMaxBufferSize / framesCount) {
178 LOG(ERROR) << "Buffer too big: " << frameSize << "*" << framesCount
179 << " bytes > MAX_BUFFER_SIZE (" << kMaxBufferSize << ")";
180 return ndk::ScopedAStatus::fromServiceSpecificError(ERROR_INVALID_ARGS);
181 }
182
183 auto dataMQ =
184 std::make_unique<DataMQ>(frameSize * framesCount, true /* EventFlag */);
185 if (!dataMQ->isValid()) {
186 LOG(ERROR) << "data MQ is invalid";
187 return ndk::ScopedAStatus::fromServiceSpecificError(
188 ERROR_FMQ_CREATION_FAILURE);
189 }
190
191 auto statusMQ = std::make_unique<StatusMQ>(1);
192 if (!statusMQ->isValid()) {
193 LOG(ERROR) << "status MQ is invalid";
194 return ndk::ScopedAStatus::fromServiceSpecificError(
195 ERROR_FMQ_CREATION_FAILURE);
196 }
197
198 EventFlag* rawEventFlag = nullptr;
199 status_t status =
200 EventFlag::createEventFlag(dataMQ->getEventFlagWord(), &rawEventFlag);
201 std::unique_ptr<EventFlag, EventFlagDeleter> eventFlag(rawEventFlag,
202 deleteEventFlag);
203 if (status != ::android::OK || !eventFlag) {
204 LOG(ERROR) << "failed creating event flag for data MQ: "
205 << strerror(-status);
206 return ndk::ScopedAStatus::fromServiceSpecificError(
207 ERROR_FMQ_CREATION_FAILURE);
208 }
209
210 sp<WriteThread> writeThread =
211 new WriteThread(&mStopWriteThread, mStream.get(), dataMQ.get(),
212 statusMQ.get(), eventFlag.get());
213 status = writeThread->run("writer", ::android::PRIORITY_URGENT_AUDIO);
214 if (status != ::android::OK) {
215 LOG(ERROR) << "failed to start writer thread: " << strerror(-status);
216 return ndk::ScopedAStatus::fromServiceSpecificError(
217 ERROR_FMQ_CREATION_FAILURE);
218 }
219
220 mDataMQ = std::move(dataMQ);
221 mStatusMQ = std::move(statusMQ);
222 mEventFlag = std::move(eventFlag);
223 mWriteThread = std::move(writeThread);
224
225 *dataMQDesc = mDataMQ->dupeDesc();
226 *statusMQDesc = mStatusMQ->dupeDesc();
227
228 return ndk::ScopedAStatus::ok();
229 }
230
pause()231 ndk::ScopedAStatus OutputStreamImpl::pause() {
232 mStream->pause();
233 return ndk::ScopedAStatus::ok();
234 }
235
resume()236 ndk::ScopedAStatus OutputStreamImpl::resume() {
237 mStream->resume();
238 return ndk::ScopedAStatus::ok();
239 }
240
drain(AudioDrain type)241 ndk::ScopedAStatus OutputStreamImpl::drain(AudioDrain type) {
242 mStream->drain(type);
243 return ndk::ScopedAStatus::ok();
244 }
245
flush()246 ndk::ScopedAStatus OutputStreamImpl::flush() {
247 mStream->flush();
248 return ndk::ScopedAStatus::ok();
249 }
250
setVolume(float left,float right)251 ndk::ScopedAStatus OutputStreamImpl::setVolume(float left, float right) {
252 mStream->setVolume(left, right);
253 return ndk::ScopedAStatus::ok();
254 }
255
getBufferSizeBytes(int64_t * bufferSizeBytes)256 ndk::ScopedAStatus OutputStreamImpl::getBufferSizeBytes(
257 int64_t* bufferSizeBytes) {
258 *bufferSizeBytes = mStream->getBufferSizeBytes();
259 return ndk::ScopedAStatus::ok();
260 }
261
getLatencyMs(int32_t * latencyMs)262 ndk::ScopedAStatus OutputStreamImpl::getLatencyMs(int32_t* latencyMs) {
263 *latencyMs = mStream->getLatencyMs();
264 return ndk::ScopedAStatus::ok();
265 }
266
start()267 ndk::ScopedAStatus OutputStreamImpl::start() {
268 mStream->start();
269 return ndk::ScopedAStatus::ok();
270 }
271
stop()272 ndk::ScopedAStatus OutputStreamImpl::stop() {
273 mStream->stop();
274 return ndk::ScopedAStatus::ok();
275 }
276
createMmapBuffer(int32_t minBufferSizeFrames,MmapBufferInfo * info)277 ndk::ScopedAStatus OutputStreamImpl::createMmapBuffer(
278 int32_t minBufferSizeFrames, MmapBufferInfo* info) {
279 *info = mStream->createMmapBuffer(minBufferSizeFrames);
280 return ndk::ScopedAStatus::ok();
281 }
282
getMmapPosition(PresentationPosition * position)283 ndk::ScopedAStatus OutputStreamImpl::getMmapPosition(
284 PresentationPosition* position) {
285 *position = mStream->getMmapPosition();
286 return ndk::ScopedAStatus::ok();
287 }
288
289 } // namespace audio_proxy