1// Copyright (C) 2018 The Android Open Source Project
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//      http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15import '../tracks/all_controller';
16
17import {assertExists, assertTrue} from '../base/logging';
18import {
19  Actions,
20  DeferredAction,
21} from '../common/actions';
22import {TRACE_MARGIN_TIME_S} from '../common/constants';
23import {Engine, QueryError} from '../common/engine';
24import {HttpRpcEngine} from '../common/http_rpc_engine';
25import {slowlyCountRows} from '../common/query_iterator';
26import {EngineMode} from '../common/state';
27import {toNs, toNsCeil, toNsFloor} from '../common/time';
28import {TimeSpan} from '../common/time';
29import {
30  createWasmEngine,
31  destroyWasmEngine,
32  WasmEngineProxy
33} from '../common/wasm_engine_proxy';
34import {QuantizedLoad, ThreadDesc} from '../frontend/globals';
35
36import {
37  CounterAggregationController
38} from './aggregation/counter_aggregation_controller';
39import {
40  CpuAggregationController
41} from './aggregation/cpu_aggregation_controller';
42import {
43  CpuByProcessAggregationController
44} from './aggregation/cpu_by_process_aggregation_controller';
45import {
46  SliceAggregationController
47} from './aggregation/slice_aggregation_controller';
48import {
49  ThreadAggregationController
50} from './aggregation/thread_aggregation_controller';
51import {Child, Children, Controller} from './controller';
52import {
53  CpuProfileController,
54  CpuProfileControllerArgs
55} from './cpu_profile_controller';
56import {
57  FlowEventsController,
58  FlowEventsControllerArgs
59} from './flow_events_controller';
60import {globals} from './globals';
61import {
62  HeapProfileController,
63  HeapProfileControllerArgs
64} from './heap_profile_controller';
65import {LoadingManager} from './loading_manager';
66import {LogsController} from './logs_controller';
67import {MetricsController} from './metrics_controller';
68import {QueryController, QueryControllerArgs} from './query_controller';
69import {SearchController} from './search_controller';
70import {
71  SelectionController,
72  SelectionControllerArgs
73} from './selection_controller';
74import {
75  TraceErrorController,
76} from './trace_error_controller';
77import {
78  TraceBufferStream,
79  TraceFileStream,
80  TraceHttpStream,
81  TraceStream
82} from './trace_stream';
83import {TrackControllerArgs, trackControllerRegistry} from './track_controller';
84import {decideTracks} from './track_decider';
85
// Lifecycle of a TraceController:
//  - 'init':          nothing has happened yet; the next run() kicks off the
//                     asynchronous trace load.
//  - 'loading_trace': the load is in flight; run() stays here until the engine
//                     exists and is flagged as ready in the global state.
//  - 'ready':         run() returns the child controllers for this trace.
type States =
    | 'init'
    | 'loading_trace'
    | 'ready';
87
// TraceController handles handshakes with the frontend for everything that
// concerns a single trace. It owns the trace processor engine (either the
// built-in WASM one or an external instance reached over HTTP+RPC), feeds the
// trace into it and, once the engine is ready, returns the child controllers
// that handle tracks data and SQL queries. There is one TraceController
// instance for each trace opened in the UI (for now only one trace is
// supported).
export class TraceController extends Controller<States> {
  private readonly engineId: string;
  // Assigned by loadTrace(); undefined until an engine has been created.
  private engine?: Engine;

  constructor(engineId: string) {
    super('init');
    this.engineId = engineId;
  }

  // Tears down the WASM engine, if that is the engine in use. Nothing is done
  // here for other engine types.
  onDestroy() {
    if (this.engine instanceof WasmEngineProxy) {
      destroyWasmEngine(this.engine.id);
    }
  }

