1 /* 2 * 3 * Copyright 2018 gRPC authors. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 * 17 */ 18 19 #ifndef GRPC_SRC_CPP_SERVER_LOAD_REPORTER_LOAD_DATA_STORE_H 20 #define GRPC_SRC_CPP_SERVER_LOAD_REPORTER_LOAD_DATA_STORE_H 21 22 #include <grpc/support/port_platform.h> 23 24 #include <memory> 25 #include <set> 26 #include <unordered_map> 27 28 #include <grpc/support/log.h> 29 #include <grpcpp/impl/codegen/config.h> 30 31 #include "src/cpp/server/load_reporter/constants.h" 32 33 namespace grpc { 34 namespace load_reporter { 35 36 // The load data storage is organized in hierarchy. The LoadDataStore is the 37 // top-level data store. In LoadDataStore, for each host we keep a 38 // PerHostStore, in which for each balancer we keep a PerBalancerStore. Each 39 // PerBalancerStore maintains a map of load records, mapping from LoadRecordKey 40 // to LoadRecordValue. The LoadRecordValue contains a map of customized call 41 // metrics, mapping from a call metric name to the CallMetricValue. 42 43 // The value of a customized call metric. 44 class CallMetricValue { 45 public: 46 explicit CallMetricValue(uint64_t num_calls = 0, 47 double total_metric_value = 0) num_calls_(num_calls)48 : num_calls_(num_calls), total_metric_value_(total_metric_value) {} 49 MergeFrom(CallMetricValue other)50 void MergeFrom(CallMetricValue other) { 51 num_calls_ += other.num_calls_; 52 total_metric_value_ += other.total_metric_value_; 53 } 54 55 // Getters. num_calls()56 uint64_t num_calls() const { return num_calls_; } total_metric_value()57 double total_metric_value() const { return total_metric_value_; } 58 59 private: 60 // The number of calls that finished with this metric. 61 uint64_t num_calls_ = 0; 62 // The sum of metric values across all the calls that finished with this 63 // metric. 64 double total_metric_value_ = 0; 65 }; 66 67 // The key of a load record. 68 class LoadRecordKey { 69 public: LoadRecordKey(grpc::string lb_id,grpc::string lb_tag,grpc::string user_id,grpc::string client_ip_hex)70 LoadRecordKey(grpc::string lb_id, grpc::string lb_tag, grpc::string user_id, 71 grpc::string client_ip_hex) 72 : lb_id_(std::move(lb_id)), 73 lb_tag_(std::move(lb_tag)), 74 user_id_(std::move(user_id)), 75 client_ip_hex_(std::move(client_ip_hex)) {} 76 77 // Parses the input client_ip_and_token to set client IP, LB ID, and LB tag. 78 LoadRecordKey(const grpc::string& client_ip_and_token, grpc::string user_id); 79 ToString()80 grpc::string ToString() const { 81 return "[lb_id_=" + lb_id_ + ", lb_tag_=" + lb_tag_ + 82 ", user_id_=" + user_id_ + ", client_ip_hex_=" + client_ip_hex_ + 83 "]"; 84 } 85 86 bool operator==(const LoadRecordKey& other) const { 87 return lb_id_ == other.lb_id_ && lb_tag_ == other.lb_tag_ && 88 user_id_ == other.user_id_ && client_ip_hex_ == other.client_ip_hex_; 89 } 90 91 // Gets the client IP bytes in network order (i.e., big-endian). 92 grpc::string GetClientIpBytes() const; 93 94 // Getters. lb_id()95 const grpc::string& lb_id() const { return lb_id_; } lb_tag()96 const grpc::string& lb_tag() const { return lb_tag_; } user_id()97 const grpc::string& user_id() const { return user_id_; } client_ip_hex()98 const grpc::string& client_ip_hex() const { return client_ip_hex_; } 99 100 struct Hasher { hash_combineHasher101 void hash_combine(size_t* seed, const grpc::string& k) const { 102 *seed ^= std::hash<grpc::string>()(k) + 0x9e3779b9 + (*seed << 6) + 103 (*seed >> 2); 104 } 105 operatorHasher106 size_t operator()(const LoadRecordKey& k) const { 107 size_t h = 0; 108 hash_combine(&h, k.lb_id_); 109 hash_combine(&h, k.lb_tag_); 110 hash_combine(&h, k.user_id_); 111 hash_combine(&h, k.client_ip_hex_); 112 return h; 113 } 114 }; 115 116 private: 117 grpc::string lb_id_; 118 grpc::string lb_tag_; 119 grpc::string user_id_; 120 grpc::string client_ip_hex_; 121 }; 122 123 // The value of a load record. 124 class LoadRecordValue { 125 public: 126 explicit LoadRecordValue(uint64_t start_count = 0, uint64_t ok_count = 0, 127 uint64_t error_count = 0, uint64_t bytes_sent = 0, 128 uint64_t bytes_recv = 0, uint64_t latency_ms = 0) start_count_(start_count)129 : start_count_(start_count), 130 ok_count_(ok_count), 131 error_count_(error_count), 132 bytes_sent_(bytes_sent), 133 bytes_recv_(bytes_recv), 134 latency_ms_(latency_ms) {} 135 136 LoadRecordValue(grpc::string metric_name, uint64_t num_calls, 137 double total_metric_value); 138 MergeFrom(const LoadRecordValue & other)139 void MergeFrom(const LoadRecordValue& other) { 140 start_count_ += other.start_count_; 141 ok_count_ += other.ok_count_; 142 error_count_ += other.error_count_; 143 bytes_sent_ += other.bytes_sent_; 144 bytes_recv_ += other.bytes_recv_; 145 latency_ms_ += other.latency_ms_; 146 for (const auto& p : other.call_metrics_) { 147 const grpc::string& key = p.first; 148 const CallMetricValue& value = p.second; 149 call_metrics_[key].MergeFrom(value); 150 } 151 } 152 GetNumCallsInProgressDelta()153 int64_t GetNumCallsInProgressDelta() const { 154 return static_cast<int64_t>(start_count_ - ok_count_ - error_count_); 155 } 156 ToString()157 grpc::string ToString() const { 158 return "[start_count_=" + grpc::to_string(start_count_) + 159 ", ok_count_=" + grpc::to_string(ok_count_) + 160 ", error_count_=" + grpc::to_string(error_count_) + 161 ", bytes_sent_=" + grpc::to_string(bytes_sent_) + 162 ", bytes_recv_=" + grpc::to_string(bytes_recv_) + 163 ", latency_ms_=" + grpc::to_string(latency_ms_) + ", " + 164 grpc::to_string(call_metrics_.size()) + " other call metric(s)]"; 165 } 166 InsertCallMetric(const grpc::string & metric_name,const CallMetricValue & metric_value)167 bool InsertCallMetric(const grpc::string& metric_name, 168 const CallMetricValue& metric_value) { 169 return call_metrics_.insert({metric_name, metric_value}).second; 170 } 171 172 // Getters. start_count()173 uint64_t start_count() const { return start_count_; } ok_count()174 uint64_t ok_count() const { return ok_count_; } error_count()175 uint64_t error_count() const { return error_count_; } bytes_sent()176 uint64_t bytes_sent() const { return bytes_sent_; } bytes_recv()177 uint64_t bytes_recv() const { return bytes_recv_; } latency_ms()178 uint64_t latency_ms() const { return latency_ms_; } call_metrics()179 const std::unordered_map<grpc::string, CallMetricValue>& call_metrics() 180 const { 181 return call_metrics_; 182 } 183 184 private: 185 uint64_t start_count_ = 0; 186 uint64_t ok_count_ = 0; 187 uint64_t error_count_ = 0; 188 uint64_t bytes_sent_ = 0; 189 uint64_t bytes_recv_ = 0; 190 uint64_t latency_ms_ = 0; 191 std::unordered_map<grpc::string, CallMetricValue> call_metrics_; 192 }; 193 194 // Stores the data associated with a particular LB ID. 195 class PerBalancerStore { 196 public: 197 using LoadRecordMap = 198 std::unordered_map<LoadRecordKey, LoadRecordValue, LoadRecordKey::Hasher>; 199 PerBalancerStore(grpc::string lb_id,grpc::string load_key)200 PerBalancerStore(grpc::string lb_id, grpc::string load_key) 201 : lb_id_(std::move(lb_id)), load_key_(std::move(load_key)) {} 202 203 // Merge a load record with the given key and value if the store is not 204 // suspended. 205 void MergeRow(const LoadRecordKey& key, const LoadRecordValue& value); 206 207 // Suspend this store, so that no detailed load data will be recorded. 208 void Suspend(); 209 // Resume this store from suspension. 210 void Resume(); 211 // Is this store suspended or not? IsSuspended()212 bool IsSuspended() const { return suspended_; } 213 IsNumCallsInProgressChangedSinceLastReport()214 bool IsNumCallsInProgressChangedSinceLastReport() const { 215 return num_calls_in_progress_ != last_reported_num_calls_in_progress_; 216 } 217 218 uint64_t GetNumCallsInProgressForReport(); 219 ToString()220 grpc::string ToString() { 221 return "[PerBalancerStore lb_id_=" + lb_id_ + " load_key_=" + load_key_ + 222 "]"; 223 } 224 ClearLoadRecordMap()225 void ClearLoadRecordMap() { load_record_map_.clear(); } 226 227 // Getters. lb_id()228 const grpc::string& lb_id() const { return lb_id_; } load_key()229 const grpc::string& load_key() const { return load_key_; } load_record_map()230 const LoadRecordMap& load_record_map() const { return load_record_map_; } 231 232 private: 233 grpc::string lb_id_; 234 // TODO(juanlishen): Use bytestring protobuf type? 235 grpc::string load_key_; 236 LoadRecordMap load_record_map_; 237 uint64_t num_calls_in_progress_ = 0; 238 uint64_t last_reported_num_calls_in_progress_ = 0; 239 bool suspended_ = false; 240 }; 241 242 // Stores the data associated with a particular host. 243 class PerHostStore { 244 public: 245 // When a report stream is created, a PerBalancerStore is created for the 246 // LB ID (guaranteed unique) associated with that stream. If it is the only 247 // active store, adopt all the orphaned stores. If it is the first created 248 // store, adopt the store of kInvalidLbId. 249 void ReportStreamCreated(const grpc::string& lb_id, 250 const grpc::string& load_key); 251 252 // When a report stream is closed, the PerBalancerStores assigned to the 253 // associate LB ID need to be re-assigned to other active balancers, 254 // ideally with the same load key. If there is no active balancer, we have 255 // to suspend those stores and drop the incoming load data until they are 256 // resumed. 257 void ReportStreamClosed(const grpc::string& lb_id); 258 259 // Returns null if not found. Caller doesn't own the returned store. 260 PerBalancerStore* FindPerBalancerStore(const grpc::string& lb_id) const; 261 262 // Returns null if lb_id is not found. The returned pointer points to the 263 // underlying data structure, which is not owned by the caller. 264 const std::set<PerBalancerStore*>* GetAssignedStores( 265 const grpc::string& lb_id) const; 266 267 private: 268 // Creates a PerBalancerStore for the given LB ID, assigns the store to 269 // itself, and records the LB ID to the load key. 270 void SetUpForNewLbId(const grpc::string& lb_id, const grpc::string& load_key); 271 272 void AssignOrphanedStore(PerBalancerStore* orphaned_store, 273 const grpc::string& new_receiver); 274 275 std::unordered_map<grpc::string, std::set<grpc::string>> 276 load_key_to_receiving_lb_ids_; 277 278 // Key: LB ID. The key set includes all the LB IDs that have been 279 // allocated for reporting streams so far. 280 // Value: the unique pointer to the PerBalancerStore of the LB ID. 281 std::unordered_map<grpc::string, std::unique_ptr<PerBalancerStore>> 282 per_balancer_stores_; 283 284 // Key: LB ID. The key set includes the LB IDs of the balancers that are 285 // currently receiving report. 286 // Value: the set of raw pointers to the PerBalancerStores assigned to the LB 287 // ID. Note that the sets in assigned_stores_ form a division of the value set 288 // of per_balancer_stores_. 289 std::unordered_map<grpc::string, std::set<PerBalancerStore*>> 290 assigned_stores_; 291 }; 292 293 // Thread-unsafe two-level bookkeeper of all the load data. 294 // Note: We never remove any store objects from this class, as per the 295 // current spec. That's because premature removal of the store objects 296 // may lead to loss of critical information, e.g., mapping from lb_id to 297 // load_key, and the number of in-progress calls. Such loss will cause 298 // information inconsistency when the balancer is re-connected. Keeping 299 // all the stores should be fine for PerHostStore, since we assume there 300 // should only be a few hostnames. But it's a potential problem for 301 // PerBalancerStore. 302 class LoadDataStore { 303 public: 304 // Returns null if not found. Caller doesn't own the returned store. 305 PerBalancerStore* FindPerBalancerStore(const grpc::string& hostname, 306 const grpc::string& lb_id) const; 307 308 // Returns null if hostname or lb_id is not found. The returned pointer points 309 // to the underlying data structure, which is not owned by the caller. 310 const std::set<PerBalancerStore*>* GetAssignedStores(const string& hostname, 311 const string& lb_id); 312 313 // If a PerBalancerStore can be found by the hostname and LB ID in 314 // LoadRecordKey, the load data will be merged to that store. Otherwise, 315 // only track the number of the in-progress calls for this unknown LB ID. 316 void MergeRow(const grpc::string& hostname, const LoadRecordKey& key, 317 const LoadRecordValue& value); 318 319 // Is the given lb_id a tracked unknown LB ID (i.e., the LB ID was associated 320 // with some received load data but unknown to this load data store)? IsTrackedUnknownBalancerId(const grpc::string & lb_id)321 bool IsTrackedUnknownBalancerId(const grpc::string& lb_id) const { 322 return unknown_balancer_id_trackers_.find(lb_id) != 323 unknown_balancer_id_trackers_.end(); 324 } 325 326 // Wrapper around PerHostStore::ReportStreamCreated. 327 void ReportStreamCreated(const grpc::string& hostname, 328 const grpc::string& lb_id, 329 const grpc::string& load_key); 330 331 // Wrapper around PerHostStore::ReportStreamClosed. 332 void ReportStreamClosed(const grpc::string& hostname, 333 const grpc::string& lb_id); 334 335 private: 336 // Buffered data that was fetched from Census but hasn't been sent to 337 // balancer. We need to keep this data ourselves because Census will 338 // delete the data once it's returned. 339 std::unordered_map<grpc::string, PerHostStore> per_host_stores_; 340 341 // Tracks the number of in-progress calls for each unknown LB ID. 342 std::unordered_map<grpc::string, uint64_t> unknown_balancer_id_trackers_; 343 }; 344 345 } // namespace load_reporter 346 } // namespace grpc 347 348 #endif // GRPC_SRC_CPP_SERVER_LOAD_REPORTER_LOAD_DATA_STORE_H 349