1// Copyright (C) 2021 The Android Open Source Project
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//      http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15// This file deals with deserialization and iteration of the proto-encoded
16// byte buffer that is returned by TraceProcessor when invoking the
17// TPM_QUERY_STREAMING method. The returned |query_result| buffer is optimized
// for being moved cheaply across workers and decoded on the fly as we step
19// through the iterator.
20// See comments around QueryResult in trace_processor.proto for more details.
21
22// The classes in this file are organized as follows:
23//
24// QueryResultImpl:
25// The object returned by the Engine.query(sql) method.
// This object is a holder of row data. Batches of rows get appended
27// incrementally as they are received by the remote TraceProcessor instance.
28// QueryResultImpl also deals with asynchronicity of queries and allows callers
29// to obtain a promise that waits for more (or all) rows.
30// At any point in time the following objects hold a reference to QueryResult:
31// - The Engine: for appending row batches.
32// - UI code, typically controllers, who make queries.
33//
34// ResultBatch:
35// Hold the data, returned by the remote TraceProcessor instance, for a number
36// of rows (TP typically chunks the results in batches of 128KB).
37// A QueryResultImpl holds exclusively ResultBatches for a given query.
38// ResultBatch is not exposed externally, it's just an internal representation
39// that helps with proto decoding. ResultBatch is immutable after it gets
40// appended and decoded. The iteration state is held by the RowIteratorImpl.
41//
42// RowIteratorImpl:
43// Decouples the data owned by QueryResultImpl (and its ResultBatch(es)) from
44// the iteration state. The iterator effectively is the union of a ResultBatch
45// and the row number in it. Rows within the batch are decoded as the user calls
46// next(). When getting at the end of the batch, it takes care of switching to
47// the next batch (if any) within the QueryResultImpl.
48// This object is part of the API exposed to tracks / controllers.
49
// Import below commented out to prevent the protobufjs initialization with:
51// `protobuf.util.Long = undefined as any;`
52// The Winscope parsers need the 64-bit proto fields to be retrieved as Long instead of number,
53// otherwise data (e.g. state flags) would be lost because of the 53-bit integer limitation.
54// import './static_initializers';
55import protobuf from 'protobufjs/minimal';
56import {defer, Deferred} from './deferred';
57import {assertExists, assertFalse, assertTrue} from './logging';
58import {utf8Decode} from './string_utils';
59
// Column type sentinels used as values of the spec object passed to
// QueryResult.iter() / firstRow(), e.g. iter({id: NUM, name: STR_NULL}).
// The code in this file compares spec values against these constants with
// `===` (see isCompatible() and RowIteratorImpl.next()), so what matters is
// each constant's identity, not its content:
// - NUM / NUM_NULL are the numbers 0 and 1.
// - STR / STR_NULL are the strings 'str' and 'str_null'.
// - BLOB / BLOB_NULL are two distinct empty Uint8Array instances; they can
//   only be told apart by reference.
// - LONG / LONG_NULL are the bigints 0n and 1n (never === to 0 / 1).
// The *_NULL variants additionally accept NULL cells (see isCompatible()).
export const NUM = 0;
export const STR = 'str';
export const NUM_NULL: number|null = 1;
export const STR_NULL: string|null = 'str_null';
export const BLOB = new Uint8Array();
export const BLOB_NULL: Uint8Array|null = new Uint8Array();
export const LONG: bigint = 0n;
export const LONG_NULL: bigint|null = 1n;

// The value of a non-NULL cell.
export type ColumnType = string|number|bigint|Uint8Array;
// The value of any cell, NULL included.
export type SqlValue = ColumnType|null;
71
const SHIFT_32BITS = 32n;

