// Copyright (C) 2021 The Android Open Source Project
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//      http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// This file deals with deserialization and iteration of the proto-encoded
// byte buffer that is returned by TraceProcessor when invoking the
// TPM_QUERY_STREAMING method. The returned |query_result| buffer is optimized
// for being moved cheaply across workers and decoded on-the-fly as we step
// through the iterator.
// See comments around QueryResult in trace_processor.proto for more details.

// The classes in this file are organized as follows:
//
// QueryResultImpl:
// The object returned by the Engine.query(sql) method.
// This object is a holder of row data. Batches of rows get appended
// incrementally as they are received from the remote TraceProcessor instance.
// QueryResultImpl also deals with asynchronicity of queries and allows callers
// to obtain a promise that waits for more (or all) rows.
// At any point in time the following objects hold a reference to QueryResult:
// - The Engine: for appending row batches.
// - UI code, typically controllers, who make queries.
//
// ResultBatch:
// Holds the data, returned by the remote TraceProcessor instance, for a number
// of rows (TP typically chunks the results in batches of 128KB).
// A QueryResultImpl holds exclusively ResultBatches for a given query.
// ResultBatch is not exposed externally, it's just an internal representation
// that helps with proto decoding. ResultBatch is immutable after it gets
// appended and decoded. The iteration state is held by the RowIteratorImpl.
//
// RowIteratorImpl:
// Decouples the data owned by QueryResultImpl (and its ResultBatch(es)) from
// the iteration state. The iterator effectively is the union of a ResultBatch
// and the row number in it. Rows within the batch are decoded as the user calls
// next(). When getting at the end of the batch, it takes care of switching to
// the next batch (if any) within the QueryResultImpl.
// This object is part of the API exposed to tracks / controllers.

// Import below commented out to prevent the protobufjs initialization with:
// `protobuf.util.Long = undefined as any;`
// The Winscope parsers need the 64-bit proto fields to be retrieved as Long
// instead of number, otherwise data (e.g. state flags) would be lost because
// of the 53-bit integer limitation.
// import './static_initializers';
import protobuf from 'protobufjs/minimal';
import {defer, Deferred} from './deferred';
import {assertExists, assertFalse, assertTrue} from './logging';
import {utf8Decode} from './string_utils';

// Sentinel values used in the spec passed to QueryResult.iter() /
// QueryResult.firstRow() to declare the expected type of each column. Only
// their identity matters (they are compared with ===), not their value.
export const NUM = 0;
export const STR = 'str';
export const NUM_NULL: number|null = 1;
export const STR_NULL: string|null = 'str_null';
export const BLOB = new Uint8Array();
export const BLOB_NULL: Uint8Array|null = new Uint8Array();
export const LONG: bigint = 0n;
export const LONG_NULL: bigint|null = 1n;

export type ColumnType = string|number|bigint|Uint8Array;
export type SqlValue = ColumnType|null;

const SHIFT_32BITS = 32n;

// Fast decode varint int64 into a bigint.
// Inspired by
// https://github.com/protobufjs/protobuf.js/blob/56b1e64979dae757b67a21d326e16acee39f2267/src/reader.js#L123
//
// |buf| is the buffer holding the varint and |pos| the offset of its first
// byte. The low and high 32 bits are accumulated in two plain numbers and
// combined into a bigint only once, at the end.
// Returns the decoded value, interpreted as a signed 64-bit integer.
// Throws Error('Index out of range') if the buffer ends before the varint
// terminates, and Error('invalid varint encoding') if the varint is longer
// than 10 bytes.
export function decodeInt64Varint(buf: Uint8Array, pos: number): bigint {
  let hi: number = 0;
  let lo: number = 0;
  let i = 0;

  if (buf.length - pos > 4) {  // fast route (lo)
    for (; i < 4; ++i) {
      // 1st..4th
      lo = (lo | (buf[pos] & 127) << i * 7) >>> 0;
      if (buf[pos++] < 128) {
        return BigInt(lo);
      }
    }
    // 5th: its 7 payload bits straddle the lo/hi 32-bit boundary.
    lo = (lo | (buf[pos] & 127) << 28) >>> 0;
    hi = (hi | (buf[pos] & 127) >> 4) >>> 0;
    if (buf[pos++] < 128) {
      return BigInt(hi) << SHIFT_32BITS | BigInt(lo);
    }
    i = 0;
  } else {
    // Slow route: at most 4 bytes are left in the buffer, so every read must
    // be bounds-checked and a well-formed varint must terminate within them.
    for (; i < 4; ++i) {
      if (pos >= buf.length) {
        throw Error('Index out of range');
      }
      // 1st..4th
      lo = (lo | (buf[pos] & 127) << i * 7) >>> 0;
      if (buf[pos++] < 128) {
        return BigInt(lo);
      }
    }
    // All the remaining bytes had the continuation bit set: the varint is
    // truncated. Fail loudly rather than returning a partial value.
    throw Error('Index out of range');
  }
  if (buf.length - pos > 4) {  // fast route (hi)
    for (; i < 5; ++i) {
      // 6th..10th
      hi = (hi | (buf[pos] & 127) << i * 7 + 3) >>> 0;
      if (buf[pos++] < 128) {
        const big = BigInt(hi) << SHIFT_32BITS | BigInt(lo);
        return BigInt.asIntN(64, big);
      }
    }
  } else {
    for (; i < 5; ++i) {
      if (pos >= buf.length) {
        throw Error('Index out of range');
      }
      // 6th..10th
      hi = (hi | (buf[pos] & 127) << i * 7 + 3) >>> 0;
      if (buf[pos++] < 128) {
        const big = BigInt(hi) << SHIFT_32BITS | BigInt(lo);
        return BigInt.asIntN(64, big);
      }
    }
  }
  throw Error('invalid varint encoding');
}

