/* * Copyright (C) 2018 The Android Open Source Project * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #define DEBUG false // STOPSHIP if true #include "Log.h" #include "ShellSubscriber.h" #include #include "matchers/matcher_util.h" #include "stats_log_util.h" using android::util::ProtoOutputStream; namespace android { namespace os { namespace statsd { const static int FIELD_ID_ATOM = 1; void ShellSubscriber::startNewSubscription(int in, int out, int timeoutSec) { int myToken = claimToken(); VLOG("ShellSubscriber: new subscription %d has come in", myToken); mSubscriptionShouldEnd.notify_one(); shared_ptr mySubscriptionInfo = make_shared(in, out); if (!readConfig(mySubscriptionInfo)) return; { std::unique_lock lock(mMutex); mSubscriptionInfo = mySubscriptionInfo; spawnHelperThread(myToken); waitForSubscriptionToEndLocked(mySubscriptionInfo, myToken, lock, timeoutSec); if (mSubscriptionInfo == mySubscriptionInfo) { mSubscriptionInfo = nullptr; } } } void ShellSubscriber::spawnHelperThread(int myToken) { std::thread t([this, myToken] { pullAndSendHeartbeats(myToken); }); t.detach(); } void ShellSubscriber::waitForSubscriptionToEndLocked(shared_ptr myInfo, int myToken, std::unique_lock& lock, int timeoutSec) { if (timeoutSec > 0) { mSubscriptionShouldEnd.wait_for(lock, timeoutSec * 1s, [this, myToken, &myInfo] { return mToken != myToken || !myInfo->mClientAlive; }); } else { mSubscriptionShouldEnd.wait(lock, [this, myToken, &myInfo] { return mToken != myToken || !myInfo->mClientAlive; }); } } // Atomically claim the next token. Token numbers denote subscriber ordering. int ShellSubscriber::claimToken() { std::unique_lock lock(mMutex); int myToken = ++mToken; return myToken; } // Read and parse single config. There should only one config per input. bool ShellSubscriber::readConfig(shared_ptr subscriptionInfo) { // Read the size of the config. size_t bufferSize; if (!android::base::ReadFully(subscriptionInfo->mInputFd, &bufferSize, sizeof(bufferSize))) { return false; } // Read the config. vector buffer(bufferSize); if (!android::base::ReadFully(subscriptionInfo->mInputFd, buffer.data(), bufferSize)) { return false; } // Parse the config. ShellSubscription config; if (!config.ParseFromArray(buffer.data(), bufferSize)) { return false; } // Update SubscriptionInfo with state from config for (const auto& pushed : config.pushed()) { subscriptionInfo->mPushedMatchers.push_back(pushed); } for (const auto& pulled : config.pulled()) { vector packages; vector uids; for (const string& pkg : pulled.packages()) { auto it = UidMap::sAidToUidMapping.find(pkg); if (it != UidMap::sAidToUidMapping.end()) { uids.push_back(it->second); } else { packages.push_back(pkg); } } subscriptionInfo->mPulledInfo.emplace_back(pulled.matcher(), pulled.freq_millis(), packages, uids); VLOG("adding matcher for pulled atom %d", pulled.matcher().atom_id()); } return true; } void ShellSubscriber::pullAndSendHeartbeats(int myToken) { VLOG("ShellSubscriber: helper thread %d starting", myToken); while (true) { int64_t sleepTimeMs = INT_MAX; { std::lock_guard lock(mMutex); if (!mSubscriptionInfo || mToken != myToken) { VLOG("ShellSubscriber: helper thread %d done!", myToken); return; } int64_t nowMillis = getElapsedRealtimeMillis(); int64_t nowNanos = getElapsedRealtimeNs(); for (PullInfo& pullInfo : mSubscriptionInfo->mPulledInfo) { if (pullInfo.mPrevPullElapsedRealtimeMs + pullInfo.mInterval >= nowMillis) { continue; } vector uids; getUidsForPullAtom(&uids, pullInfo); vector> data; mPullerMgr->Pull(pullInfo.mPullerMatcher.atom_id(), uids, nowNanos, &data); VLOG("Pulled %zu atoms with id %d", data.size(), pullInfo.mPullerMatcher.atom_id()); writePulledAtomsLocked(data, pullInfo.mPullerMatcher); pullInfo.mPrevPullElapsedRealtimeMs = nowMillis; } // Send a heartbeat, consisting of a data size of 0, if perfd hasn't recently received // data from statsd. When it receives the data size of 0, perfd will not expect any // atoms and recheck whether the subscription should end. if (nowMillis - mLastWriteMs > kMsBetweenHeartbeats) { attemptWriteToPipeLocked(/*dataSize=*/0); } // Determine how long to sleep before doing more work. for (PullInfo& pullInfo : mSubscriptionInfo->mPulledInfo) { int64_t nextPullTime = pullInfo.mPrevPullElapsedRealtimeMs + pullInfo.mInterval; int64_t timeBeforePull = nextPullTime - nowMillis; // guaranteed to be non-negative if (timeBeforePull < sleepTimeMs) sleepTimeMs = timeBeforePull; } int64_t timeBeforeHeartbeat = (mLastWriteMs + kMsBetweenHeartbeats) - nowMillis; if (timeBeforeHeartbeat < sleepTimeMs) sleepTimeMs = timeBeforeHeartbeat; } VLOG("ShellSubscriber: helper thread %d sleeping for %lld ms", myToken, (long long)sleepTimeMs); std::this_thread::sleep_for(std::chrono::milliseconds(sleepTimeMs)); } } void ShellSubscriber::getUidsForPullAtom(vector* uids, const PullInfo& pullInfo) { uids->insert(uids->end(), pullInfo.mPullUids.begin(), pullInfo.mPullUids.end()); // This is slow. Consider storing the uids per app and listening to uidmap updates. for (const string& pkg : pullInfo.mPullPackages) { set uidsForPkg = mUidMap->getAppUid(pkg); uids->insert(uids->end(), uidsForPkg.begin(), uidsForPkg.end()); } uids->push_back(DEFAULT_PULL_UID); } void ShellSubscriber::writePulledAtomsLocked(const vector>& data, const SimpleAtomMatcher& matcher) { mProto.clear(); int count = 0; for (const auto& event : data) { if (matchesSimple(*mUidMap, matcher, *event)) { count++; uint64_t atomToken = mProto.start(util::FIELD_TYPE_MESSAGE | util::FIELD_COUNT_REPEATED | FIELD_ID_ATOM); event->ToProto(mProto); mProto.end(atomToken); } } if (count > 0) attemptWriteToPipeLocked(mProto.size()); } void ShellSubscriber::onLogEvent(const LogEvent& event) { std::lock_guard lock(mMutex); if (!mSubscriptionInfo) return; mProto.clear(); for (const auto& matcher : mSubscriptionInfo->mPushedMatchers) { if (matchesSimple(*mUidMap, matcher, event)) { uint64_t atomToken = mProto.start(util::FIELD_TYPE_MESSAGE | util::FIELD_COUNT_REPEATED | FIELD_ID_ATOM); event.ToProto(mProto); mProto.end(atomToken); attemptWriteToPipeLocked(mProto.size()); } } } // Tries to write the atom encoded in mProto to the pipe. If the write fails // because the read end of the pipe has closed, signals to other threads that // the subscription should end. void ShellSubscriber::attemptWriteToPipeLocked(size_t dataSize) { // First, write the payload size. if (!android::base::WriteFully(mSubscriptionInfo->mOutputFd, &dataSize, sizeof(dataSize))) { mSubscriptionInfo->mClientAlive = false; mSubscriptionShouldEnd.notify_one(); return; } // Then, write the payload if this is not just a heartbeat. if (dataSize > 0 && !mProto.flush(mSubscriptionInfo->mOutputFd)) { mSubscriptionInfo->mClientAlive = false; mSubscriptionShouldEnd.notify_one(); return; } mLastWriteMs = getElapsedRealtimeMillis(); } } // namespace statsd } // namespace os } // namespace android