// Fast decode varint int64 into a bigint
// Inspired by
// https://github.com/protobufjs/protobuf.js/blob/56b1e64979dae757b67a21d326e16acee39f2267/src/reader.js#L123
//
// Decodes the base-128 varint starting at |buf[pos]| and returns it as a
// signed 64-bit value. The low and high 32-bit halves are accumulated in
// |lo| / |hi| with 32-bit integer ops, and a BigInt is only built at the end.
// Encodings that reach the high half are reinterpreted as two's complement
// via BigInt.asIntN(64), so 10-byte encodings of negative numbers come back
// negative.
// Throws 'Index out of range' if the varint runs past the end of |buf|, and
// 'invalid varint encoding' if it is longer than 10 bytes.
export function decodeInt64Varint(buf: Uint8Array, pos: number): bigint {
  let hi: number = 0;
  let lo: number = 0;
  let i = 0;

  if (buf.length - pos > 4) {  // fast route (lo)
    for (; i < 4; ++i) {
      // 1st..4th
      lo = (lo | (buf[pos] & 127) << i * 7) >>> 0;
      if (buf[pos++] < 128) {
        return BigInt(lo);
      }
    }
    // 5th: its low 4 payload bits complete |lo|, the upper 3 start |hi|.
    lo = (lo | (buf[pos] & 127) << 28) >>> 0;
    hi = (hi | (buf[pos] & 127) >> 4) >>> 0;
    if (buf[pos++] < 128) {
      // At most 35 bits are set at this point, so no sign handling needed.
      return BigInt(hi) << SHIFT_32BITS | BigInt(lo);
    }
    i = 0;
  } else {
    // Slow route: at most 4 bytes are left, so a well-formed varint must end
    // within them. Every read is bounds-checked (the original 4th-byte read
    // was not, which made truncated varints silently decode as garbage).
    for (; i < 4; ++i) {
      if (pos >= buf.length) {
        throw Error('Index out of range');
      }
      // 1st..4th
      lo = (lo | (buf[pos] & 127) << i * 7) >>> 0;
      if (buf[pos++] < 128) {
        return BigInt(lo);
      }
    }
    // The 4th byte still had its continuation bit set, but the buffer ended.
    throw Error('Index out of range');
  }
  if (buf.length - pos > 4) {  // fast route (hi)
    for (; i < 5; ++i) {
      // 6th..10th
      hi = (hi | (buf[pos] & 127) << i * 7 + 3) >>> 0;
      if (buf[pos++] < 128) {
        const big = BigInt(hi) << SHIFT_32BITS | BigInt(lo);
        return BigInt.asIntN(64, big);
      }
    }
  } else {
    for (; i < 5; ++i) {
      if (pos >= buf.length) {
        throw Error('Index out of range');
      }
      // 6th..10th
      hi = (hi | (buf[pos] & 127) << i * 7 + 3) >>> 0;
      if (buf[pos++] < 128) {
        const big = BigInt(hi) << SHIFT_32BITS | BigInt(lo);
        return BigInt.asIntN(64, big);
      }
    }
  }
  throw Error('invalid varint encoding');
}
136
// Info that could help debug a query error. For example the query
// in question, the stack where the query was issued, the active
// plugin etc. Today it only carries the SQL text of the query.
export interface QueryErrorInfo {
  query: string;
}

// Error raised for a failed query. Besides the message, it remembers the SQL
// that was run so that toString() can print it above the standard rendering.
export class QueryError extends Error {
  readonly query: string;

  constructor(message: string, info: QueryErrorInfo) {
    super(message);
    const {query} = info;
    this.query = query;
  }

  // Produces `Query: <sql>` on the first line, then the usual Error text.
  override toString() {
    const errorText = super.toString();
    return `Query: ${this.query}\n${errorText}`;
  }
}
156
// One row extracted from an SQL result, keyed by column name. NULL cells are
// represented as null.
export interface Row {
  [key: string]: ColumnType|null;
}

// The methods that any iterator has to implement.
export interface RowIteratorBase {
  // True while the iterator points at a row.
  valid(): boolean;
  // Advances the iterator by one row. Check valid() afterwards.
  next(): void;

  // Reflection support for cases where the column names are not known upfront
  // (e.g. the query result table for user-provided SQL queries).
  // It throws if the passed column name doesn't exist.
  // Example usage:
  // for (const it = queryResult.iter({}); it.valid(); it.next()) {
  //   for (const columnName of queryResult.columns()) {
  //     console.log(it.get(columnName));
  //   }
  // }
  get(columnName: string): ColumnType|null;
}

// A RowIterator is a type that has all the fields defined in the query spec
// plus the valid() and next() operators. This is to ultimately allow the
// clients to do:
// const result = await engine.query("select name, surname, id from people;");
// const iter = result.iter({name: STR, surname: STR, id: NUM});
// for (; iter.valid(); iter.next())
//  console.log(iter.name, iter.surname);
export type RowIterator<T extends Row> = RowIteratorBase&T;
185
// Returns the constant name ('NUM', 'STR_NULL', ...) of a column-type
// sentinel, for diagnostics. Matching is done with ===, so BLOB / BLOB_NULL
// are recognized only by identity. Anything that is not one of the sentinels
// is rendered as `INVALID(<value>)`.
function columnTypeToString(t: ColumnType|null): string {
  const knownTypes: ReadonlyArray<readonly [ColumnType|null, string]> = [
    [NUM, 'NUM'],
    [NUM_NULL, 'NUM_NULL'],
    [STR, 'STR'],
    [STR_NULL, 'STR_NULL'],
    [BLOB, 'BLOB'],
    [BLOB_NULL, 'BLOB_NULL'],
    [LONG, 'LONG'],
    [LONG_NULL, 'LONG_NULL'],
  ];
  for (const [sentinel, name] of knownTypes) {
    if (sentinel === t) {
      return name;
    }
  }
  return `INVALID(${t})`;
}
208
// Tells whether a cell of wire type |actual| can be stored in a column whose
// spec type is |expected|:
// - NULL cells fit only the *_NULL specs;
// - VARINT cells fit NUM, NUM_NULL, LONG and LONG_NULL;
// - FLOAT64 cells fit NUM and NUM_NULL;
// - STRING cells fit STR and STR_NULL;
// - BLOB cells fit BLOB and BLOB_NULL.
// Spec values are compared by ===. Throws on an unknown CellType.
function isCompatible(actual: CellType, expected: ColumnType|null): boolean {
  let accepted: Array<ColumnType|null>;
  if (actual === CellType.CELL_NULL) {
    accepted = [NUM_NULL, STR_NULL, BLOB_NULL, LONG_NULL];
  } else if (actual === CellType.CELL_VARINT) {
    accepted = [NUM, NUM_NULL, LONG, LONG_NULL];
  } else if (actual === CellType.CELL_FLOAT64) {
    accepted = [NUM, NUM_NULL];
  } else if (actual === CellType.CELL_STRING) {
    accepted = [STR, STR_NULL];
  } else if (actual === CellType.CELL_BLOB) {
    accepted = [BLOB, BLOB_NULL];
  } else {
    throw new Error(`Unknown CellType ${actual}`);
  }
  return accepted.some((candidate) => candidate === expected);
}
227
// This has to match CellType in trace_processor.proto.
// Every cell of a CellsBatch is tagged with one of these values in the packed
// cell-type array; RowIteratorImpl.next() reads one byte per cell from it and
// switches on these values to decide where the cell's payload lives.
enum CellType {
  CELL_NULL = 1,
  CELL_VARINT = 2,
  CELL_FLOAT64 = 3,
  CELL_STRING = 4,
  CELL_BLOB = 5,
}

