1 /*
2  *
3  * Copyright 2018 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #include <grpc/support/port_platform.h>
20 
21 #include "src/core/lib/iomgr/port.h"
22 
23 #include <string.h>
24 
25 #include <grpc/support/alloc.h>
26 #include <grpc/support/log.h>
27 
28 #include "src/core/lib/iomgr/error.h"
29 #include "src/core/lib/iomgr/iomgr_custom.h"
30 #include "src/core/lib/iomgr/sockaddr_utils.h"
31 #include "src/core/lib/iomgr/tcp_client.h"
32 #include "src/core/lib/iomgr/tcp_custom.h"
33 #include "src/core/lib/iomgr/timer.h"
34 
35 extern grpc_core::TraceFlag grpc_tcp_trace;
36 extern grpc_socket_vtable* grpc_custom_socket_vtable;
37 
38 struct grpc_custom_tcp_connect {
39   grpc_custom_socket* socket;
40   grpc_timer alarm;
41   grpc_closure on_alarm;
42   grpc_closure* closure;
43   grpc_endpoint** endpoint;
44   int refs;
45   char* addr_name;
46   grpc_resource_quota* resource_quota;
47 };
48 
custom_tcp_connect_cleanup(grpc_custom_tcp_connect * connect)49 static void custom_tcp_connect_cleanup(grpc_custom_tcp_connect* connect) {
50   grpc_custom_socket* socket = connect->socket;
51   grpc_resource_quota_unref_internal(connect->resource_quota);
52   gpr_free(connect->addr_name);
53   gpr_free(connect);
54   socket->refs--;
55   if (socket->refs == 0) {
56     grpc_custom_socket_vtable->destroy(socket);
57     gpr_free(socket);
58   }
59 }
60 
custom_close_callback(grpc_custom_socket * socket)61 static void custom_close_callback(grpc_custom_socket* socket) {}
62 
on_alarm(void * acp,grpc_error * error)63 static void on_alarm(void* acp, grpc_error* error) {
64   int done;
65   grpc_custom_socket* socket = (grpc_custom_socket*)acp;
66   grpc_custom_tcp_connect* connect = socket->connector;
67   if (grpc_tcp_trace.enabled()) {
68     const char* str = grpc_error_string(error);
69     gpr_log(GPR_INFO, "CLIENT_CONNECT: %s: on_alarm: error=%s",
70             connect->addr_name, str);
71   }
72   if (error == GRPC_ERROR_NONE) {
73     /* error == NONE implies that the timer ran out, and wasn't cancelled. If
74        it was cancelled, then the handler that cancelled it also should close
75        the handle, if applicable */
76     grpc_custom_socket_vtable->close(socket, custom_close_callback);
77   }
78   done = (--connect->refs == 0);
79   if (done) {
80     custom_tcp_connect_cleanup(connect);
81   }
82 }
83 
custom_connect_callback(grpc_custom_socket * socket,grpc_error * error)84 static void custom_connect_callback(grpc_custom_socket* socket,
85                                     grpc_error* error) {
86   grpc_core::ExecCtx exec_ctx;
87   grpc_custom_tcp_connect* connect = socket->connector;
88   int done;
89   grpc_closure* closure = connect->closure;
90   grpc_timer_cancel(&connect->alarm);
91   if (error == GRPC_ERROR_NONE) {
92     *connect->endpoint = custom_tcp_endpoint_create(
93         socket, connect->resource_quota, connect->addr_name);
94   }
95   done = (--connect->refs == 0);
96   if (done) {
97     grpc_core::ExecCtx::Get()->Flush();
98     custom_tcp_connect_cleanup(connect);
99   }
100   GRPC_CLOSURE_SCHED(closure, error);
101 }
102 
tcp_connect(grpc_closure * closure,grpc_endpoint ** ep,grpc_pollset_set * interested_parties,const grpc_channel_args * channel_args,const grpc_resolved_address * resolved_addr,grpc_millis deadline)103 static void tcp_connect(grpc_closure* closure, grpc_endpoint** ep,
104                         grpc_pollset_set* interested_parties,
105                         const grpc_channel_args* channel_args,
106                         const grpc_resolved_address* resolved_addr,
107                         grpc_millis deadline) {
108   GRPC_CUSTOM_IOMGR_ASSERT_SAME_THREAD();
109   (void)channel_args;
110   (void)interested_parties;
111   grpc_custom_tcp_connect* connect;
112   grpc_resource_quota* resource_quota = grpc_resource_quota_create(nullptr);
113   if (channel_args != nullptr) {
114     for (size_t i = 0; i < channel_args->num_args; i++) {
115       if (0 == strcmp(channel_args->args[i].key, GRPC_ARG_RESOURCE_QUOTA)) {
116         grpc_resource_quota_unref_internal(resource_quota);
117         resource_quota = grpc_resource_quota_ref_internal(
118             (grpc_resource_quota*)channel_args->args[i].value.pointer.p);
119       }
120     }
121   }
122   grpc_custom_socket* socket =
123       (grpc_custom_socket*)gpr_malloc(sizeof(grpc_custom_socket));
124   socket->refs = 2;
125   grpc_custom_socket_vtable->init(socket, GRPC_AF_UNSPEC);
126   connect =
127       (grpc_custom_tcp_connect*)gpr_malloc(sizeof(grpc_custom_tcp_connect));
128   connect->closure = closure;
129   connect->endpoint = ep;
130   connect->addr_name = grpc_sockaddr_to_uri(resolved_addr);
131   connect->resource_quota = resource_quota;
132   connect->socket = socket;
133   socket->connector = connect;
134   socket->endpoint = nullptr;
135   socket->listener = nullptr;
136   connect->refs = 2;
137 
138   if (grpc_tcp_trace.enabled()) {
139     gpr_log(GPR_INFO, "CLIENT_CONNECT: %p %s: asynchronously connecting",
140             socket, connect->addr_name);
141   }
142 
143   GRPC_CLOSURE_INIT(&connect->on_alarm, on_alarm, socket,
144                     grpc_schedule_on_exec_ctx);
145   grpc_timer_init(&connect->alarm, deadline, &connect->on_alarm);
146   grpc_custom_socket_vtable->connect(
147       socket, (const grpc_sockaddr*)resolved_addr->addr, resolved_addr->len,
148       custom_connect_callback);
149 }
150 
151 grpc_tcp_client_vtable custom_tcp_client_vtable = {tcp_connect};
152