// Info that could help debug a query error. For example the query
// in question, the stack where the query was issued, the active
// plugin etc.
export interface QueryErrorInfo {
  query: string;
}

// Error used to reject the promises of a failed query. It carries the SQL
// text of the query that failed.
export class QueryError extends Error {
  readonly query: string;

  constructor(message: string, info: QueryErrorInfo) {
    super(message);
    this.query = info.query;
  }

  // Prefixes the standard Error string with the offending query.
  override toString() {
    return `Query: ${this.query}\n` + super.toString();
  }
}

// One row extracted from an SQL result:
export interface Row {
  [key: string]: ColumnType|null;
}

// The methods that any iterator has to implement.
export interface RowIteratorBase {
  valid(): boolean;
  next(): void;

  // Reflection support for cases where the column names are not known upfront
  // (e.g. the query result table for user-provided SQL queries).
  // It throws if the passed column name doesn't exist.
  // Example usage:
  // for (const it = queryResult.iter({}); it.valid(); it.next()) {
  //   for (const columnName : queryResult.columns()) {
  //     console.log(it.get(columnName));
  get(columnName: string): ColumnType|null;
}

// A RowIterator is a type that has all the fields defined in the query spec
// plus the valid() and next() operators. This is to ultimately allow the
// clients to do:
// const result = await engine.query("select name, surname, id from people;");
// const iter = queryResult.iter({name: STR, surname: STR, id: NUM});
// for (; iter.valid(); iter.next())
//  console.log(iter.name, iter.surname);
export type RowIterator<T extends Row> = RowIteratorBase&T;

// Returns the human readable name of a column type sentinel (NUM, STR, ...),
// or 'INVALID(...)' if |t| is not one of them. Used for error messages.
function columnTypeToString(t: ColumnType|null): string {
  switch (t) {
    case NUM:
      return 'NUM';
    case NUM_NULL:
      return 'NUM_NULL';
    case STR:
      return 'STR';
    case STR_NULL:
      return 'STR_NULL';
    case BLOB:
      return 'BLOB';
    case BLOB_NULL:
      return 'BLOB_NULL';
    case LONG:
      return 'LONG';
    case LONG_NULL:
      return 'LONG_NULL';
    default:
      return `INVALID(${t})`;
  }
}

// Returns true if a cell of type |actual| can be read through a column that
// the caller declared as |expected|. Throws if |actual| is not a known
// CellType.
function isCompatible(actual: CellType, expected: ColumnType|null): boolean {
  switch (actual) {
    case CellType.CELL_NULL:
      return expected === NUM_NULL || expected === STR_NULL ||
          expected === BLOB_NULL || expected === LONG_NULL;
    case CellType.CELL_VARINT:
      return expected === NUM || expected === NUM_NULL || expected === LONG ||
          expected === LONG_NULL;
    case CellType.CELL_FLOAT64:
      return expected === NUM || expected === NUM_NULL;
    case CellType.CELL_STRING:
      return expected === STR || expected === STR_NULL;
    case CellType.CELL_BLOB:
      return expected === BLOB || expected === BLOB_NULL;
    default:
      throw new Error(`Unknown CellType ${actual}`);
  }
}

// This has to match CellType in trace_processor.proto.
enum CellType {
  CELL_NULL = 1,
  CELL_VARINT = 2,
  CELL_FLOAT64 = 3,
  CELL_STRING = 4,
  CELL_BLOB = 5,
}

// Human readable names for error messages, indexed by CellType value.
const CELL_TYPE_NAMES =
    ['UNKNOWN', 'NULL', 'VARINT', 'FLOAT64', 'STRING', 'BLOB'];

// Protobuf wire type of length-delimited fields.
const TAG_LEN_DELIM = 2;

