1 /*
2  *
3  * Copyright 2015 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #include <grpc/support/port_platform.h>
20 
21 #include "src/core/ext/filters/client_channel/subchannel.h"
22 
23 #include <inttypes.h>
24 #include <limits.h>
25 
26 #include <algorithm>
27 #include <cstring>
28 
29 #include <grpc/support/alloc.h>
30 #include <grpc/support/string_util.h>
31 
32 #include "src/core/ext/filters/client_channel/client_channel.h"
33 #include "src/core/ext/filters/client_channel/parse_address.h"
34 #include "src/core/ext/filters/client_channel/proxy_mapper_registry.h"
35 #include "src/core/ext/filters/client_channel/subchannel_index.h"
36 #include "src/core/ext/filters/client_channel/uri_parser.h"
37 #include "src/core/lib/backoff/backoff.h"
38 #include "src/core/lib/channel/channel_args.h"
39 #include "src/core/lib/channel/connected_channel.h"
40 #include "src/core/lib/debug/stats.h"
41 #include "src/core/lib/gpr/alloc.h"
42 #include "src/core/lib/gprpp/debug_location.h"
43 #include "src/core/lib/gprpp/manual_constructor.h"
44 #include "src/core/lib/gprpp/ref_counted_ptr.h"
45 #include "src/core/lib/iomgr/sockaddr_utils.h"
46 #include "src/core/lib/iomgr/timer.h"
47 #include "src/core/lib/profiling/timers.h"
48 #include "src/core/lib/slice/slice_internal.h"
49 #include "src/core/lib/surface/channel.h"
50 #include "src/core/lib/surface/channel_init.h"
51 #include "src/core/lib/transport/connectivity_state.h"
52 
/* Number of low-order bits of grpc_subchannel::ref_pair that hold the
   internal (weak) reference count. */
#define INTERNAL_REF_BITS 16
/* Mask selecting the bits of ref_pair above the low INTERNAL_REF_BITS bits,
   i.e. the strong (public) reference count. */
#define STRONG_REF_MASK (~(gpr_atm)((1 << INTERNAL_REF_BITS) - 1))

/* Defaults consumed by parse_args_for_backoff_values() when building the
   reconnect backoff options; the *_SECONDS values are multiplied by 1000
   there to obtain milliseconds. */
#define GRPC_SUBCHANNEL_INITIAL_CONNECT_BACKOFF_SECONDS 1
#define GRPC_SUBCHANNEL_RECONNECT_BACKOFF_MULTIPLIER 1.6
#define GRPC_SUBCHANNEL_RECONNECT_MIN_TIMEOUT_SECONDS 20
#define GRPC_SUBCHANNEL_RECONNECT_MAX_BACKOFF_SECONDS 120
#define GRPC_SUBCHANNEL_RECONNECT_JITTER 0.2
61 
namespace {
/** Per-connection watcher the subchannel uses to follow the connectivity
    state of its connected subchannel. Allocated in
    publish_transport_locked() and handed as the argument to
    on_connected_subchannel_connectivity_changed(). */
struct state_watcher {
  /** closure wrapping on_connected_subchannel_connectivity_changed() */
  grpc_closure closure;
  /** subchannel on whose behalf the watch is made */
  grpc_subchannel* subchannel;
  /** state slot whose address is passed to
      ConnectedSubchannel::NotifyOnStateChange() */
  grpc_connectivity_state connectivity_state;
};
}  // namespace
69 
70 typedef struct external_state_watcher {
71   grpc_subchannel* subchannel;
72   grpc_pollset_set* pollset_set;
73   grpc_closure* notify;
74   grpc_closure closure;
75   struct external_state_watcher* next;
76   struct external_state_watcher* prev;
77 } external_state_watcher;
78 
/** State of one subchannel. Instances are zero-allocated in
    grpc_subchannel_create(), registered in the subchannel index, and freed
    by subchannel_destroy() once the last weak reference is dropped. */
struct grpc_subchannel {
  /** connector used to establish transports; reffed in
      grpc_subchannel_create() and unreffed in subchannel_destroy() */
  grpc_connector* connector;

  /** refcount
      - lower INTERNAL_REF_BITS bits are for internal references:
        these do not keep the subchannel open.
      - upper remaining bits are for public references: these do
        keep the subchannel open */
  gpr_atm ref_pair;

  /** non-transport related channel filters (heap copy of the array given in
      grpc_subchannel_args, or null when there are none) */
  const grpc_channel_filter** filters;
  size_t num_filters;
  /** channel arguments (owned copy, with the subchannel address arg
      rewritten in grpc_subchannel_create()) */
  grpc_channel_args* args;

  /** key under which this subchannel is registered in the subchannel index */
  grpc_subchannel_key* key;

  /** set during connection */
  grpc_connect_out_args connecting_result;

  /** callback for connection finishing */
  grpc_closure on_connected;

  /** callback for our alarm */
  grpc_closure on_alarm;

  /** pollset_set tracking who's interested in a connection
      being setup */
  grpc_pollset_set* pollset_set;

  /** mutex protecting remaining elements */
  gpr_mu mu;

  /** active connection, or null; of type grpc_core::ConnectedSubchannel
   */
  grpc_core::RefCountedPtr<grpc_core::ConnectedSubchannel> connected_subchannel;

  /** have we seen a disconnection? (set once, in disconnect()) */
  bool disconnected;
  /** are we connecting */
  bool connecting;
  /** connectivity state tracking */
  grpc_connectivity_state_tracker state_tracker;

  /** sentinel node of the circular list of external state watchers */
  external_state_watcher root_external_state_watcher;

  /** backoff state */
  grpc_core::ManualConstructor<grpc_core::BackOff> backoff;
  /** time of the next connection attempt, as returned by
      BackOff::NextAttemptTime() in continue_connect_locked() */
  grpc_millis next_attempt_deadline;
  /** lower bound, in milliseconds from now, applied to each connect deadline */
  grpc_millis min_connect_timeout_ms;

  /** do we have an active alarm? */
  bool have_alarm;
  /** have we started the backoff loop */
  bool backoff_begun;
  /** reset_backoff() was called while alarm was pending */
  bool deferred_reset_backoff;
  /** our alarm */
  grpc_timer alarm;