  // Advances the state machine by one step. Returns the list of child
  // controllers when in the 'ready' state and nothing otherwise.
  run() {
    const engineCfg = assertExists(globals.state.engines[this.engineId]);
    switch (this.state) {
      case 'init':
        // Fire-and-forget: the load runs in the background while this
        // controller waits in 'loading_trace'.
        this.loadTrace()
            .then(mode => {
              globals.dispatch(Actions.setEngineReady({
                engineId: this.engineId,
                ready: true,
                mode,
              }));
            })
            .catch(err => {
              // Surface the failure in the status bar, then rethrow so it is
              // not silently swallowed.
              this.updateStatus(`${err}`);
              throw err;
            });
        this.updateStatus('Opening trace');
        this.setState('loading_trace');
        return;

      case 'loading_trace':
        // Stay in this state until loadTrace() returns and marks the engine as
        // ready.
        if (this.engine === undefined || !engineCfg.ready) return;
        this.setState('ready');
        return;

      case 'ready': {
        // At this point we are ready to serve queries and handle tracks.
        const engine = assertExists(this.engine);
        assertTrue(engineCfg.ready);
        return this.createChildControllers(engine);
      }

      default:
        throw new Error(`unknown state ${this.state}`);
    }
  }

  // Builds the list of child controllers that should exist while the trace is
  // loaded: one per track and per query in the global state, plus the fixed
  // set of singleton controllers.
  private createChildControllers(engine: Engine): Children {
    const children: Children = [];

    // Create a TrackController for each track that belongs to this engine and
    // whose kind has a registered controller.
    for (const trackId of Object.keys(globals.state.tracks)) {
      const trackCfg = globals.state.tracks[trackId];
      if (trackCfg.engineId !== this.engineId) continue;
      if (!trackControllerRegistry.has(trackCfg.kind)) continue;
      const trackCtlFactory = trackControllerRegistry.get(trackCfg.kind);
      const trackArgs: TrackControllerArgs = {trackId, engine};
      children.push(Child(trackId, trackCtlFactory, trackArgs));
    }

    // Create a QueryController for each query.
    for (const queryId of Object.keys(globals.state.queries)) {
      const queryArgs: QueryControllerArgs = {queryId, engine};
      children.push(Child(queryId, QueryController, queryArgs));
    }

    const selectionArgs: SelectionControllerArgs = {engine};
    children.push(Child('selection', SelectionController, selectionArgs));

    const flowEventsArgs: FlowEventsControllerArgs = {engine};
    children.push(Child('flowEvents', FlowEventsController, flowEventsArgs));

    const cpuProfileArgs: CpuProfileControllerArgs = {engine};
    children.push(Child('cpuProfile', CpuProfileController, cpuProfileArgs));

    const heapProfileArgs: HeapProfileControllerArgs = {engine};
    children.push(Child('heapProfile', HeapProfileController, heapProfileArgs));

    children.push(Child(
        'cpu_aggregation',
        CpuAggregationController,
        {engine, kind: 'cpu_aggregation'}));
    children.push(Child(
        'thread_aggregation',
        ThreadAggregationController,
        {engine, kind: 'thread_state_aggregation'}));
    children.push(Child(
        'cpu_process_aggregation',
        CpuByProcessAggregationController,
        {engine, kind: 'cpu_by_process_aggregation'}));
    children.push(Child(
        'slice_aggregation',
        SliceAggregationController,
        {engine, kind: 'slice_aggregation'}));
    children.push(Child(
        'counter_aggregation',
        CounterAggregationController,
        {engine, kind: 'counter_aggregation'}));

    children.push(Child('search', SearchController, {
      engine,
      app: globals,
    }));
    children.push(Child('logs', LogsController, {
      engine,
      app: globals,
    }));
    children.push(Child('traceError', TraceErrorController, {engine}));
    children.push(Child('metrics', MetricsController, {engine}));
    return children;
  }

