1 /*
2  *
3  * Copyright 2015-2016 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #include <grpc/support/port_platform.h>
20 
21 #include "src/core/lib/surface/server.h"
22 
23 #include <limits.h>
24 #include <stdlib.h>
25 #include <string.h>
26 
27 #include <grpc/support/alloc.h>
28 #include <grpc/support/log.h>
29 #include <grpc/support/string_util.h>
30 
31 #include "src/core/lib/channel/channel_args.h"
32 #include "src/core/lib/channel/connected_channel.h"
33 #include "src/core/lib/debug/stats.h"
34 #include "src/core/lib/gpr/mpscq.h"
35 #include "src/core/lib/gpr/spinlock.h"
36 #include "src/core/lib/gpr/string.h"
37 #include "src/core/lib/iomgr/executor.h"
38 #include "src/core/lib/iomgr/iomgr.h"
39 #include "src/core/lib/slice/slice_internal.h"
40 #include "src/core/lib/surface/api_trace.h"
41 #include "src/core/lib/surface/call.h"
42 #include "src/core/lib/surface/channel.h"
43 #include "src/core/lib/surface/completion_queue.h"
44 #include "src/core/lib/surface/init.h"
45 #include "src/core/lib/transport/metadata.h"
46 #include "src/core/lib/transport/static_metadata.h"
47 
/* Trace flag registered under the name "server_channel". When it is enabled,
   destroy_channel() logs the error of every client that disconnects.
   NOTE(review): the `false` argument is presumably the default-enabled
   state -- confirm against grpc_core::TraceFlag. */
grpc_core::TraceFlag grpc_server_channel_trace(false, "server_channel");
49 
/* File-local data structures used by the server implementation. */
namespace {
/* One listening endpoint of the server; listeners form a singly linked list
   headed by grpc_server::listeners. */
struct listener {
  /* opaque value handed back to start() and destroy() */
  void* arg;
  /* callback taking the server, arg and a set of pollsets */
  void (*start)(grpc_server* server, void* arg, grpc_pollset** pollsets,
                size_t pollset_count);
  /* callback taking the server, arg and a closure */
  void (*destroy)(grpc_server* server, void* arg, grpc_closure* closure);
  struct listener* next;
  grpc_closure destroy_done;
};

/* Discriminates which member of requested_call::data is meaningful. */
enum requested_call_type { BATCH_CALL, REGISTERED_CALL };

struct registered_method;

/* An application request for an incoming call. Instances are queued on a
   request_matcher's per-completion-queue queues and filled in by
   publish_call() once a call is matched to them. */
struct requested_call {
  gpr_mpscq_node request_link; /* must be first */
  requested_call_type type;
  size_t cq_idx;
  /* tag posted to the completion queue when the request completes */
  void* tag;
  grpc_server* server;
  /* completion queue the matched call gets bound to */
  grpc_completion_queue* cq_bound_to_call;
  /* out-parameter: receives the matched call */
  grpc_call** call;
  grpc_cq_completion completion;
  /* out-parameter: swapped with the call's initial metadata */
  grpc_metadata_array* initial_metadata;
  union {
    /* valid when type == BATCH_CALL */
    struct {
      grpc_call_details* details;
    } batch;
    /* valid when type == REGISTERED_CALL */
    struct {
      registered_method* method;
      gpr_timespec* deadline;
      /* may be null, in which case no payload is handed over */
      grpc_byte_buffer** optional_payload;
    } registered;
  } data;
};

/* One slot of a channel's registered-method lookup table (probed by
   start_new_rpc using a hash of host and method). */
struct channel_registered_method {
  registered_method* server_registered_method;
  uint32_t flags;
  /* false for a wildcard entry that matches any host */
  bool has_host;
  grpc_slice method;
  grpc_slice host;
};

/* Per-channel state of the server filter. */
struct channel_data {
  grpc_server* server;
  grpc_connectivity_state connectivity_state;
  grpc_channel* channel;
  /* starting completion-queue index used when matching new calls */
  size_t cq_idx;
  /* linked list of all channels on a server */
  channel_data* next;
  channel_data* prev;
  /* lookup table of registered methods; may be null */
  channel_registered_method* registered_methods;
  uint32_t registered_method_slots;
  uint32_t registered_method_max_probes;
  grpc_closure finish_destroy_channel_closure;
  grpc_closure channel_connectivity_changed;
};

/* A (tag, completion queue) pair to be notified once shutdown is published
   (see maybe_finish_shutdown). */
typedef struct shutdown_tag {
  void* tag;
  grpc_completion_queue* cq;
  grpc_cq_completion completion;
} shutdown_tag;

/* Lifecycle states stored in call_data::state. */
typedef enum {
  /* waiting for metadata */
  NOT_STARTED,
  /* initial metadata read, not flow controlled in yet */
  PENDING,
  /* flow controlled in, on completion queue */
  ACTIVATED,
  /* cancelled before being queued */
  ZOMBIED
} call_state;

typedef struct request_matcher request_matcher;

/* Per-call state of the server filter. */
struct call_data {
  grpc_call* call;

  /* holds a call_state value; accessed with gpr_atm operations */
  gpr_atm state;

  /* path/host are only meaningful once the matching *_set flag is true */
  bool path_set;
  bool host_set;
  grpc_slice path;
  grpc_slice host;
  grpc_millis deadline;

  /* completion queue the call was published on */
  grpc_completion_queue* cq_new;

  grpc_metadata_batch* recv_initial_metadata;
  uint32_t recv_initial_metadata_flags;
  grpc_metadata_array initial_metadata;

  /* matcher selected for this call by finish_start_new_rpc */
  request_matcher* matcher;
  grpc_byte_buffer* payload;

  grpc_closure got_initial_metadata;
  grpc_closure server_on_recv_initial_metadata;
  grpc_closure kill_zombie_closure;
  /* original callback replaced by server_on_recv_initial_metadata */
  grpc_closure* on_done_recv_initial_metadata;
  grpc_closure recv_trailing_metadata_ready;
  /* added as a child of the error passed on by
     server_recv_trailing_metadata_ready */
  grpc_error* error;
  /* original callback replaced by recv_trailing_metadata_ready */
  grpc_closure* original_recv_trailing_metadata_ready;

  grpc_closure publish;

  /* next call in the owning matcher's pending list */
  call_data* pending_next;
};

/* Pairs incoming calls with application requests: calls with no request
   available wait on the pending list, requests wait in requests_per_cq. */
struct request_matcher {
  grpc_server* server;
  call_data* pending_head;
  call_data* pending_tail;
  /* array with one queue per server completion queue */
  gpr_locked_mpscq* requests_per_cq;
};

/* A method registered on the server; entries form a singly linked list
   headed by grpc_server::registered_methods. */
struct registered_method {
  char* method;
  char* host;
  grpc_server_register_method_payload_handling payload_handling;
  uint32_t flags;
  /* one request matcher per method */
  request_matcher matcher;
  registered_method* next;
};

/* Snapshot of a server's channels, filled by channel_broadcaster_init and
   consumed by channel_broadcaster_shutdown. */
typedef struct {
  grpc_channel** channels;
  size_t num_channels;
} channel_broadcaster;
}  // namespace
183 
/* Server-wide state: completion queues, registered methods, the ring of
   connected channels, listeners and shutdown bookkeeping. */
struct grpc_server {
  grpc_channel_args* channel_args;

  /* arrays of cq_count completion queues and pollset_count pollsets */
  grpc_completion_queue** cqs;
  grpc_pollset** pollsets;
  size_t cq_count;
  size_t pollset_count;
  /* request matchers are only torn down / drained when this is true */
  bool started;

  /* The two following mutexes control access to server-state
     mu_global controls access to non-call-related state (e.g., channel state)
     mu_call controls access to call-related state (e.g., the call lists)

     If they are ever required to be nested, you must lock mu_global
     before mu_call. This is currently used in shutdown processing
     (grpc_server_shutdown_and_notify and maybe_finish_shutdown) */
  gpr_mu mu_global; /* mutex for server and channel state */
  gpr_mu mu_call;   /* mutex for call-specific state */

  /* startup synchronization: flag is protected by mu_global, signals whether
     we are doing the listener start routine or not */
  bool starting;
  gpr_cv starting_cv;

  /* head of the linked list of registered methods */
  registered_method* registered_methods;
  /** one request matcher for unregistered methods */
  request_matcher unregistered_request_matcher;

  /* non-zero once shutdown has been requested; read with gpr_atm loads */
  gpr_atm shutdown_flag;
  /* set to 1 by maybe_finish_shutdown once the shutdown tags are posted */
  uint8_t shutdown_published;
  size_t num_shutdown_tags;
  shutdown_tag* shutdown_tags;

  /* sentinel node of the circular doubly linked list of channels */
  channel_data root_channel_data;

  listener* listeners;
  int listeners_destroyed;
  /* the server is deleted when this count drops to zero (server_unref) */
  gpr_refcount internal_refcount;

  /** when did we print the last shutdown progress message */
  gpr_timespec last_shutdown_message_time;

  grpc_core::RefCountedPtr<grpc_core::channelz::ServerNode> channelz_server;
};
228 
/* Yields the grpc_server that owns a call element, reached through the
   element's channel_data. */
