1 /* 2 * Copyright 2021 Google LLC 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * https://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 #include "compactor_stack.h" 17 18 #include <vector> 19 20 #include "random_generator.h" 21 #include "sampler.h" 22 23 namespace dist_proc { 24 namespace aggregation { 25 namespace internal { 26 27 CompactorStack::CompactorStack(int64_t inv_eps, int64_t inv_delta, RandomGenerator* random) 28 : CompactorStack(inv_eps, inv_delta, 0, random) { 29 } 30 31 CompactorStack::CompactorStack(int64_t inv_eps, int64_t inv_delta, int k, RandomGenerator* random) 32 : random_(random) { 33 if (k != 0) { 34 k_ = k; 35 } else { 36 // k = 1/eps * sqrt(log_2(1/delta)) - taken from proof of Thm 1. 37 double raw_k = inv_eps * std::sqrt(std::log2(inv_delta)); 38 k_ = std::pow(2, std::lround(std::log2(raw_k))); 39 } 40 Reset(); 41 } 42 43 CompactorStack::~CompactorStack() { 44 ClearCompactors(); 45 } 46 47 // Initialize or reset the compactor stack and all counters and thresholds. 48 void CompactorStack::Reset() { 49 overall_capacity_ = 0; 50 ClearCompactors(); 51 sampler_ = nullptr; 52 AddLevel(); 53 } 54 55 void CompactorStack::Add(const int64_t value) { 56 if (sampler_ == nullptr) { 57 compactors_[0].push_back(value); 58 num_items_in_compactors_++; 59 CompactStack(); 60 } else { 61 sampler_->Add(value); 62 } 63 } 64 65 // Adds an item to the compactor stack with weight >= 1. 66 // Does nothing if weight <= 0. 67 void CompactorStack::AddWithWeight(int64_t value, int weight) { 68 if (weight > 0) { 69 int remaining_weight = weight; 70 size_t level_to_add = 0; 71 if (sampler_ != nullptr) { 72 sampler_->AddWithWeight(value, remaining_weight % sampler_->capacity()); 73 remaining_weight /= sampler_->capacity(); 74 level_to_add = sampler_->num_replaced_levels(); 75 } 76 while (remaining_weight != 0) { 77 if (level_to_add >= compactors_.size()) { 78 AddLevel(); 79 } 80 if ((remaining_weight & 1) != 0) { 81 compactors_[level_to_add].push_back(value); 82 num_items_in_compactors_++; 83 } 84 remaining_weight >>= 1; 85 level_to_add++; 86 } 87 CompactStack(); 88 } 89 } 90 91 void CompactorStack::SortCompactorContents() { 92 for (std::vector<int64_t>& compactor : compactors_) { 93 std::sort(compactor.begin(), compactor.end()); 94 } 95 } 96 97 void CompactorStack::ClearCompactors() { 98 compactors_.clear(); 99 num_items_in_compactors_ = 0; 100 } 101 102 void CompactorStack::AddLevel() { 103 compactors_.resize(compactors_.size() + 1); 104 105 int cap_at_lowest_active_level = TargetCapacityAtLevel(lowest_active_level()); 106 // All levels i get capacity that previously level i-1 had, except the 107 // (previous) lowest active level, which gets a new smaller capacity. 108 // Overall capacity changes by that amount. 109 overall_capacity_ += cap_at_lowest_active_level; 110 111 if (cap_at_lowest_active_level == 0) { 112 DoubleSamplerCapacity(); 113 } 114 } 115 116 void CompactorStack::CompactStack() { 117 while (num_items_in_compactors_ >= overall_capacity_) { 118 for (size_t i = 0; i < compactors_.size(); i++) { 119 if (!compactors_[i].empty() && 120 static_cast<int>(compactors_[i].size()) >= TargetCapacityAtLevel(i)) { 121 CompactLevel(i); 122 if (num_items_in_compactors_ < overall_capacity_) { 123 break; 124 } 125 } 126 } 127 } 128 } 129 130 void CompactorStack::CompactLevel(int level) { 131 if (level == static_cast<int>(compactors_.size()) - 1) { 132 AddLevel(); 133 } 134 Halve(&compactors_[level], &compactors_[level + 1]); 135 std::vector<int64_t>().swap(compactors_[level]); 136 } 137 138 // To compact the items in a compactor to roughly half the size, 139 // sorts the items and adds every even or odd item (determined randomly) 140 // to the up_compactor. 141 void CompactorStack::Halve(std::vector<int64_t>* down_compactor, 142 std::vector<int64_t>* up_compactor) { 143 std::sort(down_compactor->begin(), down_compactor->end()); 144 double half_of_items = down_compactor->size() / static_cast<double>(2); 145 bool keep_even_items = (random_->UnbiasedUniform(2) == 0); 146 num_items_in_compactors_ -= static_cast<int>(keep_even_items ? std::floor(half_of_items) 147 : std::ceil(half_of_items)); 148 149 bool even = true; 150 151 for (size_t i = 0; i < down_compactor->size(); i++) { 152 if (even == keep_even_items) { 153 up_compactor->push_back((*down_compactor)[i]); 154 } 155 even = !even; 156 } 157 down_compactor->clear(); 158 } 159 160 int CompactorStack::TargetCapacityAtLevel(int h) const { 161 int num_stack_levels = compactors_.size(); 162 163 int raw_capacity = static_cast<int>(std::ceil(std::pow(c_, num_stack_levels - h - 1) * k_)); 164 165 // If the capacity is two or less, the level will be replaced by the 166 // sampler. 167 return raw_capacity > 2 ? raw_capacity : 0; 168 } 169 170 void CompactorStack::DoubleSamplerCapacity() { 171 int prev_lowest_active_level = lowest_active_level(); 172 if (sampler_ != nullptr) { 173 sampler_->DoubleCapacity(); 174 } else { 175 sampler_ = std::make_unique<KllSampler>(this); 176 } 177 178 CompactLevel(prev_lowest_active_level); 179 } 180 181 int CompactorStack::num_stored_items() const { 182 if (sampler_ == nullptr) { 183 return num_items_in_compactors_; 184 } else { 185 return num_items_in_compactors_ + 186 ((sampler_->sampled_item_and_weight().has_value()) ? 1 : 0); 187 } 188 } 189 190 std::optional<std::pair<const int64_t, int64_t>> CompactorStack::sampled_item_and_weight() const { 191 if (sampler_ != nullptr) { 192 return sampler_->sampled_item_and_weight(); 193 } else { 194 return std::nullopt; 195 } 196 } 197 198 int64_t CompactorStack::sampler_capacity() const { 199 return sampler_ ? sampler_->capacity() : 1; // capacity = 1 to denote the empty sampler. 200 } 201 202 int CompactorStack::lowest_active_level() const { 203 return sampler_ ? sampler_->num_replaced_levels() : 0; 204 } 205 206 } // namespace internal 207 } // namespace aggregation 208 } // namespace dist_proc 209