1 /*
2 * Copyright (C) 2017 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #define STATSD_DEBUG false
18 #include "Log.h"
19
20 #include "StatsPullerManager.h"
21
22 #include <cutils/log.h>
23 #include <math.h>
24 #include <stdint.h>
25
26 #include <algorithm>
27 #include <iostream>
28
29 #include "../StatsService.h"
30 #include "../logd/LogEvent.h"
31 #include "../stats_log_util.h"
32 #include "../statscompanion_util.h"
33 #include "StatsCallbackPuller.h"
34 #include "TrainInfoPuller.h"
35 #include "statslog_statsd.h"
36
37 using std::shared_ptr;
38 using std::vector;
39
40 namespace android {
41 namespace os {
42 namespace statsd {
43
// Sentinel stored in mNextPullTimeNs when no pull is scheduled: updateAlarmLocked() skips
// setting an alarm when it sees this value. Because it is the largest int64_t, any real pull
// time compares smaller and therefore replaces it.
constexpr int64_t NO_ALARM_UPDATE = INT64_MAX;
46
/**
 * Builds the manager with its initial puller table and no pull alarm scheduled.
 *
 * kAllPullAtomInfo starts with a single entry: the TrainInfo atom, keyed by
 * (AID_STATSD, util::TRAIN_INFO) and served by a TrainInfoPuller. Further pullers are added at
 * runtime through RegisterPullAtomCallback().
 */