#define SERVER_FROM_CALL_ELEM(elem) \
  (((channel_data*)(elem)->channel_data)->server)

/* Forward declarations of functions used before they are defined. */
static void publish_new_rpc(void* calld, grpc_error* error);
static void fail_call(grpc_server* server, size_t cq_idx, requested_call* rc,
                      grpc_error* error);
/* Before calling maybe_finish_shutdown, we must hold mu_global and not
   hold mu_call */
static void maybe_finish_shutdown(grpc_server* server);
238 
239 /*
240  * channel broadcaster
241  */
242 
243 /* assumes server locked */
channel_broadcaster_init(grpc_server * s,channel_broadcaster * cb)244 static void channel_broadcaster_init(grpc_server* s, channel_broadcaster* cb) {
245   channel_data* c;
246   size_t count = 0;
247   for (c = s->root_channel_data.next; c != &s->root_channel_data; c = c->next) {
248     count++;
249   }
250   cb->num_channels = count;
251   cb->channels = static_cast<grpc_channel**>(
252       gpr_malloc(sizeof(*cb->channels) * cb->num_channels));
253   count = 0;
254   for (c = s->root_channel_data.next; c != &s->root_channel_data; c = c->next) {
255     cb->channels[count++] = c->channel;
256     GRPC_CHANNEL_INTERNAL_REF(c->channel, "broadcast");
257   }
258 }
259 
/* Heap-allocated state for one send_shutdown() transport op; released by
   shutdown_cleanup() when the op's closure runs. */
