// Copyright (C) 2018 The Android Open Source Project
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//      http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

import {defer, Deferred} from './deferred';
import {assertExists, assertTrue} from './logging';
import {perfetto} from '../../deps_build/trace_processor/ui/tsc/gen/protos';
import {ProtoRingBuffer} from './proto_ring_buffer';
import {
  ComputeMetricArgs,
  ComputeMetricResult,
  DisableAndReadMetatraceResult,
  QueryArgs,
  ResetTraceProcessorArgs,
} from './protos';
import {
  createQueryResult,
  NUM,
  QueryError,
  QueryResult,
  WritableQueryResult,
} from './query_result';
import TraceProcessorRpc = perfetto.protos.TraceProcessorRpc;
import TraceProcessorRpcStream = perfetto.protos.TraceProcessorRpcStream;
import TPM = perfetto.protos.TraceProcessorRpc.TraceProcessorMethod;

// Observer that the Engine notifies about in-flight RPCs.
// The Engine calls beginLoading() right before every request is handed to
// rpcSendRequestBytes() and endLoading() once the final response message for a
// request has been dispatched (intermediate batches of a streaming query do
// not trigger endLoading()).
export interface LoadingTracker {
  beginLoading(): void;
  endLoading(): void;
}

// LoadingTracker that ignores every notification. The Engine constructor falls
// back to this when no tracker is supplied, so it never has to null-check.
export class NullLoadingTracker implements LoadingTracker {
  beginLoading(): void {}
  endLoading(): void {}
}


// This is used to skip the decoding of queryResult from protobufjs and deal
// with it ourselves. See the comment below around `QueryResult.decode = ...`.
interface QueryResultBypass {
  // View over the still-encoded QueryResult bytes inside the RPC message.
  rawQueryResult: Uint8Array;
}

// Options accepted by Engine.resetTraceProcessor().
export interface TraceProcessorConfig {
  // When true the reset request asks for
  // DropTrackEventDataBefore.TRACK_EVENT_RANGE_OF_INTEREST, otherwise NO_DROP.
  cropTrackEvents: boolean;
  // Copied as-is into ResetTraceProcessorArgs.ingestFtraceInRawTable.
  ingestFtraceInRawTable: boolean;
  // Copied as-is into ResetTraceProcessorArgs.analyzeTraceProtoContent.
  analyzeTraceProtoContent: boolean;
}

// Abstract interface of a trace processor.
// This is the TypeScript equivalent of src/trace_processor/rpc.h.
// There are two concrete implementations:
//   1. WasmEngineProxy: creates a Wasm module and interacts over postMessage().
//   2. HttpRpcEngine: connects to an external `trace_processor_shell --httpd`.
//      and interacts via fetch().
// In both cases, we have a byte-oriented pipe to interact with TraceProcessor.
// The derived class is only expected to deal with these two functions:
// 1. Implement the abstract rpcSendRequestBytes() function, sending the
//    proto-encoded TraceProcessorRpc requests to the TraceProcessor instance.
// 2. Call onRpcResponseBytes() when response data is received.
//
// Responses are matched to requests by ordering only: every request kind has
// its own FIFO of pending promises and a response of that kind settles the
// oldest entry of the matching FIFO.
export abstract class Engine {
  abstract readonly id: string;
  // Lazily populated caches, see getCpus() / getNumberOfGpus().
  private _cpus?: number[];
  private _numGpus?: number;
  private loadingTracker: LoadingTracker;
  // Sequence id stamped on the next outgoing request.
  private txSeqId = 0;
  // Sequence id of the last response that was accepted.
  private rxSeqId = 0;
  // Re-assembles (tokenizes) TraceProcessorRpc messages out of the byte stream.
  private rxBuf = new ProtoRingBuffer();
  private pendingParses = new Array<Deferred<void>>();
  private pendingEOFs = new Array<Deferred<void>>();
  private pendingResetTraceProcessors = new Array<Deferred<void>>();
  private pendingQueries = new Array<WritableQueryResult>();
  private pendingRestoreTables = new Array<Deferred<void>>();
  private pendingComputeMetrics = new Array<Deferred<ComputeMetricResult>>();
  // At most one metatrace read can be in flight, hence no queue here.
  private pendingReadMetatrace?: Deferred<DisableAndReadMetatraceResult>;
  private _isMetatracingEnabled = false;