  /** channelz node for this subchannel; null unless channelz is enabled via
      channel args */
  grpc_core::RefCountedPtr<grpc_core::channelz::SubchannelNode>
      channelz_subchannel;
};
143 
/** Header of a subchannel call. The call's grpc_call_stack lives in the same
    allocation, at the alignment-rounded offset computed by
    SUBCHANNEL_CALL_TO_CALL_STACK(). */
struct grpc_subchannel_call {
  /** connected subchannel the call runs on; unreffed in
      subchannel_call_destroy() */
  grpc_core::ConnectedSubchannel* connection;
  /** optional closure passed to grpc_call_stack_destroy(); set at most once
      via grpc_subchannel_call_set_cleanup_closure() */
  grpc_closure* schedule_closure_after_destroy;
};
148 
/* Conversions between a grpc_subchannel_call and the grpc_call_stack that
   follows it in the same allocation. The call stack starts at the
   alignment-rounded size of the grpc_subchannel_call header.

   Both macros are fully parenthesized so they compose safely inside larger
   expressions, and CALLSTACK_TO_SUBCHANNEL_CALL uses its own `callstack`
   argument (it previously referred to a variable named `call_stack` from the
   expansion site, silently ignoring the argument). */
#define SUBCHANNEL_CALL_TO_CALL_STACK(call)                           \
  ((grpc_call_stack*)((char*)(call) + GPR_ROUND_UP_TO_ALIGNMENT_SIZE( \
                                          sizeof(grpc_subchannel_call))))
#define CALLSTACK_TO_SUBCHANNEL_CALL(callstack)            \
  ((grpc_subchannel_call*)(((char*)(callstack)) -          \
                           GPR_ROUND_UP_TO_ALIGNMENT_SIZE( \
                               sizeof(grpc_subchannel_call))))
156 
/* Forward declaration: grpc_subchannel_create() installs this function as
   the c->on_connected closure before its definition appears in the file. */
static void on_subchannel_connected(void* subchannel, grpc_error* error);
158 
/* Debug-only plumbing for reference tracing.
   - REF_REASON: the caller-supplied reason string in debug builds, "" in
     release builds.
   - REF_MUTATE_EXTRA_ARGS: extra trailing parameters of ref_mutate(); in
     debug builds these are GRPC_SUBCHANNEL_REF_EXTRA_ARGS (defined elsewhere;
     ref_mutate() uses the names file, line and reason from it) plus a
     `purpose` string.
   - REF_MUTATE_PURPOSE(x): the matching extra call arguments; note the
     leading comma, so it is appended directly after the last regular
     argument.
   In release builds (NDEBUG defined) the latter two expand to nothing. */
#ifndef NDEBUG
#define REF_REASON reason
#define REF_MUTATE_EXTRA_ARGS \
  GRPC_SUBCHANNEL_REF_EXTRA_ARGS, const char* purpose