struct shutdown_cleanup_args {
  /* closure passed to grpc_make_transport_op; runs shutdown_cleanup */
  grpc_closure closure;
  /* slice unreffed by shutdown_cleanup */
  grpc_slice slice;
};
264 
shutdown_cleanup(void * arg,grpc_error * error)265 static void shutdown_cleanup(void* arg, grpc_error* error) {
266   struct shutdown_cleanup_args* a =
267       static_cast<struct shutdown_cleanup_args*>(arg);
268   grpc_slice_unref_internal(a->slice);
269   gpr_free(a);
270 }
271 
send_shutdown(grpc_channel * channel,bool send_goaway,grpc_error * send_disconnect)272 static void send_shutdown(grpc_channel* channel, bool send_goaway,
273                           grpc_error* send_disconnect) {
274   struct shutdown_cleanup_args* sc =
275       static_cast<struct shutdown_cleanup_args*>(gpr_malloc(sizeof(*sc)));
276   GRPC_CLOSURE_INIT(&sc->closure, shutdown_cleanup, sc,
277                     grpc_schedule_on_exec_ctx);
278   grpc_transport_op* op = grpc_make_transport_op(&sc->closure);
279   grpc_channel_element* elem;
280 
281   op->goaway_error =
282       send_goaway ? grpc_error_set_int(
283                         GRPC_ERROR_CREATE_FROM_STATIC_STRING("Server shutdown"),
284                         GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_OK)
285                   : GRPC_ERROR_NONE;
286   op->set_accept_stream = true;
287   sc->slice = grpc_slice_from_copied_string("Server shutdown");
288   op->disconnect_with_error = send_disconnect;
289 
290   elem = grpc_channel_stack_element(grpc_channel_get_channel_stack(channel), 0);
291   elem->filter->start_transport_op(elem, op);
292 }
293 
channel_broadcaster_shutdown(channel_broadcaster * cb,bool send_goaway,grpc_error * force_disconnect)294 static void channel_broadcaster_shutdown(channel_broadcaster* cb,
295                                          bool send_goaway,
296                                          grpc_error* force_disconnect) {
297   size_t i;
298 
299   for (i = 0; i < cb->num_channels; i++) {
300     send_shutdown(cb->channels[i], send_goaway,
301                   GRPC_ERROR_REF(force_disconnect));
302     GRPC_CHANNEL_INTERNAL_UNREF(cb->channels[i], "broadcast");
303   }
304   gpr_free(cb->channels);
305   GRPC_ERROR_UNREF(force_disconnect);
306 }
307 
308 /*
309  * request_matcher
310  */
311 
request_matcher_init(request_matcher * rm,grpc_server * server)312 static void request_matcher_init(request_matcher* rm, grpc_server* server) {
313   memset(rm, 0, sizeof(*rm));
314   rm->server = server;
315   rm->requests_per_cq = static_cast<gpr_locked_mpscq*>(
316       gpr_malloc(sizeof(*rm->requests_per_cq) * server->cq_count));
317   for (size_t i = 0; i < server->cq_count; i++) {
318     gpr_locked_mpscq_init(&rm->requests_per_cq[i]);
319   }
320 }
321 
request_matcher_destroy(request_matcher * rm)322 static void request_matcher_destroy(request_matcher* rm) {
323   for (size_t i = 0; i < rm->server->cq_count; i++) {
324     GPR_ASSERT(gpr_locked_mpscq_pop(&rm->requests_per_cq[i]) == nullptr);
325     gpr_locked_mpscq_destroy(&rm->requests_per_cq[i]);
326   }
327   gpr_free(rm->requests_per_cq);
328 }
329 
kill_zombie(void * elem,grpc_error * error)330 static void kill_zombie(void* elem, grpc_error* error) {
331   grpc_call_unref(
332       grpc_call_from_top_element(static_cast<grpc_call_element*>(elem)));
333 }
334 
request_matcher_zombify_all_pending_calls(request_matcher * rm)335 static void request_matcher_zombify_all_pending_calls(request_matcher* rm) {
336   while (rm->pending_head) {
337     call_data* calld = rm->pending_head;
338     rm->pending_head = calld->pending_next;
339     gpr_atm_no_barrier_store(&calld->state, ZOMBIED);
340     GRPC_CLOSURE_INIT(
341         &calld->kill_zombie_closure, kill_zombie,
342         grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0),
343         grpc_schedule_on_exec_ctx);
344     GRPC_CLOSURE_SCHED(&calld->kill_zombie_closure, GRPC_ERROR_NONE);
345   }
346 }
347 
request_matcher_kill_requests(grpc_server * server,request_matcher * rm,grpc_error * error)348 static void request_matcher_kill_requests(grpc_server* server,
349                                           request_matcher* rm,
350                                           grpc_error* error) {
351   requested_call* rc;
352   for (size_t i = 0; i < server->cq_count; i++) {
353     while ((rc = reinterpret_cast<requested_call*>(
354                 gpr_locked_mpscq_pop(&rm->requests_per_cq[i]))) != nullptr) {
355       fail_call(server, i, rc, GRPC_ERROR_REF(error));
356     }
357   }
358   GRPC_ERROR_UNREF(error);
359 }
360 
361 /*
362  * server proper
363  */
364 
server_ref(grpc_server * server)365 static void server_ref(grpc_server* server) {
366   gpr_ref(&server->internal_refcount);
367 }
368 
server_delete(grpc_server * server)369 static void server_delete(grpc_server* server) {
370   registered_method* rm;
371   size_t i;
372   server->channelz_server.reset();
373   grpc_channel_args_destroy(server->channel_args);
374   gpr_mu_destroy(&server->mu_global);
375   gpr_mu_destroy(&server->mu_call);
376   gpr_cv_destroy(&server->starting_cv);
377   while ((rm = server->registered_methods) != nullptr) {
378     server->registered_methods = rm->next;
379     if (server->started) {
380       request_matcher_destroy(&rm->matcher);
381     }
382     gpr_free(rm->method);
383     gpr_free(rm->host);
384     gpr_free(rm);
385   }
386   if (server->started) {
387     request_matcher_destroy(&server->unregistered_request_matcher);
388   }
389   for (i = 0; i < server->cq_count; i++) {
390     GRPC_CQ_INTERNAL_UNREF(server->cqs[i], "server");
391   }
392   gpr_free(server->cqs);
393   gpr_free(server->pollsets);
394   gpr_free(server->shutdown_tags);
395   gpr_free(server);
396 }
397 
server_unref(grpc_server * server)398 static void server_unref(grpc_server* server) {
399   if (gpr_unref(&server->internal_refcount)) {
400     server_delete(server);
401   }
402 }
403 
is_channel_orphaned(channel_data * chand)404 static int is_channel_orphaned(channel_data* chand) {
405   return chand->next == chand;
406 }
407 
orphan_channel(channel_data * chand)408 static void orphan_channel(channel_data* chand) {
409   chand->next->prev = chand->prev;
410   chand->prev->next = chand->next;
411   chand->next = chand->prev = chand;
412 }
413 
finish_destroy_channel(void * cd,grpc_error * error)414 static void finish_destroy_channel(void* cd, grpc_error* error) {
415   channel_data* chand = static_cast<channel_data*>(cd);
416   grpc_server* server = chand->server;
417   GRPC_CHANNEL_INTERNAL_UNREF(chand->channel, "server");
418   server_unref(server);
419 }
420 
destroy_channel(channel_data * chand,grpc_error * error)421 static void destroy_channel(channel_data* chand, grpc_error* error) {
422   if (is_channel_orphaned(chand)) return;
423   GPR_ASSERT(chand->server != nullptr);
424   orphan_channel(chand);
425   server_ref(chand->server);
426   maybe_finish_shutdown(chand->server);
427   GRPC_CLOSURE_INIT(&chand->finish_destroy_channel_closure,
428                     finish_destroy_channel, chand, grpc_schedule_on_exec_ctx);
429 
430   if (grpc_server_channel_trace.enabled() && error != GRPC_ERROR_NONE) {
431     const char* msg = grpc_error_string(error);
432     gpr_log(GPR_INFO, "Disconnected client: %s", msg);
433   }
434   GRPC_ERROR_UNREF(error);
435 
436   grpc_transport_op* op =
437       grpc_make_transport_op(&chand->finish_destroy_channel_closure);
438   op->set_accept_stream = true;
439   grpc_channel_next_op(grpc_channel_stack_element(
440                            grpc_channel_get_channel_stack(chand->channel), 0),
441                        op);
442 }
443 
done_request_event(void * req,grpc_cq_completion * c)444 static void done_request_event(void* req, grpc_cq_completion* c) {
445   gpr_free(req);
446 }
447 
publish_call(grpc_server * server,call_data * calld,size_t cq_idx,requested_call * rc)448 static void publish_call(grpc_server* server, call_data* calld, size_t cq_idx,
449                          requested_call* rc) {
450   grpc_call_set_completion_queue(calld->call, rc->cq_bound_to_call);
451   grpc_call* call = calld->call;
452   *rc->call = call;
453   calld->cq_new = server->cqs[cq_idx];
454   GPR_SWAP(grpc_metadata_array, *rc->initial_metadata, calld->initial_metadata);
455   switch (rc->type) {
456     case BATCH_CALL:
457       GPR_ASSERT(calld->host_set);
458       GPR_ASSERT(calld->path_set);
459       rc->data.batch.details->host = grpc_slice_ref_internal(calld->host);
460       rc->data.batch.details->method = grpc_slice_ref_internal(calld->path);
461       rc->data.batch.details->deadline =
462           grpc_millis_to_timespec(calld->deadline, GPR_CLOCK_MONOTONIC);
463       rc->data.batch.details->flags = calld->recv_initial_metadata_flags;
464       break;
465     case REGISTERED_CALL:
466       *rc->data.registered.deadline =
467           grpc_millis_to_timespec(calld->deadline, GPR_CLOCK_MONOTONIC);
468       if (rc->data.registered.optional_payload) {
469         *rc->data.registered.optional_payload = calld->payload;
470         calld->payload = nullptr;
471       }
472       break;
473     default:
474       GPR_UNREACHABLE_CODE(return );
475   }
476 
477   grpc_cq_end_op(calld->cq_new, rc->tag, GRPC_ERROR_NONE, done_request_event,
478                  rc, &rc->completion);
479 }
480 
publish_new_rpc(void * arg,grpc_error * error)481 static void publish_new_rpc(void* arg, grpc_error* error) {
482   grpc_call_element* call_elem = static_cast<grpc_call_element*>(arg);
483   call_data* calld = static_cast<call_data*>(call_elem->call_data);
484   channel_data* chand = static_cast<channel_data*>(call_elem->channel_data);
485   request_matcher* rm = calld->matcher;
486   grpc_server* server = rm->server;
487 
488   if (error != GRPC_ERROR_NONE || gpr_atm_acq_load(&server->shutdown_flag)) {
489     gpr_atm_no_barrier_store(&calld->state, ZOMBIED);
490     GRPC_CLOSURE_INIT(
491         &calld->kill_zombie_closure, kill_zombie,
492         grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0),
493         grpc_schedule_on_exec_ctx);
494     GRPC_CLOSURE_SCHED(&calld->kill_zombie_closure, GRPC_ERROR_REF(error));
495     return;
496   }
497 
498   for (size_t i = 0; i < server->cq_count; i++) {
499     size_t cq_idx = (chand->cq_idx + i) % server->cq_count;
500     requested_call* rc = reinterpret_cast<requested_call*>(
501         gpr_locked_mpscq_try_pop(&rm->requests_per_cq[cq_idx]));
502     if (rc == nullptr) {
503       continue;
504     } else {
505       GRPC_STATS_INC_SERVER_CQS_CHECKED(i);
506       gpr_atm_no_barrier_store(&calld->state, ACTIVATED);
507       publish_call(server, calld, cq_idx, rc);
508       return; /* early out */
509     }
510   }
511 
512   /* no cq to take the request found: queue it on the slow list */
513   GRPC_STATS_INC_SERVER_SLOWPATH_REQUESTS_QUEUED();
514   gpr_mu_lock(&server->mu_call);
515 
516   // We need to ensure that all the queues are empty.  We do this under
517   // the server mu_call lock to ensure that if something is added to
518   // an empty request queue, it will block until the call is actually
519   // added to the pending list.
520   for (size_t i = 0; i < server->cq_count; i++) {
521     size_t cq_idx = (chand->cq_idx + i) % server->cq_count;
522     requested_call* rc = reinterpret_cast<requested_call*>(
523         gpr_locked_mpscq_pop(&rm->requests_per_cq[cq_idx]));
524     if (rc == nullptr) {
525       continue;
526     } else {
527       gpr_mu_unlock(&server->mu_call);
528       GRPC_STATS_INC_SERVER_CQS_CHECKED(i + server->cq_count);
529       gpr_atm_no_barrier_store(&calld->state, ACTIVATED);
530       publish_call(server, calld, cq_idx, rc);
531       return; /* early out */
532     }
533   }
534 
535   gpr_atm_no_barrier_store(&calld->state, PENDING);
536   if (rm->pending_head == nullptr) {
537     rm->pending_tail = rm->pending_head = calld;
538   } else {
539     rm->pending_tail->pending_next = calld;
540     rm->pending_tail = calld;
541   }
542   calld->pending_next = nullptr;
543   gpr_mu_unlock(&server->mu_call);
544 }
545 
finish_start_new_rpc(grpc_server * server,grpc_call_element * elem,request_matcher * rm,grpc_server_register_method_payload_handling payload_handling)546 static void finish_start_new_rpc(
547     grpc_server* server, grpc_call_element* elem, request_matcher* rm,
548     grpc_server_register_method_payload_handling payload_handling) {
549   call_data* calld = static_cast<call_data*>(elem->call_data);
550 
551   if (gpr_atm_acq_load(&server->shutdown_flag)) {
552     gpr_atm_no_barrier_store(&calld->state, ZOMBIED);
553     GRPC_CLOSURE_INIT(&calld->kill_zombie_closure, kill_zombie, elem,
554                       grpc_schedule_on_exec_ctx);
555     GRPC_CLOSURE_SCHED(&calld->kill_zombie_closure, GRPC_ERROR_NONE);
556     return;
557   }
558 
559   calld->matcher = rm;
560 
561   switch (payload_handling) {
562     case GRPC_SRM_PAYLOAD_NONE:
563       publish_new_rpc(elem, GRPC_ERROR_NONE);
564       break;
565     case GRPC_SRM_PAYLOAD_READ_INITIAL_BYTE_BUFFER: {
566       grpc_op op;
567       memset(&op, 0, sizeof(op));
568       op.op = GRPC_OP_RECV_MESSAGE;
569       op.data.recv_message.recv_message = &calld->payload;
570       GRPC_CLOSURE_INIT(&calld->publish, publish_new_rpc, elem,
571                         grpc_schedule_on_exec_ctx);
572       grpc_call_start_batch_and_execute(calld->call, &op, 1, &calld->publish);
573       break;
574     }
575   }
576 }
577 
start_new_rpc(grpc_call_element * elem)578 static void start_new_rpc(grpc_call_element* elem) {
579   channel_data* chand = static_cast<channel_data*>(elem->channel_data);
580   call_data* calld = static_cast<call_data*>(elem->call_data);
581   grpc_server* server = chand->server;
582   uint32_t i;
583   uint32_t hash;
584   channel_registered_method* rm;
585 
586   if (chand->registered_methods && calld->path_set && calld->host_set) {
587     /* TODO(ctiller): unify these two searches */
588     /* check for an exact match with host */
589     hash = GRPC_MDSTR_KV_HASH(grpc_slice_hash(calld->host),
590                               grpc_slice_hash(calld->path));
591     for (i = 0; i <= chand->registered_method_max_probes; i++) {
592       rm = &chand->registered_methods[(hash + i) %
593                                       chand->registered_method_slots];
594       if (!rm) break;
595       if (!rm->has_host) continue;
596       if (!grpc_slice_eq(rm->host, calld->host)) continue;
597       if (!grpc_slice_eq(rm->method, calld->path)) continue;
598       if ((rm->flags & GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST) &&
599           0 == (calld->recv_initial_metadata_flags &
600                 GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST)) {
601         continue;
602       }
603       finish_start_new_rpc(server, elem, &rm->server_registered_method->matcher,
604                            rm->server_registered_method->payload_handling);
605       return;
606     }
607     /* check for a wildcard method definition (no host set) */
608     hash = GRPC_MDSTR_KV_HASH(0, grpc_slice_hash(calld->path));
609     for (i = 0; i <= chand->registered_method_max_probes; i++) {
610       rm = &chand->registered_methods[(hash + i) %
611                                       chand->registered_method_slots];
612       if (!rm) break;
613       if (rm->has_host) continue;
614       if (!grpc_slice_eq(rm->method, calld->path)) continue;
615       if ((rm->flags & GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST) &&
616           0 == (calld->recv_initial_metadata_flags &
617                 GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST)) {
618         continue;
619       }
620       finish_start_new_rpc(server, elem, &rm->server_registered_method->matcher,
621                            rm->server_registered_method->payload_handling);
622       return;
623     }
624   }
625   finish_start_new_rpc(server, elem, &server->unregistered_request_matcher,
626                        GRPC_SRM_PAYLOAD_NONE);
627 }
628 
num_listeners(grpc_server * server)629 static int num_listeners(grpc_server* server) {
630   listener* l;
631   int n = 0;
632   for (l = server->listeners; l; l = l->next) {
633     n++;
634   }
635   return n;
636 }
637 
done_shutdown_event(void * server,grpc_cq_completion * completion)638 static void done_shutdown_event(void* server, grpc_cq_completion* completion) {
639   server_unref(static_cast<grpc_server*>(server));
640 }
641 
num_channels(grpc_server * server)642 static int num_channels(grpc_server* server) {
643   channel_data* chand;
644   int n = 0;
645   for (chand = server->root_channel_data.next;
646        chand != &server->root_channel_data; chand = chand->next) {
647     n++;
648   }
649   return n;
650 }
651 
kill_pending_work_locked(grpc_server * server,grpc_error * error)652 static void kill_pending_work_locked(grpc_server* server, grpc_error* error) {
653   if (server->started) {
654     request_matcher_kill_requests(server, &server->unregistered_request_matcher,
655                                   GRPC_ERROR_REF(error));
656     request_matcher_zombify_all_pending_calls(
657         &server->unregistered_request_matcher);
658     for (registered_method* rm = server->registered_methods; rm;
659          rm = rm->next) {
660       request_matcher_kill_requests(server, &rm->matcher,
661                                     GRPC_ERROR_REF(error));
662       request_matcher_zombify_all_pending_calls(&rm->matcher);
663     }
664   }
665   GRPC_ERROR_UNREF(error);
666 }
667 
maybe_finish_shutdown(grpc_server * server)668 static void maybe_finish_shutdown(grpc_server* server) {
669   size_t i;
670   if (!gpr_atm_acq_load(&server->shutdown_flag) || server->shutdown_published) {
671     return;
672   }
673 
674   kill_pending_work_locked(
675       server, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Server Shutdown"));
676 
677   if (server->root_channel_data.next != &server->root_channel_data ||
678       server->listeners_destroyed < num_listeners(server)) {
679     if (gpr_time_cmp(gpr_time_sub(gpr_now(GPR_CLOCK_REALTIME),
680                                   server->last_shutdown_message_time),
681                      gpr_time_from_seconds(1, GPR_TIMESPAN)) >= 0) {
682       server->last_shutdown_message_time = gpr_now(GPR_CLOCK_REALTIME);
683       gpr_log(GPR_DEBUG,
684               "Waiting for %d channels and %d/%d listeners to be destroyed"
685               " before shutting down server",
686               num_channels(server),
687               num_listeners(server) - server->listeners_destroyed,
688               num_listeners(server));
689     }
690     return;
691   }
692   server->shutdown_published = 1;
693   for (i = 0; i < server->num_shutdown_tags; i++) {
694     server_ref(server);
695     grpc_cq_end_op(server->shutdown_tags[i].cq, server->shutdown_tags[i].tag,
696                    GRPC_ERROR_NONE, done_shutdown_event, server,
697                    &server->shutdown_tags[i].completion);
698   }
699 }
700 
server_on_recv_initial_metadata(void * ptr,grpc_error * error)701 static void server_on_recv_initial_metadata(void* ptr, grpc_error* error) {
702   grpc_call_element* elem = static_cast<grpc_call_element*>(ptr);
703   call_data* calld = static_cast<call_data*>(elem->call_data);
704   grpc_millis op_deadline;
705 
706   if (error == GRPC_ERROR_NONE) {
707     GPR_ASSERT(calld->recv_initial_metadata->idx.named.path != nullptr);
708     GPR_ASSERT(calld->recv_initial_metadata->idx.named.authority != nullptr);
709     calld->path = grpc_slice_ref_internal(
710         GRPC_MDVALUE(calld->recv_initial_metadata->idx.named.path->md));
711     calld->host = grpc_slice_ref_internal(
712         GRPC_MDVALUE(calld->recv_initial_metadata->idx.named.authority->md));
713     calld->path_set = true;
714     calld->host_set = true;
715     grpc_metadata_batch_remove(calld->recv_initial_metadata,
716                                calld->recv_initial_metadata->idx.named.path);
717     grpc_metadata_batch_remove(
718         calld->recv_initial_metadata,
719         calld->recv_initial_metadata->idx.named.authority);
720   } else {
721     GRPC_ERROR_REF(error);
722   }
723   op_deadline = calld->recv_initial_metadata->deadline;
724   if (op_deadline != GRPC_MILLIS_INF_FUTURE) {
725     calld->deadline = op_deadline;
726   }
727   if (calld->host_set && calld->path_set) {
728     /* do nothing */
729   } else {
730     grpc_error* src_error = error;
731     error = GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
732         "Missing :authority or :path", &error, 1);
733     GRPC_ERROR_UNREF(src_error);
734   }
735 
736   GRPC_CLOSURE_RUN(calld->on_done_recv_initial_metadata, error);
737 }
738 
server_recv_trailing_metadata_ready(void * user_data,grpc_error * err)739 static void server_recv_trailing_metadata_ready(void* user_data,
740                                                 grpc_error* err) {
741   grpc_call_element* elem = static_cast<grpc_call_element*>(user_data);
742   call_data* calld = static_cast<call_data*>(elem->call_data);
743   err = grpc_error_add_child(GRPC_ERROR_REF(err), GRPC_ERROR_REF(calld->error));
744   GRPC_CLOSURE_RUN(calld->original_recv_trailing_metadata_ready, err);
745 }
746 
server_mutate_op(grpc_call_element * elem,grpc_transport_stream_op_batch * op)747 static void server_mutate_op(grpc_call_element* elem,
748                              grpc_transport_stream_op_batch* op) {
749   call_data* calld = static_cast<call_data*>(elem->call_data);
750 
751   if (op->recv_initial_metadata) {
752     GPR_ASSERT(op->payload->recv_initial_metadata.recv_flags == nullptr);
753     calld->recv_initial_metadata =
754         op->payload->recv_initial_metadata.recv_initial_metadata;
755     calld->on_done_recv_initial_metadata =
756         op->payload->recv_initial_metadata.recv_initial_metadata_ready;
757     op->payload->recv_initial_metadata.recv_initial_metadata_ready =
758         &calld->server_on_recv_initial_metadata;
759     op->payload->recv_initial_metadata.recv_flags =
760         &calld->recv_initial_metadata_flags;
761   }
762   if (op->recv_trailing_metadata) {
763     calld->original_recv_trailing_metadata_ready =
764         op->payload->recv_trailing_metadata.recv_trailing_metadata_ready;
765     op->payload->recv_trailing_metadata.recv_trailing_metadata_ready =
766         &calld->recv_trailing_metadata_ready;
767   }
768 }
769 
server_start_transport_stream_op_batch(grpc_call_element * elem,grpc_transport_stream_op_batch * op)770 static void server_start_transport_stream_op_batch(
771     grpc_call_element* elem, grpc_transport_stream_op_batch* op) {
772   server_mutate_op(elem, op);
773   grpc_call_next_op(elem, op);
774 }
775 
got_initial_metadata(void * ptr,grpc_error * error)776 static void got_initial_metadata(void* ptr, grpc_error* error) {
777   grpc_call_element* elem = static_cast<grpc_call_element*>(ptr);
778   call_data* calld = static_cast<call_data*>(elem->call_data);
779   if (error == GRPC_ERROR_NONE) {
780     start_new_rpc(elem);
781   } else {
782     if (gpr_atm_full_cas(&calld->state, NOT_STARTED, ZOMBIED)) {
783       GRPC_CLOSURE_INIT(&calld->kill_zombie_closure, kill_zombie, elem,
784                         grpc_schedule_on_exec_ctx);
785       GRPC_CLOSURE_SCHED(&calld->kill_zombie_closure, GRPC_ERROR_NONE);
786     } else if (gpr_atm_full_cas(&calld->state, PENDING, ZOMBIED)) {
787       /* zombied call will be destroyed when it's removed from the pending
788          queue... later */
789     }
790   }
791 }
792 
accept_stream(void * cd,grpc_transport * transport,const void * transport_server_data)793 static void accept_stream(void* cd, grpc_transport* transport,
794                           const void* transport_server_data) {
795   channel_data* chand = static_cast<channel_data*>(cd);
796   /* create a call */
797   grpc_call_create_args args;
798   memset(&args, 0, sizeof(args));
799   args.channel = chand->channel;
800   args.server_transport_data = transport_server_data;
801   args.send_deadline = GRPC_MILLIS_INF_FUTURE;
802   args.server = chand->server;
803   grpc_call* call;
804   grpc_error* error = grpc_call_create(&args, &call);
805   grpc_call_element* elem =
806       grpc_call_stack_element(grpc_call_get_call_stack(call), 0);
807   if (error != GRPC_ERROR_NONE) {
808     got_initial_metadata(elem, error);
809     GRPC_ERROR_UNREF(error);
810     return;
811   }
812   call_data* calld = static_cast<call_data*>(elem->call_data);
813   grpc_op op;
814   memset(&op, 0, sizeof(op));
815   op.op = GRPC_OP_RECV_INITIAL_METADATA;
816   op.data.recv_initial_metadata.recv_initial_metadata =
817       &calld->initial_metadata;
818   GRPC_CLOSURE_INIT(&calld->got_initial_metadata, got_initial_metadata, elem,
819                     grpc_schedule_on_exec_ctx);
820   grpc_call_start_batch_and_execute(call, &op, 1, &calld->got_initial_metadata);
821 }
822 
channel_connectivity_changed(void * cd,grpc_error * error)823 static void channel_connectivity_changed(void* cd, grpc_error* error) {
824   channel_data* chand = static_cast<channel_data*>(cd);
825   grpc_server* server = chand->server;
826   if (chand->connectivity_state != GRPC_CHANNEL_SHUTDOWN) {
827     grpc_transport_op* op = grpc_make_transport_op(nullptr);
828     op->on_connectivity_state_change = &chand->channel_connectivity_changed;
829     op->connectivity_state = &chand->connectivity_state;
830     grpc_channel_next_op(grpc_channel_stack_element(
831                              grpc_channel_get_channel_stack(chand->channel), 0),
832                          op);
833   } else {
834     gpr_mu_lock(&server->mu_global);
835     destroy_channel(chand, GRPC_ERROR_REF(error));
836     gpr_mu_unlock(&server->mu_global);
837     GRPC_CHANNEL_INTERNAL_UNREF(chand->channel, "connectivity");
838   }
839 }
840 
init_call_elem(grpc_call_element * elem,const grpc_call_element_args * args)841 static grpc_error* init_call_elem(grpc_call_element* elem,
842                                   const grpc_call_element_args* args) {
843   call_data* calld = static_cast<call_data*>(elem->call_data);
844   channel_data* chand = static_cast<channel_data*>(elem->channel_data);
845   memset(calld, 0, sizeof(call_data));
846   calld->deadline = GRPC_MILLIS_INF_FUTURE;
847   calld->call = grpc_call_from_top_element(elem);
848 
849   GRPC_CLOSURE_INIT(&calld->server_on_recv_initial_metadata,
850                     server_on_recv_initial_metadata, elem,
851                     grpc_schedule_on_exec_ctx);
852   GRPC_CLOSURE_INIT(&calld->recv_trailing_metadata_ready,
853                     server_recv_trailing_metadata_ready, elem,
854                     grpc_schedule_on_exec_ctx);
855   server_ref(chand->server);
856   return GRPC_ERROR_NONE;
857 }
858 
destroy_call_elem(grpc_call_element * elem,const grpc_call_final_info * final_info,grpc_closure * ignored)859 static void destroy_call_elem(grpc_call_element* elem,
860                               const grpc_call_final_info* final_info,
861                               grpc_closure* ignored) {
862   channel_data* chand = static_cast<channel_data*>(elem->channel_data);
863   call_data* calld = static_cast<call_data*>(elem->call_data);
864 
865   GPR_ASSERT(calld->state != PENDING);
866   GRPC_ERROR_UNREF(calld->error);
867   if (calld->host_set) {
868     grpc_slice_unref_internal(calld->host);
869   }
870   if (calld->path_set) {
871     grpc_slice_unref_internal(calld->path);
872   }
873   grpc_metadata_array_destroy(&calld->initial_metadata);
874   grpc_byte_buffer_destroy(calld->payload);
875 
876   server_unref(chand->server);
877 }
878 
init_channel_elem(grpc_channel_element * elem,grpc_channel_element_args * args)879 static grpc_error* init_channel_elem(grpc_channel_element* elem,
880                                      grpc_channel_element_args* args) {
881   channel_data* chand = static_cast<channel_data*>(elem->channel_data);
882   GPR_ASSERT(args->is_first);
883   GPR_ASSERT(!args->is_last);
884   chand->server = nullptr;
885   chand->channel = nullptr;
886   chand->next = chand->prev = chand;
887   chand->registered_methods = nullptr;
888   chand->connectivity_state = GRPC_CHANNEL_IDLE;
889   GRPC_CLOSURE_INIT(&chand->channel_connectivity_changed,
890                     channel_connectivity_changed, chand,
891                     grpc_schedule_on_exec_ctx);
892   return GRPC_ERROR_NONE;
893 }
894 
destroy_channel_elem(grpc_channel_element * elem)895 static void destroy_channel_elem(grpc_channel_element* elem) {
896   size_t i;
897   channel_data* chand = static_cast<channel_data*>(elem->channel_data);
898   if (chand->registered_methods) {
899     for (i = 0; i < chand->registered_method_slots; i++) {
900       grpc_slice_unref_internal(chand->registered_methods[i].method);
901       if (chand->registered_methods[i].has_host) {
902         grpc_slice_unref_internal(chand->registered_methods[i].host);
903       }
904     }
905     gpr_free(chand->registered_methods);
906   }
907   if (chand->server) {
908     gpr_mu_lock(&chand->server->mu_global);
909     chand->next->prev = chand->prev;
910     chand->prev->next = chand->next;
911     chand->next = chand->prev = chand;
912     maybe_finish_shutdown(chand->server);
913     gpr_mu_unlock(&chand->server->mu_global);
914     server_unref(chand->server);
915   }
916 }
917 
/* Filter table for the server's own channel/call element, built from the
   callbacks defined above.  init_channel_elem() asserts that this filter sits
   first in the channel stack and is not the last element. */
