1 /*
2  * Copyright (C) 2018 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #ifndef SRC_TRACE_PROCESSOR_TRACE_SORTER_H_
18 #define SRC_TRACE_PROCESSOR_TRACE_SORTER_H_
19 
#include <stdint.h>

#include <algorithm>
#include <limits>
#include <memory>
#include <utility>
#include <vector>

#include "perfetto/base/circular_queue.h"
#include "perfetto/trace_processor/basic_types.h"
#include "src/trace_processor/fuchsia_provider_view.h"
#include "src/trace_processor/proto_incremental_state.h"
#include "src/trace_processor/trace_blob_view.h"
#include "src/trace_processor/trace_processor_context.h"
#include "src/trace_processor/trace_storage.h"
29 
#if PERFETTO_BUILDFLAG(PERFETTO_STANDALONE_BUILD)
#include <json/value.h>
#else
// Json traces are only supported in standalone build.
// In every other build an empty placeholder Json::Value is declared so that
// the std::unique_ptr<Json::Value> members and parameters used by
// TraceSorter below still compile (no JSON values are expected to be pushed).
namespace Json {
class Value {};
}  // namespace Json
#endif
38 
39 namespace perfetto {
40 namespace trace_processor {
41 
// This class takes care of sorting events parsed from the trace stream in
// arbitrary order and pushing them to the next pipeline stages (parsing) in
// order. In order to support streaming use-cases, sorting happens within a
// max window. Events are held in the TraceSorter staging area (the per-queue
// |events_|) until either (1) the (max - min) timestamp >= window_size; or
// (2) trace EOF.
//
// This class is designed around the assumption that:
// - Most events come from ftrace.
// - Ftrace events are sorted within each cpu most of the time.
//
// Due to this, this class operates as a streaming merge-sort of N+1 queues
// (N = num cpus, one ftrace queue per cpu, + 1 queue for non-ftrace events).
// Each queue in turn gets sorted (if necessary) before proceeding with the
// global merge-sort-extract.
// When an event is pushed through, it is just appended to the end of one of
// the N+1 queues. While appending, we keep track of whether the queue is
// still ordered or has just lost ordering. When an out-of-order event is
// detected on a queue we keep track of: (1) the offset within the queue where
// the disorder began, (2) the smallest timestamp that broke the ordering.
// When we decide to extract events from the queues into the next stages of
// the trace processor, we re-sort the events in the queue. Rather than
// re-sorting everything every time, we use the above knowledge to restrict
// sorting to the (hopefully smaller) tail of the queue's |events_|.
// At any time, the first partition of a queue's |events_|
// [0 .. sort_start_idx_) is ordered, and the second partition
// [sort_start_idx_ .. end] is not.
// We use a binary (lower-bound) search to figure out the index within the
// first partition where sorting should start, and sort all events from there
// to the end.
69 class TraceSorter {
70  public:
71   struct TimestampedTracePiece {
TimestampedTracePieceTimestampedTracePiece72     TimestampedTracePiece(int64_t ts, uint64_t idx, TraceBlobView tbv)
73         : TimestampedTracePiece(ts,
74                                 /*thread_ts=*/0,
75                                 idx,
76                                 std::move(tbv),
77                                 /*value=*/nullptr,
78                                 /*fpv=*/nullptr,
79                                 /*sequence_state=*/nullptr) {}
80 
TimestampedTracePieceTimestampedTracePiece81     TimestampedTracePiece(int64_t ts,
82                           uint64_t idx,
83                           std::unique_ptr<Json::Value> value)
84         : TimestampedTracePiece(ts,
85                                 /*thread_ts=*/0,
86                                 idx,
87                                 // TODO(dproy): Stop requiring TraceBlobView in
88                                 // TimestampedTracePiece.
89                                 TraceBlobView(nullptr, 0, 0),
90                                 std::move(value),
91                                 /*fpv=*/nullptr,
92                                 /*sequence_state=*/nullptr) {}
93 
TimestampedTracePieceTimestampedTracePiece94     TimestampedTracePiece(int64_t ts,
95                           uint64_t idx,
96                           TraceBlobView tbv,
97                           std::unique_ptr<FuchsiaProviderView> fpv)
98         : TimestampedTracePiece(ts,
99                                 /*thread_ts=*/0,
100                                 idx,
101                                 std::move(tbv),
102                                 /*value=*/nullptr,
103                                 std::move(fpv),
104                                 /*sequence_state=*/nullptr) {}
105 
TimestampedTracePieceTimestampedTracePiece106     TimestampedTracePiece(
107         int64_t ts,
108         int64_t thread_ts,
109         uint64_t idx,
110         TraceBlobView tbv,
111         ProtoIncrementalState::PacketSequenceState* sequence_state)
112         : TimestampedTracePiece(ts,
113                                 thread_ts,
114                                 idx,
115                                 std::move(tbv),
116                                 /*value=*/nullptr,
117                                 /*fpv=*/nullptr,
118                                 sequence_state) {}
119 
TimestampedTracePieceTimestampedTracePiece120     TimestampedTracePiece(
121         int64_t ts,
122         int64_t thread_ts,
123         uint64_t idx,
124         TraceBlobView tbv,
125         std::unique_ptr<Json::Value> value,
126         std::unique_ptr<FuchsiaProviderView> fpv,
127         ProtoIncrementalState::PacketSequenceState* sequence_state)
128         : json_value(std::move(value)),
129           fuchsia_provider_view(std::move(fpv)),
130           packet_sequence_state(sequence_state),
131           timestamp(ts),
132           thread_timestamp(thread_ts),
133           packet_idx_(idx),
134           blob_view(std::move(tbv)) {}
135 
136     TimestampedTracePiece(TimestampedTracePiece&&) noexcept = default;
137     TimestampedTracePiece& operator=(TimestampedTracePiece&&) = default;
138 
139     // For std::lower_bound().
CompareTimestampedTracePiece140     static inline bool Compare(const TimestampedTracePiece& x, int64_t ts) {
141       return x.timestamp < ts;
142     }
143 
144     // For std::sort().
145     inline bool operator<(const TimestampedTracePiece& o) const {
146       return timestamp < o.timestamp ||
147              (timestamp == o.timestamp && packet_idx_ < o.packet_idx_);
148     }
149 
150     std::unique_ptr<Json::Value> json_value;
151     std::unique_ptr<FuchsiaProviderView> fuchsia_provider_view;
152     ProtoIncrementalState::PacketSequenceState* packet_sequence_state;
153 
154     int64_t timestamp;
155     int64_t thread_timestamp;
156     uint64_t packet_idx_;
157     TraceBlobView blob_view;
158   };
159 
160   TraceSorter(TraceProcessorContext*, int64_t window_size_ns);
161 
PushTracePacket(int64_t timestamp,TraceBlobView packet)162   inline void PushTracePacket(int64_t timestamp, TraceBlobView packet) {
163     DCHECK_ftrace_batch_cpu(kNoBatch);
164     auto* queue = GetQueue(0);
165     queue->Append(
166         TimestampedTracePiece(timestamp, packet_idx_++, std::move(packet)));
167     MaybeExtractEvents(queue);
168   }
169 
PushJsonValue(int64_t timestamp,std::unique_ptr<Json::Value> json_value)170   inline void PushJsonValue(int64_t timestamp,
171                             std::unique_ptr<Json::Value> json_value) {
172     auto* queue = GetQueue(0);
173     queue->Append(
174         TimestampedTracePiece(timestamp, packet_idx_++, std::move(json_value)));
175     MaybeExtractEvents(queue);
176   }
177 
PushFuchsiaRecord(int64_t timestamp,TraceBlobView record,std::unique_ptr<FuchsiaProviderView> provider_view)178   inline void PushFuchsiaRecord(
179       int64_t timestamp,
180       TraceBlobView record,
181       std::unique_ptr<FuchsiaProviderView> provider_view) {
182     DCHECK_ftrace_batch_cpu(kNoBatch);
183     auto* queue = GetQueue(0);
184     queue->Append(TimestampedTracePiece(
185         timestamp, packet_idx_++, std::move(record), std::move(provider_view)));
186     MaybeExtractEvents(queue);
187   }
188 
PushFtraceEvent(uint32_t cpu,int64_t timestamp,TraceBlobView event)189   inline void PushFtraceEvent(uint32_t cpu,
190                               int64_t timestamp,
191                               TraceBlobView event) {
192     set_ftrace_batch_cpu_for_DCHECK(cpu);
193     GetQueue(cpu + 1)->Append(
194         TimestampedTracePiece(timestamp, packet_idx_++, std::move(event)));
195 
196     // The caller must call FinalizeFtraceEventBatch() after having pushed a
197     // batch of ftrace events. This is to amortize the overhead of handling
198     // global ordering and doing that in batches only after all ftrace events
199     // for a bundle are pushed.
200   }
201 
PushTrackEventPacket(int64_t timestamp,int64_t thread_time,ProtoIncrementalState::PacketSequenceState * state,TraceBlobView packet)202   inline void PushTrackEventPacket(
203       int64_t timestamp,
204       int64_t thread_time,
205       ProtoIncrementalState::PacketSequenceState* state,
206       TraceBlobView packet) {
207     auto* queue = GetQueue(0);
208     queue->Append(TimestampedTracePiece(timestamp, thread_time, packet_idx_++,
209                                         std::move(packet), state));
210     MaybeExtractEvents(queue);
211   }
212 
FinalizeFtraceEventBatch(uint32_t cpu)213   inline void FinalizeFtraceEventBatch(uint32_t cpu) {
214     DCHECK_ftrace_batch_cpu(cpu);
215     set_ftrace_batch_cpu_for_DCHECK(kNoBatch);
216     MaybeExtractEvents(GetQueue(cpu + 1));
217   }
218 
219   // Extract all events ignoring the window.
ExtractEventsForced()220   void ExtractEventsForced() {
221     SortAndExtractEventsBeyondWindow(/*window_size_ns=*/0);
222   }
223 
set_window_ns_for_testing(int64_t window_size_ns)224   void set_window_ns_for_testing(int64_t window_size_ns) {
225     window_size_ns_ = window_size_ns;
226   }
227 
228  private:
229   static constexpr uint32_t kNoBatch = std::numeric_limits<uint32_t>::max();
230 
231   struct Queue {
AppendQueue232     inline void Append(TimestampedTracePiece ttp) {
233       const int64_t timestamp = ttp.timestamp;
234       events_.emplace_back(std::move(ttp));
235       min_ts_ = std::min(min_ts_, timestamp);
236 
237       // Events are often seen in order.
238       if (PERFETTO_LIKELY(timestamp >= max_ts_)) {
239         max_ts_ = timestamp;
240       } else {
241         // The event is breaking ordering. The first time it happens, keep
242         // track of which index we are at. We know that everything before that
243         // is sorted (because events were pushed monotonically). Everything
244         // after that index, instead, will need a sorting pass before moving
245         // events to the next pipeline stage.
246         if (sort_start_idx_ == 0) {
247           PERFETTO_DCHECK(events_.size() >= 2);
248           sort_start_idx_ = events_.size() - 1;
249           sort_min_ts_ = timestamp;
250         } else {
251           sort_min_ts_ = std::min(sort_min_ts_, timestamp);
252         }
253       }
254 
255       PERFETTO_DCHECK(min_ts_ <= max_ts_);
256     }
257 
needs_sortingQueue258     bool needs_sorting() const { return sort_start_idx_ != 0; }
259     void Sort();
260 
261     base::CircularQueue<TimestampedTracePiece> events_;
262     int64_t min_ts_ = std::numeric_limits<int64_t>::max();
263     int64_t max_ts_ = 0;
264     size_t sort_start_idx_ = 0;
265     int64_t sort_min_ts_ = std::numeric_limits<int64_t>::max();
266   };
267 
268   // This method passes any events older than window_size_ns to the
269   // parser to be parsed and then stored.
270   void SortAndExtractEventsBeyondWindow(int64_t windows_size_ns);
271 
GetQueue(size_t index)272   inline Queue* GetQueue(size_t index) {
273     if (PERFETTO_UNLIKELY(index >= queues_.size()))
274       queues_.resize(index + 1);
275     return &queues_[index];
276   }
277 
MaybeExtractEvents(Queue * queue)278   inline void MaybeExtractEvents(Queue* queue) {
279     DCHECK_ftrace_batch_cpu(kNoBatch);
280     global_max_ts_ = std::max(global_max_ts_, queue->max_ts_);
281     global_min_ts_ = std::min(global_min_ts_, queue->min_ts_);
282 
283     if (global_max_ts_ - global_min_ts_ < window_size_ns_)
284       return;
285 
286     SortAndExtractEventsBeyondWindow(window_size_ns_);
287   }
288 
289   TraceProcessorContext* const context_;
290 
291   // queues_[0] is the general (non-ftrace) queue.
292   // queues_[1] is the ftrace queue for CPU(0).
293   // queues_[x] is the ftrace queue for CPU(x - 1).
294   std::vector<Queue> queues_;
295 
296   // Events are propagated to the next stage only after (max - min) timestamp
297   // is larger than this value.
298   int64_t window_size_ns_;
299 
300   // max(e.timestamp for e in queues_).
301   int64_t global_max_ts_ = 0;
302 
303   // min(e.timestamp for e in queues_).
304   int64_t global_min_ts_ = std::numeric_limits<int64_t>::max();
305 
306   // Monotonic increasing value used to index timestamped trace pieces.
307   uint64_t packet_idx_ = 0;
308 
309   // Used for performance tests. True when setting TRACE_PROCESSOR_SORT_ONLY=1.
310   bool bypass_next_stage_for_testing_ = false;
311 
312 #if PERFETTO_DCHECK_IS_ON()
313   // Used only for DCHECK-ing that FinalizeFtraceEventBatch() is called.
314   uint32_t ftrace_batch_cpu_ = kNoBatch;
315 
DCHECK_ftrace_batch_cpu(uint32_t cpu)316   inline void DCHECK_ftrace_batch_cpu(uint32_t cpu) {
317     PERFETTO_DCHECK(ftrace_batch_cpu_ == kNoBatch || ftrace_batch_cpu_ == cpu);
318   }
319 
set_ftrace_batch_cpu_for_DCHECK(uint32_t cpu)320   inline void set_ftrace_batch_cpu_for_DCHECK(uint32_t cpu) {
321     PERFETTO_DCHECK(ftrace_batch_cpu_ == cpu || ftrace_batch_cpu_ == kNoBatch ||
322                     cpu == kNoBatch);
323     ftrace_batch_cpu_ = cpu;
324   }
325 #else
DCHECK_ftrace_batch_cpu(uint32_t)326   inline void DCHECK_ftrace_batch_cpu(uint32_t) {}
set_ftrace_batch_cpu_for_DCHECK(uint32_t)327   inline void set_ftrace_batch_cpu_for_DCHECK(uint32_t) {}
328 #endif
329 };
330 
331 }  // namespace trace_processor
332 }  // namespace perfetto
333 
334 #endif  // SRC_TRACE_PROCESSOR_TRACE_SORTER_H_
335