1 /*
2  * Copyright 2021 Google LLC
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     https://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #include "sampler.h"
17 
18 #include <cmath>
19 #include <cstdint>
20 #include <type_traits>
21 #include <vector>
22 
23 #include "compactor_stack.h"
24 #include "gmock/gmock.h"
25 
26 namespace dist_proc {
27 namespace aggregation {
28 namespace internal {
29 
30 namespace {
31 
32 using ::testing::_;
33 using ::testing::AnyOf;
34 using ::testing::Contains;
35 using ::testing::Eq;
36 using ::testing::Optional;
37 using ::testing::Pair;
38 
39 class KllQuantileSamplerTest : public ::testing::Test {
40 protected:
KllQuantileSamplerTest()41     KllQuantileSamplerTest() : random_() {
42     }
43     MTRandomGenerator random_;
44 };
45 
TEST_F(KllQuantileSamplerTest,Add100Items)46 TEST_F(KllQuantileSamplerTest, Add100Items) {
47     CompactorStack compactor_stack(1000, 100000, &random_);
48     KllSampler sampler(&compactor_stack);
49     EXPECT_THAT(sampler.sampled_item_and_weight(), Eq(std::nullopt));
50 
51     sampler.Add(4);
52     EXPECT_THAT(sampler.sampled_item_and_weight(), Optional(Pair(Eq(4), Eq(1))));
53     sampler.Add(10);
54     EXPECT_THAT(sampler.sampled_item_and_weight(), Eq(std::nullopt));
55 
56     for (int i = 0; i < 100; i++) {
57         sampler.Reset();
58         sampler.DoubleCapacity();
59         sampler.Add(4);
60         EXPECT_THAT(sampler.sampled_item_and_weight(), Optional(Pair(Eq(4), Eq(1))));
61         sampler.Add(10);
62         EXPECT_THAT(sampler.sampled_item_and_weight(), Optional(Pair(AnyOf(Eq(4), Eq(10)), Eq(2))));
63         sampler.Add(14);
64         EXPECT_THAT(sampler.sampled_item_and_weight(), Optional(Pair(_, Eq(3))));
65         sampler.Add(24);
66         EXPECT_THAT(sampler.sampled_item_and_weight(), Eq(std::nullopt));
67     }
68 }
69 
TEST_F(KllQuantileSamplerTest,ZeroItems)70 TEST_F(KllQuantileSamplerTest, ZeroItems) {
71     CompactorStack compactor_stack(1000, 100000, &random_);
72     KllSampler sampler(&compactor_stack);
73     EXPECT_THAT(sampler.sampled_item_and_weight(), Eq(std::nullopt));
74 }
75 
TEST_F(KllQuantileSamplerTest,OneItem)76 TEST_F(KllQuantileSamplerTest, OneItem) {
77     CompactorStack compactor_stack(1000, 100000, &random_);
78     KllSampler sampler(&compactor_stack);
79     sampler.Add(4);
80     EXPECT_THAT(sampler.sampled_item_and_weight(), Optional(Pair(Eq(4), Eq(1))));
81 }
82 
TEST_F(KllQuantileSamplerTest,TwoInts)83 TEST_F(KllQuantileSamplerTest, TwoInts) {
84     CompactorStack compactor_stack(1000, 100000, &random_);
85     KllSampler sampler(&compactor_stack);
86     EXPECT_EQ(sampler.capacity(), 2);
87     sampler.Add(1);
88     sampler.Add(2);
89 
90     const std::vector<int64_t>& compactor =
91             compactor_stack.compactors()[sampler.num_replaced_levels()];
92 
93     EXPECT_THAT(compactor, AnyOf(Contains(Eq(1)), Contains(Eq(2))));
94     EXPECT_EQ(compactor_stack.num_stored_items(), 1);
95     EXPECT_THAT(sampler.sampled_item_and_weight(), Eq(std::nullopt));
96 }
97 
TEST_F(KllQuantileSamplerTest,TwoIntsCapacityFour)98 TEST_F(KllQuantileSamplerTest, TwoIntsCapacityFour) {
99     CompactorStack compactor_stack(1000, 100000, &random_);
100     KllSampler sampler(&compactor_stack);
101     sampler.DoubleCapacity();
102     EXPECT_EQ(sampler.capacity(), 4);
103     EXPECT_EQ(sampler.num_replaced_levels(), 2);
104 
105     sampler.Add(1);
106     sampler.Add(2);
107 
108     EXPECT_EQ(compactor_stack.num_stored_items(), 0);
109     EXPECT_THAT(sampler.sampled_item_and_weight(), Optional(Pair(AnyOf(Eq(1), Eq(2)), Eq(2))));
110 }
111 
TEST_F(KllQuantileSamplerTest,FourInts)112 TEST_F(KllQuantileSamplerTest, FourInts) {
113     CompactorStack compactor_stack(1000, 100000, &random_);
114     KllSampler sampler(&compactor_stack);
115     EXPECT_EQ(sampler.capacity(), 2);
116     sampler.Add(1);
117     sampler.Add(2);
118     sampler.Add(3);
119     sampler.Add(4);
120 
121     const std::vector<int64_t>& compactor =
122             compactor_stack.compactors()[sampler.num_replaced_levels()];
123     EXPECT_THAT(compactor, AnyOf(Contains(Eq(1)), Contains(Eq(2))));
124     EXPECT_THAT(compactor, AnyOf(Contains(Eq(3)), Contains(Eq(4))));
125 
126     EXPECT_EQ(compactor_stack.num_stored_items(), 2);
127     EXPECT_THAT(sampler.sampled_item_and_weight(), Eq(std::nullopt));
128 }
129 
TEST_F(KllQuantileSamplerTest,ThreeInts)130 TEST_F(KllQuantileSamplerTest, ThreeInts) {
131     CompactorStack compactor_stack(1000, 100000, &random_);
132     KllSampler sampler(&compactor_stack);
133     EXPECT_EQ(sampler.capacity(), 2);
134 
135     sampler.Add(1);
136     sampler.Add(2);
137     sampler.Add(3);
138 
139     const std::vector<int64_t>& compactor =
140             compactor_stack.compactors()[sampler.num_replaced_levels()];
141     EXPECT_THAT(compactor, AnyOf(Contains(Eq(1)), Contains(Eq(2))));
142     EXPECT_EQ(compactor_stack.num_stored_items(), 1);
143     EXPECT_THAT(sampler.sampled_item_and_weight(), Optional(Pair(Eq(3), Eq(1))));
144 }
145 
TEST_F(KllQuantileSamplerTest,AddWithWeightZero)146 TEST_F(KllQuantileSamplerTest, AddWithWeightZero) {
147     CompactorStack compactor_stack(1000, 100000, &random_);
148     KllSampler sampler(&compactor_stack);
149     sampler.AddWithWeight(5, 0);
150     EXPECT_THAT(sampler.sampled_item_and_weight(), Eq(std::nullopt));
151 }
152 
TEST_F(KllQuantileSamplerTest,AddWithWeightOne)153 TEST_F(KllQuantileSamplerTest, AddWithWeightOne) {
154     CompactorStack compactor_stack(1000, 100000, &random_);
155     KllSampler sampler(&compactor_stack);
156     sampler.AddWithWeight(5, 1);
157     EXPECT_THAT(sampler.sampled_item_and_weight(), Optional(Pair(Eq(5), Eq(1))));
158 }
159 
TEST_F(KllQuantileSamplerTest,AddWithWeightTwo)160 TEST_F(KllQuantileSamplerTest, AddWithWeightTwo) {
161     CompactorStack compactor_stack(1000, 100000, &random_);
162     KllSampler sampler(&compactor_stack);
163     sampler.AddWithWeight(1, 2);
164 
165     const std::vector<int64_t>& compactor =
166             compactor_stack.compactors()[sampler.num_replaced_levels()];
167     EXPECT_THAT(compactor, Contains(Eq(1)));
168     EXPECT_EQ(compactor_stack.num_stored_items(), 1);
169     EXPECT_THAT(sampler.sampled_item_and_weight(), Eq(std::nullopt));
170 }
171 
TEST_F(KllQuantileSamplerTest,AddWithWeightThree)172 TEST_F(KllQuantileSamplerTest, AddWithWeightThree) {
173     CompactorStack compactor_stack(1000, 100000, &random_);
174     KllSampler sampler(&compactor_stack);
175     sampler.AddWithWeight(3, 3);
176 
177     const std::vector<int64_t>& compactor =
178             compactor_stack.compactors()[sampler.num_replaced_levels()];
179     EXPECT_THAT(compactor, Contains(Eq(3)));
180     EXPECT_EQ(compactor_stack.num_stored_items(), 1);
181     EXPECT_THAT(sampler.sampled_item_and_weight(), Optional(Pair(Eq(3), Eq(1))));
182 }
183 
TEST_F(KllQuantileSamplerTest,WeightThreeNonEmptySampler)184 TEST_F(KllQuantileSamplerTest, WeightThreeNonEmptySampler) {
185     CompactorStack compactor_stack(1000, 100000, &random_);
186     KllSampler sampler(&compactor_stack);
187     sampler.Add(1);
188     sampler.DoubleCapacity();
189     sampler.DoubleCapacity();
190     sampler.AddWithWeight(2, 3);
191 
192     EXPECT_THAT(sampler.sampled_item_and_weight(), Optional(Pair(AnyOf(Eq(1), Eq(2)), Eq(4))));
193 }
194 
TEST_F(KllQuantileSamplerTest,WeightFiveNonEmptySampler)195 TEST_F(KllQuantileSamplerTest, WeightFiveNonEmptySampler) {
196     CompactorStack compactor_stack(1000, 100000, &random_);
197     KllSampler sampler(&compactor_stack);
198     sampler.Add(1);
199     sampler.DoubleCapacity();
200     sampler.Add(2);
201     sampler.AddWithWeight(3, 5);
202 
203     const std::vector<int64_t>& compactor =
204             compactor_stack.compactors()[sampler.num_replaced_levels()];
205     EXPECT_THAT(compactor, AnyOf(Contains(Eq(1)), Contains(Eq(2)), Contains(Eq(3))));
206     EXPECT_EQ(compactor_stack.num_stored_items(), 1);
207     EXPECT_THAT(sampler.sampled_item_and_weight(), Optional(Pair(Eq(3), Eq(3))));
208 }
209 
TEST_F(KllQuantileSamplerTest,DoubleCapacityBetweenAdds)210 TEST_F(KllQuantileSamplerTest, DoubleCapacityBetweenAdds) {
211     CompactorStack compactor_stack(1000, 100000, &random_);
212     KllSampler sampler(&compactor_stack);
213     sampler.Add(1);
214     sampler.DoubleCapacity();
215     EXPECT_EQ(sampler.capacity(), 4);
216     EXPECT_EQ(sampler.num_replaced_levels(), 2);
217     sampler.Add(2);
218     sampler.Add(3);
219     EXPECT_THAT(sampler.sampled_item_and_weight(),
220                 Optional(Pair(AnyOf(Eq((1)), Eq((2)), Eq((3))), Eq(3))));
221     sampler.DoubleCapacity();
222     EXPECT_EQ(sampler.capacity(), 8);
223     EXPECT_EQ(sampler.num_replaced_levels(), 3);
224     sampler.Add(4);
225     sampler.Add(5);
226     EXPECT_THAT(sampler.sampled_item_and_weight(),
227                 Optional(Pair(AnyOf(Eq((1)), Eq((2)), Eq((3)), Eq((4)), Eq((5))), Eq(5))));
228 }
229 
TEST_F(KllQuantileSamplerTest,ResetZeroItems)230 TEST_F(KllQuantileSamplerTest, ResetZeroItems) {
231     CompactorStack compactor_stack(1000, 100000, &random_);
232     KllSampler sampler(&compactor_stack);
233     sampler.Reset();
234     EXPECT_THAT(sampler.sampled_item_and_weight(), Eq(std::nullopt));
235 }
236 
TEST_F(KllQuantileSamplerTest,ResetBetweenAddingOneItem)237 TEST_F(KllQuantileSamplerTest, ResetBetweenAddingOneItem) {
238     CompactorStack compactor_stack(1000, 100000, &random_);
239     KllSampler sampler(&compactor_stack);
240     sampler.Add(1);
241     sampler.Reset();
242     EXPECT_THAT(sampler.sampled_item_and_weight(), Eq(std::nullopt));
243     sampler.Add(2);
244     EXPECT_THAT(sampler.sampled_item_and_weight(), Optional(Pair(Eq(2), Eq(1))));
245 }
246 
TEST_F(KllQuantileSamplerTest,ResetBetweenAddingTenItems)247 TEST_F(KllQuantileSamplerTest, ResetBetweenAddingTenItems) {
248     CompactorStack compactor_stack(1000, 100000, &random_);
249     KllSampler sampler(&compactor_stack);
250     sampler.DoubleCapacity();
251     EXPECT_EQ(sampler.capacity(), 4);
252     EXPECT_EQ(sampler.num_replaced_levels(), 2);
253 
254     for (int i = 0; i < 10; i++) {
255         sampler.Add(i);
256     }
257     EXPECT_THAT(sampler.sampled_item_and_weight(), Optional(Pair(_, Eq(2))));
258 
259     sampler.Reset();
260     EXPECT_THAT(sampler.sampled_item_and_weight(), Eq(std::nullopt));
261     EXPECT_EQ(sampler.capacity(), 2);
262     EXPECT_EQ(sampler.num_replaced_levels(), 1);
263     sampler.DoubleCapacity();
264     EXPECT_EQ(sampler.capacity(), 4);
265     EXPECT_EQ(sampler.num_replaced_levels(), 2);
266     for (int i = 0; i < 10; i++) {
267         sampler.Add(i);
268     }
269     EXPECT_EQ(compactor_stack.num_stored_items(), 4);
270     EXPECT_THAT(sampler.sampled_item_and_weight(), Optional(Pair(AnyOf(Eq(8), Eq(9)), Eq(2))));
271 }
272 
273 class SamplerItemSeenTest : public ::testing::TestWithParam<int> {};
274 
275 // Whenever making a larger change to the sampler remove the fixed seed for
276 // this test. You should see around 2% failure rate (among all num_item sizes)
277 // in this test when running it with a sufficiently large --num_test_runs.
278 //
279 // The test checks whether we generate all items from a set of [0, n)
280 // items in 2 * n * log(n) runs. The expected value for number of repetitions
281 // required is n * H_n (see coupon collector problem
282 // https://en.wikipedia.org/wiki/Coupon_collector%27s_problem).
283 //
284 // For a given n the probability that we see all items is:
285 // Stirling_2nd(runs, n) * n! / n^runs (n^runs is the total number
286 // of possible outcomes, Stirling 2nd is the number of possibilities to
287 // partition run elements into n non-empty buckets regardless of order and n!
288 // is needed to introduce the ordering).)
TEST_P(SamplerItemSeenTest,AllItemsSeen)289 TEST_P(SamplerItemSeenTest, AllItemsSeen) {
290     const int num_items = GetParam();
291     auto random = MTRandomGenerator(10);
292     bool seen_items[1000] = {};
293     int num_repetitions = 2 * num_items * std::ceil(std::log(1 + num_items));
294     for (int i = 0; i < num_repetitions; i++) {
295         CompactorStack compactor_stack(1000, 100000, &random);
296         KllSampler sampler(&compactor_stack);
297         while (sampler.capacity() <= num_items) {
298             sampler.DoubleCapacity();
299         }
300         for (int j = 0; j < num_items; j++) {
301             sampler.Add(j);
302         }
303         seen_items[sampler.sampled_item_and_weight().value().first] = true;
304         EXPECT_EQ(sampler.sampled_item_and_weight().value().second, num_items);
305     }
306 
307     for (int i = 0; i < num_items; i++) {
308         EXPECT_TRUE(seen_items[i]);
309     }
310 }
311 
312 INSTANTIATE_TEST_SUITE_P(SamplerEveryItemSeenTestCases, SamplerItemSeenTest,
313                          testing::Range(1, 1000, 10));
314 }  // namespace
315 
316 }  // namespace internal
317 }  // namespace aggregation
318 }  // namespace dist_proc
319