1 /*
2  *
3  * Copyright 2015 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #include <grpc/support/port_platform.h>
20 
21 #include "src/core/lib/channel/connected_channel.h"
22 
23 #include <stdarg.h>
24 #include <stdio.h>
25 #include <string.h>
26 
27 #include <grpc/byte_buffer.h>
28 #include <grpc/slice_buffer.h>
29 #include <grpc/support/alloc.h>
30 #include <grpc/support/log.h>
31 #include "src/core/lib/gpr/string.h"
32 #include "src/core/lib/profiling/timers.h"
33 #include "src/core/lib/transport/transport.h"
34 
35 #define MAX_BUFFER_LENGTH 8192
36 
37 typedef struct connected_channel_channel_data {
38   grpc_transport* transport;
39 } channel_data;
40 
41 typedef struct {
42   grpc_closure closure;
43   grpc_closure* original_closure;
44   grpc_call_combiner* call_combiner;
45   const char* reason;
46 } callback_state;
47 
48 typedef struct connected_channel_call_data {
49   grpc_call_combiner* call_combiner;
50   // Closures used for returning results on the call combiner.
51   callback_state on_complete[6];  // Max number of pending batches.
52   callback_state recv_initial_metadata_ready;
53   callback_state recv_message_ready;
54   callback_state recv_trailing_metadata_ready;
55 } call_data;
56 
run_in_call_combiner(void * arg,grpc_error * error)57 static void run_in_call_combiner(void* arg, grpc_error* error) {
58   callback_state* state = static_cast<callback_state*>(arg);
59   GRPC_CALL_COMBINER_START(state->call_combiner, state->original_closure,
60                            GRPC_ERROR_REF(error), state->reason);
61 }
62 
run_cancel_in_call_combiner(void * arg,grpc_error * error)63 static void run_cancel_in_call_combiner(void* arg, grpc_error* error) {
64   run_in_call_combiner(arg, error);
65   gpr_free(arg);
66 }
67 
intercept_callback(call_data * calld,callback_state * state,bool free_when_done,const char * reason,grpc_closure ** original_closure)68 static void intercept_callback(call_data* calld, callback_state* state,
69                                bool free_when_done, const char* reason,
70                                grpc_closure** original_closure) {
71   state->original_closure = *original_closure;
72   state->call_combiner = calld->call_combiner;
73   state->reason = reason;
74   *original_closure = GRPC_CLOSURE_INIT(
75       &state->closure,
76       free_when_done ? run_cancel_in_call_combiner : run_in_call_combiner,
77       state, grpc_schedule_on_exec_ctx);
78 }
79 
get_state_for_batch(call_data * calld,grpc_transport_stream_op_batch * batch)80 static callback_state* get_state_for_batch(
81     call_data* calld, grpc_transport_stream_op_batch* batch) {
82   if (batch->send_initial_metadata) return &calld->on_complete[0];
83   if (batch->send_message) return &calld->on_complete[1];
84   if (batch->send_trailing_metadata) return &calld->on_complete[2];
85   if (batch->recv_initial_metadata) return &calld->on_complete[3];
86   if (batch->recv_message) return &calld->on_complete[4];
87   if (batch->recv_trailing_metadata) return &calld->on_complete[5];
88   GPR_UNREACHABLE_CODE(return nullptr);
89 }
90 
91 /* We perform a small hack to locate transport data alongside the connected
92    channel data in call allocations, to allow everything to be pulled in minimal
93    cache line requests */
94 #define TRANSPORT_STREAM_FROM_CALL_DATA(calld) ((grpc_stream*)((calld) + 1))
95 #define CALL_DATA_FROM_TRANSPORT_STREAM(transport_stream) \
96   (((call_data*)(transport_stream)) - 1)
97 
98 /* Intercept a call operation and either push it directly up or translate it
99    into transport stream operations */
con_start_transport_stream_op_batch(grpc_call_element * elem,grpc_transport_stream_op_batch * batch)100 static void con_start_transport_stream_op_batch(
101     grpc_call_element* elem, grpc_transport_stream_op_batch* batch) {
102   call_data* calld = static_cast<call_data*>(elem->call_data);
103   channel_data* chand = static_cast<channel_data*>(elem->channel_data);
104   if (batch->recv_initial_metadata) {
105     callback_state* state = &calld->recv_initial_metadata_ready;
106     intercept_callback(
107         calld, state, false, "recv_initial_metadata_ready",
108         &batch->payload->recv_initial_metadata.recv_initial_metadata_ready);
109   }
110   if (batch->recv_message) {
111     callback_state* state = &calld->recv_message_ready;
112     intercept_callback(calld, state, false, "recv_message_ready",
113                        &batch->payload->recv_message.recv_message_ready);
114   }
115   if (batch->recv_trailing_metadata) {
116     callback_state* state = &calld->recv_trailing_metadata_ready;
117     intercept_callback(
118         calld, state, false, "recv_trailing_metadata_ready",
119         &batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready);
120   }
121   if (batch->cancel_stream) {
122     // There can be more than one cancellation batch in flight at any
123     // given time, so we can't just pick out a fixed index into
124     // calld->on_complete like we can for the other ops.  However,
125     // cancellation isn't in the fast path, so we just allocate a new
126     // closure for each one.
127     callback_state* state =
128         static_cast<callback_state*>(gpr_malloc(sizeof(*state)));
129     intercept_callback(calld, state, true, "on_complete (cancel_stream)",
130                        &batch->on_complete);
131   } else if (batch->on_complete != nullptr) {
132     callback_state* state = get_state_for_batch(calld, batch);
133     intercept_callback(calld, state, false, "on_complete", &batch->on_complete);
134   }
135   grpc_transport_perform_stream_op(
136       chand->transport, TRANSPORT_STREAM_FROM_CALL_DATA(calld), batch);
137   GRPC_CALL_COMBINER_STOP(calld->call_combiner, "passed batch to transport");
138 }
139 
con_start_transport_op(grpc_channel_element * elem,grpc_transport_op * op)140 static void con_start_transport_op(grpc_channel_element* elem,
141                                    grpc_transport_op* op) {
142   channel_data* chand = static_cast<channel_data*>(elem->channel_data);
143   grpc_transport_perform_op(chand->transport, op);
144 }
145 
146 /* Constructor for call_data */
init_call_elem(grpc_call_element * elem,const grpc_call_element_args * args)147 static grpc_error* init_call_elem(grpc_call_element* elem,
148                                   const grpc_call_element_args* args) {
149   call_data* calld = static_cast<call_data*>(elem->call_data);
150   channel_data* chand = static_cast<channel_data*>(elem->channel_data);
151   calld->call_combiner = args->call_combiner;
152   int r = grpc_transport_init_stream(
153       chand->transport, TRANSPORT_STREAM_FROM_CALL_DATA(calld),
154       &args->call_stack->refcount, args->server_transport_data, args->arena);
155   return r == 0 ? GRPC_ERROR_NONE
156                 : GRPC_ERROR_CREATE_FROM_STATIC_STRING(
157                       "transport stream initialization failed");
158 }
159 
set_pollset_or_pollset_set(grpc_call_element * elem,grpc_polling_entity * pollent)160 static void set_pollset_or_pollset_set(grpc_call_element* elem,
161                                        grpc_polling_entity* pollent) {
162   call_data* calld = static_cast<call_data*>(elem->call_data);
163   channel_data* chand = static_cast<channel_data*>(elem->channel_data);
164   grpc_transport_set_pops(chand->transport,
165                           TRANSPORT_STREAM_FROM_CALL_DATA(calld), pollent);
166 }
167 
168 /* Destructor for call_data */
destroy_call_elem(grpc_call_element * elem,const grpc_call_final_info * final_info,grpc_closure * then_schedule_closure)169 static void destroy_call_elem(grpc_call_element* elem,
170                               const grpc_call_final_info* final_info,
171                               grpc_closure* then_schedule_closure) {
172   call_data* calld = static_cast<call_data*>(elem->call_data);
173   channel_data* chand = static_cast<channel_data*>(elem->channel_data);
174   grpc_transport_destroy_stream(chand->transport,
175                                 TRANSPORT_STREAM_FROM_CALL_DATA(calld),
176                                 then_schedule_closure);
177 }
178 
179 /* Constructor for channel_data */
init_channel_elem(grpc_channel_element * elem,grpc_channel_element_args * args)180 static grpc_error* init_channel_elem(grpc_channel_element* elem,
181                                      grpc_channel_element_args* args) {
182   channel_data* cd = static_cast<channel_data*>(elem->channel_data);
183   GPR_ASSERT(args->is_last);
184   cd->transport = nullptr;
185   return GRPC_ERROR_NONE;
186 }
187 
188 /* Destructor for channel_data */
destroy_channel_elem(grpc_channel_element * elem)189 static void destroy_channel_elem(grpc_channel_element* elem) {
190   channel_data* cd = static_cast<channel_data*>(elem->channel_data);
191   if (cd->transport) {
192     grpc_transport_destroy(cd->transport);
193   }
194 }
195 
196 /* No-op. */
con_get_channel_info(grpc_channel_element * elem,const grpc_channel_info * channel_info)197 static void con_get_channel_info(grpc_channel_element* elem,
198                                  const grpc_channel_info* channel_info) {}
199 
200 const grpc_channel_filter grpc_connected_filter = {
201     con_start_transport_stream_op_batch,
202     con_start_transport_op,
203     sizeof(call_data),
204     init_call_elem,
205     set_pollset_or_pollset_set,
206     destroy_call_elem,
207     sizeof(channel_data),
208     init_channel_elem,
209     destroy_channel_elem,
210     con_get_channel_info,
211     "connected",
212 };
213 
bind_transport(grpc_channel_stack * channel_stack,grpc_channel_element * elem,void * t)214 static void bind_transport(grpc_channel_stack* channel_stack,
215                            grpc_channel_element* elem, void* t) {
216   channel_data* cd = static_cast<channel_data*>(elem->channel_data);
217   GPR_ASSERT(elem->filter == &grpc_connected_filter);
218   GPR_ASSERT(cd->transport == nullptr);
219   cd->transport = static_cast<grpc_transport*>(t);
220 
221   /* HACK(ctiller): increase call stack size for the channel to make space
222      for channel data. We need a cleaner (but performant) way to do this,
223      and I'm not sure what that is yet.
224      This is only "safe" because call stacks place no additional data after
225      the last call element, and the last call element MUST be the connected
226      channel. */
227   channel_stack->call_stack_size +=
228       grpc_transport_stream_size(static_cast<grpc_transport*>(t));
229 }
230 
grpc_add_connected_filter(grpc_channel_stack_builder * builder,void * arg_must_be_null)231 bool grpc_add_connected_filter(grpc_channel_stack_builder* builder,
232                                void* arg_must_be_null) {
233   GPR_ASSERT(arg_must_be_null == nullptr);
234   grpc_transport* t = grpc_channel_stack_builder_get_transport(builder);
235   GPR_ASSERT(t != nullptr);
236   return grpc_channel_stack_builder_append_filter(
237       builder, &grpc_connected_filter, bind_transport, t);
238 }
239 
grpc_connected_channel_get_stream(grpc_call_element * elem)240 grpc_stream* grpc_connected_channel_get_stream(grpc_call_element* elem) {
241   call_data* calld = static_cast<call_data*>(elem->call_data);
242   return TRANSPORT_STREAM_FROM_CALL_DATA(calld);
243 }
244