1 // Copyright 2015 The Weave Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "examples/provider/event_task_runner.h"
6
7 #include <signal.h>
8
9 namespace weave {
10 namespace examples {
11
12 namespace {
13 event_base* g_event_base = nullptr;
14 }
15
PostDelayedTask(const tracked_objects::Location & from_here,const base::Closure & task,base::TimeDelta delay)16 void EventTaskRunner::PostDelayedTask(
17 const tracked_objects::Location& from_here,
18 const base::Closure& task,
19 base::TimeDelta delay) {
20 base::Time new_time = base::Time::Now() + delay;
21 if (queue_.empty() || new_time < queue_.top().first.first) {
22 ReScheduleEvent(delay);
23 }
24 queue_.emplace(std::make_pair(new_time, ++counter_), task);
25 }
26
AddIoCompletionTask(int fd,int16_t what,const EventTaskRunner::IoCompletionCallback & task)27 void EventTaskRunner::AddIoCompletionTask(
28 int fd,
29 int16_t what,
30 const EventTaskRunner::IoCompletionCallback& task) {
31 int16_t flags = EV_PERSIST | EV_ET;
32 flags |= (what & kReadable) ? EV_READ : 0;
33 flags |= (what & kWriteable) ? EV_WRITE : 0;
34 #if LIBEVENT_VERSION_NUMBER >= 0x02010400
35 flags |= (what & kClosed) ? EV_CLOSED : 0;
36 #endif
37 event* ioevent = event_new(base_.get(), fd, flags, FdEventHandler, this);
38 EventPtr<event> ioeventPtr{ioevent};
39 fd_task_map_.insert(
40 std::make_pair(fd, std::make_pair(std::move(ioeventPtr), task)));
41 event_add(ioevent, nullptr);
42 }
43
RemoveIoCompletionTask(int fd)44 void EventTaskRunner::RemoveIoCompletionTask(int fd) {
45 fd_task_map_.erase(fd);
46 }
47
Run()48 void EventTaskRunner::Run() {
49 g_event_base = base_.get();
50
51 struct sigaction sa = {};
52 sa.sa_handler = [](int signal) {
53 event_base_loopexit(g_event_base, nullptr);
54 };
55 sigfillset(&sa.sa_mask);
56 sigaction(SIGINT, &sa, nullptr);
57
58 do {
59 event_base_loop(g_event_base, EVLOOP_ONCE);
60 } while (!event_base_got_exit(g_event_base));
61 g_event_base = nullptr;
62 }
63
ReScheduleEvent(base::TimeDelta delay)64 void EventTaskRunner::ReScheduleEvent(base::TimeDelta delay) {
65 timespec ts = delay.ToTimeSpec();
66 timeval tv = {ts.tv_sec, ts.tv_nsec / 1000};
67 event_add(task_event_.get(), &tv);
68 }
69
EventHandler(int,int16_t,void * runner)70 void EventTaskRunner::EventHandler(int /* fd */,
71 int16_t /* what */,
72 void* runner) {
73 static_cast<EventTaskRunner*>(runner)->Process();
74 }
75
FreeEvent(event * evnt)76 void EventTaskRunner::FreeEvent(event* evnt) {
77 event_del(evnt);
78 event_free(evnt);
79 }
80
Process()81 void EventTaskRunner::Process() {
82 while (!queue_.empty() && queue_.top().first.first <= base::Time::Now()) {
83 auto cb = queue_.top().second;
84 queue_.pop();
85 cb.Run();
86 }
87 if (!queue_.empty()) {
88 base::TimeDelta delta = std::max(
89 base::TimeDelta(), queue_.top().first.first - base::Time::Now());
90 ReScheduleEvent(delta);
91 }
92 }
93
FdEventHandler(int fd,int16_t what,void * runner)94 void EventTaskRunner::FdEventHandler(int fd, int16_t what, void* runner) {
95 static_cast<EventTaskRunner*>(runner)->ProcessFd(fd, what);
96 }
97
ProcessFd(int fd,int16_t what)98 void EventTaskRunner::ProcessFd(int fd, int16_t what) {
99 auto it = fd_task_map_.find(fd);
100 if (it != fd_task_map_.end()) {
101 const IoCompletionCallback& callback = it->second.second;
102 callback.Run(fd, what, this);
103 }
104 }
105
106 } // namespace examples
107 } // namespace weave
108