// This is the interface exposed to readers (e.g. tracks). The underlying object
// (QueryResultImpl) owns the result data. This allows to obtain iterators on
// that. In future it will allow to wait for incremental updates (new rows being
// fetched) for streaming queries.
export interface QueryResult {
  // Obtains an iterator.
  // TODO(primiano): this should have an option to destruct data as we read. In
  // the case of a long query (e.g. `SELECT * FROM sched` in the query prompt)
  // we don't want to accumulate everything in memory. OTOH UI tracks want to
  // keep the data around so they can redraw them on each animation frame. For
  // now we keep everything in memory in the QueryResultImpl object.
  iter<T extends Row>(spec: T): RowIterator<T>;

  // Like iter() for queries that expect only one row. It embeds the valid()
  // check (i.e. throws if no rows are available) and returns directly the
  // first result.
  firstRow<T extends Row>(spec: T): T;

  // If != undefined the query errored out and error() contains the message.
  error(): string|undefined;

  // Returns the number of rows accumulated so far. Note that this number can
  // change over time as more batches are received. It becomes stable only
  // when isComplete() returns true or after waitAllRows() is resolved.
  numRows(): number;

  // If true all rows have been fetched. Calling iter() will iterate through the
  // last row. If false, iter() will return an iterator which might iterate
  // through some rows (or none) but will surely not reach the end.
  isComplete(): boolean;

  // Returns a promise that is resolved only when all rows (i.e. all batches)
  // have been fetched. The promise return value is always the object itself.
  waitAllRows(): Promise<QueryResult>;

  // Returns a promise that is resolved when either:
  // - more rows are available
  // - all rows are available
  // The promise return value is always the object itself.
  waitMoreRows(): Promise<QueryResult>;

  // Can return an empty array if called before the first batch is resolved.
  // This should be called only after having awaited for at least one batch.
  columns(): string[];

  // Returns the number of SQL statements in the query
  // (e.g. 2 if 'SELECT 1; SELECT 2;')
  statementCount(): number;

  // Returns the number of SQL statements that produced output rows. This
  // number is <= statementCount().
  statementWithOutputCount(): number;

  // Returns the last SQL statement.
  lastStatementSql(): string;
}

// Interface exposed to engine.ts to pump in the data as new row batches arrive.
export interface WritableQueryResult extends QueryResult {
  // |resBytes| is a proto-encoded trace_processor.QueryResult message.
  //  The overall flow looks as follows:
  // - The user calls engine.query('select ...') and gets a QueryResult back.
  // - The query call posts a message to the worker that runs the SQL engine (
  //   or sends a HTTP request in case of the RPC+HTTP interface).
  // - The returned QueryResult object is initially empty.
  // - Over time, the sql engine will postMessage() back results in batches.
  // - Each batch will end up calling this appendResultBatch() method.
  // - If there is any pending promise (e.g. the caller called
  //   queryResult.waitAllRows()), this call will awake them (if this is the
  //   last batch).
  appendResultBatch(resBytes: Uint8Array): void;
}

// The actual implementation, which bridges together the reader side and the
// writer side (the one exposed to the Engine). This is the same object so that
// when the engine pumps new row batches we can resolve pending promises that
// readers (e.g. track code) are waiting for.
class QueryResultImpl implements QueryResult, WritableQueryResult {
  columnNames: string[] = [];
  private _error?: string;
  private _numRows = 0;
  private _isComplete = false;
  private _errorInfo: QueryErrorInfo;
  private _statementCount = 0;
  private _statementWithOutputCount = 0;
  private _lastStatementSql = '';

  constructor(errorInfo: QueryErrorInfo) {
    this._errorInfo = errorInfo;
  }

  // --- QueryResult implementation.

  // TODO(primiano): for the moment new batches are appended but old batches
  // are never removed. This won't work with abnormally large result sets, as
  // it will stash all rows in memory. We could switch to a model where the
  // iterator is destructive and deletes batch objects once iterating past the
  // end of each batch. If we do that, then we need to assign monotonic IDs to
  // batches. Also if we do that, we should prevent creating more than one
  // iterator for a QueryResult.
  batches: ResultBatch[] = [];

  // Promise awaiting on waitAllRows(). This should be resolved only when the
  // last result batch has been retrieved.
  private allRowsPromise?: Deferred<QueryResult>;

  // Promise awaiting on waitMoreRows(). This is resolved when the next
  // batch is appended via appendResultBatch.
  private moreRowsPromise?: Deferred<QueryResult>;

  isComplete(): boolean {
    return this._isComplete;
  }
  numRows(): number {
    return this._numRows;
  }
  error(): string|undefined {
    return this._error;
  }
  columns(): string[] {
    return this.columnNames;
  }
  statementCount(): number {
    return this._statementCount;
  }
  statementWithOutputCount(): number {
    return this._statementWithOutputCount;
  }
  lastStatementSql(): string {
    return this._lastStatementSql;
  }

