1 /*
2  *
3  * Copyright 2015-2016 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 #include <grpc/support/port_platform.h>
19 
20 #include "src/core/lib/surface/completion_queue.h"
21 
22 #include <inttypes.h>
23 #include <stdio.h>
24 #include <string.h>
25 
26 #include <grpc/support/alloc.h>
27 #include <grpc/support/atm.h>
28 #include <grpc/support/log.h>
29 #include <grpc/support/string_util.h>
30 #include <grpc/support/time.h>
31 
32 #include "src/core/lib/debug/stats.h"
33 #include "src/core/lib/gpr/spinlock.h"
34 #include "src/core/lib/gpr/string.h"
35 #include "src/core/lib/gpr/tls.h"
36 #include "src/core/lib/iomgr/pollset.h"
37 #include "src/core/lib/iomgr/timer.h"
38 #include "src/core/lib/profiling/timers.h"
39 #include "src/core/lib/surface/api_trace.h"
40 #include "src/core/lib/surface/call.h"
41 #include "src/core/lib/surface/event_string.h"
42 
43 grpc_core::TraceFlag grpc_trace_operation_failures(false, "op_failure");
44 grpc_core::DebugOnlyTraceFlag grpc_trace_pending_tags(false, "pending_tags");
45 grpc_core::DebugOnlyTraceFlag grpc_trace_cq_refcount(false, "cq_refcount");
46 
47 // Specifies a cq thread local cache.
48 // The first event that occurs on a thread
49 // with a cq cache will go into that cache, and
50 // will only be returned on the thread that initialized the cache.
51 // NOTE: Only one event will ever be cached.
52 GPR_TLS_DECL(g_cached_event);
53 GPR_TLS_DECL(g_cached_cq);
54 
55 typedef struct {
56   grpc_pollset_worker** worker;
57   void* tag;
58 } plucker;
59 
60 typedef struct {
61   bool can_get_pollset;
62   bool can_listen;
63   size_t (*size)(void);
64   void (*init)(grpc_pollset* pollset, gpr_mu** mu);
65   grpc_error* (*kick)(grpc_pollset* pollset,
66                       grpc_pollset_worker* specific_worker);
67   grpc_error* (*work)(grpc_pollset* pollset, grpc_pollset_worker** worker,
68                       grpc_millis deadline);
69   void (*shutdown)(grpc_pollset* pollset, grpc_closure* closure);
70   void (*destroy)(grpc_pollset* pollset);
71 } cq_poller_vtable;
72 
73 typedef struct non_polling_worker {
74   gpr_cv cv;
75   bool kicked;
76   struct non_polling_worker* next;
77   struct non_polling_worker* prev;
78 } non_polling_worker;
79 
80 typedef struct {
81   gpr_mu mu;
82   non_polling_worker* root;
83   grpc_closure* shutdown;
84 } non_polling_poller;
85 
non_polling_poller_size(void)86 static size_t non_polling_poller_size(void) {
87   return sizeof(non_polling_poller);
88 }
89 
non_polling_poller_init(grpc_pollset * pollset,gpr_mu ** mu)90 static void non_polling_poller_init(grpc_pollset* pollset, gpr_mu** mu) {
91   non_polling_poller* npp = reinterpret_cast<non_polling_poller*>(pollset);
92   gpr_mu_init(&npp->mu);
93   *mu = &npp->mu;
94 }
95 
non_polling_poller_destroy(grpc_pollset * pollset)96 static void non_polling_poller_destroy(grpc_pollset* pollset) {
97   non_polling_poller* npp = reinterpret_cast<non_polling_poller*>(pollset);
98   gpr_mu_destroy(&npp->mu);
99 }
100 
non_polling_poller_work(grpc_pollset * pollset,grpc_pollset_worker ** worker,grpc_millis deadline)101 static grpc_error* non_polling_poller_work(grpc_pollset* pollset,
102                                            grpc_pollset_worker** worker,
103                                            grpc_millis deadline) {
104   non_polling_poller* npp = reinterpret_cast<non_polling_poller*>(pollset);
105   if (npp->shutdown) return GRPC_ERROR_NONE;
106   non_polling_worker w;
107   gpr_cv_init(&w.cv);
108   if (worker != nullptr) *worker = reinterpret_cast<grpc_pollset_worker*>(&w);
109   if (npp->root == nullptr) {
110     npp->root = w.next = w.prev = &w;
111   } else {
112     w.next = npp->root;
113     w.prev = w.next->prev;
114     w.next->prev = w.prev->next = &w;
115   }
116   w.kicked = false;
117   gpr_timespec deadline_ts =
118       grpc_millis_to_timespec(deadline, GPR_CLOCK_MONOTONIC);
119   while (!npp->shutdown && !w.kicked &&
120          !gpr_cv_wait(&w.cv, &npp->mu, deadline_ts))
121     ;
122   grpc_core::ExecCtx::Get()->InvalidateNow();
123   if (&w == npp->root) {
124     npp->root = w.next;
125     if (&w == npp->root) {
126       if (npp->shutdown) {
127         GRPC_CLOSURE_SCHED(npp->shutdown, GRPC_ERROR_NONE);
128       }
129       npp->root = nullptr;
130     }
131   }
132   w.next->prev = w.prev;
133   w.prev->next = w.next;
134   gpr_cv_destroy(&w.cv);
135   if (worker != nullptr) *worker = nullptr;
136   return GRPC_ERROR_NONE;
137 }
138 
non_polling_poller_kick(grpc_pollset * pollset,grpc_pollset_worker * specific_worker)139 static grpc_error* non_polling_poller_kick(
140     grpc_pollset* pollset, grpc_pollset_worker* specific_worker) {
141   non_polling_poller* p = reinterpret_cast<non_polling_poller*>(pollset);
142   if (specific_worker == nullptr)
143     specific_worker = reinterpret_cast<grpc_pollset_worker*>(p->root);
144   if (specific_worker != nullptr) {
145     non_polling_worker* w =
146         reinterpret_cast<non_polling_worker*>(specific_worker);
147     if (!w->kicked) {
148       w->kicked = true;
149       gpr_cv_signal(&w->cv);
150     }
151   }
152   return GRPC_ERROR_NONE;
153 }
154 
non_polling_poller_shutdown(grpc_pollset * pollset,grpc_closure * closure)155 static void non_polling_poller_shutdown(grpc_pollset* pollset,
156                                         grpc_closure* closure) {
157   non_polling_poller* p = reinterpret_cast<non_polling_poller*>(pollset);
158   GPR_ASSERT(closure != nullptr);
159   p->shutdown = closure;
160   if (p->root == nullptr) {
161     GRPC_CLOSURE_SCHED(closure, GRPC_ERROR_NONE);
162   } else {
163     non_polling_worker* w = p->root;
164     do {
165       gpr_cv_signal(&w->cv);
166       w = w->next;
167     } while (w != p->root);
168   }
169 }
170 
171 static const cq_poller_vtable g_poller_vtable_by_poller_type[] = {
172     /* GRPC_CQ_DEFAULT_POLLING */
173     {true, true, grpc_pollset_size, grpc_pollset_init, grpc_pollset_kick,
174      grpc_pollset_work, grpc_pollset_shutdown, grpc_pollset_destroy},
175     /* GRPC_CQ_NON_LISTENING */
176     {true, false, grpc_pollset_size, grpc_pollset_init, grpc_pollset_kick,
177      grpc_pollset_work, grpc_pollset_shutdown, grpc_pollset_destroy},
178     /* GRPC_CQ_NON_POLLING */
179     {false, false, non_polling_poller_size, non_polling_poller_init,
180      non_polling_poller_kick, non_polling_poller_work,
181      non_polling_poller_shutdown, non_polling_poller_destroy},
182 };
183 
184 typedef struct cq_vtable {
185   grpc_cq_completion_type cq_completion_type;
186   size_t data_size;
187   void (*init)(void* data,
188                grpc_experimental_completion_queue_functor* shutdown_callback);
189   void (*shutdown)(grpc_completion_queue* cq);
190   void (*destroy)(void* data);
191   bool (*begin_op)(grpc_completion_queue* cq, void* tag);
192   void (*end_op)(grpc_completion_queue* cq, void* tag, grpc_error* error,
193                  void (*done)(void* done_arg, grpc_cq_completion* storage),
194                  void* done_arg, grpc_cq_completion* storage);
195   grpc_event (*next)(grpc_completion_queue* cq, gpr_timespec deadline,
196                      void* reserved);
197   grpc_event (*pluck)(grpc_completion_queue* cq, void* tag,
198                       gpr_timespec deadline, void* reserved);
199 } cq_vtable;
200 
201 /* Queue that holds the cq_completion_events. Internally uses gpr_mpscq queue
202  * (a lockfree multiproducer single consumer queue). It uses a queue_lock
203  * to support multiple consumers.
204  * Only used in completion queues whose completion_type is GRPC_CQ_NEXT */
205 typedef struct grpc_cq_event_queue {
206   /* Spinlock to serialize consumers i.e pop() operations */
207   gpr_spinlock queue_lock;
208 
209   gpr_mpscq queue;
210 
211   /* A lazy counter of number of items in the queue. This is NOT atomically
212      incremented/decremented along with push/pop operations and hence is only
213      eventually consistent */
214   gpr_atm num_queue_items;
215 } grpc_cq_event_queue;
216 
217 typedef struct cq_next_data {
218   /** Completed events for completion-queues of type GRPC_CQ_NEXT */
219   grpc_cq_event_queue queue;
220 
221   /** Counter of how many things have ever been queued on this completion queue
222       useful for avoiding locks to check the queue */
223   gpr_atm things_queued_ever;
224 
225   /* Number of outstanding events (+1 if not shut down) */
226   gpr_atm pending_events;
227 
228   /** 0 initially. 1 once we initiated shutdown */
229   bool shutdown_called;
230 } cq_next_data;
231 
232 typedef struct cq_pluck_data {
233   /** Completed events for completion-queues of type GRPC_CQ_PLUCK */
234   grpc_cq_completion completed_head;
235   grpc_cq_completion* completed_tail;
236 
237   /** Number of pending events (+1 if we're not shutdown) */
238   gpr_atm pending_events;
239 
240   /** Counter of how many things have ever been queued on this completion queue
241       useful for avoiding locks to check the queue */
242   gpr_atm things_queued_ever;
243 
244   /** 0 initially. 1 once we completed shutting */
245   /* TODO: (sreek) This is not needed since (shutdown == 1) if and only if
246    * (pending_events == 0). So consider removing this in future and use
247    * pending_events */
248   gpr_atm shutdown;
249 
250   /** 0 initially. 1 once we initiated shutdown */
251   bool shutdown_called;
252 
253   int num_pluckers;
254   plucker pluckers[GRPC_MAX_COMPLETION_QUEUE_PLUCKERS];
255 } cq_pluck_data;
256 
257 typedef struct cq_callback_data {
258   /** No actual completed events queue, unlike other types */
259 
260   /** Number of pending events (+1 if we're not shutdown) */
261   gpr_atm pending_events;
262 
263   /** Counter of how many things have ever been queued on this completion queue
264       useful for avoiding locks to check the queue */
265   gpr_atm things_queued_ever;
266 
267   /** 0 initially. 1 once we initiated shutdown */
268   bool shutdown_called;
269 
270   /** A callback that gets invoked when the CQ completes shutdown */
271   grpc_experimental_completion_queue_functor* shutdown_callback;
272 } cq_callback_data;
273 
274 /* Completion queue structure */
275 struct grpc_completion_queue {
276   /** Once owning_refs drops to zero, we will destroy the cq */
277   gpr_refcount owning_refs;
278 
279   gpr_mu* mu;
280 
281   const cq_vtable* vtable;
282   const cq_poller_vtable* poller_vtable;
283 
284 #ifndef NDEBUG
285   void** outstanding_tags;
286   size_t outstanding_tag_count;
287   size_t outstanding_tag_capacity;
288 #endif
289 
290   grpc_closure pollset_shutdown_done;
291   int num_polls;
292 };
293 
294 /* Forward declarations */
295 static void cq_finish_shutdown_next(grpc_completion_queue* cq);
296 static void cq_finish_shutdown_pluck(grpc_completion_queue* cq);
297 static void cq_finish_shutdown_callback(grpc_completion_queue* cq);
298 static void cq_shutdown_next(grpc_completion_queue* cq);
299 static void cq_shutdown_pluck(grpc_completion_queue* cq);
300 static void cq_shutdown_callback(grpc_completion_queue* cq);
301 
302 static bool cq_begin_op_for_next(grpc_completion_queue* cq, void* tag);
303 static bool cq_begin_op_for_pluck(grpc_completion_queue* cq, void* tag);
304 static bool cq_begin_op_for_callback(grpc_completion_queue* cq, void* tag);
305 
306 // A cq_end_op function is called when an operation on a given CQ with
307 // a given tag has completed. The storage argument is a reference to the
308 // space reserved for this completion as it is placed into the corresponding
309 // queue. The done argument is a callback that will be invoked when it is
310 // safe to free up that storage. The storage MUST NOT be freed until the
311 // done callback is invoked.
312 static void cq_end_op_for_next(grpc_completion_queue* cq, void* tag,
313                                grpc_error* error,
314                                void (*done)(void* done_arg,
315                                             grpc_cq_completion* storage),
316                                void* done_arg, grpc_cq_completion* storage);
317 
318 static void cq_end_op_for_pluck(grpc_completion_queue* cq, void* tag,
319                                 grpc_error* error,
320                                 void (*done)(void* done_arg,
321                                              grpc_cq_completion* storage),
322                                 void* done_arg, grpc_cq_completion* storage);
323 
324 static void cq_end_op_for_callback(grpc_completion_queue* cq, void* tag,
325                                    grpc_error* error,
326                                    void (*done)(void* done_arg,
327                                                 grpc_cq_completion* storage),
328                                    void* done_arg, grpc_cq_completion* storage);
329 
330 static grpc_event cq_next(grpc_completion_queue* cq, gpr_timespec deadline,
331                           void* reserved);
332 
333 static grpc_event cq_pluck(grpc_completion_queue* cq, void* tag,
334                            gpr_timespec deadline, void* reserved);
335 
336 // Note that cq_init_next and cq_init_pluck do not use the shutdown_callback
337 static void cq_init_next(
338     void* data, grpc_experimental_completion_queue_functor* shutdown_callback);
339 static void cq_init_pluck(
340     void* data, grpc_experimental_completion_queue_functor* shutdown_callback);
341 static void cq_init_callback(
342     void* data, grpc_experimental_completion_queue_functor* shutdown_callback);
343 static void cq_destroy_next(void* data);
344 static void cq_destroy_pluck(void* data);
345 static void cq_destroy_callback(void* data);
346 
347 /* Completion queue vtables based on the completion-type */
348 static const cq_vtable g_cq_vtable[] = {
349     /* GRPC_CQ_NEXT */
350     {GRPC_CQ_NEXT, sizeof(cq_next_data), cq_init_next, cq_shutdown_next,
351      cq_destroy_next, cq_begin_op_for_next, cq_end_op_for_next, cq_next,
352      nullptr},
353     /* GRPC_CQ_PLUCK */
354     {GRPC_CQ_PLUCK, sizeof(cq_pluck_data), cq_init_pluck, cq_shutdown_pluck,
355      cq_destroy_pluck, cq_begin_op_for_pluck, cq_end_op_for_pluck, nullptr,
356      cq_pluck},
357     /* GRPC_CQ_CALLBACK */
358     {GRPC_CQ_CALLBACK, sizeof(cq_callback_data), cq_init_callback,
359      cq_shutdown_callback, cq_destroy_callback, cq_begin_op_for_callback,
360      cq_end_op_for_callback, nullptr, nullptr},
361 };
362 
/* A completion queue is one allocation laid out as
 *   [grpc_completion_queue][per-type data: vtable->data_size bytes][pollset]
 * (see grpc_completion_queue_create_internal). These macros locate the two
 * trailing regions. The argument is parenthesized so that any pointer
 * expression, not only a plain identifier, expands correctly. */
