1 /*
2  * Copyright (C) 2022 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #pragma once
18 
#include <sys/types.h>

#include <atomic>
#include <condition_variable>
#include <mutex>
#include <string>
#include <thread>
#include <utility>

#include <android-base/thread_annotations.h>
#include <system/thread_defs.h>
29 
30 namespace android::hardware::audio::common {
31 
32 class StreamLogic;
33 
34 namespace internal {
35 
36 class ThreadController {
37     enum class WorkerState { INITIAL, STOPPED, RUNNING, PAUSE_REQUESTED, PAUSED, RESUME_REQUESTED };
38 
39   public:
ThreadController(StreamLogic * logic)40     explicit ThreadController(StreamLogic* logic) : mLogic(logic) {}
~ThreadController()41     ~ThreadController() { stop(); }
42 
43     bool start(const std::string& name, int priority);
44     // Note: 'pause' and 'resume' methods should only be used on the "driving" side.
45     // In the case of audio HAL I/O, the driving side is the client, because the HAL
46     // implementation always blocks on getting a command.
pause()47     void pause() { switchWorkerStateSync(WorkerState::RUNNING, WorkerState::PAUSE_REQUESTED); }
resume()48     void resume() { switchWorkerStateSync(WorkerState::PAUSED, WorkerState::RESUME_REQUESTED); }
hasError()49     bool hasError() {
50         std::lock_guard<std::mutex> lock(mWorkerLock);
51         return !mError.empty();
52     }
getError()53     std::string getError() {
54         std::lock_guard<std::mutex> lock(mWorkerLock);
55         return mError;
56     }
getTid()57     pid_t getTid() {
58         std::lock_guard<std::mutex> lock(mWorkerLock);
59         return mTid;
60     }
61     void stop();
62     // Direct use of 'join' assumes that the StreamLogic is not intended
63     // to run forever, and is guaranteed to exit by itself. This normally
64     // only happen in tests.
65     void join();
66     bool waitForAtLeastOneCycle();
67 
68     // Only used by unit tests.
lockUnlockMutex(bool lock)69     void lockUnlockMutex(bool lock) NO_THREAD_SAFETY_ANALYSIS {
70         lock ? mWorkerLock.lock() : mWorkerLock.unlock();
71     }
getThreadNativeHandle()72     std::thread::native_handle_type getThreadNativeHandle() { return mWorker.native_handle(); }
73 
74   private:
75     void switchWorkerStateSync(WorkerState oldState, WorkerState newState,
76                                WorkerState* finalState = nullptr);
77     void workerThread();
78 
79     StreamLogic* const mLogic;
80     std::string mThreadName;
81     int mThreadPriority = ANDROID_PRIORITY_DEFAULT;
82     std::thread mWorker;
83     std::mutex mWorkerLock;
84     std::condition_variable mWorkerCv;
85     WorkerState mWorkerState GUARDED_BY(mWorkerLock) = WorkerState::INITIAL;
86     std::string mError GUARDED_BY(mWorkerLock);
87     pid_t mTid GUARDED_BY(mWorkerLock) = -1;
88     // The atomic lock-free variable is used to prevent priority inversions
89     // that can occur when a high priority worker tries to acquire the lock
90     // which has been taken by a lower priority control thread which in its turn
91     // got preempted. To prevent a PI under normal operating conditions, that is,
92     // when there are no errors or state changes, the worker does not attempt
93     // taking `mWorkerLock` unless `mWorkerStateChangeRequest` is set.
94     // To make sure that updates to `mWorkerState` and `mWorkerStateChangeRequest`
95     // are serialized, they are always made under a lock.
96     static_assert(std::atomic<bool>::is_always_lock_free);
97     std::atomic<bool> mWorkerStateChangeRequest GUARDED_BY(mWorkerLock) = false;
98 };
99 
// A special thread name used in tests only.
// Declared 'inline' so that every translation unit including this header
// refers to one shared string object instead of constructing its own copy.
inline const std::string kTestSingleThread = "__testST__";
102 
103 }  // namespace internal
104 
105 class StreamLogic {
106   public:
107     friend class internal::ThreadController;
108 
109     virtual ~StreamLogic() = default;
110 
111   protected:
112     enum class Status { ABORT, CONTINUE, EXIT };
113 
114     /* Called once at the beginning of the thread loop. Must return
115      * an empty string to enter the thread loop, otherwise the thread loop
116      * exits and the worker switches into the 'error' state, setting
117      * the error to the returned value.
118      */
119     virtual std::string init() = 0;
120 
121     /* Called for each thread loop unless the thread is in 'paused' state.
122      * Must return 'CONTINUE' to continue running, otherwise the thread loop
123      * exits. If the result from worker cycle is 'ABORT' then the worker switches
124      * into the 'error' state with a generic error message. It is recommended that
125      * the subclass reports any problems via logging facilities. Returning the 'EXIT'
126      * status is equivalent to calling 'stop()' method. This is just a way of
127      * of stopping the worker by its own initiative.
128      */
129     virtual Status cycle() = 0;
130 };
131 
132 template <class LogicImpl>
133 class StreamWorker : public LogicImpl {
134   public:
135     template <class... Args>
StreamWorker(Args &&...args)136     explicit StreamWorker(Args&&... args) : LogicImpl(std::forward<Args>(args)...), mThread(this) {}
137 
138     // Methods of LogicImpl are available via inheritance.
139     // Forwarded methods of ThreadController follow.
140 
141     // Note that 'priority' here is what is known as the 'nice number' in *nix systems.
142     // The nice number is used with the default scheduler. For threads that
143     // need to use a specialized scheduler (e.g. SCHED_FIFO) and set the priority within it,
144     // it is recommended to implement an appropriate configuration sequence within
145     // 'LogicImpl' or 'StreamLogic::init'.
146     bool start(const std::string& name = "", int priority = ANDROID_PRIORITY_DEFAULT) {
147         return mThread.start(name, priority);
148     }
pause()149     void pause() { mThread.pause(); }
resume()150     void resume() { mThread.resume(); }
hasError()151     bool hasError() { return mThread.hasError(); }
getError()152     std::string getError() { return mThread.getError(); }
getTid()153     pid_t getTid() { return mThread.getTid(); }
stop()154     void stop() { mThread.stop(); }
join()155     void join() { mThread.join(); }
waitForAtLeastOneCycle()156     bool waitForAtLeastOneCycle() { return mThread.waitForAtLeastOneCycle(); }
157 
158     // Only used by unit tests.
testLockUnlockMutex(bool lock)159     void testLockUnlockMutex(bool lock) { mThread.lockUnlockMutex(lock); }
testGetThreadNativeHandle()160     std::thread::native_handle_type testGetThreadNativeHandle() {
161         return mThread.getThreadNativeHandle();
162     }
163 
164   private:
165     // The ThreadController gets destroyed before LogicImpl.
166     // After the controller has been destroyed, it is guaranteed that
167     // the thread was joined, thus the 'cycle' method of LogicImpl
168     // will not be called anymore, and it is safe to destroy LogicImpl.
169     internal::ThreadController mThread;
170 };
171 
172 }  // namespace android::hardware::audio::common
173