#define REF_MUTATE_PURPOSE(x) , file, line, reason, x
#else
#define REF_REASON ""
#define REF_MUTATE_EXTRA_ARGS
#define REF_MUTATE_PURPOSE(x)
#endif
169 
170 /*
171  * connection implementation
172  */
173 
connection_destroy(void * arg,grpc_error * error)174 static void connection_destroy(void* arg, grpc_error* error) {
175   grpc_channel_stack* stk = static_cast<grpc_channel_stack*>(arg);
176   grpc_channel_stack_destroy(stk);
177   gpr_free(stk);
178 }
179 
180 /*
181  * grpc_subchannel implementation
182  */
183 
subchannel_destroy(void * arg,grpc_error * error)184 static void subchannel_destroy(void* arg, grpc_error* error) {
185   grpc_subchannel* c = static_cast<grpc_subchannel*>(arg);
186   if (c->channelz_subchannel != nullptr) {
187     c->channelz_subchannel->AddTraceEvent(
188         grpc_core::channelz::ChannelTrace::Severity::Info,
189         grpc_slice_from_static_string("Subchannel destroyed"));
190     c->channelz_subchannel->MarkSubchannelDestroyed();
191     c->channelz_subchannel.reset();
192   }
193   gpr_free((void*)c->filters);
194   grpc_channel_args_destroy(c->args);
195   grpc_connectivity_state_destroy(&c->state_tracker);
196   grpc_connector_unref(c->connector);
197   grpc_pollset_set_destroy(c->pollset_set);
198   grpc_subchannel_key_destroy(c->key);
199   gpr_mu_destroy(&c->mu);
200   gpr_free(c);
201 }
202 
ref_mutate(grpc_subchannel * c,gpr_atm delta,int barrier REF_MUTATE_EXTRA_ARGS)203 static gpr_atm ref_mutate(grpc_subchannel* c, gpr_atm delta,
204                           int barrier REF_MUTATE_EXTRA_ARGS) {
205   gpr_atm old_val = barrier ? gpr_atm_full_fetch_add(&c->ref_pair, delta)
206                             : gpr_atm_no_barrier_fetch_add(&c->ref_pair, delta);
207 #ifndef NDEBUG
208   if (grpc_trace_stream_refcount.enabled()) {
209     gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG,
210             "SUBCHANNEL: %p %12s 0x%" PRIxPTR " -> 0x%" PRIxPTR " [%s]", c,
211             purpose, old_val, old_val + delta, reason);
212   }
213 #endif
214   return old_val;
215 }
216 
grpc_subchannel_ref(grpc_subchannel * c GRPC_SUBCHANNEL_REF_EXTRA_ARGS)217 grpc_subchannel* grpc_subchannel_ref(
218     grpc_subchannel* c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
219   gpr_atm old_refs;
220   old_refs = ref_mutate(c, (1 << INTERNAL_REF_BITS),
221                         0 REF_MUTATE_PURPOSE("STRONG_REF"));
222   GPR_ASSERT((old_refs & STRONG_REF_MASK) != 0);
223   return c;
224 }
225 
grpc_subchannel_weak_ref(grpc_subchannel * c GRPC_SUBCHANNEL_REF_EXTRA_ARGS)226 grpc_subchannel* grpc_subchannel_weak_ref(
227     grpc_subchannel* c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
228   gpr_atm old_refs;
229   old_refs = ref_mutate(c, 1, 0 REF_MUTATE_PURPOSE("WEAK_REF"));
230   GPR_ASSERT(old_refs != 0);
231   return c;
232 }
233 
grpc_subchannel_ref_from_weak_ref(grpc_subchannel * c GRPC_SUBCHANNEL_REF_EXTRA_ARGS)234 grpc_subchannel* grpc_subchannel_ref_from_weak_ref(
235     grpc_subchannel* c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
236   if (!c) return nullptr;
237   for (;;) {
238     gpr_atm old_refs = gpr_atm_acq_load(&c->ref_pair);
239     if (old_refs >= (1 << INTERNAL_REF_BITS)) {
240       gpr_atm new_refs = old_refs + (1 << INTERNAL_REF_BITS);
241       if (gpr_atm_rel_cas(&c->ref_pair, old_refs, new_refs)) {
242         return c;
243       }
244     } else {
245       return nullptr;
246     }
247   }
248 }
249 
disconnect(grpc_subchannel * c)250 static void disconnect(grpc_subchannel* c) {
251   grpc_subchannel_index_unregister(c->key, c);
252   gpr_mu_lock(&c->mu);
253   GPR_ASSERT(!c->disconnected);
254   c->disconnected = true;
255   grpc_connector_shutdown(c->connector, GRPC_ERROR_CREATE_FROM_STATIC_STRING(
256                                             "Subchannel disconnected"));
257   c->connected_subchannel.reset();
258   gpr_mu_unlock(&c->mu);
259 }
260 
grpc_subchannel_unref(grpc_subchannel * c GRPC_SUBCHANNEL_REF_EXTRA_ARGS)261 void grpc_subchannel_unref(grpc_subchannel* c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
262   gpr_atm old_refs;
263   // add a weak ref and subtract a strong ref (atomically)
264   old_refs = ref_mutate(
265       c, static_cast<gpr_atm>(1) - static_cast<gpr_atm>(1 << INTERNAL_REF_BITS),
266       1 REF_MUTATE_PURPOSE("STRONG_UNREF"));
267   if ((old_refs & STRONG_REF_MASK) == (1 << INTERNAL_REF_BITS)) {
268     disconnect(c);
269   }
270   GRPC_SUBCHANNEL_WEAK_UNREF(c, "strong-unref");
271 }
272 
grpc_subchannel_weak_unref(grpc_subchannel * c GRPC_SUBCHANNEL_REF_EXTRA_ARGS)273 void grpc_subchannel_weak_unref(
274     grpc_subchannel* c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
275   gpr_atm old_refs;
276   old_refs = ref_mutate(c, -static_cast<gpr_atm>(1),
277                         1 REF_MUTATE_PURPOSE("WEAK_UNREF"));
278   if (old_refs == 1) {
279     GRPC_CLOSURE_SCHED(
280         GRPC_CLOSURE_CREATE(subchannel_destroy, c, grpc_schedule_on_exec_ctx),
281         GRPC_ERROR_NONE);
282   }
283 }
284 
parse_args_for_backoff_values(const grpc_channel_args * args,grpc_core::BackOff::Options * backoff_options,grpc_millis * min_connect_timeout_ms)285 static void parse_args_for_backoff_values(
286     const grpc_channel_args* args, grpc_core::BackOff::Options* backoff_options,
287     grpc_millis* min_connect_timeout_ms) {
288   grpc_millis initial_backoff_ms =
289       GRPC_SUBCHANNEL_INITIAL_CONNECT_BACKOFF_SECONDS * 1000;
290   *min_connect_timeout_ms =
291       GRPC_SUBCHANNEL_RECONNECT_MIN_TIMEOUT_SECONDS * 1000;
292   grpc_millis max_backoff_ms =
293       GRPC_SUBCHANNEL_RECONNECT_MAX_BACKOFF_SECONDS * 1000;
294   bool fixed_reconnect_backoff = false;
295   if (args != nullptr) {
296     for (size_t i = 0; i < args->num_args; i++) {
297       if (0 == strcmp(args->args[i].key,
298                       "grpc.testing.fixed_reconnect_backoff_ms")) {
299         fixed_reconnect_backoff = true;
300         initial_backoff_ms = *min_connect_timeout_ms = max_backoff_ms =
301             grpc_channel_arg_get_integer(
302                 &args->args[i],
303                 {static_cast<int>(initial_backoff_ms), 100, INT_MAX});
304       } else if (0 ==
305                  strcmp(args->args[i].key, GRPC_ARG_MIN_RECONNECT_BACKOFF_MS)) {
306         fixed_reconnect_backoff = false;
307         *min_connect_timeout_ms = grpc_channel_arg_get_integer(
308             &args->args[i],
309             {static_cast<int>(*min_connect_timeout_ms), 100, INT_MAX});
310       } else if (0 ==
311                  strcmp(args->args[i].key, GRPC_ARG_MAX_RECONNECT_BACKOFF_MS)) {
312         fixed_reconnect_backoff = false;
313         max_backoff_ms = grpc_channel_arg_get_integer(
314             &args->args[i], {static_cast<int>(max_backoff_ms), 100, INT_MAX});
315       } else if (0 == strcmp(args->args[i].key,
316                              GRPC_ARG_INITIAL_RECONNECT_BACKOFF_MS)) {
317         fixed_reconnect_backoff = false;
318         initial_backoff_ms = grpc_channel_arg_get_integer(
319             &args->args[i],
320             {static_cast<int>(initial_backoff_ms), 100, INT_MAX});
321       }
322     }
323   }
324   backoff_options->set_initial_backoff(initial_backoff_ms)
325       .set_multiplier(fixed_reconnect_backoff
326                           ? 1.0
327                           : GRPC_SUBCHANNEL_RECONNECT_BACKOFF_MULTIPLIER)
328       .set_jitter(fixed_reconnect_backoff ? 0.0
329                                           : GRPC_SUBCHANNEL_RECONNECT_JITTER)
330       .set_max_backoff(max_backoff_ms);
331 }
332 
/* Returns a subchannel for `args`, reusing one from the subchannel index
   when an entry with the same key exists and otherwise creating and
   registering a new one.

   connector: used by a newly created subchannel to establish transports; a
              ref is taken on it only when a new subchannel is created.
   args:      filters and channel args describing the subchannel.

   The return value is whatever grpc_subchannel_index_find() or
   grpc_subchannel_index_register() hands back. */
