1 /*
2  * Copyright (C) 2018 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "src/trace_processor/span_join_operator_table.h"
18 
19 #include <sqlite3.h>
20 #include <string.h>
21 #include <algorithm>
22 #include <set>
23 #include <utility>
24 
25 #include "perfetto/base/logging.h"
26 #include "perfetto/base/string_splitter.h"
27 #include "perfetto/base/string_utils.h"
28 #include "perfetto/base/string_view.h"
29 #include "src/trace_processor/sqlite_utils.h"
30 
31 namespace perfetto {
32 namespace trace_processor {
33 
34 namespace {
35 
36 constexpr char kTsColumnName[] = "ts";
37 constexpr char kDurColumnName[] = "dur";
38 
IsRequiredColumn(const std::string & name)39 bool IsRequiredColumn(const std::string& name) {
40   return name == kTsColumnName || name == kDurColumnName;
41 }
42 
HasDuplicateColumns(const std::vector<Table::Column> & cols)43 bool HasDuplicateColumns(const std::vector<Table::Column>& cols) {
44   std::set<std::string> names;
45   for (const auto& col : cols) {
46     if (names.count(col.name()) > 0) {
47       PERFETTO_ELOG("Column '%s' present in the output schema more than once.",
48                     col.name().c_str());
49       return true;
50     }
51     names.insert(col.name());
52   }
53   return false;
54 }
55 }  // namespace
56 
SpanJoinOperatorTable(sqlite3 * db,const TraceStorage *)57 SpanJoinOperatorTable::SpanJoinOperatorTable(sqlite3* db, const TraceStorage*)
58     : db_(db) {}
59 
RegisterTable(sqlite3 * db,const TraceStorage * storage)60 void SpanJoinOperatorTable::RegisterTable(sqlite3* db,
61                                           const TraceStorage* storage) {
62   Table::Register<SpanJoinOperatorTable>(db, storage, "span_join",
63                                          /* read_write */ false,
64                                          /* requires_args */ true);
65 
66   Table::Register<SpanJoinOperatorTable>(db, storage, "span_left_join",
67                                          /* read_write */ false,
68                                          /* requires_args */ true);
69 
70   Table::Register<SpanJoinOperatorTable>(db, storage, "span_outer_join",
71                                          /* read_write */ false,
72                                          /* requires_args */ true);
73 }
74 
Init(int argc,const char * const * argv)75 base::Optional<Table::Schema> SpanJoinOperatorTable::Init(
76     int argc,
77     const char* const* argv) {
78   // argv[0] - argv[2] are SQLite populated fields which are always present.
79   if (argc < 5) {
80     PERFETTO_ELOG("SPAN JOIN expected at least 2 args, received %d", argc - 3);
81     return base::nullopt;
82   }
83 
84   auto maybe_t1_desc = TableDescriptor::Parse(
85       std::string(reinterpret_cast<const char*>(argv[3])));
86   if (!maybe_t1_desc.has_value())
87     return base::nullopt;
88   auto t1_desc = *maybe_t1_desc;
89 
90   auto maybe_t2_desc = TableDescriptor::Parse(
91       std::string(reinterpret_cast<const char*>(argv[4])));
92   if (!maybe_t2_desc.has_value())
93     return base::nullopt;
94   auto t2_desc = *maybe_t2_desc;
95 
96   if (t1_desc.partition_col == t2_desc.partition_col) {
97     partitioning_ = t1_desc.IsPartitioned()
98                         ? PartitioningType::kSamePartitioning
99                         : PartitioningType::kNoPartitioning;
100     if (partitioning_ == PartitioningType::kNoPartitioning && IsOuterJoin()) {
101       PERFETTO_ELOG("Outer join not supported for no partition tables");
102       return base::nullopt;
103     }
104   } else if (t1_desc.IsPartitioned() && t2_desc.IsPartitioned()) {
105     PERFETTO_ELOG("Mismatching partitions (%s, %s)",
106                   t1_desc.partition_col.c_str(), t2_desc.partition_col.c_str());
107     return base::nullopt;
108   } else {
109     if (IsOuterJoin()) {
110       PERFETTO_ELOG("Outer join not supported for mixed partitioned tables");
111       return base::nullopt;
112     }
113     partitioning_ = PartitioningType::kMixedPartitioning;
114   }
115 
116   auto maybe_t1_defn = CreateTableDefinition(t1_desc, IsOuterJoin());
117   if (!maybe_t1_defn.has_value())
118     return base::nullopt;
119   t1_defn_ = maybe_t1_defn.value();
120 
121   auto maybe_t2_defn =
122       CreateTableDefinition(t2_desc, IsOuterJoin() || IsLeftJoin());
123   if (!maybe_t2_defn.has_value())
124     return base::nullopt;
125   t2_defn_ = maybe_t2_defn.value();
126 
127   std::vector<Table::Column> cols;
128   // Ensure the shared columns are consistently ordered and are not
129   // present twice in the final schema
130   cols.emplace_back(Column::kTimestamp, kTsColumnName, ColumnType::kLong);
131   cols.emplace_back(Column::kDuration, kDurColumnName, ColumnType::kLong);
132   if (partitioning_ != PartitioningType::kNoPartitioning)
133     cols.emplace_back(Column::kPartition, partition_col(), ColumnType::kLong);
134 
135   CreateSchemaColsForDefn(t1_defn_, &cols);
136   CreateSchemaColsForDefn(t2_defn_, &cols);
137 
138   if (HasDuplicateColumns(cols)) {
139     return base::nullopt;
140   }
141   std::vector<size_t> primary_keys = {Column::kTimestamp};
142   if (partitioning_ != PartitioningType::kNoPartitioning)
143     primary_keys.push_back(Column::kPartition);
144   return Schema(cols, primary_keys);
145 }
146 
CreateSchemaColsForDefn(const TableDefinition & defn,std::vector<Table::Column> * cols)147 void SpanJoinOperatorTable::CreateSchemaColsForDefn(
148     const TableDefinition& defn,
149     std::vector<Table::Column>* cols) {
150   for (size_t i = 0; i < defn.columns().size(); i++) {
151     const auto& n = defn.columns()[i].name();
152     if (IsRequiredColumn(n) || n == defn.partition_col())
153       continue;
154 
155     ColumnLocator* locator = &global_index_to_column_locator_[cols->size()];
156     locator->defn = &defn;
157     locator->col_index = i;
158 
159     cols->emplace_back(cols->size(), n, defn.columns()[i].type());
160   }
161 }
162 
CreateCursor()163 std::unique_ptr<Table::Cursor> SpanJoinOperatorTable::CreateCursor() {
164   return std::unique_ptr<SpanJoinOperatorTable::Cursor>(new Cursor(this, db_));
165 }
166 
// Query planning hook. Currently a no-op: both arguments are ignored, no
// information is written to the BestIndexInfo and SQLITE_OK is always
// returned.
int SpanJoinOperatorTable::BestIndex(const QueryConstraints&, BestIndexInfo*) {
  // TODO(lalitm): figure out cost estimation.
  return SQLITE_OK;
}
171 
172 std::vector<std::string>
ComputeSqlConstraintsForDefinition(const TableDefinition & defn,const QueryConstraints & qc,sqlite3_value ** argv)173 SpanJoinOperatorTable::ComputeSqlConstraintsForDefinition(
174     const TableDefinition& defn,
175     const QueryConstraints& qc,
176     sqlite3_value** argv) {
177   std::vector<std::string> constraints;
178   for (size_t i = 0; i < qc.constraints().size(); i++) {
179     const auto& cs = qc.constraints()[i];
180     auto col_name = GetNameForGlobalColumnIndex(defn, cs.iColumn);
181     if (col_name == "")
182       continue;
183 
184     if (col_name == kTsColumnName || col_name == kDurColumnName) {
185       // We don't support constraints on ts or duration in the child tables.
186       PERFETTO_DFATAL("ts or duration constraints on child tables");
187       continue;
188     }
189     auto op = sqlite_utils::OpToString(cs.op);
190     auto value = sqlite_utils::SqliteValueAsString(argv[i]);
191 
192     constraints.emplace_back("`" + col_name + "`" + op + value);
193   }
194   return constraints;
195 }
196 
197 base::Optional<SpanJoinOperatorTable::TableDefinition>
CreateTableDefinition(const TableDescriptor & desc,bool emit_shadow_slices)198 SpanJoinOperatorTable::CreateTableDefinition(const TableDescriptor& desc,
199                                              bool emit_shadow_slices) {
200   auto cols = sqlite_utils::GetColumnsForTable(db_, desc.name);
201 
202   uint32_t required_columns_found = 0;
203   uint32_t ts_idx = std::numeric_limits<uint32_t>::max();
204   uint32_t dur_idx = std::numeric_limits<uint32_t>::max();
205   uint32_t partition_idx = std::numeric_limits<uint32_t>::max();
206   for (uint32_t i = 0; i < cols.size(); i++) {
207     auto col = cols[i];
208     if (IsRequiredColumn(col.name())) {
209       ++required_columns_found;
210       if (col.type() != Table::ColumnType::kLong &&
211           col.type() != Table::ColumnType::kUnknown) {
212         PERFETTO_ELOG("Invalid column type for %s", col.name().c_str());
213         return base::nullopt;
214       }
215     }
216 
217     if (col.name() == kTsColumnName) {
218       ts_idx = i;
219     } else if (col.name() == kDurColumnName) {
220       dur_idx = i;
221     } else if (col.name() == desc.partition_col) {
222       partition_idx = i;
223     }
224   }
225   if (required_columns_found != 2) {
226     PERFETTO_ELOG("Required columns not found (found %d)",
227                   required_columns_found);
228     return base::nullopt;
229   }
230 
231   PERFETTO_DCHECK(ts_idx < cols.size());
232   PERFETTO_DCHECK(dur_idx < cols.size());
233   PERFETTO_DCHECK(desc.partition_col.empty() || partition_idx < cols.size());
234 
235   return TableDefinition(desc.name, desc.partition_col, std::move(cols),
236                          emit_shadow_slices, ts_idx, dur_idx, partition_idx);
237 }
238 
GetNameForGlobalColumnIndex(const TableDefinition & defn,int global_column)239 std::string SpanJoinOperatorTable::GetNameForGlobalColumnIndex(
240     const TableDefinition& defn,
241     int global_column) {
242   size_t col_idx = static_cast<size_t>(global_column);
243   if (col_idx == Column::kTimestamp)
244     return kTsColumnName;
245   else if (col_idx == Column::kDuration)
246     return kDurColumnName;
247   else if (col_idx == Column::kPartition &&
248            partitioning_ != PartitioningType::kNoPartitioning)
249     return defn.partition_col().c_str();
250 
251   const auto& locator = global_index_to_column_locator_[col_idx];
252   if (locator.defn != &defn)
253     return "";
254   return defn.columns()[locator.col_index].name().c_str();
255 }
256 
Cursor(SpanJoinOperatorTable * table,sqlite3 * db)257 SpanJoinOperatorTable::Cursor::Cursor(SpanJoinOperatorTable* table, sqlite3* db)
258     : Table::Cursor(table),
259       t1_(table, &table->t1_defn_, db),
260       t2_(table, &table->t2_defn_, db),
261       table_(table) {}
262 
Filter(const QueryConstraints & qc,sqlite3_value ** argv)263 int SpanJoinOperatorTable::Cursor::Filter(const QueryConstraints& qc,
264                                           sqlite3_value** argv) {
265   int err = t1_.Initialize(qc, argv);
266   if (err != SQLITE_OK)
267     return err;
268 
269   err = t2_.Initialize(qc, argv);
270   if (err != SQLITE_OK)
271     return err;
272 
273   // Step the partitioned table to allow for us to look into it below.
274   Query* step_now = t1_.IsPartitioned() ? &t1_ : &t2_;
275   next_stepped_ = step_now == &t1_ ? &t2_ : &t1_;
276 
277   auto res = step_now->Step();
278   if (PERFETTO_UNLIKELY(res.is_err()))
279     return res.err_code;
280 
281   // Forward the unpartitioned table to reflect the partition of the partitoined
282   // table.
283   if (table_->partitioning_ == PartitioningType::kMixedPartitioning) {
284     PERFETTO_DCHECK(step_now->IsPartitioned());
285 
286     // If we emit shadow slices, we need to step because the first slice will
287     // be a full partition shadow slice that we need to skip.
288     if (step_now->definition()->emit_shadow_slices()) {
289       PERFETTO_DCHECK(step_now->IsFullPartitionShadowSlice());
290       res = step_now->StepToNextPartition();
291       if (PERFETTO_UNLIKELY(res.is_err()))
292         return res.err_code;
293     }
294 
295     res = next_stepped_->StepToPartition(step_now->partition());
296     if (PERFETTO_UNLIKELY(res.is_err()))
297       return res.err_code;
298   }
299 
300   // Otherwise, find an overlapping span.
301   return Next();
302 }
303 
IsOverlappingSpan()304 bool SpanJoinOperatorTable::Cursor::IsOverlappingSpan() {
305   if (!t1_.IsRealSlice() && !t2_.IsRealSlice()) {
306     return false;
307   } else if (t1_.partition() != t2_.partition()) {
308     return false;
309   } else if (t1_.ts_end() <= t2_.ts_start() || t2_.ts_end() <= t1_.ts_start()) {
310     return false;
311   }
312   return true;
313 }
314 
Next()315 int SpanJoinOperatorTable::Cursor::Next() {
316   // TODO: Propagate error msg to the table.
317   auto res = next_stepped_->Step();
318   if (res.is_err())
319     return res.err_code;
320 
321   while (true) {
322     if (t1_.Eof() || t2_.Eof()) {
323       if (table_->partitioning_ != PartitioningType::kMixedPartitioning)
324         return SQLITE_OK;
325 
326       auto* partitioned = t1_.IsPartitioned() ? &t1_ : &t2_;
327       auto* unpartitioned = t1_.IsPartitioned() ? &t2_ : &t1_;
328       if (partitioned->Eof())
329         return SQLITE_OK;
330 
331       res = partitioned->StepToNextPartition();
332       if (PERFETTO_UNLIKELY(res.is_err()))
333         return res.err_code;
334       else if (PERFETTO_UNLIKELY(res.is_eof()))
335         continue;
336 
337       res = unpartitioned->StepToPartition(partitioned->partition());
338       if (PERFETTO_UNLIKELY(res.is_err()))
339         return res.err_code;
340       else if (PERFETTO_UNLIKELY(res.is_eof()))
341         continue;
342     }
343 
344     int64_t partition = std::max(t1_.partition(), t2_.partition());
345     res = t1_.StepToPartition(partition);
346     if (PERFETTO_UNLIKELY(res.is_err()))
347       return res.err_code;
348     else if (PERFETTO_UNLIKELY(res.is_eof()))
349       continue;
350 
351     res = t2_.StepToPartition(t1_.partition());
352     if (PERFETTO_UNLIKELY(res.is_err()))
353       return res.err_code;
354     else if (PERFETTO_UNLIKELY(res.is_eof()))
355       continue;
356 
357     if (t1_.partition() != t2_.partition())
358       continue;
359 
360     auto ts = std::max(t1_.ts_start(), t2_.ts_start());
361     res = t1_.StepUntil(ts);
362     if (PERFETTO_UNLIKELY(res.is_err()))
363       return res.err_code;
364     else if (PERFETTO_UNLIKELY(res.is_eof()))
365       continue;
366 
367     res = t2_.StepUntil(t1_.ts_start());
368     if (PERFETTO_UNLIKELY(res.is_err()))
369       return res.err_code;
370     else if (PERFETTO_UNLIKELY(res.is_eof()))
371       continue;
372 
373     // If we're in the case where we have shadow slices on both tables, try
374     // and forward the earliest table and see what happens. IsOverlappingSpan()
375     // will double check that we have at least one non-real slice now.
376     // Note: if we don't do this, we end up in an infinite loop because all
377     // the code above will not change anything because these shadow slices will
378     // be overlapping.
379     if (!t1_.IsRealSlice() && !t2_.IsRealSlice()) {
380       PERFETTO_DCHECK(t1_.partition() == t2_.partition());
381 
382       // If the table is not partitioned, partition() will return the partition
383       // the table was set to have by StepToPartition().
384       auto t1_partition =
385           t1_.IsPartitioned() ? t1_.CursorPartition() : t1_.partition();
386       auto t2_partition =
387           t2_.IsPartitioned() ? t2_.CursorPartition() : t2_.partition();
388 
389       Query* stepped;
390       if (t1_partition == t2_partition) {
391         stepped = t1_.ts_end() <= t2_.ts_end() ? &t1_ : &t2_;
392       } else {
393         stepped = t1_partition <= t2_partition ? &t1_ : &t2_;
394       }
395 
396       res = stepped->Step();
397       if (PERFETTO_UNLIKELY(res.is_err()))
398         return res.err_code;
399       else if (PERFETTO_UNLIKELY(res.is_eof()))
400         continue;
401     }
402 
403     if (IsOverlappingSpan())
404       break;
405   }
406   next_stepped_ = t1_.ts_end() <= t2_.ts_end() ? &t1_ : &t2_;
407 
408   return SQLITE_OK;
409 }
410 
Eof()411 int SpanJoinOperatorTable::Cursor::Eof() {
412   return t1_.Eof() || t2_.Eof();
413 }
414 
Column(sqlite3_context * context,int N)415 int SpanJoinOperatorTable::Cursor::Column(sqlite3_context* context, int N) {
416   PERFETTO_DCHECK(!t1_.Eof());
417   PERFETTO_DCHECK(!t2_.Eof());
418 
419   if (N == Column::kTimestamp) {
420     auto max_ts = std::max(t1_.ts_start(), t2_.ts_start());
421     sqlite3_result_int64(context, static_cast<sqlite3_int64>(max_ts));
422   } else if (N == Column::kDuration) {
423     auto max_start = std::max(t1_.ts_start(), t2_.ts_start());
424     auto min_end = std::min(t1_.ts_end(), t2_.ts_end());
425     PERFETTO_DCHECK(min_end > max_start);
426     auto dur = min_end - max_start;
427     sqlite3_result_int64(context, static_cast<sqlite3_int64>(dur));
428   } else if (N == Column::kPartition &&
429              table_->partitioning_ != PartitioningType::kNoPartitioning) {
430     PERFETTO_DCHECK(table_->partitioning_ ==
431                         PartitioningType::kMixedPartitioning ||
432                     t1_.partition() == t2_.partition());
433     auto partition = t1_.IsPartitioned() ? t1_.partition() : t2_.partition();
434     sqlite3_result_int64(context, static_cast<sqlite3_int64>(partition));
435   } else {
436     size_t index = static_cast<size_t>(N);
437     const auto& locator = table_->global_index_to_column_locator_[index];
438     if (locator.defn == t1_.definition())
439       t1_.ReportSqliteResult(context, locator.col_index);
440     else
441       t2_.ReportSqliteResult(context, locator.col_index);
442   }
443   return SQLITE_OK;
444 }
445 
Query(SpanJoinOperatorTable * table,const TableDefinition * definition,sqlite3 * db)446 SpanJoinOperatorTable::Query::Query(SpanJoinOperatorTable* table,
447                                     const TableDefinition* definition,
448                                     sqlite3* db)
449     : defn_(definition), db_(db), table_(table) {
450   PERFETTO_DCHECK(!defn_->IsPartitioned() ||
451                   defn_->partition_idx() < defn_->columns().size());
452 }
453 
454 SpanJoinOperatorTable::Query::~Query() = default;
455 
Initialize(const QueryConstraints & qc,sqlite3_value ** argv)456 int SpanJoinOperatorTable::Query::Initialize(const QueryConstraints& qc,
457                                              sqlite3_value** argv) {
458   *this = Query(table_, definition(), db_);
459   sql_query_ = CreateSqlQuery(
460       table_->ComputeSqlConstraintsForDefinition(*defn_, qc, argv));
461   return PrepareRawStmt();
462 }
463 
Step()464 SpanJoinOperatorTable::Query::StepRet SpanJoinOperatorTable::Query::Step() {
465   PERFETTO_DCHECK(!Eof());
466   sqlite3_stmt* stmt = stmt_.get();
467 
468   // In this loop, we will try and find the slice to from the cursor.
469   // Terminology: "Shadow slices" are slices which fill in the gaps between real
470   // slices from the underlying cursor in each partition (if any).
471   // For queries which don't need "shadow slices", we simply return non-zero
472   // duration slices from the underlying cursor.
473   do {
474     if (mode_ == Mode::kShadowSlice) {
475       PERFETTO_DCHECK(defn_->emit_shadow_slices());
476 
477       // If we're out of slices in the cursor, this shadow slice will be the
478       // final slice.
479       if (cursor_eof_) {
480         mode_ = Mode::kRealSlice;
481         return StepRet(StepRet::Code::kEof);
482       }
483 
484       // Look ahead to see if the cursor changes partition (if the cursor is
485       // partitioned). If so, then we need to fill the gap between the ts == 0
486       // and the start of that slice. Otherwise after this slice, we have the
487       // real slice from the cursor.
488       if (!defn_->IsPartitioned() || partition_ == CursorPartition()) {
489         mode_ = Mode::kRealSlice;
490         ts_start_ = CursorTs();
491         ts_end_ = ts_start_ + CursorDur();
492       } else if (IsFullPartitionShadowSlice()) {
493         mode_ = Mode::kShadowSlice;
494         ts_start_ = 0;
495         ts_end_ = CursorTs();
496         partition_ = CursorPartition();
497       } else {
498         mode_ = Mode::kShadowSlice;
499         ts_start_ = 0;
500         ts_end_ = std::numeric_limits<int64_t>::max();
501       }
502       continue;
503     }
504 
505     int res;
506     if (defn_->IsPartitioned()) {
507       auto partition_idx = static_cast<int>(defn_->partition_idx());
508       // Fastforward through any rows with null partition keys.
509       int row_type;
510       do {
511         res = sqlite3_step(stmt);
512         row_type = sqlite3_column_type(stmt, partition_idx);
513       } while (res == SQLITE_ROW && row_type == SQLITE_NULL);
514     } else {
515       res = sqlite3_step(stmt);
516     }
517 
518     if (res == SQLITE_ROW) {
519       // After every row, there will be a shadow slice so emit that if we need
520       // to do so. Otherwise, just emit the underlying slice.
521       if (defn_->emit_shadow_slices()) {
522         mode_ = Mode::kShadowSlice;
523         ts_start_ = ts_end_;
524         ts_end_ = !defn_->IsPartitioned() || partition_ == CursorPartition()
525                       ? CursorTs()
526                       : std::numeric_limits<int64_t>::max();
527       } else {
528         mode_ = Mode::kRealSlice;
529         ts_start_ = CursorTs();
530         ts_end_ = ts_start_ + CursorDur();
531         if (defn_->IsPartitioned())
532           partition_ = CursorPartition();
533       }
534     } else if (res == SQLITE_DONE) {
535       cursor_eof_ = true;
536       if (!defn_->emit_shadow_slices())
537         return StepRet(StepRet::Code::kEof);
538 
539       // Close off the remainder of this partition with a shadow slice.
540       mode_ = Mode::kShadowSlice;
541       ts_start_ = ts_end_;
542       ts_end_ = std::numeric_limits<int64_t>::max();
543     } else {
544       return StepRet(StepRet::Code::kError, res);
545     }
546   } while (ts_start_ == ts_end_);
547 
548   return StepRet(StepRet::Code::kRow);
549 }
550 
551 SpanJoinOperatorTable::Query::StepRet
StepToNextPartition()552 SpanJoinOperatorTable::Query::StepToNextPartition() {
553   PERFETTO_DCHECK(defn_->IsPartitioned());
554   PERFETTO_DCHECK(!Eof());
555 
556   auto current_partition = partition_;
557   while (partition_ <= current_partition) {
558     auto res = Step();
559     if (!res.is_row())
560       return res;
561   }
562   return StepRet(StepRet::Code::kRow);
563 }
564 
565 SpanJoinOperatorTable::Query::StepRet
StepToPartition(int64_t target_partition)566 SpanJoinOperatorTable::Query::StepToPartition(int64_t target_partition) {
567   PERFETTO_DCHECK(partition_ <= target_partition);
568   if (defn_->IsPartitioned()) {
569     while (partition_ < target_partition) {
570       if (IsFullPartitionShadowSlice() &&
571           target_partition < CursorPartition()) {
572         partition_ = target_partition;
573         return StepRet(StepRet::Code::kRow);
574       }
575 
576       auto res = Step();
577       if (!res.is_row())
578         return res;
579     }
580   } else if (/* !defn_->IsPartitioned() && */ partition_ < target_partition) {
581     int res = PrepareRawStmt();
582     if (res != SQLITE_OK)
583       return StepRet(StepRet::Code::kError, res);
584     partition_ = target_partition;
585   }
586   return StepRet(StepRet::Code::kRow);
587 }
588 
StepUntil(int64_t timestamp)589 SpanJoinOperatorTable::Query::StepRet SpanJoinOperatorTable::Query::StepUntil(
590     int64_t timestamp) {
591   PERFETTO_DCHECK(!Eof());
592   auto partition = partition_;
593   while (partition_ == partition && ts_end_ <= timestamp) {
594     auto res = Step();
595     if (!res.is_row())
596       return res;
597   }
598   return StepRet(StepRet::Code::kRow);
599 }
600 
CreateSqlQuery(const std::vector<std::string> & cs) const601 std::string SpanJoinOperatorTable::Query::CreateSqlQuery(
602     const std::vector<std::string>& cs) const {
603   std::vector<std::string> col_names;
604   for (const Table::Column& c : defn_->columns()) {
605     col_names.push_back("`" + c.name() + "`");
606   }
607 
608   std::string sql = "SELECT " + base::Join(col_names, ", ");
609   sql += " FROM " + defn_->name();
610   if (!cs.empty()) {
611     sql += " WHERE " + base::Join(cs, " AND ");
612   }
613   sql += " ORDER BY ";
614   sql += defn_->IsPartitioned()
615              ? base::Join({"`" + defn_->partition_col() + "`", "ts"}, ", ")
616              : "ts";
617   sql += ";";
618   PERFETTO_DLOG("%s", sql.c_str());
619   return sql;
620 }
621 
PrepareRawStmt()622 int SpanJoinOperatorTable::Query::PrepareRawStmt() {
623   sqlite3_stmt* stmt = nullptr;
624   int err =
625       sqlite3_prepare_v2(db_, sql_query_.c_str(),
626                          static_cast<int>(sql_query_.size()), &stmt, nullptr);
627   stmt_.reset(stmt);
628 
629   ts_start_ = 0;
630   ts_end_ = 0;
631   partition_ = std::numeric_limits<int64_t>::lowest();
632   cursor_eof_ = false;
633   mode_ = Mode::kRealSlice;
634 
635   return err;
636 }
637 
ReportSqliteResult(sqlite3_context * context,size_t index)638 void SpanJoinOperatorTable::Query::ReportSqliteResult(sqlite3_context* context,
639                                                       size_t index) {
640   if (mode_ != Mode::kRealSlice) {
641     sqlite3_result_null(context);
642     return;
643   }
644 
645   sqlite3_stmt* stmt = stmt_.get();
646   int idx = static_cast<int>(index);
647   switch (sqlite3_column_type(stmt, idx)) {
648     case SQLITE_INTEGER:
649       sqlite3_result_int64(context, sqlite3_column_int64(stmt, idx));
650       break;
651     case SQLITE_FLOAT:
652       sqlite3_result_double(context, sqlite3_column_double(stmt, idx));
653       break;
654     case SQLITE_TEXT: {
655       // TODO(lalitm): note for future optimizations: if we knew the addresses
656       // of the string intern pool, we could check if the string returned here
657       // comes from the pool, and pass it as non-transient.
658       const auto kSqliteTransient =
659           reinterpret_cast<sqlite3_destructor_type>(-1);
660       auto ptr = reinterpret_cast<const char*>(sqlite3_column_text(stmt, idx));
661       sqlite3_result_text(context, ptr, -1, kSqliteTransient);
662       break;
663     }
664   }
665 }
666 
TableDefinition(std::string name,std::string partition_col,std::vector<Table::Column> cols,bool emit_shadow_slices,uint32_t ts_idx,uint32_t dur_idx,uint32_t partition_idx)667 SpanJoinOperatorTable::TableDefinition::TableDefinition(
668     std::string name,
669     std::string partition_col,
670     std::vector<Table::Column> cols,
671     bool emit_shadow_slices,
672     uint32_t ts_idx,
673     uint32_t dur_idx,
674     uint32_t partition_idx)
675     : name_(std::move(name)),
676       partition_col_(std::move(partition_col)),
677       cols_(std::move(cols)),
678       emit_shadow_slices_(emit_shadow_slices),
679       ts_idx_(ts_idx),
680       dur_idx_(dur_idx),
681       partition_idx_(partition_idx) {}
682 
683 base::Optional<SpanJoinOperatorTable::TableDescriptor>
Parse(const std::string & raw_descriptor)684 SpanJoinOperatorTable::TableDescriptor::Parse(
685     const std::string& raw_descriptor) {
686   // Descriptors have one of the following forms:
687   // table_name [PARTITIONED column_name]
688 
689   // Find the table name.
690   base::StringSplitter splitter(raw_descriptor, ' ');
691   if (!splitter.Next())
692     return base::nullopt;
693 
694   TableDescriptor descriptor;
695   descriptor.name = splitter.cur_token();
696   if (!splitter.Next())
697     return std::move(descriptor);
698 
699   if (strcasecmp(splitter.cur_token(), "PARTITIONED") != 0) {
700     PERFETTO_ELOG("Invalid SPAN_JOIN token %s", splitter.cur_token());
701     return base::nullopt;
702   }
703   if (!splitter.Next()) {
704     PERFETTO_ELOG("Missing partitioning column");
705     return base::nullopt;
706   }
707 
708   descriptor.partition_col = splitter.cur_token();
709   return std::move(descriptor);
710 }
711 
712 }  // namespace trace_processor
713 }  // namespace perfetto
714