  // Creates the engine, streams the trace into it (unless the external
  // HTTP+RPC instance already has one loaded), publishes the trace time
  // bounds and populates tracks, threads and the timeline overview.
  // Resolves with the engine mode that ended up being used.
  private async loadTrace(): Promise<EngineMode> {
    this.updateStatus('Creating trace processor');
    // Check if there is any instance of the trace_processor_shell running in
    // HTTP RPC mode (i.e. trace_processor_shell -D).
    let engineMode: EngineMode;
    let useRpc = false;
    if (globals.state.newEngineMode === 'USE_HTTP_RPC_IF_AVAILABLE') {
      useRpc = (await HttpRpcEngine.checkConnection()).connected;
    }

    let engine: Engine;
    if (useRpc) {
      console.log('Opening trace using native accelerator over HTTP+RPC');
      engineMode = 'HTTP_RPC';
      const rpcEngine =
          new HttpRpcEngine(this.engineId, LoadingManager.getInstance);
      rpcEngine.errorHandler = (err) => {
        globals.dispatch(
            Actions.setEngineFailed({mode: 'HTTP_RPC', failure: `${err}`}));
        throw err;
      };
      engine = rpcEngine;
    } else {
      console.log('Opening trace using built-in WASM engine');
      engineMode = 'WASM';
      engine = new WasmEngineProxy(
          this.engineId,
          createWasmEngine(this.engineId),
          LoadingManager.getInstance);
    }
    this.engine = engine;

    globals.dispatch(Actions.setEngineReady({
      engineId: this.engineId,
      ready: false,
      mode: engineMode,
    }));
    const engineCfg = assertExists(globals.state.engines[this.engineId]);
    let traceStream: TraceStream|undefined;
    if (engineCfg.source.type === 'FILE') {
      traceStream = new TraceFileStream(engineCfg.source.file);
    } else if (engineCfg.source.type === 'ARRAY_BUFFER') {
      traceStream = new TraceBufferStream(engineCfg.source.buffer);
    } else if (engineCfg.source.type === 'URL') {
      traceStream = new TraceHttpStream(engineCfg.source.url);
    } else if (engineCfg.source.type === 'HTTP_RPC') {
      traceStream = undefined;
    } else {
      throw new Error(`Unknown source: ${JSON.stringify(engineCfg.source)}`);
    }

    // |traceStream| can be undefined in the case when we are using the external
    // HTTP+RPC endpoint and the trace processor instance has already loaded
    // a trace (because it was passed as a cmdline argument to
    // trace_processor_shell). In this case we don't want the UI to load any
    // file/stream and we just want to jump to the loading phase.
    if (traceStream !== undefined) {
      const tStart = performance.now();
      for (;;) {
        const res = await traceStream.readChunk();
        await engine.parse(res.data);
        const elapsed = (performance.now() - tStart) / 1000;
        let status = 'Loading trace ';
        if (res.bytesTotal > 0) {
          const progress = Math.round(res.bytesRead / res.bytesTotal * 100);
          status += `${progress}%`;
        } else {
          status += `${Math.round(res.bytesRead / 1e6)} MB`;
        }
        // The timer can report no elapsed time for the first chunk; skip the
        // throughput figure rather than printing "Infinity MB/s" or "NaN MB/s".
        if (elapsed > 0) {
          status += ` - ${Math.ceil(res.bytesRead / elapsed / 1e6)} MB/s`;
        }
        this.updateStatus(status);
        if (res.eof) break;
      }
      await engine.notifyEof();
    } else {
      assertTrue(engine instanceof HttpRpcEngine);
      await engine.restoreInitialTables();
    }

    // Full trace bounds, widened by a margin on both sides.
    const traceTime = await engine.getTraceTimeBounds();
    const startSec = traceTime.start - TRACE_MARGIN_TIME_S;
    const endSec = traceTime.end + TRACE_MARGIN_TIME_S;
    const traceTimeState = {
      startSec,
      endSec,
    };
    const actions: DeferredAction[] = [
      Actions.setTraceTime(traceTimeState),
      Actions.navigate({route: '/viewer'}),
    ];

    // Initially show the window described by the tracing metadata (plus the
    // same margin), clamped to the trace bounds. Fall back to the whole trace
    // if that window turns out to be empty or inverted.
    const mdTime = await engine.getTracingMetadataTimeBounds();
    const mdStartSec = Math.max(startSec, mdTime.start - TRACE_MARGIN_TIME_S);
    const mdEndSec = Math.min(endSec, mdTime.end + TRACE_MARGIN_TIME_S);
    let visibleStartSec = startSec;
    let visibleEndSec = endSec;
    if (mdStartSec < mdEndSec) {
      visibleStartSec = mdStartSec;
      visibleEndSec = mdEndSec;
    }

    // We don't know the resolution at this point. However this will be
    // replaced in 50ms so a guess is fine. It is derived from end - start so
    // that the guess is a positive duration.
    const resolution = (visibleEndSec - visibleStartSec) / 1000;
    actions.push(Actions.setVisibleTraceTime({
      startSec: visibleStartSec,
      endSec: visibleEndSec,
      lastUpdate: Date.now() / 1000,
      resolution
    }));

    globals.dispatchMultiple(actions);

    // Make sure the helper views are available before we start adding tracks.
    await this.initialiseHelperViews();

    {
      // When we reload from a permalink don't create extra tracks:
      const {pinnedTracks, tracks} = globals.state;
      if (!pinnedTracks.length && !Object.keys(tracks).length) {
        await this.listTracks();
      }
    }

    await this.listThreads();
    await this.loadTimelineOverview(traceTime);
    globals.dispatch(Actions.sortThreadTracks({}));
    await this.selectFirstHeapProfile();

    return engineMode;
  }

