1 /*
2 *
3 * Copyright 2017 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19 #include <grpc/support/port_platform.h>
20
21 #include "src/core/lib/iomgr/port.h"
22
23 #include <grpc/support/log.h>
24
25 /* This polling engine is only relevant on Linux kernels that support epoll,
26    i.e. epoll_create() or epoll_create1() */
27 #ifdef GRPC_LINUX_EPOLL
28 #include "src/core/lib/iomgr/ev_epoll1_linux.h"
29
30 #include <assert.h>
31 #include <errno.h>
32 #include <fcntl.h>
33 #include <limits.h>
34 #include <poll.h>
35 #include <pthread.h>
36 #include <string.h>
37 #include <sys/epoll.h>
38 #include <sys/socket.h>
39 #include <unistd.h>
40
41 #include <grpc/support/alloc.h>
42 #include <grpc/support/cpu.h>
43 #include <grpc/support/string_util.h>
44
45 #include "src/core/lib/debug/stats.h"
46 #include "src/core/lib/gpr/string.h"
47 #include "src/core/lib/gpr/tls.h"
48 #include "src/core/lib/gpr/useful.h"
49 #include "src/core/lib/gprpp/manual_constructor.h"
50 #include "src/core/lib/iomgr/block_annotate.h"
51 #include "src/core/lib/iomgr/ev_posix.h"
52 #include "src/core/lib/iomgr/iomgr_internal.h"
53 #include "src/core/lib/iomgr/lockfree_event.h"
54 #include "src/core/lib/iomgr/wakeup_fd_posix.h"
55 #include "src/core/lib/profiling/timers.h"
56
57 static grpc_wakeup_fd global_wakeup_fd;
58
59 /*******************************************************************************
60 * Singleton epoll set related fields
61 */
62
63 #define MAX_EPOLL_EVENTS 100
64 #define MAX_EPOLL_EVENTS_HANDLED_PER_ITERATION 1
65
66 /* NOTE ON SYNCHRONIZATION:
67  * - Fields in this struct are only modified by the designated poller. Hence
68  *   there is no need for any locks to protect the struct.
69  * - num_events and cursor fields have to be of atomic type to provide memory
70  *   visibility guarantees only, i.e. in case of multiple pollers, the
71  *   designated polling thread keeps changing; the thread that wrote these
72  *   values may be different from the thread reading them.
73  */
74 typedef struct epoll_set {
75 int epfd;
76
77 /* The epoll_events after the last call to epoll_wait() */
78 struct epoll_event events[MAX_EPOLL_EVENTS];
79
80 /* The number of epoll_events after the last call to epoll_wait() */
81 gpr_atm num_events;
82
83 /* Index of the first event in epoll_events that has to be processed. This
84 * field is only valid if num_events > 0 */
85 gpr_atm cursor;
86 } epoll_set;
87
88 /* The global singleton epoll set */
89 static epoll_set g_epoll_set;
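/* Note: num_events and cursor together form a consumption window over the
   events array. do_epoll_wait() refills the array and resets cursor to 0;
   process_epoll_events() then advances cursor until it catches up with
   num_events, at which point pollset_work() lets the designated poller call
   epoll_wait() again. */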
90
91 static int epoll_create_and_cloexec() {
92 #ifdef GRPC_LINUX_EPOLL_CREATE1
93 int fd = epoll_create1(EPOLL_CLOEXEC);
94 if (fd < 0) {
95 gpr_log(GPR_ERROR, "epoll_create1 unavailable");
96 }
97 #else
98 int fd = epoll_create(MAX_EPOLL_EVENTS);
99 if (fd < 0) {
100 gpr_log(GPR_ERROR, "epoll_create unavailable");
101 } else if (fcntl(fd, F_SETFD, FD_CLOEXEC) != 0) {
102 gpr_log(GPR_ERROR, "fcntl following epoll_create failed");
103 return -1;
104 }
105 #endif
106 return fd;
107 }
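/* Note: on modern kernels the size argument to epoll_create() is only a hint
   (it merely has to be positive); the fcntl(F_SETFD, FD_CLOEXEC) fallback is
   used when epoll_create1(EPOLL_CLOEXEC) is not available at compile time. */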
108
109 /* Must be called *only* once */
110 static bool epoll_set_init() {
111 g_epoll_set.epfd = epoll_create_and_cloexec();
112 if (g_epoll_set.epfd < 0) {
113 return false;
114 }
115
116 gpr_log(GPR_INFO, "grpc epoll fd: %d", g_epoll_set.epfd);
117 gpr_atm_no_barrier_store(&g_epoll_set.num_events, 0);
118 gpr_atm_no_barrier_store(&g_epoll_set.cursor, 0);
119 return true;
120 }
121
122 /* epoll_set_init() MUST be called before calling this. */
123 static void epoll_set_shutdown() {
124 if (g_epoll_set.epfd >= 0) {
125 close(g_epoll_set.epfd);
126 g_epoll_set.epfd = -1;
127 }
128 }
129
130 /*******************************************************************************
131 * Fd Declarations
132 */
133
134 /* Only used when GRPC_ENABLE_FORK_SUPPORT=1 */
135 struct grpc_fork_fd_list {
136 grpc_fd* fd;
137 grpc_fd* next;
138 grpc_fd* prev;
139 };
140
141 struct grpc_fd {
142 int fd;
143
144 grpc_core::ManualConstructor<grpc_core::LockfreeEvent> read_closure;
145 grpc_core::ManualConstructor<grpc_core::LockfreeEvent> write_closure;
146 grpc_core::ManualConstructor<grpc_core::LockfreeEvent> error_closure;
147
148 struct grpc_fd* freelist_next;
149
150 grpc_iomgr_object iomgr_object;
151
152 /* Only used when GRPC_ENABLE_FORK_SUPPORT=1 */
153 grpc_fork_fd_list* fork_fd_list;
154 };
155
156 static void fd_global_init(void);
157 static void fd_global_shutdown(void);
158
159 /*******************************************************************************
160 * Pollset Declarations
161 */
162
163 typedef enum { UNKICKED, KICKED, DESIGNATED_POLLER } kick_state;
164
165 static const char* kick_state_string(kick_state st) {
166 switch (st) {
167 case UNKICKED:
168 return "UNKICKED";
169 case KICKED:
170 return "KICKED";
171 case DESIGNATED_POLLER:
172 return "DESIGNATED_POLLER";
173 }
174 GPR_UNREACHABLE_CODE(return "UNKNOWN");
175 }
176
177 struct grpc_pollset_worker {
178 kick_state state;
179 int kick_state_mutator; // which line of code last changed kick state
180 bool initialized_cv;
181 grpc_pollset_worker* next;
182 grpc_pollset_worker* prev;
183 gpr_cv cv;
184 grpc_closure_list schedule_on_end_work;
185 };
186
187 #define SET_KICK_STATE(worker, kick_state) \
188 do { \
189 (worker)->state = (kick_state); \
190 (worker)->kick_state_mutator = __LINE__; \
191 } while (false)
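/* Example: SET_KICK_STATE(worker, KICKED) records both the new kick state and
   the __LINE__ of the call site in kick_state_mutator, which is useful when
   debugging which code path last transitioned a worker's state. */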
192
193 #define MAX_NEIGHBORHOODS 1024
194
195 typedef struct pollset_neighborhood {
196 gpr_mu mu;
197 grpc_pollset* active_root;
198 char pad[GPR_CACHELINE_SIZE];
199 } pollset_neighborhood;
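/* The trailing pad is presumably there to keep each neighborhood on its own
   cache line (GPR_CACHELINE_SIZE), so that pollsets hashed to different
   neighborhoods do not contend on the same line when locking mu. */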
200
201 struct grpc_pollset {
202 gpr_mu mu;
203 pollset_neighborhood* neighborhood;
204 bool reassigning_neighborhood;
205 grpc_pollset_worker* root_worker;
206 bool kicked_without_poller;
207
208 /* Set to true if the pollset is observed to have no workers available to
209 poll */
210 bool seen_inactive;
211 bool shutting_down; /* Is the pollset shutting down? */
212 grpc_closure* shutdown_closure; /* Called after shutdown is complete */
213
214 /* Number of workers who are *about-to* attach themselves to the pollset
215 * worker list */
216 int begin_refs;
217
218 grpc_pollset* next;
219 grpc_pollset* prev;
220 };
221
222 /*******************************************************************************
223 * Pollset-set Declarations
224 */
225
226 struct grpc_pollset_set {
227 char unused;
228 };
229
230 /*******************************************************************************
231 * Common helpers
232 */
233
234 static bool append_error(grpc_error** composite, grpc_error* error,
235 const char* desc) {
236 if (error == GRPC_ERROR_NONE) return true;
237 if (*composite == GRPC_ERROR_NONE) {
238 *composite = GRPC_ERROR_CREATE_FROM_COPIED_STRING(desc);
239 }
240 *composite = grpc_error_add_child(*composite, error);
241 return false;
242 }
243
244 /*******************************************************************************
245 * Fd Definitions
246 */
247
248 /* We need to keep a freelist not because of any concerns of malloc performance
249 * but instead so that implementations with multiple threads in (for example)
250 * epoll_wait deal with the race between pollset removal and incoming poll
251 * notifications.
252 *
253 * The problem is that the poller ultimately holds a reference to this
254 * object, so it is very difficult to know when it is safe to free it, at
255 * least
255 * without some expensive synchronization.
256 *
257 * If we keep the object freelisted, in the worst case losing this race just
258 * becomes a spurious read notification on a reused fd.
259 */
260
261 /* The alarm system needs to be able to wake up 'some poller' sometimes
262 * (specifically when a new alarm needs to be triggered earlier than the next
263 * alarm 'epoch'). This wakeup_fd gives us something to alert on when such a
264 * case occurs. */
265
266 static grpc_fd* fd_freelist = nullptr;
267 static gpr_mu fd_freelist_mu;
268
269 /* Only used when GRPC_ENABLE_FORK_SUPPORT=1 */
270 static grpc_fd* fork_fd_list_head = nullptr;
271 static gpr_mu fork_fd_list_mu;
272
273 static void fd_global_init(void) { gpr_mu_init(&fd_freelist_mu); }
274
275 static void fd_global_shutdown(void) {
276 gpr_mu_lock(&fd_freelist_mu);
277 gpr_mu_unlock(&fd_freelist_mu);
278 while (fd_freelist != nullptr) {
279 grpc_fd* fd = fd_freelist;
280 fd_freelist = fd_freelist->freelist_next;
281 gpr_free(fd);
282 }
283 gpr_mu_destroy(&fd_freelist_mu);
284 }
285
286 static void fork_fd_list_add_grpc_fd(grpc_fd* fd) {
287 if (grpc_core::Fork::Enabled()) {
288 gpr_mu_lock(&fork_fd_list_mu);
289 fd->fork_fd_list =
290 static_cast<grpc_fork_fd_list*>(gpr_malloc(sizeof(grpc_fork_fd_list)));
291 fd->fork_fd_list->next = fork_fd_list_head;
292 fd->fork_fd_list->prev = nullptr;
293 if (fork_fd_list_head != nullptr) {
294 fork_fd_list_head->fork_fd_list->prev = fd;
295 }
296 fork_fd_list_head = fd;
297 gpr_mu_unlock(&fork_fd_list_mu);
298 }
299 }
300
301 static void fork_fd_list_remove_grpc_fd(grpc_fd* fd) {
302 if (grpc_core::Fork::Enabled()) {
303 gpr_mu_lock(&fork_fd_list_mu);
304 if (fork_fd_list_head == fd) {
305 fork_fd_list_head = fd->fork_fd_list->next;
306 }
307 if (fd->fork_fd_list->prev != nullptr) {
308 fd->fork_fd_list->prev->fork_fd_list->next = fd->fork_fd_list->next;
309 }
310 if (fd->fork_fd_list->next != nullptr) {
311 fd->fork_fd_list->next->fork_fd_list->prev = fd->fork_fd_list->prev;
312 }
313 gpr_free(fd->fork_fd_list);
314 gpr_mu_unlock(&fork_fd_list_mu);
315 }
316 }
317
318 static grpc_fd* fd_create(int fd, const char* name, bool track_err) {
319 grpc_fd* new_fd = nullptr;
320
321 gpr_mu_lock(&fd_freelist_mu);
322 if (fd_freelist != nullptr) {
323 new_fd = fd_freelist;
324 fd_freelist = fd_freelist->freelist_next;
325 }
326 gpr_mu_unlock(&fd_freelist_mu);
327
328 if (new_fd == nullptr) {
329 new_fd = static_cast<grpc_fd*>(gpr_malloc(sizeof(grpc_fd)));
330 new_fd->read_closure.Init();
331 new_fd->write_closure.Init();
332 new_fd->error_closure.Init();
333 }
334 new_fd->fd = fd;
335 new_fd->read_closure->InitEvent();
336 new_fd->write_closure->InitEvent();
337 new_fd->error_closure->InitEvent();
338
339 new_fd->freelist_next = nullptr;
340
341 char* fd_name;
342 gpr_asprintf(&fd_name, "%s fd=%d", name, fd);
343 grpc_iomgr_register_object(&new_fd->iomgr_object, fd_name);
344 fork_fd_list_add_grpc_fd(new_fd);
345 #ifndef NDEBUG
346 if (grpc_trace_fd_refcount.enabled()) {
347 gpr_log(GPR_DEBUG, "FD %d %p create %s", fd, new_fd, fd_name);
348 }
349 #endif
350 gpr_free(fd_name);
351
352 struct epoll_event ev;
353 ev.events = static_cast<uint32_t>(EPOLLIN | EPOLLOUT | EPOLLET);
354 /* Use the least significant bit of ev.data.ptr to store track_err. We expect
355 * the addresses to be word aligned. We need to store track_err to avoid
356 * synchronization issues when accessing it after receiving an event.
357 * Accessing fd would be a data race there because the fd might have been
358 * returned to the free list at that point. */
359 ev.data.ptr = reinterpret_cast<void*>(reinterpret_cast<intptr_t>(new_fd) |
360 (track_err ? 1 : 0));
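/* The reader side (process_epoll_events()) undoes this encoding by masking
   off the low bit to recover the grpc_fd* and testing the low bit to recover
   track_err. */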
361 if (epoll_ctl(g_epoll_set.epfd, EPOLL_CTL_ADD, fd, &ev) != 0) {
362 gpr_log(GPR_ERROR, "epoll_ctl failed: %s", strerror(errno));
363 }
364
365 return new_fd;
366 }
367
368 static int fd_wrapped_fd(grpc_fd* fd) { return fd->fd; }
369
370 /* if 'releasing_fd' is true, it means that we are going to detach the internal
371  * fd from the grpc_fd structure (i.e. we should not be calling the shutdown()
372  * syscall on that fd) */
373 static void fd_shutdown_internal(grpc_fd* fd, grpc_error* why,
374 bool releasing_fd) {
375 if (fd->read_closure->SetShutdown(GRPC_ERROR_REF(why))) {
376 if (!releasing_fd) {
377 shutdown(fd->fd, SHUT_RDWR);
378 }
379 fd->write_closure->SetShutdown(GRPC_ERROR_REF(why));
380 fd->error_closure->SetShutdown(GRPC_ERROR_REF(why));
381 }
382 GRPC_ERROR_UNREF(why);
383 }
384
385 /* Might be called multiple times */
386 static void fd_shutdown(grpc_fd* fd, grpc_error* why) {
387 fd_shutdown_internal(fd, why, false);
388 }
389
390 static void fd_orphan(grpc_fd* fd, grpc_closure* on_done, int* release_fd,
391 const char* reason) {
392 grpc_error* error = GRPC_ERROR_NONE;
393 bool is_release_fd = (release_fd != nullptr);
394
395 if (!fd->read_closure->IsShutdown()) {
396 fd_shutdown_internal(fd, GRPC_ERROR_CREATE_FROM_COPIED_STRING(reason),
397 is_release_fd);
398 }
399
400 /* If release_fd is not NULL, we should be relinquishing control of the file
401 descriptor fd->fd (but we still own the grpc_fd structure). */
402 if (is_release_fd) {
403 *release_fd = fd->fd;
404 } else {
405 close(fd->fd);
406 }
407
408 GRPC_CLOSURE_SCHED(on_done, GRPC_ERROR_REF(error));
409
410 grpc_iomgr_unregister_object(&fd->iomgr_object);
411 fork_fd_list_remove_grpc_fd(fd);
412 fd->read_closure->DestroyEvent();
413 fd->write_closure->DestroyEvent();
414 fd->error_closure->DestroyEvent();
415
416 gpr_mu_lock(&fd_freelist_mu);
417 fd->freelist_next = fd_freelist;
418 fd_freelist = fd;
419 gpr_mu_unlock(&fd_freelist_mu);
420 }
421
422 static bool fd_is_shutdown(grpc_fd* fd) {
423 return fd->read_closure->IsShutdown();
424 }
425
426 static void fd_notify_on_read(grpc_fd* fd, grpc_closure* closure) {
427 fd->read_closure->NotifyOn(closure);
428 }
429
430 static void fd_notify_on_write(grpc_fd* fd, grpc_closure* closure) {
431 fd->write_closure->NotifyOn(closure);
432 }
433
434 static void fd_notify_on_error(grpc_fd* fd, grpc_closure* closure) {
435 fd->error_closure->NotifyOn(closure);
436 }
437
438 static void fd_become_readable(grpc_fd* fd) { fd->read_closure->SetReady(); }
439
440 static void fd_become_writable(grpc_fd* fd) { fd->write_closure->SetReady(); }
441
442 static void fd_has_errors(grpc_fd* fd) { fd->error_closure->SetReady(); }
443
444 /*******************************************************************************
445 * Pollset Definitions
446 */
447
448 GPR_TLS_DECL(g_current_thread_pollset);
449 GPR_TLS_DECL(g_current_thread_worker);
450
451 /* The designated poller */
452 static gpr_atm g_active_poller;
453
454 static pollset_neighborhood* g_neighborhoods;
455 static size_t g_num_neighborhoods;
456
457 /* Return true if first in list */
458 static bool worker_insert(grpc_pollset* pollset, grpc_pollset_worker* worker) {
459 if (pollset->root_worker == nullptr) {
460 pollset->root_worker = worker;
461 worker->next = worker->prev = worker;
462 return true;
463 } else {
464 worker->next = pollset->root_worker;
465 worker->prev = worker->next->prev;
466 worker->next->prev = worker;
467 worker->prev->next = worker;
468 return false;
469 }
470 }
471
472 /* Return true if last in list */
473 typedef enum { EMPTIED, NEW_ROOT, REMOVED } worker_remove_result;
474
475 static worker_remove_result worker_remove(grpc_pollset* pollset,
476 grpc_pollset_worker* worker) {
477 if (worker == pollset->root_worker) {
478 if (worker == worker->next) {
479 pollset->root_worker = nullptr;
480 return EMPTIED;
481 } else {
482 pollset->root_worker = worker->next;
483 worker->prev->next = worker->next;
484 worker->next->prev = worker->prev;
485 return NEW_ROOT;
486 }
487 } else {
488 worker->prev->next = worker->next;
489 worker->next->prev = worker->prev;
490 return REMOVED;
491 }
492 }
493
494 static size_t choose_neighborhood(void) {
495 return static_cast<size_t>(gpr_cpu_current_cpu()) % g_num_neighborhoods;
496 }
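/* Note: hashing the current CPU into a neighborhood spreads pollsets across
   g_num_neighborhoods neighborhood mutexes, which presumably reduces lock
   contention compared to keeping a single global list of active pollsets. */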
497
498 static grpc_error* pollset_global_init(void) {
499 gpr_tls_init(&g_current_thread_pollset);
500 gpr_tls_init(&g_current_thread_worker);
501 gpr_atm_no_barrier_store(&g_active_poller, 0);
502 global_wakeup_fd.read_fd = -1;
503 grpc_error* err = grpc_wakeup_fd_init(&global_wakeup_fd);
504 if (err != GRPC_ERROR_NONE) return err;
505 struct epoll_event ev;
506 ev.events = static_cast<uint32_t>(EPOLLIN | EPOLLET);
507 ev.data.ptr = &global_wakeup_fd;
508 if (epoll_ctl(g_epoll_set.epfd, EPOLL_CTL_ADD, global_wakeup_fd.read_fd,
509 &ev) != 0) {
510 return GRPC_OS_ERROR(errno, "epoll_ctl");
511 }
512 g_num_neighborhoods = GPR_CLAMP(gpr_cpu_num_cores(), 1, MAX_NEIGHBORHOODS);
513 g_neighborhoods = static_cast<pollset_neighborhood*>(
514 gpr_zalloc(sizeof(*g_neighborhoods) * g_num_neighborhoods));
515 for (size_t i = 0; i < g_num_neighborhoods; i++) {
516 gpr_mu_init(&g_neighborhoods[i].mu);
517 }
518 return GRPC_ERROR_NONE;
519 }
520
521 static void pollset_global_shutdown(void) {
522 gpr_tls_destroy(&g_current_thread_pollset);
523 gpr_tls_destroy(&g_current_thread_worker);
524 if (global_wakeup_fd.read_fd != -1) grpc_wakeup_fd_destroy(&global_wakeup_fd);
525 for (size_t i = 0; i < g_num_neighborhoods; i++) {
526 gpr_mu_destroy(&g_neighborhoods[i].mu);
527 }
528 gpr_free(g_neighborhoods);
529 }
530
531 static void pollset_init(grpc_pollset* pollset, gpr_mu** mu) {
532 gpr_mu_init(&pollset->mu);
533 *mu = &pollset->mu;
534 pollset->neighborhood = &g_neighborhoods[choose_neighborhood()];
535 pollset->reassigning_neighborhood = false;
536 pollset->root_worker = nullptr;
537 pollset->kicked_without_poller = false;
538 pollset->seen_inactive = true;
539 pollset->shutting_down = false;
540 pollset->shutdown_closure = nullptr;
541 pollset->begin_refs = 0;
542 pollset->next = pollset->prev = nullptr;
543 }
544
545 static void pollset_destroy(grpc_pollset* pollset) {
546 gpr_mu_lock(&pollset->mu);
547 if (!pollset->seen_inactive) {
548 pollset_neighborhood* neighborhood = pollset->neighborhood;
549 gpr_mu_unlock(&pollset->mu);
550 retry_lock_neighborhood:
551 gpr_mu_lock(&neighborhood->mu);
552 gpr_mu_lock(&pollset->mu);
553 if (!pollset->seen_inactive) {
554 if (pollset->neighborhood != neighborhood) {
555 gpr_mu_unlock(&neighborhood->mu);
556 neighborhood = pollset->neighborhood;
557 gpr_mu_unlock(&pollset->mu);
558 goto retry_lock_neighborhood;
559 }
560 pollset->prev->next = pollset->next;
561 pollset->next->prev = pollset->prev;
562 if (pollset == pollset->neighborhood->active_root) {
563 pollset->neighborhood->active_root =
564 pollset->next == pollset ? nullptr : pollset->next;
565 }
566 }
567 gpr_mu_unlock(&pollset->neighborhood->mu);
568 }
569 gpr_mu_unlock(&pollset->mu);
570 gpr_mu_destroy(&pollset->mu);
571 }
572
573 static grpc_error* pollset_kick_all(grpc_pollset* pollset) {
574 GPR_TIMER_SCOPE("pollset_kick_all", 0);
575 grpc_error* error = GRPC_ERROR_NONE;
576 if (pollset->root_worker != nullptr) {
577 grpc_pollset_worker* worker = pollset->root_worker;
578 do {
579 GRPC_STATS_INC_POLLSET_KICK();
580 switch (worker->state) {
581 case KICKED:
582 GRPC_STATS_INC_POLLSET_KICKED_AGAIN();
583 break;
584 case UNKICKED:
585 SET_KICK_STATE(worker, KICKED);
586 if (worker->initialized_cv) {
587 GRPC_STATS_INC_POLLSET_KICK_WAKEUP_CV();
588 gpr_cv_signal(&worker->cv);
589 }
590 break;
591 case DESIGNATED_POLLER:
592 GRPC_STATS_INC_POLLSET_KICK_WAKEUP_FD();
593 SET_KICK_STATE(worker, KICKED);
594 append_error(&error, grpc_wakeup_fd_wakeup(&global_wakeup_fd),
595 "pollset_kick_all");
596 break;
597 }
598
599 worker = worker->next;
600 } while (worker != pollset->root_worker);
601 }
602 // TODO: sreek. Check if we need to set 'kicked_without_poller' to true here
603 // in the else case
604 return error;
605 }
606
607 static void pollset_maybe_finish_shutdown(grpc_pollset* pollset) {
608 if (pollset->shutdown_closure != nullptr && pollset->root_worker == nullptr &&
609 pollset->begin_refs == 0) {
610 GPR_TIMER_MARK("pollset_finish_shutdown", 0);
611 GRPC_CLOSURE_SCHED(pollset->shutdown_closure, GRPC_ERROR_NONE);
612 pollset->shutdown_closure = nullptr;
613 }
614 }
615
616 static void pollset_shutdown(grpc_pollset* pollset, grpc_closure* closure) {
617 GPR_TIMER_SCOPE("pollset_shutdown", 0);
618 GPR_ASSERT(pollset->shutdown_closure == nullptr);
619 GPR_ASSERT(!pollset->shutting_down);
620 pollset->shutdown_closure = closure;
621 pollset->shutting_down = true;
622 GRPC_LOG_IF_ERROR("pollset_shutdown", pollset_kick_all(pollset));
623 pollset_maybe_finish_shutdown(pollset);
624 }
625
626 static int poll_deadline_to_millis_timeout(grpc_millis millis) {
627 if (millis == GRPC_MILLIS_INF_FUTURE) return -1;
628 grpc_millis delta = millis - grpc_core::ExecCtx::Get()->Now();
629 if (delta > INT_MAX) {
630 return INT_MAX;
631 } else if (delta < 0) {
632 return 0;
633 } else {
634 return static_cast<int>(delta);
635 }
636 }
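/* For example: a deadline roughly 250ms in the future yields 250, a deadline
   that has already passed yields 0 (epoll_wait() returns immediately), and
   GRPC_MILLIS_INF_FUTURE yields -1 (epoll_wait() blocks indefinitely). */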
637
638 /* Process the epoll events found by the do_epoll_wait() function.
639 - g_epoll_set.cursor points to the index of the first event to be processed
640 - This function then processes up to MAX_EPOLL_EVENTS_HANDLED_PER_ITERATION
641 events and updates g_epoll_set.cursor
642
643 NOTE ON SYNCHRONIZATION: Similar to do_epoll_wait(), this function is only
644 called by the g_active_poller thread. So there is no need for synchronization
645 when accessing fields in g_epoll_set */
646 static grpc_error* process_epoll_events(grpc_pollset* pollset) {
647 GPR_TIMER_SCOPE("process_epoll_events", 0);
648
649 static const char* err_desc = "process_events";
650 grpc_error* error = GRPC_ERROR_NONE;
651 long num_events = gpr_atm_acq_load(&g_epoll_set.num_events);
652 long cursor = gpr_atm_acq_load(&g_epoll_set.cursor);
653 for (int idx = 0;
654 (idx < MAX_EPOLL_EVENTS_HANDLED_PER_ITERATION) && cursor != num_events;
655 idx++) {
656 long c = cursor++;
657 struct epoll_event* ev = &g_epoll_set.events[c];
658 void* data_ptr = ev->data.ptr;
659
660 if (data_ptr == &global_wakeup_fd) {
661 append_error(&error, grpc_wakeup_fd_consume_wakeup(&global_wakeup_fd),
662 err_desc);
663 } else {
664 grpc_fd* fd = reinterpret_cast<grpc_fd*>(
665 reinterpret_cast<intptr_t>(data_ptr) & ~static_cast<intptr_t>(1));
666 bool track_err =
667 reinterpret_cast<intptr_t>(data_ptr) & static_cast<intptr_t>(1);
668 bool cancel = (ev->events & EPOLLHUP) != 0;
669 bool error = (ev->events & EPOLLERR) != 0;
670 bool read_ev = (ev->events & (EPOLLIN | EPOLLPRI)) != 0;
671 bool write_ev = (ev->events & EPOLLOUT) != 0;
672 bool err_fallback = error && !track_err;
673
674 if (error && !err_fallback) {
675 fd_has_errors(fd);
676 }
677
678 if (read_ev || cancel || err_fallback) {
679 fd_become_readable(fd);
680 }
681
682 if (write_ev || cancel || err_fallback) {
683 fd_become_writable(fd);
684 }
685 }
686 }
687 gpr_atm_rel_store(&g_epoll_set.cursor, cursor);
688 return error;
689 }
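/* Note: with MAX_EPOLL_EVENTS_HANDLED_PER_ITERATION set to 1, each call
   processes at most one pending event before returning, so a single batch
   returned by epoll_wait() can end up being drained by a succession of
   designated pollers rather than by one thread. */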
690
691 /* Do epoll_wait and store the events in the g_epoll_set.events field. This
692 does not "process" any of the events yet; that is done in
693 process_epoll_events(). See process_epoll_events() for more details.
694
695 NOTE ON SYNCHRONIZATION: At any point of time, only the g_active_poller
696 (i.e. the designated poller thread) will be calling this function. So there is
697 no need for any synchronization when accessing fields in g_epoll_set */
698 static grpc_error* do_epoll_wait(grpc_pollset* ps, grpc_millis deadline) {
699 GPR_TIMER_SCOPE("do_epoll_wait", 0);
700
701 int r;
702 int timeout = poll_deadline_to_millis_timeout(deadline);
703 if (timeout != 0) {
704 GRPC_SCHEDULING_START_BLOCKING_REGION;
705 }
706 do {
707 GRPC_STATS_INC_SYSCALL_POLL();
708 r = epoll_wait(g_epoll_set.epfd, g_epoll_set.events, MAX_EPOLL_EVENTS,
709 timeout);
710 } while (r < 0 && errno == EINTR);
711 if (timeout != 0) {
712 GRPC_SCHEDULING_END_BLOCKING_REGION;
713 }
714
715 if (r < 0) return GRPC_OS_ERROR(errno, "epoll_wait");
716
717 GRPC_STATS_INC_POLL_EVENTS_RETURNED(r);
718
719 if (grpc_polling_trace.enabled()) {
720 gpr_log(GPR_INFO, "ps: %p poll got %d events", ps, r);
721 }
722
723 gpr_atm_rel_store(&g_epoll_set.num_events, r);
724 gpr_atm_rel_store(&g_epoll_set.cursor, 0);
725
726 return GRPC_ERROR_NONE;
727 }
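/* Note: the release stores of num_events and cursor above pair with the
   acquire loads in process_epoll_events() and pollset_work(), giving the next
   designated poller (possibly a different thread) a consistent view of the
   events array. */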
728
729 static bool begin_worker(grpc_pollset* pollset, grpc_pollset_worker* worker,
730 grpc_pollset_worker** worker_hdl,
731 grpc_millis deadline) {
732 GPR_TIMER_SCOPE("begin_worker", 0);
733 if (worker_hdl != nullptr) *worker_hdl = worker;
734 worker->initialized_cv = false;
735 SET_KICK_STATE(worker, UNKICKED);
736 worker->schedule_on_end_work = (grpc_closure_list)GRPC_CLOSURE_LIST_INIT;
737 pollset->begin_refs++;
738
739 if (grpc_polling_trace.enabled()) {
740 gpr_log(GPR_INFO, "PS:%p BEGIN_STARTS:%p", pollset, worker);
741 }
742
743 if (pollset->seen_inactive) {
744 // pollset has been observed to be inactive; we need to move it back to the
745 // active list
746 bool is_reassigning = false;
747 if (!pollset->reassigning_neighborhood) {
748 is_reassigning = true;
749 pollset->reassigning_neighborhood = true;
750 pollset->neighborhood = &g_neighborhoods[choose_neighborhood()];
751 }
752 pollset_neighborhood* neighborhood = pollset->neighborhood;
753 gpr_mu_unlock(&pollset->mu);
754 // pollset unlocked: state may change (even worker->kick_state)
755 retry_lock_neighborhood:
756 gpr_mu_lock(&neighborhood->mu);
757 gpr_mu_lock(&pollset->mu);
758 if (grpc_polling_trace.enabled()) {
759 gpr_log(GPR_INFO, "PS:%p BEGIN_REORG:%p kick_state=%s is_reassigning=%d",
760 pollset, worker, kick_state_string(worker->state),
761 is_reassigning);
762 }
763 if (pollset->seen_inactive) {
764 if (neighborhood != pollset->neighborhood) {
765 gpr_mu_unlock(&neighborhood->mu);
766 neighborhood = pollset->neighborhood;
767 gpr_mu_unlock(&pollset->mu);
768 goto retry_lock_neighborhood;
769 }
770
771 /* In the brief time we released the pollset locks above, the worker MAY
772 have been kicked. In this case, the worker should get out of this
773 pollset ASAP and hence this should neither add the pollset to the
774 neighborhood nor mark the pollset as active.
775
776 On a side note, the only way a worker's kick state could have changed
777 at this point is if it were "kicked specifically". Since the worker has
778 not added itself to the pollset yet (by calling worker_insert()), it is
779 not visible in the "kick any" path yet */
780 if (worker->state == UNKICKED) {
781 pollset->seen_inactive = false;
782 if (neighborhood->active_root == nullptr) {
783 neighborhood->active_root = pollset->next = pollset->prev = pollset;
784 /* Make this the designated poller if there isn't one already */
785 if (worker->state == UNKICKED &&
786 gpr_atm_no_barrier_cas(&g_active_poller, 0, (gpr_atm)worker)) {
787 SET_KICK_STATE(worker, DESIGNATED_POLLER);
788 }
789 } else {
790 pollset->next = neighborhood->active_root;
791 pollset->prev = pollset->next->prev;
792 pollset->next->prev = pollset->prev->next = pollset;
793 }
794 }
795 }
796 if (is_reassigning) {
797 GPR_ASSERT(pollset->reassigning_neighborhood);
798 pollset->reassigning_neighborhood = false;
799 }
800 gpr_mu_unlock(&neighborhood->mu);
801 }
802
803 worker_insert(pollset, worker);
804 pollset->begin_refs--;
805 if (worker->state == UNKICKED && !pollset->kicked_without_poller) {
806 GPR_ASSERT(gpr_atm_no_barrier_load(&g_active_poller) != (gpr_atm)worker);
807 worker->initialized_cv = true;
808 gpr_cv_init(&worker->cv);
809 while (worker->state == UNKICKED && !pollset->shutting_down) {
810 if (grpc_polling_trace.enabled()) {
811 gpr_log(GPR_INFO, "PS:%p BEGIN_WAIT:%p kick_state=%s shutdown=%d",
812 pollset, worker, kick_state_string(worker->state),
813 pollset->shutting_down);
814 }
815
816 if (gpr_cv_wait(&worker->cv, &pollset->mu,
817 grpc_millis_to_timespec(deadline, GPR_CLOCK_MONOTONIC)) &&
818 worker->state == UNKICKED) {
819 /* If gpr_cv_wait returns true (i.e. a timeout), pretend that the worker
820 received a kick */
821 SET_KICK_STATE(worker, KICKED);
822 }
823 }
824 grpc_core::ExecCtx::Get()->InvalidateNow();
825 }
826
827 if (grpc_polling_trace.enabled()) {
828 gpr_log(GPR_INFO,
829 "PS:%p BEGIN_DONE:%p kick_state=%s shutdown=%d "
830 "kicked_without_poller: %d",
831 pollset, worker, kick_state_string(worker->state),
832 pollset->shutting_down, pollset->kicked_without_poller);
833 }
834
835 /* We release the pollset lock in this function at a couple of places:
836 * 1. Briefly when assigning pollset to a neighborhood
837 * 2. When doing gpr_cv_wait()
838 * It is possible that 'kicked_without_poller' was set to true during (1) and
839 * 'shutting_down' is set to true during (1) or (2). If either of them is
840 * true, this worker cannot do polling */
841 /* TODO(sreek): Perhaps there is a better way to handle kicked_without_poller
842 * case; especially when the worker is the DESIGNATED_POLLER */
843
844 if (pollset->kicked_without_poller) {
845 pollset->kicked_without_poller = false;
846 return false;
847 }
848
849 return worker->state == DESIGNATED_POLLER && !pollset->shutting_down;
850 }
851
852 static bool check_neighborhood_for_available_poller(
853 pollset_neighborhood* neighborhood) {
854 GPR_TIMER_SCOPE("check_neighborhood_for_available_poller", 0);
855 bool found_worker = false;
856 do {
857 grpc_pollset* inspect = neighborhood->active_root;
858 if (inspect == nullptr) {
859 break;
860 }
861 gpr_mu_lock(&inspect->mu);
862 GPR_ASSERT(!inspect->seen_inactive);
863 grpc_pollset_worker* inspect_worker = inspect->root_worker;
864 if (inspect_worker != nullptr) {
865 do {
866 switch (inspect_worker->state) {
867 case UNKICKED:
868 if (gpr_atm_no_barrier_cas(&g_active_poller, 0,
869 (gpr_atm)inspect_worker)) {
870 if (grpc_polling_trace.enabled()) {
871 gpr_log(GPR_INFO, " .. choose next poller to be %p",
872 inspect_worker);
873 }
874 SET_KICK_STATE(inspect_worker, DESIGNATED_POLLER);
875 if (inspect_worker->initialized_cv) {
876 GPR_TIMER_MARK("signal worker", 0);
877 GRPC_STATS_INC_POLLSET_KICK_WAKEUP_CV();
878 gpr_cv_signal(&inspect_worker->cv);
879 }
880 } else {
881 if (grpc_polling_trace.enabled()) {
882 gpr_log(GPR_INFO, " .. beaten to choose next poller");
883 }
884 }
885 // even if we didn't win the CAS, there's a worker; we can stop
886 found_worker = true;
887 break;
888 case KICKED:
889 break;
890 case DESIGNATED_POLLER:
891 found_worker = true; // ok, so someone else found the worker, but
892 // we'll accept that
893 break;
894 }
895 inspect_worker = inspect_worker->next;
896 } while (!found_worker && inspect_worker != inspect->root_worker);
897 }
898 if (!found_worker) {
899 if (grpc_polling_trace.enabled()) {
900 gpr_log(GPR_INFO, " .. mark pollset %p inactive", inspect);
901 }
902 inspect->seen_inactive = true;
903 if (inspect == neighborhood->active_root) {
904 neighborhood->active_root =
905 inspect->next == inspect ? nullptr : inspect->next;
906 }
907 inspect->next->prev = inspect->prev;
908 inspect->prev->next = inspect->next;
909 inspect->next = inspect->prev = nullptr;
910 }
911 gpr_mu_unlock(&inspect->mu);
912 } while (!found_worker);
913 return found_worker;
914 }
915
916 static void end_worker(grpc_pollset* pollset, grpc_pollset_worker* worker,
917 grpc_pollset_worker** worker_hdl) {
918 GPR_TIMER_SCOPE("end_worker", 0);
919 if (grpc_polling_trace.enabled()) {
920 gpr_log(GPR_INFO, "PS:%p END_WORKER:%p", pollset, worker);
921 }
922 if (worker_hdl != nullptr) *worker_hdl = nullptr;
923 /* Make sure we appear kicked */
924 SET_KICK_STATE(worker, KICKED);
925 grpc_closure_list_move(&worker->schedule_on_end_work,
926 grpc_core::ExecCtx::Get()->closure_list());
927 if (gpr_atm_no_barrier_load(&g_active_poller) == (gpr_atm)worker) {
928 if (worker->next != worker && worker->next->state == UNKICKED) {
929 if (grpc_polling_trace.enabled()) {
930 gpr_log(GPR_INFO, " .. choose next poller to be peer %p", worker);
931 }
932 GPR_ASSERT(worker->next->initialized_cv);
933 gpr_atm_no_barrier_store(&g_active_poller, (gpr_atm)worker->next);
934 SET_KICK_STATE(worker->next, DESIGNATED_POLLER);
935 GRPC_STATS_INC_POLLSET_KICK_WAKEUP_CV();
936 gpr_cv_signal(&worker->next->cv);
937 if (grpc_core::ExecCtx::Get()->HasWork()) {
938 gpr_mu_unlock(&pollset->mu);
939 grpc_core::ExecCtx::Get()->Flush();
940 gpr_mu_lock(&pollset->mu);
941 }
942 } else {
943 gpr_atm_no_barrier_store(&g_active_poller, 0);
944 size_t poller_neighborhood_idx =
945 static_cast<size_t>(pollset->neighborhood - g_neighborhoods);
946 gpr_mu_unlock(&pollset->mu);
947 bool found_worker = false;
948 bool scan_state[MAX_NEIGHBORHOODS];
949 for (size_t i = 0; !found_worker && i < g_num_neighborhoods; i++) {
950 pollset_neighborhood* neighborhood =
951 &g_neighborhoods[(poller_neighborhood_idx + i) %
952 g_num_neighborhoods];
953 if (gpr_mu_trylock(&neighborhood->mu)) {
954 found_worker = check_neighborhood_for_available_poller(neighborhood);
955 gpr_mu_unlock(&neighborhood->mu);
956 scan_state[i] = true;
957 } else {
958 scan_state[i] = false;
959 }
960 }
961 for (size_t i = 0; !found_worker && i < g_num_neighborhoods; i++) {
962 if (scan_state[i]) continue;
963 pollset_neighborhood* neighborhood =
964 &g_neighborhoods[(poller_neighborhood_idx + i) %
965 g_num_neighborhoods];
966 gpr_mu_lock(&neighborhood->mu);
967 found_worker = check_neighborhood_for_available_poller(neighborhood);
968 gpr_mu_unlock(&neighborhood->mu);
969 }
970 grpc_core::ExecCtx::Get()->Flush();
971 gpr_mu_lock(&pollset->mu);
972 }
973 } else if (grpc_core::ExecCtx::Get()->HasWork()) {
974 gpr_mu_unlock(&pollset->mu);
975 grpc_core::ExecCtx::Get()->Flush();
976 gpr_mu_lock(&pollset->mu);
977 }
978 if (worker->initialized_cv) {
979 gpr_cv_destroy(&worker->cv);
980 }
981 if (grpc_polling_trace.enabled()) {
982 gpr_log(GPR_INFO, " .. remove worker");
983 }
984 if (EMPTIED == worker_remove(pollset, worker)) {
985 pollset_maybe_finish_shutdown(pollset);
986 }
987 GPR_ASSERT(gpr_atm_no_barrier_load(&g_active_poller) != (gpr_atm)worker);
988 }
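/* Note on the hand-off above: a finishing designated poller first tries to
   promote its UNKICKED neighbor in the same pollset; failing that, it clears
   g_active_poller and scans the neighborhoods (trylock on the first pass,
   blocking lock on the second) for some other worker to promote, so that
   polling does not stall for long without a designated poller. */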
989
990 /* pollset->mu lock must be held by the caller before calling this.
991 The function pollset_work() may temporarily release the lock (pollset->mu)
992 during the course of its execution but it will always re-acquire the lock and
993 ensure that it is held by the time the function returns */
994 static grpc_error* pollset_work(grpc_pollset* ps,
995 grpc_pollset_worker** worker_hdl,
996 grpc_millis deadline) {
997 GPR_TIMER_SCOPE("pollset_work", 0);
998 grpc_pollset_worker worker;
999 grpc_error* error = GRPC_ERROR_NONE;
1000 static const char* err_desc = "pollset_work";
1001 if (ps->kicked_without_poller) {
1002 ps->kicked_without_poller = false;
1003 return GRPC_ERROR_NONE;
1004 }
1005
1006 if (begin_worker(ps, &worker, worker_hdl, deadline)) {
1007 gpr_tls_set(&g_current_thread_pollset, (intptr_t)ps);
1008 gpr_tls_set(&g_current_thread_worker, (intptr_t)&worker);
1009 GPR_ASSERT(!ps->shutting_down);
1010 GPR_ASSERT(!ps->seen_inactive);
1011
1012 gpr_mu_unlock(&ps->mu); /* unlock */
1013 /* This is the designated polling thread at this point and should ideally do
1014 polling. However, if there are unprocessed events left from a previous
1015 call to do_epoll_wait(), skip calling epoll_wait() in this iteration and
1016 process the pending epoll events.
1017
1018 The reason for decoupling do_epoll_wait and process_epoll_events is to
1019 better distribute the work (i.e. handling epoll events) across multiple
1020 threads.
1021
1022 process_epoll_events() returns very quickly: it just queues the work on
1023 the exec_ctx but does not execute it (the actual execution, or more
1024 accurately grpc_core::ExecCtx::Get()->Flush(), happens in end_worker()
1025 AFTER selecting a designated poller). So we are not waiting long periods
1026 without a designated poller */
1027 if (gpr_atm_acq_load(&g_epoll_set.cursor) ==
1028 gpr_atm_acq_load(&g_epoll_set.num_events)) {
1029 append_error(&error, do_epoll_wait(ps, deadline), err_desc);
1030 }
1031 append_error(&error, process_epoll_events(ps), err_desc);
1032
1033 gpr_mu_lock(&ps->mu); /* lock */
1034
1035 gpr_tls_set(&g_current_thread_worker, 0);
1036 } else {
1037 gpr_tls_set(&g_current_thread_pollset, (intptr_t)ps);
1038 }
1039 end_worker(ps, &worker, worker_hdl);
1040
1041 gpr_tls_set(&g_current_thread_pollset, 0);
1042 return error;
1043 }
1044
1045 static grpc_error* pollset_kick(grpc_pollset* pollset,
1046 grpc_pollset_worker* specific_worker) {
1047 GPR_TIMER_SCOPE("pollset_kick", 0);
1048 GRPC_STATS_INC_POLLSET_KICK();
1049 grpc_error* ret_err = GRPC_ERROR_NONE;
1050 if (grpc_polling_trace.enabled()) {
1051 gpr_strvec log;
1052 gpr_strvec_init(&log);
1053 char* tmp;
1054 gpr_asprintf(&tmp, "PS:%p KICK:%p curps=%p curworker=%p root=%p", pollset,
1055 specific_worker, (void*)gpr_tls_get(&g_current_thread_pollset),
1056 (void*)gpr_tls_get(&g_current_thread_worker),
1057 pollset->root_worker);
1058 gpr_strvec_add(&log, tmp);
1059 if (pollset->root_worker != nullptr) {
1060 gpr_asprintf(&tmp, " {kick_state=%s next=%p {kick_state=%s}}",
1061 kick_state_string(pollset->root_worker->state),
1062 pollset->root_worker->next,
1063 kick_state_string(pollset->root_worker->next->state));
1064 gpr_strvec_add(&log, tmp);
1065 }
1066 if (specific_worker != nullptr) {
1067 gpr_asprintf(&tmp, " worker_kick_state=%s",
1068 kick_state_string(specific_worker->state));
1069 gpr_strvec_add(&log, tmp);
1070 }
1071 tmp = gpr_strvec_flatten(&log, nullptr);
1072 gpr_strvec_destroy(&log);
1073 gpr_log(GPR_DEBUG, "%s", tmp);
1074 gpr_free(tmp);
1075 }
1076
1077 if (specific_worker == nullptr) {
1078 if (gpr_tls_get(&g_current_thread_pollset) != (intptr_t)pollset) {
1079 grpc_pollset_worker* root_worker = pollset->root_worker;
1080 if (root_worker == nullptr) {
1081 GRPC_STATS_INC_POLLSET_KICKED_WITHOUT_POLLER();
1082 pollset->kicked_without_poller = true;
1083 if (grpc_polling_trace.enabled()) {
1084 gpr_log(GPR_INFO, " .. kicked_without_poller");
1085 }
1086 goto done;
1087 }
1088 grpc_pollset_worker* next_worker = root_worker->next;
1089 if (root_worker->state == KICKED) {
1090 GRPC_STATS_INC_POLLSET_KICKED_AGAIN();
1091 if (grpc_polling_trace.enabled()) {
1092 gpr_log(GPR_INFO, " .. already kicked %p", root_worker);
1093 }
1094 SET_KICK_STATE(root_worker, KICKED);
1095 goto done;
1096 } else if (next_worker->state == KICKED) {
1097 GRPC_STATS_INC_POLLSET_KICKED_AGAIN();
1098 if (grpc_polling_trace.enabled()) {
1099 gpr_log(GPR_INFO, " .. already kicked %p", next_worker);
1100 }
1101 SET_KICK_STATE(next_worker, KICKED);
1102 goto done;
1103 } else if (root_worker ==
1104 next_worker && // only try and wake up a poller if
1105 // there is no next worker
1106 root_worker == (grpc_pollset_worker*)gpr_atm_no_barrier_load(
1107 &g_active_poller)) {
1108 GRPC_STATS_INC_POLLSET_KICK_WAKEUP_FD();
1109 if (grpc_polling_trace.enabled()) {
1110 gpr_log(GPR_INFO, " .. kicked %p", root_worker);
1111 }
1112 SET_KICK_STATE(root_worker, KICKED);
1113 ret_err = grpc_wakeup_fd_wakeup(&global_wakeup_fd);
1114 goto done;
1115 } else if (next_worker->state == UNKICKED) {
1116 GRPC_STATS_INC_POLLSET_KICK_WAKEUP_CV();
1117 if (grpc_polling_trace.enabled()) {
1118 gpr_log(GPR_INFO, " .. kicked %p", next_worker);
1119 }
1120 GPR_ASSERT(next_worker->initialized_cv);
1121 SET_KICK_STATE(next_worker, KICKED);
1122 gpr_cv_signal(&next_worker->cv);
1123 goto done;
1124 } else if (next_worker->state == DESIGNATED_POLLER) {
1125 if (root_worker->state != DESIGNATED_POLLER) {
1126 if (grpc_polling_trace.enabled()) {
1127 gpr_log(
1128 GPR_INFO,
1129 " .. kicked root non-poller %p (initialized_cv=%d) (poller=%p)",
1130 root_worker, root_worker->initialized_cv, next_worker);
1131 }
1132 SET_KICK_STATE(root_worker, KICKED);
1133 if (root_worker->initialized_cv) {
1134 GRPC_STATS_INC_POLLSET_KICK_WAKEUP_CV();
1135 gpr_cv_signal(&root_worker->cv);
1136 }
1137 goto done;
1138 } else {
1139 GRPC_STATS_INC_POLLSET_KICK_WAKEUP_FD();
1140 if (grpc_polling_trace.enabled()) {
1141 gpr_log(GPR_INFO, " .. non-root poller %p (root=%p)", next_worker,
1142 root_worker);
1143 }
1144 SET_KICK_STATE(next_worker, KICKED);
1145 ret_err = grpc_wakeup_fd_wakeup(&global_wakeup_fd);
1146 goto done;
1147 }
1148 } else {
1149 GRPC_STATS_INC_POLLSET_KICKED_AGAIN();
1150 GPR_ASSERT(next_worker->state == KICKED);
1151 SET_KICK_STATE(next_worker, KICKED);
1152 goto done;
1153 }
1154 } else {
1155 GRPC_STATS_INC_POLLSET_KICK_OWN_THREAD();
1156 if (grpc_polling_trace.enabled()) {
1157 gpr_log(GPR_INFO, " .. kicked while waking up");
1158 }
1159 goto done;
1160 }
1161
1162 GPR_UNREACHABLE_CODE(goto done);
1163 }
1164
1165 if (specific_worker->state == KICKED) {
1166 if (grpc_polling_trace.enabled()) {
1167 gpr_log(GPR_INFO, " .. specific worker already kicked");
1168 }
1169 goto done;
1170 } else if (gpr_tls_get(&g_current_thread_worker) ==
1171 (intptr_t)specific_worker) {
1172 GRPC_STATS_INC_POLLSET_KICK_OWN_THREAD();
1173 if (grpc_polling_trace.enabled()) {
1174 gpr_log(GPR_INFO, " .. mark %p kicked", specific_worker);
1175 }
1176 SET_KICK_STATE(specific_worker, KICKED);
1177 goto done;
1178 } else if (specific_worker ==
1179 (grpc_pollset_worker*)gpr_atm_no_barrier_load(&g_active_poller)) {
1180 GRPC_STATS_INC_POLLSET_KICK_WAKEUP_FD();
1181 if (grpc_polling_trace.enabled()) {
1182 gpr_log(GPR_INFO, " .. kick active poller");
1183 }
1184 SET_KICK_STATE(specific_worker, KICKED);
1185 ret_err = grpc_wakeup_fd_wakeup(&global_wakeup_fd);
1186 goto done;
1187 } else if (specific_worker->initialized_cv) {
1188 GRPC_STATS_INC_POLLSET_KICK_WAKEUP_CV();
1189 if (grpc_polling_trace.enabled()) {
1190 gpr_log(GPR_INFO, " .. kick waiting worker");
1191 }
1192 SET_KICK_STATE(specific_worker, KICKED);
1193 gpr_cv_signal(&specific_worker->cv);
1194 goto done;
1195 } else {
1196 GRPC_STATS_INC_POLLSET_KICKED_AGAIN();
1197 if (grpc_polling_trace.enabled()) {
1198 gpr_log(GPR_INFO, " .. kick non-waiting worker");
1199 }
1200 SET_KICK_STATE(specific_worker, KICKED);
1201 goto done;
1202 }
1203 done:
1204 return ret_err;
1205 }
1206
1207 static void pollset_add_fd(grpc_pollset* pollset, grpc_fd* fd) {}
1208
1209 /*******************************************************************************
1210 * Pollset-set Definitions
1211 */
1212
1213 static grpc_pollset_set* pollset_set_create(void) {
1214 return (grpc_pollset_set*)(static_cast<intptr_t>(0xdeafbeef));
1215 }
1216
1217 static void pollset_set_destroy(grpc_pollset_set* pss) {}
1218
1219 static void pollset_set_add_fd(grpc_pollset_set* pss, grpc_fd* fd) {}
1220
1221 static void pollset_set_del_fd(grpc_pollset_set* pss, grpc_fd* fd) {}
1222
1223 static void pollset_set_add_pollset(grpc_pollset_set* pss, grpc_pollset* ps) {}
1224
1225 static void pollset_set_del_pollset(grpc_pollset_set* pss, grpc_pollset* ps) {}
1226
1227 static void pollset_set_add_pollset_set(grpc_pollset_set* bag,
1228 grpc_pollset_set* item) {}
1229
1230 static void pollset_set_del_pollset_set(grpc_pollset_set* bag,
1231 grpc_pollset_set* item) {}
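/* Note: pollset_set operations are no-ops in this engine because every fd is
   registered with the single global epoll set at fd_create() time; there is no
   per-pollset or per-pollset_set state to maintain. */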
1232
1233 /*******************************************************************************
1234 * Event engine binding
1235 */
1236
1237 static void shutdown_engine(void) {
1238 fd_global_shutdown();
1239 pollset_global_shutdown();
1240 epoll_set_shutdown();
1241 if (grpc_core::Fork::Enabled()) {
1242 gpr_mu_destroy(&fork_fd_list_mu);
1243 grpc_core::Fork::SetResetChildPollingEngineFunc(nullptr);
1244 }
1245 }
1246
1247 static const grpc_event_engine_vtable vtable = {
1248 sizeof(grpc_pollset),
1249 true,
1250
1251 fd_create,
1252 fd_wrapped_fd,
1253 fd_orphan,
1254 fd_shutdown,
1255 fd_notify_on_read,
1256 fd_notify_on_write,
1257 fd_notify_on_error,
1258 fd_become_readable,
1259 fd_become_writable,
1260 fd_has_errors,
1261 fd_is_shutdown,
1262
1263 pollset_init,
1264 pollset_shutdown,
1265 pollset_destroy,
1266 pollset_work,
1267 pollset_kick,
1268 pollset_add_fd,
1269
1270 pollset_set_create,
1271 pollset_set_destroy,
1272 pollset_set_add_pollset,
1273 pollset_set_del_pollset,
1274 pollset_set_add_pollset_set,
1275 pollset_set_del_pollset_set,
1276 pollset_set_add_fd,
1277 pollset_set_del_fd,
1278
1279 shutdown_engine,
1280 };
1281
1282 /* Called by the child process's post-fork handler to close open fds, including
1283 * the global epoll fd. This allows gRPC to shut down in the child process
1284 * without interfering with connections or RPCs ongoing in the parent. */
1285 static void reset_event_manager_on_fork() {
1286 gpr_mu_lock(&fork_fd_list_mu);
1287 while (fork_fd_list_head != nullptr) {
1288 close(fork_fd_list_head->fd);
1289 fork_fd_list_head->fd = -1;
1290 fork_fd_list_head = fork_fd_list_head->fork_fd_list->next;
1291 }
1292 gpr_mu_unlock(&fork_fd_list_mu);
1293 shutdown_engine();
1294 grpc_init_epoll1_linux(true);
1295 }
1296
1297 /* It is possible that GLIBC has epoll but the underlying kernel doesn't.
1298 * Create epoll_fd (epoll_set_init() takes care of that) to make sure epoll
1299 * support is available */
1300 const grpc_event_engine_vtable* grpc_init_epoll1_linux(bool explicit_request) {
1301 if (!grpc_has_wakeup_fd()) {
1302 gpr_log(GPR_ERROR, "Skipping epoll1 because of no wakeup fd.");
1303 return nullptr;
1304 }
1305
1306 if (!epoll_set_init()) {
1307 return nullptr;
1308 }
1309
1310 fd_global_init();
1311
1312 if (!GRPC_LOG_IF_ERROR("pollset_global_init", pollset_global_init())) {
1313 fd_global_shutdown();
1314 epoll_set_shutdown();
1315 return nullptr;
1316 }
1317
1318 if (grpc_core::Fork::Enabled()) {
1319 gpr_mu_init(&fork_fd_list_mu);
1320 grpc_core::Fork::SetResetChildPollingEngineFunc(
1321 reset_event_manager_on_fork);
1322 }
1323 return &vtable;
1324 }
1325
1326 #else /* defined(GRPC_LINUX_EPOLL) */
1327 #if defined(GRPC_POSIX_SOCKET_EV_EPOLL1)
1328 #include "src/core/lib/iomgr/ev_epoll1_linux.h"
1329 /* If GRPC_LINUX_EPOLL is not defined, it means epoll is not available. Return
1330 * NULL */
1331 const grpc_event_engine_vtable* grpc_init_epoll1_linux(bool explicit_request) {
1332 return nullptr;
1333 }
1334 #endif /* defined(GRPC_POSIX_SOCKET_EV_EPOLL1) */
1335 #endif /* !defined(GRPC_LINUX_EPOLL) */
1336