1 /*
2  * Copyright (C) 2018 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #define DEBUG false  // STOPSHIP if true
17 #include "Log.h"
18 
19 #include "ShellSubscriber.h"
20 
21 #include <android-base/file.h>
22 #include "matchers/matcher_util.h"
23 #include "stats_log_util.h"
24 
25 using android::util::ProtoOutputStream;
26 
27 namespace android {
28 namespace os {
29 namespace statsd {
30 
// Field id used for each repeated atom message written into the output proto.
static constexpr int FIELD_ID_ATOM = 1;
32 
startNewSubscription(int in,int out,sp<IResultReceiver> resultReceiver,int timeoutSec)33 void ShellSubscriber::startNewSubscription(int in, int out, sp<IResultReceiver> resultReceiver,
34                                            int timeoutSec) {
35     VLOG("start new shell subscription");
36     {
37         std::lock_guard<std::mutex> lock(mMutex);
38         if (mResultReceiver != nullptr) {
39             VLOG("Only one shell subscriber is allowed.");
40             return;
41         }
42         mInput = in;
43         mOutput = out;
44         mResultReceiver = resultReceiver;
45         IInterface::asBinder(mResultReceiver)->linkToDeath(this);
46     }
47 
48     // Note that the following is blocking, and it's intended as we cannot return until the shell
49     // cmd exits, otherwise all resources & FDs will be automatically closed.
50 
51     // Read config forever until EOF is reached. Clients may send multiple configs -- each new
52     // config replace the previous one.
53     readConfig(in);
54     VLOG("timeout : %d", timeoutSec);
55 
56     // Now we have read an EOF we now wait for the semaphore until the client exits.
57     VLOG("Now wait for client to exit");
58     std::unique_lock<std::mutex> lk(mMutex);
59 
60     if (timeoutSec > 0) {
61         mShellDied.wait_for(lk, timeoutSec * 1s,
62                             [this, resultReceiver] { return mResultReceiver != resultReceiver; });
63     } else {
64         mShellDied.wait(lk, [this, resultReceiver] { return mResultReceiver != resultReceiver; });
65     }
66 }
67 
updateConfig(const ShellSubscription & config)68 void ShellSubscriber::updateConfig(const ShellSubscription& config) {
69     std::lock_guard<std::mutex> lock(mMutex);
70     mPushedMatchers.clear();
71     mPulledInfo.clear();
72 
73     for (const auto& pushed : config.pushed()) {
74         mPushedMatchers.push_back(pushed);
75         VLOG("adding matcher for atom %d", pushed.atom_id());
76     }
77 
78     int64_t token = getElapsedRealtimeNs();
79     mPullToken = token;
80 
81     int64_t minInterval = -1;
82     for (const auto& pulled : config.pulled()) {
83         // All intervals need to be multiples of the min interval.
84         if (minInterval < 0 || pulled.freq_millis() < minInterval) {
85             minInterval = pulled.freq_millis();
86         }
87 
88         mPulledInfo.emplace_back(pulled.matcher(), pulled.freq_millis());
89         VLOG("adding matcher for pulled atom %d", pulled.matcher().atom_id());
90     }
91 
92     if (mPulledInfo.size() > 0 && minInterval > 0) {
93         // This thread is guaranteed to terminate after it detects the token is different or
94         // cleaned up.
95         std::thread puller([token, minInterval, this] { startPull(token, minInterval); });
96         puller.detach();
97     }
98 }
99 
writeToOutputLocked(const vector<std::shared_ptr<LogEvent>> & data,const SimpleAtomMatcher & matcher)100 void ShellSubscriber::writeToOutputLocked(const vector<std::shared_ptr<LogEvent>>& data,
101                                           const SimpleAtomMatcher& matcher) {
102     if (mOutput == 0) return;
103     int count = 0;
104     mProto.clear();
105     for (const auto& event : data) {
106         VLOG("%s", event->ToString().c_str());
107         if (matchesSimple(*mUidMap, matcher, *event)) {
108             VLOG("matched");
109             count++;
110             uint64_t atomToken = mProto.start(util::FIELD_TYPE_MESSAGE |
111                                               util::FIELD_COUNT_REPEATED | FIELD_ID_ATOM);
112             event->ToProto(mProto);
113             mProto.end(atomToken);
114         }
115     }
116 
117     if (count > 0) {
118         // First write the payload size.
119         size_t bufferSize = mProto.size();
120         write(mOutput, &bufferSize, sizeof(bufferSize));
121         VLOG("%d atoms, proto size: %zu", count, bufferSize);
122         // Then write the payload.
123         mProto.flush(mOutput);
124     }
125     mProto.clear();
126 }
127 
startPull(int64_t token,int64_t intervalMillis)128 void ShellSubscriber::startPull(int64_t token, int64_t intervalMillis) {
129     while (1) {
130         int64_t nowMillis = getElapsedRealtimeMillis();
131         {
132             std::lock_guard<std::mutex> lock(mMutex);
133             if (mPulledInfo.size() == 0 || mPullToken != token) {
134                 VLOG("Pulling thread %lld done!", (long long)token);
135                 return;
136             }
137             for (auto& pullInfo : mPulledInfo) {
138                 if (pullInfo.mPrevPullElapsedRealtimeMs + pullInfo.mInterval < nowMillis) {
139                     VLOG("pull atom %d now", pullInfo.mPullerMatcher.atom_id());
140 
141                     vector<std::shared_ptr<LogEvent>> data;
142                     mPullerMgr->Pull(pullInfo.mPullerMatcher.atom_id(), &data);
143                     VLOG("pulled %zu atoms", data.size());
144                     if (data.size() > 0) {
145                         writeToOutputLocked(data, pullInfo.mPullerMatcher);
146                     }
147                     pullInfo.mPrevPullElapsedRealtimeMs = nowMillis;
148                 }
149             }
150         }
151         VLOG("Pulling thread %lld sleep....", (long long)token);
152         std::this_thread::sleep_for(std::chrono::milliseconds(intervalMillis));
153     }
154 }
155 
readConfig(int in)156 void ShellSubscriber::readConfig(int in) {
157     if (in <= 0) {
158         return;
159     }
160 
161     while (1) {
162         size_t bufferSize = 0;
163         int result = 0;
164         if ((result = read(in, &bufferSize, sizeof(bufferSize))) == 0) {
165             VLOG("Done reading");
166             break;
167         } else if (result < 0 || result != sizeof(bufferSize)) {
168             ALOGE("Error reading config size");
169             break;
170         }
171 
172         vector<uint8_t> buffer(bufferSize);
173         if ((result = read(in, buffer.data(), bufferSize)) > 0 && ((size_t)result) == bufferSize) {
174             ShellSubscription config;
175             if (config.ParseFromArray(buffer.data(), bufferSize)) {
176                 updateConfig(config);
177             } else {
178                 ALOGE("error parsing the config");
179                 break;
180             }
181         } else {
182             VLOG("Error reading the config, returned: %d, expecting %zu", result, bufferSize);
183             break;
184         }
185     }
186 }
187 
cleanUpLocked()188 void ShellSubscriber::cleanUpLocked() {
189     // The file descriptors will be closed by binder.
190     mInput = 0;
191     mOutput = 0;
192     mResultReceiver = nullptr;
193     mPushedMatchers.clear();
194     mPulledInfo.clear();
195     mPullToken = 0;
196     VLOG("done clean up");
197 }
198 
onLogEvent(const LogEvent & event)199 void ShellSubscriber::onLogEvent(const LogEvent& event) {
200     std::lock_guard<std::mutex> lock(mMutex);
201 
202     if (mOutput <= 0) {
203         return;
204     }
205     for (const auto& matcher : mPushedMatchers) {
206         if (matchesSimple(*mUidMap, matcher, event)) {
207             VLOG("%s", event.ToString().c_str());
208             uint64_t atomToken = mProto.start(util::FIELD_TYPE_MESSAGE |
209                                               util::FIELD_COUNT_REPEATED | FIELD_ID_ATOM);
210             event.ToProto(mProto);
211             mProto.end(atomToken);
212             // First write the payload size.
213             size_t bufferSize = mProto.size();
214             write(mOutput, &bufferSize, sizeof(bufferSize));
215 
216             // Then write the payload.
217             mProto.flush(mOutput);
218             mProto.clear();
219             break;
220         }
221     }
222 }
223 
binderDied(const wp<IBinder> & who)224 void ShellSubscriber::binderDied(const wp<IBinder>& who) {
225     {
226         VLOG("Shell exits");
227         std::lock_guard<std::mutex> lock(mMutex);
228         cleanUpLocked();
229     }
230     mShellDied.notify_all();
231 }
232 
233 }  // namespace statsd
234 }  // namespace os
235 }  // namespace android
236