1 /*
2  * Copyright (C) 2018 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #ifndef SRC_PROFILING_MEMORY_HEAPPROFD_PRODUCER_H_
18 #define SRC_PROFILING_MEMORY_HEAPPROFD_PRODUCER_H_
19 
#include <array>
#include <functional>
#include <map>
#include <memory>
#include <set>
#include <string>
#include <tuple>
#include <utility>
#include <vector>

#include <inttypes.h>
#include <string.h>
26 
27 #include "perfetto/base/task_runner.h"
28 #include "perfetto/ext/base/optional.h"
29 #include "perfetto/ext/base/unix_socket.h"
30 #include "perfetto/ext/base/unix_task_runner.h"
31 
32 #include "perfetto/ext/tracing/core/basic_types.h"
33 #include "perfetto/ext/tracing/core/producer.h"
34 #include "perfetto/ext/tracing/core/trace_writer.h"
35 #include "perfetto/ext/tracing/core/tracing_service.h"
36 #include "perfetto/tracing/core/data_source_config.h"
37 
38 #include "perfetto/tracing/core/forward_decls.h"
39 #include "src/profiling/common/interning_output.h"
40 #include "src/profiling/common/proc_utils.h"
41 #include "src/profiling/common/profiler_guardrails.h"
42 #include "src/profiling/memory/bookkeeping.h"
43 #include "src/profiling/memory/bookkeeping_dump.h"
44 #include "src/profiling/memory/log_histogram.h"
45 #include "src/profiling/memory/shared_ring_buffer.h"
46 #include "src/profiling/memory/system_property.h"
47 #include "src/profiling/memory/unwinding.h"
48 #include "src/profiling/memory/unwound_messages.h"
49 
50 #include "protos/perfetto/config/profiling/heapprofd_config.gen.h"
51 
52 namespace perfetto {
53 namespace profiling {
54 
55 using HeapprofdConfig = protos::gen::HeapprofdConfig;
56 
57 struct Process {
58   pid_t pid;
59   std::string cmdline;
60 };
61 
62 // TODO(rsavitski): central daemon can do less work if it knows that the global
63 // operating mode is fork-based, as it then will not be interacting with the
64 // clients. This can be implemented as an additional mode here.
65 enum class HeapprofdMode { kCentral, kChild };
66 
67 bool HeapprofdConfigToClientConfiguration(
68     const HeapprofdConfig& heapprofd_config,
69     ClientConfiguration* cli_config);
70 
71 // Heap profiling producer. Can be instantiated in two modes, central and
72 // child (also referred to as fork mode).
73 //
74 // The central mode producer is instantiated by the system heapprofd daemon. Its
75 // primary responsibility is activating profiling (via system properties and
76 // signals) in targets identified by profiling configs. On debug platform
77 // builds, the central producer can also handle the out-of-process unwinding &
78 // writing of the profiles for all client processes.
79 //
80 // An alternative model is where the central heapprofd triggers the profiling in
81 // the target process, but the latter fork-execs a private heapprofd binary to
82 // handle unwinding only for that process. The forked heapprofd instantiates
83 // this producer in the "child" mode. In this scenario, the profiled process
84 // never talks to the system daemon.
85 //
86 // TODO(fmayer||rsavitski): cover interesting invariants/structure of the
87 // implementation (e.g. number of data sources in child mode), including
88 // threading structure.
89 class HeapprofdProducer : public Producer, public UnwindingWorker::Delegate {
90  public:
91   friend class SocketDelegate;
92 
93   // TODO(fmayer): Split into two delegates for the listening socket in kCentral
94   // and for the per-client sockets to make this easier to understand?
95   // Alternatively, find a better name for this.
96   class SocketDelegate : public base::UnixSocket::EventListener {
97    public:
SocketDelegate(HeapprofdProducer * producer)98     explicit SocketDelegate(HeapprofdProducer* producer)
99         : producer_(producer) {}
100 
101     void OnDisconnect(base::UnixSocket* self) override;
102     void OnNewIncomingConnection(
103         base::UnixSocket* self,
104         std::unique_ptr<base::UnixSocket> new_connection) override;
105     void OnDataAvailable(base::UnixSocket* self) override;
106 
107    private:
108     HeapprofdProducer* producer_;
109   };
110 
111   HeapprofdProducer(HeapprofdMode mode,
112                     base::TaskRunner* task_runner,
113                     bool exit_when_done);
114   ~HeapprofdProducer() override;
115 
116   // Producer Impl:
117   void OnConnect() override;
118   void OnDisconnect() override;
119   void SetupDataSource(DataSourceInstanceID, const DataSourceConfig&) override;
120   void StartDataSource(DataSourceInstanceID, const DataSourceConfig&) override;
121   void StopDataSource(DataSourceInstanceID) override;
122   void OnTracingSetup() override;
123   void Flush(FlushRequestID,
124              const DataSourceInstanceID* data_source_ids,
125              size_t num_data_sources) override;
ClearIncrementalState(const DataSourceInstanceID *,size_t)126   void ClearIncrementalState(const DataSourceInstanceID* /*data_source_ids*/,
127                              size_t /*num_data_sources*/) override {}
128 
129   // TODO(fmayer): Refactor once/if we have generic reconnect logic.
130   void ConnectWithRetries(const char* socket_name);
131   void DumpAll();
132 
133   // UnwindingWorker::Delegate impl:
134   void PostAllocRecord(UnwindingWorker*, std::unique_ptr<AllocRecord>) override;
135   void PostFreeRecord(UnwindingWorker*, std::vector<FreeRecord>) override;
136   void PostHeapNameRecord(UnwindingWorker*, HeapNameRecord) override;
137   void PostSocketDisconnected(UnwindingWorker*,
138                               DataSourceInstanceID,
139                               pid_t,
140                               SharedRingBuffer::Stats) override;
141 
142   void HandleAllocRecord(AllocRecord*);
143   void HandleFreeRecord(FreeRecord);
144   void HandleHeapNameRecord(HeapNameRecord);
145   void HandleSocketDisconnected(DataSourceInstanceID,
146                                 pid_t,
147                                 SharedRingBuffer::Stats);
148 
149   // Valid only if mode_ == kChild.
150   void SetTargetProcess(pid_t target_pid, std::string target_cmdline);
151   void SetDataSourceCallback(std::function<void()> fn);
152 
153   // Exposed for testing.
154   void SetProducerEndpoint(
155       std::unique_ptr<TracingService::ProducerEndpoint> endpoint);
156 
socket_delegate()157   base::UnixSocket::EventListener& socket_delegate() {
158     return socket_delegate_;
159   }
160 
161   // Adopts the (connected) sockets inherited from the target process, invoking
162   // the on-connection callback.
163   // Specific to mode_ == kChild
164   void AdoptSocket(base::ScopedFile fd);
165 
166   void TerminateWhenDone();
167 
168  private:
169   // State of the connection to tracing service (traced).
170   enum State {
171     kNotStarted = 0,
172     kNotConnected,
173     kConnecting,
174     kConnected,
175   };
176 
177   struct ProcessState {
178     struct HeapInfo {
HeapInfoProcessState::HeapInfo179       HeapInfo(GlobalCallstackTrie* cs, bool dam) : heap_tracker(cs, dam) {}
180 
181       HeapTracker heap_tracker;
182       std::string heap_name;
183       uint64_t sampling_interval = 0u;
184       uint64_t orig_sampling_interval = 0u;
185     };
ProcessStateProcessState186     ProcessState(GlobalCallstackTrie* c, bool d)
187         : callsites(c), dump_at_max_mode(d) {}
188     bool disconnected = false;
189     SharedRingBuffer::ErrorState error_state =
190         SharedRingBuffer::ErrorState::kNoError;
191     bool buffer_corrupted = false;
192 
193     uint64_t heap_samples = 0;
194     uint64_t map_reparses = 0;
195     uint64_t unwinding_errors = 0;
196 
197     uint64_t total_unwinding_time_us = 0;
198     uint64_t client_spinlock_blocked_us = 0;
199     GlobalCallstackTrie* callsites;
200     bool dump_at_max_mode;
201     LogHistogram unwinding_time_us;
202     std::map<uint32_t, HeapInfo> heap_infos;
203 
GetHeapInfoProcessState204     HeapInfo& GetHeapInfo(uint32_t heap_id) {
205       auto it = heap_infos.find(heap_id);
206       if (it == heap_infos.end()) {
207         std::tie(it, std::ignore) = heap_infos.emplace(
208             std::piecewise_construct, std::forward_as_tuple(heap_id),
209             std::forward_as_tuple(callsites, dump_at_max_mode));
210       }
211       return it->second;
212     }
213 
GetHeapTrackerProcessState214     HeapTracker& GetHeapTracker(uint32_t heap_id) {
215       return GetHeapInfo(heap_id).heap_tracker;
216     }
217   };
218 
219   struct DataSource {
DataSourceDataSource220     explicit DataSource(std::unique_ptr<TraceWriter> tw)
221         : trace_writer(std::move(tw)) {
222       // Make MSAN happy.
223       memset(&client_configuration, 0, sizeof(client_configuration));
224     }
225 
226     DataSourceInstanceID id;
227     std::unique_ptr<TraceWriter> trace_writer;
228     DataSourceConfig ds_config;
229     HeapprofdConfig config;
230     ClientConfiguration client_configuration;
231     std::vector<SystemProperties::Handle> properties;
232     std::set<pid_t> signaled_pids;
233     std::set<pid_t> rejected_pids;
234     std::map<pid_t, ProcessState> process_states;
235     std::vector<std::string> normalized_cmdlines;
236     InterningOutputTracker intern_state;
237     bool shutting_down = false;
238     bool started = false;
239     bool hit_guardrail = false;
240     bool was_stopped = false;
241     uint32_t stop_timeout_ms;
242     GuardrailConfig guardrail_config;
243   };
244 
245   struct PendingProcess {
246     std::unique_ptr<base::UnixSocket> sock;
247     DataSourceInstanceID data_source_instance_id;
248     SharedRingBuffer shmem;
249   };
250 
251   void HandleClientConnection(std::unique_ptr<base::UnixSocket> new_connection,
252                               Process process);
253 
254   void ConnectService();
255   void Restart();
256   void ResetConnectionBackoff();
257   void IncreaseConnectionBackoff();
258 
259   void CheckDataSourceMemoryTask();
260   void CheckDataSourceCpuTask();
261 
262   void FinishDataSourceFlush(FlushRequestID flush_id);
263   void DumpProcessesInDataSource(DataSource* ds);
264   void DumpProcessState(DataSource* ds, pid_t pid, ProcessState* process);
265   static void SetStats(protos::pbzero::ProfilePacket::ProcessStats* stats,
266                        const ProcessState& process_state);
267 
268   void DoContinuousDump(DataSourceInstanceID id, uint32_t dump_interval);
269 
270   UnwindingWorker& UnwinderForPID(pid_t);
271   bool IsPidProfiled(pid_t);
272   DataSource* GetDataSourceForProcess(const Process& proc);
273   void RecordOtherSourcesAsRejected(DataSource* active_ds, const Process& proc);
274 
275   void SetStartupProperties(DataSource* data_source);
276   void SignalRunningProcesses(DataSource* data_source);
277 
278   // Specific to mode_ == kChild
279   void TerminateProcess(int exit_status);
280   // Specific to mode_ == kChild
281   void ActiveDataSourceWatchdogCheck();
282 
283   void ShutdownDataSource(DataSource* ds);
284   bool MaybeFinishDataSource(DataSource* ds);
285 
286   void WriteRejectedConcurrentSession(BufferID buffer_id, pid_t pid);
287 
288   // Class state:
289 
290   // Task runner is owned by the main thread.
291   base::TaskRunner* const task_runner_;
292   const HeapprofdMode mode_;
293   // TODO(fmayer): Refactor to make this boolean unnecessary.
294   // Whether to terminate this producer after the first data-source has
295   // finished.
296   bool exit_when_done_;
297 
298   // State of connection to the tracing service.
299   State state_ = kNotStarted;
300   uint32_t connection_backoff_ms_ = 0;
301   const char* producer_sock_name_ = nullptr;
302 
303   // Client processes that have connected, but with which we have not yet
304   // finished the handshake.
305   std::map<pid_t, PendingProcess> pending_processes_;
306 
307   // Must outlive data_sources_ - owns at least the shared memory referenced by
308   // TraceWriters.
309   std::unique_ptr<TracingService::ProducerEndpoint> endpoint_;
310 
311   // Must outlive data_sources_ - HeapTracker references the trie.
312   GlobalCallstackTrie callsites_;
313 
314   // Must outlive data_sources_ - DataSource can hold
315   // SystemProperties::Handle-s.
316   // Specific to mode_ == kCentral
317   SystemProperties properties_;
318 
319   std::map<FlushRequestID, size_t> flushes_in_progress_;
320   std::map<DataSourceInstanceID, DataSource> data_sources_;
321   std::vector<UnwindingWorker> unwinding_workers_;
322 
323   // Specific to mode_ == kChild
324   Process target_process_{base::kInvalidPid, ""};
325   base::Optional<std::function<void()>> data_source_callback_;
326 
327   SocketDelegate socket_delegate_;
328 
329   base::WeakPtrFactory<HeapprofdProducer> weak_factory_;  // Keep last.
330 };
331 
332 }  // namespace profiling
333 }  // namespace perfetto
334 
335 #endif  // SRC_PROFILING_MEMORY_HEAPPROFD_PRODUCER_H_
336