1 /*
2 *
3 * Copyright 2015 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19 /* FIXME: "posix" files shouldn't be depending on _GNU_SOURCE */
20 #ifndef _GNU_SOURCE
21 #define _GNU_SOURCE
22 #endif
23
24 #include <grpc/support/port_platform.h>
25
26 #include "src/core/lib/iomgr/port.h"
27
28 #ifdef GRPC_POSIX_SOCKET_TCP_SERVER
29
30 #include "src/core/lib/iomgr/tcp_server.h"
31
32 #include <errno.h>
33 #include <fcntl.h>
34 #include <netinet/in.h>
35 #include <netinet/tcp.h>
36 #include <string.h>
37 #include <sys/socket.h>
38 #include <sys/stat.h>
39 #include <sys/types.h>
40 #include <unistd.h>
41
42 #include <grpc/support/alloc.h>
43 #include <grpc/support/log.h>
44 #include <grpc/support/string_util.h>
45 #include <grpc/support/sync.h>
46 #include <grpc/support/time.h>
47
48 #include "src/core/lib/channel/channel_args.h"
49 #include "src/core/lib/gpr/string.h"
50 #include "src/core/lib/iomgr/resolve_address.h"
51 #include "src/core/lib/iomgr/sockaddr.h"
52 #include "src/core/lib/iomgr/sockaddr_utils.h"
53 #include "src/core/lib/iomgr/socket_utils_posix.h"
54 #include "src/core/lib/iomgr/tcp_posix.h"
55 #include "src/core/lib/iomgr/tcp_server_utils_posix.h"
56 #include "src/core/lib/iomgr/unix_sockets_posix.h"
57
tcp_server_create(grpc_closure * shutdown_complete,const grpc_channel_args * args,grpc_tcp_server ** server)58 static grpc_error* tcp_server_create(grpc_closure* shutdown_complete,
59 const grpc_channel_args* args,
60 grpc_tcp_server** server) {
61 grpc_tcp_server* s =
62 static_cast<grpc_tcp_server*>(gpr_zalloc(sizeof(grpc_tcp_server)));
63 s->so_reuseport = grpc_is_socket_reuse_port_supported();
64 s->expand_wildcard_addrs = false;
65 for (size_t i = 0; i < (args == nullptr ? 0 : args->num_args); i++) {
66 if (0 == strcmp(GRPC_ARG_ALLOW_REUSEPORT, args->args[i].key)) {
67 if (args->args[i].type == GRPC_ARG_INTEGER) {
68 s->so_reuseport = grpc_is_socket_reuse_port_supported() &&
69 (args->args[i].value.integer != 0);
70 } else {
71 gpr_free(s);
72 return GRPC_ERROR_CREATE_FROM_STATIC_STRING(GRPC_ARG_ALLOW_REUSEPORT
73 " must be an integer");
74 }
75 } else if (0 == strcmp(GRPC_ARG_EXPAND_WILDCARD_ADDRS, args->args[i].key)) {
76 if (args->args[i].type == GRPC_ARG_INTEGER) {
77 s->expand_wildcard_addrs = (args->args[i].value.integer != 0);
78 } else {
79 gpr_free(s);
80 return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
81 GRPC_ARG_EXPAND_WILDCARD_ADDRS " must be an integer");
82 }
83 }
84 }
85 gpr_ref_init(&s->refs, 1);
86 gpr_mu_init(&s->mu);
87 s->active_ports = 0;
88 s->destroyed_ports = 0;
89 s->shutdown = false;
90 s->shutdown_starting.head = nullptr;
91 s->shutdown_starting.tail = nullptr;
92 s->shutdown_complete = shutdown_complete;
93 s->on_accept_cb = nullptr;
94 s->on_accept_cb_arg = nullptr;
95 s->head = nullptr;
96 s->tail = nullptr;
97 s->nports = 0;
98 s->channel_args = grpc_channel_args_copy(args);
99 gpr_atm_no_barrier_store(&s->next_pollset_to_assign, 0);
100 *server = s;
101 return GRPC_ERROR_NONE;
102 }
103
finish_shutdown(grpc_tcp_server * s)104 static void finish_shutdown(grpc_tcp_server* s) {
105 gpr_mu_lock(&s->mu);
106 GPR_ASSERT(s->shutdown);
107 gpr_mu_unlock(&s->mu);
108 if (s->shutdown_complete != nullptr) {
109 GRPC_CLOSURE_SCHED(s->shutdown_complete, GRPC_ERROR_NONE);
110 }
111
112 gpr_mu_destroy(&s->mu);
113
114 while (s->head) {
115 grpc_tcp_listener* sp = s->head;
116 s->head = sp->next;
117 gpr_free(sp);
118 }
119 grpc_channel_args_destroy(s->channel_args);
120
121 gpr_free(s);
122 }
123
destroyed_port(void * server,grpc_error * error)124 static void destroyed_port(void* server, grpc_error* error) {
125 grpc_tcp_server* s = static_cast<grpc_tcp_server*>(server);
126 gpr_mu_lock(&s->mu);
127 s->destroyed_ports++;
128 if (s->destroyed_ports == s->nports) {
129 gpr_mu_unlock(&s->mu);
130 finish_shutdown(s);
131 } else {
132 GPR_ASSERT(s->destroyed_ports < s->nports);
133 gpr_mu_unlock(&s->mu);
134 }
135 }
136
137 /* called when all listening endpoints have been shutdown, so no further
138 events will be received on them - at this point it's safe to destroy
139 things */
deactivated_all_ports(grpc_tcp_server * s)140 static void deactivated_all_ports(grpc_tcp_server* s) {
141 /* delete ALL the things */
142 gpr_mu_lock(&s->mu);
143
144 GPR_ASSERT(s->shutdown);
145
146 if (s->head) {
147 grpc_tcp_listener* sp;
148 for (sp = s->head; sp; sp = sp->next) {
149 grpc_unlink_if_unix_domain_socket(&sp->addr);
150 GRPC_CLOSURE_INIT(&sp->destroyed_closure, destroyed_port, s,
151 grpc_schedule_on_exec_ctx);
152 grpc_fd_orphan(sp->emfd, &sp->destroyed_closure, nullptr,
153 "tcp_listener_shutdown");
154 }
155 gpr_mu_unlock(&s->mu);
156 } else {
157 gpr_mu_unlock(&s->mu);
158 finish_shutdown(s);
159 }
160 }
161
tcp_server_destroy(grpc_tcp_server * s)162 static void tcp_server_destroy(grpc_tcp_server* s) {
163 gpr_mu_lock(&s->mu);
164
165 GPR_ASSERT(!s->shutdown);
166 s->shutdown = true;
167
168 /* shutdown all fd's */
169 if (s->active_ports) {
170 grpc_tcp_listener* sp;
171 for (sp = s->head; sp; sp = sp->next) {
172 grpc_fd_shutdown(
173 sp->emfd, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Server destroyed"));
174 }
175 gpr_mu_unlock(&s->mu);
176 } else {
177 gpr_mu_unlock(&s->mu);
178 deactivated_all_ports(s);
179 }
180 }
181
182 /* event manager callback when reads are ready */
on_read(void * arg,grpc_error * err)183 static void on_read(void* arg, grpc_error* err) {
184 grpc_tcp_listener* sp = static_cast<grpc_tcp_listener*>(arg);
185 grpc_pollset* read_notifier_pollset;
186 if (err != GRPC_ERROR_NONE) {
187 goto error;
188 }
189
190 /* loop until accept4 returns EAGAIN, and then re-arm notification */
191 for (;;) {
192 grpc_resolved_address addr;
193 char* addr_str;
194 char* name;
195 memset(&addr, 0, sizeof(addr));
196 addr.len = static_cast<socklen_t>(sizeof(struct sockaddr_storage));
197 /* Note: If we ever decide to return this address to the user, remember to
198 strip off the ::ffff:0.0.0.0/96 prefix first. */
199 int fd = grpc_accept4(sp->fd, &addr, 1, 1);
200 if (fd < 0) {
201 switch (errno) {
202 case EINTR:
203 continue;
204 case EAGAIN:
205 grpc_fd_notify_on_read(sp->emfd, &sp->read_closure);
206 return;
207 default:
208 gpr_mu_lock(&sp->server->mu);
209 if (!sp->server->shutdown_listeners) {
210 gpr_log(GPR_ERROR, "Failed accept4: %s", strerror(errno));
211 } else {
212 /* if we have shutdown listeners, accept4 could fail, and we
213 needn't notify users */
214 }
215 gpr_mu_unlock(&sp->server->mu);
216 goto error;
217 }
218 }
219
220 grpc_set_socket_no_sigpipe_if_possible(fd);
221
222 addr_str = grpc_sockaddr_to_uri(&addr);
223 gpr_asprintf(&name, "tcp-server-connection:%s", addr_str);
224
225 if (grpc_tcp_trace.enabled()) {
226 gpr_log(GPR_INFO, "SERVER_CONNECT: incoming connection: %s", addr_str);
227 }
228
229 grpc_fd* fdobj = grpc_fd_create(fd, name, true);
230
231 read_notifier_pollset =
232 sp->server->pollsets[static_cast<size_t>(gpr_atm_no_barrier_fetch_add(
233 &sp->server->next_pollset_to_assign, 1)) %
234 sp->server->pollset_count];
235
236 grpc_pollset_add_fd(read_notifier_pollset, fdobj);
237
238 // Create acceptor.
239 grpc_tcp_server_acceptor* acceptor =
240 static_cast<grpc_tcp_server_acceptor*>(gpr_malloc(sizeof(*acceptor)));
241 acceptor->from_server = sp->server;
242 acceptor->port_index = sp->port_index;
243 acceptor->fd_index = sp->fd_index;
244
245 sp->server->on_accept_cb(
246 sp->server->on_accept_cb_arg,
247 grpc_tcp_create(fdobj, sp->server->channel_args, addr_str),
248 read_notifier_pollset, acceptor);
249
250 gpr_free(name);
251 gpr_free(addr_str);
252 }
253
254 GPR_UNREACHABLE_CODE(return );
255
256 error:
257 gpr_mu_lock(&sp->server->mu);
258 if (0 == --sp->server->active_ports && sp->server->shutdown) {
259 gpr_mu_unlock(&sp->server->mu);
260 deactivated_all_ports(sp->server);
261 } else {
262 gpr_mu_unlock(&sp->server->mu);
263 }
264 }
265
266 /* Treat :: or 0.0.0.0 as a family-agnostic wildcard. */
add_wildcard_addrs_to_server(grpc_tcp_server * s,unsigned port_index,int requested_port,int * out_port)267 static grpc_error* add_wildcard_addrs_to_server(grpc_tcp_server* s,
268 unsigned port_index,
269 int requested_port,
270 int* out_port) {
271 grpc_resolved_address wild4;
272 grpc_resolved_address wild6;
273 unsigned fd_index = 0;
274 grpc_dualstack_mode dsmode;
275 grpc_tcp_listener* sp = nullptr;
276 grpc_tcp_listener* sp2 = nullptr;
277 grpc_error* v6_err = GRPC_ERROR_NONE;
278 grpc_error* v4_err = GRPC_ERROR_NONE;
279 *out_port = -1;
280
281 if (grpc_tcp_server_have_ifaddrs() && s->expand_wildcard_addrs) {
282 return grpc_tcp_server_add_all_local_addrs(s, port_index, requested_port,
283 out_port);
284 }
285
286 grpc_sockaddr_make_wildcards(requested_port, &wild4, &wild6);
287 /* Try listening on IPv6 first. */
288 if ((v6_err = grpc_tcp_server_add_addr(s, &wild6, port_index, fd_index,
289 &dsmode, &sp)) == GRPC_ERROR_NONE) {
290 ++fd_index;
291 requested_port = *out_port = sp->port;
292 if (dsmode == GRPC_DSMODE_DUALSTACK || dsmode == GRPC_DSMODE_IPV4) {
293 return GRPC_ERROR_NONE;
294 }
295 }
296 /* If we got a v6-only socket or nothing, try adding 0.0.0.0. */
297 grpc_sockaddr_set_port(&wild4, requested_port);
298 if ((v4_err = grpc_tcp_server_add_addr(s, &wild4, port_index, fd_index,
299 &dsmode, &sp2)) == GRPC_ERROR_NONE) {
300 *out_port = sp2->port;
301 if (sp != nullptr) {
302 sp2->is_sibling = 1;
303 sp->sibling = sp2;
304 }
305 }
306 if (*out_port > 0) {
307 if (v6_err != GRPC_ERROR_NONE) {
308 gpr_log(GPR_INFO,
309 "Failed to add :: listener, "
310 "the environment may not support IPv6: %s",
311 grpc_error_string(v6_err));
312 GRPC_ERROR_UNREF(v6_err);
313 }
314 if (v4_err != GRPC_ERROR_NONE) {
315 gpr_log(GPR_INFO,
316 "Failed to add 0.0.0.0 listener, "
317 "the environment may not support IPv4: %s",
318 grpc_error_string(v4_err));
319 GRPC_ERROR_UNREF(v4_err);
320 }
321 return GRPC_ERROR_NONE;
322 } else {
323 grpc_error* root_err = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
324 "Failed to add any wildcard listeners");
325 GPR_ASSERT(v6_err != GRPC_ERROR_NONE && v4_err != GRPC_ERROR_NONE);
326 root_err = grpc_error_add_child(root_err, v6_err);
327 root_err = grpc_error_add_child(root_err, v4_err);
328 return root_err;
329 }
330 }
331
clone_port(grpc_tcp_listener * listener,unsigned count)332 static grpc_error* clone_port(grpc_tcp_listener* listener, unsigned count) {
333 grpc_tcp_listener* sp = nullptr;
334 char* addr_str;
335 char* name;
336 grpc_error* err;
337
338 for (grpc_tcp_listener* l = listener->next; l && l->is_sibling; l = l->next) {
339 l->fd_index += count;
340 }
341
342 for (unsigned i = 0; i < count; i++) {
343 int fd = -1;
344 int port = -1;
345 grpc_dualstack_mode dsmode;
346 err = grpc_create_dualstack_socket(&listener->addr, SOCK_STREAM, 0, &dsmode,
347 &fd);
348 if (err != GRPC_ERROR_NONE) return err;
349 err = grpc_tcp_server_prepare_socket(listener->server, fd, &listener->addr,
350 true, &port);
351 if (err != GRPC_ERROR_NONE) return err;
352 listener->server->nports++;
353 grpc_sockaddr_to_string(&addr_str, &listener->addr, 1);
354 gpr_asprintf(&name, "tcp-server-listener:%s/clone-%d", addr_str, i);
355 sp = static_cast<grpc_tcp_listener*>(gpr_malloc(sizeof(grpc_tcp_listener)));
356 sp->next = listener->next;
357 listener->next = sp;
358 /* sp (the new listener) is a sibling of 'listener' (the original
359 listener). */
360 sp->is_sibling = 1;
361 sp->sibling = listener->sibling;
362 listener->sibling = sp;
363 sp->server = listener->server;
364 sp->fd = fd;
365 sp->emfd = grpc_fd_create(fd, name, true);
366 memcpy(&sp->addr, &listener->addr, sizeof(grpc_resolved_address));
367 sp->port = port;
368 sp->port_index = listener->port_index;
369 sp->fd_index = listener->fd_index + count - i;
370 GPR_ASSERT(sp->emfd);
371 while (listener->server->tail->next != nullptr) {
372 listener->server->tail = listener->server->tail->next;
373 }
374 gpr_free(addr_str);
375 gpr_free(name);
376 }
377
378 return GRPC_ERROR_NONE;
379 }
380
tcp_server_add_port(grpc_tcp_server * s,const grpc_resolved_address * addr,int * out_port)381 static grpc_error* tcp_server_add_port(grpc_tcp_server* s,
382 const grpc_resolved_address* addr,
383 int* out_port) {
384 grpc_tcp_listener* sp;
385 grpc_resolved_address sockname_temp;
386 grpc_resolved_address addr6_v4mapped;
387 int requested_port = grpc_sockaddr_get_port(addr);
388 unsigned port_index = 0;
389 grpc_dualstack_mode dsmode;
390 grpc_error* err;
391 *out_port = -1;
392 if (s->tail != nullptr) {
393 port_index = s->tail->port_index + 1;
394 }
395 grpc_unlink_if_unix_domain_socket(addr);
396
397 /* Check if this is a wildcard port, and if so, try to keep the port the same
398 as some previously created listener. */
399 if (requested_port == 0) {
400 for (sp = s->head; sp; sp = sp->next) {
401 sockname_temp.len =
402 static_cast<socklen_t>(sizeof(struct sockaddr_storage));
403 if (0 ==
404 getsockname(sp->fd,
405 reinterpret_cast<grpc_sockaddr*>(&sockname_temp.addr),
406 &sockname_temp.len)) {
407 int used_port = grpc_sockaddr_get_port(&sockname_temp);
408 if (used_port > 0) {
409 memcpy(&sockname_temp, addr, sizeof(grpc_resolved_address));
410 grpc_sockaddr_set_port(&sockname_temp, used_port);
411 requested_port = used_port;
412 addr = &sockname_temp;
413 break;
414 }
415 }
416 }
417 }
418 if (grpc_sockaddr_is_wildcard(addr, &requested_port)) {
419 return add_wildcard_addrs_to_server(s, port_index, requested_port,
420 out_port);
421 }
422 if (grpc_sockaddr_to_v4mapped(addr, &addr6_v4mapped)) {
423 addr = &addr6_v4mapped;
424 }
425 if ((err = grpc_tcp_server_add_addr(s, addr, port_index, 0, &dsmode, &sp)) ==
426 GRPC_ERROR_NONE) {
427 *out_port = sp->port;
428 }
429 return err;
430 }
431
432 /* Return listener at port_index or NULL. Should only be called with s->mu
433 locked. */
get_port_index(grpc_tcp_server * s,unsigned port_index)434 static grpc_tcp_listener* get_port_index(grpc_tcp_server* s,
435 unsigned port_index) {
436 unsigned num_ports = 0;
437 grpc_tcp_listener* sp;
438 for (sp = s->head; sp; sp = sp->next) {
439 if (!sp->is_sibling) {
440 if (++num_ports > port_index) {
441 return sp;
442 }
443 }
444 }
445 return nullptr;
446 }
447
tcp_server_port_fd_count(grpc_tcp_server * s,unsigned port_index)448 unsigned tcp_server_port_fd_count(grpc_tcp_server* s, unsigned port_index) {
449 unsigned num_fds = 0;
450 gpr_mu_lock(&s->mu);
451 grpc_tcp_listener* sp = get_port_index(s, port_index);
452 for (; sp; sp = sp->sibling) {
453 ++num_fds;
454 }
455 gpr_mu_unlock(&s->mu);
456 return num_fds;
457 }
458
tcp_server_port_fd(grpc_tcp_server * s,unsigned port_index,unsigned fd_index)459 static int tcp_server_port_fd(grpc_tcp_server* s, unsigned port_index,
460 unsigned fd_index) {
461 gpr_mu_lock(&s->mu);
462 grpc_tcp_listener* sp = get_port_index(s, port_index);
463 for (; sp; sp = sp->sibling, --fd_index) {
464 if (fd_index == 0) {
465 gpr_mu_unlock(&s->mu);
466 return sp->fd;
467 }
468 }
469 gpr_mu_unlock(&s->mu);
470 return -1;
471 }
472
tcp_server_start(grpc_tcp_server * s,grpc_pollset ** pollsets,size_t pollset_count,grpc_tcp_server_cb on_accept_cb,void * on_accept_cb_arg)473 static void tcp_server_start(grpc_tcp_server* s, grpc_pollset** pollsets,
474 size_t pollset_count,
475 grpc_tcp_server_cb on_accept_cb,
476 void* on_accept_cb_arg) {
477 size_t i;
478 grpc_tcp_listener* sp;
479 GPR_ASSERT(on_accept_cb);
480 gpr_mu_lock(&s->mu);
481 GPR_ASSERT(!s->on_accept_cb);
482 GPR_ASSERT(s->active_ports == 0);
483 s->on_accept_cb = on_accept_cb;
484 s->on_accept_cb_arg = on_accept_cb_arg;
485 s->pollsets = pollsets;
486 s->pollset_count = pollset_count;
487 sp = s->head;
488 while (sp != nullptr) {
489 if (s->so_reuseport && !grpc_is_unix_socket(&sp->addr) &&
490 pollset_count > 1) {
491 GPR_ASSERT(GRPC_LOG_IF_ERROR(
492 "clone_port", clone_port(sp, (unsigned)(pollset_count - 1))));
493 for (i = 0; i < pollset_count; i++) {
494 grpc_pollset_add_fd(pollsets[i], sp->emfd);
495 GRPC_CLOSURE_INIT(&sp->read_closure, on_read, sp,
496 grpc_schedule_on_exec_ctx);
497 grpc_fd_notify_on_read(sp->emfd, &sp->read_closure);
498 s->active_ports++;
499 sp = sp->next;
500 }
501 } else {
502 for (i = 0; i < pollset_count; i++) {
503 grpc_pollset_add_fd(pollsets[i], sp->emfd);
504 }
505 GRPC_CLOSURE_INIT(&sp->read_closure, on_read, sp,
506 grpc_schedule_on_exec_ctx);
507 grpc_fd_notify_on_read(sp->emfd, &sp->read_closure);
508 s->active_ports++;
509 sp = sp->next;
510 }
511 }
512 gpr_mu_unlock(&s->mu);
513 }
514
tcp_server_ref(grpc_tcp_server * s)515 grpc_tcp_server* tcp_server_ref(grpc_tcp_server* s) {
516 gpr_ref_non_zero(&s->refs);
517 return s;
518 }
519
tcp_server_shutdown_starting_add(grpc_tcp_server * s,grpc_closure * shutdown_starting)520 static void tcp_server_shutdown_starting_add(grpc_tcp_server* s,
521 grpc_closure* shutdown_starting) {
522 gpr_mu_lock(&s->mu);
523 grpc_closure_list_append(&s->shutdown_starting, shutdown_starting,
524 GRPC_ERROR_NONE);
525 gpr_mu_unlock(&s->mu);
526 }
527
tcp_server_unref(grpc_tcp_server * s)528 static void tcp_server_unref(grpc_tcp_server* s) {
529 if (gpr_unref(&s->refs)) {
530 grpc_tcp_server_shutdown_listeners(s);
531 gpr_mu_lock(&s->mu);
532 GRPC_CLOSURE_LIST_SCHED(&s->shutdown_starting);
533 gpr_mu_unlock(&s->mu);
534 tcp_server_destroy(s);
535 }
536 }
537
tcp_server_shutdown_listeners(grpc_tcp_server * s)538 static void tcp_server_shutdown_listeners(grpc_tcp_server* s) {
539 gpr_mu_lock(&s->mu);
540 s->shutdown_listeners = true;
541 /* shutdown all fd's */
542 if (s->active_ports) {
543 grpc_tcp_listener* sp;
544 for (sp = s->head; sp; sp = sp->next) {
545 grpc_fd_shutdown(sp->emfd,
546 GRPC_ERROR_CREATE_FROM_STATIC_STRING("Server shutdown"));
547 }
548 }
549 gpr_mu_unlock(&s->mu);
550 }
551
552 grpc_tcp_server_vtable grpc_posix_tcp_server_vtable = {
553 tcp_server_create,
554 tcp_server_start,
555 tcp_server_add_port,
556 tcp_server_port_fd_count,
557 tcp_server_port_fd,
558 tcp_server_ref,
559 tcp_server_shutdown_starting_add,
560 tcp_server_unref,
561 tcp_server_shutdown_listeners};
562 #endif /* GRPC_POSIX_SOCKET_TCP_SERVER */
563