1 /*
2  * Copyright (C) 2018 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #pragma once
18 
19 #include <android/util/ProtoOutputStream.h>
20 #include <private/android_filesystem_config.h>
21 
22 #include <condition_variable>
23 #include <mutex>
24 #include <thread>
25 
26 #include "external/StatsPullerManager.h"
27 #include "frameworks/base/cmds/statsd/src/shell/shell_config.pb.h"
28 #include "frameworks/base/cmds/statsd/src/statsd_config.pb.h"
29 #include "logd/LogEvent.h"
30 #include "packages/UidMap.h"
31 
32 namespace android {
33 namespace os {
34 namespace statsd {
35 
36 /**
37  * Handles atoms subscription via shell cmd.
38  *
39  * A shell subscription lasts *until shell exits*. Unlike config based clients, a shell client
40  * communicates with statsd via file descriptors. They can subscribe pushed and pulled atoms.
41  * The atoms are sent back to the client in real time, as opposed to keeping the data in memory.
42  * Shell clients do not subscribe aggregated metrics, as they are responsible for doing the
43  * aggregation after receiving the atom events.
44  *
45  * Shell clients pass ShellSubscription in the proto binary format. Clients can update the
46  * subscription by sending a new subscription. The new subscription would replace the old one.
47  * Input data stream format is:
48  *
49  * |size_t|subscription proto|size_t|subscription proto|....
50  *
51  * statsd sends the events back in Atom proto binary format. Each Atom message is preceded
52  * with sizeof(size_t) bytes indicating the size of the proto message payload.
53  *
54  * The stream would be in the following format:
55  * |size_t|shellData proto|size_t|shellData proto|....
56  *
57  * Only one shell subscriber is allowed at a time because each shell subscriber blocks one thread
58  * until it exits.
59  */
60 class ShellSubscriber : public virtual RefBase {
61 public:
ShellSubscriber(sp<UidMap> uidMap,sp<StatsPullerManager> pullerMgr)62     ShellSubscriber(sp<UidMap> uidMap, sp<StatsPullerManager> pullerMgr)
63         : mUidMap(uidMap), mPullerMgr(pullerMgr){};
64 
65     void startNewSubscription(int inFd, int outFd, int timeoutSec);
66 
67     void onLogEvent(const LogEvent& event);
68 
69 private:
70     struct PullInfo {
PullInfoPullInfo71         PullInfo(const SimpleAtomMatcher& matcher, int64_t interval,
72                  const std::vector<std::string>& packages, const std::vector<int32_t>& uids)
73             : mPullerMatcher(matcher),
74               mInterval(interval),
75               mPrevPullElapsedRealtimeMs(0),
76               mPullPackages(packages),
77               mPullUids(uids) {
78         }
79         SimpleAtomMatcher mPullerMatcher;
80         int64_t mInterval;
81         int64_t mPrevPullElapsedRealtimeMs;
82         std::vector<std::string> mPullPackages;
83         std::vector<int32_t> mPullUids;
84     };
85 
86     struct SubscriptionInfo {
SubscriptionInfoSubscriptionInfo87         SubscriptionInfo(const int& inputFd, const int& outputFd)
88             : mInputFd(inputFd), mOutputFd(outputFd), mClientAlive(true) {
89         }
90 
91         int mInputFd;
92         int mOutputFd;
93         std::vector<SimpleAtomMatcher> mPushedMatchers;
94         std::vector<PullInfo> mPulledInfo;
95         bool mClientAlive;
96     };
97 
98     int claimToken();
99 
100     bool readConfig(std::shared_ptr<SubscriptionInfo> subscriptionInfo);
101 
102     void spawnHelperThread(int myToken);
103 
104     void waitForSubscriptionToEndLocked(std::shared_ptr<SubscriptionInfo> myInfo,
105                                         int myToken,
106                                         std::unique_lock<std::mutex>& lock,
107                                         int timeoutSec);
108 
109     // Helper thread that pulls atoms at a regular frequency and sends
110     // heartbeats to perfd if statsd hasn't recently sent any data. Statsd must
111     // send heartbeats for perfd to escape a blocking read call and recheck if
112     // the user has terminated the subscription.
113     void pullAndSendHeartbeats(int myToken);
114 
115     void writePulledAtomsLocked(const vector<std::shared_ptr<LogEvent>>& data,
116                                 const SimpleAtomMatcher& matcher);
117 
118     void getUidsForPullAtom(vector<int32_t>* uids, const PullInfo& pullInfo);
119 
120     void attemptWriteToPipeLocked(size_t dataSize);
121 
122     sp<UidMap> mUidMap;
123 
124     sp<StatsPullerManager> mPullerMgr;
125 
126     android::util::ProtoOutputStream mProto;
127 
128     mutable std::mutex mMutex;
129 
130     std::condition_variable mSubscriptionShouldEnd;
131 
132     std::shared_ptr<SubscriptionInfo> mSubscriptionInfo = nullptr;
133 
134     int mToken = 0;
135 
136     const int32_t DEFAULT_PULL_UID = AID_SYSTEM;
137 
138     // Tracks when we last send data to perfd. We need that time to determine
139     // when next to send a heartbeat.
140     int64_t mLastWriteMs = 0;
141     const int64_t kMsBetweenHeartbeats = 1000;
142 };
143 
144 }  // namespace statsd
145 }  // namespace os
146 }  // namespace android
147