• Home
  • History
  • Annotate
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2017 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #define DEBUG false  // STOPSHIP if true
18 #include "Log.h"
19 
20 #include "ValueMetricProducer.h"
21 
22 #include <limits.h>
23 #include <stdlib.h>
24 
25 #include "../guardrail/StatsdStats.h"
26 #include "../stats_log_util.h"
27 #include "metrics/parsing_utils/metrics_manager_util.h"
28 
29 using android::util::FIELD_COUNT_REPEATED;
30 using android::util::FIELD_TYPE_BOOL;
31 using android::util::FIELD_TYPE_DOUBLE;
32 using android::util::FIELD_TYPE_INT32;
33 using android::util::FIELD_TYPE_INT64;
34 using android::util::FIELD_TYPE_MESSAGE;
35 using android::util::FIELD_TYPE_STRING;
36 using android::util::ProtoOutputStream;
37 using std::map;
38 using std::shared_ptr;
39 using std::unordered_map;
40 
41 namespace android {
42 namespace os {
43 namespace statsd {
44 
// Proto field numbers used when serializing the report below.
// NOTE(review): these must stay in sync with the corresponding proto message
// definitions (StatsLogReport / ValueMetricData / ValueBucketInfo) — confirm
// against the proto files before renumbering.
// for StatsLogReport
const int FIELD_ID_ID = 1;
const int FIELD_ID_VALUE_METRICS = 7;
const int FIELD_ID_TIME_BASE = 9;
const int FIELD_ID_BUCKET_SIZE = 10;
const int FIELD_ID_DIMENSION_PATH_IN_WHAT = 11;
const int FIELD_ID_IS_ACTIVE = 14;
// for ValueMetricDataWrapper
const int FIELD_ID_DATA = 1;
const int FIELD_ID_SKIPPED = 2;
// for SkippedBuckets
const int FIELD_ID_SKIPPED_START_MILLIS = 3;
const int FIELD_ID_SKIPPED_END_MILLIS = 4;
const int FIELD_ID_SKIPPED_DROP_EVENT = 5;
// for DumpEvent Proto
const int FIELD_ID_BUCKET_DROP_REASON = 1;
const int FIELD_ID_DROP_TIME = 2;
// for ValueMetricData
const int FIELD_ID_DIMENSION_IN_WHAT = 1;
const int FIELD_ID_BUCKET_INFO = 3;
const int FIELD_ID_DIMENSION_LEAF_IN_WHAT = 4;
const int FIELD_ID_SLICE_BY_STATE = 6;
// for ValueBucketInfo
const int FIELD_ID_VALUE_INDEX = 1;
const int FIELD_ID_VALUE_LONG = 2;
const int FIELD_ID_VALUE_DOUBLE = 3;
const int FIELD_ID_VALUES = 9;
const int FIELD_ID_BUCKET_NUM = 4;
const int FIELD_ID_START_BUCKET_ELAPSED_MILLIS = 5;
const int FIELD_ID_END_BUCKET_ELAPSED_MILLIS = 6;
const int FIELD_ID_CONDITION_TRUE_NS = 10;
76 
77 const Value ZERO_LONG((int64_t)0);
78 const Value ZERO_DOUBLE((int64_t)0);
79 
// ValueMetric has a minimum bucket size of 10min so that we don't pull too frequently
//
// Constructs the producer, derives the bucket size from the config, translates
// all field matchers, fast-forwards to the bucket containing startTimeNs, and
// (for pulled metrics) registers a receiver for scheduled bucket-boundary pulls.
ValueMetricProducer::ValueMetricProducer(
        const ConfigKey& key, const ValueMetric& metric, const int conditionIndex,
        const vector<ConditionState>& initialConditionCache,
        const sp<ConditionWizard>& conditionWizard, const uint64_t protoHash,
        const int whatMatcherIndex, const sp<EventMatcherWizard>& matcherWizard,
        const int pullTagId, const int64_t timeBaseNs, const int64_t startTimeNs,
        const sp<StatsPullerManager>& pullerManager,
        const unordered_map<int, shared_ptr<Activation>>& eventActivationMap,
        const unordered_map<int, vector<shared_ptr<Activation>>>& eventDeactivationMap,
        const vector<int>& slicedStateAtoms,
        const unordered_map<int, unordered_map<int, int64_t>>& stateGroupMap)
    : MetricProducer(metric.id(), key, timeBaseNs, conditionIndex, initialConditionCache,
                     conditionWizard, protoHash, eventActivationMap, eventDeactivationMap,
                     slicedStateAtoms, stateGroupMap),
      mWhatMatcherIndex(whatMatcherIndex),
      mEventMatcherWizard(matcherWizard),
      mPullerManager(pullerManager),
      mPullTagId(pullTagId),
      // A pull tag of -1 marks a push-based metric.
      mIsPulled(pullTagId != -1),
      mMinBucketSizeNs(metric.min_bucket_size_nanos()),
      // Use the per-atom dimension guardrails when one is registered for this
      // pull tag; otherwise fall back to the global defaults.
      mDimensionSoftLimit(StatsdStats::kAtomDimensionKeySizeLimitMap.find(pullTagId) !=
                                          StatsdStats::kAtomDimensionKeySizeLimitMap.end()
                                  ? StatsdStats::kAtomDimensionKeySizeLimitMap.at(pullTagId).first
                                  : StatsdStats::kDimensionKeySizeSoftLimit),
      mDimensionHardLimit(StatsdStats::kAtomDimensionKeySizeLimitMap.find(pullTagId) !=
                                          StatsdStats::kAtomDimensionKeySizeLimitMap.end()
                                  ? StatsdStats::kAtomDimensionKeySizeLimitMap.at(pullTagId).second
                                  : StatsdStats::kDimensionKeySizeHardLimit),
      mUseAbsoluteValueOnReset(metric.use_absolute_value_on_reset()),
      mAggregationType(metric.aggregation_type()),
      // Diff-ing defaults to on for pulled metrics and off for pushed ones
      // unless the config says otherwise.
      mUseDiff(metric.has_use_diff() ? metric.use_diff() : (mIsPulled ? true : false)),
      mValueDirection(metric.value_direction()),
      mSkipZeroDiffOutput(metric.skip_zero_diff_output()),
      mUseZeroDefaultBase(metric.use_zero_default_base()),
      mHasGlobalBase(false),
      mCurrentBucketIsSkipped(false),
      mMaxPullDelayNs(metric.has_max_pull_delay_sec() ? metric.max_pull_delay_sec() * NS_PER_SEC
                                                      : StatsdStats::kPullMaxDelayNs),
      mSplitBucketForAppUpgrade(metric.split_bucket_for_app_upgrade()),
      // Condition timer will be set later within the constructor after pulling events
      mConditionTimer(false, timeBaseNs) {
    int64_t bucketSizeMills = 0;
    if (metric.has_bucket()) {
        // Guardrailed conversion enforces the minimum bucket size per uid.
        bucketSizeMills = TimeUnitToBucketSizeInMillisGuardrailed(key.GetUid(), metric.bucket());
    } else {
        bucketSizeMills = TimeUnitToBucketSizeInMillis(ONE_HOUR);
    }

    // Milliseconds -> nanoseconds.
    mBucketSizeNs = bucketSizeMills * 1000000;

    translateFieldMatcher(metric.value_field(), &mFieldMatchers);

    if (metric.has_dimensions_in_what()) {
        translateFieldMatcher(metric.dimensions_in_what(), &mDimensionsInWhat);
        mContainANYPositionInDimensionsInWhat = HasPositionANY(metric.dimensions_in_what());
        mSliceByPositionALL = HasPositionALL(metric.dimensions_in_what());
    }

    // Metric-to-condition links make this a sliced-condition metric.
    if (metric.links().size() > 0) {
        for (const auto& link : metric.links()) {
            Metric2Condition mc;
            mc.conditionId = link.condition();
            translateFieldMatcher(link.fields_in_what(), &mc.metricFields);
            translateFieldMatcher(link.fields_in_condition(), &mc.conditionFields);
            mMetric2ConditionLinks.push_back(mc);
        }
        mConditionSliced = true;
    }

    for (const auto& stateLink : metric.state_link()) {
        Metric2State ms;
        ms.stateAtomId = stateLink.state_atom_id();
        translateFieldMatcher(stateLink.fields_in_what(), &ms.metricFields);
        translateFieldMatcher(stateLink.fields_in_state(), &ms.stateFields);
        mMetric2StateLinks.push_back(ms);
    }

    if (metric.has_threshold()) {
        mUploadThreshold = metric.threshold();
    }

    // Fast-forward the bucket counter from the time base to startTimeNs.
    int64_t numBucketsForward = calcBucketsForwardCount(startTimeNs);
    mCurrentBucketNum += numBucketsForward;

    flushIfNeededLocked(startTimeNs);

    if (mIsPulled) {
        mPullerManager->RegisterReceiver(mPullTagId, mConfigKey, this, getCurrentBucketEndTimeNs(),
                                         mBucketSizeNs);
    }

    // Only do this for partial buckets like first bucket. All other buckets should use
    // flushIfNeeded to adjust start and end to bucket boundaries.
    // Adjust start for partial bucket
    mCurrentBucketStartTimeNs = startTimeNs;
    mConditionTimer.newBucketStart(mCurrentBucketStartTimeNs);

    // Now that activations are processed, start the condition timer if needed.
    mConditionTimer.onConditionChanged(mIsActive && mCondition == ConditionState::kTrue,
                                       mCurrentBucketStartTimeNs);

    VLOG("value metric %lld created. bucket size %lld start_time: %lld", (long long)metric.id(),
         (long long)mBucketSizeNs, (long long)mTimeBaseNs);
}
185 
186 ValueMetricProducer::~ValueMetricProducer() {
187     VLOG("~ValueMetricProducer() called");
188     if (mIsPulled) {
189         mPullerManager->UnRegisterReceiver(mPullTagId, mConfigKey, this);
190     }
191 }
192 
// Re-points this producer at the trackers/wizards of an updated config.
// Returns false (rejecting the update) if the base-class update or any of the
// matcher/condition re-binding steps fails.
bool ValueMetricProducer::onConfigUpdatedLocked(
        const StatsdConfig& config, const int configIndex, const int metricIndex,
        const vector<sp<AtomMatchingTracker>>& allAtomMatchingTrackers,
        const unordered_map<int64_t, int>& oldAtomMatchingTrackerMap,
        const unordered_map<int64_t, int>& newAtomMatchingTrackerMap,
        const sp<EventMatcherWizard>& matcherWizard,
        const vector<sp<ConditionTracker>>& allConditionTrackers,
        const unordered_map<int64_t, int>& conditionTrackerMap, const sp<ConditionWizard>& wizard,
        const unordered_map<int64_t, int>& metricToActivationMap,
        unordered_map<int, vector<int>>& trackerToMetricMap,
        unordered_map<int, vector<int>>& conditionToMetricMap,
        unordered_map<int, vector<int>>& activationAtomTrackerToMetricMap,
        unordered_map<int, vector<int>>& deactivationAtomTrackerToMetricMap,
        vector<int>& metricsWithActivation) {
    if (!MetricProducer::onConfigUpdatedLocked(
                config, configIndex, metricIndex, allAtomMatchingTrackers,
                oldAtomMatchingTrackerMap, newAtomMatchingTrackerMap, matcherWizard,
                allConditionTrackers, conditionTrackerMap, wizard, metricToActivationMap,
                trackerToMetricMap, conditionToMetricMap, activationAtomTrackerToMetricMap,
                deactivationAtomTrackerToMetricMap, metricsWithActivation)) {
        return false;
    }

    const ValueMetric& metric = config.value_metric(configIndex);
    // Update appropriate indices: mWhatMatcherIndex, mConditionIndex and MetricsManager maps.
    if (!handleMetricWithAtomMatchingTrackers(metric.what(), metricIndex, /*enforceOneAtom=*/false,
                                              allAtomMatchingTrackers, newAtomMatchingTrackerMap,
                                              trackerToMetricMap, mWhatMatcherIndex)) {
        return false;
    }

    if (metric.has_condition() &&
        !handleMetricWithConditions(metric.condition(), metricIndex, conditionTrackerMap,
                                    metric.links(), allConditionTrackers, mConditionTrackerIndex,
                                    conditionToMetricMap)) {
        return false;
    }
    // NOTE(review): tmpEventWizard appears to hold a reference to the old wizard
    // until this function returns while the member is reassigned — confirm this
    // lifetime guarantee is still needed before removing it.
    sp<EventMatcherWizard> tmpEventWizard = mEventMatcherWizard;
    mEventMatcherWizard = matcherWizard;
    return true;
}
234 
// Reacts to a sliced-state transition: if the change is meaningful (different
// state groups, condition true, metric active, event not late), pulls data so
// the value accumulated so far is attributed to the outgoing state.
void ValueMetricProducer::onStateChanged(int64_t eventTimeNs, int32_t atomId,
                                         const HashableDimensionKey& primaryKey,
                                         const FieldValue& oldState, const FieldValue& newState) {
    VLOG("ValueMetric %lld onStateChanged time %lld, State %d, key %s, %d -> %d",
         (long long)mMetricId, (long long)eventTimeNs, atomId, primaryKey.toString().c_str(),
         oldState.mValue.int_value, newState.mValue.int_value);

    // If old and new states are in the same StateGroup, then we do not need to
    // pull for this state change.
    FieldValue oldStateCopy = oldState;
    FieldValue newStateCopy = newState;
    mapStateValue(atomId, &oldStateCopy);
    mapStateValue(atomId, &newStateCopy);
    if (oldStateCopy == newStateCopy) {
        return;
    }

    // If condition is not true or metric is not active, we do not need to pull
    // for this state change.
    if (mCondition != ConditionState::kTrue || !mIsActive) {
        return;
    }

    bool isEventLate = eventTimeNs < mCurrentBucketStartTimeNs;
    if (isEventLate) {
        VLOG("Skip event due to late arrival: %lld vs %lld", (long long)eventTimeNs,
             (long long)mCurrentBucketStartTimeNs);
        invalidateCurrentBucket(eventTimeNs, BucketDropReason::EVENT_IN_WRONG_BUCKET);
        return;
    }
    // Stash the (atom, primary key) pair so the pull path can attribute the
    // pulled data to this state change; cleared immediately after the pull.
    mStateChangePrimaryKey.first = atomId;
    mStateChangePrimaryKey.second = primaryKey;
    if (mIsPulled) {
        pullAndMatchEventsLocked(eventTimeNs);
    }
    mStateChangePrimaryKey.first = 0;
    mStateChangePrimaryKey.second = DEFAULT_DIMENSION_KEY;
    flushIfNeededLocked(eventTimeNs);
}
274 
// Intentionally a no-op beyond logging: sliced-condition changes are handled
// elsewhere for value metrics (this hook only records that it was invoked).
void ValueMetricProducer::onSlicedConditionMayChangeLocked(bool overallCondition,
                                                           const int64_t eventTime) {
    VLOG("Metric %lld onSlicedConditionMayChange", (long long)mMetricId);
}
279 
// Discards all accumulated report data at dropTimeNs, noting the drop in
// StatsdStats. The current partial bucket is first closed via the normal flush
// path so bucket bookkeeping stays consistent.
void ValueMetricProducer::dropDataLocked(const int64_t dropTimeNs) {
    StatsdStats::getInstance().noteBucketDropped(mMetricId);

    // The current partial bucket is not flushed and does not require a pull,
    // so the data is still valid.
    flushIfNeededLocked(dropTimeNs);
    clearPastBucketsLocked(dropTimeNs);
}
288 
289 void ValueMetricProducer::clearPastBucketsLocked(const int64_t dumpTimeNs) {
290     mPastBuckets.clear();
291     mSkippedBuckets.clear();
292 }
293 
// Serializes this metric's report (past buckets, skipped buckets, dimensions,
// per-bucket values) into protoOutput. When include_current_partial_bucket is
// set, the in-flight bucket is first pulled (latency permitting) and flushed so
// it is included; when erase_data is set, serialized data is cleared afterward.
void ValueMetricProducer::onDumpReportLocked(const int64_t dumpTimeNs,
                                             const bool include_current_partial_bucket,
                                             const bool erase_data,
                                             const DumpLatency dumpLatency,
                                             std::set<string> *str_set,
                                             ProtoOutputStream* protoOutput) {
    VLOG("metric %lld dump report now...", (long long)mMetricId);
    if (include_current_partial_bucket) {
        // For pull metrics, we need to do a pull at bucket boundaries. If we do not do that the
        // current bucket will have incomplete data and the next will have the wrong snapshot to do
        // a diff against. If the condition is false, we are fine since the base data is reset and
        // we are not tracking anything.
        bool pullNeeded = mIsPulled && mCondition == ConditionState::kTrue;
        if (pullNeeded) {
            switch (dumpLatency) {
                case FAST:
                    // No time budget for a pull: drop the partial bucket rather
                    // than reporting incomplete data.
                    invalidateCurrentBucket(dumpTimeNs, BucketDropReason::DUMP_REPORT_REQUESTED);
                    break;
                case NO_TIME_CONSTRAINTS:
                    pullAndMatchEventsLocked(dumpTimeNs);
                    break;
            }
        }
        flushCurrentBucketLocked(dumpTimeNs, dumpTimeNs);
    }
    protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_ID, (long long)mMetricId);
    protoOutput->write(FIELD_TYPE_BOOL | FIELD_ID_IS_ACTIVE, isActiveLocked());

    // Nothing further to report if no buckets completed and none were skipped.
    if (mPastBuckets.empty() && mSkippedBuckets.empty()) {
        return;
    }
    protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_TIME_BASE, (long long)mTimeBaseNs);
    protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_BUCKET_SIZE, (long long)mBucketSizeNs);
    // Fills the dimension path if not slicing by ALL.
    if (!mSliceByPositionALL) {
        if (!mDimensionsInWhat.empty()) {
            uint64_t dimenPathToken =
                    protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_ID_DIMENSION_PATH_IN_WHAT);
            writeDimensionPathToProto(mDimensionsInWhat, protoOutput);
            protoOutput->end(dimenPathToken);
        }
    }

    uint64_t protoToken = protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_ID_VALUE_METRICS);

    // Emit one SkippedBuckets entry per skipped bucket, each with its drop events.
    for (const auto& skippedBucket : mSkippedBuckets) {
        uint64_t wrapperToken =
                protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_SKIPPED);
        protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_SKIPPED_START_MILLIS,
                           (long long)(NanoToMillis(skippedBucket.bucketStartTimeNs)));
        protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_SKIPPED_END_MILLIS,
                           (long long)(NanoToMillis(skippedBucket.bucketEndTimeNs)));
        for (const auto& dropEvent : skippedBucket.dropEvents) {
            uint64_t dropEventToken = protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED |
                                                         FIELD_ID_SKIPPED_DROP_EVENT);
            protoOutput->write(FIELD_TYPE_INT32 | FIELD_ID_BUCKET_DROP_REASON, dropEvent.reason);
            protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_DROP_TIME,
                               (long long)(NanoToMillis(dropEvent.dropTimeNs)));
            protoOutput->end(dropEventToken);
        }
        protoOutput->end(wrapperToken);
    }

    // Emit one ValueMetricData entry per dimension key.
    for (const auto& pair : mPastBuckets) {
        const MetricDimensionKey& dimensionKey = pair.first;
        VLOG("  dimension key %s", dimensionKey.toString().c_str());
        uint64_t wrapperToken =
                protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_DATA);

        // First fill dimension.
        if (mSliceByPositionALL) {
            uint64_t dimensionToken =
                    protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_ID_DIMENSION_IN_WHAT);
            writeDimensionToProto(dimensionKey.getDimensionKeyInWhat(), str_set, protoOutput);
            protoOutput->end(dimensionToken);
        } else {
            writeDimensionLeafNodesToProto(dimensionKey.getDimensionKeyInWhat(),
                                           FIELD_ID_DIMENSION_LEAF_IN_WHAT, str_set, protoOutput);
        }

        // Then fill slice_by_state.
        for (auto state : dimensionKey.getStateValuesKey().getValues()) {
            uint64_t stateToken = protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED |
                                                     FIELD_ID_SLICE_BY_STATE);
            writeStateToProto(state, protoOutput);
            protoOutput->end(stateToken);
        }

        // Then fill bucket_info (ValueBucketInfo).
        for (const auto& bucket : pair.second) {
            uint64_t bucketInfoToken = protoOutput->start(
                    FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_BUCKET_INFO);

            // Full-size buckets are encoded compactly by bucket number;
            // partial buckets carry explicit start/end timestamps.
            if (bucket.mBucketEndNs - bucket.mBucketStartNs != mBucketSizeNs) {
                protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_START_BUCKET_ELAPSED_MILLIS,
                                   (long long)NanoToMillis(bucket.mBucketStartNs));
                protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_END_BUCKET_ELAPSED_MILLIS,
                                   (long long)NanoToMillis(bucket.mBucketEndNs));
            } else {
                protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_BUCKET_NUM,
                                   (long long)(getBucketNumFromEndTimeNs(bucket.mBucketEndNs)));
            }
            // We only write the condition timer value if the metric has a
            // condition and/or is sliced by state.
            // If the metric is sliced by state, the condition timer value is
            // also sliced by state to reflect time spent in that state.
            if (mConditionTrackerIndex >= 0 || !mSlicedStateAtoms.empty()) {
                protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_CONDITION_TRUE_NS,
                                   (long long)bucket.mConditionTrueNs);
            }
            for (int i = 0; i < (int)bucket.valueIndex.size(); i++) {
                int index = bucket.valueIndex[i];
                const Value& value = bucket.values[i];
                uint64_t valueToken = protoOutput->start(
                        FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_VALUES);
                protoOutput->write(FIELD_TYPE_INT32 | FIELD_ID_VALUE_INDEX,
                                   index);
                if (value.getType() == LONG) {
                    protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_VALUE_LONG,
                                       (long long)value.long_value);
                    VLOG("\t bucket [%lld - %lld] value %d: %lld", (long long)bucket.mBucketStartNs,
                         (long long)bucket.mBucketEndNs, index, (long long)value.long_value);
                } else if (value.getType() == DOUBLE) {
                    protoOutput->write(FIELD_TYPE_DOUBLE | FIELD_ID_VALUE_DOUBLE,
                                       value.double_value);
                    VLOG("\t bucket [%lld - %lld] value %d: %.2f", (long long)bucket.mBucketStartNs,
                         (long long)bucket.mBucketEndNs, index, value.double_value);
                } else {
                    VLOG("Wrong value type for ValueMetric output: %d", value.getType());
                }
                protoOutput->end(valueToken);
            }
            protoOutput->end(bucketInfoToken);
        }
        protoOutput->end(wrapperToken);
    }
    protoOutput->end(protoToken);

    VLOG("metric %lld done with dump report...", (long long)mMetricId);
    if (erase_data) {
        mPastBuckets.clear();
        mSkippedBuckets.clear();
    }
}
438 
// Marks the current bucket as skipped (recording the drop reason) while
// keeping the diff bases intact — used when the bases are still trustworthy.
void ValueMetricProducer::invalidateCurrentBucketWithoutResetBase(const int64_t dropTimeNs,
                                                                  const BucketDropReason reason) {
    if (!mCurrentBucketIsSkipped) {
        // Only report to StatsdStats once per invalid bucket.
        StatsdStats::getInstance().noteInvalidatedBucket(mMetricId);
    }

    skipCurrentBucket(dropTimeNs, reason);
}
448 
// Marks the current bucket as skipped AND clears the diff bases — used when
// the data underlying the bases can no longer be trusted (e.g. failed pull).
void ValueMetricProducer::invalidateCurrentBucket(const int64_t dropTimeNs,
                                                  const BucketDropReason reason) {
    invalidateCurrentBucketWithoutResetBase(dropTimeNs, reason);
    resetBase();
}
454 
// Flags the current bucket as skipped and appends a drop event for the report,
// unless the per-bucket cap on recorded drop events has been reached.
void ValueMetricProducer::skipCurrentBucket(const int64_t dropTimeNs,
                                            const BucketDropReason reason) {
    if (!maxDropEventsReached()) {
        mCurrentSkippedBucket.dropEvents.emplace_back(buildDropEvent(dropTimeNs, reason));
    }
    mCurrentBucketIsSkipped = true;
}
462 
463 void ValueMetricProducer::resetBase() {
464     for (auto& slice : mCurrentBaseInfo) {
465         for (auto& baseInfo : slice.second.baseInfos) {
466             baseInfo.hasBase = false;
467         }
468     }
469     mHasGlobalBase = false;
470 }
471 
// Handle active state change. Active state change is treated like a condition change:
// - drop bucket if active state change event arrives too late
// - if condition is true, pull data on active state changes
// - ConditionTimer tracks changes based on AND of condition and active state.
void ValueMetricProducer::onActiveStateChangedLocked(const int64_t& eventTimeNs) {
    bool isEventTooLate  = eventTimeNs < mCurrentBucketStartTimeNs;
    if (isEventTooLate) {
        // Drop bucket because event arrived too late, ie. we are missing data for this bucket.
        StatsdStats::getInstance().noteLateLogEventSkipped(mMetricId);
        invalidateCurrentBucket(eventTimeNs, BucketDropReason::EVENT_IN_WRONG_BUCKET);
    }

    // Call parent method once we've verified the validity of current bucket.
    MetricProducer::onActiveStateChangedLocked(eventTimeNs);

    if (ConditionState::kTrue != mCondition) {
        return;
    }

    // Pull on active state changes.
    if (!isEventTooLate) {
        if (mIsPulled) {
            pullAndMatchEventsLocked(eventTimeNs);
        }
        // When active state changes from true to false, clear diff base but don't
        // reset other counters as we may accumulate more value in the bucket.
        if (mUseDiff && !mIsActive) {
            resetBase();
        }
    }

    flushIfNeededLocked(eventTimeNs);

    // Let condition timer know of new active state.
    mConditionTimer.onConditionChanged(mIsActive, eventTimeNs);

    updateCurrentSlicedBucketConditionTimers(mIsActive, eventTimeNs);
}
510 
// Handles a condition change: validates timing, pulls data when the change
// bridges a true state, resets diff bases on true->false (for diffed metrics),
// then records the new condition and updates the condition timers.
void ValueMetricProducer::onConditionChangedLocked(const bool condition,
                                                   const int64_t eventTimeNs) {
    ConditionState newCondition = condition ? ConditionState::kTrue : ConditionState::kFalse;
    bool isEventTooLate  = eventTimeNs < mCurrentBucketStartTimeNs;

    // If the config is not active, skip the event.
    if (!mIsActive) {
        mCondition = isEventTooLate ? ConditionState::kUnknown : newCondition;
        return;
    }

    // If the event arrived late, mark the bucket as invalid and skip the event.
    if (isEventTooLate) {
        VLOG("Skip event due to late arrival: %lld vs %lld", (long long)eventTimeNs,
             (long long)mCurrentBucketStartTimeNs);
        StatsdStats::getInstance().noteLateLogEventSkipped(mMetricId);
        StatsdStats::getInstance().noteConditionChangeInNextBucket(mMetricId);
        invalidateCurrentBucket(eventTimeNs, BucketDropReason::EVENT_IN_WRONG_BUCKET);
        mCondition = ConditionState::kUnknown;
        mConditionTimer.onConditionChanged(mCondition, eventTimeNs);

        updateCurrentSlicedBucketConditionTimers(mCondition, eventTimeNs);
        return;
    }

    // If the previous condition was unknown, mark the bucket as invalid
    // because the bucket will contain partial data. For example, the condition
    // change might happen close to the end of the bucket and we might miss a
    // lot of data.
    //
    // We still want to pull to set the base.
    if (mCondition == ConditionState::kUnknown) {
        invalidateCurrentBucket(eventTimeNs, BucketDropReason::CONDITION_UNKNOWN);
    }

    // Pull and match for the following condition change cases:
    // unknown/false -> true - condition changed
    // true -> false - condition changed
    // true -> true - old condition was true so we can flush the bucket at the
    // end if needed.
    //
    // We don’t need to pull for unknown -> false or false -> false.
    //
    // onConditionChangedLocked might happen on bucket boundaries if this is
    // called before #onDataPulled.
    if (mIsPulled &&
        (newCondition == ConditionState::kTrue || mCondition == ConditionState::kTrue)) {
        pullAndMatchEventsLocked(eventTimeNs);
    }

    // For metrics that use diff, when condition changes from true to false,
    // clear diff base but don't reset other counts because we may accumulate
    // more value in the bucket.
    if (mUseDiff &&
        (mCondition == ConditionState::kTrue && newCondition == ConditionState::kFalse)) {
        resetBase();
    }

    // Update condition state after pulling.
    mCondition = newCondition;

    flushIfNeededLocked(eventTimeNs);
    mConditionTimer.onConditionChanged(mCondition, eventTimeNs);

    updateCurrentSlicedBucketConditionTimers(mCondition, eventTimeNs);
}
577 
578 void ValueMetricProducer::updateCurrentSlicedBucketConditionTimers(bool newCondition,
579                                                                    int64_t eventTimeNs) {
580     if (mSlicedStateAtoms.empty()) {
581         return;
582     }
583 
584     // Utilize the current state key of each DimensionsInWhat key to determine
585     // which condition timers to update.
586     //
587     // Assumes that the MetricDimensionKey exists in `mCurrentSlicedBucket`.
588     bool inPulledData;
589     for (const auto& [dimensionInWhatKey, dimensionInWhatInfo] : mCurrentBaseInfo) {
590         // If the new condition is true, turn ON the condition timer only if
591         // the DimensionInWhat key was present in the pulled data.
592         inPulledData = dimensionInWhatInfo.hasCurrentState;
593         mCurrentSlicedBucket[MetricDimensionKey(dimensionInWhatKey,
594                                                 dimensionInWhatInfo.currentState)]
595                 .conditionTimer.onConditionChanged(newCondition && inPulledData, eventTimeNs);
596     }
597 }
598 
599 void ValueMetricProducer::prepareFirstBucketLocked() {
600     // Kicks off the puller immediately if condition is true and diff based.
601     if (mIsActive && mIsPulled && mCondition == ConditionState::kTrue && mUseDiff) {
602         pullAndMatchEventsLocked(mCurrentBucketStartTimeNs);
603     }
604 }
605 
606 void ValueMetricProducer::pullAndMatchEventsLocked(const int64_t timestampNs) {
607     vector<std::shared_ptr<LogEvent>> allData;
608     if (!mPullerManager->Pull(mPullTagId, mConfigKey, timestampNs, &allData)) {
609         ALOGE("Stats puller failed for tag: %d at %lld", mPullTagId, (long long)timestampNs);
610         invalidateCurrentBucket(timestampNs, BucketDropReason::PULL_FAILED);
611         return;
612     }
613 
614     accumulateEvents(allData, timestampNs, timestampNs);
615 }
616 
617 int64_t ValueMetricProducer::calcPreviousBucketEndTime(const int64_t currentTimeNs) {
618     return mTimeBaseNs + ((currentTimeNs - mTimeBaseNs) / mBucketSizeNs) * mBucketSizeNs;
619 }
620 
// By design, statsd pulls data at bucket boundaries using AlarmManager. These pulls are likely
// to be delayed. Other events like condition changes or app upgrade which are not based on
// AlarmManager might have arrived earlier and close the bucket.
//
// Receives the result of an asynchronous (scheduled) pull. Mid-bucket arrivals
// are processed at their actual time; boundary pulls are snapped back to the
// previous bucket end so the data lands in the bucket that triggered the pull.
void ValueMetricProducer::onDataPulled(const std::vector<std::shared_ptr<LogEvent>>& allData,
                                       bool pullSuccess, int64_t originalPullTimeNs) {
    std::lock_guard<std::mutex> lock(mMutex);
    // Data only matters while the condition is true; otherwise fall through to
    // the flush check below.
    if (mCondition == ConditionState::kTrue) {
        // If the pull failed, we won't be able to compute a diff.
        if (!pullSuccess) {
            invalidateCurrentBucket(originalPullTimeNs, BucketDropReason::PULL_FAILED);
        } else {
            bool isEventLate = originalPullTimeNs < getCurrentBucketEndTimeNs();
            if (isEventLate) {
                // If the event is late, we are in the middle of a bucket. Just
                // process the data without trying to snap the data to the nearest bucket.
                accumulateEvents(allData, originalPullTimeNs, originalPullTimeNs);
            } else {
                // For scheduled pulled data, the effective event time is snap to the nearest
                // bucket end. In the case of waking up from a deep sleep state, we will
                // attribute to the previous bucket end. If the sleep was long but not very
                // long, we will be in the immediate next bucket. Previous bucket may get a
                // larger number as we pull at a later time than real bucket end.
                //
                // If the sleep was very long, we skip more than one bucket before sleep. In
                // this case, if the diff base will be cleared and this new data will serve as
                // new diff base.
                int64_t bucketEndTime = calcPreviousBucketEndTime(originalPullTimeNs) - 1;
                StatsdStats::getInstance().noteBucketBoundaryDelayNs(
                        mMetricId, originalPullTimeNs - bucketEndTime);
                accumulateEvents(allData, originalPullTimeNs, bucketEndTime);
            }
        }
    }

    // We can probably flush the bucket. Since we used bucketEndTime when calling
    // #onMatchedLogEventInternalLocked, the current bucket will not have been flushed.
    flushIfNeededLocked(originalPullTimeNs);
}
659 
// Processes a batch of pulled events against the current bucket.
// originalPullTimeNs is the time the pull actually happened; eventElapsedTimeNs is the
// (possibly boundary-snapped) timestamp the events are attributed to.
void ValueMetricProducer::accumulateEvents(const std::vector<std::shared_ptr<LogEvent>>& allData,
                                           int64_t originalPullTimeNs, int64_t eventElapsedTimeNs) {
    bool isEventLate = eventElapsedTimeNs < mCurrentBucketStartTimeNs;
    if (isEventLate) {
        // Data attributed to a time before this bucket started cannot be applied safely.
        VLOG("Skip bucket end pull due to late arrival: %lld vs %lld",
             (long long)eventElapsedTimeNs, (long long)mCurrentBucketStartTimeNs);
        StatsdStats::getInstance().noteLateLogEventSkipped(mMetricId);
        invalidateCurrentBucket(eventElapsedTimeNs, BucketDropReason::EVENT_IN_WRONG_BUCKET);
        return;
    }

    const int64_t elapsedRealtimeNs = getElapsedRealtimeNs();
    const int64_t pullDelayNs = elapsedRealtimeNs - originalPullTimeNs;
    StatsdStats::getInstance().notePullDelay(mPullTagId, pullDelayNs);
    if (pullDelayNs > mMaxPullDelayNs) {
        ALOGE("Pull finish too late for atom %d, longer than %lld", mPullTagId,
              (long long)mMaxPullDelayNs);
        StatsdStats::getInstance().notePullExceedMaxDelay(mPullTagId);
        // We are missing one pull from the bucket which means we will not have a complete view of
        // what's going on.
        invalidateCurrentBucket(eventElapsedTimeNs, BucketDropReason::PULL_DELAYED);
        return;
    }

    // Record which dimension-in-what keys appear in this pull; onMatchedLogEventLocked()
    // populates the set, and the loop below consumes it.
    mMatchedMetricDimensionKeys.clear();
    for (const auto& data : allData) {
        // Copy so the shared event's timestamp can be rewritten to the attributed time.
        LogEvent localCopy = data->makeCopy();
        if (mEventMatcherWizard->matchLogEvent(localCopy, mWhatMatcherIndex) ==
            MatchingState::kMatched) {
            localCopy.setElapsedTimestampNs(eventElapsedTimeNs);
            onMatchedLogEventLocked(mWhatMatcherIndex, localCopy);
        }
    }
    // If a key that is:
    // 1. Tracked in mCurrentSlicedBucket and
    // 2. A superset of the current mStateChangePrimaryKey
    // was not found in the new pulled data (i.e. not in mMatchedDimensionInWhatKeys)
    // then we need to reset the base.
    for (auto& [metricDimensionKey, currentValueBucket] : mCurrentSlicedBucket) {
        const auto& whatKey = metricDimensionKey.getDimensionKeyInWhat();
        bool presentInPulledData =
                mMatchedMetricDimensionKeys.find(whatKey) != mMatchedMetricDimensionKeys.end();
        if (!presentInPulledData && whatKey.contains(mStateChangePrimaryKey.second)) {
            // NOTE(review): `it` is dereferenced without an end() check; this relies on every
            // key in mCurrentSlicedBucket having an entry in mCurrentBaseInfo — confirm.
            auto it = mCurrentBaseInfo.find(whatKey);
            for (auto& baseInfo : it->second.baseInfos) {
                baseInfo.hasBase = false;
            }
            // Set to false when DimensionInWhat key is not present in a pull.
            // Used in onMatchedLogEventInternalLocked() to ensure the condition
            // timer is turned on the next pull when data is present.
            it->second.hasCurrentState = false;
            // Turn OFF condition timer for keys not present in pulled data.
            currentValueBucket.conditionTimer.onConditionChanged(false, eventElapsedTimeNs);
        }
    }
    mMatchedMetricDimensionKeys.clear();
    // A timely, successful pull establishes a global base for diffing.
    mHasGlobalBase = true;

    // If we reach the guardrail, we might have dropped some data which means the bucket is
    // incomplete.
    //
    // The base also needs to be reset. If we do not have the full data, we might
    // incorrectly compute the diff when mUseZeroDefaultBase is true since an existing key
    // might be missing from mCurrentSlicedBucket.
    if (hasReachedGuardRailLimit()) {
        invalidateCurrentBucket(eventElapsedTimeNs, BucketDropReason::DIMENSION_GUARDRAIL_REACHED);
        mCurrentSlicedBucket.clear();
    }
}
729 
730 void ValueMetricProducer::dumpStatesLocked(FILE* out, bool verbose) const {
731     if (mCurrentSlicedBucket.size() == 0) {
732         return;
733     }
734 
735     fprintf(out, "ValueMetric %lld dimension size %lu\n", (long long)mMetricId,
736             (unsigned long)mCurrentSlicedBucket.size());
737     if (verbose) {
738         for (const auto& it : mCurrentSlicedBucket) {
739           for (const auto& interval : it.second.intervals) {
740               fprintf(out, "\t(what)%s\t(states)%s  (value)%s\n",
741                       it.first.getDimensionKeyInWhat().toString().c_str(),
742                       it.first.getStateValuesKey().toString().c_str(),
743                       interval.value.toString().c_str());
744           }
745         }
746     }
747 }
748 
749 bool ValueMetricProducer::hasReachedGuardRailLimit() const {
750     return mCurrentSlicedBucket.size() >= mDimensionHardLimit;
751 }
752 
753 bool ValueMetricProducer::hitGuardRailLocked(const MetricDimensionKey& newKey) {
754     // ===========GuardRail==============
755     // 1. Report the tuple count if the tuple count > soft limit
756     if (mCurrentSlicedBucket.find(newKey) != mCurrentSlicedBucket.end()) {
757         return false;
758     }
759     if (mCurrentSlicedBucket.size() > mDimensionSoftLimit - 1) {
760         size_t newTupleCount = mCurrentSlicedBucket.size() + 1;
761         StatsdStats::getInstance().noteMetricDimensionSize(mConfigKey, mMetricId, newTupleCount);
762         // 2. Don't add more tuples, we are above the allowed threshold. Drop the data.
763         if (hasReachedGuardRailLimit()) {
764             ALOGE("ValueMetric %lld dropping data for dimension key %s", (long long)mMetricId,
765                   newKey.toString().c_str());
766             StatsdStats::getInstance().noteHardDimensionLimitReached(mMetricId);
767             return true;
768         }
769     }
770 
771     return false;
772 }
773 
774 bool ValueMetricProducer::hitFullBucketGuardRailLocked(const MetricDimensionKey& newKey) {
775     // ===========GuardRail==============
776     // 1. Report the tuple count if the tuple count > soft limit
777     if (mCurrentFullBucket.find(newKey) != mCurrentFullBucket.end()) {
778         return false;
779     }
780     if (mCurrentFullBucket.size() > mDimensionSoftLimit - 1) {
781         size_t newTupleCount = mCurrentFullBucket.size() + 1;
782         // 2. Don't add more tuples, we are above the allowed threshold. Drop the data.
783         if (newTupleCount > mDimensionHardLimit) {
784             ALOGE("ValueMetric %lld dropping data for full bucket dimension key %s",
785                   (long long)mMetricId,
786                   newKey.toString().c_str());
787             return true;
788         }
789     }
790 
791     return false;
792 }
793 
794 bool getDoubleOrLong(const LogEvent& event, const Matcher& matcher, Value& ret) {
795     for (const FieldValue& value : event.getValues()) {
796         if (value.mField.matches(matcher)) {
797             switch (value.mValue.type) {
798                 case INT:
799                     ret.setLong(value.mValue.int_value);
800                     break;
801                 case LONG:
802                     ret.setLong(value.mValue.long_value);
803                     break;
804                 case FLOAT:
805                     ret.setDouble(value.mValue.float_value);
806                     break;
807                 case DOUBLE:
808                     ret.setDouble(value.mValue.double_value);
809                     break;
810                 default:
811                     return false;
812                     break;
813             }
814             return true;
815         }
816     }
817     return false;
818 }
819 
820 bool ValueMetricProducer::multipleBucketsSkipped(const int64_t numBucketsForward) {
821     // Skip buckets if this is a pulled metric or a pushed metric that is diffed.
822     return numBucketsForward > 1 && (mIsPulled || mUseDiff);
823 }
824 
// Core aggregation path: extracts the configured value fields from a matched event,
// optionally diffs them against the stored base, and folds the results into the
// current bucket's intervals for (whatKey, stateKey).
void ValueMetricProducer::onMatchedLogEventInternalLocked(
        const size_t matcherIndex, const MetricDimensionKey& eventKey,
        const ConditionKey& conditionKey, bool condition, const LogEvent& event,
        const map<int, HashableDimensionKey>& statePrimaryKeys) {
    auto whatKey = eventKey.getDimensionKeyInWhat();
    auto stateKey = eventKey.getStateValuesKey();

    // Skip this event if a state change occurred for a different primary key.
    auto it = statePrimaryKeys.find(mStateChangePrimaryKey.first);
    // Check that both the atom id and the primary key are equal.
    if (it != statePrimaryKeys.end() && it->second != mStateChangePrimaryKey.second) {
        VLOG("ValueMetric skip event with primary key %s because state change primary key "
             "is %s",
             it->second.toString().c_str(), mStateChangePrimaryKey.second.toString().c_str());
        return;
    }

    int64_t eventTimeNs = event.GetElapsedTimestampNs();
    if (eventTimeNs < mCurrentBucketStartTimeNs) {
        // Late events belong to an already-closed bucket; drop them.
        VLOG("Skip event due to late arrival: %lld vs %lld", (long long)eventTimeNs,
             (long long)mCurrentBucketStartTimeNs);
        return;
    }
    // Record this key as seen; accumulateEvents() uses this set to reset bases for
    // keys missing from a pull.
    mMatchedMetricDimensionKeys.insert(whatKey);

    if (!mIsPulled) {
        // We cannot flush without doing a pull first.
        flushIfNeededLocked(eventTimeNs);
    }

    // We should not accumulate the data for pushed metrics when the condition is false.
    bool shouldSkipForPushMetric = !mIsPulled && !condition;
    // For pulled metrics, there are two cases:
    // - to compute diffs, we need to process all the state changes
    // - for non-diffs metrics, we should ignore the data if the condition wasn't true. If we have a
    // state change from
    //     + True -> True: we should process the data, it might be a bucket boundary
    //     + True -> False: we also need to process the data.
    bool shouldSkipForPulledMetric = mIsPulled && !mUseDiff
            && mCondition != ConditionState::kTrue;
    if (shouldSkipForPushMetric || shouldSkipForPulledMetric) {
        VLOG("ValueMetric skip event because condition is false and we are not using diff (for "
             "pulled metric)");
        return;
    }

    if (hitGuardRailLocked(eventKey)) {
        return;
    }

    // Look up (or create) the per-dimension base/state bookkeeping for this whatKey.
    const auto& returnVal =
            mCurrentBaseInfo.emplace(whatKey, DimensionsInWhatInfo(getUnknownStateKey()));
    DimensionsInWhatInfo& dimensionsInWhatInfo = returnVal.first->second;
    const HashableDimensionKey oldStateKey = dimensionsInWhatInfo.currentState;
    vector<BaseInfo>& baseInfos = dimensionsInWhatInfo.baseInfos;
    if (baseInfos.size() < mFieldMatchers.size()) {
        VLOG("Resizing number of intervals to %d", (int)mFieldMatchers.size());
        baseInfos.resize(mFieldMatchers.size());
    }

    // Ensure we turn on the condition timer in the case where dimensions
    // were missing on a previous pull due to a state change.
    bool stateChange = oldStateKey != stateKey;
    if (!dimensionsInWhatInfo.hasCurrentState) {
        stateChange = true;
        dimensionsInWhatInfo.hasCurrentState = true;
    }

    // We need to get the intervals stored with the previous state key so we can
    // close these value intervals.
    vector<Interval>& intervals =
            mCurrentSlicedBucket[MetricDimensionKey(whatKey, oldStateKey)].intervals;
    if (intervals.size() < mFieldMatchers.size()) {
        VLOG("Resizing number of intervals to %d", (int)mFieldMatchers.size());
        intervals.resize(mFieldMatchers.size());
    }

    // We only use anomaly detection under certain cases.
    // N.B.: The anomaly detection cases were modified in order to fix an issue with value metrics
    // containing multiple values. We tried to retain all previous behaviour, but we are unsure the
    // previous behaviour was correct. At the time of the fix, anomaly detection had no owner.
    // Whoever next works on it should look into the cases where it is triggered in this function.
    // Discussion here: http://ag/6124370.
    bool useAnomalyDetection = true;

    // NOTE(review): hasCurrentState was already set above when it was false, so this
    // assignment is redundant but harmless.
    dimensionsInWhatInfo.hasCurrentState = true;
    dimensionsInWhatInfo.currentState = stateKey;
    // Process each configured value field independently.
    for (int i = 0; i < (int)mFieldMatchers.size(); i++) {
        const Matcher& matcher = mFieldMatchers[i];
        BaseInfo& baseInfo = baseInfos[i];
        Interval& interval = intervals[i];
        interval.valueIndex = i;
        Value value;
        if (!getDoubleOrLong(event, matcher, value)) {
            VLOG("Failed to get value %d from event %s", i, event.ToString().c_str());
            StatsdStats::getInstance().noteBadValueType(mMetricId);
            return;
        }
        // Mark the interval as updated so it survives the end-of-bucket cleanup.
        interval.seenNewData = true;

        if (mUseDiff) {
            if (!baseInfo.hasBase) {
                if (mHasGlobalBase && mUseZeroDefaultBase) {
                    // The bucket has global base. This key does not.
                    // Optionally use zero as base.
                    baseInfo.base = (value.type == LONG ? ZERO_LONG : ZERO_DOUBLE);
                    baseInfo.hasBase = true;
                } else {
                    // no base. just update base and return.
                    baseInfo.base = value;
                    baseInfo.hasBase = true;
                    // If we're missing a base, do not use anomaly detection on incomplete data
                    useAnomalyDetection = false;
                    // Continue (instead of return) here in order to set baseInfo.base and
                    // baseInfo.hasBase for other baseInfos
                    continue;
                }
            }

            Value diff;
            switch (mValueDirection) {
                case ValueMetric::INCREASING:
                    if (value >= baseInfo.base) {
                        diff = value - baseInfo.base;
                    } else if (mUseAbsoluteValueOnReset) {
                        // Counter reset: treat the new value as the whole diff.
                        diff = value;
                    } else {
                        VLOG("Unexpected decreasing value");
                        StatsdStats::getInstance().notePullDataError(mPullTagId);
                        baseInfo.base = value;
                        // If we've got bad data, do not use anomaly detection
                        useAnomalyDetection = false;
                        continue;
                    }
                    break;
                case ValueMetric::DECREASING:
                    if (baseInfo.base >= value) {
                        diff = baseInfo.base - value;
                    } else if (mUseAbsoluteValueOnReset) {
                        // Counter reset: treat the new value as the whole diff.
                        diff = value;
                    } else {
                        VLOG("Unexpected increasing value");
                        StatsdStats::getInstance().notePullDataError(mPullTagId);
                        baseInfo.base = value;
                        // If we've got bad data, do not use anomaly detection
                        useAnomalyDetection = false;
                        continue;
                    }
                    break;
                case ValueMetric::ANY:
                    diff = value - baseInfo.base;
                    break;
                default:
                    break;
            }
            baseInfo.base = value;
            value = diff;
        }

        // Fold the (possibly diffed) value into the interval's aggregate.
        if (interval.hasValue) {
            switch (mAggregationType) {
                case ValueMetric::SUM:
                    // for AVG, we add up and take average when flushing the bucket
                case ValueMetric::AVG:
                    interval.value += value;
                    break;
                case ValueMetric::MIN:
                    interval.value = std::min(value, interval.value);
                    break;
                case ValueMetric::MAX:
                    interval.value = std::max(value, interval.value);
                    break;
                default:
                    break;
            }
        } else {
            interval.value = value;
            interval.hasValue = true;
        }
        interval.sampleSize += 1;
    }

    // State change.
    if (!mSlicedStateAtoms.empty() && stateChange) {
        // Turn OFF the condition timer for the previous state key.
        mCurrentSlicedBucket[MetricDimensionKey(whatKey, oldStateKey)]
                .conditionTimer.onConditionChanged(false, eventTimeNs);

        // Turn ON the condition timer for the new state key.
        mCurrentSlicedBucket[MetricDimensionKey(whatKey, stateKey)]
                .conditionTimer.onConditionChanged(true, eventTimeNs);
    }

    // Only trigger the tracker if all intervals are correct and we have not skipped the bucket due
    // to MULTIPLE_BUCKETS_SKIPPED.
    if (useAnomalyDetection && !multipleBucketsSkipped(calcBucketsForwardCount(eventTimeNs))) {
        // TODO: propgate proper values down stream when anomaly support doubles
        long wholeBucketVal = intervals[0].value.long_value;
        auto prev = mCurrentFullBucket.find(eventKey);
        if (prev != mCurrentFullBucket.end()) {
            wholeBucketVal += prev->second;
        }
        for (auto& tracker : mAnomalyTrackers) {
            tracker->detectAndDeclareAnomaly(eventTimeNs, mCurrentBucketNum, mMetricId, eventKey,
                                             wholeBucketVal);
        }
    }
}
1033 
1034 // For pulled metrics, we always need to make sure we do a pull before flushing the bucket
1035 // if mCondition is true!
1036 void ValueMetricProducer::flushIfNeededLocked(const int64_t& eventTimeNs) {
1037     int64_t currentBucketEndTimeNs = getCurrentBucketEndTimeNs();
1038     if (eventTimeNs < currentBucketEndTimeNs) {
1039         VLOG("eventTime is %lld, less than current bucket end time %lld", (long long)eventTimeNs,
1040              (long long)(currentBucketEndTimeNs));
1041         return;
1042     }
1043     int64_t numBucketsForward = calcBucketsForwardCount(eventTimeNs);
1044     int64_t nextBucketStartTimeNs = currentBucketEndTimeNs + (numBucketsForward - 1) * mBucketSizeNs;
1045     flushCurrentBucketLocked(eventTimeNs, nextBucketStartTimeNs);
1046 }
1047 
1048 int64_t ValueMetricProducer::calcBucketsForwardCount(const int64_t& eventTimeNs) const {
1049     int64_t currentBucketEndTimeNs = getCurrentBucketEndTimeNs();
1050     if (eventTimeNs < currentBucketEndTimeNs) {
1051         return 0;
1052     }
1053     return 1 + (eventTimeNs - currentBucketEndTimeNs) / mBucketSizeNs;
1054 }
1055 
// Closes the current bucket at the appropriate end time, emits it to mPastBuckets
// (or records it as skipped), and rolls all state forward to nextBucketStartTimeNs.
void ValueMetricProducer::flushCurrentBucketLocked(const int64_t& eventTimeNs,
                                                   const int64_t& nextBucketStartTimeNs) {
    if (mCondition == ConditionState::kUnknown) {
        StatsdStats::getInstance().noteBucketUnknownCondition(mMetricId);
        // Keep the diff bases: the data itself is fine, only the condition was unknown.
        invalidateCurrentBucketWithoutResetBase(eventTimeNs, BucketDropReason::CONDITION_UNKNOWN);
    }

    VLOG("finalizing bucket for %ld, dumping %d slices", (long)mCurrentBucketStartTimeNs,
         (int)mCurrentSlicedBucket.size());

    int64_t fullBucketEndTimeNs = getCurrentBucketEndTimeNs();
    int64_t bucketEndTime = fullBucketEndTimeNs;
    int64_t numBucketsForward = calcBucketsForwardCount(eventTimeNs);

    if (multipleBucketsSkipped(numBucketsForward)) {
        VLOG("Skipping forward %lld buckets", (long long)numBucketsForward);
        StatsdStats::getInstance().noteSkippedForwardBuckets(mMetricId);
        // Something went wrong. Maybe the device was sleeping for a long time. It is better
        // to mark the current bucket as invalid. The last pull might have been successful though.
        invalidateCurrentBucketWithoutResetBase(eventTimeNs,
                                                BucketDropReason::MULTIPLE_BUCKETS_SKIPPED);
        // End the bucket at the next bucket start time so the entire interval is skipped.
        bucketEndTime = nextBucketStartTimeNs;
    } else if (eventTimeNs < fullBucketEndTimeNs) {
        // Forced partial-bucket split (e.g. app upgrade): end the bucket at the event time.
        bucketEndTime = eventTimeNs;
    }

    // Close the current bucket.
    int64_t conditionTrueDuration = mConditionTimer.newBucketStart(bucketEndTime);
    bool isBucketLargeEnough = bucketEndTime - mCurrentBucketStartTimeNs >= mMinBucketSizeNs;
    if (!isBucketLargeEnough) {
        skipCurrentBucket(eventTimeNs, BucketDropReason::BUCKET_TOO_SMALL);
    }
    if (!mCurrentBucketIsSkipped) {
        bool bucketHasData = false;
        // The current bucket is large enough to keep.
        for (auto& [metricDimensionKey, currentValueBucket] : mCurrentSlicedBucket) {
            PastValueBucket bucket =
                    buildPartialBucket(bucketEndTime, currentValueBucket.intervals);
            if (!mSlicedStateAtoms.empty()) {
                // When slicing by state, each slice tracks its own condition duration.
                bucket.mConditionTrueNs =
                        currentValueBucket.conditionTimer.newBucketStart(bucketEndTime);
            } else {
                bucket.mConditionTrueNs = conditionTrueDuration;
            }
            // it will auto create new vector of ValuebucketInfo if the key is not found.
            if (bucket.valueIndex.size() > 0) {
                auto& bucketList = mPastBuckets[metricDimensionKey];
                bucketList.push_back(bucket);
                bucketHasData = true;
            }
        }
        if (!bucketHasData) {
            skipCurrentBucket(eventTimeNs, BucketDropReason::NO_DATA);
        }
    }

    if (mCurrentBucketIsSkipped) {
        // Record the skipped interval (and its accumulated drop events) for the report.
        mCurrentSkippedBucket.bucketStartTimeNs = mCurrentBucketStartTimeNs;
        mCurrentSkippedBucket.bucketEndTimeNs = bucketEndTime;
        mSkippedBuckets.emplace_back(mCurrentSkippedBucket);
    }

    // This means that the current bucket was not flushed before a forced bucket split.
    // This can happen if an app update or a dump report with include_current_partial_bucket is
    // requested before we get a chance to flush the bucket due to receiving new data, either from
    // the statsd socket or the StatsPullerManager.
    if (bucketEndTime < nextBucketStartTimeNs) {
        SkippedBucket bucketInGap;
        bucketInGap.bucketStartTimeNs = bucketEndTime;
        bucketInGap.bucketEndTimeNs = nextBucketStartTimeNs;
        bucketInGap.dropEvents.emplace_back(
                buildDropEvent(eventTimeNs, BucketDropReason::NO_DATA));
        mSkippedBuckets.emplace_back(bucketInGap);
    }
    appendToFullBucket(eventTimeNs > fullBucketEndTimeNs);
    initCurrentSlicedBucket(nextBucketStartTimeNs);
    // Update the condition timer again, in case we skipped buckets.
    mConditionTimer.newBucketStart(nextBucketStartTimeNs);

    // NOTE: Update the condition timers in `mCurrentSlicedBucket` only when slicing
    // by state. Otherwise, the "global" condition timer will be used.
    if (!mSlicedStateAtoms.empty()) {
        for (auto& [metricDimensionKey, currentValueBucket] : mCurrentSlicedBucket) {
            currentValueBucket.conditionTimer.newBucketStart(nextBucketStartTimeNs);
        }
    }
    mCurrentBucketNum += numBucketsForward;
}
1145 
1146 PastValueBucket ValueMetricProducer::buildPartialBucket(int64_t bucketEndTime,
1147                                                         const std::vector<Interval>& intervals) {
1148     PastValueBucket bucket;
1149     bucket.mBucketStartNs = mCurrentBucketStartTimeNs;
1150     bucket.mBucketEndNs = bucketEndTime;
1151 
1152     // The first value field acts as a "gatekeeper" - if it does not pass the specified threshold,
1153     // then all interval values are discarded for this bucket.
1154     if ((intervals.size() <= 0) || (intervals[0].hasValue && !valuePassesThreshold(intervals[0]))) {
1155         return bucket;
1156     }
1157 
1158     for (const Interval& interval : intervals) {
1159         // Skip the output if the diff is zero
1160         if (!interval.hasValue || (mSkipZeroDiffOutput && mUseDiff && interval.value.isZero())) {
1161             continue;
1162         }
1163 
1164         bucket.valueIndex.push_back(interval.valueIndex);
1165         bucket.values.push_back(getFinalValue(interval));
1166     }
1167     return bucket;
1168 }
1169 
1170 void ValueMetricProducer::initCurrentSlicedBucket(int64_t nextBucketStartTimeNs) {
1171     StatsdStats::getInstance().noteBucketCount(mMetricId);
1172     // Cleanup data structure to aggregate values.
1173     for (auto it = mCurrentSlicedBucket.begin(); it != mCurrentSlicedBucket.end();) {
1174         bool obsolete = true;
1175         for (auto& interval : it->second.intervals) {
1176             interval.hasValue = false;
1177             interval.sampleSize = 0;
1178             if (interval.seenNewData) {
1179                 obsolete = false;
1180             }
1181             interval.seenNewData = false;
1182         }
1183 
1184         if (obsolete && !mSlicedStateAtoms.empty()) {
1185             // When slicing by state, only delete the MetricDimensionKey when the
1186             // state key in the MetricDimensionKey is not the current state key.
1187             const HashableDimensionKey& dimensionInWhatKey = it->first.getDimensionKeyInWhat();
1188             const auto& currentBaseInfoItr = mCurrentBaseInfo.find(dimensionInWhatKey);
1189 
1190             if ((currentBaseInfoItr != mCurrentBaseInfo.end()) &&
1191                 (it->first.getStateValuesKey() == currentBaseInfoItr->second.currentState)) {
1192                 obsolete = false;
1193             }
1194         }
1195         if (obsolete) {
1196             it = mCurrentSlicedBucket.erase(it);
1197         } else {
1198             it++;
1199         }
1200         // TODO(b/157655103): remove mCurrentBaseInfo entries when obsolete
1201     }
1202 
1203     mCurrentBucketIsSkipped = false;
1204     mCurrentSkippedBucket.reset();
1205 
1206     // If we do not have a global base when the condition is true,
1207     // we will have incomplete bucket for the next bucket.
1208     if (mUseDiff && !mHasGlobalBase && mCondition) {
1209         mCurrentBucketIsSkipped = false;
1210     }
1211     mCurrentBucketStartTimeNs = nextBucketStartTimeNs;
1212     VLOG("metric %lld: new bucket start time: %lld", (long long)mMetricId,
1213          (long long)mCurrentBucketStartTimeNs);
1214 }
1215 
// Feeds the just-closed bucket's data to the anomaly trackers. Partial buckets are
// accumulated in mCurrentFullBucket until a full (alarm-aligned) bucket boundary is
// crossed; isFullBucketReached indicates that the flush crossed such a boundary.
void ValueMetricProducer::appendToFullBucket(const bool isFullBucketReached) {
    if (mCurrentBucketIsSkipped) {
        if (isFullBucketReached) {
            // If the bucket is invalid, we ignore the full bucket since it contains invalid data.
            mCurrentFullBucket.clear();
        }
        // Current bucket is invalid, we do not add it to the full bucket.
        return;
    }

    if (isFullBucketReached) {  // If full bucket, send to anomaly tracker.
        // Accumulate partial buckets with current value and then send to anomaly tracker.
        if (mCurrentFullBucket.size() > 0) {
            for (const auto& slice : mCurrentSlicedBucket) {
                if (hitFullBucketGuardRailLocked(slice.first) || slice.second.intervals.empty()) {
                    continue;
                }
                // TODO: fix this when anomaly can accept double values
                auto& interval = slice.second.intervals[0];
                if (interval.hasValue) {
                    mCurrentFullBucket[slice.first] += interval.value.long_value;
                }
            }
            for (const auto& slice : mCurrentFullBucket) {
                for (auto& tracker : mAnomalyTrackers) {
                    if (tracker != nullptr) {
                        tracker->addPastBucket(slice.first, slice.second, mCurrentBucketNum);
                    }
                }
            }
            mCurrentFullBucket.clear();
        } else {
            // Skip aggregating the partial buckets since there's no previous partial bucket.
            for (const auto& slice : mCurrentSlicedBucket) {
                for (auto& tracker : mAnomalyTrackers) {
                    if (tracker != nullptr && !slice.second.intervals.empty()) {
                        // TODO: fix this when anomaly can accept double values
                        auto& interval = slice.second.intervals[0];
                        if (interval.hasValue) {
                            tracker->addPastBucket(slice.first, interval.value.long_value,
                                                   mCurrentBucketNum);
                        }
                    }
                }
            }
        }
    } else {
        // Accumulate partial bucket.
        for (const auto& slice : mCurrentSlicedBucket) {
            if (!slice.second.intervals.empty()) {
                // TODO: fix this when anomaly can accept double values
                auto& interval = slice.second.intervals[0];
                if (interval.hasValue) {
                    mCurrentFullBucket[slice.first] += interval.value.long_value;
                }
            }
        }
    }
}
1275 
1276 size_t ValueMetricProducer::byteSizeLocked() const {
1277     size_t totalSize = 0;
1278     for (const auto& pair : mPastBuckets) {
1279         totalSize += pair.second.size() * kBucketSize;
1280     }
1281     return totalSize;
1282 }
1283 
1284 bool ValueMetricProducer::valuePassesThreshold(const Interval& interval) {
1285     if (mUploadThreshold == nullopt) {
1286         return true;
1287     }
1288 
1289     Value finalValue = getFinalValue(interval);
1290 
1291     double doubleValue =
1292             finalValue.type == LONG ? (double)finalValue.long_value : finalValue.double_value;
1293     switch (mUploadThreshold->value_comparison_case()) {
1294         case UploadThreshold::kLtInt:
1295             return doubleValue < (double)mUploadThreshold->lt_int();
1296         case UploadThreshold::kGtInt:
1297             return doubleValue > (double)mUploadThreshold->gt_int();
1298         case UploadThreshold::kLteInt:
1299             return doubleValue <= (double)mUploadThreshold->lte_int();
1300         case UploadThreshold::kGteInt:
1301             return doubleValue >= (double)mUploadThreshold->gte_int();
1302         case UploadThreshold::kLtFloat:
1303             return doubleValue <= (double)mUploadThreshold->lt_float();
1304         case UploadThreshold::kGtFloat:
1305             return doubleValue >= (double)mUploadThreshold->gt_float();
1306         default:
1307             ALOGE("Value metric no upload threshold type used");
1308             return false;
1309     }
1310 }
1311 
1312 Value ValueMetricProducer::getFinalValue(const Interval& interval) {
1313     if (mAggregationType != ValueMetric::AVG) {
1314         return interval.value;
1315     } else {
1316         double sum = interval.value.type == LONG ? (double)interval.value.long_value
1317                                                  : interval.value.double_value;
1318         return Value((double)sum / interval.sampleSize);
1319     }
1320 }
1321 
1322 }  // namespace statsd
1323 }  // namespace os
1324 }  // namespace android
1325