1 /*
2  *
3  * Copyright 2015 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #include "src/core/lib/iomgr/port.h"
20 
21 // This test won't work except with posix sockets enabled
22 #ifdef GRPC_POSIX_SOCKET
23 
24 #include "src/core/lib/iomgr/ev_posix.h"
25 
26 #include <ctype.h>
27 #include <errno.h>
28 #include <fcntl.h>
29 #include <netinet/in.h>
30 #include <poll.h>
31 #include <stdio.h>
32 #include <stdlib.h>
33 #include <string.h>
34 #include <sys/socket.h>
35 #include <sys/time.h>
36 #include <unistd.h>
37 
38 #include <grpc/grpc.h>
39 #include <grpc/support/alloc.h>
40 #include <grpc/support/log.h>
41 #include <grpc/support/sync.h>
42 #include <grpc/support/time.h>
43 
44 #include "src/core/lib/iomgr/ev_posix.h"
45 #include "src/core/lib/iomgr/iomgr.h"
46 #include "src/core/lib/iomgr/socket_utils_posix.h"
47 #include "test/core/util/test_config.h"
48 
49 static gpr_mu* g_mu;
50 static grpc_pollset* g_pollset;
51 
/* Size, in bytes, of the buffers used to send and receive data.
   1024 is the minimal value that can be set for the TCP send and receive
   buffers. */
