1 /*
2  * Copyright (C) 2017 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #define DEBUG false  // STOPSHIP if true
18 #include "Log.h"
19 
20 #include "ValueMetricProducer.h"
21 #include "../guardrail/StatsdStats.h"
22 #include "../stats_log_util.h"
23 
24 #include <limits.h>
25 #include <stdlib.h>
26 
27 using android::util::FIELD_COUNT_REPEATED;
28 using android::util::FIELD_TYPE_BOOL;
29 using android::util::FIELD_TYPE_DOUBLE;
30 using android::util::FIELD_TYPE_INT32;
31 using android::util::FIELD_TYPE_INT64;
32 using android::util::FIELD_TYPE_MESSAGE;
33 using android::util::FIELD_TYPE_STRING;
34 using android::util::ProtoOutputStream;
35 using std::map;
36 using std::shared_ptr;
37 using std::unordered_map;
38 
39 namespace android {
40 namespace os {
41 namespace statsd {
42 
// Proto field numbers below must stay in sync with the StatsLog proto definitions.
// for StatsLogReport
const int FIELD_ID_ID = 1;
const int FIELD_ID_VALUE_METRICS = 7;
const int FIELD_ID_TIME_BASE = 9;
const int FIELD_ID_BUCKET_SIZE = 10;
const int FIELD_ID_DIMENSION_PATH_IN_WHAT = 11;
const int FIELD_ID_IS_ACTIVE = 14;
// for ValueMetricDataWrapper
const int FIELD_ID_DATA = 1;
const int FIELD_ID_SKIPPED = 2;
// for SkippedBuckets
const int FIELD_ID_SKIPPED_START_MILLIS = 3;
const int FIELD_ID_SKIPPED_END_MILLIS = 4;
const int FIELD_ID_SKIPPED_DROP_EVENT = 5;
// for DumpEvent Proto
const int FIELD_ID_BUCKET_DROP_REASON = 1;
const int FIELD_ID_DROP_TIME = 2;
// for ValueMetricData
const int FIELD_ID_DIMENSION_IN_WHAT = 1;
const int FIELD_ID_BUCKET_INFO = 3;
const int FIELD_ID_DIMENSION_LEAF_IN_WHAT = 4;
const int FIELD_ID_SLICE_BY_STATE = 6;
// for ValueBucketInfo
const int FIELD_ID_VALUE_INDEX = 1;
const int FIELD_ID_VALUE_LONG = 2;
const int FIELD_ID_VALUE_DOUBLE = 3;
const int FIELD_ID_VALUES = 9;
const int FIELD_ID_BUCKET_NUM = 4;
const int FIELD_ID_START_BUCKET_ELAPSED_MILLIS = 5;
const int FIELD_ID_END_BUCKET_ELAPSED_MILLIS = 6;
const int FIELD_ID_CONDITION_TRUE_NS = 10;

const Value ZERO_LONG((int64_t)0);
// NOTE(review): initialized from an int64 rather than a double literal, which likely
// makes this a LONG-typed Value despite the name — confirm against Value's ctor overloads.
const Value ZERO_DOUBLE((int64_t)0);
77 
// ValueMetric has a minimum bucket size of 10min so that we don't pull too frequently
ValueMetricProducer::ValueMetricProducer(
        const ConfigKey& key, const ValueMetric& metric, const int conditionIndex,
        const vector<ConditionState>& initialConditionCache,
        const sp<ConditionWizard>& conditionWizard, const int whatMatcherIndex,
        const sp<EventMatcherWizard>& matcherWizard, const int pullTagId, const int64_t timeBaseNs,
        const int64_t startTimeNs, const sp<StatsPullerManager>& pullerManager,
        const unordered_map<int, shared_ptr<Activation>>& eventActivationMap,
        const unordered_map<int, vector<shared_ptr<Activation>>>& eventDeactivationMap,
        const vector<int>& slicedStateAtoms,
        const unordered_map<int, unordered_map<int, int64_t>>& stateGroupMap)
    : MetricProducer(metric.id(), key, timeBaseNs, conditionIndex, initialConditionCache,
                     conditionWizard, eventActivationMap, eventDeactivationMap, slicedStateAtoms,
                     stateGroupMap),
      mWhatMatcherIndex(whatMatcherIndex),
      mEventMatcherWizard(matcherWizard),
      mPullerManager(pullerManager),
      mPullTagId(pullTagId),
      // A pull tag of -1 marks this as a pushed metric.
      mIsPulled(pullTagId != -1),
      mMinBucketSizeNs(metric.min_bucket_size_nanos()),
      // Dimension-cardinality guardrails: use the per-atom override when one exists,
      // otherwise fall back to the global soft/hard limits.
      mDimensionSoftLimit(StatsdStats::kAtomDimensionKeySizeLimitMap.find(pullTagId) !=
                                          StatsdStats::kAtomDimensionKeySizeLimitMap.end()
                                  ? StatsdStats::kAtomDimensionKeySizeLimitMap.at(pullTagId).first
                                  : StatsdStats::kDimensionKeySizeSoftLimit),
      mDimensionHardLimit(StatsdStats::kAtomDimensionKeySizeLimitMap.find(pullTagId) !=
                                          StatsdStats::kAtomDimensionKeySizeLimitMap.end()
                                  ? StatsdStats::kAtomDimensionKeySizeLimitMap.at(pullTagId).second
                                  : StatsdStats::kDimensionKeySizeHardLimit),
      mUseAbsoluteValueOnReset(metric.use_absolute_value_on_reset()),
      mAggregationType(metric.aggregation_type()),
      // Pulled metrics default to diffing consecutive pulls; pushed metrics default to raw values.
      mUseDiff(metric.has_use_diff() ? metric.use_diff() : (mIsPulled ? true : false)),
      mValueDirection(metric.value_direction()),
      mSkipZeroDiffOutput(metric.skip_zero_diff_output()),
      mUseZeroDefaultBase(metric.use_zero_default_base()),
      mHasGlobalBase(false),
      mCurrentBucketIsSkipped(false),
      mMaxPullDelayNs(metric.max_pull_delay_sec() > 0 ? metric.max_pull_delay_sec() * NS_PER_SEC
                                                      : StatsdStats::kPullMaxDelayNs),
      mSplitBucketForAppUpgrade(metric.split_bucket_for_app_upgrade()),
      // Condition timer will be set later within the constructor after pulling events
      mConditionTimer(false, timeBaseNs) {
    // Resolve the bucket size, applying the uid-based guardrail; default is one hour.
    int64_t bucketSizeMills = 0;
    if (metric.has_bucket()) {
        bucketSizeMills = TimeUnitToBucketSizeInMillisGuardrailed(key.GetUid(), metric.bucket());
    } else {
        bucketSizeMills = TimeUnitToBucketSizeInMillis(ONE_HOUR);
    }

    // Milliseconds -> nanoseconds.
    mBucketSizeNs = bucketSizeMills * 1000000;

    // Fields of the atom that hold the values to aggregate.
    translateFieldMatcher(metric.value_field(), &mFieldMatchers);

    if (metric.has_dimensions_in_what()) {
        translateFieldMatcher(metric.dimensions_in_what(), &mDimensionsInWhat);
        mContainANYPositionInDimensionsInWhat = HasPositionANY(metric.dimensions_in_what());
        mSliceByPositionALL = HasPositionALL(metric.dimensions_in_what());
    }

    // Metric-to-condition links make the condition sliced per dimension.
    if (metric.links().size() > 0) {
        for (const auto& link : metric.links()) {
            Metric2Condition mc;
            mc.conditionId = link.condition();
            translateFieldMatcher(link.fields_in_what(), &mc.metricFields);
            translateFieldMatcher(link.fields_in_condition(), &mc.conditionFields);
            mMetric2ConditionLinks.push_back(mc);
        }
        mConditionSliced = true;
    }

    // Links between the metric's dimensions and state atoms, for slice_by_state.
    for (const auto& stateLink : metric.state_link()) {
        Metric2State ms;
        ms.stateAtomId = stateLink.state_atom_id();
        translateFieldMatcher(stateLink.fields_in_what(), &ms.metricFields);
        translateFieldMatcher(stateLink.fields_in_state(), &ms.stateFields);
        mMetric2StateLinks.push_back(ms);
    }

    // Advance to the bucket containing startTimeNs before the first flush.
    int64_t numBucketsForward = calcBucketsForwardCount(startTimeNs);
    mCurrentBucketNum += numBucketsForward;

    flushIfNeededLocked(startTimeNs);

    if (mIsPulled) {
        mPullerManager->RegisterReceiver(mPullTagId, mConfigKey, this, getCurrentBucketEndTimeNs(),
                                         mBucketSizeNs);
    }

    // Only do this for partial buckets like first bucket. All other buckets should use
    // flushIfNeeded to adjust start and end to bucket boundaries.
    // Adjust start for partial bucket
    mCurrentBucketStartTimeNs = startTimeNs;
    mConditionTimer.newBucketStart(mCurrentBucketStartTimeNs);

    // Now that activations are processed, start the condition timer if needed.
    mConditionTimer.onConditionChanged(mIsActive && mCondition == ConditionState::kTrue,
                                       mCurrentBucketStartTimeNs);

    VLOG("value metric %lld created. bucket size %lld start_time: %lld", (long long)metric.id(),
         (long long)mBucketSizeNs, (long long)mTimeBaseNs);
}
178 
~ValueMetricProducer()179 ValueMetricProducer::~ValueMetricProducer() {
180     VLOG("~ValueMetricProducer() called");
181     if (mIsPulled) {
182         mPullerManager->UnRegisterReceiver(mPullTagId, mConfigKey, this);
183     }
184 }
185 
onStateChanged(int64_t eventTimeNs,int32_t atomId,const HashableDimensionKey & primaryKey,const FieldValue & oldState,const FieldValue & newState)186 void ValueMetricProducer::onStateChanged(int64_t eventTimeNs, int32_t atomId,
187                                          const HashableDimensionKey& primaryKey,
188                                          const FieldValue& oldState, const FieldValue& newState) {
189     VLOG("ValueMetric %lld onStateChanged time %lld, State %d, key %s, %d -> %d",
190          (long long)mMetricId, (long long)eventTimeNs, atomId, primaryKey.toString().c_str(),
191          oldState.mValue.int_value, newState.mValue.int_value);
192 
193     // If old and new states are in the same StateGroup, then we do not need to
194     // pull for this state change.
195     FieldValue oldStateCopy = oldState;
196     FieldValue newStateCopy = newState;
197     mapStateValue(atomId, &oldStateCopy);
198     mapStateValue(atomId, &newStateCopy);
199     if (oldStateCopy == newStateCopy) {
200         return;
201     }
202 
203     // If condition is not true or metric is not active, we do not need to pull
204     // for this state change.
205     if (mCondition != ConditionState::kTrue || !mIsActive) {
206         return;
207     }
208 
209     bool isEventLate = eventTimeNs < mCurrentBucketStartTimeNs;
210     if (isEventLate) {
211         VLOG("Skip event due to late arrival: %lld vs %lld", (long long)eventTimeNs,
212              (long long)mCurrentBucketStartTimeNs);
213         invalidateCurrentBucket(eventTimeNs, BucketDropReason::EVENT_IN_WRONG_BUCKET);
214         return;
215     }
216     mStateChangePrimaryKey.first = atomId;
217     mStateChangePrimaryKey.second = primaryKey;
218     if (mIsPulled) {
219         pullAndMatchEventsLocked(eventTimeNs);
220     }
221     mStateChangePrimaryKey.first = 0;
222     mStateChangePrimaryKey.second = DEFAULT_DIMENSION_KEY;
223     flushIfNeededLocked(eventTimeNs);
224 }
225 
// Sliced-condition change notifications are not acted upon by ValueMetricProducer;
// this override only logs the event.
void ValueMetricProducer::onSlicedConditionMayChangeLocked(bool overallCondition,
                                                           const int64_t eventTime) {
    VLOG("Metric %lld onSlicedConditionMayChange", (long long)mMetricId);
}
230 
// Discards all accumulated data without reporting it. Flushes first so the current
// partial bucket is closed consistently, then clears everything.
void ValueMetricProducer::dropDataLocked(const int64_t dropTimeNs) {
    StatsdStats::getInstance().noteBucketDropped(mMetricId);

    // The current partial bucket is not flushed and does not require a pull,
    // so the data is still valid.
    flushIfNeededLocked(dropTimeNs);
    clearPastBucketsLocked(dropTimeNs);
}
239 
// Clears all finished buckets and skipped-bucket records. dumpTimeNs is unused here
// (kept for interface consistency with other producers).
void ValueMetricProducer::clearPastBucketsLocked(const int64_t dumpTimeNs) {
    mPastBuckets.clear();
    mSkippedBuckets.clear();
}
244 
onDumpReportLocked(const int64_t dumpTimeNs,const bool include_current_partial_bucket,const bool erase_data,const DumpLatency dumpLatency,std::set<string> * str_set,ProtoOutputStream * protoOutput)245 void ValueMetricProducer::onDumpReportLocked(const int64_t dumpTimeNs,
246                                              const bool include_current_partial_bucket,
247                                              const bool erase_data,
248                                              const DumpLatency dumpLatency,
249                                              std::set<string> *str_set,
250                                              ProtoOutputStream* protoOutput) {
251     VLOG("metric %lld dump report now...", (long long)mMetricId);
252     if (include_current_partial_bucket) {
253         // For pull metrics, we need to do a pull at bucket boundaries. If we do not do that the
254         // current bucket will have incomplete data and the next will have the wrong snapshot to do
255         // a diff against. If the condition is false, we are fine since the base data is reset and
256         // we are not tracking anything.
257         bool pullNeeded = mIsPulled && mCondition == ConditionState::kTrue;
258         if (pullNeeded) {
259             switch (dumpLatency) {
260                 case FAST:
261                     invalidateCurrentBucket(dumpTimeNs, BucketDropReason::DUMP_REPORT_REQUESTED);
262                     break;
263                 case NO_TIME_CONSTRAINTS:
264                     pullAndMatchEventsLocked(dumpTimeNs);
265                     break;
266             }
267         }
268         flushCurrentBucketLocked(dumpTimeNs, dumpTimeNs);
269     }
270     protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_ID, (long long)mMetricId);
271     protoOutput->write(FIELD_TYPE_BOOL | FIELD_ID_IS_ACTIVE, isActiveLocked());
272 
273     if (mPastBuckets.empty() && mSkippedBuckets.empty()) {
274         return;
275     }
276     protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_TIME_BASE, (long long)mTimeBaseNs);
277     protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_BUCKET_SIZE, (long long)mBucketSizeNs);
278     // Fills the dimension path if not slicing by ALL.
279     if (!mSliceByPositionALL) {
280         if (!mDimensionsInWhat.empty()) {
281             uint64_t dimenPathToken =
282                     protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_ID_DIMENSION_PATH_IN_WHAT);
283             writeDimensionPathToProto(mDimensionsInWhat, protoOutput);
284             protoOutput->end(dimenPathToken);
285         }
286     }
287 
288     uint64_t protoToken = protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_ID_VALUE_METRICS);
289 
290     for (const auto& skippedBucket : mSkippedBuckets) {
291         uint64_t wrapperToken =
292                 protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_SKIPPED);
293         protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_SKIPPED_START_MILLIS,
294                            (long long)(NanoToMillis(skippedBucket.bucketStartTimeNs)));
295         protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_SKIPPED_END_MILLIS,
296                            (long long)(NanoToMillis(skippedBucket.bucketEndTimeNs)));
297         for (const auto& dropEvent : skippedBucket.dropEvents) {
298             uint64_t dropEventToken = protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED |
299                                                          FIELD_ID_SKIPPED_DROP_EVENT);
300             protoOutput->write(FIELD_TYPE_INT32 | FIELD_ID_BUCKET_DROP_REASON, dropEvent.reason);
301             protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_DROP_TIME,
302                                (long long)(NanoToMillis(dropEvent.dropTimeNs)));
303             ;
304             protoOutput->end(dropEventToken);
305         }
306         protoOutput->end(wrapperToken);
307     }
308 
309     for (const auto& pair : mPastBuckets) {
310         const MetricDimensionKey& dimensionKey = pair.first;
311         VLOG("  dimension key %s", dimensionKey.toString().c_str());
312         uint64_t wrapperToken =
313                 protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_DATA);
314 
315         // First fill dimension.
316         if (mSliceByPositionALL) {
317             uint64_t dimensionToken =
318                     protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_ID_DIMENSION_IN_WHAT);
319             writeDimensionToProto(dimensionKey.getDimensionKeyInWhat(), str_set, protoOutput);
320             protoOutput->end(dimensionToken);
321         } else {
322             writeDimensionLeafNodesToProto(dimensionKey.getDimensionKeyInWhat(),
323                                            FIELD_ID_DIMENSION_LEAF_IN_WHAT, str_set, protoOutput);
324         }
325 
326         // Then fill slice_by_state.
327         for (auto state : dimensionKey.getStateValuesKey().getValues()) {
328             uint64_t stateToken = protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED |
329                                                      FIELD_ID_SLICE_BY_STATE);
330             writeStateToProto(state, protoOutput);
331             protoOutput->end(stateToken);
332         }
333 
334         // Then fill bucket_info (ValueBucketInfo).
335         for (const auto& bucket : pair.second) {
336             uint64_t bucketInfoToken = protoOutput->start(
337                     FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_BUCKET_INFO);
338 
339             if (bucket.mBucketEndNs - bucket.mBucketStartNs != mBucketSizeNs) {
340                 protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_START_BUCKET_ELAPSED_MILLIS,
341                                    (long long)NanoToMillis(bucket.mBucketStartNs));
342                 protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_END_BUCKET_ELAPSED_MILLIS,
343                                    (long long)NanoToMillis(bucket.mBucketEndNs));
344             } else {
345                 protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_BUCKET_NUM,
346                                    (long long)(getBucketNumFromEndTimeNs(bucket.mBucketEndNs)));
347             }
348             // only write the condition timer value if the metric has a condition.
349             if (mConditionTrackerIndex >= 0) {
350                 protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_CONDITION_TRUE_NS,
351                                    (long long)bucket.mConditionTrueNs);
352             }
353             for (int i = 0; i < (int)bucket.valueIndex.size(); i++) {
354                 int index = bucket.valueIndex[i];
355                 const Value& value = bucket.values[i];
356                 uint64_t valueToken = protoOutput->start(
357                         FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_VALUES);
358                 protoOutput->write(FIELD_TYPE_INT32 | FIELD_ID_VALUE_INDEX,
359                                    index);
360                 if (value.getType() == LONG) {
361                     protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_VALUE_LONG,
362                                        (long long)value.long_value);
363                     VLOG("\t bucket [%lld - %lld] value %d: %lld", (long long)bucket.mBucketStartNs,
364                          (long long)bucket.mBucketEndNs, index, (long long)value.long_value);
365                 } else if (value.getType() == DOUBLE) {
366                     protoOutput->write(FIELD_TYPE_DOUBLE | FIELD_ID_VALUE_DOUBLE,
367                                        value.double_value);
368                     VLOG("\t bucket [%lld - %lld] value %d: %.2f", (long long)bucket.mBucketStartNs,
369                          (long long)bucket.mBucketEndNs, index, value.double_value);
370                 } else {
371                     VLOG("Wrong value type for ValueMetric output: %d", value.getType());
372                 }
373                 protoOutput->end(valueToken);
374             }
375             protoOutput->end(bucketInfoToken);
376         }
377         protoOutput->end(wrapperToken);
378     }
379     protoOutput->end(protoToken);
380 
381     VLOG("metric %lld dump report now...", (long long)mMetricId);
382     if (erase_data) {
383         mPastBuckets.clear();
384         mSkippedBuckets.clear();
385     }
386 }
387 
// Marks the current bucket as skipped (it will be reported with the drop reason)
// while leaving the diff base intact, so the next pull can still be diffed.
void ValueMetricProducer::invalidateCurrentBucketWithoutResetBase(const int64_t dropTimeNs,
                                                                  const BucketDropReason reason) {
    if (!mCurrentBucketIsSkipped) {
        // Only report to StatsdStats once per invalid bucket.
        StatsdStats::getInstance().noteInvalidatedBucket(mMetricId);
    }

    skipCurrentBucket(dropTimeNs, reason);
}
397 
// Invalidates the current bucket AND clears the diff base: no diffs can be computed
// until a subsequent successful pull re-establishes the base.
void ValueMetricProducer::invalidateCurrentBucket(const int64_t dropTimeNs,
                                                  const BucketDropReason reason) {
    invalidateCurrentBucketWithoutResetBase(dropTimeNs, reason);
    resetBase();
}
403 
// Records a drop event (capped by maxDropEventsReached) and flags the current bucket
// as skipped so it is excluded from normal reporting.
void ValueMetricProducer::skipCurrentBucket(const int64_t dropTimeNs,
                                            const BucketDropReason reason) {
    if (!maxDropEventsReached()) {
        mCurrentSkippedBucket.dropEvents.emplace_back(buildDropEvent(dropTimeNs, reason));
    }
    mCurrentBucketIsSkipped = true;
}
411 
resetBase()412 void ValueMetricProducer::resetBase() {
413     for (auto& slice : mCurrentBaseInfo) {
414         for (auto& baseInfo : slice.second) {
415             baseInfo.hasBase = false;
416         }
417     }
418     mHasGlobalBase = false;
419 }
420 
// Handle active state change. Active state change is treated like a condition change:
// - drop bucket if active state change event arrives too late
// - if condition is true, pull data on active state changes
// - ConditionTimer tracks changes based on AND of condition and active state.
void ValueMetricProducer::onActiveStateChangedLocked(const int64_t& eventTimeNs) {
    bool isEventTooLate  = eventTimeNs < mCurrentBucketStartTimeNs;
    if (isEventTooLate) {
        // Drop bucket because event arrived too late, ie. we are missing data for this bucket.
        StatsdStats::getInstance().noteLateLogEventSkipped(mMetricId);
        invalidateCurrentBucket(eventTimeNs, BucketDropReason::EVENT_IN_WRONG_BUCKET);
    }

    // Call parent method once we've verified the validity of current bucket.
    // This updates mIsActive, which the logic below depends on.
    MetricProducer::onActiveStateChangedLocked(eventTimeNs);

    if (ConditionState::kTrue != mCondition) {
        return;
    }

    // Pull on active state changes.
    if (!isEventTooLate) {
        if (mIsPulled) {
            pullAndMatchEventsLocked(eventTimeNs);
        }
        // When active state changes from true to false, clear diff base but don't
        // reset other counters as we may accumulate more value in the bucket.
        if (mUseDiff && !mIsActive) {
            resetBase();
        }
    }

    flushIfNeededLocked(eventTimeNs);

    // Let condition timer know of new active state.
    mConditionTimer.onConditionChanged(mIsActive, eventTimeNs);
}
457 
// Handles a (non-sliced) condition change: validates event timing, pulls data when the
// transition requires it, manages the diff base, and updates the condition timer.
void ValueMetricProducer::onConditionChangedLocked(const bool condition,
                                                   const int64_t eventTimeNs) {
    ConditionState newCondition = condition ? ConditionState::kTrue : ConditionState::kFalse;
    bool isEventTooLate  = eventTimeNs < mCurrentBucketStartTimeNs;

    // If the config is not active, skip the event.
    if (!mIsActive) {
        mCondition = isEventTooLate ? ConditionState::kUnknown : newCondition;
        return;
    }

    // If the event arrived late, mark the bucket as invalid and skip the event.
    if (isEventTooLate) {
        VLOG("Skip event due to late arrival: %lld vs %lld", (long long)eventTimeNs,
             (long long)mCurrentBucketStartTimeNs);
        StatsdStats::getInstance().noteLateLogEventSkipped(mMetricId);
        StatsdStats::getInstance().noteConditionChangeInNextBucket(mMetricId);
        invalidateCurrentBucket(eventTimeNs, BucketDropReason::EVENT_IN_WRONG_BUCKET);
        mCondition = ConditionState::kUnknown;
        mConditionTimer.onConditionChanged(mCondition, eventTimeNs);
        return;
    }

    // If the previous condition was unknown, mark the bucket as invalid
    // because the bucket will contain partial data. For example, the condition
    // change might happen close to the end of the bucket and we might miss a
    // lot of data.
    //
    // We still want to pull to set the base.
    if (mCondition == ConditionState::kUnknown) {
        invalidateCurrentBucket(eventTimeNs, BucketDropReason::CONDITION_UNKNOWN);
    }

    // Pull and match for the following condition change cases:
    // unknown/false -> true - condition changed
    // true -> false - condition changed
    // true -> true - old condition was true so we can flush the bucket at the
    // end if needed.
    //
    // We don’t need to pull for unknown -> false or false -> false.
    //
    // onConditionChangedLocked might happen on bucket boundaries if this is
    // called before #onDataPulled.
    if (mIsPulled &&
        (newCondition == ConditionState::kTrue || mCondition == ConditionState::kTrue)) {
        pullAndMatchEventsLocked(eventTimeNs);
    }

    // For metrics that use diff, when condition changes from true to false,
    // clear diff base but don't reset other counts because we may accumulate
    // more value in the bucket.
    if (mUseDiff &&
        (mCondition == ConditionState::kTrue && newCondition == ConditionState::kFalse)) {
        resetBase();
    }

    // Update condition state after pulling.
    mCondition = newCondition;

    flushIfNeededLocked(eventTimeNs);
    mConditionTimer.onConditionChanged(mCondition, eventTimeNs);
}
520 
prepareFirstBucketLocked()521 void ValueMetricProducer::prepareFirstBucketLocked() {
522     // Kicks off the puller immediately if condition is true and diff based.
523     if (mIsActive && mIsPulled && mCondition == ConditionState::kTrue && mUseDiff) {
524         pullAndMatchEventsLocked(mCurrentBucketStartTimeNs);
525     }
526 }
527 
pullAndMatchEventsLocked(const int64_t timestampNs)528 void ValueMetricProducer::pullAndMatchEventsLocked(const int64_t timestampNs) {
529     vector<std::shared_ptr<LogEvent>> allData;
530     if (!mPullerManager->Pull(mPullTagId, mConfigKey, timestampNs, &allData)) {
531         ALOGE("Stats puller failed for tag: %d at %lld", mPullTagId, (long long)timestampNs);
532         invalidateCurrentBucket(timestampNs, BucketDropReason::PULL_FAILED);
533         return;
534     }
535 
536     accumulateEvents(allData, timestampNs, timestampNs);
537 }
538 
calcPreviousBucketEndTime(const int64_t currentTimeNs)539 int64_t ValueMetricProducer::calcPreviousBucketEndTime(const int64_t currentTimeNs) {
540     return mTimeBaseNs + ((currentTimeNs - mTimeBaseNs) / mBucketSizeNs) * mBucketSizeNs;
541 }
542 
// By design, statsd pulls data at bucket boundaries using AlarmManager. These pulls are likely
// to be delayed. Other events like condition changes or app upgrade which are not based on
// AlarmManager might have arrived earlier and close the bucket.
void ValueMetricProducer::onDataPulled(const std::vector<std::shared_ptr<LogEvent>>& allData,
                                       bool pullSuccess, int64_t originalPullTimeNs) {
    // This callback arrives from the puller, so take the producer lock here.
    std::lock_guard<std::mutex> lock(mMutex);
    if (mCondition == ConditionState::kTrue) {
        // If the pull failed, we won't be able to compute a diff.
        if (!pullSuccess) {
            invalidateCurrentBucket(originalPullTimeNs, BucketDropReason::PULL_FAILED);
        } else {
            bool isEventLate = originalPullTimeNs < getCurrentBucketEndTimeNs();
            if (isEventLate) {
                // If the event is late, we are in the middle of a bucket. Just
                // process the data without trying to snap the data to the nearest bucket.
                accumulateEvents(allData, originalPullTimeNs, originalPullTimeNs);
            } else {
                // For scheduled pulled data, the effective event time is snap to the nearest
                // bucket end. In the case of waking up from a deep sleep state, we will
                // attribute to the previous bucket end. If the sleep was long but not very
                // long, we will be in the immediate next bucket. Previous bucket may get a
                // larger number as we pull at a later time than real bucket end.
                //
                // If the sleep was very long, we skip more than one bucket before sleep. In
                // this case, if the diff base will be cleared and this new data will serve as
                // new diff base.
                int64_t bucketEndTime = calcPreviousBucketEndTime(originalPullTimeNs) - 1;
                StatsdStats::getInstance().noteBucketBoundaryDelayNs(
                        mMetricId, originalPullTimeNs - bucketEndTime);
                accumulateEvents(allData, originalPullTimeNs, bucketEndTime);
            }
        }
    }

    // We can probably flush the bucket. Since we used bucketEndTime when calling
    // #onMatchedLogEventInternalLocked, the current bucket will not have been flushed.
    flushIfNeededLocked(originalPullTimeNs);
}
581 
// Processes one batch of pulled events: validates timing and pull delay, matches each
// event against the 'what' matcher and aggregates it, then resets diff bases for keys
// that were absent from this pull. originalPullTimeNs is when the pull happened;
// eventElapsedTimeNs is the timestamp the events are attributed to (possibly snapped
// to a bucket boundary by the caller).
void ValueMetricProducer::accumulateEvents(const std::vector<std::shared_ptr<LogEvent>>& allData,
                                           int64_t originalPullTimeNs, int64_t eventElapsedTimeNs) {
    bool isEventLate = eventElapsedTimeNs < mCurrentBucketStartTimeNs;
    if (isEventLate) {
        VLOG("Skip bucket end pull due to late arrival: %lld vs %lld",
             (long long)eventElapsedTimeNs, (long long)mCurrentBucketStartTimeNs);
        StatsdStats::getInstance().noteLateLogEventSkipped(mMetricId);
        invalidateCurrentBucket(eventElapsedTimeNs, BucketDropReason::EVENT_IN_WRONG_BUCKET);
        return;
    }

    // Guard against pulls that completed too long after they were requested.
    const int64_t elapsedRealtimeNs = getElapsedRealtimeNs();
    const int64_t pullDelayNs = elapsedRealtimeNs - originalPullTimeNs;
    StatsdStats::getInstance().notePullDelay(mPullTagId, pullDelayNs);
    if (pullDelayNs > mMaxPullDelayNs) {
        ALOGE("Pull finish too late for atom %d, longer than %lld", mPullTagId,
              (long long)mMaxPullDelayNs);
        StatsdStats::getInstance().notePullExceedMaxDelay(mPullTagId);
        // We are missing one pull from the bucket which means we will not have a complete view of
        // what's going on.
        invalidateCurrentBucket(eventElapsedTimeNs, BucketDropReason::PULL_DELAYED);
        return;
    }

    mMatchedMetricDimensionKeys.clear();
    for (const auto& data : allData) {
        // Copy so the shared event's timestamp is not mutated for other consumers.
        LogEvent localCopy = data->makeCopy();
        if (mEventMatcherWizard->matchLogEvent(localCopy, mWhatMatcherIndex) ==
            MatchingState::kMatched) {
            localCopy.setElapsedTimestampNs(eventElapsedTimeNs);
            onMatchedLogEventLocked(mWhatMatcherIndex, localCopy);
        }
    }
    // If a key that is:
    // 1. Tracked in mCurrentSlicedBucket and
    // 2. A superset of the current mStateChangePrimaryKey
    // was not found in the new pulled data (i.e. not in mMatchedMetricDimensionKeys)
    // then we need to reset the base.
    for (auto& slice : mCurrentSlicedBucket) {
        const auto& whatKey = slice.first.getDimensionKeyInWhat();
        bool presentInPulledData =
                mMatchedMetricDimensionKeys.find(whatKey) != mMatchedMetricDimensionKeys.end();
        if (!presentInPulledData && whatKey.contains(mStateChangePrimaryKey.second)) {
            auto it = mCurrentBaseInfo.find(whatKey);
            for (auto& baseInfo : it->second) {
                baseInfo.hasBase = false;
            }
        }
    }
    mMatchedMetricDimensionKeys.clear();
    mHasGlobalBase = true;

    // If we reach the guardrail, we might have dropped some data which means the bucket is
    // incomplete.
    //
    // The base also needs to be reset. If we do not have the full data, we might
    // incorrectly compute the diff when mUseZeroDefaultBase is true since an existing key
    // might be missing from mCurrentSlicedBucket.
    if (hasReachedGuardRailLimit()) {
        invalidateCurrentBucket(eventElapsedTimeNs, BucketDropReason::DIMENSION_GUARDRAIL_REACHED);
        mCurrentSlicedBucket.clear();
    }
}
645 
dumpStatesLocked(FILE * out,bool verbose) const646 void ValueMetricProducer::dumpStatesLocked(FILE* out, bool verbose) const {
647     if (mCurrentSlicedBucket.size() == 0) {
648         return;
649     }
650 
651     fprintf(out, "ValueMetric %lld dimension size %lu\n", (long long)mMetricId,
652             (unsigned long)mCurrentSlicedBucket.size());
653     if (verbose) {
654         for (const auto& it : mCurrentSlicedBucket) {
655           for (const auto& interval : it.second) {
656               fprintf(out, "\t(what)%s\t(states)%s  (value)%s\n",
657                       it.first.getDimensionKeyInWhat().toString().c_str(),
658                       it.first.getStateValuesKey().toString().c_str(),
659                       interval.value.toString().c_str());
660           }
661         }
662     }
663 }
664 
hasReachedGuardRailLimit() const665 bool ValueMetricProducer::hasReachedGuardRailLimit() const {
666     return mCurrentSlicedBucket.size() >= mDimensionHardLimit;
667 }
668 
hitGuardRailLocked(const MetricDimensionKey & newKey)669 bool ValueMetricProducer::hitGuardRailLocked(const MetricDimensionKey& newKey) {
670     // ===========GuardRail==============
671     // 1. Report the tuple count if the tuple count > soft limit
672     if (mCurrentSlicedBucket.find(newKey) != mCurrentSlicedBucket.end()) {
673         return false;
674     }
675     if (mCurrentSlicedBucket.size() > mDimensionSoftLimit - 1) {
676         size_t newTupleCount = mCurrentSlicedBucket.size() + 1;
677         StatsdStats::getInstance().noteMetricDimensionSize(mConfigKey, mMetricId, newTupleCount);
678         // 2. Don't add more tuples, we are above the allowed threshold. Drop the data.
679         if (hasReachedGuardRailLimit()) {
680             ALOGE("ValueMetric %lld dropping data for dimension key %s", (long long)mMetricId,
681                   newKey.toString().c_str());
682             StatsdStats::getInstance().noteHardDimensionLimitReached(mMetricId);
683             return true;
684         }
685     }
686 
687     return false;
688 }
689 
hitFullBucketGuardRailLocked(const MetricDimensionKey & newKey)690 bool ValueMetricProducer::hitFullBucketGuardRailLocked(const MetricDimensionKey& newKey) {
691     // ===========GuardRail==============
692     // 1. Report the tuple count if the tuple count > soft limit
693     if (mCurrentFullBucket.find(newKey) != mCurrentFullBucket.end()) {
694         return false;
695     }
696     if (mCurrentFullBucket.size() > mDimensionSoftLimit - 1) {
697         size_t newTupleCount = mCurrentFullBucket.size() + 1;
698         // 2. Don't add more tuples, we are above the allowed threshold. Drop the data.
699         if (newTupleCount > mDimensionHardLimit) {
700             ALOGE("ValueMetric %lld dropping data for full bucket dimension key %s",
701                   (long long)mMetricId,
702                   newKey.toString().c_str());
703             return true;
704         }
705     }
706 
707     return false;
708 }
709 
getDoubleOrLong(const LogEvent & event,const Matcher & matcher,Value & ret)710 bool getDoubleOrLong(const LogEvent& event, const Matcher& matcher, Value& ret) {
711     for (const FieldValue& value : event.getValues()) {
712         if (value.mField.matches(matcher)) {
713             switch (value.mValue.type) {
714                 case INT:
715                     ret.setLong(value.mValue.int_value);
716                     break;
717                 case LONG:
718                     ret.setLong(value.mValue.long_value);
719                     break;
720                 case FLOAT:
721                     ret.setDouble(value.mValue.float_value);
722                     break;
723                 case DOUBLE:
724                     ret.setDouble(value.mValue.double_value);
725                     break;
726                 default:
727                     return false;
728                     break;
729             }
730             return true;
731         }
732     }
733     return false;
734 }
735 
// Core aggregation entry point: folds one matched event into the current sliced bucket.
// Handles late events, push/pull condition gating, the dimension guardrail, diff
// computation against per-key bases, aggregation (SUM/AVG/MIN/MAX) and, when the data
// is complete, anomaly detection. Statement order here is load-bearing (e.g. the flush
// must happen before accumulation for pushed metrics) — do not reorder casually.
void ValueMetricProducer::onMatchedLogEventInternalLocked(
        const size_t matcherIndex, const MetricDimensionKey& eventKey,
        const ConditionKey& conditionKey, bool condition, const LogEvent& event,
        const map<int, HashableDimensionKey>& statePrimaryKeys) {
    auto whatKey = eventKey.getDimensionKeyInWhat();
    auto stateKey = eventKey.getStateValuesKey();

    // Skip this event if a state change occurred for a different primary key.
    auto it = statePrimaryKeys.find(mStateChangePrimaryKey.first);
    // Check that both the atom id and the primary key are equal.
    if (it != statePrimaryKeys.end() && it->second != mStateChangePrimaryKey.second) {
        VLOG("ValueMetric skip event with primary key %s because state change primary key "
             "is %s",
             it->second.toString().c_str(), mStateChangePrimaryKey.second.toString().c_str());
        return;
    }

    int64_t eventTimeNs = event.GetElapsedTimestampNs();
    if (eventTimeNs < mCurrentBucketStartTimeNs) {
        VLOG("Skip event due to late arrival: %lld vs %lld", (long long)eventTimeNs,
             (long long)mCurrentBucketStartTimeNs);
        return;
    }
    // Record that this key appeared in the incoming data so accumulateEvents() can
    // reset bases for keys that vanished from a pull.
    mMatchedMetricDimensionKeys.insert(whatKey);

    if (!mIsPulled) {
        // We cannot flush without doing a pull first.
        flushIfNeededLocked(eventTimeNs);
    }

    // We should not accumulate the data for pushed metrics when the condition is false.
    bool shouldSkipForPushMetric = !mIsPulled && !condition;
    // For pulled metrics, there are two cases:
    // - to compute diffs, we need to process all the state changes
    // - for non-diffs metrics, we should ignore the data if the condition wasn't true. If we have a
    // state change from
    //     + True -> True: we should process the data, it might be a bucket boundary
    //     + True -> False: we also need to process the data.
    bool shouldSkipForPulledMetric = mIsPulled && !mUseDiff
            && mCondition != ConditionState::kTrue;
    if (shouldSkipForPushMetric || shouldSkipForPulledMetric) {
        VLOG("ValueMetric skip event because condition is false and we are not using diff (for "
             "pulled metric)");
        return;
    }

    if (hitGuardRailLocked(eventKey)) {
        return;
    }

    // Base info is tracked per what-key (one entry per value field matcher).
    vector<BaseInfo>& baseInfos = mCurrentBaseInfo[whatKey];
    if (baseInfos.size() < mFieldMatchers.size()) {
        VLOG("Resizing number of intervals to %d", (int)mFieldMatchers.size());
        baseInfos.resize(mFieldMatchers.size());
    }

    // Freshly created base infos start without a state; give them the "unknown" state
    // so the oldStateKey lookup below is always valid.
    for (BaseInfo& baseInfo : baseInfos) {
        if (!baseInfo.hasCurrentState) {
            baseInfo.currentState = getUnknownStateKey();
            baseInfo.hasCurrentState = true;
        }
    }

    // We need to get the intervals stored with the previous state key so we can
    // close these value intervals.
    const auto oldStateKey = baseInfos[0].currentState;
    vector<Interval>& intervals = mCurrentSlicedBucket[MetricDimensionKey(whatKey, oldStateKey)];
    if (intervals.size() < mFieldMatchers.size()) {
        VLOG("Resizing number of intervals to %d", (int)mFieldMatchers.size());
        intervals.resize(mFieldMatchers.size());
    }

    // We only use anomaly detection under certain cases.
    // N.B.: The anomaly detection cases were modified in order to fix an issue with value metrics
    // containing multiple values. We tried to retain all previous behaviour, but we are unsure the
    // previous behaviour was correct. At the time of the fix, anomaly detection had no owner.
    // Whoever next works on it should look into the cases where it is triggered in this function.
    // Discussion here: http://ag/6124370.
    bool useAnomalyDetection = true;

    for (int i = 0; i < (int)mFieldMatchers.size(); i++) {
        const Matcher& matcher = mFieldMatchers[i];
        BaseInfo& baseInfo = baseInfos[i];
        Interval& interval = intervals[i];
        interval.valueIndex = i;
        Value value;
        // Record the event's state so the next event can close this interval.
        baseInfo.hasCurrentState = true;
        baseInfo.currentState = stateKey;
        if (!getDoubleOrLong(event, matcher, value)) {
            VLOG("Failed to get value %d from event %s", i, event.ToString().c_str());
            StatsdStats::getInstance().noteBadValueType(mMetricId);
            return;
        }
        // Marks the interval as live so initCurrentSlicedBucket() does not erase it.
        interval.seenNewData = true;

        if (mUseDiff) {
            if (!baseInfo.hasBase) {
                if (mHasGlobalBase && mUseZeroDefaultBase) {
                    // The bucket has global base. This key does not.
                    // Optionally use zero as base.
                    baseInfo.base = (value.type == LONG ? ZERO_LONG : ZERO_DOUBLE);
                    baseInfo.hasBase = true;
                } else {
                    // no base. just update base and return.
                    baseInfo.base = value;
                    baseInfo.hasBase = true;
                    // If we're missing a base, do not use anomaly detection on incomplete data
                    useAnomalyDetection = false;
                    // Continue (instead of return) here in order to set baseInfo.base and
                    // baseInfo.hasBase for other baseInfos
                    continue;
                }
            }

            Value diff;
            switch (mValueDirection) {
                case ValueMetric::INCREASING:
                    if (value >= baseInfo.base) {
                        diff = value - baseInfo.base;
                    } else if (mUseAbsoluteValueOnReset) {
                        // Counter reset: treat the new value as the delta since reset.
                        diff = value;
                    } else {
                        VLOG("Unexpected decreasing value");
                        StatsdStats::getInstance().notePullDataError(mPullTagId);
                        baseInfo.base = value;
                        // If we've got bad data, do not use anomaly detection
                        useAnomalyDetection = false;
                        continue;
                    }
                    break;
                case ValueMetric::DECREASING:
                    if (baseInfo.base >= value) {
                        diff = baseInfo.base - value;
                    } else if (mUseAbsoluteValueOnReset) {
                        diff = value;
                    } else {
                        VLOG("Unexpected increasing value");
                        StatsdStats::getInstance().notePullDataError(mPullTagId);
                        baseInfo.base = value;
                        // If we've got bad data, do not use anomaly detection
                        useAnomalyDetection = false;
                        continue;
                    }
                    break;
                case ValueMetric::ANY:
                    diff = value - baseInfo.base;
                    break;
                default:
                    break;
            }
            baseInfo.base = value;
            value = diff;
        }

        if (interval.hasValue) {
            switch (mAggregationType) {
                case ValueMetric::SUM:
                    // for AVG, we add up and take average when flushing the bucket
                case ValueMetric::AVG:
                    interval.value += value;
                    break;
                case ValueMetric::MIN:
                    interval.value = std::min(value, interval.value);
                    break;
                case ValueMetric::MAX:
                    interval.value = std::max(value, interval.value);
                    break;
                default:
                    break;
            }
        } else {
            interval.value = value;
            interval.hasValue = true;
        }
        interval.sampleSize += 1;
    }

    // Only trigger the tracker if all intervals are correct
    if (useAnomalyDetection) {
        // TODO: propgate proper values down stream when anomaly support doubles
        // NOTE(review): only the first value field is fed to anomaly detection, and it is
        // read as a long even when the interval holds a double — see the TODO above.
        long wholeBucketVal = intervals[0].value.long_value;
        auto prev = mCurrentFullBucket.find(eventKey);
        if (prev != mCurrentFullBucket.end()) {
            wholeBucketVal += prev->second;
        }
        for (auto& tracker : mAnomalyTrackers) {
            tracker->detectAndDeclareAnomaly(eventTimeNs, mCurrentBucketNum, mMetricId, eventKey,
                                             wholeBucketVal);
        }
    }
}
927 
928 // For pulled metrics, we always need to make sure we do a pull before flushing the bucket
929 // if mCondition is true!
flushIfNeededLocked(const int64_t & eventTimeNs)930 void ValueMetricProducer::flushIfNeededLocked(const int64_t& eventTimeNs) {
931     int64_t currentBucketEndTimeNs = getCurrentBucketEndTimeNs();
932     if (eventTimeNs < currentBucketEndTimeNs) {
933         VLOG("eventTime is %lld, less than current bucket end time %lld", (long long)eventTimeNs,
934              (long long)(currentBucketEndTimeNs));
935         return;
936     }
937     int64_t numBucketsForward = calcBucketsForwardCount(eventTimeNs);
938     int64_t nextBucketStartTimeNs = currentBucketEndTimeNs + (numBucketsForward - 1) * mBucketSizeNs;
939     flushCurrentBucketLocked(eventTimeNs, nextBucketStartTimeNs);
940 }
941 
calcBucketsForwardCount(const int64_t & eventTimeNs) const942 int64_t ValueMetricProducer::calcBucketsForwardCount(const int64_t& eventTimeNs) const {
943     int64_t currentBucketEndTimeNs = getCurrentBucketEndTimeNs();
944     if (eventTimeNs < currentBucketEndTimeNs) {
945         return 0;
946     }
947     return 1 + (eventTimeNs - currentBucketEndTimeNs) / mBucketSizeNs;
948 }
949 
// Closes the current bucket at (or before) |eventTimeNs| and starts a new bucket at
// |nextBucketStartTimeNs|. Decides whether the bucket is kept (appended to mPastBuckets),
// skipped (recorded in mSkippedBuckets with a drop reason), and whether a gap bucket must
// be recorded when the close happened before the next bucket start (forced splits).
// The ordering — invalidation checks, condition-timer close, build, full-bucket append,
// re-init — is deliberate; callers rely on the state being fully reset on return.
void ValueMetricProducer::flushCurrentBucketLocked(const int64_t& eventTimeNs,
                                                   const int64_t& nextBucketStartTimeNs) {
    if (mCondition == ConditionState::kUnknown) {
        StatsdStats::getInstance().noteBucketUnknownCondition(mMetricId);
        invalidateCurrentBucketWithoutResetBase(eventTimeNs, BucketDropReason::CONDITION_UNKNOWN);
    }

    VLOG("finalizing bucket for %ld, dumping %d slices", (long)mCurrentBucketStartTimeNs,
         (int)mCurrentSlicedBucket.size());

    int64_t fullBucketEndTimeNs = getCurrentBucketEndTimeNs();
    int64_t bucketEndTime = fullBucketEndTimeNs;
    int64_t numBucketsForward = calcBucketsForwardCount(eventTimeNs);

    // Skip buckets if this is a pulled metric or a pushed metric that is diffed.
    if (numBucketsForward > 1 && (mIsPulled || mUseDiff)) {

        VLOG("Skipping forward %lld buckets", (long long)numBucketsForward);
        StatsdStats::getInstance().noteSkippedForwardBuckets(mMetricId);
        // Something went wrong. Maybe the device was sleeping for a long time. It is better
        // to mark the current bucket as invalid. The last pull might have been successful through.
        invalidateCurrentBucketWithoutResetBase(eventTimeNs,
                                                BucketDropReason::MULTIPLE_BUCKETS_SKIPPED);
        // End the bucket at the next bucket start time so the entire interval is skipped.
        bucketEndTime = nextBucketStartTimeNs;
    } else if (eventTimeNs < fullBucketEndTimeNs) {
        // Forced early close (e.g. app upgrade / dump report): end at the event time.
        bucketEndTime = eventTimeNs;
    }

    // Close the current bucket.
    int64_t conditionTrueDuration = mConditionTimer.newBucketStart(bucketEndTime);
    bool isBucketLargeEnough = bucketEndTime - mCurrentBucketStartTimeNs >= mMinBucketSizeNs;
    if (!isBucketLargeEnough) {
        skipCurrentBucket(eventTimeNs, BucketDropReason::BUCKET_TOO_SMALL);
    }
    if (!mCurrentBucketIsSkipped) {
        bool bucketHasData = false;
        // The current bucket is large enough to keep.
        for (const auto& slice : mCurrentSlicedBucket) {
            ValueBucket bucket = buildPartialBucket(bucketEndTime, slice.second);
            bucket.mConditionTrueNs = conditionTrueDuration;
            // it will auto create new vector of ValuebucketInfo if the key is not found.
            if (bucket.valueIndex.size() > 0) {
                auto& bucketList = mPastBuckets[slice.first];
                bucketList.push_back(bucket);
                bucketHasData = true;
            }
        }
        if (!bucketHasData) {
            skipCurrentBucket(eventTimeNs, BucketDropReason::NO_DATA);
        }
    }

    if (mCurrentBucketIsSkipped) {
        mCurrentSkippedBucket.bucketStartTimeNs = mCurrentBucketStartTimeNs;
        mCurrentSkippedBucket.bucketEndTimeNs = bucketEndTime;
        mSkippedBuckets.emplace_back(mCurrentSkippedBucket);
    }

    // This means that the current bucket was not flushed before a forced bucket split.
    // This can happen if an app update or a dump report with include_current_partial_bucket is
    // requested before we get a chance to flush the bucket due to receiving new data, either from
    // the statsd socket or the StatsPullerManager.
    if (bucketEndTime < nextBucketStartTimeNs) {
        SkippedBucket bucketInGap;
        bucketInGap.bucketStartTimeNs = bucketEndTime;
        bucketInGap.bucketEndTimeNs = nextBucketStartTimeNs;
        bucketInGap.dropEvents.emplace_back(
                buildDropEvent(eventTimeNs, BucketDropReason::NO_DATA));
        mSkippedBuckets.emplace_back(bucketInGap);
    }

    appendToFullBucket(eventTimeNs > fullBucketEndTimeNs);
    initCurrentSlicedBucket(nextBucketStartTimeNs);
    // Update the condition timer again, in case we skipped buckets.
    mConditionTimer.newBucketStart(nextBucketStartTimeNs);
    mCurrentBucketNum += numBucketsForward;
}
1028 
buildPartialBucket(int64_t bucketEndTime,const std::vector<Interval> & intervals)1029 ValueBucket ValueMetricProducer::buildPartialBucket(int64_t bucketEndTime,
1030                                                     const std::vector<Interval>& intervals) {
1031     ValueBucket bucket;
1032     bucket.mBucketStartNs = mCurrentBucketStartTimeNs;
1033     bucket.mBucketEndNs = bucketEndTime;
1034     for (const auto& interval : intervals) {
1035         if (interval.hasValue) {
1036             // skip the output if the diff is zero
1037             if (mSkipZeroDiffOutput && mUseDiff && interval.value.isZero()) {
1038                 continue;
1039             }
1040             bucket.valueIndex.push_back(interval.valueIndex);
1041             if (mAggregationType != ValueMetric::AVG) {
1042                 bucket.values.push_back(interval.value);
1043             } else {
1044                 double sum = interval.value.type == LONG ? (double)interval.value.long_value
1045                                                          : interval.value.double_value;
1046                 bucket.values.push_back(Value((double)sum / interval.sampleSize));
1047             }
1048         }
1049     }
1050     return bucket;
1051 }
1052 
// Resets per-bucket state for the bucket starting at |nextBucketStartTimeNs|:
// clears interval values, evicts slices that saw no new data in the closing bucket,
// and resets the skipped-bucket bookkeeping.
void ValueMetricProducer::initCurrentSlicedBucket(int64_t nextBucketStartTimeNs) {
    StatsdStats::getInstance().noteBucketCount(mMetricId);
    // Cleanup data structure to aggregate values.
    for (auto it = mCurrentSlicedBucket.begin(); it != mCurrentSlicedBucket.end();) {
        bool obsolete = true;
        for (auto& interval : it->second) {
            interval.hasValue = false;
            interval.sampleSize = 0;
            // A slice is kept only if at least one of its intervals received data.
            if (interval.seenNewData) {
                obsolete = false;
            }
            interval.seenNewData = false;
        }

        if (obsolete) {
            it = mCurrentSlicedBucket.erase(it);
        } else {
            it++;
        }
        // TODO(b/157655103): remove mCurrentBaseInfo entries when obsolete
    }

    mCurrentBucketIsSkipped = false;
    mCurrentSkippedBucket.reset();

    // If we do not have a global base when the condition is true,
    // we will have incomplete bucket for the next bucket.
    // NOTE(review): this assignment is a no-op — mCurrentBucketIsSkipped was already set
    // to false above. The comment suggests the intent may have been to mark the bucket
    // as skipped (true); confirm intent before changing.
    if (mUseDiff && !mHasGlobalBase && mCondition) {
        mCurrentBucketIsSkipped = false;
    }
    mCurrentBucketStartTimeNs = nextBucketStartTimeNs;
    VLOG("metric %lld: new bucket start time: %lld", (long long)mMetricId,
         (long long)mCurrentBucketStartTimeNs);
}
1087 
appendToFullBucket(const bool isFullBucketReached)1088 void ValueMetricProducer::appendToFullBucket(const bool isFullBucketReached) {
1089     if (mCurrentBucketIsSkipped) {
1090         if (isFullBucketReached) {
1091             // If the bucket is invalid, we ignore the full bucket since it contains invalid data.
1092             mCurrentFullBucket.clear();
1093         }
1094         // Current bucket is invalid, we do not add it to the full bucket.
1095         return;
1096     }
1097 
1098     if (isFullBucketReached) {  // If full bucket, send to anomaly tracker.
1099         // Accumulate partial buckets with current value and then send to anomaly tracker.
1100         if (mCurrentFullBucket.size() > 0) {
1101             for (const auto& slice : mCurrentSlicedBucket) {
1102                 if (hitFullBucketGuardRailLocked(slice.first)) {
1103                     continue;
1104                 }
1105                 // TODO: fix this when anomaly can accept double values
1106                 auto& interval = slice.second[0];
1107                 if (interval.hasValue) {
1108                     mCurrentFullBucket[slice.first] += interval.value.long_value;
1109                 }
1110             }
1111             for (const auto& slice : mCurrentFullBucket) {
1112                 for (auto& tracker : mAnomalyTrackers) {
1113                     if (tracker != nullptr) {
1114                         tracker->addPastBucket(slice.first, slice.second, mCurrentBucketNum);
1115                     }
1116                 }
1117             }
1118             mCurrentFullBucket.clear();
1119         } else {
1120             // Skip aggregating the partial buckets since there's no previous partial bucket.
1121             for (const auto& slice : mCurrentSlicedBucket) {
1122                 for (auto& tracker : mAnomalyTrackers) {
1123                     if (tracker != nullptr) {
1124                         // TODO: fix this when anomaly can accept double values
1125                         auto& interval = slice.second[0];
1126                         if (interval.hasValue) {
1127                             tracker->addPastBucket(slice.first, interval.value.long_value,
1128                                                    mCurrentBucketNum);
1129                         }
1130                     }
1131                 }
1132             }
1133         }
1134     } else {
1135         // Accumulate partial bucket.
1136         for (const auto& slice : mCurrentSlicedBucket) {
1137             // TODO: fix this when anomaly can accept double values
1138             auto& interval = slice.second[0];
1139             if (interval.hasValue) {
1140                 mCurrentFullBucket[slice.first] += interval.value.long_value;
1141             }
1142         }
1143     }
1144 }
1145 
byteSizeLocked() const1146 size_t ValueMetricProducer::byteSizeLocked() const {
1147     size_t totalSize = 0;
1148     for (const auto& pair : mPastBuckets) {
1149         totalSize += pair.second.size() * kBucketSize;
1150     }
1151     return totalSize;
1152 }
1153 
1154 }  // namespace statsd
1155 }  // namespace os
1156 }  // namespace android
1157