1 /*
2  *
3  * Copyright 2016 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 /* With the addition of a libuv endpoint, sockaddr.h now includes uv.h when
20    using that endpoint. Because of various transitive includes in uv.h,
21    including windows.h on Windows, uv.h must be included before other system
22    headers. Therefore, sockaddr.h must always be included first */
23 #include "src/core/lib/iomgr/sockaddr.h"
24 
25 #include <memory.h>
26 #include <stdio.h>
27 
28 #include <grpc/grpc.h>
29 #include <grpc/support/alloc.h>
30 #include <grpc/support/log.h>
31 #include <grpc/support/string_util.h>
32 
33 #include "src/core/lib/gprpp/thd.h"
34 #include "src/core/lib/iomgr/exec_ctx.h"
35 #include "src/core/lib/iomgr/iomgr.h"
36 #include "src/core/lib/iomgr/resolve_address.h"
37 #include "src/core/lib/iomgr/sockaddr_utils.h"
38 #include "src/core/lib/iomgr/tcp_server.h"
39 
40 #include "test/core/util/port.h"
41 #include "test/core/util/test_config.h"
42 
/* Number of client threads started concurrently in every wave / test. */
#define NUM_THREADS 100
/* Per-thread work in create_loop_destroy: channels created (outer) and
   connectivity watches issued per channel (inner). */
#define NUM_OUTER_LOOPS 10
#define NUM_INNER_LOOPS 10
/* Deadline, in milliseconds, of each connectivity watch in
   create_loop_destroy. */
#define DELAY_MILLIS 10
/* Upper bound, in milliseconds, on each completion queue wait in
   create_loop_destroy. */
#define POLL_MILLIS 3000

/* The same knobs, as used by watches_with_short_timeouts. */
#define NUM_OUTER_LOOPS_SHORT_TIMEOUTS 10
#define NUM_INNER_LOOPS_SHORT_TIMEOUTS 100
#define DELAY_MILLIS_SHORT_TIMEOUTS 1
// in a successful test run, POLL_MILLIS_SHORT_TIMEOUTS should never be reached
// because all runs should end after the shorter DELAY_MILLIS_SHORT_TIMEOUTS
#define POLL_MILLIS_SHORT_TIMEOUTS 30000
// it should never take longer than this (in milliseconds) to shut down the
// server
#define SERVER_SHUTDOWN_TIMEOUT 30000
57 
/* Packs the integer n into an opaque pointer so it can be used as a
   completion queue tag. */
static void* tag(int n) {
  const uintptr_t raw = static_cast<uintptr_t>(n);
  return reinterpret_cast<void*>(raw);
}
/* Recovers the integer value carried by the opaque tag pointer p. */
static int detag(void* p) {
  const uintptr_t raw = reinterpret_cast<uintptr_t>(p);
  return static_cast<int>(raw);
}
60 
create_loop_destroy(void * addr)61 void create_loop_destroy(void* addr) {
62   for (int i = 0; i < NUM_OUTER_LOOPS; ++i) {
63     grpc_completion_queue* cq = grpc_completion_queue_create_for_next(nullptr);
64     grpc_channel* chan = grpc_insecure_channel_create(static_cast<char*>(addr),
65                                                       nullptr, nullptr);
66 
67     for (int j = 0; j < NUM_INNER_LOOPS; ++j) {
68       gpr_timespec later_time =
69           grpc_timeout_milliseconds_to_deadline(DELAY_MILLIS);
70       grpc_connectivity_state state =
71           grpc_channel_check_connectivity_state(chan, 1);
72       grpc_channel_watch_connectivity_state(chan, state, later_time, cq,
73                                             nullptr);
74       gpr_timespec poll_time =
75           grpc_timeout_milliseconds_to_deadline(POLL_MILLIS);
76       GPR_ASSERT(grpc_completion_queue_next(cq, poll_time, nullptr).type ==
77                  GRPC_OP_COMPLETE);
78       /* check that the watcher from "watch state" was free'd */
79       GPR_ASSERT(grpc_channel_num_external_connectivity_watchers(chan) == 0);
80     }
81     grpc_channel_destroy(chan);
82     grpc_completion_queue_destroy(cq);
83   }
84 }
85 
/* State shared between run_concurrent_connectivity_test and the server
   threads it starts (server_thread for wave 2, bad_server_thread for wave 3).
   The owner zeroes the whole struct with memset before use. */
