1 /*
2  *
3  * Copyright 2017 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #include <grpc/support/port_platform.h>
20 
21 #include <grpc/support/alloc.h>
22 #include <grpc/support/string_util.h>
23 #include <grpc/support/sync.h>
24 #include <grpc/support/time.h>
25 #include <string.h>
26 #include "src/core/ext/transport/inproc/inproc_transport.h"
27 #include "src/core/lib/channel/channel_args.h"
28 #include "src/core/lib/gprpp/manual_constructor.h"
29 #include "src/core/lib/slice/slice_internal.h"
30 #include "src/core/lib/surface/api_trace.h"
31 #include "src/core/lib/surface/channel.h"
32 #include "src/core/lib/surface/channel_stack_type.h"
33 #include "src/core/lib/surface/server.h"
34 #include "src/core/lib/transport/connectivity_state.h"
35 #include "src/core/lib/transport/error_utils.h"
36 #include "src/core/lib/transport/transport_impl.h"
37 
// Conditional logging helper: forwards its arguments unchanged to gpr_log(),
// but only when the grpc_inproc_trace flag is enabled. Callers pass a
// severity macro (e.g. GPR_INFO) followed by a printf-style format string and
// its values. The do { } while (0) wrapper makes the expansion behave as a
// single statement.
#define INPROC_LOG(...)                                    \
  do {                                                     \
    if (grpc_inproc_trace.enabled()) gpr_log(__VA_ARGS__); \
  } while (0)
42 
// File-scope slices used by the inproc transport.
// NOTE(review): none of these are assigned in this part of the file;
// presumably a transport-wide init routine fills them in — confirm.
static grpc_slice g_empty_slice;
// Key/value used by fail_helper_locked() to build the placeholder "path"
// entry of the initial metadata handed to a failing server-side stream.
static grpc_slice g_fake_path_key;
static grpc_slice g_fake_path_value;
// Key/value used by fail_helper_locked() to build the placeholder "authority"
// entry of that same placeholder initial metadata.
static grpc_slice g_fake_auth_key;
static grpc_slice g_fake_auth_value;
48 
// A mutex bundled with the reference count that controls its lifetime.
// Each inproc_transport points at one of these; really_destroy_transport()
// drops a reference and frees the object when the count reaches zero.
// NOTE(review): presumably one instance is shared by the client and server
// transports of a pair — confirm against the transport creation code.
typedef struct {
  gpr_mu mu;          // lock taken around stream-list and stream-state changes
  gpr_refcount refs;  // number of transports still pointing at this object
} shared_mu;
53 
// One side (client or server) of an in-process transport pair.
typedef struct inproc_transport {
  // Generic transport header. init_stream() reinterpret_casts a
  // grpc_transport* to inproc_transport*, so this must stay the first member.
  grpc_transport base;
  // Ref-counted mutex; locked while the stream list or stream state is
  // modified.
  shared_mu* mu;
  // Transport reference count; unref_transport() destroys the transport when
  // it reaches zero.
  gpr_refcount refs;
  // True for the client side; selects client- vs. server-specific behavior
  // (see fail_helper_locked() and op_state_machine()).
  bool is_client;
  // Connectivity tracker; torn down in really_destroy_transport().
  grpc_connectivity_state_tracker connectivity;
  // Callback (and its user data) that the client-side init_stream() invokes on
  // the *other* side's transport to get the matching server stream created.
  void (*accept_stream_cb)(void* user_data, grpc_transport* transport,
                           const void* server_data);
  void* accept_stream_data;
  // NOTE(review): not touched in this part of the file; presumably set when
  // the transport is closed — confirm.
  bool is_closed;
  // The peer transport of this pair.
  struct inproc_transport* other_side;
  // Head of the doubly-linked list of streams registered by init_stream().
  struct inproc_stream* stream_list;
} inproc_transport;
67 
// Per-call state for one side of an in-process stream. A client stream and
// its server stream point at each other through other_side and exchange
// metadata by writing directly into each other's to_read_* buffers.
typedef struct inproc_stream {
  // Transport this stream belongs to; really_destroy_stream() releases a
  // transport reference through this pointer.
  inproc_transport* t;
  // Initial metadata (plus flags) delivered for this stream and not yet
  // consumed by a recv_initial_metadata op; *_filled says whether it is valid.
  grpc_metadata_batch to_read_initial_md;
  uint32_t to_read_initial_md_flags;
  bool to_read_initial_md_filled;
  // Trailing metadata delivered for this stream and not yet consumed.
  grpc_metadata_batch to_read_trailing_md;
  bool to_read_trailing_md_filled;
  // ops_needed: consulted by maybe_schedule_op_closure_locked(), which only
  // schedules op_closure when this is set.
  // op_closure_scheduled: op_closure is already queued, do not queue it again.
  bool ops_needed;
  bool op_closure_scheduled;
  // Closure that runs op_state_machine() for this stream.
  grpc_closure op_closure;
  // Write buffer used only during gap at init time when client-side
  // stream is set up but server side stream is not yet set up
  grpc_metadata_batch write_buffer_initial_md;
  bool write_buffer_initial_md_filled;
  uint32_t write_buffer_initial_md_flags;
  grpc_millis write_buffer_deadline;
  grpc_metadata_batch write_buffer_trailing_md;
  bool write_buffer_trailing_md_filled;
  // Cancellation recorded while there is no other side yet; the server-side
  // init_stream() moves it into its own cancel_other_error.
  grpc_error* write_buffer_cancel_error;

  // Peer stream; nullptr until the server stream has been created and again
  // after close_other_side_locked().
  struct inproc_stream* other_side;
  bool other_side_closed;               // won't talk anymore
  bool write_buffer_other_side_closed;  // on hold
  // Stream reference count supplied by the caller of init_stream().
  grpc_stream_refcount* refs;
  // Optional closure scheduled by really_destroy_stream() once teardown is
  // done.
  grpc_closure* closure_at_destroy;

  // Arena used to allocate the linked metadata elements copied by
  // fill_in_metadata().
  gpr_arena* arena;

  // Pending batches, one slot per op type. A slot is reset to nullptr once
  // that op has been completed or failed.
  grpc_transport_stream_op_batch* send_message_op;
  grpc_transport_stream_op_batch* send_trailing_md_op;
  grpc_transport_stream_op_batch* recv_initial_md_op;
  grpc_transport_stream_op_batch* recv_message_op;
  grpc_transport_stream_op_batch* recv_trailing_md_op;

  // Storage for the most recently transferred incoming message and the byte
  // stream wrapping it; recv_inited is true once recv_message has been
  // initialized and therefore needs destroying.
  grpc_slice_buffer recv_message;
  grpc_core::ManualConstructor<grpc_core::SliceBufferByteStream> recv_stream;
  bool recv_inited;

  // Progress flags for the metadata exchange in each direction.
  bool initial_md_sent;
  bool trailing_md_sent;
  bool initial_md_recvd;
  bool trailing_md_recvd;

  // Set by close_stream_locked(); makes a second close a no-op.
  bool closed;

  // Cancellation state; op_state_machine() fails the stream when either is
  // set, checking cancel_self_error first.
  grpc_error* cancel_self_error;
  grpc_error* cancel_other_error;

  // Call deadline; lowered to the client's buffered deadline in init_stream().
  grpc_millis deadline;

  // Membership in the owning transport's stream_list (doubly linked).
  bool listed;
  struct inproc_stream* stream_list_prev;
  struct inproc_stream* stream_list_next;
} inproc_stream;
122 
// Forward declarations: both functions are referenced before their
// definitions appear (cancel_stream_locked by message_transfer_locked,
// op_state_machine by init_stream when it sets up the stream's op_closure).
static bool cancel_stream_locked(inproc_stream* s, grpc_error* error);
static void op_state_machine(void* arg, grpc_error* error);
125 
ref_transport(inproc_transport * t)126 static void ref_transport(inproc_transport* t) {
127   INPROC_LOG(GPR_INFO, "ref_transport %p", t);
128   gpr_ref(&t->refs);
129 }
130 
really_destroy_transport(inproc_transport * t)131 static void really_destroy_transport(inproc_transport* t) {
132   INPROC_LOG(GPR_INFO, "really_destroy_transport %p", t);
133   grpc_connectivity_state_destroy(&t->connectivity);
134   if (gpr_unref(&t->mu->refs)) {
135     gpr_free(t->mu);
136   }
137   gpr_free(t);
138 }
139 
unref_transport(inproc_transport * t)140 static void unref_transport(inproc_transport* t) {
141   INPROC_LOG(GPR_INFO, "unref_transport %p", t);
142   if (gpr_unref(&t->refs)) {
143     really_destroy_transport(t);
144   }
145 }
146 
// Thin wrappers around grpc_stream_ref()/grpc_stream_unref(). In builds
// without NDEBUG the human-readable `reason` string is forwarded as a second
// argument; with NDEBUG defined the reason is dropped and the one-argument
// form is called instead.
#ifndef NDEBUG
#define STREAM_REF(refs, reason) grpc_stream_ref(refs, reason)
#define STREAM_UNREF(refs, reason) grpc_stream_unref(refs, reason)
#else
#define STREAM_REF(refs, reason) grpc_stream_ref(refs)
#define STREAM_UNREF(refs, reason) grpc_stream_unref(refs)
#endif
154 
ref_stream(inproc_stream * s,const char * reason)155 static void ref_stream(inproc_stream* s, const char* reason) {
156   INPROC_LOG(GPR_INFO, "ref_stream %p %s", s, reason);
157   STREAM_REF(s->refs, reason);
158 }
159 
unref_stream(inproc_stream * s,const char * reason)160 static void unref_stream(inproc_stream* s, const char* reason) {
161   INPROC_LOG(GPR_INFO, "unref_stream %p %s", s, reason);
162   STREAM_UNREF(s->refs, reason);
163 }
164 
really_destroy_stream(inproc_stream * s)165 static void really_destroy_stream(inproc_stream* s) {
166   INPROC_LOG(GPR_INFO, "really_destroy_stream %p", s);
167 
168   GRPC_ERROR_UNREF(s->write_buffer_cancel_error);
169   GRPC_ERROR_UNREF(s->cancel_self_error);
170   GRPC_ERROR_UNREF(s->cancel_other_error);
171 
172   if (s->recv_inited) {
173     grpc_slice_buffer_destroy_internal(&s->recv_message);
174   }
175 
176   unref_transport(s->t);
177 
178   if (s->closure_at_destroy) {
179     GRPC_CLOSURE_SCHED(s->closure_at_destroy, GRPC_ERROR_NONE);
180   }
181 }
182 
log_metadata(const grpc_metadata_batch * md_batch,bool is_client,bool is_initial)183 static void log_metadata(const grpc_metadata_batch* md_batch, bool is_client,
184                          bool is_initial) {
185   for (grpc_linked_mdelem* md = md_batch->list.head; md != nullptr;
186        md = md->next) {
187     char* key = grpc_slice_to_c_string(GRPC_MDKEY(md->md));
188     char* value = grpc_slice_to_c_string(GRPC_MDVALUE(md->md));
189     gpr_log(GPR_INFO, "INPROC:%s:%s: %s: %s", is_initial ? "HDR" : "TRL",
190             is_client ? "CLI" : "SVR", key, value);
191     gpr_free(key);
192     gpr_free(value);
193   }
194 }
195 
fill_in_metadata(inproc_stream * s,const grpc_metadata_batch * metadata,uint32_t flags,grpc_metadata_batch * out_md,uint32_t * outflags,bool * markfilled)196 static grpc_error* fill_in_metadata(inproc_stream* s,
197                                     const grpc_metadata_batch* metadata,
198                                     uint32_t flags, grpc_metadata_batch* out_md,
199                                     uint32_t* outflags, bool* markfilled) {
200   if (grpc_inproc_trace.enabled()) {
201     log_metadata(metadata, s->t->is_client, outflags != nullptr);
202   }
203 
204   if (outflags != nullptr) {
205     *outflags = flags;
206   }
207   if (markfilled != nullptr) {
208     *markfilled = true;
209   }
210   grpc_error* error = GRPC_ERROR_NONE;
211   for (grpc_linked_mdelem* elem = metadata->list.head;
212        (elem != nullptr) && (error == GRPC_ERROR_NONE); elem = elem->next) {
213     grpc_linked_mdelem* nelem = static_cast<grpc_linked_mdelem*>(
214         gpr_arena_alloc(s->arena, sizeof(*nelem)));
215     nelem->md =
216         grpc_mdelem_from_slices(grpc_slice_intern(GRPC_MDKEY(elem->md)),
217                                 grpc_slice_intern(GRPC_MDVALUE(elem->md)));
218 
219     error = grpc_metadata_batch_link_tail(out_md, nelem);
220   }
221   return error;
222 }
223 
init_stream(grpc_transport * gt,grpc_stream * gs,grpc_stream_refcount * refcount,const void * server_data,gpr_arena * arena)224 static int init_stream(grpc_transport* gt, grpc_stream* gs,
225                        grpc_stream_refcount* refcount, const void* server_data,
226                        gpr_arena* arena) {
227   INPROC_LOG(GPR_INFO, "init_stream %p %p %p", gt, gs, server_data);
228   inproc_transport* t = reinterpret_cast<inproc_transport*>(gt);
229   inproc_stream* s = reinterpret_cast<inproc_stream*>(gs);
230   s->arena = arena;
231 
232   s->refs = refcount;
233   // Ref this stream right now
234   ref_stream(s, "inproc_init_stream:init");
235 
236   grpc_metadata_batch_init(&s->to_read_initial_md);
237   s->to_read_initial_md_flags = 0;
238   s->to_read_initial_md_filled = false;
239   grpc_metadata_batch_init(&s->to_read_trailing_md);
240   s->to_read_trailing_md_filled = false;
241   grpc_metadata_batch_init(&s->write_buffer_initial_md);
242   s->write_buffer_initial_md_flags = 0;
243   s->write_buffer_initial_md_filled = false;
244   grpc_metadata_batch_init(&s->write_buffer_trailing_md);
245   s->write_buffer_trailing_md_filled = false;
246   s->ops_needed = false;
247   s->op_closure_scheduled = false;
248   GRPC_CLOSURE_INIT(&s->op_closure, op_state_machine, s,
249                     grpc_schedule_on_exec_ctx);
250   s->t = t;
251   s->closure_at_destroy = nullptr;
252   s->other_side_closed = false;
253 
254   s->initial_md_sent = s->trailing_md_sent = s->initial_md_recvd =
255       s->trailing_md_recvd = false;
256 
257   s->closed = false;
258 
259   s->cancel_self_error = GRPC_ERROR_NONE;
260   s->cancel_other_error = GRPC_ERROR_NONE;
261   s->write_buffer_cancel_error = GRPC_ERROR_NONE;
262   s->deadline = GRPC_MILLIS_INF_FUTURE;
263   s->write_buffer_deadline = GRPC_MILLIS_INF_FUTURE;
264 
265   s->stream_list_prev = nullptr;
266   gpr_mu_lock(&t->mu->mu);
267   s->listed = true;
268   ref_stream(s, "inproc_init_stream:list");
269   s->stream_list_next = t->stream_list;
270   if (t->stream_list) {
271     t->stream_list->stream_list_prev = s;
272   }
273   t->stream_list = s;
274   gpr_mu_unlock(&t->mu->mu);
275 
276   if (!server_data) {
277     ref_transport(t);
278     inproc_transport* st = t->other_side;
279     ref_transport(st);
280     s->other_side = nullptr;  // will get filled in soon
281     // Pass the client-side stream address to the server-side for a ref
282     ref_stream(s, "inproc_init_stream:clt");  // ref it now on behalf of server
283                                               // side to avoid destruction
284     INPROC_LOG(GPR_INFO, "calling accept stream cb %p %p", st->accept_stream_cb,
285                st->accept_stream_data);
286     (*st->accept_stream_cb)(st->accept_stream_data, &st->base, (void*)s);
287   } else {
288     // This is the server-side and is being called through accept_stream_cb
289     inproc_stream* cs = (inproc_stream*)server_data;
290     s->other_side = cs;
291     // Ref the server-side stream on behalf of the client now
292     ref_stream(s, "inproc_init_stream:srv");
293 
294     // Now we are about to affect the other side, so lock the transport
295     // to make sure that it doesn't get destroyed
296     gpr_mu_lock(&s->t->mu->mu);
297     cs->other_side = s;
298     // Now transfer from the other side's write_buffer if any to the to_read
299     // buffer
300     if (cs->write_buffer_initial_md_filled) {
301       fill_in_metadata(s, &cs->write_buffer_initial_md,
302                        cs->write_buffer_initial_md_flags,
303                        &s->to_read_initial_md, &s->to_read_initial_md_flags,
304                        &s->to_read_initial_md_filled);
305       s->deadline = GPR_MIN(s->deadline, cs->write_buffer_deadline);
306       grpc_metadata_batch_clear(&cs->write_buffer_initial_md);
307       cs->write_buffer_initial_md_filled = false;
308     }
309     if (cs->write_buffer_trailing_md_filled) {
310       fill_in_metadata(s, &cs->write_buffer_trailing_md, 0,
311                        &s->to_read_trailing_md, nullptr,
312                        &s->to_read_trailing_md_filled);
313       grpc_metadata_batch_clear(&cs->write_buffer_trailing_md);
314       cs->write_buffer_trailing_md_filled = false;
315     }
316     if (cs->write_buffer_cancel_error != GRPC_ERROR_NONE) {
317       s->cancel_other_error = cs->write_buffer_cancel_error;
318       cs->write_buffer_cancel_error = GRPC_ERROR_NONE;
319     }
320 
321     gpr_mu_unlock(&s->t->mu->mu);
322   }
323   return 0;  // return value is not important
324 }
325 
close_stream_locked(inproc_stream * s)326 static void close_stream_locked(inproc_stream* s) {
327   if (!s->closed) {
328     // Release the metadata that we would have written out
329     grpc_metadata_batch_destroy(&s->write_buffer_initial_md);
330     grpc_metadata_batch_destroy(&s->write_buffer_trailing_md);
331 
332     if (s->listed) {
333       inproc_stream* p = s->stream_list_prev;
334       inproc_stream* n = s->stream_list_next;
335       if (p != nullptr) {
336         p->stream_list_next = n;
337       } else {
338         s->t->stream_list = n;
339       }
340       if (n != nullptr) {
341         n->stream_list_prev = p;
342       }
343       s->listed = false;
344       unref_stream(s, "close_stream:list");
345     }
346     s->closed = true;
347     unref_stream(s, "close_stream:closing");
348   }
349 }
350 
351 // This function means that we are done talking/listening to the other side
close_other_side_locked(inproc_stream * s,const char * reason)352 static void close_other_side_locked(inproc_stream* s, const char* reason) {
353   if (s->other_side != nullptr) {
354     // First release the metadata that came from the other side's arena
355     grpc_metadata_batch_destroy(&s->to_read_initial_md);
356     grpc_metadata_batch_destroy(&s->to_read_trailing_md);
357 
358     unref_stream(s->other_side, reason);
359     s->other_side_closed = true;
360     s->other_side = nullptr;
361   } else if (!s->other_side_closed) {
362     s->write_buffer_other_side_closed = true;
363   }
364 }
365 
366 // Call the on_complete closure associated with this stream_op_batch if
367 // this stream_op_batch is only one of the pending operations for this
368 // stream. This is called when one of the pending operations for the stream
369 // is done and about to be NULLed out
complete_if_batch_end_locked(inproc_stream * s,grpc_error * error,grpc_transport_stream_op_batch * op,const char * msg)370 static void complete_if_batch_end_locked(inproc_stream* s, grpc_error* error,
371                                          grpc_transport_stream_op_batch* op,
372                                          const char* msg) {
373   int is_sm = static_cast<int>(op == s->send_message_op);
374   int is_stm = static_cast<int>(op == s->send_trailing_md_op);
375   // TODO(vjpai): We should not consider the recv ops here, since they
376   // have their own callbacks.  We should invoke a batch's on_complete
377   // as soon as all of the batch's send ops are complete, even if there
378   // are still recv ops pending.
379   int is_rim = static_cast<int>(op == s->recv_initial_md_op);
380   int is_rm = static_cast<int>(op == s->recv_message_op);
381   int is_rtm = static_cast<int>(op == s->recv_trailing_md_op);
382 
383   if ((is_sm + is_stm + is_rim + is_rm + is_rtm) == 1) {
384     INPROC_LOG(GPR_INFO, "%s %p %p %p", msg, s, op, error);
385     GRPC_CLOSURE_SCHED(op->on_complete, GRPC_ERROR_REF(error));
386   }
387 }
388 
maybe_schedule_op_closure_locked(inproc_stream * s,grpc_error * error)389 static void maybe_schedule_op_closure_locked(inproc_stream* s,
390                                              grpc_error* error) {
391   if (s && s->ops_needed && !s->op_closure_scheduled) {
392     GRPC_CLOSURE_SCHED(&s->op_closure, GRPC_ERROR_REF(error));
393     s->op_closure_scheduled = true;
394     s->ops_needed = false;
395   }
396 }
397 
// Fails stream `s` with `error` (caller holds the transport lock).
//
// Steps, in order:
//   1. If trailing metadata has not been sent yet, hand an empty trailing
//      metadata batch to the other side (or to the write buffer when there is
//      no other side yet) and record the error there as well.
//   2. Complete every pending op on this stream (recv initial md, recv
//      message, send message, send trailing md, recv trailing md), clearing
//      its slot.
//   3. Close the link to the other side and close the stream.
//
// Consumes one reference on `error`: it is unreffed at the end, and every
// closure scheduled here receives its own reference.
static void fail_helper_locked(inproc_stream* s, grpc_error* error) {
  INPROC_LOG(GPR_INFO, "op_state_machine %p fail_helper", s);
  // If we're failing this side, we need to make sure that
  // we also send or have already sent trailing metadata
  if (!s->trailing_md_sent) {
    // Send trailing md to the other side indicating cancellation
    s->trailing_md_sent = true;

    grpc_metadata_batch fake_md;
    grpc_metadata_batch_init(&fake_md);

    // Deliver straight into the peer's to-read buffer when a peer exists;
    // otherwise park it in our own write buffer.
    inproc_stream* other = s->other_side;
    grpc_metadata_batch* dest = (other == nullptr)
                                    ? &s->write_buffer_trailing_md
                                    : &other->to_read_trailing_md;
    bool* destfilled = (other == nullptr) ? &s->write_buffer_trailing_md_filled
                                          : &other->to_read_trailing_md_filled;
    // fake_md is empty, so this only marks the destination as filled.
    fill_in_metadata(s, &fake_md, 0, dest, nullptr, destfilled);
    grpc_metadata_batch_destroy(&fake_md);

    if (other != nullptr) {
      // Keep the first cancellation error the peer was given; do not
      // overwrite it.
      if (other->cancel_other_error == GRPC_ERROR_NONE) {
        other->cancel_other_error = GRPC_ERROR_REF(error);
      }
      maybe_schedule_op_closure_locked(other, error);
    } else if (s->write_buffer_cancel_error == GRPC_ERROR_NONE) {
      s->write_buffer_cancel_error = GRPC_ERROR_REF(error);
    }
  }
  if (s->recv_initial_md_op) {
    grpc_error* err;
    if (!s->t->is_client) {
      // If this is a server, provide initial metadata with a path and authority
      // since it expects that as well as no error yet
      grpc_metadata_batch fake_md;
      grpc_metadata_batch_init(&fake_md);
      grpc_linked_mdelem* path_md = static_cast<grpc_linked_mdelem*>(
          gpr_arena_alloc(s->arena, sizeof(*path_md)));
      path_md->md = grpc_mdelem_from_slices(g_fake_path_key, g_fake_path_value);
      GPR_ASSERT(grpc_metadata_batch_link_tail(&fake_md, path_md) ==
                 GRPC_ERROR_NONE);
      grpc_linked_mdelem* auth_md = static_cast<grpc_linked_mdelem*>(
          gpr_arena_alloc(s->arena, sizeof(*auth_md)));
      auth_md->md = grpc_mdelem_from_slices(g_fake_auth_key, g_fake_auth_value);
      GPR_ASSERT(grpc_metadata_batch_link_tail(&fake_md, auth_md) ==
                 GRPC_ERROR_NONE);

      // Copy the placeholder entries into the batch the recv op asked for.
      fill_in_metadata(
          s, &fake_md, 0,
          s->recv_initial_md_op->payload->recv_initial_metadata
              .recv_initial_metadata,
          s->recv_initial_md_op->payload->recv_initial_metadata.recv_flags,
          nullptr);
      grpc_metadata_batch_destroy(&fake_md);
      err = GRPC_ERROR_NONE;
    } else {
      // Clients see the failure directly on the initial-metadata callback.
      err = GRPC_ERROR_REF(error);
    }
    if (s->recv_initial_md_op->payload->recv_initial_metadata
            .trailing_metadata_available != nullptr) {
      // Set to true unconditionally, because we're failing the call, so even
      // if we haven't actually seen the send_trailing_metadata op from the
      // other side, we're going to return trailing metadata anyway.
      *s->recv_initial_md_op->payload->recv_initial_metadata
           .trailing_metadata_available = true;
    }
    INPROC_LOG(GPR_INFO,
               "fail_helper %p scheduling initial-metadata-ready %p %p", s,
               error, err);
    GRPC_CLOSURE_SCHED(s->recv_initial_md_op->payload->recv_initial_metadata
                           .recv_initial_metadata_ready,
                       err);
    // Last use of err so no need to REF and then UNREF it

    complete_if_batch_end_locked(
        s, error, s->recv_initial_md_op,
        "fail_helper scheduling recv-initial-metadata-on-complete");
    s->recv_initial_md_op = nullptr;
  }
  if (s->recv_message_op) {
    // Fail the pending receive with the stream error.
    INPROC_LOG(GPR_INFO, "fail_helper %p scheduling message-ready %p", s,
               error);
    GRPC_CLOSURE_SCHED(
        s->recv_message_op->payload->recv_message.recv_message_ready,
        GRPC_ERROR_REF(error));
    complete_if_batch_end_locked(
        s, error, s->recv_message_op,
        "fail_helper scheduling recv-message-on-complete");
    s->recv_message_op = nullptr;
  }
  if (s->send_message_op) {
    // Drop the outgoing message that will never be delivered.
    s->send_message_op->payload->send_message.send_message.reset();
    complete_if_batch_end_locked(
        s, error, s->send_message_op,
        "fail_helper scheduling send-message-on-complete");
    s->send_message_op = nullptr;
  }
  if (s->send_trailing_md_op) {
    complete_if_batch_end_locked(
        s, error, s->send_trailing_md_op,
        "fail_helper scheduling send-trailng-md-on-complete");
    s->send_trailing_md_op = nullptr;
  }
  if (s->recv_trailing_md_op) {
    INPROC_LOG(GPR_INFO, "fail_helper %p scheduling trailing-metadata-ready %p",
               s, error);
    GRPC_CLOSURE_SCHED(s->recv_trailing_md_op->payload->recv_trailing_metadata
                           .recv_trailing_metadata_ready,
                       GRPC_ERROR_REF(error));
    INPROC_LOG(GPR_INFO, "fail_helper %p scheduling trailing-md-on-complete %p",
               s, error);
    complete_if_batch_end_locked(
        s, error, s->recv_trailing_md_op,
        "fail_helper scheduling recv-trailing-metadata-on-complete");
    s->recv_trailing_md_op = nullptr;
  }
  close_other_side_locked(s, "fail_helper:other_side");
  close_stream_locked(s);

  // Release the reference the caller handed over.
  GRPC_ERROR_UNREF(error);
}
519 
520 // TODO(vjpai): It should not be necessary to drain the incoming byte
521 // stream and create a new one; instead, we should simply pass the byte
522 // stream from the sender directly to the receiver as-is.
523 //
524 // Note that fixing this will also avoid the assumption in this code
525 // that the incoming byte stream's next() call will always return
526 // synchronously.  That assumption is true today but may not always be
527 // true in the future.
message_transfer_locked(inproc_stream * sender,inproc_stream * receiver)528 static void message_transfer_locked(inproc_stream* sender,
529                                     inproc_stream* receiver) {
530   size_t remaining =
531       sender->send_message_op->payload->send_message.send_message->length();
532   if (receiver->recv_inited) {
533     grpc_slice_buffer_destroy_internal(&receiver->recv_message);
534   }
535   grpc_slice_buffer_init(&receiver->recv_message);
536   receiver->recv_inited = true;
537   do {
538     grpc_slice message_slice;
539     grpc_closure unused;
540     GPR_ASSERT(
541         sender->send_message_op->payload->send_message.send_message->Next(
542             SIZE_MAX, &unused));
543     grpc_error* error =
544         sender->send_message_op->payload->send_message.send_message->Pull(
545             &message_slice);
546     if (error != GRPC_ERROR_NONE) {
547       cancel_stream_locked(sender, GRPC_ERROR_REF(error));
548       break;
549     }
550     GPR_ASSERT(error == GRPC_ERROR_NONE);
551     remaining -= GRPC_SLICE_LENGTH(message_slice);
552     grpc_slice_buffer_add(&receiver->recv_message, message_slice);
553   } while (remaining > 0);
554   sender->send_message_op->payload->send_message.send_message.reset();
555 
556   receiver->recv_stream.Init(&receiver->recv_message, 0);
557   receiver->recv_message_op->payload->recv_message.recv_message->reset(
558       receiver->recv_stream.get());
559   INPROC_LOG(GPR_INFO, "message_transfer_locked %p scheduling message-ready",
560              receiver);
561   GRPC_CLOSURE_SCHED(
562       receiver->recv_message_op->payload->recv_message.recv_message_ready,
563       GRPC_ERROR_NONE);
564   complete_if_batch_end_locked(
565       sender, GRPC_ERROR_NONE, sender->send_message_op,
566       "message_transfer scheduling sender on_complete");
567   complete_if_batch_end_locked(
568       receiver, GRPC_ERROR_NONE, receiver->recv_message_op,
569       "message_transfer scheduling receiver on_complete");
570 
571   receiver->recv_message_op = nullptr;
572   sender->send_message_op = nullptr;
573 }
574 
op_state_machine(void * arg,grpc_error * error)575 static void op_state_machine(void* arg, grpc_error* error) {
576   // This function gets called when we have contents in the unprocessed reads
577   // Get what we want based on our ops wanted
578   // Schedule our appropriate closures
579   // and then return to ops_needed state if still needed
580 
581   // Since this is a closure directly invoked by the combiner, it should not
582   // unref the error parameter explicitly; the combiner will do that implicitly
583   grpc_error* new_err = GRPC_ERROR_NONE;
584 
585   bool needs_close = false;
586 
587   INPROC_LOG(GPR_INFO, "op_state_machine %p", arg);
588   inproc_stream* s = static_cast<inproc_stream*>(arg);
589   gpr_mu* mu = &s->t->mu->mu;  // keep aside in case s gets closed
590   gpr_mu_lock(mu);
591   s->op_closure_scheduled = false;
592   // cancellation takes precedence
593   inproc_stream* other = s->other_side;
594 
595   if (s->cancel_self_error != GRPC_ERROR_NONE) {
596     fail_helper_locked(s, GRPC_ERROR_REF(s->cancel_self_error));
597     goto done;
598   } else if (s->cancel_other_error != GRPC_ERROR_NONE) {
599     fail_helper_locked(s, GRPC_ERROR_REF(s->cancel_other_error));
600     goto done;
601   } else if (error != GRPC_ERROR_NONE) {
602     fail_helper_locked(s, GRPC_ERROR_REF(error));
603     goto done;
604   }
605 
606   if (s->send_message_op && other) {
607     if (other->recv_message_op) {
608       message_transfer_locked(s, other);
609       maybe_schedule_op_closure_locked(other, GRPC_ERROR_NONE);
610     } else if (!s->t->is_client &&
611                (s->trailing_md_sent || other->recv_trailing_md_op)) {
612       // A server send will never be matched if the client is waiting
613       // for trailing metadata already
614       s->send_message_op->payload->send_message.send_message.reset();
615       complete_if_batch_end_locked(
616           s, GRPC_ERROR_NONE, s->send_message_op,
617           "op_state_machine scheduling send-message-on-complete");
618       s->send_message_op = nullptr;
619     }
620   }
621   // Pause a send trailing metadata if there is still an outstanding
622   // send message unless we know that the send message will never get
623   // matched to a receive. This happens on the client if the server has
624   // already sent status.
625   if (s->send_trailing_md_op &&
626       (!s->send_message_op ||
627        (s->t->is_client &&
628         (s->trailing_md_recvd || s->to_read_trailing_md_filled)))) {
629     grpc_metadata_batch* dest = (other == nullptr)
630                                     ? &s->write_buffer_trailing_md
631                                     : &other->to_read_trailing_md;
632     bool* destfilled = (other == nullptr) ? &s->write_buffer_trailing_md_filled
633                                           : &other->to_read_trailing_md_filled;
634     if (*destfilled || s->trailing_md_sent) {
635       // The buffer is already in use; that's an error!
636       INPROC_LOG(GPR_INFO, "Extra trailing metadata %p", s);
637       new_err = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Extra trailing metadata");
638       fail_helper_locked(s, GRPC_ERROR_REF(new_err));
639       goto done;
640     } else {
641       if (!other || !other->closed) {
642         fill_in_metadata(s,
643                          s->send_trailing_md_op->payload->send_trailing_metadata
644                              .send_trailing_metadata,
645                          0, dest, nullptr, destfilled);
646       }
647       s->trailing_md_sent = true;
648       if (!s->t->is_client && s->trailing_md_recvd && s->recv_trailing_md_op) {
649         INPROC_LOG(GPR_INFO,
650                    "op_state_machine %p scheduling trailing-metadata-ready", s);
651         GRPC_CLOSURE_SCHED(
652             s->recv_trailing_md_op->payload->recv_trailing_metadata
653                 .recv_trailing_metadata_ready,
654             GRPC_ERROR_NONE);
655         INPROC_LOG(GPR_INFO,
656                    "op_state_machine %p scheduling trailing-md-on-complete", s);
657         GRPC_CLOSURE_SCHED(s->recv_trailing_md_op->on_complete,
658                            GRPC_ERROR_NONE);
659         s->recv_trailing_md_op = nullptr;
660         needs_close = true;
661       }
662     }
663     maybe_schedule_op_closure_locked(other, GRPC_ERROR_NONE);
664     complete_if_batch_end_locked(
665         s, GRPC_ERROR_NONE, s->send_trailing_md_op,
666         "op_state_machine scheduling send-trailing-metadata-on-complete");
667     s->send_trailing_md_op = nullptr;
668   }
669   if (s->recv_initial_md_op) {
670     if (s->initial_md_recvd) {
671       new_err =
672           GRPC_ERROR_CREATE_FROM_STATIC_STRING("Already recvd initial md");
673       INPROC_LOG(
674           GPR_INFO,
675           "op_state_machine %p scheduling on_complete errors for already "
676           "recvd initial md %p",
677           s, new_err);
678       fail_helper_locked(s, GRPC_ERROR_REF(new_err));
679       goto done;
680     }
681 
682     if (s->to_read_initial_md_filled) {
683       s->initial_md_recvd = true;
684       new_err = fill_in_metadata(
685           s, &s->to_read_initial_md, s->to_read_initial_md_flags,
686           s->recv_initial_md_op->payload->recv_initial_metadata
687               .recv_initial_metadata,
688           s->recv_initial_md_op->payload->recv_initial_metadata.recv_flags,
689           nullptr);
690       s->recv_initial_md_op->payload->recv_initial_metadata
691           .recv_initial_metadata->deadline = s->deadline;
692       if (s->recv_initial_md_op->payload->recv_initial_metadata
693               .trailing_metadata_available != nullptr) {
694         *s->recv_initial_md_op->payload->recv_initial_metadata
695              .trailing_metadata_available =
696             (other != nullptr && other->send_trailing_md_op != nullptr);
697       }
698       grpc_metadata_batch_clear(&s->to_read_initial_md);
699       s->to_read_initial_md_filled = false;
700       INPROC_LOG(GPR_INFO,
701                  "op_state_machine %p scheduling initial-metadata-ready %p", s,
702                  new_err);
703       GRPC_CLOSURE_SCHED(s->recv_initial_md_op->payload->recv_initial_metadata
704                              .recv_initial_metadata_ready,
705                          GRPC_ERROR_REF(new_err));
706       complete_if_batch_end_locked(
707           s, new_err, s->recv_initial_md_op,
708           "op_state_machine scheduling recv-initial-metadata-on-complete");
709       s->recv_initial_md_op = nullptr;
710 
711       if (new_err != GRPC_ERROR_NONE) {
712         INPROC_LOG(GPR_INFO,
713                    "op_state_machine %p scheduling on_complete errors2 %p", s,
714                    new_err);
715         fail_helper_locked(s, GRPC_ERROR_REF(new_err));
716         goto done;
717       }
718     }
719   }
720   if (s->recv_message_op) {
721     if (other && other->send_message_op) {
722       message_transfer_locked(other, s);
723       maybe_schedule_op_closure_locked(other, GRPC_ERROR_NONE);
724     }
725   }
726   if (s->recv_trailing_md_op && s->t->is_client && other &&
727       other->send_message_op) {
728     INPROC_LOG(GPR_INFO,
729                "op_state_machine %p scheduling trailing-metadata-ready %p", s,
730                GRPC_ERROR_NONE);
731     GRPC_CLOSURE_SCHED(s->recv_trailing_md_op->payload->recv_trailing_metadata
732                            .recv_trailing_metadata_ready,
733                        GRPC_ERROR_NONE);
734     maybe_schedule_op_closure_locked(other, GRPC_ERROR_NONE);
735   }
736   if (s->to_read_trailing_md_filled) {
737     if (s->trailing_md_recvd) {
738       new_err =
739           GRPC_ERROR_CREATE_FROM_STATIC_STRING("Already recvd trailing md");
740       INPROC_LOG(
741           GPR_INFO,
742           "op_state_machine %p scheduling on_complete errors for already "
743           "recvd trailing md %p",
744           s, new_err);
745       fail_helper_locked(s, GRPC_ERROR_REF(new_err));
746       goto done;
747     }
748     if (s->recv_message_op != nullptr) {
749       // This message needs to be wrapped up because it will never be
750       // satisfied
751       INPROC_LOG(GPR_INFO, "op_state_machine %p scheduling message-ready", s);
752       GRPC_CLOSURE_SCHED(
753           s->recv_message_op->payload->recv_message.recv_message_ready,
754           GRPC_ERROR_NONE);
755       complete_if_batch_end_locked(
756           s, new_err, s->recv_message_op,
757           "op_state_machine scheduling recv-message-on-complete");
758       s->recv_message_op = nullptr;
759     }
760     if ((s->trailing_md_sent || s->t->is_client) && s->send_message_op) {
761       // Nothing further will try to receive from this stream, so finish off
762       // any outstanding send_message op
763       s->send_message_op->payload->send_message.send_message.reset();
764       complete_if_batch_end_locked(
765           s, new_err, s->send_message_op,
766           "op_state_machine scheduling send-message-on-complete");
767       s->send_message_op = nullptr;
768     }
769     if (s->recv_trailing_md_op != nullptr) {
770       // We wanted trailing metadata and we got it
771       s->trailing_md_recvd = true;
772       new_err =
773           fill_in_metadata(s, &s->to_read_trailing_md, 0,
774                            s->recv_trailing_md_op->payload
775                                ->recv_trailing_metadata.recv_trailing_metadata,
776                            nullptr, nullptr);
777       grpc_metadata_batch_clear(&s->to_read_trailing_md);
778       s->to_read_trailing_md_filled = false;
779 
780       // We should schedule the recv_trailing_md_op completion if
781       // 1. this stream is the client-side
782       // 2. this stream is the server-side AND has already sent its trailing md
783       //    (If the server hasn't already sent its trailing md, it doesn't have
784       //     a final status, so don't mark this op complete)
785       if (s->t->is_client || s->trailing_md_sent) {
786         INPROC_LOG(GPR_INFO,
787                    "op_state_machine %p scheduling trailing-md-on-complete %p",
788                    s, new_err);
789         GRPC_CLOSURE_SCHED(
790             s->recv_trailing_md_op->payload->recv_trailing_metadata
791                 .recv_trailing_metadata_ready,
792             GRPC_ERROR_REF(new_err));
793         GRPC_CLOSURE_SCHED(s->recv_trailing_md_op->on_complete,
794                            GRPC_ERROR_REF(new_err));
795         s->recv_trailing_md_op = nullptr;
796         needs_close = true;
797       } else {
798         INPROC_LOG(GPR_INFO,
799                    "op_state_machine %p server needs to delay handling "
800                    "trailing-md-on-complete %p",
801                    s, new_err);
802       }
803     } else {
804       INPROC_LOG(
805           GPR_INFO,
806           "op_state_machine %p has trailing md but not yet waiting for it", s);
807     }
808   }
809   if (s->trailing_md_recvd && s->recv_message_op) {
810     // No further message will come on this stream, so finish off the
811     // recv_message_op
812     INPROC_LOG(GPR_INFO, "op_state_machine %p scheduling message-ready", s);
813     GRPC_CLOSURE_SCHED(
814         s->recv_message_op->payload->recv_message.recv_message_ready,
815         GRPC_ERROR_NONE);
816     complete_if_batch_end_locked(
817         s, new_err, s->recv_message_op,
818         "op_state_machine scheduling recv-message-on-complete");
819     s->recv_message_op = nullptr;
820   }
821   if (s->trailing_md_recvd && (s->trailing_md_sent || s->t->is_client) &&
822       s->send_message_op) {
823     // Nothing further will try to receive from this stream, so finish off
824     // any outstanding send_message op
825     s->send_message_op->payload->send_message.send_message.reset();
826     complete_if_batch_end_locked(
827         s, new_err, s->send_message_op,
828         "op_state_machine scheduling send-message-on-complete");
829     s->send_message_op = nullptr;
830   }
831   if (s->send_message_op || s->send_trailing_md_op || s->recv_initial_md_op ||
832       s->recv_message_op || s->recv_trailing_md_op) {
833     // Didn't get the item we wanted so we still need to get
834     // rescheduled
835     INPROC_LOG(
836         GPR_INFO, "op_state_machine %p still needs closure %p %p %p %p %p", s,
837         s->send_message_op, s->send_trailing_md_op, s->recv_initial_md_op,
838         s->recv_message_op, s->recv_trailing_md_op);
839     s->ops_needed = true;
840   }
841 done:
842   if (needs_close) {
843     close_other_side_locked(s, "op_state_machine");
844     close_stream_locked(s);
845   }
846   gpr_mu_unlock(mu);
847   GRPC_ERROR_UNREF(new_err);
848 }
849 
cancel_stream_locked(inproc_stream * s,grpc_error * error)850 static bool cancel_stream_locked(inproc_stream* s, grpc_error* error) {
851   bool ret = false;  // was the cancel accepted
852   INPROC_LOG(GPR_INFO, "cancel_stream %p with %s", s, grpc_error_string(error));
853   if (s->cancel_self_error == GRPC_ERROR_NONE) {
854     ret = true;
855     s->cancel_self_error = GRPC_ERROR_REF(error);
856     maybe_schedule_op_closure_locked(s, s->cancel_self_error);
857     // Send trailing md to the other side indicating cancellation, even if we
858     // already have
859     s->trailing_md_sent = true;
860 
861     grpc_metadata_batch cancel_md;
862     grpc_metadata_batch_init(&cancel_md);
863 
864     inproc_stream* other = s->other_side;
865     grpc_metadata_batch* dest = (other == nullptr)
866                                     ? &s->write_buffer_trailing_md
867                                     : &other->to_read_trailing_md;
868     bool* destfilled = (other == nullptr) ? &s->write_buffer_trailing_md_filled
869                                           : &other->to_read_trailing_md_filled;
870     fill_in_metadata(s, &cancel_md, 0, dest, nullptr, destfilled);
871     grpc_metadata_batch_destroy(&cancel_md);
872 
873     if (other != nullptr) {
874       if (other->cancel_other_error == GRPC_ERROR_NONE) {
875         other->cancel_other_error = GRPC_ERROR_REF(s->cancel_self_error);
876       }
877       maybe_schedule_op_closure_locked(other, other->cancel_other_error);
878     } else if (s->write_buffer_cancel_error == GRPC_ERROR_NONE) {
879       s->write_buffer_cancel_error = GRPC_ERROR_REF(s->cancel_self_error);
880     }
881 
882     // if we are a server and already received trailing md but
883     // couldn't complete that because we hadn't yet sent out trailing
884     // md, now's the chance
885     if (!s->t->is_client && s->trailing_md_recvd && s->recv_trailing_md_op) {
886       GRPC_CLOSURE_SCHED(s->recv_trailing_md_op->payload->recv_trailing_metadata
887                              .recv_trailing_metadata_ready,
888                          GRPC_ERROR_REF(s->cancel_self_error));
889       complete_if_batch_end_locked(
890           s, s->cancel_self_error, s->recv_trailing_md_op,
891           "cancel_stream scheduling trailing-md-on-complete");
892       s->recv_trailing_md_op = nullptr;
893     }
894   }
895 
896   close_other_side_locked(s, "cancel_stream:other_side");
897   close_stream_locked(s);
898 
899   GRPC_ERROR_UNREF(error);
900   return ret;
901 }
902 
do_nothing(void * arg,grpc_error * error)903 static void do_nothing(void* arg, grpc_error* error) {}
904 
// Transport vtable entry point that accepts one op batch |op| for stream |gs|.
//
// Everything runs under the shared transport mutex:
//  - A cancel_stream op is handled immediately via cancel_stream_locked.
//  - Outgoing initial metadata is copied to the peer's read buffer, or to this
//    stream's own write buffer when there is no peer stream.
//  - Any send_message / send_trailing_metadata / recv_* ops are recorded on
//    the stream; op_closure is then scheduled if one of the listed conditions
//    holds, otherwise ops_needed is set.
//  - If the batch fails up front (stream already self-cancelled, transport
//    closed, duplicate initial metadata), the batch's recv_* ready closures
//    and its on_complete are scheduled here with that error.
//
// |gt| is only used for logging; the transport is reached through the stream.
static void perform_stream_op(grpc_transport* gt, grpc_stream* gs,
                              grpc_transport_stream_op_batch* op) {
  INPROC_LOG(GPR_INFO, "perform_stream_op %p %p %p", gt, gs, op);
  inproc_stream* s = reinterpret_cast<inproc_stream*>(gs);
  gpr_mu* mu = &s->t->mu->mu;  // save aside in case s gets closed
  gpr_mu_lock(mu);

  // When tracing is enabled, dump any metadata this batch is sending.
  if (grpc_inproc_trace.enabled()) {
    if (op->send_initial_metadata) {
      log_metadata(op->payload->send_initial_metadata.send_initial_metadata,
                   s->t->is_client, true);
    }
    if (op->send_trailing_metadata) {
      log_metadata(op->payload->send_trailing_metadata.send_trailing_metadata,
                   s->t->is_client, false);
    }
  }
  // Error the batch is failed with; stays GRPC_ERROR_NONE on the normal path.
  // This function holds one ref on it, released at the very end.
  grpc_error* error = GRPC_ERROR_NONE;
  grpc_closure* on_complete = op->on_complete;
  // TODO(roth): This is a hack needed because we use data inside of the
  // closure itself to do the barrier calculation (i.e., to ensure that
  // we don't schedule the closure until all ops in the batch have been
  // completed).  This can go away once we move to a new C++ closure API
  // that provides the ability to create a barrier closure.
  if (on_complete == nullptr) {
    // No on_complete supplied: substitute a no-op closure stored in the batch.
    on_complete = GRPC_CLOSURE_INIT(&op->handler_private.closure, do_nothing,
                                    nullptr, grpc_schedule_on_exec_ctx);
  }

  if (op->cancel_stream) {
    // Call cancel_stream_locked without ref'ing the cancel_error because
    // this function is responsible to make sure that that field gets unref'ed
    cancel_stream_locked(s, op->payload->cancel_stream.cancel_error);
    // this op can complete without an error
  } else if (s->cancel_self_error != GRPC_ERROR_NONE) {
    // already self-canceled so still give it an error
    error = GRPC_ERROR_REF(s->cancel_self_error);
  } else {
    // Normal path: just trace which ops the batch contains.
    INPROC_LOG(GPR_INFO, "perform_stream_op %p %s%s%s%s%s%s%s", s,
               s->t->is_client ? "client" : "server",
               op->send_initial_metadata ? " send_initial_metadata" : "",
               op->send_message ? " send_message" : "",
               op->send_trailing_metadata ? " send_trailing_metadata" : "",
               op->recv_initial_metadata ? " recv_initial_metadata" : "",
               op->recv_message ? " recv_message" : "",
               op->recv_trailing_metadata ? " recv_trailing_metadata" : "");
  }

  // NOTE(review): nothing in this function ever sets needs_close to true, so
  // the close block near the bottom is currently unreachable.
  bool needs_close = false;

  // May be nullptr; in that case outgoing data is staged in this stream's
  // write_buffer_* fields instead of the peer's to_read_* fields.
  inproc_stream* other = s->other_side;
  if (error == GRPC_ERROR_NONE &&
      (op->send_initial_metadata || op->send_trailing_metadata)) {
    // Sending any metadata on a closed transport fails the batch.
    if (s->t->is_closed) {
      error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Endpoint already shutdown");
    }
    if (error == GRPC_ERROR_NONE && op->send_initial_metadata) {
      grpc_metadata_batch* dest = (other == nullptr)
                                      ? &s->write_buffer_initial_md
                                      : &other->to_read_initial_md;
      uint32_t* destflags = (other == nullptr)
                                ? &s->write_buffer_initial_md_flags
                                : &other->to_read_initial_md_flags;
      bool* destfilled = (other == nullptr) ? &s->write_buffer_initial_md_filled
                                            : &other->to_read_initial_md_filled;
      if (*destfilled || s->initial_md_sent) {
        // The buffer is already in use; that's an error!
        INPROC_LOG(GPR_INFO, "Extra initial metadata %p", s);
        error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Extra initial metadata");
      } else {
        // Skip the copy only when there is a peer and it is already closed.
        if (!other || !other->closed) {
          fill_in_metadata(
              s, op->payload->send_initial_metadata.send_initial_metadata,
              op->payload->send_initial_metadata.send_initial_metadata_flags,
              dest, destflags, destfilled);
        }
        // Only the client side forwards a deadline and marks initial_md_sent.
        if (s->t->is_client) {
          grpc_millis* dl =
              (other == nullptr) ? &s->write_buffer_deadline : &other->deadline;
          // Keep the earlier of the stored deadline and the one being sent.
          *dl = GPR_MIN(*dl, op->payload->send_initial_metadata
                                 .send_initial_metadata->deadline);
          s->initial_md_sent = true;
        }
      }
      // NOTE(review): |other| can be nullptr here; this relies on
      // maybe_schedule_op_closure_locked tolerating a null stream - confirm.
      maybe_schedule_op_closure_locked(other, error);
    }
  }

  if (error == GRPC_ERROR_NONE &&
      (op->send_message || op->send_trailing_metadata ||
       op->recv_initial_metadata || op->recv_message ||
       op->recv_trailing_metadata)) {
    // Mark ops that need to be processed by the closure
    if (op->send_message) {
      s->send_message_op = op;
    }
    if (op->send_trailing_metadata) {
      s->send_trailing_md_op = op;
    }
    if (op->recv_initial_metadata) {
      s->recv_initial_md_op = op;
    }
    if (op->recv_message) {
      s->recv_message_op = op;
    }
    if (op->recv_trailing_metadata) {
      s->recv_trailing_md_op = op;
    }

    // We want to initiate the closure if:
    // 1. We want to send a message and the other side wants to receive or end
    // 2. We want to send trailing metadata and there isn't an unmatched send
    // 3. We want initial metadata and the other side has sent it
    // 4. We want to receive a message and there is a message ready
    // 5. There is trailing metadata, even if nothing specifically wants
    //    that because that can shut down the receive message as well
    if ((op->send_message && other &&
         ((other->recv_message_op != nullptr) ||
          (other->recv_trailing_md_op != nullptr))) ||
        (op->send_trailing_metadata && !op->send_message) ||
        (op->recv_initial_metadata && s->to_read_initial_md_filled) ||
        (op->recv_message && other && (other->send_message_op != nullptr)) ||
        (s->to_read_trailing_md_filled || s->trailing_md_recvd)) {
      // op_closure_scheduled guards against scheduling the closure twice.
      if (!s->op_closure_scheduled) {
        GRPC_CLOSURE_SCHED(&s->op_closure, GRPC_ERROR_NONE);
        s->op_closure_scheduled = true;
      }
    } else {
      // Nothing can make progress yet; flag the stream as having ops waiting.
      s->ops_needed = true;
    }
  } else {
    // Either the batch failed up front, or it contained no op that has to be
    // recorded on the stream; in both cases the batch is finished here.
    if (error != GRPC_ERROR_NONE) {
      // Schedule op's closures that we didn't push to op state machine
      if (op->recv_initial_metadata) {
        if (op->payload->recv_initial_metadata.trailing_metadata_available !=
            nullptr) {
          // Set to true unconditionally, because we're failing the call, so
          // even if we haven't actually seen the send_trailing_metadata op
          // from the other side, we're going to return trailing metadata
          // anyway.
          *op->payload->recv_initial_metadata.trailing_metadata_available =
              true;
        }
        INPROC_LOG(
            GPR_INFO,
            "perform_stream_op error %p scheduling initial-metadata-ready %p",
            s, error);
        GRPC_CLOSURE_SCHED(
            op->payload->recv_initial_metadata.recv_initial_metadata_ready,
            GRPC_ERROR_REF(error));
      }
      if (op->recv_message) {
        INPROC_LOG(
            GPR_INFO,
            "perform_stream_op error %p scheduling recv message-ready %p", s,
            error);
        GRPC_CLOSURE_SCHED(op->payload->recv_message.recv_message_ready,
                           GRPC_ERROR_REF(error));
      }
      if (op->recv_trailing_metadata) {
        INPROC_LOG(
            GPR_INFO,
            "perform_stream_op error %p scheduling trailing-metadata-ready %p",
            s, error);
        GRPC_CLOSURE_SCHED(
            op->payload->recv_trailing_metadata.recv_trailing_metadata_ready,
            GRPC_ERROR_REF(error));
      }
    }
    // on_complete receives the error too (GRPC_ERROR_NONE when nothing failed).
    INPROC_LOG(GPR_INFO, "perform_stream_op %p scheduling on_complete %p", s,
               error);
    GRPC_CLOSURE_SCHED(on_complete, GRPC_ERROR_REF(error));
  }
  // See the NOTE(review) at the declaration: needs_close is always false here.
  if (needs_close) {
    close_other_side_locked(s, "perform_stream_op:other_side");
    close_stream_locked(s);
  }
  // Unlock through the saved pointer rather than through |s|.
  gpr_mu_unlock(mu);
  GRPC_ERROR_UNREF(error);
}
1085 
close_transport_locked(inproc_transport * t)1086 static void close_transport_locked(inproc_transport* t) {
1087   INPROC_LOG(GPR_INFO, "close_transport %p %d", t, t->is_closed);
1088   grpc_connectivity_state_set(
1089       &t->connectivity, GRPC_CHANNEL_SHUTDOWN,
1090       GRPC_ERROR_CREATE_FROM_STATIC_STRING("Closing transport."),
1091       "close transport");
1092   if (!t->is_closed) {
1093     t->is_closed = true;
1094     /* Also end all streams on this transport */
1095     while (t->stream_list != nullptr) {
1096       // cancel_stream_locked also adjusts stream list
1097       cancel_stream_locked(
1098           t->stream_list,
1099           grpc_error_set_int(
1100               GRPC_ERROR_CREATE_FROM_STATIC_STRING("Transport closed"),
1101               GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE));
1102     }
1103   }
1104 }
1105 
perform_transport_op(grpc_transport * gt,grpc_transport_op * op)1106 static void perform_transport_op(grpc_transport* gt, grpc_transport_op* op) {
1107   inproc_transport* t = reinterpret_cast<inproc_transport*>(gt);
1108   INPROC_LOG(GPR_INFO, "perform_transport_op %p %p", t, op);
1109   gpr_mu_lock(&t->mu->mu);
1110   if (op->on_connectivity_state_change) {
1111     grpc_connectivity_state_notify_on_state_change(
1112         &t->connectivity, op->connectivity_state,
1113         op->on_connectivity_state_change);
1114   }
1115   if (op->set_accept_stream) {
1116     t->accept_stream_cb = op->set_accept_stream_fn;
1117     t->accept_stream_data = op->set_accept_stream_user_data;
1118   }
1119   if (op->on_consumed) {
1120     GRPC_CLOSURE_SCHED(op->on_consumed, GRPC_ERROR_NONE);
1121   }
1122 
1123   bool do_close = false;
1124   if (op->goaway_error != GRPC_ERROR_NONE) {
1125     do_close = true;
1126     GRPC_ERROR_UNREF(op->goaway_error);
1127   }
1128   if (op->disconnect_with_error != GRPC_ERROR_NONE) {
1129     do_close = true;
1130     GRPC_ERROR_UNREF(op->disconnect_with_error);
1131   }
1132 
1133   if (do_close) {
1134     close_transport_locked(t);
1135   }
1136   gpr_mu_unlock(&t->mu->mu);
1137 }
1138 
destroy_stream(grpc_transport * gt,grpc_stream * gs,grpc_closure * then_schedule_closure)1139 static void destroy_stream(grpc_transport* gt, grpc_stream* gs,
1140                            grpc_closure* then_schedule_closure) {
1141   INPROC_LOG(GPR_INFO, "destroy_stream %p %p", gs, then_schedule_closure);
1142   inproc_stream* s = reinterpret_cast<inproc_stream*>(gs);
1143   s->closure_at_destroy = then_schedule_closure;
1144   really_destroy_stream(s);
1145 }
1146 
destroy_transport(grpc_transport * gt)1147 static void destroy_transport(grpc_transport* gt) {
1148   inproc_transport* t = reinterpret_cast<inproc_transport*>(gt);
1149   INPROC_LOG(GPR_INFO, "destroy_transport %p", t);
1150   gpr_mu_lock(&t->mu->mu);
1151   close_transport_locked(t);
1152   gpr_mu_unlock(&t->mu->mu);
1153   unref_transport(t->other_side);
1154   unref_transport(t);
1155 }
1156 
1157 /*******************************************************************************
1158  * INTEGRATION GLUE
1159  */
1160 
set_pollset(grpc_transport * gt,grpc_stream * gs,grpc_pollset * pollset)1161 static void set_pollset(grpc_transport* gt, grpc_stream* gs,
1162                         grpc_pollset* pollset) {
1163   // Nothing to do here
1164 }
1165 
set_pollset_set(grpc_transport * gt,grpc_stream * gs,grpc_pollset_set * pollset_set)1166 static void set_pollset_set(grpc_transport* gt, grpc_stream* gs,
1167                             grpc_pollset_set* pollset_set) {
1168   // Nothing to do here
1169 }
1170 
get_endpoint(grpc_transport * t)1171 static grpc_endpoint* get_endpoint(grpc_transport* t) { return nullptr; }
1172 
1173 /*******************************************************************************
1174  * GLOBAL INIT AND DESTROY
1175  */
grpc_inproc_transport_init(void)1176 void grpc_inproc_transport_init(void) {
1177   grpc_core::ExecCtx exec_ctx;
1178   g_empty_slice = grpc_slice_from_static_buffer(nullptr, 0);
1179 
1180   grpc_slice key_tmp = grpc_slice_from_static_string(":path");
1181   g_fake_path_key = grpc_slice_intern(key_tmp);
1182   grpc_slice_unref_internal(key_tmp);
1183 
1184   g_fake_path_value = grpc_slice_from_static_string("/");
1185 
1186   grpc_slice auth_tmp = grpc_slice_from_static_string(":authority");
1187   g_fake_auth_key = grpc_slice_intern(auth_tmp);
1188   grpc_slice_unref_internal(auth_tmp);
1189 
1190   g_fake_auth_value = grpc_slice_from_static_string("inproc-fail");
1191 }
1192 
1193 static const grpc_transport_vtable inproc_vtable = {
1194     sizeof(inproc_stream), "inproc",        init_stream,
1195     set_pollset,           set_pollset_set, perform_stream_op,
1196     perform_transport_op,  destroy_stream,  destroy_transport,
1197     get_endpoint};
1198 
1199 /*******************************************************************************
1200  * Main inproc transport functions
1201  */
inproc_transports_create(grpc_transport ** server_transport,const grpc_channel_args * server_args,grpc_transport ** client_transport,const grpc_channel_args * client_args)1202 static void inproc_transports_create(grpc_transport** server_transport,
1203                                      const grpc_channel_args* server_args,
1204                                      grpc_transport** client_transport,
1205                                      const grpc_channel_args* client_args) {
1206   INPROC_LOG(GPR_INFO, "inproc_transports_create");
1207   inproc_transport* st =
1208       static_cast<inproc_transport*>(gpr_zalloc(sizeof(*st)));
1209   inproc_transport* ct =
1210       static_cast<inproc_transport*>(gpr_zalloc(sizeof(*ct)));
1211   // Share one lock between both sides since both sides get affected
1212   st->mu = ct->mu = static_cast<shared_mu*>(gpr_malloc(sizeof(*st->mu)));
1213   gpr_mu_init(&st->mu->mu);
1214   gpr_ref_init(&st->mu->refs, 2);
1215   st->base.vtable = &inproc_vtable;
1216   ct->base.vtable = &inproc_vtable;
1217   // Start each side of transport with 2 refs since they each have a ref
1218   // to the other
1219   gpr_ref_init(&st->refs, 2);
1220   gpr_ref_init(&ct->refs, 2);
1221   st->is_client = false;
1222   ct->is_client = true;
1223   grpc_connectivity_state_init(&st->connectivity, GRPC_CHANNEL_READY,
1224                                "inproc_server");
1225   grpc_connectivity_state_init(&ct->connectivity, GRPC_CHANNEL_READY,
1226                                "inproc_client");
1227   st->other_side = ct;
1228   ct->other_side = st;
1229   st->stream_list = nullptr;
1230   ct->stream_list = nullptr;
1231   *server_transport = reinterpret_cast<grpc_transport*>(st);
1232   *client_transport = reinterpret_cast<grpc_transport*>(ct);
1233 }
1234 
grpc_inproc_channel_create(grpc_server * server,grpc_channel_args * args,void * reserved)1235 grpc_channel* grpc_inproc_channel_create(grpc_server* server,
1236                                          grpc_channel_args* args,
1237                                          void* reserved) {
1238   GRPC_API_TRACE("grpc_inproc_channel_create(server=%p, args=%p)", 2,
1239                  (server, args));
1240 
1241   grpc_core::ExecCtx exec_ctx;
1242 
1243   const grpc_channel_args* server_args = grpc_server_get_channel_args(server);
1244 
1245   // Add a default authority channel argument for the client
1246 
1247   grpc_arg default_authority_arg;
1248   default_authority_arg.type = GRPC_ARG_STRING;
1249   default_authority_arg.key = (char*)GRPC_ARG_DEFAULT_AUTHORITY;
1250   default_authority_arg.value.string = (char*)"inproc.authority";
1251   grpc_channel_args* client_args =
1252       grpc_channel_args_copy_and_add(args, &default_authority_arg, 1);
1253 
1254   grpc_transport* server_transport;
1255   grpc_transport* client_transport;
1256   inproc_transports_create(&server_transport, server_args, &client_transport,
1257                            client_args);
1258 
1259   grpc_server_setup_transport(server, server_transport, nullptr, server_args);
1260   grpc_channel* channel = grpc_channel_create(
1261       "inproc", client_args, GRPC_CLIENT_DIRECT_CHANNEL, client_transport);
1262 
1263   // Free up created channel args
1264   grpc_channel_args_destroy(client_args);
1265 
1266   // Now finish scheduled operations
1267 
1268   return channel;
1269 }
1270 
grpc_inproc_transport_shutdown(void)1271 void grpc_inproc_transport_shutdown(void) {
1272   grpc_core::ExecCtx exec_ctx;
1273   grpc_slice_unref_internal(g_empty_slice);
1274   grpc_slice_unref_internal(g_fake_path_key);
1275   grpc_slice_unref_internal(g_fake_path_value);
1276   grpc_slice_unref_internal(g_fake_auth_key);
1277   grpc_slice_unref_internal(g_fake_auth_value);
1278 }
1279