1 /*
2 * Copyright (C) 2018 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16 #define STATSD_DEBUG false // STOPSHIP if true
17 #include "Log.h"
18
19 #include "ShellSubscriber.h"
20
21 #include <android-base/file.h>
22 #include <inttypes.h>
23 #include <utils/Timers.h>
24
25 #include "guardrail/StatsdStats.h"
26 #include "stats_log_util.h"
27 #include "utils/api_tracing.h"
28
29 using aidl::android::os::IStatsSubscriptionCallback;
30
31 namespace android {
32 namespace os {
33 namespace statsd {
34
~ShellSubscriber()35 ShellSubscriber::~ShellSubscriber() {
36 {
37 std::unique_lock<std::mutex> lock(mMutex);
38 mClientSet.clear();
39 updateLogEventFilterLocked();
40 }
41 mThreadSleepCV.notify_one();
42 if (mThread.joinable()) {
43 mThread.join();
44 }
45 }
46
startNewSubscription(int in,int out,int64_t timeoutSec)47 bool ShellSubscriber::startNewSubscription(int in, int out, int64_t timeoutSec) {
48 std::unique_lock<std::mutex> lock(mMutex);
49 VLOG("ShellSubscriber: new subscription has come in");
50 if (mClientSet.size() >= kMaxSubscriptions) {
51 ALOGE("ShellSubscriber: cannot have another active subscription. Current Subscriptions: "
52 "%zu. Limit: %zu",
53 mClientSet.size(), kMaxSubscriptions);
54 return false;
55 }
56
57 return startNewSubscriptionLocked(ShellSubscriberClient::create(
58 in, out, timeoutSec, getElapsedRealtimeSec(), mUidMap, mPullerMgr));
59 }
60
startNewSubscription(const vector<uint8_t> & subscriptionConfig,const shared_ptr<IStatsSubscriptionCallback> & callback)61 bool ShellSubscriber::startNewSubscription(const vector<uint8_t>& subscriptionConfig,
62 const shared_ptr<IStatsSubscriptionCallback>& callback) {
63 std::unique_lock<std::mutex> lock(mMutex);
64 VLOG("ShellSubscriber: new subscription has come in");
65 if (mClientSet.size() >= kMaxSubscriptions) {
66 ALOGE("ShellSubscriber: cannot have another active subscription. Current Subscriptions: "
67 "%zu. Limit: %zu",
68 mClientSet.size(), kMaxSubscriptions);
69 return false;
70 }
71
72 return startNewSubscriptionLocked(ShellSubscriberClient::create(
73 subscriptionConfig, callback, getElapsedRealtimeSec(), mUidMap, mPullerMgr));
74 }
75
startNewSubscriptionLocked(unique_ptr<ShellSubscriberClient> client)76 bool ShellSubscriber::startNewSubscriptionLocked(unique_ptr<ShellSubscriberClient> client) {
77 if (client == nullptr) return false;
78
79 // Add new valid client to the client set
80 mClientSet.insert(std::move(client));
81 updateLogEventFilterLocked();
82
83 // Only spawn one thread to manage pulling atoms and sending
84 // heartbeats.
85 if (!mThreadAlive) {
86 mThreadAlive = true;
87 if (mThread.joinable()) {
88 mThread.join();
89 }
90 mThread = thread([this] { pullAndSendHeartbeats(); });
91 }
92
93 return true;
94 }
95
96 // Sends heartbeat signals and sleeps between doing work
pullAndSendHeartbeats()97 void ShellSubscriber::pullAndSendHeartbeats() {
98 VLOG("ShellSubscriber: helper thread starting");
99 std::unique_lock<std::mutex> lock(mMutex);
100 while (true) {
101 StatsdStats::getInstance().noteSubscriptionPullThreadWakeup();
102 int64_t sleepTimeMs = 24 * 60 * 60 * 1000; // 24 hours.
103 const int64_t nowNanos = getElapsedRealtimeNs();
104 const int64_t nowMillis = nanoseconds_to_milliseconds(nowNanos);
105 const int64_t nowSecs = nanoseconds_to_seconds(nowNanos);
106 for (auto clientIt = mClientSet.begin(); clientIt != mClientSet.end();) {
107 int64_t subscriptionSleepMs =
108 (*clientIt)->pullAndSendHeartbeatsIfNeeded(nowSecs, nowMillis, nowNanos);
109 sleepTimeMs = std::min(sleepTimeMs, subscriptionSleepMs);
110 if ((*clientIt)->isAlive()) {
111 ++clientIt;
112 } else {
113 VLOG("ShellSubscriber: removing client!");
114 (*clientIt)->onUnsubscribe();
115 clientIt = mClientSet.erase(clientIt);
116 updateLogEventFilterLocked();
117 }
118 }
119 if (mClientSet.empty()) {
120 mThreadAlive = false;
121 VLOG("ShellSubscriber: helper thread done!");
122 return;
123 }
124 VLOG("ShellSubscriber: helper thread sleeping for %" PRId64 "ms", sleepTimeMs);
125 mThreadSleepCV.wait_for(lock, sleepTimeMs * 1ms, [this] { return mClientSet.empty(); });
126 }
127 }
128
onLogEvent(const LogEvent & event)129 void ShellSubscriber::onLogEvent(const LogEvent& event) {
130 ATRACE_CALL();
131 // Skip if event is skipped
132 if (event.isParsedHeaderOnly()) {
133 return;
134 }
135 // Skip RestrictedLogEvents
136 if (event.isRestricted()) {
137 return;
138 }
139 std::unique_lock<std::mutex> lock(mMutex);
140 for (auto clientIt = mClientSet.begin(); clientIt != mClientSet.end();) {
141 (*clientIt)->onLogEvent(event);
142 if ((*clientIt)->isAlive()) {
143 ++clientIt;
144 } else {
145 VLOG("ShellSubscriber: removing client!");
146
147 (*clientIt)->onUnsubscribe();
148 clientIt = mClientSet.erase(clientIt);
149 updateLogEventFilterLocked();
150 }
151 }
152 }
153
flushSubscription(const shared_ptr<IStatsSubscriptionCallback> & callback)154 void ShellSubscriber::flushSubscription(const shared_ptr<IStatsSubscriptionCallback>& callback) {
155 std::unique_lock<std::mutex> lock(mMutex);
156
157 // TODO(b/268822860): Consider storing callback clients in a map keyed by
158 // IStatsSubscriptionCallback to avoid this linear search.
159 for (auto clientIt = mClientSet.begin(); clientIt != mClientSet.end(); ++clientIt) {
160 if ((*clientIt)->hasCallback(callback)) {
161 if ((*clientIt)->isAlive()) {
162 (*clientIt)->flush();
163 } else {
164 VLOG("ShellSubscriber: removing client!");
165
166 (*clientIt)->onUnsubscribe();
167
168 // Erasing a value moves the iterator to the next value. The update expression also
169 // moves the iterator, skipping a value. This is fine because we do an early return
170 // before next iteration of the loop.
171 clientIt = mClientSet.erase(clientIt);
172 updateLogEventFilterLocked();
173 }
174 return;
175 }
176 }
177 }
178
unsubscribe(const shared_ptr<IStatsSubscriptionCallback> & callback)179 void ShellSubscriber::unsubscribe(const shared_ptr<IStatsSubscriptionCallback>& callback) {
180 std::unique_lock<std::mutex> lock(mMutex);
181
182 // TODO(b/268822860): Consider storing callback clients in a map keyed by
183 // IStatsSubscriptionCallback to avoid this linear search.
184 for (auto clientIt = mClientSet.begin(); clientIt != mClientSet.end(); ++clientIt) {
185 if ((*clientIt)->hasCallback(callback)) {
186 VLOG("ShellSubscriber: removing client!");
187
188 (*clientIt)->onUnsubscribe();
189
190 // Erasing a value moves the iterator to the next value. The update expression also
191 // moves the iterator, skipping a value. This is fine because we do an early return
192 // before next iteration of the loop.
193 clientIt = mClientSet.erase(clientIt);
194 updateLogEventFilterLocked();
195 return;
196 }
197 }
198 }
199
updateLogEventFilterLocked() const200 void ShellSubscriber::updateLogEventFilterLocked() const {
201 VLOG("ShellSubscriber: Updating allAtomIds");
202 LogEventFilter::AtomIdSet allAtomIds;
203 for (const auto& client : mClientSet) {
204 client->addAllAtomIds(allAtomIds);
205 }
206 VLOG("ShellSubscriber: Updating allAtomIds done. Total atoms %d", (int)allAtomIds.size());
207 mLogEventFilter->setAtomIds(std::move(allAtomIds), this);
208 }
209
210 } // namespace statsd
211 } // namespace os
212 } // namespace android
213