#define BUF_SIZE 1024
55 
56 /* Create a test socket with the right properties for testing.
57    port is the TCP port to listen or connect to.
58    Return a socket FD and sockaddr_in. */
create_test_socket(int port,int * socket_fd,struct sockaddr_in * sin)59 static void create_test_socket(int port, int* socket_fd,
60                                struct sockaddr_in* sin) {
61   int fd;
62   int one = 1;
63   int buffer_size_bytes = BUF_SIZE;
64   int flags;
65 
66   fd = socket(AF_INET, SOCK_STREAM, 0);
67   setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
68   /* Reset the size of socket send buffer to the minimal value to facilitate
69      buffer filling up and triggering notify_on_write  */
70   GPR_ASSERT(grpc_set_socket_sndbuf(fd, buffer_size_bytes) == GRPC_ERROR_NONE);
71   GPR_ASSERT(grpc_set_socket_rcvbuf(fd, buffer_size_bytes) == GRPC_ERROR_NONE);
72   /* Make fd non-blocking */
73   flags = fcntl(fd, F_GETFL, 0);
74   GPR_ASSERT(fcntl(fd, F_SETFL, flags | O_NONBLOCK) == 0);
75   *socket_fd = fd;
76 
77   /* Use local address for test */
78   sin->sin_family = AF_INET;
79   sin->sin_addr.s_addr = htonl(0x7f000001);
80   GPR_ASSERT(port >= 0 && port < 65536);
81   sin->sin_port = htons(static_cast<uint16_t>(port));
82 }
83 
/* Placeholder gRPC callback: ignores both arguments and does nothing. */
void no_op_cb(void* arg, int success) {
  (void)arg;
  (void)success;
}
86 
87 /* =======An upload server to test notify_on_read===========
88    The server simply reads and counts a stream of bytes. */
89 
90 /* An upload server. */
91 typedef struct {
92   grpc_fd* em_fd;           /* listening fd */
93   ssize_t read_bytes_total; /* total number of received bytes */
94   int done;                 /* set to 1 when a server finishes serving */
95   grpc_closure listen_closure;
96 } server;
97 
server_init(server * sv)98 static void server_init(server* sv) {
99   sv->read_bytes_total = 0;
100   sv->done = 0;
101 }
102 
103 /* An upload session.
104    Created when a new upload request arrives in the server. */
105 typedef struct {
106   server* sv;              /* not owned by a single session */
107   grpc_fd* em_fd;          /* fd to read upload bytes */
108   char read_buf[BUF_SIZE]; /* buffer to store upload bytes */
109   grpc_closure session_read_closure;
110 } session;
111 
112 /* Called when an upload session can be safely shutdown.
113    Close session FD and start to shutdown listen FD. */
session_shutdown_cb(void * arg,bool success)114 static void session_shutdown_cb(void* arg, /*session */
115                                 bool success) {
116   session* se = static_cast<session*>(arg);
117   server* sv = se->sv;
118   grpc_fd_orphan(se->em_fd, nullptr, nullptr, "a");
119   gpr_free(se);
120   /* Start to shutdown listen fd. */
121   grpc_fd_shutdown(sv->em_fd,
122                    GRPC_ERROR_CREATE_FROM_STATIC_STRING("session_shutdown_cb"));
123 }
124 
125 /* Called when data become readable in a session. */
session_read_cb(void * arg,grpc_error * error)126 static void session_read_cb(void* arg, /*session */
127                             grpc_error* error) {
128   session* se = static_cast<session*>(arg);
129   int fd = grpc_fd_wrapped_fd(se->em_fd);
130 
131   ssize_t read_once = 0;
132   ssize_t read_total = 0;
133 
134   if (error != GRPC_ERROR_NONE) {
135     session_shutdown_cb(arg, 1);
136     return;
137   }
138 
139   do {
140     read_once = read(fd, se->read_buf, BUF_SIZE);
141     if (read_once > 0) read_total += read_once;
142   } while (read_once > 0);
143   se->sv->read_bytes_total += read_total;
144 
145   /* read() returns 0 to indicate the TCP connection was closed by the client.
146      read(fd, read_buf, 0) also returns 0 which should never be called as such.
147      It is possible to read nothing due to spurious edge event or data has
148      been drained, In such a case, read() returns -1 and set errno to EAGAIN. */
149   if (read_once == 0) {
150     session_shutdown_cb(arg, 1);
151   } else if (read_once == -1) {
152     if (errno == EAGAIN) {
153       /* An edge triggered event is cached in the kernel until next poll.
154          In the current single thread implementation, session_read_cb is called
155          in the polling thread, such that polling only happens after this
156          callback, and will catch read edge event if data is available again
157          before notify_on_read.
158          TODO(chenw): in multi-threaded version, callback and polling can be
159          run in different threads. polling may catch a persist read edge event
160          before notify_on_read is called.  */
161       grpc_fd_notify_on_read(se->em_fd, &se->session_read_closure);
162     } else {
163       gpr_log(GPR_ERROR, "Unhandled read error %s", strerror(errno));
164       abort();
165     }
166   }
167 }
168 
169 /* Called when the listen FD can be safely shutdown.
170    Close listen FD and signal that server can be shutdown. */
listen_shutdown_cb(void * arg,int success)171 static void listen_shutdown_cb(void* arg /*server */, int success) {
172   server* sv = static_cast<server*>(arg);
173 
174   grpc_fd_orphan(sv->em_fd, nullptr, nullptr, "b");
175 
176   gpr_mu_lock(g_mu);
177   sv->done = 1;
178   GPR_ASSERT(
179       GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(g_pollset, nullptr)));
180   gpr_mu_unlock(g_mu);
181 }
182 
183 /* Called when a new TCP connection request arrives in the listening port. */
listen_cb(void * arg,grpc_error * error)184 static void listen_cb(void* arg, /*=sv_arg*/
185                       grpc_error* error) {
186   server* sv = static_cast<server*>(arg);
187   int fd;
188   int flags;
189   session* se;
190   struct sockaddr_storage ss;
191   socklen_t slen = sizeof(ss);
192   grpc_fd* listen_em_fd = sv->em_fd;
193 
194   if (error != GRPC_ERROR_NONE) {
195     listen_shutdown_cb(arg, 1);
196     return;
197   }
198 
199   fd = accept(grpc_fd_wrapped_fd(listen_em_fd),
200               reinterpret_cast<struct sockaddr*>(&ss), &slen);
201   GPR_ASSERT(fd >= 0);
202   GPR_ASSERT(fd < FD_SETSIZE);
203   flags = fcntl(fd, F_GETFL, 0);
204   fcntl(fd, F_SETFL, flags | O_NONBLOCK);
205   se = static_cast<session*>(gpr_malloc(sizeof(*se)));
206   se->sv = sv;
207   se->em_fd = grpc_fd_create(fd, "listener", false);
208   grpc_pollset_add_fd(g_pollset, se->em_fd);
209   GRPC_CLOSURE_INIT(&se->session_read_closure, session_read_cb, se,
210                     grpc_schedule_on_exec_ctx);
211   grpc_fd_notify_on_read(se->em_fd, &se->session_read_closure);
212 
213   grpc_fd_notify_on_read(listen_em_fd, &sv->listen_closure);
214 }
215 
/* Backlog passed to listen(): the maximum number of connections that may be
   pending acceptance. */
