1 /*
2  *
3  * Copyright 2015 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #include <grpc/support/port_platform.h>
20 
21 #include "src/core/lib/transport/transport.h"
22 
23 #include <string.h>
24 
25 #include <grpc/support/alloc.h>
26 #include <grpc/support/atm.h>
27 #include <grpc/support/log.h>
28 #include <grpc/support/sync.h>
29 
30 #include "src/core/lib/gpr/string.h"
31 #include "src/core/lib/iomgr/executor.h"
32 #include "src/core/lib/slice/slice_internal.h"
33 #include "src/core/lib/slice/slice_string_helpers.h"
34 #include "src/core/lib/transport/transport_impl.h"
35 
// Trace flag registered under the name "stream_refcount" and constructed with
// `false`. grpc_stream_ref() and grpc_stream_unref() consult it in debug
// builds before logging each reference-count transition.
grpc_core::DebugOnlyTraceFlag grpc_trace_stream_refcount(false,
                                                         "stream_refcount");
38 
39 #ifndef NDEBUG
grpc_stream_ref(grpc_stream_refcount * refcount,const char * reason)40 void grpc_stream_ref(grpc_stream_refcount* refcount, const char* reason) {
41   if (grpc_trace_stream_refcount.enabled()) {
42     gpr_atm val = gpr_atm_no_barrier_load(&refcount->refs.count);
43     gpr_log(GPR_DEBUG, "%s %p:%p   REF %" PRIdPTR "->%" PRIdPTR " %s",
44             refcount->object_type, refcount, refcount->destroy.cb_arg, val,
45             val + 1, reason);
46   }
47 #else
48 void grpc_stream_ref(grpc_stream_refcount* refcount) {
49 #endif
50   gpr_ref_non_zero(&refcount->refs);
51 }
52 
53 #ifndef NDEBUG
54 void grpc_stream_unref(grpc_stream_refcount* refcount, const char* reason) {
55   if (grpc_trace_stream_refcount.enabled()) {
56     gpr_atm val = gpr_atm_no_barrier_load(&refcount->refs.count);
57     gpr_log(GPR_DEBUG, "%s %p:%p UNREF %" PRIdPTR "->%" PRIdPTR " %s",
58             refcount->object_type, refcount, refcount->destroy.cb_arg, val,
59             val - 1, reason);
60   }
61 #else
62 void grpc_stream_unref(grpc_stream_refcount* refcount) {
63 #endif
64   if (gpr_unref(&refcount->refs)) {
65     if (grpc_core::ExecCtx::Get()->flags() &
66         GRPC_EXEC_CTX_FLAG_THREAD_RESOURCE_LOOP) {
67       /* Ick.
68          The thread we're running on MAY be owned (indirectly) by a call-stack.
69          If that's the case, destroying the call-stack MAY try to destroy the
70          thread, which is a tangled mess that we just don't want to ever have to
71          cope with.
72          Throw this over to the executor (on a core-owned thread) and process it
73          there. */
74       refcount->destroy.scheduler =
75           grpc_executor_scheduler(GRPC_EXECUTOR_SHORT);
76     }
77     GRPC_CLOSURE_SCHED(&refcount->destroy, GRPC_ERROR_NONE);
78   }
79 }
80 
// Recovers the enclosing grpc_stream_refcount from a pointer to its embedded
// `slice_refcount` member by subtracting that member's byte offset.
// The argument is parenthesized so that the byte-pointer cast applies to the
// whole argument expression (e.g. `cond ? a : b` or `ptr + n`), not just to
// its first operand.
#define STREAM_REF_FROM_SLICE_REF(p)         \
  ((grpc_stream_refcount*)(((uint8_t*)(p)) - \
                           offsetof(grpc_stream_refcount, slice_refcount)))
