1 /******************************************************************************
2  *
3  *  Copyright 2014 Google, Inc.
4  *
5  *  Licensed under the Apache License, Version 2.0 (the "License");
6  *  you may not use this file except in compliance with the License.
7  *  You may obtain a copy of the License at:
8  *
9  *  http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *  Unless required by applicable law or agreed to in writing, software
12  *  distributed under the License is distributed on an "AS IS" BASIS,
13  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *  See the License for the specific language governing permissions and
15  *  limitations under the License.
16  *
17  ******************************************************************************/
18 
19 #define LOG_TAG "bt_osi_reactor"
20 
21 #include "osi/include/reactor.h"
22 
23 #include <bluetooth/log.h>
24 #include <pthread.h>
25 #include <stdlib.h>
26 #include <string.h>
27 #include <sys/epoll.h>
28 #include <sys/eventfd.h>
29 #include <unistd.h>
30 
31 #include <mutex>
32 
33 #include "os/log.h"
34 #include "osi/include/allocator.h"
35 #include "osi/include/list.h"
36 
37 #if !defined(EFD_SEMAPHORE)
38 #define EFD_SEMAPHORE (1 << 0)
39 #endif
40 
41 using namespace bluetooth;
42 
43 struct reactor_t {
44   int epoll_fd;
45   int event_fd;
46   std::mutex* list_mutex;
47   list_t* invalidation_list;  // reactor objects that have been unregistered.
48   pthread_t run_thread;       // the pthread on which reactor_run is executing.
49   bool is_running;            // indicates whether |run_thread| is valid.
50   bool object_removed;
51 };
52 
53 struct reactor_object_t {
54   int fd;              // the file descriptor to monitor for events.
55   void* context;       // a context that's passed back to the *_ready functions.
56   reactor_t* reactor;  // the reactor instance this object is registered with.
57   std::mutex* mutex;  // protects the lifetime of this object and all variables.
58 
59   void (*read_ready)(void* context);   // function to call when the file
60                                        // descriptor becomes readable.
61   void (*write_ready)(void* context);  // function to call when the file
62                                        // descriptor becomes writeable.
63 };
64 
65 static reactor_status_t run_reactor(reactor_t* reactor, int iterations);
66 
67 static const size_t MAX_EVENTS = 64;
68 static const eventfd_t EVENT_REACTOR_STOP = 1;
69 
reactor_new(void)70 reactor_t* reactor_new(void) {
71   reactor_t* ret = (reactor_t*)osi_calloc(sizeof(reactor_t));
72 
73   ret->epoll_fd = INVALID_FD;
74   ret->event_fd = INVALID_FD;
75 
76   ret->epoll_fd = epoll_create1(EPOLL_CLOEXEC);
77   if (ret->epoll_fd == INVALID_FD) {
78     log::error("unable to create epoll instance: {}", strerror(errno));
79     goto error;
80   }
81 
82   ret->event_fd = eventfd(0, 0);
83   if (ret->event_fd == INVALID_FD) {
84     log::error("unable to create eventfd: {}", strerror(errno));
85     goto error;
86   }
87 
88   ret->list_mutex = new std::mutex;
89   ret->invalidation_list = list_new(NULL);
90   if (!ret->invalidation_list) {
91     log::error("unable to allocate object invalidation list.");
92     goto error;
93   }
94 
95   struct epoll_event event;
96   memset(&event, 0, sizeof(event));
97   event.events = EPOLLIN;
98   event.data.ptr = NULL;
99   if (epoll_ctl(ret->epoll_fd, EPOLL_CTL_ADD, ret->event_fd, &event) == -1) {
100     log::error("unable to register eventfd with epoll set: {}",
101                strerror(errno));
102     goto error;
103   }
104 
105   return ret;
106 
107 error:;
108   reactor_free(ret);
109   return NULL;
110 }
111 
reactor_free(reactor_t * reactor)112 void reactor_free(reactor_t* reactor) {
113   if (!reactor) return;
114 
115   list_free(reactor->invalidation_list);
116   close(reactor->event_fd);
117   close(reactor->epoll_fd);
118   delete reactor->list_mutex;
119   osi_free(reactor);
120 }
121 
reactor_start(reactor_t * reactor)122 reactor_status_t reactor_start(reactor_t* reactor) {
123   log::assert_that(reactor != NULL, "assert failed: reactor != NULL");
124   return run_reactor(reactor, 0);
125 }
126 
reactor_run_once(reactor_t * reactor)127 reactor_status_t reactor_run_once(reactor_t* reactor) {
128   log::assert_that(reactor != NULL, "assert failed: reactor != NULL");
129   return run_reactor(reactor, 1);
130 }
131 
reactor_stop(reactor_t * reactor)132 void reactor_stop(reactor_t* reactor) {
133   log::assert_that(reactor != NULL, "assert failed: reactor != NULL");
134 
135   eventfd_write(reactor->event_fd, EVENT_REACTOR_STOP);
136 }
137 
reactor_register(reactor_t * reactor,int fd,void * context,void (* read_ready)(void * context),void (* write_ready)(void * context))138 reactor_object_t* reactor_register(reactor_t* reactor, int fd, void* context,
139                                    void (*read_ready)(void* context),
140                                    void (*write_ready)(void* context)) {
141   log::assert_that(reactor != NULL, "assert failed: reactor != NULL");
142   log::assert_that(fd != INVALID_FD, "assert failed: fd != INVALID_FD");
143 
144   reactor_object_t* object =
145       (reactor_object_t*)osi_calloc(sizeof(reactor_object_t));
146 
147   object->reactor = reactor;
148   object->fd = fd;
149   object->context = context;
150   object->read_ready = read_ready;
151   object->write_ready = write_ready;
152   object->mutex = new std::mutex;
153 
154   struct epoll_event event;
155   memset(&event, 0, sizeof(event));
156   if (read_ready) event.events |= (EPOLLIN | EPOLLRDHUP);
157   if (write_ready) event.events |= EPOLLOUT;
158   event.data.ptr = object;
159 
160   if (epoll_ctl(reactor->epoll_fd, EPOLL_CTL_ADD, fd, &event) == -1) {
161     log::error("unable to register fd {} to epoll set: {}", fd,
162                strerror(errno));
163     delete object->mutex;
164     osi_free(object);
165     return NULL;
166   }
167 
168   return object;
169 }
170 
reactor_change_registration(reactor_object_t * object,void (* read_ready)(void * context),void (* write_ready)(void * context))171 bool reactor_change_registration(reactor_object_t* object,
172                                  void (*read_ready)(void* context),
173                                  void (*write_ready)(void* context)) {
174   log::assert_that(object != NULL, "assert failed: object != NULL");
175 
176   struct epoll_event event;
177   memset(&event, 0, sizeof(event));
178   if (read_ready) event.events |= (EPOLLIN | EPOLLRDHUP);
179   if (write_ready) event.events |= EPOLLOUT;
180   event.data.ptr = object;
181 
182   if (epoll_ctl(object->reactor->epoll_fd, EPOLL_CTL_MOD, object->fd, &event) ==
183       -1) {
184     log::error("unable to modify interest set for fd {}: {}", object->fd,
185                strerror(errno));
186     return false;
187   }
188 
189   std::lock_guard<std::mutex> lock(*object->mutex);
190   object->read_ready = read_ready;
191   object->write_ready = write_ready;
192 
193   return true;
194 }
195 
reactor_unregister(reactor_object_t * obj)196 void reactor_unregister(reactor_object_t* obj) {
197   log::assert_that(obj != NULL, "assert failed: obj != NULL");
198 
199   reactor_t* reactor = obj->reactor;
200 
201   if (epoll_ctl(reactor->epoll_fd, EPOLL_CTL_DEL, obj->fd, NULL) == -1)
202     log::error("unable to unregister fd {} from epoll set: {}", obj->fd,
203                strerror(errno));
204 
205   if (reactor->is_running &&
206       pthread_equal(pthread_self(), reactor->run_thread)) {
207     reactor->object_removed = true;
208     return;
209   }
210 
211   {
212     std::unique_lock<std::mutex> lock(*reactor->list_mutex);
213     list_append(reactor->invalidation_list, obj);
214   }
215 
216   // Taking the object lock here makes sure a callback for |obj| isn't
217   // currently executing. The reactor thread must then either be before
218   // the callbacks or after. If after, we know that the object won't be
219   // referenced because it has been taken out of the epoll set. If before,
220   // it won't be referenced because the reactor thread will check the
221   // invalidation_list and find it in there. So by taking this lock, we
222   // are waiting until the reactor thread drops all references to |obj|.
223   // One the wait completes, we can unlock and destroy |obj| safely.
224   obj->mutex->lock();
225   obj->mutex->unlock();
226   delete obj->mutex;
227   osi_free(obj);
228 }
229 
230 // Runs the reactor loop for a maximum of |iterations|.
231 // 0 |iterations| means loop forever.
232 // |reactor| may not be NULL.
run_reactor(reactor_t * reactor,int iterations)233 static reactor_status_t run_reactor(reactor_t* reactor, int iterations) {
234   log::assert_that(reactor != NULL, "assert failed: reactor != NULL");
235 
236   reactor->run_thread = pthread_self();
237   reactor->is_running = true;
238 
239   struct epoll_event events[MAX_EVENTS];
240   for (int i = 0; iterations == 0 || i < iterations; ++i) {
241     {
242       std::lock_guard<std::mutex> lock(*reactor->list_mutex);
243       list_clear(reactor->invalidation_list);
244     }
245 
246     int ret;
247     OSI_NO_INTR(ret = epoll_wait(reactor->epoll_fd, events, MAX_EVENTS, -1));
248     if (ret == -1) {
249       log::error("error in epoll_wait: {}", strerror(errno));
250       reactor->is_running = false;
251       return REACTOR_STATUS_ERROR;
252     }
253 
254     for (int j = 0; j < ret; ++j) {
255       // The event file descriptor is the only one that registers with
256       // a NULL data pointer. We use the NULL to identify it and break
257       // out of the reactor loop.
258       if (events[j].data.ptr == NULL) {
259         eventfd_t value;
260         eventfd_read(reactor->event_fd, &value);
261         reactor->is_running = false;
262         return REACTOR_STATUS_STOP;
263       }
264 
265       reactor_object_t* object = (reactor_object_t*)events[j].data.ptr;
266 
267       std::unique_lock<std::mutex> lock(*reactor->list_mutex);
268       if (list_contains(reactor->invalidation_list, object)) {
269         continue;
270       }
271 
272       // Downgrade the list lock to an object lock.
273       {
274         std::lock_guard<std::mutex> obj_lock(*object->mutex);
275         lock.unlock();
276 
277         reactor->object_removed = false;
278         if (events[j].events & (EPOLLIN | EPOLLHUP | EPOLLRDHUP | EPOLLERR) &&
279             object->read_ready)
280           object->read_ready(object->context);
281         if (!reactor->object_removed && events[j].events & EPOLLOUT &&
282             object->write_ready)
283           object->write_ready(object->context);
284       }
285 
286       if (reactor->object_removed) {
287         delete object->mutex;
288         osi_free(object);
289       }
290     }
291   }
292 
293   reactor->is_running = false;
294   return REACTOR_STATUS_DONE;
295 }
296