1 /*
2  *
3  * Copyright 2015 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #include <grpc/support/port_platform.h>
20 
21 #include <assert.h>
22 #include <limits.h>
23 #include <stdio.h>
24 #include <stdlib.h>
25 #include <string.h>
26 
27 #include <string>
28 
29 #include "absl/strings/str_cat.h"
30 #include "absl/strings/str_format.h"
31 
32 #include <grpc/compression.h>
33 #include <grpc/grpc.h>
34 #include <grpc/slice.h>
35 #include <grpc/support/alloc.h>
36 #include <grpc/support/log.h>
37 #include <grpc/support/string_util.h>
38 
39 #include "src/core/lib/channel/channel_stack.h"
40 #include "src/core/lib/compression/algorithm_metadata.h"
41 #include "src/core/lib/debug/stats.h"
42 #include "src/core/lib/gpr/alloc.h"
43 #include "src/core/lib/gpr/string.h"
44 #include "src/core/lib/gpr/time_precise.h"
45 #include "src/core/lib/gpr/useful.h"
46 #include "src/core/lib/gprpp/arena.h"
47 #include "src/core/lib/gprpp/manual_constructor.h"
48 #include "src/core/lib/gprpp/ref_counted.h"
49 #include "src/core/lib/iomgr/timer.h"
50 #include "src/core/lib/profiling/timers.h"
51 #include "src/core/lib/slice/slice_string_helpers.h"
52 #include "src/core/lib/slice/slice_utils.h"
53 #include "src/core/lib/surface/api_trace.h"
54 #include "src/core/lib/surface/call.h"
55 #include "src/core/lib/surface/call_test_only.h"
56 #include "src/core/lib/surface/channel.h"
57 #include "src/core/lib/surface/completion_queue.h"
58 #include "src/core/lib/surface/server.h"
59 #include "src/core/lib/surface/validate_metadata.h"
60 #include "src/core/lib/transport/error_utils.h"
61 #include "src/core/lib/transport/metadata.h"
62 #include "src/core/lib/transport/static_metadata.h"
63 #include "src/core/lib/transport/status_metadata.h"
64 #include "src/core/lib/transport/transport.h"
65 
66 /** The maximum number of concurrent batches possible.
67     Based upon the maximum number of individually queueable ops in the batch
68     api:
69       - initial metadata send
70       - message send
71       - status/close send (depending on client/server)
72       - initial metadata recv
73       - message recv
74       - status/close recv (depending on client/server) */
#define MAX_CONCURRENT_BATCHES 6

// Capacity of grpc_call::send_extra_metadata, the per-call array that holds
// the extra metadata elements supplied when the call is created.
#define MAX_SEND_EXTRA_METADATA_COUNT 3

// Used to create arena for the first call: the number of grpc_linked_mdelem
// entries budgeted for by grpc_call_get_initial_size_estimate().
#define ESTIMATED_MDELEM_COUNT 16
81 
// State kept for one transport stream op batch issued on behalf of a call.
// A call holds up to MAX_CONCURRENT_BATCHES of these (see
// grpc_call::active_batches).
struct batch_control {
  batch_control() = default;

  // The call this batch belongs to.
  grpc_call* call = nullptr;
  // The batch that is sent down the filter stack.
  grpc_transport_stream_op_batch op;
  /* Share memory for cq_completion and notify_tag as they are never needed
     simultaneously. Each byte used in this data structure counts as six bytes
     per call, so any savings we can make are worthwhile.

     We use notify_tag to determine whether or not to send notification to the
     completion queue. Once we've made that determination, we can reuse the
     memory for cq_completion. */
  union {
    grpc_cq_completion cq_completion;
    struct {
      /* Any given op indicates completion by either (a) calling a closure or
         (b) sending a notification on the call's completion queue.  If
         \a is_closure is true, \a tag indicates a closure to be invoked;
         otherwise, \a tag indicates the tag to be used in the notification to
         be sent to the completion queue. */
      void* tag;
      bool is_closure;
    } notify_tag;
  } completion_data;
  // Closure storage used when starting / finishing the batch.
  grpc_closure start_batch;
  grpc_closure finish_batch;
  // Number of steps still outstanding before the batch is complete.
  grpc_core::Atomic<intptr_t> steps_to_complete;
  // A grpc_error* stored as a gpr_atm; starts out as GRPC_ERROR_NONE.
  gpr_atm batch_error = reinterpret_cast<gpr_atm>(GRPC_ERROR_NONE);
  // Publishes the number of steps that must finish before the batch completes
  // (release store). Note the unsigned argument is stored into a signed
  // atomic.
  void set_num_steps_to_complete(uintptr_t steps) {
    steps_to_complete.Store(steps, grpc_core::MemoryOrder::RELEASE);
  }
  // Consumes one step. Returns true when the value observed by FetchSub was 1,
  // i.e. (assuming FetchSub yields the pre-decrement value) when the caller
  // finished the last outstanding step.
  bool completed_batch_step() {
    return steps_to_complete.FetchSub(1, grpc_core::MemoryOrder::ACQ_REL) == 1;
  }
};
117 
118 struct parent_call {
parent_callparent_call119   parent_call() { gpr_mu_init(&child_list_mu); }
~parent_callparent_call120   ~parent_call() { gpr_mu_destroy(&child_list_mu); }
121 
122   gpr_mu child_list_mu;
123   grpc_call* first_child = nullptr;
124 };
125 
126 struct child_call {
child_callchild_call127   explicit child_call(grpc_call* parent) : parent(parent) {}
128   grpc_call* parent;
129   /** siblings: children of the same parent form a list, and this list is
130      protected under
131       parent->mu */
132   grpc_call* sibling_next = nullptr;
133   grpc_call* sibling_prev = nullptr;
134 };
135 
// Sentinel values for grpc_call::recv_state. Per the comment on that field,
// any other value stored there is a batch_control*.
#define RECV_NONE ((gpr_atm)0)
#define RECV_INITIAL_METADATA_FIRST ((gpr_atm)1)
138 
// Surface-level state of a single call. An instance is placement-constructed
// by grpc_call_create() at the start of an arena allocation and is directly
// followed in memory by its call stack (see CALL_STACK_FROM_CALL).
struct grpc_call {
  // Initializes the call from the creation arguments. The call is treated as
  // a client call when no server transport data is supplied. All four
  // metadata batches start with an infinite-future deadline.
  grpc_call(grpc_core::Arena* arena, const grpc_call_create_args& args)
      : arena(arena),
        cq(args.cq),
        channel(args.channel),
        is_client(args.server_transport_data == nullptr),
        // NOTE: `context` is declared after `stream_op_payload`, so it has not
        // been initialized yet at this point; the payload is handed the array
        // here and presumably only stores its address.
        stream_op_payload(context) {
    for (int i = 0; i < 2; i++) {
      for (int j = 0; j < 2; j++) {
        metadata_batch[i][j].deadline = GRPC_MILLIS_INF_FUTURE;
      }
    }
  }

  // Frees the error string stored in final_info (filled in by destroy_call()).
  ~grpc_call() {
    gpr_free(static_cast<void*>(const_cast<char*>(final_info.error_string)));
  }

  // External (application-visible) reference count; see grpc_call_ref() and
  // grpc_call_unref().
  grpc_core::RefCount ext_ref;
  // Arena that owns the memory of this call.
  grpc_core::Arena* arena;
  grpc_core::CallCombiner call_combiner;
  // Completion queue bound to this call, if any.
  grpc_completion_queue* cq;
  grpc_polling_entity pollent;
  grpc_channel* channel;
  // Cycle counter sampled at construction; used to compute call latency.
  gpr_cycle_counter start_time = gpr_get_cycle_counter();
  /* parent_call* */ gpr_atm parent_call_atm = 0;
  // Non-null only for calls created with a parent.
  child_call* child = nullptr;

  /* client or server call */
  bool is_client;
  /** has grpc_call_unref been called */
  bool destroy_called = false;
  /** flag indicating that cancellation is inherited */
  bool cancellation_is_inherited = false;
  /** which ops are in-flight */
  bool sent_initial_metadata = false;
  bool sending_message = false;
  bool sent_final_op = false;
  bool received_initial_metadata = false;
  bool receiving_message = false;
  bool requested_final_op = false;
  gpr_atm any_ops_sent_atm = 0;
  gpr_atm received_final_op_atm = 0;

  batch_control* active_batches[MAX_CONCURRENT_BATCHES] = {};
  grpc_transport_stream_op_batch_payload stream_op_payload;

  /* first idx: is_receiving, second idx: is_trailing */
  grpc_metadata_batch metadata_batch[2][2] = {};

  /* Buffered read metadata waiting to be returned to the application.
     Element 0 is initial metadata, element 1 is trailing metadata. */
  grpc_metadata_array* buffered_metadata[2] = {};

  grpc_metadata compression_md;

  // A char* indicating the peer name.
  gpr_atm peer_string = 0;

  /* Call data used for reporting. Only valid after the call has
   * completed */
  grpc_call_final_info final_info;

  /* Compression algorithm for *incoming* data */
  grpc_message_compression_algorithm incoming_message_compression_algorithm =
      GRPC_MESSAGE_COMPRESS_NONE;
  /* Stream compression algorithm for *incoming* data */
  grpc_stream_compression_algorithm incoming_stream_compression_algorithm =
      GRPC_STREAM_COMPRESS_NONE;
  /* Supported encodings (compression algorithms), a bitset.
   * Always support no compression. */
  uint32_t encodings_accepted_by_peer = 1 << GRPC_MESSAGE_COMPRESS_NONE;
  /* Supported stream encodings (stream compression algorithms), a bitset */
  uint32_t stream_encodings_accepted_by_peer = 0;

  /* Contexts for various subsystems (security, tracing, ...). */
  grpc_call_context_element context[GRPC_CONTEXT_COUNT] = {};

  /* for the client, extra metadata is initial metadata; for the
     server, it's trailing metadata */
  grpc_linked_mdelem send_extra_metadata[MAX_SEND_EXTRA_METADATA_COUNT];
  // Not initialized here; assigned by grpc_call_create().
  int send_extra_metadata_count;
  // Not initialized here; assigned by grpc_call_create().
  grpc_millis send_deadline;

  grpc_core::ManualConstructor<grpc_core::SliceBufferByteStream> sending_stream;

  grpc_core::OrphanablePtr<grpc_core::ByteStream> receiving_stream;
  grpc_byte_buffer** receiving_buffer = nullptr;
  grpc_slice receiving_slice = grpc_empty_slice();
  grpc_closure receiving_slice_ready;
  grpc_closure receiving_stream_ready;
  grpc_closure receiving_initial_metadata_ready;
  grpc_closure receiving_trailing_metadata_ready;
  uint32_t test_only_last_message_flags = 0;
  // Status about operation of call
  bool sent_server_trailing_metadata = false;
  // Set from 0 to 1 by the first cancel_with_error() on this call.
  gpr_atm cancelled_with_error = 0;

  // Closure run once the call stack has been destroyed; see destroy_call().
  grpc_closure release_call;

  // Output locations for the final op; which member is active depends on
  // is_client.
  union {
    struct {
      grpc_status_code* status;
      grpc_slice* status_details;
      const char** error_string;
    } client;
    struct {
      int* cancelled;
      // backpointer to owning server if this is a server side call.
      grpc_core::Server* core_server;
    } server;
  } final_op;
  // A grpc_error* stored as a gpr_atm.
  gpr_atm status_error = 0;

  /* recv_state can contain one of the following values:
     RECV_NONE :                 :  no initial metadata and messages received
     RECV_INITIAL_METADATA_FIRST :  received initial metadata first
     a batch_control*            :  received messages first

                 +------1------RECV_NONE------3-----+
                 |                                  |
                 |                                  |
                 v                                  v
     RECV_INITIAL_METADATA_FIRST        receiving_stream_ready_bctlp
           |           ^                      |           ^
           |           |                      |           |
           +-----2-----+                      +-----4-----+

    For 1, 4: See receiving_initial_metadata_ready() function
    For 2, 3: See receiving_stream_ready() function */
  gpr_atm recv_state = 0;
};
271 
// Trace flags registered under the names "call_error" and "compression". Both
// are constructed with `false` (presumably: disabled unless turned on).
grpc_core::TraceFlag grpc_call_error_trace(false, "call_error");
grpc_core::TraceFlag grpc_compression_trace(false, "compression");
274 
// A grpc_call and its call stack share one allocation: the call stack begins
// sizeof(grpc_call), rounded up to the alignment size, bytes after the call.
// The two macros below convert between the two pointers.
//
// Each expansion is wrapped in an outer pair of parentheses so that the cast
// applies to the complete pointer expression even when the macro is followed
// by a postfix operator, e.g. CALL_STACK_FROM_CALL(c)->count.
#define CALL_STACK_FROM_CALL(call)    \
  ((grpc_call_stack*)((char*)(call) + \
                      GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(grpc_call))))
#define CALL_FROM_CALL_STACK(call_stack) \
  ((grpc_call*)(((char*)(call_stack)) -  \
                GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(grpc_call))))

