1 /*
2  *
3  * Copyright 2018 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #include <grpc/impl/codegen/port_platform.h>
20 
21 #include <stdint.h>
22 #include <stdio.h>
23 #include <chrono>
24 #include <ctime>
25 #include <iterator>
26 
27 #include "src/cpp/server/load_reporter/constants.h"
28 #include "src/cpp/server/load_reporter/get_cpu_stats.h"
29 #include "src/cpp/server/load_reporter/load_reporter.h"
30 
31 #include "opencensus/stats/internal/set_aggregation_window.h"
32 
33 namespace grpc {
34 namespace load_reporter {
35 
GetCpuStats()36 CpuStatsProvider::CpuStatsSample CpuStatsProviderDefaultImpl::GetCpuStats() {
37   return GetCpuStatsImpl();
38 }
39 
CensusViewProvider()40 CensusViewProvider::CensusViewProvider()
41     : tag_key_token_(::opencensus::stats::TagKey::Register(kTagKeyToken)),
42       tag_key_host_(::opencensus::stats::TagKey::Register(kTagKeyHost)),
43       tag_key_user_id_(::opencensus::stats::TagKey::Register(kTagKeyUserId)),
44       tag_key_status_(::opencensus::stats::TagKey::Register(kTagKeyStatus)),
45       tag_key_metric_name_(
46           ::opencensus::stats::TagKey::Register(kTagKeyMetricName)) {
47   // One view related to starting a call.
48   auto vd_start_count =
49       ::opencensus::stats::ViewDescriptor()
50           .set_name(kViewStartCount)
51           .set_measure(kMeasureStartCount)
52           .set_aggregation(::opencensus::stats::Aggregation::Sum())
53           .add_column(tag_key_token_)
54           .add_column(tag_key_host_)
55           .add_column(tag_key_user_id_)
56           .set_description(
57               "Delta count of calls started broken down by <token, host, "
58               "user_id>.");
59   ::opencensus::stats::SetAggregationWindow(
60       ::opencensus::stats::AggregationWindow::Delta(), &vd_start_count);
61   view_descriptor_map_.emplace(kViewStartCount, vd_start_count);
62   // Four views related to ending a call.
63   // If this view is set as Count of kMeasureEndBytesSent (in hope of saving one
64   // measure), it's infeasible to prepare fake data for testing. That's because
65   // the OpenCensus API to make up view data will add the input data as separate
66   // measurements instead of setting the data values directly.
67   auto vd_end_count =
68       ::opencensus::stats::ViewDescriptor()
69           .set_name(kViewEndCount)
70           .set_measure(kMeasureEndCount)
71           .set_aggregation(::opencensus::stats::Aggregation::Sum())
72           .add_column(tag_key_token_)
73           .add_column(tag_key_host_)
74           .add_column(tag_key_user_id_)
75           .add_column(tag_key_status_)
76           .set_description(
77               "Delta count of calls ended broken down by <token, host, "
78               "user_id, status>.");
79   ::opencensus::stats::SetAggregationWindow(
80       ::opencensus::stats::AggregationWindow::Delta(), &vd_end_count);
81   view_descriptor_map_.emplace(kViewEndCount, vd_end_count);
82   auto vd_end_bytes_sent =
83       ::opencensus::stats::ViewDescriptor()
84           .set_name(kViewEndBytesSent)
85           .set_measure(kMeasureEndBytesSent)
86           .set_aggregation(::opencensus::stats::Aggregation::Sum())
87           .add_column(tag_key_token_)
88           .add_column(tag_key_host_)
89           .add_column(tag_key_user_id_)
90           .add_column(tag_key_status_)
91           .set_description(
92               "Delta sum of bytes sent broken down by <token, host, user_id, "
93               "status>.");
94   ::opencensus::stats::SetAggregationWindow(
95       ::opencensus::stats::AggregationWindow::Delta(), &vd_end_bytes_sent);
96   view_descriptor_map_.emplace(kViewEndBytesSent, vd_end_bytes_sent);
97   auto vd_end_bytes_received =
98       ::opencensus::stats::ViewDescriptor()
99           .set_name(kViewEndBytesReceived)
100           .set_measure(kMeasureEndBytesReceived)
101           .set_aggregation(::opencensus::stats::Aggregation::Sum())
102           .add_column(tag_key_token_)
103           .add_column(tag_key_host_)
104           .add_column(tag_key_user_id_)
105           .add_column(tag_key_status_)
106           .set_description(
107               "Delta sum of bytes received broken down by <token, host, "
108               "user_id, status>.");
109   ::opencensus::stats::SetAggregationWindow(
110       ::opencensus::stats::AggregationWindow::Delta(), &vd_end_bytes_received);
111   view_descriptor_map_.emplace(kViewEndBytesReceived, vd_end_bytes_received);
112   auto vd_end_latency_ms =
113       ::opencensus::stats::ViewDescriptor()
114           .set_name(kViewEndLatencyMs)
115           .set_measure(kMeasureEndLatencyMs)
116           .set_aggregation(::opencensus::stats::Aggregation::Sum())
117           .add_column(tag_key_token_)
118           .add_column(tag_key_host_)
119           .add_column(tag_key_user_id_)
120           .add_column(tag_key_status_)
121           .set_description(
122               "Delta sum of latency in ms broken down by <token, host, "
123               "user_id, status>.");
124   ::opencensus::stats::SetAggregationWindow(
125       ::opencensus::stats::AggregationWindow::Delta(), &vd_end_latency_ms);
126   view_descriptor_map_.emplace(kViewEndLatencyMs, vd_end_latency_ms);
127   // Two views related to other call metrics.
128   auto vd_metric_call_count =
129       ::opencensus::stats::ViewDescriptor()
130           .set_name(kViewOtherCallMetricCount)
131           .set_measure(kMeasureOtherCallMetric)
132           .set_aggregation(::opencensus::stats::Aggregation::Count())
133           .add_column(tag_key_token_)
134           .add_column(tag_key_host_)
135           .add_column(tag_key_user_id_)
136           .add_column(tag_key_metric_name_)
137           .set_description(
138               "Delta count of calls broken down by <token, host, user_id, "
139               "metric_name>.");
140   ::opencensus::stats::SetAggregationWindow(
141       ::opencensus::stats::AggregationWindow::Delta(), &vd_metric_call_count);
142   view_descriptor_map_.emplace(kViewOtherCallMetricCount, vd_metric_call_count);
143   auto vd_metric_value =
144       ::opencensus::stats::ViewDescriptor()
145           .set_name(kViewOtherCallMetricValue)
146           .set_measure(kMeasureOtherCallMetric)
147           .set_aggregation(::opencensus::stats::Aggregation::Sum())
148           .add_column(tag_key_token_)
149           .add_column(tag_key_host_)
150           .add_column(tag_key_user_id_)
151           .add_column(tag_key_metric_name_)
152           .set_description(
153               "Delta sum of call metric value broken down "
154               "by <token, host, user_id, metric_name>.");
155   ::opencensus::stats::SetAggregationWindow(
156       ::opencensus::stats::AggregationWindow::Delta(), &vd_metric_value);
157   view_descriptor_map_.emplace(kViewOtherCallMetricValue, vd_metric_value);
158 }
159 
GetRelatedViewDataRowDouble(const ViewDataMap & view_data_map,const char * view_name,size_t view_name_len,const std::vector<grpc::string> & tag_values)160 double CensusViewProvider::GetRelatedViewDataRowDouble(
161     const ViewDataMap& view_data_map, const char* view_name,
162     size_t view_name_len, const std::vector<grpc::string>& tag_values) {
163   auto it_vd = view_data_map.find(grpc::string(view_name, view_name_len));
164   GPR_ASSERT(it_vd != view_data_map.end());
165   GPR_ASSERT(it_vd->second.type() ==
166              ::opencensus::stats::ViewData::Type::kDouble);
167   auto it_row = it_vd->second.double_data().find(tag_values);
168   GPR_ASSERT(it_row != it_vd->second.double_data().end());
169   return it_row->second;
170 }
171 
GetRelatedViewDataRowInt(const ViewDataMap & view_data_map,const char * view_name,size_t view_name_len,const std::vector<grpc::string> & tag_values)172 uint64_t CensusViewProvider::GetRelatedViewDataRowInt(
173     const ViewDataMap& view_data_map, const char* view_name,
174     size_t view_name_len, const std::vector<grpc::string>& tag_values) {
175   auto it_vd = view_data_map.find(grpc::string(view_name, view_name_len));
176   GPR_ASSERT(it_vd != view_data_map.end());
177   GPR_ASSERT(it_vd->second.type() ==
178              ::opencensus::stats::ViewData::Type::kInt64);
179   auto it_row = it_vd->second.int_data().find(tag_values);
180   GPR_ASSERT(it_row != it_vd->second.int_data().end());
181   GPR_ASSERT(it_row->second >= 0);
182   return it_row->second;
183 }
184 
CensusViewProviderDefaultImpl()185 CensusViewProviderDefaultImpl::CensusViewProviderDefaultImpl() {
186   for (const auto& p : view_descriptor_map()) {
187     const grpc::string& view_name = p.first;
188     const ::opencensus::stats::ViewDescriptor& vd = p.second;
189     // We need to use pair's piecewise ctor here, otherwise the deleted copy
190     // ctor of View will be called.
191     view_map_.emplace(std::piecewise_construct,
192                       std::forward_as_tuple(view_name),
193                       std::forward_as_tuple(vd));
194   }
195 }
196 
FetchViewData()197 CensusViewProvider::ViewDataMap CensusViewProviderDefaultImpl::FetchViewData() {
198   gpr_log(GPR_DEBUG, "[CVP %p] Starts fetching Census view data.", this);
199   ViewDataMap view_data_map;
200   for (auto& p : view_map_) {
201     const grpc::string& view_name = p.first;
202     ::opencensus::stats::View& view = p.second;
203     if (view.IsValid()) {
204       view_data_map.emplace(view_name, view.GetData());
205       gpr_log(GPR_DEBUG, "[CVP %p] Fetched view data (view: %s).", this,
206               view_name.c_str());
207     } else {
208       gpr_log(
209           GPR_DEBUG,
210           "[CVP %p] Can't fetch view data because view is invalid (view: %s).",
211           this, view_name.c_str());
212     }
213   }
214   return view_data_map;
215 }
216 
GenerateLbId()217 grpc::string LoadReporter::GenerateLbId() {
218   while (true) {
219     if (next_lb_id_ > UINT32_MAX) {
220       gpr_log(GPR_ERROR, "[LR %p] The LB ID exceeds the max valid value!",
221               this);
222       return "";
223     }
224     int64_t lb_id = next_lb_id_++;
225     // Overflow should never happen.
226     GPR_ASSERT(lb_id >= 0);
227     // Convert to padded hex string for a 32-bit LB ID. E.g, "0000ca5b".
228     char buf[kLbIdLength + 1];
229     snprintf(buf, sizeof(buf), "%08lx", lb_id);
230     grpc::string lb_id_str(buf, kLbIdLength);
231     // The client may send requests with LB ID that has never been allocated
232     // by this load reporter. Those IDs are tracked and will be skipped when
233     // we generate a new ID.
234     if (!load_data_store_.IsTrackedUnknownBalancerId(lb_id_str)) {
235       return lb_id_str;
236     }
237   }
238 }
239 
240 ::grpc::lb::v1::LoadBalancingFeedback
GenerateLoadBalancingFeedback()241 LoadReporter::GenerateLoadBalancingFeedback() {
242   std::unique_lock<std::mutex> lock(feedback_mu_);
243   auto now = std::chrono::system_clock::now();
244   // Discard records outside the window until there is only one record
245   // outside the window, which is used as the base for difference.
246   while (feedback_records_.size() > 1 &&
247          !IsRecordInWindow(feedback_records_[1], now)) {
248     feedback_records_.pop_front();
249   }
250   if (feedback_records_.size() < 2) {
251     return ::grpc::lb::v1::LoadBalancingFeedback::default_instance();
252   }
253   // Find the longest range with valid ends.
254   auto oldest = feedback_records_.begin();
255   auto newest = feedback_records_.end() - 1;
256   while (std::distance(oldest, newest) > 0 &&
257          (newest->cpu_limit == 0 || oldest->cpu_limit == 0)) {
258     // A zero limit means that the system info reading was failed, so these
259     // records can't be used to calculate CPU utilization.
260     if (newest->cpu_limit == 0) --newest;
261     if (oldest->cpu_limit == 0) ++oldest;
262   }
263   if (std::distance(oldest, newest) < 1 ||
264       oldest->end_time == newest->end_time ||
265       newest->cpu_limit == oldest->cpu_limit) {
266     return ::grpc::lb::v1::LoadBalancingFeedback::default_instance();
267   }
268   uint64_t rpcs = 0;
269   uint64_t errors = 0;
270   for (auto p = newest; p != oldest; --p) {
271     // Because these two numbers are counters, the oldest record shouldn't be
272     // included.
273     rpcs += p->rpcs;
274     errors += p->errors;
275   }
276   double cpu_usage = newest->cpu_usage - oldest->cpu_usage;
277   double cpu_limit = newest->cpu_limit - oldest->cpu_limit;
278   std::chrono::duration<double> duration_seconds =
279       newest->end_time - oldest->end_time;
280   lock.unlock();
281   ::grpc::lb::v1::LoadBalancingFeedback feedback;
282   feedback.set_server_utilization(static_cast<float>(cpu_usage / cpu_limit));
283   feedback.set_calls_per_second(
284       static_cast<float>(rpcs / duration_seconds.count()));
285   feedback.set_errors_per_second(
286       static_cast<float>(errors / duration_seconds.count()));
287   return feedback;
288 }
289 
290 ::google::protobuf::RepeatedPtrField<::grpc::lb::v1::Load>
GenerateLoads(const grpc::string & hostname,const grpc::string & lb_id)291 LoadReporter::GenerateLoads(const grpc::string& hostname,
292                             const grpc::string& lb_id) {
293   std::lock_guard<std::mutex> lock(store_mu_);
294   auto assigned_stores = load_data_store_.GetAssignedStores(hostname, lb_id);
295   GPR_ASSERT(assigned_stores != nullptr);
296   GPR_ASSERT(!assigned_stores->empty());
297   ::google::protobuf::RepeatedPtrField<::grpc::lb::v1::Load> loads;
298   for (PerBalancerStore* per_balancer_store : *assigned_stores) {
299     GPR_ASSERT(!per_balancer_store->IsSuspended());
300     if (!per_balancer_store->load_record_map().empty()) {
301       for (const auto& p : per_balancer_store->load_record_map()) {
302         const auto& key = p.first;
303         const auto& value = p.second;
304         auto load = loads.Add();
305         load->set_load_balance_tag(key.lb_tag());
306         load->set_user_id(key.user_id());
307         load->set_client_ip_address(key.GetClientIpBytes());
308         load->set_num_calls_started(static_cast<int64_t>(value.start_count()));
309         load->set_num_calls_finished_without_error(
310             static_cast<int64_t>(value.ok_count()));
311         load->set_num_calls_finished_with_error(
312             static_cast<int64_t>(value.error_count()));
313         load->set_total_bytes_sent(static_cast<int64_t>(value.bytes_sent()));
314         load->set_total_bytes_received(
315             static_cast<int64_t>(value.bytes_recv()));
316         load->mutable_total_latency()->set_seconds(
317             static_cast<int64_t>(value.latency_ms() / 1000));
318         load->mutable_total_latency()->set_nanos(
319             (static_cast<int32_t>(value.latency_ms()) % 1000) * 1000000);
320         for (const auto& p : value.call_metrics()) {
321           const grpc::string& metric_name = p.first;
322           const CallMetricValue& metric_value = p.second;
323           auto call_metric_data = load->add_metric_data();
324           call_metric_data->set_metric_name(metric_name);
325           call_metric_data->set_num_calls_finished_with_metric(
326               metric_value.num_calls());
327           call_metric_data->set_total_metric_value(
328               metric_value.total_metric_value());
329         }
330         if (per_balancer_store->lb_id() != lb_id) {
331           // This per-balancer store is an orphan assigned to this receiving
332           // balancer.
333           AttachOrphanLoadId(load, *per_balancer_store);
334         }
335       }
336       per_balancer_store->ClearLoadRecordMap();
337     }
338     if (per_balancer_store->IsNumCallsInProgressChangedSinceLastReport()) {
339       auto load = loads.Add();
340       load->set_num_calls_in_progress(
341           per_balancer_store->GetNumCallsInProgressForReport());
342       if (per_balancer_store->lb_id() != lb_id) {
343         // This per-balancer store is an orphan assigned to this receiving
344         // balancer.
345         AttachOrphanLoadId(load, *per_balancer_store);
346       }
347     }
348   }
349   return loads;
350 }
351 
AttachOrphanLoadId(::grpc::lb::v1::Load * load,const PerBalancerStore & per_balancer_store)352 void LoadReporter::AttachOrphanLoadId(
353     ::grpc::lb::v1::Load* load, const PerBalancerStore& per_balancer_store) {
354   if (per_balancer_store.lb_id() == kInvalidLbId) {
355     load->set_load_key_unknown(true);
356   } else {
357     // We shouldn't set load_key_unknown to any value in this case because
358     // load_key_unknown and orphaned_load_identifier are under an oneof struct.
359     load->mutable_orphaned_load_identifier()->set_load_key(
360         per_balancer_store.load_key());
361     load->mutable_orphaned_load_identifier()->set_load_balancer_id(
362         per_balancer_store.lb_id());
363   }
364 }
365 
AppendNewFeedbackRecord(uint64_t rpcs,uint64_t errors)366 void LoadReporter::AppendNewFeedbackRecord(uint64_t rpcs, uint64_t errors) {
367   CpuStatsProvider::CpuStatsSample cpu_stats;
368   if (cpu_stats_provider_ != nullptr) {
369     cpu_stats = cpu_stats_provider_->GetCpuStats();
370   } else {
371     // This will make the load balancing feedback generation a no-op.
372     cpu_stats = {0, 0};
373   }
374   std::unique_lock<std::mutex> lock(feedback_mu_);
375   feedback_records_.emplace_back(std::chrono::system_clock::now(), rpcs, errors,
376                                  cpu_stats.first, cpu_stats.second);
377 }
378 
ReportStreamCreated(const grpc::string & hostname,const grpc::string & lb_id,const grpc::string & load_key)379 void LoadReporter::ReportStreamCreated(const grpc::string& hostname,
380                                        const grpc::string& lb_id,
381                                        const grpc::string& load_key) {
382   std::lock_guard<std::mutex> lock(store_mu_);
383   load_data_store_.ReportStreamCreated(hostname, lb_id, load_key);
384   gpr_log(GPR_INFO,
385           "[LR %p] Report stream created (host: %s, LB ID: %s, load key: %s).",
386           this, hostname.c_str(), lb_id.c_str(), load_key.c_str());
387 }
388 
ReportStreamClosed(const grpc::string & hostname,const grpc::string & lb_id)389 void LoadReporter::ReportStreamClosed(const grpc::string& hostname,
390                                       const grpc::string& lb_id) {
391   std::lock_guard<std::mutex> lock(store_mu_);
392   load_data_store_.ReportStreamClosed(hostname, lb_id);
393   gpr_log(GPR_INFO, "[LR %p] Report stream closed (host: %s, LB ID: %s).", this,
394           hostname.c_str(), lb_id.c_str());
395 }
396 
ProcessViewDataCallStart(const CensusViewProvider::ViewDataMap & view_data_map)397 void LoadReporter::ProcessViewDataCallStart(
398     const CensusViewProvider::ViewDataMap& view_data_map) {
399   auto it = view_data_map.find(kViewStartCount);
400   if (it != view_data_map.end()) {
401     for (const auto& p : it->second.int_data()) {
402       const std::vector<grpc::string>& tag_values = p.first;
403       const uint64_t start_count = static_cast<uint64_t>(p.second);
404       const grpc::string& client_ip_and_token = tag_values[0];
405       const grpc::string& host = tag_values[1];
406       const grpc::string& user_id = tag_values[2];
407       LoadRecordKey key(client_ip_and_token, user_id);
408       LoadRecordValue value = LoadRecordValue(start_count);
409       {
410         std::unique_lock<std::mutex> lock(store_mu_);
411         load_data_store_.MergeRow(host, key, value);
412       }
413     }
414   }
415 }
416 
ProcessViewDataCallEnd(const CensusViewProvider::ViewDataMap & view_data_map)417 void LoadReporter::ProcessViewDataCallEnd(
418     const CensusViewProvider::ViewDataMap& view_data_map) {
419   uint64_t total_end_count = 0;
420   uint64_t total_error_count = 0;
421   auto it = view_data_map.find(kViewEndCount);
422   if (it != view_data_map.end()) {
423     for (const auto& p : it->second.int_data()) {
424       const std::vector<grpc::string>& tag_values = p.first;
425       const uint64_t end_count = static_cast<uint64_t>(p.second);
426       const grpc::string& client_ip_and_token = tag_values[0];
427       const grpc::string& host = tag_values[1];
428       const grpc::string& user_id = tag_values[2];
429       const grpc::string& status = tag_values[3];
430       // This is due to a bug reported internally of Java server load reporting
431       // implementation.
432       // TODO(juanlishen): Check whether this situation happens in OSS C++.
433       if (client_ip_and_token.size() == 0) {
434         gpr_log(GPR_DEBUG,
435                 "Skipping processing Opencensus record with empty "
436                 "client_ip_and_token tag.");
437         continue;
438       }
439       LoadRecordKey key(client_ip_and_token, user_id);
440       const uint64_t bytes_sent = CensusViewProvider::GetRelatedViewDataRowInt(
441           view_data_map, kViewEndBytesSent, sizeof(kViewEndBytesSent) - 1,
442           tag_values);
443       const uint64_t bytes_received =
444           CensusViewProvider::GetRelatedViewDataRowInt(
445               view_data_map, kViewEndBytesReceived,
446               sizeof(kViewEndBytesReceived) - 1, tag_values);
447       const uint64_t latency_ms = CensusViewProvider::GetRelatedViewDataRowInt(
448           view_data_map, kViewEndLatencyMs, sizeof(kViewEndLatencyMs) - 1,
449           tag_values);
450       uint64_t ok_count = 0;
451       uint64_t error_count = 0;
452       total_end_count += end_count;
453       if (std::strcmp(status.c_str(), kCallStatusOk) == 0) {
454         ok_count = end_count;
455       } else {
456         error_count = end_count;
457         total_error_count += end_count;
458       }
459       LoadRecordValue value = LoadRecordValue(
460           0, ok_count, error_count, bytes_sent, bytes_received, latency_ms);
461       {
462         std::unique_lock<std::mutex> lock(store_mu_);
463         load_data_store_.MergeRow(host, key, value);
464       }
465     }
466   }
467   AppendNewFeedbackRecord(total_end_count, total_error_count);
468 }
469 
ProcessViewDataOtherCallMetrics(const CensusViewProvider::ViewDataMap & view_data_map)470 void LoadReporter::ProcessViewDataOtherCallMetrics(
471     const CensusViewProvider::ViewDataMap& view_data_map) {
472   auto it = view_data_map.find(kViewOtherCallMetricCount);
473   if (it != view_data_map.end()) {
474     for (const auto& p : it->second.int_data()) {
475       const std::vector<grpc::string>& tag_values = p.first;
476       const int64_t num_calls = p.second;
477       const grpc::string& client_ip_and_token = tag_values[0];
478       const grpc::string& host = tag_values[1];
479       const grpc::string& user_id = tag_values[2];
480       const grpc::string& metric_name = tag_values[3];
481       LoadRecordKey key(client_ip_and_token, user_id);
482       const double total_metric_value =
483           CensusViewProvider::GetRelatedViewDataRowDouble(
484               view_data_map, kViewOtherCallMetricValue,
485               sizeof(kViewOtherCallMetricValue) - 1, tag_values);
486       LoadRecordValue value = LoadRecordValue(
487           metric_name, static_cast<uint64_t>(num_calls), total_metric_value);
488       {
489         std::unique_lock<std::mutex> lock(store_mu_);
490         load_data_store_.MergeRow(host, key, value);
491       }
492     }
493   }
494 }
495 
FetchAndSample()496 void LoadReporter::FetchAndSample() {
497   gpr_log(GPR_DEBUG,
498           "[LR %p] Starts fetching Census view data and sampling LB feedback "
499           "record.",
500           this);
501   CensusViewProvider::ViewDataMap view_data_map =
502       census_view_provider_->FetchViewData();
503   ProcessViewDataCallStart(view_data_map);
504   ProcessViewDataCallEnd(view_data_map);
505   ProcessViewDataOtherCallMetrics(view_data_map);
506 }
507 
508 }  // namespace load_reporter
509 }  // namespace grpc
510