1 /* 2 * Copyright 2021 Google LLC 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * https://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 #pragma once 17 18 #include <assert.h> 19 20 #include <optional> 21 22 #include "compactor_stack.h" 23 24 namespace dist_proc { 25 namespace aggregation { 26 namespace internal { 27 28 class CompactorStack; 29 30 // Class that does reservoir sampling to uniformly-at-random select one out of 31 // capacity() items that are added to it. The selected item is added to the 32 // compactor stack and sampling continues with the next capacity_ items. 33 // 34 // Serves as an replacement of num_replaced_levels levels of size 2 of the 35 // compactor stack, while only using constant memory. 36 class KllSampler { 37 public: KllSampler(CompactorStack * compactor_stack)38 KllSampler(CompactorStack* compactor_stack) : compactor_stack_(compactor_stack) { 39 assert(compactor_stack != nullptr); 40 Reset(); 41 } 42 43 void Reset(); 44 45 // Adds an item to the sampler with weight one. 46 void Add(int64_t item); 47 48 // Adds an item to the sampler with weight >= 1. Does nothing if weight <= 0. 49 void AddWithWeight(int64_t item, int weight); 50 51 void DoubleCapacity(); 52 capacity()53 int64_t capacity() const { 54 return capacity_; 55 } 56 sampled_item_and_weight()57 std::optional<std::pair<int64_t, int>> sampled_item_and_weight() const { 58 if (item_weight_ == 0) { 59 return std::nullopt; 60 } 61 return std::make_pair(sampled_item_, item_weight_); 62 } 63 num_replaced_levels()64 int num_replaced_levels() const { 65 return num_replaced_levels_; 66 } 67 68 private: 69 void AddSampleToCompactorStackAndRestart(); 70 int64_t sampled_item_; 71 int64_t item_weight_; 72 int64_t capacity_; 73 int num_replaced_levels_; 74 CompactorStack* compactor_stack_; 75 }; 76 77 } // namespace internal 78 } // namespace aggregation 79 } // namespace dist_proc 80