// Returns element `idx` of the call stack that belongs to `call`.
#define CALL_ELEM_FROM_CALL(call, idx) \
  grpc_call_stack_element(CALL_STACK_FROM_CALL(call), idx)
// Recovers the owning grpc_call from the top element of its call stack.
#define CALL_FROM_TOP_ELEM(top_elem) \
  CALL_FROM_CALL_STACK(grpc_call_stack_from_top_element(top_elem))
286 
// Forward declarations of file-local helpers that are referenced before their
// definitions.
static void execute_batch(grpc_call* call,
                          grpc_transport_stream_op_batch* batch,
                          grpc_closure* start_batch_closure);

static void cancel_with_status(grpc_call* c, grpc_status_code status,
                               const char* description);
static void cancel_with_error(grpc_call* c, grpc_error* error);
static void destroy_call(void* call_stack, grpc_error* error);
static void receiving_slice_ready(void* bctlp, grpc_error* error);
static void set_final_status(grpc_call* call, grpc_error* error);
static void process_data_after_md(batch_control* bctl);
static void post_batch_completion(batch_control* bctl);
299 
add_init_error(grpc_error ** composite,grpc_error * new_err)300 static void add_init_error(grpc_error** composite, grpc_error* new_err) {
301   if (new_err == GRPC_ERROR_NONE) return;
302   if (*composite == GRPC_ERROR_NONE) {
303     *composite = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Call creation failed");
304   }
305   *composite = grpc_error_add_child(*composite, new_err);
306 }
307 
grpc_call_arena_alloc(grpc_call * call,size_t size)308 void* grpc_call_arena_alloc(grpc_call* call, size_t size) {
309   return call->arena->Alloc(size);
310 }
311 
get_or_create_parent_call(grpc_call * call)312 static parent_call* get_or_create_parent_call(grpc_call* call) {
313   parent_call* p =
314       reinterpret_cast<parent_call*>(gpr_atm_acq_load(&call->parent_call_atm));
315   if (p == nullptr) {
316     p = call->arena->New<parent_call>();
317     if (!gpr_atm_rel_cas(&call->parent_call_atm,
318                          reinterpret_cast<gpr_atm>(nullptr),
319                          reinterpret_cast<gpr_atm>(p))) {
320       p->~parent_call();
321       p = reinterpret_cast<parent_call*>(
322           gpr_atm_acq_load(&call->parent_call_atm));
323     }
324   }
325   return p;
326 }
327 
get_parent_call(grpc_call * call)328 static parent_call* get_parent_call(grpc_call* call) {
329   return reinterpret_cast<parent_call*>(
330       gpr_atm_acq_load(&call->parent_call_atm));
331 }
332 
grpc_call_get_initial_size_estimate()333 size_t grpc_call_get_initial_size_estimate() {
334   return sizeof(grpc_call) + sizeof(batch_control) * MAX_CONCURRENT_BATCHES +
335          sizeof(grpc_linked_mdelem) * ESTIMATED_MDELEM_COUNT;
336 }
337 
grpc_call_create(const grpc_call_create_args * args,grpc_call ** out_call)338 grpc_error* grpc_call_create(const grpc_call_create_args* args,
339                              grpc_call** out_call) {
340   GPR_TIMER_SCOPE("grpc_call_create", 0);
341 
342   GRPC_CHANNEL_INTERNAL_REF(args->channel, "call");
343 
344   grpc_core::Arena* arena;
345   grpc_call* call;
346   grpc_error* error = GRPC_ERROR_NONE;
347   grpc_channel_stack* channel_stack =
348       grpc_channel_get_channel_stack(args->channel);
349   size_t initial_size = grpc_channel_get_call_size_estimate(args->channel);
350   GRPC_STATS_INC_CALL_INITIAL_SIZE(initial_size);
351   size_t call_and_stack_size =
352       GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(grpc_call)) +
353       channel_stack->call_stack_size;
354   size_t call_alloc_size =
355       call_and_stack_size + (args->parent ? sizeof(child_call) : 0);
356 
357   std::pair<grpc_core::Arena*, void*> arena_with_call =
358       grpc_core::Arena::CreateWithAlloc(initial_size, call_alloc_size);
359   arena = arena_with_call.first;
360   call = new (arena_with_call.second) grpc_call(arena, *args);
361   *out_call = call;
362   grpc_slice path = grpc_empty_slice();
363   if (call->is_client) {
364     call->final_op.client.status_details = nullptr;
365     call->final_op.client.status = nullptr;
366     call->final_op.client.error_string = nullptr;
367     GRPC_STATS_INC_CLIENT_CALLS_CREATED();
368     GPR_ASSERT(args->add_initial_metadata_count <
369                MAX_SEND_EXTRA_METADATA_COUNT);
370     for (size_t i = 0; i < args->add_initial_metadata_count; i++) {
371       call->send_extra_metadata[i].md = args->add_initial_metadata[i];
372       if (grpc_slice_eq_static_interned(
373               GRPC_MDKEY(args->add_initial_metadata[i]), GRPC_MDSTR_PATH)) {
374         path = grpc_slice_ref_internal(
375             GRPC_MDVALUE(args->add_initial_metadata[i]));
376       }
377     }
378     call->send_extra_metadata_count =
379         static_cast<int>(args->add_initial_metadata_count);
380   } else {
381     GRPC_STATS_INC_SERVER_CALLS_CREATED();
382     call->final_op.server.cancelled = nullptr;
383     call->final_op.server.core_server = args->server;
384     GPR_ASSERT(args->add_initial_metadata_count == 0);
385     call->send_extra_metadata_count = 0;
386   }
387 
388   grpc_millis send_deadline = args->send_deadline;
389   bool immediately_cancel = false;
390 
391   if (args->parent != nullptr) {
392     call->child = new (reinterpret_cast<char*>(arena_with_call.second) +
393                        call_and_stack_size) child_call(args->parent);
394 
395     GRPC_CALL_INTERNAL_REF(args->parent, "child");
396     GPR_ASSERT(call->is_client);
397     GPR_ASSERT(!args->parent->is_client);
398 
399     if (args->propagation_mask & GRPC_PROPAGATE_DEADLINE) {
400       send_deadline = GPR_MIN(send_deadline, args->parent->send_deadline);
401     }
402     /* for now GRPC_PROPAGATE_TRACING_CONTEXT *MUST* be passed with
403      * GRPC_PROPAGATE_STATS_CONTEXT */
404     /* TODO(ctiller): This should change to use the appropriate census start_op
405      * call. */
406     if (args->propagation_mask & GRPC_PROPAGATE_CENSUS_TRACING_CONTEXT) {
407       if (0 == (args->propagation_mask & GRPC_PROPAGATE_CENSUS_STATS_CONTEXT)) {
408         add_init_error(&error, GRPC_ERROR_CREATE_FROM_STATIC_STRING(
409                                    "Census tracing propagation requested "
410                                    "without Census context propagation"));
411       }
412       grpc_call_context_set(call, GRPC_CONTEXT_TRACING,
413                             args->parent->context[GRPC_CONTEXT_TRACING].value,
414                             nullptr);
415     } else if (args->propagation_mask & GRPC_PROPAGATE_CENSUS_STATS_CONTEXT) {
416       add_init_error(&error, GRPC_ERROR_CREATE_FROM_STATIC_STRING(
417                                  "Census context propagation requested "
418                                  "without Census tracing propagation"));
419     }
420     if (args->propagation_mask & GRPC_PROPAGATE_CANCELLATION) {
421       call->cancellation_is_inherited = true;
422       if (gpr_atm_acq_load(&args->parent->received_final_op_atm)) {
423         immediately_cancel = true;
424       }
425     }
426   }
427   call->send_deadline = send_deadline;
428   /* initial refcount dropped by grpc_call_unref */
429   grpc_call_element_args call_args = {CALL_STACK_FROM_CALL(call),
430                                       args->server_transport_data,
431                                       call->context,
432                                       path,
433                                       call->start_time,
434                                       send_deadline,
435                                       call->arena,
436                                       &call->call_combiner};
437   add_init_error(&error, grpc_call_stack_init(channel_stack, 1, destroy_call,
438                                               call, &call_args));
439   // Publish this call to parent only after the call stack has been initialized.
440   if (args->parent != nullptr) {
441     child_call* cc = call->child;
442     parent_call* pc = get_or_create_parent_call(args->parent);
443     gpr_mu_lock(&pc->child_list_mu);
444     if (pc->first_child == nullptr) {
445       pc->first_child = call;
446       cc->sibling_next = cc->sibling_prev = call;
447     } else {
448       cc->sibling_next = pc->first_child;
449       cc->sibling_prev = pc->first_child->child->sibling_prev;
450       cc->sibling_next->child->sibling_prev =
451           cc->sibling_prev->child->sibling_next = call;
452     }
453     gpr_mu_unlock(&pc->child_list_mu);
454   }
455 
456   if (error != GRPC_ERROR_NONE) {
457     cancel_with_error(call, GRPC_ERROR_REF(error));
458   }
459   if (immediately_cancel) {
460     cancel_with_error(call, GRPC_ERROR_CANCELLED);
461   }
462   if (args->cq != nullptr) {
463     GPR_ASSERT(args->pollset_set_alternative == nullptr &&
464                "Only one of 'cq' and 'pollset_set_alternative' should be "
465                "non-nullptr.");
466     GRPC_CQ_INTERNAL_REF(args->cq, "bind");
467     call->pollent =
468         grpc_polling_entity_create_from_pollset(grpc_cq_pollset(args->cq));
469   }
470   if (args->pollset_set_alternative != nullptr) {
471     call->pollent = grpc_polling_entity_create_from_pollset_set(
472         args->pollset_set_alternative);
473   }
474   if (!grpc_polling_entity_is_empty(&call->pollent)) {
475     grpc_call_stack_set_pollset_or_pollset_set(CALL_STACK_FROM_CALL(call),
476                                                &call->pollent);
477   }
478 
479   if (call->is_client) {
480     grpc_core::channelz::ChannelNode* channelz_channel =
481         grpc_channel_get_channelz_node(call->channel);
482     if (channelz_channel != nullptr) {
483       channelz_channel->RecordCallStarted();
484     }
485   } else if (call->final_op.server.core_server != nullptr) {
486     grpc_core::channelz::ServerNode* channelz_node =
487         call->final_op.server.core_server->channelz_node();
488     if (channelz_node != nullptr) {
489       channelz_node->RecordCallStarted();
490     }
491   }
492 
493   grpc_slice_unref_internal(path);
494 
495   return error;
496 }
497 
grpc_call_set_completion_queue(grpc_call * call,grpc_completion_queue * cq)498 void grpc_call_set_completion_queue(grpc_call* call,
499                                     grpc_completion_queue* cq) {
500   GPR_ASSERT(cq);
501 
502   if (grpc_polling_entity_pollset_set(&call->pollent) != nullptr) {
503     gpr_log(GPR_ERROR, "A pollset_set is already registered for this call.");
504     abort();
505   }
506   call->cq = cq;
507   GRPC_CQ_INTERNAL_REF(cq, "bind");
508   call->pollent = grpc_polling_entity_create_from_pollset(grpc_cq_pollset(cq));
509   grpc_call_stack_set_pollset_or_pollset_set(CALL_STACK_FROM_CALL(call),
510                                              &call->pollent);
511 }
512 
// When NDEBUG is not defined, grpc_call_internal_ref()/unref() take an extra
// `const char* reason` parameter (REF_ARG) which is forwarded to the
// call-stack refcount macros (REF_REASON). When NDEBUG is defined the
// parameter is compiled out and an empty string is forwarded instead.
#ifndef NDEBUG
#define REF_REASON reason
#define REF_ARG , const char* reason
#else
#define REF_REASON ""
#define REF_ARG
#endif
grpc_call_internal_ref(grpc_call * c REF_ARG)520 void grpc_call_internal_ref(grpc_call* c REF_ARG) {
521   GRPC_CALL_STACK_REF(CALL_STACK_FROM_CALL(c), REF_REASON);
522 }
grpc_call_internal_unref(grpc_call * c REF_ARG)523 void grpc_call_internal_unref(grpc_call* c REF_ARG) {
524   GRPC_CALL_STACK_UNREF(CALL_STACK_FROM_CALL(c), REF_REASON);
525 }
526 
release_call(void * call,grpc_error *)527 static void release_call(void* call, grpc_error* /*error*/) {
528   grpc_call* c = static_cast<grpc_call*>(call);
529   grpc_channel* channel = c->channel;
530   grpc_core::Arena* arena = c->arena;
531   c->~grpc_call();
532   grpc_channel_update_call_size_estimate(channel, arena->Destroy());
533   GRPC_CHANNEL_INTERNAL_UNREF(channel, "call");
534 }
535 
destroy_call(void * call,grpc_error *)536 static void destroy_call(void* call, grpc_error* /*error*/) {
537   GPR_TIMER_SCOPE("destroy_call", 0);
538   size_t i;
539   int ii;
540   grpc_call* c = static_cast<grpc_call*>(call);
541   for (i = 0; i < 2; i++) {
542     grpc_metadata_batch_destroy(
543         &c->metadata_batch[1 /* is_receiving */][i /* is_initial */]);
544   }
545   c->receiving_stream.reset();
546   parent_call* pc = get_parent_call(c);
547   if (pc != nullptr) {
548     pc->~parent_call();
549   }
550   for (ii = 0; ii < c->send_extra_metadata_count; ii++) {
551     GRPC_MDELEM_UNREF(c->send_extra_metadata[ii].md);
552   }
553   for (i = 0; i < GRPC_CONTEXT_COUNT; i++) {
554     if (c->context[i].destroy) {
555       c->context[i].destroy(c->context[i].value);
556     }
557   }
558   if (c->cq) {
559     GRPC_CQ_INTERNAL_UNREF(c->cq, "bind");
560   }
561 
562   grpc_error* status_error =
563       reinterpret_cast<grpc_error*>(gpr_atm_acq_load(&c->status_error));
564   grpc_error_get_status(status_error, c->send_deadline,
565                         &c->final_info.final_status, nullptr, nullptr,
566                         &(c->final_info.error_string));
567   GRPC_ERROR_UNREF(status_error);
568   c->final_info.stats.latency =
569       gpr_cycle_counter_sub(gpr_get_cycle_counter(), c->start_time);
570   grpc_call_stack_destroy(CALL_STACK_FROM_CALL(c), &c->final_info,
571                           GRPC_CLOSURE_INIT(&c->release_call, release_call, c,
572                                             grpc_schedule_on_exec_ctx));
573 }
574 
grpc_call_ref(grpc_call * c)575 void grpc_call_ref(grpc_call* c) { c->ext_ref.Ref(); }
576 
grpc_call_unref(grpc_call * c)577 void grpc_call_unref(grpc_call* c) {
578   if (GPR_LIKELY(!c->ext_ref.Unref())) return;
579 
580   GPR_TIMER_SCOPE("grpc_call_unref", 0);
581 
582   child_call* cc = c->child;
583   grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
584   grpc_core::ExecCtx exec_ctx;
585 
586   GRPC_API_TRACE("grpc_call_unref(c=%p)", 1, (c));
587 
588   if (cc) {
589     parent_call* pc = get_parent_call(cc->parent);
590     gpr_mu_lock(&pc->child_list_mu);
591     if (c == pc->first_child) {
592       pc->first_child = cc->sibling_next;
593       if (c == pc->first_child) {
594         pc->first_child = nullptr;
595       }
596     }
597     cc->sibling_prev->child->sibling_next = cc->sibling_next;
598     cc->sibling_next->child->sibling_prev = cc->sibling_prev;
599     gpr_mu_unlock(&pc->child_list_mu);
600     GRPC_CALL_INTERNAL_UNREF(cc->parent, "child");
601   }
602 
603   GPR_ASSERT(!c->destroy_called);
604   c->destroy_called = true;
605   bool cancel = gpr_atm_acq_load(&c->any_ops_sent_atm) != 0 &&
606                 gpr_atm_acq_load(&c->received_final_op_atm) == 0;
607   if (cancel) {
608     cancel_with_error(c, GRPC_ERROR_CANCELLED);
609   } else {
610     // Unset the call combiner cancellation closure.  This has the
611     // effect of scheduling the previously set cancellation closure, if
612     // any, so that it can release any internal references it may be
613     // holding to the call stack. Also flush the closures on exec_ctx so that
614     // filters that schedule cancel notification closures on exec_ctx do not
615     // need to take a ref of the call stack to guarantee closure liveness.
616     c->call_combiner.SetNotifyOnCancel(nullptr);
617     grpc_core::ExecCtx::Get()->Flush();
618   }
619   GRPC_CALL_INTERNAL_UNREF(c, "destroy");
620 }
621 
grpc_call_cancel(grpc_call * call,void * reserved)622 grpc_call_error grpc_call_cancel(grpc_call* call, void* reserved) {
623   GRPC_API_TRACE("grpc_call_cancel(call=%p, reserved=%p)", 2, (call, reserved));
624   GPR_ASSERT(!reserved);
625   grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
626   grpc_core::ExecCtx exec_ctx;
627   cancel_with_error(call, GRPC_ERROR_CANCELLED);
628   return GRPC_CALL_OK;
629 }
630 
631 // This is called via the call combiner to start sending a batch down
632 // the filter stack.
execute_batch_in_call_combiner(void * arg,grpc_error *)633 static void execute_batch_in_call_combiner(void* arg, grpc_error* /*ignored*/) {
634   GPR_TIMER_SCOPE("execute_batch_in_call_combiner", 0);
635   grpc_transport_stream_op_batch* batch =
636       static_cast<grpc_transport_stream_op_batch*>(arg);
637   grpc_call* call = static_cast<grpc_call*>(batch->handler_private.extra_arg);
638   grpc_call_element* elem = CALL_ELEM_FROM_CALL(call, 0);
639   GRPC_CALL_LOG_OP(GPR_INFO, elem, batch);
640   elem->filter->start_transport_stream_op_batch(elem, batch);
641 }
642 
643 // start_batch_closure points to a caller-allocated closure to be used
644 // for entering the call combiner.
execute_batch(grpc_call * call,grpc_transport_stream_op_batch * batch,grpc_closure * start_batch_closure)645 static void execute_batch(grpc_call* call,
646                           grpc_transport_stream_op_batch* batch,
647                           grpc_closure* start_batch_closure) {
648   batch->handler_private.extra_arg = call;
649   GRPC_CLOSURE_INIT(start_batch_closure, execute_batch_in_call_combiner, batch,
650                     grpc_schedule_on_exec_ctx);
651   GRPC_CALL_COMBINER_START(&call->call_combiner, start_batch_closure,
652                            GRPC_ERROR_NONE, "executing batch");
653 }
654 
grpc_call_get_peer(grpc_call * call)655 char* grpc_call_get_peer(grpc_call* call) {
656   char* peer_string =
657       reinterpret_cast<char*>(gpr_atm_acq_load(&call->peer_string));
658   if (peer_string != nullptr) return gpr_strdup(peer_string);
659   peer_string = grpc_channel_get_target(call->channel);
660   if (peer_string != nullptr) return peer_string;
661   return gpr_strdup("unknown");
662 }
663 
grpc_call_from_top_element(grpc_call_element * surface_element)664 grpc_call* grpc_call_from_top_element(grpc_call_element* surface_element) {
665   return CALL_FROM_TOP_ELEM(surface_element);
666 }
667 
668 /*******************************************************************************
669  * CANCELLATION
670  */
671 
grpc_call_cancel_with_status(grpc_call * c,grpc_status_code status,const char * description,void * reserved)672 grpc_call_error grpc_call_cancel_with_status(grpc_call* c,
673                                              grpc_status_code status,
674                                              const char* description,
675                                              void* reserved) {
676   grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
677   grpc_core::ExecCtx exec_ctx;
678   GRPC_API_TRACE(
679       "grpc_call_cancel_with_status("
680       "c=%p, status=%d, description=%s, reserved=%p)",
681       4, (c, (int)status, description, reserved));
682   GPR_ASSERT(reserved == nullptr);
683   cancel_with_status(c, status, description);
684   return GRPC_CALL_OK;
685 }
686 
// State for one cancel_stream batch. Allocated with gpr_malloc() in
// cancel_with_error() and released with gpr_free() in done_termination().
struct cancel_state {
  // The call being cancelled.
  grpc_call* call;
  // Closure used by execute_batch() to enter the call combiner.
  grpc_closure start_batch;
  // on_complete closure of the cancel_stream batch; runs done_termination().
  grpc_closure finish_batch;
};
692 // The on_complete callback used when sending a cancel_stream batch down
693 // the filter stack.  Yields the call combiner when the batch is done.
done_termination(void * arg,grpc_error *)694 static void done_termination(void* arg, grpc_error* /*error*/) {
695   cancel_state* state = static_cast<cancel_state*>(arg);
696   GRPC_CALL_COMBINER_STOP(&state->call->call_combiner,
697                           "on_complete for cancel_stream op");
698   GRPC_CALL_INTERNAL_UNREF(state->call, "termination");
699   gpr_free(state);
700 }
701 
cancel_with_error(grpc_call * c,grpc_error * error)702 static void cancel_with_error(grpc_call* c, grpc_error* error) {
703   if (!gpr_atm_rel_cas(&c->cancelled_with_error, 0, 1)) {
704     GRPC_ERROR_UNREF(error);
705     return;
706   }
707   GRPC_CALL_INTERNAL_REF(c, "termination");
708   // Inform the call combiner of the cancellation, so that it can cancel
709   // any in-flight asynchronous actions that may be holding the call
710   // combiner.  This ensures that the cancel_stream batch can be sent
711   // down the filter stack in a timely manner.
712   c->call_combiner.Cancel(GRPC_ERROR_REF(error));
713   cancel_state* state = static_cast<cancel_state*>(gpr_malloc(sizeof(*state)));
714   state->call = c;
715   GRPC_CLOSURE_INIT(&state->finish_batch, done_termination, state,
716                     grpc_schedule_on_exec_ctx);
717   grpc_transport_stream_op_batch* op =
718       grpc_make_transport_stream_op(&state->finish_batch);
719   op->cancel_stream = true;
720   op->payload->cancel_stream.cancel_error = error;
721   execute_batch(c, op, &state->start_batch);
722 }
723 
grpc_call_cancel_internal(grpc_call * call)724 void grpc_call_cancel_internal(grpc_call* call) {
725   cancel_with_error(call, GRPC_ERROR_CANCELLED);
726 }
727 
error_from_status(grpc_status_code status,const char * description)728 static grpc_error* error_from_status(grpc_status_code status,
729                                      const char* description) {
730   // copying 'description' is needed to ensure the grpc_call_cancel_with_status
731   // guarantee that can be short-lived.
732   return grpc_error_set_int(
733       grpc_error_set_str(GRPC_ERROR_CREATE_FROM_COPIED_STRING(description),
734                          GRPC_ERROR_STR_GRPC_MESSAGE,
735                          grpc_slice_from_copied_string(description)),
736       GRPC_ERROR_INT_GRPC_STATUS, status);
737 }
738 
cancel_with_status(grpc_call * c,grpc_status_code status,const char * description)739 static void cancel_with_status(grpc_call* c, grpc_status_code status,
740                                const char* description) {
741   cancel_with_error(c, error_from_status(status, description));
742 }
743 
// Records the final outcome of `call` from `error` and updates channelz
// success/failure counters when a channelz node is available.
//
// The reference to `error` passed in is consumed: on the client side the
// pointer is stored in call->status_error, on the server side it is unreffed
// before returning.
static void set_final_status(grpc_call* call, grpc_error* error) {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_call_error_trace)) {
    gpr_log(GPR_DEBUG, "set_final_status %s", call->is_client ? "CLI" : "SVR");
    gpr_log(GPR_DEBUG, "%s", grpc_error_string(error));
  }
  if (call->is_client) {
    // Fill in the application-provided outputs for the final client op
    // (status code, status details and error string).
    grpc_error_get_status(error, call->send_deadline,
                          call->final_op.client.status,
                          call->final_op.client.status_details, nullptr,
                          call->final_op.client.error_string);
    // explicitly take a ref
    grpc_slice_ref_internal(*call->final_op.client.status_details);
    // Publish the error pointer; it is read back with an acquire load
    // elsewhere (e.g. in the server branch below).
    gpr_atm_rel_store(&call->status_error, reinterpret_cast<gpr_atm>(error));
    grpc_core::channelz::ChannelNode* channelz_channel =
        grpc_channel_get_channelz_node(call->channel);
    if (channelz_channel != nullptr) {
      if (*call->final_op.client.status != GRPC_STATUS_OK) {
        channelz_channel->RecordCallFailed();
      } else {
        channelz_channel->RecordCallSucceeded();
      }
    }
  } else {
    // A server call counts as cancelled if an error was reported or if the
    // server never sent trailing metadata.
    *call->final_op.server.cancelled =
        error != GRPC_ERROR_NONE || !call->sent_server_trailing_metadata;
    grpc_core::channelz::ServerNode* channelz_node =
        call->final_op.server.core_server->channelz_node();
    if (channelz_node != nullptr) {
      // For channelz, a call also counts as failed when a non-OK
      // status_error has been recorded on the call.
      if (*call->final_op.server.cancelled ||
          reinterpret_cast<grpc_error*>(
              gpr_atm_acq_load(&call->status_error)) != GRPC_ERROR_NONE) {
        channelz_node->RecordCallFailed();
      } else {
        channelz_node->RecordCallSucceeded();
      }
    }
    GRPC_ERROR_UNREF(error);
  }
}
783 
784 /*******************************************************************************
785  * COMPRESSION
786  */
787 
// Stores `algo` as the message compression algorithm of incoming data on
// `call`. Asserts that `algo` is below
// GRPC_MESSAGE_COMPRESS_ALGORITHMS_COUNT.
static void set_incoming_message_compression_algorithm(
    grpc_call* call, grpc_message_compression_algorithm algo) {
  GPR_ASSERT(algo < GRPC_MESSAGE_COMPRESS_ALGORITHMS_COUNT);
  call->incoming_message_compression_algorithm = algo;
}
793 
// Stores `algo` as the stream compression algorithm of incoming data on
// `call`. Asserts that `algo` is below GRPC_STREAM_COMPRESS_ALGORITHMS_COUNT.
static void set_incoming_stream_compression_algorithm(
    grpc_call* call, grpc_stream_compression_algorithm algo) {
  GPR_ASSERT(algo < GRPC_STREAM_COMPRESS_ALGORITHMS_COUNT);
  call->incoming_stream_compression_algorithm = algo;
}
799 
grpc_call_test_only_get_compression_algorithm(grpc_call * call)800 grpc_compression_algorithm grpc_call_test_only_get_compression_algorithm(
801     grpc_call* call) {
802   grpc_compression_algorithm algorithm = GRPC_COMPRESS_NONE;
803   grpc_compression_algorithm_from_message_stream_compression_algorithm(
804       &algorithm, call->incoming_message_compression_algorithm,
805       call->incoming_stream_compression_algorithm);
806   return algorithm;
807 }
808 
compression_algorithm_for_level_locked(grpc_call * call,grpc_compression_level level)809 static grpc_compression_algorithm compression_algorithm_for_level_locked(
810     grpc_call* call, grpc_compression_level level) {
811   return grpc_compression_algorithm_for_level(level,
812                                               call->encodings_accepted_by_peer);
813 }
814 
grpc_call_test_only_get_message_flags(grpc_call * call)815 uint32_t grpc_call_test_only_get_message_flags(grpc_call* call) {
816   uint32_t flags;
817   flags = call->test_only_last_message_flags;
818   return flags;
819 }
820 
// Intentionally empty destroy callback for the mdelem user data written by
// set_encodings_accepted_by_peer(). The stored "pointer" is an integer bitset
// packed into a void*, so there is nothing to free. The function is also
// passed to grpc_mdelem_get_user_data() when that value is looked up.
static void destroy_encodings_accepted_by_peer(void* /*p*/) {}
822 
set_encodings_accepted_by_peer(grpc_call *,grpc_mdelem mdel,uint32_t * encodings_accepted_by_peer,bool stream_encoding)823 static void set_encodings_accepted_by_peer(grpc_call* /*call*/,
824                                            grpc_mdelem mdel,
825                                            uint32_t* encodings_accepted_by_peer,
826                                            bool stream_encoding) {
827   size_t i;
828   uint32_t algorithm;
829   grpc_slice_buffer accept_encoding_parts;
830   grpc_slice accept_encoding_slice;
831   void* accepted_user_data;
832 
833   accepted_user_data =
834       grpc_mdelem_get_user_data(mdel, destroy_encodings_accepted_by_peer);
835   if (accepted_user_data != nullptr) {
836     *encodings_accepted_by_peer = static_cast<uint32_t>(
837         reinterpret_cast<uintptr_t>(accepted_user_data) - 1);
838     return;
839   }
840 
841   *encodings_accepted_by_peer = 0;
842 
843   accept_encoding_slice = GRPC_MDVALUE(mdel);
844   grpc_slice_buffer_init(&accept_encoding_parts);
845   grpc_slice_split_without_space(accept_encoding_slice, ",",
846                                  &accept_encoding_parts);
847 
848   GPR_BITSET(encodings_accepted_by_peer, GRPC_COMPRESS_NONE);
849   for (i = 0; i < accept_encoding_parts.count; i++) {
850     int r;
851     grpc_slice accept_encoding_entry_slice = accept_encoding_parts.slices[i];
852     if (!stream_encoding) {
853       r = grpc_message_compression_algorithm_parse(
854           accept_encoding_entry_slice,
855           reinterpret_cast<grpc_message_compression_algorithm*>(&algorithm));
856     } else {
857       r = grpc_stream_compression_algorithm_parse(
858           accept_encoding_entry_slice,
859           reinterpret_cast<grpc_stream_compression_algorithm*>(&algorithm));
860     }
861     if (r) {
862       GPR_BITSET(encodings_accepted_by_peer, algorithm);
863     } else {
864       char* accept_encoding_entry_str =
865           grpc_slice_to_c_string(accept_encoding_entry_slice);
866       gpr_log(GPR_DEBUG,
867               "Unknown entry in accept encoding metadata: '%s'. Ignoring.",
868               accept_encoding_entry_str);
869       gpr_free(accept_encoding_entry_str);
870     }
871   }
872 
873   grpc_slice_buffer_destroy_internal(&accept_encoding_parts);
874 
875   grpc_mdelem_set_user_data(
876       mdel, destroy_encodings_accepted_by_peer,
877       reinterpret_cast<void*>(
878           static_cast<uintptr_t>(*encodings_accepted_by_peer) + 1));
879 }
880 
grpc_call_test_only_get_encodings_accepted_by_peer(grpc_call * call)881 uint32_t grpc_call_test_only_get_encodings_accepted_by_peer(grpc_call* call) {
882   uint32_t encodings_accepted_by_peer;
883   encodings_accepted_by_peer = call->encodings_accepted_by_peer;
884   return encodings_accepted_by_peer;
885 }
886 
887 grpc_stream_compression_algorithm
grpc_call_test_only_get_incoming_stream_encodings(grpc_call * call)888 grpc_call_test_only_get_incoming_stream_encodings(grpc_call* call) {
889   return call->incoming_stream_compression_algorithm;
890 }
891 
linked_from_md(grpc_metadata * md)892 static grpc_linked_mdelem* linked_from_md(grpc_metadata* md) {
893   return reinterpret_cast<grpc_linked_mdelem*>(&md->internal_data);
894 }
895 
get_md_elem(grpc_metadata * metadata,grpc_metadata * additional_metadata,int i,int count)896 static grpc_metadata* get_md_elem(grpc_metadata* metadata,
897                                   grpc_metadata* additional_metadata, int i,
898                                   int count) {
899   grpc_metadata* res =
900       i < count ? &metadata[i] : &additional_metadata[i - count];
901   GPR_ASSERT(res);
902   return res;
903 }
904 
// Validates application-supplied metadata and links it into the call's
// outgoing metadata batch (initial or trailing, selected by `is_trailing`).
//
// `metadata` (`count` entries) and `additional_metadata`
// (`additional_metadata_count` entries) are processed as one sequence, in
// that order. When `prepend_extra_metadata` is non-zero, the call's
// send_extra_metadata entries are linked before the application metadata.
//
// Returns 1 on success. Returns 0 if any entry fails validation; in that
// case the mdelems already created for earlier entries are unreffed and
// nothing is linked into the batch.
static int prepare_application_metadata(grpc_call* call, int count,
                                        grpc_metadata* metadata,
                                        int is_trailing,
                                        int prepend_extra_metadata,
                                        grpc_metadata* additional_metadata,
                                        int additional_metadata_count) {
  int total_count = count + additional_metadata_count;
  int i;
  grpc_metadata_batch* batch =
      &call->metadata_batch[0 /* is_receiving */][is_trailing];
  // Phase 1: validate every entry and create its mdelem. The loop stops at
  // the first invalid entry, leaving i < total_count.
  for (i = 0; i < total_count; i++) {
    grpc_metadata* md = get_md_elem(metadata, additional_metadata, i, count);
    grpc_linked_mdelem* l = linked_from_md(md);
    // linked_from_md() reinterprets md->internal_data, so the sizes must
    // match.
    GPR_ASSERT(sizeof(grpc_linked_mdelem) == sizeof(md->internal_data));
    if (!GRPC_LOG_IF_ERROR("validate_metadata",
                           grpc_validate_header_key_is_legal(md->key))) {
      break;
    } else if (!grpc_is_binary_header_internal(md->key) &&
               !GRPC_LOG_IF_ERROR(
                   "validate_metadata",
                   grpc_validate_header_nonbin_value_is_legal(md->value))) {
      break;
    } else if (GRPC_SLICE_LENGTH(md->value) >= UINT32_MAX) {
      // HTTP2 hpack encoding has a maximum limit.
      break;
    }
    l->md = grpc_mdelem_from_grpc_metadata(const_cast<grpc_metadata*>(md));
  }
  // Validation failed part-way: release the mdelems created so far.
  if (i != total_count) {
    for (int j = 0; j < i; j++) {
      grpc_metadata* md = get_md_elem(metadata, additional_metadata, j, count);
      grpc_linked_mdelem* l = linked_from_md(md);
      GRPC_MDELEM_UNREF(l->md);
    }
    return 0;
  }
  // Phase 2: optionally link the call's extra metadata ahead of the
  // application metadata. Link errors are logged but do not fail the call.
  if (prepend_extra_metadata) {
    if (call->send_extra_metadata_count == 0) {
      prepend_extra_metadata = 0;
    } else {
      for (i = 0; i < call->send_extra_metadata_count; i++) {
        GRPC_LOG_IF_ERROR("prepare_application_metadata",
                          grpc_metadata_batch_link_tail(
                              batch, &call->send_extra_metadata[i]));
      }
    }
  }
  // Phase 3: link the application metadata. An element that fails to link
  // is unreffed and the error is logged; processing continues.
  for (i = 0; i < total_count; i++) {
    grpc_metadata* md = get_md_elem(metadata, additional_metadata, i, count);
    grpc_linked_mdelem* l = linked_from_md(md);
    grpc_error* error = grpc_metadata_batch_link_tail(batch, l);
    if (error != GRPC_ERROR_NONE) {
      GRPC_MDELEM_UNREF(l->md);
    }
    GRPC_LOG_IF_ERROR("prepare_application_metadata", error);
  }
  // The extra-metadata count is reset on every successful return, whether
  // or not the extra metadata was linked above.
  call->send_extra_metadata_count = 0;

  return 1;
}
965 
decode_message_compression(grpc_mdelem md)966 static grpc_message_compression_algorithm decode_message_compression(
967     grpc_mdelem md) {
968   grpc_message_compression_algorithm algorithm =
969       grpc_message_compression_algorithm_from_slice(GRPC_MDVALUE(md));
970   if (algorithm == GRPC_MESSAGE_COMPRESS_ALGORITHMS_COUNT) {
971     char* md_c_str = grpc_slice_to_c_string(GRPC_MDVALUE(md));
972     gpr_log(GPR_ERROR,
973             "Invalid incoming message compression algorithm: '%s'. "
974             "Interpreting incoming data as uncompressed.",
975             md_c_str);
976     gpr_free(md_c_str);
977     return GRPC_MESSAGE_COMPRESS_NONE;
978   }
979   return algorithm;
980 }
981 
decode_stream_compression(grpc_mdelem md)982 static grpc_stream_compression_algorithm decode_stream_compression(
983     grpc_mdelem md) {
984   grpc_stream_compression_algorithm algorithm =
985       grpc_stream_compression_algorithm_from_slice(GRPC_MDVALUE(md));
986   if (algorithm == GRPC_STREAM_COMPRESS_ALGORITHMS_COUNT) {
987     char* md_c_str = grpc_slice_to_c_string(GRPC_MDVALUE(md));
988     gpr_log(GPR_ERROR,
989             "Invalid incoming stream compression algorithm: '%s'. Interpreting "
990             "incoming data as uncompressed.",
991             md_c_str);
992     gpr_free(md_c_str);
993     return GRPC_STREAM_COMPRESS_NONE;
994   }
995   return algorithm;
996 }
997 
publish_app_metadata(grpc_call * call,grpc_metadata_batch * b,int is_trailing)998 static void publish_app_metadata(grpc_call* call, grpc_metadata_batch* b,
999                                  int is_trailing) {
1000   if (b->list.count == 0) return;
1001   if (!call->is_client && is_trailing) return;
1002   if (is_trailing && call->buffered_metadata[1] == nullptr) return;
1003   GPR_TIMER_SCOPE("publish_app_metadata", 0);
1004   grpc_metadata_array* dest;
1005   grpc_metadata* mdusr;
1006   dest = call->buffered_metadata[is_trailing];
1007   if (dest->count + b->list.count > dest->capacity) {
1008     dest->capacity =
1009         GPR_MAX(dest->capacity + b->list.count, dest->capacity * 3 / 2);
1010     dest->metadata = static_cast<grpc_metadata*>(
1011         gpr_realloc(dest->metadata, sizeof(grpc_metadata) * dest->capacity));
1012   }
1013   for (grpc_linked_mdelem* l = b->list.head; l != nullptr; l = l->next) {
1014     mdusr = &dest->metadata[dest->count++];
1015     /* we pass back borrowed slices that are valid whilst the call is valid */
1016     mdusr->key = GRPC_MDKEY(l->md);
1017     mdusr->value = GRPC_MDVALUE(l->md);
1018   }
1019 }
1020 
recv_initial_filter(grpc_call * call,grpc_metadata_batch * b)1021 static void recv_initial_filter(grpc_call* call, grpc_metadata_batch* b) {
1022   if (b->idx.named.content_encoding != nullptr) {
1023     GPR_TIMER_SCOPE("incoming_stream_compression_algorithm", 0);
1024     set_incoming_stream_compression_algorithm(
1025         call, decode_stream_compression(b->idx.named.content_encoding->md));
1026     grpc_metadata_batch_remove(b, GRPC_BATCH_CONTENT_ENCODING);
1027   }
1028   if (b->idx.named.grpc_encoding != nullptr) {
1029     GPR_TIMER_SCOPE("incoming_message_compression_algorithm", 0);
1030     set_incoming_message_compression_algorithm(
1031         call, decode_message_compression(b->idx.named.grpc_encoding->md));
1032     grpc_metadata_batch_remove(b, GRPC_BATCH_GRPC_ENCODING);
1033   }
1034   uint32_t message_encodings_accepted_by_peer = 1u;
1035   uint32_t stream_encodings_accepted_by_peer = 1u;
1036   if (b->idx.named.grpc_accept_encoding != nullptr) {
1037     GPR_TIMER_SCOPE("encodings_accepted_by_peer", 0);
1038     set_encodings_accepted_by_peer(call, b->idx.named.grpc_accept_encoding->md,
1039                                    &message_encodings_accepted_by_peer, false);
1040     grpc_metadata_batch_remove(b, GRPC_BATCH_GRPC_ACCEPT_ENCODING);
1041   }
1042   if (b->idx.named.accept_encoding != nullptr) {
1043     GPR_TIMER_SCOPE("stream_encodings_accepted_by_peer", 0);
1044     set_encodings_accepted_by_peer(call, b->idx.named.accept_encoding->md,
1045                                    &stream_encodings_accepted_by_peer, true);
1046     grpc_metadata_batch_remove(b, GRPC_BATCH_ACCEPT_ENCODING);
1047   }
1048   call->encodings_accepted_by_peer =
1049       grpc_compression_bitset_from_message_stream_compression_bitset(
1050           message_encodings_accepted_by_peer,
1051           stream_encodings_accepted_by_peer);
1052   publish_app_metadata(call, b, false);
1053 }
1054 
// Processes received trailing metadata for the call passed as `args`
// (a grpc_call*): determines the call's final status, removes the
// grpc-status / grpc-message entries from `b`, and publishes the remaining
// trailing metadata to the application.
//
// The final status is chosen in this order:
//   1. `batch_error`, if it is set;
//   2. the grpc-status (and optional grpc-message) found in `b`;
//   3. no error, on the server side;
//   4. an UNKNOWN "No status received" error, on the client side.
static void recv_trailing_filter(void* args, grpc_metadata_batch* b,
                                 grpc_error* batch_error) {
  grpc_call* call = static_cast<grpc_call*>(args);
  if (batch_error != GRPC_ERROR_NONE) {
    // set_final_status consumes the reference it is given.
    set_final_status(call, batch_error);
  } else if (b->idx.named.grpc_status != nullptr) {
    grpc_status_code status_code =
        grpc_get_status_code_from_metadata(b->idx.named.grpc_status->md);
    grpc_error* error = GRPC_ERROR_NONE;
    if (status_code != GRPC_STATUS_OK) {
      // Non-OK status from the peer: build an error naming the peer and
      // carrying the received status code.
      char* peer = grpc_call_get_peer(call);
      error = grpc_error_set_int(
          GRPC_ERROR_CREATE_FROM_COPIED_STRING(
              absl::StrCat("Error received from peer ", peer).c_str()),
          GRPC_ERROR_INT_GRPC_STATUS, static_cast<intptr_t>(status_code));
      gpr_free(peer);
    }
    if (b->idx.named.grpc_message != nullptr) {
      // Attach the peer-supplied message and drop it from the batch so it
      // is not published as application metadata.
      error = grpc_error_set_str(
          error, GRPC_ERROR_STR_GRPC_MESSAGE,
          grpc_slice_ref_internal(GRPC_MDVALUE(b->idx.named.grpc_message->md)));
      grpc_metadata_batch_remove(b, GRPC_BATCH_GRPC_MESSAGE);
    } else if (error != GRPC_ERROR_NONE) {
      // Non-OK status without a message: record an empty message.
      error = grpc_error_set_str(error, GRPC_ERROR_STR_GRPC_MESSAGE,
                                 grpc_empty_slice());
    }
    // Hand set_final_status its own reference; the local one is released
    // below.
    set_final_status(call, GRPC_ERROR_REF(error));
    grpc_metadata_batch_remove(b, GRPC_BATCH_GRPC_STATUS);
    GRPC_ERROR_UNREF(error);
  } else if (!call->is_client) {
    set_final_status(call, GRPC_ERROR_NONE);
  } else {
    gpr_log(GPR_DEBUG,
            "Received trailing metadata with no error and no status");
    set_final_status(
        call, grpc_error_set_int(
                  GRPC_ERROR_CREATE_FROM_STATIC_STRING("No status received"),
                  GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNKNOWN));
  }
  publish_app_metadata(call, b, true);
}
1096 
// Returns the arena stored on `call`.
grpc_core::Arena* grpc_call_get_arena(grpc_call* call) { return call->arena; }
1098 
grpc_call_get_call_stack(grpc_call * call)1099 grpc_call_stack* grpc_call_get_call_stack(grpc_call* call) {
1100   return CALL_STACK_FROM_CALL(call);
1101 }
1102 
1103 /*******************************************************************************
1104  * BATCH API IMPLEMENTATION
1105  */
1106 
are_write_flags_valid(uint32_t flags)1107 static bool are_write_flags_valid(uint32_t flags) {
1108   /* check that only bits in GRPC_WRITE_(INTERNAL?)_USED_MASK are set */
1109   const uint32_t allowed_write_positions =
1110       (GRPC_WRITE_USED_MASK | GRPC_WRITE_INTERNAL_USED_MASK);
1111   const uint32_t invalid_positions = ~allowed_write_positions;
1112   return !(flags & invalid_positions);
1113 }
1114 
are_initial_metadata_flags_valid(uint32_t flags,bool is_client)1115 static bool are_initial_metadata_flags_valid(uint32_t flags, bool is_client) {
1116   /* check that only bits in GRPC_WRITE_(INTERNAL?)_USED_MASK are set */
1117   uint32_t invalid_positions = ~GRPC_INITIAL_METADATA_USED_MASK;
1118   if (!is_client) {
1119     invalid_positions |= GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST;
1120   }
1121   return !(flags & invalid_positions);
1122 }
1123 
// Maps an op type to the index of the batch slot it uses (0..5). The two
// "close"/"status" ops on the send side share slot 2, and the two on the
// receive side share slot 5.
static size_t batch_slot_for_op(grpc_op_type type) {
  switch (type) {
    case GRPC_OP_SEND_INITIAL_METADATA:
      return 0;
    case GRPC_OP_SEND_MESSAGE:
      return 1;
    case GRPC_OP_SEND_CLOSE_FROM_CLIENT:
    case GRPC_OP_SEND_STATUS_FROM_SERVER:
      return 2;
    case GRPC_OP_RECV_INITIAL_METADATA:
      return 3;
    case GRPC_OP_RECV_MESSAGE:
      return 4;
    case GRPC_OP_RECV_CLOSE_ON_SERVER:
    case GRPC_OP_RECV_STATUS_ON_CLIENT:
      return 5;
  }
  // Only reached if `type` is not one of the enumerators handled above.
  GPR_UNREACHABLE_CODE(return 123456789);
}
1143 
// Returns a batch_control for a new batch whose first op is ops[0], or
// nullptr if the batch slot for that op type is still occupied by a batch
// in progress.
//
// Each slot in call->active_batches holds at most one batch_control, which
// is allocated from the call arena on first use and reused afterwards.
static batch_control* reuse_or_allocate_batch_control(grpc_call* call,
                                                      const grpc_op* ops) {
  // The slot is chosen from the first op of the batch only.
  size_t slot_idx = batch_slot_for_op(ops[0].op);
  batch_control** pslot = &call->active_batches[slot_idx];
  batch_control* bctl;
  if (*pslot != nullptr) {
    bctl = *pslot;
    // A non-null call pointer marks the batch_control as still in use (it
    // is cleared when the batch completes).
    if (bctl->call != nullptr) {
      return nullptr;
    }
    // Reuse the existing storage: run the destructor and reset the op.
    // NOTE(review): `op` is assigned after the destructor has run on the
    // object — confirm this is intended rather than re-constructing.
    bctl->~batch_control();
    bctl->op = {};
  } else {
    bctl = call->arena->New<batch_control>();
    *pslot = bctl;
  }
  bctl->call = call;
  bctl->op.payload = &call->stream_op_payload;
  return bctl;
}
1164 
finish_batch_completion(void * user_data,grpc_cq_completion *)1165 static void finish_batch_completion(void* user_data,
1166                                     grpc_cq_completion* /*storage*/) {
1167   batch_control* bctl = static_cast<batch_control*>(user_data);
1168   grpc_call* call = bctl->call;
1169   bctl->call = nullptr;
1170   GRPC_CALL_INTERNAL_UNREF(call, "completion");
1171 }
1172 
reset_batch_errors(batch_control * bctl)1173 static void reset_batch_errors(batch_control* bctl) {
1174   GRPC_ERROR_UNREF(
1175       reinterpret_cast<grpc_error*>(gpr_atm_acq_load(&bctl->batch_error)));
1176   gpr_atm_rel_store(&bctl->batch_error,
1177                     reinterpret_cast<gpr_atm>(GRPC_ERROR_NONE));
1178 }
1179 
// Finishes a batch once all of its steps have completed: releases per-op
// resources, decides the error to report, and notifies the application
// either by running the notification closure or by posting to the call's
// completion queue.
static void post_batch_completion(batch_control* bctl) {
  grpc_call* next_child_call;
  grpc_call* call = bctl->call;
  // Take our own reference to the batch error; the stored one is released
  // by reset_batch_errors() below.
  grpc_error* error = GRPC_ERROR_REF(
      reinterpret_cast<grpc_error*>(gpr_atm_acq_load(&bctl->batch_error)));

  if (bctl->op.send_initial_metadata) {
    grpc_metadata_batch_destroy(
        &call->metadata_batch[0 /* is_receiving */][0 /* is_trailing */]);
  }
  if (bctl->op.send_message) {
    // A send that hit an already-closed stream is reported as an error even
    // if the batch itself recorded none.
    if (bctl->op.payload->send_message.stream_write_closed &&
        error == GRPC_ERROR_NONE) {
      error = grpc_error_add_child(
          error, GRPC_ERROR_CREATE_FROM_STATIC_STRING(
                     "Attempt to send message after stream was closed."));
    }
    call->sending_message = false;
  }
  if (bctl->op.send_trailing_metadata) {
    grpc_metadata_batch_destroy(
        &call->metadata_batch[0 /* is_receiving */][1 /* is_trailing */]);
  }
  if (bctl->op.recv_trailing_metadata) {
    /* propagate cancellation to any interested children */
    gpr_atm_rel_store(&call->received_final_op_atm, 1);
    parent_call* pc = get_parent_call(call);
    if (pc != nullptr) {
      grpc_call* child;
      gpr_mu_lock(&pc->child_list_mu);
      child = pc->first_child;
      if (child != nullptr) {
        // The children form a circular list; walk it once, starting and
        // ending at first_child.
        do {
          // Fetch the next sibling before touching the current child.
          next_child_call = child->child->sibling_next;
          if (child->cancellation_is_inherited) {
            // Hold a ref on the child for the duration of the cancel call.
            GRPC_CALL_INTERNAL_REF(child, "propagate_cancel");
            cancel_with_error(child, GRPC_ERROR_CANCELLED);
            GRPC_CALL_INTERNAL_UNREF(child, "propagate_cancel");
          }
          child = next_child_call;
        } while (child != pc->first_child);
      }
      gpr_mu_unlock(&pc->child_list_mu);
    }
    // A batch that received trailing metadata is always reported to the
    // application without an error.
    GRPC_ERROR_UNREF(error);
    error = GRPC_ERROR_NONE;
  }
  // On failure, do not hand a received message buffer to the application.
  if (error != GRPC_ERROR_NONE && bctl->op.recv_message &&
      *call->receiving_buffer != nullptr) {
    grpc_byte_buffer_destroy(*call->receiving_buffer);
    *call->receiving_buffer = nullptr;
  }
  reset_batch_errors(bctl);

  if (bctl->completion_data.notify_tag.is_closure) {
    /* unrefs error */
    // Clear bctl->call before running the closure; `call` keeps the pointer
    // for the unref below.
    bctl->call = nullptr;
    grpc_core::Closure::Run(
        DEBUG_LOCATION,
        static_cast<grpc_closure*>(bctl->completion_data.notify_tag.tag),
        error);
    GRPC_CALL_INTERNAL_UNREF(call, "completion");
  } else {
    /* unrefs error */
    // finish_batch_completion clears bctl->call and drops the "completion"
    // ref.
    grpc_cq_end_op(bctl->call->cq, bctl->completion_data.notify_tag.tag, error,
                   finish_batch_completion, bctl,
                   &bctl->completion_data.cq_completion);
  }
}
1249 
finish_batch_step(batch_control * bctl)1250 static void finish_batch_step(batch_control* bctl) {
1251   if (GPR_UNLIKELY(bctl->completed_batch_step())) {
1252     post_batch_completion(bctl);
1253   }
1254 }
1255 
continue_receiving_slices(batch_control * bctl)1256 static void continue_receiving_slices(batch_control* bctl) {
1257   grpc_error* error;
1258   grpc_call* call = bctl->call;
1259   for (;;) {
1260     size_t remaining = call->receiving_stream->length() -
1261                        (*call->receiving_buffer)->data.raw.slice_buffer.length;
1262     if (remaining == 0) {
1263       call->receiving_message = false;
1264       call->receiving_stream.reset();
1265       finish_batch_step(bctl);
1266       return;
1267     }
1268     if (call->receiving_stream->Next(remaining, &call->receiving_slice_ready)) {
1269       error = call->receiving_stream->Pull(&call->receiving_slice);
1270       if (error == GRPC_ERROR_NONE) {
1271         grpc_slice_buffer_add(&(*call->receiving_buffer)->data.raw.slice_buffer,
1272                               call->receiving_slice);
1273       } else {
1274         call->receiving_stream.reset();
1275         grpc_byte_buffer_destroy(*call->receiving_buffer);
1276         *call->receiving_buffer = nullptr;
1277         call->receiving_message = false;
1278         finish_batch_step(bctl);
1279         GRPC_ERROR_UNREF(error);
1280         return;
1281       }
1282     } else {
1283       return;
1284     }
1285   }
1286 }
1287 
// Closure callback for call->receiving_slice_ready; `bctlp` is the
// batch_control of the batch receiving the message. On success it pulls the
// next slice and resumes continue_receiving_slices(); on failure it discards
// the partially received message and finishes the batch step.
static void receiving_slice_ready(void* bctlp, grpc_error* error) {
  batch_control* bctl = static_cast<batch_control*>(bctlp);
  grpc_call* call = bctl->call;
  // True only when `error` below was produced by Pull() (and is therefore
  // owned by this function) rather than passed in by the caller.
  bool release_error = false;

  if (error == GRPC_ERROR_NONE) {
    grpc_slice slice;
    error = call->receiving_stream->Pull(&slice);
    if (error == GRPC_ERROR_NONE) {
      grpc_slice_buffer_add(&(*call->receiving_buffer)->data.raw.slice_buffer,
                            slice);
      continue_receiving_slices(bctl);
    } else {
      /* Error returned by ByteStream::Pull() needs to be released manually */
      release_error = true;
    }
  }

  // Reached with a non-OK error either from the caller or from Pull() above.
  if (error != GRPC_ERROR_NONE) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_trace_operation_failures)) {
      GRPC_LOG_IF_ERROR("receiving_slice_ready", GRPC_ERROR_REF(error));
    }
    call->receiving_stream.reset();
    grpc_byte_buffer_destroy(*call->receiving_buffer);
    *call->receiving_buffer = nullptr;
    call->receiving_message = false;
    finish_batch_step(bctl);
    if (release_error) {
      GRPC_ERROR_UNREF(error);
    }
  }
}
1320 
process_data_after_md(batch_control * bctl)1321 static void process_data_after_md(batch_control* bctl) {
1322   grpc_call* call = bctl->call;
1323   if (call->receiving_stream == nullptr) {
1324     *call->receiving_buffer = nullptr;
1325     call->receiving_message = false;
1326     finish_batch_step(bctl);
1327   } else {
1328     call->test_only_last_message_flags = call->receiving_stream->flags();
1329     if ((call->receiving_stream->flags() & GRPC_WRITE_INTERNAL_COMPRESS) &&
1330         (call->incoming_message_compression_algorithm >
1331          GRPC_MESSAGE_COMPRESS_NONE)) {
1332       grpc_compression_algorithm algo;
1333       GPR_ASSERT(
1334           grpc_compression_algorithm_from_message_stream_compression_algorithm(
1335               &algo, call->incoming_message_compression_algorithm,
1336               (grpc_stream_compression_algorithm)0));
1337       *call->receiving_buffer =
1338           grpc_raw_compressed_byte_buffer_create(nullptr, 0, algo);
1339     } else {
1340       *call->receiving_buffer = grpc_raw_byte_buffer_create(nullptr, 0);
1341     }
1342     GRPC_CLOSURE_INIT(&call->receiving_slice_ready, receiving_slice_ready, bctl,
1343                       grpc_schedule_on_exec_ctx);
1344     continue_receiving_slices(bctl);
1345   }
1346 }
1347 
// Called when the incoming message stream for a recv_message op is ready
// (or has failed); `bctlp` is the batch_control of that batch.
//
// On error the stream is dropped, the error is recorded as the batch error
// if none was recorded yet, and the call is cancelled. Message processing
// then either starts right away or is deferred by parking the batch_control
// pointer in call->recv_state (see the comment below).
static void receiving_stream_ready(void* bctlp, grpc_error* error) {
  batch_control* bctl = static_cast<batch_control*>(bctlp);
  grpc_call* call = bctl->call;
  if (error != GRPC_ERROR_NONE) {
    call->receiving_stream.reset();
    // Keep the first error recorded for the batch; do not overwrite it.
    if (reinterpret_cast<grpc_error*>(gpr_atm_acq_load(&bctl->batch_error)) ==
        GRPC_ERROR_NONE) {
      gpr_atm_rel_store(&bctl->batch_error,
                        reinterpret_cast<gpr_atm>(GRPC_ERROR_REF(error)));
    }
    // cancel_with_error consumes a reference, so give it its own.
    cancel_with_error(call, GRPC_ERROR_REF(error));
  }
  /* If recv_state is RECV_NONE, we will save the batch_control
   * object with rel_cas, and will not use it after the cas. Its corresponding
   * acq_load is in receiving_initial_metadata_ready() */
  // Process immediately when there is an error, when there is no stream, or
  // when the CAS fails because recv_state is no longer RECV_NONE.
  if (error != GRPC_ERROR_NONE || call->receiving_stream == nullptr ||
      !gpr_atm_rel_cas(&call->recv_state, RECV_NONE,
                       reinterpret_cast<gpr_atm>(bctlp))) {
    process_data_after_md(bctl);
  }
}
1369 
1370 // The recv_message_ready callback used when sending a batch containing
1371 // a recv_message op down the filter stack.  Yields the call combiner
1372 // before processing the received message.
receiving_stream_ready_in_call_combiner(void * bctlp,grpc_error * error)1373 static void receiving_stream_ready_in_call_combiner(void* bctlp,
1374                                                     grpc_error* error) {
1375   batch_control* bctl = static_cast<batch_control*>(bctlp);
1376   grpc_call* call = bctl->call;
1377   GRPC_CALL_COMBINER_STOP(&call->call_combiner, "recv_message_ready");
1378   receiving_stream_ready(bctlp, error);
1379 }
1380 
1381 static void GPR_ATTRIBUTE_NOINLINE
handle_both_stream_and_msg_compression_set(grpc_call * call)1382 handle_both_stream_and_msg_compression_set(grpc_call* call) {
1383   std::string error_msg = absl::StrFormat(
1384       "Incoming stream has both stream compression (%d) and message "
1385       "compression (%d).",
1386       call->incoming_stream_compression_algorithm,
1387       call->incoming_message_compression_algorithm);
1388   gpr_log(GPR_ERROR, "%s", error_msg.c_str());
1389   cancel_with_status(call, GRPC_STATUS_INTERNAL, error_msg.c_str());
1390 }
1391 
1392 static void GPR_ATTRIBUTE_NOINLINE
handle_error_parsing_compression_algorithm(grpc_call * call)1393 handle_error_parsing_compression_algorithm(grpc_call* call) {
1394   std::string error_msg = absl::StrFormat(
1395       "Error in incoming message compression (%d) or stream "
1396       "compression (%d).",
1397       call->incoming_stream_compression_algorithm,
1398       call->incoming_message_compression_algorithm);
1399   cancel_with_status(call, GRPC_STATUS_INTERNAL, error_msg.c_str());
1400 }
1401 
handle_invalid_compression(grpc_call * call,grpc_compression_algorithm compression_algorithm)1402 static void GPR_ATTRIBUTE_NOINLINE handle_invalid_compression(
1403     grpc_call* call, grpc_compression_algorithm compression_algorithm) {
1404   std::string error_msg = absl::StrFormat(
1405       "Invalid compression algorithm value '%d'.", compression_algorithm);
1406   gpr_log(GPR_ERROR, "%s", error_msg.c_str());
1407   cancel_with_status(call, GRPC_STATUS_UNIMPLEMENTED, error_msg.c_str());
1408 }
1409 
handle_compression_algorithm_disabled(grpc_call * call,grpc_compression_algorithm compression_algorithm)1410 static void GPR_ATTRIBUTE_NOINLINE handle_compression_algorithm_disabled(
1411     grpc_call* call, grpc_compression_algorithm compression_algorithm) {
1412   const char* algo_name = nullptr;
1413   grpc_compression_algorithm_name(compression_algorithm, &algo_name);
1414   std::string error_msg =
1415       absl::StrFormat("Compression algorithm '%s' is disabled.", algo_name);
1416   gpr_log(GPR_ERROR, "%s", error_msg.c_str());
1417   cancel_with_status(call, GRPC_STATUS_UNIMPLEMENTED, error_msg.c_str());
1418 }
1419 
handle_compression_algorithm_not_accepted(grpc_call * call,grpc_compression_algorithm compression_algorithm)1420 static void GPR_ATTRIBUTE_NOINLINE handle_compression_algorithm_not_accepted(
1421     grpc_call* call, grpc_compression_algorithm compression_algorithm) {
1422   const char* algo_name = nullptr;
1423   grpc_compression_algorithm_name(compression_algorithm, &algo_name);
1424   gpr_log(GPR_ERROR,
1425           "Compression algorithm ('%s') not present in the bitset of "
1426           "accepted encodings ('0x%x')",
1427           algo_name, call->encodings_accepted_by_peer);
1428 }
1429 
validate_filtered_metadata(batch_control * bctl)1430 static void validate_filtered_metadata(batch_control* bctl) {
1431   grpc_compression_algorithm compression_algorithm;
1432   grpc_call* call = bctl->call;
1433   if (GPR_UNLIKELY(call->incoming_stream_compression_algorithm !=
1434                        GRPC_STREAM_COMPRESS_NONE &&
1435                    call->incoming_message_compression_algorithm !=
1436                        GRPC_MESSAGE_COMPRESS_NONE)) {
1437     handle_both_stream_and_msg_compression_set(call);
1438   } else if (
1439       GPR_UNLIKELY(
1440           grpc_compression_algorithm_from_message_stream_compression_algorithm(
1441               &compression_algorithm,
1442               call->incoming_message_compression_algorithm,
1443               call->incoming_stream_compression_algorithm) == 0)) {
1444     handle_error_parsing_compression_algorithm(call);
1445   } else {
1446     const grpc_compression_options compression_options =
1447         grpc_channel_compression_options(call->channel);
1448     if (GPR_UNLIKELY(compression_algorithm >= GRPC_COMPRESS_ALGORITHMS_COUNT)) {
1449       handle_invalid_compression(call, compression_algorithm);
1450     } else if (GPR_UNLIKELY(
1451                    grpc_compression_options_is_algorithm_enabled_internal(
1452                        &compression_options, compression_algorithm) == 0)) {
1453       /* check if algorithm is supported by current channel config */
1454       handle_compression_algorithm_disabled(call, compression_algorithm);
1455     }
1456     /* GRPC_COMPRESS_NONE is always set. */
1457     GPR_DEBUG_ASSERT(call->encodings_accepted_by_peer != 0);
1458     if (GPR_UNLIKELY(!GPR_BITGET(call->encodings_accepted_by_peer,
1459                                  compression_algorithm))) {
1460       if (GRPC_TRACE_FLAG_ENABLED(grpc_compression_trace)) {
1461         handle_compression_algorithm_not_accepted(call, compression_algorithm);
1462       }
1463     }
1464   }
1465 }
1466 
receiving_initial_metadata_ready(void * bctlp,grpc_error * error)1467 static void receiving_initial_metadata_ready(void* bctlp, grpc_error* error) {
1468   batch_control* bctl = static_cast<batch_control*>(bctlp);
1469   grpc_call* call = bctl->call;
1470 
1471   GRPC_CALL_COMBINER_STOP(&call->call_combiner, "recv_initial_metadata_ready");
1472 
1473   if (error == GRPC_ERROR_NONE) {
1474     grpc_metadata_batch* md =
1475         &call->metadata_batch[1 /* is_receiving */][0 /* is_trailing */];
1476     recv_initial_filter(call, md);
1477 
1478     /* TODO(ctiller): this could be moved into recv_initial_filter now */
1479     GPR_TIMER_SCOPE("validate_filtered_metadata", 0);
1480     validate_filtered_metadata(bctl);
1481 
1482     if (md->deadline != GRPC_MILLIS_INF_FUTURE && !call->is_client) {
1483       call->send_deadline = md->deadline;
1484     }
1485   } else {
1486     if (reinterpret_cast<grpc_error*>(gpr_atm_acq_load(&bctl->batch_error)) ==
1487         GRPC_ERROR_NONE) {
1488       gpr_atm_rel_store(&bctl->batch_error,
1489                         reinterpret_cast<gpr_atm>(GRPC_ERROR_REF(error)));
1490     }
1491     cancel_with_error(call, GRPC_ERROR_REF(error));
1492   }
1493 
1494   grpc_closure* saved_rsr_closure = nullptr;
1495   while (true) {
1496     gpr_atm rsr_bctlp = gpr_atm_acq_load(&call->recv_state);
1497     /* Should only receive initial metadata once */
1498     GPR_ASSERT(rsr_bctlp != 1);
1499     if (rsr_bctlp == 0) {
1500       /* We haven't seen initial metadata and messages before, thus initial
1501        * metadata is received first.
1502        * no_barrier_cas is used, as this function won't access the batch_control
1503        * object saved by receiving_stream_ready() if the initial metadata is
1504        * received first. */
1505       if (gpr_atm_no_barrier_cas(&call->recv_state, RECV_NONE,
1506                                  RECV_INITIAL_METADATA_FIRST)) {
1507         break;
1508       }
1509     } else {
1510       /* Already received messages */
1511       saved_rsr_closure =
1512           GRPC_CLOSURE_CREATE(receiving_stream_ready, (batch_control*)rsr_bctlp,
1513                               grpc_schedule_on_exec_ctx);
1514       /* No need to modify recv_state */
1515       break;
1516     }
1517   }
1518   if (saved_rsr_closure != nullptr) {
1519     grpc_core::Closure::Run(DEBUG_LOCATION, saved_rsr_closure,
1520                             GRPC_ERROR_REF(error));
1521   }
1522 
1523   finish_batch_step(bctl);
1524 }
1525 
receiving_trailing_metadata_ready(void * bctlp,grpc_error * error)1526 static void receiving_trailing_metadata_ready(void* bctlp, grpc_error* error) {
1527   batch_control* bctl = static_cast<batch_control*>(bctlp);
1528   grpc_call* call = bctl->call;
1529   GRPC_CALL_COMBINER_STOP(&call->call_combiner, "recv_trailing_metadata_ready");
1530   grpc_metadata_batch* md =
1531       &call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */];
1532   recv_trailing_filter(call, md, GRPC_ERROR_REF(error));
1533   finish_batch_step(bctl);
1534 }
1535 
finish_batch(void * bctlp,grpc_error * error)1536 static void finish_batch(void* bctlp, grpc_error* error) {
1537   batch_control* bctl = static_cast<batch_control*>(bctlp);
1538   grpc_call* call = bctl->call;
1539   GRPC_CALL_COMBINER_STOP(&call->call_combiner, "on_complete");
1540   if (reinterpret_cast<grpc_error*>(gpr_atm_acq_load(&bctl->batch_error)) ==
1541       GRPC_ERROR_NONE) {
1542     gpr_atm_rel_store(&bctl->batch_error,
1543                       reinterpret_cast<gpr_atm>(GRPC_ERROR_REF(error)));
1544   }
1545   if (error != GRPC_ERROR_NONE) {
1546     cancel_with_error(call, GRPC_ERROR_REF(error));
1547   }
1548   finish_batch_step(bctl);
1549 }
1550 
/* Completion-queue "done" callback used by call_start_batch for an empty
 * batch: frees the grpc_cq_completion that was gpr_malloc'ed for that
 * completion. The first argument is unused. */