  // Selects the earliest heap profile in the trace (native allocation or heap
  // graph sample), if the query finds one.
  private async selectFirstHeapProfile() {
    const query = `select * from
    (select distinct(ts) as ts, 'native' as type,
        upid from heap_profile_allocation
        union
        select distinct(graph_sample_ts) as ts, 'graph' as type, upid from
        heap_graph_object) order by ts limit 1`;
    const profile = await assertExists(this.engine).query(query);
    if (profile.numRecords !== 1) return;
    const ts = profile.columns[0].longValues![0];
    const type = profile.columns[1].stringValues![0];
    const upid = profile.columns[2].longValues![0];
    globals.dispatch(Actions.selectHeapProfile({id: 0, upid, ts, type}));
  }

  // Asks the track decider which tracks to create and dispatches the
  // resulting actions.
  private async listTracks() {
    this.updateStatus('Loading tracks');
    const engine = assertExists<Engine>(this.engine);
    const actions = await decideTracks(this.engineId, engine);
    globals.dispatchMultiple(actions);
  }

  // Reads every thread (joined with its process, when there is one) and
  // publishes the list under 'Threads'.
  private async listThreads() {
    this.updateStatus('Reading thread list');
    const sqlQuery = `select utid, tid, pid, thread.name,
        ifnull(
          case when length(process.name) > 0 then process.name else null end,
          thread.name),
        process.cmdline
        from (select * from thread order by upid) as thread
        left join (select * from process order by upid) as process
        using(upid)`;
    const threadRows = await assertExists(this.engine).query(sqlQuery);
    const threads: ThreadDesc[] = [];
    // Count once: the row count does not change while iterating.
    const numThreads = slowlyCountRows(threadRows);
    for (let i = 0; i < numThreads; i++) {
      const utid = threadRows.columns[0].longValues![i];
      const tid = threadRows.columns[1].longValues![i];
      const pid = threadRows.columns[2].longValues![i];
      const threadName = threadRows.columns[3].stringValues![i];
      const procName = threadRows.columns[4].stringValues![i];
      const cmdline = threadRows.columns[5].stringValues![i];
      threads.push({utid, tid, threadName, pid, procName, cmdline});
    }
    globals.publish('Threads', threads);
  }