  iter<T extends Row>(spec: T): RowIterator<T> {
    const impl = new RowIteratorImplWithRowData(spec, this);
    return impl as {} as RowIterator<T>;
  }

  // Asserts that at least one row is available and returns the iterator
  // positioned on it, typed as the row itself.
  firstRow<T extends Row>(spec: T): T {
    const impl = new RowIteratorImplWithRowData(spec, this);
    assertTrue(impl.valid());
    return impl as {} as RowIterator<T>as T;
  }

  // Can be called only once.
  waitAllRows(): Promise<QueryResult> {
    assertTrue(this.allRowsPromise === undefined);
    this.allRowsPromise = defer<QueryResult>();
    if (this._isComplete) {
      this.resolveOrReject(this.allRowsPromise, this);
    }
    return this.allRowsPromise;
  }

  // Returns the pending "more rows" promise if there is one. Otherwise creates
  // a new one, which is settled straight away if the result is complete.
  waitMoreRows(): Promise<QueryResult> {
    if (this.moreRowsPromise !== undefined) {
      return this.moreRowsPromise;
    }

    const moreRowsPromise = defer<QueryResult>();
    if (this._isComplete) {
      this.resolveOrReject(moreRowsPromise, this);
    } else {
      this.moreRowsPromise = moreRowsPromise;
    }
    return moreRowsPromise;
  }

  // --- WritableQueryResult implementation.

  // Called by the engine when a new QueryResult is available. Note that a
  // single Query() call can yield >1 QueryResult due to result batching
  // if more than ~64K of data are returned, e.g. when returning O(M) rows.
  // |resBytes| is a proto-encoded trace_processor.QueryResult message.
  // It is fine to retain the resBytes without slicing a copy, because
  // ProtoRingBuffer does the slice() for us (or passes through the buffer
  // coming from postMessage() (Wasm case) of fetch() (HTTP+RPC case).
  appendResultBatch(resBytes: Uint8Array) {
    const reader = protobuf.Reader.create(resBytes);
    assertTrue(reader.pos === 0);
    const columnNamesEmptyAtStartOfBatch = this.columnNames.length === 0;
    const columnNamesSet = new Set<string>();
    while (reader.pos < reader.len) {
      const tag = reader.uint32();
      switch (tag >>> 3) {
        case 1: {  // column_names
          // Only the first batch should contain the column names. If this fires
          // something is going wrong in the handling of the batch stream.
          assertTrue(columnNamesEmptyAtStartOfBatch);
          const origColName = reader.string();
          let colName = origColName;
          // In some rare cases two columns can have the same name (b/194891824)
          // e.g. `select 1 as x, 2 as x`. These queries don't happen in the
          // UI code, but they can happen when the user types a query (e.g.
          // with a join). The most practical thing we can do here is renaming
          // the columns with a suffix. Keeping the same name will break when
          // iterating, because column names become iterator object keys.
          for (let i = 1; columnNamesSet.has(colName); ++i) {
            colName = `${origColName}_${i}`;
            assertTrue(i < 100);  // Give up at some point;
          }
          columnNamesSet.add(colName);
          this.columnNames.push(colName);
          break;
        }

        case 2: {  // error
          // The query has errored only if the |error| field is non-empty.
          // In protos, we don't distinguish between non-present and empty.
          // Make sure we don't propagate ambiguous empty strings to JS.
          const err = reader.string();
          this._error = err.length > 0 ? err : undefined;
          break;
        }

        case 3: {  // batch
          const batchLen = reader.uint32();
          const batchRaw = resBytes.subarray(reader.pos, reader.pos + batchLen);
          reader.pos += batchLen;

          // The ResultBatch ctor parses the CellsBatch submessage.
          const parsedBatch = new ResultBatch(batchRaw);
          this.batches.push(parsedBatch);
          this._isComplete = parsedBatch.isLastBatch;

          // In theory one could construct a valid proto serializing the column
          // names after the cell batches. In practice the QueryResultSerializer
          // doesn't do that so it's not worth complicating the code.
          const numColumns = this.columnNames.length;
          if (numColumns !== 0) {
            assertTrue(parsedBatch.numCells % numColumns === 0);
            this._numRows += parsedBatch.numCells / numColumns;
          } else {
            // numColumns == 0 is plausible for queries like CREATE TABLE ... .
            assertTrue(parsedBatch.numCells === 0);
          }
          break;
        }

        case 4:
          this._statementCount = reader.uint32();
          break;

        case 5:
          this._statementWithOutputCount = reader.uint32();
          break;

        case 6:
          this._lastStatementSql = reader.string();
          break;

        default:
          console.warn(`Unexpected QueryResult field ${tag >>> 3}`);
          reader.skipType(tag & 7);
          break;
      }  // switch (tag)
    }    // while (pos < end)

    // Wake up whoever was waiting for more rows. The promise is one-shot, so
    // the next waitMoreRows() call has to create a new one.
    if (this.moreRowsPromise !== undefined) {
      this.resolveOrReject(this.moreRowsPromise, this);
      this.moreRowsPromise = undefined;
    }

    if (this._isComplete && this.allRowsPromise !== undefined) {
      this.resolveOrReject(this.allRowsPromise, this);
    }
  }

