/* * Copyright (C) 2018 The Android Open Source Project * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #include #include #include "src/trace_processor/proto_trace_parser.h" #include "src/trace_processor/trace_sorter.h" namespace perfetto { namespace trace_processor { TraceSorter::TraceSorter(TraceProcessorContext* context, int64_t window_size_ns) : context_(context), window_size_ns_(window_size_ns) { const char* env = getenv("TRACE_PROCESSOR_SORT_ONLY"); bypass_next_stage_for_testing_ = env && !strcmp(env, "1"); if (bypass_next_stage_for_testing_) PERFETTO_ELOG("TEST MODE: bypassing protobuf parsing stage"); } void TraceSorter::Queue::Sort() { PERFETTO_DCHECK(needs_sorting()); PERFETTO_DCHECK(sort_start_idx_ < events_.size()); // If sort_min_ts_ has been set, it will no long be max_int, and so will be // smaller than max_ts_. PERFETTO_DCHECK(sort_min_ts_ < max_ts_); // We know that all events between [0, sort_start_idx_] are sorted. Witin // this range, perform a bound search and find the iterator for the min // timestamp that broke the monotonicity. Re-sort from there to the end. auto sort_end = events_.begin() + static_cast(sort_start_idx_); PERFETTO_DCHECK(std::is_sorted(events_.begin(), sort_end)); auto sort_begin = std::lower_bound(events_.begin(), sort_end, sort_min_ts_, &TimestampedTracePiece::Compare); std::sort(sort_begin, events_.end()); sort_start_idx_ = 0; sort_min_ts_ = 0; // At this point |events_| must be fully sorted. PERFETTO_DCHECK(std::is_sorted(events_.begin(), events_.end())); } // Removes all the events in |queues_| that are earlier than the given window // size and moves them to the next parser stages, respecting global timestamp // order. This function is a "extract min from N sorted queues", with some // little cleverness: we know that events tend to be bursty, so events are // not going to be randomly distributed on the N |queues_|. // Upon each iteration this function finds the first two queues (if any) that // have the oldest events, and extracts events from the 1st until hitting the // min_ts of the 2nd. Imagine the queues are as follows: // // q0 {min_ts: 10 max_ts: 30} // q1 {min_ts:5 max_ts: 35} // q2 {min_ts: 12 max_ts: 40} // // We know that we can extract all events from q1 until we hit ts=10 without // looking at any other queue. After hitting ts=10, we need to re-look to all of // them to figure out the next min-event. // There are more suitable data structures to do this (e.g. keeping a min-heap // to avoid re-scanning all the queues all the times) but doesn't seem worth it. // With Android traces (that have 8 CPUs) this function accounts for ~1-3% cpu // time in a profiler. void TraceSorter::SortAndExtractEventsBeyondWindow(int64_t window_size_ns) { DCHECK_ftrace_batch_cpu(kNoBatch); constexpr int64_t kTsMax = std::numeric_limits::max(); const bool was_empty = global_min_ts_ == kTsMax && global_max_ts_ == 0; int64_t extract_end_ts = global_max_ts_ - window_size_ns; auto* next_stage = context_->parser.get(); size_t iterations = 0; for (;; iterations++) { size_t min_queue_idx = 0; // The index of the queue with the min(ts). // The top-2 min(ts) among all queues. // queues_[min_queue_idx].events.timestamp == min_queue_ts[0]. int64_t min_queue_ts[2]{kTsMax, kTsMax}; // This loop identifies the queue which starts with the earliest event and // also remembers the earliest event of the 2nd queue (in min_queue_ts[1]). bool has_queues_with_expired_events = false; for (size_t i = 0; i < queues_.size(); i++) { auto& queue = queues_[i]; if (queue.events_.empty()) continue; PERFETTO_DCHECK(queue.min_ts_ >= global_min_ts_); PERFETTO_DCHECK(queue.max_ts_ <= global_max_ts_); if (queue.min_ts_ < min_queue_ts[0]) { min_queue_ts[1] = min_queue_ts[0]; min_queue_ts[0] = queue.min_ts_; min_queue_idx = i; has_queues_with_expired_events = true; } else if (queue.min_ts_ < min_queue_ts[1]) { min_queue_ts[1] = queue.min_ts_; } } if (!has_queues_with_expired_events) { // All the queues have events that start after the window (i.e. they are // too recent and not eligible to be extracted given the current window). break; } Queue& queue = queues_[min_queue_idx]; auto& events = queue.events_; if (queue.needs_sorting()) queue.Sort(); PERFETTO_DCHECK(queue.min_ts_ == events.front().timestamp); PERFETTO_DCHECK(queue.min_ts_ == global_min_ts_); // Now that we identified the min-queue, extract all events from it until // we hit either: (1) the min-ts of the 2nd queue or (2) the window limit, // whichever comes first. int64_t extract_until_ts = std::min(extract_end_ts, min_queue_ts[1]); size_t num_extracted = 0; for (auto& event : events) { int64_t timestamp = event.timestamp; if (timestamp > extract_until_ts) break; ++num_extracted; if (bypass_next_stage_for_testing_) continue; if (min_queue_idx == 0) { // queues_[0] is for non-ftrace packets. next_stage->ParseTracePacket(timestamp, std::move(event)); } else { // Ftrace queues start at offset 1. So queues_[1] = cpu[0] and so on. uint32_t cpu = static_cast(min_queue_idx - 1); next_stage->ParseFtracePacket(cpu, timestamp, std::move(event)); } } // for (event: events) if (!num_extracted) { // No events can be extracted from any of the queues. This means that // either we hit the window or all queues are empty. break; } // Now remove the entries from the event buffer and update the queue-local // and global time bounds. events.erase_front(num_extracted); // Update the global_{min,max}_ts to reflect the bounds after extraction. if (events.empty()) { queue.min_ts_ = kTsMax; queue.max_ts_ = 0; global_min_ts_ = min_queue_ts[1]; // If we extraced the max entry from a queue (i.e. we emptied the queue) // we need to recompute the global max, because it might have been the one // just extracted. global_max_ts_ = 0; for (auto& q : queues_) global_max_ts_ = std::max(global_max_ts_, q.max_ts_); } else { queue.min_ts_ = queue.events_.front().timestamp; global_min_ts_ = std::min(queue.min_ts_, min_queue_ts[1]); } } // for(;;) // We decide to extract events only when we know (using the global_{min,max} // bounds) that there are eligible events. We should never end up in a // situation where we call this function but then realize that there was // nothing to extract. PERFETTO_DCHECK(iterations > 0 || was_empty); #if PERFETTO_DCHECK_IS_ON() // Check that the global min/max are consistent. int64_t dbg_min_ts = kTsMax; int64_t dbg_max_ts = 0; for (auto& q : queues_) { dbg_min_ts = std::min(dbg_min_ts, q.min_ts_); dbg_max_ts = std::max(dbg_max_ts, q.max_ts_); } PERFETTO_DCHECK(global_min_ts_ == dbg_min_ts); PERFETTO_DCHECK(global_max_ts_ == dbg_max_ts); #endif } } // namespace trace_processor } // namespace perfetto