#define MAX_NUM_FD 1024
218 
219 /* Start a test server, return the TCP listening port bound to listen_fd.
220    listen_cb() is registered to be interested in reading from listen_fd.
221    When connection request arrives, listen_cb() is called to accept the
222    connection request. */
server_start(server * sv)223 static int server_start(server* sv) {
224   int port = 0;
225   int fd;
226   struct sockaddr_in sin;
227   socklen_t addr_len;
228 
229   create_test_socket(port, &fd, &sin);
230   addr_len = sizeof(sin);
231   GPR_ASSERT(bind(fd, (struct sockaddr*)&sin, addr_len) == 0);
232   GPR_ASSERT(getsockname(fd, (struct sockaddr*)&sin, &addr_len) == 0);
233   port = ntohs(sin.sin_port);
234   GPR_ASSERT(listen(fd, MAX_NUM_FD) == 0);
235 
236   sv->em_fd = grpc_fd_create(fd, "server", false);
237   grpc_pollset_add_fd(g_pollset, sv->em_fd);
238   /* Register to be interested in reading from listen_fd. */
239   GRPC_CLOSURE_INIT(&sv->listen_closure, listen_cb, sv,
240                     grpc_schedule_on_exec_ctx);
241   grpc_fd_notify_on_read(sv->em_fd, &sv->listen_closure);
242 
243   return port;
244 }
245 
246 /* Wait and shutdown a sever. */
server_wait_and_shutdown(server * sv)247 static void server_wait_and_shutdown(server* sv) {
248   gpr_mu_lock(g_mu);
249   while (!sv->done) {
250     grpc_core::ExecCtx exec_ctx;
251     grpc_pollset_worker* worker = nullptr;
252     GPR_ASSERT(GRPC_LOG_IF_ERROR(
253         "pollset_work",
254         grpc_pollset_work(g_pollset, &worker, GRPC_MILLIS_INF_FUTURE)));
255     gpr_mu_unlock(g_mu);
256 
257     gpr_mu_lock(g_mu);
258   }
259   gpr_mu_unlock(g_mu);
260 }
261 
/* === An upload client to test notify_on_write === */

/* Size, in bytes, of the buffer the client passes to each write() call. */
#define CLIENT_WRITE_BUF_SIZE 10
/* Number of times the client fills up the socket and re-arms
   notify_on_write before it shuts down. */