const grpc_channel_filter grpc_server_top_filter = {
    /* stream op batches: hook the recv-metadata callbacks, then forward */
    server_start_transport_stream_op_batch,
    /* channel-level ops are passed straight to the next element */
    grpc_channel_next_op,
    /* size of this filter's call_data, followed by its call-level hooks */
    sizeof(call_data),
    init_call_elem,
    grpc_call_stack_ignore_set_pollset_or_pollset_set,
    destroy_call_elem,
    /* size of this filter's channel_data, followed by its channel-level hooks */
    sizeof(channel_data),
    init_channel_elem,
    destroy_channel_elem,
    grpc_channel_next_get_info,
    /* presumably the filter's name -- confirm against grpc_channel_filter */
    "server",
};
931 
register_completion_queue(grpc_server * server,grpc_completion_queue * cq,void * reserved)932 static void register_completion_queue(grpc_server* server,
933                                       grpc_completion_queue* cq,
934                                       void* reserved) {
935   size_t i, n;
936   GPR_ASSERT(!reserved);
937   for (i = 0; i < server->cq_count; i++) {
938     if (server->cqs[i] == cq) return;
939   }
940 
941   GRPC_CQ_INTERNAL_REF(cq, "server");
942   n = server->cq_count++;
943   server->cqs = static_cast<grpc_completion_queue**>(gpr_realloc(
944       server->cqs, server->cq_count * sizeof(grpc_completion_queue*)));
945   server->cqs[n] = cq;
946 }
947 
/* Public API: registers `cq` as a completion queue of `server`.

   A queue whose completion type is not GRPC_CQ_NEXT is still accepted; the
   mismatch is only logged at INFO level (see the comment below for why).
   `reserved` must be NULL -- register_completion_queue() asserts it.
   Registering the same queue twice is a no-op. */
