1 /******************************************************************************
2  *
3  *  Copyright (C) 2014 Google, Inc.
4  *
5  *  Licensed under the Apache License, Version 2.0 (the "License");
6  *  you may not use this file except in compliance with the License.
7  *  You may obtain a copy of the License at:
8  *
9  *  http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *  Unless required by applicable law or agreed to in writing, software
12  *  distributed under the License is distributed on an "AS IS" BASIS,
13  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *  See the License for the specific language governing permissions and
15  *  limitations under the License.
16  *
17  ******************************************************************************/
18 
19 #define LOG_TAG "bt_osi_reactor"
20 
21 #include "osi/include/reactor.h"
22 
23 #include <base/logging.h>
24 #include <errno.h>
25 #include <pthread.h>
26 #include <stdlib.h>
27 #include <string.h>
28 #include <sys/epoll.h>
29 #include <sys/eventfd.h>
30 #include <unistd.h>
31 
32 #include <mutex>
33 
34 #include "osi/include/allocator.h"
35 #include "osi/include/list.h"
36 #include "osi/include/log.h"
37 
38 #if !defined(EFD_SEMAPHORE)
39 #define EFD_SEMAPHORE (1 << 0)
40 #endif
41 
// State for one reactor instance: an epoll set, the eventfd used to stop the
// loop, and the bookkeeping needed to unregister objects safely.
struct reactor_t {
  int epoll_fd;  // epoll instance that the run loop waits on.
  int event_fd;  // eventfd registered in |epoll_fd| with a NULL data pointer;
                 // reactor_stop writes to it to break out of the run loop.
  std::mutex* list_mutex;     // guards |invalidation_list|.
  list_t* invalidation_list;  // reactor objects that have been unregistered.
  pthread_t run_thread;       // the pthread on which the reactor loop is
                              // executing.
  bool is_running;            // indicates whether |run_thread| is valid.
  bool object_removed;  // set by reactor_unregister when it runs on
                        // |run_thread| while the loop is running; the loop
                        // then frees the object it has just dispatched.
};
51 
// One registration of a file descriptor with a reactor, together with the
// callbacks to invoke when that descriptor becomes ready.
struct reactor_object_t {
  int fd;              // the file descriptor to monitor for events.
  void* context;       // a context that's passed back to the *_ready functions.
  reactor_t* reactor;  // the reactor instance this object is registered with.
  std::mutex* mutex;  // protects the lifetime of this object and all variables;
                      // the run loop holds it while the callbacks execute.

