1 // Copyright (C) 2021 The Android Open Source Project
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
15 #include "WriteThread.h"
16
17 #include <android-base/logging.h>
18 #include <time.h>
19
20 #include <atomic>
21
22 #include "AidlTypes.h"
23 #include "BusOutputStream.h"
24
25 namespace audio_proxy::service {
26
WriteThread(std::shared_ptr<BusOutputStream> stream,CommandMQ * commandMQ,DataMQ * dataMQ,StatusMQ * statusMQ,EventFlag * eventFlag,uint32_t latencyMs)27 WriteThread::WriteThread(std::shared_ptr<BusOutputStream> stream,
28 CommandMQ* commandMQ, DataMQ* dataMQ,
29 StatusMQ* statusMQ, EventFlag* eventFlag,
30 uint32_t latencyMs)
31 : Thread(false /*canCallJava*/),
32 mStream(std::move(stream)),
33 mCommandMQ(commandMQ),
34 mDataMQ(dataMQ),
35 mStatusMQ(statusMQ),
36 mEventFlag(eventFlag),
37 mLatencyMs(latencyMs) {}
38
39 WriteThread::~WriteThread() = default;
40
stop()41 void WriteThread::stop() {
42 if (mStop.load(std::memory_order_relaxed)) {
43 return;
44 }
45
46 mStop.store(true, std::memory_order_release);
47 mEventFlag->wake(static_cast<uint32_t>(MessageQueueFlagBits::NOT_EMPTY));
48 }
49
updateOutputStream(std::shared_ptr<BusOutputStream> stream)50 void WriteThread::updateOutputStream(std::shared_ptr<BusOutputStream> stream) {
51 {
52 std::scoped_lock<std::mutex> lock(mStreamLock);
53 mStream = std::move(stream);
54 }
55
56 // Assume all the written frames are already played out by the old stream.
57 std::scoped_lock<std::mutex> lock(mPositionLock);
58 mPresentationFramesOffset = mTotalWrittenFrames;
59 }
60
getPresentationPosition()61 std::pair<uint64_t, TimeSpec> WriteThread::getPresentationPosition() {
62 std::scoped_lock<std::mutex> lock(mPositionLock);
63 return std::make_pair(mPresentationFrames, mPresentationTimestamp);
64 }
65
doWrite(BusOutputStream * stream)66 IStreamOut::WriteStatus WriteThread::doWrite(BusOutputStream* stream) {
67 IStreamOut::WriteStatus status;
68 status.replyTo = IStreamOut::WriteCommand::WRITE;
69 status.retval = Result::INVALID_STATE;
70 status.reply.written = 0;
71
72 const size_t availToRead = mDataMQ->availableToRead();
73 if (stream->availableToWrite() < availToRead) {
74 LOG(WARNING) << "No space to write, wait...";
75 return status;
76 }
77
78 DataMQ::MemTransaction tx;
79 if (mDataMQ->beginRead(availToRead, &tx)) {
80 status.retval = Result::OK;
81 AidlWriteStatus writeStatus = stream->writeRingBuffer(
82 tx.getFirstRegion().getAddress(), tx.getFirstRegion().getLength(),
83 tx.getSecondRegion().getAddress(), tx.getSecondRegion().getLength());
84 if (writeStatus.written < availToRead) {
85 LOG(WARNING) << "Failed to write all the bytes to client. Written "
86 << writeStatus.written << ", available " << availToRead;
87 }
88
89 if (writeStatus.written < 0) {
90 writeStatus.written = 0;
91 }
92
93 status.reply.written = writeStatus.written;
94 mDataMQ->commitRead(writeStatus.written);
95
96 if (writeStatus.position.frames < 0 ||
97 writeStatus.position.timestamp.tvSec < 0 ||
98 writeStatus.position.timestamp.tvNSec < 0) {
99 LOG(WARNING) << "Invalid latency info.";
100 return status;
101 }
102
103 updatePresentationPosition(writeStatus, stream);
104 }
105
106 return status;
107 }
108
doGetPresentationPosition() const109 IStreamOut::WriteStatus WriteThread::doGetPresentationPosition() const {
110 IStreamOut::WriteStatus status;
111 status.replyTo = IStreamOut::WriteCommand::GET_PRESENTATION_POSITION;
112 status.retval = Result::OK;
113 // Write always happens on the same thread, there's no need to lock.
114 status.reply.presentationPosition = {mPresentationFrames,
115 mPresentationTimestamp};
116 return status;
117 }
118
doGetLatency() const119 IStreamOut::WriteStatus WriteThread::doGetLatency() const {
120 IStreamOut::WriteStatus status;
121 status.replyTo = IStreamOut::WriteCommand::GET_LATENCY;
122 status.retval = Result::OK;
123 // Write always happens on the same thread, there's no need to lock.
124 status.reply.latencyMs = mLatencyMs;
125 return status;
126 }
127
threadLoop()128 bool WriteThread::threadLoop() {
129 // This implementation doesn't return control back to the Thread until the
130 // parent thread decides to stop, as the Thread uses mutexes, and this can
131 // lead to priority inversion.
132 while (!mStop.load(std::memory_order_acquire)) {
133 std::shared_ptr<BusOutputStream> stream;
134 {
135 std::scoped_lock<std::mutex> lock(mStreamLock);
136 stream = mStream;
137 }
138
139 // Read command. Don't use readBlocking, because readBlocking will block
140 // when there's no data. When stopping the thread, there's a chance that we
141 // only wake the mEventFlag without writing any data to FMQ. In this case,
142 // readBlocking will block until timeout.
143 IStreamOut::WriteCommand replyTo;
144 uint32_t efState = 0;
145 mEventFlag->wait(static_cast<uint32_t>(MessageQueueFlagBits::NOT_EMPTY),
146 &efState);
147 if (!(efState & static_cast<uint32_t>(MessageQueueFlagBits::NOT_EMPTY))) {
148 continue; // Nothing to do.
149 }
150 if (!mCommandMQ->read(&replyTo)) {
151 continue; // Nothing to do.
152 }
153
154 if (replyTo == IStreamOut::WriteCommand::WRITE) {
155 mNonWriteCommandCount = 0;
156 } else {
157 mNonWriteCommandCount++;
158 }
159
160 IStreamOut::WriteStatus status;
161 switch (replyTo) {
162 case IStreamOut::WriteCommand::WRITE:
163 status = doWrite(stream.get());
164 break;
165 case IStreamOut::WriteCommand::GET_PRESENTATION_POSITION:
166 // If we don't write data for a while, the presentation position info
167 // may not be accurate. Write 0 bytes data to the client to get the
168 // latest presentation position info.
169 if (mNonWriteCommandCount >= 3 || mNonWriteCommandCount < 0) {
170 queryPresentationPosition(stream.get());
171 }
172 status = doGetPresentationPosition();
173 break;
174 case IStreamOut::WriteCommand::GET_LATENCY:
175 status = doGetLatency();
176 break;
177 default:
178 LOG(ERROR) << "Unknown write thread command code "
179 << static_cast<int>(replyTo);
180 status.retval = Result::NOT_SUPPORTED;
181 break;
182 }
183
184 if (!mStatusMQ->write(&status)) {
185 LOG(ERROR) << "Status message queue write failed";
186 }
187 mEventFlag->wake(static_cast<uint32_t>(MessageQueueFlagBits::NOT_FULL));
188 }
189
190 return false;
191 }
192
queryPresentationPosition(BusOutputStream * stream)193 void WriteThread::queryPresentationPosition(BusOutputStream* stream) {
194 AidlWriteStatus writeStatus =
195 stream->writeRingBuffer(nullptr, 0, nullptr, 0);
196 updatePresentationPosition(writeStatus, stream);
197 }
198
updatePresentationPosition(const AidlWriteStatus & writeStatus,BusOutputStream * stream)199 void WriteThread::updatePresentationPosition(const AidlWriteStatus& writeStatus,
200 BusOutputStream* stream) {
201 std::scoped_lock<std::mutex> lock(mPositionLock);
202 mPresentationFrames = mPresentationFramesOffset + writeStatus.position.frames;
203 mPresentationTimestamp = {
204 .tvSec = static_cast<uint64_t>(writeStatus.position.timestamp.tvSec),
205 .tvNSec = static_cast<uint64_t>(writeStatus.position.timestamp.tvNSec),
206 };
207
208 mTotalWrittenFrames += writeStatus.written / stream->getFrameSize();
209 }
210
211 } // namespace audio_proxy::service
212