StatsPullerManager::StatsPullerManager()
    : kAllPullAtomInfo({
              // TrainInfo.
              {{.uid = AID_STATSD, .atomTag = util::TRAIN_INFO}, new TrainInfoPuller()},
      }),
      // NO_ALARM_UPDATE makes updateAlarmLocked() a no-op until a receiver registers.
      mNextPullTimeNs(NO_ALARM_UPDATE) {
}
54
Pull(int tagId,const ConfigKey & configKey,const int64_t eventTimeNs,vector<shared_ptr<LogEvent>> * data)55 bool StatsPullerManager::Pull(int tagId, const ConfigKey& configKey, const int64_t eventTimeNs,
56 vector<shared_ptr<LogEvent>>* data) {
57 ATRACE_CALL();
58 std::lock_guard<std::mutex> _l(mLock);
59 return PullLocked(tagId, configKey, eventTimeNs, data);
60 }
61
Pull(int tagId,const vector<int32_t> & uids,const int64_t eventTimeNs,vector<std::shared_ptr<LogEvent>> * data)62 bool StatsPullerManager::Pull(int tagId, const vector<int32_t>& uids, const int64_t eventTimeNs,
63 vector<std::shared_ptr<LogEvent>>* data) {
64 ATRACE_CALL();
65 std::lock_guard<std::mutex> _l(mLock);
66 return PullLocked(tagId, uids, eventTimeNs, data);
67 }
68
PullLocked(int tagId,const ConfigKey & configKey,const int64_t eventTimeNs,vector<shared_ptr<LogEvent>> * data)69 bool StatsPullerManager::PullLocked(int tagId, const ConfigKey& configKey,
70 const int64_t eventTimeNs, vector<shared_ptr<LogEvent>>* data) {
71 vector<int32_t> uids;
72 const auto& uidProviderIt = mPullUidProviders.find(configKey);
73 if (uidProviderIt == mPullUidProviders.end()) {
74 ALOGE("Error pulling tag %d. No pull uid provider for config key %s", tagId,
75 configKey.ToString().c_str());
76 StatsdStats::getInstance().notePullUidProviderNotFound(tagId);
77 return false;
78 }
79 sp<PullUidProvider> pullUidProvider = uidProviderIt->second.promote();
80 if (pullUidProvider == nullptr) {
81 ALOGE("Error pulling tag %d, pull uid provider for config %s is gone.", tagId,
82 configKey.ToString().c_str());
83 StatsdStats::getInstance().notePullUidProviderNotFound(tagId);
84 return false;
85 }
86 uids = pullUidProvider->getPullAtomUids(tagId);
87 return PullLocked(tagId, uids, eventTimeNs, data);
88 }
89
PullLocked(int tagId,const vector<int32_t> & uids,const int64_t eventTimeNs,vector<shared_ptr<LogEvent>> * data)90 bool StatsPullerManager::PullLocked(int tagId, const vector<int32_t>& uids,
91 const int64_t eventTimeNs, vector<shared_ptr<LogEvent>>* data) {
92 VLOG("Initiating pulling %d", tagId);
93 for (int32_t uid : uids) {
94 PullerKey key = {.uid = uid, .atomTag = tagId};
95 auto pullerIt = kAllPullAtomInfo.find(key);
96 if (pullerIt != kAllPullAtomInfo.end()) {
97 PullErrorCode status = pullerIt->second->Pull(eventTimeNs, data);
98 VLOG("pulled %zu items", data->size());
99 if (status != PULL_SUCCESS) {
100 StatsdStats::getInstance().notePullFailed(tagId);
101 }
102 // If we received a dead object exception, it means the client process has died.
103 // We can remove the puller from the map.
104 if (status == PULL_DEAD_OBJECT) {
105 StatsdStats::getInstance().notePullerCallbackRegistrationChanged(
106 tagId,
107 /*registered=*/false);
108 kAllPullAtomInfo.erase(pullerIt);
109 }
110 return status == PULL_SUCCESS;
111 }
112 }
113 StatsdStats::getInstance().notePullerNotFound(tagId);
114 ALOGW("StatsPullerManager: Unknown tagId %d", tagId);
115 return false; // Return early since we don't know what to pull.
116 }
117
PullerForMatcherExists(int tagId) const118 bool StatsPullerManager::PullerForMatcherExists(int tagId) const {
119 // Pulled atoms might be registered after we parse the config, so just make sure the id is in
120 // an appropriate range.
121 return isVendorPulledAtom(tagId) || isPulledAtom(tagId);
122 }
123
updateAlarmLocked()124 void StatsPullerManager::updateAlarmLocked() {
125 if (mNextPullTimeNs == NO_ALARM_UPDATE) {
126 VLOG("No need to set alarms. Skipping");
127 return;
128 }
129
130 // TODO(b/151045771): do not hold a lock while making a binder call
131 if (mStatsCompanionService != nullptr) {
132 mStatsCompanionService->setPullingAlarm(mNextPullTimeNs / 1000000);
133 } else {
134 VLOG("StatsCompanionService not available. Alarm not set.");
135 }
136 return;
137 }
138
SetStatsCompanionService(const shared_ptr<IStatsCompanionService> & statsCompanionService)139 void StatsPullerManager::SetStatsCompanionService(
140 const shared_ptr<IStatsCompanionService>& statsCompanionService) {
141 std::lock_guard<std::mutex> _l(mLock);
142 shared_ptr<IStatsCompanionService> tmpForLock = mStatsCompanionService;
143 mStatsCompanionService = statsCompanionService;
144 for (const auto& pulledAtom : kAllPullAtomInfo) {
145 pulledAtom.second->SetStatsCompanionService(statsCompanionService);
146 }
147 if (mStatsCompanionService != nullptr) {
148 updateAlarmLocked();
149 }
150 }
151
RegisterReceiver(int tagId,const ConfigKey & configKey,const wp<PullDataReceiver> & receiver,int64_t nextPullTimeNs,int64_t intervalNs)152 void StatsPullerManager::RegisterReceiver(int tagId, const ConfigKey& configKey,
153 const wp<PullDataReceiver>& receiver,
154 int64_t nextPullTimeNs, int64_t intervalNs) {
155 std::lock_guard<std::mutex> _l(mLock);
156 auto& receivers = mReceivers[{.atomTag = tagId, .configKey = configKey}];
157 for (auto it = receivers.begin(); it != receivers.end(); it++) {
158 if (it->receiver == receiver) {
159 VLOG("Receiver already registered of %d", (int)receivers.size());
160 return;
161 }
162 }
163 ReceiverInfo receiverInfo;
164 receiverInfo.receiver = receiver;
165
166 // Round it to the nearest minutes. This is the limit of alarm manager.
167 // In practice, we should always have larger buckets.
168 int64_t roundedIntervalNs = intervalNs / NS_PER_SEC / 60 * NS_PER_SEC * 60;
169 // Scheduled pulling should be at least 1 min apart.
170 // This can be lower in cts tests, in which case we round it to 1 min.
171 if (roundedIntervalNs < 60 * (int64_t)NS_PER_SEC) {
172 roundedIntervalNs = 60 * (int64_t)NS_PER_SEC;
173 }
174
175 receiverInfo.intervalNs = roundedIntervalNs;
176 receiverInfo.nextPullTimeNs = nextPullTimeNs;
177 receivers.push_back(receiverInfo);
178
179 // There is only one alarm for all pulled events. So only set it to the smallest denom.
180 if (nextPullTimeNs < mNextPullTimeNs) {
181 VLOG("Updating next pull time %lld", (long long)mNextPullTimeNs);
182 mNextPullTimeNs = nextPullTimeNs;
183 updateAlarmLocked();
184 }
185 VLOG("Puller for tagId %d registered of %d", tagId, (int)receivers.size());
186 }
187
UnRegisterReceiver(int tagId,const ConfigKey & configKey,const wp<PullDataReceiver> & receiver)188 void StatsPullerManager::UnRegisterReceiver(int tagId, const ConfigKey& configKey,
189 const wp<PullDataReceiver>& receiver) {
190 std::lock_guard<std::mutex> _l(mLock);
191 auto receiversIt = mReceivers.find({.atomTag = tagId, .configKey = configKey});
192 if (receiversIt == mReceivers.end()) {
193 VLOG("Unknown pull code or no receivers: %d", tagId);
194 return;
195 }
196 std::list<ReceiverInfo>& receivers = receiversIt->second;
197 for (auto it = receivers.begin(); it != receivers.end(); it++) {
198 if (receiver == it->receiver) {
199 receivers.erase(it);
200 VLOG("Puller for tagId %d unregistered of %d", tagId, (int)receivers.size());
201 return;
202 }
203 }
204 }
205
RegisterPullUidProvider(const ConfigKey & configKey,const wp<PullUidProvider> & provider)206 void StatsPullerManager::RegisterPullUidProvider(const ConfigKey& configKey,
207 const wp<PullUidProvider>& provider) {
208 std::lock_guard<std::mutex> _l(mLock);
209 mPullUidProviders[configKey] = provider;
210 }
211
UnregisterPullUidProvider(const ConfigKey & configKey,const wp<PullUidProvider> & provider)212 void StatsPullerManager::UnregisterPullUidProvider(const ConfigKey& configKey,
213 const wp<PullUidProvider>& provider) {
214 std::lock_guard<std::mutex> _l(mLock);
215 const auto& it = mPullUidProviders.find(configKey);
216 if (it != mPullUidProviders.end() && it->second == provider) {
217 mPullUidProviders.erase(it);
218 }
219 }
220
OnAlarmFired(int64_t elapsedTimeNs)221 void StatsPullerManager::OnAlarmFired(int64_t elapsedTimeNs) {
222 ATRACE_CALL();
223 std::lock_guard<std::mutex> _l(mLock);
224 int64_t wallClockNs = getWallClockNs();
225
226 int64_t minNextPullTimeNs = NO_ALARM_UPDATE;
227
228 vector<pair<const ReceiverKey*, vector<ReceiverInfo*>>> needToPull;
229 for (auto& pair : mReceivers) {
230 vector<ReceiverInfo*> receivers;
231 if (pair.second.size() != 0) {
232 for (ReceiverInfo& receiverInfo : pair.second) {
233 // If pullNecessary and enough time has passed for the next bucket, then add
234 // receiver to the list that will pull on this alarm.
235 // If pullNecessary is false, check if next pull time needs to be updated.
236 sp<PullDataReceiver> receiverPtr = receiverInfo.receiver.promote();
237 if (receiverInfo.nextPullTimeNs <= elapsedTimeNs && receiverPtr != nullptr &&
238 receiverPtr->isPullNeeded()) {
239 receivers.push_back(&receiverInfo);
240 } else {
241 if (receiverInfo.nextPullTimeNs <= elapsedTimeNs) {
242 receiverPtr->onDataPulled({}, PullResult::PULL_NOT_NEEDED, elapsedTimeNs);
243 int numBucketsAhead = (elapsedTimeNs - receiverInfo.nextPullTimeNs) /
244 receiverInfo.intervalNs;
245 receiverInfo.nextPullTimeNs +=
246 (numBucketsAhead + 1) * receiverInfo.intervalNs;
247 }
248 minNextPullTimeNs = min(receiverInfo.nextPullTimeNs, minNextPullTimeNs);
249 }
250 }
251 if (receivers.size() > 0) {
252 needToPull.push_back(make_pair(&pair.first, receivers));
253 }
254 }
255 }
256 for (const auto& pullInfo : needToPull) {
257 vector<shared_ptr<LogEvent>> data;
258 PullResult pullResult =
259 PullLocked(pullInfo.first->atomTag, pullInfo.first->configKey, elapsedTimeNs, &data)
260 ? PullResult::PULL_RESULT_SUCCESS
261 : PullResult::PULL_RESULT_FAIL;
262 if (pullResult == PullResult::PULL_RESULT_FAIL) {
263 VLOG("pull failed at %lld, will try again later", (long long)elapsedTimeNs);
264 }
265
266 // Convention is to mark pull atom timestamp at request time.
267 // If we pull at t0, puller starts at t1, finishes at t2, and send back
268 // at t3, we mark t0 as its timestamp, which should correspond to its
269 // triggering event, such as condition change at t0.
270 // Here the triggering event is alarm fired from AlarmManager.
271 // In ValueMetricProducer and GaugeMetricProducer we do same thing
272 // when pull on condition change, etc.
273 for (auto& event : data) {
274 event->setElapsedTimestampNs(elapsedTimeNs);
275 event->setLogdWallClockTimestampNs(wallClockNs);
276 }
277
278 for (const auto& receiverInfo : pullInfo.second) {
279 sp<PullDataReceiver> receiverPtr = receiverInfo->receiver.promote();
280 if (receiverPtr != nullptr) {
281 receiverPtr->onDataPulled(data, pullResult, elapsedTimeNs);
282 // We may have just come out of a coma, compute next pull time.
283 int numBucketsAhead =
284 (elapsedTimeNs - receiverInfo->nextPullTimeNs) / receiverInfo->intervalNs;
285 receiverInfo->nextPullTimeNs += (numBucketsAhead + 1) * receiverInfo->intervalNs;
286 minNextPullTimeNs = min(receiverInfo->nextPullTimeNs, minNextPullTimeNs);
287 } else {
288 VLOG("receiver already gone.");
289 }
290 }
291 }
292
293 VLOG("mNextPullTimeNs: %lld updated to %lld", (long long)mNextPullTimeNs,
294 (long long)minNextPullTimeNs);
295 mNextPullTimeNs = minNextPullTimeNs;
296 updateAlarmLocked();
297 }
298
ForceClearPullerCache()299 int StatsPullerManager::ForceClearPullerCache() {
300 ATRACE_CALL();
301 std::lock_guard<std::mutex> _l(mLock);
302 int totalCleared = 0;
303 for (const auto& pulledAtom : kAllPullAtomInfo) {
304 totalCleared += pulledAtom.second->ForceClearCache();
305 }
306 return totalCleared;
307 }
308
ClearPullerCacheIfNecessary(int64_t timestampNs)309 int StatsPullerManager::ClearPullerCacheIfNecessary(int64_t timestampNs) {
310 ATRACE_CALL();
311 std::lock_guard<std::mutex> _l(mLock);
312 int totalCleared = 0;
313 for (const auto& pulledAtom : kAllPullAtomInfo) {
314 totalCleared += pulledAtom.second->ClearCacheIfNecessary(timestampNs);
315 }
316 return totalCleared;
317 }
318
RegisterPullAtomCallback(const int uid,const int32_t atomTag,const int64_t coolDownNs,const int64_t timeoutNs,const vector<int32_t> & additiveFields,const shared_ptr<IPullAtomCallback> & callback)319 void StatsPullerManager::RegisterPullAtomCallback(const int uid, const int32_t atomTag,
320 const int64_t coolDownNs, const int64_t timeoutNs,
321 const vector<int32_t>& additiveFields,
322 const shared_ptr<IPullAtomCallback>& callback) {
323 ATRACE_CALL();
324 std::lock_guard<std::mutex> _l(mLock);
325 VLOG("RegisterPullerCallback: adding puller for tag %d", atomTag);
326
327 if (callback == nullptr) {
328 ALOGW("SetPullAtomCallback called with null callback for atom %d.", atomTag);
329 return;
330 }
331
332 int64_t actualCoolDownNs = coolDownNs < kMinCoolDownNs ? kMinCoolDownNs : coolDownNs;
333 int64_t actualTimeoutNs = timeoutNs > kMaxTimeoutNs ? kMaxTimeoutNs : timeoutNs;
334
335 sp<StatsCallbackPuller> puller = new StatsCallbackPuller(atomTag, callback, actualCoolDownNs,
336 actualTimeoutNs, additiveFields);
337 PullerKey key = {.uid = uid, .atomTag = atomTag};
338 auto it = kAllPullAtomInfo.find(key);
339 if (it != kAllPullAtomInfo.end()) {
340 StatsdStats::getInstance().notePullerCallbackRegistrationChanged(atomTag,
341 /*registered=*/false);
342 }
343 kAllPullAtomInfo[key] = puller;
344 StatsdStats::getInstance().notePullerCallbackRegistrationChanged(atomTag, /*registered=*/true);
345 }
346
UnregisterPullAtomCallback(const int uid,const int32_t atomTag)347 void StatsPullerManager::UnregisterPullAtomCallback(const int uid, const int32_t atomTag) {
348 ATRACE_CALL();
349 std::lock_guard<std::mutex> _l(mLock);
350 PullerKey key = {.uid = uid, .atomTag = atomTag};
351 if (kAllPullAtomInfo.find(key) != kAllPullAtomInfo.end()) {
352 StatsdStats::getInstance().notePullerCallbackRegistrationChanged(atomTag,
353 /*registered=*/false);
354 kAllPullAtomInfo.erase(key);
355 }
356 }
357
358 } // namespace statsd
359 } // namespace os
360 } // namespace android
361