/*
 * Copyright (C) 2021 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16
17 #define STATSD_DEBUG false // STOPSHIP if true
18 #include "Log.h"
19
20 #include "NumericValueMetricProducer.h"
21
22 #include <limits.h>
23 #include <stdlib.h>
24
25 #include "FieldValue.h"
26 #include "guardrail/StatsdStats.h"
27 #include "metrics/parsing_utils/metrics_manager_util.h"
28 #include "stats_log_util.h"
29
30 using android::util::FIELD_COUNT_REPEATED;
31 using android::util::FIELD_TYPE_BOOL;
32 using android::util::FIELD_TYPE_DOUBLE;
33 using android::util::FIELD_TYPE_INT32;
34 using android::util::FIELD_TYPE_INT64;
35 using android::util::FIELD_TYPE_MESSAGE;
36 using android::util::FIELD_TYPE_STRING;
37 using android::util::ProtoOutputStream;
38 using std::optional;
39 using std::shared_ptr;
40 using std::string;
41 using std::unordered_map;
42
43 namespace android {
44 namespace os {
45 namespace statsd {
46
47 // for StatsLogReport
48 const int FIELD_ID_VALUE_METRICS = 7;
49 // for ValueBucketInfo
50 const int FIELD_ID_VALUE_INDEX = 1;
51 const int FIELD_ID_VALUE_LONG = 2;
52 const int FIELD_ID_VALUE_DOUBLE = 3;
53 const int FIELD_ID_VALUE_SAMPLESIZE = 4;
54 const int FIELD_ID_VALUES = 9;
55 const int FIELD_ID_BUCKET_NUM = 4;
56 const int FIELD_ID_START_BUCKET_ELAPSED_MILLIS = 5;
57 const int FIELD_ID_END_BUCKET_ELAPSED_MILLIS = 6;
58 const int FIELD_ID_CONDITION_TRUE_NS = 10;
59 const int FIELD_ID_CONDITION_CORRECTION_NS = 11;
60
61 const Value ZERO_LONG((int64_t)0);
62 const Value ZERO_DOUBLE(0.0);
63
64 // ValueMetric has a minimum bucket size of 10min so that we don't pull too frequently
// Constructs a NumericValueMetricProducer from a ValueMetric config message.
// Options shared with other value-metric flavors are forwarded to the
// ValueMetricProducer base class; the members initialized here control the
// numeric-specific behavior (diff-ing, value direction, zero default base,
// upload threshold, pull-delay guardrail).
NumericValueMetricProducer::NumericValueMetricProducer(
        const ConfigKey& key, const ValueMetric& metric, const uint64_t protoHash,
        const PullOptions& pullOptions, const BucketOptions& bucketOptions,
        const WhatOptions& whatOptions, const ConditionOptions& conditionOptions,
        const StateOptions& stateOptions, const ActivationOptions& activationOptions,
        const GuardrailOptions& guardrailOptions,
        const wp<ConfigMetadataProvider> configMetadataProvider)
    : ValueMetricProducer(metric.id(), key, protoHash, pullOptions, bucketOptions, whatOptions,
                          conditionOptions, stateOptions, activationOptions, guardrailOptions,
                          configMetadataProvider),
      mUseAbsoluteValueOnReset(metric.use_absolute_value_on_reset()),
      mAggregationTypes(whatOptions.aggregationTypes),
      // Sample sizes are included when explicitly requested, or by default
      // whenever any field uses AVG aggregation (AVG needs the count).
      mIncludeSampleSize(metric.has_include_sample_size()
                                 ? metric.include_sample_size()
                                 : hasAvgAggregationType(whatOptions.aggregationTypes)),
      // Pulled metrics diff consecutive pulls by default; pushed metrics don't.
      mUseDiff(metric.has_use_diff() ? metric.use_diff() : isPulled()),
      mValueDirection(metric.value_direction()),
      mSkipZeroDiffOutput(metric.skip_zero_diff_output()),
      mUseZeroDefaultBase(metric.use_zero_default_base()),
      mHasGlobalBase(false),
      // Cap how late a pull may complete before the bucket is considered
      // unreliable; falls back to the statsd-wide default.
      mMaxPullDelayNs(metric.has_max_pull_delay_sec() ? metric.max_pull_delay_sec() * NS_PER_SEC
                                                      : StatsdStats::kPullMaxDelayNs),
      mDedupedFieldMatchers(dedupFieldMatchers(whatOptions.fieldMatchers)) {
    // TODO(b/186677791): Use initializer list to initialize mUploadThreshold.
    if (metric.has_threshold()) {
        mUploadThreshold = metric.threshold();
    }
}
93
invalidateCurrentBucket(const int64_t dropTimeNs,const BucketDropReason reason)94 void NumericValueMetricProducer::invalidateCurrentBucket(const int64_t dropTimeNs,
95 const BucketDropReason reason) {
96 ValueMetricProducer::invalidateCurrentBucket(dropTimeNs, reason);
97
98 switch (reason) {
99 case BucketDropReason::DUMP_REPORT_REQUESTED:
100 case BucketDropReason::EVENT_IN_WRONG_BUCKET:
101 case BucketDropReason::CONDITION_UNKNOWN:
102 case BucketDropReason::PULL_FAILED:
103 case BucketDropReason::PULL_DELAYED:
104 case BucketDropReason::DIMENSION_GUARDRAIL_REACHED:
105 resetBase();
106 break;
107 default:
108 break;
109 }
110 }
111
resetBase()112 void NumericValueMetricProducer::resetBase() {
113 for (auto& [_, dimInfo] : mDimInfos) {
114 for (optional<Value>& base : dimInfo.dimExtras) {
115 base.reset();
116 }
117 }
118 mHasGlobalBase = false;
119 }
120
writePastBucketAggregateToProto(const int aggIndex,const Value & value,const int sampleSize,ProtoOutputStream * const protoOutput) const121 void NumericValueMetricProducer::writePastBucketAggregateToProto(
122 const int aggIndex, const Value& value, const int sampleSize,
123 ProtoOutputStream* const protoOutput) const {
124 uint64_t valueToken =
125 protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_VALUES);
126 protoOutput->write(FIELD_TYPE_INT32 | FIELD_ID_VALUE_INDEX, aggIndex);
127 if (mIncludeSampleSize) {
128 protoOutput->write(FIELD_TYPE_INT32 | FIELD_ID_VALUE_SAMPLESIZE, sampleSize);
129 }
130 if (value.getType() == LONG) {
131 protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_VALUE_LONG, (long long)value.long_value);
132 VLOG("\t\t value %d: %lld", aggIndex, (long long)value.long_value);
133 } else if (value.getType() == DOUBLE) {
134 protoOutput->write(FIELD_TYPE_DOUBLE | FIELD_ID_VALUE_DOUBLE, value.double_value);
135 VLOG("\t\t value %d: %.2f", aggIndex, value.double_value);
136 } else {
137 VLOG("Wrong value type for ValueMetric output: %d", value.getType());
138 }
139 protoOutput->end(valueToken);
140 }
141
onActiveStateChangedInternalLocked(const int64_t eventTimeNs,const bool isActive)142 void NumericValueMetricProducer::onActiveStateChangedInternalLocked(const int64_t eventTimeNs,
143 const bool isActive) {
144 // When active state changes from true to false for pulled metric, clear diff base but don't
145 // reset other counters as we may accumulate more value in the bucket.
146 if (mUseDiff && !isActive) {
147 resetBase();
148 }
149 }
150
151 // Only called when mIsActive and the event is NOT too late.
onConditionChangedInternalLocked(const ConditionState oldCondition,const ConditionState newCondition,const int64_t eventTimeNs)152 void NumericValueMetricProducer::onConditionChangedInternalLocked(const ConditionState oldCondition,
153 const ConditionState newCondition,
154 const int64_t eventTimeNs) {
155 // For metrics that use diff, when condition changes from true to false,
156 // clear diff base but don't reset other counts because we may accumulate
157 // more value in the bucket.
158 if (mUseDiff &&
159 (oldCondition == ConditionState::kTrue && newCondition == ConditionState::kFalse)) {
160 resetBase();
161 }
162 }
163
prepareFirstBucketLocked()164 void NumericValueMetricProducer::prepareFirstBucketLocked() {
165 // Kicks off the puller immediately if condition is true and diff based.
166 if (mIsActive && isPulled() && mCondition == ConditionState::kTrue && mUseDiff) {
167 pullAndMatchEventsLocked(mCurrentBucketStartTimeNs);
168 }
169 }
170
pullAndMatchEventsLocked(const int64_t timestampNs)171 void NumericValueMetricProducer::pullAndMatchEventsLocked(const int64_t timestampNs) {
172 vector<shared_ptr<LogEvent>> allData;
173 if (!mPullerManager->Pull(mPullAtomId, mConfigKey, timestampNs, &allData)) {
174 ALOGE("Stats puller failed for tag: %d at %lld", mPullAtomId, (long long)timestampNs);
175 invalidateCurrentBucket(timestampNs, BucketDropReason::PULL_FAILED);
176 return;
177 }
178
179 accumulateEvents(allData, timestampNs, timestampNs);
180 }
181
calcPreviousBucketEndTime(const int64_t currentTimeNs)182 int64_t NumericValueMetricProducer::calcPreviousBucketEndTime(const int64_t currentTimeNs) {
183 return mTimeBaseNs + ((currentTimeNs - mTimeBaseNs) / mBucketSizeNs) * mBucketSizeNs;
184 }
185
186 // By design, statsd pulls data at bucket boundaries using AlarmManager. These pulls are likely
187 // to be delayed. Other events like condition changes or app upgrade which are not based on
188 // AlarmManager might have arrived earlier and close the bucket.
// Handles data delivered from a scheduled (bucket-boundary) pull. Failed pulls
// invalidate the current bucket. Successful pulls are attributed either to the
// actual pull time (late pull, i.e. still mid-bucket) or snapped back to the
// previous bucket boundary (on-time pull), then the bucket is flushed if due.
void NumericValueMetricProducer::onDataPulled(const std::vector<std::shared_ptr<LogEvent>>& allData,
                                              PullResult pullResult, int64_t originalPullTimeNs) {
    lock_guard<mutex> lock(mMutex);
    // Pulled data only matters while the condition is true; otherwise it would
    // not be accumulated anyway.
    if (mCondition == ConditionState::kTrue) {
        // If the pull failed, we won't be able to compute a diff.
        if (pullResult == PullResult::PULL_RESULT_FAIL) {
            invalidateCurrentBucket(originalPullTimeNs, BucketDropReason::PULL_FAILED);
        } else if (pullResult == PullResult::PULL_RESULT_SUCCESS) {
            bool isEventLate = originalPullTimeNs < getCurrentBucketEndTimeNs();
            if (isEventLate) {
                // If the event is late, we are in the middle of a bucket. Just
                // process the data without trying to snap the data to the nearest bucket.
                accumulateEvents(allData, originalPullTimeNs, originalPullTimeNs);
            } else {
                // For scheduled pulled data, the effective event time is snap to the nearest
                // bucket end. In the case of waking up from a deep sleep state, we will
                // attribute to the previous bucket end. If the sleep was long but not very
                // long, we will be in the immediate next bucket. Previous bucket may get a
                // larger number as we pull at a later time than real bucket end.
                //
                // If the sleep was very long, we skip more than one bucket before sleep. In
                // this case, if the diff base will be cleared and this new data will serve as
                // new diff base.
                int64_t bucketEndTimeNs = calcPreviousBucketEndTime(originalPullTimeNs) - 1;
                StatsdStats::getInstance().noteBucketBoundaryDelayNs(
                        mMetricId, originalPullTimeNs - bucketEndTimeNs);
                accumulateEvents(allData, originalPullTimeNs, bucketEndTimeNs);
            }
        }
    }

    // We can probably flush the bucket. Since we used bucketEndTimeNs when calling
    // #onMatchedLogEventInternalLocked, the current bucket will not have been flushed.
    flushIfNeededLocked(originalPullTimeNs);
}
224
combineValueFields(pair<LogEvent,vector<int>> & eventValues,const LogEvent & newEvent,const vector<int> & newValueIndices) const225 void NumericValueMetricProducer::combineValueFields(pair<LogEvent, vector<int>>& eventValues,
226 const LogEvent& newEvent,
227 const vector<int>& newValueIndices) const {
228 if (eventValues.second.size() != newValueIndices.size()) {
229 ALOGE("NumericValueMetricProducer value indices sizes don't match");
230 return;
231 }
232 vector<FieldValue>* const aggregateFieldValues = eventValues.first.getMutableValues();
233 const vector<FieldValue>& newFieldValues = newEvent.getValues();
234 for (size_t i = 0; i < eventValues.second.size(); ++i) {
235 if (newValueIndices[i] != -1 && eventValues.second[i] != -1) {
236 (*aggregateFieldValues)[eventValues.second[i]].mValue +=
237 newFieldValues[newValueIndices[i]].mValue;
238 }
239 }
240 }
241
242 // Process events retrieved from a pull.
// Processes events retrieved from a pull. |originalPullTimeNs| is when the
// pull actually completed (used for delay accounting); |eventElapsedTimeNs| is
// the elapsed time the events are attributed to (possibly snapped back to a
// bucket boundary by the caller). Late or excessively delayed data invalidates
// the current bucket instead of being accumulated.
void NumericValueMetricProducer::accumulateEvents(const vector<shared_ptr<LogEvent>>& allData,
                                                  int64_t originalPullTimeNs,
                                                  int64_t eventElapsedTimeNs) {
    if (isEventLateLocked(eventElapsedTimeNs)) {
        VLOG("Skip bucket end pull due to late arrival: %lld vs %lld",
             (long long)eventElapsedTimeNs, (long long)mCurrentBucketStartTimeNs);
        StatsdStats::getInstance().noteLateLogEventSkipped(mMetricId);
        invalidateCurrentBucket(eventElapsedTimeNs, BucketDropReason::EVENT_IN_WRONG_BUCKET);
        return;
    }

    // Reject pulls that took longer than mMaxPullDelayNs to complete.
    const int64_t elapsedRealtimeNs = getElapsedRealtimeNs();
    const int64_t pullDelayNs = elapsedRealtimeNs - originalPullTimeNs;
    StatsdStats::getInstance().notePullDelay(mPullAtomId, pullDelayNs);
    if (pullDelayNs > mMaxPullDelayNs) {
        ALOGE("Pull finish too late for atom %d, longer than %lld", mPullAtomId,
              (long long)mMaxPullDelayNs);
        StatsdStats::getInstance().notePullExceedMaxDelay(mPullAtomId);
        // We are missing one pull from the bucket which means we will not have a complete view of
        // what's going on.
        invalidateCurrentBucket(eventElapsedTimeNs, BucketDropReason::PULL_DELAYED);
        return;
    }

    // Filled in by onMatchedLogEventLocked (via the calls below) with the
    // dimension keys actually seen in this pull.
    mMatchedMetricDimensionKeys.clear();
    if (mUseDiff) {
        // An extra aggregation step is needed to sum values with matching dimensions
        // before calculating the diff between sums of consecutive pulls.
        std::unordered_map<HashableDimensionKey, pair<LogEvent, vector<int>>> aggregateEvents;
        for (const auto& data : allData) {
            const auto [matchResult, transformedEvent] =
                    mEventMatcherWizard->matchLogEvent(*data, mWhatMatcherIndex);
            if (matchResult != MatchingState::kMatched) {
                continue;
            }

            // Get dimensions_in_what key and value indices.
            HashableDimensionKey dimensionsInWhat;
            vector<int> valueIndices(mDedupedFieldMatchers.size(), -1);
            const LogEvent& eventRef = transformedEvent == nullptr ? *data : *transformedEvent;
            if (!filterValues(mDimensionsInWhat, mDedupedFieldMatchers, eventRef.getValues(),
                              dimensionsInWhat, valueIndices)) {
                StatsdStats::getInstance().noteBadValueType(mMetricId);
            }

            // Store new event in map or combine values in existing event.
            auto it = aggregateEvents.find(dimensionsInWhat);
            if (it == aggregateEvents.end()) {
                aggregateEvents.emplace(std::piecewise_construct,
                                        std::forward_as_tuple(dimensionsInWhat),
                                        std::forward_as_tuple(eventRef, valueIndices));
            } else {
                combineValueFields(it->second, eventRef, valueIndices);
            }
        }

        // Feed each per-dimension aggregate through the normal event path.
        for (auto& [dimKey, eventInfo] : aggregateEvents) {
            eventInfo.first.setElapsedTimestampNs(eventElapsedTimeNs);
            onMatchedLogEventLocked(mWhatMatcherIndex, eventInfo.first);
        }
    } else {
        // Non-diff metrics: process every matched event individually.
        for (const auto& data : allData) {
            const auto [matchResult, transformedEvent] =
                    mEventMatcherWizard->matchLogEvent(*data, mWhatMatcherIndex);
            if (matchResult == MatchingState::kMatched) {
                LogEvent localCopy = transformedEvent == nullptr ? *data : *transformedEvent;
                localCopy.setElapsedTimestampNs(eventElapsedTimeNs);
                onMatchedLogEventLocked(mWhatMatcherIndex, localCopy);
            }
        }
    }

    // If a key that is:
    // 1. Tracked in mCurrentSlicedBucket and
    // 2. A superset of the current mStateChangePrimaryKey
    // was not found in the new pulled data (i.e. not in mMatchedDimensionInWhatKeys)
    // then we clear the data from mDimInfos to reset the base and current state key.
    for (auto& [metricDimensionKey, currentValueBucket] : mCurrentSlicedBucket) {
        const auto& whatKey = metricDimensionKey.getDimensionKeyInWhat();
        bool presentInPulledData =
                mMatchedMetricDimensionKeys.find(whatKey) != mMatchedMetricDimensionKeys.end();
        if (!presentInPulledData &&
            containsLinkedStateValues(whatKey, mStateChangePrimaryKey.second, mMetric2StateLinks,
                                      mStateChangePrimaryKey.first)) {
            auto it = mDimInfos.find(whatKey);
            if (it != mDimInfos.end()) {
                mDimInfos.erase(it);
            }
            // Turn OFF condition timer for keys not present in pulled data.
            currentValueBucket.conditionTimer.onConditionChanged(false, eventElapsedTimeNs);
        }
    }
    mMatchedMetricDimensionKeys.clear();
    // A successful full pull establishes a global base for diff computation.
    mHasGlobalBase = true;

    // If we reach the guardrail, we might have dropped some data which means the bucket is
    // incomplete.
    //
    // The base also needs to be reset. If we do not have the full data, we might
    // incorrectly compute the diff when mUseZeroDefaultBase is true since an existing key
    // might be missing from mCurrentSlicedBucket.
    if (hasReachedGuardRailLimit()) {
        invalidateCurrentBucket(eventElapsedTimeNs, BucketDropReason::DIMENSION_GUARDRAIL_REACHED);
        mCurrentSlicedBucket.clear();
    }
}
349
// Returns true if adding |newKey| to mCurrentFullBucket would exceed the hard
// dimension limit, in which case the caller must drop the data for this key.
// Keys already present never trip the guardrail. The drop is logged only the
// first time (mHasHitGuardrail latches to avoid log spam).
bool NumericValueMetricProducer::hitFullBucketGuardRailLocked(const MetricDimensionKey& newKey) {
    // ===========GuardRail==============
    // 1. Report the tuple count if the tuple count > soft limit
    if (mCurrentFullBucket.find(newKey) != mCurrentFullBucket.end()) {
        return false;
    }
    if (mCurrentFullBucket.size() > mDimensionSoftLimit - 1) {
        size_t newTupleCount = mCurrentFullBucket.size() + 1;
        // 2. Don't add more tuples, we are above the allowed threshold. Drop the data.
        if (newTupleCount > mDimensionHardLimit) {
            if (!mHasHitGuardrail) {
                ALOGE("ValueMetric %lld dropping data for full bucket dimension key %s",
                      (long long)mMetricId, newKey.toString().c_str());
                mHasHitGuardrail = true;
            }
            return true;
        }
    }

    return false;
}
371
getDoubleOrLong(const LogEvent & event,const Matcher & matcher,Value & ret)372 bool getDoubleOrLong(const LogEvent& event, const Matcher& matcher, Value& ret) {
373 for (const FieldValue& value : event.getValues()) {
374 if (value.mField.matches(matcher)) {
375 switch (value.mValue.type) {
376 case INT:
377 ret.setLong(value.mValue.int_value);
378 break;
379 case LONG:
380 ret.setLong(value.mValue.long_value);
381 break;
382 case FLOAT:
383 ret.setDouble(value.mValue.float_value);
384 break;
385 case DOUBLE:
386 ret.setDouble(value.mValue.double_value);
387 break;
388 default:
389 return false;
390 break;
391 }
392 return true;
393 }
394 }
395 return false;
396 }
397
// Extracts each configured value field from |event| and folds it into the
// per-dimension aggregation state for |eventKey|.
//   - |intervals| holds the running aggregate (SUM/MIN/MAX, or sum+count for
//     AVG) for each value field.
//   - |bases| holds the previous raw value per field, used when mUseDiff is
//     set to turn cumulative pulls into per-bucket deltas.
// Returns whether any new data was aggregated. May also trigger anomaly
// detection on the first value field when the data is trustworthy.
bool NumericValueMetricProducer::aggregateFields(const int64_t eventTimeNs,
                                                 const MetricDimensionKey& eventKey,
                                                 const LogEvent& event, vector<Interval>& intervals,
                                                 ValueBases& bases) {
    // Lazily grow the base storage to cover every configured value field.
    if (bases.size() < mFieldMatchers.size()) {
        VLOG("Resizing number of bases to %zu", mFieldMatchers.size());
        bases.resize(mFieldMatchers.size());
    }

    // We only use anomaly detection under certain cases.
    // N.B.: The anomaly detection cases were modified in order to fix an issue with value metrics
    // containing multiple values. We tried to retain all previous behaviour, but we are unsure the
    // previous behaviour was correct. At the time of the fix, anomaly detection had no owner.
    // Whoever next works on it should look into the cases where it is triggered in this function.
    // Discussion here: http://ag/6124370.
    bool useAnomalyDetection = true;
    bool seenNewData = false;
    for (size_t i = 0; i < mFieldMatchers.size(); i++) {
        const Matcher& matcher = mFieldMatchers[i];
        Interval& interval = intervals[i];
        interval.aggIndex = i;
        optional<Value>& base = bases[i];
        Value value;
        // A missing/non-numeric field aborts processing of the remaining
        // fields for this event.
        if (!getDoubleOrLong(event, matcher, value)) {
            VLOG("Failed to get value %zu from event %s", i, event.ToString().c_str());
            StatsdStats::getInstance().noteBadValueType(mMetricId);
            return seenNewData;
        }
        seenNewData = true;
        if (mUseDiff) {
            if (!base.has_value()) {
                if (mHasGlobalBase && mUseZeroDefaultBase) {
                    // The bucket has global base. This key does not.
                    // Optionally use zero as base.
                    base = (value.type == LONG ? ZERO_LONG : ZERO_DOUBLE);
                } else {
                    // no base. just update base and return.
                    base = value;

                    // If we're missing a base, do not use anomaly detection on incomplete data
                    useAnomalyDetection = false;

                    // Continue (instead of return) here in order to set base value for other bases
                    continue;
                }
            }
            // Compute the delta against the stored base, honoring the
            // configured value direction.
            Value diff;
            switch (mValueDirection) {
                case ValueMetric::INCREASING:
                    if (value >= base.value()) {
                        diff = value - base.value();
                    } else if (mUseAbsoluteValueOnReset) {
                        // Source counter reset (e.g. process restart): take the
                        // raw value as the delta.
                        diff = value;
                    } else {
                        VLOG("Unexpected decreasing value");
                        StatsdStats::getInstance().notePullDataError(mPullAtomId);
                        base = value;
                        // If we've got bad data, do not use anomaly detection
                        useAnomalyDetection = false;
                        continue;
                    }
                    break;
                case ValueMetric::DECREASING:
                    if (base.value() >= value) {
                        diff = base.value() - value;
                    } else if (mUseAbsoluteValueOnReset) {
                        diff = value;
                    } else {
                        VLOG("Unexpected increasing value");
                        StatsdStats::getInstance().notePullDataError(mPullAtomId);
                        base = value;
                        // If we've got bad data, do not use anomaly detection
                        useAnomalyDetection = false;
                        continue;
                    }
                    break;
                case ValueMetric::ANY:
                    diff = value - base.value();
                    break;
                default:
                    break;
            }
            base = value;
            value = diff;
        }

        // Fold the (possibly diffed) value into the running aggregate.
        if (interval.hasValue()) {
            switch (getAggregationTypeLocked(i)) {
                case ValueMetric::SUM:
                    // for AVG, we add up and take average when flushing the bucket
                    case ValueMetric::AVG:
                    interval.aggregate += value;
                    break;
                case ValueMetric::MIN:
                    interval.aggregate = min(value, interval.aggregate);
                    break;
                case ValueMetric::MAX:
                    interval.aggregate = max(value, interval.aggregate);
                    break;
                default:
                    break;
            }
        } else {
            interval.aggregate = value;
        }
        interval.sampleSize += 1;
    }

    // Only trigger the tracker if all intervals are correct and we have not skipped the bucket due
    // to MULTIPLE_BUCKETS_SKIPPED.
    if (useAnomalyDetection && !multipleBucketsSkipped(calcBucketsForwardCount(eventTimeNs))) {
        // TODO: propgate proper values down stream when anomaly support doubles
        long wholeBucketVal = intervals[0].aggregate.long_value;
        auto prev = mCurrentFullBucket.find(eventKey);
        if (prev != mCurrentFullBucket.end()) {
            wholeBucketVal += prev->second;
        }
        for (auto& tracker : mAnomalyTrackers) {
            tracker->detectAndDeclareAnomaly(eventTimeNs, mCurrentBucketNum, mMetricId, eventKey,
                                             wholeBucketVal);
        }
    }
    return seenNewData;
}
522
buildPartialBucket(int64_t bucketEndTimeNs,vector<Interval> & intervals)523 PastBucket<Value> NumericValueMetricProducer::buildPartialBucket(int64_t bucketEndTimeNs,
524 vector<Interval>& intervals) {
525 PastBucket<Value> bucket;
526 bucket.mBucketStartNs = mCurrentBucketStartTimeNs;
527 bucket.mBucketEndNs = bucketEndTimeNs;
528
529 // The first value field acts as a "gatekeeper" - if it does not pass the specified threshold,
530 // then all interval values are discarded for this bucket.
531 if (intervals.empty() || (intervals[0].hasValue() && !valuePassesThreshold(intervals[0]))) {
532 return bucket;
533 }
534
535 for (const Interval& interval : intervals) {
536 // skip the output if the diff is zero
537 if (!interval.hasValue() ||
538 (mSkipZeroDiffOutput && mUseDiff && interval.aggregate.isZero())) {
539 continue;
540 }
541
542 bucket.aggIndex.push_back(interval.aggIndex);
543 bucket.aggregates.push_back(getFinalValue(interval));
544 if (mIncludeSampleSize) {
545 bucket.sampleSizes.push_back(interval.sampleSize);
546 }
547 }
548 return bucket;
549 }
550
551 // Also invalidates current bucket if multiple buckets have been skipped
closeCurrentBucket(const int64_t eventTimeNs,const int64_t nextBucketStartTimeNs)552 void NumericValueMetricProducer::closeCurrentBucket(const int64_t eventTimeNs,
553 const int64_t nextBucketStartTimeNs) {
554 ValueMetricProducer::closeCurrentBucket(eventTimeNs, nextBucketStartTimeNs);
555 if (mAnomalyTrackers.size() > 0) {
556 appendToFullBucket(eventTimeNs > getCurrentBucketEndTimeNs());
557 }
558 }
559
// Prepares per-slice state for the next bucket after the base class resets the
// shared state.
void NumericValueMetricProducer::initNextSlicedBucket(int64_t nextBucketStartTimeNs) {
    ValueMetricProducer::initNextSlicedBucket(nextBucketStartTimeNs);

    // If we do not have a global base when the condition is true,
    // we will have incomplete bucket for the next bucket.
    if (mUseDiff && !mHasGlobalBase && mCondition) {
        // TODO(b/188878815): mCurrentBucketIsSkipped should probably be set to true here.
        mCurrentBucketIsSkipped = false;
    }
}
570
// Folds the just-closed (partial) bucket into mCurrentFullBucket for anomaly
// detection. When |isFullBucketReached| is true, the accumulated full-bucket
// totals are handed to the anomaly trackers and the accumulator is cleared.
// Only the first value field is considered (anomaly detection does not yet
// support multiple values or doubles).
void NumericValueMetricProducer::appendToFullBucket(const bool isFullBucketReached) {
    if (mCurrentBucketIsSkipped) {
        if (isFullBucketReached) {
            // If the bucket is invalid, we ignore the full bucket since it contains invalid data.
            mCurrentFullBucket.clear();
        }
        // Current bucket is invalid, we do not add it to the full bucket.
        return;
    }

    if (isFullBucketReached) {  // If full bucket, send to anomaly tracker.
        // Accumulate partial buckets with current value and then send to anomaly tracker.
        if (mCurrentFullBucket.size() > 0) {
            for (const auto& [metricDimensionKey, currentBucket] : mCurrentSlicedBucket) {
                if (hitFullBucketGuardRailLocked(metricDimensionKey) ||
                    currentBucket.intervals.empty()) {
                    continue;
                }
                // TODO: fix this when anomaly can accept double values
                auto& interval = currentBucket.intervals[0];
                if (interval.hasValue()) {
                    mCurrentFullBucket[metricDimensionKey] += interval.aggregate.long_value;
                }
            }
            // Report every accumulated full-bucket total, then start fresh.
            for (const auto& [metricDimensionKey, value] : mCurrentFullBucket) {
                for (auto& tracker : mAnomalyTrackers) {
                    if (tracker != nullptr) {
                        tracker->addPastBucket(metricDimensionKey, value, mCurrentBucketNum);
                    }
                }
            }
            mCurrentFullBucket.clear();
        } else {
            // Skip aggregating the partial buckets since there's no previous partial bucket.
            for (const auto& [metricDimensionKey, currentBucket] : mCurrentSlicedBucket) {
                for (auto& tracker : mAnomalyTrackers) {
                    if (tracker != nullptr && !currentBucket.intervals.empty()) {
                        // TODO: fix this when anomaly can accept double values
                        auto& interval = currentBucket.intervals[0];
                        if (interval.hasValue()) {
                            tracker->addPastBucket(metricDimensionKey,
                                                   interval.aggregate.long_value,
                                                   mCurrentBucketNum);
                        }
                    }
                }
            }
        }
    } else {
        // Accumulate partial bucket.
        for (const auto& [metricDimensionKey, currentBucket] : mCurrentSlicedBucket) {
            if (!currentBucket.intervals.empty()) {
                // TODO: fix this when anomaly can accept double values
                auto& interval = currentBucket.intervals[0];
                if (interval.hasValue()) {
                    mCurrentFullBucket[metricDimensionKey] += interval.aggregate.long_value;
                }
            }
        }
    }
}
632
633 // Estimate for the size of NumericValues.
getAggregatedValueSize(const Value & value) const634 size_t NumericValueMetricProducer::getAggregatedValueSize(const Value& value) const {
635 size_t valueSize = 0;
636 // Index
637 valueSize += sizeof(int32_t);
638
639 // Value
640 valueSize += value.getSize();
641
642 // Sample Size
643 if (mIncludeSampleSize) {
644 valueSize += sizeof(int32_t);
645 }
646 return valueSize;
647 }
648
byteSizeLocked() const649 size_t NumericValueMetricProducer::byteSizeLocked() const {
650 sp<ConfigMetadataProvider> configMetadataProvider = getConfigMetadataProvider();
651 if (configMetadataProvider != nullptr && configMetadataProvider->useV2SoftMemoryCalculation()) {
652 bool dimensionGuardrailHit = StatsdStats::getInstance().hasHitDimensionGuardrail(mMetricId);
653 return computeOverheadSizeLocked(!mPastBuckets.empty() || !mSkippedBuckets.empty(),
654 dimensionGuardrailHit) +
655 mTotalDataSize;
656 }
657 size_t totalSize = 0;
658 for (const auto& [_, buckets] : mPastBuckets) {
659 totalSize += buckets.size() * kBucketSize;
660 // TODO(b/189283526): Add bytes used to store PastBucket.aggIndex vector
661 }
662 return totalSize;
663 }
664
valuePassesThreshold(const Interval & interval) const665 bool NumericValueMetricProducer::valuePassesThreshold(const Interval& interval) const {
666 if (mUploadThreshold == nullopt) {
667 return true;
668 }
669
670 Value finalValue = getFinalValue(interval);
671
672 double doubleValue =
673 finalValue.type == LONG ? (double)finalValue.long_value : finalValue.double_value;
674 switch (mUploadThreshold->value_comparison_case()) {
675 case UploadThreshold::kLtInt:
676 return doubleValue < (double)mUploadThreshold->lt_int();
677 case UploadThreshold::kGtInt:
678 return doubleValue > (double)mUploadThreshold->gt_int();
679 case UploadThreshold::kLteInt:
680 return doubleValue <= (double)mUploadThreshold->lte_int();
681 case UploadThreshold::kGteInt:
682 return doubleValue >= (double)mUploadThreshold->gte_int();
683 case UploadThreshold::kLtFloat:
684 return doubleValue <= (double)mUploadThreshold->lt_float();
685 case UploadThreshold::kGtFloat:
686 return doubleValue >= (double)mUploadThreshold->gt_float();
687 default:
688 ALOGE("Value metric no upload threshold type used");
689 return false;
690 }
691 }
692
getFinalValue(const Interval & interval) const693 Value NumericValueMetricProducer::getFinalValue(const Interval& interval) const {
694 if (getAggregationTypeLocked(interval.aggIndex) != ValueMetric::AVG) {
695 return interval.aggregate;
696 } else {
697 double sum = interval.aggregate.type == LONG ? (double)interval.aggregate.long_value
698 : interval.aggregate.double_value;
699 return Value((double)sum / interval.sampleSize);
700 }
701 }
702
// Supplies the proto field ids used by the shared ValueMetricProducer dump
// path: the StatsLogReport field holding value metrics plus the per-bucket
// metadata field ids (bucket number, start/end millis, condition timings).
NumericValueMetricProducer::DumpProtoFields NumericValueMetricProducer::getDumpProtoFields() const {
    return {FIELD_ID_VALUE_METRICS,
            FIELD_ID_BUCKET_NUM,
            FIELD_ID_START_BUCKET_ELAPSED_MILLIS,
            FIELD_ID_END_BUCKET_ELAPSED_MILLIS,
            FIELD_ID_CONDITION_TRUE_NS,
            FIELD_ID_CONDITION_CORRECTION_NS};
}
711
712 } // namespace statsd
713 } // namespace os
714 } // namespace android
715