1 /*
2 *
3 * Copyright 2015 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19 #include "src/core/lib/iomgr/port.h"
20
21 // This test won't work except with posix sockets enabled
22 #ifdef GRPC_POSIX_SOCKET
23
24 #include "src/core/lib/iomgr/tcp_posix.h"
25
26 #include <errno.h>
27 #include <fcntl.h>
28 #include <string.h>
29 #include <sys/socket.h>
30 #include <sys/types.h>
31 #include <unistd.h>
32
33 #include <grpc/grpc.h>
34 #include <grpc/support/alloc.h>
35 #include <grpc/support/log.h>
36 #include <grpc/support/time.h>
37
38 #include "src/core/lib/gpr/useful.h"
39 #include "src/core/lib/iomgr/buffer_list.h"
40 #include "src/core/lib/iomgr/ev_posix.h"
41 #include "src/core/lib/iomgr/sockaddr_posix.h"
42 #include "src/core/lib/slice/slice_internal.h"
43 #include "test/core/iomgr/endpoint_tests.h"
44 #include "test/core/util/test_config.h"
45
46 static gpr_mu* g_mu;
47 static grpc_pollset* g_pollset;
48
49 /*
50 General test notes:
51
52 All tests which write data into a socket write i%256 into byte i, which is
53 verified by readers.
54
55 In general there are a few interesting things to vary which may lead to
56 exercising different codepaths in an implementation:
57 1. Total amount of data written to the socket
58 2. Size of slice allocations
59 3. Amount of data we read from or write to the socket at once
60
61 The tests here tend to parameterize these where applicable.
62
63 */
64
create_sockets(int sv[2])65 static void create_sockets(int sv[2]) {
66 int flags;
67 GPR_ASSERT(socketpair(AF_UNIX, SOCK_STREAM, 0, sv) == 0);
68 flags = fcntl(sv[0], F_GETFL, 0);
69 GPR_ASSERT(fcntl(sv[0], F_SETFL, flags | O_NONBLOCK) == 0);
70 flags = fcntl(sv[1], F_GETFL, 0);
71 GPR_ASSERT(fcntl(sv[1], F_SETFL, flags | O_NONBLOCK) == 0);
72 }
73
create_inet_sockets(int sv[2])74 static void create_inet_sockets(int sv[2]) {
75 /* Prepare listening socket */
76 struct sockaddr_in addr;
77 memset(&addr, 0, sizeof(struct sockaddr_in));
78 addr.sin_family = AF_INET;
79 int sock = socket(AF_INET, SOCK_STREAM, 0);
80 GPR_ASSERT(sock);
81 GPR_ASSERT(bind(sock, (sockaddr*)&addr, sizeof(sockaddr_in)) == 0);
82 listen(sock, 1);
83
84 /* Prepare client socket and connect to server */
85 socklen_t len = sizeof(sockaddr_in);
86 GPR_ASSERT(getsockname(sock, (sockaddr*)&addr, &len) == 0);
87
88 int client = socket(AF_INET, SOCK_STREAM, 0);
89 GPR_ASSERT(client);
90 int ret;
91 do {
92 ret = connect(client, (sockaddr*)&addr, sizeof(sockaddr_in));
93 } while (ret == -1 && errno == EINTR);
94
95 /* Accept client connection */
96 len = sizeof(socklen_t);
97 int server;
98 do {
99 server = accept(sock, (sockaddr*)&addr, (socklen_t*)&len);
100 } while (server == -1 && errno == EINTR);
101 GPR_ASSERT(server != -1);
102
103 sv[0] = server;
104 sv[1] = client;
105 int flags = fcntl(sv[0], F_GETFL, 0);
106 GPR_ASSERT(fcntl(sv[0], F_SETFL, flags | O_NONBLOCK) == 0);
107 flags = fcntl(sv[1], F_GETFL, 0);
108 GPR_ASSERT(fcntl(sv[1], F_SETFL, flags | O_NONBLOCK) == 0);
109 }
110
fill_socket(int fd)111 static ssize_t fill_socket(int fd) {
112 ssize_t write_bytes;
113 ssize_t total_bytes = 0;
114 int i;
115 unsigned char buf[256];
116 for (i = 0; i < 256; ++i) {
117 buf[i] = static_cast<uint8_t>(i);
118 }
119 do {
120 write_bytes = write(fd, buf, 256);
121 if (write_bytes > 0) {
122 total_bytes += write_bytes;
123 }
124 } while (write_bytes >= 0 || errno == EINTR);
125 GPR_ASSERT(errno == EAGAIN);
126 return total_bytes;
127 }
128
fill_socket_partial(int fd,size_t bytes)129 static size_t fill_socket_partial(int fd, size_t bytes) {
130 ssize_t write_bytes;
131 size_t total_bytes = 0;
132 unsigned char* buf = static_cast<unsigned char*>(gpr_malloc(bytes));
133 unsigned i;
134 for (i = 0; i < bytes; ++i) {
135 buf[i] = static_cast<uint8_t>(i % 256);
136 }
137
138 do {
139 write_bytes = write(fd, buf, bytes - total_bytes);
140 if (write_bytes > 0) {
141 total_bytes += static_cast<size_t>(write_bytes);
142 }
143 } while ((write_bytes >= 0 || errno == EINTR) && bytes > total_bytes);
144
145 gpr_free(buf);
146
147 return total_bytes;
148 }
149
150 struct read_socket_state {
151 grpc_endpoint* ep;
152 size_t read_bytes;
153 size_t target_read_bytes;
154 grpc_slice_buffer incoming;
155 grpc_closure read_cb;
156 };
157
count_slices(grpc_slice * slices,size_t nslices,int * current_data)158 static size_t count_slices(grpc_slice* slices, size_t nslices,
159 int* current_data) {
160 size_t num_bytes = 0;
161 unsigned i, j;
162 unsigned char* buf;
163 for (i = 0; i < nslices; ++i) {
164 buf = GRPC_SLICE_START_PTR(slices[i]);
165 for (j = 0; j < GRPC_SLICE_LENGTH(slices[i]); ++j) {
166 GPR_ASSERT(buf[j] == *current_data);
167 *current_data = (*current_data + 1) % 256;
168 }
169 num_bytes += GRPC_SLICE_LENGTH(slices[i]);
170 }
171 return num_bytes;
172 }
173
read_cb(void * user_data,grpc_error * error)174 static void read_cb(void* user_data, grpc_error* error) {
175 struct read_socket_state* state =
176 static_cast<struct read_socket_state*>(user_data);
177 size_t read_bytes;
178 int current_data;
179
180 GPR_ASSERT(error == GRPC_ERROR_NONE);
181
182 gpr_mu_lock(g_mu);
183 current_data = state->read_bytes % 256;
184 read_bytes = count_slices(state->incoming.slices, state->incoming.count,
185 ¤t_data);
186 state->read_bytes += read_bytes;
187 gpr_log(GPR_INFO, "Read %" PRIuPTR " bytes of %" PRIuPTR, read_bytes,
188 state->target_read_bytes);
189 if (state->read_bytes >= state->target_read_bytes) {
190 GPR_ASSERT(
191 GRPC_LOG_IF_ERROR("kick", grpc_pollset_kick(g_pollset, nullptr)));
192 gpr_mu_unlock(g_mu);
193 } else {
194 grpc_endpoint_read(state->ep, &state->incoming, &state->read_cb);
195 gpr_mu_unlock(g_mu);
196 }
197 }
198
199 /* Write to a socket, then read from it using the grpc_tcp API. */
read_test(size_t num_bytes,size_t slice_size)200 static void read_test(size_t num_bytes, size_t slice_size) {
201 int sv[2];
202 grpc_endpoint* ep;
203 struct read_socket_state state;
204 size_t written_bytes;
205 grpc_millis deadline =
206 grpc_timespec_to_millis_round_up(grpc_timeout_seconds_to_deadline(20));
207 grpc_core::ExecCtx exec_ctx;
208
209 gpr_log(GPR_INFO, "Read test of size %" PRIuPTR ", slice size %" PRIuPTR,
210 num_bytes, slice_size);
211
212 create_sockets(sv);
213
214 grpc_arg a[1];
215 a[0].key = const_cast<char*>(GRPC_ARG_TCP_READ_CHUNK_SIZE);
216 a[0].type = GRPC_ARG_INTEGER,
217 a[0].value.integer = static_cast<int>(slice_size);
218 grpc_channel_args args = {GPR_ARRAY_SIZE(a), a};
219 ep =
220 grpc_tcp_create(grpc_fd_create(sv[1], "read_test", false), &args, "test");
221 grpc_endpoint_add_to_pollset(ep, g_pollset);
222
223 written_bytes = fill_socket_partial(sv[0], num_bytes);
224 gpr_log(GPR_INFO, "Wrote %" PRIuPTR " bytes", written_bytes);
225
226 state.ep = ep;
227 state.read_bytes = 0;
228 state.target_read_bytes = written_bytes;
229 grpc_slice_buffer_init(&state.incoming);
230 GRPC_CLOSURE_INIT(&state.read_cb, read_cb, &state, grpc_schedule_on_exec_ctx);
231
232 grpc_endpoint_read(ep, &state.incoming, &state.read_cb);
233
234 gpr_mu_lock(g_mu);
235 while (state.read_bytes < state.target_read_bytes) {
236 grpc_pollset_worker* worker = nullptr;
237 GPR_ASSERT(GRPC_LOG_IF_ERROR(
238 "pollset_work", grpc_pollset_work(g_pollset, &worker, deadline)));
239 gpr_mu_unlock(g_mu);
240
241 gpr_mu_lock(g_mu);
242 }
243 GPR_ASSERT(state.read_bytes == state.target_read_bytes);
244 gpr_mu_unlock(g_mu);
245
246 grpc_slice_buffer_destroy_internal(&state.incoming);
247 grpc_endpoint_destroy(ep);
248 }
249
250 /* Write to a socket until it fills up, then read from it using the grpc_tcp
251 API. */
large_read_test(size_t slice_size)252 static void large_read_test(size_t slice_size) {
253 int sv[2];
254 grpc_endpoint* ep;
255 struct read_socket_state state;
256 ssize_t written_bytes;
257 grpc_millis deadline =
258 grpc_timespec_to_millis_round_up(grpc_timeout_seconds_to_deadline(20));
259 grpc_core::ExecCtx exec_ctx;
260
261 gpr_log(GPR_INFO, "Start large read test, slice size %" PRIuPTR, slice_size);
262
263 create_sockets(sv);
264
265 grpc_arg a[1];
266 a[0].key = const_cast<char*>(GRPC_ARG_TCP_READ_CHUNK_SIZE);
267 a[0].type = GRPC_ARG_INTEGER;
268 a[0].value.integer = static_cast<int>(slice_size);
269 grpc_channel_args args = {GPR_ARRAY_SIZE(a), a};
270 ep = grpc_tcp_create(grpc_fd_create(sv[1], "large_read_test", false), &args,
271 "test");
272 grpc_endpoint_add_to_pollset(ep, g_pollset);
273
274 written_bytes = fill_socket(sv[0]);
275 gpr_log(GPR_INFO, "Wrote %" PRIuPTR " bytes", written_bytes);
276
277 state.ep = ep;
278 state.read_bytes = 0;
279 state.target_read_bytes = static_cast<size_t>(written_bytes);
280 grpc_slice_buffer_init(&state.incoming);
281 GRPC_CLOSURE_INIT(&state.read_cb, read_cb, &state, grpc_schedule_on_exec_ctx);
282
283 grpc_endpoint_read(ep, &state.incoming, &state.read_cb);
284
285 gpr_mu_lock(g_mu);
286 while (state.read_bytes < state.target_read_bytes) {
287 grpc_pollset_worker* worker = nullptr;
288 GPR_ASSERT(GRPC_LOG_IF_ERROR(
289 "pollset_work", grpc_pollset_work(g_pollset, &worker, deadline)));
290 gpr_mu_unlock(g_mu);
291
292 gpr_mu_lock(g_mu);
293 }
294 GPR_ASSERT(state.read_bytes == state.target_read_bytes);
295 gpr_mu_unlock(g_mu);
296
297 grpc_slice_buffer_destroy_internal(&state.incoming);
298 grpc_endpoint_destroy(ep);
299 }
300
301 struct write_socket_state {
302 grpc_endpoint* ep;
303 int write_done;
304 };
305
allocate_blocks(size_t num_bytes,size_t slice_size,size_t * num_blocks,uint8_t * current_data)306 static grpc_slice* allocate_blocks(size_t num_bytes, size_t slice_size,
307 size_t* num_blocks, uint8_t* current_data) {
308 size_t nslices = num_bytes / slice_size + (num_bytes % slice_size ? 1u : 0u);
309 grpc_slice* slices =
310 static_cast<grpc_slice*>(gpr_malloc(sizeof(grpc_slice) * nslices));
311 size_t num_bytes_left = num_bytes;
312 unsigned i, j;
313 unsigned char* buf;
314 *num_blocks = nslices;
315
316 for (i = 0; i < nslices; ++i) {
317 slices[i] = grpc_slice_malloc(slice_size > num_bytes_left ? num_bytes_left
318 : slice_size);
319 num_bytes_left -= GRPC_SLICE_LENGTH(slices[i]);
320 buf = GRPC_SLICE_START_PTR(slices[i]);
321 for (j = 0; j < GRPC_SLICE_LENGTH(slices[i]); ++j) {
322 buf[j] = *current_data;
323 (*current_data)++;
324 }
325 }
326 GPR_ASSERT(num_bytes_left == 0);
327 return slices;
328 }
329
write_done(void * user_data,grpc_error * error)330 static void write_done(void* user_data /* write_socket_state */,
331 grpc_error* error) {
332 GPR_ASSERT(error == GRPC_ERROR_NONE);
333 struct write_socket_state* state =
334 static_cast<struct write_socket_state*>(user_data);
335 gpr_mu_lock(g_mu);
336 state->write_done = 1;
337 GPR_ASSERT(
338 GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(g_pollset, nullptr)));
339 gpr_mu_unlock(g_mu);
340 }
341
drain_socket_blocking(int fd,size_t num_bytes,size_t read_size)342 void drain_socket_blocking(int fd, size_t num_bytes, size_t read_size) {
343 unsigned char* buf = static_cast<unsigned char*>(gpr_malloc(read_size));
344 ssize_t bytes_read;
345 size_t bytes_left = num_bytes;
346 int flags;
347 int current = 0;
348 int i;
349 grpc_core::ExecCtx exec_ctx;
350
351 flags = fcntl(fd, F_GETFL, 0);
352 GPR_ASSERT(fcntl(fd, F_SETFL, flags & ~O_NONBLOCK) == 0);
353
354 for (;;) {
355 grpc_pollset_worker* worker = nullptr;
356 gpr_mu_lock(g_mu);
357 GPR_ASSERT(GRPC_LOG_IF_ERROR(
358 "pollset_work",
359 grpc_pollset_work(g_pollset, &worker,
360 grpc_timespec_to_millis_round_up(
361 grpc_timeout_milliseconds_to_deadline(10)))));
362 gpr_mu_unlock(g_mu);
363
364 do {
365 bytes_read =
366 read(fd, buf, bytes_left > read_size ? read_size : bytes_left);
367 } while (bytes_read < 0 && errno == EINTR);
368 GPR_ASSERT(bytes_read >= 0);
369 for (i = 0; i < bytes_read; ++i) {
370 GPR_ASSERT(buf[i] == current);
371 current = (current + 1) % 256;
372 }
373 bytes_left -= static_cast<size_t>(bytes_read);
374 if (bytes_left == 0) break;
375 }
376 flags = fcntl(fd, F_GETFL, 0);
377 GPR_ASSERT(fcntl(fd, F_SETFL, flags | O_NONBLOCK) == 0);
378
379 gpr_free(buf);
380 }
381
382 /* Verifier for timestamps callback for write_test */
timestamps_verifier(void * arg,grpc_core::Timestamps * ts,grpc_error * error)383 void timestamps_verifier(void* arg, grpc_core::Timestamps* ts,
384 grpc_error* error) {
385 GPR_ASSERT(error == GRPC_ERROR_NONE);
386 GPR_ASSERT(arg != nullptr);
387 GPR_ASSERT(ts->sendmsg_time.clock_type == GPR_CLOCK_REALTIME);
388 GPR_ASSERT(ts->scheduled_time.clock_type == GPR_CLOCK_REALTIME);
389 GPR_ASSERT(ts->acked_time.clock_type == GPR_CLOCK_REALTIME);
390 gpr_atm* done_timestamps = (gpr_atm*)arg;
391 gpr_atm_rel_store(done_timestamps, static_cast<gpr_atm>(1));
392 }
393
394 /* Write to a socket using the grpc_tcp API, then drain it directly.
395 Note that if the write does not complete immediately we need to drain the
396 socket in parallel with the read. If collect_timestamps is true, it will
397 try to get timestamps for the write. */
write_test(size_t num_bytes,size_t slice_size,bool collect_timestamps)398 static void write_test(size_t num_bytes, size_t slice_size,
399 bool collect_timestamps) {
400 int sv[2];
401 grpc_endpoint* ep;
402 struct write_socket_state state;
403 size_t num_blocks;
404 grpc_slice* slices;
405 uint8_t current_data = 0;
406 grpc_slice_buffer outgoing;
407 grpc_closure write_done_closure;
408 grpc_millis deadline =
409 grpc_timespec_to_millis_round_up(grpc_timeout_seconds_to_deadline(20));
410 grpc_core::ExecCtx exec_ctx;
411
412 if (collect_timestamps && !grpc_event_engine_can_track_errors()) {
413 return;
414 }
415
416 gpr_log(GPR_INFO,
417 "Start write test with %" PRIuPTR " bytes, slice size %" PRIuPTR,
418 num_bytes, slice_size);
419
420 if (collect_timestamps) {
421 create_inet_sockets(sv);
422 } else {
423 create_sockets(sv);
424 }
425
426 grpc_arg a[1];
427 a[0].key = const_cast<char*>(GRPC_ARG_TCP_READ_CHUNK_SIZE);
428 a[0].type = GRPC_ARG_INTEGER,
429 a[0].value.integer = static_cast<int>(slice_size);
430 grpc_channel_args args = {GPR_ARRAY_SIZE(a), a};
431 ep = grpc_tcp_create(grpc_fd_create(sv[1], "write_test", collect_timestamps),
432 &args, "test");
433 grpc_endpoint_add_to_pollset(ep, g_pollset);
434
435 state.ep = ep;
436 state.write_done = 0;
437
438 slices = allocate_blocks(num_bytes, slice_size, &num_blocks, ¤t_data);
439
440 grpc_slice_buffer_init(&outgoing);
441 grpc_slice_buffer_addn(&outgoing, slices, num_blocks);
442 GRPC_CLOSURE_INIT(&write_done_closure, write_done, &state,
443 grpc_schedule_on_exec_ctx);
444
445 gpr_atm done_timestamps;
446 gpr_atm_rel_store(&done_timestamps, static_cast<gpr_atm>(0));
447 grpc_endpoint_write(ep, &outgoing, &write_done_closure,
448 grpc_event_engine_can_track_errors() && collect_timestamps
449 ? (void*)&done_timestamps
450 : nullptr);
451 drain_socket_blocking(sv[0], num_bytes, num_bytes);
452 exec_ctx.Flush();
453 gpr_mu_lock(g_mu);
454 for (;;) {
455 grpc_pollset_worker* worker = nullptr;
456 if (state.write_done &&
457 (!(grpc_event_engine_can_track_errors() && collect_timestamps) ||
458 gpr_atm_acq_load(&done_timestamps) == static_cast<gpr_atm>(1))) {
459 break;
460 }
461 GPR_ASSERT(GRPC_LOG_IF_ERROR(
462 "pollset_work", grpc_pollset_work(g_pollset, &worker, deadline)));
463 gpr_mu_unlock(g_mu);
464 exec_ctx.Flush();
465 gpr_mu_lock(g_mu);
466 }
467 gpr_mu_unlock(g_mu);
468
469 grpc_slice_buffer_destroy_internal(&outgoing);
470 grpc_endpoint_destroy(ep);
471 gpr_free(slices);
472 }
473
on_fd_released(void * arg,grpc_error * errors)474 void on_fd_released(void* arg, grpc_error* errors) {
475 int* done = static_cast<int*>(arg);
476 *done = 1;
477 GPR_ASSERT(
478 GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(g_pollset, nullptr)));
479 }
480
481 /* Do a read_test, then release fd and try to read/write again. Verify that
482 grpc_tcp_fd() is available before the fd is released. */
release_fd_test(size_t num_bytes,size_t slice_size)483 static void release_fd_test(size_t num_bytes, size_t slice_size) {
484 int sv[2];
485 grpc_endpoint* ep;
486 struct read_socket_state state;
487 size_t written_bytes;
488 int fd;
489 grpc_millis deadline =
490 grpc_timespec_to_millis_round_up(grpc_timeout_seconds_to_deadline(20));
491 grpc_core::ExecCtx exec_ctx;
492 grpc_closure fd_released_cb;
493 int fd_released_done = 0;
494 GRPC_CLOSURE_INIT(&fd_released_cb, &on_fd_released, &fd_released_done,
495 grpc_schedule_on_exec_ctx);
496
497 gpr_log(GPR_INFO,
498 "Release fd read_test of size %" PRIuPTR ", slice size %" PRIuPTR,
499 num_bytes, slice_size);
500
501 create_sockets(sv);
502
503 grpc_arg a[1];
504 a[0].key = const_cast<char*>(GRPC_ARG_TCP_READ_CHUNK_SIZE);
505 a[0].type = GRPC_ARG_INTEGER;
506 a[0].value.integer = static_cast<int>(slice_size);
507 grpc_channel_args args = {GPR_ARRAY_SIZE(a), a};
508 ep =
509 grpc_tcp_create(grpc_fd_create(sv[1], "read_test", false), &args, "test");
510 GPR_ASSERT(grpc_tcp_fd(ep) == sv[1] && sv[1] >= 0);
511 grpc_endpoint_add_to_pollset(ep, g_pollset);
512
513 written_bytes = fill_socket_partial(sv[0], num_bytes);
514 gpr_log(GPR_INFO, "Wrote %" PRIuPTR " bytes", written_bytes);
515
516 state.ep = ep;
517 state.read_bytes = 0;
518 state.target_read_bytes = written_bytes;
519 grpc_slice_buffer_init(&state.incoming);
520 GRPC_CLOSURE_INIT(&state.read_cb, read_cb, &state, grpc_schedule_on_exec_ctx);
521
522 grpc_endpoint_read(ep, &state.incoming, &state.read_cb);
523
524 gpr_mu_lock(g_mu);
525 while (state.read_bytes < state.target_read_bytes) {
526 grpc_pollset_worker* worker = nullptr;
527 GPR_ASSERT(GRPC_LOG_IF_ERROR(
528 "pollset_work", grpc_pollset_work(g_pollset, &worker, deadline)));
529 gpr_log(GPR_DEBUG, "wakeup: read=%" PRIdPTR " target=%" PRIdPTR,
530 state.read_bytes, state.target_read_bytes);
531 gpr_mu_unlock(g_mu);
532 grpc_core::ExecCtx::Get()->Flush();
533 gpr_mu_lock(g_mu);
534 }
535 GPR_ASSERT(state.read_bytes == state.target_read_bytes);
536 gpr_mu_unlock(g_mu);
537
538 grpc_slice_buffer_destroy_internal(&state.incoming);
539 grpc_tcp_destroy_and_release_fd(ep, &fd, &fd_released_cb);
540 grpc_core::ExecCtx::Get()->Flush();
541 gpr_mu_lock(g_mu);
542 while (!fd_released_done) {
543 grpc_pollset_worker* worker = nullptr;
544 GPR_ASSERT(GRPC_LOG_IF_ERROR(
545 "pollset_work", grpc_pollset_work(g_pollset, &worker, deadline)));
546 gpr_log(GPR_DEBUG, "wakeup: fd_released_done=%d", fd_released_done);
547 }
548 gpr_mu_unlock(g_mu);
549 GPR_ASSERT(fd_released_done == 1);
550 GPR_ASSERT(fd == sv[1]);
551
552 written_bytes = fill_socket_partial(sv[0], num_bytes);
553 drain_socket_blocking(fd, written_bytes, written_bytes);
554 written_bytes = fill_socket_partial(fd, num_bytes);
555 drain_socket_blocking(sv[0], written_bytes, written_bytes);
556 close(fd);
557 }
558
run_tests(void)559 void run_tests(void) {
560 size_t i = 0;
561
562 read_test(100, 8192);
563 read_test(10000, 8192);
564 read_test(10000, 137);
565 read_test(10000, 1);
566 large_read_test(8192);
567 large_read_test(1);
568
569 write_test(100, 8192, false);
570 write_test(100, 1, false);
571 write_test(100000, 8192, false);
572 write_test(100000, 1, false);
573 write_test(100000, 137, false);
574
575 write_test(100, 8192, true);
576 write_test(100, 1, true);
577 write_test(100000, 8192, true);
578 write_test(100000, 1, true);
579 write_test(100, 137, true);
580
581 for (i = 1; i < 1000; i = GPR_MAX(i + 1, i * 5 / 4)) {
582 write_test(40320, i, false);
583 write_test(40320, i, true);
584 }
585
586 release_fd_test(100, 8192);
587 }
588
clean_up(void)589 static void clean_up(void) {}
590
create_fixture_tcp_socketpair(size_t slice_size)591 static grpc_endpoint_test_fixture create_fixture_tcp_socketpair(
592 size_t slice_size) {
593 int sv[2];
594 grpc_endpoint_test_fixture f;
595 grpc_core::ExecCtx exec_ctx;
596
597 create_sockets(sv);
598 grpc_resource_quota* resource_quota =
599 grpc_resource_quota_create("tcp_posix_test_socketpair");
600 grpc_arg a[1];
601 a[0].key = const_cast<char*>(GRPC_ARG_TCP_READ_CHUNK_SIZE);
602 a[0].type = GRPC_ARG_INTEGER;
603 a[0].value.integer = static_cast<int>(slice_size);
604 grpc_channel_args args = {GPR_ARRAY_SIZE(a), a};
605 f.client_ep = grpc_tcp_create(grpc_fd_create(sv[0], "fixture:client", false),
606 &args, "test");
607 f.server_ep = grpc_tcp_create(grpc_fd_create(sv[1], "fixture:server", false),
608 &args, "test");
609 grpc_resource_quota_unref_internal(resource_quota);
610 grpc_endpoint_add_to_pollset(f.client_ep, g_pollset);
611 grpc_endpoint_add_to_pollset(f.server_ep, g_pollset);
612
613 return f;
614 }
615
616 static grpc_endpoint_test_config configs[] = {
617 {"tcp/tcp_socketpair", create_fixture_tcp_socketpair, clean_up},
618 };
619
destroy_pollset(void * p,grpc_error * error)620 static void destroy_pollset(void* p, grpc_error* error) {
621 grpc_pollset_destroy(static_cast<grpc_pollset*>(p));
622 }
623
main(int argc,char ** argv)624 int main(int argc, char** argv) {
625 grpc_closure destroyed;
626 grpc_test_init(argc, argv);
627 grpc_init();
628 grpc_core::grpc_tcp_set_write_timestamps_callback(timestamps_verifier);
629 {
630 grpc_core::ExecCtx exec_ctx;
631 g_pollset = static_cast<grpc_pollset*>(gpr_zalloc(grpc_pollset_size()));
632 grpc_pollset_init(g_pollset, &g_mu);
633 grpc_endpoint_tests(configs[0], g_pollset, g_mu);
634 run_tests();
635 GRPC_CLOSURE_INIT(&destroyed, destroy_pollset, g_pollset,
636 grpc_schedule_on_exec_ctx);
637 grpc_pollset_shutdown(g_pollset, &destroyed);
638
639 grpc_core::ExecCtx::Get()->Flush();
640 }
641 grpc_shutdown();
642 gpr_free(g_pollset);
643
644 return 0;
645 }
646
647 #else /* GRPC_POSIX_SOCKET */
648
main(int argc,char ** argv)649 int main(int argc, char** argv) { return 1; }
650
651 #endif /* GRPC_POSIX_SOCKET */
652