1 /*
2  * Copyright (C) 2018 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #ifndef SRC_TRACE_PROCESSOR_SPAN_JOIN_OPERATOR_TABLE_H_
18 #define SRC_TRACE_PROCESSOR_SPAN_JOIN_OPERATOR_TABLE_H_
19 
20 #include <sqlite3.h>
21 #include <array>
22 #include <deque>
23 #include <limits>
24 #include <map>
25 #include <memory>
26 #include <string>
27 #include <unordered_map>
28 #include <vector>
29 
30 #include "src/trace_processor/scoped_db.h"
31 #include "src/trace_processor/table.h"
32 
33 namespace perfetto {
34 namespace trace_processor {
35 
36 // Implements the SPAN JOIN operation between two tables on a particular column.
37 //
38 // Span:
// A span is a row with a timestamp and a duration. It is used to model
// operations which run for a particular *span* of time.
41 //
42 // We draw spans like so (time on the x-axis):
// start of span->[ time where operation is running ]<- end of span
44 //
45 // Multiple spans can happen in parallel:
46 // [      ]
47 //    [        ]
48 //   [                    ]
49 //  [ ]
50 //
// The above, for example, models scheduling activity on a 4-core computer for
// a short period of time.
53 //
54 // Span join:
55 // The span join operation can be thought of as the intersection of span tables.
56 // That is, the join table has a span for each pair of spans in the child tables
57 // where the spans overlap. Because many spans are possible in parallel, an
58 // extra metadata column (labelled the "join column") is used to distinguish
59 // between the spanned tables.
60 //
61 // For a given join key suppose these were the two span tables:
62 // Table 1:   [        ]              [      ]         [ ]
63 // Table 2:          [      ]            [  ]           [      ]
64 // Output :          [ ]                 [  ]           []
65 //
66 // All other columns apart from timestamp (ts), duration (dur) and the join key
67 // are passed through unchanged.
68 class SpanJoinOperatorTable : public Table {
69  public:
70   // Columns of the span operator table.
71   enum Column {
72     kTimestamp = 0,
73     kDuration = 1,
74     kPartition = 2,
75     // All other columns are dynamic depending on the joined tables.
76   };
77 
78   enum class PartitioningType {
79     kNoPartitioning = 0,
80     kSamePartitioning = 1,
81     kMixedPartitioning = 2
82   };
83 
84   // Parsed version of a table descriptor.
85   struct TableDescriptor {
86     static base::Optional<TableDescriptor> Parse(
87         const std::string& raw_descriptor);
88 
IsPartitionedTableDescriptor89     bool IsPartitioned() const { return !partition_col.empty(); }
90 
91     std::string name;
92     std::string partition_col;
93   };
94 
95   // Contains the definition of the child tables.
96   class TableDefinition {
97    public:
98     TableDefinition() = default;
99 
100     TableDefinition(std::string name,
101                     std::string partition_col,
102                     std::vector<Table::Column> cols,
103                     bool emit_shadow_slices,
104                     uint32_t ts_idx,
105                     uint32_t dur_idx,
106                     uint32_t partition_idx);
107 
name()108     const std::string& name() const { return name_; }
partition_col()109     const std::string& partition_col() const { return partition_col_; }
columns()110     const std::vector<Table::Column>& columns() const { return cols_; }
111 
emit_shadow_slices()112     bool emit_shadow_slices() const { return emit_shadow_slices_; }
ts_idx()113     uint32_t ts_idx() const { return ts_idx_; }
dur_idx()114     uint32_t dur_idx() const { return dur_idx_; }
partition_idx()115     uint32_t partition_idx() const { return partition_idx_; }
116 
IsPartitioned()117     bool IsPartitioned() const { return !partition_col_.empty(); }
118 
119    private:
120     std::string name_;
121     std::string partition_col_;
122     std::vector<Table::Column> cols_;
123     bool emit_shadow_slices_;
124     uint32_t ts_idx_ = std::numeric_limits<uint32_t>::max();
125     uint32_t dur_idx_ = std::numeric_limits<uint32_t>::max();
126     uint32_t partition_idx_ = std::numeric_limits<uint32_t>::max();
127   };
128 
129   class Query {
130    public:
131     struct StepRet {
132       enum Code {
133         kRow,
134         kEof,
135         kError,
136       };
137 
codeStepRet138       StepRet(Code c, int e = SQLITE_OK) : code(c), err_code(e) {}
139 
is_rowStepRet140       bool is_row() const { return code == Code::kRow; }
is_eofStepRet141       bool is_eof() const { return code == Code::kEof; }
is_errStepRet142       bool is_err() const { return code == Code::kError; }
143 
144       Code code = Code::kEof;
145       int err_code = SQLITE_OK;
146     };
147 
148     Query(SpanJoinOperatorTable*, const TableDefinition*, sqlite3* db);
149     virtual ~Query();
150 
151     Query(Query&) = delete;
152     Query& operator=(const Query&) = delete;
153 
154     Query(Query&&) noexcept = default;
155     Query& operator=(Query&&) = default;
156 
157     int Initialize(const QueryConstraints& qc, sqlite3_value** argv);
158 
159     StepRet Step();
160     StepRet StepToNextPartition();
161     StepRet StepToPartition(int64_t target_partition);
162     StepRet StepUntil(int64_t timestamp);
163 
164     void ReportSqliteResult(sqlite3_context* context, size_t index);
165 
ts_start()166     int64_t ts_start() const { return ts_start_; }
ts_end()167     int64_t ts_end() const { return ts_end_; }
partition()168     int64_t partition() const { return partition_; }
169 
definition()170     const TableDefinition* definition() const { return defn_; }
171 
Eof()172     bool Eof() const { return cursor_eof_ && mode_ == Mode::kRealSlice; }
IsPartitioned()173     bool IsPartitioned() const { return defn_->IsPartitioned(); }
IsRealSlice()174     bool IsRealSlice() const { return mode_ == Mode::kRealSlice; }
175 
IsFullPartitionShadowSlice()176     bool IsFullPartitionShadowSlice() const {
177       return mode_ == Mode::kShadowSlice && ts_start_ == 0 &&
178              ts_end_ == std::numeric_limits<int64_t>::max();
179     }
180 
CursorPartition()181     int64_t CursorPartition() const {
182       PERFETTO_DCHECK(defn_->IsPartitioned());
183       auto partition_idx = static_cast<int>(defn_->partition_idx());
184       return sqlite3_column_int64(stmt_.get(), partition_idx);
185     }
186 
187    private:
188     enum Mode {
189       kRealSlice,
190       kShadowSlice,
191     };
192 
193     int PrepareRawStmt();
194     std::string CreateSqlQuery(const std::vector<std::string>& cs) const;
195 
CursorTs()196     int64_t CursorTs() const {
197       auto ts_idx = static_cast<int>(defn_->ts_idx());
198       return sqlite3_column_int64(stmt_.get(), ts_idx);
199     }
200 
CursorDur()201     int64_t CursorDur() const {
202       auto dur_idx = static_cast<int>(defn_->dur_idx());
203       return sqlite3_column_int64(stmt_.get(), dur_idx);
204     }
205 
206     std::string sql_query_;
207     ScopedStmt stmt_;
208 
209     int64_t ts_start_ = 0;
210     int64_t ts_end_ = 0;
211     int64_t partition_ = std::numeric_limits<int64_t>::lowest();
212 
213     bool cursor_eof_ = false;
214     Mode mode_ = Mode::kRealSlice;
215 
216     const TableDefinition* defn_ = nullptr;
217     sqlite3* db_ = nullptr;
218     SpanJoinOperatorTable* table_ = nullptr;
219   };
220 
221   // Base class for a cursor on the span table.
222   class Cursor : public Table::Cursor {
223    public:
224     Cursor(SpanJoinOperatorTable*, sqlite3* db);
225     ~Cursor() override = default;
226 
227     int Filter(const QueryConstraints& qc, sqlite3_value** argv) override;
228     int Column(sqlite3_context* context, int N) override;
229     int Next() override;
230     int Eof() override;
231 
232    private:
233     Cursor(Cursor&) = delete;
234     Cursor& operator=(const Cursor&) = delete;
235 
236     Cursor(Cursor&&) noexcept = default;
237     Cursor& operator=(Cursor&&) = default;
238 
239     bool IsOverlappingSpan();
240     Query::StepRet StepUntilRealSlice();
241 
242     Query t1_;
243     Query t2_;
244     Query* next_stepped_ = nullptr;
245 
246     SpanJoinOperatorTable* table_;
247   };
248 
249   SpanJoinOperatorTable(sqlite3*, const TraceStorage*);
250 
251   static void RegisterTable(sqlite3* db, const TraceStorage* storage);
252 
253   // Table implementation.
254   base::Optional<Table::Schema> Init(int, const char* const*) override;
255   std::unique_ptr<Table::Cursor> CreateCursor() override;
256   int BestIndex(const QueryConstraints& qc, BestIndexInfo* info) override;
257 
258  private:
259   // Identifier for a column by index in a given table.
260   struct ColumnLocator {
261     const TableDefinition* defn;
262     size_t col_index;
263   };
264 
IsLeftJoin()265   bool IsLeftJoin() const { return name() == "span_left_join"; }
IsOuterJoin()266   bool IsOuterJoin() const { return name() == "span_outer_join"; }
267 
partition_col()268   const std::string& partition_col() const {
269     return t1_defn_.IsPartitioned() ? t1_defn_.partition_col()
270                                     : t2_defn_.partition_col();
271   }
272 
273   base::Optional<TableDefinition> CreateTableDefinition(
274       const TableDescriptor& desc,
275       bool emit_shadow_slices);
276 
277   std::vector<std::string> ComputeSqlConstraintsForDefinition(
278       const TableDefinition& defn,
279       const QueryConstraints& qc,
280       sqlite3_value** argv);
281 
282   std::string GetNameForGlobalColumnIndex(const TableDefinition& defn,
283                                           int global_column);
284 
285   void CreateSchemaColsForDefn(const TableDefinition& defn,
286                                std::vector<Table::Column>* cols);
287 
288   TableDefinition t1_defn_;
289   TableDefinition t2_defn_;
290   PartitioningType partitioning_;
291   std::unordered_map<size_t, ColumnLocator> global_index_to_column_locator_;
292 
293   sqlite3* const db_;
294 };
295 
296 }  // namespace trace_processor
297 }  // namespace perfetto
298 
299 #endif  // SRC_TRACE_PROCESSOR_SPAN_JOIN_OPERATOR_TABLE_H_
300