1 // Copyright (C) 2017 The Android Open Source Project
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #pragma once
16 
17 #include <functional>
18 #include <future>
19 #include <optional>
20 #include <utility>
21 #include <vector>
22 
23 #include "aemu/base/Compiler.h"
24 #include "aemu/base/synchronization/ConditionVariable.h"
25 #include "aemu/base/threads/FunctorThread.h"
26 #include "aemu/base/synchronization/Lock.h"
27 
28 //
29 // WorkerThread<Item> encapsulates an asynchronous processing queue for objects
30 // of type Item. It manages queue memory, runs processing function in a separate
31 // thread and allows the processing function to stop it at any moment.
32 //
33 // Expected usage of the class:
34 //
35 // - Define an object to store all data for processing:
36 //      struct WorkItem { int number; };
37 //
38 // - Create a WorkerThread with processing function:
39 //      WorkerThread<WorkItem> worker([](WorkItem&& item) {
40 //          std::cout << item.number;
41 //          return item.number
42 //              ? WorkerProcessingResult::Continue
43 //              : WorkerProcessingResult::Stop;
44 //      });
45 //
46 // - Start the worker and send some data for asynchronous processing
47 //      worker.start();
48 //      worker.enqueue({1});
49 //      worker.enqueue({2});
50 //      worker.enqueue({});     // <--- this item will stop processing.
51 //      worker.join();
52 //
53 // WorkerThread<>'s all methods are thread-safe, with an expectation that the
54 // work could be added from any number of threads at once.
55 //
56 // Note: destructor calls join() implicitly - it's better to send some
57 // end-of-work marker before trying to destroy a worker thread.
58 //
59 
60 namespace android {
61 namespace base {
62 
63 // Return values for a worker thread's processing function.
64 enum class WorkerProcessingResult { Continue, Stop };
65 
66 template <class Item>
67 class WorkerThread {
68     DISALLOW_COPY_AND_ASSIGN(WorkerThread);
69 
70 public:
71     using Result = WorkerProcessingResult;
72     // A function that's called for each enqueued item in a separate thread.
73     using Processor = std::function<Result(Item&&)>;
74 
WorkerThread(Processor && processor)75     WorkerThread(Processor&& processor)
76         : mProcessor(std::move(processor)), mThread([this]() { worker(); }) {
77         mQueue.reserve(10);
78     }
~WorkerThread()79     ~WorkerThread() { join(); }
80 
81     // Starts the worker thread.
start()82     bool start() {
83         bool expectedStarted = false;
84         if (!mStarted.compare_exchange_strong(expectedStarted, true)) {
85             return true;
86         }
87         if (!mThread.start()) {
88             AutoLock lock(mLock);
89             setFinishedAndDrainTasks();
90             return false;
91         }
92         return true;
93     }
94     // Waits for all enqueue()'d items to finish or the worker stops.
waitQueuedItems()95     void waitQueuedItems() {
96         // Enqueue an empty sync command.
97         std::future<void> completeFuture = enqueueImpl(Command());
98         completeFuture.wait();
99     }
100     // Waits for worker thread to complete.
join()101     void join() { mThread.wait(); }
102 
103     // Moves the |item| into internal queue for processing. If the command is enqueued after the
104     // stop command is enqueued or before start() returns, the returned future will also be ready
105     // without processing the command.
enqueue(Item && item)106     std::future<void> enqueue(Item&& item) {
107         return enqueueImpl(Command(std::move(item)));
108     }
109 
110    private:
111     struct Command {
CommandCommand112         Command() : mWorkItem(std::nullopt) {}
CommandCommand113         Command(Item&& it) : mWorkItem(std::move(it)) {}
CommandCommand114         Command(Command&& other)
115             : mCompletedPromise(std::move(other.mCompletedPromise)),
116               mWorkItem(std::move(other.mWorkItem)) {}
117 
118         std::promise<void> mCompletedPromise;
119         std::optional<Item> mWorkItem;
120     };
121 
enqueueImpl(Command command)122     std::future<void> enqueueImpl(Command command) {
123         base::AutoLock lock(mLock);
124         // We don't enqueue any new items if mFinished is set to true.
125         if (!mStarted || mFinished) {
126             command.mCompletedPromise.set_value();
127             return command.mCompletedPromise.get_future();
128         }
129 
130         std::future<void> res = command.mCompletedPromise.get_future();
131         mQueue.emplace_back(std::move(command));
132         mCv.signalAndUnlock(&lock);
133         return res;
134     }
135 
worker()136     void worker() {
137         std::vector<Command> todo;
138         todo.reserve(10);
139         for (;;) {
140             {
141                 base::AutoLock lock(mLock);
142                 while (mQueue.empty()) {
143                     mCv.wait(&lock);
144                 }
145                 todo.swap(mQueue);
146             }
147 
148             bool shouldStop = false;
149             for (Command& item : todo) {
150                 if (!shouldStop && item.mWorkItem) {
151                     // Normal work item
152                     shouldStop = mProcessor(std::move(item.mWorkItem.value())) == Result::Stop;
153                 }
154                 item.mCompletedPromise.set_value();
155             }
156             if (shouldStop) {
157                 setFinishedAndDrainTasks();
158                 return;
159             }
160 
161             todo.clear();
162         }
163     }
164 
setFinishedAndDrainTasks()165     void setFinishedAndDrainTasks() {
166         base::AutoLock lock(mLock);
167         // Set mFinished so that no new tasks will be enqueued.
168         mFinished = true;
169         // Signal pending tasks as if they are completed.
170         for (Command& item : mQueue) {
171             item.mCompletedPromise.set_value();
172         }
173         return;
174     }
175 
176     Processor mProcessor;
177     base::FunctorThread mThread;
178     std::vector<Command> mQueue;
179     base::Lock mLock;
180     base::ConditionVariable mCv;
181 
182     std::atomic_bool mStarted = false;
183     // Must be accessd after grabbing the lock.
184     bool mFinished = false;
185 };
186 
187 }  // namespace base
188 }  // namespace android
189