1 /*
2  * Copyright (C) 2021 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #pragma once
18 
19 #include <sched.h>
20 
21 #include <condition_variable>
22 #include <mutex>
23 #include <thread>
24 
// Runs a worker loop on a dedicated thread and lets a client start, pause,
// resume and stop it synchronously. The loop body is supplied by 'Impl' via
// CRTP: 'Impl' derives from 'StreamWorker<Impl>' and provides 'workerInit'
// and 'workerCycle' (see the comment in the public section below).
//
// NOTE(review): the control methods look designed for a single controlling
// thread: state changes are signalled with 'notify_one', and 'stop' does not
// wake a client blocked in 'pause' / 'resume'. Confirm before calling them
// concurrently from several threads.
//
// NOTE(review): the destructor of this base class calls 'stop', but it runs
// only after the 'Impl' part of the object has been destroyed. An 'Impl'
// whose 'workerCycle' touches its own members should presumably call 'stop'
// from its own destructor.
template <typename Impl>
class StreamWorker {
    // STOPPED:          no worker loop is running (initial state, and the request to exit).
    // RUNNING:          'workerCycle' is being called repeatedly.
    // PAUSE_REQUESTED:  set by the client; the worker acknowledges with PAUSED.
    // PAUSED:           the worker thread yields instead of calling 'workerCycle'.
    // RESUME_REQUESTED: set by the client; the worker acknowledges with RUNNING.
    // ERROR:            'workerInit' or 'workerCycle' returned 'false', the thread has exited
    //                   (or is about to).
    enum class WorkerState { STOPPED, RUNNING, PAUSE_REQUESTED, PAUSED, RESUME_REQUESTED, ERROR };

  public:
    StreamWorker() = default;
    // Stops and joins the worker thread, if there is one.
    ~StreamWorker() { stop(); }

    // Spawns the worker thread and blocks until 'workerInit' has completed.
    // Returns 'true' if the worker is in the 'running' state afterwards.
    //
    // Returns 'false' without spawning anything when a worker thread is
    // already active. A worker that has previously ended in the 'error' state
    // is joined first, so 'start' can be used to retry after a failure.
    bool start() {
        if (mWorker.joinable()) {
            {
                std::lock_guard<std::mutex> lock(mWorkerLock);
                // Assigning a new thread to a joinable 'std::thread' terminates the
                // process, thus a live worker must never be replaced.
                if (mWorkerState != WorkerState::ERROR) return false;
            }
            // In the 'error' state the thread has left (or is leaving) its loop,
            // so this join does not block for long.
            mWorker.join();
        }
        {
            // Drop a stale 'error' state left over from the previous run, otherwise
            // the wait below would be satisfied before the new thread reports the
            // outcome of 'workerInit'.
            std::lock_guard<std::mutex> lock(mWorkerLock);
            mWorkerState = WorkerState::STOPPED;
        }
        mWorker = std::thread(&StreamWorker::workerThread, this);
        std::unique_lock<std::mutex> lock(mWorkerLock);
        mWorkerCv.wait(lock, [&] { return mWorkerState != WorkerState::STOPPED; });
        return mWorkerState == WorkerState::RUNNING;
    }

    // Requests a pause and blocks until the worker has acknowledged it (or has
    // entered the 'error' state). Does nothing unless the worker is running.
    void pause() { switchWorkerStateSync(WorkerState::RUNNING, WorkerState::PAUSE_REQUESTED); }

    // Requests a resume and blocks until the worker has executed one cycle (or
    // has entered the 'error' state). Does nothing unless the worker is paused.
    void resume() { switchWorkerStateSync(WorkerState::PAUSED, WorkerState::RESUME_REQUESTED); }

    // Returns 'true' if the worker is in the 'error' state.
    bool hasError() {
        std::lock_guard<std::mutex> lock(mWorkerLock);
        return mWorkerState == WorkerState::ERROR;
    }

    // Asks the worker loop to exit and joins the thread. Does nothing if the
    // worker is already stopped. If the cycle that was in progress fails, the
    // worker still records the 'error' state, so 'hasError' may return 'true'
    // after 'stop' has returned.
    void stop() {
        {
            std::lock_guard<std::mutex> lock(mWorkerLock);
            if (mWorkerState == WorkerState::STOPPED) return;
            mWorkerState = WorkerState::STOPPED;
        }
        // The lock must be released here: the worker needs it to observe the request.
        if (mWorker.joinable()) {
            mWorker.join();
        }
    }

    // Pauses and then resumes a running worker, which guarantees that
    // 'workerCycle' has been executed at least once. Returns 'true' if both
    // transitions succeeded, 'false' if the worker was not running or has
    // entered the 'error' state on the way.
    bool waitForAtLeastOneCycle() {
        WorkerState newState;  // Always assigned by 'switchWorkerStateSync' below.
        switchWorkerStateSync(WorkerState::RUNNING, WorkerState::PAUSE_REQUESTED, &newState);
        if (newState != WorkerState::PAUSED) return false;
        switchWorkerStateSync(newState, WorkerState::RESUME_REQUESTED, &newState);
        return newState == WorkerState::RUNNING;
    }

    // Methods that need to be provided by subclasses:
    //
    // Called once at the beginning of the thread loop. Must return
    // 'true' to enter the thread loop, otherwise the thread loop
    // exits and the worker switches into the 'error' state.
    // bool workerInit();
    //
    // Called for each thread loop unless the thread is in 'paused' state.
    // Must return 'true' to continue running, otherwise the thread loop
    // exits and the worker switches into the 'error' state.
    // bool workerCycle();

  private:
    // If the worker is in 'oldState', switches it to 'newState' and blocks
    // until the worker thread has moved on to some other state. If the worker
    // is in any other state, nothing is changed. In both cases the state
    // observed last is stored into '*finalState' when it is provided.
    void switchWorkerStateSync(WorkerState oldState, WorkerState newState,
                               WorkerState* finalState = nullptr) {
        std::unique_lock<std::mutex> lock(mWorkerLock);
        if (mWorkerState != oldState) {
            if (finalState) *finalState = mWorkerState;
            return;
        }
        mWorkerState = newState;
        mWorkerCv.wait(lock, [&] { return mWorkerState != newState; });
        if (finalState) *finalState = mWorkerState;
    }

    // Body of the worker thread: runs 'workerInit' once, reports the outcome
    // to 'start', then calls 'workerCycle' until a stop is requested or a
    // cycle fails.
    void workerThread() {
        const bool initOk = static_cast<Impl*>(this)->workerInit();
        {
            std::lock_guard<std::mutex> lock(mWorkerLock);
            mWorkerState = initOk ? WorkerState::RUNNING : WorkerState::ERROR;
        }
        mWorkerCv.notify_one();
        if (!initOk) return;

        // 'state' is the thread's private view of 'mWorkerState'; it is refreshed
        // under the lock after every iteration.
        for (WorkerState state = WorkerState::RUNNING; state != WorkerState::STOPPED;) {
            bool needToNotify = false;
            bool cycleOk = true;
            if (state != WorkerState::PAUSED) {
                cycleOk = static_cast<Impl*>(this)->workerCycle();
            } else {
                // Paused: busy wait, giving up the CPU between the checks.
                sched_yield();
            }
            if (cycleOk) {
                //
                // Pause and resume are synchronous. One worker cycle must complete
                // before the worker indicates a state change. This is how 'mWorkerState' and
                // 'state' interact:
                //
                // mWorkerState == RUNNING
                // client sets mWorkerState := PAUSE_REQUESTED
                // last workerCycle gets executed, state := mWorkerState := PAUSED by us
                //   (or the worker enters the 'error' state if workerCycle fails)
                // client gets notified about state change in any case
                // thread is doing a busy wait while 'state == PAUSED'
                // client sets mWorkerState := RESUME_REQUESTED
                // state := mWorkerState (RESUME_REQUESTED)
                // mWorkerState := RUNNING, but we don't notify the client yet
                // first workerCycle gets executed, the code below triggers a client notification
                //   (or if workerCycle fails, worker enters 'error' state and also notifies)
                // state := mWorkerState (RUNNING)
                if (state == WorkerState::RESUME_REQUESTED) {
                    needToNotify = true;
                }
                std::lock_guard<std::mutex> lock(mWorkerLock);
                state = mWorkerState;
                if (mWorkerState == WorkerState::PAUSE_REQUESTED) {
                    state = mWorkerState = WorkerState::PAUSED;
                    needToNotify = true;
                } else if (mWorkerState == WorkerState::RESUME_REQUESTED) {
                    mWorkerState = WorkerState::RUNNING;
                }
            } else {
                std::lock_guard<std::mutex> lock(mWorkerLock);
                // A client is blocked only while a pause or a resume is pending.
                if (state == WorkerState::RESUME_REQUESTED ||
                    mWorkerState == WorkerState::PAUSE_REQUESTED) {
                    needToNotify = true;
                }
                mWorkerState = WorkerState::ERROR;
                state = WorkerState::STOPPED;  // Leave the loop.
            }
            // Both locks above are scoped to their branches, so the notification
            // is sent after the lock has been released.
            if (needToNotify) {
                mWorkerCv.notify_one();
            }
        }
    }

    std::thread mWorker;
    std::mutex mWorkerLock;
    std::condition_variable mWorkerCv;
    WorkerState mWorkerState = WorkerState::STOPPED;  // GUARDED_BY(mWorkerLock);
};
147