1 // Copyright (C) 2021 The Android Open Source Project
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //      http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include "OutputStreamImpl.h"
16 
17 #include <aidl/device/google/atv/audio_proxy/MessageQueueFlag.h>
18 #include <aidl/device/google/atv/audio_proxy/PresentationPosition.h>
19 #include <android-base/logging.h>
20 #include <time.h>
21 
22 #include "AudioProxyClientError.h"
23 #include "AudioProxyStreamOut.h"
24 
25 using aidl::device::google::atv::audio_proxy::MessageQueueFlag;
26 using android::status_t;
27 
28 namespace audio_proxy {
29 namespace {
// Largest buffer size accepted by this file's size check: 2^30 bytes (1GB).
constexpr uint32_t kMaxBufferSize = 1U << 30;
32 
deleteEventFlag(EventFlag * obj)33 void deleteEventFlag(EventFlag* obj) {
34   if (!obj) {
35     return;
36   }
37 
38   status_t status = EventFlag::deleteEventFlag(&obj);
39   if (status) {
40     LOG(ERROR) << "write MQ event flag deletion error: " << strerror(-status);
41   }
42 }
43 
// Worker thread that reads pending data from the data message queue, writes
// it to the output stream and posts the result of each write on the status
// message queue.
class WriteThread : public Thread {
 public:
  // WriteThread's lifespan never exceeds StreamOut's lifespan.
  // The pointers passed in are stored as-is; this class does not delete them.
  WriteThread(std::atomic<bool>* stop, AudioProxyStreamOut* stream,
              OutputStreamImpl::DataMQ* dataMQ,
              OutputStreamImpl::StatusMQ* statusMQ, EventFlag* eventFlag);

  ~WriteThread() override;

 private:
  // Runs the write loop until the stop flag is observed as true.
  bool threadLoop() override;

  // Queries mStream for its presentation position (frames and timestamp).
  PresentationPosition doGetPresentationPosition();
  // Moves everything currently readable from mDataMQ into mStream. Returns
  // the result of mStream->write(), or 0 when nothing was read.
  int64_t doWrite();

  // Stop request flag, polled by threadLoop().
  std::atomic<bool>* const mStop;
  // Stream that receives the audio data.
  AudioProxyStreamOut* mStream;
  // Queue the audio data is read from.
  OutputStreamImpl::DataMQ* const mDataMQ;
  // Queue each WriteStatus is posted to.
  OutputStreamImpl::StatusMQ* const mStatusMQ;
  // Event flag used to wait for NOT_EMPTY and to signal NOT_FULL.
  EventFlag* const mEventFlag;
  // Staging buffer between mDataMQ and mStream. Must stay declared after
  // mDataMQ: its initializer reads mDataMQ.
  const std::unique_ptr<int8_t[]> mBuffer;
};
66 
// Stores the given pointers and allocates the staging buffer. The buffer
// length is mDataMQ->getQuantumCount(), so this relies on mDataMQ being
// initialized before mBuffer (members are initialized in declaration order).
WriteThread::WriteThread(std::atomic<bool>* stop, AudioProxyStreamOut* stream,
                         OutputStreamImpl::DataMQ* dataMQ,
                         OutputStreamImpl::StatusMQ* statusMQ,
                         EventFlag* eventFlag)
    : Thread(false /*canCallJava*/),
      mStop(stop),
      mStream(stream),
      mDataMQ(dataMQ),
      mStatusMQ(statusMQ),
      mEventFlag(eventFlag),
      mBuffer(new int8_t[mDataMQ->getQuantumCount()]) {}
78 
// Defaulted: mBuffer frees its own storage, and the raw pointer members are
// not deleted here.
WriteThread::~WriteThread() = default;
80 
doWrite()81 int64_t WriteThread::doWrite() {
82   const size_t availToRead = mDataMQ->availableToRead();
83   if (availToRead == 0) {
84     return 0;
85   }
86 
87   if (!mDataMQ->read(&mBuffer[0], availToRead)) {
88     return 0;
89   }
90 
91   return mStream->write(&mBuffer[0], availToRead);
92 }
93 
doGetPresentationPosition()94 PresentationPosition WriteThread::doGetPresentationPosition() {
95   PresentationPosition position;
96   mStream->getPresentationPosition(&position.frames, &position.timestamp);
97   return position;
98 }
99 
threadLoop()100 bool WriteThread::threadLoop() {
101   // This implementation doesn't return control back to the Thread until the
102   // parent thread decides to stop, as the Thread uses mutexes, and this can
103   // lead to priority inversion.
104   while (!std::atomic_load_explicit(mStop, std::memory_order_acquire)) {
105     uint32_t efState = 0;
106     mEventFlag->wait(static_cast<uint32_t>(MessageQueueFlag::NOT_EMPTY),
107                      &efState);
108     if (!(efState & static_cast<uint32_t>(MessageQueueFlag::NOT_EMPTY))) {
109       continue;  // Nothing to do.
110     }
111 
112     WriteStatus status;
113     status.written = doWrite();
114     status.position = doGetPresentationPosition();
115 
116     if (!mStatusMQ->write(&status)) {
117       LOG(ERROR) << "status message queue write failed.";
118     }
119     mEventFlag->wake(static_cast<uint32_t>(MessageQueueFlag::NOT_FULL));
120   }
121 
122   return false;
123 }
124 
125 }  // namespace
126 
// Takes ownership of the stream. The event flag starts out empty (null) but
// already carries deleteEventFlag as its deleter.
OutputStreamImpl::OutputStreamImpl(std::unique_ptr<AudioProxyStreamOut> stream)
    : mStream(std::move(stream)), mEventFlag(nullptr, deleteEventFlag) {}
129 
~OutputStreamImpl()130 OutputStreamImpl::~OutputStreamImpl() {
131   closeImpl();
132 
133   if (mWriteThread) {
134     status_t status = mWriteThread->join();
135     if (status) {
136       LOG(ERROR) << "write thread exit error: " << strerror(-status);
137     }
138   }
139 
140   mEventFlag.reset();
141 }
142 
// Forwards to mStream->standby() and always reports success; the result of
// the stream call, if any, is not inspected.
ndk::ScopedAStatus OutputStreamImpl::standby() {
  mStream->standby();
  return ndk::ScopedAStatus::ok();
}
147 
// Binder-facing entry point; all the work is done by closeImpl().
ndk::ScopedAStatus OutputStreamImpl::close() { return closeImpl(); }
149 
closeImpl()150 ndk::ScopedAStatus OutputStreamImpl::closeImpl() {
151   if (mStopWriteThread.load(
152           std::memory_order_relaxed)) {  // only this thread writes
153     return ndk::ScopedAStatus::ok();
154   }
155   mStopWriteThread.store(true, std::memory_order_release);
156   if (mEventFlag) {
157     mEventFlag->wake(static_cast<uint32_t>(MessageQueueFlag::NOT_EMPTY));
158   }
159 
160   return ndk::ScopedAStatus::ok();
161 }
162 
prepareForWriting(int32_t frameSize,int32_t framesCount,DataMQDesc * dataMQDesc,StatusMQDesc * statusMQDesc)163 ndk::ScopedAStatus OutputStreamImpl::prepareForWriting(
164     int32_t frameSize, int32_t framesCount, DataMQDesc* dataMQDesc,
165     StatusMQDesc* statusMQDesc) {
166   if (mDataMQ) {
167     LOG(ERROR) << "the client attempted to call prepareForWriting twice.";
168     return ndk::ScopedAStatus::fromServiceSpecificError(ERROR_INVALID_ARGS);
169   }
170 
171   if (frameSize == 0 || framesCount == 0) {
172     LOG(ERROR) << "Invalid frameSize (" << frameSize << ") or framesCount ("
173                << framesCount << ")";
174     return ndk::ScopedAStatus::fromServiceSpecificError(ERROR_INVALID_ARGS);
175   }
176 
177   if (frameSize > kMaxBufferSize / framesCount) {
178     LOG(ERROR) << "Buffer too big: " << frameSize << "*" << framesCount
179                << " bytes > MAX_BUFFER_SIZE (" << kMaxBufferSize << ")";
180     return ndk::ScopedAStatus::fromServiceSpecificError(ERROR_INVALID_ARGS);
181   }
182 
183   auto dataMQ =
184       std::make_unique<DataMQ>(frameSize * framesCount, true /* EventFlag */);
185   if (!dataMQ->isValid()) {
186     LOG(ERROR) << "data MQ is invalid";
187     return ndk::ScopedAStatus::fromServiceSpecificError(
188         ERROR_FMQ_CREATION_FAILURE);
189   }
190 
191   auto statusMQ = std::make_unique<StatusMQ>(1);
192   if (!statusMQ->isValid()) {
193     LOG(ERROR) << "status MQ is invalid";
194     return ndk::ScopedAStatus::fromServiceSpecificError(
195         ERROR_FMQ_CREATION_FAILURE);
196   }
197 
198   EventFlag* rawEventFlag = nullptr;
199   status_t status =
200       EventFlag::createEventFlag(dataMQ->getEventFlagWord(), &rawEventFlag);
201   std::unique_ptr<EventFlag, EventFlagDeleter> eventFlag(rawEventFlag,
202                                                          deleteEventFlag);
203   if (status != ::android::OK || !eventFlag) {
204     LOG(ERROR) << "failed creating event flag for data MQ: "
205                << strerror(-status);
206     return ndk::ScopedAStatus::fromServiceSpecificError(
207         ERROR_FMQ_CREATION_FAILURE);
208   }
209 
210   sp<WriteThread> writeThread =
211       new WriteThread(&mStopWriteThread, mStream.get(), dataMQ.get(),
212                       statusMQ.get(), eventFlag.get());
213   status = writeThread->run("writer", ::android::PRIORITY_URGENT_AUDIO);
214   if (status != ::android::OK) {
215     LOG(ERROR) << "failed to start writer thread: " << strerror(-status);
216     return ndk::ScopedAStatus::fromServiceSpecificError(
217         ERROR_FMQ_CREATION_FAILURE);
218   }
219 
220   mDataMQ = std::move(dataMQ);
221   mStatusMQ = std::move(statusMQ);
222   mEventFlag = std::move(eventFlag);
223   mWriteThread = std::move(writeThread);
224 
225   *dataMQDesc = mDataMQ->dupeDesc();
226   *statusMQDesc = mStatusMQ->dupeDesc();
227 
228   return ndk::ScopedAStatus::ok();
229 }
230 
// Forwards to mStream->pause() and always reports success; the result of the
// stream call, if any, is not inspected.
ndk::ScopedAStatus OutputStreamImpl::pause() {
  mStream->pause();
  return ndk::ScopedAStatus::ok();
}
235 
// Forwards to mStream->resume() and always reports success; the result of
// the stream call, if any, is not inspected.
ndk::ScopedAStatus OutputStreamImpl::resume() {
  mStream->resume();
  return ndk::ScopedAStatus::ok();
}
240 
// Forwards the drain request, with the given type, to mStream->drain() and
// always reports success; the result of the stream call, if any, is not
// inspected.
ndk::ScopedAStatus OutputStreamImpl::drain(AudioDrain type) {
  mStream->drain(type);
  return ndk::ScopedAStatus::ok();
}
245 
// Forwards to mStream->flush() and always reports success; the result of the
// stream call, if any, is not inspected.
ndk::ScopedAStatus OutputStreamImpl::flush() {
  mStream->flush();
  return ndk::ScopedAStatus::ok();
}
250 
// Passes the left and right volume values unchanged to mStream->setVolume()
// and always reports success; no range checking is done here.
ndk::ScopedAStatus OutputStreamImpl::setVolume(float left, float right) {
  mStream->setVolume(left, right);
  return ndk::ScopedAStatus::ok();
}
255 
// Writes the value returned by mStream->getBufferSizeBytes() to
// *bufferSizeBytes and always reports success.
ndk::ScopedAStatus OutputStreamImpl::getBufferSizeBytes(
    int64_t* bufferSizeBytes) {
  *bufferSizeBytes = mStream->getBufferSizeBytes();
  return ndk::ScopedAStatus::ok();
}
261 
// Writes the value returned by mStream->getLatencyMs() to *latencyMs and
// always reports success.
ndk::ScopedAStatus OutputStreamImpl::getLatencyMs(int32_t* latencyMs) {
  *latencyMs = mStream->getLatencyMs();
  return ndk::ScopedAStatus::ok();
}
266 
// Forwards to mStream->start() and always reports success; the result of the
// stream call, if any, is not inspected.
ndk::ScopedAStatus OutputStreamImpl::start() {
  mStream->start();
  return ndk::ScopedAStatus::ok();
}
271 
// Forwards to mStream->stop() and always reports success; the result of the
// stream call, if any, is not inspected.
ndk::ScopedAStatus OutputStreamImpl::stop() {
  mStream->stop();
  return ndk::ScopedAStatus::ok();
}
276 
// Writes the MmapBufferInfo returned by
// mStream->createMmapBuffer(minBufferSizeFrames) to *info and always reports
// success; minBufferSizeFrames is not validated here.
ndk::ScopedAStatus OutputStreamImpl::createMmapBuffer(
    int32_t minBufferSizeFrames, MmapBufferInfo* info) {
  *info = mStream->createMmapBuffer(minBufferSizeFrames);
  return ndk::ScopedAStatus::ok();
}
282 
// Writes the position returned by mStream->getMmapPosition() to *position
// and always reports success.
ndk::ScopedAStatus OutputStreamImpl::getMmapPosition(
    PresentationPosition* position) {
  *position = mStream->getMmapPosition();
  return ndk::ScopedAStatus::ok();
}
288 
289 }  // namespace audio_proxy