  // Publishes the 'OverviewData' used by the timeline overview. The trace is
  // split into 100 equal steps. Per-CPU scheduling load is published step by
  // step; only if no step produced any scheduling row is a per-process
  // summary of thread slices published instead.
  private async loadTimelineOverview(traceTime: TimeSpan) {
    const engine = assertExists<Engine>(this.engine);
    const numSteps = 100;
    const stepSec = traceTime.duration / numSteps;
    let hasSchedOverview = false;
    for (let step = 0; step < numSteps; step++) {
      this.updateStatus(
          'Loading overview ' +
          `${Math.round((step + 1) / numSteps * 1000) / 10}%`);
      const startSec = traceTime.start + step * stepSec;
      const startNs = toNsFloor(startSec);
      const endSec = startSec + stepSec;
      const endNs = toNsCeil(endSec);

      // Sched overview.
      const schedRows = await engine.query(
          `select sum(dur)/${stepSec}/1e9, cpu from sched ` +
          `where ts >= ${startNs} and ts < ${endNs} and utid != 0 ` +
          'group by cpu order by cpu');
      const schedData: {[key: string]: QuantizedLoad} = {};
      const numSchedRows = slowlyCountRows(schedRows);
      for (let i = 0; i < numSchedRows; i++) {
        const load = schedRows.columns[0].doubleValues![i];
        const cpu = schedRows.columns[1].longValues![i];
        schedData[cpu] = {startSec, endSec, load};
        hasSchedOverview = true;
      }
      globals.publish('OverviewData', schedData);
    }

    if (hasSchedOverview) {
      return;
    }

    // Slices overview.
    const traceStartNs = toNs(traceTime.start);
    const stepSecNs = toNs(stepSec);
    const sliceSummaryQuery = await engine.query(`select
           bucket,
           upid,
           sum(utid_sum) / cast(${stepSecNs} as float) as upid_sum
         from thread
         inner join (
           select
             cast((ts - ${traceStartNs})/${stepSecNs} as int) as bucket,
             sum(dur) as utid_sum,
             utid
           from slice
           inner join thread_track on slice.track_id = thread_track.id
           group by bucket, utid
         ) using(utid)
         group by bucket, upid`);

    const slicesData: {[key: string]: QuantizedLoad[]} = {};
    const numSliceRows = slowlyCountRows(sliceSummaryQuery);
    for (let i = 0; i < numSliceRows; i++) {
      const bucket = sliceSummaryQuery.columns[0].longValues![i];
      const upid = sliceSummaryQuery.columns[1].longValues![i];
      const load = sliceSummaryQuery.columns[2].doubleValues![i];

      const startSec = traceTime.start + stepSec * bucket;
      const endSec = startSec + stepSec;

      const upidStr = upid.toString();
      let loadArray = slicesData[upidStr];
      if (loadArray === undefined) {
        loadArray = slicesData[upidStr] = [];
      }
      loadArray.push({startSec, endSec, load});
    }
    globals.publish('OverviewData', slicesData);
  }