  // |tracker| is notified around every RPC; when omitted a no-op tracker is
  // used instead.
  constructor(tracker?: LoadingTracker) {
    this.loadingTracker = tracker ? tracker : new NullLoadingTracker();
  }

  // Called to send data to the TraceProcessor instance. This turns into a
  // postMessage() or a HTTP request, depending on the Engine implementation.
  abstract rpcSendRequestBytes(data: Uint8Array): void;

  // Called when an inbound message is received by the Engine implementation
  // (e.g. onmessage for the Wasm case, or when HTTP replies are received for
  // the HTTP+RPC case).
  onRpcResponseBytes(dataWillBeRetained: Uint8Array) {
    // Note: when hitting the fastpath inside ProtoRingBuffer, the |data| buffer
    // is returned back by readMessage() (except for subarray()-ing it) and held
    // onto by other classes (e.g., QueryResult). For both fetch() and Wasm we
    // are fine because every response creates a new buffer.
    this.rxBuf.append(dataWillBeRetained);
    // One chunk of bytes can carry zero, one or many complete messages: drain
    // everything that is currently decodable.
    for (;;) {
      const msg = this.rxBuf.readMessage();
      if (msg === undefined) break;
      this.onRpcResponseMessage(msg);
    }
  }

  // Parses a response message.
  // |rpcMsgEncoded| is a sub-array to the start of a TraceProcessorRpc
  // proto-encoded message (without the proto preamble and varint size).
  // Throws if the message carries a fatal error or if its sequence id does not
  // follow the previously accepted one.
  private onRpcResponseMessage(rpcMsgEncoded: Uint8Array) {
    // Here we override the protobufjs-generated code to skip the parsing of the
    // new streaming QueryResult and instead passing it through like a buffer.
    // This is the overall problem: All trace processor responses are wrapped
    // into a perfetto.protos.TraceProcessorRpc proto message. In all cases
    // except TPM_QUERY_STREAMING, we want protobufjs to decode the proto bytes
    // and give us a structured object. In the case of TPM_QUERY_STREAMING,
    // instead, we want to deal with the proto parsing ourselves using the new
    // QueryResult.appendResultBatch() method, because that handles streaming
    // results more efficiently and skips several copies.
    // By overriding the decode method below, we achieve two things:
    // 1. We avoid protobufjs decoding the TraceProcessorRpc.query_result field.
    // 2. We stash (a view of) the original buffer into the |rawQueryResult| so
    //    the `case TPM_QUERY_STREAMING` below can take it.
    // The override is re-installed for every message because the closure needs
    // to capture this message's |rpcMsgEncoded| for the buffer identity check.
    perfetto.protos.QueryResult.decode =
        (reader: protobuf.Reader, length: number) => {
          const res =
              perfetto.protos.QueryResult.create() as {} as QueryResultBypass;
          res.rawQueryResult =
              reader.buf.subarray(reader.pos, reader.pos + length);
          // All this works only if protobufjs returns the original ArrayBuffer
          // from |rpcMsgEncoded|. It should be always the case given the
          // current implementation. This check mainly guards against future
          // behavioral changes of protobufjs. We don't want to accidentally
          // hold onto some internal protobufjs buffer. We are fine holding
          // onto |rpcMsgEncoded| because those come from ProtoRingBuffer which
          // is buffer-retention-friendly.
          assertTrue(res.rawQueryResult.buffer === rpcMsgEncoded.buffer);
          // Tell protobufjs that the sub-message has been consumed.
          reader.pos += length;
          return res as {} as perfetto.protos.QueryResult;
        };

    const rpc = TraceProcessorRpc.decode(rpcMsgEncoded);

    if (rpc.fatalError !== undefined && rpc.fatalError.length > 0) {
      throw new Error(`${rpc.fatalError}`);
    }

    // Allow restarting sequences from zero (when reloading the browser).
    if (rpc.seq !== this.rxSeqId + 1 && this.rxSeqId !== 0 && rpc.seq !== 0) {
      // "(ERR:rpc_seq)" is intercepted by error_dialog.ts to show a more
      // graceful and actionable error.
      throw new Error(`RPC sequence id mismatch cur=${rpc.seq} last=${
          this.rxSeqId} (ERR:rpc_seq)`);
    }

    this.rxSeqId = rpc.seq;

    // Only streaming queries can span several response messages; everything
    // else completes with the message being handled here.
    let isFinalResponse = true;

    // Each case is wrapped in its own block so that its `const` declarations
    // are not visible from (or uninitialised in) the sibling cases.
    switch (rpc.response) {
      case TPM.TPM_APPEND_TRACE_DATA: {
        const appendResult = assertExists(rpc.appendResult);
        const pendingPromise = assertExists(this.pendingParses.shift());
        if (appendResult.error && appendResult.error.length > 0) {
          pendingPromise.reject(appendResult.error);
        } else {
          pendingPromise.resolve();
        }
        break;
      }
      case TPM.TPM_FINALIZE_TRACE_DATA: {
        assertExists(this.pendingEOFs.shift()).resolve();
        break;
      }
      case TPM.TPM_RESET_TRACE_PROCESSOR: {
        assertExists(this.pendingResetTraceProcessors.shift()).resolve();
        break;
      }
      case TPM.TPM_RESTORE_INITIAL_TABLES: {
        assertExists(this.pendingRestoreTables.shift()).resolve();
        break;
      }
      case TPM.TPM_QUERY_STREAMING: {
        const qRes = assertExists(rpc.queryResult) as {} as QueryResultBypass;
        // Peek, don't shift: the same query keeps receiving batches until it
        // reports completion.
        const pendingQuery = assertExists(this.pendingQueries[0]);
        pendingQuery.appendResultBatch(qRes.rawQueryResult);
        if (pendingQuery.isComplete()) {
          this.pendingQueries.shift();
        } else {
          isFinalResponse = false;
        }
        break;
      }
      case TPM.TPM_COMPUTE_METRIC: {
        const metricRes = assertExists(rpc.metricResult) as ComputeMetricResult;
        const pendingComputeMetric =
            assertExists(this.pendingComputeMetrics.shift());
        if (metricRes.error && metricRes.error.length > 0) {
          const error =
              new QueryError(`ComputeMetric() error: ${metricRes.error}`, {
                query: 'COMPUTE_METRIC',
              });
          pendingComputeMetric.reject(error);
        } else {
          pendingComputeMetric.resolve(metricRes);
        }
        break;
      }
      case TPM.TPM_DISABLE_AND_READ_METATRACE: {
        const metatraceRes =
            assertExists(rpc.metatrace) as DisableAndReadMetatraceResult;
        assertExists(this.pendingReadMetatrace).resolve(metatraceRes);
        this.pendingReadMetatrace = undefined;
        break;
      }
      case TPM.TPM_ENABLE_METATRACE: {
        // enableMetatrace() registers no pending promise, so there is nothing
        // to settle. Handling the method explicitly keeps an acknowledgement
        // for it out of the "unexpected response" log below while still
        // balancing the beginLoading() issued when the request was sent.
        // NOTE(review): assumes the backend acks this request — confirm.
        break;
      }
      default:
        console.log(
            'Unexpected TraceProcessor response received: ', rpc.response);
        break;
    }  // switch(rpc.response);

    if (isFinalResponse) {
      this.loadingTracker.endLoading();
    }
  }