#define DATA_FROM_CQ(cq) ((void*)((cq) + 1))
#define POLLSET_FROM_CQ(cq) \
  ((grpc_pollset*)((cq)->vtable->data_size + (char*)DATA_FROM_CQ(cq)))
366 
367 grpc_core::TraceFlag grpc_cq_pluck_trace(false, "queue_pluck");
368 
/* Logs an event returned to the application when API tracing is on.
 * GRPC_QUEUE_TIMEOUT events are logged only if grpc_cq_pluck_trace is also
 * enabled. Wrapped in do { } while (0) so that `MACRO(...);` is a single
 * statement and can be used safely as the body of an if/else. */
#define GRPC_SURFACE_TRACE_RETURNED_EVENT(cq, event)                          \
  do {                                                                        \
    if (grpc_api_trace.enabled() && (grpc_cq_pluck_trace.enabled() ||         \
                                     (event)->type != GRPC_QUEUE_TIMEOUT)) {  \
      char* _ev = grpc_event_string(event);                                   \
      gpr_log(GPR_INFO, "RETURN_EVENT[%p]: %s", cq, _ev);                     \
      gpr_free(_ev);                                                          \
    }                                                                         \
  } while (0)
376 
377 static void on_pollset_shutdown_done(void* cq, grpc_error* error);
378 
grpc_cq_global_init()379 void grpc_cq_global_init() {
380   gpr_tls_init(&g_cached_event);
381   gpr_tls_init(&g_cached_cq);
382 }
383 
grpc_completion_queue_thread_local_cache_init(grpc_completion_queue * cq)384 void grpc_completion_queue_thread_local_cache_init(grpc_completion_queue* cq) {
385   if ((grpc_completion_queue*)gpr_tls_get(&g_cached_cq) == nullptr) {
386     gpr_tls_set(&g_cached_event, (intptr_t)0);
387     gpr_tls_set(&g_cached_cq, (intptr_t)cq);
388   }
389 }
390 
grpc_completion_queue_thread_local_cache_flush(grpc_completion_queue * cq,void ** tag,int * ok)391 int grpc_completion_queue_thread_local_cache_flush(grpc_completion_queue* cq,
392                                                    void** tag, int* ok) {
393   grpc_cq_completion* storage =
394       (grpc_cq_completion*)gpr_tls_get(&g_cached_event);
395   int ret = 0;
396   if (storage != nullptr &&
397       (grpc_completion_queue*)gpr_tls_get(&g_cached_cq) == cq) {
398     *tag = storage->tag;
399     grpc_core::ExecCtx exec_ctx;
400     *ok = (storage->next & static_cast<uintptr_t>(1)) == 1;
401     storage->done(storage->done_arg, storage);
402     ret = 1;
403     cq_next_data* cqd = static_cast<cq_next_data*> DATA_FROM_CQ(cq);
404     if (gpr_atm_full_fetch_add(&cqd->pending_events, -1) == 1) {
405       GRPC_CQ_INTERNAL_REF(cq, "shutting_down");
406       gpr_mu_lock(cq->mu);
407       cq_finish_shutdown_next(cq);
408       gpr_mu_unlock(cq->mu);
409       GRPC_CQ_INTERNAL_UNREF(cq, "shutting_down");
410     }
411   }
412   gpr_tls_set(&g_cached_event, (intptr_t)0);
413   gpr_tls_set(&g_cached_cq, (intptr_t)0);
414 
415   return ret;
416 }
417 
cq_event_queue_init(grpc_cq_event_queue * q)418 static void cq_event_queue_init(grpc_cq_event_queue* q) {
419   gpr_mpscq_init(&q->queue);
420   q->queue_lock = GPR_SPINLOCK_INITIALIZER;
421   gpr_atm_no_barrier_store(&q->num_queue_items, 0);
422 }
423 
cq_event_queue_destroy(grpc_cq_event_queue * q)424 static void cq_event_queue_destroy(grpc_cq_event_queue* q) {
425   gpr_mpscq_destroy(&q->queue);
426 }
427 
cq_event_queue_push(grpc_cq_event_queue * q,grpc_cq_completion * c)428 static bool cq_event_queue_push(grpc_cq_event_queue* q, grpc_cq_completion* c) {
429   gpr_mpscq_push(&q->queue, reinterpret_cast<gpr_mpscq_node*>(c));
430   return gpr_atm_no_barrier_fetch_add(&q->num_queue_items, 1) == 0;
431 }
432 
cq_event_queue_pop(grpc_cq_event_queue * q)433 static grpc_cq_completion* cq_event_queue_pop(grpc_cq_event_queue* q) {
434   grpc_cq_completion* c = nullptr;
435 
436   if (gpr_spinlock_trylock(&q->queue_lock)) {
437     GRPC_STATS_INC_CQ_EV_QUEUE_TRYLOCK_SUCCESSES();
438 
439     bool is_empty = false;
440     c = reinterpret_cast<grpc_cq_completion*>(
441         gpr_mpscq_pop_and_check_end(&q->queue, &is_empty));
442     gpr_spinlock_unlock(&q->queue_lock);
443 
444     if (c == nullptr && !is_empty) {
445       GRPC_STATS_INC_CQ_EV_QUEUE_TRANSIENT_POP_FAILURES();
446     }
447   } else {
448     GRPC_STATS_INC_CQ_EV_QUEUE_TRYLOCK_FAILURES();
449   }
450 
451   if (c) {
452     gpr_atm_no_barrier_fetch_add(&q->num_queue_items, -1);
453   }
454 
455   return c;
456 }
457 
458 /* Note: The counter is not incremented/decremented atomically with push/pop.
459  * The count is only eventually consistent */
cq_event_queue_num_items(grpc_cq_event_queue * q)460 static long cq_event_queue_num_items(grpc_cq_event_queue* q) {
461   return static_cast<long>(gpr_atm_no_barrier_load(&q->num_queue_items));
462 }
463 
grpc_completion_queue_create_internal(grpc_cq_completion_type completion_type,grpc_cq_polling_type polling_type,grpc_experimental_completion_queue_functor * shutdown_callback)464 grpc_completion_queue* grpc_completion_queue_create_internal(
465     grpc_cq_completion_type completion_type, grpc_cq_polling_type polling_type,
466     grpc_experimental_completion_queue_functor* shutdown_callback) {
467   GPR_TIMER_SCOPE("grpc_completion_queue_create_internal", 0);
468 
469   grpc_completion_queue* cq;
470 
471   GRPC_API_TRACE(
472       "grpc_completion_queue_create_internal(completion_type=%d, "
473       "polling_type=%d)",
474       2, (completion_type, polling_type));
475 
476   const cq_vtable* vtable = &g_cq_vtable[completion_type];
477   const cq_poller_vtable* poller_vtable =
478       &g_poller_vtable_by_poller_type[polling_type];
479 
480   grpc_core::ExecCtx exec_ctx;
481   GRPC_STATS_INC_CQS_CREATED();
482 
483   cq = static_cast<grpc_completion_queue*>(
484       gpr_zalloc(sizeof(grpc_completion_queue) + vtable->data_size +
485                  poller_vtable->size()));
486 
487   cq->vtable = vtable;
488   cq->poller_vtable = poller_vtable;
489 
490   /* One for destroy(), one for pollset_shutdown */
491   gpr_ref_init(&cq->owning_refs, 2);
492 
493   poller_vtable->init(POLLSET_FROM_CQ(cq), &cq->mu);
494   vtable->init(DATA_FROM_CQ(cq), shutdown_callback);
495 
496   GRPC_CLOSURE_INIT(&cq->pollset_shutdown_done, on_pollset_shutdown_done, cq,
497                     grpc_schedule_on_exec_ctx);
498   return cq;
499 }
500 
cq_init_next(void * data,grpc_experimental_completion_queue_functor * shutdown_callback)501 static void cq_init_next(
502     void* data, grpc_experimental_completion_queue_functor* shutdown_callback) {
503   cq_next_data* cqd = static_cast<cq_next_data*>(data);
504   /* Initial count is dropped by grpc_completion_queue_shutdown */
505   gpr_atm_no_barrier_store(&cqd->pending_events, 1);
506   cqd->shutdown_called = false;
507   gpr_atm_no_barrier_store(&cqd->things_queued_ever, 0);
508   cq_event_queue_init(&cqd->queue);
509 }
510 
cq_destroy_next(void * data)511 static void cq_destroy_next(void* data) {
512   cq_next_data* cqd = static_cast<cq_next_data*>(data);
513   GPR_ASSERT(cq_event_queue_num_items(&cqd->queue) == 0);
514   cq_event_queue_destroy(&cqd->queue);
515 }
516 
cq_init_pluck(void * data,grpc_experimental_completion_queue_functor * shutdown_callback)517 static void cq_init_pluck(
518     void* data, grpc_experimental_completion_queue_functor* shutdown_callback) {
519   cq_pluck_data* cqd = static_cast<cq_pluck_data*>(data);
520   /* Initial count is dropped by grpc_completion_queue_shutdown */
521   gpr_atm_no_barrier_store(&cqd->pending_events, 1);
522   cqd->completed_tail = &cqd->completed_head;
523   cqd->completed_head.next = (uintptr_t)cqd->completed_tail;
524   gpr_atm_no_barrier_store(&cqd->shutdown, 0);
525   cqd->shutdown_called = false;
526   cqd->num_pluckers = 0;
527   gpr_atm_no_barrier_store(&cqd->things_queued_ever, 0);
528 }
529 
cq_destroy_pluck(void * data)530 static void cq_destroy_pluck(void* data) {
531   cq_pluck_data* cqd = static_cast<cq_pluck_data*>(data);
532   GPR_ASSERT(cqd->completed_head.next == (uintptr_t)&cqd->completed_head);
533 }
534 
cq_init_callback(void * data,grpc_experimental_completion_queue_functor * shutdown_callback)535 static void cq_init_callback(
536     void* data, grpc_experimental_completion_queue_functor* shutdown_callback) {
537   cq_callback_data* cqd = static_cast<cq_callback_data*>(data);
538   /* Initial count is dropped by grpc_completion_queue_shutdown */
539   gpr_atm_no_barrier_store(&cqd->pending_events, 1);
540   cqd->shutdown_called = false;
541   gpr_atm_no_barrier_store(&cqd->things_queued_ever, 0);
542   cqd->shutdown_callback = shutdown_callback;
543 }
544 
/* GRPC_CQ_CALLBACK state owns nothing that needs releasing. */
static void cq_destroy_callback(void* /*data*/) {}
546 
grpc_get_cq_completion_type(grpc_completion_queue * cq)547 grpc_cq_completion_type grpc_get_cq_completion_type(grpc_completion_queue* cq) {
548   return cq->vtable->cq_completion_type;
549 }
550 
grpc_get_cq_poll_num(grpc_completion_queue * cq)551 int grpc_get_cq_poll_num(grpc_completion_queue* cq) {
552   int cur_num_polls;
553   gpr_mu_lock(cq->mu);
554   cur_num_polls = cq->num_polls;
555   gpr_mu_unlock(cq->mu);
556   return cur_num_polls;
557 }
558 
559 #ifndef NDEBUG
grpc_cq_internal_ref(grpc_completion_queue * cq,const char * reason,const char * file,int line)560 void grpc_cq_internal_ref(grpc_completion_queue* cq, const char* reason,
561                           const char* file, int line) {
562   if (grpc_trace_cq_refcount.enabled()) {
563     gpr_atm val = gpr_atm_no_barrier_load(&cq->owning_refs.count);
564     gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG,
565             "CQ:%p   ref %" PRIdPTR " -> %" PRIdPTR " %s", cq, val, val + 1,
566             reason);
567   }
568 #else
569 void grpc_cq_internal_ref(grpc_completion_queue* cq) {
570 #endif
571   gpr_ref(&cq->owning_refs);
572 }
573 
574 static void on_pollset_shutdown_done(void* arg, grpc_error* error) {
575   grpc_completion_queue* cq = static_cast<grpc_completion_queue*>(arg);
576   GRPC_CQ_INTERNAL_UNREF(cq, "pollset_destroy");
577 }
578 
579 #ifndef NDEBUG
580 void grpc_cq_internal_unref(grpc_completion_queue* cq, const char* reason,
581                             const char* file, int line) {
582   if (grpc_trace_cq_refcount.enabled()) {
583     gpr_atm val = gpr_atm_no_barrier_load(&cq->owning_refs.count);
584     gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG,
585             "CQ:%p unref %" PRIdPTR " -> %" PRIdPTR " %s", cq, val, val - 1,
586             reason);
587   }
588 #else
589 void grpc_cq_internal_unref(grpc_completion_queue* cq) {
590 #endif
591   if (gpr_unref(&cq->owning_refs)) {
592     cq->vtable->destroy(DATA_FROM_CQ(cq));
593     cq->poller_vtable->destroy(POLLSET_FROM_CQ(cq));
594 #ifndef NDEBUG
595     gpr_free(cq->outstanding_tags);
596 #endif
597     gpr_free(cq);
598   }
599 }
600 
601 #ifndef NDEBUG
602 static void cq_check_tag(grpc_completion_queue* cq, void* tag, bool lock_cq) {
603   int found = 0;
604   if (lock_cq) {
605     gpr_mu_lock(cq->mu);
606   }
607 
608   for (int i = 0; i < static_cast<int>(cq->outstanding_tag_count); i++) {
609     if (cq->outstanding_tags[i] == tag) {
610       cq->outstanding_tag_count--;
611       GPR_SWAP(void*, cq->outstanding_tags[i],
612                cq->outstanding_tags[cq->outstanding_tag_count]);
613       found = 1;
614       break;
615     }
616   }
617 
618   if (lock_cq) {
619     gpr_mu_unlock(cq->mu);
620   }
621 
622   GPR_ASSERT(found);
623 }
624 #else
625 static void cq_check_tag(grpc_completion_queue* cq, void* tag, bool lock_cq) {}
626 #endif
627 
628 /* Atomically increments a counter only if the counter is not zero. Returns
629  * true if the increment was successful; false if the counter is zero */
630 static bool atm_inc_if_nonzero(gpr_atm* counter) {
631   while (true) {
632     gpr_atm count = gpr_atm_acq_load(counter);
633     /* If zero, we are done. If not, we must to a CAS (instead of an atomic
634      * increment) to maintain the contract: do not increment the counter if it
635      * is zero. */
636     if (count == 0) {
637       return false;
638     } else if (gpr_atm_full_cas(counter, count, count + 1)) {
639       break;
640     }
641   }
642 
643   return true;
644 }
645 
646 static bool cq_begin_op_for_next(grpc_completion_queue* cq, void* tag) {
647   cq_next_data* cqd = static_cast<cq_next_data*> DATA_FROM_CQ(cq);
648   return atm_inc_if_nonzero(&cqd->pending_events);
649 }
650 
651 static bool cq_begin_op_for_pluck(grpc_completion_queue* cq, void* tag) {
652   cq_pluck_data* cqd = static_cast<cq_pluck_data*> DATA_FROM_CQ(cq);
653   return atm_inc_if_nonzero(&cqd->pending_events);
654 }
655 
656 static bool cq_begin_op_for_callback(grpc_completion_queue* cq, void* tag) {
657   cq_callback_data* cqd = static_cast<cq_callback_data*> DATA_FROM_CQ(cq);
658   return atm_inc_if_nonzero(&cqd->pending_events);
659 }
660 
661 bool grpc_cq_begin_op(grpc_completion_queue* cq, void* tag) {
662 #ifndef NDEBUG
663   gpr_mu_lock(cq->mu);
664   if (cq->outstanding_tag_count == cq->outstanding_tag_capacity) {
665     cq->outstanding_tag_capacity = GPR_MAX(4, 2 * cq->outstanding_tag_capacity);
666     cq->outstanding_tags = static_cast<void**>(gpr_realloc(
667         cq->outstanding_tags,
668         sizeof(*cq->outstanding_tags) * cq->outstanding_tag_capacity));
669   }
670   cq->outstanding_tags[cq->outstanding_tag_count++] = tag;
671   gpr_mu_unlock(cq->mu);
672 #endif
673   return cq->vtable->begin_op(cq, tag);
674 }
675 
676 /* Queue a GRPC_OP_COMPLETED operation to a completion queue (with a
677  * completion
678  * type of GRPC_CQ_NEXT) */
679 static void cq_end_op_for_next(grpc_completion_queue* cq, void* tag,
680                                grpc_error* error,
681                                void (*done)(void* done_arg,
682                                             grpc_cq_completion* storage),
683                                void* done_arg, grpc_cq_completion* storage) {
684   GPR_TIMER_SCOPE("cq_end_op_for_next", 0);
685 
686   if (grpc_api_trace.enabled() ||
687       (grpc_trace_operation_failures.enabled() && error != GRPC_ERROR_NONE)) {
688     const char* errmsg = grpc_error_string(error);
689     GRPC_API_TRACE(
690         "cq_end_op_for_next(cq=%p, tag=%p, error=%s, "
691         "done=%p, done_arg=%p, storage=%p)",
692         6, (cq, tag, errmsg, done, done_arg, storage));
693     if (grpc_trace_operation_failures.enabled() && error != GRPC_ERROR_NONE) {
694       gpr_log(GPR_ERROR, "Operation failed: tag=%p, error=%s", tag, errmsg);
695     }
696   }
697   cq_next_data* cqd = static_cast<cq_next_data*> DATA_FROM_CQ(cq);
698   int is_success = (error == GRPC_ERROR_NONE);
699 
700   storage->tag = tag;
701   storage->done = done;
702   storage->done_arg = done_arg;
703   storage->next = static_cast<uintptr_t>(is_success);
704 
705   cq_check_tag(cq, tag, true); /* Used in debug builds only */
706 
707   if ((grpc_completion_queue*)gpr_tls_get(&g_cached_cq) == cq &&
708       (grpc_cq_completion*)gpr_tls_get(&g_cached_event) == nullptr) {
709     gpr_tls_set(&g_cached_event, (intptr_t)storage);
710   } else {
711     /* Add the completion to the queue */
712     bool is_first = cq_event_queue_push(&cqd->queue, storage);
713     gpr_atm_no_barrier_fetch_add(&cqd->things_queued_ever, 1);
714 
715     /* Since we do not hold the cq lock here, it is important to do an 'acquire'
716        load here (instead of a 'no_barrier' load) to match with the release
717        store
718        (done via gpr_atm_full_fetch_add(pending_events, -1)) in cq_shutdown_next
719        */
720     bool will_definitely_shutdown = gpr_atm_acq_load(&cqd->pending_events) == 1;
721 
722     if (!will_definitely_shutdown) {
723       /* Only kick if this is the first item queued */
724       if (is_first) {
725         gpr_mu_lock(cq->mu);
726         grpc_error* kick_error =
727             cq->poller_vtable->kick(POLLSET_FROM_CQ(cq), nullptr);
728         gpr_mu_unlock(cq->mu);
729 
730         if (kick_error != GRPC_ERROR_NONE) {
731           const char* msg = grpc_error_string(kick_error);
732           gpr_log(GPR_ERROR, "Kick failed: %s", msg);
733           GRPC_ERROR_UNREF(kick_error);
734         }
735       }
736       if (gpr_atm_full_fetch_add(&cqd->pending_events, -1) == 1) {
737         GRPC_CQ_INTERNAL_REF(cq, "shutting_down");
738         gpr_mu_lock(cq->mu);
739         cq_finish_shutdown_next(cq);
740         gpr_mu_unlock(cq->mu);
741         GRPC_CQ_INTERNAL_UNREF(cq, "shutting_down");
742       }
743     } else {
744       GRPC_CQ_INTERNAL_REF(cq, "shutting_down");
745       gpr_atm_rel_store(&cqd->pending_events, 0);
746       gpr_mu_lock(cq->mu);
747       cq_finish_shutdown_next(cq);
748       gpr_mu_unlock(cq->mu);
749       GRPC_CQ_INTERNAL_UNREF(cq, "shutting_down");
750     }
751   }
752 
753   GRPC_ERROR_UNREF(error);
754 }
755 
/* Queue a GRPC_OP_COMPLETE event on a completion queue of type
 * GRPC_CQ_PLUCK.
 *
 * 'storage' is appended (under cq->mu) to the cq's circular completed list,
 * with the operation's success bit packed into the low bit of storage->next.
 * If this drops pending_events to zero, the pluck-cq shutdown is finished;
 * otherwise the pollset is kicked, targeting the worker registered (via
 * add_plucker) for 'tag' if there is one, else passing a null worker.
 * Takes ownership of 'error' (it is unreffed before returning). */
