1 /*
2  * Copyright (C) 2020 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "client/openscreen/platform/task_runner.h"
18 
19 #include <chrono>
20 #include <vector>
21 
22 #include <android-base/logging.h>
23 #include <android-base/threads.h>
24 #include <platform/api/time.h>
25 
26 #include "fdevent/fdevent.h"
27 
28 using android::base::ScopedLockAssertion;
29 using namespace openscreen;
30 
31 namespace mdns {
32 
AdbOspTaskRunner()33 AdbOspTaskRunner::AdbOspTaskRunner() {
34     fdevent_check_looper();
35     thread_id_ = android::base::GetThreadId();
36     task_handler_ = std::thread([this]() { TaskExecutorWorker(); });
37 }
38 
~AdbOspTaskRunner()39 AdbOspTaskRunner::~AdbOspTaskRunner() {
40     if (task_handler_.joinable()) {
41         terminate_loop_ = true;
42         cv_.notify_one();
43         task_handler_.join();
44     }
45 }
46 
PostPackagedTask(Task task)47 void AdbOspTaskRunner::PostPackagedTask(Task task) {
48     PostPackagedTaskWithDelay(std::move(task), openscreen::Clock::duration::zero());
49 }
50 
PostPackagedTaskWithDelay(Task task,Clock::duration delay)51 void AdbOspTaskRunner::PostPackagedTaskWithDelay(Task task, Clock::duration delay) {
52     auto now = std::chrono::steady_clock::now();
53     {
54         std::lock_guard<std::mutex> lock(mutex_);
55         tasks_.emplace(now + delay, std::move(task));
56     }
57     cv_.notify_one();
58 }
59 
IsRunningOnTaskRunner()60 bool AdbOspTaskRunner::IsRunningOnTaskRunner() {
61     return (thread_id_ == android::base::GetThreadId());
62 }
63 
TaskExecutorWorker()64 void AdbOspTaskRunner::TaskExecutorWorker() {
65     for (;;) {
66         {
67             // Wait until there's a task available.
68             std::unique_lock<std::mutex> lock(mutex_);
69             ScopedLockAssertion assume_locked(mutex_);
70             while (!terminate_loop_ && tasks_.empty()) {
71                 cv_.wait(lock);
72             }
73             if (terminate_loop_) {
74                 return;
75             }
76 
77             // Wait until the task with the closest time point is ready to run.
78             auto timepoint = tasks_.begin()->first;
79             while (timepoint > std::chrono::steady_clock::now()) {
80                 cv_.wait_until(lock, timepoint);
81                 // It's possible that another task with an earlier time was added
82                 // while waiting for |timepoint|.
83                 timepoint = tasks_.begin()->first;
84 
85                 if (terminate_loop_) {
86                     return;
87                 }
88             }
89         }
90 
91         // Execute all tasks whose time points have passed.
92         std::vector<Task> running_tasks;
93         {
94             std::lock_guard<std::mutex> lock(mutex_);
95 
96             while (!tasks_.empty()) {
97                 auto task_with_delay = tasks_.begin();
98                 if (task_with_delay->first > std::chrono::steady_clock::now()) {
99                     break;
100                 } else {
101                     running_tasks.emplace_back(std::move(task_with_delay->second));
102                     tasks_.erase(task_with_delay);
103                 }
104             }
105         }
106 
107         CHECK(!running_tasks.empty());
108         std::packaged_task<int()> waitable_task([&] {
109             for (Task& task : running_tasks) {
110                 task();
111             }
112             return 0;
113         });
114 
115         fdevent_run_on_looper([&]() { waitable_task(); });
116 
117         waitable_task.get_future().wait();
118     }
119 }
120 }  // namespace mdns
121