static void free_no_op_completion(void* /*p*/, grpc_cq_completion* completion) {
  gpr_free(completion);
}
1554 
/* Validates the `nops` operations in `ops`, rewrites them into a single
 * transport stream op batch (bctl->op plus call->stream_op_payload) and hands
 * that batch to execute_batch().
 *
 * `notify_tag` identifies the completion: when `is_notify_tag_closure` is
 * zero it is a tag for call->cq, otherwise it is a grpc_closure* to run.
 *
 * An empty batch (nops == 0) completes immediately and returns GRPC_CALL_OK.
 * GRPC_CALL_ERROR_TOO_MANY_OPERATIONS is returned when no batch_control can
 * be obtained for the batch. Any other validation failure jumps to
 * done_with_error, which resets the call flags set while processing this
 * batch and returns the specific error; nothing is sent to the transport in
 * that case. */
static grpc_call_error call_start_batch(grpc_call* call, const grpc_op* ops,
                                        size_t nops, void* notify_tag,
                                        int is_notify_tag_closure) {
  GPR_TIMER_SCOPE("call_start_batch", 0);

  size_t i;
  const grpc_op* op;
  batch_control* bctl;
  bool has_send_ops = false;
  int num_recv_ops = 0;
  grpc_call_error error = GRPC_CALL_OK;
  grpc_transport_stream_op_batch* stream_op;
  grpc_transport_stream_op_batch_payload* stream_op_payload;

  GRPC_CALL_LOG_BATCH(GPR_INFO, ops, nops);

  /* Empty batch: there is nothing to send, so signal completion right away,
   * either through the completion queue or by running the closure. */
  if (nops == 0) {
    if (!is_notify_tag_closure) {
      GPR_ASSERT(grpc_cq_begin_op(call->cq, notify_tag));
      grpc_cq_end_op(call->cq, notify_tag, GRPC_ERROR_NONE,
                     free_no_op_completion, nullptr,
                     static_cast<grpc_cq_completion*>(
                         gpr_malloc(sizeof(grpc_cq_completion))));
    } else {
      grpc_core::Closure::Run(DEBUG_LOCATION,
                              static_cast<grpc_closure*>(notify_tag),
                              GRPC_ERROR_NONE);
    }
    error = GRPC_CALL_OK;
    goto done;
  }

  bctl = reuse_or_allocate_batch_control(call, ops);
  if (bctl == nullptr) {
    return GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
  }
  bctl->completion_data.notify_tag.tag = notify_tag;
  bctl->completion_data.notify_tag.is_closure =
      static_cast<uint8_t>(is_notify_tag_closure != 0);

  stream_op = &bctl->op;
  stream_op_payload = &call->stream_op_payload;

  /* rewrite batch ops into a transport op */
  for (i = 0; i < nops; i++) {
    op = &ops[i];
    if (op->reserved != nullptr) {
      error = GRPC_CALL_ERROR;
      goto done_with_error;
    }
    switch (op->op) {
      case GRPC_OP_SEND_INITIAL_METADATA: {
        /* Flag validation: currently allow no flags */
        if (!are_initial_metadata_flags_valid(op->flags, call->is_client)) {
          error = GRPC_CALL_ERROR_INVALID_FLAGS;
          goto done_with_error;
        }
        if (call->sent_initial_metadata) {
          error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
          goto done_with_error;
        }
        // TODO(juanlishen): If the user has already specified a compression
        // algorithm by setting the initial metadata with key of
        // GRPC_COMPRESSION_REQUEST_ALGORITHM_MD_KEY, we shouldn't override that
        // with the compression algorithm mapped from compression level.
        /* process compression level */
        grpc_metadata& compression_md = call->compression_md;
        compression_md.key = grpc_empty_slice();
        compression_md.value = grpc_empty_slice();
        compression_md.flags = 0;
        size_t additional_metadata_count = 0;
        grpc_compression_level effective_compression_level =
            GRPC_COMPRESS_LEVEL_NONE;
        bool level_set = false;
        /* A level set on the op takes precedence over the channel default. */
        if (op->data.send_initial_metadata.maybe_compression_level.is_set) {
          effective_compression_level =
              op->data.send_initial_metadata.maybe_compression_level.level;
          level_set = true;
        } else {
          const grpc_compression_options copts =
              grpc_channel_compression_options(call->channel);
          if (copts.default_level.is_set) {
            level_set = true;
            effective_compression_level = copts.default_level.level;
          }
        }
        // Currently, only server side supports compression level setting.
        if (level_set && !call->is_client) {
          const grpc_compression_algorithm calgo =
              compression_algorithm_for_level_locked(
                  call, effective_compression_level);
          // The following metadata will be checked and removed by the message
          // compression filter. It will be used as the call's compression
          // algorithm.
          compression_md.key = GRPC_MDSTR_GRPC_INTERNAL_ENCODING_REQUEST;
          compression_md.value = grpc_compression_algorithm_slice(calgo);
          additional_metadata_count++;
        }
        /* The counts are narrowed to int below, so reject anything larger. */
        if (op->data.send_initial_metadata.count + additional_metadata_count >
            INT_MAX) {
          error = GRPC_CALL_ERROR_INVALID_METADATA;
          goto done_with_error;
        }
        stream_op->send_initial_metadata = true;
        call->sent_initial_metadata = true;
        if (!prepare_application_metadata(
                call, static_cast<int>(op->data.send_initial_metadata.count),
                op->data.send_initial_metadata.metadata, 0, call->is_client,
                &compression_md, static_cast<int>(additional_metadata_count))) {
          error = GRPC_CALL_ERROR_INVALID_METADATA;
          goto done_with_error;
        }
        /* TODO(ctiller): just make these the same variable? */
        if (call->is_client) {
          call->metadata_batch[0][0].deadline = call->send_deadline;
        }
        stream_op_payload->send_initial_metadata.send_initial_metadata =
            &call->metadata_batch[0 /* is_receiving */][0 /* is_trailing */];
        stream_op_payload->send_initial_metadata.send_initial_metadata_flags =
            op->flags;
        if (call->is_client) {
          stream_op_payload->send_initial_metadata.peer_string =
              &call->peer_string;
        }
        has_send_ops = true;
        break;
      }
      case GRPC_OP_SEND_MESSAGE: {
        if (!are_write_flags_valid(op->flags)) {
          error = GRPC_CALL_ERROR_INVALID_FLAGS;
          goto done_with_error;
        }
        if (op->data.send_message.send_message == nullptr) {
          error = GRPC_CALL_ERROR_INVALID_MESSAGE;
          goto done_with_error;
        }
        if (call->sending_message) {
          error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
          goto done_with_error;
        }
        uint32_t flags = op->flags;
        /* If the outgoing buffer is already compressed, mark it as so in the
           flags. These will be picked up by the compression filter and further
           (wasteful) attempts at compression skipped. */
        if (op->data.send_message.send_message->data.raw.compression >
            GRPC_COMPRESS_NONE) {
          flags |= GRPC_WRITE_INTERNAL_COMPRESS;
        }
        stream_op->send_message = true;
        call->sending_message = true;
        call->sending_stream.Init(
            &op->data.send_message.send_message->data.raw.slice_buffer, flags);
        stream_op_payload->send_message.send_message.reset(
            call->sending_stream.get());
        has_send_ops = true;
        break;
      }
      case GRPC_OP_SEND_CLOSE_FROM_CLIENT: {
        /* Flag validation: currently allow no flags */
        if (op->flags != 0) {
          error = GRPC_CALL_ERROR_INVALID_FLAGS;
          goto done_with_error;
        }
        if (!call->is_client) {
          error = GRPC_CALL_ERROR_NOT_ON_SERVER;
          goto done_with_error;
        }
        if (call->sent_final_op) {
          error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
          goto done_with_error;
        }
        stream_op->send_trailing_metadata = true;
        call->sent_final_op = true;
        stream_op_payload->send_trailing_metadata.send_trailing_metadata =
            &call->metadata_batch[0 /* is_receiving */][1 /* is_trailing */];
        has_send_ops = true;
        break;
      }
      case GRPC_OP_SEND_STATUS_FROM_SERVER: {
        /* Flag validation: currently allow no flags */
        if (op->flags != 0) {
          error = GRPC_CALL_ERROR_INVALID_FLAGS;
          goto done_with_error;
        }
        if (call->is_client) {
          error = GRPC_CALL_ERROR_NOT_ON_CLIENT;
          goto done_with_error;
        }
        if (call->sent_final_op) {
          error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
          goto done_with_error;
        }
        if (op->data.send_status_from_server.trailing_metadata_count >
            INT_MAX) {
          error = GRPC_CALL_ERROR_INVALID_METADATA;
          goto done_with_error;
        }
        stream_op->send_trailing_metadata = true;
        call->sent_final_op = true;
        GPR_ASSERT(call->send_extra_metadata_count == 0);
        /* Extra metadata slot 0 carries the status code element; slot 1, when
         * status details are supplied, carries the status message. */
        call->send_extra_metadata_count = 1;
        call->send_extra_metadata[0].md = grpc_get_reffed_status_elem(
            op->data.send_status_from_server.status);
        grpc_error* status_error =
            op->data.send_status_from_server.status == GRPC_STATUS_OK
                ? GRPC_ERROR_NONE
                : grpc_error_set_int(
                      GRPC_ERROR_CREATE_FROM_STATIC_STRING(
                          "Server returned error"),
                      GRPC_ERROR_INT_GRPC_STATUS,
                      static_cast<intptr_t>(
                          op->data.send_status_from_server.status));
        if (op->data.send_status_from_server.status_details != nullptr) {
          call->send_extra_metadata[1].md = grpc_mdelem_from_slices(
              GRPC_MDSTR_GRPC_MESSAGE,
              grpc_slice_ref_internal(
                  *op->data.send_status_from_server.status_details));
          call->send_extra_metadata_count++;
          if (status_error != GRPC_ERROR_NONE) {
            char* msg = grpc_slice_to_c_string(
                GRPC_MDVALUE(call->send_extra_metadata[1].md));
            status_error =
                grpc_error_set_str(status_error, GRPC_ERROR_STR_GRPC_MESSAGE,
                                   grpc_slice_from_copied_string(msg));
            gpr_free(msg);
          }
        }

        gpr_atm_rel_store(&call->status_error,
                          reinterpret_cast<gpr_atm>(status_error));
        if (!prepare_application_metadata(
                call,
                static_cast<int>(
                    op->data.send_status_from_server.trailing_metadata_count),
                op->data.send_status_from_server.trailing_metadata, 1, 1,
                nullptr, 0)) {
          /* Drop the extra metadata elements created above before bailing. */
          for (int n = 0; n < call->send_extra_metadata_count; n++) {
            GRPC_MDELEM_UNREF(call->send_extra_metadata[n].md);
          }
          call->send_extra_metadata_count = 0;
          error = GRPC_CALL_ERROR_INVALID_METADATA;
          goto done_with_error;
        }
        stream_op_payload->send_trailing_metadata.send_trailing_metadata =
            &call->metadata_batch[0 /* is_receiving */][1 /* is_trailing */];
        stream_op_payload->send_trailing_metadata.sent =
            &call->sent_server_trailing_metadata;
        has_send_ops = true;
        break;
      }
      case GRPC_OP_RECV_INITIAL_METADATA: {
        /* Flag validation: currently allow no flags */
        if (op->flags != 0) {
          error = GRPC_CALL_ERROR_INVALID_FLAGS;
          goto done_with_error;
        }
        if (call->received_initial_metadata) {
          error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
          goto done_with_error;
        }
        call->received_initial_metadata = true;
        call->buffered_metadata[0] =
            op->data.recv_initial_metadata.recv_initial_metadata;
        GRPC_CLOSURE_INIT(&call->receiving_initial_metadata_ready,
                          receiving_initial_metadata_ready, bctl,
                          grpc_schedule_on_exec_ctx);
        stream_op->recv_initial_metadata = true;
        stream_op_payload->recv_initial_metadata.recv_initial_metadata =
            &call->metadata_batch[1 /* is_receiving */][0 /* is_trailing */];
        stream_op_payload->recv_initial_metadata.recv_initial_metadata_ready =
            &call->receiving_initial_metadata_ready;
        if (!call->is_client) {
          stream_op_payload->recv_initial_metadata.peer_string =
              &call->peer_string;
        }
        ++num_recv_ops;
        break;
      }
      case GRPC_OP_RECV_MESSAGE: {
        /* Flag validation: currently allow no flags */
        if (op->flags != 0) {
          error = GRPC_CALL_ERROR_INVALID_FLAGS;
          goto done_with_error;
        }
        if (call->receiving_message) {
          error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
          goto done_with_error;
        }
        call->receiving_message = true;
        stream_op->recv_message = true;
        call->receiving_buffer = op->data.recv_message.recv_message;
        stream_op_payload->recv_message.recv_message = &call->receiving_stream;
        GRPC_CLOSURE_INIT(&call->receiving_stream_ready,
                          receiving_stream_ready_in_call_combiner, bctl,
                          grpc_schedule_on_exec_ctx);
        stream_op_payload->recv_message.recv_message_ready =
            &call->receiving_stream_ready;
        ++num_recv_ops;
        break;
      }
      case GRPC_OP_RECV_STATUS_ON_CLIENT: {
        /* Flag validation: currently allow no flags */
        if (op->flags != 0) {
          error = GRPC_CALL_ERROR_INVALID_FLAGS;
          goto done_with_error;
        }
        if (!call->is_client) {
          error = GRPC_CALL_ERROR_NOT_ON_SERVER;
          goto done_with_error;
        }
        if (call->requested_final_op) {
          error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
          goto done_with_error;
        }
        call->requested_final_op = true;
        call->buffered_metadata[1] =
            op->data.recv_status_on_client.trailing_metadata;
        call->final_op.client.status = op->data.recv_status_on_client.status;
        call->final_op.client.status_details =
            op->data.recv_status_on_client.status_details;
        call->final_op.client.error_string =
            op->data.recv_status_on_client.error_string;
        stream_op->recv_trailing_metadata = true;
        stream_op_payload->recv_trailing_metadata.recv_trailing_metadata =
            &call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */];
        stream_op_payload->recv_trailing_metadata.collect_stats =
            &call->final_info.stats.transport_stream_stats;
        GRPC_CLOSURE_INIT(&call->receiving_trailing_metadata_ready,
                          receiving_trailing_metadata_ready, bctl,
                          grpc_schedule_on_exec_ctx);
        stream_op_payload->recv_trailing_metadata.recv_trailing_metadata_ready =
            &call->receiving_trailing_metadata_ready;
        ++num_recv_ops;
        break;
      }
      case GRPC_OP_RECV_CLOSE_ON_SERVER: {
        /* Flag validation: currently allow no flags */
        if (op->flags != 0) {
          error = GRPC_CALL_ERROR_INVALID_FLAGS;
          goto done_with_error;
        }
        if (call->is_client) {
          error = GRPC_CALL_ERROR_NOT_ON_CLIENT;
          goto done_with_error;
        }
        if (call->requested_final_op) {
          error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
          goto done_with_error;
        }
        call->requested_final_op = true;
        call->final_op.server.cancelled =
            op->data.recv_close_on_server.cancelled;
        stream_op->recv_trailing_metadata = true;
        stream_op_payload->recv_trailing_metadata.recv_trailing_metadata =
            &call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */];
        stream_op_payload->recv_trailing_metadata.collect_stats =
            &call->final_info.stats.transport_stream_stats;
        GRPC_CLOSURE_INIT(&call->receiving_trailing_metadata_ready,
                          receiving_trailing_metadata_ready, bctl,
                          grpc_schedule_on_exec_ctx);
        stream_op_payload->recv_trailing_metadata.recv_trailing_metadata_ready =
            &call->receiving_trailing_metadata_ready;
        ++num_recv_ops;
        break;
      }
    }
  }

  /* All ops validated: take the "completion" ref on the call and, for a
   * completion-queue tag, register the pending operation with the queue. */
  GRPC_CALL_INTERNAL_REF(call, "completion");
  if (!is_notify_tag_closure) {
    GPR_ASSERT(grpc_cq_begin_op(call->cq, notify_tag));
  }
  /* All send ops together count as one step (they share the single
   * on_complete callback set below); each recv op counts as its own step. */
  bctl->set_num_steps_to_complete((has_send_ops ? 1 : 0) + num_recv_ops);

  if (has_send_ops) {
    GRPC_CLOSURE_INIT(&bctl->finish_batch, finish_batch, bctl,
                      grpc_schedule_on_exec_ctx);
    stream_op->on_complete = &bctl->finish_batch;
  }

  gpr_atm_rel_store(&call->any_ops_sent_atm, 1);
  execute_batch(call, stream_op, &bctl->start_batch);