static void cq_end_op_for_pluck(grpc_completion_queue* cq, void* tag,
                                grpc_error* error,
                                void (*done)(void* done_arg,
                                             grpc_cq_completion* storage),
                                void* done_arg, grpc_cq_completion* storage) {
  GPR_TIMER_SCOPE("cq_end_op_for_pluck", 0);

  cq_pluck_data* cqd = static_cast<cq_pluck_data*> DATA_FROM_CQ(cq);
  int is_success = (error == GRPC_ERROR_NONE);

  if (grpc_api_trace.enabled() ||
      (grpc_trace_operation_failures.enabled() && error != GRPC_ERROR_NONE)) {
    const char* errmsg = grpc_error_string(error);
    GRPC_API_TRACE(
        "cq_end_op_for_pluck(cq=%p, tag=%p, error=%s, "
        "done=%p, done_arg=%p, storage=%p)",
        6, (cq, tag, errmsg, done, done_arg, storage));
    if (grpc_trace_operation_failures.enabled() && error != GRPC_ERROR_NONE) {
      gpr_log(GPR_ERROR, "Operation failed: tag=%p, error=%s", tag, errmsg);
    }
  }

  storage->tag = tag;
  storage->done = done;
  storage->done_arg = done_arg;
  /* The new entry becomes the tail, so it points back to the list head; the
   * low bit of 'next' carries this operation's success flag. */
  storage->next =
      ((uintptr_t)&cqd->completed_head) | (static_cast<uintptr_t>(is_success));

  gpr_mu_lock(cq->mu);
  cq_check_tag(cq, tag, false); /* Used in debug builds only */

  /* Add to the list of completions */
  gpr_atm_no_barrier_fetch_add(&cqd->things_queued_ever, 1);
  /* Link after the current tail while preserving the old tail's own success
   * bit (low bit of its 'next'). */
  cqd->completed_tail->next =
      ((uintptr_t)storage) | (1u & cqd->completed_tail->next);
  cqd->completed_tail = storage;

  if (gpr_atm_full_fetch_add(&cqd->pending_events, -1) == 1) {
    /* That was the last pending event: finish shutdown while still holding
     * cq->mu, matching cq_shutdown_pluck, which also calls it under the lock. */
    cq_finish_shutdown_pluck(cq);
    gpr_mu_unlock(cq->mu);
  } else {
    /* Find the worker (if any) currently plucking this tag so the kick can
     * target it; otherwise kick with a null worker. */
    grpc_pollset_worker* pluck_worker = nullptr;
    for (int i = 0; i < cqd->num_pluckers; i++) {
      if (cqd->pluckers[i].tag == tag) {
        pluck_worker = *cqd->pluckers[i].worker;
        break;
      }
    }

    grpc_error* kick_error =
        cq->poller_vtable->kick(POLLSET_FROM_CQ(cq), pluck_worker);

    gpr_mu_unlock(cq->mu);

    /* A failed kick is logged and released; the completion stays queued. */
    if (kick_error != GRPC_ERROR_NONE) {
      const char* msg = grpc_error_string(kick_error);
      gpr_log(GPR_ERROR, "Kick failed: %s", msg);

      GRPC_ERROR_UNREF(kick_error);
    }
  }

  GRPC_ERROR_UNREF(error);
}
823 
824 /* Complete an event on a completion queue of type GRPC_CQ_CALLBACK */
825 static void cq_end_op_for_callback(
826     grpc_completion_queue* cq, void* tag, grpc_error* error,
827     void (*done)(void* done_arg, grpc_cq_completion* storage), void* done_arg,
828     grpc_cq_completion* storage) {
829   GPR_TIMER_SCOPE("cq_end_op_for_callback", 0);
830 
831   cq_callback_data* cqd = static_cast<cq_callback_data*> DATA_FROM_CQ(cq);
832   bool is_success = (error == GRPC_ERROR_NONE);
833 
834   if (grpc_api_trace.enabled() ||
835       (grpc_trace_operation_failures.enabled() && error != GRPC_ERROR_NONE)) {
836     const char* errmsg = grpc_error_string(error);
837     GRPC_API_TRACE(
838         "cq_end_op_for_callback(cq=%p, tag=%p, error=%s, "
839         "done=%p, done_arg=%p, storage=%p)",
840         6, (cq, tag, errmsg, done, done_arg, storage));
841     if (grpc_trace_operation_failures.enabled() && error != GRPC_ERROR_NONE) {
842       gpr_log(GPR_ERROR, "Operation failed: tag=%p, error=%s", tag, errmsg);
843     }
844   }
845 
846   // The callback-based CQ isn't really a queue at all and thus has no need
847   // for reserved storage. Invoke the done callback right away to release it.
848   done(done_arg, storage);
849 
850   gpr_mu_lock(cq->mu);
851   cq_check_tag(cq, tag, false); /* Used in debug builds only */
852 
853   gpr_atm_no_barrier_fetch_add(&cqd->things_queued_ever, 1);
854   if (gpr_atm_full_fetch_add(&cqd->pending_events, -1) == 1) {
855     cq_finish_shutdown_callback(cq);
856     gpr_mu_unlock(cq->mu);
857   } else {
858     gpr_mu_unlock(cq->mu);
859   }
860 
861   GRPC_ERROR_UNREF(error);
862 
863   auto* functor = static_cast<grpc_experimental_completion_queue_functor*>(tag);
864   (*functor->functor_run)(functor, is_success);
865 }
866 
867 void grpc_cq_end_op(grpc_completion_queue* cq, void* tag, grpc_error* error,
868                     void (*done)(void* done_arg, grpc_cq_completion* storage),
869                     void* done_arg, grpc_cq_completion* storage) {
870   cq->vtable->end_op(cq, tag, error, done, done_arg, storage);
871 }
872 
/* State shared between cq_next()/cq_pluck() and their ExecCtx subclasses
 * (ExecCtxNext / ExecCtxPluck). CheckReadyToFinish() reads and updates it to
 * decide whether the current exec_ctx may stop early.
 * NOTE: both callers build this with positional aggregate initialization, so
 * the field order matters. */
