1 /*
2  *
3  * Copyright 2015 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #include <grpc/support/port_platform.h>
20 
21 #include "src/core/ext/filters/client_channel/client_channel.h"
22 
23 #include <inttypes.h>
24 #include <limits.h>
25 #include <stdbool.h>
26 #include <stdio.h>
27 #include <string.h>
28 
29 #include <grpc/support/alloc.h>
30 #include <grpc/support/log.h>
31 #include <grpc/support/string_util.h>
32 #include <grpc/support/sync.h>
33 
34 #include "src/core/ext/filters/client_channel/backup_poller.h"
35 #include "src/core/ext/filters/client_channel/http_connect_handshaker.h"
36 #include "src/core/ext/filters/client_channel/lb_policy_registry.h"
37 #include "src/core/ext/filters/client_channel/method_params.h"
38 #include "src/core/ext/filters/client_channel/proxy_mapper_registry.h"
39 #include "src/core/ext/filters/client_channel/resolver_registry.h"
40 #include "src/core/ext/filters/client_channel/retry_throttle.h"
41 #include "src/core/ext/filters/client_channel/subchannel.h"
42 #include "src/core/ext/filters/deadline/deadline_filter.h"
43 #include "src/core/lib/backoff/backoff.h"
44 #include "src/core/lib/channel/channel_args.h"
45 #include "src/core/lib/channel/connected_channel.h"
46 #include "src/core/lib/channel/status_util.h"
47 #include "src/core/lib/gpr/string.h"
48 #include "src/core/lib/gprpp/inlined_vector.h"
49 #include "src/core/lib/gprpp/manual_constructor.h"
50 #include "src/core/lib/iomgr/combiner.h"
51 #include "src/core/lib/iomgr/iomgr.h"
52 #include "src/core/lib/iomgr/polling_entity.h"
53 #include "src/core/lib/profiling/timers.h"
54 #include "src/core/lib/slice/slice_internal.h"
55 #include "src/core/lib/slice/slice_string_helpers.h"
56 #include "src/core/lib/surface/channel.h"
57 #include "src/core/lib/transport/connectivity_state.h"
58 #include "src/core/lib/transport/error_utils.h"
59 #include "src/core/lib/transport/metadata.h"
60 #include "src/core/lib/transport/metadata_batch.h"
61 #include "src/core/lib/transport/service_config.h"
62 #include "src/core/lib/transport/static_metadata.h"
63 #include "src/core/lib/transport/status_metadata.h"
64 
65 using grpc_core::internal::ClientChannelMethodParams;
66 using grpc_core::internal::ServerRetryThrottleData;
67 
68 /* Client channel implementation */
69 
// By default, we buffer 256 KiB per RPC for retries
// (256 << 10 == 262144 bytes).
// TODO(roth): Do we have any data to suggest a better value?
#define DEFAULT_PER_RPC_RETRY_BUFFER_SIZE (256 << 10)

// This value was picked arbitrarily.  It can be changed if there is
// any even moderately compelling reason to do so.
// NOTE(review): presumably the jitter fraction applied to retry backoff;
// its use is not visible in this part of the file -- confirm.
#define RETRY_BACKOFF_JITTER 0.2

// Trace flag registered under the name "client_channel".  Functions in
// this file check grpc_client_channel_trace.enabled() before emitting
// their GPR_INFO diagnostics.
grpc_core::TraceFlag grpc_client_channel_trace(false, "client_channel");
79 
80 /*************************************************************************
81  * CHANNEL-WIDE FUNCTIONS
82  */
83 
// Forward declaration; channel_data keeps a pointer to the head of a list
// of these.  The definition is not in this part of the file.
struct external_connectivity_watcher;

// Slice-keyed hash table whose values are ref-counted
// ClientChannelMethodParams.  channel_data::method_params_table holds one
// of these, built from the service config.
typedef grpc_core::SliceHashTable<
    grpc_core::RefCountedPtr<ClientChannelMethodParams>>
    MethodParamsTable;
89 
/** Per-channel state of the client channel filter. */
typedef struct client_channel_channel_data {
  /** name resolver; reset to null on disconnect or when the resolver
      reports shutdown */
  grpc_core::OrphanablePtr<grpc_core::Resolver> resolver;
  /** set by start_resolving_locked() when the first NextLocked() call is
      issued */
  bool started_resolving;
  /** NOTE(review): not referenced in this part of the file; presumably
      controls whether the deadline filter logic is applied -- confirm */
  bool deadline_checking_enabled;
  /** handed to each newly created LB policy via LoadBalancingPolicy::Args */
  grpc_client_channel_factory* client_channel_factory;
  /** when true, retry throttling parameters are parsed from the service
      config */
  bool enable_retries;
  /** NOTE(review): presumably the per-RPC retry buffer limit in bytes
      (see DEFAULT_PER_RPC_RETRY_BUFFER_SIZE) -- confirm where it is set */
  size_t per_rpc_retry_buffer_size;

  /** combiner protecting all variables below in this data structure */
  grpc_combiner* combiner;
  /** currently active load balancer */
  grpc_core::OrphanablePtr<grpc_core::LoadBalancingPolicy> lb_policy;
  /** retry throttle data */
  grpc_core::RefCountedPtr<ServerRetryThrottleData> retry_throttle_data;
  /** maps method names to method_parameters structs */
  grpc_core::RefCountedPtr<MethodParamsTable> method_params_table;
  /** incoming resolver result - set by resolver.next() */
  grpc_channel_args* resolver_result;
  /** a list of closures that are all waiting for resolver result to come in */
  grpc_closure_list waiting_for_resolver_result_closures;
  /** resolver callback */
  grpc_closure on_resolver_result_changed;
  /** connectivity state being tracked */
  grpc_connectivity_state_tracker state_tracker;
  /** when an lb_policy arrives, should we try to exit idle */
  bool exit_idle_when_lb_policy_arrives;
  /** owning stack */
  grpc_channel_stack* owning_stack;
  /** interested parties (owned) */
  grpc_pollset_set* interested_parties;

  /* external_connectivity_watcher_list head is guarded by its own mutex, since
   * counts need to be grabbed immediately without polling on a cq */
  gpr_mu external_connectivity_watcher_list_mu;
  struct external_connectivity_watcher* external_connectivity_watcher_list_head;

  /* the following properties are guarded by a mutex since APIs require them
     to be instantaneously available */
  gpr_mu info_mu;
  /** name of the LB policy chosen for the latest resolver result; copied
      out by cc_get_channel_info() */
  grpc_core::UniquePtr<char> info_lb_policy_name;
  /** service config in JSON form */
  grpc_core::UniquePtr<char> info_service_config_json;
} channel_data;
133 
/** Argument block for request_reresolution_locked().  One instance is
    allocated per LB policy in create_new_lb_policy_locked() and freed by the
    callback once it detects a stale policy, an error, or a missing
    resolver. */
typedef struct {
  /** channel that owns the LB policy */
  channel_data* chand;
  /** used as an identifier, don't dereference it because the LB policy may be
   * non-existing when the callback is run */
  grpc_core::LoadBalancingPolicy* lb_policy;
  /** closure handed to the LB policy via SetReresolutionClosureLocked() */
  grpc_closure closure;
} reresolution_request_args;
141 
/** We create one watcher for each new lb_policy that is returned from a
    resolver, to watch for state changes from the lb_policy. When a state
    change is seen, we update the channel, and create a new watcher.
    Watchers are allocated in watch_lb_policy_locked() and freed in
    on_lb_policy_state_changed_locked(). */
typedef struct {
  /** channel being updated */
  channel_data* chand;
  /** closure that runs on_lb_policy_state_changed_locked() */
  grpc_closure on_changed;
  /** starts as the state known when the watch began; its address is passed
      to NotifyOnStateChangeLocked(), and the callback reads it back as the
      new state */
  grpc_connectivity_state state;
  /** policy being watched; compared against chand->lb_policy to discard
      notifications from a replaced policy */
  grpc_core::LoadBalancingPolicy* lb_policy;
} lb_policy_connectivity_watcher;
151 
// Forward declaration: on_lb_policy_state_changed_locked() re-arms the
// watch by calling this function, which is defined after it.
static void watch_lb_policy_locked(channel_data* chand,
                                   grpc_core::LoadBalancingPolicy* lb_policy,
                                   grpc_connectivity_state current_state);