// Human-readable names indexed by CellType value (index 0 is not a valid
// CellType). NOTE(review): not referenced in the visible part of this file;
// presumably used for diagnostics further down — confirm.
const CELL_TYPE_NAMES =
    ['UNKNOWN', 'NULL', 'VARINT', 'FLOAT64', 'STRING', 'BLOB'];

// Protobuf wire type 2 (length-delimited): the value of `tag & 7` for
// strings, bytes, sub-messages and packed repeated fields.
const TAG_LEN_DELIM = 2;
241
// This is the interface exposed to readers (e.g. tracks). The underlying object
// (QueryResultImpl) owns the result data; this interface allows obtaining
// iterators over it. In future it will allow to wait for incremental updates
// (new rows being fetched) for streaming queries.
export interface QueryResult {
  // Obtains an iterator.
  // TODO(primiano): this should have an option to destruct data as we read. In
  // the case of a long query (e.g. `SELECT * FROM sched` in the query prompt)
  // we don't want to accumulate everything in memory. OTOH UI tracks want to
  // keep the data around so they can redraw them on each animation frame. For
  // now we keep everything in memory in the QueryResultImpl object.
  iter<T extends Row>(spec: T): RowIterator<T>;

  // Like iter() for queries that expect only one row. It embeds the valid()
  // check (i.e. throws if no rows are available) and returns directly the
  // first result.
  firstRow<T extends Row>(spec: T): T;

  // If != undefined the query errored out and error() contains the message.
  error(): string|undefined;

  // Returns the number of rows accumulated so far. Note that this number can
  // change over time as more batches are received. It becomes stable only
  // when isComplete() returns true or after waitAllRows() is resolved.
  numRows(): number;

  // If true all rows have been fetched. Calling iter() will iterate through the
  // last row. If false, iter() will return an iterator which might iterate
  // through some rows (or none) but will surely not reach the end.
  isComplete(): boolean;

  // Returns a promise that is resolved only when all rows (i.e. all batches)
  // have been fetched. The promise return value is always the object itself.
  // QueryResultImpl rejects it with a QueryError if the query failed, and
  // asserts that this method is called at most once.
  waitAllRows(): Promise<QueryResult>;

  // Returns a promise that is resolved when either:
  // - more rows are available
  // - all rows are available
  // The promise return value is always the object itself.
  // QueryResultImpl rejects it with a QueryError if the query failed.
  waitMoreRows(): Promise<QueryResult>;

  // Can return an empty array if called before the first batch is resolved.
  // This should be called only after having awaited for at least one batch.
  columns(): string[];

  // Returns the number of SQL statements in the query
  // (e.g. 2 for 'SELECT 1; SELECT 2;').
  statementCount(): number;

  // Returns the number of SQL statements that produced output rows. This
  // number is <= statementCount().
  statementWithOutputCount(): number;

  // Returns the SQL text of the last statement.
  lastStatementSql(): string;
}

// Interface exposed to engine.ts to pump in the data as new row batches arrive.
export interface WritableQueryResult extends QueryResult {
  // |resBytes| is a proto-encoded trace_processor.QueryResult message.
  // The overall flow looks as follows:
  // - The user calls engine.query('select ...') and gets a QueryResult back.
  // - The query call posts a message to the worker that runs the SQL engine
  //   (or sends an HTTP request in case of the RPC+HTTP interface).
  // - The returned QueryResult object is initially empty.
  // - Over time, the SQL engine will postMessage() back results in batches.
  // - Each batch will end up calling this appendResultBatch() method.
  // - If there is any pending promise (e.g. the caller called
  //   queryResult.waitAllRows()), this call will wake it up (if this is the
  //   last batch).
  appendResultBatch(resBytes: Uint8Array): void;
}
316
// QueryResultImpl is the single object shared by both ends of a query: readers
// (e.g. track code) consume it through the QueryResult interface, while the
// Engine feeds proto-encoded row batches into it through WritableQueryResult.
// Because one instance plays both roles, appending a batch can directly settle
// the promises that readers are waiting on.
class QueryResultImpl implements QueryResult, WritableQueryResult {
  // Column names taken from the first batch. Duplicate names are made unique
  // with a `_<n>` suffix (see appendResultBatch()).
  columnNames: string[] = [];
  private _error?: string;
  private _numRows = 0;
  private _isComplete = false;
  private _statementCount = 0;
  private _statementWithOutputCount = 0;
  private _lastStatementSql = '';

