1 /*
2 * Copyright 2021 Google LLC
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * https://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16 #include "sampler.h"
17
18 #include <cmath>
19 #include <cstdint>
20 #include <type_traits>
21 #include <vector>
22
23 #include "compactor_stack.h"
24 #include "gmock/gmock.h"
25
26 namespace dist_proc {
27 namespace aggregation {
28 namespace internal {
29
30 namespace {
31
32 using ::testing::_;
33 using ::testing::AnyOf;
34 using ::testing::Contains;
35 using ::testing::Eq;
36 using ::testing::Optional;
37 using ::testing::Pair;
38
39 class KllQuantileSamplerTest : public ::testing::Test {
40 protected:
KllQuantileSamplerTest()41 KllQuantileSamplerTest() : random_() {
42 }
43 MTRandomGenerator random_;
44 };
45
TEST_F(KllQuantileSamplerTest,Add100Items)46 TEST_F(KllQuantileSamplerTest, Add100Items) {
47 CompactorStack compactor_stack(1000, 100000, &random_);
48 KllSampler sampler(&compactor_stack);
49 EXPECT_THAT(sampler.sampled_item_and_weight(), Eq(std::nullopt));
50
51 sampler.Add(4);
52 EXPECT_THAT(sampler.sampled_item_and_weight(), Optional(Pair(Eq(4), Eq(1))));
53 sampler.Add(10);
54 EXPECT_THAT(sampler.sampled_item_and_weight(), Eq(std::nullopt));
55
56 for (int i = 0; i < 100; i++) {
57 sampler.Reset();
58 sampler.DoubleCapacity();
59 sampler.Add(4);
60 EXPECT_THAT(sampler.sampled_item_and_weight(), Optional(Pair(Eq(4), Eq(1))));
61 sampler.Add(10);
62 EXPECT_THAT(sampler.sampled_item_and_weight(), Optional(Pair(AnyOf(Eq(4), Eq(10)), Eq(2))));
63 sampler.Add(14);
64 EXPECT_THAT(sampler.sampled_item_and_weight(), Optional(Pair(_, Eq(3))));
65 sampler.Add(24);
66 EXPECT_THAT(sampler.sampled_item_and_weight(), Eq(std::nullopt));
67 }
68 }
69
TEST_F(KllQuantileSamplerTest,ZeroItems)70 TEST_F(KllQuantileSamplerTest, ZeroItems) {
71 CompactorStack compactor_stack(1000, 100000, &random_);
72 KllSampler sampler(&compactor_stack);
73 EXPECT_THAT(sampler.sampled_item_and_weight(), Eq(std::nullopt));
74 }
75
TEST_F(KllQuantileSamplerTest,OneItem)76 TEST_F(KllQuantileSamplerTest, OneItem) {
77 CompactorStack compactor_stack(1000, 100000, &random_);
78 KllSampler sampler(&compactor_stack);
79 sampler.Add(4);
80 EXPECT_THAT(sampler.sampled_item_and_weight(), Optional(Pair(Eq(4), Eq(1))));
81 }
82
TEST_F(KllQuantileSamplerTest,TwoInts)83 TEST_F(KllQuantileSamplerTest, TwoInts) {
84 CompactorStack compactor_stack(1000, 100000, &random_);
85 KllSampler sampler(&compactor_stack);
86 EXPECT_EQ(sampler.capacity(), 2);
87 sampler.Add(1);
88 sampler.Add(2);
89
90 const std::vector<int64_t>& compactor =
91 compactor_stack.compactors()[sampler.num_replaced_levels()];
92
93 EXPECT_THAT(compactor, AnyOf(Contains(Eq(1)), Contains(Eq(2))));
94 EXPECT_EQ(compactor_stack.num_stored_items(), 1);
95 EXPECT_THAT(sampler.sampled_item_and_weight(), Eq(std::nullopt));
96 }
97
TEST_F(KllQuantileSamplerTest,TwoIntsCapacityFour)98 TEST_F(KllQuantileSamplerTest, TwoIntsCapacityFour) {
99 CompactorStack compactor_stack(1000, 100000, &random_);
100 KllSampler sampler(&compactor_stack);
101 sampler.DoubleCapacity();
102 EXPECT_EQ(sampler.capacity(), 4);
103 EXPECT_EQ(sampler.num_replaced_levels(), 2);
104
105 sampler.Add(1);
106 sampler.Add(2);
107
108 EXPECT_EQ(compactor_stack.num_stored_items(), 0);
109 EXPECT_THAT(sampler.sampled_item_and_weight(), Optional(Pair(AnyOf(Eq(1), Eq(2)), Eq(2))));
110 }
111
TEST_F(KllQuantileSamplerTest,FourInts)112 TEST_F(KllQuantileSamplerTest, FourInts) {
113 CompactorStack compactor_stack(1000, 100000, &random_);
114 KllSampler sampler(&compactor_stack);
115 EXPECT_EQ(sampler.capacity(), 2);
116 sampler.Add(1);
117 sampler.Add(2);
118 sampler.Add(3);
119 sampler.Add(4);
120
121 const std::vector<int64_t>& compactor =
122 compactor_stack.compactors()[sampler.num_replaced_levels()];
123 EXPECT_THAT(compactor, AnyOf(Contains(Eq(1)), Contains(Eq(2))));
124 EXPECT_THAT(compactor, AnyOf(Contains(Eq(3)), Contains(Eq(4))));
125
126 EXPECT_EQ(compactor_stack.num_stored_items(), 2);
127 EXPECT_THAT(sampler.sampled_item_and_weight(), Eq(std::nullopt));
128 }
129
TEST_F(KllQuantileSamplerTest,ThreeInts)130 TEST_F(KllQuantileSamplerTest, ThreeInts) {
131 CompactorStack compactor_stack(1000, 100000, &random_);
132 KllSampler sampler(&compactor_stack);
133 EXPECT_EQ(sampler.capacity(), 2);
134
135 sampler.Add(1);
136 sampler.Add(2);
137 sampler.Add(3);
138
139 const std::vector<int64_t>& compactor =
140 compactor_stack.compactors()[sampler.num_replaced_levels()];
141 EXPECT_THAT(compactor, AnyOf(Contains(Eq(1)), Contains(Eq(2))));
142 EXPECT_EQ(compactor_stack.num_stored_items(), 1);
143 EXPECT_THAT(sampler.sampled_item_and_weight(), Optional(Pair(Eq(3), Eq(1))));
144 }
145
TEST_F(KllQuantileSamplerTest,AddWithWeightZero)146 TEST_F(KllQuantileSamplerTest, AddWithWeightZero) {
147 CompactorStack compactor_stack(1000, 100000, &random_);
148 KllSampler sampler(&compactor_stack);
149 sampler.AddWithWeight(5, 0);
150 EXPECT_THAT(sampler.sampled_item_and_weight(), Eq(std::nullopt));
151 }
152
TEST_F(KllQuantileSamplerTest,AddWithWeightOne)153 TEST_F(KllQuantileSamplerTest, AddWithWeightOne) {
154 CompactorStack compactor_stack(1000, 100000, &random_);
155 KllSampler sampler(&compactor_stack);
156 sampler.AddWithWeight(5, 1);
157 EXPECT_THAT(sampler.sampled_item_and_weight(), Optional(Pair(Eq(5), Eq(1))));
158 }
159
TEST_F(KllQuantileSamplerTest,AddWithWeightTwo)160 TEST_F(KllQuantileSamplerTest, AddWithWeightTwo) {
161 CompactorStack compactor_stack(1000, 100000, &random_);
162 KllSampler sampler(&compactor_stack);
163 sampler.AddWithWeight(1, 2);
164
165 const std::vector<int64_t>& compactor =
166 compactor_stack.compactors()[sampler.num_replaced_levels()];
167 EXPECT_THAT(compactor, Contains(Eq(1)));
168 EXPECT_EQ(compactor_stack.num_stored_items(), 1);
169 EXPECT_THAT(sampler.sampled_item_and_weight(), Eq(std::nullopt));
170 }
171
TEST_F(KllQuantileSamplerTest,AddWithWeightThree)172 TEST_F(KllQuantileSamplerTest, AddWithWeightThree) {
173 CompactorStack compactor_stack(1000, 100000, &random_);
174 KllSampler sampler(&compactor_stack);
175 sampler.AddWithWeight(3, 3);
176
177 const std::vector<int64_t>& compactor =
178 compactor_stack.compactors()[sampler.num_replaced_levels()];
179 EXPECT_THAT(compactor, Contains(Eq(3)));
180 EXPECT_EQ(compactor_stack.num_stored_items(), 1);
181 EXPECT_THAT(sampler.sampled_item_and_weight(), Optional(Pair(Eq(3), Eq(1))));
182 }
183
TEST_F(KllQuantileSamplerTest,WeightThreeNonEmptySampler)184 TEST_F(KllQuantileSamplerTest, WeightThreeNonEmptySampler) {
185 CompactorStack compactor_stack(1000, 100000, &random_);
186 KllSampler sampler(&compactor_stack);
187 sampler.Add(1);
188 sampler.DoubleCapacity();
189 sampler.DoubleCapacity();
190 sampler.AddWithWeight(2, 3);
191
192 EXPECT_THAT(sampler.sampled_item_and_weight(), Optional(Pair(AnyOf(Eq(1), Eq(2)), Eq(4))));
193 }
194
TEST_F(KllQuantileSamplerTest,WeightFiveNonEmptySampler)195 TEST_F(KllQuantileSamplerTest, WeightFiveNonEmptySampler) {
196 CompactorStack compactor_stack(1000, 100000, &random_);
197 KllSampler sampler(&compactor_stack);
198 sampler.Add(1);
199 sampler.DoubleCapacity();
200 sampler.Add(2);
201 sampler.AddWithWeight(3, 5);
202
203 const std::vector<int64_t>& compactor =
204 compactor_stack.compactors()[sampler.num_replaced_levels()];
205 EXPECT_THAT(compactor, AnyOf(Contains(Eq(1)), Contains(Eq(2)), Contains(Eq(3))));
206 EXPECT_EQ(compactor_stack.num_stored_items(), 1);
207 EXPECT_THAT(sampler.sampled_item_and_weight(), Optional(Pair(Eq(3), Eq(3))));
208 }
209
TEST_F(KllQuantileSamplerTest,DoubleCapacityBetweenAdds)210 TEST_F(KllQuantileSamplerTest, DoubleCapacityBetweenAdds) {
211 CompactorStack compactor_stack(1000, 100000, &random_);
212 KllSampler sampler(&compactor_stack);
213 sampler.Add(1);
214 sampler.DoubleCapacity();
215 EXPECT_EQ(sampler.capacity(), 4);
216 EXPECT_EQ(sampler.num_replaced_levels(), 2);
217 sampler.Add(2);
218 sampler.Add(3);
219 EXPECT_THAT(sampler.sampled_item_and_weight(),
220 Optional(Pair(AnyOf(Eq((1)), Eq((2)), Eq((3))), Eq(3))));
221 sampler.DoubleCapacity();
222 EXPECT_EQ(sampler.capacity(), 8);
223 EXPECT_EQ(sampler.num_replaced_levels(), 3);
224 sampler.Add(4);
225 sampler.Add(5);
226 EXPECT_THAT(sampler.sampled_item_and_weight(),
227 Optional(Pair(AnyOf(Eq((1)), Eq((2)), Eq((3)), Eq((4)), Eq((5))), Eq(5))));
228 }
229
TEST_F(KllQuantileSamplerTest,ResetZeroItems)230 TEST_F(KllQuantileSamplerTest, ResetZeroItems) {
231 CompactorStack compactor_stack(1000, 100000, &random_);
232 KllSampler sampler(&compactor_stack);
233 sampler.Reset();
234 EXPECT_THAT(sampler.sampled_item_and_weight(), Eq(std::nullopt));
235 }
236
TEST_F(KllQuantileSamplerTest,ResetBetweenAddingOneItem)237 TEST_F(KllQuantileSamplerTest, ResetBetweenAddingOneItem) {
238 CompactorStack compactor_stack(1000, 100000, &random_);
239 KllSampler sampler(&compactor_stack);
240 sampler.Add(1);
241 sampler.Reset();
242 EXPECT_THAT(sampler.sampled_item_and_weight(), Eq(std::nullopt));
243 sampler.Add(2);
244 EXPECT_THAT(sampler.sampled_item_and_weight(), Optional(Pair(Eq(2), Eq(1))));
245 }
246
TEST_F(KllQuantileSamplerTest,ResetBetweenAddingTenItems)247 TEST_F(KllQuantileSamplerTest, ResetBetweenAddingTenItems) {
248 CompactorStack compactor_stack(1000, 100000, &random_);
249 KllSampler sampler(&compactor_stack);
250 sampler.DoubleCapacity();
251 EXPECT_EQ(sampler.capacity(), 4);
252 EXPECT_EQ(sampler.num_replaced_levels(), 2);
253
254 for (int i = 0; i < 10; i++) {
255 sampler.Add(i);
256 }
257 EXPECT_THAT(sampler.sampled_item_and_weight(), Optional(Pair(_, Eq(2))));
258
259 sampler.Reset();
260 EXPECT_THAT(sampler.sampled_item_and_weight(), Eq(std::nullopt));
261 EXPECT_EQ(sampler.capacity(), 2);
262 EXPECT_EQ(sampler.num_replaced_levels(), 1);
263 sampler.DoubleCapacity();
264 EXPECT_EQ(sampler.capacity(), 4);
265 EXPECT_EQ(sampler.num_replaced_levels(), 2);
266 for (int i = 0; i < 10; i++) {
267 sampler.Add(i);
268 }
269 EXPECT_EQ(compactor_stack.num_stored_items(), 4);
270 EXPECT_THAT(sampler.sampled_item_and_weight(), Optional(Pair(AnyOf(Eq(8), Eq(9)), Eq(2))));
271 }
272
273 class SamplerItemSeenTest : public ::testing::TestWithParam<int> {};
274
275 // Whenever making a larger change to the sampler remove the fixed seed for
276 // this test. You should see around 2% failure rate (among all num_item sizes)
277 // in this test when running it with a sufficiently large --num_test_runs.
278 //
279 // The test checks whether we generate all items from a set of [0, n)
280 // items in 2 * n * log(n) runs. The expected value for number of repetitions
281 // required is n * H_n (see coupon collector problem
282 // https://en.wikipedia.org/wiki/Coupon_collector%27s_problem).
283 //
284 // For a given n the probability that we see all items is:
285 // Stirling_2nd(runs, n) * n! / n^runs (n^runs is the total number
286 // of possible outcomes, Stirling 2nd is the number of possibilities to
287 // partition run elements into n non-empty buckets regardless of order and n!
288 // is needed to introduce the ordering).)
TEST_P(SamplerItemSeenTest,AllItemsSeen)289 TEST_P(SamplerItemSeenTest, AllItemsSeen) {
290 const int num_items = GetParam();
291 auto random = MTRandomGenerator(10);
292 bool seen_items[1000] = {};
293 int num_repetitions = 2 * num_items * std::ceil(std::log(1 + num_items));
294 for (int i = 0; i < num_repetitions; i++) {
295 CompactorStack compactor_stack(1000, 100000, &random);
296 KllSampler sampler(&compactor_stack);
297 while (sampler.capacity() <= num_items) {
298 sampler.DoubleCapacity();
299 }
300 for (int j = 0; j < num_items; j++) {
301 sampler.Add(j);
302 }
303 seen_items[sampler.sampled_item_and_weight().value().first] = true;
304 EXPECT_EQ(sampler.sampled_item_and_weight().value().second, num_items);
305 }
306
307 for (int i = 0; i < num_items; i++) {
308 EXPECT_TRUE(seen_items[i]);
309 }
310 }
311
312 INSTANTIATE_TEST_SUITE_P(SamplerEveryItemSeenTestCases, SamplerItemSeenTest,
313 testing::Range(1, 1000, 10));
314 } // namespace
315
316 } // namespace internal
317 } // namespace aggregation
318 } // namespace dist_proc
319