1 /*
2 * Copyright (C) 2018 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16 #define DEBUG false // STOPSHIP if true
17 #include "Log.h"
18
19 #include "ShellSubscriber.h"
20
21 #include <android-base/file.h>
22 #include "matchers/matcher_util.h"
23 #include "stats_log_util.h"
24
25 using android::util::ProtoOutputStream;
26
27 namespace android {
28 namespace os {
29 namespace statsd {
30
31 const static int FIELD_ID_ATOM = 1;
32
startNewSubscription(int in,int out,sp<IResultReceiver> resultReceiver,int timeoutSec)33 void ShellSubscriber::startNewSubscription(int in, int out, sp<IResultReceiver> resultReceiver,
34 int timeoutSec) {
35 VLOG("start new shell subscription");
36 {
37 std::lock_guard<std::mutex> lock(mMutex);
38 if (mResultReceiver != nullptr) {
39 VLOG("Only one shell subscriber is allowed.");
40 return;
41 }
42 mInput = in;
43 mOutput = out;
44 mResultReceiver = resultReceiver;
45 IInterface::asBinder(mResultReceiver)->linkToDeath(this);
46 }
47
48 // Note that the following is blocking, and it's intended as we cannot return until the shell
49 // cmd exits, otherwise all resources & FDs will be automatically closed.
50
51 // Read config forever until EOF is reached. Clients may send multiple configs -- each new
52 // config replace the previous one.
53 readConfig(in);
54 VLOG("timeout : %d", timeoutSec);
55
56 // Now we have read an EOF we now wait for the semaphore until the client exits.
57 VLOG("Now wait for client to exit");
58 std::unique_lock<std::mutex> lk(mMutex);
59
60 if (timeoutSec > 0) {
61 mShellDied.wait_for(lk, timeoutSec * 1s,
62 [this, resultReceiver] { return mResultReceiver != resultReceiver; });
63 } else {
64 mShellDied.wait(lk, [this, resultReceiver] { return mResultReceiver != resultReceiver; });
65 }
66 }
67
updateConfig(const ShellSubscription & config)68 void ShellSubscriber::updateConfig(const ShellSubscription& config) {
69 std::lock_guard<std::mutex> lock(mMutex);
70 mPushedMatchers.clear();
71 mPulledInfo.clear();
72
73 for (const auto& pushed : config.pushed()) {
74 mPushedMatchers.push_back(pushed);
75 VLOG("adding matcher for atom %d", pushed.atom_id());
76 }
77
78 int64_t token = getElapsedRealtimeNs();
79 mPullToken = token;
80
81 int64_t minInterval = -1;
82 for (const auto& pulled : config.pulled()) {
83 // All intervals need to be multiples of the min interval.
84 if (minInterval < 0 || pulled.freq_millis() < minInterval) {
85 minInterval = pulled.freq_millis();
86 }
87
88 mPulledInfo.emplace_back(pulled.matcher(), pulled.freq_millis());
89 VLOG("adding matcher for pulled atom %d", pulled.matcher().atom_id());
90 }
91
92 if (mPulledInfo.size() > 0 && minInterval > 0) {
93 // This thread is guaranteed to terminate after it detects the token is different or
94 // cleaned up.
95 std::thread puller([token, minInterval, this] { startPull(token, minInterval); });
96 puller.detach();
97 }
98 }
99
writeToOutputLocked(const vector<std::shared_ptr<LogEvent>> & data,const SimpleAtomMatcher & matcher)100 void ShellSubscriber::writeToOutputLocked(const vector<std::shared_ptr<LogEvent>>& data,
101 const SimpleAtomMatcher& matcher) {
102 if (mOutput == 0) return;
103 int count = 0;
104 mProto.clear();
105 for (const auto& event : data) {
106 VLOG("%s", event->ToString().c_str());
107 if (matchesSimple(*mUidMap, matcher, *event)) {
108 VLOG("matched");
109 count++;
110 uint64_t atomToken = mProto.start(util::FIELD_TYPE_MESSAGE |
111 util::FIELD_COUNT_REPEATED | FIELD_ID_ATOM);
112 event->ToProto(mProto);
113 mProto.end(atomToken);
114 }
115 }
116
117 if (count > 0) {
118 // First write the payload size.
119 size_t bufferSize = mProto.size();
120 write(mOutput, &bufferSize, sizeof(bufferSize));
121 VLOG("%d atoms, proto size: %zu", count, bufferSize);
122 // Then write the payload.
123 mProto.flush(mOutput);
124 }
125 mProto.clear();
126 }
127
startPull(int64_t token,int64_t intervalMillis)128 void ShellSubscriber::startPull(int64_t token, int64_t intervalMillis) {
129 while (1) {
130 int64_t nowMillis = getElapsedRealtimeMillis();
131 {
132 std::lock_guard<std::mutex> lock(mMutex);
133 if (mPulledInfo.size() == 0 || mPullToken != token) {
134 VLOG("Pulling thread %lld done!", (long long)token);
135 return;
136 }
137 for (auto& pullInfo : mPulledInfo) {
138 if (pullInfo.mPrevPullElapsedRealtimeMs + pullInfo.mInterval < nowMillis) {
139 VLOG("pull atom %d now", pullInfo.mPullerMatcher.atom_id());
140
141 vector<std::shared_ptr<LogEvent>> data;
142 mPullerMgr->Pull(pullInfo.mPullerMatcher.atom_id(), &data);
143 VLOG("pulled %zu atoms", data.size());
144 if (data.size() > 0) {
145 writeToOutputLocked(data, pullInfo.mPullerMatcher);
146 }
147 pullInfo.mPrevPullElapsedRealtimeMs = nowMillis;
148 }
149 }
150 }
151 VLOG("Pulling thread %lld sleep....", (long long)token);
152 std::this_thread::sleep_for(std::chrono::milliseconds(intervalMillis));
153 }
154 }
155
readConfig(int in)156 void ShellSubscriber::readConfig(int in) {
157 if (in <= 0) {
158 return;
159 }
160
161 while (1) {
162 size_t bufferSize = 0;
163 int result = 0;
164 if ((result = read(in, &bufferSize, sizeof(bufferSize))) == 0) {
165 VLOG("Done reading");
166 break;
167 } else if (result < 0 || result != sizeof(bufferSize)) {
168 ALOGE("Error reading config size");
169 break;
170 }
171
172 vector<uint8_t> buffer(bufferSize);
173 if ((result = read(in, buffer.data(), bufferSize)) > 0 && ((size_t)result) == bufferSize) {
174 ShellSubscription config;
175 if (config.ParseFromArray(buffer.data(), bufferSize)) {
176 updateConfig(config);
177 } else {
178 ALOGE("error parsing the config");
179 break;
180 }
181 } else {
182 VLOG("Error reading the config, returned: %d, expecting %zu", result, bufferSize);
183 break;
184 }
185 }
186 }
187
cleanUpLocked()188 void ShellSubscriber::cleanUpLocked() {
189 // The file descriptors will be closed by binder.
190 mInput = 0;
191 mOutput = 0;
192 mResultReceiver = nullptr;
193 mPushedMatchers.clear();
194 mPulledInfo.clear();
195 mPullToken = 0;
196 VLOG("done clean up");
197 }
198
onLogEvent(const LogEvent & event)199 void ShellSubscriber::onLogEvent(const LogEvent& event) {
200 std::lock_guard<std::mutex> lock(mMutex);
201
202 if (mOutput <= 0) {
203 return;
204 }
205 for (const auto& matcher : mPushedMatchers) {
206 if (matchesSimple(*mUidMap, matcher, event)) {
207 VLOG("%s", event.ToString().c_str());
208 uint64_t atomToken = mProto.start(util::FIELD_TYPE_MESSAGE |
209 util::FIELD_COUNT_REPEATED | FIELD_ID_ATOM);
210 event.ToProto(mProto);
211 mProto.end(atomToken);
212 // First write the payload size.
213 size_t bufferSize = mProto.size();
214 write(mOutput, &bufferSize, sizeof(bufferSize));
215
216 // Then write the payload.
217 mProto.flush(mOutput);
218 mProto.clear();
219 break;
220 }
221 }
222 }
223
binderDied(const wp<IBinder> & who)224 void ShellSubscriber::binderDied(const wp<IBinder>& who) {
225 {
226 VLOG("Shell exits");
227 std::lock_guard<std::mutex> lock(mMutex);
228 cleanUpLocked();
229 }
230 mShellDied.notify_all();
231 }
232
233 } // namespace statsd
234 } // namespace os
235 } // namespace android
236