1 /*
2  * Copyright (C) 2018 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #ifndef SRC_TRACE_PROCESSOR_TRACE_SORTER_H_
18 #define SRC_TRACE_PROCESSOR_TRACE_SORTER_H_
19 
20 #include <vector>
21 
22 #include "perfetto/ext/base/circular_queue.h"
23 #include "perfetto/trace_processor/basic_types.h"
24 #include "src/trace_processor/importers/common/trace_blob_view.h"
25 #include "src/trace_processor/storage/trace_storage.h"
26 #include "src/trace_processor/timestamped_trace_piece.h"
27 
28 namespace Json {
29 class Value;
30 }  // namespace Json
31 
32 namespace perfetto {
33 namespace trace_processor {
34 
35 class FuchsiaProviderView;
36 class PacketSequenceState;
37 struct SystraceLine;
38 
39 // This class takes care of sorting events parsed from the trace stream in
40 // arbitrary order and pushing them to the next pipeline stages (parsing) in
41 // order. In order to support streaming use-cases, sorting happens within a
42 // max window. Events are held in the TraceSorter staging area (events_) until
43 // either (1) the (max - min) timestamp > window_size; (2) trace EOF.
44 //
45 // This class is designed around the assumption that:
46 // - Most events come from ftrace.
47 // - Ftrace events are sorted within each cpu most of the times.
48 //
49 // Due to this, this class is oprerates as a streaming merge-sort of N+1 queues
50 // (N = num cpus + 1 for non-ftrace events). Each queue in turn gets sorted (if
51 // necessary) before proceeding with the global merge-sort-extract.
52 // When an event is pushed through, it is just appeneded to the end of one of
53 // the N queues. While appending, we keep track of the fact that the queue
54 // is still ordered or just lost ordering. When an out-of-order event is
55 // detected on a queue we keep track of: (1) the offset within the queue where
56 // the chaos begun, (2) the timestamp that broke the ordering.
57 // When we decide to extract events from the queues into the next stages of
58 // the trace processor, we re-sort the events in the queue. Rather than
59 // re-sorting everything all the times, we use the above knowledge to restrict
60 // sorting to the (hopefully smaller) tail of the |events_| staging area.
61 // At any time, the first partition of |events_| [0 .. sort_start_idx_) is
62 // ordered, and the second partition [sort_start_idx_.. end] is not.
63 // We use a logarithmic bound search operation to figure out what is the index
64 // within the first partition where sorting should start, and sort all events
65 // from there to the end.
66 class TraceSorter {
67  public:
68   TraceSorter(std::unique_ptr<TraceParser> parser, int64_t window_size_ns);
69 
PushTracePacket(int64_t timestamp,PacketSequenceState * state,TraceBlobView packet)70   inline void PushTracePacket(int64_t timestamp,
71                               PacketSequenceState* state,
72                               TraceBlobView packet) {
73     DCHECK_ftrace_batch_cpu(kNoBatch);
74     auto* queue = GetQueue(0);
75     queue->Append(TimestampedTracePiece(timestamp, packet_idx_++,
76                                         std::move(packet),
77                                         state->current_generation()));
78     MaybeExtractEvents(queue);
79   }
80 
PushJsonValue(int64_t timestamp,std::string json_value)81   inline void PushJsonValue(int64_t timestamp, std::string json_value) {
82     auto* queue = GetQueue(0);
83     queue->Append(
84         TimestampedTracePiece(timestamp, packet_idx_++, std::move(json_value)));
85     MaybeExtractEvents(queue);
86   }
87 
PushFuchsiaRecord(int64_t timestamp,std::unique_ptr<FuchsiaRecord> record)88   inline void PushFuchsiaRecord(int64_t timestamp,
89                                 std::unique_ptr<FuchsiaRecord> record) {
90     DCHECK_ftrace_batch_cpu(kNoBatch);
91     auto* queue = GetQueue(0);
92     queue->Append(
93         TimestampedTracePiece(timestamp, packet_idx_++, std::move(record)));
94     MaybeExtractEvents(queue);
95   }
96 
PushSystraceLine(std::unique_ptr<SystraceLine> systrace_line)97   inline void PushSystraceLine(std::unique_ptr<SystraceLine> systrace_line) {
98     DCHECK_ftrace_batch_cpu(kNoBatch);
99     auto* queue = GetQueue(0);
100     int64_t timestamp = systrace_line->ts;
101     queue->Append(TimestampedTracePiece(timestamp, packet_idx_++,
102                                         std::move(systrace_line)));
103     MaybeExtractEvents(queue);
104   }
105 
PushFtraceEvent(uint32_t cpu,int64_t timestamp,TraceBlobView event,PacketSequenceState * state)106   inline void PushFtraceEvent(uint32_t cpu,
107                               int64_t timestamp,
108                               TraceBlobView event,
109                               PacketSequenceState* state) {
110     set_ftrace_batch_cpu_for_DCHECK(cpu);
111     GetQueue(cpu + 1)->Append(TimestampedTracePiece(
112         timestamp, packet_idx_++,
113         FtraceEventData{std::move(event), state->current_generation()}));
114 
115     // The caller must call FinalizeFtraceEventBatch() after having pushed a
116     // batch of ftrace events. This is to amortize the overhead of handling
117     // global ordering and doing that in batches only after all ftrace events
118     // for a bundle are pushed.
119   }
120 
121   // As with |PushFtraceEvent|, doesn't immediately sort the affected queues.
122   // TODO(rsavitski): if a trace has a mix of normal & "compact" events (being
123   // pushed through this function), the ftrace batches will no longer be fully
124   // sorted by timestamp. In such situations, we will have to sort at the end of
125   // the batch. We can do better as both sub-sequences are sorted however.
126   // Consider adding extra queues, or pushing them in a merge-sort fashion
127   // instead.
PushInlineFtraceEvent(uint32_t cpu,int64_t timestamp,InlineSchedSwitch inline_sched_switch)128   inline void PushInlineFtraceEvent(uint32_t cpu,
129                                     int64_t timestamp,
130                                     InlineSchedSwitch inline_sched_switch) {
131     set_ftrace_batch_cpu_for_DCHECK(cpu);
132     GetQueue(cpu + 1)->Append(
133         TimestampedTracePiece(timestamp, packet_idx_++, inline_sched_switch));
134   }
PushInlineFtraceEvent(uint32_t cpu,int64_t timestamp,InlineSchedWaking inline_sched_waking)135   inline void PushInlineFtraceEvent(uint32_t cpu,
136                                     int64_t timestamp,
137                                     InlineSchedWaking inline_sched_waking) {
138     set_ftrace_batch_cpu_for_DCHECK(cpu);
139     GetQueue(cpu + 1)->Append(
140         TimestampedTracePiece(timestamp, packet_idx_++, inline_sched_waking));
141   }
142 
PushTrackEventPacket(int64_t timestamp,std::unique_ptr<TrackEventData> data)143   inline void PushTrackEventPacket(int64_t timestamp,
144                                    std::unique_ptr<TrackEventData> data) {
145     auto* queue = GetQueue(0);
146     queue->Append(
147         TimestampedTracePiece(timestamp, packet_idx_++, std::move(data)));
148     MaybeExtractEvents(queue);
149   }
150 
FinalizeFtraceEventBatch(uint32_t cpu)151   inline void FinalizeFtraceEventBatch(uint32_t cpu) {
152     DCHECK_ftrace_batch_cpu(cpu);
153     set_ftrace_batch_cpu_for_DCHECK(kNoBatch);
154     MaybeExtractEvents(GetQueue(cpu + 1));
155   }
156 
157   // Extract all events ignoring the window.
ExtractEventsForced()158   void ExtractEventsForced() {
159     SortAndExtractEventsBeyondWindow(/*window_size_ns=*/0);
160     queues_.resize(0);
161   }
162 
163   // Sets the window size to be the size specified (which should be lower than
164   // any previous window size specified) and flushes any data beyond
165   // this window size.
166   // It is undefined to call this function with a window size greater than than
167   // the current size.
SetWindowSizeNs(int64_t window_size_ns)168   void SetWindowSizeNs(int64_t window_size_ns) {
169     PERFETTO_DCHECK(window_size_ns <= window_size_ns_);
170 
171     PERFETTO_DLOG("Setting window size to be %" PRId64 " ns", window_size_ns);
172     window_size_ns_ = window_size_ns;
173 
174     // Fast path: if, globally, we are within the window size, then just exit.
175     if (global_max_ts_ - global_min_ts_ < window_size_ns)
176       return;
177     SortAndExtractEventsBeyondWindow(window_size_ns_);
178   }
179 
max_timestamp()180   int64_t max_timestamp() const { return global_max_ts_; }
181 
182  private:
183   static constexpr uint32_t kNoBatch = std::numeric_limits<uint32_t>::max();
184 
185   struct Queue {
AppendQueue186     inline void Append(TimestampedTracePiece ttp) {
187       const int64_t timestamp = ttp.timestamp;
188       events_.emplace_back(std::move(ttp));
189       min_ts_ = std::min(min_ts_, timestamp);
190 
191       // Events are often seen in order.
192       if (PERFETTO_LIKELY(timestamp >= max_ts_)) {
193         max_ts_ = timestamp;
194       } else {
195         // The event is breaking ordering. The first time it happens, keep
196         // track of which index we are at. We know that everything before that
197         // is sorted (because events were pushed monotonically). Everything
198         // after that index, instead, will need a sorting pass before moving
199         // events to the next pipeline stage.
200         if (sort_start_idx_ == 0) {
201           PERFETTO_DCHECK(events_.size() >= 2);
202           sort_start_idx_ = events_.size() - 1;
203           sort_min_ts_ = timestamp;
204         } else {
205           sort_min_ts_ = std::min(sort_min_ts_, timestamp);
206         }
207       }
208 
209       PERFETTO_DCHECK(min_ts_ <= max_ts_);
210     }
211 
needs_sortingQueue212     bool needs_sorting() const { return sort_start_idx_ != 0; }
213     void Sort();
214 
215     base::CircularQueue<TimestampedTracePiece> events_;
216     int64_t min_ts_ = std::numeric_limits<int64_t>::max();
217     int64_t max_ts_ = 0;
218     size_t sort_start_idx_ = 0;
219     int64_t sort_min_ts_ = std::numeric_limits<int64_t>::max();
220   };
221 
222   // This method passes any events older than window_size_ns to the
223   // parser to be parsed and then stored.
224   void SortAndExtractEventsBeyondWindow(int64_t windows_size_ns);
225 
GetQueue(size_t index)226   inline Queue* GetQueue(size_t index) {
227     if (PERFETTO_UNLIKELY(index >= queues_.size()))
228       queues_.resize(index + 1);
229     return &queues_[index];
230   }
231 
MaybeExtractEvents(Queue * queue)232   inline void MaybeExtractEvents(Queue* queue) {
233     DCHECK_ftrace_batch_cpu(kNoBatch);
234     global_max_ts_ = std::max(global_max_ts_, queue->max_ts_);
235     global_min_ts_ = std::min(global_min_ts_, queue->min_ts_);
236 
237     // Fast path: if, globally, we are within the window size, then just exit.
238     if (global_max_ts_ - global_min_ts_ < window_size_ns_)
239       return;
240     SortAndExtractEventsBeyondWindow(window_size_ns_);
241   }
242 
243   std::unique_ptr<TraceParser> parser_;
244 
245   // queues_[0] is the general (non-ftrace) queue.
246   // queues_[1] is the ftrace queue for CPU(0).
247   // queues_[x] is the ftrace queue for CPU(x - 1).
248   std::vector<Queue> queues_;
249 
250   // Events are propagated to the next stage only after (max - min) timestamp
251   // is larger than this value.
252   int64_t window_size_ns_;
253 
254   // max(e.timestamp for e in queues_).
255   int64_t global_max_ts_ = 0;
256 
257   // min(e.timestamp for e in queues_).
258   int64_t global_min_ts_ = std::numeric_limits<int64_t>::max();
259 
260   // Monotonic increasing value used to index timestamped trace pieces.
261   uint64_t packet_idx_ = 0;
262 
263   // Used for performance tests. True when setting TRACE_PROCESSOR_SORT_ONLY=1.
264   bool bypass_next_stage_for_testing_ = false;
265 
266 #if PERFETTO_DCHECK_IS_ON()
267   // Used only for DCHECK-ing that FinalizeFtraceEventBatch() is called.
268   uint32_t ftrace_batch_cpu_ = kNoBatch;
269 
DCHECK_ftrace_batch_cpu(uint32_t cpu)270   inline void DCHECK_ftrace_batch_cpu(uint32_t cpu) {
271     PERFETTO_DCHECK(ftrace_batch_cpu_ == kNoBatch || ftrace_batch_cpu_ == cpu);
272   }
273 
set_ftrace_batch_cpu_for_DCHECK(uint32_t cpu)274   inline void set_ftrace_batch_cpu_for_DCHECK(uint32_t cpu) {
275     PERFETTO_DCHECK(ftrace_batch_cpu_ == cpu || ftrace_batch_cpu_ == kNoBatch ||
276                     cpu == kNoBatch);
277     ftrace_batch_cpu_ = cpu;
278   }
279 #else
DCHECK_ftrace_batch_cpu(uint32_t)280   inline void DCHECK_ftrace_batch_cpu(uint32_t) {}
set_ftrace_batch_cpu_for_DCHECK(uint32_t)281   inline void set_ftrace_batch_cpu_for_DCHECK(uint32_t) {}
282 #endif
283 };
284 
285 }  // namespace trace_processor
286 }  // namespace perfetto
287 
288 #endif  // SRC_TRACE_PROCESSOR_TRACE_SORTER_H_
289