  // Every ResultBatch received so far, in arrival order.
  // TODO(primiano): for the moment new batches are appended but old batches
  // are never removed. This won't work with abnormally large result sets, as
  // it will stash all rows in memory. We could switch to a model where the
  // iterator is destructive and deletes batch objects once iterating past the
  // end of each batch. If we do that, than we need to assign monotonic IDs to
  // batches. Also if we do that, we should prevent creating more than one
  // iterator for a QueryResult.
  batches: ResultBatch[] = [];

  // Created by waitAllRows(); settled once the last batch has been appended.
  private allRowsPromise?: Deferred<QueryResult>;

  // Created by waitMoreRows() while the result is incomplete; settled and
  // cleared by the next appendResultBatch() call.
  private moreRowsPromise?: Deferred<QueryResult>;

  constructor(private readonly _errorInfo: QueryErrorInfo) {}

  // --- QueryResult implementation.

  isComplete(): boolean {
    return this._isComplete;
  }

  numRows(): number {
    return this._numRows;
  }

  error(): string|undefined {
    return this._error;
  }

  columns(): string[] {
    return this.columnNames;
  }

  statementCount(): number {
    return this._statementCount;
  }

  statementWithOutputCount(): number {
    return this._statementWithOutputCount;
  }

  lastStatementSql(): string {
    return this._lastStatementSql;
  }

  iter<T extends Row>(spec: T): RowIterator<T> {
    return new RowIteratorImplWithRowData(spec, this) as {} as RowIterator<T>;
  }

  firstRow<T extends Row>(spec: T): T {
    const it = this.iter(spec);
    assertTrue(it.valid());
    return it;
  }

  // Must be called at most once (asserted).
  waitAllRows(): Promise<QueryResult> {
    assertTrue(this.allRowsPromise === undefined);
    const deferred = defer<QueryResult>();
    this.allRowsPromise = deferred;
    if (this._isComplete) {
      this.resolveOrReject(deferred, this);
    }
    return deferred;
  }

  // Concurrent callers share the same pending promise. Once the result is
  // complete, every call gets a fresh, already-settled promise.
  waitMoreRows(): Promise<QueryResult> {
    const pending = this.moreRowsPromise;
    if (pending !== undefined) {
      return pending;
    }

    const deferred = defer<QueryResult>();
    if (this._isComplete) {
      this.resolveOrReject(deferred, this);
    } else {
      this.moreRowsPromise = deferred;
    }
    return deferred;
  }

  // --- WritableQueryResult implementation.

  // Invoked by the engine for each trace_processor.QueryResult message it
  // receives. A single query can produce several of these when its result is
  // large. |resBytes| is kept without copying: ProtoRingBuffer already
  // slice()s it (or hands over the buffer that came from postMessage() in the
  // Wasm case, or from fetch() in the HTTP+RPC case).
  appendResultBatch(resBytes: Uint8Array) {
    const reader = protobuf.Reader.create(resBytes);
    assertTrue(reader.pos === 0);
    const hadNoColumnsBefore = this.columnNames.length === 0;
    const seenNames = new Set<string>();

    while (reader.pos < reader.len) {
      const tag = reader.uint32();
      const fieldId = tag >>> 3;
      switch (fieldId) {
        case 1: {  // column_names
          // Column names are expected only in the first message of the
          // stream; getting them later means the batch stream is broken.
          assertTrue(hadNoColumnsBefore);
          const baseName = reader.string();
          let name = baseName;
          // Queries like `select 1 as x, 2 as x` can yield duplicate names
          // (b/194891824). Names become iterator object keys, so duplicates
          // would clash; disambiguate them with a numeric suffix instead.
          for (let suffix = 1; seenNames.has(name); ++suffix) {
            name = `${baseName}_${suffix}`;
            assertTrue(suffix < 100);  // Give up at some point.
          }
          seenNames.add(name);
          this.columnNames.push(name);
          break;
        }

        case 2: {  // error
          // Protos don't distinguish an absent string from an empty one, so
          // only a non-empty message counts as an error.
          const msg = reader.string();
          this._error = (msg !== undefined && msg.length > 0) ? msg : undefined;
          break;
        }

        case 3: {  // batch
          const len = reader.uint32();
          const raw = resBytes.subarray(reader.pos, reader.pos + len);
          reader.pos += len;

          // Parsing of the CellsBatch submessage happens in the ctor.
          const batch = new ResultBatch(raw);
          this.batches.push(batch);
          this._isComplete = batch.isLastBatch;

          // Column names are assumed to precede the cell batches. A valid
          // proto could order them differently, but the QueryResultSerializer
          // never does.
          const numColumns = this.columnNames.length;
          if (numColumns === 0) {
            // Zero columns is expected for statements like CREATE TABLE.
            assertTrue(batch.numCells === 0);
          } else {
            assertTrue(batch.numCells % numColumns === 0);
            this._numRows += batch.numCells / numColumns;
          }
          break;
        }

        case 4:  // statement_count
          this._statementCount = reader.uint32();
          break;

        case 5:  // statement_with_output_count
          this._statementWithOutputCount = reader.uint32();
          break;

        case 6:  // last_statement_sql
          this._lastStatementSql = reader.string();
          break;

        default:
          console.warn(`Unexpected QueryResult field ${fieldId}`);
          reader.skipType(tag & 7);
          break;
      }
    }

    if (this.moreRowsPromise !== undefined) {
      this.resolveOrReject(this.moreRowsPromise, this);
      this.moreRowsPromise = undefined;
    }

    if (this._isComplete && this.allRowsPromise !== undefined) {
      this.resolveOrReject(this.allRowsPromise, this);
    }
  }

