1 //
2 // Copyright 2018 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 //     http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16 
17 #include <grpc/support/port_platform.h>
18 
19 #include <cstring>
20 
21 #include "src/core/ext/filters/client_channel/lb_policy/child_policy_handler.h"
22 
23 #include "absl/strings/str_cat.h"
24 
25 #include "src/core/ext/filters/client_channel/lb_policy_registry.h"
26 
27 namespace grpc_core {
28 
29 //
30 // ChildPolicyHandler::Helper
31 //
32 
33 class ChildPolicyHandler::Helper
34     : public LoadBalancingPolicy::ChannelControlHelper {
35  public:
Helper(RefCountedPtr<ChildPolicyHandler> parent)36   explicit Helper(RefCountedPtr<ChildPolicyHandler> parent)
37       : parent_(std::move(parent)) {}
38 
~Helper()39   ~Helper() override { parent_.reset(DEBUG_LOCATION, "Helper"); }
40 
CreateSubchannel(ServerAddress address,const grpc_channel_args & args)41   RefCountedPtr<SubchannelInterface> CreateSubchannel(
42       ServerAddress address, const grpc_channel_args& args) override {
43     if (parent_->shutting_down_) return nullptr;
44     if (!CalledByCurrentChild() && !CalledByPendingChild()) return nullptr;
45     return parent_->channel_control_helper()->CreateSubchannel(
46         std::move(address), args);
47   }
48 
UpdateState(grpc_connectivity_state state,const absl::Status & status,std::unique_ptr<SubchannelPicker> picker)49   void UpdateState(grpc_connectivity_state state, const absl::Status& status,
50                    std::unique_ptr<SubchannelPicker> picker) override {
51     if (parent_->shutting_down_) return;
52     // If this request is from the pending child policy, ignore it until
53     // it reports something other than CONNECTING, at which point we swap it
54     // into place.
55     if (CalledByPendingChild()) {
56       if (GRPC_TRACE_FLAG_ENABLED(*(parent_->tracer_))) {
57         gpr_log(GPR_INFO,
58                 "[child_policy_handler %p] helper %p: pending child policy %p "
59                 "reports state=%s (%s)",
60                 parent_.get(), this, child_, ConnectivityStateName(state),
61                 status.ToString().c_str());
62       }
63       if (state == GRPC_CHANNEL_CONNECTING) return;
64       grpc_pollset_set_del_pollset_set(
65           parent_->child_policy_->interested_parties(),
66           parent_->interested_parties());
67       parent_->child_policy_ = std::move(parent_->pending_child_policy_);
68     } else if (!CalledByCurrentChild()) {
69       // This request is from an outdated child, so ignore it.
70       return;
71     }
72     parent_->channel_control_helper()->UpdateState(state, status,
73                                                    std::move(picker));
74   }
75 
RequestReresolution()76   void RequestReresolution() override {
77     if (parent_->shutting_down_) return;
78     // Only forward re-resolution requests from the most recent child,
79     // since that's the one that will be receiving any update we receive
80     // from the resolver.
81     const LoadBalancingPolicy* latest_child_policy =
82         parent_->pending_child_policy_ != nullptr
83             ? parent_->pending_child_policy_.get()
84             : parent_->child_policy_.get();
85     if (child_ != latest_child_policy) return;
86     if (GRPC_TRACE_FLAG_ENABLED(*(parent_->tracer_))) {
87       gpr_log(GPR_INFO, "[child_policy_handler %p] started name re-resolving",
88               parent_.get());
89     }
90     parent_->channel_control_helper()->RequestReresolution();
91   }
92 
AddTraceEvent(TraceSeverity severity,absl::string_view message)93   void AddTraceEvent(TraceSeverity severity,
94                      absl::string_view message) override {
95     if (parent_->shutting_down_) return;
96     if (!CalledByPendingChild() && !CalledByCurrentChild()) return;
97     parent_->channel_control_helper()->AddTraceEvent(severity, message);
98   }
99 
set_child(LoadBalancingPolicy * child)100   void set_child(LoadBalancingPolicy* child) { child_ = child; }
101 
102  private:
CalledByPendingChild() const103   bool CalledByPendingChild() const {
104     GPR_ASSERT(child_ != nullptr);
105     return child_ == parent_->pending_child_policy_.get();
106   }
107 
CalledByCurrentChild() const108   bool CalledByCurrentChild() const {
109     GPR_ASSERT(child_ != nullptr);
110     return child_ == parent_->child_policy_.get();
111   };
112 
113   RefCountedPtr<ChildPolicyHandler> parent_;
114   LoadBalancingPolicy* child_ = nullptr;
115 };
116 
117 //
118 // ChildPolicyHandler
119 //
120 
ShutdownLocked()121 void ChildPolicyHandler::ShutdownLocked() {
122   if (GRPC_TRACE_FLAG_ENABLED(*tracer_)) {
123     gpr_log(GPR_INFO, "[child_policy_handler %p] shutting down", this);
124   }
125   shutting_down_ = true;
126   if (child_policy_ != nullptr) {
127     if (GRPC_TRACE_FLAG_ENABLED(*tracer_)) {
128       gpr_log(GPR_INFO, "[child_policy_handler %p] shutting down lb_policy %p",
129               this, child_policy_.get());
130     }
131     grpc_pollset_set_del_pollset_set(child_policy_->interested_parties(),
132                                      interested_parties());
133     child_policy_.reset();
134   }
135   if (pending_child_policy_ != nullptr) {
136     if (GRPC_TRACE_FLAG_ENABLED(*tracer_)) {
137       gpr_log(GPR_INFO,
138               "[child_policy_handler %p] shutting down pending lb_policy %p",
139               this, pending_child_policy_.get());
140     }
141     grpc_pollset_set_del_pollset_set(
142         pending_child_policy_->interested_parties(), interested_parties());
143     pending_child_policy_.reset();
144   }
145 }
146 
UpdateLocked(UpdateArgs args)147 void ChildPolicyHandler::UpdateLocked(UpdateArgs args) {
148   // If the child policy name changes, we need to create a new child
149   // policy.  When this happens, we leave child_policy_ as-is and store
150   // the new child policy in pending_child_policy_.  Once the new child
151   // policy transitions into state READY, we swap it into child_policy_,
152   // replacing the original child policy.  So pending_child_policy_ is
153   // non-null only between when we apply an update that changes the child
154   // policy name and when the new child reports state READY.
155   //
156   // Updates can arrive at any point during this transition.  We always
157   // apply updates relative to the most recently created child policy,
158   // even if the most recent one is still in pending_child_policy_.  This
159   // is true both when applying the updates to an existing child policy
160   // and when determining whether we need to create a new policy.
161   //
162   // As a result of this, there are several cases to consider here:
163   //
164   // 1. We have no existing child policy (i.e., this is the first update
165   //    we receive after being created; in this case, both child_policy_
166   //    and pending_child_policy_ are null).  In this case, we create a
167   //    new child policy and store it in child_policy_.
168   //
169   // 2. We have an existing child policy and have no pending child policy
170   //    from a previous update (i.e., either there has not been a
171   //    previous update that changed the policy name, or we have already
172   //    finished swapping in the new policy; in this case, child_policy_
173   //    is non-null but pending_child_policy_ is null).  In this case:
174   //    a. If going from the current config to the new config does not
175   //       require a new policy, then we update the existing child policy.
176   //    b. If going from the current config to the new config does require a
177   //       new policy, we create a new policy.  The policy will be stored in
178   //       pending_child_policy_ and will later be swapped into
179   //       child_policy_ by the helper when the new child transitions
180   //       into state READY.
181   //
182   // 3. We have an existing child policy and have a pending child policy
183   //    from a previous update (i.e., a previous update set
184   //    pending_child_policy_ as per case 2b above and that policy has
185   //    not yet transitioned into state READY and been swapped into
186   //    child_policy_; in this case, both child_policy_ and
187   //    pending_child_policy_ are non-null).  In this case:
188   //    a. If going from the current config to the new config does not
189   //       require a new policy, then we update the existing pending
190   //       child policy.
191   //    b. If going from the current config to the new config does require a
192   //       new child policy, then we create a new policy.  The new
193   //       policy is stored in pending_child_policy_ (replacing the one
194   //       that was there before, which will be immediately shut down)
195   //       and will later be swapped into child_policy_ by the helper
196   //       when the new child transitions into state READY.
197   const bool create_policy =
198       // case 1
199       child_policy_ == nullptr ||
200       // cases 2b and 3b
201       ConfigChangeRequiresNewPolicyInstance(current_config_.get(),
202                                             args.config.get());
203   current_config_ = args.config;
204   LoadBalancingPolicy* policy_to_update = nullptr;
205   if (create_policy) {
206     // Cases 1, 2b, and 3b: create a new child policy.
207     // If child_policy_ is null, we set it (case 1), else we set
208     // pending_child_policy_ (cases 2b and 3b).
209     // TODO(roth): In cases 2b and 3b, we should start a timer here, so
210     // that there's an upper bound on the amount of time it takes us to
211     // switch to the new policy, even if the new policy stays in
212     // CONNECTING for a very long period of time.
213     if (GRPC_TRACE_FLAG_ENABLED(*tracer_)) {
214       gpr_log(GPR_INFO,
215               "[child_policy_handler %p] creating new %schild policy %s", this,
216               child_policy_ == nullptr ? "" : "pending ", args.config->name());
217     }
218     auto& lb_policy =
219         child_policy_ == nullptr ? child_policy_ : pending_child_policy_;
220     lb_policy = CreateChildPolicy(args.config->name(), *args.args);
221     policy_to_update = lb_policy.get();
222   } else {
223     // Cases 2a and 3a: update an existing policy.
224     // If we have a pending child policy, send the update to the pending
225     // policy (case 3a), else send it to the current policy (case 2a).
226     policy_to_update = pending_child_policy_ != nullptr
227                            ? pending_child_policy_.get()
228                            : child_policy_.get();
229   }
230   GPR_ASSERT(policy_to_update != nullptr);
231   // Update the policy.
232   if (GRPC_TRACE_FLAG_ENABLED(*tracer_)) {
233     gpr_log(GPR_INFO, "[child_policy_handler %p] updating %schild policy %p",
234             this,
235             policy_to_update == pending_child_policy_.get() ? "pending " : "",
236             policy_to_update);
237   }
238   policy_to_update->UpdateLocked(std::move(args));
239 }
240 
ExitIdleLocked()241 void ChildPolicyHandler::ExitIdleLocked() {
242   if (child_policy_ != nullptr) {
243     child_policy_->ExitIdleLocked();
244     if (pending_child_policy_ != nullptr) {
245       pending_child_policy_->ExitIdleLocked();
246     }
247   }
248 }
249 
ResetBackoffLocked()250 void ChildPolicyHandler::ResetBackoffLocked() {
251   if (child_policy_ != nullptr) {
252     child_policy_->ResetBackoffLocked();
253     if (pending_child_policy_ != nullptr) {
254       pending_child_policy_->ResetBackoffLocked();
255     }
256   }
257 }
258 
CreateChildPolicy(const char * child_policy_name,const grpc_channel_args & args)259 OrphanablePtr<LoadBalancingPolicy> ChildPolicyHandler::CreateChildPolicy(
260     const char* child_policy_name, const grpc_channel_args& args) {
261   Helper* helper = new Helper(Ref(DEBUG_LOCATION, "Helper"));
262   LoadBalancingPolicy::Args lb_policy_args;
263   lb_policy_args.work_serializer = work_serializer();
264   lb_policy_args.channel_control_helper =
265       std::unique_ptr<ChannelControlHelper>(helper);
266   lb_policy_args.args = &args;
267   OrphanablePtr<LoadBalancingPolicy> lb_policy =
268       CreateLoadBalancingPolicy(child_policy_name, std::move(lb_policy_args));
269   if (GPR_UNLIKELY(lb_policy == nullptr)) {
270     gpr_log(GPR_ERROR, "could not create LB policy \"%s\"", child_policy_name);
271     return nullptr;
272   }
273   helper->set_child(lb_policy.get());
274   if (GRPC_TRACE_FLAG_ENABLED(*tracer_)) {
275     gpr_log(GPR_INFO,
276             "[child_policy_handler %p] created new LB policy \"%s\" (%p)", this,
277             child_policy_name, lb_policy.get());
278   }
279   channel_control_helper()->AddTraceEvent(
280       ChannelControlHelper::TRACE_INFO,
281       absl::StrCat("Created new LB policy \"", child_policy_name, "\""));
282   grpc_pollset_set_add_pollset_set(lb_policy->interested_parties(),
283                                    interested_parties());
284   return lb_policy;
285 }
286 
ConfigChangeRequiresNewPolicyInstance(LoadBalancingPolicy::Config * old_config,LoadBalancingPolicy::Config * new_config) const287 bool ChildPolicyHandler::ConfigChangeRequiresNewPolicyInstance(
288     LoadBalancingPolicy::Config* old_config,
289     LoadBalancingPolicy::Config* new_config) const {
290   return strcmp(old_config->name(), new_config->name()) != 0;
291 }
292 
293 OrphanablePtr<LoadBalancingPolicy>
CreateLoadBalancingPolicy(const char * name,LoadBalancingPolicy::Args args) const294 ChildPolicyHandler::CreateLoadBalancingPolicy(
295     const char* name, LoadBalancingPolicy::Args args) const {
296   return LoadBalancingPolicyRegistry::CreateLoadBalancingPolicy(
297       name, std::move(args));
298 }
299 
300 }  // namespace grpc_core
301