1 /*
2  * Copyright (C) 2018 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "src/trace_processor/importers/proto/proto_trace_reader.h"
18 
19 #include <string>
20 
21 #include "perfetto/base/build_config.h"
22 #include "perfetto/base/logging.h"
23 #include "perfetto/ext/base/optional.h"
24 #include "perfetto/ext/base/string_view.h"
25 #include "perfetto/ext/base/utils.h"
26 #include "perfetto/protozero/proto_decoder.h"
27 #include "perfetto/protozero/proto_utils.h"
28 #include "perfetto/trace_processor/status.h"
29 #include "src/trace_processor/importers/common/clock_tracker.h"
30 #include "src/trace_processor/importers/common/event_tracker.h"
31 #include "src/trace_processor/importers/common/track_tracker.h"
32 #include "src/trace_processor/importers/ftrace/ftrace_module.h"
33 #include "src/trace_processor/importers/gzip/gzip_utils.h"
34 #include "src/trace_processor/importers/proto/metadata_tracker.h"
35 #include "src/trace_processor/importers/proto/packet_sequence_state.h"
36 #include "src/trace_processor/importers/proto/proto_incremental_state.h"
37 #include "src/trace_processor/storage/stats.h"
38 #include "src/trace_processor/storage/trace_storage.h"
39 #include "src/trace_processor/trace_sorter.h"
40 #include "src/trace_processor/util/descriptors.h"
41 
42 #include "protos/perfetto/common/builtin_clock.pbzero.h"
43 #include "protos/perfetto/config/trace_config.pbzero.h"
44 #include "protos/perfetto/trace/clock_snapshot.pbzero.h"
45 #include "protos/perfetto/trace/extension_descriptor.pbzero.h"
46 #include "protos/perfetto/trace/perfetto/tracing_service_event.pbzero.h"
47 #include "protos/perfetto/trace/profiling/profile_common.pbzero.h"
48 #include "protos/perfetto/trace/trace.pbzero.h"
49 #include "protos/perfetto/trace/trace_packet.pbzero.h"
50 
51 namespace perfetto {
52 namespace trace_processor {
53 
ProtoTraceReader(TraceProcessorContext * ctx)54 ProtoTraceReader::ProtoTraceReader(TraceProcessorContext* ctx)
55     : context_(ctx) {}
56 ProtoTraceReader::~ProtoTraceReader() = default;
57 
Parse(std::unique_ptr<uint8_t[]> owned_buf,size_t size)58 util::Status ProtoTraceReader::Parse(std::unique_ptr<uint8_t[]> owned_buf,
59                                      size_t size) {
60   return tokenizer_.Tokenize(
61       std::move(owned_buf), size,
62       [this](TraceBlobView packet) { return ParsePacket(std::move(packet)); });
63 }
64 
ParseExtensionDescriptor(ConstBytes descriptor)65 util::Status ProtoTraceReader::ParseExtensionDescriptor(ConstBytes descriptor) {
66   protos::pbzero::ExtensionDescriptor::Decoder decoder(descriptor.data,
67                                                        descriptor.size);
68 
69   auto extension = decoder.extension_set();
70   return context_->descriptor_pool_->AddFromFileDescriptorSet(
71       extension.data, extension.size,
72       /*merge_existing_messages=*/true);
73 }
74 
ParsePacket(TraceBlobView packet)75 util::Status ProtoTraceReader::ParsePacket(TraceBlobView packet) {
76   protos::pbzero::TracePacket::Decoder decoder(packet.data(), packet.length());
77   if (PERFETTO_UNLIKELY(decoder.bytes_left())) {
78     return util::ErrStatus(
79         "Failed to parse proto packet fully; the trace is probably corrupt.");
80   }
81 
82   // Any compressed packets should have been handled by the tokenizer.
83   PERFETTO_CHECK(!decoder.has_compressed_packets());
84 
85   const uint32_t seq_id = decoder.trusted_packet_sequence_id();
86   auto* state = GetIncrementalStateForPacketSequence(seq_id);
87 
88   uint32_t sequence_flags = decoder.sequence_flags();
89 
90   if (decoder.incremental_state_cleared() ||
91       sequence_flags &
92           protos::pbzero::TracePacket::SEQ_INCREMENTAL_STATE_CLEARED) {
93     HandleIncrementalStateCleared(decoder);
94   } else if (decoder.previous_packet_dropped()) {
95     HandlePreviousPacketDropped(decoder);
96   }
97 
98   // It is important that we parse defaults before parsing other fields such as
99   // the timestamp, since the defaults could affect them.
100   if (decoder.has_trace_packet_defaults()) {
101     auto field = decoder.trace_packet_defaults();
102     const size_t offset = packet.offset_of(field.data);
103     ParseTracePacketDefaults(decoder, packet.slice(offset, field.size));
104   }
105 
106   if (decoder.has_interned_data()) {
107     auto field = decoder.interned_data();
108     const size_t offset = packet.offset_of(field.data);
109     ParseInternedData(decoder, packet.slice(offset, field.size));
110   }
111 
112   if (decoder.has_clock_snapshot()) {
113     return ParseClockSnapshot(decoder.clock_snapshot(),
114                               decoder.trusted_packet_sequence_id());
115   }
116 
117   if (decoder.has_service_event()) {
118     PERFETTO_DCHECK(decoder.has_timestamp());
119     int64_t ts = static_cast<int64_t>(decoder.timestamp());
120     return ParseServiceEvent(ts, decoder.service_event());
121   }
122 
123   if (decoder.has_extension_descriptor()) {
124     return ParseExtensionDescriptor(decoder.extension_descriptor());
125   }
126 
127   if (decoder.sequence_flags() &
128       protos::pbzero::TracePacket::SEQ_NEEDS_INCREMENTAL_STATE) {
129     if (!seq_id) {
130       return util::ErrStatus(
131           "TracePacket specified SEQ_NEEDS_INCREMENTAL_STATE but the "
132           "TraceWriter's sequence_id is zero (the service is "
133           "probably too old)");
134     }
135 
136     if (!state->IsIncrementalStateValid()) {
137       context_->storage->IncrementStats(stats::tokenizer_skipped_packets);
138       return util::OkStatus();
139     }
140   }
141 
142   // Workaround a bug in the frame timeline traces which is emitting packets
143   // with zero timestamp (b/179905685).
144   // TODO(primiano): around mid-2021 there should be no traces that have this
145   // bug and we should be able to remove this workaround.
146   if (decoder.has_frame_timeline_event() && decoder.timestamp() == 0) {
147     context_->storage->IncrementStats(
148         stats::frame_timeline_event_parser_errors);
149     return util::OkStatus();
150   }
151 
152   protos::pbzero::TracePacketDefaults::Decoder* defaults =
153       state->current_generation()->GetTracePacketDefaults();
154 
155   int64_t timestamp;
156   if (decoder.has_timestamp()) {
157     timestamp = static_cast<int64_t>(decoder.timestamp());
158 
159     uint32_t timestamp_clock_id =
160         decoder.has_timestamp_clock_id()
161             ? decoder.timestamp_clock_id()
162             : (defaults ? defaults->timestamp_clock_id() : 0);
163 
164     if ((decoder.has_chrome_events() || decoder.has_chrome_metadata()) &&
165         (!timestamp_clock_id ||
166          timestamp_clock_id == protos::pbzero::BUILTIN_CLOCK_MONOTONIC)) {
167       // Chrome event timestamps are in MONOTONIC domain, but may occur in
168       // traces where (a) no clock snapshots exist or (b) no clock_id is
169       // specified for their timestamps. Adjust to trace time if we have a clock
170       // snapshot.
171       // TODO(eseckler): Set timestamp_clock_id and emit ClockSnapshots in
172       // chrome and then remove this.
173       auto trace_ts = context_->clock_tracker->ToTraceTime(
174           protos::pbzero::BUILTIN_CLOCK_MONOTONIC, timestamp);
175       if (trace_ts.has_value())
176         timestamp = trace_ts.value();
177     } else if (timestamp_clock_id) {
178       // If the TracePacket specifies a non-zero clock-id, translate the
179       // timestamp into the trace-time clock domain.
180       ClockTracker::ClockId converted_clock_id = timestamp_clock_id;
181       bool is_seq_scoped =
182           ClockTracker::IsReservedSeqScopedClockId(converted_clock_id);
183       if (is_seq_scoped) {
184         if (!seq_id) {
185           return util::ErrStatus(
186               "TracePacket specified a sequence-local clock id (%" PRIu32
187               ") but the TraceWriter's sequence_id is zero (the service is "
188               "probably too old)",
189               timestamp_clock_id);
190         }
191         converted_clock_id =
192             ClockTracker::SeqScopedClockIdToGlobal(seq_id, timestamp_clock_id);
193       }
194       auto trace_ts =
195           context_->clock_tracker->ToTraceTime(converted_clock_id, timestamp);
196       if (!trace_ts.has_value()) {
197         // ToTraceTime() will increase the |clock_sync_failure| stat on failure.
198         // We don't return an error here as it will cause the trace to stop
199         // parsing. Instead, we rely on the stat increment in ToTraceTime() to
200         // inform the user about the error.
201         return util::OkStatus();
202       }
203       timestamp = trace_ts.value();
204     }
205   } else {
206     timestamp = std::max(latest_timestamp_, context_->sorter->max_timestamp());
207   }
208   latest_timestamp_ = std::max(timestamp, latest_timestamp_);
209 
210   auto& modules = context_->modules_by_field;
211   for (uint32_t field_id = 1; field_id < modules.size(); ++field_id) {
212     if (!modules[field_id].empty() && decoder.Get(field_id).valid()) {
213       for (ProtoImporterModule* module : modules[field_id]) {
214         ModuleResult res = module->TokenizePacket(decoder, &packet, timestamp,
215                                                   state, field_id);
216         if (!res.ignored())
217           return res.ToStatus();
218       }
219     }
220   }
221 
222   if (decoder.has_trace_config()) {
223     ParseTraceConfig(decoder.trace_config());
224   }
225 
226   // Use parent data and length because we want to parse this again
227   // later to get the exact type of the packet.
228   context_->sorter->PushTracePacket(timestamp, state, std::move(packet));
229 
230   return util::OkStatus();
231 }
232 
ParseTraceConfig(protozero::ConstBytes blob)233 void ProtoTraceReader::ParseTraceConfig(protozero::ConstBytes blob) {
234   protos::pbzero::TraceConfig::Decoder trace_config(blob);
235   // If we're forcing a full sort, we can keep using the INT_MAX duration set
236   // when we created the sorter.
237   const auto& cfg = context_->config;
238   if (cfg.sorting_mode == SortingMode::kForceFullSort) {
239     return;
240   }
241 
242   base::Optional<int64_t> flush_period_window_size_ns;
243   if (trace_config.has_flush_period_ms() &&
244       trace_config.flush_period_ms() > 0) {
245     flush_period_window_size_ns =
246         static_cast<int64_t>(trace_config.flush_period_ms()) * 2 * 1000 * 1000;
247   }
248 
249   // If we're trying to force flush period based sorting, use that as the
250   // window size if it exists.
251   if (cfg.sorting_mode == SortingMode::kForceFlushPeriodWindowedSort &&
252       flush_period_window_size_ns.has_value()) {
253     context_->sorter->SetWindowSizeNs(flush_period_window_size_ns.value());
254     return;
255   }
256 
257   // If we end up here, we should use heuristics because either the sorting mode
258   // was set as such or we don't have a flush period to force the window size
259   // to.
260 
261   // If we're not forcing anything and this is a write_into_file trace, then
262   // use flush_period_ms as an indiciator for how big the sliding window for the
263   // sorter should be.
264   if (trace_config.write_into_file()) {
265     int64_t window_size_ns;
266     if (flush_period_window_size_ns.has_value()) {
267       window_size_ns = flush_period_window_size_ns.value();
268     } else {
269       constexpr uint64_t kDefaultWindowNs =
270           180 * 1000 * 1000 * 1000ULL;  // 3 minutes.
271       PERFETTO_ELOG(
272           "It is strongly recommended to have flush_period_ms set when "
273           "write_into_file is turned on. You will likely have many dropped "
274           "events because of inability to sort the events correctly.");
275       window_size_ns = static_cast<int64_t>(kDefaultWindowNs);
276     }
277     context_->sorter->SetWindowSizeNs(window_size_ns);
278   }
279 }
280 
HandleIncrementalStateCleared(const protos::pbzero::TracePacket::Decoder & packet_decoder)281 void ProtoTraceReader::HandleIncrementalStateCleared(
282     const protos::pbzero::TracePacket::Decoder& packet_decoder) {
283   if (PERFETTO_UNLIKELY(!packet_decoder.has_trusted_packet_sequence_id())) {
284     PERFETTO_ELOG(
285         "incremental_state_cleared without trusted_packet_sequence_id");
286     context_->storage->IncrementStats(stats::interned_data_tokenizer_errors);
287     return;
288   }
289   GetIncrementalStateForPacketSequence(
290       packet_decoder.trusted_packet_sequence_id())
291       ->OnIncrementalStateCleared();
292   for (auto& module : context_->modules) {
293     module->OnIncrementalStateCleared(
294         packet_decoder.trusted_packet_sequence_id());
295   }
296 }
297 
HandlePreviousPacketDropped(const protos::pbzero::TracePacket::Decoder & packet_decoder)298 void ProtoTraceReader::HandlePreviousPacketDropped(
299     const protos::pbzero::TracePacket::Decoder& packet_decoder) {
300   if (PERFETTO_UNLIKELY(!packet_decoder.has_trusted_packet_sequence_id())) {
301     PERFETTO_ELOG("previous_packet_dropped without trusted_packet_sequence_id");
302     context_->storage->IncrementStats(stats::interned_data_tokenizer_errors);
303     return;
304   }
305   GetIncrementalStateForPacketSequence(
306       packet_decoder.trusted_packet_sequence_id())
307       ->OnPacketLoss();
308 }
309 
ParseTracePacketDefaults(const protos::pbzero::TracePacket_Decoder & packet_decoder,TraceBlobView trace_packet_defaults)310 void ProtoTraceReader::ParseTracePacketDefaults(
311     const protos::pbzero::TracePacket_Decoder& packet_decoder,
312     TraceBlobView trace_packet_defaults) {
313   if (PERFETTO_UNLIKELY(!packet_decoder.has_trusted_packet_sequence_id())) {
314     PERFETTO_ELOG(
315         "TracePacketDefaults packet without trusted_packet_sequence_id");
316     context_->storage->IncrementStats(stats::interned_data_tokenizer_errors);
317     return;
318   }
319 
320   auto* state = GetIncrementalStateForPacketSequence(
321       packet_decoder.trusted_packet_sequence_id());
322   state->UpdateTracePacketDefaults(std::move(trace_packet_defaults));
323 }
324 
ParseInternedData(const protos::pbzero::TracePacket::Decoder & packet_decoder,TraceBlobView interned_data)325 void ProtoTraceReader::ParseInternedData(
326     const protos::pbzero::TracePacket::Decoder& packet_decoder,
327     TraceBlobView interned_data) {
328   if (PERFETTO_UNLIKELY(!packet_decoder.has_trusted_packet_sequence_id())) {
329     PERFETTO_ELOG("InternedData packet without trusted_packet_sequence_id");
330     context_->storage->IncrementStats(stats::interned_data_tokenizer_errors);
331     return;
332   }
333 
334   auto* state = GetIncrementalStateForPacketSequence(
335       packet_decoder.trusted_packet_sequence_id());
336 
337   // Don't parse interned data entries until incremental state is valid, because
338   // they could otherwise be associated with the wrong generation in the state.
339   if (!state->IsIncrementalStateValid()) {
340     context_->storage->IncrementStats(stats::tokenizer_skipped_packets);
341     return;
342   }
343 
344   // Store references to interned data submessages into the sequence's state.
345   protozero::ProtoDecoder decoder(interned_data.data(), interned_data.length());
346   for (protozero::Field f = decoder.ReadField(); f.valid();
347        f = decoder.ReadField()) {
348     auto bytes = f.as_bytes();
349     auto offset = interned_data.offset_of(bytes.data);
350     state->InternMessage(f.id(), interned_data.slice(offset, bytes.size));
351   }
352 }
353 
ParseClockSnapshot(ConstBytes blob,uint32_t seq_id)354 util::Status ProtoTraceReader::ParseClockSnapshot(ConstBytes blob,
355                                                   uint32_t seq_id) {
356   std::vector<ClockTracker::ClockValue> clocks;
357   protos::pbzero::ClockSnapshot::Decoder evt(blob.data, blob.size);
358   if (evt.primary_trace_clock()) {
359     context_->clock_tracker->SetTraceTimeClock(
360         static_cast<ClockTracker::ClockId>(evt.primary_trace_clock()));
361   }
362   for (auto it = evt.clocks(); it; ++it) {
363     protos::pbzero::ClockSnapshot::Clock::Decoder clk(*it);
364     ClockTracker::ClockId clock_id = clk.clock_id();
365     if (ClockTracker::IsReservedSeqScopedClockId(clk.clock_id())) {
366       if (!seq_id) {
367         return util::ErrStatus(
368             "ClockSnapshot packet is specifying a sequence-scoped clock id "
369             "(%" PRIu64 ") but the TracePacket sequence_id is zero",
370             clock_id);
371       }
372       clock_id = ClockTracker::SeqScopedClockIdToGlobal(seq_id, clk.clock_id());
373     }
374     int64_t unit_multiplier_ns =
375         clk.unit_multiplier_ns()
376             ? static_cast<int64_t>(clk.unit_multiplier_ns())
377             : 1;
378     clocks.emplace_back(clock_id, clk.timestamp(), unit_multiplier_ns,
379                         clk.is_incremental());
380   }
381 
382   uint32_t snapshot_id = context_->clock_tracker->AddSnapshot(clocks);
383 
384   // Add the all the clock values to the clock snapshot table.
385   base::Optional<int64_t> trace_ts_for_check;
386   for (const auto& clock : clocks) {
387     // If the clock is incremental, we need to use 0 to map correctly to
388     // |absolute_timestamp|.
389     int64_t ts_to_convert = clock.is_incremental ? 0 : clock.absolute_timestamp;
390     base::Optional<int64_t> opt_trace_ts =
391         context_->clock_tracker->ToTraceTime(clock.clock_id, ts_to_convert);
392     if (!opt_trace_ts) {
393       // This can happen if |AddSnapshot| failed to resolve this clock. Just
394       // ignore this and move on.
395       continue;
396     }
397 
398     // Double check that all the clocks in this snapshot resolve to the same
399     // trace timestamp value.
400     PERFETTO_DCHECK(!trace_ts_for_check || opt_trace_ts == trace_ts_for_check);
401     trace_ts_for_check = *opt_trace_ts;
402 
403     tables::ClockSnapshotTable::Row row;
404     row.ts = *opt_trace_ts;
405     row.clock_id = static_cast<int64_t>(clock.clock_id);
406     row.clock_value = clock.absolute_timestamp;
407     row.clock_name = GetBuiltinClockNameOrNull(clock.clock_id);
408     row.snapshot_id = snapshot_id;
409 
410     auto* snapshot_table = context_->storage->mutable_clock_snapshot_table();
411     snapshot_table->Insert(row);
412   }
413   return util::OkStatus();
414 }
415 
GetBuiltinClockNameOrNull(uint64_t clock_id)416 base::Optional<StringId> ProtoTraceReader::GetBuiltinClockNameOrNull(
417     uint64_t clock_id) {
418   switch (clock_id) {
419     case protos::pbzero::ClockSnapshot::Clock::REALTIME:
420       return context_->storage->InternString("REALTIME");
421     case protos::pbzero::ClockSnapshot::Clock::REALTIME_COARSE:
422       return context_->storage->InternString("REALTIME_COARSE");
423     case protos::pbzero::ClockSnapshot::Clock::MONOTONIC:
424       return context_->storage->InternString("MONOTONIC");
425     case protos::pbzero::ClockSnapshot::Clock::MONOTONIC_COARSE:
426       return context_->storage->InternString("MONOTONIC_COARSE");
427     case protos::pbzero::ClockSnapshot::Clock::MONOTONIC_RAW:
428       return context_->storage->InternString("MONOTONIC_RAW");
429     case protos::pbzero::ClockSnapshot::Clock::BOOTTIME:
430       return context_->storage->InternString("BOOTTIME");
431     default:
432       return base::nullopt;
433   }
434 }
435 
ParseServiceEvent(int64_t ts,ConstBytes blob)436 util::Status ProtoTraceReader::ParseServiceEvent(int64_t ts, ConstBytes blob) {
437   protos::pbzero::TracingServiceEvent::Decoder tse(blob);
438   if (tse.tracing_started()) {
439     context_->metadata_tracker->SetMetadata(metadata::tracing_started_ns,
440                                             Variadic::Integer(ts));
441   }
442   if (tse.tracing_disabled()) {
443     context_->metadata_tracker->SetMetadata(metadata::tracing_disabled_ns,
444                                             Variadic::Integer(ts));
445   }
446   if (tse.all_data_sources_started()) {
447     context_->metadata_tracker->SetMetadata(
448         metadata::all_data_source_started_ns, Variadic::Integer(ts));
449   }
450   return util::OkStatus();
451 }
452 
NotifyEndOfFile()453 void ProtoTraceReader::NotifyEndOfFile() {}
454 
455 }  // namespace trace_processor
456 }  // namespace perfetto
457