  // Like waitAllRows() but can be called more than once: it creates the
  // promise on the first call and returns the same one afterwards.
  ensureAllRowsPromise(): Promise<QueryResult> {
    if (this.allRowsPromise === undefined) {
      this.waitAllRows();  // Will populate |this.allRowsPromise|.
    }
    return assertExists(this.allRowsPromise);
  }

  // Resolves |promise| with |arg| if the query succeeded, otherwise rejects it
  // with a QueryError carrying the error message and the query info.
  private resolveOrReject(promise: Deferred<QueryResult>, arg: QueryResult) {
    if (this._error === undefined) {
      promise.resolve(arg);
    } else {
      promise.reject(new QueryError(this._error, this._errorInfo));
    }
  }
}

// This class holds onto a received result batch (a Uint8Array) and does some
// partial parsing to tokenize the various cell groups. This parsing mainly
// consists of identifying and caching the offsets of each cell group and
// initializing the varint decoders. This half parsing is done to keep the
// iterator's next() fast, without decoding everything into memory.
// This is an internal implementation detail and is not exposed outside. The
// RowIteratorImpl uses this class to iterate through batches (this class takes
// care of iterating within a batch, RowIteratorImpl takes care of switching
// batches when needed).
// Note: at any point in time there can be more than one ResultIterator
// referencing the same batch. The batch must be immutable.
class ResultBatch {
  readonly isLastBatch: boolean = false;
  readonly batchBytes: Uint8Array;
  readonly cellTypesOff: number = 0;
  readonly cellTypesLen: number = 0;
  readonly varintOff: number = 0;
  readonly varintLen: number = 0;
  readonly float64Cells = new Float64Array();
  readonly blobCells: Uint8Array[] = [];
  readonly stringCells: string[] = [];

  // batchBytes is a trace_processor.QueryResult.CellsBatch proto.
  constructor(batchBytes: Uint8Array) {
    this.batchBytes = batchBytes;
    const reader = protobuf.Reader.create(batchBytes);
    assertTrue(reader.pos === 0);
    const end = reader.len;

    // Here we deconstruct the proto by hand. The CellsBatch is carefully
    // designed to allow a very fast parsing from the TS side. We pack all cells
    // of the same types together, so we can do only one call (per batch) to
    // TextDecoder.decode(), we can overlay a memory-aligned typedarray for
    // float values and can quickly tell and type-check the cell types.
    // One row = N cells (we know the number upfront from the outer message).
    // Each batch contains always an integer multiple of N cells (i.e. rows are
    // never fragmented across different batches).
    while (reader.pos < end) {
      const tag = reader.uint32();
      switch (tag >>> 3) {
        case 1:  // cell types, a packed array containing one CellType per cell.
          assertTrue((tag & 7) === TAG_LEN_DELIM);  // Must be packed varint.
          this.cellTypesLen = reader.uint32();
          this.cellTypesOff = reader.pos;
          reader.pos += this.cellTypesLen;
          break;

        case 2: {  // varint_cells, a packed varint buffer.
          assertTrue((tag & 7) === TAG_LEN_DELIM);  // Must be packed varint.
          const packLen = reader.uint32();
          this.varintOff = reader.pos;
          this.varintLen = packLen;
          assertTrue(reader.buf === batchBytes);
          // |varintOff| is an index into |batchBytes| (not into its underlying
          // ArrayBuffer), hence the bound is the length of the view itself.
          assertTrue(
              this.varintOff + this.varintLen <= batchBytes.byteLength);
          reader.pos += packLen;
          break;
        }

        case 3: {  // float64_cells, a 64-bit aligned packed fixed64 buffer.
          assertTrue((tag & 7) === TAG_LEN_DELIM);  // Must be packed varint.
          const f64Len = reader.uint32();
          assertTrue(f64Len % 8 === 0);
          // Float64Array's constructor is evil: the offset is in bytes but the
          // length is in 8-byte words.
          const f64Words = f64Len / 8;
          const f64Off = batchBytes.byteOffset + reader.pos;
          if (f64Off % 8 === 0) {
            this.float64Cells =
                new Float64Array(batchBytes.buffer, f64Off, f64Words);
          } else {
            // When using the production code in trace_processor's rpc.cc, the
            // float64 should be 8-bytes aligned. The slow-path case is only for
            // tests.
            const slice = batchBytes.buffer.slice(f64Off, f64Off + f64Len);
            this.float64Cells = new Float64Array(slice);
          }
          reader.pos += f64Len;
          break;
        }

        case 4:  // blob_cells: one entry per blob.
          assertTrue((tag & 7) === TAG_LEN_DELIM);
          // protobufjs's bytes() under the hoods calls slice() and creates
          // a copy. Fine here as blobs are rare and not a fastpath.
          this.blobCells.push(new Uint8Array(reader.bytes()));
          break;

        case 5: {  // string_cells: all the string cells concatenated with \0s.
          assertTrue((tag & 7) === TAG_LEN_DELIM);
          const strLen = reader.uint32();
          assertTrue(reader.pos + strLen <= end);
          const subArr = batchBytes.subarray(reader.pos, reader.pos + strLen);
          assertTrue(subArr.length === strLen);
          // The reason why we do this split rather than creating one string
          // per entry is that utf8 decoding has some non-negligible cost. See
          // go/postmessage-benchmark .
          this.stringCells = utf8Decode(subArr).split('\0');
          reader.pos += strLen;
          break;
        }

        case 6:  // is_last_batch (boolean).
          this.isLastBatch = !!reader.bool();
          break;

        case 7:  // padding for realignment, skip silently.
          reader.skipType(tag & 7);
          break;

        default:
          console.warn(`Unexpected QueryResult.CellsBatch field ${tag >>> 3}`);
          reader.skipType(tag & 7);
          break;
      }  // switch(tag)
    }    // while (pos < end)
  }

