1// Copyright (C) 2018 The Android Open Source Project 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15import '../tracks/all_controller'; 16 17import {assertExists, assertTrue} from '../base/logging'; 18import { 19 Actions, 20 DeferredAction, 21} from '../common/actions'; 22import {TRACE_MARGIN_TIME_S} from '../common/constants'; 23import {Engine, QueryError} from '../common/engine'; 24import {HttpRpcEngine} from '../common/http_rpc_engine'; 25import {slowlyCountRows} from '../common/query_iterator'; 26import {EngineMode} from '../common/state'; 27import {toNs, toNsCeil, toNsFloor} from '../common/time'; 28import {TimeSpan} from '../common/time'; 29import { 30 createWasmEngine, 31 destroyWasmEngine, 32 WasmEngineProxy 33} from '../common/wasm_engine_proxy'; 34import {QuantizedLoad, ThreadDesc} from '../frontend/globals'; 35 36import { 37 CounterAggregationController 38} from './aggregation/counter_aggregation_controller'; 39import { 40 CpuAggregationController 41} from './aggregation/cpu_aggregation_controller'; 42import { 43 CpuByProcessAggregationController 44} from './aggregation/cpu_by_process_aggregation_controller'; 45import { 46 SliceAggregationController 47} from './aggregation/slice_aggregation_controller'; 48import { 49 ThreadAggregationController 50} from './aggregation/thread_aggregation_controller'; 51import {Child, Children, Controller} from './controller'; 52import { 53 CpuProfileController, 54 CpuProfileControllerArgs 55} from './cpu_profile_controller'; 56import { 57 FlowEventsController, 58 FlowEventsControllerArgs 59} from './flow_events_controller'; 60import {globals} from './globals'; 61import { 62 HeapProfileController, 63 HeapProfileControllerArgs 64} from './heap_profile_controller'; 65import {LoadingManager} from './loading_manager'; 66import {LogsController} from './logs_controller'; 67import {MetricsController} from './metrics_controller'; 68import {QueryController, QueryControllerArgs} from './query_controller'; 69import {SearchController} from './search_controller'; 70import { 71 SelectionController, 72 SelectionControllerArgs 73} from './selection_controller'; 74import { 75 TraceErrorController, 76} from './trace_error_controller'; 77import { 78 TraceBufferStream, 79 TraceFileStream, 80 TraceHttpStream, 81 TraceStream 82} from './trace_stream'; 83import {TrackControllerArgs, trackControllerRegistry} from './track_controller'; 84import {decideTracks} from './track_decider'; 85 86type States = 'init'|'loading_trace'|'ready'; 87 88// TraceController handles handshakes with the frontend for everything that 89// concerns a single trace. It owns the WASM trace processor engine, handles 90// tracks data and SQL queries. There is one TraceController instance for each 91// trace opened in the UI (for now only one trace is supported). 92export class TraceController extends Controller<States> { 93 private readonly engineId: string; 94 private engine?: Engine; 95 96 constructor(engineId: string) { 97 super('init'); 98 this.engineId = engineId; 99 } 100 101 onDestroy() { 102 if (this.engine instanceof WasmEngineProxy) { 103 destroyWasmEngine(this.engine.id); 104 } 105 } 106 107 run() { 108 const engineCfg = assertExists(globals.state.engines[this.engineId]); 109 switch (this.state) { 110 case 'init': 111 this.loadTrace() 112 .then(mode => { 113 globals.dispatch(Actions.setEngineReady({ 114 engineId: this.engineId, 115 ready: true, 116 mode, 117 })); 118 }) 119 .catch(err => { 120 this.updateStatus(`${err}`); 121 throw err; 122 }); 123 this.updateStatus('Opening trace'); 124 this.setState('loading_trace'); 125 break; 126 127 case 'loading_trace': 128 // Stay in this state until loadTrace() returns and marks the engine as 129 // ready. 130 if (this.engine === undefined || !engineCfg.ready) return; 131 this.setState('ready'); 132 break; 133 134 case 'ready': 135 // At this point we are ready to serve queries and handle tracks. 136 const engine = assertExists(this.engine); 137 assertTrue(engineCfg.ready); 138 const childControllers: Children = []; 139 140 // Create a TrackController for each track. 141 for (const trackId of Object.keys(globals.state.tracks)) { 142 const trackCfg = globals.state.tracks[trackId]; 143 if (trackCfg.engineId !== this.engineId) continue; 144 if (!trackControllerRegistry.has(trackCfg.kind)) continue; 145 const trackCtlFactory = trackControllerRegistry.get(trackCfg.kind); 146 const trackArgs: TrackControllerArgs = {trackId, engine}; 147 childControllers.push(Child(trackId, trackCtlFactory, trackArgs)); 148 } 149 150 // Create a QueryController for each query. 151 for (const queryId of Object.keys(globals.state.queries)) { 152 const queryArgs: QueryControllerArgs = {queryId, engine}; 153 childControllers.push(Child(queryId, QueryController, queryArgs)); 154 } 155 156 const selectionArgs: SelectionControllerArgs = {engine}; 157 childControllers.push( 158 Child('selection', SelectionController, selectionArgs)); 159 160 const flowEventsArgs: FlowEventsControllerArgs = {engine}; 161 childControllers.push( 162 Child('flowEvents', FlowEventsController, flowEventsArgs)); 163 164 const cpuProfileArgs: CpuProfileControllerArgs = {engine}; 165 childControllers.push( 166 Child('cpuProfile', CpuProfileController, cpuProfileArgs)); 167 168 const heapProfileArgs: HeapProfileControllerArgs = {engine}; 169 childControllers.push( 170 Child('heapProfile', HeapProfileController, heapProfileArgs)); 171 childControllers.push(Child( 172 'cpu_aggregation', 173 CpuAggregationController, 174 {engine, kind: 'cpu_aggregation'})); 175 childControllers.push(Child( 176 'thread_aggregation', 177 ThreadAggregationController, 178 {engine, kind: 'thread_state_aggregation'})); 179 childControllers.push(Child( 180 'cpu_process_aggregation', 181 CpuByProcessAggregationController, 182 {engine, kind: 'cpu_by_process_aggregation'})); 183 childControllers.push(Child( 184 'slice_aggregation', 185 SliceAggregationController, 186 {engine, kind: 'slice_aggregation'})); 187 childControllers.push(Child( 188 'counter_aggregation', 189 CounterAggregationController, 190 {engine, kind: 'counter_aggregation'})); 191 childControllers.push(Child('search', SearchController, { 192 engine, 193 app: globals, 194 })); 195 196 childControllers.push(Child('logs', LogsController, { 197 engine, 198 app: globals, 199 })); 200 childControllers.push( 201 Child('traceError', TraceErrorController, {engine})); 202 childControllers.push(Child('metrics', MetricsController, {engine})); 203 return childControllers; 204 205 default: 206 throw new Error(`unknown state ${this.state}`); 207 } 208 return; 209 } 210 211 private async loadTrace(): Promise<EngineMode> { 212 this.updateStatus('Creating trace processor'); 213 // Check if there is any instance of the trace_processor_shell running in 214 // HTTP RPC mode (i.e. trace_processor_shell -D). 215 let engineMode: EngineMode; 216 let useRpc = false; 217 if (globals.state.newEngineMode === 'USE_HTTP_RPC_IF_AVAILABLE') { 218 useRpc = (await HttpRpcEngine.checkConnection()).connected; 219 } 220 if (useRpc) { 221 console.log('Opening trace using native accelerator over HTTP+RPC'); 222 engineMode = 'HTTP_RPC'; 223 const engine = 224 new HttpRpcEngine(this.engineId, LoadingManager.getInstance); 225 engine.errorHandler = (err) => { 226 globals.dispatch( 227 Actions.setEngineFailed({mode: 'HTTP_RPC', failure: `${err}`})); 228 throw err; 229 }; 230 this.engine = engine; 231 } else { 232 console.log('Opening trace using built-in WASM engine'); 233 engineMode = 'WASM'; 234 this.engine = new WasmEngineProxy( 235 this.engineId, 236 createWasmEngine(this.engineId), 237 LoadingManager.getInstance); 238 } 239 240 globals.dispatch(Actions.setEngineReady({ 241 engineId: this.engineId, 242 ready: false, 243 mode: engineMode, 244 })); 245 const engineCfg = assertExists(globals.state.engines[this.engineId]); 246 let traceStream: TraceStream|undefined; 247 if (engineCfg.source.type === 'FILE') { 248 traceStream = new TraceFileStream(engineCfg.source.file); 249 } else if (engineCfg.source.type === 'ARRAY_BUFFER') { 250 traceStream = new TraceBufferStream(engineCfg.source.buffer); 251 } else if (engineCfg.source.type === 'URL') { 252 traceStream = new TraceHttpStream(engineCfg.source.url); 253 } else if (engineCfg.source.type === 'HTTP_RPC') { 254 traceStream = undefined; 255 } else { 256 throw new Error(`Unknown source: ${JSON.stringify(engineCfg.source)}`); 257 } 258 259 // |traceStream| can be undefined in the case when we are using the external 260 // HTTP+RPC endpoint and the trace processor instance has already loaded 261 // a trace (because it was passed as a cmdline argument to 262 // trace_processor_shell). In this case we don't want the UI to load any 263 // file/stream and we just want to jump to the loading phase. 264 if (traceStream !== undefined) { 265 const tStart = performance.now(); 266 for (;;) { 267 const res = await traceStream.readChunk(); 268 await this.engine.parse(res.data); 269 const elapsed = (performance.now() - tStart) / 1000; 270 let status = 'Loading trace '; 271 if (res.bytesTotal > 0) { 272 const progress = Math.round(res.bytesRead / res.bytesTotal * 100); 273 status += `${progress}%`; 274 } else { 275 status += `${Math.round(res.bytesRead / 1e6)} MB`; 276 } 277 status += ` - ${Math.ceil(res.bytesRead / elapsed / 1e6)} MB/s`; 278 this.updateStatus(status); 279 if (res.eof) break; 280 } 281 await this.engine.notifyEof(); 282 } else { 283 assertTrue(this.engine instanceof HttpRpcEngine); 284 await this.engine.restoreInitialTables(); 285 } 286 287 const traceTime = await this.engine.getTraceTimeBounds(); 288 let startSec = traceTime.start; 289 let endSec = traceTime.end; 290 startSec -= TRACE_MARGIN_TIME_S; 291 endSec += TRACE_MARGIN_TIME_S; 292 const traceTimeState = { 293 startSec, 294 endSec, 295 }; 296 const actions: DeferredAction[] = [ 297 Actions.setTraceTime(traceTimeState), 298 Actions.navigate({route: '/viewer'}), 299 ]; 300 301 let visibleStartSec = startSec; 302 let visibleEndSec = endSec; 303 const mdTime = await this.engine.getTracingMetadataTimeBounds(); 304 // make sure the bounds hold 305 if (Math.max(visibleStartSec, mdTime.start - TRACE_MARGIN_TIME_S) < 306 Math.min(visibleEndSec, mdTime.end + TRACE_MARGIN_TIME_S)) { 307 visibleStartSec = 308 Math.max(visibleStartSec, mdTime.start - TRACE_MARGIN_TIME_S); 309 visibleEndSec = Math.min(visibleEndSec, mdTime.end + TRACE_MARGIN_TIME_S); 310 } 311 312 // We don't know the resolution at this point. However this will be 313 // replaced in 50ms so a guess is fine. 314 const resolution = (visibleStartSec - visibleEndSec) / 1000; 315 actions.push(Actions.setVisibleTraceTime({ 316 startSec: visibleStartSec, 317 endSec: visibleEndSec, 318 lastUpdate: Date.now() / 1000, 319 resolution 320 })); 321 322 globals.dispatchMultiple(actions); 323 324 // Make sure the helper views are available before we start adding tracks. 325 await this.initialiseHelperViews(); 326 327 { 328 // When we reload from a permalink don't create extra tracks: 329 const {pinnedTracks, tracks} = globals.state; 330 if (!pinnedTracks.length && !Object.keys(tracks).length) { 331 await this.listTracks(); 332 } 333 } 334 335 await this.listThreads(); 336 await this.loadTimelineOverview(traceTime); 337 globals.dispatch(Actions.sortThreadTracks({})); 338 await this.selectFirstHeapProfile(); 339 340 return engineMode; 341 } 342 343 private async selectFirstHeapProfile() { 344 const query = `select * from 345 (select distinct(ts) as ts, 'native' as type, 346 upid from heap_profile_allocation 347 union 348 select distinct(graph_sample_ts) as ts, 'graph' as type, upid from 349 heap_graph_object) order by ts limit 1`; 350 const profile = await assertExists(this.engine).query(query); 351 if (profile.numRecords !== 1) return; 352 const ts = profile.columns[0].longValues![0]; 353 const type = profile.columns[1].stringValues![0]; 354 const upid = profile.columns[2].longValues![0]; 355 globals.dispatch(Actions.selectHeapProfile({id: 0, upid, ts, type})); 356 } 357 358 private async listTracks() { 359 this.updateStatus('Loading tracks'); 360 const engine = assertExists<Engine>(this.engine); 361 const actions = await decideTracks(this.engineId, engine); 362 globals.dispatchMultiple(actions); 363 } 364 365 private async listThreads() { 366 this.updateStatus('Reading thread list'); 367 const sqlQuery = `select utid, tid, pid, thread.name, 368 ifnull( 369 case when length(process.name) > 0 then process.name else null end, 370 thread.name), 371 process.cmdline 372 from (select * from thread order by upid) as thread 373 left join (select * from process order by upid) as process 374 using(upid)`; 375 const threadRows = await assertExists(this.engine).query(sqlQuery); 376 const threads: ThreadDesc[] = []; 377 for (let i = 0; i < slowlyCountRows(threadRows); i++) { 378 const utid = threadRows.columns[0].longValues![i]; 379 const tid = threadRows.columns[1].longValues![i]; 380 const pid = threadRows.columns[2].longValues![i]; 381 const threadName = threadRows.columns[3].stringValues![i]; 382 const procName = threadRows.columns[4].stringValues![i]; 383 const cmdline = threadRows.columns[5].stringValues![i]; 384 threads.push({utid, tid, threadName, pid, procName, cmdline}); 385 } // for (record ...) 386 globals.publish('Threads', threads); 387 } 388 389 private async loadTimelineOverview(traceTime: TimeSpan) { 390 const engine = assertExists<Engine>(this.engine); 391 const numSteps = 100; 392 const stepSec = traceTime.duration / numSteps; 393 let hasSchedOverview = false; 394 for (let step = 0; step < numSteps; step++) { 395 this.updateStatus( 396 'Loading overview ' + 397 `${Math.round((step + 1) / numSteps * 1000) / 10}%`); 398 const startSec = traceTime.start + step * stepSec; 399 const startNs = toNsFloor(startSec); 400 const endSec = startSec + stepSec; 401 const endNs = toNsCeil(endSec); 402 403 // Sched overview. 404 const schedRows = await engine.query( 405 `select sum(dur)/${stepSec}/1e9, cpu from sched ` + 406 `where ts >= ${startNs} and ts < ${endNs} and utid != 0 ` + 407 'group by cpu order by cpu'); 408 const schedData: {[key: string]: QuantizedLoad} = {}; 409 for (let i = 0; i < slowlyCountRows(schedRows); i++) { 410 const load = schedRows.columns[0].doubleValues![i]; 411 const cpu = schedRows.columns[1].longValues![i]; 412 schedData[cpu] = {startSec, endSec, load}; 413 hasSchedOverview = true; 414 } // for (record ...) 415 globals.publish('OverviewData', schedData); 416 } // for (step ...) 417 418 if (hasSchedOverview) { 419 return; 420 } 421 422 // Slices overview. 423 const traceStartNs = toNs(traceTime.start); 424 const stepSecNs = toNs(stepSec); 425 const sliceSummaryQuery = await engine.query(`select 426 bucket, 427 upid, 428 sum(utid_sum) / cast(${stepSecNs} as float) as upid_sum 429 from thread 430 inner join ( 431 select 432 cast((ts - ${traceStartNs})/${stepSecNs} as int) as bucket, 433 sum(dur) as utid_sum, 434 utid 435 from slice 436 inner join thread_track on slice.track_id = thread_track.id 437 group by bucket, utid 438 ) using(utid) 439 group by bucket, upid`); 440 441 const slicesData: {[key: string]: QuantizedLoad[]} = {}; 442 for (let i = 0; i < slowlyCountRows(sliceSummaryQuery); i++) { 443 const bucket = sliceSummaryQuery.columns[0].longValues![i]; 444 const upid = sliceSummaryQuery.columns[1].longValues![i]; 445 const load = sliceSummaryQuery.columns[2].doubleValues![i]; 446 447 const startSec = traceTime.start + stepSec * bucket; 448 const endSec = startSec + stepSec; 449 450 const upidStr = upid.toString(); 451 let loadArray = slicesData[upidStr]; 452 if (loadArray === undefined) { 453 loadArray = slicesData[upidStr] = []; 454 } 455 loadArray.push({startSec, endSec, load}); 456 } 457 globals.publish('OverviewData', slicesData); 458 } 459 460 async initialiseHelperViews() { 461 const engine = assertExists<Engine>(this.engine); 462 463 this.updateStatus('Creating annotation counter track table'); 464 // Create the helper tables for all the annotations related data. 465 // NULL in min/max means "figure it out per track in the usual way". 466 await engine.query(` 467 CREATE TABLE annotation_counter_track( 468 id INTEGER PRIMARY KEY, 469 name STRING, 470 __metric_name STRING, 471 upid INTEGER, 472 min_value DOUBLE, 473 max_value DOUBLE 474 ); 475 `); 476 this.updateStatus('Creating annotation slice track table'); 477 await engine.query(` 478 CREATE TABLE annotation_slice_track( 479 id INTEGER PRIMARY KEY, 480 name STRING, 481 __metric_name STRING, 482 upid INTEGER 483 ); 484 `); 485 486 this.updateStatus('Creating annotation counter table'); 487 await engine.query(` 488 CREATE TABLE annotation_counter( 489 id BIG INT, 490 track_id INT, 491 ts BIG INT, 492 value DOUBLE, 493 PRIMARY KEY (track_id, ts) 494 ) WITHOUT ROWID; 495 `); 496 this.updateStatus('Creating annotation slice table'); 497 await engine.query(` 498 CREATE TABLE annotation_slice( 499 id INTEGER PRIMARY KEY, 500 track_id INT, 501 ts BIG INT, 502 dur BIG INT, 503 depth INT, 504 cat STRING, 505 name STRING, 506 UNIQUE(track_id, ts) 507 ); 508 `); 509 510 for (const metric 511 of ['android_startup', 512 'android_ion', 513 'android_dma_heap', 514 'android_thread_time_in_state', 515 'android_surfaceflinger', 516 'android_batt', 517 'android_sysui_cuj', 518 'android_jank']) { 519 this.updateStatus(`Computing ${metric} metric`); 520 try { 521 // We don't care about the actual result of metric here as we are just 522 // interested in the annotation tracks. 523 await engine.computeMetric([metric]); 524 } catch (e) { 525 if (e instanceof QueryError) { 526 globals.publish('MetricError', 'MetricError: ' + e.message); 527 continue; 528 } else { 529 throw e; 530 } 531 } 532 533 this.updateStatus(`Inserting data for ${metric} metric`); 534 try { 535 const result = await engine.query(`pragma table_info(${metric}_event)`); 536 let hasSliceName = false; 537 let hasDur = false; 538 let hasUpid = false; 539 for (let i = 0; i < slowlyCountRows(result); i++) { 540 const name = result.columns[1].stringValues![i]; 541 hasSliceName = hasSliceName || name === 'slice_name'; 542 hasDur = hasDur || name === 'dur'; 543 hasUpid = hasUpid || name === 'upid'; 544 } 545 546 const upidColumnSelect = hasUpid ? 'upid' : '0 AS upid'; 547 const upidColumnWhere = hasUpid ? 'upid' : '0'; 548 if (hasSliceName && hasDur) { 549 await engine.query(` 550 INSERT INTO annotation_slice_track(name, __metric_name, upid) 551 SELECT DISTINCT 552 track_name, 553 '${metric}' as metric_name, 554 ${upidColumnSelect} 555 FROM ${metric}_event 556 WHERE track_type = 'slice' 557 `); 558 await engine.query(` 559 INSERT INTO annotation_slice(track_id, ts, dur, depth, cat, name) 560 SELECT 561 t.id AS track_id, 562 ts, 563 dur, 564 0 AS depth, 565 a.track_name as cat, 566 slice_name AS name 567 FROM ${metric}_event a 568 JOIN annotation_slice_track t 569 ON a.track_name = t.name AND t.__metric_name = '${metric}' 570 ORDER BY t.id, ts 571 `); 572 } 573 574 const hasValue = result.columnDescriptors.some(x => x.name === 'value'); 575 if (hasValue) { 576 const minMax = await engine.query(` 577 SELECT MIN(value) as min_value, MAX(value) as max_value 578 FROM ${metric}_event 579 WHERE ${upidColumnWhere} != 0`); 580 const min = minMax.columns[0].longValues![0]; 581 const max = minMax.columns[1].longValues![0]; 582 await engine.query(` 583 INSERT INTO annotation_counter_track( 584 name, __metric_name, min_value, max_value, upid) 585 SELECT DISTINCT 586 track_name, 587 '${metric}' as metric_name, 588 CASE ${upidColumnWhere} WHEN 0 THEN NULL ELSE ${min} END, 589 CASE ${upidColumnWhere} WHEN 0 THEN NULL ELSE ${max} END, 590 ${upidColumnSelect} 591 FROM ${metric}_event 592 WHERE track_type = 'counter' 593 `); 594 await engine.query(` 595 INSERT INTO annotation_counter(id, track_id, ts, value) 596 SELECT 597 -1 as id, 598 t.id AS track_id, 599 ts, 600 value 601 FROM ${metric}_event a 602 JOIN annotation_counter_track t 603 ON a.track_name = t.name AND t.__metric_name = '${metric}' 604 ORDER BY t.id, ts 605 `); 606 } 607 } catch (e) { 608 if (e instanceof QueryError) { 609 globals.publish('MetricError', 'MetricError: ' + e.message); 610 } else { 611 throw e; 612 } 613 } 614 } 615 } 616 617 private updateStatus(msg: string): void { 618 globals.dispatch(Actions.updateStatus({ 619 msg, 620 timestamp: Date.now() / 1000, 621 })); 622 } 623} 624 625 626