1// Copyright (C) 2018 The Android Open Source Project
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//      http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15import {defer, Deferred} from './deferred';
16import {assertExists, assertTrue} from './logging';
17import {perfetto} from '../../deps_build/trace_processor/ui/tsc/gen/protos';
18import {ProtoRingBuffer} from './proto_ring_buffer';
19import {
20  ComputeMetricArgs,
21  ComputeMetricResult,
22  DisableAndReadMetatraceResult,
23  QueryArgs,
24  ResetTraceProcessorArgs,
25} from './protos';
26import {
27  createQueryResult,
28  NUM,
29  QueryError,
30  QueryResult,
31  WritableQueryResult,
32} from './query_result';
33import TraceProcessorRpc = perfetto.protos.TraceProcessorRpc;
34import TraceProcessorRpcStream = perfetto.protos.TraceProcessorRpcStream;
35import TPM = perfetto.protos.TraceProcessorRpc.TraceProcessorMethod;
36
37export interface LoadingTracker {
38  beginLoading(): void;
39  endLoading(): void;
40}
41
42export class NullLoadingTracker implements LoadingTracker {
43  beginLoading(): void {}
44  endLoading(): void {}
45}
46
47
48// This is used to skip the decoding of queryResult from protobufjs and deal
49// with it ourselves. See the comment below around `QueryResult.decode = ...`.
50interface QueryResultBypass {
51  rawQueryResult: Uint8Array;
52}
53
54export interface TraceProcessorConfig {
55  cropTrackEvents: boolean;
56  ingestFtraceInRawTable: boolean;
57  analyzeTraceProtoContent: boolean;
58}
59
60// Abstract interface of a trace proccessor.
61// This is the TypeScript equivalent of src/trace_processor/rpc.h.
62// There are two concrete implementations:
63//   1. WasmEngineProxy: creates a Wasm module and interacts over postMessage().
64//   2. HttpRpcEngine: connects to an external `trace_processor_shell --httpd`.
65//      and interacts via fetch().
66// In both cases, we have a byte-oriented pipe to interact with TraceProcessor.
67// The derived class is only expected to deal with these two functions:
68// 1. Implement the abstract rpcSendRequestBytes() function, sending the
69//    proto-encoded TraceProcessorRpc requests to the TraceProcessor instance.
70// 2. Call onRpcResponseBytes() when response data is received.
71export abstract class Engine {
72  abstract readonly id: string;
73  private _cpus?: number[];
74  private _numGpus?: number;
75  private loadingTracker: LoadingTracker;
76  private txSeqId = 0;
77  private rxSeqId = 0;
78  private rxBuf = new ProtoRingBuffer();
79  private pendingParses = new Array<Deferred<void>>();
80  private pendingEOFs = new Array<Deferred<void>>();
81  private pendingResetTraceProcessors = new Array<Deferred<void>>();
82  private pendingQueries = new Array<WritableQueryResult>();
83  private pendingRestoreTables = new Array<Deferred<void>>();
84  private pendingComputeMetrics = new Array<Deferred<ComputeMetricResult>>();
85  private pendingReadMetatrace?: Deferred<DisableAndReadMetatraceResult>;
86  private _isMetatracingEnabled = false;
87
88  constructor(tracker?: LoadingTracker) {
89    this.loadingTracker = tracker ? tracker : new NullLoadingTracker();
90  }
91
92  // Called to send data to the TraceProcessor instance. This turns into a
93  // postMessage() or a HTTP request, depending on the Engine implementation.
94  abstract rpcSendRequestBytes(data: Uint8Array): void;
95
96  // Called when an inbound message is received by the Engine implementation
97  // (e.g. onmessage for the Wasm case, on when HTTP replies are received for
98  // the HTTP+RPC case).
99  onRpcResponseBytes(dataWillBeRetained: Uint8Array) {
100    // Note: when hitting the fastpath inside ProtoRingBuffer, the |data| buffer
101    // is returned back by readMessage() (% subarray()-ing it) and held onto by
102    // other classes (e.g., QueryResult). For both fetch() and Wasm we are fine
103    // because every response creates a new buffer.
104    this.rxBuf.append(dataWillBeRetained);
105    for (;;) {
106      const msg = this.rxBuf.readMessage();
107      if (msg === undefined) break;
108      this.onRpcResponseMessage(msg);
109    }
110  }
111
112  // Parses a response message.
113  // |rpcMsgEncoded| is a sub-array to to the start of a TraceProcessorRpc
114  // proto-encoded message (without the proto preamble and varint size).
115  private onRpcResponseMessage(rpcMsgEncoded: Uint8Array) {
116    // Here we override the protobufjs-generated code to skip the parsing of the
117    // new streaming QueryResult and instead passing it through like a buffer.
118    // This is the overall problem: All trace processor responses are wrapped
119    // into a perfetto.protos.TraceProcessorRpc proto message. In all cases %
120    // TPM_QUERY_STREAMING, we want protobufjs to decode the proto bytes and
121    // give us a structured object. In the case of TPM_QUERY_STREAMING, instead,
122    // we want to deal with the proto parsing ourselves using the new
123    // QueryResult.appendResultBatch() method, because that handled streaming
124    // results more efficiently and skips several copies.
125    // By overriding the decode method below, we achieve two things:
126    // 1. We avoid protobufjs decoding the TraceProcessorRpc.query_result field.
127    // 2. We stash (a view of) the original buffer into the |rawQueryResult| so
128    //    the `case TPM_QUERY_STREAMING` below can take it.
129    perfetto.protos.QueryResult.decode =
130        (reader: protobuf.Reader, length: number) => {
131          const res =
132              perfetto.protos.QueryResult.create() as {} as QueryResultBypass;
133          res.rawQueryResult =
134              reader.buf.subarray(reader.pos, reader.pos + length);
135          // All this works only if protobufjs returns the original ArrayBuffer
136          // from |rpcMsgEncoded|. It should be always the case given the
137          // current implementation. This check mainly guards against future
138          // behavioral changes of protobufjs. We don't want to accidentally
139          // hold onto some internal protobufjs buffer. We are fine holding
140          // onto |rpcMsgEncoded| because those come from ProtoRingBuffer which
141          // is buffer-retention-friendly.
142          assertTrue(res.rawQueryResult.buffer === rpcMsgEncoded.buffer);
143          reader.pos += length;
144          return res as {} as perfetto.protos.QueryResult;
145        };
146
147    const rpc = TraceProcessorRpc.decode(rpcMsgEncoded);
148
149    if (rpc.fatalError !== undefined && rpc.fatalError.length > 0) {
150      throw new Error(`${rpc.fatalError}`);
151    }
152
153    // Allow restarting sequences from zero (when reloading the browser).
154    if (rpc.seq !== this.rxSeqId + 1 && this.rxSeqId !== 0 && rpc.seq !== 0) {
155      // "(ERR:rpc_seq)" is intercepted by error_dialog.ts to show a more
156      // graceful and actionable error.
157      throw new Error(`RPC sequence id mismatch cur=${rpc.seq} last=${
158          this.rxSeqId} (ERR:rpc_seq)`);
159    }
160
161    this.rxSeqId = rpc.seq;
162
163    let isFinalResponse = true;
164
165    switch (rpc.response) {
166      case TPM.TPM_APPEND_TRACE_DATA:
167        const appendResult = assertExists(rpc.appendResult);
168        const pendingPromise = assertExists(this.pendingParses.shift());
169        if (appendResult.error && appendResult.error.length > 0) {
170          pendingPromise.reject(appendResult.error);
171        } else {
172          pendingPromise.resolve();
173        }
174        break;
175      case TPM.TPM_FINALIZE_TRACE_DATA:
176        assertExists(this.pendingEOFs.shift()).resolve();
177        break;
178      case TPM.TPM_RESET_TRACE_PROCESSOR:
179        assertExists(this.pendingResetTraceProcessors.shift()).resolve();
180        break;
181      case TPM.TPM_RESTORE_INITIAL_TABLES:
182        assertExists(this.pendingRestoreTables.shift()).resolve();
183        break;
184      case TPM.TPM_QUERY_STREAMING:
185        const qRes = assertExists(rpc.queryResult) as {} as QueryResultBypass;
186        const pendingQuery = assertExists(this.pendingQueries[0]);
187        pendingQuery.appendResultBatch(qRes.rawQueryResult);
188        if (pendingQuery.isComplete()) {
189          this.pendingQueries.shift();
190        } else {
191          isFinalResponse = false;
192        }
193        break;
194      case TPM.TPM_COMPUTE_METRIC:
195        const metricRes = assertExists(rpc.metricResult) as ComputeMetricResult;
196        const pendingComputeMetric =
197            assertExists(this.pendingComputeMetrics.shift());
198        if (metricRes.error && metricRes.error.length > 0) {
199          const error =
200              new QueryError(`ComputeMetric() error: ${metricRes.error}`, {
201                query: 'COMPUTE_METRIC',
202              });
203          pendingComputeMetric.reject(error);
204        } else {
205          pendingComputeMetric.resolve(metricRes);
206        }
207        break;
208      case TPM.TPM_DISABLE_AND_READ_METATRACE:
209        const metatraceRes =
210            assertExists(rpc.metatrace) as DisableAndReadMetatraceResult;
211        assertExists(this.pendingReadMetatrace).resolve(metatraceRes);
212        this.pendingReadMetatrace = undefined;
213        break;
214      default:
215        console.log(
216            'Unexpected TraceProcessor response received: ', rpc.response);
217        break;
218    }  // switch(rpc.response);
219
220    if (isFinalResponse) {
221      this.loadingTracker.endLoading();
222    }
223  }
224
225  // TraceProcessor methods below this point.
226  // The methods below are called by the various controllers in the UI and
227  // deal with marshalling / unmarshaling requests to/from TraceProcessor.
228
229
230  // Push trace data into the engine. The engine is supposed to automatically
231  // figure out the type of the trace (JSON vs Protobuf).
232  parse(data: Uint8Array): Promise<void> {
233    const asyncRes = defer<void>();
234    this.pendingParses.push(asyncRes);
235    const rpc = TraceProcessorRpc.create();
236    rpc.request = TPM.TPM_APPEND_TRACE_DATA;
237    rpc.appendTraceData = data;
238    this.rpcSendRequest(rpc);
239    return asyncRes;  // Linearize with the worker.
240  }
241
242  // Notify the engine that we reached the end of the trace.
243  // Called after the last parse() call.
244  notifyEof(): Promise<void> {
245    const asyncRes = defer<void>();
246    this.pendingEOFs.push(asyncRes);
247    const rpc = TraceProcessorRpc.create();
248    rpc.request = TPM.TPM_FINALIZE_TRACE_DATA;
249    this.rpcSendRequest(rpc);
250    return asyncRes;  // Linearize with the worker.
251  }
252
253  // Updates the TraceProcessor Config. This method creates a new
254  // TraceProcessor instance, so it should be called before passing any trace
255  // data.
256  resetTraceProcessor(
257      {cropTrackEvents, ingestFtraceInRawTable, analyzeTraceProtoContent}:
258          TraceProcessorConfig): Promise<void> {
259    const asyncRes = defer<void>();
260    this.pendingResetTraceProcessors.push(asyncRes);
261    const rpc = TraceProcessorRpc.create();
262    rpc.request = TPM.TPM_RESET_TRACE_PROCESSOR;
263    const args = rpc.resetTraceProcessorArgs = new ResetTraceProcessorArgs();
264    args.dropTrackEventDataBefore = cropTrackEvents ?
265        ResetTraceProcessorArgs.DropTrackEventDataBefore
266            .TRACK_EVENT_RANGE_OF_INTEREST :
267        ResetTraceProcessorArgs.DropTrackEventDataBefore.NO_DROP;
268    args.ingestFtraceInRawTable = ingestFtraceInRawTable;
269    args.analyzeTraceProtoContent = analyzeTraceProtoContent;
270    this.rpcSendRequest(rpc);
271    return asyncRes;
272  }
273
274  // Resets the trace processor state by destroying any table/views created by
275  // the UI after loading.
276  restoreInitialTables(): Promise<void> {
277    const asyncRes = defer<void>();
278    this.pendingRestoreTables.push(asyncRes);
279    const rpc = TraceProcessorRpc.create();
280    rpc.request = TPM.TPM_RESTORE_INITIAL_TABLES;
281    this.rpcSendRequest(rpc);
282    return asyncRes;  // Linearize with the worker.
283  }
284
285  // Shorthand for sending a compute metrics request to the engine.
286  async computeMetric(metrics: string[]): Promise<ComputeMetricResult> {
287    const asyncRes = defer<ComputeMetricResult>();
288    this.pendingComputeMetrics.push(asyncRes);
289    const rpc = TraceProcessorRpc.create();
290    rpc.request = TPM.TPM_COMPUTE_METRIC;
291    const args = rpc.computeMetricArgs = new ComputeMetricArgs();
292    args.metricNames = metrics;
293    args.format = ComputeMetricArgs.ResultFormat.TEXTPROTO;
294    this.rpcSendRequest(rpc);
295    return asyncRes;
296  }
297
298  // Issues a streaming query and retrieve results in batches.
299  // The returned QueryResult object will be populated over time with batches
300  // of rows (each batch conveys ~128KB of data and a variable number of rows).
301  // The caller can decide whether to wait that all batches have been received
302  // (by awaiting the returned object or calling result.waitAllRows()) or handle
303  // the rows incrementally.
304  //
305  // Example usage:
306  // const res = engine.query('SELECT foo, bar FROM table');
307  // console.log(res.numRows());  // Will print 0 because we didn't await.
308  // await(res.waitAllRows());
309  // console.log(res.numRows());  // Will print the total number of rows.
310  //
311  // for (const it = res.iter({foo: NUM, bar:STR}); it.valid(); it.next()) {
312  //   console.log(it.foo, it.bar);
313  // }
314  //
315  // Optional |tag| (usually a component name) can be provided to allow
316  // attributing trace processor workload to different UI components.
317  query(sqlQuery: string, tag?: string): Promise<QueryResult>&QueryResult {
318    const rpc = TraceProcessorRpc.create();
319    rpc.request = TPM.TPM_QUERY_STREAMING;
320    rpc.queryArgs = new QueryArgs();
321    rpc.queryArgs.sqlQuery = sqlQuery;
322    if (tag) {
323      rpc.queryArgs.tag = tag;
324    }
325    const result = createQueryResult({
326      query: sqlQuery,
327    });
328    this.pendingQueries.push(result);
329    this.rpcSendRequest(rpc);
330    return result;
331  }
332
333  isMetatracingEnabled(): boolean {
334    return this._isMetatracingEnabled;
335  }
336
337  enableMetatrace(categories?: perfetto.protos.MetatraceCategories) {
338    const rpc = TraceProcessorRpc.create();
339    rpc.request = TPM.TPM_ENABLE_METATRACE;
340    if (categories) {
341      rpc.enableMetatraceArgs = new perfetto.protos.EnableMetatraceArgs();
342      rpc.enableMetatraceArgs.categories = categories;
343    }
344    this._isMetatracingEnabled = true;
345    this.rpcSendRequest(rpc);
346  }
347
348  stopAndGetMetatrace(): Promise<DisableAndReadMetatraceResult> {
349    // If we are already finalising a metatrace, ignore the request.
350    if (this.pendingReadMetatrace) {
351      return Promise.reject(new Error('Already finalising a metatrace'));
352    }
353
354    const result = defer<DisableAndReadMetatraceResult>();
355
356    const rpc = TraceProcessorRpc.create();
357    rpc.request = TPM.TPM_DISABLE_AND_READ_METATRACE;
358    this._isMetatracingEnabled = false;
359    this.pendingReadMetatrace = result;
360    this.rpcSendRequest(rpc);
361    return result;
362  }
363
364  // Marshals the TraceProcessorRpc request arguments and sends the request
365  // to the concrete Engine (Wasm or HTTP).
366  private rpcSendRequest(rpc: TraceProcessorRpc) {
367    rpc.seq = this.txSeqId++;
368    // Each message is wrapped in a TraceProcessorRpcStream to add the varint
369    // preamble with the size, which allows tokenization on the other end.
370    const outerProto = TraceProcessorRpcStream.create();
371    outerProto.msg.push(rpc);
372    const buf = TraceProcessorRpcStream.encode(outerProto).finish();
373    this.loadingTracker.beginLoading();
374    this.rpcSendRequestBytes(buf);
375  }
376
377  // TODO(hjd): When streaming must invalidate this somehow.
378  async getCpus(): Promise<number[]> {
379    if (!this._cpus) {
380      const cpus = [];
381      const queryRes = await this.query(
382          'select distinct(cpu) as cpu from sched order by cpu;');
383      for (const it = queryRes.iter({cpu: NUM}); it.valid(); it.next()) {
384        cpus.push(it.cpu);
385      }
386      this._cpus = cpus;
387    }
388    return this._cpus;
389  }
390
391  async getNumberOfGpus(): Promise<number> {
392    if (!this._numGpus) {
393      const result = await this.query(`
394        select count(distinct(gpu_id)) as gpuCount
395        from gpu_counter_track
396        where name = 'gpufreq';
397      `);
398      this._numGpus = result.firstRow({gpuCount: NUM}).gpuCount;
399    }
400    return this._numGpus;
401  }
402
403  // TODO: This should live in code that's more specific to chrome, instead of
404  // in engine.
405  async getNumberOfProcesses(): Promise<number> {
406    const result = await this.query('select count(*) as cnt from process;');
407    return result.firstRow({cnt: NUM}).cnt;
408  }
409
410  getProxy(tag: string): EngineProxy {
411    return new EngineProxy(this, tag);
412  }
413}
414
415// Lightweight wrapper over Engine exposing only `query` method and annotating
416// all queries going through it with a tag.
417export class EngineProxy {
418  private engine: Engine;
419  private tag: string;
420
421  constructor(engine: Engine, tag: string) {
422    this.engine = engine;
423    this.tag = tag;
424  }
425
426  query(sqlQuery: string, tag?: string): Promise<QueryResult>&QueryResult {
427    return this.engine.query(sqlQuery, tag || this.tag);
428  }
429
430  get engineId(): string {
431    return this.engine.id;
432  }
433}
434