1 /* 2 * Copyright (C) 2018 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 #pragma once 18 19 #include <android/util/ProtoOutputStream.h> 20 #include <private/android_filesystem_config.h> 21 22 #include <condition_variable> 23 #include <mutex> 24 #include <thread> 25 26 #include "external/StatsPullerManager.h" 27 #include "frameworks/base/cmds/statsd/src/shell/shell_config.pb.h" 28 #include "frameworks/base/cmds/statsd/src/statsd_config.pb.h" 29 #include "logd/LogEvent.h" 30 #include "packages/UidMap.h" 31 32 namespace android { 33 namespace os { 34 namespace statsd { 35 36 /** 37 * Handles atoms subscription via shell cmd. 38 * 39 * A shell subscription lasts *until shell exits*. Unlike config based clients, a shell client 40 * communicates with statsd via file descriptors. They can subscribe pushed and pulled atoms. 41 * The atoms are sent back to the client in real time, as opposed to keeping the data in memory. 42 * Shell clients do not subscribe aggregated metrics, as they are responsible for doing the 43 * aggregation after receiving the atom events. 44 * 45 * Shell clients pass ShellSubscription in the proto binary format. Clients can update the 46 * subscription by sending a new subscription. The new subscription would replace the old one. 47 * Input data stream format is: 48 * 49 * |size_t|subscription proto|size_t|subscription proto|.... 50 * 51 * statsd sends the events back in Atom proto binary format. Each Atom message is preceded 52 * with sizeof(size_t) bytes indicating the size of the proto message payload. 53 * 54 * The stream would be in the following format: 55 * |size_t|shellData proto|size_t|shellData proto|.... 56 * 57 * Only one shell subscriber is allowed at a time because each shell subscriber blocks one thread 58 * until it exits. 59 */ 60 class ShellSubscriber : public virtual RefBase { 61 public: ShellSubscriber(sp<UidMap> uidMap,sp<StatsPullerManager> pullerMgr)62 ShellSubscriber(sp<UidMap> uidMap, sp<StatsPullerManager> pullerMgr) 63 : mUidMap(uidMap), mPullerMgr(pullerMgr){}; 64 65 void startNewSubscription(int inFd, int outFd, int timeoutSec); 66 67 void onLogEvent(const LogEvent& event); 68 69 private: 70 struct PullInfo { PullInfoPullInfo71 PullInfo(const SimpleAtomMatcher& matcher, int64_t interval, 72 const std::vector<std::string>& packages, const std::vector<int32_t>& uids) 73 : mPullerMatcher(matcher), 74 mInterval(interval), 75 mPrevPullElapsedRealtimeMs(0), 76 mPullPackages(packages), 77 mPullUids(uids) { 78 } 79 SimpleAtomMatcher mPullerMatcher; 80 int64_t mInterval; 81 int64_t mPrevPullElapsedRealtimeMs; 82 std::vector<std::string> mPullPackages; 83 std::vector<int32_t> mPullUids; 84 }; 85 86 struct SubscriptionInfo { SubscriptionInfoSubscriptionInfo87 SubscriptionInfo(const int& inputFd, const int& outputFd) 88 : mInputFd(inputFd), mOutputFd(outputFd), mClientAlive(true) { 89 } 90 91 int mInputFd; 92 int mOutputFd; 93 std::vector<SimpleAtomMatcher> mPushedMatchers; 94 std::vector<PullInfo> mPulledInfo; 95 bool mClientAlive; 96 }; 97 98 int claimToken(); 99 100 bool readConfig(std::shared_ptr<SubscriptionInfo> subscriptionInfo); 101 102 void spawnHelperThread(int myToken); 103 104 void waitForSubscriptionToEndLocked(std::shared_ptr<SubscriptionInfo> myInfo, 105 int myToken, 106 std::unique_lock<std::mutex>& lock, 107 int timeoutSec); 108 109 // Helper thread that pulls atoms at a regular frequency and sends 110 // heartbeats to perfd if statsd hasn't recently sent any data. Statsd must 111 // send heartbeats for perfd to escape a blocking read call and recheck if 112 // the user has terminated the subscription. 113 void pullAndSendHeartbeats(int myToken); 114 115 void writePulledAtomsLocked(const vector<std::shared_ptr<LogEvent>>& data, 116 const SimpleAtomMatcher& matcher); 117 118 void getUidsForPullAtom(vector<int32_t>* uids, const PullInfo& pullInfo); 119 120 void attemptWriteToPipeLocked(size_t dataSize); 121 122 sp<UidMap> mUidMap; 123 124 sp<StatsPullerManager> mPullerMgr; 125 126 android::util::ProtoOutputStream mProto; 127 128 mutable std::mutex mMutex; 129 130 std::condition_variable mSubscriptionShouldEnd; 131 132 std::shared_ptr<SubscriptionInfo> mSubscriptionInfo = nullptr; 133 134 int mToken = 0; 135 136 const int32_t DEFAULT_PULL_UID = AID_SYSTEM; 137 138 // Tracks when we last send data to perfd. We need that time to determine 139 // when next to send a heartbeat. 140 int64_t mLastWriteMs = 0; 141 const int64_t kMsBetweenHeartbeats = 1000; 142 }; 143 144 } // namespace statsd 145 } // namespace os 146 } // namespace android 147