  // Returns the waitAllRows() promise, creating it on first use.
  ensureAllRowsPromise(): Promise<QueryResult> {
    return this.allRowsPromise ?? this.waitAllRows();
  }

  // Settles |promise|: it is rejected with a QueryError if the query reported
  // an error, and resolved with |arg| otherwise.
  private resolveOrReject(promise: Deferred<QueryResult>, arg: QueryResult) {
    const err = this._error;
    if (err !== undefined) {
      promise.reject(new QueryError(err, this._errorInfo));
      return;
    }
    promise.resolve(arg);
  }
}
521
// This class holds onto a received result batch (a Uint8Array) and does some
// partial parsing to tokenize the various cell groups. For cell types and
// varints it only records offsets into |batchBytes|. Float64 cells are overlaid
// as (or copied into) a Float64Array, strings are UTF-8 decoded in one go, and
// blobs are copied. This half parsing is done to keep the iterator's next()
// fast, without decoding everything into memory.
// This is an internal implementation detail and is not exposed outside. The
// RowIteratorImpl uses this class as its data source: it reads the cells of a
// batch one row at a time and takes care of switching batches when needed.
// Note: at any point in time there can be more than one RowIteratorImpl
// referencing the same batch. The batch must be immutable.
class ResultBatch {
  // Value of the is_last_batch field.
  readonly isLastBatch: boolean = false;
  // The raw CellsBatch bytes. The *Off fields below are offsets relative to
  // the start of this array (not to its underlying ArrayBuffer).
  readonly batchBytes: Uint8Array;
  // Location of the packed cell types, one byte per cell.
  readonly cellTypesOff: number = 0;
  readonly cellTypesLen: number = 0;
  // Location of the packed varint cells, decoded lazily by RowIteratorImpl.
  readonly varintOff: number = 0;
  readonly varintLen: number = 0;
  // All float64 cells, in order of appearance.
  readonly float64Cells = new Float64Array();
  // All blob cells, one copied Uint8Array per cell.
  readonly blobCells: Uint8Array[] = [];
  // All string cells, in order of appearance.
  readonly stringCells: string[] = [];

  // batchBytes is a trace_processor.QueryResult.CellsBatch proto.
  constructor(batchBytes: Uint8Array) {
    this.batchBytes = batchBytes;
    const reader = protobuf.Reader.create(batchBytes);
    assertTrue(reader.pos === 0);
    const end = reader.len;

    // Here we deconstruct the proto by hand. The CellsBatch is carefully
    // designed to allow a very fast parsing from the TS side. We pack all cells
    // of the same types together, so we can do only one call (per batch) to
    // TextDecoder.decode(), we can overlay a memory-aligned typedarray for
    // float values and can quickly tell and type-check the cell types.
    // One row = N cells (we know the number upfront from the outer message).
    // Each batch contains always an integer multiple of N cells (i.e. rows are
    // never fragmented across different batches).
    // Every length-delimited section is bounds-checked against |end| (which
    // is relative to |batchBytes|, like reader.pos).
    while (reader.pos < end) {
      const tag = reader.uint32();
      switch (tag >>> 3) {
        case 1: {  // cell types, a packed array containing one CellType per cell.
          assertTrue((tag & 7) === TAG_LEN_DELIM);  // Must be packed varint.
          this.cellTypesLen = reader.uint32();
          this.cellTypesOff = reader.pos;
          assertTrue(this.cellTypesOff + this.cellTypesLen <= end);
          reader.pos += this.cellTypesLen;
          break;
        }

        case 2: {  // varint_cells, a packed varint buffer.
          assertTrue((tag & 7) === TAG_LEN_DELIM);  // Must be packed varint.
          const packLen = reader.uint32();
          this.varintOff = reader.pos;
          this.varintLen = packLen;
          assertTrue(reader.buf === batchBytes);
          // |varintOff| is relative to |batchBytes|, so it must be bounded by
          // |end|. (Comparing it against byteOffset + byteLength, as this used
          // to do, is vacuous whenever batchBytes is a subarray.)
          assertTrue(this.varintOff + this.varintLen <= end);
          reader.pos += packLen;
          break;
        }

        case 3: {  // float64_cells, a 64-bit aligned packed fixed64 buffer.
          assertTrue((tag & 7) === TAG_LEN_DELIM);  // Must be packed fixed64.
          const f64Len = reader.uint32();
          assertTrue(f64Len % 8 === 0);
          assertTrue(reader.pos + f64Len <= end);
          // Float64Array's constructor is evil: the offset is in bytes but the
          // length is in 8-byte words.
          const f64Words = f64Len / 8;
          const f64Off = batchBytes.byteOffset + reader.pos;
          if (f64Off % 8 === 0) {
            this.float64Cells =
                new Float64Array(batchBytes.buffer, f64Off, f64Words);
          } else {
            // When using the production code in trace_processor's rpc.cc, the
            // float64 should be 8-bytes aligned. The slow-path case is only for
            // tests.
            const slice = batchBytes.buffer.slice(f64Off, f64Off + f64Len);
            this.float64Cells = new Float64Array(slice);
          }
          reader.pos += f64Len;
          break;
        }

        case 4: {  // blob_cells: one entry per blob.
          assertTrue((tag & 7) === TAG_LEN_DELIM);
          // protobufjs's bytes() under the hoods calls slice() and creates
          // a copy. Fine here as blobs are rare and not a fastpath.
          this.blobCells.push(new Uint8Array(reader.bytes()));
          break;
        }

        case 5: {  // string_cells: all the string cells concatenated with \0s.
          assertTrue((tag & 7) === TAG_LEN_DELIM);
          const strLen = reader.uint32();
          assertTrue(reader.pos + strLen <= end);
          const subArr = batchBytes.subarray(reader.pos, reader.pos + strLen);
          assertTrue(subArr.length === strLen);
          // The reason why we do this split rather than creating one string
          // per entry is that utf8 decoding has some non-negligible cost. See
          // go/postmessage-benchmark .
          this.stringCells = utf8Decode(subArr).split('\0');
          reader.pos += strLen;
          break;
        }

        case 6:  // is_last_batch (boolean).
          this.isLastBatch = !!reader.bool();
          break;

        case 7:  // padding for realignment, skip silently.
          reader.skipType(tag & 7);
          break;

        default:
          console.warn(`Unexpected QueryResult.CellsBatch field ${tag >>> 3}`);
          reader.skipType(tag & 7);
          break;
      }  // switch(tag)
    }    // while (pos < end)
  }

