/*
 * Copyright (C) 2017 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#define DEBUG false  // STOPSHIP if true
#include "Log.h"

#include "ValueMetricProducer.h"

#include <limits.h>
#include <stdlib.h>

#include "../guardrail/StatsdStats.h"
#include "../stats_log_util.h"
#include "metrics/parsing_utils/metrics_manager_util.h"

using android::util::FIELD_COUNT_REPEATED;
using android::util::FIELD_TYPE_BOOL;
using android::util::FIELD_TYPE_DOUBLE;
using android::util::FIELD_TYPE_INT32;
using android::util::FIELD_TYPE_INT64;
using android::util::FIELD_TYPE_MESSAGE;
using android::util::FIELD_TYPE_STRING;
using android::util::ProtoOutputStream;
using std::map;
using std::shared_ptr;
using std::unordered_map;

namespace android {
namespace os {
namespace statsd {

// for StatsLogReport
const int FIELD_ID_ID = 1;
const int FIELD_ID_VALUE_METRICS = 7;
const int FIELD_ID_TIME_BASE = 9;
const int FIELD_ID_BUCKET_SIZE = 10;
const int FIELD_ID_DIMENSION_PATH_IN_WHAT = 11;
const int FIELD_ID_IS_ACTIVE = 14;
// for ValueMetricDataWrapper
const int FIELD_ID_DATA = 1;
const int FIELD_ID_SKIPPED = 2;
// for SkippedBuckets
const int FIELD_ID_SKIPPED_START_MILLIS = 3;
const int FIELD_ID_SKIPPED_END_MILLIS = 4;
const int FIELD_ID_SKIPPED_DROP_EVENT = 5;
// for DumpEvent Proto
const int FIELD_ID_BUCKET_DROP_REASON = 1;
const int FIELD_ID_DROP_TIME = 2;
// for ValueMetricData
const int FIELD_ID_DIMENSION_IN_WHAT = 1;
const int FIELD_ID_BUCKET_INFO = 3;
const int FIELD_ID_DIMENSION_LEAF_IN_WHAT = 4;
const int FIELD_ID_SLICE_BY_STATE = 6;
// for ValueBucketInfo
const int FIELD_ID_VALUE_INDEX = 1;
const int FIELD_ID_VALUE_LONG = 2;
const int FIELD_ID_VALUE_DOUBLE = 3;
const int FIELD_ID_VALUES = 9;
const int FIELD_ID_BUCKET_NUM = 4;
const int FIELD_ID_START_BUCKET_ELAPSED_MILLIS = 5;
const int FIELD_ID_END_BUCKET_ELAPSED_MILLIS = 6;
const int FIELD_ID_CONDITION_TRUE_NS = 10;

const Value ZERO_LONG((int64_t)0);
const Value ZERO_DOUBLE((double)0);
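// When use_zero_default_base is set, these constants seed the diff base for a
// dimension key that first appears after the global base has been established
// (see onMatchedLogEventInternalLocked below).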
// ValueMetric has a minimum bucket size of 10min so that we don't pull too frequently
ValueMetricProducer::ValueMetricProducer(
        const ConfigKey& key, const ValueMetric& metric, const int conditionIndex,
        const vector<ConditionState>& initialConditionCache,
        const sp<ConditionWizard>& conditionWizard, const uint64_t protoHash,
        const int whatMatcherIndex, const sp<EventMatcherWizard>& matcherWizard,
        const int pullTagId, const int64_t timeBaseNs, const int64_t startTimeNs,
        const sp<StatsPullerManager>& pullerManager,
        const unordered_map<int, shared_ptr<Activation>>& eventActivationMap,
        const unordered_map<int, vector<shared_ptr<Activation>>>& eventDeactivationMap,
        const vector<int>& slicedStateAtoms,
        const unordered_map<int, unordered_map<int, int64_t>>& stateGroupMap)
    : MetricProducer(metric.id(), key, timeBaseNs, conditionIndex, initialConditionCache,
                     conditionWizard, protoHash, eventActivationMap, eventDeactivationMap,
                     slicedStateAtoms, stateGroupMap),
      mWhatMatcherIndex(whatMatcherIndex),
      mEventMatcherWizard(matcherWizard),
      mPullerManager(pullerManager),
      mPullTagId(pullTagId),
      mIsPulled(pullTagId != -1),
      mMinBucketSizeNs(metric.min_bucket_size_nanos()),
      mDimensionSoftLimit(StatsdStats::kAtomDimensionKeySizeLimitMap.find(pullTagId) !=
                                          StatsdStats::kAtomDimensionKeySizeLimitMap.end()
                                  ? StatsdStats::kAtomDimensionKeySizeLimitMap.at(pullTagId).first
                                  : StatsdStats::kDimensionKeySizeSoftLimit),
      mDimensionHardLimit(StatsdStats::kAtomDimensionKeySizeLimitMap.find(pullTagId) !=
                                          StatsdStats::kAtomDimensionKeySizeLimitMap.end()
                                  ? StatsdStats::kAtomDimensionKeySizeLimitMap.at(pullTagId).second
                                  : StatsdStats::kDimensionKeySizeHardLimit),
      mUseAbsoluteValueOnReset(metric.use_absolute_value_on_reset()),
      mAggregationType(metric.aggregation_type()),
      mUseDiff(metric.has_use_diff() ? metric.use_diff() : mIsPulled),
      mValueDirection(metric.value_direction()),
      mSkipZeroDiffOutput(metric.skip_zero_diff_output()),
      mUseZeroDefaultBase(metric.use_zero_default_base()),
      mHasGlobalBase(false),
      mCurrentBucketIsSkipped(false),
      mMaxPullDelayNs(metric.has_max_pull_delay_sec() ? metric.max_pull_delay_sec() * NS_PER_SEC
                                                      : StatsdStats::kPullMaxDelayNs),
      mSplitBucketForAppUpgrade(metric.split_bucket_for_app_upgrade()),
      // Condition timer will be set later within the constructor after pulling events
      mConditionTimer(false, timeBaseNs) {
    int64_t bucketSizeMillis = 0;
    if (metric.has_bucket()) {
        bucketSizeMillis = TimeUnitToBucketSizeInMillisGuardrailed(key.GetUid(), metric.bucket());
    } else {
        bucketSizeMillis = TimeUnitToBucketSizeInMillis(ONE_HOUR);
    }

    mBucketSizeNs = bucketSizeMillis * 1000000;

    translateFieldMatcher(metric.value_field(), &mFieldMatchers);

    if (metric.has_dimensions_in_what()) {
        translateFieldMatcher(metric.dimensions_in_what(), &mDimensionsInWhat);
        mContainANYPositionInDimensionsInWhat = HasPositionANY(metric.dimensions_in_what());
        mSliceByPositionALL = HasPositionALL(metric.dimensions_in_what());
    }

    if (metric.links().size() > 0) {
        for (const auto& link : metric.links()) {
            Metric2Condition mc;
            mc.conditionId = link.condition();
            translateFieldMatcher(link.fields_in_what(), &mc.metricFields);
            translateFieldMatcher(link.fields_in_condition(), &mc.conditionFields);
            mMetric2ConditionLinks.push_back(mc);
        }
        mConditionSliced = true;
    }

    for (const auto& stateLink : metric.state_link()) {
        Metric2State ms;
        ms.stateAtomId = stateLink.state_atom_id();
        translateFieldMatcher(stateLink.fields_in_what(), &ms.metricFields);
        translateFieldMatcher(stateLink.fields_in_state(), &ms.stateFields);
        mMetric2StateLinks.push_back(ms);
    }

    if (metric.has_threshold()) {
        mUploadThreshold = metric.threshold();
    }

    int64_t numBucketsForward = calcBucketsForwardCount(startTimeNs);
    mCurrentBucketNum += numBucketsForward;

    flushIfNeededLocked(startTimeNs);

    if (mIsPulled) {
        mPullerManager->RegisterReceiver(mPullTagId, mConfigKey, this, getCurrentBucketEndTimeNs(),
                                         mBucketSizeNs);
    }

    // Only adjust the start for partial buckets like the first bucket. All other buckets
    // should use flushIfNeeded to adjust start and end to bucket boundaries.
    mCurrentBucketStartTimeNs = startTimeNs;
    mConditionTimer.newBucketStart(mCurrentBucketStartTimeNs);

    // Now that activations are processed, start the condition timer if needed.
    mConditionTimer.onConditionChanged(mIsActive && mCondition == ConditionState::kTrue,
                                       mCurrentBucketStartTimeNs);

    VLOG("value metric %lld created. bucket size %lld start_time: %lld", (long long)metric.id(),
         (long long)mBucketSizeNs, (long long)mTimeBaseNs);
}
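// Illustrative sketch of the first-bucket math above, assuming
// getCurrentBucketEndTimeNs() == mTimeBaseNs + (mCurrentBucketNum + 1) * mBucketSizeNs:
// with timeBaseNs = 0, a 1h bucket and startTimeNs = 2.5h, calcBucketsForwardCount
// returns 2, so mCurrentBucketNum becomes 2 (the bucket [2h, 3h)); the start is then
// adjusted to 2.5h, making the first bucket the partial interval [2.5h, 3h).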
ValueMetricProducer::~ValueMetricProducer() {
    VLOG("~ValueMetricProducer() called");
    if (mIsPulled) {
        mPullerManager->UnRegisterReceiver(mPullTagId, mConfigKey, this);
    }
}

bool ValueMetricProducer::onConfigUpdatedLocked(
        const StatsdConfig& config, const int configIndex, const int metricIndex,
        const vector<sp<AtomMatchingTracker>>& allAtomMatchingTrackers,
        const unordered_map<int64_t, int>& oldAtomMatchingTrackerMap,
        const unordered_map<int64_t, int>& newAtomMatchingTrackerMap,
        const sp<EventMatcherWizard>& matcherWizard,
        const vector<sp<ConditionTracker>>& allConditionTrackers,
        const unordered_map<int64_t, int>& conditionTrackerMap, const sp<ConditionWizard>& wizard,
        const unordered_map<int64_t, int>& metricToActivationMap,
        unordered_map<int, vector<int>>& trackerToMetricMap,
        unordered_map<int, vector<int>>& conditionToMetricMap,
        unordered_map<int, vector<int>>& activationAtomTrackerToMetricMap,
        unordered_map<int, vector<int>>& deactivationAtomTrackerToMetricMap,
        vector<int>& metricsWithActivation) {
    if (!MetricProducer::onConfigUpdatedLocked(
                config, configIndex, metricIndex, allAtomMatchingTrackers,
                oldAtomMatchingTrackerMap, newAtomMatchingTrackerMap, matcherWizard,
                allConditionTrackers, conditionTrackerMap, wizard, metricToActivationMap,
                trackerToMetricMap, conditionToMetricMap, activationAtomTrackerToMetricMap,
                deactivationAtomTrackerToMetricMap, metricsWithActivation)) {
        return false;
    }

    const ValueMetric& metric = config.value_metric(configIndex);
    // Update appropriate indices: mWhatMatcherIndex, mConditionIndex and MetricsManager maps.
    if (!handleMetricWithAtomMatchingTrackers(metric.what(), metricIndex, /*enforceOneAtom=*/false,
                                              allAtomMatchingTrackers, newAtomMatchingTrackerMap,
                                              trackerToMetricMap, mWhatMatcherIndex)) {
        return false;
    }

    if (metric.has_condition() &&
        !handleMetricWithConditions(metric.condition(), metricIndex, conditionTrackerMap,
                                    metric.links(), allConditionTrackers, mConditionTrackerIndex,
                                    conditionToMetricMap)) {
        return false;
    }
    sp<EventMatcherWizard> tmpEventWizard = mEventMatcherWizard;
    mEventMatcherWizard = matcherWizard;
    return true;
}

void ValueMetricProducer::onStateChanged(int64_t eventTimeNs, int32_t atomId,
                                         const HashableDimensionKey& primaryKey,
                                         const FieldValue& oldState, const FieldValue& newState) {
    VLOG("ValueMetric %lld onStateChanged time %lld, State %d, key %s, %d -> %d",
         (long long)mMetricId, (long long)eventTimeNs, atomId, primaryKey.toString().c_str(),
         oldState.mValue.int_value, newState.mValue.int_value);

    // If the old and new states are in the same StateGroup, then we do not need
    // to pull for this state change.
    FieldValue oldStateCopy = oldState;
    FieldValue newStateCopy = newState;
    mapStateValue(atomId, &oldStateCopy);
    mapStateValue(atomId, &newStateCopy);
    if (oldStateCopy == newStateCopy) {
        return;
    }

    // If the condition is not true or the metric is not active, we do not need
    // to pull for this state change.
    if (mCondition != ConditionState::kTrue || !mIsActive) {
        return;
    }

    bool isEventLate = eventTimeNs < mCurrentBucketStartTimeNs;
    if (isEventLate) {
        VLOG("Skip event due to late arrival: %lld vs %lld", (long long)eventTimeNs,
             (long long)mCurrentBucketStartTimeNs);
        invalidateCurrentBucket(eventTimeNs, BucketDropReason::EVENT_IN_WRONG_BUCKET);
        return;
    }
    mStateChangePrimaryKey.first = atomId;
    mStateChangePrimaryKey.second = primaryKey;
    if (mIsPulled) {
        pullAndMatchEventsLocked(eventTimeNs);
    }
    mStateChangePrimaryKey.first = 0;
    mStateChangePrimaryKey.second = DEFAULT_DIMENSION_KEY;
    flushIfNeededLocked(eventTimeNs);
}
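// Illustrative: if a state map groups raw state values 2 and 3 into the same
// state group, a 2 -> 3 transition maps both copies to the same group value
// above and returns early -- no pull and no flush is triggered.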
void ValueMetricProducer::onSlicedConditionMayChangeLocked(bool overallCondition,
                                                           const int64_t eventTime) {
    VLOG("Metric %lld onSlicedConditionMayChange", (long long)mMetricId);
}

void ValueMetricProducer::dropDataLocked(const int64_t dropTimeNs) {
    StatsdStats::getInstance().noteBucketDropped(mMetricId);

    // The current partial bucket is not flushed and does not require a pull,
    // so the data is still valid.
    flushIfNeededLocked(dropTimeNs);
    clearPastBucketsLocked(dropTimeNs);
}

void ValueMetricProducer::clearPastBucketsLocked(const int64_t dumpTimeNs) {
    mPastBuckets.clear();
    mSkippedBuckets.clear();
}
void ValueMetricProducer::onDumpReportLocked(const int64_t dumpTimeNs,
                                             const bool include_current_partial_bucket,
                                             const bool erase_data,
                                             const DumpLatency dumpLatency,
                                             std::set<string>* str_set,
                                             ProtoOutputStream* protoOutput) {
    VLOG("metric %lld dump report now...", (long long)mMetricId);
    if (include_current_partial_bucket) {
        // For pulled metrics, we need to do a pull at bucket boundaries. If we do not do that,
        // the current bucket will have incomplete data and the next will have the wrong snapshot
        // to do a diff against. If the condition is false, we are fine since the base data is
        // reset and we are not tracking anything.
        bool pullNeeded = mIsPulled && mCondition == ConditionState::kTrue;
        if (pullNeeded) {
            switch (dumpLatency) {
                case FAST:
                    invalidateCurrentBucket(dumpTimeNs, BucketDropReason::DUMP_REPORT_REQUESTED);
                    break;
                case NO_TIME_CONSTRAINTS:
                    pullAndMatchEventsLocked(dumpTimeNs);
                    break;
            }
        }
        flushCurrentBucketLocked(dumpTimeNs, dumpTimeNs);
    }
    protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_ID, (long long)mMetricId);
    protoOutput->write(FIELD_TYPE_BOOL | FIELD_ID_IS_ACTIVE, isActiveLocked());

    if (mPastBuckets.empty() && mSkippedBuckets.empty()) {
        return;
    }
    protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_TIME_BASE, (long long)mTimeBaseNs);
    protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_BUCKET_SIZE, (long long)mBucketSizeNs);
    // Fills the dimension path if not slicing by ALL.
    if (!mSliceByPositionALL) {
        if (!mDimensionsInWhat.empty()) {
            uint64_t dimenPathToken =
                    protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_ID_DIMENSION_PATH_IN_WHAT);
            writeDimensionPathToProto(mDimensionsInWhat, protoOutput);
            protoOutput->end(dimenPathToken);
        }
    }

    uint64_t protoToken = protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_ID_VALUE_METRICS);

    for (const auto& skippedBucket : mSkippedBuckets) {
        uint64_t wrapperToken =
                protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_SKIPPED);
        protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_SKIPPED_START_MILLIS,
                           (long long)(NanoToMillis(skippedBucket.bucketStartTimeNs)));
        protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_SKIPPED_END_MILLIS,
                           (long long)(NanoToMillis(skippedBucket.bucketEndTimeNs)));
        for (const auto& dropEvent : skippedBucket.dropEvents) {
            uint64_t dropEventToken = protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED |
                                                         FIELD_ID_SKIPPED_DROP_EVENT);
            protoOutput->write(FIELD_TYPE_INT32 | FIELD_ID_BUCKET_DROP_REASON, dropEvent.reason);
            protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_DROP_TIME,
                               (long long)(NanoToMillis(dropEvent.dropTimeNs)));
            protoOutput->end(dropEventToken);
        }
        protoOutput->end(wrapperToken);
    }

    for (const auto& pair : mPastBuckets) {
        const MetricDimensionKey& dimensionKey = pair.first;
        VLOG(" dimension key %s", dimensionKey.toString().c_str());
        uint64_t wrapperToken =
                protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_DATA);

        // First fill dimension.
        if (mSliceByPositionALL) {
            uint64_t dimensionToken =
                    protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_ID_DIMENSION_IN_WHAT);
            writeDimensionToProto(dimensionKey.getDimensionKeyInWhat(), str_set, protoOutput);
            protoOutput->end(dimensionToken);
        } else {
            writeDimensionLeafNodesToProto(dimensionKey.getDimensionKeyInWhat(),
                                           FIELD_ID_DIMENSION_LEAF_IN_WHAT, str_set, protoOutput);
        }

        // Then fill slice_by_state.
        for (auto state : dimensionKey.getStateValuesKey().getValues()) {
            uint64_t stateToken = protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED |
                                                     FIELD_ID_SLICE_BY_STATE);
            writeStateToProto(state, protoOutput);
            protoOutput->end(stateToken);
        }

        // Then fill bucket_info (ValueBucketInfo).
        for (const auto& bucket : pair.second) {
            uint64_t bucketInfoToken = protoOutput->start(
                    FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_BUCKET_INFO);

            if (bucket.mBucketEndNs - bucket.mBucketStartNs != mBucketSizeNs) {
                protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_START_BUCKET_ELAPSED_MILLIS,
                                   (long long)NanoToMillis(bucket.mBucketStartNs));
                protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_END_BUCKET_ELAPSED_MILLIS,
                                   (long long)NanoToMillis(bucket.mBucketEndNs));
            } else {
                protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_BUCKET_NUM,
                                   (long long)(getBucketNumFromEndTimeNs(bucket.mBucketEndNs)));
            }
            // We only write the condition timer value if the metric has a
            // condition and/or is sliced by state.
            // If the metric is sliced by state, the condition timer value is
            // also sliced by state to reflect time spent in that state.
            if (mConditionTrackerIndex >= 0 || !mSlicedStateAtoms.empty()) {
                protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_CONDITION_TRUE_NS,
                                   (long long)bucket.mConditionTrueNs);
            }
            for (int i = 0; i < (int)bucket.valueIndex.size(); i++) {
                int index = bucket.valueIndex[i];
                const Value& value = bucket.values[i];
                uint64_t valueToken = protoOutput->start(
                        FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_VALUES);
                protoOutput->write(FIELD_TYPE_INT32 | FIELD_ID_VALUE_INDEX, index);
                if (value.getType() == LONG) {
                    protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_VALUE_LONG,
                                       (long long)value.long_value);
                    VLOG("\t bucket [%lld - %lld] value %d: %lld", (long long)bucket.mBucketStartNs,
                         (long long)bucket.mBucketEndNs, index, (long long)value.long_value);
                } else if (value.getType() == DOUBLE) {
                    protoOutput->write(FIELD_TYPE_DOUBLE | FIELD_ID_VALUE_DOUBLE,
                                       value.double_value);
                    VLOG("\t bucket [%lld - %lld] value %d: %.2f", (long long)bucket.mBucketStartNs,
                         (long long)bucket.mBucketEndNs, index, value.double_value);
                } else {
                    VLOG("Wrong value type for ValueMetric output: %d", value.getType());
                }
                protoOutput->end(valueToken);
            }
            protoOutput->end(bucketInfoToken);
        }
        protoOutput->end(wrapperToken);
    }
    protoOutput->end(protoToken);

    VLOG("metric %lld done with dump report...", (long long)mMetricId);
    if (erase_data) {
        mPastBuckets.clear();
        mSkippedBuckets.clear();
    }
}
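// Rough shape of the report emitted above (a sketch derived from the field IDs
// at the top of this file, not the authoritative proto definition):
//   StatsLogReport {
//     id, is_active,
//     value_metrics {
//       skipped { start/end millis, drop_event { reason, drop_time } ... }
//       data {
//         dimensions_in_what (or dimension leaf values), slice_by_state,
//         bucket_info { bucket_num or start/end millis, condition_true_nanos,
//                       values { index, value_long or value_double } }
//       }
//     }
//   }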
void ValueMetricProducer::invalidateCurrentBucketWithoutResetBase(const int64_t dropTimeNs,
                                                                  const BucketDropReason reason) {
    if (!mCurrentBucketIsSkipped) {
        // Only report to StatsdStats once per invalid bucket.
        StatsdStats::getInstance().noteInvalidatedBucket(mMetricId);
    }

    skipCurrentBucket(dropTimeNs, reason);
}

void ValueMetricProducer::invalidateCurrentBucket(const int64_t dropTimeNs,
                                                  const BucketDropReason reason) {
    invalidateCurrentBucketWithoutResetBase(dropTimeNs, reason);
    resetBase();
}

void ValueMetricProducer::skipCurrentBucket(const int64_t dropTimeNs,
                                            const BucketDropReason reason) {
    if (!maxDropEventsReached()) {
        mCurrentSkippedBucket.dropEvents.emplace_back(buildDropEvent(dropTimeNs, reason));
    }
    mCurrentBucketIsSkipped = true;
}

void ValueMetricProducer::resetBase() {
    for (auto& slice : mCurrentBaseInfo) {
        for (auto& baseInfo : slice.second.baseInfos) {
            baseInfo.hasBase = false;
        }
    }
    mHasGlobalBase = false;
}
// Handle active state change. Active state change is treated like a condition change:
// - drop the bucket if the active state change event arrives too late
// - if the condition is true, pull data on active state changes
// - ConditionTimer tracks changes based on the AND of condition and active state.
void ValueMetricProducer::onActiveStateChangedLocked(const int64_t& eventTimeNs) {
    bool isEventTooLate = eventTimeNs < mCurrentBucketStartTimeNs;
    if (isEventTooLate) {
        // Drop the bucket because the event arrived too late, i.e. we are missing data for this
        // bucket.
        StatsdStats::getInstance().noteLateLogEventSkipped(mMetricId);
        invalidateCurrentBucket(eventTimeNs, BucketDropReason::EVENT_IN_WRONG_BUCKET);
    }

    // Call the parent method once we've verified the validity of the current bucket.
    MetricProducer::onActiveStateChangedLocked(eventTimeNs);

    if (ConditionState::kTrue != mCondition) {
        return;
    }

    // Pull on active state changes.
    if (!isEventTooLate) {
        if (mIsPulled) {
            pullAndMatchEventsLocked(eventTimeNs);
        }
        // When the active state changes from true to false, clear the diff base but don't
        // reset other counters as we may accumulate more value in the bucket.
        if (mUseDiff && !mIsActive) {
            resetBase();
        }
    }

    flushIfNeededLocked(eventTimeNs);

    // Let the condition timer know of the new active state.
    mConditionTimer.onConditionChanged(mIsActive, eventTimeNs);

    updateCurrentSlicedBucketConditionTimers(mIsActive, eventTimeNs);
}

void ValueMetricProducer::onConditionChangedLocked(const bool condition,
                                                   const int64_t eventTimeNs) {
    ConditionState newCondition = condition ? ConditionState::kTrue : ConditionState::kFalse;
    bool isEventTooLate = eventTimeNs < mCurrentBucketStartTimeNs;

    // If the config is not active, skip the event.
    if (!mIsActive) {
        mCondition = isEventTooLate ? ConditionState::kUnknown : newCondition;
        return;
    }

    // If the event arrived late, mark the bucket as invalid and skip the event.
    if (isEventTooLate) {
        VLOG("Skip event due to late arrival: %lld vs %lld", (long long)eventTimeNs,
             (long long)mCurrentBucketStartTimeNs);
        StatsdStats::getInstance().noteLateLogEventSkipped(mMetricId);
        StatsdStats::getInstance().noteConditionChangeInNextBucket(mMetricId);
        invalidateCurrentBucket(eventTimeNs, BucketDropReason::EVENT_IN_WRONG_BUCKET);
        mCondition = ConditionState::kUnknown;
        mConditionTimer.onConditionChanged(mCondition, eventTimeNs);

        updateCurrentSlicedBucketConditionTimers(mCondition, eventTimeNs);
        return;
    }

    // If the previous condition was unknown, mark the bucket as invalid
    // because the bucket will contain partial data. For example, the condition
    // change might happen close to the end of the bucket and we might miss a
    // lot of data.
    //
    // We still want to pull to set the base.
    if (mCondition == ConditionState::kUnknown) {
        invalidateCurrentBucket(eventTimeNs, BucketDropReason::CONDITION_UNKNOWN);
    }

    // Pull and match for the following condition change cases:
    // unknown/false -> true - condition changed
    // true -> false - condition changed
    // true -> true - old condition was true so we can flush the bucket at the
    // end if needed.
    //
    // We don't need to pull for unknown -> false or false -> false.
    //
    // onConditionChangedLocked might happen on bucket boundaries if this is
    // called before #onDataPulled.
    if (mIsPulled &&
        (newCondition == ConditionState::kTrue || mCondition == ConditionState::kTrue)) {
        pullAndMatchEventsLocked(eventTimeNs);
    }

    // For metrics that use diff, when the condition changes from true to false,
    // clear the diff base but don't reset other counts because we may accumulate
    // more value in the bucket.
    if (mUseDiff &&
        (mCondition == ConditionState::kTrue && newCondition == ConditionState::kFalse)) {
        resetBase();
    }

    // Update the condition state after pulling.
    mCondition = newCondition;

    flushIfNeededLocked(eventTimeNs);
    mConditionTimer.onConditionChanged(mCondition, eventTimeNs);

    updateCurrentSlicedBucketConditionTimers(mCondition, eventTimeNs);
}
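// Summary of the transitions handled above (pulls apply to pulled metrics,
// base resets to diffed ones):
//   unknown -> true  : invalidate bucket, pull
//   unknown -> false : invalidate bucket, no pull
//   false   -> true  : pull
//   true    -> false : pull, reset diff base
//   true    -> true  : pull (we may be at a bucket boundary)
//   false   -> false : only the condition timers are updated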
void ValueMetricProducer::updateCurrentSlicedBucketConditionTimers(bool newCondition,
                                                                   int64_t eventTimeNs) {
    if (mSlicedStateAtoms.empty()) {
        return;
    }

    // Utilize the current state key of each DimensionsInWhat key to determine
    // which condition timers to update.
    //
    // Assumes that the MetricDimensionKey exists in `mCurrentSlicedBucket`.
    bool inPulledData;
    for (const auto& [dimensionInWhatKey, dimensionInWhatInfo] : mCurrentBaseInfo) {
        // If the new condition is true, turn ON the condition timer only if
        // the DimensionInWhat key was present in the pulled data.
        inPulledData = dimensionInWhatInfo.hasCurrentState;
        mCurrentSlicedBucket[MetricDimensionKey(dimensionInWhatKey,
                                                dimensionInWhatInfo.currentState)]
                .conditionTimer.onConditionChanged(newCondition && inPulledData, eventTimeNs);
    }
}

void ValueMetricProducer::prepareFirstBucketLocked() {
    // Kick off the puller immediately if the metric is active, diff-based, and the
    // condition is true.
    if (mIsActive && mIsPulled && mCondition == ConditionState::kTrue && mUseDiff) {
        pullAndMatchEventsLocked(mCurrentBucketStartTimeNs);
    }
}

void ValueMetricProducer::pullAndMatchEventsLocked(const int64_t timestampNs) {
    vector<std::shared_ptr<LogEvent>> allData;
    if (!mPullerManager->Pull(mPullTagId, mConfigKey, timestampNs, &allData)) {
        ALOGE("Stats puller failed for tag: %d at %lld", mPullTagId, (long long)timestampNs);
        invalidateCurrentBucket(timestampNs, BucketDropReason::PULL_FAILED);
        return;
    }

    accumulateEvents(allData, timestampNs, timestampNs);
}

int64_t ValueMetricProducer::calcPreviousBucketEndTime(const int64_t currentTimeNs) {
    return mTimeBaseNs + ((currentTimeNs - mTimeBaseNs) / mBucketSizeNs) * mBucketSizeNs;
}
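// Illustrative: with mTimeBaseNs = 0 and a 10 min bucket, a scheduled pull that
// arrives at 31 min snaps back to calcPreviousBucketEndTime(31 min) = 30 min, so
// onDataPulled below attributes the data to the bucket ending at 30 min and the
// 1 min delta is reported as bucket boundary delay.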
// By design, statsd pulls data at bucket boundaries using AlarmManager. These pulls are likely
// to be delayed. Other events like condition changes or app upgrades, which are not based on
// AlarmManager, might have arrived earlier and closed the bucket.
void ValueMetricProducer::onDataPulled(const std::vector<std::shared_ptr<LogEvent>>& allData,
                                       bool pullSuccess, int64_t originalPullTimeNs) {
    std::lock_guard<std::mutex> lock(mMutex);
    if (mCondition == ConditionState::kTrue) {
        // If the pull failed, we won't be able to compute a diff.
        if (!pullSuccess) {
            invalidateCurrentBucket(originalPullTimeNs, BucketDropReason::PULL_FAILED);
        } else {
            bool isEventLate = originalPullTimeNs < getCurrentBucketEndTimeNs();
            if (isEventLate) {
                // If the event is late, we are in the middle of a bucket. Just
                // process the data without trying to snap it to the nearest bucket.
                accumulateEvents(allData, originalPullTimeNs, originalPullTimeNs);
            } else {
                // For scheduled pulled data, the effective event time is snapped to the nearest
                // bucket end. In the case of waking up from a deep sleep state, we will
                // attribute the data to the previous bucket end. If the sleep was long but not
                // very long, we will be in the immediate next bucket. The previous bucket may
                // get a larger number as we pull at a later time than the real bucket end.
                //
                // If the sleep was very long, we skip more than one bucket before the pull. In
                // this case, the diff base will be cleared and this new data will serve as the
                // new diff base.
                int64_t bucketEndTime = calcPreviousBucketEndTime(originalPullTimeNs) - 1;
                StatsdStats::getInstance().noteBucketBoundaryDelayNs(
                        mMetricId, originalPullTimeNs - bucketEndTime);
                accumulateEvents(allData, originalPullTimeNs, bucketEndTime);
            }
        }
    }

    // We can probably flush the bucket. Since we used bucketEndTime when calling
    // #onMatchedLogEventInternalLocked, the current bucket will not have been flushed.
    flushIfNeededLocked(originalPullTimeNs);
}

void ValueMetricProducer::accumulateEvents(const std::vector<std::shared_ptr<LogEvent>>& allData,
                                           int64_t originalPullTimeNs,
                                           int64_t eventElapsedTimeNs) {
    bool isEventLate = eventElapsedTimeNs < mCurrentBucketStartTimeNs;
    if (isEventLate) {
        VLOG("Skip bucket end pull due to late arrival: %lld vs %lld",
             (long long)eventElapsedTimeNs, (long long)mCurrentBucketStartTimeNs);
        StatsdStats::getInstance().noteLateLogEventSkipped(mMetricId);
        invalidateCurrentBucket(eventElapsedTimeNs, BucketDropReason::EVENT_IN_WRONG_BUCKET);
        return;
    }

    const int64_t elapsedRealtimeNs = getElapsedRealtimeNs();
    const int64_t pullDelayNs = elapsedRealtimeNs - originalPullTimeNs;
    StatsdStats::getInstance().notePullDelay(mPullTagId, pullDelayNs);
    if (pullDelayNs > mMaxPullDelayNs) {
        ALOGE("Pull finished too late for atom %d, longer than %lld", mPullTagId,
              (long long)mMaxPullDelayNs);
        StatsdStats::getInstance().notePullExceedMaxDelay(mPullTagId);
        // We are missing one pull from the bucket, which means we will not have a complete view
        // of what's going on.
        invalidateCurrentBucket(eventElapsedTimeNs, BucketDropReason::PULL_DELAYED);
        return;
    }

    mMatchedMetricDimensionKeys.clear();
    for (const auto& data : allData) {
        LogEvent localCopy = data->makeCopy();
        if (mEventMatcherWizard->matchLogEvent(localCopy, mWhatMatcherIndex) ==
            MatchingState::kMatched) {
            localCopy.setElapsedTimestampNs(eventElapsedTimeNs);
            onMatchedLogEventLocked(mWhatMatcherIndex, localCopy);
        }
    }
    // If a key that is:
    // 1. Tracked in mCurrentSlicedBucket and
    // 2. A superset of the current mStateChangePrimaryKey
    // was not found in the new pulled data (i.e. not in mMatchedMetricDimensionKeys),
    // then we need to reset the base.
    for (auto& [metricDimensionKey, currentValueBucket] : mCurrentSlicedBucket) {
        const auto& whatKey = metricDimensionKey.getDimensionKeyInWhat();
        bool presentInPulledData =
                mMatchedMetricDimensionKeys.find(whatKey) != mMatchedMetricDimensionKeys.end();
        if (!presentInPulledData && whatKey.contains(mStateChangePrimaryKey.second)) {
            auto it = mCurrentBaseInfo.find(whatKey);
            for (auto& baseInfo : it->second.baseInfos) {
                baseInfo.hasBase = false;
            }
            // Set to false when the DimensionInWhat key is not present in a pull.
            // Used in onMatchedLogEventInternalLocked() to ensure the condition
            // timer is turned on at the next pull when data is present.
            it->second.hasCurrentState = false;
            // Turn OFF the condition timer for keys not present in the pulled data.
            currentValueBucket.conditionTimer.onConditionChanged(false, eventElapsedTimeNs);
        }
    }
    mMatchedMetricDimensionKeys.clear();
    mHasGlobalBase = true;

    // If we reach the guardrail, we might have dropped some data, which means the bucket is
    // incomplete.
    //
    // The base also needs to be reset. If we do not have the full data, we might
    // incorrectly compute the diff when mUseZeroDefaultBase is true since an existing key
    // might be missing from mCurrentSlicedBucket.
    if (hasReachedGuardRailLimit()) {
        invalidateCurrentBucket(eventElapsedTimeNs, BucketDropReason::DIMENSION_GUARDRAIL_REACHED);
        mCurrentSlicedBucket.clear();
    }
}
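// Illustrative: in accumulateEvents above, if uid 1000 was present in the previous
// pull but is absent from this one, its base is cleared, so when uid 1000 reappears
// its next value re-seeds the base instead of producing a diff against stale data.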
void ValueMetricProducer::dumpStatesLocked(FILE* out, bool verbose) const {
    if (mCurrentSlicedBucket.size() == 0) {
        return;
    }

    fprintf(out, "ValueMetric %lld dimension size %lu\n", (long long)mMetricId,
            (unsigned long)mCurrentSlicedBucket.size());
    if (verbose) {
        for (const auto& it : mCurrentSlicedBucket) {
            for (const auto& interval : it.second.intervals) {
                fprintf(out, "\t(what)%s\t(states)%s (value)%s\n",
                        it.first.getDimensionKeyInWhat().toString().c_str(),
                        it.first.getStateValuesKey().toString().c_str(),
                        interval.value.toString().c_str());
            }
        }
    }
}

bool ValueMetricProducer::hasReachedGuardRailLimit() const {
    return mCurrentSlicedBucket.size() >= mDimensionHardLimit;
}

bool ValueMetricProducer::hitGuardRailLocked(const MetricDimensionKey& newKey) {
    // ===========GuardRail==============
    // 1. Report the tuple count if the tuple count > soft limit
    if (mCurrentSlicedBucket.find(newKey) != mCurrentSlicedBucket.end()) {
        return false;
    }
    if (mCurrentSlicedBucket.size() > mDimensionSoftLimit - 1) {
        size_t newTupleCount = mCurrentSlicedBucket.size() + 1;
        StatsdStats::getInstance().noteMetricDimensionSize(mConfigKey, mMetricId, newTupleCount);
        // 2. Don't add more tuples, we are above the allowed threshold. Drop the data.
        if (hasReachedGuardRailLimit()) {
            ALOGE("ValueMetric %lld dropping data for dimension key %s", (long long)mMetricId,
                  newKey.toString().c_str());
            StatsdStats::getInstance().noteHardDimensionLimitReached(mMetricId);
            return true;
        }
    }

    return false;
}
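// Illustrative, assuming the default limits of 500 (soft) and 800 (hard): once a
// bucket tracks 500 dimension keys, every additional new key records the metric's
// dimension size in StatsdStats; once it tracks 800 keys, new keys are dropped
// outright and the hard-limit counter is incremented.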
bool ValueMetricProducer::hitFullBucketGuardRailLocked(const MetricDimensionKey& newKey) {
    // ===========GuardRail==============
    // 1. Report the tuple count if the tuple count > soft limit
    if (mCurrentFullBucket.find(newKey) != mCurrentFullBucket.end()) {
        return false;
    }
    if (mCurrentFullBucket.size() > mDimensionSoftLimit - 1) {
        size_t newTupleCount = mCurrentFullBucket.size() + 1;
        // 2. Don't add more tuples, we are above the allowed threshold. Drop the data.
        if (newTupleCount > mDimensionHardLimit) {
            ALOGE("ValueMetric %lld dropping data for full bucket dimension key %s",
                  (long long)mMetricId, newKey.toString().c_str());
            return true;
        }
    }

    return false;
}

bool getDoubleOrLong(const LogEvent& event, const Matcher& matcher, Value& ret) {
    for (const FieldValue& value : event.getValues()) {
        if (value.mField.matches(matcher)) {
            switch (value.mValue.type) {
                case INT:
                    ret.setLong(value.mValue.int_value);
                    break;
                case LONG:
                    ret.setLong(value.mValue.long_value);
                    break;
                case FLOAT:
                    ret.setDouble(value.mValue.float_value);
                    break;
                case DOUBLE:
                    ret.setDouble(value.mValue.double_value);
                    break;
                default:
                    return false;
            }
            return true;
        }
    }
    return false;
}

bool ValueMetricProducer::multipleBucketsSkipped(const int64_t numBucketsForward) {
    // Skip buckets if this is a pulled metric or a pushed metric that is diffed.
    return numBucketsForward > 1 && (mIsPulled || mUseDiff);
}
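// Illustrative: getDoubleOrLong above widens INT/LONG fields to a LONG Value and
// FLOAT/DOUBLE fields to a DOUBLE Value, so a 3.5f float field is aggregated as
// the double 3.5; any other field type rejects the event as a bad value type.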
void ValueMetricProducer::onMatchedLogEventInternalLocked(
        const size_t matcherIndex, const MetricDimensionKey& eventKey,
        const ConditionKey& conditionKey, bool condition, const LogEvent& event,
        const map<int, HashableDimensionKey>& statePrimaryKeys) {
    auto whatKey = eventKey.getDimensionKeyInWhat();
    auto stateKey = eventKey.getStateValuesKey();

    // Skip this event if a state change occurred for a different primary key.
    auto it = statePrimaryKeys.find(mStateChangePrimaryKey.first);
    // Check that both the atom id and the primary key are equal.
    if (it != statePrimaryKeys.end() && it->second != mStateChangePrimaryKey.second) {
        VLOG("ValueMetric skip event with primary key %s because state change primary key "
             "is %s",
             it->second.toString().c_str(), mStateChangePrimaryKey.second.toString().c_str());
        return;
    }

    int64_t eventTimeNs = event.GetElapsedTimestampNs();
    if (eventTimeNs < mCurrentBucketStartTimeNs) {
        VLOG("Skip event due to late arrival: %lld vs %lld", (long long)eventTimeNs,
             (long long)mCurrentBucketStartTimeNs);
        return;
    }
    mMatchedMetricDimensionKeys.insert(whatKey);

    if (!mIsPulled) {
        // We cannot flush without doing a pull first.
        flushIfNeededLocked(eventTimeNs);
    }

    // We should not accumulate the data for pushed metrics when the condition is false.
    bool shouldSkipForPushMetric = !mIsPulled && !condition;
    // For pulled metrics, there are two cases:
    // - to compute diffs, we need to process all the state changes
    // - for non-diff metrics, we should ignore the data if the condition wasn't true. If we
    //   have a state change from
    //   + True -> True: we should process the data, it might be a bucket boundary
    //   + True -> False: we also need to process the data.
    bool shouldSkipForPulledMetric =
            mIsPulled && !mUseDiff && mCondition != ConditionState::kTrue;
    if (shouldSkipForPushMetric || shouldSkipForPulledMetric) {
        VLOG("ValueMetric skip event because condition is false and we are not using diff (for "
             "pulled metric)");
        return;
    }

    if (hitGuardRailLocked(eventKey)) {
        return;
    }

    const auto& returnVal =
            mCurrentBaseInfo.emplace(whatKey, DimensionsInWhatInfo(getUnknownStateKey()));
    DimensionsInWhatInfo& dimensionsInWhatInfo = returnVal.first->second;
    const HashableDimensionKey oldStateKey = dimensionsInWhatInfo.currentState;
    vector<BaseInfo>& baseInfos = dimensionsInWhatInfo.baseInfos;
    if (baseInfos.size() < mFieldMatchers.size()) {
        VLOG("Resizing number of base infos to %d", (int)mFieldMatchers.size());
        baseInfos.resize(mFieldMatchers.size());
    }

    // Ensure we turn on the condition timer in the case where dimensions
    // were missing on a previous pull due to a state change.
    bool stateChange = oldStateKey != stateKey;
    if (!dimensionsInWhatInfo.hasCurrentState) {
        stateChange = true;
        dimensionsInWhatInfo.hasCurrentState = true;
    }

    // We need to get the intervals stored with the previous state key so we can
    // close these value intervals.
    vector<Interval>& intervals =
            mCurrentSlicedBucket[MetricDimensionKey(whatKey, oldStateKey)].intervals;
    if (intervals.size() < mFieldMatchers.size()) {
        VLOG("Resizing number of intervals to %d", (int)mFieldMatchers.size());
        intervals.resize(mFieldMatchers.size());
    }

    // We only use anomaly detection under certain cases.
    // N.B.: The anomaly detection cases were modified in order to fix an issue with value metrics
    // containing multiple values. We tried to retain all previous behaviour, but we are unsure the
    // previous behaviour was correct. At the time of the fix, anomaly detection had no owner.
    // Whoever next works on it should look into the cases where it is triggered in this function.
    // Discussion here: http://ag/6124370.
    bool useAnomalyDetection = true;

    dimensionsInWhatInfo.hasCurrentState = true;
    dimensionsInWhatInfo.currentState = stateKey;
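    // Illustrative walk-through of the diff logic below for an INCREASING pulled
    // counter: a first pull of 100 only seeds the base (no output); a later pull
    // of 130 emits a diff of 30 and moves the base to 130. If the counter then
    // resets and reports 5, the diff is 5 when use_absolute_value_on_reset is set;
    // otherwise the point is dropped and 5 becomes the new base.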
    for (int i = 0; i < (int)mFieldMatchers.size(); i++) {
        const Matcher& matcher = mFieldMatchers[i];
        BaseInfo& baseInfo = baseInfos[i];
        Interval& interval = intervals[i];
        interval.valueIndex = i;
        Value value;
        if (!getDoubleOrLong(event, matcher, value)) {
            VLOG("Failed to get value %d from event %s", i, event.ToString().c_str());
            StatsdStats::getInstance().noteBadValueType(mMetricId);
            return;
        }
        interval.seenNewData = true;

        if (mUseDiff) {
            if (!baseInfo.hasBase) {
                if (mHasGlobalBase && mUseZeroDefaultBase) {
                    // The bucket has a global base. This key does not.
                    // Optionally use zero as the base.
                    baseInfo.base = (value.type == LONG ? ZERO_LONG : ZERO_DOUBLE);
                    baseInfo.hasBase = true;
                } else {
                    // No base; just update the base and continue.
                    baseInfo.base = value;
                    baseInfo.hasBase = true;
                    // If we're missing a base, do not use anomaly detection on incomplete data.
                    useAnomalyDetection = false;
                    // Continue (instead of return) here in order to set baseInfo.base and
                    // baseInfo.hasBase for the other baseInfos.
                    continue;
                }
            }

            Value diff;
            switch (mValueDirection) {
                case ValueMetric::INCREASING:
                    if (value >= baseInfo.base) {
                        diff = value - baseInfo.base;
                    } else if (mUseAbsoluteValueOnReset) {
                        diff = value;
                    } else {
                        VLOG("Unexpected decreasing value");
                        StatsdStats::getInstance().notePullDataError(mPullTagId);
                        baseInfo.base = value;
                        // If we've got bad data, do not use anomaly detection.
                        useAnomalyDetection = false;
                        continue;
                    }
                    break;
                case ValueMetric::DECREASING:
                    if (baseInfo.base >= value) {
                        diff = baseInfo.base - value;
                    } else if (mUseAbsoluteValueOnReset) {
                        diff = value;
                    } else {
                        VLOG("Unexpected increasing value");
                        StatsdStats::getInstance().notePullDataError(mPullTagId);
                        baseInfo.base = value;
                        // If we've got bad data, do not use anomaly detection.
                        useAnomalyDetection = false;
                        continue;
                    }
                    break;
                case ValueMetric::ANY:
                    diff = value - baseInfo.base;
                    break;
                default:
                    break;
            }
            baseInfo.base = value;
            value = diff;
        }

        if (interval.hasValue) {
            switch (mAggregationType) {
                case ValueMetric::SUM:
                    // For AVG, we add up and take the average when flushing the bucket.
                case ValueMetric::AVG:
                    interval.value += value;
                    break;
                case ValueMetric::MIN:
                    interval.value = std::min(value, interval.value);
                    break;
                case ValueMetric::MAX:
                    interval.value = std::max(value, interval.value);
                    break;
                default:
                    break;
            }
        } else {
            interval.value = value;
            interval.hasValue = true;
        }
        interval.sampleSize += 1;
    }

    // State change.
    if (!mSlicedStateAtoms.empty() && stateChange) {
        // Turn OFF the condition timer for the previous state key.
        mCurrentSlicedBucket[MetricDimensionKey(whatKey, oldStateKey)]
                .conditionTimer.onConditionChanged(false, eventTimeNs);

        // Turn ON the condition timer for the new state key.
        mCurrentSlicedBucket[MetricDimensionKey(whatKey, stateKey)]
                .conditionTimer.onConditionChanged(true, eventTimeNs);
    }

    // Only trigger the tracker if all intervals are correct and we have not skipped the bucket
    // due to MULTIPLE_BUCKETS_SKIPPED.
    if (useAnomalyDetection && !multipleBucketsSkipped(calcBucketsForwardCount(eventTimeNs))) {
        // TODO: propagate proper values downstream when anomaly detection supports doubles.
        long wholeBucketVal = intervals[0].value.long_value;
        auto prev = mCurrentFullBucket.find(eventKey);
        if (prev != mCurrentFullBucket.end()) {
            wholeBucketVal += prev->second;
        }
        for (auto& tracker : mAnomalyTrackers) {
            tracker->detectAndDeclareAnomaly(eventTimeNs, mCurrentBucketNum, mMetricId, eventKey,
                                             wholeBucketVal);
        }
    }
}
// For pulled metrics, we always need to make sure we do a pull before flushing the bucket
// if mCondition is true!
void ValueMetricProducer::flushIfNeededLocked(const int64_t& eventTimeNs) {
    int64_t currentBucketEndTimeNs = getCurrentBucketEndTimeNs();
    if (eventTimeNs < currentBucketEndTimeNs) {
        VLOG("eventTime is %lld, less than current bucket end time %lld", (long long)eventTimeNs,
             (long long)(currentBucketEndTimeNs));
        return;
    }
    int64_t numBucketsForward = calcBucketsForwardCount(eventTimeNs);
    int64_t nextBucketStartTimeNs =
            currentBucketEndTimeNs + (numBucketsForward - 1) * mBucketSizeNs;
    flushCurrentBucketLocked(eventTimeNs, nextBucketStartTimeNs);
}

int64_t ValueMetricProducer::calcBucketsForwardCount(const int64_t& eventTimeNs) const {
    int64_t currentBucketEndTimeNs = getCurrentBucketEndTimeNs();
    if (eventTimeNs < currentBucketEndTimeNs) {
        return 0;
    }
    return 1 + (eventTimeNs - currentBucketEndTimeNs) / mBucketSizeNs;
}
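// Illustrative: with a 10 min bucket ending at t = 20 min, an event at 19 min
// yields 0 buckets forward (nothing to flush), an event at 25 min yields 1, and
// an event at 47 min yields 1 + (27 / 10) = 3, i.e. two whole buckets were
// skipped in addition to the one being closed.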
void ValueMetricProducer::flushCurrentBucketLocked(const int64_t& eventTimeNs,
                                                   const int64_t& nextBucketStartTimeNs) {
    if (mCondition == ConditionState::kUnknown) {
        StatsdStats::getInstance().noteBucketUnknownCondition(mMetricId);
        invalidateCurrentBucketWithoutResetBase(eventTimeNs, BucketDropReason::CONDITION_UNKNOWN);
    }

    VLOG("finalizing bucket for %ld, dumping %d slices", (long)mCurrentBucketStartTimeNs,
         (int)mCurrentSlicedBucket.size());

    int64_t fullBucketEndTimeNs = getCurrentBucketEndTimeNs();
    int64_t bucketEndTime = fullBucketEndTimeNs;
    int64_t numBucketsForward = calcBucketsForwardCount(eventTimeNs);

    if (multipleBucketsSkipped(numBucketsForward)) {
        VLOG("Skipping forward %lld buckets", (long long)numBucketsForward);
        StatsdStats::getInstance().noteSkippedForwardBuckets(mMetricId);
        // Something went wrong. Maybe the device was sleeping for a long time. It is better
        // to mark the current bucket as invalid. The last pull might have been successful,
        // though.
        invalidateCurrentBucketWithoutResetBase(eventTimeNs,
                                                BucketDropReason::MULTIPLE_BUCKETS_SKIPPED);
        // End the bucket at the next bucket start time so the entire interval is skipped.
        bucketEndTime = nextBucketStartTimeNs;
    } else if (eventTimeNs < fullBucketEndTimeNs) {
        bucketEndTime = eventTimeNs;
    }

    // Close the current bucket.
    int64_t conditionTrueDuration = mConditionTimer.newBucketStart(bucketEndTime);
    bool isBucketLargeEnough = bucketEndTime - mCurrentBucketStartTimeNs >= mMinBucketSizeNs;
    if (!isBucketLargeEnough) {
        skipCurrentBucket(eventTimeNs, BucketDropReason::BUCKET_TOO_SMALL);
    }
    if (!mCurrentBucketIsSkipped) {
        bool bucketHasData = false;
        // The current bucket is large enough to keep.
        for (auto& [metricDimensionKey, currentValueBucket] : mCurrentSlicedBucket) {
            PastValueBucket bucket =
                    buildPartialBucket(bucketEndTime, currentValueBucket.intervals);
            if (!mSlicedStateAtoms.empty()) {
                bucket.mConditionTrueNs =
                        currentValueBucket.conditionTimer.newBucketStart(bucketEndTime);
            } else {
                bucket.mConditionTrueNs = conditionTrueDuration;
            }
            // operator[] auto-creates a new vector of past buckets if the key is not found.
            if (bucket.valueIndex.size() > 0) {
                auto& bucketList = mPastBuckets[metricDimensionKey];
                bucketList.push_back(bucket);
                bucketHasData = true;
            }
        }
        if (!bucketHasData) {
            skipCurrentBucket(eventTimeNs, BucketDropReason::NO_DATA);
        }
    }

    if (mCurrentBucketIsSkipped) {
        mCurrentSkippedBucket.bucketStartTimeNs = mCurrentBucketStartTimeNs;
        mCurrentSkippedBucket.bucketEndTimeNs = bucketEndTime;
        mSkippedBuckets.emplace_back(mCurrentSkippedBucket);
    }

    // This means that the current bucket was not flushed before a forced bucket split.
    // This can happen if an app update or a dump report with include_current_partial_bucket is
    // requested before we get a chance to flush the bucket due to receiving new data, either
    // from the statsd socket or the StatsPullerManager.
    if (bucketEndTime < nextBucketStartTimeNs) {
        SkippedBucket bucketInGap;
        bucketInGap.bucketStartTimeNs = bucketEndTime;
        bucketInGap.bucketEndTimeNs = nextBucketStartTimeNs;
        bucketInGap.dropEvents.emplace_back(buildDropEvent(eventTimeNs, BucketDropReason::NO_DATA));
        mSkippedBuckets.emplace_back(bucketInGap);
    }
    appendToFullBucket(eventTimeNs > fullBucketEndTimeNs);
    initCurrentSlicedBucket(nextBucketStartTimeNs);
    // Update the condition timer again, in case we skipped buckets.
    mConditionTimer.newBucketStart(nextBucketStartTimeNs);

    // NOTE: Update the condition timers in `mCurrentSlicedBucket` only when slicing
    // by state. Otherwise, the "global" condition timer will be used.
    if (!mSlicedStateAtoms.empty()) {
        for (auto& [metricDimensionKey, currentValueBucket] : mCurrentSlicedBucket) {
            currentValueBucket.conditionTimer.newBucketStart(nextBucketStartTimeNs);
        }
    }
    mCurrentBucketNum += numBucketsForward;
}

PastValueBucket ValueMetricProducer::buildPartialBucket(int64_t bucketEndTime,
                                                        const std::vector<Interval>& intervals) {
    PastValueBucket bucket;
    bucket.mBucketStartNs = mCurrentBucketStartTimeNs;
    bucket.mBucketEndNs = bucketEndTime;

    // The first value field acts as a "gatekeeper" - if it does not pass the specified
    // threshold, then all interval values are discarded for this bucket.
    if ((intervals.size() <= 0) || (intervals[0].hasValue && !valuePassesThreshold(intervals[0]))) {
        return bucket;
    }

    for (const Interval& interval : intervals) {
        // Skip the output if there is no value, or if the diff is zero and
        // skip_zero_diff_output is set.
        if (!interval.hasValue || (mSkipZeroDiffOutput && mUseDiff && interval.value.isZero())) {
            continue;
        }

        bucket.valueIndex.push_back(interval.valueIndex);
        bucket.values.push_back(getFinalValue(interval));
    }
    return bucket;
}
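// Illustrative: with an upload threshold of gt_int = 100 on the first value field,
// a key whose first interval aggregates to 80 contributes no values for the bucket,
// while one that aggregates to 150 uploads every interval that has a value, minus
// zero diffs when skip_zero_diff_output is set.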
void ValueMetricProducer::initCurrentSlicedBucket(int64_t nextBucketStartTimeNs) {
    StatsdStats::getInstance().noteBucketCount(mMetricId);
    // Clean up the data structure used to aggregate values.
    for (auto it = mCurrentSlicedBucket.begin(); it != mCurrentSlicedBucket.end();) {
        bool obsolete = true;
        for (auto& interval : it->second.intervals) {
            interval.hasValue = false;
            interval.sampleSize = 0;
            if (interval.seenNewData) {
                obsolete = false;
            }
            interval.seenNewData = false;
        }

        if (obsolete && !mSlicedStateAtoms.empty()) {
            // When slicing by state, only delete the MetricDimensionKey when the
            // state key in the MetricDimensionKey is not the current state key.
            const HashableDimensionKey& dimensionInWhatKey = it->first.getDimensionKeyInWhat();
            const auto& currentBaseInfoItr = mCurrentBaseInfo.find(dimensionInWhatKey);

            if ((currentBaseInfoItr != mCurrentBaseInfo.end()) &&
                (it->first.getStateValuesKey() == currentBaseInfoItr->second.currentState)) {
                obsolete = false;
            }
        }
        if (obsolete) {
            it = mCurrentSlicedBucket.erase(it);
        } else {
            it++;
        }
        // TODO(b/157655103): remove mCurrentBaseInfo entries when obsolete
    }

    mCurrentBucketIsSkipped = false;
    mCurrentSkippedBucket.reset();

    // If we do not have a global base when the condition is true,
    // we will have an incomplete bucket for the next bucket.
    if (mUseDiff && !mHasGlobalBase && mCondition) {
        mCurrentBucketIsSkipped = false;
    }
    mCurrentBucketStartTimeNs = nextBucketStartTimeNs;
    VLOG("metric %lld: new bucket start time: %lld", (long long)mMetricId,
         (long long)mCurrentBucketStartTimeNs);
}

void ValueMetricProducer::appendToFullBucket(const bool isFullBucketReached) {
    if (mCurrentBucketIsSkipped) {
        if (isFullBucketReached) {
            // If the bucket is invalid, we ignore the full bucket since it contains invalid data.
            mCurrentFullBucket.clear();
        }
        // The current bucket is invalid, so we do not add it to the full bucket.
        return;
    }

    if (isFullBucketReached) {  // If full bucket, send to anomaly tracker.
        // Accumulate partial buckets with the current value and then send to the anomaly
        // tracker.
        if (mCurrentFullBucket.size() > 0) {
            for (const auto& slice : mCurrentSlicedBucket) {
                if (hitFullBucketGuardRailLocked(slice.first) || slice.second.intervals.empty()) {
                    continue;
                }
                // TODO: fix this when anomaly can accept double values
                auto& interval = slice.second.intervals[0];
                if (interval.hasValue) {
                    mCurrentFullBucket[slice.first] += interval.value.long_value;
                }
            }
            for (const auto& slice : mCurrentFullBucket) {
                for (auto& tracker : mAnomalyTrackers) {
                    if (tracker != nullptr) {
                        tracker->addPastBucket(slice.first, slice.second, mCurrentBucketNum);
                    }
                }
            }
            mCurrentFullBucket.clear();
        } else {
            // Skip aggregating the partial buckets since there's no previous partial bucket.
            for (const auto& slice : mCurrentSlicedBucket) {
                for (auto& tracker : mAnomalyTrackers) {
                    if (tracker != nullptr && !slice.second.intervals.empty()) {
                        // TODO: fix this when anomaly can accept double values
                        auto& interval = slice.second.intervals[0];
                        if (interval.hasValue) {
                            tracker->addPastBucket(slice.first, interval.value.long_value,
                                                   mCurrentBucketNum);
                        }
                    }
                }
            }
        }
    } else {
        // Accumulate the partial bucket.
        for (const auto& slice : mCurrentSlicedBucket) {
            if (!slice.second.intervals.empty()) {
                // TODO: fix this when anomaly can accept double values
                auto& interval = slice.second.intervals[0];
                if (interval.hasValue) {
                    mCurrentFullBucket[slice.first] += interval.value.long_value;
                }
            }
        }
    }
}
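// Illustrative: if an app upgrade splits an hour bucket into partials
// [0, 40min) and [40min, 60min), the first partial's values are parked in
// mCurrentFullBucket by appendToFullBucket above; when the second partial
// closes at the full-bucket boundary, both are summed and the totals are
// handed to the anomaly trackers.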
size_t ValueMetricProducer::byteSizeLocked() const {
    size_t totalSize = 0;
    for (const auto& pair : mPastBuckets) {
        totalSize += pair.second.size() * kBucketSize;
    }
    return totalSize;
}

bool ValueMetricProducer::valuePassesThreshold(const Interval& interval) {
    if (mUploadThreshold == nullopt) {
        return true;
    }

    Value finalValue = getFinalValue(interval);

    double doubleValue =
            finalValue.type == LONG ? (double)finalValue.long_value : finalValue.double_value;
    switch (mUploadThreshold->value_comparison_case()) {
        case UploadThreshold::kLtInt:
            return doubleValue < (double)mUploadThreshold->lt_int();
        case UploadThreshold::kGtInt:
            return doubleValue > (double)mUploadThreshold->gt_int();
        case UploadThreshold::kLteInt:
            return doubleValue <= (double)mUploadThreshold->lte_int();
        case UploadThreshold::kGteInt:
            return doubleValue >= (double)mUploadThreshold->gte_int();
        case UploadThreshold::kLtFloat:
            return doubleValue < (double)mUploadThreshold->lt_float();
        case UploadThreshold::kGtFloat:
            return doubleValue > (double)mUploadThreshold->gt_float();
        default:
            ALOGE("Value metric has no upload threshold type set");
            return false;
    }
}

Value ValueMetricProducer::getFinalValue(const Interval& interval) {
    if (mAggregationType != ValueMetric::AVG) {
        return interval.value;
    } else {
        double sum = interval.value.type == LONG ? (double)interval.value.long_value
                                                 : interval.value.double_value;
        return Value(sum / interval.sampleSize);
    }
}

}  // namespace statsd
}  // namespace os
}  // namespace android