1 /*
2 * Copyright (C) 2017 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *      http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16 
17 #define DEBUG false  // STOPSHIP if true
18 #include "Log.h"
19 
20 #include "../guardrail/StatsdStats.h"
21 #include "GaugeMetricProducer.h"
22 #include "../stats_log_util.h"
23 
24 #include <cutils/log.h>
25 
26 using android::util::FIELD_COUNT_REPEATED;
27 using android::util::FIELD_TYPE_BOOL;
28 using android::util::FIELD_TYPE_FLOAT;
29 using android::util::FIELD_TYPE_INT32;
30 using android::util::FIELD_TYPE_INT64;
31 using android::util::FIELD_TYPE_MESSAGE;
32 using android::util::FIELD_TYPE_STRING;
33 using android::util::ProtoOutputStream;
34 using std::map;
35 using std::string;
36 using std::unordered_map;
37 using std::vector;
38 using std::make_shared;
39 using std::shared_ptr;
40 
41 namespace android {
42 namespace os {
43 namespace statsd {
44 
// Proto field numbers used by onDumpReportLocked() when serializing this metric's report.
// Each id is OR-ed with a FIELD_TYPE_* flag (and FIELD_COUNT_REPEATED for repeated fields)
// before being passed to ProtoOutputStream. The groups below name the message each id
// belongs to.

// for StatsLogReport
const int FIELD_ID_ID = 1;
const int FIELD_ID_GAUGE_METRICS = 8;
const int FIELD_ID_TIME_BASE = 9;
const int FIELD_ID_BUCKET_SIZE = 10;
const int FIELD_ID_DIMENSION_PATH_IN_WHAT = 11;
const int FIELD_ID_DIMENSION_PATH_IN_CONDITION = 12;
// for GaugeMetricDataWrapper
const int FIELD_ID_DATA = 1;
const int FIELD_ID_SKIPPED = 2;
const int FIELD_ID_SKIPPED_START_MILLIS = 3;
const int FIELD_ID_SKIPPED_END_MILLIS = 4;
// for GaugeMetricData
const int FIELD_ID_DIMENSION_IN_WHAT = 1;
const int FIELD_ID_DIMENSION_IN_CONDITION = 2;
const int FIELD_ID_BUCKET_INFO = 3;
const int FIELD_ID_DIMENSION_LEAF_IN_WHAT = 4;
const int FIELD_ID_DIMENSION_LEAF_IN_CONDITION = 5;
// for GaugeBucketInfo
const int FIELD_ID_ATOM = 3;
const int FIELD_ID_ELAPSED_ATOM_TIMESTAMP = 4;
const int FIELD_ID_WALL_CLOCK_ATOM_TIMESTAMP = 5;
const int FIELD_ID_BUCKET_NUM = 6;
const int FIELD_ID_START_BUCKET_ELAPSED_MILLIS = 7;
const int FIELD_ID_END_BUCKET_ELAPSED_MILLIS = 8;
70 
GaugeMetricProducer(const ConfigKey & key,const GaugeMetric & metric,const int conditionIndex,const sp<ConditionWizard> & wizard,const int pullTagId,const int64_t timeBaseNs,const int64_t startTimeNs,shared_ptr<StatsPullerManager> statsPullerManager)71 GaugeMetricProducer::GaugeMetricProducer(const ConfigKey& key, const GaugeMetric& metric,
72                                          const int conditionIndex,
73                                          const sp<ConditionWizard>& wizard, const int pullTagId,
74                                          const int64_t timeBaseNs, const int64_t startTimeNs,
75                                          shared_ptr<StatsPullerManager> statsPullerManager)
76     : MetricProducer(metric.id(), key, timeBaseNs, conditionIndex, wizard),
77       mStatsPullerManager(statsPullerManager),
78       mPullTagId(pullTagId),
79       mMinBucketSizeNs(metric.min_bucket_size_nanos()),
80       mDimensionSoftLimit(StatsdStats::kAtomDimensionKeySizeLimitMap.find(pullTagId) !=
81                                           StatsdStats::kAtomDimensionKeySizeLimitMap.end()
82                                   ? StatsdStats::kAtomDimensionKeySizeLimitMap.at(pullTagId).first
83                                   : StatsdStats::kDimensionKeySizeSoftLimit),
84       mDimensionHardLimit(StatsdStats::kAtomDimensionKeySizeLimitMap.find(pullTagId) !=
85                                           StatsdStats::kAtomDimensionKeySizeLimitMap.end()
86                                   ? StatsdStats::kAtomDimensionKeySizeLimitMap.at(pullTagId).second
87                                   : StatsdStats::kDimensionKeySizeHardLimit),
88       mGaugeAtomsPerDimensionLimit(metric.max_num_gauge_atoms_per_bucket()) {
89     mCurrentSlicedBucket = std::make_shared<DimToGaugeAtomsMap>();
90     mCurrentSlicedBucketForAnomaly = std::make_shared<DimToValMap>();
91     int64_t bucketSizeMills = 0;
92     if (metric.has_bucket()) {
93         bucketSizeMills = TimeUnitToBucketSizeInMillisGuardrailed(key.GetUid(), metric.bucket());
94     } else {
95         bucketSizeMills = TimeUnitToBucketSizeInMillis(ONE_HOUR);
96     }
97     mBucketSizeNs = bucketSizeMills * 1000000;
98 
99     mSamplingType = metric.sampling_type();
100     if (!metric.gauge_fields_filter().include_all()) {
101         translateFieldMatcher(metric.gauge_fields_filter().fields(), &mFieldMatchers);
102     }
103 
104     // TODO: use UidMap if uid->pkg_name is required
105     if (metric.has_dimensions_in_what()) {
106         translateFieldMatcher(metric.dimensions_in_what(), &mDimensionsInWhat);
107         mContainANYPositionInDimensionsInWhat = HasPositionANY(metric.dimensions_in_what());
108     }
109 
110     if (metric.has_dimensions_in_condition()) {
111         translateFieldMatcher(metric.dimensions_in_condition(), &mDimensionsInCondition);
112     }
113 
114     if (metric.links().size() > 0) {
115         for (const auto& link : metric.links()) {
116             Metric2Condition mc;
117             mc.conditionId = link.condition();
118             translateFieldMatcher(link.fields_in_what(), &mc.metricFields);
119             translateFieldMatcher(link.fields_in_condition(), &mc.conditionFields);
120             mMetric2ConditionLinks.push_back(mc);
121         }
122     }
123     mConditionSliced = (metric.links().size() > 0) || (mDimensionsInCondition.size() > 0);
124     mSliceByPositionALL = HasPositionALL(metric.dimensions_in_what()) ||
125             HasPositionALL(metric.dimensions_in_condition());
126 
127     flushIfNeededLocked(startTimeNs);
128     // Kicks off the puller immediately.
129     if (mPullTagId != -1 && mSamplingType == GaugeMetric::RANDOM_ONE_SAMPLE) {
130         mStatsPullerManager->RegisterReceiver(
131                 mPullTagId, this, getCurrentBucketEndTimeNs(), mBucketSizeNs);
132     }
133 
134     VLOG("Gauge metric %lld created. bucket size %lld start_time: %lld sliced %d",
135          (long long)metric.id(), (long long)mBucketSizeNs, (long long)mTimeBaseNs,
136          mConditionSliced);
137 }
138 
// for testing
// Convenience overload that delegates to the full constructor with a freshly created
// StatsPullerManager, so callers do not have to supply one.
GaugeMetricProducer::GaugeMetricProducer(const ConfigKey& key, const GaugeMetric& metric,
                                         const int conditionIndex,
                                         const sp<ConditionWizard>& wizard, const int pullTagId,
                                         const int64_t timeBaseNs, const int64_t startTimeNs)
    : GaugeMetricProducer(key, metric, conditionIndex, wizard, pullTagId, timeBaseNs, startTimeNs,
                          make_shared<StatsPullerManager>()) {
}
147 
~GaugeMetricProducer()148 GaugeMetricProducer::~GaugeMetricProducer() {
149     VLOG("~GaugeMetricProducer() called");
150     if (mPullTagId != -1 && mSamplingType == GaugeMetric::RANDOM_ONE_SAMPLE) {
151         mStatsPullerManager->UnRegisterReceiver(mPullTagId, this);
152     }
153 }
154 
dumpStatesLocked(FILE * out,bool verbose) const155 void GaugeMetricProducer::dumpStatesLocked(FILE* out, bool verbose) const {
156     if (mCurrentSlicedBucket == nullptr ||
157         mCurrentSlicedBucket->size() == 0) {
158         return;
159     }
160 
161     fprintf(out, "GaugeMetric %lld dimension size %lu\n", (long long)mMetricId,
162             (unsigned long)mCurrentSlicedBucket->size());
163     if (verbose) {
164         for (const auto& it : *mCurrentSlicedBucket) {
165             fprintf(out, "\t(what)%s\t(condition)%s  %d atoms\n",
166                 it.first.getDimensionKeyInWhat().toString().c_str(),
167                 it.first.getDimensionKeyInCondition().toString().c_str(),
168                 (int)it.second.size());
169         }
170     }
171 }
172 
clearPastBucketsLocked(const int64_t dumpTimeNs)173 void GaugeMetricProducer::clearPastBucketsLocked(const int64_t dumpTimeNs) {
174     flushIfNeededLocked(dumpTimeNs);
175     mPastBuckets.clear();
176     mSkippedBuckets.clear();
177 }
178 
// Serializes all finished buckets of this metric into |protoOutput| and then clears them.
//
// dumpTimeNs                      time of the dump; used to flush the current bucket first.
// include_current_partial_bucket  when true the current bucket is flushed unconditionally
//                                 (flushLocked); otherwise only if dumpTimeNs is past its end
//                                 (flushIfNeededLocked).
// str_set                         forwarded to the dimension-writing helpers.
//                                 NOTE(review): presumably collects strings for de-duplication
//                                 in the report -- confirm against stats_log_util.
// protoOutput                     destination stream for the report fields.
//
// Nothing is written (and skipped buckets are kept for a later dump) when there are no past
// buckets after flushing.
void GaugeMetricProducer::onDumpReportLocked(const int64_t dumpTimeNs,
                                             const bool include_current_partial_bucket,
                                             std::set<string> *str_set,
                                             ProtoOutputStream* protoOutput) {
    VLOG("Gauge metric %lld report now...", (long long)mMetricId);
    if (include_current_partial_bucket) {
        flushLocked(dumpTimeNs);
    } else {
        flushIfNeededLocked(dumpTimeNs);
    }

    // Early out: no finished buckets means no report section for this metric.
    if (mPastBuckets.empty()) {
        return;
    }

    // Metric-level header fields.
    protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_ID, (long long)mMetricId);
    protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_TIME_BASE, (long long)mTimeBaseNs);
    protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_BUCKET_SIZE, (long long)mBucketSizeNs);

    // Fills the dimension path if not slicing by ALL.
    if (!mSliceByPositionALL) {
        if (!mDimensionsInWhat.empty()) {
            uint64_t dimenPathToken = protoOutput->start(
                    FIELD_TYPE_MESSAGE | FIELD_ID_DIMENSION_PATH_IN_WHAT);
            writeDimensionPathToProto(mDimensionsInWhat, protoOutput);
            protoOutput->end(dimenPathToken);
        }
        if (!mDimensionsInCondition.empty()) {
            uint64_t dimenPathToken = protoOutput->start(
                    FIELD_TYPE_MESSAGE | FIELD_ID_DIMENSION_PATH_IN_CONDITION);
            writeDimensionPathToProto(mDimensionsInCondition, protoOutput);
            protoOutput->end(dimenPathToken);
        }
    }

    // Everything below is nested inside the gauge_metrics wrapper message.
    uint64_t protoToken = protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_ID_GAUGE_METRICS);

    // Skipped buckets are reported as [start, end] pairs in milliseconds, then forgotten.
    for (const auto& pair : mSkippedBuckets) {
        uint64_t wrapperToken =
                protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_SKIPPED);
        protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_SKIPPED_START_MILLIS,
                           (long long)(NanoToMillis(pair.first)));
        protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_SKIPPED_END_MILLIS,
                           (long long)(NanoToMillis(pair.second)));
        protoOutput->end(wrapperToken);
    }
    mSkippedBuckets.clear();

    // One data entry per dimension key, each holding that key's list of finished buckets.
    for (const auto& pair : mPastBuckets) {
        const MetricDimensionKey& dimensionKey = pair.first;

        VLOG("Gauge dimension key %s", dimensionKey.toString().c_str());
        uint64_t wrapperToken =
                protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_DATA);

        // First fill dimension.
        // When slicing by ALL the full dimension tree is written; otherwise only leaf nodes
        // are written (the path was emitted once above).
        if (mSliceByPositionALL) {
            uint64_t dimensionToken = protoOutput->start(
                    FIELD_TYPE_MESSAGE | FIELD_ID_DIMENSION_IN_WHAT);
            writeDimensionToProto(dimensionKey.getDimensionKeyInWhat(), str_set, protoOutput);
            protoOutput->end(dimensionToken);

            if (dimensionKey.hasDimensionKeyInCondition()) {
                uint64_t dimensionInConditionToken = protoOutput->start(
                        FIELD_TYPE_MESSAGE | FIELD_ID_DIMENSION_IN_CONDITION);
                writeDimensionToProto(dimensionKey.getDimensionKeyInCondition(),
                                      str_set, protoOutput);
                protoOutput->end(dimensionInConditionToken);
            }
        } else {
            writeDimensionLeafNodesToProto(dimensionKey.getDimensionKeyInWhat(),
                                           FIELD_ID_DIMENSION_LEAF_IN_WHAT, str_set, protoOutput);
            if (dimensionKey.hasDimensionKeyInCondition()) {
                writeDimensionLeafNodesToProto(dimensionKey.getDimensionKeyInCondition(),
                                               FIELD_ID_DIMENSION_LEAF_IN_CONDITION,
                                               str_set, protoOutput);
            }
        }

        // Then fill bucket_info (GaugeBucketInfo).
        for (const auto& bucket : pair.second) {
            uint64_t bucketInfoToken = protoOutput->start(
                    FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_BUCKET_INFO);

            // A bucket whose length differs from the configured size is described by explicit
            // start/end times; a regular bucket is identified by its bucket number only.
            if (bucket.mBucketEndNs - bucket.mBucketStartNs != mBucketSizeNs) {
                protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_START_BUCKET_ELAPSED_MILLIS,
                                   (long long)NanoToMillis(bucket.mBucketStartNs));
                protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_END_BUCKET_ELAPSED_MILLIS,
                                   (long long)NanoToMillis(bucket.mBucketEndNs));
            } else {
                protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_BUCKET_NUM,
                                   (long long)(getBucketNumFromEndTimeNs(bucket.mBucketEndNs)));
            }

            if (!bucket.mGaugeAtoms.empty()) {
                // First pass: the atoms themselves, in order.
                for (const auto& atom : bucket.mGaugeAtoms) {
                    uint64_t atomsToken =
                        protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED |
                                           FIELD_ID_ATOM);
                    writeFieldValueTreeToStream(mTagId, *(atom.mFields), protoOutput);
                    protoOutput->end(atomsToken);
                }
                // Timestamps are truncated to five minutes unless the atom tag is listed in
                // kNotTruncatingTimestampAtomWhiteList.
                const bool truncateTimestamp =
                        android::util::AtomsInfo::kNotTruncatingTimestampAtomWhiteList.find(
                                mTagId) ==
                        android::util::AtomsInfo::kNotTruncatingTimestampAtomWhiteList.end();
                // Second pass: elapsed and wall-clock timestamps, in the same atom order.
                for (const auto& atom : bucket.mGaugeAtoms) {
                    const int64_t elapsedTimestampNs =  truncateTimestamp ?
                        truncateTimestampNsToFiveMinutes(atom.mElapsedTimestamps) :
                            atom.mElapsedTimestamps;
                    const int64_t wallClockNs = truncateTimestamp ?
                        truncateTimestampNsToFiveMinutes(atom.mWallClockTimestampNs) :
                            atom.mWallClockTimestampNs;
                    protoOutput->write(
                        FIELD_TYPE_INT64 | FIELD_COUNT_REPEATED | FIELD_ID_ELAPSED_ATOM_TIMESTAMP,
                        (long long)elapsedTimestampNs);
                    protoOutput->write(
                        FIELD_TYPE_INT64 | FIELD_COUNT_REPEATED |
                            FIELD_ID_WALL_CLOCK_ATOM_TIMESTAMP,
                        (long long)wallClockNs);
                }
            }
            protoOutput->end(bucketInfoToken);
            VLOG("Gauge \t bucket [%lld - %lld] includes %d atoms.",
                 (long long)bucket.mBucketStartNs, (long long)bucket.mBucketEndNs,
                 (int)bucket.mGaugeAtoms.size());
        }
        protoOutput->end(wrapperToken);
    }
    protoOutput->end(protoToken);

    // Reported buckets are dropped so they are not reported twice.
    mPastBuckets.clear();
    // TODO: Clear mDimensionKeyMap once the report is dumped.
}
313 
pullLocked(const int64_t timestampNs)314 void GaugeMetricProducer::pullLocked(const int64_t timestampNs) {
315     bool triggerPuller = false;
316     switch(mSamplingType) {
317         // When the metric wants to do random sampling and there is already one gauge atom for the
318         // current bucket, do not do it again.
319         case GaugeMetric::RANDOM_ONE_SAMPLE: {
320             triggerPuller = mCondition && mCurrentSlicedBucket->empty();
321             break;
322         }
323         case GaugeMetric::ALL_CONDITION_CHANGES: {
324             triggerPuller = true;
325             break;
326         }
327         case GaugeMetric::CONDITION_CHANGE_TO_TRUE: {
328             triggerPuller = mCondition;
329             break;
330         }
331         default:
332             break;
333     }
334     if (!triggerPuller) {
335         return;
336     }
337 
338     vector<std::shared_ptr<LogEvent>> allData;
339     if (!mStatsPullerManager->Pull(mPullTagId, timestampNs, &allData)) {
340         ALOGE("Gauge Stats puller failed for tag: %d", mPullTagId);
341         return;
342     }
343 
344     for (const auto& data : allData) {
345         onMatchedLogEventLocked(0, *data);
346     }
347 }
348 
onConditionChangedLocked(const bool conditionMet,const int64_t eventTimeNs)349 void GaugeMetricProducer::onConditionChangedLocked(const bool conditionMet,
350                                                    const int64_t eventTimeNs) {
351     VLOG("GaugeMetric %lld onConditionChanged", (long long)mMetricId);
352     flushIfNeededLocked(eventTimeNs);
353     mCondition = conditionMet;
354 
355     if (mPullTagId != -1) {
356         pullLocked(eventTimeNs);
357     }  // else: Push mode. No need to proactively pull the gauge data.
358 }
359 
onSlicedConditionMayChangeLocked(bool overallCondition,const int64_t eventTimeNs)360 void GaugeMetricProducer::onSlicedConditionMayChangeLocked(bool overallCondition,
361                                                            const int64_t eventTimeNs) {
362     VLOG("GaugeMetric %lld onSlicedConditionMayChange overall condition %d", (long long)mMetricId,
363          overallCondition);
364     flushIfNeededLocked(eventTimeNs);
365     // If the condition is sliced, mCondition is true if any of the dimensions is true. And we will
366     // pull for every dimension.
367     mCondition = overallCondition;
368     if (mPullTagId != -1) {
369         pullLocked(eventTimeNs);
370     }  // else: Push mode. No need to proactively pull the gauge data.
371 }
372 
getGaugeFields(const LogEvent & event)373 std::shared_ptr<vector<FieldValue>> GaugeMetricProducer::getGaugeFields(const LogEvent& event) {
374     if (mFieldMatchers.size() > 0) {
375         std::shared_ptr<vector<FieldValue>> gaugeFields = std::make_shared<vector<FieldValue>>();
376         filterGaugeValues(mFieldMatchers, event.getValues(), gaugeFields.get());
377         return gaugeFields;
378     } else {
379         return std::make_shared<vector<FieldValue>>(event.getValues());
380     }
381 }
382 
onDataPulled(const std::vector<std::shared_ptr<LogEvent>> & allData)383 void GaugeMetricProducer::onDataPulled(const std::vector<std::shared_ptr<LogEvent>>& allData) {
384     std::lock_guard<std::mutex> lock(mMutex);
385     if (allData.size() == 0) {
386         return;
387     }
388     for (const auto& data : allData) {
389         onMatchedLogEventLocked(0, *data);
390     }
391 }
392 
hitGuardRailLocked(const MetricDimensionKey & newKey)393 bool GaugeMetricProducer::hitGuardRailLocked(const MetricDimensionKey& newKey) {
394     if (mCurrentSlicedBucket->find(newKey) != mCurrentSlicedBucket->end()) {
395         return false;
396     }
397     // 1. Report the tuple count if the tuple count > soft limit
398     if (mCurrentSlicedBucket->size() > mDimensionSoftLimit - 1) {
399         size_t newTupleCount = mCurrentSlicedBucket->size() + 1;
400         StatsdStats::getInstance().noteMetricDimensionSize(mConfigKey, mMetricId, newTupleCount);
401         // 2. Don't add more tuples, we are above the allowed threshold. Drop the data.
402         if (newTupleCount > mDimensionHardLimit) {
403             ALOGE("GaugeMetric %lld dropping data for dimension key %s",
404                 (long long)mMetricId, newKey.toString().c_str());
405             return true;
406         }
407     }
408 
409     return false;
410 }
411 
onMatchedLogEventInternalLocked(const size_t matcherIndex,const MetricDimensionKey & eventKey,const ConditionKey & conditionKey,bool condition,const LogEvent & event)412 void GaugeMetricProducer::onMatchedLogEventInternalLocked(
413         const size_t matcherIndex, const MetricDimensionKey& eventKey,
414         const ConditionKey& conditionKey, bool condition,
415         const LogEvent& event) {
416     if (condition == false) {
417         return;
418     }
419     int64_t eventTimeNs = event.GetElapsedTimestampNs();
420     mTagId = event.GetTagId();
421     if (eventTimeNs < mCurrentBucketStartTimeNs) {
422         VLOG("Gauge Skip event due to late arrival: %lld vs %lld", (long long)eventTimeNs,
423              (long long)mCurrentBucketStartTimeNs);
424         return;
425     }
426     flushIfNeededLocked(eventTimeNs);
427 
428     // When gauge metric wants to randomly sample the output atom, we just simply use the first
429     // gauge in the given bucket.
430     if (mCurrentSlicedBucket->find(eventKey) != mCurrentSlicedBucket->end() &&
431         mSamplingType == GaugeMetric::RANDOM_ONE_SAMPLE) {
432         return;
433     }
434     if (hitGuardRailLocked(eventKey)) {
435         return;
436     }
437     if ((*mCurrentSlicedBucket)[eventKey].size() >= mGaugeAtomsPerDimensionLimit) {
438         return;
439     }
440     GaugeAtom gaugeAtom(getGaugeFields(event), eventTimeNs, getWallClockNs());
441     (*mCurrentSlicedBucket)[eventKey].push_back(gaugeAtom);
442     // Anomaly detection on gauge metric only works when there is one numeric
443     // field specified.
444     if (mAnomalyTrackers.size() > 0) {
445         if (gaugeAtom.mFields->size() == 1) {
446             const Value& value = gaugeAtom.mFields->begin()->mValue;
447             long gaugeVal = 0;
448             if (value.getType() == INT) {
449                 gaugeVal = (long)value.int_value;
450             } else if (value.getType() == LONG) {
451                 gaugeVal = value.long_value;
452             }
453             for (auto& tracker : mAnomalyTrackers) {
454                 tracker->detectAndDeclareAnomaly(eventTimeNs, mCurrentBucketNum, eventKey,
455                                                  gaugeVal);
456             }
457         }
458     }
459 }
460 
updateCurrentSlicedBucketForAnomaly()461 void GaugeMetricProducer::updateCurrentSlicedBucketForAnomaly() {
462     for (const auto& slice : *mCurrentSlicedBucket) {
463         if (slice.second.empty()) {
464             continue;
465         }
466         const Value& value = slice.second.front().mFields->front().mValue;
467         long gaugeVal = 0;
468         if (value.getType() == INT) {
469             gaugeVal = (long)value.int_value;
470         } else if (value.getType() == LONG) {
471             gaugeVal = value.long_value;
472         }
473         (*mCurrentSlicedBucketForAnomaly)[slice.first] = gaugeVal;
474     }
475 }
476 
// Discards all finished buckets. The current bucket is first rolled over if |dropTimeNs| is
// past its end, so that data is discarded as well. Skipped-bucket records are not touched.
void GaugeMetricProducer::dropDataLocked(const int64_t dropTimeNs) {
    flushIfNeededLocked(dropTimeNs);
    mPastBuckets.clear();
}
481 
// When a new matched event comes in, we check whether the event falls into the current
// bucket. If not, we flush the current bucket's data to past buckets and initialize the new
// bucket.
// If data is pulled, onMatchedLogEvent will only be called through onConditionChanged() inside
// the GaugeMetricProducer while holding the lock.
flushIfNeededLocked(const int64_t & eventTimeNs)487 void GaugeMetricProducer::flushIfNeededLocked(const int64_t& eventTimeNs) {
488     int64_t currentBucketEndTimeNs = getCurrentBucketEndTimeNs();
489 
490     if (eventTimeNs < currentBucketEndTimeNs) {
491         VLOG("Gauge eventTime is %lld, less than next bucket start time %lld",
492              (long long)eventTimeNs, (long long)(mCurrentBucketStartTimeNs + mBucketSizeNs));
493         return;
494     }
495 
496     flushCurrentBucketLocked(eventTimeNs);
497 
498     // Adjusts the bucket start and end times.
499     int64_t numBucketsForward = 1 + (eventTimeNs - currentBucketEndTimeNs) / mBucketSizeNs;
500     mCurrentBucketStartTimeNs = currentBucketEndTimeNs + (numBucketsForward - 1) * mBucketSizeNs;
501     mCurrentBucketNum += numBucketsForward;
502     VLOG("Gauge metric %lld: new bucket start time: %lld", (long long)mMetricId,
503          (long long)mCurrentBucketStartTimeNs);
504 }
505 
flushCurrentBucketLocked(const int64_t & eventTimeNs)506 void GaugeMetricProducer::flushCurrentBucketLocked(const int64_t& eventTimeNs) {
507     int64_t fullBucketEndTimeNs = getCurrentBucketEndTimeNs();
508 
509     GaugeBucket info;
510     info.mBucketStartNs = mCurrentBucketStartTimeNs;
511     if (eventTimeNs < fullBucketEndTimeNs) {
512         info.mBucketEndNs = eventTimeNs;
513     } else {
514         info.mBucketEndNs = fullBucketEndTimeNs;
515     }
516 
517     if (info.mBucketEndNs - mCurrentBucketStartTimeNs >= mMinBucketSizeNs) {
518         for (const auto& slice : *mCurrentSlicedBucket) {
519             info.mGaugeAtoms = slice.second;
520             auto& bucketList = mPastBuckets[slice.first];
521             bucketList.push_back(info);
522             VLOG("Gauge gauge metric %lld, dump key value: %s", (long long)mMetricId,
523                  slice.first.toString().c_str());
524         }
525     } else {
526         mSkippedBuckets.emplace_back(info.mBucketStartNs, info.mBucketEndNs);
527     }
528 
529     // If we have anomaly trackers, we need to update the partial bucket values.
530     if (mAnomalyTrackers.size() > 0) {
531         updateCurrentSlicedBucketForAnomaly();
532 
533         if (eventTimeNs > fullBucketEndTimeNs) {
534             // This is known to be a full bucket, so send this data to the anomaly tracker.
535             for (auto& tracker : mAnomalyTrackers) {
536                 tracker->addPastBucket(mCurrentSlicedBucketForAnomaly, mCurrentBucketNum);
537             }
538             mCurrentSlicedBucketForAnomaly = std::make_shared<DimToValMap>();
539         }
540     }
541 
542     mCurrentSlicedBucket = std::make_shared<DimToGaugeAtomsMap>();
543 }
544 
byteSizeLocked() const545 size_t GaugeMetricProducer::byteSizeLocked() const {
546     size_t totalSize = 0;
547     for (const auto& pair : mPastBuckets) {
548         for (const auto& bucket : pair.second) {
549             totalSize += bucket.mGaugeAtoms.size() * sizeof(GaugeAtom);
550             for (const auto& atom : bucket.mGaugeAtoms) {
551                 if (atom.mFields != nullptr) {
552                     totalSize += atom.mFields->size() * sizeof(FieldValue);
553                 }
554             }
555         }
556     }
557     return totalSize;
558 }
559 
560 }  // namespace statsd
561 }  // namespace os
562 }  // namespace android
563