grpc_subchannel* grpc_subchannel_create(grpc_connector* connector,
                                        const grpc_subchannel_args* args) {
  grpc_subchannel_key* key = grpc_subchannel_key_create(args);
  grpc_subchannel* c = grpc_subchannel_index_find(key);
  if (c) {
    /* An equivalent subchannel already exists: the lookup key is no longer
       needed. */
    grpc_subchannel_key_destroy(key);
    return c;
  }

  GRPC_STATS_INC_CLIENT_SUBCHANNELS_CREATED();
  /* Zero-initialized, so every flag/pointer not set below starts out
     false/null. */
  c = static_cast<grpc_subchannel*>(gpr_zalloc(sizeof(*c)));
  c->key = key;
  /* Start with exactly one strong reference and no weak references. */
  gpr_atm_no_barrier_store(&c->ref_pair, 1 << INTERNAL_REF_BITS);
  c->connector = connector;
  grpc_connector_ref(c->connector);
  c->num_filters = args->filter_count;
  if (c->num_filters > 0) {
    /* Keep a private copy of the filter pointer array. */
    c->filters = static_cast<const grpc_channel_filter**>(
        gpr_malloc(sizeof(grpc_channel_filter*) * c->num_filters));
    memcpy((void*)c->filters, args->filters,
           sizeof(grpc_channel_filter*) * c->num_filters);
  } else {
    c->filters = nullptr;
  }
  c->pollset_set = grpc_pollset_set_create();
  /* Resolve the target address, giving proxy mappers the chance to replace
     both the address and the channel args. */
  grpc_resolved_address* addr =
      static_cast<grpc_resolved_address*>(gpr_malloc(sizeof(*addr)));
  grpc_get_subchannel_address_arg(args->args, addr);
  grpc_resolved_address* new_address = nullptr;
  grpc_channel_args* new_args = nullptr;
  if (grpc_proxy_mappers_map_address(addr, args->args, &new_address,
                                     &new_args)) {
    GPR_ASSERT(new_address != nullptr);
    gpr_free(addr);
    addr = new_address;
  }
  /* Replace any existing subchannel-address arg with one built from the
     (possibly remapped) address. */
  static const char* keys_to_remove[] = {GRPC_ARG_SUBCHANNEL_ADDRESS};
  grpc_arg new_arg = grpc_create_subchannel_address_arg(addr);
  gpr_free(addr);
  c->args = grpc_channel_args_copy_and_add_and_remove(
      new_args != nullptr ? new_args : args->args, keys_to_remove,
      GPR_ARRAY_SIZE(keys_to_remove), &new_arg, 1);
  /* The args were copied above, so the temporary string and the mapper's
     args can be released. */
  gpr_free(new_arg.value.string);
  if (new_args != nullptr) grpc_channel_args_destroy(new_args);
  /* Empty circular watcher list: the root points at itself. */
  c->root_external_state_watcher.next = c->root_external_state_watcher.prev =
      &c->root_external_state_watcher;
  GRPC_CLOSURE_INIT(&c->on_connected, on_subchannel_connected, c,
                    grpc_schedule_on_exec_ctx);
  grpc_connectivity_state_init(&c->state_tracker, GRPC_CHANNEL_IDLE,
                               "subchannel");
  /* NOTE(review): backoff values are parsed from the caller's args->args,
     not from c->args (i.e. not from any args a proxy mapper substituted) --
     confirm this is intended. */
  grpc_core::BackOff::Options backoff_options;
  parse_args_for_backoff_values(args->args, &backoff_options,
                                &c->min_connect_timeout_ms);
  c->backoff.Init(backoff_options);
  gpr_mu_init(&c->mu);

  /* Optional channelz node, controlled by channel args. */
  const grpc_arg* arg =
      grpc_channel_args_find(c->args, GRPC_ARG_ENABLE_CHANNELZ);
  bool channelz_enabled = grpc_channel_arg_get_bool(arg, false);
  arg = grpc_channel_args_find(c->args,
                               GRPC_ARG_MAX_CHANNEL_TRACE_EVENTS_PER_NODE);
  const grpc_integer_options options = {0, 0, INT_MAX};
  size_t channel_tracer_max_nodes =
      (size_t)grpc_channel_arg_get_integer(arg, options);
  if (channelz_enabled) {
    c->channelz_subchannel =
        grpc_core::MakeRefCounted<grpc_core::channelz::SubchannelNode>(
            c, channel_tracer_max_nodes);
    c->channelz_subchannel->AddTraceEvent(
        grpc_core::channelz::ChannelTrace::Severity::Info,
        grpc_slice_from_static_string("Subchannel created"));
  }

  return grpc_subchannel_index_register(key, c);
}
408 
grpc_subchannel_get_channelz_node(grpc_subchannel * subchannel)409 grpc_core::channelz::SubchannelNode* grpc_subchannel_get_channelz_node(
410     grpc_subchannel* subchannel) {
411   return subchannel->channelz_subchannel.get();
412 }
413 
continue_connect_locked(grpc_subchannel * c)414 static void continue_connect_locked(grpc_subchannel* c) {
415   grpc_connect_in_args args;
416   args.interested_parties = c->pollset_set;
417   const grpc_millis min_deadline =
418       c->min_connect_timeout_ms + grpc_core::ExecCtx::Get()->Now();
419   c->next_attempt_deadline = c->backoff->NextAttemptTime();
420   args.deadline = std::max(c->next_attempt_deadline, min_deadline);
421   args.channel_args = c->args;
422   grpc_connectivity_state_set(&c->state_tracker, GRPC_CHANNEL_CONNECTING,
423                               GRPC_ERROR_NONE, "connecting");
424   grpc_connector_connect(c->connector, &args, &c->connecting_result,
425                          &c->on_connected);
426 }
427 
grpc_subchannel_check_connectivity(grpc_subchannel * c,grpc_error ** error)428 grpc_connectivity_state grpc_subchannel_check_connectivity(grpc_subchannel* c,
429                                                            grpc_error** error) {
430   grpc_connectivity_state state;
431   gpr_mu_lock(&c->mu);
432   state = grpc_connectivity_state_get(&c->state_tracker, error);
433   gpr_mu_unlock(&c->mu);
434   return state;
435 }
436 
on_external_state_watcher_done(void * arg,grpc_error * error)437 static void on_external_state_watcher_done(void* arg, grpc_error* error) {
438   external_state_watcher* w = static_cast<external_state_watcher*>(arg);
439   grpc_closure* follow_up = w->notify;
440   if (w->pollset_set != nullptr) {
441     grpc_pollset_set_del_pollset_set(w->subchannel->pollset_set,
442                                      w->pollset_set);
443   }
444   gpr_mu_lock(&w->subchannel->mu);
445   w->next->prev = w->prev;
446   w->prev->next = w->next;
447   gpr_mu_unlock(&w->subchannel->mu);
448   GRPC_SUBCHANNEL_WEAK_UNREF(w->subchannel, "external_state_watcher");
449   gpr_free(w);
450   GRPC_CLOSURE_SCHED(follow_up, GRPC_ERROR_REF(error));
451 }
452 
/* Timer callback for the retry alarm armed in
   maybe_start_connecting_locked(). `arg` is the grpc_subchannel.

   Decides, under c->mu, whether to start the next connection attempt:
   - subchannel disconnected: never retry;
   - reset_backoff() was called while the alarm was pending: retry now,
     whatever error the timer reported;
   - otherwise: retry only if the timer fired without error. */
