1 /* 2 * Copyright 2021 Google LLC 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * https://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 #include "kll.h" 18 19 #include <cstdint> 20 #include <memory> 21 22 #include "aggregator.pb.h" 23 #include "compactor_stack.h" 24 #include "encoding/encoder.h" 25 #include "kll-quantiles.pb.h" 26 27 namespace dist_proc { 28 namespace aggregation { 29 30 using zetasketch::android::AggregatorStateProto; 31 32 std::unique_ptr<KllQuantile> KllQuantile::Create(std::string* error) { 33 return Create(KllQuantileOptions(), error); 34 } 35 36 std::unique_ptr<KllQuantile> KllQuantile::Create(const KllQuantileOptions& options, 37 std::string* error) { 38 if (options.k() < 0) { 39 if (error != nullptr) { 40 *error = "k has to be >= 0"; 41 } 42 return nullptr; 43 } 44 return std::unique_ptr<KllQuantile>( 45 new KllQuantile(options.inv_eps(), options.inv_delta(), options.k(), options.random())); 46 } 47 48 void KllQuantile::Add(const int64_t value) { 49 compactor_stack_.Add(value); 50 UpdateMin(value); 51 UpdateMax(value); 52 num_values_++; 53 } 54 55 void KllQuantile::AddWeighted(int64_t value, int weight) { 56 if (weight > 0) { 57 compactor_stack_.AddWithWeight(value, weight); 58 UpdateMin(value); 59 UpdateMax(value); 60 num_values_ += weight; 61 } 62 } 63 64 AggregatorStateProto KllQuantile::SerializeToProto() { 65 AggregatorStateProto aggregator_state; 66 67 aggregator_state.set_type(zetasketch::android::KLL_QUANTILES); 68 aggregator_state.set_num_values(num_values_); 69 aggregator_state.set_value_type(zetasketch::android::DefaultOpsType::INT64); 70 71 zetasketch::android::KllQuantilesStateProto* quantile_state = 72 aggregator_state.MutableExtension(zetasketch::android::kll_quantiles_state); 73 74 quantile_state->set_k(compactor_stack_.k()); 75 quantile_state->set_inv_eps(inv_eps_); 76 77 if (num_values_ == 0) { 78 return aggregator_state; 79 } 80 81 // Encode min/max. 82 encoding::Encoder::AppendToString(min_, quantile_state->mutable_min()); 83 encoding::Encoder::AppendToString(max_, quantile_state->mutable_max()); 84 85 // Sort compactors before encoding them, to only do sorting work once (vs. 86 // every time a sketch is read and extracted or merged), and to reduce sketch 87 // cardinality, which saves space in e.g. column store dictionaries. 88 compactor_stack_.SortCompactorContents(); 89 90 // Encode compactors. 91 const std::vector<std::vector<int64_t>>& compactors = compactor_stack_.compactors(); 92 quantile_state->mutable_compactors()->Reserve(compactors.size()); 93 94 for (const auto& compactor : compactors) { 95 encoding::Encoder::SerializeToPackedStringAll( 96 compactor.begin(), compactor.end(), 97 quantile_state 98 ->add_compactors() // Adds one compactor to the compactors field. 99 ->mutable_packed_values()); 100 } 101 102 // Encode sampler. 103 if (compactor_stack_.IsSamplerOn()) { 104 const auto& sampled_item_and_weight = compactor_stack_.sampled_item_and_weight(); 105 if (sampled_item_and_weight.has_value()) { 106 encoding::Encoder::AppendToString( 107 sampled_item_and_weight->first, 108 quantile_state->mutable_sampler()->mutable_sampled_item()); 109 quantile_state->mutable_sampler()->set_sampled_weight(sampled_item_and_weight->second); 110 } 111 quantile_state->mutable_sampler()->set_log_capacity(compactor_stack_.lowest_active_level()); 112 } 113 114 return aggregator_state; 115 } 116 117 void KllQuantile::UpdateMin(int64_t value) { 118 if (num_values_ == 0 || min_ > value) { 119 min_ = value; 120 } 121 } 122 123 void KllQuantile::UpdateMax(int64_t value) { 124 if (num_values_ == 0 || max_ < value) { 125 max_ = value; 126 } 127 } 128 129 void KllQuantile::Reset() { 130 num_values_ = 0; 131 compactor_stack_.Reset(); 132 } 133 134 } // namespace aggregation 135 } // namespace dist_proc 136