1 /*
2  * Copyright 2021 Google LLC
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     https://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #include "compactor_stack.h"
17 
18 #define LOG_TAG "libkll"
19 
20 #include <log/log.h>
21 
22 #include <vector>
23 
24 #include "random_generator.h"
25 #include "sampler.h"
26 
27 namespace dist_proc {
28 namespace aggregation {
29 namespace internal {
30 
CompactorStack(int64_t inv_eps,int64_t inv_delta,RandomGenerator * random)31 CompactorStack::CompactorStack(int64_t inv_eps, int64_t inv_delta, RandomGenerator* random)
32     : CompactorStack(inv_eps, inv_delta, 0, random) {
33 }
34 
CompactorStack(int64_t inv_eps,int64_t inv_delta,int k,RandomGenerator * random)35 CompactorStack::CompactorStack(int64_t inv_eps, int64_t inv_delta, int k, RandomGenerator* random)
36     : random_(random) {
37     if (k != 0) {
38         k_ = k;
39     } else {
40         // k = 1/eps * sqrt(log_2(1/delta)) - taken from proof of Thm 1.
41         double raw_k = inv_eps * std::sqrt(std::log2(inv_delta));
42         k_ = std::pow(2, std::lround(std::log2(raw_k)));
43     }
44     Reset();
45 }
46 
~CompactorStack()47 CompactorStack::~CompactorStack() {
48     ClearCompactors();
49 }
50 
51 // Initialize or reset the compactor stack and all counters and thresholds.
Reset()52 void CompactorStack::Reset() {
53     overall_capacity_ = 0;
54     ClearCompactors();
55     sampler_ = nullptr;
56     AddLevel();
57 }
58 
Add(const int64_t value)59 void CompactorStack::Add(const int64_t value) {
60     if (sampler_ == nullptr) {
61         compactors_[0].push_back(value);
62         num_items_in_compactors_++;
63         CompactStack();
64     } else {
65         sampler_->Add(value);
66     }
67 }
68 
69 // Adds an item to the compactor stack with weight >= 1.
70 // Does nothing if weight <= 0.
AddWithWeight(int64_t value,int weight)71 void CompactorStack::AddWithWeight(int64_t value, int weight) {
72     if (weight > 0) {
73         int remaining_weight = weight;
74         size_t level_to_add = 0;
75         if (sampler_ != nullptr) {
76             sampler_->AddWithWeight(value, remaining_weight % sampler_->capacity());
77             remaining_weight /= sampler_->capacity();
78             level_to_add = sampler_->num_replaced_levels();
79         }
80         while (remaining_weight != 0) {
81             if (level_to_add >= compactors_.size()) {
82                 AddLevel();
83             }
84             if ((remaining_weight & 1) != 0) {
85                 compactors_[level_to_add].push_back(value);
86                 num_items_in_compactors_++;
87             }
88             remaining_weight >>= 1;
89             level_to_add++;
90         }
91         CompactStack();
92     }
93 }
94 
SortCompactorContents()95 void CompactorStack::SortCompactorContents() {
96     for (std::vector<int64_t>& compactor : compactors_) {
97         std::sort(compactor.begin(), compactor.end());
98     }
99 }
100 
ClearCompactors()101 void CompactorStack::ClearCompactors() {
102     compactors_.clear();
103     num_items_in_compactors_ = 0;
104 }
105 
AddLevel()106 void CompactorStack::AddLevel() {
107     compactors_.resize(compactors_.size() + 1);
108 
109     int cap_at_lowest_active_level = TargetCapacityAtLevel(lowest_active_level());
110     // All levels i get capacity that previously level i-1 had, except the
111     // (previous) lowest active level, which gets a new smaller capacity.
112     // Overall capacity changes by that amount.
113     overall_capacity_ += cap_at_lowest_active_level;
114 
115     if (cap_at_lowest_active_level == 0) {
116         DoubleSamplerCapacity();
117     }
118 }
119 
CompactStack()120 void CompactorStack::CompactStack() {
121     int initial_num_items_in_compactors = num_items_in_compactors_;
122     while (num_items_in_compactors_ >= overall_capacity_) {
123         for (size_t i = 0; i < compactors_.size(); i++) {
124             if (!compactors_[i].empty() &&
125                 static_cast<int>(compactors_[i].size()) >= TargetCapacityAtLevel(i)) {
126                 CompactLevel(i);
127                 if (num_items_in_compactors_ < overall_capacity_) {
128                     break;
129                 }
130             }
131         }
132         // TODO(b/237694338): Remove the temporary infinite loop detection code
133         if (num_items_in_compactors_ >= initial_num_items_in_compactors) {
134             // The loop above didn't do anything in terms of reducing the number of items.
135             // To prevent an infinite loop, crash now.
136             ALOGI("num_items_in_compactors_=%d, compactors_.size()=%zu, overall_capacity_=%d",
137                   num_items_in_compactors_, compactors_.size(), overall_capacity_);
138             for (size_t i = 0; i < compactors_.size(); i++) {
139                 const std::vector<int64_t>& compactor = compactors_[i];
140                 ALOGI("compactors_[%zu].size()=%zu, TargetCapacityAtLevel(i)=%d", i,
141                       compactor.size(), TargetCapacityAtLevel(i));
142             }
143             LOG_ALWAYS_FATAL("Detected infinite loop in %s ", __func__);
144         }
145         initial_num_items_in_compactors = num_items_in_compactors_;
146     }
147 }
148 
CompactLevel(int level)149 void CompactorStack::CompactLevel(int level) {
150     if (level == static_cast<int>(compactors_.size()) - 1) {
151         AddLevel();
152     }
153     Halve(&compactors_[level], &compactors_[level + 1]);
154     std::vector<int64_t>().swap(compactors_[level]);
155 }
156 
157 // To compact the items in a compactor to roughly half the size,
158 // sorts the items and adds every even or odd item (determined randomly)
159 // to the up_compactor.
Halve(std::vector<int64_t> * down_compactor,std::vector<int64_t> * up_compactor)160 void CompactorStack::Halve(std::vector<int64_t>* down_compactor,
161                            std::vector<int64_t>* up_compactor) {
162     std::sort(down_compactor->begin(), down_compactor->end());
163     double half_of_items = down_compactor->size() / static_cast<double>(2);
164     bool keep_even_items = (random_->UnbiasedUniform(2) == 0);
165     num_items_in_compactors_ -= static_cast<int>(keep_even_items ? std::floor(half_of_items)
166                                                                  : std::ceil(half_of_items));
167 
168     bool even = true;
169 
170     for (size_t i = 0; i < down_compactor->size(); i++) {
171         if (even == keep_even_items) {
172             up_compactor->push_back((*down_compactor)[i]);
173         }
174         even = !even;
175     }
176     down_compactor->clear();
177 }
178 
TargetCapacityAtLevel(int h) const179 int CompactorStack::TargetCapacityAtLevel(int h) const {
180     int num_stack_levels = compactors_.size();
181 
182     int raw_capacity = static_cast<int>(std::ceil(std::pow(c_, num_stack_levels - h - 1) * k_));
183 
184     // If the capacity is two or less, the level will be replaced by the
185     // sampler.
186     return raw_capacity > 2 ? raw_capacity : 0;
187 }
188 
DoubleSamplerCapacity()189 void CompactorStack::DoubleSamplerCapacity() {
190     int prev_lowest_active_level = lowest_active_level();
191     if (sampler_ != nullptr) {
192         sampler_->DoubleCapacity();
193     } else {
194         sampler_ = std::make_unique<KllSampler>(this);
195     }
196 
197     CompactLevel(prev_lowest_active_level);
198 }
199 
num_stored_items() const200 int CompactorStack::num_stored_items() const {
201     if (sampler_ == nullptr) {
202         return num_items_in_compactors_;
203     } else {
204         return num_items_in_compactors_ +
205                ((sampler_->sampled_item_and_weight().has_value()) ? 1 : 0);
206     }
207 }
208 
sampled_item_and_weight() const209 std::optional<std::pair<const int64_t, int64_t>> CompactorStack::sampled_item_and_weight() const {
210     if (sampler_ != nullptr) {
211         return sampler_->sampled_item_and_weight();
212     } else {
213         return std::nullopt;
214     }
215 }
216 
sampler_capacity() const217 int64_t CompactorStack::sampler_capacity() const {
218     return sampler_ ? sampler_->capacity() : 1;  // capacity = 1 to denote the empty sampler.
219 }
220 
lowest_active_level() const221 int CompactorStack::lowest_active_level() const {
222     return sampler_ ? sampler_->num_replaced_levels() : 0;
223 }
224 
225 }  // namespace internal
226 }  // namespace aggregation
227 }  // namespace dist_proc
228