1 /*
2  * Copyright (C) 2018 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #define DEBUG false  // STOPSHIP if true
17 #include "Log.h"
18 
19 #include "ShellSubscriber.h"
20 
21 #include <android-base/file.h>
22 
23 #include "matchers/matcher_util.h"
24 #include "stats_log_util.h"
25 
26 using android::util::ProtoOutputStream;
27 
28 namespace android {
29 namespace os {
30 namespace statsd {
31 
// Field id used for every atom written to the subscriber: it is OR-ed into the
// field descriptor passed to mProto.start() together with FIELD_TYPE_MESSAGE
// and FIELD_COUNT_REPEATED.
static constexpr int FIELD_ID_ATOM = 1;
33 
startNewSubscription(int in,int out,int timeoutSec)34 void ShellSubscriber::startNewSubscription(int in, int out, int timeoutSec) {
35     int myToken = claimToken();
36     VLOG("ShellSubscriber: new subscription %d has come in", myToken);
37     mSubscriptionShouldEnd.notify_one();
38 
39     shared_ptr<SubscriptionInfo> mySubscriptionInfo = make_shared<SubscriptionInfo>(in, out);
40     if (!readConfig(mySubscriptionInfo)) return;
41 
42     {
43         std::unique_lock<std::mutex> lock(mMutex);
44         mSubscriptionInfo = mySubscriptionInfo;
45         spawnHelperThread(myToken);
46         waitForSubscriptionToEndLocked(mySubscriptionInfo, myToken, lock, timeoutSec);
47 
48         if (mSubscriptionInfo == mySubscriptionInfo) {
49             mSubscriptionInfo = nullptr;
50         }
51 
52     }
53 }
54 
spawnHelperThread(int myToken)55 void ShellSubscriber::spawnHelperThread(int myToken) {
56     std::thread t([this, myToken] { pullAndSendHeartbeats(myToken); });
57     t.detach();
58 }
59 
waitForSubscriptionToEndLocked(shared_ptr<SubscriptionInfo> myInfo,int myToken,std::unique_lock<std::mutex> & lock,int timeoutSec)60 void ShellSubscriber::waitForSubscriptionToEndLocked(shared_ptr<SubscriptionInfo> myInfo,
61                                                      int myToken,
62                                                      std::unique_lock<std::mutex>& lock,
63                                                      int timeoutSec) {
64     if (timeoutSec > 0) {
65         mSubscriptionShouldEnd.wait_for(lock, timeoutSec * 1s, [this, myToken, &myInfo] {
66             return mToken != myToken || !myInfo->mClientAlive;
67         });
68     } else {
69         mSubscriptionShouldEnd.wait(lock, [this, myToken, &myInfo] {
70             return mToken != myToken || !myInfo->mClientAlive;
71         });
72     }
73 }
74 
75 // Atomically claim the next token. Token numbers denote subscriber ordering.
claimToken()76 int ShellSubscriber::claimToken() {
77     std::unique_lock<std::mutex> lock(mMutex);
78     int myToken = ++mToken;
79     return myToken;
80 }
81 
82 // Read and parse single config. There should only one config per input.
readConfig(shared_ptr<SubscriptionInfo> subscriptionInfo)83 bool ShellSubscriber::readConfig(shared_ptr<SubscriptionInfo> subscriptionInfo) {
84     // Read the size of the config.
85     size_t bufferSize;
86     if (!android::base::ReadFully(subscriptionInfo->mInputFd, &bufferSize, sizeof(bufferSize))) {
87         return false;
88     }
89 
90     // Read the config.
91     vector<uint8_t> buffer(bufferSize);
92     if (!android::base::ReadFully(subscriptionInfo->mInputFd, buffer.data(), bufferSize)) {
93         return false;
94     }
95 
96     // Parse the config.
97     ShellSubscription config;
98     if (!config.ParseFromArray(buffer.data(), bufferSize)) {
99         return false;
100     }
101 
102     // Update SubscriptionInfo with state from config
103     for (const auto& pushed : config.pushed()) {
104         subscriptionInfo->mPushedMatchers.push_back(pushed);
105     }
106 
107     for (const auto& pulled : config.pulled()) {
108         vector<string> packages;
109         vector<int32_t> uids;
110         for (const string& pkg : pulled.packages()) {
111             auto it = UidMap::sAidToUidMapping.find(pkg);
112             if (it != UidMap::sAidToUidMapping.end()) {
113                 uids.push_back(it->second);
114             } else {
115                 packages.push_back(pkg);
116             }
117         }
118 
119         subscriptionInfo->mPulledInfo.emplace_back(pulled.matcher(), pulled.freq_millis(), packages,
120                                                    uids);
121         VLOG("adding matcher for pulled atom %d", pulled.matcher().atom_id());
122     }
123 
124     return true;
125 }
126 
pullAndSendHeartbeats(int myToken)127 void ShellSubscriber::pullAndSendHeartbeats(int myToken) {
128     VLOG("ShellSubscriber: helper thread %d starting", myToken);
129     while (true) {
130         int64_t sleepTimeMs = INT_MAX;
131         {
132             std::lock_guard<std::mutex> lock(mMutex);
133             if (!mSubscriptionInfo || mToken != myToken) {
134                 VLOG("ShellSubscriber: helper thread %d done!", myToken);
135                 return;
136             }
137 
138             int64_t nowMillis = getElapsedRealtimeMillis();
139             int64_t nowNanos = getElapsedRealtimeNs();
140             for (PullInfo& pullInfo : mSubscriptionInfo->mPulledInfo) {
141                 if (pullInfo.mPrevPullElapsedRealtimeMs + pullInfo.mInterval >= nowMillis) {
142                     continue;
143                 }
144 
145                 vector<int32_t> uids;
146                 getUidsForPullAtom(&uids, pullInfo);
147 
148                 vector<std::shared_ptr<LogEvent>> data;
149                 mPullerMgr->Pull(pullInfo.mPullerMatcher.atom_id(), uids, nowNanos, &data);
150                 VLOG("Pulled %zu atoms with id %d", data.size(), pullInfo.mPullerMatcher.atom_id());
151                 writePulledAtomsLocked(data, pullInfo.mPullerMatcher);
152 
153                 pullInfo.mPrevPullElapsedRealtimeMs = nowMillis;
154             }
155 
156             // Send a heartbeat, consisting of a data size of 0, if perfd hasn't recently received
157             // data from statsd. When it receives the data size of 0, perfd will not expect any
158             // atoms and recheck whether the subscription should end.
159             if (nowMillis - mLastWriteMs > kMsBetweenHeartbeats) {
160                 attemptWriteToPipeLocked(/*dataSize=*/0);
161             }
162 
163             // Determine how long to sleep before doing more work.
164             for (PullInfo& pullInfo : mSubscriptionInfo->mPulledInfo) {
165                 int64_t nextPullTime = pullInfo.mPrevPullElapsedRealtimeMs + pullInfo.mInterval;
166                 int64_t timeBeforePull = nextPullTime - nowMillis; // guaranteed to be non-negative
167                 if (timeBeforePull < sleepTimeMs) sleepTimeMs = timeBeforePull;
168             }
169             int64_t timeBeforeHeartbeat = (mLastWriteMs + kMsBetweenHeartbeats) - nowMillis;
170             if (timeBeforeHeartbeat < sleepTimeMs) sleepTimeMs = timeBeforeHeartbeat;
171         }
172 
173         VLOG("ShellSubscriber: helper thread %d sleeping for %lld ms", myToken,
174              (long long)sleepTimeMs);
175         std::this_thread::sleep_for(std::chrono::milliseconds(sleepTimeMs));
176     }
177 }
178 
getUidsForPullAtom(vector<int32_t> * uids,const PullInfo & pullInfo)179 void ShellSubscriber::getUidsForPullAtom(vector<int32_t>* uids, const PullInfo& pullInfo) {
180     uids->insert(uids->end(), pullInfo.mPullUids.begin(), pullInfo.mPullUids.end());
181     // This is slow. Consider storing the uids per app and listening to uidmap updates.
182     for (const string& pkg : pullInfo.mPullPackages) {
183         set<int32_t> uidsForPkg = mUidMap->getAppUid(pkg);
184         uids->insert(uids->end(), uidsForPkg.begin(), uidsForPkg.end());
185     }
186     uids->push_back(DEFAULT_PULL_UID);
187 }
188 
writePulledAtomsLocked(const vector<std::shared_ptr<LogEvent>> & data,const SimpleAtomMatcher & matcher)189 void ShellSubscriber::writePulledAtomsLocked(const vector<std::shared_ptr<LogEvent>>& data,
190                                              const SimpleAtomMatcher& matcher) {
191     mProto.clear();
192     int count = 0;
193     for (const auto& event : data) {
194         if (matchesSimple(*mUidMap, matcher, *event)) {
195             count++;
196             uint64_t atomToken = mProto.start(util::FIELD_TYPE_MESSAGE |
197                                               util::FIELD_COUNT_REPEATED | FIELD_ID_ATOM);
198             event->ToProto(mProto);
199             mProto.end(atomToken);
200         }
201     }
202 
203     if (count > 0) attemptWriteToPipeLocked(mProto.size());
204 }
205 
onLogEvent(const LogEvent & event)206 void ShellSubscriber::onLogEvent(const LogEvent& event) {
207     std::lock_guard<std::mutex> lock(mMutex);
208     if (!mSubscriptionInfo) return;
209 
210     mProto.clear();
211     for (const auto& matcher : mSubscriptionInfo->mPushedMatchers) {
212         if (matchesSimple(*mUidMap, matcher, event)) {
213             uint64_t atomToken = mProto.start(util::FIELD_TYPE_MESSAGE |
214                                               util::FIELD_COUNT_REPEATED | FIELD_ID_ATOM);
215             event.ToProto(mProto);
216             mProto.end(atomToken);
217             attemptWriteToPipeLocked(mProto.size());
218         }
219     }
220 }
221 
222 // Tries to write the atom encoded in mProto to the pipe. If the write fails
223 // because the read end of the pipe has closed, signals to other threads that
224 // the subscription should end.
attemptWriteToPipeLocked(size_t dataSize)225 void ShellSubscriber::attemptWriteToPipeLocked(size_t dataSize) {
226     // First, write the payload size.
227     if (!android::base::WriteFully(mSubscriptionInfo->mOutputFd, &dataSize, sizeof(dataSize))) {
228         mSubscriptionInfo->mClientAlive = false;
229         mSubscriptionShouldEnd.notify_one();
230         return;
231     }
232 
233     // Then, write the payload if this is not just a heartbeat.
234     if (dataSize > 0 && !mProto.flush(mSubscriptionInfo->mOutputFd)) {
235         mSubscriptionInfo->mClientAlive = false;
236         mSubscriptionShouldEnd.notify_one();
237         return;
238     }
239 
240     mLastWriteMs = getElapsedRealtimeMillis();
241 }
242 
243 }  // namespace statsd
244 }  // namespace os
245 }  // namespace android
246