1 /*
2 *
3 * Copyright 2015 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18 #include "src/core/lib/iomgr/port.h"
19
/* This test is only relevant on Linux systems where epoll() is available */
21 #ifdef GRPC_LINUX_EPOLL_CREATE1
22 #include "src/core/lib/iomgr/ev_epollsig_linux.h"
23 #include "src/core/lib/iomgr/ev_posix.h"
24
25 #include <errno.h>
26 #include <string.h>
27 #include <unistd.h>
28
29 #include <grpc/grpc.h>
30 #include <grpc/support/alloc.h>
31 #include <grpc/support/log.h>
32
33 #include "src/core/lib/gpr/useful.h"
34 #include "src/core/lib/gprpp/thd.h"
35 #include "src/core/lib/iomgr/iomgr.h"
36 #include "test/core/util/test_config.h"
37
38 typedef struct test_pollset {
39 grpc_pollset* pollset;
40 gpr_mu* mu;
41 } test_pollset;
42
43 typedef struct test_fd {
44 int inner_fd;
45 grpc_fd* fd;
46 } test_fd;
47
48 /* num_fds should be an even number */
test_fd_init(test_fd * tfds,int * fds,int num_fds)49 static void test_fd_init(test_fd* tfds, int* fds, int num_fds) {
50 int i;
51 int r;
52
53 /* Create some dummy file descriptors. Currently using pipe file descriptors
54 * for this test but we could use any other type of file descriptors. Also,
55 * since pipe() used in this test creates two fds in each call, num_fds should
56 * be an even number */
57 GPR_ASSERT((num_fds % 2) == 0);
58 for (i = 0; i < num_fds; i = i + 2) {
59 r = pipe(fds + i);
60 if (r != 0) {
61 gpr_log(GPR_ERROR, "Error in creating pipe. %d (%s)", errno,
62 strerror(errno));
63 return;
64 }
65 }
66
67 for (i = 0; i < num_fds; i++) {
68 tfds[i].inner_fd = fds[i];
69 tfds[i].fd = grpc_fd_create(fds[i], "test_fd", false);
70 }
71 }
72
test_fd_cleanup(test_fd * tfds,int num_fds)73 static void test_fd_cleanup(test_fd* tfds, int num_fds) {
74 int release_fd;
75 int i;
76
77 for (i = 0; i < num_fds; i++) {
78 grpc_fd_shutdown(tfds[i].fd,
79 GRPC_ERROR_CREATE_FROM_STATIC_STRING("test_fd_cleanup"));
80 grpc_core::ExecCtx::Get()->Flush();
81
82 grpc_fd_orphan(tfds[i].fd, nullptr, &release_fd, "test_fd_cleanup");
83 grpc_core::ExecCtx::Get()->Flush();
84
85 GPR_ASSERT(release_fd == tfds[i].inner_fd);
86 close(tfds[i].inner_fd);
87 }
88 }
89
test_pollset_init(test_pollset * pollsets,int num_pollsets)90 static void test_pollset_init(test_pollset* pollsets, int num_pollsets) {
91 int i;
92 for (i = 0; i < num_pollsets; i++) {
93 pollsets[i].pollset =
94 static_cast<grpc_pollset*>(gpr_zalloc(grpc_pollset_size()));
95 grpc_pollset_init(pollsets[i].pollset, &pollsets[i].mu);
96 }
97 }
98
destroy_pollset(void * p,grpc_error * error)99 static void destroy_pollset(void* p, grpc_error* error) {
100 grpc_pollset_destroy(static_cast<grpc_pollset*>(p));
101 }
102
test_pollset_cleanup(test_pollset * pollsets,int num_pollsets)103 static void test_pollset_cleanup(test_pollset* pollsets, int num_pollsets) {
104 grpc_closure destroyed;
105 int i;
106
107 for (i = 0; i < num_pollsets; i++) {
108 GRPC_CLOSURE_INIT(&destroyed, destroy_pollset, pollsets[i].pollset,
109 grpc_schedule_on_exec_ctx);
110 grpc_pollset_shutdown(pollsets[i].pollset, &destroyed);
111
112 grpc_core::ExecCtx::Get()->Flush();
113 gpr_free(pollsets[i].pollset);
114 }
115 }
116
117 /*
118 * Cases to test:
119 * case 1) Polling islands of both fd and pollset are NULL
120 * case 2) Polling island of fd is NULL but that of pollset is not-NULL
121 * case 3) Polling island of fd is not-NULL but that of pollset is NULL
122 * case 4) Polling islands of both fd and pollset are not-NULL and:
123 * case 4.1) Polling islands of fd and pollset are equal
124 * case 4.2) Polling islands of fd and pollset are NOT-equal (This results
125 * in a merge)
126 * */
127
128 #define NUM_FDS 8
129 #define NUM_POLLSETS 4
130
test_add_fd_to_pollset()131 static void test_add_fd_to_pollset() {
132 grpc_core::ExecCtx exec_ctx;
133 test_fd tfds[NUM_FDS];
134 int fds[NUM_FDS];
135 test_pollset pollsets[NUM_POLLSETS];
136 void* expected_pi = nullptr;
137 int i;
138
139 test_fd_init(tfds, fds, NUM_FDS);
140 test_pollset_init(pollsets, NUM_POLLSETS);
141
142 /*Step 1.
143 * Create three polling islands (This will exercise test case 1 and 2) with
144 * the following configuration:
145 * polling island 0 = { fds:0,1,2, pollsets:0}
146 * polling island 1 = { fds:3,4, pollsets:1}
147 * polling island 2 = { fds:5,6,7 pollsets:2}
148 *
149 *Step 2.
150 * Add pollset 3 to polling island 0 (by adding fds 0 and 1 to pollset 3)
151 * (This will exercise test cases 3 and 4.1). The configuration becomes:
152 * polling island 0 = { fds:0,1,2, pollsets:0,3} <<< pollset 3 added here
153 * polling island 1 = { fds:3,4, pollsets:1}
154 * polling island 2 = { fds:5,6,7 pollsets:2}
155 *
156 *Step 3.
157 * Merge polling islands 0 and 1 by adding fd 0 to pollset 1 (This will
158 * exercise test case 4.2). The configuration becomes:
159 * polling island (merged) = {fds: 0,1,2,3,4, pollsets: 0,1,3}
160 * polling island 2 = {fds: 5,6,7 pollsets: 2}
161 *
162 *Step 4.
163 * Finally do one more merge by adding fd 3 to pollset 2.
164 * polling island (merged) = {fds: 0,1,2,3,4,5,6,7, pollsets: 0,1,2,3}
165 */
166
167 /* == Step 1 == */
168 for (i = 0; i <= 2; i++) {
169 grpc_pollset_add_fd(pollsets[0].pollset, tfds[i].fd);
170 grpc_core::ExecCtx::Get()->Flush();
171 }
172
173 for (i = 3; i <= 4; i++) {
174 grpc_pollset_add_fd(pollsets[1].pollset, tfds[i].fd);
175 grpc_core::ExecCtx::Get()->Flush();
176 }
177
178 for (i = 5; i <= 7; i++) {
179 grpc_pollset_add_fd(pollsets[2].pollset, tfds[i].fd);
180 grpc_core::ExecCtx::Get()->Flush();
181 }
182
183 /* == Step 2 == */
184 for (i = 0; i <= 1; i++) {
185 grpc_pollset_add_fd(pollsets[3].pollset, tfds[i].fd);
186 grpc_core::ExecCtx::Get()->Flush();
187 }
188
189 /* == Step 3 == */
190 grpc_pollset_add_fd(pollsets[1].pollset, tfds[0].fd);
191 grpc_core::ExecCtx::Get()->Flush();
192
193 /* == Step 4 == */
194 grpc_pollset_add_fd(pollsets[2].pollset, tfds[3].fd);
195 grpc_core::ExecCtx::Get()->Flush();
196
197 /* All polling islands are merged at this point */
198
199 /* Compare Fd:0's polling island with that of all other Fds */
200 expected_pi = grpc_fd_get_polling_island(tfds[0].fd);
201 for (i = 1; i < NUM_FDS; i++) {
202 GPR_ASSERT(grpc_are_polling_islands_equal(
203 expected_pi, grpc_fd_get_polling_island(tfds[i].fd)));
204 }
205
206 /* Compare Fd:0's polling island with that of all other pollsets */
207 for (i = 0; i < NUM_POLLSETS; i++) {
208 GPR_ASSERT(grpc_are_polling_islands_equal(
209 expected_pi, grpc_pollset_get_polling_island(pollsets[i].pollset)));
210 }
211
212 test_fd_cleanup(tfds, NUM_FDS);
213 test_pollset_cleanup(pollsets, NUM_POLLSETS);
214 }
215
216 #undef NUM_FDS
217 #undef NUM_POLLSETS
218
219 typedef struct threading_shared {
220 gpr_mu* mu;
221 grpc_pollset* pollset;
222 grpc_wakeup_fd* wakeup_fd;
223 grpc_fd* wakeup_desc;
224 grpc_closure on_wakeup;
225 int wakeups;
226 } threading_shared;
227
228 static __thread int thread_wakeups = 0;
229
test_threading_loop(void * arg)230 static void test_threading_loop(void* arg) {
231 threading_shared* shared = static_cast<threading_shared*>(arg);
232 while (thread_wakeups < 1000000) {
233 grpc_core::ExecCtx exec_ctx;
234 grpc_pollset_worker* worker;
235 gpr_mu_lock(shared->mu);
236 GPR_ASSERT(GRPC_LOG_IF_ERROR(
237 "pollset_work",
238 grpc_pollset_work(shared->pollset, &worker, GRPC_MILLIS_INF_FUTURE)));
239 gpr_mu_unlock(shared->mu);
240 }
241 }
242
test_threading_wakeup(void * arg,grpc_error * error)243 static void test_threading_wakeup(void* arg, grpc_error* error) {
244 threading_shared* shared = static_cast<threading_shared*>(arg);
245 ++shared->wakeups;
246 ++thread_wakeups;
247 if (error == GRPC_ERROR_NONE) {
248 GPR_ASSERT(GRPC_LOG_IF_ERROR(
249 "consume_wakeup", grpc_wakeup_fd_consume_wakeup(shared->wakeup_fd)));
250 grpc_fd_notify_on_read(shared->wakeup_desc, &shared->on_wakeup);
251 GPR_ASSERT(GRPC_LOG_IF_ERROR("wakeup_next",
252 grpc_wakeup_fd_wakeup(shared->wakeup_fd)));
253 }
254 }
255
test_threading(void)256 static void test_threading(void) {
257 threading_shared shared;
258 shared.pollset = static_cast<grpc_pollset*>(gpr_zalloc(grpc_pollset_size()));
259 grpc_pollset_init(shared.pollset, &shared.mu);
260
261 grpc_core::Thread thds[10];
262 for (auto& th : thds) {
263 th = grpc_core::Thread("test_thread", test_threading_loop, &shared);
264 th.Start();
265 }
266 grpc_wakeup_fd fd;
267 GPR_ASSERT(GRPC_LOG_IF_ERROR("wakeup_fd_init", grpc_wakeup_fd_init(&fd)));
268 shared.wakeup_fd = &fd;
269 shared.wakeup_desc = grpc_fd_create(fd.read_fd, "wakeup", false);
270 shared.wakeups = 0;
271 {
272 grpc_core::ExecCtx exec_ctx;
273 grpc_pollset_add_fd(shared.pollset, shared.wakeup_desc);
274 grpc_fd_notify_on_read(
275 shared.wakeup_desc,
276 GRPC_CLOSURE_INIT(&shared.on_wakeup, test_threading_wakeup, &shared,
277 grpc_schedule_on_exec_ctx));
278 }
279 GPR_ASSERT(GRPC_LOG_IF_ERROR("wakeup_first",
280 grpc_wakeup_fd_wakeup(shared.wakeup_fd)));
281 for (auto& th : thds) {
282 th.Join();
283 }
284 fd.read_fd = 0;
285 grpc_wakeup_fd_destroy(&fd);
286 {
287 grpc_core::ExecCtx exec_ctx;
288 grpc_fd_shutdown(shared.wakeup_desc, GRPC_ERROR_CANCELLED);
289 grpc_fd_orphan(shared.wakeup_desc, nullptr, nullptr, "done");
290 grpc_pollset_shutdown(shared.pollset,
291 GRPC_CLOSURE_CREATE(destroy_pollset, shared.pollset,
292 grpc_schedule_on_exec_ctx));
293 }
294 gpr_free(shared.pollset);
295 }
296
main(int argc,char ** argv)297 int main(int argc, char** argv) {
298 const char* poll_strategy = nullptr;
299 grpc_test_init(argc, argv);
300 grpc_init();
301 {
302 grpc_core::ExecCtx exec_ctx;
303
304 poll_strategy = grpc_get_poll_strategy_name();
305 if (poll_strategy != nullptr && strcmp(poll_strategy, "epollsig") == 0) {
306 test_add_fd_to_pollset();
307 test_threading();
308 } else {
309 gpr_log(GPR_INFO,
310 "Skipping the test. The test is only relevant for 'epollsig' "
311 "strategy. and the current strategy is: '%s'",
312 poll_strategy);
313 }
314 }
315
316 grpc_shutdown();
317 return 0;
318 }
319 #else /* defined(GRPC_LINUX_EPOLL_CREATE1) */
/* Fallback entry point used when GRPC_LINUX_EPOLL_CREATE1 is not defined:
 * there is nothing to test, so report success. */
int main(int argc, char** argv) {
  (void)argc;
  (void)argv;
  return 0;
}
321 #endif /* !defined(GRPC_LINUX_EPOLL_CREATE1) */
322