/*
 *
 * Copyright 2016 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

#include <grpc/support/port_platform.h>

#include "src/core/lib/iomgr/port.h"

#include <grpc/grpc_posix.h>
#include <grpc/support/log.h>

/* This polling engine is only relevant on Linux kernels supporting epoll() */
#ifdef GRPC_LINUX_EPOLL_CREATE1

#include "src/core/lib/iomgr/ev_epollsig_linux.h"

#include <assert.h>
#include <errno.h>
#include <limits.h>
#include <poll.h>
#include <pthread.h>
#include <signal.h>
#include <string.h>
#include <sys/epoll.h>
#include <sys/socket.h>
#include <unistd.h>

#include <grpc/support/alloc.h>
#include <grpc/support/string_util.h>

#include "src/core/lib/debug/stats.h"
#include "src/core/lib/gpr/tls.h"
#include "src/core/lib/gpr/useful.h"
#include "src/core/lib/gprpp/manual_constructor.h"
#include "src/core/lib/iomgr/block_annotate.h"
#include "src/core/lib/iomgr/ev_posix.h"
#include "src/core/lib/iomgr/iomgr_internal.h"
#include "src/core/lib/iomgr/lockfree_event.h"
#include "src/core/lib/iomgr/timer.h"
#include "src/core/lib/iomgr/wakeup_fd_posix.h"
#include "src/core/lib/profiling/timers.h"

#define GRPC_POLLSET_KICK_BROADCAST ((grpc_pollset_worker*)1)

#define GRPC_POLLING_TRACE(...)       \
  if (grpc_polling_trace.enabled()) { \
    gpr_log(GPR_INFO, __VA_ARGS__);   \
  }

static int grpc_wakeup_signal = -1;
static bool is_grpc_wakeup_signal_initialized = false;

/* Implements the function defined in grpc_posix.h. This function may be
 * called before grpc_init() to set a different signal to use. If
 * signum == -1, the use of signals is disabled. */
void grpc_use_signal(int signum) {
  grpc_wakeup_signal = signum;
  is_grpc_wakeup_signal_initialized = true;

  if (grpc_wakeup_signal < 0) {
    gpr_log(GPR_INFO,
            "Use of signals is disabled. Epoll engine will not be used");
  } else {
    gpr_log(GPR_INFO, "epoll engine will be using signal: %d",
            grpc_wakeup_signal);
  }
}
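
/* Usage sketch (illustrative, not part of this file): an application that
   wants this engine to use a specific real-time signal would call, before
   grpc_init():

     grpc_use_signal(SIGRTMIN + 2);   // hypothetical choice of signal

   Calling grpc_use_signal(-1) disables the use of signals entirely, which in
   turn disables this polling engine. */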

struct polling_island;

typedef enum {
  POLL_OBJ_FD,
  POLL_OBJ_POLLSET,
  POLL_OBJ_POLLSET_SET
} poll_obj_type;

typedef struct poll_obj {
#ifndef NDEBUG
  poll_obj_type obj_type;
#endif
  gpr_mu mu;
  struct polling_island* pi;
} poll_obj;

const char* poll_obj_string(poll_obj_type po_type) {
  switch (po_type) {
    case POLL_OBJ_FD:
      return "fd";
    case POLL_OBJ_POLLSET:
      return "pollset";
    case POLL_OBJ_POLLSET_SET:
      return "pollset_set";
  }

  GPR_UNREACHABLE_CODE(return "UNKNOWN");
}

/*******************************************************************************
 * Fd Declarations
 */

#define FD_FROM_PO(po) ((grpc_fd*)(po))

struct grpc_fd {
  poll_obj po;

  int fd;
  /* refst format:
       bit 0    : 1=Active / 0=Orphaned
       bits 1-n : refcount
     Ref/Unref by two to avoid altering the orphaned bit */
  gpr_atm refst;

  /* The fd is either closed or we relinquished control of it. In either
     case, this indicates that the 'fd' on this structure is no longer
     valid */
  bool orphaned;

  grpc_core::ManualConstructor<grpc_core::LockfreeEvent> read_closure;
  grpc_core::ManualConstructor<grpc_core::LockfreeEvent> write_closure;
  grpc_core::ManualConstructor<grpc_core::LockfreeEvent> error_closure;

  struct grpc_fd* freelist_next;
  grpc_closure* on_done_closure;

  grpc_iomgr_object iomgr_object;

  /* Do we need to track EPOLLERR events separately? */
  bool track_err;
};

/* Reference counting for fds */
#ifndef NDEBUG
static void fd_ref(grpc_fd* fd, const char* reason, const char* file, int line);
static void fd_unref(grpc_fd* fd, const char* reason, const char* file,
                     int line);
#define GRPC_FD_REF(fd, reason) fd_ref(fd, reason, __FILE__, __LINE__)
#define GRPC_FD_UNREF(fd, reason) fd_unref(fd, reason, __FILE__, __LINE__)
#else
static void fd_ref(grpc_fd* fd);
static void fd_unref(grpc_fd* fd);
#define GRPC_FD_REF(fd, reason) fd_ref(fd)
#define GRPC_FD_UNREF(fd, reason) fd_unref(fd)
#endif

static void fd_global_init(void);
static void fd_global_shutdown(void);

/*******************************************************************************
 * Polling island Declarations
 */

#ifndef NDEBUG

#define PI_ADD_REF(p, r) pi_add_ref_dbg((p), (r), __FILE__, __LINE__)
#define PI_UNREF(p, r) pi_unref_dbg((p), (r), __FILE__, __LINE__)

#else

#define PI_ADD_REF(p, r) pi_add_ref((p))
#define PI_UNREF(p, r) pi_unref((p))

#endif

/* This is also used as a grpc_workqueue (by directly casting it) */
typedef struct polling_island {
  gpr_mu mu;
  /* Ref count. Use PI_ADD_REF() and PI_UNREF() macros to increment/decrement
     the refcount.
     Once the ref count becomes zero, this structure is destroyed, which means
     we must ensure that there is never a scenario where a PI_ADD_REF() is
     racing with a PI_UNREF() that just made the ref_count zero. */
  gpr_atm ref_count;

  /* Pointer to the polling_island this was merged into.
   * The merged_to value is set at most once in a polling_island's lifetime
   * (and only if the island is merged with another island). Because of this,
   * we can use the gpr_atm type here, do atomic accesses on it, and reduce
   * lock contention on the 'mu' mutex.
   *
   * Note that if this field is not NULL (i.e. not 0), all the remaining fields
   * (except mu and ref_count) are invalid and must be ignored. */
  gpr_atm merged_to;

  /* Number of threads currently polling on this island */
  gpr_atm poller_count;

  /* The fd of the underlying epoll set */
  int epoll_fd;

  /* The file descriptors in the epoll set */
  size_t fd_cnt;
  size_t fd_capacity;
  grpc_fd** fds;
} polling_island;
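
/* A sketch of how 'merged_to' chains evolve (assuming islands A, B and C,
   where A was merged into B and later B into C):

     A->merged_to == B,   B->merged_to == C,   C->merged_to == NULL

   Walking the chain (as polling_island_maybe_get_latest() does below) from
   any of A, B or C ends at C; only C's fields (other than 'mu' and
   'ref_count') are then meaningful. */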

/*******************************************************************************
 * Pollset Declarations
 */
struct grpc_pollset_worker {
  /* Thread id of this worker */
  pthread_t pt_id;

  /* Used to prevent a worker from getting kicked multiple times */
  gpr_atm is_kicked;
  struct grpc_pollset_worker* next;
  struct grpc_pollset_worker* prev;
};

struct grpc_pollset {
  poll_obj po;

  grpc_pollset_worker root_worker;
  bool kicked_without_pollers;

  bool shutting_down;          /* Is the pollset shutting down? */
  bool finish_shutdown_called; /* Has finish_shutdown_locked() been called? */
  grpc_closure* shutdown_done; /* Called after shutdown is complete */
};

/*******************************************************************************
 * Pollset-set Declarations
 */
struct grpc_pollset_set {
  poll_obj po;
};

/*******************************************************************************
 * Common helpers
 */

static bool append_error(grpc_error** composite, grpc_error* error,
                         const char* desc) {
  if (error == GRPC_ERROR_NONE) return true;
  if (*composite == GRPC_ERROR_NONE) {
    *composite = GRPC_ERROR_CREATE_FROM_COPIED_STRING(desc);
  }
  *composite = grpc_error_add_child(*composite, error);
  return false;
}
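
/* Typical usage pattern for append_error() in this file (sketch): keep going
   on failure and accumulate every failure into one composite error:

     grpc_error* error = GRPC_ERROR_NONE;
     append_error(&error, step_one(), "init");   // hypothetical steps
     append_error(&error, step_two(), "init");
     // 'error' is GRPC_ERROR_NONE iff both steps succeeded
*/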

/*******************************************************************************
 * Polling island Definitions
 */

/* The wakeup fd that is used to wake up all threads in a polling island. This
   is useful in the polling island merge operation, where we need to wake up
   all the threads currently polling the smaller polling island (so that they
   can start polling the new/merged polling island).

   NOTE: This fd is initialized to be readable and MUST NOT be consumed, i.e.
   the threads that woke up MUST NOT call grpc_wakeup_fd_consume_wakeup() */
static grpc_wakeup_fd polling_island_wakeup_fd;

/* The polling island being polled right now.
   See comments in workqueue_maybe_wakeup for why this is tracked. */
static __thread polling_island* g_current_thread_polling_island;

/* Forward declaration */
static void polling_island_delete(polling_island* pi);

#ifdef GRPC_TSAN
/* Currently TSAN may incorrectly flag data races between epoll_ctl and
   epoll_wait for any grpc_fd structs that are added to the epoll set via
   epoll_ctl and are returned (within a very short window) via epoll_wait().

   To work around this race, we establish a happens-before relation between
   the code just before epoll_ctl() and the code after epoll_wait() by using
   this atomic */
gpr_atm g_epoll_sync;
#endif /* defined(GRPC_TSAN) */

static void pi_add_ref(polling_island* pi);
static void pi_unref(polling_island* pi);

#ifndef NDEBUG
static void pi_add_ref_dbg(polling_island* pi, const char* reason,
                           const char* file, int line) {
  if (grpc_polling_trace.enabled()) {
    gpr_atm old_cnt = gpr_atm_acq_load(&pi->ref_count);
    gpr_log(GPR_INFO,
            "Add ref pi: %p, old:%" PRIdPTR " -> new:%" PRIdPTR
            " (%s) - (%s, %d)",
            pi, old_cnt, old_cnt + 1, reason, file, line);
  }
  pi_add_ref(pi);
}

static void pi_unref_dbg(polling_island* pi, const char* reason,
                         const char* file, int line) {
  if (grpc_polling_trace.enabled()) {
    gpr_atm old_cnt = gpr_atm_acq_load(&pi->ref_count);
    gpr_log(GPR_INFO,
            "Unref pi: %p, old:%" PRIdPTR " -> new:%" PRIdPTR
            " (%s) - (%s, %d)",
            pi, old_cnt, (old_cnt - 1), reason, file, line);
  }
  pi_unref(pi);
}
#endif

static void pi_add_ref(polling_island* pi) {
  gpr_atm_no_barrier_fetch_add(&pi->ref_count, 1);
}

static void pi_unref(polling_island* pi) {
  /* If the ref count went to zero, delete the polling island.
     Note that this deletion need not be done under a lock. Once the ref count
     goes to zero, we are guaranteed that no one else holds a reference to the
     polling island (and that there is no racing pi_add_ref() call either).

     Also, if we are deleting the polling island and the merged_to field is
     non-empty, we should remove a ref to the merged_to polling island
   */
  if (1 == gpr_atm_full_fetch_add(&pi->ref_count, -1)) {
    polling_island* next = (polling_island*)gpr_atm_acq_load(&pi->merged_to);
    polling_island_delete(pi);
    if (next != nullptr) {
      PI_UNREF(next, "pi_delete"); /* Recursive call */
    }
  }
}

/* The caller is expected to hold pi->mu lock before calling this function */
static void polling_island_add_fds_locked(polling_island* pi, grpc_fd** fds,
                                          size_t fd_count, bool add_fd_refs,
                                          grpc_error** error) {
  int err;
  size_t i;
  struct epoll_event ev;
  char* err_msg;
  const char* err_desc = "polling_island_add_fds";

#ifdef GRPC_TSAN
  /* See the definition of g_epoll_sync for more context */
  gpr_atm_rel_store(&g_epoll_sync, (gpr_atm)0);
#endif /* defined(GRPC_TSAN) */

  for (i = 0; i < fd_count; i++) {
    ev.events = static_cast<uint32_t>(EPOLLIN | EPOLLOUT | EPOLLET);
    /* Use the least significant bit of ev.data.ptr to store track_err to avoid
     * synchronization issues when accessing it after receiving an event */
    ev.data.ptr = reinterpret_cast<void*>(reinterpret_cast<intptr_t>(fds[i]) |
                                          (fds[i]->track_err ? 1 : 0));
    err = epoll_ctl(pi->epoll_fd, EPOLL_CTL_ADD, fds[i]->fd, &ev);

    if (err < 0) {
      if (errno != EEXIST) {
        gpr_asprintf(
            &err_msg,
            "epoll_ctl (epoll_fd: %d) add fd: %d failed with error: %d (%s)",
            pi->epoll_fd, fds[i]->fd, errno, strerror(errno));
        append_error(error, GRPC_OS_ERROR(errno, err_msg), err_desc);
        gpr_free(err_msg);
      }

      continue;
    }

    if (pi->fd_cnt == pi->fd_capacity) {
      pi->fd_capacity = GPR_MAX(pi->fd_capacity + 8, pi->fd_cnt * 3 / 2);
      pi->fds = static_cast<grpc_fd**>(
          gpr_realloc(pi->fds, sizeof(grpc_fd*) * pi->fd_capacity));
    }

    pi->fds[pi->fd_cnt++] = fds[i];
    if (add_fd_refs) {
      GRPC_FD_REF(fds[i], "polling_island");
    }
  }
}
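
/* Illustrative sketch of the pointer tagging used above: since grpc_fd
   objects come from the allocator and are at least 2-byte aligned, bit 0 of
   the pointer is free to carry 'track_err'. The event handler reverses the
   tagging like so (this mirrors the code in pollset_work_and_unlock()):

     intptr_t tagged = reinterpret_cast<intptr_t>(ev.data.ptr);
     grpc_fd* fd =
         reinterpret_cast<grpc_fd*>(tagged & ~static_cast<intptr_t>(1));
     bool track_err = (tagged & static_cast<intptr_t>(1)) != 0;
*/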

/* The caller is expected to hold pi->mu before calling this */
static void polling_island_add_wakeup_fd_locked(polling_island* pi,
                                                grpc_wakeup_fd* wakeup_fd,
                                                grpc_error** error) {
  struct epoll_event ev;
  int err;
  char* err_msg;
  const char* err_desc = "polling_island_add_wakeup_fd";

  ev.events = static_cast<uint32_t>(EPOLLIN | EPOLLET);
  ev.data.ptr = wakeup_fd;
  err = epoll_ctl(pi->epoll_fd, EPOLL_CTL_ADD,
                  GRPC_WAKEUP_FD_GET_READ_FD(wakeup_fd), &ev);
  if (err < 0 && errno != EEXIST) {
    gpr_asprintf(&err_msg,
                 "epoll_ctl (epoll_fd: %d) add wakeup fd: %d failed with "
                 "error: %d (%s)",
                 pi->epoll_fd, GRPC_WAKEUP_FD_GET_READ_FD(wakeup_fd), errno,
                 strerror(errno));
    append_error(error, GRPC_OS_ERROR(errno, err_msg), err_desc);
    gpr_free(err_msg);
  }
}

/* The caller is expected to hold pi->mu lock before calling this function */
static void polling_island_remove_all_fds_locked(polling_island* pi,
                                                 bool remove_fd_refs,
                                                 grpc_error** error) {
  int err;
  size_t i;
  char* err_msg;
  const char* err_desc = "polling_island_remove_fds";

  for (i = 0; i < pi->fd_cnt; i++) {
    err = epoll_ctl(pi->epoll_fd, EPOLL_CTL_DEL, pi->fds[i]->fd, nullptr);
    if (err < 0 && errno != ENOENT) {
      gpr_asprintf(&err_msg,
                   "epoll_ctl (epoll_fd: %d) delete fds[%zu]: %d failed with "
                   "error: %d (%s)",
                   pi->epoll_fd, i, pi->fds[i]->fd, errno, strerror(errno));
      append_error(error, GRPC_OS_ERROR(errno, err_msg), err_desc);
      gpr_free(err_msg);
    }

    if (remove_fd_refs) {
      GRPC_FD_UNREF(pi->fds[i], "polling_island");
    }
  }

  pi->fd_cnt = 0;
}

/* The caller is expected to hold pi->mu lock before calling this function */
static void polling_island_remove_fd_locked(polling_island* pi, grpc_fd* fd,
                                            grpc_error** error) {
  int err;
  size_t i;
  char* err_msg;
  const char* err_desc = "polling_island_remove_fd";

  /* If the fd is already closed, then it would have automatically been removed
     from the epoll set */
  err = epoll_ctl(pi->epoll_fd, EPOLL_CTL_DEL, fd->fd, nullptr);
  if (err < 0 && errno != ENOENT) {
    gpr_asprintf(
        &err_msg,
        "epoll_ctl (epoll_fd: %d) del fd: %d failed with error: %d (%s)",
        pi->epoll_fd, fd->fd, errno, strerror(errno));
    append_error(error, GRPC_OS_ERROR(errno, err_msg), err_desc);
    gpr_free(err_msg);
  }

  for (i = 0; i < pi->fd_cnt; i++) {
    if (pi->fds[i] == fd) {
      pi->fds[i] = pi->fds[--pi->fd_cnt];
      GRPC_FD_UNREF(fd, "polling_island");
      break;
    }
  }
}

/* Might return NULL in case of an error */
static polling_island* polling_island_create(grpc_fd* initial_fd,
                                             grpc_error** error) {
  polling_island* pi = nullptr;
  const char* err_desc = "polling_island_create";

  *error = GRPC_ERROR_NONE;

  pi = static_cast<polling_island*>(gpr_malloc(sizeof(*pi)));
  gpr_mu_init(&pi->mu);
  pi->fd_cnt = 0;
  pi->fd_capacity = 0;
  pi->fds = nullptr;
  pi->epoll_fd = -1;

  gpr_atm_rel_store(&pi->ref_count, 0);
  gpr_atm_rel_store(&pi->poller_count, 0);
  gpr_atm_rel_store(&pi->merged_to, (gpr_atm) nullptr);

  pi->epoll_fd = epoll_create1(EPOLL_CLOEXEC);

  if (pi->epoll_fd < 0) {
    append_error(error, GRPC_OS_ERROR(errno, "epoll_create1"), err_desc);
    goto done;
  }

  if (initial_fd != nullptr) {
    polling_island_add_fds_locked(pi, &initial_fd, 1, true, error);
  }

done:
  if (*error != GRPC_ERROR_NONE) {
    polling_island_delete(pi);
    pi = nullptr;
  }
  return pi;
}

static void polling_island_delete(polling_island* pi) {
  GPR_ASSERT(pi->fd_cnt == 0);

  if (pi->epoll_fd >= 0) {
    close(pi->epoll_fd);
  }
  gpr_mu_destroy(&pi->mu);
  gpr_free(pi->fds);
  gpr_free(pi);
}

/* Attempts to get the last polling island in the linked list (linked by the
 * 'merged_to' field). Since this does not lock the polling island, there is no
 * guarantee that the island returned is the last island */
static polling_island* polling_island_maybe_get_latest(polling_island* pi) {
  polling_island* next = (polling_island*)gpr_atm_acq_load(&pi->merged_to);
  while (next != nullptr) {
    pi = next;
    next = (polling_island*)gpr_atm_acq_load(&pi->merged_to);
  }

  return pi;
}

/* Gets the lock on the *latest* polling island, i.e. the last polling island
   in the linked list (linked by the 'merged_to' field). Call gpr_mu_unlock on
   the returned polling island's mu.
   Usage: To lock/unlock polling island "pi", do the following:
      polling_island *pi_latest = polling_island_lock(pi);
      ...
      ... critical section ..
      ...
      gpr_mu_unlock(&pi_latest->mu); // NOTE: use pi_latest->mu. NOT pi->mu */
static polling_island* polling_island_lock(polling_island* pi) {
  polling_island* next = nullptr;

  while (true) {
    next = (polling_island*)gpr_atm_acq_load(&pi->merged_to);
    if (next == nullptr) {
      /* Looks like 'pi' is the last node in the linked list, but unless we
         check this while holding the pi->mu lock, we cannot be sure (i.e.
         without the pi->mu lock, we don't prevent island merges).
         To be absolutely sure, check once more while holding the pi->mu lock */
      gpr_mu_lock(&pi->mu);
      next = (polling_island*)gpr_atm_acq_load(&pi->merged_to);
      if (next == nullptr) {
        /* pi is in fact the last node and we have the pi->mu lock. we're done */
        break;
      }

      /* pi->merged_to is not NULL, i.e. pi isn't the last node anymore. pi->mu
       * isn't the lock we are interested in. Continue traversing the list */
      gpr_mu_unlock(&pi->mu);
    }

    pi = next;
  }

  return pi;
}

/* Gets the lock on the *latest* polling islands in the linked lists pointed to
   by *p and *q (and also updates *p and *q to point to the latest polling
   islands).

   This function is needed because calling the following block of code to
   obtain locks on polling islands (*p and *q) is prone to deadlocks.
     {
       polling_island_lock(*p, true);
       polling_island_lock(*q, true);
     }

   Usage/example:
     polling_island *p1;
     polling_island *p2;
     ..
     polling_island_lock_pair(&p1, &p2);
     ..
     .. Critical section with both p1 and p2 locked
     ..
     // Release locks: Always call polling_island_unlock_pair() to release locks
     polling_island_unlock_pair(p1, p2);
*/
static void polling_island_lock_pair(polling_island** p, polling_island** q) {
  polling_island* pi_1 = *p;
  polling_island* pi_2 = *q;
  polling_island* next_1 = nullptr;
  polling_island* next_2 = nullptr;

  /* The algorithm is simple:
      - Go to the last polling islands in the linked lists pointed to by pi_1
        and pi_2 (and keep updating pi_1 and pi_2)
      - Then obtain locks on the islands by following a lock order rule of
        locking the polling_island with the lower address first
           Special case: Before obtaining the locks, check if pi_1 and pi_2 are
           pointing to the same island. If that is the case, we can just call
           polling_island_lock()
      - After obtaining both the locks, double check that the polling islands
        are still the last polling islands in their respective linked lists
        (this is because there might have been polling island merges before
        we got the lock)
      - If the polling islands are the last islands, we are done. If not,
        release the locks and continue the process from the first step */
  while (true) {
    next_1 = (polling_island*)gpr_atm_acq_load(&pi_1->merged_to);
    while (next_1 != nullptr) {
      pi_1 = next_1;
      next_1 = (polling_island*)gpr_atm_acq_load(&pi_1->merged_to);
    }

    next_2 = (polling_island*)gpr_atm_acq_load(&pi_2->merged_to);
    while (next_2 != nullptr) {
      pi_2 = next_2;
      next_2 = (polling_island*)gpr_atm_acq_load(&pi_2->merged_to);
    }

    if (pi_1 == pi_2) {
      pi_1 = pi_2 = polling_island_lock(pi_1);
      break;
    }

    if (pi_1 < pi_2) {
      gpr_mu_lock(&pi_1->mu);
      gpr_mu_lock(&pi_2->mu);
    } else {
      gpr_mu_lock(&pi_2->mu);
      gpr_mu_lock(&pi_1->mu);
    }

    next_1 = (polling_island*)gpr_atm_acq_load(&pi_1->merged_to);
    next_2 = (polling_island*)gpr_atm_acq_load(&pi_2->merged_to);
    if (next_1 == nullptr && next_2 == nullptr) {
      break;
    }

    gpr_mu_unlock(&pi_1->mu);
    gpr_mu_unlock(&pi_2->mu);
  }

  *p = pi_1;
  *q = pi_2;
}

static void polling_island_unlock_pair(polling_island* p, polling_island* q) {
  if (p == q) {
    gpr_mu_unlock(&p->mu);
  } else {
    gpr_mu_unlock(&p->mu);
    gpr_mu_unlock(&q->mu);
  }
}

static polling_island* polling_island_merge(polling_island* p,
                                            polling_island* q,
                                            grpc_error** error) {
  /* Get locks on both the polling islands */
  polling_island_lock_pair(&p, &q);

  if (p != q) {
    /* Make sure that p points to the polling island with fewer fds than q */
    if (p->fd_cnt > q->fd_cnt) {
      GPR_SWAP(polling_island*, p, q);
    }

    /* Merge p with q, i.e. move all the fds from p (the one with fewer fds)
       to q. Note that the refcounts on the fds being moved will not change
       here. (This is why the last param in the following two functions is
       'false'.) */
    polling_island_add_fds_locked(q, p->fds, p->fd_cnt, false, error);
    polling_island_remove_all_fds_locked(p, false, error);

    /* Wake up all the pollers (if any) on p so that they pick up this change */
    polling_island_add_wakeup_fd_locked(p, &polling_island_wakeup_fd, error);

    /* Add the 'merged_to' link from p --> q */
    gpr_atm_rel_store(&p->merged_to, (gpr_atm)q);
    PI_ADD_REF(q, "pi_merge"); /* To account for the new incoming ref from p */
  }
  /* else if p == q, nothing needs to be done */

  polling_island_unlock_pair(p, q);

  /* Return the merged polling island (Note that no merge would have happened
     if p == q, which is ok) */
  return q;
}

static grpc_error* polling_island_global_init() {
  grpc_error* error = GRPC_ERROR_NONE;

  error = grpc_wakeup_fd_init(&polling_island_wakeup_fd);
  if (error == GRPC_ERROR_NONE) {
    error = grpc_wakeup_fd_wakeup(&polling_island_wakeup_fd);
  }

  return error;
}

static void polling_island_global_shutdown() {
  grpc_wakeup_fd_destroy(&polling_island_wakeup_fd);
}

/*******************************************************************************
 * Fd Definitions
 */

/* We need to keep a freelist not because of any concerns about malloc
 * performance but instead so that implementations with multiple threads in
 * (for example) epoll_wait deal with the race between pollset removal and
 * incoming poll notifications.
 *
 * The problem is that the poller ultimately holds a reference to this
 * object, so it is very difficult to know when it is safe to free it, at least
 * without some expensive synchronization.
 *
 * If we keep the object freelisted, in the worst case losing this race just
 * becomes a spurious read notification on a reused fd.
 */

/* The alarm system needs to be able to wake up 'some poller' sometimes
 * (specifically when a new alarm needs to be triggered earlier than the next
 * alarm 'epoch'). This wakeup_fd gives us something to alert on when such a
 * case occurs. */

static grpc_fd* fd_freelist = nullptr;
static gpr_mu fd_freelist_mu;

#ifndef NDEBUG
#define REF_BY(fd, n, reason) ref_by(fd, n, reason, __FILE__, __LINE__)
#define UNREF_BY(fd, n, reason) unref_by(fd, n, reason, __FILE__, __LINE__)
static void ref_by(grpc_fd* fd, int n, const char* reason, const char* file,
                   int line) {
  if (grpc_trace_fd_refcount.enabled()) {
    gpr_log(GPR_DEBUG,
            "FD %d %p   ref %d %" PRIdPTR " -> %" PRIdPTR " [%s; %s:%d]",
            fd->fd, fd, n, gpr_atm_no_barrier_load(&fd->refst),
            gpr_atm_no_barrier_load(&fd->refst) + n, reason, file, line);
  }
#else
#define REF_BY(fd, n, reason) ref_by(fd, n)
#define UNREF_BY(fd, n, reason) unref_by(fd, n)
static void ref_by(grpc_fd* fd, int n) {
#endif
  GPR_ASSERT(gpr_atm_no_barrier_fetch_add(&fd->refst, n) > 0);
}

#ifndef NDEBUG
static void unref_by(grpc_fd* fd, int n, const char* reason, const char* file,
                     int line) {
  if (grpc_trace_fd_refcount.enabled()) {
    gpr_log(GPR_DEBUG,
            "FD %d %p unref %d %" PRIdPTR " -> %" PRIdPTR " [%s; %s:%d]",
            fd->fd, fd, n, gpr_atm_no_barrier_load(&fd->refst),
            gpr_atm_no_barrier_load(&fd->refst) - n, reason, file, line);
  }
#else
static void unref_by(grpc_fd* fd, int n) {
#endif
  gpr_atm old = gpr_atm_full_fetch_add(&fd->refst, -n);
  if (old == n) {
    /* Add the fd to the freelist */
    gpr_mu_lock(&fd_freelist_mu);
    fd->freelist_next = fd_freelist;
    fd_freelist = fd;
    grpc_iomgr_unregister_object(&fd->iomgr_object);

    fd->read_closure->DestroyEvent();
    fd->write_closure->DestroyEvent();
    fd->error_closure->DestroyEvent();

    gpr_mu_unlock(&fd_freelist_mu);
  } else {
    GPR_ASSERT(old > n);
  }
}

/* Increment refcount by two to avoid changing the orphan bit */
#ifndef NDEBUG
static void fd_ref(grpc_fd* fd, const char* reason, const char* file,
                   int line) {
  ref_by(fd, 2, reason, file, line);
}

static void fd_unref(grpc_fd* fd, const char* reason, const char* file,
                     int line) {
  unref_by(fd, 2, reason, file, line);
}
#else
static void fd_ref(grpc_fd* fd) { ref_by(fd, 2); }
static void fd_unref(grpc_fd* fd) { unref_by(fd, 2); }
#endif
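
/* Worked example of the refst arithmetic (illustrative sketch): fd_create()
   stores refst = 1 (bit 0 set => active, zero extra refs). GRPC_FD_REF adds
   2 (refst: 1 -> 3) and GRPC_FD_UNREF subtracts 2 (refst: 3 -> 1), so bit 0
   is never disturbed. fd_orphan() below does REF_BY(fd, 1) followed by
   UNREF_BY(fd, 2): the net -1 clears the active bit, and the fd is
   freelisted once the count reaches zero. */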

static void fd_global_init(void) { gpr_mu_init(&fd_freelist_mu); }

static void fd_global_shutdown(void) {
  gpr_mu_lock(&fd_freelist_mu);
  gpr_mu_unlock(&fd_freelist_mu);
  while (fd_freelist != nullptr) {
    grpc_fd* fd = fd_freelist;
    fd_freelist = fd_freelist->freelist_next;
    gpr_mu_destroy(&fd->po.mu);
    gpr_free(fd);
  }
  gpr_mu_destroy(&fd_freelist_mu);
}

static grpc_fd* fd_create(int fd, const char* name, bool track_err) {
  grpc_fd* new_fd = nullptr;

  gpr_mu_lock(&fd_freelist_mu);
  if (fd_freelist != nullptr) {
    new_fd = fd_freelist;
    fd_freelist = fd_freelist->freelist_next;
  }
  gpr_mu_unlock(&fd_freelist_mu);

  if (new_fd == nullptr) {
    new_fd = static_cast<grpc_fd*>(gpr_malloc(sizeof(grpc_fd)));
    gpr_mu_init(&new_fd->po.mu);
    new_fd->read_closure.Init();
    new_fd->write_closure.Init();
    new_fd->error_closure.Init();
  }

  /* Note: It is not really necessary to take the new_fd->po.mu lock here. If
   * this is a newly created fd (or an fd we got from the freelist), no one
   * else would be holding a lock to it anyway. */
  gpr_mu_lock(&new_fd->po.mu);
  new_fd->po.pi = nullptr;
#ifndef NDEBUG
  new_fd->po.obj_type = POLL_OBJ_FD;
#endif

  gpr_atm_rel_store(&new_fd->refst, (gpr_atm)1);
  new_fd->fd = fd;
  new_fd->orphaned = false;
  new_fd->read_closure->InitEvent();
  new_fd->write_closure->InitEvent();
  new_fd->error_closure->InitEvent();
  new_fd->track_err = track_err;

  new_fd->freelist_next = nullptr;
  new_fd->on_done_closure = nullptr;

  gpr_mu_unlock(&new_fd->po.mu);

  char* fd_name;
  gpr_asprintf(&fd_name, "%s fd=%d", name, fd);
  grpc_iomgr_register_object(&new_fd->iomgr_object, fd_name);
  gpr_free(fd_name);
  return new_fd;
}

static int fd_wrapped_fd(grpc_fd* fd) {
  int ret_fd = -1;
  gpr_mu_lock(&fd->po.mu);
  if (!fd->orphaned) {
    ret_fd = fd->fd;
  }
  gpr_mu_unlock(&fd->po.mu);

  return ret_fd;
}

static void fd_orphan(grpc_fd* fd, grpc_closure* on_done, int* release_fd,
                      const char* reason) {
  grpc_error* error = GRPC_ERROR_NONE;
  polling_island* unref_pi = nullptr;

  gpr_mu_lock(&fd->po.mu);
  fd->on_done_closure = on_done;

  /* Remove the active status but keep the fd referenced. We want this grpc_fd
     struct to be alive (and not added to the freelist) until the end of this
     function */
  REF_BY(fd, 1, reason);

  /* Remove the fd from the polling island:
     - Get a lock on the latest polling island (i.e. the last island in the
       linked list pointed to by fd->po.pi). This is the island that
       would actually contain the fd
     - Remove the fd from the latest polling island
     - Unlock the latest polling island
     - Set fd->po.pi to NULL (but remove the ref on the polling island
       before doing this.) */
  if (fd->po.pi != nullptr) {
    polling_island* pi_latest = polling_island_lock(fd->po.pi);
    polling_island_remove_fd_locked(pi_latest, fd, &error);
    gpr_mu_unlock(&pi_latest->mu);

    unref_pi = fd->po.pi;
    fd->po.pi = nullptr;
  }

  /* If release_fd is not NULL, we should be relinquishing control of the file
     descriptor fd->fd (but we still own the grpc_fd structure). */
  if (release_fd != nullptr) {
    *release_fd = fd->fd;
  } else {
    close(fd->fd);
  }

  fd->orphaned = true;

  GRPC_CLOSURE_SCHED(fd->on_done_closure, GRPC_ERROR_REF(error));

  gpr_mu_unlock(&fd->po.mu);
  UNREF_BY(fd, 2, reason); /* Drop the reference */
  if (unref_pi != nullptr) {
    /* Unref the stale polling island here, outside the fd lock above.
       The polling island owns a workqueue which owns an fd, and unreffing
       inside the lock can cause an eventual lock loop that makes TSAN very
       unhappy. */
    PI_UNREF(unref_pi, "fd_orphan");
  }
  if (error != GRPC_ERROR_NONE) {
    const char* msg = grpc_error_string(error);
    gpr_log(GPR_DEBUG, "fd_orphan: %s", msg);
  }
  GRPC_ERROR_UNREF(error);
}

static bool fd_is_shutdown(grpc_fd* fd) {
  return fd->read_closure->IsShutdown();
}

/* Might be called multiple times */
static void fd_shutdown(grpc_fd* fd, grpc_error* why) {
  if (fd->read_closure->SetShutdown(GRPC_ERROR_REF(why))) {
    shutdown(fd->fd, SHUT_RDWR);
    fd->write_closure->SetShutdown(GRPC_ERROR_REF(why));
    fd->error_closure->SetShutdown(GRPC_ERROR_REF(why));
  }
  GRPC_ERROR_UNREF(why);
}

static void fd_notify_on_read(grpc_fd* fd, grpc_closure* closure) {
  fd->read_closure->NotifyOn(closure);
}

static void fd_notify_on_write(grpc_fd* fd, grpc_closure* closure) {
  fd->write_closure->NotifyOn(closure);
}

static void fd_notify_on_error(grpc_fd* fd, grpc_closure* closure) {
  fd->error_closure->NotifyOn(closure);
}

static void fd_become_readable(grpc_fd* fd) { fd->read_closure->SetReady(); }

static void fd_become_writable(grpc_fd* fd) { fd->write_closure->SetReady(); }

static void fd_has_errors(grpc_fd* fd) { fd->error_closure->SetReady(); }

/*******************************************************************************
 * Pollset Definitions
 */
GPR_TLS_DECL(g_current_thread_pollset);
GPR_TLS_DECL(g_current_thread_worker);
static __thread bool g_initialized_sigmask;
static __thread sigset_t g_orig_sigmask;

static void sig_handler(int sig_num) {
#ifdef GRPC_EPOLL_DEBUG
  gpr_log(GPR_INFO, "Received signal %d", sig_num);
#endif
}

static void poller_kick_init() { signal(grpc_wakeup_signal, sig_handler); }

/* Global state management */
static grpc_error* pollset_global_init(void) {
  gpr_tls_init(&g_current_thread_pollset);
  gpr_tls_init(&g_current_thread_worker);
  poller_kick_init();
  return GRPC_ERROR_NONE;
}

static void pollset_global_shutdown(void) {
  gpr_tls_destroy(&g_current_thread_pollset);
  gpr_tls_destroy(&g_current_thread_worker);
}

static grpc_error* pollset_worker_kick(grpc_pollset_worker* worker) {
  grpc_error* err = GRPC_ERROR_NONE;

  /* Kick the worker only if it was not already kicked */
  if (gpr_atm_no_barrier_cas(&worker->is_kicked, static_cast<gpr_atm>(0),
                             static_cast<gpr_atm>(1))) {
    GRPC_POLLING_TRACE(
        "pollset_worker_kick: Kicking worker: %p (thread id: %ld)",
        (void*)worker, (long int)worker->pt_id);
    int err_num = pthread_kill(worker->pt_id, grpc_wakeup_signal);
    if (err_num != 0) {
      err = GRPC_OS_ERROR(err_num, "pthread_kill");
    }
  }
  return err;
}

/* Return 1 if the pollset has active threads in pollset_work (pollset must
 * be locked) */
static int pollset_has_workers(grpc_pollset* p) {
  return p->root_worker.next != &p->root_worker;
}

static void remove_worker(grpc_pollset* p, grpc_pollset_worker* worker) {
  worker->prev->next = worker->next;
  worker->next->prev = worker->prev;
}

static grpc_pollset_worker* pop_front_worker(grpc_pollset* p) {
  if (pollset_has_workers(p)) {
    grpc_pollset_worker* w = p->root_worker.next;
    remove_worker(p, w);
    return w;
  } else {
    return nullptr;
  }
}

static void push_back_worker(grpc_pollset* p, grpc_pollset_worker* worker) {
  worker->next = &p->root_worker;
  worker->prev = worker->next->prev;
  worker->prev->next = worker->next->prev = worker;
}

static void push_front_worker(grpc_pollset* p, grpc_pollset_worker* worker) {
  worker->prev = &p->root_worker;
  worker->next = worker->prev->next;
  worker->prev->next = worker->next->prev = worker;
}
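
/* The worker list is a circular doubly-linked list with 'root_worker' as a
   sentinel. A sketch with two workers A and B, where B was pushed to the
   front last:

     root <-> B <-> A <-> root      (circular)

   pop_front_worker() would return B; push_back_worker() inserts just before
   the sentinel; an empty list has root.next == root.prev == &root, which is
   exactly what pollset_has_workers() tests. */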

/* p->mu must be held before calling this function */
static grpc_error* pollset_kick(grpc_pollset* p,
                                grpc_pollset_worker* specific_worker) {
  GPR_TIMER_SCOPE("pollset_kick", 0);
  grpc_error* error = GRPC_ERROR_NONE;
  GRPC_STATS_INC_POLLSET_KICK();
  const char* err_desc = "Kick Failure";
  grpc_pollset_worker* worker = specific_worker;
  if (worker != nullptr) {
    if (worker == GRPC_POLLSET_KICK_BROADCAST) {
      if (pollset_has_workers(p)) {
        GPR_TIMER_SCOPE("pollset_kick.broadcast", 0);
        for (worker = p->root_worker.next; worker != &p->root_worker;
             worker = worker->next) {
          if (gpr_tls_get(&g_current_thread_worker) != (intptr_t)worker) {
            append_error(&error, pollset_worker_kick(worker), err_desc);
          }
        }
      } else {
        p->kicked_without_pollers = true;
      }
    } else {
      GPR_TIMER_MARK("kicked_specifically", 0);
      if (gpr_tls_get(&g_current_thread_worker) != (intptr_t)worker) {
        append_error(&error, pollset_worker_kick(worker), err_desc);
      }
    }
  } else if (gpr_tls_get(&g_current_thread_pollset) != (intptr_t)p) {
    /* Since worker == NULL, it means that we can kick "any" worker on this
       pollset 'p'. If 'p' happens to be the same pollset this thread is
       currently polling (i.e. in the pollset_work() function), then there is
       no need to kick any other worker since the current thread can just
       absorb the kick. This is the reason why we enter this case only when
       g_current_thread_pollset is != p */

    GPR_TIMER_MARK("kick_anonymous", 0);
    worker = pop_front_worker(p);
    if (worker != nullptr) {
      GPR_TIMER_MARK("finally_kick", 0);
      push_back_worker(p, worker);
      append_error(&error, pollset_worker_kick(worker), err_desc);
    } else {
      GPR_TIMER_MARK("kicked_no_pollers", 0);
      p->kicked_without_pollers = true;
    }
  }

  GRPC_LOG_IF_ERROR("pollset_kick", GRPC_ERROR_REF(error));
  return error;
}

static void pollset_init(grpc_pollset* pollset, gpr_mu** mu) {
  gpr_mu_init(&pollset->po.mu);
  *mu = &pollset->po.mu;
  pollset->po.pi = nullptr;
#ifndef NDEBUG
  pollset->po.obj_type = POLL_OBJ_POLLSET;
#endif

  pollset->root_worker.next = pollset->root_worker.prev = &pollset->root_worker;
  pollset->kicked_without_pollers = false;

  pollset->shutting_down = false;
  pollset->finish_shutdown_called = false;
  pollset->shutdown_done = nullptr;
}

static int poll_deadline_to_millis_timeout(grpc_millis millis) {
  if (millis == GRPC_MILLIS_INF_FUTURE) return -1;
  grpc_millis delta = millis - grpc_core::ExecCtx::Get()->Now();
  if (delta > INT_MAX)
    return INT_MAX;
  else if (delta < 0)
    return 0;
  else
    return static_cast<int>(delta);
}
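
/* For example: a deadline 250ms in the future yields 250; a deadline that has
   already passed yields 0 (poll without blocking); and GRPC_MILLIS_INF_FUTURE
   yields -1, which epoll_pwait() interprets as "block indefinitely". */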

static void pollset_release_polling_island(grpc_pollset* ps,
                                           const char* reason) {
  if (ps->po.pi != nullptr) {
    PI_UNREF(ps->po.pi, reason);
  }
  ps->po.pi = nullptr;
}

static void finish_shutdown_locked(grpc_pollset* pollset) {
  /* The pollset cannot have any workers if we are at this stage */
  GPR_ASSERT(!pollset_has_workers(pollset));

  pollset->finish_shutdown_called = true;

  /* Release the ref and set pollset->po.pi to NULL */
  pollset_release_polling_island(pollset, "ps_shutdown");
  GRPC_CLOSURE_SCHED(pollset->shutdown_done, GRPC_ERROR_NONE);
}

/* pollset->po.mu lock must be held by the caller before calling this */
static void pollset_shutdown(grpc_pollset* pollset, grpc_closure* closure) {
  GPR_TIMER_SCOPE("pollset_shutdown", 0);
  GPR_ASSERT(!pollset->shutting_down);
  pollset->shutting_down = true;
  pollset->shutdown_done = closure;
  pollset_kick(pollset, GRPC_POLLSET_KICK_BROADCAST);

  /* If the pollset has any workers, we cannot call finish_shutdown_locked()
     because it would release the underlying polling island. In such a case, we
     let the last worker call finish_shutdown_locked() from pollset_work() */
  if (!pollset_has_workers(pollset)) {
    GPR_ASSERT(!pollset->finish_shutdown_called);
    GPR_TIMER_MARK("pollset_shutdown.finish_shutdown_locked", 0);
    finish_shutdown_locked(pollset);
  }
}

/* pollset_shutdown is guaranteed to be called before pollset_destroy. So other
 * than destroying the mutexes, there is nothing special that needs to be done
 * here */
static void pollset_destroy(grpc_pollset* pollset) {
  GPR_ASSERT(!pollset_has_workers(pollset));
  gpr_mu_destroy(&pollset->po.mu);
}

#define GRPC_EPOLL_MAX_EVENTS 100
/* Note: sig_mask contains the signal mask to use *during* epoll_wait() */
static void pollset_work_and_unlock(grpc_pollset* pollset,
                                    grpc_pollset_worker* worker, int timeout_ms,
                                    sigset_t* sig_mask, grpc_error** error) {
  GPR_TIMER_SCOPE("pollset_work_and_unlock", 0);
  struct epoll_event ep_ev[GRPC_EPOLL_MAX_EVENTS];
  int epoll_fd = -1;
  int ep_rv;
  polling_island* pi = nullptr;
  char* err_msg;
  const char* err_desc = "pollset_work_and_unlock";

  /* We need to get the epoll_fd to wait on. The epoll_fd is inside the
     latest polling island pointed to by pollset->po.pi.

     Since epoll_fd is immutable, we can read it without obtaining the polling
     island lock. There is however a possibility that the polling island (from
     which we got the epoll_fd) got merged with another island while we are
     in this function. This is still okay because in such a case, we will wake
     up right away from epoll_wait() and pick up the latest polling_island the
     next time this function (i.e. pollset_work_and_unlock()) is called */

  if (pollset->po.pi == nullptr) {
    pollset->po.pi = polling_island_create(nullptr, error);
    if (pollset->po.pi == nullptr) {
      return; /* Fatal error. We cannot continue */
    }

    PI_ADD_REF(pollset->po.pi, "ps");
    GRPC_POLLING_TRACE("pollset_work: pollset: %p created new pi: %p",
                       (void*)pollset, (void*)pollset->po.pi);
  }

  pi = polling_island_maybe_get_latest(pollset->po.pi);
  epoll_fd = pi->epoll_fd;

  /* Update pollset->po.pi, since the island pointed to by pollset->po.pi
     may be older than the one pointed to by pi */
  if (pollset->po.pi != pi) {
    /* Always do PI_ADD_REF before PI_UNREF because PI_UNREF may cause the
       polling island to be deleted */
    PI_ADD_REF(pi, "ps");
    PI_UNREF(pollset->po.pi, "ps");
    pollset->po.pi = pi;
  }

  /* Add an extra ref so that the island does not get destroyed (which means
     the epoll_fd won't be closed) while we are doing an epoll_wait() on the
     epoll_fd */
  PI_ADD_REF(pi, "ps_work");
  gpr_mu_unlock(&pollset->po.mu);

  gpr_atm_no_barrier_fetch_add(&pi->poller_count, 1);
  g_current_thread_polling_island = pi;

  GRPC_SCHEDULING_START_BLOCKING_REGION;
  GRPC_STATS_INC_SYSCALL_POLL();
  ep_rv =
      epoll_pwait(epoll_fd, ep_ev, GRPC_EPOLL_MAX_EVENTS, timeout_ms, sig_mask);
  GRPC_SCHEDULING_END_BLOCKING_REGION;
  if (ep_rv < 0) {
    if (errno != EINTR) {
      gpr_asprintf(&err_msg,
                   "epoll_wait() epoll fd: %d failed with error: %d (%s)",
                   epoll_fd, errno, strerror(errno));
      append_error(error, GRPC_OS_ERROR(errno, err_msg), err_desc);
    } else {
      /* We were interrupted. Save an iteration by doing a zero timeout
         epoll_wait to see if there are any other events of interest */
      GRPC_POLLING_TRACE("pollset_work: pollset: %p, worker: %p received kick",
                         (void*)pollset, (void*)worker);
      ep_rv = epoll_wait(epoll_fd, ep_ev, GRPC_EPOLL_MAX_EVENTS, 0);
    }
  }

#ifdef GRPC_TSAN
  /* See the definition of g_epoll_sync for more details */
  gpr_atm_acq_load(&g_epoll_sync);
#endif /* defined(GRPC_TSAN) */

  for (int i = 0; i < ep_rv; ++i) {
    void* data_ptr = ep_ev[i].data.ptr;
    if (data_ptr == &polling_island_wakeup_fd) {
      GRPC_POLLING_TRACE(
          "pollset_work: pollset: %p, worker: %p polling island (epoll_fd: "
          "%d) got merged",
          (void*)pollset, (void*)worker, epoll_fd);
      /* This means that our polling island is merged with a different
         island. We do not have to do anything here since the subsequent call
         to the function pollset_work_and_unlock() will pick up the correct
         epoll_fd */
    } else {
      grpc_fd* fd = reinterpret_cast<grpc_fd*>(
          reinterpret_cast<intptr_t>(data_ptr) & ~static_cast<intptr_t>(1));
      /* Mask with bit 0 itself (not its complement) to recover the track_err
         flag stored in the pointer's least significant bit */
      bool track_err = (reinterpret_cast<intptr_t>(data_ptr) &
                        static_cast<intptr_t>(1)) != 0;
      bool cancel = (ep_ev[i].events & EPOLLHUP) != 0;
      bool error = (ep_ev[i].events & EPOLLERR) != 0;
      bool read_ev = (ep_ev[i].events & (EPOLLIN | EPOLLPRI)) != 0;
      bool write_ev = (ep_ev[i].events & EPOLLOUT) != 0;
      bool err_fallback = error && !track_err;

      if (error && !err_fallback) {
        fd_has_errors(fd);
      }
      if (read_ev || cancel || err_fallback) {
        fd_become_readable(fd);
      }
      if (write_ev || cancel || err_fallback) {
        fd_become_writable(fd);
      }
    }
  }

  g_current_thread_polling_island = nullptr;
  gpr_atm_no_barrier_fetch_add(&pi->poller_count, -1);

  GPR_ASSERT(pi != nullptr);

  /* Before leaving, release the extra ref we added to the polling island. It
     is important to use "pi" here (i.e. our old copy of pollset->po.pi
     that we got before releasing the polling island lock). This is because
     the pollset->po.pi pointer might get updated in other parts of the
     code when there is an island merge while we are doing epoll_wait() above */
  PI_UNREF(pi, "ps_work");
}

/* pollset->po.mu lock must be held by the caller before calling this.
   The function pollset_work() may temporarily release the lock (pollset->po.mu)
   during the course of its execution but it will always re-acquire the lock and
   ensure that it is held by the time the function returns */
static grpc_error* pollset_work(grpc_pollset* pollset,
                                grpc_pollset_worker** worker_hdl,
                                grpc_millis deadline) {
  GPR_TIMER_SCOPE("pollset_work", 0);
  grpc_error* error = GRPC_ERROR_NONE;
  int timeout_ms = poll_deadline_to_millis_timeout(deadline);

  sigset_t new_mask;

  grpc_pollset_worker worker;
  worker.next = worker.prev = nullptr;
  worker.pt_id = pthread_self();
  gpr_atm_no_barrier_store(&worker.is_kicked, (gpr_atm)0);

  if (worker_hdl) *worker_hdl = &worker;

  gpr_tls_set(&g_current_thread_pollset, (intptr_t)pollset);
  gpr_tls_set(&g_current_thread_worker, (intptr_t)&worker);

  if (pollset->kicked_without_pollers) {
    /* If the pollset was kicked without pollers, pretend that the current
       worker got the kick and skip polling. A kick indicates that there is some
       work that needs attention, like an event on the completion queue or an
       alarm */
    GPR_TIMER_MARK("pollset_work.kicked_without_pollers", 0);
    pollset->kicked_without_pollers = false;
  } else if (!pollset->shutting_down) {
    /* We use the posix signal with number 'grpc_wakeup_signal' for waking up
       (i.e. 'kicking') a worker in the pollset. A 'kick' is a way to inform the
       worker that there is some pending work that needs immediate attention
       (like an event on the completion queue, or a polling island merge that
       results in a new epoll-fd to wait on) and that the worker should not
       spend time waiting in epoll_pwait().

       A worker can be kicked anytime from the point it is added to the pollset
       via push_front_worker() (or push_back_worker()) to the point it is
       removed via remove_worker().
       If the worker is kicked before/during its call to epoll_pwait(), it
       should immediately exit from epoll_wait(). If the worker is kicked after
       it returns from epoll_wait(), then nothing really needs to be done.

       To accomplish this, we mask 'grpc_wakeup_signal' on this thread at all
       times *except* when it is in epoll_pwait(). This way, the worker never
       misses acting on a kick */

    if (!g_initialized_sigmask) {
      sigemptyset(&new_mask);
      sigaddset(&new_mask, grpc_wakeup_signal);
      pthread_sigmask(SIG_BLOCK, &new_mask, &g_orig_sigmask);
      sigdelset(&g_orig_sigmask, grpc_wakeup_signal);
      g_initialized_sigmask = true;
      /* new_mask:       The new thread mask which blocks 'grpc_wakeup_signal'.
                         This is the mask used at all times *except during
                         epoll_wait()*
         g_orig_sigmask: The thread mask which allows 'grpc_wakeup_signal' and
                         this is the mask to use *during epoll_wait()*

         The new_mask is set on the worker before it is added to the pollset
         (i.e. before it can be kicked) */
    }

    push_front_worker(pollset, &worker); /* Add worker to pollset */

    pollset_work_and_unlock(pollset, &worker, timeout_ms, &g_orig_sigmask,
                            &error);
    grpc_core::ExecCtx::Get()->Flush();

    gpr_mu_lock(&pollset->po.mu);

    /* Note: There is no need to reset worker.is_kicked to 0 since we are no
       longer going to use this worker */
    remove_worker(pollset, &worker);
  }

  /* If we are the last worker on the pollset (i.e. pollset_has_workers() is
     false at this point) and the pollset is shutting down, we may have to
     finish the shutdown process by calling finish_shutdown_locked().
     See pollset_shutdown() for more details.

     Note: Continuing to access pollset here is safe; it is the caller's
     responsibility to not destroy a pollset when it has outstanding calls to
     pollset_work() */
  if (pollset->shutting_down && !pollset_has_workers(pollset) &&
      !pollset->finish_shutdown_called) {
    GPR_TIMER_MARK("pollset_work.finish_shutdown_locked", 0);
    finish_shutdown_locked(pollset);

    gpr_mu_unlock(&pollset->po.mu);
    grpc_core::ExecCtx::Get()->Flush();
    gpr_mu_lock(&pollset->po.mu);
  }

  if (worker_hdl) *worker_hdl = nullptr;

  gpr_tls_set(&g_current_thread_pollset, (intptr_t)0);
  gpr_tls_set(&g_current_thread_worker, (intptr_t)0);

  GRPC_LOG_IF_ERROR("pollset_work", GRPC_ERROR_REF(error));
  return error;
}
1391 
1392 static void add_poll_object(poll_obj* bag, poll_obj_type bag_type,
1393                             poll_obj* item, poll_obj_type item_type) {
1394   GPR_TIMER_SCOPE("add_poll_object", 0);
1395 
1396 #ifndef NDEBUG
1397   GPR_ASSERT(item->obj_type == item_type);
1398   GPR_ASSERT(bag->obj_type == bag_type);
1399 #endif
1400 
1401   grpc_error* error = GRPC_ERROR_NONE;
1402   polling_island* pi_new = nullptr;
1403 
1404   gpr_mu_lock(&bag->mu);
1405   gpr_mu_lock(&item->mu);
1406 
1407 retry:
1408   /*
1409    * 1) If item->pi and bag->pi are both non-NULL and equal, do nothing
1410    * 2) If item->pi and bag->pi are both NULL, create a new polling island (with
1411    *    a refcount of 2) and point item->pi and bag->pi to the new island
1412    * 3) If exactly one of item->pi or bag->pi is NULL, update it to point to
1413    *    the other's non-NULL pi
1414    * 4) Finally if item->pi and bag-pi are non-NULL and not-equal, merge the
1415    *    polling islands and update item->pi and bag->pi to point to the new
1416    *    island
1417    */

  /* Early out if we are trying to add an 'fd' to a 'bag' but the fd is
   * already orphaned */
  if (item_type == POLL_OBJ_FD && (FD_FROM_PO(item))->orphaned) {
    gpr_mu_unlock(&item->mu);
    gpr_mu_unlock(&bag->mu);
    return;
  }

  if (item->pi == bag->pi) {
    pi_new = item->pi;
    if (pi_new == nullptr) {
      /* GPR_ASSERT(item->pi == bag->pi == NULL) */

      /* If we are adding an fd to a bag (i.e. a pollset or pollset_set),
       * then we need to do some extra work to make TSAN happy */
      if (item_type == POLL_OBJ_FD) {
        /* Unlock before creating a new polling island: the polling island
           will create a workqueue which creates a file descriptor, and
           holding an fd lock here can eventually cause a loop to appear to
           TSAN (making it unhappy). We don't think it's a real loop (there's
           an epoch point where that loop possibility disappears), but the
           advantages of keeping TSAN happy outweigh any performance
           advantage we might have by keeping the lock held. */
        gpr_mu_unlock(&item->mu);
        pi_new = polling_island_create(FD_FROM_PO(item), &error);
        gpr_mu_lock(&item->mu);

        /* Need to reverify any assumptions made between the initial lock and
           getting to this branch: if they've changed, we need to throw away
           our work and figure things out again. */
        if (item->pi != nullptr) {
          GRPC_POLLING_TRACE(
              "add_poll_object: Raced creating new polling island. pi_new: %p "
              "(fd: %d, %s: %p)",
              (void*)pi_new, FD_FROM_PO(item)->fd, poll_obj_string(bag_type),
              (void*)bag);
          /* No need to lock 'pi_new' here since this is a new polling island
             and no one has a reference to it yet */
          polling_island_remove_all_fds_locked(pi_new, true, &error);

          /* Ref and unref so that the polling island gets deleted during
             unref */
          PI_ADD_REF(pi_new, "dance_of_destruction");
          PI_UNREF(pi_new, "dance_of_destruction");
          goto retry;
        }
      } else {
        pi_new = polling_island_create(nullptr, &error);
      }

      GRPC_POLLING_TRACE(
          "add_poll_object: Created new polling island. pi_new: %p (%s: %p, "
          "%s: %p)",
          (void*)pi_new, poll_obj_string(item_type), (void*)item,
          poll_obj_string(bag_type), (void*)bag);
    } else {
      GRPC_POLLING_TRACE(
          "add_poll_object: Same polling island. pi: %p (%s, %s)",
          (void*)pi_new, poll_obj_string(item_type),
          poll_obj_string(bag_type));
    }
  } else if (item->pi == nullptr) {
    /* GPR_ASSERT(bag->pi != NULL) */
    /* Make pi_new point to the latest pi */
    pi_new = polling_island_lock(bag->pi);

    if (item_type == POLL_OBJ_FD) {
      grpc_fd* fd = FD_FROM_PO(item);
      polling_island_add_fds_locked(pi_new, &fd, 1, true, &error);
    }

    gpr_mu_unlock(&pi_new->mu);
    GRPC_POLLING_TRACE(
        "add_poll_object: item->pi was NULL. pi_new: %p (item(%s): %p, "
        "bag(%s): %p)",
        (void*)pi_new, poll_obj_string(item_type), (void*)item,
        poll_obj_string(bag_type), (void*)bag);
  } else if (bag->pi == nullptr) {
    /* GPR_ASSERT(item->pi != NULL) */
    /* Make pi_new point to the latest pi */
    pi_new = polling_island_lock(item->pi);
    gpr_mu_unlock(&pi_new->mu);
    GRPC_POLLING_TRACE(
        "add_poll_object: bag->pi was NULL. pi_new: %p (item(%s): %p, "
        "bag(%s): %p)",
        (void*)pi_new, poll_obj_string(item_type), (void*)item,
        poll_obj_string(bag_type), (void*)bag);
  } else {
    pi_new = polling_island_merge(item->pi, bag->pi, &error);
    GRPC_POLLING_TRACE(
        "add_poll_object: polling islands merged. pi_new: %p (item(%s): %p, "
        "bag(%s): %p)",
        (void*)pi_new, poll_obj_string(item_type), (void*)item,
        poll_obj_string(bag_type), (void*)bag);
  }

  /* At this point, pi_new is the polling island that both item->pi and
     bag->pi MUST be pointing to */

  if (item->pi != pi_new) {
    PI_ADD_REF(pi_new, poll_obj_string(item_type));
    if (item->pi != nullptr) {
      PI_UNREF(item->pi, poll_obj_string(item_type));
    }
    item->pi = pi_new;
  }

  if (bag->pi != pi_new) {
    PI_ADD_REF(pi_new, poll_obj_string(bag_type));
    if (bag->pi != nullptr) {
      PI_UNREF(bag->pi, poll_obj_string(bag_type));
    }
    bag->pi = pi_new;
  }

  gpr_mu_unlock(&item->mu);
  gpr_mu_unlock(&bag->mu);

  GRPC_LOG_IF_ERROR("add_poll_object", error);
}

static void pollset_add_fd(grpc_pollset* pollset, grpc_fd* fd) {
  add_poll_object(&pollset->po, POLL_OBJ_POLLSET, &fd->po, POLL_OBJ_FD);
}

/*******************************************************************************
 * Pollset-set Definitions
 */

static grpc_pollset_set* pollset_set_create(void) {
  grpc_pollset_set* pss =
      static_cast<grpc_pollset_set*>(gpr_malloc(sizeof(*pss)));
  gpr_mu_init(&pss->po.mu);
  pss->po.pi = nullptr;
#ifndef NDEBUG
  pss->po.obj_type = POLL_OBJ_POLLSET_SET;
#endif
  return pss;
}

static void pollset_set_destroy(grpc_pollset_set* pss) {
  gpr_mu_destroy(&pss->po.mu);

  if (pss->po.pi != nullptr) {
    PI_UNREF(pss->po.pi, "pss_destroy");
  }

  gpr_free(pss);
}

static void pollset_set_add_fd(grpc_pollset_set* pss, grpc_fd* fd) {
  add_poll_object(&pss->po, POLL_OBJ_POLLSET_SET, &fd->po, POLL_OBJ_FD);
}

static void pollset_set_del_fd(grpc_pollset_set* pss, grpc_fd* fd) {
  /* Nothing to do */
}
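
/* Note: removing an fd from a pollset_set is deliberately a no-op. An fd
   only leaves its polling island when it is orphaned (see the 'orphaned'
   early-out in add_poll_object), so there is no per-bag removal to perform
   here. */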

static void pollset_set_add_pollset(grpc_pollset_set* pss, grpc_pollset* ps) {
  add_poll_object(&pss->po, POLL_OBJ_POLLSET_SET, &ps->po, POLL_OBJ_POLLSET);
}

static void pollset_set_del_pollset(grpc_pollset_set* pss, grpc_pollset* ps) {
  /* Nothing to do */
}

static void pollset_set_add_pollset_set(grpc_pollset_set* bag,
                                        grpc_pollset_set* item) {
  add_poll_object(&bag->po, POLL_OBJ_POLLSET_SET, &item->po,
                  POLL_OBJ_POLLSET_SET);
}

static void pollset_set_del_pollset_set(grpc_pollset_set* bag,
                                        grpc_pollset_set* item) {
  /* Nothing to do */
}
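
/* The pollset and pollset_set removals above are likewise no-ops: polling
   islands are only ever merged, never split, so membership is not unwound
   eagerly. The island reference held by a pollset_set is dropped in
   pollset_set_destroy(); a pollset presumably drops its reference in its
   own destroy path. */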

/*******************************************************************************
 * Test helper functions
 */

void* grpc_fd_get_polling_island(grpc_fd* fd) {
  polling_island* pi;

  gpr_mu_lock(&fd->po.mu);
  pi = fd->po.pi;
  gpr_mu_unlock(&fd->po.mu);

  return pi;
}

void* grpc_pollset_get_polling_island(grpc_pollset* ps) {
  polling_island* pi;

  gpr_mu_lock(&ps->po.mu);
  pi = ps->po.pi;
  gpr_mu_unlock(&ps->po.mu);

  return pi;
}

bool grpc_are_polling_islands_equal(void* p, void* q) {
  polling_island* p1 = static_cast<polling_island*>(p);
  polling_island* p2 = static_cast<polling_island*>(q);

  /* Note: polling_island_lock_pair() may change p1 and p2 to point to the
     latest polling islands in their respective linked lists */
  polling_island_lock_pair(&p1, &p2);
  polling_island_unlock_pair(p1, p2);

  return p1 == p2;
}
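
/* Comparing p1 and p2 after unlocking is meaningful because
   polling_island_lock_pair() resolves each pointer to the head of its merge
   chain, and islands are only ever merged, never split: once two resolved
   heads are equal, they stay equal. */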

/*******************************************************************************
 * Event engine binding
 */

static void shutdown_engine(void) {
  fd_global_shutdown();
  pollset_global_shutdown();
  polling_island_global_shutdown();
}

static const grpc_event_engine_vtable vtable = {
    sizeof(grpc_pollset),
    true,

    fd_create,
    fd_wrapped_fd,
    fd_orphan,
    fd_shutdown,
    fd_notify_on_read,
    fd_notify_on_write,
    fd_notify_on_error,
    fd_become_readable,
    fd_become_writable,
    fd_has_errors,
    fd_is_shutdown,

    pollset_init,
    pollset_shutdown,
    pollset_destroy,
    pollset_work,
    pollset_kick,
    pollset_add_fd,

    pollset_set_create,
    pollset_set_destroy,
    pollset_set_add_pollset,
    pollset_set_del_pollset,
    pollset_set_add_pollset_set,
    pollset_set_del_pollset_set,
    pollset_set_add_fd,
    pollset_set_del_fd,

    shutdown_engine,
};
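
/* Note: this is an aggregate initializer, so the order of the entries above
   must match the declaration order of the members of grpc_event_engine_vtable
   in src/core/lib/iomgr/ev_posix.h. */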

/* It is possible that GLIBC has epoll but the underlying kernel doesn't.
 * Create a dummy epoll_fd to make sure epoll support is actually available */
static bool is_epoll_available() {
  int fd = epoll_create1(EPOLL_CLOEXEC);
  if (fd < 0) {
    /* epoll_create1() reports its error via errno; log that rather than the
       returned fd (which is always -1 on failure) */
    gpr_log(GPR_ERROR,
            "epoll_create1 failed with error: %d (%s). Not using epoll "
            "polling engine",
            errno, strerror(errno));
    return false;
  }
  close(fd);
  return true;
}

const grpc_event_engine_vtable* grpc_init_epollsig_linux(
    bool explicit_request) {
  /* If the use of signals is disabled, we cannot use the epoll engine */
  if (is_grpc_wakeup_signal_initialized && grpc_wakeup_signal < 0) {
    gpr_log(GPR_ERROR, "Skipping epollsig because use of signals is disabled.");
    return nullptr;
  }

  if (!grpc_has_wakeup_fd()) {
    gpr_log(GPR_ERROR, "Skipping epollsig because of no wakeup fd.");
    return nullptr;
  }

  if (!is_epoll_available()) {
    gpr_log(GPR_ERROR, "Skipping epollsig because epoll is unavailable.");
    return nullptr;
  }

  if (!is_grpc_wakeup_signal_initialized) {
    if (explicit_request) {
      grpc_use_signal(SIGRTMIN + 6);
    } else {
      gpr_log(GPR_ERROR,
              "Skipping epollsig because the wakeup signal is uninitialized.");
      return nullptr;
    }
  }

  fd_global_init();

  if (!GRPC_LOG_IF_ERROR("pollset_global_init", pollset_global_init())) {
    return nullptr;
  }

  if (!GRPC_LOG_IF_ERROR("polling_island_global_init",
                         polling_island_global_init())) {
    return nullptr;
  }

  return &vtable;
}
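
/* Hypothetical usage sketch (not part of this file): an application that
   wants this engine would pick a wakeup signal before grpc_init() and
   request the 'epollsig' polling strategy, e.g.:

     grpc_use_signal(SIGRTMIN + 6);
     setenv("GRPC_POLL_STRATEGY", "epollsig", 1);
     grpc_init();

   The actual engine selection logic lives in ev_posix.cc. */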

#else /* defined(GRPC_LINUX_EPOLL_CREATE1) */
#if defined(GRPC_POSIX_SOCKET_EV_EPOLLSIG)
#include "src/core/lib/iomgr/ev_epollsig_linux.h"
/* If GRPC_LINUX_EPOLL_CREATE1 is not defined, epoll_create1 is not
   available. Return nullptr */
const grpc_event_engine_vtable* grpc_init_epollsig_linux(
    bool explicit_request) {
  return nullptr;
}
#endif /* defined(GRPC_POSIX_SOCKET_EV_EPOLLSIG) */

void grpc_use_signal(int signum) {}
#endif /* !defined(GRPC_LINUX_EPOLL_CREATE1) */