1 /*
2  * Copyright (C) 2017 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #ifndef SRC_TRACING_CORE_TRACING_SERVICE_IMPL_H_
18 #define SRC_TRACING_CORE_TRACING_SERVICE_IMPL_H_
19 
20 #include <functional>
21 #include <map>
22 #include <memory>
23 #include <mutex>
24 #include <set>
25 #include <vector>
26 
27 #include "perfetto/base/gtest_prod_util.h"
28 #include "perfetto/base/logging.h"
29 #include "perfetto/base/optional.h"
30 #include "perfetto/base/time.h"
31 #include "perfetto/base/weak_ptr.h"
32 #include "perfetto/tracing/core/basic_types.h"
33 #include "perfetto/tracing/core/commit_data_request.h"
34 #include "perfetto/tracing/core/data_source_descriptor.h"
35 #include "perfetto/tracing/core/observable_events.h"
36 #include "perfetto/tracing/core/shared_memory_abi.h"
37 #include "perfetto/tracing/core/trace_config.h"
38 #include "perfetto/tracing/core/trace_stats.h"
39 #include "perfetto/tracing/core/tracing_service.h"
40 #include "src/tracing/core/id_allocator.h"
41 
42 namespace perfetto {
43 
44 namespace base {
45 class TaskRunner;
46 }  // namespace base
47 
48 class Consumer;
49 class DataSourceConfig;
50 class Producer;
51 class SharedMemory;
52 class SharedMemoryArbiterImpl;
53 class TraceBuffer;
54 class TraceConfig;
55 class TracePacket;
56 
57 // The tracing service business logic.
58 class TracingServiceImpl : public TracingService {
59  private:
60   struct DataSourceInstance;
61 
62  public:
63   static constexpr size_t kDefaultShmSize = 256 * 1024ul;
64   static constexpr size_t kMaxShmSize = 32 * 1024 * 1024ul;
65   static constexpr uint32_t kDataSourceStopTimeoutMs = 5000;
66   static constexpr uint8_t kSyncMarker[] = {0x82, 0x47, 0x7a, 0x76, 0xb2, 0x8d,
67                                             0x42, 0xba, 0x81, 0xdc, 0x33, 0x32,
68                                             0x6d, 0x57, 0xa0, 0x79};
69 
70   // The implementation behind the service endpoint exposed to each producer.
71   class ProducerEndpointImpl : public TracingService::ProducerEndpoint {
72    public:
73     ProducerEndpointImpl(ProducerID,
74                          uid_t uid,
75                          TracingServiceImpl*,
76                          base::TaskRunner*,
77                          Producer*,
78                          const std::string& producer_name,
79                          bool in_process,
80                          bool smb_scraping_enabled);
81     ~ProducerEndpointImpl() override;
82 
83     // TracingService::ProducerEndpoint implementation.
84     void RegisterDataSource(const DataSourceDescriptor&) override;
85     void UnregisterDataSource(const std::string& name) override;
86     void RegisterTraceWriter(uint32_t writer_id,
87                              uint32_t target_buffer) override;
88     void UnregisterTraceWriter(uint32_t writer_id) override;
89     void CommitData(const CommitDataRequest&, CommitDataCallback) override;
90     void SetSharedMemory(std::unique_ptr<SharedMemory>);
91     std::unique_ptr<TraceWriter> CreateTraceWriter(BufferID) override;
92     SharedMemoryArbiter* GetInProcessShmemArbiter() override;
93     void NotifyFlushComplete(FlushRequestID) override;
94     void NotifyDataSourceStarted(DataSourceInstanceID) override;
95     void NotifyDataSourceStopped(DataSourceInstanceID) override;
96     SharedMemory* shared_memory() const override;
97     size_t shared_buffer_page_size_kb() const override;
98     void ActivateTriggers(const std::vector<std::string>&) override;
99 
100     void OnTracingSetup();
101     void SetupDataSource(DataSourceInstanceID, const DataSourceConfig&);
102     void StartDataSource(DataSourceInstanceID, const DataSourceConfig&);
103     void StopDataSource(DataSourceInstanceID);
104     void Flush(FlushRequestID, const std::vector<DataSourceInstanceID>&);
105     void OnFreeBuffers(const std::vector<BufferID>& target_buffers);
106     void ClearIncrementalState(const std::vector<DataSourceInstanceID>&);
107 
is_allowed_target_buffer(BufferID buffer_id)108     bool is_allowed_target_buffer(BufferID buffer_id) const {
109       return allowed_target_buffers_.count(buffer_id);
110     }
111 
buffer_id_for_writer(WriterID writer_id)112     base::Optional<BufferID> buffer_id_for_writer(WriterID writer_id) const {
113       const auto it = writers_.find(writer_id);
114       if (it != writers_.end())
115         return it->second;
116       return base::nullopt;
117     }
118 
119    private:
120     friend class TracingServiceImpl;
121     friend class TracingServiceImplTest;
122     friend class TracingIntegrationTest;
123     ProducerEndpointImpl(const ProducerEndpointImpl&) = delete;
124     ProducerEndpointImpl& operator=(const ProducerEndpointImpl&) = delete;
125 
126     ProducerID const id_;
127     const uid_t uid_;
128     TracingServiceImpl* const service_;
129     base::TaskRunner* const task_runner_;
130     Producer* producer_;
131     std::unique_ptr<SharedMemory> shared_memory_;
132     size_t shared_buffer_page_size_kb_ = 0;
133     SharedMemoryABI shmem_abi_;
134     size_t shmem_size_hint_bytes_ = 0;
135     const std::string name_;
136     bool in_process_;
137     bool smb_scraping_enabled_;
138 
139     // Set of the global target_buffer IDs that the producer is configured to
140     // write into in any active tracing session.
141     std::set<BufferID> allowed_target_buffers_;
142 
143     // Maps registered TraceWriter IDs to their target buffers as registered by
144     // the producer. Note that producers aren't required to register their
145     // writers, so we may see commits of chunks with WriterIDs that aren't
146     // contained in this map. However, if a producer does register a writer, the
147     // service will prevent the writer from writing into any other buffer than
148     // the one associated with it here. The BufferIDs stored in this map are
149     // untrusted, so need to be verified against |allowed_target_buffers_|
150     // before use.
151     std::map<WriterID, BufferID> writers_;
152 
153     // This is used only in in-process configurations.
154     // SharedMemoryArbiterImpl methods themselves are thread-safe.
155     std::unique_ptr<SharedMemoryArbiterImpl> inproc_shmem_arbiter_;
156 
157     PERFETTO_THREAD_CHECKER(thread_checker_)
158     base::WeakPtrFactory<ProducerEndpointImpl> weak_ptr_factory_;  // Keep last.
159   };
160 
161   // The implementation behind the service endpoint exposed to each consumer.
162   class ConsumerEndpointImpl : public TracingService::ConsumerEndpoint {
163    public:
164     ConsumerEndpointImpl(TracingServiceImpl*,
165                          base::TaskRunner*,
166                          Consumer*,
167                          uid_t uid);
168     ~ConsumerEndpointImpl() override;
169 
170     void NotifyOnTracingDisabled();
171     base::WeakPtr<ConsumerEndpointImpl> GetWeakPtr();
172 
173     // TracingService::ConsumerEndpoint implementation.
174     void EnableTracing(const TraceConfig&, base::ScopedFile) override;
175     void ChangeTraceConfig(const TraceConfig& cfg) override;
176     void StartTracing() override;
177     void DisableTracing() override;
178     void ReadBuffers() override;
179     void FreeBuffers() override;
180     void Flush(uint32_t timeout_ms, FlushCallback) override;
181     void Detach(const std::string& key) override;
182     void Attach(const std::string& key) override;
183     void GetTraceStats() override;
184     void ObserveEvents(uint32_t enabled_event_types) override;
185 
186     // If |observe_data_source_instances == true|, will queue a task to notify
187     // the consumer about the state change.
188     void OnDataSourceInstanceStateChange(const ProducerEndpointImpl&,
189                                          const DataSourceInstance&);
190 
191    private:
192     friend class TracingServiceImpl;
193     ConsumerEndpointImpl(const ConsumerEndpointImpl&) = delete;
194     ConsumerEndpointImpl& operator=(const ConsumerEndpointImpl&) = delete;
195 
196     // Returns a pointer to an ObservableEvents object that the caller can fill
197     // and schedules a task to send the ObservableEvents to the consumer.
198     ObservableEvents* AddObservableEvents();
199 
200     base::TaskRunner* const task_runner_;
201     TracingServiceImpl* const service_;
202     Consumer* const consumer_;
203     uid_t const uid_;
204     TracingSessionID tracing_session_id_ = 0;
205 
206     // Whether the consumer is interested in DataSourceInstance state change
207     // events.
208     uint32_t enabled_observable_event_types_ = ObservableEventType::kNone;
209     // ObservableEvents that will be sent to the consumer. If set, a task to
210     // flush the events to the consumer has been queued.
211     std::unique_ptr<ObservableEvents> observable_events_;
212 
213     PERFETTO_THREAD_CHECKER(thread_checker_)
214     base::WeakPtrFactory<ConsumerEndpointImpl> weak_ptr_factory_;  // Keep last.
215   };
216 
217   explicit TracingServiceImpl(std::unique_ptr<SharedMemory::Factory>,
218                               base::TaskRunner*);
219   ~TracingServiceImpl() override;
220 
221   // Called by ProducerEndpointImpl.
222   void DisconnectProducer(ProducerID);
223   void RegisterDataSource(ProducerID, const DataSourceDescriptor&);
224   void UnregisterDataSource(ProducerID, const std::string& name);
225   void CopyProducerPageIntoLogBuffer(ProducerID,
226                                      uid_t,
227                                      WriterID,
228                                      ChunkID,
229                                      BufferID,
230                                      uint16_t num_fragments,
231                                      uint8_t chunk_flags,
232                                      bool chunk_complete,
233                                      const uint8_t* src,
234                                      size_t size);
235   void ApplyChunkPatches(ProducerID,
236                          const std::vector<CommitDataRequest::ChunkToPatch>&);
237   void NotifyFlushDoneForProducer(ProducerID, FlushRequestID);
238   void NotifyDataSourceStarted(ProducerID, const DataSourceInstanceID);
239   void NotifyDataSourceStopped(ProducerID, const DataSourceInstanceID);
240   void ActivateTriggers(ProducerID, const std::vector<std::string>& triggers);
241 
242   // Called by ConsumerEndpointImpl.
243   bool DetachConsumer(ConsumerEndpointImpl*, const std::string& key);
244   bool AttachConsumer(ConsumerEndpointImpl*, const std::string& key);
245   void DisconnectConsumer(ConsumerEndpointImpl*);
246   bool EnableTracing(ConsumerEndpointImpl*,
247                      const TraceConfig&,
248                      base::ScopedFile);
249   void ChangeTraceConfig(ConsumerEndpointImpl*, const TraceConfig&);
250 
251   bool StartTracing(TracingSessionID);
252   void DisableTracing(TracingSessionID, bool disable_immediately = false);
253   void Flush(TracingSessionID tsid,
254              uint32_t timeout_ms,
255              ConsumerEndpoint::FlushCallback);
256   void FlushAndDisableTracing(TracingSessionID);
257   void ReadBuffers(TracingSessionID, ConsumerEndpointImpl*);
258   void FreeBuffers(TracingSessionID);
259 
260   // Service implementation.
261   std::unique_ptr<TracingService::ProducerEndpoint> ConnectProducer(
262       Producer*,
263       uid_t uid,
264       const std::string& producer_name,
265       size_t shared_memory_size_hint_bytes = 0,
266       bool in_process = false,
267       ProducerSMBScrapingMode smb_scraping_mode =
268           ProducerSMBScrapingMode::kDefault) override;
269 
270   std::unique_ptr<TracingService::ConsumerEndpoint> ConnectConsumer(
271       Consumer*,
272       uid_t) override;
273 
274   // Set whether SMB scraping should be enabled by default or not. Producers can
275   // override this setting for their own SMBs.
SetSMBScrapingEnabled(bool enabled)276   void SetSMBScrapingEnabled(bool enabled) override {
277     smb_scraping_enabled_ = enabled;
278   }
279 
280   // Exposed mainly for testing.
num_producers()281   size_t num_producers() const { return producers_.size(); }
282   ProducerEndpointImpl* GetProducer(ProducerID) const;
283 
284   uint32_t override_data_source_test_timeout_ms_for_testing = 0;
285 
286  private:
287   friend class TracingServiceImplTest;
288   friend class TracingIntegrationTest;
289 
// A data source descriptor together with a producer ID. Instances are the
// mapped values of the |data_sources_| multimap, which is keyed by name.
struct RegisteredDataSource {
  ProducerID producer_id;           // Producer this registration belongs to.
  DataSourceDescriptor descriptor;  // Descriptor of the data source.
};
294 
295   // Represents an active data source for a tracing session.
296   struct DataSourceInstance {
DataSourceInstanceDataSourceInstance297     DataSourceInstance(DataSourceInstanceID id,
298                        const DataSourceConfig& cfg,
299                        const std::string& ds_name,
300                        bool notify_on_start,
301                        bool notify_on_stop,
302                        bool handles_incremental_state_invalidation)
303         : instance_id(id),
304           config(cfg),
305           data_source_name(ds_name),
306           will_notify_on_start(notify_on_start),
307           will_notify_on_stop(notify_on_stop),
308           handles_incremental_state_clear(
309               handles_incremental_state_invalidation) {}
310     DataSourceInstance(const DataSourceInstance&) = delete;
311     DataSourceInstance& operator=(const DataSourceInstance&) = delete;
312 
313     DataSourceInstanceID instance_id;
314     DataSourceConfig config;
315     std::string data_source_name;
316     bool will_notify_on_start;
317     bool will_notify_on_stop;
318     bool handles_incremental_state_clear;
319 
320     enum DataSourceInstanceState {
321       CONFIGURED,
322       STARTING,
323       STARTED,
324       STOPPING,
325       STOPPED
326     };
327     DataSourceInstanceState state = CONFIGURED;
328   };
329 
330   struct PendingFlush {
331     std::set<ProducerID> producers;
332     ConsumerEndpoint::FlushCallback callback;
PendingFlushPendingFlush333     explicit PendingFlush(decltype(callback) cb) : callback(std::move(cb)) {}
334   };
335 
336   // Holds the state of a tracing session. A tracing session is uniquely bound
337   // a specific Consumer. Each Consumer can own one or more sessions.
338   struct TracingSession {
339     enum State {
340       DISABLED = 0,
341       CONFIGURED,
342       STARTED,
343       DISABLING_WAITING_STOP_ACKS
344     };
345 
346     TracingSession(TracingSessionID, ConsumerEndpointImpl*, const TraceConfig&);
347 
num_buffersTracingSession348     size_t num_buffers() const { return buffers_index.size(); }
349 
delay_to_next_write_period_msTracingSession350     uint32_t delay_to_next_write_period_ms() const {
351       PERFETTO_DCHECK(write_period_ms > 0);
352       return write_period_ms -
353              (base::GetWallTimeMs().count() % write_period_ms);
354     }
355 
flush_timeout_msTracingSession356     uint32_t flush_timeout_ms() {
357       uint32_t timeout_ms = config.flush_timeout_ms();
358       return timeout_ms ? timeout_ms : kDefaultFlushTimeoutMs;
359     }
360 
GetPacketSequenceIDTracingSession361     PacketSequenceID GetPacketSequenceID(ProducerID producer_id,
362                                          WriterID writer_id) {
363       auto key = std::make_pair(producer_id, writer_id);
364       auto it = packet_sequence_ids.find(key);
365       if (it != packet_sequence_ids.end())
366         return it->second;
367       // We shouldn't run out of sequence IDs (producer ID is 16 bit, writer IDs
368       // are limited to 1024).
369       static_assert(kMaxPacketSequenceID > kMaxProducerID * kMaxWriterID,
370                     "PacketSequenceID value space doesn't cover service "
371                     "sequence ID and all producer/writer ID combinations!");
372       PERFETTO_DCHECK(last_packet_sequence_id < kMaxPacketSequenceID);
373       PacketSequenceID sequence_id = ++last_packet_sequence_id;
374       packet_sequence_ids[key] = sequence_id;
375       return sequence_id;
376     }
377 
GetDataSourceInstanceTracingSession378     DataSourceInstance* GetDataSourceInstance(
379         ProducerID producer_id,
380         DataSourceInstanceID instance_id) {
381       for (auto& inst_kv : data_source_instances) {
382         if (inst_kv.first != producer_id ||
383             inst_kv.second.instance_id != instance_id) {
384           continue;
385         }
386         return &inst_kv.second;
387       }
388       return nullptr;
389     }
390 
AllDataSourceInstancesStoppedTracingSession391     bool AllDataSourceInstancesStopped() {
392       for (const auto& inst_kv : data_source_instances) {
393         if (inst_kv.second.state != DataSourceInstance::STOPPED)
394           return false;
395       }
396       return true;
397     }
398 
399     const TracingSessionID id;
400 
401     // The consumer that started the session.
402     // Can be nullptr if the consumer detached from the session.
403     ConsumerEndpointImpl* consumer_maybe_null;
404 
405     // Unix uid of the consumer. This is valid even after the consumer detaches
406     // and does not change for the entire duration of the session. It is used to
407     // prevent that a consumer re-attaches to a session from a different uid.
408     uid_t const consumer_uid;
409 
410     // The list of triggers this session received while alive and the time they
411     // were received at. This is used to insert 'fake' packets back to the
412     // consumer so they can tell when some event happened. The order matches the
413     // order they were received.
414     struct TriggerInfo {
415       uint64_t boot_time_ns;
416       std::string trigger_name;
417       std::string producer_name;
418       uid_t producer_uid;
419     };
420     std::vector<TriggerInfo> received_triggers;
421 
422     // The trace config provided by the Consumer when calling
423     // EnableTracing(), plus any updates performed by ChangeTraceConfig.
424     TraceConfig config;
425 
426     // List of data source instances that have been enabled on the various
427     // producers for this tracing session.
428     // TODO(rsavitski): at the time of writing, the map structure is unused
429     // (even when the calling code has a key). This is also an opportunity to
430     // consider an alternative data type, e.g. a map of vectors.
431     std::multimap<ProducerID, DataSourceInstance> data_source_instances;
432 
433     // For each Flush(N) request, keeps track of the set of producers for which
434     // we are still awaiting a NotifyFlushComplete(N) ack.
435     std::map<FlushRequestID, PendingFlush> pending_flushes;
436 
437     // Maps a per-trace-session buffer index into the corresponding global
438     // BufferID (shared namespace amongst all consumers). This vector has as
439     // many entries as |config.buffers_size()|.
440     std::vector<BufferID> buffers_index;
441 
442     std::map<std::pair<ProducerID, WriterID>, PacketSequenceID>
443         packet_sequence_ids;
444     PacketSequenceID last_packet_sequence_id = kServicePacketSequenceID;
445 
446     // When the last snapshots (clock, stats, sync marker) were emitted into
447     // the output stream.
448     base::TimeMillis last_snapshot_time = {};
449 
450     // Whether we mirrored the trace config back to the trace output yet.
451     bool did_emit_config = false;
452 
453     // Whether we put the system info into the trace output yet.
454     bool did_emit_system_info = false;
455 
456     // The number of received triggers we've emitted into the trace output.
457     size_t num_triggers_emitted_into_trace = 0;
458 
459     // Initial clock snapshot, captured at trace start time (when state goes
460     // to TracingSession::STARTED). Emitted into the trace when the consumer
461     // first begins reading the trace.
462     std::vector<TracePacket> initial_clock_snapshot_;
463 
464     State state = DISABLED;
465 
466     // If the consumer detached the session, this variable defines the key used
467     // for identifying the session later when reattaching.
468     std::string detach_key;
469 
470     // This is set when the Consumer calls sets |write_into_file| == true in the
471     // TraceConfig. In this case this represents the file we should stream the
472     // trace packets into, rather than returning it to the consumer via
473     // OnTraceData().
474     base::ScopedFile write_into_file;
475     uint32_t write_period_ms = 0;
476     uint64_t max_file_size_bytes = 0;
477     uint64_t bytes_written_into_file = 0;
478   };
479 
480   TracingServiceImpl(const TracingServiceImpl&) = delete;
481   TracingServiceImpl& operator=(const TracingServiceImpl&) = delete;
482 
483   DataSourceInstance* SetupDataSource(const TraceConfig::DataSource&,
484                                       const TraceConfig::ProducerConfig&,
485                                       const RegisteredDataSource&,
486                                       TracingSession*);
487 
488   // Returns the next available ProducerID that is not in |producers_|.
489   ProducerID GetNextProducerID();
490 
491   // Returns a pointer to the |tracing_sessions_| entry or nullptr if the
492   // session doesn't exist.
493   TracingSession* GetTracingSession(TracingSessionID);
494 
495   // Returns a pointer to the |tracing_sessions_| entry, matching the given
496   // uid and detach key, or nullptr if no such session exists.
497   TracingSession* GetDetachedSession(uid_t, const std::string& key);
498 
499   // Update the memory guard rail by using the latest information from the
500   // shared memory and trace buffers.
501   void UpdateMemoryGuardrail();
502 
503   void StartDataSourceInstance(ProducerEndpointImpl* producer,
504                                TracingSession* tracing_session,
505                                DataSourceInstance* instance);
506   void SnapshotSyncMarker(std::vector<TracePacket>*);
507   void SnapshotClocks(std::vector<TracePacket>*, bool set_root_timestamp);
508   void SnapshotStats(TracingSession*, std::vector<TracePacket>*);
509   TraceStats GetTraceStats(TracingSession* tracing_session);
510   void MaybeEmitTraceConfig(TracingSession*, std::vector<TracePacket>*);
511   void MaybeEmitSystemInfo(TracingSession*, std::vector<TracePacket>*);
512   void MaybeEmitReceivedTriggers(TracingSession*, std::vector<TracePacket>*);
513   void OnFlushTimeout(TracingSessionID, FlushRequestID);
514   void OnDisableTracingTimeout(TracingSessionID);
515   void DisableTracingNotifyConsumerAndFlushFile(TracingSession*);
516   void PeriodicFlushTask(TracingSessionID, bool post_next_only);
517   void CompleteFlush(TracingSessionID tsid,
518                      ConsumerEndpoint::FlushCallback callback,
519                      bool success);
520   void ScrapeSharedMemoryBuffers(TracingSession* tracing_session,
521                                  ProducerEndpointImpl* producer);
522   void PeriodicClearIncrementalStateTask(TracingSessionID, bool post_next_only);
523   TraceBuffer* GetBufferByID(BufferID);
524   void OnStartTriggersTimeout(TracingSessionID tsid);
525 
526   base::TaskRunner* const task_runner_;
527   std::unique_ptr<SharedMemory::Factory> shm_factory_;
528   ProducerID last_producer_id_ = 0;
529   DataSourceInstanceID last_data_source_instance_id_ = 0;
530   TracingSessionID last_tracing_session_id_ = 0;
531   FlushRequestID last_flush_request_id_ = 0;
532   uid_t uid_ = 0;
533 
534   // Buffer IDs are global across all consumers (because a Producer can produce
535   // data for more than one trace session, hence more than one consumer).
536   IdAllocator<BufferID> buffer_ids_;
537 
538   std::multimap<std::string /*name*/, RegisteredDataSource> data_sources_;
539   std::map<ProducerID, ProducerEndpointImpl*> producers_;
540   std::set<ConsumerEndpointImpl*> consumers_;
541   std::map<TracingSessionID, TracingSession> tracing_sessions_;
542   std::map<BufferID, std::unique_ptr<TraceBuffer>> buffers_;
543 
544   bool smb_scraping_enabled_ = false;
545   bool lockdown_mode_ = false;
546   uint32_t min_write_period_ms_ = 100;  // Overridable for testing.
547 
548   uint8_t sync_marker_packet_[32];  // Lazily initialized.
549   size_t sync_marker_packet_size_ = 0;
550 
551   // Stats.
552   uint64_t chunks_discarded_ = 0;
553   uint64_t patches_discarded_ = 0;
554 
555   PERFETTO_THREAD_CHECKER(thread_checker_)
556 
557   base::WeakPtrFactory<TracingServiceImpl>
558       weak_ptr_factory_;  // Keep at the end.
559 };
560 
561 }  // namespace perfetto
562 
563 #endif  // SRC_TRACING_CORE_TRACING_SERVICE_IMPL_H_
564