void grpc_server_register_completion_queue(grpc_server* server,
                                           grpc_completion_queue* cq,
                                           void* reserved) {
  GRPC_API_TRACE(
      "grpc_server_register_completion_queue(server=%p, cq=%p, reserved=%p)", 3,
      (server, cq, reserved));

  if (grpc_get_cq_completion_type(cq) != GRPC_CQ_NEXT) {
    gpr_log(GPR_INFO,
            "Completion queue which is not of type GRPC_CQ_NEXT is being "
            "registered as a server-completion-queue");
    /* Ideally we should log an error and abort but ruby-wrapped-language API
       calls grpc_completion_queue_pluck() on server completion queues */
  }

  register_completion_queue(server, cq, reserved);
}
965 
grpc_server_create(const grpc_channel_args * args,void * reserved)966 grpc_server* grpc_server_create(const grpc_channel_args* args, void* reserved) {
967   grpc_core::ExecCtx exec_ctx;
968   GRPC_API_TRACE("grpc_server_create(%p, %p)", 2, (args, reserved));
969 
970   grpc_server* server =
971       static_cast<grpc_server*>(gpr_zalloc(sizeof(grpc_server)));
972 
973   gpr_mu_init(&server->mu_global);
974   gpr_mu_init(&server->mu_call);
975   gpr_cv_init(&server->starting_cv);
976 
977   /* decremented by grpc_server_destroy */
978   gpr_ref_init(&server->internal_refcount, 1);
979   server->root_channel_data.next = server->root_channel_data.prev =
980       &server->root_channel_data;
981 
982   server->channel_args = grpc_channel_args_copy(args);
983 
984   const grpc_arg* arg = grpc_channel_args_find(args, GRPC_ARG_ENABLE_CHANNELZ);
985   if (grpc_channel_arg_get_bool(arg, false)) {
986     arg = grpc_channel_args_find(args,
987                                  GRPC_ARG_MAX_CHANNEL_TRACE_EVENTS_PER_NODE);
988     size_t trace_events_per_node =
989         grpc_channel_arg_get_integer(arg, {0, 0, INT_MAX});
990     server->channelz_server =
991         grpc_core::MakeRefCounted<grpc_core::channelz::ServerNode>(
992             trace_events_per_node);
993     server->channelz_server->AddTraceEvent(
994         grpc_core::channelz::ChannelTrace::Severity::Info,
995         grpc_slice_from_static_string("Server created"));
996   }
997 
998   return server;
999 }
1000 
/* NULL-tolerant string equality.
   Returns 1 when both pointers are NULL or both strings compare equal with
   strcmp(); returns 0 otherwise (including when exactly one is NULL). */
