1 /*
2  *
3  * Copyright 2015 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #include "src/core/lib/iomgr/port.h"
20 
21 // This test won't work except with posix sockets enabled
22 #ifdef GRPC_POSIX_SOCKET
23 
24 #include "src/core/lib/iomgr/udp_server.h"
25 
26 #include <netinet/in.h>
27 #include <string.h>
28 #include <sys/socket.h>
29 #include <unistd.h>
30 
31 #include <grpc/grpc.h>
32 #include <grpc/support/alloc.h>
33 #include <grpc/support/log.h>
34 #include <grpc/support/sync.h>
35 #include <grpc/support/time.h>
36 
37 #include "src/core/lib/channel/channel_args.h"
38 #include "src/core/lib/gpr/useful.h"
39 #include "src/core/lib/gprpp/memory.h"
40 #include "src/core/lib/iomgr/ev_posix.h"
41 #include "src/core/lib/iomgr/iomgr.h"
42 #include "src/core/lib/iomgr/socket_factory_posix.h"
43 #include "src/core/lib/iomgr/socket_utils_posix.h"
44 #include "test/core/util/test_config.h"
45 
46 #define LOG_TEST(x) gpr_log(GPR_INFO, "%s", #x)
47 
48 static grpc_pollset* g_pollset;
49 static gpr_mu* g_mu;
50 static int g_number_of_reads = 0;
51 static int g_number_of_writes = 0;
52 static int g_number_of_bytes_read = 0;
53 static int g_number_of_orphan_calls = 0;
54 static int g_number_of_starts = 0;
55 
56 int rcv_buf_size = 1024;
57 int snd_buf_size = 1024;
58 
59 static int g_num_listeners = 1;
60 
61 class TestGrpcUdpHandler : public GrpcUdpHandler {
62  public:
TestGrpcUdpHandler(grpc_fd * emfd,void * user_data)63   TestGrpcUdpHandler(grpc_fd* emfd, void* user_data)
64       : GrpcUdpHandler(emfd, user_data), emfd_(emfd) {
65     g_number_of_starts++;
66   }
~TestGrpcUdpHandler()67   ~TestGrpcUdpHandler() override {}
68 
69  protected:
Read()70   bool Read() override {
71     char read_buffer[512];
72     ssize_t byte_count;
73 
74     gpr_mu_lock(g_mu);
75     byte_count =
76         recv(grpc_fd_wrapped_fd(emfd()), read_buffer, sizeof(read_buffer), 0);
77 
78     g_number_of_reads++;
79     g_number_of_bytes_read += static_cast<int>(byte_count);
80 
81     gpr_log(GPR_DEBUG, "receive %zu on handler %p", byte_count, this);
82     GPR_ASSERT(GRPC_LOG_IF_ERROR("pollset_kick",
83                                  grpc_pollset_kick(g_pollset, nullptr)));
84     gpr_mu_unlock(g_mu);
85     return false;
86   }
87 
OnCanWrite(void * user_data,grpc_closure * notify_on_write_closure)88   void OnCanWrite(void* user_data,
89                   grpc_closure* notify_on_write_closure) override {
90     gpr_mu_lock(g_mu);
91     g_number_of_writes++;
92 
93     GPR_ASSERT(GRPC_LOG_IF_ERROR("pollset_kick",
94                                  grpc_pollset_kick(g_pollset, nullptr)));
95     gpr_mu_unlock(g_mu);
96   }
97 
OnFdAboutToOrphan(grpc_closure * orphan_fd_closure,void * user_data)98   void OnFdAboutToOrphan(grpc_closure* orphan_fd_closure,
99                          void* user_data) override {
100     gpr_log(GPR_INFO, "gRPC FD about to be orphaned: %d",
101             grpc_fd_wrapped_fd(emfd()));
102     GRPC_CLOSURE_SCHED(orphan_fd_closure, GRPC_ERROR_NONE);
103     g_number_of_orphan_calls++;
104   }
105 
emfd()106   grpc_fd* emfd() { return emfd_; }
107 
108  private:
109   grpc_fd* emfd_;
110 };
111 
112 class TestGrpcUdpHandlerFactory : public GrpcUdpHandlerFactory {
113  public:
CreateUdpHandler(grpc_fd * emfd,void * user_data)114   GrpcUdpHandler* CreateUdpHandler(grpc_fd* emfd, void* user_data) override {
115     gpr_log(GPR_INFO, "create udp handler for fd %d", grpc_fd_wrapped_fd(emfd));
116     return grpc_core::New<TestGrpcUdpHandler>(emfd, user_data);
117   }
118 
DestroyUdpHandler(GrpcUdpHandler * handler)119   void DestroyUdpHandler(GrpcUdpHandler* handler) override {
120     gpr_log(GPR_INFO, "Destroy handler");
121     grpc_core::Delete(reinterpret_cast<TestGrpcUdpHandler*>(handler));
122   }
123 };
124 
125 TestGrpcUdpHandlerFactory handler_factory;
126 
127 struct test_socket_factory {
128   grpc_socket_factory base;
129   int number_of_socket_calls;
130   int number_of_bind_calls;
131 };
132 typedef struct test_socket_factory test_socket_factory;
133 
test_socket_factory_socket(grpc_socket_factory * factory,int domain,int type,int protocol)134 static int test_socket_factory_socket(grpc_socket_factory* factory, int domain,
135                                       int type, int protocol) {
136   test_socket_factory* f = reinterpret_cast<test_socket_factory*>(factory);
137   f->number_of_socket_calls++;
138   return socket(domain, type, protocol);
139 }
140 
test_socket_factory_bind(grpc_socket_factory * factory,int sockfd,const grpc_resolved_address * addr)141 static int test_socket_factory_bind(grpc_socket_factory* factory, int sockfd,
142                                     const grpc_resolved_address* addr) {
143   test_socket_factory* f = reinterpret_cast<test_socket_factory*>(factory);
144   f->number_of_bind_calls++;
145   return bind(sockfd,
146               reinterpret_cast<struct sockaddr*>(const_cast<char*>(addr->addr)),
147               static_cast<socklen_t>(addr->len));
148 }
149 
test_socket_factory_compare(grpc_socket_factory * a,grpc_socket_factory * b)150 static int test_socket_factory_compare(grpc_socket_factory* a,
151                                        grpc_socket_factory* b) {
152   return GPR_ICMP(a, b);
153 }
154 
test_socket_factory_destroy(grpc_socket_factory * factory)155 static void test_socket_factory_destroy(grpc_socket_factory* factory) {
156   test_socket_factory* f = reinterpret_cast<test_socket_factory*>(factory);
157   gpr_free(f);
158 }
159 
160 static const grpc_socket_factory_vtable test_socket_factory_vtable = {
161     test_socket_factory_socket, test_socket_factory_bind,
162     test_socket_factory_compare, test_socket_factory_destroy};
163 
test_socket_factory_create(void)164 static test_socket_factory* test_socket_factory_create(void) {
165   test_socket_factory* factory = static_cast<test_socket_factory*>(
166       gpr_malloc(sizeof(test_socket_factory)));
167   grpc_socket_factory_init(&factory->base, &test_socket_factory_vtable);
168   factory->number_of_socket_calls = 0;
169   factory->number_of_bind_calls = 0;
170   return factory;
171 }
172 
destroy_pollset(void * p,grpc_error * error)173 static void destroy_pollset(void* p, grpc_error* error) {
174   grpc_pollset_destroy(static_cast<grpc_pollset*>(p));
175 }
176 
shutdown_and_destroy_pollset()177 static void shutdown_and_destroy_pollset() {
178   gpr_mu_lock(g_mu);
179   auto closure = GRPC_CLOSURE_CREATE(destroy_pollset, g_pollset,
180                                      grpc_schedule_on_exec_ctx);
181   grpc_pollset_shutdown(g_pollset, closure);
182   gpr_mu_unlock(g_mu);
183   /* Flush exec_ctx to run |destroyed| */
184   grpc_core::ExecCtx::Get()->Flush();
185 }
186 
test_no_op(void)187 static void test_no_op(void) {
188   grpc_pollset_init(g_pollset, &g_mu);
189   grpc_core::ExecCtx exec_ctx;
190   grpc_udp_server* s = grpc_udp_server_create(nullptr);
191   LOG_TEST("test_no_op");
192   grpc_udp_server_destroy(s, nullptr);
193   shutdown_and_destroy_pollset();
194 }
195 
test_no_op_with_start(void)196 static void test_no_op_with_start(void) {
197   grpc_pollset_init(g_pollset, &g_mu);
198   grpc_core::ExecCtx exec_ctx;
199   grpc_udp_server* s = grpc_udp_server_create(nullptr);
200   LOG_TEST("test_no_op_with_start");
201   grpc_udp_server_start(s, nullptr, 0, nullptr);
202   grpc_udp_server_destroy(s, nullptr);
203   shutdown_and_destroy_pollset();
204 }
205 
test_no_op_with_port(void)206 static void test_no_op_with_port(void) {
207   grpc_pollset_init(g_pollset, &g_mu);
208   g_number_of_orphan_calls = 0;
209   grpc_core::ExecCtx exec_ctx;
210   grpc_resolved_address resolved_addr;
211   struct sockaddr_in* addr =
212       reinterpret_cast<struct sockaddr_in*>(resolved_addr.addr);
213   grpc_udp_server* s = grpc_udp_server_create(nullptr);
214   LOG_TEST("test_no_op_with_port");
215 
216   memset(&resolved_addr, 0, sizeof(resolved_addr));
217   resolved_addr.len = static_cast<socklen_t>(sizeof(struct sockaddr_in));
218   addr->sin_family = AF_INET;
219   GPR_ASSERT(grpc_udp_server_add_port(s, &resolved_addr, rcv_buf_size,
220                                       snd_buf_size, &handler_factory,
221                                       g_num_listeners));
222 
223   grpc_udp_server_destroy(s, nullptr);
224 
225   /* The server haven't start listening, so no udp handler to be notified. */
226   GPR_ASSERT(g_number_of_orphan_calls == 0);
227   shutdown_and_destroy_pollset();
228 }
229 
test_no_op_with_port_and_socket_factory(void)230 static void test_no_op_with_port_and_socket_factory(void) {
231   grpc_pollset_init(g_pollset, &g_mu);
232   g_number_of_orphan_calls = 0;
233   grpc_core::ExecCtx exec_ctx;
234   grpc_resolved_address resolved_addr;
235   struct sockaddr_in* addr =
236       reinterpret_cast<struct sockaddr_in*>(resolved_addr.addr);
237 
238   test_socket_factory* socket_factory = test_socket_factory_create();
239   grpc_arg socket_factory_arg =
240       grpc_socket_factory_to_arg(&socket_factory->base);
241   grpc_channel_args* channel_args =
242       grpc_channel_args_copy_and_add(nullptr, &socket_factory_arg, 1);
243   grpc_udp_server* s = grpc_udp_server_create(channel_args);
244   grpc_channel_args_destroy(channel_args);
245 
246   LOG_TEST("test_no_op_with_port_and_socket_factory");
247 
248   memset(&resolved_addr, 0, sizeof(resolved_addr));
249   resolved_addr.len = static_cast<socklen_t>(sizeof(struct sockaddr_in));
250   addr->sin_family = AF_INET;
251   GPR_ASSERT(grpc_udp_server_add_port(s, &resolved_addr, rcv_buf_size,
252                                       snd_buf_size, &handler_factory,
253                                       g_num_listeners));
254   GPR_ASSERT(socket_factory->number_of_socket_calls == g_num_listeners);
255   GPR_ASSERT(socket_factory->number_of_bind_calls == g_num_listeners);
256 
257   grpc_udp_server_destroy(s, nullptr);
258 
259   grpc_socket_factory_unref(&socket_factory->base);
260 
261   /* The server haven't start listening, so no udp handler to be notified. */
262   GPR_ASSERT(g_number_of_orphan_calls == 0);
263   shutdown_and_destroy_pollset();
264 }
265 
test_no_op_with_port_and_start(void)266 static void test_no_op_with_port_and_start(void) {
267   grpc_pollset_init(g_pollset, &g_mu);
268   g_number_of_orphan_calls = 0;
269   grpc_core::ExecCtx exec_ctx;
270   grpc_resolved_address resolved_addr;
271   struct sockaddr_in* addr =
272       reinterpret_cast<struct sockaddr_in*>(resolved_addr.addr);
273   grpc_udp_server* s = grpc_udp_server_create(nullptr);
274   LOG_TEST("test_no_op_with_port_and_start");
275 
276   memset(&resolved_addr, 0, sizeof(resolved_addr));
277   resolved_addr.len = static_cast<socklen_t>(sizeof(struct sockaddr_in));
278   addr->sin_family = AF_INET;
279   GPR_ASSERT(grpc_udp_server_add_port(s, &resolved_addr, rcv_buf_size,
280                                       snd_buf_size, &handler_factory,
281                                       g_num_listeners));
282 
283   grpc_udp_server_start(s, nullptr, 0, nullptr);
284   GPR_ASSERT(g_number_of_starts == g_num_listeners);
285   grpc_udp_server_destroy(s, nullptr);
286 
287   /* The server had a single FD, which is orphaned exactly once in *
288    * grpc_udp_server_destroy. */
289   GPR_ASSERT(g_number_of_orphan_calls == g_num_listeners);
290   shutdown_and_destroy_pollset();
291 }
292 
test_receive(int number_of_clients)293 static void test_receive(int number_of_clients) {
294   grpc_pollset_init(g_pollset, &g_mu);
295   grpc_core::ExecCtx exec_ctx;
296   grpc_resolved_address resolved_addr;
297   struct sockaddr_storage* addr =
298       reinterpret_cast<struct sockaddr_storage*>(resolved_addr.addr);
299   int clifd, svrfd;
300   grpc_udp_server* s = grpc_udp_server_create(nullptr);
301   int i;
302   grpc_millis deadline;
303   grpc_pollset* pollsets[1];
304   LOG_TEST("test_receive");
305   gpr_log(GPR_INFO, "clients=%d", number_of_clients);
306 
307   g_number_of_bytes_read = 0;
308   g_number_of_orphan_calls = 0;
309 
310   memset(&resolved_addr, 0, sizeof(resolved_addr));
311   resolved_addr.len = static_cast<socklen_t>(sizeof(struct sockaddr_storage));
312   addr->ss_family = AF_INET;
313   GPR_ASSERT(grpc_udp_server_add_port(s, &resolved_addr, rcv_buf_size,
314                                       snd_buf_size, &handler_factory,
315                                       g_num_listeners));
316 
317   svrfd = grpc_udp_server_get_fd(s, 0);
318   GPR_ASSERT(svrfd >= 0);
319   GPR_ASSERT(getsockname(svrfd, (struct sockaddr*)addr,
320                          (socklen_t*)&resolved_addr.len) == 0);
321   GPR_ASSERT(resolved_addr.len <= sizeof(struct sockaddr_storage));
322 
323   pollsets[0] = g_pollset;
324   grpc_udp_server_start(s, pollsets, 1, nullptr);
325 
326   gpr_mu_lock(g_mu);
327 
328   for (i = 0; i < number_of_clients; i++) {
329     deadline =
330         grpc_timespec_to_millis_round_up(grpc_timeout_seconds_to_deadline(10));
331 
332     int number_of_bytes_read_before = g_number_of_bytes_read;
333     /* Create a socket, send a packet to the UDP server. */
334     clifd = socket(addr->ss_family, SOCK_DGRAM, 0);
335     GPR_ASSERT(clifd >= 0);
336     GPR_ASSERT(connect(clifd, (struct sockaddr*)addr,
337                        (socklen_t)resolved_addr.len) == 0);
338     GPR_ASSERT(5 == write(clifd, "hello", 5));
339     while (g_number_of_bytes_read < (number_of_bytes_read_before + 5) &&
340            deadline > grpc_core::ExecCtx::Get()->Now()) {
341       grpc_pollset_worker* worker = nullptr;
342       GPR_ASSERT(GRPC_LOG_IF_ERROR(
343           "pollset_work", grpc_pollset_work(g_pollset, &worker, deadline)));
344       gpr_mu_unlock(g_mu);
345       grpc_core::ExecCtx::Get()->Flush();
346       gpr_mu_lock(g_mu);
347     }
348     close(clifd);
349   }
350   GPR_ASSERT(g_number_of_bytes_read == 5 * number_of_clients);
351 
352   gpr_mu_unlock(g_mu);
353 
354   grpc_udp_server_destroy(s, nullptr);
355 
356   /* The server had a single FD, which is orphaned exactly once in *
357    * grpc_udp_server_destroy. */
358   GPR_ASSERT(g_number_of_orphan_calls == g_num_listeners);
359   shutdown_and_destroy_pollset();
360 }
361 
main(int argc,char ** argv)362 int main(int argc, char** argv) {
363   grpc_test_init(argc, argv);
364   grpc_init();
365   if (grpc_is_socket_reuse_port_supported()) {
366     g_num_listeners = 10;
367   }
368   {
369     grpc_core::ExecCtx exec_ctx;
370     g_pollset = static_cast<grpc_pollset*>(gpr_zalloc(grpc_pollset_size()));
371 
372     test_no_op();
373     test_no_op_with_start();
374     test_no_op_with_port();
375     test_no_op_with_port_and_socket_factory();
376     test_no_op_with_port_and_start();
377     test_receive(1);
378     test_receive(10);
379 
380     gpr_free(g_pollset);
381   }
382   grpc_shutdown();
383   return 0;
384 }
385 
386 #else /* GRPC_POSIX_SOCKET */
387 
/* Without POSIX sockets there is nothing to run; exit with status 1. */
int main(int argc, char** argv) {
  return 1;
}
389 
390 #endif /* GRPC_POSIX_SOCKET */
391