  // TraceProcessor methods below this point.
  // The methods below are called by the various controllers in the UI and
  // deal with marshalling / unmarshaling requests to/from TraceProcessor.


  // Push trace data into the engine. The engine is supposed to automatically
  // figure out the type of the trace (JSON vs Protobuf).
  // The returned promise is rejected with the error string reported by
  // TraceProcessor if the chunk could not be appended.
  parse(data: Uint8Array): Promise<void> {
    const asyncRes = defer<void>();
    this.pendingParses.push(asyncRes);
    const rpc = TraceProcessorRpc.create();
    rpc.request = TPM.TPM_APPEND_TRACE_DATA;
    rpc.appendTraceData = data;
    this.rpcSendRequest(rpc);
    return asyncRes;  // Linearize with the worker.
  }

  // Notify the engine that we reached the end of the trace.
  // Called after the last parse() call.
  notifyEof(): Promise<void> {
    const asyncRes = defer<void>();
    this.pendingEOFs.push(asyncRes);
    const rpc = TraceProcessorRpc.create();
    rpc.request = TPM.TPM_FINALIZE_TRACE_DATA;
    this.rpcSendRequest(rpc);
    return asyncRes;  // Linearize with the worker.
  }

  // Updates the TraceProcessor Config. This method creates a new
  // TraceProcessor instance, so it should be called before passing any trace
  // data.
  resetTraceProcessor(
      {cropTrackEvents, ingestFtraceInRawTable, analyzeTraceProtoContent}:
          TraceProcessorConfig): Promise<void> {
    const asyncRes = defer<void>();
    this.pendingResetTraceProcessors.push(asyncRes);
    const rpc = TraceProcessorRpc.create();
    rpc.request = TPM.TPM_RESET_TRACE_PROCESSOR;
    const args = rpc.resetTraceProcessorArgs = new ResetTraceProcessorArgs();
    args.dropTrackEventDataBefore = cropTrackEvents ?
        ResetTraceProcessorArgs.DropTrackEventDataBefore
            .TRACK_EVENT_RANGE_OF_INTEREST :
        ResetTraceProcessorArgs.DropTrackEventDataBefore.NO_DROP;
    args.ingestFtraceInRawTable = ingestFtraceInRawTable;
    args.analyzeTraceProtoContent = analyzeTraceProtoContent;
    this.rpcSendRequest(rpc);
    return asyncRes;
  }