  // Number of cells in the batch. The CellType values are all < 128, so each
  // one occupies exactly one byte of the packed cell-type array.
  get numCells() {
    return this.cellTypesLen;
  }
}
642
643class RowIteratorImpl implements RowIteratorBase {
  // The spec passed to the iter call containing the expected types, e.g.:
  // {'colA': NUM, 'colB': NUM_NULL, 'colC': STR}.
  // This doesn't ever change (it is a shallow copy taken in the constructor).
  readonly rowSpec: Row;

  // The object that holds the current row. This points to the parent
  // RowIteratorImplWithRowData instance that created this class.
  // next() writes one entry per result column into it.
  rowData: Row;

  // The QueryResult object we are reading data from. The engine will pump
  // batches over time into this object.
  private resultObj: QueryResultImpl;

  // The member variables in the group below cache state of the batch at
  // resultObj.batches[batchIdx] (columnNames / numColumns come from
  // resultObj.columnNames). This is to avoid indirection layers in the next()
  // hotpath, so we can do this.float64Cells vs
  // this.resultObj.batches[this.batchIdx].float64Cells.
  // These are re-set every time tryMoveToNextBatch() is called (and succeeds).
  private batchIdx = -1;  // The batch index within |resultObj.batches[]|.
  private batchBytes = new Uint8Array();
  private columnNames: string[] = [];
  private numColumns = 0;
  // One past the last cell-type byte of the current batch, as an offset into
  // |batchBytes|.
  private cellTypesEnd = -1;  // -1 so the 1st next() hits tryMoveToNextBatch().
  private float64Cells = new Float64Array();
  // Reader positioned over the packed varint cells of the current batch.
  private varIntReader = protobuf.Reader.create(this.batchBytes);
  private blobCells: Uint8Array[] = [];
  private stringCells: string[] = [];

  // These members instead are incremented as we read cells from next(). They
  // are the mutable state of the iterator.
  private nextCellTypeOff = 0;  // Offset into |batchBytes|.
  private nextFloat64Cell = 0;  // Index into |float64Cells|.
  private nextStringCell = 0;   // Index into |stringCells|.
  private nextBlobCell = 0;     // Index into |blobCells|.
  private isValid = false;
679
680  constructor(querySpec: Row, rowData: Row, res: QueryResultImpl) {
681    Object.assign(this, querySpec);
682    this.rowData = rowData;
683    this.rowSpec = {...querySpec};  // ... -> Copy all the key/value pairs.
684    this.resultObj = res;
685    this.next();
686  }
687
688  valid(): boolean {
689    return this.isValid;
690  }
691
692
693  get(columnName: string): ColumnType|null {
694    const res = this.rowData[columnName];
695    if (res === undefined) {
696      throw new Error(
697          `Column '${columnName}' doesn't exist. ` +
698          `Actual columns: [${this.columnNames.join(',')}]`);
699    }
700    return res;
701  }
702
703  // Moves the cursor next by one row and updates |isValid|.
704  // When this fails to move, two cases are possible:
705  // 1. We reached the end of the result set (this is the case if
706  //    QueryResult.isComplete() == true when this fails).
707  // 2. We reached the end of the current batch, but more rows might come later
708  //    (if QueryResult.isComplete() == false).
709  next() {
710    // At some point we might reach the end of the current batch, but the next
711    // batch might be available already. In this case we want next() to
712    // transparently move on to the next batch.
713    while (this.nextCellTypeOff + this.numColumns > this.cellTypesEnd) {
714      // If TraceProcessor is behaving well, we should never end up in a
715      // situation where we have leftover cells. TP is expected to serialize
716      // whole rows in each QueryResult batch and NOT truncate them midway.
717      // If this assert fires the TP RPC logic has a bug.
718      assertTrue(
719          this.nextCellTypeOff === this.cellTypesEnd ||
720          this.cellTypesEnd === -1);
721      if (!this.tryMoveToNextBatch()) {
722        this.isValid = false;
723        return;
724      }
725    }
726
727    const rowData = this.rowData;
728    const numColumns = this.numColumns;
729
730    // Read the current row.
731    for (let i = 0; i < numColumns; i++) {
732      const cellType = this.batchBytes[this.nextCellTypeOff++];
733      const colName = this.columnNames[i];
734      const expType = this.rowSpec[colName];
735
736      switch (cellType) {
737        case CellType.CELL_NULL:
738          rowData[colName] = null;
739          break;
740
741        case CellType.CELL_VARINT:
742          if (expType === NUM || expType === NUM_NULL) {
743            // This is very subtle. The return type of int64 can be either a
744            // number or a Long.js {high:number, low:number} if Long.js is
745            // installed. The default state seems different in node and browser.
746            // We force-disable Long.js support in the top of this source file.
747            const val = this.varIntReader.int64();
748            rowData[colName] = val as {} as number;
749          } else {
750            // LONG, LONG_NULL, or unspecified - return as bigint
751            const value =
752                decodeInt64Varint(this.batchBytes, this.varIntReader.pos);
753            rowData[colName] = value;
754            this.varIntReader.skip();  // Skips a varint
755          }
756          break;
757
758        case CellType.CELL_FLOAT64:
759          rowData[colName] = this.float64Cells[this.nextFloat64Cell++];
760          break;
761
762        case CellType.CELL_STRING:
763          rowData[colName] = this.stringCells[this.nextStringCell++];
764          break;
765
766        case CellType.CELL_BLOB:
767          const blob = this.blobCells[this.nextBlobCell++];
768          rowData[colName] = blob;
769          break;
770
771        default:
772          throw new Error(`Invalid cell type ${cellType}`);
773      }
774    }  // For (cells)
775    this.isValid = true;
776  }
777
778  private tryMoveToNextBatch(): boolean {
779    const nextBatchIdx = this.batchIdx + 1;
780    if (nextBatchIdx >= this.resultObj.batches.length) {
781      return false;
782    }
783
784    this.columnNames = this.resultObj.columnNames;
785    this.numColumns = this.columnNames.length;
786
787    this.batchIdx = nextBatchIdx;
788    const batch = assertExists(this.resultObj.batches[nextBatchIdx]);
789    this.batchBytes = batch.batchBytes;
790    this.nextCellTypeOff = batch.cellTypesOff;
791    this.cellTypesEnd = batch.cellTypesOff + batch.cellTypesLen;
792    this.float64Cells = batch.float64Cells;
793    this.blobCells = batch.blobCells;
794    this.stringCells = batch.stringCells;
795    this.varIntReader = protobuf.Reader.create(batch.batchBytes);
796    this.varIntReader.pos = batch.varintOff;
797    this.varIntReader.len = batch.varintOff + batch.varintLen;
798    this.nextFloat64Cell = 0;
799    this.nextStringCell = 0;
800    this.nextBlobCell = 0;
801
802    // Check that all the expected columns are present.
803    for (const expectedCol of Object.keys(this.rowSpec)) {
804      if (this.columnNames.indexOf(expectedCol) < 0) {
805        throw new Error(
806            `Column ${expectedCol} not found in the SQL result ` +
807            `set {${this.columnNames.join(' ')}}`);
808      }
809    }
810
811    // Check that the cells types are consistent.
812    const numColumns = this.numColumns;
813    if (batch.numCells === 0) {
814      // This can happen if the query result contains just an error. In this
815      // an empty batch with isLastBatch=true is appended as an EOF marker.
816      // In theory TraceProcessor could return an empty batch in the middle and
817      // that would be fine from a protocol viewpoint. In practice, no code path
818      // does that today so it doesn't make sense trying supporting it with a
819      // recursive call to tryMoveToNextBatch().
820      assertTrue(batch.isLastBatch);
821      return false;
822    }
823
824    assertTrue(numColumns > 0);
825    for (let i = this.nextCellTypeOff; i < this.cellTypesEnd; i++) {
826      const col = (i - this.nextCellTypeOff) % numColumns;
827      const colName = this.columnNames[col];
828      const actualType = this.batchBytes[i] as CellType;
829      const expType = this.rowSpec[colName];
830
831      // If undefined, the caller doesn't want to read this column at all, so
832      // it can be whatever.
833      if (expType === undefined) continue;
834
835      let err = '';
836      if (!isCompatible(actualType, expType)) {
837        if (actualType === CellType.CELL_NULL) {
838          err = 'SQL value is NULL but that was not expected' +
839              ` (expected type: ${columnTypeToString(expType)}). ` +
840              'Did you mean NUM_NULL, LONG_NULL, STR_NULL or BLOB_NULL?';
841        } else {
842          err = `Incompatible cell type. Expected: ${
843              columnTypeToString(
844                  expType)} actual: ${CELL_TYPE_NAMES[actualType]}`;
845        }
846      }
847      if (err.length > 0) {
848        throw new Error(
849            `Error @ row: ${Math.floor(i / numColumns)} col: '` +
850            `${colName}': ${err}`);
851      }
852    }
853    return true;
854  }
855}
856
/**
 * The iterator object handed back to clients by QueryResult.iter(...).
 *
 * It is kept separate from RowIteratorImpl so that the member variables
 * RowIteratorImpl needs can never collide with column names exposed by the
 * iterator. Column names are set as properties directly on this object.
 * Apart from |_impl|, the only members here are the bound next/valid/get
 * methods.
 */