static void on_alarm(void* arg, grpc_error* error) {
  grpc_subchannel* c = static_cast<grpc_subchannel*>(arg);
  gpr_mu_lock(&c->mu);
  c->have_alarm = false;
  /* After this if/else chain, `error` is a value this function owns (or
     GRPC_ERROR_NONE), so the single GRPC_ERROR_UNREF at the end is balanced
     on every path. */
  if (c->disconnected) {
    error = GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING("Disconnected",
                                                             &error, 1);
  } else if (c->deferred_reset_backoff) {
    /* grpc_subchannel_reset_backoff() set this flag and cancelled the timer;
       override the timer's error so that we reconnect immediately. */
    c->deferred_reset_backoff = false;
    error = GRPC_ERROR_NONE;
  } else {
    GRPC_ERROR_REF(error);
  }
  if (error == GRPC_ERROR_NONE) {
    gpr_log(GPR_INFO, "Failed to connect to channel, retrying");
    continue_connect_locked(c);
    gpr_mu_unlock(&c->mu);
  } else {
    gpr_mu_unlock(&c->mu);
    /* No attempt will be made: release the "connecting" weak ref taken in
       maybe_start_connecting_locked(). */
    GRPC_SUBCHANNEL_WEAK_UNREF(c, "connecting");
  }
  GRPC_ERROR_UNREF(error);
}
476 
maybe_start_connecting_locked(grpc_subchannel * c)477 static void maybe_start_connecting_locked(grpc_subchannel* c) {
478   if (c->disconnected) {
479     /* Don't try to connect if we're already disconnected */
480     return;
481   }
482   if (c->connecting) {
483     /* Already connecting: don't restart */
484     return;
485   }
486   if (c->connected_subchannel != nullptr) {
487     /* Already connected: don't restart */
488     return;
489   }
490   if (!grpc_connectivity_state_has_watchers(&c->state_tracker)) {
491     /* Nobody is interested in connecting: so don't just yet */
492     return;
493   }
494   c->connecting = true;
495   GRPC_SUBCHANNEL_WEAK_REF(c, "connecting");
496   if (!c->backoff_begun) {
497     c->backoff_begun = true;
498     continue_connect_locked(c);
499   } else {
500     GPR_ASSERT(!c->have_alarm);
501     c->have_alarm = true;
502     const grpc_millis time_til_next =
503         c->next_attempt_deadline - grpc_core::ExecCtx::Get()->Now();
504     if (time_til_next <= 0) {
505       gpr_log(GPR_INFO, "Subchannel %p: Retry immediately", c);
506     } else {
507       gpr_log(GPR_INFO, "Subchannel %p: Retry in %" PRId64 " milliseconds", c,
508               time_til_next);
509     }
510     GRPC_CLOSURE_INIT(&c->on_alarm, on_alarm, c, grpc_schedule_on_exec_ctx);
511     grpc_timer_init(&c->alarm, c->next_attempt_deadline, &c->on_alarm);
512   }
513 }
514 
grpc_subchannel_notify_on_state_change(grpc_subchannel * c,grpc_pollset_set * interested_parties,grpc_connectivity_state * state,grpc_closure * notify)515 void grpc_subchannel_notify_on_state_change(
516     grpc_subchannel* c, grpc_pollset_set* interested_parties,
517     grpc_connectivity_state* state, grpc_closure* notify) {
518   external_state_watcher* w;
519 
520   if (state == nullptr) {
521     gpr_mu_lock(&c->mu);
522     for (w = c->root_external_state_watcher.next;
523          w != &c->root_external_state_watcher; w = w->next) {
524       if (w->notify == notify) {
525         grpc_connectivity_state_notify_on_state_change(&c->state_tracker,
526                                                        nullptr, &w->closure);
527       }
528     }
529     gpr_mu_unlock(&c->mu);
530   } else {
531     w = static_cast<external_state_watcher*>(gpr_malloc(sizeof(*w)));
532     w->subchannel = c;
533     w->pollset_set = interested_parties;
534     w->notify = notify;
535     GRPC_CLOSURE_INIT(&w->closure, on_external_state_watcher_done, w,
536                       grpc_schedule_on_exec_ctx);
537     if (interested_parties != nullptr) {
538       grpc_pollset_set_add_pollset_set(c->pollset_set, interested_parties);
539     }
540     GRPC_SUBCHANNEL_WEAK_REF(c, "external_state_watcher");
541     gpr_mu_lock(&c->mu);
542     w->next = &c->root_external_state_watcher;
543     w->prev = w->next->prev;
544     w->next->prev = w->prev->next = w;
545     grpc_connectivity_state_notify_on_state_change(&c->state_tracker, state,
546                                                    &w->closure);
547     maybe_start_connecting_locked(c);
548     gpr_mu_unlock(&c->mu);
549   }
550 }
551 
/* Invoked when the connectivity state watched on the connected subchannel
   changes. `p` is the state_watcher allocated in publish_transport_locked();
   each invocation consumes one "state_watcher" weak ref on the subchannel.

   - TRANSIENT_FAILURE / SHUTDOWN while still connected and not disconnected:
     drop the connected subchannel, report TRANSIENT_FAILURE, reset the
     backoff and try to reconnect. The watcher is freed.
   - TRANSIENT_FAILURE / SHUTDOWN otherwise: nothing to reflect; the watcher
     is freed.
   - any other state: mirror it into the subchannel's state tracker and
     re-arm the watch, reusing the same watcher. */