  void (*read_ready)(void* context);   // function to call when the file
                                       // descriptor becomes readable.
  void (*write_ready)(void* context);  // function to call when the file
                                       // descriptor becomes writeable.
};
63 
// Shared loop implementation behind reactor_start and reactor_run_once;
// defined at the bottom of this file.
static reactor_status_t run_reactor(reactor_t* reactor, int iterations);
65 
// Size of the event buffer handed to epoll_wait on each pass; also used as the
// size hint passed to epoll_create.
static constexpr size_t MAX_EVENTS = 64;
// Value written to the reactor's eventfd to request that the loop stop.
static constexpr eventfd_t EVENT_REACTOR_STOP = 1;
68 
reactor_new(void)69 reactor_t* reactor_new(void) {
70   reactor_t* ret = (reactor_t*)osi_calloc(sizeof(reactor_t));
71 
72   ret->epoll_fd = INVALID_FD;
73   ret->event_fd = INVALID_FD;
74 
75   ret->epoll_fd = epoll_create(MAX_EVENTS);
76   if (ret->epoll_fd == INVALID_FD) {
77     LOG_ERROR(LOG_TAG, "%s unable to create epoll instance: %s", __func__,
78               strerror(errno));
79     goto error;
80   }
81 
82   ret->event_fd = eventfd(0, 0);
83   if (ret->event_fd == INVALID_FD) {
84     LOG_ERROR(LOG_TAG, "%s unable to create eventfd: %s", __func__,
85               strerror(errno));
86     goto error;
87   }
88 
89   ret->list_mutex = new std::mutex;
90   ret->invalidation_list = list_new(NULL);
91   if (!ret->invalidation_list) {
92     LOG_ERROR(LOG_TAG, "%s unable to allocate object invalidation list.",
93               __func__);
94     goto error;
95   }
96 
97   struct epoll_event event;
98   memset(&event, 0, sizeof(event));
99   event.events = EPOLLIN;
100   event.data.ptr = NULL;
101   if (epoll_ctl(ret->epoll_fd, EPOLL_CTL_ADD, ret->event_fd, &event) == -1) {
102     LOG_ERROR(LOG_TAG, "%s unable to register eventfd with epoll set: %s",
103               __func__, strerror(errno));
104     goto error;
105   }
106 
107   return ret;
108 
109 error:;
110   reactor_free(ret);
111   return NULL;
112 }
113 
reactor_free(reactor_t * reactor)114 void reactor_free(reactor_t* reactor) {
115   if (!reactor) return;
116 
117   list_free(reactor->invalidation_list);
118   close(reactor->event_fd);
119   close(reactor->epoll_fd);
120   osi_free(reactor);
121 }
122 
reactor_start(reactor_t * reactor)123 reactor_status_t reactor_start(reactor_t* reactor) {
124   CHECK(reactor != NULL);
125   return run_reactor(reactor, 0);
126 }
127 
reactor_run_once(reactor_t * reactor)128 reactor_status_t reactor_run_once(reactor_t* reactor) {
129   CHECK(reactor != NULL);
130   return run_reactor(reactor, 1);
131 }
132 
reactor_stop(reactor_t * reactor)133 void reactor_stop(reactor_t* reactor) {
134   CHECK(reactor != NULL);
135 
136   eventfd_write(reactor->event_fd, EVENT_REACTOR_STOP);
137 }
138 
reactor_register(reactor_t * reactor,int fd,void * context,void (* read_ready)(void * context),void (* write_ready)(void * context))139 reactor_object_t* reactor_register(reactor_t* reactor, int fd, void* context,
140                                    void (*read_ready)(void* context),
141                                    void (*write_ready)(void* context)) {
142   CHECK(reactor != NULL);
143   CHECK(fd != INVALID_FD);
144 
145   reactor_object_t* object =
146       (reactor_object_t*)osi_calloc(sizeof(reactor_object_t));
147 
148   object->reactor = reactor;
149   object->fd = fd;
150   object->context = context;
151   object->read_ready = read_ready;
152   object->write_ready = write_ready;
153   object->mutex = new std::mutex;
154 
155   struct epoll_event event;
156   memset(&event, 0, sizeof(event));
157   if (read_ready) event.events |= (EPOLLIN | EPOLLRDHUP);
158   if (write_ready) event.events |= EPOLLOUT;
159   event.data.ptr = object;
160 
161   if (epoll_ctl(reactor->epoll_fd, EPOLL_CTL_ADD, fd, &event) == -1) {
162     LOG_ERROR(LOG_TAG, "%s unable to register fd %d to epoll set: %s", __func__,
163               fd, strerror(errno));
164     delete object->mutex;
165     osi_free(object);
166     return NULL;
167   }
168 
169   return object;
170 }
171 
reactor_change_registration(reactor_object_t * object,void (* read_ready)(void * context),void (* write_ready)(void * context))172 bool reactor_change_registration(reactor_object_t* object,
173                                  void (*read_ready)(void* context),
174                                  void (*write_ready)(void* context)) {
175   CHECK(object != NULL);
176 
177   struct epoll_event event;
178   memset(&event, 0, sizeof(event));
179   if (read_ready) event.events |= (EPOLLIN | EPOLLRDHUP);
180   if (write_ready) event.events |= EPOLLOUT;
181   event.data.ptr = object;
182 
183   if (epoll_ctl(object->reactor->epoll_fd, EPOLL_CTL_MOD, object->fd, &event) ==
184       -1) {
185     LOG_ERROR(LOG_TAG, "%s unable to modify interest set for fd %d: %s",
186               __func__, object->fd, strerror(errno));
187     return false;
188   }
189 
190   std::lock_guard<std::mutex> lock(*object->mutex);
191   object->read_ready = read_ready;
192   object->write_ready = write_ready;
193 
194   return true;
195 }
196 
reactor_unregister(reactor_object_t * obj)197 void reactor_unregister(reactor_object_t* obj) {
198   CHECK(obj != NULL);
199 
200   reactor_t* reactor = obj->reactor;
201 
202   if (epoll_ctl(reactor->epoll_fd, EPOLL_CTL_DEL, obj->fd, NULL) == -1)
203     LOG_ERROR(LOG_TAG, "%s unable to unregister fd %d from epoll set: %s",
204               __func__, obj->fd, strerror(errno));
205 
206   if (reactor->is_running &&
207       pthread_equal(pthread_self(), reactor->run_thread)) {
208     reactor->object_removed = true;
209     return;
210   }
211 
212   {
213     std::unique_lock<std::mutex> lock(*reactor->list_mutex);
214     list_append(reactor->invalidation_list, obj);
215   }
216 
217   // Taking the object lock here makes sure a callback for |obj| isn't
218   // currently executing. The reactor thread must then either be before
219   // the callbacks or after. If after, we know that the object won't be
220   // referenced because it has been taken out of the epoll set. If before,
221   // it won't be referenced because the reactor thread will check the
222   // invalidation_list and find it in there. So by taking this lock, we
223   // are waiting until the reactor thread drops all references to |obj|.
224   // One the wait completes, we can unlock and destroy |obj| safely.
225   obj->mutex->lock();
226   obj->mutex->unlock();
227   delete obj->mutex;
228   osi_free(obj);
229 }
230 
231 // Runs the reactor loop for a maximum of |iterations|.
232 // 0 |iterations| means loop forever.
233 // |reactor| may not be NULL.
run_reactor(reactor_t * reactor,int iterations)234 static reactor_status_t run_reactor(reactor_t* reactor, int iterations) {
235   CHECK(reactor != NULL);
236 
237   reactor->run_thread = pthread_self();
238   reactor->is_running = true;
239 
240   struct epoll_event events[MAX_EVENTS];
241   for (int i = 0; iterations == 0 || i < iterations; ++i) {
242     {
243       std::lock_guard<std::mutex> lock(*reactor->list_mutex);
244       list_clear(reactor->invalidation_list);
245     }
246 
247     int ret;
248     OSI_NO_INTR(ret = epoll_wait(reactor->epoll_fd, events, MAX_EVENTS, -1));
249     if (ret == -1) {
250       LOG_ERROR(LOG_TAG, "%s error in epoll_wait: %s", __func__,
251                 strerror(errno));
252       reactor->is_running = false;
253       return REACTOR_STATUS_ERROR;
254     }
255 
256     for (int j = 0; j < ret; ++j) {
257       // The event file descriptor is the only one that registers with
258       // a NULL data pointer. We use the NULL to identify it and break
259       // out of the reactor loop.
260       if (events[j].data.ptr == NULL) {
261         eventfd_t value;
262         eventfd_read(reactor->event_fd, &value);
263         reactor->is_running = false;
264         return REACTOR_STATUS_STOP;
265       }
266 
267       reactor_object_t* object = (reactor_object_t*)events[j].data.ptr;
268 
269       std::unique_lock<std::mutex> lock(*reactor->list_mutex);
270       if (list_contains(reactor->invalidation_list, object)) {
271         continue;
272       }
273 
274       // Downgrade the list lock to an object lock.
275       {
276         std::lock_guard<std::mutex> obj_lock(*object->mutex);
277         lock.unlock();
278 
279         reactor->object_removed = false;
280         if (events[j].events & (EPOLLIN | EPOLLHUP | EPOLLRDHUP | EPOLLERR) &&
281             object->read_ready)
282           object->read_ready(object->context);
283         if (!reactor->object_removed && events[j].events & EPOLLOUT &&
284             object->write_ready)
285           object->write_ready(object->context);
286       }
287 
288       if (reactor->object_removed) {
289         delete object->mutex;
290         osi_free(object);
291       }
292     }
293   }
294 
295   reactor->is_running = false;
296   return REACTOR_STATUS_DONE;
297 }
298