1 // Copyright 2015 The Weave Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "examples/provider/event_task_runner.h"
6 
7 #include <signal.h>
8 
9 namespace weave {
10 namespace examples {
11 
12 namespace {
13 event_base* g_event_base = nullptr;
14 }
15 
PostDelayedTask(const tracked_objects::Location & from_here,const base::Closure & task,base::TimeDelta delay)16 void EventTaskRunner::PostDelayedTask(
17     const tracked_objects::Location& from_here,
18     const base::Closure& task,
19     base::TimeDelta delay) {
20   base::Time new_time = base::Time::Now() + delay;
21   if (queue_.empty() || new_time < queue_.top().first.first) {
22     ReScheduleEvent(delay);
23   }
24   queue_.emplace(std::make_pair(new_time, ++counter_), task);
25 }
26 
AddIoCompletionTask(int fd,int16_t what,const EventTaskRunner::IoCompletionCallback & task)27 void EventTaskRunner::AddIoCompletionTask(
28     int fd,
29     int16_t what,
30     const EventTaskRunner::IoCompletionCallback& task) {
31   int16_t flags = EV_PERSIST | EV_ET;
32   flags |= (what & kReadable) ? EV_READ : 0;
33   flags |= (what & kWriteable) ? EV_WRITE : 0;
34 #if LIBEVENT_VERSION_NUMBER >= 0x02010400
35   flags |= (what & kClosed) ? EV_CLOSED : 0;
36 #endif
37   event* ioevent = event_new(base_.get(), fd, flags, FdEventHandler, this);
38   EventPtr<event> ioeventPtr{ioevent};
39   fd_task_map_.insert(
40       std::make_pair(fd, std::make_pair(std::move(ioeventPtr), task)));
41   event_add(ioevent, nullptr);
42 }
43 
RemoveIoCompletionTask(int fd)44 void EventTaskRunner::RemoveIoCompletionTask(int fd) {
45   fd_task_map_.erase(fd);
46 }
47 
Run()48 void EventTaskRunner::Run() {
49   g_event_base = base_.get();
50 
51   struct sigaction sa = {};
52   sa.sa_handler = [](int signal) {
53     event_base_loopexit(g_event_base, nullptr);
54   };
55   sigfillset(&sa.sa_mask);
56   sigaction(SIGINT, &sa, nullptr);
57 
58   do {
59     event_base_loop(g_event_base, EVLOOP_ONCE);
60   } while (!event_base_got_exit(g_event_base));
61   g_event_base = nullptr;
62 }
63 
ReScheduleEvent(base::TimeDelta delay)64 void EventTaskRunner::ReScheduleEvent(base::TimeDelta delay) {
65   timespec ts = delay.ToTimeSpec();
66   timeval tv = {ts.tv_sec, ts.tv_nsec / 1000};
67   event_add(task_event_.get(), &tv);
68 }
69 
EventHandler(int,int16_t,void * runner)70 void EventTaskRunner::EventHandler(int /* fd */,
71                                    int16_t /* what */,
72                                    void* runner) {
73   static_cast<EventTaskRunner*>(runner)->Process();
74 }
75 
FreeEvent(event * evnt)76 void EventTaskRunner::FreeEvent(event* evnt) {
77   event_del(evnt);
78   event_free(evnt);
79 }
80 
Process()81 void EventTaskRunner::Process() {
82   while (!queue_.empty() && queue_.top().first.first <= base::Time::Now()) {
83     auto cb = queue_.top().second;
84     queue_.pop();
85     cb.Run();
86   }
87   if (!queue_.empty()) {
88     base::TimeDelta delta = std::max(
89         base::TimeDelta(), queue_.top().first.first - base::Time::Now());
90     ReScheduleEvent(delta);
91   }
92 }
93 
FdEventHandler(int fd,int16_t what,void * runner)94 void EventTaskRunner::FdEventHandler(int fd, int16_t what, void* runner) {
95   static_cast<EventTaskRunner*>(runner)->ProcessFd(fd, what);
96 }
97 
ProcessFd(int fd,int16_t what)98 void EventTaskRunner::ProcessFd(int fd, int16_t what) {
99   auto it = fd_task_map_.find(fd);
100   if (it != fd_task_map_.end()) {
101     const IoCompletionCallback& callback = it->second.second;
102     callback.Run(fd, what, this);
103   }
104 }
105 
106 }  // namespace examples
107 }  // namespace weave
108