#define CLIENT_TOTAL_WRITE_CNT 3
268 
269 /* An upload client. */
270 typedef struct {
271   grpc_fd* em_fd;
272   char write_buf[CLIENT_WRITE_BUF_SIZE];
273   ssize_t write_bytes_total;
274   /* Number of times that the client fills up the write buffer and calls
275      notify_on_write to schedule another write. */
276   int client_write_cnt;
277 
278   int done; /* set to 1 when a client finishes sending */
279   grpc_closure write_closure;
280 } client;
281 
client_init(client * cl)282 static void client_init(client* cl) {
283   memset(cl->write_buf, 0, sizeof(cl->write_buf));
284   cl->write_bytes_total = 0;
285   cl->client_write_cnt = 0;
286   cl->done = 0;
287 }
288 
289 /* Called when a client upload session is ready to shutdown. */
client_session_shutdown_cb(void * arg,int success)290 static void client_session_shutdown_cb(void* arg /*client */, int success) {
291   client* cl = static_cast<client*>(arg);
292   grpc_fd_orphan(cl->em_fd, nullptr, nullptr, "c");
293   cl->done = 1;
294   GPR_ASSERT(
295       GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(g_pollset, nullptr)));
296 }
297 
298 /* Write as much as possible, then register notify_on_write. */
client_session_write(void * arg,grpc_error * error)299 static void client_session_write(void* arg, /*client */
300                                  grpc_error* error) {
301   client* cl = static_cast<client*>(arg);
302   int fd = grpc_fd_wrapped_fd(cl->em_fd);
303   ssize_t write_once = 0;
304 
305   if (error != GRPC_ERROR_NONE) {
306     gpr_mu_lock(g_mu);
307     client_session_shutdown_cb(arg, 1);
308     gpr_mu_unlock(g_mu);
309     return;
310   }
311 
312   do {
313     write_once = write(fd, cl->write_buf, CLIENT_WRITE_BUF_SIZE);
314     if (write_once > 0) cl->write_bytes_total += write_once;
315   } while (write_once > 0);
316 
317   if (errno == EAGAIN) {
318     gpr_mu_lock(g_mu);
319     if (cl->client_write_cnt < CLIENT_TOTAL_WRITE_CNT) {
320       GRPC_CLOSURE_INIT(&cl->write_closure, client_session_write, cl,
321                         grpc_schedule_on_exec_ctx);
322       grpc_fd_notify_on_write(cl->em_fd, &cl->write_closure);
323       cl->client_write_cnt++;
324     } else {
325       client_session_shutdown_cb(arg, 1);
326     }
327     gpr_mu_unlock(g_mu);
328   } else {
329     gpr_log(GPR_ERROR, "unknown errno %s", strerror(errno));
330     abort();
331   }
332 }
333 
334 /* Start a client to send a stream of bytes. */
client_start(client * cl,int port)335 static void client_start(client* cl, int port) {
336   int fd;
337   struct sockaddr_in sin;
338   create_test_socket(port, &fd, &sin);
339   if (connect(fd, reinterpret_cast<struct sockaddr*>(&sin), sizeof(sin)) ==
340       -1) {
341     if (errno == EINPROGRESS) {
342       struct pollfd pfd;
343       pfd.fd = fd;
344       pfd.events = POLLOUT;
345       pfd.revents = 0;
346       if (poll(&pfd, 1, -1) == -1) {
347         gpr_log(GPR_ERROR, "poll() failed during connect; errno=%d", errno);
348         abort();
349       }
350     } else {
351       gpr_log(GPR_ERROR, "Failed to connect to the server (errno=%d)", errno);
352       abort();
353     }
354   }
355 
356   cl->em_fd = grpc_fd_create(fd, "client", false);
357   grpc_pollset_add_fd(g_pollset, cl->em_fd);
358 
359   client_session_write(cl, GRPC_ERROR_NONE);
360 }
361 
362 /* Wait for the signal to shutdown a client. */
client_wait_and_shutdown(client * cl)363 static void client_wait_and_shutdown(client* cl) {
364   gpr_mu_lock(g_mu);
365   while (!cl->done) {
366     grpc_pollset_worker* worker = nullptr;
367     grpc_core::ExecCtx exec_ctx;
368     GPR_ASSERT(GRPC_LOG_IF_ERROR(
369         "pollset_work",
370         grpc_pollset_work(g_pollset, &worker, GRPC_MILLIS_INF_FUTURE)));
371     gpr_mu_unlock(g_mu);
372 
373     gpr_mu_lock(g_mu);
374   }
375   gpr_mu_unlock(g_mu);
376 }
377 
378 /* Test grpc_fd. Start an upload server and client, upload a stream of
379    bytes from the client to the server, and verify that the total number of
380    sent bytes is equal to the total number of received bytes. */
test_grpc_fd(void)381 static void test_grpc_fd(void) {
382   server sv;
383   client cl;
384   int port;
385   grpc_core::ExecCtx exec_ctx;
386 
387   server_init(&sv);
388   port = server_start(&sv);
389   client_init(&cl);
390   client_start(&cl, port);
391 
392   client_wait_and_shutdown(&cl);
393   server_wait_and_shutdown(&sv);
394   GPR_ASSERT(sv.read_bytes_total == cl.write_bytes_total);
395   gpr_log(GPR_INFO, "Total read bytes %" PRIdPTR, sv.read_bytes_total);
396 }
397 
398 typedef struct fd_change_data {
399   grpc_iomgr_cb_func cb_that_ran;
400 } fd_change_data;
401 
init_change_data(fd_change_data * fdc)402 void init_change_data(fd_change_data* fdc) { fdc->cb_that_ran = nullptr; }
403 
destroy_change_data(fd_change_data * fdc)404 void destroy_change_data(fd_change_data* fdc) {}
405 
first_read_callback(void * arg,grpc_error * error)406 static void first_read_callback(void* arg /* fd_change_data */,
407                                 grpc_error* error) {
408   fd_change_data* fdc = static_cast<fd_change_data*>(arg);
409 
410   gpr_mu_lock(g_mu);
411   fdc->cb_that_ran = first_read_callback;
412   GPR_ASSERT(
413       GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(g_pollset, nullptr)));
414   gpr_mu_unlock(g_mu);
415 }
416 
second_read_callback(void * arg,grpc_error * error)417 static void second_read_callback(void* arg /* fd_change_data */,
418                                  grpc_error* error) {
419   fd_change_data* fdc = static_cast<fd_change_data*>(arg);
420 
421   gpr_mu_lock(g_mu);
422   fdc->cb_that_ran = second_read_callback;
423   GPR_ASSERT(
424       GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(g_pollset, nullptr)));
425   gpr_mu_unlock(g_mu);
426 }
427 
428 /* Test that changing the callback we use for notify_on_read actually works.
429    Note that we have two different but almost identical callbacks above -- the
430    point is to have two different function pointers and two different data
431    pointers and make sure that changing both really works. */
test_grpc_fd_change(void)432 static void test_grpc_fd_change(void) {
433   grpc_fd* em_fd;
434   fd_change_data a, b;
435   int flags;
436   int sv[2];
437   char data;
438   ssize_t result;
439   grpc_closure first_closure;
440   grpc_closure second_closure;
441   grpc_core::ExecCtx exec_ctx;
442 
443   GRPC_CLOSURE_INIT(&first_closure, first_read_callback, &a,
444                     grpc_schedule_on_exec_ctx);
445   GRPC_CLOSURE_INIT(&second_closure, second_read_callback, &b,
446                     grpc_schedule_on_exec_ctx);
447 
448   init_change_data(&a);
449   init_change_data(&b);
450 
451   GPR_ASSERT(socketpair(AF_UNIX, SOCK_STREAM, 0, sv) == 0);
452   flags = fcntl(sv[0], F_GETFL, 0);
453   GPR_ASSERT(fcntl(sv[0], F_SETFL, flags | O_NONBLOCK) == 0);
454   flags = fcntl(sv[1], F_GETFL, 0);
455   GPR_ASSERT(fcntl(sv[1], F_SETFL, flags | O_NONBLOCK) == 0);
456 
457   em_fd = grpc_fd_create(sv[0], "test_grpc_fd_change", false);
458   grpc_pollset_add_fd(g_pollset, em_fd);
459 
460   /* Register the first callback, then make its FD readable */
461   grpc_fd_notify_on_read(em_fd, &first_closure);
462   data = 0;
463   result = write(sv[1], &data, 1);
464   GPR_ASSERT(result == 1);
465 
466   /* And now wait for it to run. */
467   gpr_mu_lock(g_mu);
468   while (a.cb_that_ran == nullptr) {
469     grpc_pollset_worker* worker = nullptr;
470     GPR_ASSERT(GRPC_LOG_IF_ERROR(
471         "pollset_work",
472         grpc_pollset_work(g_pollset, &worker, GRPC_MILLIS_INF_FUTURE)));
473     gpr_mu_unlock(g_mu);
474 
475     gpr_mu_lock(g_mu);
476   }
477   GPR_ASSERT(a.cb_that_ran == first_read_callback);
478   gpr_mu_unlock(g_mu);
479 
480   /* And drain the socket so we can generate a new read edge */
481   result = read(sv[0], &data, 1);
482   GPR_ASSERT(result == 1);
483 
484   /* Now register a second callback with distinct change data, and do the same
485      thing again. */
486   grpc_fd_notify_on_read(em_fd, &second_closure);
487   data = 0;
488   result = write(sv[1], &data, 1);
489   GPR_ASSERT(result == 1);
490 
491   gpr_mu_lock(g_mu);
492   while (b.cb_that_ran == nullptr) {
493     grpc_pollset_worker* worker = nullptr;
494     GPR_ASSERT(GRPC_LOG_IF_ERROR(
495         "pollset_work",
496         grpc_pollset_work(g_pollset, &worker, GRPC_MILLIS_INF_FUTURE)));
497     gpr_mu_unlock(g_mu);
498 
499     gpr_mu_lock(g_mu);
500   }
501   /* Except now we verify that second_read_callback ran instead */
502   GPR_ASSERT(b.cb_that_ran == second_read_callback);
503   gpr_mu_unlock(g_mu);
504 
505   grpc_fd_orphan(em_fd, nullptr, nullptr, "d");
506 
507   destroy_change_data(&a);
508   destroy_change_data(&b);
509   close(sv[1]);
510 }
511 
destroy_pollset(void * p,grpc_error * error)512 static void destroy_pollset(void* p, grpc_error* error) {
513   grpc_pollset_destroy(static_cast<grpc_pollset*>(p));
514 }
515 
main(int argc,char ** argv)516 int main(int argc, char** argv) {
517   grpc_closure destroyed;
518   grpc_test_init(argc, argv);
519   grpc_init();
520   {
521     grpc_core::ExecCtx exec_ctx;
522     g_pollset = static_cast<grpc_pollset*>(gpr_zalloc(grpc_pollset_size()));
523     grpc_pollset_init(g_pollset, &g_mu);
524     test_grpc_fd();
525     test_grpc_fd_change();
526     GRPC_CLOSURE_INIT(&destroyed, destroy_pollset, g_pollset,
527                       grpc_schedule_on_exec_ctx);
528     grpc_pollset_shutdown(g_pollset, &destroyed);
529     grpc_core::ExecCtx::Get()->Flush();
530     gpr_free(g_pollset);
531   }
532   grpc_shutdown();
533   return 0;
534 }
535 
536 #else /* GRPC_POSIX_SOCKET */
537 
/* Fallback entry point used when GRPC_POSIX_SOCKET is not defined: runs no
   test and returns 1. */
int main(int argc, char** argv) {
  (void)argc;
  (void)argv;
  return 1;
}
539 
540 #endif /* GRPC_POSIX_SOCKET */
541