1 /*
2  * Copyright (C) 2017 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #pragma once
18 
19 #include <gtest/gtest_prod.h>
20 
21 #include <optional>
22 
23 #include "FieldValue.h"
24 #include "HashableDimensionKey.h"
25 #include "MetricProducer.h"
26 #include "anomaly/AnomalyTracker.h"
27 #include "condition/ConditionTracker.h"
28 #include "external/PullDataReceiver.h"
29 #include "external/StatsPullerManager.h"
30 #include "matchers/EventMatcherWizard.h"
31 #include "src/statsd_config.pb.h"
32 #include "stats_log_util.h"
33 #include "stats_util.h"
34 
35 namespace android {
36 namespace os {
37 namespace statsd {
38 
/**
 * A finished bucket of aggregated values.
 *
 * ValueMetricProducer keeps a vector of these per MetricDimensionKey (see mPastBuckets) until the
 * report is dumped.
 *
 * All scalar fields default to 0 so that a default-constructed bucket never exposes indeterminate
 * values. The struct is still an aggregate and can be brace-initialized in declaration order.
 *
 * @tparam AggregatedValue type of one aggregated value.
 */
template <typename AggregatedValue>
struct PastBucket {
    // Start of the bucket, in nanoseconds.
    int64_t mBucketStartNs = 0;
    // End of the bucket, in nanoseconds.
    int64_t mBucketEndNs = 0;

    // NOTE(review): aggIndex, aggregates and sampleSizes look like parallel vectors (entry i of
    // each describes the same aggregate) -- confirm against the code that fills them.
    std::vector<int> aggIndex;
    std::vector<AggregatedValue> aggregates;
    std::vector<int> sampleSizes;

    /**
     * If the metric has no condition, then this field is just wasted.
     * When we tune statsd memory usage in the future, this is a candidate to optimize.
     */
    int64_t mConditionTrueNs = 0;