static int streq(const char* a, const char* b) {
  if (!a || !b) {
    /* at least one side is NULL: equal only if both are */
    return (!a && !b) ? 1 : 0;
  }
  return strcmp(a, b) == 0 ? 1 : 0;
}
1007 
grpc_server_register_method(grpc_server * server,const char * method,const char * host,grpc_server_register_method_payload_handling payload_handling,uint32_t flags)1008 void* grpc_server_register_method(
1009     grpc_server* server, const char* method, const char* host,
1010     grpc_server_register_method_payload_handling payload_handling,
1011     uint32_t flags) {
1012   registered_method* m;
1013   GRPC_API_TRACE(
1014       "grpc_server_register_method(server=%p, method=%s, host=%s, "
1015       "flags=0x%08x)",
1016       4, (server, method, host, flags));
1017   if (!method) {
1018     gpr_log(GPR_ERROR,
1019             "grpc_server_register_method method string cannot be NULL");
1020     return nullptr;
1021   }
1022   for (m = server->registered_methods; m; m = m->next) {
1023     if (streq(m->method, method) && streq(m->host, host)) {
1024       gpr_log(GPR_ERROR, "duplicate registration for %s@%s", method,
1025               host ? host : "*");
1026       return nullptr;
1027     }
1028   }
1029   if ((flags & ~GRPC_INITIAL_METADATA_USED_MASK) != 0) {
1030     gpr_log(GPR_ERROR, "grpc_server_register_method invalid flags 0x%08x",
1031             flags);
1032     return nullptr;
1033   }
1034   m = static_cast<registered_method*>(gpr_zalloc(sizeof(registered_method)));
1035   m->method = gpr_strdup(method);
1036   m->host = gpr_strdup(host);
1037   m->next = server->registered_methods;
1038   m->payload_handling = payload_handling;
1039   m->flags = flags;
1040   server->registered_methods = m;
1041   return m;
1042 }
1043 
start_listeners(void * s,grpc_error * error)1044 static void start_listeners(void* s, grpc_error* error) {
1045   grpc_server* server = static_cast<grpc_server*>(s);
1046   for (listener* l = server->listeners; l; l = l->next) {
1047     l->start(server, l->arg, server->pollsets, server->pollset_count);
1048   }
1049 
1050   gpr_mu_lock(&server->mu_global);
1051   server->starting = false;
1052   gpr_cv_signal(&server->starting_cv);
1053   gpr_mu_unlock(&server->mu_global);
1054 
1055   server_unref(server);
1056 }
1057 
grpc_server_start(grpc_server * server)1058 void grpc_server_start(grpc_server* server) {
1059   size_t i;
1060   grpc_core::ExecCtx exec_ctx;
1061 
1062   GRPC_API_TRACE("grpc_server_start(server=%p)", 1, (server));
1063 
1064   server->started = true;
1065   server->pollset_count = 0;
1066   server->pollsets = static_cast<grpc_pollset**>(
1067       gpr_malloc(sizeof(grpc_pollset*) * server->cq_count));
1068   for (i = 0; i < server->cq_count; i++) {
1069     if (grpc_cq_can_listen(server->cqs[i])) {
1070       server->pollsets[server->pollset_count++] =
1071           grpc_cq_pollset(server->cqs[i]);
1072     }
1073   }
1074   request_matcher_init(&server->unregistered_request_matcher, server);
1075   for (registered_method* rm = server->registered_methods; rm; rm = rm->next) {
1076     request_matcher_init(&rm->matcher, server);
1077   }
1078 
1079   server_ref(server);
1080   server->starting = true;
1081   GRPC_CLOSURE_SCHED(
1082       GRPC_CLOSURE_CREATE(start_listeners, server,
1083                           grpc_executor_scheduler(GRPC_EXECUTOR_SHORT)),
1084       GRPC_ERROR_NONE);
1085 }
1086 
/* Outputs the server's pollset array and its length (both populated by
   grpc_server_start()).  `*pollsets` points at the server's own array, not a
   copy. */
void grpc_server_get_pollsets(grpc_server* server, grpc_pollset*** pollsets,
                              size_t* pollset_count) {
  *pollset_count = server->pollset_count;
  *pollsets = server->pollsets;
}
1092 
/* Attaches a newly established transport to server `s`.

   Creates a server channel on top of `transport`, binds it to the server
   (taking a server ref, released in destroy_channel_elem()), chooses the
   completion queue that new calls on this channel are published to, builds
   the per-channel registered-method lookup table, links the channel into the
   server's channel list, and finally sends a transport op that installs
   accept_stream() and the connectivity watcher.

   s                 - the server the transport belongs to.
   transport         - the transport to wrap in a channel.
   accepting_pollset - pollset the connection was accepted on; used to pick
                       the matching completion queue.
   args              - channel args passed to grpc_channel_create(). */
void grpc_server_setup_transport(grpc_server* s, grpc_transport* transport,
                                 grpc_pollset* accepting_pollset,
                                 const grpc_channel_args* args) {
  size_t num_registered_methods;
  size_t alloc;
  registered_method* rm;
  channel_registered_method* crm;
  grpc_channel* channel;
  channel_data* chand;
  uint32_t hash;
  size_t slots;
  uint32_t probes;
  uint32_t max_probes = 0;
  grpc_transport_op* op = nullptr;

  channel = grpc_channel_create(nullptr, args, GRPC_SERVER_CHANNEL, transport);
  /* the server filter is element 0 of the channel stack */
  chand = static_cast<channel_data*>(
      grpc_channel_stack_element(grpc_channel_get_channel_stack(channel), 0)
          ->channel_data);
  chand->server = s;
  server_ref(s);
  chand->channel = channel;

  /* prefer the completion queue whose pollset accepted this connection */
  size_t cq_idx;
  for (cq_idx = 0; cq_idx < s->cq_count; cq_idx++) {
    if (grpc_cq_pollset(s->cqs[cq_idx]) == accepting_pollset) break;
  }
  if (cq_idx == s->cq_count) {
    /* completion queue not found: pick a random one to publish new calls to */
    /* NOTE(review): this modulo assumes s->cq_count > 0, i.e. at least one
       completion queue has been registered -- confirm callers guarantee it */
    cq_idx = static_cast<size_t>(rand()) % s->cq_count;
  }
  chand->cq_idx = cq_idx;

  num_registered_methods = 0;
  for (rm = s->registered_methods; rm; rm = rm->next) {
    num_registered_methods++;
  }
  /* build a lookup table phrased in terms of mdstr's in this channel's
     context to quickly find registered methods */
  if (num_registered_methods > 0) {
    /* open-addressed table with twice as many slots as methods */
    slots = 2 * num_registered_methods;
    alloc = sizeof(channel_registered_method) * slots;
    chand->registered_methods =
        static_cast<channel_registered_method*>(gpr_zalloc(alloc));
    for (rm = s->registered_methods; rm; rm = rm->next) {
      grpc_slice host;
      bool has_host;
      grpc_slice method;
      /* `host` is only assigned (and only read below) when has_host is true */
      if (rm->host != nullptr) {
        host = grpc_slice_intern(grpc_slice_from_static_string(rm->host));
        has_host = true;
      } else {
        has_host = false;
      }
      method = grpc_slice_intern(grpc_slice_from_static_string(rm->method));
      /* methods registered without a host hash with a host hash of 0 */
      hash = GRPC_MDSTR_KV_HASH(has_host ? grpc_slice_hash(host) : 0,
                                grpc_slice_hash(method));
      /* probe forward one slot at a time until an unused slot is found */
      for (probes = 0; chand->registered_methods[(hash + probes) % slots]
                           .server_registered_method != nullptr;
           probes++)
        ;
      /* remember the longest probe sequence; stored on the channel below */
      if (probes > max_probes) max_probes = probes;
      crm = &chand->registered_methods[(hash + probes) % slots];
      crm->server_registered_method = rm;
      crm->flags = rm->flags;
      crm->has_host = has_host;
      if (has_host) {
        crm->host = host;
      }
      crm->method = method;
    }
    GPR_ASSERT(slots <= UINT32_MAX);
    chand->registered_method_slots = static_cast<uint32_t>(slots);
    chand->registered_method_max_probes = max_probes;
  }

  /* insert the channel just before the root node of the server's circular
     channel list */
  gpr_mu_lock(&s->mu_global);
  chand->next = &s->root_channel_data;
  chand->prev = chand->next->prev;
  chand->next->prev = chand->prev->next = chand;
  gpr_mu_unlock(&s->mu_global);

  /* this ref is released in channel_connectivity_changed() once the channel
     reaches GRPC_CHANNEL_SHUTDOWN */
  GRPC_CHANNEL_INTERNAL_REF(channel, "connectivity");
  op = grpc_make_transport_op(nullptr);
  op->set_accept_stream = true;
  op->set_accept_stream_fn = accept_stream;
  op->set_accept_stream_user_data = chand;
  op->on_connectivity_state_change = &chand->channel_connectivity_changed;
  op->connectivity_state = &chand->connectivity_state;
  /* a transport arriving after shutdown began is disconnected right away */
  if (gpr_atm_acq_load(&s->shutdown_flag) != 0) {
    op->disconnect_with_error =
        GRPC_ERROR_CREATE_FROM_STATIC_STRING("Server shutdown");
  }
  grpc_transport_perform_op(transport, op);
}
1188 
/* Completion "done" callback used by grpc_server_shutdown_and_notify() when
   shutdown had already been published: frees the heap-allocated
   grpc_cq_completion that was passed to grpc_cq_end_op().  `done_arg` is
   unused. */
