1 /*
2  *
3  * Copyright 2018 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #include <grpc/impl/codegen/port_platform.h>
20 
21 #include <set>
22 #include <vector>
23 
24 #include <grpc/grpc.h>
25 #include <gtest/gtest.h>
26 
27 #include "src/cpp/server/load_reporter/load_data_store.h"
28 #include "test/core/util/port.h"
29 #include "test/core/util/test_config.h"
30 
31 namespace grpc {
32 namespace testing {
33 namespace {
34 
35 using ::grpc::load_reporter::CallMetricValue;
36 using ::grpc::load_reporter::LoadDataStore;
37 using ::grpc::load_reporter::LoadRecordKey;
38 using ::grpc::load_reporter::LoadRecordValue;
39 using ::grpc::load_reporter::PerBalancerStore;
40 using ::grpc::load_reporter::kInvalidLbId;
41 
42 class LoadDataStoreTest : public ::testing::Test {
43  public:
LoadDataStoreTest()44   LoadDataStoreTest()
45       : kKey1(kLbId1, kLbTag1, kUser1, kClientIp1),
46         kKey2(kLbId2, kLbTag2, kUser2, kClientIp2) {}
47 
48   // Check whether per_balancer_stores contains a store which was originally
49   // created for <hostname, lb_id, and load_key>.
PerBalancerStoresContains(const LoadDataStore & load_data_store,const std::set<PerBalancerStore * > * per_balancer_stores,const grpc::string & hostname,const grpc::string & lb_id,const grpc::string & load_key)50   bool PerBalancerStoresContains(
51       const LoadDataStore& load_data_store,
52       const std::set<PerBalancerStore*>* per_balancer_stores,
53       const grpc::string& hostname, const grpc::string& lb_id,
54       const grpc::string& load_key) {
55     auto original_per_balancer_store =
56         load_data_store.FindPerBalancerStore(hostname, lb_id);
57     EXPECT_NE(original_per_balancer_store, nullptr);
58     EXPECT_EQ(original_per_balancer_store->lb_id(), lb_id);
59     EXPECT_EQ(original_per_balancer_store->load_key(), load_key);
60     for (auto per_balancer_store : *per_balancer_stores) {
61       if (per_balancer_store == original_per_balancer_store) {
62         return true;
63       }
64     }
65     return false;
66   }
67 
FormatLbId(size_t index)68   grpc::string FormatLbId(size_t index) {
69     return "kLbId" + std::to_string(index);
70   }
71 
72   const grpc::string kHostname1 = "kHostname1";
73   const grpc::string kHostname2 = "kHostname2";
74   const grpc::string kLbId1 = "kLbId1";
75   const grpc::string kLbId2 = "kLbId2";
76   const grpc::string kLbId3 = "kLbId3";
77   const grpc::string kLbId4 = "kLbId4";
78   const grpc::string kLoadKey1 = "kLoadKey1";
79   const grpc::string kLoadKey2 = "kLoadKey2";
80   const grpc::string kLbTag1 = "kLbTag1";
81   const grpc::string kLbTag2 = "kLbTag2";
82   const grpc::string kUser1 = "kUser1";
83   const grpc::string kUser2 = "kUser2";
84   const grpc::string kClientIp1 = "00";
85   const grpc::string kClientIp2 = "02";
86   const grpc::string kMetric1 = "kMetric1";
87   const grpc::string kMetric2 = "kMetric2";
88   const LoadRecordKey kKey1;
89   const LoadRecordKey kKey2;
90 };
91 
92 using PerBalancerStoreTest = LoadDataStoreTest;
93 
// After a single stream is reported for <kHostname1, kLbId1>, the stores
// assigned to that balancer include the store created for the balancer itself.
TEST_F(LoadDataStoreTest, AssignToSelf) {
  LoadDataStore store;
  store.ReportStreamCreated(kHostname1, kLbId1, kLoadKey1);
  const std::set<PerBalancerStore*>* own_stores =
      store.GetAssignedStores(kHostname1, kLbId1);
  const bool assigned_to_self = PerBalancerStoresContains(
      store, own_stores, kHostname1, kLbId1, kLoadKey1);
  EXPECT_TRUE(assigned_to_self);
}
101 
// Closes the streams of one host one by one and checks, after each step,
// which balancer the orphaned stores end up assigned to.
TEST_F(LoadDataStoreTest, ReassignOrphanStores) {
  LoadDataStore store;
  // Shorthand: does `assigned` hold the store that was created for
  // <hostname, lb_id, load_key>?
  const auto holds = [this, &store](
                         const std::set<PerBalancerStore*>* assigned,
                         const grpc::string& hostname,
                         const grpc::string& lb_id,
                         const grpc::string& load_key) {
    return PerBalancerStoresContains(store, assigned, hostname, lb_id,
                                     load_key);
  };
  // Three streams on the first host (two sharing kLoadKey1) and one stream on
  // the second host.
  store.ReportStreamCreated(kHostname1, kLbId1, kLoadKey1);
  store.ReportStreamCreated(kHostname1, kLbId2, kLoadKey1);
  store.ReportStreamCreated(kHostname1, kLbId3, kLoadKey2);
  store.ReportStreamCreated(kHostname2, kLbId4, kLoadKey1);

  // Step 1: close the second stream.
  store.ReportStreamClosed(kHostname1, kLbId2);
  const std::set<PerBalancerStore*>* lb1_stores =
      store.GetAssignedStores(kHostname1, kLbId1);
  // kLbId1 shares the load key of the orphan, so it adopts the orphaned store
  // in addition to its own.
  EXPECT_TRUE(holds(lb1_stores, kHostname1, kLbId1, kLoadKey1));
  EXPECT_TRUE(holds(lb1_stores, kHostname1, kLbId2, kLoadKey1));

  // Step 2: close the first stream.
  store.ReportStreamClosed(kHostname1, kLbId1);
  const std::set<PerBalancerStore*>* lb3_stores =
      store.GetAssignedStores(kHostname1, kLbId3);
  // No LB with the same load key is left, so the orphaned stores fall back to
  // kLbId3, which is on the same host.
  EXPECT_TRUE(holds(lb3_stores, kHostname1, kLbId1, kLoadKey1));
  EXPECT_TRUE(holds(lb3_stores, kHostname1, kLbId2, kLoadKey1));
  EXPECT_TRUE(holds(lb3_stores, kHostname1, kLbId3, kLoadKey2));

  // Step 3: close the third stream.
  store.ReportStreamClosed(kHostname1, kLbId3);
  const std::set<PerBalancerStore*>* lb4_stores =
      store.GetAssignedStores(kHostname2, kLbId4);
  // The first host has no active LB any more. kLbId4 is active, but it
  // belongs to the second host, so it will NOT adopt the orphaned stores.
  EXPECT_FALSE(holds(lb4_stores, kHostname1, kLbId1, kLoadKey1));
  EXPECT_FALSE(holds(lb4_stores, kHostname1, kLbId2, kLoadKey1));
  EXPECT_FALSE(holds(lb4_stores, kHostname1, kLbId3, kLoadKey2));
  EXPECT_TRUE(holds(lb4_stores, kHostname2, kLbId4, kLoadKey1));
}
144 
// Orphans one store out of many, finds the balancer that adopted it, and
// checks that the adoption does not move while other balancers go away. Once
// the adopter itself goes away, the orphan must be adopted by exactly one of
// the remaining balancers.
TEST_F(LoadDataStoreTest, OrphanAssignmentIsSticky) {
  LoadDataStore load_data_store;
  std::set<grpc::string> active_lb_ids;
  const size_t num_lb_ids = 1000;
  for (size_t i = 0; i < num_lb_ids; ++i) {
    const grpc::string lb_id = FormatLbId(i);
    load_data_store.ReportStreamCreated(kHostname1, lb_id, kLoadKey1);
    active_lb_ids.insert(lb_id);
  }
  // std::rand() is not seeded here, so the choice is the same on every run.
  const grpc::string orphaned_lb_id = FormatLbId(std::rand() % num_lb_ids);
  load_data_store.ReportStreamClosed(kHostname1, orphaned_lb_id);
  active_lb_ids.erase(orphaned_lb_id);
  // Find which LB is assigned the orphaned store.
  grpc::string assigned_lb_id;
  for (const auto& lb_id : active_lb_ids) {
    if (PerBalancerStoresContains(
            load_data_store,
            load_data_store.GetAssignedStores(kHostname1, lb_id), kHostname1,
            orphaned_lb_id, kLoadKey1)) {
      assigned_lb_id = lb_id;
      break;
    }
  }
  // Fatal: everything below needs a real adopter; continuing with an empty ID
  // would only produce follow-up failures unrelated to the actual problem.
  ASSERT_FALSE(assigned_lb_id.empty())
      << "The orphaned store wasn't assigned to any active LB";
  // Close 10 more streams, skipping the assigned_lb_id. The assignment of
  // orphaned_lb_id shouldn't change.
  for (size_t round = 0; round < 10; ++round) {
    grpc::string lb_id_to_close;
    for (const auto& lb_id : active_lb_ids) {
      if (lb_id != assigned_lb_id) {
        lb_id_to_close = lb_id;
        break;
      }
    }
    // Fatal for the same reason: there must be a real stream to close.
    ASSERT_FALSE(lb_id_to_close.empty())
        << "No active LB left to close in round " << round;
    load_data_store.ReportStreamClosed(kHostname1, lb_id_to_close);
    active_lb_ids.erase(lb_id_to_close);
    EXPECT_TRUE(PerBalancerStoresContains(
        load_data_store,
        load_data_store.GetAssignedStores(kHostname1, assigned_lb_id),
        kHostname1, orphaned_lb_id, kLoadKey1));
  }
  // Close the assigned_lb_id, orphaned_lb_id will be re-assigned again.
  load_data_store.ReportStreamClosed(kHostname1, assigned_lb_id);
  active_lb_ids.erase(assigned_lb_id);
  size_t orphaned_lb_id_occurrences = 0;
  for (const auto& lb_id : active_lb_ids) {
    if (PerBalancerStoresContains(
            load_data_store,
            load_data_store.GetAssignedStores(kHostname1, lb_id), kHostname1,
            orphaned_lb_id, kLoadKey1)) {
      ++orphaned_lb_id_occurrences;
    }
  }
  // Exactly one of the remaining LBs holds the orphaned store.
  EXPECT_EQ(orphaned_lb_id_occurrences, 1U);
}
200 
// When a host loses its only stream its stores are suspended and stop keeping
// detailed load data; when a new stream shows up for that host they are
// resumed and assigned to the new balancer.
TEST_F(LoadDataStoreTest, HostTemporarilyLoseAllStreams) {
  LoadDataStore load_data_store;
  load_data_store.ReportStreamCreated(kHostname1, kLbId1, kLoadKey1);
  load_data_store.ReportStreamCreated(kHostname2, kLbId2, kLoadKey1);
  auto store_lb_id_1 = load_data_store.FindPerBalancerStore(kHostname1, kLbId1);
  auto store_invalid_lb_id_1 =
      load_data_store.FindPerBalancerStore(kHostname1, kInvalidLbId);
  // Both pointers are dereferenced throughout the test, so a failed lookup
  // must stop the test here instead of crashing it.
  ASSERT_NE(store_lb_id_1, nullptr);
  ASSERT_NE(store_invalid_lb_id_1, nullptr);
  EXPECT_FALSE(store_lb_id_1->IsSuspended());
  EXPECT_FALSE(store_invalid_lb_id_1->IsSuspended());
  // Disconnect all the streams of the first host.
  load_data_store.ReportStreamClosed(kHostname1, kLbId1);
  // All the stores of that host are suspended.
  EXPECT_TRUE(store_lb_id_1->IsSuspended());
  EXPECT_TRUE(store_invalid_lb_id_1->IsSuspended());
  // Detailed load data won't be kept when the PerBalancerStore is suspended.
  store_lb_id_1->MergeRow(kKey1, LoadRecordValue());
  store_invalid_lb_id_1->MergeRow(kKey1, LoadRecordValue());
  EXPECT_EQ(store_lb_id_1->load_record_map().size(), 0U);
  EXPECT_EQ(store_invalid_lb_id_1->load_record_map().size(), 0U);
  // The stores for different hosts won't mix, even if the load key is the same.
  auto assigned_to_lb_id_2 =
      load_data_store.GetAssignedStores(kHostname2, kLbId2);
  ASSERT_NE(assigned_to_lb_id_2, nullptr);
  EXPECT_EQ(assigned_to_lb_id_2->size(), 2U);
  EXPECT_TRUE(PerBalancerStoresContains(load_data_store, assigned_to_lb_id_2,
                                        kHostname2, kLbId2, kLoadKey1));
  EXPECT_TRUE(PerBalancerStoresContains(load_data_store, assigned_to_lb_id_2,
                                        kHostname2, kInvalidLbId, ""));
  // A new stream is created for the first host.
  load_data_store.ReportStreamCreated(kHostname1, kLbId3, kLoadKey2);
  // The stores for the first host are resumed.
  EXPECT_FALSE(store_lb_id_1->IsSuspended());
  EXPECT_FALSE(store_invalid_lb_id_1->IsSuspended());
  store_lb_id_1->MergeRow(kKey1, LoadRecordValue());
  store_invalid_lb_id_1->MergeRow(kKey1, LoadRecordValue());
  EXPECT_EQ(store_lb_id_1->load_record_map().size(), 1U);
  EXPECT_EQ(store_invalid_lb_id_1->load_record_map().size(), 1U);
  // The resumed stores are assigned to the new LB.
  auto assigned_to_lb_id_3 =
      load_data_store.GetAssignedStores(kHostname1, kLbId3);
  ASSERT_NE(assigned_to_lb_id_3, nullptr);
  EXPECT_EQ(assigned_to_lb_id_3->size(), 3U);
  EXPECT_TRUE(PerBalancerStoresContains(load_data_store, assigned_to_lb_id_3,
                                        kHostname1, kLbId1, kLoadKey1));
  EXPECT_TRUE(PerBalancerStoresContains(load_data_store, assigned_to_lb_id_3,
                                        kHostname1, kInvalidLbId, ""));
  EXPECT_TRUE(PerBalancerStoresContains(load_data_store, assigned_to_lb_id_3,
                                        kHostname1, kLbId3, kLoadKey2));
}
248 
// Reporting a stream creates one store for that stream's LB ID plus one store
// for kInvalidLbId on the same host; stores of different hosts are distinct
// objects, and LB IDs that were never reported have no store.
TEST_F(LoadDataStoreTest, OneStorePerLbId) {
  LoadDataStore store;
  // Shorthand for the store lookup.
  const auto find = [&store](const grpc::string& hostname,
                             const grpc::string& lb_id) {
    return store.FindPerBalancerStore(hostname, lb_id);
  };
  // Nothing has been reported yet, so every lookup comes back empty.
  EXPECT_EQ(find(kHostname1, kLbId1), nullptr);
  EXPECT_EQ(find(kHostname1, kInvalidLbId), nullptr);
  EXPECT_EQ(find(kHostname2, kLbId2), nullptr);
  EXPECT_EQ(find(kHostname2, kLbId3), nullptr);

  // Create the first stream.
  store.ReportStreamCreated(kHostname1, kLbId1, kLoadKey1);
  const auto host1_lb1 = find(kHostname1, kLbId1);
  const auto host1_invalid = find(kHostname1, kInvalidLbId);
  // Two distinct stores now exist on the first host: one for the stream and
  // one for kInvalidLbId.
  EXPECT_NE(host1_lb1, nullptr);
  EXPECT_NE(host1_invalid, nullptr);
  EXPECT_NE(host1_lb1, host1_invalid);
  // The second host is untouched.
  EXPECT_EQ(find(kHostname2, kLbId2), nullptr);
  EXPECT_EQ(find(kHostname2, kLbId3), nullptr);

  // Create the second stream.
  store.ReportStreamCreated(kHostname2, kLbId3, kLoadKey1);
  const auto host2_lb3 = find(kHostname2, kLbId3);
  const auto host2_invalid = find(kHostname2, kInvalidLbId);
  EXPECT_NE(host2_lb3, nullptr);
  EXPECT_NE(host2_invalid, nullptr);
  EXPECT_NE(host2_lb3, host2_invalid);
  // The PerBalancerStores created for different hosts are independent.
  EXPECT_NE(host2_lb3, host1_invalid);
  EXPECT_NE(host2_invalid, host1_invalid);
  // kLbId2 was never reported on the second host.
  EXPECT_EQ(find(kHostname2, kLbId2), nullptr);
}
281 
// Creates num_create streams, closes the first num_close of them, and checks
// that across the balancers that are still active every store (including the
// one for kInvalidLbId) shows up in exactly one assignment.
TEST_F(LoadDataStoreTest, ExactlyOnceAssignment) {
  LoadDataStore load_data_store;
  const size_t num_create = 100;
  const size_t num_close = 50;
  for (size_t i = 0; i < num_create; ++i) {
    load_data_store.ReportStreamCreated(kHostname1, FormatLbId(i), kLoadKey1);
  }
  for (size_t i = 0; i < num_close; ++i) {
    load_data_store.ReportStreamClosed(kHostname1, FormatLbId(i));
  }
  std::set<grpc::string> reported_lb_ids;
  for (size_t i = num_close; i < num_create; ++i) {
    const std::set<PerBalancerStore*>* assigned_stores =
        load_data_store.GetAssignedStores(kHostname1, FormatLbId(i));
    // Stop here rather than dereference a null set of assigned stores.
    ASSERT_NE(assigned_stores, nullptr)
        << "No assigned stores for active LB " << FormatLbId(i);
    for (PerBalancerStore* assigned_store : *assigned_stores) {
      // A failed insertion means this store was already seen in another
      // balancer's assignment.
      EXPECT_TRUE(reported_lb_ids.insert(assigned_store->lb_id()).second);
    }
  }
  // Add one for kInvalidLbId.
  EXPECT_EQ(reported_lb_ids.size(), (num_create + 1));
  EXPECT_NE(reported_lb_ids.find(kInvalidLbId), reported_lb_ids.end());
}
303 
// Rows merged for an LB ID that never reported a stream don't create a
// PerBalancerStore; instead the LB ID is tracked as unknown until all of its
// started calls have ended. Rows for a known LB ID land in its store.
TEST_F(LoadDataStoreTest, UnknownBalancerIdTracking) {
  LoadDataStore load_data_store;
  load_data_store.ReportStreamCreated(kHostname1, kLbId1, kLoadKey1);
  // Merge data for a known LB ID.
  LoadRecordValue v1(192);
  load_data_store.MergeRow(kHostname1, kKey1, v1);
  // Merge data for unknown LB ID.
  LoadRecordValue v2(23);
  EXPECT_FALSE(load_data_store.IsTrackedUnknownBalancerId(kLbId2));
  load_data_store.MergeRow(
      kHostname1, LoadRecordKey(kLbId2, kLbTag1, kUser1, kClientIp1), v2);
  EXPECT_TRUE(load_data_store.IsTrackedUnknownBalancerId(kLbId2));
  LoadRecordValue v3(952);
  load_data_store.MergeRow(
      kHostname2, LoadRecordKey(kLbId3, kLbTag1, kUser1, kClientIp1), v3);
  EXPECT_TRUE(load_data_store.IsTrackedUnknownBalancerId(kLbId3));
  // The data kept for a known LB ID is correct.
  auto store_lb_id_1 = load_data_store.FindPerBalancerStore(kHostname1, kLbId1);
  // The store is dereferenced repeatedly below; stop if the lookup failed.
  ASSERT_NE(store_lb_id_1, nullptr);
  {
    const auto& records = store_lb_id_1->load_record_map();
    EXPECT_EQ(records.size(), 1U);
    const auto record = records.find(kKey1);
    // Don't dereference the iterator unless the row is really there.
    ASSERT_TRUE(record != records.end());
    EXPECT_EQ(record->second.start_count(), v1.start_count());
  }
  EXPECT_EQ(store_lb_id_1->GetNumCallsInProgressForReport(), v1.start_count());
  // No PerBalancerStore created for Unknown LB ID.
  EXPECT_EQ(load_data_store.FindPerBalancerStore(kHostname1, kLbId2), nullptr);
  EXPECT_EQ(load_data_store.FindPerBalancerStore(kHostname2, kLbId3), nullptr);
  // End all the started RPCs for kLbId1.
  LoadRecordValue v4(0, v1.start_count());
  load_data_store.MergeRow(kHostname1, kKey1, v4);
  {
    // Re-read the map after the merge instead of reusing the earlier lookup.
    const auto& records = store_lb_id_1->load_record_map();
    EXPECT_EQ(records.size(), 1U);
    const auto record = records.find(kKey1);
    ASSERT_TRUE(record != records.end());
    EXPECT_EQ(record->second.start_count(), v1.start_count());
    EXPECT_EQ(record->second.ok_count(), v4.ok_count());
  }
  EXPECT_EQ(store_lb_id_1->GetNumCallsInProgressForReport(), 0U);
  EXPECT_FALSE(load_data_store.IsTrackedUnknownBalancerId(kLbId1));
  // End all the started RPCs for kLbId2.
  LoadRecordValue v5(0, v2.start_count());
  load_data_store.MergeRow(
      kHostname1, LoadRecordKey(kLbId2, kLbTag1, kUser1, kClientIp1), v5);
  EXPECT_FALSE(load_data_store.IsTrackedUnknownBalancerId(kLbId2));
  // End some of the started RPCs for kLbId3.
  LoadRecordValue v6(0, v3.start_count() / 2);
  load_data_store.MergeRow(
      kHostname2, LoadRecordKey(kLbId3, kLbTag1, kUser1, kClientIp1), v6);
  EXPECT_TRUE(load_data_store.IsTrackedUnknownBalancerId(kLbId3));
}
350 
// Walks a PerBalancerStore through two suspend/resume cycles: rows merged
// while suspended are not kept in the record map, suspending empties the map,
// and the in-progress call count reflects every merged row regardless.
TEST_F(PerBalancerStoreTest, Suspend) {
  PerBalancerStore store(kLbId1, kLoadKey1);
  // Number of rows currently held in the store's record map.
  const auto num_records = [&store] { return store.load_record_map().size(); };
  EXPECT_FALSE(store.IsSuspended());

  // First suspension.
  store.Suspend();
  EXPECT_TRUE(store.IsSuspended());
  EXPECT_EQ(0U, num_records());
  // A row merged during the suspension is not kept.
  LoadRecordValue suspended_row_1(139, 19);
  store.MergeRow(kKey1, suspended_row_1);
  EXPECT_EQ(0U, num_records());

  // First resumption.
  store.Resume();
  EXPECT_FALSE(store.IsSuspended());
  EXPECT_EQ(0U, num_records());
  // A row merged after resuming is kept.
  LoadRecordValue resumed_row_1(23, 0, 51);
  store.MergeRow(kKey1, resumed_row_1);
  EXPECT_EQ(1U, num_records());

  // Second suspension; the map is empty again afterwards.
  store.Suspend();
  EXPECT_TRUE(store.IsSuspended());
  EXPECT_EQ(0U, num_records());
  // Again, a row merged during the suspension is not kept.
  LoadRecordValue suspended_row_2(62, 11);
  store.MergeRow(kKey1, suspended_row_2);
  EXPECT_EQ(0U, num_records());

  // Second resumption.
  store.Resume();
  EXPECT_FALSE(store.IsSuspended());
  EXPECT_EQ(0U, num_records());
  // And a row merged after resuming is kept.
  LoadRecordValue resumed_row_2(225, 98);
  store.MergeRow(kKey1, resumed_row_2);
  EXPECT_EQ(1U, num_records());

  // The in-progress count is maintained across all four merges, whether or
  // not the row itself was kept.
  const auto expected_in_progress =
      suspended_row_1.start_count() - suspended_row_1.ok_count() +
      resumed_row_1.start_count() - resumed_row_1.error_count() +
      suspended_row_2.start_count() - suspended_row_2.ok_count() +
      resumed_row_2.start_count() - resumed_row_2.ok_count();
  EXPECT_EQ(store.GetNumCallsInProgressForReport(), expected_in_progress);
}
392 
TEST_F(PerBalancerStoreTest,DataAggregation)393 TEST_F(PerBalancerStoreTest, DataAggregation) {
394   PerBalancerStore per_balancer_store(kLbId1, kLoadKey1);
395   // Construct some Values.
396   LoadRecordValue v1(992, 34, 13, 234, 164, 173467);
397   v1.InsertCallMetric(kMetric1, CallMetricValue(3, 2773.2));
398   LoadRecordValue v2(4842, 213, 9, 393, 974, 1345);
399   v2.InsertCallMetric(kMetric1, CallMetricValue(7, 25.234));
400   v2.InsertCallMetric(kMetric2, CallMetricValue(2, 387.08));
401   // v3 doesn't change the number of in-progress RPCs.
402   LoadRecordValue v3(293, 55, 293 - 55, 28764, 5284, 5772);
403   v3.InsertCallMetric(kMetric1, CallMetricValue(61, 3465.0));
404   v3.InsertCallMetric(kMetric2, CallMetricValue(13, 672.0));
405   // The initial state of the store.
406   uint64_t num_calls_in_progress = 0;
407   EXPECT_FALSE(per_balancer_store.IsNumCallsInProgressChangedSinceLastReport());
408   EXPECT_EQ(per_balancer_store.GetNumCallsInProgressForReport(),
409             num_calls_in_progress);
410   // Merge v1 and get report of the number of in-progress calls.
411   per_balancer_store.MergeRow(kKey1, v1);
412   EXPECT_TRUE(per_balancer_store.IsNumCallsInProgressChangedSinceLastReport());
413   EXPECT_EQ(per_balancer_store.GetNumCallsInProgressForReport(),
414             num_calls_in_progress +=
415             (v1.start_count() - v1.ok_count() - v1.error_count()));
416   EXPECT_FALSE(per_balancer_store.IsNumCallsInProgressChangedSinceLastReport());
417   // Merge v2 and get report of the number of in-progress calls.
418   per_balancer_store.MergeRow(kKey2, v2);
419   EXPECT_TRUE(per_balancer_store.IsNumCallsInProgressChangedSinceLastReport());
420   EXPECT_EQ(per_balancer_store.GetNumCallsInProgressForReport(),
421             num_calls_in_progress +=
422             (v2.start_count() - v2.ok_count() - v2.error_count()));
423   EXPECT_FALSE(per_balancer_store.IsNumCallsInProgressChangedSinceLastReport());
424   // Merge v3 and get report of the number of in-progress calls.
425   per_balancer_store.MergeRow(kKey1, v3);
426   EXPECT_FALSE(per_balancer_store.IsNumCallsInProgressChangedSinceLastReport());
427   EXPECT_EQ(per_balancer_store.GetNumCallsInProgressForReport(),
428             num_calls_in_progress);
429   // LoadRecordValue for kKey1 is aggregated correctly.
430   LoadRecordValue value_for_key1 =
431       per_balancer_store.load_record_map().find(kKey1)->second;
432   EXPECT_EQ(value_for_key1.start_count(), v1.start_count() + v3.start_count());
433   EXPECT_EQ(value_for_key1.ok_count(), v1.ok_count() + v3.ok_count());
434   EXPECT_EQ(value_for_key1.error_count(), v1.error_count() + v3.error_count());
435   EXPECT_EQ(value_for_key1.bytes_sent(), v1.bytes_sent() + v3.bytes_sent());
436   EXPECT_EQ(value_for_key1.bytes_recv(), v1.bytes_recv() + v3.bytes_recv());
437   EXPECT_EQ(value_for_key1.latency_ms(), v1.latency_ms() + v3.latency_ms());
438   EXPECT_EQ(value_for_key1.call_metrics().size(), 2U);
439   EXPECT_EQ(value_for_key1.call_metrics().find(kMetric1)->second.num_calls(),
440             v1.call_metrics().find(kMetric1)->second.num_calls() +
441                 v3.call_metrics().find(kMetric1)->second.num_calls());
442   EXPECT_EQ(
443       value_for_key1.call_metrics().find(kMetric1)->second.total_metric_value(),
444       v1.call_metrics().find(kMetric1)->second.total_metric_value() +
445           v3.call_metrics().find(kMetric1)->second.total_metric_value());
446   EXPECT_EQ(value_for_key1.call_metrics().find(kMetric2)->second.num_calls(),
447             v3.call_metrics().find(kMetric2)->second.num_calls());
448   EXPECT_EQ(
449       value_for_key1.call_metrics().find(kMetric2)->second.total_metric_value(),
450       v3.call_metrics().find(kMetric2)->second.total_metric_value());
451   // LoadRecordValue for kKey2 is aggregated (trivially) correctly.
452   LoadRecordValue value_for_key2 =
453       per_balancer_store.load_record_map().find(kKey2)->second;
454   EXPECT_EQ(value_for_key2.start_count(), v2.start_count());
455   EXPECT_EQ(value_for_key2.ok_count(), v2.ok_count());
456   EXPECT_EQ(value_for_key2.error_count(), v2.error_count());
457   EXPECT_EQ(value_for_key2.bytes_sent(), v2.bytes_sent());
458   EXPECT_EQ(value_for_key2.bytes_recv(), v2.bytes_recv());
459   EXPECT_EQ(value_for_key2.latency_ms(), v2.latency_ms());
460   EXPECT_EQ(value_for_key2.call_metrics().size(), 2U);
461   EXPECT_EQ(value_for_key2.call_metrics().find(kMetric1)->second.num_calls(),
462             v2.call_metrics().find(kMetric1)->second.num_calls());
463   EXPECT_EQ(
464       value_for_key2.call_metrics().find(kMetric1)->second.total_metric_value(),
465       v2.call_metrics().find(kMetric1)->second.total_metric_value());
466   EXPECT_EQ(value_for_key2.call_metrics().find(kMetric2)->second.num_calls(),
467             v2.call_metrics().find(kMetric2)->second.num_calls());
468   EXPECT_EQ(
469       value_for_key2.call_metrics().find(kMetric2)->second.total_metric_value(),
470       v2.call_metrics().find(kMetric2)->second.total_metric_value());
471 }
472 
473 }  // namespace
474 }  // namespace testing
475 }  // namespace grpc
476 
main(int argc,char ** argv)477 int main(int argc, char** argv) {
478   grpc_test_init(argc, argv);
479   ::testing::InitGoogleTest(&argc, argv);
480   return RUN_ALL_TESTS();
481 }
482