typedef struct {
  /* Last observed value of the cq's things_queued_ever counter; a change
   * means new completions were queued since the previous check. */
  gpr_atm last_seen_things_queued_ever;
  /* Completion queue being waited on. */
  grpc_completion_queue* cq;
  /* Deadline of the wait, compared against ExecCtx::Now(). */
  grpc_millis deadline;
  /* Completion removed from the cq by CheckReadyToFinish(); the waiting
   * caller must consume it and reset it to nullptr. */
  grpc_cq_completion* stolen_completion;
  void* tag; /* for pluck */
  /* True until the caller finishes its first polling iteration; the deadline
   * is not enforced before then. */
  bool first_loop;
} cq_is_finished_arg;
881 
882 class ExecCtxNext : public grpc_core::ExecCtx {
883  public:
884   ExecCtxNext(void* arg) : ExecCtx(0), check_ready_to_finish_arg_(arg) {}
885 
886   bool CheckReadyToFinish() override {
887     cq_is_finished_arg* a =
888         static_cast<cq_is_finished_arg*>(check_ready_to_finish_arg_);
889     grpc_completion_queue* cq = a->cq;
890     cq_next_data* cqd = static_cast<cq_next_data*> DATA_FROM_CQ(cq);
891     GPR_ASSERT(a->stolen_completion == nullptr);
892 
893     gpr_atm current_last_seen_things_queued_ever =
894         gpr_atm_no_barrier_load(&cqd->things_queued_ever);
895 
896     if (current_last_seen_things_queued_ever !=
897         a->last_seen_things_queued_ever) {
898       a->last_seen_things_queued_ever =
899           gpr_atm_no_barrier_load(&cqd->things_queued_ever);
900 
901       /* Pop a cq_completion from the queue. Returns NULL if the queue is empty
902        * might return NULL in some cases even if the queue is not empty; but
903        * that
904        * is ok and doesn't affect correctness. Might effect the tail latencies a
905        * bit) */
906       a->stolen_completion = cq_event_queue_pop(&cqd->queue);
907       if (a->stolen_completion != nullptr) {
908         return true;
909       }
910     }
911     return !a->first_loop && a->deadline < grpc_core::ExecCtx::Get()->Now();
912   }
913 
914  private:
915   void* check_ready_to_finish_arg_;
916 };
917 
918 #ifndef NDEBUG
919 static void dump_pending_tags(grpc_completion_queue* cq) {
920   if (!grpc_trace_pending_tags.enabled()) return;
921 
922   gpr_strvec v;
923   gpr_strvec_init(&v);
924   gpr_strvec_add(&v, gpr_strdup("PENDING TAGS:"));
925   gpr_mu_lock(cq->mu);
926   for (size_t i = 0; i < cq->outstanding_tag_count; i++) {
927     char* s;
928     gpr_asprintf(&s, " %p", cq->outstanding_tags[i]);
929     gpr_strvec_add(&v, s);
930   }
931   gpr_mu_unlock(cq->mu);
932   char* out = gpr_strvec_flatten(&v, nullptr);
933   gpr_strvec_destroy(&v);
934   gpr_log(GPR_DEBUG, "%s", out);
935   gpr_free(out);
936 }
937 #else
938 static void dump_pending_tags(grpc_completion_queue* cq) {}
939 #endif
940 
941 static grpc_event cq_next(grpc_completion_queue* cq, gpr_timespec deadline,
942                           void* reserved) {
943   GPR_TIMER_SCOPE("grpc_completion_queue_next", 0);
944 
945   grpc_event ret;
946   cq_next_data* cqd = static_cast<cq_next_data*> DATA_FROM_CQ(cq);
947 
948   GRPC_API_TRACE(
949       "grpc_completion_queue_next("
950       "cq=%p, "
951       "deadline=gpr_timespec { tv_sec: %" PRId64
952       ", tv_nsec: %d, clock_type: %d }, "
953       "reserved=%p)",
954       5,
955       (cq, deadline.tv_sec, deadline.tv_nsec, (int)deadline.clock_type,
956        reserved));
957   GPR_ASSERT(!reserved);
958 
959   dump_pending_tags(cq);
960 
961   GRPC_CQ_INTERNAL_REF(cq, "next");
962 
963   grpc_millis deadline_millis = grpc_timespec_to_millis_round_up(deadline);
964   cq_is_finished_arg is_finished_arg = {
965       gpr_atm_no_barrier_load(&cqd->things_queued_ever),
966       cq,
967       deadline_millis,
968       nullptr,
969       nullptr,
970       true};
971   ExecCtxNext exec_ctx(&is_finished_arg);
972   for (;;) {
973     grpc_millis iteration_deadline = deadline_millis;
974 
975     if (is_finished_arg.stolen_completion != nullptr) {
976       grpc_cq_completion* c = is_finished_arg.stolen_completion;
977       is_finished_arg.stolen_completion = nullptr;
978       ret.type = GRPC_OP_COMPLETE;
979       ret.success = c->next & 1u;
980       ret.tag = c->tag;
981       c->done(c->done_arg, c);
982       break;
983     }
984 
985     grpc_cq_completion* c = cq_event_queue_pop(&cqd->queue);
986 
987     if (c != nullptr) {
988       ret.type = GRPC_OP_COMPLETE;
989       ret.success = c->next & 1u;
990       ret.tag = c->tag;
991       c->done(c->done_arg, c);
992       break;
993     } else {
994       /* If c == NULL it means either the queue is empty OR in an transient
995          inconsistent state. If it is the latter, we shold do a 0-timeout poll
996          so that the thread comes back quickly from poll to make a second
997          attempt at popping. Not doing this can potentially deadlock this
998          thread forever (if the deadline is infinity) */
999       if (cq_event_queue_num_items(&cqd->queue) > 0) {
1000         iteration_deadline = 0;
1001       }
1002     }
1003 
1004     if (gpr_atm_acq_load(&cqd->pending_events) == 0) {
1005       /* Before returning, check if the queue has any items left over (since
1006          gpr_mpscq_pop() can sometimes return NULL even if the queue is not
1007          empty. If so, keep retrying but do not return GRPC_QUEUE_SHUTDOWN */
1008       if (cq_event_queue_num_items(&cqd->queue) > 0) {
1009         /* Go to the beginning of the loop. No point doing a poll because
1010            (cq->shutdown == true) is only possible when there is no pending
1011            work (i.e cq->pending_events == 0) and any outstanding completion
1012            events should have already been queued on this cq */
1013         continue;
1014       }
1015 
1016       memset(&ret, 0, sizeof(ret));
1017       ret.type = GRPC_QUEUE_SHUTDOWN;
1018       break;
1019     }
1020 
1021     if (!is_finished_arg.first_loop &&
1022         grpc_core::ExecCtx::Get()->Now() >= deadline_millis) {
1023       memset(&ret, 0, sizeof(ret));
1024       ret.type = GRPC_QUEUE_TIMEOUT;
1025       dump_pending_tags(cq);
1026       break;
1027     }
1028 
1029     /* The main polling work happens in grpc_pollset_work */
1030     gpr_mu_lock(cq->mu);
1031     cq->num_polls++;
1032     grpc_error* err = cq->poller_vtable->work(POLLSET_FROM_CQ(cq), nullptr,
1033                                               iteration_deadline);
1034     gpr_mu_unlock(cq->mu);
1035 
1036     if (err != GRPC_ERROR_NONE) {
1037       const char* msg = grpc_error_string(err);
1038       gpr_log(GPR_ERROR, "Completion queue next failed: %s", msg);
1039 
1040       GRPC_ERROR_UNREF(err);
1041       memset(&ret, 0, sizeof(ret));
1042       ret.type = GRPC_QUEUE_TIMEOUT;
1043       dump_pending_tags(cq);
1044       break;
1045     }
1046     is_finished_arg.first_loop = false;
1047   }
1048 
1049   if (cq_event_queue_num_items(&cqd->queue) > 0 &&
1050       gpr_atm_acq_load(&cqd->pending_events) > 0) {
1051     gpr_mu_lock(cq->mu);
1052     cq->poller_vtable->kick(POLLSET_FROM_CQ(cq), nullptr);
1053     gpr_mu_unlock(cq->mu);
1054   }
1055 
1056   GRPC_SURFACE_TRACE_RETURNED_EVENT(cq, &ret);
1057   GRPC_CQ_INTERNAL_UNREF(cq, "next");
1058 
1059   GPR_ASSERT(is_finished_arg.stolen_completion == nullptr);
1060 
1061   return ret;
1062 }
1063 
1064 /* Finishes the completion queue shutdown. This means that there are no more
1065    completion events / tags expected from the completion queue
1066    - Must be called under completion queue lock
1067    - Must be called only once in completion queue's lifetime
1068    - grpc_completion_queue_shutdown() MUST have been called before calling
1069    this function */
1070 static void cq_finish_shutdown_next(grpc_completion_queue* cq) {
1071   cq_next_data* cqd = static_cast<cq_next_data*> DATA_FROM_CQ(cq);
1072 
1073   GPR_ASSERT(cqd->shutdown_called);
1074   GPR_ASSERT(gpr_atm_no_barrier_load(&cqd->pending_events) == 0);
1075 
1076   cq->poller_vtable->shutdown(POLLSET_FROM_CQ(cq), &cq->pollset_shutdown_done);
1077 }
1078 
1079 static void cq_shutdown_next(grpc_completion_queue* cq) {
1080   cq_next_data* cqd = static_cast<cq_next_data*> DATA_FROM_CQ(cq);
1081 
1082   /* Need an extra ref for cq here because:
1083    * We call cq_finish_shutdown_next() below, that would call pollset shutdown.
1084    * Pollset shutdown decrements the cq ref count which can potentially destroy
1085    * the cq (if that happens to be the last ref).
1086    * Creating an extra ref here prevents the cq from getting destroyed while
1087    * this function is still active */
1088   GRPC_CQ_INTERNAL_REF(cq, "shutting_down");
1089   gpr_mu_lock(cq->mu);
1090   if (cqd->shutdown_called) {
1091     gpr_mu_unlock(cq->mu);
1092     GRPC_CQ_INTERNAL_UNREF(cq, "shutting_down");
1093     return;
1094   }
1095   cqd->shutdown_called = true;
1096   /* Doing a full_fetch_add (i.e acq/release) here to match with
1097    * cq_begin_op_for_next and and cq_end_op_for_next functions which read/write
1098    * on this counter without necessarily holding a lock on cq */
1099   if (gpr_atm_full_fetch_add(&cqd->pending_events, -1) == 1) {
1100     cq_finish_shutdown_next(cq);
1101   }
1102   gpr_mu_unlock(cq->mu);
1103   GRPC_CQ_INTERNAL_UNREF(cq, "shutting_down");
1104 }
1105 
1106 grpc_event grpc_completion_queue_next(grpc_completion_queue* cq,
1107                                       gpr_timespec deadline, void* reserved) {
1108   return cq->vtable->next(cq, deadline, reserved);
1109 }
1110 
1111 static int add_plucker(grpc_completion_queue* cq, void* tag,
1112                        grpc_pollset_worker** worker) {
1113   cq_pluck_data* cqd = static_cast<cq_pluck_data*> DATA_FROM_CQ(cq);
1114   if (cqd->num_pluckers == GRPC_MAX_COMPLETION_QUEUE_PLUCKERS) {
1115     return 0;
1116   }
1117   cqd->pluckers[cqd->num_pluckers].tag = tag;
1118   cqd->pluckers[cqd->num_pluckers].worker = worker;
1119   cqd->num_pluckers++;
1120   return 1;
1121 }
1122 
1123 static void del_plucker(grpc_completion_queue* cq, void* tag,
1124                         grpc_pollset_worker** worker) {
1125   cq_pluck_data* cqd = static_cast<cq_pluck_data*> DATA_FROM_CQ(cq);
1126   for (int i = 0; i < cqd->num_pluckers; i++) {
1127     if (cqd->pluckers[i].tag == tag && cqd->pluckers[i].worker == worker) {
1128       cqd->num_pluckers--;
1129       GPR_SWAP(plucker, cqd->pluckers[i], cqd->pluckers[cqd->num_pluckers]);
1130       return;
1131     }
1132   }
1133   GPR_UNREACHABLE_CODE(return );
1134 }
1135 
/* ExecCtx used by cq_pluck(). CheckReadyToFinish() lets the exec_ctx stop
 * early. If things_queued_ever changed since the last check, it scans the
 * completed list (under cq->mu) for a completion whose tag equals a->tag,
 * unlinks it, and parks it in a->stolen_completion for cq_pluck to return.
 * Otherwise it reports "finished" once the deadline has passed after the
 * first polling iteration. */