    /**
     * The correction that needs to be applied to mConditionTrueNs before the normalization
     * calculation is performed on the user (read: server) side. Applied only to ValueMetrics with
     * pulled atoms.
     */
    int64_t mConditionCorrectionNs = 0;
};
60 
61 // Aggregates values within buckets.
62 //
63 // There are different events that might complete a bucket
64 // - a condition change
65 // - an app upgrade
66 // - an alarm set to the end of the bucket
67 template <typename AggregatedValue, typename DimExtras>
68 class ValueMetricProducer : public MetricProducer, public virtual PullDataReceiver {
69 public:
70     struct PullOptions {
71         const int pullAtomId;
72         const sp<StatsPullerManager>& pullerManager;
73     };
74 
75     struct BucketOptions {
76         const int64_t timeBaseNs;
77         const int64_t startTimeNs;
78         const int64_t bucketSizeNs;
79         const int64_t minBucketSizeNs;
80         const optional<int64_t> conditionCorrectionThresholdNs;
81         const optional<bool> splitBucketForAppUpgrade;
82     };
83 
84     struct WhatOptions {
85         const bool containsAnyPositionInDimensionsInWhat;
86         const bool shouldUseNestedDimensions;
87         const int whatMatcherIndex;
88         const sp<EventMatcherWizard>& matcherWizard;
89         const FieldMatcher& dimensionsInWhat;
90         const vector<Matcher>& fieldMatchers;
91         const vector<ValueMetric::AggregationType> aggregationTypes;
92     };
93 
94     struct ConditionOptions {
95         const int conditionIndex;
96         const ConditionLinks& conditionLinks;
97         const vector<ConditionState>& initialConditionCache;
98         const sp<ConditionWizard>& conditionWizard;
99     };
100 
101     struct StateOptions {
102         const StateLinks& stateLinks;
103         const vector<int>& slicedStateAtoms;
104         const unordered_map<int, unordered_map<int, int64_t>>& stateGroupMap;
105     };
106 
107     struct ActivationOptions {
108         const std::unordered_map<int, std::shared_ptr<Activation>>& eventActivationMap;
109         const std::unordered_map<int, std::vector<std::shared_ptr<Activation>>>&
110                 eventDeactivationMap;
111     };
112 
113     struct GuardrailOptions {
114         const size_t dimensionSoftLimit;
115         const size_t dimensionHardLimit;
116     };
117 
118     virtual ~ValueMetricProducer();
119 
120     // Process data pulled on bucket boundary.
onDataPulled(const std::vector<std::shared_ptr<LogEvent>> & data,PullResult pullResult,int64_t originalPullTimeNs)121     virtual void onDataPulled(const std::vector<std::shared_ptr<LogEvent>>& data,
122                               PullResult pullResult, int64_t originalPullTimeNs) override {
123     }
124 
125     // Determine if metric needs to pull
isPullNeeded()126     virtual bool isPullNeeded() const override {
127         return false;
128     }
129 
130     // ValueMetric needs special logic if it's a pulled atom.
131     void onStatsdInitCompleted(int64_t eventTimeNs) override;
132 
133     void onStateChanged(int64_t eventTimeNs, int32_t atomId, const HashableDimensionKey& primaryKey,
134                         const FieldValue& oldState, const FieldValue& newState) override;
135 
136 protected:
137     ValueMetricProducer(int64_t metricId, const ConfigKey& key, uint64_t protoHash,
138                         const PullOptions& pullOptions, const BucketOptions& bucketOptions,
139                         const WhatOptions& whatOptions, const ConditionOptions& conditionOptions,
140                         const StateOptions& stateOptions,
141                         const ActivationOptions& activationOptions,
142                         const GuardrailOptions& guardrailOptions,
143                         const wp<ConfigMetadataProvider> configMetadataProvider);
144 
145     void onMatchedLogEventInternalLocked(
146             const size_t matcherIndex, const MetricDimensionKey& eventKey,
147             const ConditionKey& conditionKey, bool condition, const LogEvent& event,
148             const std::map<int, HashableDimensionKey>& statePrimaryKeys) override;
149 
150     // Determine whether or not a LogEvent can be skipped.
151     virtual inline bool canSkipLogEventLocked(
152             const MetricDimensionKey& eventKey, bool condition, int64_t eventTimeNs,
153             const std::map<int, HashableDimensionKey>& statePrimaryKeys) const = 0;
154 
155     void notifyAppUpgradeInternalLocked(const int64_t eventTimeNs) override;
156 
157     void onDumpReportLocked(const int64_t dumpTimeNs, const bool includeCurrentPartialBucket,
158                             const bool eraseData, const DumpLatency dumpLatency,
159                             std::set<string>* strSet,
160                             android::util::ProtoOutputStream* protoOutput) override;
161 
162     struct DumpProtoFields {
163         const int metricTypeFieldId;
164         const int bucketNumFieldId;
165         const int startBucketMsFieldId;
166         const int endBucketMsFieldId;
167         const int conditionTrueNsFieldId;
168         const optional<int> conditionCorrectionNsFieldId;
169     };
170 
171     virtual DumpProtoFields getDumpProtoFields() const = 0;
172 
173     void clearPastBucketsLocked(const int64_t dumpTimeNs) override;
174 
175     // ValueMetricProducer internal interface to handle active state change.
176     void onActiveStateChangedLocked(const int64_t eventTimeNs, const bool isActive) override;
177 
onActiveStateChangedInternalLocked(const int64_t eventTimeNs,const bool isActive)178     virtual void onActiveStateChangedInternalLocked(const int64_t eventTimeNs,
179                                                     const bool isActive) {
180     }
181 
182     // ValueMetricProducer internal interface to handle condition change.
183     void onConditionChangedLocked(const bool condition, int64_t eventTimeNs) override;
184 
185     // Only called when mIsActive, the event is NOT too late, and after pulling.
onConditionChangedInternalLocked(const ConditionState oldCondition,const ConditionState newCondition,const int64_t eventTimeNs)186     virtual void onConditionChangedInternalLocked(const ConditionState oldCondition,
187                                                   const ConditionState newCondition,
188                                                   const int64_t eventTimeNs) {
189     }
190 
191     // Internal interface to handle sliced condition change.
192     void onSlicedConditionMayChangeLocked(bool overallCondition, int64_t eventTime) override;
193 
194     void dumpStatesLocked(int out, bool verbose) const override;
195 
196     virtual std::string aggregatedValueToString(const AggregatedValue& aggregate) const = 0;
197 
198     // For pulled metrics, this method should only be called if a pull has been done. Else we will
199     // not have complete data for the bucket.
200     void flushIfNeededLocked(int64_t eventTime) override;
201 
202     // For pulled metrics, this method should only be called if a pulled has been done. Else we will
203     // not have complete data for the bucket.
204     void flushCurrentBucketLocked(int64_t eventTimeNs, int64_t nextBucketStartTimeNs) override;
205 
206     void dropDataLocked(const int64_t dropTimeNs) override;
207 
208     // Calculate how many buckets are present between the current bucket and eventTimeNs.
209     int64_t calcBucketsForwardCount(const int64_t eventTimeNs) const;
210 
211     // Mark the data as invalid.
212     virtual void invalidateCurrentBucket(const int64_t dropTimeNs, const BucketDropReason reason);
213 
214     // Skips the current bucket without notifying StatsdStats of the skipped bucket.
215     // This should only be called from #flushCurrentBucketLocked. Otherwise, a future event that
216     // causes the bucket to be invalidated will not notify StatsdStats.
217     void skipCurrentBucket(const int64_t dropTimeNs, const BucketDropReason reason);
218 
219     optional<InvalidConfigReason> onConfigUpdatedLocked(
220             const StatsdConfig& config, int configIndex, int metricIndex,
221             const std::vector<sp<AtomMatchingTracker>>& allAtomMatchingTrackers,
222             const std::unordered_map<int64_t, int>& oldAtomMatchingTrackerMap,
223             const std::unordered_map<int64_t, int>& newAtomMatchingTrackerMap,
224             const sp<EventMatcherWizard>& matcherWizard,
225             const std::vector<sp<ConditionTracker>>& allConditionTrackers,
226             const std::unordered_map<int64_t, int>& conditionTrackerMap,
227             const sp<ConditionWizard>& wizard,
228             const std::unordered_map<int64_t, int>& metricToActivationMap,
229             std::unordered_map<int, std::vector<int>>& trackerToMetricMap,
230             std::unordered_map<int, std::vector<int>>& conditionToMetricMap,
231             std::unordered_map<int, std::vector<int>>& activationAtomTrackerToMetricMap,
232             std::unordered_map<int, std::vector<int>>& deactivationAtomTrackerToMetricMap,
233             std::vector<int>& metricsWithActivation) override;
234 
235     size_t computeValueBucketSizeLocked(const bool isFullBucket, const MetricDimensionKey& dimKey,
236                                         const bool isFirstBucket,
237                                         const PastBucket<AggregatedValue>& bucket) const;
238 
239     virtual size_t getAggregatedValueSize(const AggregatedValue& value) const = 0;
240 
241     virtual optional<int64_t> getConditionIdForMetric(const StatsdConfig& config,
242                                                       const int configIndex) const = 0;
243 
244     virtual int64_t getWhatAtomMatcherIdForMetric(const StatsdConfig& config,
245                                                   const int configIndex) const = 0;
246 
247     virtual ConditionLinks getConditionLinksForMetric(const StatsdConfig& config,
248                                                       const int configIndex) const = 0;
249 
250     int mWhatMatcherIndex;
251 
252     sp<EventMatcherWizard> mEventMatcherWizard;
253 
254     const sp<StatsPullerManager> mPullerManager;
255 
256     // Value fields for matching.
257     const std::vector<Matcher> mFieldMatchers;
258 
259     // Value fields for matching.
260     std::set<HashableDimensionKey> mMatchedMetricDimensionKeys;
261 
262     // Holds the atom id, primary key pair from a state change.
263     // Only used for pulled metrics.
264     // TODO(b/185796114): can be passed as function arguments instead.
265     pair<int32_t, HashableDimensionKey> mStateChangePrimaryKey;
266 
267     // Atom Id for pulled data. -1 if this is not pulled.
268     const int mPullAtomId;
269 
270     // Tracks the value information of one value field.
271     struct Interval {
272         // Index in multi value aggregation.
273         int aggIndex;
274 
275         // Current aggregation, depending on the aggregation type.
276         AggregatedValue aggregate;
277 
278         // Number of samples collected.
279         int sampleSize = 0;
280 
hasValueInterval281         inline bool hasValue() const {
282             return sampleSize > 0;
283         }
284     };
285 
286     // Internal state of an ongoing aggregation bucket.
287     struct CurrentBucket {
288         // If the `MetricDimensionKey` state key is the current state key, then
289         // the condition timer will be updated later (e.g. condition/state/active
290         // state change) with the correct condition and time.
CurrentBucketCurrentBucket291         CurrentBucket() : intervals(), conditionTimer(ConditionTimer(false, 0)) {
292         }
293         // Value information for each value field of the metric.
294         std::vector<Interval> intervals;
295         // Tracks how long the condition is true.
296         ConditionTimer conditionTimer;
297     };
298 
299     // Tracks the internal state in the ongoing aggregation bucket for each DimensionsInWhat
300     // key and StateValuesKey pair.
301     std::unordered_map<MetricDimensionKey, CurrentBucket> mCurrentSlicedBucket;
302 
303     // State key and any extra information for a specific DimensionsInWhat key.
304     struct DimensionsInWhatInfo {
DimensionsInWhatInfoDimensionsInWhatInfo305         DimensionsInWhatInfo(const HashableDimensionKey& stateKey)
306             : dimExtras(), currentState(stateKey), hasCurrentState(false) {
307         }
308 
309         DimExtras dimExtras;
310 
311         // Whether new data is seen in the bucket.
312         // TODO, this could be per base in the dim extras.
313         bool seenNewData = false;
314 
315         // Last seen state value(s).
316         HashableDimensionKey currentState;
317         // Whether this dimensions in what key has a current state key.
318         bool hasCurrentState;
319     };
320 
321     // Tracks current state key and other information for each DimensionsInWhat key.
322     std::unordered_map<HashableDimensionKey, DimensionsInWhatInfo> mDimInfos;
323 
324     // Save the past buckets and we can clear when the StatsLogReport is dumped.
325     std::unordered_map<MetricDimensionKey, std::vector<PastBucket<AggregatedValue>>> mPastBuckets;
326 
327     const int64_t mMinBucketSizeNs;
328 
329     // Util function to check whether the specified dimension hits the guardrail.
330     bool hitGuardRailLocked(const MetricDimensionKey& newKey) const;
331 
332     bool hasReachedGuardRailLimit() const;
333 
pullAndMatchEventsLocked(const int64_t timestampNs)334     virtual void pullAndMatchEventsLocked(const int64_t timestampNs) {
335     }
336 
337     virtual bool multipleBucketsSkipped(const int64_t numBucketsForward) const = 0;
338 
339     virtual PastBucket<AggregatedValue> buildPartialBucket(int64_t bucketEndTime,
340                                                            std::vector<Interval>& intervals) = 0;
341 
342     virtual void closeCurrentBucket(const int64_t eventTimeNs, int64_t nextBucketStartTimeNs);
343 
344     virtual void initNextSlicedBucket(int64_t nextBucketStartTimeNs);
345 
346     // Updates the condition timers in the current sliced bucket when there is a
347     // condition change or an active state change.
348     void updateCurrentSlicedBucketConditionTimers(bool newCondition, int64_t eventTimeNs);
349 
350     virtual void writePastBucketAggregateToProto(const int aggIndex,
351                                                  const AggregatedValue& aggregate,
352                                                  const int sampleSize,
353                                                  ProtoOutputStream* const protoOutput) const = 0;
354 
355     static const size_t kBucketSize = sizeof(PastBucket<AggregatedValue>{});
356 
357     const size_t mDimensionSoftLimit;
358 
359     const size_t mDimensionHardLimit;
360 
361     // This is to track whether or not the bucket is skipped for any of the reasons listed in
362     // BucketDropReason, many of which make the bucket potentially invalid.
363     bool mCurrentBucketIsSkipped;
364 
365     /** Stores condition correction threshold from the ValueMetric configuration */
366     optional<int64_t> mConditionCorrectionThresholdNs;
367 
isEventLateLocked(const int64_t eventTimeNs)368     inline bool isEventLateLocked(const int64_t eventTimeNs) const {
369         return eventTimeNs < mCurrentBucketStartTimeNs;
370     }
371 
372     // Returns true if any of the intervals have seen new data.
373     // This should return true unless there is an error parsing the value fields from the event.
374     virtual bool aggregateFields(const int64_t eventTimeNs, const MetricDimensionKey& eventKey,
375                                  const LogEvent& event, std::vector<Interval>& intervals,
376                                  DimExtras& dimExtras) = 0;
377 
378     // If this is a pulled metric
isPulled()379     inline bool isPulled() const {
380         return mPullAtomId != -1;
381     }
382 
383 private:
384 };  // ValueMetricProducer
385 
386 }  // namespace statsd
387 }  // namespace os
388 }  // namespace android
389