1 // Copyright (C) 2021 The Android Open Source Project
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //      http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include "WriteThread.h"
16 
17 #include <android-base/logging.h>
18 #include <time.h>
19 
20 #include <atomic>
21 
22 #include "AidlTypes.h"
23 #include "BusOutputStream.h"
24 
25 namespace audio_proxy::service {
26 
WriteThread(std::shared_ptr<BusOutputStream> stream,CommandMQ * commandMQ,DataMQ * dataMQ,StatusMQ * statusMQ,EventFlag * eventFlag,uint32_t latencyMs)27 WriteThread::WriteThread(std::shared_ptr<BusOutputStream> stream,
28                          CommandMQ* commandMQ, DataMQ* dataMQ,
29                          StatusMQ* statusMQ, EventFlag* eventFlag,
30                          uint32_t latencyMs)
31     : Thread(false /*canCallJava*/),
32       mStream(std::move(stream)),
33       mCommandMQ(commandMQ),
34       mDataMQ(dataMQ),
35       mStatusMQ(statusMQ),
36       mEventFlag(eventFlag),
37       mLatencyMs(latencyMs) {}
38 
39 WriteThread::~WriteThread() = default;
40 
stop()41 void WriteThread::stop() {
42   if (mStop.load(std::memory_order_relaxed)) {
43     return;
44   }
45 
46   mStop.store(true, std::memory_order_release);
47   mEventFlag->wake(static_cast<uint32_t>(MessageQueueFlagBits::NOT_EMPTY));
48 }
49 
updateOutputStream(std::shared_ptr<BusOutputStream> stream)50 void WriteThread::updateOutputStream(std::shared_ptr<BusOutputStream> stream) {
51   {
52     std::scoped_lock<std::mutex> lock(mStreamLock);
53     mStream = std::move(stream);
54   }
55 
56   // Assume all the written frames are already played out by the old stream.
57   std::scoped_lock<std::mutex> lock(mPositionLock);
58   mPresentationFramesOffset = mTotalWrittenFrames;
59 }
60 
getPresentationPosition()61 std::pair<uint64_t, TimeSpec> WriteThread::getPresentationPosition() {
62   std::scoped_lock<std::mutex> lock(mPositionLock);
63   return std::make_pair(mPresentationFrames, mPresentationTimestamp);
64 }
65 
doWrite(BusOutputStream * stream)66 IStreamOut::WriteStatus WriteThread::doWrite(BusOutputStream* stream) {
67   IStreamOut::WriteStatus status;
68   status.replyTo = IStreamOut::WriteCommand::WRITE;
69   status.retval = Result::INVALID_STATE;
70   status.reply.written = 0;
71 
72   const size_t availToRead = mDataMQ->availableToRead();
73   if (stream->availableToWrite() < availToRead) {
74     LOG(WARNING) << "No space to write, wait...";
75     return status;
76   }
77 
78   DataMQ::MemTransaction tx;
79   if (mDataMQ->beginRead(availToRead, &tx)) {
80     status.retval = Result::OK;
81     AidlWriteStatus writeStatus = stream->writeRingBuffer(
82         tx.getFirstRegion().getAddress(), tx.getFirstRegion().getLength(),
83         tx.getSecondRegion().getAddress(), tx.getSecondRegion().getLength());
84     if (writeStatus.written < availToRead) {
85       LOG(WARNING) << "Failed to write all the bytes to client. Written "
86                    << writeStatus.written << ", available " << availToRead;
87     }
88 
89     if (writeStatus.written < 0) {
90       writeStatus.written = 0;
91     }
92 
93     status.reply.written = writeStatus.written;
94     mDataMQ->commitRead(writeStatus.written);
95 
96     if (writeStatus.position.frames < 0 ||
97         writeStatus.position.timestamp.tvSec < 0 ||
98         writeStatus.position.timestamp.tvNSec < 0) {
99       LOG(WARNING) << "Invalid latency info.";
100       return status;
101     }
102 
103     updatePresentationPosition(writeStatus, stream);
104   }
105 
106   return status;
107 }
108 
doGetPresentationPosition() const109 IStreamOut::WriteStatus WriteThread::doGetPresentationPosition() const {
110   IStreamOut::WriteStatus status;
111   status.replyTo = IStreamOut::WriteCommand::GET_PRESENTATION_POSITION;
112   status.retval = Result::OK;
113   // Write always happens on the same thread, there's no need to lock.
114   status.reply.presentationPosition = {mPresentationFrames,
115                                        mPresentationTimestamp};
116   return status;
117 }
118 
doGetLatency() const119 IStreamOut::WriteStatus WriteThread::doGetLatency() const {
120   IStreamOut::WriteStatus status;
121   status.replyTo = IStreamOut::WriteCommand::GET_LATENCY;
122   status.retval = Result::OK;
123   // Write always happens on the same thread, there's no need to lock.
124   status.reply.latencyMs = mLatencyMs;
125   return status;
126 }
127 
threadLoop()128 bool WriteThread::threadLoop() {
129   // This implementation doesn't return control back to the Thread until the
130   // parent thread decides to stop, as the Thread uses mutexes, and this can
131   // lead to priority inversion.
132   while (!mStop.load(std::memory_order_acquire)) {
133     std::shared_ptr<BusOutputStream> stream;
134     {
135       std::scoped_lock<std::mutex> lock(mStreamLock);
136       stream = mStream;
137     }
138 
139     // Read command. Don't use readBlocking, because readBlocking will block
140     // when there's no data. When stopping the thread, there's a chance that we
141     // only wake the mEventFlag without writing any data to FMQ. In this case,
142     // readBlocking will block until timeout.
143     IStreamOut::WriteCommand replyTo;
144     uint32_t efState = 0;
145     mEventFlag->wait(static_cast<uint32_t>(MessageQueueFlagBits::NOT_EMPTY),
146                      &efState);
147     if (!(efState & static_cast<uint32_t>(MessageQueueFlagBits::NOT_EMPTY))) {
148       continue;  // Nothing to do.
149     }
150     if (!mCommandMQ->read(&replyTo)) {
151       continue;  // Nothing to do.
152     }
153 
154     if (replyTo == IStreamOut::WriteCommand::WRITE) {
155       mNonWriteCommandCount = 0;
156     } else {
157       mNonWriteCommandCount++;
158     }
159 
160     IStreamOut::WriteStatus status;
161     switch (replyTo) {
162       case IStreamOut::WriteCommand::WRITE:
163         status = doWrite(stream.get());
164         break;
165       case IStreamOut::WriteCommand::GET_PRESENTATION_POSITION:
166         // If we don't write data for a while, the presentation position info
167         // may not be accurate. Write 0 bytes data to the client to get the
168         // latest presentation position info.
169         if (mNonWriteCommandCount >= 3 || mNonWriteCommandCount < 0) {
170           queryPresentationPosition(stream.get());
171         }
172         status = doGetPresentationPosition();
173         break;
174       case IStreamOut::WriteCommand::GET_LATENCY:
175         status = doGetLatency();
176         break;
177       default:
178         LOG(ERROR) << "Unknown write thread command code "
179                    << static_cast<int>(replyTo);
180         status.retval = Result::NOT_SUPPORTED;
181         break;
182     }
183 
184     if (!mStatusMQ->write(&status)) {
185       LOG(ERROR) << "Status message queue write failed";
186     }
187     mEventFlag->wake(static_cast<uint32_t>(MessageQueueFlagBits::NOT_FULL));
188   }
189 
190   return false;
191 }
192 
queryPresentationPosition(BusOutputStream * stream)193 void WriteThread::queryPresentationPosition(BusOutputStream* stream) {
194     AidlWriteStatus writeStatus =
195         stream->writeRingBuffer(nullptr, 0, nullptr, 0);
196     updatePresentationPosition(writeStatus, stream);
197 }
198 
updatePresentationPosition(const AidlWriteStatus & writeStatus,BusOutputStream * stream)199 void WriteThread::updatePresentationPosition(const AidlWriteStatus& writeStatus,
200                                              BusOutputStream* stream) {
201   std::scoped_lock<std::mutex> lock(mPositionLock);
202   mPresentationFrames = mPresentationFramesOffset + writeStatus.position.frames;
203   mPresentationTimestamp = {
204     .tvSec = static_cast<uint64_t>(writeStatus.position.timestamp.tvSec),
205     .tvNSec = static_cast<uint64_t>(writeStatus.position.timestamp.tvNSec),
206   };
207 
208   mTotalWrittenFrames += writeStatus.written / stream->getFrameSize();
209 }
210 
211 }  // namespace audio_proxy::service
212