1 /*
2  *
3  * Copyright 2015 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #include "src/core/lib/iomgr/port.h"
20 
21 // This test won't work except with posix sockets enabled
22 #ifdef GRPC_POSIX_SOCKET
23 
24 #include "src/core/lib/iomgr/tcp_client.h"
25 
26 #include <errno.h>
27 #include <netinet/in.h>
28 #include <string.h>
29 #include <sys/socket.h>
30 #include <unistd.h>
31 
32 #include <grpc/grpc.h>
33 #include <grpc/support/alloc.h>
34 #include <grpc/support/log.h>
35 #include <grpc/support/time.h>
36 
37 #include "src/core/lib/iomgr/iomgr.h"
38 #include "src/core/lib/iomgr/pollset_set.h"
39 #include "src/core/lib/iomgr/socket_utils_posix.h"
40 #include "src/core/lib/iomgr/timer.h"
41 #include "test/core/util/test_config.h"
42 
43 static grpc_pollset_set* g_pollset_set;
44 static gpr_mu* g_mu;
45 static grpc_pollset* g_pollset;
46 static int g_connections_complete = 0;
47 static grpc_endpoint* g_connecting = nullptr;
48 
test_deadline(void)49 static grpc_millis test_deadline(void) {
50   return grpc_timespec_to_millis_round_up(grpc_timeout_seconds_to_deadline(10));
51 }
52 
finish_connection()53 static void finish_connection() {
54   gpr_mu_lock(g_mu);
55   g_connections_complete++;
56   grpc_core::ExecCtx exec_ctx;
57   GPR_ASSERT(
58       GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(g_pollset, nullptr)));
59 
60   gpr_mu_unlock(g_mu);
61 }
62 
must_succeed(void * arg,grpc_error * error)63 static void must_succeed(void* arg, grpc_error* error) {
64   GPR_ASSERT(g_connecting != nullptr);
65   GPR_ASSERT(error == GRPC_ERROR_NONE);
66   grpc_endpoint_shutdown(g_connecting, GRPC_ERROR_CREATE_FROM_STATIC_STRING(
67                                            "must_succeed called"));
68   grpc_endpoint_destroy(g_connecting);
69   g_connecting = nullptr;
70   finish_connection();
71 }
72 
must_fail(void * arg,grpc_error * error)73 static void must_fail(void* arg, grpc_error* error) {
74   GPR_ASSERT(g_connecting == nullptr);
75   GPR_ASSERT(error != GRPC_ERROR_NONE);
76   finish_connection();
77 }
78 
test_succeeds(void)79 void test_succeeds(void) {
80   grpc_resolved_address resolved_addr;
81   struct sockaddr_in* addr =
82       reinterpret_cast<struct sockaddr_in*>(resolved_addr.addr);
83   int svr_fd;
84   int r;
85   int connections_complete_before;
86   grpc_closure done;
87   grpc_core::ExecCtx exec_ctx;
88 
89   gpr_log(GPR_DEBUG, "test_succeeds");
90 
91   memset(&resolved_addr, 0, sizeof(resolved_addr));
92   resolved_addr.len = static_cast<socklen_t>(sizeof(struct sockaddr_in));
93   addr->sin_family = AF_INET;
94 
95   /* create a dummy server */
96   svr_fd = socket(AF_INET, SOCK_STREAM, 0);
97   GPR_ASSERT(svr_fd >= 0);
98   GPR_ASSERT(
99       0 == bind(svr_fd, (struct sockaddr*)addr, (socklen_t)resolved_addr.len));
100   GPR_ASSERT(0 == listen(svr_fd, 1));
101 
102   gpr_mu_lock(g_mu);
103   connections_complete_before = g_connections_complete;
104   gpr_mu_unlock(g_mu);
105 
106   /* connect to it */
107   GPR_ASSERT(getsockname(svr_fd, (struct sockaddr*)addr,
108                          (socklen_t*)&resolved_addr.len) == 0);
109   GRPC_CLOSURE_INIT(&done, must_succeed, nullptr, grpc_schedule_on_exec_ctx);
110   grpc_tcp_client_connect(&done, &g_connecting, g_pollset_set, nullptr,
111                           &resolved_addr, GRPC_MILLIS_INF_FUTURE);
112 
113   /* await the connection */
114   do {
115     resolved_addr.len = static_cast<socklen_t>(sizeof(addr));
116     r = accept(svr_fd, reinterpret_cast<struct sockaddr*>(addr),
117                reinterpret_cast<socklen_t*>(&resolved_addr.len));
118   } while (r == -1 && errno == EINTR);
119   GPR_ASSERT(r >= 0);
120   close(r);
121 
122   gpr_mu_lock(g_mu);
123 
124   while (g_connections_complete == connections_complete_before) {
125     grpc_pollset_worker* worker = nullptr;
126     GPR_ASSERT(GRPC_LOG_IF_ERROR(
127         "pollset_work",
128         grpc_pollset_work(g_pollset, &worker,
129                           grpc_timespec_to_millis_round_up(
130                               grpc_timeout_seconds_to_deadline(5)))));
131     gpr_mu_unlock(g_mu);
132     grpc_core::ExecCtx::Get()->Flush();
133     gpr_mu_lock(g_mu);
134   }
135 
136   gpr_mu_unlock(g_mu);
137 }
138 
test_fails(void)139 void test_fails(void) {
140   grpc_resolved_address resolved_addr;
141   struct sockaddr_in* addr =
142       reinterpret_cast<struct sockaddr_in*>(resolved_addr.addr);
143   int connections_complete_before;
144   grpc_closure done;
145   grpc_core::ExecCtx exec_ctx;
146 
147   gpr_log(GPR_DEBUG, "test_fails");
148 
149   memset(&resolved_addr, 0, sizeof(resolved_addr));
150   resolved_addr.len = static_cast<socklen_t>(sizeof(struct sockaddr_in));
151   addr->sin_family = AF_INET;
152 
153   gpr_mu_lock(g_mu);
154   connections_complete_before = g_connections_complete;
155   gpr_mu_unlock(g_mu);
156 
157   /* connect to a broken address */
158   GRPC_CLOSURE_INIT(&done, must_fail, nullptr, grpc_schedule_on_exec_ctx);
159   grpc_tcp_client_connect(&done, &g_connecting, g_pollset_set, nullptr,
160                           &resolved_addr, GRPC_MILLIS_INF_FUTURE);
161 
162   gpr_mu_lock(g_mu);
163 
164   /* wait for the connection callback to finish */
165   while (g_connections_complete == connections_complete_before) {
166     grpc_pollset_worker* worker = nullptr;
167     grpc_millis polling_deadline = test_deadline();
168     switch (grpc_timer_check(&polling_deadline)) {
169       case GRPC_TIMERS_FIRED:
170         break;
171       case GRPC_TIMERS_NOT_CHECKED:
172         polling_deadline = 0;
173       /* fall through */
174       case GRPC_TIMERS_CHECKED_AND_EMPTY:
175         GPR_ASSERT(GRPC_LOG_IF_ERROR(
176             "pollset_work",
177             grpc_pollset_work(g_pollset, &worker, polling_deadline)));
178         break;
179     }
180     gpr_mu_unlock(g_mu);
181     grpc_core::ExecCtx::Get()->Flush();
182     gpr_mu_lock(g_mu);
183   }
184 
185   gpr_mu_unlock(g_mu);
186 }
187 
destroy_pollset(void * p,grpc_error * error)188 static void destroy_pollset(void* p, grpc_error* error) {
189   grpc_pollset_destroy(static_cast<grpc_pollset*>(p));
190 }
191 
main(int argc,char ** argv)192 int main(int argc, char** argv) {
193   grpc_closure destroyed;
194   grpc_test_init(argc, argv);
195   grpc_init();
196 
197   {
198     grpc_core::ExecCtx exec_ctx;
199     g_pollset_set = grpc_pollset_set_create();
200     g_pollset = static_cast<grpc_pollset*>(gpr_zalloc(grpc_pollset_size()));
201     grpc_pollset_init(g_pollset, &g_mu);
202     grpc_pollset_set_add_pollset(g_pollset_set, g_pollset);
203 
204     test_succeeds();
205     gpr_log(GPR_ERROR, "End of first test");
206     test_fails();
207     grpc_pollset_set_destroy(g_pollset_set);
208     GRPC_CLOSURE_INIT(&destroyed, destroy_pollset, g_pollset,
209                       grpc_schedule_on_exec_ctx);
210     grpc_pollset_shutdown(g_pollset, &destroyed);
211   }
212 
213   grpc_shutdown();
214   gpr_free(g_pollset);
215   return 0;
216 }
217 
218 #else /* GRPC_POSIX_SOCKET */
219 
/* Stub entry point used when GRPC_POSIX_SOCKET is not defined: the test cannot
 * run, so it exits with status 1. */
int main(int argc, char** argv) {
  (void)argc;
  (void)argv;
  return 1;
}
221 
222 #endif /* GRPC_POSIX_SOCKET */
223