1 /*
2  * Copyright (C) 2019 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "src/profiling/perf/perf_producer.h"
18 
#include <memory>
#include <random>
#include <utility>
21 
22 #include <unistd.h>
23 
24 #include <unwindstack/Error.h>
25 #include <unwindstack/Unwinder.h>
26 
27 #include "perfetto/base/logging.h"
28 #include "perfetto/base/task_runner.h"
29 #include "perfetto/ext/base/metatrace.h"
30 #include "perfetto/ext/base/utils.h"
31 #include "perfetto/ext/base/weak_ptr.h"
32 #include "perfetto/ext/tracing/core/basic_types.h"
33 #include "perfetto/ext/tracing/core/producer.h"
34 #include "perfetto/ext/tracing/core/tracing_service.h"
35 #include "perfetto/ext/tracing/ipc/producer_ipc_client.h"
36 #include "perfetto/tracing/core/data_source_config.h"
37 #include "perfetto/tracing/core/data_source_descriptor.h"
38 #include "src/profiling/common/callstack_trie.h"
39 #include "src/profiling/common/proc_utils.h"
40 #include "src/profiling/common/producer_support.h"
41 #include "src/profiling/common/profiler_guardrails.h"
42 #include "src/profiling/common/unwind_support.h"
43 #include "src/profiling/perf/common_types.h"
44 #include "src/profiling/perf/event_reader.h"
45 
46 #include "protos/perfetto/common/builtin_clock.pbzero.h"
47 #include "protos/perfetto/common/perf_events.gen.h"
48 #include "protos/perfetto/common/perf_events.pbzero.h"
49 #include "protos/perfetto/config/profiling/perf_event_config.gen.h"
50 #include "protos/perfetto/trace/profiling/profile_packet.pbzero.h"
51 #include "protos/perfetto/trace/trace_packet.pbzero.h"
52 #include "protos/perfetto/trace/trace_packet_defaults.pbzero.h"
53 
54 namespace perfetto {
55 namespace profiling {
56 namespace {
57 
58 // TODO(b/151835887): on Android, when using signals, there exists a vulnerable
59 // window between a process image being replaced by execve, and the new
60 // libc instance reinstalling the proper signal handlers. During this window,
61 // the signal disposition is defaulted to terminating the process.
62 // This is a best-effort mitigation from the daemon's side, using a heuristic
63 // that most execve calls follow a fork. So if we get a sample for a very fresh
64 // process, the grace period will give it a chance to get to
65 // a properly initialised state prior to getting signalled. This doesn't help
66 // cases when a mature process calls execve, or when the target gets descheduled
67 // (since this is a naive walltime wait).
68 // The proper fix is in the platform, see bug for progress.
69 constexpr uint32_t kProcDescriptorsAndroidDelayMs = 50;
70 
// Delay between successive runs of the periodic daemon memory footprint check
// (see PerfProducer::CheckMemoryFootprintPeriodic).
constexpr uint32_t kMemoryLimitCheckPeriodMs = 5 * 1000;

// Initial and maximum backoff values. NOTE(review): presumably used when
// (re)connecting to the tracing service; the users are outside of this part
// of the file - confirm.
constexpr uint32_t kInitialConnectionBackoffMs = 100;
constexpr uint32_t kMaxConnectionBackoffMs = 30 * 1000;

// NOTE(review): presumably the name under which this producer registers with
// the service; the usage is outside of this part of the file - confirm.
constexpr char kProducerName[] = "perfetto.traced_perf";
// Name of the perf data source. StartDataSource ignores configs whose name is
// neither this one nor the metatrace data source's.
constexpr char kDataSourceName[] = "linux.perf";
78 
// Returns the number of processors configured on the system, as reported by
// sysconf(_SC_NPROCESSORS_CONF). Note: a failing sysconf call (-1) is not
// handled here, and would wrap around when converted to size_t.
size_t NumberOfCpus() {
  const long configured_cpus = sysconf(_SC_NPROCESSORS_CONF);
  return static_cast<size_t>(configured_cpus);
}
82 
StartTracePacket(TraceWriter * trace_writer)83 TraceWriter::TracePacketHandle StartTracePacket(TraceWriter* trace_writer) {
84   auto packet = trace_writer->NewTracePacket();
85   packet->set_sequence_flags(
86       protos::pbzero::TracePacket::SEQ_NEEDS_INCREMENTAL_STATE);
87   return packet;
88 }
89 
WritePerfEventDefaultsPacket(const EventConfig & event_config,TraceWriter * trace_writer)90 void WritePerfEventDefaultsPacket(const EventConfig& event_config,
91                                   TraceWriter* trace_writer) {
92   auto packet = trace_writer->NewTracePacket();
93   packet->set_timestamp(static_cast<uint64_t>(base::GetBootTimeNs().count()));
94   packet->set_timestamp_clock_id(protos::pbzero::BUILTIN_CLOCK_BOOTTIME);
95 
96   // start new incremental state generation:
97   packet->set_sequence_flags(
98       protos::pbzero::TracePacket::SEQ_INCREMENTAL_STATE_CLEARED);
99 
100   // default packet timestamp clockid:
101   auto* defaults = packet->set_trace_packet_defaults();
102   defaults->set_timestamp_clock_id(protos::pbzero::BUILTIN_CLOCK_MONOTONIC_RAW);
103   PERFETTO_DCHECK(event_config.perf_attr()->clockid == CLOCK_MONOTONIC_RAW);
104 
105   auto* perf_defaults = defaults->set_perf_sample_defaults();
106   auto* timebase_pb = perf_defaults->set_timebase();
107 
108   // frequency/period:
109   perf_event_attr* perf_attr = event_config.perf_attr();
110   if (perf_attr->freq) {
111     timebase_pb->set_frequency(perf_attr->sample_freq);
112   } else {
113     timebase_pb->set_period(perf_attr->sample_period);
114   }
115 
116   // event:
117   const PerfCounter& timebase = event_config.timebase_event();
118   if (timebase.is_counter()) {
119     timebase_pb->set_counter(
120         static_cast<protos::pbzero::PerfEvents::Counter>(timebase.counter));
121   } else {
122     PERFETTO_DCHECK(timebase.is_tracepoint());
123     // TODO(rsavitski): reconsider using a struct with two strings instead
124     // of the ::gen::Tracepoint class in the C++ code.
125     auto* tracepoint_pb = timebase_pb->set_tracepoint();
126     tracepoint_pb->set_name(timebase.tracepoint.name());
127     tracepoint_pb->set_filter(timebase.tracepoint.filter());
128   }
129 }
130 
TimeToNextReadTickMs(DataSourceInstanceID ds_id,uint32_t period_ms)131 uint32_t TimeToNextReadTickMs(DataSourceInstanceID ds_id, uint32_t period_ms) {
132   // Normally, we'd schedule the next tick at the next |period_ms|
133   // boundary of the boot clock. However, to avoid aligning the read tasks of
134   // all concurrent data sources, we select a deterministic offset based on the
135   // data source id.
136   std::minstd_rand prng(static_cast<std::minstd_rand::result_type>(ds_id));
137   std::uniform_int_distribution<uint32_t> dist(0, period_ms - 1);
138   uint32_t ds_period_offset = dist(prng);
139 
140   uint64_t now_ms = static_cast<uint64_t>(base::GetWallTimeMs().count());
141   return period_ms - ((now_ms - ds_period_offset) % period_ms);
142 }
143 
ShouldRejectDueToFilter(pid_t pid,base::FlatSet<std::string> * additional_cmdlines,const TargetFilter & filter)144 bool ShouldRejectDueToFilter(pid_t pid,
145                              base::FlatSet<std::string>* additional_cmdlines,
146                              const TargetFilter& filter) {
147   PERFETTO_CHECK(additional_cmdlines);
148   std::string cmdline;
149   bool have_cmdline = GetCmdlineForPID(pid, &cmdline);  // normalized form
150   if (!have_cmdline) {
151     PERFETTO_DLOG("Failed to look up cmdline for pid [%d]",
152                   static_cast<int>(pid));
153   }
154 
155   if (have_cmdline && filter.exclude_cmdlines.count(cmdline)) {
156     PERFETTO_DLOG("Explicitly rejecting samples for pid [%d] due to cmdline",
157                   static_cast<int>(pid));
158     return true;
159   }
160   if (filter.exclude_pids.count(pid)) {
161     PERFETTO_DLOG("Explicitly rejecting samples for pid [%d] due to pid",
162                   static_cast<int>(pid));
163     return true;
164   }
165 
166   if (have_cmdline && filter.cmdlines.count(cmdline)) {
167     return false;
168   }
169   if (filter.pids.count(pid)) {
170     return false;
171   }
172   if (filter.cmdlines.empty() && filter.pids.empty() &&
173       !filter.additional_cmdline_count) {
174     // If no filters are set allow everything.
175     return false;
176   }
177 
178   // If we didn't read the command line that's a good prediction we will not be
179   // able to profile either.
180   if (have_cmdline) {
181     if (additional_cmdlines->count(cmdline)) {
182       return false;
183     }
184     if (additional_cmdlines->size() < filter.additional_cmdline_count) {
185       additional_cmdlines->insert(cmdline);
186       return false;
187     }
188   }
189 
190   PERFETTO_DLOG("Rejecting samples for pid [%d]", static_cast<int>(pid));
191   return true;
192 }
193 
ToCpuModeEnum(uint16_t perf_cpu_mode)194 protos::pbzero::Profiling::CpuMode ToCpuModeEnum(uint16_t perf_cpu_mode) {
195   using Profiling = protos::pbzero::Profiling;
196   switch (perf_cpu_mode) {
197     case PERF_RECORD_MISC_KERNEL:
198       return Profiling::MODE_KERNEL;
199     case PERF_RECORD_MISC_USER:
200       return Profiling::MODE_USER;
201     case PERF_RECORD_MISC_HYPERVISOR:
202       return Profiling::MODE_HYPERVISOR;
203     case PERF_RECORD_MISC_GUEST_KERNEL:
204       return Profiling::MODE_GUEST_KERNEL;
205     case PERF_RECORD_MISC_GUEST_USER:
206       return Profiling::MODE_GUEST_USER;
207     default:
208       return Profiling::MODE_UNKNOWN;
209   }
210 }
211 
ToProtoEnum(unwindstack::ErrorCode error_code)212 protos::pbzero::Profiling::StackUnwindError ToProtoEnum(
213     unwindstack::ErrorCode error_code) {
214   using Profiling = protos::pbzero::Profiling;
215   switch (error_code) {
216     case unwindstack::ERROR_NONE:
217       return Profiling::UNWIND_ERROR_NONE;
218     case unwindstack::ERROR_MEMORY_INVALID:
219       return Profiling::UNWIND_ERROR_MEMORY_INVALID;
220     case unwindstack::ERROR_UNWIND_INFO:
221       return Profiling::UNWIND_ERROR_UNWIND_INFO;
222     case unwindstack::ERROR_UNSUPPORTED:
223       return Profiling::UNWIND_ERROR_UNSUPPORTED;
224     case unwindstack::ERROR_INVALID_MAP:
225       return Profiling::UNWIND_ERROR_INVALID_MAP;
226     case unwindstack::ERROR_MAX_FRAMES_EXCEEDED:
227       return Profiling::UNWIND_ERROR_MAX_FRAMES_EXCEEDED;
228     case unwindstack::ERROR_REPEATED_FRAME:
229       return Profiling::UNWIND_ERROR_REPEATED_FRAME;
230     case unwindstack::ERROR_INVALID_ELF:
231       return Profiling::UNWIND_ERROR_INVALID_ELF;
232     case unwindstack::ERROR_SYSTEM_CALL:
233       return Profiling::UNWIND_ERROR_SYSTEM_CALL;
234     case unwindstack::ERROR_THREAD_TIMEOUT:
235       return Profiling::UNWIND_ERROR_THREAD_TIMEOUT;
236     case unwindstack::ERROR_THREAD_DOES_NOT_EXIST:
237       return Profiling::UNWIND_ERROR_THREAD_DOES_NOT_EXIST;
238   }
239   return Profiling::UNWIND_ERROR_UNKNOWN;
240 }
241 
242 }  // namespace
243 
PerfProducer(ProcDescriptorGetter * proc_fd_getter,base::TaskRunner * task_runner)244 PerfProducer::PerfProducer(ProcDescriptorGetter* proc_fd_getter,
245                            base::TaskRunner* task_runner)
246     : task_runner_(task_runner),
247       proc_fd_getter_(proc_fd_getter),
248       unwinding_worker_(this),
249       weak_factory_(this) {
250   proc_fd_getter->SetDelegate(this);
251 }
252 
SetupDataSource(DataSourceInstanceID,const DataSourceConfig &)253 void PerfProducer::SetupDataSource(DataSourceInstanceID,
254                                    const DataSourceConfig&) {}
255 
// Starts the data source instance |ds_id| described by |config|.
//
// Metatrace configs are forwarded to StartMetatraceSource. For configs named
// |kDataSourceName|, this parses the PerfEventConfig, sets up one EventReader
// per cpu, registers a DataSourceState under |ds_id|, enables the events,
// writes the initial packets, informs the unwinder, and schedules the periodic
// read task (plus the optional memory limit check). Configs with any other
// name are ignored.
//
// If the config cannot be parsed, is rejected, or the events cannot be set up
// on one of the cpus, an error is logged and the function returns without
// registering the data source.
void PerfProducer::StartDataSource(DataSourceInstanceID ds_id,
                                   const DataSourceConfig& config) {
  PERFETTO_LOG("StartDataSource(%zu, %s)", static_cast<size_t>(ds_id),
               config.name().c_str());

  // Metatrace data source: handled separately.
  if (config.name() == MetatraceWriter::kDataSourceName) {
    StartMetatraceSource(ds_id, static_cast<BufferID>(config.target_buffer()));
    return;
  }

  // linux.perf data source
  if (config.name() != kDataSourceName)
    return;

  // Tracepoint name -> id lookup in case the config asks for tracepoints:
  // (returns 0 if no tracefs instance could be obtained)
  auto tracepoint_id_lookup = [this](const std::string& group,
                                     const std::string& name) {
    if (!tracefs_)  // lazy init or retry
      tracefs_ = FtraceProcfs::CreateGuessingMountPoint();
    if (!tracefs_)  // still didn't find an accessible tracefs
      return 0u;
    return tracefs_->ReadEventId(group, name);
  };

  // Parse and validate the perf-specific part of the config.
  protos::gen::PerfEventConfig event_config_pb;
  if (!event_config_pb.ParseFromString(config.perf_event_config_raw())) {
    PERFETTO_ELOG("PerfEventConfig could not be parsed.");
    return;
  }
  base::Optional<EventConfig> event_config =
      EventConfig::Create(event_config_pb, config, tracepoint_id_lookup);
  if (!event_config.has_value()) {
    PERFETTO_ELOG("PerfEventConfig rejected.");
    return;
  }

  // One reader per cpu. A failure on any cpu discards the whole data source
  // (the readers configured so far go out of scope with |per_cpu_readers|).
  size_t num_cpus = NumberOfCpus();
  std::vector<EventReader> per_cpu_readers;
  for (uint32_t cpu = 0; cpu < num_cpus; cpu++) {
    base::Optional<EventReader> event_reader =
        EventReader::ConfigureEvents(cpu, event_config.value());
    if (!event_reader.has_value()) {
      PERFETTO_ELOG("Failed to set up perf events for cpu%" PRIu32
                    ", discarding data source.",
                    cpu);
      return;
    }
    per_cpu_readers.emplace_back(std::move(event_reader.value()));
  }

  auto buffer_id = static_cast<BufferID>(config.target_buffer());
  auto writer = endpoint_->CreateTraceWriter(buffer_id);

  // Construct the data source instance.
  // (in place within the map; an existing entry for |ds_id| is a fatal error)
  std::map<DataSourceInstanceID, DataSourceState>::iterator ds_it;
  bool inserted;
  std::tie(ds_it, inserted) = data_sources_.emplace(
      std::piecewise_construct, std::forward_as_tuple(ds_id),
      std::forward_as_tuple(event_config.value(), std::move(writer),
                            std::move(per_cpu_readers)));
  PERFETTO_CHECK(inserted);
  DataSourceState& ds = ds_it->second;

  // Start the configured events.
  for (auto& per_cpu_reader : ds.per_cpu_readers) {
    per_cpu_reader.EnableEvents();
  }

  // Initial packets of the sequence: the defaults packet (which also marks the
  // incremental state as cleared), followed by the fixed internings.
  WritePerfEventDefaultsPacket(ds.event_config, ds.trace_writer.get());

  InterningOutputTracker::WriteFixedInterningsPacket(
      ds_it->second.trace_writer.get(),
      protos::pbzero::TracePacket::SEQ_NEEDS_INCREMENTAL_STATE);

  // Inform unwinder of the new data source instance, and optionally start a
  // periodic task to clear its cached state.
  unwinding_worker_->PostStartDataSource(ds_id,
                                         ds.event_config.kernel_frames());
  if (ds.event_config.unwind_state_clear_period_ms()) {
    unwinding_worker_->PostClearCachedStatePeriodic(
        ds_id, ds.event_config.unwind_state_clear_period_ms());
  }

  // Kick off periodic read task.
  // (the weak pointer turns the task into a no-op if the producer is gone)
  auto tick_period_ms = ds.event_config.read_tick_period_ms();
  auto weak_this = weak_factory_.GetWeakPtr();
  task_runner_->PostDelayedTask(
      [weak_this, ds_id] {
        if (weak_this)
          weak_this->TickDataSourceRead(ds_id);
      },
      TimeToNextReadTickMs(ds_id, tick_period_ms));

  // Optionally kick off periodic memory footprint limit check.
  // (enabled only by a non-zero max_daemon_memory_kb in the config)
  uint32_t max_daemon_memory_kb = event_config_pb.max_daemon_memory_kb();
  if (max_daemon_memory_kb > 0) {
    task_runner_->PostDelayedTask(
        [weak_this, ds_id, max_daemon_memory_kb] {
          if (weak_this)
            weak_this->CheckMemoryFootprintPeriodic(ds_id,
                                                    max_daemon_memory_kb);
        },
        kMemoryLimitCheckPeriodMs);
  }
}
361 
CheckMemoryFootprintPeriodic(DataSourceInstanceID ds_id,uint32_t max_daemon_memory_kb)362 void PerfProducer::CheckMemoryFootprintPeriodic(DataSourceInstanceID ds_id,
363                                                 uint32_t max_daemon_memory_kb) {
364   auto ds_it = data_sources_.find(ds_id);
365   if (ds_it == data_sources_.end())
366     return;  // stop recurring
367 
368   GuardrailConfig gconfig = {};
369   gconfig.memory_guardrail_kb = max_daemon_memory_kb;
370 
371   ProfilerMemoryGuardrails footprint_snapshot;
372   if (footprint_snapshot.IsOverMemoryThreshold(gconfig)) {
373     PurgeDataSource(ds_id);
374     return;  // stop recurring
375   }
376 
377   // repost
378   auto weak_this = weak_factory_.GetWeakPtr();
379   task_runner_->PostDelayedTask(
380       [weak_this, ds_id, max_daemon_memory_kb] {
381         if (weak_this)
382           weak_this->CheckMemoryFootprintPeriodic(ds_id, max_daemon_memory_kb);
383       },
384       kMemoryLimitCheckPeriodMs);
385 }
386 
StopDataSource(DataSourceInstanceID ds_id)387 void PerfProducer::StopDataSource(DataSourceInstanceID ds_id) {
388   PERFETTO_LOG("StopDataSource(%zu)", static_cast<size_t>(ds_id));
389 
390   // Metatrace: stop immediately (will miss the events from the
391   // asynchronous shutdown of the primary data source).
392   auto meta_it = metatrace_writers_.find(ds_id);
393   if (meta_it != metatrace_writers_.end()) {
394     meta_it->second.WriteAllAndFlushTraceWriter([] {});
395     metatrace_writers_.erase(meta_it);
396     return;
397   }
398 
399   auto ds_it = data_sources_.find(ds_id);
400   if (ds_it == data_sources_.end()) {
401     // Most likely, the source is missing due to an abrupt stop (via
402     // |PurgeDataSource|). Tell the service that we've stopped the source now,
403     // so that it doesn't wait for the ack until the timeout.
404     endpoint_->NotifyDataSourceStopped(ds_id);
405     return;
406   }
407 
408   // Start shutting down the reading frontend, which will propagate the stop
409   // further as the intermediate buffers are cleared.
410   DataSourceState& ds = ds_it->second;
411   InitiateReaderStop(&ds);
412 }
413 
414 // The perf data sources ignore flush requests, as flushing would be
415 // unnecessarily complicated given out-of-order unwinding and proc-fd timeouts.
416 // Instead of responding to explicit flushes, we can ensure that we're otherwise
417 // well-behaved (do not reorder packets too much), and let the service scrape
418 // the SMB.
Flush(FlushRequestID flush_id,const DataSourceInstanceID * data_source_ids,size_t num_data_sources)419 void PerfProducer::Flush(FlushRequestID flush_id,
420                          const DataSourceInstanceID* data_source_ids,
421                          size_t num_data_sources) {
422   // Flush metatracing if requested.
423   for (size_t i = 0; i < num_data_sources; i++) {
424     auto ds_id = data_source_ids[i];
425     PERFETTO_DLOG("Flush(%zu)", static_cast<size_t>(ds_id));
426 
427     auto meta_it = metatrace_writers_.find(ds_id);
428     if (meta_it != metatrace_writers_.end()) {
429       meta_it->second.WriteAllAndFlushTraceWriter([] {});
430     }
431   }
432 
433   endpoint_->NotifyFlushComplete(flush_id);
434 }
435 
ClearIncrementalState(const DataSourceInstanceID * data_source_ids,size_t num_data_sources)436 void PerfProducer::ClearIncrementalState(
437     const DataSourceInstanceID* data_source_ids,
438     size_t num_data_sources) {
439   for (size_t i = 0; i < num_data_sources; i++) {
440     auto ds_id = data_source_ids[i];
441     PERFETTO_DLOG("ClearIncrementalState(%zu)", static_cast<size_t>(ds_id));
442 
443     if (metatrace_writers_.find(ds_id) != metatrace_writers_.end())
444       continue;
445 
446     auto ds_it = data_sources_.find(ds_id);
447     if (ds_it == data_sources_.end()) {
448       PERFETTO_DLOG("ClearIncrementalState(%zu): did not find matching entry",
449                     static_cast<size_t>(ds_id));
450       continue;
451     }
452     DataSourceState& ds = ds_it->second;
453 
454     WritePerfEventDefaultsPacket(ds.event_config, ds.trace_writer.get());
455 
456     // Forget which incremental state we've emitted before.
457     ds.interning_output.ClearHistory();
458     InterningOutputTracker::WriteFixedInterningsPacket(
459         ds.trace_writer.get(),
460         protos::pbzero::TracePacket::SEQ_NEEDS_INCREMENTAL_STATE);
461 
462     // Drop the cross-datasource callstack interning trie. This is not
463     // necessary for correctness (the preceding step is sufficient). However,
464     // incremental clearing is likely to be used in ring buffer traces, where
465     // it makes sense to reset the trie's size periodically, and this is a
466     // reasonable point to do so. The trie keeps the monotonic interning IDs,
467     // so there is no confusion for other concurrent data sources. We do not
468     // bother with clearing concurrent sources' interning output trackers as
469     // their footprint should be trivial.
470     callstack_trie_.ClearTrie();
471   }
472 }
473 
TickDataSourceRead(DataSourceInstanceID ds_id)474 void PerfProducer::TickDataSourceRead(DataSourceInstanceID ds_id) {
475   auto it = data_sources_.find(ds_id);
476   if (it == data_sources_.end()) {
477     PERFETTO_DLOG("TickDataSourceRead(%zu): source gone",
478                   static_cast<size_t>(ds_id));
479     return;
480   }
481   DataSourceState& ds = it->second;
482 
483   PERFETTO_METATRACE_SCOPED(TAG_PRODUCER, PROFILER_READ_TICK);
484 
485   // Make a pass over all per-cpu readers.
486   uint64_t max_samples = ds.event_config.samples_per_tick_limit();
487   bool more_records_available = false;
488   for (EventReader& reader : ds.per_cpu_readers) {
489     if (ReadAndParsePerCpuBuffer(&reader, max_samples, ds_id, &ds)) {
490       more_records_available = true;
491     }
492   }
493 
494   // Wake up the unwinder as we've (likely) pushed samples into its queue.
495   unwinding_worker_->PostProcessQueue();
496 
497   if (PERFETTO_UNLIKELY(ds.status == DataSourceState::Status::kShuttingDown) &&
498       !more_records_available) {
499     unwinding_worker_->PostInitiateDataSourceStop(ds_id);
500   } else {
501     // otherwise, keep reading
502     auto tick_period_ms = it->second.event_config.read_tick_period_ms();
503     auto weak_this = weak_factory_.GetWeakPtr();
504     task_runner_->PostDelayedTask(
505         [weak_this, ds_id] {
506           if (weak_this)
507             weak_this->TickDataSourceRead(ds_id);
508         },
509         TimeToNextReadTickMs(ds_id, tick_period_ms));
510   }
511 }
512 
// Reads up to |max_samples| samples from |reader|, and routes each of them:
// emitted directly (counter-only mode), silently dropped, reported as a
// skipped sample, or pushed onto the unwinding queue.
//
// New pids are checked against the data source's target filter, and a
// proc-fd lookup is initiated for those that pass.
//
// Returns false if the reader ran out of samples before the limit was hit,
// and true if |max_samples| samples were consumed (so more records are likely
// pending).
bool PerfProducer::ReadAndParsePerCpuBuffer(EventReader* reader,
                                            uint64_t max_samples,
                                            DataSourceInstanceID ds_id,
                                            DataSourceState* ds) {
  PERFETTO_METATRACE_SCOPED(TAG_PRODUCER, PROFILER_READ_CPU);

  // If the kernel ring buffer dropped data, record it in the trace.
  // The loss is not emitted inline, but from a separately posted task.
  size_t cpu = reader->cpu();
  auto records_lost_callback = [this, ds_id, cpu](uint64_t records_lost) {
    auto weak_this = weak_factory_.GetWeakPtr();
    task_runner_->PostTask([weak_this, ds_id, cpu, records_lost] {
      if (weak_this)
        weak_this->EmitRingBufferLoss(ds_id, cpu, records_lost);
    });
  };

  for (uint64_t i = 0; i < max_samples; i++) {
    base::Optional<ParsedSample> sample =
        reader->ReadUntilSample(records_lost_callback);
    if (!sample) {
      return false;  // caught up to the writer
    }

    // Counter-only mode: skip the unwinding stage, enqueue the sample for
    // output immediately.
    if (!ds->event_config.sample_callstacks()) {
      CompletedSample output;
      output.common = sample->common;
      EmitSample(ds_id, std::move(output));
      continue;
    }

    // If sampling callstacks, we're not interested in kernel threads/workers.
    if (!sample->regs) {
      continue;
    }

    // Request proc-fds for the process if this is the first time we see it.
    // Note: |process_state| is a reference to the entry in
    // |ds->process_states|, so the assignments below update the tracked state.
    pid_t pid = sample->common.pid;
    auto& process_state = ds->process_states[pid];  // insert if new

    // Unlike the rejected case below, expired pids still get a skipped-sample
    // record.
    if (process_state == ProcessTrackingStatus::kExpired) {
      PERFETTO_DLOG("Skipping sample for previously expired pid [%d]",
                    static_cast<int>(pid));
      EmitSkippedSample(ds_id, std::move(sample.value()),
                        SampleSkipReason::kReadStage);
      continue;
    }

    // Previously failed the target filter check.
    if (process_state == ProcessTrackingStatus::kRejected) {
      PERFETTO_DLOG("Skipping sample for pid [%d] due to target filter",
                    static_cast<int>(pid));
      continue;
    }

    // Seeing pid for the first time.
    if (process_state == ProcessTrackingStatus::kInitial) {
      PERFETTO_DLOG("New pid: [%d]", static_cast<int>(pid));

      // Check whether samples for this new process should be
      // dropped due to the target filtering.
      const TargetFilter& filter = ds->event_config.filter();
      if (ShouldRejectDueToFilter(pid, &ds->additional_cmdlines, filter)) {
        process_state = ProcessTrackingStatus::kRejected;
        continue;
      }

      // At this point, sampled process is known to be of interest, so start
      // resolving the proc-fds. Response is async.
      process_state = ProcessTrackingStatus::kResolving;
      InitiateDescriptorLookup(ds_id, pid,
                               ds->event_config.remote_descriptor_timeout_ms());
    }

    // Only processes with a pending or completed proc-fd lookup get this far.
    PERFETTO_CHECK(process_state == ProcessTrackingStatus::kResolved ||
                   process_state == ProcessTrackingStatus::kResolving);

    // Optionally: drop sample if above a given threshold of sampled stacks
    // that are waiting in the unwinding queue.
    // (a limit of zero disables the check)
    uint64_t max_footprint_bytes =
        ds->event_config.max_enqueued_footprint_bytes();
    uint64_t sample_stack_size = sample->stack.size();
    if (max_footprint_bytes) {
      uint64_t footprint_bytes = unwinding_worker_->GetEnqueuedFootprint();
      if (footprint_bytes + sample_stack_size >= max_footprint_bytes) {
        PERFETTO_DLOG("Skipping sample enqueueing due to footprint limit.");
        EmitSkippedSample(ds_id, std::move(sample.value()),
                          SampleSkipReason::kUnwindEnqueue);
        continue;
      }
    }

    // Push the sample into the unwinding queue if there is room.
    // The footprint accounting is updated only for samples that are enqueued.
    auto& queue = unwinding_worker_->unwind_queue();
    WriteView write_view = queue.BeginWrite();
    if (write_view.valid) {
      queue.at(write_view.write_pos) =
          UnwindEntry{ds_id, std::move(sample.value())};
      queue.CommitWrite();
      unwinding_worker_->IncrementEnqueuedFootprint(sample_stack_size);
    } else {
      PERFETTO_DLOG("Unwinder queue full, skipping sample");
      EmitSkippedSample(ds_id, std::move(sample.value()),
                        SampleSkipReason::kUnwindEnqueue);
    }
  }

  // Most likely more events in the kernel buffer. Though we might be exactly on
  // the boundary due to |max_samples|.
  return true;
}
625 
626 // Note: first-fit makes descriptor request fulfillment not true FIFO. But the
627 // edge-cases where it matters are very unlikely.
OnProcDescriptors(pid_t pid,uid_t uid,base::ScopedFile maps_fd,base::ScopedFile mem_fd)628 void PerfProducer::OnProcDescriptors(pid_t pid,
629                                      uid_t uid,
630                                      base::ScopedFile maps_fd,
631                                      base::ScopedFile mem_fd) {
632   // Find first-fit data source that requested descriptors for the process.
633   for (auto& it : data_sources_) {
634     DataSourceState& ds = it.second;
635     auto proc_status_it = ds.process_states.find(pid);
636     if (proc_status_it == ds.process_states.end())
637       continue;
638 
639     if (!CanProfile(ds.event_config.raw_ds_config(), uid,
640                     ds.event_config.target_installed_by())) {
641       PERFETTO_DLOG("Not profileable: pid [%d], uid [%d] for DS [%zu]",
642                     static_cast<int>(pid), static_cast<int>(uid),
643                     static_cast<size_t>(it.first));
644       continue;
645     }
646 
647     // Match against either resolving, or expired state. In the latter
648     // case, it means that the async response was slow enough that we've marked
649     // the lookup as expired (but can now recover for future samples).
650     auto proc_status = proc_status_it->second;
651     if (proc_status == ProcessTrackingStatus::kResolving ||
652         proc_status == ProcessTrackingStatus::kExpired) {
653       PERFETTO_DLOG("Handing off proc-fds for pid [%d] to DS [%zu]",
654                     static_cast<int>(pid), static_cast<size_t>(it.first));
655 
656       proc_status_it->second = ProcessTrackingStatus::kResolved;
657       unwinding_worker_->PostAdoptProcDescriptors(
658           it.first, pid, std::move(maps_fd), std::move(mem_fd));
659       return;  // done
660     }
661   }
662   PERFETTO_DLOG(
663       "Discarding proc-fds for pid [%d] as found no outstanding requests.",
664       static_cast<int>(pid));
665 }
666 
InitiateDescriptorLookup(DataSourceInstanceID ds_id,pid_t pid,uint32_t timeout_ms)667 void PerfProducer::InitiateDescriptorLookup(DataSourceInstanceID ds_id,
668                                             pid_t pid,
669                                             uint32_t timeout_ms) {
670   if (!proc_fd_getter_->RequiresDelayedRequest()) {
671     StartDescriptorLookup(ds_id, pid, timeout_ms);
672     return;
673   }
674 
675   // Delay lookups on Android. See comment on |kProcDescriptorsAndroidDelayMs|.
676   auto weak_this = weak_factory_.GetWeakPtr();
677   task_runner_->PostDelayedTask(
678       [weak_this, ds_id, pid, timeout_ms] {
679         if (weak_this)
680           weak_this->StartDescriptorLookup(ds_id, pid, timeout_ms);
681       },
682       kProcDescriptorsAndroidDelayMs);
683 }
684 
StartDescriptorLookup(DataSourceInstanceID ds_id,pid_t pid,uint32_t timeout_ms)685 void PerfProducer::StartDescriptorLookup(DataSourceInstanceID ds_id,
686                                          pid_t pid,
687                                          uint32_t timeout_ms) {
688   proc_fd_getter_->GetDescriptorsForPid(pid);
689 
690   auto weak_this = weak_factory_.GetWeakPtr();
691   task_runner_->PostDelayedTask(
692       [weak_this, ds_id, pid] {
693         if (weak_this)
694           weak_this->EvaluateDescriptorLookupTimeout(ds_id, pid);
695       },
696       timeout_ms);
697 }
698 
EvaluateDescriptorLookupTimeout(DataSourceInstanceID ds_id,pid_t pid)699 void PerfProducer::EvaluateDescriptorLookupTimeout(DataSourceInstanceID ds_id,
700                                                    pid_t pid) {
701   auto ds_it = data_sources_.find(ds_id);
702   if (ds_it == data_sources_.end())
703     return;
704 
705   DataSourceState& ds = ds_it->second;
706   auto proc_status_it = ds.process_states.find(pid);
707   if (proc_status_it == ds.process_states.end())
708     return;
709 
710   // If the request is still outstanding, mark the process as expired (causing
711   // outstanding and future samples to be discarded).
712   auto proc_status = proc_status_it->second;
713   if (proc_status == ProcessTrackingStatus::kResolving) {
714     PERFETTO_DLOG("Descriptor lookup timeout of pid [%d] for DS [%zu]",
715                   static_cast<int>(pid), static_cast<size_t>(ds_it->first));
716 
717     proc_status_it->second = ProcessTrackingStatus::kExpired;
718     // Also inform the unwinder of the state change (so that it can discard any
719     // of the already-enqueued samples).
720     unwinding_worker_->PostRecordTimedOutProcDescriptors(ds_id, pid);
721   }
722 }
723 
PostEmitSample(DataSourceInstanceID ds_id,CompletedSample sample)724 void PerfProducer::PostEmitSample(DataSourceInstanceID ds_id,
725                                   CompletedSample sample) {
726   // hack: c++11 lambdas can't be moved into, so stash the sample on the heap.
727   CompletedSample* raw_sample = new CompletedSample(std::move(sample));
728   auto weak_this = weak_factory_.GetWeakPtr();
729   task_runner_->PostTask([weak_this, ds_id, raw_sample] {
730     if (weak_this)
731       weak_this->EmitSample(ds_id, std::move(*raw_sample));
732     delete raw_sample;
733   });
734 }
735 
EmitSample(DataSourceInstanceID ds_id,CompletedSample sample)736 void PerfProducer::EmitSample(DataSourceInstanceID ds_id,
737                               CompletedSample sample) {
738   auto ds_it = data_sources_.find(ds_id);
739   if (ds_it == data_sources_.end()) {
740     PERFETTO_DLOG("EmitSample(ds: %zu): source gone",
741                   static_cast<size_t>(ds_id));
742     return;
743   }
744   DataSourceState& ds = ds_it->second;
745 
746   // intern callsite
747   GlobalCallstackTrie::Node* callstack_root =
748       callstack_trie_.CreateCallsite(sample.frames, sample.build_ids);
749   uint64_t callstack_iid = callstack_root->id();
750 
751   // start packet, timestamp domain defaults to monotonic_raw
752   auto packet = StartTracePacket(ds.trace_writer.get());
753   packet->set_timestamp(sample.common.timestamp);
754 
755   // write new interning data (if any)
756   protos::pbzero::InternedData* interned_out = packet->set_interned_data();
757   ds.interning_output.WriteCallstack(callstack_root, &callstack_trie_,
758                                      interned_out);
759 
760   // write the sample itself
761   auto* perf_sample = packet->set_perf_sample();
762   perf_sample->set_cpu(sample.common.cpu);
763   perf_sample->set_pid(static_cast<uint32_t>(sample.common.pid));
764   perf_sample->set_tid(static_cast<uint32_t>(sample.common.tid));
765   perf_sample->set_cpu_mode(ToCpuModeEnum(sample.common.cpu_mode));
766   perf_sample->set_timebase_count(sample.common.timebase_count);
767   perf_sample->set_callstack_iid(callstack_iid);
768   if (sample.unwind_error != unwindstack::ERROR_NONE) {
769     perf_sample->set_unwind_error(ToProtoEnum(sample.unwind_error));
770   }
771 }
772 
EmitRingBufferLoss(DataSourceInstanceID ds_id,size_t cpu,uint64_t records_lost)773 void PerfProducer::EmitRingBufferLoss(DataSourceInstanceID ds_id,
774                                       size_t cpu,
775                                       uint64_t records_lost) {
776   auto ds_it = data_sources_.find(ds_id);
777   if (ds_it == data_sources_.end())
778     return;
779   DataSourceState& ds = ds_it->second;
780   PERFETTO_DLOG("DataSource(%zu): cpu%zu lost [%" PRIu64 "] records",
781                 static_cast<size_t>(ds_id), cpu, records_lost);
782 
783   // The data loss record relates to a single ring buffer, and indicates loss
784   // since the last successfully-written record in that buffer. Therefore the
785   // data loss record itself has no timestamp.
786   // We timestamp the packet with the boot clock for packet ordering purposes,
787   // but it no longer has a (precise) interpretation relative to the sample
788   // stream from that per-cpu buffer. See the proto comments for more details.
789   auto packet = StartTracePacket(ds.trace_writer.get());
790   packet->set_timestamp(static_cast<uint64_t>(base::GetBootTimeNs().count()));
791   packet->set_timestamp_clock_id(
792       protos::pbzero::BuiltinClock::BUILTIN_CLOCK_BOOTTIME);
793 
794   auto* perf_sample = packet->set_perf_sample();
795   perf_sample->set_cpu(static_cast<uint32_t>(cpu));
796   perf_sample->set_kernel_records_lost(records_lost);
797 }
798 
PostEmitUnwinderSkippedSample(DataSourceInstanceID ds_id,ParsedSample sample)799 void PerfProducer::PostEmitUnwinderSkippedSample(DataSourceInstanceID ds_id,
800                                                  ParsedSample sample) {
801   PostEmitSkippedSample(ds_id, std::move(sample),
802                         SampleSkipReason::kUnwindStage);
803 }
804 
PostEmitSkippedSample(DataSourceInstanceID ds_id,ParsedSample sample,SampleSkipReason reason)805 void PerfProducer::PostEmitSkippedSample(DataSourceInstanceID ds_id,
806                                          ParsedSample sample,
807                                          SampleSkipReason reason) {
808   // hack: c++11 lambdas can't be moved into, so stash the sample on the heap.
809   ParsedSample* raw_sample = new ParsedSample(std::move(sample));
810   auto weak_this = weak_factory_.GetWeakPtr();
811   task_runner_->PostTask([weak_this, ds_id, raw_sample, reason] {
812     if (weak_this)
813       weak_this->EmitSkippedSample(ds_id, std::move(*raw_sample), reason);
814     delete raw_sample;
815   });
816 }
817 
EmitSkippedSample(DataSourceInstanceID ds_id,ParsedSample sample,SampleSkipReason reason)818 void PerfProducer::EmitSkippedSample(DataSourceInstanceID ds_id,
819                                      ParsedSample sample,
820                                      SampleSkipReason reason) {
821   auto ds_it = data_sources_.find(ds_id);
822   if (ds_it == data_sources_.end())
823     return;
824   DataSourceState& ds = ds_it->second;
825 
826   // Note: timestamp defaults to the monotonic_raw domain.
827   auto packet = StartTracePacket(ds.trace_writer.get());
828   packet->set_timestamp(sample.common.timestamp);
829   auto* perf_sample = packet->set_perf_sample();
830   perf_sample->set_cpu(sample.common.cpu);
831   perf_sample->set_pid(static_cast<uint32_t>(sample.common.pid));
832   perf_sample->set_tid(static_cast<uint32_t>(sample.common.tid));
833   perf_sample->set_cpu_mode(ToCpuModeEnum(sample.common.cpu_mode));
834   perf_sample->set_timebase_count(sample.common.timebase_count);
835 
836   using PerfSample = protos::pbzero::PerfSample;
837   switch (reason) {
838     case SampleSkipReason::kReadStage:
839       perf_sample->set_sample_skipped_reason(
840           PerfSample::PROFILER_SKIP_READ_STAGE);
841       break;
842     case SampleSkipReason::kUnwindEnqueue:
843       perf_sample->set_sample_skipped_reason(
844           PerfSample::PROFILER_SKIP_UNWIND_ENQUEUE);
845       break;
846     case SampleSkipReason::kUnwindStage:
847       perf_sample->set_sample_skipped_reason(
848           PerfSample::PROFILER_SKIP_UNWIND_STAGE);
849       break;
850   }
851 }
852 
InitiateReaderStop(DataSourceState * ds)853 void PerfProducer::InitiateReaderStop(DataSourceState* ds) {
854   PERFETTO_DLOG("InitiateReaderStop");
855   PERFETTO_CHECK(ds->status != DataSourceState::Status::kShuttingDown);
856 
857   ds->status = DataSourceState::Status::kShuttingDown;
858   for (auto& event_reader : ds->per_cpu_readers) {
859     event_reader.DisableEvents();
860   }
861 }
862 
PostFinishDataSourceStop(DataSourceInstanceID ds_id)863 void PerfProducer::PostFinishDataSourceStop(DataSourceInstanceID ds_id) {
864   auto weak_producer = weak_factory_.GetWeakPtr();
865   task_runner_->PostTask([weak_producer, ds_id] {
866     if (weak_producer)
867       weak_producer->FinishDataSourceStop(ds_id);
868   });
869 }
870 
FinishDataSourceStop(DataSourceInstanceID ds_id)871 void PerfProducer::FinishDataSourceStop(DataSourceInstanceID ds_id) {
872   PERFETTO_LOG("FinishDataSourceStop(%zu)", static_cast<size_t>(ds_id));
873   auto ds_it = data_sources_.find(ds_id);
874   if (ds_it == data_sources_.end()) {
875     PERFETTO_DLOG("FinishDataSourceStop(%zu): source gone",
876                   static_cast<size_t>(ds_id));
877     return;
878   }
879   DataSourceState& ds = ds_it->second;
880   PERFETTO_CHECK(ds.status == DataSourceState::Status::kShuttingDown);
881 
882   ds.trace_writer->Flush();
883   data_sources_.erase(ds_it);
884 
885   endpoint_->NotifyDataSourceStopped(ds_id);
886 
887   // Clean up resources if there are no more active sources.
888   if (data_sources_.empty()) {
889     callstack_trie_.ClearTrie();  // purge internings
890     base::MaybeReleaseAllocatorMemToOS();
891   }
892 }
893 
894 // TODO(rsavitski): maybe make the tracing service respect premature
895 // producer-driven stops, and then issue a NotifyDataSourceStopped here.
896 // Alternatively (and at the expense of higher complexity) introduce a new data
897 // source status of "tombstoned", and propagate it until the source is stopped
898 // by the service (this would technically allow for stricter lifetime checking
899 // of data sources, and help with discarding periodic flushes).
900 // TODO(rsavitski): Purging while stopping will currently leave the stop
901 // unacknowledged. Consider checking whether the DS is stopping here, and if so,
902 // notifying immediately after erasing.
PurgeDataSource(DataSourceInstanceID ds_id)903 void PerfProducer::PurgeDataSource(DataSourceInstanceID ds_id) {
904   auto ds_it = data_sources_.find(ds_id);
905   if (ds_it == data_sources_.end())
906     return;
907   DataSourceState& ds = ds_it->second;
908 
909   PERFETTO_LOG("Stopping DataSource(%zu) prematurely",
910                static_cast<size_t>(ds_id));
911 
912   unwinding_worker_->PostPurgeDataSource(ds_id);
913 
914   // Write a packet indicating the abrupt stop.
915   {
916     auto packet = StartTracePacket(ds.trace_writer.get());
917     packet->set_timestamp(static_cast<uint64_t>(base::GetBootTimeNs().count()));
918     packet->set_timestamp_clock_id(
919         protos::pbzero::BuiltinClock::BUILTIN_CLOCK_BOOTTIME);
920     auto* perf_sample = packet->set_perf_sample();
921     auto* producer_event = perf_sample->set_producer_event();
922     producer_event->set_source_stop_reason(
923         protos::pbzero::PerfSample::ProducerEvent::PROFILER_STOP_GUARDRAIL);
924   }
925 
926   ds.trace_writer->Flush();
927   data_sources_.erase(ds_it);
928 
929   // Clean up resources if there are no more active sources.
930   if (data_sources_.empty()) {
931     callstack_trie_.ClearTrie();  // purge internings
932     base::MaybeReleaseAllocatorMemToOS();
933   }
934 }
935 
StartMetatraceSource(DataSourceInstanceID ds_id,BufferID target_buffer)936 void PerfProducer::StartMetatraceSource(DataSourceInstanceID ds_id,
937                                         BufferID target_buffer) {
938   auto writer = endpoint_->CreateTraceWriter(target_buffer);
939 
940   auto it_and_inserted = metatrace_writers_.emplace(
941       std::piecewise_construct, std::make_tuple(ds_id), std::make_tuple());
942   PERFETTO_DCHECK(it_and_inserted.second);
943   // Note: only the first concurrent writer will actually be active.
944   metatrace_writers_[ds_id].Enable(task_runner_, std::move(writer),
945                                    metatrace::TAG_ANY);
946 }
947 
ConnectWithRetries(const char * socket_name)948 void PerfProducer::ConnectWithRetries(const char* socket_name) {
949   PERFETTO_DCHECK(state_ == kNotStarted);
950   state_ = kNotConnected;
951 
952   ResetConnectionBackoff();
953   producer_socket_name_ = socket_name;
954   ConnectService();
955 }
956 
ConnectService()957 void PerfProducer::ConnectService() {
958   PERFETTO_DCHECK(state_ == kNotConnected);
959   state_ = kConnecting;
960   endpoint_ = ProducerIPCClient::Connect(
961       producer_socket_name_, this, kProducerName, task_runner_,
962       TracingService::ProducerSMBScrapingMode::kEnabled);
963 }
964 
IncreaseConnectionBackoff()965 void PerfProducer::IncreaseConnectionBackoff() {
966   connection_backoff_ms_ *= 2;
967   if (connection_backoff_ms_ > kMaxConnectionBackoffMs)
968     connection_backoff_ms_ = kMaxConnectionBackoffMs;
969 }
970 
ResetConnectionBackoff()971 void PerfProducer::ResetConnectionBackoff() {
972   connection_backoff_ms_ = kInitialConnectionBackoffMs;
973 }
974 
OnConnect()975 void PerfProducer::OnConnect() {
976   PERFETTO_DCHECK(state_ == kConnecting);
977   state_ = kConnected;
978   ResetConnectionBackoff();
979   PERFETTO_LOG("Connected to the service");
980 
981   {
982     // linux.perf
983     DataSourceDescriptor desc;
984     desc.set_name(kDataSourceName);
985     desc.set_handles_incremental_state_clear(true);
986     desc.set_will_notify_on_stop(true);
987     endpoint_->RegisterDataSource(desc);
988   }
989   {
990     // metatrace
991     DataSourceDescriptor desc;
992     desc.set_name(MetatraceWriter::kDataSourceName);
993     endpoint_->RegisterDataSource(desc);
994   }
995 }
996 
OnDisconnect()997 void PerfProducer::OnDisconnect() {
998   PERFETTO_DCHECK(state_ == kConnected || state_ == kConnecting);
999   PERFETTO_LOG("Disconnected from tracing service");
1000 
1001   auto weak_producer = weak_factory_.GetWeakPtr();
1002   if (state_ == kConnected)
1003     return task_runner_->PostTask([weak_producer] {
1004       if (weak_producer)
1005         weak_producer->Restart();
1006     });
1007 
1008   state_ = kNotConnected;
1009   IncreaseConnectionBackoff();
1010   task_runner_->PostDelayedTask(
1011       [weak_producer] {
1012         if (weak_producer)
1013           weak_producer->ConnectService();
1014       },
1015       connection_backoff_ms_);
1016 }
1017 
Restart()1018 void PerfProducer::Restart() {
1019   // We lost the connection with the tracing service. At this point we need
1020   // to reset all the data sources. Trying to handle that manually is going to
1021   // be error prone. What we do here is simply destroy the instance and
1022   // recreate it again.
1023   base::TaskRunner* task_runner = task_runner_;
1024   const char* socket_name = producer_socket_name_;
1025   ProcDescriptorGetter* proc_fd_getter = proc_fd_getter_;
1026 
1027   // Invoke destructor and then the constructor again.
1028   this->~PerfProducer();
1029   new (this) PerfProducer(proc_fd_getter, task_runner);
1030 
1031   ConnectWithRetries(socket_name);
1032 }
1033 
1034 }  // namespace profiling
1035 }  // namespace perfetto
1036