1 /*
2  * Copyright (C) 2017 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #define DEBUG false
18 #include "Log.h"
19 
20 #include "StatsPullerManager.h"
21 
22 #include <cutils/log.h>
23 #include <math.h>
24 #include <stdint.h>
25 
26 #include <algorithm>
27 #include <iostream>
28 
29 #include "../StatsService.h"
30 #include "../logd/LogEvent.h"
31 #include "../stats_log_util.h"
32 #include "../statscompanion_util.h"
33 #include "StatsCallbackPuller.h"
34 #include "TrainInfoPuller.h"
35 #include "statslog_statsd.h"
36 
37 using std::shared_ptr;
38 using std::vector;
39 
40 namespace android {
41 namespace os {
42 namespace statsd {
43 
44 // Stores the puller as a wp to avoid holding a reference in case it is unregistered and
45 // pullAtomCallbackDied is never called.
46 struct PullAtomCallbackDeathCookie {
PullAtomCallbackDeathCookieandroid::os::statsd::PullAtomCallbackDeathCookie47     PullAtomCallbackDeathCookie(const wp<StatsPullerManager>& pullerManager,
48                                 const PullerKey& pullerKey, const wp<StatsPuller>& puller) :
49             mPullerManager(pullerManager), mPullerKey(pullerKey), mPuller(puller) {
50     }
51 
52     wp<StatsPullerManager> mPullerManager;
53     PullerKey mPullerKey;
54     wp<StatsPuller> mPuller;
55 };
56 
pullAtomCallbackDied(void * cookie)57 void StatsPullerManager::pullAtomCallbackDied(void* cookie) {
58     PullAtomCallbackDeathCookie* cookie_ = static_cast<PullAtomCallbackDeathCookie*>(cookie);
59     sp<StatsPullerManager> thiz = cookie_->mPullerManager.promote();
60     if (!thiz) {
61         return;
62     }
63 
64     const PullerKey& pullerKey = cookie_->mPullerKey;
65     wp<StatsPuller> puller = cookie_->mPuller;
66 
67     // Erase the mapping from the puller key to the puller if the mapping still exists.
68     // Note that we are removing the StatsPuller object, which internally holds the binder
69     // IPullAtomCallback. However, each new registration creates a new StatsPuller, so this works.
70     lock_guard<mutex> lock(thiz->mLock);
71     const auto& it = thiz->kAllPullAtomInfo.find(pullerKey);
72     if (it != thiz->kAllPullAtomInfo.end() && puller != nullptr && puller == it->second) {
73         StatsdStats::getInstance().notePullerCallbackRegistrationChanged(pullerKey.atomTag,
74                                                                          /*registered=*/false);
75         thiz->kAllPullAtomInfo.erase(pullerKey);
76     }
77     // The death recipient corresponding to this specific IPullAtomCallback can never
78     // be triggered again, so free up resources.
79     delete cookie_;
80 }
81 
// Sentinel "no pull scheduled" time. Any real pull time compares smaller than this value, and
// only such smaller values can require the alarm to be updated.
constexpr int64_t NO_ALARM_UPDATE = INT64_MAX;
84 
StatsPullerManager()85 StatsPullerManager::StatsPullerManager()
86     : kAllPullAtomInfo({
87               // TrainInfo.
88               {{.atomTag = util::TRAIN_INFO, .uid = AID_STATSD}, new TrainInfoPuller()},
89       }),
90       mNextPullTimeNs(NO_ALARM_UPDATE),
91       mPullAtomCallbackDeathRecipient(AIBinder_DeathRecipient_new(pullAtomCallbackDied)) {
92 }
93 
Pull(int tagId,const ConfigKey & configKey,const int64_t eventTimeNs,vector<shared_ptr<LogEvent>> * data,bool useUids)94 bool StatsPullerManager::Pull(int tagId, const ConfigKey& configKey, const int64_t eventTimeNs,
95                               vector<shared_ptr<LogEvent>>* data, bool useUids) {
96     std::lock_guard<std::mutex> _l(mLock);
97     return PullLocked(tagId, configKey, eventTimeNs, data, useUids);
98 }
99 
Pull(int tagId,const vector<int32_t> & uids,const int64_t eventTimeNs,vector<std::shared_ptr<LogEvent>> * data,bool useUids)100 bool StatsPullerManager::Pull(int tagId, const vector<int32_t>& uids, const int64_t eventTimeNs,
101                               vector<std::shared_ptr<LogEvent>>* data, bool useUids) {
102     std::lock_guard<std::mutex> _l(mLock);
103     return PullLocked(tagId, uids, eventTimeNs, data, useUids);
104 }
105 
PullLocked(int tagId,const ConfigKey & configKey,const int64_t eventTimeNs,vector<shared_ptr<LogEvent>> * data,bool useUids)106 bool StatsPullerManager::PullLocked(int tagId, const ConfigKey& configKey,
107                                     const int64_t eventTimeNs, vector<shared_ptr<LogEvent>>* data,
108                                     bool useUids) {
109     vector<int32_t> uids;
110     if (useUids) {
111         auto uidProviderIt = mPullUidProviders.find(configKey);
112         if (uidProviderIt == mPullUidProviders.end()) {
113             ALOGE("Error pulling tag %d. No pull uid provider for config key %s", tagId,
114                   configKey.ToString().c_str());
115             StatsdStats::getInstance().notePullUidProviderNotFound(tagId);
116             return false;
117         }
118         sp<PullUidProvider> pullUidProvider = uidProviderIt->second.promote();
119         if (pullUidProvider == nullptr) {
120             ALOGE("Error pulling tag %d, pull uid provider for config %s is gone.", tagId,
121                   configKey.ToString().c_str());
122             StatsdStats::getInstance().notePullUidProviderNotFound(tagId);
123             return false;
124         }
125         uids = pullUidProvider->getPullAtomUids(tagId);
126     }
127     return PullLocked(tagId, uids, eventTimeNs, data, useUids);
128 }
129 
PullLocked(int tagId,const vector<int32_t> & uids,const int64_t eventTimeNs,vector<shared_ptr<LogEvent>> * data,bool useUids)130 bool StatsPullerManager::PullLocked(int tagId, const vector<int32_t>& uids,
131                                     const int64_t eventTimeNs, vector<shared_ptr<LogEvent>>* data,
132                                     bool useUids) {
133     VLOG("Initiating pulling %d", tagId);
134     if (useUids) {
135         for (int32_t uid : uids) {
136             PullerKey key = {.atomTag = tagId, .uid = uid};
137             auto pullerIt = kAllPullAtomInfo.find(key);
138             if (pullerIt != kAllPullAtomInfo.end()) {
139                 bool ret = pullerIt->second->Pull(eventTimeNs, data);
140                 VLOG("pulled %zu items", data->size());
141                 if (!ret) {
142                     StatsdStats::getInstance().notePullFailed(tagId);
143                 }
144                 return ret;
145             }
146         }
147         StatsdStats::getInstance().notePullerNotFound(tagId);
148         ALOGW("StatsPullerManager: Unknown tagId %d", tagId);
149         return false;  // Return early since we don't know what to pull.
150     } else {
151         PullerKey key = {.atomTag = tagId, .uid = -1};
152         auto pullerIt = kAllPullAtomInfo.find(key);
153         if (pullerIt != kAllPullAtomInfo.end()) {
154             bool ret = pullerIt->second->Pull(eventTimeNs, data);
155             VLOG("pulled %zu items", data->size());
156             if (!ret) {
157                 StatsdStats::getInstance().notePullFailed(tagId);
158             }
159             return ret;
160         }
161         ALOGW("StatsPullerManager: Unknown tagId %d", tagId);
162         return false;  // Return early since we don't know what to pull.
163     }
164 }
165 
PullerForMatcherExists(int tagId) const166 bool StatsPullerManager::PullerForMatcherExists(int tagId) const {
167     // Pulled atoms might be registered after we parse the config, so just make sure the id is in
168     // an appropriate range.
169     return isVendorPulledAtom(tagId) || isPulledAtom(tagId);
170 }
171 
updateAlarmLocked()172 void StatsPullerManager::updateAlarmLocked() {
173     if (mNextPullTimeNs == NO_ALARM_UPDATE) {
174         VLOG("No need to set alarms. Skipping");
175         return;
176     }
177 
178     // TODO(b/151045771): do not hold a lock while making a binder call
179     if (mStatsCompanionService != nullptr) {
180         mStatsCompanionService->setPullingAlarm(mNextPullTimeNs / 1000000);
181     } else {
182         VLOG("StatsCompanionService not available. Alarm not set.");
183     }
184     return;
185 }
186 
SetStatsCompanionService(shared_ptr<IStatsCompanionService> statsCompanionService)187 void StatsPullerManager::SetStatsCompanionService(
188         shared_ptr<IStatsCompanionService> statsCompanionService) {
189     std::lock_guard<std::mutex> _l(mLock);
190     shared_ptr<IStatsCompanionService> tmpForLock = mStatsCompanionService;
191     mStatsCompanionService = statsCompanionService;
192     for (const auto& pulledAtom : kAllPullAtomInfo) {
193         pulledAtom.second->SetStatsCompanionService(statsCompanionService);
194     }
195     if (mStatsCompanionService != nullptr) {
196         updateAlarmLocked();
197     }
198 }
199 
RegisterReceiver(int tagId,const ConfigKey & configKey,wp<PullDataReceiver> receiver,int64_t nextPullTimeNs,int64_t intervalNs)200 void StatsPullerManager::RegisterReceiver(int tagId, const ConfigKey& configKey,
201                                           wp<PullDataReceiver> receiver, int64_t nextPullTimeNs,
202                                           int64_t intervalNs) {
203     std::lock_guard<std::mutex> _l(mLock);
204     auto& receivers = mReceivers[{.atomTag = tagId, .configKey = configKey}];
205     for (auto it = receivers.begin(); it != receivers.end(); it++) {
206         if (it->receiver == receiver) {
207             VLOG("Receiver already registered of %d", (int)receivers.size());
208             return;
209         }
210     }
211     ReceiverInfo receiverInfo;
212     receiverInfo.receiver = receiver;
213 
214     // Round it to the nearest minutes. This is the limit of alarm manager.
215     // In practice, we should always have larger buckets.
216     int64_t roundedIntervalNs = intervalNs / NS_PER_SEC / 60 * NS_PER_SEC * 60;
217     // Scheduled pulling should be at least 1 min apart.
218     // This can be lower in cts tests, in which case we round it to 1 min.
219     if (roundedIntervalNs < 60 * (int64_t)NS_PER_SEC) {
220         roundedIntervalNs = 60 * (int64_t)NS_PER_SEC;
221     }
222 
223     receiverInfo.intervalNs = roundedIntervalNs;
224     receiverInfo.nextPullTimeNs = nextPullTimeNs;
225     receivers.push_back(receiverInfo);
226 
227     // There is only one alarm for all pulled events. So only set it to the smallest denom.
228     if (nextPullTimeNs < mNextPullTimeNs) {
229         VLOG("Updating next pull time %lld", (long long)mNextPullTimeNs);
230         mNextPullTimeNs = nextPullTimeNs;
231         updateAlarmLocked();
232     }
233     VLOG("Puller for tagId %d registered of %d", tagId, (int)receivers.size());
234 }
235 
UnRegisterReceiver(int tagId,const ConfigKey & configKey,wp<PullDataReceiver> receiver)236 void StatsPullerManager::UnRegisterReceiver(int tagId, const ConfigKey& configKey,
237                                             wp<PullDataReceiver> receiver) {
238     std::lock_guard<std::mutex> _l(mLock);
239     auto receiversIt = mReceivers.find({.atomTag = tagId, .configKey = configKey});
240     if (receiversIt == mReceivers.end()) {
241         VLOG("Unknown pull code or no receivers: %d", tagId);
242         return;
243     }
244     std::list<ReceiverInfo>& receivers = receiversIt->second;
245     for (auto it = receivers.begin(); it != receivers.end(); it++) {
246         if (receiver == it->receiver) {
247             receivers.erase(it);
248             VLOG("Puller for tagId %d unregistered of %d", tagId, (int)receivers.size());
249             return;
250         }
251     }
252 }
253 
RegisterPullUidProvider(const ConfigKey & configKey,wp<PullUidProvider> provider)254 void StatsPullerManager::RegisterPullUidProvider(const ConfigKey& configKey,
255                                                  wp<PullUidProvider> provider) {
256     std::lock_guard<std::mutex> _l(mLock);
257     mPullUidProviders[configKey] = provider;
258 }
259 
UnregisterPullUidProvider(const ConfigKey & configKey,wp<PullUidProvider> provider)260 void StatsPullerManager::UnregisterPullUidProvider(const ConfigKey& configKey,
261                                                    wp<PullUidProvider> provider) {
262     std::lock_guard<std::mutex> _l(mLock);
263     const auto& it = mPullUidProviders.find(configKey);
264     if (it != mPullUidProviders.end() && it->second == provider) {
265         mPullUidProviders.erase(it);
266     }
267 }
268 
OnAlarmFired(int64_t elapsedTimeNs)269 void StatsPullerManager::OnAlarmFired(int64_t elapsedTimeNs) {
270     std::lock_guard<std::mutex> _l(mLock);
271     int64_t wallClockNs = getWallClockNs();
272 
273     int64_t minNextPullTimeNs = NO_ALARM_UPDATE;
274 
275     vector<pair<const ReceiverKey*, vector<ReceiverInfo*>>> needToPull;
276     for (auto& pair : mReceivers) {
277         vector<ReceiverInfo*> receivers;
278         if (pair.second.size() != 0) {
279             for (ReceiverInfo& receiverInfo : pair.second) {
280                 if (receiverInfo.nextPullTimeNs <= elapsedTimeNs) {
281                     receivers.push_back(&receiverInfo);
282                 } else {
283                     if (receiverInfo.nextPullTimeNs < minNextPullTimeNs) {
284                         minNextPullTimeNs = receiverInfo.nextPullTimeNs;
285                     }
286                 }
287             }
288             if (receivers.size() > 0) {
289                 needToPull.push_back(make_pair(&pair.first, receivers));
290             }
291         }
292     }
293     for (const auto& pullInfo : needToPull) {
294         vector<shared_ptr<LogEvent>> data;
295         bool pullSuccess = PullLocked(pullInfo.first->atomTag, pullInfo.first->configKey,
296                                       elapsedTimeNs, &data);
297         if (!pullSuccess) {
298             VLOG("pull failed at %lld, will try again later", (long long)elapsedTimeNs);
299         }
300 
301         // Convention is to mark pull atom timestamp at request time.
302         // If we pull at t0, puller starts at t1, finishes at t2, and send back
303         // at t3, we mark t0 as its timestamp, which should correspond to its
304         // triggering event, such as condition change at t0.
305         // Here the triggering event is alarm fired from AlarmManager.
306         // In ValueMetricProducer and GaugeMetricProducer we do same thing
307         // when pull on condition change, etc.
308         for (auto& event : data) {
309             event->setElapsedTimestampNs(elapsedTimeNs);
310             event->setLogdWallClockTimestampNs(wallClockNs);
311         }
312 
313         for (const auto& receiverInfo : pullInfo.second) {
314             sp<PullDataReceiver> receiverPtr = receiverInfo->receiver.promote();
315             if (receiverPtr != nullptr) {
316                 receiverPtr->onDataPulled(data, pullSuccess, elapsedTimeNs);
317                 // We may have just come out of a coma, compute next pull time.
318                 int numBucketsAhead =
319                         (elapsedTimeNs - receiverInfo->nextPullTimeNs) / receiverInfo->intervalNs;
320                 receiverInfo->nextPullTimeNs += (numBucketsAhead + 1) * receiverInfo->intervalNs;
321                 if (receiverInfo->nextPullTimeNs < minNextPullTimeNs) {
322                     minNextPullTimeNs = receiverInfo->nextPullTimeNs;
323                 }
324             } else {
325                 VLOG("receiver already gone.");
326             }
327         }
328     }
329 
330     VLOG("mNextPullTimeNs: %lld updated to %lld", (long long)mNextPullTimeNs,
331          (long long)minNextPullTimeNs);
332     mNextPullTimeNs = minNextPullTimeNs;
333     updateAlarmLocked();
334 }
335 
ForceClearPullerCache()336 int StatsPullerManager::ForceClearPullerCache() {
337     int totalCleared = 0;
338     for (const auto& pulledAtom : kAllPullAtomInfo) {
339         totalCleared += pulledAtom.second->ForceClearCache();
340     }
341     return totalCleared;
342 }
343 
ClearPullerCacheIfNecessary(int64_t timestampNs)344 int StatsPullerManager::ClearPullerCacheIfNecessary(int64_t timestampNs) {
345     int totalCleared = 0;
346     for (const auto& pulledAtom : kAllPullAtomInfo) {
347         totalCleared += pulledAtom.second->ClearCacheIfNecessary(timestampNs);
348     }
349     return totalCleared;
350 }
351 
RegisterPullAtomCallback(const int uid,const int32_t atomTag,const int64_t coolDownNs,const int64_t timeoutNs,const vector<int32_t> & additiveFields,const shared_ptr<IPullAtomCallback> & callback,bool useUid)352 void StatsPullerManager::RegisterPullAtomCallback(const int uid, const int32_t atomTag,
353                                                   const int64_t coolDownNs, const int64_t timeoutNs,
354                                                   const vector<int32_t>& additiveFields,
355                                                   const shared_ptr<IPullAtomCallback>& callback,
356                                                   bool useUid) {
357     std::lock_guard<std::mutex> _l(mLock);
358     VLOG("RegisterPullerCallback: adding puller for tag %d", atomTag);
359 
360     if (callback == nullptr) {
361         ALOGW("SetPullAtomCallback called with null callback for atom %d.", atomTag);
362         return;
363     }
364 
365     StatsdStats::getInstance().notePullerCallbackRegistrationChanged(atomTag, /*registered=*/true);
366     int64_t actualCoolDownNs = coolDownNs < kMinCoolDownNs ? kMinCoolDownNs : coolDownNs;
367     int64_t actualTimeoutNs = timeoutNs > kMaxTimeoutNs ? kMaxTimeoutNs : timeoutNs;
368 
369     sp<StatsCallbackPuller> puller = new StatsCallbackPuller(atomTag, callback, actualCoolDownNs,
370                                                              actualTimeoutNs, additiveFields);
371     PullerKey key = {.atomTag = atomTag, .uid = useUid ? uid : -1};
372     AIBinder_linkToDeath(callback->asBinder().get(), mPullAtomCallbackDeathRecipient.get(),
373                          new PullAtomCallbackDeathCookie(this, key, puller));
374     kAllPullAtomInfo[key] = puller;
375 }
376 
UnregisterPullAtomCallback(const int uid,const int32_t atomTag,bool useUids)377 void StatsPullerManager::UnregisterPullAtomCallback(const int uid, const int32_t atomTag,
378                                                     bool useUids) {
379     std::lock_guard<std::mutex> _l(mLock);
380     PullerKey key = {.atomTag = atomTag, .uid = useUids ? uid : -1};
381     if (kAllPullAtomInfo.find(key) != kAllPullAtomInfo.end()) {
382         StatsdStats::getInstance().notePullerCallbackRegistrationChanged(atomTag,
383                                                                          /*registered=*/false);
384         kAllPullAtomInfo.erase(key);
385     }
386 }
387 
388 }  // namespace statsd
389 }  // namespace os
390 }  // namespace android
391