155 
set_channel_connectivity_state_locked(channel_data * chand,grpc_connectivity_state state,grpc_error * error,const char * reason)156 static void set_channel_connectivity_state_locked(channel_data* chand,
157                                                   grpc_connectivity_state state,
158                                                   grpc_error* error,
159                                                   const char* reason) {
160   /* TODO: Improve failure handling:
161    * - Make it possible for policies to return GRPC_CHANNEL_TRANSIENT_FAILURE.
162    * - Hand over pending picks from old policies during the switch that happens
163    *   when resolver provides an update. */
164   if (chand->lb_policy != nullptr) {
165     if (state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
166       /* cancel picks with wait_for_ready=false */
167       chand->lb_policy->CancelMatchingPicksLocked(
168           /* mask= */ GRPC_INITIAL_METADATA_WAIT_FOR_READY,
169           /* check= */ 0, GRPC_ERROR_REF(error));
170     } else if (state == GRPC_CHANNEL_SHUTDOWN) {
171       /* cancel all picks */
172       chand->lb_policy->CancelMatchingPicksLocked(/* mask= */ 0, /* check= */ 0,
173                                                   GRPC_ERROR_REF(error));
174     }
175   }
176   if (grpc_client_channel_trace.enabled()) {
177     gpr_log(GPR_INFO, "chand=%p: setting connectivity state to %s", chand,
178             grpc_connectivity_state_name(state));
179   }
180   grpc_connectivity_state_set(&chand->state_tracker, state, error, reason);
181 }
182 
on_lb_policy_state_changed_locked(void * arg,grpc_error * error)183 static void on_lb_policy_state_changed_locked(void* arg, grpc_error* error) {
184   lb_policy_connectivity_watcher* w =
185       static_cast<lb_policy_connectivity_watcher*>(arg);
186   /* check if the notification is for the latest policy */
187   if (w->lb_policy == w->chand->lb_policy.get()) {
188     if (grpc_client_channel_trace.enabled()) {
189       gpr_log(GPR_INFO, "chand=%p: lb_policy=%p state changed to %s", w->chand,
190               w->lb_policy, grpc_connectivity_state_name(w->state));
191     }
192     set_channel_connectivity_state_locked(w->chand, w->state,
193                                           GRPC_ERROR_REF(error), "lb_changed");
194     if (w->state != GRPC_CHANNEL_SHUTDOWN) {
195       watch_lb_policy_locked(w->chand, w->lb_policy, w->state);
196     }
197   }
198   GRPC_CHANNEL_STACK_UNREF(w->chand->owning_stack, "watch_lb_policy");
199   gpr_free(w);
200 }
201 
watch_lb_policy_locked(channel_data * chand,grpc_core::LoadBalancingPolicy * lb_policy,grpc_connectivity_state current_state)202 static void watch_lb_policy_locked(channel_data* chand,
203                                    grpc_core::LoadBalancingPolicy* lb_policy,
204                                    grpc_connectivity_state current_state) {
205   lb_policy_connectivity_watcher* w =
206       static_cast<lb_policy_connectivity_watcher*>(gpr_malloc(sizeof(*w)));
207   GRPC_CHANNEL_STACK_REF(chand->owning_stack, "watch_lb_policy");
208   w->chand = chand;
209   GRPC_CLOSURE_INIT(&w->on_changed, on_lb_policy_state_changed_locked, w,
210                     grpc_combiner_scheduler(chand->combiner));
211   w->state = current_state;
212   w->lb_policy = lb_policy;
213   lb_policy->NotifyOnStateChangeLocked(&w->state, &w->on_changed);
214 }
215 
start_resolving_locked(channel_data * chand)216 static void start_resolving_locked(channel_data* chand) {
217   if (grpc_client_channel_trace.enabled()) {
218     gpr_log(GPR_INFO, "chand=%p: starting name resolution", chand);
219   }
220   GPR_ASSERT(!chand->started_resolving);
221   chand->started_resolving = true;
222   GRPC_CHANNEL_STACK_REF(chand->owning_stack, "resolver");
223   chand->resolver->NextLocked(&chand->resolver_result,
224                               &chand->on_resolver_result_changed);
225 }
226 
// State threaded through ServiceConfig::ParseGlobalParams() while looking
// for retry throttling parameters.
typedef struct {
  // Server name used as the key for the retry throttle lookup.  Points into
  // the parsed URI owned by the caller; not owned by this struct.
  char* server_name;
  // Output: set by parse_retry_throttle_params() when a "retryThrottling"
  // object is processed; left null otherwise.
  grpc_core::RefCountedPtr<ServerRetryThrottleData> retry_throttle_data;
} service_config_parsing_state;
231 
parse_retry_throttle_params(const grpc_json * field,service_config_parsing_state * parsing_state)232 static void parse_retry_throttle_params(
233     const grpc_json* field, service_config_parsing_state* parsing_state) {
234   if (strcmp(field->key, "retryThrottling") == 0) {
235     if (parsing_state->retry_throttle_data != nullptr) return;  // Duplicate.
236     if (field->type != GRPC_JSON_OBJECT) return;
237     int max_milli_tokens = 0;
238     int milli_token_ratio = 0;
239     for (grpc_json* sub_field = field->child; sub_field != nullptr;
240          sub_field = sub_field->next) {
241       if (sub_field->key == nullptr) return;
242       if (strcmp(sub_field->key, "maxTokens") == 0) {
243         if (max_milli_tokens != 0) return;  // Duplicate.
244         if (sub_field->type != GRPC_JSON_NUMBER) return;
245         max_milli_tokens = gpr_parse_nonnegative_int(sub_field->value);
246         if (max_milli_tokens == -1) return;
247         max_milli_tokens *= 1000;
248       } else if (strcmp(sub_field->key, "tokenRatio") == 0) {
249         if (milli_token_ratio != 0) return;  // Duplicate.
250         if (sub_field->type != GRPC_JSON_NUMBER) return;
251         // We support up to 3 decimal digits.
252         size_t whole_len = strlen(sub_field->value);
253         uint32_t multiplier = 1;
254         uint32_t decimal_value = 0;
255         const char* decimal_point = strchr(sub_field->value, '.');
256         if (decimal_point != nullptr) {
257           whole_len = static_cast<size_t>(decimal_point - sub_field->value);
258           multiplier = 1000;
259           size_t decimal_len = strlen(decimal_point + 1);
260           if (decimal_len > 3) decimal_len = 3;
261           if (!gpr_parse_bytes_to_uint32(decimal_point + 1, decimal_len,
262                                          &decimal_value)) {
263             return;
264           }
265           uint32_t decimal_multiplier = 1;
266           for (size_t i = 0; i < (3 - decimal_len); ++i) {
267             decimal_multiplier *= 10;
268           }
269           decimal_value *= decimal_multiplier;
270         }
271         uint32_t whole_value;
272         if (!gpr_parse_bytes_to_uint32(sub_field->value, whole_len,
273                                        &whole_value)) {
274           return;
275         }
276         milli_token_ratio =
277             static_cast<int>((whole_value * multiplier) + decimal_value);
278         if (milli_token_ratio <= 0) return;
279       }
280     }
281     parsing_state->retry_throttle_data =
282         grpc_core::internal::ServerRetryThrottleMap::GetDataForServer(
283             parsing_state->server_name, max_milli_tokens, milli_token_ratio);
284   }
285 }
286 
287 // Invoked from the resolver NextLocked() callback when the resolver
288 // is shutting down.
on_resolver_shutdown_locked(channel_data * chand,grpc_error * error)289 static void on_resolver_shutdown_locked(channel_data* chand,
290                                         grpc_error* error) {
291   if (grpc_client_channel_trace.enabled()) {
292     gpr_log(GPR_INFO, "chand=%p: shutting down", chand);
293   }
294   if (chand->lb_policy != nullptr) {
295     if (grpc_client_channel_trace.enabled()) {
296       gpr_log(GPR_INFO, "chand=%p: shutting down lb_policy=%p", chand,
297               chand->lb_policy.get());
298     }
299     grpc_pollset_set_del_pollset_set(chand->lb_policy->interested_parties(),
300                                      chand->interested_parties);
301     chand->lb_policy.reset();
302   }
303   if (chand->resolver != nullptr) {
304     // This should never happen; it can only be triggered by a resolver
305     // implementation spotaneously deciding to report shutdown without
306     // being orphaned.  This code is included just to be defensive.
307     if (grpc_client_channel_trace.enabled()) {
308       gpr_log(GPR_INFO, "chand=%p: spontaneous shutdown from resolver %p",
309               chand, chand->resolver.get());
310     }
311     chand->resolver.reset();
312     set_channel_connectivity_state_locked(
313         chand, GRPC_CHANNEL_SHUTDOWN,
314         GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
315             "Resolver spontaneous shutdown", &error, 1),
316         "resolver_spontaneous_shutdown");
317   }
318   grpc_closure_list_fail_all(&chand->waiting_for_resolver_result_closures,
319                              GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
320                                  "Channel disconnected", &error, 1));
321   GRPC_CLOSURE_LIST_SCHED(&chand->waiting_for_resolver_result_closures);
322   GRPC_CHANNEL_STACK_UNREF(chand->owning_stack, "resolver");
323   grpc_channel_args_destroy(chand->resolver_result);
324   chand->resolver_result = nullptr;
325   GRPC_ERROR_UNREF(error);
326 }
327 
328 // Returns the LB policy name from the resolver result.
329 static grpc_core::UniquePtr<char>
get_lb_policy_name_from_resolver_result_locked(channel_data * chand)330 get_lb_policy_name_from_resolver_result_locked(channel_data* chand) {
331   // Find LB policy name in channel args.
332   const grpc_arg* channel_arg =
333       grpc_channel_args_find(chand->resolver_result, GRPC_ARG_LB_POLICY_NAME);
334   const char* lb_policy_name = grpc_channel_arg_get_string(channel_arg);
335   // Special case: If at least one balancer address is present, we use
336   // the grpclb policy, regardless of what the resolver actually specified.
337   channel_arg =
338       grpc_channel_args_find(chand->resolver_result, GRPC_ARG_LB_ADDRESSES);
339   if (channel_arg != nullptr && channel_arg->type == GRPC_ARG_POINTER) {
340     grpc_lb_addresses* addresses =
341         static_cast<grpc_lb_addresses*>(channel_arg->value.pointer.p);
342     if (grpc_lb_addresses_contains_balancer_address(*addresses)) {
343       if (lb_policy_name != nullptr &&
344           gpr_stricmp(lb_policy_name, "grpclb") != 0) {
345         gpr_log(GPR_INFO,
346                 "resolver requested LB policy %s but provided at least one "
347                 "balancer address -- forcing use of grpclb LB policy",
348                 lb_policy_name);
349       }
350       lb_policy_name = "grpclb";
351     }
352   }
353   // Use pick_first if nothing was specified and we didn't select grpclb
354   // above.
355   if (lb_policy_name == nullptr) lb_policy_name = "pick_first";
356   return grpc_core::UniquePtr<char>(gpr_strdup(lb_policy_name));
357 }
358 
request_reresolution_locked(void * arg,grpc_error * error)359 static void request_reresolution_locked(void* arg, grpc_error* error) {
360   reresolution_request_args* args =
361       static_cast<reresolution_request_args*>(arg);
362   channel_data* chand = args->chand;
363   // If this invocation is for a stale LB policy, treat it as an LB shutdown
364   // signal.
365   if (args->lb_policy != chand->lb_policy.get() || error != GRPC_ERROR_NONE ||
366       chand->resolver == nullptr) {
367     GRPC_CHANNEL_STACK_UNREF(chand->owning_stack, "re-resolution");
368     gpr_free(args);
369     return;
370   }
371   if (grpc_client_channel_trace.enabled()) {
372     gpr_log(GPR_INFO, "chand=%p: started name re-resolving", chand);
373   }
374   chand->resolver->RequestReresolutionLocked();
375   // Give back the closure to the LB policy.
376   chand->lb_policy->SetReresolutionClosureLocked(&args->closure);
377 }
378 
379 // Creates a new LB policy, replacing any previous one.
380 // If the new policy is created successfully, sets *connectivity_state and
381 // *connectivity_error to its initial connectivity state; otherwise,
382 // leaves them unchanged.
create_new_lb_policy_locked(channel_data * chand,char * lb_policy_name,grpc_connectivity_state * connectivity_state,grpc_error ** connectivity_error)383 static void create_new_lb_policy_locked(
384     channel_data* chand, char* lb_policy_name,
385     grpc_connectivity_state* connectivity_state,
386     grpc_error** connectivity_error) {
387   grpc_core::LoadBalancingPolicy::Args lb_policy_args;
388   lb_policy_args.combiner = chand->combiner;
389   lb_policy_args.client_channel_factory = chand->client_channel_factory;
390   lb_policy_args.args = chand->resolver_result;
391   grpc_core::OrphanablePtr<grpc_core::LoadBalancingPolicy> new_lb_policy =
392       grpc_core::LoadBalancingPolicyRegistry::CreateLoadBalancingPolicy(
393           lb_policy_name, lb_policy_args);
394   if (GPR_UNLIKELY(new_lb_policy == nullptr)) {
395     gpr_log(GPR_ERROR, "could not create LB policy \"%s\"", lb_policy_name);
396   } else {
397     if (grpc_client_channel_trace.enabled()) {
398       gpr_log(GPR_INFO, "chand=%p: created new LB policy \"%s\" (%p)", chand,
399               lb_policy_name, new_lb_policy.get());
400     }
401     // Swap out the LB policy and update the fds in
402     // chand->interested_parties.
403     if (chand->lb_policy != nullptr) {
404       if (grpc_client_channel_trace.enabled()) {
405         gpr_log(GPR_INFO, "chand=%p: shutting down lb_policy=%p", chand,
406                 chand->lb_policy.get());
407       }
408       grpc_pollset_set_del_pollset_set(chand->lb_policy->interested_parties(),
409                                        chand->interested_parties);
410       chand->lb_policy->HandOffPendingPicksLocked(new_lb_policy.get());
411     }
412     chand->lb_policy = std::move(new_lb_policy);
413     grpc_pollset_set_add_pollset_set(chand->lb_policy->interested_parties(),
414                                      chand->interested_parties);
415     // Set up re-resolution callback.
416     reresolution_request_args* args =
417         static_cast<reresolution_request_args*>(gpr_zalloc(sizeof(*args)));
418     args->chand = chand;
419     args->lb_policy = chand->lb_policy.get();
420     GRPC_CLOSURE_INIT(&args->closure, request_reresolution_locked, args,
421                       grpc_combiner_scheduler(chand->combiner));
422     GRPC_CHANNEL_STACK_REF(chand->owning_stack, "re-resolution");
423     chand->lb_policy->SetReresolutionClosureLocked(&args->closure);
424     // Get the new LB policy's initial connectivity state and start a
425     // connectivity watch.
426     GRPC_ERROR_UNREF(*connectivity_error);
427     *connectivity_state =
428         chand->lb_policy->CheckConnectivityLocked(connectivity_error);
429     if (chand->exit_idle_when_lb_policy_arrives) {
430       chand->lb_policy->ExitIdleLocked();
431       chand->exit_idle_when_lb_policy_arrives = false;
432     }
433     watch_lb_policy_locked(chand, chand->lb_policy.get(), *connectivity_state);
434   }
435 }
436 
437 // Returns the service config (as a JSON string) from the resolver result.
438 // Also updates state in chand.
439 static grpc_core::UniquePtr<char>
get_service_config_from_resolver_result_locked(channel_data * chand)440 get_service_config_from_resolver_result_locked(channel_data* chand) {
441   const grpc_arg* channel_arg =
442       grpc_channel_args_find(chand->resolver_result, GRPC_ARG_SERVICE_CONFIG);
443   const char* service_config_json = grpc_channel_arg_get_string(channel_arg);
444   if (service_config_json != nullptr) {
445     if (grpc_client_channel_trace.enabled()) {
446       gpr_log(GPR_INFO, "chand=%p: resolver returned service config: \"%s\"",
447               chand, service_config_json);
448     }
449     grpc_core::UniquePtr<grpc_core::ServiceConfig> service_config =
450         grpc_core::ServiceConfig::Create(service_config_json);
451     if (service_config != nullptr) {
452       if (chand->enable_retries) {
453         channel_arg =
454             grpc_channel_args_find(chand->resolver_result, GRPC_ARG_SERVER_URI);
455         const char* server_uri = grpc_channel_arg_get_string(channel_arg);
456         GPR_ASSERT(server_uri != nullptr);
457         grpc_uri* uri = grpc_uri_parse(server_uri, true);
458         GPR_ASSERT(uri->path[0] != '\0');
459         service_config_parsing_state parsing_state;
460         parsing_state.server_name =
461             uri->path[0] == '/' ? uri->path + 1 : uri->path;
462         service_config->ParseGlobalParams(parse_retry_throttle_params,
463                                           &parsing_state);
464         grpc_uri_destroy(uri);
465         chand->retry_throttle_data =
466             std::move(parsing_state.retry_throttle_data);
467       }
468       chand->method_params_table = service_config->CreateMethodConfigTable(
469           ClientChannelMethodParams::CreateFromJson);
470     }
471   }
472   return grpc_core::UniquePtr<char>(gpr_strdup(service_config_json));
473 }
474 
// Callback invoked when a resolver result is available.
//
// \a arg is the channel_data.  The resolver writes its result into
// chand->resolver_result before this runs; three cases are distinguished:
//   - \a error is set or the resolver is gone: shutdown, handled by
//     on_resolver_shutdown_locked().
//   - resolver_result is null: transient resolution failure; the previous
//     LB policy and service config (if any) stay in place.
//   - otherwise: the result is applied (LB policy updated or replaced,
//     service config re-read) and then destroyed.
// In the two non-shutdown cases the closures waiting for a resolver result
// are scheduled and the next resolver result is requested.
static void on_resolver_result_changed_locked(void* arg, grpc_error* error) {
  channel_data* chand = static_cast<channel_data*>(arg);
  if (grpc_client_channel_trace.enabled()) {
    const char* disposition =
        chand->resolver_result != nullptr
            ? ""
            : (error == GRPC_ERROR_NONE ? " (transient error)"
                                        : " (resolver shutdown)");
    gpr_log(GPR_INFO,
            "chand=%p: got resolver result: resolver_result=%p error=%s%s",
            chand, chand->resolver_result, grpc_error_string(error),
            disposition);
  }
  // Handle shutdown.
  if (error != GRPC_ERROR_NONE || chand->resolver == nullptr) {
    // The callee consumes the extra ref taken here.
    on_resolver_shutdown_locked(chand, GRPC_ERROR_REF(error));
    return;
  }
  // Data used to set the channel's connectivity state.  These defaults are
  // reported unless a new LB policy is created successfully below, in which
  // case create_new_lb_policy_locked() overwrites them.
  bool set_connectivity_state = true;
  grpc_connectivity_state connectivity_state = GRPC_CHANNEL_TRANSIENT_FAILURE;
  grpc_error* connectivity_error =
      GRPC_ERROR_CREATE_FROM_STATIC_STRING("No load balancing policy");
  // chand->resolver_result will be null in the case of a transient
  // resolution error.  In that case, we don't have any new result to
  // process, which means that we keep using the previous result (if any).
  if (chand->resolver_result == nullptr) {
    if (grpc_client_channel_trace.enabled()) {
      gpr_log(GPR_INFO, "chand=%p: resolver transient failure", chand);
    }
  } else {
    grpc_core::UniquePtr<char> lb_policy_name =
        get_lb_policy_name_from_resolver_result_locked(chand);
    // Check to see if we're already using the right LB policy.
    // Note: It's safe to use chand->info_lb_policy_name here without
    // taking a lock on chand->info_mu, because this function is the
    // only thing that modifies its value, and it can only be invoked
    // once at any given time.
    bool lb_policy_name_changed = chand->info_lb_policy_name == nullptr ||
                                  gpr_stricmp(chand->info_lb_policy_name.get(),
                                              lb_policy_name.get()) != 0;
    if (chand->lb_policy != nullptr && !lb_policy_name_changed) {
      // Continue using the same LB policy.  Update with new addresses.
      if (grpc_client_channel_trace.enabled()) {
        gpr_log(GPR_INFO, "chand=%p: updating existing LB policy \"%s\" (%p)",
                chand, lb_policy_name.get(), chand->lb_policy.get());
      }
      chand->lb_policy->UpdateLocked(*chand->resolver_result);
      // No need to set the channel's connectivity state; the existing
      // watch on the LB policy will take care of that.
      set_connectivity_state = false;
    } else {
      // Instantiate new LB policy.
      create_new_lb_policy_locked(chand, lb_policy_name.get(),
                                  &connectivity_state, &connectivity_error);
    }
    // Find service config.
    grpc_core::UniquePtr<char> service_config_json =
        get_service_config_from_resolver_result_locked(chand);
    // Swap out the data used by cc_get_channel_info().
    gpr_mu_lock(&chand->info_mu);
    chand->info_lb_policy_name = std::move(lb_policy_name);
    chand->info_service_config_json = std::move(service_config_json);
    gpr_mu_unlock(&chand->info_mu);
    // Clean up.
    grpc_channel_args_destroy(chand->resolver_result);
    chand->resolver_result = nullptr;
  }
  // Set the channel's connectivity state if needed.
  if (set_connectivity_state) {
    set_channel_connectivity_state_locked(
        chand, connectivity_state, connectivity_error, "resolver_result");
  } else {
    // The error was never handed off, so release it here.
    GRPC_ERROR_UNREF(connectivity_error);
  }
  // Invoke closures that were waiting for results and renew the watch.
  GRPC_CLOSURE_LIST_SCHED(&chand->waiting_for_resolver_result_closures);
  chand->resolver->NextLocked(&chand->resolver_result,
                              &chand->on_resolver_result_changed);
}
556 
// Combiner-scheduled half of cc_start_transport_op().
//
// \a arg is the grpc_transport_op; the channel element was stashed in
// op->handler_private.extra_arg by cc_start_transport_op().  The incoming
// error is not used.  Each requested sub-operation is handled in turn:
//   - connectivity watch registration on the channel's state tracker,
//   - ping, sent over a subchannel picked by the LB policy,
//   - disconnect, which shuts down the resolver and the LB policy,
//   - connection backoff reset.
// Finally the "start_transport_op" channel stack ref is released and
// op->on_consumed is scheduled.
static void start_transport_op_locked(void* arg, grpc_error* error_ignored) {
  grpc_transport_op* op = static_cast<grpc_transport_op*>(arg);
  grpc_channel_element* elem =
      static_cast<grpc_channel_element*>(op->handler_private.extra_arg);
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);

  // Connectivity watch: register with the tracker and clear the request
  // fields on the op.
  if (op->on_connectivity_state_change != nullptr) {
    grpc_connectivity_state_notify_on_state_change(
        &chand->state_tracker, op->connectivity_state,
        op->on_connectivity_state_change);
    op->on_connectivity_state_change = nullptr;
    op->connectivity_state = nullptr;
  }

  // Ping: needs a connected subchannel, obtained from the LB policy.
  if (op->send_ping.on_initiate != nullptr || op->send_ping.on_ack != nullptr) {
    if (chand->lb_policy == nullptr) {
      // No LB policy yet: fail both ping closures with the same error.
      grpc_error* error =
          GRPC_ERROR_CREATE_FROM_STATIC_STRING("Ping with no load balancing");
      GRPC_CLOSURE_SCHED(op->send_ping.on_initiate, GRPC_ERROR_REF(error));
      GRPC_CLOSURE_SCHED(op->send_ping.on_ack, error);
    } else {
      grpc_error* error = GRPC_ERROR_NONE;
      grpc_core::LoadBalancingPolicy::PickState pick_state;
      pick_state.initial_metadata = nullptr;
      pick_state.initial_metadata_flags = 0;
      pick_state.on_complete = nullptr;
      memset(&pick_state.subchannel_call_context, 0,
             sizeof(pick_state.subchannel_call_context));
      pick_state.user_data = nullptr;
      // Pick must return synchronously, because pick_state.on_complete is null.
      GPR_ASSERT(chand->lb_policy->PickLocked(&pick_state, &error));
      if (pick_state.connected_subchannel != nullptr) {
        pick_state.connected_subchannel->Ping(op->send_ping.on_initiate,
                                              op->send_ping.on_ack);
      } else {
        // The pick completed without a subchannel; report the pick error,
        // or a generic one if the policy supplied none.
        if (error == GRPC_ERROR_NONE) {
          error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
              "LB policy dropped call on ping");
        }
        GRPC_CLOSURE_SCHED(op->send_ping.on_initiate, GRPC_ERROR_REF(error));
        GRPC_CLOSURE_SCHED(op->send_ping.on_ack, error);
      }
      op->bind_pollset = nullptr;
    }
    op->send_ping.on_initiate = nullptr;
    op->send_ping.on_ack = nullptr;
  }

  // Disconnect: only acted upon while a resolver still exists, i.e. the
  // channel has not been shut down already.
  if (op->disconnect_with_error != GRPC_ERROR_NONE) {
    if (chand->resolver != nullptr) {
      set_channel_connectivity_state_locked(
          chand, GRPC_CHANNEL_SHUTDOWN,
          GRPC_ERROR_REF(op->disconnect_with_error), "disconnect");
      chand->resolver.reset();
      if (!chand->started_resolving) {
        // Resolution never started, so no resolver callback will run to
        // fail the waiting closures; do it here.
        grpc_closure_list_fail_all(&chand->waiting_for_resolver_result_closures,
                                   GRPC_ERROR_REF(op->disconnect_with_error));
        GRPC_CLOSURE_LIST_SCHED(&chand->waiting_for_resolver_result_closures);
      }
      if (chand->lb_policy != nullptr) {
        grpc_pollset_set_del_pollset_set(chand->lb_policy->interested_parties(),
                                         chand->interested_parties);
        chand->lb_policy.reset();
      }
    }
    // Release the error carried by the op.
    GRPC_ERROR_UNREF(op->disconnect_with_error);
  }

  // Backoff reset: forwarded to the resolver (together with a
  // re-resolution request) and to the LB policy.
  if (op->reset_connect_backoff) {
    if (chand->resolver != nullptr) {
      chand->resolver->ResetBackoffLocked();
      chand->resolver->RequestReresolutionLocked();
    }
    if (chand->lb_policy != nullptr) {
      chand->lb_policy->ResetBackoffLocked();
    }
  }

  // Drop the ref taken in cc_start_transport_op().
  GRPC_CHANNEL_STACK_UNREF(chand->owning_stack, "start_transport_op");

  GRPC_CLOSURE_SCHED(op->on_consumed, GRPC_ERROR_NONE);
}
639 
cc_start_transport_op(grpc_channel_element * elem,grpc_transport_op * op)640 static void cc_start_transport_op(grpc_channel_element* elem,
641                                   grpc_transport_op* op) {
642   channel_data* chand = static_cast<channel_data*>(elem->channel_data);
643 
644   GPR_ASSERT(op->set_accept_stream == false);
645   if (op->bind_pollset != nullptr) {
646     grpc_pollset_set_add_pollset(chand->interested_parties, op->bind_pollset);
647   }
648 
649   op->handler_private.extra_arg = elem;
650   GRPC_CHANNEL_STACK_REF(chand->owning_stack, "start_transport_op");
651   GRPC_CLOSURE_SCHED(
652       GRPC_CLOSURE_INIT(&op->handler_private.closure, start_transport_op_locked,
653                         op, grpc_combiner_scheduler(chand->combiner)),
654       GRPC_ERROR_NONE);
655 }
656 
cc_get_channel_info(grpc_channel_element * elem,const grpc_channel_info * info)657 static void cc_get_channel_info(grpc_channel_element* elem,
658                                 const grpc_channel_info* info) {
659   channel_data* chand = static_cast<channel_data*>(elem->channel_data);
660   gpr_mu_lock(&chand->info_mu);
661   if (info->lb_policy_name != nullptr) {
662     *info->lb_policy_name = gpr_strdup(chand->info_lb_policy_name.get());
663   }
664   if (info->service_config_json != nullptr) {
665     *info->service_config_json =
666         gpr_strdup(chand->info_service_config_json.get());
667   }
668   gpr_mu_unlock(&chand->info_mu);
669 }
670 
671 /* Constructor for channel_data */
cc_init_channel_elem(grpc_channel_element * elem,grpc_channel_element_args * args)672 static grpc_error* cc_init_channel_elem(grpc_channel_element* elem,
673                                         grpc_channel_element_args* args) {
674   channel_data* chand = static_cast<channel_data*>(elem->channel_data);
675   GPR_ASSERT(args->is_last);
676   GPR_ASSERT(elem->filter == &grpc_client_channel_filter);
677   // Initialize data members.
678   chand->combiner = grpc_combiner_create();
679   gpr_mu_init(&chand->info_mu);
680   gpr_mu_init(&chand->external_connectivity_watcher_list_mu);
681 
682   gpr_mu_lock(&chand->external_connectivity_watcher_list_mu);
683   chand->external_connectivity_watcher_list_head = nullptr;
684   gpr_mu_unlock(&chand->external_connectivity_watcher_list_mu);
685 
686   chand->owning_stack = args->channel_stack;
687   GRPC_CLOSURE_INIT(&chand->on_resolver_result_changed,
688                     on_resolver_result_changed_locked, chand,
689                     grpc_combiner_scheduler(chand->combiner));
690   chand->interested_parties = grpc_pollset_set_create();
691   grpc_connectivity_state_init(&chand->state_tracker, GRPC_CHANNEL_IDLE,
692                                "client_channel");
693   grpc_client_channel_start_backup_polling(chand->interested_parties);
694   // Record max per-RPC retry buffer size.
695   const grpc_arg* arg = grpc_channel_args_find(
696       args->channel_args, GRPC_ARG_PER_RPC_RETRY_BUFFER_SIZE);
697   chand->per_rpc_retry_buffer_size = (size_t)grpc_channel_arg_get_integer(
698       arg, {DEFAULT_PER_RPC_RETRY_BUFFER_SIZE, 0, INT_MAX});
699   // Record enable_retries.
700   arg = grpc_channel_args_find(args->channel_args, GRPC_ARG_ENABLE_RETRIES);
701   chand->enable_retries = grpc_channel_arg_get_bool(arg, true);
702   // Record client channel factory.
703   arg = grpc_channel_args_find(args->channel_args,
704                                GRPC_ARG_CLIENT_CHANNEL_FACTORY);
705   if (arg == nullptr) {
706     return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
707         "Missing client channel factory in args for client channel filter");
708   }
709   if (arg->type != GRPC_ARG_POINTER) {
710     return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
711         "client channel factory arg must be a pointer");
712   }
713   grpc_client_channel_factory_ref(
714       static_cast<grpc_client_channel_factory*>(arg->value.pointer.p));
715   chand->client_channel_factory =
716       static_cast<grpc_client_channel_factory*>(arg->value.pointer.p);
717   // Get server name to resolve, using proxy mapper if needed.
718   arg = grpc_channel_args_find(args->channel_args, GRPC_ARG_SERVER_URI);
719   if (arg == nullptr) {
720     return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
721         "Missing server uri in args for client channel filter");
722   }
723   if (arg->type != GRPC_ARG_STRING) {
724     return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
725         "server uri arg must be a string");
726   }
727   char* proxy_name = nullptr;
728   grpc_channel_args* new_args = nullptr;
729   grpc_proxy_mappers_map_name(arg->value.string, args->channel_args,
730                               &proxy_name, &new_args);
731   // Instantiate resolver.
732   chand->resolver = grpc_core::ResolverRegistry::CreateResolver(
733       proxy_name != nullptr ? proxy_name : arg->value.string,
734       new_args != nullptr ? new_args : args->channel_args,
735       chand->interested_parties, chand->combiner);
736   if (proxy_name != nullptr) gpr_free(proxy_name);
737   if (new_args != nullptr) grpc_channel_args_destroy(new_args);
738   if (chand->resolver == nullptr) {
739     return GRPC_ERROR_CREATE_FROM_STATIC_STRING("resolver creation failed");
740   }
741   chand->deadline_checking_enabled =
742       grpc_deadline_checking_enabled(args->channel_args);
743   return GRPC_ERROR_NONE;
744 }
745 
746 /* Destructor for channel_data */
cc_destroy_channel_elem(grpc_channel_element * elem)747 static void cc_destroy_channel_elem(grpc_channel_element* elem) {
748   channel_data* chand = static_cast<channel_data*>(elem->channel_data);
749   if (chand->resolver != nullptr) {
750     // The only way we can get here is if we never started resolving,
751     // because we take a ref to the channel stack when we start
752     // resolving and do not release it until the resolver callback is
753     // invoked after the resolver shuts down.
754     chand->resolver.reset();
755   }
756   if (chand->client_channel_factory != nullptr) {
757     grpc_client_channel_factory_unref(chand->client_channel_factory);
758   }
759   if (chand->lb_policy != nullptr) {
760     grpc_pollset_set_del_pollset_set(chand->lb_policy->interested_parties(),
761                                      chand->interested_parties);
762     chand->lb_policy.reset();
763   }
764   // TODO(roth): Once we convert the filter API to C++, there will no
765   // longer be any need to explicitly reset these smart pointer data members.
766   chand->info_lb_policy_name.reset();
767   chand->info_service_config_json.reset();
768   chand->retry_throttle_data.reset();
769   chand->method_params_table.reset();
770   grpc_client_channel_stop_backup_polling(chand->interested_parties);
771   grpc_connectivity_state_destroy(&chand->state_tracker);
772   grpc_pollset_set_destroy(chand->interested_parties);
773   GRPC_COMBINER_UNREF(chand->combiner, "client_channel");
774   gpr_mu_destroy(&chand->info_mu);
775   gpr_mu_destroy(&chand->external_connectivity_watcher_list_mu);
776 }
777 
778 /*************************************************************************
779  * PER-CALL FUNCTIONS
780  */
781 
// Max number of batches that can be pending on a call at any given
// time.  This includes one batch for each of the following ops:
//   recv_initial_metadata
//   send_initial_metadata
//   recv_message
//   send_message
//   recv_trailing_metadata
//   send_trailing_metadata
// This value sizes the call_data::pending_batches array, and
// get_batch_index() maps each batch to one of these six slots, so the two
// must be kept in sync if an op type is ever added.
#define MAX_PENDING_BATCHES 6
791 
792 // Retry support:
793 //
794 // In order to support retries, we act as a proxy for stream op batches.
795 // When we get a batch from the surface, we add it to our list of pending
796 // batches, and we then use those batches to construct separate "child"
797 // batches to be started on the subchannel call.  When the child batches
798 // return, we then decide which pending batches have been completed and
799 // schedule their callbacks accordingly.  If a subchannel call fails and
800 // we want to retry it, we do a new pick and start again, constructing
801 // new "child" batches for the new subchannel call.
802 //
803 // Note that retries are committed when receiving data from the server
804 // (except for Trailers-Only responses).  However, there may be many
805 // send ops started before receiving any data, so we may have already
806 // completed some number of send ops (and returned the completions up to
807 // the surface) by the time we realize that we need to retry.  To deal
808 // with this, we cache data for send ops, so that we can replay them on a
809 // different subchannel call even after we have completed the original
810 // batches.
811 //
812 // There are two sets of data to maintain:
813 // - In call_data (in the parent channel), we maintain a list of pending
814 //   ops and cached data for send ops.
815 // - In the subchannel call, we maintain state to indicate what ops have
816 //   already been sent down to that call.
817 //
818 // When constructing the "child" batches, we compare those two sets of
819 // data to see which batches need to be sent to the subchannel call.
820 
821 // TODO(roth): In subsequent PRs:
822 // - add support for transparent retries (including initial metadata)
823 // - figure out how to record stats in census for retries
824 //   (census filter is on top of this one)
825 // - add census stats for retries
826 
827 // State used for starting a retryable batch on a subchannel call.
828 // This provides its own grpc_transport_stream_op_batch and other data
829 // structures needed to populate the ops in the batch.
830 // We allocate one struct on the arena for each attempt at starting a
831 // batch on a given subchannel call.
832 typedef struct {
833   gpr_refcount refs;
834   grpc_call_element* elem;
835   grpc_subchannel_call* subchannel_call;  // Holds a ref.
836   // The batch to use in the subchannel call.
837   // Its payload field points to subchannel_call_retry_state.batch_payload.
838   grpc_transport_stream_op_batch batch;
839   // For intercepting on_complete.
840   grpc_closure on_complete;
841 } subchannel_batch_data;
842 
843 // Retry state associated with a subchannel call.
844 // Stored in the parent_data of the subchannel call object.
845 typedef struct {
846   // subchannel_batch_data.batch.payload points to this.
847   grpc_transport_stream_op_batch_payload batch_payload;
848   // For send_initial_metadata.
849   // Note that we need to make a copy of the initial metadata for each
850   // subchannel call instead of just referring to the copy in call_data,
851   // because filters in the subchannel stack will probably add entries,
852   // so we need to start in a pristine state for each attempt of the call.
853   grpc_linked_mdelem* send_initial_metadata_storage;
854   grpc_metadata_batch send_initial_metadata;
855   // For send_message.
856   grpc_core::ManualConstructor<grpc_core::ByteStreamCache::CachingByteStream>
857       send_message;
858   // For send_trailing_metadata.
859   grpc_linked_mdelem* send_trailing_metadata_storage;
860   grpc_metadata_batch send_trailing_metadata;
861   // For intercepting recv_initial_metadata.
862   grpc_metadata_batch recv_initial_metadata;
863   grpc_closure recv_initial_metadata_ready;
864   bool trailing_metadata_available;
865   // For intercepting recv_message.
866   grpc_closure recv_message_ready;
867   grpc_core::OrphanablePtr<grpc_core::ByteStream> recv_message;
868   // For intercepting recv_trailing_metadata.
869   grpc_metadata_batch recv_trailing_metadata;
870   grpc_transport_stream_stats collect_stats;
871   grpc_closure recv_trailing_metadata_ready;
872   // These fields indicate which ops have been started and completed on
873   // this subchannel call.
874   size_t started_send_message_count;
875   size_t completed_send_message_count;
876   size_t started_recv_message_count;
877   size_t completed_recv_message_count;
878   bool started_send_initial_metadata : 1;
879   bool completed_send_initial_metadata : 1;
880   bool started_send_trailing_metadata : 1;
881   bool completed_send_trailing_metadata : 1;
882   bool started_recv_initial_metadata : 1;
883   bool completed_recv_initial_metadata : 1;
884   bool started_recv_trailing_metadata : 1;
885   bool completed_recv_trailing_metadata : 1;
886   // State for callback processing.
887   bool retry_dispatched : 1;
888   subchannel_batch_data* recv_initial_metadata_ready_deferred_batch;
889   grpc_error* recv_initial_metadata_error;
890   subchannel_batch_data* recv_message_ready_deferred_batch;
891   grpc_error* recv_message_error;
892   subchannel_batch_data* recv_trailing_metadata_internal_batch;
893 } subchannel_call_retry_state;
894 
895 // Pending batches stored in call data.
896 typedef struct {
897   // The pending batch.  If nullptr, this slot is empty.
898   grpc_transport_stream_op_batch* batch;
899   // Indicates whether payload for send ops has been cached in call data.
900   bool send_ops_cached;
901 } pending_batch;
902 
903 /** Call data.  Holds a pointer to grpc_subchannel_call and the
904     associated machinery to create such a pointer.
905     Handles queueing of stream ops until a call object is ready, waiting
906     for initial metadata before trying to create a call object,
907     and handling cancellation gracefully. */
908 typedef struct client_channel_call_data {
909   // State for handling deadlines.
910   // The code in deadline_filter.c requires this to be the first field.
911   // TODO(roth): This is slightly sub-optimal in that grpc_deadline_state
912   // and this struct both independently store pointers to the call stack
913   // and call combiner.  If/when we have time, find a way to avoid this
914   // without breaking the grpc_deadline_state abstraction.
915   grpc_deadline_state deadline_state;
916 
917   grpc_slice path;  // Request path.
918   gpr_timespec call_start_time;
919   grpc_millis deadline;
920   gpr_arena* arena;
921   grpc_call_stack* owning_call;
922   grpc_call_combiner* call_combiner;
923 
924   grpc_core::RefCountedPtr<ServerRetryThrottleData> retry_throttle_data;
925   grpc_core::RefCountedPtr<ClientChannelMethodParams> method_params;
926 
927   grpc_subchannel_call* subchannel_call;
928 
929   // Set when we get a cancel_stream op.
930   grpc_error* cancel_error;
931 
932   grpc_core::LoadBalancingPolicy::PickState pick;
933   grpc_closure pick_closure;
934   grpc_closure pick_cancel_closure;
935 
936   // state needed to support channelz interception of recv trailing metadata.
937   grpc_closure recv_trailing_metadata_ready_channelz;
938   grpc_closure* original_recv_trailing_metadata;
939   grpc_metadata_batch* recv_trailing_metadata;
940 
941   grpc_polling_entity* pollent;
942   bool pollent_added_to_interested_parties;
943 
944   // Batches are added to this list when received from above.
945   // They are removed when we are done handling the batch (i.e., when
946   // either we have invoked all of the batch's callbacks or we have
947   // passed the batch down to the subchannel call and are not
948   // intercepting any of its callbacks).
949   pending_batch pending_batches[MAX_PENDING_BATCHES];
950   bool pending_send_initial_metadata : 1;
951   bool pending_send_message : 1;
952   bool pending_send_trailing_metadata : 1;
953 
954   // Retry state.
955   bool enable_retries : 1;
956   bool retry_committed : 1;
957   bool last_attempt_got_server_pushback : 1;
958   int num_attempts_completed;
959   size_t bytes_buffered_for_retry;
960   grpc_core::ManualConstructor<grpc_core::BackOff> retry_backoff;
961   grpc_timer retry_timer;
962 
963   // The number of pending retriable subchannel batches containing send ops.
964   // We hold a ref to the call stack while this is non-zero, since replay
965   // batches may not complete until after all callbacks have been returned
966   // to the surface, and we need to make sure that the call is not destroyed
967   // until all of these batches have completed.
968   // Note that we actually only need to track replay batches, but it's
969   // easier to track all batches with send ops.
970   int num_pending_retriable_subchannel_send_batches;
971 
972   // Cached data for retrying send ops.
973   // send_initial_metadata
974   bool seen_send_initial_metadata;
975   grpc_linked_mdelem* send_initial_metadata_storage;
976   grpc_metadata_batch send_initial_metadata;
977   uint32_t send_initial_metadata_flags;
978   gpr_atm* peer_string;
979   // send_message
980   // When we get a send_message op, we replace the original byte stream
981   // with a CachingByteStream that caches the slices to a local buffer for
982   // use in retries.
983   // Note: We inline the cache for the first 3 send_message ops and use
984   // dynamic allocation after that.  This number was essentially picked
985   // at random; it could be changed in the future to tune performance.
986   grpc_core::ManualConstructor<
987       grpc_core::InlinedVector<grpc_core::ByteStreamCache*, 3>>
988       send_messages;
989   // send_trailing_metadata
990   bool seen_send_trailing_metadata;
991   grpc_linked_mdelem* send_trailing_metadata_storage;
992   grpc_metadata_batch send_trailing_metadata;
993 } call_data;
994 
// Forward declarations.
// These functions are referenced ahead of their definitions; for example,
// retry_commit() is called from pending_batches_add() and
// start_retriable_subchannel_batches() from pending_batches_resume().
static void retry_commit(grpc_call_element* elem,
                         subchannel_call_retry_state* retry_state);