done:
  return error;

done_with_error:
  /* reverse any mutations that occurred */
  /* The stream_op flags record which op kinds were accepted before the
   * failure, so only the matching call state is rolled back. */
  if (stream_op->send_initial_metadata) {
    call->sent_initial_metadata = false;
    grpc_metadata_batch_clear(&call->metadata_batch[0][0]);
  }
  if (stream_op->send_message) {
    call->sending_message = false;
    call->sending_stream->Orphan();
  }
  if (stream_op->send_trailing_metadata) {
    call->sent_final_op = false;
    grpc_metadata_batch_clear(&call->metadata_batch[0][1]);
  }
  if (stream_op->recv_initial_metadata) {
    call->received_initial_metadata = false;
  }
  if (stream_op->recv_message) {
    call->receiving_message = false;
  }
  if (stream_op->recv_trailing_metadata) {
    call->requested_final_op = false;
  }
  goto done;
}
1966 
grpc_call_start_batch(grpc_call * call,const grpc_op * ops,size_t nops,void * tag,void * reserved)1967 grpc_call_error grpc_call_start_batch(grpc_call* call, const grpc_op* ops,
1968                                       size_t nops, void* tag, void* reserved) {
1969   grpc_call_error err;
1970 
1971   GRPC_API_TRACE(
1972       "grpc_call_start_batch(call=%p, ops=%p, nops=%lu, tag=%p, "
1973       "reserved=%p)",
1974       5, (call, ops, (unsigned long)nops, tag, reserved));
1975 
1976   if (reserved != nullptr) {
1977     err = GRPC_CALL_ERROR;
1978   } else {
1979     grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
1980     grpc_core::ExecCtx exec_ctx;
1981     err = call_start_batch(call, ops, nops, tag, 0);
1982   }
1983 
1984   return err;
1985 }
1986 
/* Starts a batch of `nops` operations on `call` whose completion is reported
 * by running `closure` instead of posting a completion-queue tag (forwards to
 * call_start_batch with is_notify_tag_closure = 1). Unlike
 * grpc_call_start_batch, no ExecCtx is created here.
 * NOTE(review): presumably callers already run under an ExecCtx - confirm. */
