1 //
2 // Copyright (C) 2020 The Android Open Source Project
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 // http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15
16 #include "host/commands/modem_simulator/thread_looper.h"
17
18 #include <android-base/logging.h>
19
20 namespace cuttlefish {
21
ThreadLooper()22 ThreadLooper::ThreadLooper()
23 : stopped_(false), next_serial_(1) {
24 looper_thread_ = std::thread([this]() { ThreadLoop(); });
25 }
26
~ThreadLooper()27 ThreadLooper::~ThreadLooper() { Stop(); }
28
operator <=(const Event & other) const29 bool ThreadLooper::Event::operator<=(const Event &other) const {
30 return when <= other.when;
31 }
32
Post(Callback cb)33 ThreadLooper::Serial ThreadLooper::Post(Callback cb) {
34 CHECK(cb != nullptr);
35
36 auto serial = next_serial_++;
37 // If it's the time to process event with delay exactly when posting
38 // a event without delay. Looper would process the event without delay firstly
39 // if when set to be std::nullptr. so set when_ to be now.
40 Insert({ std::chrono::steady_clock::now(), cb, serial });
41
42 return serial;
43 }
44
Post(Callback cb,std::chrono::steady_clock::duration delay)45 ThreadLooper::Serial ThreadLooper::Post(
46 Callback cb, std::chrono::steady_clock::duration delay) {
47 CHECK(cb != nullptr);
48
49 auto serial = next_serial_++;
50 Insert({ std::chrono::steady_clock::now() + delay, cb, serial });
51
52 return serial;
53 }
54
CancelSerial(Serial serial)55 bool ThreadLooper::CancelSerial(Serial serial) {
56 std::lock_guard<std::mutex> autolock(lock_);
57
58 bool found = false;
59 for (auto iter = queue_.begin(); iter != queue_.end(); ++iter) {
60 if (iter->serial == serial) {
61 queue_.erase(iter);
62 cond_.notify_all();
63
64 found = true;
65 break;
66 }
67 }
68
69 return found;
70 }
71
Insert(const Event & event)72 void ThreadLooper::Insert(const Event &event) {
73 std::lock_guard<std::mutex> autolock(lock_);
74
75 auto iter = queue_.begin();
76 while (iter != queue_.end() && *iter <= event) {
77 ++iter;
78 }
79
80 queue_.insert(iter, event);
81 cond_.notify_all();
82 }
83
ThreadLoop()84 void ThreadLooper::ThreadLoop() {
85 for(;;) {
86 Callback cb;
87 {
88 std::unique_lock<std::mutex> lock(lock_);
89
90 if (stopped_) {
91 break;
92 }
93
94 if (queue_.empty()) {
95 cond_.wait(lock);
96 continue;
97 }
98
99 auto time_to_wait = queue_.front().when - std::chrono::steady_clock::now();
100 if (time_to_wait.count() > 0) {
101 // wait with timeout
102 auto durationMs =
103 std::chrono::duration_cast<std::chrono::milliseconds>(time_to_wait);
104 cond_.wait_for(lock, durationMs);
105 continue;
106 }
107 cb = queue_.front().cb; // callback at front of queue
108 queue_.pop_front();
109 }
110 cb();
111 }
112 }
113
Stop()114 void ThreadLooper::Stop() {
115 if (stopped_) {
116 return;
117 }
118 CHECK(looper_thread_.get_id() != std::this_thread::get_id())
119 << "Destructor called from looper thread";
120 {
121 std::lock_guard<std::mutex> autolock(lock_);
122 stopped_ = true;
123 }
124 cond_.notify_all();
125 if (looper_thread_.joinable()) {
126 looper_thread_.join();
127 }
128 }
129
130 } // namespace cuttlefish
131