1 /*
2 * Copyright (C) 2018 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16 #define DEBUG false // STOPSHIP if true
17 #include "Log.h"
18
19 #include "ShellSubscriber.h"
20
21 #include <android-base/file.h>
22
23 #include "matchers/matcher_util.h"
24 #include "stats_log_util.h"
25
26 using android::util::ProtoOutputStream;
27
28 namespace android {
29 namespace os {
30 namespace statsd {
31
// Field id combined with the type/count flags when an atom message is started
// on the output proto.
static constexpr int FIELD_ID_ATOM = 1;
33
startNewSubscription(int in,int out,int timeoutSec)34 void ShellSubscriber::startNewSubscription(int in, int out, int timeoutSec) {
35 int myToken = claimToken();
36 VLOG("ShellSubscriber: new subscription %d has come in", myToken);
37 mSubscriptionShouldEnd.notify_one();
38
39 shared_ptr<SubscriptionInfo> mySubscriptionInfo = make_shared<SubscriptionInfo>(in, out);
40 if (!readConfig(mySubscriptionInfo)) return;
41
42 {
43 std::unique_lock<std::mutex> lock(mMutex);
44 mSubscriptionInfo = mySubscriptionInfo;
45 spawnHelperThread(myToken);
46 waitForSubscriptionToEndLocked(mySubscriptionInfo, myToken, lock, timeoutSec);
47
48 if (mSubscriptionInfo == mySubscriptionInfo) {
49 mSubscriptionInfo = nullptr;
50 }
51
52 }
53 }
54
spawnHelperThread(int myToken)55 void ShellSubscriber::spawnHelperThread(int myToken) {
56 std::thread t([this, myToken] { pullAndSendHeartbeats(myToken); });
57 t.detach();
58 }
59
waitForSubscriptionToEndLocked(shared_ptr<SubscriptionInfo> myInfo,int myToken,std::unique_lock<std::mutex> & lock,int timeoutSec)60 void ShellSubscriber::waitForSubscriptionToEndLocked(shared_ptr<SubscriptionInfo> myInfo,
61 int myToken,
62 std::unique_lock<std::mutex>& lock,
63 int timeoutSec) {
64 if (timeoutSec > 0) {
65 mSubscriptionShouldEnd.wait_for(lock, timeoutSec * 1s, [this, myToken, &myInfo] {
66 return mToken != myToken || !myInfo->mClientAlive;
67 });
68 } else {
69 mSubscriptionShouldEnd.wait(lock, [this, myToken, &myInfo] {
70 return mToken != myToken || !myInfo->mClientAlive;
71 });
72 }
73 }
74
75 // Atomically claim the next token. Token numbers denote subscriber ordering.
claimToken()76 int ShellSubscriber::claimToken() {
77 std::unique_lock<std::mutex> lock(mMutex);
78 int myToken = ++mToken;
79 return myToken;
80 }
81
82 // Read and parse single config. There should only one config per input.
readConfig(shared_ptr<SubscriptionInfo> subscriptionInfo)83 bool ShellSubscriber::readConfig(shared_ptr<SubscriptionInfo> subscriptionInfo) {
84 // Read the size of the config.
85 size_t bufferSize;
86 if (!android::base::ReadFully(subscriptionInfo->mInputFd, &bufferSize, sizeof(bufferSize))) {
87 return false;
88 }
89
90 // Read the config.
91 vector<uint8_t> buffer(bufferSize);
92 if (!android::base::ReadFully(subscriptionInfo->mInputFd, buffer.data(), bufferSize)) {
93 return false;
94 }
95
96 // Parse the config.
97 ShellSubscription config;
98 if (!config.ParseFromArray(buffer.data(), bufferSize)) {
99 return false;
100 }
101
102 // Update SubscriptionInfo with state from config
103 for (const auto& pushed : config.pushed()) {
104 subscriptionInfo->mPushedMatchers.push_back(pushed);
105 }
106
107 for (const auto& pulled : config.pulled()) {
108 vector<string> packages;
109 vector<int32_t> uids;
110 for (const string& pkg : pulled.packages()) {
111 auto it = UidMap::sAidToUidMapping.find(pkg);
112 if (it != UidMap::sAidToUidMapping.end()) {
113 uids.push_back(it->second);
114 } else {
115 packages.push_back(pkg);
116 }
117 }
118
119 subscriptionInfo->mPulledInfo.emplace_back(pulled.matcher(), pulled.freq_millis(), packages,
120 uids);
121 VLOG("adding matcher for pulled atom %d", pulled.matcher().atom_id());
122 }
123
124 return true;
125 }
126
pullAndSendHeartbeats(int myToken)127 void ShellSubscriber::pullAndSendHeartbeats(int myToken) {
128 VLOG("ShellSubscriber: helper thread %d starting", myToken);
129 while (true) {
130 int64_t sleepTimeMs = INT_MAX;
131 {
132 std::lock_guard<std::mutex> lock(mMutex);
133 if (!mSubscriptionInfo || mToken != myToken) {
134 VLOG("ShellSubscriber: helper thread %d done!", myToken);
135 return;
136 }
137
138 int64_t nowMillis = getElapsedRealtimeMillis();
139 int64_t nowNanos = getElapsedRealtimeNs();
140 for (PullInfo& pullInfo : mSubscriptionInfo->mPulledInfo) {
141 if (pullInfo.mPrevPullElapsedRealtimeMs + pullInfo.mInterval >= nowMillis) {
142 continue;
143 }
144
145 vector<int32_t> uids;
146 getUidsForPullAtom(&uids, pullInfo);
147
148 vector<std::shared_ptr<LogEvent>> data;
149 mPullerMgr->Pull(pullInfo.mPullerMatcher.atom_id(), uids, nowNanos, &data);
150 VLOG("Pulled %zu atoms with id %d", data.size(), pullInfo.mPullerMatcher.atom_id());
151 writePulledAtomsLocked(data, pullInfo.mPullerMatcher);
152
153 pullInfo.mPrevPullElapsedRealtimeMs = nowMillis;
154 }
155
156 // Send a heartbeat, consisting of a data size of 0, if perfd hasn't recently received
157 // data from statsd. When it receives the data size of 0, perfd will not expect any
158 // atoms and recheck whether the subscription should end.
159 if (nowMillis - mLastWriteMs > kMsBetweenHeartbeats) {
160 attemptWriteToPipeLocked(/*dataSize=*/0);
161 }
162
163 // Determine how long to sleep before doing more work.
164 for (PullInfo& pullInfo : mSubscriptionInfo->mPulledInfo) {
165 int64_t nextPullTime = pullInfo.mPrevPullElapsedRealtimeMs + pullInfo.mInterval;
166 int64_t timeBeforePull = nextPullTime - nowMillis; // guaranteed to be non-negative
167 if (timeBeforePull < sleepTimeMs) sleepTimeMs = timeBeforePull;
168 }
169 int64_t timeBeforeHeartbeat = (mLastWriteMs + kMsBetweenHeartbeats) - nowMillis;
170 if (timeBeforeHeartbeat < sleepTimeMs) sleepTimeMs = timeBeforeHeartbeat;
171 }
172
173 VLOG("ShellSubscriber: helper thread %d sleeping for %lld ms", myToken,
174 (long long)sleepTimeMs);
175 std::this_thread::sleep_for(std::chrono::milliseconds(sleepTimeMs));
176 }
177 }
178
getUidsForPullAtom(vector<int32_t> * uids,const PullInfo & pullInfo)179 void ShellSubscriber::getUidsForPullAtom(vector<int32_t>* uids, const PullInfo& pullInfo) {
180 uids->insert(uids->end(), pullInfo.mPullUids.begin(), pullInfo.mPullUids.end());
181 // This is slow. Consider storing the uids per app and listening to uidmap updates.
182 for (const string& pkg : pullInfo.mPullPackages) {
183 set<int32_t> uidsForPkg = mUidMap->getAppUid(pkg);
184 uids->insert(uids->end(), uidsForPkg.begin(), uidsForPkg.end());
185 }
186 uids->push_back(DEFAULT_PULL_UID);
187 }
188
writePulledAtomsLocked(const vector<std::shared_ptr<LogEvent>> & data,const SimpleAtomMatcher & matcher)189 void ShellSubscriber::writePulledAtomsLocked(const vector<std::shared_ptr<LogEvent>>& data,
190 const SimpleAtomMatcher& matcher) {
191 mProto.clear();
192 int count = 0;
193 for (const auto& event : data) {
194 if (matchesSimple(*mUidMap, matcher, *event)) {
195 count++;
196 uint64_t atomToken = mProto.start(util::FIELD_TYPE_MESSAGE |
197 util::FIELD_COUNT_REPEATED | FIELD_ID_ATOM);
198 event->ToProto(mProto);
199 mProto.end(atomToken);
200 }
201 }
202
203 if (count > 0) attemptWriteToPipeLocked(mProto.size());
204 }
205
onLogEvent(const LogEvent & event)206 void ShellSubscriber::onLogEvent(const LogEvent& event) {
207 std::lock_guard<std::mutex> lock(mMutex);
208 if (!mSubscriptionInfo) return;
209
210 mProto.clear();
211 for (const auto& matcher : mSubscriptionInfo->mPushedMatchers) {
212 if (matchesSimple(*mUidMap, matcher, event)) {
213 uint64_t atomToken = mProto.start(util::FIELD_TYPE_MESSAGE |
214 util::FIELD_COUNT_REPEATED | FIELD_ID_ATOM);
215 event.ToProto(mProto);
216 mProto.end(atomToken);
217 attemptWriteToPipeLocked(mProto.size());
218 }
219 }
220 }
221
222 // Tries to write the atom encoded in mProto to the pipe. If the write fails
223 // because the read end of the pipe has closed, signals to other threads that
224 // the subscription should end.
attemptWriteToPipeLocked(size_t dataSize)225 void ShellSubscriber::attemptWriteToPipeLocked(size_t dataSize) {
226 // First, write the payload size.
227 if (!android::base::WriteFully(mSubscriptionInfo->mOutputFd, &dataSize, sizeof(dataSize))) {
228 mSubscriptionInfo->mClientAlive = false;
229 mSubscriptionShouldEnd.notify_one();
230 return;
231 }
232
233 // Then, write the payload if this is not just a heartbeat.
234 if (dataSize > 0 && !mProto.flush(mSubscriptionInfo->mOutputFd)) {
235 mSubscriptionInfo->mClientAlive = false;
236 mSubscriptionShouldEnd.notify_one();
237 return;
238 }
239
240 mLastWriteMs = getElapsedRealtimeMillis();
241 }
242
243 } // namespace statsd
244 } // namespace os
245 } // namespace android
246