static void start_internal_recv_trailing_metadata(grpc_call_element* elem);
// The following three have the (void* arg, grpc_error*) closure-callback
// signature.
static void on_complete(void* arg, grpc_error* error);
static void start_retriable_subchannel_batches(void* arg, grpc_error* ignored);
static void start_pick_locked(void* arg, grpc_error* ignored);
static void maybe_intercept_recv_trailing_metadata_for_channelz(
    grpc_call_element* elem, grpc_transport_stream_op_batch* batch);
1004 
1005 //
1006 // send op data caching
1007 //
1008 
1009 // Caches data for send ops so that it can be retried later, if not
1010 // already cached.
maybe_cache_send_ops_for_batch(call_data * calld,pending_batch * pending)1011 static void maybe_cache_send_ops_for_batch(call_data* calld,
1012                                            pending_batch* pending) {
1013   if (pending->send_ops_cached) return;
1014   pending->send_ops_cached = true;
1015   grpc_transport_stream_op_batch* batch = pending->batch;
1016   // Save a copy of metadata for send_initial_metadata ops.
1017   if (batch->send_initial_metadata) {
1018     calld->seen_send_initial_metadata = true;
1019     GPR_ASSERT(calld->send_initial_metadata_storage == nullptr);
1020     grpc_metadata_batch* send_initial_metadata =
1021         batch->payload->send_initial_metadata.send_initial_metadata;
1022     calld->send_initial_metadata_storage = (grpc_linked_mdelem*)gpr_arena_alloc(
1023         calld->arena,
1024         sizeof(grpc_linked_mdelem) * send_initial_metadata->list.count);
1025     grpc_metadata_batch_copy(send_initial_metadata,
1026                              &calld->send_initial_metadata,
1027                              calld->send_initial_metadata_storage);
1028     calld->send_initial_metadata_flags =
1029         batch->payload->send_initial_metadata.send_initial_metadata_flags;
1030     calld->peer_string = batch->payload->send_initial_metadata.peer_string;
1031   }
1032   // Set up cache for send_message ops.
1033   if (batch->send_message) {
1034     grpc_core::ByteStreamCache* cache =
1035         static_cast<grpc_core::ByteStreamCache*>(
1036             gpr_arena_alloc(calld->arena, sizeof(grpc_core::ByteStreamCache)));
1037     new (cache) grpc_core::ByteStreamCache(
1038         std::move(batch->payload->send_message.send_message));
1039     calld->send_messages->push_back(cache);
1040   }
1041   // Save metadata batch for send_trailing_metadata ops.
1042   if (batch->send_trailing_metadata) {
1043     calld->seen_send_trailing_metadata = true;
1044     GPR_ASSERT(calld->send_trailing_metadata_storage == nullptr);
1045     grpc_metadata_batch* send_trailing_metadata =
1046         batch->payload->send_trailing_metadata.send_trailing_metadata;
1047     calld->send_trailing_metadata_storage =
1048         (grpc_linked_mdelem*)gpr_arena_alloc(
1049             calld->arena,
1050             sizeof(grpc_linked_mdelem) * send_trailing_metadata->list.count);
1051     grpc_metadata_batch_copy(send_trailing_metadata,
1052                              &calld->send_trailing_metadata,
1053                              calld->send_trailing_metadata_storage);
1054   }
1055 }
1056 
1057 // Frees cached send_initial_metadata.
free_cached_send_initial_metadata(channel_data * chand,call_data * calld)1058 static void free_cached_send_initial_metadata(channel_data* chand,
1059                                               call_data* calld) {
1060   if (grpc_client_channel_trace.enabled()) {
1061     gpr_log(GPR_INFO,
1062             "chand=%p calld=%p: destroying calld->send_initial_metadata", chand,
1063             calld);
1064   }
1065   grpc_metadata_batch_destroy(&calld->send_initial_metadata);
1066 }
1067 
1068 // Frees cached send_message at index idx.
free_cached_send_message(channel_data * chand,call_data * calld,size_t idx)1069 static void free_cached_send_message(channel_data* chand, call_data* calld,
1070                                      size_t idx) {
1071   if (grpc_client_channel_trace.enabled()) {
1072     gpr_log(GPR_INFO,
1073             "chand=%p calld=%p: destroying calld->send_messages[%" PRIuPTR "]",
1074             chand, calld, idx);
1075   }
1076   (*calld->send_messages)[idx]->Destroy();
1077 }
1078 
1079 // Frees cached send_trailing_metadata.
free_cached_send_trailing_metadata(channel_data * chand,call_data * calld)1080 static void free_cached_send_trailing_metadata(channel_data* chand,
1081                                                call_data* calld) {
1082   if (grpc_client_channel_trace.enabled()) {
1083     gpr_log(GPR_INFO,
1084             "chand=%p calld=%p: destroying calld->send_trailing_metadata",
1085             chand, calld);
1086   }
1087   grpc_metadata_batch_destroy(&calld->send_trailing_metadata);
1088 }
1089 
1090 // Frees cached send ops that have already been completed after
1091 // committing the call.
free_cached_send_op_data_after_commit(grpc_call_element * elem,subchannel_call_retry_state * retry_state)1092 static void free_cached_send_op_data_after_commit(
1093     grpc_call_element* elem, subchannel_call_retry_state* retry_state) {
1094   channel_data* chand = static_cast<channel_data*>(elem->channel_data);
1095   call_data* calld = static_cast<call_data*>(elem->call_data);
1096   if (retry_state->completed_send_initial_metadata) {
1097     free_cached_send_initial_metadata(chand, calld);
1098   }
1099   for (size_t i = 0; i < retry_state->completed_send_message_count; ++i) {
1100     free_cached_send_message(chand, calld, i);
1101   }
1102   if (retry_state->completed_send_trailing_metadata) {
1103     free_cached_send_trailing_metadata(chand, calld);
1104   }
1105 }
1106 
1107 // Frees cached send ops that were completed by the completed batch in
1108 // batch_data.  Used when batches are completed after the call is committed.
free_cached_send_op_data_for_completed_batch(grpc_call_element * elem,subchannel_batch_data * batch_data,subchannel_call_retry_state * retry_state)1109 static void free_cached_send_op_data_for_completed_batch(
1110     grpc_call_element* elem, subchannel_batch_data* batch_data,
1111     subchannel_call_retry_state* retry_state) {
1112   channel_data* chand = static_cast<channel_data*>(elem->channel_data);
1113   call_data* calld = static_cast<call_data*>(elem->call_data);
1114   if (batch_data->batch.send_initial_metadata) {
1115     free_cached_send_initial_metadata(chand, calld);
1116   }
1117   if (batch_data->batch.send_message) {
1118     free_cached_send_message(chand, calld,
1119                              retry_state->completed_send_message_count - 1);
1120   }
1121   if (batch_data->batch.send_trailing_metadata) {
1122     free_cached_send_trailing_metadata(chand, calld);
1123   }
1124 }
1125 
1126 //
1127 // pending_batches management
1128 //
1129 
1130 // Returns the index into calld->pending_batches to be used for batch.
get_batch_index(grpc_transport_stream_op_batch * batch)1131 static size_t get_batch_index(grpc_transport_stream_op_batch* batch) {
1132   // Note: It is important the send_initial_metadata be the first entry
1133   // here, since the code in pick_subchannel_locked() assumes it will be.
1134   if (batch->send_initial_metadata) return 0;
1135   if (batch->send_message) return 1;
1136   if (batch->send_trailing_metadata) return 2;
1137   if (batch->recv_initial_metadata) return 3;
1138   if (batch->recv_message) return 4;
1139   if (batch->recv_trailing_metadata) return 5;
1140   GPR_UNREACHABLE_CODE(return (size_t)-1);
1141 }
1142 
1143 // This is called via the call combiner, so access to calld is synchronized.
pending_batches_add(grpc_call_element * elem,grpc_transport_stream_op_batch * batch)1144 static void pending_batches_add(grpc_call_element* elem,
1145                                 grpc_transport_stream_op_batch* batch) {
1146   channel_data* chand = static_cast<channel_data*>(elem->channel_data);
1147   call_data* calld = static_cast<call_data*>(elem->call_data);
1148   const size_t idx = get_batch_index(batch);
1149   if (grpc_client_channel_trace.enabled()) {
1150     gpr_log(GPR_INFO,
1151             "chand=%p calld=%p: adding pending batch at index %" PRIuPTR, chand,
1152             calld, idx);
1153   }
1154   pending_batch* pending = &calld->pending_batches[idx];
1155   GPR_ASSERT(pending->batch == nullptr);
1156   pending->batch = batch;
1157   pending->send_ops_cached = false;
1158   if (calld->enable_retries) {
1159     // Update state in calld about pending batches.
1160     // Also check if the batch takes us over the retry buffer limit.
1161     // Note: We don't check the size of trailing metadata here, because
1162     // gRPC clients do not send trailing metadata.
1163     if (batch->send_initial_metadata) {
1164       calld->pending_send_initial_metadata = true;
1165       calld->bytes_buffered_for_retry += grpc_metadata_batch_size(
1166           batch->payload->send_initial_metadata.send_initial_metadata);
1167     }
1168     if (batch->send_message) {
1169       calld->pending_send_message = true;
1170       calld->bytes_buffered_for_retry +=
1171           batch->payload->send_message.send_message->length();
1172     }
1173     if (batch->send_trailing_metadata) {
1174       calld->pending_send_trailing_metadata = true;
1175     }
1176     if (GPR_UNLIKELY(calld->bytes_buffered_for_retry >
1177                      chand->per_rpc_retry_buffer_size)) {
1178       if (grpc_client_channel_trace.enabled()) {
1179         gpr_log(GPR_INFO,
1180                 "chand=%p calld=%p: exceeded retry buffer size, committing",
1181                 chand, calld);
1182       }
1183       subchannel_call_retry_state* retry_state =
1184           calld->subchannel_call == nullptr
1185               ? nullptr
1186               : static_cast<subchannel_call_retry_state*>(
1187                     grpc_connected_subchannel_call_get_parent_data(
1188                         calld->subchannel_call));
1189       retry_commit(elem, retry_state);
1190       // If we are not going to retry and have not yet started, pretend
1191       // retries are disabled so that we don't bother with retry overhead.
1192       if (calld->num_attempts_completed == 0) {
1193         if (grpc_client_channel_trace.enabled()) {
1194           gpr_log(GPR_INFO,
1195                   "chand=%p calld=%p: disabling retries before first attempt",
1196                   chand, calld);
1197         }
1198         calld->enable_retries = false;
1199       }
1200     }
1201   }
1202 }
1203 
pending_batch_clear(call_data * calld,pending_batch * pending)1204 static void pending_batch_clear(call_data* calld, pending_batch* pending) {
1205   if (calld->enable_retries) {
1206     if (pending->batch->send_initial_metadata) {
1207       calld->pending_send_initial_metadata = false;
1208     }
1209     if (pending->batch->send_message) {
1210       calld->pending_send_message = false;
1211     }
1212     if (pending->batch->send_trailing_metadata) {
1213       calld->pending_send_trailing_metadata = false;
1214     }
1215   }
1216   pending->batch = nullptr;
1217 }
1218 
1219 // This is called via the call combiner, so access to calld is synchronized.
fail_pending_batch_in_call_combiner(void * arg,grpc_error * error)1220 static void fail_pending_batch_in_call_combiner(void* arg, grpc_error* error) {
1221   grpc_transport_stream_op_batch* batch =
1222       static_cast<grpc_transport_stream_op_batch*>(arg);
1223   call_data* calld = static_cast<call_data*>(batch->handler_private.extra_arg);
1224   // Note: This will release the call combiner.
1225   grpc_transport_stream_op_batch_finish_with_failure(
1226       batch, GRPC_ERROR_REF(error), calld->call_combiner);
1227 }
1228 
1229 // This is called via the call combiner, so access to calld is synchronized.
1230 // If yield_call_combiner is true, assumes responsibility for yielding
1231 // the call combiner.
pending_batches_fail(grpc_call_element * elem,grpc_error * error,bool yield_call_combiner)1232 static void pending_batches_fail(grpc_call_element* elem, grpc_error* error,
1233                                  bool yield_call_combiner) {
1234   GPR_ASSERT(error != GRPC_ERROR_NONE);
1235   call_data* calld = static_cast<call_data*>(elem->call_data);
1236   if (grpc_client_channel_trace.enabled()) {
1237     size_t num_batches = 0;
1238     for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) {
1239       if (calld->pending_batches[i].batch != nullptr) ++num_batches;
1240     }
1241     gpr_log(GPR_INFO,
1242             "chand=%p calld=%p: failing %" PRIuPTR " pending batches: %s",
1243             elem->channel_data, calld, num_batches, grpc_error_string(error));
1244   }
1245   grpc_core::CallCombinerClosureList closures;
1246   for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) {
1247     pending_batch* pending = &calld->pending_batches[i];
1248     grpc_transport_stream_op_batch* batch = pending->batch;
1249     if (batch != nullptr) {
1250       batch->handler_private.extra_arg = calld;
1251       GRPC_CLOSURE_INIT(&batch->handler_private.closure,
1252                         fail_pending_batch_in_call_combiner, batch,
1253                         grpc_schedule_on_exec_ctx);
1254       closures.Add(&batch->handler_private.closure, GRPC_ERROR_REF(error),
1255                    "pending_batches_fail");
1256       pending_batch_clear(calld, pending);
1257     }
1258   }
1259   if (yield_call_combiner) {
1260     closures.RunClosures(calld->call_combiner);
1261   } else {
1262     closures.RunClosuresWithoutYielding(calld->call_combiner);
1263   }
1264   GRPC_ERROR_UNREF(error);
1265 }
1266 
1267 // This is called via the call combiner, so access to calld is synchronized.
resume_pending_batch_in_call_combiner(void * arg,grpc_error * ignored)1268 static void resume_pending_batch_in_call_combiner(void* arg,
1269                                                   grpc_error* ignored) {
1270   grpc_transport_stream_op_batch* batch =
1271       static_cast<grpc_transport_stream_op_batch*>(arg);
1272   grpc_subchannel_call* subchannel_call =
1273       static_cast<grpc_subchannel_call*>(batch->handler_private.extra_arg);
1274   // Note: This will release the call combiner.
1275   grpc_subchannel_call_process_op(subchannel_call, batch);
1276 }
1277 
1278 // This is called via the call combiner, so access to calld is synchronized.
pending_batches_resume(grpc_call_element * elem)1279 static void pending_batches_resume(grpc_call_element* elem) {
1280   channel_data* chand = static_cast<channel_data*>(elem->channel_data);
1281   call_data* calld = static_cast<call_data*>(elem->call_data);
1282   if (calld->enable_retries) {
1283     start_retriable_subchannel_batches(elem, GRPC_ERROR_NONE);
1284     return;
1285   }
1286   // Retries not enabled; send down batches as-is.
1287   if (grpc_client_channel_trace.enabled()) {
1288     size_t num_batches = 0;
1289     for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) {
1290       if (calld->pending_batches[i].batch != nullptr) ++num_batches;
1291     }
1292     gpr_log(GPR_INFO,
1293             "chand=%p calld=%p: starting %" PRIuPTR
1294             " pending batches on subchannel_call=%p",
1295             chand, calld, num_batches, calld->subchannel_call);
1296   }
1297   grpc_core::CallCombinerClosureList closures;
1298   for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) {
1299     pending_batch* pending = &calld->pending_batches[i];
1300     grpc_transport_stream_op_batch* batch = pending->batch;
1301     if (batch != nullptr) {
1302       maybe_intercept_recv_trailing_metadata_for_channelz(elem, batch);
1303       batch->handler_private.extra_arg = calld->subchannel_call;
1304       GRPC_CLOSURE_INIT(&batch->handler_private.closure,
1305                         resume_pending_batch_in_call_combiner, batch,
1306                         grpc_schedule_on_exec_ctx);
1307       closures.Add(&batch->handler_private.closure, GRPC_ERROR_NONE,
1308                    "pending_batches_resume");
1309       pending_batch_clear(calld, pending);
1310     }
1311   }
1312   // Note: This will release the call combiner.
1313   closures.RunClosures(calld->call_combiner);
1314 }
1315 
maybe_clear_pending_batch(grpc_call_element * elem,pending_batch * pending)1316 static void maybe_clear_pending_batch(grpc_call_element* elem,
1317                                       pending_batch* pending) {
1318   channel_data* chand = static_cast<channel_data*>(elem->channel_data);
1319   call_data* calld = static_cast<call_data*>(elem->call_data);
1320   grpc_transport_stream_op_batch* batch = pending->batch;
1321   // We clear the pending batch if all of its callbacks have been
1322   // scheduled and reset to nullptr.
1323   if (batch->on_complete == nullptr &&
1324       (!batch->recv_initial_metadata ||
1325        batch->payload->recv_initial_metadata.recv_initial_metadata_ready ==
1326            nullptr) &&
1327       (!batch->recv_message ||
1328        batch->payload->recv_message.recv_message_ready == nullptr) &&
1329       (!batch->recv_trailing_metadata ||
1330        batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready ==
1331            nullptr)) {
1332     if (grpc_client_channel_trace.enabled()) {
1333       gpr_log(GPR_INFO, "chand=%p calld=%p: clearing pending batch", chand,
1334               calld);
1335     }
1336     pending_batch_clear(calld, pending);
1337   }
1338 }
1339 
1340 // Returns a pointer to the first pending batch for which predicate(batch)
1341 // returns true, or null if not found.
1342 template <typename Predicate>
pending_batch_find(grpc_call_element * elem,const char * log_message,Predicate predicate)1343 static pending_batch* pending_batch_find(grpc_call_element* elem,
1344                                          const char* log_message,
1345                                          Predicate predicate) {
1346   channel_data* chand = static_cast<channel_data*>(elem->channel_data);
1347   call_data* calld = static_cast<call_data*>(elem->call_data);
1348   for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) {
1349     pending_batch* pending = &calld->pending_batches[i];
1350     grpc_transport_stream_op_batch* batch = pending->batch;
1351     if (batch != nullptr && predicate(batch)) {
1352       if (grpc_client_channel_trace.enabled()) {
1353         gpr_log(GPR_INFO,
1354                 "chand=%p calld=%p: %s pending batch at index %" PRIuPTR, chand,
1355                 calld, log_message, i);
1356       }
1357       return pending;
1358     }
1359   }
1360   return nullptr;
1361 }
1362 
1363 //
1364 // retry code
1365 //
1366 
1367 // Commits the call so that no further retry attempts will be performed.
retry_commit(grpc_call_element * elem,subchannel_call_retry_state * retry_state)1368 static void retry_commit(grpc_call_element* elem,
1369                          subchannel_call_retry_state* retry_state) {
1370   channel_data* chand = static_cast<channel_data*>(elem->channel_data);
1371   call_data* calld = static_cast<call_data*>(elem->call_data);
1372   if (calld->retry_committed) return;
1373   calld->retry_committed = true;
1374   if (grpc_client_channel_trace.enabled()) {
1375     gpr_log(GPR_INFO, "chand=%p calld=%p: committing retries", chand, calld);
1376   }
1377   if (retry_state != nullptr) {
1378     free_cached_send_op_data_after_commit(elem, retry_state);
1379   }
1380 }
1381 
1382 // Starts a retry after appropriate back-off.
do_retry(grpc_call_element * elem,subchannel_call_retry_state * retry_state,grpc_millis server_pushback_ms)1383 static void do_retry(grpc_call_element* elem,
1384                      subchannel_call_retry_state* retry_state,
1385                      grpc_millis server_pushback_ms) {
1386   channel_data* chand = static_cast<channel_data*>(elem->channel_data);
1387   call_data* calld = static_cast<call_data*>(elem->call_data);
1388   GPR_ASSERT(calld->method_params != nullptr);
1389   const ClientChannelMethodParams::RetryPolicy* retry_policy =
1390       calld->method_params->retry_policy();
1391   GPR_ASSERT(retry_policy != nullptr);
1392   // Reset subchannel call and connected subchannel.
1393   if (calld->subchannel_call != nullptr) {
1394     GRPC_SUBCHANNEL_CALL_UNREF(calld->subchannel_call,
1395                                "client_channel_call_retry");
1396     calld->subchannel_call = nullptr;
1397   }
1398   if (calld->pick.connected_subchannel != nullptr) {
1399     calld->pick.connected_subchannel.reset();
1400   }
1401   // Compute backoff delay.
1402   grpc_millis next_attempt_time;
1403   if (server_pushback_ms >= 0) {
1404     next_attempt_time = grpc_core::ExecCtx::Get()->Now() + server_pushback_ms;
1405     calld->last_attempt_got_server_pushback = true;
1406   } else {
1407     if (calld->num_attempts_completed == 1 ||
1408         calld->last_attempt_got_server_pushback) {
1409       calld->retry_backoff.Init(
1410           grpc_core::BackOff::Options()
1411               .set_initial_backoff(retry_policy->initial_backoff)
1412               .set_multiplier(retry_policy->backoff_multiplier)
1413               .set_jitter(RETRY_BACKOFF_JITTER)
1414               .set_max_backoff(retry_policy->max_backoff));
1415       calld->last_attempt_got_server_pushback = false;
1416     }
1417     next_attempt_time = calld->retry_backoff->NextAttemptTime();
1418   }
1419   if (grpc_client_channel_trace.enabled()) {
1420     gpr_log(GPR_INFO,
1421             "chand=%p calld=%p: retrying failed call in %" PRId64 " ms", chand,
1422             calld, next_attempt_time - grpc_core::ExecCtx::Get()->Now());
1423   }
1424   // Schedule retry after computed delay.
1425   GRPC_CLOSURE_INIT(&calld->pick_closure, start_pick_locked, elem,
1426                     grpc_combiner_scheduler(chand->combiner));
1427   grpc_timer_init(&calld->retry_timer, next_attempt_time, &calld->pick_closure);
1428   // Update bookkeeping.
1429   if (retry_state != nullptr) retry_state->retry_dispatched = true;
1430 }
1431 
1432 // Returns true if the call is being retried.
maybe_retry(grpc_call_element * elem,subchannel_batch_data * batch_data,grpc_status_code status,grpc_mdelem * server_pushback_md)1433 static bool maybe_retry(grpc_call_element* elem,
1434                         subchannel_batch_data* batch_data,
1435                         grpc_status_code status,
1436                         grpc_mdelem* server_pushback_md) {
1437   channel_data* chand = static_cast<channel_data*>(elem->channel_data);
1438   call_data* calld = static_cast<call_data*>(elem->call_data);
1439   // Get retry policy.
1440   if (calld->method_params == nullptr) return false;
1441   const ClientChannelMethodParams::RetryPolicy* retry_policy =
1442       calld->method_params->retry_policy();
1443   if (retry_policy == nullptr) return false;
1444   // If we've already dispatched a retry from this call, return true.
1445   // This catches the case where the batch has multiple callbacks
1446   // (i.e., it includes either recv_message or recv_initial_metadata).
1447   subchannel_call_retry_state* retry_state = nullptr;
1448   if (batch_data != nullptr) {
1449     retry_state = static_cast<subchannel_call_retry_state*>(
1450         grpc_connected_subchannel_call_get_parent_data(
1451             batch_data->subchannel_call));
1452     if (retry_state->retry_dispatched) {
1453       if (grpc_client_channel_trace.enabled()) {
1454         gpr_log(GPR_INFO, "chand=%p calld=%p: retry already dispatched", chand,
1455                 calld);
1456       }
1457       return true;
1458     }
1459   }
1460   // Check status.
1461   if (GPR_LIKELY(status == GRPC_STATUS_OK)) {
1462     if (calld->retry_throttle_data != nullptr) {
1463       calld->retry_throttle_data->RecordSuccess();
1464     }
1465     if (grpc_client_channel_trace.enabled()) {
1466       gpr_log(GPR_INFO, "chand=%p calld=%p: call succeeded", chand, calld);
1467     }
1468     return false;
1469   }
1470   // Status is not OK.  Check whether the status is retryable.
1471   if (!retry_policy->retryable_status_codes.Contains(status)) {
1472     if (grpc_client_channel_trace.enabled()) {
1473       gpr_log(GPR_INFO,
1474               "chand=%p calld=%p: status %s not configured as retryable", chand,
1475               calld, grpc_status_code_to_string(status));
1476     }
1477     return false;
1478   }
1479   // Record the failure and check whether retries are throttled.
1480   // Note that it's important for this check to come after the status
1481   // code check above, since we should only record failures whose statuses
1482   // match the configured retryable status codes, so that we don't count
1483   // things like failures due to malformed requests (INVALID_ARGUMENT).
1484   // Conversely, it's important for this to come before the remaining
1485   // checks, so that we don't fail to record failures due to other factors.
1486   if (calld->retry_throttle_data != nullptr &&
1487       !calld->retry_throttle_data->RecordFailure()) {
1488     if (grpc_client_channel_trace.enabled()) {
1489       gpr_log(GPR_INFO, "chand=%p calld=%p: retries throttled", chand, calld);
1490     }
1491     return false;
1492   }
1493   // Check whether the call is committed.
1494   if (calld->retry_committed) {
1495     if (grpc_client_channel_trace.enabled()) {
1496       gpr_log(GPR_INFO, "chand=%p calld=%p: retries already committed", chand,
1497               calld);
1498     }
1499     return false;
1500   }
1501   // Check whether we have retries remaining.
1502   ++calld->num_attempts_completed;
1503   if (calld->num_attempts_completed >= retry_policy->max_attempts) {
1504     if (grpc_client_channel_trace.enabled()) {
1505       gpr_log(GPR_INFO, "chand=%p calld=%p: exceeded %d retry attempts", chand,
1506               calld, retry_policy->max_attempts);
1507     }
1508     return false;
1509   }
1510   // If the call was cancelled from the surface, don't retry.
1511   if (calld->cancel_error != GRPC_ERROR_NONE) {
1512     if (grpc_client_channel_trace.enabled()) {
1513       gpr_log(GPR_INFO,
1514               "chand=%p calld=%p: call cancelled from surface, not retrying",
1515               chand, calld);
1516     }
1517     return false;
1518   }
1519   // Check server push-back.
1520   grpc_millis server_pushback_ms = -1;
1521   if (server_pushback_md != nullptr) {
1522     // If the value is "-1" or any other unparseable string, we do not retry.
1523     uint32_t ms;
1524     if (!grpc_parse_slice_to_uint32(GRPC_MDVALUE(*server_pushback_md), &ms)) {
1525       if (grpc_client_channel_trace.enabled()) {
1526         gpr_log(GPR_INFO,
1527                 "chand=%p calld=%p: not retrying due to server push-back",
1528                 chand, calld);
1529       }
1530       return false;
1531     } else {
1532       if (grpc_client_channel_trace.enabled()) {
1533         gpr_log(GPR_INFO, "chand=%p calld=%p: server push-back: retry in %u ms",
1534                 chand, calld, ms);
1535       }
1536       server_pushback_ms = (grpc_millis)ms;
1537     }
1538   }
1539   do_retry(elem, retry_state, server_pushback_ms);
1540   return true;
1541 }
1542 
1543 //
1544 // subchannel_batch_data
1545 //
1546 
1547 // Creates a subchannel_batch_data object on the call's arena with the
1548 // specified refcount.  If set_on_complete is true, the batch's
1549 // on_complete callback will be set to point to on_complete();
1550 // otherwise, the batch's on_complete callback will be null.
batch_data_create(grpc_call_element * elem,int refcount,bool set_on_complete)1551 static subchannel_batch_data* batch_data_create(grpc_call_element* elem,
1552                                                 int refcount,
1553                                                 bool set_on_complete) {
1554   call_data* calld = static_cast<call_data*>(elem->call_data);
1555   subchannel_call_retry_state* retry_state =
1556       static_cast<subchannel_call_retry_state*>(
1557           grpc_connected_subchannel_call_get_parent_data(
1558               calld->subchannel_call));
1559   subchannel_batch_data* batch_data = static_cast<subchannel_batch_data*>(
1560       gpr_arena_alloc(calld->arena, sizeof(*batch_data)));
1561   batch_data->elem = elem;
1562   batch_data->subchannel_call =
1563       GRPC_SUBCHANNEL_CALL_REF(calld->subchannel_call, "batch_data_create");
1564   batch_data->batch.payload = &retry_state->batch_payload;
1565   gpr_ref_init(&batch_data->refs, refcount);
1566   if (set_on_complete) {
1567     GRPC_CLOSURE_INIT(&batch_data->on_complete, on_complete, batch_data,
1568                       grpc_schedule_on_exec_ctx);
1569     batch_data->batch.on_complete = &batch_data->on_complete;
1570   }
1571   GRPC_CALL_STACK_REF(calld->owning_call, "batch_data");
1572   return batch_data;
1573 }
1574 
batch_data_unref(subchannel_batch_data * batch_data)1575 static void batch_data_unref(subchannel_batch_data* batch_data) {
1576   if (gpr_unref(&batch_data->refs)) {
1577     subchannel_call_retry_state* retry_state =
1578         static_cast<subchannel_call_retry_state*>(
1579             grpc_connected_subchannel_call_get_parent_data(
1580                 batch_data->subchannel_call));
1581     if (batch_data->batch.send_initial_metadata) {
1582       grpc_metadata_batch_destroy(&retry_state->send_initial_metadata);
1583     }
1584     if (batch_data->batch.send_trailing_metadata) {
1585       grpc_metadata_batch_destroy(&retry_state->send_trailing_metadata);
1586     }
1587     if (batch_data->batch.recv_initial_metadata) {
1588       grpc_metadata_batch_destroy(&retry_state->recv_initial_metadata);
1589     }
1590     if (batch_data->batch.recv_trailing_metadata) {
1591       grpc_metadata_batch_destroy(&retry_state->recv_trailing_metadata);
1592     }
1593     GRPC_SUBCHANNEL_CALL_UNREF(batch_data->subchannel_call, "batch_data_unref");
1594     call_data* calld = static_cast<call_data*>(batch_data->elem->call_data);
1595     GRPC_CALL_STACK_UNREF(calld->owning_call, "batch_data");
1596   }
1597 }
1598 
1599 //
1600 // recv_initial_metadata callback handling
1601 //
1602 
1603 // Invokes recv_initial_metadata_ready for a subchannel batch.
invoke_recv_initial_metadata_callback(void * arg,grpc_error * error)1604 static void invoke_recv_initial_metadata_callback(void* arg,
1605                                                   grpc_error* error) {
1606   subchannel_batch_data* batch_data = static_cast<subchannel_batch_data*>(arg);
1607   // Find pending batch.
1608   pending_batch* pending = pending_batch_find(
1609       batch_data->elem, "invoking recv_initial_metadata_ready for",
1610       [](grpc_transport_stream_op_batch* batch) {
1611         return batch->recv_initial_metadata &&
1612                batch->payload->recv_initial_metadata
1613                        .recv_initial_metadata_ready != nullptr;
1614       });
1615   GPR_ASSERT(pending != nullptr);
1616   // Return metadata.
1617   subchannel_call_retry_state* retry_state =
1618       static_cast<subchannel_call_retry_state*>(
1619           grpc_connected_subchannel_call_get_parent_data(
1620               batch_data->subchannel_call));
1621   grpc_metadata_batch_move(
1622       &retry_state->recv_initial_metadata,
1623       pending->batch->payload->recv_initial_metadata.recv_initial_metadata);
1624   // Update bookkeeping.
1625   // Note: Need to do this before invoking the callback, since invoking
1626   // the callback will result in yielding the call combiner.
1627   grpc_closure* recv_initial_metadata_ready =
1628       pending->batch->payload->recv_initial_metadata
1629           .recv_initial_metadata_ready;
1630   pending->batch->payload->recv_initial_metadata.recv_initial_metadata_ready =
1631       nullptr;
1632   maybe_clear_pending_batch(batch_data->elem, pending);
1633   batch_data_unref(batch_data);
1634   // Invoke callback.
1635   GRPC_CLOSURE_RUN(recv_initial_metadata_ready, GRPC_ERROR_REF(error));
1636 }
1637 
1638 // Intercepts recv_initial_metadata_ready callback for retries.
1639 // Commits the call and returns the initial metadata up the stack.
recv_initial_metadata_ready(void * arg,grpc_error * error)1640 static void recv_initial_metadata_ready(void* arg, grpc_error* error) {
1641   subchannel_batch_data* batch_data = static_cast<subchannel_batch_data*>(arg);
1642   grpc_call_element* elem = batch_data->elem;
1643   channel_data* chand = static_cast<channel_data*>(elem->channel_data);
1644   call_data* calld = static_cast<call_data*>(elem->call_data);
1645   if (grpc_client_channel_trace.enabled()) {
1646     gpr_log(GPR_INFO,
1647             "chand=%p calld=%p: got recv_initial_metadata_ready, error=%s",
1648             chand, calld, grpc_error_string(error));
1649   }
1650   subchannel_call_retry_state* retry_state =
1651       static_cast<subchannel_call_retry_state*>(
1652           grpc_connected_subchannel_call_get_parent_data(
1653               batch_data->subchannel_call));
1654   retry_state->completed_recv_initial_metadata = true;
1655   // If a retry was already dispatched, then we're not going to use the
1656   // result of this recv_initial_metadata op, so do nothing.
1657   if (retry_state->retry_dispatched) {
1658     GRPC_CALL_COMBINER_STOP(
1659         calld->call_combiner,
1660         "recv_initial_metadata_ready after retry dispatched");
1661     return;
1662   }
1663   // If we got an error or a Trailers-Only response and have not yet gotten
1664   // the recv_trailing_metadata_ready callback, then defer propagating this
1665   // callback back to the surface.  We can evaluate whether to retry when
1666   // recv_trailing_metadata comes back.
1667   if (GPR_UNLIKELY((retry_state->trailing_metadata_available ||
1668                     error != GRPC_ERROR_NONE) &&
1669                    !retry_state->completed_recv_trailing_metadata)) {
1670     if (grpc_client_channel_trace.enabled()) {
1671       gpr_log(GPR_INFO,
1672               "chand=%p calld=%p: deferring recv_initial_metadata_ready "
1673               "(Trailers-Only)",
1674               chand, calld);
1675     }
1676     retry_state->recv_initial_metadata_ready_deferred_batch = batch_data;
1677     retry_state->recv_initial_metadata_error = GRPC_ERROR_REF(error);
1678     if (!retry_state->started_recv_trailing_metadata) {
1679       // recv_trailing_metadata not yet started by application; start it
1680       // ourselves to get status.
1681       start_internal_recv_trailing_metadata(elem);
1682     } else {
1683       GRPC_CALL_COMBINER_STOP(
1684           calld->call_combiner,
1685           "recv_initial_metadata_ready trailers-only or error");
1686     }
1687     return;
1688   }
1689   // Received valid initial metadata, so commit the call.
1690   retry_commit(elem, retry_state);
1691   // Invoke the callback to return the result to the surface.
1692   // Manually invoking a callback function; it does not take ownership of error.
1693   invoke_recv_initial_metadata_callback(batch_data, error);
1694 }
1695 
1696 //
1697 // recv_message callback handling
1698 //
1699 
1700 // Invokes recv_message_ready for a subchannel batch.
invoke_recv_message_callback(void * arg,grpc_error * error)1701 static void invoke_recv_message_callback(void* arg, grpc_error* error) {
1702   subchannel_batch_data* batch_data = static_cast<subchannel_batch_data*>(arg);
1703   // Find pending op.
1704   pending_batch* pending = pending_batch_find(
1705       batch_data->elem, "invoking recv_message_ready for",
1706       [](grpc_transport_stream_op_batch* batch) {
1707         return batch->recv_message &&
1708                batch->payload->recv_message.recv_message_ready != nullptr;
1709       });
1710   GPR_ASSERT(pending != nullptr);
1711   // Return payload.
1712   subchannel_call_retry_state* retry_state =
1713       static_cast<subchannel_call_retry_state*>(
1714           grpc_connected_subchannel_call_get_parent_data(
1715               batch_data->subchannel_call));
1716   *pending->batch->payload->recv_message.recv_message =
1717       std::move(retry_state->recv_message);
1718   // Update bookkeeping.
1719   // Note: Need to do this before invoking the callback, since invoking
1720   // the callback will result in yielding the call combiner.
1721   grpc_closure* recv_message_ready =
1722       pending->batch->payload->recv_message.recv_message_ready;
1723   pending->batch->payload->recv_message.recv_message_ready = nullptr;
1724   maybe_clear_pending_batch(batch_data->elem, pending);
1725   batch_data_unref(batch_data);
1726   // Invoke callback.
1727   GRPC_CLOSURE_RUN(recv_message_ready, GRPC_ERROR_REF(error));
1728 }
1729 
1730 // Intercepts recv_message_ready callback for retries.
1731 // Commits the call and returns the message up the stack.
recv_message_ready(void * arg,grpc_error * error)1732 static void recv_message_ready(void* arg, grpc_error* error) {
1733   subchannel_batch_data* batch_data = static_cast<subchannel_batch_data*>(arg);
1734   grpc_call_element* elem = batch_data->elem;
1735   channel_data* chand = static_cast<channel_data*>(elem->channel_data);
1736   call_data* calld = static_cast<call_data*>(elem->call_data);
1737   if (grpc_client_channel_trace.enabled()) {
1738     gpr_log(GPR_INFO, "chand=%p calld=%p: got recv_message_ready, error=%s",
1739             chand, calld, grpc_error_string(error));
1740   }
1741   subchannel_call_retry_state* retry_state =
1742       static_cast<subchannel_call_retry_state*>(
1743           grpc_connected_subchannel_call_get_parent_data(
1744               batch_data->subchannel_call));
1745   ++retry_state->completed_recv_message_count;
1746   // If a retry was already dispatched, then we're not going to use the
1747   // result of this recv_message op, so do nothing.
1748   if (retry_state->retry_dispatched) {
1749     GRPC_CALL_COMBINER_STOP(calld->call_combiner,
1750                             "recv_message_ready after retry dispatched");
1751     return;
1752   }
1753   // If we got an error or the payload was nullptr and we have not yet gotten
1754   // the recv_trailing_metadata_ready callback, then defer propagating this
1755   // callback back to the surface.  We can evaluate whether to retry when
1756   // recv_trailing_metadata comes back.
1757   if (GPR_UNLIKELY(
1758           (retry_state->recv_message == nullptr || error != GRPC_ERROR_NONE) &&
1759           !retry_state->completed_recv_trailing_metadata)) {
1760     if (grpc_client_channel_trace.enabled()) {
1761       gpr_log(GPR_INFO,
1762               "chand=%p calld=%p: deferring recv_message_ready (nullptr "
1763               "message and recv_trailing_metadata pending)",
1764               chand, calld);
1765     }
1766     retry_state->recv_message_ready_deferred_batch = batch_data;
1767     retry_state->recv_message_error = GRPC_ERROR_REF(error);
1768     if (!retry_state->started_recv_trailing_metadata) {
1769       // recv_trailing_metadata not yet started by application; start it
1770       // ourselves to get status.
1771       start_internal_recv_trailing_metadata(elem);
1772     } else {
1773       GRPC_CALL_COMBINER_STOP(calld->call_combiner, "recv_message_ready null");
1774     }
1775     return;
1776   }
1777   // Received a valid message, so commit the call.
1778   retry_commit(elem, retry_state);
1779   // Invoke the callback to return the result to the surface.
1780   // Manually invoking a callback function; it does not take ownership of error.
1781   invoke_recv_message_callback(batch_data, error);
1782 }
1783 
1784 //
1785 // recv_trailing_metadata handling
1786 //
1787 
1788 // Sets *status and *server_pushback_md based on md_batch and error.
1789 // Only sets *server_pushback_md if server_pushback_md != nullptr.
get_call_status(grpc_call_element * elem,grpc_metadata_batch * md_batch,grpc_error * error,grpc_status_code * status,grpc_mdelem ** server_pushback_md)1790 static void get_call_status(grpc_call_element* elem,
1791                             grpc_metadata_batch* md_batch, grpc_error* error,
1792                             grpc_status_code* status,
1793                             grpc_mdelem** server_pushback_md) {
1794   call_data* calld = static_cast<call_data*>(elem->call_data);
1795   if (error != GRPC_ERROR_NONE) {
1796     grpc_error_get_status(error, calld->deadline, status, nullptr, nullptr,
1797                           nullptr);
1798   } else {
1799     GPR_ASSERT(md_batch->idx.named.grpc_status != nullptr);
1800     *status =
1801         grpc_get_status_code_from_metadata(md_batch->idx.named.grpc_status->md);
1802     if (server_pushback_md != nullptr &&
1803         md_batch->idx.named.grpc_retry_pushback_ms != nullptr) {
1804       *server_pushback_md = &md_batch->idx.named.grpc_retry_pushback_ms->md;
1805     }
1806   }
1807   GRPC_ERROR_UNREF(error);
1808 }
1809 
1810 // Adds recv_trailing_metadata_ready closure to closures.
add_closure_for_recv_trailing_metadata_ready(grpc_call_element * elem,subchannel_batch_data * batch_data,grpc_error * error,grpc_core::CallCombinerClosureList * closures)1811 static void add_closure_for_recv_trailing_metadata_ready(
1812     grpc_call_element* elem, subchannel_batch_data* batch_data,
1813     grpc_error* error, grpc_core::CallCombinerClosureList* closures) {
1814   // Find pending batch.
1815   pending_batch* pending = pending_batch_find(
1816       elem, "invoking recv_trailing_metadata for",
1817       [](grpc_transport_stream_op_batch* batch) {
1818         return batch->recv_trailing_metadata &&
1819                batch->payload->recv_trailing_metadata
1820                        .recv_trailing_metadata_ready != nullptr;
1821       });
1822   // If we generated the recv_trailing_metadata op internally via
1823   // start_internal_recv_trailing_metadata(), then there will be no
1824   // pending batch.
1825   if (pending == nullptr) {
1826     GRPC_ERROR_UNREF(error);
1827     return;
1828   }
1829   // Return metadata.
1830   subchannel_call_retry_state* retry_state =
1831       static_cast<subchannel_call_retry_state*>(
1832           grpc_connected_subchannel_call_get_parent_data(
1833               batch_data->subchannel_call));
1834   grpc_metadata_batch_move(
1835       &retry_state->recv_trailing_metadata,
1836       pending->batch->payload->recv_trailing_metadata.recv_trailing_metadata);
1837   // Add closure.
1838   closures->Add(pending->batch->payload->recv_trailing_metadata
1839                     .recv_trailing_metadata_ready,
1840                 error, "recv_trailing_metadata_ready for pending batch");
1841   // Update bookkeeping.
1842   pending->batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready =
1843       nullptr;
1844   maybe_clear_pending_batch(elem, pending);
1845 }
1846 
1847 // Adds any necessary closures for deferred recv_initial_metadata and
1848 // recv_message callbacks to closures.
add_closures_for_deferred_recv_callbacks(subchannel_batch_data * batch_data,subchannel_call_retry_state * retry_state,grpc_core::CallCombinerClosureList * closures)1849 static void add_closures_for_deferred_recv_callbacks(
1850     subchannel_batch_data* batch_data, subchannel_call_retry_state* retry_state,
1851     grpc_core::CallCombinerClosureList* closures) {
1852   if (batch_data->batch.recv_trailing_metadata) {
1853     // Add closure for deferred recv_initial_metadata_ready.
1854     if (GPR_UNLIKELY(retry_state->recv_initial_metadata_ready_deferred_batch !=
1855                      nullptr)) {
1856       GRPC_CLOSURE_INIT(&retry_state->recv_initial_metadata_ready,
1857                         invoke_recv_initial_metadata_callback,
1858                         retry_state->recv_initial_metadata_ready_deferred_batch,
1859                         grpc_schedule_on_exec_ctx);
1860       closures->Add(&retry_state->recv_initial_metadata_ready,
1861                     retry_state->recv_initial_metadata_error,
1862                     "resuming recv_initial_metadata_ready");
1863       retry_state->recv_initial_metadata_ready_deferred_batch = nullptr;
1864     }
1865     // Add closure for deferred recv_message_ready.
1866     if (GPR_UNLIKELY(retry_state->recv_message_ready_deferred_batch !=
1867                      nullptr)) {
1868       GRPC_CLOSURE_INIT(&retry_state->recv_message_ready,
1869                         invoke_recv_message_callback,
1870                         retry_state->recv_message_ready_deferred_batch,
1871                         grpc_schedule_on_exec_ctx);
1872       closures->Add(&retry_state->recv_message_ready,
1873                     retry_state->recv_message_error,
1874                     "resuming recv_message_ready");
1875       retry_state->recv_message_ready_deferred_batch = nullptr;
1876     }
1877   }
1878 }
1879 
1880 // Returns true if any op in the batch was not yet started.
1881 // Only looks at send ops, since recv ops are always started immediately.
pending_batch_is_unstarted(pending_batch * pending,call_data * calld,subchannel_call_retry_state * retry_state)1882 static bool pending_batch_is_unstarted(
1883     pending_batch* pending, call_data* calld,
1884     subchannel_call_retry_state* retry_state) {
1885   if (pending->batch == nullptr || pending->batch->on_complete == nullptr) {
1886     return false;
1887   }
1888   if (pending->batch->send_initial_metadata &&
1889       !retry_state->started_send_initial_metadata) {
1890     return true;
1891   }
1892   if (pending->batch->send_message &&
1893       retry_state->started_send_message_count < calld->send_messages->size()) {
1894     return true;
1895   }
1896   if (pending->batch->send_trailing_metadata &&
1897       !retry_state->started_send_trailing_metadata) {
1898     return true;
1899   }
1900   return false;
1901 }
1902 
1903 // For any pending batch containing an op that has not yet been started,
1904 // adds the pending batch's completion closures to closures.
add_closures_to_fail_unstarted_pending_batches(grpc_call_element * elem,subchannel_call_retry_state * retry_state,grpc_error * error,grpc_core::CallCombinerClosureList * closures)1905 static void add_closures_to_fail_unstarted_pending_batches(
1906     grpc_call_element* elem, subchannel_call_retry_state* retry_state,
1907     grpc_error* error, grpc_core::CallCombinerClosureList* closures) {
1908   channel_data* chand = static_cast<channel_data*>(elem->channel_data);
1909   call_data* calld = static_cast<call_data*>(elem->call_data);
1910   for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) {
1911     pending_batch* pending = &calld->pending_batches[i];
1912     if (pending_batch_is_unstarted(pending, calld, retry_state)) {
1913       if (grpc_client_channel_trace.enabled()) {
1914         gpr_log(GPR_INFO,
1915                 "chand=%p calld=%p: failing unstarted pending batch at index "
1916                 "%" PRIuPTR,
1917                 chand, calld, i);
1918       }
1919       closures->Add(pending->batch->on_complete, GRPC_ERROR_REF(error),
1920                     "failing on_complete for pending batch");
1921       pending->batch->on_complete = nullptr;
1922       maybe_clear_pending_batch(elem, pending);
1923     }
1924   }
1925   GRPC_ERROR_UNREF(error);
1926 }
1927 
1928 // Runs necessary closures upon completion of a call attempt.
run_closures_for_completed_call(subchannel_batch_data * batch_data,grpc_error * error)1929 static void run_closures_for_completed_call(subchannel_batch_data* batch_data,
1930                                             grpc_error* error) {
1931   grpc_call_element* elem = batch_data->elem;
1932   call_data* calld = static_cast<call_data*>(elem->call_data);
1933   subchannel_call_retry_state* retry_state =
1934       static_cast<subchannel_call_retry_state*>(
1935           grpc_connected_subchannel_call_get_parent_data(
1936               batch_data->subchannel_call));
1937   // Construct list of closures to execute.
1938   grpc_core::CallCombinerClosureList closures;
1939   // First, add closure for recv_trailing_metadata_ready.
1940   add_closure_for_recv_trailing_metadata_ready(
1941       elem, batch_data, GRPC_ERROR_REF(error), &closures);
1942   // If there are deferred recv_initial_metadata_ready or recv_message_ready
1943   // callbacks, add them to closures.
1944   add_closures_for_deferred_recv_callbacks(batch_data, retry_state, &closures);
1945   // Add closures to fail any pending batches that have not yet been started.
1946   add_closures_to_fail_unstarted_pending_batches(
1947       elem, retry_state, GRPC_ERROR_REF(error), &closures);
1948   // Don't need batch_data anymore.
1949   batch_data_unref(batch_data);
1950   // Schedule all of the closures identified above.
1951   // Note: This will release the call combiner.
1952   closures.RunClosures(calld->call_combiner);
1953   GRPC_ERROR_UNREF(error);
1954 }
1955 
1956 // Intercepts recv_trailing_metadata_ready callback for retries.
1957 // Commits the call and returns the trailing metadata up the stack.
recv_trailing_metadata_ready(void * arg,grpc_error * error)1958 static void recv_trailing_metadata_ready(void* arg, grpc_error* error) {
1959   subchannel_batch_data* batch_data = static_cast<subchannel_batch_data*>(arg);
1960   grpc_call_element* elem = batch_data->elem;
1961   channel_data* chand = static_cast<channel_data*>(elem->channel_data);
1962   call_data* calld = static_cast<call_data*>(elem->call_data);
1963   if (grpc_client_channel_trace.enabled()) {
1964     gpr_log(GPR_INFO,
1965             "chand=%p calld=%p: got recv_trailing_metadata_ready, error=%s",
1966             chand, calld, grpc_error_string(error));
1967   }
1968   subchannel_call_retry_state* retry_state =
1969       static_cast<subchannel_call_retry_state*>(
1970           grpc_connected_subchannel_call_get_parent_data(
1971               batch_data->subchannel_call));
1972   retry_state->completed_recv_trailing_metadata = true;
1973   // Get the call's status and check for server pushback metadata.
1974   grpc_status_code status = GRPC_STATUS_OK;
1975   grpc_mdelem* server_pushback_md = nullptr;
1976   grpc_metadata_batch* md_batch =
1977       batch_data->batch.payload->recv_trailing_metadata.recv_trailing_metadata;
1978   get_call_status(elem, md_batch, GRPC_ERROR_REF(error), &status,
1979                   &server_pushback_md);
1980   grpc_core::channelz::SubchannelNode* channelz_subchannel =
1981       calld->pick.connected_subchannel->channelz_subchannel();
1982   if (channelz_subchannel != nullptr) {
1983     if (status == GRPC_STATUS_OK) {
1984       channelz_subchannel->RecordCallSucceeded();
1985     } else {
1986       channelz_subchannel->RecordCallFailed();
1987     }
1988   }
1989   if (grpc_client_channel_trace.enabled()) {
1990     gpr_log(GPR_INFO, "chand=%p calld=%p: call finished, status=%s", chand,
1991             calld, grpc_status_code_to_string(status));
1992   }
1993   // Check if we should retry.
1994   if (maybe_retry(elem, batch_data, status, server_pushback_md)) {
1995     // Unref batch_data for deferred recv_initial_metadata_ready or
1996     // recv_message_ready callbacks, if any.
1997     if (retry_state->recv_initial_metadata_ready_deferred_batch != nullptr) {
1998       batch_data_unref(batch_data);
1999       GRPC_ERROR_UNREF(retry_state->recv_initial_metadata_error);
2000     }
2001     if (retry_state->recv_message_ready_deferred_batch != nullptr) {
2002       batch_data_unref(batch_data);
2003       GRPC_ERROR_UNREF(retry_state->recv_message_error);
2004     }
2005     batch_data_unref(batch_data);
2006     return;
2007   }
2008   // Not retrying, so commit the call.
2009   retry_commit(elem, retry_state);
2010   // Run any necessary closures.
2011   run_closures_for_completed_call(batch_data, GRPC_ERROR_REF(error));
2012 }
2013 
2014 //
2015 // on_complete callback handling
2016 //
2017 
2018 // Adds the on_complete closure for the pending batch completed in
2019 // batch_data to closures.
add_closure_for_completed_pending_batch(grpc_call_element * elem,subchannel_batch_data * batch_data,subchannel_call_retry_state * retry_state,grpc_error * error,grpc_core::CallCombinerClosureList * closures)2020 static void add_closure_for_completed_pending_batch(
2021     grpc_call_element* elem, subchannel_batch_data* batch_data,
2022     subchannel_call_retry_state* retry_state, grpc_error* error,
2023     grpc_core::CallCombinerClosureList* closures) {
2024   pending_batch* pending = pending_batch_find(
2025       elem, "completed", [batch_data](grpc_transport_stream_op_batch* batch) {
2026         // Match the pending batch with the same set of send ops as the
2027         // subchannel batch we've just completed.
2028         return batch->on_complete != nullptr &&
2029                batch_data->batch.send_initial_metadata ==
2030                    batch->send_initial_metadata &&
2031                batch_data->batch.send_message == batch->send_message &&
2032                batch_data->batch.send_trailing_metadata ==
2033                    batch->send_trailing_metadata;
2034       });
2035   // If batch_data is a replay batch, then there will be no pending
2036   // batch to complete.
2037   if (pending == nullptr) {
2038     GRPC_ERROR_UNREF(error);
2039     return;
2040   }
2041   // Add closure.
2042   closures->Add(pending->batch->on_complete, error,
2043                 "on_complete for pending batch");
2044   pending->batch->on_complete = nullptr;
2045   maybe_clear_pending_batch(elem, pending);
2046 }
2047 
2048 // If there are any cached ops to replay or pending ops to start on the
2049 // subchannel call, adds a closure to closures to invoke
2050 // start_retriable_subchannel_batches().
add_closures_for_replay_or_pending_send_ops(grpc_call_element * elem,subchannel_batch_data * batch_data,subchannel_call_retry_state * retry_state,grpc_core::CallCombinerClosureList * closures)2051 static void add_closures_for_replay_or_pending_send_ops(
2052     grpc_call_element* elem, subchannel_batch_data* batch_data,
2053     subchannel_call_retry_state* retry_state,
2054     grpc_core::CallCombinerClosureList* closures) {
2055   channel_data* chand = static_cast<channel_data*>(elem->channel_data);
2056   call_data* calld = static_cast<call_data*>(elem->call_data);
2057   bool have_pending_send_message_ops =
2058       retry_state->started_send_message_count < calld->send_messages->size();
2059   bool have_pending_send_trailing_metadata_op =
2060       calld->seen_send_trailing_metadata &&
2061       !retry_state->started_send_trailing_metadata;
2062   if (!have_pending_send_message_ops &&
2063       !have_pending_send_trailing_metadata_op) {
2064     for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) {
2065       pending_batch* pending = &calld->pending_batches[i];
2066       grpc_transport_stream_op_batch* batch = pending->batch;
2067       if (batch == nullptr || pending->send_ops_cached) continue;
2068       if (batch->send_message) have_pending_send_message_ops = true;
2069       if (batch->send_trailing_metadata) {
2070         have_pending_send_trailing_metadata_op = true;
2071       }
2072     }
2073   }
2074   if (have_pending_send_message_ops || have_pending_send_trailing_metadata_op) {
2075     if (grpc_client_channel_trace.enabled()) {
2076       gpr_log(GPR_INFO,
2077               "chand=%p calld=%p: starting next batch for pending send op(s)",
2078               chand, calld);
2079     }
2080     GRPC_CLOSURE_INIT(&batch_data->batch.handler_private.closure,
2081                       start_retriable_subchannel_batches, elem,
2082                       grpc_schedule_on_exec_ctx);
2083     closures->Add(&batch_data->batch.handler_private.closure, GRPC_ERROR_NONE,
2084                   "starting next batch for send_* op(s)");
2085   }
2086 }
2087 
2088 // Callback used to intercept on_complete from subchannel calls.
2089 // Called only when retries are enabled.
on_complete(void * arg,grpc_error * error)2090 static void on_complete(void* arg, grpc_error* error) {
2091   subchannel_batch_data* batch_data = static_cast<subchannel_batch_data*>(arg);
2092   grpc_call_element* elem = batch_data->elem;
2093   channel_data* chand = static_cast<channel_data*>(elem->channel_data);
2094   call_data* calld = static_cast<call_data*>(elem->call_data);
2095   if (grpc_client_channel_trace.enabled()) {
2096     char* batch_str = grpc_transport_stream_op_batch_string(&batch_data->batch);
2097     gpr_log(GPR_INFO, "chand=%p calld=%p: got on_complete, error=%s, batch=%s",
2098             chand, calld, grpc_error_string(error), batch_str);
2099     gpr_free(batch_str);
2100   }
2101   subchannel_call_retry_state* retry_state =
2102       static_cast<subchannel_call_retry_state*>(
2103           grpc_connected_subchannel_call_get_parent_data(
2104               batch_data->subchannel_call));
2105   // Update bookkeeping in retry_state.
2106   if (batch_data->batch.send_initial_metadata) {
2107     retry_state->completed_send_initial_metadata = true;
2108   }
2109   if (batch_data->batch.send_message) {
2110     ++retry_state->completed_send_message_count;
2111   }
2112   if (batch_data->batch.send_trailing_metadata) {
2113     retry_state->completed_send_trailing_metadata = true;
2114   }
2115   // If the call is committed, free cached data for send ops that we've just
2116   // completed.
2117   if (calld->retry_committed) {
2118     free_cached_send_op_data_for_completed_batch(elem, batch_data, retry_state);
2119   }
2120   // Construct list of closures to execute.
2121   grpc_core::CallCombinerClosureList closures;
2122   // If a retry was already dispatched, that means we saw
2123   // recv_trailing_metadata before this, so we do nothing here.
2124   // Otherwise, invoke the callback to return the result to the surface.
2125   if (!retry_state->retry_dispatched) {
2126     // Add closure for the completed pending batch, if any.
2127     add_closure_for_completed_pending_batch(elem, batch_data, retry_state,
2128                                             GRPC_ERROR_REF(error), &closures);
2129     // If needed, add a callback to start any replay or pending send ops on
2130     // the subchannel call.
2131     if (!retry_state->completed_recv_trailing_metadata) {
2132       add_closures_for_replay_or_pending_send_ops(elem, batch_data, retry_state,
2133                                                   &closures);
2134     }
2135   }
2136   // Track number of pending subchannel send batches and determine if this
2137   // was the last one.
2138   --calld->num_pending_retriable_subchannel_send_batches;
2139   const bool last_send_batch_complete =
2140       calld->num_pending_retriable_subchannel_send_batches == 0;
2141   // Don't need batch_data anymore.
2142   batch_data_unref(batch_data);
2143   // Schedule all of the closures identified above.
2144   // Note: This yeilds the call combiner.
2145   closures.RunClosures(calld->call_combiner);
2146   // If this was the last subchannel send batch, unref the call stack.
2147   if (last_send_batch_complete) {
2148     GRPC_CALL_STACK_UNREF(calld->owning_call, "subchannel_send_batches");
2149   }
2150 }
2151 
2152 //
2153 // subchannel batch construction
2154 //
2155 
2156 // Helper function used to start a subchannel batch in the call combiner.
start_batch_in_call_combiner(void * arg,grpc_error * ignored)2157 static void start_batch_in_call_combiner(void* arg, grpc_error* ignored) {
2158   grpc_transport_stream_op_batch* batch =
2159       static_cast<grpc_transport_stream_op_batch*>(arg);
2160   grpc_subchannel_call* subchannel_call =
2161       static_cast<grpc_subchannel_call*>(batch->handler_private.extra_arg);
2162   // Note: This will release the call combiner.
2163   grpc_subchannel_call_process_op(subchannel_call, batch);
2164 }
2165 
2166 // Adds a closure to closures that will execute batch in the call combiner.
add_closure_for_subchannel_batch(grpc_call_element * elem,grpc_transport_stream_op_batch * batch,grpc_core::CallCombinerClosureList * closures)2167 static void add_closure_for_subchannel_batch(
2168     grpc_call_element* elem, grpc_transport_stream_op_batch* batch,
2169     grpc_core::CallCombinerClosureList* closures) {
2170   channel_data* chand = static_cast<channel_data*>(elem->channel_data);
2171   call_data* calld = static_cast<call_data*>(elem->call_data);
2172   batch->handler_private.extra_arg = calld->subchannel_call;
2173   GRPC_CLOSURE_INIT(&batch->handler_private.closure,
2174                     start_batch_in_call_combiner, batch,
2175                     grpc_schedule_on_exec_ctx);
2176   if (grpc_client_channel_trace.enabled()) {
2177     char* batch_str = grpc_transport_stream_op_batch_string(batch);
2178     gpr_log(GPR_INFO, "chand=%p calld=%p: starting subchannel batch: %s", chand,
2179             calld, batch_str);
2180     gpr_free(batch_str);
2181   }
2182   closures->Add(&batch->handler_private.closure, GRPC_ERROR_NONE,
2183                 "start_subchannel_batch");
2184 }
2185 
2186 // Adds retriable send_initial_metadata op to batch_data.
add_retriable_send_initial_metadata_op(call_data * calld,subchannel_call_retry_state * retry_state,subchannel_batch_data * batch_data)2187 static void add_retriable_send_initial_metadata_op(
2188     call_data* calld, subchannel_call_retry_state* retry_state,
2189     subchannel_batch_data* batch_data) {
2190   // Maps the number of retries to the corresponding metadata value slice.
2191   static const grpc_slice* retry_count_strings[] = {
2192       &GRPC_MDSTR_1, &GRPC_MDSTR_2, &GRPC_MDSTR_3, &GRPC_MDSTR_4};
2193   // We need to make a copy of the metadata batch for each attempt, since
2194   // the filters in the subchannel stack may modify this batch, and we don't
2195   // want those modifications to be passed forward to subsequent attempts.
2196   //
2197   // If we've already completed one or more attempts, add the
2198   // grpc-retry-attempts header.
2199   retry_state->send_initial_metadata_storage =
2200       static_cast<grpc_linked_mdelem*>(gpr_arena_alloc(
2201           calld->arena, sizeof(grpc_linked_mdelem) *
2202                             (calld->send_initial_metadata.list.count +
2203                              (calld->num_attempts_completed > 0))));
2204   grpc_metadata_batch_copy(&calld->send_initial_metadata,
2205                            &retry_state->send_initial_metadata,
2206                            retry_state->send_initial_metadata_storage);
2207   if (GPR_UNLIKELY(retry_state->send_initial_metadata.idx.named
2208                        .grpc_previous_rpc_attempts != nullptr)) {
2209     grpc_metadata_batch_remove(&retry_state->send_initial_metadata,
2210                                retry_state->send_initial_metadata.idx.named
2211                                    .grpc_previous_rpc_attempts);
2212   }
2213   if (GPR_UNLIKELY(calld->num_attempts_completed > 0)) {
2214     grpc_mdelem retry_md = grpc_mdelem_from_slices(
2215         GRPC_MDSTR_GRPC_PREVIOUS_RPC_ATTEMPTS,
2216         *retry_count_strings[calld->num_attempts_completed - 1]);
2217     grpc_error* error = grpc_metadata_batch_add_tail(
2218         &retry_state->send_initial_metadata,
2219         &retry_state->send_initial_metadata_storage[calld->send_initial_metadata
2220                                                         .list.count],
2221         retry_md);
2222     if (GPR_UNLIKELY(error != GRPC_ERROR_NONE)) {
2223       gpr_log(GPR_ERROR, "error adding retry metadata: %s",
2224               grpc_error_string(error));
2225       GPR_ASSERT(false);
2226     }
2227   }
2228   retry_state->started_send_initial_metadata = true;
2229   batch_data->batch.send_initial_metadata = true;
2230   batch_data->batch.payload->send_initial_metadata.send_initial_metadata =
2231       &retry_state->send_initial_metadata;
2232   batch_data->batch.payload->send_initial_metadata.send_initial_metadata_flags =
2233       calld->send_initial_metadata_flags;
2234   batch_data->batch.payload->send_initial_metadata.peer_string =
2235       calld->peer_string;
2236 }
2237 
2238 // Adds retriable send_message op to batch_data.
add_retriable_send_message_op(grpc_call_element * elem,subchannel_call_retry_state * retry_state,subchannel_batch_data * batch_data)2239 static void add_retriable_send_message_op(
2240     grpc_call_element* elem, subchannel_call_retry_state* retry_state,
2241     subchannel_batch_data* batch_data) {
2242   channel_data* chand = static_cast<channel_data*>(elem->channel_data);
2243   call_data* calld = static_cast<call_data*>(elem->call_data);
2244   if (grpc_client_channel_trace.enabled()) {
2245     gpr_log(GPR_INFO,
2246             "chand=%p calld=%p: starting calld->send_messages[%" PRIuPTR "]",
2247             chand, calld, retry_state->started_send_message_count);
2248   }
2249   grpc_core::ByteStreamCache* cache =
2250       (*calld->send_messages)[retry_state->started_send_message_count];
2251   ++retry_state->started_send_message_count;
2252   retry_state->send_message.Init(cache);
2253   batch_data->batch.send_message = true;
2254   batch_data->batch.payload->send_message.send_message.reset(
2255       retry_state->send_message.get());
2256 }
2257 
2258 // Adds retriable send_trailing_metadata op to batch_data.
add_retriable_send_trailing_metadata_op(call_data * calld,subchannel_call_retry_state * retry_state,subchannel_batch_data * batch_data)2259 static void add_retriable_send_trailing_metadata_op(
2260     call_data* calld, subchannel_call_retry_state* retry_state,
2261     subchannel_batch_data* batch_data) {
2262   // We need to make a copy of the metadata batch for each attempt, since
2263   // the filters in the subchannel stack may modify this batch, and we don't
2264   // want those modifications to be passed forward to subsequent attempts.
2265   retry_state->send_trailing_metadata_storage =
2266       static_cast<grpc_linked_mdelem*>(gpr_arena_alloc(
2267           calld->arena, sizeof(grpc_linked_mdelem) *
2268                             calld->send_trailing_metadata.list.count));
2269   grpc_metadata_batch_copy(&calld->send_trailing_metadata,
2270                            &retry_state->send_trailing_metadata,
2271                            retry_state->send_trailing_metadata_storage);
2272   retry_state->started_send_trailing_metadata = true;
2273   batch_data->batch.send_trailing_metadata = true;
2274   batch_data->batch.payload->send_trailing_metadata.send_trailing_metadata =
2275       &retry_state->send_trailing_metadata;
2276 }
2277 
2278 // Adds retriable recv_initial_metadata op to batch_data.
add_retriable_recv_initial_metadata_op(call_data * calld,subchannel_call_retry_state * retry_state,subchannel_batch_data * batch_data)2279 static void add_retriable_recv_initial_metadata_op(
2280     call_data* calld, subchannel_call_retry_state* retry_state,
2281     subchannel_batch_data* batch_data) {
2282   retry_state->started_recv_initial_metadata = true;
2283   batch_data->batch.recv_initial_metadata = true;
2284   grpc_metadata_batch_init(&retry_state->recv_initial_metadata);
2285   batch_data->batch.payload->recv_initial_metadata.recv_initial_metadata =
2286       &retry_state->recv_initial_metadata;
2287   batch_data->batch.payload->recv_initial_metadata.trailing_metadata_available =
2288       &retry_state->trailing_metadata_available;
2289   GRPC_CLOSURE_INIT(&retry_state->recv_initial_metadata_ready,
2290                     recv_initial_metadata_ready, batch_data,
2291                     grpc_schedule_on_exec_ctx);
2292   batch_data->batch.payload->recv_initial_metadata.recv_initial_metadata_ready =
2293       &retry_state->recv_initial_metadata_ready;
2294 }
2295 
2296 // Adds retriable recv_message op to batch_data.
add_retriable_recv_message_op(call_data * calld,subchannel_call_retry_state * retry_state,subchannel_batch_data * batch_data)2297 static void add_retriable_recv_message_op(
2298     call_data* calld, subchannel_call_retry_state* retry_state,
2299     subchannel_batch_data* batch_data) {
2300   ++retry_state->started_recv_message_count;
2301   batch_data->batch.recv_message = true;
2302   batch_data->batch.payload->recv_message.recv_message =
2303       &retry_state->recv_message;
2304   GRPC_CLOSURE_INIT(&retry_state->recv_message_ready, recv_message_ready,
2305                     batch_data, grpc_schedule_on_exec_ctx);
2306   batch_data->batch.payload->recv_message.recv_message_ready =
2307       &retry_state->recv_message_ready;
2308 }
2309 
2310 // Adds retriable recv_trailing_metadata op to batch_data.
add_retriable_recv_trailing_metadata_op(call_data * calld,subchannel_call_retry_state * retry_state,subchannel_batch_data * batch_data)2311 static void add_retriable_recv_trailing_metadata_op(
2312     call_data* calld, subchannel_call_retry_state* retry_state,
2313     subchannel_batch_data* batch_data) {
2314   retry_state->started_recv_trailing_metadata = true;
2315   batch_data->batch.recv_trailing_metadata = true;
2316   grpc_metadata_batch_init(&retry_state->recv_trailing_metadata);
2317   batch_data->batch.payload->recv_trailing_metadata.recv_trailing_metadata =
2318       &retry_state->recv_trailing_metadata;
2319   batch_data->batch.payload->recv_trailing_metadata.collect_stats =
2320       &retry_state->collect_stats;
2321   GRPC_CLOSURE_INIT(&retry_state->recv_trailing_metadata_ready,
2322                     recv_trailing_metadata_ready, batch_data,
2323                     grpc_schedule_on_exec_ctx);
2324   batch_data->batch.payload->recv_trailing_metadata
2325       .recv_trailing_metadata_ready =
2326       &retry_state->recv_trailing_metadata_ready;
2327 }
2328 
2329 // Helper function used to start a recv_trailing_metadata batch.  This
2330 // is used in the case where a recv_initial_metadata or recv_message
2331 // op fails in a way that we know the call is over but when the application
2332 // has not yet started its own recv_trailing_metadata op.
start_internal_recv_trailing_metadata(grpc_call_element * elem)2333 static void start_internal_recv_trailing_metadata(grpc_call_element* elem) {
2334   channel_data* chand = static_cast<channel_data*>(elem->channel_data);
2335   call_data* calld = static_cast<call_data*>(elem->call_data);
2336   if (grpc_client_channel_trace.enabled()) {
2337     gpr_log(GPR_INFO,
2338             "chand=%p calld=%p: call failed but recv_trailing_metadata not "
2339             "started; starting it internally",
2340             chand, calld);
2341   }
2342   subchannel_call_retry_state* retry_state =
2343       static_cast<subchannel_call_retry_state*>(
2344           grpc_connected_subchannel_call_get_parent_data(
2345               calld->subchannel_call));
2346   // Create batch_data with 2 refs, since this batch will be unreffed twice:
2347   // once for the recv_trailing_metadata_ready callback when the subchannel
2348   // batch returns, and again when we actually get a recv_trailing_metadata
2349   // op from the surface.
2350   subchannel_batch_data* batch_data =
2351       batch_data_create(elem, 2, false /* set_on_complete */);
2352   add_retriable_recv_trailing_metadata_op(calld, retry_state, batch_data);
2353   retry_state->recv_trailing_metadata_internal_batch = batch_data;
2354   // Note: This will release the call combiner.
2355   grpc_subchannel_call_process_op(calld->subchannel_call, &batch_data->batch);
2356 }
2357 
2358 // If there are any cached send ops that need to be replayed on the
2359 // current subchannel call, creates and returns a new subchannel batch
2360 // to replay those ops.  Otherwise, returns nullptr.
maybe_create_subchannel_batch_for_replay(grpc_call_element * elem,subchannel_call_retry_state * retry_state)2361 static subchannel_batch_data* maybe_create_subchannel_batch_for_replay(
2362     grpc_call_element* elem, subchannel_call_retry_state* retry_state) {
2363   channel_data* chand = static_cast<channel_data*>(elem->channel_data);
2364   call_data* calld = static_cast<call_data*>(elem->call_data);
2365   subchannel_batch_data* replay_batch_data = nullptr;
2366   // send_initial_metadata.
2367   if (calld->seen_send_initial_metadata &&
2368       !retry_state->started_send_initial_metadata &&
2369       !calld->pending_send_initial_metadata) {
2370     if (grpc_client_channel_trace.enabled()) {
2371       gpr_log(GPR_INFO,
2372               "chand=%p calld=%p: replaying previously completed "
2373               "send_initial_metadata op",
2374               chand, calld);
2375     }
2376     replay_batch_data = batch_data_create(elem, 1, true /* set_on_complete */);
2377     add_retriable_send_initial_metadata_op(calld, retry_state,
2378                                            replay_batch_data);
2379   }
2380   // send_message.
2381   // Note that we can only have one send_message op in flight at a time.
2382   if (retry_state->started_send_message_count < calld->send_messages->size() &&
2383       retry_state->started_send_message_count ==
2384           retry_state->completed_send_message_count &&
2385       !calld->pending_send_message) {
2386     if (grpc_client_channel_trace.enabled()) {
2387       gpr_log(GPR_INFO,
2388               "chand=%p calld=%p: replaying previously completed "
2389               "send_message op",
2390               chand, calld);
2391     }
2392     if (replay_batch_data == nullptr) {
2393       replay_batch_data =
2394           batch_data_create(elem, 1, true /* set_on_complete */);
2395     }
2396     add_retriable_send_message_op(elem, retry_state, replay_batch_data);
2397   }
2398   // send_trailing_metadata.
2399   // Note that we only add this op if we have no more send_message ops
2400   // to start, since we can't send down any more send_message ops after
2401   // send_trailing_metadata.
2402   if (calld->seen_send_trailing_metadata &&
2403       retry_state->started_send_message_count == calld->send_messages->size() &&
2404       !retry_state->started_send_trailing_metadata &&
2405       !calld->pending_send_trailing_metadata) {
2406     if (grpc_client_channel_trace.enabled()) {
2407       gpr_log(GPR_INFO,
2408               "chand=%p calld=%p: replaying previously completed "
2409               "send_trailing_metadata op",
2410               chand, calld);
2411     }
2412     if (replay_batch_data == nullptr) {
2413       replay_batch_data =
2414           batch_data_create(elem, 1, true /* set_on_complete */);
2415     }
2416     add_retriable_send_trailing_metadata_op(calld, retry_state,
2417                                             replay_batch_data);
2418   }
2419   return replay_batch_data;
2420 }
2421 
// Adds subchannel batches for pending batches to closures.
add_subchannel_batches_for_pending_batches(grpc_call_element * elem,subchannel_call_retry_state * retry_state,grpc_core::CallCombinerClosureList * closures)2424 static void add_subchannel_batches_for_pending_batches(
2425     grpc_call_element* elem, subchannel_call_retry_state* retry_state,
2426     grpc_core::CallCombinerClosureList* closures) {
2427   call_data* calld = static_cast<call_data*>(elem->call_data);
2428   for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) {
2429     pending_batch* pending = &calld->pending_batches[i];
2430     grpc_transport_stream_op_batch* batch = pending->batch;
2431     if (batch == nullptr) continue;
2432     // Skip any batch that either (a) has already been started on this
2433     // subchannel call or (b) we can't start yet because we're still
2434     // replaying send ops that need to be completed first.
2435     // TODO(roth): Note that if any one op in the batch can't be sent
2436     // yet due to ops that we're replaying, we don't start any of the ops
2437     // in the batch.  This is probably okay, but it could conceivably
2438     // lead to increased latency in some cases -- e.g., we could delay
2439     // starting a recv op due to it being in the same batch with a send
2440     // op.  If/when we revamp the callback protocol in
2441     // transport_stream_op_batch, we may be able to fix this.
2442     if (batch->send_initial_metadata &&
2443         retry_state->started_send_initial_metadata) {
2444       continue;
2445     }
2446     if (batch->send_message && retry_state->completed_send_message_count <
2447                                    retry_state->started_send_message_count) {
2448       continue;
2449     }
2450     // Note that we only start send_trailing_metadata if we have no more
2451     // send_message ops to start, since we can't send down any more
2452     // send_message ops after send_trailing_metadata.
2453     if (batch->send_trailing_metadata &&
2454         (retry_state->started_send_message_count + batch->send_message <
2455              calld->send_messages->size() ||
2456          retry_state->started_send_trailing_metadata)) {
2457       continue;
2458     }
2459     if (batch->recv_initial_metadata &&
2460         retry_state->started_recv_initial_metadata) {
2461       continue;
2462     }
2463     if (batch->recv_message && retry_state->completed_recv_message_count <
2464                                    retry_state->started_recv_message_count) {
2465       continue;
2466     }
2467     if (batch->recv_trailing_metadata &&
2468         retry_state->started_recv_trailing_metadata) {
2469       // If we previously completed a recv_trailing_metadata op
2470       // initiated by start_internal_recv_trailing_metadata(), use the
2471       // result of that instead of trying to re-start this op.
2472       if (GPR_UNLIKELY((retry_state->recv_trailing_metadata_internal_batch !=
2473                         nullptr))) {
2474         // If the batch completed, then trigger the completion callback
2475         // directly, so that we return the previously returned results to
2476         // the application.  Otherwise, just unref the internally
2477         // started subchannel batch, since we'll propagate the
2478         // completion when it completes.
2479         if (retry_state->completed_recv_trailing_metadata) {
2480           // Batches containing recv_trailing_metadata always succeed.
2481           closures->Add(
2482               &retry_state->recv_trailing_metadata_ready, GRPC_ERROR_NONE,
2483               "re-executing recv_trailing_metadata_ready to propagate "
2484               "internally triggered result");
2485         } else {
2486           batch_data_unref(retry_state->recv_trailing_metadata_internal_batch);
2487         }
2488         retry_state->recv_trailing_metadata_internal_batch = nullptr;
2489       }
2490       continue;
2491     }
2492     // If we're not retrying, just send the batch as-is.
2493     if (calld->method_params == nullptr ||
2494         calld->method_params->retry_policy() == nullptr ||
2495         calld->retry_committed) {
2496       add_closure_for_subchannel_batch(elem, batch, closures);
2497       pending_batch_clear(calld, pending);
2498       continue;
2499     }
2500     // Create batch with the right number of callbacks.
2501     const bool has_send_ops = batch->send_initial_metadata ||
2502                               batch->send_message ||
2503                               batch->send_trailing_metadata;
2504     const int num_callbacks = has_send_ops + batch->recv_initial_metadata +
2505                               batch->recv_message +
2506                               batch->recv_trailing_metadata;
2507     subchannel_batch_data* batch_data = batch_data_create(
2508         elem, num_callbacks, has_send_ops /* set_on_complete */);
2509     // Cache send ops if needed.
2510     maybe_cache_send_ops_for_batch(calld, pending);
2511     // send_initial_metadata.
2512     if (batch->send_initial_metadata) {
2513       add_retriable_send_initial_metadata_op(calld, retry_state, batch_data);
2514     }
2515     // send_message.
2516     if (batch->send_message) {
2517       add_retriable_send_message_op(elem, retry_state, batch_data);
2518     }
2519     // send_trailing_metadata.
2520     if (batch->send_trailing_metadata) {
2521       add_retriable_send_trailing_metadata_op(calld, retry_state, batch_data);
2522     }
2523     // recv_initial_metadata.
2524     if (batch->recv_initial_metadata) {
2525       // recv_flags is only used on the server side.
2526       GPR_ASSERT(batch->payload->recv_initial_metadata.recv_flags == nullptr);
2527       add_retriable_recv_initial_metadata_op(calld, retry_state, batch_data);
2528     }
2529     // recv_message.
2530     if (batch->recv_message) {
2531       add_retriable_recv_message_op(calld, retry_state, batch_data);
2532     }
2533     // recv_trailing_metadata.
2534     if (batch->recv_trailing_metadata) {
2535       add_retriable_recv_trailing_metadata_op(calld, retry_state, batch_data);
2536     }
2537     add_closure_for_subchannel_batch(elem, &batch_data->batch, closures);
2538     // Track number of pending subchannel send batches.
2539     // If this is the first one, take a ref to the call stack.
2540     if (batch->send_initial_metadata || batch->send_message ||
2541         batch->send_trailing_metadata) {
2542       if (calld->num_pending_retriable_subchannel_send_batches == 0) {
2543         GRPC_CALL_STACK_REF(calld->owning_call, "subchannel_send_batches");
2544       }
2545       ++calld->num_pending_retriable_subchannel_send_batches;
2546     }
2547   }
2548 }
2549 
2550 // Constructs and starts whatever subchannel batches are needed on the
2551 // subchannel call.
start_retriable_subchannel_batches(void * arg,grpc_error * ignored)2552 static void start_retriable_subchannel_batches(void* arg, grpc_error* ignored) {
2553   grpc_call_element* elem = static_cast<grpc_call_element*>(arg);
2554   channel_data* chand = static_cast<channel_data*>(elem->channel_data);
2555   call_data* calld = static_cast<call_data*>(elem->call_data);
2556   if (grpc_client_channel_trace.enabled()) {
2557     gpr_log(GPR_INFO, "chand=%p calld=%p: constructing retriable batches",
2558             chand, calld);
2559   }
2560   subchannel_call_retry_state* retry_state =
2561       static_cast<subchannel_call_retry_state*>(
2562           grpc_connected_subchannel_call_get_parent_data(
2563               calld->subchannel_call));
2564   // Construct list of closures to execute, one for each pending batch.
2565   grpc_core::CallCombinerClosureList closures;
2566   // Replay previously-returned send_* ops if needed.
2567   subchannel_batch_data* replay_batch_data =
2568       maybe_create_subchannel_batch_for_replay(elem, retry_state);
2569   if (replay_batch_data != nullptr) {
2570     add_closure_for_subchannel_batch(elem, &replay_batch_data->batch,
2571                                      &closures);
2572     // Track number of pending subchannel send batches.
2573     // If this is the first one, take a ref to the call stack.
2574     if (calld->num_pending_retriable_subchannel_send_batches == 0) {
2575       GRPC_CALL_STACK_REF(calld->owning_call, "subchannel_send_batches");
2576     }
2577     ++calld->num_pending_retriable_subchannel_send_batches;
2578   }
2579   // Now add pending batches.
2580   add_subchannel_batches_for_pending_batches(elem, retry_state, &closures);
2581   // Start batches on subchannel call.
2582   if (grpc_client_channel_trace.enabled()) {
2583     gpr_log(GPR_INFO,
2584             "chand=%p calld=%p: starting %" PRIuPTR
2585             " retriable batches on subchannel_call=%p",
2586             chand, calld, closures.size(), calld->subchannel_call);
2587   }
2588   // Note: This will yield the call combiner.
2589   closures.RunClosures(calld->call_combiner);
2590 }
2591 
2592 //
2593 // Channelz
2594 //
2595 
recv_trailing_metadata_ready_channelz(void * arg,grpc_error * error)2596 static void recv_trailing_metadata_ready_channelz(void* arg,
2597                                                   grpc_error* error) {
2598   grpc_call_element* elem = static_cast<grpc_call_element*>(arg);
2599   channel_data* chand = static_cast<channel_data*>(elem->channel_data);
2600   call_data* calld = static_cast<call_data*>(elem->call_data);
2601   if (grpc_client_channel_trace.enabled()) {
2602     gpr_log(GPR_INFO,
2603             "chand=%p calld=%p: got recv_trailing_metadata_ready_channelz, "
2604             "error=%s",
2605             chand, calld, grpc_error_string(error));
2606   }
2607   GPR_ASSERT(calld->recv_trailing_metadata != nullptr);
2608   grpc_status_code status = GRPC_STATUS_OK;
2609   grpc_metadata_batch* md_batch = calld->recv_trailing_metadata;
2610   get_call_status(elem, md_batch, GRPC_ERROR_REF(error), &status, nullptr);
2611   grpc_core::channelz::SubchannelNode* channelz_subchannel =
2612       calld->pick.connected_subchannel->channelz_subchannel();
2613   GPR_ASSERT(channelz_subchannel != nullptr);
2614   if (status == GRPC_STATUS_OK) {
2615     channelz_subchannel->RecordCallSucceeded();
2616   } else {
2617     channelz_subchannel->RecordCallFailed();
2618   }
2619   calld->recv_trailing_metadata = nullptr;
2620   GRPC_CLOSURE_RUN(calld->original_recv_trailing_metadata, error);
2621 }
2622 
2623 // If channelz is enabled, intercept recv_trailing so that we may check the
2624 // status and associate it to a subchannel.
2625 // Returns true if callback was intercepted, false otherwise.
maybe_intercept_recv_trailing_metadata_for_channelz(grpc_call_element * elem,grpc_transport_stream_op_batch * batch)2626 static void maybe_intercept_recv_trailing_metadata_for_channelz(
2627     grpc_call_element* elem, grpc_transport_stream_op_batch* batch) {
2628   call_data* calld = static_cast<call_data*>(elem->call_data);
2629   // only intercept payloads with recv trailing.
2630   if (!batch->recv_trailing_metadata) {
2631     return;
2632   }
2633   // only add interceptor is channelz is enabled.
2634   if (calld->pick.connected_subchannel->channelz_subchannel() == nullptr) {
2635     return;
2636   }
2637   if (grpc_client_channel_trace.enabled()) {
2638     gpr_log(GPR_INFO,
2639             "calld=%p batch=%p: intercepting recv trailing for channelz", calld,
2640             batch);
2641   }
2642   GRPC_CLOSURE_INIT(&calld->recv_trailing_metadata_ready_channelz,
2643                     recv_trailing_metadata_ready_channelz, elem,
2644                     grpc_schedule_on_exec_ctx);
2645   // save some state needed for the interception callback.
2646   GPR_ASSERT(calld->recv_trailing_metadata == nullptr);
2647   calld->recv_trailing_metadata =
2648       batch->payload->recv_trailing_metadata.recv_trailing_metadata;
2649   calld->original_recv_trailing_metadata =
2650       batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready;
2651   batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready =
2652       &calld->recv_trailing_metadata_ready_channelz;
2653 }
2654 
2655 //
2656 // LB pick
2657 //
2658 
create_subchannel_call(grpc_call_element * elem,grpc_error * error)2659 static void create_subchannel_call(grpc_call_element* elem, grpc_error* error) {
2660   channel_data* chand = static_cast<channel_data*>(elem->channel_data);
2661   call_data* calld = static_cast<call_data*>(elem->call_data);
2662   const size_t parent_data_size =
2663       calld->enable_retries ? sizeof(subchannel_call_retry_state) : 0;
2664   const grpc_core::ConnectedSubchannel::CallArgs call_args = {
2665       calld->pollent,                       // pollent
2666       calld->path,                          // path
2667       calld->call_start_time,               // start_time
2668       calld->deadline,                      // deadline
2669       calld->arena,                         // arena
2670       calld->pick.subchannel_call_context,  // context
2671       calld->call_combiner,                 // call_combiner
2672       parent_data_size                      // parent_data_size
2673   };
2674   grpc_error* new_error = calld->pick.connected_subchannel->CreateCall(
2675       call_args, &calld->subchannel_call);
2676   if (grpc_client_channel_trace.enabled()) {
2677     gpr_log(GPR_INFO, "chand=%p calld=%p: create subchannel_call=%p: error=%s",
2678             chand, calld, calld->subchannel_call, grpc_error_string(new_error));
2679   }
2680   if (GPR_UNLIKELY(new_error != GRPC_ERROR_NONE)) {
2681     new_error = grpc_error_add_child(new_error, error);
2682     pending_batches_fail(elem, new_error, true /* yield_call_combiner */);
2683   } else {
2684     grpc_core::channelz::SubchannelNode* channelz_subchannel =
2685         calld->pick.connected_subchannel->channelz_subchannel();
2686     if (channelz_subchannel != nullptr) {
2687       channelz_subchannel->RecordCallStarted();
2688     }
2689     if (parent_data_size > 0) {
2690       subchannel_call_retry_state* retry_state =
2691           static_cast<subchannel_call_retry_state*>(
2692               grpc_connected_subchannel_call_get_parent_data(
2693                   calld->subchannel_call));
2694       retry_state->batch_payload.context = calld->pick.subchannel_call_context;
2695     }
2696     pending_batches_resume(elem);
2697   }
2698   GRPC_ERROR_UNREF(error);
2699 }
2700 
2701 // Invoked when a pick is completed, on both success or failure.
pick_done(void * arg,grpc_error * error)2702 static void pick_done(void* arg, grpc_error* error) {
2703   grpc_call_element* elem = static_cast<grpc_call_element*>(arg);
2704   channel_data* chand = static_cast<channel_data*>(elem->channel_data);
2705   call_data* calld = static_cast<call_data*>(elem->call_data);
2706   if (GPR_UNLIKELY(calld->pick.connected_subchannel == nullptr)) {
2707     // Failed to create subchannel.
2708     // If there was no error, this is an LB policy drop, in which case
2709     // we return an error; otherwise, we may retry.
2710     grpc_status_code status = GRPC_STATUS_OK;
2711     grpc_error_get_status(error, calld->deadline, &status, nullptr, nullptr,
2712                           nullptr);
2713     if (error == GRPC_ERROR_NONE || !calld->enable_retries ||
2714         !maybe_retry(elem, nullptr /* batch_data */, status,
2715                      nullptr /* server_pushback_md */)) {
2716       grpc_error* new_error =
2717           error == GRPC_ERROR_NONE
2718               ? GRPC_ERROR_CREATE_FROM_STATIC_STRING(
2719                     "Call dropped by load balancing policy")
2720               : GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
2721                     "Failed to create subchannel", &error, 1);
2722       if (grpc_client_channel_trace.enabled()) {
2723         gpr_log(GPR_INFO,
2724                 "chand=%p calld=%p: failed to create subchannel: error=%s",
2725                 chand, calld, grpc_error_string(new_error));
2726       }
2727       pending_batches_fail(elem, new_error, true /* yield_call_combiner */);
2728     }
2729   } else {
2730     /* Create call on subchannel. */
2731     create_subchannel_call(elem, GRPC_ERROR_REF(error));
2732   }
2733 }
2734 
maybe_add_call_to_channel_interested_parties_locked(grpc_call_element * elem)2735 static void maybe_add_call_to_channel_interested_parties_locked(
2736     grpc_call_element* elem) {
2737   channel_data* chand = static_cast<channel_data*>(elem->channel_data);
2738   call_data* calld = static_cast<call_data*>(elem->call_data);
2739   if (!calld->pollent_added_to_interested_parties) {
2740     calld->pollent_added_to_interested_parties = true;
2741     grpc_polling_entity_add_to_pollset_set(calld->pollent,
2742                                            chand->interested_parties);
2743   }
2744 }
2745 
maybe_del_call_from_channel_interested_parties_locked(grpc_call_element * elem)2746 static void maybe_del_call_from_channel_interested_parties_locked(
2747     grpc_call_element* elem) {
2748   channel_data* chand = static_cast<channel_data*>(elem->channel_data);
2749   call_data* calld = static_cast<call_data*>(elem->call_data);
2750   if (calld->pollent_added_to_interested_parties) {
2751     calld->pollent_added_to_interested_parties = false;
2752     grpc_polling_entity_del_from_pollset_set(calld->pollent,
2753                                              chand->interested_parties);
2754   }
2755 }
2756 
2757 // Invoked when a pick is completed to leave the client_channel combiner
2758 // and continue processing in the call combiner.
2759 // If needed, removes the call's polling entity from chand->interested_parties.
pick_done_locked(grpc_call_element * elem,grpc_error * error)2760 static void pick_done_locked(grpc_call_element* elem, grpc_error* error) {
2761   call_data* calld = static_cast<call_data*>(elem->call_data);
2762   maybe_del_call_from_channel_interested_parties_locked(elem);
2763   GRPC_CLOSURE_INIT(&calld->pick_closure, pick_done, elem,
2764                     grpc_schedule_on_exec_ctx);
2765   GRPC_CLOSURE_SCHED(&calld->pick_closure, error);
2766 }
2767 
2768 namespace grpc_core {
2769 
2770 // Performs subchannel pick via LB policy.
2771 class LbPicker {
2772  public:
2773   // Starts a pick on chand->lb_policy.
StartLocked(grpc_call_element * elem)2774   static void StartLocked(grpc_call_element* elem) {
2775     channel_data* chand = static_cast<channel_data*>(elem->channel_data);
2776     call_data* calld = static_cast<call_data*>(elem->call_data);
2777     if (grpc_client_channel_trace.enabled()) {
2778       gpr_log(GPR_INFO, "chand=%p calld=%p: starting pick on lb_policy=%p",
2779               chand, calld, chand->lb_policy.get());
2780     }
2781     // If this is a retry, use the send_initial_metadata payload that
2782     // we've cached; otherwise, use the pending batch.  The
2783     // send_initial_metadata batch will be the first pending batch in the
2784     // list, as set by get_batch_index() above.
2785     calld->pick.initial_metadata =
2786         calld->seen_send_initial_metadata
2787             ? &calld->send_initial_metadata
2788             : calld->pending_batches[0]
2789                   .batch->payload->send_initial_metadata.send_initial_metadata;
2790     calld->pick.initial_metadata_flags =
2791         calld->seen_send_initial_metadata
2792             ? calld->send_initial_metadata_flags
2793             : calld->pending_batches[0]
2794                   .batch->payload->send_initial_metadata
2795                   .send_initial_metadata_flags;
2796     GRPC_CLOSURE_INIT(&calld->pick_closure, &LbPicker::DoneLocked, elem,
2797                       grpc_combiner_scheduler(chand->combiner));
2798     calld->pick.on_complete = &calld->pick_closure;
2799     GRPC_CALL_STACK_REF(calld->owning_call, "pick_callback");
2800     grpc_error* error = GRPC_ERROR_NONE;
2801     const bool pick_done = chand->lb_policy->PickLocked(&calld->pick, &error);
2802     if (GPR_LIKELY(pick_done)) {
2803       // Pick completed synchronously.
2804       if (grpc_client_channel_trace.enabled()) {
2805         gpr_log(GPR_INFO, "chand=%p calld=%p: pick completed synchronously",
2806                 chand, calld);
2807       }
2808       pick_done_locked(elem, error);
2809       GRPC_CALL_STACK_UNREF(calld->owning_call, "pick_callback");
2810     } else {
2811       // Pick will be returned asynchronously.
2812       // Add the polling entity from call_data to the channel_data's
2813       // interested_parties, so that the I/O of the LB policy can be done
2814       // under it.  It will be removed in pick_done_locked().
2815       maybe_add_call_to_channel_interested_parties_locked(elem);
2816       // Request notification on call cancellation.
2817       GRPC_CALL_STACK_REF(calld->owning_call, "pick_callback_cancel");
2818       grpc_call_combiner_set_notify_on_cancel(
2819           calld->call_combiner,
2820           GRPC_CLOSURE_INIT(&calld->pick_cancel_closure,
2821                             &LbPicker::CancelLocked, elem,
2822                             grpc_combiner_scheduler(chand->combiner)));
2823     }
2824   }
2825 
2826  private:
2827   // Callback invoked by LoadBalancingPolicy::PickLocked() for async picks.
2828   // Unrefs the LB policy and invokes pick_done_locked().
DoneLocked(void * arg,grpc_error * error)2829   static void DoneLocked(void* arg, grpc_error* error) {
2830     grpc_call_element* elem = static_cast<grpc_call_element*>(arg);
2831     channel_data* chand = static_cast<channel_data*>(elem->channel_data);
2832     call_data* calld = static_cast<call_data*>(elem->call_data);
2833     if (grpc_client_channel_trace.enabled()) {
2834       gpr_log(GPR_INFO, "chand=%p calld=%p: pick completed asynchronously",
2835               chand, calld);
2836     }
2837     pick_done_locked(elem, GRPC_ERROR_REF(error));
2838     GRPC_CALL_STACK_UNREF(calld->owning_call, "pick_callback");
2839   }
2840 
2841   // Note: This runs under the client_channel combiner, but will NOT be
2842   // holding the call combiner.
CancelLocked(void * arg,grpc_error * error)2843   static void CancelLocked(void* arg, grpc_error* error) {
2844     grpc_call_element* elem = static_cast<grpc_call_element*>(arg);
2845     channel_data* chand = static_cast<channel_data*>(elem->channel_data);
2846     call_data* calld = static_cast<call_data*>(elem->call_data);
2847     // Note: chand->lb_policy may have changed since we started our pick,
2848     // in which case we will be cancelling the pick on a policy other than
2849     // the one we started it on.  However, this will just be a no-op.
2850     if (GPR_UNLIKELY(error != GRPC_ERROR_NONE && chand->lb_policy != nullptr)) {
2851       if (grpc_client_channel_trace.enabled()) {
2852         gpr_log(GPR_INFO,
2853                 "chand=%p calld=%p: cancelling pick from LB policy %p", chand,
2854                 calld, chand->lb_policy.get());
2855       }
2856       chand->lb_policy->CancelPickLocked(&calld->pick, GRPC_ERROR_REF(error));
2857     }
2858     GRPC_CALL_STACK_UNREF(calld->owning_call, "pick_callback_cancel");
2859   }
2860 };
2861 
2862 }  // namespace grpc_core
2863 
2864 // Applies service config to the call.  Must be invoked once we know
2865 // that the resolver has returned results to the channel.
apply_service_config_to_call_locked(grpc_call_element * elem)2866 static void apply_service_config_to_call_locked(grpc_call_element* elem) {
2867   channel_data* chand = static_cast<channel_data*>(elem->channel_data);
2868   call_data* calld = static_cast<call_data*>(elem->call_data);
2869   if (grpc_client_channel_trace.enabled()) {
2870     gpr_log(GPR_INFO, "chand=%p calld=%p: applying service config to call",
2871             chand, calld);
2872   }
2873   if (chand->retry_throttle_data != nullptr) {
2874     calld->retry_throttle_data = chand->retry_throttle_data->Ref();
2875   }
2876   if (chand->method_params_table != nullptr) {
2877     calld->method_params = grpc_core::ServiceConfig::MethodConfigTableLookup(
2878         *chand->method_params_table, calld->path);
2879     if (calld->method_params != nullptr) {
2880       // If the deadline from the service config is shorter than the one
2881       // from the client API, reset the deadline timer.
2882       if (chand->deadline_checking_enabled &&
2883           calld->method_params->timeout() != 0) {
2884         const grpc_millis per_method_deadline =
2885             grpc_timespec_to_millis_round_up(calld->call_start_time) +
2886             calld->method_params->timeout();
2887         if (per_method_deadline < calld->deadline) {
2888           calld->deadline = per_method_deadline;
2889           grpc_deadline_state_reset(elem, calld->deadline);
2890         }
2891       }
2892       // If the service config set wait_for_ready and the application
2893       // did not explicitly set it, use the value from the service config.
2894       uint32_t* send_initial_metadata_flags =
2895           &calld->pending_batches[0]
2896                .batch->payload->send_initial_metadata
2897                .send_initial_metadata_flags;
2898       if (GPR_UNLIKELY(
2899               calld->method_params->wait_for_ready() !=
2900                   ClientChannelMethodParams::WAIT_FOR_READY_UNSET &&
2901               !(*send_initial_metadata_flags &
2902                 GRPC_INITIAL_METADATA_WAIT_FOR_READY_EXPLICITLY_SET))) {
2903         if (calld->method_params->wait_for_ready() ==
2904             ClientChannelMethodParams::WAIT_FOR_READY_TRUE) {
2905           *send_initial_metadata_flags |= GRPC_INITIAL_METADATA_WAIT_FOR_READY;
2906         } else {
2907           *send_initial_metadata_flags &= ~GRPC_INITIAL_METADATA_WAIT_FOR_READY;
2908         }
2909       }
2910     }
2911   }
2912   // If no retry policy, disable retries.
2913   // TODO(roth): Remove this when adding support for transparent retries.
2914   if (calld->method_params == nullptr ||
2915       calld->method_params->retry_policy() == nullptr) {
2916     calld->enable_retries = false;
2917   }
2918 }
2919 
2920 // Invoked once resolver results are available.
process_service_config_and_start_lb_pick_locked(grpc_call_element * elem)2921 static void process_service_config_and_start_lb_pick_locked(
2922     grpc_call_element* elem) {
2923   call_data* calld = static_cast<call_data*>(elem->call_data);
2924   // Only get service config data on the first attempt.
2925   if (GPR_LIKELY(calld->num_attempts_completed == 0)) {
2926     apply_service_config_to_call_locked(elem);
2927   }
2928   // Start LB pick.
2929   grpc_core::LbPicker::StartLocked(elem);
2930 }
2931 
2932 namespace grpc_core {
2933 
2934 // Handles waiting for a resolver result.
2935 // Used only for the first call on an idle channel.
2936 class ResolverResultWaiter {
2937  public:
ResolverResultWaiter(grpc_call_element * elem)2938   explicit ResolverResultWaiter(grpc_call_element* elem) : elem_(elem) {
2939     channel_data* chand = static_cast<channel_data*>(elem->channel_data);
2940     call_data* calld = static_cast<call_data*>(elem->call_data);
2941     if (grpc_client_channel_trace.enabled()) {
2942       gpr_log(GPR_INFO,
2943               "chand=%p calld=%p: deferring pick pending resolver result",
2944               chand, calld);
2945     }
2946     // Add closure to be run when a resolver result is available.
2947     GRPC_CLOSURE_INIT(&done_closure_, &ResolverResultWaiter::DoneLocked, this,
2948                       grpc_combiner_scheduler(chand->combiner));
2949     AddToWaitingList();
2950     // Set cancellation closure, so that we abort if the call is cancelled.
2951     GRPC_CLOSURE_INIT(&cancel_closure_, &ResolverResultWaiter::CancelLocked,
2952                       this, grpc_combiner_scheduler(chand->combiner));
2953     grpc_call_combiner_set_notify_on_cancel(calld->call_combiner,
2954                                             &cancel_closure_);
2955   }
2956 
2957  private:
2958   // Adds closure_ to chand->waiting_for_resolver_result_closures.
AddToWaitingList()2959   void AddToWaitingList() {
2960     channel_data* chand = static_cast<channel_data*>(elem_->channel_data);
2961     grpc_closure_list_append(&chand->waiting_for_resolver_result_closures,
2962                              &done_closure_, GRPC_ERROR_NONE);
2963   }
2964 
2965   // Invoked when a resolver result is available.
DoneLocked(void * arg,grpc_error * error)2966   static void DoneLocked(void* arg, grpc_error* error) {
2967     ResolverResultWaiter* self = static_cast<ResolverResultWaiter*>(arg);
2968     // If CancelLocked() has already run, delete ourselves without doing
2969     // anything.  Note that the call stack may have already been destroyed,
2970     // so it's not safe to access anything in elem_.
2971     if (GPR_UNLIKELY(self->finished_)) {
2972       if (grpc_client_channel_trace.enabled()) {
2973         gpr_log(GPR_INFO, "call cancelled before resolver result");
2974       }
2975       Delete(self);
2976       return;
2977     }
2978     // Otherwise, process the resolver result.
2979     grpc_call_element* elem = self->elem_;
2980     channel_data* chand = static_cast<channel_data*>(elem->channel_data);
2981     call_data* calld = static_cast<call_data*>(elem->call_data);
2982     if (GPR_UNLIKELY(error != GRPC_ERROR_NONE)) {
2983       if (grpc_client_channel_trace.enabled()) {
2984         gpr_log(GPR_INFO, "chand=%p calld=%p: resolver failed to return data",
2985                 chand, calld);
2986       }
2987       pick_done_locked(elem, GRPC_ERROR_REF(error));
2988     } else if (GPR_UNLIKELY(chand->resolver == nullptr)) {
2989       // Shutting down.
2990       if (grpc_client_channel_trace.enabled()) {
2991         gpr_log(GPR_INFO, "chand=%p calld=%p: resolver disconnected", chand,
2992                 calld);
2993       }
2994       pick_done_locked(elem,
2995                        GRPC_ERROR_CREATE_FROM_STATIC_STRING("Disconnected"));
2996     } else if (GPR_UNLIKELY(chand->lb_policy == nullptr)) {
2997       // Transient resolver failure.
2998       // If call has wait_for_ready=true, try again; otherwise, fail.
2999       uint32_t send_initial_metadata_flags =
3000           calld->seen_send_initial_metadata
3001               ? calld->send_initial_metadata_flags
3002               : calld->pending_batches[0]
3003                     .batch->payload->send_initial_metadata
3004                     .send_initial_metadata_flags;
3005       if (send_initial_metadata_flags & GRPC_INITIAL_METADATA_WAIT_FOR_READY) {
3006         if (grpc_client_channel_trace.enabled()) {
3007           gpr_log(GPR_INFO,
3008                   "chand=%p calld=%p: resolver returned but no LB policy; "
3009                   "wait_for_ready=true; trying again",
3010                   chand, calld);
3011         }
3012         // Re-add ourselves to the waiting list.
3013         self->AddToWaitingList();
3014         // Return early so that we don't set finished_ to true below.
3015         return;
3016       } else {
3017         if (grpc_client_channel_trace.enabled()) {
3018           gpr_log(GPR_INFO,
3019                   "chand=%p calld=%p: resolver returned but no LB policy; "
3020                   "wait_for_ready=false; failing",
3021                   chand, calld);
3022         }
3023         pick_done_locked(
3024             elem,
3025             grpc_error_set_int(
3026                 GRPC_ERROR_CREATE_FROM_STATIC_STRING("Name resolution failure"),
3027                 GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE));
3028       }
3029     } else {
3030       if (grpc_client_channel_trace.enabled()) {
3031         gpr_log(GPR_INFO, "chand=%p calld=%p: resolver returned, doing LB pick",
3032                 chand, calld);
3033       }
3034       process_service_config_and_start_lb_pick_locked(elem);
3035     }
3036     self->finished_ = true;
3037   }
3038 
3039   // Invoked when the call is cancelled.
3040   // Note: This runs under the client_channel combiner, but will NOT be
3041   // holding the call combiner.
CancelLocked(void * arg,grpc_error * error)3042   static void CancelLocked(void* arg, grpc_error* error) {
3043     ResolverResultWaiter* self = static_cast<ResolverResultWaiter*>(arg);
3044     // If DoneLocked() has already run, delete ourselves without doing anything.
3045     if (GPR_LIKELY(self->finished_)) {
3046       Delete(self);
3047       return;
3048     }
3049     // If we are being cancelled, immediately invoke pick_done_locked()
3050     // to propagate the error back to the caller.
3051     if (GPR_UNLIKELY(error != GRPC_ERROR_NONE)) {
3052       grpc_call_element* elem = self->elem_;
3053       channel_data* chand = static_cast<channel_data*>(elem->channel_data);
3054       call_data* calld = static_cast<call_data*>(elem->call_data);
3055       if (grpc_client_channel_trace.enabled()) {
3056         gpr_log(GPR_INFO,
3057                 "chand=%p calld=%p: cancelling call waiting for name "
3058                 "resolution",
3059                 chand, calld);
3060       }
3061       // Note: Although we are not in the call combiner here, we are
3062       // basically stealing the call combiner from the pending pick, so
3063       // it's safe to call pick_done_locked() here -- we are essentially
3064       // calling it here instead of calling it in DoneLocked().
3065       pick_done_locked(elem, GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
3066                                  "Pick cancelled", &error, 1));
3067     }
3068     self->finished_ = true;
3069   }
3070 
3071   grpc_call_element* elem_;
3072   grpc_closure done_closure_;
3073   grpc_closure cancel_closure_;
3074   bool finished_ = false;
3075 };
3076 
3077 }  // namespace grpc_core
3078 
start_pick_locked(void * arg,grpc_error * ignored)3079 static void start_pick_locked(void* arg, grpc_error* ignored) {
3080   grpc_call_element* elem = static_cast<grpc_call_element*>(arg);
3081   call_data* calld = static_cast<call_data*>(elem->call_data);
3082   channel_data* chand = static_cast<channel_data*>(elem->channel_data);
3083   GPR_ASSERT(calld->pick.connected_subchannel == nullptr);
3084   GPR_ASSERT(calld->subchannel_call == nullptr);
3085   if (GPR_LIKELY(chand->lb_policy != nullptr)) {
3086     // We already have resolver results, so process the service config
3087     // and start an LB pick.
3088     process_service_config_and_start_lb_pick_locked(elem);
3089   } else if (GPR_UNLIKELY(chand->resolver == nullptr)) {
3090     pick_done_locked(elem,
3091                      GRPC_ERROR_CREATE_FROM_STATIC_STRING("Disconnected"));
3092   } else {
3093     // We do not yet have an LB policy, so wait for a resolver result.
3094     if (GPR_UNLIKELY(!chand->started_resolving)) {
3095       start_resolving_locked(chand);
3096     }
3097     // Create a new waiter, which will delete itself when done.
3098     grpc_core::New<grpc_core::ResolverResultWaiter>(elem);
3099     // Add the polling entity from call_data to the channel_data's
3100     // interested_parties, so that the I/O of the resolver can be done
3101     // under it.  It will be removed in pick_done_locked().
3102     maybe_add_call_to_channel_interested_parties_locked(elem);
3103   }
3104 }
3105 
3106 //
3107 // filter call vtable functions
3108 //
3109 
cc_start_transport_stream_op_batch(grpc_call_element * elem,grpc_transport_stream_op_batch * batch)3110 static void cc_start_transport_stream_op_batch(
3111     grpc_call_element* elem, grpc_transport_stream_op_batch* batch) {
3112   GPR_TIMER_SCOPE("cc_start_transport_stream_op_batch", 0);
3113   call_data* calld = static_cast<call_data*>(elem->call_data);
3114   channel_data* chand = static_cast<channel_data*>(elem->channel_data);
3115   if (GPR_LIKELY(chand->deadline_checking_enabled)) {
3116     grpc_deadline_state_client_start_transport_stream_op_batch(elem, batch);
3117   }
3118   // If we've previously been cancelled, immediately fail any new batches.
3119   if (GPR_UNLIKELY(calld->cancel_error != GRPC_ERROR_NONE)) {
3120     if (grpc_client_channel_trace.enabled()) {
3121       gpr_log(GPR_INFO, "chand=%p calld=%p: failing batch with error: %s",
3122               chand, calld, grpc_error_string(calld->cancel_error));
3123     }
3124     // Note: This will release the call combiner.
3125     grpc_transport_stream_op_batch_finish_with_failure(
3126         batch, GRPC_ERROR_REF(calld->cancel_error), calld->call_combiner);
3127     return;
3128   }
3129   // Handle cancellation.
3130   if (GPR_UNLIKELY(batch->cancel_stream)) {
3131     // Stash a copy of cancel_error in our call data, so that we can use
3132     // it for subsequent operations.  This ensures that if the call is
3133     // cancelled before any batches are passed down (e.g., if the deadline
3134     // is in the past when the call starts), we can return the right
3135     // error to the caller when the first batch does get passed down.
3136     GRPC_ERROR_UNREF(calld->cancel_error);
3137     calld->cancel_error =
3138         GRPC_ERROR_REF(batch->payload->cancel_stream.cancel_error);
3139     if (grpc_client_channel_trace.enabled()) {
3140       gpr_log(GPR_INFO, "chand=%p calld=%p: recording cancel_error=%s", chand,
3141               calld, grpc_error_string(calld->cancel_error));
3142     }
3143     // If we do not have a subchannel call (i.e., a pick has not yet
3144     // been started), fail all pending batches.  Otherwise, send the
3145     // cancellation down to the subchannel call.
3146     if (calld->subchannel_call == nullptr) {
3147       pending_batches_fail(elem, GRPC_ERROR_REF(calld->cancel_error),
3148                            false /* yield_call_combiner */);
3149       // Note: This will release the call combiner.
3150       grpc_transport_stream_op_batch_finish_with_failure(
3151           batch, GRPC_ERROR_REF(calld->cancel_error), calld->call_combiner);
3152     } else {
3153       // Note: This will release the call combiner.
3154       grpc_subchannel_call_process_op(calld->subchannel_call, batch);
3155     }
3156     return;
3157   }
3158   // Add the batch to the pending list.
3159   pending_batches_add(elem, batch);
3160   // Check if we've already gotten a subchannel call.
3161   // Note that once we have completed the pick, we do not need to enter
3162   // the channel combiner, which is more efficient (especially for
3163   // streaming calls).
3164   if (calld->subchannel_call != nullptr) {
3165     if (grpc_client_channel_trace.enabled()) {
3166       gpr_log(GPR_INFO,
3167               "chand=%p calld=%p: starting batch on subchannel_call=%p", chand,
3168               calld, calld->subchannel_call);
3169     }
3170     pending_batches_resume(elem);
3171     return;
3172   }
3173   // We do not yet have a subchannel call.
3174   // For batches containing a send_initial_metadata op, enter the channel
3175   // combiner to start a pick.
3176   if (GPR_LIKELY(batch->send_initial_metadata)) {
3177     if (grpc_client_channel_trace.enabled()) {
3178       gpr_log(GPR_INFO, "chand=%p calld=%p: entering client_channel combiner",
3179               chand, calld);
3180     }
3181     GRPC_CLOSURE_SCHED(
3182         GRPC_CLOSURE_INIT(&batch->handler_private.closure, start_pick_locked,
3183                           elem, grpc_combiner_scheduler(chand->combiner)),
3184         GRPC_ERROR_NONE);
3185   } else {
3186     // For all other batches, release the call combiner.
3187     if (grpc_client_channel_trace.enabled()) {
3188       gpr_log(GPR_INFO,
3189               "chand=%p calld=%p: saved batch, yielding call combiner", chand,
3190               calld);
3191     }
3192     GRPC_CALL_COMBINER_STOP(calld->call_combiner,
3193                             "batch does not include send_initial_metadata");
3194   }
3195 }
3196 
3197 /* Constructor for call_data */
cc_init_call_elem(grpc_call_element * elem,const grpc_call_element_args * args)3198 static grpc_error* cc_init_call_elem(grpc_call_element* elem,
3199                                      const grpc_call_element_args* args) {
3200   call_data* calld = static_cast<call_data*>(elem->call_data);
3201   channel_data* chand = static_cast<channel_data*>(elem->channel_data);
3202   // Initialize data members.
3203   calld->path = grpc_slice_ref_internal(args->path);
3204   calld->call_start_time = args->start_time;
3205   calld->deadline = args->deadline;
3206   calld->arena = args->arena;
3207   calld->owning_call = args->call_stack;
3208   calld->call_combiner = args->call_combiner;
3209   if (GPR_LIKELY(chand->deadline_checking_enabled)) {
3210     grpc_deadline_state_init(elem, args->call_stack, args->call_combiner,
3211                              calld->deadline);
3212   }
3213   calld->enable_retries = chand->enable_retries;
3214   calld->send_messages.Init();
3215   return GRPC_ERROR_NONE;
3216 }
3217 
3218 /* Destructor for call_data */
cc_destroy_call_elem(grpc_call_element * elem,const grpc_call_final_info * final_info,grpc_closure * then_schedule_closure)3219 static void cc_destroy_call_elem(grpc_call_element* elem,
3220                                  const grpc_call_final_info* final_info,
3221                                  grpc_closure* then_schedule_closure) {
3222   call_data* calld = static_cast<call_data*>(elem->call_data);
3223   channel_data* chand = static_cast<channel_data*>(elem->channel_data);
3224   if (GPR_LIKELY(chand->deadline_checking_enabled)) {
3225     grpc_deadline_state_destroy(elem);
3226   }
3227   grpc_slice_unref_internal(calld->path);
3228   calld->retry_throttle_data.reset();
3229   calld->method_params.reset();
3230   GRPC_ERROR_UNREF(calld->cancel_error);
3231   if (GPR_LIKELY(calld->subchannel_call != nullptr)) {
3232     grpc_subchannel_call_set_cleanup_closure(calld->subchannel_call,
3233                                              then_schedule_closure);
3234     then_schedule_closure = nullptr;
3235     GRPC_SUBCHANNEL_CALL_UNREF(calld->subchannel_call,
3236                                "client_channel_destroy_call");
3237   }
3238   for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) {
3239     GPR_ASSERT(calld->pending_batches[i].batch == nullptr);
3240   }
3241   if (GPR_LIKELY(calld->pick.connected_subchannel != nullptr)) {
3242     calld->pick.connected_subchannel.reset();
3243   }
3244   for (size_t i = 0; i < GRPC_CONTEXT_COUNT; ++i) {
3245     if (calld->pick.subchannel_call_context[i].value != nullptr) {
3246       calld->pick.subchannel_call_context[i].destroy(
3247           calld->pick.subchannel_call_context[i].value);
3248     }
3249   }
3250   calld->send_messages.Destroy();
3251   GRPC_CLOSURE_SCHED(then_schedule_closure, GRPC_ERROR_NONE);
3252 }
3253 
cc_set_pollset_or_pollset_set(grpc_call_element * elem,grpc_polling_entity * pollent)3254 static void cc_set_pollset_or_pollset_set(grpc_call_element* elem,
3255                                           grpc_polling_entity* pollent) {
3256   call_data* calld = static_cast<call_data*>(elem->call_data);
3257   calld->pollent = pollent;
3258 }
3259 
3260 /*************************************************************************
3261  * EXPORTED SYMBOLS
3262  */
3263 
// Filter definition exported for the client channel.  The initializer is
// positional: it lists the cc_* call and channel callbacks defined in this
// file, interleaved with the sizes of call_data and channel_data, and ends
// with the filter's name string.
const grpc_channel_filter grpc_client_channel_filter = {
    cc_start_transport_stream_op_batch,
    cc_start_transport_op,
    sizeof(call_data),
    cc_init_call_elem,
    cc_set_pollset_or_pollset_set,
    cc_destroy_call_elem,
    sizeof(channel_data),
    cc_init_channel_elem,
    cc_destroy_channel_elem,
    cc_get_channel_info,
    "client-channel",
};
3277 
try_to_connect_locked(void * arg,grpc_error * error_ignored)3278 static void try_to_connect_locked(void* arg, grpc_error* error_ignored) {
3279   channel_data* chand = static_cast<channel_data*>(arg);
3280   if (chand->lb_policy != nullptr) {
3281     chand->lb_policy->ExitIdleLocked();
3282   } else {
3283     chand->exit_idle_when_lb_policy_arrives = true;
3284     if (!chand->started_resolving && chand->resolver != nullptr) {
3285       start_resolving_locked(chand);
3286     }
3287   }
3288   GRPC_CHANNEL_STACK_UNREF(chand->owning_stack, "try_to_connect");
3289 }
3290 
grpc_client_channel_populate_child_refs(grpc_channel_element * elem,grpc_core::ChildRefsList * child_subchannels,grpc_core::ChildRefsList * child_channels)3291 void grpc_client_channel_populate_child_refs(
3292     grpc_channel_element* elem, grpc_core::ChildRefsList* child_subchannels,
3293     grpc_core::ChildRefsList* child_channels) {
3294   channel_data* chand = static_cast<channel_data*>(elem->channel_data);
3295   if (chand->lb_policy != nullptr) {
3296     chand->lb_policy->FillChildRefsForChannelz(child_subchannels,
3297                                                child_channels);
3298   }
3299 }
3300 
grpc_client_channel_check_connectivity_state(grpc_channel_element * elem,int try_to_connect)3301 grpc_connectivity_state grpc_client_channel_check_connectivity_state(
3302     grpc_channel_element* elem, int try_to_connect) {
3303   channel_data* chand = static_cast<channel_data*>(elem->channel_data);
3304   grpc_connectivity_state out =
3305       grpc_connectivity_state_check(&chand->state_tracker);
3306   if (out == GRPC_CHANNEL_IDLE && try_to_connect) {
3307     GRPC_CHANNEL_STACK_REF(chand->owning_stack, "try_to_connect");
3308     GRPC_CLOSURE_SCHED(
3309         GRPC_CLOSURE_CREATE(try_to_connect_locked, chand,
3310                             grpc_combiner_scheduler(chand->combiner)),
3311         GRPC_ERROR_NONE);
3312   }
3313   return out;
3314 }
3315 
3316 typedef struct external_connectivity_watcher {
3317   channel_data* chand;
3318   grpc_polling_entity pollent;
3319   grpc_closure* on_complete;
3320   grpc_closure* watcher_timer_init;
3321   grpc_connectivity_state* state;
3322   grpc_closure my_closure;
3323   struct external_connectivity_watcher* next;
3324 } external_connectivity_watcher;
3325 
lookup_external_connectivity_watcher(channel_data * chand,grpc_closure * on_complete)3326 static external_connectivity_watcher* lookup_external_connectivity_watcher(
3327     channel_data* chand, grpc_closure* on_complete) {
3328   gpr_mu_lock(&chand->external_connectivity_watcher_list_mu);
3329   external_connectivity_watcher* w =
3330       chand->external_connectivity_watcher_list_head;
3331   while (w != nullptr && w->on_complete != on_complete) {
3332     w = w->next;
3333   }
3334   gpr_mu_unlock(&chand->external_connectivity_watcher_list_mu);
3335   return w;
3336 }
3337 
external_connectivity_watcher_list_append(channel_data * chand,external_connectivity_watcher * w)3338 static void external_connectivity_watcher_list_append(
3339     channel_data* chand, external_connectivity_watcher* w) {
3340   GPR_ASSERT(!lookup_external_connectivity_watcher(chand, w->on_complete));
3341 
3342   gpr_mu_lock(&w->chand->external_connectivity_watcher_list_mu);
3343   GPR_ASSERT(!w->next);
3344   w->next = chand->external_connectivity_watcher_list_head;
3345   chand->external_connectivity_watcher_list_head = w;
3346   gpr_mu_unlock(&w->chand->external_connectivity_watcher_list_mu);
3347 }
3348 
external_connectivity_watcher_list_remove(channel_data * chand,external_connectivity_watcher * too_remove)3349 static void external_connectivity_watcher_list_remove(
3350     channel_data* chand, external_connectivity_watcher* too_remove) {
3351   GPR_ASSERT(
3352       lookup_external_connectivity_watcher(chand, too_remove->on_complete));
3353   gpr_mu_lock(&chand->external_connectivity_watcher_list_mu);
3354   if (too_remove == chand->external_connectivity_watcher_list_head) {
3355     chand->external_connectivity_watcher_list_head = too_remove->next;
3356     gpr_mu_unlock(&chand->external_connectivity_watcher_list_mu);
3357     return;
3358   }
3359   external_connectivity_watcher* w =
3360       chand->external_connectivity_watcher_list_head;
3361   while (w != nullptr) {
3362     if (w->next == too_remove) {
3363       w->next = w->next->next;
3364       gpr_mu_unlock(&chand->external_connectivity_watcher_list_mu);
3365       return;
3366     }
3367     w = w->next;
3368   }
3369   GPR_UNREACHABLE_CODE(return );
3370 }
3371 
grpc_client_channel_num_external_connectivity_watchers(grpc_channel_element * elem)3372 int grpc_client_channel_num_external_connectivity_watchers(
3373     grpc_channel_element* elem) {
3374   channel_data* chand = static_cast<channel_data*>(elem->channel_data);
3375   int count = 0;
3376 
3377   gpr_mu_lock(&chand->external_connectivity_watcher_list_mu);
3378   external_connectivity_watcher* w =
3379       chand->external_connectivity_watcher_list_head;
3380   while (w != nullptr) {
3381     count++;
3382     w = w->next;
3383   }
3384   gpr_mu_unlock(&chand->external_connectivity_watcher_list_mu);
3385 
3386   return count;
3387 }
3388 
on_external_watch_complete_locked(void * arg,grpc_error * error)3389 static void on_external_watch_complete_locked(void* arg, grpc_error* error) {
3390   external_connectivity_watcher* w =
3391       static_cast<external_connectivity_watcher*>(arg);
3392   grpc_closure* follow_up = w->on_complete;
3393   grpc_polling_entity_del_from_pollset_set(&w->pollent,
3394                                            w->chand->interested_parties);
3395   GRPC_CHANNEL_STACK_UNREF(w->chand->owning_stack,
3396                            "external_connectivity_watcher");
3397   external_connectivity_watcher_list_remove(w->chand, w);
3398   gpr_free(w);
3399   GRPC_CLOSURE_SCHED(follow_up, GRPC_ERROR_REF(error));
3400 }
3401 
watch_connectivity_state_locked(void * arg,grpc_error * error_ignored)3402 static void watch_connectivity_state_locked(void* arg,
3403                                             grpc_error* error_ignored) {
3404   external_connectivity_watcher* w =
3405       static_cast<external_connectivity_watcher*>(arg);
3406   external_connectivity_watcher* found = nullptr;
3407   if (w->state != nullptr) {
3408     external_connectivity_watcher_list_append(w->chand, w);
3409     // An assumption is being made that the closure is scheduled on the exec ctx
3410     // scheduler and that GRPC_CLOSURE_RUN would run the closure immediately.
3411     GRPC_CLOSURE_RUN(w->watcher_timer_init, GRPC_ERROR_NONE);
3412     GRPC_CLOSURE_INIT(&w->my_closure, on_external_watch_complete_locked, w,
3413                       grpc_combiner_scheduler(w->chand->combiner));
3414     grpc_connectivity_state_notify_on_state_change(&w->chand->state_tracker,
3415                                                    w->state, &w->my_closure);
3416   } else {
3417     GPR_ASSERT(w->watcher_timer_init == nullptr);
3418     found = lookup_external_connectivity_watcher(w->chand, w->on_complete);
3419     if (found) {
3420       GPR_ASSERT(found->on_complete == w->on_complete);
3421       grpc_connectivity_state_notify_on_state_change(
3422           &found->chand->state_tracker, nullptr, &found->my_closure);
3423     }
3424     grpc_polling_entity_del_from_pollset_set(&w->pollent,
3425                                              w->chand->interested_parties);
3426     GRPC_CHANNEL_STACK_UNREF(w->chand->owning_stack,
3427                              "external_connectivity_watcher");
3428     gpr_free(w);
3429   }
3430 }
3431 
grpc_client_channel_watch_connectivity_state(grpc_channel_element * elem,grpc_polling_entity pollent,grpc_connectivity_state * state,grpc_closure * closure,grpc_closure * watcher_timer_init)3432 void grpc_client_channel_watch_connectivity_state(
3433     grpc_channel_element* elem, grpc_polling_entity pollent,
3434     grpc_connectivity_state* state, grpc_closure* closure,
3435     grpc_closure* watcher_timer_init) {
3436   channel_data* chand = static_cast<channel_data*>(elem->channel_data);
3437   external_connectivity_watcher* w =
3438       static_cast<external_connectivity_watcher*>(gpr_zalloc(sizeof(*w)));
3439   w->chand = chand;
3440   w->pollent = pollent;
3441   w->on_complete = closure;
3442   w->state = state;
3443   w->watcher_timer_init = watcher_timer_init;
3444   grpc_polling_entity_add_to_pollset_set(&w->pollent,
3445                                          chand->interested_parties);
3446   GRPC_CHANNEL_STACK_REF(w->chand->owning_stack,
3447                          "external_connectivity_watcher");
3448   GRPC_CLOSURE_SCHED(
3449       GRPC_CLOSURE_INIT(&w->my_closure, watch_connectivity_state_locked, w,
3450                         grpc_combiner_scheduler(chand->combiner)),
3451       GRPC_ERROR_NONE);
3452 }
3453 
grpc_client_channel_get_subchannel_call(grpc_call_element * elem)3454 grpc_subchannel_call* grpc_client_channel_get_subchannel_call(
3455     grpc_call_element* elem) {
3456   call_data* calld = static_cast<call_data*>(elem->call_data);
3457   return calld->subchannel_call;
3458 }
3459