1 /*
2 * Copyright (C) 2022 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16 #define STATSD_DEBUG false // STOPSHIP if true
17 #include "Log.h"
18
19 #include "ShellSubscriberClient.h"
20
21 #include "FieldValue.h"
22 #include "guardrail/StatsdStats.h"
23 #include "matchers/matcher_util.h"
24 #include "stats_log_util.h"
25
26 using android::base::unique_fd;
27 using Status = ::ndk::ScopedAStatus;
28
29 namespace android {
30 namespace os {
31 namespace statsd {
32
33 const static int FIELD_ID_SHELL_DATA__ATOM = 1;
34 const static int FIELD_ID_SHELL_DATA__ELAPSED_TIMESTAMP_NANOS = 2;
35
36 // Store next subscription ID for StatsdStats.
37 // Not thread-safe; should only be accessed while holding ShellSubscriber::mMutex lock.
38 static int nextSubId = 0;
39
40 struct ReadConfigResult {
41 vector<SimpleAtomMatcher> pushedMatchers;
42 vector<ShellSubscriberClient::PullInfo> pullInfo;
43 };
44
45 // Read and parse single config. There should only one config in the input.
readConfig(const vector<uint8_t> & configBytes,int64_t startTimeMs,int64_t minPullIntervalMs)46 static optional<ReadConfigResult> readConfig(const vector<uint8_t>& configBytes,
47 int64_t startTimeMs, int64_t minPullIntervalMs) {
48 // Parse the config.
49 ShellSubscription config;
50 if (!config.ParseFromArray(configBytes.data(), configBytes.size())) {
51 ALOGE("ShellSubscriberClient: failed to parse the config");
52 return nullopt;
53 }
54
55 ReadConfigResult result;
56
57 result.pushedMatchers.assign(config.pushed().begin(), config.pushed().end());
58
59 vector<ShellSubscriberClient::PullInfo> pullInfo;
60 for (const auto& pulled : config.pulled()) {
61 vector<string> packages;
62 vector<int32_t> uids;
63 for (const string& pkg : pulled.packages()) {
64 auto it = UidMap::sAidToUidMapping.find(pkg);
65 if (it != UidMap::sAidToUidMapping.end()) {
66 uids.push_back(it->second);
67 } else {
68 packages.push_back(pkg);
69 }
70 }
71
72 const int64_t pullIntervalMs = max(pulled.freq_millis(), minPullIntervalMs);
73 result.pullInfo.emplace_back(pulled.matcher(), startTimeMs, pullIntervalMs, packages, uids);
74 ALOGD("ShellSubscriberClient: adding matcher for pulled atom %d",
75 pulled.matcher().atom_id());
76 }
77
78 return result;
79 }
80
PullInfo(const SimpleAtomMatcher & matcher,int64_t startTimeMs,int64_t intervalMs,const std::vector<std::string> & packages,const std::vector<int32_t> & uids)81 ShellSubscriberClient::PullInfo::PullInfo(const SimpleAtomMatcher& matcher, int64_t startTimeMs,
82 int64_t intervalMs,
83 const std::vector<std::string>& packages,
84 const std::vector<int32_t>& uids)
85 : mPullerMatcher(matcher),
86 mIntervalMs(intervalMs),
87 mPrevPullElapsedRealtimeMs(startTimeMs),
88 mPullPackages(packages),
89 mPullUids(uids) {
90 }
91
ShellSubscriberClient(int id,int out,const std::shared_ptr<IStatsSubscriptionCallback> & callback,const std::vector<SimpleAtomMatcher> & pushedMatchers,const std::vector<PullInfo> & pulledInfo,int64_t timeoutSec,int64_t startTimeSec,const sp<UidMap> & uidMap,const sp<StatsPullerManager> & pullerMgr)92 ShellSubscriberClient::ShellSubscriberClient(
93 int id, int out, const std::shared_ptr<IStatsSubscriptionCallback>& callback,
94 const std::vector<SimpleAtomMatcher>& pushedMatchers,
95 const std::vector<PullInfo>& pulledInfo, int64_t timeoutSec, int64_t startTimeSec,
96 const sp<UidMap>& uidMap, const sp<StatsPullerManager>& pullerMgr)
97 : mId(id),
98 mUidMap(uidMap),
99 mPullerMgr(pullerMgr),
100 mDupOut(fcntl(out, F_DUPFD_CLOEXEC, 0)),
101 mPushedMatchers(pushedMatchers),
102 mPulledInfo(pulledInfo),
103 mCallback(callback),
104 mTimeoutSec(timeoutSec),
105 mStartTimeSec(startTimeSec),
106 mLastWriteMs(startTimeSec * 1000),
107 mCacheSize(0){};
108
create(int in,int out,int64_t timeoutSec,int64_t startTimeSec,const sp<UidMap> & uidMap,const sp<StatsPullerManager> & pullerMgr)109 unique_ptr<ShellSubscriberClient> ShellSubscriberClient::create(
110 int in, int out, int64_t timeoutSec, int64_t startTimeSec, const sp<UidMap>& uidMap,
111 const sp<StatsPullerManager>& pullerMgr) {
112 // Read the size of the config.
113 size_t bufferSize;
114 if (!android::base::ReadFully(in, &bufferSize, sizeof(bufferSize))) {
115 return nullptr;
116 }
117
118 // Check bufferSize
119 if (bufferSize > (kMaxSizeKb * 1024)) {
120 ALOGE("ShellSubscriberClient: received config (%zu bytes) is larger than the max size (%zu "
121 "bytes)",
122 bufferSize, (kMaxSizeKb * 1024));
123 return nullptr;
124 }
125
126 // Read the config.
127 vector<uint8_t> buffer(bufferSize);
128 if (!android::base::ReadFully(in, buffer.data(), bufferSize)) {
129 ALOGE("ShellSubscriberClient: failed to read the config from file descriptor");
130 return nullptr;
131 }
132
133 const optional<ReadConfigResult> readConfigResult =
134 readConfig(buffer, startTimeSec * 1000, /* minPullIntervalMs */ 0);
135 if (!readConfigResult.has_value()) {
136 return nullptr;
137 }
138
139 return make_unique<ShellSubscriberClient>(
140 nextSubId++, out, /*callback=*/nullptr, readConfigResult->pushedMatchers,
141 readConfigResult->pullInfo, timeoutSec, startTimeSec, uidMap, pullerMgr);
142 }
143
create(const vector<uint8_t> & subscriptionConfig,const shared_ptr<IStatsSubscriptionCallback> & callback,int64_t startTimeSec,const sp<UidMap> & uidMap,const sp<StatsPullerManager> & pullerMgr)144 unique_ptr<ShellSubscriberClient> ShellSubscriberClient::create(
145 const vector<uint8_t>& subscriptionConfig,
146 const shared_ptr<IStatsSubscriptionCallback>& callback, int64_t startTimeSec,
147 const sp<UidMap>& uidMap, const sp<StatsPullerManager>& pullerMgr) {
148 if (callback == nullptr) {
149 ALOGE("ShellSubscriberClient: received nullptr callback");
150 return nullptr;
151 }
152
153 if (subscriptionConfig.size() > (kMaxSizeKb * 1024)) {
154 ALOGE("ShellSubscriberClient: received config (%zu bytes) is larger than the max size (%zu "
155 "bytes)",
156 subscriptionConfig.size(), (kMaxSizeKb * 1024));
157 return nullptr;
158 }
159
160 const optional<ReadConfigResult> readConfigResult =
161 readConfig(subscriptionConfig, startTimeSec * 1000,
162 ShellSubscriberClient::kMinCallbackPullIntervalMs);
163 if (!readConfigResult.has_value()) {
164 return nullptr;
165 }
166
167 const int id = nextSubId++;
168
169 StatsdStats::getInstance().noteSubscriptionStarted(id, readConfigResult->pushedMatchers.size(),
170 readConfigResult->pullInfo.size());
171 return make_unique<ShellSubscriberClient>(
172 id, /*out=*/-1, callback, readConfigResult->pushedMatchers, readConfigResult->pullInfo,
173 /*timeoutSec=*/-1, startTimeSec, uidMap, pullerMgr);
174 }
175
writeEventToProtoIfMatched(const LogEvent & event,const SimpleAtomMatcher & matcher,const sp<UidMap> & uidMap)176 bool ShellSubscriberClient::writeEventToProtoIfMatched(const LogEvent& event,
177 const SimpleAtomMatcher& matcher,
178 const sp<UidMap>& uidMap) {
179 auto [matched, transformedEvent] = matchesSimple(mUidMap, matcher, event);
180 if (!matched) {
181 return false;
182 }
183 const LogEvent& eventRef = transformedEvent == nullptr ? event : *transformedEvent;
184
185 // Cache atom event in mProtoOut.
186 uint64_t atomToken = mProtoOut.start(util::FIELD_TYPE_MESSAGE | util::FIELD_COUNT_REPEATED |
187 FIELD_ID_SHELL_DATA__ATOM);
188 eventRef.ToProto(mProtoOut);
189 mProtoOut.end(atomToken);
190
191 const int64_t timestampNs = truncateTimestampIfNecessary(eventRef);
192 mProtoOut.write(util::FIELD_TYPE_INT64 | util::FIELD_COUNT_REPEATED |
193 FIELD_ID_SHELL_DATA__ELAPSED_TIMESTAMP_NANOS,
194 static_cast<long long>(timestampNs));
195
196 // Update byte size of cached data.
197 mCacheSize += getSize(eventRef.getValues()) + sizeof(timestampNs);
198
199 return true;
200 }
201
202 // Called by ShellSubscriber when a pushed event occurs
onLogEvent(const LogEvent & event)203 void ShellSubscriberClient::onLogEvent(const LogEvent& event) {
204 for (const auto& matcher : mPushedMatchers) {
205 if (writeEventToProtoIfMatched(event, matcher, mUidMap)) {
206 flushProtoIfNeeded();
207 break;
208 }
209 }
210 }
211
flushProtoIfNeeded()212 void ShellSubscriberClient::flushProtoIfNeeded() {
213 if (mCallback == nullptr) { // Using file descriptor.
214 triggerFdFlush();
215 } else if (mCacheSize >= kMaxCacheSizeBytes) { // Using callback.
216 // Flush data if cache is full.
217 triggerCallback(StatsSubscriptionCallbackReason::STATSD_INITIATED);
218 }
219 }
220
pullIfNeeded(int64_t nowSecs,int64_t nowMillis,int64_t nowNanos)221 int64_t ShellSubscriberClient::pullIfNeeded(int64_t nowSecs, int64_t nowMillis, int64_t nowNanos) {
222 int64_t sleepTimeMs = 24 * 60 * 60 * 1000; // 24 hours.
223 for (PullInfo& pullInfo : mPulledInfo) {
224 if (pullInfo.mPrevPullElapsedRealtimeMs + pullInfo.mIntervalMs <= nowMillis) {
225 vector<int32_t> uids;
226 getUidsForPullAtom(&uids, pullInfo);
227
228 vector<shared_ptr<LogEvent>> data;
229 mPullerMgr->Pull(pullInfo.mPullerMatcher.atom_id(), uids, nowNanos, &data);
230 VLOG("ShellSubscriberClient: pulled %zu atoms with id %d", data.size(),
231 pullInfo.mPullerMatcher.atom_id());
232 if (mCallback != nullptr) { // Callback subscription
233 StatsdStats::getInstance().noteSubscriptionAtomPulled(
234 pullInfo.mPullerMatcher.atom_id());
235 }
236
237 writePulledAtomsLocked(data, pullInfo.mPullerMatcher);
238 pullInfo.mPrevPullElapsedRealtimeMs = nowMillis;
239 }
240
241 // Determine how long to sleep before doing more work.
242 const int64_t nextPullTimeMs = pullInfo.mPrevPullElapsedRealtimeMs + pullInfo.mIntervalMs;
243
244 const int64_t timeBeforePullMs =
245 nextPullTimeMs - nowMillis; // guaranteed to be non-negative
246 sleepTimeMs = min(sleepTimeMs, timeBeforePullMs);
247 }
248 return sleepTimeMs;
249 }
250
251 // The pullAndHeartbeat threads sleep for the minimum time
252 // among all clients' input
pullAndSendHeartbeatsIfNeeded(int64_t nowSecs,int64_t nowMillis,int64_t nowNanos)253 int64_t ShellSubscriberClient::pullAndSendHeartbeatsIfNeeded(int64_t nowSecs, int64_t nowMillis,
254 int64_t nowNanos) {
255 int64_t sleepTimeMs;
256 if (mCallback == nullptr) { // File descriptor subscription
257 if ((nowSecs - mStartTimeSec >= mTimeoutSec) && (mTimeoutSec > 0)) {
258 mClientAlive = false;
259 return kMsBetweenHeartbeats;
260 }
261
262 sleepTimeMs = min(kMsBetweenHeartbeats, pullIfNeeded(nowSecs, nowMillis, nowNanos));
263
264 // Send a heartbeat consisting of data size of 0, if
265 // the user hasn't recently received data from statsd. When it receives the data size of 0,
266 // the user will not expect any atoms and recheck whether the subscription should end.
267 if (nowMillis - mLastWriteMs >= kMsBetweenHeartbeats) {
268 triggerFdFlush();
269 if (!mClientAlive) return kMsBetweenHeartbeats;
270 }
271
272 int64_t timeBeforeHeartbeat = mLastWriteMs + kMsBetweenHeartbeats - nowMillis;
273 sleepTimeMs = min(sleepTimeMs, timeBeforeHeartbeat);
274 } else { // Callback subscription.
275 sleepTimeMs = min(kMsBetweenCallbacks, pullIfNeeded(nowSecs, nowMillis, nowNanos));
276
277 if (mCacheSize > 0 && nowMillis - mLastWriteMs >= kMsBetweenCallbacks) {
278 // Flush data if cache has kept data for longer than kMsBetweenCallbacks.
279 triggerCallback(StatsSubscriptionCallbackReason::STATSD_INITIATED);
280 }
281
282 // Cache should be flushed kMsBetweenCallbacks after mLastWrite.
283 const int64_t timeToCallbackMs = mLastWriteMs + kMsBetweenCallbacks - nowMillis;
284
285 // For callback subscriptions, ensure minimum sleep time is at least
286 // kMinCallbackSleepIntervalMs. Even if there is less than kMinCallbackSleepIntervalMs left
287 // before next pull time, sleep for at least kMinCallbackSleepIntervalMs. This has the
288 // effect of multiple pulled atoms that have a pull within kMinCallbackSleepIntervalMs from
289 // now to have their pulls batched together, mitigating frequent wakeups of the puller
290 // thread.
291 sleepTimeMs = max(kMinCallbackSleepIntervalMs, min(sleepTimeMs, timeToCallbackMs));
292 }
293 return sleepTimeMs;
294 }
295
writePulledAtomsLocked(const vector<shared_ptr<LogEvent>> & data,const SimpleAtomMatcher & matcher)296 void ShellSubscriberClient::writePulledAtomsLocked(const vector<shared_ptr<LogEvent>>& data,
297 const SimpleAtomMatcher& matcher) {
298 bool hasData = false;
299 for (const shared_ptr<LogEvent>& event : data) {
300 if (writeEventToProtoIfMatched(*event, matcher, mUidMap)) {
301 hasData = true;
302 }
303 }
304
305 if (hasData) {
306 flushProtoIfNeeded();
307 }
308 }
309
310 // Tries to write the atom encoded in mProtoOut to the pipe. If the write fails
311 // because the read end of the pipe has closed, change the client status so
312 // the manager knows the subscription is no longer active
attemptWriteToPipeLocked()313 void ShellSubscriberClient::attemptWriteToPipeLocked() {
314 const size_t dataSize = mProtoOut.size();
315 // First, write the payload size.
316 if (!android::base::WriteFully(mDupOut, &dataSize, sizeof(dataSize))) {
317 mClientAlive = false;
318 return;
319 }
320 // Then, write the payload if this is not just a heartbeat.
321 if (dataSize > 0 && !mProtoOut.flush(mDupOut.get())) {
322 mClientAlive = false;
323 return;
324 }
325 mLastWriteMs = getElapsedRealtimeMillis();
326 }
327
getUidsForPullAtom(vector<int32_t> * uids,const PullInfo & pullInfo)328 void ShellSubscriberClient::getUidsForPullAtom(vector<int32_t>* uids, const PullInfo& pullInfo) {
329 uids->insert(uids->end(), pullInfo.mPullUids.begin(), pullInfo.mPullUids.end());
330 // This is slow. Consider storing the uids per app and listening to uidmap updates.
331 for (const string& pkg : pullInfo.mPullPackages) {
332 set<int32_t> uidsForPkg = mUidMap->getAppUid(pkg);
333 uids->insert(uids->end(), uidsForPkg.begin(), uidsForPkg.end());
334 }
335 uids->push_back(DEFAULT_PULL_UID);
336 }
337
clearCache()338 void ShellSubscriberClient::clearCache() {
339 mProtoOut.clear();
340 mCacheSize = 0;
341 }
342
triggerFdFlush()343 void ShellSubscriberClient::triggerFdFlush() {
344 attemptWriteToPipeLocked();
345 clearCache();
346 }
347
triggerCallback(StatsSubscriptionCallbackReason reason)348 void ShellSubscriberClient::triggerCallback(StatsSubscriptionCallbackReason reason) {
349 // Invoke Binder callback with cached event data.
350 vector<uint8_t> payloadBytes;
351 mProtoOut.serializeToVector(&payloadBytes);
352 StatsdStats::getInstance().noteSubscriptionFlushed(mId);
353 const Status status = mCallback->onSubscriptionData(reason, payloadBytes);
354 if (status.getStatus() == STATUS_DEAD_OBJECT &&
355 status.getExceptionCode() == EX_TRANSACTION_FAILED) {
356 mClientAlive = false;
357 return;
358 }
359
360 mLastWriteMs = getElapsedRealtimeMillis();
361 clearCache();
362 }
363
flush()364 void ShellSubscriberClient::flush() {
365 triggerCallback(StatsSubscriptionCallbackReason::FLUSH_REQUESTED);
366 }
367
onUnsubscribe()368 void ShellSubscriberClient::onUnsubscribe() {
369 StatsdStats::getInstance().noteSubscriptionEnded(mId);
370 if (mClientAlive) {
371 triggerCallback(StatsSubscriptionCallbackReason::SUBSCRIPTION_ENDED);
372 }
373 }
374
addAllAtomIds(LogEventFilter::AtomIdSet & allAtomIds) const375 void ShellSubscriberClient::addAllAtomIds(LogEventFilter::AtomIdSet& allAtomIds) const {
376 for (const auto& matcher : mPushedMatchers) {
377 allAtomIds.insert(matcher.atom_id());
378 }
379 }
380
381 } // namespace statsd
382 } // namespace os
383 } // namespace android
384