  // Number of cells in the batch. Each cell type is read back as a single
  // byte, so this is the length of the packed cell-types array.
  get numCells() {
    return this.cellTypesLen;
  }
}

class RowIteratorImpl implements RowIteratorBase {
  // The spec passed to the iter call containing the expected types, e.g.:
  // {'colA': NUM, 'colB': NUM_NULL, 'colC': STRING}.
  // This doesn't ever change.
  readonly rowSpec: Row;

  // The object that holds the current row. This points to the parent
  // RowIteratorImplWithRowData instance that created this class.
  rowData: Row;

  // The QueryResult object we are reading data from. The engine will pump
  // batches over time into this object.
  private resultObj: QueryResultImpl;

  // All the member variables in the group below point to the identically-named
  // members in result.batch[batchIdx]. This is to avoid indirection layers in
  // the next() hotpath, so we can do this.float64Cells vs
  // this.resultObj.batch[this.batchIdx].float64Cells.
  // These are re-set every time tryMoveToNextBatch() is called (and succeeds).
  private batchIdx = -1;  // The batch index within |result.batches[]|.
  private batchBytes = new Uint8Array();
  private columnNames: string[] = [];
  private numColumns = 0;
  private cellTypesEnd = -1;  // -1 so the 1st next() hits tryMoveToNextBatch().
  private float64Cells = new Float64Array();
  private varIntReader = protobuf.Reader.create(this.batchBytes);
  private blobCells: Uint8Array[] = [];
  private stringCells: string[] = [];

  // These members instead are incremented as we read cells from next(). They
  // are the mutable state of the iterator.
  private nextCellTypeOff = 0;
  private nextFloat64Cell = 0;
  private nextStringCell = 0;
  private nextBlobCell = 0;
  private isValid = false;

  // |querySpec| maps column names to their expected types, |rowData| is the
  // object that receives the values of the current row and |res| is the result
  // to iterate over. The iterator is moved onto the first row (if any).
  constructor(querySpec: Row, rowData: Row, res: QueryResultImpl) {
    // Note: the spec must NOT be copied onto |this|. The keys are column names
    // chosen by the caller and would overwrite the iteration state above
    // whenever a column happens to be called e.g. 'batchIdx' or 'numColumns'.
    // Row values live exclusively in |rowData|.
    this.rowData = rowData;
    this.rowSpec = {...querySpec};  // ... -> Copy all the key/value pairs.
    this.resultObj = res;
    this.next();
  }

  valid(): boolean {
    return this.isValid;
  }

  // Returns the value of |columnName| in the current row. Throws if |rowData|
  // has no property with that name.
  get(columnName: string): ColumnType|null {
    const res = this.rowData[columnName];
    if (res === undefined) {
      throw new Error(
          `Column '${columnName}' doesn't exist. ` +
          `Actual columns: [${this.columnNames.join(',')}]`);
    }
    return res;
  }

