1 /*
2  *
3  * Copyright 2016 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 /// Implementation of the gRPC LB policy.
20 ///
21 /// This policy takes as input a list of resolved addresses, which must
22 /// include at least one balancer address.
23 ///
/// An internal channel (\a lb_channel_) is created for the addresses
/// that are balancers.  This channel behaves just like a regular
26 /// channel that uses pick_first to select from the list of balancer
27 /// addresses.
28 ///
29 /// The first time the policy gets a request for a pick, a ping, or to exit
30 /// the idle state, \a StartPickingLocked() is called. This method is
31 /// responsible for instantiating the internal *streaming* call to the LB
32 /// server (whichever address pick_first chose).  The call will be complete
33 /// when either the balancer sends status or when we cancel the call (e.g.,
/// because we are shutting down).  If needed, we retry the call.  If we
35 /// received at least one valid message from the server, a new call attempt
36 /// will be made immediately; otherwise, we apply back-off delays between
37 /// attempts.
38 ///
39 /// We maintain an internal round_robin policy instance for distributing
40 /// requests across backends.  Whenever we receive a new serverlist from
41 /// the balancer, we update the round_robin policy with the new list of
42 /// addresses.  If we cannot communicate with the balancer on startup,
43 /// however, we may enter fallback mode, in which case we will populate
44 /// the RR policy's addresses from the backend addresses returned by the
45 /// resolver.
46 ///
47 /// Once an RR policy instance is in place (and getting updated as described),
48 /// calls for a pick, a ping, or a cancellation will be serviced right
49 /// away by forwarding them to the RR instance.  Any time there's no RR
50 /// policy available (i.e., right after the creation of the gRPCLB policy),
51 /// pick and ping requests are added to a list of pending picks and pings
52 /// to be flushed and serviced when the RR policy instance becomes available.
53 ///
54 /// \see https://github.com/grpc/grpc/blob/master/doc/load-balancing.md for the
55 /// high level design and details.
56 
57 // With the addition of a libuv endpoint, sockaddr.h now includes uv.h when
58 // using that endpoint. Because of various transitive includes in uv.h,
59 // including windows.h on Windows, uv.h must be included before other system
// headers. Therefore, sockaddr.h must be included before any other system
// header (port_platform.h is the only header included ahead of it).
61 #include <grpc/support/port_platform.h>
62 
63 #include "src/core/lib/iomgr/sockaddr.h"
64 #include "src/core/lib/iomgr/socket_utils.h"
65 
#include <inttypes.h>
#include <limits.h>
#include <string.h>

#include <functional>
69 
70 #include <grpc/byte_buffer_reader.h>
71 #include <grpc/grpc.h>
72 #include <grpc/support/alloc.h>
73 #include <grpc/support/string_util.h>
74 #include <grpc/support/time.h>
75 
76 #include "src/core/ext/filters/client_channel/client_channel.h"
77 #include "src/core/ext/filters/client_channel/client_channel_factory.h"
78 #include "src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.h"
79 #include "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.h"
80 #include "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.h"
81 #include "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.h"
82 #include "src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h"
83 #include "src/core/ext/filters/client_channel/lb_policy_factory.h"
84 #include "src/core/ext/filters/client_channel/lb_policy_registry.h"
85 #include "src/core/ext/filters/client_channel/parse_address.h"
86 #include "src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h"
87 #include "src/core/ext/filters/client_channel/subchannel_index.h"
88 #include "src/core/lib/backoff/backoff.h"
89 #include "src/core/lib/channel/channel_args.h"
90 #include "src/core/lib/channel/channel_stack.h"
91 #include "src/core/lib/gpr/host_port.h"
92 #include "src/core/lib/gpr/string.h"
93 #include "src/core/lib/gprpp/manual_constructor.h"
94 #include "src/core/lib/gprpp/memory.h"
95 #include "src/core/lib/gprpp/mutex_lock.h"
96 #include "src/core/lib/gprpp/orphanable.h"
97 #include "src/core/lib/gprpp/ref_counted_ptr.h"
98 #include "src/core/lib/iomgr/combiner.h"
99 #include "src/core/lib/iomgr/sockaddr.h"
100 #include "src/core/lib/iomgr/sockaddr_utils.h"
101 #include "src/core/lib/iomgr/timer.h"
102 #include "src/core/lib/slice/slice_hash_table.h"
103 #include "src/core/lib/slice/slice_internal.h"
104 #include "src/core/lib/slice/slice_string_helpers.h"
105 #include "src/core/lib/surface/call.h"
106 #include "src/core/lib/surface/channel.h"
107 #include "src/core/lib/surface/channel_init.h"
108 #include "src/core/lib/transport/static_metadata.h"
109 
// Back-off parameters for retrying the balancer call (see the file-level
// comment on back-off delays between attempts). Presumably consumed when
// configuring lb_call_backoff_; that code is outside this view.
#define GRPC_GRPCLB_INITIAL_CONNECT_BACKOFF_SECONDS 1
#define GRPC_GRPCLB_RECONNECT_BACKOFF_MULTIPLIER 1.6
#define GRPC_GRPCLB_RECONNECT_MAX_BACKOFF_SECONDS 120
#define GRPC_GRPCLB_RECONNECT_JITTER 0.2
// Fallback timeout in milliseconds. Presumably the default for
// lb_fallback_timeout_ms_ when no channel arg overrides it -- TODO confirm.
#define GRPC_GRPCLB_DEFAULT_FALLBACK_TIMEOUT_MS 10000
115 
116 namespace grpc_core {
117 
// Trace flag named "glb" (constructed with `false`, presumably meaning it is
// disabled by default). It guards the verbose gpr_log output in this file
// via grpc_lb_glb_trace.enabled(). It is also handed to BalancerCallState's
// InternallyRefCountedWithTracing base for ref-count tracing.
TraceFlag grpc_lb_glb_trace(false, "glb");
119 
120 namespace {
121 
/// The grpclb LB policy. See the comment at the top of this file for the
/// overall design: picks are delegated to an internal round_robin policy
/// whose addresses come from balancer serverlists or, in fallback mode, from
/// the resolver's backend addresses.
class GrpcLb : public LoadBalancingPolicy {
 public:
  GrpcLb(const grpc_lb_addresses* addresses, const Args& args);

  void UpdateLocked(const grpc_channel_args& args) override;
  bool PickLocked(PickState* pick, grpc_error** error) override;
  void CancelPickLocked(PickState* pick, grpc_error* error) override;
  void CancelMatchingPicksLocked(uint32_t initial_metadata_flags_mask,
                                 uint32_t initial_metadata_flags_eq,
                                 grpc_error* error) override;
  void NotifyOnStateChangeLocked(grpc_connectivity_state* state,
                                 grpc_closure* closure) override;
  grpc_connectivity_state CheckConnectivityLocked(
      grpc_error** connectivity_error) override;
  void HandOffPendingPicksLocked(LoadBalancingPolicy* new_policy) override;
  void ExitIdleLocked() override;
  void ResetBackoffLocked() override;
  void FillChildRefsForChannelz(ChildRefsList* child_subchannels,
                                ChildRefsList* child_channels) override;

 private:
  /// Linked list of pending pick requests. It stores all information needed to
  /// eventually call (Round Robin's) pick() on them. They mainly stay pending
  /// waiting for the RR policy to be created.
  ///
  /// Note that when a pick is sent to the RR policy, we inject our own
  /// on_complete callback, so that we can intercept the result before
  /// invoking the original on_complete callback.  This allows us to set the
  /// LB token metadata and add client_stats to the call context.
  /// See \a OnPendingPickComplete() for details.
  struct PendingPick {
    // The grpclb instance that created the wrapping. This instance is not
    // owned; reference counts are untouched. It's used only for logging
    // purposes.
    GrpcLb* grpclb_policy;
    // The original pick.
    PickState* pick;
    // Our on_complete closure and the original one.
    grpc_closure on_complete;
    grpc_closure* original_on_complete;
    // The LB token associated with the pick.  This is set via user_data in
    // the pick.
    grpc_mdelem lb_token;
    // Stats for client-side load reporting.
    RefCountedPtr<GrpcLbClientStats> client_stats;
    // Next pending pick.
    PendingPick* next = nullptr;
  };

  /// Contains a call to the LB server and all the data related to the call.
  class BalancerCallState
      : public InternallyRefCountedWithTracing<BalancerCallState> {
   public:
    explicit BalancerCallState(
        RefCountedPtr<LoadBalancingPolicy> parent_grpclb_policy);

    // It's the caller's responsibility to ensure that Orphan() is called from
    // inside the combiner.
    void Orphan() override;

    // Starts the send and receive batches on lb_call_.
    void StartQuery();

    // Stats for client-side load reporting; null until client_stats_ is
    // created (after the first serverlist is received).
    GrpcLbClientStats* client_stats() const { return client_stats_.get(); }

    bool seen_initial_response() const { return seen_initial_response_; }

   private:
    // So Delete() can access our private dtor.
    template <typename T>
    friend void grpc_core::Delete(T*);

    ~BalancerCallState();

    // The owning policy, downcast from the stored LoadBalancingPolicy ref.
    GrpcLb* grpclb_policy() const {
      return static_cast<GrpcLb*>(grpclb_policy_.get());
    }

    void ScheduleNextClientLoadReportLocked();
    void SendClientLoadReportLocked();

    // Returns true if every call counter in \a request is zero and it carries
    // no drop entries.
    static bool LoadReportCountersAreZero(grpc_grpclb_request* request);

    // Closure callbacks scheduled on the parent policy's combiner; \a arg is
    // the BalancerCallState that initialized the closure.
    static void MaybeSendClientLoadReportLocked(void* arg, grpc_error* error);
    static void ClientLoadReportDoneLocked(void* arg, grpc_error* error);
    static void OnInitialRequestSentLocked(void* arg, grpc_error* error);
    static void OnBalancerMessageReceivedLocked(void* arg, grpc_error* error);
    static void OnBalancerStatusReceivedLocked(void* arg, grpc_error* error);

    // The owning LB policy.
    RefCountedPtr<LoadBalancingPolicy> grpclb_policy_;

    // The streaming call to the LB server. Created in the constructor and
    // expected to be non-NULL from then on (asserted in StartQuery(),
    // Orphan() and the destructor).
    grpc_call* lb_call_ = nullptr;

    // recv_initial_metadata
    grpc_metadata_array lb_initial_metadata_recv_;

    // send_message
    // Payload of the message currently being sent (the initial request or a
    // load report). MaybeSendClientLoadReportLocked() treats non-null as
    // "a send is still in flight" and defers the load report.
    grpc_byte_buffer* send_message_payload_ = nullptr;
    grpc_closure lb_on_initial_request_sent_;

    // recv_message
    grpc_byte_buffer* recv_message_payload_ = nullptr;
    grpc_closure lb_on_balancer_message_received_;
    bool seen_initial_response_ = false;

    // recv_trailing_metadata
    // Filled in by the GRPC_OP_RECV_STATUS_ON_CLIENT op started in
    // StartQuery().
    grpc_closure lb_on_balancer_status_received_;
    grpc_metadata_array lb_trailing_metadata_recv_;
    grpc_status_code lb_call_status_;
    grpc_slice lb_call_status_details_;

    // The stats for client-side load reporting associated with this LB call.
    // Created after the first serverlist is received.
    RefCountedPtr<GrpcLbClientStats> client_stats_;
    grpc_millis client_stats_report_interval_ = 0;
    grpc_timer client_load_report_timer_;
    bool client_load_report_timer_callback_pending_ = false;
    bool last_client_load_report_counters_were_zero_ = false;
    bool client_load_report_is_due_ = false;
    // The closure used for either the load report timer or the callback for
    // completion of sending the load report.
    grpc_closure client_load_report_closure_;
  };

  ~GrpcLb();

  void ShutdownLocked() override;

  // Helper function used in ctor and UpdateLocked().
  void ProcessChannelArgsLocked(const grpc_channel_args& args);

  // Methods for dealing with the balancer channel and call.
  void StartPickingLocked();
  void StartBalancerCallLocked();
  static void OnFallbackTimerLocked(void* arg, grpc_error* error);
  void StartBalancerCallRetryTimerLocked();
  static void OnBalancerCallRetryTimerLocked(void* arg, grpc_error* error);
  static void OnBalancerChannelConnectivityChangedLocked(void* arg,
                                                         grpc_error* error);

  // Pending pick methods.
  static void PendingPickSetMetadataAndContext(PendingPick* pp);
  PendingPick* PendingPickCreate(PickState* pick);
  void AddPendingPick(PendingPick* pp);
  static void OnPendingPickComplete(void* arg, grpc_error* error);

  // Methods for dealing with the RR policy.
  void CreateOrUpdateRoundRobinPolicyLocked();
  grpc_channel_args* CreateRoundRobinPolicyArgsLocked();
  void CreateRoundRobinPolicyLocked(const Args& args);
  bool PickFromRoundRobinPolicyLocked(bool force_async, PendingPick* pp,
                                      grpc_error** error);
  void UpdateConnectivityStateFromRoundRobinPolicyLocked(
      grpc_error* rr_state_error);
  static void OnRoundRobinConnectivityChangedLocked(void* arg,
                                                    grpc_error* error);
  static void OnRoundRobinRequestReresolutionLocked(void* arg,
                                                    grpc_error* error);

  // Who the client is trying to communicate with.
  const char* server_name_ = nullptr;

  // Current channel args from the resolver.
  grpc_channel_args* args_ = nullptr;

  // Internal state.
  bool started_picking_ = false;
  bool shutting_down_ = false;
  grpc_connectivity_state_tracker state_tracker_;

  // The channel for communicating with the LB server.
  grpc_channel* lb_channel_ = nullptr;
  // Mutex to protect the channel to the LB server. This is used when
  // processing a channelz request.
  gpr_mu lb_channel_mu_;
  grpc_connectivity_state lb_channel_connectivity_;
  grpc_closure lb_channel_on_connectivity_changed_;
  // Are we already watching the LB channel's connectivity?
  bool watching_lb_channel_ = false;
  // Response generator to inject address updates into lb_channel_.
  RefCountedPtr<FakeResolverResponseGenerator> response_generator_;

  // The data associated with the current LB call. It holds a ref to this LB
  // policy. It's initialized every time we query for backends. It's reset to
  // NULL whenever the current LB call is no longer needed (e.g., the LB policy
  // is shutting down, or the LB call has ended). A non-NULL lb_calld_ always
  // contains a non-NULL lb_call_.
  OrphanablePtr<BalancerCallState> lb_calld_;
  // Timeout in milliseconds for the LB call. 0 means no deadline.
  int lb_call_timeout_ms_ = 0;
  // Balancer call retry state.
  BackOff lb_call_backoff_;
  bool retry_timer_callback_pending_ = false;
  grpc_timer lb_call_retry_timer_;
  grpc_closure lb_on_call_retry_;

  // The deserialized response from the balancer. May be nullptr until one
  // such response has arrived.
  grpc_grpclb_serverlist* serverlist_ = nullptr;
  // Index into serverlist for next pick.
  // If the server at this index is a drop, we return a drop.
  // Otherwise, we delegate to the RR policy.
  size_t serverlist_index_ = 0;

  // Timeout in milliseconds before falling back to the backend addresses
  // from the resolver. 0 means fallback is not used.
  int lb_fallback_timeout_ms_ = 0;
  // The backend addresses from the resolver.
  grpc_lb_addresses* fallback_backend_addresses_ = nullptr;
  // Fallback timer.
  bool fallback_timer_callback_pending_ = false;
  grpc_timer lb_fallback_timer_;
  grpc_closure lb_on_fallback_;

  // Pending picks that are waiting on the RR policy's connectivity.
  PendingPick* pending_picks_ = nullptr;

  // The RR policy to use for the backends.
  OrphanablePtr<LoadBalancingPolicy> rr_policy_;
  grpc_connectivity_state rr_connectivity_state_;
  grpc_closure on_rr_connectivity_changed_;
  grpc_closure on_rr_request_reresolution_;
};
346 
347 //
348 // serverlist parsing code
349 //
350 
351 // vtable for LB tokens in grpc_lb_addresses
lb_token_copy(void * token)352 void* lb_token_copy(void* token) {
353   return token == nullptr
354              ? nullptr
355              : (void*)GRPC_MDELEM_REF(grpc_mdelem{(uintptr_t)token}).payload;
356 }
lb_token_destroy(void * token)357 void lb_token_destroy(void* token) {
358   if (token != nullptr) {
359     GRPC_MDELEM_UNREF(grpc_mdelem{(uintptr_t)token});
360   }
361 }
// Compare hook: orders LB tokens by pointer identity, not by content.
// Returns 1 if token1 sorts after token2, -1 if before, and 0 if they are
// the same pointer.
//
// std::less<void*> is used instead of the built-in < and > operators. The
// built-in relational operators give an unspecified result for pointers that
// do not point into the same object, while std::less guarantees a strict
// total order over all pointers.
int lb_token_cmp(void* token1, void* token2) {
  std::less<void*> before;
  if (before(token2, token1)) return 1;
  if (before(token1, token2)) return -1;
  return 0;
}
367 const grpc_lb_user_data_vtable lb_token_vtable = {
368     lb_token_copy, lb_token_destroy, lb_token_cmp};
369 
370 // Returns the backend addresses extracted from the given addresses.
// Returns a newly created grpc_lb_addresses holding only the non-balancer
// entries of \a addresses, in their original order. Each entry gets the
// empty LB token as user data, and the list uses lb_token_vtable.
grpc_lb_addresses* ExtractBackendAddresses(const grpc_lb_addresses* addresses) {
  auto is_backend = [addresses](size_t idx) {
    return !addresses->addresses[idx].is_balancer;
  };
  // Size the result exactly before filling it.
  size_t backend_count = 0;
  for (size_t idx = 0; idx < addresses->num_addresses; ++idx) {
    if (is_backend(idx)) backend_count++;
  }
  grpc_lb_addresses* result =
      grpc_lb_addresses_create(backend_count, &lb_token_vtable);
  size_t out_idx = 0;
  for (size_t idx = 0; idx < addresses->num_addresses; ++idx) {
    if (!is_backend(idx)) continue;
    const grpc_resolved_address& resolved = addresses->addresses[idx].address;
    void* empty_token =
        reinterpret_cast<void*>(GRPC_MDELEM_LB_TOKEN_EMPTY.payload);
    grpc_lb_addresses_set_address(result, out_idx, &resolved.addr,
                                  resolved.len, false /* is_balancer */,
                                  nullptr /* balancer_name */, empty_token);
    out_idx++;
  }
  return result;
}
394 
IsServerValid(const grpc_grpclb_server * server,size_t idx,bool log)395 bool IsServerValid(const grpc_grpclb_server* server, size_t idx, bool log) {
396   if (server->drop) return false;
397   const grpc_grpclb_ip_address* ip = &server->ip_address;
398   if (GPR_UNLIKELY(server->port >> 16 != 0)) {
399     if (log) {
400       gpr_log(GPR_ERROR,
401               "Invalid port '%d' at index %lu of serverlist. Ignoring.",
402               server->port, (unsigned long)idx);
403     }
404     return false;
405   }
406   if (GPR_UNLIKELY(ip->size != 4 && ip->size != 16)) {
407     if (log) {
408       gpr_log(GPR_ERROR,
409               "Expected IP to be 4 or 16 bytes, got %d at index %lu of "
410               "serverlist. Ignoring",
411               ip->size, (unsigned long)idx);
412     }
413     return false;
414   }
415   return true;
416 }
417 
ParseServer(const grpc_grpclb_server * server,grpc_resolved_address * addr)418 void ParseServer(const grpc_grpclb_server* server,
419                  grpc_resolved_address* addr) {
420   memset(addr, 0, sizeof(*addr));
421   if (server->drop) return;
422   const uint16_t netorder_port = grpc_htons((uint16_t)server->port);
423   /* the addresses are given in binary format (a in(6)_addr struct) in
424    * server->ip_address.bytes. */
425   const grpc_grpclb_ip_address* ip = &server->ip_address;
426   if (ip->size == 4) {
427     addr->len = static_cast<socklen_t>(sizeof(grpc_sockaddr_in));
428     grpc_sockaddr_in* addr4 = reinterpret_cast<grpc_sockaddr_in*>(&addr->addr);
429     addr4->sin_family = GRPC_AF_INET;
430     memcpy(&addr4->sin_addr, ip->bytes, ip->size);
431     addr4->sin_port = netorder_port;
432   } else if (ip->size == 16) {
433     addr->len = static_cast<socklen_t>(sizeof(grpc_sockaddr_in6));
434     grpc_sockaddr_in6* addr6 = (grpc_sockaddr_in6*)&addr->addr;
435     addr6->sin6_family = GRPC_AF_INET6;
436     memcpy(&addr6->sin6_addr, ip->bytes, ip->size);
437     addr6->sin6_port = netorder_port;
438   }
439 }
440 
441 // Returns addresses extracted from \a serverlist.
ProcessServerlist(const grpc_grpclb_serverlist * serverlist)442 grpc_lb_addresses* ProcessServerlist(const grpc_grpclb_serverlist* serverlist) {
443   size_t num_valid = 0;
444   /* first pass: count how many are valid in order to allocate the necessary
445    * memory in a single block */
446   for (size_t i = 0; i < serverlist->num_servers; ++i) {
447     if (IsServerValid(serverlist->servers[i], i, true)) ++num_valid;
448   }
449   grpc_lb_addresses* lb_addresses =
450       grpc_lb_addresses_create(num_valid, &lb_token_vtable);
451   /* second pass: actually populate the addresses and LB tokens (aka user data
452    * to the outside world) to be read by the RR policy during its creation.
453    * Given that the validity tests are very cheap, they are performed again
454    * instead of marking the valid ones during the first pass, as this would
455    * incurr in an allocation due to the arbitrary number of server */
456   size_t addr_idx = 0;
457   for (size_t sl_idx = 0; sl_idx < serverlist->num_servers; ++sl_idx) {
458     const grpc_grpclb_server* server = serverlist->servers[sl_idx];
459     if (!IsServerValid(serverlist->servers[sl_idx], sl_idx, false)) continue;
460     GPR_ASSERT(addr_idx < num_valid);
461     /* address processing */
462     grpc_resolved_address addr;
463     ParseServer(server, &addr);
464     /* lb token processing */
465     void* user_data;
466     if (server->has_load_balance_token) {
467       const size_t lb_token_max_length =
468           GPR_ARRAY_SIZE(server->load_balance_token);
469       const size_t lb_token_length =
470           strnlen(server->load_balance_token, lb_token_max_length);
471       grpc_slice lb_token_mdstr = grpc_slice_from_copied_buffer(
472           server->load_balance_token, lb_token_length);
473       user_data =
474           (void*)grpc_mdelem_from_slices(GRPC_MDSTR_LB_TOKEN, lb_token_mdstr)
475               .payload;
476     } else {
477       char* uri = grpc_sockaddr_to_uri(&addr);
478       gpr_log(GPR_INFO,
479               "Missing LB token for backend address '%s'. The empty token will "
480               "be used instead",
481               uri);
482       gpr_free(uri);
483       user_data = (void*)GRPC_MDELEM_LB_TOKEN_EMPTY.payload;
484     }
485     grpc_lb_addresses_set_address(lb_addresses, addr_idx, &addr.addr, addr.len,
486                                   false /* is_balancer */,
487                                   nullptr /* balancer_name */, user_data);
488     ++addr_idx;
489   }
490   GPR_ASSERT(addr_idx == num_valid);
491   return lb_addresses;
492 }
493 
494 //
495 // GrpcLb::BalancerCallState
496 //
497 
BalancerCallState(RefCountedPtr<LoadBalancingPolicy> parent_grpclb_policy)498 GrpcLb::BalancerCallState::BalancerCallState(
499     RefCountedPtr<LoadBalancingPolicy> parent_grpclb_policy)
500     : InternallyRefCountedWithTracing<BalancerCallState>(&grpc_lb_glb_trace),
501       grpclb_policy_(std::move(parent_grpclb_policy)) {
502   GPR_ASSERT(grpclb_policy_ != nullptr);
503   GPR_ASSERT(!grpclb_policy()->shutting_down_);
504   // Init the LB call. Note that the LB call will progress every time there's
505   // activity in grpclb_policy_->interested_parties(), which is comprised of
506   // the polling entities from client_channel.
507   GPR_ASSERT(grpclb_policy()->server_name_ != nullptr);
508   GPR_ASSERT(grpclb_policy()->server_name_[0] != '\0');
509   const grpc_millis deadline =
510       grpclb_policy()->lb_call_timeout_ms_ == 0
511           ? GRPC_MILLIS_INF_FUTURE
512           : ExecCtx::Get()->Now() + grpclb_policy()->lb_call_timeout_ms_;
513   lb_call_ = grpc_channel_create_pollset_set_call(
514       grpclb_policy()->lb_channel_, nullptr, GRPC_PROPAGATE_DEFAULTS,
515       grpclb_policy_->interested_parties(),
516       GRPC_MDSTR_SLASH_GRPC_DOT_LB_DOT_V1_DOT_LOADBALANCER_SLASH_BALANCELOAD,
517       nullptr, deadline, nullptr);
518   // Init the LB call request payload.
519   grpc_grpclb_request* request =
520       grpc_grpclb_request_create(grpclb_policy()->server_name_);
521   grpc_slice request_payload_slice = grpc_grpclb_request_encode(request);
522   send_message_payload_ =
523       grpc_raw_byte_buffer_create(&request_payload_slice, 1);
524   grpc_slice_unref_internal(request_payload_slice);
525   grpc_grpclb_request_destroy(request);
526   // Init other data associated with the LB call.
527   grpc_metadata_array_init(&lb_initial_metadata_recv_);
528   grpc_metadata_array_init(&lb_trailing_metadata_recv_);
529   GRPC_CLOSURE_INIT(&lb_on_initial_request_sent_, OnInitialRequestSentLocked,
530                     this, grpc_combiner_scheduler(grpclb_policy()->combiner()));
531   GRPC_CLOSURE_INIT(&lb_on_balancer_message_received_,
532                     OnBalancerMessageReceivedLocked, this,
533                     grpc_combiner_scheduler(grpclb_policy()->combiner()));
534   GRPC_CLOSURE_INIT(&lb_on_balancer_status_received_,
535                     OnBalancerStatusReceivedLocked, this,
536                     grpc_combiner_scheduler(grpclb_policy()->combiner()));
537 }
538 
// Releases the LB call and every buffer / metadata array owned by this
// object. The destructor is private and reached through grpc_core::Delete()
// (see the friend declaration). Per the note in Orphan(), the final unref
// happens in lb_on_balancer_status_received_, i.e. after the status op
// started in StartQuery() has completed.
GrpcLb::BalancerCallState::~BalancerCallState() {
  GPR_ASSERT(lb_call_ != nullptr);
  grpc_call_unref(lb_call_);
  grpc_metadata_array_destroy(&lb_initial_metadata_recv_);
  grpc_metadata_array_destroy(&lb_trailing_metadata_recv_);
  // Either payload may be null here; grpc_byte_buffer_destroy() presumably
  // tolerates nullptr -- confirm.
  grpc_byte_buffer_destroy(send_message_payload_);
  grpc_byte_buffer_destroy(recv_message_payload_);
  // Populated by the RECV_STATUS_ON_CLIENT op started in StartQuery().
  grpc_slice_unref_internal(lb_call_status_details_);
}
548 
// Cancels the LB call and, if armed, the client load report timer. It does
// not drop a ref itself (see the note at the end).
void GrpcLb::BalancerCallState::Orphan() {
  GPR_ASSERT(lb_call_ != nullptr);
  // If we are here because grpclb_policy wants to cancel the call,
  // lb_on_balancer_status_received_ will complete the cancellation and clean
  // up. Otherwise, we are here because grpclb_policy has to orphan a failed
  // call, then the following cancellation will be a no-op.
  grpc_call_cancel(lb_call_, nullptr);
  if (client_load_report_timer_callback_pending_) {
    grpc_timer_cancel(&client_load_report_timer_);
  }
  // Note that the initial ref is held by lb_on_balancer_status_received_
  // instead of the caller of this function. So the corresponding unref happens
  // in lb_on_balancer_status_received_ instead of here.
}
563 
StartQuery()564 void GrpcLb::BalancerCallState::StartQuery() {
565   GPR_ASSERT(lb_call_ != nullptr);
566   if (grpc_lb_glb_trace.enabled()) {
567     gpr_log(GPR_INFO,
568             "[grpclb %p] Starting LB call (lb_calld: %p, lb_call: %p)",
569             grpclb_policy_.get(), this, lb_call_);
570   }
571   // Create the ops.
572   grpc_call_error call_error;
573   grpc_op ops[3];
574   memset(ops, 0, sizeof(ops));
575   // Op: send initial metadata.
576   grpc_op* op = ops;
577   op->op = GRPC_OP_SEND_INITIAL_METADATA;
578   op->data.send_initial_metadata.count = 0;
579   op->flags = 0;
580   op->reserved = nullptr;
581   op++;
582   // Op: send request message.
583   GPR_ASSERT(send_message_payload_ != nullptr);
584   op->op = GRPC_OP_SEND_MESSAGE;
585   op->data.send_message.send_message = send_message_payload_;
586   op->flags = 0;
587   op->reserved = nullptr;
588   op++;
589   // TODO(roth): We currently track this ref manually.  Once the
590   // ClosureRef API is ready, we should pass the RefCountedPtr<> along
591   // with the callback.
592   auto self = Ref(DEBUG_LOCATION, "on_initial_request_sent");
593   self.release();
594   call_error = grpc_call_start_batch_and_execute(
595       lb_call_, ops, (size_t)(op - ops), &lb_on_initial_request_sent_);
596   GPR_ASSERT(GRPC_CALL_OK == call_error);
597   // Op: recv initial metadata.
598   op = ops;
599   op->op = GRPC_OP_RECV_INITIAL_METADATA;
600   op->data.recv_initial_metadata.recv_initial_metadata =
601       &lb_initial_metadata_recv_;
602   op->flags = 0;
603   op->reserved = nullptr;
604   op++;
605   // Op: recv response.
606   op->op = GRPC_OP_RECV_MESSAGE;
607   op->data.recv_message.recv_message = &recv_message_payload_;
608   op->flags = 0;
609   op->reserved = nullptr;
610   op++;
611   // TODO(roth): We currently track this ref manually.  Once the
612   // ClosureRef API is ready, we should pass the RefCountedPtr<> along
613   // with the callback.
614   self = Ref(DEBUG_LOCATION, "on_message_received");
615   self.release();
616   call_error = grpc_call_start_batch_and_execute(
617       lb_call_, ops, (size_t)(op - ops), &lb_on_balancer_message_received_);
618   GPR_ASSERT(GRPC_CALL_OK == call_error);
619   // Op: recv server status.
620   op = ops;
621   op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
622   op->data.recv_status_on_client.trailing_metadata =
623       &lb_trailing_metadata_recv_;
624   op->data.recv_status_on_client.status = &lb_call_status_;
625   op->data.recv_status_on_client.status_details = &lb_call_status_details_;
626   op->flags = 0;
627   op->reserved = nullptr;
628   op++;
629   // This callback signals the end of the LB call, so it relies on the initial
630   // ref instead of a new ref. When it's invoked, it's the initial ref that is
631   // unreffed.
632   call_error = grpc_call_start_batch_and_execute(
633       lb_call_, ops, (size_t)(op - ops), &lb_on_balancer_status_received_);
634   GPR_ASSERT(GRPC_CALL_OK == call_error);
635 };
636 
ScheduleNextClientLoadReportLocked()637 void GrpcLb::BalancerCallState::ScheduleNextClientLoadReportLocked() {
638   const grpc_millis next_client_load_report_time =
639       ExecCtx::Get()->Now() + client_stats_report_interval_;
640   GRPC_CLOSURE_INIT(&client_load_report_closure_,
641                     MaybeSendClientLoadReportLocked, this,
642                     grpc_combiner_scheduler(grpclb_policy()->combiner()));
643   grpc_timer_init(&client_load_report_timer_, next_client_load_report_time,
644                   &client_load_report_closure_);
645   client_load_report_timer_callback_pending_ = true;
646 }
647 
MaybeSendClientLoadReportLocked(void * arg,grpc_error * error)648 void GrpcLb::BalancerCallState::MaybeSendClientLoadReportLocked(
649     void* arg, grpc_error* error) {
650   BalancerCallState* lb_calld = static_cast<BalancerCallState*>(arg);
651   GrpcLb* grpclb_policy = lb_calld->grpclb_policy();
652   lb_calld->client_load_report_timer_callback_pending_ = false;
653   if (error != GRPC_ERROR_NONE || lb_calld != grpclb_policy->lb_calld_.get()) {
654     lb_calld->Unref(DEBUG_LOCATION, "client_load_report");
655     return;
656   }
657   // If we've already sent the initial request, then we can go ahead and send
658   // the load report. Otherwise, we need to wait until the initial request has
659   // been sent to send this (see OnInitialRequestSentLocked()).
660   if (lb_calld->send_message_payload_ == nullptr) {
661     lb_calld->SendClientLoadReportLocked();
662   } else {
663     lb_calld->client_load_report_is_due_ = true;
664   }
665 }
666 
LoadReportCountersAreZero(grpc_grpclb_request * request)667 bool GrpcLb::BalancerCallState::LoadReportCountersAreZero(
668     grpc_grpclb_request* request) {
669   GrpcLbClientStats::DroppedCallCounts* drop_entries =
670       static_cast<GrpcLbClientStats::DroppedCallCounts*>(
671           request->client_stats.calls_finished_with_drop.arg);
672   return request->client_stats.num_calls_started == 0 &&
673          request->client_stats.num_calls_finished == 0 &&
674          request->client_stats.num_calls_finished_with_client_failed_to_send ==
675              0 &&
676          request->client_stats.num_calls_finished_known_received == 0 &&
677          (drop_entries == nullptr || drop_entries->size() == 0);
678 }
679 
SendClientLoadReportLocked()680 void GrpcLb::BalancerCallState::SendClientLoadReportLocked() {
681   // Construct message payload.
682   GPR_ASSERT(send_message_payload_ == nullptr);
683   grpc_grpclb_request* request =
684       grpc_grpclb_load_report_request_create_locked(client_stats_.get());
685   // Skip client load report if the counters were all zero in the last
686   // report and they are still zero in this one.
687   if (LoadReportCountersAreZero(request)) {
688     if (last_client_load_report_counters_were_zero_) {
689       grpc_grpclb_request_destroy(request);
690       ScheduleNextClientLoadReportLocked();
691       return;
692     }
693     last_client_load_report_counters_were_zero_ = true;
694   } else {
695     last_client_load_report_counters_were_zero_ = false;
696   }
697   grpc_slice request_payload_slice = grpc_grpclb_request_encode(request);
698   send_message_payload_ =
699       grpc_raw_byte_buffer_create(&request_payload_slice, 1);
700   grpc_slice_unref_internal(request_payload_slice);
701   grpc_grpclb_request_destroy(request);
702   // Send the report.
703   grpc_op op;
704   memset(&op, 0, sizeof(op));
705   op.op = GRPC_OP_SEND_MESSAGE;
706   op.data.send_message.send_message = send_message_payload_;
707   GRPC_CLOSURE_INIT(&client_load_report_closure_, ClientLoadReportDoneLocked,
708                     this, grpc_combiner_scheduler(grpclb_policy()->combiner()));
709   grpc_call_error call_error = grpc_call_start_batch_and_execute(
710       lb_call_, &op, 1, &client_load_report_closure_);
711   if (GPR_UNLIKELY(call_error != GRPC_CALL_OK)) {
712     gpr_log(GPR_ERROR, "[grpclb %p] call_error=%d", grpclb_policy_.get(),
713             call_error);
714     GPR_ASSERT(GRPC_CALL_OK == call_error);
715   }
716 }
717 
ClientLoadReportDoneLocked(void * arg,grpc_error * error)718 void GrpcLb::BalancerCallState::ClientLoadReportDoneLocked(void* arg,
719                                                            grpc_error* error) {
720   BalancerCallState* lb_calld = static_cast<BalancerCallState*>(arg);
721   GrpcLb* grpclb_policy = lb_calld->grpclb_policy();
722   grpc_byte_buffer_destroy(lb_calld->send_message_payload_);
723   lb_calld->send_message_payload_ = nullptr;
724   if (error != GRPC_ERROR_NONE || lb_calld != grpclb_policy->lb_calld_.get()) {
725     lb_calld->Unref(DEBUG_LOCATION, "client_load_report");
726     return;
727   }
728   lb_calld->ScheduleNextClientLoadReportLocked();
729 }
730 
OnInitialRequestSentLocked(void * arg,grpc_error * error)731 void GrpcLb::BalancerCallState::OnInitialRequestSentLocked(void* arg,
732                                                            grpc_error* error) {
733   BalancerCallState* lb_calld = static_cast<BalancerCallState*>(arg);
734   grpc_byte_buffer_destroy(lb_calld->send_message_payload_);
735   lb_calld->send_message_payload_ = nullptr;
736   // If we attempted to send a client load report before the initial request was
737   // sent (and this lb_calld is still in use), send the load report now.
738   if (lb_calld->client_load_report_is_due_ &&
739       lb_calld == lb_calld->grpclb_policy()->lb_calld_.get()) {
740     lb_calld->SendClientLoadReportLocked();
741     lb_calld->client_load_report_is_due_ = false;
742   }
743   lb_calld->Unref(DEBUG_LOCATION, "on_initial_request_sent");
744 }
745 
OnBalancerMessageReceivedLocked(void * arg,grpc_error * error)746 void GrpcLb::BalancerCallState::OnBalancerMessageReceivedLocked(
747     void* arg, grpc_error* error) {
748   BalancerCallState* lb_calld = static_cast<BalancerCallState*>(arg);
749   GrpcLb* grpclb_policy = lb_calld->grpclb_policy();
750   // Empty payload means the LB call was cancelled.
751   if (lb_calld != grpclb_policy->lb_calld_.get() ||
752       lb_calld->recv_message_payload_ == nullptr) {
753     lb_calld->Unref(DEBUG_LOCATION, "on_message_received");
754     return;
755   }
756   grpc_byte_buffer_reader bbr;
757   grpc_byte_buffer_reader_init(&bbr, lb_calld->recv_message_payload_);
758   grpc_slice response_slice = grpc_byte_buffer_reader_readall(&bbr);
759   grpc_byte_buffer_reader_destroy(&bbr);
760   grpc_byte_buffer_destroy(lb_calld->recv_message_payload_);
761   lb_calld->recv_message_payload_ = nullptr;
762   grpc_grpclb_initial_response* initial_response;
763   grpc_grpclb_serverlist* serverlist;
764   if (!lb_calld->seen_initial_response_ &&
765       (initial_response = grpc_grpclb_initial_response_parse(response_slice)) !=
766           nullptr) {
767     // Have NOT seen initial response, look for initial response.
768     if (initial_response->has_client_stats_report_interval) {
769       lb_calld->client_stats_report_interval_ = GPR_MAX(
770           GPR_MS_PER_SEC, grpc_grpclb_duration_to_millis(
771                               &initial_response->client_stats_report_interval));
772       if (grpc_lb_glb_trace.enabled()) {
773         gpr_log(GPR_INFO,
774                 "[grpclb %p] Received initial LB response message; "
775                 "client load reporting interval = %" PRId64 " milliseconds",
776                 grpclb_policy, lb_calld->client_stats_report_interval_);
777       }
778     } else if (grpc_lb_glb_trace.enabled()) {
779       gpr_log(GPR_INFO,
780               "[grpclb %p] Received initial LB response message; client load "
781               "reporting NOT enabled",
782               grpclb_policy);
783     }
784     grpc_grpclb_initial_response_destroy(initial_response);
785     lb_calld->seen_initial_response_ = true;
786   } else if ((serverlist = grpc_grpclb_response_parse_serverlist(
787                   response_slice)) != nullptr) {
788     // Have seen initial response, look for serverlist.
789     GPR_ASSERT(lb_calld->lb_call_ != nullptr);
790     if (grpc_lb_glb_trace.enabled()) {
791       gpr_log(GPR_INFO,
792               "[grpclb %p] Serverlist with %" PRIuPTR " servers received",
793               grpclb_policy, serverlist->num_servers);
794       for (size_t i = 0; i < serverlist->num_servers; ++i) {
795         grpc_resolved_address addr;
796         ParseServer(serverlist->servers[i], &addr);
797         char* ipport;
798         grpc_sockaddr_to_string(&ipport, &addr, false);
799         gpr_log(GPR_INFO, "[grpclb %p] Serverlist[%" PRIuPTR "]: %s",
800                 grpclb_policy, i, ipport);
801         gpr_free(ipport);
802       }
803     }
804     /* update serverlist */
805     if (serverlist->num_servers > 0) {
806       // Start sending client load report only after we start using the
807       // serverlist returned from the current LB call.
808       if (lb_calld->client_stats_report_interval_ > 0 &&
809           lb_calld->client_stats_ == nullptr) {
810         lb_calld->client_stats_.reset(New<GrpcLbClientStats>());
811         // TODO(roth): We currently track this ref manually.  Once the
812         // ClosureRef API is ready, we should pass the RefCountedPtr<> along
813         // with the callback.
814         auto self = lb_calld->Ref(DEBUG_LOCATION, "client_load_report");
815         self.release();
816         lb_calld->ScheduleNextClientLoadReportLocked();
817       }
818       if (grpc_grpclb_serverlist_equals(grpclb_policy->serverlist_,
819                                         serverlist)) {
820         if (grpc_lb_glb_trace.enabled()) {
821           gpr_log(GPR_INFO,
822                   "[grpclb %p] Incoming server list identical to current, "
823                   "ignoring.",
824                   grpclb_policy);
825         }
826         grpc_grpclb_destroy_serverlist(serverlist);
827       } else { /* new serverlist */
828         if (grpclb_policy->serverlist_ != nullptr) {
829           /* dispose of the old serverlist */
830           grpc_grpclb_destroy_serverlist(grpclb_policy->serverlist_);
831         } else {
832           /* or dispose of the fallback */
833           grpc_lb_addresses_destroy(grpclb_policy->fallback_backend_addresses_);
834           grpclb_policy->fallback_backend_addresses_ = nullptr;
835           if (grpclb_policy->fallback_timer_callback_pending_) {
836             grpc_timer_cancel(&grpclb_policy->lb_fallback_timer_);
837           }
838         }
839         // and update the copy in the GrpcLb instance. This
840         // serverlist instance will be destroyed either upon the next
841         // update or when the GrpcLb instance is destroyed.
842         grpclb_policy->serverlist_ = serverlist;
843         grpclb_policy->serverlist_index_ = 0;
844         grpclb_policy->CreateOrUpdateRoundRobinPolicyLocked();
845       }
846     } else {
847       if (grpc_lb_glb_trace.enabled()) {
848         gpr_log(GPR_INFO, "[grpclb %p] Received empty server list, ignoring.",
849                 grpclb_policy);
850       }
851       grpc_grpclb_destroy_serverlist(serverlist);
852     }
853   } else {
854     // No valid initial response or serverlist found.
855     gpr_log(GPR_ERROR,
856             "[grpclb %p] Invalid LB response received: '%s'. Ignoring.",
857             grpclb_policy,
858             grpc_dump_slice(response_slice, GPR_DUMP_ASCII | GPR_DUMP_HEX));
859   }
860   grpc_slice_unref_internal(response_slice);
861   if (!grpclb_policy->shutting_down_) {
862     // Keep listening for serverlist updates.
863     grpc_op op;
864     memset(&op, 0, sizeof(op));
865     op.op = GRPC_OP_RECV_MESSAGE;
866     op.data.recv_message.recv_message = &lb_calld->recv_message_payload_;
867     op.flags = 0;
868     op.reserved = nullptr;
869     // Reuse the "OnBalancerMessageReceivedLocked" ref taken in StartQuery().
870     const grpc_call_error call_error = grpc_call_start_batch_and_execute(
871         lb_calld->lb_call_, &op, 1,
872         &lb_calld->lb_on_balancer_message_received_);
873     GPR_ASSERT(GRPC_CALL_OK == call_error);
874   } else {
875     lb_calld->Unref(DEBUG_LOCATION, "on_message_received+grpclb_shutdown");
876   }
877 }
878 
OnBalancerStatusReceivedLocked(void * arg,grpc_error * error)879 void GrpcLb::BalancerCallState::OnBalancerStatusReceivedLocked(
880     void* arg, grpc_error* error) {
881   BalancerCallState* lb_calld = static_cast<BalancerCallState*>(arg);
882   GrpcLb* grpclb_policy = lb_calld->grpclb_policy();
883   GPR_ASSERT(lb_calld->lb_call_ != nullptr);
884   if (grpc_lb_glb_trace.enabled()) {
885     char* status_details =
886         grpc_slice_to_c_string(lb_calld->lb_call_status_details_);
887     gpr_log(GPR_INFO,
888             "[grpclb %p] Status from LB server received. Status = %d, details "
889             "= '%s', (lb_calld: %p, lb_call: %p), error '%s'",
890             grpclb_policy, lb_calld->lb_call_status_, status_details, lb_calld,
891             lb_calld->lb_call_, grpc_error_string(error));
892     gpr_free(status_details);
893   }
894   grpclb_policy->TryReresolutionLocked(&grpc_lb_glb_trace, GRPC_ERROR_NONE);
895   // If this lb_calld is still in use, this call ended because of a failure so
896   // we want to retry connecting. Otherwise, we have deliberately ended this
897   // call and no further action is required.
898   if (lb_calld == grpclb_policy->lb_calld_.get()) {
899     grpclb_policy->lb_calld_.reset();
900     GPR_ASSERT(!grpclb_policy->shutting_down_);
901     if (lb_calld->seen_initial_response_) {
902       // If we lose connection to the LB server, reset the backoff and restart
903       // the LB call immediately.
904       grpclb_policy->lb_call_backoff_.Reset();
905       grpclb_policy->StartBalancerCallLocked();
906     } else {
907       // If this LB call fails establishing any connection to the LB server,
908       // retry later.
909       grpclb_policy->StartBalancerCallRetryTimerLocked();
910     }
911   }
912   lb_calld->Unref(DEBUG_LOCATION, "lb_call_ended");
913 }
914 
915 //
916 // helper code for creating balancer channel
917 //
918 
// Returns a newly created address list containing only the entries of
// `addresses` flagged as balancers. In the copies, the balancer flag is
// cleared and user data is dropped (with an error log if it was present).
// Asserts that at least one balancer address exists. The caller owns the
// result.
grpc_lb_addresses* ExtractBalancerAddresses(
    const grpc_lb_addresses* addresses) {
  // First pass: count balancer entries so the output is sized exactly.
  size_t balancer_count = 0;
  for (size_t idx = 0; idx < addresses->num_addresses; ++idx) {
    if (addresses->addresses[idx].is_balancer) {
      ++balancer_count;
    }
  }
  // There must be at least one balancer address, or else the
  // client_channel would not have chosen this LB policy.
  GPR_ASSERT(balancer_count > 0);
  grpc_lb_addresses* result = grpc_lb_addresses_create(balancer_count, nullptr);
  // Second pass: copy each balancer entry into the next free slot.
  size_t next_slot = 0;
  for (size_t idx = 0; idx < addresses->num_addresses; ++idx) {
    const auto& entry = addresses->addresses[idx];
    if (entry.is_balancer) {
      if (GPR_UNLIKELY(entry.user_data != nullptr)) {
        gpr_log(GPR_ERROR,
                "This LB policy doesn't support user data. It will be ignored");
      }
      grpc_lb_addresses_set_address(result, next_slot, entry.address.addr,
                                    entry.address.len,
                                    false /* is_balancer */,
                                    entry.balancer_name,
                                    nullptr /* user_data */);
      ++next_slot;
    }
  }
  GPR_ASSERT(balancer_count == next_slot);
  return result;
}
945 
946 /* Returns the channel args for the LB channel, used to create a bidirectional
947  * stream for the reception of load balancing updates.
948  *
949  * Inputs:
950  *   - \a addresses: corresponding to the balancers.
951  *   - \a response_generator: in order to propagate updates from the resolver
952  *   above the grpclb policy.
953  *   - \a args: other args inherited from the grpclb policy. */
BuildBalancerChannelArgs(const grpc_lb_addresses * addresses,FakeResolverResponseGenerator * response_generator,const grpc_channel_args * args)954 grpc_channel_args* BuildBalancerChannelArgs(
955     const grpc_lb_addresses* addresses,
956     FakeResolverResponseGenerator* response_generator,
957     const grpc_channel_args* args) {
958   grpc_lb_addresses* lb_addresses = ExtractBalancerAddresses(addresses);
959   // Channel args to remove.
960   static const char* args_to_remove[] = {
961       // LB policy name, since we want to use the default (pick_first) in
962       // the LB channel.
963       GRPC_ARG_LB_POLICY_NAME,
964       // The channel arg for the server URI, since that will be different for
965       // the LB channel than for the parent channel.  The client channel
966       // factory will re-add this arg with the right value.
967       GRPC_ARG_SERVER_URI,
968       // The resolved addresses, which will be generated by the name resolver
969       // used in the LB channel.  Note that the LB channel will use the fake
970       // resolver, so this won't actually generate a query to DNS (or some
971       // other name service).  However, the addresses returned by the fake
972       // resolver will have is_balancer=false, whereas our own addresses have
973       // is_balancer=true.  We need the LB channel to return addresses with
974       // is_balancer=false so that it does not wind up recursively using the
975       // grpclb LB policy, as per the special case logic in client_channel.c.
976       GRPC_ARG_LB_ADDRESSES,
977       // The fake resolver response generator, because we are replacing it
978       // with the one from the grpclb policy, used to propagate updates to
979       // the LB channel.
980       GRPC_ARG_FAKE_RESOLVER_RESPONSE_GENERATOR,
981       // The LB channel should use the authority indicated by the target
982       // authority table (see \a grpc_lb_policy_grpclb_modify_lb_channel_args),
983       // as opposed to the authority from the parent channel.
984       GRPC_ARG_DEFAULT_AUTHORITY,
985       // Just as for \a GRPC_ARG_DEFAULT_AUTHORITY, the LB channel should be
986       // treated as a stand-alone channel and not inherit this argument from the
987       // args of the parent channel.
988       GRPC_SSL_TARGET_NAME_OVERRIDE_ARG,
989   };
990   // Channel args to add.
991   const grpc_arg args_to_add[] = {
992       // New LB addresses.
993       // Note that we pass these in both when creating the LB channel
994       // and via the fake resolver.  The latter is what actually gets used.
995       grpc_lb_addresses_create_channel_arg(lb_addresses),
996       // The fake resolver response generator, which we use to inject
997       // address updates into the LB channel.
998       grpc_core::FakeResolverResponseGenerator::MakeChannelArg(
999           response_generator),
1000       // A channel arg indicating the target is a grpclb load balancer.
1001       grpc_channel_arg_integer_create(
1002           const_cast<char*>(GRPC_ARG_ADDRESS_IS_GRPCLB_LOAD_BALANCER), 1),
1003       // A channel arg indicating this is an internal channels, aka it is
1004       // owned by components in Core, not by the user application.
1005       grpc_channel_arg_integer_create(
1006           const_cast<char*>(GRPC_ARG_CHANNELZ_CHANNEL_IS_INTERNAL_CHANNEL), 1),
1007   };
1008   // Construct channel args.
1009   grpc_channel_args* new_args = grpc_channel_args_copy_and_add_and_remove(
1010       args, args_to_remove, GPR_ARRAY_SIZE(args_to_remove), args_to_add,
1011       GPR_ARRAY_SIZE(args_to_add));
1012   // Make any necessary modifications for security.
1013   new_args = grpc_lb_policy_grpclb_modify_lb_channel_args(new_args);
1014   // Clean up.
1015   grpc_lb_addresses_destroy(lb_addresses);
1016   return new_args;
1017 }
1018 
1019 //
1020 // ctor and dtor
1021 //
1022 
GrpcLb(const grpc_lb_addresses * addresses,const LoadBalancingPolicy::Args & args)1023 GrpcLb::GrpcLb(const grpc_lb_addresses* addresses,
1024                const LoadBalancingPolicy::Args& args)
1025     : LoadBalancingPolicy(args),
1026       response_generator_(MakeRefCounted<FakeResolverResponseGenerator>()),
1027       lb_call_backoff_(
1028           BackOff::Options()
1029               .set_initial_backoff(GRPC_GRPCLB_INITIAL_CONNECT_BACKOFF_SECONDS *
1030                                    1000)
1031               .set_multiplier(GRPC_GRPCLB_RECONNECT_BACKOFF_MULTIPLIER)
1032               .set_jitter(GRPC_GRPCLB_RECONNECT_JITTER)
1033               .set_max_backoff(GRPC_GRPCLB_RECONNECT_MAX_BACKOFF_SECONDS *
1034                                1000)) {
1035   // Initialization.
1036   gpr_mu_init(&lb_channel_mu_);
1037   grpc_subchannel_index_ref();
1038   GRPC_CLOSURE_INIT(&lb_channel_on_connectivity_changed_,
1039                     &GrpcLb::OnBalancerChannelConnectivityChangedLocked, this,
1040                     grpc_combiner_scheduler(args.combiner));
1041   GRPC_CLOSURE_INIT(&on_rr_connectivity_changed_,
1042                     &GrpcLb::OnRoundRobinConnectivityChangedLocked, this,
1043                     grpc_combiner_scheduler(args.combiner));
1044   GRPC_CLOSURE_INIT(&on_rr_request_reresolution_,
1045                     &GrpcLb::OnRoundRobinRequestReresolutionLocked, this,
1046                     grpc_combiner_scheduler(args.combiner));
1047   grpc_connectivity_state_init(&state_tracker_, GRPC_CHANNEL_IDLE, "grpclb");
1048   // Record server name.
1049   const grpc_arg* arg = grpc_channel_args_find(args.args, GRPC_ARG_SERVER_URI);
1050   const char* server_uri = grpc_channel_arg_get_string(arg);
1051   GPR_ASSERT(server_uri != nullptr);
1052   grpc_uri* uri = grpc_uri_parse(server_uri, true);
1053   GPR_ASSERT(uri->path[0] != '\0');
1054   server_name_ = gpr_strdup(uri->path[0] == '/' ? uri->path + 1 : uri->path);
1055   if (grpc_lb_glb_trace.enabled()) {
1056     gpr_log(GPR_INFO,
1057             "[grpclb %p] Will use '%s' as the server name for LB request.",
1058             this, server_name_);
1059   }
1060   grpc_uri_destroy(uri);
1061   // Record LB call timeout.
1062   arg = grpc_channel_args_find(args.args, GRPC_ARG_GRPCLB_CALL_TIMEOUT_MS);
1063   lb_call_timeout_ms_ = grpc_channel_arg_get_integer(arg, {0, 0, INT_MAX});
1064   // Record fallback timeout.
1065   arg = grpc_channel_args_find(args.args, GRPC_ARG_GRPCLB_FALLBACK_TIMEOUT_MS);
1066   lb_fallback_timeout_ms_ = grpc_channel_arg_get_integer(
1067       arg, {GRPC_GRPCLB_DEFAULT_FALLBACK_TIMEOUT_MS, 0, INT_MAX});
1068   // Process channel args.
1069   ProcessChannelArgsLocked(*args.args);
1070 }
1071 
~GrpcLb()1072 GrpcLb::~GrpcLb() {
1073   GPR_ASSERT(pending_picks_ == nullptr);
1074   gpr_mu_destroy(&lb_channel_mu_);
1075   gpr_free((void*)server_name_);
1076   grpc_channel_args_destroy(args_);
1077   grpc_connectivity_state_destroy(&state_tracker_);
1078   if (serverlist_ != nullptr) {
1079     grpc_grpclb_destroy_serverlist(serverlist_);
1080   }
1081   if (fallback_backend_addresses_ != nullptr) {
1082     grpc_lb_addresses_destroy(fallback_backend_addresses_);
1083   }
1084   grpc_subchannel_index_unref();
1085 }
1086 
// Shuts the policy down.
//
// Marks the policy as shutting down, then:
// - releases the balancer call state;
// - cancels the retry and fallback timers if they are pending;
// - drops the round_robin child and attempts re-resolution with
//   GRPC_ERROR_CANCELLED;
// - destroys the LB channel;
// - reports GRPC_CHANNEL_SHUTDOWN;
// - fails every pick still queued in pending_picks_ with a "Channel shutdown"
//   error.
void GrpcLb::ShutdownLocked() {
  // One error object is shared by the state change and every failed pick; each
  // use takes its own ref, and the local ref is dropped at the end.
  grpc_error* error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel shutdown");
  // Set before the teardown below. Other callbacks in this file check
  // shutting_down_ (e.g. OnBalancerMessageReceivedLocked(),
  // StartBalancerCallLocked()).
  shutting_down_ = true;
  lb_calld_.reset();
  if (retry_timer_callback_pending_) {
    grpc_timer_cancel(&lb_call_retry_timer_);
  }
  if (fallback_timer_callback_pending_) {
    grpc_timer_cancel(&lb_fallback_timer_);
  }
  rr_policy_.reset();
  TryReresolutionLocked(&grpc_lb_glb_trace, GRPC_ERROR_CANCELLED);
  // We destroy the LB channel here instead of in our destructor because
  // destroying the channel triggers a last callback to
  // OnBalancerChannelConnectivityChangedLocked(), and we need to be
  // alive when that callback is invoked.
  // lb_channel_mu_ guards lb_channel_ against concurrent readers such as
  // FillChildRefsForChannelz().
  if (lb_channel_ != nullptr) {
    gpr_mu_lock(&lb_channel_mu_);
    grpc_channel_destroy(lb_channel_);
    lb_channel_ = nullptr;
    gpr_mu_unlock(&lb_channel_mu_);
  }
  grpc_connectivity_state_set(&state_tracker_, GRPC_CHANNEL_SHUTDOWN,
                              GRPC_ERROR_REF(error), "grpclb_shutdown");
  // Clear pending picks.
  PendingPick* pp;
  while ((pp = pending_picks_) != nullptr) {
    pending_picks_ = pp->next;
    pp->pick->connected_subchannel.reset();
    // Note: pp is deleted in this callback.
    GRPC_CLOSURE_SCHED(&pp->on_complete, GRPC_ERROR_REF(error));
  }
  GRPC_ERROR_UNREF(error);
}
1121 
1122 //
1123 // public methods
1124 //
1125 
HandOffPendingPicksLocked(LoadBalancingPolicy * new_policy)1126 void GrpcLb::HandOffPendingPicksLocked(LoadBalancingPolicy* new_policy) {
1127   PendingPick* pp;
1128   while ((pp = pending_picks_) != nullptr) {
1129     pending_picks_ = pp->next;
1130     pp->pick->on_complete = pp->original_on_complete;
1131     pp->pick->user_data = nullptr;
1132     grpc_error* error = GRPC_ERROR_NONE;
1133     if (new_policy->PickLocked(pp->pick, &error)) {
1134       // Synchronous return; schedule closure.
1135       GRPC_CLOSURE_SCHED(pp->pick->on_complete, error);
1136     }
1137     Delete(pp);
1138   }
1139 }
1140 
1141 // Cancel a specific pending pick.
1142 //
1143 // A grpclb pick progresses as follows:
1144 // - If there's a Round Robin policy (rr_policy_) available, it'll be
1145 //   handed over to the RR policy (in CreateRoundRobinPolicyLocked()). From
//   that point onwards, it'll be RR's responsibility. For cancellations, that
//   implies the pick also needs to be cancelled by the RR instance.
1148 // - Otherwise, without an RR instance, picks stay pending at this policy's
1149 //   level (grpclb), inside the pending_picks_ list. To cancel these,
1150 //   we invoke the completion closure and set the pick's connected
1151 //   subchannel to nullptr right here.
CancelPickLocked(PickState * pick,grpc_error * error)1152 void GrpcLb::CancelPickLocked(PickState* pick, grpc_error* error) {
1153   PendingPick* pp = pending_picks_;
1154   pending_picks_ = nullptr;
1155   while (pp != nullptr) {
1156     PendingPick* next = pp->next;
1157     if (pp->pick == pick) {
1158       pick->connected_subchannel.reset();
1159       // Note: pp is deleted in this callback.
1160       GRPC_CLOSURE_SCHED(&pp->on_complete,
1161                          GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
1162                              "Pick Cancelled", &error, 1));
1163     } else {
1164       pp->next = pending_picks_;
1165       pending_picks_ = pp;
1166     }
1167     pp = next;
1168   }
1169   if (rr_policy_ != nullptr) {
1170     rr_policy_->CancelPickLocked(pick, GRPC_ERROR_REF(error));
1171   }
1172   GRPC_ERROR_UNREF(error);
1173 }
1174 
// Cancel all pending picks whose initial metadata flags match.
1176 //
1177 // A grpclb pick progresses as follows:
1178 // - If there's a Round Robin policy (rr_policy_) available, it'll be
1179 //   handed over to the RR policy (in CreateRoundRobinPolicyLocked()). From
//   that point onwards, it'll be RR's responsibility. For cancellations, that
//   implies the pick also needs to be cancelled by the RR instance.
1182 // - Otherwise, without an RR instance, picks stay pending at this policy's
1183 //   level (grpclb), inside the pending_picks_ list. To cancel these,
1184 //   we invoke the completion closure and set the pick's connected
1185 //   subchannel to nullptr right here.
CancelMatchingPicksLocked(uint32_t initial_metadata_flags_mask,uint32_t initial_metadata_flags_eq,grpc_error * error)1186 void GrpcLb::CancelMatchingPicksLocked(uint32_t initial_metadata_flags_mask,
1187                                        uint32_t initial_metadata_flags_eq,
1188                                        grpc_error* error) {
1189   PendingPick* pp = pending_picks_;
1190   pending_picks_ = nullptr;
1191   while (pp != nullptr) {
1192     PendingPick* next = pp->next;
1193     if ((pp->pick->initial_metadata_flags & initial_metadata_flags_mask) ==
1194         initial_metadata_flags_eq) {
1195       // Note: pp is deleted in this callback.
1196       GRPC_CLOSURE_SCHED(&pp->on_complete,
1197                          GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
1198                              "Pick Cancelled", &error, 1));
1199     } else {
1200       pp->next = pending_picks_;
1201       pending_picks_ = pp;
1202     }
1203     pp = next;
1204   }
1205   if (rr_policy_ != nullptr) {
1206     rr_policy_->CancelMatchingPicksLocked(initial_metadata_flags_mask,
1207                                           initial_metadata_flags_eq,
1208                                           GRPC_ERROR_REF(error));
1209   }
1210   GRPC_ERROR_UNREF(error);
1211 }
1212 
ExitIdleLocked()1213 void GrpcLb::ExitIdleLocked() {
1214   if (!started_picking_) {
1215     StartPickingLocked();
1216   }
1217 }
1218 
ResetBackoffLocked()1219 void GrpcLb::ResetBackoffLocked() {
1220   if (lb_channel_ != nullptr) {
1221     grpc_channel_reset_connect_backoff(lb_channel_);
1222   }
1223   if (rr_policy_ != nullptr) {
1224     rr_policy_->ResetBackoffLocked();
1225   }
1226 }
1227 
PickLocked(PickState * pick,grpc_error ** error)1228 bool GrpcLb::PickLocked(PickState* pick, grpc_error** error) {
1229   PendingPick* pp = PendingPickCreate(pick);
1230   bool pick_done = false;
1231   if (rr_policy_ != nullptr) {
1232     if (grpc_lb_glb_trace.enabled()) {
1233       gpr_log(GPR_INFO, "[grpclb %p] about to PICK from RR %p", this,
1234               rr_policy_.get());
1235     }
1236     pick_done =
1237         PickFromRoundRobinPolicyLocked(false /* force_async */, pp, error);
1238   } else {  // rr_policy_ == NULL
1239     if (pick->on_complete == nullptr) {
1240       *error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
1241           "No pick result available but synchronous result required.");
1242       pick_done = true;
1243     } else {
1244       if (grpc_lb_glb_trace.enabled()) {
1245         gpr_log(GPR_INFO,
1246                 "[grpclb %p] No RR policy. Adding to grpclb's pending picks",
1247                 this);
1248       }
1249       AddPendingPick(pp);
1250       if (!started_picking_) {
1251         StartPickingLocked();
1252       }
1253       pick_done = false;
1254     }
1255   }
1256   return pick_done;
1257 }
1258 
FillChildRefsForChannelz(ChildRefsList * child_subchannels,ChildRefsList * child_channels)1259 void GrpcLb::FillChildRefsForChannelz(ChildRefsList* child_subchannels,
1260                                       ChildRefsList* child_channels) {
1261   // delegate to the RoundRobin to fill the children subchannels.
1262   rr_policy_->FillChildRefsForChannelz(child_subchannels, child_channels);
1263   MutexLock lock(&lb_channel_mu_);
1264   if (lb_channel_ != nullptr) {
1265     grpc_core::channelz::ChannelNode* channel_node =
1266         grpc_channel_get_channelz_node(lb_channel_);
1267     if (channel_node != nullptr) {
1268       child_channels->push_back(channel_node->uuid());
1269     }
1270   }
1271 }
1272 
CheckConnectivityLocked(grpc_error ** connectivity_error)1273 grpc_connectivity_state GrpcLb::CheckConnectivityLocked(
1274     grpc_error** connectivity_error) {
1275   return grpc_connectivity_state_get(&state_tracker_, connectivity_error);
1276 }
1277 
NotifyOnStateChangeLocked(grpc_connectivity_state * current,grpc_closure * notify)1278 void GrpcLb::NotifyOnStateChangeLocked(grpc_connectivity_state* current,
1279                                        grpc_closure* notify) {
1280   grpc_connectivity_state_notify_on_state_change(&state_tracker_, current,
1281                                                  notify);
1282 }
1283 
ProcessChannelArgsLocked(const grpc_channel_args & args)1284 void GrpcLb::ProcessChannelArgsLocked(const grpc_channel_args& args) {
1285   const grpc_arg* arg = grpc_channel_args_find(&args, GRPC_ARG_LB_ADDRESSES);
1286   if (GPR_UNLIKELY(arg == nullptr || arg->type != GRPC_ARG_POINTER)) {
1287     // Ignore this update.
1288     gpr_log(
1289         GPR_ERROR,
1290         "[grpclb %p] No valid LB addresses channel arg in update, ignoring.",
1291         this);
1292     return;
1293   }
1294   const grpc_lb_addresses* addresses =
1295       static_cast<const grpc_lb_addresses*>(arg->value.pointer.p);
1296   // Update fallback address list.
1297   if (fallback_backend_addresses_ != nullptr) {
1298     grpc_lb_addresses_destroy(fallback_backend_addresses_);
1299   }
1300   fallback_backend_addresses_ = ExtractBackendAddresses(addresses);
1301   // Make sure that GRPC_ARG_LB_POLICY_NAME is set in channel args,
1302   // since we use this to trigger the client_load_reporting filter.
1303   static const char* args_to_remove[] = {GRPC_ARG_LB_POLICY_NAME};
1304   grpc_arg new_arg = grpc_channel_arg_string_create(
1305       (char*)GRPC_ARG_LB_POLICY_NAME, (char*)"grpclb");
1306   grpc_channel_args_destroy(args_);
1307   args_ = grpc_channel_args_copy_and_add_and_remove(
1308       &args, args_to_remove, GPR_ARRAY_SIZE(args_to_remove), &new_arg, 1);
1309   // Construct args for balancer channel.
1310   grpc_channel_args* lb_channel_args =
1311       BuildBalancerChannelArgs(addresses, response_generator_.get(), &args);
1312   // Create balancer channel if needed.
1313   if (lb_channel_ == nullptr) {
1314     char* uri_str;
1315     gpr_asprintf(&uri_str, "fake:///%s", server_name_);
1316     gpr_mu_lock(&lb_channel_mu_);
1317     lb_channel_ = grpc_client_channel_factory_create_channel(
1318         client_channel_factory(), uri_str,
1319         GRPC_CLIENT_CHANNEL_TYPE_LOAD_BALANCING, lb_channel_args);
1320     gpr_mu_unlock(&lb_channel_mu_);
1321     GPR_ASSERT(lb_channel_ != nullptr);
1322     gpr_free(uri_str);
1323   }
1324   // Propagate updates to the LB channel (pick_first) through the fake
1325   // resolver.
1326   response_generator_->SetResponse(lb_channel_args);
1327   grpc_channel_args_destroy(lb_channel_args);
1328 }
1329 
UpdateLocked(const grpc_channel_args & args)1330 void GrpcLb::UpdateLocked(const grpc_channel_args& args) {
1331   ProcessChannelArgsLocked(args);
1332   // If fallback is configured and the RR policy already exists, update
1333   // it with the new fallback addresses.
1334   if (lb_fallback_timeout_ms_ > 0 && rr_policy_ != nullptr) {
1335     CreateOrUpdateRoundRobinPolicyLocked();
1336   }
1337   // Start watching the LB channel connectivity for connection, if not
1338   // already doing so.
1339   if (!watching_lb_channel_) {
1340     lb_channel_connectivity_ = grpc_channel_check_connectivity_state(
1341         lb_channel_, true /* try to connect */);
1342     grpc_channel_element* client_channel_elem = grpc_channel_stack_last_element(
1343         grpc_channel_get_channel_stack(lb_channel_));
1344     GPR_ASSERT(client_channel_elem->filter == &grpc_client_channel_filter);
1345     watching_lb_channel_ = true;
1346     // TODO(roth): We currently track this ref manually.  Once the
1347     // ClosureRef API is ready, we should pass the RefCountedPtr<> along
1348     // with the callback.
1349     auto self = Ref(DEBUG_LOCATION, "watch_lb_channel_connectivity");
1350     self.release();
1351     grpc_client_channel_watch_connectivity_state(
1352         client_channel_elem,
1353         grpc_polling_entity_create_from_pollset_set(interested_parties()),
1354         &lb_channel_connectivity_, &lb_channel_on_connectivity_changed_,
1355         nullptr);
1356   }
1357 }
1358 
1359 //
1360 // code for balancer channel and call
1361 //
1362 
StartPickingLocked()1363 void GrpcLb::StartPickingLocked() {
1364   // Start a timer to fall back.
1365   if (lb_fallback_timeout_ms_ > 0 && serverlist_ == nullptr &&
1366       !fallback_timer_callback_pending_) {
1367     grpc_millis deadline = ExecCtx::Get()->Now() + lb_fallback_timeout_ms_;
1368     // TODO(roth): We currently track this ref manually.  Once the
1369     // ClosureRef API is ready, we should pass the RefCountedPtr<> along
1370     // with the callback.
1371     auto self = Ref(DEBUG_LOCATION, "on_fallback_timer");
1372     self.release();
1373     GRPC_CLOSURE_INIT(&lb_on_fallback_, &GrpcLb::OnFallbackTimerLocked, this,
1374                       grpc_combiner_scheduler(combiner()));
1375     fallback_timer_callback_pending_ = true;
1376     grpc_timer_init(&lb_fallback_timer_, deadline, &lb_on_fallback_);
1377   }
1378   started_picking_ = true;
1379   StartBalancerCallLocked();
1380 }
1381 
StartBalancerCallLocked()1382 void GrpcLb::StartBalancerCallLocked() {
1383   GPR_ASSERT(lb_channel_ != nullptr);
1384   if (shutting_down_) return;
1385   // Init the LB call data.
1386   GPR_ASSERT(lb_calld_ == nullptr);
1387   lb_calld_ = MakeOrphanable<BalancerCallState>(Ref());
1388   if (grpc_lb_glb_trace.enabled()) {
1389     gpr_log(GPR_INFO,
1390             "[grpclb %p] Query for backends (lb_channel: %p, lb_calld: %p)",
1391             this, lb_channel_, lb_calld_.get());
1392   }
1393   lb_calld_->StartQuery();
1394 }
1395 
OnFallbackTimerLocked(void * arg,grpc_error * error)1396 void GrpcLb::OnFallbackTimerLocked(void* arg, grpc_error* error) {
1397   GrpcLb* grpclb_policy = static_cast<GrpcLb*>(arg);
1398   grpclb_policy->fallback_timer_callback_pending_ = false;
1399   // If we receive a serverlist after the timer fires but before this callback
1400   // actually runs, don't fall back.
1401   if (grpclb_policy->serverlist_ == nullptr && !grpclb_policy->shutting_down_ &&
1402       error == GRPC_ERROR_NONE) {
1403     if (grpc_lb_glb_trace.enabled()) {
1404       gpr_log(GPR_INFO,
1405               "[grpclb %p] Falling back to use backends from resolver",
1406               grpclb_policy);
1407     }
1408     GPR_ASSERT(grpclb_policy->fallback_backend_addresses_ != nullptr);
1409     grpclb_policy->CreateOrUpdateRoundRobinPolicyLocked();
1410   }
1411   grpclb_policy->Unref(DEBUG_LOCATION, "on_fallback_timer");
1412 }
1413 
StartBalancerCallRetryTimerLocked()1414 void GrpcLb::StartBalancerCallRetryTimerLocked() {
1415   grpc_millis next_try = lb_call_backoff_.NextAttemptTime();
1416   if (grpc_lb_glb_trace.enabled()) {
1417     gpr_log(GPR_INFO, "[grpclb %p] Connection to LB server lost...", this);
1418     grpc_millis timeout = next_try - ExecCtx::Get()->Now();
1419     if (timeout > 0) {
1420       gpr_log(GPR_INFO, "[grpclb %p] ... retry_timer_active in %" PRId64 "ms.",
1421               this, timeout);
1422     } else {
1423       gpr_log(GPR_INFO, "[grpclb %p] ... retry_timer_active immediately.",
1424               this);
1425     }
1426   }
1427   // TODO(roth): We currently track this ref manually.  Once the
1428   // ClosureRef API is ready, we should pass the RefCountedPtr<> along
1429   // with the callback.
1430   auto self = Ref(DEBUG_LOCATION, "on_balancer_call_retry_timer");
1431   self.release();
1432   GRPC_CLOSURE_INIT(&lb_on_call_retry_, &GrpcLb::OnBalancerCallRetryTimerLocked,
1433                     this, grpc_combiner_scheduler(combiner()));
1434   retry_timer_callback_pending_ = true;
1435   grpc_timer_init(&lb_call_retry_timer_, next_try, &lb_on_call_retry_);
1436 }
1437 
OnBalancerCallRetryTimerLocked(void * arg,grpc_error * error)1438 void GrpcLb::OnBalancerCallRetryTimerLocked(void* arg, grpc_error* error) {
1439   GrpcLb* grpclb_policy = static_cast<GrpcLb*>(arg);
1440   grpclb_policy->retry_timer_callback_pending_ = false;
1441   if (!grpclb_policy->shutting_down_ && error == GRPC_ERROR_NONE &&
1442       grpclb_policy->lb_calld_ == nullptr) {
1443     if (grpc_lb_glb_trace.enabled()) {
1444       gpr_log(GPR_INFO, "[grpclb %p] Restarting call to LB server",
1445               grpclb_policy);
1446     }
1447     grpclb_policy->StartBalancerCallLocked();
1448   }
1449   grpclb_policy->Unref(DEBUG_LOCATION, "on_balancer_call_retry_timer");
1450 }
1451 
1452 // Invoked as part of the update process. It continues watching the LB channel
1453 // until it shuts down or becomes READY. It's invoked even if the LB channel
1454 // stayed READY throughout the update (for example if the update is identical).
OnBalancerChannelConnectivityChangedLocked(void * arg,grpc_error * error)1455 void GrpcLb::OnBalancerChannelConnectivityChangedLocked(void* arg,
1456                                                         grpc_error* error) {
1457   GrpcLb* grpclb_policy = static_cast<GrpcLb*>(arg);
1458   if (grpclb_policy->shutting_down_) goto done;
1459   // Re-initialize the lb_call. This should also take care of updating the
1460   // embedded RR policy. Note that the current RR policy, if any, will stay in
1461   // effect until an update from the new lb_call is received.
1462   switch (grpclb_policy->lb_channel_connectivity_) {
1463     case GRPC_CHANNEL_CONNECTING:
1464     case GRPC_CHANNEL_TRANSIENT_FAILURE: {
1465       // Keep watching the LB channel.
1466       grpc_channel_element* client_channel_elem =
1467           grpc_channel_stack_last_element(
1468               grpc_channel_get_channel_stack(grpclb_policy->lb_channel_));
1469       GPR_ASSERT(client_channel_elem->filter == &grpc_client_channel_filter);
1470       grpc_client_channel_watch_connectivity_state(
1471           client_channel_elem,
1472           grpc_polling_entity_create_from_pollset_set(
1473               grpclb_policy->interested_parties()),
1474           &grpclb_policy->lb_channel_connectivity_,
1475           &grpclb_policy->lb_channel_on_connectivity_changed_, nullptr);
1476       break;
1477     }
1478       // The LB channel may be IDLE because it's shut down before the update.
1479       // Restart the LB call to kick the LB channel into gear.
1480     case GRPC_CHANNEL_IDLE:
1481     case GRPC_CHANNEL_READY:
1482       grpclb_policy->lb_calld_.reset();
1483       if (grpclb_policy->started_picking_) {
1484         if (grpclb_policy->retry_timer_callback_pending_) {
1485           grpc_timer_cancel(&grpclb_policy->lb_call_retry_timer_);
1486         }
1487         grpclb_policy->lb_call_backoff_.Reset();
1488         grpclb_policy->StartBalancerCallLocked();
1489       }
1490       // Fall through.
1491     case GRPC_CHANNEL_SHUTDOWN:
1492     done:
1493       grpclb_policy->watching_lb_channel_ = false;
1494       grpclb_policy->Unref(DEBUG_LOCATION,
1495                            "watch_lb_channel_connectivity_cb_shutdown");
1496   }
1497 }
1498 
1499 //
1500 // PendingPick
1501 //
1502 
1503 // Adds lb_token of selected subchannel (address) to the call's initial
1504 // metadata.
AddLbTokenToInitialMetadata(grpc_mdelem lb_token,grpc_linked_mdelem * lb_token_mdelem_storage,grpc_metadata_batch * initial_metadata)1505 grpc_error* AddLbTokenToInitialMetadata(
1506     grpc_mdelem lb_token, grpc_linked_mdelem* lb_token_mdelem_storage,
1507     grpc_metadata_batch* initial_metadata) {
1508   GPR_ASSERT(lb_token_mdelem_storage != nullptr);
1509   GPR_ASSERT(!GRPC_MDISNULL(lb_token));
1510   return grpc_metadata_batch_add_tail(initial_metadata, lb_token_mdelem_storage,
1511                                       lb_token);
1512 }
1513 
1514 // Destroy function used when embedding client stats in call context.
DestroyClientStats(void * arg)1515 void DestroyClientStats(void* arg) {
1516   static_cast<GrpcLbClientStats*>(arg)->Unref();
1517 }
1518 
PendingPickSetMetadataAndContext(PendingPick * pp)1519 void GrpcLb::PendingPickSetMetadataAndContext(PendingPick* pp) {
1520   /* if connected_subchannel is nullptr, no pick has been made by the RR
1521    * policy (e.g., all addresses failed to connect). There won't be any
1522    * user_data/token available */
1523   if (pp->pick->connected_subchannel != nullptr) {
1524     if (GPR_LIKELY(!GRPC_MDISNULL(pp->lb_token))) {
1525       AddLbTokenToInitialMetadata(GRPC_MDELEM_REF(pp->lb_token),
1526                                   &pp->pick->lb_token_mdelem_storage,
1527                                   pp->pick->initial_metadata);
1528     } else {
1529       gpr_log(GPR_ERROR,
1530               "[grpclb %p] No LB token for connected subchannel pick %p",
1531               pp->grpclb_policy, pp->pick);
1532       abort();
1533     }
1534     // Pass on client stats via context. Passes ownership of the reference.
1535     if (pp->client_stats != nullptr) {
1536       pp->pick->subchannel_call_context[GRPC_GRPCLB_CLIENT_STATS].value =
1537           pp->client_stats.release();
1538       pp->pick->subchannel_call_context[GRPC_GRPCLB_CLIENT_STATS].destroy =
1539           DestroyClientStats;
1540     }
1541   } else {
1542     pp->client_stats.reset();
1543   }
1544 }
1545 
1546 /* The \a on_complete closure passed as part of the pick requires keeping a
1547  * reference to its associated round robin instance. We wrap this closure in
1548  * order to unref the round robin instance upon its invocation */
OnPendingPickComplete(void * arg,grpc_error * error)1549 void GrpcLb::OnPendingPickComplete(void* arg, grpc_error* error) {
1550   PendingPick* pp = static_cast<PendingPick*>(arg);
1551   PendingPickSetMetadataAndContext(pp);
1552   GRPC_CLOSURE_SCHED(pp->original_on_complete, GRPC_ERROR_REF(error));
1553   Delete(pp);
1554 }
1555 
PendingPickCreate(PickState * pick)1556 GrpcLb::PendingPick* GrpcLb::PendingPickCreate(PickState* pick) {
1557   PendingPick* pp = New<PendingPick>();
1558   pp->grpclb_policy = this;
1559   pp->pick = pick;
1560   GRPC_CLOSURE_INIT(&pp->on_complete, &GrpcLb::OnPendingPickComplete, pp,
1561                     grpc_schedule_on_exec_ctx);
1562   pp->original_on_complete = pick->on_complete;
1563   pick->on_complete = &pp->on_complete;
1564   return pp;
1565 }
1566 
AddPendingPick(PendingPick * pp)1567 void GrpcLb::AddPendingPick(PendingPick* pp) {
1568   pp->next = pending_picks_;
1569   pending_picks_ = pp;
1570 }
1571 
1572 //
1573 // code for interacting with the RR policy
1574 //
1575 
1576 // Performs a pick over \a rr_policy_. Given that a pick can return
1577 // immediately (ignoring its completion callback), we need to perform the
1578 // cleanups this callback would otherwise be responsible for.
1579 // If \a force_async is true, then we will manually schedule the
1580 // completion callback even if the pick is available immediately.
PickFromRoundRobinPolicyLocked(bool force_async,PendingPick * pp,grpc_error ** error)1581 bool GrpcLb::PickFromRoundRobinPolicyLocked(bool force_async, PendingPick* pp,
1582                                             grpc_error** error) {
1583   // Check for drops if we are not using fallback backend addresses.
1584   if (serverlist_ != nullptr) {
1585     // Look at the index into the serverlist to see if we should drop this call.
1586     grpc_grpclb_server* server = serverlist_->servers[serverlist_index_++];
1587     if (serverlist_index_ == serverlist_->num_servers) {
1588       serverlist_index_ = 0;  // Wrap-around.
1589     }
1590     if (server->drop) {
1591       // Update client load reporting stats to indicate the number of
1592       // dropped calls.  Note that we have to do this here instead of in
1593       // the client_load_reporting filter, because we do not create a
1594       // subchannel call (and therefore no client_load_reporting filter)
1595       // for dropped calls.
1596       if (lb_calld_ != nullptr && lb_calld_->client_stats() != nullptr) {
1597         lb_calld_->client_stats()->AddCallDroppedLocked(
1598             server->load_balance_token);
1599       }
1600       if (force_async) {
1601         GRPC_CLOSURE_SCHED(pp->original_on_complete, GRPC_ERROR_NONE);
1602         Delete(pp);
1603         return false;
1604       }
1605       Delete(pp);
1606       return true;
1607     }
1608   }
1609   // Set client_stats and user_data.
1610   if (lb_calld_ != nullptr && lb_calld_->client_stats() != nullptr) {
1611     pp->client_stats = lb_calld_->client_stats()->Ref();
1612   }
1613   GPR_ASSERT(pp->pick->user_data == nullptr);
1614   pp->pick->user_data = (void**)&pp->lb_token;
1615   // Pick via the RR policy.
1616   bool pick_done = rr_policy_->PickLocked(pp->pick, error);
1617   if (pick_done) {
1618     PendingPickSetMetadataAndContext(pp);
1619     if (force_async) {
1620       GRPC_CLOSURE_SCHED(pp->original_on_complete, *error);
1621       *error = GRPC_ERROR_NONE;
1622       pick_done = false;
1623     }
1624     Delete(pp);
1625   }
1626   // else, the pending pick will be registered and taken care of by the
1627   // pending pick list inside the RR policy.  Eventually,
1628   // OnPendingPickComplete() will be called, which will (among other
1629   // things) add the LB token to the call's initial metadata.
1630   return pick_done;
1631 }
1632 
CreateRoundRobinPolicyLocked(const Args & args)1633 void GrpcLb::CreateRoundRobinPolicyLocked(const Args& args) {
1634   GPR_ASSERT(rr_policy_ == nullptr);
1635   rr_policy_ = LoadBalancingPolicyRegistry::CreateLoadBalancingPolicy(
1636       "round_robin", args);
1637   if (GPR_UNLIKELY(rr_policy_ == nullptr)) {
1638     gpr_log(GPR_ERROR, "[grpclb %p] Failure creating a RoundRobin policy",
1639             this);
1640     return;
1641   }
1642   // TODO(roth): We currently track this ref manually.  Once the new
1643   // ClosureRef API is done, pass the RefCountedPtr<> along with the closure.
1644   auto self = Ref(DEBUG_LOCATION, "on_rr_reresolution_requested");
1645   self.release();
1646   rr_policy_->SetReresolutionClosureLocked(&on_rr_request_reresolution_);
1647   grpc_error* rr_state_error = nullptr;
1648   rr_connectivity_state_ = rr_policy_->CheckConnectivityLocked(&rr_state_error);
1649   // Connectivity state is a function of the RR policy updated/created.
1650   UpdateConnectivityStateFromRoundRobinPolicyLocked(rr_state_error);
1651   // Add the gRPC LB's interested_parties pollset_set to that of the newly
1652   // created RR policy. This will make the RR policy progress upon activity on
1653   // gRPC LB, which in turn is tied to the application's call.
1654   grpc_pollset_set_add_pollset_set(rr_policy_->interested_parties(),
1655                                    interested_parties());
1656   // Subscribe to changes to the connectivity of the new RR.
1657   // TODO(roth): We currently track this ref manually.  Once the new
1658   // ClosureRef API is done, pass the RefCountedPtr<> along with the closure.
1659   self = Ref(DEBUG_LOCATION, "on_rr_connectivity_changed");
1660   self.release();
1661   rr_policy_->NotifyOnStateChangeLocked(&rr_connectivity_state_,
1662                                         &on_rr_connectivity_changed_);
1663   rr_policy_->ExitIdleLocked();
1664   // Send pending picks to RR policy.
1665   PendingPick* pp;
1666   while ((pp = pending_picks_)) {
1667     pending_picks_ = pp->next;
1668     if (grpc_lb_glb_trace.enabled()) {
1669       gpr_log(GPR_INFO,
1670               "[grpclb %p] Pending pick about to (async) PICK from RR %p", this,
1671               rr_policy_.get());
1672     }
1673     grpc_error* error = GRPC_ERROR_NONE;
1674     PickFromRoundRobinPolicyLocked(true /* force_async */, pp, &error);
1675   }
1676 }
1677 
CreateRoundRobinPolicyArgsLocked()1678 grpc_channel_args* GrpcLb::CreateRoundRobinPolicyArgsLocked() {
1679   grpc_lb_addresses* addresses;
1680   bool is_backend_from_grpclb_load_balancer = false;
1681   if (serverlist_ != nullptr) {
1682     GPR_ASSERT(serverlist_->num_servers > 0);
1683     addresses = ProcessServerlist(serverlist_);
1684     is_backend_from_grpclb_load_balancer = true;
1685   } else {
1686     // If CreateOrUpdateRoundRobinPolicyLocked() is invoked when we haven't
1687     // received any serverlist from the balancer, we use the fallback backends
1688     // returned by the resolver. Note that the fallback backend list may be
1689     // empty, in which case the new round_robin policy will keep the requested
1690     // picks pending.
1691     GPR_ASSERT(fallback_backend_addresses_ != nullptr);
1692     addresses = grpc_lb_addresses_copy(fallback_backend_addresses_);
1693   }
1694   GPR_ASSERT(addresses != nullptr);
1695   // Replace the LB addresses in the channel args that we pass down to
1696   // the subchannel.
1697   static const char* keys_to_remove[] = {GRPC_ARG_LB_ADDRESSES};
1698   const grpc_arg args_to_add[] = {
1699       grpc_lb_addresses_create_channel_arg(addresses),
1700       // A channel arg indicating if the target is a backend inferred from a
1701       // grpclb load balancer.
1702       grpc_channel_arg_integer_create(
1703           const_cast<char*>(
1704               GRPC_ARG_ADDRESS_IS_BACKEND_FROM_GRPCLB_LOAD_BALANCER),
1705           is_backend_from_grpclb_load_balancer),
1706   };
1707   grpc_channel_args* args = grpc_channel_args_copy_and_add_and_remove(
1708       args_, keys_to_remove, GPR_ARRAY_SIZE(keys_to_remove), args_to_add,
1709       GPR_ARRAY_SIZE(args_to_add));
1710   grpc_lb_addresses_destroy(addresses);
1711   return args;
1712 }
1713 
CreateOrUpdateRoundRobinPolicyLocked()1714 void GrpcLb::CreateOrUpdateRoundRobinPolicyLocked() {
1715   if (shutting_down_) return;
1716   grpc_channel_args* args = CreateRoundRobinPolicyArgsLocked();
1717   GPR_ASSERT(args != nullptr);
1718   if (rr_policy_ != nullptr) {
1719     if (grpc_lb_glb_trace.enabled()) {
1720       gpr_log(GPR_INFO, "[grpclb %p] Updating RR policy %p", this,
1721               rr_policy_.get());
1722     }
1723     rr_policy_->UpdateLocked(*args);
1724   } else {
1725     LoadBalancingPolicy::Args lb_policy_args;
1726     lb_policy_args.combiner = combiner();
1727     lb_policy_args.client_channel_factory = client_channel_factory();
1728     lb_policy_args.args = args;
1729     CreateRoundRobinPolicyLocked(lb_policy_args);
1730     if (grpc_lb_glb_trace.enabled()) {
1731       gpr_log(GPR_INFO, "[grpclb %p] Created new RR policy %p", this,
1732               rr_policy_.get());
1733     }
1734   }
1735   grpc_channel_args_destroy(args);
1736 }
1737 
OnRoundRobinRequestReresolutionLocked(void * arg,grpc_error * error)1738 void GrpcLb::OnRoundRobinRequestReresolutionLocked(void* arg,
1739                                                    grpc_error* error) {
1740   GrpcLb* grpclb_policy = static_cast<GrpcLb*>(arg);
1741   if (grpclb_policy->shutting_down_ || error != GRPC_ERROR_NONE) {
1742     grpclb_policy->Unref(DEBUG_LOCATION, "on_rr_reresolution_requested");
1743     return;
1744   }
1745   if (grpc_lb_glb_trace.enabled()) {
1746     gpr_log(
1747         GPR_INFO,
1748         "[grpclb %p] Re-resolution requested from the internal RR policy (%p).",
1749         grpclb_policy, grpclb_policy->rr_policy_.get());
1750   }
1751   // If we are talking to a balancer, we expect to get updated addresses form
1752   // the balancer, so we can ignore the re-resolution request from the RR
1753   // policy. Otherwise, handle the re-resolution request using the
1754   // grpclb policy's original re-resolution closure.
1755   if (grpclb_policy->lb_calld_ == nullptr ||
1756       !grpclb_policy->lb_calld_->seen_initial_response()) {
1757     grpclb_policy->TryReresolutionLocked(&grpc_lb_glb_trace, GRPC_ERROR_NONE);
1758   }
1759   // Give back the wrapper closure to the RR policy.
1760   grpclb_policy->rr_policy_->SetReresolutionClosureLocked(
1761       &grpclb_policy->on_rr_request_reresolution_);
1762 }
1763 
UpdateConnectivityStateFromRoundRobinPolicyLocked(grpc_error * rr_state_error)1764 void GrpcLb::UpdateConnectivityStateFromRoundRobinPolicyLocked(
1765     grpc_error* rr_state_error) {
1766   const grpc_connectivity_state curr_glb_state =
1767       grpc_connectivity_state_check(&state_tracker_);
1768   /* The new connectivity status is a function of the previous one and the new
1769    * input coming from the status of the RR policy.
1770    *
1771    *  current state (grpclb's)
1772    *  |
1773    *  v  || I  |  C  |  R  |  TF  |  SD  |  <- new state (RR's)
1774    *  ===++====+=====+=====+======+======+
1775    *   I || I  |  C  |  R  | [I]  | [I]  |
1776    *  ---++----+-----+-----+------+------+
1777    *   C || I  |  C  |  R  | [C]  | [C]  |
1778    *  ---++----+-----+-----+------+------+
1779    *   R || I  |  C  |  R  | [R]  | [R]  |
1780    *  ---++----+-----+-----+------+------+
1781    *  TF || I  |  C  |  R  | [TF] | [TF] |
1782    *  ---++----+-----+-----+------+------+
1783    *  SD || NA |  NA |  NA |  NA  |  NA  | (*)
1784    *  ---++----+-----+-----+------+------+
1785    *
1786    * A [STATE] indicates that the old RR policy is kept. In those cases, STATE
1787    * is the current state of grpclb, which is left untouched.
1788    *
1789    *  In summary, if the new state is TRANSIENT_FAILURE or SHUTDOWN, stick to
1790    *  the previous RR instance.
1791    *
1792    *  Note that the status is never updated to SHUTDOWN as a result of calling
1793    *  this function. Only glb_shutdown() has the power to set that state.
1794    *
1795    *  (*) This function mustn't be called during shutting down. */
1796   GPR_ASSERT(curr_glb_state != GRPC_CHANNEL_SHUTDOWN);
1797   switch (rr_connectivity_state_) {
1798     case GRPC_CHANNEL_TRANSIENT_FAILURE:
1799     case GRPC_CHANNEL_SHUTDOWN:
1800       GPR_ASSERT(rr_state_error != GRPC_ERROR_NONE);
1801       break;
1802     case GRPC_CHANNEL_IDLE:
1803     case GRPC_CHANNEL_CONNECTING:
1804     case GRPC_CHANNEL_READY:
1805       GPR_ASSERT(rr_state_error == GRPC_ERROR_NONE);
1806   }
1807   if (grpc_lb_glb_trace.enabled()) {
1808     gpr_log(
1809         GPR_INFO,
1810         "[grpclb %p] Setting grpclb's state to %s from new RR policy %p state.",
1811         this, grpc_connectivity_state_name(rr_connectivity_state_),
1812         rr_policy_.get());
1813   }
1814   grpc_connectivity_state_set(&state_tracker_, rr_connectivity_state_,
1815                               rr_state_error,
1816                               "update_lb_connectivity_status_locked");
1817 }
1818 
OnRoundRobinConnectivityChangedLocked(void * arg,grpc_error * error)1819 void GrpcLb::OnRoundRobinConnectivityChangedLocked(void* arg,
1820                                                    grpc_error* error) {
1821   GrpcLb* grpclb_policy = static_cast<GrpcLb*>(arg);
1822   if (grpclb_policy->shutting_down_) {
1823     grpclb_policy->Unref(DEBUG_LOCATION, "on_rr_connectivity_changed");
1824     return;
1825   }
1826   grpclb_policy->UpdateConnectivityStateFromRoundRobinPolicyLocked(
1827       GRPC_ERROR_REF(error));
1828   // Resubscribe. Reuse the "on_rr_connectivity_changed" ref.
1829   grpclb_policy->rr_policy_->NotifyOnStateChangeLocked(
1830       &grpclb_policy->rr_connectivity_state_,
1831       &grpclb_policy->on_rr_connectivity_changed_);
1832 }
1833 
1834 //
1835 // factory
1836 //
1837 
1838 class GrpcLbFactory : public LoadBalancingPolicyFactory {
1839  public:
CreateLoadBalancingPolicy(const LoadBalancingPolicy::Args & args) const1840   OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy(
1841       const LoadBalancingPolicy::Args& args) const override {
1842     /* Count the number of gRPC-LB addresses. There must be at least one. */
1843     const grpc_arg* arg =
1844         grpc_channel_args_find(args.args, GRPC_ARG_LB_ADDRESSES);
1845     if (arg == nullptr || arg->type != GRPC_ARG_POINTER) {
1846       return nullptr;
1847     }
1848     grpc_lb_addresses* addresses =
1849         static_cast<grpc_lb_addresses*>(arg->value.pointer.p);
1850     size_t num_grpclb_addrs = 0;
1851     for (size_t i = 0; i < addresses->num_addresses; ++i) {
1852       if (addresses->addresses[i].is_balancer) ++num_grpclb_addrs;
1853     }
1854     if (num_grpclb_addrs == 0) return nullptr;
1855     return OrphanablePtr<LoadBalancingPolicy>(New<GrpcLb>(addresses, args));
1856   }
1857 
name() const1858   const char* name() const override { return "grpclb"; }
1859 };
1860 
1861 }  // namespace
1862 
1863 }  // namespace grpc_core
1864 
1865 //
1866 // Plugin registration
1867 //
1868 
1869 namespace {
1870 
1871 // Only add client_load_reporting filter if the grpclb LB policy is used.
maybe_add_client_load_reporting_filter(grpc_channel_stack_builder * builder,void * arg)1872 bool maybe_add_client_load_reporting_filter(grpc_channel_stack_builder* builder,
1873                                             void* arg) {
1874   const grpc_channel_args* args =
1875       grpc_channel_stack_builder_get_channel_arguments(builder);
1876   const grpc_arg* channel_arg =
1877       grpc_channel_args_find(args, GRPC_ARG_LB_POLICY_NAME);
1878   if (channel_arg != nullptr && channel_arg->type == GRPC_ARG_STRING &&
1879       strcmp(channel_arg->value.string, "grpclb") == 0) {
1880     return grpc_channel_stack_builder_append_filter(
1881         builder, (const grpc_channel_filter*)arg, nullptr, nullptr);
1882   }
1883   return true;
1884 }
1885 
1886 }  // namespace
1887 
grpc_lb_policy_grpclb_init()1888 void grpc_lb_policy_grpclb_init() {
1889   grpc_core::LoadBalancingPolicyRegistry::Builder::
1890       RegisterLoadBalancingPolicyFactory(
1891           grpc_core::UniquePtr<grpc_core::LoadBalancingPolicyFactory>(
1892               grpc_core::New<grpc_core::GrpcLbFactory>()));
1893   grpc_channel_init_register_stage(GRPC_CLIENT_SUBCHANNEL,
1894                                    GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
1895                                    maybe_add_client_load_reporting_filter,
1896                                    (void*)&grpc_client_load_reporting_filter);
1897 }
1898 
// Plugin shutdown hook. It is intentionally a no-op.
void grpc_lb_policy_grpclb_shutdown() {
  // Nothing to clean up.
}
1900