/*
 * Copyright (C) 2020 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "src/trace_processor/dynamic/thread_state_generator.h"

#include <algorithm>
#include <limits>
#include <memory>
#include <set>
#include <unordered_map>

#include "src/trace_processor/types/trace_processor_context.h"

namespace perfetto {
namespace trace_processor {

ThreadStateGenerator::ThreadStateGenerator(TraceProcessorContext* context)
    : running_string_id_(context->storage->InternString("Running")),
      runnable_string_id_(context->storage->InternString("R")),
      context_(context) {}

ThreadStateGenerator::~ThreadStateGenerator() = default;

util::Status ThreadStateGenerator::ValidateConstraints(
    const QueryConstraints&) {
  return util::OkStatus();
}

std::unique_ptr<Table> ThreadStateGenerator::ComputeTable(
    const std::vector<Constraint>&,
    const std::vector<Order>&) {
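  // Compute the table lazily: the first call builds and caches the ts-sorted
  // table; later calls just return a copy of the cached table.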
  if (!unsorted_thread_state_table_) {
    int64_t trace_end_ts =
        context_->storage->GetTraceTimestampBoundsNs().second;

    unsorted_thread_state_table_ = ComputeThreadStateTable(trace_end_ts);

    // We explicitly sort by ts here as ComputeThreadStateTable does not insert
    // rows in sorted order but we expect our clients to always want to sort
    // on ts. Writing ComputeThreadStateTable to insert in sorted order is
    // more trouble than it's worth.
    sorted_thread_state_table_ = unsorted_thread_state_table_->Sort(
        {unsorted_thread_state_table_->ts().ascending()});
  }
  PERFETTO_CHECK(sorted_thread_state_table_);
  return std::unique_ptr<Table>(new Table(sorted_thread_state_table_->Copy()));
}

std::unique_ptr<tables::ThreadStateTable>
ThreadStateGenerator::ComputeThreadStateTable(int64_t trace_end_ts) {
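  // High-level approach: merge the sched slice, waking and blocked reason
  // events in timestamp order, tracking per-thread pending state and emitting
  // a row whenever a thread's state changes.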
  std::unique_ptr<tables::ThreadStateTable> table(new tables::ThreadStateTable(
      context_->storage->mutable_string_pool(), nullptr));

  const auto& raw_sched = context_->storage->sched_slice_table();
  const auto& instants = context_->storage->instant_table();

  // In both tables, exclude utid == 0 which represents the idle thread.
  Table sched = raw_sched.Filter({raw_sched.utid().ne(0)});
  Table waking = instants.Filter(
      {instants.name().eq("sched_waking"), instants.ref().ne(0)});

  // We prefer to use sched_waking events wherever possible and fall back to
  // sched_wakeup events if they are not available.
  if (waking.row_count() == 0) {
    waking = instants.Filter(
        {instants.name().eq("sched_wakeup"), instants.ref().ne(0)});
  }

  Table sched_blocked_reason = instants.Filter(
      {instants.name().eq("sched_blocked_reason"), instants.ref().ne(0)});

  const auto& sched_ts_col = sched.GetTypedColumnByName<int64_t>("ts");
  const auto& waking_ts_col = waking.GetTypedColumnByName<int64_t>("ts");
  const auto& blocked_ts_col =
      sched_blocked_reason.GetTypedColumnByName<int64_t>("ts");

  uint32_t sched_idx = 0;
  uint32_t waking_idx = 0;
  uint32_t blocked_idx = 0;
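  // Per-thread pending state (desched ts and end state, runnable ts, blocked
  // reason) accumulated while merging the three tables below.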
  std::unordered_map<UniqueTid, ThreadSchedInfo> state_map;
  while (sched_idx < sched.row_count() || waking_idx < waking.row_count() ||
         blocked_idx < sched_blocked_reason.row_count()) {
    int64_t sched_ts = sched_idx < sched.row_count()
                           ? sched_ts_col[sched_idx]
                           : std::numeric_limits<int64_t>::max();
    int64_t waking_ts = waking_idx < waking.row_count()
                            ? waking_ts_col[waking_idx]
                            : std::numeric_limits<int64_t>::max();
    int64_t blocked_ts = blocked_idx < sched_blocked_reason.row_count()
                             ? blocked_ts_col[blocked_idx]
                             : std::numeric_limits<int64_t>::max();

    // Look at all three tables and process whichever event has the earliest
    // timestamp next.
    int64_t min_ts = std::min({sched_ts, waking_ts, blocked_ts});
    if (min_ts == sched_ts) {
      AddSchedEvent(sched, sched_idx++, state_map, trace_end_ts, table.get());
    } else if (min_ts == waking_ts) {
      AddWakingEvent(waking, waking_idx++, state_map);
    } else /* (min_ts == blocked_ts) */ {
      AddBlockedReasonEvent(sched_blocked_reason, blocked_idx++, state_map);
    }
  }

  // At the end, go through and flush any remaining pending events.
  for (const auto& utid_to_pending_info : state_map) {
    UniqueTid utid = utid_to_pending_info.first;
    const ThreadSchedInfo& pending_info = utid_to_pending_info.second;
    FlushPendingEventsForThread(utid, pending_info, table.get(), base::nullopt);
  }

  return table;
}

void ThreadStateGenerator::AddSchedEvent(
    const Table& sched,
    uint32_t sched_idx,
    std::unordered_map<UniqueTid, ThreadSchedInfo>& state_map,
    int64_t trace_end_ts,
    tables::ThreadStateTable* table) {
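  // Each sched slice becomes a "Running" row; we also record when the thread
  // was descheduled (and with what end state) so that the following slice can
  // be emitted when we next see an event for this thread.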
  int64_t ts = sched.GetTypedColumnByName<int64_t>("ts")[sched_idx];
  UniqueTid utid = sched.GetTypedColumnByName<uint32_t>("utid")[sched_idx];
  ThreadSchedInfo* info = &state_map[utid];

  // Due to races in the kernel, it is possible for the same thread to be
  // scheduled on different CPUs at the same time. This manifests itself here
  // as |info->desched_ts| being later than the start of this scheduling slice
  // (i.e. there was a scheduling slice in the past which ended after the start
  // of the current scheduling slice).
  //
  // We work around this problem by truncating the previous slice to the start
  // of this slice and not adding the descheduled slice (i.e. we don't call
  // |FlushPendingEventsForThread| which adds this slice).
  //
  // See b/186509316 for details and an example of when this happens.
  if (info->desched_ts && info->desched_ts.value() > ts) {
    uint32_t prev_sched_row = info->scheduled_row.value();
    int64_t prev_sched_start = table->ts()[prev_sched_row];

    // Double check that the descheduled slice would have started at the same
    // time the previous scheduling slice ended.
    PERFETTO_DCHECK(prev_sched_start + table->dur()[prev_sched_row] ==
                    info->desched_ts.value());

    // Truncate the duration of the old slice to end at the start of this
    // scheduling slice.
    table->mutable_dur()->Set(prev_sched_row, ts - prev_sched_start);
  } else {
    FlushPendingEventsForThread(utid, *info, table, ts);
  }

  // Reset so we don't have any leftover data on the next round.
  *info = {};

  // Undo the expansion of the final sched slice for each CPU to the end of the
  // trace by setting the duration back to -1. This counteracts the code in
  // SchedEventTracker::FlushPendingEvents.
  // TODO(lalitm): remove this hack when we stop expanding the last slice to
  // the end of the trace.
  int64_t dur = sched.GetTypedColumnByName<int64_t>("dur")[sched_idx];
  if (ts + dur == trace_end_ts) {
    dur = -1;
  }

  // Now add the sched slice itself as "Running" with the other fields
  // unchanged.
  tables::ThreadStateTable::Row sched_row;
  sched_row.ts = ts;
  sched_row.dur = dur;
  sched_row.cpu = sched.GetTypedColumnByName<uint32_t>("cpu")[sched_idx];
  sched_row.state = running_string_id_;
  sched_row.utid = utid;

  auto id_and_row = table->Insert(sched_row);

  // If the sched row had a negative duration, don't add any descheduled slice
  // because it would be meaningless.
  if (sched_row.dur == -1) {
    return;
  }

  // This will be flushed to the table on the next sched slice (or at the very
  // end of the big loop).
  info->desched_ts = ts + dur;
  info->desched_end_state =
      sched.GetTypedColumnByName<StringId>("end_state")[sched_idx];
  info->scheduled_row = id_and_row.row;
}

void ThreadStateGenerator::AddWakingEvent(
    const Table& waking,
    uint32_t waking_idx,
    std::unordered_map<UniqueTid, ThreadSchedInfo>& state_map) {
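  // A waking event marks the point at which a thread became runnable; the
  // corresponding "R" row is emitted later by FlushPendingEventsForThread.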
  int64_t ts = waking.GetTypedColumnByName<int64_t>("ts")[waking_idx];
  UniqueTid utid = static_cast<UniqueTid>(
      waking.GetTypedColumnByName<int64_t>("ref")[waking_idx]);
  ThreadSchedInfo* info = &state_map[utid];

  // Occasionally, it is possible to get a waking event for a thread
  // which is already in a runnable state. When this happens, we just
  // ignore the waking event.
  // See b/186509316 for details and an example of when this happens.
  if (info->desched_end_state &&
      *info->desched_end_state == runnable_string_id_) {
    return;
  }

  // As counter-intuitive as it seems, occasionally we can get a waking
  // event for a thread which is currently running.
  //
  // There are two cases when this can happen:
  // 1. The kernel legitimately sends a waking event for a "running" thread
  //    because the thread was woken up before the kernel switched away
  //    from it. In this case, the waking timestamp will be in the past
  //    because we added the descheduled slice when we processed the sched
  //    event.
  // 2. We're close to the end of the trace or suffered data loss: we missed
  //    the switch-out event for a thread but still see a waking event for it.

  // Case 1 described above. In this situation, we should drop the waking
  // event entirely.
  if (info->desched_ts && *info->desched_ts > ts) {
    return;
  }

  // For case 2 and otherwise, we should just note the fact that the thread
  // became runnable at this time. Note that we cannot check that runnable is
  // not already set because data loss can lead to back-to-back waking events
  // for a single thread.
  info->runnable_ts = ts;
}

Table::Schema ThreadStateGenerator::CreateSchema() {
  auto schema = tables::ThreadStateTable::Schema();

  // Because we expect our users to generally want results ordered by ts, we
  // set the ordering of the schema to match the forced sort pass in
  // ComputeTable.
  auto ts_it = std::find_if(
      schema.columns.begin(), schema.columns.end(),
      [](const Table::Schema::Column& col) { return col.name == "ts"; });
  ts_it->is_sorted = true;
  auto id_it = std::find_if(
      schema.columns.begin(), schema.columns.end(),
      [](const Table::Schema::Column& col) { return col.name == "id"; });
  id_it->is_sorted = false;

  return schema;
}

void ThreadStateGenerator::FlushPendingEventsForThread(
    UniqueTid utid,
    const ThreadSchedInfo& info,
    tables::ThreadStateTable* table,
    base::Optional<int64_t> end_ts) {
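  // |end_ts| is the timestamp at which the thread was scheduled again (i.e.
  // the start of the next sched slice) or base::nullopt when flushing at the
  // very end of the trace; in the latter case durations are left as -1.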
  // First, let's flush the descheduled period (if any) to the table.
  if (info.desched_ts) {
    PERFETTO_DCHECK(info.desched_end_state);

    int64_t dur;
    if (end_ts) {
      int64_t desched_end_ts = info.runnable_ts ? *info.runnable_ts : *end_ts;
      dur = desched_end_ts - *info.desched_ts;
    } else {
      dur = -1;
    }

    tables::ThreadStateTable::Row row;
    row.ts = *info.desched_ts;
    row.dur = dur;
    row.state = *info.desched_end_state;
    row.utid = utid;
    row.io_wait = info.io_wait;
    row.blocked_function = info.blocked_function;
    table->Insert(row);
  }

  // Next, flush the runnable period (if any) to the table.
  if (info.runnable_ts) {
    tables::ThreadStateTable::Row row;
    row.ts = *info.runnable_ts;
    row.dur = end_ts ? *end_ts - row.ts : -1;
    row.state = runnable_string_id_;
    row.utid = utid;
    table->Insert(row);
  }
}

void ThreadStateGenerator::AddBlockedReasonEvent(
    const Table& blocked_reason,
    uint32_t blocked_idx,
    std::unordered_map<UniqueTid, ThreadSchedInfo>& state_map) {
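  // Extract the io_wait flag and blocked function from the event's args and
  // stash them on the thread's pending state; they are emitted together with
  // the next descheduled row in FlushPendingEventsForThread.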
  const auto& utid_col = blocked_reason.GetTypedColumnByName<int64_t>("ref");
  const auto& arg_set_id_col =
      blocked_reason.GetTypedColumnByName<uint32_t>("arg_set_id");

  UniqueTid utid = static_cast<UniqueTid>(utid_col[blocked_idx]);
  uint32_t arg_set_id = arg_set_id_col[blocked_idx];
  ThreadSchedInfo& info = state_map[utid];

  base::Optional<Variadic> opt_value;
  util::Status status =
      context_->storage->ExtractArg(arg_set_id, "io_wait", &opt_value);

  // We can't do anything better than ignoring any errors here.
  // TODO(lalitm): see if there's a better way to handle this.
  if (status.ok() && opt_value) {
    PERFETTO_CHECK(opt_value->type == Variadic::Type::kBool);
    info.io_wait = opt_value->bool_value;
  }

  status = context_->storage->ExtractArg(arg_set_id, "function", &opt_value);
  if (status.ok() && opt_value) {
    PERFETTO_CHECK(opt_value->type == Variadic::Type::kString);
    info.blocked_function = opt_value->string_value;
  }
}

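// A sketch of how the generated table might be queried from trace processor
// SQL. The column names follow the rows inserted above; the query itself is
// illustrative only:
//
//   SELECT ts, dur, cpu, state, io_wait, blocked_function
//   FROM thread_state
//   WHERE utid = 10
//   ORDER BY ts;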
std::string ThreadStateGenerator::TableName() {
  return "thread_state";
}

uint32_t ThreadStateGenerator::EstimateRowCount() {
  return context_->storage->sched_slice_table().row_count();
}

}  // namespace trace_processor
}  // namespace perfetto