1 /*
2 *
3 * Copyright 2019 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19 #include <grpc/support/port_platform.h>
20
21 #include "absl/strings/match.h"
22 #include "absl/strings/str_join.h"
23 #include "absl/strings/str_split.h"
24 #include "re2/re2.h"
25
26 #include "src/core/ext/filters/client_channel/config_selector.h"
27 #include "src/core/ext/filters/client_channel/resolver_registry.h"
28 #include "src/core/ext/xds/xds_client.h"
29 #include "src/core/lib/channel/channel_args.h"
30 #include "src/core/lib/iomgr/closure.h"
31 #include "src/core/lib/iomgr/exec_ctx.h"
32 #include "src/core/lib/transport/timeout_encoding.h"
33
34 namespace grpc_core {
35
// Trace flag registered under the name "xds_resolver".  It is constructed
// with `false` (presumably "disabled by default" -- confirm against
// TraceFlag) and gates the GPR_INFO logging throughout this file.
TraceFlag grpc_xds_resolver_trace(false, "xds_resolver");

// Call-attribute key under which XdsConfigSelector::GetCallConfig() records
// the name of the cluster chosen for a call.
const char* kXdsClusterAttribute = "xds_cluster_name";
39
40 namespace {
41
42 //
43 // XdsResolver
44 //
45
46 class XdsResolver : public Resolver {
47 public:
XdsResolver(ResolverArgs args)48 explicit XdsResolver(ResolverArgs args)
49 : Resolver(std::move(args.work_serializer),
50 std::move(args.result_handler)),
51 server_name_(absl::StripPrefix(args.uri.path(), "/")),
52 args_(grpc_channel_args_copy(args.args)),
53 interested_parties_(args.pollset_set) {
54 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) {
55 gpr_log(GPR_INFO, "[xds_resolver %p] created for server name %s", this,
56 server_name_.c_str());
57 }
58 }
59
~XdsResolver()60 ~XdsResolver() override {
61 grpc_channel_args_destroy(args_);
62 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) {
63 gpr_log(GPR_INFO, "[xds_resolver %p] destroyed", this);
64 }
65 }
66
67 void StartLocked() override;
68
69 void ShutdownLocked() override;
70
71 private:
72 class Notifier {
73 public:
74 Notifier(RefCountedPtr<XdsResolver> resolver, XdsApi::LdsUpdate update);
75 Notifier(RefCountedPtr<XdsResolver> resolver, XdsApi::RdsUpdate update);
76 Notifier(RefCountedPtr<XdsResolver> resolver, grpc_error* error);
77 explicit Notifier(RefCountedPtr<XdsResolver> resolver);
78
79 private:
80 enum Type { kLdsUpdate, kRdsUpdate, kError, kDoesNotExist };
81
82 static void RunInExecCtx(void* arg, grpc_error* error);
83 void RunInWorkSerializer(grpc_error* error);
84
85 RefCountedPtr<XdsResolver> resolver_;
86 grpc_closure closure_;
87 XdsApi::LdsUpdate update_;
88 Type type_;
89 };
90
91 class ListenerWatcher : public XdsClient::ListenerWatcherInterface {
92 public:
ListenerWatcher(RefCountedPtr<XdsResolver> resolver)93 explicit ListenerWatcher(RefCountedPtr<XdsResolver> resolver)
94 : resolver_(std::move(resolver)) {}
OnListenerChanged(XdsApi::LdsUpdate listener)95 void OnListenerChanged(XdsApi::LdsUpdate listener) override {
96 new Notifier(resolver_, std::move(listener));
97 }
OnError(grpc_error * error)98 void OnError(grpc_error* error) override { new Notifier(resolver_, error); }
OnResourceDoesNotExist()99 void OnResourceDoesNotExist() override { new Notifier(resolver_); }
100
101 private:
102 RefCountedPtr<XdsResolver> resolver_;
103 };
104
105 class RouteConfigWatcher : public XdsClient::RouteConfigWatcherInterface {
106 public:
RouteConfigWatcher(RefCountedPtr<XdsResolver> resolver)107 explicit RouteConfigWatcher(RefCountedPtr<XdsResolver> resolver)
108 : resolver_(std::move(resolver)) {}
OnRouteConfigChanged(XdsApi::RdsUpdate route_config)109 void OnRouteConfigChanged(XdsApi::RdsUpdate route_config) override {
110 new Notifier(resolver_, std::move(route_config));
111 }
OnError(grpc_error * error)112 void OnError(grpc_error* error) override { new Notifier(resolver_, error); }
OnResourceDoesNotExist()113 void OnResourceDoesNotExist() override { new Notifier(resolver_); }
114
115 private:
116 RefCountedPtr<XdsResolver> resolver_;
117 };
118
119 class ClusterState
120 : public RefCounted<ClusterState, PolymorphicRefCount, false> {
121 public:
122 using ClusterStateMap =
123 std::map<std::string, std::unique_ptr<ClusterState>>;
124
ClusterState(const std::string & cluster_name,ClusterStateMap * cluster_state_map)125 ClusterState(const std::string& cluster_name,
126 ClusterStateMap* cluster_state_map)
127 : it_(cluster_state_map
128 ->emplace(cluster_name, std::unique_ptr<ClusterState>(this))
129 .first) {}
cluster() const130 const std::string& cluster() const { return it_->first; }
131
132 private:
133 ClusterStateMap::iterator it_;
134 };
135
136 class XdsConfigSelector : public ConfigSelector {
137 public:
138 XdsConfigSelector(RefCountedPtr<XdsResolver> resolver,
139 const std::vector<XdsApi::Route>& routes,
140 grpc_error* error);
141 ~XdsConfigSelector() override;
142
name() const143 const char* name() const override { return "XdsConfigSelector"; }
144
Equals(const ConfigSelector * other) const145 bool Equals(const ConfigSelector* other) const override {
146 const auto* other_xds = static_cast<const XdsConfigSelector*>(other);
147 // Don't need to compare resolver_, since that will always be the same.
148 return route_table_ == other_xds->route_table_ &&
149 clusters_ == other_xds->clusters_;
150 }
151
152 CallConfig GetCallConfig(GetCallConfigArgs args) override;
153
154 private:
155 struct Route {
156 XdsApi::Route route;
157 absl::InlinedVector<std::pair<uint32_t, absl::string_view>, 2>
158 weighted_cluster_state;
159 RefCountedPtr<ServiceConfig> method_config;
operator ==grpc_core::__anon6328d4090111::XdsResolver::XdsConfigSelector::Route160 bool operator==(const Route& other) const {
161 return route == other.route &&
162 weighted_cluster_state == other.weighted_cluster_state;
163 }
164 };
165 using RouteTable = std::vector<Route>;
166
167 void MaybeAddCluster(const std::string& name);
168 grpc_error* CreateMethodConfig(RefCountedPtr<ServiceConfig>* method_config,
169 const XdsApi::Route& route);
170
171 RefCountedPtr<XdsResolver> resolver_;
172 RouteTable route_table_;
173 std::map<absl::string_view, RefCountedPtr<ClusterState>> clusters_;
174 };
175
176 void OnListenerUpdate(XdsApi::LdsUpdate listener);
177 void OnRouteConfigUpdate(XdsApi::RdsUpdate rds_update);
178 void OnError(grpc_error* error);
179 void OnResourceDoesNotExist();
180
181 grpc_error* CreateServiceConfig(RefCountedPtr<ServiceConfig>* service_config);
182 void GenerateResult();
183 void MaybeRemoveUnusedClusters();
184
185 std::string server_name_;
186 const grpc_channel_args* args_;
187 grpc_pollset_set* interested_parties_;
188 RefCountedPtr<XdsClient> xds_client_;
189 XdsClient::ListenerWatcherInterface* listener_watcher_ = nullptr;
190 std::string route_config_name_;
191 XdsClient::RouteConfigWatcherInterface* route_config_watcher_ = nullptr;
192 ClusterState::ClusterStateMap cluster_state_map_;
193 std::vector<XdsApi::Route> current_update_;
194 XdsApi::Duration http_max_stream_duration_;
195 };
196
197 //
198 // XdsResolver::Notifier
199 //
200
Notifier(RefCountedPtr<XdsResolver> resolver,XdsApi::LdsUpdate update)201 XdsResolver::Notifier::Notifier(RefCountedPtr<XdsResolver> resolver,
202 XdsApi::LdsUpdate update)
203 : resolver_(std::move(resolver)),
204 update_(std::move(update)),
205 type_(kLdsUpdate) {
206 GRPC_CLOSURE_INIT(&closure_, &RunInExecCtx, this, nullptr);
207 ExecCtx::Run(DEBUG_LOCATION, &closure_, GRPC_ERROR_NONE);
208 }
209
Notifier(RefCountedPtr<XdsResolver> resolver,XdsApi::RdsUpdate update)210 XdsResolver::Notifier::Notifier(RefCountedPtr<XdsResolver> resolver,
211 XdsApi::RdsUpdate update)
212 : resolver_(std::move(resolver)), type_(kRdsUpdate) {
213 update_.rds_update = std::move(update);
214 GRPC_CLOSURE_INIT(&closure_, &RunInExecCtx, this, nullptr);
215 ExecCtx::Run(DEBUG_LOCATION, &closure_, GRPC_ERROR_NONE);
216 }
217
Notifier(RefCountedPtr<XdsResolver> resolver,grpc_error * error)218 XdsResolver::Notifier::Notifier(RefCountedPtr<XdsResolver> resolver,
219 grpc_error* error)
220 : resolver_(std::move(resolver)), type_(kError) {
221 GRPC_CLOSURE_INIT(&closure_, &RunInExecCtx, this, nullptr);
222 ExecCtx::Run(DEBUG_LOCATION, &closure_, error);
223 }
224
Notifier(RefCountedPtr<XdsResolver> resolver)225 XdsResolver::Notifier::Notifier(RefCountedPtr<XdsResolver> resolver)
226 : resolver_(std::move(resolver)), type_(kDoesNotExist) {
227 GRPC_CLOSURE_INIT(&closure_, &RunInExecCtx, this, nullptr);
228 ExecCtx::Run(DEBUG_LOCATION, &closure_, GRPC_ERROR_NONE);
229 }
230
RunInExecCtx(void * arg,grpc_error * error)231 void XdsResolver::Notifier::RunInExecCtx(void* arg, grpc_error* error) {
232 Notifier* self = static_cast<Notifier*>(arg);
233 GRPC_ERROR_REF(error);
234 self->resolver_->work_serializer()->Run(
235 [self, error]() { self->RunInWorkSerializer(error); }, DEBUG_LOCATION);
236 }
237
RunInWorkSerializer(grpc_error * error)238 void XdsResolver::Notifier::RunInWorkSerializer(grpc_error* error) {
239 if (resolver_->xds_client_ == nullptr) {
240 GRPC_ERROR_UNREF(error);
241 delete this;
242 return;
243 }
244 switch (type_) {
245 case kLdsUpdate:
246 resolver_->OnListenerUpdate(std::move(update_));
247 break;
248 case kRdsUpdate:
249 resolver_->OnRouteConfigUpdate(std::move(*update_.rds_update));
250 break;
251 case kError:
252 resolver_->OnError(error);
253 break;
254 case kDoesNotExist:
255 resolver_->OnResourceDoesNotExist();
256 break;
257 };
258 delete this;
259 }
260
261 //
262 // XdsResolver::XdsConfigSelector
263 //
264
XdsConfigSelector(RefCountedPtr<XdsResolver> resolver,const std::vector<XdsApi::Route> & routes,grpc_error * error)265 XdsResolver::XdsConfigSelector::XdsConfigSelector(
266 RefCountedPtr<XdsResolver> resolver,
267 const std::vector<XdsApi::Route>& routes, grpc_error* error)
268 : resolver_(std::move(resolver)) {
269 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) {
270 gpr_log(GPR_INFO, "[xds_resolver %p] creating XdsConfigSelector %p",
271 resolver_.get(), this);
272 }
273 // 1. Construct the route table
274 // 2 Update resolver's cluster state map
275 // 3. Construct cluster list to hold on to entries in the cluster state
276 // map.
277 // Reserve the necessary entries up-front to avoid reallocation as we add
278 // elements. This is necessary because the string_view in the entry's
279 // weighted_cluster_state field points to the memory in the route field, so
280 // moving the entry in a reallocation will cause the string_view to point to
281 // invalid data.
282 route_table_.reserve(routes.size());
283 for (auto& route : routes) {
284 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) {
285 gpr_log(GPR_INFO, "[xds_resolver %p] XdsConfigSelector %p: route: %s",
286 resolver_.get(), this, route.ToString().c_str());
287 }
288 route_table_.emplace_back();
289 auto& route_entry = route_table_.back();
290 route_entry.route = route;
291 // If the route doesn't specify a timeout, set its timeout to the global
292 // one.
293 if (!route.max_stream_duration.has_value()) {
294 route_entry.route.max_stream_duration =
295 resolver_->http_max_stream_duration_;
296 }
297 error = CreateMethodConfig(&route_entry.method_config, route_entry.route);
298 if (route.weighted_clusters.empty()) {
299 MaybeAddCluster(route.cluster_name);
300 } else {
301 uint32_t end = 0;
302 for (const auto& weighted_cluster : route_entry.route.weighted_clusters) {
303 MaybeAddCluster(weighted_cluster.name);
304 end += weighted_cluster.weight;
305 route_entry.weighted_cluster_state.emplace_back(end,
306 weighted_cluster.name);
307 }
308 }
309 }
310 }
311
CreateMethodConfig(RefCountedPtr<ServiceConfig> * method_config,const XdsApi::Route & route)312 grpc_error* XdsResolver::XdsConfigSelector::CreateMethodConfig(
313 RefCountedPtr<ServiceConfig>* method_config, const XdsApi::Route& route) {
314 grpc_error* error = GRPC_ERROR_NONE;
315 std::vector<std::string> fields;
316 if (route.max_stream_duration.has_value() &&
317 (route.max_stream_duration->seconds != 0 ||
318 route.max_stream_duration->nanos != 0)) {
319 fields.emplace_back(absl::StrFormat(" \"timeout\": \"%d.%09ds\"",
320 route.max_stream_duration->seconds,
321 route.max_stream_duration->nanos));
322 }
323 if (!fields.empty()) {
324 std::string json = absl::StrCat(
325 "{\n"
326 " \"methodConfig\": [ {\n"
327 " \"name\": [\n"
328 " {}\n"
329 " ],\n"
330 " ",
331 absl::StrJoin(fields, ",\n"),
332 "\n } ]\n"
333 "}");
334 *method_config =
335 ServiceConfig::Create(resolver_->args_, json.c_str(), &error);
336 }
337 return error;
338 }
339
~XdsConfigSelector()340 XdsResolver::XdsConfigSelector::~XdsConfigSelector() {
341 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) {
342 gpr_log(GPR_INFO, "[xds_resolver %p] destroying XdsConfigSelector %p",
343 resolver_.get(), this);
344 }
345 clusters_.clear();
346 resolver_->MaybeRemoveUnusedClusters();
347 }
348
MaybeAddCluster(const std::string & name)349 void XdsResolver::XdsConfigSelector::MaybeAddCluster(const std::string& name) {
350 if (clusters_.find(name) == clusters_.end()) {
351 auto it = resolver_->cluster_state_map_.find(name);
352 if (it == resolver_->cluster_state_map_.end()) {
353 auto new_cluster_state =
354 MakeRefCounted<ClusterState>(name, &resolver_->cluster_state_map_);
355 clusters_[new_cluster_state->cluster()] = std::move(new_cluster_state);
356 } else {
357 clusters_[it->second->cluster()] = it->second->Ref();
358 }
359 }
360 }
361
PathMatch(const absl::string_view & path,const XdsApi::Route::Matchers::PathMatcher & path_matcher)362 bool PathMatch(const absl::string_view& path,
363 const XdsApi::Route::Matchers::PathMatcher& path_matcher) {
364 switch (path_matcher.type) {
365 case XdsApi::Route::Matchers::PathMatcher::PathMatcherType::PREFIX:
366 return path_matcher.case_sensitive
367 ? absl::StartsWith(path, path_matcher.string_matcher)
368 : absl::StartsWithIgnoreCase(path,
369 path_matcher.string_matcher);
370 case XdsApi::Route::Matchers::PathMatcher::PathMatcherType::PATH:
371 return path_matcher.case_sensitive
372 ? path == path_matcher.string_matcher
373 : absl::EqualsIgnoreCase(path, path_matcher.string_matcher);
374 case XdsApi::Route::Matchers::PathMatcher::PathMatcherType::REGEX:
375 // Note: Case-sensitive option will already have been set appropriately
376 // in path_matcher.regex_matcher when it was constructed, so no
377 // need to check it here.
378 return RE2::FullMatch(path.data(), *path_matcher.regex_matcher);
379 default:
380 return false;
381 }
382 }
383
GetMetadataValue(const std::string & target_key,grpc_metadata_batch * initial_metadata,std::string * concatenated_value)384 absl::optional<absl::string_view> GetMetadataValue(
385 const std::string& target_key, grpc_metadata_batch* initial_metadata,
386 std::string* concatenated_value) {
387 // Find all values for the specified key.
388 GPR_DEBUG_ASSERT(initial_metadata != nullptr);
389 absl::InlinedVector<absl::string_view, 1> values;
390 for (grpc_linked_mdelem* md = initial_metadata->list.head; md != nullptr;
391 md = md->next) {
392 absl::string_view key = StringViewFromSlice(GRPC_MDKEY(md->md));
393 absl::string_view value = StringViewFromSlice(GRPC_MDVALUE(md->md));
394 if (target_key == key) values.push_back(value);
395 }
396 // If none found, no match.
397 if (values.empty()) return absl::nullopt;
398 // If exactly one found, return it as-is.
399 if (values.size() == 1) return values.front();
400 // If more than one found, concatenate the values, using
401 // *concatenated_values as a temporary holding place for the
402 // concatenated string.
403 *concatenated_value = absl::StrJoin(values, ",");
404 return *concatenated_value;
405 }
406
HeaderMatchHelper(const XdsApi::Route::Matchers::HeaderMatcher & header_matcher,grpc_metadata_batch * initial_metadata)407 bool HeaderMatchHelper(
408 const XdsApi::Route::Matchers::HeaderMatcher& header_matcher,
409 grpc_metadata_batch* initial_metadata) {
410 std::string concatenated_value;
411 absl::optional<absl::string_view> value;
412 // Note: If we ever allow binary headers here, we still need to
413 // special-case ignore "grpc-tags-bin" and "grpc-trace-bin", since
414 // they are not visible to the LB policy in grpc-go.
415 if (absl::EndsWith(header_matcher.name, "-bin") ||
416 header_matcher.name == "grpc-previous-rpc-attempts") {
417 value = absl::nullopt;
418 } else if (header_matcher.name == "content-type") {
419 value = "application/grpc";
420 } else {
421 value = GetMetadataValue(header_matcher.name, initial_metadata,
422 &concatenated_value);
423 }
424 if (!value.has_value()) {
425 if (header_matcher.type ==
426 XdsApi::Route::Matchers::HeaderMatcher::HeaderMatcherType::PRESENT) {
427 return !header_matcher.present_match;
428 } else {
429 // For all other header matcher types, we need the header value to
430 // exist to consider matches.
431 return false;
432 }
433 }
434 switch (header_matcher.type) {
435 case XdsApi::Route::Matchers::HeaderMatcher::HeaderMatcherType::EXACT:
436 return value.value() == header_matcher.string_matcher;
437 case XdsApi::Route::Matchers::HeaderMatcher::HeaderMatcherType::REGEX:
438 return RE2::FullMatch(value.value().data(), *header_matcher.regex_match);
439 case XdsApi::Route::Matchers::HeaderMatcher::HeaderMatcherType::RANGE:
440 int64_t int_value;
441 if (!absl::SimpleAtoi(value.value(), &int_value)) {
442 return false;
443 }
444 return int_value >= header_matcher.range_start &&
445 int_value < header_matcher.range_end;
446 case XdsApi::Route::Matchers::HeaderMatcher::HeaderMatcherType::PREFIX:
447 return absl::StartsWith(value.value(), header_matcher.string_matcher);
448 case XdsApi::Route::Matchers::HeaderMatcher::HeaderMatcherType::SUFFIX:
449 return absl::EndsWith(value.value(), header_matcher.string_matcher);
450 default:
451 return false;
452 }
453 }
454
HeadersMatch(const std::vector<XdsApi::Route::Matchers::HeaderMatcher> & header_matchers,grpc_metadata_batch * initial_metadata)455 bool HeadersMatch(
456 const std::vector<XdsApi::Route::Matchers::HeaderMatcher>& header_matchers,
457 grpc_metadata_batch* initial_metadata) {
458 for (const auto& header_matcher : header_matchers) {
459 bool match = HeaderMatchHelper(header_matcher, initial_metadata);
460 if (header_matcher.invert_match) match = !match;
461 if (!match) return false;
462 }
463 return true;
464 }
465
// Draws a number in [0, 1000000) with rand() and reports whether it falls
// below `fraction_per_million`.  Consequently 0 never passes and any value
// of 1000000 or more always passes.
// NOTE(review): the draw only covers the full range when RAND_MAX is at
// least 999999, which the C standard does not guarantee.
bool UnderFraction(const uint32_t fraction_per_million) {
  const uint32_t draw = static_cast<uint32_t>(rand() % 1000000);
  return draw < fraction_per_million;
}
471
GetCallConfig(GetCallConfigArgs args)472 ConfigSelector::CallConfig XdsResolver::XdsConfigSelector::GetCallConfig(
473 GetCallConfigArgs args) {
474 for (const auto& entry : route_table_) {
475 // Path matching.
476 if (!PathMatch(StringViewFromSlice(*args.path),
477 entry.route.matchers.path_matcher)) {
478 continue;
479 }
480 // Header Matching.
481 if (!HeadersMatch(entry.route.matchers.header_matchers,
482 args.initial_metadata)) {
483 continue;
484 }
485 // Match fraction check
486 if (entry.route.matchers.fraction_per_million.has_value() &&
487 !UnderFraction(entry.route.matchers.fraction_per_million.value())) {
488 continue;
489 }
490 // Found a route match
491 absl::string_view cluster_name;
492 if (entry.route.weighted_clusters.empty()) {
493 cluster_name = entry.route.cluster_name;
494 } else {
495 const uint32_t key =
496 rand() %
497 entry.weighted_cluster_state[entry.weighted_cluster_state.size() - 1]
498 .first;
499 // Find the index in weighted clusters corresponding to key.
500 size_t mid = 0;
501 size_t start_index = 0;
502 size_t end_index = entry.weighted_cluster_state.size() - 1;
503 size_t index = 0;
504 while (end_index > start_index) {
505 mid = (start_index + end_index) / 2;
506 if (entry.weighted_cluster_state[mid].first > key) {
507 end_index = mid;
508 } else if (entry.weighted_cluster_state[mid].first < key) {
509 start_index = mid + 1;
510 } else {
511 index = mid + 1;
512 break;
513 }
514 }
515 if (index == 0) index = start_index;
516 GPR_ASSERT(entry.weighted_cluster_state[index].first > key);
517 cluster_name = entry.weighted_cluster_state[index].second;
518 }
519 auto it = clusters_.find(cluster_name);
520 GPR_ASSERT(it != clusters_.end());
521 XdsResolver* resolver =
522 static_cast<XdsResolver*>(resolver_->Ref().release());
523 ClusterState* cluster_state = it->second->Ref().release();
524 CallConfig call_config;
525 if (entry.method_config != nullptr) {
526 call_config.service_config = entry.method_config;
527 call_config.method_configs =
528 entry.method_config->GetMethodParsedConfigVector(grpc_empty_slice());
529 }
530 call_config.call_attributes[kXdsClusterAttribute] = it->first;
531 call_config.on_call_committed = [resolver, cluster_state]() {
532 cluster_state->Unref();
533 ExecCtx::Run(
534 // TODO(roth): This hop into the ExecCtx is being done to avoid
535 // entering the WorkSerializer while holding the client channel data
536 // plane mutex, since that can lead to deadlocks. However, we should
537 // not have to solve this problem in each individual ConfigSelector
538 // implementation. When we have time, we should fix the client channel
539 // code to avoid this by not invoking the
540 // CallConfig::on_call_committed callback until after it has released
541 // the data plane mutex.
542 DEBUG_LOCATION,
543 GRPC_CLOSURE_CREATE(
544 [](void* arg, grpc_error* /*error*/) {
545 auto* resolver = static_cast<XdsResolver*>(arg);
546 resolver->work_serializer()->Run(
547 [resolver]() {
548 resolver->MaybeRemoveUnusedClusters();
549 resolver->Unref();
550 },
551 DEBUG_LOCATION);
552 },
553 resolver, nullptr),
554 GRPC_ERROR_NONE);
555 };
556 return call_config;
557 }
558 return CallConfig();
559 }
560
561 //
562 // XdsResolver
563 //
564
StartLocked()565 void XdsResolver::StartLocked() {
566 grpc_error* error = GRPC_ERROR_NONE;
567 xds_client_ = XdsClient::GetOrCreate(&error);
568 if (error != GRPC_ERROR_NONE) {
569 gpr_log(GPR_ERROR,
570 "Failed to create xds client -- channel will remain in "
571 "TRANSIENT_FAILURE: %s",
572 grpc_error_string(error));
573 result_handler()->ReturnError(error);
574 return;
575 }
576 grpc_pollset_set_add_pollset_set(xds_client_->interested_parties(),
577 interested_parties_);
578 channelz::ChannelNode* parent_channelz_node =
579 grpc_channel_args_find_pointer<channelz::ChannelNode>(
580 args_, GRPC_ARG_CHANNELZ_CHANNEL_NODE);
581 if (parent_channelz_node != nullptr) {
582 xds_client_->AddChannelzLinkage(parent_channelz_node);
583 }
584 auto watcher = absl::make_unique<ListenerWatcher>(Ref());
585 listener_watcher_ = watcher.get();
586 xds_client_->WatchListenerData(server_name_, std::move(watcher));
587 }
588
ShutdownLocked()589 void XdsResolver::ShutdownLocked() {
590 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) {
591 gpr_log(GPR_INFO, "[xds_resolver %p] shutting down", this);
592 }
593 if (xds_client_ != nullptr) {
594 if (listener_watcher_ != nullptr) {
595 xds_client_->CancelListenerDataWatch(server_name_, listener_watcher_,
596 /*delay_unsubscription=*/false);
597 }
598 if (route_config_watcher_ != nullptr) {
599 xds_client_->CancelRouteConfigDataWatch(
600 server_name_, route_config_watcher_, /*delay_unsubscription=*/false);
601 }
602 channelz::ChannelNode* parent_channelz_node =
603 grpc_channel_args_find_pointer<channelz::ChannelNode>(
604 args_, GRPC_ARG_CHANNELZ_CHANNEL_NODE);
605 if (parent_channelz_node != nullptr) {
606 xds_client_->RemoveChannelzLinkage(parent_channelz_node);
607 }
608 grpc_pollset_set_del_pollset_set(xds_client_->interested_parties(),
609 interested_parties_);
610 xds_client_.reset();
611 }
612 }
613
OnListenerUpdate(XdsApi::LdsUpdate listener)614 void XdsResolver::OnListenerUpdate(XdsApi::LdsUpdate listener) {
615 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) {
616 gpr_log(GPR_INFO, "[xds_resolver %p] received updated listener data", this);
617 }
618 if (listener.route_config_name != route_config_name_) {
619 if (route_config_watcher_ != nullptr) {
620 xds_client_->CancelRouteConfigDataWatch(
621 route_config_name_, route_config_watcher_,
622 /*delay_unsubscription=*/!listener.route_config_name.empty());
623 route_config_watcher_ = nullptr;
624 }
625 route_config_name_ = std::move(listener.route_config_name);
626 if (!route_config_name_.empty()) {
627 auto watcher = absl::make_unique<RouteConfigWatcher>(Ref());
628 route_config_watcher_ = watcher.get();
629 xds_client_->WatchRouteConfigData(route_config_name_, std::move(watcher));
630 }
631 }
632 http_max_stream_duration_ = listener.http_max_stream_duration;
633 if (route_config_name_.empty()) {
634 GPR_ASSERT(listener.rds_update.has_value());
635 OnRouteConfigUpdate(std::move(*listener.rds_update));
636 }
637 }
638
OnRouteConfigUpdate(XdsApi::RdsUpdate rds_update)639 void XdsResolver::OnRouteConfigUpdate(XdsApi::RdsUpdate rds_update) {
640 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) {
641 gpr_log(GPR_INFO, "[xds_resolver %p] received updated route config", this);
642 }
643 // Find the relevant VirtualHost from the RouteConfiguration.
644 XdsApi::RdsUpdate::VirtualHost* vhost =
645 rds_update.FindVirtualHostForDomain(server_name_);
646 if (vhost == nullptr) {
647 OnError(GRPC_ERROR_CREATE_FROM_COPIED_STRING(
648 absl::StrCat("could not find VirtualHost for ", server_name_,
649 " in RouteConfiguration")
650 .c_str()));
651 return;
652 }
653 // Save the list of routes in the resolver.
654 current_update_ = std::move(vhost->routes);
655 // Send a new result to the channel.
656 GenerateResult();
657 }
658
OnError(grpc_error * error)659 void XdsResolver::OnError(grpc_error* error) {
660 gpr_log(GPR_ERROR, "[xds_resolver %p] received error from XdsClient: %s",
661 this, grpc_error_string(error));
662 Result result;
663 result.args = grpc_channel_args_copy(args_);
664 result.service_config_error = error;
665 result_handler()->ReturnResult(std::move(result));
666 }
667
OnResourceDoesNotExist()668 void XdsResolver::OnResourceDoesNotExist() {
669 gpr_log(GPR_ERROR,
670 "[xds_resolver %p] LDS/RDS resource does not exist -- clearing "
671 "update and returning empty service config",
672 this);
673 current_update_.clear();
674 Result result;
675 result.service_config =
676 ServiceConfig::Create(args_, "{}", &result.service_config_error);
677 GPR_ASSERT(result.service_config != nullptr);
678 result.args = grpc_channel_args_copy(args_);
679 result_handler()->ReturnResult(std::move(result));
680 }
681
CreateServiceConfig(RefCountedPtr<ServiceConfig> * service_config)682 grpc_error* XdsResolver::CreateServiceConfig(
683 RefCountedPtr<ServiceConfig>* service_config) {
684 std::vector<std::string> clusters;
685 for (const auto& cluster : cluster_state_map_) {
686 clusters.push_back(
687 absl::StrFormat(" \"%s\":{\n"
688 " \"childPolicy\":[ {\n"
689 " \"cds_experimental\":{\n"
690 " \"cluster\": \"%s\"\n"
691 " }\n"
692 " } ]\n"
693 " }",
694 cluster.first, cluster.first));
695 }
696 std::vector<std::string> config_parts;
697 config_parts.push_back(
698 "{\n"
699 " \"loadBalancingConfig\":[\n"
700 " { \"xds_cluster_manager_experimental\":{\n"
701 " \"children\":{\n");
702 config_parts.push_back(absl::StrJoin(clusters, ",\n"));
703 config_parts.push_back(
704 " }\n"
705 " } }\n"
706 " ]\n"
707 "}");
708 std::string json = absl::StrJoin(config_parts, "");
709 grpc_error* error = GRPC_ERROR_NONE;
710 *service_config = ServiceConfig::Create(args_, json.c_str(), &error);
711 return error;
712 }
713
GenerateResult()714 void XdsResolver::GenerateResult() {
715 if (current_update_.empty()) return;
716 // First create XdsConfigSelector, which may add new entries to the cluster
717 // state map, and then CreateServiceConfig for LB policies.
718 grpc_error* error = GRPC_ERROR_NONE;
719 auto config_selector =
720 MakeRefCounted<XdsConfigSelector>(Ref(), current_update_, error);
721 if (error != GRPC_ERROR_NONE) {
722 OnError(error);
723 return;
724 }
725 Result result;
726 error = CreateServiceConfig(&result.service_config);
727 if (error != GRPC_ERROR_NONE) {
728 OnError(error);
729 return;
730 }
731 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) {
732 gpr_log(GPR_INFO, "[xds_resolver %p] generated service config: %s", this,
733 result.service_config->json_string().c_str());
734 }
735 grpc_arg new_arg = config_selector->MakeChannelArg();
736 result.args = grpc_channel_args_copy_and_add(args_, &new_arg, 1);
737 result_handler()->ReturnResult(std::move(result));
738 }
739
MaybeRemoveUnusedClusters()740 void XdsResolver::MaybeRemoveUnusedClusters() {
741 bool update_needed = false;
742 for (auto it = cluster_state_map_.begin(); it != cluster_state_map_.end();) {
743 RefCountedPtr<ClusterState> cluster_state = it->second->RefIfNonZero();
744 if (cluster_state != nullptr) {
745 ++it;
746 } else {
747 update_needed = true;
748 it = cluster_state_map_.erase(it);
749 }
750 }
751 if (update_needed && xds_client_ != nullptr) {
752 // Send a new result to the channel.
753 GenerateResult();
754 }
755 }
756
757 //
758 // Factory
759 //
760
761 class XdsResolverFactory : public ResolverFactory {
762 public:
IsValidUri(const URI & uri) const763 bool IsValidUri(const URI& uri) const override {
764 if (GPR_UNLIKELY(!uri.authority().empty())) {
765 gpr_log(GPR_ERROR, "URI authority not supported");
766 return false;
767 }
768 return true;
769 }
770
CreateResolver(ResolverArgs args) const771 OrphanablePtr<Resolver> CreateResolver(ResolverArgs args) const override {
772 if (!IsValidUri(args.uri)) return nullptr;
773 return MakeOrphanable<XdsResolver>(std::move(args));
774 }
775
scheme() const776 const char* scheme() const override { return "xds"; }
777 };
778
779 } // namespace
780
781 } // namespace grpc_core
782
grpc_resolver_xds_init()783 void grpc_resolver_xds_init() {
784 grpc_core::ResolverRegistry::Builder::RegisterResolverFactory(
785 absl::make_unique<grpc_core::XdsResolverFactory>());
786 }
787
// Counterpart of grpc_resolver_xds_init(); there is nothing to tear down.
void grpc_resolver_xds_shutdown() {
}
789