1 /*
2  * Copyright 2021 Google LLC
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     https://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #include "compactor_stack.h"
17 
18 #include <vector>
19 
20 #include "random_generator.h"
21 #include "sampler.h"
22 
23 namespace dist_proc {
24 namespace aggregation {
25 namespace internal {
26 
27 CompactorStack::CompactorStack(int64_t inv_eps, int64_t inv_delta, RandomGenerator* random)
28     : CompactorStack(inv_eps, inv_delta, 0, random) {
29 }
30 
31 CompactorStack::CompactorStack(int64_t inv_eps, int64_t inv_delta, int k, RandomGenerator* random)
32     : random_(random) {
33     if (k != 0) {
34         k_ = k;
35     } else {
36         // k = 1/eps * sqrt(log_2(1/delta)) - taken from proof of Thm 1.
37         double raw_k = inv_eps * std::sqrt(std::log2(inv_delta));
38         k_ = std::pow(2, std::lround(std::log2(raw_k)));
39     }
40     Reset();
41 }
42 
43 CompactorStack::~CompactorStack() {
44     ClearCompactors();
45 }
46 
47 // Initialize or reset the compactor stack and all counters and thresholds.
48 void CompactorStack::Reset() {
49     overall_capacity_ = 0;
50     ClearCompactors();
51     sampler_ = nullptr;
52     AddLevel();
53 }
54 
55 void CompactorStack::Add(const int64_t value) {
56     if (sampler_ == nullptr) {
57         compactors_[0].push_back(value);
58         num_items_in_compactors_++;
59         CompactStack();
60     } else {
61         sampler_->Add(value);
62     }
63 }
64 
65 // Adds an item to the compactor stack with weight >= 1.
66 // Does nothing if weight <= 0.
67 void CompactorStack::AddWithWeight(int64_t value, int weight) {
68     if (weight > 0) {
69         int remaining_weight = weight;
70         size_t level_to_add = 0;
71         if (sampler_ != nullptr) {
72             sampler_->AddWithWeight(value, remaining_weight % sampler_->capacity());
73             remaining_weight /= sampler_->capacity();
74             level_to_add = sampler_->num_replaced_levels();
75         }
76         while (remaining_weight != 0) {
77             if (level_to_add >= compactors_.size()) {
78                 AddLevel();
79             }
80             if ((remaining_weight & 1) != 0) {
81                 compactors_[level_to_add].push_back(value);
82                 num_items_in_compactors_++;
83             }
84             remaining_weight >>= 1;
85             level_to_add++;
86         }
87         CompactStack();
88     }
89 }
90 
91 void CompactorStack::SortCompactorContents() {
92     for (std::vector<int64_t>& compactor : compactors_) {
93         std::sort(compactor.begin(), compactor.end());
94     }
95 }
96 
97 void CompactorStack::ClearCompactors() {
98     compactors_.clear();
99     num_items_in_compactors_ = 0;
100 }
101 
102 void CompactorStack::AddLevel() {
103     compactors_.resize(compactors_.size() + 1);
104 
105     int cap_at_lowest_active_level = TargetCapacityAtLevel(lowest_active_level());
106     // All levels i get capacity that previously level i-1 had, except the
107     // (previous) lowest active level, which gets a new smaller capacity.
108     // Overall capacity changes by that amount.
109     overall_capacity_ += cap_at_lowest_active_level;
110 
111     if (cap_at_lowest_active_level == 0) {
112         DoubleSamplerCapacity();
113     }
114 }
115 
116 void CompactorStack::CompactStack() {
117     while (num_items_in_compactors_ >= overall_capacity_) {
118         for (size_t i = 0; i < compactors_.size(); i++) {
119             if (!compactors_[i].empty() &&
120                 static_cast<int>(compactors_[i].size()) >= TargetCapacityAtLevel(i)) {
121                 CompactLevel(i);
122                 if (num_items_in_compactors_ < overall_capacity_) {
123                     break;
124                 }
125             }
126         }
127     }
128 }
129 
130 void CompactorStack::CompactLevel(int level) {
131     if (level == static_cast<int>(compactors_.size()) - 1) {
132         AddLevel();
133     }
134     Halve(&compactors_[level], &compactors_[level + 1]);
135     std::vector<int64_t>().swap(compactors_[level]);
136 }
137 
138 // To compact the items in a compactor to roughly half the size,
139 // sorts the items and adds every even or odd item (determined randomly)
140 // to the up_compactor.
141 void CompactorStack::Halve(std::vector<int64_t>* down_compactor,
142                            std::vector<int64_t>* up_compactor) {
143     std::sort(down_compactor->begin(), down_compactor->end());
144     double half_of_items = down_compactor->size() / static_cast<double>(2);
145     bool keep_even_items = (random_->UnbiasedUniform(2) == 0);
146     num_items_in_compactors_ -= static_cast<int>(keep_even_items ? std::floor(half_of_items)
147                                                                  : std::ceil(half_of_items));
148 
149     bool even = true;
150 
151     for (size_t i = 0; i < down_compactor->size(); i++) {
152         if (even == keep_even_items) {
153             up_compactor->push_back((*down_compactor)[i]);
154         }
155         even = !even;
156     }
157     down_compactor->clear();
158 }
159 
160 int CompactorStack::TargetCapacityAtLevel(int h) const {
161     int num_stack_levels = compactors_.size();
162 
163     int raw_capacity = static_cast<int>(std::ceil(std::pow(c_, num_stack_levels - h - 1) * k_));
164 
165     // If the capacity is two or less, the level will be replaced by the
166     // sampler.
167     return raw_capacity > 2 ? raw_capacity : 0;
168 }
169 
170 void CompactorStack::DoubleSamplerCapacity() {
171     int prev_lowest_active_level = lowest_active_level();
172     if (sampler_ != nullptr) {
173         sampler_->DoubleCapacity();
174     } else {
175         sampler_ = std::make_unique<KllSampler>(this);
176     }
177 
178     CompactLevel(prev_lowest_active_level);
179 }
180 
181 int CompactorStack::num_stored_items() const {
182     if (sampler_ == nullptr) {
183         return num_items_in_compactors_;
184     } else {
185         return num_items_in_compactors_ +
186                ((sampler_->sampled_item_and_weight().has_value()) ? 1 : 0);
187     }
188 }
189 
190 std::optional<std::pair<const int64_t, int64_t>> CompactorStack::sampled_item_and_weight() const {
191     if (sampler_ != nullptr) {
192         return sampler_->sampled_item_and_weight();
193     } else {
194         return std::nullopt;
195     }
196 }
197 
198 int64_t CompactorStack::sampler_capacity() const {
199     return sampler_ ? sampler_->capacity() : 1;  // capacity = 1 to denote the empty sampler.
200 }
201 
202 int CompactorStack::lowest_active_level() const {
203     return sampler_ ? sampler_->num_replaced_levels() : 0;
204 }
205 
206 }  // namespace internal
207 }  // namespace aggregation
208 }  // namespace dist_proc
209