class ExecCtxPluck : public grpc_core::ExecCtx {
 public:
  ExecCtxPluck(void* arg) : ExecCtx(0), check_ready_to_finish_arg_(arg) {}

  bool CheckReadyToFinish() override {
    cq_is_finished_arg* a =
        static_cast<cq_is_finished_arg*>(check_ready_to_finish_arg_);
    grpc_completion_queue* cq = a->cq;
    cq_pluck_data* cqd = static_cast<cq_pluck_data*> DATA_FROM_CQ(cq);

    GPR_ASSERT(a->stolen_completion == nullptr);
    gpr_atm current_last_seen_things_queued_ever =
        gpr_atm_no_barrier_load(&cqd->things_queued_ever);
    /* Only rescan the list when something new was queued. */
    if (current_last_seen_things_queued_ever !=
        a->last_seen_things_queued_ever) {
      gpr_mu_lock(cq->mu);
      a->last_seen_things_queued_ever =
          gpr_atm_no_barrier_load(&cqd->things_queued_ever);
      grpc_cq_completion* c;
      grpc_cq_completion* prev = &cqd->completed_head;
      /* Walk the circular list back to completed_head. The low bit of each
       * 'next' is a success flag, so it is masked off to get the pointer. */
      while ((c = (grpc_cq_completion*)(prev->next &
                                        ~static_cast<uintptr_t>(1))) !=
             &cqd->completed_head) {
        if (c->tag == a->tag) {
          /* Unlink c while keeping prev's own success bit intact. */
          prev->next = (prev->next & static_cast<uintptr_t>(1)) |
                       (c->next & ~static_cast<uintptr_t>(1));
          if (c == cqd->completed_tail) {
            cqd->completed_tail = prev;
          }
          gpr_mu_unlock(cq->mu);
          a->stolen_completion = c;
          return true;
        }
        prev = c;
      }
      gpr_mu_unlock(cq->mu);
    }
    return !a->first_loop && a->deadline < grpc_core::ExecCtx::Get()->Now();
  }

