1 /*
2  * Copyright (C) 2017 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #define STATSD_DEBUG false
18 #include "Log.h"
19 
20 #include "StatsPullerManager.h"
21 
22 #include <cutils/log.h>
23 #include <math.h>
24 #include <stdint.h>
25 
26 #include <algorithm>
27 #include <iostream>
28 
29 #include "../StatsService.h"
30 #include "../logd/LogEvent.h"
31 #include "../stats_log_util.h"
32 #include "../statscompanion_util.h"
33 #include "StatsCallbackPuller.h"
34 #include "TrainInfoPuller.h"
35 #include "statslog_statsd.h"
36 
37 using std::shared_ptr;
38 using std::vector;
39 
40 namespace android {
41 namespace os {
42 namespace statsd {
43 
// Sentinel "next pull time" meaning that no pull is scheduled. Any real pull time compares
// smaller than this value and may therefore require the alarm to be updated.
constexpr int64_t NO_ALARM_UPDATE = INT64_MAX;
46 
// Builds the manager with its built-in pullers registered and no pull alarm scheduled.
StatsPullerManager::StatsPullerManager()
    : kAllPullAtomInfo({
              // TrainInfo: pulled by statsd itself, hence keyed by AID_STATSD.
              {{.uid = AID_STATSD, .atomTag = util::TRAIN_INFO}, new TrainInfoPuller()},
      }),
      // NO_ALARM_UPDATE makes updateAlarmLocked() a no-op until a receiver registers an
      // earlier pull time.
      mNextPullTimeNs(NO_ALARM_UPDATE) {
}
54 
Pull(int tagId,const ConfigKey & configKey,const int64_t eventTimeNs,vector<shared_ptr<LogEvent>> * data)55 bool StatsPullerManager::Pull(int tagId, const ConfigKey& configKey, const int64_t eventTimeNs,
56                               vector<shared_ptr<LogEvent>>* data) {
57     ATRACE_CALL();
58     std::lock_guard<std::mutex> _l(mLock);
59     return PullLocked(tagId, configKey, eventTimeNs, data);
60 }
61 
Pull(int tagId,const vector<int32_t> & uids,const int64_t eventTimeNs,vector<std::shared_ptr<LogEvent>> * data)62 bool StatsPullerManager::Pull(int tagId, const vector<int32_t>& uids, const int64_t eventTimeNs,
63                               vector<std::shared_ptr<LogEvent>>* data) {
64     ATRACE_CALL();
65     std::lock_guard<std::mutex> _l(mLock);
66     return PullLocked(tagId, uids, eventTimeNs, data);
67 }
68 
PullLocked(int tagId,const ConfigKey & configKey,const int64_t eventTimeNs,vector<shared_ptr<LogEvent>> * data)69 bool StatsPullerManager::PullLocked(int tagId, const ConfigKey& configKey,
70                                     const int64_t eventTimeNs, vector<shared_ptr<LogEvent>>* data) {
71     vector<int32_t> uids;
72     const auto& uidProviderIt = mPullUidProviders.find(configKey);
73     if (uidProviderIt == mPullUidProviders.end()) {
74         ALOGE("Error pulling tag %d. No pull uid provider for config key %s", tagId,
75               configKey.ToString().c_str());
76         StatsdStats::getInstance().notePullUidProviderNotFound(tagId);
77         return false;
78     }
79     sp<PullUidProvider> pullUidProvider = uidProviderIt->second.promote();
80     if (pullUidProvider == nullptr) {
81         ALOGE("Error pulling tag %d, pull uid provider for config %s is gone.", tagId,
82               configKey.ToString().c_str());
83         StatsdStats::getInstance().notePullUidProviderNotFound(tagId);
84         return false;
85     }
86     uids = pullUidProvider->getPullAtomUids(tagId);
87     return PullLocked(tagId, uids, eventTimeNs, data);
88 }
89 
PullLocked(int tagId,const vector<int32_t> & uids,const int64_t eventTimeNs,vector<shared_ptr<LogEvent>> * data)90 bool StatsPullerManager::PullLocked(int tagId, const vector<int32_t>& uids,
91                                     const int64_t eventTimeNs, vector<shared_ptr<LogEvent>>* data) {
92     VLOG("Initiating pulling %d", tagId);
93     for (int32_t uid : uids) {
94         PullerKey key = {.uid = uid, .atomTag = tagId};
95         auto pullerIt = kAllPullAtomInfo.find(key);
96         if (pullerIt != kAllPullAtomInfo.end()) {
97             PullErrorCode status = pullerIt->second->Pull(eventTimeNs, data);
98             VLOG("pulled %zu items", data->size());
99             if (status != PULL_SUCCESS) {
100                 StatsdStats::getInstance().notePullFailed(tagId);
101             }
102             // If we received a dead object exception, it means the client process has died.
103             // We can remove the puller from the map.
104             if (status == PULL_DEAD_OBJECT) {
105                 StatsdStats::getInstance().notePullerCallbackRegistrationChanged(
106                         tagId,
107                         /*registered=*/false);
108                 kAllPullAtomInfo.erase(pullerIt);
109             }
110             return status == PULL_SUCCESS;
111         }
112     }
113     StatsdStats::getInstance().notePullerNotFound(tagId);
114     ALOGW("StatsPullerManager: Unknown tagId %d", tagId);
115     return false;  // Return early since we don't know what to pull.
116 }
117 
PullerForMatcherExists(int tagId) const118 bool StatsPullerManager::PullerForMatcherExists(int tagId) const {
119     // Pulled atoms might be registered after we parse the config, so just make sure the id is in
120     // an appropriate range.
121     return isVendorPulledAtom(tagId) || isPulledAtom(tagId);
122 }
123 
updateAlarmLocked()124 void StatsPullerManager::updateAlarmLocked() {
125     if (mNextPullTimeNs == NO_ALARM_UPDATE) {
126         VLOG("No need to set alarms. Skipping");
127         return;
128     }
129 
130     // TODO(b/151045771): do not hold a lock while making a binder call
131     if (mStatsCompanionService != nullptr) {
132         mStatsCompanionService->setPullingAlarm(mNextPullTimeNs / 1000000);
133     } else {
134         VLOG("StatsCompanionService not available. Alarm not set.");
135     }
136     return;
137 }
138 
SetStatsCompanionService(const shared_ptr<IStatsCompanionService> & statsCompanionService)139 void StatsPullerManager::SetStatsCompanionService(
140         const shared_ptr<IStatsCompanionService>& statsCompanionService) {
141     std::lock_guard<std::mutex> _l(mLock);
142     shared_ptr<IStatsCompanionService> tmpForLock = mStatsCompanionService;
143     mStatsCompanionService = statsCompanionService;
144     for (const auto& pulledAtom : kAllPullAtomInfo) {
145         pulledAtom.second->SetStatsCompanionService(statsCompanionService);
146     }
147     if (mStatsCompanionService != nullptr) {
148         updateAlarmLocked();
149     }
150 }
151 
RegisterReceiver(int tagId,const ConfigKey & configKey,const wp<PullDataReceiver> & receiver,int64_t nextPullTimeNs,int64_t intervalNs)152 void StatsPullerManager::RegisterReceiver(int tagId, const ConfigKey& configKey,
153                                           const wp<PullDataReceiver>& receiver,
154                                           int64_t nextPullTimeNs, int64_t intervalNs) {
155     std::lock_guard<std::mutex> _l(mLock);
156     auto& receivers = mReceivers[{.atomTag = tagId, .configKey = configKey}];
157     for (auto it = receivers.begin(); it != receivers.end(); it++) {
158         if (it->receiver == receiver) {
159             VLOG("Receiver already registered of %d", (int)receivers.size());
160             return;
161         }
162     }
163     ReceiverInfo receiverInfo;
164     receiverInfo.receiver = receiver;
165 
166     // Round it to the nearest minutes. This is the limit of alarm manager.
167     // In practice, we should always have larger buckets.
168     int64_t roundedIntervalNs = intervalNs / NS_PER_SEC / 60 * NS_PER_SEC * 60;
169     // Scheduled pulling should be at least 1 min apart.
170     // This can be lower in cts tests, in which case we round it to 1 min.
171     if (roundedIntervalNs < 60 * (int64_t)NS_PER_SEC) {
172         roundedIntervalNs = 60 * (int64_t)NS_PER_SEC;
173     }
174 
175     receiverInfo.intervalNs = roundedIntervalNs;
176     receiverInfo.nextPullTimeNs = nextPullTimeNs;
177     receivers.push_back(receiverInfo);
178 
179     // There is only one alarm for all pulled events. So only set it to the smallest denom.
180     if (nextPullTimeNs < mNextPullTimeNs) {
181         VLOG("Updating next pull time %lld", (long long)mNextPullTimeNs);
182         mNextPullTimeNs = nextPullTimeNs;
183         updateAlarmLocked();
184     }
185     VLOG("Puller for tagId %d registered of %d", tagId, (int)receivers.size());
186 }
187 
UnRegisterReceiver(int tagId,const ConfigKey & configKey,const wp<PullDataReceiver> & receiver)188 void StatsPullerManager::UnRegisterReceiver(int tagId, const ConfigKey& configKey,
189                                             const wp<PullDataReceiver>& receiver) {
190     std::lock_guard<std::mutex> _l(mLock);
191     auto receiversIt = mReceivers.find({.atomTag = tagId, .configKey = configKey});
192     if (receiversIt == mReceivers.end()) {
193         VLOG("Unknown pull code or no receivers: %d", tagId);
194         return;
195     }
196     std::list<ReceiverInfo>& receivers = receiversIt->second;
197     for (auto it = receivers.begin(); it != receivers.end(); it++) {
198         if (receiver == it->receiver) {
199             receivers.erase(it);
200             VLOG("Puller for tagId %d unregistered of %d", tagId, (int)receivers.size());
201             return;
202         }
203     }
204 }
205 
RegisterPullUidProvider(const ConfigKey & configKey,const wp<PullUidProvider> & provider)206 void StatsPullerManager::RegisterPullUidProvider(const ConfigKey& configKey,
207                                                  const wp<PullUidProvider>& provider) {
208     std::lock_guard<std::mutex> _l(mLock);
209     mPullUidProviders[configKey] = provider;
210 }
211 
UnregisterPullUidProvider(const ConfigKey & configKey,const wp<PullUidProvider> & provider)212 void StatsPullerManager::UnregisterPullUidProvider(const ConfigKey& configKey,
213                                                    const wp<PullUidProvider>& provider) {
214     std::lock_guard<std::mutex> _l(mLock);
215     const auto& it = mPullUidProviders.find(configKey);
216     if (it != mPullUidProviders.end() && it->second == provider) {
217         mPullUidProviders.erase(it);
218     }
219 }
220 
OnAlarmFired(int64_t elapsedTimeNs)221 void StatsPullerManager::OnAlarmFired(int64_t elapsedTimeNs) {
222     ATRACE_CALL();
223     std::lock_guard<std::mutex> _l(mLock);
224     int64_t wallClockNs = getWallClockNs();
225 
226     int64_t minNextPullTimeNs = NO_ALARM_UPDATE;
227 
228     vector<pair<const ReceiverKey*, vector<ReceiverInfo*>>> needToPull;
229     for (auto& pair : mReceivers) {
230         vector<ReceiverInfo*> receivers;
231         if (pair.second.size() != 0) {
232             for (ReceiverInfo& receiverInfo : pair.second) {
233                 // If pullNecessary and enough time has passed for the next bucket, then add
234                 // receiver to the list that will pull on this alarm.
235                 // If pullNecessary is false, check if next pull time needs to be updated.
236                 sp<PullDataReceiver> receiverPtr = receiverInfo.receiver.promote();
237                 if (receiverInfo.nextPullTimeNs <= elapsedTimeNs && receiverPtr != nullptr &&
238                     receiverPtr->isPullNeeded()) {
239                     receivers.push_back(&receiverInfo);
240                 } else {
241                     if (receiverInfo.nextPullTimeNs <= elapsedTimeNs) {
242                         receiverPtr->onDataPulled({}, PullResult::PULL_NOT_NEEDED, elapsedTimeNs);
243                         int numBucketsAhead = (elapsedTimeNs - receiverInfo.nextPullTimeNs) /
244                                               receiverInfo.intervalNs;
245                         receiverInfo.nextPullTimeNs +=
246                                 (numBucketsAhead + 1) * receiverInfo.intervalNs;
247                     }
248                     minNextPullTimeNs = min(receiverInfo.nextPullTimeNs, minNextPullTimeNs);
249                 }
250             }
251             if (receivers.size() > 0) {
252                 needToPull.push_back(make_pair(&pair.first, receivers));
253             }
254         }
255     }
256     for (const auto& pullInfo : needToPull) {
257         vector<shared_ptr<LogEvent>> data;
258         PullResult pullResult =
259                 PullLocked(pullInfo.first->atomTag, pullInfo.first->configKey, elapsedTimeNs, &data)
260                         ? PullResult::PULL_RESULT_SUCCESS
261                         : PullResult::PULL_RESULT_FAIL;
262         if (pullResult == PullResult::PULL_RESULT_FAIL) {
263             VLOG("pull failed at %lld, will try again later", (long long)elapsedTimeNs);
264         }
265 
266         // Convention is to mark pull atom timestamp at request time.
267         // If we pull at t0, puller starts at t1, finishes at t2, and send back
268         // at t3, we mark t0 as its timestamp, which should correspond to its
269         // triggering event, such as condition change at t0.
270         // Here the triggering event is alarm fired from AlarmManager.
271         // In ValueMetricProducer and GaugeMetricProducer we do same thing
272         // when pull on condition change, etc.
273         for (auto& event : data) {
274             event->setElapsedTimestampNs(elapsedTimeNs);
275             event->setLogdWallClockTimestampNs(wallClockNs);
276         }
277 
278         for (const auto& receiverInfo : pullInfo.second) {
279             sp<PullDataReceiver> receiverPtr = receiverInfo->receiver.promote();
280             if (receiverPtr != nullptr) {
281                 receiverPtr->onDataPulled(data, pullResult, elapsedTimeNs);
282                 // We may have just come out of a coma, compute next pull time.
283                 int numBucketsAhead =
284                         (elapsedTimeNs - receiverInfo->nextPullTimeNs) / receiverInfo->intervalNs;
285                 receiverInfo->nextPullTimeNs += (numBucketsAhead + 1) * receiverInfo->intervalNs;
286                 minNextPullTimeNs = min(receiverInfo->nextPullTimeNs, minNextPullTimeNs);
287             } else {
288                 VLOG("receiver already gone.");
289             }
290         }
291     }
292 
293     VLOG("mNextPullTimeNs: %lld updated to %lld", (long long)mNextPullTimeNs,
294          (long long)minNextPullTimeNs);
295     mNextPullTimeNs = minNextPullTimeNs;
296     updateAlarmLocked();
297 }
298 
ForceClearPullerCache()299 int StatsPullerManager::ForceClearPullerCache() {
300     ATRACE_CALL();
301     std::lock_guard<std::mutex> _l(mLock);
302     int totalCleared = 0;
303     for (const auto& pulledAtom : kAllPullAtomInfo) {
304         totalCleared += pulledAtom.second->ForceClearCache();
305     }
306     return totalCleared;
307 }
308 
ClearPullerCacheIfNecessary(int64_t timestampNs)309 int StatsPullerManager::ClearPullerCacheIfNecessary(int64_t timestampNs) {
310     ATRACE_CALL();
311     std::lock_guard<std::mutex> _l(mLock);
312     int totalCleared = 0;
313     for (const auto& pulledAtom : kAllPullAtomInfo) {
314         totalCleared += pulledAtom.second->ClearCacheIfNecessary(timestampNs);
315     }
316     return totalCleared;
317 }
318 
RegisterPullAtomCallback(const int uid,const int32_t atomTag,const int64_t coolDownNs,const int64_t timeoutNs,const vector<int32_t> & additiveFields,const shared_ptr<IPullAtomCallback> & callback)319 void StatsPullerManager::RegisterPullAtomCallback(const int uid, const int32_t atomTag,
320                                                   const int64_t coolDownNs, const int64_t timeoutNs,
321                                                   const vector<int32_t>& additiveFields,
322                                                   const shared_ptr<IPullAtomCallback>& callback) {
323     ATRACE_CALL();
324     std::lock_guard<std::mutex> _l(mLock);
325     VLOG("RegisterPullerCallback: adding puller for tag %d", atomTag);
326 
327     if (callback == nullptr) {
328         ALOGW("SetPullAtomCallback called with null callback for atom %d.", atomTag);
329         return;
330     }
331 
332     int64_t actualCoolDownNs = coolDownNs < kMinCoolDownNs ? kMinCoolDownNs : coolDownNs;
333     int64_t actualTimeoutNs = timeoutNs > kMaxTimeoutNs ? kMaxTimeoutNs : timeoutNs;
334 
335     sp<StatsCallbackPuller> puller = new StatsCallbackPuller(atomTag, callback, actualCoolDownNs,
336                                                              actualTimeoutNs, additiveFields);
337     PullerKey key = {.uid = uid, .atomTag = atomTag};
338     auto it = kAllPullAtomInfo.find(key);
339     if (it != kAllPullAtomInfo.end()) {
340         StatsdStats::getInstance().notePullerCallbackRegistrationChanged(atomTag,
341                                                                          /*registered=*/false);
342     }
343     kAllPullAtomInfo[key] = puller;
344     StatsdStats::getInstance().notePullerCallbackRegistrationChanged(atomTag, /*registered=*/true);
345 }
346 
UnregisterPullAtomCallback(const int uid,const int32_t atomTag)347 void StatsPullerManager::UnregisterPullAtomCallback(const int uid, const int32_t atomTag) {
348     ATRACE_CALL();
349     std::lock_guard<std::mutex> _l(mLock);
350     PullerKey key = {.uid = uid, .atomTag = atomTag};
351     if (kAllPullAtomInfo.find(key) != kAllPullAtomInfo.end()) {
352         StatsdStats::getInstance().notePullerCallbackRegistrationChanged(atomTag,
353                                                                          /*registered=*/false);
354         kAllPullAtomInfo.erase(key);
355     }
356 }
357 
358 }  // namespace statsd
359 }  // namespace os
360 }  // namespace android
361