1 //
2 // Copyright 2019 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 //     http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16 
17 #ifndef GRPC_CORE_EXT_XDS_XDS_CLIENT_H
18 #define GRPC_CORE_EXT_XDS_XDS_CLIENT_H
19 
20 #include <grpc/support/port_platform.h>
21 
22 #include <set>
23 #include <vector>
24 
25 #include "absl/strings/string_view.h"
26 #include "absl/types/optional.h"
27 
28 #include "src/core/ext/xds/xds_api.h"
29 #include "src/core/ext/xds/xds_bootstrap.h"
30 #include "src/core/ext/xds/xds_client_stats.h"
31 #include "src/core/lib/channel/channelz.h"
32 #include "src/core/lib/gprpp/dual_ref_counted.h"
33 #include "src/core/lib/gprpp/memory.h"
34 #include "src/core/lib/gprpp/orphanable.h"
35 #include "src/core/lib/gprpp/ref_counted.h"
36 #include "src/core/lib/gprpp/ref_counted_ptr.h"
37 #include "src/core/lib/gprpp/sync.h"
38 
39 namespace grpc_core {
40 
41 extern TraceFlag grpc_xds_client_trace;  // Declaration only (extern); defined in another translation unit.
42 extern TraceFlag grpc_xds_client_refcount_trace;  // Declaration only; NOTE(review): presumably traces XdsClient ref-counting -- confirm at the definition.
43 
44 class XdsClient : public DualRefCounted<XdsClient> {
     // Overview: XdsClient is dual-ref-counted (DualRefCounted<XdsClient>) and
     // overrides Orphan().  Callers obtain the instance through GetOrCreate(),
     // register watchers for LDS/RDS/CDS/EDS resources by name, and register
     // drop/locality stats objects used for load reporting.  The channel to the
     // xDS server is held by the private ChannelState member (chand_).
45  public:
46   // Listener data watcher interface.  Implemented by callers.
     // All callbacks are pure virtual; the new resource is passed by value.
     // NOTE(review): this header does not show who owns the grpc_error* handed
     // to OnError() -- confirm against the implementation before relying on it.
47   class ListenerWatcherInterface {
48    public:
49     virtual ~ListenerWatcherInterface() = default;
50     virtual void OnListenerChanged(XdsApi::LdsUpdate listener) = 0;
51     virtual void OnError(grpc_error* error) = 0;
52     virtual void OnResourceDoesNotExist() = 0;
53   };
54 
55   // RouteConfiguration data watcher interface.  Implemented by callers.
     // Same callback shape as ListenerWatcherInterface, carrying an
     // XdsApi::RdsUpdate by value.
56   class RouteConfigWatcherInterface {
57    public:
58     virtual ~RouteConfigWatcherInterface() = default;
59     virtual void OnRouteConfigChanged(XdsApi::RdsUpdate route_config) = 0;
60     virtual void OnError(grpc_error* error) = 0;
61     virtual void OnResourceDoesNotExist() = 0;
62   };
63 
64   // Cluster data watcher interface.  Implemented by callers.
     // Same callback shape as ListenerWatcherInterface, carrying an
     // XdsApi::CdsUpdate by value.
65   class ClusterWatcherInterface {
66    public:
67     virtual ~ClusterWatcherInterface() = default;
68     virtual void OnClusterChanged(XdsApi::CdsUpdate cluster_data) = 0;
69     virtual void OnError(grpc_error* error) = 0;
70     virtual void OnResourceDoesNotExist() = 0;
71   };
72 
73   // Endpoint data watcher interface.  Implemented by callers.
     // Same callback shape as ListenerWatcherInterface, carrying an
     // XdsApi::EdsUpdate by value.
74   class EndpointWatcherInterface {
75    public:
76     virtual ~EndpointWatcherInterface() = default;
77     virtual void OnEndpointChanged(XdsApi::EdsUpdate update) = 0;
78     virtual void OnError(grpc_error* error) = 0;
79     virtual void OnResourceDoesNotExist() = 0;
80   };
81 
82   // Factory function to get or create the global XdsClient instance.
83   // If *error is not GRPC_ERROR_NONE upon return, then there was
84   // an error initializing the client.
     // The result is handed back as a RefCountedPtr<XdsClient>.
85   static RefCountedPtr<XdsClient> GetOrCreate(grpc_error** error);
86 
87   // Callers should not instantiate directly.  Use GetOrCreate() instead.
     // NOTE(review): the constructor is nevertheless public, and presumably
     // reports failure through *error the same way GetOrCreate() does -- confirm.
88   explicit XdsClient(grpc_error** error);
89   ~XdsClient() override;
90 
     // Returns the certificate provider store by reference.  This dereferences
     // certificate_provider_store_ without a null check, so that pointer must
     // already be set when the accessor is called.
certificate_provider_store()91   CertificateProviderStore& certificate_provider_store() {
92     return *certificate_provider_store_;
93   }
94 
     // Accessor for the interested_parties_ pointer (the pointer itself is
     // returned; nothing is copied).
interested_parties()95   grpc_pollset_set* interested_parties() const { return interested_parties_; }
96 
97   // TODO(roth): When we add federation, there will be multiple channels
98   // inside the XdsClient, and the set of channels may change over time,
99   // but not every channel may use every one of the child channels, so
100   // this API will need to change.  At minimum, we will need to hold a
101   // ref to the parent channelz node so that we can update its list of
102   // children as the set of xDS channels changes.  However, we may also
103   // want to make this a bit more selective such that only those
104   // channels on which a given parent channel is actually requesting
105   // resources will actually be marked as its children.
106   void AddChannelzLinkage(channelz::ChannelNode* parent_channelz_node);
107   void RemoveChannelzLinkage(channelz::ChannelNode* parent_channelz_node);
108 
      // Override of the Orphan() hook from the ref-counting base class.
      // NOTE(review): presumably run when the last strong ref is released --
      // confirm against dual_ref_counted.h.
109   void Orphan() override;
110 
111   // Start and cancel listener data watch for a listener.
112   // The XdsClient takes ownership of the watcher, but the caller may
113   // keep a raw pointer to the watcher, which may be used only for
114   // cancellation.  (Because the caller does not own the watcher, the
115   // pointer must not be used for any other purpose.)
116   // If the caller is going to start a new watch after cancelling the
117   // old one, it should set delay_unsubscription to true.
      // delay_unsubscription defaults to false.
118   void WatchListenerData(absl::string_view listener_name,
119                          std::unique_ptr<ListenerWatcherInterface> watcher);
120   void CancelListenerDataWatch(absl::string_view listener_name,
121                                ListenerWatcherInterface* watcher,
122                                bool delay_unsubscription = false);
123 
124   // Start and cancel route config data watch for a route configuration.
125   // The XdsClient takes ownership of the watcher, but the caller may
126   // keep a raw pointer to the watcher, which may be used only for
127   // cancellation.  (Because the caller does not own the watcher, the
128   // pointer must not be used for any other purpose.)
129   // If the caller is going to start a new watch after cancelling the
130   // old one, it should set delay_unsubscription to true.
      // delay_unsubscription defaults to false.
131   void WatchRouteConfigData(
132       absl::string_view route_config_name,
133       std::unique_ptr<RouteConfigWatcherInterface> watcher);
134   void CancelRouteConfigDataWatch(absl::string_view route_config_name,
135                                   RouteConfigWatcherInterface* watcher,
136                                   bool delay_unsubscription = false);
137 
138   // Start and cancel cluster data watch for a cluster.
139   // The XdsClient takes ownership of the watcher, but the caller may
140   // keep a raw pointer to the watcher, which may be used only for
141   // cancellation.  (Because the caller does not own the watcher, the
142   // pointer must not be used for any other purpose.)
143   // If the caller is going to start a new watch after cancelling the
144   // old one, it should set delay_unsubscription to true.
      // delay_unsubscription defaults to false.
145   void WatchClusterData(absl::string_view cluster_name,
146                         std::unique_ptr<ClusterWatcherInterface> watcher);
147   void CancelClusterDataWatch(absl::string_view cluster_name,
148                               ClusterWatcherInterface* watcher,
149                               bool delay_unsubscription = false);
150 
151   // Start and cancel endpoint data watch for a cluster.
152   // The XdsClient takes ownership of the watcher, but the caller may
153   // keep a raw pointer to the watcher, which may be used only for
154   // cancellation.  (Because the caller does not own the watcher, the
155   // pointer must not be used for any other purpose.)
156   // If the caller is going to start a new watch after cancelling the
157   // old one, it should set delay_unsubscription to true.
      // The watch is keyed by eds_service_name; delay_unsubscription defaults
      // to false.
158   void WatchEndpointData(absl::string_view eds_service_name,
159                          std::unique_ptr<EndpointWatcherInterface> watcher);
160   void CancelEndpointDataWatch(absl::string_view eds_service_name,
161                                EndpointWatcherInterface* watcher,
162                                bool delay_unsubscription = false);
163 
164   // Adds and removes drop stats for cluster_name and eds_service_name.
      // Add returns a ref-counted stats object; Remove takes the raw pointer of
      // the object being removed.  The lrs_server parameter of Remove is left
      // unnamed in this declaration.
165   RefCountedPtr<XdsClusterDropStats> AddClusterDropStats(
166       absl::string_view lrs_server, absl::string_view cluster_name,
167       absl::string_view eds_service_name);
168   void RemoveClusterDropStats(absl::string_view /*lrs_server*/,
169                               absl::string_view cluster_name,
170                               absl::string_view eds_service_name,
171                               XdsClusterDropStats* cluster_drop_stats);
172 
173   // Adds and removes locality stats for cluster_name and eds_service_name
174   // for the specified locality.
      // As with the drop stats pair, Add returns a ref-counted object, Remove
      // takes its raw pointer, and Remove's lrs_server parameter is unnamed.
175   RefCountedPtr<XdsClusterLocalityStats> AddClusterLocalityStats(
176       absl::string_view lrs_server, absl::string_view cluster_name,
177       absl::string_view eds_service_name,
178       RefCountedPtr<XdsLocalityName> locality);
179   void RemoveClusterLocalityStats(
180       absl::string_view /*lrs_server*/, absl::string_view cluster_name,
181       absl::string_view eds_service_name,
182       const RefCountedPtr<XdsLocalityName>& locality,
183       XdsClusterLocalityStats* cluster_locality_stats);
184 
185   // Resets connection backoff state.
186   void ResetBackoff();
187 
188  private:
189   // Contains a channel to the xds server and all the data related to the
190   // channel.  Holds a ref to the xds client object.
191   //
192   // Currently, there is only one ChannelState object per XdsClient
193   // object, and it has essentially the same lifetime.  But in the
194   // future, when we add federation support, a single XdsClient may have
195   // multiple underlying channels to talk to different xDS servers.
      //
      // The ref to the client is a WeakRefCountedPtr (see xds_client_ below).
196   class ChannelState : public InternallyRefCounted<ChannelState> {
197    public:
        // Forward declarations only.  RetryableCall<T> is used below with
        // T = AdsCallState (ads_calld_) and T = LrsCallState (lrs_calld_).
198     template <typename T>
199     class RetryableCall;
200 
201     class AdsCallState;
202     class LrsCallState;
203 
204     ChannelState(WeakRefCountedPtr<XdsClient> xds_client,
205                  const XdsBootstrap::XdsServer& server);
206     ~ChannelState() override;
207 
208     void Orphan() override;
209 
        // Plain accessors.  xds_client() returns the raw pointer held by the
        // weak ref and does not take a new ref.
channel()210     grpc_channel* channel() const { return channel_; }
xds_client()211     XdsClient* xds_client() const { return xds_client_.get(); }
212     AdsCallState* ads_calld() const;
213     LrsCallState* lrs_calld() const;
214 
215     void MaybeStartLrsCall();
216     void StopLrsCall();
217 
218     bool HasActiveAdsCall() const;
219 
        // NOTE(review): the "Locked" suffix suggests the caller must already
        // hold a lock (presumably XdsClient::mu_) -- confirm in the .cc file.
220     void StartConnectivityWatchLocked();
221     void CancelConnectivityWatchLocked();
222 
        // Subscribe/unsubscribe a named resource of the given type URL.
        // Unlike the public Cancel*DataWatch() methods, delay_unsubscription
        // has no default here.
223     void Subscribe(const std::string& type_url, const std::string& name);
224     void Unsubscribe(const std::string& type_url, const std::string& name,
225                      bool delay_unsubscription);
226 
227    private:
228     class StateWatcher;
229 
230     // The owning xds client.
231     WeakRefCountedPtr<XdsClient> xds_client_;
232 
        // Reference member: the XdsServer it is bound to must outlive this
        // ChannelState.  NOTE(review): presumably bound to the constructor's
        // `server` argument -- confirm.
233     const XdsBootstrap::XdsServer& server_;
234 
235     // The channel and its status.
236     grpc_channel* channel_;
237     bool shutting_down_ = false;
        // Raw pointer; who owns the StateWatcher is not visible in this header.
238     StateWatcher* watcher_ = nullptr;
239 
240     // The retryable XDS calls.
241     OrphanablePtr<RetryableCall<AdsCallState>> ads_calld_;
242     OrphanablePtr<RetryableCall<LrsCallState>> lrs_calld_;
243   };
244 
      // Per-resource state for LDS.  Watchers are stored in a map keyed by a
      // raw watcher pointer, with the unique_ptr value owning the watcher.
      // `update` has no initializer, so it starts out empty.
245   struct ListenerState {
246     std::map<ListenerWatcherInterface*,
247              std::unique_ptr<ListenerWatcherInterface>>
248         watchers;
249     // The latest data seen from LDS.
250     absl::optional<XdsApi::LdsUpdate> update;
251   };
252 
      // Per-resource state for RDS; same layout as ListenerState.
253   struct RouteConfigState {
254     std::map<RouteConfigWatcherInterface*,
255              std::unique_ptr<RouteConfigWatcherInterface>>
256         watchers;
257     // The latest data seen from RDS.
258     absl::optional<XdsApi::RdsUpdate> update;
259   };
260 
      // Per-resource state for CDS; same layout as ListenerState.
261   struct ClusterState {
262     std::map<ClusterWatcherInterface*, std::unique_ptr<ClusterWatcherInterface>>
263         watchers;
264     // The latest data seen from CDS.
265     absl::optional<XdsApi::CdsUpdate> update;
266   };
267 
      // Per-resource state for EDS; same layout as ListenerState.
268   struct EndpointState {
269     std::map<EndpointWatcherInterface*,
270              std::unique_ptr<EndpointWatcherInterface>>
271         watchers;
272     // The latest data seen from EDS.
273     absl::optional<XdsApi::EdsUpdate> update;
274   };
275 
      // Load-reporting state stored per entry of load_report_map_.
      // The stats objects are held as raw pointers next to a Snapshot named
      // deleted_*.  NOTE(review): presumably the snapshot keeps counts from
      // stats objects that were removed before being reported -- confirm.
276   struct LoadReportState {
277     struct LocalityState {
278       XdsClusterLocalityStats* locality_stats = nullptr;
279       XdsClusterLocalityStats::Snapshot deleted_locality_stats;
280     };
281 
282     XdsClusterDropStats* drop_stats = nullptr;
283     XdsClusterDropStats::Snapshot deleted_drop_stats;
        // Ordered with XdsLocalityName::Less rather than by comparing the
        // RefCountedPtr keys directly.
284     std::map<RefCountedPtr<XdsLocalityName>, LocalityState,
285              XdsLocalityName::Less>
286         locality_stats;
        // Initialized when the struct is constructed.  The initializer
        // dereferences ExecCtx::Get(), so presumably an ExecCtx must be active
        // on the constructing thread -- confirm.
287     grpc_millis last_report_time = ExecCtx::Get()->Now();
288   };
289 
290   // Sends an error notification to all watchers.
291   void NotifyOnErrorLocked(grpc_error* error);
292 
      // Builds a ClusterLoadReportMap.  NOTE(review): presumably built from
      // load_report_map_, with `clusters` consulted when send_all_clusters is
      // false -- confirm in the .cc file.
293   XdsApi::ClusterLoadReportMap BuildLoadReportSnapshotLocked(
294       bool send_all_clusters, const std::set<std::string>& clusters);
295 
      // const, so fixed at construction.
296   const grpc_millis request_timeout_;
297   grpc_pollset_set* interested_parties_;
298   std::unique_ptr<XdsBootstrap> bootstrap_;
299   OrphanablePtr<CertificateProviderStore> certificate_provider_store_;
300   XdsApi api_;
301 
      // NOTE(review): this header does not annotate which members mu_ guards;
      // the *Locked methods presumably require it to be held -- confirm.
302   Mutex mu_;
303 
304   // The channel for communicating with the xds server.
305   OrphanablePtr<ChannelState> chand_;
306 
307   // One entry for each watched LDS resource.
308   std::map<std::string /*listener_name*/, ListenerState> listener_map_;
309   // One entry for each watched RDS resource.
310   std::map<std::string /*route_config_name*/, RouteConfigState>
311       route_config_map_;
312   // One entry for each watched CDS resource.
313   std::map<std::string /*cluster_name*/, ClusterState> cluster_map_;
314   // One entry for each watched EDS resource.
315   std::map<std::string /*eds_service_name*/, EndpointState> endpoint_map_;
316 
317   // Load report data.
      // Keyed by the (cluster_name, eds_service_name) pair.
318   std::map<
319       std::pair<std::string /*cluster_name*/, std::string /*eds_service_name*/>,
320       LoadReportState>
321       load_report_map_;
322 
323   // Stores the most recent accepted resource version for each resource type.
324   std::map<std::string /*type*/, std::string /*version*/> resource_version_map_;
325 
326   bool shutting_down_ = false;
327 };
328 
329 namespace internal {
    // Hooks intended for tests, per the ForTest suffix and the `internal`
    // namespace; only declared here.
    // NOTE(review): SetXdsChannelArgsForTest presumably overrides the channel
    // args used for the xDS server channel, and UnsetGlobalXdsClientForTest
    // presumably clears the global instance handed out by
    // XdsClient::GetOrCreate() -- confirm both in the .cc file.
330 void SetXdsChannelArgsForTest(grpc_channel_args* args);
331 void UnsetGlobalXdsClientForTest();
332 }  // namespace internal
333 
334 }  // namespace grpc_core
335 
336 #endif /* GRPC_CORE_EXT_XDS_XDS_CLIENT_H */
337