1 /*
2  * Copyright (C) 2021 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #define STATSD_DEBUG false  // STOPSHIP if true
18 #include "Log.h"
19 
20 #include "NumericValueMetricProducer.h"
21 
22 #include <limits.h>
23 #include <stdlib.h>
24 
25 #include "FieldValue.h"
26 #include "guardrail/StatsdStats.h"
27 #include "metrics/parsing_utils/metrics_manager_util.h"
28 #include "stats_log_util.h"
29 
30 using android::util::FIELD_COUNT_REPEATED;
31 using android::util::FIELD_TYPE_BOOL;
32 using android::util::FIELD_TYPE_DOUBLE;
33 using android::util::FIELD_TYPE_INT32;
34 using android::util::FIELD_TYPE_INT64;
35 using android::util::FIELD_TYPE_MESSAGE;
36 using android::util::FIELD_TYPE_STRING;
37 using android::util::ProtoOutputStream;
38 using std::optional;
39 using std::shared_ptr;
40 using std::string;
41 using std::unordered_map;
42 
43 namespace android {
44 namespace os {
45 namespace statsd {
46 
// Proto field numbers used when serializing reports. These must stay in sync with the
// StatsLogReport / ValueBucketInfo message definitions in the statsd proto files.
// for StatsLogReport
const int FIELD_ID_VALUE_METRICS = 7;
// for ValueBucketInfo
const int FIELD_ID_VALUE_INDEX = 1;
const int FIELD_ID_VALUE_LONG = 2;
const int FIELD_ID_VALUE_DOUBLE = 3;
const int FIELD_ID_VALUE_SAMPLESIZE = 4;
const int FIELD_ID_VALUES = 9;
const int FIELD_ID_BUCKET_NUM = 4;
const int FIELD_ID_START_BUCKET_ELAPSED_MILLIS = 5;
const int FIELD_ID_END_BUCKET_ELAPSED_MILLIS = 6;
const int FIELD_ID_CONDITION_TRUE_NS = 10;
const int FIELD_ID_CONDITION_CORRECTION_NS = 11;

// Default bases used in aggregateFields() when mUseZeroDefaultBase lets a new dimension key
// start diffing from zero instead of waiting for its first pull.
const Value ZERO_LONG((int64_t)0);
const Value ZERO_DOUBLE(0.0);
63 
64 // ValueMetric has a minimum bucket size of 10min so that we don't pull too frequently
// Constructs a producer for a numeric ValueMetric. Generic bucketing, condition, state and
// activation handling is delegated to the ValueMetricProducer base class; this constructor only
// captures the numeric-specific configuration knobs from the ValueMetric config message.
NumericValueMetricProducer::NumericValueMetricProducer(
        const ConfigKey& key, const ValueMetric& metric, const uint64_t protoHash,
        const PullOptions& pullOptions, const BucketOptions& bucketOptions,
        const WhatOptions& whatOptions, const ConditionOptions& conditionOptions,
        const StateOptions& stateOptions, const ActivationOptions& activationOptions,
        const GuardrailOptions& guardrailOptions,
        const wp<ConfigMetadataProvider> configMetadataProvider)
    : ValueMetricProducer(metric.id(), key, protoHash, pullOptions, bucketOptions, whatOptions,
                          conditionOptions, stateOptions, activationOptions, guardrailOptions,
                          configMetadataProvider),
      mUseAbsoluteValueOnReset(metric.use_absolute_value_on_reset()),
      mAggregationTypes(whatOptions.aggregationTypes),
      // If the config is silent, include sample sizes exactly when some field uses AVG
      // aggregation (AVG is computed from sum + sample size at flush time).
      mIncludeSampleSize(metric.has_include_sample_size()
                                 ? metric.include_sample_size()
                                 : hasAvgAggregationType(whatOptions.aggregationTypes)),
      // Pulled metrics default to diffing successive pulls; pushed metrics default to raw values.
      mUseDiff(metric.has_use_diff() ? metric.use_diff() : isPulled()),
      mValueDirection(metric.value_direction()),
      mSkipZeroDiffOutput(metric.skip_zero_diff_output()),
      mUseZeroDefaultBase(metric.use_zero_default_base()),
      mHasGlobalBase(false),
      // Fall back to the statsd-wide maximum pull delay when the config does not set one.
      mMaxPullDelayNs(metric.has_max_pull_delay_sec() ? metric.max_pull_delay_sec() * NS_PER_SEC
                                                      : StatsdStats::kPullMaxDelayNs),
      mDedupedFieldMatchers(dedupFieldMatchers(whatOptions.fieldMatchers)) {
    // TODO(b/186677791): Use initializer list to initialize mUploadThreshold.
    if (metric.has_threshold()) {
        mUploadThreshold = metric.threshold();
    }
}
93 
invalidateCurrentBucket(const int64_t dropTimeNs,const BucketDropReason reason)94 void NumericValueMetricProducer::invalidateCurrentBucket(const int64_t dropTimeNs,
95                                                          const BucketDropReason reason) {
96     ValueMetricProducer::invalidateCurrentBucket(dropTimeNs, reason);
97 
98     switch (reason) {
99         case BucketDropReason::DUMP_REPORT_REQUESTED:
100         case BucketDropReason::EVENT_IN_WRONG_BUCKET:
101         case BucketDropReason::CONDITION_UNKNOWN:
102         case BucketDropReason::PULL_FAILED:
103         case BucketDropReason::PULL_DELAYED:
104         case BucketDropReason::DIMENSION_GUARDRAIL_REACHED:
105             resetBase();
106             break;
107         default:
108             break;
109     }
110 }
111 
resetBase()112 void NumericValueMetricProducer::resetBase() {
113     for (auto& [_, dimInfo] : mDimInfos) {
114         for (optional<Value>& base : dimInfo.dimExtras) {
115             base.reset();
116         }
117     }
118     mHasGlobalBase = false;
119 }
120 
writePastBucketAggregateToProto(const int aggIndex,const Value & value,const int sampleSize,ProtoOutputStream * const protoOutput) const121 void NumericValueMetricProducer::writePastBucketAggregateToProto(
122         const int aggIndex, const Value& value, const int sampleSize,
123         ProtoOutputStream* const protoOutput) const {
124     uint64_t valueToken =
125             protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_VALUES);
126     protoOutput->write(FIELD_TYPE_INT32 | FIELD_ID_VALUE_INDEX, aggIndex);
127     if (mIncludeSampleSize) {
128         protoOutput->write(FIELD_TYPE_INT32 | FIELD_ID_VALUE_SAMPLESIZE, sampleSize);
129     }
130     if (value.getType() == LONG) {
131         protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_VALUE_LONG, (long long)value.long_value);
132         VLOG("\t\t value %d: %lld", aggIndex, (long long)value.long_value);
133     } else if (value.getType() == DOUBLE) {
134         protoOutput->write(FIELD_TYPE_DOUBLE | FIELD_ID_VALUE_DOUBLE, value.double_value);
135         VLOG("\t\t value %d: %.2f", aggIndex, value.double_value);
136     } else {
137         VLOG("Wrong value type for ValueMetric output: %d", value.getType());
138     }
139     protoOutput->end(valueToken);
140 }
141 
onActiveStateChangedInternalLocked(const int64_t eventTimeNs,const bool isActive)142 void NumericValueMetricProducer::onActiveStateChangedInternalLocked(const int64_t eventTimeNs,
143                                                                     const bool isActive) {
144     // When active state changes from true to false for pulled metric, clear diff base but don't
145     // reset other counters as we may accumulate more value in the bucket.
146     if (mUseDiff && !isActive) {
147         resetBase();
148     }
149 }
150 
151 // Only called when mIsActive and the event is NOT too late.
onConditionChangedInternalLocked(const ConditionState oldCondition,const ConditionState newCondition,const int64_t eventTimeNs)152 void NumericValueMetricProducer::onConditionChangedInternalLocked(const ConditionState oldCondition,
153                                                                   const ConditionState newCondition,
154                                                                   const int64_t eventTimeNs) {
155     // For metrics that use diff, when condition changes from true to false,
156     // clear diff base but don't reset other counts because we may accumulate
157     // more value in the bucket.
158     if (mUseDiff &&
159         (oldCondition == ConditionState::kTrue && newCondition == ConditionState::kFalse)) {
160         resetBase();
161     }
162 }
163 
prepareFirstBucketLocked()164 void NumericValueMetricProducer::prepareFirstBucketLocked() {
165     // Kicks off the puller immediately if condition is true and diff based.
166     if (mIsActive && isPulled() && mCondition == ConditionState::kTrue && mUseDiff) {
167         pullAndMatchEventsLocked(mCurrentBucketStartTimeNs);
168     }
169 }
170 
pullAndMatchEventsLocked(const int64_t timestampNs)171 void NumericValueMetricProducer::pullAndMatchEventsLocked(const int64_t timestampNs) {
172     vector<shared_ptr<LogEvent>> allData;
173     if (!mPullerManager->Pull(mPullAtomId, mConfigKey, timestampNs, &allData)) {
174         ALOGE("Stats puller failed for tag: %d at %lld", mPullAtomId, (long long)timestampNs);
175         invalidateCurrentBucket(timestampNs, BucketDropReason::PULL_FAILED);
176         return;
177     }
178 
179     accumulateEvents(allData, timestampNs, timestampNs);
180 }
181 
calcPreviousBucketEndTime(const int64_t currentTimeNs)182 int64_t NumericValueMetricProducer::calcPreviousBucketEndTime(const int64_t currentTimeNs) {
183     return mTimeBaseNs + ((currentTimeNs - mTimeBaseNs) / mBucketSizeNs) * mBucketSizeNs;
184 }
185 
// By design, statsd pulls data at bucket boundaries using AlarmManager. These pulls are likely
// to be delayed. Other events like condition changes or app upgrade which are not based on
// AlarmManager might have arrived earlier and close the bucket.
// Entry point for asynchronous pull results; attributes the data to the correct bucket and
// then flushes if a bucket boundary has been crossed.
void NumericValueMetricProducer::onDataPulled(const std::vector<std::shared_ptr<LogEvent>>& allData,
                                              PullResult pullResult, int64_t originalPullTimeNs) {
    lock_guard<mutex> lock(mMutex);
    // Pulled data only matters while the condition is true; otherwise it is ignored entirely.
    if (mCondition == ConditionState::kTrue) {
        // If the pull failed, we won't be able to compute a diff.
        if (pullResult == PullResult::PULL_RESULT_FAIL) {
            invalidateCurrentBucket(originalPullTimeNs, BucketDropReason::PULL_FAILED);
        } else if (pullResult == PullResult::PULL_RESULT_SUCCESS) {
            bool isEventLate = originalPullTimeNs < getCurrentBucketEndTimeNs();
            if (isEventLate) {
                // If the event is late, we are in the middle of a bucket. Just
                // process the data without trying to snap the data to the nearest bucket.
                accumulateEvents(allData, originalPullTimeNs, originalPullTimeNs);
            } else {
                // For scheduled pulled data, the effective event time is snap to the nearest
                // bucket end. In the case of waking up from a deep sleep state, we will
                // attribute to the previous bucket end. If the sleep was long but not very
                // long, we will be in the immediate next bucket. Previous bucket may get a
                // larger number as we pull at a later time than real bucket end.
                //
                // If the sleep was very long, we skip more than one bucket before sleep. In
                // this case, if the diff base will be cleared and this new data will serve as
                // new diff base.
                int64_t bucketEndTimeNs = calcPreviousBucketEndTime(originalPullTimeNs) - 1;
                StatsdStats::getInstance().noteBucketBoundaryDelayNs(
                        mMetricId, originalPullTimeNs - bucketEndTimeNs);
                accumulateEvents(allData, originalPullTimeNs, bucketEndTimeNs);
            }
        }
    }

    // We can probably flush the bucket. Since we used bucketEndTimeNs when calling
    // #onMatchedLogEventInternalLocked, the current bucket will not have been flushed.
    flushIfNeededLocked(originalPullTimeNs);
}
224 
combineValueFields(pair<LogEvent,vector<int>> & eventValues,const LogEvent & newEvent,const vector<int> & newValueIndices) const225 void NumericValueMetricProducer::combineValueFields(pair<LogEvent, vector<int>>& eventValues,
226                                                     const LogEvent& newEvent,
227                                                     const vector<int>& newValueIndices) const {
228     if (eventValues.second.size() != newValueIndices.size()) {
229         ALOGE("NumericValueMetricProducer value indices sizes don't match");
230         return;
231     }
232     vector<FieldValue>* const aggregateFieldValues = eventValues.first.getMutableValues();
233     const vector<FieldValue>& newFieldValues = newEvent.getValues();
234     for (size_t i = 0; i < eventValues.second.size(); ++i) {
235         if (newValueIndices[i] != -1 && eventValues.second[i] != -1) {
236             (*aggregateFieldValues)[eventValues.second[i]].mValue +=
237                     newFieldValues[newValueIndices[i]].mValue;
238         }
239     }
240 }
241 
// Process events retrieved from a pull.
// originalPullTimeNs is the real elapsed time of the pull; eventElapsedTimeNs is the time the
// data is attributed to (possibly snapped back to a bucket boundary by the caller).
void NumericValueMetricProducer::accumulateEvents(const vector<shared_ptr<LogEvent>>& allData,
                                                  int64_t originalPullTimeNs,
                                                  int64_t eventElapsedTimeNs) {
    // Data attributed before the current bucket started cannot be placed anywhere; the
    // bucket is now missing a pull and must be dropped.
    if (isEventLateLocked(eventElapsedTimeNs)) {
        VLOG("Skip bucket end pull due to late arrival: %lld vs %lld",
             (long long)eventElapsedTimeNs, (long long)mCurrentBucketStartTimeNs);
        StatsdStats::getInstance().noteLateLogEventSkipped(mMetricId);
        invalidateCurrentBucket(eventElapsedTimeNs, BucketDropReason::EVENT_IN_WRONG_BUCKET);
        return;
    }

    // Guard against pulls that completed too long after they were requested.
    const int64_t elapsedRealtimeNs = getElapsedRealtimeNs();
    const int64_t pullDelayNs = elapsedRealtimeNs - originalPullTimeNs;
    StatsdStats::getInstance().notePullDelay(mPullAtomId, pullDelayNs);
    if (pullDelayNs > mMaxPullDelayNs) {
        ALOGE("Pull finish too late for atom %d, longer than %lld", mPullAtomId,
              (long long)mMaxPullDelayNs);
        StatsdStats::getInstance().notePullExceedMaxDelay(mPullAtomId);
        // We are missing one pull from the bucket which means we will not have a complete view of
        // what's going on.
        invalidateCurrentBucket(eventElapsedTimeNs, BucketDropReason::PULL_DELAYED);
        return;
    }

    mMatchedMetricDimensionKeys.clear();
    if (mUseDiff) {
        // An extra aggregation step is needed to sum values with matching dimensions
        // before calculating the diff between sums of consecutive pulls.
        std::unordered_map<HashableDimensionKey, pair<LogEvent, vector<int>>> aggregateEvents;
        for (const auto& data : allData) {
            const auto [matchResult, transformedEvent] =
                    mEventMatcherWizard->matchLogEvent(*data, mWhatMatcherIndex);
            if (matchResult != MatchingState::kMatched) {
                continue;
            }

            // Get dimensions_in_what key and value indices.
            HashableDimensionKey dimensionsInWhat;
            vector<int> valueIndices(mDedupedFieldMatchers.size(), -1);
            const LogEvent& eventRef = transformedEvent == nullptr ? *data : *transformedEvent;
            if (!filterValues(mDimensionsInWhat, mDedupedFieldMatchers, eventRef.getValues(),
                              dimensionsInWhat, valueIndices)) {
                StatsdStats::getInstance().noteBadValueType(mMetricId);
            }

            // Store new event in map or combine values in existing event.
            auto it = aggregateEvents.find(dimensionsInWhat);
            if (it == aggregateEvents.end()) {
                aggregateEvents.emplace(std::piecewise_construct,
                                        std::forward_as_tuple(dimensionsInWhat),
                                        std::forward_as_tuple(eventRef, valueIndices));
            } else {
                combineValueFields(it->second, eventRef, valueIndices);
            }
        }

        // Feed each summed per-dimension event through the normal matched-event path.
        for (auto& [dimKey, eventInfo] : aggregateEvents) {
            eventInfo.first.setElapsedTimestampNs(eventElapsedTimeNs);
            onMatchedLogEventLocked(mWhatMatcherIndex, eventInfo.first);
        }
    } else {
        // No diffing: each matched event is processed individually.
        for (const auto& data : allData) {
            const auto [matchResult, transformedEvent] =
                    mEventMatcherWizard->matchLogEvent(*data, mWhatMatcherIndex);
            if (matchResult == MatchingState::kMatched) {
                LogEvent localCopy = transformedEvent == nullptr ? *data : *transformedEvent;
                localCopy.setElapsedTimestampNs(eventElapsedTimeNs);
                onMatchedLogEventLocked(mWhatMatcherIndex, localCopy);
            }
        }
    }

    // If a key that is:
    // 1. Tracked in mCurrentSlicedBucket and
    // 2. A superset of the current mStateChangePrimaryKey
    // was not found in the new pulled data (i.e. not in mMatchedDimensionInWhatKeys)
    // then we clear the data from mDimInfos to reset the base and current state key.
    for (auto& [metricDimensionKey, currentValueBucket] : mCurrentSlicedBucket) {
        const auto& whatKey = metricDimensionKey.getDimensionKeyInWhat();
        bool presentInPulledData =
                mMatchedMetricDimensionKeys.find(whatKey) != mMatchedMetricDimensionKeys.end();
        if (!presentInPulledData &&
            containsLinkedStateValues(whatKey, mStateChangePrimaryKey.second, mMetric2StateLinks,
                                      mStateChangePrimaryKey.first)) {
            auto it = mDimInfos.find(whatKey);
            if (it != mDimInfos.end()) {
                mDimInfos.erase(it);
            }
            // Turn OFF condition timer for keys not present in pulled data.
            currentValueBucket.conditionTimer.onConditionChanged(false, eventElapsedTimeNs);
        }
    }
    mMatchedMetricDimensionKeys.clear();
    // A successful full pull establishes a global base for subsequent diffs.
    mHasGlobalBase = true;

    // If we reach the guardrail, we might have dropped some data which means the bucket is
    // incomplete.
    //
    // The base also needs to be reset. If we do not have the full data, we might
    // incorrectly compute the diff when mUseZeroDefaultBase is true since an existing key
    // might be missing from mCurrentSlicedBucket.
    if (hasReachedGuardRailLimit()) {
        invalidateCurrentBucket(eventElapsedTimeNs, BucketDropReason::DIMENSION_GUARDRAIL_REACHED);
        mCurrentSlicedBucket.clear();
    }
}
349 
hitFullBucketGuardRailLocked(const MetricDimensionKey & newKey)350 bool NumericValueMetricProducer::hitFullBucketGuardRailLocked(const MetricDimensionKey& newKey) {
351     // ===========GuardRail==============
352     // 1. Report the tuple count if the tuple count > soft limit
353     if (mCurrentFullBucket.find(newKey) != mCurrentFullBucket.end()) {
354         return false;
355     }
356     if (mCurrentFullBucket.size() > mDimensionSoftLimit - 1) {
357         size_t newTupleCount = mCurrentFullBucket.size() + 1;
358         // 2. Don't add more tuples, we are above the allowed threshold. Drop the data.
359         if (newTupleCount > mDimensionHardLimit) {
360             if (!mHasHitGuardrail) {
361                 ALOGE("ValueMetric %lld dropping data for full bucket dimension key %s",
362                       (long long)mMetricId, newKey.toString().c_str());
363                 mHasHitGuardrail = true;
364             }
365             return true;
366         }
367     }
368 
369     return false;
370 }
371 
getDoubleOrLong(const LogEvent & event,const Matcher & matcher,Value & ret)372 bool getDoubleOrLong(const LogEvent& event, const Matcher& matcher, Value& ret) {
373     for (const FieldValue& value : event.getValues()) {
374         if (value.mField.matches(matcher)) {
375             switch (value.mValue.type) {
376                 case INT:
377                     ret.setLong(value.mValue.int_value);
378                     break;
379                 case LONG:
380                     ret.setLong(value.mValue.long_value);
381                     break;
382                 case FLOAT:
383                     ret.setDouble(value.mValue.float_value);
384                     break;
385                 case DOUBLE:
386                     ret.setDouble(value.mValue.double_value);
387                     break;
388                 default:
389                     return false;
390                     break;
391             }
392             return true;
393         }
394     }
395     return false;
396 }
397 
// Extracts each configured value field from `event` and folds it into this dimension key's
// aggregation `intervals`, diffing against the stored `bases` when mUseDiff is set.
// Returns whether any new value data was consumed from the event.
bool NumericValueMetricProducer::aggregateFields(const int64_t eventTimeNs,
                                                 const MetricDimensionKey& eventKey,
                                                 const LogEvent& event, vector<Interval>& intervals,
                                                 ValueBases& bases) {
    // Lazily grow base storage to one optional base per configured value field.
    if (bases.size() < mFieldMatchers.size()) {
        VLOG("Resizing number of bases to %zu", mFieldMatchers.size());
        bases.resize(mFieldMatchers.size());
    }

    // We only use anomaly detection under certain cases.
    // N.B.: The anomaly detection cases were modified in order to fix an issue with value metrics
    // containing multiple values. We tried to retain all previous behaviour, but we are unsure the
    // previous behaviour was correct. At the time of the fix, anomaly detection had no owner.
    // Whoever next works on it should look into the cases where it is triggered in this function.
    // Discussion here: http://ag/6124370.
    bool useAnomalyDetection = true;
    bool seenNewData = false;
    for (size_t i = 0; i < mFieldMatchers.size(); i++) {
        const Matcher& matcher = mFieldMatchers[i];
        Interval& interval = intervals[i];
        interval.aggIndex = i;
        optional<Value>& base = bases[i];
        Value value;
        // A field with the wrong (non-numeric) type aborts processing of this event.
        if (!getDoubleOrLong(event, matcher, value)) {
            VLOG("Failed to get value %zu from event %s", i, event.ToString().c_str());
            StatsdStats::getInstance().noteBadValueType(mMetricId);
            return seenNewData;
        }
        seenNewData = true;
        if (mUseDiff) {
            if (!base.has_value()) {
                if (mHasGlobalBase && mUseZeroDefaultBase) {
                    // The bucket has global base. This key does not.
                    // Optionally use zero as base.
                    base = (value.type == LONG ? ZERO_LONG : ZERO_DOUBLE);
                } else {
                    // no base. just update base and return.
                    base = value;

                    // If we're missing a base, do not use anomaly detection on incomplete data
                    useAnomalyDetection = false;

                    // Continue (instead of return) here in order to set base value for other bases
                    continue;
                }
            }
            // Compute the diff according to the configured direction; a value moving the
            // "wrong" way indicates a counter reset in the source.
            Value diff;
            switch (mValueDirection) {
                case ValueMetric::INCREASING:
                    if (value >= base.value()) {
                        diff = value - base.value();
                    } else if (mUseAbsoluteValueOnReset) {
                        diff = value;
                    } else {
                        VLOG("Unexpected decreasing value");
                        StatsdStats::getInstance().notePullDataError(mPullAtomId);
                        base = value;
                        // If we've got bad data, do not use anomaly detection
                        useAnomalyDetection = false;
                        continue;
                    }
                    break;
                case ValueMetric::DECREASING:
                    if (base.value() >= value) {
                        diff = base.value() - value;
                    } else if (mUseAbsoluteValueOnReset) {
                        diff = value;
                    } else {
                        VLOG("Unexpected increasing value");
                        StatsdStats::getInstance().notePullDataError(mPullAtomId);
                        base = value;
                        // If we've got bad data, do not use anomaly detection
                        useAnomalyDetection = false;
                        continue;
                    }
                    break;
                case ValueMetric::ANY:
                    diff = value - base.value();
                    break;
                default:
                    break;
            }
            // The new raw value becomes the base for the next pull; the diff is aggregated.
            base = value;
            value = diff;
        }

        if (interval.hasValue()) {
            switch (getAggregationTypeLocked(i)) {
                case ValueMetric::SUM:
                    // for AVG, we add up and take average when flushing the bucket
                case ValueMetric::AVG:
                    interval.aggregate += value;
                    break;
                case ValueMetric::MIN:
                    interval.aggregate = min(value, interval.aggregate);
                    break;
                case ValueMetric::MAX:
                    interval.aggregate = max(value, interval.aggregate);
                    break;
                default:
                    break;
            }
        } else {
            // First value in this interval for the bucket.
            interval.aggregate = value;
        }
        interval.sampleSize += 1;
    }

    // Only trigger the tracker if all intervals are correct and we have not skipped the bucket due
    // to MULTIPLE_BUCKETS_SKIPPED.
    if (useAnomalyDetection && !multipleBucketsSkipped(calcBucketsForwardCount(eventTimeNs))) {
        // TODO: propgate proper values down stream when anomaly support doubles
        long wholeBucketVal = intervals[0].aggregate.long_value;
        auto prev = mCurrentFullBucket.find(eventKey);
        if (prev != mCurrentFullBucket.end()) {
            wholeBucketVal += prev->second;
        }
        for (auto& tracker : mAnomalyTrackers) {
            tracker->detectAndDeclareAnomaly(eventTimeNs, mCurrentBucketNum, mMetricId, eventKey,
                                             wholeBucketVal);
        }
    }
    return seenNewData;
}
522 
buildPartialBucket(int64_t bucketEndTimeNs,vector<Interval> & intervals)523 PastBucket<Value> NumericValueMetricProducer::buildPartialBucket(int64_t bucketEndTimeNs,
524                                                                  vector<Interval>& intervals) {
525     PastBucket<Value> bucket;
526     bucket.mBucketStartNs = mCurrentBucketStartTimeNs;
527     bucket.mBucketEndNs = bucketEndTimeNs;
528 
529     // The first value field acts as a "gatekeeper" - if it does not pass the specified threshold,
530     // then all interval values are discarded for this bucket.
531     if (intervals.empty() || (intervals[0].hasValue() && !valuePassesThreshold(intervals[0]))) {
532         return bucket;
533     }
534 
535     for (const Interval& interval : intervals) {
536         // skip the output if the diff is zero
537         if (!interval.hasValue() ||
538             (mSkipZeroDiffOutput && mUseDiff && interval.aggregate.isZero())) {
539             continue;
540         }
541 
542         bucket.aggIndex.push_back(interval.aggIndex);
543         bucket.aggregates.push_back(getFinalValue(interval));
544         if (mIncludeSampleSize) {
545             bucket.sampleSizes.push_back(interval.sampleSize);
546         }
547     }
548     return bucket;
549 }
550 
551 // Also invalidates current bucket if multiple buckets have been skipped
closeCurrentBucket(const int64_t eventTimeNs,const int64_t nextBucketStartTimeNs)552 void NumericValueMetricProducer::closeCurrentBucket(const int64_t eventTimeNs,
553                                                     const int64_t nextBucketStartTimeNs) {
554     ValueMetricProducer::closeCurrentBucket(eventTimeNs, nextBucketStartTimeNs);
555     if (mAnomalyTrackers.size() > 0) {
556         appendToFullBucket(eventTimeNs > getCurrentBucketEndTimeNs());
557     }
558 }
559 
initNextSlicedBucket(int64_t nextBucketStartTimeNs)560 void NumericValueMetricProducer::initNextSlicedBucket(int64_t nextBucketStartTimeNs) {
561     ValueMetricProducer::initNextSlicedBucket(nextBucketStartTimeNs);
562 
563     // If we do not have a global base when the condition is true,
564     // we will have incomplete bucket for the next bucket.
565     if (mUseDiff && !mHasGlobalBase && mCondition) {
566         // TODO(b/188878815): mCurrentBucketIsSkipped should probably be set to true here.
567         mCurrentBucketIsSkipped = false;
568     }
569 }
570 
// Folds the just-closed (partial) bucket into mCurrentFullBucket and, once a real full-bucket
// boundary is reached, hands the accumulated per-key totals to the anomaly trackers.
void NumericValueMetricProducer::appendToFullBucket(const bool isFullBucketReached) {
    if (mCurrentBucketIsSkipped) {
        if (isFullBucketReached) {
            // If the bucket is invalid, we ignore the full bucket since it contains invalid data.
            mCurrentFullBucket.clear();
        }
        // Current bucket is invalid, we do not add it to the full bucket.
        return;
    }

    if (isFullBucketReached) {  // If full bucket, send to anomaly tracker.
        // Accumulate partial buckets with current value and then send to anomaly tracker.
        if (mCurrentFullBucket.size() > 0) {
            for (const auto& [metricDimensionKey, currentBucket] : mCurrentSlicedBucket) {
                if (hitFullBucketGuardRailLocked(metricDimensionKey) ||
                    currentBucket.intervals.empty()) {
                    continue;
                }
                // TODO: fix this when anomaly can accept double values
                auto& interval = currentBucket.intervals[0];
                if (interval.hasValue()) {
                    mCurrentFullBucket[metricDimensionKey] += interval.aggregate.long_value;
                }
            }
            // Flush the completed totals to every anomaly tracker.
            for (const auto& [metricDimensionKey, value] : mCurrentFullBucket) {
                for (auto& tracker : mAnomalyTrackers) {
                    if (tracker != nullptr) {
                        tracker->addPastBucket(metricDimensionKey, value, mCurrentBucketNum);
                    }
                }
            }
            mCurrentFullBucket.clear();
        } else {
            // Skip aggregating the partial buckets since there's no previous partial bucket.
            for (const auto& [metricDimensionKey, currentBucket] : mCurrentSlicedBucket) {
                for (auto& tracker : mAnomalyTrackers) {
                    if (tracker != nullptr && !currentBucket.intervals.empty()) {
                        // TODO: fix this when anomaly can accept double values
                        auto& interval = currentBucket.intervals[0];
                        if (interval.hasValue()) {
                            tracker->addPastBucket(metricDimensionKey,
                                                   interval.aggregate.long_value,
                                                   mCurrentBucketNum);
                        }
                    }
                }
            }
        }
    } else {
        // Accumulate partial bucket.
        for (const auto& [metricDimensionKey, currentBucket] : mCurrentSlicedBucket) {
            if (!currentBucket.intervals.empty()) {
                // TODO: fix this when anomaly can accept double values
                auto& interval = currentBucket.intervals[0];
                if (interval.hasValue()) {
                    mCurrentFullBucket[metricDimensionKey] += interval.aggregate.long_value;
                }
            }
        }
    }
}
632 
633 // Estimate for the size of NumericValues.
getAggregatedValueSize(const Value & value) const634 size_t NumericValueMetricProducer::getAggregatedValueSize(const Value& value) const {
635     size_t valueSize = 0;
636     // Index
637     valueSize += sizeof(int32_t);
638 
639     // Value
640     valueSize += value.getSize();
641 
642     // Sample Size
643     if (mIncludeSampleSize) {
644         valueSize += sizeof(int32_t);
645     }
646     return valueSize;
647 }
648 
byteSizeLocked() const649 size_t NumericValueMetricProducer::byteSizeLocked() const {
650     sp<ConfigMetadataProvider> configMetadataProvider = getConfigMetadataProvider();
651     if (configMetadataProvider != nullptr && configMetadataProvider->useV2SoftMemoryCalculation()) {
652         bool dimensionGuardrailHit = StatsdStats::getInstance().hasHitDimensionGuardrail(mMetricId);
653         return computeOverheadSizeLocked(!mPastBuckets.empty() || !mSkippedBuckets.empty(),
654                                          dimensionGuardrailHit) +
655                mTotalDataSize;
656     }
657     size_t totalSize = 0;
658     for (const auto& [_, buckets] : mPastBuckets) {
659         totalSize += buckets.size() * kBucketSize;
660         // TODO(b/189283526): Add bytes used to store PastBucket.aggIndex vector
661     }
662     return totalSize;
663 }
664 
valuePassesThreshold(const Interval & interval) const665 bool NumericValueMetricProducer::valuePassesThreshold(const Interval& interval) const {
666     if (mUploadThreshold == nullopt) {
667         return true;
668     }
669 
670     Value finalValue = getFinalValue(interval);
671 
672     double doubleValue =
673             finalValue.type == LONG ? (double)finalValue.long_value : finalValue.double_value;
674     switch (mUploadThreshold->value_comparison_case()) {
675         case UploadThreshold::kLtInt:
676             return doubleValue < (double)mUploadThreshold->lt_int();
677         case UploadThreshold::kGtInt:
678             return doubleValue > (double)mUploadThreshold->gt_int();
679         case UploadThreshold::kLteInt:
680             return doubleValue <= (double)mUploadThreshold->lte_int();
681         case UploadThreshold::kGteInt:
682             return doubleValue >= (double)mUploadThreshold->gte_int();
683         case UploadThreshold::kLtFloat:
684             return doubleValue <= (double)mUploadThreshold->lt_float();
685         case UploadThreshold::kGtFloat:
686             return doubleValue >= (double)mUploadThreshold->gt_float();
687         default:
688             ALOGE("Value metric no upload threshold type used");
689             return false;
690     }
691 }
692 
getFinalValue(const Interval & interval) const693 Value NumericValueMetricProducer::getFinalValue(const Interval& interval) const {
694     if (getAggregationTypeLocked(interval.aggIndex) != ValueMetric::AVG) {
695         return interval.aggregate;
696     } else {
697         double sum = interval.aggregate.type == LONG ? (double)interval.aggregate.long_value
698                                                      : interval.aggregate.double_value;
699         return Value((double)sum / interval.sampleSize);
700     }
701 }
702 
getDumpProtoFields() const703 NumericValueMetricProducer::DumpProtoFields NumericValueMetricProducer::getDumpProtoFields() const {
704     return {FIELD_ID_VALUE_METRICS,
705             FIELD_ID_BUCKET_NUM,
706             FIELD_ID_START_BUCKET_ELAPSED_MILLIS,
707             FIELD_ID_END_BUCKET_ELAPSED_MILLIS,
708             FIELD_ID_CONDITION_TRUE_NS,
709             FIELD_ID_CONDITION_CORRECTION_NS};
710 }
711 
712 }  // namespace statsd
713 }  // namespace os
714 }  // namespace android
715