1 /*
2  *
3  * Copyright 2015 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #ifndef GRPC_CORE_LIB_TRANSPORT_TRANSPORT_H
20 #define GRPC_CORE_LIB_TRANSPORT_TRANSPORT_H
21 
22 #include <grpc/support/port_platform.h>
23 
24 #include <stddef.h>
25 
26 #include "src/core/lib/channel/context.h"
27 #include "src/core/lib/gpr/arena.h"
28 #include "src/core/lib/iomgr/call_combiner.h"
29 #include "src/core/lib/iomgr/endpoint.h"
30 #include "src/core/lib/iomgr/polling_entity.h"
31 #include "src/core/lib/iomgr/pollset.h"
32 #include "src/core/lib/iomgr/pollset_set.h"
33 #include "src/core/lib/transport/byte_stream.h"
34 #include "src/core/lib/transport/metadata_batch.h"
35 
36 /* Minimum and maximum protocol accepted versions. */
37 #define GRPC_PROTOCOL_VERSION_MAX_MAJOR 2
38 #define GRPC_PROTOCOL_VERSION_MAX_MINOR 1
39 #define GRPC_PROTOCOL_VERSION_MIN_MAJOR 2
40 #define GRPC_PROTOCOL_VERSION_MIN_MINOR 1
41 
42 /* forward declarations */
43 
44 typedef struct grpc_transport grpc_transport;
45 
46 /* grpc_stream doesn't actually exist. It's used as a typesafe
47    opaque pointer for whatever data the transport wants to track
48    for a stream. */
49 typedef struct grpc_stream grpc_stream;
50 
51 extern grpc_core::DebugOnlyTraceFlag grpc_trace_stream_refcount;
52 
53 typedef struct grpc_stream_refcount {
54   gpr_refcount refs;
55   grpc_closure destroy;
56 #ifndef NDEBUG
57   const char* object_type;
58 #endif
59   grpc_slice_refcount slice_refcount;
60 } grpc_stream_refcount;
61 
62 #ifndef NDEBUG
63 void grpc_stream_ref_init(grpc_stream_refcount* refcount, int initial_refs,
64                           grpc_iomgr_cb_func cb, void* cb_arg,
65                           const char* object_type);
66 void grpc_stream_ref(grpc_stream_refcount* refcount, const char* reason);
67 void grpc_stream_unref(grpc_stream_refcount* refcount, const char* reason);
68 #define GRPC_STREAM_REF_INIT(rc, ir, cb, cb_arg, objtype) \
69   grpc_stream_ref_init(rc, ir, cb, cb_arg, objtype)
70 #else
71 void grpc_stream_ref_init(grpc_stream_refcount* refcount, int initial_refs,
72                           grpc_iomgr_cb_func cb, void* cb_arg);
73 void grpc_stream_ref(grpc_stream_refcount* refcount);
74 void grpc_stream_unref(grpc_stream_refcount* refcount);
75 #define GRPC_STREAM_REF_INIT(rc, ir, cb, cb_arg, objtype) \
76   grpc_stream_ref_init(rc, ir, cb, cb_arg)
77 #endif
78 
79 /* Wrap a buffer that is owned by some stream object into a slice that shares
80    the same refcount */
81 grpc_slice grpc_slice_from_stream_owned_buffer(grpc_stream_refcount* refcount,
82                                                void* buffer, size_t length);
83 
84 typedef struct {
85   uint64_t framing_bytes;
86   uint64_t data_bytes;
87   uint64_t header_bytes;
88 } grpc_transport_one_way_stats;
89 
90 typedef struct grpc_transport_stream_stats {
91   grpc_transport_one_way_stats incoming;
92   grpc_transport_one_way_stats outgoing;
93 } grpc_transport_stream_stats;
94 
95 void grpc_transport_move_one_way_stats(grpc_transport_one_way_stats* from,
96                                        grpc_transport_one_way_stats* to);
97 
98 void grpc_transport_move_stats(grpc_transport_stream_stats* from,
99                                grpc_transport_stream_stats* to);
100 
101 // This struct (which is present in both grpc_transport_stream_op_batch
102 // and grpc_transport_op_batch) is a convenience to allow filters or
103 // transports to schedule a closure related to a particular batch without
104 // having to allocate memory.  The general pattern is to initialize the
105 // closure with the callback arg set to the batch and extra_arg set to
106 // whatever state is associated with the handler (e.g., the call element
107 // or the transport stream object).
108 //
109 // Note that this can only be used by the current handler of a given
110 // batch on the way down the stack (i.e., whichever filter or transport is
111 // currently handling the batch).  Once a filter or transport passes control
112 // of the batch to the next handler, it cannot depend on the contents of
113 // this struct anymore, because the next handler may reuse it.
114 typedef struct {
115   void* extra_arg;
116   grpc_closure closure;
117 } grpc_handler_private_op_data;
118 
119 typedef struct grpc_transport_stream_op_batch_payload
120     grpc_transport_stream_op_batch_payload;
121 
122 /* Transport stream op: a set of operations to perform on a transport
123    against a single stream */
124 typedef struct grpc_transport_stream_op_batch {
125   /** Should be scheduled when all of the non-recv operations in the batch
126       are complete.
127 
128       The recv ops (recv_initial_metadata, recv_message, and
129       recv_trailing_metadata) each have their own callbacks.  If a batch
130       contains both recv ops and non-recv ops, on_complete should be
131       scheduled as soon as the non-recv ops are complete, regardless of
132       whether or not the recv ops are complete.  If a batch contains
133       only recv ops, on_complete can be null. */
134   grpc_closure* on_complete;
135 
136   /** Values for the stream op (fields set are determined by flags above) */
137   grpc_transport_stream_op_batch_payload* payload;
138 
139   /** Send initial metadata to the peer, from the provided metadata batch. */
140   bool send_initial_metadata : 1;
141 
142   /** Send trailing metadata to the peer, from the provided metadata batch. */
143   bool send_trailing_metadata : 1;
144 
145   /** Send message data to the peer, from the provided byte stream. */
146   bool send_message : 1;
147 
148   /** Receive initial metadata from the stream, into provided metadata batch. */
149   bool recv_initial_metadata : 1;
150 
151   /** Receive message data from the stream, into provided byte stream. */
152   bool recv_message : 1;
153 
154   /** Receive trailing metadata from the stream, into provided metadata batch.
155    */
156   bool recv_trailing_metadata : 1;
157 
158   /** Cancel this stream with the provided error */
159   bool cancel_stream : 1;
160 
161   /***************************************************************************
162    * remaining fields are initialized and used at the discretion of the
163    * current handler of the op */
164 
165   grpc_handler_private_op_data handler_private;
166 } grpc_transport_stream_op_batch;
167 
168 struct grpc_transport_stream_op_batch_payload {
169   struct {
170     grpc_metadata_batch* send_initial_metadata;
171     /** Iff send_initial_metadata != NULL, flags associated with
172         send_initial_metadata: a bitfield of GRPC_INITIAL_METADATA_xxx */
173     uint32_t send_initial_metadata_flags;
174     // If non-NULL, will be set by the transport to the peer string (a char*).
175     // The transport retains ownership of the string.
176     // Note: This pointer may be used by the transport after the
177     // send_initial_metadata op is completed.  It must remain valid
178     // until the call is destroyed.
179     gpr_atm* peer_string;
180   } send_initial_metadata;
181 
182   struct {
183     grpc_metadata_batch* send_trailing_metadata;
184   } send_trailing_metadata;
185 
186   struct {
187     // The transport (or a filter that decides to return a failure before
188     // the op gets down to the transport) takes ownership.
189     // The batch's on_complete will not be called until after the byte
190     // stream is orphaned.
191     grpc_core::OrphanablePtr<grpc_core::ByteStream> send_message;
192   } send_message;
193 
194   struct {
195     grpc_metadata_batch* recv_initial_metadata;
196     // Flags are used only on the server side.  If non-null, will be set to
197     // a bitfield of the GRPC_INITIAL_METADATA_xxx macros (e.g., to
198     // indicate if the call is idempotent).
199     uint32_t* recv_flags;
200     /** Should be enqueued when initial metadata is ready to be processed. */
201     grpc_closure* recv_initial_metadata_ready;
202     // If not NULL, will be set to true if trailing metadata is
203     // immediately available.  This may be a signal that we received a
204     // Trailers-Only response.
205     bool* trailing_metadata_available;
206     // If non-NULL, will be set by the transport to the peer string (a char*).
207     // The transport retains ownership of the string.
208     // Note: This pointer may be used by the transport after the
209     // recv_initial_metadata op is completed.  It must remain valid
210     // until the call is destroyed.
211     gpr_atm* peer_string;
212   } recv_initial_metadata;
213 
214   struct {
215     // Will be set by the transport to point to the byte stream
216     // containing a received message.
217     // Will be NULL if trailing metadata is received instead of a message.
218     grpc_core::OrphanablePtr<grpc_core::ByteStream>* recv_message;
219     /** Should be enqueued when one message is ready to be processed. */
220     grpc_closure* recv_message_ready;
221   } recv_message;
222 
223   struct {
224     grpc_metadata_batch* recv_trailing_metadata;
225     grpc_transport_stream_stats* collect_stats;
226     /** Should be enqueued when initial metadata is ready to be processed. */
227     grpc_closure* recv_trailing_metadata_ready;
228   } recv_trailing_metadata;
229 
230   /** Forcefully close this stream.
231       The HTTP2 semantics should be:
232       - server side: if cancel_error has GRPC_ERROR_INT_GRPC_STATUS, and
233         trailing metadata has not been sent, send trailing metadata with status
234         and message from cancel_error (use grpc_error_get_status) followed by
235         a RST_STREAM with error=GRPC_CHTTP2_NO_ERROR to force a full close
236       - at all other times: use grpc_error_get_status to get a status code, and
237         convert to a HTTP2 error code using
238         grpc_chttp2_grpc_status_to_http2_error. Send a RST_STREAM with this
239         error. */
240   struct {
241     // Error contract: the transport that gets this op must cause cancel_error
242     //                 to be unref'ed after processing it
243     grpc_error* cancel_error;
244   } cancel_stream;
245 
246   /* Indexes correspond to grpc_context_index enum values */
247   grpc_call_context_element* context;
248 };
249 
250 /** Transport op: a set of operations to perform on a transport as a whole */
251 typedef struct grpc_transport_op {
252   /** Called when processing of this op is done. */
253   grpc_closure* on_consumed;
254   /** connectivity monitoring - set connectivity_state to NULL to unsubscribe */
255   grpc_closure* on_connectivity_state_change;
256   grpc_connectivity_state* connectivity_state;
257   /** should the transport be disconnected
258    * Error contract: the transport that gets this op must cause
259    *                 disconnect_with_error to be unref'ed after processing it */
260   grpc_error* disconnect_with_error;
261   /** what should the goaway contain?
262    * Error contract: the transport that gets this op must cause
263    *                 goaway_error to be unref'ed after processing it */
264   grpc_error* goaway_error;
265   /** set the callback for accepting new streams;
266       this is a permanent callback, unlike the other one-shot closures.
267       If true, the callback is set to set_accept_stream_fn, with its
268       user_data argument set to set_accept_stream_user_data */
269   bool set_accept_stream;
270   void (*set_accept_stream_fn)(void* user_data, grpc_transport* transport,
271                                const void* server_data);
272   void* set_accept_stream_user_data;
273   /** add this transport to a pollset */
274   grpc_pollset* bind_pollset;
275   /** add this transport to a pollset_set */
276   grpc_pollset_set* bind_pollset_set;
277   /** send a ping, if either on_initiate or on_ack is not NULL */
278   struct {
279     /** Ping may be delayed by the transport, on_initiate callback will be
280         called when the ping is actually being sent. */
281     grpc_closure* on_initiate;
282     /** Called when the ping ack is received */
283     grpc_closure* on_ack;
284   } send_ping;
285   // If true, will reset the channel's connection backoff.
286   bool reset_connect_backoff;
287 
288   /***************************************************************************
289    * remaining fields are initialized and used at the discretion of the
290    * transport implementation */
291 
292   grpc_handler_private_op_data handler_private;
293 } grpc_transport_op;
294 
295 /* Returns the amount of memory required to store a grpc_stream for this
296    transport */
297 size_t grpc_transport_stream_size(grpc_transport* transport);
298 
299 /* Initialize transport data for a stream.
300 
301    Returns 0 on success, any other (transport-defined) value for failure.
302    May assume that stream contains all-zeros.
303 
304    Arguments:
305      transport   - the transport on which to create this stream
306      stream      - a pointer to uninitialized memory to initialize
307      server_data - either NULL for a client initiated stream, or a pointer
308                    supplied from the accept_stream callback function */
309 int grpc_transport_init_stream(grpc_transport* transport, grpc_stream* stream,
310                                grpc_stream_refcount* refcount,
311                                const void* server_data, gpr_arena* arena);
312 
313 void grpc_transport_set_pops(grpc_transport* transport, grpc_stream* stream,
314                              grpc_polling_entity* pollent);
315 
316 /* Destroy transport data for a stream.
317 
318    Requires: a recv_batch with final_state == GRPC_STREAM_CLOSED has been
319    received by the up-layer. Must not be called in the same call stack as
320    recv_frame.
321 
322    Arguments:
323      transport - the transport on which to create this stream
324      stream    - the grpc_stream to destroy (memory is still owned by the
325                  caller, but any child memory must be cleaned up) */
326 void grpc_transport_destroy_stream(grpc_transport* transport,
327                                    grpc_stream* stream,
328                                    grpc_closure* then_schedule_closure);
329 
330 void grpc_transport_stream_op_batch_finish_with_failure(
331     grpc_transport_stream_op_batch* op, grpc_error* error,
332     grpc_call_combiner* call_combiner);
333 
334 char* grpc_transport_stream_op_batch_string(grpc_transport_stream_op_batch* op);
335 char* grpc_transport_op_string(grpc_transport_op* op);
336 
337 /* Send a batch of operations on a transport
338 
339    Takes ownership of any objects contained in ops.
340 
341    Arguments:
342      transport - the transport on which to initiate the stream
343      stream    - the stream on which to send the operations. This must be
344                  non-NULL and previously initialized by the same transport.
345      op        - a grpc_transport_stream_op_batch specifying the op to perform
346    */
347 void grpc_transport_perform_stream_op(grpc_transport* transport,
348                                       grpc_stream* stream,
349                                       grpc_transport_stream_op_batch* op);
350 
351 void grpc_transport_perform_op(grpc_transport* transport,
352                                grpc_transport_op* op);
353 
354 /* Send a ping on a transport
355 
356    Calls cb with user data when a response is received. */
357 void grpc_transport_ping(grpc_transport* transport, grpc_closure* cb);
358 
359 /* Advise peer of pending connection termination. */
360 void grpc_transport_goaway(grpc_transport* transport, grpc_status_code status,
361                            grpc_slice debug_data);
362 
363 /* Destroy the transport */
364 void grpc_transport_destroy(grpc_transport* transport);
365 
366 /* Get the endpoint used by \a transport */
367 grpc_endpoint* grpc_transport_get_endpoint(grpc_transport* transport);
368 
369 /* Allocate a grpc_transport_op, and preconfigure the on_consumed closure to
370    \a on_consumed and then delete the returned transport op */
371 grpc_transport_op* grpc_make_transport_op(grpc_closure* on_consumed);
372 /* Allocate a grpc_transport_stream_op_batch, and preconfigure the on_consumed
373    closure
374    to \a on_consumed and then delete the returned transport op */
375 grpc_transport_stream_op_batch* grpc_make_transport_stream_op(
376     grpc_closure* on_consumed);
377 
378 #endif /* GRPC_CORE_LIB_TRANSPORT_TRANSPORT_H */
379