class RowIteratorImplWithRowData implements RowIteratorBase {
  private _impl: RowIteratorImpl;

  next: () => void;
  valid: () => boolean;
  get: (columnName: string) => ColumnType|null;

  constructor(querySpec: Row, res: QueryResultImpl) {
    // Give |this| one own property per key of |querySpec|, then hand |this|
    // (viewed as a Row) to the implementation. Presumably RowIteratorImpl
    // writes each row's values into that object; verify against its ctor.
    // Object.assign returns its target, so |rowView| is |this|.
    const rowView: Row = Object.assign(this as {} as Row, querySpec);
    const impl = new RowIteratorImpl(querySpec, rowView, res);
    this._impl = impl;

    // Expose the implementation's methods, bound so that |this| inside them
    // is the RowIteratorImpl rather than this wrapper.
    this.next = impl.next.bind(impl);
    this.valid = impl.valid.bind(impl);
    this.get = impl.get.bind(impl);
  }
}
878
/**
 * Await-able proxy around a QueryResultImpl.
 *
 * Two usage patterns are supported:
 * 1. Callers that only want the complete result set can simply
 *    `await engine.query('...')`. The awaited QueryResult is then guaranteed
 *    to be complete.
 * 2. Callers that handle streaming can use the QueryResult methods
 *    immediately.
 *
 * Every QueryResult / WritableQueryResult method simply forwards to the
 * wrapped QueryResultImpl.
 */