  // Moves the cursor next by one row and updates |isValid|.
  // When this fails to move, two cases are possible:
  // 1. We reached the end of the result set (this is the case if
  //    QueryResult.isComplete() == true when this fails).
  // 2. We reached the end of the current batch, but more rows might come later
  //    (if QueryResult.isComplete() == false).
  next() {
    // At some point we might reach the end of the current batch, but the next
    // batch might be available already. In this case we want next() to
    // transparently move on to the next batch.
    while (this.nextCellTypeOff + this.numColumns > this.cellTypesEnd) {
      // If TraceProcessor is behaving well, we should never end up in a
      // situation where we have leftover cells. TP is expected to serialize
      // whole rows in each QueryResult batch and NOT truncate them midway.
      // If this assert fires the TP RPC logic has a bug.
      assertTrue(
          this.nextCellTypeOff === this.cellTypesEnd ||
          this.cellTypesEnd === -1);
      if (!this.tryMoveToNextBatch()) {
        this.isValid = false;
        return;
      }
    }

    const rowData = this.rowData;
    const numColumns = this.numColumns;

    // Read the current row.
    for (let i = 0; i < numColumns; i++) {
      const cellType = this.batchBytes[this.nextCellTypeOff++];
      const colName = this.columnNames[i];
      const expType = this.rowSpec[colName];

      switch (cellType) {
        case CellType.CELL_NULL:
          rowData[colName] = null;
          break;

        case CellType.CELL_VARINT:
          if (expType === NUM || expType === NUM_NULL) {
            // This is very subtle. The return type of int64 can be either a
            // number or a Long.js {high:number, low:number} if Long.js is
            // installed. The default state seems different in node and browser.
            // NOTE(review): the import that force-disables Long.js
            // ('./static_initializers') is commented out at the top of this
            // file, so with Long.js installed this stores a Long object rather
            // than a number. Confirm that NUM callers can cope with that.
            const val = this.varIntReader.int64();
            rowData[colName] = val as {} as number;
          } else {
            // LONG, LONG_NULL, or unspecified - return as bigint
            const value =
                decodeInt64Varint(this.batchBytes, this.varIntReader.pos);
            rowData[colName] = value;
            this.varIntReader.skip();  // Skips a varint
          }
          break;

        case CellType.CELL_FLOAT64:
          rowData[colName] = this.float64Cells[this.nextFloat64Cell++];
          break;

        case CellType.CELL_STRING:
          rowData[colName] = this.stringCells[this.nextStringCell++];
          break;

        case CellType.CELL_BLOB: {
          const blob = this.blobCells[this.nextBlobCell++];
          rowData[colName] = blob;
          break;
        }

        default:
          throw new Error(`Invalid cell type ${cellType}`);
      }
    }  // For (cells)
    this.isValid = true;
  }

  // Points the iterator at the start of the next batch, if one has been
  // received already, and type-checks all of its cells against |rowSpec|.
  // Returns false if there is no further batch or if the next one is empty.
  // Throws if a column of the spec is missing from the result or if a cell has
  // a type which is incompatible with the one declared in the spec.
  private tryMoveToNextBatch(): boolean {
    const nextBatchIdx = this.batchIdx + 1;
    if (nextBatchIdx >= this.resultObj.batches.length) {
      return false;
    }

    this.columnNames = this.resultObj.columnNames;
    this.numColumns = this.columnNames.length;

    this.batchIdx = nextBatchIdx;
    const batch = assertExists(this.resultObj.batches[nextBatchIdx]);
    this.batchBytes = batch.batchBytes;
    this.nextCellTypeOff = batch.cellTypesOff;
    this.cellTypesEnd = batch.cellTypesOff + batch.cellTypesLen;
    this.float64Cells = batch.float64Cells;
    this.blobCells = batch.blobCells;
    this.stringCells = batch.stringCells;
    this.varIntReader = protobuf.Reader.create(batch.batchBytes);
    this.varIntReader.pos = batch.varintOff;
    this.varIntReader.len = batch.varintOff + batch.varintLen;
    this.nextFloat64Cell = 0;
    this.nextStringCell = 0;
    this.nextBlobCell = 0;

    // Check that all the expected columns are present.
    for (const expectedCol of Object.keys(this.rowSpec)) {
      if (!this.columnNames.includes(expectedCol)) {
        throw new Error(
            `Column ${expectedCol} not found in the SQL result ` +
            `set {${this.columnNames.join(' ')}}`);
      }
    }

    // Check that the cells types are consistent.
    const numColumns = this.numColumns;
    if (batch.numCells === 0) {
      // This can happen if the query result contains just an error. In this
      // an empty batch with isLastBatch=true is appended as an EOF marker.
      // In theory TraceProcessor could return an empty batch in the middle and
      // that would be fine from a protocol viewpoint. In practice, no code path
      // does that today so it doesn't make sense trying supporting it with a
      // recursive call to tryMoveToNextBatch().
      assertTrue(batch.isLastBatch);
      return false;
    }

    assertTrue(numColumns > 0);
    for (let i = this.nextCellTypeOff; i < this.cellTypesEnd; i++) {
      // |i| is a byte offset within the batch. Row and column are derived from
      // the index of the cell relative to the start of the cell types array.
      const cellIdx = i - this.nextCellTypeOff;
      const col = cellIdx % numColumns;
      const colName = this.columnNames[col];
      const actualType = this.batchBytes[i] as CellType;
      const expType = this.rowSpec[colName];

      // If undefined, the caller doesn't want to read this column at all, so
      // it can be whatever.
      if (expType === undefined) continue;

      let err = '';
      if (!isCompatible(actualType, expType)) {
        if (actualType === CellType.CELL_NULL) {
          err = 'SQL value is NULL but that was not expected' +
              ` (expected type: ${columnTypeToString(expType)}). ` +
              'Did you mean NUM_NULL, LONG_NULL, STR_NULL or BLOB_NULL?';
        } else {
          err = `Incompatible cell type. Expected: ${
              columnTypeToString(
                  expType)} actual: ${CELL_TYPE_NAMES[actualType]}`;
        }
      }
      if (err.length > 0) {
        throw new Error(
            `Error @ row: ${Math.floor(cellIdx / numColumns)} col: '` +
            `${colName}': ${err}`);
      }
    }
    return true;
  }
}