static void on_connected_subchannel_connectivity_changed(void* p,
                                                         grpc_error* error) {
  state_watcher* connected_subchannel_watcher = static_cast<state_watcher*>(p);
  grpc_subchannel* c = connected_subchannel_watcher->subchannel;
  gpr_mu* mu = &c->mu;

  gpr_mu_lock(mu);

  switch (connected_subchannel_watcher->connectivity_state) {
    case GRPC_CHANNEL_TRANSIENT_FAILURE:
    case GRPC_CHANNEL_SHUTDOWN: {
      if (!c->disconnected && c->connected_subchannel != nullptr) {
        if (grpc_trace_stream_refcount.enabled()) {
          gpr_log(GPR_INFO,
                  "Connected subchannel %p of subchannel %p has gone into %s. "
                  "Attempting to reconnect.",
                  c->connected_subchannel.get(), c,
                  grpc_connectivity_state_name(
                      connected_subchannel_watcher->connectivity_state));
        }
        c->connected_subchannel.reset();
        grpc_connectivity_state_set(&c->state_tracker,
                                    GRPC_CHANNEL_TRANSIENT_FAILURE,
                                    GRPC_ERROR_REF(error), "reflect_child");
        /* Restart the backoff cycle so the reconnect attempt is made right
           away by maybe_start_connecting_locked(). */
        c->backoff_begun = false;
        c->backoff->Reset();
        maybe_start_connecting_locked(c);
      } else {
        connected_subchannel_watcher->connectivity_state =
            GRPC_CHANNEL_SHUTDOWN;
      }
      break;
    }
    default: {
      grpc_connectivity_state_set(
          &c->state_tracker, connected_subchannel_watcher->connectivity_state,
          GRPC_ERROR_REF(error), "reflect_child");
      /* New weak ref for the re-armed watch; the ref belonging to this
         invocation is released below. */
      GRPC_SUBCHANNEL_WEAK_REF(c, "state_watcher");
      c->connected_subchannel->NotifyOnStateChange(
          nullptr, &connected_subchannel_watcher->connectivity_state,
          &connected_subchannel_watcher->closure);
      /* The watcher stays in use: null the local pointer so the gpr_free()
         below receives nullptr instead of the live watcher. */
      connected_subchannel_watcher = nullptr;
    }
  }
  gpr_mu_unlock(mu);
  GRPC_SUBCHANNEL_WEAK_UNREF(c, "state_watcher");
  gpr_free(connected_subchannel_watcher);
}
600 
/* Turns the transport in c->connecting_result into a connected subchannel
   and publishes it as c->connected_subchannel. Caller must hold c->mu.

   Returns true when the connection was published (the subchannel is then
   READY and a state watcher is installed on the new connection). Returns
   false when the channel stack could not be built or the subchannel has
   been disconnected in the meantime; in that case no "connecting" ref has
   been released here and the caller handles the failure. */
static bool publish_transport_locked(grpc_subchannel* c) {
  /* construct channel stack */
  grpc_channel_stack_builder* builder = grpc_channel_stack_builder_create();
  grpc_channel_stack_builder_set_channel_arguments(
      builder, c->connecting_result.channel_args);
  grpc_channel_stack_builder_set_transport(builder,
                                           c->connecting_result.transport);

  if (!grpc_channel_init_create_stack(builder, GRPC_CLIENT_SUBCHANNEL)) {
    grpc_channel_stack_builder_destroy(builder);
    return false;
  }
  grpc_channel_stack* stk;
  /* connection_destroy is registered as the stack's destroy callback. */
  grpc_error* error = grpc_channel_stack_builder_finish(
      builder, 0, 1, connection_destroy, nullptr,
      reinterpret_cast<void**>(&stk));
  if (error != GRPC_ERROR_NONE) {
    grpc_transport_destroy(c->connecting_result.transport);
    gpr_log(GPR_ERROR, "error initializing subchannel stack: %s",
            grpc_error_string(error));
    GRPC_ERROR_UNREF(error);
    return false;
  }
  /* Clear the connect result now that the stack has been built from it, so
     that on_subchannel_connected() no longer sees a transport. */
  memset(&c->connecting_result, 0, sizeof(c->connecting_result));

  /* initialize state watcher */
  state_watcher* connected_subchannel_watcher = static_cast<state_watcher*>(
      gpr_zalloc(sizeof(*connected_subchannel_watcher)));
  connected_subchannel_watcher->subchannel = c;
  connected_subchannel_watcher->connectivity_state = GRPC_CHANNEL_READY;
  GRPC_CLOSURE_INIT(&connected_subchannel_watcher->closure,
                    on_connected_subchannel_connectivity_changed,
                    connected_subchannel_watcher, grpc_schedule_on_exec_ctx);

  if (c->disconnected) {
    /* Disconnected while connecting: discard the freshly built stack and
       the watcher instead of publishing them. */
    gpr_free(connected_subchannel_watcher);
    grpc_channel_stack_destroy(stk);
    gpr_free(stk);
    return false;
  }

  /* publish */
  c->connected_subchannel.reset(grpc_core::New<grpc_core::ConnectedSubchannel>(
      stk, c->channelz_subchannel.get()));
  gpr_log(GPR_INFO, "New connected subchannel at %p for subchannel %p",
          c->connected_subchannel.get(), c);

  /* setup subchannel watching connected subchannel for changes; subchannel
     ref for connecting is donated to the state watcher */
  GRPC_SUBCHANNEL_WEAK_REF(c, "state_watcher");
  GRPC_SUBCHANNEL_WEAK_UNREF(c, "connecting");
  c->connected_subchannel->NotifyOnStateChange(
      c->pollset_set, &connected_subchannel_watcher->connectivity_state,
      &connected_subchannel_watcher->closure);

  /* signal completion */
  grpc_connectivity_state_set(&c->state_tracker, GRPC_CHANNEL_READY,
                              GRPC_ERROR_NONE, "connected");
  return true;
}
661 
/* Completion callback for grpc_connector_connect() (installed as
   c->on_connected). `arg` is the grpc_subchannel; `error` is the result of
   the connection attempt.

   - A transport was produced and could be published: nothing more to do.
   - Otherwise, if the subchannel is disconnected: just release the
     "connecting" ref.
   - Otherwise: report TRANSIENT_FAILURE (status UNAVAILABLE), log the error
     and schedule the next attempt. */
