1 // Copyright (C) 2021 The Android Open Source Project
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //      http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include "RemoteBusOutputStream.h"
16 
17 #include <aidl/device/google/atv/audio_proxy/MessageQueueFlag.h>
18 #include <android-base/logging.h>
19 
20 #include "RingBufferUtil.h"
21 
22 using aidl::device::google::atv::audio_proxy::MessageQueueFlag;
23 using android::status_t;
24 
25 namespace audio_proxy {
26 namespace service {
27 namespace {
28 
// Timeout passed to the blocking FMQ status read, in nanoseconds (1 second).
constexpr int64_t kFmqReadTimeoutNs = 1'000'000'000;
31 
deleteEventFlag(EventFlag * obj)32 void deleteEventFlag(EventFlag* obj) {
33   if (!obj) {
34     return;
35   }
36 
37   status_t status = EventFlag::deleteEventFlag(&obj);
38   if (status != android::OK) {
39     LOG(ERROR) << "write MQ event flag deletion error: " << strerror(-status);
40   }
41 }
42 
43 }  // namespace
44 
RemoteBusOutputStream(std::shared_ptr<IOutputStream> stream,const std::string & address,const AidlAudioConfig & config,int32_t flags)45 RemoteBusOutputStream::RemoteBusOutputStream(
46     std::shared_ptr<IOutputStream> stream, const std::string& address,
47     const AidlAudioConfig& config, int32_t flags)
48     : BusOutputStream(address, config, flags),
49       mStream(std::move(stream)),
50       mEventFlag(nullptr, deleteEventFlag) {}
51 RemoteBusOutputStream::~RemoteBusOutputStream() = default;
52 
standby()53 bool RemoteBusOutputStream::standby() { return mStream->standby().isOk(); }
54 
pause()55 bool RemoteBusOutputStream::pause() { return mStream->pause().isOk(); }
56 
resume()57 bool RemoteBusOutputStream::resume() { return mStream->resume().isOk(); }
58 
drain(AidlAudioDrain drain)59 bool RemoteBusOutputStream::drain(AidlAudioDrain drain) {
60   return mStream->drain(drain).isOk();
61 }
62 
flush()63 bool RemoteBusOutputStream::flush() { return mStream->flush().isOk(); }
64 
close()65 bool RemoteBusOutputStream::close() { return mStream->close().isOk(); }
66 
setVolume(float left,float right)67 bool RemoteBusOutputStream::setVolume(float left, float right) {
68   return mStream->setVolume(left, right).isOk();
69 }
70 
availableToWrite()71 size_t RemoteBusOutputStream::availableToWrite() {
72   return mDataMQ->availableToWrite();
73 }
74 
writeRingBuffer(const uint8_t * firstMem,size_t firstLength,const uint8_t * secondMem,size_t secondLength)75 AidlWriteStatus RemoteBusOutputStream::writeRingBuffer(const uint8_t* firstMem,
76                                                        size_t firstLength,
77                                                        const uint8_t* secondMem,
78                                                        size_t secondLength) {
79   DCHECK(mDataMQ);
80   DCHECK(mStatusMQ);
81   DCHECK(mEventFlag);
82   AidlWriteStatus status;
83   DataMQ::MemTransaction tx;
84   if (!mDataMQ->beginWrite(firstLength + secondLength, &tx)) {
85     LOG(ERROR) << "Failed to begin write.";
86     return status;
87   }
88 
89   const DataMQ::MemRegion& firstRegion = tx.getFirstRegion();
90   const DataMQ::MemRegion& secondRegion = tx.getSecondRegion();
91 
92   copyRingBuffer(firstRegion.getAddress(), firstRegion.getLength(),
93                  secondRegion.getAddress(), secondRegion.getLength(),
94                  reinterpret_cast<const int8_t*>(firstMem), firstLength,
95                  reinterpret_cast<const int8_t*>(secondMem), secondLength);
96   if (!mDataMQ->commitWrite(firstLength + secondLength)) {
97     LOG(ERROR) << "Failed to commit write.";
98     return status;
99   }
100 
101   mEventFlag->wake(static_cast<uint32_t>(MessageQueueFlag::NOT_EMPTY));
102 
103   // readNotification is used to "wake" after successful read, hence we don't
104   // need it. writeNotification is used to "wait" for the other end to write
105   // enough data.
106   // It's fine to use readBlocking here because:
107   // 1. We don't wake without writing mStatusMQ.
108   // 2. The other end will always write mStatusMQ before wake mEventFlag.
109   if (!mStatusMQ->readBlocking(
110           &status, 1 /* count */, 0 /* readNotification */,
111           static_cast<uint32_t>(
112               MessageQueueFlag::NOT_FULL) /* writeNotification */,
113           kFmqReadTimeoutNs, mEventFlag.get())) {
114     LOG(ERROR) << "Failed to read status!";
115     return status;
116   }
117 
118   return status;
119 }
120 
prepareForWritingImpl(uint32_t frameSize,uint32_t frameCount)121 bool RemoteBusOutputStream::prepareForWritingImpl(uint32_t frameSize,
122                                                   uint32_t frameCount) {
123   DataMQDesc dataMQDesc;
124   StatusMQDesc statusMQDesc;
125   ndk::ScopedAStatus status = mStream->prepareForWriting(
126       frameSize, frameCount, &dataMQDesc, &statusMQDesc);
127   if (!status.isOk()) {
128     LOG(ERROR) << "prepareForWriting fails.";
129     return false;
130   }
131 
132   auto dataMQ = std::make_unique<DataMQ>(dataMQDesc);
133   if (!dataMQ->isValid()) {
134     LOG(ERROR) << "invalid data mq.";
135     return false;
136   }
137 
138   EventFlag* rawEventFlag = nullptr;
139   status_t eventFlagStatus =
140       EventFlag::createEventFlag(dataMQ->getEventFlagWord(), &rawEventFlag);
141   std::unique_ptr<EventFlag, EventFlagDeleter> eventFlag(rawEventFlag,
142                                                          deleteEventFlag);
143   if (eventFlagStatus != android::OK || !eventFlag) {
144     LOG(ERROR) << "failed creating event flag for data MQ: "
145                << strerror(-eventFlagStatus);
146     return false;
147   }
148 
149   auto statusMQ = std::make_unique<StatusMQ>(statusMQDesc);
150   if (!statusMQ->isValid()) {
151     LOG(ERROR) << "invalid status mq.";
152     return false;
153   }
154 
155   mDataMQ = std::move(dataMQ);
156   mStatusMQ = std::move(statusMQ);
157   mEventFlag = std::move(eventFlag);
158   return true;
159 }
160 
start()161 bool RemoteBusOutputStream::start() { return mStream->start().isOk(); }
162 
stop()163 bool RemoteBusOutputStream::stop() { return mStream->stop().isOk(); };
164 
createMmapBuffer(int32_t minBufferSizeFrames)165 AidlMmapBufferInfo RemoteBusOutputStream::createMmapBuffer(
166     int32_t minBufferSizeFrames) {
167   AidlMmapBufferInfo info;
168   mStream->createMmapBuffer(minBufferSizeFrames, &info);
169   return info;
170 }
171 
getMmapPosition()172 AidlPresentationPosition RemoteBusOutputStream::getMmapPosition() {
173   AidlPresentationPosition position;
174   mStream->getMmapPosition(&position);
175   return position;
176 }
177 
178 }  // namespace service
179 }  // namespace audio_proxy