struct server_thread_args {
  /* "localhost:<port>" target string handed to the client threads. In wave 2
     it is allocated and freed by run_concurrent_connectivity_test; in wave 3
     it is allocated and freed by bad_server_thread. */
  char* addr;
  /* Wave 2 only: the gRPC server under test. */
  grpc_server* server;
  /* Wave 2 only: completion queue registered with the server; server_thread
     waits on it for the shutdown notification. */
  grpc_completion_queue* cq;
  /* Wave 3 only: pollset given to the TCP server, polled by
     bad_server_thread and kicked from on_connect. */
  grpc_pollset* pollset;
  /* Wave 3 only: mutex pointer filled in by grpc_pollset_init; held around
     grpc_pollset_work and grpc_pollset_kick. */
  gpr_mu* mu;
  /* Wave 3 only: set by bad_server_thread once addr is populated and the TCP
     server has been started. */
  gpr_event ready;
  /* Wave 3 only: non-zero tells bad_server_thread to leave its poll loop. */
  gpr_atm stop;
};
95 
server_thread(void * vargs)96 void server_thread(void* vargs) {
97   struct server_thread_args* args =
98       static_cast<struct server_thread_args*>(vargs);
99   grpc_event ev;
100   gpr_timespec deadline =
101       grpc_timeout_milliseconds_to_deadline(SERVER_SHUTDOWN_TIMEOUT);
102   ev = grpc_completion_queue_next(args->cq, deadline, nullptr);
103   GPR_ASSERT(ev.type == GRPC_OP_COMPLETE);
104   GPR_ASSERT(detag(ev.tag) == 0xd1e);
105 }
106 
on_connect(void * vargs,grpc_endpoint * tcp,grpc_pollset * accepting_pollset,grpc_tcp_server_acceptor * acceptor)107 static void on_connect(void* vargs, grpc_endpoint* tcp,
108                        grpc_pollset* accepting_pollset,
109                        grpc_tcp_server_acceptor* acceptor) {
110   gpr_free(acceptor);
111   struct server_thread_args* args =
112       static_cast<struct server_thread_args*>(vargs);
113   grpc_endpoint_shutdown(tcp,
114                          GRPC_ERROR_CREATE_FROM_STATIC_STRING("Connected"));
115   grpc_endpoint_destroy(tcp);
116   gpr_mu_lock(args->mu);
117   GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(args->pollset, nullptr));
118   gpr_mu_unlock(args->mu);
119 }
120 
bad_server_thread(void * vargs)121 void bad_server_thread(void* vargs) {
122   struct server_thread_args* args =
123       static_cast<struct server_thread_args*>(vargs);
124 
125   grpc_core::ExecCtx exec_ctx;
126   grpc_resolved_address resolved_addr;
127   grpc_sockaddr* addr = reinterpret_cast<grpc_sockaddr*>(resolved_addr.addr);
128   int port;
129   grpc_tcp_server* s;
130   grpc_error* error = grpc_tcp_server_create(nullptr, nullptr, &s);
131   GPR_ASSERT(error == GRPC_ERROR_NONE);
132   memset(&resolved_addr, 0, sizeof(resolved_addr));
133   addr->sa_family = GRPC_AF_INET;
134   error = grpc_tcp_server_add_port(s, &resolved_addr, &port);
135   GPR_ASSERT(GRPC_LOG_IF_ERROR("grpc_tcp_server_add_port", error));
136   GPR_ASSERT(port > 0);
137   gpr_asprintf(&args->addr, "localhost:%d", port);
138 
139   grpc_tcp_server_start(s, &args->pollset, 1, on_connect, args);
140   gpr_event_set(&args->ready, (void*)1);
141 
142   gpr_mu_lock(args->mu);
143   while (gpr_atm_acq_load(&args->stop) == 0) {
144     grpc_millis deadline = grpc_core::ExecCtx::Get()->Now() + 100;
145 
146     grpc_pollset_worker* worker = nullptr;
147     if (!GRPC_LOG_IF_ERROR(
148             "pollset_work",
149             grpc_pollset_work(args->pollset, &worker, deadline))) {
150       gpr_atm_rel_store(&args->stop, 1);
151     }
152     gpr_mu_unlock(args->mu);
153 
154     gpr_mu_lock(args->mu);
155   }
156   gpr_mu_unlock(args->mu);
157 
158   grpc_tcp_server_unref(s);
159 
160   gpr_free(args->addr);
161 }
162 
done_pollset_shutdown(void * pollset,grpc_error * error)163 static void done_pollset_shutdown(void* pollset, grpc_error* error) {
164   grpc_pollset_destroy(static_cast<grpc_pollset*>(pollset));
165   gpr_free(pollset);
166 }
167 
run_concurrent_connectivity_test()168 int run_concurrent_connectivity_test() {
169   struct server_thread_args args;
170   memset(&args, 0, sizeof(args));
171 
172   grpc_init();
173 
174   /* First round, no server */
175   {
176     gpr_log(GPR_DEBUG, "Wave 1");
177     char* localhost = gpr_strdup("localhost:54321");
178     grpc_core::Thread threads[NUM_THREADS];
179     for (auto& th : threads) {
180       th = grpc_core::Thread("grpc_wave_1", create_loop_destroy, localhost);
181       th.Start();
182     }
183     for (auto& th : threads) {
184       th.Join();
185     }
186     gpr_free(localhost);
187   }
188 
189   {
190     /* Second round, actual grpc server */
191     gpr_log(GPR_DEBUG, "Wave 2");
192     int port = grpc_pick_unused_port_or_die();
193     gpr_asprintf(&args.addr, "localhost:%d", port);
194     args.server = grpc_server_create(nullptr, nullptr);
195     grpc_server_add_insecure_http2_port(args.server, args.addr);
196     args.cq = grpc_completion_queue_create_for_next(nullptr);
197     grpc_server_register_completion_queue(args.server, args.cq, nullptr);
198     grpc_server_start(args.server);
199     grpc_core::Thread server2("grpc_wave_2_server", server_thread, &args);
200     server2.Start();
201 
202     grpc_core::Thread threads[NUM_THREADS];
203     for (auto& th : threads) {
204       th = grpc_core::Thread("grpc_wave_2", create_loop_destroy, args.addr);
205       th.Start();
206     }
207     for (auto& th : threads) {
208       th.Join();
209     }
210     grpc_server_shutdown_and_notify(args.server, args.cq, tag(0xd1e));
211 
212     server2.Join();
213     grpc_server_destroy(args.server);
214     grpc_completion_queue_destroy(args.cq);
215     gpr_free(args.addr);
216   }
217 
218   {
219     /* Third round, bogus tcp server */
220     gpr_log(GPR_DEBUG, "Wave 3");
221     args.pollset = static_cast<grpc_pollset*>(gpr_zalloc(grpc_pollset_size()));
222     grpc_pollset_init(args.pollset, &args.mu);
223     gpr_event_init(&args.ready);
224     grpc_core::Thread server3("grpc_wave_3_server", bad_server_thread, &args);
225     server3.Start();
226     gpr_event_wait(&args.ready, gpr_inf_future(GPR_CLOCK_MONOTONIC));
227 
228     grpc_core::Thread threads[NUM_THREADS];
229     for (auto& th : threads) {
230       th = grpc_core::Thread("grpc_wave_3", create_loop_destroy, args.addr);
231       th.Start();
232     }
233     for (auto& th : threads) {
234       th.Join();
235     }
236 
237     gpr_atm_rel_store(&args.stop, 1);
238     server3.Join();
239     {
240       grpc_core::ExecCtx exec_ctx;
241       grpc_pollset_shutdown(
242           args.pollset, GRPC_CLOSURE_CREATE(done_pollset_shutdown, args.pollset,
243                                             grpc_schedule_on_exec_ctx));
244     }
245   }
246 
247   grpc_shutdown();
248   return 0;
249 }
250 
watches_with_short_timeouts(void * addr)251 void watches_with_short_timeouts(void* addr) {
252   for (int i = 0; i < NUM_OUTER_LOOPS_SHORT_TIMEOUTS; ++i) {
253     grpc_completion_queue* cq = grpc_completion_queue_create_for_next(nullptr);
254     grpc_channel* chan = grpc_insecure_channel_create(static_cast<char*>(addr),
255                                                       nullptr, nullptr);
256 
257     for (int j = 0; j < NUM_INNER_LOOPS_SHORT_TIMEOUTS; ++j) {
258       gpr_timespec later_time =
259           grpc_timeout_milliseconds_to_deadline(DELAY_MILLIS_SHORT_TIMEOUTS);
260       grpc_connectivity_state state =
261           grpc_channel_check_connectivity_state(chan, 0);
262       GPR_ASSERT(state == GRPC_CHANNEL_IDLE);
263       grpc_channel_watch_connectivity_state(chan, state, later_time, cq,
264                                             nullptr);
265       gpr_timespec poll_time =
266           grpc_timeout_milliseconds_to_deadline(POLL_MILLIS_SHORT_TIMEOUTS);
267       grpc_event ev = grpc_completion_queue_next(cq, poll_time, nullptr);
268       GPR_ASSERT(ev.type == GRPC_OP_COMPLETE);
269       GPR_ASSERT(ev.success == false);
270       /* check that the watcher from "watch state" was free'd */
271       GPR_ASSERT(grpc_channel_num_external_connectivity_watchers(chan) == 0);
272     }
273     grpc_channel_destroy(chan);
274     grpc_completion_queue_destroy(cq);
275   }
276 }
277 
278 // This test tries to catch deadlock situations.
279 // With short timeouts on "watches" and long timeouts on cq next calls,
280 // so that a QUEUE_TIMEOUT likely means that something is stuck.
run_concurrent_watches_with_short_timeouts_test()281 int run_concurrent_watches_with_short_timeouts_test() {
282   grpc_init();
283 
284   grpc_core::Thread threads[NUM_THREADS];
285 
286   char* localhost = gpr_strdup("localhost:54321");
287 
288   for (auto& th : threads) {
289     th = grpc_core::Thread("grpc_short_watches", watches_with_short_timeouts,
290                            localhost);
291     th.Start();
292   }
293   for (auto& th : threads) {
294     th.Join();
295   }
296   gpr_free(localhost);
297 
298   grpc_shutdown();
299   return 0;
300 }
301 
main(int argc,char ** argv)302 int main(int argc, char** argv) {
303   grpc_test_init(argc, argv);
304 
305   run_concurrent_connectivity_test();
306   run_concurrent_watches_with_short_timeouts_test();
307 }
308