void done_published_shutdown(void* done_arg, grpc_cq_completion* storage) {
  (void)done_arg;
  gpr_free(storage);
}
1193 
/* Closure passed to each listener's destroy() by
   grpc_server_shutdown_and_notify().  Under mu_global it counts one more
   destroyed listener and re-checks whether shutdown can now finish.
   `s` is the grpc_server; `error` is unused. */
static void listener_destroy_done(void* s, grpc_error* error) {
  grpc_server* server = static_cast<grpc_server*>(s);
  gpr_mu_lock(&server->mu_global);
  server->listeners_destroyed++;
  maybe_finish_shutdown(server);
  gpr_mu_unlock(&server->mu_global);
}
1201 
/*
  Begins server shutdown and arranges for `tag` to be posted on `cq`.

  - Kills all pending requests-for-incoming-RPC-calls (i.e. the requests made
    via grpc_server_request_call and grpc_server_request_registered_call will
    now be cancelled). See 'kill_pending_work_locked()'.

  - Shuts down the listeners (i.e. the server will no longer listen on the
    port for new incoming channels).

  - Iterates through all channels on the server and sends a shutdown message
    (see 'channel_broadcaster_shutdown()' for details) to the clients via the
    transport layer. The transport layer then guarantees the following:
     -- Sends shutdown to the client (e.g. the HTTP2 transport sends GOAWAY)
     -- If the server has outstanding calls in progress, the connection is
        NOT closed until the server is done with all those calls
     -- Once there are no more calls in progress, the channel is closed

  May be called more than once: if shutdown has already been published the
  tag is completed immediately; if shutdown is merely in progress the tag is
  recorded in shutdown_tags and nothing else is done.
 */
