1 /*
2  * Copyright (C) 2018 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "src/profiling/memory/heapprofd_producer.h"
18 
19 #include <algorithm>
20 #include <functional>
21 #include <string>
22 
23 #include <inttypes.h>
24 #include <signal.h>
25 #include <sys/stat.h>
26 #include <sys/types.h>
27 #include <unistd.h>
28 
29 #include "perfetto/base/compiler.h"
30 #include "perfetto/base/logging.h"
31 #include "perfetto/ext/base/file_utils.h"
32 #include "perfetto/ext/base/optional.h"
33 #include "perfetto/ext/base/string_splitter.h"
34 #include "perfetto/ext/base/string_utils.h"
35 #include "perfetto/ext/base/thread_task_runner.h"
36 #include "perfetto/ext/base/watchdog_posix.h"
37 #include "perfetto/ext/tracing/core/basic_types.h"
38 #include "perfetto/ext/tracing/core/trace_writer.h"
39 #include "perfetto/ext/tracing/ipc/producer_ipc_client.h"
40 #include "perfetto/tracing/core/data_source_config.h"
41 #include "perfetto/tracing/core/data_source_descriptor.h"
42 #include "perfetto/tracing/core/forward_decls.h"
43 #include "protos/perfetto/trace/profiling/profile_packet.pbzero.h"
44 #include "src/profiling/common/producer_support.h"
45 #include "src/profiling/common/profiler_guardrails.h"
46 #include "src/profiling/memory/shared_ring_buffer.h"
47 #include "src/profiling/memory/unwound_messages.h"
48 #include "src/profiling/memory/wire_protocol.h"
49 
50 #if PERFETTO_BUILDFLAG(PERFETTO_OS_ANDROID)
51 #include <sys/system_properties.h>
52 #endif
53 
54 namespace perfetto {
55 namespace profiling {
56 namespace {
57 using ::perfetto::protos::pbzero::ProfilePacket;
58 
// Name of the data source this producer registers and accepts.
constexpr char kHeapprofdDataSource[] = "android.heapprofd";
// Number of unwinding workers (and hence unwinding threads) to create.
constexpr size_t kUnwinderThreads = 5;

// Bounds of the reconnection backoff: it starts at the initial value, doubles
// on every failed attempt and is capped at the maximum.
constexpr uint32_t kInitialConnectionBackoffMs = 100;
constexpr uint32_t kMaxConnectionBackoffMs = 30000;
// NOTE(review): presumably the period of the CPU/memory guardrail checks;
// its use is outside this part of the file.
constexpr uint32_t kGuardrailIntervalMs = 30000;

// Period of the child-mode check that a data source is still active.
constexpr uint32_t kChildModeWatchdogPeriodMs = 10000;

constexpr uint64_t kDefaultShmemSize = 8 * 1024 * 1024;  // 8 MiB
constexpr uint64_t kMaxShmemSize = 500 * 1024 * 1024;    // 500 MiB

// Constants specified by bionic, hardcoded here for simplicity.
constexpr int kProfilingSignal = __SIGRTMIN + 4;
constexpr int kHeapprofdSignalValue = 0;
74 
MakeUnwindingWorkers(HeapprofdProducer * delegate,size_t n)75 std::vector<UnwindingWorker> MakeUnwindingWorkers(HeapprofdProducer* delegate,
76                                                   size_t n) {
77   std::vector<UnwindingWorker> ret;
78   for (size_t i = 0; i < n; ++i) {
79     ret.emplace_back(delegate,
80                      base::ThreadTaskRunner::CreateAndStart("heapprofdunwind"));
81   }
82   return ret;
83 }
84 
ConfigTargetsProcess(const HeapprofdConfig & cfg,const Process & proc,const std::vector<std::string> & normalized_cmdlines)85 bool ConfigTargetsProcess(const HeapprofdConfig& cfg,
86                           const Process& proc,
87                           const std::vector<std::string>& normalized_cmdlines) {
88   if (cfg.all())
89     return true;
90 
91   const auto& pids = cfg.pid();
92   if (std::find(pids.cbegin(), pids.cend(), static_cast<uint64_t>(proc.pid)) !=
93       pids.cend()) {
94     return true;
95   }
96 
97   if (std::find(normalized_cmdlines.cbegin(), normalized_cmdlines.cend(),
98                 proc.cmdline) != normalized_cmdlines.cend()) {
99     return true;
100   }
101   return false;
102 }
103 
104 
IsFile(int fd,const char * fn)105 bool IsFile(int fd, const char* fn) {
106   struct stat fdstat;
107   struct stat fnstat;
108   if (fstat(fd, &fdstat) == -1) {
109     PERFETTO_PLOG("fstat");
110     return false;
111   }
112   if (lstat(fn, &fnstat) == -1) {
113     PERFETTO_PLOG("lstat");
114     return false;
115   }
116   return fdstat.st_ino == fnstat.st_ino;
117 }
118 
119 protos::pbzero::ProfilePacket::ProcessHeapSamples::ClientError
ErrorStateToProto(SharedRingBuffer::ErrorState state)120 ErrorStateToProto(SharedRingBuffer::ErrorState state) {
121   switch (state) {
122     case (SharedRingBuffer::kNoError):
123       return protos::pbzero::ProfilePacket::ProcessHeapSamples::
124           CLIENT_ERROR_NONE;
125     case (SharedRingBuffer::kHitTimeout):
126       return protos::pbzero::ProfilePacket::ProcessHeapSamples::
127           CLIENT_ERROR_HIT_TIMEOUT;
128     case (SharedRingBuffer::kInvalidStackBounds):
129       return protos::pbzero::ProfilePacket::ProcessHeapSamples::
130           CLIENT_ERROR_INVALID_STACK_BOUNDS;
131   }
132 }
133 
134 }  // namespace
135 
HeapprofdConfigToClientConfiguration(const HeapprofdConfig & heapprofd_config,ClientConfiguration * cli_config)136 bool HeapprofdConfigToClientConfiguration(
137     const HeapprofdConfig& heapprofd_config,
138     ClientConfiguration* cli_config) {
139   cli_config->default_interval = heapprofd_config.sampling_interval_bytes();
140   cli_config->block_client = heapprofd_config.block_client();
141   cli_config->disable_fork_teardown = heapprofd_config.disable_fork_teardown();
142   cli_config->disable_vfork_detection =
143       heapprofd_config.disable_vfork_detection();
144   cli_config->block_client_timeout_us =
145       heapprofd_config.block_client_timeout_us();
146   cli_config->all_heaps = heapprofd_config.all_heaps();
147   cli_config->adaptive_sampling_shmem_threshold =
148       heapprofd_config.adaptive_sampling_shmem_threshold();
149   cli_config->adaptive_sampling_max_sampling_interval_bytes =
150       heapprofd_config.adaptive_sampling_max_sampling_interval_bytes();
151   size_t n = 0;
152   const std::vector<std::string>& exclude_heaps = heapprofd_config.exclude_heaps();
153   // heaps[i] and heaps_interval[i] represent that the heap named in heaps[i]
154   // should be sampled with sampling interval of heap_interval[i].
155   std::vector<std::string> heaps = heapprofd_config.heaps();
156   std::vector<uint64_t> heap_intervals =
157       heapprofd_config.heap_sampling_intervals();
158   if (heaps.empty() && !cli_config->all_heaps) {
159     heaps.push_back("libc.malloc");
160   }
161 
162   if (heap_intervals.empty()) {
163     heap_intervals.assign(heaps.size(),
164                           heapprofd_config.sampling_interval_bytes());
165   }
166   if (heap_intervals.size() != heaps.size()) {
167     PERFETTO_ELOG("heap_sampling_intervals and heaps length mismatch.");
168     return false;
169   }
170   if (std::find(heap_intervals.begin(), heap_intervals.end(), 0u) !=
171       heap_intervals.end()) {
172     PERFETTO_ELOG("zero sampling interval.");
173     return false;
174   }
175   if (!exclude_heaps.empty()) {
176     // For disabled heaps, we add explicit entries but with sampling interval
177     // 0. The consumer of the sampling intervals in ClientConfiguration,
178     // GetSamplingInterval in wire_protocol.h, uses 0 to signal a heap is
179     // disabled, either because it isn't enabled (all_heaps is not set, and the
180     // heap isn't named), or because we explicitely set it here.
181     heaps.insert(heaps.end(), exclude_heaps.cbegin(), exclude_heaps.cend());
182     heap_intervals.insert(heap_intervals.end(), exclude_heaps.size(), 0u);
183   }
184   if (heaps.size() > base::ArraySize(cli_config->heaps)) {
185     heaps.resize(base::ArraySize(cli_config->heaps));
186     PERFETTO_ELOG("Too many heaps requested. Truncating.");
187   }
188   for (size_t i = 0; i < heaps.size(); ++i) {
189     const std::string& heap = heaps[i];
190     const uint64_t interval = heap_intervals[i];
191     // -1 for the \0 byte.
192     if (heap.size() > HEAPPROFD_HEAP_NAME_SZ - 1) {
193       PERFETTO_ELOG("Invalid heap name %s (larger than %d)", heap.c_str(),
194                     HEAPPROFD_HEAP_NAME_SZ - 1);
195       continue;
196     }
197     strncpy(&cli_config->heaps[n].name[0], heap.c_str(),
198             sizeof(cli_config->heaps[0].name));
199     cli_config->heaps[n].name[sizeof(cli_config->heaps[0].name) - 1] = '\0';
200     cli_config->heaps[n].interval = interval;
201     n++;
202   }
203   cli_config->num_heaps = n;
204   return true;
205 }
206 
207 // We create kUnwinderThreads unwinding threads. Bookkeeping is done on the main
208 // thread.
HeapprofdProducer(HeapprofdMode mode,base::TaskRunner * task_runner,bool exit_when_done)209 HeapprofdProducer::HeapprofdProducer(HeapprofdMode mode,
210                                      base::TaskRunner* task_runner,
211                                      bool exit_when_done)
212     : task_runner_(task_runner),
213       mode_(mode),
214       exit_when_done_(exit_when_done),
215       unwinding_workers_(MakeUnwindingWorkers(this, kUnwinderThreads)),
216       socket_delegate_(this),
217       weak_factory_(this) {
218   CheckDataSourceCpuTask();
219   CheckDataSourceMemoryTask();
220 }
221 
222 HeapprofdProducer::~HeapprofdProducer() = default;
223 
SetTargetProcess(pid_t target_pid,std::string target_cmdline)224 void HeapprofdProducer::SetTargetProcess(pid_t target_pid,
225                                          std::string target_cmdline) {
226   target_process_.pid = target_pid;
227   target_process_.cmdline = target_cmdline;
228 }
229 
SetDataSourceCallback(std::function<void ()> fn)230 void HeapprofdProducer::SetDataSourceCallback(std::function<void()> fn) {
231   data_source_callback_ = fn;
232 }
233 
AdoptSocket(base::ScopedFile fd)234 void HeapprofdProducer::AdoptSocket(base::ScopedFile fd) {
235   PERFETTO_DCHECK(mode_ == HeapprofdMode::kChild);
236   auto socket = base::UnixSocket::AdoptConnected(
237       std::move(fd), &socket_delegate_, task_runner_, base::SockFamily::kUnix,
238       base::SockType::kStream);
239 
240   HandleClientConnection(std::move(socket), target_process_);
241 }
242 
OnConnect()243 void HeapprofdProducer::OnConnect() {
244   PERFETTO_DCHECK(state_ == kConnecting);
245   state_ = kConnected;
246   ResetConnectionBackoff();
247   PERFETTO_LOG("Connected to the service, mode [%s].",
248                mode_ == HeapprofdMode::kCentral ? "central" : "child");
249 
250   DataSourceDescriptor desc;
251   desc.set_name(kHeapprofdDataSource);
252   desc.set_will_notify_on_stop(true);
253   endpoint_->RegisterDataSource(desc);
254 }
255 
OnDisconnect()256 void HeapprofdProducer::OnDisconnect() {
257   PERFETTO_DCHECK(state_ == kConnected || state_ == kConnecting);
258   PERFETTO_LOG("Disconnected from tracing service");
259 
260   // Do not attempt to reconnect if we're a process-private process, just quit.
261   if (exit_when_done_) {
262     TerminateProcess(/*exit_status=*/1);  // does not return
263   }
264 
265   // Central mode - attempt to reconnect.
266   auto weak_producer = weak_factory_.GetWeakPtr();
267   if (state_ == kConnected)
268     return task_runner_->PostTask([weak_producer] {
269       if (!weak_producer)
270         return;
271       weak_producer->Restart();
272     });
273 
274   state_ = kNotConnected;
275   IncreaseConnectionBackoff();
276   task_runner_->PostDelayedTask(
277       [weak_producer] {
278         if (!weak_producer)
279           return;
280         weak_producer->ConnectService();
281       },
282       connection_backoff_ms_);
283 }
284 
ConnectWithRetries(const char * socket_name)285 void HeapprofdProducer::ConnectWithRetries(const char* socket_name) {
286   PERFETTO_DCHECK(state_ == kNotStarted);
287   state_ = kNotConnected;
288 
289   ResetConnectionBackoff();
290   producer_sock_name_ = socket_name;
291   ConnectService();
292 }
293 
ConnectService()294 void HeapprofdProducer::ConnectService() {
295   SetProducerEndpoint(ProducerIPCClient::Connect(
296       producer_sock_name_, this, "android.heapprofd", task_runner_));
297 }
298 
SetProducerEndpoint(std::unique_ptr<TracingService::ProducerEndpoint> endpoint)299 void HeapprofdProducer::SetProducerEndpoint(
300     std::unique_ptr<TracingService::ProducerEndpoint> endpoint) {
301   PERFETTO_DCHECK(state_ == kNotConnected || state_ == kNotStarted);
302   state_ = kConnecting;
303   endpoint_ = std::move(endpoint);
304 }
305 
IncreaseConnectionBackoff()306 void HeapprofdProducer::IncreaseConnectionBackoff() {
307   connection_backoff_ms_ *= 2;
308   if (connection_backoff_ms_ > kMaxConnectionBackoffMs)
309     connection_backoff_ms_ = kMaxConnectionBackoffMs;
310 }
311 
ResetConnectionBackoff()312 void HeapprofdProducer::ResetConnectionBackoff() {
313   connection_backoff_ms_ = kInitialConnectionBackoffMs;
314 }
315 
Restart()316 void HeapprofdProducer::Restart() {
317   // We lost the connection with the tracing service. At this point we need
318   // to reset all the data sources. Trying to handle that manually is going to
319   // be error prone. What we do here is simply destroy the instance and
320   // recreate it again.
321 
322   // Oneshot producer should not attempt restarts.
323   if (exit_when_done_)
324     PERFETTO_FATAL("Attempting to restart a one shot producer.");
325 
326   HeapprofdMode mode = mode_;
327   base::TaskRunner* task_runner = task_runner_;
328   const char* socket_name = producer_sock_name_;
329   const bool exit_when_done = exit_when_done_;
330 
331   // Invoke destructor and then the constructor again.
332   this->~HeapprofdProducer();
333   new (this) HeapprofdProducer(mode, task_runner, exit_when_done);
334 
335   ConnectWithRetries(socket_name);
336 }
337 
ActiveDataSourceWatchdogCheck()338 void HeapprofdProducer::ActiveDataSourceWatchdogCheck() {
339   PERFETTO_DCHECK(mode_ == HeapprofdMode::kChild);
340 
341   // Fork mode heapprofd should be working on exactly one data source matching
342   // its target process.
343   if (data_sources_.empty()) {
344     PERFETTO_LOG(
345         "Child heapprofd exiting as it never received a data source for the "
346         "target process, or somehow lost/finished the task without exiting.");
347     TerminateProcess(/*exit_status=*/1);
348   } else {
349     // reschedule check.
350     auto weak_producer = weak_factory_.GetWeakPtr();
351     task_runner_->PostDelayedTask(
352         [weak_producer]() {
353           if (!weak_producer)
354             return;
355           weak_producer->ActiveDataSourceWatchdogCheck();
356         },
357         kChildModeWatchdogPeriodMs);
358   }
359 }
360 
361 // TODO(rsavitski): would be cleaner to shut down the event loop instead
362 // (letting main exit). One test-friendly approach is to supply a shutdown
363 // callback in the constructor.
TerminateProcess(int exit_status)364 __attribute__((noreturn)) void HeapprofdProducer::TerminateProcess(
365     int exit_status) {
366   PERFETTO_CHECK(mode_ == HeapprofdMode::kChild);
367   PERFETTO_LOG("Shutting down child heapprofd (status %d).", exit_status);
368   exit(exit_status);
369 }
370 
OnTracingSetup()371 void HeapprofdProducer::OnTracingSetup() {}
372 
WriteRejectedConcurrentSession(BufferID buffer_id,pid_t pid)373 void HeapprofdProducer::WriteRejectedConcurrentSession(BufferID buffer_id,
374                                                        pid_t pid) {
375   auto trace_writer = endpoint_->CreateTraceWriter(buffer_id);
376   auto trace_packet = trace_writer->NewTracePacket();
377   trace_packet->set_timestamp(
378       static_cast<uint64_t>(base::GetBootTimeNs().count()));
379   auto profile_packet = trace_packet->set_profile_packet();
380   auto process_dump = profile_packet->add_process_dumps();
381   process_dump->set_pid(static_cast<uint64_t>(pid));
382   process_dump->set_rejected_concurrent(true);
383   trace_packet->Finalize();
384   trace_writer->Flush();
385 }
386 
SetupDataSource(DataSourceInstanceID id,const DataSourceConfig & ds_config)387 void HeapprofdProducer::SetupDataSource(DataSourceInstanceID id,
388                                         const DataSourceConfig& ds_config) {
389   if (ds_config.session_initiator() ==
390       DataSourceConfig::SESSION_INITIATOR_TRUSTED_SYSTEM) {
391     PERFETTO_LOG("Setting up datasource: statsd initiator.");
392   } else {
393     PERFETTO_LOG("Setting up datasource: non-statsd initiator.");
394   }
395   if (mode_ == HeapprofdMode::kChild && ds_config.enable_extra_guardrails()) {
396     PERFETTO_ELOG("enable_extra_guardrails is not supported on user.");
397     return;
398   }
399 
400   HeapprofdConfig heapprofd_config;
401   heapprofd_config.ParseFromString(ds_config.heapprofd_config_raw());
402 
403   if (heapprofd_config.all() && !heapprofd_config.pid().empty())
404     PERFETTO_ELOG("No point setting all and pid");
405   if (heapprofd_config.all() && !heapprofd_config.process_cmdline().empty())
406     PERFETTO_ELOG("No point setting all and process_cmdline");
407 
408   if (ds_config.name() != kHeapprofdDataSource) {
409     PERFETTO_DLOG("Invalid data source name.");
410     return;
411   }
412 
413   if (data_sources_.find(id) != data_sources_.end()) {
414     PERFETTO_DFATAL_OR_ELOG(
415         "Received duplicated data source instance id: %" PRIu64, id);
416     return;
417   }
418 
419   base::Optional<std::vector<std::string>> normalized_cmdlines =
420       NormalizeCmdlines(heapprofd_config.process_cmdline());
421   if (!normalized_cmdlines.has_value()) {
422     PERFETTO_ELOG("Rejecting data source due to invalid cmdline in config.");
423     return;
424   }
425 
426   // Child mode is only interested in the first data source matching the
427   // already-connected process.
428   if (mode_ == HeapprofdMode::kChild) {
429     if (!ConfigTargetsProcess(heapprofd_config, target_process_,
430                               normalized_cmdlines.value())) {
431       PERFETTO_DLOG("Child mode skipping setup of unrelated data source.");
432       return;
433     }
434 
435     if (!data_sources_.empty()) {
436       PERFETTO_LOG("Child mode skipping concurrent data source.");
437 
438       // Manually write one ProfilePacket about the rejected session.
439       auto buffer_id = static_cast<BufferID>(ds_config.target_buffer());
440       WriteRejectedConcurrentSession(buffer_id, target_process_.pid);
441       return;
442     }
443   }
444 
445   base::Optional<uint64_t> start_cputime_sec;
446   if (heapprofd_config.max_heapprofd_cpu_secs() > 0) {
447     start_cputime_sec = GetCputimeSecForCurrentProcess();
448 
449     if (!start_cputime_sec) {
450       PERFETTO_ELOG("Failed to enforce CPU guardrail. Rejecting config.");
451       return;
452     }
453   }
454 
455   auto buffer_id = static_cast<BufferID>(ds_config.target_buffer());
456   DataSource data_source(endpoint_->CreateTraceWriter(buffer_id));
457   data_source.id = id;
458   auto& cli_config = data_source.client_configuration;
459   if (!HeapprofdConfigToClientConfiguration(heapprofd_config, &cli_config))
460     return;
461   data_source.config = heapprofd_config;
462   data_source.ds_config = ds_config;
463   data_source.normalized_cmdlines = std::move(normalized_cmdlines.value());
464   data_source.stop_timeout_ms = ds_config.stop_timeout_ms()
465                                     ? ds_config.stop_timeout_ms()
466                                     : 5000 /* kDataSourceStopTimeoutMs */;
467   data_source.guardrail_config.cpu_start_secs = start_cputime_sec;
468   data_source.guardrail_config.memory_guardrail_kb =
469       heapprofd_config.max_heapprofd_memory_kb();
470   data_source.guardrail_config.cpu_guardrail_sec =
471       heapprofd_config.max_heapprofd_cpu_secs();
472 
473   InterningOutputTracker::WriteFixedInterningsPacket(
474       data_source.trace_writer.get(),
475       protos::pbzero::TracePacket::SEQ_INCREMENTAL_STATE_CLEARED);
476   data_sources_.emplace(id, std::move(data_source));
477   PERFETTO_DLOG("Set up data source.");
478 
479   if (mode_ == HeapprofdMode::kChild && data_source_callback_)
480     (*data_source_callback_)();
481 }
482 
IsPidProfiled(pid_t pid)483 bool HeapprofdProducer::IsPidProfiled(pid_t pid) {
484   return std::any_of(
485       data_sources_.cbegin(), data_sources_.cend(),
486       [pid](const std::pair<const DataSourceInstanceID, DataSource>& p) {
487         const DataSource& ds = p.second;
488         return ds.process_states.count(pid) > 0;
489       });
490 }
491 
SetStartupProperties(DataSource * data_source)492 void HeapprofdProducer::SetStartupProperties(DataSource* data_source) {
493   const HeapprofdConfig& heapprofd_config = data_source->config;
494   if (heapprofd_config.all())
495     data_source->properties.emplace_back(properties_.SetAll());
496 
497   for (std::string cmdline : data_source->normalized_cmdlines)
498     data_source->properties.emplace_back(
499         properties_.SetProperty(std::move(cmdline)));
500 }
501 
SignalRunningProcesses(DataSource * data_source)502 void HeapprofdProducer::SignalRunningProcesses(DataSource* data_source) {
503   const HeapprofdConfig& heapprofd_config = data_source->config;
504 
505   std::set<pid_t> pids;
506   if (heapprofd_config.all())
507     FindAllProfilablePids(&pids);
508   for (uint64_t pid : heapprofd_config.pid())
509     pids.emplace(static_cast<pid_t>(pid));
510 
511   if (!data_source->normalized_cmdlines.empty())
512     FindPidsForCmdlines(data_source->normalized_cmdlines, &pids);
513 
514   if (heapprofd_config.min_anonymous_memory_kb() > 0)
515     RemoveUnderAnonThreshold(heapprofd_config.min_anonymous_memory_kb(), &pids);
516 
517   for (auto pid_it = pids.cbegin(); pid_it != pids.cend();) {
518     pid_t pid = *pid_it;
519     if (IsPidProfiled(pid)) {
520       PERFETTO_LOG("Rejecting concurrent session for %" PRIdMAX,
521                    static_cast<intmax_t>(pid));
522       data_source->rejected_pids.emplace(pid);
523       pid_it = pids.erase(pid_it);
524       continue;
525     }
526 
527     PERFETTO_DLOG("Sending signal: %d (si_value: %d) to pid: %d",
528                   kProfilingSignal, kHeapprofdSignalValue, pid);
529     union sigval signal_value;
530     signal_value.sival_int = kHeapprofdSignalValue;
531     if (sigqueue(pid, kProfilingSignal, signal_value) != 0) {
532       PERFETTO_DPLOG("sigqueue");
533     }
534     ++pid_it;
535   }
536   data_source->signaled_pids = std::move(pids);
537 }
538 
StartDataSource(DataSourceInstanceID id,const DataSourceConfig &)539 void HeapprofdProducer::StartDataSource(DataSourceInstanceID id,
540                                         const DataSourceConfig&) {
541   PERFETTO_DLOG("Starting data source %" PRIu64, id);
542 
543   auto it = data_sources_.find(id);
544   if (it == data_sources_.end()) {
545     // This is expected in child heapprofd, where we reject uninteresting data
546     // sources in SetupDataSource.
547     if (mode_ == HeapprofdMode::kCentral) {
548       PERFETTO_DFATAL_OR_ELOG(
549           "Received invalid data source instance to start: %" PRIu64, id);
550     }
551     return;
552   }
553 
554   DataSource& data_source = it->second;
555   if (data_source.started) {
556     PERFETTO_DFATAL_OR_ELOG(
557         "Trying to start already started data-source: %" PRIu64, id);
558     return;
559   }
560   const HeapprofdConfig& heapprofd_config = data_source.config;
561 
562   // Central daemon - set system properties for any targets that start later,
563   // and signal already-running targets to start the profiling client.
564   if (mode_ == HeapprofdMode::kCentral) {
565     if (!heapprofd_config.no_startup())
566       SetStartupProperties(&data_source);
567     if (!heapprofd_config.no_running())
568       SignalRunningProcesses(&data_source);
569   }
570 
571   const auto continuous_dump_config = heapprofd_config.continuous_dump_config();
572   uint32_t dump_interval = continuous_dump_config.dump_interval_ms();
573   if (dump_interval) {
574     auto weak_producer = weak_factory_.GetWeakPtr();
575     task_runner_->PostDelayedTask(
576         [weak_producer, id, dump_interval] {
577           if (!weak_producer)
578             return;
579           weak_producer->DoContinuousDump(id, dump_interval);
580         },
581         continuous_dump_config.dump_phase_ms());
582   }
583   data_source.started = true;
584   PERFETTO_DLOG("Started DataSource");
585 }
586 
UnwinderForPID(pid_t pid)587 UnwindingWorker& HeapprofdProducer::UnwinderForPID(pid_t pid) {
588   return unwinding_workers_[static_cast<uint64_t>(pid) % kUnwinderThreads];
589 }
590 
StopDataSource(DataSourceInstanceID id)591 void HeapprofdProducer::StopDataSource(DataSourceInstanceID id) {
592   auto it = data_sources_.find(id);
593   if (it == data_sources_.end()) {
594     endpoint_->NotifyDataSourceStopped(id);
595     if (mode_ == HeapprofdMode::kCentral)
596       PERFETTO_DFATAL_OR_ELOG(
597           "Trying to stop non existing data source: %" PRIu64, id);
598     return;
599   }
600 
601   PERFETTO_DLOG("Stopping data source %" PRIu64, id);
602 
603   DataSource& data_source = it->second;
604   data_source.was_stopped = true;
605   ShutdownDataSource(&data_source);
606 }
607 
ShutdownDataSource(DataSource * data_source)608 void HeapprofdProducer::ShutdownDataSource(DataSource* data_source) {
609   data_source->shutting_down = true;
610   // If no processes connected, or all of them have already disconnected
611   // (and have been dumped) and no PIDs have been rejected,
612   // MaybeFinishDataSource can tear down the data source.
613   if (MaybeFinishDataSource(data_source))
614     return;
615 
616   if (!data_source->rejected_pids.empty()) {
617     auto trace_packet = data_source->trace_writer->NewTracePacket();
618     ProfilePacket* profile_packet = trace_packet->set_profile_packet();
619     for (pid_t rejected_pid : data_source->rejected_pids) {
620       ProfilePacket::ProcessHeapSamples* proto =
621           profile_packet->add_process_dumps();
622       proto->set_pid(static_cast<uint64_t>(rejected_pid));
623       proto->set_rejected_concurrent(true);
624     }
625     trace_packet->Finalize();
626     data_source->rejected_pids.clear();
627     if (MaybeFinishDataSource(data_source))
628       return;
629   }
630 
631   for (const auto& pid_and_process_state : data_source->process_states) {
632     pid_t pid = pid_and_process_state.first;
633     UnwinderForPID(pid).PostDisconnectSocket(pid);
634   }
635 
636   auto id = data_source->id;
637   auto weak_producer = weak_factory_.GetWeakPtr();
638   task_runner_->PostDelayedTask(
639       [weak_producer, id] {
640         if (!weak_producer)
641           return;
642         auto ds_it = weak_producer->data_sources_.find(id);
643         if (ds_it != weak_producer->data_sources_.end()) {
644           PERFETTO_ELOG("Final dump timed out.");
645           DataSource& ds = ds_it->second;
646           // Do not dump any stragglers, just trigger the Flush and tear down
647           // the data source.
648           ds.process_states.clear();
649           ds.rejected_pids.clear();
650           PERFETTO_CHECK(weak_producer->MaybeFinishDataSource(&ds));
651         }
652       },
653       data_source->stop_timeout_ms);
654 }
655 
DoContinuousDump(DataSourceInstanceID id,uint32_t dump_interval)656 void HeapprofdProducer::DoContinuousDump(DataSourceInstanceID id,
657                                          uint32_t dump_interval) {
658   auto it = data_sources_.find(id);
659   if (it == data_sources_.end())
660     return;
661   DataSource& data_source = it->second;
662   DumpProcessesInDataSource(&data_source);
663   auto weak_producer = weak_factory_.GetWeakPtr();
664   task_runner_->PostDelayedTask(
665       [weak_producer, id, dump_interval] {
666         if (!weak_producer)
667           return;
668         weak_producer->DoContinuousDump(id, dump_interval);
669       },
670       dump_interval);
671 }
672 
673 // static
SetStats(protos::pbzero::ProfilePacket::ProcessStats * stats,const ProcessState & process_state)674 void HeapprofdProducer::SetStats(
675     protos::pbzero::ProfilePacket::ProcessStats* stats,
676     const ProcessState& process_state) {
677   stats->set_unwinding_errors(process_state.unwinding_errors);
678   stats->set_heap_samples(process_state.heap_samples);
679   stats->set_map_reparses(process_state.map_reparses);
680   stats->set_total_unwinding_time_us(process_state.total_unwinding_time_us);
681   stats->set_client_spinlock_blocked_us(
682       process_state.client_spinlock_blocked_us);
683   auto* unwinding_hist = stats->set_unwinding_time_us();
684   for (const auto& p : process_state.unwinding_time_us.GetData()) {
685     auto* bucket = unwinding_hist->add_buckets();
686     if (p.first == LogHistogram::kMaxBucket)
687       bucket->set_max_bucket(true);
688     else
689       bucket->set_upper_limit(p.first);
690     bucket->set_count(p.second);
691   }
692 }
693 
DumpProcessState(DataSource * data_source,pid_t pid,ProcessState * process_state)694 void HeapprofdProducer::DumpProcessState(DataSource* data_source,
695                                          pid_t pid,
696                                          ProcessState* process_state) {
697   for (auto& heap_id_and_heap_info : process_state->heap_infos) {
698     ProcessState::HeapInfo& heap_info = heap_id_and_heap_info.second;
699 
700     bool from_startup = data_source->signaled_pids.find(pid) ==
701                         data_source->signaled_pids.cend();
702     uint64_t dump_timestamp;
703     if (data_source->config.dump_at_max())
704       dump_timestamp = heap_info.heap_tracker.max_timestamp();
705     else
706       dump_timestamp = heap_info.heap_tracker.committed_timestamp();
707 
708     const char* heap_name = nullptr;
709     if (!heap_info.heap_name.empty())
710       heap_name = heap_info.heap_name.c_str();
711     uint64_t sampling_interval = heap_info.sampling_interval;
712     uint64_t orig_sampling_interval = heap_info.orig_sampling_interval;
713 
714     auto new_heapsamples =
715         [pid, from_startup, dump_timestamp, process_state, data_source,
716          heap_name, sampling_interval,
717          orig_sampling_interval](ProfilePacket::ProcessHeapSamples* proto) {
718           proto->set_pid(static_cast<uint64_t>(pid));
719           proto->set_timestamp(dump_timestamp);
720           proto->set_from_startup(from_startup);
721           proto->set_disconnected(process_state->disconnected);
722           proto->set_buffer_overran(process_state->error_state ==
723                                     SharedRingBuffer::kHitTimeout);
724           proto->set_client_error(
725               ErrorStateToProto(process_state->error_state));
726           proto->set_buffer_corrupted(process_state->buffer_corrupted);
727           proto->set_hit_guardrail(data_source->hit_guardrail);
728           if (heap_name)
729             proto->set_heap_name(heap_name);
730           proto->set_sampling_interval_bytes(sampling_interval);
731           proto->set_orig_sampling_interval_bytes(orig_sampling_interval);
732           auto* stats = proto->set_stats();
733           SetStats(stats, *process_state);
734         };
735 
736     DumpState dump_state(data_source->trace_writer.get(),
737                          std::move(new_heapsamples),
738                          &data_source->intern_state);
739 
740     heap_info.heap_tracker.GetCallstackAllocations(
741         [&dump_state,
742          &data_source](const HeapTracker::CallstackAllocations& alloc) {
743           dump_state.WriteAllocation(alloc, data_source->config.dump_at_max());
744         });
745     dump_state.DumpCallstacks(&callsites_);
746   }
747 }
748 
DumpProcessesInDataSource(DataSource * ds)749 void HeapprofdProducer::DumpProcessesInDataSource(DataSource* ds) {
750   for (std::pair<const pid_t, ProcessState>& pid_and_process_state :
751        ds->process_states) {
752     pid_t pid = pid_and_process_state.first;
753     ProcessState& process_state = pid_and_process_state.second;
754     DumpProcessState(ds, pid, &process_state);
755   }
756 }
757 
DumpAll()758 void HeapprofdProducer::DumpAll() {
759   PERFETTO_LOG("Received signal. Dumping all data sources.");
760   for (auto& id_and_data_source : data_sources_)
761     DumpProcessesInDataSource(&id_and_data_source.second);
762 }
763 
Flush(FlushRequestID flush_id,const DataSourceInstanceID * ids,size_t num_ids)764 void HeapprofdProducer::Flush(FlushRequestID flush_id,
765                               const DataSourceInstanceID* ids,
766                               size_t num_ids) {
767   size_t& flush_in_progress = flushes_in_progress_[flush_id];
768   PERFETTO_DCHECK(flush_in_progress == 0);
769   flush_in_progress = num_ids;
770   for (size_t i = 0; i < num_ids; ++i) {
771     auto it = data_sources_.find(ids[i]);
772     if (it == data_sources_.end()) {
773       PERFETTO_DFATAL_OR_ELOG("Trying to flush unknown data-source %" PRIu64,
774                               ids[i]);
775       flush_in_progress--;
776       continue;
777     }
778     DataSource& data_source = it->second;
779     auto weak_producer = weak_factory_.GetWeakPtr();
780 
781     auto callback = [weak_producer, flush_id] {
782       if (weak_producer)
783         // Reposting because this task runner could be on a different thread
784         // than the IPC task runner.
785         return weak_producer->task_runner_->PostTask([weak_producer, flush_id] {
786           if (weak_producer)
787             return weak_producer->FinishDataSourceFlush(flush_id);
788         });
789     };
790     data_source.trace_writer->Flush(std::move(callback));
791   }
792   if (flush_in_progress == 0) {
793     endpoint_->NotifyFlushComplete(flush_id);
794     flushes_in_progress_.erase(flush_id);
795   }
796 }
797 
FinishDataSourceFlush(FlushRequestID flush_id)798 void HeapprofdProducer::FinishDataSourceFlush(FlushRequestID flush_id) {
799   auto it = flushes_in_progress_.find(flush_id);
800   if (it == flushes_in_progress_.end()) {
801     PERFETTO_DFATAL_OR_ELOG("FinishDataSourceFlush id invalid: %" PRIu64,
802                             flush_id);
803     return;
804   }
805   size_t& flush_in_progress = it->second;
806   if (--flush_in_progress == 0) {
807     endpoint_->NotifyFlushComplete(flush_id);
808     flushes_in_progress_.erase(flush_id);
809   }
810 }
811 
OnDisconnect(base::UnixSocket * self)812 void HeapprofdProducer::SocketDelegate::OnDisconnect(base::UnixSocket* self) {
813   auto it = producer_->pending_processes_.find(self->peer_pid_linux());
814   if (it == producer_->pending_processes_.end()) {
815     PERFETTO_DFATAL_OR_ELOG("Unexpected disconnect.");
816     return;
817   }
818 
819   if (self == it->second.sock.get())
820     producer_->pending_processes_.erase(it);
821 }
822 
OnNewIncomingConnection(base::UnixSocket *,std::unique_ptr<base::UnixSocket> new_connection)823 void HeapprofdProducer::SocketDelegate::OnNewIncomingConnection(
824     base::UnixSocket*,
825     std::unique_ptr<base::UnixSocket> new_connection) {
826   Process peer_process;
827   peer_process.pid = new_connection->peer_pid_linux();
828   if (!GetCmdlineForPID(peer_process.pid, &peer_process.cmdline))
829     PERFETTO_PLOG("Failed to get cmdline for %d", peer_process.pid);
830 
831   producer_->HandleClientConnection(std::move(new_connection), peer_process);
832 }
833 
OnDataAvailable(base::UnixSocket * self)834 void HeapprofdProducer::SocketDelegate::OnDataAvailable(
835     base::UnixSocket* self) {
836   auto it = producer_->pending_processes_.find(self->peer_pid_linux());
837   if (it == producer_->pending_processes_.end()) {
838     PERFETTO_DFATAL_OR_ELOG("Unexpected data.");
839     return;
840   }
841 
842   PendingProcess& pending_process = it->second;
843 
844   base::ScopedFile fds[kHandshakeSize];
845   char buf[1];
846   self->Receive(buf, sizeof(buf), fds, base::ArraySize(fds));
847 
848   static_assert(kHandshakeSize == 2, "change if and else if below.");
849   if (fds[kHandshakeMaps] && fds[kHandshakeMem]) {
850     auto ds_it =
851         producer_->data_sources_.find(pending_process.data_source_instance_id);
852     if (ds_it == producer_->data_sources_.end()) {
853       producer_->pending_processes_.erase(it);
854       return;
855     }
856     DataSource& data_source = ds_it->second;
857 
858     if (data_source.shutting_down) {
859       producer_->pending_processes_.erase(it);
860       PERFETTO_LOG("Got handshake for DS that is shutting down. Rejecting.");
861       return;
862     }
863 
864     std::string maps_file =
865         "/proc/" + std::to_string(self->peer_pid_linux()) + "/maps";
866     if (!IsFile(*fds[kHandshakeMaps], maps_file.c_str())) {
867       producer_->pending_processes_.erase(it);
868       PERFETTO_ELOG("Received invalid maps FD.");
869       return;
870     }
871 
872     std::string mem_file =
873         "/proc/" + std::to_string(self->peer_pid_linux()) + "/mem";
874     if (!IsFile(*fds[kHandshakeMem], mem_file.c_str())) {
875       producer_->pending_processes_.erase(it);
876       PERFETTO_ELOG("Received invalid mem FD.");
877       return;
878     }
879 
880     data_source.process_states.emplace(
881         std::piecewise_construct, std::forward_as_tuple(self->peer_pid_linux()),
882         std::forward_as_tuple(&producer_->callsites_,
883                               data_source.config.dump_at_max()));
884 
885     PERFETTO_DLOG("%d: Received FDs.", self->peer_pid_linux());
886     int raw_fd = pending_process.shmem.fd();
887     // TODO(fmayer): Full buffer could deadlock us here.
888     if (!self->Send(&data_source.client_configuration,
889                     sizeof(data_source.client_configuration), &raw_fd, 1)) {
890       // If Send fails, the socket will have been Shutdown, and the raw socket
891       // closed.
892       producer_->pending_processes_.erase(it);
893       return;
894     }
895 
896     UnwindingWorker::HandoffData handoff_data;
897     handoff_data.data_source_instance_id =
898         pending_process.data_source_instance_id;
899     handoff_data.sock = self->ReleaseSocket();
900     handoff_data.maps_fd = std::move(fds[kHandshakeMaps]);
901     handoff_data.mem_fd = std::move(fds[kHandshakeMem]);
902     handoff_data.shmem = std::move(pending_process.shmem);
903     handoff_data.client_config = data_source.client_configuration;
904     handoff_data.stream_allocations = data_source.config.stream_allocations();
905 
906     producer_->UnwinderForPID(self->peer_pid_linux())
907         .PostHandoffSocket(std::move(handoff_data));
908     producer_->pending_processes_.erase(it);
909   } else if (fds[kHandshakeMaps] || fds[kHandshakeMem]) {
910     PERFETTO_DFATAL_OR_ELOG("%d: Received partial FDs.",
911                             self->peer_pid_linux());
912     producer_->pending_processes_.erase(it);
913   } else {
914     PERFETTO_ELOG("%d: Received no FDs.", self->peer_pid_linux());
915   }
916 }
917 
GetDataSourceForProcess(const Process & proc)918 HeapprofdProducer::DataSource* HeapprofdProducer::GetDataSourceForProcess(
919     const Process& proc) {
920   for (auto& ds_id_and_datasource : data_sources_) {
921     DataSource& ds = ds_id_and_datasource.second;
922     if (ConfigTargetsProcess(ds.config, proc, ds.normalized_cmdlines))
923       return &ds;
924   }
925   return nullptr;
926 }
927 
RecordOtherSourcesAsRejected(DataSource * active_ds,const Process & proc)928 void HeapprofdProducer::RecordOtherSourcesAsRejected(DataSource* active_ds,
929                                                      const Process& proc) {
930   for (auto& ds_id_and_datasource : data_sources_) {
931     DataSource& ds = ds_id_and_datasource.second;
932     if (&ds != active_ds &&
933         ConfigTargetsProcess(ds.config, proc, ds.normalized_cmdlines))
934       ds.rejected_pids.emplace(proc.pid);
935   }
936 }
937 
HandleClientConnection(std::unique_ptr<base::UnixSocket> new_connection,Process process)938 void HeapprofdProducer::HandleClientConnection(
939     std::unique_ptr<base::UnixSocket> new_connection,
940     Process process) {
941   DataSource* data_source = GetDataSourceForProcess(process);
942   if (!data_source) {
943     PERFETTO_LOG("No data source found.");
944     return;
945   }
946   RecordOtherSourcesAsRejected(data_source, process);
947 
948   // In fork mode, right now we check whether the target is not profileable
949   // in the client, because we cannot read packages.list there.
950   if (mode_ == HeapprofdMode::kCentral &&
951       !CanProfile(data_source->ds_config, new_connection->peer_uid_posix(),
952                   data_source->config.target_installed_by())) {
953     PERFETTO_ELOG("%d (%s) is not profileable.", process.pid,
954                   process.cmdline.c_str());
955     return;
956   }
957 
958   uint64_t shmem_size = data_source->config.shmem_size_bytes();
959   if (!shmem_size)
960     shmem_size = kDefaultShmemSize;
961   if (shmem_size > kMaxShmemSize) {
962     PERFETTO_LOG("Specified shared memory size of %" PRIu64
963                  " exceeds maximum size of %" PRIu64 ". Reducing.",
964                  shmem_size, kMaxShmemSize);
965     shmem_size = kMaxShmemSize;
966   }
967 
968   auto shmem = SharedRingBuffer::Create(static_cast<size_t>(shmem_size));
969   if (!shmem || !shmem->is_valid()) {
970     PERFETTO_LOG("Failed to create shared memory.");
971     return;
972   }
973 
974   pid_t peer_pid = new_connection->peer_pid_linux();
975   if (peer_pid != process.pid) {
976     PERFETTO_DFATAL_OR_ELOG("Invalid PID connected.");
977     return;
978   }
979 
980   PendingProcess pending_process;
981   pending_process.sock = std::move(new_connection);
982   pending_process.data_source_instance_id = data_source->id;
983   pending_process.shmem = std::move(*shmem);
984   pending_processes_.emplace(peer_pid, std::move(pending_process));
985 }
986 
PostAllocRecord(UnwindingWorker * worker,std::unique_ptr<AllocRecord> alloc_rec)987 void HeapprofdProducer::PostAllocRecord(
988     UnwindingWorker* worker,
989     std::unique_ptr<AllocRecord> alloc_rec) {
990   // Once we can use C++14, this should be std::moved into the lambda instead.
991   auto* raw_alloc_rec = alloc_rec.release();
992   auto weak_this = weak_factory_.GetWeakPtr();
993   task_runner_->PostTask([weak_this, raw_alloc_rec, worker] {
994     std::unique_ptr<AllocRecord> unique_alloc_ref =
995         std::unique_ptr<AllocRecord>(raw_alloc_rec);
996     if (weak_this) {
997       weak_this->HandleAllocRecord(unique_alloc_ref.get());
998       worker->ReturnAllocRecord(std::move(unique_alloc_ref));
999     }
1000   });
1001 }
1002 
PostFreeRecord(UnwindingWorker *,std::vector<FreeRecord> free_recs)1003 void HeapprofdProducer::PostFreeRecord(UnwindingWorker*,
1004                                        std::vector<FreeRecord> free_recs) {
1005   // Once we can use C++14, this should be std::moved into the lambda instead.
1006   std::vector<FreeRecord>* raw_free_recs =
1007       new std::vector<FreeRecord>(std::move(free_recs));
1008   auto weak_this = weak_factory_.GetWeakPtr();
1009   task_runner_->PostTask([weak_this, raw_free_recs] {
1010     if (weak_this) {
1011       for (FreeRecord& free_rec : *raw_free_recs)
1012         weak_this->HandleFreeRecord(std::move(free_rec));
1013     }
1014     delete raw_free_recs;
1015   });
1016 }
1017 
PostHeapNameRecord(UnwindingWorker *,HeapNameRecord rec)1018 void HeapprofdProducer::PostHeapNameRecord(UnwindingWorker*,
1019                                            HeapNameRecord rec) {
1020   auto weak_this = weak_factory_.GetWeakPtr();
1021   task_runner_->PostTask([weak_this, rec] {
1022     if (weak_this)
1023       weak_this->HandleHeapNameRecord(rec);
1024   });
1025 }
1026 
PostSocketDisconnected(UnwindingWorker *,DataSourceInstanceID ds_id,pid_t pid,SharedRingBuffer::Stats stats)1027 void HeapprofdProducer::PostSocketDisconnected(UnwindingWorker*,
1028                                                DataSourceInstanceID ds_id,
1029                                                pid_t pid,
1030                                                SharedRingBuffer::Stats stats) {
1031   auto weak_this = weak_factory_.GetWeakPtr();
1032   task_runner_->PostTask([weak_this, ds_id, pid, stats] {
1033     if (weak_this)
1034       weak_this->HandleSocketDisconnected(ds_id, pid, stats);
1035   });
1036 }
1037 
HandleAllocRecord(AllocRecord * alloc_rec)1038 void HeapprofdProducer::HandleAllocRecord(AllocRecord* alloc_rec) {
1039   const AllocMetadata& alloc_metadata = alloc_rec->alloc_metadata;
1040   auto it = data_sources_.find(alloc_rec->data_source_instance_id);
1041   if (it == data_sources_.end()) {
1042     PERFETTO_LOG("Invalid data source in alloc record.");
1043     return;
1044   }
1045 
1046   DataSource& ds = it->second;
1047   auto process_state_it = ds.process_states.find(alloc_rec->pid);
1048   if (process_state_it == ds.process_states.end()) {
1049     PERFETTO_LOG("Invalid PID in alloc record.");
1050     return;
1051   }
1052 
1053   if (ds.config.stream_allocations()) {
1054     auto packet = ds.trace_writer->NewTracePacket();
1055     auto* streaming_alloc = packet->set_streaming_allocation();
1056     streaming_alloc->add_address(alloc_metadata.alloc_address);
1057     streaming_alloc->add_size(alloc_metadata.alloc_size);
1058     streaming_alloc->add_sample_size(alloc_metadata.sample_size);
1059     streaming_alloc->add_clock_monotonic_coarse_timestamp(
1060         alloc_metadata.clock_monotonic_coarse_timestamp);
1061     streaming_alloc->add_heap_id(alloc_metadata.heap_id);
1062     streaming_alloc->add_sequence_number(alloc_metadata.sequence_number);
1063     return;
1064   }
1065 
1066   const auto& prefixes = ds.config.skip_symbol_prefix();
1067   if (!prefixes.empty()) {
1068     for (unwindstack::FrameData& frame_data : alloc_rec->frames) {
1069       const std::string& map = frame_data.map_name;
1070       if (std::find_if(prefixes.cbegin(), prefixes.cend(),
1071                        [&map](const std::string& prefix) {
1072                          return base::StartsWith(map, prefix);
1073                        }) != prefixes.cend()) {
1074         frame_data.function_name = "FILTERED";
1075       }
1076     }
1077   }
1078 
1079   ProcessState& process_state = process_state_it->second;
1080   HeapTracker& heap_tracker =
1081       process_state.GetHeapTracker(alloc_rec->alloc_metadata.heap_id);
1082 
1083   if (alloc_rec->error)
1084     process_state.unwinding_errors++;
1085   if (alloc_rec->reparsed_map)
1086     process_state.map_reparses++;
1087   process_state.heap_samples++;
1088   process_state.unwinding_time_us.Add(alloc_rec->unwinding_time_us);
1089   process_state.total_unwinding_time_us += alloc_rec->unwinding_time_us;
1090 
1091   // abspc may no longer refer to the same functions, as we had to reparse
1092   // maps. Reset the cache.
1093   if (alloc_rec->reparsed_map)
1094     heap_tracker.ClearFrameCache();
1095 
1096   heap_tracker.RecordMalloc(
1097       alloc_rec->frames, alloc_rec->build_ids, alloc_metadata.alloc_address,
1098       alloc_metadata.sample_size, alloc_metadata.alloc_size,
1099       alloc_metadata.sequence_number,
1100       alloc_metadata.clock_monotonic_coarse_timestamp);
1101 }
1102 
HandleFreeRecord(FreeRecord free_rec)1103 void HeapprofdProducer::HandleFreeRecord(FreeRecord free_rec) {
1104   auto it = data_sources_.find(free_rec.data_source_instance_id);
1105   if (it == data_sources_.end()) {
1106     PERFETTO_LOG("Invalid data source in free record.");
1107     return;
1108   }
1109 
1110   DataSource& ds = it->second;
1111   auto process_state_it = ds.process_states.find(free_rec.pid);
1112   if (process_state_it == ds.process_states.end()) {
1113     PERFETTO_LOG("Invalid PID in free record.");
1114     return;
1115   }
1116 
1117   if (ds.config.stream_allocations()) {
1118     auto packet = ds.trace_writer->NewTracePacket();
1119     auto* streaming_free = packet->set_streaming_free();
1120     streaming_free->add_address(free_rec.entry.addr);
1121     streaming_free->add_heap_id(free_rec.entry.heap_id);
1122     streaming_free->add_sequence_number(free_rec.entry.sequence_number);
1123     return;
1124   }
1125 
1126   ProcessState& process_state = process_state_it->second;
1127 
1128   const FreeEntry& entry = free_rec.entry;
1129   HeapTracker& heap_tracker = process_state.GetHeapTracker(entry.heap_id);
1130   heap_tracker.RecordFree(entry.addr, entry.sequence_number, 0);
1131 }
1132 
HandleHeapNameRecord(HeapNameRecord rec)1133 void HeapprofdProducer::HandleHeapNameRecord(HeapNameRecord rec) {
1134   auto it = data_sources_.find(rec.data_source_instance_id);
1135   if (it == data_sources_.end()) {
1136     PERFETTO_LOG("Invalid data source in free record.");
1137     return;
1138   }
1139 
1140   DataSource& ds = it->second;
1141   auto process_state_it = ds.process_states.find(rec.pid);
1142   if (process_state_it == ds.process_states.end()) {
1143     PERFETTO_LOG("Invalid PID in free record.");
1144     return;
1145   }
1146 
1147   ProcessState& process_state = process_state_it->second;
1148   const HeapName& entry = rec.entry;
1149   if (entry.heap_name[0] != '\0') {
1150     std::string heap_name = entry.heap_name;
1151     if (entry.heap_id == 0) {
1152       PERFETTO_ELOG("Invalid zero heap ID.");
1153       return;
1154     }
1155     ProcessState::HeapInfo& hi = process_state.GetHeapInfo(entry.heap_id);
1156     if (!hi.heap_name.empty() && hi.heap_name != heap_name) {
1157       PERFETTO_ELOG("Overriding heap name %s with %s", hi.heap_name.c_str(),
1158                     heap_name.c_str());
1159     }
1160     hi.heap_name = entry.heap_name;
1161   }
1162   if (entry.sample_interval != 0) {
1163     ProcessState::HeapInfo& hi = process_state.GetHeapInfo(entry.heap_id);
1164     if (!hi.sampling_interval)
1165       hi.orig_sampling_interval = entry.sample_interval;
1166     hi.sampling_interval = entry.sample_interval;
1167   }
1168 }
1169 
TerminateWhenDone()1170 void HeapprofdProducer::TerminateWhenDone() {
1171   if (data_sources_.empty())
1172     TerminateProcess(0);
1173   exit_when_done_ = true;
1174 }
1175 
MaybeFinishDataSource(DataSource * ds)1176 bool HeapprofdProducer::MaybeFinishDataSource(DataSource* ds) {
1177   if (!ds->process_states.empty() || !ds->rejected_pids.empty() ||
1178       !ds->shutting_down) {
1179     return false;
1180   }
1181 
1182   bool was_stopped = ds->was_stopped;
1183   DataSourceInstanceID ds_id = ds->id;
1184   auto weak_producer = weak_factory_.GetWeakPtr();
1185   bool exit_when_done = exit_when_done_;
1186   ds->trace_writer->Flush([weak_producer, exit_when_done, ds_id, was_stopped] {
1187     if (!weak_producer)
1188       return;
1189 
1190     if (was_stopped)
1191       weak_producer->endpoint_->NotifyDataSourceStopped(ds_id);
1192     weak_producer->data_sources_.erase(ds_id);
1193 
1194     if (exit_when_done) {
1195       // Post this as a task to allow NotifyDataSourceStopped to post tasks.
1196       weak_producer->task_runner_->PostTask([weak_producer] {
1197         if (!weak_producer)
1198           return;
1199         weak_producer->TerminateProcess(
1200             /*exit_status=*/0);  // does not return
1201       });
1202     }
1203   });
1204   return true;
1205 }
1206 
HandleSocketDisconnected(DataSourceInstanceID ds_id,pid_t pid,SharedRingBuffer::Stats stats)1207 void HeapprofdProducer::HandleSocketDisconnected(
1208     DataSourceInstanceID ds_id,
1209     pid_t pid,
1210     SharedRingBuffer::Stats stats) {
1211   auto it = data_sources_.find(ds_id);
1212   if (it == data_sources_.end())
1213     return;
1214   DataSource& ds = it->second;
1215 
1216   auto process_state_it = ds.process_states.find(pid);
1217   if (process_state_it == ds.process_states.end()) {
1218     PERFETTO_ELOG("Unexpected disconnect from %d", pid);
1219     return;
1220   }
1221 
1222   PERFETTO_LOG("%d disconnected from heapprofd (ds shutting down: %d).", pid,
1223                ds.shutting_down);
1224 
1225   ProcessState& process_state = process_state_it->second;
1226   process_state.disconnected = !ds.shutting_down;
1227   process_state.error_state = stats.error_state;
1228   process_state.client_spinlock_blocked_us = stats.client_spinlock_blocked_us;
1229   process_state.buffer_corrupted =
1230       stats.num_writes_corrupt > 0 || stats.num_reads_corrupt > 0;
1231 
1232   DumpProcessState(&ds, pid, &process_state);
1233   ds.process_states.erase(pid);
1234   MaybeFinishDataSource(&ds);
1235 }
1236 
CheckDataSourceCpuTask()1237 void HeapprofdProducer::CheckDataSourceCpuTask() {
1238   auto weak_producer = weak_factory_.GetWeakPtr();
1239   task_runner_->PostDelayedTask(
1240       [weak_producer] {
1241         if (!weak_producer)
1242           return;
1243         weak_producer->CheckDataSourceCpuTask();
1244       },
1245       kGuardrailIntervalMs);
1246 
1247   ProfilerCpuGuardrails gr;
1248   for (auto& p : data_sources_) {
1249     DataSource& ds = p.second;
1250     if (gr.IsOverCpuThreshold(ds.guardrail_config)) {
1251       ds.hit_guardrail = true;
1252       ShutdownDataSource(&ds);
1253     }
1254   }
1255 }
1256 
CheckDataSourceMemoryTask()1257 void HeapprofdProducer::CheckDataSourceMemoryTask() {
1258   auto weak_producer = weak_factory_.GetWeakPtr();
1259   task_runner_->PostDelayedTask(
1260       [weak_producer] {
1261         if (!weak_producer)
1262           return;
1263         weak_producer->CheckDataSourceMemoryTask();
1264       },
1265       kGuardrailIntervalMs);
1266   ProfilerMemoryGuardrails gr;
1267   for (auto& p : data_sources_) {
1268     DataSource& ds = p.second;
1269     if (gr.IsOverMemoryThreshold(ds.guardrail_config)) {
1270       ds.hit_guardrail = true;
1271       ShutdownDataSource(&ds);
1272     }
1273   }
1274 }
1275 
1276 }  // namespace profiling
1277 }  // namespace perfetto
1278