  // Creates the annotation_* helper tables and fills them from the
  // `<metric>_event` table of each metric in the list below. A QueryError
  // raised for one metric is published as 'MetricError' and does not stop the
  // remaining metrics; any other error is rethrown.
  async initialiseHelperViews() {
    const engine = assertExists<Engine>(this.engine);

    this.updateStatus('Creating annotation counter track table');
    // Create the helper tables for all the annotations related data.
    // NULL in min/max means "figure it out per track in the usual way".
    await engine.query(`
      CREATE TABLE annotation_counter_track(
        id INTEGER PRIMARY KEY,
        name STRING,
        __metric_name STRING,
        upid INTEGER,
        min_value DOUBLE,
        max_value DOUBLE
      );
    `);
    this.updateStatus('Creating annotation slice track table');
    await engine.query(`
      CREATE TABLE annotation_slice_track(
        id INTEGER PRIMARY KEY,
        name STRING,
        __metric_name STRING,
        upid INTEGER
      );
    `);

    this.updateStatus('Creating annotation counter table');
    await engine.query(`
      CREATE TABLE annotation_counter(
        id BIG INT,
        track_id INT,
        ts BIG INT,
        value DOUBLE,
        PRIMARY KEY (track_id, ts)
      ) WITHOUT ROWID;
    `);
    this.updateStatus('Creating annotation slice table');
    await engine.query(`
      CREATE TABLE annotation_slice(
        id INTEGER PRIMARY KEY,
        track_id INT,
        ts BIG INT,
        dur BIG INT,
        depth INT,
        cat STRING,
        name STRING,
        UNIQUE(track_id, ts)
      );
    `);

    const annotationMetrics = [
      'android_startup',
      'android_ion',
      'android_dma_heap',
      'android_thread_time_in_state',
      'android_surfaceflinger',
      'android_batt',
      'android_sysui_cuj',
      'android_jank',
    ];
    for (const metric of annotationMetrics) {
      this.updateStatus(`Computing ${metric} metric`);
      try {
        // We don't care about the actual result of metric here as we are just
        // interested in the annotation tracks.
        await engine.computeMetric([metric]);
      } catch (e) {
        this.reportMetricError(e);
        continue;
      }

      this.updateStatus(`Inserting data for ${metric} metric`);
      try {
        // table_info yields one row per column of the event table; the column
        // name is read from the second result column of each row.
        const result = await engine.query(`pragma table_info(${metric}_event)`);
        let hasSliceName = false;
        let hasDur = false;
        let hasUpid = false;
        let hasValue = false;
        const numColumns = slowlyCountRows(result);
        for (let i = 0; i < numColumns; i++) {
          const name = result.columns[1].stringValues![i];
          hasSliceName = hasSliceName || name === 'slice_name';
          hasDur = hasDur || name === 'dur';
          hasUpid = hasUpid || name === 'upid';
          // Must be detected from the rows as well: the pragma's own result
          // columns describe the schema listing, not the event table.
          hasValue = hasValue || name === 'value';
        }

        const upidColumnSelect = hasUpid ? 'upid' : '0 AS upid';
        const upidColumnWhere = hasUpid ? 'upid' : '0';
        if (hasSliceName && hasDur) {
          await engine.query(`
            INSERT INTO annotation_slice_track(name, __metric_name, upid)
            SELECT DISTINCT
              track_name,
              '${metric}' as metric_name,
              ${upidColumnSelect}
            FROM ${metric}_event
            WHERE track_type = 'slice'
          `);
          await engine.query(`
            INSERT INTO annotation_slice(track_id, ts, dur, depth, cat, name)
            SELECT
              t.id AS track_id,
              ts,
              dur,
              0 AS depth,
              a.track_name as cat,
              slice_name AS name
            FROM ${metric}_event a
            JOIN annotation_slice_track t
            ON a.track_name = t.name AND t.__metric_name = '${metric}'
            ORDER BY t.id, ts
          `);
        }

        if (hasValue) {
          // The shared min/max over all rows with a non-zero upid is computed
          // inside SQL. This avoids formatting a JS value into the statement,
          // so an empty range is stored as NULL.
          // NOTE(review): presumably `value` may be a double, in which case
          // reading the aggregate as an integer on the JS side would not
          // work - confirm against the engine's result decoding.
          const valueBound = (fn: 'MIN'|'MAX') => `(
              SELECT ${fn}(value)
              FROM ${metric}_event
              WHERE ${upidColumnWhere} != 0)`;
          await engine.query(`
          INSERT INTO annotation_counter_track(
            name, __metric_name, min_value, max_value, upid)
          SELECT DISTINCT
            track_name,
            '${metric}' as metric_name,
            CASE ${upidColumnWhere} WHEN 0 THEN NULL
              ELSE ${valueBound('MIN')} END,
            CASE ${upidColumnWhere} WHEN 0 THEN NULL
              ELSE ${valueBound('MAX')} END,
            ${upidColumnSelect}
          FROM ${metric}_event
          WHERE track_type = 'counter'
        `);
          await engine.query(`
          INSERT INTO annotation_counter(id, track_id, ts, value)
          SELECT
            -1 as id,
            t.id AS track_id,
            ts,
            value
          FROM ${metric}_event a
          JOIN annotation_counter_track t
          ON a.track_name = t.name AND t.__metric_name = '${metric}'
          ORDER BY t.id, ts
        `);
        }
      } catch (e) {
        this.reportMetricError(e);
      }
    }
  }

  // Publishes a QueryError as a 'MetricError'; rethrows anything else.
  private reportMetricError(e: unknown): void {
    if (!(e instanceof QueryError)) {
      throw e;
    }
    globals.publish('MetricError', 'MetricError: ' + e.message);
  }

  // Dispatches a status message stamped with the current time in seconds.
  private updateStatus(msg: string): void {
    globals.dispatch(Actions.updateStatus({
      msg,
      timestamp: Date.now() / 1000,
    }));
  }
}
624
625
626