1 /*
2  *
3  * Copyright 2015 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #include <grpc/support/port_platform.h>
20 
21 #include "src/core/ext/filters/client_channel/subchannel.h"
22 
23 #include <inttypes.h>
24 #include <limits.h>
25 
26 #include <algorithm>
27 #include <cstring>
28 
29 #include "absl/strings/str_format.h"
30 
31 #include <grpc/support/alloc.h>
32 #include <grpc/support/string_util.h>
33 
34 #include "src/core/ext/filters/client_channel/client_channel.h"
35 #include "src/core/ext/filters/client_channel/health/health_check_client.h"
36 #include "src/core/ext/filters/client_channel/proxy_mapper_registry.h"
37 #include "src/core/ext/filters/client_channel/service_config.h"
38 #include "src/core/ext/filters/client_channel/subchannel_pool_interface.h"
39 #include "src/core/lib/backoff/backoff.h"
40 #include "src/core/lib/channel/channel_args.h"
41 #include "src/core/lib/channel/connected_channel.h"
42 #include "src/core/lib/debug/stats.h"
43 #include "src/core/lib/gpr/alloc.h"
44 #include "src/core/lib/gprpp/debug_location.h"
45 #include "src/core/lib/gprpp/manual_constructor.h"
46 #include "src/core/lib/gprpp/ref_counted_ptr.h"
47 #include "src/core/lib/gprpp/sync.h"
48 #include "src/core/lib/iomgr/parse_address.h"
49 #include "src/core/lib/iomgr/sockaddr_utils.h"
50 #include "src/core/lib/profiling/timers.h"
51 #include "src/core/lib/slice/slice_internal.h"
52 #include "src/core/lib/surface/channel.h"
53 #include "src/core/lib/surface/channel_init.h"
54 #include "src/core/lib/transport/connectivity_state.h"
55 #include "src/core/lib/transport/error_utils.h"
56 #include "src/core/lib/transport/status_metadata.h"
57 #include "src/core/lib/uri/uri_parser.h"
58 
// Strong and weak refs.
// STRONG_REF_MASK clears the low INTERNAL_REF_BITS bits of a packed ref
// word and keeps everything above them.  A freshly constructed Subchannel
// seeds its ref word with (1 << INTERNAL_REF_BITS).
#define INTERNAL_REF_BITS 16
#define STRONG_REF_MASK (~(gpr_atm)((1 << INTERNAL_REF_BITS) - 1))

// Backoff parameters.
// Defaults used by ParseArgsForBackoffValues() when the channel args do not
// override them.  The *_SECONDS values are converted to milliseconds there.
#define GRPC_SUBCHANNEL_INITIAL_CONNECT_BACKOFF_SECONDS 1
#define GRPC_SUBCHANNEL_RECONNECT_BACKOFF_MULTIPLIER 1.6
#define GRPC_SUBCHANNEL_RECONNECT_MIN_TIMEOUT_SECONDS 20
#define GRPC_SUBCHANNEL_RECONNECT_MAX_BACKOFF_SECONDS 120
#define GRPC_SUBCHANNEL_RECONNECT_JITTER 0.2
69 
// Conversion between subchannel call and call stack.
// Both macros apply the same fixed byte offset: the size of SubchannelCall
// rounded up to the platform alignment.  The call stack pointer is the
// SubchannelCall pointer plus that offset, and vice versa.
#define SUBCHANNEL_CALL_TO_CALL_STACK(call) \
  (grpc_call_stack*)((char*)(call) +        \
                     GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(SubchannelCall)))
// The expansion must reference the macro's own parameter; otherwise the
// macro silently binds to whatever variable named `call_stack` happens to be
// in scope at the call site (or fails to compile when there is none).
#define CALL_STACK_TO_SUBCHANNEL_CALL(callstack) \
  (SubchannelCall*)(((char*)(callstack)) -       \
                    GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(SubchannelCall)))
77 
78 namespace grpc_core {
79 
80 TraceFlag grpc_trace_subchannel(false, "subchannel");
81 DebugOnlyTraceFlag grpc_trace_subchannel_refcount(false, "subchannel_refcount");
82 
83 //
84 // ConnectedSubchannel
85 //
86 
ConnectedSubchannel(grpc_channel_stack * channel_stack,const grpc_channel_args * args,RefCountedPtr<channelz::SubchannelNode> channelz_subchannel)87 ConnectedSubchannel::ConnectedSubchannel(
88     grpc_channel_stack* channel_stack, const grpc_channel_args* args,
89     RefCountedPtr<channelz::SubchannelNode> channelz_subchannel)
90     : RefCounted<ConnectedSubchannel>(
91           GRPC_TRACE_FLAG_ENABLED(grpc_trace_subchannel_refcount)
92               ? "ConnectedSubchannel"
93               : nullptr),
94       channel_stack_(channel_stack),
95       args_(grpc_channel_args_copy(args)),
96       channelz_subchannel_(std::move(channelz_subchannel)) {}
97 
~ConnectedSubchannel()98 ConnectedSubchannel::~ConnectedSubchannel() {
99   grpc_channel_args_destroy(args_);
100   GRPC_CHANNEL_STACK_UNREF(channel_stack_, "connected_subchannel_dtor");
101 }
102 
StartWatch(grpc_pollset_set * interested_parties,OrphanablePtr<ConnectivityStateWatcherInterface> watcher)103 void ConnectedSubchannel::StartWatch(
104     grpc_pollset_set* interested_parties,
105     OrphanablePtr<ConnectivityStateWatcherInterface> watcher) {
106   grpc_transport_op* op = grpc_make_transport_op(nullptr);
107   op->start_connectivity_watch = std::move(watcher);
108   op->start_connectivity_watch_state = GRPC_CHANNEL_READY;
109   op->bind_pollset_set = interested_parties;
110   grpc_channel_element* elem = grpc_channel_stack_element(channel_stack_, 0);
111   elem->filter->start_transport_op(elem, op);
112 }
113 
Ping(grpc_closure * on_initiate,grpc_closure * on_ack)114 void ConnectedSubchannel::Ping(grpc_closure* on_initiate,
115                                grpc_closure* on_ack) {
116   grpc_transport_op* op = grpc_make_transport_op(nullptr);
117   grpc_channel_element* elem;
118   op->send_ping.on_initiate = on_initiate;
119   op->send_ping.on_ack = on_ack;
120   elem = grpc_channel_stack_element(channel_stack_, 0);
121   elem->filter->start_transport_op(elem, op);
122 }
123 
GetInitialCallSizeEstimate() const124 size_t ConnectedSubchannel::GetInitialCallSizeEstimate() const {
125   return GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(SubchannelCall)) +
126          channel_stack_->call_stack_size;
127 }
128 
129 //
130 // SubchannelCall
131 //
132 
Create(Args args,grpc_error ** error)133 RefCountedPtr<SubchannelCall> SubchannelCall::Create(Args args,
134                                                      grpc_error** error) {
135   const size_t allocation_size =
136       args.connected_subchannel->GetInitialCallSizeEstimate();
137   Arena* arena = args.arena;
138   return RefCountedPtr<SubchannelCall>(new (
139       arena->Alloc(allocation_size)) SubchannelCall(std::move(args), error));
140 }
141 
SubchannelCall(Args args,grpc_error ** error)142 SubchannelCall::SubchannelCall(Args args, grpc_error** error)
143     : connected_subchannel_(std::move(args.connected_subchannel)),
144       deadline_(args.deadline) {
145   grpc_call_stack* callstk = SUBCHANNEL_CALL_TO_CALL_STACK(this);
146   const grpc_call_element_args call_args = {
147       callstk,           /* call_stack */
148       nullptr,           /* server_transport_data */
149       args.context,      /* context */
150       args.path,         /* path */
151       args.start_time,   /* start_time */
152       args.deadline,     /* deadline */
153       args.arena,        /* arena */
154       args.call_combiner /* call_combiner */
155   };
156   *error = grpc_call_stack_init(connected_subchannel_->channel_stack(), 1,
157                                 SubchannelCall::Destroy, this, &call_args);
158   if (GPR_UNLIKELY(*error != GRPC_ERROR_NONE)) {
159     const char* error_string = grpc_error_string(*error);
160     gpr_log(GPR_ERROR, "error: %s", error_string);
161     return;
162   }
163   grpc_call_stack_set_pollset_or_pollset_set(callstk, args.pollent);
164   auto* channelz_node = connected_subchannel_->channelz_subchannel();
165   if (channelz_node != nullptr) {
166     channelz_node->RecordCallStarted();
167   }
168 }
169 
StartTransportStreamOpBatch(grpc_transport_stream_op_batch * batch)170 void SubchannelCall::StartTransportStreamOpBatch(
171     grpc_transport_stream_op_batch* batch) {
172   GPR_TIMER_SCOPE("subchannel_call_process_op", 0);
173   MaybeInterceptRecvTrailingMetadata(batch);
174   grpc_call_stack* call_stack = SUBCHANNEL_CALL_TO_CALL_STACK(this);
175   grpc_call_element* top_elem = grpc_call_stack_element(call_stack, 0);
176   GRPC_CALL_LOG_OP(GPR_INFO, top_elem, batch);
177   top_elem->filter->start_transport_stream_op_batch(top_elem, batch);
178 }
179 
GetCallStack()180 grpc_call_stack* SubchannelCall::GetCallStack() {
181   return SUBCHANNEL_CALL_TO_CALL_STACK(this);
182 }
183 
SetAfterCallStackDestroy(grpc_closure * closure)184 void SubchannelCall::SetAfterCallStackDestroy(grpc_closure* closure) {
185   GPR_ASSERT(after_call_stack_destroy_ == nullptr);
186   GPR_ASSERT(closure != nullptr);
187   after_call_stack_destroy_ = closure;
188 }
189 
Ref()190 RefCountedPtr<SubchannelCall> SubchannelCall::Ref() {
191   IncrementRefCount();
192   return RefCountedPtr<SubchannelCall>(this);
193 }
194 
Ref(const grpc_core::DebugLocation & location,const char * reason)195 RefCountedPtr<SubchannelCall> SubchannelCall::Ref(
196     const grpc_core::DebugLocation& location, const char* reason) {
197   IncrementRefCount(location, reason);
198   return RefCountedPtr<SubchannelCall>(this);
199 }
200 
Unref()201 void SubchannelCall::Unref() {
202   GRPC_CALL_STACK_UNREF(SUBCHANNEL_CALL_TO_CALL_STACK(this), "");
203 }
204 
Unref(const DebugLocation &,const char * reason)205 void SubchannelCall::Unref(const DebugLocation& /*location*/,
206                            const char* reason) {
207   GRPC_CALL_STACK_UNREF(SUBCHANNEL_CALL_TO_CALL_STACK(this), reason);
208 }
209 
Destroy(void * arg,grpc_error *)210 void SubchannelCall::Destroy(void* arg, grpc_error* /*error*/) {
211   GPR_TIMER_SCOPE("subchannel_call_destroy", 0);
212   SubchannelCall* self = static_cast<SubchannelCall*>(arg);
213   // Keep some members before destroying the subchannel call.
214   grpc_closure* after_call_stack_destroy = self->after_call_stack_destroy_;
215   RefCountedPtr<ConnectedSubchannel> connected_subchannel =
216       std::move(self->connected_subchannel_);
217   // Destroy the subchannel call.
218   self->~SubchannelCall();
219   // Destroy the call stack. This should be after destroying the subchannel
220   // call, because call->after_call_stack_destroy(), if not null, will free the
221   // call arena.
222   grpc_call_stack_destroy(SUBCHANNEL_CALL_TO_CALL_STACK(self), nullptr,
223                           after_call_stack_destroy);
224   // Automatically reset connected_subchannel. This should be after destroying
225   // the call stack, because destroying call stack needs access to the channel
226   // stack.
227 }
228 
MaybeInterceptRecvTrailingMetadata(grpc_transport_stream_op_batch * batch)229 void SubchannelCall::MaybeInterceptRecvTrailingMetadata(
230     grpc_transport_stream_op_batch* batch) {
231   // only intercept payloads with recv trailing.
232   if (!batch->recv_trailing_metadata) {
233     return;
234   }
235   // only add interceptor is channelz is enabled.
236   if (connected_subchannel_->channelz_subchannel() == nullptr) {
237     return;
238   }
239   GRPC_CLOSURE_INIT(&recv_trailing_metadata_ready_, RecvTrailingMetadataReady,
240                     this, grpc_schedule_on_exec_ctx);
241   // save some state needed for the interception callback.
242   GPR_ASSERT(recv_trailing_metadata_ == nullptr);
243   recv_trailing_metadata_ =
244       batch->payload->recv_trailing_metadata.recv_trailing_metadata;
245   original_recv_trailing_metadata_ =
246       batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready;
247   batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready =
248       &recv_trailing_metadata_ready_;
249 }
250 
251 namespace {
252 
253 // Sets *status based on the rest of the parameters.
GetCallStatus(grpc_status_code * status,grpc_millis deadline,grpc_metadata_batch * md_batch,grpc_error * error)254 void GetCallStatus(grpc_status_code* status, grpc_millis deadline,
255                    grpc_metadata_batch* md_batch, grpc_error* error) {
256   if (error != GRPC_ERROR_NONE) {
257     grpc_error_get_status(error, deadline, status, nullptr, nullptr, nullptr);
258   } else {
259     if (md_batch->idx.named.grpc_status != nullptr) {
260       *status = grpc_get_status_code_from_metadata(
261           md_batch->idx.named.grpc_status->md);
262     } else {
263       *status = GRPC_STATUS_UNKNOWN;
264     }
265   }
266   GRPC_ERROR_UNREF(error);
267 }
268 
269 }  // namespace
270 
RecvTrailingMetadataReady(void * arg,grpc_error * error)271 void SubchannelCall::RecvTrailingMetadataReady(void* arg, grpc_error* error) {
272   SubchannelCall* call = static_cast<SubchannelCall*>(arg);
273   GPR_ASSERT(call->recv_trailing_metadata_ != nullptr);
274   grpc_status_code status = GRPC_STATUS_OK;
275   GetCallStatus(&status, call->deadline_, call->recv_trailing_metadata_,
276                 GRPC_ERROR_REF(error));
277   channelz::SubchannelNode* channelz_subchannel =
278       call->connected_subchannel_->channelz_subchannel();
279   GPR_ASSERT(channelz_subchannel != nullptr);
280   if (status == GRPC_STATUS_OK) {
281     channelz_subchannel->RecordCallSucceeded();
282   } else {
283     channelz_subchannel->RecordCallFailed();
284   }
285   Closure::Run(DEBUG_LOCATION, call->original_recv_trailing_metadata_,
286                GRPC_ERROR_REF(error));
287 }
288 
IncrementRefCount()289 void SubchannelCall::IncrementRefCount() {
290   GRPC_CALL_STACK_REF(SUBCHANNEL_CALL_TO_CALL_STACK(this), "");
291 }
292 
IncrementRefCount(const grpc_core::DebugLocation &,const char * reason)293 void SubchannelCall::IncrementRefCount(
294     const grpc_core::DebugLocation& /*location*/, const char* reason) {
295   GRPC_CALL_STACK_REF(SUBCHANNEL_CALL_TO_CALL_STACK(this), reason);
296 }
297 
298 //
299 // Subchannel::ConnectedSubchannelStateWatcher
300 //
301 
302 class Subchannel::ConnectedSubchannelStateWatcher
303     : public AsyncConnectivityStateWatcherInterface {
304  public:
305   // Must be instantiated while holding c->mu.
ConnectedSubchannelStateWatcher(Subchannel * c)306   explicit ConnectedSubchannelStateWatcher(Subchannel* c) : subchannel_(c) {
307     // Steal subchannel ref for connecting.
308     GRPC_SUBCHANNEL_WEAK_REF(subchannel_, "state_watcher");
309     GRPC_SUBCHANNEL_WEAK_UNREF(subchannel_, "connecting");
310   }
311 
~ConnectedSubchannelStateWatcher()312   ~ConnectedSubchannelStateWatcher() override {
313     GRPC_SUBCHANNEL_WEAK_UNREF(subchannel_, "state_watcher");
314   }
315 
316  private:
OnConnectivityStateChange(grpc_connectivity_state new_state,const absl::Status & status)317   void OnConnectivityStateChange(grpc_connectivity_state new_state,
318                                  const absl::Status& status) override {
319     Subchannel* c = subchannel_;
320     MutexLock lock(&c->mu_);
321     switch (new_state) {
322       case GRPC_CHANNEL_TRANSIENT_FAILURE:
323       case GRPC_CHANNEL_SHUTDOWN: {
324         if (!c->disconnected_ && c->connected_subchannel_ != nullptr) {
325           if (grpc_trace_subchannel.enabled()) {
326             gpr_log(GPR_INFO,
327                     "Connected subchannel %p of subchannel %p has gone into "
328                     "%s. Attempting to reconnect.",
329                     c->connected_subchannel_.get(), c,
330                     ConnectivityStateName(new_state));
331           }
332           c->connected_subchannel_.reset();
333           if (c->channelz_node() != nullptr) {
334             c->channelz_node()->SetChildSocket(nullptr);
335           }
336           // We need to construct our own status if the underlying state was
337           // shutdown since the accompanying status will be StatusCode::OK
338           // otherwise.
339           c->SetConnectivityStateLocked(
340               GRPC_CHANNEL_TRANSIENT_FAILURE,
341               new_state == GRPC_CHANNEL_SHUTDOWN
342                   ? absl::Status(absl::StatusCode::kUnavailable,
343                                  "Subchannel has disconnected.")
344                   : status);
345           c->backoff_begun_ = false;
346           c->backoff_.Reset();
347         }
348         break;
349       }
350       default: {
351         // In principle, this should never happen.  We should not get
352         // a callback for READY, because that was the state we started
353         // this watch from.  And a connected subchannel should never go
354         // from READY to CONNECTING or IDLE.
355         c->SetConnectivityStateLocked(new_state, status);
356       }
357     }
358   }
359 
360   Subchannel* subchannel_;
361 };
362 
// Asynchronously notifies the \a watcher of a change in the connectivity state
// of \a subchannel to the current \a state. Deletes itself when done.
365 class Subchannel::AsyncWatcherNotifierLocked {
366  public:
AsyncWatcherNotifierLocked(RefCountedPtr<Subchannel::ConnectivityStateWatcherInterface> watcher,Subchannel * subchannel,grpc_connectivity_state state,const absl::Status & status)367   AsyncWatcherNotifierLocked(
368       RefCountedPtr<Subchannel::ConnectivityStateWatcherInterface> watcher,
369       Subchannel* subchannel, grpc_connectivity_state state,
370       const absl::Status& status)
371       : watcher_(std::move(watcher)) {
372     RefCountedPtr<ConnectedSubchannel> connected_subchannel;
373     if (state == GRPC_CHANNEL_READY) {
374       connected_subchannel = subchannel->connected_subchannel_;
375     }
376     watcher_->PushConnectivityStateChange(
377         {state, status, std::move(connected_subchannel)});
378     ExecCtx::Run(DEBUG_LOCATION,
379                  GRPC_CLOSURE_INIT(
380                      &closure_,
381                      [](void* arg, grpc_error* /*error*/) {
382                        auto* self =
383                            static_cast<AsyncWatcherNotifierLocked*>(arg);
384                        self->watcher_->OnConnectivityStateChange();
385                        delete self;
386                      },
387                      this, nullptr),
388                  GRPC_ERROR_NONE);
389   }
390 
391  private:
392   RefCountedPtr<Subchannel::ConnectivityStateWatcherInterface> watcher_;
393   grpc_closure closure_;
394 };
395 
396 //
397 // Subchannel::ConnectivityStateWatcherList
398 //
399 
AddWatcherLocked(RefCountedPtr<ConnectivityStateWatcherInterface> watcher)400 void Subchannel::ConnectivityStateWatcherList::AddWatcherLocked(
401     RefCountedPtr<ConnectivityStateWatcherInterface> watcher) {
402   watchers_.insert(std::make_pair(watcher.get(), std::move(watcher)));
403 }
404 
RemoveWatcherLocked(ConnectivityStateWatcherInterface * watcher)405 void Subchannel::ConnectivityStateWatcherList::RemoveWatcherLocked(
406     ConnectivityStateWatcherInterface* watcher) {
407   watchers_.erase(watcher);
408 }
409 
NotifyLocked(Subchannel * subchannel,grpc_connectivity_state state,const absl::Status & status)410 void Subchannel::ConnectivityStateWatcherList::NotifyLocked(
411     Subchannel* subchannel, grpc_connectivity_state state,
412     const absl::Status& status) {
413   for (const auto& p : watchers_) {
414     new AsyncWatcherNotifierLocked(p.second, subchannel, state, status);
415   }
416 }
417 
418 //
419 // Subchannel::HealthWatcherMap::HealthWatcher
420 //
421 
422 // State needed for tracking the connectivity state with a particular
423 // health check service name.
424 class Subchannel::HealthWatcherMap::HealthWatcher
425     : public AsyncConnectivityStateWatcherInterface {
426  public:
HealthWatcher(Subchannel * c,std::string health_check_service_name,grpc_connectivity_state subchannel_state)427   HealthWatcher(Subchannel* c, std::string health_check_service_name,
428                 grpc_connectivity_state subchannel_state)
429       : subchannel_(c),
430         health_check_service_name_(std::move(health_check_service_name)),
431         state_(subchannel_state == GRPC_CHANNEL_READY ? GRPC_CHANNEL_CONNECTING
432                                                       : subchannel_state) {
433     GRPC_SUBCHANNEL_WEAK_REF(subchannel_, "health_watcher");
434     // If the subchannel is already connected, start health checking.
435     if (subchannel_state == GRPC_CHANNEL_READY) StartHealthCheckingLocked();
436   }
437 
~HealthWatcher()438   ~HealthWatcher() override {
439     GRPC_SUBCHANNEL_WEAK_UNREF(subchannel_, "health_watcher");
440   }
441 
health_check_service_name() const442   const std::string& health_check_service_name() const {
443     return health_check_service_name_;
444   }
445 
state() const446   grpc_connectivity_state state() const { return state_; }
447 
AddWatcherLocked(grpc_connectivity_state initial_state,RefCountedPtr<Subchannel::ConnectivityStateWatcherInterface> watcher)448   void AddWatcherLocked(
449       grpc_connectivity_state initial_state,
450       RefCountedPtr<Subchannel::ConnectivityStateWatcherInterface> watcher) {
451     if (state_ != initial_state) {
452       new AsyncWatcherNotifierLocked(watcher, subchannel_, state_, status_);
453     }
454     watcher_list_.AddWatcherLocked(std::move(watcher));
455   }
456 
RemoveWatcherLocked(Subchannel::ConnectivityStateWatcherInterface * watcher)457   void RemoveWatcherLocked(
458       Subchannel::ConnectivityStateWatcherInterface* watcher) {
459     watcher_list_.RemoveWatcherLocked(watcher);
460   }
461 
HasWatchers() const462   bool HasWatchers() const { return !watcher_list_.empty(); }
463 
NotifyLocked(grpc_connectivity_state state,const absl::Status & status)464   void NotifyLocked(grpc_connectivity_state state, const absl::Status& status) {
465     if (state == GRPC_CHANNEL_READY) {
466       // If we had not already notified for CONNECTING state, do so now.
467       // (We may have missed this earlier, because if the transition
468       // from IDLE to CONNECTING to READY was too quick, the connected
469       // subchannel may not have sent us a notification for CONNECTING.)
470       if (state_ != GRPC_CHANNEL_CONNECTING) {
471         state_ = GRPC_CHANNEL_CONNECTING;
472         status_ = status;
473         watcher_list_.NotifyLocked(subchannel_, state_, status);
474       }
475       // If we've become connected, start health checking.
476       StartHealthCheckingLocked();
477     } else {
478       state_ = state;
479       status_ = status;
480       watcher_list_.NotifyLocked(subchannel_, state_, status);
481       // We're not connected, so stop health checking.
482       health_check_client_.reset();
483     }
484   }
485 
Orphan()486   void Orphan() override {
487     watcher_list_.Clear();
488     health_check_client_.reset();
489     Unref();
490   }
491 
492  private:
OnConnectivityStateChange(grpc_connectivity_state new_state,const absl::Status & status)493   void OnConnectivityStateChange(grpc_connectivity_state new_state,
494                                  const absl::Status& status) override {
495     MutexLock lock(&subchannel_->mu_);
496     if (new_state != GRPC_CHANNEL_SHUTDOWN && health_check_client_ != nullptr) {
497       state_ = new_state;
498       status_ = status;
499       watcher_list_.NotifyLocked(subchannel_, new_state, status);
500     }
501   }
502 
StartHealthCheckingLocked()503   void StartHealthCheckingLocked() {
504     GPR_ASSERT(health_check_client_ == nullptr);
505     health_check_client_ = MakeOrphanable<HealthCheckClient>(
506         health_check_service_name_, subchannel_->connected_subchannel_,
507         subchannel_->pollset_set_, subchannel_->channelz_node_, Ref());
508   }
509 
510   Subchannel* subchannel_;
511   std::string health_check_service_name_;
512   OrphanablePtr<HealthCheckClient> health_check_client_;
513   grpc_connectivity_state state_;
514   absl::Status status_;
515   ConnectivityStateWatcherList watcher_list_;
516 };
517 
518 //
519 // Subchannel::HealthWatcherMap
520 //
521 
AddWatcherLocked(Subchannel * subchannel,grpc_connectivity_state initial_state,const std::string & health_check_service_name,RefCountedPtr<ConnectivityStateWatcherInterface> watcher)522 void Subchannel::HealthWatcherMap::AddWatcherLocked(
523     Subchannel* subchannel, grpc_connectivity_state initial_state,
524     const std::string& health_check_service_name,
525     RefCountedPtr<ConnectivityStateWatcherInterface> watcher) {
526   // If the health check service name is not already present in the map,
527   // add it.
528   auto it = map_.find(health_check_service_name);
529   HealthWatcher* health_watcher;
530   if (it == map_.end()) {
531     auto w = MakeOrphanable<HealthWatcher>(
532         subchannel, health_check_service_name, subchannel->state_);
533     health_watcher = w.get();
534     map_.emplace(health_check_service_name, std::move(w));
535   } else {
536     health_watcher = it->second.get();
537   }
538   // Add the watcher to the entry.
539   health_watcher->AddWatcherLocked(initial_state, std::move(watcher));
540 }
541 
RemoveWatcherLocked(const std::string & health_check_service_name,ConnectivityStateWatcherInterface * watcher)542 void Subchannel::HealthWatcherMap::RemoveWatcherLocked(
543     const std::string& health_check_service_name,
544     ConnectivityStateWatcherInterface* watcher) {
545   auto it = map_.find(health_check_service_name);
546   GPR_ASSERT(it != map_.end());
547   it->second->RemoveWatcherLocked(watcher);
548   // If we just removed the last watcher for this service name, remove
549   // the map entry.
550   if (!it->second->HasWatchers()) map_.erase(it);
551 }
552 
NotifyLocked(grpc_connectivity_state state,const absl::Status & status)553 void Subchannel::HealthWatcherMap::NotifyLocked(grpc_connectivity_state state,
554                                                 const absl::Status& status) {
555   for (const auto& p : map_) {
556     p.second->NotifyLocked(state, status);
557   }
558 }
559 
560 grpc_connectivity_state
CheckConnectivityStateLocked(Subchannel * subchannel,const std::string & health_check_service_name)561 Subchannel::HealthWatcherMap::CheckConnectivityStateLocked(
562     Subchannel* subchannel, const std::string& health_check_service_name) {
563   auto it = map_.find(health_check_service_name);
564   if (it == map_.end()) {
565     // If the health check service name is not found in the map, we're
566     // not currently doing a health check for that service name.  If the
567     // subchannel's state without health checking is READY, report
568     // CONNECTING, since that's what we'd be in as soon as we do start a
569     // watch.  Otherwise, report the channel's state without health checking.
570     return subchannel->state_ == GRPC_CHANNEL_READY ? GRPC_CHANNEL_CONNECTING
571                                                     : subchannel->state_;
572   }
573   HealthWatcher* health_watcher = it->second.get();
574   return health_watcher->state();
575 }
576 
ShutdownLocked()577 void Subchannel::HealthWatcherMap::ShutdownLocked() { map_.clear(); }
578 
579 //
580 // Subchannel
581 //
582 
583 namespace {
584 
ParseArgsForBackoffValues(const grpc_channel_args * args,grpc_millis * min_connect_timeout_ms)585 BackOff::Options ParseArgsForBackoffValues(
586     const grpc_channel_args* args, grpc_millis* min_connect_timeout_ms) {
587   grpc_millis initial_backoff_ms =
588       GRPC_SUBCHANNEL_INITIAL_CONNECT_BACKOFF_SECONDS * 1000;
589   *min_connect_timeout_ms =
590       GRPC_SUBCHANNEL_RECONNECT_MIN_TIMEOUT_SECONDS * 1000;
591   grpc_millis max_backoff_ms =
592       GRPC_SUBCHANNEL_RECONNECT_MAX_BACKOFF_SECONDS * 1000;
593   bool fixed_reconnect_backoff = false;
594   if (args != nullptr) {
595     for (size_t i = 0; i < args->num_args; i++) {
596       if (0 == strcmp(args->args[i].key,
597                       "grpc.testing.fixed_reconnect_backoff_ms")) {
598         fixed_reconnect_backoff = true;
599         initial_backoff_ms = *min_connect_timeout_ms = max_backoff_ms =
600             grpc_channel_arg_get_integer(
601                 &args->args[i],
602                 {static_cast<int>(initial_backoff_ms), 100, INT_MAX});
603       } else if (0 ==
604                  strcmp(args->args[i].key, GRPC_ARG_MIN_RECONNECT_BACKOFF_MS)) {
605         fixed_reconnect_backoff = false;
606         *min_connect_timeout_ms = grpc_channel_arg_get_integer(
607             &args->args[i],
608             {static_cast<int>(*min_connect_timeout_ms), 100, INT_MAX});
609       } else if (0 ==
610                  strcmp(args->args[i].key, GRPC_ARG_MAX_RECONNECT_BACKOFF_MS)) {
611         fixed_reconnect_backoff = false;
612         max_backoff_ms = grpc_channel_arg_get_integer(
613             &args->args[i], {static_cast<int>(max_backoff_ms), 100, INT_MAX});
614       } else if (0 == strcmp(args->args[i].key,
615                              GRPC_ARG_INITIAL_RECONNECT_BACKOFF_MS)) {
616         fixed_reconnect_backoff = false;
617         initial_backoff_ms = grpc_channel_arg_get_integer(
618             &args->args[i],
619             {static_cast<int>(initial_backoff_ms), 100, INT_MAX});
620       }
621     }
622   }
623   return BackOff::Options()
624       .set_initial_backoff(initial_backoff_ms)
625       .set_multiplier(fixed_reconnect_backoff
626                           ? 1.0
627                           : GRPC_SUBCHANNEL_RECONNECT_BACKOFF_MULTIPLIER)
628       .set_jitter(fixed_reconnect_backoff ? 0.0
629                                           : GRPC_SUBCHANNEL_RECONNECT_JITTER)
630       .set_max_backoff(max_backoff_ms);
631 }
632 
633 }  // namespace
634 
PushConnectivityStateChange(ConnectivityStateChange state_change)635 void Subchannel::ConnectivityStateWatcherInterface::PushConnectivityStateChange(
636     ConnectivityStateChange state_change) {
637   MutexLock lock(&mu_);
638   connectivity_state_queue_.push_back(std::move(state_change));
639 }
640 
641 Subchannel::ConnectivityStateWatcherInterface::ConnectivityStateChange
PopConnectivityStateChange()642 Subchannel::ConnectivityStateWatcherInterface::PopConnectivityStateChange() {
643   MutexLock lock(&mu_);
644   GPR_ASSERT(!connectivity_state_queue_.empty());
645   ConnectivityStateChange state_change = connectivity_state_queue_.front();
646   connectivity_state_queue_.pop_front();
647   return state_change;
648 }
649 
Subchannel(SubchannelKey * key,OrphanablePtr<SubchannelConnector> connector,const grpc_channel_args * args)650 Subchannel::Subchannel(SubchannelKey* key,
651                        OrphanablePtr<SubchannelConnector> connector,
652                        const grpc_channel_args* args)
653     : key_(key),
654       connector_(std::move(connector)),
655       backoff_(ParseArgsForBackoffValues(args, &min_connect_timeout_ms_)) {
656   GRPC_STATS_INC_CLIENT_SUBCHANNELS_CREATED();
657   gpr_atm_no_barrier_store(&ref_pair_, 1 << INTERNAL_REF_BITS);
658   pollset_set_ = grpc_pollset_set_create();
659   grpc_resolved_address* addr =
660       static_cast<grpc_resolved_address*>(gpr_malloc(sizeof(*addr)));
661   GetAddressFromSubchannelAddressArg(args, addr);
662   grpc_resolved_address* new_address = nullptr;
663   grpc_channel_args* new_args = nullptr;
664   if (ProxyMapperRegistry::MapAddress(*addr, args, &new_address, &new_args)) {
665     GPR_ASSERT(new_address != nullptr);
666     gpr_free(addr);
667     addr = new_address;
668   }
669   static const char* keys_to_remove[] = {GRPC_ARG_SUBCHANNEL_ADDRESS};
670   grpc_arg new_arg = CreateSubchannelAddressArg(addr);
671   gpr_free(addr);
672   args_ = grpc_channel_args_copy_and_add_and_remove(
673       new_args != nullptr ? new_args : args, keys_to_remove,
674       GPR_ARRAY_SIZE(keys_to_remove), &new_arg, 1);
675   gpr_free(new_arg.value.string);
676   if (new_args != nullptr) grpc_channel_args_destroy(new_args);
677   GRPC_CLOSURE_INIT(&on_connecting_finished_, OnConnectingFinished, this,
678                     grpc_schedule_on_exec_ctx);
679   const grpc_arg* arg = grpc_channel_args_find(args_, GRPC_ARG_ENABLE_CHANNELZ);
680   const bool channelz_enabled =
681       grpc_channel_arg_get_bool(arg, GRPC_ENABLE_CHANNELZ_DEFAULT);
682   arg = grpc_channel_args_find(
683       args_, GRPC_ARG_MAX_CHANNEL_TRACE_EVENT_MEMORY_PER_NODE);
684   const grpc_integer_options options = {
685       GRPC_MAX_CHANNEL_TRACE_EVENT_MEMORY_PER_NODE_DEFAULT, 0, INT_MAX};
686   size_t channel_tracer_max_memory =
687       static_cast<size_t>(grpc_channel_arg_get_integer(arg, options));
688   if (channelz_enabled) {
689     channelz_node_ = MakeRefCounted<channelz::SubchannelNode>(
690         GetTargetAddress(), channel_tracer_max_memory);
691     channelz_node_->AddTraceEvent(
692         channelz::ChannelTrace::Severity::Info,
693         grpc_slice_from_static_string("subchannel created"));
694   }
695 }
696 
~Subchannel()697 Subchannel::~Subchannel() {
698   if (channelz_node_ != nullptr) {
699     channelz_node_->AddTraceEvent(
700         channelz::ChannelTrace::Severity::Info,
701         grpc_slice_from_static_string("Subchannel destroyed"));
702     channelz_node_->UpdateConnectivityState(GRPC_CHANNEL_SHUTDOWN);
703   }
704   grpc_channel_args_destroy(args_);
705   connector_.reset();
706   grpc_pollset_set_destroy(pollset_set_);
707   delete key_;
708 }
709 
Create(OrphanablePtr<SubchannelConnector> connector,const grpc_channel_args * args)710 Subchannel* Subchannel::Create(OrphanablePtr<SubchannelConnector> connector,
711                                const grpc_channel_args* args) {
712   SubchannelKey* key = new SubchannelKey(args);
713   SubchannelPoolInterface* subchannel_pool =
714       SubchannelPoolInterface::GetSubchannelPoolFromChannelArgs(args);
715   GPR_ASSERT(subchannel_pool != nullptr);
716   Subchannel* c = subchannel_pool->FindSubchannel(key);
717   if (c != nullptr) {
718     delete key;
719     return c;
720   }
721   c = new Subchannel(key, std::move(connector), args);
722   // Try to register the subchannel before setting the subchannel pool.
723   // Otherwise, in case of a registration race, unreffing c in
724   // RegisterSubchannel() will cause c to be tried to be unregistered, while
725   // its key maps to a different subchannel.
726   Subchannel* registered = subchannel_pool->RegisterSubchannel(key, c);
727   if (registered == c) c->subchannel_pool_ = subchannel_pool->Ref();
728   return registered;
729 }
730 
ThrottleKeepaliveTime(int new_keepalive_time)731 void Subchannel::ThrottleKeepaliveTime(int new_keepalive_time) {
732   MutexLock lock(&mu_);
733   // Only update the value if the new keepalive time is larger.
734   if (new_keepalive_time > keepalive_time_) {
735     keepalive_time_ = new_keepalive_time;
736     if (grpc_trace_subchannel.enabled()) {
737       gpr_log(GPR_INFO, "Subchannel=%p: Throttling keepalive time to %d", this,
738               new_keepalive_time);
739     }
740     const grpc_arg arg_to_add = grpc_channel_arg_integer_create(
741         const_cast<char*>(GRPC_ARG_KEEPALIVE_TIME_MS), new_keepalive_time);
742     const char* arg_to_remove = GRPC_ARG_KEEPALIVE_TIME_MS;
743     grpc_channel_args* new_args = grpc_channel_args_copy_and_add_and_remove(
744         args_, &arg_to_remove, 1, &arg_to_add, 1);
745     grpc_channel_args_destroy(args_);
746     args_ = new_args;
747   }
748 }
749 
Ref(GRPC_SUBCHANNEL_REF_EXTRA_ARGS)750 Subchannel* Subchannel::Ref(GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
751   gpr_atm old_refs;
752   old_refs = RefMutate((1 << INTERNAL_REF_BITS),
753                        0 GRPC_SUBCHANNEL_REF_MUTATE_PURPOSE("STRONG_REF"));
754   GPR_ASSERT((old_refs & STRONG_REF_MASK) != 0);
755   return this;
756 }
757 
Unref(GRPC_SUBCHANNEL_REF_EXTRA_ARGS)758 void Subchannel::Unref(GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
759   gpr_atm old_refs;
760   // add a weak ref and subtract a strong ref (atomically)
761   old_refs = RefMutate(
762       static_cast<gpr_atm>(1) - static_cast<gpr_atm>(1 << INTERNAL_REF_BITS),
763       1 GRPC_SUBCHANNEL_REF_MUTATE_PURPOSE("STRONG_UNREF"));
764   if ((old_refs & STRONG_REF_MASK) == (1 << INTERNAL_REF_BITS)) {
765     Disconnect();
766   }
767   GRPC_SUBCHANNEL_WEAK_UNREF(this, "strong-unref");
768 }
769 
WeakRef(GRPC_SUBCHANNEL_REF_EXTRA_ARGS)770 Subchannel* Subchannel::WeakRef(GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
771   gpr_atm old_refs;
772   old_refs = RefMutate(1, 0 GRPC_SUBCHANNEL_REF_MUTATE_PURPOSE("WEAK_REF"));
773   GPR_ASSERT(old_refs != 0);
774   return this;
775 }
776 
777 namespace {
778 
subchannel_destroy(void * arg,grpc_error *)779 void subchannel_destroy(void* arg, grpc_error* /*error*/) {
780   Subchannel* self = static_cast<Subchannel*>(arg);
781   delete self;
782 }
783 
784 }  // namespace
785 
WeakUnref(GRPC_SUBCHANNEL_REF_EXTRA_ARGS)786 void Subchannel::WeakUnref(GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
787   gpr_atm old_refs;
788   old_refs = RefMutate(-static_cast<gpr_atm>(1),
789                        1 GRPC_SUBCHANNEL_REF_MUTATE_PURPOSE("WEAK_UNREF"));
790   if (old_refs == 1) {
791     ExecCtx::Run(DEBUG_LOCATION,
792                  GRPC_CLOSURE_CREATE(subchannel_destroy, this,
793                                      grpc_schedule_on_exec_ctx),
794                  GRPC_ERROR_NONE);
795   }
796 }
797 
RefFromWeakRef()798 Subchannel* Subchannel::RefFromWeakRef() {
799   for (;;) {
800     gpr_atm old_refs = gpr_atm_acq_load(&ref_pair_);
801     if (old_refs >= (1 << INTERNAL_REF_BITS)) {
802       gpr_atm new_refs = old_refs + (1 << INTERNAL_REF_BITS);
803       if (gpr_atm_rel_cas(&ref_pair_, old_refs, new_refs)) {
804         return this;
805       }
806     } else {
807       return nullptr;
808     }
809   }
810 }
811 
GetTargetAddress()812 const char* Subchannel::GetTargetAddress() {
813   const grpc_arg* addr_arg =
814       grpc_channel_args_find(args_, GRPC_ARG_SUBCHANNEL_ADDRESS);
815   const char* addr_str = grpc_channel_arg_get_string(addr_arg);
816   GPR_ASSERT(addr_str != nullptr);  // Should have been set by LB policy.
817   return addr_str;
818 }
819 
channelz_node()820 channelz::SubchannelNode* Subchannel::channelz_node() {
821   return channelz_node_.get();
822 }
823 
CheckConnectivityState(const absl::optional<std::string> & health_check_service_name,RefCountedPtr<ConnectedSubchannel> * connected_subchannel)824 grpc_connectivity_state Subchannel::CheckConnectivityState(
825     const absl::optional<std::string>& health_check_service_name,
826     RefCountedPtr<ConnectedSubchannel>* connected_subchannel) {
827   MutexLock lock(&mu_);
828   grpc_connectivity_state state;
829   if (!health_check_service_name.has_value()) {
830     state = state_;
831   } else {
832     state = health_watcher_map_.CheckConnectivityStateLocked(
833         this, *health_check_service_name);
834   }
835   if (connected_subchannel != nullptr && state == GRPC_CHANNEL_READY) {
836     *connected_subchannel = connected_subchannel_;
837   }
838   return state;
839 }
840 
WatchConnectivityState(grpc_connectivity_state initial_state,const absl::optional<std::string> & health_check_service_name,RefCountedPtr<ConnectivityStateWatcherInterface> watcher)841 void Subchannel::WatchConnectivityState(
842     grpc_connectivity_state initial_state,
843     const absl::optional<std::string>& health_check_service_name,
844     RefCountedPtr<ConnectivityStateWatcherInterface> watcher) {
845   MutexLock lock(&mu_);
846   grpc_pollset_set* interested_parties = watcher->interested_parties();
847   if (interested_parties != nullptr) {
848     grpc_pollset_set_add_pollset_set(pollset_set_, interested_parties);
849   }
850   if (!health_check_service_name.has_value()) {
851     if (state_ != initial_state) {
852       new AsyncWatcherNotifierLocked(watcher, this, state_, status_);
853     }
854     watcher_list_.AddWatcherLocked(std::move(watcher));
855   } else {
856     health_watcher_map_.AddWatcherLocked(
857         this, initial_state, *health_check_service_name, std::move(watcher));
858   }
859 }
860 
CancelConnectivityStateWatch(const absl::optional<std::string> & health_check_service_name,ConnectivityStateWatcherInterface * watcher)861 void Subchannel::CancelConnectivityStateWatch(
862     const absl::optional<std::string>& health_check_service_name,
863     ConnectivityStateWatcherInterface* watcher) {
864   MutexLock lock(&mu_);
865   grpc_pollset_set* interested_parties = watcher->interested_parties();
866   if (interested_parties != nullptr) {
867     grpc_pollset_set_del_pollset_set(pollset_set_, interested_parties);
868   }
869   if (!health_check_service_name.has_value()) {
870     watcher_list_.RemoveWatcherLocked(watcher);
871   } else {
872     health_watcher_map_.RemoveWatcherLocked(*health_check_service_name,
873                                             watcher);
874   }
875 }
876 
AttemptToConnect()877 void Subchannel::AttemptToConnect() {
878   MutexLock lock(&mu_);
879   MaybeStartConnectingLocked();
880 }
881 
ResetBackoff()882 void Subchannel::ResetBackoff() {
883   MutexLock lock(&mu_);
884   backoff_.Reset();
885   if (have_retry_alarm_) {
886     retry_immediately_ = true;
887     grpc_timer_cancel(&retry_alarm_);
888   } else {
889     backoff_begun_ = false;
890     MaybeStartConnectingLocked();
891   }
892 }
893 
CreateSubchannelAddressArg(const grpc_resolved_address * addr)894 grpc_arg Subchannel::CreateSubchannelAddressArg(
895     const grpc_resolved_address* addr) {
896   return grpc_channel_arg_string_create(
897       const_cast<char*>(GRPC_ARG_SUBCHANNEL_ADDRESS),
898       gpr_strdup(addr->len > 0 ? grpc_sockaddr_to_uri(addr).c_str() : ""));
899 }
900 
GetUriFromSubchannelAddressArg(const grpc_channel_args * args)901 const char* Subchannel::GetUriFromSubchannelAddressArg(
902     const grpc_channel_args* args) {
903   const grpc_arg* addr_arg =
904       grpc_channel_args_find(args, GRPC_ARG_SUBCHANNEL_ADDRESS);
905   const char* addr_str = grpc_channel_arg_get_string(addr_arg);
906   GPR_ASSERT(addr_str != nullptr);  // Should have been set by LB policy.
907   return addr_str;
908 }
909 
910 namespace {
911 
UriToSockaddr(const char * uri_str,grpc_resolved_address * addr)912 void UriToSockaddr(const char* uri_str, grpc_resolved_address* addr) {
913   absl::StatusOr<URI> uri = URI::Parse(uri_str);
914   if (!uri.ok()) {
915     gpr_log(GPR_ERROR, "%s", uri.status().ToString().c_str());
916     GPR_ASSERT(uri.ok());
917   }
918   if (!grpc_parse_uri(*uri, addr)) memset(addr, 0, sizeof(*addr));
919 }
920 
921 }  // namespace
922 
GetAddressFromSubchannelAddressArg(const grpc_channel_args * args,grpc_resolved_address * addr)923 void Subchannel::GetAddressFromSubchannelAddressArg(
924     const grpc_channel_args* args, grpc_resolved_address* addr) {
925   const char* addr_uri_str = GetUriFromSubchannelAddressArg(args);
926   memset(addr, 0, sizeof(*addr));
927   if (*addr_uri_str != '\0') {
928     UriToSockaddr(addr_uri_str, addr);
929   }
930 }
931 
932 namespace {
933 
934 // Returns a string indicating the subchannel's connectivity state change to
935 // \a state.
SubchannelConnectivityStateChangeString(grpc_connectivity_state state)936 const char* SubchannelConnectivityStateChangeString(
937     grpc_connectivity_state state) {
938   switch (state) {
939     case GRPC_CHANNEL_IDLE:
940       return "Subchannel state change to IDLE";
941     case GRPC_CHANNEL_CONNECTING:
942       return "Subchannel state change to CONNECTING";
943     case GRPC_CHANNEL_READY:
944       return "Subchannel state change to READY";
945     case GRPC_CHANNEL_TRANSIENT_FAILURE:
946       return "Subchannel state change to TRANSIENT_FAILURE";
947     case GRPC_CHANNEL_SHUTDOWN:
948       return "Subchannel state change to SHUTDOWN";
949   }
950   GPR_UNREACHABLE_CODE(return "UNKNOWN");
951 }
952 
953 }  // namespace
954 
955 // Note: Must be called with a state that is different from the current state.
SetConnectivityStateLocked(grpc_connectivity_state state,const absl::Status & status)956 void Subchannel::SetConnectivityStateLocked(grpc_connectivity_state state,
957                                             const absl::Status& status) {
958   state_ = state;
959   status_ = status;
960   if (channelz_node_ != nullptr) {
961     channelz_node_->UpdateConnectivityState(state);
962     channelz_node_->AddTraceEvent(
963         channelz::ChannelTrace::Severity::Info,
964         grpc_slice_from_static_string(
965             SubchannelConnectivityStateChangeString(state)));
966   }
967   // Notify non-health watchers.
968   watcher_list_.NotifyLocked(this, state, status);
969   // Notify health watchers.
970   health_watcher_map_.NotifyLocked(state, status);
971 }
972 
MaybeStartConnectingLocked()973 void Subchannel::MaybeStartConnectingLocked() {
974   if (disconnected_) {
975     // Don't try to connect if we're already disconnected.
976     return;
977   }
978   if (connecting_) {
979     // Already connecting: don't restart.
980     return;
981   }
982   if (connected_subchannel_ != nullptr) {
983     // Already connected: don't restart.
984     return;
985   }
986   connecting_ = true;
987   GRPC_SUBCHANNEL_WEAK_REF(this, "connecting");
988   if (!backoff_begun_) {
989     backoff_begun_ = true;
990     ContinueConnectingLocked();
991   } else {
992     GPR_ASSERT(!have_retry_alarm_);
993     have_retry_alarm_ = true;
994     const grpc_millis time_til_next =
995         next_attempt_deadline_ - ExecCtx::Get()->Now();
996     if (time_til_next <= 0) {
997       gpr_log(GPR_INFO, "Subchannel %p: Retry immediately", this);
998     } else {
999       gpr_log(GPR_INFO, "Subchannel %p: Retry in %" PRId64 " milliseconds",
1000               this, time_til_next);
1001     }
1002     GRPC_CLOSURE_INIT(&on_retry_alarm_, OnRetryAlarm, this,
1003                       grpc_schedule_on_exec_ctx);
1004     grpc_timer_init(&retry_alarm_, next_attempt_deadline_, &on_retry_alarm_);
1005   }
1006 }
1007 
OnRetryAlarm(void * arg,grpc_error * error)1008 void Subchannel::OnRetryAlarm(void* arg, grpc_error* error) {
1009   Subchannel* c = static_cast<Subchannel*>(arg);
1010   // TODO(soheilhy): Once subchannel refcounting is simplified, we can get use
1011   //                 MutexLock instead of ReleasableMutexLock, here.
1012   ReleasableMutexLock lock(&c->mu_);
1013   c->have_retry_alarm_ = false;
1014   if (c->disconnected_) {
1015     error = GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING("Disconnected",
1016                                                              &error, 1);
1017   } else if (c->retry_immediately_) {
1018     c->retry_immediately_ = false;
1019     error = GRPC_ERROR_NONE;
1020   } else {
1021     GRPC_ERROR_REF(error);
1022   }
1023   if (error == GRPC_ERROR_NONE) {
1024     gpr_log(GPR_INFO, "Failed to connect to channel, retrying");
1025     c->ContinueConnectingLocked();
1026     lock.Unlock();
1027   } else {
1028     lock.Unlock();
1029     GRPC_SUBCHANNEL_WEAK_UNREF(c, "connecting");
1030   }
1031   GRPC_ERROR_UNREF(error);
1032 }
1033 
ContinueConnectingLocked()1034 void Subchannel::ContinueConnectingLocked() {
1035   SubchannelConnector::Args args;
1036   args.interested_parties = pollset_set_;
1037   const grpc_millis min_deadline =
1038       min_connect_timeout_ms_ + ExecCtx::Get()->Now();
1039   next_attempt_deadline_ = backoff_.NextAttemptTime();
1040   args.deadline = std::max(next_attempt_deadline_, min_deadline);
1041   args.channel_args = args_;
1042   SetConnectivityStateLocked(GRPC_CHANNEL_CONNECTING, absl::Status());
1043   connector_->Connect(args, &connecting_result_, &on_connecting_finished_);
1044 }
1045 
OnConnectingFinished(void * arg,grpc_error * error)1046 void Subchannel::OnConnectingFinished(void* arg, grpc_error* error) {
1047   auto* c = static_cast<Subchannel*>(arg);
1048   const grpc_channel_args* delete_channel_args =
1049       c->connecting_result_.channel_args;
1050   GRPC_SUBCHANNEL_WEAK_REF(c, "on_connecting_finished");
1051   {
1052     MutexLock lock(&c->mu_);
1053     c->connecting_ = false;
1054     if (c->connecting_result_.transport != nullptr &&
1055         c->PublishTransportLocked()) {
1056       // Do nothing, transport was published.
1057     } else if (c->disconnected_) {
1058       GRPC_SUBCHANNEL_WEAK_UNREF(c, "connecting");
1059     } else {
1060       gpr_log(GPR_INFO, "Connect failed: %s", grpc_error_string(error));
1061       c->SetConnectivityStateLocked(GRPC_CHANNEL_TRANSIENT_FAILURE,
1062                                     grpc_error_to_absl_status(error));
1063       GRPC_SUBCHANNEL_WEAK_UNREF(c, "connecting");
1064     }
1065   }
1066   GRPC_SUBCHANNEL_WEAK_UNREF(c, "on_connecting_finished");
1067   grpc_channel_args_destroy(delete_channel_args);
1068 }
1069 
1070 namespace {
1071 
ConnectionDestroy(void * arg,grpc_error *)1072 void ConnectionDestroy(void* arg, grpc_error* /*error*/) {
1073   grpc_channel_stack* stk = static_cast<grpc_channel_stack*>(arg);
1074   grpc_channel_stack_destroy(stk);
1075   gpr_free(stk);
1076 }
1077 
1078 }  // namespace
1079 
PublishTransportLocked()1080 bool Subchannel::PublishTransportLocked() {
1081   // Construct channel stack.
1082   grpc_channel_stack_builder* builder = grpc_channel_stack_builder_create();
1083   grpc_channel_stack_builder_set_channel_arguments(
1084       builder, connecting_result_.channel_args);
1085   grpc_channel_stack_builder_set_transport(builder,
1086                                            connecting_result_.transport);
1087   if (!grpc_channel_init_create_stack(builder, GRPC_CLIENT_SUBCHANNEL)) {
1088     grpc_channel_stack_builder_destroy(builder);
1089     return false;
1090   }
1091   grpc_channel_stack* stk;
1092   grpc_error* error = grpc_channel_stack_builder_finish(
1093       builder, 0, 1, ConnectionDestroy, nullptr,
1094       reinterpret_cast<void**>(&stk));
1095   if (error != GRPC_ERROR_NONE) {
1096     grpc_transport_destroy(connecting_result_.transport);
1097     gpr_log(GPR_ERROR, "error initializing subchannel stack: %s",
1098             grpc_error_string(error));
1099     GRPC_ERROR_UNREF(error);
1100     return false;
1101   }
1102   RefCountedPtr<channelz::SocketNode> socket =
1103       std::move(connecting_result_.socket_node);
1104   connecting_result_.Reset();
1105   if (disconnected_) {
1106     grpc_channel_stack_destroy(stk);
1107     gpr_free(stk);
1108     return false;
1109   }
1110   // Publish.
1111   connected_subchannel_.reset(
1112       new ConnectedSubchannel(stk, args_, channelz_node_));
1113   gpr_log(GPR_INFO, "New connected subchannel at %p for subchannel %p",
1114           connected_subchannel_.get(), this);
1115   if (channelz_node_ != nullptr) {
1116     channelz_node_->SetChildSocket(std::move(socket));
1117   }
1118   // Start watching connected subchannel.
1119   connected_subchannel_->StartWatch(
1120       pollset_set_, MakeOrphanable<ConnectedSubchannelStateWatcher>(this));
1121   // Report initial state.
1122   SetConnectivityStateLocked(GRPC_CHANNEL_READY, absl::Status());
1123   return true;
1124 }
1125 
Disconnect()1126 void Subchannel::Disconnect() {
1127   // The subchannel_pool is only used once here in this subchannel, so the
1128   // access can be outside of the lock.
1129   if (subchannel_pool_ != nullptr) {
1130     subchannel_pool_->UnregisterSubchannel(key_);
1131     subchannel_pool_.reset();
1132   }
1133   MutexLock lock(&mu_);
1134   GPR_ASSERT(!disconnected_);
1135   disconnected_ = true;
1136   connector_.reset();
1137   connected_subchannel_.reset();
1138   health_watcher_map_.ShutdownLocked();
1139 }
1140 
RefMutate(gpr_atm delta,int barrier GRPC_SUBCHANNEL_REF_MUTATE_EXTRA_ARGS)1141 gpr_atm Subchannel::RefMutate(
1142     gpr_atm delta, int barrier GRPC_SUBCHANNEL_REF_MUTATE_EXTRA_ARGS) {
1143   gpr_atm old_val = barrier ? gpr_atm_full_fetch_add(&ref_pair_, delta)
1144                             : gpr_atm_no_barrier_fetch_add(&ref_pair_, delta);
1145 #ifndef NDEBUG
1146   if (grpc_trace_subchannel_refcount.enabled()) {
1147     gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG,
1148             "SUBCHANNEL: %p %12s 0x%" PRIxPTR " -> 0x%" PRIxPTR " [%s]", this,
1149             purpose, old_val, old_val + delta, reason);
1150   }
1151 #endif
1152   return old_val;
1153 }
1154 
1155 }  // namespace grpc_core
1156