1 /*
2  * Copyright (C) 2016 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #define LOG_TAG "StreamInHAL"
18 //#define LOG_NDEBUG 0
19 #define ATRACE_TAG ATRACE_TAG_AUDIO
20 
21 #include <android/log.h>
22 #include <hardware/audio.h>
23 #include <utils/Trace.h>
24 #include <memory>
25 
26 #include "StreamIn.h"
27 #include "Util.h"
28 
29 using ::android::hardware::audio::V2_0::MessageQueueFlagBits;
30 
31 namespace android {
32 namespace hardware {
33 namespace audio {
34 namespace V2_0 {
35 namespace implementation {
36 
37 using ::android::hardware::audio::common::V2_0::ThreadInfo;
38 
39 namespace {
40 
41 class ReadThread : public Thread {
42    public:
43     // ReadThread's lifespan never exceeds StreamIn's lifespan.
ReadThread(std::atomic<bool> * stop,audio_stream_in_t * stream,StreamIn::CommandMQ * commandMQ,StreamIn::DataMQ * dataMQ,StreamIn::StatusMQ * statusMQ,EventFlag * efGroup)44     ReadThread(std::atomic<bool>* stop, audio_stream_in_t* stream,
45                StreamIn::CommandMQ* commandMQ, StreamIn::DataMQ* dataMQ,
46                StreamIn::StatusMQ* statusMQ, EventFlag* efGroup)
47         : Thread(false /*canCallJava*/),
48           mStop(stop),
49           mStream(stream),
50           mCommandMQ(commandMQ),
51           mDataMQ(dataMQ),
52           mStatusMQ(statusMQ),
53           mEfGroup(efGroup),
54           mBuffer(nullptr) {}
init()55     bool init() {
56         mBuffer.reset(new (std::nothrow) uint8_t[mDataMQ->getQuantumCount()]);
57         return mBuffer != nullptr;
58     }
~ReadThread()59     virtual ~ReadThread() {}
60 
61    private:
62     std::atomic<bool>* mStop;
63     audio_stream_in_t* mStream;
64     StreamIn::CommandMQ* mCommandMQ;
65     StreamIn::DataMQ* mDataMQ;
66     StreamIn::StatusMQ* mStatusMQ;
67     EventFlag* mEfGroup;
68     std::unique_ptr<uint8_t[]> mBuffer;
69     IStreamIn::ReadParameters mParameters;
70     IStreamIn::ReadStatus mStatus;
71 
72     bool threadLoop() override;
73 
74     void doGetCapturePosition();
75     void doRead();
76 };
77 
doRead()78 void ReadThread::doRead() {
79     size_t availableToWrite = mDataMQ->availableToWrite();
80     size_t requestedToRead = mParameters.params.read;
81     if (requestedToRead > availableToWrite) {
82         ALOGW(
83             "truncating read data from %d to %d due to insufficient data queue "
84             "space",
85             (int32_t)requestedToRead, (int32_t)availableToWrite);
86         requestedToRead = availableToWrite;
87     }
88     ssize_t readResult = mStream->read(mStream, &mBuffer[0], requestedToRead);
89     mStatus.retval = Result::OK;
90     uint64_t read = 0;
91     if (readResult >= 0) {
92         mStatus.reply.read = readResult;
93         if (!mDataMQ->write(&mBuffer[0], readResult)) {
94             ALOGW("data message queue write failed");
95         }
96     } else {
97         mStatus.retval = Stream::analyzeStatus("read", readResult);
98     }
99 }
100 
doGetCapturePosition()101 void ReadThread::doGetCapturePosition() {
102     mStatus.retval = StreamIn::getCapturePositionImpl(
103         mStream, &mStatus.reply.capturePosition.frames,
104         &mStatus.reply.capturePosition.time);
105 }
106 
threadLoop()107 bool ReadThread::threadLoop() {
108     // This implementation doesn't return control back to the Thread until it
109     // decides to stop,
110     // as the Thread uses mutexes, and this can lead to priority inversion.
111     while (!std::atomic_load_explicit(mStop, std::memory_order_acquire)) {
112         uint32_t efState = 0;
113         mEfGroup->wait(static_cast<uint32_t>(MessageQueueFlagBits::NOT_FULL),
114                        &efState);
115         if (!(efState &
116               static_cast<uint32_t>(MessageQueueFlagBits::NOT_FULL))) {
117             continue;  // Nothing to do.
118         }
119         if (!mCommandMQ->read(&mParameters)) {
120             continue;  // Nothing to do.
121         }
122         mStatus.replyTo = mParameters.command;
123         switch (mParameters.command) {
124             case IStreamIn::ReadCommand::READ:
125                 doRead();
126                 break;
127             case IStreamIn::ReadCommand::GET_CAPTURE_POSITION:
128                 doGetCapturePosition();
129                 break;
130             default:
131                 ALOGE("Unknown read thread command code %d",
132                       mParameters.command);
133                 mStatus.retval = Result::NOT_SUPPORTED;
134                 break;
135         }
136         if (!mStatusMQ->write(&mStatus)) {
137             ALOGW("status message queue write failed");
138         }
139         mEfGroup->wake(static_cast<uint32_t>(MessageQueueFlagBits::NOT_EMPTY));
140     }
141 
142     return false;
143 }
144 
145 }  // namespace
146 
StreamIn(const sp<Device> & device,audio_stream_in_t * stream)147 StreamIn::StreamIn(const sp<Device>& device, audio_stream_in_t* stream)
148     : mIsClosed(false),
149       mDevice(device),
150       mStream(stream),
151       mStreamCommon(new Stream(&stream->common)),
152       mStreamMmap(new StreamMmap<audio_stream_in_t>(stream)),
153       mEfGroup(nullptr),
154       mStopReadThread(false) {}
155 
~StreamIn()156 StreamIn::~StreamIn() {
157     ATRACE_CALL();
158     close();
159     if (mReadThread.get()) {
160         ATRACE_NAME("mReadThread->join");
161         status_t status = mReadThread->join();
162         ALOGE_IF(status, "read thread exit error: %s", strerror(-status));
163     }
164     if (mEfGroup) {
165         status_t status = EventFlag::deleteEventFlag(&mEfGroup);
166         ALOGE_IF(status, "read MQ event flag deletion error: %s",
167                  strerror(-status));
168     }
169     mDevice->closeInputStream(mStream);
170     mStream = nullptr;
171 }
172 
173 // Methods from ::android::hardware::audio::V2_0::IStream follow.
getFrameSize()174 Return<uint64_t> StreamIn::getFrameSize() {
175     return audio_stream_in_frame_size(mStream);
176 }
177 
getFrameCount()178 Return<uint64_t> StreamIn::getFrameCount() {
179     return mStreamCommon->getFrameCount();
180 }
181 
getBufferSize()182 Return<uint64_t> StreamIn::getBufferSize() {
183     return mStreamCommon->getBufferSize();
184 }
185 
getSampleRate()186 Return<uint32_t> StreamIn::getSampleRate() {
187     return mStreamCommon->getSampleRate();
188 }
189 
getSupportedSampleRates(getSupportedSampleRates_cb _hidl_cb)190 Return<void> StreamIn::getSupportedSampleRates(
191     getSupportedSampleRates_cb _hidl_cb) {
192     return mStreamCommon->getSupportedSampleRates(_hidl_cb);
193 }
194 
setSampleRate(uint32_t sampleRateHz)195 Return<Result> StreamIn::setSampleRate(uint32_t sampleRateHz) {
196     return mStreamCommon->setSampleRate(sampleRateHz);
197 }
198 
getChannelMask()199 Return<AudioChannelMask> StreamIn::getChannelMask() {
200     return mStreamCommon->getChannelMask();
201 }
202 
getSupportedChannelMasks(getSupportedChannelMasks_cb _hidl_cb)203 Return<void> StreamIn::getSupportedChannelMasks(
204     getSupportedChannelMasks_cb _hidl_cb) {
205     return mStreamCommon->getSupportedChannelMasks(_hidl_cb);
206 }
207 
setChannelMask(AudioChannelMask mask)208 Return<Result> StreamIn::setChannelMask(AudioChannelMask mask) {
209     return mStreamCommon->setChannelMask(mask);
210 }
211 
getFormat()212 Return<AudioFormat> StreamIn::getFormat() {
213     return mStreamCommon->getFormat();
214 }
215 
getSupportedFormats(getSupportedFormats_cb _hidl_cb)216 Return<void> StreamIn::getSupportedFormats(getSupportedFormats_cb _hidl_cb) {
217     return mStreamCommon->getSupportedFormats(_hidl_cb);
218 }
219 
setFormat(AudioFormat format)220 Return<Result> StreamIn::setFormat(AudioFormat format) {
221     return mStreamCommon->setFormat(format);
222 }
223 
getAudioProperties(getAudioProperties_cb _hidl_cb)224 Return<void> StreamIn::getAudioProperties(getAudioProperties_cb _hidl_cb) {
225     return mStreamCommon->getAudioProperties(_hidl_cb);
226 }
227 
addEffect(uint64_t effectId)228 Return<Result> StreamIn::addEffect(uint64_t effectId) {
229     return mStreamCommon->addEffect(effectId);
230 }
231 
removeEffect(uint64_t effectId)232 Return<Result> StreamIn::removeEffect(uint64_t effectId) {
233     return mStreamCommon->removeEffect(effectId);
234 }
235 
standby()236 Return<Result> StreamIn::standby() {
237     return mStreamCommon->standby();
238 }
239 
getDevice()240 Return<AudioDevice> StreamIn::getDevice() {
241     return mStreamCommon->getDevice();
242 }
243 
setDevice(const DeviceAddress & address)244 Return<Result> StreamIn::setDevice(const DeviceAddress& address) {
245     return mStreamCommon->setDevice(address);
246 }
247 
setConnectedState(const DeviceAddress & address,bool connected)248 Return<Result> StreamIn::setConnectedState(const DeviceAddress& address,
249                                            bool connected) {
250     return mStreamCommon->setConnectedState(address, connected);
251 }
252 
setHwAvSync(uint32_t hwAvSync)253 Return<Result> StreamIn::setHwAvSync(uint32_t hwAvSync) {
254     return mStreamCommon->setHwAvSync(hwAvSync);
255 }
256 
getParameters(const hidl_vec<hidl_string> & keys,getParameters_cb _hidl_cb)257 Return<void> StreamIn::getParameters(const hidl_vec<hidl_string>& keys,
258                                      getParameters_cb _hidl_cb) {
259     return mStreamCommon->getParameters(keys, _hidl_cb);
260 }
261 
setParameters(const hidl_vec<ParameterValue> & parameters)262 Return<Result> StreamIn::setParameters(
263     const hidl_vec<ParameterValue>& parameters) {
264     return mStreamCommon->setParameters(parameters);
265 }
266 
debugDump(const hidl_handle & fd)267 Return<void> StreamIn::debugDump(const hidl_handle& fd) {
268     return mStreamCommon->debugDump(fd);
269 }
270 
start()271 Return<Result> StreamIn::start() {
272     return mStreamMmap->start();
273 }
274 
stop()275 Return<Result> StreamIn::stop() {
276     return mStreamMmap->stop();
277 }
278 
createMmapBuffer(int32_t minSizeFrames,createMmapBuffer_cb _hidl_cb)279 Return<void> StreamIn::createMmapBuffer(int32_t minSizeFrames,
280                                         createMmapBuffer_cb _hidl_cb) {
281     return mStreamMmap->createMmapBuffer(
282         minSizeFrames, audio_stream_in_frame_size(mStream), _hidl_cb);
283 }
284 
getMmapPosition(getMmapPosition_cb _hidl_cb)285 Return<void> StreamIn::getMmapPosition(getMmapPosition_cb _hidl_cb) {
286     return mStreamMmap->getMmapPosition(_hidl_cb);
287 }
288 
close()289 Return<Result> StreamIn::close() {
290     if (mIsClosed) return Result::INVALID_STATE;
291     mIsClosed = true;
292     if (mReadThread.get()) {
293         mStopReadThread.store(true, std::memory_order_release);
294     }
295     if (mEfGroup) {
296         mEfGroup->wake(static_cast<uint32_t>(MessageQueueFlagBits::NOT_FULL));
297     }
298     return Result::OK;
299 }
300 
301 // Methods from ::android::hardware::audio::V2_0::IStreamIn follow.
getAudioSource(getAudioSource_cb _hidl_cb)302 Return<void> StreamIn::getAudioSource(getAudioSource_cb _hidl_cb) {
303     int halSource;
304     Result retval =
305         mStreamCommon->getParam(AudioParameter::keyInputSource, &halSource);
306     AudioSource source(AudioSource::DEFAULT);
307     if (retval == Result::OK) {
308         source = AudioSource(halSource);
309     }
310     _hidl_cb(retval, source);
311     return Void();
312 }
313 
setGain(float gain)314 Return<Result> StreamIn::setGain(float gain) {
315     if (!isGainNormalized(gain)) {
316         ALOGW("Can not set a stream input gain (%f) outside [0,1]", gain);
317         return Result::INVALID_ARGUMENTS;
318     }
319     return Stream::analyzeStatus("set_gain", mStream->set_gain(mStream, gain));
320 }
321 
prepareForReading(uint32_t frameSize,uint32_t framesCount,prepareForReading_cb _hidl_cb)322 Return<void> StreamIn::prepareForReading(uint32_t frameSize,
323                                          uint32_t framesCount,
324                                          prepareForReading_cb _hidl_cb) {
325     status_t status;
326     ThreadInfo threadInfo = {0, 0};
327 
328     // Wrap the _hidl_cb to return an error
329     auto sendError = [this, &threadInfo, &_hidl_cb](Result result) {
330         _hidl_cb(result, CommandMQ::Descriptor(), DataMQ::Descriptor(),
331                  StatusMQ::Descriptor(), threadInfo);
332 
333     };
334 
335     // Create message queues.
336     if (mDataMQ) {
337         ALOGE("the client attempts to call prepareForReading twice");
338         sendError(Result::INVALID_STATE);
339         return Void();
340     }
341     std::unique_ptr<CommandMQ> tempCommandMQ(new CommandMQ(1));
342 
343     // Check frameSize and framesCount
344     if (frameSize == 0 || framesCount == 0) {
345         ALOGE("Null frameSize (%u) or framesCount (%u)", frameSize,
346               framesCount);
347         sendError(Result::INVALID_ARGUMENTS);
348         return Void();
349     }
350     // A message queue asserts if it can not handle the requested buffer,
351     // thus the client has to guess the maximum size it can handle
352     // Choose an arbitrary margin for the overhead of a message queue
353     size_t metadataOverhead = 100000;
354     if (frameSize >
355         (std::numeric_limits<size_t>::max() - metadataOverhead) / framesCount) {
356         ALOGE("Buffer too big: %u*%u bytes can not fit in a message queue",
357               frameSize, framesCount);
358         sendError(Result::INVALID_ARGUMENTS);
359         return Void();
360     }
361     std::unique_ptr<DataMQ> tempDataMQ(
362         new DataMQ(frameSize * framesCount, true /* EventFlag */));
363 
364     std::unique_ptr<StatusMQ> tempStatusMQ(new StatusMQ(1));
365     if (!tempCommandMQ->isValid() || !tempDataMQ->isValid() ||
366         !tempStatusMQ->isValid()) {
367         ALOGE_IF(!tempCommandMQ->isValid(), "command MQ is invalid");
368         ALOGE_IF(!tempDataMQ->isValid(), "data MQ is invalid");
369         ALOGE_IF(!tempStatusMQ->isValid(), "status MQ is invalid");
370         sendError(Result::INVALID_ARGUMENTS);
371         return Void();
372     }
373     EventFlag* tempRawEfGroup{};
374     status = EventFlag::createEventFlag(tempDataMQ->getEventFlagWord(),
375                                         &tempRawEfGroup);
376     std::unique_ptr<EventFlag, void (*)(EventFlag*)> tempElfGroup(
377         tempRawEfGroup, [](auto* ef) { EventFlag::deleteEventFlag(&ef); });
378     if (status != OK || !tempElfGroup) {
379         ALOGE("failed creating event flag for data MQ: %s", strerror(-status));
380         sendError(Result::INVALID_ARGUMENTS);
381         return Void();
382     }
383 
384     // Create and launch the thread.
385     auto tempReadThread = std::make_unique<ReadThread>(
386         &mStopReadThread, mStream, tempCommandMQ.get(), tempDataMQ.get(),
387         tempStatusMQ.get(), tempElfGroup.get());
388     if (!tempReadThread->init()) {
389         ALOGW("failed to start reader thread: %s", strerror(-status));
390         sendError(Result::INVALID_ARGUMENTS);
391         return Void();
392     }
393     status = tempReadThread->run("reader", PRIORITY_URGENT_AUDIO);
394     if (status != OK) {
395         ALOGW("failed to start reader thread: %s", strerror(-status));
396         sendError(Result::INVALID_ARGUMENTS);
397         return Void();
398     }
399 
400     mCommandMQ = std::move(tempCommandMQ);
401     mDataMQ = std::move(tempDataMQ);
402     mStatusMQ = std::move(tempStatusMQ);
403     mReadThread = tempReadThread.release();
404     mEfGroup = tempElfGroup.release();
405     threadInfo.pid = getpid();
406     threadInfo.tid = mReadThread->getTid();
407     _hidl_cb(Result::OK, *mCommandMQ->getDesc(), *mDataMQ->getDesc(),
408              *mStatusMQ->getDesc(), threadInfo);
409     return Void();
410 }
411 
getInputFramesLost()412 Return<uint32_t> StreamIn::getInputFramesLost() {
413     return mStream->get_input_frames_lost(mStream);
414 }
415 
416 // static
getCapturePositionImpl(audio_stream_in_t * stream,uint64_t * frames,uint64_t * time)417 Result StreamIn::getCapturePositionImpl(audio_stream_in_t* stream,
418                                         uint64_t* frames, uint64_t* time) {
419     // HAL may have a stub function, always returning ENOSYS, don't
420     // spam the log in this case.
421     static const std::vector<int> ignoredErrors{ENOSYS};
422     Result retval(Result::NOT_SUPPORTED);
423     if (stream->get_capture_position != NULL) return retval;
424     int64_t halFrames, halTime;
425     retval = Stream::analyzeStatus("get_capture_position",
426                                    stream->get_capture_position(stream, &halFrames, &halTime),
427                                    ignoredErrors);
428     if (retval == Result::OK) {
429         *frames = halFrames;
430         *time = halTime;
431     }
432     return retval;
433 };
434 
getCapturePosition(getCapturePosition_cb _hidl_cb)435 Return<void> StreamIn::getCapturePosition(getCapturePosition_cb _hidl_cb) {
436     uint64_t frames = 0, time = 0;
437     Result retval = getCapturePositionImpl(mStream, &frames, &time);
438     _hidl_cb(retval, frames, time);
439     return Void();
440 }
441 
442 }  // namespace implementation
443 }  // namespace V2_0
444 }  // namespace audio
445 }  // namespace hardware
446 }  // namespace android
447