1 /*
2  * Copyright 2021 Google LLC
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     https://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "kll.h"
18 
19 #include <cstdint>
20 #include <memory>
21 
22 #include "aggregator.pb.h"
23 #include "compactor_stack.h"
24 #include "encoding/encoder.h"
25 #include "kll-quantiles.pb.h"
26 
27 namespace dist_proc {
28 namespace aggregation {
29 
30 using zetasketch::android::AggregatorStateProto;
31 
32 std::unique_ptr<KllQuantile> KllQuantile::Create(std::string* error) {
33     return Create(KllQuantileOptions(), error);
34 }
35 
36 std::unique_ptr<KllQuantile> KllQuantile::Create(const KllQuantileOptions& options,
37                                                  std::string* error) {
38     if (options.k() < 0) {
39         if (error != nullptr) {
40             *error = "k has to be >= 0";
41         }
42         return nullptr;
43     }
44     return std::unique_ptr<KllQuantile>(
45             new KllQuantile(options.inv_eps(), options.inv_delta(), options.k(), options.random()));
46 }
47 
48 void KllQuantile::Add(const int64_t value) {
49     compactor_stack_.Add(value);
50     UpdateMin(value);
51     UpdateMax(value);
52     num_values_++;
53 }
54 
55 void KllQuantile::AddWeighted(int64_t value, int weight) {
56     if (weight > 0) {
57         compactor_stack_.AddWithWeight(value, weight);
58         UpdateMin(value);
59         UpdateMax(value);
60         num_values_ += weight;
61     }
62 }
63 
64 AggregatorStateProto KllQuantile::SerializeToProto() {
65     AggregatorStateProto aggregator_state;
66 
67     aggregator_state.set_type(zetasketch::android::KLL_QUANTILES);
68     aggregator_state.set_num_values(num_values_);
69     aggregator_state.set_value_type(zetasketch::android::DefaultOpsType::INT64);
70 
71     zetasketch::android::KllQuantilesStateProto* quantile_state =
72             aggregator_state.MutableExtension(zetasketch::android::kll_quantiles_state);
73 
74     quantile_state->set_k(compactor_stack_.k());
75     quantile_state->set_inv_eps(inv_eps_);
76 
77     if (num_values_ == 0) {
78         return aggregator_state;
79     }
80 
81     // Encode min/max.
82     encoding::Encoder::AppendToString(min_, quantile_state->mutable_min());
83     encoding::Encoder::AppendToString(max_, quantile_state->mutable_max());
84 
85     // Sort compactors before encoding them, to only do sorting work once (vs.
86     // every time a sketch is read and extracted or merged), and to reduce sketch
87     // cardinality, which saves space in e.g. column store dictionaries.
88     compactor_stack_.SortCompactorContents();
89 
90     // Encode compactors.
91     const std::vector<std::vector<int64_t>>& compactors = compactor_stack_.compactors();
92     quantile_state->mutable_compactors()->Reserve(compactors.size());
93 
94     for (const auto& compactor : compactors) {
95         encoding::Encoder::SerializeToPackedStringAll(
96                 compactor.begin(), compactor.end(),
97                 quantile_state
98                         ->add_compactors()  // Adds one compactor to the compactors field.
99                         ->mutable_packed_values());
100     }
101 
102     // Encode sampler.
103     if (compactor_stack_.IsSamplerOn()) {
104         const auto& sampled_item_and_weight = compactor_stack_.sampled_item_and_weight();
105         if (sampled_item_and_weight.has_value()) {
106             encoding::Encoder::AppendToString(
107                     sampled_item_and_weight->first,
108                     quantile_state->mutable_sampler()->mutable_sampled_item());
109             quantile_state->mutable_sampler()->set_sampled_weight(sampled_item_and_weight->second);
110         }
111         quantile_state->mutable_sampler()->set_log_capacity(compactor_stack_.lowest_active_level());
112     }
113 
114     return aggregator_state;
115 }
116 
117 void KllQuantile::UpdateMin(int64_t value) {
118     if (num_values_ == 0 || min_ > value) {
119         min_ = value;
120     }
121 }
122 
123 void KllQuantile::UpdateMax(int64_t value) {
124     if (num_values_ == 0 || max_ < value) {
125         max_ = value;
126     }
127 }
128 
129 void KllQuantile::Reset() {
130     num_values_ = 0;
131     compactor_stack_.Reset();
132 }
133 
134 }  // namespace aggregation
135 }  // namespace dist_proc
136