class WaitableQueryResultImpl implements QueryResult, WritableQueryResult,
                                         PromiseLike<QueryResult> {
  private impl: QueryResultImpl;
  // Flipped by the first then() call. then() asserts it is still false.
  private thenCalled = false;

  constructor(errorInfo: QueryErrorInfo) {
    this.impl = new QueryResultImpl(errorInfo);
  }

  // ---- QueryResult: forwarded verbatim to |impl|. ----

  iter<T extends Row>(spec: T) {
    return this.impl.iter(spec);
  }

  firstRow<T extends Row>(spec: T) {
    return this.impl.firstRow(spec);
  }

  waitAllRows() {
    return this.impl.waitAllRows();
  }

  waitMoreRows() {
    return this.impl.waitMoreRows();
  }

  isComplete() {
    return this.impl.isComplete();
  }

  numRows() {
    return this.impl.numRows();
  }

  columns() {
    return this.impl.columns();
  }

  error() {
    return this.impl.error();
  }

  statementCount() {
    return this.impl.statementCount();
  }

  statementWithOutputCount() {
    return this.impl.statementWithOutputCount();
  }

  lastStatementSql() {
    return this.impl.lastStatementSql();
  }

  // ---- WritableQueryResult: forwarded verbatim to |impl|. ----

  appendResultBatch(resBytes: Uint8Array) {
    return this.impl.appendResultBatch(resBytes);
  }

  // ---- PromiseLike<QueryResult> implementation. ----
  // Each method chains onto the promise returned by
  // impl.ensureAllRowsPromise().

  then(onfulfilled: any, onrejected: any): any {
    // then() may be invoked at most once per object.
    assertFalse(this.thenCalled);
    this.thenCalled = true;
    const allRows = this.impl.ensureAllRowsPromise();
    return allRows.then(onfulfilled, onrejected);
  }

  catch(error: any): any {
    const allRows = this.impl.ensureAllRowsPromise();
    return allRows.catch(error);
  }

  finally(callback: () => void): any {
    const allRows = this.impl.ensureAllRowsPromise();
    return allRows.finally(callback);
  }

  // eslint and clang-format disagree on how to format get[foo](). Let
  // clang-format win:
  // eslint-disable-next-line keyword-spacing
  get[Symbol.toStringTag](): string {
    return 'Promise<WaitableQueryResult>';
  }
}
957
/**
 * Creates a query result object. It can be read as a QueryResult, fed
 * with row batches as a WritableQueryResult, and awaited like a Promise.
 *
 * @param errorInfo passed through to the wrapped QueryResultImpl.
 * @returns a new WaitableQueryResultImpl.
 */
export function createQueryResult(errorInfo: QueryErrorInfo): QueryResult&
    Promise<QueryResult>&WritableQueryResult {
  const waitable = new WaitableQueryResultImpl(errorInfo);
  return waitable;
}
962