1 #include <pdx/service_dispatcher.h>
2 
3 #include <errno.h>
4 #include <log/log.h>
5 #include <sys/epoll.h>
6 #include <sys/eventfd.h>
7 
8 #include <pdx/service.h>
9 #include <pdx/service_endpoint.h>
10 
11 static const int kMaxEventsPerLoop = 128;
12 
13 namespace android {
14 namespace pdx {
15 
Create()16 std::unique_ptr<ServiceDispatcher> ServiceDispatcher::Create() {
17   std::unique_ptr<ServiceDispatcher> dispatcher{new ServiceDispatcher()};
18   if (!dispatcher->epoll_fd_ || !dispatcher->event_fd_) {
19     dispatcher.reset();
20   }
21 
22   return dispatcher;
23 }
24 
ServiceDispatcher()25 ServiceDispatcher::ServiceDispatcher() {
26   event_fd_.Reset(eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK));
27   if (!event_fd_) {
28     ALOGE("Failed to create event fd because: %s\n", strerror(errno));
29     return;
30   }
31 
32   epoll_fd_.Reset(epoll_create1(EPOLL_CLOEXEC));
33   if (!epoll_fd_) {
34     ALOGE("Failed to create epoll fd because: %s\n", strerror(errno));
35     return;
36   }
37 
38   // Use "this" as a unique pointer to distinguish the event fd from all
39   // the other entries that point to instances of Service.
40   epoll_event event;
41   event.events = EPOLLIN;
42   event.data.ptr = this;
43 
44   if (epoll_ctl(epoll_fd_.Get(), EPOLL_CTL_ADD, event_fd_.Get(), &event) < 0) {
45     ALOGE("Failed to add event fd to epoll fd because: %s\n", strerror(errno));
46 
47     // Close the fds here and signal failure to the factory method.
48     event_fd_.Close();
49     epoll_fd_.Close();
50   }
51 }
52 
~ServiceDispatcher()53 ServiceDispatcher::~ServiceDispatcher() { SetCanceled(true); }
54 
ThreadEnter()55 int ServiceDispatcher::ThreadEnter() {
56   std::lock_guard<std::mutex> autolock(mutex_);
57 
58   if (canceled_)
59     return -EBUSY;
60 
61   thread_count_++;
62   return 0;
63 }
64 
ThreadExit()65 void ServiceDispatcher::ThreadExit() {
66   std::lock_guard<std::mutex> autolock(mutex_);
67   thread_count_--;
68   condition_.notify_one();
69 }
70 
AddService(const std::shared_ptr<Service> & service)71 int ServiceDispatcher::AddService(const std::shared_ptr<Service>& service) {
72   std::lock_guard<std::mutex> autolock(mutex_);
73 
74   epoll_event event;
75   event.events = EPOLLIN;
76   event.data.ptr = service.get();
77 
78   if (epoll_ctl(epoll_fd_.Get(), EPOLL_CTL_ADD, service->endpoint()->epoll_fd(),
79                 &event) < 0) {
80     ALOGE("Failed to add service to dispatcher because: %s\n", strerror(errno));
81     return -errno;
82   }
83 
84   services_.push_back(service);
85   return 0;
86 }
87 
RemoveService(const std::shared_ptr<Service> & service)88 int ServiceDispatcher::RemoveService(const std::shared_ptr<Service>& service) {
89   std::lock_guard<std::mutex> autolock(mutex_);
90 
91   // It's dangerous to remove a service while other threads may be using it.
92   if (thread_count_ > 0)
93     return -EBUSY;
94 
95   epoll_event ee;  // See BUGS in man 2 epoll_ctl.
96   if (epoll_ctl(epoll_fd_.Get(), EPOLL_CTL_DEL, service->endpoint()->epoll_fd(),
97                 &ee) < 0) {
98     ALOGE("Failed to remove service from dispatcher because: %s\n",
99           strerror(errno));
100     return -errno;
101   }
102 
103   services_.erase(std::remove(services_.begin(), services_.end(), service),
104                   services_.end());
105   return 0;
106 }
107 
ReceiveAndDispatch()108 int ServiceDispatcher::ReceiveAndDispatch() { return ReceiveAndDispatch(-1); }
109 
ReceiveAndDispatch(int timeout)110 int ServiceDispatcher::ReceiveAndDispatch(int timeout) {
111   int ret = ThreadEnter();
112   if (ret < 0)
113     return ret;
114 
115   epoll_event events[kMaxEventsPerLoop];
116 
117   int count = epoll_wait(epoll_fd_.Get(), events, kMaxEventsPerLoop, timeout);
118   if (count <= 0) {
119     ALOGE_IF(count < 0, "Failed to wait for epoll events because: %s\n",
120              strerror(errno));
121     ThreadExit();
122     return count < 0 ? -errno : -ETIMEDOUT;
123   }
124 
125   for (int i = 0; i < count; i++) {
126     if (events[i].data.ptr == this) {
127       ThreadExit();
128       return -EBUSY;
129     } else {
130       Service* service = static_cast<Service*>(events[i].data.ptr);
131 
132       ALOGI_IF(TRACE, "Dispatching message: fd=%d\n",
133                service->endpoint()->epoll_fd());
134       service->ReceiveAndDispatch();
135     }
136   }
137 
138   ThreadExit();
139   return 0;
140 }
141 
EnterDispatchLoop()142 int ServiceDispatcher::EnterDispatchLoop() {
143   int ret = ThreadEnter();
144   if (ret < 0)
145     return ret;
146 
147   epoll_event events[kMaxEventsPerLoop];
148 
149   while (!IsCanceled()) {
150     int count = epoll_wait(epoll_fd_.Get(), events, kMaxEventsPerLoop, -1);
151     if (count < 0 && errno != EINTR) {
152       ALOGE("Failed to wait for epoll events because: %s\n", strerror(errno));
153       ThreadExit();
154       return -errno;
155     }
156 
157     for (int i = 0; i < count; i++) {
158       if (events[i].data.ptr == this) {
159         ThreadExit();
160         return -EBUSY;
161       } else {
162         Service* service = static_cast<Service*>(events[i].data.ptr);
163 
164         ALOGI_IF(TRACE, "Dispatching message: fd=%d\n",
165                  service->endpoint()->epoll_fd());
166         service->ReceiveAndDispatch();
167       }
168     }
169   }
170 
171   ThreadExit();
172   return 0;
173 }
174 
SetCanceled(bool cancel)175 void ServiceDispatcher::SetCanceled(bool cancel) {
176   std::unique_lock<std::mutex> lock(mutex_);
177   canceled_ = cancel;
178 
179   if (canceled_ && thread_count_ > 0) {
180     eventfd_write(event_fd_.Get(), 1);  // Signal threads to quit.
181 
182     condition_.wait(lock, [this] { return !(canceled_ && thread_count_ > 0); });
183 
184     eventfd_t value;
185     eventfd_read(event_fd_.Get(), &value);  // Unsignal.
186   }
187 }
188 
IsCanceled() const189 bool ServiceDispatcher::IsCanceled() const { return canceled_; }
190 
191 }  // namespace pdx
192 }  // namespace android
193