1 /* 2 * Copyright 2021 Google LLC 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * https://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 #include "compactor_stack.h" 17 18 #include <cmath> 19 #include <cstdint> 20 #include <type_traits> 21 #include <vector> 22 23 #include "gmock/gmock.h" 24 #include "random_generator.h" 25 26 namespace dist_proc { 27 namespace aggregation { 28 namespace internal { 29 30 namespace { 31 32 using ::testing::AnyOf; 33 using ::testing::Contains; 34 35 struct CompactorsTestParams { 36 int num_items; 37 int capacity; 38 }; 39 40 std::vector<CompactorsTestParams> GenCompactorsTestParams() { 41 std::vector<CompactorsTestParams> testcases; 42 for (int num_items = 1; num_items < 1000; num_items += std::ceil(std::log(1 + num_items))) { 43 for (int capacity = 2; capacity < 1030; capacity *= 2) { 44 testcases.push_back({num_items, capacity}); 45 } 46 } 47 return testcases; 48 } 49 50 class AddsToCompactorsTest : public ::testing::TestWithParam<CompactorsTestParams> { 51 protected: 52 AddsToCompactorsTest() : random_() { 53 } 54 MTRandomGenerator random_; 55 }; 56 57 TEST_P(AddsToCompactorsTest, AddsToCompactorsTestWeightOne) { 58 // This test should not depend on the seed, since we only test 59 // num_stored_items, capacity and sampler weight. 60 std::random_device rd; 61 std::mt19937 gen(rd()); 62 std::uniform_int_distribution<uint64_t> dis(0, std::numeric_limits<uint64_t>::max()); 63 const CompactorsTestParams params = GetParam(); 64 CompactorStack compactor_stack(1000, 100000, &random_); 65 KllSampler sampler(&compactor_stack); 66 while (sampler.capacity() < params.capacity) { 67 sampler.DoubleCapacity(); 68 } 69 for (int i = 0; i < params.num_items; i++) { 70 sampler.Add(dis(gen)); 71 } 72 EXPECT_EQ(compactor_stack.num_stored_items(), params.num_items / sampler.capacity()); 73 if (sampler.sampled_item_and_weight().has_value()) { 74 EXPECT_EQ(sampler.sampled_item_and_weight().value().second, 75 params.num_items % sampler.capacity()); 76 } else { 77 EXPECT_EQ(0, params.num_items % sampler.capacity()); 78 } 79 } 80 81 TEST_P(AddsToCompactorsTest, AddWithWeightToCompactorsTest) { 82 // This test should not depend on the seed, since we only test 83 // num_stored_items, capacity and sampler weight. 84 std::random_device rd; 85 std::mt19937 gen(rd()); 86 std::uniform_int_distribution<uint64_t> dis(0, std::numeric_limits<uint64_t>::max()); 87 const CompactorsTestParams params = GetParam(); 88 CompactorStack compactor_stack(1000, 100000, &random_); 89 KllSampler sampler(&compactor_stack); 90 while (sampler.capacity() < params.capacity) { 91 sampler.DoubleCapacity(); 92 } 93 int remaining_num_items = params.num_items; 94 while (remaining_num_items > 0) { 95 // Add +1 to weight to avoid adding values with 0 weight. 96 int weight = random_.UnbiasedUniform(remaining_num_items) + 1; 97 sampler.AddWithWeight(dis(gen), weight); 98 remaining_num_items -= weight; 99 } 100 EXPECT_EQ(compactor_stack.num_stored_items(), params.num_items / sampler.capacity()); 101 if (sampler.sampled_item_and_weight().has_value()) { 102 EXPECT_EQ(sampler.sampled_item_and_weight().value().second, 103 params.num_items % sampler.capacity()); 104 } else { 105 EXPECT_EQ(0, params.num_items % sampler.capacity()); 106 } 107 } 108 109 INSTANTIATE_TEST_SUITE_P(AddsToCompactorsTestCases, AddsToCompactorsTest, 110 ::testing::ValuesIn(GenCompactorsTestParams())); 111 class KllQuantileUseSamplerTest : public ::testing::Test { 112 protected: 113 KllQuantileUseSamplerTest() { 114 } 115 ~KllQuantileUseSamplerTest() override { 116 } 117 MTRandomGenerator random_; 118 }; 119 120 TEST_F(KllQuantileUseSamplerTest, ZeroCapacityAfterReplacedWithSampler) { 121 CompactorStack compactor_stack(10, 10, &random_); 122 for (int i = 0; i < 200000; i++) { 123 compactor_stack.Add(random_.UnbiasedUniform(std::numeric_limits<uint64_t>::max())); 124 } 125 EXPECT_TRUE(compactor_stack.IsSamplerOn()); 126 const auto& compactors = compactor_stack.compactors(); 127 for (int i = 0; i < compactor_stack.lowest_active_level(); i++) { 128 EXPECT_EQ(compactors[i].capacity(), 0u); 129 } 130 } 131 132 TEST_F(KllQuantileUseSamplerTest, NumStoredItemsWithSampledItem) { 133 CompactorStack compactor_stack(10, 10, &random_); 134 for (int i = 0; i < 2000; i++) { 135 compactor_stack.Add(random_.UnbiasedUniform(std::numeric_limits<uint64_t>::max())); 136 } 137 EXPECT_TRUE(compactor_stack.IsSamplerOn()); 138 139 int num_items_in_compactors = 0; 140 for (const auto& compactor : compactor_stack.compactors()) { 141 if (compactor.capacity() > 0) { 142 num_items_in_compactors += compactor.size(); 143 } 144 } 145 if (compactor_stack.sampled_item_and_weight().has_value()) { 146 EXPECT_EQ(compactor_stack.num_stored_items(), num_items_in_compactors + 1); 147 } else { 148 EXPECT_EQ(compactor_stack.num_stored_items(), num_items_in_compactors); 149 compactor_stack.Add(random_.UnbiasedUniform(std::numeric_limits<uint64_t>::max())); 150 EXPECT_EQ(compactor_stack.num_stored_items(), num_items_in_compactors + 1); 151 } 152 } 153 154 TEST_F(KllQuantileUseSamplerTest, ResetWithSampler) { 155 // Set a fixed seed for this test, as it is not given that there are 40 items 156 // in the compactor stack after 2000 insertions. 157 MTRandomGenerator random(10); 158 CompactorStack compactor_stack(10, 10, &random); 159 for (int i = 0; i < 2000; i++) { 160 compactor_stack.Add(random_.UnbiasedUniform(std::numeric_limits<uint64_t>::max())); 161 } 162 EXPECT_TRUE(compactor_stack.IsSamplerOn()); 163 EXPECT_GE(compactor_stack.num_stored_items(), 40); 164 EXPECT_LE(compactor_stack.num_stored_items(), 2000); 165 EXPECT_GE(compactor_stack.compactors().size(), 2u); 166 167 compactor_stack.Reset(); 168 EXPECT_FALSE(compactor_stack.IsSamplerOn()); 169 EXPECT_FALSE(compactor_stack.sampled_item_and_weight().has_value()); 170 EXPECT_EQ(compactor_stack.num_stored_items(), 0); 171 EXPECT_EQ(compactor_stack.compactors().size(), 1u); 172 173 for (int i = 0; i < 2000; i++) { 174 compactor_stack.Add(random_.UnbiasedUniform(std::numeric_limits<uint64_t>::max())); 175 } 176 EXPECT_TRUE(compactor_stack.IsSamplerOn()); 177 EXPECT_GE(compactor_stack.num_stored_items(), 40); 178 EXPECT_LE(compactor_stack.num_stored_items(), 2000); 179 EXPECT_GE(compactor_stack.compactors().size(), 2u); 180 } 181 182 struct AddWithSamplerParam { 183 int64_t inv_eps; 184 int64_t inv_delta; 185 int64_t num_items; 186 }; 187 188 class AddWithSamplerTest : public ::testing::TestWithParam<AddWithSamplerParam> {}; 189 190 TEST_P(AddWithSamplerTest, AddWithSampler) { 191 const AddWithSamplerParam params = GetParam(); 192 std::random_device rd; 193 MTRandomGenerator random(rd()); 194 CompactorStack compactor_stack(params.inv_eps, params.inv_delta, &random); 195 196 int upbound = (random.UnbiasedUniform(3) + 1) * params.num_items + 1; 197 for (int i = 0; i < upbound; i++) { 198 compactor_stack.Add(random.UnbiasedUniform(std::numeric_limits<uint64_t>::max())); 199 } 200 EXPECT_TRUE(compactor_stack.IsSamplerOn()); 201 202 int lowest_active_level = compactor_stack.lowest_active_level(); 203 int size_before_add = compactor_stack.compactors()[lowest_active_level].size(); 204 uint64_t previous_sampled_item = -1; 205 if (compactor_stack.sampled_item_and_weight().has_value()) { 206 previous_sampled_item = compactor_stack.sampled_item_and_weight()->first; 207 } 208 // Add additional value sufficiently many times to complete full sampler 209 // cycle. 210 for (int i = 0; i < compactor_stack.sampler_capacity(); i++) { 211 compactor_stack.Add(10); 212 } 213 214 int size_after_add = compactor_stack.compactors()[lowest_active_level].size(); 215 if (size_after_add > 0) { 216 EXPECT_EQ(size_after_add, size_before_add + 1); 217 EXPECT_THAT(compactor_stack.compactors()[lowest_active_level], 218 AnyOf(Contains(10), Contains(previous_sampled_item))); 219 } 220 } 221 222 TEST_P(AddWithSamplerTest, AddWithWeightWithSampler) { 223 // Set a fixed seed, since the tests depends on which level is propagated 224 // in the compactor stack. 225 uint64_t seed = 3; 226 const AddWithSamplerParam params = GetParam(); 227 MTRandomGenerator random = MTRandomGenerator(seed); 228 CompactorStack compactor_stack(params.inv_eps, params.inv_delta, &random); 229 230 for (int i = 0; i < params.num_items; i++) { 231 compactor_stack.Add(random.UnbiasedUniform(std::numeric_limits<uint32_t>::max())); 232 } 233 ASSERT_TRUE(compactor_stack.IsSamplerOn()); 234 if (!compactor_stack.sampled_item_and_weight().has_value()) { 235 compactor_stack.Add(random.UnbiasedUniform(std::numeric_limits<uint64_t>::max())); 236 } 237 int lowest_active_level = compactor_stack.lowest_active_level(); 238 int size_before_add_lowest_level = compactor_stack.compactors()[lowest_active_level].size(); 239 int size_before_add_level_plus2 = compactor_stack.compactors()[lowest_active_level + 2].size(); 240 int size_before_add_level_plus3 = compactor_stack.compactors()[lowest_active_level + 3].size(); 241 uint64_t previous_sampled_item = -1; 242 if (compactor_stack.sampled_item_and_weight().has_value()) { 243 previous_sampled_item = compactor_stack.sampled_item_and_weight()->first; 244 } 245 246 // Expected additions: one item from the sampler to the lowest active level; 247 // one item to the lowest active level + 2. 248 int weight = 5 * (1 << lowest_active_level) - 1; 249 compactor_stack.AddWithWeight(10.0, weight); 250 251 int size_after_add_lowest_level = compactor_stack.compactors()[lowest_active_level].size(); 252 // > 0 to make sure we are not checking compact case 253 if (size_after_add_lowest_level > 0) { 254 EXPECT_EQ(size_after_add_lowest_level, size_before_add_lowest_level + 1); 255 EXPECT_THAT(compactor_stack.compactors()[lowest_active_level], 256 AnyOf(Contains(10.0), Contains(previous_sampled_item))); 257 } 258 259 // Level+2 is expected to change. 260 // Exact behavior also subject to seed - a lower level might be compacted 261 // into this one, or this one might be compacted up. 262 // Test only checks the uncompacted case. (Different from Java version) 263 int size_after_add_level_plus2 = compactor_stack.compactors()[lowest_active_level + 2].size(); 264 if (size_after_add_level_plus2 > 265 // make sure we are not checking comact case 266 size_before_add_level_plus2) { 267 EXPECT_EQ(size_after_add_level_plus2, size_before_add_level_plus2 + 1); 268 EXPECT_THAT(compactor_stack.compactors()[lowest_active_level + 2], Contains(10.0)); 269 int size_after_add_level_plus3 = 270 compactor_stack.compactors()[lowest_active_level + 3].size(); 271 EXPECT_EQ(size_after_add_level_plus3, size_before_add_level_plus3); 272 } 273 } 274 275 TEST_P(AddWithSamplerTest, CompactorsStillBigXorSamplerIsOn) { 276 std::random_device rd; 277 uint64_t seed = rd(); 278 const AddWithSamplerParam params = GetParam(); 279 MTRandomGenerator random = MTRandomGenerator(seed); 280 CompactorStack compactor_stack(params.inv_eps, params.inv_delta, &random); 281 for (int i = 0; i < params.num_items; i++) { 282 compactor_stack.Add(random.UnbiasedUniform(std::numeric_limits<uint64_t>::max())); 283 int lowest_compactor_capacity = compactor_stack.TargetCapacityAtLevel(0); 284 EXPECT_TRUE((lowest_compactor_capacity > 2) ^ (compactor_stack.IsSamplerOn())); 285 } 286 } 287 288 TEST_P(AddWithSamplerTest, SampledItemIsPresent) { 289 std::random_device rd; 290 uint64_t seed = rd(); 291 const AddWithSamplerParam params = GetParam(); 292 MTRandomGenerator random = MTRandomGenerator(seed); 293 CompactorStack compactor_stack(params.inv_eps, params.inv_delta, &random); 294 295 for (int i = 0; i < params.num_items; i++) { 296 compactor_stack.Add(random.UnbiasedUniform(std::numeric_limits<uint64_t>::max())); 297 } 298 int sampled_item_weight = (compactor_stack.sampled_item_and_weight().has_value()) 299 ? compactor_stack.sampled_item_and_weight().value().second 300 : 0; 301 compactor_stack.Add(random.UnbiasedUniform(std::numeric_limits<uint64_t>::max())); 302 int sampled_item_weight_after_add = 303 (compactor_stack.sampled_item_and_weight().has_value()) 304 ? compactor_stack.sampled_item_and_weight().value().second 305 : 0; 306 sampled_item_weight = std::max(sampled_item_weight, sampled_item_weight_after_add); 307 EXPECT_GT(sampled_item_weight, 0); 308 EXPECT_LE(sampled_item_weight, (1 << compactor_stack.lowest_active_level())); 309 } 310 311 INSTANTIATE_TEST_SUITE_P(AddWithSamplerTestCases, AddWithSamplerTest, 312 ::testing::ValuesIn(std::vector<AddWithSamplerParam>{ 313 {10, 10, 2400}, 314 {10, 100, 4600}, 315 {10, 1000, 5000}, 316 {50, 10000, 5000000}, 317 {100, 100, 1250000}, 318 {100, 1000, 2000000}})); 319 320 } // namespace 321 322 } // namespace internal 323 } // namespace aggregation 324 } // namespace dist_proc 325