 private:
  void* check_ready_to_finish_arg_;
};
1179 
/* Implements grpc_completion_queue_pluck() for GRPC_CQ_PLUCK queues: waits
 * for the completion whose tag is 'tag'.
 *
 * Returns GRPC_OP_COMPLETE with that tag once it is found,
 * GRPC_QUEUE_SHUTDOWN once the cq's shutdown flag is set, or
 * GRPC_QUEUE_TIMEOUT if the deadline passes (checked only after the first
 * poll), too many threads are already plucking
 * (GRPC_MAX_COMPLETION_QUEUE_PLUCKERS), or polling fails. 'reserved' must be
 * NULL. cq->mu is held across the loop and released on every exit path. */
static grpc_event cq_pluck(grpc_completion_queue* cq, void* tag,
                           gpr_timespec deadline, void* reserved) {
  GPR_TIMER_SCOPE("grpc_completion_queue_pluck", 0);

  grpc_event ret;
  grpc_cq_completion* c;
  grpc_cq_completion* prev;
  grpc_pollset_worker* worker = nullptr;
  cq_pluck_data* cqd = static_cast<cq_pluck_data*> DATA_FROM_CQ(cq);

  if (grpc_cq_pluck_trace.enabled()) {
    GRPC_API_TRACE(
        "grpc_completion_queue_pluck("
        "cq=%p, tag=%p, "
        "deadline=gpr_timespec { tv_sec: %" PRId64
        ", tv_nsec: %d, clock_type: %d }, "
        "reserved=%p)",
        6,
        (cq, tag, deadline.tv_sec, deadline.tv_nsec, (int)deadline.clock_type,
         reserved));
  }
  GPR_ASSERT(!reserved);

  dump_pending_tags(cq);

  GRPC_CQ_INTERNAL_REF(cq, "pluck");
  gpr_mu_lock(cq->mu);
  grpc_millis deadline_millis = grpc_timespec_to_millis_round_up(deadline);
  cq_is_finished_arg is_finished_arg = {
      gpr_atm_no_barrier_load(&cqd->things_queued_ever),
      cq,
      deadline_millis,
      nullptr,
      tag,
      true};
  ExecCtxPluck exec_ctx(&is_finished_arg);
  for (;;) {
    /* ExecCtxPluck::CheckReadyToFinish may already have unlinked our
     * completion from the list. */
    if (is_finished_arg.stolen_completion != nullptr) {
      gpr_mu_unlock(cq->mu);
      c = is_finished_arg.stolen_completion;
      is_finished_arg.stolen_completion = nullptr;
      ret.type = GRPC_OP_COMPLETE;
      ret.success = c->next & 1u;
      ret.tag = c->tag;
      c->done(c->done_arg, c);
      break;
    }
    /* Scan the circular completed list for our tag; the low bit of 'next'
     * is the success flag and is masked off to get the pointer. */
    prev = &cqd->completed_head;
    while (
        (c = (grpc_cq_completion*)(prev->next & ~static_cast<uintptr_t>(1))) !=
        &cqd->completed_head) {
      if (c->tag == tag) {
        prev->next = (prev->next & static_cast<uintptr_t>(1)) |
                     (c->next & ~static_cast<uintptr_t>(1));
        if (c == cqd->completed_tail) {
          cqd->completed_tail = prev;
        }
        gpr_mu_unlock(cq->mu);
        ret.type = GRPC_OP_COMPLETE;
        ret.success = c->next & 1u;
        ret.tag = c->tag;
        c->done(c->done_arg, c);
        goto done;
      }
      prev = c;
    }
    if (gpr_atm_no_barrier_load(&cqd->shutdown)) {
      gpr_mu_unlock(cq->mu);
      memset(&ret, 0, sizeof(ret));
      ret.type = GRPC_QUEUE_SHUTDOWN;
      break;
    }
    /* Register this thread so cq_end_op_for_pluck can kick our worker. */
    if (!add_plucker(cq, tag, &worker)) {
      gpr_log(GPR_DEBUG,
              "Too many outstanding grpc_completion_queue_pluck calls: maximum "
              "is %d",
              GRPC_MAX_COMPLETION_QUEUE_PLUCKERS);
      gpr_mu_unlock(cq->mu);
      memset(&ret, 0, sizeof(ret));
      /* TODO(ctiller): should we use a different result here */
      ret.type = GRPC_QUEUE_TIMEOUT;
      dump_pending_tags(cq);
      break;
    }
    /* The deadline is only enforced after the first poll. */
    if (!is_finished_arg.first_loop &&
        grpc_core::ExecCtx::Get()->Now() >= deadline_millis) {
      del_plucker(cq, tag, &worker);
      gpr_mu_unlock(cq->mu);
      memset(&ret, 0, sizeof(ret));
      ret.type = GRPC_QUEUE_TIMEOUT;
      dump_pending_tags(cq);
      break;
    }
    cq->num_polls++;
    grpc_error* err =
        cq->poller_vtable->work(POLLSET_FROM_CQ(cq), &worker, deadline_millis);
    if (err != GRPC_ERROR_NONE) {
      del_plucker(cq, tag, &worker);
      gpr_mu_unlock(cq->mu);
      const char* msg = grpc_error_string(err);
      gpr_log(GPR_ERROR, "Completion queue pluck failed: %s", msg);

      GRPC_ERROR_UNREF(err);
      memset(&ret, 0, sizeof(ret));
      ret.type = GRPC_QUEUE_TIMEOUT;
      dump_pending_tags(cq);
      break;
    }
    is_finished_arg.first_loop = false;
    /* The plucker is re-registered on the next iteration. */
    del_plucker(cq, tag, &worker);
  }
done:
  GRPC_SURFACE_TRACE_RETURNED_EVENT(cq, &ret);
  GRPC_CQ_INTERNAL_UNREF(cq, "pluck");

  GPR_ASSERT(is_finished_arg.stolen_completion == nullptr);

  return ret;
}
1299 
1300 grpc_event grpc_completion_queue_pluck(grpc_completion_queue* cq, void* tag,
1301                                        gpr_timespec deadline, void* reserved) {
1302   return cq->vtable->pluck(cq, tag, deadline, reserved);
1303 }
1304 
1305 static void cq_finish_shutdown_pluck(grpc_completion_queue* cq) {
1306   cq_pluck_data* cqd = static_cast<cq_pluck_data*> DATA_FROM_CQ(cq);
1307 
1308   GPR_ASSERT(cqd->shutdown_called);
1309   GPR_ASSERT(!gpr_atm_no_barrier_load(&cqd->shutdown));
1310   gpr_atm_no_barrier_store(&cqd->shutdown, 1);
1311 
1312   cq->poller_vtable->shutdown(POLLSET_FROM_CQ(cq), &cq->pollset_shutdown_done);
1313 }
1314 
1315 /* NOTE: This function is almost exactly identical to cq_shutdown_next() but
1316  * merging them is a bit tricky and probably not worth it */
1317 static void cq_shutdown_pluck(grpc_completion_queue* cq) {
1318   cq_pluck_data* cqd = static_cast<cq_pluck_data*> DATA_FROM_CQ(cq);
1319 
1320   /* Need an extra ref for cq here because:
1321    * We call cq_finish_shutdown_pluck() below, that would call pollset shutdown.
1322    * Pollset shutdown decrements the cq ref count which can potentially destroy
1323    * the cq (if that happens to be the last ref).
1324    * Creating an extra ref here prevents the cq from getting destroyed while
1325    * this function is still active */
1326   GRPC_CQ_INTERNAL_REF(cq, "shutting_down (pluck cq)");
1327   gpr_mu_lock(cq->mu);
1328   if (cqd->shutdown_called) {
1329     gpr_mu_unlock(cq->mu);
1330     GRPC_CQ_INTERNAL_UNREF(cq, "shutting_down (pluck cq)");
1331     return;
1332   }
1333   cqd->shutdown_called = true;
1334   if (gpr_atm_full_fetch_add(&cqd->pending_events, -1) == 1) {
1335     cq_finish_shutdown_pluck(cq);
1336   }
1337   gpr_mu_unlock(cq->mu);
1338   GRPC_CQ_INTERNAL_UNREF(cq, "shutting_down (pluck cq)");
1339 }
1340 
1341 static void cq_finish_shutdown_callback(grpc_completion_queue* cq) {
1342   cq_callback_data* cqd = static_cast<cq_callback_data*> DATA_FROM_CQ(cq);
1343   auto* callback = cqd->shutdown_callback;
1344 
1345   GPR_ASSERT(cqd->shutdown_called);
1346 
1347   cq->poller_vtable->shutdown(POLLSET_FROM_CQ(cq), &cq->pollset_shutdown_done);
1348   (*callback->functor_run)(callback, true);
1349 }
1350 
1351 static void cq_shutdown_callback(grpc_completion_queue* cq) {
1352   cq_callback_data* cqd = static_cast<cq_callback_data*> DATA_FROM_CQ(cq);
1353 
1354   /* Need an extra ref for cq here because:
1355    * We call cq_finish_shutdown_callback() below, which calls pollset shutdown.
1356    * Pollset shutdown decrements the cq ref count which can potentially destroy
1357    * the cq (if that happens to be the last ref).
1358    * Creating an extra ref here prevents the cq from getting destroyed while
1359    * this function is still active */
1360   GRPC_CQ_INTERNAL_REF(cq, "shutting_down (callback cq)");
1361   gpr_mu_lock(cq->mu);
1362   if (cqd->shutdown_called) {
1363     gpr_mu_unlock(cq->mu);
1364     GRPC_CQ_INTERNAL_UNREF(cq, "shutting_down (callback cq)");
1365     return;
1366   }
1367   cqd->shutdown_called = true;
1368   if (gpr_atm_full_fetch_add(&cqd->pending_events, -1) == 1) {
1369     gpr_mu_unlock(cq->mu);
1370     cq_finish_shutdown_callback(cq);
1371   } else {
1372     gpr_mu_unlock(cq->mu);
1373   }
1374   GRPC_CQ_INTERNAL_UNREF(cq, "shutting_down (callback cq)");
1375 }
1376 
1377 /* Shutdown simply drops a ref that we reserved at creation time; if we drop
1378    to zero here, then enter shutdown mode and wake up any waiters */
1379 void grpc_completion_queue_shutdown(grpc_completion_queue* cq) {
1380   GPR_TIMER_SCOPE("grpc_completion_queue_shutdown", 0);
1381   grpc_core::ExecCtx exec_ctx;
1382   GRPC_API_TRACE("grpc_completion_queue_shutdown(cq=%p)", 1, (cq));
1383   cq->vtable->shutdown(cq);
1384 }
1385 
1386 void grpc_completion_queue_destroy(grpc_completion_queue* cq) {
1387   GPR_TIMER_SCOPE("grpc_completion_queue_destroy", 0);
1388   GRPC_API_TRACE("grpc_completion_queue_destroy(cq=%p)", 1, (cq));
1389   grpc_completion_queue_shutdown(cq);
1390 
1391   grpc_core::ExecCtx exec_ctx;
1392   GRPC_CQ_INTERNAL_UNREF(cq, "destroy");
1393 }
1394 
1395 grpc_pollset* grpc_cq_pollset(grpc_completion_queue* cq) {
1396   return cq->poller_vtable->can_get_pollset ? POLLSET_FROM_CQ(cq) : nullptr;
1397 }
1398 
1399 bool grpc_cq_can_listen(grpc_completion_queue* cq) {
1400   return cq->poller_vtable->can_listen;
1401 }
1402