// This is the object ultimately returned to the client when calling
// QueryResult.iter(...).
// The only reason why this is disjoint from RowIteratorImpl is to avoid
// naming collisions between the members variables required by RowIteratorImpl
// and the column names returned by the iterator.
class RowIteratorImplWithRowData implements RowIteratorBase {
  private _impl: RowIteratorImpl;

  next: () => void;
  valid: () => boolean;
  get: (columnName: string) => ColumnType|null;

  constructor(querySpec: Row, res: QueryResultImpl) {
    // The columns become properties of this very object: RowIteratorImpl
    // writes the values of the current row into |thisAsRow|.
    const thisAsRow = this as {} as Row;
    Object.assign(thisAsRow, querySpec);
    this._impl = new RowIteratorImpl(querySpec, thisAsRow, res);
    this.next = this._impl.next.bind(this._impl);
    this.valid = this._impl.valid.bind(this._impl);
    this.get = this._impl.get.bind(this._impl);
  }
}

// This is a proxy object that wraps QueryResultImpl, adding await-ability.
// This is so that:
// 1. Clients that just want to await for the full result set can just call
//    await engine.query('...') and will get a QueryResult that is guaranteed
//    to be complete.
// 2. Clients that know how to handle the streaming can use it straight away.
class WaitableQueryResultImpl implements QueryResult, WritableQueryResult,
                                         PromiseLike<QueryResult> {
  private impl: QueryResultImpl;
  private thenCalled = false;

  constructor(errorInfo: QueryErrorInfo) {
    this.impl = new QueryResultImpl(errorInfo);
  }

  // QueryResult implementation. Proxies all calls to the impl object.
  iter<T extends Row>(spec: T) {
    return this.impl.iter(spec);
  }
  firstRow<T extends Row>(spec: T) {
    return this.impl.firstRow(spec);
  }
  waitAllRows() {
    return this.impl.waitAllRows();
  }
  waitMoreRows() {
    return this.impl.waitMoreRows();
  }
  isComplete() {
    return this.impl.isComplete();
  }
  numRows() {
    return this.impl.numRows();
  }
  columns() {
    return this.impl.columns();
  }
  error() {
    return this.impl.error();
  }
  statementCount() {
    return this.impl.statementCount();
  }
  statementWithOutputCount() {
    return this.impl.statementWithOutputCount();
  }
  lastStatementSql() {
    return this.impl.lastStatementSql();
  }

  // WritableQueryResult implementation.
  appendResultBatch(resBytes: Uint8Array) {
    return this.impl.appendResultBatch(resBytes);
  }

  // PromiseLike<QueryResult> implementation.

  // Asserts that it is called at most once per object.
  then(onfulfilled: any, onrejected: any): any {
    assertFalse(this.thenCalled);
    this.thenCalled = true;
    return this.impl.ensureAllRowsPromise().then(onfulfilled, onrejected);
  }

  catch(error: any): any {
    return this.impl.ensureAllRowsPromise().catch(error);
  }

  finally(callback: () => void): any {
    return this.impl.ensureAllRowsPromise().finally(callback);
  }

  // eslint and clang-format disagree on how to format get[foo](). Let
  // clang-format win:
  // eslint-disable-next-line keyword-spacing
  get[Symbol.toStringTag](): string {
    return 'Promise<WaitableQueryResult>';
  }
}

// Creates an empty query result, to be filled by the engine through
// appendResultBatch(). |errorInfo| is attached to the QueryError raised if the
// query fails.
export function createQueryResult(errorInfo: QueryErrorInfo): QueryResult&
    Promise<QueryResult>&WritableQueryResult {
  return new WaitableQueryResultImpl(errorInfo);
}