grpc_call_error grpc_call_start_batch_and_execute(grpc_call* call,
                                                  const grpc_op* ops,
                                                  size_t nops,
                                                  grpc_closure* closure) {
  return call_start_batch(call, ops, nops, closure, 1);
}
1993 
grpc_call_context_set(grpc_call * call,grpc_context_index elem,void * value,void (* destroy)(void * value))1994 void grpc_call_context_set(grpc_call* call, grpc_context_index elem,
1995                            void* value, void (*destroy)(void* value)) {
1996   if (call->context[elem].destroy) {
1997     call->context[elem].destroy(call->context[elem].value);
1998   }
1999   call->context[elem].value = value;
2000   call->context[elem].destroy = destroy;
2001 }
2002 
/* Returns the value currently stored in the call's context slot `elem`. */
void* grpc_call_context_get(grpc_call* call, grpc_context_index elem) {
  return call->context[elem].value;
}
2006 
/* Returns the call's is_client flag. */
uint8_t grpc_call_is_client(grpc_call* call) { return call->is_client; }
2008 
grpc_call_compression_for_level(grpc_call * call,grpc_compression_level level)2009 grpc_compression_algorithm grpc_call_compression_for_level(
2010     grpc_call* call, grpc_compression_level level) {
2011   grpc_compression_algorithm algo =
2012       compression_algorithm_for_level_locked(call, level);
2013   return algo;
2014 }
2015 
grpc_call_error_to_string(grpc_call_error error)2016 const char* grpc_call_error_to_string(grpc_call_error error) {
2017   switch (error) {
2018     case GRPC_CALL_ERROR:
2019       return "GRPC_CALL_ERROR";
2020     case GRPC_CALL_ERROR_ALREADY_ACCEPTED:
2021       return "GRPC_CALL_ERROR_ALREADY_ACCEPTED";
2022     case GRPC_CALL_ERROR_ALREADY_FINISHED:
2023       return "GRPC_CALL_ERROR_ALREADY_FINISHED";
2024     case GRPC_CALL_ERROR_ALREADY_INVOKED:
2025       return "GRPC_CALL_ERROR_ALREADY_INVOKED";
2026     case GRPC_CALL_ERROR_BATCH_TOO_BIG:
2027       return "GRPC_CALL_ERROR_BATCH_TOO_BIG";
2028     case GRPC_CALL_ERROR_INVALID_FLAGS:
2029       return "GRPC_CALL_ERROR_INVALID_FLAGS";
2030     case GRPC_CALL_ERROR_INVALID_MESSAGE:
2031       return "GRPC_CALL_ERROR_INVALID_MESSAGE";
2032     case GRPC_CALL_ERROR_INVALID_METADATA:
2033       return "GRPC_CALL_ERROR_INVALID_METADATA";
2034     case GRPC_CALL_ERROR_NOT_INVOKED:
2035       return "GRPC_CALL_ERROR_NOT_INVOKED";
2036     case GRPC_CALL_ERROR_NOT_ON_CLIENT:
2037       return "GRPC_CALL_ERROR_NOT_ON_CLIENT";
2038     case GRPC_CALL_ERROR_NOT_ON_SERVER:
2039       return "GRPC_CALL_ERROR_NOT_ON_SERVER";
2040     case GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE:
2041       return "GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE";
2042     case GRPC_CALL_ERROR_PAYLOAD_TYPE_MISMATCH:
2043       return "GRPC_CALL_ERROR_PAYLOAD_TYPE_MISMATCH";
2044     case GRPC_CALL_ERROR_TOO_MANY_OPERATIONS:
2045       return "GRPC_CALL_ERROR_TOO_MANY_OPERATIONS";
2046     case GRPC_CALL_ERROR_COMPLETION_QUEUE_SHUTDOWN:
2047       return "GRPC_CALL_ERROR_COMPLETION_QUEUE_SHUTDOWN";
2048     case GRPC_CALL_OK:
2049       return "GRPC_CALL_OK";
2050   }
2051   GPR_UNREACHABLE_CODE(return "GRPC_CALL_ERROR_UNKNOW");
2052 }
2053