1 /*
2  * Copyright (C) 2018 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #define STATSD_DEBUG false  // STOPSHIP if true
17 #include "Log.h"
18 
19 #include "ShellSubscriber.h"
20 
21 #include <android-base/file.h>
22 #include <inttypes.h>
23 #include <utils/Timers.h>
24 
25 #include "guardrail/StatsdStats.h"
26 #include "stats_log_util.h"
27 #include "utils/api_tracing.h"
28 
29 using aidl::android::os::IStatsSubscriptionCallback;
30 
31 namespace android {
32 namespace os {
33 namespace statsd {
34 
~ShellSubscriber()35 ShellSubscriber::~ShellSubscriber() {
36     {
37         std::unique_lock<std::mutex> lock(mMutex);
38         mClientSet.clear();
39         updateLogEventFilterLocked();
40     }
41     mThreadSleepCV.notify_one();
42     if (mThread.joinable()) {
43         mThread.join();
44     }
45 }
46 
startNewSubscription(int in,int out,int64_t timeoutSec)47 bool ShellSubscriber::startNewSubscription(int in, int out, int64_t timeoutSec) {
48     std::unique_lock<std::mutex> lock(mMutex);
49     VLOG("ShellSubscriber: new subscription has come in");
50     if (mClientSet.size() >= kMaxSubscriptions) {
51         ALOGE("ShellSubscriber: cannot have another active subscription. Current Subscriptions: "
52               "%zu. Limit: %zu",
53               mClientSet.size(), kMaxSubscriptions);
54         return false;
55     }
56 
57     return startNewSubscriptionLocked(ShellSubscriberClient::create(
58             in, out, timeoutSec, getElapsedRealtimeSec(), mUidMap, mPullerMgr));
59 }
60 
startNewSubscription(const vector<uint8_t> & subscriptionConfig,const shared_ptr<IStatsSubscriptionCallback> & callback)61 bool ShellSubscriber::startNewSubscription(const vector<uint8_t>& subscriptionConfig,
62                                            const shared_ptr<IStatsSubscriptionCallback>& callback) {
63     std::unique_lock<std::mutex> lock(mMutex);
64     VLOG("ShellSubscriber: new subscription has come in");
65     if (mClientSet.size() >= kMaxSubscriptions) {
66         ALOGE("ShellSubscriber: cannot have another active subscription. Current Subscriptions: "
67               "%zu. Limit: %zu",
68               mClientSet.size(), kMaxSubscriptions);
69         return false;
70     }
71 
72     return startNewSubscriptionLocked(ShellSubscriberClient::create(
73             subscriptionConfig, callback, getElapsedRealtimeSec(), mUidMap, mPullerMgr));
74 }
75 
startNewSubscriptionLocked(unique_ptr<ShellSubscriberClient> client)76 bool ShellSubscriber::startNewSubscriptionLocked(unique_ptr<ShellSubscriberClient> client) {
77     if (client == nullptr) return false;
78 
79     // Add new valid client to the client set
80     mClientSet.insert(std::move(client));
81     updateLogEventFilterLocked();
82 
83     // Only spawn one thread to manage pulling atoms and sending
84     // heartbeats.
85     if (!mThreadAlive) {
86         mThreadAlive = true;
87         if (mThread.joinable()) {
88             mThread.join();
89         }
90         mThread = thread([this] { pullAndSendHeartbeats(); });
91     }
92 
93     return true;
94 }
95 
96 // Sends heartbeat signals and sleeps between doing work
pullAndSendHeartbeats()97 void ShellSubscriber::pullAndSendHeartbeats() {
98     VLOG("ShellSubscriber: helper thread starting");
99     std::unique_lock<std::mutex> lock(mMutex);
100     while (true) {
101         StatsdStats::getInstance().noteSubscriptionPullThreadWakeup();
102         int64_t sleepTimeMs = 24 * 60 * 60 * 1000;  // 24 hours.
103         const int64_t nowNanos = getElapsedRealtimeNs();
104         const int64_t nowMillis = nanoseconds_to_milliseconds(nowNanos);
105         const int64_t nowSecs = nanoseconds_to_seconds(nowNanos);
106         for (auto clientIt = mClientSet.begin(); clientIt != mClientSet.end();) {
107             int64_t subscriptionSleepMs =
108                     (*clientIt)->pullAndSendHeartbeatsIfNeeded(nowSecs, nowMillis, nowNanos);
109             sleepTimeMs = std::min(sleepTimeMs, subscriptionSleepMs);
110             if ((*clientIt)->isAlive()) {
111                 ++clientIt;
112             } else {
113                 VLOG("ShellSubscriber: removing client!");
114                 (*clientIt)->onUnsubscribe();
115                 clientIt = mClientSet.erase(clientIt);
116                 updateLogEventFilterLocked();
117             }
118         }
119         if (mClientSet.empty()) {
120             mThreadAlive = false;
121             VLOG("ShellSubscriber: helper thread done!");
122             return;
123         }
124         VLOG("ShellSubscriber: helper thread sleeping for %" PRId64 "ms", sleepTimeMs);
125         mThreadSleepCV.wait_for(lock, sleepTimeMs * 1ms, [this] { return mClientSet.empty(); });
126     }
127 }
128 
onLogEvent(const LogEvent & event)129 void ShellSubscriber::onLogEvent(const LogEvent& event) {
130     ATRACE_CALL();
131     // Skip if event is skipped
132     if (event.isParsedHeaderOnly()) {
133         return;
134     }
135     // Skip RestrictedLogEvents
136     if (event.isRestricted()) {
137         return;
138     }
139     std::unique_lock<std::mutex> lock(mMutex);
140     for (auto clientIt = mClientSet.begin(); clientIt != mClientSet.end();) {
141         (*clientIt)->onLogEvent(event);
142         if ((*clientIt)->isAlive()) {
143             ++clientIt;
144         } else {
145             VLOG("ShellSubscriber: removing client!");
146 
147             (*clientIt)->onUnsubscribe();
148             clientIt = mClientSet.erase(clientIt);
149             updateLogEventFilterLocked();
150         }
151     }
152 }
153 
flushSubscription(const shared_ptr<IStatsSubscriptionCallback> & callback)154 void ShellSubscriber::flushSubscription(const shared_ptr<IStatsSubscriptionCallback>& callback) {
155     std::unique_lock<std::mutex> lock(mMutex);
156 
157     // TODO(b/268822860): Consider storing callback clients in a map keyed by
158     // IStatsSubscriptionCallback to avoid this linear search.
159     for (auto clientIt = mClientSet.begin(); clientIt != mClientSet.end(); ++clientIt) {
160         if ((*clientIt)->hasCallback(callback)) {
161             if ((*clientIt)->isAlive()) {
162                 (*clientIt)->flush();
163             } else {
164                 VLOG("ShellSubscriber: removing client!");
165 
166                 (*clientIt)->onUnsubscribe();
167 
168                 // Erasing a value moves the iterator to the next value. The update expression also
169                 // moves the iterator, skipping a value. This is fine because we do an early return
170                 // before next iteration of the loop.
171                 clientIt = mClientSet.erase(clientIt);
172                 updateLogEventFilterLocked();
173             }
174             return;
175         }
176     }
177 }
178 
unsubscribe(const shared_ptr<IStatsSubscriptionCallback> & callback)179 void ShellSubscriber::unsubscribe(const shared_ptr<IStatsSubscriptionCallback>& callback) {
180     std::unique_lock<std::mutex> lock(mMutex);
181 
182     // TODO(b/268822860): Consider storing callback clients in a map keyed by
183     // IStatsSubscriptionCallback to avoid this linear search.
184     for (auto clientIt = mClientSet.begin(); clientIt != mClientSet.end(); ++clientIt) {
185         if ((*clientIt)->hasCallback(callback)) {
186             VLOG("ShellSubscriber: removing client!");
187 
188             (*clientIt)->onUnsubscribe();
189 
190             // Erasing a value moves the iterator to the next value. The update expression also
191             // moves the iterator, skipping a value. This is fine because we do an early return
192             // before next iteration of the loop.
193             clientIt = mClientSet.erase(clientIt);
194             updateLogEventFilterLocked();
195             return;
196         }
197     }
198 }
199 
updateLogEventFilterLocked() const200 void ShellSubscriber::updateLogEventFilterLocked() const {
201     VLOG("ShellSubscriber: Updating allAtomIds");
202     LogEventFilter::AtomIdSet allAtomIds;
203     for (const auto& client : mClientSet) {
204         client->addAllAtomIds(allAtomIds);
205     }
206     VLOG("ShellSubscriber: Updating allAtomIds done. Total atoms %d", (int)allAtomIds.size());
207     mLogEventFilter->setAtomIds(std::move(allAtomIds), this);
208 }
209 
210 }  // namespace statsd
211 }  // namespace os
212 }  // namespace android
213