1 /*
2  *
3  * Copyright 2018 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #include <grpc/support/port_platform.h>
20 
21 #include <inttypes.h>
22 #include <limits.h>
23 #include <string.h>
24 
25 #include "absl/container/inlined_vector.h"
26 #include "absl/strings/str_format.h"
27 #include "absl/strings/str_join.h"
28 #include "absl/strings/string_view.h"
29 
30 #include <grpc/byte_buffer_reader.h>
31 #include <grpc/grpc.h>
32 #include <grpc/support/alloc.h>
33 #include <grpc/support/time.h>
34 
35 #include "src/core/ext/filters/client_channel/client_channel.h"
36 #include "src/core/ext/filters/client_channel/service_config.h"
37 #include "src/core/ext/xds/xds_api.h"
38 #include "src/core/ext/xds/xds_channel_args.h"
39 #include "src/core/ext/xds/xds_client.h"
40 #include "src/core/ext/xds/xds_client_stats.h"
41 #include "src/core/lib/backoff/backoff.h"
42 #include "src/core/lib/channel/channel_args.h"
43 #include "src/core/lib/channel/channel_stack.h"
44 #include "src/core/lib/gpr/string.h"
45 #include "src/core/lib/gprpp/memory.h"
46 #include "src/core/lib/gprpp/orphanable.h"
47 #include "src/core/lib/gprpp/ref_counted_ptr.h"
48 #include "src/core/lib/gprpp/sync.h"
49 #include "src/core/lib/iomgr/sockaddr.h"
50 #include "src/core/lib/iomgr/sockaddr_utils.h"
51 #include "src/core/lib/iomgr/timer.h"
52 #include "src/core/lib/slice/slice_internal.h"
53 #include "src/core/lib/slice/slice_string_helpers.h"
54 #include "src/core/lib/surface/call.h"
55 #include "src/core/lib/surface/channel.h"
56 #include "src/core/lib/surface/channel_init.h"
57 #include "src/core/lib/transport/static_metadata.h"
58 
// Backoff parameters used by RetryableCall<> when restarting a failed xds
// call. The *_SECONDS values are converted to milliseconds (x1000) in the
// RetryableCall ctor.
#define GRPC_XDS_INITIAL_CONNECT_BACKOFF_SECONDS 1
#define GRPC_XDS_RECONNECT_BACKOFF_MULTIPLIER 1.6
#define GRPC_XDS_RECONNECT_MAX_BACKOFF_SECONDS 120
#define GRPC_XDS_RECONNECT_JITTER 0.2
// Minimum client load reporting interval, in milliseconds (per the name).
// NOTE(review): not referenced in the visible part of this file; presumably
// clamps the interval requested by the LRS server — confirm.
#define GRPC_XDS_MIN_CLIENT_LOAD_REPORTING_INTERVAL_MS 1000
64 
65 namespace grpc_core {
66 
// "xds_client" trace flag: when enabled, XdsClient activity (channel creation,
// call starts, retries, requests sent, timeouts) is logged at GPR_INFO.
TraceFlag grpc_xds_client_trace(false, "xds_client");
// "xds_client_refcount" trace flag: when enabled, the internally ref-counted
// xds objects (e.g. ChannelState, AdsCallState) are given a trace name so
// their ref/unref operations can be traced.
TraceFlag grpc_xds_client_refcount_trace(false, "xds_client_refcount");
69 
namespace {

// Process-wide xds client state.
// NOTE(review): only g_channel_args is used in the visible part of this file
// (as the base channel args in CreateXdsChannel()). g_mu presumably guards
// g_channel_args / g_xds_client — confirm against the rest of the file.
Mutex* g_mu = nullptr;
const grpc_channel_args* g_channel_args = nullptr;
XdsClient* g_xds_client = nullptr;

}  // namespace
77 
78 //
79 // Internal class declarations
80 //
81 
// An xds call wrapper that can restart a call upon failure. Holds a ref to
// the xds channel. The template parameter is the kind of wrapped xds call
// (AdsCallState or LrsCallState in this file). T must provide
// seen_response() and be constructible from RefCountedPtr<RetryableCall<T>>.
template <typename T>
class XdsClient::ChannelState::RetryableCall
    : public InternallyRefCounted<RetryableCall<T>> {
 public:
  explicit RetryableCall(RefCountedPtr<ChannelState> chand);

  // Drops the current call, cancels a pending retry timer and prevents any
  // further restarts.
  void Orphan() override;

  // Called when the wrapped call has finished. Restarts the call immediately
  // (with backoff reset) if it had seen a response; otherwise schedules a
  // retry after the next backoff delay.
  void OnCallFinishedLocked();

  // The current call, or nullptr while in retry backoff.
  T* calld() const { return calld_.get(); }
  ChannelState* chand() const { return chand_.get(); }

  bool IsCurrentCallOnChannel() const;

 private:
  void StartNewCallLocked();
  void StartRetryTimerLocked();
  static void OnRetryTimer(void* arg, grpc_error* error);
  void OnRetryTimerLocked(grpc_error* error);

  // The wrapped xds call that talks to the xds server. It's instantiated
  // every time we start a new call. It's null during call retry backoff.
  OrphanablePtr<T> calld_;
  // The owning xds channel.
  RefCountedPtr<ChannelState> chand_;

  // Retry state.
  BackOff backoff_;
  grpc_timer retry_timer_;
  grpc_closure on_retry_timer_;
  // True from grpc_timer_init() until OnRetryTimerLocked() runs.
  bool retry_timer_callback_pending_ = false;

  // Set by Orphan(); once true, no new call or retry timer is started.
  bool shutting_down_ = false;
};
119 
120 // Contains an ADS call to the xds server.
121 class XdsClient::ChannelState::AdsCallState
122     : public InternallyRefCounted<AdsCallState> {
123  public:
124   // The ctor and dtor should not be used directly.
125   explicit AdsCallState(RefCountedPtr<RetryableCall<AdsCallState>> parent);
126   ~AdsCallState() override;
127 
128   void Orphan() override;
129 
parent() const130   RetryableCall<AdsCallState>* parent() const { return parent_.get(); }
chand() const131   ChannelState* chand() const { return parent_->chand(); }
xds_client() const132   XdsClient* xds_client() const { return chand()->xds_client(); }
seen_response() const133   bool seen_response() const { return seen_response_; }
134 
135   void Subscribe(const std::string& type_url, const std::string& name);
136   void Unsubscribe(const std::string& type_url, const std::string& name,
137                    bool delay_unsubscription);
138 
139   bool HasSubscribedResources() const;
140 
141  private:
142   class ResourceState : public InternallyRefCounted<ResourceState> {
143    public:
ResourceState(const std::string & type_url,const std::string & name,bool sent_initial_request)144     ResourceState(const std::string& type_url, const std::string& name,
145                   bool sent_initial_request)
146         : type_url_(type_url),
147           name_(name),
148           sent_initial_request_(sent_initial_request) {
149       GRPC_CLOSURE_INIT(&timer_callback_, OnTimer, this,
150                         grpc_schedule_on_exec_ctx);
151     }
152 
Orphan()153     void Orphan() override {
154       Finish();
155       Unref(DEBUG_LOCATION, "Orphan");
156     }
157 
Start(RefCountedPtr<AdsCallState> ads_calld)158     void Start(RefCountedPtr<AdsCallState> ads_calld) {
159       if (sent_initial_request_) return;
160       sent_initial_request_ = true;
161       ads_calld_ = std::move(ads_calld);
162       Ref(DEBUG_LOCATION, "timer").release();
163       timer_pending_ = true;
164       grpc_timer_init(
165           &timer_,
166           ExecCtx::Get()->Now() + ads_calld_->xds_client()->request_timeout_,
167           &timer_callback_);
168     }
169 
Finish()170     void Finish() {
171       if (timer_pending_) {
172         grpc_timer_cancel(&timer_);
173         timer_pending_ = false;
174       }
175     }
176 
177    private:
OnTimer(void * arg,grpc_error * error)178     static void OnTimer(void* arg, grpc_error* error) {
179       ResourceState* self = static_cast<ResourceState*>(arg);
180       {
181         MutexLock lock(&self->ads_calld_->xds_client()->mu_);
182         self->OnTimerLocked(GRPC_ERROR_REF(error));
183       }
184       self->ads_calld_.reset();
185       self->Unref(DEBUG_LOCATION, "timer");
186     }
187 
OnTimerLocked(grpc_error * error)188     void OnTimerLocked(grpc_error* error) {
189       if (error == GRPC_ERROR_NONE && timer_pending_) {
190         timer_pending_ = false;
191         grpc_error* watcher_error = GRPC_ERROR_CREATE_FROM_COPIED_STRING(
192             absl::StrFormat(
193                 "timeout obtaining resource {type=%s name=%s} from xds server",
194                 type_url_, name_)
195                 .c_str());
196         if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
197           gpr_log(GPR_INFO, "[xds_client %p] %s", ads_calld_->xds_client(),
198                   grpc_error_string(watcher_error));
199         }
200         if (type_url_ == XdsApi::kLdsTypeUrl) {
201           ListenerState& state = ads_calld_->xds_client()->listener_map_[name_];
202           for (const auto& p : state.watchers) {
203             p.first->OnError(GRPC_ERROR_REF(watcher_error));
204           }
205         } else if (type_url_ == XdsApi::kRdsTypeUrl) {
206           RouteConfigState& state =
207               ads_calld_->xds_client()->route_config_map_[name_];
208           for (const auto& p : state.watchers) {
209             p.first->OnError(GRPC_ERROR_REF(watcher_error));
210           }
211         } else if (type_url_ == XdsApi::kCdsTypeUrl) {
212           ClusterState& state = ads_calld_->xds_client()->cluster_map_[name_];
213           for (const auto& p : state.watchers) {
214             p.first->OnError(GRPC_ERROR_REF(watcher_error));
215           }
216         } else if (type_url_ == XdsApi::kEdsTypeUrl) {
217           EndpointState& state = ads_calld_->xds_client()->endpoint_map_[name_];
218           for (const auto& p : state.watchers) {
219             p.first->OnError(GRPC_ERROR_REF(watcher_error));
220           }
221         } else {
222           GPR_UNREACHABLE_CODE(return );
223         }
224         GRPC_ERROR_UNREF(watcher_error);
225       }
226       GRPC_ERROR_UNREF(error);
227     }
228 
229     const std::string type_url_;
230     const std::string name_;
231 
232     RefCountedPtr<AdsCallState> ads_calld_;
233     bool sent_initial_request_;
234     bool timer_pending_ = false;
235     grpc_timer timer_;
236     grpc_closure timer_callback_;
237   };
238 
239   struct ResourceTypeState {
~ResourceTypeStategrpc_core::XdsClient::ChannelState::AdsCallState::ResourceTypeState240     ~ResourceTypeState() { GRPC_ERROR_UNREF(error); }
241 
242     // Nonce and error for this resource type.
243     std::string nonce;
244     grpc_error* error = GRPC_ERROR_NONE;
245 
246     // Subscribed resources of this type.
247     std::map<std::string /* name */, OrphanablePtr<ResourceState>>
248         subscribed_resources;
249   };
250 
251   void SendMessageLocked(const std::string& type_url);
252 
253   void AcceptLdsUpdate(XdsApi::LdsUpdateMap lds_update_map);
254   void AcceptRdsUpdate(XdsApi::RdsUpdateMap rds_update_map);
255   void AcceptCdsUpdate(XdsApi::CdsUpdateMap cds_update_map);
256   void AcceptEdsUpdate(XdsApi::EdsUpdateMap eds_update_map);
257 
258   static void OnRequestSent(void* arg, grpc_error* error);
259   void OnRequestSentLocked(grpc_error* error);
260   static void OnResponseReceived(void* arg, grpc_error* error);
261   bool OnResponseReceivedLocked();
262   static void OnStatusReceived(void* arg, grpc_error* error);
263   void OnStatusReceivedLocked(grpc_error* error);
264 
265   bool IsCurrentCallOnChannel() const;
266 
267   std::set<absl::string_view> ResourceNamesForRequest(
268       const std::string& type_url);
269 
270   // The owning RetryableCall<>.
271   RefCountedPtr<RetryableCall<AdsCallState>> parent_;
272 
273   bool sent_initial_message_ = false;
274   bool seen_response_ = false;
275 
276   // Always non-NULL.
277   grpc_call* call_;
278 
279   // recv_initial_metadata
280   grpc_metadata_array initial_metadata_recv_;
281 
282   // send_message
283   grpc_byte_buffer* send_message_payload_ = nullptr;
284   grpc_closure on_request_sent_;
285 
286   // recv_message
287   grpc_byte_buffer* recv_message_payload_ = nullptr;
288   grpc_closure on_response_received_;
289 
290   // recv_trailing_metadata
291   grpc_metadata_array trailing_metadata_recv_;
292   grpc_status_code status_code_;
293   grpc_slice status_details_;
294   grpc_closure on_status_received_;
295 
296   // Resource types for which requests need to be sent.
297   std::set<std::string /*type_url*/> buffered_requests_;
298 
299   // State for each resource type.
300   std::map<std::string /*type_url*/, ResourceTypeState> state_map_;
301 };
302 
303 // Contains an LRS call to the xds server.
304 class XdsClient::ChannelState::LrsCallState
305     : public InternallyRefCounted<LrsCallState> {
306  public:
307   // The ctor and dtor should not be used directly.
308   explicit LrsCallState(RefCountedPtr<RetryableCall<LrsCallState>> parent);
309   ~LrsCallState() override;
310 
311   void Orphan() override;
312 
313   void MaybeStartReportingLocked();
314 
parent()315   RetryableCall<LrsCallState>* parent() { return parent_.get(); }
chand() const316   ChannelState* chand() const { return parent_->chand(); }
xds_client() const317   XdsClient* xds_client() const { return chand()->xds_client(); }
seen_response() const318   bool seen_response() const { return seen_response_; }
319 
320  private:
321   // Reports client-side load stats according to a fixed interval.
322   class Reporter : public InternallyRefCounted<Reporter> {
323    public:
Reporter(RefCountedPtr<LrsCallState> parent,grpc_millis report_interval)324     Reporter(RefCountedPtr<LrsCallState> parent, grpc_millis report_interval)
325         : parent_(std::move(parent)), report_interval_(report_interval) {
326       GRPC_CLOSURE_INIT(&on_next_report_timer_, OnNextReportTimer, this,
327                         grpc_schedule_on_exec_ctx);
328       GRPC_CLOSURE_INIT(&on_report_done_, OnReportDone, this,
329                         grpc_schedule_on_exec_ctx);
330       ScheduleNextReportLocked();
331     }
332 
333     void Orphan() override;
334 
335    private:
336     void ScheduleNextReportLocked();
337     static void OnNextReportTimer(void* arg, grpc_error* error);
338     bool OnNextReportTimerLocked(grpc_error* error);
339     bool SendReportLocked();
340     static void OnReportDone(void* arg, grpc_error* error);
341     bool OnReportDoneLocked(grpc_error* error);
342 
IsCurrentReporterOnCall() const343     bool IsCurrentReporterOnCall() const {
344       return this == parent_->reporter_.get();
345     }
xds_client() const346     XdsClient* xds_client() const { return parent_->xds_client(); }
347 
348     // The owning LRS call.
349     RefCountedPtr<LrsCallState> parent_;
350 
351     // The load reporting state.
352     const grpc_millis report_interval_;
353     bool last_report_counters_were_zero_ = false;
354     bool next_report_timer_callback_pending_ = false;
355     grpc_timer next_report_timer_;
356     grpc_closure on_next_report_timer_;
357     grpc_closure on_report_done_;
358   };
359 
360   static void OnInitialRequestSent(void* arg, grpc_error* error);
361   void OnInitialRequestSentLocked();
362   static void OnResponseReceived(void* arg, grpc_error* error);
363   bool OnResponseReceivedLocked();
364   static void OnStatusReceived(void* arg, grpc_error* error);
365   void OnStatusReceivedLocked(grpc_error* error);
366 
367   bool IsCurrentCallOnChannel() const;
368 
369   // The owning RetryableCall<>.
370   RefCountedPtr<RetryableCall<LrsCallState>> parent_;
371   bool seen_response_ = false;
372 
373   // Always non-NULL.
374   grpc_call* call_;
375 
376   // recv_initial_metadata
377   grpc_metadata_array initial_metadata_recv_;
378 
379   // send_message
380   grpc_byte_buffer* send_message_payload_ = nullptr;
381   grpc_closure on_initial_request_sent_;
382 
383   // recv_message
384   grpc_byte_buffer* recv_message_payload_ = nullptr;
385   grpc_closure on_response_received_;
386 
387   // recv_trailing_metadata
388   grpc_metadata_array trailing_metadata_recv_;
389   grpc_status_code status_code_;
390   grpc_slice status_details_;
391   grpc_closure on_status_received_;
392 
393   // Load reporting state.
394   bool send_all_clusters_ = false;
395   std::set<std::string> cluster_names_;  // Asked for by the LRS server.
396   grpc_millis load_reporting_interval_ = 0;
397   OrphanablePtr<Reporter> reporter_;
398 };
399 
400 //
401 // XdsClient::ChannelState::StateWatcher
402 //
403 
404 class XdsClient::ChannelState::StateWatcher
405     : public AsyncConnectivityStateWatcherInterface {
406  public:
StateWatcher(RefCountedPtr<ChannelState> parent)407   explicit StateWatcher(RefCountedPtr<ChannelState> parent)
408       : parent_(std::move(parent)) {}
409 
410  private:
OnConnectivityStateChange(grpc_connectivity_state new_state,const absl::Status & status)411   void OnConnectivityStateChange(grpc_connectivity_state new_state,
412                                  const absl::Status& status) override {
413     MutexLock lock(&parent_->xds_client_->mu_);
414     if (!parent_->shutting_down_ &&
415         new_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
416       // In TRANSIENT_FAILURE.  Notify all watchers of error.
417       gpr_log(GPR_INFO,
418               "[xds_client %p] xds channel in state:TRANSIENT_FAILURE "
419               "status_message:(%s)",
420               parent_->xds_client(), status.ToString().c_str());
421       parent_->xds_client()->NotifyOnErrorLocked(
422           GRPC_ERROR_CREATE_FROM_STATIC_STRING(
423               "xds channel in TRANSIENT_FAILURE"));
424     }
425   }
426 
427   RefCountedPtr<ChannelState> parent_;
428 };
429 
430 //
431 // XdsClient::ChannelState
432 //
433 
434 namespace {
435 
CreateXdsChannel(const XdsBootstrap::XdsServer & server)436 grpc_channel* CreateXdsChannel(const XdsBootstrap::XdsServer& server) {
437   // Build channel args.
438   absl::InlinedVector<grpc_arg, 2> args_to_add = {
439       grpc_channel_arg_integer_create(
440           const_cast<char*>(GRPC_ARG_KEEPALIVE_TIME_MS),
441           5 * 60 * GPR_MS_PER_SEC),
442       grpc_channel_arg_integer_create(
443           const_cast<char*>(GRPC_ARG_CHANNELZ_IS_INTERNAL_CHANNEL), 1),
444   };
445   grpc_channel_args* new_args = grpc_channel_args_copy_and_add(
446       g_channel_args, args_to_add.data(), args_to_add.size());
447   // Create channel creds.
448   RefCountedPtr<grpc_channel_credentials> channel_creds =
449       XdsChannelCredsRegistry::MakeChannelCreds(server.channel_creds_type,
450                                                 server.channel_creds_config);
451   // Create channel.
452   grpc_channel* channel = grpc_secure_channel_create(
453       channel_creds.get(), server.server_uri.c_str(), new_args, nullptr);
454   grpc_channel_args_destroy(new_args);
455   return channel;
456 }
457 
458 }  // namespace
459 
ChannelState(WeakRefCountedPtr<XdsClient> xds_client,const XdsBootstrap::XdsServer & server)460 XdsClient::ChannelState::ChannelState(WeakRefCountedPtr<XdsClient> xds_client,
461                                       const XdsBootstrap::XdsServer& server)
462     : InternallyRefCounted<ChannelState>(
463           GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_refcount_trace)
464               ? "ChannelState"
465               : nullptr),
466       xds_client_(std::move(xds_client)),
467       server_(server) {
468   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
469     gpr_log(GPR_INFO, "[xds_client %p] creating channel to %s",
470             xds_client_.get(), server.server_uri.c_str());
471   }
472   channel_ = CreateXdsChannel(server);
473   GPR_ASSERT(channel_ != nullptr);
474   StartConnectivityWatchLocked();
475 }
476 
~ChannelState()477 XdsClient::ChannelState::~ChannelState() {
478   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
479     gpr_log(GPR_INFO, "[xds_client %p] Destroying xds channel %p", xds_client(),
480             this);
481   }
482   grpc_channel_destroy(channel_);
483   xds_client_.reset(DEBUG_LOCATION, "ChannelState");
484 }
485 
Orphan()486 void XdsClient::ChannelState::Orphan() {
487   shutting_down_ = true;
488   CancelConnectivityWatchLocked();
489   ads_calld_.reset();
490   lrs_calld_.reset();
491   Unref(DEBUG_LOCATION, "ChannelState+orphaned");
492 }
493 
ads_calld() const494 XdsClient::ChannelState::AdsCallState* XdsClient::ChannelState::ads_calld()
495     const {
496   return ads_calld_->calld();
497 }
498 
lrs_calld() const499 XdsClient::ChannelState::LrsCallState* XdsClient::ChannelState::lrs_calld()
500     const {
501   return lrs_calld_->calld();
502 }
503 
HasActiveAdsCall() const504 bool XdsClient::ChannelState::HasActiveAdsCall() const {
505   return ads_calld_->calld() != nullptr;
506 }
507 
MaybeStartLrsCall()508 void XdsClient::ChannelState::MaybeStartLrsCall() {
509   if (lrs_calld_ != nullptr) return;
510   lrs_calld_.reset(
511       new RetryableCall<LrsCallState>(Ref(DEBUG_LOCATION, "ChannelState+lrs")));
512 }
513 
StopLrsCall()514 void XdsClient::ChannelState::StopLrsCall() { lrs_calld_.reset(); }
515 
StartConnectivityWatchLocked()516 void XdsClient::ChannelState::StartConnectivityWatchLocked() {
517   grpc_channel_element* client_channel_elem =
518       grpc_channel_stack_last_element(grpc_channel_get_channel_stack(channel_));
519   GPR_ASSERT(client_channel_elem->filter == &grpc_client_channel_filter);
520   watcher_ = new StateWatcher(Ref(DEBUG_LOCATION, "ChannelState+watch"));
521   grpc_client_channel_start_connectivity_watch(
522       client_channel_elem, GRPC_CHANNEL_IDLE,
523       OrphanablePtr<AsyncConnectivityStateWatcherInterface>(watcher_));
524 }
525 
CancelConnectivityWatchLocked()526 void XdsClient::ChannelState::CancelConnectivityWatchLocked() {
527   grpc_channel_element* client_channel_elem =
528       grpc_channel_stack_last_element(grpc_channel_get_channel_stack(channel_));
529   GPR_ASSERT(client_channel_elem->filter == &grpc_client_channel_filter);
530   grpc_client_channel_stop_connectivity_watch(client_channel_elem, watcher_);
531 }
532 
Subscribe(const std::string & type_url,const std::string & name)533 void XdsClient::ChannelState::Subscribe(const std::string& type_url,
534                                         const std::string& name) {
535   if (ads_calld_ == nullptr) {
536     // Start the ADS call if this is the first request.
537     ads_calld_.reset(new RetryableCall<AdsCallState>(
538         Ref(DEBUG_LOCATION, "ChannelState+ads")));
539     // Note: AdsCallState's ctor will automatically subscribe to all
540     // resources that the XdsClient already has watchers for, so we can
541     // return here.
542     return;
543   }
544   // If the ADS call is in backoff state, we don't need to do anything now
545   // because when the call is restarted it will resend all necessary requests.
546   if (ads_calld() == nullptr) return;
547   // Subscribe to this resource if the ADS call is active.
548   ads_calld()->Subscribe(type_url, name);
549 }
550 
Unsubscribe(const std::string & type_url,const std::string & name,bool delay_unsubscription)551 void XdsClient::ChannelState::Unsubscribe(const std::string& type_url,
552                                           const std::string& name,
553                                           bool delay_unsubscription) {
554   if (ads_calld_ != nullptr) {
555     auto* calld = ads_calld_->calld();
556     if (calld != nullptr) {
557       calld->Unsubscribe(type_url, name, delay_unsubscription);
558       if (!calld->HasSubscribedResources()) ads_calld_.reset();
559     }
560   }
561 }
562 
563 //
564 // XdsClient::ChannelState::RetryableCall<>
565 //
566 
// Creates the wrapper and immediately starts the first call. Retries use
// BackOff with the GRPC_XDS_* parameters: initial 1s, multiplier 1.6,
// jitter 0.2, max 120s (the seconds values are converted to ms here).
template <typename T>
XdsClient::ChannelState::RetryableCall<T>::RetryableCall(
    RefCountedPtr<ChannelState> chand)
    : chand_(std::move(chand)),
      backoff_(
          BackOff::Options()
              .set_initial_backoff(GRPC_XDS_INITIAL_CONNECT_BACKOFF_SECONDS *
                                   1000)
              .set_multiplier(GRPC_XDS_RECONNECT_BACKOFF_MULTIPLIER)
              .set_jitter(GRPC_XDS_RECONNECT_JITTER)
              .set_max_backoff(GRPC_XDS_RECONNECT_MAX_BACKOFF_SECONDS * 1000)) {
  // Closure Initialization
  GRPC_CLOSURE_INIT(&on_retry_timer_, OnRetryTimer, this,
                    grpc_schedule_on_exec_ctx);
  StartNewCallLocked();
}
583 
// Shuts down: prevents further restarts, drops the current call, cancels a
// pending retry timer, and releases this object's own ref. The timer's ref
// is released by OnRetryTimer() when the cancelled timer's callback runs.
template <typename T>
void XdsClient::ChannelState::RetryableCall<T>::Orphan() {
  shutting_down_ = true;
  calld_.reset();
  if (retry_timer_callback_pending_) grpc_timer_cancel(&retry_timer_);
  this->Unref(DEBUG_LOCATION, "RetryableCall+orphaned");
}
591 
592 template <typename T>
OnCallFinishedLocked()593 void XdsClient::ChannelState::RetryableCall<T>::OnCallFinishedLocked() {
594   const bool seen_response = calld_->seen_response();
595   calld_.reset();
596   if (seen_response) {
597     // If we lost connection to the xds server, reset backoff and restart the
598     // call immediately.
599     backoff_.Reset();
600     StartNewCallLocked();
601   } else {
602     // If we failed to connect to the xds server, retry later.
603     StartRetryTimerLocked();
604   }
605 }
606 
// Creates a new T for this wrapper, unless shutting down. The new call holds
// a ref to this RetryableCall. Asserts that the channel exists and that no
// call is currently active.
template <typename T>
void XdsClient::ChannelState::RetryableCall<T>::StartNewCallLocked() {
  if (shutting_down_) return;
  GPR_ASSERT(chand_->channel_ != nullptr);
  GPR_ASSERT(calld_ == nullptr);
  if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
    gpr_log(GPR_INFO,
            "[xds_client %p] Start new call from retryable call (chand: %p, "
            "retryable call: %p)",
            chand()->xds_client(), chand(), this);
  }
  calld_ = MakeOrphanable<T>(
      this->Ref(DEBUG_LOCATION, "RetryableCall+start_new_call"));
}
621 
// Schedules OnRetryTimer() at the next backoff attempt time, unless shutting
// down. A ref is taken for the timer and released in OnRetryTimer().
template <typename T>
void XdsClient::ChannelState::RetryableCall<T>::StartRetryTimerLocked() {
  if (shutting_down_) return;
  const grpc_millis next_attempt_time = backoff_.NextAttemptTime();
  if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
    // Clamp at 0 so the log never shows a negative delay.
    grpc_millis timeout = GPR_MAX(next_attempt_time - ExecCtx::Get()->Now(), 0);
    gpr_log(GPR_INFO,
            "[xds_client %p] Failed to connect to xds server (chand: %p) "
            "retry timer will fire in %" PRId64 "ms.",
            chand()->xds_client(), chand(), timeout);
  }
  this->Ref(DEBUG_LOCATION, "RetryableCall+retry_timer_start").release();
  grpc_timer_init(&retry_timer_, next_attempt_time, &on_retry_timer_);
  retry_timer_callback_pending_ = true;
}
637 
638 template <typename T>
OnRetryTimer(void * arg,grpc_error * error)639 void XdsClient::ChannelState::RetryableCall<T>::OnRetryTimer(
640     void* arg, grpc_error* error) {
641   RetryableCall* calld = static_cast<RetryableCall*>(arg);
642   {
643     MutexLock lock(&calld->chand_->xds_client()->mu_);
644     calld->OnRetryTimerLocked(GRPC_ERROR_REF(error));
645   }
646   calld->Unref(DEBUG_LOCATION, "RetryableCall+retry_timer_done");
647 }
648 
649 template <typename T>
OnRetryTimerLocked(grpc_error * error)650 void XdsClient::ChannelState::RetryableCall<T>::OnRetryTimerLocked(
651     grpc_error* error) {
652   retry_timer_callback_pending_ = false;
653   if (!shutting_down_ && error == GRPC_ERROR_NONE) {
654     if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
655       gpr_log(
656           GPR_INFO,
657           "[xds_client %p] Retry timer fires (chand: %p, retryable call: %p)",
658           chand()->xds_client(), chand(), this);
659     }
660     StartNewCallLocked();
661   }
662   GRPC_ERROR_UNREF(error);
663 }
664 
665 //
666 // XdsClient::ChannelState::AdsCallState
667 //
668 
AdsCallState(RefCountedPtr<RetryableCall<AdsCallState>> parent)669 XdsClient::ChannelState::AdsCallState::AdsCallState(
670     RefCountedPtr<RetryableCall<AdsCallState>> parent)
671     : InternallyRefCounted<AdsCallState>(
672           GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_refcount_trace)
673               ? "AdsCallState"
674               : nullptr),
675       parent_(std::move(parent)) {
676   // Init the ADS call. Note that the call will progress every time there's
677   // activity in xds_client()->interested_parties_, which is comprised of
678   // the polling entities from client_channel.
679   GPR_ASSERT(xds_client() != nullptr);
680   // Create a call with the specified method name.
681   const auto& method =
682       chand()->server_.ShouldUseV3()
683           ? GRPC_MDSTR_SLASH_ENVOY_DOT_SERVICE_DOT_DISCOVERY_DOT_V3_DOT_AGGREGATEDDISCOVERYSERVICE_SLASH_STREAMAGGREGATEDRESOURCES
684           : GRPC_MDSTR_SLASH_ENVOY_DOT_SERVICE_DOT_DISCOVERY_DOT_V2_DOT_AGGREGATEDDISCOVERYSERVICE_SLASH_STREAMAGGREGATEDRESOURCES;
685   call_ = grpc_channel_create_pollset_set_call(
686       chand()->channel_, nullptr, GRPC_PROPAGATE_DEFAULTS,
687       xds_client()->interested_parties_, method, nullptr,
688       GRPC_MILLIS_INF_FUTURE, nullptr);
689   GPR_ASSERT(call_ != nullptr);
690   // Init data associated with the call.
691   grpc_metadata_array_init(&initial_metadata_recv_);
692   grpc_metadata_array_init(&trailing_metadata_recv_);
693   // Start the call.
694   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
695     gpr_log(GPR_INFO,
696             "[xds_client %p] Starting ADS call (chand: %p, calld: %p, "
697             "call: %p)",
698             xds_client(), chand(), this, call_);
699   }
700   // Create the ops.
701   grpc_call_error call_error;
702   grpc_op ops[3];
703   memset(ops, 0, sizeof(ops));
704   // Op: send initial metadata.
705   grpc_op* op = ops;
706   op->op = GRPC_OP_SEND_INITIAL_METADATA;
707   op->data.send_initial_metadata.count = 0;
708   op->flags = GRPC_INITIAL_METADATA_WAIT_FOR_READY |
709               GRPC_INITIAL_METADATA_WAIT_FOR_READY_EXPLICITLY_SET;
710   op->reserved = nullptr;
711   op++;
712   call_error = grpc_call_start_batch_and_execute(
713       call_, ops, static_cast<size_t>(op - ops), nullptr);
714   GPR_ASSERT(GRPC_CALL_OK == call_error);
715   // Op: send request message.
716   GRPC_CLOSURE_INIT(&on_request_sent_, OnRequestSent, this,
717                     grpc_schedule_on_exec_ctx);
718   for (const auto& p : xds_client()->listener_map_) {
719     Subscribe(XdsApi::kLdsTypeUrl, std::string(p.first));
720   }
721   for (const auto& p : xds_client()->route_config_map_) {
722     Subscribe(XdsApi::kRdsTypeUrl, std::string(p.first));
723   }
724   for (const auto& p : xds_client()->cluster_map_) {
725     Subscribe(XdsApi::kCdsTypeUrl, std::string(p.first));
726   }
727   for (const auto& p : xds_client()->endpoint_map_) {
728     Subscribe(XdsApi::kEdsTypeUrl, std::string(p.first));
729   }
730   // Op: recv initial metadata.
731   op = ops;
732   op->op = GRPC_OP_RECV_INITIAL_METADATA;
733   op->data.recv_initial_metadata.recv_initial_metadata =
734       &initial_metadata_recv_;
735   op->flags = 0;
736   op->reserved = nullptr;
737   op++;
738   // Op: recv response.
739   op->op = GRPC_OP_RECV_MESSAGE;
740   op->data.recv_message.recv_message = &recv_message_payload_;
741   op->flags = 0;
742   op->reserved = nullptr;
743   op++;
744   Ref(DEBUG_LOCATION, "ADS+OnResponseReceivedLocked").release();
745   GRPC_CLOSURE_INIT(&on_response_received_, OnResponseReceived, this,
746                     grpc_schedule_on_exec_ctx);
747   call_error = grpc_call_start_batch_and_execute(
748       call_, ops, static_cast<size_t>(op - ops), &on_response_received_);
749   GPR_ASSERT(GRPC_CALL_OK == call_error);
750   // Op: recv server status.
751   op = ops;
752   op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
753   op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv_;
754   op->data.recv_status_on_client.status = &status_code_;
755   op->data.recv_status_on_client.status_details = &status_details_;
756   op->flags = 0;
757   op->reserved = nullptr;
758   op++;
759   // This callback signals the end of the call, so it relies on the initial
760   // ref instead of a new ref. When it's invoked, it's the initial ref that is
761   // unreffed.
762   GRPC_CLOSURE_INIT(&on_status_received_, OnStatusReceived, this,
763                     grpc_schedule_on_exec_ctx);
764   call_error = grpc_call_start_batch_and_execute(
765       call_, ops, static_cast<size_t>(op - ops), &on_status_received_);
766   GPR_ASSERT(GRPC_CALL_OK == call_error);
767 }
768 
~AdsCallState()769 XdsClient::ChannelState::AdsCallState::~AdsCallState() {
770   grpc_metadata_array_destroy(&initial_metadata_recv_);
771   grpc_metadata_array_destroy(&trailing_metadata_recv_);
772   grpc_byte_buffer_destroy(send_message_payload_);
773   grpc_byte_buffer_destroy(recv_message_payload_);
774   grpc_slice_unref_internal(status_details_);
775   GPR_ASSERT(call_ != nullptr);
776   grpc_call_unref(call_);
777 }
778 
Orphan()779 void XdsClient::ChannelState::AdsCallState::Orphan() {
780   GPR_ASSERT(call_ != nullptr);
781   // If we are here because xds_client wants to cancel the call,
782   // on_status_received_ will complete the cancellation and clean up. Otherwise,
783   // we are here because xds_client has to orphan a failed call, then the
784   // following cancellation will be a no-op.
785   grpc_call_cancel_internal(call_);
786   state_map_.clear();
787   // Note that the initial ref is hold by on_status_received_. So the
788   // corresponding unref happens in on_status_received_ instead of here.
789 }
790 
SendMessageLocked(const std::string & type_url)791 void XdsClient::ChannelState::AdsCallState::SendMessageLocked(
792     const std::string& type_url) {
793   // Buffer message sending if an existing message is in flight.
794   if (send_message_payload_ != nullptr) {
795     buffered_requests_.insert(type_url);
796     return;
797   }
798   auto& state = state_map_[type_url];
799   grpc_slice request_payload_slice;
800   std::set<absl::string_view> resource_names =
801       ResourceNamesForRequest(type_url);
802   request_payload_slice = xds_client()->api_.CreateAdsRequest(
803       chand()->server_, type_url, resource_names,
804       xds_client()->resource_version_map_[type_url], state.nonce,
805       GRPC_ERROR_REF(state.error), !sent_initial_message_);
806   if (type_url != XdsApi::kLdsTypeUrl && type_url != XdsApi::kRdsTypeUrl &&
807       type_url != XdsApi::kCdsTypeUrl && type_url != XdsApi::kEdsTypeUrl) {
808     state_map_.erase(type_url);
809   }
810   sent_initial_message_ = true;
811   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
812     gpr_log(GPR_INFO,
813             "[xds_client %p] sending ADS request: type=%s version=%s nonce=%s "
814             "error=%s resources=%s",
815             xds_client(), type_url.c_str(),
816             xds_client()->resource_version_map_[type_url].c_str(),
817             state.nonce.c_str(), grpc_error_string(state.error),
818             absl::StrJoin(resource_names, " ").c_str());
819   }
820   GRPC_ERROR_UNREF(state.error);
821   state.error = GRPC_ERROR_NONE;
822   // Create message payload.
823   send_message_payload_ =
824       grpc_raw_byte_buffer_create(&request_payload_slice, 1);
825   grpc_slice_unref_internal(request_payload_slice);
826   // Send the message.
827   grpc_op op;
828   memset(&op, 0, sizeof(op));
829   op.op = GRPC_OP_SEND_MESSAGE;
830   op.data.send_message.send_message = send_message_payload_;
831   Ref(DEBUG_LOCATION, "ADS+OnRequestSentLocked").release();
832   GRPC_CLOSURE_INIT(&on_request_sent_, OnRequestSent, this,
833                     grpc_schedule_on_exec_ctx);
834   grpc_call_error call_error =
835       grpc_call_start_batch_and_execute(call_, &op, 1, &on_request_sent_);
836   if (GPR_UNLIKELY(call_error != GRPC_CALL_OK)) {
837     gpr_log(GPR_ERROR,
838             "[xds_client %p] calld=%p call_error=%d sending ADS message",
839             xds_client(), this, call_error);
840     GPR_ASSERT(GRPC_CALL_OK == call_error);
841   }
842 }
843 
Subscribe(const std::string & type_url,const std::string & name)844 void XdsClient::ChannelState::AdsCallState::Subscribe(
845     const std::string& type_url, const std::string& name) {
846   auto& state = state_map_[type_url].subscribed_resources[name];
847   if (state == nullptr) {
848     state = MakeOrphanable<ResourceState>(
849         type_url, name, !xds_client()->resource_version_map_[type_url].empty());
850     SendMessageLocked(type_url);
851   }
852 }
853 
Unsubscribe(const std::string & type_url,const std::string & name,bool delay_unsubscription)854 void XdsClient::ChannelState::AdsCallState::Unsubscribe(
855     const std::string& type_url, const std::string& name,
856     bool delay_unsubscription) {
857   state_map_[type_url].subscribed_resources.erase(name);
858   if (!delay_unsubscription) SendMessageLocked(type_url);
859 }
860 
HasSubscribedResources() const861 bool XdsClient::ChannelState::AdsCallState::HasSubscribedResources() const {
862   for (const auto& p : state_map_) {
863     if (!p.second.subscribed_resources.empty()) return true;
864   }
865   return false;
866 }
867 
AcceptLdsUpdate(XdsApi::LdsUpdateMap lds_update_map)868 void XdsClient::ChannelState::AdsCallState::AcceptLdsUpdate(
869     XdsApi::LdsUpdateMap lds_update_map) {
870   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
871     gpr_log(GPR_INFO,
872             "[xds_client %p] LDS update received containing %" PRIuPTR
873             " resources",
874             xds_client(), lds_update_map.size());
875   }
876   auto& lds_state = state_map_[XdsApi::kLdsTypeUrl];
877   std::set<std::string> rds_resource_names_seen;
878   for (auto& p : lds_update_map) {
879     const std::string& listener_name = p.first;
880     XdsApi::LdsUpdate& lds_update = p.second;
881     auto& state = lds_state.subscribed_resources[listener_name];
882     if (state != nullptr) state->Finish();
883     if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
884       gpr_log(GPR_INFO, "[xds_client %p] LDS resource %s: route_config_name=%s",
885               xds_client(), listener_name.c_str(),
886               (!lds_update.route_config_name.empty()
887                    ? lds_update.route_config_name.c_str()
888                    : "<inlined>"));
889       if (lds_update.rds_update.has_value()) {
890         gpr_log(GPR_INFO, "RouteConfiguration: %s",
891                 lds_update.rds_update->ToString().c_str());
892       }
893     }
894     // Record the RDS resource names seen.
895     if (!lds_update.route_config_name.empty()) {
896       rds_resource_names_seen.insert(lds_update.route_config_name);
897     }
898     // Ignore identical update.
899     ListenerState& listener_state = xds_client()->listener_map_[listener_name];
900     if (listener_state.update.has_value() &&
901         *listener_state.update == lds_update) {
902       if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
903         gpr_log(GPR_INFO,
904                 "[xds_client %p] LDS update for %s identical to current, "
905                 "ignoring.",
906                 xds_client(), listener_name.c_str());
907       }
908       continue;
909     }
910     // Update the listener state.
911     listener_state.update = std::move(lds_update);
912     // Notify watchers.
913     for (const auto& p : listener_state.watchers) {
914       p.first->OnListenerChanged(*listener_state.update);
915     }
916   }
917   // For any subscribed resource that is not present in the update,
918   // remove it from the cache and notify watchers that it does not exist.
919   for (const auto& p : lds_state.subscribed_resources) {
920     const std::string& listener_name = p.first;
921     if (lds_update_map.find(listener_name) == lds_update_map.end()) {
922       ListenerState& listener_state =
923           xds_client()->listener_map_[listener_name];
924       // If the resource was newly requested but has not yet been received,
925       // we don't want to generate an error for the watchers, because this LDS
926       // response may be in reaction to an earlier request that did not yet
927       // request the new resource, so its absence from the response does not
928       // necessarily indicate that the resource does not exist.
929       // For that case, we rely on the request timeout instead.
930       if (!listener_state.update.has_value()) continue;
931       listener_state.update.reset();
932       for (const auto& p : listener_state.watchers) {
933         p.first->OnResourceDoesNotExist();
934       }
935     }
936   }
937   // For any RDS resource that is no longer referred to by any LDS
938   // resources, remove it from the cache and notify watchers that it
939   // does not exist.
940   auto& rds_state = state_map_[XdsApi::kRdsTypeUrl];
941   for (const auto& p : rds_state.subscribed_resources) {
942     const std::string& rds_resource_name = p.first;
943     if (rds_resource_names_seen.find(rds_resource_name) ==
944         rds_resource_names_seen.end()) {
945       RouteConfigState& route_config_state =
946           xds_client()->route_config_map_[rds_resource_name];
947       route_config_state.update.reset();
948       for (const auto& p : route_config_state.watchers) {
949         p.first->OnResourceDoesNotExist();
950       }
951     }
952   }
953 }
954 
AcceptRdsUpdate(XdsApi::RdsUpdateMap rds_update_map)955 void XdsClient::ChannelState::AdsCallState::AcceptRdsUpdate(
956     XdsApi::RdsUpdateMap rds_update_map) {
957   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
958     gpr_log(GPR_INFO,
959             "[xds_client %p] RDS update received containing %" PRIuPTR
960             " resources",
961             xds_client(), rds_update_map.size());
962   }
963   auto& rds_state = state_map_[XdsApi::kRdsTypeUrl];
964   for (auto& p : rds_update_map) {
965     const std::string& route_config_name = p.first;
966     XdsApi::RdsUpdate& rds_update = p.second;
967     auto& state = rds_state.subscribed_resources[route_config_name];
968     if (state != nullptr) state->Finish();
969     if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
970       gpr_log(GPR_INFO, "[xds_client %p] RDS resource:\n%s", xds_client(),
971               rds_update.ToString().c_str());
972     }
973     RouteConfigState& route_config_state =
974         xds_client()->route_config_map_[route_config_name];
975     // Ignore identical update.
976     if (route_config_state.update.has_value() &&
977         *route_config_state.update == rds_update) {
978       if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
979         gpr_log(GPR_INFO,
980                 "[xds_client %p] RDS resource identical to current, ignoring",
981                 xds_client());
982       }
983       continue;
984     }
985     // Update the cache.
986     route_config_state.update = std::move(rds_update);
987     // Notify all watchers.
988     for (const auto& p : route_config_state.watchers) {
989       p.first->OnRouteConfigChanged(*route_config_state.update);
990     }
991   }
992 }
993 
AcceptCdsUpdate(XdsApi::CdsUpdateMap cds_update_map)994 void XdsClient::ChannelState::AdsCallState::AcceptCdsUpdate(
995     XdsApi::CdsUpdateMap cds_update_map) {
996   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
997     gpr_log(GPR_INFO,
998             "[xds_client %p] CDS update received containing %" PRIuPTR
999             " resources",
1000             xds_client(), cds_update_map.size());
1001   }
1002   auto& cds_state = state_map_[XdsApi::kCdsTypeUrl];
1003   std::set<std::string> eds_resource_names_seen;
1004   for (auto& p : cds_update_map) {
1005     const char* cluster_name = p.first.c_str();
1006     XdsApi::CdsUpdate& cds_update = p.second;
1007     auto& state = cds_state.subscribed_resources[cluster_name];
1008     if (state != nullptr) state->Finish();
1009     if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1010       gpr_log(GPR_INFO, "[xds_client %p] cluster=%s: %s", xds_client(),
1011               cluster_name, cds_update.ToString().c_str());
1012     }
1013     // Record the EDS resource names seen.
1014     eds_resource_names_seen.insert(cds_update.eds_service_name.empty()
1015                                        ? cluster_name
1016                                        : cds_update.eds_service_name);
1017     // Ignore identical update.
1018     ClusterState& cluster_state = xds_client()->cluster_map_[cluster_name];
1019     if (cluster_state.update.has_value() &&
1020         *cluster_state.update == cds_update) {
1021       if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1022         gpr_log(GPR_INFO,
1023                 "[xds_client %p] CDS update identical to current, ignoring.",
1024                 xds_client());
1025       }
1026       continue;
1027     }
1028     // Update the cluster state.
1029     cluster_state.update = std::move(cds_update);
1030     // Notify all watchers.
1031     for (const auto& p : cluster_state.watchers) {
1032       p.first->OnClusterChanged(cluster_state.update.value());
1033     }
1034   }
1035   // For any subscribed resource that is not present in the update,
1036   // remove it from the cache and notify watchers that it does not exist.
1037   for (const auto& p : cds_state.subscribed_resources) {
1038     const std::string& cluster_name = p.first;
1039     if (cds_update_map.find(cluster_name) == cds_update_map.end()) {
1040       ClusterState& cluster_state = xds_client()->cluster_map_[cluster_name];
1041       // If the resource was newly requested but has not yet been received,
1042       // we don't want to generate an error for the watchers, because this CDS
1043       // response may be in reaction to an earlier request that did not yet
1044       // request the new resource, so its absence from the response does not
1045       // necessarily indicate that the resource does not exist.
1046       // For that case, we rely on the request timeout instead.
1047       if (!cluster_state.update.has_value()) continue;
1048       cluster_state.update.reset();
1049       for (const auto& p : cluster_state.watchers) {
1050         p.first->OnResourceDoesNotExist();
1051       }
1052     }
1053   }
1054   // For any EDS resource that is no longer referred to by any CDS
1055   // resources, remove it from the cache and notify watchers that it
1056   // does not exist.
1057   auto& eds_state = state_map_[XdsApi::kEdsTypeUrl];
1058   for (const auto& p : eds_state.subscribed_resources) {
1059     const std::string& eds_resource_name = p.first;
1060     if (eds_resource_names_seen.find(eds_resource_name) ==
1061         eds_resource_names_seen.end()) {
1062       EndpointState& endpoint_state =
1063           xds_client()->endpoint_map_[eds_resource_name];
1064       endpoint_state.update.reset();
1065       for (const auto& p : endpoint_state.watchers) {
1066         p.first->OnResourceDoesNotExist();
1067       }
1068     }
1069   }
1070 }
1071 
AcceptEdsUpdate(XdsApi::EdsUpdateMap eds_update_map)1072 void XdsClient::ChannelState::AdsCallState::AcceptEdsUpdate(
1073     XdsApi::EdsUpdateMap eds_update_map) {
1074   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1075     gpr_log(GPR_INFO,
1076             "[xds_client %p] EDS update received containing %" PRIuPTR
1077             " resources",
1078             xds_client(), eds_update_map.size());
1079   }
1080   auto& eds_state = state_map_[XdsApi::kEdsTypeUrl];
1081   for (auto& p : eds_update_map) {
1082     const char* eds_service_name = p.first.c_str();
1083     XdsApi::EdsUpdate& eds_update = p.second;
1084     auto& state = eds_state.subscribed_resources[eds_service_name];
1085     if (state != nullptr) state->Finish();
1086     if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1087       gpr_log(GPR_INFO, "[xds_client %p] EDS resource %s: %s", xds_client(),
1088               eds_service_name, eds_update.ToString().c_str());
1089     }
1090     EndpointState& endpoint_state =
1091         xds_client()->endpoint_map_[eds_service_name];
1092     // Ignore identical update.
1093     if (endpoint_state.update.has_value() &&
1094         *endpoint_state.update == eds_update) {
1095       if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1096         gpr_log(GPR_INFO,
1097                 "[xds_client %p] EDS update identical to current, ignoring.",
1098                 xds_client());
1099       }
1100       continue;
1101     }
1102     // Update the cluster state.
1103     endpoint_state.update = std::move(eds_update);
1104     // Notify all watchers.
1105     for (const auto& p : endpoint_state.watchers) {
1106       p.first->OnEndpointChanged(endpoint_state.update.value());
1107     }
1108   }
1109 }
1110 
OnRequestSent(void * arg,grpc_error * error)1111 void XdsClient::ChannelState::AdsCallState::OnRequestSent(void* arg,
1112                                                           grpc_error* error) {
1113   AdsCallState* ads_calld = static_cast<AdsCallState*>(arg);
1114   {
1115     MutexLock lock(&ads_calld->xds_client()->mu_);
1116     ads_calld->OnRequestSentLocked(GRPC_ERROR_REF(error));
1117   }
1118   ads_calld->Unref(DEBUG_LOCATION, "ADS+OnRequestSentLocked");
1119 }
1120 
OnRequestSentLocked(grpc_error * error)1121 void XdsClient::ChannelState::AdsCallState::OnRequestSentLocked(
1122     grpc_error* error) {
1123   if (IsCurrentCallOnChannel() && error == GRPC_ERROR_NONE) {
1124     // Clean up the sent message.
1125     grpc_byte_buffer_destroy(send_message_payload_);
1126     send_message_payload_ = nullptr;
1127     // Continue to send another pending message if any.
1128     // TODO(roth): The current code to handle buffered messages has the
1129     // advantage of sending only the most recent list of resource names for
1130     // each resource type (no matter how many times that resource type has
1131     // been requested to send while the current message sending is still
1132     // pending). But its disadvantage is that we send the requests in fixed
1133     // order of resource types. We need to fix this if we are seeing some
1134     // resource type(s) starved due to frequent requests of other resource
1135     // type(s).
1136     auto it = buffered_requests_.begin();
1137     if (it != buffered_requests_.end()) {
1138       SendMessageLocked(*it);
1139       buffered_requests_.erase(it);
1140     }
1141   }
1142   GRPC_ERROR_UNREF(error);
1143 }
1144 
OnResponseReceived(void * arg,grpc_error *)1145 void XdsClient::ChannelState::AdsCallState::OnResponseReceived(
1146     void* arg, grpc_error* /* error */) {
1147   AdsCallState* ads_calld = static_cast<AdsCallState*>(arg);
1148   bool done;
1149   {
1150     MutexLock lock(&ads_calld->xds_client()->mu_);
1151     done = ads_calld->OnResponseReceivedLocked();
1152   }
1153   if (done) ads_calld->Unref(DEBUG_LOCATION, "ADS+OnResponseReceivedLocked");
1154 }
1155 
OnResponseReceivedLocked()1156 bool XdsClient::ChannelState::AdsCallState::OnResponseReceivedLocked() {
1157   // Empty payload means the call was cancelled.
1158   if (!IsCurrentCallOnChannel() || recv_message_payload_ == nullptr) {
1159     return true;
1160   }
1161   // Read the response.
1162   grpc_byte_buffer_reader bbr;
1163   grpc_byte_buffer_reader_init(&bbr, recv_message_payload_);
1164   grpc_slice response_slice = grpc_byte_buffer_reader_readall(&bbr);
1165   grpc_byte_buffer_reader_destroy(&bbr);
1166   grpc_byte_buffer_destroy(recv_message_payload_);
1167   recv_message_payload_ = nullptr;
1168   // Parse and validate the response.
1169   XdsApi::AdsParseResult result = xds_client()->api_.ParseAdsResponse(
1170       response_slice, ResourceNamesForRequest(XdsApi::kLdsTypeUrl),
1171       ResourceNamesForRequest(XdsApi::kRdsTypeUrl),
1172       ResourceNamesForRequest(XdsApi::kCdsTypeUrl),
1173       ResourceNamesForRequest(XdsApi::kEdsTypeUrl));
1174   grpc_slice_unref_internal(response_slice);
1175   if (result.type_url.empty()) {
1176     // Ignore unparsable response.
1177     gpr_log(GPR_ERROR,
1178             "[xds_client %p] Error parsing ADS response (%s) -- ignoring",
1179             xds_client(), grpc_error_string(result.parse_error));
1180     GRPC_ERROR_UNREF(result.parse_error);
1181   } else {
1182     // Update nonce.
1183     auto& state = state_map_[result.type_url];
1184     state.nonce = std::move(result.nonce);
1185     // NACK or ACK the response.
1186     if (result.parse_error != GRPC_ERROR_NONE) {
1187       GRPC_ERROR_UNREF(state.error);
1188       state.error = result.parse_error;
1189       // NACK unacceptable update.
1190       gpr_log(GPR_ERROR,
1191               "[xds_client %p] ADS response invalid for resource type %s "
1192               "version %s, will NACK: nonce=%s error=%s",
1193               xds_client(), result.type_url.c_str(), result.version.c_str(),
1194               state.nonce.c_str(), grpc_error_string(result.parse_error));
1195       SendMessageLocked(result.type_url);
1196     } else {
1197       seen_response_ = true;
1198       // Accept the ADS response according to the type_url.
1199       if (result.type_url == XdsApi::kLdsTypeUrl) {
1200         AcceptLdsUpdate(std::move(result.lds_update_map));
1201       } else if (result.type_url == XdsApi::kRdsTypeUrl) {
1202         AcceptRdsUpdate(std::move(result.rds_update_map));
1203       } else if (result.type_url == XdsApi::kCdsTypeUrl) {
1204         AcceptCdsUpdate(std::move(result.cds_update_map));
1205       } else if (result.type_url == XdsApi::kEdsTypeUrl) {
1206         AcceptEdsUpdate(std::move(result.eds_update_map));
1207       }
1208       xds_client()->resource_version_map_[result.type_url] =
1209           std::move(result.version);
1210       // ACK the update.
1211       SendMessageLocked(result.type_url);
1212       // Start load reporting if needed.
1213       auto& lrs_call = chand()->lrs_calld_;
1214       if (lrs_call != nullptr) {
1215         LrsCallState* lrs_calld = lrs_call->calld();
1216         if (lrs_calld != nullptr) lrs_calld->MaybeStartReportingLocked();
1217       }
1218     }
1219   }
1220   if (xds_client()->shutting_down_) return true;
1221   // Keep listening for updates.
1222   grpc_op op;
1223   memset(&op, 0, sizeof(op));
1224   op.op = GRPC_OP_RECV_MESSAGE;
1225   op.data.recv_message.recv_message = &recv_message_payload_;
1226   op.flags = 0;
1227   op.reserved = nullptr;
1228   GPR_ASSERT(call_ != nullptr);
1229   // Reuse the "ADS+OnResponseReceivedLocked" ref taken in ctor.
1230   const grpc_call_error call_error =
1231       grpc_call_start_batch_and_execute(call_, &op, 1, &on_response_received_);
1232   GPR_ASSERT(GRPC_CALL_OK == call_error);
1233   return false;
1234 }
1235 
OnStatusReceived(void * arg,grpc_error * error)1236 void XdsClient::ChannelState::AdsCallState::OnStatusReceived(
1237     void* arg, grpc_error* error) {
1238   AdsCallState* ads_calld = static_cast<AdsCallState*>(arg);
1239   {
1240     MutexLock lock(&ads_calld->xds_client()->mu_);
1241     ads_calld->OnStatusReceivedLocked(GRPC_ERROR_REF(error));
1242   }
1243   ads_calld->Unref(DEBUG_LOCATION, "ADS+OnStatusReceivedLocked");
1244 }
1245 
OnStatusReceivedLocked(grpc_error * error)1246 void XdsClient::ChannelState::AdsCallState::OnStatusReceivedLocked(
1247     grpc_error* error) {
1248   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1249     char* status_details = grpc_slice_to_c_string(status_details_);
1250     gpr_log(GPR_INFO,
1251             "[xds_client %p] ADS call status received. Status = %d, details "
1252             "= '%s', (chand: %p, ads_calld: %p, call: %p), error '%s'",
1253             xds_client(), status_code_, status_details, chand(), this, call_,
1254             grpc_error_string(error));
1255     gpr_free(status_details);
1256   }
1257   // Ignore status from a stale call.
1258   if (IsCurrentCallOnChannel()) {
1259     // Try to restart the call.
1260     parent_->OnCallFinishedLocked();
1261     // Send error to all watchers.
1262     xds_client()->NotifyOnErrorLocked(
1263         GRPC_ERROR_CREATE_FROM_STATIC_STRING("xds call failed"));
1264   }
1265   GRPC_ERROR_UNREF(error);
1266 }
1267 
IsCurrentCallOnChannel() const1268 bool XdsClient::ChannelState::AdsCallState::IsCurrentCallOnChannel() const {
1269   // If the retryable ADS call is null (which only happens when the xds channel
1270   // is shutting down), all the ADS calls are stale.
1271   if (chand()->ads_calld_ == nullptr) return false;
1272   return this == chand()->ads_calld_->calld();
1273 }
1274 
1275 std::set<absl::string_view>
ResourceNamesForRequest(const std::string & type_url)1276 XdsClient::ChannelState::AdsCallState::ResourceNamesForRequest(
1277     const std::string& type_url) {
1278   std::set<absl::string_view> resource_names;
1279   auto it = state_map_.find(type_url);
1280   if (it != state_map_.end()) {
1281     for (auto& p : it->second.subscribed_resources) {
1282       resource_names.insert(p.first);
1283       OrphanablePtr<ResourceState>& state = p.second;
1284       state->Start(Ref(DEBUG_LOCATION, "ResourceState"));
1285     }
1286   }
1287   return resource_names;
1288 }
1289 
1290 //
1291 // XdsClient::ChannelState::LrsCallState::Reporter
1292 //
1293 
Orphan()1294 void XdsClient::ChannelState::LrsCallState::Reporter::Orphan() {
1295   if (next_report_timer_callback_pending_) {
1296     grpc_timer_cancel(&next_report_timer_);
1297   }
1298 }
1299 
1300 void XdsClient::ChannelState::LrsCallState::Reporter::
ScheduleNextReportLocked()1301     ScheduleNextReportLocked() {
1302   const grpc_millis next_report_time = ExecCtx::Get()->Now() + report_interval_;
1303   grpc_timer_init(&next_report_timer_, next_report_time,
1304                   &on_next_report_timer_);
1305   next_report_timer_callback_pending_ = true;
1306 }
1307 
OnNextReportTimer(void * arg,grpc_error * error)1308 void XdsClient::ChannelState::LrsCallState::Reporter::OnNextReportTimer(
1309     void* arg, grpc_error* error) {
1310   Reporter* self = static_cast<Reporter*>(arg);
1311   bool done;
1312   {
1313     MutexLock lock(&self->xds_client()->mu_);
1314     done = self->OnNextReportTimerLocked(GRPC_ERROR_REF(error));
1315   }
1316   if (done) self->Unref(DEBUG_LOCATION, "Reporter+timer");
1317 }
1318 
OnNextReportTimerLocked(grpc_error * error)1319 bool XdsClient::ChannelState::LrsCallState::Reporter::OnNextReportTimerLocked(
1320     grpc_error* error) {
1321   next_report_timer_callback_pending_ = false;
1322   if (error != GRPC_ERROR_NONE || !IsCurrentReporterOnCall()) {
1323     GRPC_ERROR_UNREF(error);
1324     return true;
1325   }
1326   return SendReportLocked();
1327 }
1328 
1329 namespace {
1330 
LoadReportCountersAreZero(const XdsApi::ClusterLoadReportMap & snapshot)1331 bool LoadReportCountersAreZero(const XdsApi::ClusterLoadReportMap& snapshot) {
1332   for (const auto& p : snapshot) {
1333     const XdsApi::ClusterLoadReport& cluster_snapshot = p.second;
1334     if (!cluster_snapshot.dropped_requests.IsZero()) return false;
1335     for (const auto& q : cluster_snapshot.locality_stats) {
1336       const XdsClusterLocalityStats::Snapshot& locality_snapshot = q.second;
1337       if (!locality_snapshot.IsZero()) return false;
1338     }
1339   }
1340   return true;
1341 }
1342 
1343 }  // namespace
1344 
SendReportLocked()1345 bool XdsClient::ChannelState::LrsCallState::Reporter::SendReportLocked() {
1346   // Construct snapshot from all reported stats.
1347   XdsApi::ClusterLoadReportMap snapshot =
1348       xds_client()->BuildLoadReportSnapshotLocked(parent_->send_all_clusters_,
1349                                                   parent_->cluster_names_);
1350   // Skip client load report if the counters were all zero in the last
1351   // report and they are still zero in this one.
1352   const bool old_val = last_report_counters_were_zero_;
1353   last_report_counters_were_zero_ = LoadReportCountersAreZero(snapshot);
1354   if (old_val && last_report_counters_were_zero_) {
1355     if (xds_client()->load_report_map_.empty()) {
1356       parent_->chand()->StopLrsCall();
1357       return true;
1358     }
1359     ScheduleNextReportLocked();
1360     return false;
1361   }
1362   // Create a request that contains the snapshot.
1363   grpc_slice request_payload_slice =
1364       xds_client()->api_.CreateLrsRequest(std::move(snapshot));
1365   parent_->send_message_payload_ =
1366       grpc_raw_byte_buffer_create(&request_payload_slice, 1);
1367   grpc_slice_unref_internal(request_payload_slice);
1368   // Send the report.
1369   grpc_op op;
1370   memset(&op, 0, sizeof(op));
1371   op.op = GRPC_OP_SEND_MESSAGE;
1372   op.data.send_message.send_message = parent_->send_message_payload_;
1373   grpc_call_error call_error = grpc_call_start_batch_and_execute(
1374       parent_->call_, &op, 1, &on_report_done_);
1375   if (GPR_UNLIKELY(call_error != GRPC_CALL_OK)) {
1376     gpr_log(GPR_ERROR,
1377             "[xds_client %p] calld=%p call_error=%d sending client load report",
1378             xds_client(), this, call_error);
1379     GPR_ASSERT(GRPC_CALL_OK == call_error);
1380   }
1381   return false;
1382 }
1383 
OnReportDone(void * arg,grpc_error * error)1384 void XdsClient::ChannelState::LrsCallState::Reporter::OnReportDone(
1385     void* arg, grpc_error* error) {
1386   Reporter* self = static_cast<Reporter*>(arg);
1387   bool done;
1388   {
1389     MutexLock lock(&self->xds_client()->mu_);
1390     done = self->OnReportDoneLocked(GRPC_ERROR_REF(error));
1391   }
1392   if (done) self->Unref(DEBUG_LOCATION, "Reporter+report_done");
1393 }
1394 
OnReportDoneLocked(grpc_error * error)1395 bool XdsClient::ChannelState::LrsCallState::Reporter::OnReportDoneLocked(
1396     grpc_error* error) {
1397   grpc_byte_buffer_destroy(parent_->send_message_payload_);
1398   parent_->send_message_payload_ = nullptr;
1399   // If there are no more registered stats to report, cancel the call.
1400   if (xds_client()->load_report_map_.empty()) {
1401     parent_->chand()->StopLrsCall();
1402     GRPC_ERROR_UNREF(error);
1403     return true;
1404   }
1405   if (error != GRPC_ERROR_NONE || !IsCurrentReporterOnCall()) {
1406     GRPC_ERROR_UNREF(error);
1407     // If this reporter is no longer the current one on the call, the reason
1408     // might be that it was orphaned for a new one due to config update.
1409     if (!IsCurrentReporterOnCall()) {
1410       parent_->MaybeStartReportingLocked();
1411     }
1412     return true;
1413   }
1414   ScheduleNextReportLocked();
1415   return false;
1416 }
1417 
1418 //
1419 // XdsClient::ChannelState::LrsCallState
1420 //
1421 
LrsCallState(RefCountedPtr<RetryableCall<LrsCallState>> parent)1422 XdsClient::ChannelState::LrsCallState::LrsCallState(
1423     RefCountedPtr<RetryableCall<LrsCallState>> parent)
1424     : InternallyRefCounted<LrsCallState>(
1425           GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_refcount_trace)
1426               ? "LrsCallState"
1427               : nullptr),
1428       parent_(std::move(parent)) {
1429   // Init the LRS call. Note that the call will progress every time there's
1430   // activity in xds_client()->interested_parties_, which is comprised of
1431   // the polling entities from client_channel.
1432   GPR_ASSERT(xds_client() != nullptr);
1433   const auto& method =
1434       chand()->server_.ShouldUseV3()
1435           ? GRPC_MDSTR_SLASH_ENVOY_DOT_SERVICE_DOT_LOAD_STATS_DOT_V3_DOT_LOADREPORTINGSERVICE_SLASH_STREAMLOADSTATS
1436           : GRPC_MDSTR_SLASH_ENVOY_DOT_SERVICE_DOT_LOAD_STATS_DOT_V2_DOT_LOADREPORTINGSERVICE_SLASH_STREAMLOADSTATS;
1437   call_ = grpc_channel_create_pollset_set_call(
1438       chand()->channel_, nullptr, GRPC_PROPAGATE_DEFAULTS,
1439       xds_client()->interested_parties_, method, nullptr,
1440       GRPC_MILLIS_INF_FUTURE, nullptr);
1441   GPR_ASSERT(call_ != nullptr);
1442   // Init the request payload.
1443   grpc_slice request_payload_slice =
1444       xds_client()->api_.CreateLrsInitialRequest(chand()->server_);
1445   send_message_payload_ =
1446       grpc_raw_byte_buffer_create(&request_payload_slice, 1);
1447   grpc_slice_unref_internal(request_payload_slice);
1448   // Init other data associated with the LRS call.
1449   grpc_metadata_array_init(&initial_metadata_recv_);
1450   grpc_metadata_array_init(&trailing_metadata_recv_);
1451   // Start the call.
1452   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1453     gpr_log(GPR_INFO,
1454             "[xds_client %p] Starting LRS call (chand: %p, calld: %p, "
1455             "call: %p)",
1456             xds_client(), chand(), this, call_);
1457   }
1458   // Create the ops.
1459   grpc_call_error call_error;
1460   grpc_op ops[3];
1461   memset(ops, 0, sizeof(ops));
1462   // Op: send initial metadata.
1463   grpc_op* op = ops;
1464   op->op = GRPC_OP_SEND_INITIAL_METADATA;
1465   op->data.send_initial_metadata.count = 0;
1466   op->flags = GRPC_INITIAL_METADATA_WAIT_FOR_READY |
1467               GRPC_INITIAL_METADATA_WAIT_FOR_READY_EXPLICITLY_SET;
1468   op->reserved = nullptr;
1469   op++;
1470   // Op: send request message.
1471   GPR_ASSERT(send_message_payload_ != nullptr);
1472   op->op = GRPC_OP_SEND_MESSAGE;
1473   op->data.send_message.send_message = send_message_payload_;
1474   op->flags = 0;
1475   op->reserved = nullptr;
1476   op++;
1477   Ref(DEBUG_LOCATION, "LRS+OnInitialRequestSentLocked").release();
1478   GRPC_CLOSURE_INIT(&on_initial_request_sent_, OnInitialRequestSent, this,
1479                     grpc_schedule_on_exec_ctx);
1480   call_error = grpc_call_start_batch_and_execute(
1481       call_, ops, static_cast<size_t>(op - ops), &on_initial_request_sent_);
1482   GPR_ASSERT(GRPC_CALL_OK == call_error);
1483   // Op: recv initial metadata.
1484   op = ops;
1485   op->op = GRPC_OP_RECV_INITIAL_METADATA;
1486   op->data.recv_initial_metadata.recv_initial_metadata =
1487       &initial_metadata_recv_;
1488   op->flags = 0;
1489   op->reserved = nullptr;
1490   op++;
1491   // Op: recv response.
1492   op->op = GRPC_OP_RECV_MESSAGE;
1493   op->data.recv_message.recv_message = &recv_message_payload_;
1494   op->flags = 0;
1495   op->reserved = nullptr;
1496   op++;
1497   Ref(DEBUG_LOCATION, "LRS+OnResponseReceivedLocked").release();
1498   GRPC_CLOSURE_INIT(&on_response_received_, OnResponseReceived, this,
1499                     grpc_schedule_on_exec_ctx);
1500   call_error = grpc_call_start_batch_and_execute(
1501       call_, ops, static_cast<size_t>(op - ops), &on_response_received_);
1502   GPR_ASSERT(GRPC_CALL_OK == call_error);
1503   // Op: recv server status.
1504   op = ops;
1505   op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
1506   op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv_;
1507   op->data.recv_status_on_client.status = &status_code_;
1508   op->data.recv_status_on_client.status_details = &status_details_;
1509   op->flags = 0;
1510   op->reserved = nullptr;
1511   op++;
1512   // This callback signals the end of the call, so it relies on the initial
1513   // ref instead of a new ref. When it's invoked, it's the initial ref that is
1514   // unreffed.
1515   GRPC_CLOSURE_INIT(&on_status_received_, OnStatusReceived, this,
1516                     grpc_schedule_on_exec_ctx);
1517   call_error = grpc_call_start_batch_and_execute(
1518       call_, ops, static_cast<size_t>(op - ops), &on_status_received_);
1519   GPR_ASSERT(GRPC_CALL_OK == call_error);
1520 }
1521 
~LrsCallState()1522 XdsClient::ChannelState::LrsCallState::~LrsCallState() {
1523   grpc_metadata_array_destroy(&initial_metadata_recv_);
1524   grpc_metadata_array_destroy(&trailing_metadata_recv_);
1525   grpc_byte_buffer_destroy(send_message_payload_);
1526   grpc_byte_buffer_destroy(recv_message_payload_);
1527   grpc_slice_unref_internal(status_details_);
1528   GPR_ASSERT(call_ != nullptr);
1529   grpc_call_unref(call_);
1530 }
1531 
Orphan()1532 void XdsClient::ChannelState::LrsCallState::Orphan() {
1533   reporter_.reset();
1534   GPR_ASSERT(call_ != nullptr);
1535   // If we are here because xds_client wants to cancel the call,
1536   // on_status_received_ will complete the cancellation and clean up. Otherwise,
1537   // we are here because xds_client has to orphan a failed call, then the
1538   // following cancellation will be a no-op.
1539   grpc_call_cancel_internal(call_);
1540   // Note that the initial ref is hold by on_status_received_. So the
1541   // corresponding unref happens in on_status_received_ instead of here.
1542 }
1543 
MaybeStartReportingLocked()1544 void XdsClient::ChannelState::LrsCallState::MaybeStartReportingLocked() {
1545   // Don't start again if already started.
1546   if (reporter_ != nullptr) return;
1547   // Don't start if the previous send_message op (of the initial request or the
1548   // last report of the previous reporter) hasn't completed.
1549   if (send_message_payload_ != nullptr) return;
1550   // Don't start if no LRS response has arrived.
1551   if (!seen_response()) return;
1552   // Don't start if the ADS call hasn't received any valid response. Note that
1553   // this must be the first channel because it is the current channel but its
1554   // ADS call hasn't seen any response.
1555   if (chand()->ads_calld_ == nullptr ||
1556       chand()->ads_calld_->calld() == nullptr ||
1557       !chand()->ads_calld_->calld()->seen_response()) {
1558     return;
1559   }
1560   // Start reporting.
1561   reporter_ = MakeOrphanable<Reporter>(
1562       Ref(DEBUG_LOCATION, "LRS+load_report+start"), load_reporting_interval_);
1563 }
1564 
OnInitialRequestSent(void * arg,grpc_error *)1565 void XdsClient::ChannelState::LrsCallState::OnInitialRequestSent(
1566     void* arg, grpc_error* /*error*/) {
1567   LrsCallState* lrs_calld = static_cast<LrsCallState*>(arg);
1568   {
1569     MutexLock lock(&lrs_calld->xds_client()->mu_);
1570     lrs_calld->OnInitialRequestSentLocked();
1571   }
1572   lrs_calld->Unref(DEBUG_LOCATION, "LRS+OnInitialRequestSentLocked");
1573 }
1574 
OnInitialRequestSentLocked()1575 void XdsClient::ChannelState::LrsCallState::OnInitialRequestSentLocked() {
1576   // Clear the send_message_payload_.
1577   grpc_byte_buffer_destroy(send_message_payload_);
1578   send_message_payload_ = nullptr;
1579   MaybeStartReportingLocked();
1580 }
1581 
OnResponseReceived(void * arg,grpc_error *)1582 void XdsClient::ChannelState::LrsCallState::OnResponseReceived(
1583     void* arg, grpc_error* /*error*/) {
1584   LrsCallState* lrs_calld = static_cast<LrsCallState*>(arg);
1585   bool done;
1586   {
1587     MutexLock lock(&lrs_calld->xds_client()->mu_);
1588     done = lrs_calld->OnResponseReceivedLocked();
1589   }
1590   if (done) lrs_calld->Unref(DEBUG_LOCATION, "LRS+OnResponseReceivedLocked");
1591 }
1592 
// Processes one LRS response. Must be called with xds_client()->mu_ held.
//
// Returns true when the caller (OnResponseReceived) should release the
// "LRS+OnResponseReceivedLocked" ref. This happens when the call is stale,
// was cancelled, or the client is shutting down. Returns false after
// re-arming the recv_message op; that ref is then reused by the next
// on_response_received_ callback.
//
// A response that parses successfully and differs from the current config
// replaces send_all_clusters_, cluster_names_ and load_reporting_interval_.
// Any running reporter is reset and a new one may be started.
bool XdsClient::ChannelState::LrsCallState::OnResponseReceivedLocked() {
  // Stop if this call is no longer the channel's current LRS call. An empty
  // payload means the call was cancelled.
  if (!IsCurrentCallOnChannel() || recv_message_payload_ == nullptr) {
    return true;
  }
  // Read the response into a single slice and free the received buffer so
  // recv_message_payload_ can be reused by the next recv op.
  grpc_byte_buffer_reader bbr;
  grpc_byte_buffer_reader_init(&bbr, recv_message_payload_);
  grpc_slice response_slice = grpc_byte_buffer_reader_readall(&bbr);
  grpc_byte_buffer_reader_destroy(&bbr);
  grpc_byte_buffer_destroy(recv_message_payload_);
  recv_message_payload_ = nullptr;
  // This anonymous lambda is a hack to avoid the usage of goto. Each
  // `return` inside it skips the rest of the config processing, but
  // response_slice is still unreffed and the recv op still re-armed below.
  [&]() {
    // Parse the response.
    bool send_all_clusters = false;
    std::set<std::string> new_cluster_names;
    // Only read after ParseLrsResponse() succeeds; presumably it is always
    // set on success (NOTE(review): not verifiable from this file).
    grpc_millis new_load_reporting_interval;
    grpc_error* parse_error = xds_client()->api_.ParseLrsResponse(
        response_slice, &send_all_clusters, &new_cluster_names,
        &new_load_reporting_interval);
    if (parse_error != GRPC_ERROR_NONE) {
      gpr_log(GPR_ERROR,
              "[xds_client %p] LRS response parsing failed. error=%s",
              xds_client(), grpc_error_string(parse_error));
      GRPC_ERROR_UNREF(parse_error);
      return;
    }
    // Mark the call as having seen a valid response. MaybeStartReportingLocked()
    // checks this before starting a reporter.
    seen_response_ = true;
    if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
      gpr_log(
          GPR_INFO,
          "[xds_client %p] LRS response received, %" PRIuPTR
          " cluster names, send_all_clusters=%d, load_report_interval=%" PRId64
          "ms",
          xds_client(), new_cluster_names.size(), send_all_clusters,
          new_load_reporting_interval);
      size_t i = 0;
      for (const auto& name : new_cluster_names) {
        gpr_log(GPR_INFO, "[xds_client %p] cluster_name %" PRIuPTR ": %s",
                xds_client(), i++, name.c_str());
      }
    }
    // Raise the server-requested interval to the configured floor.
    if (new_load_reporting_interval <
        GRPC_XDS_MIN_CLIENT_LOAD_REPORTING_INTERVAL_MS) {
      new_load_reporting_interval =
          GRPC_XDS_MIN_CLIENT_LOAD_REPORTING_INTERVAL_MS;
      if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
        gpr_log(GPR_INFO,
                "[xds_client %p] Increased load_report_interval to minimum "
                "value %dms",
                xds_client(), GRPC_XDS_MIN_CLIENT_LOAD_REPORTING_INTERVAL_MS);
      }
    }
    // Ignore an update identical to the current config, so the running
    // reporter (if any) is not restarted.
    if (send_all_clusters == send_all_clusters_ &&
        cluster_names_ == new_cluster_names &&
        load_reporting_interval_ == new_load_reporting_interval) {
      if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
        gpr_log(GPR_INFO,
                "[xds_client %p] Incoming LRS response identical to current, "
                "ignoring.",
                xds_client());
      }
      return;
    }
    // Stop current load reporting (if any) to adopt the new config.
    reporter_.reset();
    // Record the new config.
    send_all_clusters_ = send_all_clusters;
    cluster_names_ = std::move(new_cluster_names);
    load_reporting_interval_ = new_load_reporting_interval;
    // Try starting sending load report.
    MaybeStartReportingLocked();
  }();
  grpc_slice_unref_internal(response_slice);
  // During shutdown, do not re-arm; let the caller drop the ref.
  if (xds_client()->shutting_down_) return true;
  // Keep listening for LRS config updates.
  grpc_op op;
  memset(&op, 0, sizeof(op));
  op.op = GRPC_OP_RECV_MESSAGE;
  op.data.recv_message.recv_message = &recv_message_payload_;
  op.flags = 0;
  op.reserved = nullptr;
  GPR_ASSERT(call_ != nullptr);
  // Reuse the "OnResponseReceivedLocked" ref taken in ctor.
  const grpc_call_error call_error =
      grpc_call_start_batch_and_execute(call_, &op, 1, &on_response_received_);
  GPR_ASSERT(GRPC_CALL_OK == call_error);
  return false;
}
1684 
OnStatusReceived(void * arg,grpc_error * error)1685 void XdsClient::ChannelState::LrsCallState::OnStatusReceived(
1686     void* arg, grpc_error* error) {
1687   LrsCallState* lrs_calld = static_cast<LrsCallState*>(arg);
1688   {
1689     MutexLock lock(&lrs_calld->xds_client()->mu_);
1690     lrs_calld->OnStatusReceivedLocked(GRPC_ERROR_REF(error));
1691   }
1692   lrs_calld->Unref(DEBUG_LOCATION, "LRS+OnStatusReceivedLocked");
1693 }
1694 
OnStatusReceivedLocked(grpc_error * error)1695 void XdsClient::ChannelState::LrsCallState::OnStatusReceivedLocked(
1696     grpc_error* error) {
1697   GPR_ASSERT(call_ != nullptr);
1698   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1699     char* status_details = grpc_slice_to_c_string(status_details_);
1700     gpr_log(GPR_INFO,
1701             "[xds_client %p] LRS call status received. Status = %d, details "
1702             "= '%s', (chand: %p, calld: %p, call: %p), error '%s'",
1703             xds_client(), status_code_, status_details, chand(), this, call_,
1704             grpc_error_string(error));
1705     gpr_free(status_details);
1706   }
1707   // Ignore status from a stale call.
1708   if (IsCurrentCallOnChannel()) {
1709     GPR_ASSERT(!xds_client()->shutting_down_);
1710     // Try to restart the call.
1711     parent_->OnCallFinishedLocked();
1712   }
1713   GRPC_ERROR_UNREF(error);
1714 }
1715 
IsCurrentCallOnChannel() const1716 bool XdsClient::ChannelState::LrsCallState::IsCurrentCallOnChannel() const {
1717   // If the retryable LRS call is null (which only happens when the xds channel
1718   // is shutting down), all the LRS calls are stale.
1719   if (chand()->lrs_calld_ == nullptr) return false;
1720   return this == chand()->lrs_calld_->calld();
1721 }
1722 
1723 //
1724 // XdsClient
1725 //
1726 
1727 namespace {
1728 
GetRequestTimeout()1729 grpc_millis GetRequestTimeout() {
1730   return grpc_channel_args_find_integer(
1731       g_channel_args, GRPC_ARG_XDS_RESOURCE_DOES_NOT_EXIST_TIMEOUT_MS,
1732       {15000, 0, INT_MAX});
1733 }
1734 
1735 }  // namespace
1736 
XdsClient(grpc_error ** error)1737 XdsClient::XdsClient(grpc_error** error)
1738     : DualRefCounted<XdsClient>(
1739           GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_refcount_trace) ? "XdsClient"
1740                                                                   : nullptr),
1741       request_timeout_(GetRequestTimeout()),
1742       interested_parties_(grpc_pollset_set_create()),
1743       bootstrap_(
1744           XdsBootstrap::ReadFromFile(this, &grpc_xds_client_trace, error)),
1745       certificate_provider_store_(MakeOrphanable<CertificateProviderStore>(
1746           bootstrap_ == nullptr
1747               ? CertificateProviderStore::PluginDefinitionMap()
1748               : bootstrap_->certificate_providers())),
1749       api_(this, &grpc_xds_client_trace,
1750            bootstrap_ == nullptr ? nullptr : bootstrap_->node()) {
1751   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1752     gpr_log(GPR_INFO, "[xds_client %p] creating xds client", this);
1753   }
1754   if (*error != GRPC_ERROR_NONE) {
1755     gpr_log(GPR_ERROR, "[xds_client %p] failed to read bootstrap file: %s",
1756             this, grpc_error_string(*error));
1757     return;
1758   }
1759   // Create ChannelState object.
1760   chand_ = MakeOrphanable<ChannelState>(
1761       WeakRef(DEBUG_LOCATION, "XdsClient+ChannelState"), bootstrap_->server());
1762 }
1763 
~XdsClient()1764 XdsClient::~XdsClient() {
1765   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1766     gpr_log(GPR_INFO, "[xds_client %p] destroying xds client", this);
1767   }
1768   grpc_pollset_set_destroy(interested_parties_);
1769 }
1770 
AddChannelzLinkage(channelz::ChannelNode * parent_channelz_node)1771 void XdsClient::AddChannelzLinkage(
1772     channelz::ChannelNode* parent_channelz_node) {
1773   channelz::ChannelNode* xds_channelz_node =
1774       grpc_channel_get_channelz_node(chand_->channel());
1775   if (xds_channelz_node != nullptr) {
1776     parent_channelz_node->AddChildChannel(xds_channelz_node->uuid());
1777   }
1778 }
1779 
RemoveChannelzLinkage(channelz::ChannelNode * parent_channelz_node)1780 void XdsClient::RemoveChannelzLinkage(
1781     channelz::ChannelNode* parent_channelz_node) {
1782   channelz::ChannelNode* xds_channelz_node =
1783       grpc_channel_get_channelz_node(chand_->channel());
1784   if (xds_channelz_node != nullptr) {
1785     parent_channelz_node->RemoveChildChannel(xds_channelz_node->uuid());
1786   }
1787 }
1788 
Orphan()1789 void XdsClient::Orphan() {
1790   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1791     gpr_log(GPR_INFO, "[xds_client %p] shutting down xds client", this);
1792   }
1793   {
1794     MutexLock lock(g_mu);
1795     if (g_xds_client == this) g_xds_client = nullptr;
1796   }
1797   {
1798     MutexLock lock(&mu_);
1799     shutting_down_ = true;
1800     // Orphan ChannelState object.
1801     chand_.reset();
1802     // We do not clear cluster_map_ and endpoint_map_ if the xds client was
1803     // created by the XdsResolver because the maps contain refs for watchers
1804     // which in turn hold refs to the loadbalancing policies. At this point, it
1805     // is possible for ADS calls to be in progress. Unreffing the loadbalancing
1806     // policies before those calls are done would lead to issues such as
1807     // https://github.com/grpc/grpc/issues/20928.
1808     if (!listener_map_.empty()) {
1809       cluster_map_.clear();
1810       endpoint_map_.clear();
1811     }
1812   }
1813 }
1814 
WatchListenerData(absl::string_view listener_name,std::unique_ptr<ListenerWatcherInterface> watcher)1815 void XdsClient::WatchListenerData(
1816     absl::string_view listener_name,
1817     std::unique_ptr<ListenerWatcherInterface> watcher) {
1818   std::string listener_name_str = std::string(listener_name);
1819   MutexLock lock(&mu_);
1820   ListenerState& listener_state = listener_map_[listener_name_str];
1821   ListenerWatcherInterface* w = watcher.get();
1822   listener_state.watchers[w] = std::move(watcher);
1823   // If we've already received an LDS update, notify the new watcher
1824   // immediately.
1825   if (listener_state.update.has_value()) {
1826     if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1827       gpr_log(GPR_INFO, "[xds_client %p] returning cached listener data for %s",
1828               this, listener_name_str.c_str());
1829     }
1830     w->OnListenerChanged(*listener_state.update);
1831   }
1832   chand_->Subscribe(XdsApi::kLdsTypeUrl, listener_name_str);
1833 }
1834 
CancelListenerDataWatch(absl::string_view listener_name,ListenerWatcherInterface * watcher,bool delay_unsubscription)1835 void XdsClient::CancelListenerDataWatch(absl::string_view listener_name,
1836                                         ListenerWatcherInterface* watcher,
1837                                         bool delay_unsubscription) {
1838   MutexLock lock(&mu_);
1839   if (shutting_down_) return;
1840   std::string listener_name_str = std::string(listener_name);
1841   ListenerState& listener_state = listener_map_[listener_name_str];
1842   auto it = listener_state.watchers.find(watcher);
1843   if (it != listener_state.watchers.end()) {
1844     listener_state.watchers.erase(it);
1845     if (listener_state.watchers.empty()) {
1846       listener_map_.erase(listener_name_str);
1847       chand_->Unsubscribe(XdsApi::kLdsTypeUrl, listener_name_str,
1848                           delay_unsubscription);
1849     }
1850   }
1851 }
1852 
WatchRouteConfigData(absl::string_view route_config_name,std::unique_ptr<RouteConfigWatcherInterface> watcher)1853 void XdsClient::WatchRouteConfigData(
1854     absl::string_view route_config_name,
1855     std::unique_ptr<RouteConfigWatcherInterface> watcher) {
1856   std::string route_config_name_str = std::string(route_config_name);
1857   MutexLock lock(&mu_);
1858   RouteConfigState& route_config_state =
1859       route_config_map_[route_config_name_str];
1860   RouteConfigWatcherInterface* w = watcher.get();
1861   route_config_state.watchers[w] = std::move(watcher);
1862   // If we've already received an RDS update, notify the new watcher
1863   // immediately.
1864   if (route_config_state.update.has_value()) {
1865     if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1866       gpr_log(GPR_INFO,
1867               "[xds_client %p] returning cached route config data for %s", this,
1868               route_config_name_str.c_str());
1869     }
1870     w->OnRouteConfigChanged(*route_config_state.update);
1871   }
1872   chand_->Subscribe(XdsApi::kRdsTypeUrl, route_config_name_str);
1873 }
1874 
CancelRouteConfigDataWatch(absl::string_view route_config_name,RouteConfigWatcherInterface * watcher,bool delay_unsubscription)1875 void XdsClient::CancelRouteConfigDataWatch(absl::string_view route_config_name,
1876                                            RouteConfigWatcherInterface* watcher,
1877                                            bool delay_unsubscription) {
1878   MutexLock lock(&mu_);
1879   if (shutting_down_) return;
1880   std::string route_config_name_str = std::string(route_config_name);
1881   RouteConfigState& route_config_state =
1882       route_config_map_[route_config_name_str];
1883   auto it = route_config_state.watchers.find(watcher);
1884   if (it != route_config_state.watchers.end()) {
1885     route_config_state.watchers.erase(it);
1886     if (route_config_state.watchers.empty()) {
1887       route_config_map_.erase(route_config_name_str);
1888       chand_->Unsubscribe(XdsApi::kRdsTypeUrl, route_config_name_str,
1889                           delay_unsubscription);
1890     }
1891   }
1892 }
1893 
WatchClusterData(absl::string_view cluster_name,std::unique_ptr<ClusterWatcherInterface> watcher)1894 void XdsClient::WatchClusterData(
1895     absl::string_view cluster_name,
1896     std::unique_ptr<ClusterWatcherInterface> watcher) {
1897   std::string cluster_name_str = std::string(cluster_name);
1898   MutexLock lock(&mu_);
1899   ClusterState& cluster_state = cluster_map_[cluster_name_str];
1900   ClusterWatcherInterface* w = watcher.get();
1901   cluster_state.watchers[w] = std::move(watcher);
1902   // If we've already received a CDS update, notify the new watcher
1903   // immediately.
1904   if (cluster_state.update.has_value()) {
1905     if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1906       gpr_log(GPR_INFO, "[xds_client %p] returning cached cluster data for %s",
1907               this, cluster_name_str.c_str());
1908     }
1909     w->OnClusterChanged(cluster_state.update.value());
1910   }
1911   chand_->Subscribe(XdsApi::kCdsTypeUrl, cluster_name_str);
1912 }
1913 
CancelClusterDataWatch(absl::string_view cluster_name,ClusterWatcherInterface * watcher,bool delay_unsubscription)1914 void XdsClient::CancelClusterDataWatch(absl::string_view cluster_name,
1915                                        ClusterWatcherInterface* watcher,
1916                                        bool delay_unsubscription) {
1917   MutexLock lock(&mu_);
1918   if (shutting_down_) return;
1919   std::string cluster_name_str = std::string(cluster_name);
1920   ClusterState& cluster_state = cluster_map_[cluster_name_str];
1921   auto it = cluster_state.watchers.find(watcher);
1922   if (it != cluster_state.watchers.end()) {
1923     cluster_state.watchers.erase(it);
1924     if (cluster_state.watchers.empty()) {
1925       cluster_map_.erase(cluster_name_str);
1926       chand_->Unsubscribe(XdsApi::kCdsTypeUrl, cluster_name_str,
1927                           delay_unsubscription);
1928     }
1929   }
1930 }
1931 
WatchEndpointData(absl::string_view eds_service_name,std::unique_ptr<EndpointWatcherInterface> watcher)1932 void XdsClient::WatchEndpointData(
1933     absl::string_view eds_service_name,
1934     std::unique_ptr<EndpointWatcherInterface> watcher) {
1935   std::string eds_service_name_str = std::string(eds_service_name);
1936   MutexLock lock(&mu_);
1937   EndpointState& endpoint_state = endpoint_map_[eds_service_name_str];
1938   EndpointWatcherInterface* w = watcher.get();
1939   endpoint_state.watchers[w] = std::move(watcher);
1940   // If we've already received an EDS update, notify the new watcher
1941   // immediately.
1942   if (endpoint_state.update.has_value()) {
1943     if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1944       gpr_log(GPR_INFO, "[xds_client %p] returning cached endpoint data for %s",
1945               this, eds_service_name_str.c_str());
1946     }
1947     w->OnEndpointChanged(endpoint_state.update.value());
1948   }
1949   chand_->Subscribe(XdsApi::kEdsTypeUrl, eds_service_name_str);
1950 }
1951 
CancelEndpointDataWatch(absl::string_view eds_service_name,EndpointWatcherInterface * watcher,bool delay_unsubscription)1952 void XdsClient::CancelEndpointDataWatch(absl::string_view eds_service_name,
1953                                         EndpointWatcherInterface* watcher,
1954                                         bool delay_unsubscription) {
1955   MutexLock lock(&mu_);
1956   if (shutting_down_) return;
1957   std::string eds_service_name_str = std::string(eds_service_name);
1958   EndpointState& endpoint_state = endpoint_map_[eds_service_name_str];
1959   auto it = endpoint_state.watchers.find(watcher);
1960   if (it != endpoint_state.watchers.end()) {
1961     endpoint_state.watchers.erase(it);
1962     if (endpoint_state.watchers.empty()) {
1963       endpoint_map_.erase(eds_service_name_str);
1964       chand_->Unsubscribe(XdsApi::kEdsTypeUrl, eds_service_name_str,
1965                           delay_unsubscription);
1966     }
1967   }
1968 }
1969 
AddClusterDropStats(absl::string_view lrs_server,absl::string_view cluster_name,absl::string_view eds_service_name)1970 RefCountedPtr<XdsClusterDropStats> XdsClient::AddClusterDropStats(
1971     absl::string_view lrs_server, absl::string_view cluster_name,
1972     absl::string_view eds_service_name) {
1973   // TODO(roth): When we add support for direct federation, use the
1974   // server name specified in lrs_server.
1975   auto key =
1976       std::make_pair(std::string(cluster_name), std::string(eds_service_name));
1977   MutexLock lock(&mu_);
1978   // We jump through some hoops here to make sure that the absl::string_views
1979   // stored in the XdsClusterDropStats object point to the strings
1980   // in the load_report_map_ key, so that they have the same lifetime.
1981   auto it = load_report_map_
1982                 .emplace(std::make_pair(std::move(key), LoadReportState()))
1983                 .first;
1984   LoadReportState& load_report_state = it->second;
1985   RefCountedPtr<XdsClusterDropStats> cluster_drop_stats;
1986   if (load_report_state.drop_stats != nullptr) {
1987     cluster_drop_stats = load_report_state.drop_stats->RefIfNonZero();
1988   }
1989   if (cluster_drop_stats == nullptr) {
1990     if (load_report_state.drop_stats != nullptr) {
1991       load_report_state.deleted_drop_stats +=
1992           load_report_state.drop_stats->GetSnapshotAndReset();
1993     }
1994     cluster_drop_stats = MakeRefCounted<XdsClusterDropStats>(
1995         Ref(DEBUG_LOCATION, "DropStats"), lrs_server,
1996         it->first.first /*cluster_name*/,
1997         it->first.second /*eds_service_name*/);
1998     load_report_state.drop_stats = cluster_drop_stats.get();
1999   }
2000   chand_->MaybeStartLrsCall();
2001   return cluster_drop_stats;
2002 }
2003 
RemoveClusterDropStats(absl::string_view,absl::string_view cluster_name,absl::string_view eds_service_name,XdsClusterDropStats * cluster_drop_stats)2004 void XdsClient::RemoveClusterDropStats(
2005     absl::string_view /*lrs_server*/, absl::string_view cluster_name,
2006     absl::string_view eds_service_name,
2007     XdsClusterDropStats* cluster_drop_stats) {
2008   MutexLock lock(&mu_);
2009   // TODO(roth): When we add support for direct federation, use the
2010   // server name specified in lrs_server.
2011   auto it = load_report_map_.find(
2012       std::make_pair(std::string(cluster_name), std::string(eds_service_name)));
2013   if (it == load_report_map_.end()) return;
2014   LoadReportState& load_report_state = it->second;
2015   if (load_report_state.drop_stats == cluster_drop_stats) {
2016     // Record final snapshot in deleted_drop_stats, which will be
2017     // added to the next load report.
2018     load_report_state.deleted_drop_stats +=
2019         load_report_state.drop_stats->GetSnapshotAndReset();
2020     load_report_state.drop_stats = nullptr;
2021   }
2022 }
2023 
AddClusterLocalityStats(absl::string_view lrs_server,absl::string_view cluster_name,absl::string_view eds_service_name,RefCountedPtr<XdsLocalityName> locality)2024 RefCountedPtr<XdsClusterLocalityStats> XdsClient::AddClusterLocalityStats(
2025     absl::string_view lrs_server, absl::string_view cluster_name,
2026     absl::string_view eds_service_name,
2027     RefCountedPtr<XdsLocalityName> locality) {
2028   // TODO(roth): When we add support for direct federation, use the
2029   // server name specified in lrs_server.
2030   auto key =
2031       std::make_pair(std::string(cluster_name), std::string(eds_service_name));
2032   MutexLock lock(&mu_);
2033   // We jump through some hoops here to make sure that the absl::string_views
2034   // stored in the XdsClusterLocalityStats object point to the strings
2035   // in the load_report_map_ key, so that they have the same lifetime.
2036   auto it = load_report_map_
2037                 .emplace(std::make_pair(std::move(key), LoadReportState()))
2038                 .first;
2039   LoadReportState& load_report_state = it->second;
2040   LoadReportState::LocalityState& locality_state =
2041       load_report_state.locality_stats[locality];
2042   RefCountedPtr<XdsClusterLocalityStats> cluster_locality_stats;
2043   if (locality_state.locality_stats != nullptr) {
2044     cluster_locality_stats = locality_state.locality_stats->RefIfNonZero();
2045   }
2046   if (cluster_locality_stats == nullptr) {
2047     if (locality_state.locality_stats != nullptr) {
2048       locality_state.deleted_locality_stats +=
2049           locality_state.locality_stats->GetSnapshotAndReset();
2050     }
2051     cluster_locality_stats = MakeRefCounted<XdsClusterLocalityStats>(
2052         Ref(DEBUG_LOCATION, "LocalityStats"), lrs_server,
2053         it->first.first /*cluster_name*/, it->first.second /*eds_service_name*/,
2054         std::move(locality));
2055     locality_state.locality_stats = cluster_locality_stats.get();
2056   }
2057   chand_->MaybeStartLrsCall();
2058   return cluster_locality_stats;
2059 }
2060 
RemoveClusterLocalityStats(absl::string_view,absl::string_view cluster_name,absl::string_view eds_service_name,const RefCountedPtr<XdsLocalityName> & locality,XdsClusterLocalityStats * cluster_locality_stats)2061 void XdsClient::RemoveClusterLocalityStats(
2062     absl::string_view /*lrs_server*/, absl::string_view cluster_name,
2063     absl::string_view eds_service_name,
2064     const RefCountedPtr<XdsLocalityName>& locality,
2065     XdsClusterLocalityStats* cluster_locality_stats) {
2066   MutexLock lock(&mu_);
2067   // TODO(roth): When we add support for direct federation, use the
2068   // server name specified in lrs_server.
2069   auto it = load_report_map_.find(
2070       std::make_pair(std::string(cluster_name), std::string(eds_service_name)));
2071   if (it == load_report_map_.end()) return;
2072   LoadReportState& load_report_state = it->second;
2073   auto locality_it = load_report_state.locality_stats.find(locality);
2074   if (locality_it == load_report_state.locality_stats.end()) return;
2075   LoadReportState::LocalityState& locality_state = locality_it->second;
2076   if (locality_state.locality_stats == cluster_locality_stats) {
2077     // Record final snapshot in deleted_locality_stats, which will be
2078     // added to the next load report.
2079     locality_state.deleted_locality_stats +=
2080         locality_state.locality_stats->GetSnapshotAndReset();
2081     locality_state.locality_stats = nullptr;
2082   }
2083 }
2084 
ResetBackoff()2085 void XdsClient::ResetBackoff() {
2086   MutexLock lock(&mu_);
2087   if (chand_ != nullptr) {
2088     grpc_channel_reset_connect_backoff(chand_->channel());
2089   }
2090 }
2091 
NotifyOnErrorLocked(grpc_error * error)2092 void XdsClient::NotifyOnErrorLocked(grpc_error* error) {
2093   for (const auto& p : listener_map_) {
2094     const ListenerState& listener_state = p.second;
2095     for (const auto& p : listener_state.watchers) {
2096       p.first->OnError(GRPC_ERROR_REF(error));
2097     }
2098   }
2099   for (const auto& p : route_config_map_) {
2100     const RouteConfigState& route_config_state = p.second;
2101     for (const auto& p : route_config_state.watchers) {
2102       p.first->OnError(GRPC_ERROR_REF(error));
2103     }
2104   }
2105   for (const auto& p : cluster_map_) {
2106     const ClusterState& cluster_state = p.second;
2107     for (const auto& p : cluster_state.watchers) {
2108       p.first->OnError(GRPC_ERROR_REF(error));
2109     }
2110   }
2111   for (const auto& p : endpoint_map_) {
2112     const EndpointState& endpoint_state = p.second;
2113     for (const auto& p : endpoint_state.watchers) {
2114       p.first->OnError(GRPC_ERROR_REF(error));
2115     }
2116   }
2117   GRPC_ERROR_UNREF(error);
2118 }
2119 
BuildLoadReportSnapshotLocked(bool send_all_clusters,const std::set<std::string> & clusters)2120 XdsApi::ClusterLoadReportMap XdsClient::BuildLoadReportSnapshotLocked(
2121     bool send_all_clusters, const std::set<std::string>& clusters) {
2122   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
2123     gpr_log(GPR_INFO, "[xds_client %p] start building load report", this);
2124   }
2125   XdsApi::ClusterLoadReportMap snapshot_map;
2126   for (auto load_report_it = load_report_map_.begin();
2127        load_report_it != load_report_map_.end();) {
2128     // Cluster key is cluster and EDS service name.
2129     const auto& cluster_key = load_report_it->first;
2130     LoadReportState& load_report = load_report_it->second;
2131     // If the CDS response for a cluster indicates to use LRS but the
2132     // LRS server does not say that it wants reports for this cluster,
2133     // then we'll have stats objects here whose data we're not going to
2134     // include in the load report.  However, we still need to clear out
2135     // the data from the stats objects, so that if the LRS server starts
2136     // asking for the data in the future, we don't incorrectly include
2137     // data from previous reporting intervals in that future report.
2138     const bool record_stats =
2139         send_all_clusters || clusters.find(cluster_key.first) != clusters.end();
2140     XdsApi::ClusterLoadReport snapshot;
2141     // Aggregate drop stats.
2142     snapshot.dropped_requests = std::move(load_report.deleted_drop_stats);
2143     if (load_report.drop_stats != nullptr) {
2144       snapshot.dropped_requests +=
2145           load_report.drop_stats->GetSnapshotAndReset();
2146       if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
2147         gpr_log(GPR_INFO,
2148                 "[xds_client %p] cluster=%s eds_service_name=%s drop_stats=%p",
2149                 this, cluster_key.first.c_str(), cluster_key.second.c_str(),
2150                 load_report.drop_stats);
2151       }
2152     }
2153     // Aggregate locality stats.
2154     for (auto it = load_report.locality_stats.begin();
2155          it != load_report.locality_stats.end();) {
2156       const RefCountedPtr<XdsLocalityName>& locality_name = it->first;
2157       auto& locality_state = it->second;
2158       XdsClusterLocalityStats::Snapshot& locality_snapshot =
2159           snapshot.locality_stats[locality_name];
2160       locality_snapshot = std::move(locality_state.deleted_locality_stats);
2161       if (locality_state.locality_stats != nullptr) {
2162         locality_snapshot +=
2163             locality_state.locality_stats->GetSnapshotAndReset();
2164         if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
2165           gpr_log(GPR_INFO,
2166                   "[xds_client %p] cluster=%s eds_service_name=%s "
2167                   "locality=%s locality_stats=%p",
2168                   this, cluster_key.first.c_str(), cluster_key.second.c_str(),
2169                   locality_name->AsHumanReadableString().c_str(),
2170                   locality_state.locality_stats);
2171         }
2172       }
2173       // If the only thing left in this entry was final snapshots from
2174       // deleted locality stats objects, remove the entry.
2175       if (locality_state.locality_stats == nullptr) {
2176         it = load_report.locality_stats.erase(it);
2177       } else {
2178         ++it;
2179       }
2180     }
2181     // Compute load report interval.
2182     const grpc_millis now = ExecCtx::Get()->Now();
2183     snapshot.load_report_interval = now - load_report.last_report_time;
2184     load_report.last_report_time = now;
2185     // Record snapshot.
2186     if (record_stats) {
2187       snapshot_map[cluster_key] = std::move(snapshot);
2188     }
2189     // If the only thing left in this entry was final snapshots from
2190     // deleted stats objects, remove the entry.
2191     if (load_report.locality_stats.empty() &&
2192         load_report.drop_stats == nullptr) {
2193       load_report_it = load_report_map_.erase(load_report_it);
2194     } else {
2195       ++load_report_it;
2196     }
2197   }
2198   return snapshot_map;
2199 }
2200 
2201 //
2202 // accessors for global state
2203 //
2204 
XdsClientGlobalInit()2205 void XdsClientGlobalInit() { g_mu = new Mutex; }
2206 
XdsClientGlobalShutdown()2207 void XdsClientGlobalShutdown() {
2208   delete g_mu;
2209   g_mu = nullptr;
2210 }
2211 
GetOrCreate(grpc_error ** error)2212 RefCountedPtr<XdsClient> XdsClient::GetOrCreate(grpc_error** error) {
2213   MutexLock lock(g_mu);
2214   if (g_xds_client != nullptr) {
2215     auto xds_client = g_xds_client->RefIfNonZero();
2216     if (xds_client != nullptr) return xds_client;
2217   }
2218   auto xds_client = MakeRefCounted<XdsClient>(error);
2219   g_xds_client = xds_client.get();
2220   return xds_client;
2221 }
2222 
2223 namespace internal {
2224 
SetXdsChannelArgsForTest(grpc_channel_args * args)2225 void SetXdsChannelArgsForTest(grpc_channel_args* args) {
2226   MutexLock lock(g_mu);
2227   g_channel_args = args;
2228 }
2229 
UnsetGlobalXdsClientForTest()2230 void UnsetGlobalXdsClientForTest() {
2231   MutexLock lock(g_mu);
2232   g_xds_client = nullptr;
2233 }
2234 
2235 }  // namespace internal
2236 
2237 }  // namespace grpc_core
2238