1 /* 2 * 3 * Copyright 2018 gRPC authors. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 * 17 */ 18 19 #ifndef GRPC_SRC_CPP_SERVER_LOAD_REPORTER_LOAD_REPORTER_H 20 #define GRPC_SRC_CPP_SERVER_LOAD_REPORTER_LOAD_REPORTER_H 21 22 #include <grpc/support/port_platform.h> 23 24 #include <atomic> 25 #include <chrono> 26 #include <deque> 27 #include <vector> 28 29 #include <grpc/support/log.h> 30 #include <grpcpp/impl/codegen/config.h> 31 32 #include "src/cpp/server/load_reporter/load_data_store.h" 33 #include "src/proto/grpc/lb/v1/load_reporter.grpc.pb.h" 34 35 #include "opencensus/stats/stats.h" 36 37 namespace grpc { 38 namespace load_reporter { 39 40 // The interface to get the Census stats. Abstracted for mocking. 41 class CensusViewProvider { 42 public: 43 // Maps from the view name to the view data. 44 using ViewDataMap = 45 std::unordered_map<grpc::string, ::opencensus::stats::ViewData>; 46 // Maps from the view name to the view descriptor. 47 using ViewDescriptorMap = 48 std::unordered_map<grpc::string, ::opencensus::stats::ViewDescriptor>; 49 50 CensusViewProvider(); 51 virtual ~CensusViewProvider() = default; 52 53 // Fetches the view data accumulated since last fetching, and returns it as a 54 // map from the view name to the view data. 55 virtual ViewDataMap FetchViewData() = 0; 56 57 // A helper function that gets a row with the input tag values from the view 58 // data. Only used when we know that row must exist because we have seen a row 59 // with the same tag values in a related view data. Several ViewData's are 60 // considered related if their views are based on the measures that are always 61 // recorded at the same time. 62 static double GetRelatedViewDataRowDouble( 63 const ViewDataMap& view_data_map, const char* view_name, 64 size_t view_name_len, const std::vector<grpc::string>& tag_values); 65 static uint64_t GetRelatedViewDataRowInt( 66 const ViewDataMap& view_data_map, const char* view_name, 67 size_t view_name_len, const std::vector<grpc::string>& tag_values); 68 69 protected: view_descriptor_map()70 const ViewDescriptorMap& view_descriptor_map() const { 71 return view_descriptor_map_; 72 } 73 74 private: 75 ViewDescriptorMap view_descriptor_map_; 76 // Tag keys. 77 ::opencensus::stats::TagKey tag_key_token_; 78 ::opencensus::stats::TagKey tag_key_host_; 79 ::opencensus::stats::TagKey tag_key_user_id_; 80 ::opencensus::stats::TagKey tag_key_status_; 81 ::opencensus::stats::TagKey tag_key_metric_name_; 82 }; 83 84 // The default implementation fetches the real stats from Census. 85 class CensusViewProviderDefaultImpl : public CensusViewProvider { 86 public: 87 CensusViewProviderDefaultImpl(); 88 89 ViewDataMap FetchViewData() override; 90 91 private: 92 std::unordered_map<grpc::string, ::opencensus::stats::View> view_map_; 93 }; 94 95 // The interface to get the CPU stats. Abstracted for mocking. 96 class CpuStatsProvider { 97 public: 98 // The used and total amounts of CPU usage. 99 using CpuStatsSample = std::pair<uint64_t, uint64_t>; 100 101 virtual ~CpuStatsProvider() = default; 102 103 // Gets the cumulative used CPU and total CPU resource. 104 virtual CpuStatsSample GetCpuStats() = 0; 105 }; 106 107 // The default implementation reads CPU jiffies from the system to calculate CPU 108 // utilization. 109 class CpuStatsProviderDefaultImpl : public CpuStatsProvider { 110 public: 111 CpuStatsSample GetCpuStats() override; 112 }; 113 114 // Maintains all the load data and load reporting streams. 115 class LoadReporter { 116 public: 117 // TODO(juanlishen): Allow config for providers from users. LoadReporter(uint32_t feedback_sample_window_seconds,std::unique_ptr<CensusViewProvider> census_view_provider,std::unique_ptr<CpuStatsProvider> cpu_stats_provider)118 LoadReporter(uint32_t feedback_sample_window_seconds, 119 std::unique_ptr<CensusViewProvider> census_view_provider, 120 std::unique_ptr<CpuStatsProvider> cpu_stats_provider) 121 : feedback_sample_window_seconds_(feedback_sample_window_seconds), 122 census_view_provider_(std::move(census_view_provider)), 123 cpu_stats_provider_(std::move(cpu_stats_provider)) { 124 // Append the initial record so that the next real record can have a base. 125 AppendNewFeedbackRecord(0, 0); 126 } 127 128 // Fetches the latest data from Census and merge it into the data store. 129 // Also adds a new sample to the LB feedback sliding window. 130 // Thread-unsafe. (1). The access to the load data store and feedback records 131 // has locking. (2). The access to the Census view provider and CPU stats 132 // provider lacks locking, but we only access these two members in this method 133 // (in testing, we also access them when setting up expectation). So the 134 // invocations of this method must be serialized. 135 void FetchAndSample(); 136 137 // Generates a report for that host and balancer. The report contains 138 // all the stats data accumulated between the last report (i.e., the last 139 // consumption) and the last fetch from Census (i.e., the last production). 140 // Thread-safe. 141 ::google::protobuf::RepeatedPtrField<::grpc::lb::v1::Load> GenerateLoads( 142 const grpc::string& hostname, const grpc::string& lb_id); 143 144 // The feedback is calculated from the stats data recorded in the sliding 145 // window. Outdated records are discarded. 146 // Thread-safe. 147 ::grpc::lb::v1::LoadBalancingFeedback GenerateLoadBalancingFeedback(); 148 149 // Wrapper around LoadDataStore::ReportStreamCreated. 150 // Thread-safe. 151 void ReportStreamCreated(const grpc::string& hostname, 152 const grpc::string& lb_id, 153 const grpc::string& load_key); 154 155 // Wrapper around LoadDataStore::ReportStreamClosed. 156 // Thread-safe. 157 void ReportStreamClosed(const grpc::string& hostname, 158 const grpc::string& lb_id); 159 160 // Generates a unique LB ID of length kLbIdLength. Returns an empty string 161 // upon failure. Thread-safe. 162 grpc::string GenerateLbId(); 163 164 // Accessors only for testing. census_view_provider()165 CensusViewProvider* census_view_provider() { 166 return census_view_provider_.get(); 167 } cpu_stats_provider()168 CpuStatsProvider* cpu_stats_provider() { return cpu_stats_provider_.get(); } 169 170 private: 171 struct LoadBalancingFeedbackRecord { 172 std::chrono::system_clock::time_point end_time; 173 uint64_t rpcs; 174 uint64_t errors; 175 uint64_t cpu_usage; 176 uint64_t cpu_limit; 177 LoadBalancingFeedbackRecordLoadBalancingFeedbackRecord178 LoadBalancingFeedbackRecord( 179 const std::chrono::system_clock::time_point& end_time, uint64_t rpcs, 180 uint64_t errors, uint64_t cpu_usage, uint64_t cpu_limit) 181 : end_time(end_time), 182 rpcs(rpcs), 183 errors(errors), 184 cpu_usage(cpu_usage), 185 cpu_limit(cpu_limit) {} 186 }; 187 188 // Finds the view data about starting call from the view_data_map and merges 189 // the data to the load data store. 190 void ProcessViewDataCallStart( 191 const CensusViewProvider::ViewDataMap& view_data_map); 192 // Finds the view data about ending call from the view_data_map and merges the 193 // data to the load data store. 194 void ProcessViewDataCallEnd( 195 const CensusViewProvider::ViewDataMap& view_data_map); 196 // Finds the view data about the customized call metrics from the 197 // view_data_map and merges the data to the load data store. 198 void ProcessViewDataOtherCallMetrics( 199 const CensusViewProvider::ViewDataMap& view_data_map); 200 IsRecordInWindow(const LoadBalancingFeedbackRecord & record,std::chrono::system_clock::time_point now)201 bool IsRecordInWindow(const LoadBalancingFeedbackRecord& record, 202 std::chrono::system_clock::time_point now) { 203 return record.end_time > now - feedback_sample_window_seconds_; 204 } 205 206 void AppendNewFeedbackRecord(uint64_t rpcs, uint64_t errors); 207 208 // Extracts an OrphanedLoadIdentifier from the per-balancer store and attaches 209 // it to the load. 210 void AttachOrphanLoadId(::grpc::lb::v1::Load* load, 211 const PerBalancerStore& per_balancer_store); 212 213 std::atomic<int64_t> next_lb_id_{0}; 214 const std::chrono::seconds feedback_sample_window_seconds_; 215 std::mutex feedback_mu_; 216 std::deque<LoadBalancingFeedbackRecord> feedback_records_; 217 // TODO(juanlishen): Lock in finer grain. Locking the whole store may be 218 // too expensive. 219 std::mutex store_mu_; 220 LoadDataStore load_data_store_; 221 std::unique_ptr<CensusViewProvider> census_view_provider_; 222 std::unique_ptr<CpuStatsProvider> cpu_stats_provider_; 223 }; 224 225 } // namespace load_reporter 226 } // namespace grpc 227 228 #endif // GRPC_SRC_CPP_SERVER_LOAD_REPORTER_LOAD_REPORTER_H 229