1 /*
2  * Copyright (C) 2018 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "src/trace_processor/sqlite/span_join_operator_table.h"
18 
19 #include <sqlite3.h>
20 #include <string.h>
21 
22 #include <algorithm>
23 #include <set>
24 #include <utility>
25 
26 #include "perfetto/base/logging.h"
27 #include "perfetto/ext/base/string_splitter.h"
28 #include "perfetto/ext/base/string_utils.h"
29 #include "perfetto/ext/base/string_view.h"
30 #include "src/trace_processor/sqlite/sqlite_utils.h"
31 #include "src/trace_processor/tp_metatrace.h"
32 
33 namespace perfetto {
34 namespace trace_processor {
35 
36 namespace {
37 
constexpr char kTsColumnName[] = "ts";
constexpr char kDurColumnName[] = "dur";

// Returns true iff |name| is exactly "ts" or "dur" (case-sensitive), i.e. one
// of the two columns that are shared between both sides of the span join.
bool IsRequiredColumn(const std::string& name) {
  if (name == kTsColumnName)
    return true;
  return name == kDurColumnName;
}
44 
HasDuplicateColumns(const std::vector<SqliteTable::Column> & cols)45 base::Optional<std::string> HasDuplicateColumns(
46     const std::vector<SqliteTable::Column>& cols) {
47   std::set<std::string> names;
48   for (const auto& col : cols) {
49     if (names.count(col.name()) > 0)
50       return col.name();
51     names.insert(col.name());
52   }
53   return base::nullopt;
54 }
55 
OpToString(int op)56 std::string OpToString(int op) {
57   switch (op) {
58     case SQLITE_INDEX_CONSTRAINT_EQ:
59       return "=";
60     case SQLITE_INDEX_CONSTRAINT_NE:
61       return "!=";
62     case SQLITE_INDEX_CONSTRAINT_GE:
63       return ">=";
64     case SQLITE_INDEX_CONSTRAINT_GT:
65       return ">";
66     case SQLITE_INDEX_CONSTRAINT_LE:
67       return "<=";
68     case SQLITE_INDEX_CONSTRAINT_LT:
69       return "<";
70     case SQLITE_INDEX_CONSTRAINT_LIKE:
71       return "like";
72     case SQLITE_INDEX_CONSTRAINT_ISNULL:
73       // The "null" will be added below in EscapedSqliteValueAsString.
74       return " is ";
75     case SQLITE_INDEX_CONSTRAINT_ISNOTNULL:
76       // The "null" will be added below in EscapedSqliteValueAsString.
77       return " is not";
78     default:
79       PERFETTO_FATAL("Operator to string conversion not impemented for %d", op);
80   }
81 }
82 
EscapedSqliteValueAsString(sqlite3_value * value)83 std::string EscapedSqliteValueAsString(sqlite3_value* value) {
84   switch (sqlite3_value_type(value)) {
85     case SQLITE_INTEGER:
86       return std::to_string(sqlite3_value_int64(value));
87     case SQLITE_FLOAT:
88       return std::to_string(sqlite3_value_double(value));
89     case SQLITE_TEXT: {
90       // If str itself contains a single quote, we need to escape it with
91       // another single quote.
92       const char* str =
93           reinterpret_cast<const char*>(sqlite3_value_text(value));
94       return "'" + base::ReplaceAll(str, "'", "''") + "'";
95     }
96     case SQLITE_NULL:
97       return " null";
98     default:
99       PERFETTO_FATAL("Unknown value type %d", sqlite3_value_type(value));
100   }
101 }
102 
103 }  // namespace
104 
SpanJoinOperatorTable(sqlite3 * db,const TraceStorage *)105 SpanJoinOperatorTable::SpanJoinOperatorTable(sqlite3* db, const TraceStorage*)
106     : db_(db) {}
107 
RegisterTable(sqlite3 * db,const TraceStorage * storage)108 void SpanJoinOperatorTable::RegisterTable(sqlite3* db,
109                                           const TraceStorage* storage) {
110   SqliteTable::Register<SpanJoinOperatorTable>(db, storage, "span_join",
111                                                /* read_write */ false,
112                                                /* requires_args */ true);
113 
114   SqliteTable::Register<SpanJoinOperatorTable>(db, storage, "span_left_join",
115                                                /* read_write */ false,
116                                                /* requires_args */ true);
117 
118   SqliteTable::Register<SpanJoinOperatorTable>(db, storage, "span_outer_join",
119                                                /* read_write */ false,
120                                                /* requires_args */ true);
121 }
122 
Init(int argc,const char * const * argv,Schema * schema)123 util::Status SpanJoinOperatorTable::Init(int argc,
124                                          const char* const* argv,
125                                          Schema* schema) {
126   // argv[0] - argv[2] are SQLite populated fields which are always present.
127   if (argc < 5)
128     return util::Status("SPAN_JOIN: expected at least 2 args");
129 
130   TableDescriptor t1_desc;
131   auto status = TableDescriptor::Parse(
132       std::string(reinterpret_cast<const char*>(argv[3])), &t1_desc);
133   if (!status.ok())
134     return status;
135 
136   TableDescriptor t2_desc;
137   status = TableDescriptor::Parse(
138       std::string(reinterpret_cast<const char*>(argv[4])), &t2_desc);
139   if (!status.ok())
140     return status;
141 
142   // Check that the partition columns match between the two tables.
143   if (t1_desc.partition_col == t2_desc.partition_col) {
144     partitioning_ = t1_desc.IsPartitioned()
145                         ? PartitioningType::kSamePartitioning
146                         : PartitioningType::kNoPartitioning;
147     if (partitioning_ == PartitioningType::kNoPartitioning && IsOuterJoin()) {
148       return util::ErrStatus(
149           "SPAN_JOIN: Outer join not supported for no partition tables");
150     }
151   } else if (t1_desc.IsPartitioned() && t2_desc.IsPartitioned()) {
152     return util::ErrStatus(
153         "SPAN_JOIN: mismatching partitions between the two tables; "
154         "(partition %s in table %s, partition %s in table %s)",
155         t1_desc.partition_col.c_str(), t1_desc.name.c_str(),
156         t2_desc.partition_col.c_str(), t2_desc.name.c_str());
157   } else {
158     if (IsOuterJoin()) {
159       return util::ErrStatus(
160           "SPAN_JOIN: Outer join not supported for mixed partitioned tables");
161     }
162     partitioning_ = PartitioningType::kMixedPartitioning;
163   }
164 
165   bool t1_part_mixed = t1_desc.IsPartitioned() &&
166                        partitioning_ == PartitioningType::kMixedPartitioning;
167   bool t2_part_mixed = t2_desc.IsPartitioned() &&
168                        partitioning_ == PartitioningType::kMixedPartitioning;
169 
170   EmitShadowType t1_shadow_type;
171   if (IsOuterJoin()) {
172     if (t1_part_mixed || partitioning_ == PartitioningType::kNoPartitioning) {
173       t1_shadow_type = EmitShadowType::kPresentPartitionOnly;
174     } else {
175       t1_shadow_type = EmitShadowType::kAll;
176     }
177   } else {
178     t1_shadow_type = EmitShadowType::kNone;
179   }
180   status = CreateTableDefinition(t1_desc, t1_shadow_type, &t1_defn_);
181   if (!status.ok())
182     return status;
183 
184   EmitShadowType t2_shadow_type;
185   if (IsOuterJoin() || IsLeftJoin()) {
186     if (t2_part_mixed || partitioning_ == PartitioningType::kNoPartitioning) {
187       t2_shadow_type = EmitShadowType::kPresentPartitionOnly;
188     } else {
189       t2_shadow_type = EmitShadowType::kAll;
190     }
191   } else {
192     t2_shadow_type = EmitShadowType::kNone;
193   }
194   status = CreateTableDefinition(t2_desc, t2_shadow_type, &t2_defn_);
195   if (!status.ok())
196     return status;
197 
198   std::vector<SqliteTable::Column> cols;
199   // Ensure the shared columns are consistently ordered and are not
200   // present twice in the final schema
201   cols.emplace_back(Column::kTimestamp, kTsColumnName, SqlValue::Type::kLong);
202   cols.emplace_back(Column::kDuration, kDurColumnName, SqlValue::Type::kLong);
203   if (partitioning_ != PartitioningType::kNoPartitioning)
204     cols.emplace_back(Column::kPartition, partition_col(),
205                       SqlValue::Type::kLong);
206 
207   CreateSchemaColsForDefn(t1_defn_, &cols);
208   CreateSchemaColsForDefn(t2_defn_, &cols);
209 
210   // Check if any column has : in its name. This often happens when SELECT *
211   // is used to create a view with the same column name in two joined tables.
212   for (const auto& col : cols) {
213     if (col.name().find(':') != std::string::npos) {
214       return util::ErrStatus("SPAN_JOIN: column %s has illegal character :",
215                              col.name().c_str());
216     }
217   }
218 
219   if (auto opt_dupe_col = HasDuplicateColumns(cols)) {
220     return util::ErrStatus(
221         "SPAN_JOIN: column %s present in both tables %s and %s",
222         opt_dupe_col->c_str(), t1_defn_.name().c_str(),
223         t2_defn_.name().c_str());
224   }
225   std::vector<size_t> primary_keys = {Column::kTimestamp};
226   if (partitioning_ != PartitioningType::kNoPartitioning)
227     primary_keys.push_back(Column::kPartition);
228   *schema = Schema(cols, primary_keys);
229 
230   return util::OkStatus();
231 }
232 
CreateSchemaColsForDefn(const TableDefinition & defn,std::vector<SqliteTable::Column> * cols)233 void SpanJoinOperatorTable::CreateSchemaColsForDefn(
234     const TableDefinition& defn,
235     std::vector<SqliteTable::Column>* cols) {
236   for (size_t i = 0; i < defn.columns().size(); i++) {
237     const auto& n = defn.columns()[i].name();
238     if (IsRequiredColumn(n) || n == defn.partition_col())
239       continue;
240 
241     ColumnLocator* locator = &global_index_to_column_locator_[cols->size()];
242     locator->defn = &defn;
243     locator->col_index = i;
244 
245     cols->emplace_back(cols->size(), n, defn.columns()[i].type());
246   }
247 }
248 
CreateCursor()249 std::unique_ptr<SqliteTable::Cursor> SpanJoinOperatorTable::CreateCursor() {
250   return std::unique_ptr<SpanJoinOperatorTable::Cursor>(new Cursor(this, db_));
251 }
252 
BestIndex(const QueryConstraints & qc,BestIndexInfo * info)253 int SpanJoinOperatorTable::BestIndex(const QueryConstraints& qc,
254                                      BestIndexInfo* info) {
255   // TODO(lalitm): figure out cost estimation.
256   const auto& ob = qc.order_by();
257 
258   if (partitioning_ == PartitioningType::kNoPartitioning) {
259     // If both tables are not partitioned and we have a single order by on ts,
260     // we return data in the correct order.
261     info->sqlite_omit_order_by =
262         ob.size() == 1 && ob[0].iColumn == Column::kTimestamp && !ob[0].desc;
263   } else {
264     // If one of the tables is partitioned, and we have an order by on the
265     // partition column followed (optionally) by an order by on timestamp, we
266     // return data in the correct order.
267     bool is_first_ob_partition =
268         ob.size() >= 1 && ob[0].iColumn == Column::kPartition && !ob[0].desc;
269     bool is_second_ob_ts =
270         ob.size() >= 2 && ob[1].iColumn == Column::kTimestamp && !ob[1].desc;
271     info->sqlite_omit_order_by =
272         (ob.size() == 1 && is_first_ob_partition) ||
273         (ob.size() == 2 && is_first_ob_partition && is_second_ob_ts);
274   }
275 
276   const auto& cs = qc.constraints();
277   for (uint32_t i = 0; i < cs.size(); ++i) {
278     if (cs[i].op == kSourceGeqOpCode) {
279       info->sqlite_omit_constraint[i] = true;
280     }
281   }
282 
283   return SQLITE_OK;
284 }
285 
FindFunction(const char * name,FindFunctionFn * fn,void **)286 int SpanJoinOperatorTable::FindFunction(const char* name,
287                                         FindFunctionFn* fn,
288                                         void**) {
289   if (base::CaseInsensitiveEqual(name, "source_geq")) {
290     *fn = [](sqlite3_context* ctx, int, sqlite3_value**) {
291       sqlite3_result_error(ctx, "Should not be called.", -1);
292     };
293     return kSourceGeqOpCode;
294   }
295   return 0;
296 }
297 
298 std::vector<std::string>
ComputeSqlConstraintsForDefinition(const TableDefinition & defn,const QueryConstraints & qc,sqlite3_value ** argv)299 SpanJoinOperatorTable::ComputeSqlConstraintsForDefinition(
300     const TableDefinition& defn,
301     const QueryConstraints& qc,
302     sqlite3_value** argv) {
303   std::vector<std::string> constraints;
304   for (size_t i = 0; i < qc.constraints().size(); i++) {
305     const auto& cs = qc.constraints()[i];
306     auto col_name = GetNameForGlobalColumnIndex(defn, cs.column);
307     if (col_name.empty())
308       continue;
309 
310     // Le constraints can be passed straight to the child tables as they won't
311     // affect the span join computation. Similarily, source_geq constraints
312     // explicitly request that they are passed as geq constraints to the source
313     // tables.
314     if (col_name == kTsColumnName && !sqlite_utils::IsOpLe(cs.op) &&
315         cs.op != kSourceGeqOpCode)
316       continue;
317 
318     // Allow SQLite handle any constraints on duration apart from source_geq
319     // constraints.
320     if (col_name == kDurColumnName && cs.op != kSourceGeqOpCode)
321       continue;
322 
323     // If we're emitting shadow slices, don't propogate any constraints
324     // on this table as this will break the shadow slice computation.
325     if (defn.ShouldEmitPresentPartitionShadow())
326       continue;
327 
328     auto op = OpToString(cs.op == kSourceGeqOpCode ? SQLITE_INDEX_CONSTRAINT_GE
329                                                    : cs.op);
330     auto value = EscapedSqliteValueAsString(argv[i]);
331 
332     constraints.emplace_back("`" + col_name + "`" + op + value);
333   }
334   return constraints;
335 }
336 
CreateTableDefinition(const TableDescriptor & desc,EmitShadowType emit_shadow_type,SpanJoinOperatorTable::TableDefinition * defn)337 util::Status SpanJoinOperatorTable::CreateTableDefinition(
338     const TableDescriptor& desc,
339     EmitShadowType emit_shadow_type,
340     SpanJoinOperatorTable::TableDefinition* defn) {
341   if (desc.partition_col == kTsColumnName ||
342       desc.partition_col == kDurColumnName) {
343     return util::ErrStatus(
344         "SPAN_JOIN: partition column cannot be any of {ts, dur} for table %s",
345         desc.name.c_str());
346   }
347 
348   std::vector<SqliteTable::Column> cols;
349   auto status = sqlite_utils::GetColumnsForTable(db_, desc.name, cols);
350   if (!status.ok()) {
351     return status;
352   }
353 
354   uint32_t required_columns_found = 0;
355   uint32_t ts_idx = std::numeric_limits<uint32_t>::max();
356   uint32_t dur_idx = std::numeric_limits<uint32_t>::max();
357   uint32_t partition_idx = std::numeric_limits<uint32_t>::max();
358   for (uint32_t i = 0; i < cols.size(); i++) {
359     auto col = cols[i];
360     if (IsRequiredColumn(col.name())) {
361       ++required_columns_found;
362       if (col.type() != SqlValue::Type::kLong &&
363           col.type() != SqlValue::Type::kNull) {
364         return util::ErrStatus(
365             "SPAN_JOIN: Invalid type for column %s in table %s",
366             col.name().c_str(), desc.name.c_str());
367       }
368     }
369 
370     if (col.name() == kTsColumnName) {
371       ts_idx = i;
372     } else if (col.name() == kDurColumnName) {
373       dur_idx = i;
374     } else if (col.name() == desc.partition_col) {
375       partition_idx = i;
376     }
377   }
378   if (required_columns_found != 2) {
379     return util::ErrStatus(
380         "SPAN_JOIN: Missing one of columns {ts, dur} in table %s",
381         desc.name.c_str());
382   } else if (desc.IsPartitioned() && partition_idx >= cols.size()) {
383     return util::ErrStatus("SPAN_JOIN: Missing partition column %s in table %s",
384                            desc.partition_col.c_str(), desc.name.c_str());
385   }
386 
387   PERFETTO_DCHECK(ts_idx < cols.size());
388   PERFETTO_DCHECK(dur_idx < cols.size());
389 
390   *defn = TableDefinition(desc.name, desc.partition_col, std::move(cols),
391                           emit_shadow_type, ts_idx, dur_idx, partition_idx);
392   return util::OkStatus();
393 }
394 
GetNameForGlobalColumnIndex(const TableDefinition & defn,int global_column)395 std::string SpanJoinOperatorTable::GetNameForGlobalColumnIndex(
396     const TableDefinition& defn,
397     int global_column) {
398   size_t col_idx = static_cast<size_t>(global_column);
399   if (col_idx == Column::kTimestamp)
400     return kTsColumnName;
401   else if (col_idx == Column::kDuration)
402     return kDurColumnName;
403   else if (col_idx == Column::kPartition &&
404            partitioning_ != PartitioningType::kNoPartitioning)
405     return defn.partition_col().c_str();
406 
407   const auto& locator = global_index_to_column_locator_[col_idx];
408   if (locator.defn != &defn)
409     return "";
410   return defn.columns()[locator.col_index].name().c_str();
411 }
412 
Cursor(SpanJoinOperatorTable * table,sqlite3 * db)413 SpanJoinOperatorTable::Cursor::Cursor(SpanJoinOperatorTable* table, sqlite3* db)
414     : SqliteTable::Cursor(table),
415       t1_(table, &table->t1_defn_, db),
416       t2_(table, &table->t2_defn_, db),
417       table_(table) {}
418 
Filter(const QueryConstraints & qc,sqlite3_value ** argv,FilterHistory)419 int SpanJoinOperatorTable::Cursor::Filter(const QueryConstraints& qc,
420                                           sqlite3_value** argv,
421                                           FilterHistory) {
422   PERFETTO_TP_TRACE("SPAN_JOIN_XFILTER");
423 
424   util::Status status =
425       t1_.Initialize(qc, argv, Query::InitialEofBehavior::kTreatAsEof);
426   if (!status.ok())
427     return SQLITE_ERROR;
428 
429   status = t2_.Initialize(
430       qc, argv,
431       table_->IsLeftJoin()
432           ? Query::InitialEofBehavior::kTreatAsMissingPartitionShadow
433           : Query::InitialEofBehavior::kTreatAsEof);
434   if (!status.ok())
435     return SQLITE_ERROR;
436 
437   status = FindOverlappingSpan();
438   return status.ok() ? SQLITE_OK : SQLITE_ERROR;
439 }
440 
Next()441 int SpanJoinOperatorTable::Cursor::Next() {
442   util::Status status = next_query_->Next();
443   if (!status.ok())
444     return SQLITE_ERROR;
445 
446   status = FindOverlappingSpan();
447   return status.ok() ? SQLITE_OK : SQLITE_ERROR;
448 }
449 
IsOverlappingSpan()450 bool SpanJoinOperatorTable::Cursor::IsOverlappingSpan() {
451   // If either of the tables are eof, then we cannot possibly have an
452   // overlapping span.
453   if (t1_.IsEof() || t2_.IsEof())
454     return false;
455 
456   // One of the tables always needs to have a real span to have a valid
457   // overlapping span.
458   if (!t1_.IsReal() && !t2_.IsReal())
459     return false;
460 
461   if (table_->partitioning_ == PartitioningType::kSamePartitioning) {
462     // If both tables are partitioned, then ensure that the partitions overlap.
463     bool partition_in_bounds = (t1_.FirstPartition() >= t2_.FirstPartition() &&
464                                 t1_.FirstPartition() <= t2_.LastPartition()) ||
465                                (t2_.FirstPartition() >= t1_.FirstPartition() &&
466                                 t2_.FirstPartition() <= t1_.LastPartition());
467     if (!partition_in_bounds)
468       return false;
469   }
470 
471   // We consider all slices to be [start, end) - that is the range of
472   // timestamps has an open interval at the start but a closed interval
473   // at the end. (with the exception of dur == -1 which we treat as if
474   // end == start for the purpose of this function).
475   return (t1_.ts() == t2_.ts() && t1_.IsReal() && t2_.IsReal()) ||
476          (t1_.ts() >= t2_.ts() && t1_.ts() < t2_.AdjustedTsEnd()) ||
477          (t2_.ts() >= t1_.ts() && t2_.ts() < t1_.AdjustedTsEnd());
478 }
479 
FindOverlappingSpan()480 util::Status SpanJoinOperatorTable::Cursor::FindOverlappingSpan() {
481   // We loop until we find a slice which overlaps from the two tables.
482   while (true) {
483     if (table_->partitioning_ == PartitioningType::kMixedPartitioning) {
484       // If we have a mixed partition setup, we need to have special checks
485       // for eof and to reset the unpartitioned cursor every time the partition
486       // changes in the partitioned table.
487       auto* partitioned = t1_.definition()->IsPartitioned() ? &t1_ : &t2_;
488       auto* unpartitioned = t1_.definition()->IsPartitioned() ? &t2_ : &t1_;
489 
490       // If the partitioned table reaches eof, then we are really done.
491       if (partitioned->IsEof())
492         break;
493 
494       // If the partition has changed from the previous one, reset the cursor
495       // and keep a lot of the new partition.
496       if (last_mixed_partition_ != partitioned->partition()) {
497         util::Status status = unpartitioned->Rewind();
498         if (!status.ok())
499           return status;
500         last_mixed_partition_ = partitioned->partition();
501       }
502     } else if (t1_.IsEof() || t2_.IsEof()) {
503       // For both no partition and same partition cases, either cursor ending
504       // ends the whole span join.
505       break;
506     }
507 
508     // Find which slice finishes first.
509     next_query_ = FindEarliestFinishQuery();
510 
511     // If the current span is overlapping, just finish there to emit the current
512     // slice.
513     if (IsOverlappingSpan())
514       break;
515 
516     // Otherwise, step to the next row.
517     util::Status status = next_query_->Next();
518     if (!status.ok())
519       return status;
520   }
521   return util::OkStatus();
522 }
523 
524 SpanJoinOperatorTable::Query*
FindEarliestFinishQuery()525 SpanJoinOperatorTable::Cursor::FindEarliestFinishQuery() {
526   int64_t t1_part;
527   int64_t t2_part;
528 
529   switch (table_->partitioning_) {
530     case PartitioningType::kMixedPartitioning: {
531       // If either table is EOF, forward the other table to try and make
532       // the partitions not match anymore.
533       if (t1_.IsEof())
534         return &t2_;
535       if (t2_.IsEof())
536         return &t1_;
537 
538       // Otherwise, just make the partition equal from both tables.
539       t1_part = last_mixed_partition_;
540       t2_part = last_mixed_partition_;
541       break;
542     }
543     case PartitioningType::kSamePartitioning: {
544       // Get the partition values from the cursor.
545       t1_part = t1_.LastPartition();
546       t2_part = t2_.LastPartition();
547       break;
548     }
549     case PartitioningType::kNoPartitioning: {
550       t1_part = 0;
551       t2_part = 0;
552       break;
553     }
554   }
555 
556   // Prefer to forward the earliest cursors based on the following
557   // lexiographical ordering:
558   // 1. partition
559   // 2. end timestamp
560   // 3. whether the slice is real or shadow (shadow < real)
561   bool t1_less = std::make_tuple(t1_part, t1_.AdjustedTsEnd(), t1_.IsReal()) <
562                  std::make_tuple(t2_part, t2_.AdjustedTsEnd(), t2_.IsReal());
563   return t1_less ? &t1_ : &t2_;
564 }
565 
Eof()566 int SpanJoinOperatorTable::Cursor::Eof() {
567   return t1_.IsEof() || t2_.IsEof();
568 }
569 
Column(sqlite3_context * context,int N)570 int SpanJoinOperatorTable::Cursor::Column(sqlite3_context* context, int N) {
571   PERFETTO_DCHECK(t1_.IsReal() || t2_.IsReal());
572 
573   switch (N) {
574     case Column::kTimestamp: {
575       auto max_ts = std::max(t1_.ts(), t2_.ts());
576       sqlite3_result_int64(context, static_cast<sqlite3_int64>(max_ts));
577       break;
578     }
579     case Column::kDuration: {
580       auto max_start = std::max(t1_.ts(), t2_.ts());
581       auto min_end = std::min(t1_.raw_ts_end(), t2_.raw_ts_end());
582       auto dur = min_end - max_start;
583       sqlite3_result_int64(context, static_cast<sqlite3_int64>(dur));
584       break;
585     }
586     case Column::kPartition: {
587       if (table_->partitioning_ != PartitioningType::kNoPartitioning) {
588         int64_t partition;
589         if (table_->partitioning_ == PartitioningType::kMixedPartitioning) {
590           partition = last_mixed_partition_;
591         } else {
592           partition = t1_.IsReal() ? t1_.partition() : t2_.partition();
593         }
594         sqlite3_result_int64(context, static_cast<sqlite3_int64>(partition));
595         break;
596       }
597       [[clang::fallthrough]];
598     }
599     default: {
600       size_t index = static_cast<size_t>(N);
601       const auto& locator = table_->global_index_to_column_locator_[index];
602       if (locator.defn == t1_.definition())
603         t1_.ReportSqliteResult(context, locator.col_index);
604       else
605         t2_.ReportSqliteResult(context, locator.col_index);
606     }
607   }
608   return SQLITE_OK;
609 }
610 
Query(SpanJoinOperatorTable * table,const TableDefinition * definition,sqlite3 * db)611 SpanJoinOperatorTable::Query::Query(SpanJoinOperatorTable* table,
612                                     const TableDefinition* definition,
613                                     sqlite3* db)
614     : defn_(definition), db_(db), table_(table) {
615   PERFETTO_DCHECK(!defn_->IsPartitioned() ||
616                   defn_->partition_idx() < defn_->columns().size());
617 }
618 
619 SpanJoinOperatorTable::Query::~Query() = default;
620 
Initialize(const QueryConstraints & qc,sqlite3_value ** argv,InitialEofBehavior eof_behavior)621 util::Status SpanJoinOperatorTable::Query::Initialize(
622     const QueryConstraints& qc,
623     sqlite3_value** argv,
624     InitialEofBehavior eof_behavior) {
625   *this = Query(table_, definition(), db_);
626   sql_query_ = CreateSqlQuery(
627       table_->ComputeSqlConstraintsForDefinition(*defn_, qc, argv));
628   util::Status status = Rewind();
629   if (!status.ok())
630     return status;
631   if (eof_behavior == InitialEofBehavior::kTreatAsMissingPartitionShadow &&
632       IsEof()) {
633     state_ = State::kMissingPartitionShadow;
634   }
635   return status;
636 }
637 
Next()638 util::Status SpanJoinOperatorTable::Query::Next() {
639   util::Status status = NextSliceState();
640   if (!status.ok())
641     return status;
642   return FindNextValidSlice();
643 }
644 
IsValidSlice()645 bool SpanJoinOperatorTable::Query::IsValidSlice() {
646   // Disallow any single partition shadow slices if the definition doesn't allow
647   // them.
648   if (IsPresentPartitionShadow() && !defn_->ShouldEmitPresentPartitionShadow())
649     return false;
650 
651   // Disallow any missing partition shadow slices if the definition doesn't
652   // allow them.
653   if (IsMissingPartitionShadow() && !defn_->ShouldEmitMissingPartitionShadow())
654     return false;
655 
656   // Disallow any "empty" shadows; these are shadows which either have the same
657   // start and end time or missing-partition shadows which have the same start
658   // and end partition.
659   if (IsEmptyShadow())
660     return false;
661 
662   return true;
663 }
664 
FindNextValidSlice()665 util::Status SpanJoinOperatorTable::Query::FindNextValidSlice() {
666   // The basic idea of this function is that |NextSliceState()| always emits
667   // all possible slices (including shadows for any gaps inbetween the real
668   // slices) and we filter out the invalid slices (as defined by the table
669   // definition) using |IsValidSlice()|.
670   //
671   // This has proved to be a lot cleaner to implement than trying to choose
672   // when to emit and not emit shadows directly.
673   while (!IsEof() && !IsValidSlice()) {
674     util::Status status = NextSliceState();
675     if (!status.ok())
676       return status;
677   }
678   return util::OkStatus();
679 }
680 
NextSliceState()681 util::Status SpanJoinOperatorTable::Query::NextSliceState() {
682   switch (state_) {
683     case State::kReal: {
684       // Forward the cursor to figure out where the next slice should be.
685       util::Status status = CursorNext();
686       if (!status.ok())
687         return status;
688 
689       // Depending on the next slice, we can do two things here:
690       // 1. If the next slice is on the same partition, we can just emit a
691       //    single shadow until the start of the next slice.
692       // 2. If the next slice is on another partition or we hit eof, just emit
693       //    a shadow to the end of the whole partition.
694       bool shadow_to_end = cursor_eof_ || (defn_->IsPartitioned() &&
695                                            partition_ != CursorPartition());
696       state_ = State::kPresentPartitionShadow;
697       ts_ = AdjustedTsEnd();
698       ts_end_ =
699           shadow_to_end ? std::numeric_limits<int64_t>::max() : CursorTs();
700       return util::OkStatus();
701     }
702     case State::kPresentPartitionShadow: {
703       if (ts_end_ == std::numeric_limits<int64_t>::max()) {
704         // If the shadow is to the end of the slice, create a missing partition
705         // shadow to the start of the partition of the next slice or to the max
706         // partition if we hit eof.
707         state_ = State::kMissingPartitionShadow;
708         ts_ = 0;
709         ts_end_ = std::numeric_limits<int64_t>::max();
710 
711         missing_partition_start_ = partition_ + 1;
712         missing_partition_end_ = cursor_eof_
713                                      ? std::numeric_limits<int64_t>::max()
714                                      : CursorPartition();
715       } else {
716         // If the shadow is not to the end, we must have another slice on the
717         // current partition.
718         state_ = State::kReal;
719         ts_ = CursorTs();
720         ts_end_ = ts_ + CursorDur();
721 
722         PERFETTO_DCHECK(!defn_->IsPartitioned() ||
723                         partition_ == CursorPartition());
724       }
725       return util::OkStatus();
726     }
727     case State::kMissingPartitionShadow: {
728       if (missing_partition_end_ == std::numeric_limits<int64_t>::max()) {
729         PERFETTO_DCHECK(cursor_eof_);
730 
731         // If we have a missing partition to the max partition, we must have hit
732         // eof.
733         state_ = State::kEof;
734       } else {
735         PERFETTO_DCHECK(!defn_->IsPartitioned() ||
736                         CursorPartition() == missing_partition_end_);
737 
738         // Otherwise, setup a single partition slice on the end partition to the
739         // start of the next slice.
740         state_ = State::kPresentPartitionShadow;
741         ts_ = 0;
742         ts_end_ = CursorTs();
743         partition_ = missing_partition_end_;
744       }
745       return util::OkStatus();
746     }
747     case State::kEof: {
748       PERFETTO_DFATAL("Called Next when EOF");
749       return util::ErrStatus("Called Next when EOF");
750     }
751   }
752   PERFETTO_FATAL("For GCC");
753 }
754 
Rewind()755 util::Status SpanJoinOperatorTable::Query::Rewind() {
756   sqlite3_stmt* stmt = nullptr;
757   int res =
758       sqlite3_prepare_v2(db_, sql_query_.c_str(),
759                          static_cast<int>(sql_query_.size()), &stmt, nullptr);
760   stmt_.reset(stmt);
761 
762   cursor_eof_ = res != SQLITE_OK;
763   if (res != SQLITE_OK)
764     return util::ErrStatus("%s", sqlite3_errmsg(db_));
765 
766   util::Status status = CursorNext();
767   if (!status.ok())
768     return status;
769 
770   // Setup the first slice as a missing partition shadow from the lowest
771   // partition until the first slice partition. We will handle finding the real
772   // slice in |FindNextValidSlice()|.
773   state_ = State::kMissingPartitionShadow;
774   ts_ = 0;
775   ts_end_ = std::numeric_limits<int64_t>::max();
776   missing_partition_start_ = std::numeric_limits<int64_t>::min();
777 
778   if (cursor_eof_) {
779     missing_partition_end_ = std::numeric_limits<int64_t>::max();
780   } else if (defn_->IsPartitioned()) {
781     missing_partition_end_ = CursorPartition();
782   } else {
783     missing_partition_end_ = std::numeric_limits<int64_t>::min();
784   }
785 
786   // Actually compute the first valid slice.
787   return FindNextValidSlice();
788 }
789 
CursorNext()790 util::Status SpanJoinOperatorTable::Query::CursorNext() {
791   auto* stmt = stmt_.get();
792   int res;
793   if (defn_->IsPartitioned()) {
794     auto partition_idx = static_cast<int>(defn_->partition_idx());
795     // Fastforward through any rows with null partition keys.
796     int row_type;
797     do {
798       res = sqlite3_step(stmt);
799       row_type = sqlite3_column_type(stmt, partition_idx);
800     } while (res == SQLITE_ROW && row_type == SQLITE_NULL);
801   } else {
802     res = sqlite3_step(stmt);
803   }
804   cursor_eof_ = res != SQLITE_ROW;
805   return res == SQLITE_ROW || res == SQLITE_DONE
806              ? util::OkStatus()
807              : util::ErrStatus("%s", sqlite3_errmsg(db_));
808 }
809 
CreateSqlQuery(const std::vector<std::string> & cs) const810 std::string SpanJoinOperatorTable::Query::CreateSqlQuery(
811     const std::vector<std::string>& cs) const {
812   std::vector<std::string> col_names;
813   for (const SqliteTable::Column& c : defn_->columns()) {
814     col_names.push_back("`" + c.name() + "`");
815   }
816 
817   std::string sql = "SELECT " + base::Join(col_names, ", ");
818   sql += " FROM " + defn_->name();
819   if (!cs.empty()) {
820     sql += " WHERE " + base::Join(cs, " AND ");
821   }
822   sql += " ORDER BY ";
823   sql += defn_->IsPartitioned()
824              ? base::Join({"`" + defn_->partition_col() + "`", "ts"}, ", ")
825              : "ts";
826   sql += ";";
827   PERFETTO_DLOG("%s", sql.c_str());
828   return sql;
829 }
830 
ReportSqliteResult(sqlite3_context * context,size_t index)831 void SpanJoinOperatorTable::Query::ReportSqliteResult(sqlite3_context* context,
832                                                       size_t index) {
833   if (state_ != State::kReal) {
834     sqlite3_result_null(context);
835     return;
836   }
837 
838   sqlite3_stmt* stmt = stmt_.get();
839   int idx = static_cast<int>(index);
840   switch (sqlite3_column_type(stmt, idx)) {
841     case SQLITE_INTEGER:
842       sqlite3_result_int64(context, sqlite3_column_int64(stmt, idx));
843       break;
844     case SQLITE_FLOAT:
845       sqlite3_result_double(context, sqlite3_column_double(stmt, idx));
846       break;
847     case SQLITE_TEXT: {
848       // TODO(lalitm): note for future optimizations: if we knew the addresses
849       // of the string intern pool, we could check if the string returned here
850       // comes from the pool, and pass it as non-transient.
851       const auto kSqliteTransient =
852           reinterpret_cast<sqlite3_destructor_type>(-1);
853       auto ptr = reinterpret_cast<const char*>(sqlite3_column_text(stmt, idx));
854       sqlite3_result_text(context, ptr, -1, kSqliteTransient);
855       break;
856     }
857   }
858 }
859 
TableDefinition(std::string name,std::string partition_col,std::vector<SqliteTable::Column> cols,EmitShadowType emit_shadow_type,uint32_t ts_idx,uint32_t dur_idx,uint32_t partition_idx)860 SpanJoinOperatorTable::TableDefinition::TableDefinition(
861     std::string name,
862     std::string partition_col,
863     std::vector<SqliteTable::Column> cols,
864     EmitShadowType emit_shadow_type,
865     uint32_t ts_idx,
866     uint32_t dur_idx,
867     uint32_t partition_idx)
868     : emit_shadow_type_(emit_shadow_type),
869       name_(std::move(name)),
870       partition_col_(std::move(partition_col)),
871       cols_(std::move(cols)),
872       ts_idx_(ts_idx),
873       dur_idx_(dur_idx),
874       partition_idx_(partition_idx) {}
875 
Parse(const std::string & raw_descriptor,SpanJoinOperatorTable::TableDescriptor * descriptor)876 util::Status SpanJoinOperatorTable::TableDescriptor::Parse(
877     const std::string& raw_descriptor,
878     SpanJoinOperatorTable::TableDescriptor* descriptor) {
879   // Descriptors have one of the following forms:
880   // table_name [PARTITIONED column_name]
881 
882   // Find the table name.
883   base::StringSplitter splitter(raw_descriptor, ' ');
884   if (!splitter.Next())
885     return util::ErrStatus("SPAN_JOIN: Missing table name");
886 
887   descriptor->name = splitter.cur_token();
888   if (!splitter.Next())
889     return util::OkStatus();
890 
891   if (!base::CaseInsensitiveEqual(splitter.cur_token(), "PARTITIONED"))
892     return util::ErrStatus("SPAN_JOIN: Invalid token");
893 
894   if (!splitter.Next())
895     return util::ErrStatus("SPAN_JOIN: Missing partitioning column");
896 
897   descriptor->partition_col = splitter.cur_token();
898   return util::OkStatus();
899 }
900 
901 }  // namespace trace_processor
902 }  // namespace perfetto
903