1 /*
2  * Copyright (C) 2022 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #define STATSD_DEBUG false  // STOPSHIP if true
17 #include "Log.h"
18 
19 #include "ShellSubscriberClient.h"
20 
21 #include "FieldValue.h"
22 #include "guardrail/StatsdStats.h"
23 #include "matchers/matcher_util.h"
24 #include "stats_log_util.h"
25 
26 using android::base::unique_fd;
27 using Status = ::ndk::ScopedAStatus;
28 
29 namespace android {
30 namespace os {
31 namespace statsd {
32 
33 const static int FIELD_ID_SHELL_DATA__ATOM = 1;
34 const static int FIELD_ID_SHELL_DATA__ELAPSED_TIMESTAMP_NANOS = 2;
35 
36 // Store next subscription ID for StatsdStats.
37 // Not thread-safe; should only be accessed while holding ShellSubscriber::mMutex lock.
38 static int nextSubId = 0;
39 
40 struct ReadConfigResult {
41     vector<SimpleAtomMatcher> pushedMatchers;
42     vector<ShellSubscriberClient::PullInfo> pullInfo;
43 };
44 
45 // Read and parse single config. There should only one config in the input.
readConfig(const vector<uint8_t> & configBytes,int64_t startTimeMs,int64_t minPullIntervalMs)46 static optional<ReadConfigResult> readConfig(const vector<uint8_t>& configBytes,
47                                              int64_t startTimeMs, int64_t minPullIntervalMs) {
48     // Parse the config.
49     ShellSubscription config;
50     if (!config.ParseFromArray(configBytes.data(), configBytes.size())) {
51         ALOGE("ShellSubscriberClient: failed to parse the config");
52         return nullopt;
53     }
54 
55     ReadConfigResult result;
56 
57     result.pushedMatchers.assign(config.pushed().begin(), config.pushed().end());
58 
59     vector<ShellSubscriberClient::PullInfo> pullInfo;
60     for (const auto& pulled : config.pulled()) {
61         vector<string> packages;
62         vector<int32_t> uids;
63         for (const string& pkg : pulled.packages()) {
64             auto it = UidMap::sAidToUidMapping.find(pkg);
65             if (it != UidMap::sAidToUidMapping.end()) {
66                 uids.push_back(it->second);
67             } else {
68                 packages.push_back(pkg);
69             }
70         }
71 
72         const int64_t pullIntervalMs = max(pulled.freq_millis(), minPullIntervalMs);
73         result.pullInfo.emplace_back(pulled.matcher(), startTimeMs, pullIntervalMs, packages, uids);
74         ALOGD("ShellSubscriberClient: adding matcher for pulled atom %d",
75               pulled.matcher().atom_id());
76     }
77 
78     return result;
79 }
80 
PullInfo(const SimpleAtomMatcher & matcher,int64_t startTimeMs,int64_t intervalMs,const std::vector<std::string> & packages,const std::vector<int32_t> & uids)81 ShellSubscriberClient::PullInfo::PullInfo(const SimpleAtomMatcher& matcher, int64_t startTimeMs,
82                                           int64_t intervalMs,
83                                           const std::vector<std::string>& packages,
84                                           const std::vector<int32_t>& uids)
85     : mPullerMatcher(matcher),
86       mIntervalMs(intervalMs),
87       mPrevPullElapsedRealtimeMs(startTimeMs),
88       mPullPackages(packages),
89       mPullUids(uids) {
90 }
91 
ShellSubscriberClient(int id,int out,const std::shared_ptr<IStatsSubscriptionCallback> & callback,const std::vector<SimpleAtomMatcher> & pushedMatchers,const std::vector<PullInfo> & pulledInfo,int64_t timeoutSec,int64_t startTimeSec,const sp<UidMap> & uidMap,const sp<StatsPullerManager> & pullerMgr)92 ShellSubscriberClient::ShellSubscriberClient(
93         int id, int out, const std::shared_ptr<IStatsSubscriptionCallback>& callback,
94         const std::vector<SimpleAtomMatcher>& pushedMatchers,
95         const std::vector<PullInfo>& pulledInfo, int64_t timeoutSec, int64_t startTimeSec,
96         const sp<UidMap>& uidMap, const sp<StatsPullerManager>& pullerMgr)
97     : mId(id),
98       mUidMap(uidMap),
99       mPullerMgr(pullerMgr),
100       mDupOut(fcntl(out, F_DUPFD_CLOEXEC, 0)),
101       mPushedMatchers(pushedMatchers),
102       mPulledInfo(pulledInfo),
103       mCallback(callback),
104       mTimeoutSec(timeoutSec),
105       mStartTimeSec(startTimeSec),
106       mLastWriteMs(startTimeSec * 1000),
107       mCacheSize(0){};
108 
create(int in,int out,int64_t timeoutSec,int64_t startTimeSec,const sp<UidMap> & uidMap,const sp<StatsPullerManager> & pullerMgr)109 unique_ptr<ShellSubscriberClient> ShellSubscriberClient::create(
110         int in, int out, int64_t timeoutSec, int64_t startTimeSec, const sp<UidMap>& uidMap,
111         const sp<StatsPullerManager>& pullerMgr) {
112     // Read the size of the config.
113     size_t bufferSize;
114     if (!android::base::ReadFully(in, &bufferSize, sizeof(bufferSize))) {
115         return nullptr;
116     }
117 
118     // Check bufferSize
119     if (bufferSize > (kMaxSizeKb * 1024)) {
120         ALOGE("ShellSubscriberClient: received config (%zu bytes) is larger than the max size (%zu "
121               "bytes)",
122               bufferSize, (kMaxSizeKb * 1024));
123         return nullptr;
124     }
125 
126     // Read the config.
127     vector<uint8_t> buffer(bufferSize);
128     if (!android::base::ReadFully(in, buffer.data(), bufferSize)) {
129         ALOGE("ShellSubscriberClient: failed to read the config from file descriptor");
130         return nullptr;
131     }
132 
133     const optional<ReadConfigResult> readConfigResult =
134             readConfig(buffer, startTimeSec * 1000, /* minPullIntervalMs */ 0);
135     if (!readConfigResult.has_value()) {
136         return nullptr;
137     }
138 
139     return make_unique<ShellSubscriberClient>(
140             nextSubId++, out, /*callback=*/nullptr, readConfigResult->pushedMatchers,
141             readConfigResult->pullInfo, timeoutSec, startTimeSec, uidMap, pullerMgr);
142 }
143 
create(const vector<uint8_t> & subscriptionConfig,const shared_ptr<IStatsSubscriptionCallback> & callback,int64_t startTimeSec,const sp<UidMap> & uidMap,const sp<StatsPullerManager> & pullerMgr)144 unique_ptr<ShellSubscriberClient> ShellSubscriberClient::create(
145         const vector<uint8_t>& subscriptionConfig,
146         const shared_ptr<IStatsSubscriptionCallback>& callback, int64_t startTimeSec,
147         const sp<UidMap>& uidMap, const sp<StatsPullerManager>& pullerMgr) {
148     if (callback == nullptr) {
149         ALOGE("ShellSubscriberClient: received nullptr callback");
150         return nullptr;
151     }
152 
153     if (subscriptionConfig.size() > (kMaxSizeKb * 1024)) {
154         ALOGE("ShellSubscriberClient: received config (%zu bytes) is larger than the max size (%zu "
155               "bytes)",
156               subscriptionConfig.size(), (kMaxSizeKb * 1024));
157         return nullptr;
158     }
159 
160     const optional<ReadConfigResult> readConfigResult =
161             readConfig(subscriptionConfig, startTimeSec * 1000,
162                        ShellSubscriberClient::kMinCallbackPullIntervalMs);
163     if (!readConfigResult.has_value()) {
164         return nullptr;
165     }
166 
167     const int id = nextSubId++;
168 
169     StatsdStats::getInstance().noteSubscriptionStarted(id, readConfigResult->pushedMatchers.size(),
170                                                        readConfigResult->pullInfo.size());
171     return make_unique<ShellSubscriberClient>(
172             id, /*out=*/-1, callback, readConfigResult->pushedMatchers, readConfigResult->pullInfo,
173             /*timeoutSec=*/-1, startTimeSec, uidMap, pullerMgr);
174 }
175 
writeEventToProtoIfMatched(const LogEvent & event,const SimpleAtomMatcher & matcher,const sp<UidMap> & uidMap)176 bool ShellSubscriberClient::writeEventToProtoIfMatched(const LogEvent& event,
177                                                        const SimpleAtomMatcher& matcher,
178                                                        const sp<UidMap>& uidMap) {
179     auto [matched, transformedEvent] = matchesSimple(mUidMap, matcher, event);
180     if (!matched) {
181         return false;
182     }
183     const LogEvent& eventRef = transformedEvent == nullptr ? event : *transformedEvent;
184 
185     // Cache atom event in mProtoOut.
186     uint64_t atomToken = mProtoOut.start(util::FIELD_TYPE_MESSAGE | util::FIELD_COUNT_REPEATED |
187                                          FIELD_ID_SHELL_DATA__ATOM);
188     eventRef.ToProto(mProtoOut);
189     mProtoOut.end(atomToken);
190 
191     const int64_t timestampNs = truncateTimestampIfNecessary(eventRef);
192     mProtoOut.write(util::FIELD_TYPE_INT64 | util::FIELD_COUNT_REPEATED |
193                             FIELD_ID_SHELL_DATA__ELAPSED_TIMESTAMP_NANOS,
194                     static_cast<long long>(timestampNs));
195 
196     // Update byte size of cached data.
197     mCacheSize += getSize(eventRef.getValues()) + sizeof(timestampNs);
198 
199     return true;
200 }
201 
202 // Called by ShellSubscriber when a pushed event occurs
onLogEvent(const LogEvent & event)203 void ShellSubscriberClient::onLogEvent(const LogEvent& event) {
204     for (const auto& matcher : mPushedMatchers) {
205         if (writeEventToProtoIfMatched(event, matcher, mUidMap)) {
206             flushProtoIfNeeded();
207             break;
208         }
209     }
210 }
211 
flushProtoIfNeeded()212 void ShellSubscriberClient::flushProtoIfNeeded() {
213     if (mCallback == nullptr) {  // Using file descriptor.
214         triggerFdFlush();
215     } else if (mCacheSize >= kMaxCacheSizeBytes) {  // Using callback.
216         // Flush data if cache is full.
217         triggerCallback(StatsSubscriptionCallbackReason::STATSD_INITIATED);
218     }
219 }
220 
pullIfNeeded(int64_t nowSecs,int64_t nowMillis,int64_t nowNanos)221 int64_t ShellSubscriberClient::pullIfNeeded(int64_t nowSecs, int64_t nowMillis, int64_t nowNanos) {
222     int64_t sleepTimeMs = 24 * 60 * 60 * 1000;  // 24 hours.
223     for (PullInfo& pullInfo : mPulledInfo) {
224         if (pullInfo.mPrevPullElapsedRealtimeMs + pullInfo.mIntervalMs <= nowMillis) {
225             vector<int32_t> uids;
226             getUidsForPullAtom(&uids, pullInfo);
227 
228             vector<shared_ptr<LogEvent>> data;
229             mPullerMgr->Pull(pullInfo.mPullerMatcher.atom_id(), uids, nowNanos, &data);
230             VLOG("ShellSubscriberClient: pulled %zu atoms with id %d", data.size(),
231                  pullInfo.mPullerMatcher.atom_id());
232             if (mCallback != nullptr) {  // Callback subscription
233                 StatsdStats::getInstance().noteSubscriptionAtomPulled(
234                         pullInfo.mPullerMatcher.atom_id());
235             }
236 
237             writePulledAtomsLocked(data, pullInfo.mPullerMatcher);
238             pullInfo.mPrevPullElapsedRealtimeMs = nowMillis;
239         }
240 
241         // Determine how long to sleep before doing more work.
242         const int64_t nextPullTimeMs = pullInfo.mPrevPullElapsedRealtimeMs + pullInfo.mIntervalMs;
243 
244         const int64_t timeBeforePullMs =
245                 nextPullTimeMs - nowMillis;  // guaranteed to be non-negative
246         sleepTimeMs = min(sleepTimeMs, timeBeforePullMs);
247     }
248     return sleepTimeMs;
249 }
250 
251 // The pullAndHeartbeat threads sleep for the minimum time
252 // among all clients' input
pullAndSendHeartbeatsIfNeeded(int64_t nowSecs,int64_t nowMillis,int64_t nowNanos)253 int64_t ShellSubscriberClient::pullAndSendHeartbeatsIfNeeded(int64_t nowSecs, int64_t nowMillis,
254                                                              int64_t nowNanos) {
255     int64_t sleepTimeMs;
256     if (mCallback == nullptr) {  // File descriptor subscription
257         if ((nowSecs - mStartTimeSec >= mTimeoutSec) && (mTimeoutSec > 0)) {
258             mClientAlive = false;
259             return kMsBetweenHeartbeats;
260         }
261 
262         sleepTimeMs = min(kMsBetweenHeartbeats, pullIfNeeded(nowSecs, nowMillis, nowNanos));
263 
264         // Send a heartbeat consisting of data size of 0, if
265         // the user hasn't recently received data from statsd. When it receives the data size of 0,
266         // the user will not expect any atoms and recheck whether the subscription should end.
267         if (nowMillis - mLastWriteMs >= kMsBetweenHeartbeats) {
268             triggerFdFlush();
269             if (!mClientAlive) return kMsBetweenHeartbeats;
270         }
271 
272         int64_t timeBeforeHeartbeat = mLastWriteMs + kMsBetweenHeartbeats - nowMillis;
273         sleepTimeMs = min(sleepTimeMs, timeBeforeHeartbeat);
274     } else {  // Callback subscription.
275         sleepTimeMs = min(kMsBetweenCallbacks, pullIfNeeded(nowSecs, nowMillis, nowNanos));
276 
277         if (mCacheSize > 0 && nowMillis - mLastWriteMs >= kMsBetweenCallbacks) {
278             // Flush data if cache has kept data for longer than kMsBetweenCallbacks.
279             triggerCallback(StatsSubscriptionCallbackReason::STATSD_INITIATED);
280         }
281 
282         // Cache should be flushed kMsBetweenCallbacks after mLastWrite.
283         const int64_t timeToCallbackMs = mLastWriteMs + kMsBetweenCallbacks - nowMillis;
284 
285         // For callback subscriptions, ensure minimum sleep time is at least
286         // kMinCallbackSleepIntervalMs. Even if there is less than kMinCallbackSleepIntervalMs left
287         // before next pull time, sleep for at least kMinCallbackSleepIntervalMs. This has the
288         // effect of multiple pulled atoms that have a pull within kMinCallbackSleepIntervalMs from
289         // now to have their pulls batched together, mitigating frequent wakeups of the puller
290         // thread.
291         sleepTimeMs = max(kMinCallbackSleepIntervalMs, min(sleepTimeMs, timeToCallbackMs));
292     }
293     return sleepTimeMs;
294 }
295 
writePulledAtomsLocked(const vector<shared_ptr<LogEvent>> & data,const SimpleAtomMatcher & matcher)296 void ShellSubscriberClient::writePulledAtomsLocked(const vector<shared_ptr<LogEvent>>& data,
297                                                    const SimpleAtomMatcher& matcher) {
298     bool hasData = false;
299     for (const shared_ptr<LogEvent>& event : data) {
300         if (writeEventToProtoIfMatched(*event, matcher, mUidMap)) {
301             hasData = true;
302         }
303     }
304 
305     if (hasData) {
306         flushProtoIfNeeded();
307     }
308 }
309 
310 // Tries to write the atom encoded in mProtoOut to the pipe. If the write fails
311 // because the read end of the pipe has closed, change the client status so
312 // the manager knows the subscription is no longer active
attemptWriteToPipeLocked()313 void ShellSubscriberClient::attemptWriteToPipeLocked() {
314     const size_t dataSize = mProtoOut.size();
315     // First, write the payload size.
316     if (!android::base::WriteFully(mDupOut, &dataSize, sizeof(dataSize))) {
317         mClientAlive = false;
318         return;
319     }
320     // Then, write the payload if this is not just a heartbeat.
321     if (dataSize > 0 && !mProtoOut.flush(mDupOut.get())) {
322         mClientAlive = false;
323         return;
324     }
325     mLastWriteMs = getElapsedRealtimeMillis();
326 }
327 
getUidsForPullAtom(vector<int32_t> * uids,const PullInfo & pullInfo)328 void ShellSubscriberClient::getUidsForPullAtom(vector<int32_t>* uids, const PullInfo& pullInfo) {
329     uids->insert(uids->end(), pullInfo.mPullUids.begin(), pullInfo.mPullUids.end());
330     // This is slow. Consider storing the uids per app and listening to uidmap updates.
331     for (const string& pkg : pullInfo.mPullPackages) {
332         set<int32_t> uidsForPkg = mUidMap->getAppUid(pkg);
333         uids->insert(uids->end(), uidsForPkg.begin(), uidsForPkg.end());
334     }
335     uids->push_back(DEFAULT_PULL_UID);
336 }
337 
clearCache()338 void ShellSubscriberClient::clearCache() {
339     mProtoOut.clear();
340     mCacheSize = 0;
341 }
342 
triggerFdFlush()343 void ShellSubscriberClient::triggerFdFlush() {
344     attemptWriteToPipeLocked();
345     clearCache();
346 }
347 
triggerCallback(StatsSubscriptionCallbackReason reason)348 void ShellSubscriberClient::triggerCallback(StatsSubscriptionCallbackReason reason) {
349     // Invoke Binder callback with cached event data.
350     vector<uint8_t> payloadBytes;
351     mProtoOut.serializeToVector(&payloadBytes);
352     StatsdStats::getInstance().noteSubscriptionFlushed(mId);
353     const Status status = mCallback->onSubscriptionData(reason, payloadBytes);
354     if (status.getStatus() == STATUS_DEAD_OBJECT &&
355         status.getExceptionCode() == EX_TRANSACTION_FAILED) {
356         mClientAlive = false;
357         return;
358     }
359 
360     mLastWriteMs = getElapsedRealtimeMillis();
361     clearCache();
362 }
363 
flush()364 void ShellSubscriberClient::flush() {
365     triggerCallback(StatsSubscriptionCallbackReason::FLUSH_REQUESTED);
366 }
367 
onUnsubscribe()368 void ShellSubscriberClient::onUnsubscribe() {
369     StatsdStats::getInstance().noteSubscriptionEnded(mId);
370     if (mClientAlive) {
371         triggerCallback(StatsSubscriptionCallbackReason::SUBSCRIPTION_ENDED);
372     }
373 }
374 
addAllAtomIds(LogEventFilter::AtomIdSet & allAtomIds) const375 void ShellSubscriberClient::addAllAtomIds(LogEventFilter::AtomIdSet& allAtomIds) const {
376     for (const auto& matcher : mPushedMatchers) {
377         allAtomIds.insert(matcher.atom_id());
378     }
379 }
380 
381 }  // namespace statsd
382 }  // namespace os
383 }  // namespace android
384