  // Resets the trace processor state by destroying any table/views created by
  // the UI after loading.
  restoreInitialTables(): Promise<void> {
    const asyncRes = defer<void>();
    this.pendingRestoreTables.push(asyncRes);
    const rpc = TraceProcessorRpc.create();
    rpc.request = TPM.TPM_RESTORE_INITIAL_TABLES;
    this.rpcSendRequest(rpc);
    return asyncRes;  // Linearize with the worker.
  }

  // Shorthand for sending a compute metrics request to the engine.
  // Results are requested in TEXTPROTO format. The returned promise is rejected
  // with a QueryError when TraceProcessor reports an error.
  async computeMetric(metrics: string[]): Promise<ComputeMetricResult> {
    const asyncRes = defer<ComputeMetricResult>();
    this.pendingComputeMetrics.push(asyncRes);
    const rpc = TraceProcessorRpc.create();
    rpc.request = TPM.TPM_COMPUTE_METRIC;
    const args = rpc.computeMetricArgs = new ComputeMetricArgs();
    args.metricNames = metrics;
    args.format = ComputeMetricArgs.ResultFormat.TEXTPROTO;
    this.rpcSendRequest(rpc);
    return asyncRes;
  }

  // Issues a streaming query and retrieve results in batches.
  // The returned QueryResult object will be populated over time with batches
  // of rows (each batch conveys ~128KB of data and a variable number of rows).
  // The caller can decide whether to wait that all batches have been received
  // (by awaiting the returned object or calling result.waitAllRows()) or handle
  // the rows incrementally.
  //
  // Example usage:
  // const res = engine.query('SELECT foo, bar FROM table');
  // console.log(res.numRows());  // Will print 0 because we didn't await.
  // await(res.waitAllRows());
  // console.log(res.numRows());  // Will print the total number of rows.
  //
  // for (const it = res.iter({foo: NUM, bar:STR}); it.valid(); it.next()) {
  //   console.log(it.foo, it.bar);
  // }
  //
  // Optional |tag| (usually a component name) can be provided to allow
  // attributing trace processor workload to different UI components.
  query(sqlQuery: string, tag?: string): Promise<QueryResult>&QueryResult {
    const rpc = TraceProcessorRpc.create();
    rpc.request = TPM.TPM_QUERY_STREAMING;
    rpc.queryArgs = new QueryArgs();
    rpc.queryArgs.sqlQuery = sqlQuery;
    if (tag) {
      rpc.queryArgs.tag = tag;
    }
    const result = createQueryResult({
      query: sqlQuery,
    });
    this.pendingQueries.push(result);
    this.rpcSendRequest(rpc);
    return result;
  }

  // Whether enableMetatrace() has been called without a subsequent
  // stopAndGetMetatrace(). This is local bookkeeping only.
  isMetatracingEnabled(): boolean {
    return this._isMetatracingEnabled;
  }

