1 /*
2  *
3  * Copyright 2015 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #include "src/core/lib/iomgr/port.h"
20 
21 // This test won't work except with posix sockets enabled
22 #ifdef GRPC_POSIX_SOCKET
23 
24 #include "src/core/lib/iomgr/tcp_posix.h"
25 
26 #include <errno.h>
27 #include <fcntl.h>
28 #include <string.h>
29 #include <sys/socket.h>
30 #include <sys/types.h>
31 #include <unistd.h>
32 
33 #include <grpc/grpc.h>
34 #include <grpc/support/alloc.h>
35 #include <grpc/support/log.h>
36 #include <grpc/support/time.h>
37 
38 #include "src/core/lib/gpr/useful.h"
39 #include "src/core/lib/iomgr/buffer_list.h"
40 #include "src/core/lib/iomgr/ev_posix.h"
41 #include "src/core/lib/iomgr/sockaddr_posix.h"
42 #include "src/core/lib/slice/slice_internal.h"
43 #include "test/core/iomgr/endpoint_tests.h"
44 #include "test/core/util/test_config.h"
45 
46 static gpr_mu* g_mu;
47 static grpc_pollset* g_pollset;
48 
49 /*
50    General test notes:
51 
52    All tests which write data into a socket write i%256 into byte i, which is
53    verified by readers.
54 
55    In general there are a few interesting things to vary which may lead to
56    exercising different codepaths in an implementation:
57    1. Total amount of data written to the socket
58    2. Size of slice allocations
59    3. Amount of data we read from or write to the socket at once
60 
61    The tests here tend to parameterize these where applicable.
62 
63  */
64 
create_sockets(int sv[2])65 static void create_sockets(int sv[2]) {
66   int flags;
67   GPR_ASSERT(socketpair(AF_UNIX, SOCK_STREAM, 0, sv) == 0);
68   flags = fcntl(sv[0], F_GETFL, 0);
69   GPR_ASSERT(fcntl(sv[0], F_SETFL, flags | O_NONBLOCK) == 0);
70   flags = fcntl(sv[1], F_GETFL, 0);
71   GPR_ASSERT(fcntl(sv[1], F_SETFL, flags | O_NONBLOCK) == 0);
72 }
73 
create_inet_sockets(int sv[2])74 static void create_inet_sockets(int sv[2]) {
75   /* Prepare listening socket */
76   struct sockaddr_in addr;
77   memset(&addr, 0, sizeof(struct sockaddr_in));
78   addr.sin_family = AF_INET;
79   int sock = socket(AF_INET, SOCK_STREAM, 0);
80   GPR_ASSERT(sock);
81   GPR_ASSERT(bind(sock, (sockaddr*)&addr, sizeof(sockaddr_in)) == 0);
82   listen(sock, 1);
83 
84   /* Prepare client socket and connect to server */
85   socklen_t len = sizeof(sockaddr_in);
86   GPR_ASSERT(getsockname(sock, (sockaddr*)&addr, &len) == 0);
87 
88   int client = socket(AF_INET, SOCK_STREAM, 0);
89   GPR_ASSERT(client);
90   int ret;
91   do {
92     ret = connect(client, (sockaddr*)&addr, sizeof(sockaddr_in));
93   } while (ret == -1 && errno == EINTR);
94 
95   /* Accept client connection */
96   len = sizeof(socklen_t);
97   int server;
98   do {
99     server = accept(sock, (sockaddr*)&addr, (socklen_t*)&len);
100   } while (server == -1 && errno == EINTR);
101   GPR_ASSERT(server != -1);
102 
103   sv[0] = server;
104   sv[1] = client;
105   int flags = fcntl(sv[0], F_GETFL, 0);
106   GPR_ASSERT(fcntl(sv[0], F_SETFL, flags | O_NONBLOCK) == 0);
107   flags = fcntl(sv[1], F_GETFL, 0);
108   GPR_ASSERT(fcntl(sv[1], F_SETFL, flags | O_NONBLOCK) == 0);
109 }
110 
fill_socket(int fd)111 static ssize_t fill_socket(int fd) {
112   ssize_t write_bytes;
113   ssize_t total_bytes = 0;
114   int i;
115   unsigned char buf[256];
116   for (i = 0; i < 256; ++i) {
117     buf[i] = static_cast<uint8_t>(i);
118   }
119   do {
120     write_bytes = write(fd, buf, 256);
121     if (write_bytes > 0) {
122       total_bytes += write_bytes;
123     }
124   } while (write_bytes >= 0 || errno == EINTR);
125   GPR_ASSERT(errno == EAGAIN);
126   return total_bytes;
127 }
128 
fill_socket_partial(int fd,size_t bytes)129 static size_t fill_socket_partial(int fd, size_t bytes) {
130   ssize_t write_bytes;
131   size_t total_bytes = 0;
132   unsigned char* buf = static_cast<unsigned char*>(gpr_malloc(bytes));
133   unsigned i;
134   for (i = 0; i < bytes; ++i) {
135     buf[i] = static_cast<uint8_t>(i % 256);
136   }
137 
138   do {
139     write_bytes = write(fd, buf, bytes - total_bytes);
140     if (write_bytes > 0) {
141       total_bytes += static_cast<size_t>(write_bytes);
142     }
143   } while ((write_bytes >= 0 || errno == EINTR) && bytes > total_bytes);
144 
145   gpr_free(buf);
146 
147   return total_bytes;
148 }
149 
150 struct read_socket_state {
151   grpc_endpoint* ep;
152   size_t read_bytes;
153   size_t target_read_bytes;
154   grpc_slice_buffer incoming;
155   grpc_closure read_cb;
156 };
157 
count_slices(grpc_slice * slices,size_t nslices,int * current_data)158 static size_t count_slices(grpc_slice* slices, size_t nslices,
159                            int* current_data) {
160   size_t num_bytes = 0;
161   unsigned i, j;
162   unsigned char* buf;
163   for (i = 0; i < nslices; ++i) {
164     buf = GRPC_SLICE_START_PTR(slices[i]);
165     for (j = 0; j < GRPC_SLICE_LENGTH(slices[i]); ++j) {
166       GPR_ASSERT(buf[j] == *current_data);
167       *current_data = (*current_data + 1) % 256;
168     }
169     num_bytes += GRPC_SLICE_LENGTH(slices[i]);
170   }
171   return num_bytes;
172 }
173 
read_cb(void * user_data,grpc_error * error)174 static void read_cb(void* user_data, grpc_error* error) {
175   struct read_socket_state* state =
176       static_cast<struct read_socket_state*>(user_data);
177   size_t read_bytes;
178   int current_data;
179 
180   GPR_ASSERT(error == GRPC_ERROR_NONE);
181 
182   gpr_mu_lock(g_mu);
183   current_data = state->read_bytes % 256;
184   read_bytes = count_slices(state->incoming.slices, state->incoming.count,
185                             &current_data);
186   state->read_bytes += read_bytes;
187   gpr_log(GPR_INFO, "Read %" PRIuPTR " bytes of %" PRIuPTR, read_bytes,
188           state->target_read_bytes);
189   if (state->read_bytes >= state->target_read_bytes) {
190     GPR_ASSERT(
191         GRPC_LOG_IF_ERROR("kick", grpc_pollset_kick(g_pollset, nullptr)));
192     gpr_mu_unlock(g_mu);
193   } else {
194     grpc_endpoint_read(state->ep, &state->incoming, &state->read_cb);
195     gpr_mu_unlock(g_mu);
196   }
197 }
198 
199 /* Write to a socket, then read from it using the grpc_tcp API. */
read_test(size_t num_bytes,size_t slice_size)200 static void read_test(size_t num_bytes, size_t slice_size) {
201   int sv[2];
202   grpc_endpoint* ep;
203   struct read_socket_state state;
204   size_t written_bytes;
205   grpc_millis deadline =
206       grpc_timespec_to_millis_round_up(grpc_timeout_seconds_to_deadline(20));
207   grpc_core::ExecCtx exec_ctx;
208 
209   gpr_log(GPR_INFO, "Read test of size %" PRIuPTR ", slice size %" PRIuPTR,
210           num_bytes, slice_size);
211 
212   create_sockets(sv);
213 
214   grpc_arg a[1];
215   a[0].key = const_cast<char*>(GRPC_ARG_TCP_READ_CHUNK_SIZE);
216   a[0].type = GRPC_ARG_INTEGER,
217   a[0].value.integer = static_cast<int>(slice_size);
218   grpc_channel_args args = {GPR_ARRAY_SIZE(a), a};
219   ep =
220       grpc_tcp_create(grpc_fd_create(sv[1], "read_test", false), &args, "test");
221   grpc_endpoint_add_to_pollset(ep, g_pollset);
222 
223   written_bytes = fill_socket_partial(sv[0], num_bytes);
224   gpr_log(GPR_INFO, "Wrote %" PRIuPTR " bytes", written_bytes);
225 
226   state.ep = ep;
227   state.read_bytes = 0;
228   state.target_read_bytes = written_bytes;
229   grpc_slice_buffer_init(&state.incoming);
230   GRPC_CLOSURE_INIT(&state.read_cb, read_cb, &state, grpc_schedule_on_exec_ctx);
231 
232   grpc_endpoint_read(ep, &state.incoming, &state.read_cb);
233 
234   gpr_mu_lock(g_mu);
235   while (state.read_bytes < state.target_read_bytes) {
236     grpc_pollset_worker* worker = nullptr;
237     GPR_ASSERT(GRPC_LOG_IF_ERROR(
238         "pollset_work", grpc_pollset_work(g_pollset, &worker, deadline)));
239     gpr_mu_unlock(g_mu);
240 
241     gpr_mu_lock(g_mu);
242   }
243   GPR_ASSERT(state.read_bytes == state.target_read_bytes);
244   gpr_mu_unlock(g_mu);
245 
246   grpc_slice_buffer_destroy_internal(&state.incoming);
247   grpc_endpoint_destroy(ep);
248 }
249 
250 /* Write to a socket until it fills up, then read from it using the grpc_tcp
251    API. */
large_read_test(size_t slice_size)252 static void large_read_test(size_t slice_size) {
253   int sv[2];
254   grpc_endpoint* ep;
255   struct read_socket_state state;
256   ssize_t written_bytes;
257   grpc_millis deadline =
258       grpc_timespec_to_millis_round_up(grpc_timeout_seconds_to_deadline(20));
259   grpc_core::ExecCtx exec_ctx;
260 
261   gpr_log(GPR_INFO, "Start large read test, slice size %" PRIuPTR, slice_size);
262 
263   create_sockets(sv);
264 
265   grpc_arg a[1];
266   a[0].key = const_cast<char*>(GRPC_ARG_TCP_READ_CHUNK_SIZE);
267   a[0].type = GRPC_ARG_INTEGER;
268   a[0].value.integer = static_cast<int>(slice_size);
269   grpc_channel_args args = {GPR_ARRAY_SIZE(a), a};
270   ep = grpc_tcp_create(grpc_fd_create(sv[1], "large_read_test", false), &args,
271                        "test");
272   grpc_endpoint_add_to_pollset(ep, g_pollset);
273 
274   written_bytes = fill_socket(sv[0]);
275   gpr_log(GPR_INFO, "Wrote %" PRIuPTR " bytes", written_bytes);
276 
277   state.ep = ep;
278   state.read_bytes = 0;
279   state.target_read_bytes = static_cast<size_t>(written_bytes);
280   grpc_slice_buffer_init(&state.incoming);
281   GRPC_CLOSURE_INIT(&state.read_cb, read_cb, &state, grpc_schedule_on_exec_ctx);
282 
283   grpc_endpoint_read(ep, &state.incoming, &state.read_cb);
284 
285   gpr_mu_lock(g_mu);
286   while (state.read_bytes < state.target_read_bytes) {
287     grpc_pollset_worker* worker = nullptr;
288     GPR_ASSERT(GRPC_LOG_IF_ERROR(
289         "pollset_work", grpc_pollset_work(g_pollset, &worker, deadline)));
290     gpr_mu_unlock(g_mu);
291 
292     gpr_mu_lock(g_mu);
293   }
294   GPR_ASSERT(state.read_bytes == state.target_read_bytes);
295   gpr_mu_unlock(g_mu);
296 
297   grpc_slice_buffer_destroy_internal(&state.incoming);
298   grpc_endpoint_destroy(ep);
299 }
300 
301 struct write_socket_state {
302   grpc_endpoint* ep;
303   int write_done;
304 };
305 
allocate_blocks(size_t num_bytes,size_t slice_size,size_t * num_blocks,uint8_t * current_data)306 static grpc_slice* allocate_blocks(size_t num_bytes, size_t slice_size,
307                                    size_t* num_blocks, uint8_t* current_data) {
308   size_t nslices = num_bytes / slice_size + (num_bytes % slice_size ? 1u : 0u);
309   grpc_slice* slices =
310       static_cast<grpc_slice*>(gpr_malloc(sizeof(grpc_slice) * nslices));
311   size_t num_bytes_left = num_bytes;
312   unsigned i, j;
313   unsigned char* buf;
314   *num_blocks = nslices;
315 
316   for (i = 0; i < nslices; ++i) {
317     slices[i] = grpc_slice_malloc(slice_size > num_bytes_left ? num_bytes_left
318                                                               : slice_size);
319     num_bytes_left -= GRPC_SLICE_LENGTH(slices[i]);
320     buf = GRPC_SLICE_START_PTR(slices[i]);
321     for (j = 0; j < GRPC_SLICE_LENGTH(slices[i]); ++j) {
322       buf[j] = *current_data;
323       (*current_data)++;
324     }
325   }
326   GPR_ASSERT(num_bytes_left == 0);
327   return slices;
328 }
329 
write_done(void * user_data,grpc_error * error)330 static void write_done(void* user_data /* write_socket_state */,
331                        grpc_error* error) {
332   GPR_ASSERT(error == GRPC_ERROR_NONE);
333   struct write_socket_state* state =
334       static_cast<struct write_socket_state*>(user_data);
335   gpr_mu_lock(g_mu);
336   state->write_done = 1;
337   GPR_ASSERT(
338       GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(g_pollset, nullptr)));
339   gpr_mu_unlock(g_mu);
340 }
341 
drain_socket_blocking(int fd,size_t num_bytes,size_t read_size)342 void drain_socket_blocking(int fd, size_t num_bytes, size_t read_size) {
343   unsigned char* buf = static_cast<unsigned char*>(gpr_malloc(read_size));
344   ssize_t bytes_read;
345   size_t bytes_left = num_bytes;
346   int flags;
347   int current = 0;
348   int i;
349   grpc_core::ExecCtx exec_ctx;
350 
351   flags = fcntl(fd, F_GETFL, 0);
352   GPR_ASSERT(fcntl(fd, F_SETFL, flags & ~O_NONBLOCK) == 0);
353 
354   for (;;) {
355     grpc_pollset_worker* worker = nullptr;
356     gpr_mu_lock(g_mu);
357     GPR_ASSERT(GRPC_LOG_IF_ERROR(
358         "pollset_work",
359         grpc_pollset_work(g_pollset, &worker,
360                           grpc_timespec_to_millis_round_up(
361                               grpc_timeout_milliseconds_to_deadline(10)))));
362     gpr_mu_unlock(g_mu);
363 
364     do {
365       bytes_read =
366           read(fd, buf, bytes_left > read_size ? read_size : bytes_left);
367     } while (bytes_read < 0 && errno == EINTR);
368     GPR_ASSERT(bytes_read >= 0);
369     for (i = 0; i < bytes_read; ++i) {
370       GPR_ASSERT(buf[i] == current);
371       current = (current + 1) % 256;
372     }
373     bytes_left -= static_cast<size_t>(bytes_read);
374     if (bytes_left == 0) break;
375   }
376   flags = fcntl(fd, F_GETFL, 0);
377   GPR_ASSERT(fcntl(fd, F_SETFL, flags | O_NONBLOCK) == 0);
378 
379   gpr_free(buf);
380 }
381 
382 /* Verifier for timestamps callback for write_test */
timestamps_verifier(void * arg,grpc_core::Timestamps * ts,grpc_error * error)383 void timestamps_verifier(void* arg, grpc_core::Timestamps* ts,
384                          grpc_error* error) {
385   GPR_ASSERT(error == GRPC_ERROR_NONE);
386   GPR_ASSERT(arg != nullptr);
387   GPR_ASSERT(ts->sendmsg_time.clock_type == GPR_CLOCK_REALTIME);
388   GPR_ASSERT(ts->scheduled_time.clock_type == GPR_CLOCK_REALTIME);
389   GPR_ASSERT(ts->acked_time.clock_type == GPR_CLOCK_REALTIME);
390   gpr_atm* done_timestamps = (gpr_atm*)arg;
391   gpr_atm_rel_store(done_timestamps, static_cast<gpr_atm>(1));
392 }
393 
394 /* Write to a socket using the grpc_tcp API, then drain it directly.
395    Note that if the write does not complete immediately we need to drain the
396    socket in parallel with the read. If collect_timestamps is true, it will
397    try to get timestamps for the write. */
write_test(size_t num_bytes,size_t slice_size,bool collect_timestamps)398 static void write_test(size_t num_bytes, size_t slice_size,
399                        bool collect_timestamps) {
400   int sv[2];
401   grpc_endpoint* ep;
402   struct write_socket_state state;
403   size_t num_blocks;
404   grpc_slice* slices;
405   uint8_t current_data = 0;
406   grpc_slice_buffer outgoing;
407   grpc_closure write_done_closure;
408   grpc_millis deadline =
409       grpc_timespec_to_millis_round_up(grpc_timeout_seconds_to_deadline(20));
410   grpc_core::ExecCtx exec_ctx;
411 
412   if (collect_timestamps && !grpc_event_engine_can_track_errors()) {
413     return;
414   }
415 
416   gpr_log(GPR_INFO,
417           "Start write test with %" PRIuPTR " bytes, slice size %" PRIuPTR,
418           num_bytes, slice_size);
419 
420   if (collect_timestamps) {
421     create_inet_sockets(sv);
422   } else {
423     create_sockets(sv);
424   }
425 
426   grpc_arg a[1];
427   a[0].key = const_cast<char*>(GRPC_ARG_TCP_READ_CHUNK_SIZE);
428   a[0].type = GRPC_ARG_INTEGER,
429   a[0].value.integer = static_cast<int>(slice_size);
430   grpc_channel_args args = {GPR_ARRAY_SIZE(a), a};
431   ep = grpc_tcp_create(grpc_fd_create(sv[1], "write_test", collect_timestamps),
432                        &args, "test");
433   grpc_endpoint_add_to_pollset(ep, g_pollset);
434 
435   state.ep = ep;
436   state.write_done = 0;
437 
438   slices = allocate_blocks(num_bytes, slice_size, &num_blocks, &current_data);
439 
440   grpc_slice_buffer_init(&outgoing);
441   grpc_slice_buffer_addn(&outgoing, slices, num_blocks);
442   GRPC_CLOSURE_INIT(&write_done_closure, write_done, &state,
443                     grpc_schedule_on_exec_ctx);
444 
445   gpr_atm done_timestamps;
446   gpr_atm_rel_store(&done_timestamps, static_cast<gpr_atm>(0));
447   grpc_endpoint_write(ep, &outgoing, &write_done_closure,
448                       grpc_event_engine_can_track_errors() && collect_timestamps
449                           ? (void*)&done_timestamps
450                           : nullptr);
451   drain_socket_blocking(sv[0], num_bytes, num_bytes);
452   exec_ctx.Flush();
453   gpr_mu_lock(g_mu);
454   for (;;) {
455     grpc_pollset_worker* worker = nullptr;
456     if (state.write_done &&
457         (!(grpc_event_engine_can_track_errors() && collect_timestamps) ||
458          gpr_atm_acq_load(&done_timestamps) == static_cast<gpr_atm>(1))) {
459       break;
460     }
461     GPR_ASSERT(GRPC_LOG_IF_ERROR(
462         "pollset_work", grpc_pollset_work(g_pollset, &worker, deadline)));
463     gpr_mu_unlock(g_mu);
464     exec_ctx.Flush();
465     gpr_mu_lock(g_mu);
466   }
467   gpr_mu_unlock(g_mu);
468 
469   grpc_slice_buffer_destroy_internal(&outgoing);
470   grpc_endpoint_destroy(ep);
471   gpr_free(slices);
472 }
473 
on_fd_released(void * arg,grpc_error * errors)474 void on_fd_released(void* arg, grpc_error* errors) {
475   int* done = static_cast<int*>(arg);
476   *done = 1;
477   GPR_ASSERT(
478       GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(g_pollset, nullptr)));
479 }
480 
481 /* Do a read_test, then release fd and try to read/write again. Verify that
482    grpc_tcp_fd() is available before the fd is released. */
release_fd_test(size_t num_bytes,size_t slice_size)483 static void release_fd_test(size_t num_bytes, size_t slice_size) {
484   int sv[2];
485   grpc_endpoint* ep;
486   struct read_socket_state state;
487   size_t written_bytes;
488   int fd;
489   grpc_millis deadline =
490       grpc_timespec_to_millis_round_up(grpc_timeout_seconds_to_deadline(20));
491   grpc_core::ExecCtx exec_ctx;
492   grpc_closure fd_released_cb;
493   int fd_released_done = 0;
494   GRPC_CLOSURE_INIT(&fd_released_cb, &on_fd_released, &fd_released_done,
495                     grpc_schedule_on_exec_ctx);
496 
497   gpr_log(GPR_INFO,
498           "Release fd read_test of size %" PRIuPTR ", slice size %" PRIuPTR,
499           num_bytes, slice_size);
500 
501   create_sockets(sv);
502 
503   grpc_arg a[1];
504   a[0].key = const_cast<char*>(GRPC_ARG_TCP_READ_CHUNK_SIZE);
505   a[0].type = GRPC_ARG_INTEGER;
506   a[0].value.integer = static_cast<int>(slice_size);
507   grpc_channel_args args = {GPR_ARRAY_SIZE(a), a};
508   ep =
509       grpc_tcp_create(grpc_fd_create(sv[1], "read_test", false), &args, "test");
510   GPR_ASSERT(grpc_tcp_fd(ep) == sv[1] && sv[1] >= 0);
511   grpc_endpoint_add_to_pollset(ep, g_pollset);
512 
513   written_bytes = fill_socket_partial(sv[0], num_bytes);
514   gpr_log(GPR_INFO, "Wrote %" PRIuPTR " bytes", written_bytes);
515 
516   state.ep = ep;
517   state.read_bytes = 0;
518   state.target_read_bytes = written_bytes;
519   grpc_slice_buffer_init(&state.incoming);
520   GRPC_CLOSURE_INIT(&state.read_cb, read_cb, &state, grpc_schedule_on_exec_ctx);
521 
522   grpc_endpoint_read(ep, &state.incoming, &state.read_cb);
523 
524   gpr_mu_lock(g_mu);
525   while (state.read_bytes < state.target_read_bytes) {
526     grpc_pollset_worker* worker = nullptr;
527     GPR_ASSERT(GRPC_LOG_IF_ERROR(
528         "pollset_work", grpc_pollset_work(g_pollset, &worker, deadline)));
529     gpr_log(GPR_DEBUG, "wakeup: read=%" PRIdPTR " target=%" PRIdPTR,
530             state.read_bytes, state.target_read_bytes);
531     gpr_mu_unlock(g_mu);
532     grpc_core::ExecCtx::Get()->Flush();
533     gpr_mu_lock(g_mu);
534   }
535   GPR_ASSERT(state.read_bytes == state.target_read_bytes);
536   gpr_mu_unlock(g_mu);
537 
538   grpc_slice_buffer_destroy_internal(&state.incoming);
539   grpc_tcp_destroy_and_release_fd(ep, &fd, &fd_released_cb);
540   grpc_core::ExecCtx::Get()->Flush();
541   gpr_mu_lock(g_mu);
542   while (!fd_released_done) {
543     grpc_pollset_worker* worker = nullptr;
544     GPR_ASSERT(GRPC_LOG_IF_ERROR(
545         "pollset_work", grpc_pollset_work(g_pollset, &worker, deadline)));
546     gpr_log(GPR_DEBUG, "wakeup: fd_released_done=%d", fd_released_done);
547   }
548   gpr_mu_unlock(g_mu);
549   GPR_ASSERT(fd_released_done == 1);
550   GPR_ASSERT(fd == sv[1]);
551 
552   written_bytes = fill_socket_partial(sv[0], num_bytes);
553   drain_socket_blocking(fd, written_bytes, written_bytes);
554   written_bytes = fill_socket_partial(fd, num_bytes);
555   drain_socket_blocking(sv[0], written_bytes, written_bytes);
556   close(fd);
557 }
558 
run_tests(void)559 void run_tests(void) {
560   size_t i = 0;
561 
562   read_test(100, 8192);
563   read_test(10000, 8192);
564   read_test(10000, 137);
565   read_test(10000, 1);
566   large_read_test(8192);
567   large_read_test(1);
568 
569   write_test(100, 8192, false);
570   write_test(100, 1, false);
571   write_test(100000, 8192, false);
572   write_test(100000, 1, false);
573   write_test(100000, 137, false);
574 
575   write_test(100, 8192, true);
576   write_test(100, 1, true);
577   write_test(100000, 8192, true);
578   write_test(100000, 1, true);
579   write_test(100, 137, true);
580 
581   for (i = 1; i < 1000; i = GPR_MAX(i + 1, i * 5 / 4)) {
582     write_test(40320, i, false);
583     write_test(40320, i, true);
584   }
585 
586   release_fd_test(100, 8192);
587 }
588 
/* Clean-up hook for the endpoint test config; there is nothing to release. */
static void clean_up(void) {
}
590 
create_fixture_tcp_socketpair(size_t slice_size)591 static grpc_endpoint_test_fixture create_fixture_tcp_socketpair(
592     size_t slice_size) {
593   int sv[2];
594   grpc_endpoint_test_fixture f;
595   grpc_core::ExecCtx exec_ctx;
596 
597   create_sockets(sv);
598   grpc_resource_quota* resource_quota =
599       grpc_resource_quota_create("tcp_posix_test_socketpair");
600   grpc_arg a[1];
601   a[0].key = const_cast<char*>(GRPC_ARG_TCP_READ_CHUNK_SIZE);
602   a[0].type = GRPC_ARG_INTEGER;
603   a[0].value.integer = static_cast<int>(slice_size);
604   grpc_channel_args args = {GPR_ARRAY_SIZE(a), a};
605   f.client_ep = grpc_tcp_create(grpc_fd_create(sv[0], "fixture:client", false),
606                                 &args, "test");
607   f.server_ep = grpc_tcp_create(grpc_fd_create(sv[1], "fixture:server", false),
608                                 &args, "test");
609   grpc_resource_quota_unref_internal(resource_quota);
610   grpc_endpoint_add_to_pollset(f.client_ep, g_pollset);
611   grpc_endpoint_add_to_pollset(f.server_ep, g_pollset);
612 
613   return f;
614 }
615 
616 static grpc_endpoint_test_config configs[] = {
617     {"tcp/tcp_socketpair", create_fixture_tcp_socketpair, clean_up},
618 };
619 
destroy_pollset(void * p,grpc_error * error)620 static void destroy_pollset(void* p, grpc_error* error) {
621   grpc_pollset_destroy(static_cast<grpc_pollset*>(p));
622 }
623 
main(int argc,char ** argv)624 int main(int argc, char** argv) {
625   grpc_closure destroyed;
626   grpc_test_init(argc, argv);
627   grpc_init();
628   grpc_core::grpc_tcp_set_write_timestamps_callback(timestamps_verifier);
629   {
630     grpc_core::ExecCtx exec_ctx;
631     g_pollset = static_cast<grpc_pollset*>(gpr_zalloc(grpc_pollset_size()));
632     grpc_pollset_init(g_pollset, &g_mu);
633     grpc_endpoint_tests(configs[0], g_pollset, g_mu);
634     run_tests();
635     GRPC_CLOSURE_INIT(&destroyed, destroy_pollset, g_pollset,
636                       grpc_schedule_on_exec_ctx);
637     grpc_pollset_shutdown(g_pollset, &destroyed);
638 
639     grpc_core::ExecCtx::Get()->Flush();
640   }
641   grpc_shutdown();
642   gpr_free(g_pollset);
643 
644   return 0;
645 }
646 
647 #else /* GRPC_POSIX_SOCKET */
648 
/* Without POSIX sockets there is nothing to test; exit with status 1. */
int main(int argc, char** argv) {
  return 1;
}
650 
651 #endif /* GRPC_POSIX_SOCKET */
652