static void on_subchannel_connected(void* arg, grpc_error* error) {
  grpc_subchannel* c = static_cast<grpc_subchannel*>(arg);
  /* Captured before publish_transport_locked() zeroes connecting_result;
     destroyed at the end of this function. */
  grpc_channel_args* delete_channel_args = c->connecting_result.channel_args;

  /* Keeps `c` alive for the duration of this callback. NOTE(review): this
     ref is released below under the reason "connected" rather than
     "on_subchannel_connected"; the reason strings only matter for debug
     tracing. */
  GRPC_SUBCHANNEL_WEAK_REF(c, "on_subchannel_connected");
  gpr_mu_lock(&c->mu);
  c->connecting = false;
  if (c->connecting_result.transport != nullptr &&
      publish_transport_locked(c)) {
    /* do nothing, transport was published */
  } else if (c->disconnected) {
    GRPC_SUBCHANNEL_WEAK_UNREF(c, "connecting");
  } else {
    grpc_connectivity_state_set(
        &c->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE,
        grpc_error_set_int(GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
                               "Connect Failed", &error, 1),
                           GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE),
        "connect_failed");

    const char* errmsg = grpc_error_string(error);
    gpr_log(GPR_INFO, "Connect failed: %s", errmsg);

    /* May take a fresh "connecting" ref for the retry; the ref of the failed
       attempt is released right after. */
    maybe_start_connecting_locked(c);
    GRPC_SUBCHANNEL_WEAK_UNREF(c, "connecting");
  }
  gpr_mu_unlock(&c->mu);
  GRPC_SUBCHANNEL_WEAK_UNREF(c, "connected");
  grpc_channel_args_destroy(delete_channel_args);
}
692 
grpc_subchannel_reset_backoff(grpc_subchannel * subchannel)693 void grpc_subchannel_reset_backoff(grpc_subchannel* subchannel) {
694   gpr_mu_lock(&subchannel->mu);
695   if (subchannel->have_alarm) {
696     subchannel->deferred_reset_backoff = true;
697     grpc_timer_cancel(&subchannel->alarm);
698   } else {
699     subchannel->backoff_begun = false;
700     subchannel->backoff->Reset();
701     maybe_start_connecting_locked(subchannel);
702   }
703   gpr_mu_unlock(&subchannel->mu);
704 }
705 
706 /*
707  * grpc_subchannel_call implementation
708  */
709 
subchannel_call_destroy(void * call,grpc_error * error)710 static void subchannel_call_destroy(void* call, grpc_error* error) {
711   GPR_TIMER_SCOPE("grpc_subchannel_call_unref.destroy", 0);
712   grpc_subchannel_call* c = static_cast<grpc_subchannel_call*>(call);
713   grpc_core::ConnectedSubchannel* connection = c->connection;
714   grpc_call_stack_destroy(SUBCHANNEL_CALL_TO_CALL_STACK(c), nullptr,
715                           c->schedule_closure_after_destroy);
716   connection->Unref(DEBUG_LOCATION, "subchannel_call");
717 }
718 
grpc_subchannel_call_set_cleanup_closure(grpc_subchannel_call * call,grpc_closure * closure)719 void grpc_subchannel_call_set_cleanup_closure(grpc_subchannel_call* call,
720                                               grpc_closure* closure) {
721   GPR_ASSERT(call->schedule_closure_after_destroy == nullptr);
722   GPR_ASSERT(closure != nullptr);
723   call->schedule_closure_after_destroy = closure;
724 }
725 
grpc_subchannel_call_ref(grpc_subchannel_call * c GRPC_SUBCHANNEL_REF_EXTRA_ARGS)726 grpc_subchannel_call* grpc_subchannel_call_ref(
727     grpc_subchannel_call* c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
728   GRPC_CALL_STACK_REF(SUBCHANNEL_CALL_TO_CALL_STACK(c), REF_REASON);
729   return c;
730 }
731 
grpc_subchannel_call_unref(grpc_subchannel_call * c GRPC_SUBCHANNEL_REF_EXTRA_ARGS)732 void grpc_subchannel_call_unref(
733     grpc_subchannel_call* c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
734   GRPC_CALL_STACK_UNREF(SUBCHANNEL_CALL_TO_CALL_STACK(c), REF_REASON);
735 }
736 
grpc_subchannel_call_process_op(grpc_subchannel_call * call,grpc_transport_stream_op_batch * batch)737 void grpc_subchannel_call_process_op(grpc_subchannel_call* call,
738                                      grpc_transport_stream_op_batch* batch) {
739   GPR_TIMER_SCOPE("grpc_subchannel_call_process_op", 0);
740   grpc_call_stack* call_stack = SUBCHANNEL_CALL_TO_CALL_STACK(call);
741   grpc_call_element* top_elem = grpc_call_stack_element(call_stack, 0);
742   GRPC_CALL_LOG_OP(GPR_INFO, top_elem, batch);
743   top_elem->filter->start_transport_stream_op_batch(top_elem, batch);
744 }
745 
746 grpc_core::RefCountedPtr<grpc_core::ConnectedSubchannel>
grpc_subchannel_get_connected_subchannel(grpc_subchannel * c)747 grpc_subchannel_get_connected_subchannel(grpc_subchannel* c) {
748   gpr_mu_lock(&c->mu);
749   auto copy = c->connected_subchannel;
750   gpr_mu_unlock(&c->mu);
751   return copy;
752 }
753 
grpc_subchannel_get_key(const grpc_subchannel * subchannel)754 const grpc_subchannel_key* grpc_subchannel_get_key(
755     const grpc_subchannel* subchannel) {
756   return subchannel->key;
757 }
758 
grpc_connected_subchannel_call_get_parent_data(grpc_subchannel_call * subchannel_call)759 void* grpc_connected_subchannel_call_get_parent_data(
760     grpc_subchannel_call* subchannel_call) {
761   grpc_channel_stack* chanstk = subchannel_call->connection->channel_stack();
762   return (char*)subchannel_call + sizeof(grpc_subchannel_call) +
763          chanstk->call_stack_size;
764 }
765 
grpc_subchannel_call_get_call_stack(grpc_subchannel_call * subchannel_call)766 grpc_call_stack* grpc_subchannel_call_get_call_stack(
767     grpc_subchannel_call* subchannel_call) {
768   return SUBCHANNEL_CALL_TO_CALL_STACK(subchannel_call);
769 }
770 
grpc_uri_to_sockaddr(const char * uri_str,grpc_resolved_address * addr)771 static void grpc_uri_to_sockaddr(const char* uri_str,
772                                  grpc_resolved_address* addr) {
773   grpc_uri* uri = grpc_uri_parse(uri_str, 0 /* suppress_errors */);
774   GPR_ASSERT(uri != nullptr);
775   if (!grpc_parse_uri(uri, addr)) memset(addr, 0, sizeof(*addr));
776   grpc_uri_destroy(uri);
777 }
778 
grpc_get_subchannel_address_arg(const grpc_channel_args * args,grpc_resolved_address * addr)779 void grpc_get_subchannel_address_arg(const grpc_channel_args* args,
780                                      grpc_resolved_address* addr) {
781   const char* addr_uri_str = grpc_get_subchannel_address_uri_arg(args);
782   memset(addr, 0, sizeof(*addr));
783   if (*addr_uri_str != '\0') {
784     grpc_uri_to_sockaddr(addr_uri_str, addr);
785   }
786 }
787 
grpc_subchannel_get_target(grpc_subchannel * subchannel)788 const char* grpc_subchannel_get_target(grpc_subchannel* subchannel) {
789   const grpc_arg* addr_arg =
790       grpc_channel_args_find(subchannel->args, GRPC_ARG_SUBCHANNEL_ADDRESS);
791   const char* addr_str = grpc_channel_arg_get_string(addr_arg);
792   GPR_ASSERT(addr_str != nullptr);  // Should have been set by LB policy.
793   return addr_str;
794 }
795 
grpc_get_subchannel_address_uri_arg(const grpc_channel_args * args)796 const char* grpc_get_subchannel_address_uri_arg(const grpc_channel_args* args) {
797   const grpc_arg* addr_arg =
798       grpc_channel_args_find(args, GRPC_ARG_SUBCHANNEL_ADDRESS);
799   const char* addr_str = grpc_channel_arg_get_string(addr_arg);
800   GPR_ASSERT(addr_str != nullptr);  // Should have been set by LB policy.
801   return addr_str;
802 }
803 
grpc_create_subchannel_address_arg(const grpc_resolved_address * addr)804 grpc_arg grpc_create_subchannel_address_arg(const grpc_resolved_address* addr) {
805   return grpc_channel_arg_string_create(
806       (char*)GRPC_ARG_SUBCHANNEL_ADDRESS,
807       addr->len > 0 ? grpc_sockaddr_to_uri(addr) : gpr_strdup(""));
808 }
809 
810 namespace grpc_core {
811 
ConnectedSubchannel(grpc_channel_stack * channel_stack,channelz::SubchannelNode * channelz_subchannel)812 ConnectedSubchannel::ConnectedSubchannel(
813     grpc_channel_stack* channel_stack,
814     channelz::SubchannelNode* channelz_subchannel)
815     : RefCountedWithTracing<ConnectedSubchannel>(&grpc_trace_stream_refcount),
816       channel_stack_(channel_stack),
817       channelz_subchannel_(channelz_subchannel) {}
818 
~ConnectedSubchannel()819 ConnectedSubchannel::~ConnectedSubchannel() {
820   GRPC_CHANNEL_STACK_UNREF(channel_stack_, "connected_subchannel_dtor");
821 }
822 
NotifyOnStateChange(grpc_pollset_set * interested_parties,grpc_connectivity_state * state,grpc_closure * closure)823 void ConnectedSubchannel::NotifyOnStateChange(
824     grpc_pollset_set* interested_parties, grpc_connectivity_state* state,
825     grpc_closure* closure) {
826   grpc_transport_op* op = grpc_make_transport_op(nullptr);
827   grpc_channel_element* elem;
828   op->connectivity_state = state;
829   op->on_connectivity_state_change = closure;
830   op->bind_pollset_set = interested_parties;
831   elem = grpc_channel_stack_element(channel_stack_, 0);
832   elem->filter->start_transport_op(elem, op);
833 }
834 
Ping(grpc_closure * on_initiate,grpc_closure * on_ack)835 void ConnectedSubchannel::Ping(grpc_closure* on_initiate,
836                                grpc_closure* on_ack) {
837   grpc_transport_op* op = grpc_make_transport_op(nullptr);
838   grpc_channel_element* elem;
839   op->send_ping.on_initiate = on_initiate;
840   op->send_ping.on_ack = on_ack;
841   elem = grpc_channel_stack_element(channel_stack_, 0);
842   elem->filter->start_transport_op(elem, op);
843 }
844 
CreateCall(const CallArgs & args,grpc_subchannel_call ** call)845 grpc_error* ConnectedSubchannel::CreateCall(const CallArgs& args,
846                                             grpc_subchannel_call** call) {
847   size_t allocation_size =
848       GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(grpc_subchannel_call));
849   if (args.parent_data_size > 0) {
850     allocation_size +=
851         GPR_ROUND_UP_TO_ALIGNMENT_SIZE(channel_stack_->call_stack_size) +
852         args.parent_data_size;
853   } else {
854     allocation_size += channel_stack_->call_stack_size;
855   }
856   *call = static_cast<grpc_subchannel_call*>(
857       gpr_arena_alloc(args.arena, allocation_size));
858   grpc_call_stack* callstk = SUBCHANNEL_CALL_TO_CALL_STACK(*call);
859   RefCountedPtr<ConnectedSubchannel> connection =
860       Ref(DEBUG_LOCATION, "subchannel_call");
861   connection.release();  // Ref is passed to the grpc_subchannel_call object.
862   (*call)->connection = this;
863   const grpc_call_element_args call_args = {
864       callstk,           /* call_stack */
865       nullptr,           /* server_transport_data */
866       args.context,      /* context */
867       args.path,         /* path */
868       args.start_time,   /* start_time */
869       args.deadline,     /* deadline */
870       args.arena,        /* arena */
871       args.call_combiner /* call_combiner */
872   };
873   grpc_error* error = grpc_call_stack_init(
874       channel_stack_, 1, subchannel_call_destroy, *call, &call_args);
875   if (GPR_UNLIKELY(error != GRPC_ERROR_NONE)) {
876     const char* error_string = grpc_error_string(error);
877     gpr_log(GPR_ERROR, "error: %s", error_string);
878     return error;
879   }
880   grpc_call_stack_set_pollset_or_pollset_set(callstk, args.pollent);
881   return GRPC_ERROR_NONE;
882 }
883 
884 }  // namespace grpc_core
885