1 #include "uds/service_dispatcher.h"
2
3 #include <errno.h>
4 #include <log/log.h>
5 #include <sys/epoll.h>
6 #include <sys/eventfd.h>
7
8 #include "pdx/service.h"
9 #include "uds/service_endpoint.h"
10
// Capacity of the epoll_event buffers used by the dispatch functions in this
// file; also passed to epoll_wait() as the maximum number of events to return
// per call.
static constexpr int kMaxEventsPerLoop = 128;
12
13 namespace android {
14 namespace pdx {
15 namespace uds {
16
Create()17 std::unique_ptr<pdx::ServiceDispatcher> ServiceDispatcher::Create() {
18 std::unique_ptr<ServiceDispatcher> dispatcher{new ServiceDispatcher()};
19 if (!dispatcher->epoll_fd_ || !dispatcher->event_fd_) {
20 dispatcher.reset();
21 }
22
23 return std::move(dispatcher);
24 }
25
ServiceDispatcher()26 ServiceDispatcher::ServiceDispatcher() {
27 event_fd_.Reset(eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK));
28 if (!event_fd_) {
29 ALOGE("Failed to create event fd because: %s\n", strerror(errno));
30 return;
31 }
32
33 epoll_fd_.Reset(epoll_create1(EPOLL_CLOEXEC));
34 if (!epoll_fd_) {
35 ALOGE("Failed to create epoll fd because: %s\n", strerror(errno));
36 return;
37 }
38
39 // Use "this" as a unique pointer to distinguish the event fd from all
40 // the other entries that point to instances of Service.
41 epoll_event event;
42 event.events = EPOLLIN;
43 event.data.ptr = this;
44
45 if (epoll_ctl(epoll_fd_.Get(), EPOLL_CTL_ADD, event_fd_.Get(), &event) < 0) {
46 ALOGE("Failed to add event fd to epoll fd because: %s\n", strerror(errno));
47
48 // Close the fds here and signal failure to the factory method.
49 event_fd_.Close();
50 epoll_fd_.Close();
51 }
52 }
53
~ServiceDispatcher()54 ServiceDispatcher::~ServiceDispatcher() { SetCanceled(true); }
55
ThreadEnter()56 int ServiceDispatcher::ThreadEnter() {
57 std::lock_guard<std::mutex> autolock(mutex_);
58
59 if (canceled_)
60 return -EBUSY;
61
62 thread_count_++;
63 return 0;
64 }
65
ThreadExit()66 void ServiceDispatcher::ThreadExit() {
67 std::lock_guard<std::mutex> autolock(mutex_);
68 thread_count_--;
69 condition_.notify_one();
70 }
71
AddService(const std::shared_ptr<Service> & service)72 int ServiceDispatcher::AddService(const std::shared_ptr<Service>& service) {
73 if (service->endpoint()->GetIpcTag() != Endpoint::kIpcTag)
74 return -EINVAL;
75
76 std::lock_guard<std::mutex> autolock(mutex_);
77
78 auto* endpoint = static_cast<Endpoint*>(service->endpoint());
79 epoll_event event;
80 event.events = EPOLLIN;
81 event.data.ptr = service.get();
82
83 if (epoll_ctl(epoll_fd_.Get(), EPOLL_CTL_ADD, endpoint->epoll_fd(), &event) <
84 0) {
85 ALOGE("Failed to add service to dispatcher because: %s\n", strerror(errno));
86 return -errno;
87 }
88
89 services_.push_back(service);
90 return 0;
91 }
92
RemoveService(const std::shared_ptr<Service> & service)93 int ServiceDispatcher::RemoveService(const std::shared_ptr<Service>& service) {
94 if (service->endpoint()->GetIpcTag() != Endpoint::kIpcTag)
95 return -EINVAL;
96
97 std::lock_guard<std::mutex> autolock(mutex_);
98
99 // It's dangerous to remove a service while other threads may be using it.
100 if (thread_count_ > 0)
101 return -EBUSY;
102
103 epoll_event dummy; // See BUGS in man 2 epoll_ctl.
104
105 auto* endpoint = static_cast<Endpoint*>(service->endpoint());
106 if (epoll_ctl(epoll_fd_.Get(), EPOLL_CTL_DEL, endpoint->epoll_fd(), &dummy) <
107 0) {
108 ALOGE("Failed to remove service from dispatcher because: %s\n",
109 strerror(errno));
110 return -errno;
111 }
112
113 services_.remove(service);
114 return 0;
115 }
116
ReceiveAndDispatch()117 int ServiceDispatcher::ReceiveAndDispatch() { return ReceiveAndDispatch(-1); }
118
ReceiveAndDispatch(int timeout)119 int ServiceDispatcher::ReceiveAndDispatch(int timeout) {
120 int ret = ThreadEnter();
121 if (ret < 0)
122 return ret;
123
124 epoll_event events[kMaxEventsPerLoop];
125
126 int count = epoll_wait(epoll_fd_.Get(), events, kMaxEventsPerLoop, timeout);
127 if (count <= 0) {
128 ALOGE_IF(count < 0, "Failed to wait for epoll events because: %s\n",
129 strerror(errno));
130 ThreadExit();
131 return count < 0 ? -errno : -ETIMEDOUT;
132 }
133
134 for (int i = 0; i < count; i++) {
135 if (events[i].data.ptr == this) {
136 ThreadExit();
137 return -EBUSY;
138 } else {
139 Service* service = static_cast<Service*>(events[i].data.ptr);
140
141 ALOGI_IF(TRACE, "Dispatching message: fd=%d\n",
142 static_cast<Endpoint*>(service->endpoint())->epoll_fd());
143 service->ReceiveAndDispatch();
144 }
145 }
146
147 ThreadExit();
148 return 0;
149 }
150
EnterDispatchLoop()151 int ServiceDispatcher::EnterDispatchLoop() {
152 int ret = ThreadEnter();
153 if (ret < 0)
154 return ret;
155
156 epoll_event events[kMaxEventsPerLoop];
157
158 while (!IsCanceled()) {
159 int count = epoll_wait(epoll_fd_.Get(), events, kMaxEventsPerLoop, -1);
160 if (count < 0 && errno != EINTR) {
161 ALOGE("Failed to wait for epoll events because: %s\n", strerror(errno));
162 ThreadExit();
163 return -errno;
164 }
165
166 for (int i = 0; i < count; i++) {
167 if (events[i].data.ptr == this) {
168 ThreadExit();
169 return -EBUSY;
170 } else {
171 Service* service = static_cast<Service*>(events[i].data.ptr);
172
173 ALOGI_IF(TRACE, "Dispatching message: fd=%d\n",
174 static_cast<Endpoint*>(service->endpoint())->epoll_fd());
175 service->ReceiveAndDispatch();
176 }
177 }
178 }
179
180 ThreadExit();
181 return 0;
182 }
183
SetCanceled(bool cancel)184 void ServiceDispatcher::SetCanceled(bool cancel) {
185 std::unique_lock<std::mutex> lock(mutex_);
186 canceled_ = cancel;
187
188 if (canceled_ && thread_count_ > 0) {
189 eventfd_write(event_fd_.Get(), 1); // Signal threads to quit.
190
191 condition_.wait(lock, [this] { return !(canceled_ && thread_count_ > 0); });
192
193 eventfd_t value;
194 eventfd_read(event_fd_.Get(), &value); // Unsignal.
195 }
196 }
197
IsCanceled() const198 bool ServiceDispatcher::IsCanceled() const { return canceled_; }
199
200 } // namespace uds
201 } // namespace pdx
202 } // namespace android
203