void grpc_server_shutdown_and_notify(grpc_server* server,
                                     grpc_completion_queue* cq, void* tag) {
  listener* l;
  shutdown_tag* sdt;
  channel_broadcaster broadcaster;
  grpc_core::ExecCtx exec_ctx;

  GRPC_API_TRACE("grpc_server_shutdown_and_notify(server=%p, cq=%p, tag=%p)", 3,
                 (server, cq, tag));

  /* wait for startup to be finished: locks mu_global */
  /* `starting` is set by grpc_server_start() and cleared (with a signal on
     starting_cv) by start_listeners() */
  gpr_mu_lock(&server->mu_global);
  while (server->starting) {
    gpr_cv_wait(&server->starting_cv, &server->mu_global,
                gpr_inf_future(GPR_CLOCK_MONOTONIC));
  }

  /* stay locked, and gather up some stuff to do */
  GPR_ASSERT(grpc_cq_begin_op(cq, tag));
  if (server->shutdown_published) {
    /* shutdown already published: complete this tag right away; the
       completion storage is freed by done_published_shutdown() */
    grpc_cq_end_op(cq, tag, GRPC_ERROR_NONE, done_published_shutdown, nullptr,
                   static_cast<grpc_cq_completion*>(
                       gpr_malloc(sizeof(grpc_cq_completion))));
    gpr_mu_unlock(&server->mu_global);
    return;
  }
  /* append (tag, cq) to the list of shutdown tags */
  server->shutdown_tags = static_cast<shutdown_tag*>(
      gpr_realloc(server->shutdown_tags,
                  sizeof(shutdown_tag) * (server->num_shutdown_tags + 1)));
  sdt = &server->shutdown_tags[server->num_shutdown_tags++];
  sdt->tag = tag;
  sdt->cq = cq;
  if (gpr_atm_acq_load(&server->shutdown_flag)) {
    /* shutdown_flag already set by an earlier call: the tag has been
       recorded, nothing more to do here */
    gpr_mu_unlock(&server->mu_global);
    return;
  }

  server->last_shutdown_message_time = gpr_now(GPR_CLOCK_REALTIME);

  /* prepared while mu_global is held; used after the lock is released */
  channel_broadcaster_init(server, &broadcaster);

  gpr_atm_rel_store(&server->shutdown_flag, 1);

  /* collect all unregistered then registered calls */
  gpr_mu_lock(&server->mu_call);
  kill_pending_work_locked(
      server, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Server Shutdown"));
  gpr_mu_unlock(&server->mu_call);

  maybe_finish_shutdown(server);
  gpr_mu_unlock(&server->mu_global);

  /* Shutdown listeners */
  /* each listener reports completion through listener_destroy_done() */
  for (l = server->listeners; l; l = l->next) {
    GRPC_CLOSURE_INIT(&l->destroy_done, listener_destroy_done, server,
                      grpc_schedule_on_exec_ctx);
    l->destroy(server, l->arg, &l->destroy_done);
  }

  channel_broadcaster_shutdown(&broadcaster, true /* send_goaway */,
                               GRPC_ERROR_NONE);
}
1280 
/* Public API: cancels all calls on the server.

   Initialises a channel broadcaster while holding mu_global, then, with the
   lock released, runs channel_broadcaster_shutdown() without a GOAWAY and
   with a "Cancelling all calls" error. */
void grpc_server_cancel_all_calls(grpc_server* server) {
  channel_broadcaster broadcaster;
  grpc_core::ExecCtx exec_ctx;

  GRPC_API_TRACE("grpc_server_cancel_all_calls(server=%p)", 1, (server));

  gpr_mu_lock(&server->mu_global);
  channel_broadcaster_init(server, &broadcaster);
  gpr_mu_unlock(&server->mu_global);

  channel_broadcaster_shutdown(
      &broadcaster, false /* send_goaway */,
      GRPC_ERROR_CREATE_FROM_STATIC_STRING("Cancelling all calls"));
}
1295 
grpc_server_destroy(grpc_server * server)1296 void grpc_server_destroy(grpc_server* server) {
1297   listener* l;
1298   grpc_core::ExecCtx exec_ctx;
1299 
1300   GRPC_API_TRACE("grpc_server_destroy(server=%p)", 1, (server));
1301 
1302   gpr_mu_lock(&server->mu_global);
1303   GPR_ASSERT(gpr_atm_acq_load(&server->shutdown_flag) || !server->listeners);
1304   GPR_ASSERT(server->listeners_destroyed == num_listeners(server));
1305 
1306   while (server->listeners) {
1307     l = server->listeners;
1308     server->listeners = l->next;
1309     gpr_free(l);
1310   }
1311 
1312   gpr_mu_unlock(&server->mu_global);
1313 
1314   server_unref(server);
1315 }
1316 
grpc_server_add_listener(grpc_server * server,void * arg,void (* start)(grpc_server * server,void * arg,grpc_pollset ** pollsets,size_t pollset_count),void (* destroy)(grpc_server * server,void * arg,grpc_closure * on_done))1317 void grpc_server_add_listener(grpc_server* server, void* arg,
1318                               void (*start)(grpc_server* server, void* arg,
1319                                             grpc_pollset** pollsets,
1320                                             size_t pollset_count),
1321                               void (*destroy)(grpc_server* server, void* arg,
1322                                               grpc_closure* on_done)) {
1323   listener* l = static_cast<listener*>(gpr_malloc(sizeof(listener)));
1324   l->arg = arg;
1325   l->start = start;
1326   l->destroy = destroy;
1327   l->next = server->listeners;
1328   server->listeners = l;
1329 }
1330 
/* Queues an application request-for-call `rc` on completion queue index
   `cq_idx` and, when possible, pairs queued requests with calls that are
   already pending.

   If the server is shutting down, the request is failed immediately with a
   "Server Shutdown" error.  Otherwise the request is pushed onto the
   per-cq queue of the matcher selected by rc->type (the unregistered matcher
   for BATCH_CALL, the method's own matcher for REGISTERED_CALL).

   Always returns GRPC_CALL_OK. */
static grpc_call_error queue_call_request(grpc_server* server, size_t cq_idx,
                                          requested_call* rc) {
  call_data* calld = nullptr;
  request_matcher* rm = nullptr;
  if (gpr_atm_acq_load(&server->shutdown_flag)) {
    fail_call(server, cq_idx, rc,
              GRPC_ERROR_CREATE_FROM_STATIC_STRING("Server Shutdown"));
    return GRPC_CALL_OK;
  }
  switch (rc->type) {
    case BATCH_CALL:
      rm = &server->unregistered_request_matcher;
      break;
    case REGISTERED_CALL:
      rm = &rc->data.registered.method->matcher;
      break;
  }
  if (gpr_locked_mpscq_push(&rm->requests_per_cq[cq_idx], &rc->request_link)) {
    /* this was the first queued request: we need to lock and start
       matching calls */
    gpr_mu_lock(&server->mu_call);
    /* loop while there is a pending call at the head of the matcher's list */
    while ((calld = rm->pending_head) != nullptr) {
      /* note: `rc` is reused here for whichever request is popped, which is
         not necessarily the one passed in */
      rc = reinterpret_cast<requested_call*>(
          gpr_locked_mpscq_pop(&rm->requests_per_cq[cq_idx]));
      if (rc == nullptr) break;
      rm->pending_head = calld->pending_next;
      /* mu_call is released while the call is published or killed, and
         re-taken before pending_head is examined again */
      gpr_mu_unlock(&server->mu_call);
      if (!gpr_atm_full_cas(&calld->state, PENDING, ACTIVATED)) {
        // Zombied Call
        /* NOTE(review): the request popped above is neither published nor
           failed on this branch -- confirm it is not lost */
        GRPC_CLOSURE_INIT(
            &calld->kill_zombie_closure, kill_zombie,
            grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0),
            grpc_schedule_on_exec_ctx);
        GRPC_CLOSURE_SCHED(&calld->kill_zombie_closure, GRPC_ERROR_NONE);
      } else {
        publish_call(server, calld, cq_idx, rc);
      }
      gpr_mu_lock(&server->mu_call);
    }
    gpr_mu_unlock(&server->mu_call);
  }
  return GRPC_CALL_OK;
}
1374 
grpc_server_request_call(grpc_server * server,grpc_call ** call,grpc_call_details * details,grpc_metadata_array * initial_metadata,grpc_completion_queue * cq_bound_to_call,grpc_completion_queue * cq_for_notification,void * tag)1375 grpc_call_error grpc_server_request_call(
1376     grpc_server* server, grpc_call** call, grpc_call_details* details,
1377     grpc_metadata_array* initial_metadata,
1378     grpc_completion_queue* cq_bound_to_call,
1379     grpc_completion_queue* cq_for_notification, void* tag) {
1380   grpc_call_error error;
1381   grpc_core::ExecCtx exec_ctx;
1382   requested_call* rc = static_cast<requested_call*>(gpr_malloc(sizeof(*rc)));
1383   GRPC_STATS_INC_SERVER_REQUESTED_CALLS();
1384   GRPC_API_TRACE(
1385       "grpc_server_request_call("
1386       "server=%p, call=%p, details=%p, initial_metadata=%p, "
1387       "cq_bound_to_call=%p, cq_for_notification=%p, tag=%p)",
1388       7,
1389       (server, call, details, initial_metadata, cq_bound_to_call,
1390        cq_for_notification, tag));
1391   size_t cq_idx;
1392   for (cq_idx = 0; cq_idx < server->cq_count; cq_idx++) {
1393     if (server->cqs[cq_idx] == cq_for_notification) {
1394       break;
1395     }
1396   }
1397   if (cq_idx == server->cq_count) {
1398     gpr_free(rc);
1399     error = GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE;
1400     goto done;
1401   }
1402   if (grpc_cq_begin_op(cq_for_notification, tag) == false) {
1403     gpr_free(rc);
1404     error = GRPC_CALL_ERROR_COMPLETION_QUEUE_SHUTDOWN;
1405     goto done;
1406   }
1407   details->reserved = nullptr;
1408   rc->cq_idx = cq_idx;
1409   rc->type = BATCH_CALL;
1410   rc->server = server;
1411   rc->tag = tag;
1412   rc->cq_bound_to_call = cq_bound_to_call;
1413   rc->call = call;
1414   rc->data.batch.details = details;
1415   rc->initial_metadata = initial_metadata;
1416   error = queue_call_request(server, cq_idx, rc);
1417 done:
1418 
1419   return error;
1420 }
1421 
grpc_server_request_registered_call(grpc_server * server,void * rmp,grpc_call ** call,gpr_timespec * deadline,grpc_metadata_array * initial_metadata,grpc_byte_buffer ** optional_payload,grpc_completion_queue * cq_bound_to_call,grpc_completion_queue * cq_for_notification,void * tag)1422 grpc_call_error grpc_server_request_registered_call(
1423     grpc_server* server, void* rmp, grpc_call** call, gpr_timespec* deadline,
1424     grpc_metadata_array* initial_metadata, grpc_byte_buffer** optional_payload,
1425     grpc_completion_queue* cq_bound_to_call,
1426     grpc_completion_queue* cq_for_notification, void* tag) {
1427   grpc_call_error error;
1428   grpc_core::ExecCtx exec_ctx;
1429   requested_call* rc = static_cast<requested_call*>(gpr_malloc(sizeof(*rc)));
1430   registered_method* rm = static_cast<registered_method*>(rmp);
1431   GRPC_STATS_INC_SERVER_REQUESTED_CALLS();
1432   GRPC_API_TRACE(
1433       "grpc_server_request_registered_call("
1434       "server=%p, rmp=%p, call=%p, deadline=%p, initial_metadata=%p, "
1435       "optional_payload=%p, cq_bound_to_call=%p, cq_for_notification=%p, "
1436       "tag=%p)",
1437       9,
1438       (server, rmp, call, deadline, initial_metadata, optional_payload,
1439        cq_bound_to_call, cq_for_notification, tag));
1440 
1441   size_t cq_idx;
1442   for (cq_idx = 0; cq_idx < server->cq_count; cq_idx++) {
1443     if (server->cqs[cq_idx] == cq_for_notification) {
1444       break;
1445     }
1446   }
1447   if (cq_idx == server->cq_count) {
1448     gpr_free(rc);
1449     error = GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE;
1450     goto done;
1451   }
1452   if ((optional_payload == nullptr) !=
1453       (rm->payload_handling == GRPC_SRM_PAYLOAD_NONE)) {
1454     gpr_free(rc);
1455     error = GRPC_CALL_ERROR_PAYLOAD_TYPE_MISMATCH;
1456     goto done;
1457   }
1458   if (grpc_cq_begin_op(cq_for_notification, tag) == false) {
1459     gpr_free(rc);
1460     error = GRPC_CALL_ERROR_COMPLETION_QUEUE_SHUTDOWN;
1461     goto done;
1462   }
1463   rc->cq_idx = cq_idx;
1464   rc->type = REGISTERED_CALL;
1465   rc->server = server;
1466   rc->tag = tag;
1467   rc->cq_bound_to_call = cq_bound_to_call;
1468   rc->call = call;
1469   rc->data.registered.method = rm;
1470   rc->data.registered.deadline = deadline;
1471   rc->initial_metadata = initial_metadata;
1472   rc->data.registered.optional_payload = optional_payload;
1473   error = queue_call_request(server, cq_idx, rc);
1474 done:
1475 
1476   return error;
1477 }
1478 
/* Completes a requested call unsuccessfully.

   Clears the caller's output call pointer, zeroes the output metadata count,
   and posts rc->tag on server->cqs[cq_idx] with `error`, using the completion
   storage embedded in `rc`.  done_request_event() is invoked with `rc` when
   the completion is done.

   error - must not be GRPC_ERROR_NONE (asserted); it is handed to
           grpc_cq_end_op() (presumably transferring ownership -- confirm). */
static void fail_call(grpc_server* server, size_t cq_idx, requested_call* rc,
                      grpc_error* error) {
  *rc->call = nullptr;
  rc->initial_metadata->count = 0;
  GPR_ASSERT(error != GRPC_ERROR_NONE);

  grpc_cq_end_op(server->cqs[cq_idx], rc->tag, error, done_request_event, rc,
                 &rc->completion);
}
1488 
/* Returns the server's own copy of the channel args made in
   grpc_server_create().  The pointer refers to server-held storage. */
const grpc_channel_args* grpc_server_get_channel_args(grpc_server* server) {
  return server->channel_args;
}
1492 
/* Returns non-zero if at least one channel is currently linked into the
   server's channel list, zero otherwise.  The list is inspected under
   mu_global. */
int grpc_server_has_open_connections(grpc_server* server) {
  int r;
  gpr_mu_lock(&server->mu_global);
  /* the circular list is empty when the root node points back at itself */
  r = server->root_channel_data.next != &server->root_channel_data;
  gpr_mu_unlock(&server->mu_global);
  return r;
}
1500 
grpc_server_get_channelz_node(grpc_server * server)1501 grpc_core::channelz::ServerNode* grpc_server_get_channelz_node(
1502     grpc_server* server) {
1503   if (server == nullptr) {
1504     return nullptr;
1505   }
1506   return server->channelz_server.get();
1507 }
1508