/* * Copyright (C) 2022 The Android Open Source Project * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #define STATSD_DEBUG false // STOPSHIP if true #include "Log.h" #include "ShellSubscriberClient.h" #include "FieldValue.h" #include "guardrail/StatsdStats.h" #include "matchers/matcher_util.h" #include "stats_log_util.h" using android::base::unique_fd; using Status = ::ndk::ScopedAStatus; namespace android { namespace os { namespace statsd { const static int FIELD_ID_SHELL_DATA__ATOM = 1; const static int FIELD_ID_SHELL_DATA__ELAPSED_TIMESTAMP_NANOS = 2; // Store next subscription ID for StatsdStats. // Not thread-safe; should only be accessed while holding ShellSubscriber::mMutex lock. static int nextSubId = 0; struct ReadConfigResult { vector pushedMatchers; vector pullInfo; }; // Read and parse single config. There should only one config in the input. static optional readConfig(const vector& configBytes, int64_t startTimeMs, int64_t minPullIntervalMs) { // Parse the config. ShellSubscription config; if (!config.ParseFromArray(configBytes.data(), configBytes.size())) { ALOGE("ShellSubscriberClient: failed to parse the config"); return nullopt; } ReadConfigResult result; result.pushedMatchers.assign(config.pushed().begin(), config.pushed().end()); vector pullInfo; for (const auto& pulled : config.pulled()) { vector packages; vector uids; for (const string& pkg : pulled.packages()) { auto it = UidMap::sAidToUidMapping.find(pkg); if (it != UidMap::sAidToUidMapping.end()) { uids.push_back(it->second); } else { packages.push_back(pkg); } } const int64_t pullIntervalMs = max(pulled.freq_millis(), minPullIntervalMs); result.pullInfo.emplace_back(pulled.matcher(), startTimeMs, pullIntervalMs, packages, uids); ALOGD("ShellSubscriberClient: adding matcher for pulled atom %d", pulled.matcher().atom_id()); } return result; } ShellSubscriberClient::PullInfo::PullInfo(const SimpleAtomMatcher& matcher, int64_t startTimeMs, int64_t intervalMs, const std::vector& packages, const std::vector& uids) : mPullerMatcher(matcher), mIntervalMs(intervalMs), mPrevPullElapsedRealtimeMs(startTimeMs), mPullPackages(packages), mPullUids(uids) { } ShellSubscriberClient::ShellSubscriberClient( int id, int out, const std::shared_ptr& callback, const std::vector& pushedMatchers, const std::vector& pulledInfo, int64_t timeoutSec, int64_t startTimeSec, const sp& uidMap, const sp& pullerMgr) : mId(id), mUidMap(uidMap), mPullerMgr(pullerMgr), mDupOut(fcntl(out, F_DUPFD_CLOEXEC, 0)), mPushedMatchers(pushedMatchers), mPulledInfo(pulledInfo), mCallback(callback), mTimeoutSec(timeoutSec), mStartTimeSec(startTimeSec), mLastWriteMs(startTimeSec * 1000), mCacheSize(0){}; unique_ptr ShellSubscriberClient::create( int in, int out, int64_t timeoutSec, int64_t startTimeSec, const sp& uidMap, const sp& pullerMgr) { // Read the size of the config. size_t bufferSize; if (!android::base::ReadFully(in, &bufferSize, sizeof(bufferSize))) { return nullptr; } // Check bufferSize if (bufferSize > (kMaxSizeKb * 1024)) { ALOGE("ShellSubscriberClient: received config (%zu bytes) is larger than the max size (%zu " "bytes)", bufferSize, (kMaxSizeKb * 1024)); return nullptr; } // Read the config. vector buffer(bufferSize); if (!android::base::ReadFully(in, buffer.data(), bufferSize)) { ALOGE("ShellSubscriberClient: failed to read the config from file descriptor"); return nullptr; } const optional readConfigResult = readConfig(buffer, startTimeSec * 1000, /* minPullIntervalMs */ 0); if (!readConfigResult.has_value()) { return nullptr; } return make_unique( nextSubId++, out, /*callback=*/nullptr, readConfigResult->pushedMatchers, readConfigResult->pullInfo, timeoutSec, startTimeSec, uidMap, pullerMgr); } unique_ptr ShellSubscriberClient::create( const vector& subscriptionConfig, const shared_ptr& callback, int64_t startTimeSec, const sp& uidMap, const sp& pullerMgr) { if (callback == nullptr) { ALOGE("ShellSubscriberClient: received nullptr callback"); return nullptr; } if (subscriptionConfig.size() > (kMaxSizeKb * 1024)) { ALOGE("ShellSubscriberClient: received config (%zu bytes) is larger than the max size (%zu " "bytes)", subscriptionConfig.size(), (kMaxSizeKb * 1024)); return nullptr; } const optional readConfigResult = readConfig(subscriptionConfig, startTimeSec * 1000, ShellSubscriberClient::kMinCallbackPullIntervalMs); if (!readConfigResult.has_value()) { return nullptr; } const int id = nextSubId++; StatsdStats::getInstance().noteSubscriptionStarted(id, readConfigResult->pushedMatchers.size(), readConfigResult->pullInfo.size()); return make_unique( id, /*out=*/-1, callback, readConfigResult->pushedMatchers, readConfigResult->pullInfo, /*timeoutSec=*/-1, startTimeSec, uidMap, pullerMgr); } bool ShellSubscriberClient::writeEventToProtoIfMatched(const LogEvent& event, const SimpleAtomMatcher& matcher, const sp& uidMap) { auto [matched, transformedEvent] = matchesSimple(mUidMap, matcher, event); if (!matched) { return false; } const LogEvent& eventRef = transformedEvent == nullptr ? event : *transformedEvent; // Cache atom event in mProtoOut. uint64_t atomToken = mProtoOut.start(util::FIELD_TYPE_MESSAGE | util::FIELD_COUNT_REPEATED | FIELD_ID_SHELL_DATA__ATOM); eventRef.ToProto(mProtoOut); mProtoOut.end(atomToken); const int64_t timestampNs = truncateTimestampIfNecessary(eventRef); mProtoOut.write(util::FIELD_TYPE_INT64 | util::FIELD_COUNT_REPEATED | FIELD_ID_SHELL_DATA__ELAPSED_TIMESTAMP_NANOS, static_cast(timestampNs)); // Update byte size of cached data. mCacheSize += getSize(eventRef.getValues()) + sizeof(timestampNs); return true; } // Called by ShellSubscriber when a pushed event occurs void ShellSubscriberClient::onLogEvent(const LogEvent& event) { for (const auto& matcher : mPushedMatchers) { if (writeEventToProtoIfMatched(event, matcher, mUidMap)) { flushProtoIfNeeded(); break; } } } void ShellSubscriberClient::flushProtoIfNeeded() { if (mCallback == nullptr) { // Using file descriptor. triggerFdFlush(); } else if (mCacheSize >= kMaxCacheSizeBytes) { // Using callback. // Flush data if cache is full. triggerCallback(StatsSubscriptionCallbackReason::STATSD_INITIATED); } } int64_t ShellSubscriberClient::pullIfNeeded(int64_t nowSecs, int64_t nowMillis, int64_t nowNanos) { int64_t sleepTimeMs = 24 * 60 * 60 * 1000; // 24 hours. for (PullInfo& pullInfo : mPulledInfo) { if (pullInfo.mPrevPullElapsedRealtimeMs + pullInfo.mIntervalMs <= nowMillis) { vector uids; getUidsForPullAtom(&uids, pullInfo); vector> data; mPullerMgr->Pull(pullInfo.mPullerMatcher.atom_id(), uids, nowNanos, &data); VLOG("ShellSubscriberClient: pulled %zu atoms with id %d", data.size(), pullInfo.mPullerMatcher.atom_id()); if (mCallback != nullptr) { // Callback subscription StatsdStats::getInstance().noteSubscriptionAtomPulled( pullInfo.mPullerMatcher.atom_id()); } writePulledAtomsLocked(data, pullInfo.mPullerMatcher); pullInfo.mPrevPullElapsedRealtimeMs = nowMillis; } // Determine how long to sleep before doing more work. const int64_t nextPullTimeMs = pullInfo.mPrevPullElapsedRealtimeMs + pullInfo.mIntervalMs; const int64_t timeBeforePullMs = nextPullTimeMs - nowMillis; // guaranteed to be non-negative sleepTimeMs = min(sleepTimeMs, timeBeforePullMs); } return sleepTimeMs; } // The pullAndHeartbeat threads sleep for the minimum time // among all clients' input int64_t ShellSubscriberClient::pullAndSendHeartbeatsIfNeeded(int64_t nowSecs, int64_t nowMillis, int64_t nowNanos) { int64_t sleepTimeMs; if (mCallback == nullptr) { // File descriptor subscription if ((nowSecs - mStartTimeSec >= mTimeoutSec) && (mTimeoutSec > 0)) { mClientAlive = false; return kMsBetweenHeartbeats; } sleepTimeMs = min(kMsBetweenHeartbeats, pullIfNeeded(nowSecs, nowMillis, nowNanos)); // Send a heartbeat consisting of data size of 0, if // the user hasn't recently received data from statsd. When it receives the data size of 0, // the user will not expect any atoms and recheck whether the subscription should end. if (nowMillis - mLastWriteMs >= kMsBetweenHeartbeats) { triggerFdFlush(); if (!mClientAlive) return kMsBetweenHeartbeats; } int64_t timeBeforeHeartbeat = mLastWriteMs + kMsBetweenHeartbeats - nowMillis; sleepTimeMs = min(sleepTimeMs, timeBeforeHeartbeat); } else { // Callback subscription. sleepTimeMs = min(kMsBetweenCallbacks, pullIfNeeded(nowSecs, nowMillis, nowNanos)); if (mCacheSize > 0 && nowMillis - mLastWriteMs >= kMsBetweenCallbacks) { // Flush data if cache has kept data for longer than kMsBetweenCallbacks. triggerCallback(StatsSubscriptionCallbackReason::STATSD_INITIATED); } // Cache should be flushed kMsBetweenCallbacks after mLastWrite. const int64_t timeToCallbackMs = mLastWriteMs + kMsBetweenCallbacks - nowMillis; // For callback subscriptions, ensure minimum sleep time is at least // kMinCallbackSleepIntervalMs. Even if there is less than kMinCallbackSleepIntervalMs left // before next pull time, sleep for at least kMinCallbackSleepIntervalMs. This has the // effect of multiple pulled atoms that have a pull within kMinCallbackSleepIntervalMs from // now to have their pulls batched together, mitigating frequent wakeups of the puller // thread. sleepTimeMs = max(kMinCallbackSleepIntervalMs, min(sleepTimeMs, timeToCallbackMs)); } return sleepTimeMs; } void ShellSubscriberClient::writePulledAtomsLocked(const vector>& data, const SimpleAtomMatcher& matcher) { bool hasData = false; for (const shared_ptr& event : data) { if (writeEventToProtoIfMatched(*event, matcher, mUidMap)) { hasData = true; } } if (hasData) { flushProtoIfNeeded(); } } // Tries to write the atom encoded in mProtoOut to the pipe. If the write fails // because the read end of the pipe has closed, change the client status so // the manager knows the subscription is no longer active void ShellSubscriberClient::attemptWriteToPipeLocked() { const size_t dataSize = mProtoOut.size(); // First, write the payload size. if (!android::base::WriteFully(mDupOut, &dataSize, sizeof(dataSize))) { mClientAlive = false; return; } // Then, write the payload if this is not just a heartbeat. if (dataSize > 0 && !mProtoOut.flush(mDupOut.get())) { mClientAlive = false; return; } mLastWriteMs = getElapsedRealtimeMillis(); } void ShellSubscriberClient::getUidsForPullAtom(vector* uids, const PullInfo& pullInfo) { uids->insert(uids->end(), pullInfo.mPullUids.begin(), pullInfo.mPullUids.end()); // This is slow. Consider storing the uids per app and listening to uidmap updates. for (const string& pkg : pullInfo.mPullPackages) { set uidsForPkg = mUidMap->getAppUid(pkg); uids->insert(uids->end(), uidsForPkg.begin(), uidsForPkg.end()); } uids->push_back(DEFAULT_PULL_UID); } void ShellSubscriberClient::clearCache() { mProtoOut.clear(); mCacheSize = 0; } void ShellSubscriberClient::triggerFdFlush() { attemptWriteToPipeLocked(); clearCache(); } void ShellSubscriberClient::triggerCallback(StatsSubscriptionCallbackReason reason) { // Invoke Binder callback with cached event data. vector payloadBytes; mProtoOut.serializeToVector(&payloadBytes); StatsdStats::getInstance().noteSubscriptionFlushed(mId); const Status status = mCallback->onSubscriptionData(reason, payloadBytes); if (status.getStatus() == STATUS_DEAD_OBJECT && status.getExceptionCode() == EX_TRANSACTION_FAILED) { mClientAlive = false; return; } mLastWriteMs = getElapsedRealtimeMillis(); clearCache(); } void ShellSubscriberClient::flush() { triggerCallback(StatsSubscriptionCallbackReason::FLUSH_REQUESTED); } void ShellSubscriberClient::onUnsubscribe() { StatsdStats::getInstance().noteSubscriptionEnded(mId); if (mClientAlive) { triggerCallback(StatsSubscriptionCallbackReason::SUBSCRIPTION_ENDED); } } void ShellSubscriberClient::addAllAtomIds(LogEventFilter::AtomIdSet& allAtomIds) const { for (const auto& matcher : mPushedMatchers) { allAtomIds.insert(matcher.atom_id()); } } } // namespace statsd } // namespace os } // namespace android