1 /*
2 *
3 * Copyright 2018 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19 #include <grpc/impl/codegen/port_platform.h>
20
21 #include <stdio.h>
22 #include <cstdlib>
23 #include <set>
24 #include <unordered_map>
25 #include <vector>
26
27 #include "src/core/lib/iomgr/socket_utils.h"
28 #include "src/cpp/server/load_reporter/load_data_store.h"
29
30 namespace grpc {
31 namespace load_reporter {
32
33 // Some helper functions.
34 namespace {
35
// Removes `value` from the set that `map` associates with `key`.
//
// If that set is left empty by the removal, the key-set entry itself is also
// erased, so this call never leaves an empty set behind for `key`. Returns true
// iff `value` was present and has been removed; returns false when `key` is
// absent or its set did not contain `value`.
template <typename K, typename V>
bool UnorderedMapOfSetEraseKeyValue(std::unordered_map<K, std::set<V>>& map,
                                    const K& key, const V& value) {
  auto entry = map.find(key);
  if (entry == map.end()) return false;
  std::set<V>& values = entry->second;
  const bool removed = values.erase(value) > 0;
  if (values.empty()) map.erase(entry);
  return removed;
}
53
// Detaches the set that `map` associates with `key`: the key-set entry is
// erased from `map` and its set is handed back to the caller by value.
// When `key` is not present, `map` is left untouched and an empty set is
// returned.
template <typename K, typename V>
std::set<V> UnorderedMapOfSetExtract(std::unordered_map<K, std::set<V>>& map,
                                     const K& key) {
  std::set<V> extracted;
  auto entry = map.find(key);
  if (entry != map.end()) {
    // Steal the elements rather than copying them; the entry is erased next.
    extracted = std::move(entry->second);
    map.erase(entry);
  }
  return extracted;
}
68
// Returns a pointer to an element of `container` chosen with std::rand()
// (index = rand() % size(), so subject to the usual modulo bias).
// `container` must be non-empty (asserted). The returned pointer points into
// `container` and must not outlive the element it refers to.
template <typename C>
const typename C::value_type* RandomElement(const C& container) {
  GPR_ASSERT(!container.empty());
  const size_t offset = std::rand() % container.size();
  typename C::const_iterator chosen = container.begin();
  std::advance(chosen, offset);
  return &*chosen;
}
77
78 } // namespace
79
LoadRecordKey(const grpc::string & client_ip_and_token,grpc::string user_id)80 LoadRecordKey::LoadRecordKey(const grpc::string& client_ip_and_token,
81 grpc::string user_id)
82 : user_id_(std::move(user_id)) {
83 GPR_ASSERT(client_ip_and_token.size() >= 2);
84 int ip_hex_size;
85 GPR_ASSERT(sscanf(client_ip_and_token.substr(0, 2).c_str(), "%d",
86 &ip_hex_size) == 1);
87 GPR_ASSERT(ip_hex_size == 0 || ip_hex_size == kIpv4AddressLength ||
88 ip_hex_size == kIpv6AddressLength);
89 size_t cur_pos = 2;
90 client_ip_hex_ = client_ip_and_token.substr(cur_pos, ip_hex_size);
91 cur_pos += ip_hex_size;
92 if (client_ip_and_token.size() - cur_pos < kLbIdLength) {
93 lb_id_ = kInvalidLbId;
94 lb_tag_ = "";
95 } else {
96 lb_id_ = client_ip_and_token.substr(cur_pos, kLbIdLength);
97 lb_tag_ = client_ip_and_token.substr(cur_pos + kLbIdLength);
98 }
99 }
100
GetClientIpBytes() const101 grpc::string LoadRecordKey::GetClientIpBytes() const {
102 if (client_ip_hex_.empty()) {
103 return "";
104 } else if (client_ip_hex_.size() == kIpv4AddressLength) {
105 uint32_t ip_bytes;
106 if (sscanf(client_ip_hex_.c_str(), "%x", &ip_bytes) != 1) {
107 gpr_log(GPR_ERROR,
108 "Can't parse client IP (%s) from a hex string to an integer.",
109 client_ip_hex_.c_str());
110 return "";
111 }
112 ip_bytes = grpc_htonl(ip_bytes);
113 return grpc::string(reinterpret_cast<const char*>(&ip_bytes),
114 sizeof(ip_bytes));
115 } else if (client_ip_hex_.size() == kIpv6AddressLength) {
116 uint32_t ip_bytes[4];
117 for (size_t i = 0; i < 4; ++i) {
118 if (sscanf(client_ip_hex_.substr(i * 8, (i + 1) * 8).c_str(), "%x",
119 ip_bytes + i) != 1) {
120 gpr_log(
121 GPR_ERROR,
122 "Can't parse client IP part (%s) from a hex string to an integer.",
123 client_ip_hex_.substr(i * 8, (i + 1) * 8).c_str());
124 return "";
125 }
126 ip_bytes[i] = grpc_htonl(ip_bytes[i]);
127 }
128 return grpc::string(reinterpret_cast<const char*>(ip_bytes),
129 sizeof(ip_bytes));
130 } else {
131 GPR_UNREACHABLE_CODE(return "");
132 }
133 }
134
LoadRecordValue(grpc::string metric_name,uint64_t num_calls,double total_metric_value)135 LoadRecordValue::LoadRecordValue(grpc::string metric_name, uint64_t num_calls,
136 double total_metric_value) {
137 call_metrics_.emplace(std::move(metric_name),
138 CallMetricValue(num_calls, total_metric_value));
139 }
140
MergeRow(const LoadRecordKey & key,const LoadRecordValue & value)141 void PerBalancerStore::MergeRow(const LoadRecordKey& key,
142 const LoadRecordValue& value) {
143 // During suspension, the load data received will be dropped.
144 if (!suspended_) {
145 load_record_map_[key].MergeFrom(value);
146 gpr_log(GPR_DEBUG,
147 "[PerBalancerStore %p] Load data merged (Key: %s, Value: %s).",
148 this, key.ToString().c_str(), value.ToString().c_str());
149 } else {
150 gpr_log(GPR_DEBUG,
151 "[PerBalancerStore %p] Load data dropped (Key: %s, Value: %s).",
152 this, key.ToString().c_str(), value.ToString().c_str());
153 }
154 // We always keep track of num_calls_in_progress_, so that when this
155 // store is resumed, we still have a correct value of
156 // num_calls_in_progress_.
157 GPR_ASSERT(static_cast<int64_t>(num_calls_in_progress_) +
158 value.GetNumCallsInProgressDelta() >=
159 0);
160 num_calls_in_progress_ += value.GetNumCallsInProgressDelta();
161 }
162
Suspend()163 void PerBalancerStore::Suspend() {
164 suspended_ = true;
165 load_record_map_.clear();
166 gpr_log(GPR_DEBUG, "[PerBalancerStore %p] Suspended.", this);
167 }
168
Resume()169 void PerBalancerStore::Resume() {
170 suspended_ = false;
171 gpr_log(GPR_DEBUG, "[PerBalancerStore %p] Resumed.", this);
172 }
173
GetNumCallsInProgressForReport()174 uint64_t PerBalancerStore::GetNumCallsInProgressForReport() {
175 GPR_ASSERT(!suspended_);
176 last_reported_num_calls_in_progress_ = num_calls_in_progress_;
177 return num_calls_in_progress_;
178 }
179
ReportStreamCreated(const grpc::string & lb_id,const grpc::string & load_key)180 void PerHostStore::ReportStreamCreated(const grpc::string& lb_id,
181 const grpc::string& load_key) {
182 GPR_ASSERT(lb_id != kInvalidLbId);
183 SetUpForNewLbId(lb_id, load_key);
184 // Prior to this one, there was no load balancer receiving report, so we may
185 // have unassigned orphaned stores to assign to this new balancer.
186 // TODO(juanlishen): If the load key of this new stream is the same with
187 // some previously adopted orphan store, we may want to take the orphan to
188 // this stream. Need to discuss with LB team.
189 if (assigned_stores_.size() == 1) {
190 for (const auto& p : per_balancer_stores_) {
191 const grpc::string& other_lb_id = p.first;
192 const std::unique_ptr<PerBalancerStore>& orphaned_store = p.second;
193 if (other_lb_id != lb_id) {
194 orphaned_store->Resume();
195 AssignOrphanedStore(orphaned_store.get(), lb_id);
196 }
197 }
198 }
199 // The first connected balancer will adopt the kInvalidLbId.
200 if (per_balancer_stores_.size() == 1) {
201 SetUpForNewLbId(kInvalidLbId, "");
202 ReportStreamClosed(kInvalidLbId);
203 }
204 }
205
ReportStreamClosed(const grpc::string & lb_id)206 void PerHostStore::ReportStreamClosed(const grpc::string& lb_id) {
207 auto it_store_for_gone_lb = per_balancer_stores_.find(lb_id);
208 GPR_ASSERT(it_store_for_gone_lb != per_balancer_stores_.end());
209 // Remove this closed stream from our records.
210 GPR_ASSERT(UnorderedMapOfSetEraseKeyValue(
211 load_key_to_receiving_lb_ids_, it_store_for_gone_lb->second->load_key(),
212 lb_id));
213 std::set<PerBalancerStore*> orphaned_stores =
214 UnorderedMapOfSetExtract(assigned_stores_, lb_id);
215 // The stores that were assigned to this balancer are orphaned now. They
216 // should be re-assigned to other balancers which are still receiving reports.
217 for (PerBalancerStore* orphaned_store : orphaned_stores) {
218 const grpc::string* new_receiver = nullptr;
219 auto it = load_key_to_receiving_lb_ids_.find(orphaned_store->load_key());
220 if (it != load_key_to_receiving_lb_ids_.end()) {
221 // First, try to pick from the active balancers with the same load key.
222 new_receiver = RandomElement(it->second);
223 } else if (!assigned_stores_.empty()) {
224 // If failed, pick from all the remaining active balancers.
225 new_receiver = &(RandomElement(assigned_stores_)->first);
226 }
227 if (new_receiver != nullptr) {
228 AssignOrphanedStore(orphaned_store, *new_receiver);
229 } else {
230 // Load data for an LB ID that can't be assigned to any stream should
231 // be dropped.
232 orphaned_store->Suspend();
233 }
234 }
235 }
236
FindPerBalancerStore(const grpc::string & lb_id) const237 PerBalancerStore* PerHostStore::FindPerBalancerStore(
238 const grpc::string& lb_id) const {
239 return per_balancer_stores_.find(lb_id) != per_balancer_stores_.end()
240 ? per_balancer_stores_.find(lb_id)->second.get()
241 : nullptr;
242 }
243
GetAssignedStores(const grpc::string & lb_id) const244 const std::set<PerBalancerStore*>* PerHostStore::GetAssignedStores(
245 const grpc::string& lb_id) const {
246 auto it = assigned_stores_.find(lb_id);
247 if (it == assigned_stores_.end()) return nullptr;
248 return &(it->second);
249 }
250
AssignOrphanedStore(PerBalancerStore * orphaned_store,const grpc::string & new_receiver)251 void PerHostStore::AssignOrphanedStore(PerBalancerStore* orphaned_store,
252 const grpc::string& new_receiver) {
253 auto it = assigned_stores_.find(new_receiver);
254 GPR_ASSERT(it != assigned_stores_.end());
255 it->second.insert(orphaned_store);
256 gpr_log(GPR_INFO,
257 "[PerHostStore %p] Re-assigned orphaned store (%p) with original LB"
258 " ID of %s to new receiver %s",
259 this, orphaned_store, orphaned_store->lb_id().c_str(),
260 new_receiver.c_str());
261 }
262
SetUpForNewLbId(const grpc::string & lb_id,const grpc::string & load_key)263 void PerHostStore::SetUpForNewLbId(const grpc::string& lb_id,
264 const grpc::string& load_key) {
265 // The top-level caller (i.e., LoadReportService) should guarantee the
266 // lb_id is unique for each reporting stream.
267 GPR_ASSERT(per_balancer_stores_.find(lb_id) == per_balancer_stores_.end());
268 GPR_ASSERT(assigned_stores_.find(lb_id) == assigned_stores_.end());
269 load_key_to_receiving_lb_ids_[load_key].insert(lb_id);
270 std::unique_ptr<PerBalancerStore> per_balancer_store(
271 new PerBalancerStore(lb_id, load_key));
272 assigned_stores_[lb_id] = {per_balancer_store.get()};
273 per_balancer_stores_[lb_id] = std::move(per_balancer_store);
274 }
275
FindPerBalancerStore(const string & hostname,const string & lb_id) const276 PerBalancerStore* LoadDataStore::FindPerBalancerStore(
277 const string& hostname, const string& lb_id) const {
278 auto it = per_host_stores_.find(hostname);
279 if (it != per_host_stores_.end()) {
280 const PerHostStore& per_host_store = it->second;
281 return per_host_store.FindPerBalancerStore(lb_id);
282 } else {
283 return nullptr;
284 }
285 }
286
MergeRow(const grpc::string & hostname,const LoadRecordKey & key,const LoadRecordValue & value)287 void LoadDataStore::MergeRow(const grpc::string& hostname,
288 const LoadRecordKey& key,
289 const LoadRecordValue& value) {
290 PerBalancerStore* per_balancer_store =
291 FindPerBalancerStore(hostname, key.lb_id());
292 if (per_balancer_store != nullptr) {
293 per_balancer_store->MergeRow(key, value);
294 return;
295 }
296 // Unknown LB ID. Track it until its number of in-progress calls drops to
297 // zero.
298 int64_t in_progress_delta = value.GetNumCallsInProgressDelta();
299 if (in_progress_delta != 0) {
300 auto it_tracker = unknown_balancer_id_trackers_.find(key.lb_id());
301 if (it_tracker == unknown_balancer_id_trackers_.end()) {
302 gpr_log(
303 GPR_DEBUG,
304 "[LoadDataStore %p] Start tracking unknown balancer (lb_id_: %s).",
305 this, key.lb_id().c_str());
306 unknown_balancer_id_trackers_.insert(
307 {key.lb_id(), static_cast<uint64_t>(in_progress_delta)});
308 } else if ((it_tracker->second += in_progress_delta) == 0) {
309 unknown_balancer_id_trackers_.erase(it_tracker);
310 gpr_log(GPR_DEBUG,
311 "[LoadDataStore %p] Stop tracking unknown balancer (lb_id_: %s).",
312 this, key.lb_id().c_str());
313 }
314 }
315 }
316
GetAssignedStores(const grpc::string & hostname,const grpc::string & lb_id)317 const std::set<PerBalancerStore*>* LoadDataStore::GetAssignedStores(
318 const grpc::string& hostname, const grpc::string& lb_id) {
319 auto it = per_host_stores_.find(hostname);
320 if (it == per_host_stores_.end()) return nullptr;
321 return it->second.GetAssignedStores(lb_id);
322 }
323
ReportStreamCreated(const grpc::string & hostname,const grpc::string & lb_id,const grpc::string & load_key)324 void LoadDataStore::ReportStreamCreated(const grpc::string& hostname,
325 const grpc::string& lb_id,
326 const grpc::string& load_key) {
327 per_host_stores_[hostname].ReportStreamCreated(lb_id, load_key);
328 }
329
ReportStreamClosed(const grpc::string & hostname,const grpc::string & lb_id)330 void LoadDataStore::ReportStreamClosed(const grpc::string& hostname,
331 const grpc::string& lb_id) {
332 auto it_per_host_store = per_host_stores_.find(hostname);
333 GPR_ASSERT(it_per_host_store != per_host_stores_.end());
334 it_per_host_store->second.ReportStreamClosed(lb_id);
335 }
336
337 } // namespace load_reporter
338 } // namespace grpc
339