84 
85 static void slice_stream_ref(void* p) {
86 #ifndef NDEBUG
87   grpc_stream_ref(STREAM_REF_FROM_SLICE_REF(p), "slice");
88 #else
89   grpc_stream_ref(STREAM_REF_FROM_SLICE_REF(p));
90 #endif
91 }
92 
93 static void slice_stream_unref(void* p) {
94 #ifndef NDEBUG
95   grpc_stream_unref(STREAM_REF_FROM_SLICE_REF(p), "slice");
96 #else
97   grpc_stream_unref(STREAM_REF_FROM_SLICE_REF(p));
98 #endif
99 }
100 
101 grpc_slice grpc_slice_from_stream_owned_buffer(grpc_stream_refcount* refcount,
102                                                void* buffer, size_t length) {
103   slice_stream_ref(&refcount->slice_refcount);
104   grpc_slice res;
105   res.refcount = &refcount->slice_refcount;
106   res.data.refcounted.bytes = static_cast<uint8_t*>(buffer);
107   res.data.refcounted.length = length;
108   return res;
109 }
110 
// Slice refcount vtable installed by grpc_stream_ref_init(). Its ref/unref
// entries forward to the enclosing grpc_stream_refcount through
// slice_stream_ref()/slice_stream_unref(); equality and hashing use the
// default slice implementations.
static const grpc_slice_refcount_vtable stream_ref_slice_vtable = {
    slice_stream_ref,            /* ref */
    slice_stream_unref,          /* unref */
    grpc_slice_default_eq_impl,  /* eq */
    grpc_slice_default_hash_impl /* hash */
};
117 
118 #ifndef NDEBUG
119 void grpc_stream_ref_init(grpc_stream_refcount* refcount, int initial_refs,
120                           grpc_iomgr_cb_func cb, void* cb_arg,
121                           const char* object_type) {
122   refcount->object_type = object_type;
123 #else
124 void grpc_stream_ref_init(grpc_stream_refcount* refcount, int initial_refs,
125                           grpc_iomgr_cb_func cb, void* cb_arg) {
126 #endif
127   gpr_ref_init(&refcount->refs, initial_refs);
128   GRPC_CLOSURE_INIT(&refcount->destroy, cb, cb_arg, grpc_schedule_on_exec_ctx);
129   refcount->slice_refcount.vtable = &stream_ref_slice_vtable;
130   refcount->slice_refcount.sub_refcount = &refcount->slice_refcount;
131 }
132 
// Adds the counter at `from` onto the counter at `to`, then resets `from` to
// zero. The addition is plain unsigned 64-bit arithmetic.
static void move64(uint64_t* from, uint64_t* to) {
  const uint64_t amount = *from;
  *to += amount;
  *from = 0;
}
137 
138 void grpc_transport_move_one_way_stats(grpc_transport_one_way_stats* from,
139                                        grpc_transport_one_way_stats* to) {
140   move64(&from->framing_bytes, &to->framing_bytes);
141   move64(&from->data_bytes, &to->data_bytes);
142   move64(&from->header_bytes, &to->header_bytes);
143 }
144 
145 void grpc_transport_move_stats(grpc_transport_stream_stats* from,
146                                grpc_transport_stream_stats* to) {
147   grpc_transport_move_one_way_stats(&from->incoming, &to->incoming);
148   grpc_transport_move_one_way_stats(&from->outgoing, &to->outgoing);
149 }
150 
151 size_t grpc_transport_stream_size(grpc_transport* transport) {
152   return transport->vtable->sizeof_stream;
153 }
154 
155 void grpc_transport_destroy(grpc_transport* transport) {
156   transport->vtable->destroy(transport);
157 }
158 
159 int grpc_transport_init_stream(grpc_transport* transport, grpc_stream* stream,
160                                grpc_stream_refcount* refcount,
161                                const void* server_data, gpr_arena* arena) {
162   return transport->vtable->init_stream(transport, stream, refcount,
163                                         server_data, arena);
164 }
165 
166 void grpc_transport_perform_stream_op(grpc_transport* transport,
167                                       grpc_stream* stream,
168                                       grpc_transport_stream_op_batch* op) {
169   transport->vtable->perform_stream_op(transport, stream, op);
170 }
171 
172 void grpc_transport_perform_op(grpc_transport* transport,
173                                grpc_transport_op* op) {
174   transport->vtable->perform_op(transport, op);
175 }
176 
177 void grpc_transport_set_pops(grpc_transport* transport, grpc_stream* stream,
178                              grpc_polling_entity* pollent) {
179   grpc_pollset* pollset;
180   grpc_pollset_set* pollset_set;
181   if ((pollset = grpc_polling_entity_pollset(pollent)) != nullptr) {
182     transport->vtable->set_pollset(transport, stream, pollset);
183   } else if ((pollset_set = grpc_polling_entity_pollset_set(pollent)) !=
184              nullptr) {
185     transport->vtable->set_pollset_set(transport, stream, pollset_set);
186   } else {
187     // No-op for empty pollset. Empty pollset is possible when using
188     // non-fd-based event engines such as CFStream.
189   }
190 }
191 
192 void grpc_transport_destroy_stream(grpc_transport* transport,
193                                    grpc_stream* stream,
194                                    grpc_closure* then_schedule_closure) {
195   transport->vtable->destroy_stream(transport, stream, then_schedule_closure);
196 }
197 
198 grpc_endpoint* grpc_transport_get_endpoint(grpc_transport* transport) {
199   return transport->vtable->get_endpoint(transport);
200 }
201 
202 // This comment should be sung to the tune of
203 // "Supercalifragilisticexpialidocious":
204 //
205 // grpc_transport_stream_op_batch_finish_with_failure
206 // is a function that must always unref cancel_error
207 // though it lives in lib, it handles transport stream ops sure
208 // it's grpc_transport_stream_op_batch_finish_with_failure
209 void grpc_transport_stream_op_batch_finish_with_failure(
210     grpc_transport_stream_op_batch* batch, grpc_error* error,
211     grpc_call_combiner* call_combiner) {
212   if (batch->send_message) {
213     batch->payload->send_message.send_message.reset();
214   }
215   if (batch->cancel_stream) {
216     GRPC_ERROR_UNREF(batch->payload->cancel_stream.cancel_error);
217   }
218   // Construct a list of closures to execute.
219   grpc_core::CallCombinerClosureList closures;
220   if (batch->recv_initial_metadata) {
221     closures.Add(
222         batch->payload->recv_initial_metadata.recv_initial_metadata_ready,
223         GRPC_ERROR_REF(error), "failing recv_initial_metadata_ready");
224   }
225   if (batch->recv_message) {
226     closures.Add(batch->payload->recv_message.recv_message_ready,
227                  GRPC_ERROR_REF(error), "failing recv_message_ready");
228   }
229   if (batch->recv_trailing_metadata) {
230     closures.Add(
231         batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready,
232         GRPC_ERROR_REF(error), "failing recv_trailing_metadata_ready");
233   }
234   if (batch->on_complete != nullptr) {
235     closures.Add(batch->on_complete, GRPC_ERROR_REF(error),
236                  "failing on_complete");
237   }
238   // Execute closures.
239   closures.RunClosures(call_combiner);
240   GRPC_ERROR_UNREF(error);
241 }
242 
243 typedef struct {
244   grpc_closure outer_on_complete;
245   grpc_closure* inner_on_complete;
246   grpc_transport_op op;
247 } made_transport_op;
248 
249 static void destroy_made_transport_op(void* arg, grpc_error* error) {
250   made_transport_op* op = static_cast<made_transport_op*>(arg);
251   GRPC_CLOSURE_SCHED(op->inner_on_complete, GRPC_ERROR_REF(error));
252   gpr_free(op);
253 }
254 
255 grpc_transport_op* grpc_make_transport_op(grpc_closure* on_complete) {
256   made_transport_op* op =
257       static_cast<made_transport_op*>(gpr_malloc(sizeof(*op)));
258   GRPC_CLOSURE_INIT(&op->outer_on_complete, destroy_made_transport_op, op,
259                     grpc_schedule_on_exec_ctx);
260   op->inner_on_complete = on_complete;
261   memset(&op->op, 0, sizeof(op->op));
262   op->op.on_consumed = &op->outer_on_complete;
263   return &op->op;
264 }
265 
266 typedef struct {
267   grpc_closure outer_on_complete;
268   grpc_closure* inner_on_complete;
269   grpc_transport_stream_op_batch op;
270   grpc_transport_stream_op_batch_payload payload;
271 } made_transport_stream_op;
272 
273 static void destroy_made_transport_stream_op(void* arg, grpc_error* error) {
274   made_transport_stream_op* op = static_cast<made_transport_stream_op*>(arg);
275   grpc_closure* c = op->inner_on_complete;
276   gpr_free(op);
277   GRPC_CLOSURE_RUN(c, GRPC_ERROR_REF(error));
278 }
279 
280 grpc_transport_stream_op_batch* grpc_make_transport_stream_op(
281     grpc_closure* on_complete) {
282   made_transport_stream_op* op =
283       static_cast<made_transport_stream_op*>(gpr_zalloc(sizeof(*op)));
284   op->op.payload = &op->payload;
285   GRPC_CLOSURE_INIT(&op->outer_on_complete, destroy_made_transport_stream_op,
286                     op, grpc_schedule_on_exec_ctx);
287   op->inner_on_complete = on_complete;
288   op->op.on_complete = &op->outer_on_complete;
289   return &op->op;
290 }
291