1 /* 2 * Copyright (C) 2017 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 #pragma once 18 19 #include <gtest/gtest_prod.h> 20 21 #include <optional> 22 23 #include "FieldValue.h" 24 #include "HashableDimensionKey.h" 25 #include "MetricProducer.h" 26 #include "anomaly/AnomalyTracker.h" 27 #include "condition/ConditionTracker.h" 28 #include "external/PullDataReceiver.h" 29 #include "external/StatsPullerManager.h" 30 #include "matchers/EventMatcherWizard.h" 31 #include "src/statsd_config.pb.h" 32 #include "stats_log_util.h" 33 #include "stats_util.h" 34 35 namespace android { 36 namespace os { 37 namespace statsd { 38 39 template <typename AggregatedValue> 40 struct PastBucket { 41 int64_t mBucketStartNs; 42 int64_t mBucketEndNs; 43 std::vector<int> aggIndex; 44 std::vector<AggregatedValue> aggregates; 45 std::vector<int> sampleSizes; 46 47 /** 48 * If the metric has no condition, then this field is just wasted. 49 * When we tune statsd memory usage in the future, this is a candidate to optimize. 50 */ 51 int64_t mConditionTrueNs; 52 53 /** 54 * The semantic is the value which needs to be applied to mConditionTrueNs for correction 55 * to be performed prior normalization calculation on the user (read server) side. Applied only 56 * to ValueMetrics with pulled atoms. 57 */ 58 int64_t mConditionCorrectionNs; 59 }; 60 61 // Aggregates values within buckets. 62 // 63 // There are different events that might complete a bucket 64 // - a condition change 65 // - an app upgrade 66 // - an alarm set to the end of the bucket 67 template <typename AggregatedValue, typename DimExtras> 68 class ValueMetricProducer : public MetricProducer, public virtual PullDataReceiver { 69 public: 70 struct PullOptions { 71 const int pullAtomId; 72 const sp<StatsPullerManager>& pullerManager; 73 }; 74 75 struct BucketOptions { 76 const int64_t timeBaseNs; 77 const int64_t startTimeNs; 78 const int64_t bucketSizeNs; 79 const int64_t minBucketSizeNs; 80 const optional<int64_t> conditionCorrectionThresholdNs; 81 const optional<bool> splitBucketForAppUpgrade; 82 }; 83 84 struct WhatOptions { 85 const bool containsAnyPositionInDimensionsInWhat; 86 const bool shouldUseNestedDimensions; 87 const int whatMatcherIndex; 88 const sp<EventMatcherWizard>& matcherWizard; 89 const FieldMatcher& dimensionsInWhat; 90 const vector<Matcher>& fieldMatchers; 91 const vector<ValueMetric::AggregationType> aggregationTypes; 92 }; 93 94 struct ConditionOptions { 95 const int conditionIndex; 96 const ConditionLinks& conditionLinks; 97 const vector<ConditionState>& initialConditionCache; 98 const sp<ConditionWizard>& conditionWizard; 99 }; 100 101 struct StateOptions { 102 const StateLinks& stateLinks; 103 const vector<int>& slicedStateAtoms; 104 const unordered_map<int, unordered_map<int, int64_t>>& stateGroupMap; 105 }; 106 107 struct ActivationOptions { 108 const std::unordered_map<int, std::shared_ptr<Activation>>& eventActivationMap; 109 const std::unordered_map<int, std::vector<std::shared_ptr<Activation>>>& 110 eventDeactivationMap; 111 }; 112 113 struct GuardrailOptions { 114 const size_t dimensionSoftLimit; 115 const size_t dimensionHardLimit; 116 }; 117 118 virtual ~ValueMetricProducer(); 119 120 // Process data pulled on bucket boundary. onDataPulled(const std::vector<std::shared_ptr<LogEvent>> & data,PullResult pullResult,int64_t originalPullTimeNs)121 virtual void onDataPulled(const std::vector<std::shared_ptr<LogEvent>>& data, 122 PullResult pullResult, int64_t originalPullTimeNs) override { 123 } 124 125 // Determine if metric needs to pull isPullNeeded()126 virtual bool isPullNeeded() const override { 127 return false; 128 } 129 130 // ValueMetric needs special logic if it's a pulled atom. 131 void onStatsdInitCompleted(int64_t eventTimeNs) override; 132 133 void onStateChanged(int64_t eventTimeNs, int32_t atomId, const HashableDimensionKey& primaryKey, 134 const FieldValue& oldState, const FieldValue& newState) override; 135 136 protected: 137 ValueMetricProducer(int64_t metricId, const ConfigKey& key, uint64_t protoHash, 138 const PullOptions& pullOptions, const BucketOptions& bucketOptions, 139 const WhatOptions& whatOptions, const ConditionOptions& conditionOptions, 140 const StateOptions& stateOptions, 141 const ActivationOptions& activationOptions, 142 const GuardrailOptions& guardrailOptions, 143 const wp<ConfigMetadataProvider> configMetadataProvider); 144 145 void onMatchedLogEventInternalLocked( 146 const size_t matcherIndex, const MetricDimensionKey& eventKey, 147 const ConditionKey& conditionKey, bool condition, const LogEvent& event, 148 const std::map<int, HashableDimensionKey>& statePrimaryKeys) override; 149 150 // Determine whether or not a LogEvent can be skipped. 151 virtual inline bool canSkipLogEventLocked( 152 const MetricDimensionKey& eventKey, bool condition, int64_t eventTimeNs, 153 const std::map<int, HashableDimensionKey>& statePrimaryKeys) const = 0; 154 155 void notifyAppUpgradeInternalLocked(const int64_t eventTimeNs) override; 156 157 void onDumpReportLocked(const int64_t dumpTimeNs, const bool includeCurrentPartialBucket, 158 const bool eraseData, const DumpLatency dumpLatency, 159 std::set<string>* strSet, 160 android::util::ProtoOutputStream* protoOutput) override; 161 162 struct DumpProtoFields { 163 const int metricTypeFieldId; 164 const int bucketNumFieldId; 165 const int startBucketMsFieldId; 166 const int endBucketMsFieldId; 167 const int conditionTrueNsFieldId; 168 const optional<int> conditionCorrectionNsFieldId; 169 }; 170 171 virtual DumpProtoFields getDumpProtoFields() const = 0; 172 173 void clearPastBucketsLocked(const int64_t dumpTimeNs) override; 174 175 // ValueMetricProducer internal interface to handle active state change. 176 void onActiveStateChangedLocked(const int64_t eventTimeNs, const bool isActive) override; 177 onActiveStateChangedInternalLocked(const int64_t eventTimeNs,const bool isActive)178 virtual void onActiveStateChangedInternalLocked(const int64_t eventTimeNs, 179 const bool isActive) { 180 } 181 182 // ValueMetricProducer internal interface to handle condition change. 183 void onConditionChangedLocked(const bool condition, int64_t eventTimeNs) override; 184 185 // Only called when mIsActive, the event is NOT too late, and after pulling. onConditionChangedInternalLocked(const ConditionState oldCondition,const ConditionState newCondition,const int64_t eventTimeNs)186 virtual void onConditionChangedInternalLocked(const ConditionState oldCondition, 187 const ConditionState newCondition, 188 const int64_t eventTimeNs) { 189 } 190 191 // Internal interface to handle sliced condition change. 192 void onSlicedConditionMayChangeLocked(bool overallCondition, int64_t eventTime) override; 193 194 void dumpStatesLocked(int out, bool verbose) const override; 195 196 virtual std::string aggregatedValueToString(const AggregatedValue& aggregate) const = 0; 197 198 // For pulled metrics, this method should only be called if a pull has been done. Else we will 199 // not have complete data for the bucket. 200 void flushIfNeededLocked(int64_t eventTime) override; 201 202 // For pulled metrics, this method should only be called if a pulled has been done. Else we will 203 // not have complete data for the bucket. 204 void flushCurrentBucketLocked(int64_t eventTimeNs, int64_t nextBucketStartTimeNs) override; 205 206 void dropDataLocked(const int64_t dropTimeNs) override; 207 208 // Calculate how many buckets are present between the current bucket and eventTimeNs. 209 int64_t calcBucketsForwardCount(const int64_t eventTimeNs) const; 210 211 // Mark the data as invalid. 212 virtual void invalidateCurrentBucket(const int64_t dropTimeNs, const BucketDropReason reason); 213 214 // Skips the current bucket without notifying StatsdStats of the skipped bucket. 215 // This should only be called from #flushCurrentBucketLocked. Otherwise, a future event that 216 // causes the bucket to be invalidated will not notify StatsdStats. 217 void skipCurrentBucket(const int64_t dropTimeNs, const BucketDropReason reason); 218 219 optional<InvalidConfigReason> onConfigUpdatedLocked( 220 const StatsdConfig& config, int configIndex, int metricIndex, 221 const std::vector<sp<AtomMatchingTracker>>& allAtomMatchingTrackers, 222 const std::unordered_map<int64_t, int>& oldAtomMatchingTrackerMap, 223 const std::unordered_map<int64_t, int>& newAtomMatchingTrackerMap, 224 const sp<EventMatcherWizard>& matcherWizard, 225 const std::vector<sp<ConditionTracker>>& allConditionTrackers, 226 const std::unordered_map<int64_t, int>& conditionTrackerMap, 227 const sp<ConditionWizard>& wizard, 228 const std::unordered_map<int64_t, int>& metricToActivationMap, 229 std::unordered_map<int, std::vector<int>>& trackerToMetricMap, 230 std::unordered_map<int, std::vector<int>>& conditionToMetricMap, 231 std::unordered_map<int, std::vector<int>>& activationAtomTrackerToMetricMap, 232 std::unordered_map<int, std::vector<int>>& deactivationAtomTrackerToMetricMap, 233 std::vector<int>& metricsWithActivation) override; 234 235 size_t computeValueBucketSizeLocked(const bool isFullBucket, const MetricDimensionKey& dimKey, 236 const bool isFirstBucket, 237 const PastBucket<AggregatedValue>& bucket) const; 238 239 virtual size_t getAggregatedValueSize(const AggregatedValue& value) const = 0; 240 241 virtual optional<int64_t> getConditionIdForMetric(const StatsdConfig& config, 242 const int configIndex) const = 0; 243 244 virtual int64_t getWhatAtomMatcherIdForMetric(const StatsdConfig& config, 245 const int configIndex) const = 0; 246 247 virtual ConditionLinks getConditionLinksForMetric(const StatsdConfig& config, 248 const int configIndex) const = 0; 249 250 int mWhatMatcherIndex; 251 252 sp<EventMatcherWizard> mEventMatcherWizard; 253 254 const sp<StatsPullerManager> mPullerManager; 255 256 // Value fields for matching. 257 const std::vector<Matcher> mFieldMatchers; 258 259 // Value fields for matching. 260 std::set<HashableDimensionKey> mMatchedMetricDimensionKeys; 261 262 // Holds the atom id, primary key pair from a state change. 263 // Only used for pulled metrics. 264 // TODO(b/185796114): can be passed as function arguments instead. 265 pair<int32_t, HashableDimensionKey> mStateChangePrimaryKey; 266 267 // Atom Id for pulled data. -1 if this is not pulled. 268 const int mPullAtomId; 269 270 // Tracks the value information of one value field. 271 struct Interval { 272 // Index in multi value aggregation. 273 int aggIndex; 274 275 // Current aggregation, depending on the aggregation type. 276 AggregatedValue aggregate; 277 278 // Number of samples collected. 279 int sampleSize = 0; 280 hasValueInterval281 inline bool hasValue() const { 282 return sampleSize > 0; 283 } 284 }; 285 286 // Internal state of an ongoing aggregation bucket. 287 struct CurrentBucket { 288 // If the `MetricDimensionKey` state key is the current state key, then 289 // the condition timer will be updated later (e.g. condition/state/active 290 // state change) with the correct condition and time. CurrentBucketCurrentBucket291 CurrentBucket() : intervals(), conditionTimer(ConditionTimer(false, 0)) { 292 } 293 // Value information for each value field of the metric. 294 std::vector<Interval> intervals; 295 // Tracks how long the condition is true. 296 ConditionTimer conditionTimer; 297 }; 298 299 // Tracks the internal state in the ongoing aggregation bucket for each DimensionsInWhat 300 // key and StateValuesKey pair. 301 std::unordered_map<MetricDimensionKey, CurrentBucket> mCurrentSlicedBucket; 302 303 // State key and any extra information for a specific DimensionsInWhat key. 304 struct DimensionsInWhatInfo { DimensionsInWhatInfoDimensionsInWhatInfo305 DimensionsInWhatInfo(const HashableDimensionKey& stateKey) 306 : dimExtras(), currentState(stateKey), hasCurrentState(false) { 307 } 308 309 DimExtras dimExtras; 310 311 // Whether new data is seen in the bucket. 312 // TODO, this could be per base in the dim extras. 313 bool seenNewData = false; 314 315 // Last seen state value(s). 316 HashableDimensionKey currentState; 317 // Whether this dimensions in what key has a current state key. 318 bool hasCurrentState; 319 }; 320 321 // Tracks current state key and other information for each DimensionsInWhat key. 322 std::unordered_map<HashableDimensionKey, DimensionsInWhatInfo> mDimInfos; 323 324 // Save the past buckets and we can clear when the StatsLogReport is dumped. 325 std::unordered_map<MetricDimensionKey, std::vector<PastBucket<AggregatedValue>>> mPastBuckets; 326 327 const int64_t mMinBucketSizeNs; 328 329 // Util function to check whether the specified dimension hits the guardrail. 330 bool hitGuardRailLocked(const MetricDimensionKey& newKey) const; 331 332 bool hasReachedGuardRailLimit() const; 333 pullAndMatchEventsLocked(const int64_t timestampNs)334 virtual void pullAndMatchEventsLocked(const int64_t timestampNs) { 335 } 336 337 virtual bool multipleBucketsSkipped(const int64_t numBucketsForward) const = 0; 338 339 virtual PastBucket<AggregatedValue> buildPartialBucket(int64_t bucketEndTime, 340 std::vector<Interval>& intervals) = 0; 341 342 virtual void closeCurrentBucket(const int64_t eventTimeNs, int64_t nextBucketStartTimeNs); 343 344 virtual void initNextSlicedBucket(int64_t nextBucketStartTimeNs); 345 346 // Updates the condition timers in the current sliced bucket when there is a 347 // condition change or an active state change. 348 void updateCurrentSlicedBucketConditionTimers(bool newCondition, int64_t eventTimeNs); 349 350 virtual void writePastBucketAggregateToProto(const int aggIndex, 351 const AggregatedValue& aggregate, 352 const int sampleSize, 353 ProtoOutputStream* const protoOutput) const = 0; 354 355 static const size_t kBucketSize = sizeof(PastBucket<AggregatedValue>{}); 356 357 const size_t mDimensionSoftLimit; 358 359 const size_t mDimensionHardLimit; 360 361 // This is to track whether or not the bucket is skipped for any of the reasons listed in 362 // BucketDropReason, many of which make the bucket potentially invalid. 363 bool mCurrentBucketIsSkipped; 364 365 /** Stores condition correction threshold from the ValueMetric configuration */ 366 optional<int64_t> mConditionCorrectionThresholdNs; 367 isEventLateLocked(const int64_t eventTimeNs)368 inline bool isEventLateLocked(const int64_t eventTimeNs) const { 369 return eventTimeNs < mCurrentBucketStartTimeNs; 370 } 371 372 // Returns true if any of the intervals have seen new data. 373 // This should return true unless there is an error parsing the value fields from the event. 374 virtual bool aggregateFields(const int64_t eventTimeNs, const MetricDimensionKey& eventKey, 375 const LogEvent& event, std::vector<Interval>& intervals, 376 DimExtras& dimExtras) = 0; 377 378 // If this is a pulled metric isPulled()379 inline bool isPulled() const { 380 return mPullAtomId != -1; 381 } 382 383 private: 384 }; // ValueMetricProducer 385 386 } // namespace statsd 387 } // namespace os 388 } // namespace android 389