1 //
2 // Copyright (C) 2020 The Android Open Source Project
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 //      http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 
16 #include "host/commands/modem_simulator/thread_looper.h"
17 
18 #include <android-base/logging.h>
19 
20 namespace cuttlefish {
21 
ThreadLooper()22 ThreadLooper::ThreadLooper()
23   :   stopped_(false), next_serial_(1) {
24   looper_thread_ = std::thread([this]() { ThreadLoop(); });
25 }
26 
~ThreadLooper()27 ThreadLooper::~ThreadLooper() { Stop(); }
28 
operator <=(const Event & other) const29 bool ThreadLooper::Event::operator<=(const Event &other) const {
30   return when <= other.when;
31 }
32 
Post(Callback cb)33 ThreadLooper::Serial ThreadLooper::Post(Callback cb) {
34   CHECK(cb != nullptr);
35 
36   auto serial = next_serial_++;
37   // If it's the time to process event with delay exactly when posting
38   // a event without delay. Looper would process the event without delay firstly
39   // if when set to be std::nullptr. so set when_ to be now.
40   Insert({ std::chrono::steady_clock::now(), cb, serial });
41 
42   return serial;
43 }
44 
Post(Callback cb,std::chrono::steady_clock::duration delay)45 ThreadLooper::Serial ThreadLooper::Post(
46     Callback cb, std::chrono::steady_clock::duration delay) {
47   CHECK(cb != nullptr);
48 
49   auto serial = next_serial_++;
50   Insert({ std::chrono::steady_clock::now() + delay, cb, serial });
51 
52   return serial;
53 }
54 
CancelSerial(Serial serial)55 bool ThreadLooper::CancelSerial(Serial serial) {
56   std::lock_guard<std::mutex> autolock(lock_);
57 
58   bool found = false;
59   for (auto iter = queue_.begin(); iter != queue_.end(); ++iter) {
60     if (iter->serial == serial) {
61       queue_.erase(iter);
62       cond_.notify_all();
63 
64       found = true;
65       break;
66     }
67   }
68 
69   return found;
70 }
71 
Insert(const Event & event)72 void ThreadLooper::Insert(const Event &event) {
73   std::lock_guard<std::mutex> autolock(lock_);
74 
75   auto iter = queue_.begin();
76   while (iter != queue_.end() && *iter <= event) {
77     ++iter;
78   }
79 
80   queue_.insert(iter, event);
81   cond_.notify_all();
82 }
83 
ThreadLoop()84 void ThreadLooper::ThreadLoop() {
85   for(;;) {
86     Callback cb;
87     {
88       std::unique_lock<std::mutex> lock(lock_);
89 
90       if (stopped_) {
91         break;
92       }
93 
94       if (queue_.empty()) {
95         cond_.wait(lock);
96         continue;
97       }
98 
99       auto time_to_wait = queue_.front().when - std::chrono::steady_clock::now();
100       if (time_to_wait.count() > 0) {
101         // wait with timeout
102         auto durationMs =
103             std::chrono::duration_cast<std::chrono::milliseconds>(time_to_wait);
104         cond_.wait_for(lock, durationMs);
105         continue;
106       }
107       cb = queue_.front().cb; // callback at front of queue
108       queue_.pop_front();
109     }
110     cb();
111   }
112 }
113 
Stop()114 void ThreadLooper::Stop() {
115   if (stopped_) {
116     return;
117   }
118   CHECK(looper_thread_.get_id() != std::this_thread::get_id())
119       << "Destructor called from looper thread";
120   {
121     std::lock_guard<std::mutex> autolock(lock_);
122     stopped_ = true;
123   }
124   cond_.notify_all();
125   if (looper_thread_.joinable()) {
126     looper_thread_.join();
127   }
128 }
129 
130 }  // namespace cuttlefish
131