/*
 * Copyright (C) 2018 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#ifndef SRC_TRACE_PROCESSOR_TRACE_SORTER_H_
#define SRC_TRACE_PROCESSOR_TRACE_SORTER_H_

#include <vector>

#include "perfetto/ext/base/circular_queue.h"
#include "perfetto/trace_processor/basic_types.h"
#include "src/trace_processor/importers/common/trace_blob_view.h"
#include "src/trace_processor/storage/trace_storage.h"
#include "src/trace_processor/timestamped_trace_piece.h"

namespace Json {
class Value;
}  // namespace Json

namespace perfetto {
namespace trace_processor {

class FuchsiaProviderView;
class PacketSequenceState;
struct SystraceLine;

// This class takes care of sorting events parsed from the trace stream in
// arbitrary order and pushing them to the next pipeline stages (parsing) in
// order. To support streaming use-cases, sorting happens within a max window:
// events are held in the TraceSorter staging area (events_) until either
// (1) the (max - min) timestamp delta exceeds window_size, or (2) trace EOF.
//
// This class is designed around the following assumptions:
// - Most events come from ftrace.
// - Ftrace events are sorted within each cpu most of the time.
//
// Due to this, this class operates as a streaming merge-sort of N+1 queues
// (N = num cpus; the +1 is for non-ftrace events). Each queue in turn gets
// sorted (if necessary) before proceeding with the global merge-sort-extract.
//
// When an event is pushed through, it is just appended to the end of one of
// the N queues. While appending, we keep track of whether the queue is still
// ordered or has just lost ordering. When an out-of-order event is detected
// on a queue we keep track of: (1) the offset within the queue where the
// chaos began, (2) the timestamp that broke the ordering.
//
// When we decide to extract events from the queues into the next stages of
// the trace processor, we re-sort the events in the queue. Rather than
// re-sorting everything every time, we use the above knowledge to restrict
// sorting to the (hopefully smaller) tail of the |events_| staging area.
// At any time, the first partition of |events_| [0 .. sort_start_idx_) is
// ordered, and the second partition [sort_start_idx_ .. end] is not.
// We use a logarithmic bound search to figure out the index within the first
// partition where sorting should start, and sort all events from there to the
// end.
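//
// A simplified sketch of that partial re-sort (illustrative only, not part of
// this header): plain int64_t timestamps stand in for TimestampedTracePiece
// and a std::vector stands in for base::CircularQueue. Assumes <algorithm>,
// <cstddef>, <cstdint> and <vector>.
//
//   void SortTail(std::vector<int64_t>& events,
//                 size_t sort_start_idx,   // first out-of-order element
//                 int64_t sort_min_ts) {   // smallest out-of-order timestamp
//     auto sorted_end =
//         events.begin() + static_cast<std::ptrdiff_t>(sort_start_idx);
//     // Prefix elements smaller than |sort_min_ts| are already in their
//     // final position; only re-sort from the first element that is >= it.
//     auto sort_begin = std::lower_bound(events.begin(), sorted_end,
//                                        sort_min_ts);
//     std::sort(sort_begin, events.end());
//   }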
class TraceSorter {
 public:
  TraceSorter(std::unique_ptr<TraceParser> parser, int64_t window_size_ns);

  inline void PushTracePacket(int64_t timestamp,
                              PacketSequenceState* state,
                              TraceBlobView packet) {
    DCHECK_ftrace_batch_cpu(kNoBatch);
    auto* queue = GetQueue(0);
    queue->Append(TimestampedTracePiece(timestamp, packet_idx_++,
                                        std::move(packet),
                                        state->current_generation()));
    MaybeExtractEvents(queue);
  }

  inline void PushJsonValue(int64_t timestamp, std::string json_value) {
    auto* queue = GetQueue(0);
    queue->Append(TimestampedTracePiece(timestamp, packet_idx_++,
                                        std::move(json_value)));
    MaybeExtractEvents(queue);
  }

  inline void PushFuchsiaRecord(int64_t timestamp,
                                std::unique_ptr<FuchsiaRecord> record) {
    DCHECK_ftrace_batch_cpu(kNoBatch);
    auto* queue = GetQueue(0);
    queue->Append(
        TimestampedTracePiece(timestamp, packet_idx_++, std::move(record)));
    MaybeExtractEvents(queue);
  }

  inline void PushSystraceLine(std::unique_ptr<SystraceLine> systrace_line) {
    DCHECK_ftrace_batch_cpu(kNoBatch);
    auto* queue = GetQueue(0);
    int64_t timestamp = systrace_line->ts;
    queue->Append(TimestampedTracePiece(timestamp, packet_idx_++,
                                        std::move(systrace_line)));
    MaybeExtractEvents(queue);
  }

  inline void PushFtraceEvent(uint32_t cpu,
                              int64_t timestamp,
                              TraceBlobView event,
                              PacketSequenceState* state) {
    set_ftrace_batch_cpu_for_DCHECK(cpu);
    GetQueue(cpu + 1)->Append(TimestampedTracePiece(
        timestamp, packet_idx_++,
        FtraceEventData{std::move(event), state->current_generation()}));

    // The caller must call FinalizeFtraceEventBatch() after having pushed a
    // batch of ftrace events. This is to amortize the overhead of handling
    // global ordering and doing that in batches only after all ftrace events
    // for a bundle are pushed.
  }

  // As with |PushFtraceEvent|, doesn't immediately sort the affected queues.
  // TODO(rsavitski): if a trace has a mix of normal & "compact" events (the
  // latter being pushed through this function), the ftrace batches will no
  // longer be fully sorted by timestamp. In such situations, we will have to
  // sort at the end of the batch. We could do better, as both sub-sequences
  // are already sorted: consider adding extra queues, or pushing them in a
  // merge-sort fashion instead.
  inline void PushInlineFtraceEvent(uint32_t cpu,
                                    int64_t timestamp,
                                    InlineSchedSwitch inline_sched_switch) {
    set_ftrace_batch_cpu_for_DCHECK(cpu);
    GetQueue(cpu + 1)->Append(
        TimestampedTracePiece(timestamp, packet_idx_++, inline_sched_switch));
  }
  inline void PushInlineFtraceEvent(uint32_t cpu,
                                    int64_t timestamp,
                                    InlineSchedWaking inline_sched_waking) {
    set_ftrace_batch_cpu_for_DCHECK(cpu);
    GetQueue(cpu + 1)->Append(
        TimestampedTracePiece(timestamp, packet_idx_++, inline_sched_waking));
  }

  inline void PushTrackEventPacket(int64_t timestamp,
                                   std::unique_ptr<TrackEventData> data) {
    auto* queue = GetQueue(0);
    queue->Append(
        TimestampedTracePiece(timestamp, packet_idx_++, std::move(data)));
    MaybeExtractEvents(queue);
  }

  inline void FinalizeFtraceEventBatch(uint32_t cpu) {
    DCHECK_ftrace_batch_cpu(cpu);
    set_ftrace_batch_cpu_for_DCHECK(kNoBatch);
    MaybeExtractEvents(GetQueue(cpu + 1));
  }
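
  // Illustrative push sequence for one ftrace bundle (caller-side sketch;
  // |sorter|, |decoded_events|, |cpu| and |state| are placeholders, not part
  // of this API):
  //
  //   for (auto& ev : decoded_events)
  //     sorter.PushFtraceEvent(cpu, ev.timestamp, std::move(ev.blob), state);
  //   // Global ordering is re-evaluated only once, for the whole bundle.
  //   sorter.FinalizeFtraceEventBatch(cpu);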

  // Extract all events ignoring the window.
  void ExtractEventsForced() {
    SortAndExtractEventsBeyondWindow(/*window_size_ns=*/0);
    queues_.resize(0);
  }

  // Sets the window size to the value specified (which should be lower than
  // any previously specified window size) and flushes any data beyond this
  // window size. It is undefined to call this function with a window size
  // greater than the current one.
  void SetWindowSizeNs(int64_t window_size_ns) {
    PERFETTO_DCHECK(window_size_ns <= window_size_ns_);

    PERFETTO_DLOG("Setting window size to be %" PRId64 " ns", window_size_ns);
    window_size_ns_ = window_size_ns;

    // Fast path: if, globally, we are within the window size, just exit.
    if (global_max_ts_ - global_min_ts_ < window_size_ns)
      return;
    SortAndExtractEventsBeyondWindow(window_size_ns_);
  }

  int64_t max_timestamp() const { return global_max_ts_; }

 private:
  static constexpr uint32_t kNoBatch = std::numeric_limits<uint32_t>::max();

  struct Queue {
    inline void Append(TimestampedTracePiece ttp) {
      const int64_t timestamp = ttp.timestamp;
      events_.emplace_back(std::move(ttp));
      min_ts_ = std::min(min_ts_, timestamp);

      // Events are often seen in order.
      if (PERFETTO_LIKELY(timestamp >= max_ts_)) {
        max_ts_ = timestamp;
      } else {
        // The event breaks the ordering. The first time this happens, keep
        // track of which index we are at. We know that everything before
        // that index is sorted (because events were pushed monotonically).
        // Everything after that index, instead, will need a sorting pass
        // before moving events to the next pipeline stage.
        if (sort_start_idx_ == 0) {
          PERFETTO_DCHECK(events_.size() >= 2);
          sort_start_idx_ = events_.size() - 1;
          sort_min_ts_ = timestamp;
        } else {
          sort_min_ts_ = std::min(sort_min_ts_, timestamp);
        }
      }

      PERFETTO_DCHECK(min_ts_ <= max_ts_);
    }
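
    // Worked example of the bookkeeping above (illustrative): appending
    // timestamps 10, 20, 15, 18 in that order leaves
    //   events_          = {10, 20, 15, 18}
    //   min_ts_ = 10, max_ts_ = 20
    //   sort_start_idx_  = 2   (the 15 broke the ordering)
    //   sort_min_ts_     = 15  (min of the out-of-order tail {15, 18})
    // A later Sort() (declared below) only needs a logarithmic search for 15
    // in the sorted prefix {10, 20} and a std::sort from the 20 onwards.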

    bool needs_sorting() const { return sort_start_idx_ != 0; }
    void Sort();

    base::CircularQueue<TimestampedTracePiece> events_;
    int64_t min_ts_ = std::numeric_limits<int64_t>::max();
    int64_t max_ts_ = 0;
    size_t sort_start_idx_ = 0;
    int64_t sort_min_ts_ = std::numeric_limits<int64_t>::max();
  };

  // This method passes any events older than window_size_ns to the
  // parser to be parsed and then stored.
  void SortAndExtractEventsBeyondWindow(int64_t window_size_ns);

  inline Queue* GetQueue(size_t index) {
    if (PERFETTO_UNLIKELY(index >= queues_.size()))
      queues_.resize(index + 1);
    return &queues_[index];
  }

  inline void MaybeExtractEvents(Queue* queue) {
    DCHECK_ftrace_batch_cpu(kNoBatch);
    global_max_ts_ = std::max(global_max_ts_, queue->max_ts_);
    global_min_ts_ = std::min(global_min_ts_, queue->min_ts_);

    // Fast path: if, globally, we are within the window size, just exit.
    if (global_max_ts_ - global_min_ts_ < window_size_ns_)
      return;
    SortAndExtractEventsBeyondWindow(window_size_ns_);
  }

  std::unique_ptr<TraceParser> parser_;

  // queues_[0] is the general (non-ftrace) queue.
  // queues_[1] is the ftrace queue for CPU(0).
  // queues_[x] is the ftrace queue for CPU(x - 1).
  std::vector<Queue> queues_;

  // Events are propagated to the next stage only after the (max - min)
  // timestamp delta is larger than this value.
  int64_t window_size_ns_;

  // max(e.timestamp for e in queues_).
  int64_t global_max_ts_ = 0;

  // min(e.timestamp for e in queues_).
  int64_t global_min_ts_ = std::numeric_limits<int64_t>::max();

  // Monotonically increasing value used to index timestamped trace pieces.
  uint64_t packet_idx_ = 0;

  // Used for performance tests. True when setting TRACE_PROCESSOR_SORT_ONLY=1.
  bool bypass_next_stage_for_testing_ = false;

#if PERFETTO_DCHECK_IS_ON()
  // Used only for DCHECK-ing that FinalizeFtraceEventBatch() is called.
  uint32_t ftrace_batch_cpu_ = kNoBatch;

  inline void DCHECK_ftrace_batch_cpu(uint32_t cpu) {
    PERFETTO_DCHECK(ftrace_batch_cpu_ == kNoBatch || ftrace_batch_cpu_ == cpu);
  }

  inline void set_ftrace_batch_cpu_for_DCHECK(uint32_t cpu) {
    PERFETTO_DCHECK(ftrace_batch_cpu_ == cpu || ftrace_batch_cpu_ == kNoBatch ||
                    cpu == kNoBatch);
    ftrace_batch_cpu_ = cpu;
  }
#else
  inline void DCHECK_ftrace_batch_cpu(uint32_t) {}
  inline void set_ftrace_batch_cpu_for_DCHECK(uint32_t) {}
#endif
};

}  // namespace trace_processor
}  // namespace perfetto

#endif  // SRC_TRACE_PROCESSOR_TRACE_SORTER_H_