1 /*
2  * Copyright 2021 Google LLC
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     https://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #include "sampler.h"
17 
18 #include <cmath>
19 #include <cstdint>
20 #include <vector>
21 
22 #include "random_generator.h"
23 
24 namespace dist_proc {
25 namespace aggregation {
26 namespace internal {
27 
Reset()28 void KllSampler::Reset() {
29     sampled_item_ = 0;
30     item_weight_ = 0;
31     capacity_ = 2;
32     num_replaced_levels_ = 1;
33 }
34 
Add(const int64_t item)35 void KllSampler::Add(const int64_t item) {
36     if (compactor_stack_->random()->UnbiasedUniform(++item_weight_) == 0) {
37         sampled_item_ = item;
38     }
39 
40     if (item_weight_ >= capacity_) {
41         AddSampleToCompactorStackAndRestart();
42     }
43 }
44 
AddWithWeight(int64_t item,int weight)45 void KllSampler::AddWithWeight(int64_t item, int weight) {
46     if (weight > 0) {
47         if (item_weight_ + weight < capacity_) {
48             item_weight_ += weight;
49             if (compactor_stack_->random()->UnbiasedUniform(item_weight_) <
50                 static_cast<uint64_t>(weight)) {
51                 sampled_item_ = item;
52             }
53         } else {
54             int64_t added_weight = capacity_ - item_weight_;
55             if (compactor_stack_->random()->UnbiasedUniform(capacity_) <
56                 static_cast<uint64_t>(added_weight)) {
57                 sampled_item_ = item;
58             }
59             item_weight_ = capacity_;
60             AddSampleToCompactorStackAndRestart();
61             AddWithWeight(item, weight - added_weight);
62         }
63     }
64 }
65 
DoubleCapacity()66 void KllSampler::DoubleCapacity() {
67     capacity_ *= 2;
68     num_replaced_levels_++;
69 }
70 
AddSampleToCompactorStackAndRestart()71 void KllSampler::AddSampleToCompactorStackAndRestart() {
72     compactor_stack_->AddWithWeight(sampled_item_, item_weight_);
73     item_weight_ = 0;
74     sampled_item_ = 0;
75 }
76 
77 }  // namespace internal
78 }  // namespace aggregation
79 }  // namespace dist_proc
80