1 /*
2  * Copyright (C) 2019 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "src/tracing/internal/tracing_muxer_impl.h"
18 
19 #include <algorithm>
20 #include <atomic>
21 #include <mutex>
22 #include <vector>
23 
24 #include "perfetto/base/build_config.h"
25 #include "perfetto/base/logging.h"
26 #include "perfetto/base/task_runner.h"
27 #include "perfetto/ext/base/hash.h"
28 #include "perfetto/ext/base/thread_checker.h"
29 #include "perfetto/ext/base/waitable_event.h"
30 #include "perfetto/ext/tracing/core/shared_memory_arbiter.h"
31 #include "perfetto/ext/tracing/core/trace_packet.h"
32 #include "perfetto/ext/tracing/core/trace_stats.h"
33 #include "perfetto/ext/tracing/core/trace_writer.h"
34 #include "perfetto/ext/tracing/core/tracing_service.h"
35 #include "perfetto/tracing/buffer_exhausted_policy.h"
36 #include "perfetto/tracing/core/data_source_config.h"
37 #include "perfetto/tracing/core/tracing_service_state.h"
38 #include "perfetto/tracing/data_source.h"
39 #include "perfetto/tracing/internal/data_source_internal.h"
40 #include "perfetto/tracing/internal/interceptor_trace_writer.h"
41 #include "perfetto/tracing/internal/tracing_backend_fake.h"
42 #include "perfetto/tracing/trace_writer_base.h"
43 #include "perfetto/tracing/tracing.h"
44 #include "perfetto/tracing/tracing_backend.h"
45 
46 #include "protos/perfetto/config/interceptor_config.gen.h"
47 
48 #include "src/tracing/internal/tracing_muxer_fake.h"
49 
50 #if PERFETTO_BUILDFLAG(PERFETTO_OS_WIN)
51 #include <io.h>  // For dup()
52 #else
53 #include <unistd.h>  // For dup()
54 #endif
55 
56 namespace perfetto {
57 namespace internal {
58 
59 namespace {
60 
61 // A task runner which prevents calls to DataSource::Trace() while an operation
62 // is in progress. Used to guard against unexpected re-entrancy where the
63 // user-provided task runner implementation tries to enter a trace point under
64 // the hood.
65 class NonReentrantTaskRunner : public base::TaskRunner {
66  public:
NonReentrantTaskRunner(TracingMuxer * muxer,std::unique_ptr<base::TaskRunner> task_runner)67   NonReentrantTaskRunner(TracingMuxer* muxer,
68                          std::unique_ptr<base::TaskRunner> task_runner)
69       : muxer_(muxer), task_runner_(std::move(task_runner)) {}
70 
71   // base::TaskRunner implementation.
PostTask(std::function<void ()> task)72   void PostTask(std::function<void()> task) override {
73     CallWithGuard([&] { task_runner_->PostTask(std::move(task)); });
74   }
75 
PostDelayedTask(std::function<void ()> task,uint32_t delay_ms)76   void PostDelayedTask(std::function<void()> task, uint32_t delay_ms) override {
77     CallWithGuard(
78         [&] { task_runner_->PostDelayedTask(std::move(task), delay_ms); });
79   }
80 
AddFileDescriptorWatch(base::PlatformHandle fd,std::function<void ()> callback)81   void AddFileDescriptorWatch(base::PlatformHandle fd,
82                               std::function<void()> callback) override {
83     CallWithGuard(
84         [&] { task_runner_->AddFileDescriptorWatch(fd, std::move(callback)); });
85   }
86 
RemoveFileDescriptorWatch(base::PlatformHandle fd)87   void RemoveFileDescriptorWatch(base::PlatformHandle fd) override {
88     CallWithGuard([&] { task_runner_->RemoveFileDescriptorWatch(fd); });
89   }
90 
RunsTasksOnCurrentThread() const91   bool RunsTasksOnCurrentThread() const override {
92     bool result;
93     CallWithGuard([&] { result = task_runner_->RunsTasksOnCurrentThread(); });
94     return result;
95   }
96 
97  private:
98   template <typename T>
CallWithGuard(T lambda) const99   void CallWithGuard(T lambda) const {
100     auto* root_tls = muxer_->GetOrCreateTracingTLS();
101     if (PERFETTO_UNLIKELY(root_tls->is_in_trace_point)) {
102       lambda();
103       return;
104     }
105     ScopedReentrancyAnnotator scoped_annotator(*root_tls);
106     lambda();
107   }
108 
109   TracingMuxer* const muxer_;
110   std::unique_ptr<base::TaskRunner> task_runner_;
111 };
112 
113 class StopArgsImpl : public DataSourceBase::StopArgs {
114  public:
HandleStopAsynchronously() const115   std::function<void()> HandleStopAsynchronously() const override {
116     auto closure = std::move(async_stop_closure);
117     async_stop_closure = std::function<void()>();
118     return closure;
119   }
120 
121   mutable std::function<void()> async_stop_closure;
122 };
123 
ComputeConfigHash(const DataSourceConfig & config)124 uint64_t ComputeConfigHash(const DataSourceConfig& config) {
125   base::Hash hasher;
126   std::string config_bytes = config.SerializeAsString();
127   hasher.Update(config_bytes.data(), config_bytes.size());
128   return hasher.digest();
129 }
130 
131 }  // namespace
132 
133 // ----- Begin of TracingMuxerImpl::ProducerImpl
ProducerImpl(TracingMuxerImpl * muxer,TracingBackendId backend_id,uint32_t shmem_batch_commits_duration_ms)134 TracingMuxerImpl::ProducerImpl::ProducerImpl(
135     TracingMuxerImpl* muxer,
136     TracingBackendId backend_id,
137     uint32_t shmem_batch_commits_duration_ms)
138     : muxer_(muxer),
139       backend_id_(backend_id),
140       shmem_batch_commits_duration_ms_(shmem_batch_commits_duration_ms) {}
141 
142 TracingMuxerImpl::ProducerImpl::~ProducerImpl() = default;
143 
Initialize(std::unique_ptr<ProducerEndpoint> endpoint)144 void TracingMuxerImpl::ProducerImpl::Initialize(
145     std::unique_ptr<ProducerEndpoint> endpoint) {
146   PERFETTO_DCHECK_THREAD(thread_checker_);
147   PERFETTO_DCHECK(!connected_);
148   connection_id_++;
149 
150   // Adopt the endpoint into a shared pointer so that we can safely share it
151   // across threads that create trace writers. The custom deleter function
152   // ensures that the endpoint is always destroyed on the muxer's thread. (Note
153   // that |task_runner| is assumed to outlive tracing sessions on all threads.)
154   auto* task_runner = muxer_->task_runner_.get();
155   auto deleter = [task_runner](ProducerEndpoint* e) {
156     task_runner->PostTask([e] { delete e; });
157   };
158   std::shared_ptr<ProducerEndpoint> service(endpoint.release(), deleter);
159   // This atomic store is needed because another thread might be concurrently
160   // creating a trace writer using the previous (disconnected) |service_|. See
161   // CreateTraceWriter().
162   std::atomic_store(&service_, std::move(service));
163   // Don't try to use the service here since it may not have connected yet. See
164   // OnConnect().
165 }
166 
OnConnect()167 void TracingMuxerImpl::ProducerImpl::OnConnect() {
168   PERFETTO_DLOG("Producer connected");
169   PERFETTO_DCHECK_THREAD(thread_checker_);
170   PERFETTO_DCHECK(!connected_);
171   connected_ = true;
172   muxer_->UpdateDataSourcesOnAllBackends();
173 }
174 
OnDisconnect()175 void TracingMuxerImpl::ProducerImpl::OnDisconnect() {
176   PERFETTO_DCHECK_THREAD(thread_checker_);
177   connected_ = false;
178   // Active data sources for this producer will be stopped by
179   // DestroyStoppedTraceWritersForCurrentThread() since the reconnected producer
180   // will have a different connection id (even before it has finished
181   // connecting).
182   registered_data_sources_.reset();
183   // Keep the old service around as a dead connection in case it has active
184   // trace writers. We can't clear |service_| here because other threads may be
185   // concurrently creating new trace writers. The reconnection below will
186   // atomically swap the new service in place of the old one.
187   dead_services_.push_back(service_);
188   // Try reconnecting the producer.
189   muxer_->OnProducerDisconnected(this);
190 }
191 
OnTracingSetup()192 void TracingMuxerImpl::ProducerImpl::OnTracingSetup() {
193   PERFETTO_DCHECK_THREAD(thread_checker_);
194   service_->MaybeSharedMemoryArbiter()->SetBatchCommitsDuration(
195       shmem_batch_commits_duration_ms_);
196 }
197 
SetupDataSource(DataSourceInstanceID id,const DataSourceConfig & cfg)198 void TracingMuxerImpl::ProducerImpl::SetupDataSource(
199     DataSourceInstanceID id,
200     const DataSourceConfig& cfg) {
201   PERFETTO_DCHECK_THREAD(thread_checker_);
202   muxer_->SetupDataSource(backend_id_, connection_id_, id, cfg);
203 }
204 
StartDataSource(DataSourceInstanceID id,const DataSourceConfig &)205 void TracingMuxerImpl::ProducerImpl::StartDataSource(DataSourceInstanceID id,
206                                                      const DataSourceConfig&) {
207   PERFETTO_DCHECK_THREAD(thread_checker_);
208   muxer_->StartDataSource(backend_id_, id);
209   service_->NotifyDataSourceStarted(id);
210 }
211 
StopDataSource(DataSourceInstanceID id)212 void TracingMuxerImpl::ProducerImpl::StopDataSource(DataSourceInstanceID id) {
213   PERFETTO_DCHECK_THREAD(thread_checker_);
214   muxer_->StopDataSource_AsyncBegin(backend_id_, id);
215 }
216 
Flush(FlushRequestID flush_id,const DataSourceInstanceID *,size_t)217 void TracingMuxerImpl::ProducerImpl::Flush(FlushRequestID flush_id,
218                                            const DataSourceInstanceID*,
219                                            size_t) {
220   // Flush is not plumbed for now, we just ack straight away.
221   PERFETTO_DCHECK_THREAD(thread_checker_);
222   service_->NotifyFlushComplete(flush_id);
223 }
224 
ClearIncrementalState(const DataSourceInstanceID * instances,size_t instance_count)225 void TracingMuxerImpl::ProducerImpl::ClearIncrementalState(
226     const DataSourceInstanceID* instances,
227     size_t instance_count) {
228   PERFETTO_DCHECK_THREAD(thread_checker_);
229   for (size_t inst_idx = 0; inst_idx < instance_count; inst_idx++) {
230     muxer_->ClearDataSourceIncrementalState(backend_id_, instances[inst_idx]);
231   }
232 }
233 
SweepDeadServices()234 void TracingMuxerImpl::ProducerImpl::SweepDeadServices() {
235   PERFETTO_DCHECK_THREAD(thread_checker_);
236   auto is_unused = [](const std::shared_ptr<ProducerEndpoint>& endpoint) {
237     auto* arbiter = endpoint->MaybeSharedMemoryArbiter();
238     return !arbiter || arbiter->TryShutdown();
239   };
240   for (auto it = dead_services_.begin(); it != dead_services_.end();) {
241     auto next_it = it;
242     next_it++;
243     if (is_unused(*it)) {
244       dead_services_.erase(it);
245     }
246     it = next_it;
247   }
248 }
249 
250 // ----- End of TracingMuxerImpl::ProducerImpl methods.
251 
252 // ----- Begin of TracingMuxerImpl::ConsumerImpl
ConsumerImpl(TracingMuxerImpl * muxer,BackendType backend_type,TracingBackendId backend_id,TracingSessionGlobalID session_id)253 TracingMuxerImpl::ConsumerImpl::ConsumerImpl(TracingMuxerImpl* muxer,
254                                              BackendType backend_type,
255                                              TracingBackendId backend_id,
256                                              TracingSessionGlobalID session_id)
257     : muxer_(muxer),
258       backend_type_(backend_type),
259       backend_id_(backend_id),
260       session_id_(session_id) {}
261 
262 TracingMuxerImpl::ConsumerImpl::~ConsumerImpl() = default;
263 
Initialize(std::unique_ptr<ConsumerEndpoint> endpoint)264 void TracingMuxerImpl::ConsumerImpl::Initialize(
265     std::unique_ptr<ConsumerEndpoint> endpoint) {
266   PERFETTO_DCHECK_THREAD(thread_checker_);
267   service_ = std::move(endpoint);
268   // Don't try to use the service here since it may not have connected yet. See
269   // OnConnect().
270 }
271 
OnConnect()272 void TracingMuxerImpl::ConsumerImpl::OnConnect() {
273   PERFETTO_DCHECK_THREAD(thread_checker_);
274   PERFETTO_DCHECK(!connected_);
275   connected_ = true;
276 
277   // Observe data source instance events so we get notified when tracing starts.
278   service_->ObserveEvents(ObservableEvents::TYPE_DATA_SOURCES_INSTANCES |
279                           ObservableEvents::TYPE_ALL_DATA_SOURCES_STARTED);
280 
281   // If the API client configured and started tracing before we connected,
282   // tell the backend about it now.
283   if (trace_config_)
284     muxer_->SetupTracingSession(session_id_, trace_config_);
285   if (start_pending_)
286     muxer_->StartTracingSession(session_id_);
287   if (get_trace_stats_pending_) {
288     auto callback = std::move(get_trace_stats_callback_);
289     get_trace_stats_callback_ = nullptr;
290     muxer_->GetTraceStats(session_id_, std::move(callback));
291   }
292   if (query_service_state_callback_) {
293     auto callback = std::move(query_service_state_callback_);
294     query_service_state_callback_ = nullptr;
295     muxer_->QueryServiceState(session_id_, std::move(callback));
296   }
297   if (stop_pending_)
298     muxer_->StopTracingSession(session_id_);
299 }
300 
OnDisconnect()301 void TracingMuxerImpl::ConsumerImpl::OnDisconnect() {
302   PERFETTO_DCHECK_THREAD(thread_checker_);
303 #if PERFETTO_BUILDFLAG(PERFETTO_OS_ANDROID)
304   if (!connected_ && backend_type_ == kSystemBackend) {
305     PERFETTO_ELOG(
306         "Unable to connect to the system tracing service as a consumer. On "
307         "Android, use the \"perfetto\" command line tool instead to start "
308         "system-wide tracing sessions");
309   }
310 #endif  // PERFETTO_BUILDFLAG(PERFETTO_OS_ANDROID)
311 
312   // Notify the client about disconnection.
313   NotifyError(TracingError{TracingError::kDisconnected, "Peer disconnected"});
314 
315   // Make sure the client doesn't hang in a blocking start/stop because of the
316   // disconnection.
317   NotifyStartComplete();
318   NotifyStopComplete();
319 
320   // It shouldn't be necessary to call StopTracingSession. If we get this call
321   // it means that the service did shutdown before us, so there is no point
322   // trying it to ask it to stop the session. We should just remember to cleanup
323   // the consumer vector.
324   connected_ = false;
325 
326   // Notify the muxer that it is safe to destroy |this|. This is needed because
327   // the ConsumerEndpoint stored in |service_| requires that |this| be safe to
328   // access until OnDisconnect() is called.
329   muxer_->OnConsumerDisconnected(this);
330 }
331 
Disconnect()332 void TracingMuxerImpl::ConsumerImpl::Disconnect() {
333   // This is weird and deserves a comment.
334   //
335   // When we called the ConnectConsumer method on the service it returns
336   // us a ConsumerEndpoint which we stored in |service_|, however this
337   // ConsumerEndpoint holds a pointer to the ConsumerImpl pointed to by
338   // |this|. Part of the API contract to TracingService::ConnectConsumer is that
339   // the ConsumerImpl pointer has to be valid until the
340   // ConsumerImpl::OnDisconnect method is called. Therefore we reset the
341   // ConsumerEndpoint |service_|. Eventually this will call
342   // ConsumerImpl::OnDisconnect and we will inform the muxer it is safe to
343   // call the destructor of |this|.
344   service_.reset();
345 }
346 
OnTracingDisabled(const std::string & error)347 void TracingMuxerImpl::ConsumerImpl::OnTracingDisabled(
348     const std::string& error) {
349   PERFETTO_DCHECK_THREAD(thread_checker_);
350   PERFETTO_DCHECK(!stopped_);
351   stopped_ = true;
352 
353   if (!error.empty())
354     NotifyError(TracingError{TracingError::kTracingFailed, error});
355 
356   // If we're still waiting for the start event, fire it now. This may happen if
357   // there are no active data sources in the session.
358   NotifyStartComplete();
359   NotifyStopComplete();
360 }
361 
NotifyStartComplete()362 void TracingMuxerImpl::ConsumerImpl::NotifyStartComplete() {
363   PERFETTO_DCHECK_THREAD(thread_checker_);
364   if (start_complete_callback_) {
365     muxer_->task_runner_->PostTask(std::move(start_complete_callback_));
366     start_complete_callback_ = nullptr;
367   }
368   if (blocking_start_complete_callback_) {
369     muxer_->task_runner_->PostTask(
370         std::move(blocking_start_complete_callback_));
371     blocking_start_complete_callback_ = nullptr;
372   }
373 }
374 
NotifyError(const TracingError & error)375 void TracingMuxerImpl::ConsumerImpl::NotifyError(const TracingError& error) {
376   PERFETTO_DCHECK_THREAD(thread_checker_);
377   if (error_callback_) {
378     muxer_->task_runner_->PostTask(
379         std::bind(std::move(error_callback_), error));
380   }
381 }
382 
NotifyStopComplete()383 void TracingMuxerImpl::ConsumerImpl::NotifyStopComplete() {
384   PERFETTO_DCHECK_THREAD(thread_checker_);
385   if (stop_complete_callback_) {
386     muxer_->task_runner_->PostTask(std::move(stop_complete_callback_));
387     stop_complete_callback_ = nullptr;
388   }
389   if (blocking_stop_complete_callback_) {
390     muxer_->task_runner_->PostTask(std::move(blocking_stop_complete_callback_));
391     blocking_stop_complete_callback_ = nullptr;
392   }
393 }
394 
OnTraceData(std::vector<TracePacket> packets,bool has_more)395 void TracingMuxerImpl::ConsumerImpl::OnTraceData(
396     std::vector<TracePacket> packets,
397     bool has_more) {
398   PERFETTO_DCHECK_THREAD(thread_checker_);
399   if (!read_trace_callback_)
400     return;
401 
402   size_t capacity = 0;
403   for (const auto& packet : packets) {
404     // 16 is an over-estimation of the proto preamble size
405     capacity += packet.size() + 16;
406   }
407 
408   // The shared_ptr is to avoid making a copy of the buffer when PostTask-ing.
409   std::shared_ptr<std::vector<char>> buf(new std::vector<char>());
410   buf->reserve(capacity);
411   for (auto& packet : packets) {
412     char* start;
413     size_t size;
414     std::tie(start, size) = packet.GetProtoPreamble();
415     buf->insert(buf->end(), start, start + size);
416     for (auto& slice : packet.slices()) {
417       const auto* slice_data = reinterpret_cast<const char*>(slice.start);
418       buf->insert(buf->end(), slice_data, slice_data + slice.size);
419     }
420   }
421 
422   auto callback = read_trace_callback_;
423   muxer_->task_runner_->PostTask([callback, buf, has_more] {
424     TracingSession::ReadTraceCallbackArgs callback_arg{};
425     callback_arg.data = buf->empty() ? nullptr : &(*buf)[0];
426     callback_arg.size = buf->size();
427     callback_arg.has_more = has_more;
428     callback(callback_arg);
429   });
430 
431   if (!has_more)
432     read_trace_callback_ = nullptr;
433 }
434 
OnObservableEvents(const ObservableEvents & events)435 void TracingMuxerImpl::ConsumerImpl::OnObservableEvents(
436     const ObservableEvents& events) {
437   if (events.instance_state_changes_size()) {
438     for (const auto& state_change : events.instance_state_changes()) {
439       DataSourceHandle handle{state_change.producer_name(),
440                               state_change.data_source_name()};
441       data_source_states_[handle] =
442           state_change.state() ==
443           ObservableEvents::DATA_SOURCE_INSTANCE_STATE_STARTED;
444     }
445   }
446 
447   if (events.instance_state_changes_size() ||
448       events.all_data_sources_started()) {
449     // Data sources are first reported as being stopped before starting, so once
450     // all the data sources we know about have started we can declare tracing
451     // begun. In the case where there are no matching data sources for the
452     // session, the service will report the all_data_sources_started() event
453     // without adding any instances (only since Android S / Perfetto v10.0).
454     if (start_complete_callback_ || blocking_start_complete_callback_) {
455       bool all_data_sources_started = std::all_of(
456           data_source_states_.cbegin(), data_source_states_.cend(),
457           [](std::pair<DataSourceHandle, bool> state) { return state.second; });
458       if (all_data_sources_started)
459         NotifyStartComplete();
460     }
461   }
462 }
463 
OnTraceStats(bool success,const TraceStats & trace_stats)464 void TracingMuxerImpl::ConsumerImpl::OnTraceStats(
465     bool success,
466     const TraceStats& trace_stats) {
467   if (!get_trace_stats_callback_)
468     return;
469   TracingSession::GetTraceStatsCallbackArgs callback_arg{};
470   callback_arg.success = success;
471   callback_arg.trace_stats_data = trace_stats.SerializeAsArray();
472   muxer_->task_runner_->PostTask(
473       std::bind(std::move(get_trace_stats_callback_), std::move(callback_arg)));
474   get_trace_stats_callback_ = nullptr;
475 }
476 
477 // The callbacks below are not used.
OnDetach(bool)478 void TracingMuxerImpl::ConsumerImpl::OnDetach(bool) {}
OnAttach(bool,const TraceConfig &)479 void TracingMuxerImpl::ConsumerImpl::OnAttach(bool, const TraceConfig&) {}
480 // ----- End of TracingMuxerImpl::ConsumerImpl
481 
482 // ----- Begin of TracingMuxerImpl::TracingSessionImpl
483 
484 // TracingSessionImpl is the RAII object returned to API clients when they
485 // invoke Tracing::CreateTracingSession. They use it for starting/stopping
486 // tracing.
487 
TracingSessionImpl(TracingMuxerImpl * muxer,TracingSessionGlobalID session_id,BackendType backend_type)488 TracingMuxerImpl::TracingSessionImpl::TracingSessionImpl(
489     TracingMuxerImpl* muxer,
490     TracingSessionGlobalID session_id,
491     BackendType backend_type)
492     : muxer_(muxer), session_id_(session_id), backend_type_(backend_type) {}
493 
494 // Can be destroyed from any thread.
~TracingSessionImpl()495 TracingMuxerImpl::TracingSessionImpl::~TracingSessionImpl() {
496   auto* muxer = muxer_;
497   auto session_id = session_id_;
498   muxer->task_runner_->PostTask(
499       [muxer, session_id] { muxer->DestroyTracingSession(session_id); });
500 }
501 
502 // Can be called from any thread.
Setup(const TraceConfig & cfg,int fd)503 void TracingMuxerImpl::TracingSessionImpl::Setup(const TraceConfig& cfg,
504                                                  int fd) {
505   auto* muxer = muxer_;
506   auto session_id = session_id_;
507   std::shared_ptr<TraceConfig> trace_config(new TraceConfig(cfg));
508   if (fd >= 0) {
509     base::ignore_result(backend_type_);  // For -Wunused in the amalgamation.
510 #if PERFETTO_BUILDFLAG(PERFETTO_OS_WIN)
511     if (backend_type_ != kInProcessBackend) {
512       PERFETTO_FATAL(
513           "Passing a file descriptor to TracingSession::Setup() is only "
514           "supported with the kInProcessBackend on Windows. Use "
515           "TracingSession::ReadTrace() instead");
516     }
517 #endif
518     trace_config->set_write_into_file(true);
519     fd = dup(fd);
520   }
521   muxer->task_runner_->PostTask([muxer, session_id, trace_config, fd] {
522     muxer->SetupTracingSession(session_id, trace_config, base::ScopedFile(fd));
523   });
524 }
525 
526 // Can be called from any thread.
Start()527 void TracingMuxerImpl::TracingSessionImpl::Start() {
528   auto* muxer = muxer_;
529   auto session_id = session_id_;
530   muxer->task_runner_->PostTask(
531       [muxer, session_id] { muxer->StartTracingSession(session_id); });
532 }
533 
534 // Can be called from any thread.
ChangeTraceConfig(const TraceConfig & cfg)535 void TracingMuxerImpl::TracingSessionImpl::ChangeTraceConfig(
536     const TraceConfig& cfg) {
537   auto* muxer = muxer_;
538   auto session_id = session_id_;
539   muxer->task_runner_->PostTask([muxer, session_id, cfg] {
540     muxer->ChangeTracingSessionConfig(session_id, cfg);
541   });
542 }
543 
544 // Can be called from any thread except the service thread.
StartBlocking()545 void TracingMuxerImpl::TracingSessionImpl::StartBlocking() {
546   PERFETTO_DCHECK(!muxer_->task_runner_->RunsTasksOnCurrentThread());
547   auto* muxer = muxer_;
548   auto session_id = session_id_;
549   base::WaitableEvent tracing_started;
550   muxer->task_runner_->PostTask([muxer, session_id, &tracing_started] {
551     auto* consumer = muxer->FindConsumer(session_id);
552     if (!consumer) {
553       // TODO(skyostil): Signal an error to the user.
554       tracing_started.Notify();
555       return;
556     }
557     PERFETTO_DCHECK(!consumer->blocking_start_complete_callback_);
558     consumer->blocking_start_complete_callback_ = [&] {
559       tracing_started.Notify();
560     };
561     muxer->StartTracingSession(session_id);
562   });
563   tracing_started.Wait();
564 }
565 
566 // Can be called from any thread.
Flush(std::function<void (bool)> user_callback,uint32_t timeout_ms)567 void TracingMuxerImpl::TracingSessionImpl::Flush(
568     std::function<void(bool)> user_callback,
569     uint32_t timeout_ms) {
570   auto* muxer = muxer_;
571   auto session_id = session_id_;
572   muxer->task_runner_->PostTask([muxer, session_id, timeout_ms, user_callback] {
573     auto* consumer = muxer->FindConsumer(session_id);
574     if (!consumer) {
575       std::move(user_callback)(false);
576       return;
577     }
578     muxer->FlushTracingSession(session_id, timeout_ms,
579                                std::move(user_callback));
580   });
581 }
582 
583 // Can be called from any thread.
Stop()584 void TracingMuxerImpl::TracingSessionImpl::Stop() {
585   auto* muxer = muxer_;
586   auto session_id = session_id_;
587   muxer->task_runner_->PostTask(
588       [muxer, session_id] { muxer->StopTracingSession(session_id); });
589 }
590 
591 // Can be called from any thread except the service thread.
StopBlocking()592 void TracingMuxerImpl::TracingSessionImpl::StopBlocking() {
593   PERFETTO_DCHECK(!muxer_->task_runner_->RunsTasksOnCurrentThread());
594   auto* muxer = muxer_;
595   auto session_id = session_id_;
596   base::WaitableEvent tracing_stopped;
597   muxer->task_runner_->PostTask([muxer, session_id, &tracing_stopped] {
598     auto* consumer = muxer->FindConsumer(session_id);
599     if (!consumer) {
600       // TODO(skyostil): Signal an error to the user.
601       tracing_stopped.Notify();
602       return;
603     }
604     PERFETTO_DCHECK(!consumer->blocking_stop_complete_callback_);
605     consumer->blocking_stop_complete_callback_ = [&] {
606       tracing_stopped.Notify();
607     };
608     muxer->StopTracingSession(session_id);
609   });
610   tracing_stopped.Wait();
611 }
612 
613 // Can be called from any thread.
ReadTrace(ReadTraceCallback cb)614 void TracingMuxerImpl::TracingSessionImpl::ReadTrace(ReadTraceCallback cb) {
615   auto* muxer = muxer_;
616   auto session_id = session_id_;
617   muxer->task_runner_->PostTask([muxer, session_id, cb] {
618     muxer->ReadTracingSessionData(session_id, std::move(cb));
619   });
620 }
621 
622 // Can be called from any thread.
SetOnStartCallback(std::function<void ()> cb)623 void TracingMuxerImpl::TracingSessionImpl::SetOnStartCallback(
624     std::function<void()> cb) {
625   auto* muxer = muxer_;
626   auto session_id = session_id_;
627   muxer->task_runner_->PostTask([muxer, session_id, cb] {
628     auto* consumer = muxer->FindConsumer(session_id);
629     if (!consumer)
630       return;
631     consumer->start_complete_callback_ = cb;
632   });
633 }
634 
635 // Can be called from any thread
SetOnErrorCallback(std::function<void (TracingError)> cb)636 void TracingMuxerImpl::TracingSessionImpl::SetOnErrorCallback(
637     std::function<void(TracingError)> cb) {
638   auto* muxer = muxer_;
639   auto session_id = session_id_;
640   muxer->task_runner_->PostTask([muxer, session_id, cb] {
641     auto* consumer = muxer->FindConsumer(session_id);
642     if (!consumer) {
643       // Notify the client about concurrent disconnection of the session.
644       if (cb)
645         cb(TracingError{TracingError::kDisconnected, "Peer disconnected"});
646       return;
647     }
648     consumer->error_callback_ = cb;
649   });
650 }
651 
652 // Can be called from any thread.
SetOnStopCallback(std::function<void ()> cb)653 void TracingMuxerImpl::TracingSessionImpl::SetOnStopCallback(
654     std::function<void()> cb) {
655   auto* muxer = muxer_;
656   auto session_id = session_id_;
657   muxer->task_runner_->PostTask([muxer, session_id, cb] {
658     auto* consumer = muxer->FindConsumer(session_id);
659     if (!consumer)
660       return;
661     consumer->stop_complete_callback_ = cb;
662   });
663 }
664 
665 // Can be called from any thread.
GetTraceStats(GetTraceStatsCallback cb)666 void TracingMuxerImpl::TracingSessionImpl::GetTraceStats(
667     GetTraceStatsCallback cb) {
668   auto* muxer = muxer_;
669   auto session_id = session_id_;
670   muxer->task_runner_->PostTask([muxer, session_id, cb] {
671     muxer->GetTraceStats(session_id, std::move(cb));
672   });
673 }
674 
675 // Can be called from any thread.
QueryServiceState(QueryServiceStateCallback cb)676 void TracingMuxerImpl::TracingSessionImpl::QueryServiceState(
677     QueryServiceStateCallback cb) {
678   auto* muxer = muxer_;
679   auto session_id = session_id_;
680   muxer->task_runner_->PostTask([muxer, session_id, cb] {
681     muxer->QueryServiceState(session_id, std::move(cb));
682   });
683 }
684 
685 // ----- End of TracingMuxerImpl::TracingSessionImpl
686 
// static
// Global muxer pointer. It starts out as the fake muxer, and the
// TracingMuxerImpl constructor replaces it with the real instance.
TracingMuxer* TracingMuxer::instance_ = TracingMuxerFake::Get();
689 
690 // This is called by perfetto::Tracing::Initialize().
691 // Can be called on any thread. Typically, but not necessarily, that will be
692 // the embedder's main thread.
TracingMuxerImpl(const TracingInitArgs & args)693 TracingMuxerImpl::TracingMuxerImpl(const TracingInitArgs& args)
694     : TracingMuxer(args.platform ? args.platform
695                                  : Platform::GetDefaultPlatform()) {
696   PERFETTO_DETACH_FROM_THREAD(thread_checker_);
697   instance_ = this;
698 
699   // Create the thread where muxer, producers and service will live.
700   task_runner_.reset(
701       new NonReentrantTaskRunner(this, platform_->CreateTaskRunner({})));
702 
703   // Run the initializer on that thread.
704   task_runner_->PostTask([this, args] { Initialize(args); });
705 }
706 
// Runs on the muxer's task runner (posted by the constructor). Binds the
// thread checker to this thread and records the tracing policy. Then adds one
// RegisteredBackend, each with its own ProducerImpl, for every backend type
// requested in |args.backends|, followed by the fake fallback backend.
void TracingMuxerImpl::Initialize(const TracingInitArgs& args) {
  PERFETTO_DCHECK_THREAD(thread_checker_);  // Rebind the thread checker.

  policy_ = args.tracing_policy;

  // Appends |backend| to |backends_| and starts connecting a producer to it. A
  // null |backend| is skipped and gets no id.
  auto add_backend = [this, &args](TracingBackend* backend, BackendType type) {
    if (!backend) {
      // We skip the log in release builds because the *_backend_fake.cc code
      // has already an ELOG before returning a nullptr.
      PERFETTO_DLOG("Backend creation failed, type %d", static_cast<int>(type));
      return;
    }
    // The new backend's id equals its position in |backends_|.
    TracingBackendId backend_id = backends_.size();
    backends_.emplace_back();
    RegisteredBackend& rb = backends_.back();
    rb.backend = backend;
    rb.id = backend_id;
    rb.type = type;
    rb.producer.reset(new ProducerImpl(this, backend_id,
                                       args.shmem_batch_commits_duration_ms));
    rb.producer_conn_args.producer = rb.producer.get();
    rb.producer_conn_args.producer_name = platform_->GetCurrentProcessName();
    rb.producer_conn_args.task_runner = task_runner_.get();
    // Init args give the shared memory hints in KB; the connection args take
    // bytes. NOTE(review): if the *_kb fields are 32-bit, hints of 4 GiB or
    // more would overflow here — confirm the field types.
    rb.producer_conn_args.shmem_size_hint_bytes =
        args.shmem_size_hint_kb * 1024;
    rb.producer_conn_args.shmem_page_size_hint_bytes =
        args.shmem_page_size_hint_kb * 1024;
    rb.producer->Initialize(rb.backend->ConnectProducer(rb.producer_conn_args));
  };

  if (args.backends & kSystemBackend) {
    PERFETTO_CHECK(args.system_backend_factory_);
    add_backend(args.system_backend_factory_(), kSystemBackend);
  }

  if (args.backends & kInProcessBackend) {
    PERFETTO_CHECK(args.in_process_backend_factory_);
    add_backend(args.in_process_backend_factory_(), kInProcessBackend);
  }

  if (args.backends & kCustomBackend) {
    PERFETTO_CHECK(args.custom_backend);
    add_backend(args.custom_backend, kCustomBackend);
  }

  // Any bit outside the known backend types is fatal. Note that this runs
  // after the recognized backends above have already been added.
  if (args.backends & ~(kSystemBackend | kInProcessBackend | kCustomBackend)) {
    PERFETTO_FATAL("Unsupported tracing backend type");
  }

  // Fallback backend for consumer creation for an unsupported backend type.
  // This backend simply fails any attempt to start a tracing session.
  // NOTE: This backend instance has to be added last.
  add_backend(internal::TracingBackendFake::GetInstance(),
              BackendType::kUnspecifiedBackend);
}
762 
763 // Can be called from any thread (but not concurrently).
RegisterDataSource(const DataSourceDescriptor & descriptor,DataSourceFactory factory,DataSourceStaticState * static_state)764 bool TracingMuxerImpl::RegisterDataSource(
765     const DataSourceDescriptor& descriptor,
766     DataSourceFactory factory,
767     DataSourceStaticState* static_state) {
768   // Ignore repeated registrations.
769   if (static_state->index != kMaxDataSources)
770     return true;
771 
772   static std::atomic<uint32_t> last_id{};
773   uint32_t new_index = last_id++;
774   if (new_index >= kMaxDataSources) {
775     PERFETTO_DLOG(
776         "RegisterDataSource failed: too many data sources already registered");
777     return false;
778   }
779 
780   // Initialize the static state.
781   static_assert(sizeof(static_state->instances[0]) >= sizeof(DataSourceState),
782                 "instances[] size mismatch");
783   for (size_t i = 0; i < static_state->instances.size(); i++)
784     new (&static_state->instances[i]) DataSourceState{};
785 
786   static_state->index = new_index;
787 
788   task_runner_->PostTask([this, descriptor, factory, static_state] {
789     data_sources_.emplace_back();
790     RegisteredDataSource& rds = data_sources_.back();
791     rds.descriptor = descriptor;
792     rds.factory = factory;
793     rds.static_state = static_state;
794     UpdateDataSourcesOnAllBackends();
795   });
796   return true;
797 }
798 
799 // Can be called from any thread (but not concurrently).
RegisterInterceptor(const InterceptorDescriptor & descriptor,InterceptorFactory factory,InterceptorBase::TLSFactory tls_factory,InterceptorBase::TracePacketCallback packet_callback)800 void TracingMuxerImpl::RegisterInterceptor(
801     const InterceptorDescriptor& descriptor,
802     InterceptorFactory factory,
803     InterceptorBase::TLSFactory tls_factory,
804     InterceptorBase::TracePacketCallback packet_callback) {
805   task_runner_->PostTask(
806       [this, descriptor, factory, tls_factory, packet_callback] {
807         // Ignore repeated registrations.
808         for (const auto& interceptor : interceptors_) {
809           if (interceptor.descriptor.name() == descriptor.name()) {
810             PERFETTO_DCHECK(interceptor.tls_factory == tls_factory);
811             PERFETTO_DCHECK(interceptor.packet_callback == packet_callback);
812             return;
813           }
814         }
815         // Only allow certain interceptors for now.
816         if (descriptor.name() != "test_interceptor" &&
817             descriptor.name() != "console") {
818           PERFETTO_ELOG(
819               "Interceptors are experimental. If you want to use them, please "
820               "get in touch with the project maintainers "
821               "(https://perfetto.dev/docs/contributing/"
822               "getting-started#community).");
823           return;
824         }
825         interceptors_.emplace_back();
826         RegisteredInterceptor& interceptor = interceptors_.back();
827         interceptor.descriptor = descriptor;
828         interceptor.factory = factory;
829         interceptor.tls_factory = tls_factory;
830         interceptor.packet_callback = packet_callback;
831       });
832 }
833 
834 // Called by the service of one of the backends.
// Handles a SetupDataSource request coming from backend |backend_id|.
//
// Looks for a registered data source whose descriptor name equals cfg.name().
// Candidates that already have a live instance on the same backend with an
// identical config hash (ComputeConfigHash) are skipped. For the first
// remaining candidate, a free instance slot is claimed and its DataSourceState
// is populated: ids, target buffer, config hash, a data source object created
// through the registered factory and, optionally, an interceptor. The slot is
// then published through |valid_instances| and the data source's OnSetup() is
// invoked. At most one instance is set up per call. If the chosen data source
// has no free slot, the request is dropped with an error log.
void TracingMuxerImpl::SetupDataSource(TracingBackendId backend_id,
                                       uint32_t backend_connection_id,
                                       DataSourceInstanceID instance_id,
                                       const DataSourceConfig& cfg) {
  PERFETTO_DCHECK_THREAD(thread_checker_);
  PERFETTO_DLOG("Setting up data source %" PRIu64 " %s", instance_id,
                cfg.name().c_str());
  // Used both to detect an instance already running with this exact config
  // and to tag the new instance below.
  uint64_t config_hash = ComputeConfigHash(cfg);

  for (const auto& rds : data_sources_) {
    if (rds.descriptor.name() != cfg.name())
      continue;
    DataSourceStaticState& static_state = *rds.static_state;

    // If this data source is already active for this exact config, don't start
    // another instance. This happens when we have several data sources with the
    // same name, in which case the service sends one SetupDataSource event for
    // each one. Since we can't map which event maps to which data source, we
    // ensure each event only starts one data source instance.
    // TODO(skyostil): Register a unique id with each data source to the service
    // to disambiguate.
    bool active_for_config = false;
    for (uint32_t i = 0; i < kMaxDataSourceInstances; i++) {
      if (!static_state.TryGet(i))
        continue;
      auto* internal_state =
          reinterpret_cast<DataSourceState*>(&static_state.instances[i]);
      if (internal_state->backend_id == backend_id &&
          internal_state->config_hash == config_hash) {
        active_for_config = true;
        break;
      }
    }
    if (active_for_config) {
      PERFETTO_DLOG(
          "Data source %s is already active with this config, skipping",
          cfg.name().c_str());
      continue;
    }

    for (uint32_t i = 0; i < kMaxDataSourceInstances; i++) {
      // Find a free slot.
      if (static_state.TryGet(i))
        continue;

      auto* internal_state =
          reinterpret_cast<DataSourceState*>(&static_state.instances[i]);
      // The slot is populated (and OnSetup() below is called) while holding
      // the per-instance lock.
      std::lock_guard<std::recursive_mutex> guard(internal_state->lock);
      static_assert(
          std::is_same<decltype(internal_state->data_source_instance_id),
                       DataSourceInstanceID>::value,
          "data_source_instance_id type mismatch");
      internal_state->backend_id = backend_id;
      internal_state->backend_connection_id = backend_connection_id;
      internal_state->data_source_instance_id = instance_id;
      internal_state->buffer_id =
          static_cast<internal::BufferId>(cfg.target_buffer());
      internal_state->config_hash = config_hash;
      internal_state->data_source = rds.factory();
      // A recycled slot may still hold the interceptor of a previous instance
      // (stopping does not clear these fields), so reset it explicitly.
      internal_state->interceptor = nullptr;
      internal_state->interceptor_id = 0;

      if (cfg.has_interceptor_config()) {
        // Match the requested interceptor by name among the registered ones.
        for (size_t j = 0; j < interceptors_.size(); j++) {
          if (cfg.interceptor_config().name() ==
              interceptors_[j].descriptor.name()) {
            PERFETTO_DLOG("Intercepting data source %" PRIu64
                          " \"%s\" into \"%s\"",
                          instance_id, cfg.name().c_str(),
                          cfg.interceptor_config().name().c_str());
            // Stored 1-based so that 0 can mean "no interceptor", which is
            // what the check right after this loop relies on.
            internal_state->interceptor_id = static_cast<uint32_t>(j + 1);
            internal_state->interceptor = interceptors_[j].factory();
            internal_state->interceptor->OnSetup({cfg});
            break;
          }
        }
        if (!internal_state->interceptor_id) {
          PERFETTO_ELOG("Unknown interceptor configured for data source: %s",
                        cfg.interceptor_config().name().c_str());
        }
      }

      // This must be made at the end. See matching acquire-load in
      // DataSource::Trace().
      static_state.valid_instances.fetch_or(1 << i, std::memory_order_release);

      // Note: the data source's OnSetup() runs after the slot has been
      // published above, and still under |guard|.
      DataSourceBase::SetupArgs setup_args;
      setup_args.config = &cfg;
      setup_args.internal_instance_index = i;
      internal_state->data_source->OnSetup(setup_args);
      return;
    }
    // No free slot in this data source: the request is dropped rather than
    // tried against further data sources with the same name.
    PERFETTO_ELOG(
        "Maximum number of data source instances exhausted. "
        "Dropping data source %" PRIu64,
        instance_id);
    break;
  }
}
934 
935 // Called by the service of one of the backends.
StartDataSource(TracingBackendId backend_id,DataSourceInstanceID instance_id)936 void TracingMuxerImpl::StartDataSource(TracingBackendId backend_id,
937                                        DataSourceInstanceID instance_id) {
938   PERFETTO_DLOG("Starting data source %" PRIu64, instance_id);
939   PERFETTO_DCHECK_THREAD(thread_checker_);
940 
941   auto ds = FindDataSource(backend_id, instance_id);
942   if (!ds) {
943     PERFETTO_ELOG("Could not find data source to start");
944     return;
945   }
946 
947   DataSourceBase::StartArgs start_args{};
948   start_args.internal_instance_index = ds.instance_idx;
949 
950   std::lock_guard<std::recursive_mutex> guard(ds.internal_state->lock);
951   if (ds.internal_state->interceptor)
952     ds.internal_state->interceptor->OnStart({});
953   ds.internal_state->trace_lambda_enabled = true;
954   ds.internal_state->data_source->OnStart(start_args);
955 }
956 
957 // Called by the service of one of the backends.
StopDataSource_AsyncBegin(TracingBackendId backend_id,DataSourceInstanceID instance_id)958 void TracingMuxerImpl::StopDataSource_AsyncBegin(
959     TracingBackendId backend_id,
960     DataSourceInstanceID instance_id) {
961   PERFETTO_DLOG("Stopping data source %" PRIu64, instance_id);
962   PERFETTO_DCHECK_THREAD(thread_checker_);
963 
964   auto ds = FindDataSource(backend_id, instance_id);
965   if (!ds) {
966     PERFETTO_ELOG("Could not find data source to stop");
967     return;
968   }
969 
970   StopArgsImpl stop_args{};
971   stop_args.internal_instance_index = ds.instance_idx;
972   stop_args.async_stop_closure = [this, backend_id, instance_id] {
973     // TracingMuxerImpl is long lived, capturing |this| is okay.
974     // The notification closure can be moved out of the StopArgs by the
975     // embedder to handle stop asynchronously. The embedder might then
976     // call the closure on a different thread than the current one, hence
977     // this nested PostTask().
978     task_runner_->PostTask([this, backend_id, instance_id] {
979       StopDataSource_AsyncEnd(backend_id, instance_id);
980     });
981   };
982 
983   {
984     std::lock_guard<std::recursive_mutex> guard(ds.internal_state->lock);
985     if (ds.internal_state->interceptor)
986       ds.internal_state->interceptor->OnStop({});
987     ds.internal_state->data_source->OnStop(stop_args);
988   }
989 
990   // If the embedder hasn't called StopArgs.HandleStopAsynchronously() run the
991   // async closure here. In theory we could avoid the PostTask and call
992   // straight into CompleteDataSourceAsyncStop(). We keep that to reduce
993   // divergencies between the deferred-stop vs non-deferred-stop code paths.
994   if (stop_args.async_stop_closure)
995     std::move(stop_args.async_stop_closure)();
996 }
997 
// Second half of a data source stop, run on the muxer thread when the closure
// created by StopDataSource_AsyncBegin() is invoked. In order, it:
//   1. clears the instance's bit in |valid_instances|,
//   2. disables the Trace() lambda and destroys the data source object under
//      the instance lock,
//   3. bumps TracingMuxer::generation_ so that stale thread-local trace writers
//      can later be cleaned up (see DestroyStoppedTraceWritersForCurrentThread()),
//   4. if the producer is connected, flushes batched commits and tells the
//      service that the data source stopped.
// A second invocation for the same instance finds nothing and only logs.
void TracingMuxerImpl::StopDataSource_AsyncEnd(
    TracingBackendId backend_id,
    DataSourceInstanceID instance_id) {
  PERFETTO_DLOG("Ending async stop of data source %" PRIu64, instance_id);
  PERFETTO_DCHECK_THREAD(thread_checker_);

  auto ds = FindDataSource(backend_id, instance_id);
  if (!ds) {
    PERFETTO_ELOG(
        "Async stop of data source %" PRIu64
        " failed. This might be due to calling the async_stop_closure twice.",
        instance_id);
    return;
  }

  // Unpublish the slot: the inverse of the fetch_or done when the instance
  // was set up. After this, FindDataSource() no longer returns it.
  // NOTE(review): `1u << ...` would make the unsigned mask intent explicit.
  const uint32_t mask = ~(1 << ds.instance_idx);
  ds.static_state->valid_instances.fetch_and(mask, std::memory_order_acq_rel);

  // Take the mutex to prevent that the data source is in the middle of
  // a Trace() execution where it called GetDataSourceLocked() while we
  // destroy it.
  {
    std::lock_guard<std::recursive_mutex> guard(ds.internal_state->lock);
    ds.internal_state->trace_lambda_enabled = false;
    ds.internal_state->data_source.reset();
  }

  // The other fields of internal_state are deliberately *not* cleared.
  // See races-related comments of DataSource::Trace().

  TracingMuxer::generation_++;

  // |backends_| is append-only, Backend instances are always valid.
  PERFETTO_CHECK(backend_id < backends_.size());
  ProducerImpl* producer = backends_[backend_id].producer.get();
  if (!producer)
    return;
  if (producer->connected_) {
    // Flush any commits that might have been batched by SharedMemoryArbiter.
    producer->service_->MaybeSharedMemoryArbiter()
        ->FlushPendingCommitDataRequests();
    producer->service_->NotifyDataSourceStopped(instance_id);
  }
  producer->SweepDeadServices();
}
1043 
ClearDataSourceIncrementalState(TracingBackendId backend_id,DataSourceInstanceID instance_id)1044 void TracingMuxerImpl::ClearDataSourceIncrementalState(
1045     TracingBackendId backend_id,
1046     DataSourceInstanceID instance_id) {
1047   PERFETTO_DCHECK_THREAD(thread_checker_);
1048   PERFETTO_DLOG("Clearing incremental state for data source %" PRIu64,
1049                 instance_id);
1050   auto ds = FindDataSource(backend_id, instance_id);
1051   if (!ds) {
1052     PERFETTO_ELOG("Could not find data source to clear incremental state for");
1053     return;
1054   }
1055   // Make DataSource::TraceContext::GetIncrementalState() eventually notice that
1056   // the incremental state should be cleared.
1057   ds.static_state->incremental_state_generation.fetch_add(
1058       1, std::memory_order_relaxed);
1059 }
1060 
// Test-only. Blocks the calling thread until every backend's producer has
// completed two rounds of Sync() round-trips with its service, then DCHECKs
// that all producers are connected. The work is posted to |task_runner_| and
// the caller waits on |cv|.
// NOTE(review): since it blocks waiting for tasks posted to the muxer task
// runner, this presumably must not be called from that runner's thread.
void TracingMuxerImpl::SyncProducersForTesting() {
  std::mutex mutex;
  std::condition_variable cv;

  // IPC-based producers don't report connection errors explicitly for each
  // command, but instead with an asynchronous callback
  // (ProducerImpl::OnDisconnected). This means that the sync command below
  // may have completed but failed to reach the service because of a
  // disconnection, but we can't tell until the disconnection message comes
  // through. To guard against this, we run two whole rounds of sync round-trips
  // before returning; the first one will detect any disconnected producers and
  // the second one will ensure any reconnections have completed and all data
  // sources are registered in the service again.
  for (size_t i = 0; i < 2; i++) {
    // Starts at max so the waiter below cannot see 0 before the posted task
    // has set the real count. It reaches 0 only after every backend's Sync()
    // callback has run, so the references captured below stay valid.
    size_t countdown = std::numeric_limits<size_t>::max();
    task_runner_->PostTask([this, &mutex, &cv, &countdown] {
      {
        std::unique_lock<std::mutex> countdown_lock(mutex);
        countdown = backends_.size();
      }
      for (auto& backend : backends_) {
        auto* producer = backend.producer.get();
        producer->service_->Sync([&mutex, &cv, &countdown] {
          std::unique_lock<std::mutex> countdown_lock(mutex);
          countdown--;
          cv.notify_one();
        });
      }
    });

    {
      std::unique_lock<std::mutex> countdown_lock(mutex);
      cv.wait(countdown_lock, [&countdown] { return !countdown; });
    }
  }

  // Check that all producers are indeed connected.
  bool done = false;
  bool all_producers_connected = true;
  task_runner_->PostTask([this, &mutex, &cv, &done, &all_producers_connected] {
    for (auto& backend : backends_)
      all_producers_connected &= backend.producer->connected_;
    std::unique_lock<std::mutex> lock(mutex);
    done = true;
    cv.notify_one();
  });

  {
    std::unique_lock<std::mutex> lock(mutex);
    cv.wait(lock, [&done] { return done; });
  }
  PERFETTO_DCHECK(all_producers_connected);
}
1114 
DestroyStoppedTraceWritersForCurrentThread()1115 void TracingMuxerImpl::DestroyStoppedTraceWritersForCurrentThread() {
1116   // Iterate across all possible data source types.
1117   auto cur_generation = generation_.load(std::memory_order_acquire);
1118   auto* root_tls = GetOrCreateTracingTLS();
1119 
1120   auto destroy_stopped_instances = [](DataSourceThreadLocalState& tls) {
1121     // |tls| has a vector of per-data-source-instance thread-local state.
1122     DataSourceStaticState* static_state = tls.static_state;
1123     if (!static_state)
1124       return;  // Slot not used.
1125 
1126     // Iterate across all possible instances for this data source.
1127     for (uint32_t inst = 0; inst < kMaxDataSourceInstances; inst++) {
1128       DataSourceInstanceThreadLocalState& ds_tls = tls.per_instance[inst];
1129       if (!ds_tls.trace_writer)
1130         continue;
1131 
1132       DataSourceState* ds_state = static_state->TryGet(inst);
1133       if (ds_state && ds_state->backend_id == ds_tls.backend_id &&
1134           ds_state->backend_connection_id == ds_tls.backend_connection_id &&
1135           ds_state->buffer_id == ds_tls.buffer_id &&
1136           ds_state->data_source_instance_id == ds_tls.data_source_instance_id) {
1137         continue;
1138       }
1139 
1140       // The DataSource instance has been destroyed or recycled.
1141       ds_tls.Reset();  // Will also destroy the |ds_tls.trace_writer|.
1142     }
1143   };
1144 
1145   for (size_t ds_idx = 0; ds_idx < kMaxDataSources; ds_idx++) {
1146     // |tls| has a vector of per-data-source-instance thread-local state.
1147     DataSourceThreadLocalState& tls = root_tls->data_sources_tls[ds_idx];
1148     destroy_stopped_instances(tls);
1149   }
1150   destroy_stopped_instances(root_tls->track_event_tls);
1151   root_tls->generation = cur_generation;
1152 }
1153 
// Called both when a new data source is registered and when a new backend
// connects. In both cases we want to make sure that every data source
// registration is reflected on every connected backend.
UpdateDataSourcesOnAllBackends()1157 void TracingMuxerImpl::UpdateDataSourcesOnAllBackends() {
1158   PERFETTO_DCHECK_THREAD(thread_checker_);
1159   for (RegisteredDataSource& rds : data_sources_) {
1160     for (RegisteredBackend& backend : backends_) {
1161       // We cannot call RegisterDataSource on the backend before it connects.
1162       if (!backend.producer->connected_)
1163         continue;
1164 
1165       PERFETTO_DCHECK(rds.static_state->index < kMaxDataSources);
1166       if (backend.producer->registered_data_sources_.test(
1167               rds.static_state->index))
1168         continue;
1169 
1170       rds.descriptor.set_will_notify_on_start(true);
1171       rds.descriptor.set_will_notify_on_stop(true);
1172       rds.descriptor.set_handles_incremental_state_clear(true);
1173       backend.producer->service_->RegisterDataSource(rds.descriptor);
1174       backend.producer->registered_data_sources_.set(rds.static_state->index);
1175     }
1176   }
1177 }
1178 
SetupTracingSession(TracingSessionGlobalID session_id,const std::shared_ptr<TraceConfig> & trace_config,base::ScopedFile trace_fd)1179 void TracingMuxerImpl::SetupTracingSession(
1180     TracingSessionGlobalID session_id,
1181     const std::shared_ptr<TraceConfig>& trace_config,
1182     base::ScopedFile trace_fd) {
1183   PERFETTO_DCHECK_THREAD(thread_checker_);
1184   PERFETTO_CHECK(!trace_fd || trace_config->write_into_file());
1185 
1186   auto* consumer = FindConsumer(session_id);
1187   if (!consumer)
1188     return;
1189 
1190   consumer->trace_config_ = trace_config;
1191   if (trace_fd)
1192     consumer->trace_fd_ = std::move(trace_fd);
1193 
1194   if (!consumer->connected_)
1195     return;
1196 
1197   // Only used in the deferred start mode.
1198   if (trace_config->deferred_start()) {
1199     consumer->service_->EnableTracing(*trace_config,
1200                                       std::move(consumer->trace_fd_));
1201   }
1202 }
1203 
StartTracingSession(TracingSessionGlobalID session_id)1204 void TracingMuxerImpl::StartTracingSession(TracingSessionGlobalID session_id) {
1205   PERFETTO_DCHECK_THREAD(thread_checker_);
1206 
1207   auto* consumer = FindConsumer(session_id);
1208 
1209   if (!consumer)
1210     return;
1211 
1212   if (!consumer->trace_config_) {
1213     PERFETTO_ELOG("Must call Setup(config) first");
1214     return;
1215   }
1216 
1217   if (!consumer->connected_) {
1218     consumer->start_pending_ = true;
1219     return;
1220   }
1221 
1222   consumer->start_pending_ = false;
1223   if (consumer->trace_config_->deferred_start()) {
1224     consumer->service_->StartTracing();
1225   } else {
1226     consumer->service_->EnableTracing(*consumer->trace_config_,
1227                                       std::move(consumer->trace_fd_));
1228   }
1229 
1230   // TODO implement support for the deferred-start + fast-triggering case.
1231 }
1232 
ChangeTracingSessionConfig(TracingSessionGlobalID session_id,const TraceConfig & trace_config)1233 void TracingMuxerImpl::ChangeTracingSessionConfig(
1234     TracingSessionGlobalID session_id,
1235     const TraceConfig& trace_config) {
1236   PERFETTO_DCHECK_THREAD(thread_checker_);
1237 
1238   auto* consumer = FindConsumer(session_id);
1239 
1240   if (!consumer)
1241     return;
1242 
1243   if (!consumer->trace_config_) {
1244     // Changing the config is only supported for started sessions.
1245     PERFETTO_ELOG("Must call Setup(config) and Start() first");
1246     return;
1247   }
1248 
1249   consumer->trace_config_ = std::make_shared<TraceConfig>(trace_config);
1250   if (consumer->connected_)
1251     consumer->service_->ChangeTraceConfig(trace_config);
1252 }
1253 
FlushTracingSession(TracingSessionGlobalID session_id,uint32_t timeout_ms,std::function<void (bool)> callback)1254 void TracingMuxerImpl::FlushTracingSession(TracingSessionGlobalID session_id,
1255                                            uint32_t timeout_ms,
1256                                            std::function<void(bool)> callback) {
1257   PERFETTO_DCHECK_THREAD(thread_checker_);
1258   auto* consumer = FindConsumer(session_id);
1259   if (!consumer || consumer->start_pending_ || consumer->stop_pending_ ||
1260       !consumer->trace_config_) {
1261     PERFETTO_ELOG("Flush() can be called only after Start() and before Stop()");
1262     std::move(callback)(false);
1263     return;
1264   }
1265 
1266   consumer->service_->Flush(timeout_ms, std::move(callback));
1267 }
1268 
StopTracingSession(TracingSessionGlobalID session_id)1269 void TracingMuxerImpl::StopTracingSession(TracingSessionGlobalID session_id) {
1270   PERFETTO_DCHECK_THREAD(thread_checker_);
1271   auto* consumer = FindConsumer(session_id);
1272   if (!consumer)
1273     return;
1274 
1275   if (consumer->start_pending_) {
1276     // If the session hasn't started yet, wait until it does before stopping.
1277     consumer->stop_pending_ = true;
1278     return;
1279   }
1280 
1281   consumer->stop_pending_ = false;
1282   if (consumer->stopped_) {
1283     // If the session was already stopped (e.g., it failed to start), don't try
1284     // stopping again.
1285     consumer->NotifyStopComplete();
1286   } else if (!consumer->trace_config_) {
1287     PERFETTO_ELOG("Must call Setup(config) and Start() first");
1288     return;
1289   } else {
1290     consumer->service_->DisableTracing();
1291   }
1292 
1293   consumer->trace_config_.reset();
1294 }
1295 
DestroyTracingSession(TracingSessionGlobalID session_id)1296 void TracingMuxerImpl::DestroyTracingSession(
1297     TracingSessionGlobalID session_id) {
1298   PERFETTO_DCHECK_THREAD(thread_checker_);
1299   for (RegisteredBackend& backend : backends_) {
1300     // We need to find the consumer (if any) and call Disconnect as we destroy
1301     // the tracing session. We can't call Disconnect() inside this for loop
1302     // because in the in-process case this will end up to a synchronous call to
1303     // OnConsumerDisconnect which will invalidate all the iterators to
1304     // |backend.consumers|.
1305     ConsumerImpl* consumer = nullptr;
1306     for (auto& con : backend.consumers) {
1307       if (con->session_id_ == session_id) {
1308         consumer = con.get();
1309         break;
1310       }
1311     }
1312     if (consumer) {
1313       // We broke out of the loop above on the assumption that each backend will
1314       // only have a single consumer per session. This DCHECK ensures that
1315       // this is the case.
1316       PERFETTO_DCHECK(
1317           std::count_if(backend.consumers.begin(), backend.consumers.end(),
1318                         [session_id](const std::unique_ptr<ConsumerImpl>& con) {
1319                           return con->session_id_ == session_id;
1320                         }) == 1u);
1321       consumer->Disconnect();
1322     }
1323   }
1324 }
1325 
ReadTracingSessionData(TracingSessionGlobalID session_id,std::function<void (TracingSession::ReadTraceCallbackArgs)> callback)1326 void TracingMuxerImpl::ReadTracingSessionData(
1327     TracingSessionGlobalID session_id,
1328     std::function<void(TracingSession::ReadTraceCallbackArgs)> callback) {
1329   PERFETTO_DCHECK_THREAD(thread_checker_);
1330   auto* consumer = FindConsumer(session_id);
1331   if (!consumer) {
1332     // TODO(skyostil): Signal an error to the user.
1333     TracingSession::ReadTraceCallbackArgs callback_arg{};
1334     callback(callback_arg);
1335     return;
1336   }
1337   PERFETTO_DCHECK(!consumer->read_trace_callback_);
1338   consumer->read_trace_callback_ = std::move(callback);
1339   consumer->service_->ReadBuffers();
1340 }
1341 
GetTraceStats(TracingSessionGlobalID session_id,TracingSession::GetTraceStatsCallback callback)1342 void TracingMuxerImpl::GetTraceStats(
1343     TracingSessionGlobalID session_id,
1344     TracingSession::GetTraceStatsCallback callback) {
1345   PERFETTO_DCHECK_THREAD(thread_checker_);
1346   auto* consumer = FindConsumer(session_id);
1347   if (!consumer) {
1348     TracingSession::GetTraceStatsCallbackArgs callback_arg{};
1349     callback_arg.success = false;
1350     callback(std::move(callback_arg));
1351     return;
1352   }
1353   PERFETTO_DCHECK(!consumer->get_trace_stats_callback_);
1354   consumer->get_trace_stats_callback_ = std::move(callback);
1355   if (!consumer->connected_) {
1356     consumer->get_trace_stats_pending_ = true;
1357     return;
1358   }
1359   consumer->get_trace_stats_pending_ = false;
1360   consumer->service_->GetTraceStats();
1361 }
1362 
QueryServiceState(TracingSessionGlobalID session_id,TracingSession::QueryServiceStateCallback callback)1363 void TracingMuxerImpl::QueryServiceState(
1364     TracingSessionGlobalID session_id,
1365     TracingSession::QueryServiceStateCallback callback) {
1366   PERFETTO_DCHECK_THREAD(thread_checker_);
1367   auto* consumer = FindConsumer(session_id);
1368   if (!consumer) {
1369     TracingSession::QueryServiceStateCallbackArgs callback_arg{};
1370     callback_arg.success = false;
1371     callback(std::move(callback_arg));
1372     return;
1373   }
1374   PERFETTO_DCHECK(!consumer->query_service_state_callback_);
1375   if (!consumer->connected_) {
1376     consumer->query_service_state_callback_ = std::move(callback);
1377     return;
1378   }
1379   auto callback_wrapper = [callback](bool success,
1380                                      protos::gen::TracingServiceState state) {
1381     TracingSession::QueryServiceStateCallbackArgs callback_arg{};
1382     callback_arg.success = success;
1383     callback_arg.service_state_data = state.SerializeAsArray();
1384     callback(std::move(callback_arg));
1385   };
1386   consumer->service_->QueryServiceState(std::move(callback_wrapper));
1387 }
1388 
SetBatchCommitsDurationForTesting(uint32_t batch_commits_duration_ms,BackendType backend_type)1389 void TracingMuxerImpl::SetBatchCommitsDurationForTesting(
1390     uint32_t batch_commits_duration_ms,
1391     BackendType backend_type) {
1392   for (RegisteredBackend& backend : backends_) {
1393     if (backend.producer && backend.producer->connected_ &&
1394         backend.type == backend_type) {
1395       backend.producer->service_->MaybeSharedMemoryArbiter()
1396           ->SetBatchCommitsDuration(batch_commits_duration_ms);
1397     }
1398   }
1399 }
1400 
EnableDirectSMBPatchingForTesting(BackendType backend_type)1401 bool TracingMuxerImpl::EnableDirectSMBPatchingForTesting(
1402     BackendType backend_type) {
1403   for (RegisteredBackend& backend : backends_) {
1404     if (backend.producer && backend.producer->connected_ &&
1405         backend.type == backend_type &&
1406         !backend.producer->service_->MaybeSharedMemoryArbiter()
1407              ->EnableDirectSMBPatching()) {
1408       return false;
1409     }
1410   }
1411   return true;
1412 }
1413 
FindConsumer(TracingSessionGlobalID session_id)1414 TracingMuxerImpl::ConsumerImpl* TracingMuxerImpl::FindConsumer(
1415     TracingSessionGlobalID session_id) {
1416   PERFETTO_DCHECK_THREAD(thread_checker_);
1417   for (RegisteredBackend& backend : backends_) {
1418     for (auto& consumer : backend.consumers) {
1419       if (consumer->session_id_ == session_id) {
1420         return consumer.get();
1421       }
1422     }
1423   }
1424   return nullptr;
1425 }
1426 
InitializeConsumer(TracingSessionGlobalID session_id)1427 void TracingMuxerImpl::InitializeConsumer(TracingSessionGlobalID session_id) {
1428   PERFETTO_DCHECK_THREAD(thread_checker_);
1429 
1430   auto* consumer = FindConsumer(session_id);
1431   if (!consumer)
1432     return;
1433 
1434   TracingBackendId backend_id = consumer->backend_id_;
1435   // |backends_| is append-only, Backend instances are always valid.
1436   PERFETTO_CHECK(backend_id < backends_.size());
1437   RegisteredBackend& backend = backends_[backend_id];
1438 
1439   TracingBackend::ConnectConsumerArgs conn_args;
1440   conn_args.consumer = consumer;
1441   conn_args.task_runner = task_runner_.get();
1442   consumer->Initialize(backend.backend->ConnectConsumer(conn_args));
1443 }
1444 
OnConsumerDisconnected(ConsumerImpl * consumer)1445 void TracingMuxerImpl::OnConsumerDisconnected(ConsumerImpl* consumer) {
1446   PERFETTO_DCHECK_THREAD(thread_checker_);
1447   for (RegisteredBackend& backend : backends_) {
1448     auto pred = [consumer](const std::unique_ptr<ConsumerImpl>& con) {
1449       return con.get() == consumer;
1450     };
1451     backend.consumers.erase(std::remove_if(backend.consumers.begin(),
1452                                            backend.consumers.end(), pred),
1453                             backend.consumers.end());
1454   }
1455 }
1456 
SetMaxProducerReconnectionsForTesting(uint32_t count)1457 void TracingMuxerImpl::SetMaxProducerReconnectionsForTesting(uint32_t count) {
1458   max_producer_reconnections_.store(count);
1459 }
1460 
OnProducerDisconnected(ProducerImpl * producer)1461 void TracingMuxerImpl::OnProducerDisconnected(ProducerImpl* producer) {
1462   PERFETTO_DCHECK_THREAD(thread_checker_);
1463   for (RegisteredBackend& backend : backends_) {
1464     if (backend.producer.get() != producer)
1465       continue;
1466     // Try reconnecting the disconnected producer. If the connection succeeds,
1467     // all the data sources will be automatically re-registered.
1468     if (producer->connection_id_ > max_producer_reconnections_.load()) {
1469       // Avoid reconnecting a failing producer too many times. Instead we just
1470       // leak the producer instead of trying to avoid further complicating
1471       // cross-thread trace writer creation.
1472       PERFETTO_ELOG("Producer disconnected too many times; not reconnecting");
1473       continue;
1474     }
1475     backend.producer->Initialize(
1476         backend.backend->ConnectProducer(backend.producer_conn_args));
1477   }
1478 
1479   // Increment the generation counter to atomically ensure that:
1480   // 1. Old trace writers from the severed connection eventually get cleaned up
1481   //    by DestroyStoppedTraceWritersForCurrentThread().
1482   // 2. No new trace writers can be created for the SharedMemoryArbiter from the
1483   //    old connection.
1484   TracingMuxer::generation_++;
1485 }
1486 
FindDataSource(TracingBackendId backend_id,DataSourceInstanceID instance_id)1487 TracingMuxerImpl::FindDataSourceRes TracingMuxerImpl::FindDataSource(
1488     TracingBackendId backend_id,
1489     DataSourceInstanceID instance_id) {
1490   PERFETTO_DCHECK_THREAD(thread_checker_);
1491   for (const auto& rds : data_sources_) {
1492     DataSourceStaticState* static_state = rds.static_state;
1493     for (uint32_t i = 0; i < kMaxDataSourceInstances; i++) {
1494       auto* internal_state = static_state->TryGet(i);
1495       if (internal_state && internal_state->backend_id == backend_id &&
1496           internal_state->data_source_instance_id == instance_id) {
1497         return FindDataSourceRes(static_state, internal_state, i);
1498       }
1499     }
1500   }
1501   return FindDataSourceRes();
1502 }
1503 
1504 // Can be called from any thread.
CreateTraceWriter(DataSourceStaticState * static_state,uint32_t data_source_instance_index,DataSourceState * data_source,BufferExhaustedPolicy buffer_exhausted_policy)1505 std::unique_ptr<TraceWriterBase> TracingMuxerImpl::CreateTraceWriter(
1506     DataSourceStaticState* static_state,
1507     uint32_t data_source_instance_index,
1508     DataSourceState* data_source,
1509     BufferExhaustedPolicy buffer_exhausted_policy) {
1510   if (PERFETTO_UNLIKELY(data_source->interceptor_id)) {
1511     // If the session is being intercepted, return a heap-backed trace writer
1512     // instead. This is safe because all the data given to the interceptor is
1513     // either thread-local (|instance_index|), statically allocated
1514     // (|static_state|) or constant after initialization (|interceptor|). Access
1515     // to the interceptor instance itself through |data_source| is protected by
1516     // a statically allocated lock (similarly to the data source instance).
1517     auto& interceptor = interceptors_[data_source->interceptor_id - 1];
1518     return std::unique_ptr<TraceWriterBase>(new InterceptorTraceWriter(
1519         interceptor.tls_factory(static_state, data_source_instance_index),
1520         interceptor.packet_callback, static_state, data_source_instance_index));
1521   }
1522   ProducerImpl* producer = backends_[data_source->backend_id].producer.get();
1523   // Atomically load the current service endpoint. We keep the pointer as a
1524   // shared pointer on the stack to guard against it from being concurrently
1525   // modified on the thread by ProducerImpl::Initialize() swapping in a
1526   // reconnected service on the muxer task runner thread.
1527   //
1528   // The endpoint may also be concurrently modified by SweepDeadServices()
1529   // clearing out old disconnected services. We guard against that by
1530   // SharedMemoryArbiter keeping track of any outstanding trace writers. After
1531   // shutdown has started, the trace writer created below will be a null one
1532   // which will drop any written data. See SharedMemoryArbiter::TryShutdown().
1533   //
1534   // We use an atomic pointer instead of holding a lock because
1535   // CreateTraceWriter posts tasks under the hood.
1536   std::shared_ptr<ProducerEndpoint> service =
1537       std::atomic_load(&producer->service_);
1538   return service->CreateTraceWriter(data_source->buffer_id,
1539                                     buffer_exhausted_policy);
1540 }
1541 
// This is called via the public API Tracing::NewTrace().
// Can be called from any thread.
//
// Allocates a new session id and returns a TracingSessionImpl for it right
// away. The matching ConsumerImpl is created asynchronously on the muxer task
// runner, on the first backend in |backends_| compatible with
// |requested_backend_type|. It is then connected either immediately (no
// TracingPolicy installed, or no real backend available) or once the
// embedder's TracingPolicy has answered.
//
// |requested_backend_type| must have at most one bit set. 0 skips no backend,
// so it selects the first registered one.
std::unique_ptr<TracingSession> TracingMuxerImpl::CreateTracingSession(
    BackendType requested_backend_type) {
  // NOTE(review): this runs on arbitrary threads, so next_tracing_session_id_
  // is presumably atomic — confirm in the header.
  TracingSessionGlobalID session_id = ++next_tracing_session_id_;

  // |requested_backend_type| can only specify one backend, not an OR-ed mask:
  // x & (x - 1) clears the lowest set bit, so it is zero iff at most one bit
  // is set.
  PERFETTO_CHECK((requested_backend_type & (requested_backend_type - 1)) == 0);

  // Capturing |this| is fine because the TracingMuxer is a leaky singleton.
  task_runner_->PostTask([this, requested_backend_type, session_id] {
    for (RegisteredBackend& backend : backends_) {
      // Skip backends of a different, concrete type. An unspecified request
      // (0) or the untyped fallback backend (type 0) is never skipped.
      if (requested_backend_type && backend.type &&
          backend.type != requested_backend_type) {
        continue;
      }

      TracingBackendId backend_id = backend.id;

      // Create the consumer now, even if we have to ask the embedder below, so
      // that any other tasks executing after this one can find the consumer and
      // change its pending attributes.
      backend.consumers.emplace_back(
          new ConsumerImpl(this, backend.type, backend.id, session_id));

      // The last registered backend in |backends_| is the unsupported backend
      // without a valid type.
      if (!backend.type) {
        PERFETTO_ELOG(
            "No tracing backend ready for type=%d, consumer will disconnect",
            requested_backend_type);
        InitializeConsumer(session_id);
        return;
      }

      // Check if the embedder wants to be asked for permission before
      // connecting the consumer.
      if (!policy_) {
        InitializeConsumer(session_id);
        return;
      }

      TracingPolicy::ShouldAllowConsumerSessionArgs args;
      args.backend_type = backend.type;
      // The embedder's verdict is re-posted to the muxer task runner before
      // acting on it. NOTE(review): presumably because the policy may invoke
      // the callback from another thread — confirm against TracingPolicy docs.
      // The consumer is looked up again by |session_id| because it may have
      // been destroyed in the meantime.
      args.result_callback = [this, backend_id, session_id](bool allow) {
        task_runner_->PostTask([this, backend_id, session_id, allow] {
          if (allow) {
            InitializeConsumer(session_id);
            return;
          }

          PERFETTO_ELOG(
              "Consumer session for backend type type=%d forbidden, "
              "consumer will disconnect",
              backends_[backend_id].type);

          auto* consumer = FindConsumer(session_id);
          if (!consumer)
            return;

          consumer->OnDisconnect();
        });
      };
      policy_->ShouldAllowConsumerSession(args);
      return;
    }
    // Unreachable: the untyped fallback backend matches every request.
    PERFETTO_DFATAL("Not reached");
  });

  return std::unique_ptr<TracingSession>(
      new TracingSessionImpl(this, session_id, requested_backend_type));
}
1614 
InitializeInstance(const TracingInitArgs & args)1615 void TracingMuxerImpl::InitializeInstance(const TracingInitArgs& args) {
1616   if (instance_ != TracingMuxerFake::Get())
1617     PERFETTO_FATAL("Tracing already initialized");
1618   new TracingMuxerImpl(args);
1619 }
1620 
// Out-of-line defaulted destructor of the TracingMuxer base class.
TracingMuxer::~TracingMuxer() = default;

// The public API's internal::BufferId and tracing/core's BufferID must remain
// the same type. NOTE(review): presumably so buffer ids can be handed between
// the two layers unchanged — confirm.
static_assert(std::is_same<internal::BufferId, BufferID>::value,
              "public's BufferId and tracing/core's BufferID diverged");
1625 
1626 }  // namespace internal
1627 }  // namespace perfetto
1628