1 /*
2  *
3  * Copyright 2015 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #include <grpc/support/port_platform.h>
20 
21 #include <assert.h>
22 #include <limits.h>
23 #include <stdio.h>
24 #include <stdlib.h>
25 #include <string.h>
26 
27 #include <grpc/compression.h>
28 #include <grpc/grpc.h>
29 #include <grpc/slice.h>
30 #include <grpc/support/alloc.h>
31 #include <grpc/support/log.h>
32 #include <grpc/support/string_util.h>
33 
34 #include "src/core/lib/channel/channel_stack.h"
35 #include "src/core/lib/compression/algorithm_metadata.h"
36 #include "src/core/lib/debug/stats.h"
37 #include "src/core/lib/gpr/alloc.h"
38 #include "src/core/lib/gpr/arena.h"
39 #include "src/core/lib/gpr/string.h"
40 #include "src/core/lib/gpr/useful.h"
41 #include "src/core/lib/gprpp/manual_constructor.h"
42 #include "src/core/lib/iomgr/timer.h"
43 #include "src/core/lib/profiling/timers.h"
44 #include "src/core/lib/slice/slice_internal.h"
45 #include "src/core/lib/slice/slice_string_helpers.h"
46 #include "src/core/lib/surface/api_trace.h"
47 #include "src/core/lib/surface/call.h"
48 #include "src/core/lib/surface/call_test_only.h"
49 #include "src/core/lib/surface/channel.h"
50 #include "src/core/lib/surface/completion_queue.h"
51 #include "src/core/lib/surface/server.h"
52 #include "src/core/lib/surface/validate_metadata.h"
53 #include "src/core/lib/transport/error_utils.h"
54 #include "src/core/lib/transport/metadata.h"
55 #include "src/core/lib/transport/static_metadata.h"
56 #include "src/core/lib/transport/status_metadata.h"
57 #include "src/core/lib/transport/transport.h"
58 
59 /** The maximum number of concurrent batches possible.
60     Based upon the maximum number of individually queueable ops in the batch
61     api:
62       - initial metadata send
63       - message send
64       - status/close send (depending on client/server)
65       - initial metadata recv
66       - message recv
67       - status/close recv (depending on client/server) */
68 #define MAX_CONCURRENT_BATCHES 6
69 
70 #define MAX_SEND_EXTRA_METADATA_COUNT 3
71 
72 // Used to create arena for the first call.
73 #define ESTIMATED_MDELEM_COUNT 16
74 
75 typedef struct batch_control {
76   grpc_call* call;
77   /* Share memory for cq_completion and notify_tag as they are never needed
78      simultaneously. Each byte used in this data structure count as six bytes
79      per call, so any savings we can make are worthwhile,
80 
81      We use notify_tag to determine whether or not to send notification to the
82      completion queue. Once we've made that determination, we can reuse the
83      memory for cq_completion. */
84   union {
85     grpc_cq_completion cq_completion;
86     struct {
87       /* Any given op indicates completion by either (a) calling a closure or
88          (b) sending a notification on the call's completion queue.  If
89          \a is_closure is true, \a tag indicates a closure to be invoked;
90          otherwise, \a tag indicates the tag to be used in the notification to
91          be sent to the completion queue. */
92       void* tag;
93       bool is_closure;
94     } notify_tag;
95   } completion_data;
96   grpc_closure start_batch;
97   grpc_closure finish_batch;
98   gpr_refcount steps_to_complete;
99   gpr_atm batch_error;
100   grpc_transport_stream_op_batch op;
101 } batch_control;
102 
103 typedef struct {
104   gpr_mu child_list_mu;
105   grpc_call* first_child;
106 } parent_call;
107 
108 typedef struct {
109   grpc_call* parent;
110   /** siblings: children of the same parent form a list, and this list is
111      protected under
112       parent->mu */
113   grpc_call* sibling_next;
114   grpc_call* sibling_prev;
115 } child_call;
116 
117 #define RECV_NONE ((gpr_atm)0)
118 #define RECV_INITIAL_METADATA_FIRST ((gpr_atm)1)
119 
120 struct grpc_call {
121   gpr_refcount ext_ref;
122   gpr_arena* arena;
123   grpc_call_combiner call_combiner;
124   grpc_completion_queue* cq;
125   grpc_polling_entity pollent;
126   grpc_channel* channel;
127   gpr_timespec start_time;
128   /* parent_call* */ gpr_atm parent_call_atm;
129   child_call* child;
130 
131   /* client or server call */
132   bool is_client;
133   /** has grpc_call_unref been called */
134   bool destroy_called;
135   /** flag indicating that cancellation is inherited */
136   bool cancellation_is_inherited;
137   /** which ops are in-flight */
138   bool sent_initial_metadata;
139   bool sending_message;
140   bool sent_final_op;
141   bool received_initial_metadata;
142   bool receiving_message;
143   bool requested_final_op;
144   gpr_atm any_ops_sent_atm;
145   gpr_atm received_final_op_atm;
146 
147   batch_control* active_batches[MAX_CONCURRENT_BATCHES];
148   grpc_transport_stream_op_batch_payload stream_op_payload;
149 
150   /* first idx: is_receiving, second idx: is_trailing */
151   grpc_metadata_batch metadata_batch[2][2];
152 
153   /* Buffered read metadata waiting to be returned to the application.
154      Element 0 is initial metadata, element 1 is trailing metadata. */
155   grpc_metadata_array* buffered_metadata[2];
156 
157   grpc_metadata compression_md;
158 
159   // A char* indicating the peer name.
160   gpr_atm peer_string;
161 
162   /* Call data useful used for reporting. Only valid after the call has
163    * completed */
164   grpc_call_final_info final_info;
165 
166   /* Compression algorithm for *incoming* data */
167   grpc_message_compression_algorithm incoming_message_compression_algorithm;
168   /* Stream compression algorithm for *incoming* data */
169   grpc_stream_compression_algorithm incoming_stream_compression_algorithm;
170   /* Supported encodings (compression algorithms), a bitset */
171   uint32_t encodings_accepted_by_peer;
172   /* Supported stream encodings (stream compression algorithms), a bitset */
173   uint32_t stream_encodings_accepted_by_peer;
174 
175   /* Contexts for various subsystems (security, tracing, ...). */
176   grpc_call_context_element context[GRPC_CONTEXT_COUNT];
177 
178   /* for the client, extra metadata is initial metadata; for the
179      server, it's trailing metadata */
180   grpc_linked_mdelem send_extra_metadata[MAX_SEND_EXTRA_METADATA_COUNT];
181   int send_extra_metadata_count;
182   grpc_millis send_deadline;
183 
184   grpc_core::ManualConstructor<grpc_core::SliceBufferByteStream> sending_stream;
185 
186   grpc_core::OrphanablePtr<grpc_core::ByteStream> receiving_stream;
187   grpc_byte_buffer** receiving_buffer;
188   grpc_slice receiving_slice;
189   grpc_closure receiving_slice_ready;
190   grpc_closure receiving_stream_ready;
191   grpc_closure receiving_initial_metadata_ready;
192   grpc_closure receiving_trailing_metadata_ready;
193   uint32_t test_only_last_message_flags;
194   gpr_atm cancelled;
195 
196   grpc_closure release_call;
197 
198   union {
199     struct {
200       grpc_status_code* status;
201       grpc_slice* status_details;
202       const char** error_string;
203     } client;
204     struct {
205       int* cancelled;
206       // backpointer to owning server if this is a server side call.
207       grpc_server* server;
208     } server;
209   } final_op;
210   gpr_atm status_error;
211 
212   /* recv_state can contain one of the following values:
213      RECV_NONE :                 :  no initial metadata and messages received
214      RECV_INITIAL_METADATA_FIRST :  received initial metadata first
215      a batch_control*            :  received messages first
216 
217                  +------1------RECV_NONE------3-----+
218                  |                                  |
219                  |                                  |
220                  v                                  v
221      RECV_INITIAL_METADATA_FIRST        receiving_stream_ready_bctlp
222            |           ^                      |           ^
223            |           |                      |           |
224            +-----2-----+                      +-----4-----+
225 
226     For 1, 4: See receiving_initial_metadata_ready() function
227     For 2, 3: See receiving_stream_ready() function */
228   gpr_atm recv_state;
229 };
230 
231 grpc_core::TraceFlag grpc_call_error_trace(false, "call_error");
232 grpc_core::TraceFlag grpc_compression_trace(false, "compression");
233 
234 #define CALL_STACK_FROM_CALL(call)   \
235   (grpc_call_stack*)((char*)(call) + \
236                      GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(grpc_call)))
237 #define CALL_FROM_CALL_STACK(call_stack) \
238   (grpc_call*)(((char*)(call_stack)) -   \
239                GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(grpc_call)))
240 
241 #define CALL_ELEM_FROM_CALL(call, idx) \
242   grpc_call_stack_element(CALL_STACK_FROM_CALL(call), idx)
243 #define CALL_FROM_TOP_ELEM(top_elem) \
244   CALL_FROM_CALL_STACK(grpc_call_stack_from_top_element(top_elem))
245 
246 static void execute_batch(grpc_call* call, grpc_transport_stream_op_batch* op,
247                           grpc_closure* start_batch_closure);
248 
249 static void cancel_with_status(grpc_call* c, grpc_status_code status,
250                                const char* description);
251 static void cancel_with_error(grpc_call* c, grpc_error* error);
252 static void destroy_call(void* call_stack, grpc_error* error);
253 static void receiving_slice_ready(void* bctlp, grpc_error* error);
254 static void set_final_status(grpc_call* call, grpc_error* error);
255 static void process_data_after_md(batch_control* bctl);
256 static void post_batch_completion(batch_control* bctl);
257 
add_init_error(grpc_error ** composite,grpc_error * new_err)258 static void add_init_error(grpc_error** composite, grpc_error* new_err) {
259   if (new_err == GRPC_ERROR_NONE) return;
260   if (*composite == GRPC_ERROR_NONE)
261     *composite = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Call creation failed");
262   *composite = grpc_error_add_child(*composite, new_err);
263 }
264 
grpc_call_arena_alloc(grpc_call * call,size_t size)265 void* grpc_call_arena_alloc(grpc_call* call, size_t size) {
266   return gpr_arena_alloc(call->arena, size);
267 }
268 
get_or_create_parent_call(grpc_call * call)269 static parent_call* get_or_create_parent_call(grpc_call* call) {
270   parent_call* p = (parent_call*)gpr_atm_acq_load(&call->parent_call_atm);
271   if (p == nullptr) {
272     p = static_cast<parent_call*>(gpr_arena_alloc(call->arena, sizeof(*p)));
273     gpr_mu_init(&p->child_list_mu);
274     if (!gpr_atm_rel_cas(&call->parent_call_atm, (gpr_atm) nullptr,
275                          (gpr_atm)p)) {
276       gpr_mu_destroy(&p->child_list_mu);
277       p = (parent_call*)gpr_atm_acq_load(&call->parent_call_atm);
278     }
279   }
280   return p;
281 }
282 
get_parent_call(grpc_call * call)283 static parent_call* get_parent_call(grpc_call* call) {
284   return (parent_call*)gpr_atm_acq_load(&call->parent_call_atm);
285 }
286 
grpc_call_get_initial_size_estimate()287 size_t grpc_call_get_initial_size_estimate() {
288   return sizeof(grpc_call) + sizeof(batch_control) * MAX_CONCURRENT_BATCHES +
289          sizeof(grpc_linked_mdelem) * ESTIMATED_MDELEM_COUNT;
290 }
291 
grpc_call_create(const grpc_call_create_args * args,grpc_call ** out_call)292 grpc_error* grpc_call_create(const grpc_call_create_args* args,
293                              grpc_call** out_call) {
294   GPR_TIMER_SCOPE("grpc_call_create", 0);
295   size_t i, j;
296   grpc_error* error = GRPC_ERROR_NONE;
297   grpc_channel_stack* channel_stack =
298       grpc_channel_get_channel_stack(args->channel);
299   grpc_call* call;
300   size_t initial_size = grpc_channel_get_call_size_estimate(args->channel);
301   GRPC_STATS_INC_CALL_INITIAL_SIZE(initial_size);
302   gpr_arena* arena = gpr_arena_create(initial_size);
303   call = static_cast<grpc_call*>(
304       gpr_arena_alloc(arena, GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(grpc_call)) +
305                                  channel_stack->call_stack_size));
306   gpr_ref_init(&call->ext_ref, 1);
307   gpr_atm_no_barrier_store(&call->cancelled, 0);
308   call->arena = arena;
309   grpc_call_combiner_init(&call->call_combiner);
310   *out_call = call;
311   call->channel = args->channel;
312   call->cq = args->cq;
313   call->start_time = gpr_now(GPR_CLOCK_MONOTONIC);
314   /* Always support no compression */
315   GPR_BITSET(&call->encodings_accepted_by_peer, GRPC_MESSAGE_COMPRESS_NONE);
316   call->is_client = args->server_transport_data == nullptr;
317   call->stream_op_payload.context = call->context;
318   grpc_slice path = grpc_empty_slice();
319   if (call->is_client) {
320     GRPC_STATS_INC_CLIENT_CALLS_CREATED();
321     GPR_ASSERT(args->add_initial_metadata_count <
322                MAX_SEND_EXTRA_METADATA_COUNT);
323     for (i = 0; i < args->add_initial_metadata_count; i++) {
324       call->send_extra_metadata[i].md = args->add_initial_metadata[i];
325       if (grpc_slice_eq(GRPC_MDKEY(args->add_initial_metadata[i]),
326                         GRPC_MDSTR_PATH)) {
327         path = grpc_slice_ref_internal(
328             GRPC_MDVALUE(args->add_initial_metadata[i]));
329       }
330     }
331     call->send_extra_metadata_count =
332         static_cast<int>(args->add_initial_metadata_count);
333   } else {
334     GRPC_STATS_INC_SERVER_CALLS_CREATED();
335     call->final_op.server.server = args->server;
336     GPR_ASSERT(args->add_initial_metadata_count == 0);
337     call->send_extra_metadata_count = 0;
338   }
339   for (i = 0; i < 2; i++) {
340     for (j = 0; j < 2; j++) {
341       call->metadata_batch[i][j].deadline = GRPC_MILLIS_INF_FUTURE;
342     }
343   }
344   grpc_millis send_deadline = args->send_deadline;
345 
346   bool immediately_cancel = false;
347 
348   if (args->parent != nullptr) {
349     call->child =
350         static_cast<child_call*>(gpr_arena_alloc(arena, sizeof(child_call)));
351     call->child->parent = args->parent;
352 
353     GRPC_CALL_INTERNAL_REF(args->parent, "child");
354     GPR_ASSERT(call->is_client);
355     GPR_ASSERT(!args->parent->is_client);
356 
357     if (args->propagation_mask & GRPC_PROPAGATE_DEADLINE) {
358       send_deadline = GPR_MIN(send_deadline, args->parent->send_deadline);
359     }
360     /* for now GRPC_PROPAGATE_TRACING_CONTEXT *MUST* be passed with
361      * GRPC_PROPAGATE_STATS_CONTEXT */
362     /* TODO(ctiller): This should change to use the appropriate census start_op
363      * call. */
364     if (args->propagation_mask & GRPC_PROPAGATE_CENSUS_TRACING_CONTEXT) {
365       if (0 == (args->propagation_mask & GRPC_PROPAGATE_CENSUS_STATS_CONTEXT)) {
366         add_init_error(&error, GRPC_ERROR_CREATE_FROM_STATIC_STRING(
367                                    "Census tracing propagation requested "
368                                    "without Census context propagation"));
369       }
370       grpc_call_context_set(call, GRPC_CONTEXT_TRACING,
371                             args->parent->context[GRPC_CONTEXT_TRACING].value,
372                             nullptr);
373     } else if (args->propagation_mask & GRPC_PROPAGATE_CENSUS_STATS_CONTEXT) {
374       add_init_error(&error, GRPC_ERROR_CREATE_FROM_STATIC_STRING(
375                                  "Census context propagation requested "
376                                  "without Census tracing propagation"));
377     }
378     if (args->propagation_mask & GRPC_PROPAGATE_CANCELLATION) {
379       call->cancellation_is_inherited = 1;
380       if (gpr_atm_acq_load(&args->parent->received_final_op_atm)) {
381         immediately_cancel = true;
382       }
383     }
384   }
385 
386   call->send_deadline = send_deadline;
387 
388   GRPC_CHANNEL_INTERNAL_REF(args->channel, "call");
389   /* initial refcount dropped by grpc_call_unref */
390   grpc_call_element_args call_args = {CALL_STACK_FROM_CALL(call),
391                                       args->server_transport_data,
392                                       call->context,
393                                       path,
394                                       call->start_time,
395                                       send_deadline,
396                                       call->arena,
397                                       &call->call_combiner};
398   add_init_error(&error, grpc_call_stack_init(channel_stack, 1, destroy_call,
399                                               call, &call_args));
400   // Publish this call to parent only after the call stack has been initialized.
401   if (args->parent != nullptr) {
402     child_call* cc = call->child;
403     parent_call* pc = get_or_create_parent_call(args->parent);
404     gpr_mu_lock(&pc->child_list_mu);
405     if (pc->first_child == nullptr) {
406       pc->first_child = call;
407       cc->sibling_next = cc->sibling_prev = call;
408     } else {
409       cc->sibling_next = pc->first_child;
410       cc->sibling_prev = pc->first_child->child->sibling_prev;
411       cc->sibling_next->child->sibling_prev =
412           cc->sibling_prev->child->sibling_next = call;
413     }
414     gpr_mu_unlock(&pc->child_list_mu);
415   }
416   if (error != GRPC_ERROR_NONE) {
417     cancel_with_error(call, GRPC_ERROR_REF(error));
418   }
419   if (immediately_cancel) {
420     cancel_with_error(call, GRPC_ERROR_CANCELLED);
421   }
422   if (args->cq != nullptr) {
423     GPR_ASSERT(args->pollset_set_alternative == nullptr &&
424                "Only one of 'cq' and 'pollset_set_alternative' should be "
425                "non-nullptr.");
426     GRPC_CQ_INTERNAL_REF(args->cq, "bind");
427     call->pollent =
428         grpc_polling_entity_create_from_pollset(grpc_cq_pollset(args->cq));
429   }
430   if (args->pollset_set_alternative != nullptr) {
431     call->pollent = grpc_polling_entity_create_from_pollset_set(
432         args->pollset_set_alternative);
433   }
434   if (!grpc_polling_entity_is_empty(&call->pollent)) {
435     grpc_call_stack_set_pollset_or_pollset_set(CALL_STACK_FROM_CALL(call),
436                                                &call->pollent);
437   }
438 
439   if (call->is_client) {
440     grpc_core::channelz::ChannelNode* channelz_channel =
441         grpc_channel_get_channelz_node(call->channel);
442     if (channelz_channel != nullptr) {
443       channelz_channel->RecordCallStarted();
444     }
445   } else {
446     grpc_core::channelz::ServerNode* channelz_server =
447         grpc_server_get_channelz_node(call->final_op.server.server);
448     if (channelz_server != nullptr) {
449       channelz_server->RecordCallStarted();
450     }
451   }
452 
453   grpc_slice_unref_internal(path);
454 
455   return error;
456 }
457 
grpc_call_set_completion_queue(grpc_call * call,grpc_completion_queue * cq)458 void grpc_call_set_completion_queue(grpc_call* call,
459                                     grpc_completion_queue* cq) {
460   GPR_ASSERT(cq);
461 
462   if (grpc_polling_entity_pollset_set(&call->pollent) != nullptr) {
463     gpr_log(GPR_ERROR, "A pollset_set is already registered for this call.");
464     abort();
465   }
466   call->cq = cq;
467   GRPC_CQ_INTERNAL_REF(cq, "bind");
468   call->pollent = grpc_polling_entity_create_from_pollset(grpc_cq_pollset(cq));
469   grpc_call_stack_set_pollset_or_pollset_set(CALL_STACK_FROM_CALL(call),
470                                              &call->pollent);
471 }
472 
473 #ifndef NDEBUG
474 #define REF_REASON reason
475 #define REF_ARG , const char* reason
476 #else
477 #define REF_REASON ""
478 #define REF_ARG
479 #endif
grpc_call_internal_ref(grpc_call * c REF_ARG)480 void grpc_call_internal_ref(grpc_call* c REF_ARG) {
481   GRPC_CALL_STACK_REF(CALL_STACK_FROM_CALL(c), REF_REASON);
482 }
grpc_call_internal_unref(grpc_call * c REF_ARG)483 void grpc_call_internal_unref(grpc_call* c REF_ARG) {
484   GRPC_CALL_STACK_UNREF(CALL_STACK_FROM_CALL(c), REF_REASON);
485 }
486 
release_call(void * call,grpc_error * error)487 static void release_call(void* call, grpc_error* error) {
488   grpc_call* c = static_cast<grpc_call*>(call);
489   grpc_channel* channel = c->channel;
490   gpr_free(static_cast<void*>(const_cast<char*>(c->final_info.error_string)));
491   grpc_call_combiner_destroy(&c->call_combiner);
492   grpc_channel_update_call_size_estimate(channel, gpr_arena_destroy(c->arena));
493   GRPC_CHANNEL_INTERNAL_UNREF(channel, "call");
494 }
495 
destroy_call(void * call,grpc_error * error)496 static void destroy_call(void* call, grpc_error* error) {
497   GPR_TIMER_SCOPE("destroy_call", 0);
498   size_t i;
499   int ii;
500   grpc_call* c = static_cast<grpc_call*>(call);
501   for (i = 0; i < 2; i++) {
502     grpc_metadata_batch_destroy(
503         &c->metadata_batch[1 /* is_receiving */][i /* is_initial */]);
504   }
505   c->receiving_stream.reset();
506   parent_call* pc = get_parent_call(c);
507   if (pc != nullptr) {
508     gpr_mu_destroy(&pc->child_list_mu);
509   }
510   for (ii = 0; ii < c->send_extra_metadata_count; ii++) {
511     GRPC_MDELEM_UNREF(c->send_extra_metadata[ii].md);
512   }
513   for (i = 0; i < GRPC_CONTEXT_COUNT; i++) {
514     if (c->context[i].destroy) {
515       c->context[i].destroy(c->context[i].value);
516     }
517   }
518   if (c->cq) {
519     GRPC_CQ_INTERNAL_UNREF(c->cq, "bind");
520   }
521 
522   grpc_error* status_error =
523       reinterpret_cast<grpc_error*>(gpr_atm_acq_load(&c->status_error));
524   grpc_error_get_status(status_error, c->send_deadline,
525                         &c->final_info.final_status, nullptr, nullptr,
526                         &(c->final_info.error_string));
527   GRPC_ERROR_UNREF(status_error);
528   c->final_info.stats.latency =
529       gpr_time_sub(gpr_now(GPR_CLOCK_MONOTONIC), c->start_time);
530 
531   grpc_call_stack_destroy(CALL_STACK_FROM_CALL(c), &c->final_info,
532                           GRPC_CLOSURE_INIT(&c->release_call, release_call, c,
533                                             grpc_schedule_on_exec_ctx));
534 }
535 
grpc_call_ref(grpc_call * c)536 void grpc_call_ref(grpc_call* c) { gpr_ref(&c->ext_ref); }
537 
grpc_call_unref(grpc_call * c)538 void grpc_call_unref(grpc_call* c) {
539   if (!gpr_unref(&c->ext_ref)) return;
540 
541   GPR_TIMER_SCOPE("grpc_call_unref", 0);
542 
543   child_call* cc = c->child;
544   grpc_core::ExecCtx exec_ctx;
545 
546   GRPC_API_TRACE("grpc_call_unref(c=%p)", 1, (c));
547 
548   if (cc) {
549     parent_call* pc = get_parent_call(cc->parent);
550     gpr_mu_lock(&pc->child_list_mu);
551     if (c == pc->first_child) {
552       pc->first_child = cc->sibling_next;
553       if (c == pc->first_child) {
554         pc->first_child = nullptr;
555       }
556     }
557     cc->sibling_prev->child->sibling_next = cc->sibling_next;
558     cc->sibling_next->child->sibling_prev = cc->sibling_prev;
559     gpr_mu_unlock(&pc->child_list_mu);
560     GRPC_CALL_INTERNAL_UNREF(cc->parent, "child");
561   }
562 
563   GPR_ASSERT(!c->destroy_called);
564   c->destroy_called = 1;
565   bool cancel = gpr_atm_acq_load(&c->any_ops_sent_atm) != 0 &&
566                 gpr_atm_acq_load(&c->received_final_op_atm) == 0;
567   if (cancel) {
568     cancel_with_error(c, GRPC_ERROR_CANCELLED);
569   } else {
570     // Unset the call combiner cancellation closure.  This has the
571     // effect of scheduling the previously set cancellation closure, if
572     // any, so that it can release any internal references it may be
573     // holding to the call stack. Also flush the closures on exec_ctx so that
574     // filters that schedule cancel notification closures on exec_ctx do not
575     // need to take a ref of the call stack to guarantee closure liveness.
576     grpc_call_combiner_set_notify_on_cancel(&c->call_combiner, nullptr);
577     grpc_core::ExecCtx::Get()->Flush();
578   }
579   GRPC_CALL_INTERNAL_UNREF(c, "destroy");
580 }
581 
grpc_call_cancel(grpc_call * call,void * reserved)582 grpc_call_error grpc_call_cancel(grpc_call* call, void* reserved) {
583   GRPC_API_TRACE("grpc_call_cancel(call=%p, reserved=%p)", 2, (call, reserved));
584   GPR_ASSERT(!reserved);
585   grpc_core::ExecCtx exec_ctx;
586   cancel_with_error(call, GRPC_ERROR_CANCELLED);
587   return GRPC_CALL_OK;
588 }
589 
590 // This is called via the call combiner to start sending a batch down
591 // the filter stack.
execute_batch_in_call_combiner(void * arg,grpc_error * ignored)592 static void execute_batch_in_call_combiner(void* arg, grpc_error* ignored) {
593   GPR_TIMER_SCOPE("execute_batch_in_call_combiner", 0);
594   grpc_transport_stream_op_batch* batch =
595       static_cast<grpc_transport_stream_op_batch*>(arg);
596   grpc_call* call = static_cast<grpc_call*>(batch->handler_private.extra_arg);
597   grpc_call_element* elem = CALL_ELEM_FROM_CALL(call, 0);
598   GRPC_CALL_LOG_OP(GPR_INFO, elem, batch);
599   elem->filter->start_transport_stream_op_batch(elem, batch);
600 }
601 
602 // start_batch_closure points to a caller-allocated closure to be used
603 // for entering the call combiner.
execute_batch(grpc_call * call,grpc_transport_stream_op_batch * batch,grpc_closure * start_batch_closure)604 static void execute_batch(grpc_call* call,
605                           grpc_transport_stream_op_batch* batch,
606                           grpc_closure* start_batch_closure) {
607   batch->handler_private.extra_arg = call;
608   GRPC_CLOSURE_INIT(start_batch_closure, execute_batch_in_call_combiner, batch,
609                     grpc_schedule_on_exec_ctx);
610   GRPC_CALL_COMBINER_START(&call->call_combiner, start_batch_closure,
611                            GRPC_ERROR_NONE, "executing batch");
612 }
613 
grpc_call_get_peer(grpc_call * call)614 char* grpc_call_get_peer(grpc_call* call) {
615   char* peer_string = (char*)gpr_atm_acq_load(&call->peer_string);
616   if (peer_string != nullptr) return gpr_strdup(peer_string);
617   peer_string = grpc_channel_get_target(call->channel);
618   if (peer_string != nullptr) return peer_string;
619   return gpr_strdup("unknown");
620 }
621 
grpc_call_from_top_element(grpc_call_element * elem)622 grpc_call* grpc_call_from_top_element(grpc_call_element* elem) {
623   return CALL_FROM_TOP_ELEM(elem);
624 }
625 
626 /*******************************************************************************
627  * CANCELLATION
628  */
629 
grpc_call_cancel_with_status(grpc_call * c,grpc_status_code status,const char * description,void * reserved)630 grpc_call_error grpc_call_cancel_with_status(grpc_call* c,
631                                              grpc_status_code status,
632                                              const char* description,
633                                              void* reserved) {
634   grpc_core::ExecCtx exec_ctx;
635   GRPC_API_TRACE(
636       "grpc_call_cancel_with_status("
637       "c=%p, status=%d, description=%s, reserved=%p)",
638       4, (c, (int)status, description, reserved));
639   GPR_ASSERT(reserved == nullptr);
640   cancel_with_status(c, status, description);
641   return GRPC_CALL_OK;
642 }
643 
644 typedef struct {
645   grpc_call* call;
646   grpc_closure start_batch;
647   grpc_closure finish_batch;
648 } cancel_state;
649 
650 // The on_complete callback used when sending a cancel_stream batch down
651 // the filter stack.  Yields the call combiner when the batch is done.
done_termination(void * arg,grpc_error * error)652 static void done_termination(void* arg, grpc_error* error) {
653   cancel_state* state = static_cast<cancel_state*>(arg);
654   GRPC_CALL_COMBINER_STOP(&state->call->call_combiner,
655                           "on_complete for cancel_stream op");
656   GRPC_CALL_INTERNAL_UNREF(state->call, "termination");
657   gpr_free(state);
658 }
659 
cancel_with_error(grpc_call * c,grpc_error * error)660 static void cancel_with_error(grpc_call* c, grpc_error* error) {
661   if (!gpr_atm_rel_cas(&c->cancelled, 0, 1)) {
662     GRPC_ERROR_UNREF(error);
663     return;
664   }
665   GRPC_CALL_INTERNAL_REF(c, "termination");
666   // Inform the call combiner of the cancellation, so that it can cancel
667   // any in-flight asynchronous actions that may be holding the call
668   // combiner.  This ensures that the cancel_stream batch can be sent
669   // down the filter stack in a timely manner.
670   grpc_call_combiner_cancel(&c->call_combiner, GRPC_ERROR_REF(error));
671   cancel_state* state = static_cast<cancel_state*>(gpr_malloc(sizeof(*state)));
672   state->call = c;
673   GRPC_CLOSURE_INIT(&state->finish_batch, done_termination, state,
674                     grpc_schedule_on_exec_ctx);
675   grpc_transport_stream_op_batch* op =
676       grpc_make_transport_stream_op(&state->finish_batch);
677   op->cancel_stream = true;
678   op->payload->cancel_stream.cancel_error = error;
679   execute_batch(c, op, &state->start_batch);
680 }
681 
error_from_status(grpc_status_code status,const char * description)682 static grpc_error* error_from_status(grpc_status_code status,
683                                      const char* description) {
684   // copying 'description' is needed to ensure the grpc_call_cancel_with_status
685   // guarantee that can be short-lived.
686   return grpc_error_set_int(
687       grpc_error_set_str(GRPC_ERROR_CREATE_FROM_COPIED_STRING(description),
688                          GRPC_ERROR_STR_GRPC_MESSAGE,
689                          grpc_slice_from_copied_string(description)),
690       GRPC_ERROR_INT_GRPC_STATUS, status);
691 }
692 
cancel_with_status(grpc_call * c,grpc_status_code status,const char * description)693 static void cancel_with_status(grpc_call* c, grpc_status_code status,
694                                const char* description) {
695   cancel_with_error(c, error_from_status(status, description));
696 }
697 
set_final_status(grpc_call * call,grpc_error * error)698 static void set_final_status(grpc_call* call, grpc_error* error) {
699   if (grpc_call_error_trace.enabled()) {
700     gpr_log(GPR_DEBUG, "set_final_status %s", call->is_client ? "CLI" : "SVR");
701     gpr_log(GPR_DEBUG, "%s", grpc_error_string(error));
702   }
703   if (call->is_client) {
704     grpc_error_get_status(error, call->send_deadline,
705                           call->final_op.client.status,
706                           call->final_op.client.status_details, nullptr,
707                           call->final_op.client.error_string);
708     // explicitly take a ref
709     grpc_slice_ref_internal(*call->final_op.client.status_details);
710     gpr_atm_rel_store(&call->status_error, reinterpret_cast<gpr_atm>(error));
711     grpc_core::channelz::ChannelNode* channelz_channel =
712         grpc_channel_get_channelz_node(call->channel);
713     if (channelz_channel != nullptr) {
714       if (*call->final_op.client.status != GRPC_STATUS_OK) {
715         channelz_channel->RecordCallFailed();
716       } else {
717         channelz_channel->RecordCallSucceeded();
718       }
719     }
720   } else {
721     *call->final_op.server.cancelled =
722         error != GRPC_ERROR_NONE ||
723         reinterpret_cast<grpc_error*>(gpr_atm_acq_load(&call->status_error)) !=
724             GRPC_ERROR_NONE;
725     grpc_core::channelz::ServerNode* channelz_server =
726         grpc_server_get_channelz_node(call->final_op.server.server);
727     if (channelz_server != nullptr) {
728       if (*call->final_op.server.cancelled) {
729         channelz_server->RecordCallFailed();
730       } else {
731         channelz_server->RecordCallSucceeded();
732       }
733     }
734     GRPC_ERROR_UNREF(error);
735   }
736 }
737 
738 /*******************************************************************************
739  * COMPRESSION
740  */
741 
set_incoming_message_compression_algorithm(grpc_call * call,grpc_message_compression_algorithm algo)742 static void set_incoming_message_compression_algorithm(
743     grpc_call* call, grpc_message_compression_algorithm algo) {
744   GPR_ASSERT(algo < GRPC_MESSAGE_COMPRESS_ALGORITHMS_COUNT);
745   call->incoming_message_compression_algorithm = algo;
746 }
747 
set_incoming_stream_compression_algorithm(grpc_call * call,grpc_stream_compression_algorithm algo)748 static void set_incoming_stream_compression_algorithm(
749     grpc_call* call, grpc_stream_compression_algorithm algo) {
750   GPR_ASSERT(algo < GRPC_STREAM_COMPRESS_ALGORITHMS_COUNT);
751   call->incoming_stream_compression_algorithm = algo;
752 }
753 
grpc_call_test_only_get_compression_algorithm(grpc_call * call)754 grpc_compression_algorithm grpc_call_test_only_get_compression_algorithm(
755     grpc_call* call) {
756   grpc_compression_algorithm algorithm = GRPC_COMPRESS_NONE;
757   grpc_compression_algorithm_from_message_stream_compression_algorithm(
758       &algorithm, call->incoming_message_compression_algorithm,
759       call->incoming_stream_compression_algorithm);
760   return algorithm;
761 }
762 
compression_algorithm_for_level_locked(grpc_call * call,grpc_compression_level level)763 static grpc_compression_algorithm compression_algorithm_for_level_locked(
764     grpc_call* call, grpc_compression_level level) {
765   return grpc_compression_algorithm_for_level(level,
766                                               call->encodings_accepted_by_peer);
767 }
768 
grpc_call_test_only_get_message_flags(grpc_call * call)769 uint32_t grpc_call_test_only_get_message_flags(grpc_call* call) {
770   uint32_t flags;
771   flags = call->test_only_last_message_flags;
772   return flags;
773 }
774 
destroy_encodings_accepted_by_peer(void * p)775 static void destroy_encodings_accepted_by_peer(void* p) { return; }
776 
set_encodings_accepted_by_peer(grpc_call * call,grpc_mdelem mdel,uint32_t * encodings_accepted_by_peer,bool stream_encoding)777 static void set_encodings_accepted_by_peer(grpc_call* call, grpc_mdelem mdel,
778                                            uint32_t* encodings_accepted_by_peer,
779                                            bool stream_encoding) {
780   size_t i;
781   uint32_t algorithm;
782   grpc_slice_buffer accept_encoding_parts;
783   grpc_slice accept_encoding_slice;
784   void* accepted_user_data;
785 
786   accepted_user_data =
787       grpc_mdelem_get_user_data(mdel, destroy_encodings_accepted_by_peer);
788   if (accepted_user_data != nullptr) {
789     *encodings_accepted_by_peer =
790         static_cast<uint32_t>(((uintptr_t)accepted_user_data) - 1);
791     return;
792   }
793 
794   *encodings_accepted_by_peer = 0;
795 
796   accept_encoding_slice = GRPC_MDVALUE(mdel);
797   grpc_slice_buffer_init(&accept_encoding_parts);
798   grpc_slice_split_without_space(accept_encoding_slice, ",",
799                                  &accept_encoding_parts);
800 
801   GPR_BITSET(encodings_accepted_by_peer, GRPC_COMPRESS_NONE);
802   for (i = 0; i < accept_encoding_parts.count; i++) {
803     int r;
804     grpc_slice accept_encoding_entry_slice = accept_encoding_parts.slices[i];
805     if (!stream_encoding) {
806       r = grpc_message_compression_algorithm_parse(
807           accept_encoding_entry_slice,
808           reinterpret_cast<grpc_message_compression_algorithm*>(&algorithm));
809     } else {
810       r = grpc_stream_compression_algorithm_parse(
811           accept_encoding_entry_slice,
812           reinterpret_cast<grpc_stream_compression_algorithm*>(&algorithm));
813     }
814     if (r) {
815       GPR_BITSET(encodings_accepted_by_peer, algorithm);
816     } else {
817       char* accept_encoding_entry_str =
818           grpc_slice_to_c_string(accept_encoding_entry_slice);
819       gpr_log(GPR_DEBUG,
820               "Unknown entry in accept encoding metadata: '%s'. Ignoring.",
821               accept_encoding_entry_str);
822       gpr_free(accept_encoding_entry_str);
823     }
824   }
825 
826   grpc_slice_buffer_destroy_internal(&accept_encoding_parts);
827 
828   grpc_mdelem_set_user_data(
829       mdel, destroy_encodings_accepted_by_peer,
830       (void*)((static_cast<uintptr_t>(*encodings_accepted_by_peer)) + 1));
831 }
832 
grpc_call_test_only_get_encodings_accepted_by_peer(grpc_call * call)833 uint32_t grpc_call_test_only_get_encodings_accepted_by_peer(grpc_call* call) {
834   uint32_t encodings_accepted_by_peer;
835   encodings_accepted_by_peer = call->encodings_accepted_by_peer;
836   return encodings_accepted_by_peer;
837 }
838 
839 grpc_stream_compression_algorithm
grpc_call_test_only_get_incoming_stream_encodings(grpc_call * call)840 grpc_call_test_only_get_incoming_stream_encodings(grpc_call* call) {
841   return call->incoming_stream_compression_algorithm;
842 }
843 
linked_from_md(const grpc_metadata * md)844 static grpc_linked_mdelem* linked_from_md(const grpc_metadata* md) {
845   return (grpc_linked_mdelem*)&md->internal_data;
846 }
847 
get_md_elem(grpc_metadata * metadata,grpc_metadata * additional_metadata,int i,int count)848 static grpc_metadata* get_md_elem(grpc_metadata* metadata,
849                                   grpc_metadata* additional_metadata, int i,
850                                   int count) {
851   grpc_metadata* res =
852       i < count ? &metadata[i] : &additional_metadata[i - count];
853   GPR_ASSERT(res);
854   return res;
855 }
856 
prepare_application_metadata(grpc_call * call,int count,grpc_metadata * metadata,int is_trailing,int prepend_extra_metadata,grpc_metadata * additional_metadata,int additional_metadata_count)857 static int prepare_application_metadata(grpc_call* call, int count,
858                                         grpc_metadata* metadata,
859                                         int is_trailing,
860                                         int prepend_extra_metadata,
861                                         grpc_metadata* additional_metadata,
862                                         int additional_metadata_count) {
863   int total_count = count + additional_metadata_count;
864   int i;
865   grpc_metadata_batch* batch =
866       &call->metadata_batch[0 /* is_receiving */][is_trailing];
867   for (i = 0; i < total_count; i++) {
868     const grpc_metadata* md =
869         get_md_elem(metadata, additional_metadata, i, count);
870     grpc_linked_mdelem* l = linked_from_md(md);
871     GPR_ASSERT(sizeof(grpc_linked_mdelem) == sizeof(md->internal_data));
872     if (!GRPC_LOG_IF_ERROR("validate_metadata",
873                            grpc_validate_header_key_is_legal(md->key))) {
874       break;
875     } else if (!grpc_is_binary_header(md->key) &&
876                !GRPC_LOG_IF_ERROR(
877                    "validate_metadata",
878                    grpc_validate_header_nonbin_value_is_legal(md->value))) {
879       break;
880     }
881     l->md = grpc_mdelem_from_grpc_metadata(const_cast<grpc_metadata*>(md));
882   }
883   if (i != total_count) {
884     for (int j = 0; j < i; j++) {
885       const grpc_metadata* md =
886           get_md_elem(metadata, additional_metadata, j, count);
887       grpc_linked_mdelem* l = linked_from_md(md);
888       GRPC_MDELEM_UNREF(l->md);
889     }
890     return 0;
891   }
892   if (prepend_extra_metadata) {
893     if (call->send_extra_metadata_count == 0) {
894       prepend_extra_metadata = 0;
895     } else {
896       for (i = 0; i < call->send_extra_metadata_count; i++) {
897         GRPC_LOG_IF_ERROR("prepare_application_metadata",
898                           grpc_metadata_batch_link_tail(
899                               batch, &call->send_extra_metadata[i]));
900       }
901     }
902   }
903   for (i = 0; i < total_count; i++) {
904     grpc_metadata* md = get_md_elem(metadata, additional_metadata, i, count);
905     grpc_linked_mdelem* l = linked_from_md(md);
906     grpc_error* error = grpc_metadata_batch_link_tail(batch, l);
907     if (error != GRPC_ERROR_NONE) {
908       GRPC_MDELEM_UNREF(l->md);
909     }
910     GRPC_LOG_IF_ERROR("prepare_application_metadata", error);
911   }
912   call->send_extra_metadata_count = 0;
913 
914   return 1;
915 }
916 
decode_message_compression(grpc_mdelem md)917 static grpc_message_compression_algorithm decode_message_compression(
918     grpc_mdelem md) {
919   grpc_message_compression_algorithm algorithm =
920       grpc_message_compression_algorithm_from_slice(GRPC_MDVALUE(md));
921   if (algorithm == GRPC_MESSAGE_COMPRESS_ALGORITHMS_COUNT) {
922     char* md_c_str = grpc_slice_to_c_string(GRPC_MDVALUE(md));
923     gpr_log(GPR_ERROR,
924             "Invalid incoming message compression algorithm: '%s'. "
925             "Interpreting incoming data as uncompressed.",
926             md_c_str);
927     gpr_free(md_c_str);
928     return GRPC_MESSAGE_COMPRESS_NONE;
929   }
930   return algorithm;
931 }
932 
decode_stream_compression(grpc_mdelem md)933 static grpc_stream_compression_algorithm decode_stream_compression(
934     grpc_mdelem md) {
935   grpc_stream_compression_algorithm algorithm =
936       grpc_stream_compression_algorithm_from_slice(GRPC_MDVALUE(md));
937   if (algorithm == GRPC_STREAM_COMPRESS_ALGORITHMS_COUNT) {
938     char* md_c_str = grpc_slice_to_c_string(GRPC_MDVALUE(md));
939     gpr_log(GPR_ERROR,
940             "Invalid incoming stream compression algorithm: '%s'. Interpreting "
941             "incoming data as uncompressed.",
942             md_c_str);
943     gpr_free(md_c_str);
944     return GRPC_STREAM_COMPRESS_NONE;
945   }
946   return algorithm;
947 }
948 
publish_app_metadata(grpc_call * call,grpc_metadata_batch * b,int is_trailing)949 static void publish_app_metadata(grpc_call* call, grpc_metadata_batch* b,
950                                  int is_trailing) {
951   if (b->list.count == 0) return;
952   if (!call->is_client && is_trailing) return;
953   if (is_trailing && call->buffered_metadata[1] == nullptr) return;
954   GPR_TIMER_SCOPE("publish_app_metadata", 0);
955   grpc_metadata_array* dest;
956   grpc_metadata* mdusr;
957   dest = call->buffered_metadata[is_trailing];
958   if (dest->count + b->list.count > dest->capacity) {
959     dest->capacity =
960         GPR_MAX(dest->capacity + b->list.count, dest->capacity * 3 / 2);
961     dest->metadata = static_cast<grpc_metadata*>(
962         gpr_realloc(dest->metadata, sizeof(grpc_metadata) * dest->capacity));
963   }
964   for (grpc_linked_mdelem* l = b->list.head; l != nullptr; l = l->next) {
965     mdusr = &dest->metadata[dest->count++];
966     /* we pass back borrowed slices that are valid whilst the call is valid */
967     mdusr->key = GRPC_MDKEY(l->md);
968     mdusr->value = GRPC_MDVALUE(l->md);
969   }
970 }
971 
recv_initial_filter(grpc_call * call,grpc_metadata_batch * b)972 static void recv_initial_filter(grpc_call* call, grpc_metadata_batch* b) {
973   if (b->idx.named.content_encoding != nullptr) {
974     GPR_TIMER_SCOPE("incoming_stream_compression_algorithm", 0);
975     set_incoming_stream_compression_algorithm(
976         call, decode_stream_compression(b->idx.named.content_encoding->md));
977     grpc_metadata_batch_remove(b, b->idx.named.content_encoding);
978   }
979   if (b->idx.named.grpc_encoding != nullptr) {
980     GPR_TIMER_SCOPE("incoming_message_compression_algorithm", 0);
981     set_incoming_message_compression_algorithm(
982         call, decode_message_compression(b->idx.named.grpc_encoding->md));
983     grpc_metadata_batch_remove(b, b->idx.named.grpc_encoding);
984   }
985   uint32_t message_encodings_accepted_by_peer = 1u;
986   uint32_t stream_encodings_accepted_by_peer = 1u;
987   if (b->idx.named.grpc_accept_encoding != nullptr) {
988     GPR_TIMER_SCOPE("encodings_accepted_by_peer", 0);
989     set_encodings_accepted_by_peer(call, b->idx.named.grpc_accept_encoding->md,
990                                    &message_encodings_accepted_by_peer, false);
991     grpc_metadata_batch_remove(b, b->idx.named.grpc_accept_encoding);
992   }
993   if (b->idx.named.accept_encoding != nullptr) {
994     GPR_TIMER_SCOPE("stream_encodings_accepted_by_peer", 0);
995     set_encodings_accepted_by_peer(call, b->idx.named.accept_encoding->md,
996                                    &stream_encodings_accepted_by_peer, true);
997     grpc_metadata_batch_remove(b, b->idx.named.accept_encoding);
998   }
999   call->encodings_accepted_by_peer =
1000       grpc_compression_bitset_from_message_stream_compression_bitset(
1001           message_encodings_accepted_by_peer,
1002           stream_encodings_accepted_by_peer);
1003   publish_app_metadata(call, b, false);
1004 }
1005 
recv_trailing_filter(void * args,grpc_metadata_batch * b,grpc_error * batch_error)1006 static void recv_trailing_filter(void* args, grpc_metadata_batch* b,
1007                                  grpc_error* batch_error) {
1008   grpc_call* call = static_cast<grpc_call*>(args);
1009   if (batch_error != GRPC_ERROR_NONE) {
1010     set_final_status(call, batch_error);
1011   } else if (b->idx.named.grpc_status != nullptr) {
1012     grpc_status_code status_code =
1013         grpc_get_status_code_from_metadata(b->idx.named.grpc_status->md);
1014     grpc_error* error = GRPC_ERROR_NONE;
1015     if (status_code != GRPC_STATUS_OK) {
1016       error = grpc_error_set_int(
1017           GRPC_ERROR_CREATE_FROM_STATIC_STRING("Error received from peer"),
1018           GRPC_ERROR_INT_GRPC_STATUS, static_cast<intptr_t>(status_code));
1019     }
1020     if (b->idx.named.grpc_message != nullptr) {
1021       error = grpc_error_set_str(
1022           error, GRPC_ERROR_STR_GRPC_MESSAGE,
1023           grpc_slice_ref_internal(GRPC_MDVALUE(b->idx.named.grpc_message->md)));
1024       grpc_metadata_batch_remove(b, b->idx.named.grpc_message);
1025     } else if (error != GRPC_ERROR_NONE) {
1026       error = grpc_error_set_str(error, GRPC_ERROR_STR_GRPC_MESSAGE,
1027                                  grpc_empty_slice());
1028     }
1029     set_final_status(call, GRPC_ERROR_REF(error));
1030     grpc_metadata_batch_remove(b, b->idx.named.grpc_status);
1031     GRPC_ERROR_UNREF(error);
1032   } else if (!call->is_client) {
1033     set_final_status(call, GRPC_ERROR_NONE);
1034   } else {
1035     gpr_log(GPR_DEBUG,
1036             "Received trailing metadata with no error and no status");
1037     set_final_status(
1038         call, grpc_error_set_int(
1039                   GRPC_ERROR_CREATE_FROM_STATIC_STRING("No status received"),
1040                   GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNKNOWN));
1041   }
1042   publish_app_metadata(call, b, true);
1043 }
1044 
grpc_call_get_arena(grpc_call * call)1045 gpr_arena* grpc_call_get_arena(grpc_call* call) { return call->arena; }
1046 
grpc_call_get_call_stack(grpc_call * call)1047 grpc_call_stack* grpc_call_get_call_stack(grpc_call* call) {
1048   return CALL_STACK_FROM_CALL(call);
1049 }
1050 
1051 /*******************************************************************************
1052  * BATCH API IMPLEMENTATION
1053  */
1054 
are_write_flags_valid(uint32_t flags)1055 static bool are_write_flags_valid(uint32_t flags) {
1056   /* check that only bits in GRPC_WRITE_(INTERNAL?)_USED_MASK are set */
1057   const uint32_t allowed_write_positions =
1058       (GRPC_WRITE_USED_MASK | GRPC_WRITE_INTERNAL_USED_MASK);
1059   const uint32_t invalid_positions = ~allowed_write_positions;
1060   return !(flags & invalid_positions);
1061 }
1062 
are_initial_metadata_flags_valid(uint32_t flags,bool is_client)1063 static bool are_initial_metadata_flags_valid(uint32_t flags, bool is_client) {
1064   /* check that only bits in GRPC_WRITE_(INTERNAL?)_USED_MASK are set */
1065   uint32_t invalid_positions = ~GRPC_INITIAL_METADATA_USED_MASK;
1066   if (!is_client) {
1067     invalid_positions |= GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST;
1068   }
1069   return !(flags & invalid_positions);
1070 }
1071 
batch_slot_for_op(grpc_op_type type)1072 static size_t batch_slot_for_op(grpc_op_type type) {
1073   switch (type) {
1074     case GRPC_OP_SEND_INITIAL_METADATA:
1075       return 0;
1076     case GRPC_OP_SEND_MESSAGE:
1077       return 1;
1078     case GRPC_OP_SEND_CLOSE_FROM_CLIENT:
1079     case GRPC_OP_SEND_STATUS_FROM_SERVER:
1080       return 2;
1081     case GRPC_OP_RECV_INITIAL_METADATA:
1082       return 3;
1083     case GRPC_OP_RECV_MESSAGE:
1084       return 4;
1085     case GRPC_OP_RECV_CLOSE_ON_SERVER:
1086     case GRPC_OP_RECV_STATUS_ON_CLIENT:
1087       return 5;
1088   }
1089   GPR_UNREACHABLE_CODE(return 123456789);
1090 }
1091 
reuse_or_allocate_batch_control(grpc_call * call,const grpc_op * ops,size_t num_ops)1092 static batch_control* reuse_or_allocate_batch_control(grpc_call* call,
1093                                                       const grpc_op* ops,
1094                                                       size_t num_ops) {
1095   size_t slot_idx = batch_slot_for_op(ops[0].op);
1096   batch_control** pslot = &call->active_batches[slot_idx];
1097   batch_control* bctl;
1098   if (*pslot != nullptr) {
1099     bctl = *pslot;
1100     if (bctl->call != nullptr) {
1101       return nullptr;
1102     }
1103     memset(bctl, 0, sizeof(*bctl));
1104   } else {
1105     bctl = static_cast<batch_control*>(
1106         gpr_arena_alloc(call->arena, sizeof(batch_control)));
1107     *pslot = bctl;
1108   }
1109   bctl->call = call;
1110   bctl->op.payload = &call->stream_op_payload;
1111   return bctl;
1112 }
1113 
finish_batch_completion(void * user_data,grpc_cq_completion * storage)1114 static void finish_batch_completion(void* user_data,
1115                                     grpc_cq_completion* storage) {
1116   batch_control* bctl = static_cast<batch_control*>(user_data);
1117   grpc_call* call = bctl->call;
1118   bctl->call = nullptr;
1119   GRPC_CALL_INTERNAL_UNREF(call, "completion");
1120 }
1121 
reset_batch_errors(batch_control * bctl)1122 static void reset_batch_errors(batch_control* bctl) {
1123   GRPC_ERROR_UNREF(
1124       reinterpret_cast<grpc_error*>(gpr_atm_acq_load(&bctl->batch_error)));
1125   gpr_atm_rel_store(&bctl->batch_error,
1126                     reinterpret_cast<gpr_atm>(GRPC_ERROR_NONE));
1127 }
1128 
post_batch_completion(batch_control * bctl)1129 static void post_batch_completion(batch_control* bctl) {
1130   grpc_call* next_child_call;
1131   grpc_call* call = bctl->call;
1132   grpc_error* error = GRPC_ERROR_REF(
1133       reinterpret_cast<grpc_error*>(gpr_atm_acq_load(&bctl->batch_error)));
1134 
1135   if (bctl->op.send_initial_metadata) {
1136     grpc_metadata_batch_destroy(
1137         &call->metadata_batch[0 /* is_receiving */][0 /* is_trailing */]);
1138   }
1139   if (bctl->op.send_message) {
1140     call->sending_message = false;
1141   }
1142   if (bctl->op.send_trailing_metadata) {
1143     grpc_metadata_batch_destroy(
1144         &call->metadata_batch[0 /* is_receiving */][1 /* is_trailing */]);
1145   }
1146   if (bctl->op.recv_trailing_metadata) {
1147     /* propagate cancellation to any interested children */
1148     gpr_atm_rel_store(&call->received_final_op_atm, 1);
1149     parent_call* pc = get_parent_call(call);
1150     if (pc != nullptr) {
1151       grpc_call* child;
1152       gpr_mu_lock(&pc->child_list_mu);
1153       child = pc->first_child;
1154       if (child != nullptr) {
1155         do {
1156           next_child_call = child->child->sibling_next;
1157           if (child->cancellation_is_inherited) {
1158             GRPC_CALL_INTERNAL_REF(child, "propagate_cancel");
1159             cancel_with_error(child, GRPC_ERROR_CANCELLED);
1160             GRPC_CALL_INTERNAL_UNREF(child, "propagate_cancel");
1161           }
1162           child = next_child_call;
1163         } while (child != pc->first_child);
1164       }
1165       gpr_mu_unlock(&pc->child_list_mu);
1166     }
1167     GRPC_ERROR_UNREF(error);
1168     error = GRPC_ERROR_NONE;
1169   }
1170   if (error != GRPC_ERROR_NONE && bctl->op.recv_message &&
1171       *call->receiving_buffer != nullptr) {
1172     grpc_byte_buffer_destroy(*call->receiving_buffer);
1173     *call->receiving_buffer = nullptr;
1174   }
1175   reset_batch_errors(bctl);
1176 
1177   if (bctl->completion_data.notify_tag.is_closure) {
1178     /* unrefs error */
1179     bctl->call = nullptr;
1180     /* This closure may be meant to be run within some combiner. Since we aren't
1181      * running in any combiner here, we need to use GRPC_CLOSURE_SCHED instead
1182      * of GRPC_CLOSURE_RUN.
1183      */
1184     GRPC_CLOSURE_SCHED((grpc_closure*)bctl->completion_data.notify_tag.tag,
1185                        error);
1186     GRPC_CALL_INTERNAL_UNREF(call, "completion");
1187   } else {
1188     /* unrefs error */
1189     grpc_cq_end_op(bctl->call->cq, bctl->completion_data.notify_tag.tag, error,
1190                    finish_batch_completion, bctl,
1191                    &bctl->completion_data.cq_completion);
1192   }
1193 }
1194 
finish_batch_step(batch_control * bctl)1195 static void finish_batch_step(batch_control* bctl) {
1196   if (gpr_unref(&bctl->steps_to_complete)) {
1197     post_batch_completion(bctl);
1198   }
1199 }
1200 
continue_receiving_slices(batch_control * bctl)1201 static void continue_receiving_slices(batch_control* bctl) {
1202   grpc_error* error;
1203   grpc_call* call = bctl->call;
1204   for (;;) {
1205     size_t remaining = call->receiving_stream->length() -
1206                        (*call->receiving_buffer)->data.raw.slice_buffer.length;
1207     if (remaining == 0) {
1208       call->receiving_message = 0;
1209       call->receiving_stream.reset();
1210       finish_batch_step(bctl);
1211       return;
1212     }
1213     if (call->receiving_stream->Next(remaining, &call->receiving_slice_ready)) {
1214       error = call->receiving_stream->Pull(&call->receiving_slice);
1215       if (error == GRPC_ERROR_NONE) {
1216         grpc_slice_buffer_add(&(*call->receiving_buffer)->data.raw.slice_buffer,
1217                               call->receiving_slice);
1218       } else {
1219         call->receiving_stream.reset();
1220         grpc_byte_buffer_destroy(*call->receiving_buffer);
1221         *call->receiving_buffer = nullptr;
1222         call->receiving_message = 0;
1223         finish_batch_step(bctl);
1224         return;
1225       }
1226     } else {
1227       return;
1228     }
1229   }
1230 }
1231 
receiving_slice_ready(void * bctlp,grpc_error * error)1232 static void receiving_slice_ready(void* bctlp, grpc_error* error) {
1233   batch_control* bctl = static_cast<batch_control*>(bctlp);
1234   grpc_call* call = bctl->call;
1235   bool release_error = false;
1236 
1237   if (error == GRPC_ERROR_NONE) {
1238     grpc_slice slice;
1239     error = call->receiving_stream->Pull(&slice);
1240     if (error == GRPC_ERROR_NONE) {
1241       grpc_slice_buffer_add(&(*call->receiving_buffer)->data.raw.slice_buffer,
1242                             slice);
1243       continue_receiving_slices(bctl);
1244     } else {
1245       /* Error returned by ByteStream::Pull() needs to be released manually */
1246       release_error = true;
1247     }
1248   }
1249 
1250   if (error != GRPC_ERROR_NONE) {
1251     if (grpc_trace_operation_failures.enabled()) {
1252       GRPC_LOG_IF_ERROR("receiving_slice_ready", GRPC_ERROR_REF(error));
1253     }
1254     call->receiving_stream.reset();
1255     grpc_byte_buffer_destroy(*call->receiving_buffer);
1256     *call->receiving_buffer = nullptr;
1257     call->receiving_message = 0;
1258     finish_batch_step(bctl);
1259     if (release_error) {
1260       GRPC_ERROR_UNREF(error);
1261     }
1262   }
1263 }
1264 
process_data_after_md(batch_control * bctl)1265 static void process_data_after_md(batch_control* bctl) {
1266   grpc_call* call = bctl->call;
1267   if (call->receiving_stream == nullptr) {
1268     *call->receiving_buffer = nullptr;
1269     call->receiving_message = 0;
1270     finish_batch_step(bctl);
1271   } else {
1272     call->test_only_last_message_flags = call->receiving_stream->flags();
1273     if ((call->receiving_stream->flags() & GRPC_WRITE_INTERNAL_COMPRESS) &&
1274         (call->incoming_message_compression_algorithm >
1275          GRPC_MESSAGE_COMPRESS_NONE)) {
1276       grpc_compression_algorithm algo;
1277       GPR_ASSERT(
1278           grpc_compression_algorithm_from_message_stream_compression_algorithm(
1279               &algo, call->incoming_message_compression_algorithm,
1280               (grpc_stream_compression_algorithm)0));
1281       *call->receiving_buffer =
1282           grpc_raw_compressed_byte_buffer_create(nullptr, 0, algo);
1283     } else {
1284       *call->receiving_buffer = grpc_raw_byte_buffer_create(nullptr, 0);
1285     }
1286     GRPC_CLOSURE_INIT(&call->receiving_slice_ready, receiving_slice_ready, bctl,
1287                       grpc_schedule_on_exec_ctx);
1288     continue_receiving_slices(bctl);
1289   }
1290 }
1291 
receiving_stream_ready(void * bctlp,grpc_error * error)1292 static void receiving_stream_ready(void* bctlp, grpc_error* error) {
1293   batch_control* bctl = static_cast<batch_control*>(bctlp);
1294   grpc_call* call = bctl->call;
1295   if (error != GRPC_ERROR_NONE) {
1296     call->receiving_stream.reset();
1297     if (reinterpret_cast<grpc_error*>(gpr_atm_acq_load(&bctl->batch_error)) ==
1298         GRPC_ERROR_NONE) {
1299       gpr_atm_rel_store(&bctl->batch_error,
1300                         reinterpret_cast<gpr_atm>(GRPC_ERROR_REF(error)));
1301     }
1302     cancel_with_error(call, GRPC_ERROR_REF(error));
1303   }
1304   /* If recv_state is RECV_NONE, we will save the batch_control
1305    * object with rel_cas, and will not use it after the cas. Its corresponding
1306    * acq_load is in receiving_initial_metadata_ready() */
1307   if (error != GRPC_ERROR_NONE || call->receiving_stream == nullptr ||
1308       !gpr_atm_rel_cas(&call->recv_state, RECV_NONE, (gpr_atm)bctlp)) {
1309     process_data_after_md(bctl);
1310   }
1311 }
1312 
1313 // The recv_message_ready callback used when sending a batch containing
1314 // a recv_message op down the filter stack.  Yields the call combiner
1315 // before processing the received message.
receiving_stream_ready_in_call_combiner(void * bctlp,grpc_error * error)1316 static void receiving_stream_ready_in_call_combiner(void* bctlp,
1317                                                     grpc_error* error) {
1318   batch_control* bctl = static_cast<batch_control*>(bctlp);
1319   grpc_call* call = bctl->call;
1320   GRPC_CALL_COMBINER_STOP(&call->call_combiner, "recv_message_ready");
1321   receiving_stream_ready(bctlp, error);
1322 }
1323 
validate_filtered_metadata(batch_control * bctl)1324 static void validate_filtered_metadata(batch_control* bctl) {
1325   grpc_compression_algorithm compression_algorithm;
1326   grpc_call* call = bctl->call;
1327   if (call->incoming_stream_compression_algorithm !=
1328           GRPC_STREAM_COMPRESS_NONE &&
1329       call->incoming_message_compression_algorithm !=
1330           GRPC_MESSAGE_COMPRESS_NONE) {
1331     char* error_msg = nullptr;
1332     gpr_asprintf(&error_msg,
1333                  "Incoming stream has both stream compression (%d) and message "
1334                  "compression (%d).",
1335                  call->incoming_stream_compression_algorithm,
1336                  call->incoming_message_compression_algorithm);
1337     gpr_log(GPR_ERROR, "%s", error_msg);
1338     cancel_with_status(call, GRPC_STATUS_INTERNAL, error_msg);
1339     gpr_free(error_msg);
1340   } else if (
1341       grpc_compression_algorithm_from_message_stream_compression_algorithm(
1342           &compression_algorithm, call->incoming_message_compression_algorithm,
1343           call->incoming_stream_compression_algorithm) == 0) {
1344     char* error_msg = nullptr;
1345     gpr_asprintf(&error_msg,
1346                  "Error in incoming message compression (%d) or stream "
1347                  "compression (%d).",
1348                  call->incoming_stream_compression_algorithm,
1349                  call->incoming_message_compression_algorithm);
1350     cancel_with_status(call, GRPC_STATUS_INTERNAL, error_msg);
1351     gpr_free(error_msg);
1352   } else {
1353     char* error_msg = nullptr;
1354     const grpc_compression_options compression_options =
1355         grpc_channel_compression_options(call->channel);
1356     if (compression_algorithm >= GRPC_COMPRESS_ALGORITHMS_COUNT) {
1357       gpr_asprintf(&error_msg, "Invalid compression algorithm value '%d'.",
1358                    compression_algorithm);
1359       gpr_log(GPR_ERROR, "%s", error_msg);
1360       cancel_with_status(call, GRPC_STATUS_UNIMPLEMENTED, error_msg);
1361     } else if (grpc_compression_options_is_algorithm_enabled(
1362                    &compression_options, compression_algorithm) == 0) {
1363       /* check if algorithm is supported by current channel config */
1364       const char* algo_name = nullptr;
1365       grpc_compression_algorithm_name(compression_algorithm, &algo_name);
1366       gpr_asprintf(&error_msg, "Compression algorithm '%s' is disabled.",
1367                    algo_name);
1368       gpr_log(GPR_ERROR, "%s", error_msg);
1369       cancel_with_status(call, GRPC_STATUS_UNIMPLEMENTED, error_msg);
1370     }
1371     gpr_free(error_msg);
1372 
1373     GPR_ASSERT(call->encodings_accepted_by_peer != 0);
1374     if (!GPR_BITGET(call->encodings_accepted_by_peer, compression_algorithm)) {
1375       if (grpc_compression_trace.enabled()) {
1376         const char* algo_name = nullptr;
1377         grpc_compression_algorithm_name(compression_algorithm, &algo_name);
1378         gpr_log(GPR_ERROR,
1379                 "Compression algorithm ('%s') not present in the bitset of "
1380                 "accepted encodings ('0x%x')",
1381                 algo_name, call->encodings_accepted_by_peer);
1382       }
1383     }
1384   }
1385 }
1386 
receiving_initial_metadata_ready(void * bctlp,grpc_error * error)1387 static void receiving_initial_metadata_ready(void* bctlp, grpc_error* error) {
1388   batch_control* bctl = static_cast<batch_control*>(bctlp);
1389   grpc_call* call = bctl->call;
1390 
1391   GRPC_CALL_COMBINER_STOP(&call->call_combiner, "recv_initial_metadata_ready");
1392 
1393   if (error == GRPC_ERROR_NONE) {
1394     grpc_metadata_batch* md =
1395         &call->metadata_batch[1 /* is_receiving */][0 /* is_trailing */];
1396     recv_initial_filter(call, md);
1397 
1398     /* TODO(ctiller): this could be moved into recv_initial_filter now */
1399     GPR_TIMER_SCOPE("validate_filtered_metadata", 0);
1400     validate_filtered_metadata(bctl);
1401 
1402     if (md->deadline != GRPC_MILLIS_INF_FUTURE && !call->is_client) {
1403       call->send_deadline = md->deadline;
1404     }
1405   } else {
1406     if (reinterpret_cast<grpc_error*>(gpr_atm_acq_load(&bctl->batch_error)) ==
1407         GRPC_ERROR_NONE) {
1408       gpr_atm_rel_store(&bctl->batch_error,
1409                         reinterpret_cast<gpr_atm>(GRPC_ERROR_REF(error)));
1410     }
1411     cancel_with_error(call, GRPC_ERROR_REF(error));
1412   }
1413 
1414   grpc_closure* saved_rsr_closure = nullptr;
1415   while (true) {
1416     gpr_atm rsr_bctlp = gpr_atm_acq_load(&call->recv_state);
1417     /* Should only receive initial metadata once */
1418     GPR_ASSERT(rsr_bctlp != 1);
1419     if (rsr_bctlp == 0) {
1420       /* We haven't seen initial metadata and messages before, thus initial
1421        * metadata is received first.
1422        * no_barrier_cas is used, as this function won't access the batch_control
1423        * object saved by receiving_stream_ready() if the initial metadata is
1424        * received first. */
1425       if (gpr_atm_no_barrier_cas(&call->recv_state, RECV_NONE,
1426                                  RECV_INITIAL_METADATA_FIRST)) {
1427         break;
1428       }
1429     } else {
1430       /* Already received messages */
1431       saved_rsr_closure =
1432           GRPC_CLOSURE_CREATE(receiving_stream_ready, (batch_control*)rsr_bctlp,
1433                               grpc_schedule_on_exec_ctx);
1434       /* No need to modify recv_state */
1435       break;
1436     }
1437   }
1438   if (saved_rsr_closure != nullptr) {
1439     GRPC_CLOSURE_RUN(saved_rsr_closure, GRPC_ERROR_REF(error));
1440   }
1441 
1442   finish_batch_step(bctl);
1443 }
1444 
receiving_trailing_metadata_ready(void * bctlp,grpc_error * error)1445 static void receiving_trailing_metadata_ready(void* bctlp, grpc_error* error) {
1446   batch_control* bctl = static_cast<batch_control*>(bctlp);
1447   grpc_call* call = bctl->call;
1448   GRPC_CALL_COMBINER_STOP(&call->call_combiner, "recv_trailing_metadata_ready");
1449   grpc_metadata_batch* md =
1450       &call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */];
1451   recv_trailing_filter(call, md, GRPC_ERROR_REF(error));
1452   finish_batch_step(bctl);
1453 }
1454 
finish_batch(void * bctlp,grpc_error * error)1455 static void finish_batch(void* bctlp, grpc_error* error) {
1456   batch_control* bctl = static_cast<batch_control*>(bctlp);
1457   grpc_call* call = bctl->call;
1458   GRPC_CALL_COMBINER_STOP(&call->call_combiner, "on_complete");
1459   if (reinterpret_cast<grpc_error*>(gpr_atm_acq_load(&bctl->batch_error)) ==
1460       GRPC_ERROR_NONE) {
1461     gpr_atm_rel_store(&bctl->batch_error,
1462                       reinterpret_cast<gpr_atm>(GRPC_ERROR_REF(error)));
1463   }
1464   if (error != GRPC_ERROR_NONE) {
1465     cancel_with_error(call, GRPC_ERROR_REF(error));
1466   }
1467   finish_batch_step(bctl);
1468 }
1469 
free_no_op_completion(void * p,grpc_cq_completion * completion)1470 static void free_no_op_completion(void* p, grpc_cq_completion* completion) {
1471   gpr_free(completion);
1472 }
1473 
call_start_batch(grpc_call * call,const grpc_op * ops,size_t nops,void * notify_tag,int is_notify_tag_closure)1474 static grpc_call_error call_start_batch(grpc_call* call, const grpc_op* ops,
1475                                         size_t nops, void* notify_tag,
1476                                         int is_notify_tag_closure) {
1477   GPR_TIMER_SCOPE("call_start_batch", 0);
1478 
1479   size_t i;
1480   const grpc_op* op;
1481   batch_control* bctl;
1482   bool has_send_ops = false;
1483   int num_recv_ops = 0;
1484   grpc_call_error error = GRPC_CALL_OK;
1485   grpc_transport_stream_op_batch* stream_op;
1486   grpc_transport_stream_op_batch_payload* stream_op_payload;
1487 
1488   GRPC_CALL_LOG_BATCH(GPR_INFO, call, ops, nops, notify_tag);
1489 
1490   if (nops == 0) {
1491     if (!is_notify_tag_closure) {
1492       GPR_ASSERT(grpc_cq_begin_op(call->cq, notify_tag));
1493       grpc_cq_end_op(call->cq, notify_tag, GRPC_ERROR_NONE,
1494                      free_no_op_completion, nullptr,
1495                      static_cast<grpc_cq_completion*>(
1496                          gpr_malloc(sizeof(grpc_cq_completion))));
1497     } else {
1498       GRPC_CLOSURE_SCHED((grpc_closure*)notify_tag, GRPC_ERROR_NONE);
1499     }
1500     error = GRPC_CALL_OK;
1501     goto done;
1502   }
1503 
1504   bctl = reuse_or_allocate_batch_control(call, ops, nops);
1505   if (bctl == nullptr) {
1506     return GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
1507   }
1508   bctl->completion_data.notify_tag.tag = notify_tag;
1509   bctl->completion_data.notify_tag.is_closure =
1510       static_cast<uint8_t>(is_notify_tag_closure != 0);
1511 
1512   stream_op = &bctl->op;
1513   stream_op_payload = &call->stream_op_payload;
1514 
1515   /* rewrite batch ops into a transport op */
1516   for (i = 0; i < nops; i++) {
1517     op = &ops[i];
1518     if (op->reserved != nullptr) {
1519       error = GRPC_CALL_ERROR;
1520       goto done_with_error;
1521     }
1522     switch (op->op) {
1523       case GRPC_OP_SEND_INITIAL_METADATA: {
1524         /* Flag validation: currently allow no flags */
1525         if (!are_initial_metadata_flags_valid(op->flags, call->is_client)) {
1526           error = GRPC_CALL_ERROR_INVALID_FLAGS;
1527           goto done_with_error;
1528         }
1529         if (call->sent_initial_metadata) {
1530           error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
1531           goto done_with_error;
1532         }
1533         /* process compression level */
1534         memset(&call->compression_md, 0, sizeof(call->compression_md));
1535         size_t additional_metadata_count = 0;
1536         grpc_compression_level effective_compression_level =
1537             GRPC_COMPRESS_LEVEL_NONE;
1538         bool level_set = false;
1539         if (op->data.send_initial_metadata.maybe_compression_level.is_set) {
1540           effective_compression_level =
1541               op->data.send_initial_metadata.maybe_compression_level.level;
1542           level_set = true;
1543         } else {
1544           const grpc_compression_options copts =
1545               grpc_channel_compression_options(call->channel);
1546           if (copts.default_level.is_set) {
1547             level_set = true;
1548             effective_compression_level = copts.default_level.level;
1549           }
1550         }
1551         if (level_set && !call->is_client) {
1552           const grpc_compression_algorithm calgo =
1553               compression_algorithm_for_level_locked(
1554                   call, effective_compression_level);
1555           /* the following will be picked up by the compress filter and used
1556            * as the call's compression algorithm. */
1557           call->compression_md.key = GRPC_MDSTR_GRPC_INTERNAL_ENCODING_REQUEST;
1558           call->compression_md.value = grpc_compression_algorithm_slice(calgo);
1559           additional_metadata_count++;
1560         }
1561 
1562         if (op->data.send_initial_metadata.count + additional_metadata_count >
1563             INT_MAX) {
1564           error = GRPC_CALL_ERROR_INVALID_METADATA;
1565           goto done_with_error;
1566         }
1567         stream_op->send_initial_metadata = true;
1568         call->sent_initial_metadata = true;
1569         if (!prepare_application_metadata(
1570                 call, static_cast<int>(op->data.send_initial_metadata.count),
1571                 op->data.send_initial_metadata.metadata, 0, call->is_client,
1572                 &call->compression_md,
1573                 static_cast<int>(additional_metadata_count))) {
1574           error = GRPC_CALL_ERROR_INVALID_METADATA;
1575           goto done_with_error;
1576         }
1577         /* TODO(ctiller): just make these the same variable? */
1578         if (call->is_client) {
1579           call->metadata_batch[0][0].deadline = call->send_deadline;
1580         }
1581         stream_op_payload->send_initial_metadata.send_initial_metadata =
1582             &call->metadata_batch[0 /* is_receiving */][0 /* is_trailing */];
1583         stream_op_payload->send_initial_metadata.send_initial_metadata_flags =
1584             op->flags;
1585         if (call->is_client) {
1586           stream_op_payload->send_initial_metadata.peer_string =
1587               &call->peer_string;
1588         }
1589         has_send_ops = true;
1590         break;
1591       }
1592       case GRPC_OP_SEND_MESSAGE: {
1593         if (!are_write_flags_valid(op->flags)) {
1594           error = GRPC_CALL_ERROR_INVALID_FLAGS;
1595           goto done_with_error;
1596         }
1597         if (op->data.send_message.send_message == nullptr) {
1598           error = GRPC_CALL_ERROR_INVALID_MESSAGE;
1599           goto done_with_error;
1600         }
1601         if (call->sending_message) {
1602           error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
1603           goto done_with_error;
1604         }
1605         uint32_t flags = op->flags;
1606         /* If the outgoing buffer is already compressed, mark it as so in the
1607            flags. These will be picked up by the compression filter and further
1608            (wasteful) attempts at compression skipped. */
1609         if (op->data.send_message.send_message->data.raw.compression >
1610             GRPC_COMPRESS_NONE) {
1611           flags |= GRPC_WRITE_INTERNAL_COMPRESS;
1612         }
1613         stream_op->send_message = true;
1614         call->sending_message = true;
1615         call->sending_stream.Init(
1616             &op->data.send_message.send_message->data.raw.slice_buffer, flags);
1617         stream_op_payload->send_message.send_message.reset(
1618             call->sending_stream.get());
1619         has_send_ops = true;
1620         break;
1621       }
1622       case GRPC_OP_SEND_CLOSE_FROM_CLIENT: {
1623         /* Flag validation: currently allow no flags */
1624         if (op->flags != 0) {
1625           error = GRPC_CALL_ERROR_INVALID_FLAGS;
1626           goto done_with_error;
1627         }
1628         if (!call->is_client) {
1629           error = GRPC_CALL_ERROR_NOT_ON_SERVER;
1630           goto done_with_error;
1631         }
1632         if (call->sent_final_op) {
1633           error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
1634           goto done_with_error;
1635         }
1636         stream_op->send_trailing_metadata = true;
1637         call->sent_final_op = true;
1638         stream_op_payload->send_trailing_metadata.send_trailing_metadata =
1639             &call->metadata_batch[0 /* is_receiving */][1 /* is_trailing */];
1640         has_send_ops = true;
1641         break;
1642       }
1643       case GRPC_OP_SEND_STATUS_FROM_SERVER: {
1644         /* Flag validation: currently allow no flags */
1645         if (op->flags != 0) {
1646           error = GRPC_CALL_ERROR_INVALID_FLAGS;
1647           goto done_with_error;
1648         }
1649         if (call->is_client) {
1650           error = GRPC_CALL_ERROR_NOT_ON_CLIENT;
1651           goto done_with_error;
1652         }
1653         if (call->sent_final_op) {
1654           error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
1655           goto done_with_error;
1656         }
1657         if (op->data.send_status_from_server.trailing_metadata_count >
1658             INT_MAX) {
1659           error = GRPC_CALL_ERROR_INVALID_METADATA;
1660           goto done_with_error;
1661         }
1662         stream_op->send_trailing_metadata = true;
1663         call->sent_final_op = true;
1664         GPR_ASSERT(call->send_extra_metadata_count == 0);
1665         call->send_extra_metadata_count = 1;
1666         call->send_extra_metadata[0].md = grpc_channel_get_reffed_status_elem(
1667             call->channel, op->data.send_status_from_server.status);
1668         grpc_error* status_error =
1669             op->data.send_status_from_server.status == GRPC_STATUS_OK
1670                 ? GRPC_ERROR_NONE
1671                 : grpc_error_set_int(
1672                       GRPC_ERROR_CREATE_FROM_STATIC_STRING(
1673                           "Server returned error"),
1674                       GRPC_ERROR_INT_GRPC_STATUS,
1675                       static_cast<intptr_t>(
1676                           op->data.send_status_from_server.status));
1677         if (op->data.send_status_from_server.status_details != nullptr) {
1678           call->send_extra_metadata[1].md = grpc_mdelem_from_slices(
1679               GRPC_MDSTR_GRPC_MESSAGE,
1680               grpc_slice_ref_internal(
1681                   *op->data.send_status_from_server.status_details));
1682           call->send_extra_metadata_count++;
1683           if (status_error != GRPC_ERROR_NONE) {
1684             char* msg = grpc_slice_to_c_string(
1685                 GRPC_MDVALUE(call->send_extra_metadata[1].md));
1686             status_error =
1687                 grpc_error_set_str(status_error, GRPC_ERROR_STR_GRPC_MESSAGE,
1688                                    grpc_slice_from_copied_string(msg));
1689             gpr_free(msg);
1690           }
1691         }
1692 
1693         gpr_atm_rel_store(&call->status_error,
1694                           reinterpret_cast<gpr_atm>(status_error));
1695         if (!prepare_application_metadata(
1696                 call,
1697                 static_cast<int>(
1698                     op->data.send_status_from_server.trailing_metadata_count),
1699                 op->data.send_status_from_server.trailing_metadata, 1, 1,
1700                 nullptr, 0)) {
1701           for (int n = 0; n < call->send_extra_metadata_count; n++) {
1702             GRPC_MDELEM_UNREF(call->send_extra_metadata[n].md);
1703           }
1704           call->send_extra_metadata_count = 0;
1705           error = GRPC_CALL_ERROR_INVALID_METADATA;
1706           goto done_with_error;
1707         }
1708         stream_op_payload->send_trailing_metadata.send_trailing_metadata =
1709             &call->metadata_batch[0 /* is_receiving */][1 /* is_trailing */];
1710         has_send_ops = true;
1711         break;
1712       }
1713       case GRPC_OP_RECV_INITIAL_METADATA: {
1714         /* Flag validation: currently allow no flags */
1715         if (op->flags != 0) {
1716           error = GRPC_CALL_ERROR_INVALID_FLAGS;
1717           goto done_with_error;
1718         }
1719         if (call->received_initial_metadata) {
1720           error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
1721           goto done_with_error;
1722         }
1723         call->received_initial_metadata = true;
1724         call->buffered_metadata[0] =
1725             op->data.recv_initial_metadata.recv_initial_metadata;
1726         GRPC_CLOSURE_INIT(&call->receiving_initial_metadata_ready,
1727                           receiving_initial_metadata_ready, bctl,
1728                           grpc_schedule_on_exec_ctx);
1729         stream_op->recv_initial_metadata = true;
1730         stream_op_payload->recv_initial_metadata.recv_initial_metadata =
1731             &call->metadata_batch[1 /* is_receiving */][0 /* is_trailing */];
1732         stream_op_payload->recv_initial_metadata.recv_initial_metadata_ready =
1733             &call->receiving_initial_metadata_ready;
1734         if (!call->is_client) {
1735           stream_op_payload->recv_initial_metadata.peer_string =
1736               &call->peer_string;
1737         }
1738         ++num_recv_ops;
1739         break;
1740       }
1741       case GRPC_OP_RECV_MESSAGE: {
1742         /* Flag validation: currently allow no flags */
1743         if (op->flags != 0) {
1744           error = GRPC_CALL_ERROR_INVALID_FLAGS;
1745           goto done_with_error;
1746         }
1747         if (call->receiving_message) {
1748           error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
1749           goto done_with_error;
1750         }
1751         call->receiving_message = true;
1752         stream_op->recv_message = true;
1753         call->receiving_buffer = op->data.recv_message.recv_message;
1754         stream_op_payload->recv_message.recv_message = &call->receiving_stream;
1755         GRPC_CLOSURE_INIT(&call->receiving_stream_ready,
1756                           receiving_stream_ready_in_call_combiner, bctl,
1757                           grpc_schedule_on_exec_ctx);
1758         stream_op_payload->recv_message.recv_message_ready =
1759             &call->receiving_stream_ready;
1760         ++num_recv_ops;
1761         break;
1762       }
1763       case GRPC_OP_RECV_STATUS_ON_CLIENT: {
1764         /* Flag validation: currently allow no flags */
1765         if (op->flags != 0) {
1766           error = GRPC_CALL_ERROR_INVALID_FLAGS;
1767           goto done_with_error;
1768         }
1769         if (!call->is_client) {
1770           error = GRPC_CALL_ERROR_NOT_ON_SERVER;
1771           goto done_with_error;
1772         }
1773         if (call->requested_final_op) {
1774           error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
1775           goto done_with_error;
1776         }
1777         call->requested_final_op = true;
1778         call->buffered_metadata[1] =
1779             op->data.recv_status_on_client.trailing_metadata;
1780         call->final_op.client.status = op->data.recv_status_on_client.status;
1781         call->final_op.client.status_details =
1782             op->data.recv_status_on_client.status_details;
1783         call->final_op.client.error_string =
1784             op->data.recv_status_on_client.error_string;
1785         stream_op->recv_trailing_metadata = true;
1786         stream_op_payload->recv_trailing_metadata.recv_trailing_metadata =
1787             &call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */];
1788         stream_op_payload->recv_trailing_metadata.collect_stats =
1789             &call->final_info.stats.transport_stream_stats;
1790         GRPC_CLOSURE_INIT(&call->receiving_trailing_metadata_ready,
1791                           receiving_trailing_metadata_ready, bctl,
1792                           grpc_schedule_on_exec_ctx);
1793         stream_op_payload->recv_trailing_metadata.recv_trailing_metadata_ready =
1794             &call->receiving_trailing_metadata_ready;
1795         ++num_recv_ops;
1796         break;
1797       }
1798       case GRPC_OP_RECV_CLOSE_ON_SERVER: {
1799         /* Flag validation: currently allow no flags */
1800         if (op->flags != 0) {
1801           error = GRPC_CALL_ERROR_INVALID_FLAGS;
1802           goto done_with_error;
1803         }
1804         if (call->is_client) {
1805           error = GRPC_CALL_ERROR_NOT_ON_CLIENT;
1806           goto done_with_error;
1807         }
1808         if (call->requested_final_op) {
1809           error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
1810           goto done_with_error;
1811         }
1812         call->requested_final_op = true;
1813         call->final_op.server.cancelled =
1814             op->data.recv_close_on_server.cancelled;
1815         stream_op->recv_trailing_metadata = true;
1816         stream_op_payload->recv_trailing_metadata.recv_trailing_metadata =
1817             &call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */];
1818         stream_op_payload->recv_trailing_metadata.collect_stats =
1819             &call->final_info.stats.transport_stream_stats;
1820         GRPC_CLOSURE_INIT(&call->receiving_trailing_metadata_ready,
1821                           receiving_trailing_metadata_ready, bctl,
1822                           grpc_schedule_on_exec_ctx);
1823         stream_op_payload->recv_trailing_metadata.recv_trailing_metadata_ready =
1824             &call->receiving_trailing_metadata_ready;
1825         ++num_recv_ops;
1826         break;
1827       }
1828     }
1829   }
1830 
1831   GRPC_CALL_INTERNAL_REF(call, "completion");
1832   if (!is_notify_tag_closure) {
1833     GPR_ASSERT(grpc_cq_begin_op(call->cq, notify_tag));
1834   }
1835   gpr_ref_init(&bctl->steps_to_complete, (has_send_ops ? 1 : 0) + num_recv_ops);
1836 
1837   if (has_send_ops) {
1838     GRPC_CLOSURE_INIT(&bctl->finish_batch, finish_batch, bctl,
1839                       grpc_schedule_on_exec_ctx);
1840     stream_op->on_complete = &bctl->finish_batch;
1841   }
1842 
1843   gpr_atm_rel_store(&call->any_ops_sent_atm, 1);
1844   execute_batch(call, stream_op, &bctl->start_batch);
1845 
1846 done:
1847   return error;
1848 
1849 done_with_error:
1850   /* reverse any mutations that occurred */
1851   if (stream_op->send_initial_metadata) {
1852     call->sent_initial_metadata = false;
1853     grpc_metadata_batch_clear(&call->metadata_batch[0][0]);
1854   }
1855   if (stream_op->send_message) {
1856     call->sending_message = false;
1857     call->sending_stream->Orphan();
1858   }
1859   if (stream_op->send_trailing_metadata) {
1860     call->sent_final_op = false;
1861     grpc_metadata_batch_clear(&call->metadata_batch[0][1]);
1862   }
1863   if (stream_op->recv_initial_metadata) {
1864     call->received_initial_metadata = false;
1865   }
1866   if (stream_op->recv_message) {
1867     call->receiving_message = false;
1868   }
1869   if (stream_op->recv_trailing_metadata) {
1870     call->requested_final_op = false;
1871   }
1872   goto done;
1873 }
1874 
grpc_call_start_batch(grpc_call * call,const grpc_op * ops,size_t nops,void * tag,void * reserved)1875 grpc_call_error grpc_call_start_batch(grpc_call* call, const grpc_op* ops,
1876                                       size_t nops, void* tag, void* reserved) {
1877   grpc_core::ExecCtx exec_ctx;
1878   grpc_call_error err;
1879 
1880   GRPC_API_TRACE(
1881       "grpc_call_start_batch(call=%p, ops=%p, nops=%lu, tag=%p, "
1882       "reserved=%p)",
1883       5, (call, ops, (unsigned long)nops, tag, reserved));
1884 
1885   if (reserved != nullptr) {
1886     err = GRPC_CALL_ERROR;
1887   } else {
1888     err = call_start_batch(call, ops, nops, tag, 0);
1889   }
1890 
1891   return err;
1892 }
1893 
grpc_call_start_batch_and_execute(grpc_call * call,const grpc_op * ops,size_t nops,grpc_closure * closure)1894 grpc_call_error grpc_call_start_batch_and_execute(grpc_call* call,
1895                                                   const grpc_op* ops,
1896                                                   size_t nops,
1897                                                   grpc_closure* closure) {
1898   return call_start_batch(call, ops, nops, closure, 1);
1899 }
1900 
grpc_call_context_set(grpc_call * call,grpc_context_index elem,void * value,void (* destroy)(void * value))1901 void grpc_call_context_set(grpc_call* call, grpc_context_index elem,
1902                            void* value, void (*destroy)(void* value)) {
1903   if (call->context[elem].destroy) {
1904     call->context[elem].destroy(call->context[elem].value);
1905   }
1906   call->context[elem].value = value;
1907   call->context[elem].destroy = destroy;
1908 }
1909 
grpc_call_context_get(grpc_call * call,grpc_context_index elem)1910 void* grpc_call_context_get(grpc_call* call, grpc_context_index elem) {
1911   return call->context[elem].value;
1912 }
1913 
grpc_call_is_client(grpc_call * call)1914 uint8_t grpc_call_is_client(grpc_call* call) { return call->is_client; }
1915 
grpc_call_compression_for_level(grpc_call * call,grpc_compression_level level)1916 grpc_compression_algorithm grpc_call_compression_for_level(
1917     grpc_call* call, grpc_compression_level level) {
1918   grpc_compression_algorithm algo =
1919       compression_algorithm_for_level_locked(call, level);
1920   return algo;
1921 }
1922 
grpc_call_error_to_string(grpc_call_error error)1923 const char* grpc_call_error_to_string(grpc_call_error error) {
1924   switch (error) {
1925     case GRPC_CALL_ERROR:
1926       return "GRPC_CALL_ERROR";
1927     case GRPC_CALL_ERROR_ALREADY_ACCEPTED:
1928       return "GRPC_CALL_ERROR_ALREADY_ACCEPTED";
1929     case GRPC_CALL_ERROR_ALREADY_FINISHED:
1930       return "GRPC_CALL_ERROR_ALREADY_FINISHED";
1931     case GRPC_CALL_ERROR_ALREADY_INVOKED:
1932       return "GRPC_CALL_ERROR_ALREADY_INVOKED";
1933     case GRPC_CALL_ERROR_BATCH_TOO_BIG:
1934       return "GRPC_CALL_ERROR_BATCH_TOO_BIG";
1935     case GRPC_CALL_ERROR_INVALID_FLAGS:
1936       return "GRPC_CALL_ERROR_INVALID_FLAGS";
1937     case GRPC_CALL_ERROR_INVALID_MESSAGE:
1938       return "GRPC_CALL_ERROR_INVALID_MESSAGE";
1939     case GRPC_CALL_ERROR_INVALID_METADATA:
1940       return "GRPC_CALL_ERROR_INVALID_METADATA";
1941     case GRPC_CALL_ERROR_NOT_INVOKED:
1942       return "GRPC_CALL_ERROR_NOT_INVOKED";
1943     case GRPC_CALL_ERROR_NOT_ON_CLIENT:
1944       return "GRPC_CALL_ERROR_NOT_ON_CLIENT";
1945     case GRPC_CALL_ERROR_NOT_ON_SERVER:
1946       return "GRPC_CALL_ERROR_NOT_ON_SERVER";
1947     case GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE:
1948       return "GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE";
1949     case GRPC_CALL_ERROR_PAYLOAD_TYPE_MISMATCH:
1950       return "GRPC_CALL_ERROR_PAYLOAD_TYPE_MISMATCH";
1951     case GRPC_CALL_ERROR_TOO_MANY_OPERATIONS:
1952       return "GRPC_CALL_ERROR_TOO_MANY_OPERATIONS";
1953     case GRPC_CALL_ERROR_COMPLETION_QUEUE_SHUTDOWN:
1954       return "GRPC_CALL_ERROR_COMPLETION_QUEUE_SHUTDOWN";
1955     case GRPC_CALL_OK:
1956       return "GRPC_CALL_OK";
1957   }
1958   GPR_UNREACHABLE_CODE(return "GRPC_CALL_ERROR_UNKNOW");
1959 }
1960