  // Asks TraceProcessor to start metatracing, optionally restricted to
  // |categories|. Fire-and-forget: no promise is returned.
  enableMetatrace(categories?: perfetto.protos.MetatraceCategories) {
    const rpc = TraceProcessorRpc.create();
    rpc.request = TPM.TPM_ENABLE_METATRACE;
    if (categories) {
      rpc.enableMetatraceArgs = new perfetto.protos.EnableMetatraceArgs();
      rpc.enableMetatraceArgs.categories = categories;
    }
    this._isMetatracingEnabled = true;
    this.rpcSendRequest(rpc);
  }

  // Stops metatracing and resolves with the collected metatrace.
  // Returns a rejected promise if a previous read is still in flight.
  stopAndGetMetatrace(): Promise<DisableAndReadMetatraceResult> {
    // If we are already finalising a metatrace, ignore the request.
    if (this.pendingReadMetatrace) {
      return Promise.reject(new Error('Already finalising a metatrace'));
    }

    const result = defer<DisableAndReadMetatraceResult>();

    const rpc = TraceProcessorRpc.create();
    rpc.request = TPM.TPM_DISABLE_AND_READ_METATRACE;
    this._isMetatracingEnabled = false;
    this.pendingReadMetatrace = result;
    this.rpcSendRequest(rpc);
    return result;
  }

  // Marshals the TraceProcessorRpc request arguments and sends the request
  // to the concrete Engine (Wasm or HTTP).
  private rpcSendRequest(rpc: TraceProcessorRpc) {
    rpc.seq = this.txSeqId++;
    // Each message is wrapped in a TraceProcessorRpcStream to add the varint
    // preamble with the size, which allows tokenization on the other end.
    const outerProto = TraceProcessorRpcStream.create();
    outerProto.msg.push(rpc);
    const buf = TraceProcessorRpcStream.encode(outerProto).finish();
    this.loadingTracker.beginLoading();
    this.rpcSendRequestBytes(buf);
  }

  // Returns the distinct CPU ids found in the sched table, in ascending order.
  // The list is computed once and cached for the lifetime of the engine.
  // TODO(hjd): When streaming must invalidate this somehow.
  async getCpus(): Promise<number[]> {
    if (this._cpus === undefined) {
      const cpus: number[] = [];
      const queryRes = await this.query(
          'select distinct(cpu) as cpu from sched order by cpu;');
      for (const it = queryRes.iter({cpu: NUM}); it.valid(); it.next()) {
        cpus.push(it.cpu);
      }
      this._cpus = cpus;
    }
    return this._cpus;
  }

  // Returns the number of distinct gpu_id values that have a 'gpufreq' counter
  // track. The value is computed once and cached.
  async getNumberOfGpus(): Promise<number> {
    // Compare against undefined rather than testing truthiness: 0 is a valid
    // cached answer and must not trigger a new query on every call.
    if (this._numGpus === undefined) {
      const result = await this.query(`
        select count(distinct(gpu_id)) as gpuCount
        from gpu_counter_track
        where name = 'gpufreq';
      `);
      this._numGpus = result.firstRow({gpuCount: NUM}).gpuCount;
    }
    return this._numGpus;
  }

  // Returns the number of rows of the process table (not cached).
  // TODO: This should live in code that's more specific to chrome, instead of
  // in engine.
  async getNumberOfProcesses(): Promise<number> {
    const result = await this.query('select count(*) as cnt from process;');
    return result.firstRow({cnt: NUM}).cnt;
  }

  // Creates a query-only facade over this engine that labels its queries with
  // |tag|.
  getProxy(tag: string): EngineProxy {
    return new EngineProxy(this, tag);
  }
}

// Lightweight wrapper over Engine exposing only `query` method and annotating
// all queries going through it with a tag.
export class EngineProxy {
  // |engine| is the real Engine every call is delegated to; |tag| is the
  // default label attached to queries issued through this proxy.
  constructor(
      private readonly engine: Engine, private readonly tag: string) {}

  // Identifier of the wrapped engine.
  get engineId(): string {
    return this.engine.id;
  }

  // Forwards |sqlQuery| to the wrapped engine. A non-empty per-call |tag|
  // takes precedence; otherwise the proxy's own tag is used.
  query(sqlQuery: string, tag?: string): Promise<QueryResult>&QueryResult {
    const effectiveTag = tag ? tag : this.tag;
    return this.engine.query(sqlQuery, effectiveTag);
  }
}